Playing with Java 8 – Lambdas and Concurrency

So Java 8 was released a while back, with a ton of features and changes. All us Java zealots have been waiting for this for ages, all the way back to from when they originally announced all the great features that will be in Java 7, which ended up being pulled.

I have just recently had the time to actually start giving it a real look, I updated my home projects to 8 and I have to say I am generally quite happy with what we got. The java.time API the “mimics” JodaTime is a big improvement, the package is going useful, lambdas are going to change our coding style, which might take a bit of getting used to and with those changes… the quote, “With great power comes great responsibility” rings true, I sense there may be some interesting times in our future, as is quite easy to write some hard to decipher code. As an example debugging the code I wrote below would be “fun”…

The file example is on my Github blog repo

What this example does is simple, run couple threads, do some work concurrently, then wait for them all to complete. I figured while I am playing with Java 8, let me go for it fully…
Here’s what I came up with:


import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;

 * Generified future running and completion
 * @param <T> the result type
 * @param <S> the task input
public class WaitingFuturesRunner<T, S> {
    private transient static final Log logger = LogFactory.getLog(WaitingFuturesRunner.class);
    private final Collection<Task<T, S>> tasks;
    private final long timeOut;
    private final TimeUnit timeUnit;
    private final ExecutorService executor;

     * Constructor, used to initialise with the required tasks
     * @param tasks the list of tasks to execute
     * @param timeOut  max length of time to wait
     * @param timeUnit     time out timeUnit
    public WaitingFuturesRunner(final Collection<Task<T, S>> tasks, final long timeOut, final TimeUnit timeUnit) {
        this.tasks = tasks;
        this.timeOut = timeOut;
        this.timeUnit = timeUnit;
        this.executor = Executors.newFixedThreadPool(tasks.size());

     * Go!
     * @param taskInput          The input to the task
     * @param consolidatedResult a container of all the completed results
    public void go(final S taskInput, final ConsolidatedResult<T> consolidatedResult) {
        final CountDownLatch latch = new CountDownLatch(tasks.size());
        final List<CompletableFuture<T>> theFutures =
                .map(aSearch -> CompletableFuture.supplyAsync(() -> processTask(aSearch, taskInput, latch), executor))

        final CompletableFuture<List<T>> allDone = collectTasks(theFutures);
        try {
            latch.await(timeOut, timeUnit);
            logger.debug("complete... adding results");
        } catch (final InterruptedException | ExecutionException e) {
            logger.error("Thread Error", e);
            throw new RuntimeException("Thread Error, could not complete processing", e);

    private <E> CompletableFuture<List<E>> collectTasks(final List<CompletableFuture<E>> futures) {
        final CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
        return allDoneFuture.thenApply(v ->

    private T processTask(final Task<T, S> task, final S searchTerm, final CountDownLatch latch) {
        logger.debug("Starting: " + task);
        T searchResults = null;
        try {
            searchResults = task.process(searchTerm, latch);
        } catch (final Exception e) {
        return searchResults;




import org.apache.log4j.BasicConfigurator;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

 * Test
 * Created by brian on 4/26/14.
public class CompletableFuturesRunnerTest {

    public static void init() {

     *  5tasks at 3000ms concurrently should not be more than 3100
     * @throws Exception error
    @Test(timeout = 3100)
    public void testGo() throws Exception {
        final List<Task<String, String>> taskList = setupTasks();

        final WaitingFuturesRunner<String, String> completableFuturesRunner = new WaitingFuturesRunner<>(taskList, 4, TimeUnit.SECONDS);
        final StringResults consolidatedResults = new StringResults();

        completableFuturesRunner.go("Something To Process", consolidatedResults);

        Assert.assertEquals(5, consolidatedResults.getResults().size());
        for (final String s : consolidatedResults.getResults()) {
            Assert.assertTrue(s.contains("Something To Process"));


    private List<Task<String, String>> setupTasks() {
        final List<Task<String, String>> taskList = new ArrayList<>();
        final StringInputTask stringInputTask = new StringInputTask("Task 1");
        final StringInputTask stringInputTask2 = new StringInputTask("Task 2");
        final StringInputTask stringInputTask3 = new StringInputTask("Task 3");
        final StringInputTask stringInputTask4 = new StringInputTask("Task 4");
        final StringInputTask stringInputTask5 = new StringInputTask("Task 5");
        return taskList;


0 [pool-1-thread-1] Starting: StringInputTask{taskName='Task 1'}

0 [pool-1-thread-5] Starting: StringInputTask{taskName='Task 5'}

0 [pool-1-thread-2] Starting: StringInputTask{taskName='Task 2'}

2 [pool-1-thread-4] Starting: StringInputTask{taskName='Task 4'}

2 [pool-1-thread-3] Starting: StringInputTask{taskName='Task 3'}

3003 [pool-1-thread-5] Done: Task 5

3004 [pool-1-thread-3] Done: Task 3

3003 [pool-1-thread-1] Done: Task 1

3003 [pool-1-thread-4] Done: Task 4

3003 [pool-1-thread-2] Done: Task 2

3007 [Thread-0] WaitingFuturesRunner  - complete... adding results

Some of the useful articles / links I found and read while doing this:

Oracle: Lambda Tutorial

IBM: Java 8 Concurrency

Tomasz Nurkiewicz : Definitive Guide to CompletableFuture

Related Whitepaper:

Functional Programming in Java: Harnessing the Power of Java 8 Lambda Expressions

Get ready to program in a whole new way!

Functional Programming in Java will help you quickly get on top of the new, essential Java 8 language features and the functional style that will change and improve your code. This short, targeted book will help you make the paradigm shift from the old imperative way to a less error-prone, more elegant, and concise coding style that’s also a breeze to parallelize. You'll explore the syntax and semantics of lambda expressions, method and constructor references, and functional interfaces. You'll design and write applications better using the new standards in Java 8 and the JDK.

Get it Now!  

Leave a Reply

× three = 24

Java Code Geeks and all content copyright © 2010-2014, Exelixis Media Ltd | Terms of Use | Privacy Policy
All trademarks and registered trademarks appearing on Java Code Geeks are the property of their respective owners.
Java is a trademark or registered trademark of Oracle Corporation in the United States and other countries.
Java Code Geeks is not connected to Oracle Corporation and is not sponsored by Oracle Corporation.

Sign up for our Newsletter

20,709 insiders are already enjoying weekly updates and complimentary whitepapers! Join them now to gain exclusive access to the latest news in the Java world, as well as insights about Android, Scala, Groovy and other related technologies.

As an extra bonus, by joining you will get our brand new e-books, published by Java Code Geeks and their JCG partners for your reading pleasure! Enter your info and stay on top of things,

  • Fresh trends
  • Cases and examples
  • Research and insights
  • Two complimentary e-books