Introduction à Exchanger en Java

1. Vue d'ensemble

Dans ce didacticiel, nous examinerons java.util.concurrent.Exchanger. Cela fonctionne comme un point commun pour deux threads en Java pour échanger des objets entre eux.

2. Introduction à l'échangeur

L' échangeur de classe en Java peut être utilisé pour partager des objets entre deux fils de type T . La classe ne fournit qu'un seul échange de méthode surchargé (T t) .

Lorsqu'il est appelé, l' échange attend que l'autre thread de la paire l'appelle également. À ce stade, le deuxième thread trouve que le premier thread est en attente avec son objet. Le thread échange les objets qu'ils tiennent et signale l'échange, et maintenant ils peuvent revenir.

Examinons un exemple pour comprendre l'échange de messages entre deux threads avec Exchanger :

@Test public void givenThreads_whenMessageExchanged_thenCorrect() { Exchanger exchanger = new Exchanger(); Runnable taskA = () -> { try { String message = exchanger.exchange("from A"); assertEquals("from B", message); } catch (InterruptedException e) { Thread.currentThread.interrupt(); throw new RuntimeException(e); } }; Runnable taskB = () -> { try { String message = exchanger.exchange("from B"); assertEquals("from A", message); } catch (InterruptedException e) { Thread.currentThread.interrupt(); throw new RuntimeException(e); } }; CompletableFuture.allOf( runAsync(taskA), runAsync(taskB)).join(); }

Ici, nous avons les deux threads échangeant des messages entre eux en utilisant l'échangeur commun. Voyons un exemple où nous échangeons un objet du thread principal avec un nouveau thread:

@Test public void givenThread_WhenExchangedMessage_thenCorrect() throws InterruptedException { Exchanger exchanger = new Exchanger(); Runnable runner = () -> { try { String message = exchanger.exchange("from runner"); assertEquals("to runner", message); } catch (InterruptedException e) { Thread.currentThread.interrupt(); throw new RuntimeException(e); } }; CompletableFuture result = CompletableFuture.runAsync(runner); String msg = exchanger.exchange("to runner"); assertEquals("from runner", msg); result.join(); }

Notez que nous devons d'abord démarrer le thread du runner , puis appeler exchange () dans le thread principal.

Notez également que l'appel du premier thread peut expirer si le second thread n'atteint pas le point d'échange dans le temps. La durée d'attente du premier thread peut être contrôlée à l'aide de l' échange surchargé (T t, long timeout, TimeUnit timeUnit).

3. Aucun échange de données GC

L'échangeur peut être utilisé pour créer des modèles de type pipeline avec le passage de données d'un thread à l'autre. Dans cette section, nous allons créer une simple pile de threads qui passent continuellement des données entre eux sous forme de pipeline.

@Test public void givenData_whenPassedThrough_thenCorrect() throws InterruptedException { Exchanger
    
      readerExchanger = new Exchanger(); Exchanger
     
       writerExchanger = new Exchanger(); Runnable reader = () -> { Queue readerBuffer = new ConcurrentLinkedQueue(); while (true) { readerBuffer.add(UUID.randomUUID().toString()); if (readerBuffer.size() >= BUFFER_SIZE) { readerBuffer = readerExchanger.exchange(readerBuffer); } } }; Runnable processor = () -> { Queue processorBuffer = new ConcurrentLinkedQueue(); Queue writerBuffer = new ConcurrentLinkedQueue(); processorBuffer = readerExchanger.exchange(processorBuffer); while (true) { writerBuffer.add(processorBuffer.poll()); if (processorBuffer.isEmpty()) { processorBuffer = readerExchanger.exchange(processorBuffer); writerBuffer = writerExchanger.exchange(writerBuffer); } } }; Runnable writer = () -> { Queue writerBuffer = new ConcurrentLinkedQueue(); writerBuffer = writerExchanger.exchange(writerBuffer); while (true) { System.out.println(writerBuffer.poll()); if (writerBuffer.isEmpty()) { writerBuffer = writerExchanger.exchange(writerBuffer); } } }; CompletableFuture.allOf( runAsync(reader), runAsync(processor), runAsync(writer)).join(); }
     
    

Ici, nous avons trois threads: lecteur , processeur et écrivain . Ensemble, ils fonctionnent comme un seul pipeline échangeant des données entre eux.

Le readerExchanger est partagé entre le lecteur et le thread du processeur , tandis que le writerExchanger est partagé entre le processeur et le thread du rédacteur .

Notez que l'exemple ici est uniquement pour la démonstration. Il faut être prudent lors de la création de boucles infinies avec while (true) . Aussi pour garder le code lisible, nous avons omis la gestion des exceptions.

Ce modèle d'échange de données lors de la réutilisation du tampon permet d'avoir moins de garbage collection. La méthode d'échange renvoie les mêmes instances de file d'attente et il n'y aurait donc pas de GC pour ces objets. Contrairement à toute file d'attente de blocage, l'échangeur ne crée aucun nœud ou objet pour stocker et partager des données.

La création d'un tel pipeline est similaire au modèle Disrupter, avec une différence essentielle, le modèle Disrupter prend en charge plusieurs producteurs et consommateurs, tandis qu'un échangeur pourrait être utilisé entre une paire de consommateurs et de producteurs.

4. Conclusion

Nous avons donc appris ce qu'est Exchanger en Java, comment cela fonctionne et nous avons vu comment utiliser la classe Exchanger . Nous avons également créé un pipeline et démontré l'échange de données sans GC entre les threads.

Comme toujours, le code est disponible sur sur GitHub.