Editor’s note: This article was last updated by Temitope Oyedele on 24 April 2023 to compare RxJS Observables, operators, and Observers, as well as to explore the difference between Observables and Promises in Angular. For more information, check out Using RxJS operators to consume Observables.
What is RxJS?
RxJS is a framework for reactive programming that makes use of Observables, making it easy to write asynchronous code. According to the official documentation, this project is a kind of reactive extension to JavaScript with better performance, modularity, and debuggable call stacks, while staying mostly backwards-compatible, with some changes that reduce the API surface. RxJS is the official library used by Angular to handle reactivity, converting pull operations for call-backs into Observables.
In this article, we’ll cover:
- Understanding Observables: Pull vs. push
- What is a Stream?
- What are Observables?
- What is an Observer?
- What is an operator?
- Comparing Observables, Observers, and operators in RxJS
- The Observable lifecycle
- The difference between Observables and Promises
Prerequisites
To be able to follow this article’s demonstration you should have:
- Node version v18.16.0 installed on your machine
- Node Package Manager v8.0 or above (usually ships with Node installation)
- Angular CLI v15
- The latest version of Angular (v15)
// run the command in a terminal ng version
Confirm that you are using the latest version of Angular, and update to it if you are not. Create an Angular project for this tutorial by simply navigating to the folder for your project, opening it up in your terminal, and running the command:
ng new my_stater_project
Then, follow the instructions displayed.
Another thing that will be nice to have for this tutorial is working knowledge of the Angular framework at a beginner level.
Understanding Observables: Pull vs. push
Before diving into Observables, it’s important to understand the concepts of push and pull communication systems in JavaScript.
Pull system
A pull system is basically a function. A function is usually first defined (a process called production) and then, somewhere along the line, called (this process is called consumption)to return the data or value in the function. For functions, the producer (which is the definition) does not have any idea of when the data is going to be consumed, so the function call literally pulls the return value or data from the producer.
Push system
In the push system, control rests on the producer. The consumer does not know exactly when the data will get passed to it. A common example are promises in JavaScript. Promises (producers) push already-resolved value to call-backs (consumers). Another example is RxJS Observables, which produce multiple values called a stream (unlike promises that return one value) and push them to Observers, which serve as consumers.
What is a Stream?
A stream is a sequence of data values over time. This can range from a simple increment of numbers printed in six seconds (zero, one, two, three, four, five) or coordinates printed over time. This includes even the data value of inputs in form or chat texts passed through WebSockets or API responses. These all represent data values that will be collected over time, hence the name stream.
What are Observables?
Streams are important to understand because they are facilitated by RxJS Observables. An Observable is a function that can return a stream of values to an Observer over time, either synchronously or asynchronously. The data values returned can go from zero to an infinite range of values.
What is an Observer?
An Observer is an object that receives notifications from an Observable. An Observer has three methods: next()
, error()
, and complete()
. These methods are called, respectively, by the Observable when new values are emitted, when an error occurs during the emission process, and when the Observable completes and will emit no more values.
Observers can be used to consume the values emitted by an Observable and to trigger side effects, such as updating a user interface or performing some computation.
What is an operator?
Operators are functions that are used to manipulate and transform Observable streams. Many different types of operators are available in RxJS, each serving a different purpose. Some common types of operators include:
- Creation operators: These are used to create new observable streams from scratch, using functions such as
Observable.create
,from
,interval
, andof
- Transformation operators: These are used to transform the values emitted by an observable stream into new values, using functions such as
map
,pluck
,scan
, andswitchMap
- Filtering operators: These are used to selectively emit values from an observable stream, based on certain conditions, using functions such as
filter
,take
,skip
, anddistinct
- Combination operators: These are used to combine multiple observable streams into a single stream, using functions such as
merge
,combineLatest
, andconcat
- Error handling operators: These are used to handle errors that occur in an observable stream, using functions such as
catchError
,retry
, andthrowError
Each Operator takes one or more observable streams as input and returns a new observable stream as output. Operators can be chained together to form complex data transformation pipelines, allowing you to manipulate and transform observable streams in powerful ways.
Comparing Observables, Observers, and operators in RxJS
Observables, operators, and observers are three fundamental concepts in RxJS that work together to create reactive and asynchronous programming patterns.
Observables represent a stream of data that can be subscribed to, allowing multiple values to be emitted over time. They provide methods for subscribing to the stream and handling its emissions. Observables can be seen as the source of data.
Operators, on the other hand, are functions that can be used to transform, filter, or combine data streams produced by Observables. They allow data modification as it flows through the stream, resulting in a new Observable with the transformed data stream. Operators can be chained together to create a pipeline of transformations on the data stream.
Observers are objects that can listen to an Observable and react to each value emitted by the Observable. They can execute custom logic in response to each emitted value, such as updating the UI or making additional API calls. Observers can be added to an Observable to listen for specific events at different stages of stream processing. Observers can be thought of as consumers of data.
Observers and subscriptions
For Observables to work, there need to be Observers and subscriptions. Observables are data source wrappers and then the Observer executes some instructions when there is a new value or a change in data values. The Observable is connected to the Observer who does the execution through subscription. With a subscribe method, the Observer connects to the Observable to execute a code block.
The Observable lifecycle
With some help from Observes and subscriptions, the Observable instance passes through these four stages throughout its lifetime:
- Creation
- Subscription
- Execution
- Destruction
Creating Observables
If you followed this post from the start, you must have opened the Angular starter project in VS Code. To create an Observable, you have to first import an Observable from RxJS in the .ts
file of the component you want to create it in. The creation syntax looks something like this:
import { Observable } from "rxjs"; var observable = Observable.create((observer:any) => { observer.next('Hello World!') })
Open your app.component.ts
file and copy the code block below into it:
import { Component, OnInit } from '@angular/core'; import { Observable } from "rxjs"; @Component({ selector: 'app-root', templateUrl: './app.component.html', styleUrls: ['./app.component.css'] }) export class AppComponent implements OnInit{ title = 'ngcanvas'; ngOnInit(): void { var observable = Observable.create() } }
Subscribing to Observables
To tell RxJS to execute the code block on the Observable, or in a simpler term, to call the Observable to begin execution, you have to use the subscribe method like this:
export class AppComponent implements OnInit { title = 'ngcanvas'; ngOnInit(): void { var observable = Observable.create((observer:any) => { observer.next('Hello World!') }) observable.subscribe(function logMessage(message:any) { console.log(message); }) } }
This subscribe method will cause “hello world” to be logged in the console.
Executing Observables
The Observer is in charge of executing instructions in the Observable, so each Observer that subscribes can deliver three values to the Observable:
- Next value: With the next value, the Observer sends a value that can be a number, string, or object. There can be more than one next notification set on a particular Observable
- Error value: With the error value, the Observer sends a JavaScript exception. If an error is found in the Observable, nothing else can be delivered to the Observable
- Complete value: With the complete value, the Observer sends no value. This usually signals that the subscriptions for that particular Observable are complete. If the complete value is sent, nothing else can be delivered to the Observable
This can be illustrated with the code block below:
export class AppComponent implements OnInit { title = 'ngcanvas'; ngOnInit(): void { var observable = Observable.create((observer:any) => { observer.next('I am number 1'); observer.next('I am number 2'); observer.error('I am number 3'); observer.complete(); observer.next('I am number 5'); }); observable.subscribe(function logMessage(message:any) { console.log(message); }); } }
If you run the application at this point in the dev server with:
ng serve
… when you open up the console in the developer tools, your log will look like this:
You will notice that either the error value or complete value automatically stops execution and so the number five never shows up in the console. This is a simple synchronous exercise. To make it asynchronous, let’s wrap timers around some of the values:
export class AppComponent implements OnInit{ title = 'ngcanvas'; ngOnInit(): void { var observable = Observable.create((observer:any) => { observer.next('I am number 1') observer.next('I am number 2') setInterval(() => { observer.next('Random Async log message') }, 2000) observer.next('I am number 3') observer.next('I am number 4') setInterval(() => { observer.error('This is the end') }, 6001) observer.next('I am number 5') }) observable.subscribe(function logMessage(message:any) { console.log(message); }) } }
This will appear like this in your browser console:
Notice that the display of value was done here asynchronously, with the help of the setInterval
module.
Destroying an Observable
To destroy an Observable is to essentially remove it from the DOM by unsubscribing to it. Normally for asynchronous logic, RxJS takes care of unsubscribing and, immediately after an error or a complete notification, your Observable is unsubscribed. You can manually trigger unsubscribe with something like this:
return function unsubscribe() { clearInterval(observable); };
Why Observables are so vital
- Emitting multiple values asynchronously is very easily handled with Observables
- Error handlers can also easily be done inside Observables rather than a construct like Promises
- Observables are considered lazy, so in case of no subscription there will be no emission of data values
- Observables can be resolved multiple times as opposed to functions, or even Promises
The difference between Observables and Promises
It’s important to note that while they share some similarities, Observables and Promises also have significant differences.
An Observable is an object that represents a stream of data that can be subscribed to, allowing multiple values to be emitted over time. Observables provide methods for subscribing to the stream and handling its emissions. On the other hand, Operators are functions that can be used to transform, filter, or combine data streams produced by Observables. Operators take an Observable as input and return a new Observable with the transformed data stream.
A Promise is a one-time operation that represents an asynchronous operation’s eventual completion or failure and can only return a single value. Once a Promise is resolved or rejected, its state cannot be changed. Promises are used in Angular for handling HTTP requests and other asynchronous operations.
More great articles from LogRocket:
- Don't miss a moment with The Replay, a curated newsletter from LogRocket
- Learn how LogRocket's Galileo cuts through the noise to proactively resolve issues in your app
- Use React's useEffect to optimize your application's performance
- Switch between multiple versions of Node
- Discover how to animate your React app with AnimXYZ
- Explore Tauri, a new framework for building binaries
- Advisory boards aren’t just for executives. Join LogRocket’s Content Advisory Board. You’ll help inform the type of content we create and get access to exclusive meetups, social accreditation, and swag.
On the other hand, Observables represent a stream of data that can be subscribed to multiple times. Observables can return multiple values over time and are commonly used in Angular for handling events, animations, and real-time data. Observables are also more powerful than Promises, as they provide operators to transform, filter, and combine data streams.
Another key difference between the two is how they handle errors. Promises use a single error handler that is called when an error occurs, whereas Observables can handle errors at each stage of the data stream.
Conclusion
In this article, we have been given a thorough introduction to Observables, Observers, and operators in RxJS. We have also been shown the lifecycle process of Observables with practical illustrations. More RxJS posts can be found on the blog. Happy hacking!
Get set up with LogRocket's modern error tracking in minutes:
- Visit https://logrocket.com/signup/ to get an app ID
- Install LogRocket via npm or script tag.
LogRocket.init()
must be called client-side, not server-side - (Optional) Install plugins for deeper integrations with your stack:
- Redux middleware
- NgRx middleware
- Vuex plugin
$ npm i --save logrocket
// Code:
import LogRocket from 'logrocket';
LogRocket.init('app/id');
Add to your HTML:
<script src="https://cdn.lr-ingest.com/LogRocket.min.js"></script>
<script>window.LogRocket && window.LogRocket.init('app/id');</script>
I love RXjs. This is a good article.
You shouldn’t pass anything to the `complete` function of the observer. It doesn’t accept anything. See http://reactivex.io/rxjs/class/es6/MiscJSDoc.js~ObserverDoc.html .
You just call it like `observer.complete()`
so the observer is a producer or consumer ? because you produce data with it while i understand that you pass it in subscribe to consume data from the observable.
Great article – thanks.
fyi – it doesn’t look like your code is indented properly starting with the “Subscribing to Observables” section (at least not while viewing this on Chrome v90).