Core Java

Threading stories: about robust thread pools

Another blog of my threading series. This time it’s about thread pools, robust thread pool settings in particular. In Java thread pools are implemented by the ThreadPoolExecutor class introduced in Java 5. The Javadoc of that class is very well organized. So I spare me the effort to give a general introduction here. Basically, what ThreadPoolExecutor does is, it creates and manages threads that process Runnable tasks that were submitted to a work queue by an arbitrary client. It’s a mechanism to perform work asynchronously, which is an important capability in times of multi-core machines and cloud computing.

To be useful in a wide range of contexts ThreadPoolExecutor provides some adjustable parameters. This is nice, but it also leaves the decision to us, the developers, to choose the right settings for our concrete cases. Here is the largest constructor for a ThreadPoolExecutor.

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) { ... }

Thread pool types

Some of the parameters shown in the construtor above are very sensible ones in terms of resource consumption and resulting system stability. Based on the different parameter settings of the constructor it’s possible to distinguish some fundamental categories of thread pools. Here are some default thread pool settings offered by the Executors class.

 public static ExecutorService newCachedThreadPool() {
   return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                 60L, TimeUnit.SECONDS,
                                 new SynchronousQueue<Runnable>());
}

public static ExecutorService newFixedThreadPool(int nThreads) {
   return new ThreadPoolExecutor(nThreads, nThreads,
                                 0L, TimeUnit.MILLISECONDS,
                                 new LinkedBlockingQueue<Runnable>());
}

In the “cached thread pool” the number of threads is unbounded. This is caused by a maximumPoolSize of Integer.MAX_VALUE in conjunction with SynchronousQueue. If you submit tasks in a burst to that thread pool, it will likely create a thread for each single task. In this scenario created threads terminate when they were idle for 60 seconds. The second example shows a “fixed thread pool”, where maximumPoolSize is set to a specific fixed value. The pools thread count will never exceed this value. If tasks arive in a burst and all threads are busy then they will queue up in the work queue (here a LinkedBlockingQueue). Threads in this fixed thread pool never die. The drawback of unbounded pools is obvious: both settings can cause JVM memory trouble (you get OutOfMemoryErrors – if you’re lucky).

Let’s look at some bounded thread pool settings:

ThreadPoolExecutor pool = 
       new ThreadPoolExecutor(0, 50, 
                              60, TimeUnit.SECONDS, 
                              new SynchronousQueue<Runnable>());
pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

ThreadPoolExecutor pool = 
       new ThreadPoolExecutor(50, 50, 
                              0L, TimeUnit.MILLISECONDS, 
                              new LinkedBlockingQueue<Runnable>(100000));
pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

The first snippet creates a chached thread pool with the number of threads bounded to a value of 50. If tasks arrive in a burst and all the threads are busy, a call to the ThreadPoolExecutor.execute() method would now be rejected by issueing a RejectedExecutionException. Often this isn’t what I typically want, therefore I change the saturation policy by setting the rejected-execution-handler to CallerRunsPolicy. This policy pushes the work back to the caller. That is, the client thread that issued the task for asynchronous execution will now run the task synchronously. You can develop your own saturation policy by implementing your own RejectedExecutionHandler. The second snippet creates a fixed thread pool with 50 threads and a work queue that is bounded to a value of 100000 tasks. If the work queue is full, the saturation policy pushes the work back to the client. The cached pool creates threads on demand and it terminates the threads if they idle for 60 seconds. The fixed pool keeps the threads alive.

Thread pool boundaries

As shown above there are two basic approaches to define thread pools: bounded and unbounded thread pools. Unbounded thread pools, like the default ones of Executors class work fine, as long you don’t submit too many tasks in a burst. If that happens unbounded thread pools can harm your systems stability. Either too many threads are created by a cached thread pool, or too many tasks are queued in a fixed thread pool. The letter is more difficult to achieve, but still possible. For production use it may be better to set the boundaries to some meaningfull values like in the last two thread pool settings. Because it can be tricky to define those “meaningfull boundaries” I have developed a little program that does that work for me.

/**
 * A class that calculates the optimal thread pool boundaries. It takes the desired target utilization and the desired
 * work queue memory consumption as input and retuns thread count and work queue capacity.
 * 
 * @author Niklas Schlimm
 * 
 */
public abstract class PoolSizeCalculator {

 /**
  * The sample queue size to calculate the size of a single {@link Runnable} element.
  */
 private final int SAMPLE_QUEUE_SIZE = 1000;

 /**
  * Accuracy of test run. It must finish within 20ms of the testTime otherwise we retry the test. This could be
  * configurable.
  */
 private final int EPSYLON = 20;

 /**
  * Control variable for the CPU time investigation.
  */
 private volatile boolean expired;

 /**
  * Time (millis) of the test run in the CPU time calculation.
  */
 private final long testtime = 3000;

