Using Spring Integration Futures

This past week I had a real-world use case for Spring Integration Futures. I was looking into optimizing the checkout experience for a large e-commerce website. Consider what happens in a large-scale website when an order is submitted. Typically, you will see a process something like:

  • Validate the place order form information
  • Verify the address against an address service
  • Verify the credit card with a payment processor
  • Verify available inventory

In a large-scale enterprise, each of these steps is typically a service. For example, it is common for an enterprise to subscribe to an address validation service, which helps ensure shipping accuracy. Or you’ll have an inventory service to help manage inventory levels. Normally this is not a problem. But when you have a Black Friday shopping special and thousands of users shopping on the site, managing inventory can become tricky.

Each of these services will take some time to execute. If called in sequence, the website response time slows down and impacts the user experience in the checkout flow. However, in my example here, there is no reason these services couldn’t be called concurrently. By calling the services concurrently, the response time becomes roughly that of the longest service call, not the sum of all the service calls. If each of the four service calls takes half a second, calling them sequentially would take 2 seconds of elapsed time. Calling them concurrently takes only about half a second.
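To make the arithmetic concrete, here is a minimal plain-Java sketch that does not use Spring Integration at all. The class name CheckoutTimingSketch and its methods are made up for illustration, and the "service call" is just a sleep. It compares running four half-second calls one after another with submitting them all to a thread pool first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CheckoutTimingSketch {

    /** Stand-in for a remote service call: it only sleeps for the given time. */
    public static void simulatedServiceCall(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted during simulated call", e);
        }
    }

    /** Runs the calls one after another and returns the elapsed milliseconds. */
    public static long runSequentially(int calls, long millisPerCall) {
        long start = System.nanoTime();
        for (int i = 0; i < calls; i++) {
            simulatedServiceCall(millisPerCall);
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    /** Submits every call to a pool first, then waits for all; returns elapsed milliseconds. */
    public static long runConcurrently(int calls, long millisPerCall) {
        ExecutorService pool = Executors.newFixedThreadPool(calls);
        try {
            long start = System.nanoTime();
            List<Future<?>> futures = new ArrayList<>();
            for (int i = 0; i < calls; i++) {
                futures.add(pool.submit(() -> simulatedServiceCall(millisPerCall)));
            }
            for (Future<?> future : futures) {
                future.get(); // blocks until this particular call has finished
            }
            return (System.nanoTime() - start) / 1_000_000;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A simulated call failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Sequential: " + runSequentially(4, 500) + " ms");
        System.out.println("Concurrent: " + runConcurrently(4, 500) + " ms");
    }
}
```

On a typical machine the sequential figure should land near 2000 ms and the concurrent one near 500 ms, though the exact numbers will vary from run to run.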

A couple of weeks back I wrote a blog post on testing Spring Integration Gateways. For this use case, Spring Integration is the perfect tool. Spring Integration has a very cool feature to support asynchronous calls. When using Spring Integration Messaging Gateways, if you wrap the return type in a Java Future, Spring Integration will automatically handle the request in a different thread using a task executor. As an application developer, this makes writing a multithreaded application very easy to do. The Spring Framework and Spring Integration handle the complexity of managing the worker threads.

In this post, I’m going to walk you through the configuration of the Spring Integration gateways and Spring service beans used to make these four service calls supporting my e-commerce place order example.

Spring Integration Code Examples

Command Objects

In this example, I’m going to use a command object. In a web application, this would typically have values bound to it from a form post. However, in today’s omni-channel retail environment, the website is not the only client I need to worry about. This request could be coming from a native iOS mobile application, a native Android mobile application, an in-store kiosk, or maybe a standalone customer service application.

By using a command object, I’m decoupling my processing from the front-end client. I don’t care where the request originated. Maybe it was a form post. Maybe it was a web service call, or it could be a JMS request. It doesn’t really matter.

Place Order Command

package guru.springframework.si.model.commands;

import guru.springframework.si.model.Order;

public class PlaceOrderCommand {
    private Order order;
}

Domain Objects

For this example, I’ve created a couple of domain objects. These are just simple POJOs I’m using to illustrate the example. In a real application these objects would be much more robust.

Order

package guru.springframework.si.model;

import java.util.List;

public class Order {
    private Integer orderId;
    private String firstName;
    private String lastName;
    private List<OrderLine> orderLines;

    public Integer getOrderId() {
        return orderId;
    }

    public void setOrderId(Integer orderId) {
        this.orderId = orderId;
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    public List<OrderLine> getOrderLines() {
        return orderLines;
    }

    public void setOrderLines(List<OrderLine> orderLines) {
        this.orderLines = orderLines;
    }
}

OrderLine

package guru.springframework.si.model;

public class OrderLine {
    private Integer lineNumber;
    private Product product;
    private Integer orderQty;

    public Integer getLineNumber() {
        return lineNumber;
    }

    public void setLineNumber(Integer lineNumber) {
        this.lineNumber = lineNumber;
    }

    public Product getProduct() {
        return product;
    }

    public void setProduct(Product product) {
        this.product = product;
    }

    public Integer getOrderQty() {
        return orderQty;
    }

    public void setOrderQty(Integer orderQty) {
        this.orderQty = orderQty;
    }
}

Product

package guru.springframework.si.model;

public class Product {
    private String productId;
    private String description;

    public String getProductId() {
        return productId;
    }

    public void setProductId(String productId) {
        this.productId = productId;
    }

    public String getDescription() {
        return description;
    }

    public void setDescription(String description) {
        this.description = description;
    }
}

 

Spring Integration Gateways

For this example, I’ve defined four different Spring Integration Messaging Gateways. Technically, I could have used just one Spring Integration Messaging Gateway, but that would have been a violation of the Single Responsibility Principle. This approach does lead to more class files. But when I need to maintain this code, I know where to look. The programming logic is clear and organized.

OrderGateway

The Order Gateway interface defines two methods. The first, placeOrder, is the start of our processing chain. This is where we will submit the command object. The second method, validateOrder, is used in the processing of the place order command object.

Note: notice the usage of the Java Future for the return type of the validateOrder method. This is what instructs Spring Integration to perform the method call asynchronously on a separate thread.

package guru.springframework.si.gateways;

import guru.springframework.si.model.commands.PlaceOrderCommand;
import org.springframework.integration.annotation.Gateway;
import org.springframework.validation.Errors;

import java.util.concurrent.Future;

public interface OrderGateway {

    @Gateway(requestChannel = "placeOrderChannel")
    PlaceOrderCommand placeOrder(PlaceOrderCommand command);

    @Gateway(requestChannel = "validateOrderChannel")
    Future<Errors> validateOrder(PlaceOrderCommand command);
}
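
To give a feel for how a return type alone can switch a call to asynchronous execution, here is a small plain-Java sketch built on a JDK dynamic proxy. This is not Spring Integration’s actual implementation. The GreetingGateway interface, the createGateway method and everything else in it are invented for illustration. The proxy checks the declared return type of the method being called: a method returning Future is submitted to an executor, and any other method runs on the caller’s thread.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureProxySketch {

    /** Hypothetical gateway-style interface, invented for this sketch. */
    public interface GreetingGateway {
        String greet(String name);               // runs on the caller's thread
        Future<String> greetAsync(String name);  // runs on an executor thread
    }

    /** The "service" behind the gateway: reports which thread did the work. */
    static String describe(String name) {
        return "Hello " + name + " from thread " + Thread.currentThread().getId();
    }

    /** Creates a proxy that dispatches on the declared return type of the called method. */
    public static GreetingGateway createGateway(ExecutorService executor) {
        // Note: Object methods such as toString() are not handled in this sketch.
        InvocationHandler handler = (proxy, method, args) -> {
            String name = (String) args[0];
            if (Future.class.isAssignableFrom(method.getReturnType())) {
                Callable<String> task = () -> describe(name);
                return executor.submit(task); // returns immediately with a Future
            }
            return describe(name);
        };
        return (GreetingGateway) Proxy.newProxyInstance(
                GreetingGateway.class.getClassLoader(),
                new Class<?>[] {GreetingGateway.class},
                handler);
    }

    /** Calls both methods once and returns {sync result, async result}. */
    public static String[] runDemo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            GreetingGateway gateway = createGateway(executor);
            String sync = gateway.greet("sync");
            String async = gateway.greetAsync("async").get(); // blocks until the worker finishes
            return new String[] {sync, async};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        for (String line : runDemo()) {
            System.out.println(line);
        }
    }
}
```

The two printed lines should report different thread ids, which is the same effect the gateway methods in this post rely on.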

 

AddressGateway

package guru.springframework.si.gateways;

import guru.springframework.si.model.commands.PlaceOrderCommand;
import org.springframework.integration.annotation.Gateway;
import org.springframework.validation.Errors;

import java.util.concurrent.Future;

public interface AddressGateway {

    @Gateway(requestChannel = "verifyAddressChannel")
    Future<Errors> verifyAddress(PlaceOrderCommand command);
}

 

InventoryGateway

package guru.springframework.si.gateways;

import guru.springframework.si.model.commands.PlaceOrderCommand;
import org.springframework.integration.annotation.Gateway;
import org.springframework.validation.Errors;

import java.util.concurrent.Future;

public interface InventoryGateway {
    @Gateway(requestChannel = "verifyOrderInventoryChannel")
    Future<Errors> verifyOrderInventory(PlaceOrderCommand command);
}

PaymentGateway

package guru.springframework.si.gateways;

import guru.springframework.si.model.commands.PlaceOrderCommand;
import org.springframework.integration.annotation.Gateway;
import org.springframework.validation.Errors;

import java.util.concurrent.Future;

public interface PaymentGateway {
    @Gateway(requestChannel = "verifyCreditCardChannel")
    Future<Errors> verifyCreditCard(PlaceOrderCommand command);
}

Spring Services

Since this is a Spring project, we’ll create our services as Spring Beans, and naturally we will be using Dependency Injection and programming to an interface.

OrderService

package guru.springframework.si.services;

import guru.springframework.si.model.commands.PlaceOrderCommand;
import org.springframework.validation.Errors;

public interface OrderService {
    PlaceOrderCommand placeOrder(PlaceOrderCommand command);

    Errors validateOrder(PlaceOrderCommand command);
}

OrderServiceImpl

The implementation of our Order Service is one of the more complex classes in this tutorial. You can see we are having Spring autowire our four Spring Integration Messaging Gateways into the class. In the placeOrder method, you can see I call a method on each of the four Spring Integration Gateways. Each method returns a Java Future. After all four are submitted, I go back to get the value of each Future. In this case, the value is a Spring Errors object. If all four validation steps come back without errors, in a real system I’d persist the order to the database and do any post processing. But this is just a little sample to show off the use of Spring Integration Futures. So in this case, I’m just returning the command object either way.

package guru.springframework.si.services;

import guru.springframework.si.gateways.AddressGateway;
import guru.springframework.si.gateways.InventoryGateway;
import guru.springframework.si.gateways.OrderGateway;
import guru.springframework.si.gateways.PaymentGateway;
import guru.springframework.si.model.commands.PlaceOrderCommand;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.validation.BeanPropertyBindingResult;
import org.springframework.validation.Errors;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

@Service("orderService")
public class OrderServiceImpl implements OrderService {

    private AddressGateway addressGateway;
    private InventoryGateway inventoryGateway;
    private OrderGateway orderGateway;
    private PaymentGateway paymentGateway;

    @Autowired
    public void setAddressGateway(AddressGateway addressGateway) {
        this.addressGateway = addressGateway;
    }

    @Autowired
    public void setInventoryGateway(InventoryGateway inventoryGateway) {
        this.inventoryGateway = inventoryGateway;
    }

    @Autowired
    public void setOrderGateway(OrderGateway orderGateway) {
        this.orderGateway = orderGateway;
    }

    @Autowired
    public void setPaymentGateway(PaymentGateway paymentGateway) {
        this.paymentGateway = paymentGateway;
    }

    @Override
    public PlaceOrderCommand placeOrder(PlaceOrderCommand command) {
        // Submit all four validations first; each gateway call returns immediately with a Future
        Future<Errors> validateOrderErrorsFuture = orderGateway.validateOrder(command);
        Future<Errors> validateAddressErrorsFuture = addressGateway.verifyAddress(command);
        Future<Errors> validateInventoryErrorsFuture = inventoryGateway.verifyOrderInventory(command);
        Future<Errors> verifyCreditCardErrorsFuture = paymentGateway.verifyCreditCard(command);

        try {
            // Block for each result only after all four calls are already in flight
            Errors validateOrderErrors = validateOrderErrorsFuture.get();
            Errors validateAddressErrors = validateAddressErrorsFuture.get();
            Errors validateInventoryErrors = validateInventoryErrorsFuture.get();
            Errors verifyCreditCardErrors = verifyCreditCardErrorsFuture.get();

            if (!validateOrderErrors.hasErrors() &&
                    !validateAddressErrors.hasErrors() &&
                    !validateInventoryErrors.hasErrors() &&
                    !verifyCreditCardErrors.hasErrors()) {
                // All validations passed; a real system would persist the order here
                return command;
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can see the thread was interrupted
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        // This sample returns the command whether or not validation succeeded
        return command;
    }

    @Override
    public Errors validateOrder(PlaceOrderCommand command) {
        System.out.println(Thread.currentThread().getId() + " : Validating Order.....");
        try {
            Thread.sleep(500);
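        } catch (InterruptedException e) {
            // Restore the interrupt flag rather than swallowing the interruption
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }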
        System.out.println(Thread.currentThread().getId() + " : Validating Order - DONE");
        Errors errors = new BeanPropertyBindingResult(command, "Place Order Command");
        return errors;
    }
}
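
One thing the placeOrder method above does not do is bound how long it waits: Future.get() with no arguments blocks until the result arrives. Below is a minimal plain-Java sketch of a bounded wait using the standard Future.get(timeout, unit) overload. The BoundedWaitSketch class and its getOrFallback helper are hypothetical names, not part of the original project. In the checkout example the fallback could be an Errors object that records a timeout.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedWaitSketch {

    /**
     * Waits up to timeoutMillis for the future. On timeout, the future is
     * cancelled and the fallback value is returned instead.
     */
    public static <T> T getOrFallback(Future<T> future, long timeoutMillis, T fallback) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // ask the worker to stop; the result is no longer needed
            return fallback;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return fallback;
        } catch (ExecutionException e) {
            // The task itself threw; surface the underlying cause
            throw new IllegalStateException("Validation call failed", e.getCause());
        }
    }
}
```

Whether a timed-out validation should count as a failure or be retried is a business decision; the sketch only shows the mechanics of not waiting forever.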

 

Spring Integration Configuration

I had to expand on the Spring Integration configuration from our previous example. You’ll see I’m using the Spring Integration Gateway tag to define the four Spring Integration Gateways we’re using. Then I defined the Spring Integration channels and the appropriate Spring Integration service activators. Nothing new here over the previous example. Just a little more routing to take care of.

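Notice how I did not define a thread pool? By default, Spring Integration supplies an executor for gateway methods that return a Future (the SimpleAsyncTaskExecutor you can see in the log output further down). That default starts a new thread for each call rather than reusing pooled threads, so under real load you will probably want to define your own pooled executor and point the gateways at it.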
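
As a rough sketch of what a custom executor could look like, the fragment below declares a pooled executor and points one gateway at it through the gateway element’s async-executor attribute. The bean id gatewayExecutor and the pool sizes are assumptions made for illustration; they are not taken from the original project.

```xml
<!-- Assumes xmlns:task="http://www.springframework.org/schema/task" is declared on the
     root element, with its schema location added to xsi:schemaLocation. -->

<!-- Hypothetical pool sizing; tune for your own load. -->
<task:executor id="gatewayExecutor" pool-size="8" queue-capacity="50"/>

<!-- async-executor names the executor used for this gateway's Future-returning methods. -->
<gateway service-interface="guru.springframework.si.gateways.AddressGateway"
         async-executor="gatewayExecutor"/>
```

The same attribute would need to be added to each gateway that should share the pool.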

si-config.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xmlns:beans="http://www.springframework.org/schema/beans"
             xmlns:context="http://www.springframework.org/schema/context"
             xsi:schemaLocation="http://www.springframework.org/schema/beans
                  http://www.springframework.org/schema/beans/spring-beans.xsd
                  http://www.springframework.org/schema/integration
                  http://www.springframework.org/schema/integration/spring-integration.xsd
                  http://www.springframework.org/schema/context
                  http://www.springframework.org/schema/context/spring-context.xsd">

    <beans:import resource="classpath*:/spring/si-product-gateway.xml"/>

    <context:component-scan base-package="guru.springframework.si.services"/>

    <service-activator ref="productServiceImpl" method="getProduct" input-channel="getProductChannel"/>

    <gateway service-interface="guru.springframework.si.gateways.AddressGateway"/>
    <gateway service-interface="guru.springframework.si.gateways.InventoryGateway"/>
    <gateway service-interface="guru.springframework.si.gateways.OrderGateway"/>
    <gateway service-interface="guru.springframework.si.gateways.PaymentGateway"/>

    <channel id="verifyAddressChannel"/>
    <service-activator input-channel="verifyAddressChannel" ref="addressService" method="verifyAddress"/>

    <channel id="verifyOrderInventoryChannel"/>
    <service-activator input-channel="verifyOrderInventoryChannel" ref="inventoryService" method="verifyInventory"/>

    <channel id="verifyCreditCardChannel"/>
    <service-activator input-channel="verifyCreditCardChannel" ref="paymentService" method="verifyPayment"/>

    <channel id="validateOrderChannel"/>
    <service-activator input-channel="validateOrderChannel" ref="orderService" method="validateOrder"/>

    <channel id="placeOrderChannel"/>
    <service-activator input-channel="placeOrderChannel" ref="orderService" method="placeOrder"/>


</beans:beans>

 

Running the Spring Integration Code

I’ve set up a Spock test to run the Spring Integration code in this example.

OrderGatewayTests

package guru.springframework.si.gateway

import guru.springframework.si.gateways.OrderGateway
import guru.springframework.si.model.commands.PlaceOrderCommand
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.test.context.ContextConfiguration
import spock.lang.Specification

@ContextConfiguration(locations = "classpath*:/spring/si-config.xml")
class OrderGatewayTests extends Specification {

    @Autowired
    OrderGateway orderGateway

    def "Test Place Order"() {
        given:
        PlaceOrderCommand command = new PlaceOrderCommand()

        when:
        PlaceOrderCommand result = orderGateway.placeOrder(command)

        then:
        result

    }
}

 

When you run this Spock test you will see the following output:

 24 : Validating Order.....
09:47:15.749 [SimpleAsyncTaskExecutor-3] DEBUG o.s.i.h.ServiceActivatingHandler - ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@1e2b1b7a] received message: GenericMessage [payload=guru.springframework.si.model.commands.PlaceOrderCommand@69653e16, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@10ecf6cf, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@10ecf6cf, id=cb796604-fa7a-cbb7-27f2-606919e10f31, timestamp=1432475235749}]
09:47:15.750 [SimpleAsyncTaskExecutor-3] DEBUG o.s.i.h.ServiceActivatingHandler - ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@a9e03fb] received message: GenericMessage [payload=guru.springframework.si.model.commands.PlaceOrderCommand@69653e16, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@459ecd8, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@459ecd8, id=e3a353ef-5d8b-99fb-f525-f76ece0a2179, timestamp=1432475235750}]
26 : In Inventory Service
27 : In Payment Service
25 : In Address Service
25 : In Address Service. 
26 : In Inventory Service. 
27 : In Payment Service. 
26 : In Inventory Service. . 
25 : In Address Service. . 
26 : In Inventory Service. . . 
27 : In Payment Service. . 
26 : In Inventory Service. . . . 
25 : In Address Service. . . 
26 : In Inventory Service. . . . . 
27 : In Payment Service. . . 
26 : In Inventory Service. . . . . . 
25 : In Address Service. . . . 
25 : In Address Service. . . . . 
25 : In Address Service. . . . . . 
26 : In Inventory Service. . . . . . . 
27 : In Payment Service. . . . 
26 : In Inventory Service. . . . . . . . 
25 : In Address Service. . . . . . . 
26 : In Inventory Service. . . . . . . . . 
27 : In Payment Service. . . . . 
25 : In Address Service. . . . . . . . 
27 : In Payment Service. . . . . . 
25 : In Address Service. . . . . . . . . 
27 : In Payment Service. . . . . . . 
27 : In Payment Service. . . . . . . . 
27 : In Payment Service. . . . . . . . . 
09:47:15.804 [SimpleAsyncTaskExecutor-3] DEBUG o.s.i.channel.DirectChannel - postSend (sent=true) on channel 'verifyAddressChannel', message: GenericMessage [payload=guru.springframework.si.model.commands.PlaceOrderCommand@69653e16, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@20b9fb81, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@20b9fb81, id=c6f0c5f5-6629-7f08-2d74-889ac16cfddf, timestamp=1432475235749}]
09:47:15.804 [SimpleAsyncTaskExecutor-3] DEBUG o.s.i.channel.DirectChannel - postSend (sent=true) on channel 'verifyOrderInventoryChannel', message: GenericMessage [payload=guru.springframework.si.model.commands.PlaceOrderCommand@69653e16, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@10ecf6cf, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@10ecf6cf, id=cb796604-fa7a-cbb7-27f2-606919e10f31, timestamp=1432475235749}]
09:47:15.804 [SimpleAsyncTaskExecutor-3] DEBUG o.s.i.channel.DirectChannel - postSend (sent=true) on channel 'verifyCreditCardChannel', message: GenericMessage [payload=guru.springframework.si.model.commands.PlaceOrderCommand@69653e16, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@459ecd8, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@459ecd8, id=e3a353ef-5d8b-99fb-f525-f76ece0a2179, timestamp=1432475235750}]
24 : Validating Order - DONE
09:47:16.256 [SimpleAsyncTaskExecutor-3] DEBUG o.s.i.channel.DirectChannel - postSend (sent=true) on channel 'validateOrderChannel', message: GenericMessage [payload=guru.springframework.si.model.commands.PlaceOrderCommand@69653e16, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e7527d3, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e7527d3, id=e17abf09-b8dd-e050-71e4-69a251c54d60, timestamp=1432475235749}]

Each of the services I’ve wired into this example prints simple statements to the console. Each starts its output with the ID number of the thread it is running in. You can see from the output how the services run concurrently in different threads.

Conclusion

Spring Integration is a very powerful tool for a programmer to have in their toolbelt. You can see from this example how I’ve created a multithreaded application with a minimal amount of coding. I simply wrapped the return type I wanted on the Spring Integration Messaging Gateways with a Java Future. Spring Integration and Spring managed the worker threads for me. I didn’t need to worry about managing threads. The Spring Framework allowed me to focus on delivering the business solution, and took care of the complex boilerplate code.

Get The Code

I’ve committed the source code for this post to GitHub. It is a Maven project which you can download and build. If you wish to learn more about the Spring Framework, I have a free introduction to Spring tutorial.

Source Code

The source code for this post is available on GitHub. You can download it here.

    2 comments on “Using Spring Integration Futures”

    1. December 28, 2015 at 9:55 am

      Nice article, thanks for sharing.

    2. April 30, 2020 at 12:24 pm

      Nice article. I am new to Spring, so I am learning all sorts of programming patterns, and I wonder if there is something newer in Spring 5 that replaces this old-style Spring Integration. Is WebFlux a newer and better way to do the same thing that you described in this article?

      Thanks
