Filtrage des observables dans RxJava

1. Introduction

Après l'introduction à RxJava, nous allons examiner les opérateurs de filtrage.

En particulier, nous allons nous concentrer sur le filtrage, le saut, le filtrage temporel et certaines opérations de filtrage plus avancées.

2. Filtrage

Lorsque vous travaillez avec Observable , il est parfois utile de sélectionner uniquement un sous-ensemble d'éléments émis. Pour cela, RxJava propose diverses capacités de filtrage .

Commençons par regarder la méthode de filtrage .

2.1. L' opérateur de filtre

En termes simples, l' opérateur de filtre filtre un observable en s'assurant que les éléments émis correspondent à la condition spécifiée , qui se présente sous la forme d'un prédicat .

Voyons comment nous pouvons filtrer uniquement les valeurs impaires de celles émises:

Observable sourceObservable = Observable.range(1, 10); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable .filter(i -> i % 2 != 0); filteredObservable.subscribe(subscriber); subscriber.assertValues(1, 3, 5, 7, 9);

2.2. L' opérateur de prise

Lors du filtrage avec prise , la logique entraîne l'émission des n premiers éléments tout en ignorant les éléments restants.

Voyons comment nous pouvons filtrer la source Observable et émettre uniquement les deux premiers éléments:

Observable sourceObservable = Observable.range(1, 10); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable.take(3); filteredObservable.subscribe(subscriber); subscriber.assertValues(1, 2, 3);

2.3. L' opérateur takeWhile

Lors de l'utilisation de takeWhile, l' Observable filtré continuera à émettre des éléments jusqu'à ce qu'il rencontre un premier élément qui ne correspond pas au Predicate.

Voyons comment nous pouvons utiliser le takeWhile - avec un prédicat de filtrage :

Observable sourceObservable = Observable.just(1, 2, 3, 4, 3, 2, 1); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable .takeWhile(i -> i < 4); filteredObservable.subscribe(subscriber); subscriber.assertValues(1, 2, 3);

2.4. L' opérateur TakeFirst

Chaque fois que nous voulons émettre uniquement le premier élément correspondant à une condition donnée, nous pouvons utiliser takeFirst ().

Voyons comment nous pouvons émettre le premier élément supérieur à 5:

Observable sourceObservable = Observable .just(1, 2, 3, 4, 5, 7, 6); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable .takeFirst(x -> x > 5); filteredObservable.subscribe(subscriber); subscriber.assertValue(7);

2.5. Opérateurs first et firstOrDefault

Un comportement similaire peut être obtenu en utilisant la première API:

Observable sourceObservable = Observable.range(1, 10); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable.first(); filteredObservable.subscribe(subscriber); subscriber.assertValue(1);

Cependant, si nous voulons spécifier une valeur par défaut, si aucun élément n'est émis, nous pouvons utiliser f irstOrDefault :

Observable sourceObservable = Observable.empty(); Observable filteredObservable = sourceObservable.firstOrDefault(-1); filteredObservable.subscribe(subscriber); subscriber.assertValue(-1);

2.6. L' opérateur takeLast

Ensuite, si nous voulons n'émettre que les n derniers éléments émis par un Observable , nous pouvons utiliser takeLast .

Voyons comment il est possible d'émettre uniquement les trois derniers éléments:

Observable sourceObservable = Observable.range(1, 10); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable.takeLast(3); filteredObservable.subscribe(subscriber); subscriber.assertValues(8, 9, 10);

Nous devons nous rappeler que cela retarde l'émission de tout élément de la source Observable jusqu'à ce qu'il se termine.

2.7. last and lastOrDefault

Si nous voulons n'émettre que le dernier élément, autrement qu'en utilisant takeLast (1) , nous pouvons utiliser last .

This filters the Observable, emitting only the last element, which optionally verifies a filtering Predicate:

Observable sourceObservable = Observable.range(1, 10); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable .last(i -> i % 2 != 0); filteredObservable.subscribe(subscriber); subscriber.assertValue(9);

In case the Observable is empty, we can use lastOrDefault, that filters the Observable emitting the default value.

The default value is also emitted if the lastOrDefault operator is used and there aren't any items that verify the filtering condition:

