NodeJS Streams Explained: A Detailed Walkthrough

In this post, I will show how NodeJS streams are a powerful feature for processing large sets of data.

NodeJS ships with a few built-in modules for streaming, so nothing has to be installed from npm. stream is one of those modules.

Introduction

Streams can be one of the best features of NodeJS and, at the same time, one of the most misunderstood. A lot of the time, the confusion stems from the number of options available within the npm ecosystem. Streaming is a general data handling technique that lets you process data sequentially at a controlled speed without overwhelming the memory or CPU.

When processing a large set of data, especially from files, it can be challenging to read all the data into memory and process it. This can cause high memory usage as well as high CPU usage. In turn, it can cause the backend service to fail.

Streaming in NodeJS

Streaming is an old concept. It became popular when we started building a lot of data-intensive applications, the most popular being Netflix and YouTube. The idea of a stream is to take a small piece of data (a character or a byte), process it, and continue until we have read all the data.

There are mainly two types of streams: readable and writable. There is also the duplex stream, which does both reading and writing.

  • Readable stream (Input Stream) is where you read the data from.
  • Writable stream (Output Stream) is where you write the data into.

Files, databases, and the console can act as readable streams, and they can act as writable streams as well. A readable stream can be combined with a writable stream to make processing easier. This is known as piping. Piping has been around since the early days of Unix. If you have used a pipe in Unix to combine two or more commands, combining two streams is the same concept.

Transform

NodeJS offers streams with a number of powerful concepts, and one of them is transform. We not only transfer the data between readable and writable streams, but we can also transform this data as it becomes available through the readable stream.

Another powerful feature of transforms is that you can combine multiple transforms and perform various operations on the data passing through from the readable (input) stream. To create a transform, you will need the stream module from NodeJS.
const { Transform } = require('stream');

There is another package, through2, that’s a wrapper over transform. Considering the package has not been updated in more than 4 years now, I do not recommend it. You can just use the built-in Transform from the NodeJS stream module.

Alternatively, you can also implement a custom transform by extending the Transform class. Each transform comes with a function that takes chunk, encoding and callback. chunk is the data from the stream, encoding is the character encoding when the chunk is a string, and callback is the function to call once the chunk has been processed.
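To make the chunk, encoding and callback arguments concrete, here is a minimal sketch of a custom transform. The class name UpperCaseTransform and the helper upperCaseAll are made up for illustration; only Transform and Readable come from NodeJS.

```javascript
const { Transform, Readable } = require('stream');

// Hypothetical example class: upper-cases every text chunk that passes through.
class UpperCaseTransform extends Transform {
  _transform(chunk, encoding, callback) {
    // chunk arrives as a Buffer here because the stream is not in object mode.
    const upper = chunk.toString('utf8').toUpperCase();
    // First argument is an error (none here), second is the transformed data.
    callback(null, upper);
  }
}

// Pipes the given strings through the transform and joins whatever comes out.
async function upperCaseAll(parts) {
  let result = '';
  const output = Readable.from(parts).pipe(new UpperCaseTransform());
  for await (const chunk of output) {
    result += chunk.toString();
  }
  return result;
}

upperCaseAll(['node', 'js ', 'streams']).then((text) => console.log(text));
// prints "NODEJS STREAMS"
```

Passing the result to callback keeps the function short. Calling this.push(data) followed by callback() is an alternative when one input chunk produces several output chunks.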

Piping

As previously said, piping comes from Unix. Within NodeJS streams, we can also use pipe to combine multiple streams. This allows data to flow from one stream to another as it gets processed.
In data-intensive applications, we come across scenarios where we have to perform various operations on data at different stages. In such scenarios, piping lets us combine transforms and pass data from one transform to the next.

Here is an example of how piping can be implemented:

const { pipeline } = require('stream');
const { promisify } = require('util');
const pipelineAsync = promisify(pipeline);

await pipelineAsync(
        buildSellObjectTransform,
        splitUserDataTransform,
        enqueueDataTransform
      );
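The snippet above lists only transforms from my application. As a self-contained sketch of the same idea, the example below wires a source, two transforms and a sink together. The stage names parseUser and adultsOnly and the sample data are made up for illustration, and it uses pipeline from stream/promises, which returns a promise without needing promisify.

```javascript
const { Readable, Transform, Writable } = require('stream');
const { pipeline } = require('stream/promises');

// Hypothetical stage 1: turn a "name,age" line into an object.
function parseUser() {
  return new Transform({
    objectMode: true,
    transform(line, encoding, callback) {
      const [name, age] = line.split(',');
      callback(null, { name, age: Number(age) });
    },
  });
}

// Hypothetical stage 2: keep adults only. callback() without data drops the record.
function adultsOnly() {
  return new Transform({
    objectMode: true,
    transform(user, encoding, callback) {
      if (user.age >= 18) callback(null, user);
      else callback();
    },
  });
}

// Runs source -> parse -> filter -> sink and resolves with the collected records.
async function processUsers(lines) {
  const collected = [];
  const sink = new Writable({
    objectMode: true,
    write(user, encoding, callback) {
      collected.push(user);
      callback();
    },
  });
  await pipeline(Readable.from(lines), parseUser(), adultsOnly(), sink);
  return collected;
}

processUsers(['ann,34', 'bob,12', 'cy,18']).then((users) => console.log(users));
```

Each stage only knows about the record it receives, so stages can be reordered or reused in other pipelines.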

Backpressure

So far, I have described how strong a feature streams are in NodeJS. But they come with their own set of concerns. Processing a large set of data can still pose challenges with streams.

In the end, it depends on how fast the stream produces data and how fast the output handles it.

Look at the pipeline example above. I will explain why I used pipelineAsync instead of just pipeline.

When resources are constrained, we want to make sure that we don’t overwhelm the downstream services. The input stream will keep sending data until it reaches the end, so the transforms or other downstream services handling that data need to keep up with the input stream.

Left unchecked, streams are fire and forget. Once they start sending data, they don’t care how other services are handling that data. This creates an issue.

This is what NodeJS documentation describes –
There is a general problem that occurs during data handling called backpressure and describes a buildup of data behind a buffer during data transfer. When the receiving end of the transfer has complex operations, or is slower for whatever reason, there is a tendency for data from the incoming source to accumulate, like a clog.

If you look at the picture below, you can see a faucet releasing water with force. If we don’t apply backpressure on the faucet, the water can overflow. Backpressure is the same idea in NodeJS streams.

Backpressure in NodeJS Streams
By applying some backpressure, the stream only releases a certain amount of data and waits for the receiving service to process it before releasing more. There are various ways to solve the backpressure problem.

  • Piping is one of the solutions to handle backpressure. If you use pipe() from NodeJS, it handles backpressure for you. Sometimes, you have to explicitly set highWaterMark, the buffer threshold that decides how much data a stream holds before it signals the source to pause.
  • pipelineAsync/pipeline handles backpressure automatically without having to set highWaterMark.
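To see what pipe() and pipeline do on your behalf, here is a rough sketch of handling backpressure by hand. The names slowSink and writeAll, the 5 ms delay and the highWaterMark of 2 are assumptions chosen for the demo. The mechanism itself is standard: write() returns false when the buffer is full, and the 'drain' event says it is safe to continue.

```javascript
const { Writable } = require('stream');
const { once } = require('events');

// Hypothetical slow consumer: takes 5 ms per item and buffers at most 2 objects.
function slowSink(processed) {
  return new Writable({
    objectMode: true,
    highWaterMark: 2,
    write(chunk, encoding, callback) {
      setTimeout(() => {
        processed.push(chunk);
        callback();
      }, 5);
    },
  });
}

// Writes every item, pausing whenever write() reports a full buffer.
async function writeAll(items, sink) {
  let pauses = 0;
  for (const item of items) {
    if (!sink.write(item)) {
      pauses += 1;
      await once(sink, 'drain'); // resume only after the buffer has emptied
    }
  }
  sink.end();
  await once(sink, 'finish');
  return pauses;
}

const processed = [];
writeAll([1, 2, 3, 4, 5, 6], slowSink(processed)).then((pauses) => {
  console.log(`processed ${processed.length} items, paused ${pauses} times`);
});
```

Without the drain check, the loop would queue every item in memory immediately, which is exactly the buildup the documentation quote describes.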

Error Handling

We have talked about reading data and handling data. But what happens to the streams or the pipeline if there is corrupt data, or if a data processing function fails through a system error or a custom error?
When there is an error in a transform stream, you can use callback(error) to pass the error on so that it can be caught in your calling function. On error, you can either destroy or drain the stream so the objects get cleaned up and the stream doesn’t end up occupying memory waiting for garbage collection.

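_transform(chunk, encoding, callback) {
    try {
      const sellItem = this.buildSellObject(chunk, this.customFields);
      // Hand the transformed record back together with control. The Transform
      // class applies backpressure internally when its readable side is full.
      callback(null, sellItem);
    } catch (error) {
      this.logger.error(error);
      // Passing the error to callback fails the pipeline, so the caller's catch block runs.
      callback(error);
    }
  }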

And the calling function:

    try {
      await pipelineAsync(
        buildSellObjectTransform,
        splitUserDataTransform,
        enqueueDataTransform
      );
    } catch (error) {
      this.logger.error(`${logPrefix} Error in processing file data: ${error}`);
      throw error;
    } finally {
      this.logger.info(`${logPrefix} ends file processing`);
      fileStream.destroy();
    }
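The two snippets above depend on classes from my application, so here is a small runnable sketch of the same error path. The function sumNumbers and the "corrupt record" rule are made up for illustration, and it uses pipeline from stream/promises rather than a promisified pipeline.

```javascript
const { Readable, Transform, Writable } = require('stream');
const { pipeline } = require('stream/promises');

// Sums the values, but fails the whole pipeline on the first non-numeric record.
async function sumNumbers(values) {
  let total = 0;
  const source = Readable.from(values);
  const validate = new Transform({
    objectMode: true,
    transform(value, encoding, callback) {
      if (typeof value !== 'number') {
        callback(new Error(`corrupt record: ${value}`)); // reported to the caller
        return;
      }
      callback(null, value);
    },
  });
  const sink = new Writable({
    objectMode: true,
    write(value, encoding, callback) {
      total += value;
      callback();
    },
  });

  try {
    await pipeline(source, validate, sink);
    return { ok: true, total };
  } catch (error) {
    // On failure, pipeline() takes care of destroying the streams in the chain.
    return { ok: false, message: error.message };
  }
}

sumNumbers([1, 2, 'oops', 4]).then((result) => console.log(result));
```

The error raised inside the transform surfaces as a rejected promise, which is why a plain try/catch around the awaited pipeline is enough.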

Conclusion

In this post, I shared the details of how powerful streams in NodeJS are. With transform and piping, streams can be used in various use cases of large data processing.

Scaling Bull Jobs In NestJS Application

In this post, I want to show how to scale processing of bull jobs in a NestJS Application. As we know, we can use the Bull Queue mechanism for asynchronous tasks, and we can also easily process a lot of tasks in parallel using Bull Queue. One technique is to scale your workers horizontally.

This post demonstrates horizontal scaling of bull queue workers.

Introduction

This is a simple idea and I want to showcase the scaling power of bull queue. One thing I really like about bull jobs is that they are performant and easy to scale. The jobs also give an option to do asynchronous processing. Keep in mind that if you have a CPU-intensive task, you can still use Bull queue. I have covered how to use Bull Queue for asynchronous processing.

As part of this post, we will upload a large file. The controller stores the file on disk, but adds a job to bull queue. The worker for this job will then read the file data and add each record to another bull queue.

We will have multiple workers to process file data. Each worker will process a separate job that is there in the bull queue.

Adding a single job

Let’s start with a single job first. I will not be covering any fundamentals about Bull Queues and workers. I previously wrote about worker pattern.

Nevertheless, we have a NestJS application with an API to upload a file. This API will create the first job in the queue file-upload-queue as follows:


  @Post('/uploadFile')
  @UseInterceptors(FileInterceptor("csv", {
    storage: diskStorage({
      destination: './csv',
      // multer expects this option to be spelled "filename"
      filename: (req, file, cb) => {
        // a random hex string keeps uploaded file names unique
        const randomName = Array(32).fill(null).map(() => Math.round(Math.random() * 16).toString(16)).join('');
        cb(null, `${randomName}${extname(file.originalname)}`);
      }
    })
  }))
  async uploadLargeCsvFile(@UploadedFile() file): Promise<void> {
    const job = await this.fileQueue.add('process-file', {file: file});
    console.log(`created job ${ job.id}`);
  }

The code above adds a job to file-upload-queue to process that file.

Adding multiple jobs

One of the features the bull library offers is adding multiple jobs to a queue at once. In our example, we read the data from the file and add each record as a job to another bull queue. This allows us to add multiple jobs to the queue file-data-queue.


import { InjectQueue, Process, Processor } from "@nestjs/bull";
import { Job, Queue } from "bull";

const csv = require('csvtojson');

@Processor('file-upload-queue')
export class FileUploadProcessor{

    constructor(@InjectQueue('file-data-queue') private fileDataQueue: Queue) {}
    
    @Process('process-file')
    async processFile(job: Job) {
        const file = job.data.file;
        const filePath = file.path;
        const userData = await csv().fromFile(filePath);

        await this.fileDataQueue.addBulk(userData.map(user => ({
            name: 'process-data',
            data: user
        })));

        console.log('file uploaded successfully');
    }
    
}

We use addBulk functionality to add all the records from the file to queue file-data-queue.

Worker

Creating a worker through the NestJS framework is simple. NestJS has a feature to run a standalone application. We will use it to run our workers while creating a separate module to process jobs from file-data-queue.

Our separate module FileDataModule will have a processor to process each record from the file.


import { Process, Processor } from "@nestjs/bull";
import { Job } from "bull";


@Processor('file-data-queue')
export class FileDataProcessor{

    
    @Process('process-data')
    async processFile(job: Job) {
        const data = job.data;

        console.log('processing data for a single user');
        console.log(data);

        // To-Do add some processing like inserting this data in DB
    }
    
}

We will use createApplicationContext to create a worker for FileDataModule like below:


import { NestFactory } from "@nestjs/core";
import { FileDataModule } from "src/file-read/file-data.module";

async function main() {
    const app = await NestFactory.createApplicationContext(FileDataModule, 
        {
            bufferLogs: true,
            abortOnError: false,
        }
    );
    app.enableShutdownHooks();
    await app.init();
    console.log(`Worker started`);

    process.on('SIGINT', async () => {
        console.log(`SIGINT signal received`);
        try {
            console.log('closing app...');
            await app.close();
            console.log(`Worker stopped`);
        } catch (error) {
            console.error(`Error during shutdown: ${error.message}`);

        } finally {
            console.log('exiting...');
            process.exit(0);
        }
    });
}

main();

This worker starts the application and waits for a SIGINT signal to terminate. Since this worker creates the application context for FileDataModule, it uses the processor FileDataProcessor to process data from the queue.

Scaling Workers

We will run two instances of the worker we created above. We will also be running our NestJS Application and if we have imported FileDataModule in our main application module, we will have three instances of FileDataProcessor running to process the jobs from the bull queue file-data-queue.

There are two concepts to understand in Bull Queue, since bull can offer either of them.

Parallelism

In Parallelism, two or more tasks run in parallel, independent of each other. The simplest way to understand this is when you have multiple machines running and each performing its own task.

Concurrency

In concurrency, two or more tasks make progress over the same period by dividing the available CPU, so that all the tasks can advance in their processing.

Concurrency might not increase the throughput, but parallelism will. Parallelism also scales close to linearly, which means that if you add more workers, more jobs will get processed.

Bull offers configuration for concurrency. But in this demo, we are focusing on parallelism.
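To illustrate what concurrency means inside a single worker process, here is a plain JavaScript sketch. It is not how Bull implements its concurrency setting. The function runWithConcurrency, the sleep helper and the job list are made up for the demo. All tasks share one thread and simply take turns while they wait.

```javascript
// Runs `tasks` (functions returning promises) with at most `limit` in flight at once.
async function runWithConcurrency(tasks, limit) {
  const results = new Array(tasks.length);
  let next = 0;
  let inFlight = 0;
  let peak = 0;

  async function worker() {
    while (next < tasks.length) {
      const index = next++;
      inFlight += 1;
      peak = Math.max(peak, inFlight);
      results[index] = await tasks[index]();
      inFlight -= 1;
    }
  }

  // Start `limit` loops that pull from one shared list; all run on the same thread.
  const loops = Array.from({ length: Math.min(limit, tasks.length) }, () => worker());
  await Promise.all(loops);
  return { results, peak };
}

const sleep = (ms, value) => new Promise((resolve) => setTimeout(() => resolve(value), ms));
const jobs = [1, 2, 3, 4, 5].map((id) => () => sleep(10, `job-${id} done`));

runWithConcurrency(jobs, 2).then(({ results, peak }) => {
  console.log(results, `peak in flight: ${peak}`);
});
```

Parallelism, by contrast, would mean starting this whole script in several processes or machines, which is what running multiple workers achieves.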

Demo

We have already described the scenario. A NestJS application with an API to upload a file is running in one terminal. We have two workers running in two other terminals.

We upload the file through Postman with our API. This API will create the first job. The processor for this job then adds multiple jobs to another queue, file-data-queue.

The NestJS application and the two workers then process jobs from this queue in parallel. The three screenshots below show the application, worker 1 and worker 2.

The demo of Worker running in NestJS Application

Bull Worker 1 processing the job

Bull Worker 2 Processing the bull jobs

NestJS Application Processing the bull jobs

Conclusion

In this post, I showed how to scale bull job workers horizontally. This allows us to process jobs with high throughput. The code for this post is available here.

Custom Decorators in a NestJS Application

NestJS is a great framework to build NodeJS-based server applications. Features like pipes, guards, and interceptors make writing a NestJS application even cleaner. Thanks to Dependency Injection, the code can be descriptive and modularized further.

In this post, I will cover the concept of Custom Decorator and how one can use it to write some common code that you might have across your code base.

What are Custom Decorators?

Decorators are not a new concept. They have been around in other languages and frameworks, especially the Java-based framework Spring Boot, where they are called annotations. Annotation-based decorators do certain common tasks. In short, you don’t have to write a lot of duplicate code to achieve something.

In the JavaScript and TypeScript languages, decorators are relatively new.

Simply put, decorators are functions. These functions can also return functions. Once you have defined a decorator, you use it in your code by writing @ followed by the decorator name. I will show this further in the example.
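To show that a decorator really is just a function that returns a function, here is a sketch in plain JavaScript where the decorator is applied by hand. The names LogCalls and Greeter are made up for illustration. The manual defineProperty call is roughly what TypeScript's experimental decorator support, the mode NestJS relies on, does when you write @LogCalls('greeter') above a method.

```javascript
// A decorator factory: takes configuration and returns the actual decorator function.
function LogCalls(label) {
  // A method decorator receives the prototype, the method name and its descriptor.
  return function (target, propertyKey, descriptor) {
    const original = descriptor.value;
    descriptor.value = function (...args) {
      console.log(`[${label}] ${propertyKey}(${args.join(', ')})`);
      return original.apply(this, args);
    };
    return descriptor;
  };
}

class Greeter {
  greet(name) {
    return `Hello ${name}`;
  }
}

// Applied by hand here, because plain JavaScript run by Node has no @ syntax.
const greetDescriptor = Object.getOwnPropertyDescriptor(Greeter.prototype, 'greet');
Object.defineProperty(
  Greeter.prototype,
  'greet',
  LogCalls('greeter')(Greeter.prototype, 'greet', greetDescriptor)
);

console.log(new Greeter().greet('World'));
```

The wrapped method behaves like the original and only adds the log line, which is the kind of cross-cutting behavior decorators are typically used for.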

In NestJS, we use a lot of decorators that are provided by the framework.

These include param decorators like @Req(), @Res(), @Body(), @Param(), @Query(), and decorators that bind incoming requests to controller methods, like @Get(), @Post(), @Put(). All these decorators take the incoming request and parse the request for parameters or handlers.

A request coming to a controller gets handled by an HTTP route handler. Let’s look at this simple example below:

    @Controller()
    export class AppController {

        constructor(private readonly appService: AppService) {}

        @Get()
        getHello(): string {
          return "Hello World";
        }
    }

As you can see, we have Controller for defining the API and Get for a GET API with the default path. These are decorators performing certain tasks behind the scenes in the framework.

When should you use Custom Decorators

When writing business logic, you will come across various scenarios with duplicate code, and that is where you can think of using custom decorators. The following are the key situations when you should use custom decorators:

  1. Avoid complexity with repetitive code – In many cases, you might have to do a certain check, like an idempotency check or a user id existence check. You can write either a custom guard or a custom decorator to implement such repetitive functionality.
  2. Loose coupling – Custom decorators make your application code more loosely coupled, irrespective of what server you use underneath (express or fastify).

Example of Custom Decorator

Let’s dive into a simple example of a custom decorator.

Our UI is sending a request with a payload that contains mostly snake case parameters. This request comes to one of our APIs on the backend controller for processing.

The custom decorator that we will be building is ParameterTransformer. It takes the request body, transforms the request body parameter names to camel case, and forwards the result to the controller.

import { createParamDecorator, ExecutionContext } from "@nestjs/common";

function snakeToCamel(str: string): string {
    return str.replace(/([-_][a-z])/g, (group) =>
      group.toUpperCase().replace('-', '').replace('_', '')
    );
}

export const ParameterTransformer = createParamDecorator(
    (data: unknown, ctx: ExecutionContext) => {
        const request = ctx.switchToHttp().getRequest();

        const body = request.body;
        const transformedBody = Object.keys(body).reduce((acc, key) => {
            const camelKey = snakeToCamel(key);
            acc[camelKey] = body[key];
            return acc;
          }, {});
      
        return transformedBody;
    }
);
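To see what the decorator hands to the controller, the key-renaming step can be run on its own. This is a sketch in plain JavaScript: transformBody is a hypothetical helper that repeats the reduce from the decorator, and the payload is made up.

```javascript
// Same conversion as snakeToCamel above, without the type annotations.
function snakeToCamel(str) {
  return str.replace(/([-_][a-z])/g, (group) =>
    group.toUpperCase().replace('-', '').replace('_', '')
  );
}

// The key-renaming step of the decorator, extracted so it can run without a request.
function transformBody(body) {
  return Object.keys(body).reduce((acc, key) => {
    acc[snakeToCamel(key)] = body[key];
    return acc;
  }, {});
}

// Hypothetical payload; only top-level keys are renamed, nested objects stay as they are.
const payload = {
  first_name: 'Ada',
  last_name: 'Lovelace',
  contact_info: { phone_number: '555' },
};

console.log(transformBody(payload));
// prints { firstName: 'Ada', lastName: 'Lovelace', contactInfo: { phone_number: '555' } }
```

If nested payloads also need converting, the reduce would have to recurse into object values. The decorator as written does not do that.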

Now we can use this custom decorator in our controller for incoming request body.

  @Get()
  getHello(@ParameterTransformer() helloWorldDto: HelloWorldDto): string {
    console.log(helloWorldDto);
    return this.appService.getHello();
  }

Even though the transformed object type has been defined as HelloWorldDto, the object returned from the decorator won’t necessarily follow that type, because TypeScript types are not checked at runtime.

This particular example is useful when you want to update the backend API without changing the frontend.

Conclusion

In this post, I showed how to create a custom decorator in a NestJS Application. Custom decorators can improve readability and reduce complexity. A fair warning: do not overuse custom decorators, as that can easily increase complexity.

Background Job Scheduling with Pg-Boss and NestJS

In a distributed system, we need to automate various workflows, and there are several ways to do that. One of the ways that I have used frequently is background jobs. And if you noticed, even my last post was about using background jobs with trigger.dev. In this post, I will share a new way for background job scheduling with Pg-Boss and NestJS. pg-boss is a job queue based on the Postgres database. How cool is that? And if it is not clear, I love the Postgres database.

What is Pg-Boss?

pg-boss is a job queue library for node.js applications that uses Postgres for persistence. It relies on the reliability of Postgres for storing and scheduling jobs. Over the last 10 years, Postgres has gained features such as SKIP LOCKED that make it a good fit for job queues, and pg-boss builds on them. In short, pg-boss removes one more infrastructure item when building a distributed system application.

Pg-Boss vs Bull Queues

You might have noticed that I have written extensively about Bull Queues. Bull is another job queue system, based on Redis. The question that might come up immediately is: why use pg-boss? Bull queues have been great and easy to use as well, and there is huge community support for them. So what is so special about pg-boss?

I get it.

There is no right or wrong answer when choosing pg-boss or bull queues. It all depends on the system you are building.

Then why choose pg-boss?

If you are already using Postgres as a database for your application, then it is a no-brainer to use pg-boss instead of bull queues, which are based on Redis. Redis is an additional infrastructure overhead if you are not using it as a cache or for any other purpose.

pg-boss is based on Postgres, and Postgres is really good at transactional support and locking, so it handles the queue-based mechanism really well.

Nevertheless, choose the queueing mechanism that fits your needs. You are not giving anything up if you choose Bull queues over pg-boss.

Fundamentals of Pg-Boss

Pg-Boss is a queue system and it is very similar to other queue systems like Bull and AWS SQS.

Jobs in a Pg-Boss queue follow a state machine. All jobs start in the created state and then move to active when picked up for processing.

When the worker picks up the job and completes it successfully, the job moves to the completed state.

If the job fails, it moves to the failed state and can be moved to the retry state if retry options are configured. If the job is retried, it can move back into the active state.

If an active job takes too long to process, it moves into the expired state.

Any job that is in either the created or the active state can be cancelled with cancel, and the job will then move into the cancelled state.

All jobs that are completed, failed, cancelled or expired can move to the archive state.
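The state machine described above can be sketched as a small transition table. This table is built only from the description in this post and is not taken from the pg-boss source code. The helpers canMove and isValidHistory are made up for illustration.

```javascript
// Allowed transitions, as described in the paragraphs above (an illustration only).
const TRANSITIONS = {
  created: ['active', 'cancelled'],
  active: ['completed', 'failed', 'expired', 'cancelled'],
  failed: ['retry', 'archive'],
  retry: ['active'],
  completed: ['archive'],
  expired: ['archive'],
  cancelled: ['archive'],
  archive: [],
};

// True when a job may move directly from one state to the other.
function canMove(from, to) {
  return (TRANSITIONS[from] || []).includes(to);
}

// Walks a sequence of states and reports whether every hop is allowed.
function isValidHistory(states) {
  return states.every((state, i) => i === 0 || canMove(states[i - 1], state));
}

console.log(isValidHistory(['created', 'active', 'failed', 'retry', 'active', 'completed', 'archive']));
// prints true
console.log(isValidHistory(['created', 'completed']));
// prints false, because a job has to become active before it can complete
```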

Pg-Boss and NestJS

In this post, we will show a simple example of how to use pg-boss queue with NestJS application and how we can schedule a background job with Pg-Boss.

We will use @apricote/nest-pg-boss module in our application to integrate pg-boss.

I will not go over setting up a sample NestJS application. In this sample application, I am using Prisma ORM and Postgres database.

Let’s start with installing this module dependency in your NestJS Application.

npm install @apricote/nest-pg-boss

Once the dependency is installed, we will set up the PGBossModule module in our top main module.

Add the following module in your app.module.ts file


    PGBossModule.forRootAsync({
      application_name: 'default',
      useFactory: (config: ConfigService) => ({
        host: config.get('DB_HOST'),
        user: config.get('DB_USERNAME'),
        password: config.get("DB_PASSWORD"),
        database: config.get("DB_DATABASE"),
        schema: "public",
        max: config.get("DB_POOL_MAX"),
      }),
      inject: [ConfigService],
    }),

As you can see, a few environment variables are involved. Make sure you have those configured for your database.

Once you start the application, you will notice a few things about Pg-Boss. It will initialize the module and, if you have created any jobs, it will display the corresponding workers.

Interestingly, when the Pg-Boss module is initialized, it will also create these tables in your Postgres database:

  • archive
  • job
  • schedule
  • subscription
  • version

The Pg-Boss module uses these tables to persist the jobs and their states. It also allows us to schedule these jobs at a particular frequency. In short, it is a nice replacement for cron jobs.

In the next section, I will show how to create a job, schedule it and run it with a handler.

Scheduling Job with Pg-Boss

Let’s create our first job for the Pg-Boss queue. The @apricote/nest-pg-boss library offers a handy method, createJob.

I created a simple interface for the job data that I will pass to my job. This job will send a welcome email to the user after they sign up for my application.

import { createJob } from "@apricote/nest-pg-boss";

export interface UserJobData {
    email: string;
    firstName: string;
}

export const userCreatedJob = createJob<UserJobData>('user-signup-job');

And here we created a simple job, user-signup-job.

Now include this job in your module so it will be created on application start up. Basically, it will create a worker to process the job when the application triggers it.

@Module({
  imports: [DBAccessModule, CompanyApiModule, UsersModule, PGBossModule.forJobs([userCreatedJob])],
  controllers: [UsersController],
  providers: [UsersService, UserJobService],
  exports: [UsersService]
})
export class UsersApiModule {}

In my sign-up flow, the application saves the user information in the database. Once that is complete, it sends this job to the queue.


    async createUser(user: CreateUserDto, company: Company) {
        const hashedPassword = await bcrypt.hash(user.password, 12);

        const userToBeCreated = User.createNewUser({            
            firstName: user.firstName,
            lastName: user.lastName,
            email: user.email,
            companyId: company.id,
            password: hashedPassword,
        });
        const savedUser = await this.userRepository.save(userToBeCreated);

        await this.userCreatedJobService.send({email: savedUser.email, firstName: savedUser.firstName}, {});

        return savedUser;
    }

In the next section, we will see how to run this job with a handler.

Running Job with Handlers

In the previous createUser method, we sent a job to the worker for processing. I have a service, UserJobService, that acts as the worker to handle this job.


import { Injectable } from "@nestjs/common";
import { UserJobData, userCreatedJob } from "common/jobs";
import { Job } from "pg-boss";

@Injectable()
export class UserJobService {
    
    @userCreatedJob.Handle()
    async handleJob(job: Job<UserJobData>) {
        console.log(`sending an email to user with email ${job.data.email} and name ${job.data.firstName}`);
    }
}

The decorator @userCreatedJob.Handle() allows us to process this job once it has been added to the queue.

More about Job-Options

We showed a simple job in this post. But in a real production application, you might need to create complex jobs depending on your requirements. The good thing is that Pg-Boss offers a variety of options to configure these jobs.

  • priority – You can configure the priority for the job with a number. For higher priority jobs, use a bigger number.
  • retry options – There are a few options like retryLimit, retryDelay and retryBackoff available to retry the jobs.
  • expiration options – If you have long-running jobs, you probably need to set the expireInSeconds option. There are also expireInMinutes and expireInHours options.
  • retention options – These options set how long a job is retained in a particular state before it can be archived, for example retentionSeconds and retentionMinutes.
  • deferred job – The option startAfter allows you to specify when to start the job.
  • unique jobs – singletonKey or useSingletonQueue assign a unique key to the job, which ensures only one job with that key is processed.
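As a rough sketch, several of these options could be combined into one options object for the welcome email job. The function buildWelcomeEmailOptions and all the values are made up for illustration, and the option names are the ones listed above. I am assuming here that the second argument of send, where createUser passes an empty object, is where such options go, and that the pg-boss version in use still accepts these names.

```javascript
// Builds a hypothetical options object for the welcome email job of one user.
function buildWelcomeEmailOptions(email) {
  return {
    priority: 10,           // a bigger number means a higher priority
    retryLimit: 3,          // give up after three retries
    retryDelay: 30,         // wait 30 seconds between retries
    retryBackoff: true,     // grow the delay with each retry
    expireInSeconds: 120,   // treat the job as expired if it stays active longer than this
    startAfter: 60,         // defer the start by 60 seconds
    singletonKey: `welcome-email:${email}`, // at most one such job per address
  };
}

console.log(buildWelcomeEmailOptions('ada@example.com'));
```

Deriving singletonKey from the email address is one way to avoid sending the same welcome email twice if the sign-up request is retried.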

Conclusion

In this post, I shared the details of Pg-Boss queue library and how to use it for background job scheduling in a NestJS application.

Long Running Background Jobs With Trigger Dev and NestJS

In this post, I show how to create long-running background jobs using Trigger.dev and NestJS framework.

I have been playing around on how to write cron jobs that listen and do things on certain events. You can write a simple cron job that runs on a schedule to check something.

What if you want to trigger some scheduled work on some event or some webhook trigger? Can we have a long-running background job for that? Serverless frameworks like Vercel offer edge functions, but they are limited by timeout, so they don’t serve the purpose of long-running. Let’s dive deep into building a background job with trigger dev and NestJS.

Set up NestJS Application

Here are some of my previous posts where I have covered setting up a NestJS application.

For the demonstration in this post, I have a simple NestJS application running.

Configure Trigger.dev

Let’s set up trigger.dev for a long-running background job. We will need trigger.dev sdk in our NestJS project.

npm install @trigger.dev/sdk @trigger.dev/nestjs @nestjs/config

If you have not created an account on trigger.dev, you can create one now. They offer a hobby plan that does not charge anything. You can also self-host trigger.dev.

Once you have signed up, you will need to get the API key and the API URL.

Set the environment variables in your NestJS project:

TRIGGER_API_KEY=api_key_value_from_trigger_dev

TRIGGER_API_URL=https://cloud.trigger.dev

Create a Trigger Module in NestJS

In your main module, add a new module to configure trigger with the API key and URL.


TriggerDevModule.registerAsync({
    inject: [ConfigService],
    useFactory: (config: ConfigService) => ({
      id: 'betterjavacode',
      apiKey: config.getOrThrow('TRIGGER_API_KEY'),
      apiUrl: config.getOrThrow('TRIGGER_API_URL'),
      verbose: false,
      ioLogLocalEnabled: true,
    }),
  }),

Define a job

Once we have the trigger module set up, we will define a background job. To be able to set up a job, we will create a new controller job.controller.ts.

 


import { Controller, Get } from "@nestjs/common";
import { InjectTriggerDevClient } from "@trigger.dev/nestjs";
import { TriggerClient, eventTrigger } from "@trigger.dev/sdk";


@Controller()
export class JobController {
    constructor(
        @InjectTriggerDevClient() private readonly client: TriggerClient
    ) {
        this.client.defineJob({
            id: 'test-job-one',
            name: 'Sample Job One',
            version: '0.0.1',
            trigger: eventTrigger({
                name: 'test.event',
            }),
            run: async (payload, io, ctx) => {
                await io.logger.info('Hello world!', { payload });

                return {
                    message: `Hello world - ${payload.name}!`,
                };
            }
        });
    }

    @Get('/get-simple-job-trigger')
    getSimpleJobTrigger(): string {
        return `running trigger.dev with client-id ${this.client.id}`;
    }
}

Anyhow, this might be a lot to understand. So, I will go over this controller in steps.

With trigger.dev, one can define a job. The job contains a trigger and a task.

A trigger is either an event, a schedule, or a manual action that starts the job.

A task is what the job executes when triggered. This is the basic block of run defined within the job.

As shown in the code, I have defined a job with id test-job-one that gets triggered by an event trigger test.event. When triggered, it runs a task that returns Hello world - ${payload.name}.
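Besides a test run from the portal, an event-triggered job can also be started from code by sending the matching event. The sketch below assumes the injected TriggerClient exposes sendEvent({ name, payload }), so check that against the SDK version you use. The helper sendTestEvent and the stand-in fakeClient are made up so the example runs without a trigger.dev account.

```javascript
// `client` is meant to be the TriggerClient injected in the controller above.
// Assumption: the SDK version in use exposes client.sendEvent({ name, payload }).
async function sendTestEvent(client, name) {
  return client.sendEvent({
    name: 'test.event', // must match the eventTrigger name of the job
    payload: { name },  // read as payload.name inside run()
  });
}

// Stand-in client that only records events, used here instead of the real SDK.
const fakeClient = {
  sent: [],
  async sendEvent(event) {
    this.sent.push(event);
    return { id: `evt_${this.sent.length}`, ...event };
  },
};

sendTestEvent(fakeClient, 'Ada').then((event) => console.log(event));
```

In the real application, a controller method could call the same helper with this.client instead of the stand-in.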

Now, to be able to create this job and run it on a server, we have to create this job in the trigger.dev app. One way to do that is to configure it in package.json:

 


"trigger.dev": {
    "endpointId": "betterjavacode"
  }

Once you run the app, you will see the job created in trigger.dev portal

TriggerDev Job and NestJS Application

Running the app

While running the app, make sure to run both nestjs app and trigger.dev.

npm run start – to start the nestjs application

npx @trigger.dev/cli@latest dev --port 3001 – to start trigger dev app

How to trigger the job

trigger.dev provides a test run option. In the trigger.dev portal, go to your job, open the test section, and you will see an option to pass a payload to your job. Then press the Run button.

Test Run with Trigger.Dev

I like the flexibility of trigger.dev on defining the job. Another feature I love about trigger.dev is that they have various integrations available like OpenAI, Linear, Slack etc. If you need to develop some automated workflow, trigger.dev is a great choice to build that workflow.

Conclusion

In this post, I showed how to use trigger.dev to create a long-running background job with NestJS application. Hopefully, I will cover a real example of a workflow with trigger.dev in upcoming posts.