Distribution de messages RabbitMQ avec Spring AMQP

1. Introduction

Dans ce didacticiel, nous explorerons le concept de fanout et d'échanges de sujets avec Spring AMQP et RabbitMQ.

À un niveau élevé, les échanges sortance vont diffuser le même message à toutes les files d' attente liées , alors que les échanges thématiques utilisent une clé de routage pour faire passer des messages à une file d' attente liée en particulier ou des files d' attente .

La lecture préalable de Messagerie avec Spring AMQP est recommandée pour ce didacticiel.

2. Configuration d'un échange de fanout

Configurons un échange de fanout avec deux files d'attente liées. Lorsque nous envoyons un message à cet échange, les deux files d'attente recevront le message. Notre échange de fanout ignore toute clé de routage incluse avec le message.

Spring AMQP nous permet d'agréger toutes les déclarations de files d'attente, d'échanges et de liaisons dans un objet Déclarables :

@Bean public Declarables fanoutBindings() { Queue fanoutQueue1 = new Queue("fanout.queue1", false); Queue fanoutQueue2 = new Queue("fanout.queue2", false); FanoutExchange fanoutExchange = new FanoutExchange("fanout.exchange"); return new Declarables( fanoutQueue1, fanoutQueue2, fanoutExchange, bind(fanoutQueue1).to(fanoutExchange), BindingBuilder.bind(fanoutQueue2).to(fanoutExchange)); }

3. Configuration d'un échange de sujets

Maintenant, nous allons également mettre en place un échange de sujets avec deux files d'attente, chacune avec un modèle de liaison différent:

@Bean public Declarables topicBindings() { Queue topicQueue1 = new Queue(topicQueue1Name, false); Queue topicQueue2 = new Queue(topicQueue2Name, false); TopicExchange topicExchange = new TopicExchange(topicExchangeName); return new Declarables( topicQueue1, topicQueue2, topicExchange, BindingBuilder .bind(topicQueue1) .to(topicExchange).with("*.important.*"), BindingBuilder .bind(topicQueue2) .to(topicExchange).with("#.error")); }

Un échange de sujets nous permet de lier des files d'attente à lui avec différents modèles de clé. Ceci est très flexible et nous permet de lier plusieurs files d'attente avec le même modèle ou même plusieurs modèles à la même file d'attente.

Lorsque la clé de routage du message correspond au modèle, il sera placé dans la file d'attente. Si une file d'attente a plusieurs liaisons qui correspondent à la clé de routage du message, une seule copie du message est placée dans la file d'attente.

Nos modèles de reliure peuvent utiliser un astérisque («*») pour faire correspondre un mot dans une position spécifique ou un signe dièse («#») pour correspondre à zéro ou plusieurs mots.

Ainsi, notre topicQueue1 recevra des messages qui ont des clés de routage ayant un modèle de trois mots avec le mot du milieu étant «important» - par exemple: «user.important.error» ou «blog.important.notification».

Et, notre topicQueue2 recevra des messages dont les clés de routage se terminent par le mot erreur; les exemples correspondants sont «error» , «user.important.error» ou «blog.post.save.error».

4. Configurer un producteur

Nous utiliserons la méthode convertAndSend du RabbitTemplate pour envoyer nos exemples de messages:

 String message = " payload is broadcast"; return args -> { rabbitTemplate.convertAndSend(FANOUT_EXCHANGE_NAME, "", "fanout" + message); rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, ROUTING_KEY_USER_IMPORTANT_WARN, "topic important warn" + message); rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, ROUTING_KEY_USER_IMPORTANT_ERROR, "topic important error" + message); };

Le RabbitTemplate fournit de nombreuses méthodes convertAndSend () surchargées pour différents types d'échange.

Lorsque nous envoyons un message à un échange de fanout, la clé de routage est ignorée et le message est transmis à toutes les files d'attente liées.

Lorsque nous envoyons un message à l'échange de sujets, nous devons passer une clé de routage. Sur la base de cette clé de routage, le message sera remis à des files d'attente spécifiques.

5. Configuration des consommateurs

Enfin, configurons quatre consommateurs - un pour chaque file d'attente - pour récupérer les messages produits:

 @RabbitListener(queues = {FANOUT_QUEUE_1_NAME}) public void receiveMessageFromFanout1(String message) { System.out.println("Received fanout 1 message: " + message); } @RabbitListener(queues = {FANOUT_QUEUE_2_NAME}) public void receiveMessageFromFanout2(String message) { System.out.println("Received fanout 2 message: " + message); } @RabbitListener(queues = {TOPIC_QUEUE_1_NAME}) public void receiveMessageFromTopic1(String message) { System.out.println("Received topic 1 (" + BINDING_PATTERN_IMPORTANT + ") message: " + message); } @RabbitListener(queues = {TOPIC_QUEUE_2_NAME}) public void receiveMessageFromTopic2(String message) { System.out.println("Received topic 2 (" + BINDING_PATTERN_ERROR + ") message: " + message); }

Nous configurons les consommateurs à l'aide de l' annotation @RabbitListener . Le seul argument passé ici est le nom des files d'attente. Les consommateurs ne sont pas informés ici des échanges ou des clés de routage.

6. Exécution de l'exemple

Notre exemple de projet est une application Spring Boot, il initialisera donc l'application avec une connexion à RabbitMQ et configurera toutes les files d'attente, échanges et liaisons.

Par défaut, notre application attend une instance RabbitMQ s'exécutant sur l'hôte local sur le port 5672. Nous pouvons modifier ceci et d'autres valeurs par défaut dans application.yaml .

Notre projet expose le point de terminaison HTTP sur l'URI - / broadcast - qui accepte les POST avec un message dans le corps de la requête.

Lorsque nous envoyons une requête à cet URI avec le corps «Test», nous devrions voir quelque chose de similaire dans la sortie:

Received fanout 1 message: fanout payload is broadcast Received topic 1 (*.important.*) message: topic important warn payload is broadcast Received topic 2 (#.error) message: topic important error payload is broadcast Received fanout 2 message: fanout payload is broadcast Received topic 1 (*.important.*) message: topic important error payload is broadcast

L'ordre dans lequel nous verrons ces messages n'est bien entendu pas garanti.

7. Conclusion

Dans ce tutoriel rapide, nous avons couvert les échanges de fanout et de sujets avec Spring AMQP et RabbitMQ.

Le code source complet et tous les extraits de code de ce didacticiel sont disponibles sur le référentiel GitHub.