RxJS Observable

You have learned about the basics of Observables and Observers in the RxJS Building Blocks tutorial. In this article, I will discuss the various ways of creating Observables in rxjs and its technical details.

As you know, the observable$ represents a lazy push-based collection. This can be a collection of any data or stream of events depending upon the source. Let us understand how a push-based mechanism is different from a traditional polling-based system.

What is Pushing vs. Pulling Data in RxJS?

I found a good article here on Medium on Thoughts on Push vs Pull Architectures which will give you a good understanding. I will also explain this on a High Level. In any application today, the code needs to actively deal with data and events. Pull and Push are two mechanisms using which a data Producer communicates with the data Consumer.

Pulling Data

In most of the traditional programming, the application actively polls a data source for checking the new data sequence. This behavior is also represented by the iterator pattern used in JavaScript Arrays, Objects, Sets, Maps, and etc. In this case, Next data is retrieved through an iteration like for loop etc. Every javascript function is a pulling mechanism, where the function produces a data, but the code that invokes decides when the producer should send the data.

This is basically Pulling the data, where the data source has to manage the data until it is received by the receiver. Pulling of data or events may not be a good idea for many of the use cases, for example, heavy event-driven applications, realtime applications and so on.

Pushing Data – This is what RxJS implements

In reactive programming, the application has more control by subscribing to a data stream (observable in RxJS). Observer subscribes to the data source and knows when there is a new data/event. It does not actively poll the source, but it reacts to the data being pushed to it. When the event has completed (data stream is closed), the source will send a notice to the subscriber (in complete()). This is called a Push pattern which is very well implemented in RxJS and it has proven its efficiency and performance in complex UI applications.

The Observable will notify all the observers automatically of any state changes. observers registers an interest through the subscribe method. RxJS has built-in query implementation over observable sequences such as events, callbacks, Promises, HTML5 Geolocation APIs, and much much more.

Source code of Observables

The Observable class implements Subscribable interface.

//Complete code is available here - https://github.com/ReactiveX/rxjs/blob/6.x/src/internal/Observable.ts

export class Observable<T> implements Subscribable<T> {
    //class body
}

Complete source on GitHub

export interface Subscribable<T> {
  subscribe(observer?: PartialObserver<T>): Unsubscribable;
  /** @deprecated Use an observer instead of a complete callback */
  subscribe(next: null | undefined, error: null | undefined, complete: () => void): Unsubscribable;
  /** @deprecated Use an observer instead of an error callback */
  subscribe(next: null | undefined, error: (error: any) => void, complete?: () => void): Unsubscribable;
  /** @deprecated Use an observer instead of a complete callback */
  subscribe(next: (value: T) => void, error: null | undefined, complete: () => void): Unsubscribable;
  subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Unsubscribable;
}

As you can see in the above code, Observable implements a Subscribable interface. Also, you can pass an Observer by calling the subscribe method. I have discussed the Observer classes in the previous article.

Creating Observables in RxJS

There are several ways in which Observables can be created and they are discussed below.

1. Using Observable constructor

Here you need to make use of the new Observable(..) as shown in the below code and the subscriber is used to push the values. As you know, Observable does not execute unless it is subscribed. So, observable$subscribe(observer) subscribes the observable.

The subscribe method accepts an Observer as the parameter. You can run the example on stackblitz.com.

import { Observable } from 'rxjs';

