Indermohan Singh JavaScript Developer interested in @angular @rxjs and Ionic framework.

Reverse engineering Observable

7 min read 1960

TL;DR: In this article, we will reverse engineer Observable from the RxJS library. We will also re-create a couple of operators from the library and learn about Observer and Subscription. We are also going to use TypeScript to annotate the code. In the end, we will write a very small example code to use that reverse engineered library. You can find the demo at StackBlitz.

Introduction to Reactive Programming and RxJS

Let’s keep it simple.

  • Reactive Programming is programming with asynchronous data streams — Andre Staltz (Creator of cycle.js)

I am not going to give you a lengthy explanation in this post (you can find more information here) but Reactive Programming is basically a paradigm (or approach) to manage asynchronous data streams.

RxJS is a library to do Reactive Programming. It allows you to write reactive programs with a functional approach.

What is Observable?

Observable is the core element of RxJS. It’s more or less like an array, whose items arrive in the future asynchronously.

  • Observable represents the idea of an invokable collection of future values or events. — RxJS Docs

From the API perspective, Observable has a subscribe method. This subscribe method is used to invoke the execution of an Observable.

let observable; // somehow we have created it
observable.subscribe(doSomething); // doSomething does the heavy lifting for observable.

In the above example, we have created an Observable named observable using some magical rxjs code and then we invoked the subscribe method by passing doSomething. An important thing to remember is when we call this subscribe method only then Observable starts working. For now, ignore how we created observable and what doSomething is.

It’s also worth noting that this subscribe method returns something called a Subscription. Basically, this Subscription allows us to unsubscribe from the Observable. In other words, it returns an Object with an unsubscribe method, which allows us to stop listening to the values sent by Observable.

What is Observer?

Observer is a collection of callback functions, which reacts to the value arriving via Observable.

  • Observer is a collection of callbacks that knows how to listen to values delivered by the Observable. — RxJS Docs.

In Observable, we need callbacks for three things:

  • values — future values, which Observable is going to send/push
  • errors — errors which might occur while invocation of an Observable to signal when observable is done with sending values

Hence, Observer is a collection of three callback methods as shown below:

