Home » Java » Enterprise Java » Concurrency – Executors and Spring Integration

About Biju Kunjummen

Concurrency – Executors and Spring Integration

Thread Pool/Executors Based Implementation

A better approach than the raw thread version, is a Thread pool based one, where an appropriate thread pool size is defined based on the system where the task is running – Number of CPU’s/(1-Blocking Coefficient of Task). Venkat Subramaniams book has more details:

First I defined a custom task to generate the Report Part, given the Report Part Request, this is implemented as a Callable:

public class ReportPartRequestCallable implements Callable<ReportPart> {
 private final ReportRequestPart reportRequestPart;
 private final ReportPartGenerator reportPartGenerator;

 public ReportPartRequestCallable(ReportRequestPart reportRequestPart, ReportPartGenerator reportPartGenerator) {
     this.reportRequestPart = reportRequestPart;
     this.reportPartGenerator = reportPartGenerator;
    }

 @Override
    public ReportPart call() {
    return this.reportPartGenerator.generateReportPart(reportRequestPart);
    } 
}
public class ExecutorsBasedReportGenerator implements ReportGenerator {
    private static final Logger logger = LoggerFactory.getLogger(ExecutorsBasedReportGenerator.class);

    private ReportPartGenerator reportPartGenerator;

    private ExecutorService executors = Executors.newFixedThreadPool(10);

    @Override
    public Report generateReport(ReportRequest reportRequest) {
        List<Callable<ReportPart>> tasks = new ArrayList<Callable<ReportPart>>();
        List<ReportRequestPart> reportRequestParts = reportRequest.getRequestParts();
        for (ReportRequestPart reportRequestPart : reportRequestParts) {
            tasks.add(new ReportPartRequestCallable(reportRequestPart, reportPartGenerator));
        }

        List<Future<ReportPart>> responseForReportPartList;
        List<ReportPart> reportParts = new ArrayList<ReportPart>();
        try {
            responseForReportPartList = executors.invokeAll(tasks);
            for (Future<ReportPart> reportPartFuture : responseForReportPartList) {
                reportParts.add(reportPartFuture.get());
            }

        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            throw new RuntimeException(e);
        }
        return new Report(reportParts);
    }

 ......
}

Here a thread pool is created using the Executors.newFixedThreadPool(10) call, with a pool size of 10, a callable task is generated for each of the report request parts, and handed over to the threadpool using the ExecutorService abstraction

responseForReportPartList = executors.invokeAll(tasks);

this call returns a List of Futures, which support a get() method which is a blocking call on the response to be available.

This is clearly a much better implementation compared to the raw thread version, the number of threads is constrained to a manageable number under load.

Spring Integration Based Implementation

The approach that I personally like the most is using Spring Integration, the reason is that with Spring Integration I focus on the components doing the different tasks and leave it upto Spring Integration to wire the flow together, using a xml based or annotation based configuration. Here I will be using a XML based configuration :

The components in my case are:
1. The component to generate the report part, given the report part request, which I had shown earlier.
2. A component to split the report request to report request parts:

public class DefaultReportRequestSplitter implements ReportRequestSplitter{
 @Override
 public List<ReportRequestPart> split(ReportRequest reportRequest) {
  return reportRequest.getRequestParts();
 }
}

3. A component to assemble/aggregate the report parts into a whole report:

public class DefaultReportAggregator implements ReportAggregator{

    @Override
    public Report aggregate(List<ReportPart> reportParts) {
        return new Report(reportParts);
    }

}

And that is all the java code that is required with Spring Integration, the rest of the is wiring – here I have used a Spring integration configuration file:

