Messagerie fiable avec JGroups

1. Vue d'ensemble

JGroups est une API Java pour un échange de messages fiable. Il dispose d'une interface simple qui fournit:

  • une pile de protocoles flexible, comprenant TCP et UDP
  • fragmentation et réassemblage de gros messages
  • monodiffusion et multidiffusion fiables
  • détection de panne
  • contrôle de flux

Ainsi que de nombreuses autres fonctionnalités.

Dans ce didacticiel, nous allons créer une application simple pour échanger des messages String entre des applications et fournir un état partagé aux nouvelles applications lorsqu'elles rejoignent le réseau.

2. Configuration

2.1. Dépendance de Maven

Nous devons ajouter une seule dépendance à notre pom.xml :

 org.jgroups jgroups 4.0.10.Final  

La dernière version de la bibliothèque peut être vérifiée sur Maven Central.

2.2. La mise en réseau

JGroups essaiera d'utiliser IPV6 par défaut. En fonction de la configuration de notre système, cela peut empêcher les applications de communiquer.

Pour éviter cela, nous allons définir la propriété java.net.preferIPv4Stack sur true lors de l'exécution de nos applications ici:

java -Djava.net.preferIPv4Stack=true com.baeldung.jgroups.JGroupsMessenger 

3. JChannels

Notre connexion à un réseau JGroups est un JChannel. Le canal rejoint un cluster et envoie et reçoit des messages, ainsi que des informations sur l'état du réseau.

3.1. Créer une chaîne

Nous créons un JChannel avec un chemin vers un fichier de configuration. Si nous omettons le nom du fichier, il recherchera udp.xml dans le répertoire de travail actuel.

Nous allons créer un canal avec un fichier de configuration explicitement nommé:

JChannel channel = new JChannel("src/main/resources/udp.xml"); 

La configuration de JGroups peut être très compliquée, mais les configurations UDP et TCP par défaut sont suffisantes pour la plupart des applications. Nous avons inclus le fichier pour UDP dans notre code et nous l'utiliserons pour ce tutoriel.

Pour plus d'informations sur la configuration du transport, consultez le manuel JGroups ici.

3.2. Connexion d'un canal

Après avoir créé notre chaîne, nous devons rejoindre un cluster. Un cluster est un groupe de nœuds qui échangent des messages.

Rejoindre un cluster nécessite un nom de cluster:

channel.connect("Baeldung"); 

Le premier nœud qui tente de rejoindre un cluster le créera s'il n'existe pas. Nous verrons ce processus en action ci-dessous.

3.3. Nommer une chaîne

Les nœuds sont identifiés par un nom afin que les pairs puissent envoyer des messages dirigés et recevoir des notifications indiquant qui entre et sort du cluster. JGroups attribuera un nom automatiquement, ou nous pouvons définir le nôtre:

channel.name("user1");

Nous utiliserons ces noms ci-dessous pour suivre le moment où les nœuds entrent et quittent le cluster.

3.4. Fermer une chaîne

Le nettoyage des canaux est essentiel si nous voulons que nos pairs reçoivent une notification en temps opportun de notre sortie.

Nous fermons un JChannel avec sa méthode close:

channel.close()

4. Modifications de la vue du cluster

Avec un JChannel créé, nous sommes maintenant prêts à voir l'état des pairs dans le cluster et à échanger des messages avec eux.

JGroups maintient l'état du cluster dans la classe View . Chaque canal a une seule vue du réseau. Lorsque la vue change, elle est fournie via le rappel viewAccepted () .

Pour ce didacticiel, nous allons étendre la classe d'API ReceiverAdaptor qui implémente toutes les méthodes d'interface requises pour une application.

C'est la méthode recommandée pour implémenter les rappels.

Ajoutons viewAccepted à notre application:

public void viewAccepted(View newView) { private View lastView; if (lastView == null) { System.out.println("Received initial view:"); newView.forEach(System.out::println); } else { System.out.println("Received new view."); List newMembers = View.newMembers(lastView, newView); System.out.println("New members: "); newMembers.forEach(System.out::println); List exMembers = View.leftMembers(lastView, newView); System.out.println("Exited members:"); exMembers.forEach(System.out::println); } lastView = newView; } 

Chaque vue contient une liste d' objets d' adresse , représentant chaque membre du cluster. JGroups propose des méthodes pratiques pour comparer une vue à une autre, que nous utilisons pour détecter les membres nouveaux ou sortis du cluster.

