An RxJS Subject is a special type of Observable that allows multicasting to multiple Observers. The concept will become clear as you proceed further. As you learned before, plain Observables are unicast: each subscribed Observer gets its own independent execution. This is a complete tutorial on RxJS Subjects. You will also learn other variations of Subjects like AsyncSubject, ReplaySubject, and BehaviorSubject.
A Subject is like an Observable, but multicasts to multiple Observers.
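To make the unicast/multicast distinction concrete, here is a minimal sketch. The names cold$, executions, and the four result arrays are made up for illustration. It counts how often the Observable's producer function runs: once per direct subscriber, but only once when a Subject sits in between.

```typescript
import { Observable, Subject } from "rxjs";

// Counts how many times the producer function below is executed.
let executions = 0;

const cold$ = new Observable<number>(subscriber => {
  executions++;
  subscriber.next(executions);
  subscriber.complete();
});

// Unicast: each subscriber triggers its own execution.
const unicastA: number[] = [];
const unicastB: number[] = [];
cold$.subscribe(value => unicastA.push(value)); // first execution, emits 1
cold$.subscribe(value => unicastB.push(value)); // second execution, emits 2

// Multicast: both observers share the single execution started
// when the subject subscribes to cold$.
const subject = new Subject<number>();
const multicastA: number[] = [];
const multicastB: number[] = [];
subject.subscribe(value => multicastA.push(value));
subject.subscribe(value => multicastB.push(value));
cold$.subscribe(subject); // third execution, emits 3 to both observers

console.log(unicastA, unicastB, multicastA, multicastB);
```

The two direct subscribers see different values because each one ran the producer separately, while the two observers behind the Subject see the same value.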
Create RxJS Subjects
A Subject has a dual nature: every Subject is an Observable, and every Subject is also an Observer. Let us look at what each of these means.
What happens if the Subject is an Observable?
This just means you can subscribe to a Subject just like you do to an Observable.
What happens if the Subject is an Observer?
Just like an Observer, a Subject has the methods next(value), error(err), and complete(). The only difference is that each notification is multicast to every Observer subscribed to the Subject. This also means you can pass a Subject as the argument to observable$.subscribe().
In the example below, there are 2 simple Observers (observerA, observerB). They both subscribe to the subject and log the received value. This code is also available to try on Stackblitz.com.
import { Subject } from "rxjs";

const subject = new Subject<number>();

//ObserverA
subject.subscribe({
  next: value => {
    console.log("Inside ObserverA: " + value);
  }
});

//ObserverB
subject.subscribe({
  next: value => {
    console.log("Inside ObserverB: " + value);
  }
});

subject.next(10);
subject.next(Math.random());
Output
Inside ObserverA: 10
Inside ObserverB: 10
Inside ObserverA: 0.5030967419361339
Inside ObserverB: 0.5030967419361339
Use Subject as an Observer
As mentioned before, the Subject is also an Observer. This means you can pass it to observable$.subscribe(subject), and the Subject then multicasts every value it receives to its own Observers. The example below shows this; try the code on Stackblitz.com.
import { Subject, from } from "rxjs";

const subject = new Subject<number>();

//ObserverA
subject.subscribe({
  next: value => {
    console.log("Inside ObserverA: " + value);
  }
});

//ObserverB
subject.subscribe({
  next: value => {
    console.log("Inside ObserverB: " + value);
  }
});

const observable$ = from([1, 34, 3434, 343445434]);

observable$.subscribe(subject); //Pass subject as argument
Output
Inside ObserverA: 1
Inside ObserverB: 1
Inside ObserverA: 34
Inside ObserverB: 34
Inside ObserverA: 3434
Inside ObserverB: 3434
Inside ObserverA: 343445434
Inside ObserverB: 343445434
In the above example, the from operator creates an Observable from an array. When you create your own observable$, you can pass a subject to it in the same way to multicast its values. The complete code example is available on Stackblitz.
import { Observable } from "rxjs";

// Reuses the subject from the previous example.
const observable2$ = new Observable<number>(subscriber => {
  subscriber.next(5);
  subscriber.next(10);
  subscriber.next(15);
});

observable2$.subscribe(subject);
Multicast Operator – Multicasted Observables
There is a special multicast operator in rxjs/operators that converts an Observable into a multicast Observable, that is, one that can send the same notifications to multiple subscribed Observers. Note that multicast is deprecated as of RxJS 7 in favor of connectable, connect, and share. The code below demonstrates multicast. Link to Stackblitz.com.
import { Subject, Observable } from "rxjs";
import { multicast } from "rxjs/operators";

const subject = new Subject<number>();

const observable$ = new Observable(subscriber => {
  subscriber.next(5);
  subscriber.next(10);
  subscriber.next(15);
});

const multicastedObservable$ = observable$.pipe(multicast(subject));

//ObserverA
multicastedObservable$.subscribe({
  next: value => {
    console.log("Inside ObserverA: " + value);
  }
});

//ObserverB
multicastedObservable$.subscribe({
  next: value => {
    console.log("Inside ObserverB: " + value);
  }
});

//Comment this and test the output
multicastedObservable$.connect();
Output
Inside ObserverA: 5
Inside ObserverB: 5
Inside ObserverA: 10
Inside ObserverB: 10
Inside ObserverA: 15
Inside ObserverB: 15
I am not following the right TypeScript conventions in the above code, as I want to make sure everyone understands the tutorial. Remember, the connect() method is very important in the above code: the source Observable does not start executing until connect() is called. The multicast operator returns a ConnectableObservable, which is a special type of Observable with a connect() method.
The connect() method returns a Subscription object, which you can unsubscribe from to cancel the execution of the shared Observable.
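As a small sketch of how that Subscription can be used, assuming an RxJS version that still ships the multicast operator: the names source$, shared$, seen, and connection are invented for this illustration, and the cast to ConnectableObservable is needed only to satisfy the type checker.

```typescript
import { ConnectableObservable, Subject } from "rxjs";
import { multicast } from "rxjs/operators";

// A Subject is used as the source so values can be pushed by hand.
const source$ = new Subject<number>();

// pipe() is typed as returning a plain Observable, hence the cast.
const shared$ = source$.pipe(
  multicast(new Subject<number>())
) as ConnectableObservable<number>;

const seen: number[] = [];
shared$.subscribe(value => seen.push(value));

source$.next(1); // not connected yet, so nothing is delivered

const connection = shared$.connect(); // starts the shared execution
source$.next(2); // delivered to the observer

connection.unsubscribe(); // cancels the shared execution
source$.next(3); // no longer delivered

console.log(seen);
```

Only the value pushed between connect() and connection.unsubscribe() reaches the observer.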
RxJS AsyncSubject
An AsyncSubject is a special Subject that emits only the last value it received, and only when the asyncSubject.complete() method is called. If error(err) is called instead, the error is delivered to the Observers and no value is emitted. I will take you through a few simple examples on AsyncSubject.
In the code example below, only the last value is received by the observer. If you comment out subject.complete();, the subject will not emit any value, so the observer receives nothing. (Stackblitz)
import { AsyncSubject } from "rxjs";

const subject = new AsyncSubject();

subject.subscribe({
  next: data => {
    console.log("ObserverA: " + data);
  },
  complete: () => {
    console.log("SubscriptionA execution complete");
  }
});

subject.next(12);
subject.next(10000000);
subject.next(0);

//Comment the below line and check.
subject.complete();
Output
ObserverA: 0
SubscriptionA execution complete
AsyncSubject example 2
The code below shows what happens when error(err) is called. See the output on Stackblitz.com.
import { AsyncSubject } from "rxjs";

const subject = new AsyncSubject();

subject.subscribe({
  next: data => {
    console.log("ObserverA: " + data);
  },
  complete: () => {
    console.log("SubscriptionA execution complete");
  },
  error: err => {
    console.error("SubscriptionA " + err);
  }
});

subject.next(12);
subject.next(10000000);
subject.error(new Error("Emitted error"));

// complete() has no effect here because the subject has already errored.
subject.complete();

subject.subscribe({
  next: data => {
    console.log("ObserverB: " + data);
  },
  complete: () => {
    console.log("SubscriptionB execution complete");
  },
  error: err => {
    console.error("SubscriptionB " + err);
  }
});

//Logs an error in the console - Error: Emitted error
Output
SubscriptionA Error: Emitted error
SubscriptionB Error: Emitted error
The above code only logs errors in the console. As you know, the complete callback does not run once the observer has received an error.
NOTE – The AsyncSubject caches its final notification: the last value when it completes, or the error when it fails. Any new Observer that subscribes to the subject afterwards gets the same notification. This is what the above code demonstrates with ObserverB.
When to use AsyncSubject?
- Use AsyncSubject when you care only about the last value (or the error) emitted before the stream closes.
- Use it when you want to cache the latest value. For example, the latest value fetched from a server.
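The caching use case can be sketched as follows. The name latest$ and the string values are placeholders, not real server responses. An observer that subscribes after completion still receives the cached last value.

```typescript
import { AsyncSubject } from "rxjs";

// Stands in for a value that would normally come from a server.
const latest$ = new AsyncSubject<string>();

const early: string[] = [];
const late: string[] = [];

latest$.subscribe(value => early.push(value));

latest$.next("v1");
latest$.next("v2");

// Nothing has been delivered yet: the subject has not completed.
const deliveredBeforeComplete = early.length;

latest$.complete(); // the early observer now receives "v2"

// A late observer gets the cached last value immediately.
latest$.subscribe(value => late.push(value));

console.log(deliveredBeforeComplete, early, late);
```

Both observers end up with the single value "v2", no matter when they subscribed.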
BehaviorSubject in RxJS
A BehaviorSubject stores the latest value emitted to its subscribers. Whenever a new Observer subscribes, it immediately receives that stored value from the BehaviorSubject.
BehaviorSubject represents a value that changes over time. Observers can subscribe to the subject to receive the last value and all subsequent notifications. (Stackblitz)
import { BehaviorSubject } from "rxjs";

// Initialized with initial value - 1
const subject = new BehaviorSubject<number>(1);

subject.subscribe(data => {
  console.log("ObserverA: " + data);
});

subject.next(2);
subject.next(3);

subject.subscribe(data => {
  console.log("ObserverB: " + data);
});

subject.next(4);

console.log("subject.getValue(): " + subject.getValue());
Output
ObserverA: 1
ObserverA: 2
ObserverA: 3
ObserverB: 3
ObserverA: 4
ObserverB: 4
subject.getValue(): 4
It is mandatory to pass an initial value when creating a BehaviorSubject, as the constructor call in the example shows. ObserverB subscribes after subject.next(3), so it immediately receives the last emitted value, 3. You can read the current value at any time using subject.getValue(), as the last line of the example shows.
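For contrast, here is a minimal sketch comparing a plain Subject with a BehaviorSubject when an observer subscribes late. The names plain$, behavior$, fromPlain, and fromBehavior are illustrative only.

```typescript
import { BehaviorSubject, Subject } from "rxjs";

const plain$ = new Subject<string>();
const behavior$ = new BehaviorSubject<string>("initial");

// Both subjects emit "a" before anyone has subscribed.
plain$.next("a");
behavior$.next("a");

const fromPlain: string[] = [];
const fromBehavior: string[] = [];

// The plain Subject has forgotten "a"; the BehaviorSubject replays it.
plain$.subscribe(value => fromPlain.push(value));
behavior$.subscribe(value => fromBehavior.push(value));

plain$.next("b");
behavior$.next("b");

console.log(fromPlain, fromBehavior);
```

The late observer of the plain Subject sees only "b", while the BehaviorSubject observer sees the stored "a" first and then "b".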
ReplaySubject in RxJS
A ReplaySubject is similar to a BehaviorSubject, except that it can replay several old values to new subscribers instead of only the latest one. It does so by recording part of the Observable execution in a buffer.
The code below shows an example of ReplaySubject usage. The constructor receives the buffer size as a parameter. Try it on Stackblitz.com.
import { ReplaySubject } from "rxjs";

// Constructor accepts the buffer size.
const rSubject = new ReplaySubject<number>(4);

rSubject.next(1);

rSubject.subscribe(data => {
  console.log("ObserverA: " + data);
});

rSubject.next(2);
rSubject.next(3);
rSubject.next(4);
rSubject.next(5);
rSubject.next(6);

rSubject.subscribe(data => {
  console.log("ObserverB: " + data);
});

rSubject.next(7);
Output
ObserverA: 1
ObserverA: 2
ObserverA: 3
ObserverA: 4
ObserverA: 5
ObserverA: 6
ObserverB: 3
ObserverB: 4
ObserverB: 5
ObserverB: 6
ObserverA: 7
ObserverB: 7
The constructor call creates a ReplaySubject with a buffer size of 4. This means rSubject can replay at most 4 values. ObserverA subscribes when only the value 1 has been emitted, so that is all that is replayed to it. ObserverB subscribes after 6 has been emitted, so it receives the 4 most recent values, 3 to 6. Please spend some time to get a good understanding of the code.
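To see the effect of the buffer size in isolation, the sketch below compares a ReplaySubject created without a buffer size, which replays every value, with one limited to 2 values. The names all$, lastTwo$, fromAll, and fromLastTwo are made up for this example.

```typescript
import { ReplaySubject } from "rxjs";

const all$ = new ReplaySubject<number>(); // no buffer size: replays everything
const lastTwo$ = new ReplaySubject<number>(2); // replays at most 2 values

// Emit four values before anyone subscribes.
for (const n of [1, 2, 3, 4]) {
  all$.next(n);
  lastTwo$.next(n);
}

const fromAll: number[] = [];
const fromLastTwo: number[] = [];

all$.subscribe(value => fromAll.push(value));
lastTwo$.subscribe(value => fromLastTwo.push(value));

console.log(fromAll, fromLastTwo);
```

The unbounded subject hands the late observer all four values, and the bounded one hands over only 3 and 4.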
Please share your feedback on this article on RxJS Subject. I would love to hear your thoughts on this article.