In this post, I will show why streaming in NodeJS is a powerful way to process large sets of data.
NodeJS ships with a few built-in modules for streaming, and the stream module is one of them.
Introduction
Streams can be one of the best features of NodeJS and, at the same time, one of the most misunderstood. Much of the confusion stems from the number of options available in the npm ecosystem. Streaming is a general data handling technique that lets you process data sequentially, at a controlled speed, without overwhelming memory or CPU.
When processing a large set of data, especially from files, reading all of it into memory before processing can be a challenge. It can drive up both memory and CPU usage, which in turn can cause the backend service to fail.
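To make the memory argument concrete, here is a minimal sketch that counts the lines in a file by streaming it chunk by chunk instead of loading it whole. The function name countLines is my own, picked for illustration, and the sketch assumes a NodeJS version where readable streams support for await.

```javascript
const fs = require('fs');

// Count newline characters in a file without loading the whole file into memory.
// Each chunk is a Buffer of at most highWaterMark bytes (64 KiB by default for fs read streams).
async function countLines(filePath) {
  let lines = 0;
  const stream = fs.createReadStream(filePath);
  for await (const chunk of stream) {
    for (const byte of chunk) {
      if (byte === 10) lines += 1; // 10 is the byte value of '\n'
    }
  }
  return lines;
}
```

Only one chunk is held at a time, so memory usage stays roughly flat regardless of the file size.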
Streaming in NodeJS
Streaming is an old concept. It became popular as we started building more data-intensive applications, with Netflix and YouTube being the best-known examples. The idea of a stream is to take a small piece of data (a character or a byte), process it, and repeat until all the data has been read.
There are mainly two types of streams: readable and writable. There is also a duplex stream, which does both reading and writing.
- Readable stream (Input Stream) is where you read the data from.
- Writable stream (Output Stream) is where you write the data into.
Files, databases, and the console can act as readable streams, and they can act as writable streams as well. A readable stream can be combined with a writable stream to make processing easier. This is known as piping. Piping has been around since the early days of Unix. If you have used a pipe in Unix to combine two or more commands, combining two streams is conceptually the same.
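As a rough illustration of combining a readable stream with a writable stream, the sketch below pipes an in-memory source into a sink that collects what it receives. The helper name pipeToSink is hypothetical, and the in-memory source stands in for a file or database.

```javascript
const { Readable, Writable } = require('stream');

// Pipe a readable source into a writable sink and resolve with everything the sink received.
function pipeToSink(items) {
  return new Promise((resolve, reject) => {
    const received = [];
    const source = Readable.from(items); // object-mode readable built from an iterable
    const sink = new Writable({
      objectMode: true,
      write(chunk, encoding, callback) {
        received.push(chunk);
        callback(); // signal that this chunk has been handled
      },
    });
    sink.on('finish', () => resolve(received));
    sink.on('error', reject);
    source.on('error', reject);
    source.pipe(sink); // comparable to `source | sink` in a Unix shell
  });
}

pipeToSink(['a', 'b', 'c']).then((received) => console.log(received));
```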
Transform
NodeJS streams come with a number of powerful concepts, and one of them is transform. We do not just move data between readable and writable streams; we can also transform that data as it becomes available from the readable stream.
Another powerful feature of transform is that you can combine multiple transforms to perform various operations on the data coming from the readable (input) stream. To create a transform, you need the stream module from NodeJS.
const { Transform } = require('stream');
There is another package, through2, that is a wrapper over transform. Considering the package has not been updated in more than 4 years now, I do not recommend it. You can just use the built-in Transform from the NodeJS stream module.
Alternatively, you can implement a custom transform by extending the Transform class. Each transform comes with a function that takes chunk, encoding, and callback. chunk is the data from the stream, encoding is the encoding of the chunk when it is a string, and callback is the function to call once the chunk has been processed.
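Here is a small sketch of a custom transform built by extending the Transform class. UpperCaseTransform and collect are hypothetical names used only for this illustration; the transform simply upper-cases each chunk, and collect gathers the output so the result is easy to inspect.

```javascript
const { Readable, Transform } = require('stream');

// Hypothetical transform: upper-cases each chunk that passes through it.
class UpperCaseTransform extends Transform {
  constructor(options = {}) {
    super({ ...options, objectMode: true });
  }

  _transform(chunk, encoding, callback) {
    try {
      // First argument is an error (or null), second is the transformed chunk.
      callback(null, String(chunk).toUpperCase());
    } catch (error) {
      callback(error);
    }
  }
}

// Feed `inputs` through `transform` and collect everything it emits.
async function collect(inputs, transform) {
  const output = [];
  for await (const chunk of Readable.from(inputs).pipe(transform)) {
    output.push(chunk);
  }
  return output;
}

collect(['sell', 'buy'], new UpperCaseTransform()).then((out) => console.log(out));
```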
Piping
As mentioned earlier, piping comes from Unix. Within NodeJS streams, we can also use pipe to combine multiple streams. This allows data to flow from one stream to another as it gets processed.
In data-intensive applications, we will come across scenarios where we have to perform various operations on data at different stages. In such scenarios, piping lets us combine transforms and pass data from one transform to the next.
Here is an example of how piping can be implemented:
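const { pipeline } = require('stream');
const { promisify } = require('util');

// Promise-based wrapper so the pipeline can be awaited
const pipelineAsync = promisify(pipeline);

// Inside an async function; the three transforms are created elsewhere
await pipelineAsync(
  buildSellObjectTransform,
  splitUserDataTransform,
  enqueueDataTransform
);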
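For a version that runs on its own, the sketch below chains two small transforms between an in-memory source and a collecting sink. It assumes NodeJS 15 or later, where the stream/promises module provides a promise-based pipeline; the stage names parseNumber and double and the helper runPipeline are hypothetical.

```javascript
const { Readable, Transform, Writable } = require('stream');
const { pipeline } = require('stream/promises');

// Hypothetical stage 1: convert each string chunk to a number.
const parseNumber = () =>
  new Transform({
    objectMode: true,
    transform(chunk, encoding, callback) {
      callback(null, Number(chunk));
    },
  });

// Hypothetical stage 2: double each number.
const double = () =>
  new Transform({
    objectMode: true,
    transform(chunk, encoding, callback) {
      callback(null, chunk * 2);
    },
  });

// Run source -> parseNumber -> double -> sink and return what the sink collected.
async function runPipeline(inputs) {
  const results = [];
  const sink = new Writable({
    objectMode: true,
    write(chunk, encoding, callback) {
      results.push(chunk);
      callback();
    },
  });
  await pipeline(Readable.from(inputs), parseNumber(), double(), sink);
  return results;
}

runPipeline(['1', '2', '3']).then((results) => console.log(results));
```

Each stage only knows about the chunk it receives, so stages can be added, removed, or reordered without touching the others.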
Backpressure
So far, I have described how strong a feature streams are in NodeJS. But they come with their own set of concerns. Processing a large set of data can still pose challenges with streams.
In the end, it depends on how fast the stream produces data and how fast the output side can handle it.
Look at the example above, where I have a pipeline. I will explain why I used pipelineAsync instead of just pipeline.
When resources are constrained, we want to make sure we do not overwhelm the downstream services. The input stream keeps sending data until it reaches the end, so the transforms or other downstream services handling that data need to keep up with the input stream.
Left on its own, a readable stream is fire and forget: once it starts sending data, it does not care how the downstream services are handling that data. This creates an issue.
This is how the NodeJS documentation describes it:
There is a general problem that occurs during data handling called backpressure and describes a buildup of data behind a buffer during data transfer. When the receiving end of the transfer has complex operations, or is slower for whatever reason, there is a tendency for data from the incoming source to accumulate, like a clog.
If you look at the picture below, you can see a faucet releasing water at full force. If we don't apply backpressure on the faucet, the water can overflow. Backpressure is the same idea in a NodeJS stream.
By applying backpressure, the stream releases only a certain amount of data and waits for the receiving service to process it before releasing more. There are various ways to solve the backpressure problem.
- Piping is one of the solutions to handle backpressure. If you use pipe() from NodeJS, it handles backpressure for you. Sometimes, you have to explicitly set highWaterMark to control how much data is buffered.
- pipelineAsync/pipeline handles backpressure automatically without having to set highWaterMark.
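To show the mechanism that pipe() and pipeline rely on, here is a hand-written sketch of backpressure: write() returns false once the writable's buffer reaches highWaterMark, and the producer waits for the 'drain' event before sending more. The names writeAll and createSlowSink are hypothetical, and the highWaterMark of 2 is an arbitrary value chosen to trigger backpressure quickly.

```javascript
const { Writable } = require('stream');
const { once } = require('events');

// Write every item to `writable`, pausing whenever write() reports a full buffer.
// Returns how many times the producer had to wait for 'drain'.
async function writeAll(writable, items) {
  let pauses = 0;
  for (const item of items) {
    // write() returns false once the buffered data reaches highWaterMark
    if (!writable.write(item)) {
      pauses += 1;
      await once(writable, 'drain'); // resume only after the buffer has emptied
    }
  }
  const finished = once(writable, 'finish'); // subscribe before calling end()
  writable.end();
  await finished;
  return pauses;
}

// Hypothetical slow consumer: signals backpressure once 2 objects are buffered
// and handles each chunk asynchronously.
function createSlowSink(seen) {
  return new Writable({
    objectMode: true,
    highWaterMark: 2,
    write(chunk, encoding, callback) {
      seen.push(chunk);
      setImmediate(callback);
    },
  });
}

const seen = [];
writeAll(createSlowSink(seen), [1, 2, 3, 4, 5]).then((pauses) => {
  console.log(`wrote ${seen.length} items, paused ${pauses} time(s)`);
});
```

In day-to-day code you would rarely write this loop yourself; it is here only to show what the piping helpers do on your behalf.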
Error Handling
We have talked about reading and handling data. But what happens to the streams or the pipeline if some data is corrupt, or a processing function fails with a system error or a custom error?
When there is an error in a transform stream, you can pass it to callback(error) so that it can be caught in your calling function. On error, you can either destroy or drain the stream so the objects get cleaned up and the stream doesn't end up occupying memory while waiting for garbage collection.
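_transform(chunk, encoding, callback) {
  try {
    const sellItem = this.buildSellObject(chunk, this.customFields);
    // Transform already applies backpressure between its writable and readable sides.
    // Waiting for 'drain' here could stall the stream, because 'drain' is only
    // emitted after callback has been called for the chunk in flight.
    callback(null, sellItem);
  } catch (error) {
    this.logger.error(error);
    // Passing the error to callback makes the surrounding pipeline reject
    callback(error);
  }
}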
And the calling function:
try {
  // Rejects if any stream in the pipeline reports an error
  await pipelineAsync(
    buildSellObjectTransform,
    splitUserDataTransform,
    enqueueDataTransform
  );
} catch (error) {
  this.logger.error(`${logPrefix} Error in processing file data: ${error}`);
  throw error;
} finally {
  this.logger.info(`${logPrefix} ends file processing`);
  // Release fileStream whether or not processing succeeded
  fileStream.destroy();
}
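The same flow can be reproduced in a self-contained sketch: a transform rejects one record through callback(error), the awaited pipeline throws, and pipeline destroys the streams that have not finished yet. The record values, the error message, and the helper name demoError are all made up for illustration.

```javascript
const { Readable, Transform, Writable, pipeline } = require('stream');
const { promisify } = require('util');

const pipelineAsync = promisify(pipeline);

// Run a pipeline whose middle stage fails on a "bad" record and report what happened.
async function demoError() {
  const source = Readable.from(['ok', 'bad', 'ok']);
  const validate = new Transform({
    objectMode: true,
    transform(chunk, encoding, callback) {
      if (chunk === 'bad') {
        callback(new Error('corrupt record')); // surfaces as a rejection of pipelineAsync
        return;
      }
      callback(null, chunk);
    },
  });
  const sink = new Writable({
    objectMode: true,
    write(chunk, encoding, callback) {
      callback();
    },
  });

  try {
    await pipelineAsync(source, validate, sink);
    return { failed: false };
  } catch (error) {
    // By the time the promise rejects, pipeline has destroyed the unfinished streams.
    return { failed: true, message: error.message, sinkDestroyed: sink.destroyed };
  }
}

demoError().then((result) => console.log(result));
```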
Conclusion
In this post, I shared how powerful streams in NodeJS are. With transform and piping, streams can be used for many large-scale data processing use cases.