Observable sourceObservable = Observable.range(1, 10); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable.lastOrDefault(-1, i -> i > 10); filteredObservable.subscribe(subscriber); subscriber.assertValue(-1);

2.8. elementAt and elementAtOrDefault Operators

With the elementAt operator, we can pick a single item emitted by the source Observable, specifying its index:

Observable sourceObservable = Observable .just(1, 2, 3, 5, 7, 11); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable.elementAt(4); filteredObservable.subscribe(subscriber); subscriber.assertValue(7);

However, elementAt will throw an IndexOutOfBoundException if the specified index exceeds the number of items emitted.

To avoid this situation, it's possible to use elementAtOrDefault – which will return a default value in case the index is out of range:

Observable sourceObservable = Observable .just(1, 2, 3, 5, 7, 11); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable.elementAtOrDefault(7, -1); filteredObservable.subscribe(subscriber); subscriber.assertValue(-1);

2.9. The ofType Operator

Whenever the Observable emits Object items, it's possible to filter them based on their type.

Let's see how we can only filter the String type items emitted:

Observable sourceObservable = Observable.just(1, "two", 3, "five", 7, 11); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable.ofType(String.class); filteredObservable.subscribe(subscriber); subscriber.assertValues("two", "five");

3. Skipping

On the other hand, when we want to filter out or skip some of the items emitted by an Observable, RxJava offers a few operators as a counterpart of the filtering ones, that we've previously discussed.

Let's start looking at the skip operator, the counterpart of take.

3.1. The skip Operator

When an Observable emits a sequence of items, it's possible to filter out or skip some of the firsts emitted items using skip.

For example. let's see how it's possible to skip the first four elements:

Observable sourceObservable = Observable.range(1, 10); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable.skip(4); filteredObservable.subscribe(subscriber); subscriber.assertValues(5, 6, 7, 8, 9, 10);

3.2. The skipWhile Operator

Whenever we want to filter out all the first values emitted by an Observable that fail a filtering predicate, we can use the skipWhile operator:

Observable sourceObservable = Observable .just(1, 2, 3, 4, 5, 4, 3, 2, 1); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable .skipWhile(i -> i < 4); filteredObservable.subscribe(subscriber); subscriber.assertValues(4, 5, 4, 3, 2, 1);

3.3. The skipLast Operator

The skipLast operator allows us to skip the final items emitted by the Observable accepting only those emitted before them.

With this, we can, for example, skip the last five items:

Observable sourceObservable = Observable.range(1, 10); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable.skipLast(5); filteredObservable.subscribe(subscriber); subscriber.assertValues(1, 2, 3, 4, 5);

3.4. distinct and distinctUntilChanged Operators

The distinct operator returns an Observable that emits all the items emitted by the sourceObservable that are distinct:

Observable sourceObservable = Observable .just(1, 1, 2, 2, 1, 3, 3, 1); TestSubscriber subscriber = new TestSubscriber(); Observable distinctObservable = sourceObservable.distinct(); distinctObservable.subscribe(subscriber); subscriber.assertValues(1, 2, 3);

However, if we want to obtain an Observable that emits all the items emitted by the sourceObservable that are distinct from their immediate predecessor, we can use the distinctUntilChanged operator:

Observable sourceObservable = Observable .just(1, 1, 2, 2, 1, 3, 3, 1); TestSubscriber subscriber = new TestSubscriber(); Observable distinctObservable = sourceObservable.distinctUntilChanged(); distinctObservable.subscribe(subscriber); subscriber.assertValues(1, 2, 1, 3, 1);

3.5. The ignoreElements Operator

Whenever we want to ignore all the elements emitted by the sourceObservable, we can simply use the ignoreElements:

Observable sourceObservable = Observable.range(1, 10); TestSubscriber subscriber = new TestSubscriber(); Observable ignoredObservable = sourceObservable.ignoreElements(); ignoredObservable.subscribe(subscriber); subscriber.assertNoValues();

4. Time Filtering Operators

When working with observable sequence, the time axis is unknown but sometimes getting timely data from a sequence could be useful.

