Zipping streams of objects in Node.js

What are streams?
Streams are one of the fundamental concepts that power Node.js applications. They are a data-handling method used to read input or write output sequentially.
Streams are a way to handle reading/writing files, network communications, or any kind of end-to-end information exchange in an efficient way.
What makes streams unique is that, instead of reading a file into memory all at once as in the traditional approach, a program reads the data chunk by chunk and processes each chunk without keeping the whole file in memory.
This makes streams really powerful when working with large amounts of data. For example, a file can be larger than your free memory, making it impossible to read the whole file into memory in order to process it. That’s where streams come to the rescue!
Processing the data in smaller chunks makes it possible to read files that would not otherwise fit in memory.
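As a rough illustration of chunked processing, the sketch below counts the bytes in a file without loading the whole file into memory. The helper name countBytes and the file path in the usage comment are assumptions made for this example; fs.createReadStream is the standard Node.js API.

```javascript
const fs = require('fs')

// Hypothetical helper: counts the bytes in a file chunk by chunk,
// so only one chunk has to be held in memory at a time.
function countBytes(path) {
  return new Promise((resolve, reject) => {
    let total = 0
    fs.createReadStream(path)
      .on('data', (chunk) => { total += chunk.length })
      .on('end', () => resolve(total))
      .on('error', reject)
  })
}

// Example usage (the path is a placeholder):
// countBytes('./big-file.log').then((n) => console.log(`${n} bytes`))
```

The same pattern applies to any per-chunk work, such as parsing lines or computing a hash.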
Zip
The Zip operator combines the items emitted by two or more streams, in sequence, and emits each combined result on a new stream. The combination is applied strictly in order: the first item emitted by the returned stream is the combination of the first item emitted by stream #1 and the first item emitted by stream #2, the second item is the combination of the second item emitted by each stream, and so on until the last set of emitted items. With more than two source streams, each combination takes one item from every source.
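To make the pairing rule concrete, here is a minimal sketch of zip on plain arrays rather than streams. The name zipArrays and its combine parameter are illustrative and not part of any library.

```javascript
// Illustrative helper: pairs items by position and stops at the shortest input.
function zipArrays(combine, ...arrays) {
  if (arrays.length === 0) return []
  const length = Math.min(...arrays.map((a) => a.length))
  const result = []
  for (let i = 0; i < length; i++) {
    // Take the i-th item of every array and combine them into one value
    result.push(combine(...arrays.map((a) => a[i])))
  }
  return result
}

const names = [{ id: 1, name: 'name1' }, { id: 2, name: 'name2' }]
const ages = [{ id: 1, age: 23 }, { id: 2, age: 24 }, { id: 3, age: 24 }]

console.log(JSON.stringify(zipArrays((a, b) => ({ ...a, ...b }), names, ages)))
// [{"id":1,"name":"name1","age":23},{"id":2,"name":"name2","age":24}]
```

The third entry of ages has no partner, so it does not appear in the result.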
Go on and interact with the marbles in this link to help you understand this concept. Try to click and drag the marbles of the two source Observables and verify the returned Observable stream.
Code time!
Creating two readable streams and pushing values into them
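const { Readable } = require('stream')
const items1 = [{id: 1, name: 'name1'}, {id: 2, name: 'name2'}, {id: 3, name: 'name3'}, {id: 4, name: 'name4'}]
// Object-mode stream; read() is a no-op because the data is pushed manually
const stream1 = new Readable({
  objectMode: true,
  read() {}
})
items1.forEach(item => stream1.push(item))
// no more data
stream1.push(null)
// Listed in the same order as items1: zip pairs items by position, not by id
const items2 = [{id: 1, age: 23}, {id: 2, age: 24}, {id: 3, age: 24}, {id: 4, age: 25}]
const stream2 = new Readable({
  objectMode: true,
  read() {}
})
items2.forEach(item => stream2.push(item))
// no more data
stream2.push(null)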
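On recent Node.js versions (12.3 and later), a stream of objects like these can also be built more compactly with Readable.from, which wraps any iterable and ends the stream on its own. This is an alternative sketch, not the approach used in the rest of this post, and the variable names are illustrative.

```javascript
const { Readable } = require('stream')

const people = [{ id: 1, name: 'name1' }, { id: 2, name: 'name2' }]

// Readable.from creates an object-mode stream from an iterable
// and signals the end of data once the iterable is exhausted.
const peopleStream = Readable.from(people)

peopleStream.on('data', (item) => console.log(item))
peopleStream.on('end', () => console.log('no more data'))
```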
Creating the zipStreams function
const { PassThrough } = require('stream')
module.exports = function zipStreams(...streams) {
  // Accept either zipStreams(a, b) or zipStreams([a, b])
  let sources = streams.length === 1 && Array.isArray(streams[0]) ? streams[0] : streams
  // The output stream that will be returned to the caller
  const output = new PassThrough({objectMode: true})
  output.setMaxListeners(0)
  // One buffer of pending items per source stream
  let buffers = sources.map(() => [])
  sources.forEach((source, sourceIndex) => {
    source.on('data', (chunk) => {
      buffers[sourceIndex].push(chunk)
      // Emit a merged item as soon as every source has one waiting
      if (buffers.every((buffer) => buffer.length)) {
        push()
      }
    })
    source.once('end', remove.bind(null, source))
    source.once('error', output.emit.bind(output, 'error'))
  })
  // Merge the oldest buffered item of each source into a single object
  function push() {
    const result = buffers.map((buffer) => buffer.shift())
    output.push(JSON.stringify(Object.assign({}, ...result)))
  }
  // Called when a source ends: flush leftovers, then end the output
  // once every source is done
  function remove(source) {
    const items = buffers.find((buffer) => buffer.length)
    if (items) {
      items.forEach((r) => {
        output.push(JSON.stringify(r))
      })
      // Reset every buffer, keeping one slot per original source
      buffers = buffers.map(() => [])
    }
    sources = sources.filter((it) => it !== source)
    if (!sources.length && output.readable) {
      output.end()
    }
  }
  return output
}
The zipStreams function returns a stream that merges the items emitted, in sequence, by two (or more) other streams. Each set of items is merged with Object.assign, so when two items share a key the value from the later stream wins, and the merged object is written to the returned stream as a JSON string. The merge is applied in strict sequence, so the first item emitted by the new stream is the result of merging the first item emitted by stream #1 with the first item emitted by stream #2; the second item is the result of merging the second item emitted by stream #1 with the second item emitted by stream #2; and so forth. A classic zip emits only as many items as the source stream that emits the fewest items. This implementation differs on that point: when a source ends, items still waiting in a buffer are written to the output unmerged.
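For comparison, a strict zip that takes a combining function of your choosing and stops at the shortest source can be sketched with async iterators. This is a minimal sketch, not the implementation used in this post: the name zipStrict and its combine parameter are hypothetical, and it assumes a Node.js version where readable streams are async iterable and Readable.from is available.

```javascript
const { Readable } = require('stream')

// Illustrative async generator: pulls one item from every source per round
// and stops as soon as any source is exhausted (strict zip).
async function* zipStrict(combine, ...sources) {
  const iterators = sources.map((s) => s[Symbol.asyncIterator]())
  try {
    while (true) {
      const results = await Promise.all(iterators.map((it) => it.next()))
      if (results.some((r) => r.done)) return
      yield combine(...results.map((r) => r.value))
    }
  } finally {
    // Release sources that may still have items left
    for (const s of sources) s.destroy()
  }
}

const names = Readable.from([{ id: 1, name: 'name1' }, { id: 2, name: 'name2' }])
const ages = Readable.from([{ id: 1, age: 23 }, { id: 2, age: 24 }, { id: 3, age: 24 }])

// Readable.from turns the generator back into an object-mode stream
const zipped = Readable.from(zipStrict((a, b) => ({ ...a, ...b }), names, ages))
zipped.on('data', (item) => console.log(JSON.stringify(item)))
// prints the merged objects for ids 1 and 2; id 3 has no partner and is dropped
```

Pulling from the sources instead of listening for data events means no per-source buffers are needed, at the cost of each round waiting for the slowest source.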
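Using zipStreams
// Assuming the function above is saved as zipStreams.js next to index.js
const zipStreams = require('./zipStreams')
const zipStream = zipStreams(stream1, stream2);
zipStream.pipe(process.stdout);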
❯ node index.js
{"id":1,"name":"name1","age":23}{"id":2,"name":"name2","age":24}{"id":3,"name":"name3","age":24}{"id":4,"name":"name4","age":25}