Monthly Archives: April 2024

Background Job Scheduling with Pg-Boss and NestJS

In a distributed system, we often need to automate workflows, and there are several ways to do it. One approach I have used frequently is background jobs. If you noticed, my last post was also about background jobs, using trigger.dev. In this post, I will share another approach: background job scheduling with Pg-Boss and NestJS. pg-boss is a job queue built on the Postgres database. How cool is that? And if it is not clear, I love Postgres.

What is Pg-Boss?

pg-boss is a job queue library for Node.js applications that uses Postgres for persistence. It relies on the reliability of Postgres for storing and scheduling jobs. Over the last 10 years, Postgres has gained features, such as SKIP LOCKED, that make it well suited to job queuing, and pg-boss builds on them. In short, pg-boss saves you from running one more piece of infrastructure when building a distributed application.

Pg-Boss vs Bull Queues

You might have noticed that I have written extensively about Bull Queues. Bull is another job queue system, based on Redis. The question that comes up immediately is: why use pg-boss? Bull queues are great and easy to use, and they have huge community support. So what is so special about pg-boss?

I get it.

There is no right or wrong answer when choosing pg-boss or bull queues. It all depends on the system you are building.

Then why choose pg-boss?

If you are already using Postgres as a database for your application, then it is a no-brainer to use pg-boss instead of Bull queues, which are based on Redis. Redis is additional infrastructure overhead if you are not using it for anything else, such as caching.

pg-boss is based on Postgres, and Postgres is really good at transactional support and locking, so it handles the queue-based mechanism really well.

Nevertheless, choose the queueing mechanism that fits your needs. You are not making a bad trade-off if you choose Bull queues over pg-boss.

Fundamentals of Pg-Boss

Pg-Boss is a queue system and it is very similar to other queue systems like Bull and AWS SQS.

Jobs in a Pg-Boss queue follow a state machine. All jobs start in the created state and move to active when picked up for processing.

When a worker picks up a job and completes it successfully, the job moves to the completed state.

If the job fails, it moves to the failed state and can be moved to the retry state if retry options are configured. If the job is retried, it moves back into the active state.

If an active job takes too long to process, it moves into the expired state.

You can cancel any job that is in the created or active state with cancel, and the job then moves into the cancelled state.

All jobs that are completed, failed, cancelled or expired can move to the archive state.
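
The lifecycle described above can be sketched as a small transition table. This is an illustrative model built only from the states named in this post, not pg-boss's internal implementation; the names JobState, transitions, canMove and moveTo are made up for the example.

```typescript
// States named in the post. This models the description above,
// not pg-boss's actual internals.
type JobState =
  | "created"
  | "active"
  | "completed"
  | "failed"
  | "retry"
  | "expired"
  | "cancelled"
  | "archive";

// For each state, the states a job may move to next.
const transitions: Record<JobState, readonly JobState[]> = {
  created: ["active", "cancelled"],
  active: ["completed", "failed", "expired", "cancelled"],
  failed: ["retry", "archive"],
  retry: ["active"],
  completed: ["archive"],
  expired: ["archive"],
  cancelled: ["archive"],
  archive: [],
};

// True if a job in state `from` is allowed to move to state `to`.
function canMove(from: JobState, to: JobState): boolean {
  return transitions[from].includes(to);
}

// Returns the new state, or throws if the move is not allowed.
function moveTo(from: JobState, to: JobState): JobState {
  if (!canMove(from, to)) {
    throw new Error(`Invalid transition: ${from} -> ${to}`);
  }
  return to;
}
```

With this table, moveTo("created", "active") succeeds, while moveTo("completed", "active") throws, because a completed job can only be archived.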

Pg-Boss and NestJS

In this post, we will show a simple example of how to use a pg-boss queue with a NestJS application and how we can schedule a background job with Pg-Boss.

We will use the @apricote/nest-pg-boss module in our application to integrate pg-boss.

I will not go over setting up a sample NestJS application. In this sample application, I am using Prisma ORM and Postgres database.

Let’s start with installing this module dependency in your NestJS Application.

npm install @apricote/nest-pg-boss

Once the dependency is installed, we will set up PGBossModule in our root module.

Add the following to the imports of your app.module.ts file:


    PGBossModule.forRootAsync({
      application_name: 'default',
      useFactory: (config: ConfigService) => ({
        host: config.get('DB_HOST'),
        user: config.get('DB_USERNAME'),
        password: config.get("DB_PASSWORD"),
        database: config.get("DB_DATABASE"),
        schema: "public",
        max: config.get("DB_POOL_MAX"),
      }),
      inject: [ConfigService],
    }),

As you can see, a few environment variables are involved, so make sure you have them configured for your database.
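
Environment variables are always strings, so a value like DB_POOL_MAX has to be parsed before it is used as a pool size. Here is a minimal sketch of reading and validating the variables used above. The helper names (requireEnv, buildConnectionOptions), the QueueConnectionOptions type and the fallback pool size are assumptions made for this example, not part of pg-boss or the NestJS module.

```typescript
// Shape of the connection options used in the factory above.
interface QueueConnectionOptions {
  host: string;
  user: string;
  password: string;
  database: string;
  schema: string;
  max: number;
}

type Env = Record<string, string | undefined>;

// Reads a required variable and fails fast with a clear message.
function requireEnv(env: Env, key: string): string {
  const value = env[key];
  if (value === undefined || value.trim() === "") {
    throw new Error(`Missing environment variable: ${key}`);
  }
  return value;
}

// Fallback pool size: an assumption for this sketch, not a documented default.
const DEFAULT_POOL_MAX = 10;

function buildConnectionOptions(env: Env): QueueConnectionOptions {
  // Environment values are strings, so convert the pool size to a number.
  const rawMax = env["DB_POOL_MAX"];
  const max = rawMax === undefined ? DEFAULT_POOL_MAX : Number(rawMax);
  if (!Number.isInteger(max) || max <= 0) {
    throw new Error(`DB_POOL_MAX must be a positive integer, got "${rawMax}"`);
  }
  return {
    host: requireEnv(env, "DB_HOST"),
    user: requireEnv(env, "DB_USERNAME"),
    password: requireEnv(env, "DB_PASSWORD"),
    database: requireEnv(env, "DB_DATABASE"),
    schema: "public",
    max,
  };
}
```

In a Node process this could be called as buildConnectionOptions(process.env), so a missing or malformed variable fails at startup instead of on the first job.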

Once you start the application, you will notice a few things about Pg-Boss. It initializes the module, and if you have created any jobs, it displays the corresponding workers.

Interestingly, when the Pg-Boss module is initialized, it also creates these tables in your Postgres database:

  • archive
  • job
  • schedule
  • subscription
  • version

The Pg-Boss module uses these tables to persist the jobs and their states. It also allows us to schedule jobs at a particular frequency. In short, it is a nice replacement for cron jobs.

In the next section, I will show how to create a job, schedule it and run it with a handler.

Scheduling Job with Pg-Boss

Let’s create our first job for the Pg-Boss queue. The @apricote/nest-pg-boss library offers a handy method, createJob.

I created a simple interface for the job data that I will pass to my job. This job will send a welcome email to the user after they sign up for my application.

import { createJob } from "@apricote/nest-pg-boss";

export interface UserJobData {
    email: string;
    firstName: string;
}

export const userCreatedJob = createJob<UserJobData>('user-signup-job');

And here we created a simple job, user-signup-job.

Now include this job in your module so it is registered on application startup. Basically, it creates a worker that processes the job whenever the application sends it.

@Module({
  imports: [DBAccessModule, CompanyApiModule, UsersModule, PGBossModule.forJobs([userCreatedJob])],
  controllers: [UsersController],
  providers: [UsersService, UserJobService],
  exports: [UsersService]
})
export class UsersApiModule {}

In my sign-up flow, the application saves the user information in the database. Once that is complete, it sends this job to the queue.


    async createUser(user: CreateUserDto, company: Company) {
        const hashedPassword = await bcrypt.hash(user.password, 12);

        const userToBeCreated = User.createNewUser({            
            firstName: user.firstName,
            lastName: user.lastName,
            email: user.email,
            companyId: company.id,
            password: hashedPassword,
        });
        const savedUser = await this.userRepository.save(userToBeCreated);

        await this.userCreatedJobService.send({email: savedUser.email, firstName: savedUser.firstName}, {});

        return savedUser;
    }

In the next section, we will see how to run this job with a handler.

Running Job with Handlers

In the createUser method above, we sent a job to the queue for processing. I have a service, UserJobService, that acts as the worker handling this job.


import { Injectable } from "@nestjs/common";
import { UserJobData, userCreatedJob } from "common/jobs";
import { Job } from "pg-boss";

@Injectable()
export class UserJobService {
    
    @userCreatedJob.Handle()
    async handleJob(job: Job<UserJobData>) {
        console.log(`sending an email to user with email ${job.data.email} and name ${job.data.firstName}`);
    }
}

The decorator @userCreatedJob.Handle() registers this method as the handler for jobs added to the queue.
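
To make the split between sending a job and handling it concrete, here is a minimal in-memory sketch. InMemoryQueue and its send and drain methods are hypothetical stand-ins written for this example; they are not part of pg-boss or @apricote/nest-pg-boss, which persist jobs in Postgres and run handlers in workers.

```typescript
interface UserJobData {
  email: string;
  firstName: string;
}

interface QueuedJob<T> {
  id: number;
  data: T;
}

// Hypothetical in-memory stand-in for a job queue (not the pg-boss API).
class InMemoryQueue<T> {
  private nextId = 1;
  private readonly pending: QueuedJob<T>[] = [];

  // Producer side: store the job and return immediately with its id.
  send(data: T): number {
    const job: QueuedJob<T> = { id: this.nextId++, data };
    this.pending.push(job);
    return job.id;
  }

  // Worker side: run the handler for each pending job, oldest first,
  // and return the number of jobs handled. Retries are out of scope here:
  // if the handler throws, drain rejects and that job is dropped.
  async drain(handler: (job: QueuedJob<T>) => Promise<void>): Promise<number> {
    let handled = 0;
    let job = this.pending.shift();
    while (job !== undefined) {
      await handler(job);
      handled++;
      job = this.pending.shift();
    }
    return handled;
  }
}

// Builds the same message that the handler above logs.
function welcomeMessage(data: UserJobData): string {
  return `sending an email to user with email ${data.email} and name ${data.firstName}`;
}
```

The point of the sketch is that send only records the job and returns, while the handler runs later and separately. That is why the sign-up request does not have to wait for the email to go out.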

More about Job-Options

We showed a simple job in this post. But in a real production application, you might need to create complex jobs depending on your requirements. The good thing is that Pg-Boss offers a variety of options to configure these jobs.

  • priority – You can configure the priority of a job with a number. For higher-priority jobs, use a bigger number.
  • retry options – Options like retryLimit, retryDelay and retryBackoff control how failed jobs are retried.
  • expiration options – If you have long-running jobs, you probably need to set the expireInSeconds option. There are also expireInMinutes and expireInHours options.
  • retention options – These options control how long a job is retained in a particular state before it can be archived: retentionSeconds, retentionMinutes.
  • deferred job – The startAfter option specifies when to start the job.
  • unique jobs – singletonKey or useSingletonQueue sets a unique key for the job, which ensures only one job with that key is processed.
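
As a rough illustration, the options listed above could be combined for the welcome-email job like this. The JobOptionsSketch type is declared locally and mirrors only the option names from the list; in a real project the options type would come from pg-boss. The helper welcomeEmailOptions and all the values are placeholders chosen for the example.

```typescript
// Local type that mirrors only the option names listed above. In a real
// project the options type would come from pg-boss, not be redeclared.
interface JobOptionsSketch {
  priority?: number;
  retryLimit?: number;
  retryDelay?: number; // seconds between retries
  retryBackoff?: boolean;
  expireInMinutes?: number;
  retentionMinutes?: number;
  startAfter?: number | string | Date;
  singletonKey?: string;
}

// Hypothetical helper: options for the welcome-email job of one user.
function welcomeEmailOptions(email: string, now: Date = new Date()): JobOptionsSketch {
  return {
    priority: 10, // a bigger number means higher priority
    retryLimit: 3,
    retryDelay: 30,
    retryBackoff: true,
    expireInMinutes: 5,
    retentionMinutes: 60,
    // Defer the job so it starts one minute after `now`.
    startAfter: new Date(now.getTime() + 60 * 1000),
    // One key per user, so duplicate sign-up jobs share the same key.
    singletonKey: `welcome:${email.toLowerCase()}`,
  };
}
```

An object like this would be passed as the second argument to send, in place of the empty {} used earlier.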

Conclusion

In this post, I shared the details of the Pg-Boss queue library and how to use it for background job scheduling in a NestJS application.