Author Archives: yogesh.mali@gmail.com

How Databases Work

In this post, we will explore how databases work. There are number of databases including SQL and NoSQL, but we will mostly be talking about relational databases.

Introduction

Every time, I have used databases for queries, I have wondered how the databases work internally. Coming from programming languages background, it is easy to understand on the surface.

A user writes a query with certain syntax, the code gets compiled into machine code with grammar and an interpreter executes the machine code. That’s a simple understanding.

Databases Internals

In short, databases have frontend of Interface, SQL Command Processor and Virtual Machine.

And there is a backend of B-Tree, Pager and OS Interface.

This particular architecture is specific to SQLite database, but most of the other SQL (relational) databases work in the similar ways.

Overview of Database

Let’s dive into the overview of the database internals. In previous section, I showed the architecture that contains frontend and backend components. Let’s look at those components and what they do

A user types SQL query using user interface. SQL query goes through three stages tokenizer, parser and code generator.

From the written SQL query, tokenizer creates identifiable tokens. These tokens are then parsed in attribute grammar for the language. Code generator takes the generated grammar to create virtual machine bytecode.

Virtual-machine then takes operations generated from bytecode and stores in a data structure B-Tree. Virtual Machine is a big switch statement.

B-Tree consists of many nodes and each node is a page. B-Tree retrieves a page from disk or saves the page to disk by issuing a command to Pager.

Pager does the majority of work of reading and writing in the database files. OS Interface is an OS dependent layer and helps Pager in reading and writing the data in files. Depending on the OS of server where the database is running, OS interface can function differently.

Understanding B-Tree

Database uses B-Tree data structure to represent table and index.

B-Tree is a data structure that allows efficient data storage and retrieval.

Search for a specific value is O(log n) for time complexity.

Insert OR Delete a value is also O(log n) for time complexity.

Space Complexity while using B-Tree is also O(n).

B-Tree is a self-balancing tree that keeps the data sorted. Having sorted data allows fast search and insertion or deletion operation.

The main difference between Binary Tree and B-Tree is that B-Tree can have more than 2 children.

B-Tree properties

  1. Every node has at most m children
  2. Every node except root and leaves has at least m/2 children
  3. Root node has at least 2 children unless it is a leaf node
  4. All leaves appear on the same level
  5. A non-leaf node with k children contains k-1 keys

Binary Search

  • Each node contains a sorted array of keys.
  • Binary search is used within each node to find key.
  • It reduces the number of comparisons needed within nodes.

The way search works in B-Tree is as follows:

  • Search starts at the root node.
  • Use binary search to find either the key if it exists in current node OR find the correct child node to traverse next
  • Repeat the process until key is found or leaf is reached.

Conclusion

In this post, we shared the internals of database and how database works. If  you want to read more about SQLite DB, you can visit the official documentation here.

NodeJS Streams Explained: A Detailed Walkthrough

In this post, I will show how the NodeJS streaming is a powerful feature to process a large set of data.

NodeJS offers few in built npm libraries for streaming. stream is one of those libraries.

Introduction

Streams in NodeJS can be one of the best features as well as the most misunderstood features at the same time. And a lot of time, the confusion stems from number of options that are available within npm ecosystem. Streaming is a general data handling technique allows to process the data sequentially at a controlled speed without overwhelming the memory or CPU.

When processing a large set of data, especially from files, it could be challenging to read all the data in memory and process it. This can create high memory usage as well CPU usage. In turn, it can cause backend service to fail.

Streaming in NodeJS

Streaming is an old concept. It has been popularized when we started building a lot of data-intensive applications. The most popular being Netflix or Youtube. Idea of stream is to take small set of data (a character or a byte) and process it and continue the process till the we have completely read all the data.

There are mainly two types of streams – readable and writable. There are also duplex that does both reading and writing.

  • Readable stream (Input Stream) is where you read the data from.
  • Writeable stream (Output Stream) is where you write the data into.

