RxJS Observable
You learned the basics of Observables and Observers in the RxJS Building Blocks tutorial. In this article, I will discuss the various ways of creating Observables in RxJS and the technical details behind them.
As you know, an Observable represents a lazy, push-based collection. This can be a collection of any data or a stream of events, depending on the source. Let us understand how a push-based mechanism differs from a traditional pull-based (polling) system.
What is Pushing vs. Pulling Data in RxJS?
The Medium article Thoughts on Push vs Pull Architectures gives a good overview of the topic; I will also explain it here at a high level. In any application today, the code needs to actively deal with data and events. Pull and push are two mechanisms through which a data Producer communicates with a data Consumer.
Pulling Data
In most traditional programming, the application actively polls a data source to check for new data. This behavior is also represented by the iterator pattern used by JavaScript Arrays, Sets, Maps, and other iterables. In this case, the next item is retrieved through iteration, for example with a for...of loop. Every JavaScript function is also a pull mechanism: the function produces data, but the code that invokes it decides when the producer should send that data.
This is pulling: the data source has to hold the data until the consumer asks for it. Pulling data or events may not be a good idea for many use cases, for example heavily event-driven or real-time applications.
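To make the pull model concrete, here is a minimal sketch in plain TypeScript with no RxJS involved. The PullResult interface and the countTo function are hypothetical names invented for this illustration; the point is only that the consumer decides when each value is produced by calling next().

```typescript
// Result of a single pull, modeled loosely on the JavaScript iterator protocol.
interface PullResult {
  done: boolean;
  value?: number;
}

// Hypothetical producer: it hands out one value each time the consumer asks.
function countTo(limit: number): { next(): PullResult } {
  let current = 0;
  return {
    next(): PullResult {
      if (current < limit) {
        current += 1;
        return { done: false, value: current };
      }
      return { done: true };
    },
  };
}

const iterator = countTo(3);
const pulled: number[] = [];

// The consumer is in control: every call to next() pulls exactly one value.
let step = iterator.next();
while (!step.done && step.value !== undefined) {
  pulled.push(step.value);
  step = iterator.next();
}

console.log(pulled); // [1, 2, 3]
```

Nothing happens between two calls to next(); the producer simply waits until it is asked again.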
Pushing Data – This is what RxJS implements
In reactive programming, the application subscribes to a data stream (an Observable in RxJS). The Observer subscribes to the data source and is notified whenever there is new data or a new event. It does not actively poll the source; it reacts to the data being pushed to it. When the data stream has ended, the source notifies the subscriber through complete(). This is called the push pattern. RxJS implements it well, and it has proven efficient in complex UI applications.
The Observable notifies all of its Observers automatically of any state changes. Observers register their interest through the subscribe method. RxJS provides built-in query operators over observable sequences built from sources such as events, callbacks, Promises, the HTML5 Geolocation API, and much more.
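The push model can be sketched in plain TypeScript as well. This is not how RxJS is implemented; NumberObserver and pushNumbers are hypothetical names used only to show that the producer, not the consumer, decides when values arrive and when the stream ends.

```typescript
// Shape of a consumer in the push model: it only reacts to what it is sent.
interface NumberObserver {
  next(value: number): void;
  complete(): void;
}

// Hypothetical producer: it decides when values are sent and when to stop.
function pushNumbers(observer: NumberObserver): void {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete(); // tell the consumer that the stream has ended
}

const log: string[] = [];

pushNumbers({
  next(value: number): void {
    log.push("value " + value);
  },
  complete(): void {
    log.push("done");
  },
});

console.log(log); // ["value 1", "value 2", "value 3", "done"]
```

An RxJS Observable follows the same contract, and adds error handling, cancellation and operators on top of it.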
Source code of Observables
The Observable class implements the Subscribable interface.
// Complete code is available here - https://github.com/ReactiveX/rxjs/blob/6.x/src/internal/Observable.ts
export class Observable<T> implements Subscribable<T> {
  // class body
}
Complete source on GitHub
export interface Subscribable<T> {
  subscribe(observer?: PartialObserver<T>): Unsubscribable;
  /** @deprecated Use an observer instead of a complete callback */
  subscribe(next: null | undefined, error: null | undefined, complete: () => void): Unsubscribable;
  /** @deprecated Use an observer instead of an error callback */
  subscribe(next: null | undefined, error: (error: any) => void, complete?: () => void): Unsubscribable;
  /** @deprecated Use an observer instead of a complete callback */
  subscribe(next: (value: T) => void, error: null | undefined, complete: () => void): Unsubscribable;
  subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Unsubscribable;
}
As you can see in the above code, Observable implements the Subscribable interface, and you pass an Observer to it by calling the subscribe method. I discussed Observers in the previous article.
Creating Observables in RxJS
There are several ways in which Observables can be created and they are discussed below.
1. Using Observable constructor
Here you call new Observable(..) as shown in the code below, and the subscriber is used to push the values. As you know, an Observable does not execute until it is subscribed to, so observable$.subscribe(observer) starts the execution. The subscribe method accepts an Observer as its parameter. You can run the example on stackblitz.com.
import { Observable } from 'rxjs';

const observable$ = new Observable((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

console.log('just before subscribe');
observable$.subscribe({
  next(x) { console.log('got value ' + x); },
  error(err) { console.error('something wrong occurred: ' + err); },
  complete() { console.log('done'); }
});
console.log('just after subscribe');
Output
just before subscribe
got value 1
got value 2
got value 3
done
just after subscribe
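The laziness mentioned above can be checked with a small sketch. The names lazy$, executions and received are made up for this illustration; the counter shows that the function passed to the constructor does not run until subscribe is called, and that it runs once for every subscription.

```typescript
import { Observable } from "rxjs";

let executions = 0;
const received: string[] = [];

const lazy$ = new Observable<number>((subscriber) => {
  executions += 1; // runs once per subscribe() call, never before
  subscriber.next(executions);
  subscriber.complete();
});

console.log(executions); // 0, because nothing has subscribed yet

lazy$.subscribe({
  next: (value: number) => { received.push("first got " + value); },
});
lazy$.subscribe({
  next: (value: number) => { received.push("second got " + value); },
});

console.log(executions); // 2
console.log(received); // ["first got 1", "second got 2"]
```

Each Observer gets its own independent execution, which is why the second subscriber sees the value 2 rather than 1.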
2. Using special functions (Operators)
You can also create an Observable using several handy RxJS functions. The code below demonstrates how to use from to create an Observable from an Array.
import { from } from "rxjs";

let nums = [1, 2, 4, 34, 56, 789];
let numsObservable$ = from(nums);

let observer = {
  next: num => console.log(num),
  error: err => console.error("Error occurred: " + err),
  complete: () => console.log("Execution completed")
};

numsObservable$.subscribe(observer);
Output
1
2
4
34
56
789
Execution completed
Just like from, there are several other functions that can be used to create Observables. They are known as Creation Operators and will be discussed in a later part of the tutorial.
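As a small preview, the sketch below uses two other creation functions exported by the rxjs package, of and range. The array names fromOf and fromRange are only there to collect the emitted values for inspection.

```typescript
import { of, range } from "rxjs";

const fromOf: string[] = [];
const fromRange: number[] = [];

// of() emits each of its arguments in order, then completes.
of("a", "b", "c").subscribe({
  next: (letter: string) => { fromOf.push(letter); },
  complete: () => { fromOf.push("done"); },
});

// range(start, count) emits `count` consecutive integers beginning at `start`.
range(5, 3).subscribe({
  next: (n: number) => { fromRange.push(n); },
});

console.log(fromOf); // ["a", "b", "c", "done"]
console.log(fromRange); // [5, 6, 7]
```

Both Observables emit synchronously when no scheduler is supplied, so the arrays are already filled when the console.log calls run.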
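3. Create using Observable.create function
You can also create a new cold Observable by calling the static Observable.create method, which accepts the same function as the constructor. Note that Observable.create is marked as deprecated in recent RxJS releases in favor of new Observable(..), so prefer the constructor in new code. Run this example on Stackblitz.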
import { Observable } from 'rxjs';

const observable$ = Observable.create((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

console.log('just before subscribe');
observable$.subscribe({
  next(x) { console.log('got value ' + x); },
  error(err) { console.error('something wrong occurred: ' + err); },
  complete() { console.log('done'); }
});
console.log('just after subscribe');
Subscribing to Observables using observers
Once you have an Observable, the next step is to subscribe to it. As you can see in the above examples, the observable$.subscribe method is used to subscribe to an Observable. There are several ways to subscribe, and they are discussed below in detail.
The subscribe method takes an Observer as an argument. An Observer is the consumer that reacts to the data pushed by the producer (the Observable). An Observer can have three callbacks: next, error and complete.
observable$.subscribe({
  next(x) { console.log('got value ' + x); },
  error(err) { console.error('something wrong occurred: ' + err); },
  complete() { console.log('done'); }
});
The observable$.subscribe call starts the Observable execution.
You will not always need all three callbacks (next, error and complete) in your Observer. They are optional. You do not even have to pass them as an object, as described below.
1. Observer with next, error and complete example
The example below shows the typical way to write an Observer. This is the recommended way. You can also run it on Stackblitz.com to play with it.
import { Observable } from "rxjs";

const observable$ = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

const myObserver = {
  next: data => { console.log(data); },
  error: err => { console.error(err); },
  complete: () => { console.log("Execution completed"); }
};

observable$.subscribe(myObserver);
Output
1
2
3
Execution completed
In the above code, next executes whenever there is new data, and error executes if an error occurs, at which point the Observable execution stops. Similarly, complete executes once the Observable is done producing values.
2. An observer with a single method, next
Here, instead of next, you could provide only error or complete. However, I cannot think of a good use case for ignoring next. Run it on Stackblitz.com.
import { Observable } from "rxjs";

const observable$ = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

const myObserver = {
  next: data => { console.log(data); }
};

observable$.subscribe(myObserver);
Output
1
2
3
As you can see in the console output, the Observer does nothing when the Observable calls subscriber.complete(), as it has no callback to handle that notification.
3. Passing the callbacks directly to subscribe
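RxJS is quite flexible: we can also pass the same set of callbacks as separate arguments to the observable$.subscribe method. Keep in mind that newer RxJS releases deprecate passing separate error and complete callbacks, as the @deprecated notes in the Subscribable interface above already hint, so prefer an observer object in new code. Please take a look at the code below. Run on Stackblitz.com.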
import { Observable } from "rxjs";

const observable$ = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

observable$.subscribe(
  data => { console.log(data); },
  error => { console.error(error); },
  () => { console.log("Execution completed"); }
);
Output
1
2
3
Execution completed
NOTE: The order in which the callbacks are passed to the observable$.subscribe method is important. The order is next, error and complete.
You may not need to deal with all three callbacks. Say you only want to handle next and error; then the code looks like this.
observable$.subscribe(
  data => { console.log(data); },
  error => { console.error(error); }
);
Executing Observable to produce data
An Observable execution can deliver three types of notifications:
- Next – subscriber.next(object): Sends a value such as a Number, Object, or String.
- Error – subscriber.error("any-error"): Sends an Error or exception.
- Complete – subscriber.complete(): Indicates that the execution of the Observable has completed.
The code inside Observable produces the data.
const observable$ = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});
An Observable execution can deliver zero to infinitely many next notifications. Once either an error or a complete notification is delivered, nothing else can be delivered afterwards.
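The error case of this rule can be sketched as follows. The names failing$ and events are invented for the illustration; the producer deliberately keeps calling the subscriber after the error to show that those calls have no effect.

```typescript
import { Observable } from "rxjs";

const events: string[] = [];

const failing$ = new Observable<number>((subscriber) => {
  subscriber.next(1);
  subscriber.error(new Error("boom")); // terminates the execution
  subscriber.next(2); // ignored: the execution has already ended
  subscriber.complete(); // ignored as well
});

failing$.subscribe({
  next: (value: number) => { events.push("next " + value); },
  error: (err: Error) => { events.push("error " + err.message); },
  complete: () => { events.push("complete"); },
});

console.log(events); // ["next 1", "error boom"]
```

Note that complete is never called here: an execution ends with either an error or a completion, not both.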
In other words, once you call subscriber.complete() or subscriber.error(..), the execution of the Observable stops and you cannot deliver anything after that. I leave this part for you to verify: run the code below and share the output in the comments.
const observable$ = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
  subscriber.next(4);
});
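If you want to check your result, one possible way is to collect the notifications instead of logging them. In this sketch the received array and the completions counter are hypothetical helpers added only for verification.

```typescript
import { Observable } from "rxjs";

const received: number[] = [];
let completions = 0;

const observable$ = new Observable<number>((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
  subscriber.next(4); // sent after complete(), so it is never delivered
});

observable$.subscribe({
  next: (value: number) => { received.push(value); },
  complete: () => { completions += 1; },
});

console.log(received); // [1, 2, 3]
console.log(completions); // 1
```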
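It is recommended to wrap the code inside the subscribe function in a try/catch block, so that any exception thrown while producing values is delivered to the Observer as an error notification.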
import { Observable } from "rxjs";

const observable$ = new Observable(function subscribe(subscriber) {
  try {
    subscriber.next(1);
    subscriber.next(2);
    subscriber.next(3);
    subscriber.complete();
  } catch (err) {
    subscriber.error(err); // delivers an error if one occurs
  }
});
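To see the try/catch pattern with code that can actually throw, here is a sketch built around JSON.parse. The parseJson helper and the outcomes array are hypothetical and exist only for this example.

```typescript
import { Observable } from "rxjs";

// Hypothetical helper: wraps JSON.parse in an Observable.
function parseJson(text: string): Observable<unknown> {
  return new Observable<unknown>((subscriber) => {
    try {
      subscriber.next(JSON.parse(text)); // throws on malformed input
      subscriber.complete();
    } catch (err) {
      subscriber.error(err); // forward the failure to the Observer
    }
  });
}

const outcomes: string[] = [];

parseJson('{"ok": true}').subscribe({
  next: () => { outcomes.push("parsed"); },
  error: () => { outcomes.push("failed"); },
});

parseJson("{not json").subscribe({
  next: () => { outcomes.push("parsed"); },
  error: () => { outcomes.push("failed"); },
});

console.log(outcomes); // ["parsed", "failed"]
```

The second subscription receives an error notification instead of an uncaught exception, so the calling code stays in control of how the failure is handled.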
Unsubscribing Observable Executions
Unsubscribing is also known as disposing of an execution. So far, none of the examples has released the resources held by a running Observable. Because an Observable execution may be infinite, we need an API to cancel it.
Each Observable execution is exclusive to one Observer, so subscribe returns a Subscription whose unsubscribe() method cancels that execution, as shown in the example below.
import { fromEvent } from "rxjs";
import { pluck } from "rxjs/operators";

const clicksObservable$ = fromEvent(document, "click");

const subscription = clicksObservable$
  .pipe(pluck("clientX"))
  .subscribe({
    next: clientX => console.log(clientX),
    error: err => console.error(err),
    complete: () => console.log("Execution completed")
  });

// Stop listening for clicks after 10 seconds.
setTimeout(() => {
  subscription.unsubscribe();
}, 10000);
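Please run this on stackblitz.com to see the output. If you look at the console, it logs the x-coordinate of each mouse click for 10 seconds. After 10 seconds, the subscription is unsubscribed and further clicks are no longer logged. Note that the complete callback is not called, because unsubscribing is not the same as completing.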
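When you write your own Observable, you can return a function from the subscribe function; RxJS runs it as teardown logic when the subscription is unsubscribed. The sketch below assumes a timer as the resource to release, and the names ticks$, timerId and log are illustrative only.

```typescript
import { Observable } from "rxjs";

const log: string[] = [];

const ticks$ = new Observable<number>((subscriber) => {
  let count = 0;
  const timerId = setInterval(() => subscriber.next(count++), 1000);

  // The returned function is the teardown logic; it runs on unsubscribe().
  return () => {
    clearInterval(timerId);
    log.push("timer cleared");
  };
});

const subscription = ticks$.subscribe({
  next: (n: number) => { log.push("tick " + n); },
});

// Cancel right away, before the first tick has a chance to fire.
subscription.unsubscribe();

console.log(subscription.closed); // true
console.log(log); // ["timer cleared"]
```

Without the teardown function the interval would keep running after unsubscribe, which is the kind of leak this section is about.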
I will be talking about RxJS subscriptions in detail on the next page of this tutorial. That is all for RxJS Observables in depth. Please share your feedback in the comments below.