Spring Web Reactive

Spring Web Reactive

5 Comments

An exciting feature in Spring Framework 5 is the new Web Reactive framework for allows reactive web applications. Reactive programming is about developing systems that are fully reactive and non-blocking. Such systems are suitable for event-loop style processing that can scale with a small number of threads.

Spring Framework 5 embraces Reactive Streams to enable developing systems based on the Reactive Manifesto published in 2014.

The Spring Web Reactive framework stands separately from Spring MVC. This is because Spring MVC is developed around the Java Servlet API, which uses blocking code inside of Java. While popular Java application servers such as Tomcat and Jetty, have evolved to offer non-blocking operations, the Java Servlet API has not.

From a programming perspective, reactive programming involves a major shift from imperative style logic to a declarative composition of asynchronous logic.

In this post, I’ll explain how to develop a Web Reactive application with the Spring Framework 5.0.

Spring Web Reactive Types

Under the covers, Spring Web Reactive is using Reactor, which is a Reactive Streams Implementation. The Spring Framework extends the Reactive Streams Publisher interface with the Flux and Mono  reactive types.

The Flux  data type represents zero to many objects. (0..N)

While the Mono  data type is zero to one.  (0..1)

If you’d like a deeper dive on reactive types, check on Understanding Reactive Types by Sebastien Deleuze.

The Web Reactive Application

The application that we will create is a web reactive application that performs operations on domain objects. To keep it simple, we will use an in-memory repository implementation to simulate CRUD operations in this post. In later posts, we will go reactive with Spring Data.

Spring 5 added the new spring-webflux module for reactive programming that we will use in our application. The application is composed of these components:

  • Domain object: Product in our application.
  • Repository: A repository interface with an implementation class to mimic CRUD operations in a Map.
  • Handler: A handler class to interact with the repository layer.
  • Server: A non-blocking Web server with the single-threaded event loop. For this application, we will look at how to use both Netty and Tomcat to serve requests.

The Maven POM

For web reactive programming, you need the new spring-webflux and reactive-stream modules as dependencies in your Maven POM.

<dependency>
     <groupId>org.reactivestreams</groupId>
     <artifactId>reactive-streams</artifactId>
</dependency>

<dependency>
     <groupId>org.springframework</groupId>
     <artifactId>spring-webflux</artifactId>
     <version>5.0.0.RC2</version>
</dependency>

To host the application in a supported runtime, you need to add its dependency. The supported runtimes are:

  • Tomcat: org.apache.tomcat.embed:tomcat-embed-core
  • Jetty: org.eclipse.jetty:jetty-server and org.eclipse.jetty:jetty-servlet
  • Reactor Netty: io.projectreactor.ipc:reactor-netty
  • Undertow: io.undertow:undertow-core

The code to add dependencies for both embedded Tomcat and Netty is this.

<dependency>
     <groupId>io.projectreactor.ipc</groupId>
     <artifactId>reactor-netty</artifactId>
</dependency>
<dependency>
     <groupId>org.apache.tomcat.embed</groupId>
     <artifactId>tomcat-embed-core</artifactId>
     <version>8.5.4</version>
</dependency>

The final dependency is for reactive serialization and deserialization to and from JSON with Jackson.

Note – This is a pre-release of Jackson, will include non-blocking serialization and deserialization. (Version 2.9.0 was not released at the time of writing)

<dependency>
     <groupId>com.fasterxml.jackson.core</groupId>
     <artifactId>jackson-databind</artifactId>
     <version>2.9.0.pr4</version>
</dependency>

As we are using the latest milestone release of Spring Boot, remember to add the Spring milestones repository:

<repositories>
    <repository>
        <id>spring-milestones</id>
        <name>Spring Milestones</name>
        <url>https://repo.spring.io/libs-milestone</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
</repositories>

Here is the complete Maven POM.

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>guru.springframework</groupId>
    <artifactId>spring5-reactive-web</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <repositories>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/libs-milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.projectreactor</groupId>
                <artifactId>reactor-bom</artifactId>
                <version>Bismuth-M1</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.reactivestreams</groupId>
            <artifactId>reactive-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>io.projectreactor.ipc</groupId>
            <artifactId>reactor-netty</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.tomcat.embed</groupId>
            <artifactId>tomcat-embed-core</artifactId>
            <version>8.5.4</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-webflux</artifactId>
            <version>5.0.0.RC2</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.9.0.pr4</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

 

