Godwin Ekuma I learn so that I can solve problems

Asynchronous task processing in Node.js with Bull

5 min read 1652

Asynchronous Task Processing in Node.js With Bull

When handling requests from API clients, you might run into a situation where a request initiates a CPU-intensive operation that could potentially block other requests. Instead of processing such tasks immediately and blocking other requests, you can defer it to be processed in the future by adding information about the task in a processor called a queue. A task consumer will then pick up the task from the queue and process it.

Queues are helpful for solving common application scaling and performance challenges in an elegant way. According to the NestJS documentation, examples of problems that queues can help solve include:

  • Smoothing out processing peaks
  • Breaking up monolithic tasks that may otherwise block the Node.js event loop
  • Providing a reliable communication channel across various services

Bull is a Node library that implements a fast and robust queue system based on Redis. Although it is possible to implement queues directly using Redis commands, Bull is an abstraction/wrapper on top of Redis. It provides an API that takes care of all the low-level details and enriches Redis’ basic functionality so that more complex use cases can be handled easily.

Installation

Before we begin using Bull, we need to have Redis installed. Follow the guide on Redis Labs guide to install Redis, then install Bull using npm or yarn.

npm install bull --save

Or:

yarn add bull

Creating a queue

Create a queue by instantiating a new instance of Bull.

Syntax

Queue(queueName: string, url?: string, opts?: QueueOptions): Queue

The optional url parameter is used to specify the Redis connection string. If no url is specified, bull will try to connect to default Redis server running on localhost:6379

QueueOptions interface

interface QueueOptions {
  limiter?: RateLimiter;
  redis?: RedisOpts;
  prefix?: string = 'bull'; // prefix for all queue keys.
  defaultJobOptions?: JobOpts;
  settings?: AdvancedSettings;
}

RateLimiter

limiter:RateLimiter is an optional field in QueueOptions used to configure maximum number and duration of jobs that can be processed at a time. See RateLimiter for more information.

RedisOption

redis: RedisOpts is also an optional field in QueueOptions. It’s an alternative to Redis url string. See RedisOpts for more information.

AdvancedSettings

settings: AdvancedSettings is an advanced queue configuration settings. It is optional, and Bull warns that shouldn’t override the default advanced settings unless you have a good understanding of the internals of the queue. See AdvancedSettings for more information.

We made a custom demo for .
No really. Click here to check it out.

A basic queue would look like this:

const Queue = require(bull);

const videoQueue - new Queue('video');

Creating a queue with QueueOptions

// limit the queue to a maximum of 100 jobs per 10 seconds
const Queue = require(bull);

const videoQueue - new Queue('video', {
  limiter: {
  max: 100,
  duration: 10000
  }
});

Each queue instance can perform three different roles: job producer, job consumer, and/or events listener. Each queue can have one or many producers, consumers, and listeners.

Producers

A job producer creates and adds a task to a queue instance. Redis stores only serialized data, so the task should be added to the queue as a JavaScript object, which is a serializable data format.

add(name?: string, data: object, opts?: JobOpts): Promise<Job>

A task would be executed immediately if the queue is empty. Otherwise, the task would be added to the queue and executed once the processor idles out or based on task priority.

You can add the optional name argument to ensure that only a processor defined with a specific name will execute a task. A named job must have a corresponding named consumer. Otherwise, the queue will complain that you’re missing a processor for the given job.

Job options

Jobs can have additional options associated with them. Pass an options object after the data argument in the add() method.

Job options properties include:

interface JobOpts {
  priority: number; // Optional priority value. ranges from 1 (highest priority) to MAX_INT  (lowest priority). Note that
  // using priorities has a slight impact on performance, so do not use it if not required.

  delay: number; // An amount of miliseconds to wait until this job can be processed. Note that for accurate delays, both
  // server and clients should have their clocks synchronized. [optional].

  attempts: number; // The total number of attempts to try the job until it completes.

  repeat: RepeatOpts; // Repeat job according to a cron specification.

  backoff: number | BackoffOpts; // Backoff setting for automatic retries if the job fails

  lifo: boolean; // if true, adds the job to the right of the queue instead of the left (default false)
  timeout: number; // The number of milliseconds after which the job should be fail with a timeout error [optional]

  jobId: number | string; // Override the job ID - by default, the job ID is a unique
  // integer, but you can use this setting to override it.
  // If you use this option, it is up to you to ensure the
  // jobId is unique. If you attempt to add a job with an id that
  // already exists, it will not be added.

  removeOnComplete: boolean | number; // If true, removes the job when it successfully
  // completes. A number specified the amount of jobs to keep. Default behavior is to keep the job in the completed set.

  removeOnFail: boolean | number; // If true, removes the job when it fails after all attempts. A number specified the amount of jobs to keep
  // Default behavior is to keep the job in the failed set.
  stackTraceLimit: number; // Limits the amount of stack trace lines that will be recorded in the stacktrace.
}


