Monthly Archives: April 2022

Using Bull Queues in NestJS Application

In many scenarios, you will have to handle asynchronous, CPU-intensive tasks. This is especially true when an application requests data through a REST API, because a REST endpoint should respond within a limited timeframe.

In this post, I will show how we can use queues to handle asynchronous tasks. We will be using Bull queues in a simple NestJS application.

Bull Queue in NestJs

A queue is a data structure that processes items in a linear order. In most systems, a queue holds a series of tasks. A publisher publishes a message or task to the queue, and a consumer picks up that message for further processing. This happens asynchronously, so CPU-intensive work no longer blocks the caller. Once a consumer consumes the message, the message is not available to any other consumer.
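To make the publish/consume idea concrete, here is a minimal in-memory sketch. It is not how Bull works internally (Bull stores jobs in Redis); the class name SimpleQueue and the message names are made up for illustration. It only shows that a message taken by one consumer is no longer visible to another.

```typescript
// A minimal in-memory queue: every published message is delivered to exactly one consumer.
class SimpleQueue<T> {
  private messages: T[] = [];

  // Publisher side: append the message to the tail of the queue.
  publish(message: T): void {
    this.messages.push(message);
  }

  // Consumer side: remove the oldest message and pass it to the handler.
  // Returns false when the queue is empty.
  consume(handler: (message: T) => void): boolean {
    if (this.messages.length === 0) {
      return false;
    }
    const next = this.messages.shift() as T;
    handler(next);
    return true;
  }

  get size(): number {
    return this.messages.length;
  }
}

const demoQueue = new SimpleQueue<string>();
demoQueue.publish('resize-image');
demoQueue.publish('send-email');

// Two consumers each take one message; neither sees the other's message.
const seenByConsumerA: string[] = [];
const seenByConsumerB: string[] = [];
demoQueue.consume((message) => { seenByConsumerA.push(message); });
demoQueue.consume((message) => { seenByConsumerB.push(message); });
```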

Bull queues are based on Redis. In my previous post, I covered how to add a health check for Redis or a database in a NestJS application.

  • Set up NestJS Application
  • Bull Queues in NestJS Application
  • Implementing a Processor to process queue data
  • Integrating Bull Dashboard
    • Add Bull Board Class
    • Add a Controller
  • Conclusion

Set up NestJS Application

As part of this demo, we will create a simple application. We will upload user data through a CSV file. A controller will accept this file and pass it to a queue. A processor will pick up the queued job and process the file to save the data from the CSV file into the database.

nest new bullqueuedemo

Once this command creates the folder for bullqueuedemo, we will set up Prisma ORM to connect to the database. (Note: make sure you install the Prisma dependencies.)

npx prisma init

If you are using a Windows machine, you might run into an error when running prisma init. To avoid this error, set the following environment variables.

set PRISMA_CLI_QUERY_ENGINE_TYPE=binary

set PRISMA_CLIENT_ENGINE_TYPE=binary

Once the schema is created, we will update it with our database tables. For this demo, we are creating a single table user.

// This is your Prisma schema file,
// learn more about it in the docs: https://pris.ly/d/prisma-schema

generator client {
  provider = "prisma-client-js"
  engineType = "binary"
}

datasource db {
  provider = "mysql"
  url      = env("DATABASE_URL")
}


model User {
  id    Int     @default(autoincrement()) @id
  email String  @unique
  first_name  String
  last_name   String?
}

Now if we run npx prisma migrate dev, it will create a database table.

In summary, so far we have created a NestJS application and set up our database with Prisma ORM. Let’s look at the configuration we have to add for Bull Queue.

Bull Queues in NestJS Application

Install the @nestjs/bull dependency, which wraps the bull library. We will assume that you have Redis installed and running. By default, Redis runs on port 6379.

We will add REDIS_HOST and REDIS_PORT as environment variables in our .env file. Install two dependencies for Bull as follows:

npm install @nestjs/bull

npm install @types/bull

Afterward, we will set up the connection with Redis by adding BullModule to our app module.

@Module({
  imports: [
    BullModule.forRootAsync({
      imports: [ConfigModule],
      useFactory: async (configService: ConfigService) => ({
        redis: {
          host: configService.get('REDIS_HOST'),
          port: Number(configService.get('REDIS_PORT')),
        },
      }),
      inject: [ConfigService]
    }),
    BullModule.registerQueue({
      name: 'file-upload-queue'
    }), 
  ],
  controllers: [AppController, BullBoardController],
  providers: [UserService, PrismaService, FileUploadProcessor,],
})
export class AppModule {}

We are injecting ConfigService. This service allows us to fetch environment variables at runtime. With this, we will be able to use BullModule across our application.
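One thing to watch in the factory above: if REDIS_PORT is missing, Number(undefined) evaluates to NaN. The following is a small sketch of a helper that falls back to Redis's default port instead. The function name resolveRedisConnection and the 'localhost' default are my own assumptions, not part of @nestjs/bull or the original code.

```typescript
interface RedisConnection {
  host: string;
  port: number;
}

// Hypothetical helper: resolves Redis settings from a plain key/value map
// (for example process.env), applying defaults when a variable is missing.
function resolveRedisConnection(env: Record<string, string | undefined>): RedisConnection {
  // 'localhost' is an assumed default for local development.
  const host = env['REDIS_HOST'] ?? 'localhost';
  const rawPort = env['REDIS_PORT'];
  // 6379 is the default Redis port mentioned above.
  const port = rawPort === undefined || rawPort === '' ? 6379 : Number(rawPort);
  if (!Number.isInteger(port) || port < 1 || port > 65535) {
    throw new Error(`Invalid REDIS_PORT: ${rawPort}`);
  }
  return { host, port };
}

const localDefaults = resolveRedisConnection({});
const customSettings = resolveRedisConnection({ REDIS_HOST: 'redis', REDIS_PORT: '6380' });
```

The returned object has the same host/port shape as the redis block passed to BullModule.forRootAsync, so it could be returned from useFactory directly.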

As you can see in the above code, BullModule.registerQueue registers our queue file-upload-queue. Let’s now add this queue to the controller where we will use it.

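import { Controller, Get, Post, UploadedFile, UseInterceptors } from '@nestjs/common';
import { FileInterceptor } from '@nestjs/platform-express';
import { InjectQueue } from '@nestjs/bull';
import { Queue } from 'bull';
import { diskStorage } from 'multer';
import { extname } from 'path';
// queuePool is defined in the Bull Board class shown later; this import path is assumed.
import { queuePool } from './bull-board-queue';

@Controller('/api/bullqueuedemo')
export class AppController {
  constructor(@InjectQueue('file-upload-queue') private fileQueue: Queue) {
    // Register the injected queue so the Bull dashboard can list it.
    queuePool.add(fileQueue);
  }

  @Post('/uploadFile')
  @UseInterceptors(FileInterceptor("csv", {
    storage: diskStorage({
      destination: './csv',
      // multer spells this option `filename` (all lowercase).
      filename: (req, file, cb) => {
        // Build a random hex name and keep the original file extension.
        const randomName = Array(32).fill(null).map(() => (Math.round(Math.random() * 16)).toString(16)).join('');
        cb(null, `${randomName}${extname(file.originalname)}`);
      }
    })
  }))
  async uploadCsvFile(@UploadedFile() file): Promise<void> {
    // The job carries the file metadata (including its path on disk), not the file contents.
    const job = await this.fileQueue.add('csvfilejob', {file: file});
    console.log(`created job ${ job.id}`);
  }

  @Get('/')
  async getHello(): Promise<string> {
    return "Hello World";
  }
}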

Let’s go over this code slowly to understand what’s happening.

  • In the constructor, we are injecting the queue with @InjectQueue('file-upload-queue') and adding it to queuePool.
  • Our POST API is for uploading a CSV file.
  • We are using a FileInterceptor. This is a feature that NestJS offers to intercept the request and extract files from the request. This interceptor takes two arguments: fieldName and options.
  • The storage option allows us to store the uploaded file in a folder called csv in the current directory of execution. The uploaded file will be renamed with a randomly generated name, keeping the extension of the original file (.csv in our case).
  • In the method uploadCsvFile, we receive the uploaded file. This comes from our FileInterceptor. We use our injected queue to add a job with a name csvfilejob and data containing the file.
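The renaming step described above can be sketched as a standalone function. This is a simplified stand-in, not the code from the controller: the name randomFileName is hypothetical, it uses Math.floor so each step contributes exactly one hex character, and it extracts the extension by hand rather than calling path.extname.

```typescript
// Hypothetical helper: builds a random hex name and keeps the original extension.
function randomFileName(originalName: string, length = 32): string {
  let name = '';
  for (let i = 0; i < length; i++) {
    // Math.floor keeps each value in 0..15, so every iteration adds one hex character.
    name += Math.floor(Math.random() * 16).toString(16);
  }
  // Simplified stand-in for path.extname: everything from the last dot onward,
  // ignoring a leading dot such as in ".gitignore".
  const dot = originalName.lastIndexOf('.');
  const extension = dot > 0 ? originalName.slice(dot) : '';
  return `${name}${extension}`;
}

const storedName = randomFileName('users.csv');
const storedNameWithoutExtension = randomFileName('users');
```

Random names avoid collisions when two uploads share the same original file name, at the cost of losing that original name on disk.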

Implementing a Processor to process Queue data

So far, we have added a job to our queue file-upload-queue. To process this job further, we will implement a processor FileUploadProcessor.

We will annotate this consumer with @Processor('file-upload-queue').

@Processor('file-upload-queue')
export class FileUploadProcessor {

    constructor(private readonly userService: UserService){}

    @Process('csvfilejob')
    async processFile(job: Job) {
        const file = job.data.file;
        const filePath = file.path;
        const userData = await csv().fromFile(filePath);

        console.log(userData);

        for(const user of userData) {
            const input = {
                email: user.email,
                first_name: user.first_name,
                last_name: user.last_name,
            };
            const userCreated = await this.userService.createUser(input);
            console.log('User created -', userCreated.id );
        }

    }
    
}

In short, the processor consumes the job from the queue and fetches the file from the job data. Note that we have to add @Process(jobName) to the method that will consume the job; here, the processFile method does so. We convert the CSV data to JSON (the csv() helper presumably comes from a CSV-to-JSON package such as csvtojson) and then process each row to add a user to our database using UserService.
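The processor above trusts every CSV row as-is. As a hedged sketch, a small mapping step could skip rows that lack required columns before calling UserService. The names CsvUserRow, CreateUserInput, and toUserInput are hypothetical, and the validation rules are assumptions based on the User model (email and first_name required, last_name optional).

```typescript
interface CsvUserRow {
  email?: string;
  first_name?: string;
  last_name?: string;
}

interface CreateUserInput {
  email: string;
  first_name: string;
  last_name: string | null;
}

// Hypothetical helper: maps a parsed CSV row to the input for createUser,
// returning null when a required column is missing or clearly malformed.
function toUserInput(row: CsvUserRow): CreateUserInput | null {
  const email = (row.email ?? '').trim();
  const firstName = (row.first_name ?? '').trim();
  const lastName = (row.last_name ?? '').trim();
  if (email === '' || email.indexOf('@') === -1 || firstName === '') {
    return null;
  }
  // last_name is optional in the schema, so an empty value becomes null.
  return { email, first_name: firstName, last_name: lastName === '' ? null : lastName };
}

const parsedRows: CsvUserRow[] = [
  { email: 'john.doe@gmail.com', first_name: 'John', last_name: 'Doe' },
  { email: 'not-an-email', first_name: 'Broken' },
  { email: ' jos.butler@gmail.com ', first_name: 'Jos', last_name: '' },
];

// Keep only the rows that passed validation.
const validInputs = parsedRows
  .map(toUserInput)
  .filter((input): input is CreateUserInput => input !== null);
```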

Once you create FileUploadProcessor, make sure to register that as a provider in your app module.

To show this, if I execute the API through Postman, I will see the following data in the console:

[Nest] 21264  - 04/22/2022, 4:57:19 PM     LOG [NestFactory] Starting Nest application...
[Nest] 21264  - 04/22/2022, 4:57:20 PM     LOG [InstanceLoader] DiscoveryModule dependencies initialized +43ms
[Nest] 21264  - 04/22/2022, 4:57:20 PM     LOG [InstanceLoader] ConfigHostModule dependencies initialized +0ms
[Nest] 21264  - 04/22/2022, 4:57:20 PM     LOG [InstanceLoader] BullModule dependencies initialized +4ms
[Nest] 21264  - 04/22/2022, 4:57:20 PM     LOG [InstanceLoader] ConfigModule dependencies initialized +0ms
[Nest] 21264  - 04/22/2022, 4:57:20 PM     LOG [InstanceLoader] BullModule dependencies initialized +12ms
[Nest] 21264  - 04/22/2022, 4:57:20 PM     LOG [InstanceLoader] BullModule dependencies initialized +10ms
[Nest] 21264  - 04/22/2022, 4:57:20 PM     LOG [InstanceLoader] AppModule dependencies initialized +1ms
[Nest] 21264  - 04/22/2022, 4:57:20 PM     LOG [RoutesResolver] AppController {/api/bullqueuedemo}: +62ms
[Nest] 21264  - 04/22/2022, 4:57:20 PM     LOG [RouterExplorer] Mapped {/api/bullqueuedemo/uploadFile, POST} route +3ms
[Nest] 21264  - 04/22/2022, 4:57:20 PM     LOG [RouterExplorer] Mapped {/api/bullqueuedemo, GET} route +1ms
[Nest] 21264  - 04/22/2022, 4:57:20 PM     LOG [NestApplication] Nest application successfully started +582ms
created job 2
[
  {
    id: '1',
    email: 'john.doe@gmail.com',
    first_name: 'John',
    last_name: 'Doe'
  },
  {
    id: '2',
    email: 'jacob.drake@gmail.com',
    first_name: 'Jacob',
    last_name: 'Drake'
  },
  {
    id: '3',
    email: 'jos.butler@gmail.com',
    first_name: 'Jos',
    last_name: 'Butler'
  }
]
User created - 1
User created - 2
User created - 3

Bull queues offer a number of features:

  • Minimal CPU usage
  • Robust design based on redis
  • Concurrency
  • Retry
  • Rate limiter
  • Event monitoring
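As a rough illustration of the retry feature, the options below could be passed when adding a job. The interface is a local mirror of a small subset of Bull's job options, declared here only so the snippet stands alone; the concrete values (3 attempts, 1 second delay) are assumptions for this demo, not recommendations from the original post.

```typescript
// Local mirror of a subset of Bull's job options, declared here so the snippet is self-contained.
interface RetryJobOptions {
  attempts: number;                                             // total tries before the job is marked failed
  backoff: { type: 'fixed' | 'exponential'; delay: number };   // delay in milliseconds between retries
  removeOnComplete: boolean;                                    // drop the job from Redis once it succeeds
}

// Assumed values for the CSV upload job in this demo.
const csvJobOptions: RetryJobOptions = {
  attempts: 3,
  backoff: { type: 'exponential', delay: 1000 },
  removeOnComplete: true,
};

// In the controller, the options would go in as the third argument:
// await this.fileQueue.add('csvfilejob', { file }, csvJobOptions);
```

With options like these, a job that fails because the database is briefly unavailable is retried instead of landing in the failed state straight away.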

One question that constantly comes up is how to monitor these queues when jobs fail or are paused. A simple solution would be to use the Redis CLI, but the Redis CLI is not always available, especially in production environments. This is where a simple UI-based dashboard, Bull Dashboard, comes in.

Integrating Bull Dashboard

The great thing about Bull queues is that there is a UI available to monitor the queues. One can also add options that allow a user to retry jobs that are in a failed state. Let’s install two dependencies: @bull-board/express and @bull-board/api.

npm install @bull-board/express – This installs an Express-specific server adapter. If you are using Fastify with your NestJS application, you will need @bull-board/fastify.

npm install @bull-board/api – This installs the core server API that allows us to create a Bull dashboard.

Add Bull Board Class

We will create a Bull Board queue class that will set a few properties for us. It will create a queuePool. This queuePool will get populated every time a new queue is injected. We will also need a method getBullBoardQueues to pull all the queues when loading the UI.


@Injectable()
export class BullBoardQueue { }

export const queuePool: Set<Queue> = new Set<Queue>();

export const getBullBoardQueues = (): BaseAdapter[] => {
    const bullBoardQueues = [...queuePool].reduce((acc: BaseAdapter[], val) => {
        acc.push(new BullAdapter(val))
        return acc
    }, []);

    return bullBoardQueues
}

Add a controller

There are a couple of ways we could have exposed the UI, but I prefer adding it through a controller, so my frontend can call the API. We create a BullBoardController to map our incoming request, response, and next like Express middleware. For the UI path, we use a server adapter for Express, which allows us to set a base path. We fetch all the queues injected so far using the getBullBoardQueues method described above. We then use createBullBoard to get the addQueue method. serverAdapter provides a router that we use to route incoming requests. Before we route the request, we need a little hack: replacing entryPointPath with /.


@Controller('/queues/admin')
export class BullBoardController{
    
    @All('*')
    admin(
        @Request() req: express.Request,
        @Response() res: express.Response,
        @Next() next: express.NextFunction,
    ) {
        const serverAdapter = new ExpressAdapter();
        serverAdapter.setBasePath('/queues/admin');
        const queues = getBullBoardQueues();
        const router = serverAdapter.getRouter() as express.Express;
        const { addQueue } = createBullBoard({
            queues: [],
            serverAdapter,
        });
        queues.forEach((queue: BaseAdapter) => {
            addQueue(queue);
        });
        const entryPointPath = '/queues/admin/';
        req.url = req.url.replace(entryPointPath, '/');
        router(req, res, next);
    }
}

Now if we run our application and access the UI, we will see the Bull Dashboard as below:
[Image: Bull Queues NestJS Application - Dashboard]

The nice thing about this UI is that it groups jobs by state (such as waiting, active, completed, and failed), so you can inspect each group separately.

Conclusion

Bull queues are a great feature to manage some resource-intensive tasks. In this post, we learned how we can add Bull queues in our NestJS application. We also easily integrated a Bull Board with our application to manage these queues. The code for this post is available here.

Do you want to read more posts about NestJS? Send me your feedback here.

Adding Health checks in NestJS Application

The health check endpoint provides the details of how our application is doing. In this post, we will show how to add health checks to your NestJS application. If you want to learn about enabling CORS in your NestJS application, you can read more about it here.

Why add Health Checks?

Once you build and deploy your application, you need an easy way to know whether it is running smoothly, without having to call application business logic. Health checks offer a way to check if the database is running smoothly, your storage disk is fine, and your application service is executing as intended.

The most important reason you need health checks is so you can continuously monitor your application. A metrics collection service (like Micrometer) can keep validating the application and verify that there are no software or hardware failures. The moment a software or hardware failure occurs, it can trigger a notification for manual or automatic intervention to get the application back on track. This improves the reliability of the application.

Health checks in NestJS Application

In the NestJS framework, the Terminus library offers a way to integrate readiness/liveness health checks. A service or infrastructure component will continuously hit a GET endpoint and take action based on the response.

Let’s get started. We will add the terminus library to our NestJS application.

npm install @nestjs/terminus

The Terminus integration offers graceful shutdown as well as Kubernetes readiness/liveness checks for HTTP applications. A liveness check tells if the container is up and running. A readiness check tells if the container is ready to accept incoming requests.

We will also set up a number of checks for database, memory, disk, and redis in this post to show how health checks work.

How to set up a Health check in NestJS?

Once we have added the @nestjs/terminus package, we can create a health check endpoint and include some predefined indicators. These indicators include an HTTP check, a database connectivity check, and memory and disk checks.

Depending on which ORM you are using, @nestjs/terminus offers built-in health indicators for ORMs such as TypeORM or Sequelize.

The health check will provide us with a combination of indicators. This set of indicators provides us with information to indicate how our application is doing.

DiskHealthIndicator

Let’s start with how the server’s hard disk is doing.

DiskHealthIndicator contains the check for disk storage of the current machine.

Once we add DiskHealthIndicator in our health controller, we will check for storage as follows:

this.disk.checkStorage('diskStorage', { thresholdPercent: 0.5, path: 'C:\\'});
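To clarify what thresholdPercent: 0.5 means, here is a sketch of the comparison as I understand it: the check passes while the used fraction of the disk stays at or below the threshold. The function isDiskHealthy is hypothetical and is not the Terminus implementation.

```typescript
// Hypothetical helper illustrating the threshold comparison:
// healthy while the used fraction of the disk is at or below thresholdPercent.
function isDiskHealthy(totalBytes: number, freeBytes: number, thresholdPercent: number): boolean {
  if (totalBytes <= 0) {
    throw new Error('totalBytes must be positive');
  }
  const usedFraction = (totalBytes - freeBytes) / totalBytes;
  return usedFraction <= thresholdPercent;
}

const GIGABYTE = 1024 * 1024 * 1024;
// 40% of the disk used with a 50% threshold: healthy.
const mostlyFreeDisk = isDiskHealthy(100 * GIGABYTE, 60 * GIGABYTE, 0.5);
// 60% of the disk used with a 50% threshold: unhealthy.
const mostlyFullDisk = isDiskHealthy(100 * GIGABYTE, 40 * GIGABYTE, 0.5);
```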

HttpHealthIndicator

HttpHealthIndicator checks whether an HTTP endpoint, here our own application, is up and running. It requires the @nestjs/axios package, so we will add it to our project.

npm install @nestjs/axios

Additionally, we will use the pingCheck method to verify that we are able to connect to the application.

this.http.pingCheck('Basic check', 'http://localhost:3000');

MemoryHealthIndicator

MemoryHealthIndicator provides the details of the memory of the machine on which the application is running.

this.memory.checkHeap('memory_heap', 300*1024*1024);

this.memory.checkRSS('memory_rss',300*1024*1024);
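The second argument to both memory checks is a limit in bytes, so 300*1024*1024 means 300 MB. A tiny sketch of that arithmetic follows; the helper names megabytes and isWithinLimit are made up for illustration.

```typescript
const BYTES_PER_MEGABYTE = 1024 * 1024;

// Hypothetical helper: converts megabytes to bytes so the limit reads more clearly.
function megabytes(value: number): number {
  return value * BYTES_PER_MEGABYTE;
}

// Hypothetical helper: memory usage is acceptable while it does not exceed the limit.
function isWithinLimit(usedBytes: number, limitBytes: number): boolean {
  return usedBytes <= limitBytes;
}

const memoryLimit = megabytes(300); // 314572800 bytes
const smallProcess = isWithinLimit(megabytes(120), memoryLimit);
const largeProcess = isWithinLimit(megabytes(450), memoryLimit);
```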

Database Health Check

Assuming your application is using a database, you will need a database health check. @nestjs/terminus provides database health checks through ORM packages like TypeORM, Sequelize, or Mongoose. As part of this demo, we will create a custom database health check since we use Prisma ORM.

NestJS Application

Let’s create a NestJS application with the Nest CLI.

nest new healthcheckdemo

As previously stated, we will use Prisma ORM.

npm install prisma --save-dev

This will install the Prisma CLI. Now if we run npx prisma init, it will create a bare-bones schema.prisma file where we will define our database model schema.

In this application, I am using a simple schema where a user can sign up to create posts. I am also using the MySQL database. This schema will look like the below:

// This is your Prisma schema file,
// learn more about it in the docs: https://pris.ly/d/prisma-schema

generator client {
  provider = "prisma-client-js"
  engineType = "binary"
}

datasource db {
  provider = "mysql"
  url      = env("DATABASE_URL")
}

model User {
  id    Int     @default(autoincrement()) @id
  email String  @unique
  name  String?
  posts Post[]
}

model Post {
  id        Int      @default(autoincrement()) @id
  title     String
  content   String?
  published Boolean? @default(false)
  author    User?    @relation(fields: [authorId], references: [id])
  authorId  Int?
}

By default, Prisma will create .env file if it was not there before. It will also add a default variable for DATABASE_URL.

If we run npx prisma migrate dev, it will create those database tables in our DB.

Next, let’s create an app module in our sample application for healthcheckdemo.

import { Module } from '@nestjs/common';
import { PrismaClient } from '@prisma/client';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { UserService } from './user.service';
import { HealthModule } from './health/health.module';
import { HttpModule } from '@nestjs/axios';
import { PrismaService } from './prisma.service';

@Module({
  imports: [HealthModule, HttpModule],
  controllers: [AppController],
  providers: [AppService, UserService, PrismaClient, PrismaService,],
})
export class AppModule {}

We will also create HealthModule that will serve the purpose for the HealthController.

import { Module } from '@nestjs/common';
import { TerminusModule } from '@nestjs/terminus';
import { PrismaService } from 'src/prisma.service';

import { HealthController } from './health.controller';
import { PrismaOrmHealthIndicator } from './prismaorm.health';

@Module({
  imports: [
    TerminusModule,   
  ],
  controllers: [HealthController],
  providers: [ PrismaOrmHealthIndicator, PrismaService]
})
export class HealthModule {}

In this HealthModule, you will notice there is a PrismaOrmHealthIndicator. Before we dive into PrismaOrmHealthIndicator, we need to generate Prisma Client.

npm install @prisma/client will generate the Prisma client for your database model. This will expose CRUD operations for your database model, making it easier for developers to focus on business logic rather than how to access data from a database.

We will abstract away Prisma Client APIs to create database queries in a separate service PrismaService. This service will also instantiate Prisma Client.


import { INestApplication, Injectable, OnModuleInit } from '@nestjs/common';
import { PrismaClient } from '@prisma/client';

@Injectable()
export class PrismaService extends PrismaClient implements OnModuleInit {
  async onModuleInit() {
    await this.$connect();
  }

  async enableShutdownHooks(app: INestApplication) {
    this.$on('beforeExit', async () => {
      await app.close();
    });
  }
}

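There is a documented issue where Prisma interferes with NestJS’s enableShutdownHooks. To work around it, PrismaService listens for Prisma’s beforeExit event and closes the application itself. We will call this enableShutdownHooks method so that the application closes cleanly.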

Health Controller

For a health check, we will need a health controller. We talked about the health module in the previous section. There are two important pieces left before we show how the health check will look.

Let’s create a health controller.

nest g controller health

This will generate a controller for us.

import { Controller, Get, Inject } from '@nestjs/common';
import { DiskHealthIndicator, HealthCheck, HealthCheckService, HttpHealthIndicator, MemoryHealthIndicator, MicroserviceHealthIndicator } from '@nestjs/terminus';
import { PrismaOrmHealthIndicator } from './prismaorm.health';


@Controller('health')
export class HealthController {
    constructor(
        private health: HealthCheckService,
        private http: HttpHealthIndicator,
        @Inject(PrismaOrmHealthIndicator)
        private db: PrismaOrmHealthIndicator,
        private disk: DiskHealthIndicator,
        private memory: MemoryHealthIndicator,        
    ) {}

  @Get()
  @HealthCheck()
  check() {
    return this.health.check([
      () => this.http.pingCheck('basic check', 'http://localhost:3000'),
      () => this.disk.checkStorage('diskStorage', { thresholdPercent: 0.5, path: 'C:\\'}),
      () => this.db.pingCheck('healthcheckdemo'),
      () => this.memory.checkHeap('memory_heap', 300*1024*1024),
      () => this.memory.checkRSS('memory_rss', 300*1024*1024),
      
      // Mongoose for MongoDB check
      // Redis check
    ]);
  }
}

In the health controller, we have a GET endpoint /health that reports how our application, the machine’s memory, the storage disk, and the database are doing. NestJS does not offer an ORM health indicator for Prisma, so I am writing a custom indicator to find out the database health.

This custom Prisma health indicator looks like this:


import { Injectable, InternalServerErrorException } from "@nestjs/common";
import { HealthIndicator, HealthIndicatorResult } from "@nestjs/terminus";
import { PrismaService } from "src/prisma.service";


@Injectable()
export class PrismaOrmHealthIndicator extends HealthIndicator {
    constructor(private readonly prismaService: PrismaService) {
        super();
    }

    async pingCheck(databaseName: string): Promise<HealthIndicatorResult> {
        try {
            await this.prismaService.$queryRaw`SELECT 1`;
            return this.getStatus(databaseName, true);
        } catch (e) {
            throw new InternalServerErrorException('Prisma check failed', e);
        }
    }
}

We are extending the abstract class HealthIndicator and implementing a method called pingCheck in this PrismaOrmHealthIndicator class. This method uses PrismaService to run a SELECT 1 query against the connected database; the databaseName argument is only used as the key in the health result. If the query is successful, the database status is reported as up.

Also, note that this class PrismaOrmHealthIndicator is injectable and we are injecting that in our HealthController.

Now if we start the application and execute the endpoint, we will get the response as below:


{
  "status": "ok",
  "info": {
    "basic check": {
      "status": "up"
    },
    "diskStorage": {
      "status": "up"
    },
    "healthcheckdemo": {
      "status": "up"
    },
    "memory_heap": {
      "status": "up"
    },
    "memory_rss": {
      "status": "up"
    }
  },
  "error": {},
  "details": {
    "basic check": {
      "status": "up"
    },
    "diskStorage": {
      "status": "up"
    },
    "healthcheckdemo": {
      "status": "up"
    },
    "memory_heap": {
      "status": "up"
    },
    "memory_rss": {
      "status": "up"
    }
  }
}

As you can see, everything seems to be running fine. healthcheckdemo is the database name that I am using in MySQL.
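The shape of this response can be understood with a small sketch: indicators that are up go under info, indicators that are down go under error, and details contains all of them. The function summarize below is my own illustration of that grouping and is not code from @nestjs/terminus.

```typescript
type IndicatorStatus = 'up' | 'down';
type IndicatorResult = Record<string, { status: IndicatorStatus }>;

interface HealthResponse {
  status: 'ok' | 'error';
  info: IndicatorResult;
  error: IndicatorResult;
  details: IndicatorResult;
}

// Hypothetical aggregation: groups indicator results the way the response above is laid out.
function summarize(results: IndicatorResult[]): HealthResponse {
  const info: IndicatorResult = {};
  const error: IndicatorResult = {};
  for (const result of results) {
    for (const key of Object.keys(result)) {
      const entry = result[key];
      // Healthy indicators are listed under info, failing ones under error.
      if (entry.status === 'up') {
        info[key] = entry;
      } else {
        error[key] = entry;
      }
    }
  }
  // details always lists every indicator, regardless of its status.
  const details: IndicatorResult = { ...info, ...error };
  const status = Object.keys(error).length === 0 ? 'ok' : 'error';
  return { status, info, error, details };
}

const allHealthy = summarize([
  { 'basic check': { status: 'up' } },
  { healthcheckdemo: { status: 'up' } },
]);
const databaseDown = summarize([
  { 'basic check': { status: 'up' } },
  { healthcheckdemo: { status: 'down' } },
]);
```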

Similarly, we can also add redis and mongoose as part of health checks in our NestJS application.

Conclusion

In this post, we created a simple NestJS application to demonstrate how to add health checks. The code for this post is available here.

If you have any feedback on this post or my book Simplifying Spring Security, I would love to hear it.