CyclicBarrier à Java

1. Introduction

CyclicBarriers sont des constructions de synchronisation qui ont été introduites avec Java 5 dans le cadre du package java.util.concurrent .

Dans cet article, nous allons explorer cette implémentation dans un scénario de concurrence.

2. Concurrence Java - Synchroniseurs

Le package java.util.concurrent contient plusieurs classes qui aident à gérer un ensemble de threads qui collaborent les uns avec les autres. Certains d'entre eux incluent:

  • CyclicBarrier
  • Phaser
  • CountDownLatch
  • Échangeur
  • Sémaphore
  • SynchronousQueue

Ces classes offrent des fonctionnalités prêtes à l'emploi pour les modèles d'interaction courants entre les threads.

Si nous avons un ensemble de threads qui communiquent entre eux et ressemblent à l'un des modèles communs, nous pouvons simplement réutiliser les classes de bibliothèque appropriées (également appelées synchroniseurs ) au lieu d'essayer de créer un schéma personnalisé en utilisant un ensemble de verrous et de conditions objets et le mot-clé synchronisé .

Concentrons- nous sur le CyclicBarrier à l'avenir.

3. CyclicBarrier

Un CyclicBarrier est un synchroniseur qui permet à un ensemble de threads d'attendre les uns les autres pour atteindre un point d'exécution commun, également appelé barrière .

Les CyclicBarriers sont utilisés dans les programmes dans lesquels nous avons un nombre fixe de threads qui doivent attendre les uns les autres pour atteindre un point commun avant de continuer l'exécution.

La barrière est dite cyclique car elle peut être réutilisée après la libération des threads en attente.

4. Utilisation

Le constructeur d'un CyclicBarrier est simple. Il faut un seul entier qui indique le nombre de threads qui doivent appeler la méthode await () sur l'instance de barrière pour signifier atteindre le point d'exécution commun:

public CyclicBarrier(int parties)

Les threads qui ont besoin de synchroniser leur exécution sont également appelés parties et l'appel de la méthode await () est la façon dont nous pouvons enregistrer qu'un certain thread a atteint le point de barrière.

Cet appel est synchrone et le thread appelant cette méthode suspend l'exécution jusqu'à ce qu'un nombre spécifié de threads ait appelé la même méthode sur la barrière. Cette situation où le nombre requis de threads a appelé wait () est appelée déclenchement de la barrière .

Facultativement, nous pouvons passer le deuxième argument au constructeur, qui est une instance exécutable . Cela a une logique qui serait exécutée par le dernier thread qui déclenche la barrière:

public CyclicBarrier(int parties, Runnable barrierAction)

5. Mise en œuvre

Pour voir CyclicBarrier en action, considérons le scénario suivant:

Il y a une opération qu'un nombre fixe de threads effectue et stocke les résultats correspondants dans une liste. Lorsque tous les threads ont terminé d'exécuter leur action, l'un d'eux (généralement le dernier à franchir la barrière) commence à traiter les données récupérées par chacun d'eux.

Implémentons la classe principale où toute l'action se produit:

public class CyclicBarrierDemo { private CyclicBarrier cyclicBarrier; private List
    
