You will learn about Backpressure in the Project reactor. Backpressure is the ability of a Consumer to signal the Producer that the rate of emission is higher than what it can handle. So using this mechanism, the Consumer gets control over the speed at which data is emitted.
If you are new to Project Reactor, read about the Flux in reactive stream.
What is Backpressure?
- Using Backpressure, the Subscriber controls the data flow from the Publisher.
- The Subscriber makes use of
request(n)
to requestn
number of elements at a time.
Backpressure example using Flux
There are several ways we can implement Backpressure technique, we will discuss them one by one.
1. Request only n elements
Lets a there is a stream which emits x
number of elements, but the subscriber only wants to receive n
number of elements from the publisher.
@Test
public void subscriptionRequest() {
Flux<Integer> range$ = Flux.range(1, 100);
//gets only 10 elements
range$.subscribe(
value -> System.out.println(value),
err -> err.printStackTrace(),
() -> System.out.println("==== Completed ===="),
subscription -> subscription.request(10)
);
}
Output:
1 2 3 4 5 6 7 8 9 10
2. Cancel the subscription after n elements
A better approach is to have more control over the flow and cancel the subscription when needed. You can use the BaseSubscriber::hookOnNext
to subscribe to a producer. Look at the example below, the producer emits the next value only when the subscriber sends request(1)
. In reality, this producer can be a database system and the Subscriber could be a I/O device. So in order to match the speed of operation, I/O may request a batch of data, process them, and then request the next batch of data and so on.
@Test
public void cancelCallback() {
Flux<Integer> range$ = Flux.range(1, 100).log() ;
range$.doOnCancel(() -> {
System.out.println("===== Cancel method invoked =======");
}).doOnComplete(() -> {
System.out.println("==== Completed ====");
}).subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnNext(Integer value){
try{
Thread.sleep(500);
request(1); //request next element
System.out.println(value);
if (value == 5) {
cancel();
}
}catch (InterruptedException e){
e.printStackTrace();
}
}
});
//Test the code
StepVerifier.create(range$)
.expectNext(1, 2, 3, 4, 5)
.thenCancel()
.verify();
}
Output:
[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription) [ INFO] (main) | request(unbounded) [ INFO] (main) | onNext(1) [ INFO] (main) | request(1) 1 [ INFO] (main) | onNext(2) [ INFO] (main) | request(1) 2 [ INFO] (main) | onNext(3) [ INFO] (main) | request(1) 3 [ INFO] (main) | onNext(4) [ INFO] (main) | request(1) 4 [ INFO] (main) | onNext(5) [ INFO] (main) | request(1) 5 ===== Cancel method invoked ======= [ INFO] (main) | cancel() [ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription) [ INFO] (main) | request(unbounded) [ INFO] (main) | onNext(1) [ INFO] (main) | onNext(2) [ INFO] (main) | onNext(3) [ INFO] (main) | onNext(4) [ INFO] (main) | onNext(5) [ INFO] (main) | cancel()
3. Backpressure StepVerifier
The StepVerifier
of reactor-test
allows us to test a Producer properly w.r.t backpressure behavior. Only when the thenRequest(n)
is triggered, the producer sends the next n
number of elements.
@Test
public void backpressureVerifier() {
Flux data$ = Flux.just(101, 201, 301).log();
StepVerifier.create(data$)
.expectSubscription()
.thenRequest(1)
.expectNext(101)
.thenRequest(2)
.expectNext(201, 301)
.verifyComplete();
}
Output:
[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription) [ INFO] (main) | request(1) [ INFO] (main) | onNext(101) [ INFO] (main) | request(2) [ INFO] (main) | onNext(201) [ INFO] (main) | onNext(301) [ INFO] (main) | onComplete() [ INFO] (main) | request(unbounded)
Conclusion
I have kept the concept simple. You can refer to the official docs for more examples on Backpressure in the Project reactor. The idea is to have control over the flow of data from a Reactive Stream.