Multithreading in Java for dummies (part 3)

Raúl Ávila - Jan 9 '17 - - Dev Community

This will be the last post of this series about multithreading. In the second post, we left our Ping Pong game in a good position, with the threads (players) being blocked while waiting for their turn, and with the possibility of scaling up the number of players as desired. However, there was something that wasn't very elegant, the class Game was in charge of creating, starting and finishing the threads, as well as synchronization with them to avoid the main thread from finishing before the players. Let's recap:

//...
Thread thread2 = new Thread(player2);
thread2.start();
Thread thread1 = new Thread(player1);
thread1.start();

//Let the players play!
try {
   Thread.sleep(2);
} catch (InterruptedException e) {
   e.printStackTrace();
}

//Tell the players to stop
thread1.interrupt();
thread2.interrupt();

//Wait until players finish
try {
   thread1.join();
   thread2.join();
} catch (InterruptedException e) {
   e.printStackTrace();
}
//...
Enter fullscreen mode Exit fullscreen mode

Thread management / Thread pools

Apart from being tedious, if we don't encapsulate the creation of threads conveniently, our code won't be very cohesive, because we're coupling the logic of the game itself to the concurrency management. In addition, creating threads is expensive, and in complex applications this will have an important cost in terms of performance.

The Java concurrency API exports a series of classes and interfaces allowing us to encapsulate the thread creation management with high flexibility: the Executor framework. Its three main elements are:

  • Executor: it's an interface with a single method, execute(Runnable). The idea with this framework is that we handle tasks instead of threads, so we're asking the instance of Executor to run the task (which is an instance of Runnable) when possible
  • ExecutorService: interface extending Executor, which publishes a series of more advanced methods, to control the whole lifecycle of the work to be done (shutdown, awaitTermination), and to run tasks of the type Callable, which are just a Runnable returning a value (more info here). In the official documentation you can read more about all the possibilities of this interface
  • Executors: the two previous components are interfaces, and we can create our own implementations if we want. However, the most common use cases are already implemented in the JDK. To use these implementations we need to request an instance using the static factory methods exposed by Executors

In general, the term "Thread Pool" is used to refer to the implementations of Executor/ExecutorService that we use to handle our threads. The most common types, which can be obtained invoking the static methods exposed by Executors are:

  • Single Thread Executor (newSingleThreadExecutor): it contains a single thread, and it's not widely used
  • Fixed Thread Pool (newFixedThreadPool): it contains a constant number of "alive" threads, which are waiting to receive tasks to run
  • Cached Thread Pool (newCachedThreadPoool): it maintains a thread pool that can grow or shrink depending on the demand
  • Scheduled Thread Pool (newScheduledThreadPool): it's used to scheduled the execution of tasks. It returns an instance of ScheduledExecutorService, given that ExecutorService doesn't expose the appropriate methods to schedule future tasks, only to execute them as soon as possible

Ping Pong, version 5: Thread Pool

Without modifying the class Player, we can adapt our class Game to use a thread pool instead of taking charge itself of the tedious tasks of creating, starting and stopping the threads. Let's see how:

public class Game {

    public static void main(String[] args) {
        Lock lock = new ReentrantLock();

        Player player1 = new Player("ping", lock);
        Player player2 = new Player("pong", lock);

        player1.setNextPlayer(player2);
        player2.setNextPlayer(player1);

        System.out.println("Game starting...!");

        player1.setPlay(true);

        ExecutorService executor = Executors.newFixedThreadPool(2);

        executor.execute(player1);
        executor.execute(player2);

        sleep(2);

        executor.shutdownNow();

        System.out.println("Game finished!");
    }