Files, Database, Console can be considered for readable stream while they can be considered for writable stream as well. Readable stream can be combined with writable stream to make processing easier. This is also considered as piping. Piping has been there from the time of unix invention. If you have used pipe in unix where you can combine more than one commands, theoretically, it is the same concept when combining two streams.

Transform

NodeJS offers steams features with a number of powerful concepts. And one of them is transform. We just not only transfer the data between readable and writeable streams, but we can also do transformation on this data as it becomes available through readable stream.

Another powerful feature for transform is that you can combine multiple transforms and do various operations on the data that is getting passed from readable (input) stream. To create a transform, you will need stream from nodejs.
const { Transform } = require('stream');

There is another package through2 that’s a wrapper over transform. Considering the package has not been updated in more than 4 years now, I do not recommend it. You can just use the inbuilt transform from nodejs stream package.

Alternatively, you can also implement Custom Transform by extending Transform class. Each transform comes with a function that has chunk, encoding and callback. Chunk represents the data from stream, encoding if you are using some encoded data and callback to return after processing the chunk.

Piping

As previously said, piping comes from Unix. But within NodeJS stream, we can also use pipe to combine multiple streams. This allows data to flow from one stream to another as it gets processed.
In data intensive applications, we will come across scenarios where we will have to perform various operations on data at different stages. In such scenarios, piping allows to combine transform and pass data between transform.

Here is an example of how piping can be implemented

const { pipeline } = require('stream');
const { promisify } = require('util');
const pipelineAsync = promisify(pipeline);

await pipelineAsync(
        buildSellObjectTransform,
        splitUserDataTransform,
        enqueueDataTransform
      );

Backpressure

So far, I have mentioned that how strong stream as a feature is from NodeJS. But it comes with its own set of concerns/issues. Processing large set of data can still pose challenges with stream.

At the end, it depends on how fast the stream is processing and how fast output is handling this data.

Look at example from above where I have a pipeline. And I will explain why I used pipelineAsync instead of just pipeline.

When there is a constraint on resources, we want to make sure that we don’t overwhelm the downstream services. Input stream will keep sending data till it reaches the end of it, but transform OR other downstream services that are handling that data needs to match the same speed as input stream.

Streams are fire and forget . Once they start sending data, they don’t care on how other services are handling that data. This creates an issue.

This is what NodeJS documentation describes –
There is a general problem that occurs during data handling called backpressure and describes a buildup of data behind a buffer during data transfer. When the receiving end of the transfer has complex operations, or is slower for whatever reason, there is a tendency for data from the incoming source to accumulate, like a clog.

If you look at below picture, you can see a faucet releasing the water at force and if we don’t apply backpressure on the faucet, water can overflow. Backpressure is the same idea in nodejs stream.

Backpressure in NodeJS Streams
By applying some backpressure, stream only release certain size of data and wait for receiving service to process it before releasing more data. There are various ways to solve the problem for backpressure.

  • Piping is one of the solutions to handle backpressure. If you use pipe() from nodejs, this can handle backpressure. Sometimes, you have to explicitly set the value for highWaterMark on what data size to handle.
  • pipelineAsync/pipeline handles backpressure automatically without having to set highWaterMark.

Error Handling

We have talked about reading data, handling data. But what happens to streams or pipeline if there is a corrupt data or some data processing function failed either through system error or custom error.
In cases when there is an error in transform stream, you can use callback(error) to catch the error in your calling function. And on error, you can either destroy or drain the stream so the objects get cleaned up and the stream doesn’t end up occupying memory waiting for garbage collection.

async _transform(chunk, encoding, callback) {
    try {
      const sellItem = this.buildSellObject(chunk, this.customFields);
      if (this.push(sellItem)) {
        callback();
      } else {
        this.once('drain', callback);
      }
    } catch (error) {
      this.logger.error(error);
      callback(error);
    }
  }

And the calling function

    try {
      await pipelineAsync(
        buildSellObjectTransform,
        splitUserDataTransform,
        enqueueDataTransform
      );
    } catch (error) {
      this.logger.error(`${logPrefix} Error in processing file data: ${error}`);
      throw error;
    } finally {
      this.logger.info(`${logPrefix} ends file processing`);
      fileStream.destroy();
    }