const observable$ = new Observable((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

console.log('just before subscribe');
observable$.subscribe({
  next(x) {
    console.log('got value ' + x)
  },
  error(err) {
    console.error('something wrong occurred: ' + err);
  },
  complete() {
     console.log('done');
  }
});
console.log('just after subscribe');

Output

just before subscribe
got value 1
got value 2
got value 3
done
just after subscribe

2. Using special functions (Operators)

You can also create an Observable using several handy RxJS functions. Below code demonstrates how to make use of from to create an observable from an Array.

import { from } from "rxjs";

let nums = [1, 2, 4, 34, 56, 789];
let numsObservable$ = from(nums);

let observer = {
  next: num => console.log(num),
  error: err => console.error("Error occured: " + err),
  complete: () => console.log("Execution completed")
};

numsObservable$.subscribe(observer);

Output

1
2
4
34
56
789
Execution completed

Just like from, there are several other functions that can be used to create Observables and they are known as Creation Operators. They will be discussed in the latter part of the tutorial.

3. Create using Observable.create function

You can also create a new cold Observable by calling Observable.create constructor. Run this example on Stackblitz.

import { Observable } from 'rxjs';

const observable$ = Observable.create((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

console.log('just before subscribe');
observable$.subscribe({
  next(x) {
    console.log('got value ' + x)
  },
  error(err) {
    console.error('something wrong occurred: ' + err);
  },
  complete() {
     console.log('done');
  }
});
console.log('just after subscribe');

Subscribing to Observables using observers

Once you have an Observable$, the next step is to subscribe to it. As you can see in the above example, observable$.subscribe method is used to subscribe to an observable. There are several ways to subscribe to an observable and they are discussed below in detail.

The subscribe takes an observer as an argument. An observer is a guy who reacts based on the data that is pushed by the producer (observable). Observers have 3 properties, next, error and complete.

observable$.subscribe({
  next(x) {
    console.log('got value ' + x)
  },
  error(err) {
    console.error('something wrong occurred: ' + err);
  },
  complete() {
     console.log('done');
  }
});

The observable$.subscribe call starts the Observable execution.

It is not always that you will have all the 3 methods inside your observers likenext, error, complete(). They are optional. You don’t even have to pass as an object like mentioned below.

1. Observer with next, error and complete example

The example below shows the typical ways you would write an observer. This is the recommended way. You can also run it on Stackblitz.com to play with it.

import { Observable } from "rxjs";

const observable$ = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

const myObserver = {
  next: data => {
    console.log(data);
  },
  error: err => {
    console.error(err);
  },
  complete: () => {
    console.log("Execution completed");
  }
};

observable$.subscribe(myObserver);

Output

1
2
3
Execution completed

In the above code, next executes whenever there is new data, error executes if there is an error and the observable execution stops. Similarly, complete() gets executed once the observable is done producing the values.

2. An observer with a single method, next

Here instead of next, you can have any method like error or complete(). But I don’t have a good use case why I would ignore the next method. Run it on Stackblitz.com.

import { Observable } from "rxjs";

const observable$ = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

const myObserver = {
  next: data => {
    console.log(data);
  }
};

observable$.subscribe(myObserver);

Output

1
2
3

As you can see in the console output, observer does nothing when observable performs subscriber.complete() as it does not have the necessary callback to handle that.

3. Passing the callbacks directly to subscribe

RxJS is quite flexible, we can also pass the same set of callbacks as an argument to observable$.subscribe method. Please take a look at the below code. Run on Stackblitz.com.

import { Observable } from "rxjs";

const observable$ = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

observable$.subscribe(
  data => {
    console.log(data);
  },
  error => {
    console.error(error);
  },
  () => {
    console.log("Execution completed");
  }
);

Output

1
2
3
Execution completed

NOTE: The order in which callbacks are passed to observable.subscribe method is important. It is in the order of next, error and complete().

You may not need to deal with all the 3 callbacks. Say you only want to handle next and error , then the code looks like below.

observable$.subscribe(
  data => {
    console.log(data);
  },
  error => {
    console.error(error);
  }
);

Executing Observable to produce data

An observable can produce 3 types of values. It can deliver the following values.

  1. Nextsubscriber.next(object): This sends value such as Number, Object, String etcs.
  2. Errorsubscriber.error("any-error"): Sends Error or Exception.
  3. Completesubscriber.complete(): Indicates that execution of the observable is completed.

The code inside Observable produces the data.

const observable$ = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

An Observable execution can deliver 0 to infinite next notification. If either an error or complete is delivered, nothing else can be delivered afterwards.

Basically, if you call subscriber.complete(); or subscriber.error(..), the execution of the Observable stops and you can not deliver anything after that. I leave this part for you to verify. Run the below code and comment output in the below.

const observable$ = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
  subscriber.next(4);
});

It is recommended to wrap the code in subscribe with a try/catch block.

import { Observable } from "rxjs";

const observable$ = new Observable(function subscribe(subscriber) {
  try {
    subscriber.next(1);
    subscriber.next(2);
    subscriber.next(3);
    subscriber.complete();
  } catch (err) {
    subscriber.error(err); // delivers an error if appears one
  }
});

Unsubscribing Observable Executions

Unsubscribing is also known as disposing of execution. So far you have created observables without cleaning the memory. An observable execution may be infinite, we need an API to cancel the execution.

Execution of the Observer is exclusive to one Observer only, therefore the subscribed observer has a way to unsubscribe() as shown in the below example.

import { fromEvent } from "rxjs";
import { pluck } from "rxjs/operators";

const clicksObservable$ = fromEvent(document, "click");

const subscription = clicksObservable$
  .pipe(pluck("clientX"))
  .subscribe({
    next: clientX => console.log(clientX),
    error: err => console.error(err),
    complete: () => console.log("Execution completed")
  });

 setTimeout(()=>{
   subscription.unsubscribe(); //unsubscribe
 }, 10000);

Please run this on stackblitz.com to see the output. If you look at the console, it logs x-coordinates of each mouse click until 10 seconds. After 10 seconds, the subscription is unsubscribed.

I will be taking about RxJS subscriptions in detail in the next page of this tutorial. This is all as part of RxJS Observable in depth. Please share your feedback in the comments below.

By |Last Updated: April 1st, 2024|Categories: RxJS|

Table of Contents