An RxJS Subject is a special type of Observable that multicasts to multiple Observers. As you learned before, plain Observables are unicast: each subscribed Observer gets its own independent execution of the Observable. A Subject instead shares a single execution among all of its Observers. This tutorial covers Subjects and their variations: AsyncSubject, BehaviorSubject, and ReplaySubject.

A Subject is like an Observable, but multicasts to multiple Observers.
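
To make the unicast versus multicast distinction concrete, here is a minimal sketch. The names unicast$, shared, and the executions counter are illustrative only and are not part of the tutorial's own examples.

```typescript
import { Observable, Subject } from "rxjs";

// Counts how many times the producer function below runs.
let executions = 0;

// A plain Observable: the producer runs once per subscriber (unicast).
const unicast$ = new Observable<number>(subscriber => {
  executions += 1;
  subscriber.next(executions);
  subscriber.complete();
});

const unicastValues: number[] = [];
unicast$.subscribe(value => unicastValues.push(value));
unicast$.subscribe(value => unicastValues.push(value));
// Two subscribers, two executions: unicastValues is [1, 2].

// A Subject: one next() call reaches every subscriber (multicast).
const shared = new Subject<number>();
const multicastValues: number[] = [];
shared.subscribe(value => multicastValues.push(value));
shared.subscribe(value => multicastValues.push(value));
shared.next(42);
// One emission, delivered twice: multicastValues is [42, 42].
```

Each subscriber to unicast$ triggers a fresh run of the producer, while both subscribers to shared see the very same emission.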

Create RxJS Subjects

A Subject has a dual nature: every Subject is an Observable, and every Subject is also an Observer. Let us look at what each side means in practice.

What happens if the Subject is an Observable?
This just means you can subscribe to a Subject just like you subscribe to any Observable.

What happens if the Subject is an Observer?
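Just like an Observer, a Subject has the methods next(value), error(err), and complete(). The only difference is that a value passed to next(value) is multicast to every Observer subscribed to the Subject. Because a Subject is an Observer, you can also pass it as the argument to observable$.subscribe().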
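
As a rough mental model of this dual nature, the sketch below builds a tiny Subject-like class from scratch. TinySubject and TinyObserver are hypothetical names made up for this illustration. This is not how RxJS implements Subject, and it leaves out error handling entirely.

```typescript
// A deliberately tiny stand-in for a Subject (not the RxJS implementation).
interface TinyObserver<T> {
  next: (value: T) => void;
  complete?: () => void;
}

class TinySubject<T> {
  private observers: TinyObserver<T>[] = [];
  private stopped = false;

  // Observable side: register an observer and return an unsubscribe function.
  subscribe(observer: TinyObserver<T>): () => void {
    if (this.stopped) {
      if (observer.complete) observer.complete();
      return () => {};
    }
    this.observers.push(observer);
    return () => {
      this.observers = this.observers.filter(o => o !== observer);
    };
  }

  // Observer side: forward one value to every registered observer.
  next(value: T): void {
    if (this.stopped) return;
    for (const observer of this.observers.slice()) {
      observer.next(value);
    }
  }

  // Observer side: notify everyone once, then stop accepting values.
  complete(): void {
    if (this.stopped) return;
    this.stopped = true;
    for (const observer of this.observers.slice()) {
      if (observer.complete) observer.complete();
    }
    this.observers = [];
  }
}

const tiny = new TinySubject<number>();
const tinyLog: string[] = [];
const unsubscribeA = tiny.subscribe({ next: v => tinyLog.push("A:" + v) });
tiny.subscribe({ next: v => tinyLog.push("B:" + v) });

tiny.next(1);   // reaches A and B
unsubscribeA(); // A leaves
tiny.next(2);   // reaches only B
```

The subscribe() method is the Observable half and next()/complete() are the Observer half; multicasting is nothing more than looping over the list of registered observers.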

In the example below, there are two simple Observers (observerA and observerB). Both subscribe to the subject and log each value they receive. This code is also available to try on Stackblitz.com.

import { Subject } from "rxjs";

const subject = new Subject<number>();

//ObserverA
subject.subscribe({
  next: value => {
    console.log("Inside ObserverA: " + value);
  }
});

//ObserverB
subject.subscribe({
  next: value => {
    console.log("Inside ObserverB: " + value);
  }
});

subject.next(10);
subject.next(Math.random());

Output

Inside ObserverA: 10
Inside ObserverB: 10
Inside ObserverA: 0.5030967419361339
Inside ObserverB: 0.5030967419361339

Use Subject as an Observer

As mentioned before, a Subject is also an Observer, so you can pass it directly to observable$.subscribe(subject). The Subject then multicasts every value it receives from the Observable to its own Observers. The example below shows this; try the code on Stackblitz.com.

import { Subject, from } from "rxjs";

const subject = new Subject<number>();

//ObserverA
subject.subscribe({
  next: value => {
    console.log("Inside ObserverA: " + value);
  }
});

//ObserverB
subject.subscribe({
  next: value => {
    console.log("Inside ObserverB: " + value);
  }
});

const observable$ = from([1, 34, 3434, 343445434]);
observable$.subscribe(subject); //Pass subject as argument

Output

Inside ObserverA: 1
Inside ObserverB: 1
Inside ObserverA: 34
Inside ObserverB: 34
Inside ObserverA: 3434
Inside ObserverB: 3434
Inside ObserverA: 343445434
Inside ObserverB: 343445434

In the above example, the from function creates an Observable from an array. In the same way, when you create your own observable$, you can pass a subject to its subscribe() call to multicast the values. The complete code example is available on Stackblitz.

// Assumes: import { Observable } from "rxjs"; and the subject from the example above.
const observable2$ = new Observable<number>(subscriber => {
  subscriber.next(5);
  subscriber.next(10);
  subscriber.next(15);
});
observable2$.subscribe(subject);

Multicast Operator – Multicasted Observables

There is a special multicast operator in rxjs/operators that converts an Observable into a multicast Observable, that is, one that can send notifications to multiple subscribed Observers through a Subject. Note that multicast is deprecated as of RxJS 7, so this section applies mainly to RxJS 6 style code. The code below demonstrates the operator. Link to Stackblitz.com.

import { ConnectableObservable, Observable, Subject } from "rxjs";
import { multicast } from "rxjs/operators";

const subject = new Subject<number>();

const observable$ = new Observable<number>(subscriber => {
  subscriber.next(5);
  subscriber.next(10);
  subscriber.next(15);
});
// pipe() is typed as returning a plain Observable, so cast to reach connect().
const multicastedObservable$ = observable$.pipe(
  multicast(subject)
) as ConnectableObservable<number>;

//ObserverA
multicastedObservable$.subscribe({
  next: value => {
    console.log("Inside ObserverA: " + value);
  }
});

//ObserverB
multicastedObservable$.subscribe({
  next: value => {
    console.log("Inside ObserverB: " + value);
  }
});

// Comment out this line and test again: without connect(), nothing is logged.
multicastedObservable$.connect();

Output

Inside ObserverA: 5
Inside ObserverB: 5
Inside ObserverA: 10
Inside ObserverB: 10
Inside ObserverA: 15
Inside ObserverB: 15

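The connect() method is the key part of the above code. The multicast operator returns a ConnectableObservable, which is a special type of Observable with a connect() method, and the source Observable does not start executing until connect() is called. In strictly typed TypeScript, the result of pipe(multicast(subject)) has to be treated as a ConnectableObservable (for example with a type assertion) before connect() can be called, because pipe() is typed to return a plain Observable.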

The connect() method returns a Subscription object. Call unsubscribe() on it to cancel the shared Observable execution.
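
To illustrate the Subscription returned by connect(), here is a small sketch. It assumes RxJS 6 or 7, where multicast is still available. The source$ Observable, the log array, and the emittedBeforeConnect variable are made up for this illustration.

```typescript
import { ConnectableObservable, Observable, Subject } from "rxjs";
import { multicast } from "rxjs/operators";

const log: string[] = [];

// Hypothetical source that records when it starts and when it is torn down.
const source$ = new Observable<number>(subscriber => {
  log.push("source started");
  subscriber.next(1);
  // Teardown logic: runs when the execution is cancelled.
  return () => {
    log.push("source torn down");
  };
});

const shared$ = source$.pipe(
  multicast(new Subject<number>())
) as ConnectableObservable<number>;

shared$.subscribe(value => log.push("A: " + value));
shared$.subscribe(value => log.push("B: " + value));

// Nothing has run yet: the source starts only on connect().
const emittedBeforeConnect = log.length;

// connect() starts the single shared execution and returns its Subscription.
const connection = shared$.connect();

// Unsubscribing the connection cancels the shared execution for everyone.
connection.unsubscribe();
```

Holding on to the connection Subscription gives you one place to stop the shared execution, regardless of how many Observers are attached.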

RxJS AsyncSubject

An AsyncSubject is a special Subject that emits only the last value it received, and only when the asyncSubject.complete() method is called. If error(err) is called instead, the observers receive the error right away and no value is emitted. I will take you through a few simple examples of AsyncSubject.

In the code example below, the observer receives only the last value. If you comment out subject.complete();, the subject never emits, so the observer receives nothing. (Stackblitz)

import { AsyncSubject } from "rxjs";

const subject = new AsyncSubject();

subject.subscribe({
  next: data => {
    console.log("ObserverA: " + data);
  },
  complete: () => {
    console.log("SubscriptionA execution complete");
  }
});

subject.next(12);
subject.next(10000000);
subject.next(0);

// Comment out the line below and check the output.
subject.complete();

Output

ObserverA: 0
SubscriptionA execution complete

AsyncSubject example 2
The code below shows what happens when the subject emits an error with error(err). See the output on Stackblitz.com.

import { AsyncSubject } from "rxjs";

const subject = new AsyncSubject();

subject.subscribe({
  next: data => {
    console.log("ObserverA: " + data);
  },
  complete: () => {
    console.log("SubscriptionA execution complete");
  },
  error: err => {
    console.error("SubscriptionA " + err);
  }
});

subject.next(12);
subject.next(10000000);
subject.error(new Error("Emitted error"));

// This call has no effect: the subject already terminated with the error above.
subject.complete();

subject.subscribe({
  next: data => {
    console.log("ObserverB: " + data);
  },
  complete: () => {
    console.log("SubscriptionB execution complete");
  },
  error: err => {
    console.error("SubscriptionB " + err);
  }
});
//Logs an error in the console - Error: Emitted error

Output

SubscriptionA Error: Emitted error
SubscriptionB Error: Emitted error

The above code only logs the errors in the console. The complete callbacks never run, because an observer that receives an error gets no further notifications.

NOTE – The AsyncSubject caches its final notification: either the last value followed by completion, or the error. Any new Observer that subscribes to the subject afterwards gets the same notification. This is what the above code demonstrates with ObserverB.

When to use AsyncSubject?

  • Use AsyncSubject when you care only about the final value (or the error) emitted before the stream closes.
  • Use it when you want to cache the latest value, for example the latest value fetched from a server.
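
The caching use case can be sketched as follows. The config$ subject and the string values are hypothetical stand-ins for a real server response; no actual request is made here.

```typescript
import { AsyncSubject } from "rxjs";

// Hypothetical cache for a value that is loaded once.
const config$ = new AsyncSubject<string>();
const received: string[] = [];

// An early subscriber waits until the subject completes.
config$.subscribe(value => received.push("early: " + value));

config$.next("loading"); // intermediate value, never delivered
config$.next("config-v2"); // last value before complete()
config$.complete(); // now "config-v2" is delivered and cached

// A late subscriber gets the cached value immediately.
config$.subscribe(value => received.push("late: " + value));
```

Both subscribers end up with the same final value, and the intermediate "loading" value is never delivered to either of them.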

BehaviorSubject in RxJS

A BehaviorSubject stores the latest value emitted to its subscribers. Whenever a new Observer subscribes, it immediately receives that stored value.

BehaviorSubject represents a value that changes over time. Observers can subscribe to the subject to receive the last value and all subsequent notifications. (Stackblitz)

import { BehaviorSubject } from "rxjs";

// Initialized with initial value - 1
const subject = new BehaviorSubject<number>(1);

subject.subscribe(data => {
  console.log("ObserverA: " + data);
});

subject.next(2);
subject.next(3);

subject.subscribe(data => {
  console.log("ObserverB: " + data);
});

subject.next(4);

console.log("subject.getValue(): " + subject.getValue());

Output

ObserverA: 1
ObserverA: 2
ObserverA: 3
ObserverB: 3
ObserverA: 4
ObserverB: 4
subject.getValue(): 4

Passing an initial value is mandatory when creating a BehaviorSubject, as the constructor call at the top of the example shows. ObserverB subscribes after subject.next(3), so it immediately receives the last emitted value, 3. You can read the last emitted value at any time with subject.getValue(), as the final console.log call shows.
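
Because the current value is always available, a BehaviorSubject is often used to hold a piece of state. The following is a minimal sketch of that idea; counter$ and the arrays are hypothetical names chosen for this illustration.

```typescript
import { BehaviorSubject } from "rxjs";

// Hypothetical counter state, starting at 0.
const counter$ = new BehaviorSubject<number>(0);

const seen: number[] = [];
counter$.subscribe(value => seen.push(value)); // receives 0 immediately

// Read the current value synchronously to compute the next one.
counter$.next(counter$.getValue() + 1);
counter$.next(counter$.getValue() + 1);

// A late subscriber starts from the current value, not from the initial one.
const lateSeen: number[] = [];
counter$.subscribe(value => lateSeen.push(value));
```

After this runs, seen holds 0, 1, 2 and lateSeen holds only 2, which is the value stored at the time of the second subscription.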

ReplaySubject in RxJS

A ReplaySubject is similar to a BehaviorSubject in that it sends old values to new subscribers, but it can replay more than one value. It records a part of the Observable execution in a buffer and replays it to each new subscriber.

The code below shows an example of ReplaySubject usage. The constructor receives the buffer size as a parameter. Try it on Stackblitz.com.

import { ReplaySubject } from "rxjs";

// Constructor accepts the buffer size.
const rSubject = new ReplaySubject<number>(4);

rSubject.next(1);
rSubject.subscribe(data => {
  console.log("ObserverA: " + data);
});

rSubject.next(2);
rSubject.next(3);
rSubject.next(4);
rSubject.next(5);
rSubject.next(6);

rSubject.subscribe(data => {
  console.log("ObserverB: " + data);
});

rSubject.next(7);

Output

ObserverA: 1
ObserverA: 2
ObserverA: 3
ObserverA: 4
ObserverA: 5
ObserverA: 6
ObserverB: 3
ObserverB: 4
ObserverB: 5
ObserverB: 6
ObserverA: 7
ObserverB: 7

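The constructor call creates a ReplaySubject with a buffer size of 4, which means rSubject can replay at most 4 values. ObserverA subscribes after rSubject.next(1), so it receives the buffered value 1 on subscription and then every later value. ObserverB subscribes after six values have been emitted, so it receives only the last four (3 to 6) before continuing with 7. Please spend some time to get a good understanding of the code.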
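
To see the effect of the buffer size in isolation, here is a smaller sketch with a buffer of 2. The names recent$, replayed, and afterComplete are hypothetical and used only for this illustration.

```typescript
import { ReplaySubject } from "rxjs";

// Keep only the two most recent values.
const recent$ = new ReplaySubject<number>(2);

recent$.next(1);
recent$.next(2);
recent$.next(3);

// A late subscriber gets the buffered values 2 and 3, then live values.
const replayed: number[] = [];
recent$.subscribe(value => replayed.push(value));

recent$.next(4);
recent$.complete();

// Even after completion, the buffer (now 3 and 4) is replayed to new subscribers.
const afterComplete: number[] = [];
recent$.subscribe(value => afterComplete.push(value));
```

A smaller buffer keeps memory use bounded, at the cost of late subscribers seeing less of the history.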

Please share your feedback on this article on RxJS Subject. I would love to hear your thoughts on this article.

Last Updated: April 1st, 2024 | Categories: RxJS
