Nkere-Awaji Inwan, Full-stack/GitOps engineer at Mercurie. GCP fanboy. I write code and about code.

Getting started with Cloud Pub/Sub in Node


Introduction

In recent years, event-driven architecture (EDA) has been on the rise due to the popularity of microservices. Microservices are small, independently deployed units that communicate with one another to make up a full-fledged application. Maintaining resilient communication between these services can be a hassle because of their individuality and modularity, which is why message brokers are needed.

A message broker, simply put, is an intermediary software that relays messages from one or more services to other services. An example of such software is Cloud Pub/Sub.

Cloud Pub/Sub is a fully managed message broker from Google that relays information from one or more services to other subscribing services using the publisher-subscriber model. Pub/Sub makes it easy for teams to set up communication channels between services without having to worry about individual service uptime, retry policies, and idempotency.

In this article, we will be taking a look at how we can implement inter-service communications using Cloud Pub/Sub in Node.js.

Prerequisites

This tutorial uses the following:

  1. Basic knowledge of Node.js
  2. Yarn or NPM installed (we’ll be using yarn)
  3. Git installed
  4. Ngrok installed
  5. A Google account
  6. A development machine configured to run Node.js

Installation

Run the following commands in any terminal of choice to initialize the project directory:

$ git clone -b boilerplate https://github.com/enkaypeter/cloud-pub-sub-tut/
$ cd cloud-pub-sub-tut && yarn

Packages installed:

  • express — A lightweight Node.js web framework for spinning up RESTful APIs. We will use this to handle routing in our backend API
  • nodemon — This package will help us automatically restart our development server when we make code changes/edits
  • body-parser — A middleware to parse incoming request inputs into our req.body object
  • morgan — HTTP request logger middleware for Node.js. This will help us debug our API while in development
  • helmet — A collection of middleware functions that help secure our express-based server by setting HTTP headers that conform to security best practices
  • cors — This package will help enable cross-origin resource sharing on our server
  • dotenv — This package will give us access to the environment variables defined in a .env file from our Node application via the process.env object
  • @google-cloud/pubsub — This is the Node.js client for Cloud Pub/Sub. We will be using this to publish messages and subscribe to topics defined in our Pub/Sub console

Setting up a service account for Pub/Sub on GCP

To initialize Pub/Sub in our Node app, we will need to get a service account configured with Pub/Sub access on our GCP (Google Cloud Platform) console. We will do so in the following steps:

  • Log in to the Google Cloud Console and select a project or follow the prompt to create one if it’s your first time
  • Navigate to the Pub/Sub Section to enable the API for your project
  • Head over to the Service Accounts section to select a project and create a service account like so:

Enter a service account name, e.g., pub-sub-key, and click on Create.

  • Grant Pub/Sub Admin access to the service account like so:

Click on Continue to grant access to the service account

  • Click on Create Key, select the JSON option, and download the JSON key file. We will be needing this file inside our Node project directory to authenticate the Pub/Sub client
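
If you’d rather stay on the command line, the same service account setup can be done with the gcloud CLI (a sketch, assuming gcloud is installed and PROJECT_ID is replaced with your own project ID):

$ gcloud iam service-accounts create pub-sub-key
$ gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:pub-sub-key@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/pubsub.admin"
$ gcloud iam service-accounts keys create pub-sub-key.json \
    --iam-account="pub-sub-key@PROJECT_ID.iam.gserviceaccount.com"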

Definition of terms

In this section, we will define a couple of terms that will be used throughout this article. We will also take a look at a typical message flow from the publisher to subscriber.

  • Message — This is the data entity that is relayed to subscribing services on a particular topic
  • Topic — As with every conversation, there is a theme of communication. A topic is a named theme that represents a feed of messages
  • Subscription — This is a named entity that represents the stream of messages from a single topic, to be delivered to the subscribing entities
  • Subscriber — This is an entity that is set on course to receive and acknowledge messages from a particular topic, either by push or pull
  • Publisher — This is an entity that creates and broadcasts a message to subscribing services on a topic

To properly understand how a message is passed from publisher to subscriber, we will take a look at a flow diagram from the Pub/Sub documentation:

pub sub flow

  1. The publisher application (e.g., an orders service) sends a message (an orders object) to a topic (orders_topic)
  2. Pub/Sub ensures the message is retained in storage until every service subscribing to the said topic (orders_topic) acknowledges receipt
  3. Pub/Sub then fans out the message to all subscribing entities on the orders_topic
  4. A subscriber then receives the message from Pub/Sub, either by Pub/Sub pushing it to a subscriber-configured endpoint or by the subscriber pulling it from Pub/Sub
  5. The subscriber sends an acknowledgment to Pub/Sub for each message received, and Pub/Sub removes the acknowledged message from the message queue

Problem domain and proposed solution

In this section, we will state our problem domain and proposed solution (proof of concept) which we will be building together as we go further.

Problem domain

Consider, for instance, an e-commerce solution made up of multiple services: an orders service, a delivery service, and a notification service. The orders service receives the user’s order, processes it, and sends it to the delivery service. The delivery service then processes it and, from time to time, notifies the user (through the notification service) of the delivery status of the package.

Implementing this software architecture by making regular HTTP calls between these services will cause anomalous system behavior once edge cases such as service downtime or the addition of more services are introduced.

Proposed solution

Cloud Pub/Sub, being a message broker, can receive the order message from the orders service and act as a relay between the orders service and the other services (delivery, notification). Pub/Sub has message retry policies that help curb missing orders due to service downtime or failure. It also makes it possible for teams to add more services to the application stack and still keep the entire system in rhythm, since new and existing services can simply subscribe to one or more topics.

Building the publisher and subscriber

In this section, we will build out our publisher and subscriber (push and pull) logic, which spans three entry-point files — orders.js (publisher), delivery-sub.js (subscriber A), and notification-sub.js (subscriber B). These three files represent the three services in our microservice architecture. Each service has its own routes and controllers, but they all share the same pub-sub repository. The pub-sub repository houses reusable pub-sub functions for publishing messages and receiving published messages. A pictorial representation of the boilerplate branch can be seen below:

project directory structure

Pub/Sub repository

This is where we define all the functions that will enable us to carry out our Pub/Sub-related tasks, e.g., publishing a message and listening for push or pull subscriptions:

// src/repositories/pub-sub-repo.js

module.exports = {
    publishMessage: async (pubSubClient, topicName, payload) => {
        const dataBuffer = Buffer.from(JSON.stringify(payload));
        const messageId = await pubSubClient.topic(topicName).publish(dataBuffer);
        console.log(`Message ${messageId} published.`);
        return messageId;
    }
 ...
};

The snippet above shows the publishMessage function, which accepts three parameters: the pubSubClient, topicName, and payload. This function serializes the JSON payload into a buffer (the message format Pub/Sub expects) and then publishes it to the specified topic on execution:

// src/repositories/pub-sub-repo.js

module.exports = {
  ...

  listenForPullMessages: (pubSubClient, subscriptionName, timeout) => {
      const subscription = pubSubClient.subscription(subscriptionName);
      let messageCount = 0;
      const messageHandler = message => {
          console.log(`Received message ${message.id}:`);
          console.log(`\tData: ${message.data}`);
          console.log(`\tAttributes: ${JSON.stringify(message.attributes)}`);
          messageCount += 1;
          message.ack();
      };
      subscription.on('message', messageHandler);
      setTimeout(() => {
          subscription.removeListener('message', messageHandler);
          console.log(`${messageCount} message(s) received.`);
      }, timeout * 1000);
  }

  ...

};

The snippet above shows a subscriber function that pulls messages broadcast to a subscription tied to a topic. On execution, this function listens for messages fanned out from the publisher for timeout * 1000 ms (if timeout = 60, the listener listens for 60 seconds, i.e., one minute).
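
Note that message.data arrives as a Buffer, so to recover the original order object, the message handler could deserialize it (a minimal sketch, assuming the publisher sent a JSON-serialized payload as shown earlier; the itemId field is purely illustrative):

// Inside the message handler above: message.data is a Buffer,
// so parse it back into the original JSON payload
const order = JSON.parse(message.data.toString('utf-8'));
console.log(`Received order for item ${order.itemId}`);

Next up is the function that handles messages delivered via push subscriptions: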

// src/repositories/pub-sub-repo.js

module.exports = {
  ...

  listenForPushMessages: (payload) => {
    const message = Buffer.from(payload, 'base64').toString(
        'utf-8'
    );
    let parsedMessage = JSON.parse(message);
    console.log(parsedMessage);
    return parsedMessage;
  }

  ...

}

The snippet above accepts a base64-encoded message delivered to a configured push endpoint and parses it into JSON format for consumption by the individual subscribers.
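
For context, when Pub/Sub pushes a message to an endpoint, the request body arrives as a JSON envelope with the payload base64-encoded in the data field. An illustrative example (the data value below decodes to {"itemId":1,"quantity":2}; IDs and names are made up):

{
  "message": {
    "attributes": {},
    "data": "eyJpdGVtSWQiOjEsInF1YW50aXR5IjoyfQ==",
    "messageId": "1234567890"
  },
  "subscription": "projects/PROJECT_ID/subscriptions/delivery_sub"
}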



Building the publisher

The base logic for our publisher lives in the src/controllers/orders-controller.js file. This acts as the orders service, which accepts orders from users, processes them, and then sends a message to the concerned services (delivery and notifications) notifying them of a new order:

// src/controllers/orders-controller.js

const { PubSub } = require("@google-cloud/pubsub");
const pubsubRepository = require("../repositories/pub-sub-repo");

const pubSubClient = new PubSub();
const topicName = "orders_topic";
const { publishMessage } = pubsubRepository;

module.exports = {
    ...

    createOrders: async (req, res) => {
        let ordersObj = req.body;
        let messageId = await publishMessage(pubSubClient, topicName, ordersObj);
        return res.status(200).json({
            success: true,
            message: `Message ${messageId} published :)`
        })
    }

};

The snippet above shows the createOrders method, which accepts the orders request body and publishes the object to the orders_topic.
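
The route file itself isn’t shown in this article; below is a hypothetical sketch of how createOrders might be wired up in src/routes/orders.js (the POST path is an assumption):

// src/routes/orders.js (hypothetical sketch)
const express = require('express');
const router = express.Router();
const { createOrders } = require('../controllers/orders-controller');

// POST /api/orders: accept an order payload and publish it to orders_topic
router.post('/', createOrders);

module.exports = router;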

Building the subscribers

There are two subscribers, representing two standalone services — delivery and notifications. We will build only the delivery service in this section because the same steps can be recreated for the notifications service:

// src/controllers/delivery-controller.js

const { PubSub } = require("@google-cloud/pubsub");
const pubSubClient = new PubSub();
const subscriptionName = "delivery_sub";
const timeout = 60;
const pubsubRepository = require("../repositories/pub-sub-repo");
const { listenForPullMessages, listenForPushMessages } = pubsubRepository;
module.exports = {
    ...

    pullDelivery: (req, res) => {
        try {
            listenForPullMessages(pubSubClient, subscriptionName, timeout);
            // Respond immediately; pulled messages are logged as they arrive
            return res.status(200).json({
                success: true,
                message: `Listening for pull messages for ${timeout} second(s) :)`
            })
        } catch (error) {
            return res.status(500).json({
                success: false,
                message: "Couldn't receive orders object :(",
                data: error
            })
        }
    },
    pushDelivery: async (req, res) => {
        try {
            let messageResponse = await listenForPushMessages(req.body.message.data);
            return res.status(200).json({
                success: true,
                message: "Message received successfully :)",
                data: messageResponse
            })

        } catch (error) {
            return res.status(500).json({
                success: false,
                message: "Couldn't receive orders object :(",
                data: error
            })                        
        }
    }
};

The pullDelivery function executes the listenForPullMessages function from the pubsubRepository, which accepts three arguments: the pubSubClient, the name of the subscription (the notifications service’s subscription will be called notification_sub), and a timeout of 60 seconds.

The pushDelivery function, since it’s a webhook, passes the message received from Pub/Sub as an argument to the listenForPushMessages function for deserialization into JSON.
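
As with the orders service, the delivery route file isn’t shown here; a hypothetical sketch of how these two controllers might be mounted (the /push path matches the push endpoint configured in the next section, while the /pull path is an assumption):

// src/routes/delivery.js (hypothetical sketch)
const express = require('express');
const router = express.Router();
const { pullDelivery, pushDelivery } = require('../controllers/delivery-controller');

// GET /api/delivery/pull: start listening for pulled messages on delivery_sub
router.get('/pull', pullDelivery);

// POST /api/delivery/push: the webhook Pub/Sub pushes messages to
router.post('/push', pushDelivery);

module.exports = router;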

Connecting the dots

In this section, we’ll head over to our Google Cloud Console to create a topic and subscriptions. We will also learn how to run our three services for a proof of concept.

To create a topic we will navigate to the Pub/Sub section on the cloud console and create the orders_topic like so:

create orders topic

After you click on Create topic, you’ll be routed to the newly created orders_topic page, where you’ll create a subscription as seen below:

order topic page

HTTPS_URL represents the host URL our delivery service is hosted on. Pub/Sub requires all push endpoints to be served over HTTPS. For our notifications service, we will repeat the step above, replacing the Subscription ID with notification_sub and the Endpoint URL with the notifications service’s push endpoint, {{HTTPS_URL}}/api/notifications/push.
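
If you prefer the command line, the topic and both subscriptions can also be created with the gcloud CLI (a sketch, assuming gcloud is installed and authenticated against your project; replace {{HTTPS_URL}} as described above):

$ gcloud pubsub topics create orders_topic
$ gcloud pubsub subscriptions create delivery_sub \
    --topic=orders_topic --push-endpoint={{HTTPS_URL}}/api/delivery/push
$ gcloud pubsub subscriptions create notification_sub \
    --topic=orders_topic --push-endpoint={{HTTPS_URL}}/api/notifications/push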

To get an {{HTTPS_URL}}, we will expose our subscribers publicly with ngrok in the next section.

Running our services

To demonstrate three microservices, we created three entry points (orders.js, notification-sub.js, and delivery-sub.js) in our project directory, as opposed to having just one app.js file.

These files have already been created and bootstrapped in our project repository. Below is how our orders service is defined:

// src/orders.js

const express = require('express');
const morgan = require('morgan');
const bodyParser = require('body-parser');
require('dotenv').config();
const app = express();
const ordersRoute = require('./routes/orders');
const helmet = require('helmet');
const cors = require('cors');
app.use(bodyParser.urlencoded({ extended: false }));
app.use(bodyParser.json());
const { MAIN_PORT, NODE_ENV } = process.env;
NODE_ENV !== "production" ? app.use(morgan('dev')) : app.use(morgan('combined'));
app.use(helmet());
app.use(cors());
app.use('/api/orders', ordersRoute);


app.listen(MAIN_PORT);
if (NODE_ENV !== "production" ) {
    console.log(`Orders service is running at http://localhost:${MAIN_PORT}`);
}

If you’re still on the boilerplate branch, please switch to the master branch by running the command below:

$ git fetch origin master && git checkout master

Before we run our application, we need to create our .env file and copy our service account key into our project directory. The .env file should look something like this:

MAIN_PORT=8001
PORT_1=8002
PORT_2=8003

GCP_PROJ_ID=PROJECT_ID
GOOGLE_APPLICATION_CREDENTIALS=FILENAME.json

Where PROJECT_ID is the GCP project ID and FILENAME is the service account key file name, both as created in the service account setup section.
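
The Pub/Sub client picks up the key file automatically through the GOOGLE_APPLICATION_CREDENTIALS variable. As a quick sanity check, you could also construct a client explicitly with the project ID from the .env file (a hypothetical snippet, not part of the repository; listing topics fails fast if the credentials are misconfigured):

// check-auth.js (hypothetical sanity check)
require('dotenv').config();
const { PubSub } = require('@google-cloud/pubsub');

const pubSubClient = new PubSub({ projectId: process.env.GCP_PROJ_ID });

// getTopics() resolves with an array of Topic objects on success
pubSubClient.getTopics()
  .then(([topics]) => console.log(topics.map(t => t.name)))
  .catch(console.error);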

Now that this is out of the way, let’s set up six terminals to run our three services concurrently. On a Mac using iTerm2, you can create six panes by splitting a single window into two horizontal halves using CMD+SHIFT+D, then splitting each half into three panes by pressing CMD+D twice in each half. If all goes well, your terminal should look like the image below:

iTerm2 to create six terminals

Next up we will run our services locally in the upper half of the terminal by running the following commands on each section like so:

//upper terminal 1 (order service)
$ yarn start:main
//upper terminal 2 (delivery service)
$ yarn start:delivery
//upper terminal 3 (notification service)
$ yarn start:notification
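
These start scripts are defined in the repository’s package.json; a sketch of what they likely look like (entry-point file names taken from the project structure above, run with the nodemon package installed earlier):

// package.json (excerpt, assumed)
"scripts": {
  "start:main": "nodemon src/orders.js",
  "start:delivery": "nodemon src/delivery-sub.js",
  "start:notification": "nodemon src/notification-sub.js"
}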

Then in the lower half, we will provision public URLs for our localhost servers using ngrok by running the following commands in each section like so:

//lower terminal 1 (order service)
$ ngrok http 8001
//lower terminal 2 (delivery service)
$ ngrok http 8002
//lower terminal 3 (notification service)
$ ngrok http 8003

Running the commands in the snippet above should give you a terminal that looks like the image below:
six terminals after ngrok commands are run

Finally, let’s send a sample order request to the orders service to confirm the whole flow works.
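
A minimal example with curl (the exact route path inside the orders router is an assumption based on its mount point, and the message ID in the response will differ):

$ curl -X POST http://localhost:8001/api/orders \
    -H "Content-Type: application/json" \
    -d '{"itemId": 1, "quantity": 2}'

{"success":true,"message":"Message 1234567890 published :)"}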

Conclusion

In this tutorial, we’ve learned what Cloud Pub/Sub is and built a simple case to demonstrate its usage in a service-oriented/microservice architecture. Should you want more information on Cloud Pub/Sub, you can visit the official documentation. For more Pub/Sub-related content, you can check out the Pub/Sub Made Easy series on YouTube.

The source code for this tutorial is available on GitHub as well. Feel free to clone it, fork it, or submit an issue.
