Concurrency – Executors and Spring Integration

Thread Pool/Executors Based Implementation

A better approach than the raw thread version, is a Thread pool based one, where an appropriate thread pool size is defined based on the system where the task is running – Number of CPU’s/(1-Blocking Coefficient of Task). Venkat Subramaniams book has more details:

First I defined a custom task to generate the Report Part, given the Report Part Request, this is implemented as a Callable:

public class ReportPartRequestCallable implements Callable<ReportPart> {
 private final ReportRequestPart reportRequestPart;
 private final ReportPartGenerator reportPartGenerator;

 public ReportPartRequestCallable(ReportRequestPart reportRequestPart, ReportPartGenerator reportPartGenerator) {
     this.reportRequestPart = reportRequestPart;
     this.reportPartGenerator = reportPartGenerator;
    }

 @Override
    public ReportPart call() {
    return this.reportPartGenerator.generateReportPart(reportRequestPart);
    } 
}
public class ExecutorsBasedReportGenerator implements ReportGenerator {
    private static final Logger logger = LoggerFactory.getLogger(ExecutorsBasedReportGenerator.class);

    private ReportPartGenerator reportPartGenerator;

    private ExecutorService executors = Executors.newFixedThreadPool(10);

    @Override
    public Report generateReport(ReportRequest reportRequest) {
        List<Callable<ReportPart>> tasks = new ArrayList<Callable<ReportPart>>();
        List<ReportRequestPart> reportRequestParts = reportRequest.getRequestParts();
        for (ReportRequestPart reportRequestPart : reportRequestParts) {
            tasks.add(new ReportPartRequestCallable(reportRequestPart, reportPartGenerator));
        }

        List<Future<ReportPart>> responseForReportPartList;
        List<ReportPart> reportParts = new ArrayList<ReportPart>();
        try {
            responseForReportPartList = executors.invokeAll(tasks);
            for (Future<ReportPart> reportPartFuture : responseForReportPartList) {
                reportParts.add(reportPartFuture.get());
            }

        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            throw new RuntimeException(e);
        }
        return new Report(reportParts);
    }

 ......
}

Here a thread pool is created using the Executors.newFixedThreadPool(10) call, with a pool size of 10, a callable task is generated for each of the report request parts, and handed over to the threadpool using the ExecutorService abstraction

responseForReportPartList = executors.invokeAll(tasks);

this call returns a List of Futures, which support a get() method which is a blocking call on the response to be available.

This is clearly a much better implementation compared to the raw thread version, the number of threads is constrained to a manageable number under load.

Spring Integration Based Implementation

The approach that I personally like the most is using Spring Integration, the reason is that with Spring Integration I focus on the components doing the different tasks and leave it upto Spring Integration to wire the flow together, using a xml based or annotation based configuration. Here I will be using a XML based configuration :

The components in my case are:
1. The component to generate the report part, given the report part request, which I had shown earlier.
2. A component to split the report request to report request parts:

public class DefaultReportRequestSplitter implements ReportRequestSplitter{
 @Override
 public List<ReportRequestPart> split(ReportRequest reportRequest) {
  return reportRequest.getRequestParts();
 }
}

3. A component to assemble/aggregate the report parts into a whole report:

public class DefaultReportAggregator implements ReportAggregator{

    @Override
    public Report aggregate(List<ReportPart> reportParts) {
        return new Report(reportParts);
    }

}

And that is all the java code that is required with Spring Integration, the rest of the is wiring – here I have used a Spring integration configuration file:

<?xml version='1.0' encoding='UTF-8'?>
<beans ....

    <int:channel id='report.partsChannel'/>
    <int:channel id='report.reportChannel'/>
    <int:channel id='report.partReportChannel'>
        <int:queue capacity='50'/>
    </int:channel>  
    <int:channel id='report.joinPartsChannel'/>


 <int:splitter id='splitter' ref='reportsPartSplitter' method='split' 
        input-channel='report.partsChannel' output-channel='report.partReportChannel'/>
    
    <task:executor id='reportPartGeneratorExecutor' pool-size='10' queue-capacity='50' />
    
 <int:service-activator id='reportsPartServiceActivator'  ref='reportPartReportGenerator' method='generateReportPart' 
            input-channel='report.partReportChannel' output-channel='report.joinPartsChannel'>
    <int:poller task-executor='reportPartGeneratorExecutor' fixed-delay='500'>
    </int:poller>
 </int:service-activator>

    <int:aggregator ref='reportAggregator' method='aggregate' 
            input-channel='report.joinPartsChannel' output-channel='report.reportChannel' ></int:aggregator> 

    <int:gateway id='reportGeneratorGateway' service-interface='org.bk.sisample.springintegration.ReportGeneratorGateway' 
           default-request-channel='report.partsChannel' default-reply-channel='report.reportChannel'/>
    
    <bean name='reportsPartSplitter' class='org.bk.sisample.springintegration.processors.DefaultReportRequestSplitter'></bean>
    <bean name='reportPartReportGenerator' class='org.bk.sisample.processors.DummyReportPartGenerator'/>
    <bean name='reportAggregator' class='org.bk.sisample.springintegration.processors.DefaultReportAggregator'/>
    <bean name='reportGenerator' class='org.bk.sisample.springintegration.SpringIntegrationBasedReportGenerator'/>

