Introduction

In this article, you will learn about Mono in Project Reactor, which represents a stream of 0-1 (zero or one) items. A Mono<T> is a Publisher (producer) that emits at most one item and then terminates. It terminates with either an onComplete signal (successful completion) or an onError signal (failure). As mentioned in the previous article (Getting started with Project Reactor), Project Reactor implements the Reactive Streams specification.

1. Code inside the Mono<T> class

Mono is an abstract class that implements CorePublisher, which in turn extends the Publisher interface from Reactive Streams. It has several static factory methods that return an instance of one of its subclasses, such as MonoJust, MonoNever, MonoOperator, MonoProcessor and many more.

Java
package reactor.core.publisher;

public abstract class Mono<T> 
        implements CorePublisher<T> {
  public static <T> Mono<T> create(Consumer<MonoSink<T>> callback) {..}
  public static <T> Mono<T> empty() {..}
  public static <T> Mono<T> error(Throwable error) {..}
  public static <T> Mono<T> just(T data) {..}
...
}
Java
package reactor.core;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

public interface CorePublisher<T> extends Publisher<T> {
	void subscribe(CoreSubscriber<? super T> subscriber);
}
Java
package org.reactivestreams;

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}
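
To make the subscribe contract above concrete, here is a minimal, JDK-only sketch of a publisher that emits at most one item and then completes. It is not Reactor's implementation: it assumes JDK 9+ and uses java.util.concurrent.Flow, whose interfaces mirror the Reactive Streams ones, and the names AtMostOneSketch, AtMostOnePublisher and signals are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// JDK-only model of Mono's 0-1 contract; NOT Reactor's implementation.
final class AtMostOneSketch {

  /** Hypothetical publisher: emits its value (if any) once, then completes. */
  static final class AtMostOnePublisher<T> implements Flow.Publisher<T> {
    private final T value; // null models an empty source

    AtMostOnePublisher(T value) {
      this.value = value;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
      subscriber.onSubscribe(new Flow.Subscription() {
        private boolean done;

        @Override
        public void request(long n) {
          if (done) {
            return; // already terminated or cancelled
          }
          done = true;
          if (n <= 0) {
            subscriber.onError(new IllegalArgumentException("demand must be positive"));
            return;
          }
          if (value != null) {
            subscriber.onNext(value);
          }
          subscriber.onComplete();
        }

        @Override
        public void cancel() {
          done = true;
        }
      });
    }
  }

  /** Subscribes with unbounded demand and records the signals in order. */
  static <T> List<String> signals(Flow.Publisher<T> publisher) {
    List<String> log = new ArrayList<>();
    publisher.subscribe(new Flow.Subscriber<T>() {
      @Override
      public void onSubscribe(Flow.Subscription subscription) {
        log.add("onSubscribe");
        subscription.request(Long.MAX_VALUE);
      }

      @Override
      public void onNext(T item) {
        log.add("onNext(" + item + ")");
      }

      @Override
      public void onError(Throwable error) {
        log.add("onError(" + error.getMessage() + ")");
      }

      @Override
      public void onComplete() {
        log.add("onComplete");
      }
    });
    return log;
  }

  public static void main(String[] args) {
    // prints [onSubscribe, onNext(Welcome), onComplete]
    System.out.println(signals(new AtMostOnePublisher<>("Welcome")));
    // prints [onSubscribe, onComplete]
    System.out.println(signals(new AtMostOnePublisher<String>(null)));
  }
}
```

The two calls in main correspond roughly to a Mono with a value and an empty Mono: either way, the subscriber sees at most one onNext followed by a single terminal signal.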

I will present several examples to get you up and running with Reactor Mono.

1.1. Marble diagram of Mono

A marble diagram represents the state of a stream when one or more operators are applied to it. The following diagram shows how a Mono behaves when one or more operators are applied to it.

As you can see, a Mono emits zero or one item and then completes successfully. If the execution fails instead, the Mono terminates with an error, which the cross in the diagram represents.

Reactor Mono
Source – projectreactor.io

I encourage you to take a look at the Mono class on GitHub, where you can find the full list of methods of this abstract class.

1.2. Setting up a Maven project

To try out the examples related to Mono, let us create a Maven project with the following details.

  • Add the reactor-bom for dependency version management.
  • Add reactor-core and reactor-test for running the sample code.
  • Add junit-jupiter-api for writing the @Test cases.
  • Upgrade the maven-compiler-plugin and maven-surefire-plugin to work with JUnit 5 and JDK 8+.
XML
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>11</java.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>io.projectreactor</groupId>
      <artifactId>reactor-core</artifactId>
    </dependency>
   
    <dependency>
      <groupId>io.projectreactor</groupId>
      <artifactId>reactor-test</artifactId>
    </dependency>
    
    <dependency>
      <groupId>org.junit.jupiter</groupId>
      <artifactId>junit-jupiter-api</artifactId>
      <version>5.5.2</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
  
  <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-bom</artifactId>
        <version>Dysprosium-SR6</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    </dependencies>
  </dependencyManagement>
  
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.8.1</version>
        <configuration>
          <source>11</source>
          <target>11</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>3.0.0-M3</version>
      </plugin>
    </plugins>
  </build>
</project>

1.3. Create a Mono with test code

You can create a simple Mono using Mono.just(). The code below creates a Mono that emits Welcome to Jstobigdata.com, and its second part verifies the behavior of that Mono using reactor-test. In Reactor, you manipulate the content of a stream using operators.

The .log() operator prints traces of a reactor stream (Mono and Flux), and .map() is an operator used here to append .com. Because the Mono is subscribed to twice, once by subscribe() and once by the StepVerifier, the trace appears twice in the output.

Java
@Test
public void simpleMono(){

  //Create a Mono with a msg.
  //.map is an Operator to manipulate the string
  //.log traces the details
  Mono<String> message$ = Mono.just("Welcome to Jstobigdata")
      .map(msg -> msg.concat(".com")).log();

  message$.subscribe(System.out::println);
  
  //Test the message$ mono
  StepVerifier.create(message$)
      .expectNext("Welcome to Jstobigdata.com")
      .verifyComplete();
}
Output:

[ INFO] (main) | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(Welcome to Jstobigdata.com)
Welcome to Jstobigdata.com
[ INFO] (main) | onComplete()
[ INFO] (main) | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(Welcome to Jstobigdata.com)
[ INFO] (main) | onComplete()

1.4. Reactor test

Testing Project Reactor code is different from testing normal Java code. You need the reactor-test jar as a dependency to verify the behaviour of a Mono or Flux stream.

Java
@Test
void sampleTest() {

  // Sample Mono under test
  Mono<String> monoStream = Mono.just("Welcome to Jstobigdata.com");

  StepVerifier.create(monoStream)
      .expectNext("Welcome to Jstobigdata.com")
      .verifyComplete(); // subscribes to the Mono and runs the verification
}

StepVerifier.create() creates the verifier and .expectNext(..) asserts the emitted value. Finally, it is important to call .verifyComplete(), which subscribes to monoStream and asserts that it completes successfully. Without a verify call, nothing is subscribed to and no expectation is checked.

2. Examples of a Mono stream

The following are some commonly used Mono examples. Each one uses .log() and includes test code.

2.1. Subscribe to a Mono stream

Call Mono.subscribe(...) to subscribe to a stream. The method has several overloads; a few are listed below.

public final Disposable subscribe()
public final Disposable subscribe(Consumer<? super T> consumer)
public final Disposable subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer)
public final Disposable subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer)
Java
@Test
public void subscribeMono() {

  Mono<String> message$ = Mono.just("Welcome to Jstobigdata")
      .map(msg -> msg.concat(".com")).log();
  message$.subscribe(new Subscriber<String>() {

    @Override
    public void onSubscribe(Subscription s) {
      s.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(String s) {
      System.out.println("onNext: " + s);
    }

    @Override
    public void onError(Throwable t) {
      t.printStackTrace();
    }

    @Override
    public void onComplete() {
      System.out.println("====== Execution Completed ======");
    }
  });

  StepVerifier.create(message$)
      .expectNext("Welcome to Jstobigdata.com")
      .verifyComplete();
}
Java
@Test
public void subscribeMono2() {
  Mono<String> message$ = Mono.just("Welcome to Jstobigdata")
      .map(msg -> msg.concat(".com")).log();
  
  message$.subscribe(
      value -> {
        System.out.println(value);
      },
      err -> {
        err.printStackTrace();
      },
      () -> {
        System.out.println("===== Execution completed =====");
      });
   
  StepVerifier.create(message$)
      .expectNext("Welcome to Jstobigdata.com")
      .expectNextCount(0) //no next element
      .verifyComplete();
}
Java
@Test
public void subscribeMono3() {

  Mono<String> message$ = Mono.error(new RuntimeException("Check error mono"));

  message$.subscribe(
      value -> {
        System.out.println(value);
      },
      err -> {
        err.printStackTrace();
      },
      () -> {
        System.out.println("===== Execution completed =====");
      });

  StepVerifier.create(message$)
      .expectError(RuntimeException.class)
      .verify();
}

I prefer subscribing with subscribe(consumer, errorConsumer, completeConsumer), that is, passing three lambdas as shown in subscribeMono2() and subscribeMono3(). If the stream signals an error, errorConsumer is invoked and the stream terminates; completeConsumer is not called in that case. Otherwise, completeConsumer is invoked when the Mono completes successfully.
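
The way three callbacks map onto the subscriber signals can be sketched without Reactor. The following is a JDK-only model built on java.util.concurrent.Flow (JDK 9+), not Reactor's own lambda subscriber; ThreeCallbackSketch, CallbackSubscriber, failing, empty and callbacksFor are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Consumer;

// JDK-only model of a three-callback subscriber; NOT Reactor's implementation.
final class ThreeCallbackSketch {

  /** Subscription that ignores demand; enough for sources that only terminate. */
  private static final Flow.Subscription NO_OP = new Flow.Subscription() {
    @Override public void request(long n) { }
    @Override public void cancel() { }
  };

  /** Hypothetical adapter that routes each signal to one of three callbacks. */
  static final class CallbackSubscriber<T> implements Flow.Subscriber<T> {
    private final Consumer<? super T> consumer;
    private final Consumer<? super Throwable> errorConsumer;
    private final Runnable completeConsumer;

    CallbackSubscriber(Consumer<? super T> consumer,
                       Consumer<? super Throwable> errorConsumer,
                       Runnable completeConsumer) {
      this.consumer = consumer;
      this.errorConsumer = errorConsumer;
      this.completeConsumer = completeConsumer;
    }

    @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
    @Override public void onNext(T item) { consumer.accept(item); }
    @Override public void onError(Throwable t) { errorConsumer.accept(t); }
    @Override public void onComplete() { completeConsumer.run(); }
  }

  /** Hypothetical source that fails as soon as it is subscribed to. */
  static <T> Flow.Publisher<T> failing(Throwable error) {
    return subscriber -> {
      subscriber.onSubscribe(NO_OP);
      subscriber.onError(error);
    };
  }

  /** Hypothetical source that completes without emitting a value. */
  static <T> Flow.Publisher<T> empty() {
    return subscriber -> {
      subscriber.onSubscribe(NO_OP);
      subscriber.onComplete();
    };
  }

  /** Subscribes with three lambdas and records which callbacks ran. */
  static List<String> callbacksFor(Flow.Publisher<String> source) {
    List<String> calls = new ArrayList<>();
    source.subscribe(new CallbackSubscriber<String>(
        value -> calls.add("value: " + value),
        error -> calls.add("error: " + error.getMessage()),
        () -> calls.add("complete")));
    return calls;
  }

  public static void main(String[] args) {
    // prints [error: Check error mono]
    System.out.println(callbacksFor(failing(new RuntimeException("Check error mono"))));
    // prints [complete]
    System.out.println(callbacksFor(empty()));
  }
}
```

In this model, exactly one of the two terminal callbacks runs per subscription, which is the behavior to expect from the three-lambda subscribe as well.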

2.2. Create an Empty Mono

Mono.empty() creates a Mono that completes without emitting any item.

Empty mono
source – projectreactor.io
Java
@Test
public void emptyMono() {
  Mono<String> empty$ = Mono.<String>empty().log();
  
  empty$.subscribe(val -> {
    System.out.println("==== Value ====" + val);
  }, err ->{
    // handle error here
  }, ()->{
    System.out.println("====== On Complete Invoked ======");
  });
  
  //To Test
  StepVerifier.create(empty$)
      .expectNextCount(0) //optional
      .verifyComplete();
}
Output:

[ INFO] (main) onSubscribe([Fuseable] Operators.EmptySubscription)
[ INFO] (main) request(unbounded)
[ INFO] (main) onComplete()
====== On Complete Invoked ======

[ INFO] (main) onSubscribe([Fuseable] Operators.EmptySubscription)
[ INFO] (main) request(unbounded)
[ INFO] (main) onComplete()

2.3. Create a never emitting Mono

A never Mono emits no signal at all: no data, no error, and no completion. A subscriber therefore waits indefinitely, so set a timeout in your test case; otherwise the test never finishes.

Java
@Test
public void noSignalMono() {

  //Mono that never returns a value
  Mono<String> noSignal$ = Mono.never();

  //Mono.never().log() would infer Mono<Object>; use Mono.<String>never().log() to trace
  noSignal$.subscribe(val -> {
    System.out.println("==== Value ====" + val);
  });

  StepVerifier.create(noSignal$)
      .expectTimeout(Duration.ofSeconds(5))
      .verify();
}

2.4. Throw Exception inside a Mono

Mono.error() creates a Mono that terminates with the given exception as soon as it is subscribed to, so the error callback of the subscriber is invoked.

Java
@Test
public void errorMono() {
  Mono<String> message$ = Mono.error(new RuntimeException("Check error mono"));

  message$.subscribe(
      value -> {
        System.out.println(value);
      },
      err -> {
        err.printStackTrace();
      },
      () -> {
        System.out.println("===== Execution completed =====");
      });

  StepVerifier.create(message$)
      .expectError(RuntimeException.class)
      .verify();
}

2.5. Mono from a Supplier

You can also create a Mono from a java.util.function.Supplier using Mono.fromSupplier(), as shown below. The supplier is invoked lazily, once for each subscription.

Java
@Test
public void fromSupplier() {
  Supplier<String> stringSupplier = () -> "Sample Message";
  
  Mono<String> sMono$ = Mono.fromSupplier(stringSupplier).log();
  sMono$.subscribe(System.out::println);
  
  StepVerifier.create(sMono$)
      .expectNext("Sample Message")
      .verifyComplete();
}
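
The lazy, per-subscription behavior of a supplier-backed source can be illustrated with a small JDK-only model. This sketch assumes JDK 9+ and uses java.util.concurrent.Flow rather than Reactor; LazySupplierSketch, its fromSupplier method and invocationCounts are hypothetical names and do not reproduce how Reactor implements Mono.fromSupplier.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// JDK-only model of a supplier-backed source; NOT Reactor's implementation.
final class LazySupplierSketch {

  /** Hypothetical factory: the supplier runs once per subscription, on first demand. */
  static <T> Flow.Publisher<T> fromSupplier(Supplier<? extends T> supplier) {
    return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
      private boolean done;

      @Override
      public void request(long n) {
        if (done) {
          return; // already terminated or cancelled
        }
        done = true;
        T value;
        try {
          value = supplier.get(); // evaluated only now, not at creation time
        } catch (RuntimeException e) {
          subscriber.onError(e);
          return;
        }
        if (value != null) {
          subscriber.onNext(value);
        }
        subscriber.onComplete();
      }

      @Override
      public void cancel() {
        done = true;
      }
    });
  }

  /** Subscriber that requests everything and ignores the signals. */
  static final class RequestAllSubscriber<T> implements Flow.Subscriber<T> {
    @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
    @Override public void onNext(T item) { }
    @Override public void onError(Throwable t) { }
    @Override public void onComplete() { }
  }

  /** Returns the supplier call count before subscribing and after two subscriptions. */
  static int[] invocationCounts() {
    AtomicInteger calls = new AtomicInteger();
    Flow.Publisher<String> source = fromSupplier(() -> {
      calls.incrementAndGet();
      return "Sample Message";
    });
    int beforeSubscribe = calls.get();
    source.subscribe(new RequestAllSubscriber<>());
    source.subscribe(new RequestAllSubscriber<>());
    return new int[] {beforeSubscribe, calls.get()};
  }

  public static void main(String[] args) {
    int[] counts = invocationCounts();
    // prints 0 then 2
    System.out.println(counts[0] + " then " + counts[1]);
  }
}
```

The same reasoning explains why a test that calls both subscribe() and StepVerifier on one Mono runs the pipeline twice.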

2.6. Filter a Mono

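Mono.filter() tests the value against a java.util.function.Predicate, similar to filter in the Java Stream API. The value is emitted if it passes the predicate; otherwise the Mono completes empty.

Java
@Test
public void filterMono(){
  Supplier<String> stringSupplier = ()-> "Hello World";
  
  Mono<String> filteredMono$ = Mono.fromSupplier(stringSupplier)
      .filter(str ->str.length()>5)
      .log();
  filteredMono$.subscribe(System.out::println);
  
  // "Hello World" is longer than 5 characters, so it passes the filter
  StepVerifier.create(filteredMono$)
      .expectNext("Hello World")
      .verifyComplete();
}
Output of the subscribe() call: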

[ INFO] (main) | onSubscribe([Fuseable] FluxFilterFuseable.FilterFuseableSubscriber)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(Hello World)
Hello World
[ INFO] (main) | onComplete()

Conclusion:

I have presented several ways to create a Mono. There are many more operators you can use on a Mono; I encourage you to try out the examples from the GitHub repo.


Last Updated: April 2nd, 2024 | Categories: Spring Framework
