About Fahd Shariff

Fahd is a software engineer working in the financial services industry. He is passionate about technology and specializes in Java application development in distributed environments.

Throttling Task Submission with a BlockingExecutor

The JDK’s java.util.concurrent.ThreadPoolExecutor allows you to submit tasks to a thread pool and uses a BlockingQueue to hold submitted tasks. If you have thousands of tasks to submit, you specify a “bounded” queue (i.e. one with a maximum capacity) otherwise your JVM may run out of memory. You can set a RejectedExecutionHandler to handle what happens when the queue is full, but there are still outstanding tasks to submit.

Here is a simple example showing how you would use a ThreadPoolExecutor with a BlockingQueue with capacity 1000. The CallerRunsPolicy ensures that, when the queue is full, additional tasks will be processed by the submitting thread.
 

int numThreads = 5;
ExecutorService exec = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                                 new ArrayBlockingQueue<Runnable>(1000),
                                 new ThreadPoolExecutor.CallerRunsPolicy());

The problem with this approach is that, when the queue is full, the thread submitting the tasks to the pool becomes busy executing a task itself and during this time, the queue could become empty and the threads in the pool could become idle. This is not very efficient. We want to keep the thread pool busy and the work queue saturated at all the times.

There are various solutions to this problem. One of them is to use a custom Executor which blocks (and thus prevents further tasks from being submitted to the pool) when the queue is full. The code for BlockingExecutor is shown below. It is based on the BoundedExecutor example from Brian Goetz, 2006. Java Concurrency in Practice. 1 Edition. Addison-Wesley Professional. (Section 8.3.3).

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An executor which blocks and prevents further tasks from
 * being submitted to the pool when the queue is full.
 * <p>
 * Based on the BoundedExecutor example in:
 * Brian Goetz, 2006. Java Concurrency in Practice. (Listing 8.4)
 */
public class BlockingExecutor extends ThreadPoolExecutor {

  private static final Logger LOGGER = LoggerFactory.
                                          getLogger(BlockingExecutor.class);
  private final Semaphore semaphore;

  /**
   * Creates a BlockingExecutor which will block and prevent further
   * submission to the pool when the specified queue size has been reached.
   *
   * @param poolSize the number of the threads in the pool
   * @param queueSize the size of the queue
   */
  public BlockingExecutor(final int poolSize, final int queueSize) {
    super(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
          new LinkedBlockingQueue<Runnable>());

    // the semaphore is bounding both the number of tasks currently executing
    // and those queued up
    semaphore = new Semaphore(poolSize + queueSize);
  }

  /**
   * Executes the given task.
   * This method will block when the semaphore has no permits
   * i.e. when the queue has reached its capacity.
   */
  @Override
  public void execute(final Runnable task) {
    boolean acquired = false;
    do {
        try {
            semaphore.acquire();
            acquired = true;
        } catch (final InterruptedException e) {
            LOGGER.warn("InterruptedException whilst aquiring semaphore", e);
        }
    } while (!acquired);

    try {
        super.execute(task);
    } catch (final RejectedExecutionException e) {
        semaphore.release();
        throw e;
    }
  }

  /**
   * Method invoked upon completion of execution of the given Runnable,
   * by the thread that executed the task.
   * Releases a semaphore permit.
   */
  @Override
  protected void afterExecute(final Runnable r, final Throwable t) {
    super.afterExecute(r, t);
    semaphore.release();
  }
}

 

Reference: Throttling Task Submission with a BlockingExecutor from our JCG partner Fahd Shariff at the fahd.blog blog.

Do you want to know how to develop your skillset to become a Java Rockstar?

Subscribe to our newsletter to start Rocking right now!

To get you started we give you two of our best selling eBooks for FREE!

JPA Mini Book

Learn how to leverage the power of JPA in order to create robust and flexible Java applications. With this Mini Book, you will get introduced to JPA and smoothly transition to more advanced concepts.

JVM Troubleshooting Guide

The Java virtual machine is really the foundation of any Java EE platform. Learn how to master it with this advanced guide!

Given email address is already subscribed, thank you!
Oops. Something went wrong. Please try again later.
Please provide a valid email address.
Thank you, your sign-up request was successful! Please check your e-mail inbox.
Please complete the CAPTCHA.
Please fill in the required fields.

2 Responses to "Throttling Task Submission with a BlockingExecutor"

  1. Frank says:

    Slight bug – queueSize variable not set

    super(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue(???));

  2. Not necessary. The semaphore is blocking the submission of tasks, not the LinkedBlockingQueue.

Leave a Reply


8 + two =



Java Code Geeks and all content copyright © 2010-2014, Exelixis Media Ltd | Terms of Use | Privacy Policy | Contact
All trademarks and registered trademarks appearing on Java Code Geeks are the property of their respective owners.
Java is a trademark or registered trademark of Oracle Corporation in the United States and other countries.
Java Code Geeks is not connected to Oracle Corporation and is not sponsored by Oracle Corporation.
Do you want to know how to develop your skillset and become a ...
Java Rockstar?

Subscribe to our newsletter to start Rocking right now!

To get you started we give you two of our best selling eBooks for FREE!

Get ready to Rock!
You can download the complementary eBooks using the links below:
Close