<?xml version='1.0' encoding='UTF-8'?>
<beans ....

    <int:channel id='report.partsChannel'/>
    <int:channel id='report.reportChannel'/>
    <int:channel id='report.partReportChannel'>
        <int:queue capacity='50'/>
    </int:channel>  
    <int:channel id='report.joinPartsChannel'/>


 <int:splitter id='splitter' ref='reportsPartSplitter' method='split' 
        input-channel='report.partsChannel' output-channel='report.partReportChannel'/>
    
    <task:executor id='reportPartGeneratorExecutor' pool-size='10' queue-capacity='50' />
    
 <int:service-activator id='reportsPartServiceActivator'  ref='reportPartReportGenerator' method='generateReportPart' 
            input-channel='report.partReportChannel' output-channel='report.joinPartsChannel'>
    <int:poller task-executor='reportPartGeneratorExecutor' fixed-delay='500'>
    </int:poller>
 </int:service-activator>

    <int:aggregator ref='reportAggregator' method='aggregate' 
            input-channel='report.joinPartsChannel' output-channel='report.reportChannel' ></int:aggregator> 

    <int:gateway id='reportGeneratorGateway' service-interface='org.bk.sisample.springintegration.ReportGeneratorGateway' 
           default-request-channel='report.partsChannel' default-reply-channel='report.reportChannel'/>
    
    <bean name='reportsPartSplitter' class='org.bk.sisample.springintegration.processors.DefaultReportRequestSplitter'></bean>
    <bean name='reportPartReportGenerator' class='org.bk.sisample.processors.DummyReportPartGenerator'/>
    <bean name='reportAggregator' class='org.bk.sisample.springintegration.processors.DefaultReportAggregator'/>
    <bean name='reportGenerator' class='org.bk.sisample.springintegration.SpringIntegrationBasedReportGenerator'/>

</beans>

Spring Source Tool Suite provides a great way of visualizing this file:

this matches perfectly with my original view of the user flow:

In the Spring Integration version of the code, I have defined the different components to handle the different parts of the flow:
1. A splitter to convert a report request to report request parts:

<int:splitter id='splitter' ref='reportsPartSplitter' method='split' 
        input-channel='report.partsChannel' output-channel='report.partReportChannel'/>

2. A service activator component to generate a report part from a report part request:

<int:service-activator id='reportsPartServiceActivator'  ref='reportPartReportGenerator' method='generateReportPart' 
            input-channel='report.partReportChannel' output-channel='report.joinPartsChannel'>
    <int:poller task-executor='reportPartGeneratorExecutor' fixed-delay='500'>
    </int:poller>
 </int:service-activator>

3. An aggregator to join the report parts back to a report, and is intelligent enough to correlate the original split report requests appropriately without any explicit coding required for it:

<int:aggregator ref='reportAggregator' method='aggregate' 
            input-channel='report.joinPartsChannel' output-channel='report.reportChannel' ></int:aggregator> 

What is interesting in this code is that, like in the executors based sample, the number of threads that services each of these components is completely configurable using the xml file, by using appropriate channels to connect the different components together and by using task executors with the thread pool size set as attribute of the executor.

In this code, I have defined a queue channel where the report request parts come in:

<int:channel id='report.partReportChannel'>
        <int:queue capacity='50'/>
    </int:channel>  

and is serviced by the service activator component, using a task executor with a thread pool of size 10, and a capacity of 50:

<task:executor id='reportPartGeneratorExecutor' pool-size='10' queue-capacity='50' />
    
 <int:service-activator id='reportsPartServiceActivator'  ref='reportPartReportGenerator' method='generateReportPart' 
            input-channel='report.partReportChannel' output-channel='report.joinPartsChannel'>
    <int:poller task-executor='reportPartGeneratorExecutor' fixed-delay='500'>
    </int:poller>
 </int:service-activator>

All this through configuration!

The entire codebase for this sample is available at this github location: https://github.com/bijukunjummen/si-sample

Reference: Concurrency – Executors and Spring Integration from our JCG partner Biju Kunjummen at the all and sundry blog.

Do you want to know how to develop your skillset to become a Java Rockstar?

Subscribe to our newsletter to start Rocking right now!

To get you started we give you our best selling eBooks for FREE!

1. JPA Mini Book

2. JVM Troubleshooting Guide

3. JUnit Tutorial for Unit Testing

4. Java Annotations Tutorial

5. Java Interview Questions

6. Spring Interview Questions

7. Android UI Design

and many more ....

 

Leave a Reply

Your email address will not be published. Required fields are marked *

*


Want to take your Java Skills to the next level?
Grab our programming books for FREE!
  • Save time by leveraging our field-tested solutions to common problems.
  • The books cover a wide range of topics, from JPA and JUnit, to JMeter and Android.
  • Each book comes as a standalone guide (with source code provided), so that you use it as reference.
Last Step ...

Where should we send the free eBooks?

Good Work!
To download the books, please verify your email address by following the instructions found on the email we just sent you.