Gestion des files d'attente Amazon SQS en Java

1. Vue d'ensemble

Dans ce didacticiel, nous allons explorer comment utiliser le SQS (Simple Queue Service) d'Amazon à l'aide du SDK Java .

2. Prérequis

Les dépendances Maven, les paramètres de compte AWS et la connexion client nécessaires pour utiliser Amazon AWS SDK pour SQS sont les mêmes que dans cet article ici.

En supposant que nous ayons créé une instance d' AWSCredentials, comme décrit dans l'article précédent, nous pouvons continuer et créer notre client SQS:

AmazonSQS sqs = AmazonSQSClientBuilder.standard() .withCredentials(new AWSStaticCredentialsProvider(credentials)) .withRegion(Regions.US_EAST_1) .build(); 

3. Création de files d'attente

Une fois que nous avons configuré notre client SQS, la création de files d'attente est assez simple.

3.1. Création d'une file d'attente standard

Voyons comment nous pouvons créer une file d'attente standard. Pour ce faire, nous devons créer une instance de CreateQueueRequest:

CreateQueueRequest createStandardQueueRequest = new CreateQueueRequest("baeldung-queue"); String standardQueueUrl = sqs.createQueue(createStandardQueueRequest).getQueueUrl(); 

3.2. Création d'une file d'attente FIFO

La création d'un FIFO est similaire à la création d'une file d'attente standard. Nous utiliserons toujours une instance de CreateQueueRequest , comme nous l'avons fait précédemment. Seulement cette fois, nous devrons passer des attributs de file d'attente et définir l' attribut FifoQueue sur true :

Map queueAttributes = new HashMap(); queueAttributes.put("FifoQueue", "true"); queueAttributes.put("ContentBasedDeduplication", "true"); CreateQueueRequest createFifoQueueRequest = new CreateQueueRequest( "baeldung-queue.fifo").withAttributes(queueAttributes); String fifoQueueUrl = sqs.createQueue(createFifoQueueRequest) .getQueueUrl(); 

4. Publication de messages dans les files d'attente

Une fois nos files d'attente configurées, nous pouvons commencer à envoyer des messages.

4.1. Publication d'un message dans une file d'attente standard

Pour envoyer des messages à une file d'attente standard, nous devrons créer une instance de SendMessageRequest.

Ensuite, nous attachons une carte des attributs de message à cette demande:

Map messageAttributes = new HashMap(); messageAttributes.put("AttributeOne", new MessageAttributeValue() .withStringValue("This is an attribute") .withDataType("String")); SendMessageRequest sendMessageStandardQueue = new SendMessageRequest() .withQueueUrl(standardQueueUrl) .withMessageBody("A simple message.") .withDelaySeconds(30) .withMessageAttributes(messageAttributes); sqs.sendMessage(sendMessageStandardQueue); 

Le withDelaySeconds () spécifie après combien de temps le message doit arriver dans la file d'attente.

4.2. Publication d'un message dans une file d'attente FIFO

La seule différence, dans ce cas, est que nous devrons spécifier le groupe auquel appartient le message:

SendMessageRequest sendMessageFifoQueue = new SendMessageRequest() .withQueueUrl(fifoQueueUrl) .withMessageBody("Another simple message.") .withMessageGroupId("baeldung-group-1") .withMessageAttributes(messageAttributes);

Comme vous pouvez le voir dans l'exemple de code ci-dessus, nous spécifions le groupe en utilisant withMessageGroupId ().

4.3. Publication de plusieurs messages dans une file d'attente

Nous pouvons également publier plusieurs messages dans une file d'attente, en utilisant une seule demande. Nous allons créer une liste de SendMessageBatchRequestEntry que nous enverrons en utilisant une instance de SendMessageBatchRequest :

List  messageEntries = new ArrayList(); messageEntries.add(new SendMessageBatchRequestEntry() .withId("id-1") .withMessageBody("batch-1") .withMessageGroupId("baeldung-group-1")); messageEntries.add(new SendMessageBatchRequestEntry() .withId("id-2") .withMessageBody("batch-2") .withMessageGroupId("baeldung-group-1")); SendMessageBatchRequest sendMessageBatchRequest = new SendMessageBatchRequest(fifoQueueUrl, messageEntries); sqs.sendMessageBatch(sendMessageBatchRequest);

5. Lecture des messages à partir des files d'attente

