Introduction à RSocket

1. Introduction

Dans ce didacticiel, nous allons jeter un premier regard sur RSocket et comment il permet la communication client-serveur.

2. Qu'est-ce que RSocket ?

RSocket est un protocole de communication binaire point à point destiné à être utilisé dans des applications distribuées. En ce sens, il offre une alternative à d'autres protocoles comme HTTP.

Une comparaison complète entre RSocket et d'autres protocoles dépasse le cadre de cet article. Au lieu de cela, nous nous concentrerons sur une caractéristique clé de RSocket: ses modèles d'interaction.

RSocket fournit quatre modèles d'interaction. Dans cet esprit, nous allons explorer chacun d'eux avec un exemple.

3. Dépendances de Maven

RSocket n'a besoin que de deux dépendances directes pour nos exemples:

 io.rsocket rsocket-core 0.11.13   io.rsocket rsocket-transport-netty 0.11.13 

Les dépendances rsocket-core et rsocket-transport-netty sont disponibles sur Maven Central.

Une note importante est que la bibliothèque RSocket utilise fréquemment des flux réactifs . Les classes Flux et Mono sont utilisées tout au long de cet article, donc une compréhension de base de celles-ci sera utile.

4. Configuration du serveur

Tout d'abord, créons la classe Server :

public class Server { private final Disposable server; public Server() { this.server = RSocketFactory.receive() .acceptor((setupPayload, reactiveSocket) -> Mono.just(new RSocketImpl())) .transport(TcpServerTransport.create("localhost", TCP_PORT)) .start() .subscribe(); } public void dispose() { this.server.dispose(); } private class RSocketImpl extends AbstractRSocket {} }

Ici, nous utilisons RSocketFactory pour configurer et écouter une socket TCP. Nous transmettons notre RSocketImpl personnalisé pour gérer les demandes des clients. Nous ajouterons des méthodes à RSocketImpl au fur et à mesure.

Ensuite, pour démarrer le serveur, il suffit de l'instancier:

Server server = new Server();

Une seule instance de serveur peut gérer plusieurs connexions . En conséquence, une seule instance de serveur prendra en charge tous nos exemples.

Lorsque nous avons terminé, la méthode dispose arrêtera le serveur et libérera le port TCP.

4. Modèles d'interaction

4.1. Demande de réponse

RSocket fournit un modèle de demande / réponse - chaque demande reçoit une seule réponse.

Pour ce modèle, nous allons créer un service simple qui renvoie un message au client.

Commençons par ajouter une méthode à notre extension de AbstractRSocket, RSocketImpl :

