Introduction aux pools de threads en Java

1. Introduction

Cet article est un aperçu des pools de threads en Java - en commençant par les différentes implémentations de la bibliothèque Java standard, puis en examinant la bibliothèque Guava de Google.

2. Le pool de threads

En Java, les threads sont mappés sur des threads de niveau système qui sont des ressources du système d'exploitation. Si vous créez des threads de manière incontrôlable, vous risquez de manquer rapidement de ces ressources.

Le changement de contexte entre les threads est également effectué par le système d'exploitation - afin d'émuler le parallélisme. Une vue simpliste est la suivante: plus vous créez de threads, moins chaque thread passe de temps à effectuer un travail réel.

Le modèle Thread Pool permet d'économiser des ressources dans une application multithread et également de contenir le parallélisme dans certaines limites prédéfinies.

Lorsque vous utilisez un pool de threads, vous écrivez votre code simultané sous la forme de tâches parallèles et vous les soumettez pour exécution à une instance d'un pool de threads . Cette instance contrôle plusieurs threads réutilisés pour exécuter ces tâches.

Le modèle vous permet de contrôler le nombre de threads que l'application crée , leur cycle de vie, ainsi que de planifier l'exécution des tâches et de conserver les tâches entrantes dans une file d'attente.

3. Pools de threads en Java

3.1. Executors , Executor et ExecutorService

La classe d'assistance Executors contient plusieurs méthodes pour la création d'instances de pool de threads préconfigurées pour vous. Ces classes sont un bon point de départ - utilisez-les si vous n'avez pas besoin d'appliquer de réglages personnalisés.

Les interfaces Executor et ExecutorService sont utilisées pour travailler avec différentes implémentations de pool de threads en Java. En général, vous devez garder votre code découplé de l'implémentation réelle du pool de threads et utiliser ces interfaces dans toute votre application.

L' interface Executor a une seule méthode d' exécution pour soumettre des instances exécutables pour exécution.

Voici un exemple rapide de la façon dont vous pouvez utiliser l' API Executors pour acquérir une instance Executor sauvegardée par un pool de threads unique et une file d'attente illimitée pour exécuter des tâches de manière séquentielle. Ici, nous exécutons une seule tâche qui imprime simplement « Hello World » à l'écran. La tâche est soumise en tant que lambda (une fonctionnalité Java 8) qui est supposée être exécutable .

Executor executor = Executors.newSingleThreadExecutor(); executor.execute(() -> System.out.println("Hello World"));

L' interface ExecutorService contient un grand nombre de méthodes de contrôle de la progression des tâches et de gestion de l'arrêt du service . À l'aide de cette interface, vous pouvez soumettre les tâches pour exécution et contrôler leur exécution à l'aide de l' instance Future renvoyée .

Dans l'exemple suivant , nous créons un ExecutorService , soumettre une tâche, puis utiliser le retour futur de » get méthode d'attendre jusqu'à ce que la tâche soumise est terminée et la valeur est retournée:

ExecutorService executorService = Executors.newFixedThreadPool(10); Future future = executorService.submit(() -> "Hello World"); // some operations String result = future.get();

Bien sûr, dans un scénario réel, vous ne voulez généralement pas appeler future.get () tout de suite mais différer de l'appeler jusqu'à ce que vous ayez réellement besoin de la valeur du calcul.

La méthode submit est surchargée pour prendre Runnable ou Callable, qui sont toutes deux des interfaces fonctionnelles et peuvent être passées en tant que lambdas (à partir de Java 8).

La méthode unique de Runnable ne lève pas d'exception et ne renvoie pas de valeur. L' interface Callable peut être plus pratique, car elle nous permet de lever une exception et de renvoyer une valeur.

Enfin - pour permettre au compilateur de déduire le type Callable , renvoyez simplement une valeur à partir du lambda.

Pour plus d'exemples d'utilisation de l' interface et des futures ExecutorService , consultez «Un guide sur Java ExecutorService».

3.2. ThreadPoolExecutor