interface RepeatOpts {
  cron?: string; // Cron string
  tz?: string; // Timezone
  startDate?: Date | string | number; // Start date when the repeat job should start repeating (only with cron).
  endDate?: Date | string | number; // End date when the repeat job should stop repeating.
  limit?: number; // Number of times the job should repeat at max.
  every?: number; // Repeat every millis (cron setting cannot be used together with this setting.)
  count?: number; // The start value for the repeat iteration count.
}


interface BackoffOpts {
  type: string; // Backoff type, which can be either `fixed` or `exponential`. A custom backoff strategy can also be specified in `backoffStrategies` on the queue settings.
  delay: number; // Backoff delay, in milliseconds.
}

A basic producer would look like this:

const videoQueue - new Queue('video')

videoQueue.add({video: 'video.mp4'})

A named job can be defined like so:

videoQueue.add('video'. {input: 'video.mp4'})

Below is an example of customizing a job with job options.

videoQueue.add('video'. {input: 'video.mp4'}, {delay: 3000, attempts: 5, lifo: true, timeout: 10000 })

Consumers

A job consumer, also called a worker, defines a process function (processor). The process function is responsible for handling each job in the queue.

process(processor: ((job, done?) => Promise<any>) | string)

If the queue is empty, the process function will be called once a job is added to the queue. Otherwise, it will be called every time the worker is idling and there are jobs in the queue to be processed.

The process function is passed an instance of the job as the first argument. A job includes all relevant data the process function needs to handle a task. The data is contained in the data property of the job object. A job also contains methods such as progress(progress?: number) for reporting the job’s progress, log(row: string) for adding a log row to this job-specific job, moveToCompleted, moveToFailed, etc.

Bull processes jobs in the order in which they were added to the queue. If you want jobs to be processed in parallel, specify a concurrency argument. Bull will then call the workers in parallel, respecting the maximum value of the RateLimiter .

process(concurrency: number, processor: ((job, done?) => Promise<any>) | string)

As shown above, a job can be named. A named job can only be processed by a named processor. Define a named processor by specifying a name argument in the process function.

process(name: string, concurrency: number, processor: ((job, done?) => Promise<any>) | string)

Event listeners

Throughout the lifecycle of a queue and/or job, Bull emits useful events that you can listen to using event listeners. An event can be local to a given queue instance (worker). Listeners to a local event will only receive notifications produced in the given queue instance.

Below is a local progress event.

queue.on('progress', function(job, progress){
  console.log(`${jod.id} is in progress`)
})

Other possible events types include error, waiting, active, stalled, completed, failed, paused, resumed, cleaned, drained, and removed.

By prefixing global: to the local event name, you can listen to all events produced by all the workers on a given queue.

Below is a global progress event.

queue.on('global:progress', function(jobId){
  console.log(`${jobId} is in progress`)
})

Notice that for a global event, the jobId is passed instead of a the job object.

A practical example

Let’s say an e-commerce company wants to encourage customers to buy new products in its marketplace. The company decided to add an option for users to opt into emails about new products.

Because outgoing email is one of those internet services that can have very high latencies and fail, we need to keep the act of sending emails for new marketplace arrivals out of the typical code flow for those operations. To do this, we’ll use a task queue to keep a record of who needs to be emailed.

const Queue = require('bull');
const sgMail = require('@sendgrid/mail');
sgMail.setApiKey(process.env.SENDGRID_API_KEY);

export class EmailQueue{
  constructor(){
    // initialize queue
    this.queue = new Queue('marketplaceArrival');
    // add a worker
    this.queue.process('email', job => {
      this.sendEmail(job)
    })
  }
  addEmailToQueue(data){
    this.queue.add('email', data)
  }
  async sendEmail(job){
    const { to, from, subject, text, html} = job.data;
    const msg = {
      to,
      from,
      subject,
      text,
      html
    };
    try {
      await sgMail.send(msg)
      job.moveToCompleted('done', true)
    } catch (error) {
      if (error.response) {
        job.moveToFailed({message: 'job failed'})
      }
    }
  }
}

Conclusion

By now, you should have a solid, foundational understanding of what Bull does and how to use it.

To learn more about implementing a task queue with Bull, check out some common patterns on GitHub.

200’s only Monitor failed and slow network requests in production

Deploying a Node-based web app or website is the easy part. Making sure your Node instance continues to serve resources to your app is where things get tougher. If you’re interested in ensuring requests to the backend or third party services are successful, try LogRocket. https://logrocket.com/signup/

LogRocket is like a DVR for web apps, recording literally everything that happens on your site. Instead of guessing why problems happen, you can aggregate and report on problematic network requests to quickly understand the root cause.

LogRocket instruments your app to record baseline performance timings such as page load time, time to first byte, slow network requests, and also logs Redux, NgRx, and Vuex actions/state. .
Godwin Ekuma I learn so that I can solve problems

2 Replies to “Asynchronous task processing in Node.js with Bull”

  1. In your example, you mean:

    addEmailToQueue(data){
    this.queue.add(’email’, data)
    }

    Instead of

    addEmailToQueue(data){
    this.addEmailToQueue.add(’email’, data)
    }

  2. addEmailToQueue(data){
    this.queue.add(’email’, data)
    }

    Thank you for pointing it out.

Leave a Reply