Conclusion

In this post, I shared the details on how powerful the feature of streams from nodejs is. With transform and piping, the stream can be used in various use cases of large data processing.

References

Scaling Bull Jobs In NestJS Application

In this post, I want to show how to scale processing of bull jobs in a NestJS Application. As we know, we can use Bull Queue mechanism for asynchronous tasks, we can also easily process a lot of tasks parallelly using Bull Queue. One technique is to use horizontal scaling your workers.

This post will show the demonstration of horizontal scaling of bull queue workers.

Introduction

This is a simple idea and I want to show case the scaling power of bull queue. One thing I really like about bull jobs is that they are performant and easy to scale. The jobs also give an option to do asynchronous processing. Keep in mind that if you have a CPU intensive task, you can still use Bull queue. I have covered how to use Bull Queue for asynchronous processing.

As part of this post, we will upload a large file. The controller stores the file on disk, but adds a job to bull queue. The worker for this job will then read the file data and add each record to another bull queue.

We will have multiple workers to process file data. Each worker will process a separate job that is there in the bull queue.

Adding a single job

Let’s start with a single job first. I will not be covering any fundamentals about Bull Queues and workers. I previously wrote about worker pattern.

Nevertheless, we have a NestJS application with an API to upload a file. This API will create the first job in the queue file-upload-queue as follows:


  @Post('/uploadFile')
  @UseInterceptors(FileInterceptor("csv", {
    storage: diskStorage({
      destination: './csv',
      fileName: (req, file, cb) => {
        const randomName = Array(32).fill(null).map(() => (Math.round(Math.random() * cb(null, `${randomName}${extname(file.originalname)}`))))
      }
    })
  }))
  async uploadLargeCsvFile(@UploadedFile() file): Promise {
    const job = await this.fileQueue.add('process-file', {file: file});
    console.log(`created job ${ job.id}`);
    await this.fileQueue.close();
  }

Above code basically adds a job to file-upload-queue to process that file.

Adding multiple jobs

One of the features that bull library offers to add multiple jobs to a queue. In our example, we read the data from file and we add each record as a job to another bull queue. This allows us to add multiple jobs to queue file-data-queue.


import { InjectQueue, Process, Processor } from "@nestjs/bull";
import { Job, Queue } from "bull";

const csv = require('csvtojson');

@Processor('file-upload-queue')
export class FileUploadProcessor{

    constructor(@InjectQueue('file-data-queue') private fileDataQueue: Queue) {}
    
    @Process('process-file')
    async processFile(job: Job) {
        const file = job.data.file;
        const filePath = file.path;
        const userData = await csv().fromFile(filePath);

        await this.fileDataQueue.addBulk(userData.map(user => ({
            name: 'process-data',
            data: user
        })));

        console.log('file uploaded successfully');
    }
    
}

We use addBulk functionality to add all the records from the file to queue file-data-queue.

Worker

Creating a worker through NestJS framework is simple. NestJS has a feature to run standalone application. We will use the same to run our workers while creating a separate module to process jobs from file-data-queue.

Our separate module FileDataModule will have a processor to process each record from the file.


import { Process, Processor } from "@nestjs/bull";
import { Job } from "bull";


@Processor('file-data-queue')
export class FileDataProcessor{

    
    @Process('process-data')
    async processFile(job: Job) {
        const data = job.data;

        console.log('processing data for a single user');
        console.log(data);

        // To-Do add some processing like inserting this data in DB
    }
    
}

We will use createApplicationContext to create a worker for FileDataModule like below:


import { NestFactory } from "@nestjs/core";
import { FileDataModule } from "src/file-read/file-data.module";

async function main() {
    const app = await NestFactory.createApplicationContext(FileDataModule, 
        {
            bufferLogs: true,
            abortOnError: false,
        }
    );
    app.enableShutdownHooks();
    await app.init();
    console.log(`Worker started`);

    process.on('SIGINT', async () => {
        console.log(`SIGINT signal received`);
        try {
            console.log('closing app...');
            await app.close();
            console.log(`Worker stopped`);
        } catch (error) {
            console.error(`Error during shutdown: ${error.message}`);

        } finally {
            console.log('exiting...');
            process.exit(0);
        }
    });
}

