Introduction à Reactor Core

1. Introduction

Reactor Core est une bibliothèque Java 8 qui implémente le modèle de programmation réactive. Il repose sur la spécification Reactive Streams, une norme pour la création d'applications réactives.

Dans le contexte du développement Java non réactif, devenir réactif peut être une courbe d'apprentissage assez raide. Cela devient plus difficile en le comparant à l' API Java 8 Stream , car ils pourraient être confondus avec les mêmes abstractions de haut niveau.

Dans cet article, nous tenterons de démystifier ce paradigme. Nous ferons de petits pas dans Reactor jusqu'à ce que nous ayons construit une image de la façon de composer du code réactif, jetant les bases d'articles plus avancés à venir dans une série ultérieure.

2. Spécification des flux réactifs

Avant de regarder Reactor, nous devrions examiner la spécification Reactive Streams. C'est ce que Reactor implémente et il jette les bases de la bibliothèque.

Essentiellement, Reactive Streams est une spécification pour le traitement de flux asynchrone.

En d'autres termes, un système où de nombreux événements sont produits et consommés de manière asynchrone. Pensez à un flux de milliers de mises à jour de stock par seconde entrant dans une application financière, et à ce qu'elle doive répondre à ces mises à jour en temps opportun.

L'un des principaux objectifs est de résoudre le problème de la contre-pression. Si nous avons un producteur qui émet des événements à un consommateur plus rapidement qu'il ne peut les traiter, alors le consommateur finira par être submergé par les événements, à court de ressources système.

La contre-pression signifie que notre consommateur devrait être en mesure d'indiquer au producteur la quantité de données à envoyer pour éviter cela, et c'est ce qui est indiqué dans la spécification.

3. Dépendances de Maven

Avant de commencer, ajoutons nos dépendances Maven:

 io.projectreactor reactor-core 3.3.9.RELEASE   ch.qos.logback logback-classic 1.1.3 

Nous ajoutons également Logback en tant que dépendance. En effet, nous enregistrerons la sortie de Reactor afin de mieux comprendre le flux de données.

4. Production d'un flux de données

Pour qu'une application soit réactive, la première chose qu'elle doit pouvoir faire est de produire un flux de données.

Cela pourrait être quelque chose comme l'exemple de mise à jour de stock que nous avons donné plus tôt. Sans ces données, nous n'aurions rien à réagir, c'est pourquoi il s'agit d'une première étape logique.

Reactive Core nous donne deux types de données qui nous permettent de faire cela.

4.1. Flux

La première façon de faire est d' utiliser un Flux. C'est un flux qui peut émettre 0..n éléments. Essayons d'en créer un simple:

Flux just = Flux.just(1, 2, 3, 4);

Dans ce cas, nous avons un flux statique de quatre éléments.

4.2. Mono

La deuxième façon de faire est d' utiliser un Mono, qui est un flux de 0..1 éléments. Essayons d'en instancier un:

Mono just = Mono.just(1);

Cela ressemble et se comporte presque exactement de la même manière que le Flux , mais cette fois, nous ne sommes limités qu'à un seul élément.

4.3. Pourquoi pas seulement Flux?

Avant d'expérimenter davantage, il convient de souligner pourquoi nous avons ces deux types de données.

Tout d'abord, il convient de noter que Flux et Mono sont des implémentations de l' interface Reactive Streams Publisher . Les deux classes sont conformes à la spécification, et nous pourrions utiliser cette interface à leur place:

Publisher just = Mono.just("foo");

Mais vraiment, connaître cette cardinalité est utile. C'est parce que quelques opérations n'ont de sens que pour l'un des deux types, et parce qu'elles peuvent être plus expressives (imaginez findOne () dans un référentiel).

5. Abonnement à un flux

Maintenant, nous avons une vue d'ensemble de haut niveau sur la façon de produire un flux de données, nous devons nous y abonner pour qu'il émette les éléments.

5.1. Collecter des éléments

Utilisons la méthode subscribe () pour collecter tous les éléments d'un flux:

List elements = new ArrayList(); Flux.just(1, 2, 3, 4) .log() .subscribe(elements::add); assertThat(elements).containsExactly(1, 2, 3, 4);

Les données ne commenceront pas à circuler tant que nous ne nous serons pas abonnés. Notez que nous avons également ajouté une journalisation, ce qui sera utile lorsque nous verrons ce qui se passe dans les coulisses.

5.2. Le flux des éléments

Avec la connexion en place, nous pouvons l'utiliser pour visualiser comment les données circulent dans notre flux:

20:25:19.550 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription) 20:25:19.553 [main] INFO reactor.Flux.Array.1 - | request(unbounded) 20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onNext(1) 20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onNext(2) 20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onNext(3) 20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onNext(4) 20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onComplete()