The Domain Object

Our application has a Product domain object on which operations will be performed. The code for the Product object is this.

Product.java

package guru.springframework.domain;

import com.fasterxml.jackson.annotation.JsonProperty;

import java.math.BigDecimal;

public class Product {
    private int productId;
    private String productName;
    private String description;
    private String imageUrl;
    private BigDecimal price;

    public Product(@JsonProperty("id") int productId, @JsonProperty("name") String productName, @JsonProperty("description") String description, @JsonProperty("image") String imageUrl, @JsonProperty("price") BigDecimal price) {
        this.productId = productId;
        this.productName = productName;
        this.description = description;
        this.imageUrl = imageUrl;
        this.price = price;
    }

    public int getProductId() {
        return productId;
    }

    public void setProductId(int productId) {
        this.productId = productId;
    }

    public String getProductName() {
        return productName;
    }

    public void setProductName(String productName) {
        this.productName = productName;
    }

    public String getDescription() {
        return description;
    }

    public void setDescription(String description) {
        this.description = description;
    }

    public String getImageUrl() {
        return imageUrl;
    }

    public void setImageUrl(String imageUrl) {
        this.imageUrl = imageUrl;
    }

    public BigDecimal getPrice() {
        return price;
    }

    public void setPrice(BigDecimal price) {
        this.price = price;
    }

    @Override
    public String toString() {
        return "Product{" +
                "productId='" + productId + '\'' +
                ", productName='" + productName + '\'' +
                ", description='" + description + '\'' +
                ", imageUrl='" + imageUrl + '\'' +
                ", price=" + price +
                '}';
    }
}

 

Product is a POJO with fields representing product information. Each field has its corresponding getter and setter methods. @JsonProperty is a Jackson annotation to map external JSON properties to the Product fields.

The Repository

The repository layer of the application is built on the ProductRepository interface with methods to save a product, retrieve a product by ID, and retrieve all products.

In this example, we are mimicking the functionality of a reactive data store with a simple ConcurrentHashMap implementation.

ProductRepository.java

package guru.springframework.repositories;

import guru.springframework.domain.Product;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public interface ProductRepository {

    Mono<Product> getProduct(int id);

    Flux<Product> getAllProducts();

    Mono<Void> saveProduct(Mono<Product> product);

}

 

The important things in this interface are the new Mono and Flux reactive types of Project Reactor. Both these reactive types along with the other types of the Reactive API are capable
to serve a huge amount of requests concurrently and to handle operations with latency. These types make operations, such as requesting data from a remote server, more efficient. Unlike traditional processing that blocks the current thread while waiting for a result, Reactive APIs are non-blocking as they deal with streams of data.

To understand Mono and Flux, let’s look at the two main interfaces of the Reactive API: Publisher, which is the source of events T in the stream and Subscriber, which is the destination for those events.

Both Mono and Fluximplements Publisher. The difference lies in cardinality, which is critical in reactive streams.

The difference lies in cardinality, which is critical in reactive streams.

  • Flux observes 0 to N items and completes either successfully or with an error.
  • A Mono observes 0 or 1 item, with Mono hinting at most 0 item.

Note: Reactive APIs were initially designed to deal with N elements or streams of data. So Reactor initially came only with Flux. But, while working on Spring Framework 5, the team found a need to distinguish between streams of 1 or N elements, so the Mono reactive type was introduced.

Here is the repository implementation class.

ProductRepositoryInMemoryImpl.java

package guru.springframework.repositories;

