In this article, I will explain how to create a reactive REST API with Spring WebFlux, using MongoDB as the document store. We will use reactive programming at both the API and the repository level, with the aim of giving you a feel for real-life application development. Earlier tutorials in this series covered non-blocking RESTful web services in Spring WebFlux; make sure you have gone through them first.

Prerequisites:

  • Basic knowledge of MongoDB
  • Knowledge of Spring Data for reactive MongoDB
  • Flux and Mono in reactive programming
  • Docker and Docker Compose installed
  • Maven installed on your system

1. Setting up the application

We will create CRUD APIs for a contact manager application, which means implementing Create, Read, Update, and Delete endpoints for the Contact entity. We will build these endpoints twice: first with the standard annotation-based controller approach, and then with the functional programming approach in Spring WebFlux.

1.1. Project Setup from start.spring.io

We will generate the Spring WebFlux project from start.spring.io, as we did in the previous articles. The only additional dependency here is spring-boot-starter-data-mongodb-reactive. Download the project, unzip it, and import it into your favorite IDE.

<dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
    </dependency>

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
....
  </dependencies>

1.2. MongoDB setup using docker-compose

If you already have a MongoDB instance running, you can skip this step.

I have put a MongoDB docker-compose file in the GitHub repo. Copy its content into a file named docker-compose.yml on your local system; it starts MongoDB on port 27017 and the mongo-express admin UI on port 8081.

# Mongo docker image
version: '3.7'

services:
  mongo:
    image: 'mongo:4.4.0-rc8-bionic'
    restart: on-failure
    ports:
    - 27017:27017
    volumes:
    - ./.mongo-volume:/data/db
    environment:
      MONGO_INITDB_ROOT_USERNAME: root
      MONGO_INITDB_ROOT_PASSWORD: root@123

  mongo-express:
    image: mongo-express
    restart: always
    ports:
      - 8081:8081
    environment:
      ME_CONFIG_MONGODB_ADMINUSERNAME: root
      ME_CONFIG_MONGODB_ADMINPASSWORD: root@123

Navigate to the directory where this docker-compose.yml file is located and run the following command from the terminal.

docker-compose up

1.3. Configure the application.yml with MongoDB details

I prefer YAML files for holding the Spring Boot application properties, but you can use a properties file instead if you wish. If you are new to YAML, check out the article on YAML – Human readable Configuration file.

Add the following to your resources/application.yml file. It uses Spring profiles to keep the dev and unit_test settings apart.

spring:
  profiles:
    active: dev
---
spring:
  profiles: dev
  data:
    mongodb:
      host: localhost
      port: 27017
      database: spring_reactive_demo
      username: root
      password: root@123
      authentication-database: admin
# Change the uat and prod profiles
server:
  port: 8083
---
spring:
  profiles: unit_test
  data:
    mongodb:
      host: localhost
      port: 27017
      database: spring_reactive_test
      username: root
      password: root@123
      authentication-database: admin
server:
  port: 8083
#---
#spring:
#  profiles: prod
#  data:
#    mongodb:
#      host: localhost
#      port: 27017
#      database: localDB
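
Spring Boot also accepts a single connection string through the spring.data.mongodb.uri property, in place of the separate host, port, and credential keys. If you go that route, note that the @ in the password above is a reserved character in a MongoDB connection string and has to be percent-encoded. The following is a minimal sketch of how such a URI could be assembled from the dev profile values; the class and method names are hypothetical and not part of the tutorial project.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only.
final class MongoUriSketch {

  // Percent-encodes a credential so reserved characters such as '@' or ':' are safe.
  // URLEncoder emits '+' for spaces, so those are rewritten to %20 for URI use.
  static String encodeCredential(String raw) {
    return URLEncoder.encode(raw, StandardCharsets.UTF_8).replace("+", "%20");
  }

  // Assembles a mongodb:// URI; authSource names the database holding the user.
  static String buildUri(String user, String password, String host, int port,
                         String database, String authDatabase) {
    return "mongodb://" + encodeCredential(user) + ":" + encodeCredential(password)
      + "@" + host + ":" + port + "/" + database + "?authSource=" + authDatabase;
  }

  public static void main(String[] args) {
    // Values taken from the dev profile above.
    System.out.println(buildUri("root", "root@123", "localhost", 27017,
      "spring_reactive_demo", "admin"));
  }
}
```

The resulting string would go under spring.data.mongodb.uri, replacing the individual host, port, username, and password entries rather than sitting alongside them.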

2. Implement the Controller based endpoints

We will first create the standard @RestController based endpoints to implement Contact CRUD operations. In the next part, we will implement functional endpoints.

Step-1: Generate the Contact class and ContactRepository

Contact class: You never need to set the id field yourself, as MongoDB generates it. The class is annotated with @Document(collection = "contact") so that instances are stored in the contact collection.

ContactRepository interface: This is the reactive repository that provides the CRUD operations. It also declares two custom query methods, as shown. Annotating the interface with @Repository is optional.

SpringBootMongoRest: This is the Spring Boot application class. In your generated project it will be named Webflux02MongoRestApiApplication; I have simply renamed it.

package c.jbd.webflux.data.mongo;

import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

import java.util.Objects;
import java.util.StringJoiner;

@Document(collection = "contact")
public class Contact {
  @Id
  private String id;
  private String name;
  private String email;
  private String phone;

  public Contact() {
  }

  public Contact(String name, String email, String phone) {
    this.name = name;
    this.email = email;
    this.phone = phone;
  }

  public String getId() {
    return id;
  }

  public Contact setId(String id) {
    this.id = id;
    return this;
  }

  public String getName() {
    return name;
  }

  public Contact setName(String name) {
    this.name = name;
    return this;
  }

  public String getEmail() {
    return email;
  }

  public Contact setEmail(String email) {
    this.email = email;
    return this;
  }

  public String getPhone() {
    return phone;
  }

  public Contact setPhone(String phone) {
    this.phone = phone;
    return this;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;
    Contact contact = (Contact) o;
    return Objects.equals(id, contact.id) &&
      Objects.equals(name, contact.name) &&
      Objects.equals(email, contact.email) &&
      Objects.equals(phone, contact.phone);
  }

  @Override
  public int hashCode() {
    return Objects.hash(id, name);
  }

  @Override
  public String toString() {
    return new StringJoiner(", ", Contact.class.getSimpleName() + "[", "]")
      .add("id='" + id + "'")
      .add("name='" + name + "'")
      .add("email='" + email + "'")
      .add("phone=" + phone)
      .toString();
  }
}

package c.jbd.webflux.data.mongo;

import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public interface ContactRepository extends ReactiveMongoRepository<Contact, String> {
  // Emits at most one contact with the given email.
  Mono<Contact> findFirstByEmail(String email);

  // "PhoneOrName" names two properties, so the derived query takes two arguments
  // and may emit several contacts.
  Flux<Contact> findAllByPhoneOrName(String phone, String name);
}

package c.jbd.webflux;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

//in your case Webflux02MongoRestApiApplication.java
@SpringBootApplication
public class SpringBootMongoRest {
  public static void main(String[] args) {
    SpringApplication.run(SpringBootMongoRest.class, args);
  }
}
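
Spring Data derives the MongoDB query from the repository method name, so the custom methods in ContactRepository need no hand-written implementation. Each property named after "By" consumes one method argument, in order. To make those naming rules concrete, here is a rough in-memory analogy using plain Java streams. It only illustrates the semantics and is not how Spring Data executes the query; the Row class and the sample data are made up.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

// Illustrative only: mimics what the derived query names mean, using a list instead of MongoDB.
final class DerivedQuerySketch {

  // Hypothetical stand-in for the Contact document.
  static final class Row {
    final String name;
    final String email;
    final String phone;

    Row(String name, String email, String phone) {
      this.name = name;
      this.email = email;
      this.phone = phone;
    }
  }

  private final List<Row> rows;

  DerivedQuerySketch(List<Row> rows) {
    this.rows = rows;
  }

  // findFirstByEmail: "Email" is the property to match, "First" limits the result to one.
  Optional<Row> findFirstByEmail(String email) {
    return rows.stream().filter(r -> r.email.equals(email)).findFirst();
  }

  // findAllByPhoneOrName: two criteria joined by "Or", hence two arguments and many results.
  List<Row> findAllByPhoneOrName(String phone, String name) {
    return rows.stream()
      .filter(r -> r.phone.equals(phone) || r.name.equals(name))
      .collect(Collectors.toList());
  }

  // Made-up sample data for trying the two lookups.
  static DerivedQuerySketch sample() {
    return new DerivedQuerySketch(Arrays.asList(
      new Row("John", "john@example.com", "111"),
      new Row("Jonny", "jonny@example.com", "222"),
      new Row("Hello", "hello@example.com", "333")));
  }
}
```

In the reactive repository the single-result lookup corresponds to a Mono and the multi-result lookup to a Flux, instead of Optional and List.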

Step-2: Create the ContactController

The ContactController exposes the CRUD operations through methods annotated with the standard Spring annotations. The important thing to notice is how the ContactRepository is used: every method returns the repository's Mono or Flux, or a transformation of it, without blocking.

package c.jbd.webflux.controller;

import c.jbd.webflux.data.mongo.Contact;
import c.jbd.webflux.data.mongo.ContactRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RestController
@RequestMapping("/controller")
public class ContactController {

  private final ContactRepository contactRepository;

  @Autowired
  public ContactController(ContactRepository contactRepository) {
    this.contactRepository = contactRepository;
  }

  @GetMapping("/contacts")
  public Flux<Contact> getAllContacts() {
    return contactRepository.findAll();
  }

  @GetMapping(value = "/contacts/{id}")
  public Mono<ResponseEntity<Contact>> getContact(@PathVariable String id) {
    return contactRepository.findById(id)
      .map(contact -> new ResponseEntity<>(contact, HttpStatus.OK))
      .defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
  }

  @GetMapping(value = "/contacts/byEmail/{email}")
  public Mono<ResponseEntity<Contact>> getByEmail(@PathVariable String email) {
    return contactRepository.findFirstByEmail(email)
      .map(contact -> new ResponseEntity<>(contact, HttpStatus.OK))
      .defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
  }

  @PostMapping("/contacts")
  public Mono<ResponseEntity<Contact>> insertContact(@RequestBody Contact contact){
    return contactRepository.insert(contact)
      .map(contact1 -> new ResponseEntity<>(contact1, HttpStatus.ACCEPTED))
      .defaultIfEmpty(new ResponseEntity<>(contact, HttpStatus.NOT_ACCEPTABLE));
  }

  @PutMapping("/contacts/{id}")
  public Mono<ResponseEntity<Contact>> updateContact(@RequestBody Contact contact, @PathVariable String id) {
    return contactRepository.findById(id)
      .flatMap(contact1 -> {
        contact.setId(id);
        return contactRepository.save(contact)
          .map(contact2 -> new ResponseEntity<>(contact2, HttpStatus.ACCEPTED));
      }).defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
  }

  @DeleteMapping(value = "/contacts/{id}")
  public Mono<Void> deleteContact(@PathVariable String id) {
    return contactRepository.deleteById(id);
  }
}
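
The map(...).defaultIfEmpty(...) chain in getContact reads much like Optional.map(...).orElse(...): a found contact becomes a 200 response, and an empty result becomes a 404. The sketch below shows that decision with plain JDK types so it can be run without Spring. It is a blocking analogy only; the Response class and the in-memory map are hypothetical stand-ins for ResponseEntity and the repository.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class NotFoundMappingSketch {

  // Hypothetical stand-in for ResponseEntity: a status code plus an optional body.
  static final class Response {
    final int status;
    final String body;

    Response(int status, String body) {
      this.status = status;
      this.body = body;
    }
  }

  // Stand-in for the contact collection: id -> name.
  private final Map<String, String> store = new HashMap<>();

  NotFoundMappingSketch() {
    store.put("5f1488e890baf8178f4b9284", "Test from Postman");
  }

  // Plays the role of findById: empty when the id is unknown.
  Optional<String> findById(String id) {
    return Optional.ofNullable(store.get(id));
  }

  // Same shape as the controller method: map the hit to 200, default the miss to 404.
  Response getContact(String id) {
    return findById(id)
      .map(name -> new Response(200, name))
      .orElse(new Response(404, null));
  }
}
```

The difference in the reactive version is that nothing waits for the lookup; the mapping is attached to the Mono and runs when MongoDB responds.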

Step-3: Run the App and test the endpoints

Start the application by running the following command in a terminal from the root directory of your project.

mvn spring-boot:run

Now use the following curl commands to test the endpoints. You can also use Postman for this.

curl --location --request POST 'http://localhost:8083/controller/contacts' \
--header 'Content-Type: application/json' \
--data-raw '{
  "email": "[email protected]",
  "name": "Test from Postman",
  "phone": "110000000000"
}'


//Response
{"id":"5f1488e890baf8178f4b9284","name":"Test from Postman","email":"[email protected]","phone":"110000000000"}

curl --location --request GET 'http://localhost:8083/controller/contacts/5f1488e890baf8178f4b9284' \
--header 'Accept: application/json'

//Response
{"id":"5f1488e890baf8178f4b9284","name":"Test from Postman","email":"[email protected]","phone":"110000000000"}

curl --location --request PUT 'http://localhost:8083/controller/contacts/5f14887390baf8178f4b9283' \
--header 'Content-Type: application/json' \
--data-raw '{"id":"5f14887390baf8178f4b9283","name":"Test from Postman updated","email":"[email protected]","phone":"110000000000"}'

//Response
{"id":"5f14887390baf8178f4b9283","name":"Test from Postman updated","email":"[email protected]","phone":"110000000000"}

curl --location --request DELETE 'http://localhost:8083/controller/contacts/5f14887390baf8178f4b9283'

curl --location --request GET 'http://localhost:8083/controller/contacts/' \
--header 'Accept: application/json'

//Create multiple Contacts to list them here
[
    {
        "id": "5f10619b385b012854646c59",
        "name": "John",
        "email": "[email protected]",
        "phone": "1121121121"
    },
    {
        "id": "5f10619c385b012854646c5a",
        "name": "Jonny",
        "email": "[email protected]",
        "phone": "22222222222"
    }
]

Similarly, you can test the find-by-email endpoint, /contacts/byEmail/{email}.

Alternatively, you can use Postman, which is the recommended way to test your endpoints. I have uploaded a Postman collection to GitHub that you can import into your local Postman to run the test cases in the same order.
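
If you would rather drive the endpoints from Java than from curl or Postman, the JDK's built-in java.net.http.HttpClient (Java 11 and later) can send the same requests. The sketch below builds the create request from the first curl command. The class name and the sample payload values are placeholders, and the call in main assumes the application is running on port 8083 as configured earlier.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical client for illustration; not part of the tutorial project.
final class ContactApiClientSketch {

  static final String BASE_URL = "http://localhost:8083/controller/contacts";

  // Builds (but does not send) the POST request that creates a contact.
  static HttpRequest createRequest(String json) {
    return HttpRequest.newBuilder(URI.create(BASE_URL))
      .header("Content-Type", "application/json")
      .POST(HttpRequest.BodyPublishers.ofString(json))
      .build();
  }

  public static void main(String[] args) throws Exception {
    // Placeholder payload; any name, email, and phone values will do.
    String json = "{\"name\":\"Test from Java\",\"email\":\"test@example.com\","
      + "\"phone\":\"110000000000\"}";

    // Sends the request and prints the status code followed by the response body.
    HttpResponse<String> response = HttpClient.newHttpClient()
      .send(createRequest(json), HttpResponse.BodyHandlers.ofString());
    System.out.println(response.statusCode() + " " + response.body());
  }
}
```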

3. Functional Webflux rest endpoints

In the previous article, I introduced the concept of functional endpoints. The key is to understand router functions and handler functions. I will not go into detail here; below is the code for the handler functions and the router function that together provide the functional WebFlux endpoints.

package c.jbd.webflux.functional;

import c.jbd.webflux.data.mongo.Contact;
import c.jbd.webflux.data.mongo.ContactRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;

import static org.springframework.web.reactive.function.BodyInserters.*;

@Component
public class ContactRestHandler {
  private final ContactRepository contactRepository;

  private Mono<ServerResponse> response404
    = ServerResponse.notFound().build();

  private Mono<ServerResponse> response406
    = ServerResponse.status(HttpStatus.NOT_ACCEPTABLE).build();

  @Autowired //optional
  public ContactRestHandler(ContactRepository contactRepository) {
    this.contactRepository = contactRepository;
  }

  //GET - find a contact by id
  public Mono<ServerResponse> getById(ServerRequest request) {
    String id = request.pathVariable("id");

    return contactRepository.findById(id)
      .flatMap(contact ->
        ServerResponse.ok()
          .contentType(MediaType.APPLICATION_JSON)
          .body(fromValue(contact))
      ).switchIfEmpty(response404);
  }

  //List all contacts
  public Mono<ServerResponse> getAllContacts(ServerRequest request) {
    return ServerResponse.ok()
      .contentType(MediaType.APPLICATION_JSON)
      .body(contactRepository.findAll(), Contact.class);
  }

  //Find a Contact by email address.
  public Mono<ServerResponse> getByEmail(ServerRequest request) {
    String email = request.pathVariable("email");

    return contactRepository.findFirstByEmail(email)
      .flatMap(contact ->
        ServerResponse.ok()
          .contentType(MediaType.APPLICATION_JSON)
          .body(fromValue(contact))
      ).switchIfEmpty(response404);
  }

  //Save a new Contact
  public Mono<ServerResponse> insertContact(ServerRequest request) {
    Mono<Contact> unsavedContact = request.bodyToMono(Contact.class);

    return unsavedContact
      .flatMap(contact -> contactRepository.save(contact)
        .flatMap(savedContact -> ServerResponse.accepted()
          .contentType(MediaType.APPLICATION_JSON)
          .body(fromValue(savedContact)))
      ).switchIfEmpty(response406);
  }

  //Update an existing contact
  public Mono<ServerResponse> updateContact(ServerRequest request) {
    Mono<Contact> contact$ = request.bodyToMono(Contact.class);
    String id = request.pathVariable("id");

    //TODO - additional id match
    Mono<Contact> updatedContact$ = contact$.flatMap(contact ->
      contactRepository.findById(id)
        .flatMap(oldContact -> {
          oldContact
            .setPhone(contact.getPhone())
            .setName(contact.getName())
            .setEmail(contact.getEmail());
          return contactRepository.save(oldContact);
        })
    );

    return updatedContact$.flatMap(contact ->
      ServerResponse.accepted()
        .contentType(MediaType.APPLICATION_JSON)
        .body(fromValue(contact))
    ).switchIfEmpty(response404);
  }

  //Delete a Contact
  public Mono<ServerResponse> deleteContact(ServerRequest request) {
    String id = request.pathVariable("id");
    Mono<Void> deleted = contactRepository.deleteById(id);

    return ServerResponse.ok()
      .contentType(MediaType.APPLICATION_JSON)
      .body(deleted, Void.class);
  }
}

package c.jbd.webflux.functional;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;

import static org.springframework.web.reactive.function.server.RequestPredicates.*;

@Configuration
public class ContactRestRouter {

  @Bean
  public RouterFunction<ServerResponse> routeContact(ContactRestHandler contactRestHandler) {
    // All paths start with a slash and omit the trailing slash, matching the controller URLs.
    return RouterFunctions
      .route(GET("/functional/contacts")
        , contactRestHandler::getAllContacts)
      .andRoute(GET("/functional/contacts/{id}")
        , contactRestHandler::getById)
      .andRoute(GET("/functional/contacts/byEmail/{email}")
        , contactRestHandler::getByEmail)
      .andRoute(POST("/functional/contacts")
        , contactRestHandler::insertContact)
      .andRoute(PUT("/functional/contacts/{id}")
        , contactRestHandler::updateContact)
      .andRoute(DELETE("/functional/contacts/{id}")
        , contactRestHandler::deleteContact);
  }
}
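
Router functions are tried in the order they are declared, and the first predicate that matches handles the request. The sketch below imitates that lookup with a tiny hand-rolled matcher in which {name} stands for exactly one path segment. It is a simplified model for illustration, not Spring's actual path matching; the class and method names are hypothetical, and the routes are written with a leading slash and no trailing slash.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class RouteOrderSketch {

  // Insertion-ordered: "METHOD template" -> handler name, in the order of the router bean.
  private final Map<String, String> routes = new LinkedHashMap<>();

  RouteOrderSketch() {
    routes.put("GET /functional/contacts", "getAllContacts");
    routes.put("GET /functional/contacts/{id}", "getById");
    routes.put("GET /functional/contacts/byEmail/{email}", "getByEmail");
    routes.put("POST /functional/contacts", "insertContact");
    routes.put("PUT /functional/contacts/{id}", "updateContact");
    routes.put("DELETE /functional/contacts/{id}", "deleteContact");
  }

  // True when the path has the same segments as the template; {x} matches any one non-empty segment.
  static boolean matches(String template, String path) {
    String[] t = template.split("/");
    String[] p = path.split("/");
    if (t.length != p.length) return false;
    for (int i = 0; i < t.length; i++) {
      boolean variable = t[i].startsWith("{") && t[i].endsWith("}");
      if (variable ? p[i].isEmpty() : !t[i].equals(p[i])) return false;
    }
    return true;
  }

  // Returns the first handler whose method and template match, or null when no route applies.
  String resolve(String method, String path) {
    for (Map.Entry<String, String> route : routes.entrySet()) {
      String[] key = route.getKey().split(" ", 2);
      if (key[0].equals(method) && matches(key[1], path)) return route.getValue();
    }
    return null;
  }
}
```

Because {id} covers a single segment, a request for /functional/contacts/byEmail/someone skips the getById route and lands on getByEmail even though getById is declared first.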

Test the endpoints – You can use the same approach to test the endpoints as you did for controller-based endpoints.

4. Unit test case of the reactive endpoints

Since we are talking about reactive programming here, we will do our unit testing in a reactive way too. I demonstrated WebClient and WebTestClient in the previous article; we will use WebTestClient here to test these endpoints.

The test classes below cover ContactController and ContactRepository. By changing only the URLs in ContactControllerTest, you can make it work with the functional Contact REST endpoints; nothing special is needed to test them.

package c.jbd.webflux.controller;

import c.jbd.webflux.data.mongo.Contact;
import org.junit.jupiter.api.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.http.MediaType;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.web.reactive.server.WebTestClient;
import org.springframework.web.reactive.function.BodyInserters;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

@SpringBootTest
@ActiveProfiles("unit_test")
@AutoConfigureWebTestClient
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class ContactControllerTest {

  @Autowired
  private WebTestClient webTestClient;

  private Contact savedContact;

  @Test
  @Order(0)
  public void createContact() {
    Flux<Contact> contactFlux = webTestClient.post()
      .uri("/controller/contacts")
      .accept(MediaType.APPLICATION_JSON)
      .contentType(MediaType.APPLICATION_JSON)
      .body(BodyInserters.fromValue(
        new Contact("webtest", "[email protected]", "0000000000")))
      .exchange()
      .expectStatus().isAccepted()
      .returnResult(Contact.class).getResponseBody()
      .log();

    // Block for the first element so the assertion below cannot race the subscription.
    this.savedContact = contactFlux.blockFirst();

    Assertions.assertNotNull(savedContact);
  }

  @Test
  @Order(1)
  public void getByEmail() {
    Flux<Contact> contactFlux = webTestClient.get()
      .uri("/controller/contacts/byEmail/{email}", "[email protected]")
      .accept(MediaType.APPLICATION_JSON)
      .exchange()
      .expectStatus().isOk()
      .returnResult(Contact.class).getResponseBody()
      .log();

    StepVerifier.create(contactFlux)
      .expectSubscription()
      .expectNextMatches(contact -> contact.getEmail().equals("[email protected]"))
      .verifyComplete();
  }

  @Test
  @Order(2)
  public void updateContact() {

    Flux<Contact> contactFlux = webTestClient.put()
      .uri("/controller/contacts/{id}", savedContact.getId())
      .accept(MediaType.APPLICATION_JSON)
      .body(BodyInserters.fromValue(new Contact("WebTestClient", "[email protected]", "11111111").setId(savedContact.getId())))
      .exchange()
      .returnResult(Contact.class).getResponseBody()
      .log();

    StepVerifier.create(contactFlux)
      .expectSubscription()
      .expectNextMatches(contact -> contact.getEmail().equals("[email protected]"))
      .verifyComplete();
  }

  @Test
  @Order(3)
  public void getAllContacts() {
    Flux<Contact> contactsFlux = webTestClient.get()
      .uri("/controller/contacts")
      .accept(MediaType.APPLICATION_JSON)
      .exchange()
      .returnResult(Contact.class).getResponseBody()
      .log();

    StepVerifier.create(contactsFlux)
      .expectSubscription()
      .expectNextCount(1)
      .verifyComplete();
  }

  @Test
  @Order(4)
  public void deleteContact() {
    Flux<Void> vFlux = webTestClient.delete()
      .uri("/controller/contacts/{id}", savedContact.getId())
      .accept(MediaType.APPLICATION_JSON)
      .exchange()
      .returnResult(Void.class).getResponseBody();

    StepVerifier.create(vFlux)
      .expectSubscription()
      .verifyComplete();
  }
}

package c.jbd.webflux.data.mongo;

import org.junit.jupiter.api.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.mongodb.core.ReactiveMongoOperations;
import org.springframework.test.context.ActiveProfiles;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

// @DataMongoTest would load a lighter, Mongo-only context; @SpringBootTest loads the full application.
@SpringBootTest
@ActiveProfiles("unit_test") // run against the test database, since clearData() deletes every contact
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class ContactRepositoryTest {
  @Autowired
  private ContactRepository contactRepository;

  @Autowired
  private ReactiveMongoOperations mongoOperations;

  @BeforeAll
  public void insertData() {
    Contact contact1 = new Contact()
      .setName("John")
      .setEmail("[email protected]")
      .setPhone("1121121121");

    Contact contact2 = new Contact()
      .setName("Jonny")
      .setEmail("[email protected]")
      .setPhone("22222222222");

    Contact contact3 = new Contact()
      .setName("Hello")
      .setEmail("[email protected]")
      .setPhone("3333333333");

    //Save and verify contact1
    StepVerifier.create(contactRepository.insert(contact1).log())
      .expectSubscription()
      .expectNextCount(1)
      .verifyComplete();

    //save and verify contact2
    StepVerifier.create(contactRepository.save(contact2).log())
      .expectSubscription()
      .expectNextCount(1)
      .verifyComplete();

    //Save and verify id
    StepVerifier.create(contactRepository.save(contact3).log())
      .expectSubscription()
      .expectNextMatches(contact -> (contact.getId() != null))
      .verifyComplete();
  }

  @Test
  @Order(1)
  public void findAll() {
    StepVerifier.create(contactRepository.findAll().log())
      .expectSubscription()
      .expectNextCount(3)
      .verifyComplete();
  }

  @Test
  @Order(2)
  public void findByEmail(){
    StepVerifier.create(contactRepository.findFirstByEmail("[email protected]").log())
      .expectSubscription()
      .expectNextMatches(contact -> contact.getEmail().equals("[email protected]"))
      //.expectNextCount(1)
      .verifyComplete();
  }

  @Test
  @Order(3)
  public void updateContact() {
    Mono<Contact> updatedContact$ = contactRepository.findFirstByEmail("[email protected]")
      .map(contact -> {
        //Update the new values
        contact.setPhone("1111111111");
        return contact;
      }).flatMap(contact -> {
      return contactRepository.save(contact);
    });

    StepVerifier.create(updatedContact$.log())
      .expectSubscription()
      .expectNextMatches(contact -> (contact.getPhone().equals("1111111111")))
      .verifyComplete();
  }

  @Test
  @Order(4)
  public void deleteContactById(){
    Mono<Void> deletedContact$ = contactRepository.findFirstByEmail("[email protected]")
      //.map(Contact::getId)
      .flatMap(contact -> {
        return contactRepository.deleteById(contact.getId());
      }).log();

    StepVerifier.create(deletedContact$)
      .expectSubscription()
      .verifyComplete();
  }

  @Test
  public void deleteContact(){
    Mono<Void> deletedContact$ = contactRepository.findFirstByEmail("[email protected]")
      .flatMap(contact -> contactRepository.delete(contact));

    StepVerifier.create(deletedContact$)
      .expectSubscription()
      .verifyComplete();
  }

  @AfterAll
  public void clearData(){
    Mono<Void> deletedItems$ = contactRepository.deleteAll();
    StepVerifier.create(deletedItems$.log())
      .expectSubscription()
      .verifyComplete();
  }
}

Conclusion

I have shown you how to create non-blocking reactive endpoints using MongoDB and Spring WebFlux. The APIs developed in this tutorial are deliberately simple; in real life you will also need to handle exceptions, validate payloads, and so on. Download the complete example from GitHub.
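
As one example of the payload validation mentioned above, a controller or handler could reject obviously malformed input before it reaches the repository. The sketch below is deliberately simple and uses made-up rules: a non-blank name, an email with a single @ and no whitespace, and a digits-only phone number of 7 to 15 digits. A real project would more likely rely on Bean Validation annotations. All names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical validator with illustrative rules; adjust them to your own requirements.
final class ContactValidatorSketch {

  private static final Pattern EMAIL = Pattern.compile("[^@\\s]+@[^@\\s]+");
  private static final Pattern PHONE = Pattern.compile("\\d{7,15}");

  // Returns a list of human-readable problems; an empty list means the payload passed.
  static List<String> validate(String name, String email, String phone) {
    List<String> errors = new ArrayList<>();
    if (name == null || name.trim().isEmpty()) {
      errors.add("name must not be blank");
    }
    if (email == null || !EMAIL.matcher(email).matches()) {
      errors.add("email is not well formed");
    }
    if (phone == null || !PHONE.matcher(phone).matches()) {
      errors.add("phone must be 7 to 15 digits");
    }
    return errors;
  }
}
```

A caller could return a 400 response with the collected messages whenever the list is not empty, and only then fall through to the repository call.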