      partialResults = Collections.synchronizedList(new ArrayList()); private Random random = new Random(); private int NUM_PARTIAL_RESULTS; private int NUM_WORKERS; // ... }
    

Cette classe est assez simple - NUM_WORKERS est le nombre de threads qui vont s'exécuter et NUM_PARTIAL_RESULTS est le nombre de résultats que chacun des threads de travail va produire.

Enfin, nous avons partialResults qui est une liste qui va stocker les résultats de chacun de ces threads de travail. Notez que cette liste est une SynchronizedList car plusieurs threads y écriront en même temps, et la méthode add () n'est pas thread-safe sur un ArrayList simple .

Maintenant, implémentons la logique de chacun des threads de travail:

public class CyclicBarrierDemo { // ... class NumberCruncherThread implements Runnable { @Override public void run() { String thisThreadName = Thread.currentThread().getName(); List partialResult = new ArrayList(); // Crunch some numbers and store the partial result for (int i = 0; i < NUM_PARTIAL_RESULTS; i++) { Integer num = random.nextInt(10); System.out.println(thisThreadName + ": Crunching some numbers! Final result - " + num); partialResult.add(num); } partialResults.add(partialResult); try { System.out.println(thisThreadName + " waiting for others to reach barrier."); cyclicBarrier.await(); } catch (InterruptedException e) { // ... } catch (BrokenBarrierException e) { // ... } } } }

Nous allons maintenant implémenter la logique qui s'exécute lorsque la barrière a été déclenchée.

Pour simplifier les choses, ajoutons simplement tous les nombres dans la liste des résultats partiels:

public class CyclicBarrierDemo { // ... class AggregatorThread implements Runnable { @Override public void run() { String thisThreadName = Thread.currentThread().getName(); System.out.println( thisThreadName + ": Computing sum of " + NUM_WORKERS + " workers, having " + NUM_PARTIAL_RESULTS + " results each."); int sum = 0; for (List threadResult : partialResults) { System.out.print("Adding "); for (Integer partialResult : threadResult) { System.out.print(partialResult+" "); sum += partialResult; } System.out.println(); } System.out.println(thisThreadName + ": Final result = " + sum); } } }

La dernière étape serait de construire le CyclicBarrier et de lancer les choses avec une méthode main () :

public class CyclicBarrierDemo { // Previous code public void runSimulation(int numWorkers, int numberOfPartialResults) { NUM_PARTIAL_RESULTS = numberOfPartialResults; NUM_WORKERS = numWorkers; cyclicBarrier = new CyclicBarrier(NUM_WORKERS, new AggregatorThread()); System.out.println("Spawning " + NUM_WORKERS + " worker threads to compute " + NUM_PARTIAL_RESULTS + " partial results each"); for (int i = 0; i < NUM_WORKERS; i++) { Thread worker = new Thread(new NumberCruncherThread()); worker.setName("Thread " + i); worker.start(); } } public static void main(String[] args) { CyclicBarrierDemo demo = new CyclicBarrierDemo(); demo.runSimulation(5, 3); } } 

Dans le code ci-dessus, nous avons initialisé la barrière cyclique avec 5 threads qui produisent chacun 3 entiers dans le cadre de leur calcul et stockent les mêmes dans la liste résultante.

Une fois que la barrière est déclenchée, le dernier thread qui a déclenché la barrière exécute la logique spécifiée dans AggregatorThread, à savoir - ajouter tous les nombres produits par les threads.

6. Résultats

Voici la sortie d'une exécution du programme ci-dessus - chaque exécution peut créer des résultats différents car les threads peuvent être générés dans un ordre différent:

Spawning 5 worker threads to compute 3 partial results each Thread 0: Crunching some numbers! Final result - 6 Thread 0: Crunching some numbers! Final result - 2 Thread 0: Crunching some numbers! Final result - 2 Thread 0 waiting for others to reach barrier. Thread 1: Crunching some numbers! Final result - 2 Thread 1: Crunching some numbers! Final result - 0 Thread 1: Crunching some numbers! Final result - 5 Thread 1 waiting for others to reach barrier. Thread 3: Crunching some numbers! Final result - 6 Thread 3: Crunching some numbers! Final result - 4 Thread 3: Crunching some numbers! Final result - 0 Thread 3 waiting for others to reach barrier. Thread 2: Crunching some numbers! Final result - 1 Thread 2: Crunching some numbers! Final result - 1 Thread 2: Crunching some numbers! Final result - 0 Thread 2 waiting for others to reach barrier. Thread 4: Crunching some numbers! Final result - 9 Thread 4: Crunching some numbers! Final result - 3 Thread 4: Crunching some numbers! Final result - 5 Thread 4 waiting for others to reach barrier. Thread 4: Computing final sum of 5 workers, having 3 results each. Adding 6 2 2 Adding 2 0 5 Adding 6 4 0 Adding 1 1 0 Adding 9 3 5 Thread 4: Final result = 46 

Comme le montre la sortie ci-dessus, le thread 4 est celui qui déclenche la barrière et exécute également la logique d'agrégation finale. Il n'est pas non plus nécessaire que les threads soient réellement exécutés dans l'ordre dans lequel ils ont été démarrés, comme le montre l'exemple ci-dessus.

7. Conclusion

Dans cet article, nous avons vu ce qu'est un CyclicBarrier et dans quels types de situations il est utile.

Nous avons également implémenté un scénario où nous avions besoin d'un nombre fixe de threads pour atteindre un point d'exécution fixe, avant de continuer avec une autre logique de programme.

Comme toujours, le code du didacticiel se trouve à l'adresse over sur GitHub.