</beans>

Spring Source Tool Suite provides a great way of visualizing this file:

this matches perfectly with my original view of the user flow:

In the Spring Integration version of the code, I have defined the different components to handle the different parts of the flow:
1. A splitter to convert a report request to report request parts:

<int:splitter id='splitter' ref='reportsPartSplitter' method='split' 
        input-channel='report.partsChannel' output-channel='report.partReportChannel'/>

2. A service activator component to generate a report part from a report part request:

<int:service-activator id='reportsPartServiceActivator'  ref='reportPartReportGenerator' method='generateReportPart' 
            input-channel='report.partReportChannel' output-channel='report.joinPartsChannel'>
    <int:poller task-executor='reportPartGeneratorExecutor' fixed-delay='500'>
    </int:poller>
 </int:service-activator>

3. An aggregator to join the report parts back to a report, and is intelligent enough to correlate the original split report requests appropriately without any explicit coding required for it:

<int:aggregator ref='reportAggregator' method='aggregate' 
            input-channel='report.joinPartsChannel' output-channel='report.reportChannel' ></int:aggregator> 

What is interesting in this code is that, like in the executors based sample, the number of threads that services each of these components is completely configurable using the xml file, by using appropriate channels to connect the different components together and by using task executors with the thread pool size set as attribute of the executor.

In this code, I have defined a queue channel where the report request parts come in:

<int:channel id='report.partReportChannel'>
        <int:queue capacity='50'/>
    </int:channel>  

and is serviced by the service activator component, using a task executor with a thread pool of size 10, and a capacity of 50:

<task:executor id='reportPartGeneratorExecutor' pool-size='10' queue-capacity='50' />
    
 <int:service-activator id='reportsPartServiceActivator'  ref='reportPartReportGenerator' method='generateReportPart' 
            input-channel='report.partReportChannel' output-channel='report.joinPartsChannel'>
    <int:poller task-executor='reportPartGeneratorExecutor' fixed-delay='500'>
    </int:poller>
 </int:service-activator>

All this through configuration!

The entire codebase for this sample is available at this github location: https://github.com/bijukunjummen/si-sample

Reference: Concurrency – Executors and Spring Integration from our JCG partner Biju Kunjummen at the all and sundry blog.

Do you want to know how to develop your skillset to become a Java Rockstar?

Subscribe to our newsletter to start Rocking right now!

To get you started we give you two of our best selling eBooks for FREE!

JPA Mini Book

Learn how to leverage the power of JPA in order to create robust and flexible Java applications. With this Mini Book, you will get introduced to JPA and smoothly transition to more advanced concepts.

JVM Troubleshooting Guide

The Java virtual machine is really the foundation of any Java EE platform. Learn how to master it with this advanced guide!

Given email address is already subscribed, thank you!
Oops. Something went wrong. Please try again later.
Please provide a valid email address.
Thank you, your sign-up request was successful! Please check your e-mail inbox.
Please complete the CAPTCHA.
Please fill in the required fields.

Leave a Reply


+ 3 = five



Java Code Geeks and all content copyright © 2010-2014, Exelixis Media Ltd | Terms of Use | Privacy Policy | Contact
All trademarks and registered trademarks appearing on Java Code Geeks are the property of their respective owners.
Java is a trademark or registered trademark of Oracle Corporation in the United States and other countries.
Java Code Geeks is not connected to Oracle Corporation and is not sponsored by Oracle Corporation.
Do you want to know how to develop your skillset and become a ...
Java Rockstar?

Subscribe to our newsletter to start Rocking right now!

To get you started we give you two of our best selling eBooks for FREE!

Get ready to Rock!
You can download the complementary eBooks using the links below:
Close