Vue d'ensemble de java.util.concurrent

1. Vue d'ensemble

Le package java.util.concurrent fournit des outils pour créer des applications simultanées.

Dans cet article, nous ferons un aperçu de l'ensemble du package.

2. Principaux composants

Le java.util.concurrent contient beaucoup trop de fonctionnalités à discuter en une seule rédaction. Dans cet article, nous nous concentrerons principalement sur certains des utilitaires les plus utiles de ce package, tels que:

  • Exécuteur
  • ExecutorService
  • ScheduledExecutorService
  • Avenir
  • CountDownLatch
  • CyclicBarrier
  • Sémaphore
  • ThreadFactory
  • BlockingQueue
  • DelayQueue
  • Serrures
  • Phaser

Vous pouvez également trouver de nombreux articles dédiés aux cours individuels ici.

2.1. Exécuteur

Executor est une interface qui représente un objet qui exécute les tâches fournies.

Cela dépend de l'implémentation particulière (à partir de laquelle l'appel est lancé) si la tâche doit être exécutée sur un thread nouveau ou actuel. Par conséquent, en utilisant cette interface, nous pouvons découpler le flux d'exécution de tâche du mécanisme d'exécution de tâche réel.

Un point à noter ici est que Executor n'exige pas strictement que l'exécution de la tâche soit asynchrone. Dans le cas le plus simple, un exécuteur peut appeler instantanément la tâche soumise dans le thread appelant.

Nous devons créer un invocateur pour créer l'instance de l'exécuteur:

public class Invoker implements Executor { @Override public void execute(Runnable r) { r.run(); } }

Maintenant, nous pouvons utiliser cet invocateur pour exécuter la tâche.

