Introduction
In this article, you will learn about Flux in Project Reactor which represents 0 to N
(Zero to N) items. Flux is a Reactive Streams Publisher with Rx operators that emits 0 to N elements, and then completes (successfully or with an error). If it is known that the underlying Publisher will emit 0 or 1 element, you should use a Mono instead.
If you directly landed on this page, please follow the project set up instructions from the previous article.
1. Code inside a Flux<T>
class
Just like Mono, Flux also is an abstract class and it implements the Publisher from reactive stream specification. It has several factory methods to create its instance.
public abstract class Flux<T> implements CorePublisher<T> {
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter, OverflowStrategy
backpressure) {..}
public static <T> Flux<T> empty() {..}
public static <T> Flux<T> error(Throwable error) {..}
public static <T> Flux<T> from(Publisher<? extends T> source) {..}
public static Flux<Long> interval(Duration period) {..}
public static <T> Flux<T> just(T... data) {..}
..
}
package reactor.core;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
public interface CorePublisher<T> extends Publisher<T> {
void subscribe(CoreSubscriber<? super T> subscriber);
}
package org.reactivestreams;
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
1.1. Marble diagram of Flux
As shown in the below Marble diagram, a Flux emits zero or n number of items. When the Flux completes its emission successfully, the subscriber receives the completed signal which is shown as the vertical line. You can also use several reactive operators to transform or manipulate the values emitted. For some reason, if the Flux terminates abruptly by throwing an Exception, the subscriber receives an Error signal and this is represented as the cross (X).
1.2. Create a simple Flux
We will use the Flux.just()
factory method to create a Flux from the given set of values. The StepVerfiier from the reactor-test
is used to test our examples.
@Test
public void justFlux() {
//Create a Flux
Flux<String> simpleFlux$ = Flux.just("hello", "there").log();
//Simple Subscriber
simpleFlux$.subscribe(val -> System.out.println(val));
//Test code
StepVerifier.create(simpleFlux$)
.expectNext("hello")
.expectNext("there")
.verifyComplete();
}
2. Examples of Flux stream
The followings are some of the commonly used Flux
stream examples. I have written them with the .log()
to show the execution trace and test codes.
2.1. Subscribe a Flux stream
There’re several ways in which a Flux can be subscribed, some of them are demonstrated below. Basically, you Need to invoke flux$.subscribe
and pass an instance of a Subscriber
or lambdas.
@Test
public void subscribeFlux() {
Flux<String> messages$ = Flux.just("Welcome", "to", "Jstobigdata").log();
messages$.subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(String s) {
System.out.println("onNext: " + s);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("====== Execution Completed ======");
}
});
StepVerifier.create(messages$)
.expectNext("Welcome")
.expectNext("to")
.expectNext("Jstobigdata")
.verifyComplete();
}
@Test
public void subscribeFlux2() {
Flux<String> messages$ = Flux.just("Welcome", "to", "Jstobigdata").log();
messages$.subscribe(
msg -> {
System.out.println(msg);
},
err -> {
err.printStackTrace();
},
() -> {
System.out.println("===== Execution completed =====");
});
StepVerifier.create(messages$)
.expectNext("Welcome")
.expectNext("to")
.expectNext("Jstobigdata")
.verifyComplete();
}
@Test
public void subscribeFlux3() {
Flux<String> messages$ = Flux.error(new RuntimeException());
messages$.subscribe(
msg -> {
System.out.println(msg);
},
err -> {
err.printStackTrace();
},
() -> {
System.out.println("===== Execution completed =====");
});
StepVerifier.create(messages$)
.expectError(RuntimeException.class)
.verify();
}
The onSubscribe
method allows you to set the Backpressure which will be covered in a later article. If a Flux throws an Error, it is caught in the err
lambda or onError(Throwable t)
.
2.2. Create an Empty Flux
Just like Mono, you can also create an empty Flux using Flux.empty
method. As soon individual example, the subscribed method directly prints completed
without emitting any value.
@Test
public void emptyFlux() {
Flux emptyFlux$ = Flux.empty().log();
emptyFlux$.subscribe(
val -> {
System.out.println("=== Never called ===");
},
error -> {
//Error
},
() -> {
System.out.println("==== Completed ===");
});
StepVerifier.create(emptyFlux$)
.expectNextCount(0)
.verifyComplete();
}
[ INFO] (main) onSubscribe([Fuseable] Operators.EmptySubscription) [ INFO] (main) request(unbounded) [ INFO] (main) onComplete() ==== Completed === [ INFO] (main) onSubscribe([Fuseable] Operators.EmptySubscription) [ INFO] (main) request(unbounded) [ INFO] (main) onComplete()
2.3. Never emitting Flux
For whatsoever reason if you need flux that never emits any value, you can make use of Flux.never
to create one.
@Test
public void neverFlux() {
Flux neverFlux = Flux.never().log();
StepVerifier
.create(neverFlux)
.expectTimeout(Duration.ofSeconds(2))
.verify();
}
2.4. Handling Error in Flux
The error
callback has to handle the error thrown by a Flux. There are several ways it can throw an Exception as follows.
Flux.error is used to create a Flux that emits an Exception
@Test
public void errorExample() {
Flux<String> messages$ = Flux.error(new RuntimeException());
messages$.subscribe(
msg -> {
System.out.println(msg);
},
err -> {
err.printStackTrace();
},
() -> {
System.out.println("===== Execution completed =====");
});
StepVerifier.create(messages$)
.expectError(RuntimeException.class)
.verify();
}
java.lang.RuntimeException at c.jbd.projectreactor.TestFluxSubscribe.subscribeFlux3(TestFluxSubscribe.java:69) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
In a practical scenario, a Flux can throw an Error, so the subscriber should have the error callback to handle these errors.
Flux.concatWith allows adding new values to an existing stream. In the below code, the reactive steam emits "Tom"
, "Jerry"
and then throws an Exception and terminates the stream. Therefore the subscriber never receives the value "John"
.
@Test
public void handleError() {
Flux<String> names$ = Flux.just("Tom", "Jerry")
.concatWith(Flux.error(new RuntimeException("Exception occurred..!")))
.concatWith(Flux.just("John"));
names$.subscribe(
name -> {
System.out.println(name);
}, err -> {
err.printStackTrace();
}
);
StepVerifier.create(names$)
.expectNext("Tom")
.expectNext("Jerry")
.expectError(RuntimeException.class)
.verify();
}
Tom Jerry java.lang.RuntimeException: Exception occurred..! ---- at c.jbd.projectreactor.TestFlux.handleError(TestFlux.java:66) ...
2.5. Filtering in Flux
The Filter is a special operator that allows you to evaluate each source value against a given Predicate. If the predicate test succeeds against the source value, it is emitted. If the predicate test fails, the value is ignored and a request of 1 is made to the upstream to test the next value.
Following is a simple example that demonstrates the working of Filter in emitting only the Odd numbers. The Flux.range(10, 10)
emits ten values starting from 10, e.g. 10, 11, 12, 13 ....., 19
.
@Test
public void rangeFlux() {
Flux<Integer> range$ = Flux.range(10, 10)
.filter(num -> Math.floorMod(num, 2) == 1)
.log();
range$.subscribe(System.out::println);
StepVerifier.create(range$)
//.expectNextCount(5)
.expectNext(11, 13, 15, 17, 19)
.verifyComplete();
}
Tom [ INFO] (main) | onSubscribe([Fuseable] FluxFilterFuseable.FilterFuseableSubscriber) [ INFO] (main) | request(unbounded) [ INFO] (main) | onNext(11) 11 [ INFO] (main) | onNext(13) 13 [ INFO] (main) | onNext(15) 15 [ INFO] (main) | onNext(17) 17 [ INFO] (main) | onNext(19) 19 [ INFO] (main) | onComplete() [ INFO] (main) | onSubscribe([Fuseable] FluxFilterFuseable.FilterFuseableSubscriber) [ INFO] (main) | request(unbounded) [ INFO] (main) | onNext(11) [ INFO] (main) | onNext(13) [ INFO] (main) | onNext(15) [ INFO] (main) | onNext(17) [ INFO] (main) | onNext(19) [ INFO] (main) | onComplete()
2.6. Transform using map
and flatMap
There are several operators available to manipulate the stream. I will discuss map
and flatMap
operators in the below example.
The flatMap
need to return a Mono, wherein the flat
returns an object from inside.
@Test
public void transformUsingFlatMap() {
List<Person> list = new ArrayList<>();
list.add(new Person("John", "[email protected]", "12345678"));
list.add(new Person("Jack", "[email protected]", "12345678"));
Flux<Person> people$ = Flux.fromIterable(list)
.flatMap(person -> {
return asyncCapitalize(person);
})
.log();
people$.subscribe(System.out::println);
StepVerifier.create(people$)
.expectNext(new Person("JOHN", "[email protected]", "12345678"))
.expectNext(new Person("JACK", "[email protected]", "12345678"))
.verifyComplete();
}
//Used in asynchronously process the Person
Mono<Person> asyncCapitalize(Person person) {
Person p = new Person(person.getName().toUpperCase(),
person.getEmail().toUpperCase(), person.getPhone().toUpperCase());
Mono<Person> person$ = Mono.just(p);
return person$;
}
@Test
public void transformUsingMap() {
List<Person> list = new ArrayList<>();
list.add(new Person("John", "[email protected]", "12345678"));
list.add(new Person("Jack", "[email protected]", "12345678"));
Flux<Person> people$ = Flux.fromIterable(list)
.map(p -> {
return new Person(p.getName().toUpperCase(),
p.getEmail().toUpperCase(), p.getPhone().toUpperCase());
})
.log();
people$.subscribe(System.out::println);
StepVerifier.create(people$)
.expectNext(new Person("JOHN", "[email protected]", "12345678"))
.expectNextCount(1)
.verifyComplete();
}
Conclusion:
There are several more operators which you can find in the official doc. I will be also taking about zip
, concat
in another article. So far I hope you got an overview of the Flux in project Reactor and it’s working. Download the code from GitHub repo.