
The java.util.concurrent Package

This article is part of our Academy Course titled Java Concurrency Essentials.


In this course, you will dive into the magic of concurrency. You will be introduced to the fundamentals of concurrency and concurrent code and you will learn about concepts like atomicity, synchronization and thread safety. Check it out here!

1. Introduction

The following chapter introduces the java.util.concurrent package. This package contains a number of classes that provide helpful functionality for implementing multi-threaded applications. After a discussion of how to use the Executor interface and its implementations, the chapter covers concurrent data structures and atomic data types. The final sections shed light on semaphores, count-down latches and cyclic barriers.

2. java.util.concurrent

After having read the previous articles about concurrency and multi-threading, you might have the feeling that it is not always trivial to write robust code that executes well in a multi-threaded environment. There is a proverb that illustrates this (source unknown):

  • Junior programmers think concurrency is hard.
  • Experienced programmers think concurrency is easy.
  • Senior programmers think concurrency is hard.

Therefore a solid library of data structures and classes that provide well-tested thread-safety is of great help for anyone writing programs that make use of concurrency. Luckily the JDK provides a set of ready-to-use data structures and functionality for exactly that purpose. All these classes reside within the package java.util.concurrent.

2.1. Executor

The java.util.concurrent package defines a set of interfaces whose implementations execute tasks. The simplest one of these is the Executor interface:

public interface Executor {
	void execute(Runnable command);
}

Hence an Executor implementation takes the given Runnable instance and executes it. The interface makes no assumptions about how the task is executed; the javadoc only states “Executes the given command at some time in the future.” A simple implementation could therefore be:

public class MyExecutor implements Executor {

	public void execute(Runnable r) {
		(new Thread(r)).start();
	}
}
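Because the contract leaves the threading policy open, an implementation may even run the task synchronously in the caller's thread. The following is a minimal sketch of that idea; the class name DirectExecutorSketch is made up for illustration, and the lambda requires Java 8 or later:

```java
import java.util.concurrent.Executor;

class DirectExecutorSketch implements Executor {

	// Runs the task synchronously in the thread that calls execute().
	@Override
	public void execute(Runnable command) {
		command.run();
	}

	public static void main(String[] args) {
		final String caller = Thread.currentThread().getName();
		new DirectExecutorSketch().execute(() -> {
			// No new thread is started, so this is still the caller's thread.
			System.out.println(caller.equals(Thread.currentThread().getName()));
		});
	}
}
```

Both variants satisfy the interface; which one is appropriate depends on whether the caller may be blocked while the task runs.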

Along with the bare interface, the JDK also ships a full-fledged and extensible implementation named ThreadPoolExecutor. Under the hood the ThreadPoolExecutor maintains a pool of threads and dispatches the Runnable instances passed to the execute() method to that pool. The arguments passed to the constructor control the behavior of the thread pool. The constructor with the most arguments is the following one:

ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

Let’s go through the different arguments step by step:

  • corePoolSize: Determines how many threads the ThreadPoolExecutor starts for incoming tasks before it begins to queue them. Once corePoolSize threads are running, new tasks are put into the queue, and additional threads are only started when the queue is full.
  • maximumPoolSize: This attribute determines how many threads are started at the maximum. You can set this to Integer.MAX_VALUE in order to have no upper boundary.
  • keepAliveTime: When the ThreadPoolExecutor has created more than corePoolSize threads, a thread will be removed from the pool when it idles for the given amount of time.
  • unit: This is just the TimeUnit for the keepAliveTime.
  • workQueue: This queue holds the instances of Runnable given through the execute() method until they are actually started.
  • threadFactory: An implementation of this interface gives you control over the creation of the threads used by the ThreadPoolExecutor.
  • handler: When you specify a fixed size for the workQueue and provide a maximumPoolSize, the ThreadPoolExecutor may not be able to execute your Runnable instance due to saturation. In this case the provided handler is called and lets you decide what should happen with the rejected task.

As there are a lot of parameters to adjust, let’s examine some code that uses them:

public class ThreadPoolExecutorExample implements Runnable {
	private static AtomicInteger counter = new AtomicInteger();
	private final int taskId;

	public int getTaskId() {
		return taskId;
	}

	public ThreadPoolExecutorExample(int taskId) {
		this.taskId = taskId;
	}
	
	public void run() {
		try {
			Thread.sleep(5000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

	public static void main(String[] args) {
		BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(10);
		ThreadFactory threadFactory = new ThreadFactory() {
			public Thread newThread(Runnable r) {
				int currentCount = counter.getAndIncrement();
				System.out.println("Creating new thread: " + currentCount);
				return new Thread(r, "mythread" + currentCount);
			}
		};
		RejectedExecutionHandler rejectedHandler = new RejectedExecutionHandler() {
			public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
				if (r instanceof ThreadPoolExecutorExample) {
					ThreadPoolExecutorExample example = (ThreadPoolExecutorExample) r;
					System.out.println("Rejecting task with id " + example.getTaskId());
				}
			}
		};
		ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.SECONDS, queue, threadFactory, rejectedHandler);
		for (int i = 0; i < 100; i++) {
			executor.execute(new ThreadPoolExecutorExample(i));
		}
		executor.shutdown();
	}
}

Our run() implementation only falls asleep for 5 seconds, but this is not the main focus of this code. The ThreadPoolExecutor starts with 5 core threads and allows the pool to grow up to 10 threads at the maximum. For demonstration purposes we allow an unused thread only to idle for about 1 second. The queue implementation here is a LinkedBlockingQueue with a capacity of 10 Runnable instances. We also implement a simple ThreadFactory in order to track the thread creation. The same is true for the RejectedExecutionHandler.

The loop in the main() method now submits 100 Runnable instances to the pool within a short amount of time. The output of the sample shows that the pool has to create 10 threads (up to the maximum) to handle all pending Runnables:

Creating new thread: 0
...
Creating new thread: 9
Rejecting task with id 20
...
Rejecting task with id 99

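But it also shows that all tasks with a taskId greater than 19 are forwarded to the RejectedExecutionHandler. This is because our Runnable implementation sleeps for 5 seconds, so no thread becomes free while the loop is running. Tasks 0 to 4 start the five core threads, tasks 5 to 14 fill the queue up to its capacity of 10, and tasks 15 to 19 make the pool grow to its maximum of 10 threads. At that point both the pool and the queue are saturated, and all further instances have to be rejected. The queued tasks 5 to 14 are not lost; they are executed as soon as threads become free.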
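The saturation arithmetic can be checked with a small experiment. The sketch below is not part of the original sample: the class SaturationSketch and its method countRejected() are hypothetical names, the tasks block on a latch instead of sleeping so that the result does not depend on timing, and the lambdas require Java 8 or later. A pool accepts at most maximumPoolSize plus the queue capacity tasks while none of them finishes:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SaturationSketch {

	// Submits 'tasks' blocking tasks and returns how many of them were rejected.
	static int countRejected(int core, int max, int queueCapacity, int tasks) throws InterruptedException {
		final AtomicInteger rejected = new AtomicInteger();
		final CountDownLatch release = new CountDownLatch(1);
		ThreadPoolExecutor executor = new ThreadPoolExecutor(core, max, 1, TimeUnit.SECONDS,
				new ArrayBlockingQueue<Runnable>(queueCapacity),
				(r, pool) -> rejected.incrementAndGet());
		for (int i = 0; i < tasks; i++) {
			executor.execute(() -> {
				try {
					release.await(); // keep the worker busy until all tasks are submitted
				} catch (InterruptedException e) {
					Thread.currentThread().interrupt();
				}
			});
		}
		release.countDown(); // let the accepted tasks finish
		executor.shutdown();
		executor.awaitTermination(10, TimeUnit.SECONDS);
		return rejected.get();
	}

	public static void main(String[] args) throws InterruptedException {
		// 10 threads + 10 queue slots = 20 accepted tasks, so 80 of 100 are rejected.
		System.out.println(countRejected(5, 10, 10, 100));
	}
}
```

If dropping tasks is not acceptable, the JDK also ships ready-made handlers such as ThreadPoolExecutor.CallerRunsPolicy, which runs a rejected task in the submitting thread.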

Finally the shutdown() method makes the ThreadPoolExecutor reject all further tasks, while the tasks that have already been submitted are still executed. Note that shutdown() itself returns immediately; if the calling thread has to wait until all tasks have finished, it can call awaitTermination() afterwards. You can replace the call of shutdown() with a call of shutdownNow(). The latter tries to interrupt all running threads, does not start the tasks that are still waiting in the queue and returns them to the caller instead. In the example above you would see ten InterruptedException stack traces, as our ten sleeping threads are woken up immediately.
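The difference between the two shutdown methods can be sketched as follows. The class ShutdownSketch and its method are hypothetical names chosen for this illustration, and the lambda requires Java 8 or later. One task occupies the single worker thread while two more wait in the queue; shutdownNow() interrupts the running task and hands back the two tasks that never started:

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ShutdownSketch {

	// Returns the number of queued tasks that shutdownNow() handed back unstarted.
	static int unstartedAfterShutdownNow() throws InterruptedException {
		ExecutorService executor = Executors.newSingleThreadExecutor();
		final CountDownLatch started = new CountDownLatch(1);
		Runnable sleeper = () -> {
			started.countDown();
			try {
				Thread.sleep(60000);
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt(); // woken up by shutdownNow()
			}
		};
		executor.execute(sleeper); // occupies the single worker thread
		executor.execute(sleeper); // waits in the queue
		executor.execute(sleeper); // waits in the queue
		started.await();           // make sure the first task is running
		List<Runnable> unstarted = executor.shutdownNow();
		// shutdownNow() returns immediately; awaitTermination() does the waiting.
		boolean terminated = executor.awaitTermination(5, TimeUnit.SECONDS);
		return terminated ? unstarted.size() : -1;
	}

	public static void main(String[] args) throws InterruptedException {
		System.out.println(unstartedAfterShutdownNow()); // prints 2
	}
}
```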

2.2. ExecutorService

The Executor interface is very simple; it only forces the underlying implementation to implement the execute() method. The ExecutorService goes one step further: it extends the Executor interface and adds a series of utility methods (e.g. to submit a complete collection of tasks), methods to shut down the thread pool, and the ability to query the implementation for the result of the execution of a task. We have seen that the Runnable interface only defines a run() method whose return type is void. Hence a new interface named Callable had to be introduced. Like Runnable it defines only one method, but this method returns a value and may throw a checked exception:

V call() throws Exception;

But how does the JDK handle the fact that a task returns a value but is submitted to a thread pool for execution?

The submitter of the task cannot know in advance when the task gets executed and how long the execution lasts. Blocking the submitting thread until the result is available would defeat the purpose of the thread pool. Instead, the logic to check whether the result is already available, to block until it is, or to wait only a certain amount of time for it lives in another type: the interface java.util.concurrent.Future<V>. It has only a few methods: to check whether the task is done, to cancel the task, and to retrieve its result.
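How these Future methods play together can be sketched as follows. The class FutureSketch and its method runWithTimeout() are hypothetical names, the sleep stands in for real work, and the lambda requires Java 8 or later. The caller waits a limited time for the result and cancels the task if it takes too long:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureSketch {

	// Runs a task that takes 'taskMillis' and waits at most 'waitMillis' for its result.
	static String runWithTimeout(final long taskMillis, long waitMillis)
			throws InterruptedException, ExecutionException {
		ExecutorService executor = Executors.newSingleThreadExecutor();
		try {
			Callable<Integer> task = () -> {
				Thread.sleep(taskMillis); // stands in for a real computation
				return 42;
			};
			Future<Integer> future = executor.submit(task);
			try {
				return "result " + future.get(waitMillis, TimeUnit.MILLISECONDS);
			} catch (TimeoutException e) {
				future.cancel(true); // true: interrupt the thread that runs the task
				return "cancelled: " + future.isCancelled();
			}
		} finally {
			executor.shutdownNow();
		}
	}

	public static void main(String[] args) throws Exception {
		System.out.println(runWithTimeout(10, 5000));  // result 42
		System.out.println(runWithTimeout(5000, 100)); // cancelled: true
	}
}
```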

Last but not least there is another interface that extends ExecutorService (and thereby also Executor) by methods to schedule a task at a given point in time. It is named ScheduledExecutorService, and its basic schedule() method takes an argument that specifies how long to wait until the task gets executed:

<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

As with the ExecutorService, the schedule() method is available in two variants: one for the Runnable interface and one for tasks that return a value using the Callable interface. The ScheduledExecutorService also provides a method to execute tasks periodically:

ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);

Next to the initial delay we can specify the period at which the task should run.
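A periodic task keeps running until it is cancelled or the scheduler is shut down. The following sketch illustrates this under a few assumptions: the class FixedRateSketch and its method are hypothetical names, the period of 50 ms is arbitrary, and the lambda requires Java 8 or later. The task is cancelled through the returned ScheduledFuture once it has fired a given number of times:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class FixedRateSketch {

	// Runs a periodic task until it has fired at least 'runs' times, then cancels it.
	static int runPeriodically(int runs, long periodMillis) throws InterruptedException {
		ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
		final AtomicInteger executions = new AtomicInteger();
		final CountDownLatch done = new CountDownLatch(runs);
		ScheduledFuture<?> handle = scheduler.scheduleAtFixedRate(() -> {
			executions.incrementAndGet();
			done.countDown();
		}, 0, periodMillis, TimeUnit.MILLISECONDS);
		done.await(10, TimeUnit.SECONDS);
		handle.cancel(false); // false: do not interrupt a run that is in progress
		scheduler.shutdown();
		return executions.get();
	}

	public static void main(String[] args) throws InterruptedException {
		// The task may fire once more before the cancellation takes effect.
		System.out.println(runPeriodically(3, 50) >= 3);
	}
}
```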

The last example has already shown how to create a ThreadPoolExecutor. The implementation of the ScheduledExecutorService is named ScheduledThreadPoolExecutor and is used in much the same way as the ThreadPoolExecutor above. But often full control over all features of an ExecutorService is not necessary. Just imagine a simple test client that should invoke some server methods using a simple thread pool.

The creators of the JDK have therefore created a simple factory class named Executors (please mind the trailing s). This class provides a handful of static methods that create a ready-to-use thread pool. All of this together lets us implement a simple thread pool that executes a bunch of tasks that compute some number (for demonstration purposes the number crunching is replaced by a simple Thread.sleep()):

public class ExecutorsExample implements Callable<Integer> {
	private static Random random = new Random(System.currentTimeMillis());

	public Integer call() throws Exception {
		Thread.sleep(1000);
		return random.nextInt(100);
	}

	public static void main(String[] args) throws InterruptedException, ExecutionException {
		ExecutorService executorService = Executors.newFixedThreadPool(5);
		Future<Integer>[] futures = new Future[5];
		for (int i = 0; i < futures.length; i++) {
			futures[i] = executorService.submit(new ExecutorsExample());
		}
		for (int i = 0; i < futures.length; i++) {
			Integer retVal = futures[i].get();
			System.out.println(retVal);
		}
		executorService.shutdown();
	}
}

The creation of the ExecutorService is a one-liner. To execute some tasks we just need a for-loop that creates a few new instances of ExecutorsExample and stores the returned Future instances in an array. After we have submitted the tasks to the service, we just wait for the results. The get() method of Future is blocking, i.e. the current thread waits until the result is available. An overloaded version of this method takes a timeout in order to let the waiting thread proceed if the task does not finish within the defined time period.
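Instead of submitting the tasks one by one, a whole collection of Callable instances can be handed over with invokeAll(), which blocks until all of them have completed. A minimal sketch, assuming Java 8 or later for the lambda; the class InvokeAllSketch and the squaring task are made up for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeAllSketch {

	// Squares the numbers 1..n in parallel and returns the sum of the squares.
	static int sumOfSquares(int n) throws InterruptedException, ExecutionException {
		ExecutorService executorService = Executors.newFixedThreadPool(4);
		try {
			List<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
			for (int i = 1; i <= n; i++) {
				final int value = i;
				tasks.add(() -> value * value);
			}
			int sum = 0;
			// invokeAll() returns one Future per task, in the order of the task list.
			for (Future<Integer> future : executorService.invokeAll(tasks)) {
				sum += future.get();
			}
			return sum;
		} finally {
			executorService.shutdown();
		}
	}

	public static void main(String[] args) throws Exception {
		System.out.println(sumOfSquares(5)); // prints 55
	}
}
```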

2.3. Concurrent collections

The Java collections framework encompasses a wide range of data structures that every Java programmer uses in day-to-day work. It is extended by the data structures within the java.util.concurrent package. These implementations provide thread-safe collections to be used within a multi-threaded environment.

Many Java programmers use thread-safe data structures from time to time without even knowing it. The “old” classes Hashtable and Vector are examples of such classes. Being part of the JDK since version 1.0, these basic data structures were designed with thread-safety in mind, although thread-safety here only means that all methods are synchronized on instance level. The following code is taken from the Hashtable class of Oracle’s JDK implementation:

public synchronized void clear() {
	Entry tab[] = table;
	modCount++;
	for (int index = tab.length; --index >= 0; )
		tab[index] = null;
	count = 0;
}

This is a crucial difference from the “newer” collection classes like HashMap or ArrayList (both available since JDK 1.2), which themselves are not thread-safe. But there is a convenient way to retrieve a thread-safe instance of such a “newer” collection class:

HashMap<Long,String> map = new HashMap<Long, String>();
Map<Long, String> synchronizedMap = Collections.synchronizedMap(map);

As we see in the code above, the Collections class lets us create at runtime a synchronized view of a formerly unsynchronized collection class.
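One caveat is worth a short sketch: the wrapper synchronizes each single method call, but a sequence of calls such as get() followed by put() is not atomic as a whole. The example below is an illustration with made-up names (SynchronizedMapSketch, the key "hits") and uses a Java 8 lambda; it locks the wrapper itself around the compound operation, which is the same lock the javadoc asks for when iterating over a synchronized collection:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SynchronizedMapSketch {

	// Lets 'threads' threads increment the same entry 'increments' times each.
	static int countConcurrently(int threads, final int increments) throws InterruptedException {
		final Map<String, Integer> map = Collections.synchronizedMap(new HashMap<String, Integer>());
		map.put("hits", 0);
		ExecutorService executorService = Executors.newFixedThreadPool(threads);
		for (int t = 0; t < threads; t++) {
			executorService.execute(() -> {
				for (int i = 0; i < increments; i++) {
					// get() and put() are each synchronized, but the pair is not:
					// hold the wrapper's lock to make the read-modify-write atomic.
					synchronized (map) {
						map.put("hits", map.get("hits") + 1);
					}
				}
			});
		}
		executorService.shutdown();
		executorService.awaitTermination(1, TimeUnit.MINUTES);
		return map.get("hits");
	}

	public static void main(String[] args) throws InterruptedException {
		System.out.println(countConcurrently(4, 1000)); // prints 4000
	}
}
```

Without the synchronized block two threads could read the same old value and one of the increments would be lost.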

As we have learned before, adding the keyword synchronized to a method has the effect that at each point in time only one thread can execute a synchronized method of the object in question. This is of course the easiest way to make a simple collection class thread-safe. More advanced techniques encompass special algorithms that are designed for concurrent access. These algorithms are implemented in the collection classes of the java.util.concurrent package.

An example for such a class is ConcurrentHashMap:

ConcurrentHashMap<Long,String> map = new ConcurrentHashMap<Long,String>();
map.put(key, value);
String value2 = map.get(key);

The code above looks nearly the same as for a normal HashMap, but the underlying implementation is completely different. Instead of using only one lock for the whole table, the ConcurrentHashMap subdivides the table into many small partitions, each with its own lock (up to Java 7 these partitions are called segments; since Java 8 the locking granularity is the individual bucket). Hence write operations to this map from different threads, assuming they write to different partitions of the table, do not compete for the same lock.

Read operations generally do not lock at all, which reduces their waiting time; a write only becomes visible to readers once it has completed. This slightly changes the semantics of the read operation, as it returns the result of the latest write operation that has finished. It also means that the map can change while a read is in progress, so the number of entries may differ directly before and after the read, which cannot happen when every method is synchronized on the same lock. For concurrent applications this is not always important. The same is true for the iterators of the ConcurrentHashMap: they are weakly consistent, which means they never throw a ConcurrentModificationException and may or may not reflect modifications made after the iterator was created.
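Two consequences of this design can be sketched in a few lines. The class ConcurrentHashMapSketch and its methods are hypothetical names, and merge() as well as the lambda require Java 8 or later. The first method performs a read-modify-write atomically without any external lock, the second one removes entries while iterating over the same map:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ConcurrentHashMapSketch {

	// Lets 'threads' threads increment the same entry 'increments' times each.
	static int countConcurrently(int threads, final int increments) throws InterruptedException {
		final ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<String, Integer>();
		ExecutorService executorService = Executors.newFixedThreadPool(threads);
		for (int t = 0; t < threads; t++) {
			executorService.execute(() -> {
				for (int i = 0; i < increments; i++) {
					// merge() inserts 1 or adds 1 to the existing value as one atomic step.
					map.merge("hits", 1, Integer::sum);
				}
			});
		}
		executorService.shutdown();
		executorService.awaitTermination(1, TimeUnit.MINUTES);
		return map.get("hits");
	}

	// Removes all odd keys while iterating; a HashMap would throw ConcurrentModificationException.
	static int removeOddKeys(int n) {
		ConcurrentHashMap<Integer, String> map = new ConcurrentHashMap<Integer, String>();
		for (int i = 0; i < n; i++) {
			map.put(i, String.valueOf(i));
		}
		for (Integer key : map.keySet()) {
			if (key % 2 != 0) {
				map.remove(key);
			}
		}
		return map.size();
	}

	public static void main(String[] args) throws InterruptedException {
		System.out.println(countConcurrently(4, 1000)); // prints 4000
		System.out.println(removeOddKeys(10));          // prints 5
	}
}
```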

To get a better feeling for the different performance of Hashtable, synchronized HashMap and ConcurrentHashMap, let’s implement a simple performance test. The following code starts a few threads and lets each thread retrieve a value from the map at a random position and afterwards updates a value at another random position:

public class MapComparison implements Runnable {
	private static Map<Integer, String> map;
	private Random random = new Random(System.currentTimeMillis());

	public static void main(String[] args) throws InterruptedException {
		runPerfTest(new Hashtable<Integer, String>());
		runPerfTest(Collections.synchronizedMap(new HashMap<Integer,String>()));
		runPerfTest(new ConcurrentHashMap<Integer, String>());
		runPerfTest(new ConcurrentSkipListMap<Integer, String>());
	}

	private static void runPerfTest(Map<Integer, String> map) throws InterruptedException {
		MapComparison.map = map;
		fillMap(map);
		ExecutorService executorService = Executors.newFixedThreadPool(10);
		long startMillis = System.currentTimeMillis();
		for (int i = 0; i < 10; i++) {
			executorService.execute(new MapComparison());
		}
		executorService.shutdown();
		executorService.awaitTermination(1, TimeUnit.MINUTES);
		System.out.println(map.getClass().getSimpleName() + " took " + (System.currentTimeMillis() - startMillis) + " ms");
	}

	private static void fillMap(Map<Integer, String> map) {
		for (int i = 0; i < 100; i++) {
			map.put(i, String.valueOf(i));
		}
	}

	public void run() {
		for (int i = 0; i < 100000; i++) {
			int randomInt = random.nextInt(100);
			map.get(randomInt);
			randomInt = random.nextInt(100);
			map.put(randomInt, String.valueOf(randomInt));
		}
	}
}

The output of this program is the following:

Hashtable took 436 ms
SynchronizedMap took 433 ms
ConcurrentHashMap took 75 ms
ConcurrentSkipListMap took 89 ms

As expected, the Hashtable and the synchronized HashMap implementations are far behind the ones from the concurrent package. This example also introduces ConcurrentSkipListMap, a concurrent map that is not based on a hash table with buckets but on a skip list. A skip list keeps its items sorted and links them on different levels. The highest-level pointers skip over large parts of the list and point directly to some item further down the list. If this item is already greater than the item searched for, the search has to continue on the next lower level of linkage, which skips fewer elements than the higher level. A detailed description of skip lists can be found here. The interesting point about skip lists is that lookups, insertions and removals take about log(n) time on average, and that the keys are always kept in sorted order.
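The sorted order is what distinguishes this map from the hash-based ones in day-to-day use. A short sketch with made-up keys and values (the class name SkipListMapSketch is hypothetical) shows the navigation methods that become available:

```java
import java.util.concurrent.ConcurrentSkipListMap;

class SkipListMapSketch {

	// Builds a small map; the insertion order is deliberately unsorted.
	static ConcurrentSkipListMap<Integer, String> sample() {
		ConcurrentSkipListMap<Integer, String> map = new ConcurrentSkipListMap<Integer, String>();
		for (int key : new int[] { 40, 10, 30, 20 }) {
			map.put(key, "value" + key);
		}
		return map;
	}

	public static void main(String[] args) {
		ConcurrentSkipListMap<Integer, String> map = sample();
		System.out.println(map.keySet());       // [10, 20, 30, 40]
		System.out.println(map.firstKey());     // 10
		System.out.println(map.ceilingKey(25)); // 30, the smallest key >= 25
		System.out.println(map.headMap(30));    // {10=value10, 20=value20}
	}
}
```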


 

2.4. Atomic Variables

When multiple threads share a single variable, access to this variable has to be synchronized. The reason for this is that even a simple instruction like i++ is not atomic. For a static field i it basically consists of the following bytecode instructions (the constant pool index is replaced by the field name here):

getstatic i
iconst_1
iadd
putstatic i

Without knowing too much about Java bytecode, one sees that the current value of the field is pushed onto the operand stack, that the constant value 1 is pushed and added to it, and that the result is afterwards popped from the stack and stored back into the field. This means an increment consists of a separate read, add and write step instead of one atomic operation. In a multi-threaded environment this also means that the scheduler can suspend the current thread between any two of these instructions and run another thread, which in turn can work on the same variable, so that one of the updates may get lost.

To cope with situations like this you can of course synchronize the access to this specific variable, for example with the help of a dedicated lock object (a primitive int cannot serve as a lock itself):

synchronized(lock) {
	i++;
}

But this also means the current thread has to acquire the lock, which needs some internal synchronization and computation within the JVM. This approach is also called pessimistic locking, as we assume that it is highly probable that another thread currently holds the lock we want to acquire. A different approach, called optimistic locking, assumes that there are not so many threads competing for the resource, and hence we just try to update the value and see if this has worked. One implementation of this approach is the compare-and-swap (CAS) method. This operation is implemented on many modern CPUs as an atomic operation. It compares the content of a given memory location with a given value (the “expected value”) and updates it to a new value if the current value equals the expected value. In pseudo code this looks like:

int currentValue = getValueAtMemoryPosition(pos);
if(currentValue == expectedValue) {
	setValueAtMemoryPosition(pos, newValue);
}

The CAS operation implements the code above as one atomic operation. Therefore it can be used to check whether some variable still has the value the current thread read earlier, and to update it to the incremented value only in this case. As the CAS operation needs hardware support, the JDK provides special classes that give access to it. They all reside within the package java.util.concurrent.atomic.

One representative of these classes is java.util.concurrent.atomic.AtomicInteger. The CAS operation discussed above is implemented by the method

	boolean compareAndSet(int expect, int update)

The boolean return value indicates whether the update operation was successful. On the basis of this functionality further operations like an atomic increment can be implemented (here taken from an older version of Oracle’s JDK implementation):

    public final int getAndIncrement() {
        for (;;) {
            int current = get();
            int next = current + 1;
            if (compareAndSet(current, next))
                return current;
        }
    }
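The same retry pattern works for updates that AtomicInteger does not offer out of the box in older Java versions. As an illustration, the following sketch keeps track of a maximum value; the class AtomicMaxSketch and its methods are made-up names, and the lambda requires Java 8 or later:

```java
import java.util.concurrent.atomic.AtomicInteger;

class AtomicMaxSketch {
	private final AtomicInteger max = new AtomicInteger(Integer.MIN_VALUE);

	// Raises the stored maximum to 'candidate' if it is larger.
	void update(int candidate) {
		for (;;) {
			int current = max.get();
			if (candidate <= current) {
				return; // the stored value is already at least as large
			}
			if (max.compareAndSet(current, candidate)) {
				return; // our write won
			}
			// CAS failed: another thread changed the value, so re-read and try again
		}
	}

	int get() {
		return max.get();
	}

	public static void main(String[] args) throws InterruptedException {
		final AtomicMaxSketch tracker = new AtomicMaxSketch();
		Thread[] threads = new Thread[4];
		for (int t = 0; t < threads.length; t++) {
			final int offset = t * 1000;
			threads[t] = new Thread(() -> {
				for (int i = 0; i < 1000; i++) {
					tracker.update(offset + i);
				}
			});
			threads[t].start();
		}
		for (Thread thread : threads) {
			thread.join();
		}
		System.out.println(tracker.get()); // prints 3999
	}
}
```

Since Java 8 the same effect can be achieved with accumulateAndGet(candidate, Math::max), which performs such a retry loop internally.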

Now we are able to increment an integer variable by different threads without using pessimistic locks:

public class AtomicIntegerExample implements Runnable {
	private static final AtomicInteger atomicInteger = new AtomicInteger();

	public static void main(String[] args) {
		ExecutorService executorService = Executors.newFixedThreadPool(5);
		for (int i = 0; i < 5; i++) {
			executorService.execute(new AtomicIntegerExample());
		}
		executorService.shutdown();
	}

	public void run() {
		for (int i = 0; i < 10; i++) {
			int newValue = atomicInteger.getAndIncrement();
			if (newValue == 42) {
				System.out.println("[" + Thread.currentThread().getName() + "]: " + newValue);
			}
		}
	}
}

The code above starts five threads and lets each of them increment the AtomicInteger variable ten times, so the values 0 to 49 are handed out. The lucky thread that gets the answer 42 prints this to the console. No matter how often you run this example, the output is created by exactly one thread, because getAndIncrement() never returns the same value twice.

Next to AtomicInteger the JDK also offers classes for atomic operations on long values, integer and long arrays and references.
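A brief sketch of three of these classes follows; the class OtherAtomicsSketch, its methods and the state names are made up for illustration. Note that AtomicReference compares references with ==, which works here because identical string literals refer to the same object:

```java
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

class OtherAtomicsSketch {

	// Performs one successful and one failed compare-and-set on a reference.
	static String transitions() {
		AtomicReference<String> state = new AtomicReference<String>("NEW");
		boolean first = state.compareAndSet("NEW", "RUNNING"); // succeeds
		boolean second = state.compareAndSet("NEW", "DONE");   // fails: state is no longer "NEW"
		return first + " " + second + " " + state.get();
	}

	// Atomically increments a single element of an int array.
	static String incrementSlot(int length, int index) {
		AtomicIntegerArray slots = new AtomicIntegerArray(length);
		slots.incrementAndGet(index);
		return slots.toString();
	}

	public static void main(String[] args) {
		AtomicLong total = new AtomicLong();
		total.addAndGet(5000000000L);            // a value beyond the int range
		System.out.println(total.get());         // 5000000000
		System.out.println(incrementSlot(3, 1)); // [0, 1, 0]
		System.out.println(transitions());       // true false RUNNING
	}
}
```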

2.5. Semaphore

Semaphores are used to control access to a shared resource. In contrast to simple synchronized blocks, a semaphore has an internal counter of available permits that is decreased each time a thread acquires a permit and increased each time a thread releases a permit it obtained before. When no permit is available, acquire() blocks until another thread releases one. The decreasing and increasing operations are of course thread-safe, hence a semaphore can be used to control how many threads pass simultaneously through a critical section. The two basic operations are:

void acquire() throws InterruptedException;
void release();

Next to the number of permits, the constructor takes a fairness parameter. The fairness parameter decides whether threads that try to acquire a permit are served in the order in which they arrived (true), or whether a newly arriving thread may overtake threads that are already waiting (false). Serving the threads in arrival order guarantees that every thread acquires a permit after some time, and hence no thread starves.

Semaphore(int permits, boolean fair)

To illustrate the described behavior, let’s set up a simple thread pool with five threads, but use a semaphore to ensure that at each point in time no more than three of them are running inside the critical section:

public class SemaphoreExample implements Runnable {
	private static final Semaphore semaphore = new Semaphore(3, true);
	private static final AtomicInteger counter = new AtomicInteger();
	private static final long endMillis = System.currentTimeMillis() + 10000;

	public static void main(String[] args) {
		ExecutorService executorService = Executors.newFixedThreadPool(5);
		for (int i = 0; i < 5; i++) {
			executorService.execute(new SemaphoreExample());
		}
		executorService.shutdown();
	}

	public void run() {
		while (System.currentTimeMillis() < endMillis) {
			try {
				semaphore.acquire();
			} catch (InterruptedException e) {
				System.out.println("[" + Thread.currentThread().getName() + "] Interrupted in acquire().");
				// No permit was acquired, so we must not release one: restore the flag and stop.
				Thread.currentThread().interrupt();
				return;
			}
			try {
				int counterValue = counter.incrementAndGet();
				System.out.println("[" + Thread.currentThread().getName() + "] semaphore acquired: " + counterValue);
				if (counterValue > 3) {
					throw new IllegalStateException("More than three threads acquired the lock.");
				}
				counter.decrementAndGet();
			} finally {
				// Always hand the permit back, even if the check above fails.
				semaphore.release();
			}
		}
	}
}

The Semaphore is constructed by passing 3 as the number of concurrent permits. While waiting for a permit, the blocked thread may be interrupted, which results in an InterruptedException that has to be caught. Alternatively one could also call the utility method acquireUninterruptibly() to circumvent the try-catch construct.

To ensure that no more than three threads are within the critical section at the same time, we use an AtomicInteger that gets incremented each time a thread enters the section and decremented before it leaves the section. When the counter has a value higher than three, an IllegalStateException is thrown. Finally we release() the semaphore and let another waiting thread enter the critical section.
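When a thread should not wait indefinitely for a permit, tryAcquire() with a timeout is an alternative to acquire(). The following single-threaded sketch only illustrates the bookkeeping of permits; the class TryAcquireSketch, its method grab() and the timeout of 50 ms are assumptions made for this example:

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class TryAcquireSketch {

	// Requests 'requests' permits from a semaphore holding 'permits'; returns how many were granted.
	static int grab(int permits, int requests) throws InterruptedException {
		Semaphore semaphore = new Semaphore(permits, true);
		int granted = 0;
		for (int i = 0; i < requests; i++) {
			// Wait at most 50 ms for a permit instead of blocking indefinitely.
			if (semaphore.tryAcquire(50, TimeUnit.MILLISECONDS)) {
				granted++;
			}
		}
		semaphore.release(granted); // hand back every permit we took
		return semaphore.availablePermits() == permits ? granted : -1;
	}

	public static void main(String[] args) throws InterruptedException {
		System.out.println(grab(3, 5)); // prints 3
	}
}
```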

2.6. CountDownLatch

The CountDownLatch class is another helpful class for thread synchronization from the JDK. Similar to the Semaphore class it provides a counter, but the counter of the CountDownLatch can only be decreased until it reaches zero. Once the counter has reached zero all threads waiting on the CountDownLatch can proceed. Such functionality is often necessary when all threads of a pool have to synchronize at some point in order to proceed. A simple example would be an application that has to gather data from different sources before being able to store a new data set to the database.

The following code demonstrates how five threads sleep for a random amount of time. Each thread that wakes up counts down the latch and then waits for the latch to reach zero. Finally all threads output that they have finished.

public class CountDownLatchExample implements Runnable {
	private static final int NUMBER_OF_THREADS = 5;
	private static final CountDownLatch latch = new CountDownLatch(NUMBER_OF_THREADS);
	private static Random random = new Random(System.currentTimeMillis());

	public static void main(String[] args) {
		ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_OF_THREADS);
		for (int i = 0; i < NUMBER_OF_THREADS; i++) {
			executorService.execute(new CountDownLatchExample());
		}
		executorService.shutdown();
	}

	public void run() {
		try {
			int randomSleepTime = random.nextInt(20000);
			System.out.println("[" + Thread.currentThread().getName() + "] Sleeping for " + randomSleepTime);
			Thread.sleep(randomSleepTime);
			latch.countDown();
			System.out.println("[" + Thread.currentThread().getName() + "] Waiting for latch.");
			latch.await();
			System.out.println("[" + Thread.currentThread().getName() + "] Finished.");
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

When running this example you will see that the output “Waiting for latch.” comes at different points in time but that the “Finished.” message of each thread is printed immediately one after the other.
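In the data gathering scenario mentioned above it is typically a single coordinating thread that waits, while the workers only count down. A minimal sketch of this variant follows; the class GatherSketch is a hypothetical name, the addition stands in for fetching real data, and the lambda requires Java 8 or later:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class GatherSketch {

	// Simulates 'sources' data sources and returns the combined result once all have reported.
	static int gather(int sources) throws InterruptedException {
		final CountDownLatch latch = new CountDownLatch(sources);
		final AtomicInteger total = new AtomicInteger();
		ExecutorService executorService = Executors.newFixedThreadPool(sources);
		for (int i = 1; i <= sources; i++) {
			final int value = i;
			executorService.execute(() -> {
				total.addAndGet(value); // stands in for fetching data from a source
				latch.countDown();
			});
		}
		// Only the coordinating thread waits; it gives up after ten seconds.
		boolean complete = latch.await(10, TimeUnit.SECONDS);
		executorService.shutdown();
		return complete ? total.get() : -1;
	}

	public static void main(String[] args) throws InterruptedException {
		System.out.println(gather(5)); // prints 15
	}
}
```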

2.7. CyclicBarrier

In contrast to the CountDownLatch, the CyclicBarrier class implements a counter that is reset after it has been counted down to zero. Each thread calls the barrier’s await() method and blocks there until the configured number of threads is waiting, i.e. until the internal counter has reached zero. The waiting threads are then woken up and can proceed. Internally the counter is reset to its original value and the whole procedure can start again:

public class CyclicBarrierExample implements Runnable {
	private static final int NUMBER_OF_THREADS = 5;
	private static AtomicInteger counter = new AtomicInteger();
	private static Random random = new Random(System.currentTimeMillis());
	private static final CyclicBarrier barrier = new CyclicBarrier(NUMBER_OF_THREADS, new Runnable() {
		public void run() {
			counter.incrementAndGet();
		}
	});

	public static void main(String[] args) {
		ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_OF_THREADS);
		for (int i = 0; i < NUMBER_OF_THREADS; i++) {
			executorService.execute(new CyclicBarrierExample());
		}
		executorService.shutdown();
	}

	public void run() {
		try {
			while(counter.get() < 3) {
				int randomSleepTime = random.nextInt(10000);
				System.out.println("[" + Thread.currentThread().getName() + "] Sleeping for " + randomSleepTime);
				Thread.sleep(randomSleepTime);
				System.out.println("[" + Thread.currentThread().getName() + "] Waiting for barrier.");
				barrier.await();
				System.out.println("[" + Thread.currentThread().getName() + "] Finished.");
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

The example above is very similar to the CountDownLatch example, but in contrast to it, I have added a while loop to the run() method. This run() implementation lets each thread repeat the sleep and await() procedure until the counter is three. Also note the anonymous Runnable implementation provided to the constructor of CyclicBarrier. Its run() method gets executed each time the barrier is tripped. Here we increase the counter that is checked by the concurrent threads.
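The reuse of the barrier can also be shown without any sleeping. In the following sketch, which uses the hypothetical class BarrierRoundsSketch and Java 8 lambdas, a fixed number of threads passes the same barrier several times and the barrier action counts how often it was tripped:

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierRoundsSketch {

	// Lets 'parties' threads pass the same barrier 'rounds' times; returns how often it tripped.
	static int run(int parties, final int rounds) throws InterruptedException {
		final AtomicInteger trips = new AtomicInteger();
		// The barrier action runs once per trip, before the waiting threads are released.
		final CyclicBarrier barrier = new CyclicBarrier(parties, () -> trips.incrementAndGet());
		Thread[] threads = new Thread[parties];
		for (int t = 0; t < parties; t++) {
			threads[t] = new Thread(() -> {
				try {
					for (int round = 0; round < rounds; round++) {
						barrier.await(); // the barrier resets itself after each trip
					}
				} catch (InterruptedException | BrokenBarrierException e) {
					Thread.currentThread().interrupt();
				}
			});
			threads[t].start();
		}
		for (Thread thread : threads) {
			thread.join();
		}
		return trips.get();
	}

	public static void main(String[] args) throws InterruptedException {
		System.out.println(run(3, 4)); // prints 4
	}
}
```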

3. Download the source code

You can download the source code of this lesson: concurrency-4.zip

Martin Mois

Martin is a Java EE enthusiast and works for an international operating company. He is interested in clean code and the software craftsmanship approach. He also strongly believes in automated testing and continuous integration.