 /**
  * Calculates the boundaries of a thread pool for a given {@link Runnable}.
  * 
  * @param targetUtilization
  *            the desired utilization of the CPUs (0 <= targetUtilization <= 1)
  * @param targetQueueSizeBytes
  *            the desired maximum work queue size of the thread pool (bytes)
  */
 protected void calculateBoundaries(BigDecimal targetUtilization, BigDecimal targetQueueSizeBytes) {
  calculateOptimalCapacity(targetQueueSizeBytes);
  Runnable task = creatTask();
  start(task);
  start(task); // warm up phase
  long cputime = getCurrentThreadCPUTime();
  start(task); // test intervall
  cputime = getCurrentThreadCPUTime() - cputime;
  long waittime = (testtime * 1000000) - cputime;
  calculateOptimalThreadCount(cputime, waittime, targetUtilization);
 }

 private void calculateOptimalCapacity(BigDecimal targetQueueSizeBytes) {
  long mem = calculateMemoryUsage();
  BigDecimal queueCapacity = targetQueueSizeBytes.divide(new BigDecimal(mem), RoundingMode.HALF_UP);
  System.out.println("Target queue memory usage (bytes): " + targetQueueSizeBytes);
  System.out.println("createTask() produced " + creatTask().getClass().getName() + " which took " + mem
    + " bytes in a queue");
  System.out.println("Formula: " + targetQueueSizeBytes + " / " + mem);
  System.out.println("* Recommended queue capacity (bytes): " + queueCapacity);
 }

 /**
  * Brian Goetz' optimal thread count formula, see 'Java Concurrency in Practice' (chapter 8.2)
  * 
  * @param cpu
  *            cpu time consumed by considered task
  * @param wait
  *            wait time of considered task
  * @param targetUtilization
  *            target utilization of the system
  */
 private void calculateOptimalThreadCount(long cpu, long wait, BigDecimal targetUtilization) {
  BigDecimal waitTime = new BigDecimal(wait);
  BigDecimal computeTime = new BigDecimal(cpu);
  BigDecimal numberOfCPU = new BigDecimal(Runtime.getRuntime().availableProcessors());
  BigDecimal optimalthreadcount = numberOfCPU.multiply(targetUtilization).multiply(
    new BigDecimal(1).add(waitTime.divide(computeTime, RoundingMode.HALF_UP)));
  System.out.println("Number of CPU: " + numberOfCPU);
  System.out.println("Target utilization: " + targetUtilization);
  System.out.println("Elapsed time (nanos): " + (testtime * 1000000));
  System.out.println("Compute time (nanos): " + cpu);
  System.out.println("Wait time (nanos): " + wait);
  System.out.println("Formula: " + numberOfCPU + " * " + targetUtilization + " * (1 + " + waitTime + " / "
    + computeTime + ")");
  System.out.println("* Optimal thread count: " + optimalthreadcount);
 }

 /**
  * Runs the {@link Runnable} over a period defined in {@link #testtime}. Based on Heinz Kabbutz' ideas
  * (http://www.javaspecialists.eu/archive/Issue124.html).
  * 
  * @param task
  *            the runnable under investigation
  */
 public void start(Runnable task) {
  long start = 0;
  int runs = 0;
  do {
   if (++runs > 5) {
    throw new IllegalStateException("Test not accurate");
   }
   expired = false;
   start = System.currentTimeMillis();
   Timer timer = new Timer();
   timer.schedule(new TimerTask() {
    public void run() {
     expired = true;
    }
   }, testtime);
   while (!expired) {
    task.run();
   }
   start = System.currentTimeMillis() - start;
   timer.cancel();
  } while (Math.abs(start - testtime) > EPSYLON);
  collectGarbage(3);
 }

 private void collectGarbage(int times) {
  for (int i = 0; i < times; i++) {
   System.gc();
   try {
    Thread.sleep(10);
   } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    break;
   }
  }
 }

 /**
  * Calculates the memory usage of a single element in a work queue. Based on Heinz Kabbutz' ideas
  * (http://www.javaspecialists.eu/archive/Issue029.html).
  * 
  * @return memory usage of a single {@link Runnable} element in the thread pools work queue
  */
 public long calculateMemoryUsage() {
  BlockingQueue<Runnable> queue = createWorkQueue();
  for (int i = 0; i < SAMPLE_QUEUE_SIZE; i++) {
   queue.add(creatTask());
  }
  long mem0 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
  long mem1 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
  queue = null;
  collectGarbage(15);
  mem0 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
  queue = createWorkQueue();
  for (int i = 0; i < SAMPLE_QUEUE_SIZE; i++) {
   queue.add(creatTask());
  }
  collectGarbage(15);
  mem1 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
  return (mem1 - mem0) / SAMPLE_QUEUE_SIZE;
 }

 /**
  * Create your runnable task here.
  * 
  * @return an instance of your runnable task under investigation
  */
 protected abstract Runnable creatTask();

 /**
  * Return an instance of the queue used in the thread pool.
  * 
  * @return queue instance
  */
 protected abstract BlockingQueue<Runnable> createWorkQueue();

 /**
  * Calculate current cpu time. Various frameworks may be used here, depending on the operating system in use. (e.g.
  * http://www.hyperic.com/products/sigar). The more accurate the CPU time measurement, the more accurate the results
  * for thread count boundaries.
  * 
  * @return current cpu time of current thread
  */
 protected abstract long getCurrentThreadCPUTime();

}