public void execute() { Executor executor = new Invoker(); executor.execute( () -> { // task to be performed }); }

Il faut noter ici que si l'exécuteur ne peut pas accepter la tâche pour exécution, il lèvera RejectedExecutionException .

2.2. ExecutorService

ExecutorService est une solution complète pour le traitement asynchrone. Il gère une file d'attente en mémoire et planifie les tâches soumises en fonction de la disponibilité des threads.

Pour utiliser ExecutorService, nous devons créer une classe Runnable .

public class Task implements Runnable { @Override public void run() { // task details } }

Nous pouvons maintenant créer l' instance ExecutorService et attribuer cette tâche. Au moment de la création, nous devons spécifier la taille du pool de threads.

ExecutorService executor = Executors.newFixedThreadPool(10);

Si nous voulons créer une instance ExecutorService à un seul thread , nous pouvons utiliser newSingleThreadExecutor (ThreadFactory threadFactory) pour créer l'instance.

Une fois l'exécuteur créé, nous pouvons l'utiliser pour soumettre la tâche.

public void execute() { executor.submit(new Task()); }

Nous pouvons également créer l' instance Runnable lors de la soumission de la tâche.

executor.submit(() -> { new Task(); });

Il est également livré avec deux méthodes de terminaison d'exécution prêtes à l'emploi. Le premier est shutdown () ; il attend que toutes les tâches soumises soient exécutées. L'autre méthode est shutdownNow () whic h termine immédiatement toutes les tâches en attente d' exécution /.

Il existe également une autre méthode awaitTermination (long timeout, unité TimeUnit) qui bloque de force jusqu'à ce que toutes les tâches aient terminé l'exécution après un événement d'arrêt déclenché ou un délai d'exécution s'est produit, ou le thread d'exécution lui-même est interrompu

try { executor.awaitTermination( 20l, TimeUnit.NANOSECONDS ); } catch (InterruptedException e) { e.printStackTrace(); }

2.3. ScheduledExecutorService

ScheduledExecutorService est une interface similaire à ExecutorService, mais il peut effectuer des tâches périodiquement.

Les méthodes Executor et ExecutorService sont planifiées sur place sans introduire de retard artificiel. Zéro ou toute valeur négative signifie que la demande doit être exécutée instantanément.

Nous pouvons utiliser les interfaces Runnable et Callable pour définir la tâche.

public void execute() { ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); Future future = executorService.schedule(() -> { // ... return "Hello world"; }, 1, TimeUnit.SECONDS); ScheduledFuture scheduledFuture = executorService.schedule(() -> { // ... }, 1, TimeUnit.SECONDS); executorService.shutdown(); }

ScheduledExecutorService peut également planifier la tâche après un délai fixe donné :

executorService.scheduleAtFixedRate(() -> { // ... }, 1, 10, TimeUnit.SECONDS); executorService.scheduleWithFixedDelay(() -> { // ... }, 1, 10, TimeUnit.SECONDS);

Ici, la méthode scheduleAtFixedRate (commande Runnable, long initialDelay, longue période, unité TimeUnit) crée et exécute une action périodique qui est appelée d'abord après le délai initial fourni, puis avec la période donnée jusqu'à l'arrêt de l'instance de service.

La méthode scheduleWithFixedDelay (commande Runnable, long initialDelay, long delay, TimeUnit unit) crée et exécute une action périodique qui est appelée d'abord après le délai initial fourni, et de manière répétée avec le délai donné entre la fin de l'exécution et l'appel du le prochain.

2.4. Avenir

Future est utilisé pour représenter le résultat d'une opération asynchrone. Il est livré avec des méthodes pour vérifier si l'opération asynchrone est terminée ou non, obtenir le résultat calculé, etc.

What's more, the cancel(boolean mayInterruptIfRunning) API cancels the operation and releases the executing thread. If the value of mayInterruptIfRunning is true, the thread executing the task will be terminated instantly.

Otherwise, in-progress tasks will be allowed to complete.

We can use below code snippet to create a future instance:

public void invoke() { ExecutorService executorService = Executors.newFixedThreadPool(10); Future future = executorService.submit(() -> { // ... Thread.sleep(10000l); return "Hello world"; }); }

We can use following code snippet to check if the future result is ready and fetch the data if the computation is done:

if (future.isDone() && !future.isCancelled()) { try { str = future.get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }

We can also specify a timeout for a given operation. If the task takes more than this time, a TimeoutException is thrown:

try { future.get(10, TimeUnit.SECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { e.printStackTrace(); }

2.5. CountDownLatch

CountDownLatch (introduced in JDK 5) is a utility class which blocks a set of threads until some operation completes.

A CountDownLatch is initialized with a counter(Integer type); this counter decrements as the dependent threads complete execution. But once the counter reaches zero, other threads get released.

You can learn more about CountDownLatch here.

2.6. CyclicBarrier

CyclicBarrier works almost the same as CountDownLatch except that we can reuse it. Unlike CountDownLatch, it allows multiple threads to wait for each other using await() method(known as barrier condition) before invoking the final task.

We need to create a Runnable task instance to initiate the barrier condition:

public class Task implements Runnable { private CyclicBarrier barrier; public Task(CyclicBarrier barrier) { this.barrier = barrier; } @Override public void run() { try { LOG.info(Thread.currentThread().getName() + " is waiting"); barrier.await(); LOG.info(Thread.currentThread().getName() + " is released"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } }

Now we can invoke some threads to race for the barrier condition:

public void start() { CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> { // ... LOG.info("All previous tasks are completed"); }); Thread t1 = new Thread(new Task(cyclicBarrier), "T1"); Thread t2 = new Thread(new Task(cyclicBarrier), "T2"); Thread t3 = new Thread(new Task(cyclicBarrier), "T3"); if (!cyclicBarrier.isBroken()) { t1.start(); t2.start(); t3.start(); } }

Here, the isBroken() method checks if any of the threads got interrupted during the execution time. We should always perform this check before performing the actual process.

2.7. Semaphore

The Semaphore is used for blocking thread level access to some part of the physical or logical resource. A semaphore contains a set of permits; whenever a thread tries to enter the critical section, it needs to check the semaphore if a permit is available or not.

If a permit is not available (via tryAcquire()), the thread is not allowed to jump into the critical section; however, if the permit is available the access is granted, and the permit counter decreases.

Once the executing thread releases the critical section, again the permit counter increases (done by release() method).

We can specify a timeout for acquiring access by using the tryAcquire(long timeout, TimeUnit unit) method.

We can also check the number of available permits or the number of threads waiting to acquire the semaphore.

Following code snippet can be used to implement a semaphore:

static Semaphore semaphore = new Semaphore(10); public void execute() throws InterruptedException { LOG.info("Available permit : " + semaphore.availablePermits()); LOG.info("Number of threads waiting to acquire: " + semaphore.getQueueLength()); if (semaphore.tryAcquire()) { try { // ... } finally { semaphore.release(); } } }

We can implement a Mutex like data-structure using Semaphore. More details on this can be found here.

2.8. ThreadFactory

As the name suggests, ThreadFactory acts as a thread (non-existing) pool which creates a new thread on demand. It eliminates the need of a lot of boilerplate coding for implementing efficient thread creation mechanisms.

We can define a ThreadFactory:

public class BaeldungThreadFactory implements ThreadFactory { private int threadId; private String name; public BaeldungThreadFactory(String name) { threadId = 1; this.name = name; } @Override public Thread newThread(Runnable r) { Thread t = new Thread(r, name + "-Thread_" + threadId); LOG.info("created new thread with id : " + threadId + " and name : " + t.getName()); threadId++; return t; } }

We can use this newThread(Runnable r) method to create a new thread at runtime:

BaeldungThreadFactory factory = new BaeldungThreadFactory( "BaeldungThreadFactory"); for (int i = 0; i < 10; i++) { Thread t = factory.newThread(new Task()); t.start(); }

2.9. BlockingQueue

In asynchronous programming, one of the most common integration patterns is the producer-consumer pattern. The java.util.concurrent package comes with a data-structure know as BlockingQueue – which can be very useful in these async scenarios.

More information and a working example on this is available here.

2.10. DelayQueue

DelayQueue is an infinite-size blocking queue of elements where an element can only be pulled if it's expiration time (known as user defined delay) is completed. Hence, the topmost element (head) will have the most amount delay and it will be polled last.

More information and a working example on this is available here.

2.11. Locks

Not surprisingly, Lock is a utility for blocking other threads from accessing a certain segment of code, apart from the thread that's executing it currently.

The main difference between a Lock and a Synchronized block is that synchronized block is fully contained in a method; however, we can have Lock API’s lock() and unlock() operation in separate methods.

More information and a working example on this is available here.

2.12. Phaser

Phaser est une solution plus flexible que CyclicBarrier et CountDownLatch - utilisée pour agir comme une barrière réutilisable sur laquelle le nombre dynamique de threads doit attendre avant de continuer l'exécution. Nous pouvons coordonner plusieurs phases d'exécution, en réutilisant une instance Phaser pour chaque phase du programme.

Plus d'informations et un exemple de travail à ce sujet sont disponibles ici.

3. Conclusion

Dans cet article de présentation de haut niveau, nous nous sommes concentrés sur les différents utilitaires disponibles du package java.util.concurrent .

Comme toujours, le code source complet est disponible à l'adresse over sur GitHub.