Apache RocketMQ avec Spring Boot

1. Introduction

Dans ce didacticiel, nous allons créer un producteur et un consommateur de messages à l'aide de Spring Boot et Apache RocketMQ, une plate-forme de messagerie distribuée et de données de streaming open-source.

2. Dépendances

Pour les projets Maven, nous devons ajouter la dépendance RocketMQ Spring Boot Starter:

 org.apache.rocketmq rocketmq-spring-boot-starter 2.0.4 

3. Produire des messages

Pour notre exemple, nous allons créer un générateur de messages de base qui enverra des événements chaque fois que l'utilisateur ajoute ou supprime un article du panier.

Commençons par configurer l'emplacement de notre serveur et le nom du groupe dans notre application.properties :

rocketmq.name-server=127.0.0.1:9876 rocketmq.producer.group=cart-producer-group

Notez que si nous avions plusieurs serveurs de noms, nous pourrions les lister comme hôte: port; hôte: port .

Maintenant, pour rester simple, nous allons créer une application CommandLineRunner et générer quelques événements lors du démarrage de l'application:

@SpringBootApplication public class CartEventProducer implements CommandLineRunner { @Autowired private RocketMQTemplate rocketMQTemplate; public static void main(String[] args) { SpringApplication.run(CartEventProducer.class, args); } public void run(String... args) throws Exception { rocketMQTemplate.convertAndSend("cart-item-add-topic", new CartItemEvent("bike", 1)); rocketMQTemplate.convertAndSend("cart-item-add-topic", new CartItemEvent("computer", 2)); rocketMQTemplate.convertAndSend("cart-item-removed-topic", new CartItemEvent("bike", 1)); } }

Le CartItemEvent se compose de seulement deux propriétés - l'identifiant de l'article et une quantité:

class CartItemEvent { private String itemId; private int quantity; // constructor, getters and setters }

Dans l'exemple ci-dessus, nous utilisons la méthode convertAndSend () , une méthode générique définie par la classe abstraite AbstractMessageSendingTemplate , pour envoyer nos événements de panier. Il prend deux paramètres: une destination, qui dans notre cas est un nom de rubrique, et une charge utile de message.

4. Message au consommateur

Consommer des messages RocketMQ est aussi simple que de créer un composant Spring annoté avec @RocketMQMessageListener et d'implémenter l' interface RocketMQListener :

@SpringBootApplication public class CartEventConsumer { public static void main(String[] args) { SpringApplication.run(CartEventConsumer.class, args); } @Service @RocketMQMessageListener( topic = "cart-item-add-topic", consumerGroup = "cart-consumer_cart-item-add-topic" ) public class CardItemAddConsumer implements RocketMQListener { public void onMessage(CartItemEvent addItemEvent) { log.info("Adding item: {}", addItemEvent); // additional logic } } @Service @RocketMQMessageListener( topic = "cart-item-removed-topic", consumerGroup = "cart-consumer_cart-item-removed-topic" ) public class CardItemRemoveConsumer implements RocketMQListener { public void onMessage(CartItemEvent removeItemEvent) { log.info("Removing item: {}", removeItemEvent); // additional logic } } }

Nous devons créer un composant distinct pour chaque sujet de message que nous écoutons. Dans chacun de ces écouteurs, nous définissons le nom du sujet et le nom du groupe de consommateurs via l' annotation @ RocketMQMessageListener .

5. Transmission synchrone et asynchrone

Dans les exemples précédents, nous avons utilisé la méthode convertAndSend pour envoyer nos messages. Nous avons cependant d'autres options.

Nous pourrions, par exemple, appeler syncSend qui est différent de convertAndSend car il renvoie l' objet SendResult .

Il peut être utilisé, par exemple, pour vérifier si notre message a été envoyé avec succès ou obtenir son identifiant:

public void run(String... args) throws Exception { SendResult addBikeResult = rocketMQTemplate.syncSend("cart-item-add-topic", new CartItemEvent("bike", 1)); SendResult addComputerResult = rocketMQTemplate.syncSend("cart-item-add-topic", new CartItemEvent("computer", 2)); SendResult removeBikeResult = rocketMQTemplate.syncSend("cart-item-removed-topic", new CartItemEvent("bike", 1)); }

Comme convertAndSend, cette méthode n'est renvoyée que lorsque la procédure d'envoi est terminée.

Nous devrions utiliser la transmission synchrone dans les cas nécessitant une fiabilité élevée, tels que les messages de notification importants ou la notification par SMS.

D'un autre côté, nous pouvons plutôt vouloir envoyer le message de manière asynchrone et être averti lorsque l'envoi se termine.

Nous pouvons le faire avec asyncSend , qui prend un SendCallback comme paramètre et retourne immédiatement:

rocketMQTemplate.asyncSend("cart-item-add-topic", new CartItemEvent("bike", 1), new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.error("Successfully sent cart item"); } @Override public void onException(Throwable throwable) { log.error("Exception during cart item sending", throwable); } });

Nous utilisons la transmission asynchrone dans les cas nécessitant un débit élevé.

Enfin, pour les scénarios où nous avons des exigences de débit très élevées, nous pouvons utiliser sendOneWay au lieu de asyncSend . sendOneWay est différent de asyncSend en ce qu'il ne garantit pas que le message soit envoyé.

La transmission unidirectionnelle peut également être utilisée pour les cas de fiabilité ordinaires, tels que la collecte de journaux.

6. Envoi de messages dans la transaction

RocketMQ nous offre la possibilité d'envoyer des messages dans une transaction. Nous pouvons le faire en utilisant la méthode sendInTransaction () :

MessageBuilder.withPayload(new CartItemEvent("bike", 1)).build(); rocketMQTemplate.sendMessageInTransaction("test-transaction", "topic-name", msg, null);

De plus, nous devons implémenter une interface RocketMQLocalTransactionListener :

@RocketMQTransactionListener(txProducerGroup="test-transaction") class TransactionListenerImpl implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { // ... local transaction process, return ROLLBACK, COMMIT or UNKNOWN return RocketMQLocalTransactionState.UNKNOWN; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { // ... check transaction status and return ROLLBACK, COMMIT or UNKNOWN return RocketMQLocalTransactionState.COMMIT; } }

Dans sendMessageInTransaction () , le premier paramètre est le nom de la transaction. Il doit être le même que le @RocketMQTransactionListener champ membre de txProducerGroup.

7. Configuration du générateur de messages

Nous pouvons également configurer des aspects du générateur de message lui-même:

  • rocketmq.producer.send-message-timeout : Le délai d'envoi du message en millisecondes - la valeur par défaut est 3000
  • rocketmq.producer.compress-message-body-threshold: Threshold above which, RocketMQ will compress messages – the default value is 1024.
  • rocketmq.producer.max-message-size: The maximum message size in bytes – the default value is 4096.
  • rocketmq.producer.retry-times-when-send-async-failed: The maximum number of retries to perform internally in asynchronous mode before sending failure – the default value is 2.
  • rocketmq.producer.retry-next-server: Indicates whether to retry another broker on sending failure internally – the default value is false.
  • rocketmq.producer.retry-times-when-send-failed: The maximum number of retries to perform internally in asynchronous mode before sending failure – the default value is 2.

8. Conclusion

Dans cet article, nous avons appris à envoyer et à consommer des messages à l'aide d'Apache RocketMQ et de Spring Boot. Comme toujours, tout le code source est disponible sur GitHub.