Using Bull Queues in NestJS Application

In many scenarios, you will have to handle asynchronous, CPU-intensive tasks. This matters most when a client requests data through a REST API, because a REST endpoint should respond within a limited timeframe.

In this post, I will show how we can use queues to handle asynchronous tasks. We will be using Bull queues in a simple NestJS application.

Bull Queue in NestJs

A queue is a linear data structure that hands out items in the order they arrive. In most systems, a queue holds a series of tasks. A publisher adds a message or task to the queue, and a consumer picks up that message for further processing. This can happen asynchronously, which keeps CPU-intensive work off the request path. Once a consumer consumes the message, it is no longer available to any other consumer.
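To make the publisher and consumer roles concrete, here is a minimal in-memory sketch. SimpleQueue and its methods are hypothetical names used only for illustration; Bull itself keeps jobs in Redis rather than in an array.

```typescript
// Hypothetical in-memory queue, for illustration only; Bull stores jobs in Redis.
type SimpleJob<T> = { id: number; data: T };

class SimpleQueue<T> {
  private jobs: SimpleJob<T>[] = [];
  private nextId = 1;

  // Publisher side: append a job to the tail of the queue.
  add(data: T): SimpleJob<T> {
    const job: SimpleJob<T> = { id: this.nextId++, data };
    this.jobs.push(job);
    return job;
  }

  // Consumer side: remove and return the job at the head (first in, first out).
  // Once taken, the job is no longer visible to any other consumer.
  take(): SimpleJob<T> | undefined {
    return this.jobs.shift();
  }

  get size(): number {
    return this.jobs.length;
  }
}

const demoQueue = new SimpleQueue<string>();
demoQueue.add('users-a.csv');
demoQueue.add('users-b.csv');

const firstJob = demoQueue.take();  // the job added first
const secondJob = demoQueue.take(); // the job added second
const thirdJob = demoQueue.take();  // nothing left to consume
```

The consumer always receives the oldest job, and a job that has been taken cannot be taken again.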

Bull queues are based on Redis. In my previous post, I covered how to add a health check for Redis or a database in a NestJS application.

  • Set up NestJS Application
  • Bull Queues in NestJS Application
  • Implementing a Processor to process queue data
  • Integrating Bull Dashboard
    • Add Bull Board Class
    • Add a Controller
  • Conclusion

Set up NestJS Application

As part of this demo, we will create a simple application. We will upload user data through a CSV file. A controller will accept this file and pass it to a queue. A processor will pick up the queued job and process the file to save the data from the CSV file into the database.

nest new bullqueuedemo

Once this command creates the bullqueuedemo folder, we will set up Prisma ORM to connect to the database. (Note – make sure you install the Prisma dependencies.)

npx prisma init

If you are using a Windows machine, you might run into an error when running prisma init. To avoid this error, set the following environment variables.

set PRISMA_CLI_QUERY_ENGINE_TYPE=binary

set PRISMA_CLIENT_ENGINE_TYPE=binary

Once the schema is created, we will update it with our database tables. For this demo, we are creating a single table user.

// This is your Prisma schema file,
// learn more about it in the docs: https://pris.ly/d/prisma-schema

generator client {
  provider = "prisma-client-js"
  engineType = "binary"
}

datasource db {
  provider = "mysql"
  url      = env("DATABASE_URL")
}


model User {
  id    Int     @default(autoincrement()) @id
  email String  @unique
  first_name  String
  last_name   String?
}

Now if we run npx prisma migrate dev, it will create the database table.

In summary, so far we have created a NestJS application and set up our database with Prisma ORM. Let’s look at the configuration we have to add for Bull Queue.

Bull Queues in NestJS Application

Install the @nestjs/bull dependency. This dependency wraps the bull library. We will assume that you have Redis installed and running. By default, Redis runs on port 6379.

We will add REDIS_HOST and REDIS_PORT as environment variables in our .env file. Install two dependencies for Bull as follows:

npm install @nestjs/bull

npm install @types/bull
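Environment variables are always read as strings, so the port has to be converted to a number before it is used for the Redis connection. A minimal sketch of that conversion follows; the helper name redisOptionsFromEnv and the localhost fallback are assumptions made for illustration and are not part of the original project.

```typescript
// Hypothetical helper: turns raw environment values into Redis connection options.
interface RedisConnectionOptions {
  host: string;
  port: number;
}

function redisOptionsFromEnv(
  env: Record<string, string | undefined>,
): RedisConnectionOptions {
  const host = env['REDIS_HOST'] ?? 'localhost';   // assumed fallback host
  const port = Number(env['REDIS_PORT'] ?? 6379);  // 6379 is the default Redis port
  if (!Number.isInteger(port) || port <= 0 || port > 65535) {
    throw new Error(`Invalid REDIS_PORT: ${env['REDIS_PORT']}`);
  }
  return { host, port };
}

// Values as they might appear in a .env file (illustrative only).
const configured = redisOptionsFromEnv({ REDIS_HOST: 'redis.internal', REDIS_PORT: '6380' });
// With nothing set, the helper falls back to localhost:6379.
const fallback = redisOptionsFromEnv({});
```

In a Node.js process, the same function could be called with process.env. Failing fast on a malformed port tends to be easier to debug than a connection error later on.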

Afterward, we will set up the connection with Redis by adding BullModule to our app module.

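import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';
import { ConfigModule, ConfigService } from '@nestjs/config';
// AppController, BullBoardController, UserService, PrismaService and
// FileUploadProcessor are imported from this project's own files.

@Module({
  imports: [
    BullModule.forRootAsync({
      imports: [ConfigModule],
      // Read the Redis connection settings from environment variables at startup.
      useFactory: async (configService: ConfigService) => ({
        redis: {
          host: configService.get('REDIS_HOST'),
          port: Number(configService.get('REDIS_PORT')),
        },
      }),
      inject: [ConfigService],
    }),
    // Register the queue that the controller and the processor share.
    BullModule.registerQueue({
      name: 'file-upload-queue',
    }),
  ],
  controllers: [AppController, BullBoardController],
  providers: [UserService, PrismaService, FileUploadProcessor],
})
export class AppModule {}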

We are injecting ConfigService. This service allows us to fetch environment variables at runtime. With this, we will be able to use BullModule across our application.

As you can see in the above code, we call BullModule.registerQueue, which registers our queue file-upload-queue. Let’s now add this queue to the controller where we will use it.

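import { Controller, Get, Post, UploadedFile, UseInterceptors } from '@nestjs/common';
import { FileInterceptor } from '@nestjs/platform-express';
import { InjectQueue } from '@nestjs/bull';
import { Queue } from 'bull';
import { diskStorage } from 'multer';
import { extname } from 'path';
// queuePool comes from the Bull Board class file; the path here is an assumption.
import { queuePool } from './bull-board-queue';

@Controller('/api/bullqueuedemo')
export class AppController {
  constructor(@InjectQueue('file-upload-queue') private fileQueue: Queue) {
    // Register the queue so the dashboard can list it.
    queuePool.add(fileQueue);
  }

  @Post('/uploadFile')
  @UseInterceptors(FileInterceptor('csv', {
    storage: diskStorage({
      destination: './csv',
      // multer expects this option to be named "filename" (all lowercase).
      filename: (req, file, cb) => {
        // Build a random hexadecimal name and keep the original extension.
        const randomName = Array(32).fill(null).map(() => Math.round(Math.random() * 16).toString(16)).join('');
        cb(null, `${randomName}${extname(file.originalname)}`);
      }
    })
  }))
  async uploadCsvFile(@UploadedFile() file: Express.Multer.File): Promise<void> {
    const job = await this.fileQueue.add('csvfilejob', { file });
    console.log(`created job ${job.id}`);
  }

  @Get('/')
  async getHello(): Promise<string> {
    return 'Hello World';
  }
}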

Let’s go over this code slowly to understand what’s happening.

  • In the constructor, we inject the queue with @InjectQueue('file-upload-queue').
  • Our POST API is for uploading a CSV file.
  • We are using a FileInterceptor. This is a feature that NestJS offers to intercept the request and extract files from it. This interceptor takes two arguments: fieldName and options.
  • The storage option allows us to store the uploaded file in a folder called csv in the current directory of execution. The uploaded file is renamed with a randomly generated name and keeps its original extension, .csv in our case.
  • In the method uploadCsvFile, we receive the uploaded file. This comes from our FileInterceptor. We use our injected queue to add a job with the name csvfilejob and data containing the file.
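The renaming step can be looked at in isolation. The sketch below is one way to build such a name; randomFileName is a hypothetical helper, and it uses Math.floor so that every generated character is exactly one hexadecimal digit.

```typescript
import { extname } from 'path';

// Hypothetical helper: builds a 32-character hexadecimal name and
// keeps the extension of the uploaded file.
function randomFileName(originalName: string): string {
  const randomName = Array.from({ length: 32 }, () =>
    // Math.floor(Math.random() * 16) is an integer from 0 to 15, i.e. one hex digit.
    Math.floor(Math.random() * 16).toString(16),
  ).join('');
  return `${randomName}${extname(originalName)}`;
}

const storedName = randomFileName('users.csv');
console.log(storedName); // 32 hex characters followed by ".csv"
```

Math.random is not cryptographically secure. That is usually acceptable for avoiding file name collisions in a demo, but it should not be relied on where names must be unguessable.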

Implementing a Processor to process Queue data

So far, we have added a job to our queue file-upload-queue. To process this job further, we will implement a processor, FileUploadProcessor.

We will annotate this consumer with @Processor('file-upload-queue').

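import { Process, Processor } from '@nestjs/bull';
import { Job } from 'bull';
// csv() is assumed to come from the csvtojson package, which provides fromFile();
// the import form may need adjusting for your tsconfig.
import * as csv from 'csvtojson';
// UserService is imported from this project's own files.

@Processor('file-upload-queue')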
export class FileUploadProcessor {

    constructor(private readonly userService: UserService){}

    @Process('csvfilejob')
    async processFile(job: Job) {
        const file = job.data.file;
        const filePath = file.path;
        const userData = await csv().fromFile(filePath);

        console.log(userData);

        for(const user of userData) {
            const input = {
                email: user.email,
                first_name: user.first_name,
                last_name: user.last_name,
            };
            const userCreated = await this.userService.createUser(input);
            console.log('User created -', userCreated.id );
        }

    }
    
}

As shown above, we consume the job from the queue and read the file from the job data. Note that we have to add @Process(jobName) to the method that consumes the job; here, that is the processFile method. We convert the CSV data to JSON and then process each row to add a user to our database using UserService.
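The row-to-user mapping inside the loop can also be written as a small standalone function, which makes it easy to validate a row before it reaches the database. This is only a sketch: toUserInput, CsvRow and CreateUserInput are hypothetical names, and the assumption that every CSV column arrives as a string is based on the console output of the demo.

```typescript
// One parsed CSV row; every column is assumed to arrive as a string.
type CsvRow = Record<string, string | undefined>;

// Fields accepted when creating a user, matching the User model's columns.
interface CreateUserInput {
  email: string;
  first_name: string;
  last_name?: string;
}

// Hypothetical helper: validates a row and maps it to the user fields.
// The CSV "id" column is dropped because the database generates the id.
function toUserInput(row: CsvRow): CreateUserInput {
  const email = row['email']?.trim();
  const firstName = row['first_name']?.trim();
  if (!email || !firstName) {
    throw new Error('Row is missing a required email or first_name');
  }
  const input: CreateUserInput = { email, first_name: firstName };
  const lastName = row['last_name']?.trim();
  if (lastName) {
    input.last_name = lastName; // last_name is optional in the schema
  }
  return input;
}

const sampleInput = toUserInput({
  id: '1',
  email: 'john.doe@gmail.com',
  first_name: 'John',
  last_name: 'Doe',
});
```

Throwing on an invalid row makes the job fail, which in turn makes the problem visible when monitoring the queue.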

Once you create FileUploadProcessor, make sure to register that as a provider in your app module.

To demonstrate this, if I call the API through Postman, I see the following output in the console:

[Nest] 21264  - 04/22/2022, 4:57:19 PM     LOG [NestFactory] Starting Nest application...
[Nest] 21264  - 04/22/2022, 4:57:20 PM     LOG [InstanceLoader] DiscoveryModule dependencies initialized +43ms
[Nest] 21264  - 04/22/2022, 4:57:20 PM     LOG [InstanceLoader] ConfigHostModule dependencies initialized +0ms
[Nest] 21264  - 04/22/2022, 4:57:20 PM     LOG [InstanceLoader] BullModule dependencies initialized +4ms
[Nest] 21264  - 04/22/2022, 4:57:20 PM     LOG [InstanceLoader] ConfigModule dependencies initialized +0ms
[Nest] 21264  - 04/22/2022, 4:57:20 PM     LOG [InstanceLoader] BullModule dependencies initialized +12ms
[Nest] 21264  - 04/22/2022, 4:57:20 PM     LOG [InstanceLoader] BullModule dependencies initialized +10ms
[Nest] 21264  - 04/22/2022, 4:57:20 PM     LOG [InstanceLoader] AppModule dependencies initialized +1ms
[Nest] 21264  - 04/22/2022, 4:57:20 PM     LOG [RoutesResolver] AppController {/api/bullqueuedemo}: +62ms
[Nest] 21264  - 04/22/2022, 4:57:20 PM     LOG [RouterExplorer] Mapped {/api/bullqueuedemo/uploadFile, POST} route +3ms
[Nest] 21264  - 04/22/2022, 4:57:20 PM     LOG [RouterExplorer] Mapped {/api/bullqueuedemo, GET} route +1ms
[Nest] 21264  - 04/22/2022, 4:57:20 PM     LOG [NestApplication] Nest application successfully started +582ms
created job 2
[
  {
    id: '1',
    email: 'john.doe@gmail.com',
    first_name: 'John',
    last_name: 'Doe'
  },
  {
    id: '2',
    email: 'jacob.drake@gmail.com',
    first_name: 'Jacob',
    last_name: 'Drake'
  },
  {
    id: '3',
    email: 'jos.butler@gmail.com',
    first_name: 'Jos',
    last_name: 'Butler'
  }
]
User created - 1
User created - 2
User created - 3

Bull queues offer a number of features:

  • Minimal CPU usage
  • Robust design based on redis
  • Concurrency
  • Retry
  • Rate limiter
  • Event monitoring
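Retry and rate limiting are switched on through options rather than through extra code. The sketch below shows how such settings are commonly shaped. The local interfaces mirror only a small subset of Bull's job and queue options so that the snippet compiles on its own, and the specific values are assumptions chosen for illustration.

```typescript
// Local types mirroring a small subset of Bull's options,
// declared here so the snippet compiles without bull installed.
interface RetryJobOptions {
  attempts: number; // total number of tries before the job is marked as failed
  backoff: { type: 'fixed' | 'exponential'; delay: number }; // delay in milliseconds
}

interface RateLimiterOptions {
  max: number;      // maximum number of jobs processed...
  duration: number; // ...per this many milliseconds
}

// Illustrative values only: retry up to 3 times with exponential backoff.
const csvJobOptions: RetryJobOptions = {
  attempts: 3,
  backoff: { type: 'exponential', delay: 1000 },
};

// Illustrative values only: at most 10 jobs per second.
const uploadLimiter: RateLimiterOptions = {
  max: 10,
  duration: 1000,
};
```

In an application, an object like csvJobOptions would be passed as the third argument to this.fileQueue.add('csvfilejob', { file }, csvJobOptions), and uploadLimiter would go under the limiter key of BullModule.registerQueue.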

One question that constantly comes up is how to monitor these queues when jobs fail or are paused. A simple solution would be to use the Redis CLI, but the Redis CLI is not always available, especially in production environments. This is where a simple UI-based dashboard, Bull Dashboard, comes in.

Integrating Bull Dashboard

The great thing about Bull queues is that there is a UI available to monitor them. It also offers options that allow a user to retry jobs that are in a failed state. Let’s install two dependencies: @bull-board/express and @bull-board/api.

npm install @bull-board/express – This installs the Express-specific server adapter. If you are using Fastify with your NestJS application, you will need @bull-board/fastify.

npm install @bull-board/api – This installs the core server API that allows us to create a Bull dashboard.

Add Bull Board Class

We will create a Bull Board queue class that will set a few properties for us. It will create a queuePool. This queuePool gets populated every time a new queue is injected. We will also need a method, getBullBoardQueues, to pull all the queues when loading the UI.


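import { Injectable } from '@nestjs/common';
import { Queue } from 'bull';
import { BullAdapter } from '@bull-board/api/bullAdapter';
// The import path for BaseAdapter is an assumption; it may differ between bull-board versions.
import { BaseAdapter } from '@bull-board/api/dist/src/queueAdapters/base';

@Injectable()
export class BullBoardQueue { }

// Every injected queue is added here so the dashboard can list it.
export const queuePool: Set<Queue> = new Set<Queue>();

export const getBullBoardQueues = (): BaseAdapter[] => {
    // Wrap each registered queue in an adapter that bull-board understands.
    const bullBoardQueues = [...queuePool].reduce((acc: BaseAdapter[], val) => {
        acc.push(new BullAdapter(val))
        return acc
    }, []);

    return bullBoardQueues
}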
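The pattern here is a shared registry: queues are collected in a Set as they are injected and are only wrapped in adapters when the dashboard asks for them. The sketch below shows that pattern with stand-in classes. FakeQueue, FakeAdapter, registerQueue and getAdapters are hypothetical names used so that the example runs without bull or bull-board installed.

```typescript
// Stand-ins for bull's Queue and bull-board's BullAdapter (illustration only).
class FakeQueue {
  constructor(public readonly name: string) {}
}

class FakeAdapter {
  constructor(public readonly queue: FakeQueue) {}
}

// A Set ignores duplicates, so registering the same queue twice is harmless.
const fakeQueuePool = new Set<FakeQueue>();

function registerQueue(queue: FakeQueue): void {
  fakeQueuePool.add(queue);
}

// Wrap every registered queue in an adapter at the moment the UI needs them.
function getAdapters(): FakeAdapter[] {
  return Array.from(fakeQueuePool).map((queue) => new FakeAdapter(queue));
}

const uploads = new FakeQueue('file-upload-queue');
registerQueue(uploads);
registerQueue(uploads); // same instance again: the Set keeps a single entry

const adapters = getAdapters();
console.log(adapters.map((adapter) => adapter.queue.name));
```

Using map instead of reduce yields the same array of adapters and is a little easier to read.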

Add a Controller

There are a couple of ways we could have exposed the UI, but I prefer adding it through a controller, so my frontend can call the API. We create a BullBoardController that receives the incoming request, response, and next function, just like Express middleware. For the UI path, we use the server adapter for Express, which allows us to set a base path. We fetch all the queues injected so far using the getBullBoardQueues method described above. We then use createBullBoard to get the addQueue method. The serverAdapter provides a router that we use to route incoming requests. Before we route a request, we need a small hack: replacing entryPointPath with /.


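import { All, Controller, Next, Request, Response } from '@nestjs/common';
import * as express from 'express';
import { createBullBoard } from '@bull-board/api';
import { ExpressAdapter } from '@bull-board/express';
// The two import paths below are assumptions; adjust them to your bull-board version and file layout.
import { BaseAdapter } from '@bull-board/api/dist/src/queueAdapters/base';
import { getBullBoardQueues } from './bull-board-queue';

@Controller('/queues/admin')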
export class BullBoardController{
    
    @All('*')
    admin(
        @Request() req: express.Request,
        @Response() res: express.Response,
        @Next() next: express.NextFunction,
    ) {
        const serverAdapter = new ExpressAdapter();
        serverAdapter.setBasePath('/queues/admin');
        const queues = getBullBoardQueues();
        const router = serverAdapter.getRouter() as express.Express;
        const { addQueue } = createBullBoard({
            queues: [],
            serverAdapter,
        });
        queues.forEach((queue: BaseAdapter) => {
            addQueue(queue);
        });
        const entryPointPath = '/queues/admin/';
        req.url = req.url.replace(entryPointPath, '/');
        router(req, res, next);
    }
}
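The path rewrite at the end of the controller is easier to follow in isolation. In this sketch, toRouterUrl is a hypothetical helper that performs the same replace call as the controller, so the effect on a few example URLs can be checked directly.

```typescript
// Hypothetical helper mirroring the req.url rewrite in the controller:
// it strips the dashboard's entry point so the adapter's router sees
// paths relative to its own root.
function toRouterUrl(url: string, entryPoint: string): string {
  // With a string pattern, replace() only touches the first occurrence.
  return url.replace(entryPoint, '/');
}

const dashboardEntryPoint = '/queues/admin/';

const apiUrl = toRouterUrl('/queues/admin/api/queues', dashboardEntryPoint);
const rootUrl = toRouterUrl('/queues/admin/', dashboardEntryPoint);
const noSlashUrl = toRouterUrl('/queues/admin', dashboardEntryPoint);

console.log(apiUrl);     // "/api/queues"
console.log(rootUrl);    // "/"
console.log(noSlashUrl); // "/queues/admin" (no trailing slash, so nothing is replaced)
```

The last case shows a limit of the hack: a request without the trailing slash is passed to the router unchanged.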

Now if we run our application and access the UI, we will see a nice UI for Bull Dashboard as below:
Bull Queues NestJS Application - Dashboard

Finally, the nice thing about this UI is that it groups jobs by their status, so you can look at each group separately.

Conclusion

Bull queues are a great way to manage resource-intensive tasks. In this post, we learned how to add Bull queues to our NestJS application. We also integrated Bull Board with our application to manage these queues. The code for this post is available here.

Do you want to read more posts about NestJS? Send me your feedback here.