import guru.springframework.domain.Product;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.math.BigDecimal;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ProductRepositoryInMemoryImpl implements ProductRepository {

    private final Map<Integer, Product> productMap = new ConcurrentHashMap<>();

    public ProductRepositoryInMemoryImpl() {
        this.productMap.put(1, new Product(313,
                "Spring Guru Shirt",
                "Spring Framework Guru White collared Shirt",
                "https://springframework.guru/wp-content/uploads/2015/04/spring_framework_guru_shirt-rf412049699c14ba5b68bb1c09182bfa2_8nax2_512.jpg",
                new BigDecimal("18.95")));

        this.productMap.put(2, new Product(512,
                "Spring Guru Mug",
                "Spring Framework Guru Green Cofee Mug",
                "https://springframework.guru/wp-content/uploads/2015/04/spring_framework_guru_coffee_mug-r11e7694903c348e1a667dfd2f1474d95_x7j54_8byvr_512.jpg",
                new BigDecimal("11.95")));
    }

    @Override
    public Mono<Product> getProduct(int id) {
        return Mono.justOrEmpty(this.productMap.get(id));
    }

    @Override
    public Flux<Product> getAllProducts() {
        return Flux.fromIterable(this.productMap.values());
    }

    @Override
    public Mono<Void> saveProduct(Mono<Product> productMono) {

        Mono<Product> pMono = productMono.doOnNext(product -> {
            int id = productMap.size() + 1;
            productMap.put(id, product);
            System.out.format("Saved %s with id %d%n", product, id);
        });
        return pMono.thenEmpty(Mono.empty());
    }
}

This ProductRepositoryInMemoryImpl class uses a Map implementation to store Product objects.

In the overridden getProduct() method, the call to Mono.justOrEmpty() creates a new Mono that emits the specified item – Product object in this case, provided the Product object is not null. For a null value, the Mono.justOrEmpty() method completes by emitting onComplete.

In the overridden getAllProducts() method, the call to Flux.fromIterable() creates a new Flux that emits the items ( Product objects) present in the Iterable passed as parameter.

In the overridden saveProduct() method, the call to doOnNext() accepts a callback that stores the provided Product into the Map. What we have here is an example of classic non-blocking programming. Execution control does not block and wait for the product storing operation.

The Product Handler

The Product handler is similar to a typical service layer in Spring MVC. It interacts with the repository layer. Following the SOLID Principles, we would want the client code to interact with this layer through an interface. So, we start with a ProductHandler interface.

The code of the ProductHandler interface is this.

ProductHandler.java

package guru.springframework.handlers;

import guru.springframework.domain.Product;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import static org.springframework.http.MediaType.APPLICATION_JSON;
import static org.springframework.web.reactive.function.BodyInserters.fromObject;

public interface ProductHandler {
    public Mono<ServerResponse> getProductFromRepository(ServerRequest request);

    public Mono<ServerResponse> saveProductToRepository(ServerRequest request);

    public Mono<ServerResponse> getAllProductsFromRepository(ServerRequest request);
}

The implementation class, ProductHandlerImpl is this.

ProductHandlerImpl.java

package guru.springframework.handlers;

import guru.springframework.repositories.ProductRepository;
import guru.springframework.domain.Product;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;

import static org.springframework.http.MediaType.APPLICATION_JSON;
import static org.springframework.web.reactive.function.BodyInserters.fromObject;

public class ProductHandlerImpl implements ProductHandler {

    private final ProductRepository repository;

    public ProductHandlerImpl(ProductRepository repository) {
        this.repository = repository;
    }

    @Override
    public Mono<ServerResponse> getProductFromRepository(ServerRequest request) {
        int personId = Integer.valueOf(request.pathVariable("id"));
        Mono<ServerResponse> notFound = ServerResponse.notFound().build();
        Mono<Product> personMono = this.repository.getProduct(personId);
        return personMono
                .flatMap(person -> ServerResponse.ok().contentType(APPLICATION_JSON).body(fromObject(person)))
                .switchIfEmpty(notFound);
    }

    @Override
    public Mono<ServerResponse> saveProductToRepository(ServerRequest request) {
        Mono<Product> product = request.bodyToMono(Product.class);
        return ServerResponse.ok().build(this.repository.saveProduct(product));
    }

    @Override
    public Mono<ServerResponse> getAllProductsFromRepository(ServerRequest request) {
        Flux<Product> products = this.repository.getAllProducts();
        return ServerResponse.ok().contentType(APPLICATION_JSON).body(products, Product.class);
    }

}

 

