TL;DR: In this article, we will reverse engineer Observable from the RxJS library. We will also re-create a couple of operators from the library and learn about Observer and Subscription, using TypeScript to annotate the code. In the end, we will write a very small example that uses the reverse-engineered library. You can find the demo at StackBlitz.
Let’s keep it simple.
I am not going to give you a lengthy explanation in this post (you can find more information here) but Reactive Programming is basically a paradigm (or approach) to manage asynchronous data streams.
RxJS is a library to do Reactive Programming. It allows you to write reactive programs with a functional approach.
Observable is the core element of RxJS. It’s more or less like an array, whose items arrive in the future asynchronously.
From the API perspective, Observable has a subscribe method. This subscribe method is used to invoke the execution of an Observable.
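A minimal sketch of that call, assuming a hand-rolled stand-in in place of the RxJS creation code. The names `observable`, `doSomething` and `log` are illustrative, and the stand-in only mimics the shape of the API:

```typescript
const log: string[] = [];

// Hand-rolled stand-in for an RxJS Observable (illustrative only).
const observable = {
  subscribe(callback: (value: number) => void) {
    log.push("execution started");
    callback(1);
    callback(2);
    // The returned object plays the role of a Subscription.
    return {
      unsubscribe: () => {
        log.push("unsubscribed");
      },
    };
  },
};

function doSomething(value: number): void {
  log.push(`received ${value}`);
}

console.log(log.length); // 0: nothing runs until subscribe is called

const subscription = observable.subscribe(doSomething);
subscription.unsubscribe();

console.log(log);
```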
In the above example, we have created an Observable named observable using some magical rxjs code and then invoked the subscribe method by passing doSomething. An important thing to remember is that the Observable starts working only when we call this subscribe method. For now, ignore how we created observable and what doSomething is.
It’s also worth noting that this subscribe method returns something called a Subscription. Basically, this Subscription allows us to unsubscribe from the Observable. In other words, it returns an Object with an unsubscribe method, which allows us to stop listening to the values sent by Observable.
Observer is a collection of callback functions that react to the values arriving via Observable.
In Observable, we need callbacks for three things: values that arrive (next), errors (error), and completion (complete).
Hence, Observer is a collection of three callback methods as shown below:
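As a rough sketch, such an Observer could be typed and written like this. The `any` value types and the `received` array are assumptions made for illustration; the Observer type in RxJS itself is generic and more detailed:

```typescript
// Illustrative shape only, not the RxJS type definition.
interface Observer {
  next: (value: any) => void; // called for every value that arrives
  error: (err: any) => void; // called if the stream fails
  complete: () => void; // called when the stream ends
}

const received: string[] = [];

const observer: Observer = {
  next: (value) => {
    received.push(`next: ${String(value)}`);
  },
  error: (err) => {
    received.push(`error: ${String(err)}`);
  },
  complete: () => {
    received.push("complete");
  },
};
```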
There is a relationship between Observer and subscribe method. Take a look at the following example:
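A minimal sketch of that relationship, again assuming a hand-rolled stand-in rather than a real RxJS Observable (`observable` and `events` are illustrative names):

```typescript
interface Observer {
  next: (value: any) => void;
  error: (err: any) => void;
  complete: () => void;
}

interface Subscription {
  unsubscribe: () => void;
}

// Stand-in Observable that pushes two values and then completes.
const observable = {
  subscribe(observer: Observer): Subscription {
    observer.next(1);
    observer.next(2);
    observer.complete();
    return { unsubscribe: () => {} };
  },
};

const events: string[] = [];

// The object passed to subscribe is the Observer.
const subscription = observable.subscribe({
  next: (value) => {
    events.push(`next ${value}`);
  },
  error: (err) => {
    events.push(`error ${err}`);
  },
  complete: () => {
    events.push("complete");
  },
});

console.log(events); // ["next 1", "next 2", "complete"]
```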
Here, we have created an Observable and then executed it by invoking the subscribe method. And if you take a closer look, we have passed an Observer to that subscribe method.
You can write the type definition of subscribe in TypeScript as follows:
Observable.subscribe(observer:Observer):Subscription;
You can combine this pattern with any Push API.
In the following example, we are going to wrap Observable around JavaScript’s setInterval API:
Now we can call this setIntervalObservable method with time and subscribe to it. It will fire the observer.next callback after each cycle of given time as shown below:
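The two steps above can be sketched as follows. This is a plain-object version under assumptions: the `TickObserver` type, the 10 ms interval and the 50 ms cut-off are chosen only for illustration.

```typescript
type TickObserver = { next: () => void };

// Wraps setInterval in an object that exposes subscribe.
function setIntervalObservable(time: number) {
  return {
    subscribe(observer: TickObserver) {
      // The timer is created only when someone subscribes.
      const timerId = setInterval(() => observer.next(), time);
      return {
        unsubscribe: () => clearInterval(timerId),
      };
    },
  };
}

let ticks = 0;

const subscription = setIntervalObservable(10).subscribe({
  next: () => {
    ticks += 1;
  },
});

// Stop listening after a short while so the timer does not run forever.
setTimeout(() => subscription.unsubscribe(), 50);
```

Because unsubscribe clears the interval, no further next calls happen once it has been invoked.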
So far you have learned about Observer, Observable, Subscription and so on. Now we are going to create Observable using TypeScript Classes and Interfaces.
Observer, as mentioned, is a collection of callbacks. You already know about next, error and complete, but there is also an optional value named closed, which you are going to use later in this tutorial:
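One possible way to write this interface is sketched below. The `any` value types are an assumption, and the two sample objects are hypothetical and exist only to show that closed is optional:

```typescript
interface Observer {
  next: (value: any) => void;
  error: (err: any) => void;
  complete: () => void;
  closed?: boolean; // optional flag
}

// Hypothetical observers: one without the flag, one with it set.
const silentObserver: Observer = {
  next: () => {},
  error: () => {},
  complete: () => {},
};

const closedObserver: Observer = { ...silentObserver, closed: true };
```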
As mentioned above, the subscribe method returns a Subscription. So basically, the Subscription class takes an unsubscribe method as its constructor input, so that it can be invoked by the user later on:
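A minimal sketch of such a class, assuming the unsubscribe callback takes no arguments (`cleanupCalls` is an illustrative counter):

```typescript
class Subscription {
  unsubscribe: () => void;

  constructor(unsubscribe: () => void) {
    // Keep the teardown logic so the consumer can call it later.
    this.unsubscribe = unsubscribe;
  }
}

let cleanupCalls = 0;

const subscription = new Subscription(() => {
  cleanupCalls += 1;
});

subscription.unsubscribe();
console.log(cleanupCalls); // 1
```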
In this section, we will create an Observable class with a constructor that takes a subscribe method as input. The subscribe method takes an Observer as input and returns a Subscription:
Observable also comes with a static method named create, which creates a new Observable. This method also takes a subscribe method and returns an Observable:
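The constructor and the static create method could be sketched together as follows. The `SubscribeFn` alias and the `greeting` example are hypothetical names introduced here, not part of RxJS:

```typescript
interface Observer {
  next: (value: any) => void;
  error: (err: any) => void;
  complete: () => void;
}

class Subscription {
  unsubscribe: () => void;
  constructor(unsubscribe: () => void) {
    this.unsubscribe = unsubscribe;
  }
}

// Hypothetical alias for the function handed to the constructor.
type SubscribeFn = (observer: Observer) => Subscription;

class Observable {
  subscribe: SubscribeFn;

  constructor(subscribe: SubscribeFn) {
    this.subscribe = subscribe;
  }

  // Convenience factory: same input as the constructor.
  static create(subscribe: SubscribeFn): Observable {
    return new Observable(subscribe);
  }
}

const seen: any[] = [];

const greeting = Observable.create((observer) => {
  observer.next("hello");
  observer.complete();
  return new Subscription(() => {});
});

greeting.subscribe({
  next: (value) => {
    seen.push(value);
  },
  error: () => {},
  complete: () => {
    seen.push("done");
  },
});

console.log(seen); // ["hello", "done"]
```

In current RxJS releases, Observable.create is marked as deprecated in favor of calling new Observable(...) directly; the sketch keeps it because this article describes it.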
Usually, when working with RxJS, you don’t really have to create your own custom Observable. RxJS comes with creation methods which allow you to create an Observable from different types of inputs. The input to a creation method can be anything, depending upon your needs, but the method must return an Observable.
You can describe creation operators using TypeScript as follows:
creationOperator(input:any): Observable;
There are many creation operators in RxJS, such as fromEvent and of, to name a few.
setIntervalObservable (that we used earlier) is actually a creation method. We can easily re-write it using our Observable and Subscription Class as shown below:
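A sketch of that rewrite, assuming the Observable and Subscription classes described earlier (repeated here so the snippet stands alone). Making the value passed to next optional is an assumption, since the timer has nothing to emit:

```typescript
interface Observer {
  next: (value?: any) => void;
  error: (err: any) => void;
  complete: () => void;
}

class Subscription {
  unsubscribe: () => void;
  constructor(unsubscribe: () => void) {
    this.unsubscribe = unsubscribe;
  }
}

class Observable {
  subscribe: (observer: Observer) => Subscription;
  constructor(subscribe: (observer: Observer) => Subscription) {
    this.subscribe = subscribe;
  }
}

function setIntervalObservable(time: number): Observable {
  return new Observable((observer) => {
    const timerId = setInterval(() => observer.next(), time);
    // Unsubscribing clears the timer, which stops further next calls.
    return new Subscription(() => clearInterval(timerId));
  });
}

let ticks = 0;

const subscription = setIntervalObservable(10).subscribe({
  next: () => {
    ticks += 1;
  },
  error: () => {},
  complete: () => {},
});

// Stop after a short while so the example terminates.
setTimeout(() => subscription.unsubscribe(), 50);
```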
The of creation operator from RxJS basically takes multiple values as an input and then pushes/sends those values to the observer as shown below:
We have to do the following:
Here is the complete code for the of operator:
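A minimal sketch of such an of operator, assuming the Observable and Subscription classes described earlier (repeated so the snippet stands alone). It pushes every value synchronously and then completes; it is not the RxJS source code:

```typescript
interface Observer {
  next: (value: any) => void;
  error: (err: any) => void;
  complete: () => void;
}

class Subscription {
  unsubscribe: () => void;
  constructor(unsubscribe: () => void) {
    this.unsubscribe = unsubscribe;
  }
}

class Observable {
  subscribe: (observer: Observer) => Subscription;
  constructor(subscribe: (observer: Observer) => Subscription) {
    this.subscribe = subscribe;
  }
}

function of(...values: any[]): Observable {
  return new Observable((observer) => {
    // Push each input value to the observer, in order.
    values.forEach((value) => observer.next(value));
    // Nothing more to send, so signal completion.
    observer.complete();
    // Everything was emitted synchronously; there is nothing to tear down.
    return new Subscription(() => {});
  });
}

const output: any[] = [];

of(10, 20, 30).subscribe({
  next: (value) => {
    output.push(value);
  },
  error: () => {},
  complete: () => {
    output.push("complete");
  },
});

console.log(output); // [10, 20, 30, "complete"]
```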
Creating custom creation operators looks something like this:
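As one illustration of the pattern, here is a hypothetical creation operator named countdown. It is not part of RxJS and was invented for this sketch; it follows the same shape as the others: take some input, return a new Observable.

```typescript
interface Observer {
  next: (value: any) => void;
  error: (err: any) => void;
  complete: () => void;
}

class Subscription {
  unsubscribe: () => void;
  constructor(unsubscribe: () => void) {
    this.unsubscribe = unsubscribe;
  }
}

class Observable {
  subscribe: (observer: Observer) => Subscription;
  constructor(subscribe: (observer: Observer) => Subscription) {
    this.subscribe = subscribe;
  }
}

// Hypothetical creation operator: emits start, start - 1, ..., 0, then completes.
function countdown(start: number): Observable {
  return new Observable((observer) => {
    for (let current = start; current >= 0; current -= 1) {
      observer.next(current);
    }
    observer.complete();
    return new Subscription(() => {});
  });
}

const counted: any[] = [];

countdown(3).subscribe({
  next: (value) => {
    counted.push(value);
  },
  error: () => {},
  complete: () => {
    counted.push("complete");
  },
});

console.log(counted); // [3, 2, 1, 0, "complete"]
```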
So far we have created Observables and subscribed to them. But there is another big element of RxJS: operators, which allow us to do functional programming with asynchronous values. We can use operators that mirror Array’s map, filter, and similar methods to derive a modified Observable from the original one.
To work with these operators, there is a method on Observable class named pipe. This pipe method takes single or multiple operators as an input and returns a new Observable:
Observable.pipe(...invokedOperators): Observable;
Here is an example of using a filter and map operator in RxJS:
You first have to understand the structure and anatomy of an RxJS pipeable operator before we can write our own custom pipe method on the Observable class.
The type definition of a pipeable operator using TypeScript would look something like this:
type pipeableOperator = (input) => (source:Observable) => Observable;
In order to create the filter operator, let’s first see its structure:
filter(filterPredicate): (source:Observable) => Observable;
3. Inside the if condition shown above, emit the value for new Observable.
Here is how we can code the filter operator:
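A sketch of a filter operator along those lines, assuming the Observable and Subscription classes described earlier (repeated so the snippet stands alone). Forwarding error and complete unchanged, and returning the source's Subscription, are design choices of this sketch:

```typescript
interface Observer {
  next: (value: any) => void;
  error: (err: any) => void;
  complete: () => void;
}

class Subscription {
  unsubscribe: () => void;
  constructor(unsubscribe: () => void) {
    this.unsubscribe = unsubscribe;
  }
}

class Observable {
  subscribe: (observer: Observer) => Subscription;
  constructor(subscribe: (observer: Observer) => Subscription) {
    this.subscribe = subscribe;
  }
}

function filter(filterPredicate: (value: any) => boolean) {
  // The invoked operator is a function from source Observable to new Observable.
  return (source: Observable): Observable =>
    new Observable((observer) =>
      // Returning the source's Subscription lets unsubscribe reach the source.
      source.subscribe({
        next: (value) => {
          if (filterPredicate(value)) {
            // Only values that pass the predicate reach the new Observable.
            observer.next(value);
          }
        },
        error: (err) => observer.error(err),
        complete: () => observer.complete(),
      })
    );
}

const numbers = new Observable((observer) => {
  [1, 2, 3, 4].forEach((value) => observer.next(value));
  observer.complete();
  return new Subscription(() => {});
});

const evens: any[] = [];

filter((value) => value % 2 === 0)(numbers).subscribe({
  next: (value) => {
    evens.push(value);
  },
  error: () => {},
  complete: () => {
    evens.push("complete");
  },
});

console.log(evens); // [2, 4, "complete"]
```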
Creating the pipe method
Now we can reverse engineer the pipe method. But first, we have to do the following:
2. It is important to realize that, inside pipe method, we don’t really get the pipeable operator but the invocation of it. In other words, we are basically accessing whatever is returned by the operator. It is a function which takes source Observable and returns new modified Observable.
Here is how we will write pipe method:
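One way to sketch the pipe method is with Array's reduce, feeding each invoked operator the Observable produced by the previous one. The `Operator` alias and the tiny map operator below are assumptions added only to exercise pipe; they are not RxJS's implementation:

```typescript
interface Observer {
  next: (value: any) => void;
  error: (err: any) => void;
  complete: () => void;
}

class Subscription {
  unsubscribe: () => void;
  constructor(unsubscribe: () => void) {
    this.unsubscribe = unsubscribe;
  }
}

// An invoked operator: takes the source Observable, returns a new one.
type Operator = (source: Observable) => Observable;

class Observable {
  subscribe: (observer: Observer) => Subscription;
  constructor(subscribe: (observer: Observer) => Subscription) {
    this.subscribe = subscribe;
  }

  pipe(...operators: Operator[]): Observable {
    // Start from this Observable and apply the operators left to right.
    return operators.reduce<Observable>(
      (source, operator) => operator(source),
      this
    );
  }
}

// Minimal map operator, included only to demonstrate pipe.
function map(project: (value: any) => any): Operator {
  return (source) =>
    new Observable((observer) =>
      source.subscribe({
        next: (value) => observer.next(project(value)),
        error: (err) => observer.error(err),
        complete: () => observer.complete(),
      })
    );
}

const source = new Observable((observer) => {
  observer.next(1);
  observer.next(2);
  observer.complete();
  return new Subscription(() => {});
});

const piped: any[] = [];

source
  .pipe(
    map((value) => value * 10),
    map((value) => value + 1)
  )
  .subscribe({
    next: (value) => {
      piped.push(value);
    },
    error: () => {},
    complete: () => {},
  });

console.log(piped); // [11, 21]
```

Calling pipe with no operators returns the original Observable unchanged, because reduce falls back to its initial value.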
Here is an example of creating and consuming an Observable using our reverse engineered library:
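A self-contained sketch of such an example is shown below. The library pieces are inlined here under the assumptions used throughout this article; in a real project they would be imported from the library module instead, and `results` is an illustrative name:

```typescript
interface Observer {
  next: (value: any) => void;
  error: (err: any) => void;
  complete: () => void;
}

class Subscription {
  unsubscribe: () => void;
  constructor(unsubscribe: () => void) {
    this.unsubscribe = unsubscribe;
  }
}

type Operator = (source: Observable) => Observable;

class Observable {
  subscribe: (observer: Observer) => Subscription;
  constructor(subscribe: (observer: Observer) => Subscription) {
    this.subscribe = subscribe;
  }
  pipe(...operators: Operator[]): Observable {
    return operators.reduce<Observable>(
      (source, operator) => operator(source),
      this
    );
  }
}

function of(...values: any[]): Observable {
  return new Observable((observer) => {
    values.forEach((value) => observer.next(value));
    observer.complete();
    return new Subscription(() => {});
  });
}

function filter(filterPredicate: (value: any) => boolean): Operator {
  return (source) =>
    new Observable((observer) =>
      source.subscribe({
        next: (value) => {
          if (filterPredicate(value)) {
            observer.next(value);
          }
        },
        error: (err) => observer.error(err),
        complete: () => observer.complete(),
      })
    );
}

// Consuming code: create, transform, subscribe.
const results: any[] = [];

const subscription = of(1, 2, 3, 4, 5, 6)
  .pipe(filter((value) => value % 2 === 0))
  .subscribe({
    next: (value) => {
      results.push(value);
    },
    error: (err) => {
      results.push(`error: ${err}`);
    },
    complete: () => {
      results.push("complete");
    },
  });

subscription.unsubscribe();

console.log(results); // [2, 4, 6, "complete"]
```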
The fun part is that the code in the above example is compatible with RxJS. So basically, you can switch the imports to the RxJS library and everything will work fine.
In this article, we have written a very tiny subset of the RxJS Observable, created custom creation operators and custom pipeable operators, and reverse engineered the of operator and the filter operator from RxJS. We have also learned about Observer and Subscription. You can check the demo at StackBlitz.