Introduction à RxJava

1. Vue d'ensemble

Dans cet article, nous allons nous concentrer sur l'utilisation des extensions réactives (Rx) en Java pour composer et consommer des séquences de données.

En un coup d'œil, l'API peut ressembler à Java 8 Streams, mais en fait, elle est beaucoup plus flexible et fluide, ce qui en fait un paradigme de programmation puissant.

Si vous voulez en savoir plus sur RxJava, consultez cet article.

2. Configuration

Pour utiliser RxJava dans notre projet Maven, nous devons ajouter la dépendance suivante à notre pom.xml:

 io.reactivex rxjava ${rx.java.version} 

Ou, pour un projet Gradle:

compile 'io.reactivex.rxjava:rxjava:x.y.z'

3. Concepts fonctionnels réactifs

D'un côté, la programmation fonctionnelle est le processus de construction de logiciels en composant des fonctions pures, évitant l'état partagé, les données mutables et les effets secondaires.

D'un autre côté, la programmation réactive est un paradigme de programmation asynchrone concerné par les flux de données et la propagation du changement.

Ensemble, la programmation réactive fonctionnelle forme une combinaison de techniques fonctionnelles et réactives qui peuvent représenter une approche élégante de la programmation événementielle - avec des valeurs qui changent au fil du temps et où le consommateur réagit aux données lorsqu'elles arrivent.

Cette technologie rassemble différentes implémentations de ses principes fondamentaux, certains auteurs ont proposé un document qui définit le vocabulaire commun pour décrire le nouveau type d'applications.

3.1. Manifeste réactif

Le manifeste réactif est un document en ligne qui établit un standard élevé pour les applications dans l'industrie du développement de logiciels. En termes simples, les systèmes réactifs sont:

  • Réactif - les systèmes doivent répondre en temps opportun
  • Message Driven - les systèmes doivent utiliser le passage de messages asynchrone entre les composants pour assurer un couplage lâche
  • Élastique - les systèmes doivent rester réactifs sous une charge élevée
  • Résilient - les systèmes doivent rester réactifs en cas de défaillance de certains composants

4. Observables

Il existe deux types de clés à comprendre lorsque vous travaillez avec Rx:

  • Observable représente tout objet qui peut obtenir des données d'une source de données et dont l'état peut être intéressant de telle sorte que d'autres objets peuvent enregistrer un intérêt
  • Un observateur est tout objet qui souhaite être notifié lorsque l'état d'un autre objet change

Un observateur souscrit à une séquence observable . La séquence envoie les éléments à l' observateur un par un.

L' observateur gère chacun d'eux avant de traiter le suivant. Si de nombreux événements arrivent de manière asynchrone, ils doivent être stockés dans une file d'attente ou supprimés.

Dans Rx , un observateur ne sera jamais appelé avec un élément dans le désordre ou appelé avant que le rappel ne soit retourné pour l'élément précédent.

4.1. Types d' observables

Il existe deux types:

  • Non bloquant - l' exécution asynchrone est prise en charge et est autorisée à se désabonner à tout moment dans le flux d'événements. Sur cet article, nous nous concentrerons principalement sur ce type de type
  • Blocage - tous les appels d' observateur onNext seront synchrones et il n'est pas possible de se désabonner au milieu d'un flux d'événements. Nous pouvons toujours convertir un observable en observable bloquant , en utilisant la méthode toBlocking:
BlockingObservable blockingObservable = observable.toBlocking();

4.2. Les opérateurs

Un opérateur est une fonction qui prend un O bservable (la source) comme premier argument et renvoie un autre Observable (la destination). Ensuite, pour chaque élément émis par l'observable source, il appliquera une fonction à cet élément, puis émettra le résultat sur l' observable de destination .

Les opérateurs peuvent être enchaînés pour créer des flux de données complexes qui filtrent les événements en fonction de certains critères. Plusieurs opérateurs peuvent être appliqués à la même observable .

Il n'est pas difficile d'entrer dans une situation dans laquelle un observable émet des éléments plus rapidement qu'un opérateur ou un observateur ne peut les consommer. Vous pouvez en savoir plus sur la contre-pression ici.

4.3. Créer observable

L'opérateur de base produit simplement un Observable qui émet une seule instance générique avant de terminer, la chaîne «Hello». Lorsque nous voulons extraire des informations d'un Observable , nous implémentons une interface d' observateur puis appelons subscribe sur l' Observable souhaité :

Observable observable = Observable.just("Hello"); observable.subscribe(s -> result = s); assertTrue(result.equals("Hello"));

4.4. OnNext, OnError et OnCompleted

Il existe trois méthodes sur l' interface d' observateur que nous voulons connaître:

  1. OnNext est appelé sur notre observateur chaque fois qu'un nouvel événement est publié dans l' Observable attaché . C'est la méthode où nous allons effectuer une action sur chaque événement
  2. OnCompleted is called when the sequence of events associated with an Observable is complete, indicating that we should not expect any more onNext calls on our observer
  3. OnError is called when an unhandled exception is thrown during the RxJava framework code or our event handling code

The return value for the Observablessubscribe method is a subscribe interface:

String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; Observable observable = Observable.from(letters); observable.subscribe( i -> result += i, //OnNext Throwable::printStackTrace, //OnError () -> result += "_Completed" //OnCompleted ); assertTrue(result.equals("abcdefg_Completed"));

5. Observable Transformations and Conditional Operators

5.1. Map

The map operator transforms items emitted by an Observable by applying a function to each item.

Let's assume there is a declared array of strings that contains some letters from the alphabet and we want to print them in capital mode:

Observable.from(letters) .map(String::toUpperCase) .subscribe(letter -> result += letter); assertTrue(result.equals("ABCDEFG"));

The flatMap can be used to flatten Observables whenever we end up with nested Observables.

More details about the difference between map and flatMap can be found here.

Assuming we have a method that returns an Observable from a list of strings. Now we'll be printing for each string from a new Observable the list of titles based on what Subscriber sees:

Observable getTitle() { return Observable.from(titleList); } Observable.just("book1", "book2") .flatMap(s -> getTitle()) .subscribe(l -> result += l); assertTrue(result.equals("titletitle"));

5.2. Scan

The scan operator applies a function to each item emitted by an Observable sequentially and emits each successive value.

It allows us to carry forward state from event to event:

String[] letters = {"a", "b", "c"}; Observable.from(letters) .scan(new StringBuilder(), StringBuilder::append) .subscribe(total -> result += total.toString()); assertTrue(result.equals("aababc"));

5.3. GroupBy

Group by operator allows us to classify the events in the input Observable into output categories.

Let's assume that we created an array of integers from 0 to 10, then apply group by that will divide them into the categories even and odd:

Observable.from(numbers) .groupBy(i -> 0 == (i % 2) ? "EVEN" : "ODD") .subscribe(group -> group.subscribe((number) -> { if (group.getKey().toString().equals("EVEN")) { EVEN[0] += number; } else { ODD[0] += number; } }) ); assertTrue(EVEN[0].equals("0246810")); assertTrue(ODD[0].equals("13579"));

5.4. Filter

The operator filter emits only those items from an observable that pass a predicate test.

So let's filter in an integer array for the odd numbers:

Observable.from(numbers) .filter(i -> (i % 2 == 1)) .subscribe(i -> result += i); assertTrue(result.equals("13579"));

5.5. Conditional Operators

DefaultIfEmpty emits item from the source Observable, or a default item if the source Observable is empty:

Observable.empty() .defaultIfEmpty("Observable is empty") .subscribe(s -> result += s); assertTrue(result.equals("Observable is empty"));

The following code emits the first letter of the alphabet ‘a' because the array letters is not empty and this is what it contains in the first position:

Observable.from(letters) .defaultIfEmpty("Observable is empty") .first() .subscribe(s -> result += s); assertTrue(result.equals("a"));

TakeWhile operator discards items emitted by an Observable after a specified condition becomes false:

Observable.from(numbers) .takeWhile(i -> i  sum[0] += s); assertTrue(sum[0] == 10);

Of course, there more others operators that could cover our needs like Contain, SkipWhile, SkipUntil, TakeUntil, etc.

6. Connectable Observables

A ConnectableObservable resembles an ordinary Observable, except that it doesn't begin emitting items when it is subscribed to, but only when the connect operator is applied to it.

In this way, we can wait for all intended observers to subscribe to the Observable before the Observable begins emitting items:

String[] result = {""}; ConnectableObservable connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish(); connectable.subscribe(i -> result[0] += i); assertFalse(result[0].equals("01")); connectable.connect(); Thread.sleep(500); assertTrue(result[0].equals("01"));

7. Single

Single is like an Observable who, instead of emitting a series of values, emits one value or an error notification.

With this source of data, we can only use two methods to subscribe:

  • OnSuccess returns a Single that also calls a method we specify
  • OnError also returns a Single that immediately notifies subscribers of an error
String[] result = {""}; Single single = Observable.just("Hello") .toSingle() .doOnSuccess(i -> result[0] += i) .doOnError(error -> { throw new RuntimeException(error.getMessage()); }); single.subscribe(); assertTrue(result[0].equals("Hello"));

8. Subjects

A Subject is simultaneously two elements, a subscriber and an observable. As a subscriber, a subject can be used to publish the events coming from more than one observable.

And because it's also observable, the events from multiple subscribers can be reemitted as its events to anyone observing it.

In the next example, we'll look at how the observers will be able to see the events that occur after they subscribe:

Integer subscriber1 = 0; Integer subscriber2 = 0; Observer getFirstObserver() { return new Observer() { @Override public void onNext(Integer value) { subscriber1 += value; } @Override public void onError(Throwable e) { System.out.println("error"); } @Override public void onCompleted() { System.out.println("Subscriber1 completed"); } }; } Observer getSecondObserver() { return new Observer() { @Override public void onNext(Integer value) { subscriber2 += value; } @Override public void onError(Throwable e) { System.out.println("error"); } @Override public void onCompleted() { System.out.println("Subscriber2 completed"); } }; } PublishSubject subject = PublishSubject.create(); subject.subscribe(getFirstObserver()); subject.onNext(1); subject.onNext(2); subject.onNext(3); subject.subscribe(getSecondObserver()); subject.onNext(4); subject.onCompleted(); assertTrue(subscriber1 + subscriber2 == 14)

9. Resource Management

Using operation allows us to associate resources, such as a JDBC database connection, a network connection, or open files to our observables.

Nous présentons ici en commentaires les étapes à suivre pour atteindre cet objectif ainsi qu'un exemple de mise en œuvre:

String[] result = {""}; Observable values = Observable.using( () -> "MyResource", r -> { return Observable.create(o -> { for (Character c : r.toCharArray()) { o.onNext(c); } o.onCompleted(); }); }, r -> System.out.println("Disposed: " + r) ); values.subscribe( v -> result[0] += v, e -> result[0] += e ); assertTrue(result[0].equals("MyResource"));

10. Conclusion

Dans cet article, nous avons expliqué comment utiliser la bibliothèque RxJava et comment explorer ses fonctionnalités les plus importantes.

Le code source complet du projet, y compris tous les exemples de code utilisés ici, est disponible à l'adresse over sur Github.