Client MQTT en Java

1. Vue d'ensemble

Dans ce tutoriel, nous verrons comment ajouter la messagerie MQTT dans un projet Java en utilisant les bibliothèques fournies par le projet Eclipse Paho.

2. Apprêt MQTT

MQTT (MQ Telemetry Transport) est un protocole de messagerie qui a été créé pour répondre au besoin d'une méthode simple et légère pour transférer des données vers / depuis des appareils de faible puissance, tels que ceux utilisés dans les applications industrielles.

Avec la popularité croissante des appareils IoT (Internet of Things), MQTT a connu une utilisation accrue, conduisant à sa normalisation par OASIS et ISO.

Le protocole prend en charge un seul modèle de messagerie, à savoir le modèle Publish-Subscribe: chaque message envoyé par un client contient un «sujet» associé qui est utilisé par le courtier pour l'acheminer vers les clients abonnés. Les noms de sujets peuvent être de simples chaînes comme " oiltemp " ou une chaîne de type chemin " motor / 1 / rpm ".

Afin de recevoir des messages, un client s'abonne à un ou plusieurs sujets en utilisant son nom exact ou une chaîne contenant l'un des caractères génériques pris en charge («#» pour les sujets à plusieurs niveaux et «+» pour un seul niveau »).

3. Configuration du projet

Afin d'inclure la bibliothèque Paho dans un projet Maven, nous devons ajouter la dépendance suivante:

 org.eclipse.paho org.eclipse.paho.client.mqttv3 1.2.0 

La dernière version du module de bibliothèque Java Eclipse Paho peut être téléchargée à partir de Maven Central.

4. Configuration du client

Lors de l'utilisation de la bibliothèque Paho, la première chose à faire pour envoyer et / ou recevoir des messages d'un courtier MQTT est d' obtenir une implémentation de l' interface IMqttClient . Cette interface contient toutes les méthodes requises par une application pour établir une connexion au serveur, envoyer et recevoir des messages.

Paho sort de la boîte avec deux implémentations de cette interface, une asynchrone ( MqttAsyncClient ) et une synchrone ( MqttClient ).Dans notre cas, nous nous concentrerons sur la version synchrone, qui a une sémantique plus simple.

La configuration elle-même est un processus en deux étapes: nous créons d'abord une instance de la classe MqttClient , puis nous la connectons à notre serveur. La sous-section suivante détaille ces étapes.

4.1. Création d'une nouvelle instance IMqttClient

L'extrait de code suivant montre comment créer une nouvelle instance synchrone IMqttClient :

String publisherId = UUID.randomUUID().toString(); IMqttClient publisher = new MqttClient("tcp://iot.eclipse.org:1883",publisherId);

Dans ce cas, nous utilisons le constructeur le plus simple disponible, qui prend l'adresse du point de terminaison de notre courtier MQTT et un identifiant client , qui identifie de manière unique notre client.

Dans notre cas, nous avons utilisé un UUID aléatoire, donc un nouvel identifiant client sera généré à chaque exécution.

Paho fournit également des constructeurs supplémentaires que nous pouvons utiliser afin de personnaliser le mécanisme de persistance utilisé pour stocker les messages non acquittés et / ou le ScheduledExecutorService utilisé pour exécuter les tâches d'arrière-plan requises par l'implémentation du moteur de protocole.

Le point de terminaison du serveur que nous utilisons est un courtier MQTT public hébergé par le projet Paho , qui permet à toute personne disposant d'une connexion Internet de tester les clients sans aucune authentification.

4.2. Connexion au serveur

Notre instance MqttClient nouvellement créée n'est pas connectée au serveur. Nous le faisons en appelant sa méthode connect () , en passant éventuellement une instance MqttConnectOptions qui nous permet de personnaliser certains aspects du protocole.

En particulier, nous pouvons utiliser ces options pour transmettre des informations supplémentaires telles que les informations d'identification de sécurité, le mode de récupération de session, le mode de reconnexion, etc.

La classe MqttConnectionOptions expose ces options sous forme de propriétés simples que nous pouvons définir à l'aide des méthodes de définition normales. Nous devons uniquement définir les propriétés requises pour notre scénario - les autres prendront les valeurs par défaut.

Le code utilisé pour établir une connexion au serveur ressemble généralement à ceci:

MqttConnectOptions options = new MqttConnectOptions(); options.setAutomaticReconnect(true); options.setCleanSession(true); options.setConnectionTimeout(10); publisher.connect(options);

Ici, nous définissons nos options de connexion pour que:

  • La bibliothèque essaiera automatiquement de se reconnecter au serveur en cas de panne du réseau
  • Il supprimera les messages non envoyés d'une exécution précédente
  • Le délai d'expiration de la connexion est défini sur 10 secondes

5. Envoi de messages

L'envoi de messages à l'aide d'un MqttClient déjà connecté est très simple. Nous utilisons l'une des variantes de la méthode publish () pour envoyer la charge utile, qui est toujours un tableau d'octets, à un sujet donné , en utilisant l'une des options de qualité de service suivantes:

  • 0 - sémantique «au plus une fois», également appelée «feu et oublie». Utilisez cette option lorsque la perte de message est acceptable, car elle ne nécessite aucun type d'accusé de réception ou de persistance
  • 1 - Sémantique «au moins une fois». Utilisez cette option lorsque la perte de message n'est pas acceptable et que vos abonnés peuvent gérer les doublons
  • 2 - Sémantique «exactement une fois». Utilisez cette option lorsque la perte de message n'est pas acceptable et que vos abonnés ne peuvent pas gérer les doublons

Dans notre exemple de projet, la classe EngineTemperatureSensor joue le rôle d'un capteur fictif qui produit une nouvelle lecture de température chaque fois que nous invoquons sa méthode call () .

Cette classe implémente l' interface Callable afin que nous puissions facilement l'utiliser avec l'une des implémentations ExecutorService disponibles dans le package java.util.concurrent :

public class EngineTemperatureSensor implements Callable { // ... private members omitted public EngineTemperatureSensor(IMqttClient client) { this.client = client; } @Override public Void call() throws Exception { if ( !client.isConnected()) { return null; } MqttMessage msg = readEngineTemp(); msg.setQos(0); msg.setRetained(true); client.publish(TOPIC,msg); return null; } private MqttMessage readEngineTemp() { double temp = 80 + rnd.nextDouble() * 20.0; byte[] payload = String.format("T:%04.2f",temp) .getBytes(); return new MqttMessage(payload); } }

Le MqttMessage encapsule la charge utile elle-même, la qualité de service demandée et également l' indicateur conservé pour le message. Cet indicateur indique au courtier qu'il doit conserver ce message jusqu'à ce qu'il soit consommé par un abonné.

Nous pouvons utiliser cette fonctionnalité pour implémenter un comportement «dernier bon connu», donc lorsqu'un nouvel abonné se connecte au serveur, il recevra immédiatement le message conservé.

6. Réception de messages

Afin de recevoir des messages du courtier MQTT, nous devons utiliser l'une des variantes de la méthode subscribe () , qui nous permettent de spécifier:

  • Un ou plusieurs filtres de rubrique pour les messages que nous souhaitons recevoir
  • La QoS associée
  • Le gestionnaire de rappel pour traiter les messages reçus

Dans l'exemple suivant, nous montrons comment ajouter un écouteur de messages à une instance IMqttClient existante pour recevoir des messages d'une rubrique donnée. Nous utilisons un CountDownLatch comme mécanisme de synchronisation entre notre rappel et le thread d'exécution principal, en le décrémentant à chaque fois qu'un nouveau message arrive.

Dans l'exemple de code, nous avons utilisé une autre instance IMqttClient pour recevoir des messages. Nous l'avons fait juste pour préciser quel client fait quoi, mais ce n'est pas une limitation de Paho - si vous le souhaitez, vous pouvez utiliser le même client pour publier et recevoir des messages:

CountDownLatch receivedSignal = new CountDownLatch(10); subscriber.subscribe(EngineTemperatureSensor.TOPIC, (topic, msg) -> { byte[] payload = msg.getPayload(); // ... payload handling omitted receivedSignal.countDown(); }); receivedSignal.await(1, TimeUnit.MINUTES);

La variante subscribe () utilisée ci-dessus prend une instance IMqttMessageListener comme deuxième argument.

Dans notre cas, nous utilisons une simple fonction lambda qui traite la charge utile et décrémente un compteur. Si pas assez de messages arrivent dans la fenêtre de temps spécifiée (1 minute), la méthode await () lèvera une exception.

When using Paho, we don't need to explicitly acknowledge message receipt. If the callback returns normally, Paho assumes it a successful consumption and sends an acknowledgment to the server.

If the callback throws an Exception, the client will be shut down. Please note that this will result in loss of any messages sent with QoS level of 0.

Messages sent with QoS level 1 or 2 will be resent by the server once the client is reconnected and subscribes to the topic again.

7. Conclusion

In this article, we demonstrated how we can add support for the MQTT protocol in our Java applications using the library provided by the Eclipse Paho project.

Cette bibliothèque gère tous les détails du protocole de bas niveau, ce qui nous permet de nous concentrer sur d'autres aspects de notre solution, tout en laissant un bon espace pour personnaliser les aspects importants de ses fonctionnalités internes, telles que la persistance des messages.

Le code présenté dans cet article est disponible à l'adresse over sur GitHub.