Introduction
In this article, you will learn about the Mono in project reactor which represents 0-1
(Zero or One) item. A Mono<T>
is a Publisher (Producer) that emits at most one item and then terminates. It can terminate with an onComplete
(for successful completion) or an onError
(for any failure/error) signal. In the previous article (Getting started with Project Reactor), I already mentioned that the Project Reactor implements the Reactive Stream specification.
1. Code inside Mono <T>
class
The Mono
is an abstract class that implements the Publisher
from reactive streams. It has several factory methods to return an instance of its subclass. There are several subclasses of Mono
abstract class like MonoJust
, MonoNever
, MonoOperator
, MonoProcessor
and many more classes.
package reactor.core.publisher;
public abstract class Mono<T>
implements CorePublisher<T> {
public static <T> Mono<T> create(Consumer<MonoSink<T>> callback) {..}
public static <T> Mono<T> empty() {..}
public static <T> Mono<T> error(Throwable error) {..}
public static <T> Mono<T> just(T data) {..}
...
}
package reactor.core;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
public interface CorePublisher<T> extends Publisher<T> {
void subscribe(CoreSubscriber<? super T> subscriber);
}
package org.reactivestreams;
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
I will present several examples to make you up and running with Reactor Mono.
1.1. Marble diagram of Mono
A Marble diagram represents the state of a stream when one or more operators are applied to it. The following diagram shows how a Mono behaves when one or more operators applied to it.
As you can see, Mono emits one or zero item and completes the execution successfully. If for some reason the execution terminates abruptly, it throws an error, the cross represents the same.
I encourage you to take a look at the Mono class on Github. You can find the list of methods present in this abstract class.
1.2. Setting up a maven project
To try out the examples related to Flux, let us create a maven project with the following details.
- Add the
reactor-bom
for dependency version management. - Add
reactor-core
andreactor-test
for running sample codes. - Add the
junit-jupiter-api
for running the@Test
cases. - Upgrade the
maven-compiler-plugin
andmaven-surefire-plugin
to work with Junit-5 and JDK 8+.
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>11</java.version>
</properties>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.5.2</version>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bom</artifactId>
<version>Dysprosium-SR6</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>11</source>
<target>11</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0-M3</version>
</plugin>
</plugins>
</build>
</project>
1.3. Create a Mono with test code
You can create a simple Mono
using the Mono.just
. The below code creates a Mono which emits Welcome to Jstobigdata.com
. The second part of the code verifies the behavior of the above created Mono using reactor-test
. In Reactor java, you can manipulate a stream content using a Reactor Operator.
The .log()
prints traces of a reactor stream (Mono and Flux) and .map()
is an operator which is used here to concat .com
.
@Test
public void simpleMono(){
//Create a Mono with a msg.
//.map is an Operator to manipulate the string
//.log traces the details
Mono<String> message$ = Mono.just("Welcome to Jstobigdata")
.map(msg -> msg.concat(".com")).log();
message$.subscribe(System.out::println);
//Test the message$ mono
StepVerifier.create(message$)
.expectNext("Welcome to Jstobigdata.com")
.verifyComplete();
}
[ INFO] (main) | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber) [ INFO] (main) | request(unbounded) [ INFO] (main) | onNext(Welcome to Jstobigdata.com) Welcome to Jstobigdata.com [ INFO] (main) | onComplete() [ INFO] (main) | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber) [ INFO] (main) | request(unbounded) [ INFO] (main) | onNext(Welcome to Jstobigdata.com) [ INFO] (main) | onComplete()
1.4. Reactor test
Testing a Project reactor code is different then the normal java code. You need to use the reactor-test
jar in the dependency to verify the behaviour of a Mono or Flux stream.
@Test
void sampleTest() {
//Sample Mono code
StepVerifier.create(monoStream)
.expectNext("Welcome to Jstobigdata.com")
.verifyComplete(); //invokes the Mono
}
The StepVerifier.create()
creates the verifier, .expectNext(..)
verifies the emitted value and finally it is important to call the .verifyComplete()
which makes sure the monoStream
completes its execution successfully.
2. Examples of a Mono stream
The followings are some of the commonly used Mono stream examples. I have written them with the .log()
and test codes.
2.1. Subscribe a Mono stream
Call the Mono.subscribe(...)
to subscribe any stream. There are several factory methods, few are listed below.
public final Disposable subscribe()
public final Disposable subscribe(Consumer<? super T> consumer)
public final Disposable subscribe(Consumer<? super T> consumer)
public final Disposable subscribe( consumer, errorConsumer, completeConsumer)
@Test
public void subscribeMono() {
Mono<String> message$ = Mono.just("Welcome to Jstobigdata")
.map(msg -> msg.concat(".com")).log();
message$.subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(String s) {
System.out.println("onNext: " + s);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("====== Execution Completed ======");
}
});
StepVerifier.create(message$)
.expectNext("Welcome to Jstobigdata.com")
.verifyComplete();
}
@Test
public void subscribeMono2() {
Mono<String> message$ = Mono.just("Welcome to Jstobigdata")
.map(msg -> msg.concat(".com")).log();
message$.subscribe(
value -> {
System.out.println(value);
},
err -> {
err.printStackTrace();
},
() -> {
System.out.println("===== Execution completed =====");
});
StepVerifier.create(message$)
.expectNext("Welcome to Jstobigdata.com")
.expectNextCount(0) //no next element
.verifyComplete();
}
@Test
public void subscribeMono3() {
Mono<String> message$ = Mono.error(new RuntimeException("Check error mono"));
message$.subscribe(
value -> {
System.out.println(value);
},
err -> {
err.printStackTrace();
},
() -> {
System.out.println("===== Execution completed =====");
});
StepVerifier.create(message$)
.expectError(RuntimeException.class)
.verify();
}
I prefer using the subscribe( consumer, errorConsumer, completeConsumer)
method to subscribe, so basically pass 3 lambdas, as shown in the subscribeMono2()
and subscribeMono3()
. If the stream throws an Exception, errorConsumer
will be invoked and the execution will be terminated. Similarly, completeConsumer
will be invoked when the Mono finishes its execution successfully.
2.2. Create an Empty Mono
Mono.empty()
creates a Mono that completes without emitting any item.
@Test
public void emptyMono() {
Mono empty$ = Mono.empty().log();
empty$.subscribe(val -> {
System.out.println("==== Value ====" + val);
}, err ->{
// handle error here
}, ()->{
System.out.println("====== On Complete Invoked ======");
});
//To Test
StepVerifier.create(empty$)
.expectNextCount(0) //optional
.verifyComplete();
}
[ INFO] (main) onSubscribe([Fuseable] Operators.EmptySubscription) [ INFO] (main) request(unbounded) [ INFO] (main) onComplete() ====== On Complete Invoked ====== [ INFO] (main) onSubscribe([Fuseable] Operators.EmptySubscription) [ INFO] (main) request(unbounded) [ INFO] (main) onComplete()
2.3. Create a never emitting Mono
A No Signal or a Never Mono will never signal any data, error, or completion signal, essentially running indefinitely. Ideally, you should set a timeout duration in your test case, else it will never complete its execution.
@Test
public void noSignalMono() {
//Mono that never returns a value
Mono<String> noSignal$ = Mono.never();
//Can not use - Mono.never().log()
noSignal$.subscribe(val -> {
System.out.println("==== Value ====" + val);
});
StepVerifier.create(noSignal$)
.expectTimeout(Duration.ofSeconds(5))
.verify();
}
2.4. Throw Exception inside a Mono
You can also throw an Exception inside the Mono and the error subscription callback is invoked.
@Test
public void subscribeMono3() {
Mono<String> message$ = Mono.error(new RuntimeException("Check error mono"));
message$.subscribe(
value -> {
System.out.println(value);
},
err -> {
err.printStackTrace();
},
() -> {
System.out.println("===== Execution completed =====");
});
StepVerifier.create(message$)
.expectError(RuntimeException.class)
.verify();
}
2.5. Mono from a Supplier
You can also create a Mono from java.util.function.Supplier
as shown below.
@Test
public void fromSupplier() {
Supplier<String> stringSupplier = () -> "Sample Message";
Mono<String> sMono$ = Mono.fromSupplier(stringSupplier).log();
sMono$.subscribe(System.out::println);
StepVerifier.create(sMono$)
.expectNext("Sample Message")
.verifyComplete();
}
2.6. Filter a Mono
It also allows you to Filter the value using the standard java stream filter api.
@Test
public void filterMono(){
Supplier<String> stringSupplier = ()-> "Hello World";
Mono<String> filteredMono$ = Mono.fromSupplier(stringSupplier)
.filter(str ->str.length()>5)
.log();
filteredMono$.subscribe(System.out::println);
StepVerifier.create(filteredMono$)
.expectComplete();
}
[ INFO] (main) | onSubscribe([Fuseable] FluxFilterFuseable.FilterFuseableSubscriber) [ INFO] (main) | request(unbounded) [ INFO] (main) | onNext(Hello World) Hello World [ INFO] (main) | onComplete()
Conclusion:
I have presented various ways in which a Mono can be created. There are several more Operators which you can use on a Mono, I encourage you to try out the examples from GitHub repo.