Make Life Easy Using Java Parallel Streams

Abdulcelil Cercenazi - Jun 27 '21 - Dev Community

Let's face it, multi-threaded work in Java is HARD 😔

  • You have to declare a class that implements Runnable.
  • Then, override the run method.
  • And, if you want to pass parameters to the thread, you have to add them to the class's constructor.
  • And yes, if you have data you wish to process in chunks, you also have to partition the data yourself.

Let's look at an example. 🤔

Say we have a list of strings that we want to process in chunks, each in a separate thread, to make the job faster.

The process we want to apply ⚒

class Service{
    public void doWork(String s){
        System.out.printf("Do work on %s%n", s);
    }
}

The thread class implementing Runnable ⚙️

class Job implements Runnable{
    private final List<String> data;
    private final Service service;
    public Job(List<String> data) {
        this.data = data;
        this.service = new Service();
    }
    @Override
    public void run() {
        for (String s : data)
            service.doWork(s);
    }
}

And this is the code to make it all happen 🔗

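// imports used by the snippets in this post
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// get the data chunks and start a separate worker thread for each chunk
public void oldWay(){
    List<String> data = getData();
    for(List<String> batch : getDataChunks(data, 1000))
        // Thread.start() runs the job on a new thread;
        // calling run() directly would execute it on the current thread
        new Thread(new Job(batch)).start();
}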

// get the numbers from 0 to 10,000 as Strings
private List<String> getData(){
    return IntStream.
        range(0, 10_000).
        mapToObj(Integer::toString).
        collect(Collectors.toList());
}

private List<List<String>> getDataChunks(List<String> data, int chunkSize){
    List<List<String>> result= new ArrayList<>();
    final AtomicInteger counter = new AtomicInteger();
    for (String s : data) {
        if (counter.getAndIncrement() % chunkSize == 0)
            result.add(new ArrayList<>());
        result.get(result.size() - 1).add(s);
    }
    return result;
}

That Was A Lot Of Manual Work 🤕

It's Time For Us To Meet Parallel Streams. 🥳🥳

👉 What Is It?

According to Oracle:

When a stream executes in parallel, the Java runtime partitions the
stream into multiple substreams. Aggregate operations iterate over and
process these substreams in parallel and then combine the results
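
To make the "combine the results" part concrete, here is a minimal sketch of a parallel reduction. The class name ParallelSumDemo and its helper methods are made up for this illustration; only the stream calls come from the JDK. Each substream produces a partial sum, and the runtime adds the partial sums together, so the parallel and sequential results should match.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical demo class, not part of the original example code.
class ParallelSumDemo {

    // Builds the strings "0", "1", ..., "n-1" (same idea as getData()).
    static List<String> numbersAsStrings(int n) {
        return IntStream.range(0, n)
                .mapToObj(Integer::toString)
                .collect(Collectors.toList());
    }

    // Sums the lengths of all strings. In parallel mode each substream
    // computes a partial sum and the partial sums are then combined.
    static int totalLength(List<String> data, boolean parallel) {
        return (parallel ? data.parallelStream() : data.stream())
                .mapToInt(String::length)
                .sum();
    }

    public static void main(String[] args) {
        List<String> data = numbersAsStrings(10_000);
        System.out.println("sequential: " + totalLength(data, false));
        System.out.println("parallel:   " + totalLength(data, true));
    }
}
```

Because addition is associative, it does not matter how the runtime splits the list; both calls return the same total.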

👉 How Does It Work?

When we call parallelStream on a collection, the runtime splits the work into a number of substreams, and each substream is processed on its own thread.
Each thread handles a portion of the elements in the collection.
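
If you want to see this for yourself, the following sketch records which threads actually touched the elements. The class ParallelThreadsDemo and its method name are invented for this illustration. It stores the thread names in a thread-safe set, because several threads write to it at the same time.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical demo class, not part of the original example code.
class ParallelThreadsDemo {

    // Runs a parallel forEach over the data and records the name of every
    // thread that processed at least one element.
    static Set<String> workerThreadNames(List<String> data) {
        Set<String> names = ConcurrentHashMap.newKeySet(); // thread-safe set
        data.parallelStream()
            .forEach(s -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        List<String> data = IntStream.range(0, 10_000)
                .mapToObj(Integer::toString)
                .collect(Collectors.toList());
        // The exact set of names varies by machine and by run.
        System.out.println(workerThreadNames(data));
    }
}
```

On a multi-core machine you will usually see the calling thread plus several common pool worker threads in the output.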

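👉 How many threads are put to work?

It depends on your configuration.

By default, parallel streams run on the common ForkJoinPool. Its default size is one less than the number of processors available to the JVM, because the thread that calls the stream also takes part in the work.

The default size of the common pool can be changed with this JVM system property:

  • -Djava.util.concurrent.ForkJoinPool.common.parallelism=8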
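
A quick way to check what your own machine uses is to print both numbers. This is only a small sketch, and the class name CommonPoolInfo is made up for this post. Run it once normally and once with the property above passed to the java command to compare the values.

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical demo class, not part of the original example code.
class CommonPoolInfo {

    // Reports the processor count seen by the JVM and the parallelism
    // level of the common pool that parallel streams use by default.
    static String describe() {
        int processors = Runtime.getRuntime().availableProcessors();
        int parallelism = ForkJoinPool.getCommonPoolParallelism();
        return "available processors = " + processors
                + ", common pool parallelism = " + parallelism;
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}
```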

You can also use your own custom thread pool.
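
One commonly used way to do that is to start the parallel stream from inside a task submitted to your own ForkJoinPool. Treat the sketch below with care: it relies on how the current stream implementation behaves rather than on a documented guarantee, and the class CustomPoolDemo and its method are invented for this illustration.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical demo class, not part of the original example code.
class CustomPoolDemo {

    // Runs a parallel forEach inside a dedicated ForkJoinPool and returns
    // the names of the threads that did the work.
    static Set<String> runInCustomPool(List<String> data, int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            Set<String> names = ConcurrentHashMap.newKeySet(); // thread-safe set
            // Assumption: a parallel stream started from a pool task
            // forks its subtasks into that same pool.
            pool.submit(() ->
                data.parallelStream()
                    .forEach(s -> names.add(Thread.currentThread().getName()))
            ).join(); // wait until the whole stream has finished
            return names;
        } finally {
            pool.shutdown(); // release the pool's threads
        }
    }

    public static void main(String[] args) {
        List<String> data = IntStream.range(0, 10_000)
                .mapToObj(Integer::toString)
                .collect(Collectors.toList());
        System.out.println(runInCustomPool(data, 4));
    }
}
```

A separate pool can be handy when you do not want one slow job to occupy the common pool that the rest of the application shares.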

Let's Look At An Example 👀

public void newWay(){
    Service service = new Service();
    // note that we use the getData method and Service class from the previous example
    getData().parallelStream().forEach(service::doWork);
}

Much Better 😍


Code On GitHub 👨🏻‍💻


Check my tutorial on Java Streams 👈
