When handling requests from API clients, you might run into a situation where a request initiates a CPU-intensive operation that could block other requests. Instead of processing such a task immediately and blocking other requests, you can defer it by adding information about the task to a data structure called a queue. A task consumer will then pick up the task from the queue and process it.
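The split between adding a task and processing it later can be pictured with a tiny in-memory sketch. This is not Bull and not persistent; the InMemoryQueue class and its add and drain methods are hypothetical names used only to show the shape of the idea.

```javascript
// Minimal in-memory illustration of the producer / queue / consumer split.
// Hypothetical class for illustration only; it is not part of Bull.
class InMemoryQueue {
  constructor() {
    this.jobs = [];   // pending tasks, oldest first
    this.nextId = 1;  // simple incrementing job id
  }

  // Producer side: record what needs doing and return right away.
  add(data) {
    const job = { id: this.nextId++, data };
    this.jobs.push(job);
    return job;
  }

  // Consumer side: take pending jobs one by one and hand them to a handler.
  drain(handler) {
    const results = [];
    while (this.jobs.length > 0) {
      const job = this.jobs.shift();
      results.push(handler(job));
    }
    return results;
  }
}

const queue = new InMemoryQueue();
queue.add({ video: 'a.mp4' }); // the request handler returns immediately
queue.add({ video: 'b.mp4' });

// Later, a consumer works through whatever has accumulated.
const processed = queue.drain((job) => `transcoded ${job.data.video}`);
console.log(processed);
```

A real queue such as Bull adds what this sketch lacks: persistence in Redis, retries, and consumers that can run in separate processes.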
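Queues are helpful for solving common application scaling and performance challenges in an elegant way. According to the NestJS documentation, queues can help smooth out processing peaks, break up monolithic tasks that may otherwise block the Node.js event loop, and provide a reliable communication channel across various services.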
Bull is a Node library that implements a fast and robust queue system based on Redis. Although it is possible to implement queues directly using Redis commands, Bull is an abstraction/wrapper on top of Redis. It provides an API that takes care of all the low-level details and enriches Redis’ basic functionality so that more complex use cases can be handled easily.
Before we begin using Bull, we need to have Redis installed. Follow the Redis Labs guide to install Redis, then install Bull using npm or yarn.
npm install bull --save
Or:
yarn add bull
Create a queue by instantiating a new instance of Bull.
Queue(queueName: string, url?: string, opts?: QueueOptions): Queue
The optional url parameter is used to specify the Redis connection string. If no url is specified, Bull will try to connect to the default Redis server running on localhost:6379.
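To make the parts of a Redis connection string concrete, here is a rough sketch that splits one with Node's built-in URL class. The describeRedisUrl helper is hypothetical and is not how Bull parses the string internally; the fallback values simply mirror the localhost:6379 default mentioned above.

```javascript
// Hypothetical helper, not part of Bull: splits a Redis connection string
// into its parts using Node's built-in WHATWG URL class.
function describeRedisUrl(connectionString = 'redis://localhost:6379') {
  const url = new URL(connectionString);
  return {
    host: url.hostname,
    // Assumption: fall back to the conventional Redis port when none is given.
    port: url.port ? Number(url.port) : 6379,
    password: url.password || undefined,
    // The path segment, if present, selects the Redis database number.
    db: url.pathname.length > 1 ? Number(url.pathname.slice(1)) : 0,
  };
}

const parsed = describeRedisUrl('redis://:secret@127.0.0.1:6380/2');
const fallback = describeRedisUrl();
console.log(parsed, fallback);
```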
QueueOptions interface
interface QueueOptions {
  limiter?: RateLimiter;
  redis?: RedisOpts;
  prefix?: string = 'bull'; // prefix for all queue keys.
  defaultJobOptions?: JobOpts;
  settings?: AdvancedSettings;
}
RateLimiter
limiter: RateLimiter is an optional field in QueueOptions used to configure the maximum number of jobs that can be processed within a given duration. See RateLimiter for more information.
RedisOpts
redis: RedisOpts is also an optional field in QueueOptions. It is an alternative to the Redis url string. See RedisOpts for more information.
AdvancedSettings
settings: AdvancedSettings holds advanced queue configuration settings. It is optional, and Bull warns that you shouldn’t override the default advanced settings unless you have a good understanding of the internals of the queue. See AdvancedSettings for more information.
A basic queue would look like this:
const Queue = require('bull');
const videoQueue = new Queue('video');
A queue configured with QueueOptions would look like this:
// limit the queue to a maximum of 100 jobs per 10 seconds
const Queue = require('bull');
const videoQueue = new Queue('video', {
  limiter: {
    max: 100,
    duration: 10000
  }
});
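What a limit of max jobs per duration means can be sketched with a simple fixed-window counter. This is only an illustration of the semantics: createLimiter and tryAcquire are hypothetical names, and Bull's own limiter runs inside Redis and may behave differently at window boundaries.

```javascript
// Hypothetical fixed-window limiter, for illustration only (not Bull's code).
// Allows at most `max` job starts per `duration` milliseconds.
function createLimiter({ max, duration }) {
  let windowStart = null; // timestamp at which the current window opened
  let count = 0;          // job starts recorded in the current window

  return function tryAcquire(now) {
    // Open a new window on first use or once the old one has expired.
    if (windowStart === null || now - windowStart >= duration) {
      windowStart = now;
      count = 0;
    }
    if (count < max) {
      count += 1;
      return true;  // the job may start now
    }
    return false;   // the job has to wait for the next window
  };
}

// At most 2 jobs per 1000 ms, probed at a few timestamps (in ms).
const tryAcquire = createLimiter({ max: 2, duration: 1000 });
const decisions = [0, 10, 20, 1000, 1500, 1600].map((t) => tryAcquire(t));
console.log(decisions);
```

The third and sixth attempts are rejected because the two slots of their window are already taken.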
Each queue instance can perform three different roles: job producer, job consumer, and/or events listener. Each queue can have one or many producers, consumers, and listeners.
A job producer creates and adds a task to a queue instance. Redis stores only serialized data, so the task should be added to the queue as a plain JavaScript object that can be serialized to JSON.
add(name?: string, data: object, opts?: JobOpts): Promise<Job>
A task would be executed immediately if the queue is empty. Otherwise, the task would be added to the queue and executed once the processor idles out or based on task priority.
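The effect of task priority on ordering can be sketched as a sort. The processingOrder function below is hypothetical, and it assumes that every job carries a priority, that 1 is the highest priority, and that ties keep insertion order; Bull's actual tie-breaking may differ.

```javascript
// Hypothetical illustration of priority ordering (not Bull's implementation).
// Lower numbers run first; ties are assumed to keep insertion order.
function processingOrder(jobs) {
  return jobs
    .map((job, index) => ({ ...job, index }))
    .sort((a, b) => a.priority - b.priority || a.index - b.index)
    .map((job) => job.name);
}

const order = processingOrder([
  { name: 'thumbnail', priority: 3 },
  { name: 'invoice', priority: 1 },
  { name: 'newsletter', priority: 3 },
  { name: 'password-reset', priority: 1 },
]);
console.log(order);
```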
You can add the optional name argument to ensure that only a processor defined with a specific name will execute a task. A named job must have a corresponding named consumer. Otherwise, the queue will complain that you’re missing a processor for the given job.
Jobs can have additional options associated with them. Pass an options object after the data argument in the add() method.
Job options properties include:
interface JobOpts {
  // Optional priority value. Ranges from 1 (highest priority) to MAX_INT (lowest priority).
  // Note that using priorities has a slight impact on performance, so do not use it if not required.
  priority: number;
  // An amount of milliseconds to wait until this job can be processed. Note that for accurate delays,
  // both server and clients should have their clocks synchronized. [optional]
  delay: number;
  // The total number of attempts to try the job until it completes.
  attempts: number;
  // Repeat job according to a cron specification.
  repeat: RepeatOpts;
  // Backoff setting for automatic retries if the job fails.
  backoff: number | BackoffOpts;
  // If true, adds the job to the right of the queue instead of the left (default false).
  lifo: boolean;
  // The number of milliseconds after which the job should fail with a timeout error. [optional]
  timeout: number;
  // Override the job ID - by default, the job ID is a unique integer, but you can use this setting
  // to override it. If you use this option, it is up to you to ensure the jobId is unique.
  // If you attempt to add a job with an id that already exists, it will not be added.
  jobId: number | string;
  // If true, removes the job when it successfully completes. A number specifies the amount of jobs
  // to keep. Default behavior is to keep the job in the completed set.
  removeOnComplete: boolean | number;
  // If true, removes the job when it fails after all attempts. A number specifies the amount of jobs
  // to keep. Default behavior is to keep the job in the failed set.
  removeOnFail: boolean | number;
  // Limits the amount of stack trace lines that will be recorded in the stacktrace.
  stackTraceLimit: number;
}

interface RepeatOpts {
  cron?: string; // Cron string
  tz?: string; // Timezone
  startDate?: Date | string | number; // Start date when the repeat job should start repeating (only with cron).
  endDate?: Date | string | number; // End date when the repeat job should stop repeating.
  limit?: number; // Number of times the job should repeat at max.
  every?: number; // Repeat every millis (cron setting cannot be used together with this setting.)
  count?: number; // The start value for the repeat iteration count.
}

interface BackoffOpts {
  // Backoff type, which can be either `fixed` or `exponential`. A custom backoff strategy can also
  // be specified in `backoffStrategies` on the queue settings.
  type: string;
  // Backoff delay, in milliseconds.
  delay: number;
}
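To see how the two backoff types spread out retries, here is a small sketch that computes the wait before each retry. The backoffDelay helper is hypothetical, and the exponential formula used here, (2^attemptsMade - 1) * delay, is an assumption for illustration; check Bull's source if you depend on the exact timings.

```javascript
// Hypothetical helper for illustration; not exported by Bull.
// `backoff` follows the JobOpts shape: a number or { type, delay }.
function backoffDelay(backoff, attemptsMade) {
  // A bare number is treated here as a fixed delay in milliseconds.
  const opts = typeof backoff === 'number'
    ? { type: 'fixed', delay: backoff }
    : backoff;

  if (opts.type === 'fixed') {
    return opts.delay; // same wait before every retry
  }
  if (opts.type === 'exponential') {
    // Assumed formula: the wait roughly doubles with every failed attempt.
    return Math.round((Math.pow(2, attemptsMade) - 1) * opts.delay);
  }
  throw new Error(`Unknown backoff type: ${opts.type}`);
}

const fixedDelays = [1, 2, 3].map((n) => backoffDelay(1000, n));
const exponentialDelays = [1, 2, 3].map((n) =>
  backoffDelay({ type: 'exponential', delay: 1000 }, n)
);
console.log(fixedDelays, exponentialDelays);
```

With a 1000 ms base delay, the fixed strategy waits one second each time, while the exponential sketch waits 1, 3, and 7 seconds after the first, second, and third failures.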
A basic producer would look like this:
const videoQueue = new Queue('video');
videoQueue.add({ video: 'video.mp4' });
A named job can be defined like so:
videoQueue.add('video', { input: 'video.mp4' });
Below is an example of customizing a job with job options.
videoQueue.add('video', { input: 'video.mp4' }, { delay: 3000, attempts: 5, lifo: true, timeout: 10000 });
A job consumer, also called a worker, defines a process function (processor). The process function is responsible for handling each job in the queue.
process(processor: ((job, done?) => Promise<any>) | string)
If the queue is empty, the process function will be called once a job is added to the queue. Otherwise, it will be called every time the worker is idling and there are jobs in the queue to be processed.
The process function is passed an instance of the job as the first argument. A job includes all relevant data the process function needs to handle a task. The data is contained in the data property of the job object. A job also contains methods such as progress(progress?: number) for reporting the job’s progress, log(row: string) for adding a log row to the job’s log, moveToCompleted, moveToFailed, etc.
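Because a processor is just a function of the job, it can be exercised without Redis by passing in a stand-in object. The sketch below uses the callback style, (job, done), from the signature above; transcodeProcessor and createStubJob are hypothetical names, and the stub only mimics the data, progress, and log members described here.

```javascript
// A processor in callback style: report progress, then call done(err, result).
function transcodeProcessor(job, done) {
  if (!job.data || !job.data.video) {
    done(new Error('job.data.video is required'));
    return;
  }
  job.log(`starting ${job.data.video}`);
  job.progress(50);
  // ... the real CPU-intensive work would happen here ...
  job.progress(100);
  done(null, { output: job.data.video.replace(/\.mp4$/, '.webm') });
}

// Hypothetical stand-in for a Bull job; it records calls instead of hitting Redis.
function createStubJob(data) {
  const stub = { data, progressUpdates: [], logs: [] };
  stub.progress = (value) => { stub.progressUpdates.push(value); };
  stub.log = (row) => { stub.logs.push(row); };
  return stub;
}

const stubJob = createStubJob({ video: 'video.mp4' });
let outcome;
transcodeProcessor(stubJob, (err, result) => { outcome = { err, result }; });
console.log(outcome, stubJob.progressUpdates, stubJob.logs);
```

With a real queue, the same function would be registered with process() and Bull would supply the job and the done callback.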
Bull processes jobs in the order in which they were added to the queue. If you want jobs to be processed in parallel, specify a concurrency argument. Bull will then call the workers in parallel, respecting the maximum value of the RateLimiter.
process(concurrency: number, processor: ((job, done?) => Promise<any>) | string)
As shown above, a job can be named. A named job can only be processed by a named processor. Define a named processor by specifying a name argument in the process function.
process(name: string, concurrency: number, processor: ((job, done?) => Promise<any>) | string)
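The pairing of named jobs with named processors amounts to a lookup by name. The sketch below is a hypothetical in-memory stand-in (the NamedHandlers class is not part of Bull), and the error text is only an approximation of what a queue reports when no processor matches.

```javascript
// Hypothetical registry illustrating how a job name selects its processor.
class NamedHandlers {
  constructor() {
    this.handlers = new Map();
  }

  // Register a processor under a name, similar in spirit to process(name, fn).
  process(name, handler) {
    this.handlers.set(name, handler);
  }

  // Route a job to the processor registered for its name.
  run(job) {
    const handler = this.handlers.get(job.name);
    if (!handler) {
      throw new Error(`Missing process handler for job type ${job.name}`);
    }
    return handler(job);
  }
}

const registry = new NamedHandlers();
registry.process('email', (job) => `emailed ${job.data.to}`);

const emailResult = registry.run({ name: 'email', data: { to: 'a@example.com' } });

let missingHandlerMessage = null;
try {
  registry.run({ name: 'sms', data: {} }); // no 'sms' processor was registered
} catch (error) {
  missingHandlerMessage = error.message;
}
console.log(emailResult, missingHandlerMessage);
```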
Throughout the lifecycle of a queue and/or job, Bull emits useful events that you can listen to using event listeners. An event can be local to a given queue instance (worker). Listeners to a local event will only receive notifications produced in the given queue instance.
Below is a local progress event.
queue.on('progress', function (job, progress) {
  console.log(`${job.id} is in progress`);
});
Other possible event types include error, waiting, active, stalled, completed, failed, paused, resumed, cleaned, drained, and removed.
By prefixing global: to the local event name, you can listen to all events produced by all the workers on a given queue.
Below is a global progress event.
queue.on('global:progress', function(jobId){ console.log(`${jobId} is in progress`) })
Notice that for a global event, the jobId is passed instead of the job object.
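The difference in listener arguments can be tried out with Node's built-in EventEmitter standing in for a queue. This sketch does not involve Bull or Redis; fakeQueue and the manually emitted events are assumptions used only to show that local listeners receive the job object while global listeners receive the job ID.

```javascript
const { EventEmitter } = require('events');

// Stand-in for a queue instance; events are emitted by hand for illustration.
const fakeQueue = new EventEmitter();
const seen = [];

// Local listener: receives the job object itself.
fakeQueue.on('progress', (job, progress) => {
  seen.push(`local: job ${job.id} at ${progress}%`);
});

// Global listener: receives only the job ID.
fakeQueue.on('global:progress', (jobId, progress) => {
  seen.push(`global: job ${jobId} at ${progress}%`);
});

const sampleJob = { id: 7, data: { video: 'video.mp4' } };
fakeQueue.emit('progress', sampleJob, 40);
fakeQueue.emit('global:progress', sampleJob.id, 40);
console.log(seen);
```

If a global listener needs the job's data, it has to look the job up by its ID first.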
Let’s say an e-commerce company wants to encourage customers to buy new products in its marketplace. The company decided to add an option for users to opt into emails about new products.
Because outgoing email is one of those internet services that can have very high latencies and fail, we need to keep the act of sending emails for new marketplace arrivals out of the typical code flow for those operations. To do this, we’ll use a task queue to keep a record of who needs to be emailed.
const Queue = require('bull');
const sgMail = require('@sendgrid/mail');

sgMail.setApiKey(process.env.SENDGRID_API_KEY);

class EmailQueue {
  constructor() {
    // initialize queue
    this.queue = new Queue('marketplaceArrival');
    // add a worker; returning the promise lets Bull know when the job settles
    this.queue.process('email', job => this.sendEmail(job));
  }

  addEmailToQueue(data) {
    // returns a promise that resolves to the created job
    return this.queue.add('email', data);
  }

  async sendEmail(job) {
    const { to, from, subject, text, html } = job.data;
    const msg = { to, from, subject, text, html };
    // If sgMail.send() rejects, the error propagates and Bull marks the job as failed.
    // If it resolves, Bull moves the job to the completed set.
    await sgMail.send(msg);
    return 'done';
  }
}

module.exports = { EmailQueue };
By now, you should have a solid, foundational understanding of what Bull does and how to use it.
To learn more about implementing a task queue with Bull, check out some common patterns on GitHub.