This will be the last post of this series about multithreading. In the second post, we left our Ping Pong game in a good position, with the threads (players) being blocked while waiting for their turn, and with the possibility of scaling up the number of players as desired. However, there was something that wasn't very elegant, the class Game
was in charge of creating, starting and finishing the threads, as well as synchronization with them to avoid the main thread from finishing before the players. Let's recap:
//...
Thread thread2 = new Thread(player2);
thread2.start();
Thread thread1 = new Thread(player1);
thread1.start();
//Let the players play!
try {
Thread.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
//Tell the players to stop
thread1.interrupt();
thread2.interrupt();
//Wait until players finish
try {
thread1.join();
thread2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
//...
Thread management / Thread pools
Apart from being tedious, if we don't encapsulate the creation of threads conveniently, our code won't be very cohesive, because we're coupling the logic of the game itself to the concurrency management. In addition, creating threads is expensive, and in complex applications this will have an important cost in terms of performance.
The Java concurrency API exports a series of classes and interfaces allowing us to encapsulate the thread creation management with high flexibility: the Executor
framework. Its three main elements are:
-
Executor
: it's an interface with a single method,execute(Runnable)
. The idea with this framework is that we handle tasks instead of threads, so we're asking the instance ofExecutor
to run the task (which is an instance ofRunnable
) when possible -
ExecutorService
: interface extendingExecutor
, which publishes a series of more advanced methods, to control the whole lifecycle of the work to be done (shutdown
,awaitTermination
), and to run tasks of the typeCallable
, which are just aRunnable
returning a value (more info here). In the official documentation you can read more about all the possibilities of this interface -
Executors
: the two previous components are interfaces, and we can create our own implementations if we want. However, the most common use cases are already implemented in the JDK. To use these implementations we need to request an instance using the static factory methods exposed byExecutors
In general, the term "Thread Pool" is used to refer to the implementations of Executor
/ExecutorService
that we use to handle our threads. The most common types, which can be obtained invoking the static methods exposed by Executors
are:
- Single Thread Executor (
newSingleThreadExecutor
): it contains a single thread, and it's not widely used - Fixed Thread Pool (
newFixedThreadPool
): it contains a constant number of "alive" threads, which are waiting to receive tasks to run - Cached Thread Pool (
newCachedThreadPoool
): it maintains a thread pool that can grow or shrink depending on the demand - Scheduled Thread Pool (
newScheduledThreadPool
): it's used to scheduled the execution of tasks. It returns an instance of ScheduledExecutorService, given thatExecutorService
doesn't expose the appropriate methods to schedule future tasks, only to execute them as soon as possible
Ping Pong, version 5: Thread Pool
Without modifying the class Player
, we can adapt our class Game
to use a thread pool instead of taking charge itself of the tedious tasks of creating, starting and stopping the threads. Let's see how:
public class Game {
public static void main(String[] args) {
Lock lock = new ReentrantLock();
Player player1 = new Player("ping", lock);
Player player2 = new Player("pong", lock);
player1.setNextPlayer(player2);
player2.setNextPlayer(player1);
System.out.println("Game starting...!");
player1.setPlay(true);
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.execute(player1);
executor.execute(player2);
sleep(2);
executor.shutdownNow();
System.out.println("Game finished!");
}
public static void sleep(long ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
We're using a pool containing two threads (one per player), and we send to it the tasks to run. We sleep the main thread for 2ms, so both players can play for a while, and we invoke the method shutdownNow()
, which is equivalent to interrupting the threads as we did in previous versions, but encapsulating all the logic in the pool. It's necessary to invoke shutdownNow
instead of shutdown
, because the latter waits until the running tasks finish, returning a list with the pending tasks. Our players play indefinitely until they're interrupted, so if we try to finish with shutdown
, the application will never end!
Well, if we run this version several times, we'll see that it works as expected some times, while others it shows an output like this:
Game starting...!
ping
pong
//...
Game finished!
pong
What happened? After requesting both threads to interrupt, it's possible that the main thread finishes before the rest of the threads. This is why the text "Game finished!" appears before the last "pong" turn. Exploring the ExecutorService
API, we can see a method called awaitTermination
. This method blocks the thread who invokes it until all the tasks in the pool have finished, or until a timeout passed by parameter expires:
//...
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.execute(player1);
executor.execute(player2);
sleep(2);
executor.shutdownNow();
try {
executor.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
System.out.println("Main thread interrupted while waiting for players to finish");
}
System.out.println("Game finished!");
//...
We finally get the expected output, and the game behaves as expected, with a cleaner and more readable main class. Have we finished? Not yet!
Barriers
Entry / Exit barriers are synchronization mechanisms facilitating the simultaneous execution of a thread group (entry barrier), or the waiting until a whole thread group finalizes.
We have seen the idea of exit barrier before in this post, with awaitTermination
. This method allows us to create an exit barrier, but it forces us to set up a timeout (it can be hours, but it's a timeout!). We would like to create an exit barrier without a timeout.
In order to understand what an entry barrier is, we're going to add an instruction to Game
:
//...
executor.execute(player1);
sleep(1000);
executor.execute(player2);
//...
We send the main thread to sleep for a second before sending the second player to play. Even though the result is hard to reproduce here, because it's related with the timing of the application, something like this happens:
Game starting...!
ping
// Waiting 1 second
pong
That is, the player "ping" plays, but during a second there isn't anybody to play with! So the game is "suspended" for a second, which could be minutes (the time the main thread takes to start the second thread/player). This situation is not ideal, because we're starting a concurrent process requiring the presence of several threads before all of them are ready. To avoid this we need an entry barrier.
There are several classes in the Concurrency API that can be used as barriers, but the simplest one, and the one we'll use as entry and exit barrier is CountdownLatch
. The usage of this class can be summarized in three points:
- We create a barrier with a counter initialized to N
- The threads that must continue depending on the barrier will invoke
await()
, and will be blocked until the counter reaches zero. There's anawait
method with timeout too - The actors that can influence the opening of the barrier will invoke
countDown()
when necessary conditions are met. In general, N conditions must be met for the barrier to be opened
Version 6: Entry / Exit barriers
In this new version we'll have to modify both Game
and Player
. Let's see how they would look like:
public class Player implements Runnable {
private final String text;
private final Lock lock;
private final Condition myTurn;
private final CountDownLatch entryBarrier;
private final CountDownLatch exitBarrier;
private Condition nextTurn;
private Player nextPlayer;
private volatile boolean play = false;
public Player(String text,
Lock lock,
CountDownLatch entryBarrier,
CountDownLatch exitBarrier) {
this.text = text;
this.lock = lock;
this.myTurn = lock.newCondition();
this.entryBarrier = entryBarrier;
this.exitBarrier = exitBarrier;
}
@Override
public void run() {
if(entryBarrierOpen())
play();
}
public boolean entryBarrierOpen() {
try {
entryBarrier.await();
return true;
} catch (InterruptedException e) {
System.out.println("Player "+text+
" was interrupted before starting Game!");
return false;
}
}
private void play() {
while (!Thread.interrupted()) {
lock.lock();
try {
while (!play)
myTurn.awaitUninterruptibly();
System.out.println(text);
this.play = false;
nextPlayer.play = true;
nextTurn.signal();
} finally {
lock.unlock();
}
}
exitBarrier.countDown();
}
public void setNextPlayer(Player nextPlayer) {
this.nextPlayer = nextPlayer;
this.nextTurn = nextPlayer.myTurn;
}
public void setPlay(boolean play) {
this.play = play;
}
}
The class doesn't start until the entry barrier is open, and when it's interrupted to finish, it invokes countDown
on the exit barrier, which will be how Game
knows that both players are done.
Think for a second the values we have to initialize the counters in entryBarrier
and exitBarrier
before you keep reading...
public class Game {
public static void main(String[] args) {
CountDownLatch entryBarrier = new CountDownLatch(1);
CountDownLatch exitBarrier = new CountDownLatch(2);
Lock lock = new ReentrantLock();
Player player1 = new Player("ping", lock, entryBarrier, exitBarrier);
Player player2 = new Player("pong", lock, entryBarrier, exitBarrier);
player1.setNextPlayer(player2);
player2.setNextPlayer(player1);
System.out.println("Game starting...!");
player1.setPlay(true);
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.execute(player1);
sleep(1000);
executor.execute(player2);
entryBarrier.countDown();
sleep(2);
executor.shutdownNow();
try {
exitBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Game finished!");
}
public static void sleep(long ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Indeed, the entry barrier has a counter set to 1, because it will be opened as soon as the main thread has passed all the tasks to the thread pool, while the exit barrier, which here it's used as an alternative to awaitTermination
, has a counter set to 2, which is the number of actors that must finish the execution before the main thread can progress.
This way, the timing in our application is the desired one, although we have complicated the code a bit. The thing is that concurrency itself is quite complex, so it's hard to encapsulate perfectly all the mechanisms used.
Before finishing the post, I would like to mention that the exit barrier has been added to this version with didactic goals. The best mechanism to wait for the finalization of a group of threads is through awaitTermination
, using a reasonable timeout, so if we reach this timeout it will be because a failure has happened in one of the tasks we're waiting for its termination. I have added a version 7 to GitHub, which uses an entry barrier, and awaitTermination
as exit barrier. This version could be considered the optimal version of the application.
I hope this series of posts has been helpful to clarify many of the concepts used in concurrent applications implemented with Java. If you want to learn more, the best book ever bout this topic is Java Concurrency In Practice by Brian Goetz.
(All the code can be found in this GitHub repository)