main();

This worker basically starts the application and waits for SIGINT signal to be terminated. Considering this worker is create the application context for FileDataModule, it uses the processor FileDataProcessor to process data from the queue.

Scaling Workers

We will run two instances of the worker we created above. We will also be running our NestJS Application and if we have imported FileDataModule in our main application module, we will have three instances of FileDataProcessor running to process the jobs from the bull queue file-data-queue.

There are two concepts to understand in Bull Queue since bull can offer either.

Parallelism

In Parallelism, two or more tasks run in parallel, independent of each other. The simplest way to understand this is when you have multiple machines running and each performing its own task.

Concurrency

Two or more tasks runs at the same time while diving the available CPU so that all the tasks can advance in their processing.

Concurrency might not increase the throughout, but parallelism will. Parallelism also scales linearly that means if you add more workers, more jobs will get processed.

Bull offers configuration for concurrency. But in this demo, we are focusing on parallelism.

Demo

We have already described the scenario. We have a NestJS application with an API to upload a file is running in one terminal. We have two workers running in two other terminals.

We upload the file through Postman with our API. This API will create the first job. Processor for this job then adds multiple jobs to another queue file-data-queue.

The NestJS application and the two workers then process jobs from this queue in parallel. The three screenshots below show the application, the worker 1 and the worker 2.

The demo of Worker running in NestJS Application

Bull Worker 1 processing the job

Bull Worker 2 Processing the bull jobs

NestJS Application Processing the bull jobs

Conclusion

In this post, I showed how to scale bull job workers horizontally. This allows us to process jobs with high throughput. The code for this post is available here.

Custom Decorators in a NestJS Application

NestJS is a great framework to build NodeJS based server application. Some of the features like pipes, guards, interceptors make writing a NestJS application even cleaner. Thanks to Dependency Injection, the code can be descriptive and modularized further.

In this post, I will cover the concept of Custom Decorator and how one can use it to write some common code that you might have across your code base.

What are Custom Decorators?

Decorator is not a new concept as it has been there in other languages, especially Java based framework – Spring Boot. Annotation based decorators do certain common task. In short, you don’t have to write a lot of duplicate code to achieve something.

In Javascript and Typescript languages, decorators are relatively new.

Simply, decorators are functions. These functions can also return functions. Once you have defined a decorator, you use decorator in your code by using @ with decorator name. I will show this further in the example.

In NestJS, there are a lot of decorators that we use that are provided by the framework.

Param Decorators like @Req(), @Res(), @Body(), @Param(), @Query() OR even decorators to bind incoming requests to controller like @Get(), @Post(), @Put(). All these decorators take the incoming request and parse the request for parameters or handlers.

Post request coming to a controller get handled HTTP route handler. Let’s look at this simple example below:

    @Controller()
   export class AppController {
     
       constructor(private readonly appService: AppService) {}
       
       @Get()
       getHello(): String {
         return "Hello World"
       }
  }

As you can see, we have a Controller for defining the API and Get for get API with default path. These are decorators performing certain tasks in the framework behind.

When should you use Custom Decorators

When writing the business logic, there can be various scenarios OR if there is a duplicate code, you can think of using custom decorators. Nevertheless, the following are the key situations when you should use custom decorators

  1. Avoid complexity with repetitive code – In many cases, you might have to do certain check like idempotency check, user id existence check. You can either write a custom guard or a custom decorator to implement repetitive functionality with a custom decorator.
  2. Loose coupling – To make your application code more loosely coupled, irrespective of what server you use underneath (express or fastify).

Example of Custom Decorator

Let’s dive into a simple example for custom decorator.

Our UI is sending a request with a payload that contains mostly snake case parameters. This request comes to one of our API on the backend controller for processing.

Custom Decorator that we will be building is ParameterTransformer. This custom decorator takes the request body and transforms the request body parameter to camel case and forwards to controller.

