Combinaison d'observables dans RxJava

1. Introduction

Dans ce rapide didacticiel, nous aborderons différentes façons de combiner des observables dans RxJava.

Si vous êtes nouveau sur RxJava, consultez d'abord ce didacticiel d'introduction.

Maintenant, allons-y.

2. Observables

Les séquences observables , ou simplement observables , sont des représentations de flux de données asynchrones.

Celles-ci sont basées sur le modèle d'observateur dans lequel un objet appelé observateur s'abonne aux éléments émis par un observable .

L'abonnement est non bloquant car l' Observateur est prêt à réagir à tout ce que l' Observable émettra à l'avenir. Ceci, à son tour, facilite la concurrence.

Voici une démonstration simple dans RxJava:

Observable .from(new String[] { "John", "Doe" }) .subscribe(name -> System.out.println("Hello " + name))

3. Combinaison d'observables

Lors de la programmation à l'aide d'un framework réactif, il est courant de combiner divers observables .

Dans une application Web, par exemple, nous pouvons avoir besoin d'obtenir deux ensembles de flux de données asynchrones indépendants l'un de l'autre.

Au lieu d'attendre la fin du flux précédent avant de demander le flux suivant, nous pouvons appeler les deux en même temps et nous abonner aux flux combinés.

Dans cette section, nous discuterons de certaines des différentes façons dont nous pouvons combiner plusieurs observables dans RxJava et des différents cas d'utilisation auxquels chaque méthode s'applique.

3.1. Fusionner

Nous pouvons utiliser l' opérateur de fusion pour combiner la sortie de plusieurs observables afin qu'ils agissent comme un:

@Test public void givenTwoObservables_whenMerged_shouldEmitCombinedResults() { TestSubscriber testSubscriber = new TestSubscriber(); Observable.merge( Observable.from(new String[] {"Hello", "World"}), Observable.from(new String[] {"I love", "RxJava"}) ).subscribe(testSubscriber); testSubscriber.assertValues("Hello", "World", "I love", "RxJava"); }

3.2. MergeDelayError

La méthode mergeDelayError est identique à merge en ce sens qu'elle combine plusieurs observables en un seul, mais si des erreurs se produisent pendant la fusion, elle permet aux éléments sans erreur de continuer avant de propager les erreurs :

@Test public void givenMutipleObservablesOneThrows_whenMerged_thenCombineBeforePropagatingError() { TestSubscriber testSubscriber = new TestSubscriber(); Observable.mergeDelayError( Observable.from(new String[] { "hello", "world" }), Observable.error(new RuntimeException("Some exception")), Observable.from(new String[] { "rxjava" }) ).subscribe(testSubscriber); testSubscriber.assertValues("hello", "world", "rxjava"); testSubscriber.assertError(RuntimeException.class); }

L'exemple ci-dessus émet toutes les valeurs sans erreur :

hello world rxjava

Notez que si nous utilisons merge au lieu de mergeDelayError , la chaîne « rxjava» ne sera pas émise car la fusion arrête immédiatement le flux de données d' Observables lorsqu'une erreur se produit.

3.3. Zip *: français

La méthode d'extension zip rassemble deux séquences de valeurs sous forme de paires :

@Test public void givenTwoObservables_whenZipped_thenReturnCombinedResults() { List zippedStrings = new ArrayList(); Observable.zip( Observable.from(new String[] { "Simple", "Moderate", "Complex" }), Observable.from(new String[] { "Solutions", "Success", "Hierarchy"}), (str1, str2) -> str1 + " " + str2).subscribe(zippedStrings::add); assertThat(zippedStrings).isNotEmpty(); assertThat(zippedStrings.size()).isEqualTo(3); assertThat(zippedStrings).contains("Simple Solutions", "Moderate Success", "Complex Hierarchy"); }

3.4. Zip avec intervalle

Dans cet exemple, nous allons compresser un flux avec intervalle qui retardera en effet l'émission des éléments du premier flux:

@Test public void givenAStream_whenZippedWithInterval_shouldDelayStreamEmmission() { TestSubscriber testSubscriber = new TestSubscriber(); Observable data = Observable.just("one", "two", "three", "four", "five"); Observable interval = Observable.interval(1L, TimeUnit.SECONDS); Observable .zip(data, interval, (strData, tick) -> String.format("[%d]=%s", tick, strData)) .toBlocking().subscribe(testSubscriber); testSubscriber.assertCompleted(); testSubscriber.assertValueCount(5); testSubscriber.assertValues("[0]=one", "[1]=two", "[2]=three", "[3]=four", "[4]=five"); }

4. Résumé

Dans cet article, nous avons vu quelques-unes des méthodes pour combiner Observables avec RxJava. Vous pouvez en savoir plus sur d'autres méthodes comme combineLatest , join , groupJoin , switchOnNext , dans la documentation officielle de RxJava.

Comme toujours, le code source de cet article est disponible dans notre référentiel GitHub.