Introduction

In this article, you will learn about Flux in Project Reactor which represents 0 to N (Zero to N) items. Flux is a Reactive Streams Publisher with Rx operators that emits 0 to N elements, and then completes (successfully or with an error). If it is known that the underlying Publisher will emit 0 or 1 element, you should use a Mono instead.

If you directly landed on this page, please follow the project set up instructions from the previous article.

1. Code inside a Flux<T> class

Just like Mono, Flux also is an abstract class and it implements the Publisher from reactive stream specification. It has several factory methods to create its instance.

public abstract class Flux<T> implements CorePublisher<T> {

    public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter, OverflowStrategy 
    backpressure) {..}
    public static <T> Flux<T> empty() {..}
    public static <T> Flux<T> error(Throwable error) {..}
    public static <T> Flux<T> from(Publisher<? extends T> source) {..}
    public static Flux<Long> interval(Duration period) {..}
    public static <T> Flux<T> just(T... data) {..}
    ..
}
package reactor.core;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

public interface CorePublisher<T> extends Publisher<T> {
  void subscribe(CoreSubscriber<? super T> subscriber);
}
package org.reactivestreams;

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

1.1. Marble diagram of Flux

As shown in the below Marble diagram, a Flux emits zero or n number of items. When the Flux completes its emission successfully, the subscriber receives the completed signal which is shown as the vertical line. You can also use several reactive operators to transform or manipulate the values emitted. For some reason, if the Flux terminates abruptly by throwing an Exception, the subscriber receives an Error signal and this is represented as the cross (X).

Flux in Project Reactor
Source – Projectreactor.io

1.2. Create a simple Flux

We will use the Flux.just() factory method to create a Flux from the given set of values. The StepVerfiier from the reactor-test is used to test our examples.

@Test
public void justFlux() {
  //Create a Flux
  Flux<String> simpleFlux$ = Flux.just("hello", "there").log();

 //Simple Subscriber
  simpleFlux$.subscribe(val -> System.out.println(val));

  //Test code
  StepVerifier.create(simpleFlux$)
      .expectNext("hello")
      .expectNext("there")
      .verifyComplete();
}

2. Examples of Flux stream

The followings are some of the commonly used Flux stream examples. I have written them with the .log() to show the execution trace and test codes.

2.1. Subscribe a Flux stream

There’re several ways in which a Flux can be subscribed, some of them are demonstrated below. Basically, you Need to invoke flux$.subscribe and pass an instance of a Subscriber or lambdas.

