Guide de Java SynchronousQueue

1. Vue d'ensemble

Dans cet article, nous examinerons le SynchronousQueue du package java.util.concurrent .

En termes simples, cette implémentation nous permet d'échanger des informations entre les threads de manière thread-safe.

2. Présentation de l'API

Le SynchronousQueue a seulement deux opérations soutenues: prendre () et mis (), et les deux sont le blocage .

Par exemple, lorsque nous voulons ajouter un élément à la file d'attente, nous devons appeler la méthode put () . Cette méthode se bloquera jusqu'à ce qu'un autre thread appelle la méthode take () , signalant qu'il est prêt à prendre un élément.

Bien que SynchronousQueue ait une interface de file d'attente, nous devrions le considérer comme un point d'échange pour un seul élément entre deux threads, dans lequel un thread transfère un élément et un autre thread prend cet élément.

3. Implémentation de transferts à l'aide d'une variable partagée

Pour voir pourquoi SynchronousQueue peut être si utile, nous implémenterons une logique utilisant une variable partagée entre deux threads et ensuite, nous réécrirons cette logique en utilisant SynchronousQueue rendant notre code beaucoup plus simple et plus lisible.

Disons que nous avons deux threads - un producteur et un consommateur - et lorsque le producteur définit une valeur d'une variable partagée, nous voulons signaler ce fait au thread consommateur. Ensuite, le thread consommateur récupérera une valeur à partir d'une variable partagée.

Nous utiliserons le CountDownLatch pour coordonner ces deux threads, afin d'éviter une situation où le consommateur accède à une valeur d'une variable partagée qui n'a pas encore été définie.

Nous allons définir une variable sharedState et un CountDownLatch qui seront utilisés pour coordonner le traitement:

ExecutorService executor = Executors.newFixedThreadPool(2); AtomicInteger sharedState = new AtomicInteger(); CountDownLatch countDownLatch = new CountDownLatch(1);

Le producteur enregistrera un entier aléatoire dans la variable sharedState et exécutera la méthode countDown () sur le countDownLatch, signalant au consommateur qu'il peut récupérer une valeur de sharedState:

Runnable producer = () -> { Integer producedElement = ThreadLocalRandom .current() .nextInt(); sharedState.set(producedElement); countDownLatch.countDown(); };

Le consommateur attendra le countDownLatch en utilisant la méthode await () . Lorsque le producteur signale que la variable a été définie, le consommateur la récupérera dans sharedState:

Runnable consumer = () -> { try { countDownLatch.await(); Integer consumedElement = sharedState.get(); } catch (InterruptedException ex) { ex.printStackTrace(); } };

Dernier point mais non le moindre, commençons notre programme:

executor.execute(producer); executor.execute(consumer); executor.awaitTermination(500, TimeUnit.MILLISECONDS); executor.shutdown(); assertEquals(countDownLatch.getCount(), 0);

Il produira la sortie suivante:

Saving an element: -1507375353 to the exchange point consumed an element: -1507375353 from the exchange point

Nous pouvons voir que c'est beaucoup de code pour implémenter une fonctionnalité aussi simple que l'échange d'un élément entre deux threads. Dans la section suivante, nous essaierons de l'améliorer.

4. Implémentation des transferts à l'aide de SynchronousQueue

Implémentons maintenant la même fonctionnalité que dans la section précédente, mais avec un SynchronousQueue. Cela a un double effet car nous pouvons l'utiliser pour échanger l'état entre les threads et pour coordonner cette action afin que nous n'ayons pas besoin d'utiliser autre chose que SynchronousQueue.

Tout d'abord, nous définirons une file d'attente:

ExecutorService executor = Executors.newFixedThreadPool(2); SynchronousQueue queue = new SynchronousQueue();

Le producteur appellera une méthode put () qui bloquera jusqu'à ce qu'un autre thread prenne un élément de la file d'attente:

Runnable producer = () -> { Integer producedElement = ThreadLocalRandom .current() .nextInt(); try { queue.put(producedElement); } catch (InterruptedException ex) { ex.printStackTrace(); } };

Le consommateur va simplement récupérer cet élément en utilisant la méthode take () :

Runnable consumer = () -> { try { Integer consumedElement = queue.take(); } catch (InterruptedException ex) { ex.printStackTrace(); } };

Ensuite, nous commencerons notre programme:

executor.execute(producer); executor.execute(consumer); executor.awaitTermination(500, TimeUnit.MILLISECONDS); executor.shutdown(); assertEquals(queue.size(), 0);

Il produira la sortie suivante:

Saving an element: 339626897 to the exchange point consumed an element: 339626897 from the exchange point

Nous pouvons voir qu'un SynchronousQueue est utilisé comme point d'échange entre les threads, ce qui est beaucoup mieux et plus compréhensible que l'exemple précédent qui utilisait l'état partagé avec un CountDownLatch.

5. Conclusion

Dans ce didacticiel rapide, nous avons examiné la construction SynchronousQueue . Nous avons créé un programme qui échange des données entre deux threads en utilisant l'état partagé, puis réécrit ce programme pour tirer parti de la construction SynchronousQueue . Cela sert de point d'échange qui coordonne le producteur et le thread consommateur.

L'implémentation de tous ces exemples et extraits de code se trouve dans le projet GitHub - il s'agit d'un projet Maven, il devrait donc être facile à importer et à exécuter tel quel.