{
  'next':  (value) => { // do whatever you want to here },
  'error': (error) => { // error occurred },
  'complete': () => { // complete}
}

The subscribe method and Observer

There is a relationship between Observer and subscribe method. Take a look at the following example:

let observable; // somehow we have created it
let doSomething = {
  'next': (value) => { // do whatever you want to here },
  'error': (error) => { // error occurred },
  'complete': () => { // complete}
};

observable.subscribe(doSomething); // doSomething does the heavy lifting for observable.

Here, we have created an Observable and then executed it by invoking subscribe method. And if you take a closer look, we have passed an Observer to that subscribe method.

You can write the type definition of subscribe in TypeScript as follows:

Observable.subscribe(observer:Observer):Subscription;

You can combine this pattern with any Push API.

Using Observable and Observer pattern

In the following example, we are going to wrap Observable around JavaScript’s setInterval API:

function setIntervalObservable(time) {
  return {
    'subscribe': (observer) => {
      let timeId = setInterval(() => { observer.next(); }, time);
      let subscription = {
        'unsubscribe': () => {
          clearInterval(timeId);
        };
      };
      return subscription;
    };
  }
}

Now we can call this setIntervalObservable method with time and subscribe to it. It will fire the observer.next callback after each cycle of given time as shown below:

let interval$ = setIntervalObservable(1000);
let observer = {
  'next': (value) => { console.log("Hello World"); },
  'error': (error) => { // error occurred },
  'complete': () => { // complete}
};
interval$.subscribe(observer);

// Output
// Hello World 
// Hello World
// ...

Reverse engineering Observable

So far you have learned about Observer, Observable, Subscription and so on. Now we are going to create Observable using TypeScript Classes and Interfaces.

Creating Observer interface

Observer, as mentioned, is a collection of callbacks. You already know about next, error and complete but there is an optional value named closed. Which you are going to use later in this tutorial :

interface Observer {
  next: (value: any) => void;
  error: (err: any) => void;
  complete: () => void;
}

Creating Subscription class

As mentioned above, subscribe method returns a Subscription. So basically, a subscription takes unsubscribe method as input, so that it can be invoked by the user later on:

class Subscription {
  unsubscribe:() => void;
  constructor(unsubscribe:() => void) {
    this.unsubscribe = unsubscribe;
  }
};

Creating Observable class

In this section, we will create an Observable class and a constructor which takes subscribe method as an input. The subscribe method takes Observer as input and returns a Subscription:

class Observable {
  subscribe: (observer: Observer) => Subscription;
  constructor(subscribe) {
    this.subscribe = subscribe;
  };
}

Creating create static method on Observable class

Observable also comes with a static method named create to create new Observable. This method also takes a subscribe method and returns an Observable:

class Observable {
...
  public static create(subscribe): Observable {
    return new Observable(subscribe);
  };
}

RxJS creation operators

Usually, when working with RxJS, you don’t really have to create your own custom Observable. RxJS comes with creation methods which allow you to create Observable from different types of Inputs. The input to the creation method can be anything depending upon the needs but it must return an Observable.

You can describe creation operators using TypeScript as follows:

creationOperator(input:any): Observable;

There are so many creation operators in RxJS like fromEvent and of to name a few.

setIntervalObservable (that we used earlier) is actually a creation method. We can easily re-write it using our Observable and Subscription Class as shown below:

function setIntervalObservable(time) {
  return Observable.create(observer => {
    let timeId = setInterval(() => observer.next(), time);
    return new Subscription(() => { 
      clearInterval(timeId);
    });
  });
}

Reverse engineering of creation operator

The of creation operator from RxJS basically takes multiple values as an input and then pushes/sends those values to the observer as shown below:

// example
of(1,2,3)
.subscribe(observer);

// output
// 1 2 3

We have to do the following:

  • loop over each value given as input
  • fire observer.next with those values
  • after that, fire observer.complete()
  • return a Subscription

Here is the complete code for the of operator:

let of = (...values) =>  {
  return new Observable((observer:Observer) => {
    values.forEach(value => {
      observer.next(value);
    });
    // complete observer
    observer.complete();
    return new Subscription(() => {
      // unsubscription action here
    });
  });
};

How to create a custom creation operator?

Creating custom creation operators looks something like this:

  • operator can take any number or type of inputs, depending upon the need
  • It must return an Observable
  • send/push values by invoking observer.next
  • After observable is complete, fire observer.complete()
  • Don’t forget to return a Subscription from within Observable

Pipeable operators in RxJS

So far we have created Observable and subscribed to them. But there is another big element of RxJS which allows us to do functional programming with asynchronous values. So we can basically use Array’s map, filter or similar methods/operators to modify the original Observable.

To work with these operators, there is a method on Observable class named pipe. This pipe method takes single or multiple operators as an input and returns a new Observable:

Observable.pipe(...invokedOperators): Observable;

Here is an example of using a filter and map operator in RxJS:

let $stream = of(1,2,3,4);
$stream
.pipe(
  filter(x => x > 2),
  map(x => x * 2)
).subscribe(observer);

// Output
// of     1 2 3 4
// filter - - 3 4
// map    - - 6 8

Creating custom pipeable operators

You have to first understand the structure and anatomy of RxJS pipeable operator to write our own custom pipe method on Observable class.

The type definition of a pipeable operator using TypeScript would look something like this:

type pipeableOperator = (input) => (source:Observable) => Observable;

  • operator takes an input. This input can be anything and either single or multi-value. It depends upon what kind of operator you want to make.
  • the operator function returns another function. This returned function takes the source Observable as an input and returns a new Observable by modifying the input by performing the desired action based on operator’s input.

Creating filter operator

In order to create filter operator, let’s first see it’s structure:

filter(filterPredicate): (source:Observable) => Observable;

  1. filterPredicate is the function which returns a Boolean value. You have to apply it to the value emitted by source Observable.
  2. We can access the values emitted by source Observable by subscribing to it, as shown below:
source.subscribe({
  next(value) {
    if(filterPredicate(value) {
      // emit the value for new Observer here
      // using observer.next(value);
    });
  }
});

3. Inside the if condition shown above, emit the value for new Observable.

Here is how we can code filter operator :

// type definition for pipeable operator
type pipeableOperator = (input) => (source:Observable) => Observable;

let filter:pipeableOperator = (filterFn) => {
  return (source) => {
    return new Observable((observer: Observer) => {
        let subscription = source.subscribe({ 
         next(value) { 
           if(filterFn(value)) {
             observer.next(value); 
           };
          },
         error(err) { observer.error(err); },
         complete() { observer.complete(); }
        });
        
        // Unsubscription
        return new Subscription(() => {
            subscription.unsubscribe();
        });
    });
  };
};
  • Similarly, you can create other operators like map and so on.

Creating the pipe method

Now we can reverse engineer the pipe method. But first, we have to do the following:

  1. Pipe method takes single or multiple inputs. So we have to loop over all of those operators. We can use JavaScript’s spread operator and forEach to do that as shown below:
pipe(...operators) {
  operators.forEach(operator => {
  });
}

2. It is important to realize that, inside pipe method, we don’t really get the pipeable operator but the invocation of it. In other words, we are basically accessing whatever is returned by the operator. It is a function which takes source Observable and returns new modified Observable.

  1. We can access the source Observable via this.

  2. Basically, we will start with this as first Observable and then call the first operator on it. We will use this new Observable as a source for next operator.

Here is how we will write pipe method:

class Observable {
...
 public pipe(...invockedOps): Observable {
    let observable = this;
    invockedOps.forEach(invockedOp => {
      observable = invockedOp(observable);
    });
    return observable;
  }
}

Final example

Here is an example of creating and consuming an Observable using our reverse engineered library:

// import the code that you have written 
// import { Observer, of }
// import { map, filter }
let observer: Observer =  {
  next(value) { console.log(value); },
  error(error) { console.warn(error) },
  complete() { console.log("completed") }
};

let myValues = of(1,2,3,4)
.pipe(
  filter(x => x > 2)
).subscribe(observer);

The fun part is, code in the above example is totally compatible with RxJS. So basically you can switch the imports to RxJS library and everything will work fine.

Conclusion

In this article, we have written a very tiny subset of RxJS Observable, creating custom creation operators and custom pipeable operators, along with reverse engineering of operator, filter operator from the RxJS. We have also learned about Observer and Subscriptions. You can check the demo at StackBlitz.

Plug: LogRocket, a DVR for web apps

https://logrocket.com/signup/

LogRocket is a frontend logging tool that lets you replay problems as if they happened in your own browser. Instead of guessing why errors happen, or asking users for screenshots and log dumps, LogRocket lets you replay the session to quickly understand what went wrong. It works perfectly with any app, regardless of framework, and has plugins to log additional context from Redux, Vuex, and @ngrx/store.

In addition to logging Redux actions and state, LogRocket records console logs, JavaScript errors, stacktraces, network requests/responses with headers + bodies, browser metadata, and custom logs. It also instruments the DOM to record the HTML and CSS on the page, recreating pixel-perfect videos of even the most complex single page apps.

Try it for free.

 

Indermohan Singh JavaScript Developer interested in @angular @rxjs and Ionic framework.

Leave a Reply