Spring Web Reactive
Last updated on June 2, 2019 by Simanta
An exciting feature in Spring Framework 5 is the new Web Reactive framework for building reactive web applications. Reactive programming is about developing systems that are fully reactive and non-blocking. Such systems are suitable for event-loop style processing that can scale with a small number of threads.
Spring Framework 5 embraces Reactive Streams to enable developing systems based on the Reactive Manifesto published in 2014.
The Spring Web Reactive framework stands separately from Spring MVC. This is because Spring MVC is built around the Java Servlet API, which relies on blocking code. While popular Java application servers such as Tomcat and Jetty have evolved to offer non-blocking operations, the Java Servlet API has not.
From a programming perspective, reactive programming involves a major shift from imperative style logic to a declarative composition of asynchronous logic.
In this post, I’ll explain how to develop a Web Reactive application with the Spring Framework 5.0.
Spring Web Reactive Types
Under the covers, Spring Web Reactive uses Reactor, which is a Reactive Streams implementation. Reactor implements the Reactive Streams Publisher interface with the Flux and Mono reactive types.
The Flux data type represents zero to many objects (0..N).
The Mono data type represents zero or one object (0..1).
If you’d like a deeper dive on reactive types, check out Understanding Reactive Types by Sebastien Deleuze.
The Web Reactive Application
The application that we will create is a web reactive application that performs operations on domain objects. To keep it simple, we will use an in-memory repository implementation to simulate CRUD operations in this post. In later posts, we will go reactive with Spring Data.
Spring 5 added the new spring-webflux module for reactive programming that we will use in our application. The application is composed of these components:
- Domain object: Product in our application.
- Repository: A repository interface with an implementation class to mimic CRUD operations in a Map.
- Handler: A handler class to interact with the repository layer.
- Server: A non-blocking web server with a single-threaded event loop. For this application, we will look at how to use both Netty and Tomcat to serve requests.
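To make the event-loop idea in the last bullet concrete, here is a minimal sketch that uses only the JDK. It is not how Netty or Tomcat are implemented; the class name EventLoopSketch and the method handleRequests are made up for illustration. It only shows that one worker thread can work through many short tasks as long as none of them blocks.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration, not part of the article's application.
class EventLoopSketch {

    // Runs the given number of short, non-blocking tasks on one worker thread
    // and returns the names of all threads that executed them.
    static Set<String> handleRequests(int requests) {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < requests; i++) {
            eventLoop.execute(() -> threadNames.add(Thread.currentThread().getName()));
        }
        eventLoop.shutdown(); // tasks that are already queued still run
        try {
            eventLoop.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return threadNames;
    }

    public static void main(String[] args) {
        // All 100 tasks are served by the same thread.
        System.out.println(handleRequests(100).size()); // prints 1
    }
}
```

If one of the tasks slept or waited on I/O, every task queued behind it would stall, which is why handlers on an event loop must not block.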
The Maven POM
For web reactive programming, you need the new spring-webflux and reactive-streams modules as dependencies in your Maven POM.
<dependency>
    <groupId>org.reactivestreams</groupId>
    <artifactId>reactive-streams</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-webflux</artifactId>
    <version>5.0.0.RC2</version>
</dependency>
To host the application in a supported runtime, you need to add its dependency. The supported runtimes are:
- Tomcat: org.apache.tomcat.embed:tomcat-embed-core
- Jetty: org.eclipse.jetty:jetty-server and org.eclipse.jetty:jetty-servlet
- Reactor Netty: io.projectreactor.ipc:reactor-netty
- Undertow: io.undertow:undertow-core
The code to add dependencies for both embedded Tomcat and Netty is this.
<dependency>
    <groupId>io.projectreactor.ipc</groupId>
    <artifactId>reactor-netty</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.tomcat.embed</groupId>
    <artifactId>tomcat-embed-core</artifactId>
    <version>8.5.4</version>
</dependency>
The final dependency is for reactive serialization and deserialization to and from JSON with Jackson.
Note: This is a pre-release of Jackson that includes non-blocking serialization and deserialization. (Version 2.9.0 had not been released at the time of writing.)
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.9.0.pr4</version>
</dependency>
As we are using a pre-release version of Spring Framework, remember to add the Spring milestones repository:
<repositories>
    <repository>
        <id>spring-milestones</id>
        <name>Spring Milestones</name>
        <url>https://repo.spring.io/libs-milestone</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
</repositories>
Here is the complete Maven POM.
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>guru.springframework</groupId>
    <artifactId>spring5-reactive-web</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <repositories>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/libs-milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.projectreactor</groupId>
                <artifactId>reactor-bom</artifactId>
                <version>Bismuth-M1</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.reactivestreams</groupId>
            <artifactId>reactive-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>io.projectreactor.ipc</groupId>
            <artifactId>reactor-netty</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.tomcat.embed</groupId>
            <artifactId>tomcat-embed-core</artifactId>
            <version>8.5.4</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-webflux</artifactId>
            <version>5.0.0.RC2</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.9.0.pr4</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
The Domain Object
Our application has a Product domain object on which operations will be performed. The code for the Product object is this.
Product.java
package guru.springframework.domain;

import com.fasterxml.jackson.annotation.JsonProperty;
import java.math.BigDecimal;

public class Product {
    private int productId;
    private String productName;
    private String description;
    private String imageUrl;
    private BigDecimal price;

    public Product(@JsonProperty("id") int productId,
                   @JsonProperty("name") String productName,
                   @JsonProperty("description") String description,
                   @JsonProperty("image") String imageUrl,
                   @JsonProperty("price") BigDecimal price) {
        this.productId = productId;
        this.productName = productName;
        this.description = description;
        this.imageUrl = imageUrl;
        this.price = price;
    }

    public int getProductId() {
        return productId;
    }

    public void setProductId(int productId) {
        this.productId = productId;
    }

    public String getProductName() {
        return productName;
    }

    public void setProductName(String productName) {
        this.productName = productName;
    }

    public String getDescription() {
        return description;
    }

    public void setDescription(String description) {
        this.description = description;
    }

    public String getImageUrl() {
        return imageUrl;
    }

    public void setImageUrl(String imageUrl) {
        this.imageUrl = imageUrl;
    }

    public BigDecimal getPrice() {
        return price;
    }

    public void setPrice(BigDecimal price) {
        this.price = price;
    }

    @Override
    public String toString() {
        return "Product{" +
                "productId='" + productId + '\'' +
                ", productName='" + productName + '\'' +
                ", description='" + description + '\'' +
                ", imageUrl='" + imageUrl + '\'' +
                ", price=" + price +
                '}';
    }
}
Product is a POJO with fields representing product information. Each field has its corresponding getter and setter methods. @JsonProperty is a Jackson annotation to map external JSON properties to the Product fields.
The Repository
The repository layer of the application is built on the ProductRepository interface with methods to save a product, retrieve a product by ID, and retrieve all products.
In this example, we are mimicking the functionality of a reactive data store with a simple ConcurrentHashMap implementation.
ProductRepository.java
package guru.springframework.repositories;

import guru.springframework.domain.Product;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public interface ProductRepository {

    Mono<Product> getProduct(int id);

    Flux<Product> getAllProducts();

    Mono<Void> saveProduct(Mono<Product> product);
}
The important things in this interface are the new Mono and Flux reactive types of Project Reactor. Both of these reactive types, along with the other types of the Reactive API, can serve a large number of requests concurrently and handle operations with latency. These types make operations such as requesting data from a remote server more efficient. Unlike traditional processing that blocks the current thread while waiting for a result, Reactive APIs are non-blocking as they deal with streams of data.
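The contrast between blocking and non-blocking calls can be sketched with the JDK's CompletableFuture, without pulling in Reactor. This is an analogy rather than Reactor code: the fetchPrice method is a made-up stand-in for a remote call, and the class and method names are assumptions for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration, not part of the article's application.
class NonBlockingSketch {

    // Made-up slow call standing in for a request to a remote server.
    static String fetchPrice(String productName) {
        try {
            TimeUnit.MILLISECONDS.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return productName + ": 18.95";
    }

    // Blocking style: the caller's thread waits until the result is available.
    static String blockingLabel(String productName) {
        return "Price of " + fetchPrice(productName);
    }

    // Non-blocking style: describe what should happen to the result and return at once.
    static CompletableFuture<String> nonBlockingLabel(String productName) {
        return CompletableFuture
                .supplyAsync(() -> fetchPrice(productName))
                .thenApply(price -> "Price of " + price);
    }

    public static void main(String[] args) {
        CompletableFuture<String> pending = nonBlockingLabel("Spring Guru Mug");
        System.out.println("Caller continues while the lookup runs");
        // join() is used here only so the demo waits before the JVM exits.
        System.out.println(pending.join());
    }
}
```

In this sketch the slow call still occupies a pool thread while it sleeps. In a fully reactive stack the I/O itself is non-blocking, so no thread is parked while waiting.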
To understand Mono and Flux, let’s look at the two main interfaces of the Reactive API: Publisher, which is the source of events of type T in the stream, and Subscriber, which is the destination for those events.
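The Publisher and Subscriber roles can be tried out with the java.util.concurrent.Flow interfaces, which mirror the Reactive Streams interfaces in the JDK. The sketch below assumes Java 9 or later (the article's POM targets 1.8) and uses the JDK's SubmissionPublisher instead of a Reactor type; the class names are made up for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration, not part of the article's application.
class PublisherSubscriberSketch {

    // Subscriber that requests one item at a time and records what it receives.
    static final class CollectingSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // signal demand for the first item
        }

        @Override
        public void onNext(String item) {
            received.add(item);
            subscription.request(1); // ask for the next item
        }

        @Override
        public void onError(Throwable error) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    // Publishes the items, waits for completion, and returns what the subscriber saw.
    static List<String> publishAndCollect(List<String> items) {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);
        } // close() signals onComplete once the submitted items are delivered
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(publishAndCollect(List.of("shirt", "mug")));
    }
}
```

The request(1) calls are the subscriber's way of telling the publisher how much it is ready to receive, which is the back-pressure part of the contract.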
Both Mono and Flux implement Publisher. The difference lies in cardinality, which is critical in reactive streams.
- A Flux observes 0 to N items and completes either successfully or with an error.
- A Mono observes 0 or 1 item, with Mono<Void> hinting at most 0 items.
Note: Reactive APIs were initially designed to deal with N elements or streams of data. So Reactor initially came only with Flux. But, while working on Spring Framework 5, the team found a need to distinguish between streams of 1 or N elements, so the Mono reactive type was introduced.
Here is the repository implementation class.
ProductRepositoryInMemoryImpl.java
package guru.springframework.repositories;

import guru.springframework.domain.Product;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.math.BigDecimal;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ProductRepositoryInMemoryImpl implements ProductRepository {

    private final Map<Integer, Product> productMap = new ConcurrentHashMap<>();

    public ProductRepositoryInMemoryImpl() {
        this.productMap.put(1, new Product(313,
                "Spring Guru Shirt",
                "Spring Framework Guru White collared Shirt",
                "http://springframework.guru/wp-content/uploads/2015/04/spring_framework_guru_shirt-rf412049699c14ba5b68bb1c09182bfa2_8nax2_512.jpg",
                new BigDecimal("18.95")));
        this.productMap.put(2, new Product(512,
                "Spring Guru Mug",
                "Spring Framework Guru Green Cofee Mug",
                "http://springframework.guru/wp-content/uploads/2015/04/spring_framework_guru_coffee_mug-r11e7694903c348e1a667dfd2f1474d95_x7j54_8byvr_512.jpg",
                new BigDecimal("11.95")));
    }

    @Override
    public Mono<Product> getProduct(int id) {
        return Mono.justOrEmpty(this.productMap.get(id));
    }

    @Override
    public Flux<Product> getAllProducts() {
        return Flux.fromIterable(this.productMap.values());
    }

    @Override
    public Mono<Void> saveProduct(Mono<Product> productMono) {
        Mono<Product> pMono = productMono.doOnNext(product -> {
            int id = productMap.size() + 1;
            productMap.put(id, product);
            System.out.format("Saved %s with id %d%n", product, id);
        });
        return pMono.thenEmpty(Mono.empty());
    }
}
This ProductRepositoryInMemoryImpl class uses a Map implementation to store Product objects.
In the overridden getProduct() method, the call to Mono.justOrEmpty() creates a new Mono that emits the specified item, a Product object in this case, provided the Product object is not null. For a null value, the Mono.justOrEmpty() method completes by emitting onComplete.
In the overridden getAllProducts() method, the call to Flux.fromIterable() creates a new Flux that emits the items (Product objects) present in the Iterable passed as parameter.
In the overridden saveProduct() method, the call to doOnNext() accepts a callback that stores the provided Product into the Map. The callback runs only once the returned Mono is subscribed to. What we have here is an example of classic non-blocking programming. Execution control does not block and wait for the product storing operation.
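One caveat with deriving the id from the map size, as saveProduct() does, is that two concurrent saves could compute the same id and overwrite each other. A possible alternative, sketched here with JDK types only and made-up names (UniqueIdStoreSketch, saveConcurrently), is to hand out ids from an AtomicInteger. This is one option for illustration, not the approach used in the article's code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration, not part of the article's application.
class UniqueIdStoreSketch {
    private final Map<Integer, String> items = new ConcurrentHashMap<>();
    private final AtomicInteger nextId = new AtomicInteger();

    // incrementAndGet() gives every caller a distinct id, even under contention.
    int save(String item) {
        int id = nextId.incrementAndGet();
        items.put(id, item);
        return id;
    }

    int size() {
        return items.size();
    }

    // Saves `count` items from several threads and reports how many entries were stored.
    static int saveConcurrently(int count) {
        UniqueIdStoreSketch store = new UniqueIdStoreSketch();
        ExecutorService pool = Executors.newFixedThreadPool(8);
        for (int i = 0; i < count; i++) {
            String item = "product-" + i;
            pool.execute(() -> store.save(item));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return store.size();
    }

    public static void main(String[] args) {
        // No entry is lost because no two saves share an id.
        System.out.println(saveConcurrently(1000)); // prints 1000
    }
}
```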
The Product Handler
The Product handler is similar to a typical service layer in Spring MVC. It interacts with the repository layer. Following the SOLID Principles, we would want the client code to interact with this layer through an interface. So, we start with a ProductHandler interface.
The code of the ProductHandler interface is this.
ProductHandler.java
package guru.springframework.handlers;

import guru.springframework.domain.Product;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import static org.springframework.http.MediaType.APPLICATION_JSON;
import static org.springframework.web.reactive.function.BodyInserters.fromObject;

public interface ProductHandler {

    public Mono<ServerResponse> getProductFromRepository(ServerRequest request);

    public Mono<ServerResponse> saveProductToRepository(ServerRequest request);

    public Mono<ServerResponse> getAllProductsFromRepository(ServerRequest request);
}
The implementation class, ProductHandlerImpl, is this.
ProductHandlerImpl.java
package guru.springframework.handlers;

import guru.springframework.repositories.ProductRepository;
import guru.springframework.domain.Product;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;

import static org.springframework.http.MediaType.APPLICATION_JSON;
import static org.springframework.web.reactive.function.BodyInserters.fromObject;

public class ProductHandlerImpl implements ProductHandler {

    private final ProductRepository repository;

    public ProductHandlerImpl(ProductRepository repository) {
        this.repository = repository;
    }

    @Override
    public Mono<ServerResponse> getProductFromRepository(ServerRequest request) {
        // The id arrives as the {id} path variable of the matched route.
        int productId = Integer.valueOf(request.pathVariable("id"));
        Mono<ServerResponse> notFound = ServerResponse.notFound().build();
        Mono<Product> productMono = this.repository.getProduct(productId);
        return productMono
                .flatMap(product -> ServerResponse.ok().contentType(APPLICATION_JSON).body(fromObject(product)))
                .switchIfEmpty(notFound);
    }

    @Override
    public Mono<ServerResponse> saveProductToRepository(ServerRequest request) {
        Mono<Product> product = request.bodyToMono(Product.class);
        return ServerResponse.ok().build(this.repository.saveProduct(product));
    }

    @Override
    public Mono<ServerResponse> getAllProductsFromRepository(ServerRequest request) {
        Flux<Product> products = this.repository.getAllProducts();
        return ServerResponse.ok().contentType(APPLICATION_JSON).body(products, Product.class);
    }
}
In the getProductFromRepository(ServerRequest request) method of the ProductHandlerImpl class:
- The first statement obtains the product ID sent as the id path variable.
- The second statement builds a ServerResponse for the NOT_FOUND HTTP status.
- The third statement calls the repository to obtain the Product as a Mono.
- The return statement yields a Mono that represents either the Product as JSON or, if the product is not found, the NOT_FOUND HTTP status.
- In the saveProductToRepository(ServerRequest request) method, the first statement converts the request body to a Mono. The return statement then calls the saveProduct() method of the repository to save the product and returns a success status code as the HTTP response.
- In the getAllProductsFromRepository() method, the first statement calls the getAllProducts() method of the repository, which returns a Flux<Product>. The return statement then writes the Flux as JSON that contains all the products.
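The found-or-NOT_FOUND branching in getProductFromRepository() can be mirrored with JDK types to see the shape of the logic in isolation. In this sketch an empty Optional plays the part of an empty Mono, and thenApply with map/orElse plays the part of flatMap with switchIfEmpty. It is an analogy only; the class, the status strings, and the sample data are assumptions for illustration.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration, not part of the article's application.
class FoundOrNotFoundSketch {
    private static final Map<Integer, String> PRODUCTS = new ConcurrentHashMap<>();

    static {
        PRODUCTS.put(1, "Spring Guru Shirt");
        PRODUCTS.put(2, "Spring Guru Mug");
    }

    // Plays the role of the repository lookup: an empty Optional stands in for an empty Mono.
    static CompletableFuture<Optional<String>> findProduct(int id) {
        return CompletableFuture.completedFuture(Optional.ofNullable(PRODUCTS.get(id)));
    }

    // Maps a hit to "200 <name>" and a miss to "404" without blocking on the lookup.
    static CompletableFuture<String> handle(String pathVariable) {
        int id = Integer.parseInt(pathVariable);
        return findProduct(id)
                .thenApply(found -> found
                        .map(name -> "200 " + name)
                        .orElse("404"));
    }

    public static void main(String[] args) {
        System.out.println(handle("1").join());  // prints 200 Spring Guru Shirt
        System.out.println(handle("99").join()); // prints 404
    }
}
```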
Running the Application
The example web reactive application has two components. One is the reactive web server. The second is our client.
The Reactive Web Server
Now it is time to wire up all the components together for a web reactive application.
We will use embedded Tomcat as the server for the application, but will also look at how to do the same with the lightweight Reactor Netty.
We will implement these in a Server class.
Server.java
package guru.springframework.server;

import guru.springframework.handlers.ProductHandler;
import guru.springframework.handlers.ProductHandlerImpl;
import guru.springframework.repositories.ProductRepository;
import guru.springframework.repositories.ProductRepositoryInMemoryImpl;
import org.apache.catalina.Context;
import org.apache.catalina.LifecycleException;
import org.apache.catalina.startup.Tomcat;
import org.springframework.http.HttpMethod;
import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.http.server.reactive.ServletHttpHandlerAdapter;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;

import static org.springframework.http.MediaType.APPLICATION_JSON;
import static org.springframework.web.reactive.function.server.RequestPredicates.*;
import static org.springframework.web.reactive.function.server.RouterFunctions.*;

public class Server {

    public static void main(String[] args) throws Exception {
        Server server = new Server();
        server.startTomcatServer("localhost", 8080);
        System.out.println("Press ENTER to exit.");
        System.in.read();
    }

    public RouterFunction<ServerResponse> routingFunction() {
        ProductRepository repository = new ProductRepositoryInMemoryImpl();
        ProductHandler handler = new ProductHandlerImpl(repository);
        return nest(path("/product"),
                nest(accept(APPLICATION_JSON),
                        route(GET("/{id}"), handler::getProductFromRepository)
                                .andRoute(method(HttpMethod.GET), handler::getAllProductsFromRepository)
                ).andRoute(POST("/")
                        .and(contentType(APPLICATION_JSON)), handler::saveProductToRepository));
    }

    public void startTomcatServer(String host, int port) throws LifecycleException {
        RouterFunction<?> route = routingFunction();
        HttpHandler httpHandler = toHttpHandler(route);
        Tomcat tomcatServer = new Tomcat();
        tomcatServer.setHostname(host);
        tomcatServer.setPort(port);
        Context rootContext = tomcatServer.addContext("", System.getProperty("java.io.tmpdir"));
        ServletHttpHandlerAdapter servlet = new ServletHttpHandlerAdapter(httpHandler);
        Tomcat.addServlet(rootContext, "httpHandlerServlet", servlet);
        rootContext.addServletMapping("/", "httpHandlerServlet");
        tomcatServer.start();
    }
}
In this Server class:
- The routingFunction() method first creates a ProductHandler initialized with a ProductRepository.
- It then constructs and returns a RouterFunction. In Spring Web Reactive, you can relate a RouterFunction to the @RequestMapping annotation. A RouterFunction is used for routing incoming requests to handler functions. In the Server class, under the /product path, incoming GET requests to “/{id}” and “/” are routed to the getProductFromRepository and getAllProductsFromRepository handler functions respectively. Incoming POST requests to “/” are routed to the saveProductToRepository handler function.
- The first two statements of the startTomcatServer() method integrate the RouterFunction into Tomcat as a generic HttpHandler.
- The statements that follow initialize Tomcat with a hostname, port number, context path, and a servlet mapping.
- The last statement finally starts Tomcat by calling the start() method.
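To get a feel for what a routing function does, here is a toy router built from JDK functional interfaces. It is a conceptual sketch, not Spring's RouterFunction: the Request and Route classes, the regular expressions, and the handler names returned as strings are all made up for illustration. Each route pairs a predicate with a handler, and the first matching route handles the request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical illustration, not part of the article's application.
class RouterSketch {

    // Made-up minimal request: just an HTTP method and a path.
    static final class Request {
        final String method;
        final String path;

        Request(String method, String path) {
            this.method = method;
            this.path = path;
        }
    }

    // One route pairs a predicate with the handler to run when it matches.
    static final class Route {
        final Predicate<Request> predicate;
        final Function<Request, String> handler;

        Route(Predicate<Request> predicate, Function<Request, String> handler) {
            this.predicate = predicate;
            this.handler = handler;
        }
    }

    private final List<Route> routes = new ArrayList<>();

    RouterSketch route(Predicate<Request> predicate, Function<Request, String> handler) {
        routes.add(new Route(predicate, handler));
        return this;
    }

    // The first route whose predicate matches handles the request.
    Optional<String> dispatch(Request request) {
        return routes.stream()
                .filter(r -> r.predicate.test(request))
                .findFirst()
                .map(r -> r.handler.apply(request));
    }

    static Predicate<Request> method(String method) {
        return r -> r.method.equals(method);
    }

    static Predicate<Request> pathMatches(String regex) {
        return r -> r.path.matches(regex);
    }

    // Mirrors the shape of the article's routes; handlers just return their own name.
    static RouterSketch productRouter() {
        return new RouterSketch()
                .route(method("GET").and(pathMatches("/product/\\d+")), r -> "getProduct")
                .route(method("GET").and(pathMatches("/product/?")), r -> "getAllProducts")
                .route(method("POST").and(pathMatches("/product/?")), r -> "saveProduct");
    }

    static String resolve(String method, String path) {
        return productRouter().dispatch(new Request(method, path)).orElse("404");
    }

    public static void main(String[] args) {
        System.out.println(resolve("GET", "/product/1"));  // prints getProduct
        System.out.println(resolve("GET", "/product"));    // prints getAllProducts
        System.out.println(resolve("POST", "/product"));   // prints saveProduct
        System.out.println(resolve("DELETE", "/product")); // prints 404
    }
}
```

Because predicates are plain values, they compose with and(), which is the same idea behind combining request predicates in the functional routing style.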
The output on executing the Server class is this.
To use Netty instead of Tomcat, use this code:
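// Requires these additional imports in the Server class:
// org.springframework.http.server.reactive.ReactorHttpHandlerAdapter
// reactor.ipc.netty.http.server.HttpServer
public void startReactorServer(String host, int port) throws InterruptedException {
    RouterFunction<?> route = routingFunction();
    HttpHandler httpHandler = toHttpHandler(route);
    ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler);
    HttpServer server = HttpServer.create(host, port);
    server.newHandler(adapter).block();
}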
The Client
Spring Framework 5 adds a new reactive WebClient in addition to the existing RestTemplate. The new WebClient deserves a post of its own.
To keep this post simple and limited to only accessing our reactive web application, I will use ExchangeFunction, a simple alternative to WebClient. ExchangeFunction represents a function that exchanges a client request for a (delayed) client response.
The code of the client class, named ReactiveWebClient, is this.
ReactiveWebClient.java
package guru.springframework.client;

import guru.springframework.domain.Product;
import org.springframework.http.HttpMethod;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.ClientRequest;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.ExchangeFunction;
import org.springframework.web.reactive.function.client.ExchangeFunctions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.math.BigDecimal;
import java.net.URI;
import java.util.List;

public class ReactiveWebClient {

    public static final String HOST = "localhost";
    public static final int PORT = 8080;

    private ExchangeFunction exchange = ExchangeFunctions.create(new ReactorClientHttpConnector());

    public static void main(String[] args) throws Exception {
        ReactiveWebClient client = new ReactiveWebClient();
        client.createProduct();
        client.getAllProduct();
    }

    public void createProduct() {
        URI uri = URI.create(String.format("http://%s:%d/product", HOST, PORT));
        Product shirt = new Product(319,
                "Spring Guru Jeans",
                "Spring Framework Guru Denim Jean",
                "http://springframework.guru/wp-content/uploads/2015/04/spring_framework_guru_shirt-rf412049699c14ba5b68bb1c09182bfa2_8nax2_512.jpg",
                new BigDecimal("35.95"));
        ClientRequest request = ClientRequest.method(HttpMethod.POST, uri)
                .body(BodyInserters.fromObject(shirt)).build();
        Mono<ClientResponse> response = exchange.exchange(request);
        System.out.println(response.block().statusCode());
    }

    public void getAllProduct() {
        URI uri = URI.create(String.format("http://%s:%d/product", HOST, PORT));
        ClientRequest request = ClientRequest.method(HttpMethod.GET, uri).build();
        Flux<Product> productList = exchange.exchange(request)
                .flatMapMany(response -> response.bodyToFlux(Product.class));
        Mono<List<Product>> productListMono = productList.collectList();
        System.out.println(productListMono.block());
    }
}
In the ReactiveWebClient class, the exchange field is initialized by calling the ExchangeFunctions.create() method with a ReactorClientHttpConnector, which is an abstraction over HTTP clients to connect the client to the server. The create() method returns an ExchangeFunction.
In the createProduct() method of the ReactiveWebClient class, a ClientRequest is built that posts a Product object to a URL represented by the URI object. The exchange(request) method is then called to exchange the given request for a response Mono.
In the getAllProduct() method, an exchange is started to send a GET request to get all products.
The response body is converted into a Flux, collected into a list, and printed to the console.
With Tomcat running, the output on running the ReactiveWebClient class is:
Conclusion
In this post, I showed you a very simple example of the new web reactive features inside of Spring Framework 5.
The reactive programming features inside of Spring Framework 5 are certainly fun to use. What I’m finding even more fun is the functional programming style of the new Spring Framework 5 APIs.
Consider the configuration of the web reactive server:
public RouterFunction<ServerResponse> routingFunction() {
    ProductRepository repository = new ProductRepositoryInMemoryImpl();
    ProductHandler handler = new ProductHandlerImpl(repository);
    return nest(path("/product"),
            nest(accept(APPLICATION_JSON),
                    route(GET("/{id}"), handler::getProductFromRepository)
                            .andRoute(method(HttpMethod.GET), handler::getAllProductsFromRepository)
            ).andRoute(POST("/")
                    .and(contentType(APPLICATION_JSON)), handler::saveProductToRepository));
}
This functional style is a significant change from what we’ve become accustomed to in Spring MVC.
Don’t worry, Spring MVC is still alive and well. And even when using the Reactive features in Spring Framework 5, you can still define ‘controllers’ in the traditional declarative sense.
And maybe traditional monolithic applications will continue to declare controllers using traditional approaches?
Where I expect the functional style to really shine is in the realm of microservices. This new functional style makes it crazy easy to define small, targeted services.
I’m looking forward to seeing how the Spring community adopts the functional API and seeing how it evolves.
Simon
Hey Guru,
thanks for this article to get a bit deeper into the reactive topic.
I guess you have to check your ProductHandler interface. The method `getAllProductsFromRepository` should return a Flux and not a Mono. Of course the implementation is wrong as well.
To keep it simple, I guess it would be better to use the RequestMapping implementation instead of the RouterFunction. What would make it easier to understand for newbies. Same for the client, why not use the WebClient?
Best,
Simon
Sayali Shinde
Could you please share the source code ?
Carlos Romero
Could you please share the source code ? + 1
oterrien
I don’t understand why the saving method guarantees the id is unique. However I miss something within Mono, I think two threads could be able to call the same piece of code in the same time. So, it is possible, a thread pushes new entry just after the other thread computed its id. But, the size of the map should have been changed, right?
Sourabh
Hi I am getting some parent dependencies error as Could not resolve artifact io.projectreactor:reactor-bom:pom:Bismuth-M1. i also change it to latest version but still the same error.