In the getProductFromRepository(ServerRequest request) method of the ProductHandlerImpl class:

  • Line 22 obtains the product ID sent as a request parameter
  • Line 23 builds an HTTP response as ServerResponse for the NOT_FOUND HTTP status.
  • Line 24 calls the repository to obtain the Product as a Mono.
  • Line 25 – Line 27: Returns a Mono that can represent either the Product or the NOT_FOUND HTTP status if the product is not found.
  • Line 31 in the saveProductToRepository(ServerRequest request) method converts the request body to a Mono. Then Line 33 calls the saveProduct() method of the repository to save the product, and finally return a success status code as an HTTP response.
  • In the getAllProductsFromRepository() method, Line 37 calls the getAllProducts() method of the repository that returns a Flux< ServerResponse>. Then Line 38 returns back the Flux as a JSON that contains all the products.

Running the Application

The example of web reactive application has two components. One is the Reactive Web Server. The second is our client.

The Reactive Web Server

Now it is time to wire up all the components together for a web reactive application.

We will use embedded Tomcat as the server for the application, but will also look at how to do the same with the lightweight Reactive Netty.

These we will implement in a Server class.

Server.java

package guru.springframework.server;

import guru.springframework.handlers.ProductHandler;
import guru.springframework.handlers.ProductHandlerImpl;
import guru.springframework.repositories.ProductRepository;
import guru.springframework.repositories.ProductRepositoryInMemoryImpl;
import org.apache.catalina.Context;
import org.apache.catalina.LifecycleException;
import org.apache.catalina.startup.Tomcat;
import org.springframework.http.HttpMethod;
import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.http.server.reactive.ServletHttpHandlerAdapter;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;

import static org.springframework.http.MediaType.APPLICATION_JSON;
import static org.springframework.web.reactive.function.server.RequestPredicates.*;
import static org.springframework.web.reactive.function.server.RouterFunctions.*;

public class Server {
    public static void main(String[] args) throws Exception {
        Server server = new Server();
        server.startTomcatServer("localhost", 8080);
        System.out.println("Press ENTER to exit.");
        System.in.read();
    }

    public RouterFunction<ServerResponse> routingFunction() {
        ProductRepository repository = new ProductRepositoryInMemoryImpl();
        ProductHandler handler = new ProductHandlerImpl(repository);

        return nest(path("/product"),
                nest(accept(APPLICATION_JSON),
                        route(GET("/{id}"), handler::getProductFromRepository)
                                .andRoute(method(HttpMethod.GET), handler::getAllProductsFromRepository)
                ).andRoute(POST("/")
                        .and(contentType(APPLICATION_JSON)), handler::saveProductToRepository));
    }

    public void startTomcatServer(String host, int port) throws LifecycleException {
        RouterFunction<?> route = routingFunction();
        HttpHandler httpHandler = toHttpHandler(route);
        Tomcat tomcatServer = new Tomcat();
        tomcatServer.setHostname(host);
        tomcatServer.setPort(port);
        Context rootContext = tomcatServer.addContext("", System.getProperty("java.io.tmpdir"));
        ServletHttpHandlerAdapter servlet = new ServletHttpHandlerAdapter(httpHandler);
        Tomcat.addServlet(rootContext, "httpHandlerServlet", servlet);
        rootContext.addServletMapping("/", "httpHandlerServlet");
        tomcatServer.start();
    }
}

 

In this Server class:

  • Line 37 – Line 38 creates a ProductHandler initialized with ProductRepository.
  • Line 39 – Line 43 constructs and returns a RouterFunction. In Spring Reactive Web, you can relate a RouterFunction with the @RequestMapping annotation. A RouterFunction is used for routing incoming requests to handler functions. In the Server class, incoming GET requests to “/{id}” and “/” are routed to the getProductFromRepository and getAllProductsFromRepository handler functions respectively. Incoming POST requests to “/” are routed to the saveProductToRepository handler function.
  • Line 53 – Line 54  in the startTomcatServer() method, integrates the RouterFunction into Tomcat as a generic HttpHandler.
  • Line 55- Line 61 initializes Tomcat with a hostname, port number, context path, and a servlet mapping.
  • Line 62 finally starts Tomcat by calling the start() method.

The output on executing the Server class is this.
Output of Tomcat
To use Netty instead of Tomcat, use this code:

public void startReactorServer(String host, int port) throws InterruptedException {
   RouterFunction route = routingFunction();
   HttpHandler httpHandler = toHttpHandler(route);
   ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler);
   HttpServer server = HttpServer.create(host, port);
   server.newHandler(adapter).block();
}

The Client

