Guide de java.util.concurrent.BlockingQueue

1. Vue d'ensemble

Dans cet article, nous examinerons l'une des constructions les plus utiles java.util.concurrent pour résoudre le problème producteur-consommateur simultané. Nous examinerons une API de l' interface BlockingQueue et comment les méthodes de cette interface facilitent l'écriture de programmes simultanés.

Plus loin dans l'article, nous montrerons un exemple de programme simple qui a plusieurs threads producteurs et plusieurs threads consommateurs.

2. Types de file d' attente de blocage

On peut distinguer deux types de BlockingQueue :

  • file d'attente illimitée - peut croître presque indéfiniment
  • file d'attente limitée - avec une capacité maximale définie

2.1. File d'attente illimitée

La création de files d'attente illimitées est simple:

BlockingQueue blockingQueue = new LinkedBlockingDeque();

La capacité de blockingQueue sera définie sur Integer.MAX_VALUE. Toutes les opérations qui ajoutent un élément à la file d'attente illimitée ne seront jamais bloquées, donc elle pourrait atteindre une très grande taille.

La chose la plus importante lors de la conception d'un programme producteur-consommateur utilisant BlockingQueue illimitée est que les consommateurs devraient être en mesure de consommer des messages aussi rapidement que les producteurs ajoutent des messages à la file d'attente. Sinon, la mémoire pourrait se remplir et nous obtiendrions une exception OutOfMemory .

2.2. File d'attente limitée

Le deuxième type de files d'attente est la file d'attente limitée. Nous pouvons créer de telles files d'attente en passant la capacité comme argument à un constructeur:

BlockingQueue blockingQueue = new LinkedBlockingDeque(10);

Ici, nous avons une blockingQueue qui a une capacité égale à 10. Cela signifie que lorsqu'un producteur essaie d'ajouter un élément à une file d'attente déjà pleine, selon une méthode qui a été utilisée pour l'ajouter ( offer () , add () ou put () ), il se bloquera jusqu'à ce que l'espace pour insérer l'objet devienne disponible. Sinon, les opérations échoueront.

L'utilisation d'une file d'attente limitée est un bon moyen de concevoir des programmes simultanés, car lorsque nous insérons un élément dans une file d'attente déjà pleine, ces opérations doivent attendre que les consommateurs rattrapent et libèrent de l'espace dans la file d'attente. Cela nous donne une limitation sans aucun effort de notre part.

3. API BlockingQueue

Il existe deux types de méthodes dans l' interface BlockingQueue : les méthodes chargées d'ajouter des éléments à une file d'attente et les méthodes qui récupèrent ces éléments. Chaque méthode de ces deux groupes se comporte différemment si la file d'attente est pleine / vide.

3.1. Ajout d'éléments

  • add () - renvoie true si l'insertion a réussi, sinon lève une IllegalStateException
  • put () - insère l'élément spécifié dans une file d'attente, en attendant un emplacement libre si nécessaire
  • offer () - retourne vrai si l'insertion a réussi, sinon faux
  • offer (E e, long timeout, TimeUnit unit) - essaie d'insérer un élément dans une file d'attente et attend un emplacement disponible dans un délai spécifié

3.2. Récupération d'éléments

  • take () - attend un élément head d'une file d'attente et le supprime. Si la file d'attente est vide, elle se bloque et attend qu'un élément devienne disponible
  • poll (long timeout, TimeUnit unit) - récupère et supprime la tête de la file d'attente, en attendant le temps d'attente spécifié si nécessaire pour qu'un élément devienne disponible. Renvoie null après un délai

Ces méthodes sont les blocs de construction les plus importants de l' interface BlockingQueue lors de la création de programmes producteur-consommateur.

4. Exemple de producteur-consommateur multithread

Créons un programme qui se compose de deux parties - un producteur et un consommateur.

Le producteur produira un nombre aléatoire de 0 à 100 et mettra ce nombre dans une BlockingQueue . Nous aurons 4 threads producteurs et utiliserons la méthode put () pour bloquer jusqu'à ce qu'il y ait de l'espace disponible dans la file d'attente.

La chose importante à retenir est que nous devons empêcher nos threads consommateurs d'attendre qu'un élément apparaisse indéfiniment dans une file d'attente.

