Guide de DelayQueue

1. Vue d'ensemble

Dans cet article, nous examinerons la construction DelayQueue du package java.util.concurrent . Il s'agit d'une file d'attente de blocage qui pourrait être utilisée dans les programmes producteur-consommateur.

Il a une caractéristique très utile - lorsque le consommateur veut prendre un élément de la file d'attente, il ne peut le prendre que lorsque le délai pour cet élément particulier a expiré.

2. Implémentation de Delayed pour les éléments dans DelayQueue

Chaque élément que nous voulons mettre dans DelayQueue doit implémenter l' interface Delayed . Disons que nous voulons créer une classe DelayObject . Les instances de cette classe seront placées dans DelayQueue.

Nous passerons les données String et delayInMilliseconds en tant que arguments et à son constructeur:

public class DelayObject implements Delayed { private String data; private long startTime; public DelayObject(String data, long delayInMilliseconds) { this.data = data; this.startTime = System.currentTimeMillis() + delayInMilliseconds; }

Nous définissons un startTime - c'est le moment où l'élément doit être consommé depuis la file d'attente. Ensuite, nous devons implémenter la méthode getDelay () - elle devrait renvoyer le délai restant associé à cet objet dans l'unité de temps donnée.

Par conséquent, nous devons utiliser la méthode TimeUnit.convert () pour renvoyer le délai restant dans le TimeUnit approprié :

@Override public long getDelay(TimeUnit unit) { long diff = startTime - System.currentTimeMillis(); return unit.convert(diff, TimeUnit.MILLISECONDS); }

Lorsque le consommateur essaie de prendre un élément de la file d'attente, DelayQueue exécutera getDelay () pour savoir si cet élément est autorisé à être renvoyé de la file d'attente. Si la méthode getDelay () renvoie zéro ou un nombre négatif, cela signifie qu'elle pourrait être récupérée de la file d'attente.

Nous devons également implémenter la méthode compareTo () , car les éléments de DelayQueue seront triés en fonction de l'heure d'expiration. L'élément qui expirera en premier est conservé en tête de la file d'attente et l'élément avec le délai d'expiration le plus élevé est conservé à la fin de la file d'attente:

@Override public int compareTo(Delayed o) { return Ints.saturatedCast( this.startTime - ((DelayObject) o).startTime); }

3. DelayQueue C onsumer and Producer

Pour pouvoir tester notre DelayQueue, nous devons implémenter la logique du producteur et du consommateur. La classe de producteur prend la file d'attente, le nombre d'éléments à produire et le délai de chaque message en millisecondes comme arguments.

Ensuite, lorsque la méthode run () est appelée, elle met des éléments dans la file d'attente et se met en veille pendant 500 millisecondes après chaque put:

public class DelayQueueProducer implements Runnable { private BlockingQueue queue; private Integer numberOfElementsToProduce; private Integer delayOfEachProducedMessageMilliseconds; // standard constructor @Override public void run() { for (int i = 0; i < numberOfElementsToProduce; i++) { DelayObject object = new DelayObject( UUID.randomUUID().toString(), delayOfEachProducedMessageMilliseconds); System.out.println("Put object: " + object); try { queue.put(object); Thread.sleep(500); } catch (InterruptedException ie) { ie.printStackTrace(); } } } }

L'implémentation grand public est très similaire, mais elle garde également une trace du nombre de messages qui ont été consommés:

public class DelayQueueConsumer implements Runnable { private BlockingQueue queue; private Integer numberOfElementsToTake; public AtomicInteger numberOfConsumedElements = new AtomicInteger(); // standard constructors @Override public void run() { for (int i = 0; i < numberOfElementsToTake; i++) { try { DelayObject object = queue.take(); numberOfConsumedElements.incrementAndGet(); System.out.println("Consumer take: " + object); } catch (InterruptedException e) { e.printStackTrace(); } } } }

4. Test d'utilisation de DelayQueue

Pour tester le comportement de DelayQueue, nous allons créer un thread producteur et un thread consommateur.

Le producteur mettra () deux objets dans la file d'attente avec un délai de 500 millisecondes. Le test affirme que le consommateur a consommé deux messages:

@Test public void givenDelayQueue_whenProduceElement _thenShouldConsumeAfterGivenDelay() throws InterruptedException { // given ExecutorService executor = Executors.newFixedThreadPool(2); BlockingQueue queue = new DelayQueue(); int numberOfElementsToProduce = 2; int delayOfEachProducedMessageMilliseconds = 500; DelayQueueConsumer consumer = new DelayQueueConsumer( queue, numberOfElementsToProduce); DelayQueueProducer producer = new DelayQueueProducer( queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds); // when executor.submit(producer); executor.submit(consumer); // then executor.awaitTermination(5, TimeUnit.SECONDS); executor.shutdown(); assertEquals(consumer.numberOfConsumedElements.get(), numberOfElementsToProduce); }

Nous pouvons observer que l'exécution de ce programme produira la sortie suivante:

Put object: {data='86046157-e8a0-49b2-9cbb-8326124bcab8', startTime=1494069868007} Consumer take: {data='86046157-e8a0-49b2-9cbb-8326124bcab8', startTime=1494069868007} Put object: {data='d47927ef-18c7-449b-b491-5ff30e6795ed', startTime=1494069868512} Consumer take: {data='d47927ef-18c7-449b-b491-5ff30e6795ed', startTime=1494069868512}

Le producteur place l'objet, et après un certain temps, le premier objet pour lequel le délai a expiré est consommé.

La même situation s'est produite pour le deuxième élément.

5. Consommateur incapable de consommer dans le temps imparti

Disons que nous avons un producteur qui produit un élément qui expirera dans 10 secondes :

int numberOfElementsToProduce = 1; int delayOfEachProducedMessageMilliseconds = 10_000; DelayQueueConsumer consumer = new DelayQueueConsumer( queue, numberOfElementsToProduce); DelayQueueProducer producer = new DelayQueueProducer( queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);

Nous allons commencer notre test, mais il se terminera après 5 secondes. En raison des caractéristiques du DelayQueue, le consommateur ne pourra pas consommer le message de la file d'attente car l'élément n'a pas encore expiré:

executor.submit(producer); executor.submit(consumer); executor.awaitTermination(5, TimeUnit.SECONDS); executor.shutdown(); assertEquals(consumer.numberOfConsumedElements.get(), 0);

Notez que le numberOfConsumedElements du consommateur a une valeur égale à zéro.

6. Production d'un élément avec expiration immédiate

Lorsque les mises en oeuvre du différé un message getDelay () méthode renvoient un numéro de négatif, cela signifie que l'élément en question a déjà expiré. Dans cette situation, le producteur consommera cet élément immédiatement.

Nous pouvons tester la situation de production d'un élément avec un retard négatif:

int numberOfElementsToProduce = 1; int delayOfEachProducedMessageMilliseconds = -10_000; DelayQueueConsumer consumer = new DelayQueueConsumer(queue, numberOfElementsToProduce); DelayQueueProducer producer = new DelayQueueProducer( queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);

Lorsque nous démarrons le cas de test, le consommateur consommera l'élément immédiatement car il a déjà expiré:

executor.submit(producer); executor.submit(consumer); executor.awaitTermination(1, TimeUnit.SECONDS); executor.shutdown(); assertEquals(consumer.numberOfConsumedElements.get(), 1);

7. Conclusion

Dans cet article, nous avons examiné la construction DelayQueue du package java.util.concurrent .

Nous avons implémenté un élément Delayed qui a été produit et consommé à partir de la file d'attente.

Nous avons tiré parti de notre implémentation de DelayQueue pour consommer des éléments qui avaient expiré.

L'implémentation de tous ces exemples et extraits de code se trouve dans le projet GitHub - qui est un projet Maven, il devrait donc être facile à importer et à exécuter tel quel.