Daishi Kato A freelance programmer. I'm interested in working remotely with people abroad. Contact me at contact.axlight.com. Apollo GraphQL | React

Comparing the Stream API and (async) generators in Node.js v10


Introduction

A stream is an abstraction of data in programming. The Node.js Stream API has been around for a long time and is used as a uniform API for reading and writing asynchronous data. The Stream API is mostly used internally with other APIs like fs and http.

For example, fs.createReadStream is often used to read a large file. Another example is http.ServerResponse, which implements the Stream API so that a server can send large responses. Streams are mainly used for large data, but conceptually a stream can represent data of infinite length.
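As a rough sketch of the consuming side, the following reads a file in chunks with fs.createReadStream instead of loading it into memory all at once. The file path, its 200 KiB size, and the countBytes helper are assumptions made only for this demo.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Hypothetical sample file, written to the OS temp directory just for this demo.
const filePath = path.join(os.tmpdir(), 'stream-demo.txt');
fs.writeFileSync(filePath, 'x'.repeat(200 * 1024)); // 200 KiB

// Counts the bytes of a file chunk by chunk, so the whole file
// never has to be held in memory at once.
const countBytes = (file) =>
  new Promise((resolve, reject) => {
    let total = 0;
    let chunks = 0;
    fs.createReadStream(file)
      .on('data', (chunk) => {
        total += chunk.length;
        chunks += 1;
      })
      .on('end', () => resolve({ total, chunks }))
      .on('error', reject);
  });

countBytes(filePath).then(({ total, chunks }) => {
  console.log(`read ${total} bytes in ${chunks} chunks`);
});
```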

There is another loop abstraction, the generator (introduced in ES2015), that is similar to a stream. A generator returns an iterator that you can loop over item by item, and it can also represent data of infinite length. ES2018 introduced async generators and async iterators, which can handle asynchronous data. Async generators are supported in Node.js v10.
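A quick way to see the iterator protocol behind generators is to call next() by hand. The sketch below (letters and delayedLetters are hypothetical names) contrasts a generator with an async generator, whose next() returns a Promise of the same { value, done } shape.

```javascript
// A generator function returns an iterator; each next() call resumes it.
function* letters() {
  yield 'a';
  yield 'b';
}

const it = letters();
console.log(it.next()); // { value: 'a', done: false }
console.log(it.next()); // { value: 'b', done: false }
console.log(it.next()); // { value: undefined, done: true }

// An async generator's next() returns a Promise of { value, done }.
async function* delayedLetters() {
  await new Promise((resolve) => setTimeout(resolve, 10));
  yield 'a';
}

delayedLetters()
  .next()
  .then((result) => console.log(result)); // { value: 'a', done: false }
```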

In this post, we will implement a synchronous counter with a pull-based stream and with a generator. We will then implement an asynchronous counter with a push-based stream and with an async generator, in order to compare the Stream API and async generators.

Prerequisites

Before continuing, readers will need to have Node.js (v10 or later) installed and a basic understanding of streams.

Implement a stream for the synchronous counter

In general, you would just use a stream provided by a library; in other words, you consume a stream. For the purpose of study, however, we will provide a stream ourselves. The documentation describes how to implement streams. Let us first make an infinite counter as a readable stream. Create a file named “stream-sync-counter.js”.

// stream-sync-counter.js

const { Readable, Writable } = require('stream');

const createCounterReader = () => {
  let count = 0;
  return new Readable({
    objectMode: true,
    read() {
      count += 1;
      console.log('reading:', count);
      this.push(count);
    },
  });
};

const counterReader = createCounterReader();

This is a pull-based stream, which means it reads new values whenever its buffer falls below a certain amount (the highWaterMark). We used “object mode,” so each item is just one number.
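The counter above never ends. As a sketch of how a readable stream signals completion, here is a finite variant (the limit parameter and createLimitedCounterReader name are our own additions) that pushes null after the last item. It also prints the default buffer limit, which in object mode counts items rather than bytes.

```javascript
const { Readable } = require('stream');

// Hypothetical finite variant of the counter: `limit` is added for this sketch.
// Pushing null signals the end of the stream.
const createLimitedCounterReader = (limit) => {
  let count = 0;
  return new Readable({
    objectMode: true,
    read() {
      count += 1;
      this.push(count <= limit ? count : null);
    },
  });
};

const reader = createLimitedCounterReader(5);
// In object mode the buffer limit counts items, not bytes.
console.log(reader.readableHighWaterMark); // 16

const items = [];
reader.on('data', (n) => items.push(n));
reader.on('end', () => console.log(items)); // [ 1, 2, 3, 4, 5 ]
```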

Now, let’s define a writable stream to consume this counter.

// stream-sync-counter.js (continued)

const logWriter = new Writable({
  objectMode: true,
  write: (chunk, _, done) => {
    console.log('writing:', chunk);
    done();
  },
});

The logWriter stream defined above does nothing except output numbers to the console.

Now we connect these streams, which is known as piping.

// stream-sync-counter.js (continued)

counterReader.pipe(logWriter);

If you run this code, you will see numbers counting up infinitely.

$ node stream-sync-counter.js
reading: 1
reading: 2
writing: 1
reading: 3
writing: 2
reading: 4
writing: 3
reading: 5
writing: 4
reading: 6
writing: 5
...
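The pipe above never completes because the counter is infinite. For finite streams, Node.js v10 also added stream.pipeline(), which connects streams and reports completion or failure through a single callback. A minimal sketch, where createSource and collector are hypothetical names for this demo:

```javascript
const { Readable, Writable, pipeline } = require('stream');

// A finite source (1, 2, 3, then end) so that the pipe can actually complete.
const createSource = () => {
  let count = 0;
  return new Readable({
    objectMode: true,
    read() {
      count += 1;
      this.push(count <= 3 ? count : null);
    },
  });
};

const received = [];
const collector = new Writable({
  objectMode: true,
  write: (chunk, _, done) => {
    received.push(chunk);
    done();
  },
});

// pipeline() calls back once: with no error when all streams finish,
// or with the error if any stream fails (the streams are then destroyed).
pipeline(createSource(), collector, (err) => {
  if (err) {
    console.error('pipeline failed:', err);
  } else {
    console.log('done:', received); // done: [ 1, 2, 3 ]
  }
});
```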

Note that the readable stream reads several items at once to fill its buffer, and then waits until some items are consumed. A readable stream works like this: 1) it reads items and stores them in the buffer, 2) it waits until items are consumed, and 3) once enough items have been consumed that the buffer falls below a certain amount, it goes back to step 1. To better see how the buffer works, you can put timeouts in your writable stream.

// modify the function in stream-sync-counter.js

const logWriter = new Writable({
  objectMode: true,
  write: (chunk, _, done) => {
    setTimeout(() => {
      console.log('writing:', chunk);
      done();
    }, 100);
  },
});

If you run this version, you will see some interesting output:

$ node stream-sync-counter.js
reading: 1
reading: 2
reading: 3
...
reading: 31
reading: 32
writing: 1
writing: 2
writing: 3
...
writing: 14
writing: 15
writing: 16
reading: 33
reading: 34
reading: 35
...
reading: 46
reading: 47
reading: 48
writing: 17
...
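The batch sizes in this output seem consistent with the default buffer limits: in object mode, a readable stream and a writable stream each buffer up to 16 items by default, so roughly 16 + 16 = 32 reads can happen before the slow writer finishes its first item. The sketch below only inspects those defaults; the highWaterMark: 4 value is an arbitrary choice to show that the limit is configurable.

```javascript
const { Readable, Writable } = require('stream');

const reader = new Readable({ objectMode: true, read() {} });
const writer = new Writable({
  objectMode: true,
  write: (chunk, _, done) => done(),
});

// Default buffer limits in object mode, counted in items.
console.log(reader.readableHighWaterMark); // 16
console.log(writer.writableHighWaterMark); // 16

// The limit can be set per stream via the highWaterMark option.
const smallReader = new Readable({ objectMode: true, highWaterMark: 4, read() {} });
console.log(smallReader.readableHighWaterMark); // 4
```

Passing a smaller highWaterMark to both streams in stream-sync-counter.js is one way to experiment with how the batches change.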

Implement a generator for the synchronous counter

A generator is a feature introduced in ES2015. It’s a general abstraction of loops that allows a loop to be implemented as a function. A generator is a special function that returns an iterator.
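To see what a generator saves you, compare a hand-written iterator with the equivalent generator. Both are finite counters with a hypothetical limit parameter; the hand-written one has to implement next() and Symbol.iterator itself.

```javascript
// A hand-written iterator for a finite counter: next() must track state itself.
const createCounterIterator = (limit) => {
  let count = 0;
  return {
    next() {
      count += 1;
      return count <= limit
        ? { value: count, done: false }
        : { value: undefined, done: true };
    },
    // Returning itself makes the iterator usable with for-of and spread.
    [Symbol.iterator]() {
      return this;
    },
  };
};

// The same counter as a generator: the loop keeps the state for us.
function* limitedCounter(limit) {
  for (let count = 1; count <= limit; count += 1) {
    yield count;
  }
}

console.log([...createCounterIterator(3)]); // [ 1, 2, 3 ]
console.log([...limitedCounter(3)]); // [ 1, 2, 3 ]
```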

The following code generates an infinite counter. Create a file named “generator-sync-counter.js”.

// generator-sync-counter.js

function* counterGenerator() {
  let count = 0;
  while (true) {
    count += 1;
    console.log('reading:', count);
    yield count;
  }
}

const counterIterator = counterGenerator();

Now, let’s create a function to run this iterator and output numbers to the console.

// generator-sync-counter.js (continued)

const logIterator = (iterator) => {
  for (const item of iterator) {
    console.log('writing:', item);
  }
};

This is just a for-of loop: since ES2015, you can loop over an iterator with for-of. Now we simply invoke the function.

// generator-sync-counter.js (continued)

logIterator(counterIterator);

The result will look something like this:

$ node generator-sync-counter.js
reading: 1
writing: 1
reading: 2
writing: 2
reading: 3
writing: 3
reading: 4
writing: 4
reading: 5
writing: 5
reading: 6
writing: 6
...

This is slightly different from the behavior of streams and is more intuitive because there’s no buffer.
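Because the generator only runs when the loop asks for the next item, an infinite generator can be stopped simply by leaving the loop. In this sketch (infiniteCounter and closedAt are hypothetical names), break makes for-of call the iterator's return() method, which runs the generator's finally block.

```javascript
let closedAt = null;

function* infiniteCounter() {
  let count = 0;
  try {
    while (true) {
      count += 1;
      yield count;
    }
  } finally {
    // Runs when the consumer stops early: for-of calls return() on break.
    closedAt = count;
    console.log('counter closed at', count);
  }
}

const taken = [];
for (const item of infiniteCounter()) {
  if (item > 3) break; // leave the loop; the generator is not resumed again
  taken.push(item);
}
console.log(taken); // [ 1, 2, 3 ]
```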

You can also add timeouts like this:

// modify the function in generator-sync-counter.js

const logIterator = async (iterator) => {
  for (const item of iterator) {
    await new Promise(r => setTimeout(r, 100));
    console.log('writing:', item);
  }
};

If you run it, you should get the same sequence of reading and writing lines, just more slowly.



We’ve now created a synchronous infinite counter with both a stream and a generator. From the consumer’s side they behave the same, but internally they differ slightly because the stream buffers items.

Implement a stream for an asynchronous counter

Next, we will create an asynchronous counter, starting with a stream. “Asynchronous” here means that the counter counts up every second. To create such a stream, we use setInterval. Create a file named “stream-async-counter.js”.

// stream-async-counter.js

const { Readable, Writable } = require('stream');

const createCounterReader = (delay) => {
  let counter = 0;
  const reader = new Readable({
    objectMode: true,
    read() {},
  });
  setInterval(() => {
    counter += 1;
    console.log('reading:', counter);
    reader.push(counter);
  }, delay);
  return reader;
};

const counterReader = createCounterReader(1000);

This is a so-called push-based stream. As you might guess, it pushes data into the buffer indefinitely, and the buffer keeps growing unless data is consumed at least as fast as it is pushed.
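You can see why the buffer can grow without bound by calling push() directly. In this sketch (with an arbitrarily small highWaterMark of 3), push() starts returning false once the buffer is full, yet it still stores every item; the setInterval-based counter above simply ignores that return value.

```javascript
const { Readable } = require('stream');

// A push-based reader with a deliberately small buffer and a no-op read().
const reader = new Readable({
  objectMode: true,
  highWaterMark: 3,
  read() {},
});

const results = [];
for (let i = 1; i <= 5; i += 1) {
  // push() returns false once the buffer has reached highWaterMark,
  // but the item is still buffered.
  results.push(reader.push(i));
}

console.log(results); // [ true, true, false, false, false ]
console.log(reader.readableLength); // 5
```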

We use the logWriter without timeouts because items are pushed from the readable stream, which controls timing.

// stream-async-counter.js (continued)

const logWriter = new Writable({
  objectMode: true,
  write: (chunk, _, done) => {
    console.log('writing:', chunk);
    done();
  },
});

counterReader.pipe(logWriter);

If we run this, we should see the following output, with a one-second delay between items.

$ node stream-async-counter.js
reading: 1
writing: 1
reading: 2
writing: 2
reading: 3
writing: 3
reading: 4
writing: 4
reading: 5
writing: 5
...

This differs slightly from the output of the pull-based stream, because now each item is consumed before the next one is added to the buffer.

To confirm that data is pushed regardless of whether it is consumed, you could change logWriter as follows.




// modify the function in stream-async-counter.js

const logWriter = new Writable({
  objectMode: true,
  write: (chunk, _, done) => {
    setTimeout(() => {
      console.log('writing:', chunk);
      done();
    }, 5 * 1000);
  },
});
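With this 5-second writer, the timer keeps pushing one item per second while each write takes five, so items pile up in the readable's buffer. If that is undesirable, one possible approach, sketched here under our own assumptions rather than taken from the article or the Node.js docs, is to stop the timer whenever push() returns false and restart it when the stream calls read() again. createPausableCounterReader and its limit parameter are hypothetical names, added so the demo terminates.

```javascript
const { Readable } = require('stream');

// Hypothetical backpressure-aware variant of the push-based counter.
const createPausableCounterReader = (delay, limit) => {
  let counter = 0;
  let timer = null;

  const stop = () => {
    clearInterval(timer);
    timer = null;
  };

  const reader = new Readable({
    objectMode: true,
    highWaterMark: 2,
    // The stream calls read() when it wants more data: (re)start the timer.
    read() {
      if (timer !== null) return; // already producing
      timer = setInterval(() => {
        counter += 1;
        if (counter > limit) {
          stop();
          reader.push(null); // end of stream
        } else if (!reader.push(counter)) {
          stop(); // buffer full: wait for the next read() call
        }
      }, delay);
    },
  });
  return reader;
};

const items = [];
createPausableCounterReader(10, 5)
  .on('data', (n) => items.push(n))
  .on('end', () => console.log(items)); // [ 1, 2, 3, 4, 5 ]
```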

Use an async generator for an asynchronous counter

The for-await-of statement is a new feature in ES2018. It lets you loop over iterators whose items are promises, such as async iterators. Using an async generator, we can define an asynchronous infinite counter similar to the one in the previous section. Create a file named “generator-async-counter.js”:

// generator-async-counter.js

async function* counterGenerator(delay) {
  let counter = 0;
  while (true) {
    await new Promise(r => setTimeout(r, delay));
    counter += 1;
    console.log('reading:', counter);
    yield counter;
  }
}

const counterIterator = counterGenerator(1000);

Notice that the code above waits for delay milliseconds (one second here) by awaiting a Promise that setTimeout resolves. To loop over this iterator, we use the for-await-of statement.

// generator-async-counter.js (continued)

const logIterator = async (iterator) => {
  for await (const item of iterator) {
    console.log('writing:', item);
  }
};

logIterator(counterIterator);

The result is just as expected.

$ node generator-async-counter.js
reading: 1
writing: 1
reading: 2
writing: 2
reading: 3
writing: 3
reading: 4
writing: 4
reading: 5
writing: 5
...
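The for-await-of statement is not limited to async generators. As a small, self-contained illustration (sleep and collect are hypothetical helpers), it also accepts an ordinary array of promises and awaits each element in iteration order, not in the order the promises resolve:

```javascript
// Resolves to `value` after `ms` milliseconds.
const sleep = (ms, value) => new Promise((resolve) => setTimeout(() => resolve(value), ms));

const collect = async () => {
  const results = [];
  // A plain (sync) iterable of promises: each element is awaited in turn.
  for await (const value of [sleep(30, 'slow'), sleep(10, 'fast')]) {
    results.push(value);
  }
  return results;
};

collect().then((results) => console.log(results)); // [ 'slow', 'fast' ]
```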

Unlike the push-based stream, the async generator only generates a new item upon a pull. To confirm that, you could modify logIterator as follows.

// modify the function in generator-async-counter.js

const logIterator = async (iterator) => {
  for await (const item of iterator) {
    console.log('writing:', item);
    await new Promise(r => setTimeout(r, 5 * 1000));
  }
};
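To check the pull-based behavior without waiting on an infinite counter, here is a finite sketch; lazyCounter, run, and the 10 ms / 50 ms delays are our own choices. Even though the consumer is five times slower than the producer, the log should alternate between reading and writing, because the generator does not resume until the loop asks for the next item.

```javascript
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// A finite version of the async counter that records when each item is produced.
async function* lazyCounter(limit, log) {
  for (let count = 1; count <= limit; count += 1) {
    await sleep(10);
    log.push(`reading: ${count}`);
    yield count;
  }
}

const run = async () => {
  const log = [];
  for await (const item of lazyCounter(3, log)) {
    log.push(`writing: ${item}`);
    await sleep(50); // a consumer much slower than the producer
  }
  return log;
};

// The log alternates: 'reading: 1', 'writing: 1', 'reading: 2', ...
run().then((log) => console.log(log));
```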

Conclusion

In this article, we implemented four infinite counters and saw that streams and generators behave similarly in this example but are fundamentally different. A stream gives more control over the data source, whereas a generator gives more control over the loop. We also saw a difference in behavior: a stream has a buffer, but a generator generally doesn’t. There are many other differences that we didn’t cover in this article; readers who want to learn more may want to check the documentation.
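The two abstractions can also be combined. In Node.js releases later than the v10.0 used above, Readable.from() (added in v12.3.0 and backported to v10.17.0) wraps an iterable in a readable stream, and readable streams can be consumed with for-await-of (experimental in v10, stable since v11.14.0). A minimal round-trip sketch, with limitedCounter and roundTrip as hypothetical names:

```javascript
const { Readable } = require('stream');

async function* limitedCounter(limit) {
  for (let count = 1; count <= limit; count += 1) {
    yield count;
  }
}

const roundTrip = async () => {
  // Generator -> stream: Readable.from() wraps an (async) iterable
  // in a readable stream, in object mode by default.
  const stream = Readable.from(limitedCounter(3));

  // Stream -> loop: readable streams can be consumed with for-await-of.
  const items = [];
  for await (const item of stream) {
    items.push(item);
  }
  return items;
};

roundTrip().then((items) => console.log(items)); // [ 1, 2, 3 ]
```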

 
