In many scenarios, you will have to handle asynchronous, CPU-intensive tasks. This is especially true when an application requests data through a REST API, because a REST endpoint should respond within a limited timeframe.
In this post, I will show how we can use queues to handle asynchronous tasks. We will be using Bull queues in a simple NestJS application.
A queue is a data structure that keeps its items in a linear order. In most systems, a queue holds a series of tasks. A publisher publishes a message or task to the queue, and a consumer picks up that message for further processing. This can happen asynchronously, so the CPU-intensive work runs outside the request that triggered it. Once a consumer consumes the message, it is no longer available to any other consumer.
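To make the publish and consume semantics concrete, here is a minimal in-memory sketch. It is only an illustration of the idea: the SimpleQueue class and the message names are made up for this example, and Bull itself stores jobs in Redis rather than in an array.

```typescript
// Minimal in-memory queue: illustrative only, not how Bull stores jobs.
class SimpleQueue<T> {
  private items: T[] = [];

  // Publisher side: append a message to the tail of the queue.
  publish(message: T): void {
    this.items.push(message);
  }

  // Consumer side: remove and return the head, or undefined when empty.
  consume(): T | undefined {
    return this.items.shift();
  }

  get size(): number {
    return this.items.length;
  }
}

const queue = new SimpleQueue<string>();
queue.publish("resize-image");
queue.publish("send-email");

const first = queue.consume();  // "resize-image"
const second = queue.consume(); // "send-email"
const third = queue.consume();  // undefined: both messages were already consumed
```

Because consume removes the message, a second consumer calling it afterwards never sees the same message again.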
Bull queues are based on Redis. In my previous post, I covered how to add a health check for Redis or a database in a NestJS application.
- Set up NestJS Application
- Bull Queues in NestJS Application
- Implementing a Processor to process queue data
- Integrating Bull Dashboard
- Add Bull Board Class
- Add a Controller
- Conclusion
Set up NestJS Application
As part of this demo, we will create a simple application. We will upload user data through a CSV file. A controller will accept this file and pass it to a queue. A processor will pick up the queued job and process the file to save the data from the CSV file into the database.
nest new bullqueuedemo
Once this command creates the bullqueuedemo folder, we will set up Prisma ORM to connect to the database. (Note: make sure you install the Prisma dependencies.)
npx prisma init
If you are using a Windows machine, you might run into an error when running prisma init. To avoid this error, set the following environment variables.
set PRISMA_CLI_QUERY_ENGINE_TYPE=binary
set PRISMA_CLIENT_ENGINE_TYPE=binary
Once the schema is created, we will update it with our database tables. For this demo, we are creating a single table, user.
// This is your Prisma schema file,
// learn more about it in the docs: https://pris.ly/d/prisma-schema

generator client {
  provider   = "prisma-client-js"
  engineType = "binary"
}

datasource db {
  provider = "mysql"
  url      = env("DATABASE_URL")
}

model User {
  id         Int     @default(autoincrement()) @id
  email      String  @unique
  first_name String
  last_name  String?
}
Now if we run npx prisma migrate dev, it will create the database table.
In summary, so far we have created a NestJS application and set up our database with Prisma ORM. Let’s look at the configuration we have to add for Bull Queue.
Bull Queues in NestJS Application
Install the @nestjs/bull dependency. This dependency encapsulates the bull library. We will assume that you have Redis installed and running. By default, Redis will run on port 6379.
We will add REDIS_HOST and REDIS_PORT as environment variables in our .env file. Install two dependencies for Bull as follows:
npm install @nestjs/bull
npm install @types/bull
Afterward, we will set up the connection with Redis by adding BullModule to our app module.
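import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';
import { ConfigModule, ConfigService } from '@nestjs/config';
// AppController, BullBoardController, UserService, PrismaService and
// FileUploadProcessor are imported from this project's own files.

@Module({
  imports: [
    // Read the Redis connection settings from the environment at startup.
    BullModule.forRootAsync({
      imports: [ConfigModule],
      useFactory: async (configService: ConfigService) => ({
        redis: {
          host: configService.get('REDIS_HOST'),
          port: Number(configService.get('REDIS_PORT')),
        },
      }),
      inject: [ConfigService],
    }),
    // Register the queue used for CSV uploads.
    BullModule.registerQueue({
      name: 'file-upload-queue',
    }),
  ],
  controllers: [AppController, BullBoardController],
  providers: [UserService, PrismaService, FileUploadProcessor],
})
export class AppModule {}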
We are injecting ConfigService. This service allows us to fetch environment variables at runtime. With this, we will be able to use BullModule across our application.
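Environment variables always arrive as strings, which is why the factory wraps the port in Number. The following dependency-free sketch shows the same conversion with a fallback and a sanity check. The redisFromEnv helper, the localhost default, and the sample values are assumptions for illustration and are not part of the demo application.

```typescript
interface RedisConnection {
  host: string;
  port: number;
}

// Hypothetical helper: builds the Redis settings from a plain env-like object.
function redisFromEnv(env: Record<string, string | undefined>): RedisConnection {
  // Fall back to the conventional local Redis address when a value is missing.
  const host = env.REDIS_HOST ?? "localhost";
  const port = Number(env.REDIS_PORT ?? "6379");
  // Reject values such as "abc" (NaN) or out-of-range ports early.
  if (!Number.isInteger(port) || port <= 0 || port > 65535) {
    throw new Error(`Invalid REDIS_PORT: ${env.REDIS_PORT}`);
  }
  return { host, port };
}

const fromDotEnv = redisFromEnv({ REDIS_HOST: "127.0.0.1", REDIS_PORT: "6380" });
const withDefaults = redisFromEnv({});
```

Failing fast on a malformed port tends to be easier to debug than a connection error that only appears once the first job is added.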
As you can see in the above code, BullModule.registerQueue registers our queue file-upload-queue. Let’s now add this queue to the controller where we will use it.
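import { InjectQueue } from '@nestjs/bull';
import { Controller, Get, Post, UploadedFile, UseInterceptors } from '@nestjs/common';
import { FileInterceptor } from '@nestjs/platform-express';
import { Queue } from 'bull';
import { diskStorage } from 'multer';
import { extname } from 'path';
// queuePool is exported by the Bull Board class shown later in this post.

@Controller('/api/bullqueuedemo')
export class AppController {
  constructor(@InjectQueue('file-upload-queue') private fileQueue: Queue) {
    // Register the queue so the dashboard can list it.
    queuePool.add(fileQueue);
  }

  @Post('/uploadFile')
  @UseInterceptors(FileInterceptor('csv', {
    storage: diskStorage({
      destination: './csv',
      // multer's option is "filename" (all lowercase).
      filename: (req, file, cb) => {
        const randomName = Array(32).fill(null).map(() => (Math.round(Math.random() * 16)).toString(16)).join('');
        cb(null, `${randomName}${extname(file.originalname)}`);
      },
    }),
  }))
  async uploadCsvFile(@UploadedFile() file): Promise<void> {
    const job = await this.fileQueue.add('csvfilejob', { file: file });
    console.log(`created job ${job.id}`);
  }

  @Get('/')
  async getHello(): Promise<string> {
    return 'Hello World';
  }
}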
Let’s go over this code slowly to understand what’s happening.
- In the constructor, we are injecting the queue with InjectQueue('file-upload-queue').
- Our POST API is for uploading a CSV file.
- We are using a FileInterceptor. This is a feature that NestJS offers to intercept the request and extract files from it. This interceptor takes two arguments: fieldName and options.
- The storage option allows us to store the uploaded file in a folder called csv in the current directory of execution. The uploaded file will be renamed with a randomly generated name and the extension .csv.
- In the method uploadCsvFile, we receive the uploaded file. This comes from our FileInterceptor. We use our injected queue to add a job with the name csvfilejob and data containing the file.
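The renaming step described above can be tried in isolation. This is a rough, dependency-free sketch: randomHex, extensionOf, and storedFileName are hypothetical helpers written for this example. The controller relies on extname from Node's path module for the extension, whereas the sketch extracts it by hand so that it needs no imports.

```typescript
// Returns `length` random hexadecimal characters.
function randomHex(length: number): string {
  return Array.from({ length }, () => Math.floor(Math.random() * 16).toString(16)).join("");
}

// Keeps the original extension (including the dot), or "" when there is none.
function extensionOf(fileName: string): string {
  const dot = fileName.lastIndexOf(".");
  return dot > 0 ? fileName.slice(dot) : "";
}

// Builds the name under which an uploaded file would be stored.
function storedFileName(originalName: string): string {
  return `${randomHex(32)}${extensionOf(originalName)}`;
}

const stored = storedFileName("users.csv"); // 32 hex characters followed by ".csv"
```

Using a random name avoids collisions when two users upload files with the same original name.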
Implementing a Processor to process Queue data
We have now added a job to our queue file-upload-queue. To process this job further, we will implement a processor, FileUploadProcessor.
We will annotate this consumer with @Processor('file-upload-queue').
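import { Process, Processor } from '@nestjs/bull';
import { Job } from 'bull';
// csv() is assumed to be the default export of the csvtojson package;
// import it in the form your tsconfig requires.
// UserService is imported from this project's own files.

@Processor('file-upload-queue')
export class FileUploadProcessor {
  constructor(private readonly userService: UserService) {}

  @Process('csvfilejob')
  async processFile(job: Job) {
    // The controller stored the uploaded file object in the job data.
    const file = job.data.file;
    const filePath = file.path;
    // Parse the CSV file into an array of row objects.
    const userData = await csv().fromFile(filePath);
    console.log(userData);
    for (const user of userData) {
      const input = {
        email: user.email,
        first_name: user.first_name,
        last_name: user.last_name,
      };
      const userCreated = await this.userService.createUser(input);
      console.log('User created -', userCreated.id);
    }
  }
}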
As the code shows, we consume the job from the queue and fetch the file from the job data. Note that we have to add @Process(jobName) to the method that will consume the job; here, the processFile method consumes the job. We convert the CSV data to JSON and then process each row to add a user to our database using UserService.
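The processor above trusts every row of the file. A possible refinement, sketched here under stated assumptions, is to validate each parsed row before handing it to the service. The toUserInput helper and the CsvRow and UserInput types are hypothetical and only mirror the columns of the User model; they are not part of the original demo.

```typescript
interface CsvRow {
  [column: string]: string | undefined;
}

interface UserInput {
  email: string;
  first_name: string;
  last_name: string | null;
}

// Hypothetical helper: returns null for rows that cannot become a User.
function toUserInput(row: CsvRow): UserInput | null {
  const email = row.email?.trim();
  const firstName = row.first_name?.trim();
  if (!email || !firstName) {
    return null; // email and first_name are required by the schema
  }
  const lastName = row.last_name?.trim();
  // last_name is optional in the schema, so an empty value becomes null.
  return { email, first_name: firstName, last_name: lastName ? lastName : null };
}

const rows: CsvRow[] = [
  { id: "1", email: "john.doe@gmail.com", first_name: "John", last_name: "Doe" },
  { id: "2", email: "", first_name: "Nobody" },
];
const inputs = rows.map(toUserInput);
```

Rows that map to null could be skipped or collected into an error report instead of failing the whole job.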
Once you create FileUploadProcessor, make sure to register it as a provider in your app module.
To show this, if I execute the API through Postman, I will see the following data in the console:
[Nest] 21264 - 04/22/2022, 4:57:19 PM LOG [NestFactory] Starting Nest application...
[Nest] 21264 - 04/22/2022, 4:57:20 PM LOG [InstanceLoader] DiscoveryModule dependencies initialized +43ms
[Nest] 21264 - 04/22/2022, 4:57:20 PM LOG [InstanceLoader] ConfigHostModule dependencies initialized +0ms
[Nest] 21264 - 04/22/2022, 4:57:20 PM LOG [InstanceLoader] BullModule dependencies initialized +4ms
[Nest] 21264 - 04/22/2022, 4:57:20 PM LOG [InstanceLoader] ConfigModule dependencies initialized +0ms
[Nest] 21264 - 04/22/2022, 4:57:20 PM LOG [InstanceLoader] BullModule dependencies initialized +12ms
[Nest] 21264 - 04/22/2022, 4:57:20 PM LOG [InstanceLoader] BullModule dependencies initialized +10ms
[Nest] 21264 - 04/22/2022, 4:57:20 PM LOG [InstanceLoader] AppModule dependencies initialized +1ms
[Nest] 21264 - 04/22/2022, 4:57:20 PM LOG [RoutesResolver] AppController {/api/bullqueuedemo}: +62ms
[Nest] 21264 - 04/22/2022, 4:57:20 PM LOG [RouterExplorer] Mapped {/api/bullqueuedemo/uploadFile, POST} route +3ms
[Nest] 21264 - 04/22/2022, 4:57:20 PM LOG [RouterExplorer] Mapped {/api/bullqueuedemo, GET} route +1ms
[Nest] 21264 - 04/22/2022, 4:57:20 PM LOG [NestApplication] Nest application successfully started +582ms
created job 2
[
{
id: '1',
email: 'john.doe@gmail.com',
first_name: 'John',
last_name: 'Doe'
},
{
id: '2',
email: 'jacob.drake@gmail.com',
first_name: 'Jacob',
last_name: 'Drake'
},
{
id: '3',
email: 'jos.butler@gmail.com',
first_name: 'Jos',
last_name: 'Butler'
}
]
User created - 1
User created - 2
User created - 3
Bull queues offer a number of features:
- Minimal CPU usage
- Robust design based on Redis
- Concurrency
- Retry
- Rate limiter
- Event monitoring
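As an example of the retry feature, Bull accepts job options such as attempts and backoff when a job is added. The sketch below declares a local RetryOptions interface that mirrors a small subset of those options so it compiles without the bull package, and runWithRetries is a hypothetical helper that only illustrates what "attempts" means. It is not how Bull implements retries internally.

```typescript
// Local mirror of a small subset of Bull's job options, so the sketch
// compiles without the bull package installed.
interface RetryOptions {
  attempts: number; // total number of tries, including the first one
  backoff: { type: "fixed" | "exponential"; delay: number }; // delay in ms
  removeOnComplete: boolean;
}

const csvJobOptions: RetryOptions = {
  attempts: 3,
  backoff: { type: "exponential", delay: 1000 },
  removeOnComplete: true,
};

// Illustration only: run a task until it succeeds or the attempts run out.
function runWithRetries<T>(task: (attempt: number) => T, attempts: number): T {
  let lastError: unknown;
  for (let attempt = 1; attempt <= attempts; attempt++) {
    try {
      return task(attempt);
    } catch (error) {
      lastError = error;
    }
  }
  throw lastError;
}

let calls = 0;
const result = runWithRetries((attempt) => {
  calls++;
  if (attempt < 3) throw new Error(`attempt ${attempt} failed`);
  return `succeeded on attempt ${attempt}`;
}, csvJobOptions.attempts);
```

In the controller, an options object like csvJobOptions would be passed as the third argument, for example this.fileQueue.add('csvfilejob', { file }, csvJobOptions).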
One question that constantly comes up is how to monitor these queues when jobs fail or are paused. A simple solution would be to use the Redis CLI, but it is not always available, especially in production environments. This is where a simple UI-based dashboard, Bull Dashboard, comes in.
Integrating Bull Dashboard
The great thing about Bull queues is that there is a UI available to monitor the queues. One can also add options that allow a user to retry jobs that are in a failed state. Let’s install two dependencies, @bull-board/express and @bull-board/api.
npm install @bull-board/express
This installs an Express-specific server adapter. If you are using Fastify with your NestJS application, you will need @bull-board/fastify.
npm install @bull-board/api
This installs the core server API that allows creating a Bull dashboard.
Add Bull Board Class
We will create a Bull Board queue class that will set a few properties for us. It will create a queuePool. This queuePool will get populated every time a new queue is injected. We will also need a method, getBullBoardQueues, to pull all the queues when loading the UI.
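import { Injectable } from '@nestjs/common';
import { BullAdapter } from '@bull-board/api/bullAdapter';
import { Queue } from 'bull';
// BaseAdapter is the queue adapter base class from @bull-board/api.

@Injectable()
export class BullBoardQueue {}

// Every injected queue is added to this pool (see the AppController constructor).
export const queuePool: Set<Queue> = new Set<Queue>();

// Wrap each pooled queue in a BullAdapter so Bull Board can display it.
export const getBullBoardQueues = (): BaseAdapter[] => {
  const bullBoardQueues = [...queuePool].reduce((acc: BaseAdapter[], val) => {
    acc.push(new BullAdapter(val));
    return acc;
  }, []);
  return bullBoardQueues;
};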
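The pool is a Set rather than an array so that a queue injected in several places is only listed once. A small stand-alone sketch of that behavior follows; NamedQueue and the email-queue entry are hypothetical stand-ins for real Bull queues, and the string labels stand in for adapter objects.

```typescript
// Stand-in for a Bull queue: only the name matters for this sketch.
interface NamedQueue {
  name: string;
}

const pool: Set<NamedQueue> = new Set<NamedQueue>();

const fileUploadQueue: NamedQueue = { name: "file-upload-queue" };
const emailQueue: NamedQueue = { name: "email-queue" }; // hypothetical second queue

pool.add(fileUploadQueue);
pool.add(fileUploadQueue); // same instance again: the Set ignores it
pool.add(emailQueue);

// Map each pooled queue to a dashboard entry (a string label here).
const adapterLabels = Array.from(pool).map((q) => `adapter:${q.name}`);
```

Note that a Set compares objects by reference, so deduplication only works when the same queue instance is added again, which is the case for queues injected by name.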
Add a Controller
There are a couple of ways we could have exposed the UI, but I prefer adding it through a controller, so my frontend can call the API. We create a BullBoardController that maps the incoming request, response, and next objects just like Express middleware. For the UI path, we create a server adapter for Express, which allows us to set a base path. We fetch all the queues injected so far using the getBullBoardQueues method described above. We then use the createBullBoard API to get the addQueue method. serverAdapter provides a router that we use to route incoming requests. Before we route a request, we need a little hack: replacing entryPointPath with /.
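import { All, Controller, Next, Request, Response } from '@nestjs/common';
import { createBullBoard } from '@bull-board/api';
import { ExpressAdapter } from '@bull-board/express';
import * as express from 'express';
// getBullBoardQueues and BaseAdapter come from the Bull Board class above.

@Controller('/queues/admin')
export class BullBoardController {
  @All('*')
  admin(
    @Request() req: express.Request,
    @Response() res: express.Response,
    @Next() next: express.NextFunction,
  ) {
    const serverAdapter = new ExpressAdapter();
    serverAdapter.setBasePath('/queues/admin');
    const queues = getBullBoardQueues();
    const router = serverAdapter.getRouter() as express.Express;
    const { addQueue } = createBullBoard({
      queues: [],
      serverAdapter,
    });
    // Register every queue collected in the pool with the dashboard.
    queues.forEach((queue: BaseAdapter) => {
      addQueue(queue);
    });
    // Strip the controller prefix so the board's router sees its own paths.
    const entryPointPath = '/queues/admin/';
    req.url = req.url.replace(entryPointPath, '/');
    router(req, res, next);
  }
}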
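The URL rewrite at the end of the controller is easy to check on its own. In this sketch, toBoardUrl is a hypothetical helper that applies the same replace call, and the sample URLs are made up for illustration.

```typescript
// Same rewrite as in the controller: strip the prefix the controller is mounted on.
function toBoardUrl(url: string, entryPointPath: string = "/queues/admin/"): string {
  return url.replace(entryPointPath, "/");
}

const apiUrl = toBoardUrl("/queues/admin/api/queues");          // "/api/queues"
const assetUrl = toBoardUrl("/queues/admin/static/js/main.js"); // "/static/js/main.js"
const bareUrl = toBoardUrl("/queues/admin");                    // unchanged: no trailing slash to match
```

The last case shows a limitation of the hack: a request without the trailing slash does not match entryPointPath and is passed to the router unchanged.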
Now if we run our application and open the UI, we will see the Bull Dashboard.
Finally, the nice thing about this UI is that it shows the jobs separated by status, such as waiting, active, completed, and failed.
Conclusion
Bull queues are a great feature to manage some resource-intensive tasks. In this post, we learned how we can add Bull queues in our NestJS application. We also easily integrated a Bull Board with our application to manage these queues. The code for this post is available here.
Do you want to read more posts about NestJS? Send me your feedback here.