import { createParamDecorator, ExecutionContext } from "@nestjs/common";

function snakeToCamel(str: string): string {
    return str.replace(/([-_][a-z])/g, (group) =>
      group.toUpperCase().replace('-', '').replace('_', '')
    );
}

export const ParameterTransformer = createParamDecorator(
    (data: unknown, ctx: ExecutionContext) => {
        const request = ctx.switchToHttp().getRequest();

        const body = request.body;
        const transformedBody = Object.keys(body).reduce((acc, key) => {
            const camelKey = snakeToCamel(key);
            acc[camelKey] = body[key];
            return acc;
          }, {});
      
        return transformedBody;
    }
);

Now we can use this custom decorator in our controller for incoming request body.

  @Get()
  getHello(@ParameterTransformer() helloWorldDto: HelloWorldDto): string {
    console.log(helloWorldDto);
    return this.appService.getHello();
  }

Even though the transformed object type has been defined as HelloWorldDto, the returned object from decorator won’t necessarily follow that type.

This particular example is useful when you want to update backend API without changing on frontend.

Conclusion

In this post, I showed how to create a custom decorator in a NestJS Application. Custom decorators can improve readability and reduce complexity. A fair warning to not over use the custom decorators as that can easily increase complexity.

Background Job Scheduling with Pg-Boss and NestJS

In a distributed system, we need to automate various workflows and there are ways to automate these workflows. One of the ways that I have used frequently is, background jobs. And if you noticed, even my last post was about using background jobs with trigger.dev. In this post, I will share a new way for background job scheduling with Pg-Boss and NestJS.  pg-boss is a job queue based on postgres database. How cool is that? And if it is not clear, I love Postgres database.

What is Pg-Boss?

pg-boss is a job queue library for node.js application and uses Postgres for persistence. It uses reliability of Postgres for storing, scheduling jobs. Over last 10 years, Postgres as database had made changes and pg-boss as a feature for background job queuing is one of them. In short, pg-boss reduces one more infrastructure item while building a distributed system application.

Pg-Boss vs Bull Queues

You might have noticed, I have written extensively about Bull Queues. Bull queues are another way for job queue system based on redis. The question that might come immediately, then why use pg-boss? Bull queues have been great and easy to use as well. There has been a huge community support for bull queues. Then what is so special about pg-boss?

I get it.

There is no right or wrong answer when choosing pg-boss or bull queues. It all depends on the system you are building.

Then why choose pg-boss?

If you are already using Postgres as a database for your application, then it is a no-brainer to use pg-boss instead of bull queues which are based on Redis.  Redis is an additional infrastructure overhead if you are not using cache for any other purposes.

Since pg-boss is based on Postgres and Postgres is really good at transactional support, locking and handles the queue-based mechanism really well.

Nevertheless, choose the queueing mechanism that fits your needs right. There is no trade-off if you choose Bull queues over pg-boss.

Fundamentals of Pg-Boss

Pg-Boss is a queue system and it is very similar to various other queue systems like Bull, AWS SQS.

Jobs in Pg-Boss queue are a state machine. All jobs start with created state and then move to active when picked up for processing.

When the worker picks up the job to process and completes it successfully, it moves to completed state.

If the job fails, it moves to failed state and can be moved to retry state if retries options are configured. If this job is retried, it can move back into active state.

active job takes too long to process, then it moves into expired state.

Any job that is either in created or active state, you can cancel them with cancel and then the job will move into cancelled state.

All jobs that are completed, failed, cancelled or expired, can move to archive state.

Pg-Boss and NestJS

In this post, we will show a simple example of how to use pg-boss queue with NestJS application and how we can schedule a background job with Pg-Boss.

We will use @apricote/nest-pg-boss module in our application to integrate pg-boss.

I will not go over setting up a sample NestJS application. In this sample application, I am using Prisma ORM and Postgres database.

Let’s start with installing this module dependency in your NestJS Application.

npm install @apricote/nest-pg-boss

Once the dependency is installed, we will set up the PGBossModule module in our top main module.

