About Fahd Shariff

Fahd is a software engineer working in the financial services industry. He is passionate about technology and specializes in Java application development in distributed environments.

Throttling Task Submission with a BlockingExecutor

The JDK’s java.util.concurrent.ThreadPoolExecutor allows you to submit tasks to a thread pool and uses a BlockingQueue to hold submitted tasks. If you have thousands of tasks to submit, you specify a “bounded” queue (i.e. one with a maximum capacity) otherwise your JVM may run out of memory. You can set a RejectedExecutionHandler to handle what happens when the queue is full, but there are still outstanding tasks to submit. Here is a simple example showing how you would use a ThreadPoolExecutor with a BlockingQueue with capacity 1000. The CallerRunsPolicy ensures that, when the queue is full, additional tasks will be processed by the submitting thread.
 
 

int numThreads = 5;
ExecutorService exec = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                                 new ArrayBlockingQueue<Runnable>(1000),
                                 new ThreadPoolExecutor.CallerRunsPolicy());

The problem with this approach is that, when the queue is full, the thread submitting the tasks to the pool becomes busy executing a task itself and during this time, the queue could become empty and the threads in the pool could become idle. This is not very efficient. We want to keep the thread pool busy and the work queue saturated at all the times. There are various solutions to this problem. One of them is to use a custom Executor which blocks (and thus prevents further tasks from being submitted to the pool) when the queue is full. The code for BlockingExecutor is shown below. It is based on the BoundedExecutor example from Brian Goetz, 2006. Java Concurrency in Practice. 1 Edition. Addison-Wesley Professional. (Section 8.3.3).

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An executor which blocks and prevents further tasks from
 * being submitted to the pool when the queue is full.
 * <p>
 * Based on the BoundedExecutor example in:
 * Brian Goetz, 2006. Java Concurrency in Practice. (Listing 8.4)
 */
public class BlockingExecutor extends ThreadPoolExecutor {

  private static final Logger LOGGER = LoggerFactory.
                                          getLogger(BlockingExecutor.class);
  private final Semaphore semaphore;

  /**
   * Creates a BlockingExecutor which will block and prevent further
   * submission to the pool when the specified queue size has been reached.
   *
   * @param poolSize the number of the threads in the pool
   * @param queueSize the size of the queue
   */
  public BlockingExecutor(final int poolSize, final int queueSize) {
    super(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
          new LinkedBlockingQueue<Runnable>());

    // the semaphore is bounding both the number of tasks currently executing
    // and those queued up
    semaphore = new Semaphore(poolSize + queueSize);
  }

  /**
   * Executes the given task.
   * This method will block when the semaphore has no permits
   * i.e. when the queue has reached its capacity.
   */
  @Override
  public void execute(final Runnable task) {
    boolean acquired = false;
    do {
        try {
            semaphore.acquire();
            acquired = true;
        } catch (final InterruptedException e) {
            LOGGER.warn("InterruptedException whilst aquiring semaphore", e);
        }
    } while (!acquired);

    try {
        super.execute(task);
    } catch (final RejectedExecutionException e) {
        semaphore.release();
        throw e;
    }
  }

  /**
   * Method invoked upon completion of execution of the given Runnable,
   * by the thread that executed the task.
   * Releases a semaphore permit.
   */
  @Override
  protected void afterExecute(final Runnable r, final Throwable t) {
    super.afterExecute(r, t);
    semaphore.release();
  }
}

 

Reference: Throttling Task Submission with a BlockingExecutor from our JCG partner Fahd Shariff at the fahd.blog blog.
Related Whitepaper:

Bulletproof Java Code: A Practical Strategy for Developing Functional, Reliable, and Secure Java Code

Use Java? If you do, you know that Java software can be used to drive application logic of Web services or Web applications. Perhaps you use it for desktop applications? Or, embedded devices? Whatever your use of Java code, functional errors are the enemy!

To combat this enemy, your team might already perform functional testing. Even so, you're taking significant risks if you have not yet implemented a comprehensive team-wide quality management strategy. Such a strategy alleviates reliability, security, and performance problems to ensure that your code is free of functionality errors.Read this article to learn about this simple four-step strategy that is proven to make Java code more reliable, more secure, and easier to maintain.

Get it Now!  

Leave a Reply


3 × six =



Java Code Geeks and all content copyright © 2010-2014, Exelixis Media Ltd | Terms of Use | Privacy Policy
All trademarks and registered trademarks appearing on Java Code Geeks are the property of their respective owners.
Java is a trademark or registered trademark of Oracle Corporation in the United States and other countries.
Java Code Geeks is not connected to Oracle Corporation and is not sponsored by Oracle Corporation.

Sign up for our Newsletter

20,709 insiders are already enjoying weekly updates and complimentary whitepapers! Join them now to gain exclusive access to the latest news in the Java world, as well as insights about Android, Scala, Groovy and other related technologies.

As an extra bonus, by joining you will get our brand new e-books, published by Java Code Geeks and their JCG partners for your reading pleasure! Enter your info and stay on top of things,

  • Fresh trends
  • Cases and examples
  • Research and insights
  • Two complimentary e-books