Concurrency – Executors and Spring Integration

Thread Pool/Executors Based Implementation

A better approach than the raw thread version is a thread pool based one, where the pool size is chosen for the system the task runs on: number of CPUs / (1 - blocking coefficient of the task). Venkat Subramaniam's book has more details.
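
As a rough illustration of the sizing formula, here is a minimal sketch. The PoolSizing class and its poolSize method are hypothetical names that are not part of the sample project, and the blocking coefficient of 0.9 used in main is an assumed value; in practice it has to be estimated or measured for the task at hand.

```java
// Sketch only: PoolSizing is a hypothetical helper, not part of the sample project.
final class PoolSizing {

    private PoolSizing() {
    }

    // Pool size = number of cores / (1 - blocking coefficient).
    // The blocking coefficient is the fraction of time a task spends blocked:
    // 0 for a purely CPU-bound task, approaching 1 for a mostly I/O-bound one.
    static int poolSize(int availableCores, double blockingCoefficient) {
        if (availableCores < 1) {
            throw new IllegalArgumentException("availableCores must be at least 1");
        }
        if (blockingCoefficient < 0.0 || blockingCoefficient >= 1.0) {
            throw new IllegalArgumentException("blockingCoefficient must be in [0, 1)");
        }
        // Round to the nearest whole thread to absorb floating point noise.
        return (int) Math.round(availableCores / (1.0 - blockingCoefficient));
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        // 0.9 is an assumed value for illustration only.
        System.out.println("Suggested pool size: " + poolSize(cores, 0.9));
    }
}
```

With a coefficient of 0 the pool is as large as the number of cores, and the suggested size grows quickly as tasks spend more of their time blocked.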

First I defined a custom task to generate the report part, given the report part request. It is implemented as a Callable:

public class ReportPartRequestCallable implements Callable<ReportPart> {
    private final ReportRequestPart reportRequestPart;
    private final ReportPartGenerator reportPartGenerator;

    public ReportPartRequestCallable(ReportRequestPart reportRequestPart, ReportPartGenerator reportPartGenerator) {
        this.reportRequestPart = reportRequestPart;
        this.reportPartGenerator = reportPartGenerator;
    }

    @Override
    public ReportPart call() {
        return this.reportPartGenerator.generateReportPart(reportRequestPart);
    }
}

public class ExecutorsBasedReportGenerator implements ReportGenerator {
    private static final Logger logger = LoggerFactory.getLogger(ExecutorsBasedReportGenerator.class);

    private ReportPartGenerator reportPartGenerator;

    private ExecutorService executors = Executors.newFixedThreadPool(10);

    @Override
    public Report generateReport(ReportRequest reportRequest) {
        List<Callable<ReportPart>> tasks = new ArrayList<Callable<ReportPart>>();
        List<ReportRequestPart> reportRequestParts = reportRequest.getRequestParts();
        for (ReportRequestPart reportRequestPart : reportRequestParts) {
            tasks.add(new ReportPartRequestCallable(reportRequestPart, reportPartGenerator));
        }

        List<Future<ReportPart>> responseForReportPartList;
        List<ReportPart> reportParts = new ArrayList<ReportPart>();
        try {
            responseForReportPartList = executors.invokeAll(tasks);
            for (Future<ReportPart> reportPartFuture : responseForReportPartList) {
                reportParts.add(reportPartFuture.get());
            }

        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            throw new RuntimeException(e);
        }
        return new Report(reportParts);
    }

 ......
}

Here a thread pool of size 10 is created using the Executors.newFixedThreadPool(10) call. A Callable task is generated for each of the report request parts and handed over to the thread pool through the ExecutorService abstraction:

responseForReportPartList = executors.invokeAll(tasks);

This call returns a List of Futures. Each Future supports a get() method, which blocks until the response is available. Because invokeAll itself returns only after all the tasks have completed, the get() calls in the loop return without further waiting.

This is clearly a much better implementation than the raw thread version: the number of threads stays constrained to a manageable number under load.
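
The invokeAll pattern can be tried in isolation with the minimal sketch below. It is not the code from the sample project: InvokeAllSketch is a hypothetical class, plain Strings stand in for the report types, and the pool size of 3 is arbitrary. It also shows two details that the listing above glosses over, namely handling InterruptedException and ExecutionException separately and shutting the pool down when done.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Sketch only: Strings stand in for ReportRequestPart and ReportPart.
final class InvokeAllSketch {

    private InvokeAllSketch() {
    }

    static List<String> generateParts(List<String> sections) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (String section : sections) {
                tasks.add(() -> "part for " + section);
            }
            // invokeAll waits for all tasks and returns the futures
            // in the same order as the submitted tasks.
            List<Future<String>> futures = pool.invokeAll(tasks);
            List<String> parts = new ArrayList<>();
            for (Future<String> future : futures) {
                parts.add(future.get());
            }
            return parts;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can react to it.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while generating parts", e);
        } catch (ExecutionException e) {
            // The cause is the exception thrown inside the task.
            throw new IllegalStateException("A part failed to generate", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(generateParts(Arrays.asList("header", "body", "footer")));
    }
}
```

A long-lived generator would normally keep one pool for its whole lifetime instead of creating one per call; the sketch creates and shuts down the pool inside the method only to stay self-contained.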

Spring Integration Based Implementation

The approach that I personally like the most uses Spring Integration. The reason is that with Spring Integration I focus on the components doing the different tasks and leave it up to Spring Integration to wire the flow together, using an XML based or annotation based configuration. Here I will be using an XML based configuration.

The components in my case are:
1. The component to generate the report part, given the report part request, which I had shown earlier.
2. A component to split the report request to report request parts:

public class DefaultReportRequestSplitter implements ReportRequestSplitter {
    @Override
    public List<ReportRequestPart> split(ReportRequest reportRequest) {
        return reportRequest.getRequestParts();
    }
}

3. A component to assemble/aggregate the report parts into a whole report:

public class DefaultReportAggregator implements ReportAggregator {

    @Override
    public Report aggregate(List<ReportPart> reportParts) {
        return new Report(reportParts);
    }

}

And that is all the Java code that is required with Spring Integration; the rest is wiring. Here I have used a Spring Integration configuration file:

<?xml version='1.0' encoding='UTF-8'?>
<beans ....

    <int:channel id='report.partsChannel'/>
    <int:channel id='report.reportChannel'/>
    <int:channel id='report.partReportChannel'>
        <int:queue capacity='50'/>
    </int:channel>  
    <int:channel id='report.joinPartsChannel'/>


    <int:splitter id='splitter' ref='reportsPartSplitter' method='split'
            input-channel='report.partsChannel' output-channel='report.partReportChannel'/>

    <task:executor id='reportPartGeneratorExecutor' pool-size='10' queue-capacity='50'/>

    <int:service-activator id='reportsPartServiceActivator' ref='reportPartReportGenerator' method='generateReportPart'
            input-channel='report.partReportChannel' output-channel='report.joinPartsChannel'>
        <int:poller task-executor='reportPartGeneratorExecutor' fixed-delay='500'>
        </int:poller>
    </int:service-activator>

    <int:aggregator ref='reportAggregator' method='aggregate'
            input-channel='report.joinPartsChannel' output-channel='report.reportChannel'></int:aggregator>

    <int:gateway id='reportGeneratorGateway' service-interface='org.bk.sisample.springintegration.ReportGeneratorGateway'
            default-request-channel='report.partsChannel' default-reply-channel='report.reportChannel'/>
    
    <bean name='reportsPartSplitter' class='org.bk.sisample.springintegration.processors.DefaultReportRequestSplitter'></bean>
    <bean name='reportPartReportGenerator' class='org.bk.sisample.processors.DummyReportPartGenerator'/>
    <bean name='reportAggregator' class='org.bk.sisample.springintegration.processors.DefaultReportAggregator'/>
    <bean name='reportGenerator' class='org.bk.sisample.springintegration.SpringIntegrationBasedReportGenerator'/>

</beans>

SpringSource Tool Suite provides a great way of visualizing this file, and the resulting diagram matches my original view of the user flow.

In the Spring Integration version of the code, I have defined the different components to handle the different parts of the flow:
1. A splitter to convert a report request to report request parts:

<int:splitter id='splitter' ref='reportsPartSplitter' method='split' 
        input-channel='report.partsChannel' output-channel='report.partReportChannel'/>

2. A service activator component to generate a report part from a report part request:

<int:service-activator id='reportsPartServiceActivator'  ref='reportPartReportGenerator' method='generateReportPart' 
            input-channel='report.partReportChannel' output-channel='report.joinPartsChannel'>
    <int:poller task-executor='reportPartGeneratorExecutor' fixed-delay='500'>
    </int:poller>
 </int:service-activator>

3. An aggregator to join the report parts back into a report. It is intelligent enough to correlate the original split report requests appropriately without any explicit coding:

<int:aggregator ref='reportAggregator' method='aggregate' 
            input-channel='report.joinPartsChannel' output-channel='report.reportChannel' ></int:aggregator> 
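
The correlation works because the splitter stamps each message it emits with a correlation id and a sequence size, and the aggregator by default groups messages by that id and releases a group once it holds as many messages as the sequence size. The plain Java sketch below only illustrates that idea. CorrelationSketch and Part are hypothetical classes, not Spring Integration types, and the real implementation is considerably more involved.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustration only: these classes are hypothetical, not Spring Integration types.
final class CorrelationSketch {

    // A split message: the payload plus the bookkeeping a splitter attaches.
    static final class Part {
        final String correlationId; // identifies the original request
        final int sequenceSize;     // number of parts the request was split into
        final String payload;

        Part(String correlationId, int sequenceSize, String payload) {
            this.correlationId = correlationId;
            this.sequenceSize = sequenceSize;
            this.payload = payload;
        }
    }

    // Parts received so far, grouped by the request they belong to.
    private final Map<String, List<String>> groups = new HashMap<>();

    // Returns the completed group once all of its parts have arrived,
    // otherwise null. Synchronized because parts arrive from many threads.
    synchronized List<String> accept(Part part) {
        List<String> group = groups.computeIfAbsent(part.correlationId, id -> new ArrayList<>());
        group.add(part.payload);
        if (group.size() == part.sequenceSize) {
            groups.remove(part.correlationId);
            return group;
        }
        return null;
    }

    public static void main(String[] args) {
        CorrelationSketch aggregator = new CorrelationSketch();
        System.out.println(aggregator.accept(new Part("request-1", 2, "summary")));
        System.out.println(aggregator.accept(new Part("request-1", 2, "details")));
    }
}
```

Parts of different requests can arrive interleaved and still end up in the right report, which is what makes it safe to process them on a shared thread pool.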

What is interesting in this code is that, as in the executors based sample, the number of threads servicing each of these components is completely configurable through the XML file: appropriate channels connect the different components together, and task executors take the thread pool size as an attribute.

In this code, I have defined a queue channel where the report request parts come in:

<int:channel id='report.partReportChannel'>
        <int:queue capacity='50'/>
    </int:channel>  

It is serviced by the service activator component, using a task executor with a thread pool of size 10 and a queue capacity of 50:

<task:executor id='reportPartGeneratorExecutor' pool-size='10' queue-capacity='50' />
    
 <int:service-activator id='reportsPartServiceActivator'  ref='reportPartReportGenerator' method='generateReportPart' 
            input-channel='report.partReportChannel' output-channel='report.joinPartsChannel'>
    <int:poller task-executor='reportPartGeneratorExecutor' fixed-delay='500'>
    </int:poller>
 </int:service-activator>

All this through configuration!
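
To make the queue-plus-pool arrangement more concrete, here is a rough plain Java analogue: a bounded queue of capacity 50 drained by 10 worker threads. It is a sketch under simplifying assumptions, not a description of how Spring Integration is implemented. The QueueChannelSketch class is hypothetical, Strings stand in for the report types, and each worker polls the queue directly, whereas the configured poller checks the channel every 500 milliseconds and hands the work to the task executor.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch only: a plain Java analogue of a queue channel drained by a thread pool.
final class QueueChannelSketch {

    private QueueChannelSketch() {
    }

    static Queue<String> process(List<String> requestParts) {
        BlockingQueue<String> channel = new ArrayBlockingQueue<>(50); // like capacity='50'
        Queue<String> results = new ConcurrentLinkedQueue<>();
        CountDownLatch remaining = new CountDownLatch(requestParts.size());
        ExecutorService workers = Executors.newFixedThreadPool(10);   // like pool-size='10'
        try {
            for (int i = 0; i < 10; i++) {
                workers.execute(() -> {
                    try {
                        // Keep draining until every request part has been handled.
                        while (remaining.getCount() > 0) {
                            String part = channel.poll(100, TimeUnit.MILLISECONDS);
                            if (part != null) {
                                results.add("report for " + part);
                                remaining.countDown();
                            }
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            for (String part : requestParts) {
                channel.put(part); // blocks while the queue is full
            }
            remaining.await();
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while processing parts", e);
        } finally {
            workers.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(process(Arrays.asList("sales", "inventory", "returns")));
    }
}
```

The bounded queue gives the producer back pressure: once 50 request parts are waiting, put blocks until a worker frees a slot. The results come back in completion order, not in submission order.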

The entire codebase for this sample is available on GitHub: https://github.com/bijukunjummen/si-sample

Reference: Concurrency – Executors and Spring Integration from our JCG partner Biju Kunjummen at the all and sundry blog.

Java Code Geeks and all content copyright © 2010-2014, Exelixis Media Ltd | Terms of Use | Privacy Policy
All trademarks and registered trademarks appearing on Java Code Geeks are the property of their respective owners.
Java is a trademark or registered trademark of Oracle Corporation in the United States and other countries.
Java Code Geeks is not connected to Oracle Corporation and is not sponsored by Oracle Corporation.
