In this article, you will learn how to transform and combine reactive streams in Project Reactor. Make sure you have a basic understanding of Flux before proceeding. We will use map and flatMap to transform a Flux, and merge, concat and zip to combine multiple streams.
1. Transform a Flux using map
We can use the map operator on a reactive stream to transform its values before they are emitted to the subscribers. Transforming a value means applying a function to each individual element as it passes through the stream.
I will use Flux as an example to demonstrate how the map operator works. The filter operator first lets through only the names that have more than 5 characters, and map then converts these values to upper case. The repeat(1) operator resubscribes to the source 1 more time after it completes, so the whole sequence is emitted twice.
@Test
public void transformMap() {
    List<String> names = Arrays.asList("google", "abc", "fb", "stackoverflow");
    Flux<String> names$ = Flux.fromIterable(names)
            .filter(name -> name.length() > 5)
            .map(name -> name.toUpperCase())
            .repeat(1) // Just repeat once
            .log();
    StepVerifier.create(names$)
            .expectNext("GOOGLE", "STACKOVERFLOW", "GOOGLE", "STACKOVERFLOW")
            .verifyComplete();
}
As you can see, only the strings with more than 5 characters are capitalised and emitted twice.
2. Transform a Flux using flatMap
Just like map, you can also use the flatMap operator to transform a reactive stream.
It transforms the elements emitted by this Flux asynchronously into Publishers, then flattens these inner publishers into a single Flux through merging, which allows them to interleave.
There are three dimensions to this operator that can be compared with flatMapSequential and concatMap:
- Generation of inners and subscription: this operator is eagerly subscribing to its inners.
- Ordering of the flattened values: this operator does not necessarily preserve original ordering, as inner elements are flattened as they arrive.
- Interleaving: This operator lets values from different inners interleave (similar to merging the inner sequences).
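The three dimensions above are easier to see side by side. The following is a minimal sketch, not a definitive implementation: the class name and the slowUpperCase helper are hypothetical, and the delays are chosen only so that the first inner publisher finishes last. It assumes reactor-core is on the classpath.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FlatMapOrderingSketch {

    // Hypothetical helper: upper-cases a name after an artificial delay.
    // "google" is deliberately the slowest, so the inners finish out of order.
    static Mono<String> slowUpperCase(String name) {
        Duration delay = Duration.ofMillis(name.equals("google") ? 400 : 50);
        return Mono.just(name.toUpperCase()).delayElement(delay);
    }

    static Flux<String> source() {
        return Flux.just("google", "stackoverflow");
    }

    // Eager subscription; values are emitted as they arrive.
    static List<String> withFlatMap() {
        return source().flatMap(FlatMapOrderingSketch::slowUpperCase)
                .collectList().block();
    }

    // Eager subscription; results are queued and replayed in source order.
    static List<String> withFlatMapSequential() {
        return source().flatMapSequential(FlatMapOrderingSketch::slowUpperCase)
                .collectList().block();
    }

    // Subscribes to one inner at a time, so source order is preserved.
    static List<String> withConcatMap() {
        return source().concatMap(FlatMapOrderingSketch::slowUpperCase)
                .collectList().block();
    }

    public static void main(String[] args) {
        System.out.println("flatMap:           " + withFlatMap());
        System.out.println("flatMapSequential: " + withFlatMapSequential());
        System.out.println("concatMap:         " + withConcatMap());
    }
}
```

With these delays, flatMap should emit STACKOVERFLOW before GOOGLE, while flatMapSequential and concatMap keep the source order. The flatMap result depends on timing, so treat it as typical rather than guaranteed.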
@Test
public void transformUsingFlatMap() {
    List<String> names = Arrays.asList("google", "abc", "fb", "stackoverflow");
    Flux<String> names$ = Flux.fromIterable(names)
            .filter(name -> name.length() > 5)
            .flatMap(name -> Mono.just(name.toUpperCase()))
            .repeat(1) // Just repeat once
            .log();
    StepVerifier.create(names$)
            .expectNext("GOOGLE", "STACKOVERFLOW", "GOOGLE", "STACKOVERFLOW")
            .verifyComplete();
}
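The simple difference between map and flatMap is that map applies a synchronous function that returns a plain value, whereas the function passed to flatMap must return a Publisher (a Mono or a Flux), which flatMap then subscribes to and flattens into the output stream.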
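To make the difference concrete, here is a small sketch under the assumption that reactor-core is on the classpath. The class and method names are hypothetical. It shows map producing exactly one output per input, and flatMap returning a Flux with two elements per input.

```java
import java.util.List;

import reactor.core.publisher.Flux;

public class MapVersusFlatMapSketch {

    // map: a synchronous one-to-one function, String -> Integer.
    static List<Integer> nameLengths() {
        return Flux.just("google", "stackoverflow")
                .map(String::length)
                .collectList()
                .block();
    }

    // flatMap: the function returns a Publisher. Here it is a Flux with two
    // elements per input, so the output has more elements than the input.
    static List<String> bothCases() {
        return Flux.just("google", "stackoverflow")
                .flatMap(name -> Flux.just(name, name.toUpperCase()))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(nameLengths()); // two lengths, one per name
        System.out.println(bothCases());   // four strings, two per name
    }
}
```

Because the inner publisher can emit zero, one or many elements, flatMap is the usual choice when each element triggers another reactive call, while map is enough for a plain value-to-value conversion.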
3. Combine multiple streams using merge
The merge operator merges data from Publisher sequences contained in an array / vararg into an interleaved merged sequence. Unlike concat, sources are subscribed to eagerly.
@Test
public void combineUsingMerge() {
    Flux<String> names1$ = Flux.just("Blenders", "Old", "Johnnie");
    Flux<String> names2$ = Flux.just("Pride", "Monk", "Walker");
    Flux<String> names$ = Flux.merge(names1$, names2$).log();
    StepVerifier.create(names$)
            .expectSubscription()
            .expectNext("Blenders", "Old", "Johnnie", "Pride", "Monk", "Walker")
            .verifyComplete();
}
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn’t already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
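The merge operator emits an element as soon as any of its sources emits one. It subscribes to all the sources at the same time and therefore does not preserve the order of the sources, unlike the concat operator, which is discussed later. The previous test could assert a fixed order only because Flux.just emits its values synchronously on subscription; once the elements are delayed, values from the two sources may interleave, so the following test checks only the number of elements.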
@Test
public void mergeWithDelay() {
    Flux<String> names1$ = Flux.just("Blenders", "Old", "Johnnie")
            .delayElements(Duration.ofSeconds(1));
    Flux<String> names2$ = Flux.just("Pride", "Monk", "Walker")
            .delayElements(Duration.ofSeconds(1));
    Flux<String> names$ = Flux.merge(names1$, names2$).log();
    StepVerifier.create(names$)
            .expectSubscription()
            .expectNextCount(6)
            .verifyComplete();
}
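The test above only counts the elements. To see the interleaving itself, the following sketch shifts the second source by half a period so that the arrival order becomes predictable in practice. The class and method names and the specific delays are assumptions made for illustration, and the merged order still depends on timing.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;

public class MergeInterleavingSketch {

    // Emits at roughly 200 ms, 400 ms and 600 ms after subscription.
    static Flux<String> first() {
        return Flux.just("Blenders", "Old", "Johnnie")
                .delayElements(Duration.ofMillis(200));
    }

    // Same pace, but the subscription is shifted by 100 ms, so values
    // arrive between those of the first source.
    static Flux<String> second() {
        return Flux.just("Pride", "Monk", "Walker")
                .delayElements(Duration.ofMillis(200))
                .delaySubscription(Duration.ofMillis(100));
    }

    // Both sources are subscribed to at once; values interleave.
    static List<String> merged() {
        return Flux.merge(first(), second()).collectList().block();
    }

    // The second source is subscribed to only after the first completes.
    static List<String> concatenated() {
        return Flux.concat(first(), second()).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println("merge:  " + merged());
        System.out.println("concat: " + concatenated());
    }
}
```

With these delays, merge typically yields Blenders, Pride, Old, Monk, Johnnie, Walker, while concat always yields all of the first source followed by all of the second.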
4. Combine streams with concat
The concat operator concatenates multiple sources and forwards the elements they emit downstream. Remember that concat emits elements sequentially: it subscribes to the first source, waits for it to complete, then subscribes to the next one, and so on.
@Test
public void concatWithDelay() {
    Flux<String> names1$ = Flux.just("Blenders", "Old", "Johnnie")
            .delayElements(Duration.ofSeconds(1));
    Flux<String> names2$ = Flux.just("Pride", "Monk", "Walker")
            .delayElements(Duration.ofSeconds(1));
    Flux<String> names$ = Flux.concat(names1$, names2$)
            .log();
    StepVerifier.create(names$)
            .expectSubscription()
            .expectNext("Blenders", "Old", "Johnnie", "Pride", "Monk", "Walker")
            .verifyComplete();
}
As you can see, it waits for the first source to complete and then subscribes to the second one. So concat is useful when you want to subscribe to the sources sequentially.
5. Combine streams with zip
The zip operator is also used to combine elements from multiple sources. It waits for all the sources to emit one element and combines these elements into a single output value (constructed by the provided combinator). The operator continues doing so until any of the sources completes. Errors are immediately forwarded. This “Step-Merge” processing is especially useful in Scatter-Gather scenarios.
@Test
public void combineWithZip() {
    Flux<String> names1$ = Flux.just("Blenders", "Old", "Johnnie")
            .delayElements(Duration.ofSeconds(1));
    Flux<String> names2$ = Flux.just("Pride", "Monk", "Walker")
            .delayElements(Duration.ofSeconds(1));
    Flux<String> names$ = Flux.zip(names1$, names2$, (n1, n2) -> {
        return n1.concat(" ").concat(n2);
    }).log();
    StepVerifier.create(names$)
            .expectNext("Blenders Pride", "Old Monk", "Johnnie Walker")
            .verifyComplete();
}
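The phrase "until any of the sources completes" matters when the sources have different lengths. The sketch below, with a hypothetical class name and a deliberately shortened second source, shows that the unmatched element from the longer source is dropped.

```java
import java.util.List;

import reactor.core.publisher.Flux;

public class ZipShortestSourceSketch {

    static List<String> zipUnequal() {
        Flux<String> names1$ = Flux.just("Blenders", "Old", "Johnnie");
        // One element short on purpose: "Johnnie" has no partner.
        Flux<String> names2$ = Flux.just("Pride", "Monk");
        return Flux.zip(names1$, names2$, (n1, n2) -> n1 + " " + n2)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(zipUnequal()); // prints [Blenders Pride, Old Monk]
    }
}
```

If every element must be paired, make sure the sources have the same length or pad the shorter one before zipping.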
Conclusion:
I discussed how to transform and combine reactive streams in Project Reactor using map, flatMap, concat, merge and zip. There are many more operators described in the official documentation. I used Flux with simple values to keep the operators easy to follow; in a real-world scenario the data could be coming from a database or a web service.