Using Flows with Bull Queue in a NestJS Application

In this post, I will show how to use the new Flows feature of Bull Queue in a NestJS application. If you are new to Bull Queue, here is my previous post about using Bull Queue in a NestJS application.

What are Flows?

Flows are a new feature of Bull Queue. With this feature, we can establish a parent-child relationship between jobs. A job in a queue can create a number of child jobs and push them into another queue for processing.

In real-world applications, you often need to split large images, video files, or even text files into smaller chunks, process them, and bind them together again. Flows are an ideal candidate for implementing these splitting and binding operations while processing the chunks in parallel.

Divide and Conquer and MapReduce are well-known programming techniques. Flows follow the same pattern when processing CPU-intensive jobs.
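Before diving into the full example, here is a minimal, self-contained sketch of a flow. The queue and job names here are made up purely for illustration:

import { FlowProducer } from 'bullmq';

async function addSampleFlow() {
  // With no options, FlowProducer connects to Redis on localhost:6379 by default.
  const flowProducer = new FlowProducer();

  // The parent job stays in the waiting-children state
  // until both child jobs have completed.
  await flowProducer.add({
    name: 'parent-job',
    queueName: 'parent-queue',
    children: [
      { name: 'child-job', queueName: 'child-queue', data: { chunk: 1 } },
      { name: 'child-job', queueName: 'child-queue', data: { chunk: 2 } },
    ],
  });
}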

File Validation And Processing

Now, let’s look at a real example to understand how Flows work. As part of this example, we will process a large file. This large file will go through validation, and the valid records will be processed further and merged into another file or stored in the database. To understand this better, look at the diagram below:

Flows with Bull Queue

I will not cover the basics of getting started with a NestJS application, but you can look at this previous post to get started.

BullMQ currently offers Flows as a feature. One key thing to note is that BullMQ and Bull are different open-source libraries with largely overlapping feature sets, and both work with NestJS. Bull, however, does not offer Flows yet.

Install bullmq and its NestJS integration in your application:

npm i bullmq

npm i @nestjs/bullmq

Splitting a large file

To understand Flows better, let’s create our design flow first.

  • Upload a large file.
  • A bull job will split the file into chunks.
  • We will create a Flow to process each of these chunks.
  • The parent job of the flow will merge the result of all chunks into another file.

We are using a NestJS application as part of this demo and we have a controller to upload the file.

Before that, though, we will configure our Bull queues with a Redis connection.

import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { AppController } from './app.controller';
import { BullBoardController } from './bull-board-controller';
import { FileUploadProcessor } from './file-upload.processor';
import { PrismaService } from './prisma.service';
import { UserService } from './user.service';
import { TransformFileProcessor } from './transform-file-processor';
import { SplitFileProcessor } from './split-file.processor';
import { MergeDataProcessor } from './merge-data.processor';
import { BullModule } from '@nestjs/bullmq';

@Module({
  imports: [    
    BullModule.forRootAsync({
      imports: [ConfigModule],
      useFactory: async (configService: ConfigService) => ({
        connection: {
          host: configService.get('REDIS_HOST'),
          port: Number(configService.get('REDIS_PORT')),
        }
      }),
      inject: [ConfigService],
    }),
    BullModule.registerQueue({
      name: 'file-upload-queue'
    },
    {
      name: 'split-file-queue',
    },
    {
      name: 'transform-file-queue',
    },
    {
      name: 'merge-data-queue'
    }),
    BullModule.registerFlowProducer({
      name: 'merge-all-files',
    }),
  ],
  controllers: [AppController, BullBoardController],
  providers: [UserService, PrismaService, FileUploadProcessor, 
           TransformFileProcessor, 
           SplitFileProcessor, 
           MergeDataProcessor],
})
export class AppModule {}

We have configured the Redis connection, registered the queues, and registered the flow producer.
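The ConfigService above reads the Redis connection details from environment variables. Assuming a local Redis instance on the default port, a minimal .env file could look like this:

REDIS_HOST=localhost
REDIS_PORT=6379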

The file upload is exposed as a REST API in our controller.


  @Post('/uploadALargeFile')
  @UseInterceptors(FileInterceptor("csv", {
    storage: diskStorage({
      destination: './csv',
      filename: (req, file, cb) => {
        // Build a random 32-character hex file name, keeping the original extension.
        const randomName = Array(32).fill(null).map(() => Math.round(Math.random() * 16).toString(16)).join('');
        cb(null, `${randomName}${extname(file.originalname)}`);
      }
    })
  }))
  async uploadLargeCsvFile(@UploadedFile() file): Promise<void> {
    const job = await this.splitFileQueue.add('split', { file: file });
    console.log(`created job ${job.id}`);
  }

We use FileInterceptor to store the uploaded file on the local disk and then add the file to a Bull Queue job. This job will process the file further.
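For completeness, the splitFileQueue used above is obtained through constructor injection. A minimal sketch, assuming the queue is injected into the same AppController we registered in AppModule:

import { Controller } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bullmq';
import { Queue } from 'bullmq';

@Controller()
export class AppController {
  constructor(
    // Injects the queue registered under 'split-file-queue' in AppModule.
    @InjectQueue('split-file-queue') private readonly splitFileQueue: Queue,
  ) {}
}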

split-file-queue will process the job. It will read the file, split the large file into chunks of 500 rows each, and then add all of these chunks to another queue that transforms each chunk file.


    async process(job: Job) {        
        const file  = job.data.file;        
        const filePath = file.path;
        const chunksOfInputFile = await this.splitInChunks(filePath);
        console.log(chunksOfInputFile);
        await this.addChunksToQueue(chunksOfInputFile);
    }
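The splitInChunks helper is not shown in this post, but a minimal sketch of what it might look like is below, assuming the chunks are written to a local ./chunks directory. Each chunk keeps the CSV header so the parser can still map the columns during the transform step:

    async splitInChunks(filePath: string): Promise<string[]> {
        // Read all rows, keep the header aside, and cut the rest into 500-row chunks.
        const lines = fs.readFileSync(filePath, 'utf-8').split(/\r?\n/).filter(Boolean);
        const header = lines.shift();
        const chunkFiles: string[] = [];

        for (let i = 0; i < lines.length; i += 500) {
            const chunkPath = `./chunks/chunk-${i / 500}.csv`;
            fs.writeFileSync(chunkPath, [header, ...lines.slice(i, i + 500)].join('\r\n'));
            chunkFiles.push(chunkPath);
        }
        return chunkFiles;
    }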

Interestingly, we add the chunks to the queue through a Flow, the feature that Bull Queue offers for a parent-child relationship between jobs.

    async addChunksToQueue(files: string[]) {
        // With no options, FlowProducer connects to Redis on localhost:6379 by default.
        const flowProducer = new FlowProducer();

        try {
            // The parent job in merge-data-queue will only run after
            // every transform-file child job has completed.
            return await flowProducer.add({
                name: 'merge-all-files',
                queueName: 'merge-data-queue',
                children: files.map((file) => ({
                    name: 'transform-file',
                    queueName: 'transform-file-queue',
                    data: { file: file },
                })),
            });
        } catch (err) {
            console.log(`Error adding flow ${err}`);
        }
    }

In the above code, we created a flow named merge-all-files. The parent job will be processed in merge-data-queue, and transform-file-queue will process each of the files we split previously.

Transforming the file

In transform-file-queue, we read the file and validate each record. As part of this transformation, we store error records separately from the valid records. The valid records from each file job are then written to a transformed file.


import * as fs from 'fs';
import { basename } from 'path';
import csvparser from 'csv-parser';
import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Job } from 'bullmq';

@Processor('transform-file-queue')
export class TransformFileProcessor extends WorkerHost {

    async process(job: Job): Promise<string> {
        const file  = job.data.file;

        console.log('Validating the file', file);

        return await this.validateAndWriteFile(file, job.id!); 
    }

    async validateAndWriteFile(file: string, jobId: string) {
        console.log('transforming the file');
        const srcName = basename(file);            
        const output = `./output/transformed-${srcName}`;

        const validatedData: string[] = [];
        const errorData: string[] = [];
        const validate = new Promise<{ errorData: string[], validatedData: string[] }>(function(resolve, reject){
            fs.createReadStream(file)
            .pipe(csvparser())        
            .on('data', (data) => {                               
                const regExp = /[a-z0-9._%+-]+@[a-z0-9.-]+\.[a-z]{2,3}$/
                if (data['Name'] === '') {
                    errorData.push(`${data['Name']},${data['Email']},${data['EmployeeId']}`);
                } else if (!regExp.test(data['Email'])) {
                    errorData.push(`${data['Name']},${data['Email']},${data['EmployeeId']}`);
                } else {
                    validatedData.push(`${data['Name']},${data['Email']},${data['EmployeeId']}`);
                }
            })
            .on('error', (err) => reject(err))
            .on('end', () => {
                resolve({
                    errorData,
                    validatedData,
                });
            });
        });

        const result: {
            errorData: string[],
            validatedData: string[],
        } = await validate;

        console.log('Any invalid data ', result?.errorData);

        // Write synchronously so the transformed file is fully on disk
        // before its path is returned to the parent job as the result.
        const csvData = result.validatedData.map((e) => e.replace(/;/g, ','));
        fs.writeFileSync(output, csvData.join('\r\n'));
        return output;
    }
}

Once all the file chunk jobs are complete, the parent job that merges the files will start processing.

One key benefit of Flows is that the parent job stays in the waiting-children state until all of its child jobs complete. Only then is it moved into the wait state, after which it starts processing.
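You can observe these state transitions yourself. A small sketch, assuming you have a handle to merge-data-queue and the parent job's id:

const parentJob = await mergeDataQueue.getJob(parentJobId);
// Reports 'waiting-children' until every transform-file child has completed.
console.log(await parentJob.getState());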

Merging the files

Another advantage of Flows is that a parent job can access the results of all its child jobs. In this case, we will process all the transformed files and merge them into a single file. The resulting file will contain all the valid data that we can use for further processing.


import * as fs from 'fs';
import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Job } from 'bullmq';

@Processor('merge-data-queue')
export class MergeDataProcessor extends WorkerHost {

    async process(job: Job): Promise<void> {
        // getChildrenValues returns the values returned by each child job,
        // in this case the paths of the transformed files.
        const transformedChunks = await job.getChildrenValues();
        const files = Object.values(transformedChunks).sort();

        console.log('Start merging data into a single file', files);

        await this.mergeFiles(
            job.id, 
            files,
            `./output/merged-${job.id}.csv`
        );
    }

    async mergeFiles(
        jobId: string,
        files: string[],
        finalOutputFileName: string,
    ): Promise<void> {
        // Read every transformed chunk and concatenate them into one file.
        const data: string[] = [];
        files.forEach((file) => {
            const fileData = fs.readFileSync(file, 'utf-8');
            data.push(fileData);
        });
        fs.writeFile(finalOutputFileName, data.join('\r\n'), (err) => {
            console.log(err || 'done');
        });
    }
}

Flows are a great feature if you are processing a large data set with CPU-intensive jobs.

Conclusion

In this post, I showed how to use Bull Queue Flows in a NestJS application. One can easily use this feature for various tasks that need some kind of parent-child relationship. The code for this demo is available in the bull queue GitHub repository.