Tout d'abord, tout fonctionne sur le thread principal. N'entrons pas dans les détails à ce sujet, car nous examinerons plus en détail la concurrence plus tard dans cet article. Cela simplifie les choses, car nous pouvons tout gérer dans l'ordre.

Passons maintenant en revue la séquence que nous avons enregistrée un par un:

  1. onSubscribe () - Ceci est appelé lorsque nous nous abonnons à notre flux
  2. request(unbounded) – When we call subscribe, behind the scenes we are creating a Subscription. This subscription requests elements from the stream. In this case, it defaults to unbounded, meaning it requests every single element available
  3. onNext() – This is called on every single element
  4. onComplete() – This is called last, after receiving the last element. There's actually a onError() as well, which would be called if there is an exception, but in this case, there isn't

This is the flow laid out in the Subscriber interface as part of the Reactive Streams Specification, and in reality, that's what's been instantiated behind the scenes in our call to onSubscribe(). It's a useful method, but to better understand what's happening let's provide a Subscriber interface directly:

Flux.just(1, 2, 3, 4) .log() .subscribe(new Subscriber() { @Override public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE); } @Override public void onNext(Integer integer) { elements.add(integer); } @Override public void onError(Throwable t) {} @Override public void onComplete() {} });

We can see that each possible stage in the above flow maps to a method in the Subscriber implementation. It just happens that the Flux has provided us with a helper method to reduce this verbosity.

5.3. Comparison to Java 8 Streams

It still might appear that we have something synonymous to a Java 8 Stream doing collect:

List collected = Stream.of(1, 2, 3, 4) .collect(toList());

Only we don't.

The core difference is that Reactive is a push model, whereas the Java 8 Streams are a pull model. In a reactive approach, events are pushed to the subscribers as they come in.

The next thing to notice is a Streams terminal operator is just that, terminal, pulling all the data and returning a result. With Reactive we could have an infinite stream coming in from an external resource, with multiple subscribers attached and removed on an ad hoc basis. We can also do things like combine streams, throttle streams and apply backpressure, which we will cover next.

6. Backpressure

The next thing we should consider is backpressure. In our example, the subscriber is telling the producer to push every single element at once. This could end up becoming overwhelming for the subscriber, consuming all of its resources.

Backpressure is when a downstream can tell an upstream to send it fewer data in order to prevent it from being overwhelmed.

We can modify our Subscriber implementation to apply backpressure. Let's tell the upstream to only send two elements at a time by using request():

Flux.just(1, 2, 3, 4) .log() .subscribe(new Subscriber() { private Subscription s; int onNextAmount; @Override public void onSubscribe(Subscription s) { this.s = s; s.request(2); } @Override public void onNext(Integer integer) { elements.add(integer); onNextAmount++; if (onNextAmount % 2 == 0) { s.request(2); } } @Override public void onError(Throwable t) {} @Override public void onComplete() {} });

Now if we run our code again, we'll see the request(2) is called, followed by two onNext() calls, then request(2) again.

23:31:15.395 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription) 23:31:15.397 [main] INFO reactor.Flux.Array.1 - | request(2) 23:31:15.397 [main] INFO reactor.Flux.Array.1 - | onNext(1) 23:31:15.398 [main] INFO reactor.Flux.Array.1 - | onNext(2) 23:31:15.398 [main] INFO reactor.Flux.Array.1 - | request(2) 23:31:15.398 [main] INFO reactor.Flux.Array.1 - | onNext(3) 23:31:15.398 [main] INFO reactor.Flux.Array.1 - | onNext(4) 23:31:15.398 [main] INFO reactor.Flux.Array.1 - | request(2) 23:31:15.398 [main] INFO reactor.Flux.Array.1 - | onComplete()

Essentially, this is reactive pull backpressure. We are requesting the upstream to only push a certain amount of elements, and only when we are ready.

If we imagine we were being streamed tweets from twitter, it would then be up to the upstream to decide what to do. If tweets were coming in but there are no requests from the downstream, then the upstream could drop items, store them in a buffer, or some other strategy.

7. Operating on a Stream

We can also perform operations on the data in our stream, responding to events as we see fit.

7.1. Mapping Data in a Stream

A simple operation that we can perform is applying a transformation. In this case, let's just double all the numbers in our stream:

Flux.just(1, 2, 3, 4) .log() .map(i -> i * 2) .subscribe(elements::add);

map() will be applied when onNext() is called.

7.2. Combining Two Streams

We can then make things more interesting by combining another stream with this one. Let's try this by using zip() function:

Flux.just(1, 2, 3, 4) .log() .map(i -> i * 2) .zipWith(Flux.range(0, Integer.MAX_VALUE), (one, two) -> String.format("First Flux: %d, Second Flux: %d", one, two)) .subscribe(elements::add); assertThat(elements).containsExactly( "First Flux: 2, Second Flux: 0", "First Flux: 4, Second Flux: 1", "First Flux: 6, Second Flux: 2", "First Flux: 8, Second Flux: 3");