Le ThreadPoolExecutor est une implémentation de pool de threads extensible avec beaucoup de paramètres et crochets pour réglage fin.

Les principaux paramètres de configuration que nous allons discuter ici sont: corePoolSize , maximumPoolSize et KeepAliveTime .

Le pool se compose d'un nombre fixe de threads principaux qui sont conservés à l'intérieur tout le temps, et de quelques threads excessifs qui peuvent être générés puis arrêtés lorsqu'ils ne sont plus nécessaires. Le paramètre corePoolSize est le nombre de threads principaux qui seront instanciés et conservés dans le pool. Lorsqu'une nouvelle tâche arrive, si tous les threads principaux sont occupés et que la file d'attente interne est pleine, alors le pool est autorisé à croître jusqu'à maximumPoolSize .

Le paramètre keepAliveTime est l'intervalle de temps pendant lequel les threads excessifs (instanciés au-delà de corePoolSize ) sont autorisés à exister à l'état inactif. Par défaut, ThreadPoolExecutor ne considère que les threads non principaux pour la suppression. Afin d'appliquer la même politique de suppression aux threads principaux, nous pouvons utiliser la méthode allowCoreThreadTimeOut (true) .

Ces paramètres couvrent un large éventail de cas d'utilisation, mais les configurations les plus courantes sont prédéfinies dans les méthodes statiques Executors .

Par exemple , newFixedThreadPool méthode crée un ThreadPoolExecutor avec une égale corePoolSize et maximumPoolSize valeurs des paramètres et un zéro KeepAliveTime. Cela signifie que le nombre de threads dans ce pool de threads est toujours le même:

ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2); executor.submit(() -> { Thread.sleep(1000); return null; }); executor.submit(() -> { Thread.sleep(1000); return null; }); executor.submit(() -> { Thread.sleep(1000); return null; }); assertEquals(2, executor.getPoolSize()); assertEquals(1, executor.getQueue().size());

Dans l'exemple ci-dessus, nous instancions un ThreadPoolExecutor avec un nombre de threads fixe de 2. Cela signifie que si le nombre de tâches simultanément en cours d'exécution est inférieur ou égal à deux à tout moment, elles sont exécutées immédiatement. Sinon, certaines de ces tâches peuvent être placées dans une file d'attente pour attendre leur tour .

Nous avons créé trois tâches appelables qui imitent le travail lourd en dormant pendant 1000 millisecondes. Les deux premières tâches seront exécutées en même temps et la troisième devra attendre dans la file d'attente. Nous pouvons le vérifier en appelant les méthodes getPoolSize () et getQueue (). Size () immédiatement après avoir soumis les tâches.

Un autre ThreadPoolExecutor préconfiguré peut être créé avec la méthode Executors.newCachedThreadPool () . Cette méthode ne reçoit pas du tout un certain nombre de threads. Le corePoolSize est en fait défini sur 0 et le maximumPoolSize est défini sur Integer.MAX_VALUE pour cette instance. Le keepAliveTime est de 60 secondes pour celui-ci.

Ces valeurs de paramètre signifient que le pool de threads mis en cache peut croître sans limites pour s'adapter à n'importe quel nombre de tâches soumises . Mais lorsque les threads ne sont plus nécessaires, ils seront éliminés après 60 secondes d'inactivité. Un cas d'utilisation typique est celui où vous avez beaucoup de tâches de courte durée dans votre application.

ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool(); executor.submit(() -> { Thread.sleep(1000); return null; }); executor.submit(() -> { Thread.sleep(1000); return null; }); executor.submit(() -> { Thread.sleep(1000); return null; }); assertEquals(3, executor.getPoolSize()); assertEquals(0, executor.getQueue().size());

La taille de la file d'attente dans l'exemple ci-dessus sera toujours égale à zéro car une instance SynchronousQueue est utilisée en interne . Dans une SynchronousQueue , des paires d' opérations d' insertion et de suppression se produisent toujours simultanément, de sorte que la file d'attente ne contient jamais rien.

