La différence entre l'API RxJava et l'API Java 9 Flow

1. Introduction

L'API Java Flow a été introduite dans Java 9 en tant qu'implémentation de la spécification de flux réactif.

Dans ce didacticiel, nous allons d'abord étudier les flux réactifs. Ensuite, nous découvrirons sa relation avec RxJava et l'API Flow.

2. Que sont les flux réactifs?

Le manifeste réactif a introduit Reactive Streams pour spécifier une norme pour le traitement de flux asynchrone avec contre-pression non bloquante.

La portée de la spécification de flux réactif est de définir un ensemble minimal d'interfaces pour atteindre ces objectifs:

  • org.reactivestreams.Publisher est un fournisseur de données qui publie des données aux abonnés en fonction de leur demande

  • org.reactivestreams.Subscriber est le consommateur de données - il peut recevoir des données après s'être abonné à un éditeur

  • org.reactivestreams.L'abonnement est créé lorsqu'un éditeur accepte un abonné

  • org.reactivestreams.Processor est à la fois un abonné et un éditeur - il s'abonne à un éditeur, traite les données puis transmet les données traitées à l'abonné

L'API Flow provient de la spécification. RxJava le précède, mais depuis 2.0, RxJava a également pris en charge la spécification.

Nous allons approfondir les deux, mais d'abord, voyons un cas d'utilisation pratique.

3. Cas d'utilisation

Pour ce didacticiel, nous utiliserons un service de diffusion vidéo en direct comme cas d'utilisation.

Une vidéo en direct, contrairement au streaming vidéo à la demande, ne dépend pas du consommateur. Par conséquent, le serveur publie le flux à son propre rythme, et il appartient au consommateur de s'adapter.

Dans sa forme la plus simple, notre modèle se compose d'un éditeur de flux vidéo et d'un lecteur vidéo en tant qu'abonné.

Implémentons VideoFrame notre élément de données:

public class VideoFrame { private long number; // additional data fields // constructor, getters, setters }

Passons ensuite en revue nos implémentations API Flow et RxJava une par une.

4. Mise en œuvre avec l'API Flow

Les API de flux dans JDK 9 correspondent à la spécification Reactive Streams. Avec l'API Flow, si l'application demande initialement N éléments, l'éditeur envoie au maximum N éléments à l'abonné.

Les interfaces de l'API Flow se trouvent toutes dans l' interface java.util.concurrent.Flow . Ils sont sémantiquement équivalents à leurs équivalents respectifs de Reactive Streams.

Implémentons VideoStreamServer comme l'éditeur de VideoFrame .

public class VideoStreamServer extends SubmissionPublisher { public VideoStreamServer() { super(Executors.newSingleThreadExecutor(), 5); } }

Nous avons étendu notre VideoStreamServer à partir de SubmissionPublisher au lieu d'implémenter directement Flow :: Publisher. SubmissionPublisher est l'implémentation JDK de Flow :: Publisher pour la communication asynchrone avec les abonnés, ce qui permet à notre VideoStreamServer d'émettre à son propre rythme.

En outre, il est utile pour la gestion de la contre-pression et du tampon, car lorsque SubmissionPublisher :: subscribe est appelé, il crée une instance de BufferedSubscription , puis ajoute le nouvel abonnement à sa chaîne d'abonnements. BufferedSubscription peut mettre en mémoire tampon les éléments émis jusqu'à SubmissionPublisher # maxBufferCapacity .

Définissons maintenant VideoPlayer, qui consomme un flux de VideoFrame. Par conséquent, il doit implémenter Flow :: Subscriber .

public class VideoPlayer implements Flow.Subscriber { Flow.Subscription subscription = null; @Override public void onSubscribe(Flow.Subscription subscription) { this.subscription = subscription; subscription.request(1); } @Override public void onNext(VideoFrame item) { log.info("play #{}" , item.getNumber()); subscription.request(1); } @Override public void onError(Throwable throwable) { log.error("There is an error in video streaming:{}" , throwable.getMessage()); } @Override public void onComplete() { log.error("Video has ended"); } }

VideoPlayer s'abonne à VideoStreamServer, puis après un abonnement réussi, la méthode VideoPlayer :: onSubscribe est appelée et elle demande une image. VideoPlayer :: onNext reçoit la trame et en demande une nouvelle. Le nombre de trames demandées dépend du cas d'utilisation et des implémentations de l' abonné .

Enfin, mettons les choses ensemble:

VideoStreamServer streamServer = new VideoStreamServer(); streamServer.subscribe(new VideoPlayer()); // submit video frames ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); AtomicLong frameNumber = new AtomicLong(); executor.scheduleWithFixedDelay(() -> { streamServer.offer(new VideoFrame(frameNumber.getAndIncrement()), (subscriber, videoFrame) -> { subscriber.onError(new RuntimeException("Frame#" + videoFrame.getNumber() + " droped because of backpressure")); return true; }); }, 0, 1, TimeUnit.MILLISECONDS); sleep(1000);

5. Implémentation avec RxJava

RxJava est une implémentation Java de ReactiveX. Le projet ReactiveX (ou Reactive Extensions) vise à fournir un concept de programmation réactive. C'est une combinaison du modèle Observer, du modèle Iterator et de la programmation fonctionnelle.

La dernière version majeure de RxJava est 3.x. RxJava prend en charge Reactive Streams depuis la version 2.x avec sa classe de base Flowable , mais c'est un ensemble plus important que Reactive Streams avec plusieurs classes de base telles que Flowable , Observable , Single , Completable.

Le composant de conformité de flux réactif fluide est un flux de 0 à N éléments avec gestion de la contre-pression. Flowable étend Publisher à partir de Reactive Streams. Par conséquent, de nombreux opérateurs RxJava acceptent directement Publisher et permettent une interopérabilité directe avec d'autres implémentations de Reactive Streams.

Maintenant, faisons notre générateur de flux vidéo qui est un flux paresseux infini:

Stream videoStream = Stream.iterate(new VideoFrame(0), videoFrame -> { // sleep for 1ms; return new VideoFrame(videoFrame.getNumber() + 1); });

Ensuite, nous définissons une instance Flowable pour générer des cadres sur un thread séparé:

Flowable .fromStream(videoStream) .subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor()))

Il est important de noter qu'un flux infini nous suffit, mais si nous avons besoin d'un moyen plus flexible pour générer notre flux, Flowable.create est un bon choix.

Flowable .create(new FlowableOnSubscribe() { AtomicLong frame = new AtomicLong(); @Override public void subscribe(@NonNull FlowableEmitter emitter) { while (true) { emitter.onNext(new VideoFrame(frame.incrementAndGet())); //sleep for 1 ms to simualte delay } } }, /* Set Backpressure Strategy Here */)

Ensuite, à l'étape suivante, VideoPlayer s'abonne à ce Flowable et observe les éléments sur un fil séparé.

videoFlowable .observeOn(Schedulers.from(Executors.newSingleThreadExecutor())) .subscribe(item -> { log.info("play #" + item.getNumber()); // sleep for 30 ms to simualate frame display });

Et enfin, nous allons configurer la stratégie de contre-pression. Si nous voulons arrêter la vidéo en cas de perte d'image, nous devons donc utiliser BackpressureOverflowStrategy :: ERROR lorsque la mémoire tampon est pleine.

Flowable .fromStream(videoStream) .subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor())) .onBackpressureBuffer(5, null, BackpressureOverflowStrategy.ERROR) .observeOn(Schedulers.from(Executors.newSingleThreadExecutor())) .subscribe(item -> { log.info("play #" + item.getNumber()); // sleep for 30 ms to simualate frame display });

6. Comparaison de RxJava et de l'API Flow

Even in these two simple implementations, we can see how RxJava's API is rich, especially for buffer management, error handling, and backpressure strategy. It gives us more options and fewer lines of code with its fluent API. Now let's consider more complicated cases.

Assume that our player can't display video frames without a codec. Hence with Flow API, we need to implement a Processor to simulate the codec and sit between server and player. With RxJava, we can do it with Flowable::flatMap or Flowable::map.

Or let's imagine that our player is also going to broadcast live translation audio, so we have to combine streams of video and audio from separate publishers. With RxJava, we can use Flowable::combineLatest, but with Flow API, it is not an easy task.

Although, it is possible to write a custom Processor that subscribes to both streams and sends the combined data to our VideoPlayer. The implementation is a headache, however.

7. Why Flow API?

At this point, we may have a question, what is the philosophy behind the Flow API?

If we search for Flow API usages in the JDK, we can find something in java.net.http and jdk.internal.net.http.

Furthermore, we can find adapters in the reactor project or reactive stream package. For example, org.reactivestreams.FlowAdapters has methods for converting Flow API interfaces to Reactive Stream ones and vice-versa. Therefore it helps the interoperability between Flow API and libraries with reactive stream support.

All of these facts help us to understand the purpose of Flow API: It was created to be a group of reactive specification interfaces in JDK without relay on third parties. Moreover, Java expects Flow API to be accepted as standard interfaces for reactive specification and to be used in JDK or other Java-based libraries that implement the reactive specification for middlewares and utilities.

8. Conclusions

Dans ce didacticiel, nous avons une introduction à la spécification de flux réactif, à l'API de flux et à RxJava.

De plus, nous avons vu un exemple pratique d'implémentations d'API Flow et de RxJava pour un flux vidéo en direct.

Mais tous les aspects de l'API Flow et de RxJava comme Flow :: Processor , Flowable :: map et Flowable :: flatMap ou les stratégies de contre-pression ne sont pas traités ici.

Comme toujours, vous trouverez le code complet du tutoriel sur GitHub.