In this article, you will learn to transform and combine reactive streams in Project Reactor. Make sure you have a basic understanding of Flux before proceeding. We will use map and flatMap to transform a Flux, and merge, concat and zip to combine multiple streams.

1. Transform a Flux using map

We can use the map operator on a reactive stream to transform its values before they are emitted to the subscribers. map applies a synchronous function to each element, one at a time, and emits the result in place of the original value.

Map applied on Flux
Source: Projectreactor.io

I will use Flux as an example to demonstrate how the map operator works. In the test below, the filter operator first passes only the names longer than 5 characters, and map then converts them to upper case. repeat(1) resubscribes to the source once after it completes, so the whole sequence is emitted a second time.

@Test
public void transformMap(){
  List<String> names = Arrays.asList("google", "abc", "fb", "stackoverflow");
  Flux<String> names$ = Flux.fromIterable(names)
      .filter(name -> name.length() > 5)
      .map(name -> name.toUpperCase())
      .repeat(1) //Just repeat once
      .log();

  StepVerifier.create(names$)
      .expectNext("GOOGLE", "STACKOVERFLOW", "GOOGLE", "STACKOVERFLOW")
      .verifyComplete();
}

As you can see, only the strings with more than 5 characters are capitalised and emitted twice.
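
map is not limited to returning the same type it receives. Here is a minimal sketch, assuming reactor-core is on the classpath, that maps each name to its length and so turns a Flux<String> into a Flux<Integer>. The class and method names are my own, chosen only for illustration, and block() is used just to collect the result outside of a test.

```java
import java.util.List;
import reactor.core.publisher.Flux;

class MapTypeChange {

  // Hypothetical helper: maps each name to its length (String -> Integer)
  static List<Integer> nameLengths() {
    return Flux.just("google", "abc", "fb", "stackoverflow")
        .map(String::length)   // synchronous, one-to-one transformation
        .collectList()         // gather all emitted values into a Mono<List<Integer>>
        .block();              // wait for the result (fine for a demo, avoid in reactive code)
  }

  public static void main(String[] args) {
    System.out.println(nameLengths()); // prints [6, 3, 2, 13]
  }
}
```

Each input element produces exactly one output element, which is what distinguishes map from flatMap.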

2. Transform a Flux using flatMap

Just like map, you can also use the flatMap operator to transform a Reactive stream.

It transforms the elements emitted by this Flux asynchronously into Publishers, then flattens these inner publishers into a single Flux through merging, which allows them to interleave.

Source: Projectreactor.io

There are three dimensions to this operator that can be compared with flatMapSequential and concatMap:

  • Generation of inners and subscription: this operator is eagerly subscribing to its inners.
  • Ordering of the flattened values: this operator does not necessarily preserve original ordering, as inner elements are flattened as they arrive.
  • Interleaving: This operator lets values from different inners interleave (similar to merging the inner sequences).

@Test
public void transformUsingFlatMap(){
  List<String> names = Arrays.asList("google", "abc", "fb", "stackoverflow");
  Flux<String> names$ = Flux.fromIterable(names)
      .filter(name -> name.length() > 5)
      .flatMap(name -> {
        return Mono.just(name.toUpperCase());
      })
      .repeat(1) //Just repeat once
      .log();

  StepVerifier.create(names$)
      .expectNext("GOOGLE", "STACKOVERFLOW", "GOOGLE", "STACKOVERFLOW")
      .verifyComplete();
}

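The key difference between map and flatMap is that the function passed to map returns a plain value synchronously, whereas the function passed to flatMap returns a Publisher (a Mono in this example, but it can also be a Flux), which flatMap subscribes to and flattens into the resulting stream.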
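
The ordering behaviour of flatMap only becomes visible when the inner publishers take different amounts of time. The sketch below, assuming reactor-core is on the classpath, compares flatMap with concatMap. The helper upperCaseAsync and its delays are made up to simulate an asynchronous call where the first name takes longer than the second.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class FlatMapOrdering {

  // Hypothetical async call: "google" is deliberately slower than the other names
  static Mono<String> upperCaseAsync(String name) {
    long delayMillis = name.equals("google") ? 300 : 100;
    return Mono.just(name.toUpperCase())
        .delayElement(Duration.ofMillis(delayMillis));
  }

  // flatMap subscribes to all inners eagerly and emits values as they arrive
  static List<String> withFlatMap() {
    return Flux.just("google", "stackoverflow")
        .flatMap(FlatMapOrdering::upperCaseAsync)
        .collectList()
        .block();
  }

  // concatMap subscribes to one inner at a time, so source order is preserved
  static List<String> withConcatMap() {
    return Flux.just("google", "stackoverflow")
        .concatMap(FlatMapOrdering::upperCaseAsync)
        .collectList()
        .block();
  }

  public static void main(String[] args) {
    // Order depends on timing: the inner with the shorter delay usually arrives first
    System.out.println("flatMap:   " + withFlatMap());
    System.out.println("concatMap: " + withConcatMap()); // prints concatMap: [GOOGLE, STACKOVERFLOW]
  }
}
```

If the order of the results matters, concatMap (or flatMapSequential) is the safer choice; flatMap trades ordering for concurrency.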

3. Combine multiple streams using merge

The merge operator merges data from Publisher sequences contained in an array / vararg into an interleaved merged sequence. Unlike concat, sources are subscribed to eagerly.

Source: Projectreactor.io

@Test
public void combineUsingMerge(){
  Flux<String> names1$ = Flux.just("Blenders", "Old", "Johnnie");
  Flux<String> names2$ = Flux.just("Pride", "Monk", "Walker");

  Flux<String> names$ = Flux.merge(names1$, names2$).log();

  StepVerifier.create(names$)
      .expectSubscription()
      .expectNext("Blenders", "Old", "Johnnie", "Pride", "Monk", "Walker")
      .verifyComplete();
}

Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn’t already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.

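The merge operator emits an element as soon as any of its sources emits one. It subscribes to all the sources eagerly, so it does not preserve the order of the sources, unlike the concat operator discussed later. In the first example the output is ordered only because Flux.just emits synchronously: the first source is drained during subscription, before the second one is subscribed. With delayed sources, as in the next test, the elements can interleave, so the test only checks the number of elements.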

@Test
public void mergeWithDelay(){
  Flux<String> names1$ = Flux.just("Blenders", "Old", "Johnnie")
      .delayElements(Duration.ofSeconds(1));

  Flux<String> names2$ = Flux.just("Pride", "Monk", "Walker")
      .delayElements(Duration.ofSeconds(1));

  Flux<String> names$ = Flux.merge(names1$, names2$).log();

  StepVerifier.create(names$)
      .expectSubscription()
      .expectNextCount(6)
      .verifyComplete();
}

4. Combine streams with concat

The concat operator concatenates multiple sources and forwards the elements they emit downstream. Remember, concat emits elements sequentially: it subscribes to the first source, waits for it to complete, then subscribes to the next one, and so on.

Source: Projectreactor.io

@Test
public void concatWithDelay(){
  Flux<String> names1$ = Flux.just("Blenders", "Old", "Johnnie")
      .delayElements(Duration.ofSeconds(1));

  Flux<String> names2$ = Flux.just("Pride", "Monk", "Walker")
      .delayElements(Duration.ofSeconds(1));

  Flux<String> names$ = Flux.concat(names1$, names2$)
      .log();

  StepVerifier.create(names$)
      .expectSubscription()
      .expectNext("Blenders", "Old", "Johnnie", "Pride", "Monk", "Walker")
      .verifyComplete();
}

As you can see, concat waits for the first source to complete and only then subscribes to the second one. So concat is useful when you want to subscribe to the sources sequentially.
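
To see merge and concat side by side, here is a rough sketch, assuming reactor-core is on the classpath. The ticker helper is hypothetical: it uses Flux.interval to emit three labelled values 200 ms apart, and the second source starts 100 ms after the first so that the two sources alternate in time.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

class MergeVersusConcat {

  // Hypothetical source: emits label0, label1, label2 every 200 ms after an initial delay
  static Flux<String> ticker(String label, long initialDelayMillis) {
    return Flux.interval(Duration.ofMillis(initialDelayMillis), Duration.ofMillis(200))
        .take(3)
        .map(i -> label + i);
  }

  // merge subscribes to both sources at once, so their values can interleave
  static List<String> merged() {
    return Flux.merge(ticker("A", 0), ticker("B", 100))
        .collectList()
        .block();
  }

  // concat subscribes to B only after A has completed
  static List<String> concatenated() {
    return Flux.concat(ticker("A", 0), ticker("B", 100))
        .collectList()
        .block();
  }

  public static void main(String[] args) {
    // The merged order depends on timing; A and B values are expected to alternate
    System.out.println("merge:  " + merged());
    System.out.println("concat: " + concatenated()); // prints concat: [A0, A1, A2, B0, B1, B2]
  }
}
```

Note that concat also takes longer overall, because the timer of the second source only starts once the first source is done.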

5. Combine streams with zip

The zip operator is also used to combine elements from multiple sources. It waits for all the sources to emit one element and combines these elements into an output value (constructed by the provided combinator). The operator will continue doing so until any of the sources completes. Errors will immediately be forwarded. This “Step-Merge” processing is especially useful in Scatter-Gather scenarios.

Source: Projectreactor.io

@Test
public void combineWithZip(){
  Flux<String> names1$ = Flux.just("Blenders", "Old", "Johnnie")
      .delayElements(Duration.ofSeconds(1));

  Flux<String> names2$ = Flux.just("Pride", "Monk", "Walker")
      .delayElements(Duration.ofSeconds(1));

  Flux<String> names$ = Flux.zip(names1$, names2$, (n1, n2) -> {
    return n1.concat(" ").concat(n2);
  }).log();

  StepVerifier.create(names$)
      .expectNext("Blenders Pride", "Old Monk", "Johnnie Walker")
      .verifyComplete();
}
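
The test above zips two sources of equal length. The following sketch, assuming reactor-core is on the classpath, shows what "until any of the sources completes" means when the lengths differ. The class and method names are illustrative only.

```java
import java.util.List;
import reactor.core.publisher.Flux;

class ZipShortest {

  // Zips three numbers with two letters; the third number has no partner
  static List<String> pairs() {
    Flux<Integer> numbers = Flux.just(1, 2, 3);
    Flux<String> letters = Flux.just("a", "b");

    return Flux.zip(numbers, letters, (n, l) -> n + l) // combinator joins each pair into a String
        .collectList()
        .block();
  }

  public static void main(String[] args) {
    System.out.println(pairs()); // prints [1a, 2b]
  }
}
```

The value 3 is dropped because letters completes after two elements, so zip completes as well.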

Conclusion:

I discussed how to transform and combine reactive streams in Project Reactor using map, flatMap, merge, concat and zip. There are many more operators described in the official documentation. I used Flux with simple values to keep the operators easy to follow; in a real-world scenario the data could come from a database or web services.