With this purpose, RxJava offers a few methods that allow us to work with Observable using also the time axis.

Before moving on to the first one, let's define a timed Observable that will emit an item every second:

TestScheduler testScheduler = new TestScheduler(); Observable timedObservable = Observable .just(1, 2, 3, 4, 5, 6) .zipWith(Observable.interval( 0, 1, TimeUnit.SECONDS, testScheduler), (item, time) -> item);

The TestScheduler is a special scheduler that allows advancing the clock manually at whatever pace we prefer.

4.1. sample and throttleLast Operators

The sample operator filters the timedObservable, returning an Observable that emits the most recent items emitted by this API within period time intervals.

Let's see how we can sample the timedObservable, filtering only the last emitted item every 2.5 seconds:

TestSubscriber subscriber = new TestSubscriber(); Observable sampledObservable = timedObservable .sample(2500L, TimeUnit.MILLISECONDS, testScheduler); sampledObservable.subscribe(subscriber); testScheduler.advanceTimeBy(7, TimeUnit.SECONDS); subscriber.assertValues(3, 5, 6);

This kind of behavior can be achieved also using the throttleLast operator.

4.2. The throttleFirst Operator

The throttleFirst operator differs from throttleLast/sample since it emits the first item emitted by the timedObservable in each sampling period instead of the most recently emitted one.

Let's see how we can emit the first items, using a sampling period of 4 seconds:

TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = timedObservable .throttleFirst(4100L, TimeUnit.SECONDS, testScheduler); filteredObservable.subscribe(subscriber); testScheduler.advanceTimeBy(7, TimeUnit.SECONDS); subscriber.assertValues(1, 6);

4.3. debounce and throttleWithTimeout Operators

With the debounce operator, it's possible to emit only an item if a particular timespan has passed without emitting another item.

Therefore, if we select a timespan that is greater than the time interval between the emitted items of the timedObservable, it will only emit the last one. On the other hand, if it's smaller, it will emit all the items emitted by the timedObservable.

Let's see what happens in the first scenario:

TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = timedObservable .debounce(2000L, TimeUnit.MILLISECONDS, testScheduler); filteredObservable.subscribe(subscriber); testScheduler.advanceTimeBy(7, TimeUnit.SECONDS); subscriber.assertValue(6);

This kind of behavior can also be achieved using throttleWithTimeout.

4.4. The timeout Operator

The timeout operator mirrors the source Observable, but issue a notification error, aborting the emission of items, if the source Observable fails to emit any items during a specified time interval.

Let's see what happens if we specify a timeout of 500 milliseconds to our timedObservable:

TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = timedObservable .timeout(500L, TimeUnit.MILLISECONDS, testScheduler); filteredObservable.subscribe(subscriber); testScheduler.advanceTimeBy(7, TimeUnit.SECONDS); subscriber.assertError(TimeoutException.class); subscriber.assertValues(1);

5. Multiple Observable Filtering

When working with Observable, it's definitely possible to decide if filtering or skipping items based on a second Observable.

Before moving on, let's define a delayedObservable, that will emit only 1 item after 3 seconds:

Observable delayedObservable = Observable.just(1) .delay(3, TimeUnit.SECONDS, testScheduler);

Let's start with takeUntil operator.

5.1. The takeUntil Operator

The takeUntil operator discards any item emitted by the source Observable (timedObservable) after a second Observable (delayedObservable) emits an item or terminates:

TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = timedObservable .skipUntil(delayedObservable); filteredObservable.subscribe(subscriber); testScheduler.advanceTimeBy(7, TimeUnit.SECONDS); subscriber.assertValues(4, 5, 6);

5.2. The skipUntil Operator

On the other hand, skipUntil discards any item emitted by the source Observable (timedObservable) until a second Observable (delayedObservable) emits an item:

TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = timedObservable .takeUntil(delayedObservable); filteredObservable.subscribe(subscriber); testScheduler.advanceTimeBy(7, TimeUnit.SECONDS); subscriber.assertValues(1, 2, 3);

6. Conclusion

In this extensive tutorial, we explored the different filtering operators available within RxJava, providing a simple example of each one.

As always, all the code examples in this article can be found over on GitHub.