Emmanuel John I'm a full-stack software developer, mentor, and writer. I am an open source enthusiast. In my spare time, I enjoy watching sci-fi movies and cheering for Arsenal FC.

Using RxJS Observables to transform data in TypeScript

Introduction

Event processing is a common task when building TypeScript applications.

Registering an event and running some action when that event fires is what the Observable pattern is for. It gives streams of events a unidirectional flow, which makes debugging and error handling a lot easier.

In this article, we will explore Observables and how they are used to handle event-driven data, asynchronous data, and errors.

    1. What are RxJS Observables?
    2. Observable operators
    3. Transforming arrays with Observable
    4. Combining multiple operators for event stream transformation
    5. Handling errors with Observables

What are RxJS Observables?

One of the interesting, but sometimes brain-teasing, parts of JavaScript is handling events.

Consider a real-time application like Facebook, which has multiple events taking place within a particular time frame. A user could be typing some text on their news feed while receiving notifications and messages from friends in no particular order. It is the responsibility of the application to handle these events. This is where Observables come in.

An Observable is a stream of values emitted over time. These values can be processed with operators that resemble array methods such as map, reduce, and filter. Observables come in handy when handling asynchronous operations such as HTTP requests and user-input events.

The Observable pattern is a design pattern for registering an event, as well as running a process when that event has been triggered. One of the most powerful and popular JavaScript libraries that specializes in event processing is the Reactive Extensions for JavaScript library, also known as RxJS.

For us to get started, we need to ensure we have the RxJS library installed. So, we install the library with this command:

npm install rxjs

Already, this library has all the declaration files that are needed by TypeScript, so there’s no need to independently install them.

To create an Observable, we need the Observable type and the of function from RxJS:

import { of, Observable } from "rxjs"; 
const emitter : Observable<string> = of("Sam", "Ray", "Thomas");

In the above snippet, we imported the of function and the Observable type from the RxJS library followed by creating an Observable from the strings "Sam", "Ray", and "Thomas".

Next, we will subscribe to an Observable as follows:

emitter.subscribe((value: string) => {
  console.log(`Name: ${value}`)
})

In the above snippet, we registered an Observer by calling the subscribe method on the emitter variable. Since the emitter variable is of type Observable, we can access the subscribe method automatically. The subscribe method receives a function as a parameter, and this function will be called once for each value that is emitted by the Observable.

The above code will output the following Observable stream:

Name: Sam
Name: Ray
Name: Thomas

Observable operators

Pipeable Operators and Creation Operators are the two kinds of operators in RxJS.



Pipeable Operators are functions that take an Observable as input and return another Observable. They are applied to an Observable using the syntax observableInstance.pipe(operator()). Examples include filter(...) and mergeMap(...).
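As a minimal sketch of piping these two operators together, the snippet below filters a stream and then flattens an inner Observable for each remaining value. The sample numbers and the variable names (numbers, expanded, received) are assumptions made for illustration. The operators are imported from "rxjs/operators"; in RxJS 7.2 and later they can also be imported directly from "rxjs".

```typescript
import { of, Observable } from "rxjs";
import { filter, mergeMap } from "rxjs/operators";

// Hypothetical sample data, chosen only for illustration.
const numbers: Observable<number> = of(1, 2, 3, 4);

const expanded: Observable<number> = numbers.pipe(
  // Keep only the even values: 2 and 4.
  filter((value: number) => value % 2 === 0),
  // Map each remaining value to an inner Observable and flatten its output.
  mergeMap((value: number) => of(value, value * 10))
);

// Collect the emitted values so the result can be inspected afterwards.
const received: number[] = [];
expanded.subscribe((value: number) => {
  received.push(value);
  console.log(`value: ${value}`);
});
// Logs value: 2, value: 20, value: 4, value: 40 (one per line)
```

Because of emits synchronously, the received array is already filled by the time subscribe returns.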

Creation Operators are functions that create a new Observable when called. The of and from functions used in this article are both Creation Operators.

You can reference the RxJS official docs for a complete list of operators.

Transforming arrays with Observable

The RxJS from function creates an Observable from an array. It receives an array as input and emits each element of the array, in order, as a separate value.

Let’s consider the code snippet below:

import { from, Observable } from "rxjs";

// Emits 1, 2, 3, and 4 as separate values
const transformArray: Observable<number> = from([1, 2, 3, 4]);
transformArray.subscribe((value: number) => {
  console.log(`value: ${value}`);
});

The above snippet uses the from function to create an Observable from the array and then calls the subscribe method on the transformArray variable. The subscribe method accepts a function that gets executed for each value emitted by the Observable.
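To actually transform the array rather than just replay it, from can be combined with pipeable operators. The following is a sketch under the assumption that we want every element doubled and the results gathered back into a single array; the names doubled and result are made up for this example, and toArray is the RxJS operator that collects all emitted values into one array when the source completes.

```typescript
import { from, Observable } from "rxjs";
import { map, toArray } from "rxjs/operators";

const doubled: Observable<number[]> = from([1, 2, 3, 4]).pipe(
  // Transform each element as it is emitted.
  map((value: number) => value * 2),
  // Gather every emitted value into a single array on completion.
  toArray()
);

let result: number[] = [];
doubled.subscribe((values: number[]) => {
  result = values;
  console.log(values); // the array 2, 4, 6, 8
});
```

Without toArray, the subscriber would receive the four doubled numbers one at a time instead of as one array.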

Combining multiple operators for event stream transformation

RxJS provides us with the pipe method that allows multiple operator methods to be combined for the complex transformation of event streams.

Let’s consider the following code:

import { of } from "rxjs";
import { map } from "rxjs/operators";

const emitter = of(4, 9, 16, 25)
const mapValue = emitter.pipe(
  map((value: number) => {
    return Math.sqrt(value)
  }),
  map((value: number) => {
    return `square root: ${value}`
  })
)
mapValue.subscribe((value: string) => {
  console.log(`string value emitted ${value}`)
})

Here, the RxJS of method is used to create an Observable from the numbers 4, 9, 16, and 25. The pipe method takes in two map methods. The first map method returns the square root of the input value and the second map method transforms the output of the first map method to a string.

Finally, we subscribe to the resulting Observable, and the callback runs once for each value it emits.

Running the above code will output as follows:

string value emitted square root: 2 
string value emitted square root: 3 
string value emitted square root: 4
string value emitted square root: 5

Handling errors with Observables

Handling exceptions within an Observable stream requires a well-structured mechanism to catch these exceptions.


Let’s consider the code snippet below:

interface IName {
  value: string;
}
interface IObj {
  name?: IName;
}
const emitObj: Observable<IObj> = of(
  { name: { value: "Bob" } },
  {},
  { name: { value: "Sam" } }
);

In the above snippet, we created IName with the property value of type string. Also, we created IObj with an optional property name of type IName. We then create the emitObj Observable, which emits three values.

Now let’s consider the following Observable stream:

const returnName = emitObj.pipe(
    map((value: IObj) => {
        return value.name!.value;
    })
);
returnName.subscribe((value: string) => {
    console.log(`name: ${value} `)
});

In the above snippet, we created an Observable stream returnName, which returns the name.value property for the input stream value. Then, we subscribe to this stream and log the received value to the console.

When we run this snippet, we will get the following output:

name: Bob
TypeError: Cannot read property 'value' of undefined

This error occurs because the second value emitted in our Observable stream does not have a name property, resulting in an undefined value.

To handle this error, we can pass an error handler to the subscribe function of our Observable stream.

Let’s consider the following:

returnName.subscribe(
  // called for each observable value
  (value: string| null) => {
    console.log(`name: ${value} `);
  },
  // called if an error occurs
  (error: unknown) => {
    console.log(`error : ${error}`);
  },
  // called when execution is done
  () => {
    console.log(`done`);
  }
);

Here, we have provided the subscribe method with three functions as arguments. The first is called for each value that is emitted by the Observable stream, the second is called if an error occurs, and the last is called when the Observable stream completes.

When we run this snippet, we will get the following output:

name: Bob
error : TypeError: Cannot read property 'value' of undefined

What happens here is that the first value emitted by the Observable stream is handled by the first function provided to the subscribe method. The second value emitted by the Observable stream causes an error, which is caught by the error function provided to the subscribe method. The last function is not called, because a stream that ends with an error never completes.
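Recent RxJS releases (version 7 in particular) mark the form of subscribe that takes several separate callbacks as deprecated and recommend passing a single observer object with next, error, and complete properties instead. The sketch below shows the same error handling in that style. The names items, names, and log are assumptions for this example, and the input is built with from on a typed array purely to keep the types explicit.

```typescript
import { from, Observable } from "rxjs";
import { map } from "rxjs/operators";

interface IName {
  value: string;
}
interface IObj {
  name?: IName;
}

// Same three values as before; the second one has no name property.
const items: IObj[] = [
  { name: { value: "Bob" } },
  {},
  { name: { value: "Sam" } },
];

const names: Observable<string> = from(items).pipe(
  // Throws a TypeError for the second item, because name is undefined.
  map((value: IObj) => value.name!.value)
);

const log: string[] = [];
names.subscribe({
  // called for each emitted value
  next: (value: string) => log.push(`name: ${value}`),
  // called if an error occurs; the stream ends here
  error: (error: unknown) => log.push(`error: ${error}`),
  // called only when the stream completes without an error
  complete: () => log.push("done"),
});
console.log(log);
```

The log ends up with two entries: the name Bob and the error message. The exact wording of the TypeError depends on the JavaScript engine version.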

catchError

Another way to handle Observable errors is to use the catchError operator within an Observable stream itself, so that we can catch these errors early enough.

Let’s consider the following code snippet:

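import { catchError, map } from "rxjs/operators";

const returnName = emitObj.pipe(
    map((value: IObj) => {
        return value.name!.value;
    }),
    // Replace a failed stream with a single null value
    catchError((error: unknown) => {
        console.log(`stream caught : ${error}`);
        return of(null);
    })
);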

Here, we have added a catchError operator to the Observable stream. Within this function, we log the error message to the console and then return an Observable that emits null. With this implementation, the completion function gets triggered even when an error occurs in the Observable.

When we subscribe to this stream with the same three functions as in the previous example, we will get the following output:

name: Bob
stream caught : TypeError: Cannot read property 'value' of undefined
name: null
done

What happens here is that the map operator throws an error for the second value of the Observable stream, after which our catchError function is called with the error “Cannot read property ‘value’ of undefined.” The stream then emits null and completes, so the completion function also gets triggered.

Once an error occurs within an Observable stream, the stream will stop emitting values. This is why the third value, { name: { value: "Sam" } }, never reaches the subscriber.
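To see the whole sequence in one place, here is a self-contained sketch that records what the subscriber receives when catchError is in the pipeline. The names items, names, and events are assumptions for this example, and only the error's name is recorded because the full message text varies between JavaScript engine versions.

```typescript
import { from, of, Observable } from "rxjs";
import { catchError, map } from "rxjs/operators";

interface IName {
  value: string;
}
interface IObj {
  name?: IName;
}

const items: IObj[] = [
  { name: { value: "Bob" } },
  {},
  { name: { value: "Sam" } },
];

// Records everything that happens, in order.
const events: string[] = [];

const names: Observable<string | null> = from(items).pipe(
  // Throws a TypeError for the second item.
  map((value: IObj) => value.name!.value),
  // Swap the failed stream for one that emits null and then completes.
  catchError((error: unknown) => {
    const label = error instanceof Error ? error.name : "unknown";
    events.push(`stream caught: ${label}`);
    return of(null);
  })
);

names.subscribe({
  next: (value: string | null) => events.push(`name: ${value}`),
  error: (error: unknown) => events.push(`error: ${error}`),
  complete: () => events.push("done"),
});
console.log(events);
// [ 'name: Bob', 'stream caught: TypeError', 'name: null', 'done' ]
```

Note that "Sam" never appears: the original source is abandoned as soon as the error is thrown, and the replacement Observable only emits null.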

Conclusion

In this article, we’ve explored data transformation with Observable streams and how to implement them. Also, we’ve discussed error handling with Observables and provided strategies to implement error handlers when working with Observable streams. To avoid memory leaks and unexpected results, make sure to unsubscribe from any subscriptions you manually subscribe to.
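As a small sketch of what unsubscribing looks like, the snippet below keeps the Subscription returned by subscribe and calls unsubscribe on it. A Subject, which this article has not covered, is used here only because it lets us push values by hand; the names messages and seen are assumptions for this example.

```typescript
import { Subject, Subscription } from "rxjs";

// A Subject is an Observable that we can push values into manually.
const messages = new Subject<string>();
const seen: string[] = [];

// subscribe returns a Subscription that we hold on to for cleanup.
const subscription: Subscription = messages.subscribe((value: string) => {
  seen.push(value);
});

messages.next("first"); // delivered to the subscriber
subscription.unsubscribe(); // stop listening and release the subscriber
messages.next("second"); // no longer delivered

console.log(seen); // only "first" was received
console.log(subscription.closed); // true
```

Streams created with of or from complete on their own, so this matters most for long-lived sources such as user-input events.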

Hopefully you’ve found this post informative and helpful. You can also check out the official RxJS documentation for a deep dive into the RxJS Observable module. Also, I recommend Brian Troncone’s learnrxjs.io for a more visual explanation of RxJS concepts.


2 Replies to “Using RxJS Observables to transform data in TypeScript”

  1. Svelte is a much better system for Observables. It requires setting up its dev tools. As it compiles observable code according to how it is being used.

    With rxjs, never ever use subscribe or unsubscribe yourself. Leave it to a framework such as Angular. Don’t put yourself in a place that a memory leak could happen. Even if it never does. Knockout.js deprecation is a good example why.

  2. Yes, this is something I agree with.
    Managing subscriptions ourselves can be risky because we might forget to unsubscribe from the Observable, resulting in a memory leak.

    Svelte manages our subscriptions on our behalf. This, in my opinion, makes it one of the most reactive frameworks.

    Considering Node.js where there are no native Observables, we are left to manage subscriptions ourselves.