Nous pouvons recevoir des messages de nos files d' attente en invoquant la receiveMessage () méthode sur une instance de ReceiveMessageRequest:

ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(fifoQueueUrl) .withWaitTimeSeconds(10) .withMaxNumberOfMessages(10); List sqsMessages = sqs.receiveMessage(receiveMessageRequest).getMessages(); 

En utilisant withMaxNumberOfMessages (), nous spécifions le nombre de messages à obtenir de la file d'attente - bien qu'il convient de noter que le maximum est de 10 .

La méthode withWaitTimeSeconds () active l' interrogation longue. L'interrogation longue est un moyen de limiter le nombre de demandes de réception de messages que nous envoyons à SQS.

En termes simples, cela signifie que nous attendrons le nombre de secondes spécifié pour récupérer un message. S'il n'y a aucun message dans la file d'attente pendant cette durée, la demande retournera vide. Si un message arrive dans la file d'attente pendant ce temps, il sera renvoyé.

Nous pouvons obtenir les attributs et le corps d'un message donné:

sqsMessages.get(0).getAttributes(); sqsMessages.get(0).getBody();

6. Suppression d'un message d'une file d'attente

Pour supprimer un message, nous utiliserons un DeleteMessageRequest :

sqs.deleteMessage(new DeleteMessageRequest() .withQueueUrl(fifoQueueUrl) .withReceiptHandle(sqsMessages.get(0).getReceiptHandle())); 

7. Files d'attente de lettres mortes

Une file d'attente de lettres mortes doit être du même type que sa file d'attente de base - elle doit être FIFO si la file d'attente de base est FIFO et standard si la file d'attente de base est standard. Pour cet exemple, nous utiliserons une file d'attente standard.

La première chose à faire est de créer ce qui deviendra notre file d'attente de lettres mortes:

String deadLetterQueueUrl = sqs.createQueue("baeldung-dead-letter-queue").getQueueUrl(); 

Ensuite, nous obtiendrons l'ARN (Amazon Resource Name) de notre file d'attente nouvellement créée:

GetQueueAttributesResult deadLetterQueueAttributes = sqs.getQueueAttributes( new GetQueueAttributesRequest(deadLetterQueueUrl) .withAttributeNames("QueueArn")); String deadLetterQueueARN = deadLetterQueueAttributes.getAttributes() .get("QueueArn"); 

Enfin, nous définissons cette file d'attente nouvellement créée comme la file d'attente de lettres mortes de notre file d'attente standard d'origine:

SetQueueAttributesRequest queueAttributesRequest = new SetQueueAttributesRequest() .withQueueUrl(standardQueueUrl) .addAttributesEntry("RedrivePolicy", "{\"maxReceiveCount\":\"2\", " + "\"deadLetterTargetArn\":\"" + deadLetterQueueARN + "\"}"); sqs.setQueueAttributes(queueAttributesRequest); 

Le paquet JSON que nous avons défini dans la méthode addAttributesEntry () lors de la construction de notre instance SetQueueAttributesRequest contient les informations dont nous avons besoin : le maxReceiveCount est 2 , ce qui signifie que si un message est reçu autant de fois, il est supposé ne pas avoir été traité correctement, et est envoyé à notre file d'attente de lettres mortes.

L' attribut deadLetterTargetArn pointe notre file d'attente standard vers notre file d'attente de lettres mortes nouvellement créée.

8. Suivi

Nous pouvons vérifier combien de messages sont actuellement dans une file d'attente donnée, et combien sont en vol avec le SDK. Tout d'abord, nous devons créer un GetQueueAttributesRequest.

De là, nous vérifierons l'état de la file d'attente:

GetQueueAttributesRequest getQueueAttributesRequest = new GetQueueAttributesRequest(standardQueueUrl) .withAttributeNames("All"); GetQueueAttributesResult getQueueAttributesResult = sqs.getQueueAttributes(getQueueAttributesRequest); System.out.println(String.format("The number of messages on the queue: %s", getQueueAttributesResult.getAttributes() .get("ApproximateNumberOfMessages"))); System.out.println(String.format("The number of messages in flight: %s", getQueueAttributesResult.getAttributes() .get("ApproximateNumberOfMessagesNotVisible")));

Une surveillance plus approfondie peut être obtenue à l'aide d'Amazon Cloud Watch.

9. Conclusion

Dans cet article, nous avons vu comment gérer les files d'attente SQS à l'aide du kit AWS Java SDK.

Comme d'habitude, tous les exemples de code utilisés dans l'article peuvent être trouvés sur GitHub.