The Executors.newSingleThreadExecutor() API creates another typical form of ThreadPoolExecutor containing a single thread. The single thread executor is ideal for creating an event loop. The corePoolSize and maximumPoolSize parameters are equal to 1, and the keepAliveTime is zero.

Tasks in the above example will be executed sequentially, so the flag value will be 2 after the task's completion:

AtomicInteger counter = new AtomicInteger(); ExecutorService executor = Executors.newSingleThreadExecutor(); executor.submit(() -> { counter.set(1); }); executor.submit(() -> { counter.compareAndSet(1, 2); });

Additionally, this ThreadPoolExecutor is decorated with an immutable wrapper, so it cannot be reconfigured after creation. Note that also this is the reason we cannot cast it to a ThreadPoolExecutor.

3.3. ScheduledThreadPoolExecutor

The ScheduledThreadPoolExecutor extends the ThreadPoolExecutor class and also implements the ScheduledExecutorService interface with several additional methods:

  • schedule method allows to execute a task once after a specified delay;
  • scheduleAtFixedRate method allows to execute a task after a specified initial delay and then execute it repeatedly with a certain period; the period argument is the time measured between the starting times of the tasks, so the execution rate is fixed;
  • scheduleWithFixedDelay method is similar to scheduleAtFixedRate in that it repeatedly executes the given task, but the specified delay is measured between the end of the previous task and the start of the next; the execution rate may vary depending on the time it takes to execute any given task.

The Executors.newScheduledThreadPool() method is typically used to create a ScheduledThreadPoolExecutor with a given corePoolSize, unbounded maximumPoolSize and zero keepAliveTime. Here's how to schedule a task for execution in 500 milliseconds:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(5); executor.schedule(() -> { System.out.println("Hello World"); }, 500, TimeUnit.MILLISECONDS);

The following code shows how to execute a task after 500 milliseconds delay and then repeat it every 100 milliseconds. After scheduling the task, we wait until it fires three times using the CountDownLatch lock, then cancel it using the Future.cancel() method.

CountDownLatch lock = new CountDownLatch(3); ScheduledExecutorService executor = Executors.newScheduledThreadPool(5); ScheduledFuture future = executor.scheduleAtFixedRate(() -> { System.out.println("Hello World"); lock.countDown(); }, 500, 100, TimeUnit.MILLISECONDS); lock.await(1000, TimeUnit.MILLISECONDS); future.cancel(true);

3.4. ForkJoinPool

ForkJoinPool is the central part of the fork/join framework introduced in Java 7. It solves a common problem of spawning multiple tasks in recursive algorithms. Using a simple ThreadPoolExecutor, you will run out of threads quickly, as every task or subtask requires its own thread to run.

In a fork/join framework, any task can spawn (fork) a number of subtasks and wait for their completion using the join method. The benefit of the fork/join framework is that it does not create a new thread for each task or subtask, implementing the Work Stealing algorithm instead. This framework is thoroughly described in the article “Guide to the Fork/Join Framework in Java”

Let’s look at a simple example of using ForkJoinPool to traverse a tree of nodes and calculate the sum of all leaf values. Here’s a simple implementation of a tree consisting of a node, an int value and a set of child nodes:

static class TreeNode { int value; Set children; TreeNode(int value, TreeNode... children) { this.value = value; this.children = Sets.newHashSet(children); } }

Now if we want to sum all values in a tree in parallel, we need to implement a RecursiveTask interface. Each task receives its own node and adds its value to the sum of values of its children. To calculate the sum of children values, task implementation does the following:

  • streams the children set,
  • maps over this stream, creating a new CountingTask for each element,
  • executes each subtask by forking it,
  • collects the results by calling the join method on each forked task,
  • sums the results using the Collectors.summingInt collector.
public static class CountingTask extends RecursiveTask { private final TreeNode node; public CountingTask(TreeNode node) { this.node = node; } @Override protected Integer compute() { return node.value + node.children.stream() .map(childNode -> new CountingTask(childNode).fork()) .collect(Collectors.summingInt(ForkJoinTask::join)); } }

The code to run the calculation on an actual tree is very simple:

TreeNode tree = new TreeNode(5, new TreeNode(3), new TreeNode(2, new TreeNode(2), new TreeNode(8))); ForkJoinPool forkJoinPool = ForkJoinPool.commonPool(); int sum = forkJoinPool.invoke(new CountingTask(tree));

4. Thread Pool's Implementation in Guava

Guava is a popular Google library of utilities. It has many useful concurrency classes, including several handy implementations of ExecutorService. The implementing classes are not accessible for direct instantiation or subclassing, so the only entry point for creating their instances is the MoreExecutors helper class.

4.1. Adding Guava as a Maven Dependency

Add the following dependency to your Maven pom file to include the Guava library to your project. You can find the latest version of Guava library in the Maven Central repository:

 com.google.guava guava 19.0 

4.2. Direct Executor and Direct Executor Service

Sometimes you want to execute the task either in the current thread or in a thread pool, depending on some conditions. You would prefer to use a single Executor interface and just switch the implementation. Although it is not so hard to come up with an implementation of Executor or ExecutorService that executes the tasks in the current thread, it still requires writing some boilerplate code.

Gladly, Guava provides predefined instances for us.

Here's an example that demonstrates the execution of a task in the same thread. Although the provided task sleeps for 500 milliseconds, it blocks the current thread, and the result is available immediately after the execute call is finished:

Executor executor = MoreExecutors.directExecutor(); AtomicBoolean executed = new AtomicBoolean(); executor.execute(() -> { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } executed.set(true); }); assertTrue(executed.get());

The instance returned by the directExecutor() method is actually a static singleton, so using this method does not provide any overhead on object creation at all.

You should prefer this method to the MoreExecutors.newDirectExecutorService() because that API creates a full-fledged executor service implementation on every call.

4.3. Exiting Executor Services

Another common problem is shutting down the virtual machine while a thread pool is still running its tasks. Even with a cancellation mechanism in place, there is no guarantee that the tasks will behave nicely and stop their work when the executor service shuts down. This may cause JVM to hang indefinitely while the tasks keep doing their work.

To solve this problem, Guava introduces a family of exiting executor services. They are based on daemon threads that terminate together with the JVM.

These services also add a shutdown hook with the Runtime.getRuntime().addShutdownHook() method and prevent the VM from terminating for a configured amount of time before giving up on hung tasks.

In the following example, we're submitting the task that contains an infinite loop, but we use an exiting executor service with a configured time of 100 milliseconds to wait for the tasks upon VM termination. Without the exitingExecutorService in place, this task would cause the VM to hang indefinitely:

ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5); ExecutorService executorService = MoreExecutors.getExitingExecutorService(executor, 100, TimeUnit.MILLISECONDS); executorService.submit(() -> { while (true) { } });

4.4. Listening Decorators

Listening decorators allow you to wrap the ExecutorService and receive ListenableFuture instances upon task submission instead of simple Future instances. The ListenableFuture interface extends Future and has a single additional method addListener. This method allows adding a listener that is called upon future completion.

Vous voudrez rarement utiliser directement la méthode ListenableFuture.addListener () , mais elle est essentielle pour la plupart des méthodes d'assistance de la classe utilitaire Futures . Par exemple, avec la méthode Futures.allAsList () , vous pouvez combiner plusieurs instances ListenableFuture dans un seul ListenableFuture qui se termine après la réussite de tous les futurs combinés:

ExecutorService executorService = Executors.newCachedThreadPool(); ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(executorService); ListenableFuture future1 = listeningExecutorService.submit(() -> "Hello"); ListenableFuture future2 = listeningExecutorService.submit(() -> "World"); String greeting = Futures.allAsList(future1, future2).get() .stream() .collect(Collectors.joining(" ")); assertEquals("Hello World", greeting);

5. Conclusion

Dans cet article, nous avons discuté du modèle Thread Pool et de ses implémentations dans la bibliothèque Java standard et dans la bibliothèque Guava de Google.

Le code source de l'article est disponible à l'adresse over sur GitHub.