Add the following module in your app.module.ts file


    PGBossModule.forRootAsync({
      application_name: 'default',
      useFactory: (config: ConfigService) => ({
        host: config.get('DB_HOST'),
        user: config.get('DB_USERNAME'),
        password: config.get("DB_PASSWORD"),
        database: config.get("DB_DATABASE"),
        schema: "public",
        max: config.get("DB_POOL_MAX"),
      }),
      inject: [ConfigService],
    }),

As you can see there are few environment variables are involved, make sure you have those configured for your database.

Once you start the application, you will notice few things about the Pg-Boss. It will initialize that module and if you have created any jobs, it will display the corresponding workers.

Interestingly, when the Pg-Boss module is initialized, it will also create these tables in your Postgres database.
archive
job
schedule
subscription
version
Pg-Boss module uses these tables to persist the jobs and their states. It also allows us to schedule these jobs at particular frequency. In short, a nice replacement for cron jobs.

In the next section, I will show how to create a job, schedule it and run it with an handler.

Scheduling Job with Pg-Boss

Let’s create our first job for Pg-Boss queue. @apricote/nest-pg-boss library offers an handy method createJob .

I created a simple interface for job data that I will pass for my job. This job will send a welcome email to user after they sign up for my application.

import { createJob } from "@apricote/nest-pg-boss";

export interface UserJobData {
    email: string;
    firstName: string;
}

export const userCreatedJob = createJob('user-signup-job');

And here we created a simple job user-signup-job.

And now include this job in your module so it will be created on the application start up. Basically, it will create a worker to process the job when the application will trigger this job.

@Module({
  imports: [DBAccessModule, CompanyApiModule, UsersModule, PGBossModule.forJobs([userCreatedJob])],
  controllers: [UsersController],
  providers: [UsersService, UserJobService],
  exports: [UsersService]
})
export class UsersApiModule {}

In my controller for sign-up, application saves the user information in database. Once that is complete, it will send this job in the queue.


    async createUser(user: CreateUserDto, company: Company) {
        const hashedPassword = await bcrypt.hash(user.password, 12);

        const userToBeCreated = User.createNewUser({            
            firstName: user.firstName,
            lastName: user.lastName,
            email: user.email,
            companyId: company.id,
            password: hashedPassword,
        });
        const savedUser = await this.userRepository.save(userToBeCreated);

        await this.userCreatedJobService.send({email: savedUser.email, firstName: savedUser.firstName}, {});

        return savedUser;
    }

In the next section, we will see how to run this job with an handler.

Running Job with Handlers

In my previous createUser method, we sent a job to worker for processing. I have a service UserCreatedJobService and that is a worker to handle this job.


import { Injectable } from "@nestjs/common";
import { UserJobData, userCreatedJob } from "common/jobs";
import { Job } from "pg-boss";

@Injectable()
export class UserJobService {
    
    @userCreatedJob.Handle()
    async handleJob(job: Job) {
        console.log(`sending an email to user with email ${job.data.email} and name ${job.data.firstName}`);
    }
}

The decorate @userCreatedJob.Handle() allows us to process this job added in the queue.

More about Job-Options

We showed a simple job in this post. But, in a real production application, you might need to create complex jobs depending on your requirements. Good thing is that Pg-Boss offers variety of options to configure these jobs.

  • priority – You can configure priority for the job with a number. For higher priority jobs, use the bigger number.
  • retry options – There are few options like retryLimit, retryDelay and retryBackoff available to retry the jobs.
  • expiration options – If you have a long running jobs, you probably need to set expireInSeconds option. There is also expireInMinutes OR expireInHours option.
  • retention options – This option allows to retain the job in a particular state before it can be archived. retentionSeconds, retentionMinutes.
  • deferred job – The option startAfter allows to specify when to start the job.
  • unique jobs – singletonKey OR useSingletonQueue allow a unique key for the job and that ensures only one job with that key is processed.

Conclusion

In this post, I shared the details of Pg-Boss queue library and how to use it for background job scheduling in a NestJS application.