Here, we are creating another Flux that keeps incrementing by one and streaming it together with our original one. We can see how these work together by inspecting the logs:

20:04:38.064 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription) 20:04:38.065 [main] INFO reactor.Flux.Array.1 - | onNext(1) 20:04:38.066 [main] INFO reactor.Flux.Range.2 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription) 20:04:38.066 [main] INFO reactor.Flux.Range.2 - | onNext(0) 20:04:38.067 [main] INFO reactor.Flux.Array.1 - | onNext(2) 20:04:38.067 [main] INFO reactor.Flux.Range.2 - | onNext(1) 20:04:38.067 [main] INFO reactor.Flux.Array.1 - | onNext(3) 20:04:38.067 [main] INFO reactor.Flux.Range.2 - | onNext(2) 20:04:38.067 [main] INFO reactor.Flux.Array.1 - | onNext(4) 20:04:38.067 [main] INFO reactor.Flux.Range.2 - | onNext(3) 20:04:38.067 [main] INFO reactor.Flux.Array.1 - | onComplete() 20:04:38.067 [main] INFO reactor.Flux.Array.1 - | cancel() 20:04:38.067 [main] INFO reactor.Flux.Range.2 - | cancel()

Note how we now have one subscription per Flux. The onNext() calls are also alternated, so the index of each element in the stream will match when we apply the zip() function.

8. Hot Streams

Currently, we've focused primarily on cold streams. These are static, fixed-length streams that are easy to deal with. A more realistic use case for reactive might be something that happens infinitely.

For example, we could have a stream of mouse movements that constantly needs to be reacted to or a twitter feed. These types of streams are called hot streams, as they are always running and can be subscribed to at any point in time, missing the start of the data.

8.1. Creating a ConnectableFlux

One way to create a hot stream is by converting a cold stream into one. Let's create a Flux that lasts forever, outputting the results to the console, which would simulate an infinite stream of data coming from an external resource:

ConnectableFlux publish = Flux.create(fluxSink -> { while(true) { fluxSink.next(System.currentTimeMillis()); } }) .publish();

By calling publish() we are given a ConnectableFlux. This means that calling subscribe() won't cause it to start emitting, allowing us to add multiple subscriptions:

publish.subscribe(System.out::println); publish.subscribe(System.out::println);

If we try running this code, nothing will happen. It's not until we call connect(), that the Flux will start emitting:

publish.connect();

8.2. Throttling

If we run our code, our console will be overwhelmed with logging. This is simulating a situation where too much data is being passed to our consumers. Let's try getting around this with throttling:

ConnectableFlux publish = Flux.create(fluxSink -> { while(true) { fluxSink.next(System.currentTimeMillis()); } }) .sample(ofSeconds(2)) .publish();

Here, we've introduced a sample() method with an interval of two seconds. Now values will only be pushed to our subscriber every two seconds, meaning the console will be a lot less hectic.

Of course, there are multiple strategies to reduce the amount of data sent downstream, such as windowing and buffering, but they will be left out of scope for this article.

9. Concurrency

All of our above examples have currently run on the main thread. However, we can control which thread our code runs on if we want. The Scheduler interface provides an abstraction around asynchronous code, for which many implementations are provided for us. Let's try subscribing to a different thread to main:

Flux.just(1, 2, 3, 4) .log() .map(i -> i * 2) .subscribeOn(Schedulers.parallel()) .subscribe(elements::add);

The Parallel scheduler will cause our subscription to be run on a different thread, which we can prove by looking at the logs. We see the first entry comes from the main thread and the Flux is running in another thread called parallel-1.

20:03:27.505 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework 20:03:27.529 [parallel-1] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription) 20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | request(unbounded) 20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onNext(1) 20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onNext(2) 20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onNext(3) 20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onNext(4) 20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onComplete()

Concurrency get's more interesting than this, and it will be worth us exploring it in another article.

10. Conclusion

Dans cet article, nous avons donné une vue d'ensemble de haut niveau et de bout en bout de Reactive Core. Nous avons expliqué comment nous pouvons publier et nous abonner à des flux, appliquer une contre-pression, fonctionner sur des flux et également gérer des données de manière asynchrone. Nous espérons que cela nous permettra d'écrire des applications réactives.

Les articles ultérieurs de cette série couvriront l'accès concurrentiel plus avancé et d'autres concepts réactifs. Il y a aussi un autre article sur Reactor with Spring.

Le code source de notre application est disponible sur over sur GitHub; il s'agit d'un projet Maven qui devrait pouvoir s'exécuter tel quel.