Using Flows with Bull Queue in a NestJS Application

In this post, I will show how to use the new Flows feature of Bull Queue in a NestJS application. If you are new to Bull Queue, here is my previous post about using Bull Queue in a NestJS application.

What are Flows?

Flows are a new feature of Bull Queue. With this feature, we can establish a parent-child relationship between jobs. A job in one queue can create a number of child jobs and push them into another queue for processing.

In real-world applications, you will often need to split large images, video files, or even text files into smaller chunks, process them, and bind them together again. A Flow is an ideal candidate for implementing such split-and-merge operations while processing the chunks in parallel.

Divide-and-conquer and map-reduce are well-known programming techniques. Flows follow the same pattern when processing CPU-intensive jobs.

File Validation And Processing

Now, let’s look at a real example to understand how Flows work. As part of this example, we will be processing a large file. This large file will go through validation, and the valid records will be processed further, merged into another file, or stored in the database. To understand this better, look at the diagram below:

Flows with Bull Queue

I will not cover the basics of getting started with a NestJS application, but you can look at this previous post to get started.

Currently, Flows are offered by BullMQ. One key thing to note is that BullMQ and Bull are different open-source libraries that support a largely similar set of features, and both work with NestJS. Bull, however, does not offer Flows yet.

Install bullmq and its NestJS integration in your application:

npm i bullmq

npm i @nestjs/bullmq

Splitting a large file

To understand Flows better, let’s create our design flow first.

  • Upload a large file.
  • A bull job will split the file into chunks.
  • We will create a Flow to process each of these chunks.
  • The parent job of the flow will merge the result of all chunks into another file.

We are using a NestJS application as part of this demo and we have a controller to upload the file.

First, we will configure our Bull queues with the Redis connection.

import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { AppController } from './app.controller';
import { BullBoardController } from './bull-board-controller';
import { FileUploadProcessor } from './file-upload.processor';
import { PrismaService } from './prisma.service';
import { UserService } from './user.service';
import { TransformFileProcessor } from './transform-file-processor';
import { SplitFileProcessor } from './split-file.processor';
import { MergeDataProcessor } from './merge-data.processor';
import { BullModule } from '@nestjs/bullmq';

@Module({
  imports: [    
    BullModule.forRootAsync({
      imports: [ConfigModule],
      useFactory: async (configService: ConfigService) => ({
        connection: {
          host: configService.get('REDIS_HOST'),
          port: Number(configService.get('REDIS_PORT')),
        }
      }),
      inject: [ConfigService],
    }),
    BullModule.registerQueue({
      name: 'file-upload-queue'
    },
    {
      name: 'split-file-queue',
    },
    {
      name: 'transform-file-queue',
    },
    {
      name: 'merge-data-queue'
    }),
    BullModule.registerFlowProducer({
      name: 'merge-all-files',
    }),
  ],
  controllers: [AppController, BullBoardController],
  providers: [UserService, PrismaService, FileUploadProcessor, 
           TransformFileProcessor, 
           SplitFileProcessor, 
           MergeDataProcessor],
})
export class AppModule {}

We have configured Bull Queues and registered those queues and the flow.
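
The Redis connection settings come from environment variables via ConfigService. Assuming ConfigModule is set up to read a standard .env file (the values below are only examples), it would contain something like:

REDIS_HOST=localhost
REDIS_PORT=6379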

The file upload is exposed as a REST API in the controller.


  @Post('/uploadALargeFile')
  @UseInterceptors(FileInterceptor('csv', {
    storage: diskStorage({
      destination: './csv',
      // Store the upload under a random hex name, keeping the original extension.
      filename: (req, file, cb) => {
        const randomName = Array(32)
          .fill(null)
          .map(() => Math.round(Math.random() * 16).toString(16))
          .join('');
        cb(null, `${randomName}${extname(file.originalname)}`);
      },
    }),
  }))
  async uploadLargeCsvFile(@UploadedFile() file): Promise<void> {
    const job = await this.splitFileQueue.add('split', { file: file });
    console.log(`created job ${job.id}`);
    await this.splitFileQueue.close();
  }

We use FileInterceptor to store the uploaded file on local disk and then add the file to a Bull queue job. That job will process the file further.

The split-file-queue processor handles the job. It reads the file, splits it into chunks of 500 rows, and then adds all of these chunks to another queue that transforms each chunk file.


    async process(job: Job) {        
        const file  = job.data.file;        
        const filePath = file.path;
        const chunksOfInputFile = await this.splitInChunks(filePath);
        console.log(chunksOfInputFile);
        await this.addChunksToQueue(chunksOfInputFile);
    }
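
The splitInChunks and addChunksToQueue helpers do the actual work. Here is a minimal sketch of what splitInChunks could look like, assuming the same fs and csvparser imports used elsewhere in this post and a ./chunks output directory (the directory and file naming are hypothetical):

    async splitInChunks(filePath: string): Promise<string[]> {
        // Read all rows of the uploaded CSV into memory.
        const rows: string[] = [];
        await new Promise<void>((resolve) => {
            fs.createReadStream(filePath)
                .pipe(csvparser())
                .on('data', (data) => {
                    rows.push(`${data['Name']},${data['Email']},${data['EmployeeId']}`);
                })
                .on('end', () => resolve());
        });

        // Write chunks of 500 rows each, repeating the header so the transform step can parse them.
        const header = 'Name,Email,EmployeeId';
        const chunkFiles: string[] = [];
        for (let i = 0; i < rows.length; i += 500) {
            const chunkFile = `./chunks/chunk-${i / 500}.csv`;
            fs.writeFileSync(chunkFile, [header, ...rows.slice(i, i + 500)].join('\r\n'));
            chunkFiles.push(chunkFile);
        }
        return chunkFiles;
    }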

Interestingly, we add the chunks to the queue through a Flow, the feature that BullMQ offers for a parent-child relationship between jobs.

    async addChunksToQueue(files: string[]) {
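        // Note: new FlowProducer() opens its own Redis connection (localhost:6379 by default unless connection options are passed).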
        const flowProducer = new FlowProducer();

        try {
            return await flowProducer.add({
                name: 'merge-all-files',
                queueName: 'merge-data-queue',
                children: files.map((file) => ({
                    name: 'transform-file',
                    queueName: 'transform-file-queue',
                    data: { file: file},
                })),            
            });
        } catch (err) {
            console.log (`Error adding flow ${err}`);
        }

    }

In the above code, we create a flow named merge-all-files. The parent job will be processed in merge-data-queue, while transform-file-queue will process each of the chunk files that we split previously.

Transforming the file

In transform-file-queue, we read the file and validate each record. As part of this transformation, we store error records separately from valid records. Valid records from each file job are then stored in a transformed file.


@Processor('transform-file-queue')
export class TransformFileProcessor extends WorkerHost{
    
    async process(job: Job): Promise<string> {
        const file  = job.data.file;

        console.log('Validating the file', file);

        return await this.validateAndWriteFile(file, job.id!); 
    }

    async validateAndWriteFile(file: string, jobId: string): Promise<string> {
        console.log('transforming the file');
        const srcName = basename(file);            
        const output = `./output/transformed-${srcName}`;

        const validatedData: string[] = [];
        const errorData: string[] = [];
        const validate = new Promise<{ errorData: string[], validatedData: string[] }>(function(resolve, reject){
            fs.createReadStream(file)
            .pipe(csvparser())        
            .on('data', (data) => {                               
                const regExp = /[a-z0-9._%+-]+@[a-z0-9.-]+\.[a-z]{2,3}$/
                if (data['Name'] === '') {
                    errorData.push(`${data['Name']},${data['Email']},${data['EmployeeId']}`);
                } else if (!regExp.test(data['Email'])) {
                    errorData.push(`${data['Name']},${data['Email']},${data['EmployeeId']}`);
                } else {
                    validatedData.push(`${data['Name']},${data['Email']},${data['EmployeeId']}`);
                }
            })
            .on('end', () => {
                resolve({
                    errorData,
                    validatedData,
                });
            });
        });

        const result: {
            errorData: string[],
            validatedData: string[],
        } = await validate;

        console.log('Any invalid data ', result?.errorData);

        const csvData = validatedData.map((e) => {
            return e.replace(/;/g, ",");
        });
        fs.writeFile(output, csvData.join("\r\n"), (err) => {
            console.log(err || 'done')
        });
        return output;
    }
}

Once all the chunk file jobs are processed, the parent job that merges these files starts processing.

One key benefit of a Flow is that the parent job sits in the waiting-children state until its child jobs complete. Once all the children are done, the parent moves to the waiting state and is then picked up for processing.
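
If you want to observe these states, the JobNode returned by flowProducer.add (and therefore by addChunksToQueue in the split processor above) exposes the parent and child jobs. A minimal sketch, assuming the split processor captures that return value:

    const flow = await this.addChunksToQueue(chunksOfInputFile);
    if (flow) {
        console.log(await flow.job.getState());                // 'waiting-children'
        console.log(await flow.children?.[0].job.getState());  // e.g. 'waiting' or 'active'
    }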

Merging the files

Another advantage of Flow is that parent jobs can access the result of all the children jobs. In this case, we will be processing all the transformed files and merging them into a single file. The resulting file will contain all the valid data that we can use for further processing.


@Processor('merge-data-queue')
export class MergeDataProcessor extends WorkerHost {    
        
    async process(job: Job): Promise<void> {
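        // getChildrenValues() returns a map of child job keys to their return values
        // (here, the paths of the transformed chunk files).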
        const transformedChunks = await job.getChildrenValues();
        const files = Object.values(transformedChunks).sort();

        console.log('Start merging data into a single file', files);

        await this.mergeFiles(
            job.id, 
            files,
            `./output/merged-${job.id}.csv`
        );
    }

    async mergeFiles(
        jobId: string,
        files: string[],
        finalOutputFileName: string,
    ): Promise<void> {

        const data: Buffer[] = [];
        files.forEach( (file) => {
            const fileData = fs.readFileSync(file);
            data.push(fileData);
        });
        fs.writeFile(finalOutputFileName, data.join("\r\n"), (err) => {
            console.log(err || 'done')
        });
    }
}

Flows are a great feature when you are processing a large data set with CPU-intensive jobs.

Conclusion

In this post, I showed how to use Bull Queue Flows in a NestJS application. One can easily use this feature for various tasks that need some kind of parent-child relationship. The code for this demo is available in the bull queue GitHub repository.

Best Practices for Securing Spring Security Applications with Two-Factor Authentication

As more and more sensitive information is being shared online, it is becoming increasingly important to have strong security measures in place to protect against unauthorized access. One way to enhance the security of web applications is to implement two-factor authentication (2FA) in addition to traditional username and password authentication. In this article, we’ll explore best practices for securing Spring Security applications with 2FA.

What is Two-Factor Authentication?

Two-factor authentication (2FA) is a security process that requires users to provide two different authentication factors to access an application. These factors typically fall into three categories:

  1. Something the user knows, such as a password or PIN
  2. Something the user has, such as a smart card or mobile device
  3. Something the user is, such as a biometric identifier like a fingerprint or facial recognition

By requiring two different authentication factors, 2FA adds an extra layer of security to traditional username and password authentication.

Why Use Two-Factor Authentication in Spring Security Applications?

Implementing 2FA in Spring Security applications can greatly enhance the security of your application. Here are some reasons why:

  1. Protection against password guessing attacks: Attackers may use automated scripts to guess passwords until they gain access to an account. 2FA adds an extra layer of security, making it more difficult for attackers to gain access.
  2. Compliance with security regulations: Many industries have regulations that require 2FA, such as the Payment Card Industry Data Security Standard (PCI DSS).
  3. Enhanced user trust: By providing an additional layer of security, users will have more trust in your application, which can lead to increased usage and adoption.

Best Practices for Implementing Two-Factor Authentication in Spring Security Applications

Here are some best practices for implementing 2FA in Spring Security applications:

1. Choose a secure 2FA method

When implementing 2FA, it is important to choose a secure method for the second factor. Some common 2FA methods include:

  • SMS authentication: Sends a unique code to the user’s mobile phone via SMS
  • Mobile authentication app: Users install an app on their mobile device to generate a unique code
  • Email authentication: Sends a unique code to the user’s email address

Each of these methods has its pros and cons. It is important to choose a method that is secure and convenient for your users.

2. Use a separate authentication provider

When implementing 2FA, it is recommended to use a separate authentication provider for the second factor. This provider should be separate from the authentication provider used for the initial login. This helps prevent attackers from gaining access to both factors.

3. Store sensitive data securely

Sensitive data, such as user passwords and 2FA codes, should be stored securely. This includes hashing and salting passwords and encrypting 2FA codes.

4. Provide clear instructions to users

Users may be unfamiliar with 2FA and may need clear instructions on how to use it. It is important to provide clear and concise instructions to users to help them understand how to use 2FA.

5. Test thoroughly

Before implementing 2FA in a production environment, it is important to test thoroughly to ensure that it is working correctly. This includes testing for usability, security, and performance.

Conclusion

In this post, I shared some of the best practices that one can follow while implementing two-factor authentication (2FA) in a Spring Security application. You can learn more about Spring Security in my book Simplifying Spring Security.

Outbox Pattern – Microservice Architecture

In this post, I will talk about the Outbox pattern for microservice architecture. There are various design patterns that you will need as you build scalable microservices. The outbox pattern is one such pattern in your toolbox.

As you build more services, you will notice how and when to use such patterns. Other than design patterns, you will need communication patterns between microservices.

When to use Outbox Pattern?

In many scenarios, a microservice (say, Microservice A) will receive data from a client or another microservice. Microservice A has to save and process the data, and then it needs to send the processed data to another microservice for further processing. This looks like the diagram below:

Outbox Pattern - Microservice Architecture

Basically, Microservice A has to process and send data in the same transaction. This can be error-prone in many cases: Microservice A may save the data in the database but fail to send it to the other microservice.

In many eCommerce platforms, an Order Service receives order data and needs to send the order details and calculated amount to an Invoice Service to bill for that transaction. In such cases, saving and sending the data in the same transaction is not the best design. That’s where the Outbox pattern comes into play; we will talk about this pattern in the next section.

What is Outbox Pattern?

When a microservice receives critical data, it persists the data in a database. The data is then available to be published to another microservice that requires it.

You can schedule a job that regularly publishes this critical data to another microservice. This way, we separate the two processes of saving and publishing.

It will look like below:

Outbox Pattern - how to design

As you can see above, Microservice A stores the transaction in a database. A regularly scheduled job reads that transaction data and publishes it as a domain event to the message broker. Microservice B consumes the event and processes it further.

There is another option with the Outbox pattern. When Microservice A stores the transaction data for an entity in the database, it can copy the same data into another table called outbox within the same database transaction. The regularly scheduled job then picks up the domain events from the outbox table and publishes them to the message broker.
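
To make this more concrete, below is a minimal TypeScript/NestJS sketch of the outbox-table variant. It assumes a Prisma schema with hypothetical order and outbox models and the @nestjs/schedule package (with ScheduleModule registered) for the polling job; the broker publish call is left as a placeholder.

import { Injectable } from '@nestjs/common';
import { Cron } from '@nestjs/schedule';
import { PrismaService } from './prisma.service';

@Injectable()
export class OrderService {
  constructor(private readonly prisma: PrismaService) {}

  // Save the entity and its outbox event in the SAME database transaction.
  async createOrder(order: { id: string; amount: number }) {
    await this.prisma.$transaction([
      this.prisma.order.create({ data: order }),
      this.prisma.outbox.create({
        data: { eventType: 'OrderCreated', payload: JSON.stringify(order) },
      }),
    ]);
  }

  // A scheduled job publishes pending outbox rows and marks them as sent.
  @Cron('*/10 * * * * *')
  async publishOutbox() {
    const events = await this.prisma.outbox.findMany({ where: { sentAt: null } });
    for (const event of events) {
      // Publish the domain event to the message broker here (Kafka, RabbitMQ, ...).
      await this.prisma.outbox.update({
        where: { id: event.id },
        data: { sentAt: new Date() },
      });
    }
  }
}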


Why use this pattern?

If you are working with critical data that needs to be consistent across services, it is beneficial to use the outbox pattern. The pattern ensures data consistency by separating the processes of saving and publishing.

Another advantage of the outbox pattern is that if a microservice does not receive or cannot process an event, that event is still stored in the outbox table and can be reprocessed.

Conclusion

In this post, we discussed the Outbox pattern. It’s a simple pattern to learn and use wherever you need data consistency across services.

If you are looking for a consultation about microservices design, I am available on an as-needed basis.

Building a Scalable NestJS API with AWS Lambda

In this post, we show an example of building a scalable NestJS API with AWS Lambda.

If you are not familiar with the NestJS framework, I would recommend going through their documentation. NestJS is a popular Node.js framework for building scalable and efficient web applications. AWS Lambda is a serverless computing service that enables developers to run code without provisioning or managing servers. We will combine the two technologies to build a scalable and cost-effective API.

Setting up NestJS Project

To set up a NestJS project, we first need to install the NestJS CLI using the following command:

npm install -g @nestjs/cli

Once the CLI is installed, we can create a new project using the following command:

nest new nestjsapi-demo

This will create a new NestJS project in a directory named nestjsapi-demo. We can then navigate to this directory and start the development server using the following command:

npm run start:dev

AWS Lambda Support

To add AWS Lambda support to our NestJS project, we need to install the aws-serverless-express package using the following command:

npm install aws-serverless-express @types/aws-serverless-express @types/aws-lambda

This package allows us to run our NestJS application on AWS Lambda using the Express framework.

To use AWS Lambda resources, we will need an AWS account. Here I assume that you have an AWS account and are able to use your credentials to deploy AWS resources. You can configure your credentials on your local machine with the aws configure command, which stores them in the ~/.aws/credentials file:

aws_access_key_id = YOUR_ACCESS_KEY
aws_secret_access_key = YOUR_SECRET_ACCESS_KEY

Creating an AWS Lambda Function

When you deploy a Serverless project with HTTP events, it generates an API Gateway by default. Each Serverless function needs an entry point, so we will build our NestJS project into a Lambda handler.

To create an AWS Lambda function, we need to create a new file called lambda.ts in the root of our project directory. We can then add the following code to the file:


import { Handler } from 'aws-lambda';
import { createServer, proxy } from 'aws-serverless-express';
import { AppModule } from './app.module';
import * as express from 'express';
import { NestFactory } from '@nestjs/core';
import { ExpressAdapter } from '@nestjs/platform-express';
import { Server } from 'http';

const server = express();

async function bootstrap(): Promise<Server> {
  const app = await NestFactory.create(AppModule, new ExpressAdapter(server));
  app.enableCors();
  await app.init();
  return createServer(server);
}

let cachedServer: Server;

export const handler: Handler = async (event, context) => {
  if (!cachedServer) {
    cachedServer = await bootstrap();
  }
  return proxy(cachedServer, event, context, 'PROMISE').promise;
};

This code bootstraps our NestJS application on top of an Express server and exports an AWS Lambda handler that proxies incoming requests to it. The server instance is cached so that warm invocations reuse it.

We will need a tsconfig.lambda.json file that tells the compiler where the Lambda entry point is and where to emit the compiled output.

{
    "extends": "./tsconfig.json",
    "compilerOptions": {
      "module": "commonjs",
      "outDir": "./dist-lambda",
      "noEmit": false
    },
    "include": ["src/lambda.ts"]
}

Adding scripts to build and deploy

Once we have created the Lambda entry point, we can configure the Serverless Framework to build this Lambda. Let’s create a YAML file named serverless.yml:



service: nestjsapi-demo

plugins:
 - '@hewmen/serverless-plugin-typescript' 
 - serverless-plugin-optimize
 - serverless-offline
 - serverless-plugin-warmup

provider:
 name: aws
 runtime: nodejs14.x

functions:
 main: # The name of the lambda function
   # The module 'handler' is exported in the file 'src/lambda'
   handler: src/lambda.handler
   events:
     - http:
         method: any
         path: /{any+}

We need those serverless plugins to be able to build this project and create a Lambda main function for our entry point.

  • @hewmen/serverless-plugin-typescript: Serverless plugin for TypeScript support that works out of the box without the need to install any other compiler or plugins.
  • serverless-plugin-optimize: Plugin to transpile and minify your code.
  • serverless-offline: Plugin to test your app offline (see the example command after this list).
  • serverless-plugin-warmup: Plugin that mitigates cold starts by creating a scheduled warm-up Lambda.
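
With these plugins in place, you can typically try the API locally before deploying (a quick example, assuming the serverless-offline plugin above is installed):

npx serverless offline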

We will need a couple of scripts to build and deploy our NestJS API app to AWS Lambda.

    "build-lambda": "tsc --project tsconfig.lambda.json",
    "deploy-lambda": "sls deploy",

The build-lambda script builds our NestJS TypeScript project for the Lambda entry point, and deploy-lambda deploys it with the Serverless Framework.

Deploying NestJS application to AWS Lambda

To deploy our NestJS API to AWS Lambda, we need to first create a new AWS Lambda function using the AWS Console. Once the function is created, we can upload our lambda.ts file as the function code.

We also need to configure the function’s handler to be lambda.handler and set the runtime to Node.js 14.x.

We run our script npm run deploy-lambda, which builds and deploys the Lambda function to AWS. (Make sure you have configured your AWS credentials before deploying.)

You might come across this error when deploying the Lambda:

× Stack nestjsapi-demo-dev failed to deploy (0s)
Environment: win32, node 14.19.0, framework 3.28.1, plugin 6.2.3, SDK 4.3.2
Credentials: Local, "default" profile
Docs:        docs.serverless.com
Support:     forum.serverless.com
Bugs:        github.com/serverless/serverless/issues

Error:
Error: [{"messageText":"Unknown compiler option 'incremental'.","category":1,"code":5023},{"messageText":"Unknown compiler option 'strictBindCallApply'.","category":1,"code":5023}]

One way to fix this issue is to remove the incremental: true option from tsconfig.json.

Once you run npm run deploy-lambda, you will see your Lambda deployed in AWS as below:

NestJS API with AWS Lambda

Testing our AWS Lambda Function

To test our AWS Lambda function, we can use the AWS Lambda Console or the AWS CLI. We can invoke the function using the following command:

aws lambda invoke --function-name <function-name> --payload '{}' response.json

This command will invoke our function and store the response in a file called response.json.
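
Since serverless.yml also fronts the function with an API Gateway http event, the sls deploy output prints an HTTP endpoint that you can call directly as well, typically of the form below (the exact URL comes from your own deploy output; this is only a placeholder):

curl https://<api-id>.execute-api.<region>.amazonaws.com/dev/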

Conclusion

In this blog post, we discussed how to build a scalable NestJS API with AWS Lambda. By combining the power of NestJS and AWS Lambda, we can build a scalable and cost-effective API that can handle large amounts of traffic. With the steps outlined in this blog post, you should now be able to create your own NestJS API with AWS Lambda support.

Technical Debt Explained

Tech debt, aka technical debt, is part of the software engineering development process. It is also sometimes known as code debt. The immediate question that pops up: is all tech debt bad?

The simple answer to that question is: it depends.

Some tech debt is intentional and some is unintentional. As we explore this topic as part of the software engineering development process, it is necessary to understand that every developer has to pay back the debt at some point.

What is Tech Debt?

Technical debt is a quick and dirty solution to a software problem. In many cases, to launch an MVP (minimum viable product), engineers come up with a quick solution. A quick solution is not necessarily bad, but if it is not thoroughly tested in real-world scenarios, it can cause downstream effects. Is all technical debt bad? The term gives a sense that it is, but it all depends on the scenario you are working in.

If you implement a solution quickly, it might temporarily meet expectations, but it might not always be right. This is where technical debt comes into the picture. Recognizing technical debt is necessary; addressing it depends on the situation.

You cannot entirely prevent technical debt. In many cases, you do not have all the information when you are building a solution, and you have to make assumptions.

Bad code is not necessarily tech debt, but it can incur a cost in the long term, and that would end up being tech debt.

Types of Tech Debt

Let’s look at the different types of tech debt.

Intentional Tech Debt

To release a product to market early, engineers and stakeholders often reduce the scope of the requirements. Everyone involved has to make choices, and that ends up causing intentional tech debt. To get user feedback quickly, the product team makes such a choice and accepts that performance may be poor or the product may be unstable. There is an inherent risk involved with intentional tech debt.

Unintentional Tech Debt

At the other end of the spectrum is unintentional tech debt. Most of this debt arises from the complexity of requirements and products. Product managers, domain experts, and engineers make assumptions in the face of that complexity. With unknowns involved, it becomes hard to predict what will cause issues with the product. In such cases, engineering teams cut corners and test the product insufficiently. If teams can recognize this debt, they can address it, but they have to adjust their development process to do so.

Environment Tech Debt

When engineers build a software product, it involves multiple third parties such as libraries, vendor products, or other software. As time progresses, if engineers don’t keep the product up to date with these components, the product can start accruing environmental tech debt. The most serious form of this debt is a security threat: if engineers don’t patch libraries on time, security vulnerabilities can expose the product to risk. This kind of debt is recognizable over time, and the teams involved need to stay on top of it.

The Effects of Tech Debt

Tech debt can have effects on both the business and the bottom line:

  • Poor user experience
  • Security concerns and issues
  • Reduced time for feature development
  • Possible loss of customers
  • Lower productivity

Managing Tech Debt

As previously stated, engineers need to address tech debt at some point. So how do you manage tech debt?

  1. Make time to address tech debt. Once tech debt is identified, engineering teams should make time to address tech debt to avoid the long-term effect.
  2. Separate good debt from bad debt. Address bad debt sooner, and evaluate whether good debt should remain in the system for long.
  3. Measure the impact of tech debt. If you can create performance metrics around tech debt, measure them. This gives clarity on how the debt affects the product and why it needs to be addressed.
  4. Adopt the new processes and quality reviews to avoid future tech debt.

Conclusion

In this post, we talked about technical debt and the kind of impact it can have on user experience and the business. As an engineer, it can take experience to identify this debt, but always make sure to make time to address it.

If you want to learn more about how to implement two-factor authentication with Spring Security, here are my two previous posts:

  1. How to implement two-factor authentication with Spring Security – Part I
  2. How to implement two-factor authentication with Spring Security – Part II