Messagerie PubSub avec Spring Data Redis

1. Vue d'ensemble

Dans ce deuxième article de la série explorant Spring Data Redis, nous examinerons les files d'attente de messages pub / sub.

Dans Redis, les éditeurs ne sont pas programmés pour envoyer leurs messages à des abonnés spécifiques. Les messages publiés sont plutôt caractérisés en canaux, sans savoir quels abonnés (le cas échéant) il peut y avoir.

De même, les abonnés expriment leur intérêt pour un ou plusieurs sujets et ne reçoivent que les messages qui les intéressent, sans savoir quels sont les éditeurs (le cas échéant).

Ce découplage des éditeurs et des abonnés peut permettre une plus grande évolutivité et une topologie de réseau plus dynamique.

2. Configuration Redis

Commençons par ajouter la configuration requise pour les files d'attente de messages.

Tout d'abord, nous allons définir un bean MessageListenerAdapter qui contient une implémentation personnalisée de l' interface MessageListener appelée RedisMessageSubscriber . Ce bean agit comme un abonné dans le modèle de messagerie pub-sub:

@Bean MessageListenerAdapter messageListener() { return new MessageListenerAdapter(new RedisMessageSubscriber()); }

RedisMessageListenerContainer est une classe fournie par Spring Data Redis qui fournit un comportement asynchrone pour les écouteurs de messages Redis. Ceci est appelé en interne et, selon la documentation de Spring Data Redis - «gère les détails de bas niveau de l'écoute, de la conversion et de la distribution des messages».

@Bean RedisMessageListenerContainer redisContainer() { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(jedisConnectionFactory()); container.addMessageListener(messageListener(), topic()); return container; }

Nous allons également créer un bean à l'aide d'une interface MessagePublisher personnalisée et d'une implémentation RedisMessagePublisher . De cette façon, nous pouvons avoir une API de publication de messages générique et faire en sorte que l'implémentation Redis prenne un redisTemplate et une rubrique comme arguments de constructeur:

@Bean MessagePublisher redisPublisher() { return new RedisMessagePublisher(redisTemplate(), topic()); }

Enfin, nous allons configurer un sujet auquel l'éditeur enverra des messages, et l'abonné les recevra:

@Bean ChannelTopic topic() { return new ChannelTopic("messageQueue"); }

3. Publication de messages

3.1. Définition de l' interface MessagePublisher

Spring Data Redis ne fournit pas d' interface MessagePublisher à utiliser pour la distribution de messages. Nous pouvons définir une interface personnalisée qui utilisera redisTemplate dans l'implémentation:

public interface MessagePublisher { void publish(String message); }

3.2. Implémentation de RedisMessagePublisher

Notre prochaine étape consiste à fournir une implémentation de l' interface MessagePublisher , en ajoutant des détails de publication de message et en utilisant les fonctions de redisTemplate.

Le modèle contient un ensemble très riche de fonctions pour un large éventail d'opérations - parmi lesquelles convertAndSend est capable d'envoyer un message à une file d'attente via une rubrique:

public class RedisMessagePublisher implements MessagePublisher { @Autowired private RedisTemplate redisTemplate; @Autowired private ChannelTopic topic; public RedisMessagePublisher() { } public RedisMessagePublisher( RedisTemplate redisTemplate, ChannelTopic topic) { this.redisTemplate = redisTemplate; this.topic = topic; } public void publish(String message) { redisTemplate.convertAndSend(topic.getTopic(), message); } } 

Comme vous pouvez le voir, la mise en œuvre de l'éditeur est simple. Il utilise la méthode convertAndSend () du redisTemplate pour formater et publier le message donné dans la rubrique configurée.

Un sujet implémente la sémantique de publication et d'abonnement: lorsqu'un message est publié, il est envoyé à tous les abonnés inscrits pour écouter ce sujet.

4. Abonnement aux messages

RedisMessageSubscriber implémente l' interface MessageListener fournie par Spring Data Redis :

@Service public class RedisMessageSubscriber implements MessageListener { public static List messageList = new ArrayList(); public void onMessage(Message message, byte[] pattern) { messageList.add(message.toString()); System.out.println("Message received: " + message.toString()); } }

Notez qu'il existe un deuxième paramètre appelé pattern , que nous n'avons pas utilisé dans cet exemple. La documentation de Spring Data Redis indique que ce paramètre représente le «modèle correspondant au canal (si spécifié)», mais qu'il peut être nul .

5. Envoi et réception de messages

Maintenant, nous allons tout mettre ensemble. Créons un message, puis publions-le à l'aide de RedisMessagePublisher :

String message = "Message " + UUID.randomUUID(); redisMessagePublisher.publish(message);

Lorsque nous appelons publier (message) , le contenu est envoyé à Redis, où il est acheminé vers le sujet de file d'attente de messages défini dans notre éditeur. Ensuite, il est distribué aux abonnés de ce sujet.

Vous avez peut-être déjà remarqué que RedisMessageSubscriber est un écouteur, qui s'enregistre dans la file d'attente pour la récupération des messages.

A l'arrivée du message, la méthode onMessage () définie de l'abonné s'est déclenchée.

Dans notre exemple, nous pouvons vérifier que nous avons reçu des messages qui ont été publiés en vérifiant la messageList dans notre RedisMessageSubscriber :

RedisMessageSubscriber.messageList.get(0).contains(message) 

6. Conclusion

Dans cet article, nous avons examiné une implémentation de file d'attente de messages pub / sub à l'aide de Spring Data Redis.

L'implémentation de l'exemple ci-dessus peut être trouvée dans un projet GitHub.