Home » Java » Core Java » Throttling Task Submission with a BlockingExecutor

About Fahd Shariff

Fahd is a software engineer working in the financial services industry. He is passionate about technology and specializes in Java application development in distributed environments.

Throttling Task Submission with a BlockingExecutor

The JDK’s java.util.concurrent.ThreadPoolExecutor allows you to submit tasks to a thread pool and uses a BlockingQueue to hold submitted tasks. If you have thousands of tasks to submit, you specify a “bounded” queue (i.e. one with a maximum capacity) otherwise your JVM may run out of memory. You can set a RejectedExecutionHandler to handle what happens when the queue is full, but there are still outstanding tasks to submit. Here is a simple example showing how you would use a ThreadPoolExecutor with a BlockingQueue with capacity 1000. The CallerRunsPolicy ensures that, when the queue is full, additional tasks will be processed by the submitting thread.
 
 

int numThreads = 5;
ExecutorService exec = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                                 new ArrayBlockingQueue<Runnable>(1000),
                                 new ThreadPoolExecutor.CallerRunsPolicy());

The problem with this approach is that, when the queue is full, the thread submitting the tasks to the pool becomes busy executing a task itself and during this time, the queue could become empty and the threads in the pool could become idle. This is not very efficient. We want to keep the thread pool busy and the work queue saturated at all the times. There are various solutions to this problem. One of them is to use a custom Executor which blocks (and thus prevents further tasks from being submitted to the pool) when the queue is full. The code for BlockingExecutor is shown below. It is based on the BoundedExecutor example from Brian Goetz, 2006. Java Concurrency in Practice. 1 Edition. Addison-Wesley Professional. (Section 8.3.3).

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An executor which blocks and prevents further tasks from
 * being submitted to the pool when the queue is full.
 * <p>
 * Based on the BoundedExecutor example in:
 * Brian Goetz, 2006. Java Concurrency in Practice. (Listing 8.4)
 */
public class BlockingExecutor extends ThreadPoolExecutor {

  private static final Logger LOGGER = LoggerFactory.
                                          getLogger(BlockingExecutor.class);
  private final Semaphore semaphore;

  /**
   * Creates a BlockingExecutor which will block and prevent further
   * submission to the pool when the specified queue size has been reached.
   *
   * @param poolSize the number of the threads in the pool
   * @param queueSize the size of the queue
   */
  public BlockingExecutor(final int poolSize, final int queueSize) {
    super(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
          new LinkedBlockingQueue<Runnable>());

    // the semaphore is bounding both the number of tasks currently executing
    // and those queued up
    semaphore = new Semaphore(poolSize + queueSize);
  }

  /**
   * Executes the given task.
   * This method will block when the semaphore has no permits
   * i.e. when the queue has reached its capacity.
   */
  @Override
  public void execute(final Runnable task) {
    boolean acquired = false;
    do {
        try {
            semaphore.acquire();
            acquired = true;
        } catch (final InterruptedException e) {
            LOGGER.warn("InterruptedException whilst aquiring semaphore", e);
        }
    } while (!acquired);

    try {
        super.execute(task);
    } catch (final RejectedExecutionException e) {
        semaphore.release();
        throw e;
    }
  }

  /**
   * Method invoked upon completion of execution of the given Runnable,
   * by the thread that executed the task.
   * Releases a semaphore permit.
   */
  @Override
  protected void afterExecute(final Runnable r, final Throwable t) {
    super.afterExecute(r, t);
    semaphore.release();
  }
}

 

Reference: Throttling Task Submission with a BlockingExecutor from our JCG partner Fahd Shariff at the fahd.blog blog.

Do you want to know how to develop your skillset to become a Java Rockstar?

Subscribe to our newsletter to start Rocking right now!

To get you started we give you our best selling eBooks for FREE!

1. JPA Mini Book

2. JVM Troubleshooting Guide

3. JUnit Tutorial for Unit Testing

4. Java Annotations Tutorial

5. Java Interview Questions

6. Spring Interview Questions

7. Android UI Design

and many more ....

 

Leave a Reply

Your email address will not be published. Required fields are marked *

*


Want to take your Java Skills to the next level?
Grab our programming books for FREE!
  • Save time by leveraging our field-tested solutions to common problems.
  • The books cover a wide range of topics, from JPA and JUnit, to JMeter and Android.
  • Each book comes as a standalone guide (with source code provided), so that you use it as reference.
Last Step ...

Where should we send the free eBooks?

Good Work!
To download the books, please verify your email address by following the instructions found on the email we just sent you.