Gérer la contre-pression avec RxJava

1. Vue d'ensemble

Dans cet article, nous examinerons la manière dont la bibliothèque RxJava nous aide à gérer la contre-pression.

En termes simples, RxJava utilise un concept de flux réactifs en introduisant des Observables, auxquels un ou plusieurs observateurs peuvent s'abonner. Faire face à des flux éventuellement infinis est très difficile, car nous devons faire face à un problème de contre-pression.

Il n'est pas difficile de se retrouver dans une situation dans laquelle un observable émet des éléments plus rapidement qu'un abonné ne peut les consommer. Nous examinerons les différentes solutions au problème de l'accroissement de la mémoire tampon des articles non consommés.

2. Hot Observables Versus froid Observables

Tout d'abord, créons une fonction consommateur simple qui sera utilisée comme consommateur d'éléments d' Observables que nous définirons plus tard:

public class ComputeFunction { public static void compute(Integer v) { try { System.out.println("compute integer v: " + v); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }

Notre fonction compute () imprime simplement l'argument. La chose importante à noter ici est l'invocation d'une méthode Thread.sleep (1000) - nous le faisons pour émuler une tâche de longue durée qui obligera Observable à se remplir d'éléments plus rapidement qu'Observer pourra les consommer.

Nous avons deux types d' observables - chaud et froid - qui sont totalement différents en ce qui concerne la gestion de la contre-pression.

2.1. Observables froids

Un observable froid émet une séquence particulière d'éléments mais peut commencer à émettre cette séquence lorsque son observateur le trouve pratique, et à la vitesse souhaitée par l' observateur , sans perturber l'intégrité de la séquence. Cold Observable fournit des objets de manière paresseuse.

L' observateur prend des éléments uniquement lorsqu'il est prêt à traiter cet élément, et les éléments n'ont pas besoin d'être mis en mémoire tampon dans un observable car ils sont demandés de manière pull.

Par exemple, si vous créez un Observable basé sur une plage statique d'éléments de un à un million, cet Observable émettra la même séquence d'éléments quelle que soit la fréquence à laquelle ces éléments sont observés:

Observable.range(1, 1_000_000) .observeOn(Schedulers.computation()) .subscribe(ComputeFunction::compute);

Lorsque nous démarrons notre programme, les éléments seront calculés par Observer paresseusement et seront demandés de manière pull. La méthode Schedulers.computation () signifie que nous voulons exécuter notre Observer dans un pool de threads de calcul dans RxJava.

La sortie d'un programme consistera en un résultat d'une méthode compute () appelée pour un par un élément à partir d'un observable :

compute integer v: 1 compute integer v: 2 compute integer v: 3 compute integer v: 4 ...

Les observables froids n'ont pas besoin d'avoir une quelconque forme de contre-pression car ils fonctionnent de manière tirée. Des exemples d'éléments émis par un observable froid peuvent inclure les résultats d'une requête de base de données, d'une récupération de fichier ou d'une requête Web.

2.2. Observables chauds

Un observable chaud commence à générer des éléments et les émet immédiatement lorsqu'ils sont créés. C'est contraire à un modèle de traitement pull Observables froids . Hot Observable émet des objets à son propre rythme, et c'est à ses observateurs de suivre.

Lorsque l' Observateur n'est pas en mesure de consommer des éléments aussi rapidement qu'ils sont produits par un Observable, ils doivent être mis en mémoire tampon ou gérés d'une autre manière, car ils rempliront la mémoire, provoquant finalement OutOfMemoryException.

Prenons un exemple d' Observable chaud , qui produit un million d'articles à un consommateur final qui traite ces articles. Lorsqu'une méthode compute () dans l' Observer prend un certain temps pour traiter chaque élément, l' Observable commence à remplir une mémoire avec des éléments, provoquant l'échec d'un programme:

PublishSubject source = PublishSubject.create(); source.observeOn(Schedulers.computation()) .subscribe(ComputeFunction::compute, Throwable::printStackTrace); IntStream.range(1, 1_000_000).forEach(source::onNext); 

L'exécution de ce programme échouera avec une exception MissingBackpressureException car nous n'avons pas défini de moyen de gérer la surproduction d' Observable .

Des exemples d'éléments émis par un observable actif peuvent inclure des événements de souris et de clavier, des événements système ou des cours de bourse.

3. Tampon de surproduction observable

La première façon de gérer la surproduction d' Observable est de définir une sorte de tampon pour les éléments qui ne peuvent pas être traités par un observateur.

Nous pouvons le faire en appelant une méthode buffer () :

PublishSubject source = PublishSubject.create(); source.buffer(1024) .observeOn(Schedulers.computation()) .subscribe(ComputeFunction::compute, Throwable::printStackTrace); 

Définir un tampon d'une taille de 1024 donnera à un observateur un peu de temps pour rattraper une source surproductrice. Le tampon stockera les éléments qui n'ont pas encore été traités.

Nous pouvons augmenter une taille de tampon pour avoir suffisamment de place pour les valeurs produites.

Notez cependant qu'en général, cela peut n'être qu'une correction temporaire car le débordement peut toujours se produire si la source surproduit la taille de tampon prévue.

4. Articles émis par lots

Nous pouvons regrouper les éléments surproduits dans des fenêtres de N éléments.

Lorsque Observable produit des éléments plus rapidement que Observer ne peut les traiter, nous pouvons atténuer cela en regroupant les éléments produits ensemble et en envoyant un lot d'éléments à Observer qui est capable de traiter une collection d'éléments au lieu d'un élément un par un:

PublishSubject source = PublishSubject.create(); source.window(500) .observeOn(Schedulers.computation()) .subscribe(ComputeFunction::compute, Throwable::printStackTrace); 

L'utilisation de la méthode window () avec l'argument 500 indiquera à Observable de regrouper les éléments en lots de taille 500. Cette technique peut réduire un problème de surproduisant Observable lorsque Observer est capable de traiter un lot d'éléments comparant plus rapide aux éléments de traitement un par un.

5. Saut d'éléments

Si certaines des valeurs produites par Observable peuvent être ignorées en toute sécurité, nous pouvons utiliser l'échantillonnage dans un délai et des opérateurs de limitation spécifiques.

Les méthodes sample () et throttleFirst () prennent la durée comme paramètre:

  • La méthode s ample () examine périodiquement la séquence des éléments et émet le dernier élément qui a été produit pendant la durée spécifiée comme paramètre
  • La méthode throttleFirst () émet le premier élément qui a été produit après la durée spécifiée en paramètre

La durée est un temps après lequel un élément spécifique est sélectionné dans la séquence des éléments produits. Nous pouvons spécifier une stratégie pour gérer la contre-pression en sautant des éléments:

PublishSubject source = PublishSubject.create(); source.sample(100, TimeUnit.MILLISECONDS) .observeOn(Schedulers.computation()) .subscribe(ComputeFunction::compute, Throwable::printStackTrace);

Nous avons précisé que la stratégie de saut d'éléments sera une méthode sample () . Nous voulons un échantillon d'une séquence d'une durée de 100 millisecondes. Cet élément sera émis vers l' observateur.

Rappelez-vous, cependant, que ces opérateurs réduisent uniquement le taux de réception de valeur par l' Observer en aval et qu'ils peuvent donc toujours conduire à MissingBackpressureException .

6. Manipulation d'un tampon observable de remplissage

Dans le cas où nos stratégies d'échantillonnage ou de mise en lots d'éléments n'aident pas à remplir un tampon , nous devons implémenter une stratégie de gestion des cas où un tampon se remplit.

Nous devons utiliser une méthode onBackpressureBuffer () pour empêcher BufferOverflowException.

The onBackpressureBuffer() method takes three arguments: a capacity of an Observable buffer, a method that is invoked when a buffer is filling up, and a strategy for handling elements that need to be discarded from a buffer. Strategies for overflow are in a BackpressureOverflow class.

There are 4 types of actions that can be executed when the buffer fills up:

  • ON_OVERFLOW_ERROR – this is the default behavior signaling a BufferOverflowException when the buffer is full
  • ON_OVERFLOW_DEFAULT – currently it is the same as ON_OVERFLOW_ERROR
  • ON_OVERFLOW_DROP_LATEST – if an overflow would happen, the current value will be simply ignored and only the old values will be delivered once the downstream Observer requests
  • ON_OVERFLOW_DROP_OLDEST – drops the oldest element in the buffer and adds the current value to it

Let's see how to specify that strategy:

Observable.range(1, 1_000_000) .onBackpressureBuffer(16, () -> {}, BackpressureOverflow.ON_OVERFLOW_DROP_OLDEST) .observeOn(Schedulers.computation()) .subscribe(e -> {}, Throwable::printStackTrace); 

Here our strategy for handling the overflowing buffer is dropping the oldest element in a buffer and adding newest item produced by an Observable.

Note that the last two strategies cause a discontinuity in the stream as they drop out elements. In addition, they won't signal BufferOverflowException.

7. Dropping All Overproduced Elements

Whenever the downstream Observer is not ready to receive an element, we can use an onBackpressureDrop() method to drop that element from the sequence.

We can think of that method as an onBackpressureBuffer() method with a capacity of a buffer set to zero with a strategy ON_OVERFLOW_DROP_LATEST.

This operator is useful when we can safely ignore values from a source Observable (such as mouse moves or current GPS location signals) as there will be more up-to-date values later on:

Observable.range(1, 1_000_000) .onBackpressureDrop() .observeOn(Schedulers.computation()) .doOnNext(ComputeFunction::compute) .subscribe(v -> {}, Throwable::printStackTrace);

The method onBackpressureDrop() is eliminating a problem of overproducing Observable but needs to be used with caution.

8. Conclusion

Dans cet article, nous avons examiné un problème de surproduction d' Observable et les moyens de gérer une contre-pression. Nous avons examiné des stratégies de mise en mémoire tampon, de mise en lots et de saut d'éléments lorsque l' observateur n'est pas capable de consommer des éléments aussi rapidement qu'ils sont produits par un observable.

L'implémentation de tous ces exemples et extraits de code se trouve dans le projet GitHub - il s'agit d'un projet Maven, il devrait donc être facile à importer et à exécuter tel quel.