Guide du Java Phaser

1. Vue d'ensemble

Dans cet article, nous examinerons la construction Phaser du package java.util.concurrent . C'est une construction très similaire à CountDownLatch qui nous permet de coordonner l'exécution des threads. En comparaison avec CountDownLatch , il possède des fonctionnalités supplémentaires.

Le Phaser est une barrière sur laquelle le nombre dynamique de threads doit attendre avant de continuer l'exécution. Dans CountDownLatch, ce numéro ne peut pas être configuré dynamiquement et doit être fourni lors de la création de l'instance.

2. API Phaser

Le Phaser nous permet de construire une logique dans laquelle les threads doivent attendre sur la barrière avant de passer à l'étape suivante de l'exécution .

Nous pouvons coordonner plusieurs phases d'exécution, en réutilisant une instance Phaser pour chaque phase du programme. Chaque phase peut avoir un nombre différent de threads en attente de passer à une autre phase. Nous examinerons un exemple d'utilisation des phases plus tard.

Pour participer à la coordination, le thread doit s'inscrire () lui-même auprès de l' instance Phaser . Notez que cela ne fait qu'augmenter le nombre de parties enregistrées, et nous ne pouvons pas vérifier si le thread actuel est enregistré - nous devrions sous-classer l'implémentation pour prendre en charge cela.

Le thread signale qu'il est arrivé à la barrière en appelant arriveAndAwaitAdvance () , qui est une méthode de blocage. Lorsque le nombre de partis arrivés est égal au nombre de partis enregistrés, l'exécution du programme se poursuit et le nombre de phase augmente. Nous pouvons obtenir le numéro de phase actuel en appelant la méthode getPhase () .

Lorsque le thread termine son travail, nous devons appeler la méthode arriveAndDeregister () pour signaler que le thread actuel ne doit plus être pris en compte dans cette phase particulière.

3. Implémentation de la logique à l'aide de l' API Phaser

Disons que nous voulons coordonner plusieurs phases d'actions. Trois threads traiteront la première phase et deux threads traiteront la deuxième phase.

Nous allons créer une classe LongRunningAction qui implémente l' interface Runnable :

class LongRunningAction implements Runnable { private String threadName; private Phaser ph; LongRunningAction(String threadName, Phaser ph) { this.threadName = threadName; this.ph = ph; ph.register(); } @Override public void run() { ph.arriveAndAwaitAdvance(); try { Thread.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); } ph.arriveAndDeregister(); } }

Lorsque notre classe d'action est instanciée, nous nous enregistrons dans l' instance Phaser à l'aide de la méthode register () . Cela augmentera le nombre de threads utilisant ce Phaser spécifique .

L'appel à arriveAndAwaitAdvance () provoquera l'attente du thread actuel sur la barrière. Comme déjà mentionné, lorsque le nombre de partis arrivés devient égal au nombre de partis enregistrés, l'exécution se poursuit.

Une fois le traitement terminé, le thread actuel se désenregistre lui-même en appelant la méthode arriveAndDeregister () .

Créons un cas de test dans lequel nous allons démarrer trois threads LongRunningAction et bloquer sur la barrière. Ensuite, une fois l'action terminée, nous créerons deux threads LongRunningAction supplémentaires qui effectueront le traitement de la phase suivante.

Lors de la création d'une instance Phaser à partir du thread principal, nous transmettons 1 comme argument. Cela équivaut à appeler la méthode register () à partir du thread actuel. Nous faisons cela parce que, lorsque nous créons trois threads de travail, le thread principal est un coordinateur et, par conséquent, le Phaser doit avoir quatre threads enregistrés:

ExecutorService executorService = Executors.newCachedThreadPool(); Phaser ph = new Phaser(1); assertEquals(0, ph.getPhase());

La phase après l'initialisation est égale à zéro.

La classe Phaser a un constructeur dans lequel nous pouvons lui passer une instance parent. Il est utile dans les cas où nous avons un grand nombre de parties qui subiraient des coûts de contention de synchronisation massifs. Dans de telles situations, des instances de phaseurs peuvent être configurées de sorte que des groupes de sous-phaseurs partagent un parent commun.

Ensuite, commençons trois threads d' action LongRunningAction , qui attendront sur la barrière jusqu'à ce que nous appelons la méthode arriveAndAwaitAdvance () à partir du thread principal.

Gardez à l'esprit que nous avons initialisé notre Phaser avec 1 et appelé register () trois fois de plus. Maintenant, trois threads d'action ont annoncé qu'ils étaient arrivés à la barrière, donc un autre appel de arriveAndAwaitAdvance () est nécessaire - celui du thread principal:

executorService.submit(new LongRunningAction("thread-1", ph)); executorService.submit(new LongRunningAction("thread-2", ph)); executorService.submit(new LongRunningAction("thread-3", ph)); ph.arriveAndAwaitAdvance(); assertEquals(1, ph.getPhase());

Une fois cette phase terminée, la méthode getPhase () en retournera un car le programme a terminé de traiter la première étape de l'exécution.

Disons que deux threads doivent effectuer la prochaine phase de traitement. Nous pouvons tirer parti de Phaser pour y parvenir, car il nous permet de configurer dynamiquement le nombre de threads qui doivent attendre sur la barrière. Nous démarrons deux nouveaux threads, mais ceux-ci ne continueront pas à s'exécuter jusqu'à l'appel à arriveAndAwaitAdvance () à partir du thread principal (comme dans le cas précédent):

executorService.submit(new LongRunningAction("thread-4", ph)); executorService.submit(new LongRunningAction("thread-5", ph)); ph.arriveAndAwaitAdvance(); assertEquals(2, ph.getPhase()); ph.arriveAndDeregister();

Après cela, la méthode getPhase () retournera un numéro de phase égal à deux. Lorsque nous voulons terminer notre programme, nous devons appeler la méthode arriveAndDeregister () car le thread principal est toujours enregistré dans le Phaser. Lorsque l'annulation de l'enregistrement entraîne la perte de zéro du nombre de partis enregistrés, le Phaser est interrompu. Tous les appels aux méthodes de synchronisation ne seront plus bloqués et reviendront immédiatement.

L'exécution du programme produira la sortie suivante (le code source complet avec les instructions de la ligne d'impression peut être trouvé dans le référentiel de code):

This is phase 0 This is phase 0 This is phase 0 Thread thread-2 before long running action Thread thread-1 before long running action Thread thread-3 before long running action This is phase 1 This is phase 1 Thread thread-4 before long running action Thread thread-5 before long running action

Nous voyons que tous les threads attendent l'exécution jusqu'à ce que la barrière s'ouvre. La phase suivante de l'exécution n'est exécutée que lorsque la précédente s'est terminée avec succès.

4. Conclusion

Dans ce didacticiel, nous avons examiné la construction Phaser de java.util.concurrent et nous avons implémenté la logique de coordination avec plusieurs phases à l'aide de la classe Phaser .

L'implémentation de tous ces exemples et extraits de code se trouve dans le projet GitHub - il s'agit d'un projet Maven, il devrait donc être facile à importer et à exécuter tel quel.