Une bonne technique pour signaler du producteur au consommateur qu'il n'y a plus de messages à traiter consiste à envoyer un message spécial appelé pilule empoisonnée. Nous devons envoyer autant de pilules empoisonnées que nous avons de consommateurs. Ensuite, lorsqu'un consommateur prendra ce message spécial de pilule empoisonnée d'une file d'attente, il terminera son exécution en douceur.

Regardons une classe de producteurs:

public class NumbersProducer implements Runnable { private BlockingQueue numbersQueue; private final int poisonPill; private final int poisonPillPerProducer; public NumbersProducer(BlockingQueue numbersQueue, int poisonPill, int poisonPillPerProducer) { this.numbersQueue = numbersQueue; this.poisonPill = poisonPill; this.poisonPillPerProducer = poisonPillPerProducer; } public void run() { try { generateNumbers(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } private void generateNumbers() throws InterruptedException { for (int i = 0; i < 100; i++) { numbersQueue.put(ThreadLocalRandom.current().nextInt(100)); } for (int j = 0; j < poisonPillPerProducer; j++) { numbersQueue.put(poisonPill); } } }

Notre constructeur producteur prend comme argument le BlockingQueue qui est utilisé pour coordonner le traitement entre le producteur et le consommateur. Nous voyons que la méthode generateNumbers () mettra 100 éléments dans une file d'attente. Il faut aussi un message empoisonné pour savoir quel type de message doit être mis dans une file d'attente lorsque l'exécution sera terminée. Ce message doit être placé fois poisonPillPerProducer dans une file d'attente.

Chaque consommateur prendra un élément d'un BlockingQueue en utilisant la méthode take () afin qu'il se bloque jusqu'à ce qu'il y ait un élément dans une file d'attente. Après avoir pris un entier dans une file d'attente, il vérifie si le message est une pilule empoisonnée, si oui, l'exécution d'un thread est terminée. Sinon, il imprimera le résultat sur la sortie standard avec le nom du thread actuel.

Cela nous donnera un aperçu du fonctionnement interne de nos consommateurs:

public class NumbersConsumer implements Runnable { private BlockingQueue queue; private final int poisonPill; public NumbersConsumer(BlockingQueue queue, int poisonPill) { this.queue = queue; this.poisonPill = poisonPill; } public void run() { try { while (true) { Integer number = queue.take(); if (number.equals(poisonPill)) { return; } System.out.println(Thread.currentThread().getName() + " result: " + number); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }

La chose importante à noter est l'utilisation d'une file d'attente. Comme dans le constructeur du producteur, une file d'attente est passée en argument. Nous pouvons le faire car BlockingQueue peut être partagé entre les threads sans aucune synchronisation explicite.

Now that we have our producer and consumer, we can start our program. We need to define the queue's capacity, and we set it to 100 elements.

We want to have 4 producer threads and a number of consumers threads will be equal to the number of available processors:

int BOUND = 10; int N_PRODUCERS = 4; int N_CONSUMERS = Runtime.getRuntime().availableProcessors(); int poisonPill = Integer.MAX_VALUE; int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS; int mod = N_CONSUMERS % N_PRODUCERS; BlockingQueue queue = new LinkedBlockingQueue(BOUND); for (int i = 1; i < N_PRODUCERS; i++) { new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer)).start(); } for (int j = 0; j < N_CONSUMERS; j++) { new Thread(new NumbersConsumer(queue, poisonPill)).start(); } new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer + mod)).start(); 

BlockingQueue is created using construct with a capacity. We're creating 4 producers and N consumers. We specify our poison pill message to be an Integer.MAX_VALUE because such value will never be sent by our producer under normal working conditions. The most important thing to notice here is that BlockingQueue is used to coordinate work between them.

Lorsque nous exécuterons le programme, 4 threads producteurs mettront des entiers aléatoires dans une BlockingQueue et les consommateurs prendront ces éléments de la file d'attente. Chaque thread imprimera sur la sortie standard le nom du thread avec un résultat.

5. Conclusion

Cet article présente une utilisation pratique de BlockingQueue et explique les méthodes utilisées pour ajouter et récupérer des éléments. Nous avons également montré comment créer un programme producteur-consommateur multithread à l'aide de BlockingQueue pour coordonner le travail entre les producteurs et les consommateurs.

La mise en œuvre de tous ces exemples et extraits de code peut être trouvée dans le projet GitHub - il s'agit d'un projet basé sur Maven, il devrait donc être facile à importer et à exécuter tel quel.