5. Envoi de messages

La gestion des messages dans JGroups est simple. Un message contient un tableau d' octets et des objets d' adresse correspondant à l'expéditeur et au destinataire.

Pour ce didacticiel, nous utilisons des chaînes lues à partir de la ligne de commande, mais il est facile de voir comment une application pourrait échanger d'autres types de données.

5.1. Broadcast Messages

A Message is created with a destination and a byte array; JChannel sets the sender for us. If the target is null, the entire cluster will receive the message.

We'll accept text from the command line and send it to the cluster:

System.out.print("Enter a message: "); String line = in.readLine().toLowerCase(); Message message = new Message(null, line.getBytes()); channel.send(message); 

If we run multiple instances of our program and send this message (after we implement the receive() method below), all of them would receive it, including the sender.

5.2. Blocking Our Messages

If we don't want to see our messages, we can set a property for that:

channel.setDiscardOwnMessages(true); 

When we run the previous test, the message sender does not receive its broadcast message.

5.3. Direct Messages

Sending a direct message requires a valid Address. If we're referring to nodes by name, we need a way to look up an Address. Fortunately, we have the View for that.

The current View is always available from the JChannel:

private Optional getAddress(String name) { View view = channel.view(); return view.getMembers().stream() .filter(address -> name.equals(address.toString())) .findAny(); } 

Address names are available via the class toString() method, so we merely search the List of cluster members for the name we want.

So we can accept a name on from the console, find the associated destination, and send a direct message:

Address destination = null; System.out.print("Enter a destination: "); String destinationName = in.readLine().toLowerCase(); destination = getAddress(destinationName) .orElseThrow(() -> new Exception("Destination not found"); Message message = new Message(destination, "Hi there!"); channel.send(message); 

6. Receiving Messages

We can send messages, now let's add try to receive them now.

Let's override ReceiverAdaptor's empty receive method:

public void receive(Message message) { String line = Message received from: " + message.getSrc() + " to: " + message.getDest() + " -> " + message.getObject(); System.out.println(line); } 

Since we know the message contains a String, we can safely pass getObject() to System.out.

7. State Exchange

When a node enters the network, it may need to retrieve state information about the cluster. JGroups provides a state transfer mechanism for this.

When a node joins the cluster, it simply calls getState(). The cluster usually retrieves the state from the oldest member in the group – the coordinator.

Let's add a broadcast message count to our application. We'll add a new member variable and increment it inside receive():

private Integer messageCount = 0; public void receive(Message message) { String line = "Message received from: " + message.getSrc() + " to: " + message.getDest() + " -> " + message.getObject(); System.out.println(line); if (message.getDest() == null) { messageCount++; System.out.println("Message count: " + messageCount); } } 

We check for a null destination because if we count direct messages, each node will have a different number.

Next, we override two more methods in ReceiverAdaptor:

public void setState(InputStream input) { try { messageCount = Util.objectFromStream(new DataInputStream(input)); } catch (Exception e) { System.out.println("Error deserialing state!"); } System.out.println(messageCount + " is the current messagecount."); } public void getState(OutputStream output) throws Exception { Util.objectToStream(messageCount, new DataOutputStream(output)); } 

Similar to messages, JGroups transfers state as an array of bytes.

JGroups supplies an InputStream to the coordinator to write the state to, and an OutputStream for the new node to read. The API provides convenience classes for serializing and deserializing the data.

Note that in production code access to state information must be thread-safe.

Finally, we add the call to getState() to our startup, after we connect to the cluster:

channel.connect(clusterName); channel.getState(null, 0); 

getState() accepts a destination from which to request the state and a timeout in milliseconds. A null destination indicates the coordinator and 0 means do not timeout.

Lorsque nous exécutons cette application avec une paire de nœuds et échangeons des messages de diffusion, nous voyons l'incrément du nombre de messages.

Ensuite, si nous ajoutons un troisième client ou arrêtons et démarrons l'un d'entre eux, nous verrons le nœud nouvellement connecté imprimer le nombre de messages correct.

8. Conclusion

Dans ce tutoriel, nous avons utilisé JGroups pour créer une application d'échange de messages. Nous avons utilisé l'API pour surveiller les nœuds connectés au cluster et en avoir quitté le cluster, ainsi que pour transférer l'état du cluster vers un nouveau nœud lorsqu'il s'est joint.

Des exemples de code, comme toujours, peuvent être trouvés sur GitHub.