Spring Framework 5 adds a new reactive WebClient in addition to the existing RestTemplate. The new WebClient deserves a post on its own.

To keep this post simple and limited to only accessing our reactive Web application, I will use ExchangeFunction – a simple alternative to WebClient. ExchangeFunction represents a function that exchanges a client request for a (delayed) client response.

The code of the client class, named ReactiveClient is this.

ReactiveWebClient.java

package guru.springframework.client;

import guru.springframework.domain.Product;
import org.springframework.http.HttpMethod;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.ClientRequest;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.ExchangeFunction;
import org.springframework.web.reactive.function.client.ExchangeFunctions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.math.BigDecimal;
import java.net.URI;
import java.util.List;

public class ReactiveWebClient {
    public static final String HOST = "localhost";
    public static final int PORT = 8080;
    private ExchangeFunction exchange = ExchangeFunctions.create(new ReactorClientHttpConnector());

    public static void main(String[] args) throws Exception {
        ReactiveWebClient client = new ReactiveWebClient();
        client.createProduct();
        client.getAllProduct();
    }

    public void createProduct() {
        URI uri = URI.create(String.format("http://%s:%d/product", HOST, PORT));
        Product shirt = new Product(319, "Spring Guru Jeans", "Spring Framework Guru Denim Jean", "https://springframework.guru/wp-content/uploads/2015/04/spring_framework_guru_shirt-rf412049699c14ba5b68bb1c09182bfa2_8nax2_512.jpg", new BigDecimal("35.95"));
        ClientRequest request = ClientRequest.method(HttpMethod.POST, uri)
                .body(BodyInserters.fromObject(shirt)).build();
        Mono<ClientResponse> response = exchange.exchange(request);
        System.out.println(response.block().statusCode());
    }

    public void getAllProduct() {
        URI uri = URI.create(String.format("http://%s:%d/product", HOST, PORT));
        ClientRequest request = ClientRequest.method(HttpMethod.GET, uri).build();
        Flux<Product> productList = exchange.exchange(request)
                .flatMapMany(response -> response.bodyToFlux(Product.class));
        Mono<List<Product>> productListMono = productList.collectList();
        System.out.println(productListMono.block());
    }
}

 

In the ReactiveClient class, Line 21 calls the ExchangeFunctions.create() method passing a ReactorClientHttpConnector, which is an abstraction over HTTP clients to connect the client to the server. The create() method returns an ExchangeFunction.

In the createProduct() method of the ReactiveClient class, Line 30 – Line 31 builds a ClientRequest that posts a Product object to a URL represented by the URI object. Then Line 32 calls the exchange(request) method to exchange the given request for a response Mono.

In the getAllProducts() method, Line 37 starts an exchange to send a GET request to get all products.

The response body is converted into a Flux and printed to the console.

With Tomcat running, the output on running the ReactiveClient class is:
Output of Reactive Web CLient

Conclusion

In this post, I showed you a very simple example of the new web reactive features inside of Spring Framework 5.

While the reactive programming features inside of Spring Framework 5 are certainly fun to use. What, I’m finding that is, even more, fun is the functional programming style of the new Spring Framework 5 APIs.

Consider the configuration of the web reactive server:

    public RouterFunction<ServerResponse> routingFunction() {
        ProductRepository repository = new ProductRepositoryInMemoryImpl();
        ProductHandler handler = new ProductHandlerImpl(repository);

        return nest(path("/product"),
                nest(accept(APPLICATION_JSON),
                        route(GET("/{id}"), handler::getProductFromRepository)
                                .andRoute(method(HttpMethod.GET), handler::getAllProductsFromRepository)
                ).andRoute(POST("/")
                        .and(contentType(APPLICATION_JSON)), handler::saveProductToRepository));
    }

This functional style is a significant change from what we’ve become accustomed to in Spring MVC.

Don’t worry, Spring MVC is still alive and well. And even when using the Reactive features in Spring Framework 5, you can still define ‘controllers’ in the traditional declarative sense.

And maybe traditional monolithic applications will continue to declare controllers using traditional approaches?

Where I expect the functional style to really shine is in the realm of microservices. This new functional style makes it crazy easy to define small, targeted services.

I’m looking forward to seeing how the Spring community adopts the functional API and seeing how it evolves.

About jt

    You May Also Like