Guide de CountDownLatch en Java

1. Introduction

Dans cet article, nous allons donner un guide sur la classe CountDownLatch et montrer comment elle peut être utilisée dans quelques exemples pratiques.

Essentiellement, en utilisant un CountDownLatch, nous pouvons provoquer le blocage d'un thread jusqu'à ce que d'autres threads aient terminé une tâche donnée.

2. Utilisation dans la programmation simultanée

En termes simples, un CountDownLatch a un champ de compteur , que vous pouvez décrémenter selon vos besoins. Nous pouvons ensuite l'utiliser pour bloquer un thread appelant jusqu'à ce qu'il soit compté à zéro.

Si nous faisions un traitement parallèle, nous pourrions instancier le CountDownLatch avec la même valeur pour le compteur qu'un certain nombre de threads sur lesquels nous voulons travailler. Ensuite, nous pourrions simplement appeler countdown () après la fin de chaque thread, garantissant qu'un thread dépendant appelant await () se bloquera jusqu'à ce que les threads de travail soient terminés.

3. En attente de la fin d'un pool de threads

Essayons ce modèle en créant un Worker et en utilisant un champ CountDownLatch pour signaler qu'il est terminé:

public class Worker implements Runnable { private List outputScraper; private CountDownLatch countDownLatch; public Worker(List outputScraper, CountDownLatch countDownLatch) { this.outputScraper = outputScraper; this.countDownLatch = countDownLatch; } @Override public void run() { doSomeWork(); outputScraper.add("Counted down"); countDownLatch.countDown(); } }

Ensuite, créons un test afin de prouver que nous pouvons obtenir un CountDownLatch pour attendre que les instances Worker se terminent:

@Test public void whenParallelProcessing_thenMainThreadWillBlockUntilCompletion() throws InterruptedException { List outputScraper = Collections.synchronizedList(new ArrayList()); CountDownLatch countDownLatch = new CountDownLatch(5); List workers = Stream .generate(() -> new Thread(new Worker(outputScraper, countDownLatch))) .limit(5) .collect(toList()); workers.forEach(Thread::start); countDownLatch.await(); outputScraper.add("Latch released"); assertThat(outputScraper) .containsExactly( "Counted down", "Counted down", "Counted down", "Counted down", "Counted down", "Latch released" ); }

Naturellement, «Latch relâché» sera toujours la dernière sortie - car il dépend de la libération de CountDownLatch .

Notez que si nous n'appelions pas await () , nous ne serions pas en mesure de garantir l'ordre d'exécution des threads, donc le test échouerait de manière aléatoire.

4. Un pool de threads en attente de début

Si nous avons pris l'exemple précédent, mais que cette fois a démarré des milliers de threads au lieu de cinq, il est probable que beaucoup des premiers auront fini de traiter avant même d'avoir appelé start () sur les derniers. Cela pourrait rendre difficile d'essayer de reproduire un problème de concurrence, car nous ne pourrions pas faire fonctionner tous nos threads en parallèle.

Pour contourner ce problème, faisons fonctionner le CountdownLatch différemment de l'exemple précédent. Au lieu de bloquer un thread parent jusqu'à ce que certains threads enfants soient terminés, nous pouvons bloquer chaque thread enfant jusqu'à ce que tous les autres aient démarré.

Modifions notre méthode run () pour qu'elle se bloque avant le traitement:

public class WaitingWorker implements Runnable { private List outputScraper; private CountDownLatch readyThreadCounter; private CountDownLatch callingThreadBlocker; private CountDownLatch completedThreadCounter; public WaitingWorker( List outputScraper, CountDownLatch readyThreadCounter, CountDownLatch callingThreadBlocker, CountDownLatch completedThreadCounter) { this.outputScraper = outputScraper; this.readyThreadCounter = readyThreadCounter; this.callingThreadBlocker = callingThreadBlocker; this.completedThreadCounter = completedThreadCounter; } @Override public void run() { readyThreadCounter.countDown(); try { callingThreadBlocker.await(); doSomeWork(); outputScraper.add("Counted down"); } catch (InterruptedException e) { e.printStackTrace(); } finally { completedThreadCounter.countDown(); } } }

Maintenant, modifions notre test pour qu'il bloque jusqu'à ce que tous les ouvriers aient commencé, débloque les ouvriers, puis bloque jusqu'à ce que les ouvriers aient fini:

@Test public void whenDoingLotsOfThreadsInParallel_thenStartThemAtTheSameTime() throws InterruptedException { List outputScraper = Collections.synchronizedList(new ArrayList()); CountDownLatch readyThreadCounter = new CountDownLatch(5); CountDownLatch callingThreadBlocker = new CountDownLatch(1); CountDownLatch completedThreadCounter = new CountDownLatch(5); List workers = Stream .generate(() -> new Thread(new WaitingWorker( outputScraper, readyThreadCounter, callingThreadBlocker, completedThreadCounter))) .limit(5) .collect(toList()); workers.forEach(Thread::start); readyThreadCounter.await(); outputScraper.add("Workers ready"); callingThreadBlocker.countDown(); completedThreadCounter.await(); outputScraper.add("Workers complete"); assertThat(outputScraper) .containsExactly( "Workers ready", "Counted down", "Counted down", "Counted down", "Counted down", "Counted down", "Workers complete" ); }

Ce modèle est vraiment utile pour essayer de reproduire des bogues de concurrence, car il peut être utilisé pour forcer des milliers de threads à essayer d'exécuter une logique en parallèle.

5. Mettre fin tôt à un compte à rebours

Parfois, nous pouvons rencontrer une situation où les travailleurs se terminent par erreur avant de décompter le CountDownLatch. Cela pourrait entraîner qu'il n'atteigne jamais zéro et wait () ne se termine jamais:

@Override public void run() { if (true) { throw new RuntimeException("Oh dear, I'm a BrokenWorker"); } countDownLatch.countDown(); outputScraper.add("Counted down"); }

Modifions notre test précédent pour utiliser un BrokenWorker, afin de montrer comment await () bloquera pour toujours:

@Test public void whenFailingToParallelProcess_thenMainThreadShouldGetNotGetStuck() throws InterruptedException { List outputScraper = Collections.synchronizedList(new ArrayList()); CountDownLatch countDownLatch = new CountDownLatch(5); List workers = Stream .generate(() -> new Thread(new BrokenWorker(outputScraper, countDownLatch))) .limit(5) .collect(toList()); workers.forEach(Thread::start); countDownLatch.await(); }

De toute évidence, ce n'est pas le comportement que nous souhaitons - il serait bien préférable que l'application continue plutôt que de bloquer à l'infini.

Pour contourner cela, ajoutons un argument timeout à notre appel à await ().

boolean completed = countDownLatch.await(3L, TimeUnit.SECONDS); assertThat(completed).isFalse();

Comme nous pouvons le voir, le test finira par expirer et await () retournera false .

6. Conclusion

Dans ce guide rapide, nous avons montré comment nous pouvons utiliser un CountDownLatch pour bloquer un thread jusqu'à ce que d'autres threads aient terminé certains traitements.

Nous avons également montré comment il peut être utilisé pour aider à déboguer les problèmes de concurrence en s'assurant que les threads s'exécutent en parallèle.

L'implémentation de ces exemples peut être trouvée sur sur GitHub; il s'agit d'un projet basé sur Maven, il devrait donc être facile à exécuter tel quel.