How to Use Pub/Sub with NodeJS

In this post, I show how to use pub/sub pattern with the NodeJS application. We will use the Google Cloud Pub/Sub module for building this sample application.

What is Pub/Sub?

Most architectures used to be synchronous previously. But with the advent of microservices, asynchronous communication is an equal part of the design. Pub/Sub is one such model that allows asynchronous communication. Usually, in event-driven architecture, one service publishes an event and another service consumes that event.

A message broker plays a relay role when it comes to publishing and consuming the messages. Both Google Cloud(Pub-Sub) and AWS offer a service (SNS & SQS) that allows applications to use the Pub-Sub model. Another advantage of Pub/Sub is that it allows to set up a retry policy, covers idempotency. You can learn more about event-driven architecture here.

Push-Pull

In any pub-sub model, there are two patterns of implementation. One is Push and the other is Pull.

In Pull Model

  • The consumer sends a request to pull any messages.
  • Pub/Sub server responds with a message if there are any messages available and not previously consumed.
  • The consumer sends an acknowledgment.

In Push Model

  • The publisher publishes a message to Pub/Sub Server
  • The Pub/Sub server sends the message to the specified endpoint on the consumer side.
  • Once the consumer receives the messages, it sends an acknowledgment.

NodeJS Application

As part of the post, we will create a nodejs application that will use the pub-sub model. This application will send simple messages to Google Cloud pub/sub. We will have another consumer application that will consume this message.

Accordingly, before we write our application, let’s make sure you have installed gcloud emulator on your environment. First, install gcloud sdk depending on what OS you have.

Now initialize the gcloud on your environment and you will have to log in for this

gcloud init

Gcloud will ask a bunch of questions to choose a project and configure a cloud environment.

Now, we will install a pub-sub component emulator for gcloud on our local environment.

gcloud components install pubsub-emulator

Now to get started with pub-sub service, use the following command

gcloud beta emulators pubsub start --project=pubsubdemo --host-port=localhost:8085

This command will start the pubsub service on your machine at localhost:8085. Since we will have to use this service in our application, we will need to know where the service is located. So set two environment variables

PUBSUB_EMULATOR_HOST=localhost:8085

PUBSUB_PROJECT_ID=pubsubdemo

Publisher Application

In general, we have a Publisher application. This application checks if the topic exists in Pub-Sub service and if not, then creates that topic. Once it creates the topic, it sends the data through a message to Pub-Sub service topic.

The code for this application will look like below:


const { PubSub } = require('@google-cloud/pubsub');
require('dotenv').config();

const pubsubClient = new PubSub();

const data = JSON.stringify({
  "userId": "50001",
  "companyId": "acme",
  "companyName": "Acme Company",
  "firstName": "John",
  "lastName": "Doe",
  "email": "john.doe@acme.com",
  "country": "US",
  "city": "Austin",
  "status": "Active",
  "effectiveDate": "11/11/2021",
  "department": "sales",
  "title": "Sales Lead"
});
const topicName = "PubSubExample";

async function createTopic() {
  // Creates a new topic
  await pubsubClient.createTopic(topicName);
  console.log(`Topic ${topicName} created.`);
}

async function doesTopicExist() {
  const topics = await pubsubClient.getTopics();
  const topicExists = topics.find((topic) => topic.name === topicName);
  return (topics && topicExists);
}

if(!doesTopicExist()) {
  createTopic();
}

async function publishMessage() {
    const dataBuffer = Buffer.from(data);

    try {
      const messageId = await pubsubClient.topic(topicName).publish(dataBuffer);
      console.log(`Message ${messageId} published`);
    } catch(error) {
      console.error(`Received error while publishing: ${error.message}`);
      process.exitCode = 1;
    }
}

publishMessage();

Conversely, let’s look at the consumer application.


require('dotenv').config();

const { PubSub } = require(`@google-cloud/pubsub`);

const pubsubClient = new PubSub();
const subscriptionName = 'consumeUserData';
const timeout = 60;
const topicName = 'PubSubExample';

async function createSubscription() {
  // Creates a new subscription
  await pubsubClient.topic(topicName).createSubscription(subscriptionName);
  console.log(`Subscription ${subscriptionName} created.`);
}

async function doesSubscriptionExist() {
  const subscriptions = await pubsubClient.getSubscriptions();
  const subscriptionExist = subscriptions.find((sub) => sub.name === subscriptionName);
  return (subscriptions && subscriptionExist);
}

if(!doesSubscriptionExist()) {
    createSubscription().catch(console.error);
}

const subscription = pubsubClient.subscription(subscriptionName);

let messageCount = 0;

const messageHandler = message => {
  console.log(`message received ${message.id}`);
  console.log(`Data: ${message.data}`);
  messageCount += 1;

  message.ack();
};

subscription.on(`message`, messageHandler);
setTimeout(() => {
  subscription.removeListener('message', messageHandler);
  console.log(`${messageCount} message(s) received`);
}, timeout * 1000);

Basically, this consumer application verifies if the subscription exists, if not it creates a subscription against the topic where our publisher application is sending messages. Once the message arrives in pub-sub topic, the consumer application pulls that message. This application implements the PULL model of pub-sub.

Demo

On starting pubsub service emulator, we will see the log like below:

How to use Pub Sub with NodeJS - PubSub Emulator

Now, let’s execute the publisher application and we will see a console log of message publishing

How to Publish a message to Pub Sub with NodeJS

If you execute the same application, you will not see the message Topic PubSubExample created.

Now if execute the consumer application, we will pull the message that publisher sent to topic.

How to consume message from Pub Sub

Same demo with a simple loom video here.

Conclusion

In this post, I showed how to use Pub Sub with NodeJS application. Pub-Sub is a powerful model to use in enterprise applications. It allows us to build services that can communicate asynchronously. If you have more questions about this topic, feel free to reach out to me.