@Override public Mono requestResponse(Payload payload) { try { return Mono.just(payload); // reflect the payload back to the sender } catch (Exception x) { return Mono.error(x); } }

La méthode requestResponse renvoie un seul résultat pour chaque requête , comme nous pouvons le voir par le type de réponse Mono .

La charge utile est la classe qui contient le contenu du message et les métadonnées . Il est utilisé par tous les modèles d'interaction. Le contenu de la charge utile est binaire, mais il existe des méthodes pratiques qui prennent en charge lecontenu basé sur des chaînes .

Ensuite, nous pouvons créer notre classe client:

public class ReqResClient { private final RSocket socket; public ReqResClient() { this.socket = RSocketFactory.connect() .transport(TcpClientTransport.create("localhost", TCP_PORT)) .start() .block(); } public String callBlocking(String string) { return socket .requestResponse(DefaultPayload.create(string)) .map(Payload::getDataUtf8) .block(); } public void dispose() { this.socket.dispose(); } }

Le client utilise la méthode RSocketFactory.connect () pour initier une connexion socket avec le serveur. Nous utilisons la méthode requestResponse sur le socket pour envoyer une charge utile au serveur .

Notre charge utile contient la chaîne transmise au client. Quand le MonoLa réponse arrive, nous pouvons utiliser la méthode getDataUtf8 () pour accéder au contenu String de la réponse.

Enfin, nous pouvons exécuter le test d'intégration pour voir la requête / réponse en action. Nous enverrons une chaîne au serveur et vérifierons que la même chaîne est renvoyée:

@Test public void whenSendingAString_thenRevceiveTheSameString() { ReqResClient client = new ReqResClient(); String string = "Hello RSocket"; assertEquals(string, client.callBlocking(string)); client.dispose(); }

4.2. Feu et oublier

Avec le modèle fire-and-forget, le client ne recevra aucune réponse du serveur .

Dans cet exemple, le client enverra des mesures simulées au serveur à des intervalles de 50 ms. Le serveur publiera les mesures.

Ajoutons un gestionnaire d'incendie et d'oubli à notre serveur dans la classe RSocketImpl :

@Override public Mono fireAndForget(Payload payload) { try { dataPublisher.publish(payload); // forward the payload return Mono.empty(); } catch (Exception x) { return Mono.error(x); } }

Ce gestionnaire ressemble beaucoup au gestionnaire de demande / réponse. Cependant, fireAndForget renvoie Mono au lieu de Mono .

Le dataPublisher est une instance de org.reactivestreams.Publisher . Ainsi, il met la charge utile à la disposition des abonnés. Nous utiliserons cela dans l'exemple request / stream.

Ensuite, nous allons créer le client fire-and-forget:

public class FireNForgetClient { private final RSocket socket; private final List data; public FireNForgetClient() { this.socket = RSocketFactory.connect() .transport(TcpClientTransport.create("localhost", TCP_PORT)) .start() .block(); } /** Send binary velocity (float) every 50ms */ public void sendData() { data = Collections.unmodifiableList(generateData()); Flux.interval(Duration.ofMillis(50)) .take(data.size()) .map(this::createFloatPayload) .flatMap(socket::fireAndForget) .blockLast(); } // ... }

La configuration du socket est exactement la même qu'avant.

La méthode sendData () utilise un flux Flux pour envoyer plusieurs messages. Pour chaque message, nous invoquons socket :: fireAndForget .

Nous devons nous abonner à la réponse Mono pour chaque message . Si nous oublions de nous abonner, socket :: fireAndForget ne s'exécutera pas.

The flatMap operator makes sure the Void responses are passed to the subscriber, while the blockLast operator acts as the subscriber.

We're going to wait until the next section to run the fire-and-forget test. At that point, we'll create a request/stream client to receive the data that was pushed by the fire-and-forget client.

4.3. Request/Stream

In the request/stream model, a single request may receive multiple responses. To see this in action we can build upon the fire-and-forget example. To do that, let's request a stream to retrieve the measurements we sent in the previous section.

As before, let's start by adding a new listener to the RSocketImpl on the server:

@Override public Flux requestStream(Payload payload) { return Flux.from(dataPublisher); }

The requestStream handler returns a Flux stream. As we recall from the previous section, the fireAndForget handler published incoming data to the dataPublisher. Now, we'll create a Flux stream using that same dataPublisher as the event source. By doing this the measurement data will flow asynchronously from our fire-and-forget client to our request/stream client.

Let's create the request/stream client next:

public class ReqStreamClient { private final RSocket socket; public ReqStreamClient() { this.socket = RSocketFactory.connect() .transport(TcpClientTransport.create("localhost", TCP_PORT)) .start() .block(); } public Flux getDataStream() { return socket .requestStream(DefaultPayload.create(DATA_STREAM_NAME)) .map(Payload::getData) .map(buf -> buf.getFloat()) .onErrorReturn(null); } public void dispose() { this.socket.dispose(); } }

We connect to the server in the same way as our previous clients.

In getDataStream()we use socket.requestStream() to receive a Flux stream from the server. From that stream, we extract the Float values from the binary data. Finally, the stream is returned to the caller, allowing the caller to subscribe to it and process the results.

Now let's test. We'll verify the round trip from fire-and-forget to request/stream.

We can assert that each value is received in the same order as it was sent. Then, we can assert that we receive the same number of values that were sent:

@Test public void whenSendingStream_thenReceiveTheSameStream() { FireNForgetClient fnfClient = new FireNForgetClient(); ReqStreamClient streamClient = new ReqStreamClient(); List data = fnfClient.getData(); List dataReceived = new ArrayList(); Disposable subscription = streamClient.getDataStream() .index() .subscribe( tuple -> { assertEquals("Wrong value", data.get(tuple.getT1().intValue()), tuple.getT2()); dataReceived.add(tuple.getT2()); }, err -> LOG.error(err.getMessage()) ); fnfClient.sendData(); // ... dispose client & subscription assertEquals("Wrong data count received", data.size(), dataReceived.size()); }

4.4. Channel

The channel model provides bidirectional communication. In this model, message streams flow asynchronously in both directions.

Let's create a simple game simulation to test this. In this game, each side of the channel will become a player. As the game runs, these players will send messages to the other side at random time intervals. The opposite side will react to the messages.

Firstly, we'll create the handler on the server. Like before, we add to the RSocketImpl:

@Override public Flux requestChannel(Publisher payloads) { Flux.from(payloads) .subscribe(gameController::processPayload); return Flux.from(gameController); }

The requestChannel handler has Payload streams for both input and output. The Publisher input parameter is a stream of payloads received from the client. As they arrive, these payloads are passed to the gameController::processPayload function.

In response, we return a different Flux stream back to the client. This stream is created from our gameController, which is also a Publisher.

Here is a summary of the GameController class:

public class GameController implements Publisher { @Override public void subscribe(Subscriber subscriber) { // send Payload messages to the subscriber at random intervals } public void processPayload(Payload payload) { // react to messages from the other player } }

When the GameController receives a subscriber it begins sending messages to that subscriber.

Next, let's create the client:

public class ChannelClient { private final RSocket socket; private final GameController gameController; public ChannelClient() { this.socket = RSocketFactory.connect() .transport(TcpClientTransport.create("localhost", TCP_PORT)) .start() .block(); this.gameController = new GameController("Client Player"); } public void playGame() { socket.requestChannel(Flux.from(gameController)) .doOnNext(gameController::processPayload) .blockLast(); } public void dispose() { this.socket.dispose(); } }

As we have seen in our previous examples, the client connects to the server in the same way as the other clients.

The client creates its own instance of the GameController.

We use socket.requestChannel() to send our Payload stream to the server. The server responds with a Payload stream of its own.

En tant que charges utiles reçues du serveur, nous les transmettons à notre gestionnaire gameController :: processPayload .

Dans notre simulation de jeu, le client et le serveur sont des images miroir l'un de l'autre. Autrement dit, chaque côté envoie un flux de données utiles et reçoit un flux de données utiles de l'autre extrémité .

Les flux s'exécutent indépendamment, sans synchronisation.

Enfin, exécutons la simulation dans un test:

@Test public void whenRunningChannelGame_thenLogTheResults() { ChannelClient client = new ChannelClient(); client.playGame(); client.dispose(); }

5. Conclusion

Dans cet article d'introduction, nous avons exploré les modèles d'interaction fournis par RSocket. Le code source complet des exemples peut être trouvé dans notre référentiel Github.

N'oubliez pas de consulter le site Web de RSocket pour une discussion plus approfondie. En particulier, les documents FAQ et Motivations fournissent un bon contexte.