    public static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

We're using a pool containing two threads (one per player), and we send to it the tasks to run. We sleep the main thread for 2ms, so both players can play for a while, and we invoke the method shutdownNow(), which is equivalent to interrupting the threads as we did in previous versions, but encapsulating all the logic in the pool. It's necessary to invoke shutdownNow instead of shutdown, because the latter waits until the running tasks finish, returning a list with the pending tasks. Our players play indefinitely until they're interrupted, so if we try to finish with shutdown, the application will never end!

Well, if we run this version several times, we'll see that it works as expected some times, while others it shows an output like this:

Game starting...!
ping
pong
//...
Game finished!
pong
Enter fullscreen mode Exit fullscreen mode

What happened? After requesting both threads to interrupt, it's possible that the main thread finishes before the rest of the threads. This is why the text "Game finished!" appears before the last "pong" turn. Exploring the ExecutorService API, we can see a method called awaitTermination. This method blocks the thread who invokes it until all the tasks in the pool have finished, or until a timeout passed by parameter expires:

//...
ExecutorService executor = Executors.newFixedThreadPool(2);

executor.execute(player1);
executor.execute(player2);

sleep(2);

executor.shutdownNow();

try {
    executor.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
    System.out.println("Main thread interrupted while waiting for players to finish");
}

System.out.println("Game finished!");
//...
Enter fullscreen mode Exit fullscreen mode

We finally get the expected output, and the game behaves as expected, with a cleaner and more readable main class. Have we finished? Not yet!

Barriers

Entry / Exit barriers are synchronization mechanisms facilitating the simultaneous execution of a thread group (entry barrier), or the waiting until a whole thread group finalizes.

We have seen the idea of exit barrier before in this post, with awaitTermination. This method allows us to create an exit barrier, but it forces us to set up a timeout (it can be hours, but it's a timeout!). We would like to create an exit barrier without a timeout.

In order to understand what an entry barrier is, we're going to add an instruction to Game:

//...
executor.execute(player1);
sleep(1000);
executor.execute(player2);
//...
Enter fullscreen mode Exit fullscreen mode

We send the main thread to sleep for a second before sending the second player to play. Even though the result is hard to reproduce here, because it's related with the timing of the application, something like this happens:

Game starting...!
ping
// Waiting 1 second
pong
Enter fullscreen mode Exit fullscreen mode

That is, the player "ping" plays, but during a second there isn't anybody to play with! So the game is "suspended" for a second, which could be minutes (the time the main thread takes to start the second thread/player). This situation is not ideal, because we're starting a concurrent process requiring the presence of several threads before all of them are ready. To avoid this we need an entry barrier.

There are several classes in the Concurrency API that can be used as barriers, but the simplest one, and the one we'll use as entry and exit barrier is CountdownLatch. The usage of this class can be summarized in three points:

  1. We create a barrier with a counter initialized to N
  2. The threads that must continue depending on the barrier will invoke await(), and will be blocked until the counter reaches zero. There's an await method with timeout too
  3. The actors that can influence the opening of the barrier will invoke countDown() when necessary conditions are met. In general, N conditions must be met for the barrier to be opened

Version 6: Entry / Exit barriers

In this new version we'll have to modify both Game and Player. Let's see how they would look like:

public class Player implements Runnable {

    private final String text;

    private final Lock lock;
    private final Condition myTurn;

    private final CountDownLatch entryBarrier;
    private final CountDownLatch exitBarrier;

    private Condition nextTurn;

    private Player nextPlayer;

    private volatile boolean play = false;

    public Player(String text,
                  Lock lock,
                  CountDownLatch entryBarrier,
                  CountDownLatch exitBarrier) {
        this.text = text;
        this.lock = lock;
        this.myTurn = lock.newCondition();

        this.entryBarrier = entryBarrier;
        this.exitBarrier = exitBarrier;
    }

    @Override
    public void run() {
        if(entryBarrierOpen())
            play();
    }

    public boolean entryBarrierOpen() {
        try {
            entryBarrier.await();
            return true;
        } catch (InterruptedException e) {
            System.out.println("Player "+text+
                                " was interrupted before starting Game!");
            return false;
        }
    }

    private void play() {
        while (!Thread.interrupted()) {
            lock.lock();

            try {
                while (!play)
                    myTurn.awaitUninterruptibly();

                System.out.println(text);

                this.play = false;
                nextPlayer.play = true;

                nextTurn.signal();
            } finally {
                lock.unlock();
            }
        }

        exitBarrier.countDown();
    }

    public void setNextPlayer(Player nextPlayer) {
        this.nextPlayer = nextPlayer;
        this.nextTurn = nextPlayer.myTurn;
    }

    public void setPlay(boolean play) {
        this.play = play;
    }
}
Enter fullscreen mode Exit fullscreen mode

The class doesn't start until the entry barrier is open, and when it's interrupted to finish, it invokes countDown on the exit barrier, which will be how Game knows that both players are done.

Think for a second the values we have to initialize the counters in entryBarrier and exitBarrier before you keep reading...

public class Game {

    public static void main(String[] args) {
        CountDownLatch entryBarrier = new CountDownLatch(1);
        CountDownLatch exitBarrier = new CountDownLatch(2);

        Lock lock = new ReentrantLock();

        Player player1 = new Player("ping", lock, entryBarrier, exitBarrier);
        Player player2 = new Player("pong", lock, entryBarrier, exitBarrier);

        player1.setNextPlayer(player2);
        player2.setNextPlayer(player1);

        System.out.println("Game starting...!");

        player1.setPlay(true);

        ExecutorService executor = Executors.newFixedThreadPool(2);

        executor.execute(player1);

        sleep(1000);

        executor.execute(player2);

        entryBarrier.countDown();

        sleep(2);

        executor.shutdownNow();

        try {
            exitBarrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Game finished!");
    }

    public static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

Indeed, the entry barrier has a counter set to 1, because it will be opened as soon as the main thread has passed all the tasks to the thread pool, while the exit barrier, which here it's used as an alternative to awaitTermination, has a counter set to 2, which is the number of actors that must finish the execution before the main thread can progress.

This way, the timing in our application is the desired one, although we have complicated the code a bit. The thing is that concurrency itself is quite complex, so it's hard to encapsulate perfectly all the mechanisms used.

Before finishing the post, I would like to mention that the exit barrier has been added to this version with didactic goals. The best mechanism to wait for the finalization of a group of threads is through awaitTermination, using a reasonable timeout, so if we reach this timeout it will be because a failure has happened in one of the tasks we're waiting for its termination. I have added a version 7 to GitHub, which uses an entry barrier, and awaitTermination as exit barrier. This version could be considered the optimal version of the application.

I hope this series of posts has been helpful to clarify many of the concepts used in concurrent applications implemented with Java. If you want to learn more, the best book ever bout this topic is Java Concurrency In Practice by Brian Goetz.

(All the code can be found in this GitHub repository)

. . . . . . . . . .