Maciej Cieslar A JavaScript developer and a blogger @ https://www.mcieslar.com/

A complete guide to threads in Node.js

Many people wonder how a single-threaded Node.js can compete with multithreaded back ends. It may seem counterintuitive that so many huge companies pick Node as their back end, given its supposed single-threaded nature. To know why, we have to understand what we really mean when we say that Node is single-threaded.

JavaScript was created to be just good enough to do simple things on the web, like validate a form or, say, create a rainbow-colored mouse trail. It was only in 2009 that Ryan Dahl, creator of Node.js, made it possible for developers to use the language to write back-end code.

Back-end languages, which generally support multithreading, have all kinds of mechanisms for syncing values between threads and other thread-oriented features. To add support for such things to JavaScript would require changing the entire language, which wasn’t really Dahl’s goal. For plain JavaScript to support multithreading, he had to create a workaround. Let’s explore …

How Node.js really works

Node.js uses two kinds of threads: a main thread handled by the event loop and several auxiliary threads in the worker pool.

The event loop is the mechanism that takes callbacks (functions) and registers them to be executed at some point in the future. It operates in the same thread as the proper JavaScript code. When a JavaScript operation blocks the thread, the event loop is blocked as well.

The worker pool is an execution model that spawns and handles separate threads, which then synchronously perform the task and return the result to the event loop. The event loop then executes the provided callback with said result.

In short, the worker pool takes care of operations that would otherwise block the event loop: primarily interactions with the system’s disk, plus some network-related work such as DNS lookups. It is mainly used by modules such as fs (I/O-heavy) or crypto (CPU-heavy). The worker pool is implemented in libuv, so there is a slight delay whenever Node needs to communicate internally between JavaScript and C++, but this is hardly noticeable.
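To make the division of labor concrete, here is a minimal sketch that hands CPU-heavy hashing to the worker pool through the asynchronous crypto.pbkdf2 call. The deriveKey helper, the salt, and the iteration count are illustrative choices, not something from the original post. While the hashes are computed on pool threads, a timer on the main thread keeps firing.

```typescript
import { pbkdf2 } from 'crypto';

// Wraps the callback-style pbkdf2 in a promise; the hashing runs on a worker-pool thread
function deriveKey(password: string): Promise<Buffer> {
  return new Promise((resolve, reject) => {
    pbkdf2(password, 'example-salt', 50000, 32, 'sha256', (err, key) => {
      if (err) {
        reject(err);
        return;
      }
      resolve(key);
    });
  });
}

// Counts timer ticks on the main thread while the hashes are being computed
let ticksWhileHashing = 0;
const ticker = setInterval(() => {
  ticksWhileHashing += 1;
}, 1);

const hashingDone = Promise.all(['a', 'b', 'c', 'd'].map(deriveKey)).then((keys) => {
  clearInterval(ticker);
  console.log(`derived ${keys.length} keys, main-thread ticks meanwhile: ${ticksWhileHashing}`);
  return keys;
});
```

The tick count varies with the machine, but it is usually above zero, which is the point: the event loop was free to run callbacks while the pool did the work.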

With both of these mechanisms, we are able to write code like this:

import fs from 'fs';
import path from 'path';

fs.readFile(path.join(__dirname, './package.json'), (err, content) => {
  if (err) {
    return null;
  }
  console.log(content.toString());
});

The aforementioned fs module tells the worker pool to use one of its threads to read the contents of a file and notify the event loop when it is done. The event loop then takes the provided callback function and executes it with the content of the file.

The above is an example of non-blocking code; as such, we don’t have to wait synchronously for something to happen. We tell the worker pool to read the file and call the provided function with the result. Since the worker pool has its own threads, the event loop can continue executing normally while the file is being read.

It’s all good until there’s a need to synchronously execute some complex operation: any function that takes too long to run will block the thread. If an application has many such functions, it could significantly decrease the throughput of the server or freeze it altogether. In this case, there’s no way of delegating the work to the worker pool.
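The effect of a long synchronous function can be observed directly. The sketch below is illustrative only: blockFor is a hypothetical stand-in for any expensive calculation. A timer is scheduled with a delay of 0 ms and then the thread is kept busy for 200 ms; the timer callback cannot run until the busy loop has finished.

```typescript
// Stand-in for an expensive synchronous calculation: spins until `ms` have passed
function blockFor(ms: number): void {
  const end = Date.now() + ms;
  while (Date.now() < end) {
    // busy-wait; nothing else can run on this thread meanwhile
  }
}

const scheduledAt = Date.now();

// Resolves with how late the 0 ms timer actually fired
const timerDelay = new Promise<number>((resolve) => {
  setTimeout(() => resolve(Date.now() - scheduledAt), 0);
});

blockFor(200);

timerDelay.then((delay) => {
  console.log(`0 ms timer fired after about ${delay} ms`);
});
```

Every pending callback, including incoming requests on a server, is delayed in the same way for as long as the synchronous work runs.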

Fields that require complex calculations — such as AI, machine learning, or big data — couldn’t really use Node.js efficiently due to the operations blocking the main (and only) thread, making the server unresponsive. That was the case up until Node.js v10.5.0 came about, which added support for multiple threads.

Introducing: worker_threads

The worker_threads module is a package that allows us to create fully functional multithreaded Node.js applications.

A thread worker is a piece of code (usually taken out of a file) spawned in a separate thread.

Note that the terms thread worker, worker, and thread are often used interchangeably; they all refer to the same thing.

To start using thread workers, we have to import the worker_threads module. Let’s start by creating a function to help us spawn these thread workers, and then we’ll talk a little bit about their properties.

import { Worker } from 'worker_threads';

type WorkerCallback = (err: any, result?: any) => any;

export function runWorker(path: string, cb: WorkerCallback, workerData: object | null = null) {
  const worker = new Worker(path, { workerData });

  worker.on('message', cb.bind(null, null));
  worker.on('error', cb);
  worker.on('exit', (exitCode) => {
    if (exitCode === 0) {
      return null;
    }
    return cb(new Error(`Worker has stopped with code ${exitCode}`));
  });

  return worker;
}

To create a worker, we have to create an instance of the Worker class. In the first argument, we provide a path to the file that contains the worker’s code; in the second, we provide an object containing a property called workerData. This is the data we’d like the thread to have access to when it starts running.

Note that whether you use JavaScript itself or something that transpiles to JavaScript (e.g., TypeScript), the path should always refer to files with either .js or .mjs extensions.

I would also like to point out why we used the callback approach as opposed to returning a promise that would be resolved when the message event is fired. This is because workers can dispatch many message events, not just one.

As you can see in the example above, the communication between threads is event-based, which means we are setting up listeners to be called once a given event is sent by the worker.

Here are the most common events:

worker.on('error', (error) => {});

The error event is emitted whenever there’s an uncaught exception inside the worker. The worker is then terminated, and the error is available as the first argument in the provided callback.

worker.on('exit', (exitCode) => {});

exit is emitted whenever a worker exits. If process.exit() was called inside the worker, exitCode would be provided to the callback. If the worker was terminated with worker.terminate(), the code would be 1.

worker.on('online', () => {});

online is emitted whenever a worker stops parsing the JavaScript code and starts the execution. It’s not used very often, but it can be informative in specific cases.

worker.on('message', (data) => {});

message is emitted whenever a worker sends data to the parent thread.
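As a rough illustration of these events, the sketch below spawns a worker that posts two messages and then finishes on its own. To keep the example in one file, the worker's code is passed as a plain JavaScript string with the eval option instead of a file path; the names workerSource, seenEvents, and eventsDone are made up for this example.

```typescript
import { Worker } from 'worker_threads';

// Worker code inlined as plain JavaScript; `eval: true` runs the string instead of loading a file
const workerSource = `
  const { parentPort } = require('worker_threads');
  parentPort.postMessage('first');
  parentPort.postMessage('second');
`;

const seenEvents: string[] = [];

// Resolves with the recorded events once the worker has exited
const eventsDone = new Promise<string[]>((resolve, reject) => {
  const worker = new Worker(workerSource, { eval: true });

  worker.on('online', () => seenEvents.push('online'));
  worker.on('message', (data: string) => seenEvents.push(`message:${data}`));
  worker.on('error', reject);
  worker.on('exit', (exitCode) => {
    seenEvents.push(`exit:${exitCode}`);
    resolve(seenEvents);
  });
});

eventsDone.then((events) => console.log(events));
```

Because a single worker fired message twice here, a promise that resolves on the first message would have silently dropped the second one, which is the reason runWorker takes a callback.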

Now let’s take a look at how the data is being shared between threads.

Exchanging data between threads

To send the data to the other thread, we use the port.postMessage() method. It has the following signature:

port.postMessage(data[, transferList])

The port object can be either parentPort or an instance of MessagePort — more on that later.

The data argument

The first argument — here called data — is an object that is copied to the other thread. It can contain anything the copying algorithm supports.

The data is copied by the structured clone algorithm. Per Mozilla:

It builds up a clone by recursing through the input object while maintaining a map of previously visited references in order to avoid infinitely traversing cycles.

The algorithm doesn’t copy functions, errors, property descriptors, or prototype chains. It should also be noted that copying objects this way differs from serializing them as JSON: the cloned data can contain circular references and typed arrays, for example, whereas JSON cannot represent them.
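These rules can be checked without spawning a worker. The sketch below sends values through a MessageChannel within a single thread and reads them back synchronously with receiveMessageOnPort, so each value goes through the same copying step as a message to a worker would. The helper cloneViaPort and the types CyclicRecord and Vector2 are hypothetical names for this example.

```typescript
import { MessageChannel, receiveMessageOnPort } from 'worker_threads';

// Round-trips a value through a message port, which copies it with the structured clone algorithm
function cloneViaPort<T>(value: T): T {
  const { port1, port2 } = new MessageChannel();
  try {
    port1.postMessage(value); // throws if the value cannot be cloned
    const received = receiveMessageOnPort(port2);
    if (!received) {
      throw new Error('Expected a message on the port');
    }
    return received.message as T;
  } finally {
    port1.close(); // closing one end closes the whole channel
  }
}

interface CyclicRecord {
  label: string;
  samples: Float64Array;
  self?: CyclicRecord;
}

class Vector2 {
  x: number;
  y: number;
  constructor(x: number, y: number) {
    this.x = x;
    this.y = y;
  }
}

const cyclic: CyclicRecord = { label: 'root', samples: new Float64Array([1.5, 2.5]) };
cyclic.self = cyclic; // circular reference, which JSON.stringify would reject

const cyclicCopy = cloneViaPort(cyclic);
const keepsCycle = cyclicCopy.self === cyclicCopy;
const isDistinctObject = cyclicCopy !== cyclic;
const keepsTypedArray = cyclicCopy.samples instanceof Float64Array;

// Class instances arrive as plain objects: own properties survive, the prototype does not
const vectorCopy = cloneViaPort(new Vector2(3, 4));
const keepsPrototype = vectorCopy instanceof Vector2;

// Functions cannot be cloned, so posting one throws
let functionErrorName = '';
try {
  cloneViaPort({ callback: () => 42 });
} catch (err) {
  functionErrorName = (err as Error).name;
}

console.log({ keepsCycle, isDistinctObject, keepsTypedArray, keepsPrototype, functionErrorName });
```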

By supporting the copying of typed arrays, the algorithm makes it possible to share memory between threads.

Sharing memory between threads

People may argue that modules like cluster or child_process enabled the use of threads a long time ago. Well, yes and no.

The cluster module can create multiple node instances with one master process routing incoming requests between them. Clustering an application allows us to effectively multiply the server’s throughput; however, we can’t spawn a separate thread with the cluster module.

People tend to use tools like PM2 to cluster their applications as opposed to doing it manually inside their own code, but if you’re interested, you can read my post on how to use the cluster module.

The child_process module can spawn any executable regardless of whether it’s JavaScript. It is pretty similar, but it lacks several important features that worker_threads has.

Specifically, thread workers are more lightweight and share the same process ID as their parent threads. They can also share memory with their parent threads, which allows them to avoid serializing big payloads of data and, as a result, send the data back and forth much more efficiently.

Now let’s take a look at an example of how to share memory between threads. In order for the memory to be shared, an instance of ArrayBuffer or SharedArrayBuffer must be sent to the other thread as the data argument or inside the data argument.

Here’s a worker that shares memory with its parent thread:

import { parentPort } from 'worker_threads';
parentPort.on('message', () => {
 const numberOfElements = 100;
 const sharedBuffer = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT * numberOfElements);
 const arr = new Int32Array(sharedBuffer);
 for (let i = 0; i < numberOfElements; i += 1) {
   arr[i] = Math.round(Math.random() * 30);
 }
 parentPort.postMessage({ arr });
});

First, we create a SharedArrayBuffer with the memory needed to contain 100 32-bit integers. Next, we create an instance of Int32Array, which uses the buffer as its backing storage. Then we just fill the array with some random numbers and send it to the parent thread.

In the parent thread:

import path from 'path';
import { runWorker } from '../run-worker';

const worker = runWorker(path.join(__dirname, 'worker.js'), (err, result) => {
  if (err) {
    return null;
  }
  // Destructure only after the error check; result is undefined when err is set
  const { arr } = result;
  arr[0] = 5;
});

worker.postMessage({});

By changing arr[0] to 5, we actually change it in both threads.

Of course, by sharing memory, we risk changing a value in one thread and having it changed in the other. But we also gain a very nice feature along the way: the value doesn’t need to be serialized to be available in another thread, which greatly increases efficiency. Simply remember to manage references to the data properly in order for it to be garbage-collected once you finish working with it.
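When several threads write to the same memory, plain reads and writes can interleave. One common way to keep shared values consistent is the built-in Atomics object. The following is a sketch under stated assumptions rather than part of the original example: two workers, inlined as JavaScript strings via the eval option, each increment the same counter 1,000 times, and the parent reads the total after both have exited. The names sharedCounter, incrementSource, and runIncrementWorker are made up here.

```typescript
import { Worker } from 'worker_threads';

// One 32-bit integer of shared memory, visible to the parent and to both workers
const sharedCounterBuffer = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT);
const sharedCounter = new Int32Array(sharedCounterBuffer);

// Worker code as plain JavaScript: wraps the shared buffer and increments it atomically
const incrementSource = `
  const { workerData } = require('worker_threads');
  const counter = new Int32Array(workerData.buffer);
  for (let i = 0; i < 1000; i += 1) {
    Atomics.add(counter, 0, 1);
  }
`;

// Resolves once the worker has exited
function runIncrementWorker(): Promise<void> {
  return new Promise((resolve, reject) => {
    const worker = new Worker(incrementSource, {
      eval: true,
      workerData: { buffer: sharedCounterBuffer },
    });
    worker.on('error', reject);
    worker.on('exit', () => resolve());
  });
}

const counterTotal = Promise.all([runIncrementWorker(), runIncrementWorker()]).then(() =>
  Atomics.load(sharedCounter, 0),
);

counterTotal.then((total) => console.log('total:', total));
```

Atomics.add performs the read, the addition, and the write as one indivisible step, so no increment is lost even when both workers run at the same time.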

Sharing an array of integers is fine, but what we’re really interested in is sharing objects — the default way of storing information. Unfortunately, there is no SharedObjectBuffer or similar, but we can create a similar structure ourselves.

The transferList argument

transferList can only contain ArrayBuffer and MessagePort. Once they are transferred to the other thread, they can no longer be used in the sending thread; the memory is moved to the other thread and, thus, is unavailable in the sending one.
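The effect of a transfer on the sending side can be seen in a short sketch. For brevity it uses both ends of a MessageChannel in one thread and reads the message synchronously with receiveMessageOnPort; the variable names are illustrative.

```typescript
import { MessageChannel, receiveMessageOnPort } from 'worker_threads';

const transferChannel = new MessageChannel();

// A 4-byte buffer filled with sample data
const rawBuffer = new ArrayBuffer(4);
new Uint8Array(rawBuffer).set([1, 2, 3, 4]);

const byteLengthBefore = rawBuffer.byteLength;

// Listing the buffer in transferList moves it instead of copying it
transferChannel.port1.postMessage(rawBuffer, [rawBuffer]);

// The sender's reference is now detached and reports a length of zero
const byteLengthAfter = rawBuffer.byteLength;

const delivered = receiveMessageOnPort(transferChannel.port2);
const deliveredBytes = delivered
  ? Array.from(new Uint8Array(delivered.message as ArrayBuffer))
  : [];

transferChannel.port1.close();

console.log({ byteLengthBefore, byteLengthAfter, deliveredBytes });
```

Leaving the buffer out of transferList would copy it instead, and the sender would keep a usable buffer of its own.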

For the time being, we can’t transfer network sockets by including them in the transferList (which we can do with the child_process module).

Creating a channel for communications

Communication between threads is made through ports, which are instances of the MessagePort class and enable event-based communication.

There are two ways of using ports to communicate between threads. The first is the default and the easier of the two. Within the worker’s code, we import an object called parentPort from the worker_threads module and use the object’s .postMessage() method to send messages to the parent thread.

Here’s an example:

import { parentPort } from 'worker_threads';
const data = {
// ...
};
parentPort.postMessage(data);

parentPort is an instance of MessagePort that Node.js created for us behind the scenes to enable communication with the parent thread. This way, we can communicate between threads by using parentPort and worker objects.

The second way of communicating between threads is to actually create a MessageChannel on our own and send it to the worker. Here’s how we could create a new MessagePort and share it with our worker:

import path from 'path';
import { Worker, MessageChannel } from 'worker_threads';
const worker = new Worker(path.join(__dirname, 'worker.js'));
const { port1, port2 } = new MessageChannel();
port1.on('message', (message) => {
  console.log('message from worker:', message);
});

worker.postMessage({ port: port2 }, [port2]);

After creating port1 and port2, we set up event listeners on port1 and send port2 to the worker. We have to include it in the transferList for it to be transferred to the worker side.

And now, inside the worker:

import { parentPort, MessagePort } from 'worker_threads';
parentPort.on('message', (data) => {
 const { port }: { port: MessagePort } = data;
 port.postMessage('heres your message!');
});

This way, we use the port that was sent by the parent thread.

Using parentPort is not necessarily a wrong approach, but it’s better to create a new MessagePort with an instance of MessageChannel and then share it with the spawned worker (read: separation of concerns).

Note that in the examples below, I use parentPort to keep things simple.

Two ways of using workers

There are two ways we can use workers. The first is to spawn a worker, execute its code, and send the result to the parent thread. With this approach, each time a new task comes up, we have to create a worker all over again.

The second way is to spawn a worker and set up listeners for the message event. Each time the message is fired, it does the work and sends the result back to the parent thread, which keeps the worker alive for later usage.

Node.js documentation recommends the second approach because of how much effort it takes to actually create a thread worker, which requires creating a virtual machine and parsing and executing the code. This method is also much more efficient than constantly spawning workers.

This approach is called worker pool because we create a pool of workers and keep them waiting, dispatching the message event to do the work when needed.

Here’s an example of a file that contains a worker that is spawned, executed, and then closed:

import { parentPort } from 'worker_threads';
const collection = [];
for (let i = 0; i < 10; i += 1) {
  collection[i] = i;
}

parentPort.postMessage(collection);

After sending the collection to the parent thread, it simply exits.

And here’s an example of a worker that can wait for a long period of time before it is given a task:

import { parentPort } from 'worker_threads';
parentPort.on('message', (data: any) => {
  // doSomething stands in for whatever work the worker performs
  const result = doSomething(data);
  parentPort.postMessage(result);
});
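To show the second approach end to end, here is a small sketch in which one long-lived worker handles several tasks before it is shut down. The worker is inlined as a JavaScript string with the eval option to keep the example self-contained, and the names squareSource, square, and squareAll are hypothetical.

```typescript
import { Worker } from 'worker_threads';

// Long-lived worker: answers every message with the square of the received number
const squareSource = `
  const { parentPort } = require('worker_threads');
  parentPort.on('message', (n) => parentPort.postMessage(n * n));
`;
const squareWorker = new Worker(squareSource, { eval: true });

// Sends one task to the worker and resolves with its answer
function square(n: number): Promise<number> {
  return new Promise((resolve, reject) => {
    const onMessage = (result: number): void => {
      squareWorker.off('error', onError); // drop the unused listener before the next task
      resolve(result);
    };
    const onError = (err: Error): void => {
      squareWorker.off('message', onMessage);
      reject(err);
    };
    squareWorker.once('message', onMessage);
    squareWorker.once('error', onError);
    squareWorker.postMessage(n);
  });
}

// Runs the tasks one after another on the same thread, then stops the worker
async function squareAll(inputs: number[]): Promise<number[]> {
  const results: number[] = [];
  for (const n of inputs) {
    results.push(await square(n));
  }
  await squareWorker.terminate();
  return results;
}

const squaresDone = squareAll([2, 3, 4]);
squaresDone.then((results) => console.log(results));
```

The worker is created once and reused for every task; because its message listener keeps it alive, it has to be stopped explicitly with terminate() when it is no longer needed.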

Useful properties available in the worker_threads module

There are a few properties available inside the worker_threads module:

isMainThread

The property is true when not operating inside a worker thread. If you feel the need, you can include a simple if statement at the start of a worker file to make sure it is only run as a worker.

import { isMainThread } from 'worker_threads';
if (isMainThread) {
 throw new Error('Its not a worker');
}

workerData

Data included in the worker’s constructor by the spawning thread.

const worker = new Worker(path, { workerData });

In the worker thread:

import { workerData } from 'worker_threads';
console.log(workerData.property);

parentPort

The aforementioned instance of MessagePort used to communicate with the parent thread.

threadId

A unique identifier assigned to the worker.

Now that we know the technical details, let’s implement something and test out our knowledge in practice.

Implementing setTimeout

Our version of setTimeout runs an infinite loop in a worker that, as the name implies, times out after a given delay. In each iteration, it checks whether the starting time plus the given number of milliseconds is less than or equal to the current time.

import { parentPort, workerData } from 'worker_threads';
const time = Date.now();
while (true) {
 if (time + workerData.time <= Date.now()) {
   parentPort.postMessage({});
   break;
 }
}

This particular implementation spawns a thread, executes its code, and then exits after it’s done.

Let’s try implementing the code that will make use of this worker. First, let’s create a state in which we’ll keep track of the spawned workers:

const timeoutState: { [key: string]: Worker } = {};

And now the function that takes care of creating workers and saving them into the state:

export function setTimeout(callback: (err: any) => any, time: number) {
 const id = uuidv4();
 const worker = runWorker(
   path.join(__dirname, './timeout-worker.js'),
   (err) => {
     if (!timeoutState[id]) {
       return null;
     }
     delete timeoutState[id];
     if (err) {
       return callback(err);
     }
     callback(null);
   },
   {
     time,
   },
 );
 timeoutState[id] = worker;
 return id;
}

First, we use the UUID package to create a unique identifier for our worker, then we use the previously defined helper function runWorker to get the worker. We also pass runWorker a callback function to be fired once the worker sends some data. Finally, we save the worker in the state and return the id.

Inside the callback function, we have to check whether the worker still exists in the state because cancelTimeout() may have been called in the meantime, which would remove it. If it does exist, we remove it from the state and invoke the callback passed to the setTimeout function.

The cancelTimeout function uses the .terminate() method to force the worker to quit and removes that worker from the state:

export function cancelTimeout(id: string) {
 if (timeoutState[id]) {
   timeoutState[id].terminate();
   delete timeoutState[id];
   return true;
 }
 return false;
}

If you’re interested, I also implemented setInterval here, but since it has nothing to do with threads (we reuse the code of setTimeout), I have decided not to include the explanation here.

I have created a little test code for the purpose of checking how much this approach differs from the native one. You can review the code here. These are the results:

native setTimeout { ms: 7004, averageCPUCost: 0.1416 }
worker setTimeout { ms: 7046, averageCPUCost: 0.308 }

We can see that there’s a slight delay in our setTimeout — about 40ms — due to the worker being created. The average CPU cost is also a little bit higher, but nothing unbearable (the CPU cost is an average of the CPU usage across the whole duration of the process).
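Part of the extra CPU cost likely comes from the busy loop itself, which keeps a core spinning until the deadline passes. One possible alternative, which is not what the worker above does, is to put the thread to sleep with Atomics.wait. The sketch below shows the idea in isolation; sleepSync is a hypothetical helper, and it is called directly here only so that its behavior is easy to observe.

```typescript
// Blocks the current thread for `ms` milliseconds without spinning in a loop
function sleepSync(ms: number): 'ok' | 'not-equal' | 'timed-out' {
  const flag = new Int32Array(new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT));
  // flag[0] is 0 and nobody calls Atomics.notify, so the wait ends only when the timeout elapses
  return Atomics.wait(flag, 0, 0, ms);
}

const sleepStartedAt = Date.now();
const sleepOutcome = sleepSync(50);
const sleptFor = Date.now() - sleepStartedAt;

console.log(sleepOutcome, `after about ${sleptFor} ms`);
```

Inside the timeout worker, a call like sleepSync(workerData.time) followed by parentPort.postMessage({}) could stand in for the while loop. Like the loop, it blocks the thread it runs on, so it belongs in a worker rather than on the main thread.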

If we could reuse the workers, we would lower the delay and CPU usage, which is why we’ll now take a look at how to implement our own worker pool.

Implementing a worker pool

As mentioned above, a worker pool is a given number of previously created workers sitting and listening for the message event. Once the message event is fired, they do the work and send back the result.

To better illustrate what we’re going to do, here’s how we would create a worker pool of eight thread workers:

const pool = new WorkerPool(path.join(__dirname, './test-worker.js'), 8);

If you are familiar with limiting concurrent operations, then you will see that the logic here is almost the same, just a different use case.

As shown in the code snippet above, we pass to the constructor of WorkerPool the path to the worker and the number of workers to spawn.

export class WorkerPool<T, N> {
 private queue: QueueItem<T, N>[] = [];
 private workersById: { [key: number]: Worker } = {};
 private activeWorkersById: { [key: number]: boolean } = {};
 public constructor(public workerPath: string, public numberOfThreads: number) {
   this.init();
 }
}

Here, we have additional properties like workersById and activeWorkersById, in which we can save existing workers and the IDs of currently running workers, respectively. There’s also queue, in which we can save objects with the following structure:

type QueueCallback<N> = (err: any, result?: N) => void;
interface QueueItem<T, N> {
 callback: QueueCallback<N>;
 getData: () => T;
}

callback is just the default node callback, with error as its first argument and the possible result as the second. getData is the function passed to the worker pool’s .run() method (explained below), which is called once the item starts being processed. The data returned by the getData function will be passed to the worker thread.

Inside the .init() method, we create the workers and save them in the states:

private init() {
  if (this.numberOfThreads < 1) {
    return null;
  }
  for (let i = 0; i < this.numberOfThreads; i += 1) {
    const worker = new Worker(this.workerPath);
    this.workersById[i] = worker;
    this.activeWorkersById[i] = false;
  }
}

We first ensure the number of threads is at least 1; with no workers, queued tasks would never run. We then create the valid number of workers and save them by their index in the workersById state. We save information on whether they are currently running inside the activeWorkersById state, which, at first, is always false by default.

Now we have to implement the aforementioned .run() method to set up a task to run once a worker is available.

public run(getData: () => T) {
  return new Promise<N>((resolve, reject) => {
    const availableWorkerId = this.getInactiveWorkerId();
    const queueItem: QueueItem<T, N> = {
      getData,
      callback: (error, result) => {
        if (error) {
          return reject(error);
        }
         return resolve(result);
       },
     };
     if (availableWorkerId === -1) {
      this.queue.push(queueItem);
      return null;
    }
    this.runWorker(availableWorkerId, queueItem);
  });
}

Inside the function passed to the promise, we first check whether there’s a worker available to process the data by calling the .getInactiveWorkerId():

private getInactiveWorkerId(): number {
  for (let i = 0; i < this.numberOfThreads; i += 1) {
    if (!this.activeWorkersById[i]) {
      return i;
    }
  }
  return -1;
}

Next, we create a queueItem, in which we save the getData function passed to the .run() method as well as the callback. In the callback, we either resolve or reject the promise depending on whether the worker passed an error to the callback.

If the availableWorkerId is -1, then there is no available worker, and we add the queueItem to the queue. If there is an available worker, we call the .runWorker() method to execute the worker.
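The same run-or-queue decision can be seen in isolation in a plain concurrency limiter with no threads involved. This is a simplified sketch of the pattern, not the pool's actual code: ConcurrencyLimiter and the variables around it are hypothetical, and the tasks are short timers instead of worker messages.

```typescript
type LimitedTask<R> = () => Promise<R>;

class ConcurrencyLimiter {
  private running = 0;
  private readonly waiting: Array<() => void> = [];
  private readonly limit: number;

  constructor(limit: number) {
    this.limit = limit;
  }

  async run<R>(task: LimitedTask<R>): Promise<R> {
    if (this.running >= this.limit) {
      // No free slot: wait in the queue until a finishing task hands over its slot
      await new Promise<void>((resolve) => this.waiting.push(resolve));
    } else {
      this.running += 1;
    }
    try {
      return await task();
    } finally {
      const next = this.waiting.shift();
      if (next) {
        next(); // pass the slot straight to the next queued task
      } else {
        this.running -= 1;
      }
    }
  }
}

// Track how many tasks overlap at most when five tasks share two slots
let inFlight = 0;
let maxInFlight = 0;
const limiter = new ConcurrencyLimiter(2);

const limiterDone = Promise.all(
  [1, 2, 3, 4, 5].map((n) =>
    limiter.run(async () => {
      inFlight += 1;
      maxInFlight = Math.max(maxInFlight, inFlight);
      await new Promise<void>((resolve) => setTimeout(resolve, 10));
      inFlight -= 1;
      return n * 2;
    }),
  ),
);

limiterDone.then((results) => console.log(results, 'max in flight:', maxInFlight));
```

In the worker pool, a free slot corresponds to an inactive worker, and handing over the slot corresponds to taking the next item off the queue once a worker has finished.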

In the .runWorker() method, we have to set inside the activeWorkersById state that the worker is currently being used; set up event listeners for message and error events (and clean them up afterwards); and, finally, send the data to the worker.

private async runWorker(workerId: number, queueItem: QueueItem<T, N>) {
 const worker = this.workersById[workerId];
 this.activeWorkersById[workerId] = true;
 const messageCallback = (result: N) => {
   queueItem.callback(null, result);
   cleanUp();
 };
 const errorCallback = (error: any) => {
   queueItem.callback(error);
   cleanUp();
 };
 const cleanUp = () => {
   worker.removeAllListeners('message');
   worker.removeAllListeners('error');
   this.activeWorkersById[workerId] = false;
   if (!this.queue.length) {
     return null;
   }
   this.runWorker(workerId, this.queue.shift());
 };
 worker.once('message', messageCallback);
 worker.once('error', errorCallback);
 worker.postMessage(await queueItem.getData());
}

First, by using the passed workerId, we get the worker reference from the workersById state. Then, inside activeWorkersById, we set the [workerId] property to true so we know not to run anything else while the worker is busy.

Next, we create messageCallback and errorCallback to be called on message and error events, respectively, then register said functions to listen for the event and send the data to the worker.

Inside the callbacks, we call the queueItem’s callback, then call the cleanUp function. Inside the cleanUp function, we make sure event listeners are removed since we reuse the same worker many times. If we didn’t remove the listeners, we would have a memory leak; essentially, we would slowly run out of memory.

Inside the activeWorkersById state, we set the [workerId] property to false and check if the queue is empty. If it isn’t, we remove the first item from the queue and call the worker again with a different queueItem.

Let’s create a worker that does some calculations after receiving the data in the message event:

import { isMainThread, parentPort } from 'worker_threads';
if (isMainThread) {
 throw new Error('Its not a worker');
}
const doCalcs = (data: any) => {
 const collection = [];
 for (let i = 0; i < 1000000; i += 1) {
   collection[i] = Math.round(Math.random() * 100000);
 }
 return collection.sort((a, b) => {
   if (a > b) {
     return 1;
   }
   return -1;
 });
};
parentPort.on('message', (data: any) => {
 const result = doCalcs(data);
 parentPort.postMessage(result);
});

The worker creates an array of 1 million random numbers and then sorts them. It doesn’t really matter what happens as long as it takes some time to finish.

Here’s an example of a simple usage of the worker pool:

const pool = new WorkerPool<{ i: number }, number[]>(path.join(__dirname, './test-worker.js'), 8);
const items = [...new Array(100)].fill(null);
Promise.all(
 items.map(async (_, i) => {
   await pool.run(() => ({ i }));
   console.log('finished', i);
 }),
).then(() => {
 console.log('finished all');
});

We start by creating a pool of eight workers. We then create an array with 100 elements, and for each element, we run a task in the worker pool. First, eight tasks will be executed immediately, and the rest will be put in the queue and gradually executed. By using a worker pool, we don’t have to create a worker each time, which vastly improves efficiency.

Conclusion

The worker_threads module provides a fairly easy way to add multithreading support to our applications. By delegating heavy CPU computations to other threads, we can significantly increase our server’s throughput. With the official threads support, we can expect more developers and engineers from fields like AI, machine learning, and big data to start using Node.js.