The program will find ideal thread pool boundaries for the maximum capacity of your work queue and the required thread count. The algorithms are based on the work of Brian Goetz and Dr. Heinz Kabutz, you can find the references in the Javadoc. Calculating the capacity required by your work queue in a fixed thread pool is relatively simple. All you need is the desired target size of the work queue in bytes divided by the average size of your submitted tasks in bytes. Unfortunately, calculating the maximum thread count *isn’t* an exact science. However, if you use the formulas in the program you avoid the harmful extremes of too large work queues and too many threads. Calculating the ideal pool size depends on the wait time to compute time ratio of your task. The more wait time, the more threads required to achieve a given utilization. The PoolSizeCalculator requires the desired target utilization and the desired maximum work queue memory consumption as input. Based on an investigation of object sizes and CPU time it returns the ideal settings for maximum thread count and work queue capacity in the thread pool.

Let’s go through an example. The following snippet shows how to use the PoolSizeCalculator in a scenario of 1.0 (=100%) desired utilization and 100000 bytes maximum work queue size.

public class MyPoolSizeCalculator extends PoolSizeCalculator {

 public static void main(String[] args) throws InterruptedException, 
                                               InstantiationException, 
                                               IllegalAccessException,
                                               ClassNotFoundException {
  MyThreadSizeCalculator calculator = new MyThreadSizeCalculator();
  calculator.calculateBoundaries(new BigDecimal(1.0), 
                                 new BigDecimal(100000));
 }

 protected long getCurrentThreadCPUTime() {
  return ManagementFactory.getThreadMXBean().getCurrentThreadCpuTime();
 }

 protected Runnable creatTask() {
  return new AsynchronousTask(0, "IO", 1000000);
 }
 
 protected BlockingQueue<Runnable> createWorkQueue() {
  return new LinkedBlockingQueue<>();
 }

}

MyPoolSizeCalculator extends the abstract PoolSizeCalculator. You need to implement three template methods: getCurrentThreadCPUTime, creatTask, createWorkQueue. The snippet applies standard Java Management Extensions for the CPU time measurement (line 13). If JMX isn’t accurate enough, then other frameworks can be considered (e.g. SIGAR API). Thread pools work best when tasks are homogeneous and independent. Therefore the createTask-method creates an instance of a single type of Runnable task (line 17). This task will be investigated to calculate wait time to CPU time ratio. Finally, I need to create an instance of the work queue to calculate memory usage of submitted task (line 21). The output of that program shows the ideal settings for the work queue capacity and the maximum pool size (number of threads). These are the results for my examplary I/O intense AsynchronousTask on a dual core machine.

Target queue memory usage (bytes): 100000  
createTask() produced com.schlimm.java7.nio.threadpools.AsynchronousTask which took 40 bytes in a queue  
Formula: 100000 / 40  
* Recommended queue capacity (bytes): 2500  
Number of CPU: 2  
Target utilization: 1.0  
Elapsed time (nanos): 3000000000  
Compute time (nanos): 906250000  
Wait time (nanos): 2093750000  
Formula: 2 * 1.0 * (1 + 2093750000 / 906250000)  
* Optimal thread count: 6.0  

The ‘recommended queue capacity’ and the ‘optimal thread count’ are the important values. An ideal setting for my AsynchronousTask would be as follows:

ThreadPoolExecutor pool = 
       new ThreadPoolExecutor(6, 6, 
                              0L, TimeUnit.MILLISECONDS, 
                              new LinkedBlockingQueue<Runnable>(2500));
pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

Using those settings your work queue cannot grow larger then the desired 100000 bytes. And since the desired utilization is 1.0 (100%), it does not make sense to make the pool larger then 6 threads (wait time to compute time ratio is three – for each compute time intervall, three wait time intervalls follow). The results of the program largely depend on the type of tasks you process. If the tasks are homogenous and compute intense the program will likely recommend to set the pool size to the number of available CPU. But if the task has wait time, like in I/O intense tasks, the program will recomend to increase the thread count to achieve 100% utilization. Also notice, some tasks change their wait time to compute time ratio after some time of processing, e.g. if the file size of an I/O operation increases. This fact suggests to develop a self-tuning thread pool (one of my follow up blogs). In any case, you should make the thread pool sizes configurable, so that you can adjust them at runtime.

OK, that’s all in terms of robust thread pools for now. I hope you enjoyed it a little. And don’t blame me if the formula isn’t 100% accurate in terms of maximum pool size. As I said, it’s not an exact science, it’s about getting an idea of the ideal pool size.

Reference: “Threading stories: about robust thread pools” from our JCG partner Niklas.

Ilias Tsagklis

Ilias is a software developer turned online entrepreneur. He is co-founder and Executive Editor at Java Code Geeks.
Subscribe
Notify of
guest

This site uses Akismet to reduce spam. Learn how your comment data is processed.

0 Comments
Inline Feedbacks
View all comments
Back to top button