@Test
public void subscribeFlux() {
  Flux<String> messages$ = Flux.just("Welcome", "to", "Jstobigdata").log();

  messages$.subscribe(new Subscriber<String>() {
    @Override
    public void onSubscribe(Subscription s) {
      s.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(String s) {
      System.out.println("onNext: " + s);
    }

    @Override
    public void onError(Throwable t) {
      t.printStackTrace();
    }

    @Override
    public void onComplete() {
      System.out.println("====== Execution Completed ======");
    }
  });

  StepVerifier.create(messages$)
      .expectNext("Welcome")
      .expectNext("to")
      .expectNext("Jstobigdata")
      .verifyComplete();
}
@Test
public void subscribeFlux2() {
  Flux<String> messages$ = Flux.just("Welcome", "to", "Jstobigdata").log();

  messages$.subscribe(
      msg -> {
        System.out.println(msg);
      },
      err -> {
        err.printStackTrace();
      },
      () -> {
        System.out.println("===== Execution completed =====");
      });

  StepVerifier.create(messages$)
      .expectNext("Welcome")
      .expectNext("to")
      .expectNext("Jstobigdata")
      .verifyComplete();
}
@Test
public void subscribeFlux3() {
  Flux<String> messages$ = Flux.error(new RuntimeException());

  messages$.subscribe(
      msg -> {
        System.out.println(msg);
      },
      err -> {
        err.printStackTrace();
      },
      () -> {
        System.out.println("===== Execution completed =====");
      });

  StepVerifier.create(messages$)
      .expectError(RuntimeException.class)
      .verify();
}

The onSubscribe method allows you to set the Backpressure which will be covered in a later article. If a Flux throws an Error, it is caught in the err lambda or onError(Throwable t).

2.2. Create an Empty Flux

Just like Mono, you can also create an empty Flux using Flux.empty method. As soon individual example, the subscribed method directly prints completed without emitting any value.

@Test
public void emptyFlux() {
  Flux emptyFlux$ = Flux.empty().log();
  emptyFlux$.subscribe(
      val -> {
        System.out.println("=== Never called ===");
      },
      error -> {
        //Error
      },
      () -> {
        System.out.println("==== Completed ===");
      });

  StepVerifier.create(emptyFlux$)
      .expectNextCount(0)
      .verifyComplete();
}

Output:

[ INFO] (main) onSubscribe([Fuseable] Operators.EmptySubscription)
[ INFO] (main) request(unbounded)
[ INFO] (main) onComplete()
==== Completed ===
[ INFO] (main) onSubscribe([Fuseable] Operators.EmptySubscription)
[ INFO] (main) request(unbounded)
[ INFO] (main) onComplete()

2.3. Never emitting Flux

For whatsoever reason if you need flux that never emits any value, you can make use of Flux.never to create one.

@Test
public void neverFlux() {
  Flux neverFlux = Flux.never().log();

  StepVerifier
      .create(neverFlux)
      .expectTimeout(Duration.ofSeconds(2))
      .verify();
}

2.4. Handling Error in Flux

The error callback has to handle the error thrown by a Flux. There are several ways it can throw an Exception as follows.

Flux.error is used to create a Flux that emits an Exception

@Test
public void errorExample() {
  Flux<String> messages$ = Flux.error(new RuntimeException());

  messages$.subscribe(
      msg -> {
        System.out.println(msg);
      },
      err -> {
        err.printStackTrace();
      },
      () -> {
        System.out.println("===== Execution completed =====");
      });

  StepVerifier.create(messages$)
      .expectError(RuntimeException.class)
      .verify();
}

Output:

java.lang.RuntimeException
	at c.jbd.projectreactor.TestFluxSubscribe.subscribeFlux3(TestFluxSubscribe.java:69)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

In a practical scenario, a Flux can throw an Error, so the subscriber should have the error callback to handle these errors.

Flux.concatWith allows adding new values to an existing stream. In the below code, the reactive steam emits "Tom", "Jerry" and then throws an Exception and terminates the stream. Therefore the subscriber never receives the value "John".

@Test
public void handleError() {
  Flux<String> names$ = Flux.just("Tom", "Jerry")
      .concatWith(Flux.error(new RuntimeException("Exception occurred..!")))
      .concatWith(Flux.just("John"));

  names$.subscribe(
      name -> {
        System.out.println(name);
      }, err -> {
        err.printStackTrace();
      }
  );

  StepVerifier.create(names$)
      .expectNext("Tom")
      .expectNext("Jerry")
      .expectError(RuntimeException.class)
      .verify();
}

Output:

Tom
Jerry
java.lang.RuntimeException: Exception occurred..!
---- at c.jbd.projectreactor.TestFlux.handleError(TestFlux.java:66) ...

2.5. Filtering in Flux

The Filter is a special operator that allows you to evaluate each source value against a given Predicate. If the predicate test succeeds against the source value, it is emitted. If the predicate test fails, the value is ignored and a request of 1 is made to the upstream to test the next value.

Flux Filter Marble diagram
Source – Projectreactor.io

Following is a simple example that demonstrates the working of Filter in emitting only the Odd numbers. The Flux.range(10, 10) emits ten values starting from 10, e.g. 10, 11, 12, 13 ....., 19.

@Test
public void rangeFlux() {
  Flux<Integer> range$ = Flux.range(10, 10)
      .filter(num -> Math.floorMod(num, 2) == 1)
      .log();

  range$.subscribe(System.out::println);

  StepVerifier.create(range$)
      //.expectNextCount(5)
      .expectNext(11, 13, 15, 17, 19)
      .verifyComplete();
}

Output:

Tom
[ INFO] (main) | onSubscribe([Fuseable] FluxFilterFuseable.FilterFuseableSubscriber)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(11)
11
[ INFO] (main) | onNext(13)
13
[ INFO] (main) | onNext(15)
15
[ INFO] (main) | onNext(17)
17
[ INFO] (main) | onNext(19)
19
[ INFO] (main) | onComplete()
[ INFO] (main) | onSubscribe([Fuseable] FluxFilterFuseable.FilterFuseableSubscriber)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(11)
[ INFO] (main) | onNext(13)
[ INFO] (main) | onNext(15)
[ INFO] (main) | onNext(17)
[ INFO] (main) | onNext(19)
[ INFO] (main) | onComplete()

2.6. Transform using map and flatMap

There are several operators available to manipulate the stream. I will discuss map and flatMap operators in the below example.

The flatMap need to return a Mono, wherein the flat returns an object from inside.

@Test
public void transformUsingFlatMap() {
  List<Person> list = new ArrayList<>();
  list.add(new Person("John", "john@gmail.com", "12345678"));
  list.add(new Person("Jack", "jack@gmail.com", "12345678"));
  Flux<Person> people$ = Flux.fromIterable(list)
      .flatMap(person -> {
        return asyncCapitalize(person);
      })
      .log();

  people$.subscribe(System.out::println);

  StepVerifier.create(people$)
      .expectNext(new Person("JOHN", "JOHN@GMAIL.COM", "12345678"))
      .expectNext(new Person("JACK", "JACK@GMAIL.COM", "12345678"))
      .verifyComplete();
}

//Used in asynchronously process the Person
Mono<Person> asyncCapitalize(Person person) {
  Person p = new Person(person.getName().toUpperCase(),
      person.getEmail().toUpperCase(), person.getPhone().toUpperCase());
  Mono<Person> person$ = Mono.just(p);
  return person$;
}
@Test
public void transformUsingMap() {
  List<Person> list = new ArrayList<>();
  list.add(new Person("John", "john@gmail.com", "12345678"));
  list.add(new Person("Jack", "jack@gmail.com", "12345678"));
  Flux<Person> people$ = Flux.fromIterable(list)
      .map(p -> {
        return new Person(p.getName().toUpperCase(),
            p.getEmail().toUpperCase(), p.getPhone().toUpperCase());
      })
      .log();

  people$.subscribe(System.out::println);

  StepVerifier.create(people$)
      .expectNext(new Person("JOHN", "JOHN@GMAIL.COM", "12345678"))
      .expectNextCount(1)
      .verifyComplete();
}

Conclusion:

There are several more operators which you can find in the official doc. I will be also taking about zip, concat in another article. So far I hope you got an overview of the Flux in project Reactor and it’s working.