Event processing is a common task when building TypeScript applications.
Registering handlers for an event and running them when the event fires is the observer pattern. RxJS Observables build on it to give streams of events a unidirectional flow, which makes debugging and error handling much easier.
In this article, we will explore Observables and how they are used to handle event-driven and asynchronous data, including the errors that occur along the way.
One of the interesting, but sometimes brain-teasing, parts of JavaScript is handling events.
Consider a real-time application like Facebook, where multiple events take place within the same time frame. A user could be typing a post on their news feed while receiving notifications and messages from friends, in no particular order. The application is responsible for handling all of these events. This is where Observables come in.
An Observable is a stream of values, possibly arriving over time, that can be processed with operators modeled on array methods such as map, reduce, and filter. It comes in handy when handling asynchronous operations such as HTTP requests and user-input events.
The observer pattern is a design pattern for registering interest in an event and running a handler when that event is triggered. One of the most powerful and popular JavaScript libraries that specializes in event processing is the Reactive Extensions for JavaScript library, also known as RxJS.
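To make the pattern concrete before bringing in RxJS, here is a minimal sketch in plain TypeScript. The SimpleEventSource class and its subscribe and emit methods are hypothetical names invented for this illustration. They are not part of RxJS, and a real Observable does considerably more (completion, errors, operators).

```typescript
// Hypothetical, minimal event source illustrating the observer pattern.
type Listener<T> = (value: T) => void;

class SimpleEventSource<T> {
  private listeners: Listener<T>[] = [];

  // Register a listener; the returned function removes it again.
  subscribe(listener: Listener<T>): () => void {
    this.listeners.push(listener);
    return () => {
      this.listeners = this.listeners.filter((l) => l !== listener);
    };
  }

  // Notify every registered listener of a new event.
  emit(value: T): void {
    this.listeners.forEach((listener) => listener(value));
  }
}

const messages = new SimpleEventSource<string>();
const received: string[] = [];

const stopListening = messages.subscribe((text: string) => {
  received.push(`message: ${text}`);
});

messages.emit("hello");
stopListening();
messages.emit("ignored"); // no listener is registered any more

console.log(received); // ["message: hello"]
```

RxJS follows the same shape: subscribing registers a handler, and the value returned from subscribe is what you use to stop listening.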
For us to get started, we need to ensure we have the RxJS library installed. So, we install the library with this command:
npm install rxjs
The library ships with the declaration files that TypeScript needs, so there’s no need to install them separately.
In order to create an Observable, we need the Observable type and the of function from RxJS, as follows:
    import { of, Observable } from "rxjs";

    const emitter: Observable<string> = of("Sam", "Ray", "Thomas");
In the above snippet, we imported the of function and the Observable type from the RxJS library, then created an Observable from the strings "Sam", "Ray", and "Thomas".
Next, we will subscribe to an Observable as follows:
    emitter.subscribe((value: string) => {
      console.log(`Name: ${value}`);
    });
In the above snippet, we registered an Observer by calling the subscribe method on the emitter variable. Because emitter is of type Observable, the subscribe method is available on it. The subscribe method receives a function as a parameter, and this function is called once for each value emitted by the Observable.
The above code will output the following Observable stream:
    Name: Sam
    Name: Ray
    Name: Thomas
Pipeable Operators and Creation Operators are the two kinds of operators in RxJS.
Pipeable Operators are functions that take an Observable as input and return another Observable. They are applied to an Observable using the syntax observableInstance.pipe(operator()). They include filter(...) and mergeMap(...).
Creation Operators are functions that create a new Observable when called. They include of and from, both of which are used in this article.
You can reference the RxJS official docs for a complete list of operators.
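As a small sketch of the two kinds working together, the example below uses the range creation operator to produce a stream and the filter and mergeMap pipeable operators to transform it. The variable name piped and the choice of numbers are illustrative only. The operators are imported from "rxjs/operators", which is available in both RxJS 6 and 7.

```typescript
import { of, range } from "rxjs";
import { filter, mergeMap } from "rxjs/operators";

const piped: number[] = [];

range(1, 6) // creation operator: emits 1, 2, 3, 4, 5, 6
  .pipe(
    // pipeable operator: keep only even numbers
    filter((n: number) => n % 2 === 0),
    // pipeable operator: map each value to an inner Observable and flatten it
    mergeMap((n: number) => of(n, n * 10))
  )
  .subscribe((n: number) => {
    piped.push(n);
  });

console.log(piped); // [2, 20, 4, 40, 6, 60]
```

Because range and of emit synchronously here, the array is fully populated by the time subscribe returns.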
The RxJS from function converts an existing data source, such as an array, into an Observable. Given an array as input, it returns a single Observable that emits each element of the array in order.
Let’s consider the code snippet below:
    import { from, Observable } from "rxjs";

    const transformArray: Observable<number> = from([1, 2, 3, 4]);

    transformArray.subscribe((value: number) => {
      console.log(`value: ${value}`);
    });
The above snippet uses the from function to turn the values in the array into an Observable, then calls the subscribe method on the transformArray variable. The subscribe method accepts a function that gets executed for each value emitted by the Observable.
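Arrays are not the only accepted input. As a brief sketch, from also accepts other iterables such as a Set, in which case each element of the iterable is emitted in turn. The names uniqueIds and seen are made up for this example.

```typescript
import { from, Observable } from "rxjs";

// A Set is iterable, so from() emits each of its (de-duplicated) elements.
const uniqueIds: Observable<number> = from(new Set([1, 2, 2, 3]));
const seen: number[] = [];

uniqueIds.subscribe((id: number) => {
  seen.push(id);
});

console.log(seen); // [1, 2, 3]
```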
RxJS provides us with the pipe method, which allows multiple operators to be combined for complex transformations of event streams.
Let’s consider the following code:
    import { of } from "rxjs";
    import { map } from "rxjs/operators";

    const emitter = of(4, 9, 16, 25);

    const mapValue = emitter.pipe(
      map((value: number) => {
        return Math.sqrt(value);
      }),
      map((value: number) => {
        return `square root: ${value}`;
      })
    );

    mapValue.subscribe((value: string) => {
      console.log(`string value emitted ${value}`);
    });
Here, the RxJS of function is used to create an Observable from the numbers 4, 9, 16, and 25. The pipe method takes in two map operators. The first map returns the square root of the input value, and the second map transforms the output of the first into a string.
Finally, the subscribe method is called on the resulting mapValue Observable, and its callback runs once for each transformed value.
Running the above code will output as follows:
    string value emitted square root: 2
    string value emitted square root: 3
    string value emitted square root: 4
    string value emitted square root: 5
Handling exceptions within an Observable stream requires a well-structured mechanism to catch these exceptions.
Let’s consider the code snippet below:
    interface IName {
      value: string;
    }

    interface IObj {
      name?: IName;
    }

    const emitObj: Observable<IObj> = of(
      { name: { value: "Bob" } },
      {},
      { name: { value: "Sam" } }
    );
In the above snippet, we created the IName interface with the property value of type string. We also created the IObj interface with an optional property name of type IName. We then created the emitObj Observable, which emits three values.
Now let’s consider the following Observable stream:
    const returnName = emitObj.pipe(
      map((value: IObj) => {
        return value.name!.value;
      })
    );

    returnName.subscribe((value: string) => {
      console.log(`name: ${value} `);
    });
In the above snippet, we created an Observable stream returnName, which returns the name.value property for each input stream value. Then, we subscribe to this stream and log the received value to the console.
When we run this snippet, we will get the following output:
    name: Bob
    TypeError: Cannot read property 'value' of undefined
This error occurs because the second value emitted in our Observable stream does not have a name property, resulting in an undefined value.
In order to handle this error, we can pass an error handler to the subscribe function of our Observable stream.
Let’s consider the following:
    returnName.subscribe(
      // called for each observable value
      (value: string | null) => {
        console.log(`name: ${value} `);
      },
      // called if an error occurs
      (error: unknown) => {
        console.log(`error : ${error}`);
      },
      // called when execution is done
      () => {
        console.log(`done`);
      }
    );
Here, we have provided the subscribe method with three functions as arguments. The first is called for each value emitted by the Observable stream, the second is called if an error occurs, and the last is called when the Observable completes. In RxJS 7 and later, this positional form is deprecated in favor of passing a single observer object with next, error, and complete properties.
When we run this snippet, we will get the following output:
    name: Bob
    error : TypeError: Cannot read property 'value' of undefined
What happens here is that the first value emitted by the Observable stream is handled by the first function provided to the subscribe method. The second value causes an error, which is trapped by the error function provided to the subscribe method. The last function is never called, because a stream that errors does not complete.
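The same handlers can also be supplied as a single observer object. The sketch below is one way to write that. It assumes the IName and IObj shapes from above, builds the source with from and a typed array purely to keep the typing explicit, and collects output in a hypothetical log array instead of printing each line. Only the error type is recorded, since the exact error message differs between JavaScript runtimes.

```typescript
import { from, Observable } from "rxjs";
import { map } from "rxjs/operators";

interface IName {
  value: string;
}

interface IObj {
  name?: IName;
}

const items: IObj[] = [{ name: { value: "Bob" } }, {}, { name: { value: "Sam" } }];
const source: Observable<IObj> = from(items);

const log: string[] = [];

source.pipe(map((obj: IObj) => obj.name!.value)).subscribe({
  // called for each value
  next: (value: string) => {
    log.push(`name: ${value}`);
  },
  // called once if the stream fails; nothing is delivered afterwards
  error: (error: unknown) => {
    log.push(`error: ${error instanceof TypeError ? "TypeError" : "other"}`);
  },
  // called once if the stream finishes without an error
  complete: () => {
    log.push("done");
  },
});

console.log(log); // ["name: Bob", "error: TypeError"]
```

Note that "done" never appears: error and complete are mutually exclusive end states for a stream.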
catchError
Another way to handle Observable errors is to use the catchError operator within the Observable stream itself, so that we can catch these errors early.
Let’s consider the following code snippet:
    import { catchError, map } from "rxjs/operators";

    const returnName = emitObj.pipe(
      map((value: IObj) => {
        return value.name!.value;
      }),
      catchError((error: unknown) => {
        console.log(`stream caught : ${error}`);
        return of(null);
      })
    );
Here, we have added a catchError operator to the Observable stream. Within this function, we log the error message to the console and then return a replacement Observable that emits null. With this implementation, the subscriber's completion function is called even when an error occurs upstream, because the replacement Observable completes normally.
When we run this snippet with the three-function subscriber from the previous section, we will get the following output:
    name: Bob
    stream caught : TypeError: Cannot read property 'value' of undefined
    name: null
    done
What happens here is that the map operator generates an error for the second value of the Observable stream, after which our catchError function is called with the following error: “Cannot read property ‘value’ of undefined.” The subscriber then receives the null emitted by the replacement Observable, followed by the completion notification.
Once an error occurs within an Observable stream, the original stream stops emitting values, even if catchError substitutes a replacement. This is why the third value of the emitObj Observable stream, the object for "Sam", never reaches the subscriber.
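If the remaining values should still be processed after a failure, one common approach is to catch the error inside an inner Observable, so that only the inner stream ends. The following is a sketch of that idea rather than the only way to do it. The names safeNames and collected, and the "unknown" fallback string, are assumptions made for this example, and the source is built with from and a typed array to keep the typing explicit.

```typescript
import { from, of, Observable } from "rxjs";
import { catchError, map, mergeMap } from "rxjs/operators";

interface IName {
  value: string;
}

interface IObj {
  name?: IName;
}

const people: IObj[] = [{ name: { value: "Bob" } }, {}, { name: { value: "Sam" } }];
const people$: Observable<IObj> = from(people);

const safeNames: Observable<string> = people$.pipe(
  mergeMap((obj: IObj) =>
    // Each value gets its own short-lived inner stream...
    of(obj).pipe(
      map((o: IObj) => o.name!.value),
      // ...so an error ends only that inner stream, not the outer one.
      catchError(() => of("unknown"))
    )
  )
);

const collected: string[] = [];

safeNames.subscribe({
  next: (name: string) => {
    collected.push(name);
  },
  complete: () => {
    collected.push("done");
  },
});

console.log(collected); // ["Bob", "unknown", "Sam", "done"]
```

Here the outer stream never sees the error, so the value for "Sam" is still delivered and the stream completes normally.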
In this article, we’ve explored data transformation with Observable streams and how to implement them. Also, we’ve discussed error handling with Observables and provided strategies to implement error handlers when working with Observable streams. To avoid memory leaks and unexpected results, make sure to unsubscribe from any subscriptions you manually subscribe to.
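As a short illustration of unsubscribing, the sketch below uses an RxJS Subject as a stand-in for a long-lived event source. The names clicks and handled are hypothetical. After unsubscribe is called, the handler is no longer invoked.

```typescript
import { Subject, Subscription } from "rxjs";

// A Subject stands in for a long-lived source such as UI events.
const clicks = new Subject<string>();
const handled: string[] = [];

const subscription: Subscription = clicks.subscribe((label: string) => {
  handled.push(label);
});

clicks.next("first");
subscription.unsubscribe(); // release the handler
clicks.next("second"); // not delivered to the unsubscribed handler

console.log(handled); // ["first"]
console.log(subscription.closed); // true
```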
Hopefully you’ve found this post informative and helpful. You can also check out the official RxJS documentation for a deep dive into the RxJS Observable module. Also, I recommend Brian Troncone’s learnrxjs.io for a more visual explanation of RxJS concepts.
2 Replies to "Using RxJS Observables to transform data in TypeScript"
Svelte is a much better system for Observables. It requires setting up its dev tools. As it compiles observable code according to how it is being used.
With rxjs, never ever use subscribe or unsubscribe yourself. Leave it to a framework such as Angular. Don’t put yourself in a place that a memory leak could happen. Even if it never does. Knockout.js deprecation is a good example why.
Yes, this is something I agree with.
Managing subscriptions ourselves can be risky because we might forget to unsubscribe from the Observable, resulting in a memory leak.
Svelte manages our subscriptions on our behalf. This, in my opinion, makes it one of the most reactive frameworks.
Considering Node.js where there are no native Observables, we are left to manage subscriptions ourselves.