Author Archives: yogesh.mali@gmail.com

System Thinking

In this post, I want to dabble in system thinking. The post was prompted by a tweet claiming that software can stay error-free as long as there are no code changes.

Introduction

It’s naïve to think of software as a static system. Static systems are those that do not change in response to external factors. However, I argue that truly static systems are exceedingly rare. Consider furniture, books, or other objects in your home—they might seem static at first glance. But one factor changes them all: time.

Time is the universal force that alters everything. Imagine a piece of furniture crafted today. How will it look 100 years from now? Or 200 years? We see many antiquated buildings that have endured for decades, even centuries, but they are not immune to change. Over time, cracks may form in the walls, and pipes installed a century ago may deteriorate. Time is a relentless and influential external factor, and no system, no matter how static it seems, is exempt.

The same applies to software. Whether you make code changes or not, the passage of time affects software. Systems degrade, dependencies become outdated, and performance may decline. Time alone transforms even seemingly unchanging software into a dynamic system.

Dynamic systems are inherently subject to change. They evolve in response to external variables, internal behaviors, and time.

Given that systems are rarely static, how should we approach thinking about them?

To understand systems effectively, you need to consider both the big picture and the small picture—often referred to as the macro view and the micro view.

Developing a mindset for system thinking takes experience, and often, the best way to learn is by building a system yourself. Every system exhibits certain properties, and the manifestation of these properties determines how the system will behave.

System Thinking in Software

Usually, when it comes to software, there are two types of feedback loops: automatic and manual. An automatic loop is one where the system adapts based on the situation it is in and on how it has responded to its input. With advances in technology, we have seen self-healing systems, which mostly rely on automatic feedback loops. The example that comes to mind is the Kubernetes autoscaling feature.

Kubernetes (through its Horizontal Pod Autoscaler) scales up the number of pods based on the load it observes on the system, and scales them down when the load goes down.
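The scaling rule documented for the Horizontal Pod Autoscaler is itself a small automatic feedback loop: observe a metric, compare it with a target, then adjust. Below is a minimal JavaScript sketch of that rule. The function name and the min/max defaults are illustrative assumptions. The real controller also ignores changes within a tolerance and applies stabilization windows, and both are left out here.

```javascript
// Sketch of the Horizontal Pod Autoscaler rule from the Kubernetes docs:
//   desiredReplicas = ceil(currentReplicas * currentMetric / targetMetric)
// The min/max defaults are hypothetical. The real controller also skips
// changes within a tolerance (10% by default) and uses stabilization windows.
function desiredReplicas(currentReplicas, currentMetric, targetMetric, minReplicas = 1, maxReplicas = 10) {
  const raw = Math.ceil(currentReplicas * (currentMetric / targetMetric));
  // Clamp the result to the configured bounds.
  return Math.min(maxReplicas, Math.max(minReplicas, raw));
}

// One turn of the feedback loop per observation: load goes up, replicas go up;
// load goes down, replicas go down.
let replicas = 2;
for (const cpuPercent of [90, 60, 20]) {
  replicas = desiredReplicas(replicas, cpuPercent, 50);
  console.log(`CPU ${cpuPercent}% -> ${replicas} replicas`);
}
```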

On the other hand, software serves customers, and customers give feedback about what the software does well and what it doesn't. Based on that feedback, an engineer can tweak the system to behave differently for the customer. That's a manual feedback loop.

System thinking often involves observing events or data to identify patterns of behavior over time. This will surface the underlying structure that triggers those events.

Conclusion

In this post, I covered some thoughts about system thinking. System thinking in software is even more critical when building a distributed system. An experienced engineer will always keep the macro view in mind while making micro-level decisions.

 

How Databases Work

In this post, we will explore how databases work. There are a number of databases, including SQL and NoSQL ones, but we will mostly be talking about relational databases.

Introduction

Every time I have queried a database, I have wondered how it works internally. Coming from a programming languages background, it is easy to understand on the surface.

A user writes a query in a certain syntax. The query is parsed according to the language grammar and compiled into lower-level instructions, and those instructions are then executed, much like an interpreter runs a program. That's a simple understanding.

Databases Internals

In short, a database like SQLite has a frontend made of the Interface, the SQL Command Processor and the Virtual Machine.

And there is a backend made of the B-Tree, the Pager and the OS Interface.

This particular architecture is specific to the SQLite database, but many other SQL (relational) databases follow a broadly similar layered design.

Overview of Database

Let's dive into the overview of the database internals. In the previous section, I showed the architecture with its frontend and backend components. Let's look at those components and what they do.

A user types a SQL query using the user interface. The query goes through three stages: tokenizer, parser and code generator.

From the written SQL query, the tokenizer creates identifiable tokens such as keywords, identifiers, literals and operators. The parser then assembles these tokens into a parse tree according to the grammar of the SQL language. The code generator walks the parse tree and produces bytecode for the virtual machine.
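To make the tokenizer stage concrete, here is a toy tokenizer in JavaScript. It is an illustration only, not SQLite's tokenizer. It recognizes a handful of keywords, identifiers, integers, single-quoted strings and single-character symbols, and the KEYWORDS set is an arbitrary subset chosen for this example.

```javascript
// Toy SQL tokenizer for illustration; NOT SQLite's tokenizer.
// Multi-character operators such as <= come out as two separate symbols.
const KEYWORDS = new Set(['SELECT', 'FROM', 'WHERE', 'AND', 'OR', 'INSERT', 'INTO', 'VALUES']);

function tokenize(sql) {
  const tokens = [];
  let i = 0;
  while (i < sql.length) {
    const ch = sql[i];
    if (/\s/.test(ch)) { i++; continue; } // skip whitespace
    if (/[A-Za-z_]/.test(ch)) {
      // A word is either a keyword or an identifier (table/column name).
      let j = i;
      while (j < sql.length && /[A-Za-z0-9_]/.test(sql[j])) j++;
      const word = sql.slice(i, j);
      const upper = word.toUpperCase();
      tokens.push(KEYWORDS.has(upper) ? { type: 'keyword', value: upper } : { type: 'identifier', value: word });
      i = j;
    } else if (/[0-9]/.test(ch)) {
      let j = i;
      while (j < sql.length && /[0-9]/.test(sql[j])) j++;
      tokens.push({ type: 'number', value: Number(sql.slice(i, j)) });
      i = j;
    } else if (ch === "'") {
      const end = sql.indexOf("'", i + 1);
      if (end === -1) throw new Error('unterminated string literal');
      tokens.push({ type: 'string', value: sql.slice(i + 1, end) });
      i = end + 1;
    } else if ('=<>*,();'.includes(ch)) {
      tokens.push({ type: 'symbol', value: ch });
      i++;
    } else {
      throw new Error(`unexpected character '${ch}' at position ${i}`);
    }
  }
  return tokens;
}

console.log(tokenize('SELECT name FROM users WHERE id = 42'));
```

The parser would consume this flat token list and build a tree out of it. The tokenizer itself knows nothing about which token sequences are valid SQL.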

The virtual machine then executes that bytecode. At its core, the virtual machine is a big switch statement with one case per opcode. The opcodes that read or write data call into the B-Tree layer, where tables and indexes are stored.
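The "big switch statement" idea can be shown with a toy stack-based virtual machine. The opcodes below (REWIND, COLUMN, PUSH, GT, IF_FALSE, RESULT_ROW, NEXT, HALT) are invented for this sketch and only loosely echo SQLite's real bytecode. The table is a plain array of row objects rather than a B-Tree cursor.

```javascript
// Toy bytecode interpreter: a loop around one big switch on the opcode.
function execute(program, table) {
  const results = [];
  const stack = [];
  let cursor = 0; // index of the current row in `table`
  let pc = 0;     // program counter
  while (pc < program.length) {
    const [op, arg] = program[pc];
    switch (op) {
      case 'REWIND': cursor = 0; if (table.length === 0) { pc = arg; continue; } break;
      case 'COLUMN': stack.push(table[cursor][arg]); break;
      case 'PUSH': stack.push(arg); break;
      case 'GT': { const b = stack.pop(); const a = stack.pop(); stack.push(a > b); break; }
      case 'IF_FALSE': if (!stack.pop()) { pc = arg; continue; } break;
      case 'RESULT_ROW': results.push(stack.splice(-arg, arg)); break;
      case 'NEXT': cursor++; if (cursor < table.length) { pc = arg; continue; } break;
      case 'HALT': return results;
      default: throw new Error(`unknown opcode ${op}`);
    }
    pc++;
  }
  return results;
}

// Hand-written "bytecode" for roughly: SELECT name FROM users WHERE age > 30
const program = [
  ['REWIND', 8],      // 0: go to the first row, or jump to HALT if the table is empty
  ['COLUMN', 'age'],  // 1: push row.age
  ['PUSH', 30],       // 2: push the constant 30
  ['GT'],             // 3: pop both, push (age > 30)
  ['IF_FALSE', 7],    // 4: condition false -> skip to NEXT
  ['COLUMN', 'name'], // 5: push row.name
  ['RESULT_ROW', 1],  // 6: emit the top 1 value as an output row
  ['NEXT', 1],        // 7: advance; if a row remains, jump back to 1
  ['HALT'],           // 8: stop
];

const users = [{ name: 'Ada', age: 36 }, { name: 'Bob', age: 25 }, { name: 'Cy', age: 41 }];
console.log(execute(program, users)); // [ [ 'Ada' ], [ 'Cy' ] ]
```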

B-Tree consists of many nodes and each node is a page. B-Tree retrieves a page from disk or saves the page to disk by issuing a command to Pager.

The Pager does most of the work of reading and writing the database file. It reads, writes and caches fixed-size pages, and it also handles file locking and the rollback needed for atomic commits. The OS Interface is an OS-dependent layer that the Pager uses to read and write files, so it works differently depending on the operating system of the server where the database is running.
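Here is a minimal sketch of the pager's core job, built on stated assumptions. The Pager class, the 4096-byte default page size and the unbounded Map cache are hypothetical choices for illustration. Page N is read from byte offset N * pageSize with Node's fs.readSync and written back with fs.writeSync. SQLite's real pager additionally tracks dirty pages, evicts from a bounded cache, and handles journaling and locking.

```javascript
const fs = require('fs');

// Toy pager: fixed-size pages addressed by page number, cached in memory.
class Pager {
  constructor(filePath, pageSize = 4096) {
    this.fd = fs.openSync(filePath, 'r+'); // file must already exist
    this.pageSize = pageSize;
    this.cache = new Map(); // pageNumber -> Buffer
  }

  getPage(pageNumber) {
    if (this.cache.has(pageNumber)) return this.cache.get(pageNumber);
    const page = Buffer.alloc(this.pageSize); // zero-filled if the file is shorter
    // Page N lives at byte offset N * pageSize in the file.
    fs.readSync(this.fd, page, 0, this.pageSize, pageNumber * this.pageSize);
    this.cache.set(pageNumber, page);
    return page;
  }

  flushPage(pageNumber) {
    const page = this.cache.get(pageNumber);
    if (!page) return;
    fs.writeSync(this.fd, page, 0, this.pageSize, pageNumber * this.pageSize);
  }

  close() {
    // This sketch writes back every cached page; a real pager writes only dirty ones.
    for (const pageNumber of this.cache.keys()) this.flushPage(pageNumber);
    fs.closeSync(this.fd);
  }
}
```

The B-Tree layer above it would only ever ask for page numbers and never for raw byte offsets. That separation is what lets the pager add caching underneath without the B-Tree noticing.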

Understanding B-Tree

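The database uses a B-Tree data structure to represent each table and each index. (In SQLite, table B-Trees are a B+Tree variant in which all row data is stored in the leaves.)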

B-Tree is a data structure that allows efficient data storage and retrieval.

Searching for a specific value takes O(log n) time.

Inserting or deleting a value also takes O(log n) time.

The space complexity of a B-Tree is O(n).

A B-Tree is a self-balancing tree that keeps its data sorted. Sorted data allows fast search, insertion and deletion.
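The O(log n) bound is small in practice because the logarithm's base is the node fan-out, not 2. The standard textbook bound below assumes every non-root node holds at least t - 1 keys, where t = ⌈m/2⌉ is the minimum number of children of a non-root internal node. Height h counts edges from the root to a leaf. The numbers in the worked example are chosen purely for illustration.

```latex
h \le \log_t \frac{n+1}{2}, \qquad t = \left\lceil \frac{m}{2} \right\rceil

\text{Example: } t = 100,\; n = 10^6:\quad
\log_{100} \frac{10^6 + 1}{2} = \frac{\log_{10} 500000.5}{2} \approx \frac{5.699}{2} \approx 2.85
\;\Rightarrow\; h \le 2
```

So a lookup among a million keys touches at most 3 nodes (pages) from root to leaf, which matters when every node visit can be a disk read.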

The main difference between a binary search tree and a B-Tree is that a B-Tree node can hold many keys and have more than 2 children, which keeps the tree shallow.

B-Tree properties

  1. Every node has at most m children
  2. Every node except the root and the leaves has at least ⌈m/2⌉ children
  3. Root node has at least 2 children unless it is a leaf node
  4. All leaves appear on the same level
  5. A non-leaf node with k children contains k-1 keys
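The five properties can be checked mechanically. Below is a hypothetical validator over an in-memory tree where each node is { keys, children }. It illustrates the rules above and is not how a database verifies its files. It also applies two standard extra rules: keys inside a node are sorted, and a leaf holds at most m - 1 keys.

```javascript
// Hypothetical node shape: { keys: number[], children: Node[] }; no children = leaf.
// Returns a list of human-readable violations (empty list = valid).
function checkBTree(root, m) {
  const errors = [];
  let leafDepth = -1;

  function visit(node, depth, isRoot) {
    // Keys must be sorted so binary search works inside the node.
    for (let i = 1; i < node.keys.length; i++) {
      if (node.keys[i - 1] >= node.keys[i]) errors.push(`unsorted keys at depth ${depth}`);
    }
    const k = node.children.length;
    if (k === 0) {
      if (node.keys.length > m - 1) errors.push(`leaf at depth ${depth} has too many keys`);
      // Property 4: all leaves appear on the same level.
      if (leafDepth === -1) leafDepth = depth;
      else if (depth !== leafDepth) errors.push(`leaf at depth ${depth}, expected ${leafDepth}`);
      return;
    }
    // Property 1: at most m children.
    if (k > m) errors.push(`node at depth ${depth} has ${k} children (max ${m})`);
    // Properties 2 and 3: minimum number of children (root vs. other internal nodes).
    const minChildren = isRoot ? 2 : Math.ceil(m / 2);
    if (k < minChildren) errors.push(`node at depth ${depth} has ${k} children (min ${minChildren})`);
    // Property 5: k children means k - 1 keys.
    if (node.keys.length !== k - 1) errors.push(`node at depth ${depth} has ${node.keys.length} keys for ${k} children`);
    for (const child of node.children) visit(child, depth + 1, false);
  }

  visit(root, 0, true);
  return errors;
}

const leaf = (...keys) => ({ keys, children: [] });
// Order m = 3 (a 2-3 tree).
const valid = { keys: [10], children: [leaf(5), leaf(15, 20)] };
console.log(checkBTree(valid, 3)); // []
```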

Binary Search

  • Each node contains a sorted array of keys.
  • Binary search is used within each node to find the key.
  • It reduces the number of comparisons needed within nodes.

The way search works in B-Tree is as follows:

  • Search starts at the root node.
  • Use binary search to find the key if it exists in the current node, or otherwise to find the correct child node to traverse next.
  • Repeat the process until the key is found or a leaf is reached.
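The search steps above can be sketched in JavaScript. Here the node shape { keys, children } is a hypothetical in-memory stand-in for pages, and nodesVisited is reported only to show how few nodes a lookup touches. Within a node, a binary "lower bound" search finds the first key that is greater than or equal to the target. Either that key is the target, or its index is exactly the child to descend into.

```javascript
// Index of the first key >= target (binary search inside one node).
function lowerBound(keys, target) {
  let lo = 0;
  let hi = keys.length;
  while (lo < hi) {
    const mid = (lo + hi) >>> 1;
    if (keys[mid] < target) lo = mid + 1;
    else hi = mid;
  }
  return lo;
}

// Start at the root, search within the node, then descend until found or a leaf is reached.
function searchBTree(root, target) {
  let node = root;
  let nodesVisited = 0;
  while (node) {
    nodesVisited++;
    const i = lowerBound(node.keys, target);
    if (i < node.keys.length && node.keys[i] === target) return { found: true, nodesVisited };
    if (node.children.length === 0) return { found: false, nodesVisited }; // leaf: not present
    // children[i] holds the keys between keys[i - 1] and keys[i].
    node = node.children[i];
  }
  return { found: false, nodesVisited };
}

const root = {
  keys: [20, 40],
  children: [
    { keys: [5, 10], children: [] },
    { keys: [25, 30], children: [] },
    { keys: [45, 50], children: [] },
  ],
};
console.log(searchBTree(root, 30)); // { found: true, nodesVisited: 2 }
```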

Conclusion

In this post, I covered the internals of a database and how it works. If you want to read more about SQLite, you can visit the official documentation here.

NodeJS Streams Explained: A Detailed Walkthrough

In this post, I will show how NodeJS streaming is a powerful feature for processing large sets of data.

NodeJS ships a built-in stream module, so no npm package is needed for streaming.

Introduction

Streams in NodeJS can be one of the best features and, at the same time, one of the most misunderstood. A lot of the time, the confusion stems from the number of options available within the npm ecosystem. Streaming is a general data handling technique that lets you process data sequentially, at a controlled speed, without overwhelming memory or CPU.

When processing a large set of data, especially from files, it can be challenging to read all the data into memory and process it at once. This can cause high memory and CPU usage, which in turn can make a backend service fail.

Streaming in NodeJS

Streaming is an old concept. It became popular as we started building a lot of data-intensive applications, the most popular being Netflix and YouTube. The idea of a stream is to take a small piece of data (a character, a byte, or a chunk of bytes), process it, and continue until we have read all the data.

There are four types of streams in NodeJS. Readable and writable are the main ones, duplex streams do both reading and writing, and transform streams are duplex streams that modify the data as it passes through.

  • Readable stream (Input Stream) is where you read the data from.
  • Writable stream (Output Stream) is where you write the data into.

Files, databases and the console can act as readable streams, and they can act as writable streams as well. A readable stream can be combined with a writable stream to make processing easier; this is called piping. Pipes have been around since the early days of Unix. If you have used the | pipe in a Unix shell to combine several commands, connecting two streams is the same concept.
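Here is a minimal sketch of piping, with an in-memory array standing in for a file. Readable.from() turns the array into a readable stream, and pipe() connects it to a writable stream that collects what it receives, much like a | between two shell commands.

```javascript
const { Readable, Writable } = require('stream');

// Source: a readable stream built from an in-memory array (stands in for a file).
const source = Readable.from(['alpha\n', 'beta\n', 'gamma\n']);

// Sink: a writable stream that collects every chunk it receives.
const received = [];
const sink = new Writable({
  write(chunk, encoding, callback) {
    received.push(chunk.toString());
    callback(); // signal that this chunk has been handled
  },
});

// pipe() connects the two and pauses the source whenever the sink's buffer is full.
source.pipe(sink);

sink.on('finish', () => console.log(received.join('')));
```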

Transform

NodeJS streams come with a number of powerful concepts, and one of them is transform. We can not only move data between readable and writable streams, but also transform the data as it becomes available through the readable stream.

Another powerful feature of transforms is that you can chain multiple transforms and perform various operations on the data coming from the readable (input) stream. To create a transform, you need the built-in stream module:
const { Transform } = require('stream');

There is another package, through2, that is a wrapper over Transform. Since the package has not been updated in more than 4 years now, I do not recommend it. You can just use the built-in Transform from the NodeJS stream module.

Alternatively, you can implement a custom transform by extending the Transform class and implementing _transform(chunk, encoding, callback). chunk is the data from the stream, and encoding is the character encoding when chunk is a string. callback signals that the chunk has been processed: pass an error as its first argument to fail the stream, or a value as its second argument to push that value downstream.
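Here is a minimal custom transform along those lines. CsvLineToObject is a hypothetical class that turns one "name,age" line into an object. It assumes each chunk is exactly one line. That holds for the strings written below, but raw file chunks would first need splitting into lines.

```javascript
const { Transform } = require('stream');

class CsvLineToObject extends Transform {
  constructor() {
    // Accept strings on the writable side, emit objects on the readable side.
    super({ writableObjectMode: true, readableObjectMode: true });
  }

  _transform(chunk, encoding, callback) {
    const [name, age] = String(chunk).trim().split(',');
    if (!name || Number.isNaN(Number(age))) {
      callback(new Error(`malformed line: ${chunk}`)); // fail the stream with an error
      return;
    }
    this.push({ name, age: Number(age) }); // hand the object downstream
    callback(); // ready for the next chunk
  }
}

const parser = new CsvLineToObject();
parser.on('data', (row) => console.log(row));
parser.on('error', (err) => console.error(err.message));
parser.write('ada,36');
parser.write('bob,25');
parser.end();
```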

Piping

As mentioned earlier, piping comes from Unix. Within NodeJS streams, we can also pipe to combine multiple streams, which lets data flow from one stream to the next as it gets processed.
In data-intensive applications, we often have to perform different operations on the data at different stages. In such scenarios, piping lets us chain transforms and pass data between them.

Here is an example of how piping can be implemented with pipeline:

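const { pipeline } = require('stream');
const { promisify } = require('util');
const pipelineAsync = promisify(pipeline);

// Run inside an async function (top-level await requires an ES module).
// Streams are passed in order: data flows from the first stream through the rest.
async function processFile() {
  await pipelineAsync(
    buildSellObjectTransform,
    splitUserDataTransform,
    enqueueDataTransform
  );
}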
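Here is a self-contained version of the same pattern that can be run as is. It passes a readable source first, two transforms in the middle, and a writable sink last. upperCase, addPrefix and runPipeline are hypothetical stand-ins for the stages in the snippet above.

```javascript
const { Readable, Transform, Writable, pipeline } = require('stream');
const { promisify } = require('util');
const pipelineAsync = promisify(pipeline);

// A stream can only be consumed once, so each run builds fresh stage instances.
const upperCase = () => new Transform({
  objectMode: true,
  // callback(null, value) is shorthand for this.push(value) followed by callback()
  transform(chunk, encoding, callback) { callback(null, String(chunk).toUpperCase()); },
});
const addPrefix = () => new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) { callback(null, `item:${chunk}`); },
});

async function runPipeline(items) {
  const output = [];
  const sink = new Writable({
    objectMode: true,
    write(chunk, encoding, callback) { output.push(chunk); callback(); },
  });
  // Source first, transforms in the middle, sink last.
  await pipelineAsync(Readable.from(items), upperCase(), addPrefix(), sink);
  return output;
}

runPipeline(['a', 'b']).then((output) => console.log(output)); // [ 'item:A', 'item:B' ]
```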

Backpressure

So far, I have shown how strong a feature streams are in NodeJS. But they come with their own set of concerns. Processing a large set of data can still pose challenges with streams.

In the end, it depends on how fast the stream produces data and how fast the consumer handles it.

Look at the pipeline example above. pipelineAsync is simply the promisified version of pipeline, so it can be awaited and its errors caught with try/catch. Both versions manage backpressure between the streams, as explained below.

When resources are constrained, we want to make sure that we don't overwhelm downstream services. The input stream keeps sending data until it reaches the end, but the transforms or other downstream services handling that data need to keep up with the input stream.

A producer that ignores backpressure is fire and forget: once it starts sending data, it doesn't care how the downstream services handle it. This creates an issue.

This is how the NodeJS documentation describes it:
There is a general problem that occurs during data handling called backpressure and describes a buildup of data behind a buffer during data transfer. When the receiving end of the transfer has complex operations, or is slower for whatever reason, there is a tendency for data from the incoming source to accumulate, like a clog.

In the picture below, a faucet releases water with force. If we don't apply backpressure to the faucet, the water can overflow. Backpressure in NodeJS streams is the same idea.

Backpressure in NodeJS Streams
By applying backpressure, a stream releases only a certain amount of data and waits for the receiving side to process it before releasing more. There are various ways to handle backpressure:

  • Piping is one of them. readable.pipe(writable) pauses the source when the destination's buffer is full and resumes it once the buffer drains. The highWaterMark option controls how much data is buffered before that happens. Note that pipe() does not forward errors or clean up the other streams when one of them fails.
  • pipeline() (and the promisified pipelineAsync) handles backpressure the same way, without you having to set highWaterMark. It also forwards errors and destroys all streams when one of them fails.
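To see what pipe() and pipeline() do for you, here is backpressure handled by hand. The 16-byte highWaterMark and the 10 ms delay are arbitrary values chosen so the effect shows up quickly, and writeAll is a hypothetical helper. When write() returns false, the sink's buffer is full, so the producer stops and waits for the 'drain' event.

```javascript
const { Writable } = require('stream');

// A deliberately slow sink with a tiny buffer so backpressure kicks in quickly.
const slowSink = new Writable({
  highWaterMark: 16,
  write(chunk, encoding, callback) {
    setTimeout(callback, 10); // simulate a slow downstream service
  },
});

// Manual backpressure: stop writing when write() returns false,
// and resume only after the sink emits 'drain'.
async function writeAll(sink, chunks) {
  let pauses = 0;
  for (const chunk of chunks) {
    if (!sink.write(chunk)) {
      pauses++;
      await new Promise((resolve) => sink.once('drain', resolve));
    }
  }
  sink.end();
  return pauses;
}

writeAll(slowSink, Array.from({ length: 10 }, () => 'x'.repeat(8)))
  .then((pauses) => console.log(`paused ${pauses} times to let the sink drain`));
```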

Error Handling

We have talked about reading and handling data. But what happens to a stream or pipeline when there is corrupt data, or when a data processing function fails with a system error or a custom error?
When a transform stream hits an error, call callback(error) so the error reaches the calling function. pipeline() then destroys the streams in the pipeline, so they get cleaned up instead of lingering in memory waiting for garbage collection. If you connect streams with pipe() instead, you have to destroy them yourself on error.

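_transform(chunk, encoding, callback) {
    try {
      const sellItem = this.buildSellObject(chunk, this.customFields);
      // Transform already applies backpressure on its readable side, so push the
      // result and signal completion right away. Waiting for 'drain' here can hang:
      // 'drain' belongs to the writable side and may never be emitted.
      this.push(sellItem);
      callback();
    } catch (error) {
      this.logger.error(error);
      // Passing the error to callback() fails the stream; pipeline() then
      // destroys all streams and rejects with this error.
      callback(error);
    }
  }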

And the calling function

    try {
      await pipelineAsync(
        buildSellObjectTransform,
        splitUserDataTransform,
        enqueueDataTransform
      );
    } catch (error) {
      this.logger.error(`${logPrefix} Error in processing file data: ${error}`);
      throw error;
    } finally {
      this.logger.info(`${logPrefix} ends file processing`);
      fileStream.destroy();
    }
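The error path can be checked end to end with a small pipeline. validateNumbers and processValues are hypothetical names. An error passed to callback() in any stage rejects pipelineAsync, so the caller's try/catch sees it, and pipeline() takes care of destroying the streams that have not finished.

```javascript
const { Readable, Transform, Writable, pipeline } = require('stream');
const { promisify } = require('util');
const pipelineAsync = promisify(pipeline);

// A transform that rejects negative numbers by passing an error to callback().
function validateNumbers() {
  return new Transform({
    objectMode: true,
    transform(chunk, encoding, callback) {
      if (chunk < 0) return callback(new Error(`invalid value: ${chunk}`));
      callback(null, chunk);
    },
  });
}

async function processValues(values) {
  const sink = new Writable({ objectMode: true, write(chunk, encoding, callback) { callback(); } });
  try {
    await pipelineAsync(Readable.from(values), validateNumbers(), sink);
    return 'ok';
  } catch (error) {
    // By now pipeline() has destroyed the streams that had not finished.
    return `failed: ${error.message}`;
  }
}

processValues([1, 2, -3, 4]).then(console.log); // failed: invalid value: -3
```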

Conclusion

In this post, I shared how powerful streams are in NodeJS. With transforms and piping, streams fit a wide range of large data processing use cases.


Scaling Bull Jobs In NestJS Application

In this post, I want to show how to scale the processing of Bull jobs in a NestJS application. Bull queues are commonly used for asynchronous tasks, and they also make it easy to process many tasks in parallel. One technique for that is to scale your workers horizontally.

This post demonstrates horizontal scaling of Bull queue workers.

Introduction

This is a simple idea, and I want to showcase the scaling power of Bull queues. One thing I really like about Bull jobs is that they are performant and easy to scale. They also give you the option of asynchronous processing. Keep in mind that you can still use a Bull queue for CPU-intensive tasks. I have previously covered how to use Bull Queue for asynchronous processing.

As part of this post, we will upload a large file. The controller stores the file on disk and adds a job to a Bull queue. The worker for this job then reads the file data and adds each record as a job to another Bull queue.

We will have multiple workers to process file data. Each worker will process a separate job that is there in the bull queue.

Adding a single job

Let's start with a single job first. I will not cover the fundamentals of Bull queues and workers here; I previously wrote about the worker pattern.

We have a NestJS application with an API to upload a file. This API creates the first job in the queue file-upload-queue as follows:


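  // Controller method; assumes this.fileQueue is injected with @InjectQueue('file-upload-queue').
  // Imports: Post, UploadedFile, UseInterceptors (@nestjs/common), FileInterceptor (@nestjs/platform-express),
  // diskStorage (multer), extname (path).
  @Post('/uploadFile')
  @UseInterceptors(FileInterceptor("csv", {
    storage: diskStorage({
      destination: './csv',
      // multer reads this option as `filename` (lowercase n)
      filename: (req, file, cb) => {
        // 32 random hex characters plus the original file extension
        const randomName = Array(32).fill(null).map(() => Math.floor(Math.random() * 16).toString(16)).join('');
        cb(null, `${randomName}${extname(file.originalname)}`);
      },
    }),
  }))
  async uploadLargeCsvFile(@UploadedFile() file: Express.Multer.File): Promise<void> {
    const job = await this.fileQueue.add('process-file', { file: file });
    console.log(`created job ${job.id}`);
    // The injected queue is shared across requests, so it is not closed here;
    // closing it would make every later upload fail.
  }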

The code above adds a process-file job to file-upload-queue to process the uploaded file.

Adding multiple jobs

Bull lets you add multiple jobs to a queue in a single call. In our example, we read the data from the file and add each record as a job to another Bull queue, file-data-queue.


import { InjectQueue, Process, Processor } from "@nestjs/bull";
import { Job, Queue } from "bull";

const csv = require('csvtojson');

@Processor('file-upload-queue')
export class FileUploadProcessor{

    constructor(@InjectQueue('file-data-queue') private fileDataQueue: Queue) {}
    
    @Process('process-file')
    async processFile(job: Job) {
        const file = job.data.file;
        const filePath = file.path;
        const userData = await csv().fromFile(filePath);

        await this.fileDataQueue.addBulk(userData.map(user => ({
            name: 'process-data',
            data: user
        })));

        console.log('file uploaded successfully');
    }
    
}

We use addBulk functionality to add all the records from the file to queue file-data-queue.

Worker

Creating a worker with the NestJS framework is simple, because NestJS can run a standalone application (an application context without an HTTP server). We will use that to run our workers, with a separate module that processes jobs from file-data-queue.

Our separate module FileDataModule will have a processor to process each record from the file.


import { Process, Processor } from "@nestjs/bull";
import { Job } from "bull";


@Processor('file-data-queue')
export class FileDataProcessor{

    
    @Process('process-data')
    async processFile(job: Job) {
        const data = job.data;

        console.log('processing data for a single user');
        console.log(data);

        // To-Do add some processing like inserting this data in DB
    }
    
}

We will use createApplicationContext to create a worker for FileDataModule like below:


import { NestFactory } from "@nestjs/core";
import { FileDataModule } from "src/file-read/file-data.module";

async function main() {
    const app = await NestFactory.createApplicationContext(FileDataModule, 
        {
            bufferLogs: true,
            abortOnError: false,
        }
    );
    app.enableShutdownHooks();
    await app.init();
    console.log(`Worker started`);

    process.on('SIGINT', async () => {
        console.log(`SIGINT signal received`);
        try {
            console.log('closing app...');
            await app.close();
            console.log(`Worker stopped`);
        } catch (error) {
            console.error(`Error during shutdown: ${error.message}`);

        } finally {
            console.log('exiting...');
            process.exit(0);
        }
    });
}

main();

This worker starts the application context and keeps running until it receives a SIGINT signal. Because the worker creates the application context for FileDataModule, it uses the FileDataProcessor processor to process data from the queue.

Scaling Workers

We will run two instances of the worker we created above. We will also be running our NestJS Application and if we have imported FileDataModule in our main application module, we will have three instances of FileDataProcessor running to process the jobs from the bull queue file-data-queue.
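Running the worker several times works because every instance pulls from the same queue and each job is handed to exactly one of them. The sketch below imitates that "competing consumers" behaviour in a single process, with a plain array standing in for Redis. runWorkers and the worker names are hypothetical. Here the workers interleave on one event loop rather than running truly in parallel, whereas Bull's separate processes coordinate through Redis.

```javascript
// Several workers pull from one shared queue; each job is taken by exactly one of them.
async function runWorkers(jobs, workerNames) {
  const queue = [...jobs]; // copy, so the caller's array is left untouched
  const processedBy = new Map(); // job id -> name of the worker that handled it

  async function worker(name) {
    while (queue.length > 0) {
      // Taking a job removes it from the queue, so no other worker can see it.
      const job = queue.shift();
      await new Promise((resolve) => setTimeout(resolve, Math.random() * 5)); // simulated work
      processedBy.set(job.id, name);
    }
  }

  await Promise.all(workerNames.map(worker));
  return processedBy;
}

const sampleJobs = Array.from({ length: 9 }, (_, i) => ({ id: i + 1 }));
runWorkers(sampleJobs, ['app', 'worker-1', 'worker-2']).then((processedBy) => {
  for (const [jobId, workerName] of processedBy) console.log(`job ${jobId} -> ${workerName}`);
});
```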

There are two concepts to understand here, since Bull can offer either.

Parallelism

In Parallelism, two or more tasks run in parallel, independent of each other. The simplest way to understand this is when you have multiple machines running and each performing its own task.

Concurrency

Two or more tasks make progress during overlapping periods of time by sharing the available CPU, so that all the tasks can advance in their processing.

Concurrency might not increase throughput for CPU-bound work, but parallelism will. Parallelism also scales roughly linearly: if you add more workers, more jobs get processed, until a shared resource such as Redis or the database becomes the bottleneck.

Bull offers a configuration option for concurrency. In this demo, however, we are focusing on parallelism.
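Concurrency inside one worker can be pictured as a cap on how many jobs that process has in flight at once. In @nestjs/bull, that cap is the concurrency field of the @Process() options. The helper below, processWithConcurrency, is a hypothetical framework-free sketch of the idea, not Bull's implementation. It returns the highest number of jobs that were running at the same moment.

```javascript
// Run `handler` over `jobs` with at most `limit` jobs in flight at once, in one process.
async function processWithConcurrency(jobs, limit, handler) {
  let next = 0;        // index of the next job to start
  let inFlight = 0;    // jobs currently running
  let maxInFlight = 0; // highest value inFlight reached

  // Each lane processes one job at a time; `limit` lanes share the job list.
  async function lane() {
    while (next < jobs.length) {
      const job = jobs[next++];
      inFlight++;
      maxInFlight = Math.max(maxInFlight, inFlight);
      try {
        await handler(job);
      } finally {
        inFlight--;
      }
    }
  }

  const laneCount = Math.min(limit, jobs.length);
  await Promise.all(Array.from({ length: laneCount }, () => lane()));
  return maxInFlight;
}

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
processWithConcurrency([1, 2, 3, 4, 5, 6], 2, () => sleep(10))
  .then((maxInFlight) => console.log(`at most ${maxInFlight} jobs ran at once`)); // at most 2 jobs ran at once
```

This kind of interleaving helps most when jobs wait on I/O, such as database inserts. For CPU-bound jobs, adding worker processes (parallelism) is what raises throughput.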

Demo

We have already described the scenario. A NestJS application with an API to upload a file is running in one terminal, and two workers are running in two other terminals.

We upload the file through Postman with our API. This API will create the first job. Processor for this job then adds multiple jobs to another queue file-data-queue.

The NestJS application and the two workers then process jobs from this queue in parallel. The screenshots below show the application, worker 1 and worker 2.

The demo of Worker running in NestJS Application

Bull Worker 1 processing the job

Bull Worker 2 Processing the bull jobs

NestJS Application Processing the bull jobs

Conclusion

In this post, I showed how to scale bull job workers horizontally. This allows us to process jobs with high throughput. The code for this post is available here.

Custom Decorators in a NestJS Application

NestJS is a great framework for building NodeJS server applications. Features like pipes, guards and interceptors make a NestJS application even cleaner to write. Thanks to dependency injection, the code can be more descriptive and modular.

In this post, I will cover the concept of Custom Decorator and how one can use it to write some common code that you might have across your code base.

What are Custom Decorators?

Decorators are not a new concept. Similar ideas exist in other languages, for example the annotations used in Java frameworks such as Spring Boot. Annotation-based decorators perform common tasks for you, so you don't have to write a lot of duplicate code to achieve something.

In JavaScript and TypeScript, decorators are relatively new. In JavaScript they are still a TC39 proposal, while TypeScript has long supported them behind the experimentalDecorators compiler option, which is what NestJS relies on.

Simply put, decorators are functions, and these functions can return other functions. Once you have defined a decorator, you apply it in your code by writing @ followed by the decorator name. I will show this further in the example.
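Since decorators are just functions, the mechanics can be shown without any framework. The sketch below is plain JavaScript. logCalls is a hypothetical method decorator written with the legacy signature (target, propertyKey, descriptor) that TypeScript uses under experimentalDecorators. It is applied by hand because Node.js does not run @decorator syntax without a compiler such as TypeScript.

```javascript
// Legacy-style method decorator: receives the prototype, the method name and the
// property descriptor, and returns a descriptor whose method wraps the original.
function logCalls(target, propertyKey, descriptor) {
  const original = descriptor.value;
  descriptor.value = function (...args) {
    console.log(`calling ${propertyKey} with`, args);
    return original.apply(this, args);
  };
  return descriptor;
}

class Greeter {
  greet(name) {
    return `Hello ${name}`;
  }
}

// Roughly what a compiler does for `@logCalls greet(name) { ... }`.
const descriptor = Object.getOwnPropertyDescriptor(Greeter.prototype, 'greet');
Object.defineProperty(Greeter.prototype, 'greet', logCalls(Greeter.prototype, 'greet', descriptor));

console.log(new Greeter().greet('Ada'));
```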

In NestJS, there are a lot of decorators that we use that are provided by the framework.

Param decorators like @Req(), @Res(), @Body(), @Param() and @Query() extract values from the incoming request, while decorators like @Get(), @Post() and @Put() bind incoming requests to controller route handlers.

A request coming to a controller gets handled by an HTTP route handler. Let's look at this simple example below:

import { Controller, Get } from '@nestjs/common';
import { AppService } from './app.service';

@Controller()
export class AppController {
  constructor(private readonly appService: AppService) {}

  @Get()
  getHello(): string {
    return 'Hello World';
  }
}

As you can see, @Controller() defines the controller and @Get() defines a GET handler on the default path. These decorators do their work behind the scenes in the framework.

When should you use Custom Decorators

When writing business logic, if you notice the same code repeated across scenarios, consider using custom decorators. The following are the key situations where custom decorators help:

  1. Avoid complexity from repetitive code – In many cases, you might need the same check in many places, such as an idempotency check or a user id existence check. You can implement such repetitive functionality with either a custom guard or a custom decorator.
  2. Loose coupling – Custom decorators keep your application code loosely coupled from the underlying HTTP server (express or fastify).

Example of Custom Decorator

Let’s dive into a simple example for custom decorator.

Our UI sends a request with a payload that contains mostly snake case parameters. This request comes to one of our API endpoints on the backend controller for processing.

The custom decorator that we will build is ParameterTransformer. It takes the request body, converts its top-level parameter names to camel case and passes the result to the controller.

import { createParamDecorator, ExecutionContext } from "@nestjs/common";

// Converts snake_case or kebab-case to camelCase, e.g. user_id -> userId
function snakeToCamel(str: string): string {
    return str.replace(/([-_][a-z])/g, (group) =>
      group.toUpperCase().replace('-', '').replace('_', '')
    );
}

export const ParameterTransformer = createParamDecorator(
    (data: unknown, ctx: ExecutionContext) => {
        const request = ctx.switchToHttp().getRequest();

        // Guard against requests that carry no body
        const body = request.body ?? {};
        // Rename top-level keys only; nested objects keep their original keys
        const transformedBody = Object.keys(body).reduce((acc, key) => {
            acc[snakeToCamel(key)] = body[key];
            return acc;
          }, {} as Record<string, unknown>);

        return transformedBody;
    }
);
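The decorator above only renames top-level keys. Below is a hypothetical recursive variant, written as framework-free JavaScript so it can be tried outside NestJS, which also walks nested plain objects and arrays. camelizeKeys is an illustrative name, and snakeToCamel is copied from the decorator code.

```javascript
// Same conversion as in the decorator: user_id -> userId, first-name -> firstName
function snakeToCamel(str) {
  return str.replace(/([-_][a-z])/g, (group) => group.toUpperCase().replace('-', '').replace('_', ''));
}

function camelizeKeys(value) {
  if (Array.isArray(value)) return value.map(camelizeKeys);
  // Only plain objects (like a parsed JSON body) are rewritten;
  // Dates and other class instances are passed through untouched.
  if (value !== null && typeof value === 'object' && Object.getPrototypeOf(value) === Object.prototype) {
    return Object.fromEntries(
      Object.entries(value).map(([key, inner]) => [snakeToCamel(key), camelizeKeys(inner)])
    );
  }
  return value;
}

console.log(camelizeKeys({ user_id: 7, shipping_address: { zip_code: '12345' }, line_items: [{ item_id: 1 }] }));
```

Inside the decorator, returning camelizeKeys(request.body ?? {}) would replace the reduce call.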

Now we can use this custom decorator in our controller for incoming request body.

  @Post()
  getHello(@ParameterTransformer() helloWorldDto: HelloWorldDto): string {
    console.log(helloWorldDto);
    return this.appService.getHello();
  }

Even though the parameter is typed as HelloWorldDto, the object returned by the decorator is not checked against that type, because TypeScript types do not exist at runtime.

This particular example is useful when you want to update the backend API without changing the frontend.

Conclusion

In this post, I showed how to create a custom decorator in a NestJS application. Custom decorators can improve readability and reduce complexity. A fair warning: do not overuse custom decorators, as that can easily increase complexity.