Java 8 Parallel Streams – Custom Thread Pools Examples

A brief introduction to custom thread pools and their use with Java 8 parallel streams, with examples of running parallel streams in a custom pool instead of the shared common thread pool.

1. Introduction

In this tutorial, you'll learn how to create custom thread pools in Java 8 for bulk data processing with the parallel streams API.

Parallel streams can work well in concurrent environments: they can improve on the performance of sequential streams, at the cost of multi-threading overhead.

The main focus of this article is one of the biggest limitations of the Stream API, along with examples of how you can use parallel streams with custom thread pools.

2. Java 8 Parallel Streams

First, let us see how to create a parallel stream from a Collection.

To create a stream that can be processed by multiple cores of the processor, you just need to call the parallelStream() method.

package com.javaprogramto.java8.streams.parallel.streams;
 
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
 
public class ParallelStreamCreation {
 
    public static void main(String[] args) {
 
        List<Integer> intList = Arrays.asList(10, 20, 30, 40, 50);
 
        Stream<Integer> parallelStream = intList.parallelStream();
 
        parallelStream.forEach(value -> System.out.println(value));
    }
}

Output:
30
40
50
20
10

You can see from the output that the values are printed in no particular order, because they are processed by different threads. The order may differ on each run.
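If the order matters, one option is forEachOrdered(), which keeps the encounter order of the source even on a parallel stream. The sketch below is only an illustration; the class and method names (OrderedParallelStream, inEncounterOrder) are made up for this example and are not part of the original project.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class OrderedParallelStream {

    // Hypothetical helper: collects the elements of a parallel stream in encounter order.
    static List<Integer> inEncounterOrder(List<Integer> source) {
        List<Integer> seen = new ArrayList<>();
        // forEachOrdered performs the action for one element at a time, in encounter order,
        // so a plain ArrayList is sufficient here.
        source.parallelStream().forEachOrdered(seen::add);
        return seen;
    }

    public static void main(String[] args) {
        List<Integer> intList = Arrays.asList(10, 20, 30, 40, 50);
        System.out.println(inEncounterOrder(intList)); // prints [10, 20, 30, 40, 50]
    }
}
```

Keeping the order limits how much of the work can really run in parallel, so this is best used only when the order is required.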

Internally, it uses the Spliterator and StreamSupport classes to run in parallel.

By default, the processing is done by ForkJoinPool.commonPool(), which is shared by the entire application. If you have lots of parallel streams running at the same time, you may see degraded performance and delays in processing.
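To see the shared pool at work, a small sketch like the following can print the parallelism of the common pool and the names of the threads that process a default parallel stream. The class and method names (CommonPoolInspection, threadNamesUsed) are assumptions made for this illustration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

class CommonPoolInspection {

    // Hypothetical helper: records the name of every thread that processes an element.
    static Set<String> threadNamesUsed() {
        Set<String> names = ConcurrentHashMap.newKeySet(); // thread-safe set
        IntStream.rangeClosed(1, 1000)
                .parallel()
                .forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        // Target parallelism of the pool shared by the whole application
        System.out.println("Common pool parallelism: " + ForkJoinPool.getCommonPoolParallelism());
        // Typically the calling thread plus some ForkJoinPool.commonPool-worker-N threads
        System.out.println("Threads used: " + threadNamesUsed());
    }
}
```

The exact set of thread names varies by machine and by run. The size of the common pool can also be set for the whole JVM with the java.util.concurrent.ForkJoinPool.common.parallelism system property.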

3. Using Custom Thread Pool

As a result, the above approach uses the common ForkJoinPool for all parallel streams.

If you have many parallel streams running at the same time and some of them take longer than expected, for example due to network slowness, those tasks may block threads of the common pool. This slows down the other tasks, which then take longer to complete.

In these cases, it is worth combining parallel streams with a custom thread pool.

Look at the program below: it creates a ForkJoinPool with 5 threads and, inside it, runs a new parallel stream that computes the sum of all numbers in the given range.

package com.javaprogramto.java8.streams.parallel.streams;
 
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
 
public class CustomPoolParallelStreams {
 
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        parallelStreamProcess();
    }
 
    private static void parallelStreamProcess() throws ExecutionException, InterruptedException {
 
        int start = 1;
        int end = 10000;
 
        List<Integer> intList = IntStream.rangeClosed(start, end).boxed()
                .collect(Collectors.toList());
        System.out.println(intList.size());
 
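        ForkJoinPool newCustomThreadPool = new ForkJoinPool(5);
        try {
            // Start the parallel stream from inside a task of the custom pool
            int actualTotal = newCustomThreadPool.submit(
                    () -> intList.stream().parallel().reduce(0, Integer::sum)).get();
            System.out.println("actualTotal " + actualTotal);
        } finally {
            // Release the worker threads once the work is done
            newCustomThreadPool.shutdown();
        }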
 
    }
 
}

Output:
10000
actualTotal 50005000

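Many websites recommend this solution, but the program above is not an efficient or guaranteed one. It depends on an implementation detail: a parallel stream started from inside a ForkJoinPool task forks its subtasks into that pool instead of the common pool. The Stream API does not document this behavior, and in Java 8 the number of subtasks the stream is split into is still derived from the common pool's parallelism, not from the size of the custom pool.

So, if you are running multiple parallel streams, do not rely on this approach as a general fix: streams left on the common pool can still slow each other down and take longer to return their results.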
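One way to check which pool actually does the work is to record the names of the threads that process the elements. The following is a minimal sketch, with the class and method names (CustomPoolThreadCheck, threadNamesInCustomPool) invented for this illustration. On the JDK implementations this was written against, the names are expected to look like ForkJoinPool-1-worker-N, but that is implementation behavior and not something the Stream API guarantees.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

class CustomPoolThreadCheck {

    // Hypothetical helper: runs a parallel stream inside a custom pool
    // and returns the names of the threads that processed its elements.
    static Set<String> threadNamesInCustomPool(int parallelism) {
        Set<String> names = ConcurrentHashMap.newKeySet(); // thread-safe set
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            // join() waits like get(), but reports failures as unchecked exceptions
            pool.submit(() -> IntStream.rangeClosed(1, 10000)
                    .parallel()
                    .forEach(i -> names.add(Thread.currentThread().getName())))
                .join();
        } finally {
            pool.shutdown(); // release the worker threads
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println("Threads used: " + threadNamesInCustomPool(5));
    }
}
```

If a name containing commonPool ever shows up in the result, the stream is not confined to the custom pool on that JDK.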

Here, the pool size is 5, but you can change it to match your CPU configuration. If you have more cores, you can fine-tune the size based on the other tasks running on the machine.
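As a rough sketch of that tuning, the pool size can be derived from the number of available processors instead of being hard-coded. The sizing rule below (all cores minus one) is only an assumption for illustration, and the names PoolSizing, suggestedParallelism and sumInPool are hypothetical.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

class PoolSizing {

    // Assumed rule of thumb: leave one core free for other work, but never go below 1.
    static int suggestedParallelism() {
        return Math.max(1, Runtime.getRuntime().availableProcessors() - 1);
    }

    // Sums the range [start, end] with a parallel stream started inside a pool of the given size.
    static int sumInPool(int start, int end, int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            // join() waits for the result and reports failures as unchecked exceptions
            return pool.submit(() -> IntStream.rangeClosed(start, end).parallel().sum()).join();
        } finally {
            pool.shutdown(); // release the worker threads
        }
    }

    public static void main(String[] args) {
        int parallelism = suggestedParallelism();
        System.out.println("parallelism " + parallelism);
        System.out.println("actualTotal " + sumInPool(1, 10000, parallelism)); // actualTotal 50005000
    }
}
```

The right size depends on what else runs on the machine and on whether the tasks are CPU-bound or spend time waiting, so the value is worth measuring rather than guessing.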

If you have only one parallel stream, you can run it with a limited pool size.

Otherwise, wait for a Java update in which a parallel stream can take a ForkJoinPool as input to limit the number of parallel processes.

4. Conclusion

In this article, you've seen how to create parallel streams with the Java Stream API, and that parallel streams use the common thread pool shared through ForkJoinPool. Because this pool is shared by all other parallel streams, it is good to avoid Stream parallel where streams compete for it; you can limit the number of threads with the second approach. Keep in mind that the second approach also has some disadvantages.

For a cleaner solution, wait for a new parallel stream API from Oracle.

All the code shown in this article is available on GitHub.

You can download the project and run it locally without any errors.


Published on Java Code Geeks with permission by Venkatesh Nukala, partner at our JCG program. See the original article here: Java 8 Parallel Streams – Custom Thread Pools Examples

Opinions expressed by Java Code Geeks contributors are their own.

Venkatesh Nukala

Venkatesh Nukala is a Software Engineer working for an industry-leading online payments company. In his free time, he loves to spend time with family and write articles on technical blogs. More on JavaProgramTo.com