Guide de Stream.reduce ()

1. Vue d'ensemble

L'API Stream fournit un riche répertoire de fonctions intermédiaires, de réduction et de terminal, qui prennent également en charge la parallélisation.

Plus précisément, les opérations de flux de réduction nous permettent de produire un seul résultat à partir d'une séquence d'éléments , en appliquant à plusieurs reprises une opération de combinaison aux éléments de la séquence.

Dans ce tutoriel, nous allons voir l'usage général Stream.reduce () de fonctionnement et de voir dans certains cas d'utilisation du béton.

2. Les concepts clés: identité, accumulateur et combinateur

Avant d'examiner plus en détail l'utilisation de l' opération Stream.reduce () , décomposons les éléments participants de l'opération en blocs séparés. De cette façon, nous comprendrons plus facilement le rôle que chacun joue:

  • Identité - un élément qui est la valeur initiale de l'opération de réduction et le résultat par défaut si le flux est vide
  • Accumulateur - une fonction qui prend deux paramètres: un résultat partiel de l'opération de réduction et l'élément suivant du flux
  • Combiner - une fonction utilisée pour combiner le résultat partiel de l'opération de réduction lorsque la réduction est parallélisée, ou lorsqu'il y a une discordance entre les types d'arguments de l'accumulateur et les types de l'implémentation de l'accumulateur

3. Utilisation de Stream.reduce ()

Pour mieux comprendre la fonctionnalité des éléments d'identité, d'accumulateur et de combinateur, examinons quelques exemples de base:

List numbers = Arrays.asList(1, 2, 3, 4, 5, 6); int result = numbers .stream() .reduce(0, (subtotal, element) -> subtotal + element); assertThat(result).isEqualTo(21);

Dans ce cas, la valeur entière 0 est l'identité. Il stocke la valeur initiale de l'opération de réduction, ainsi que le résultat par défaut lorsque le flux de valeurs Integer est vide.

De même, l'expression lambda :

subtotal, element -> subtotal + element

est l'accumulateur , car il prend la somme partielle des valeurs Integer et de l'élément suivant dans le flux.

Pour rendre le code encore plus concis, nous pouvons utiliser une référence de méthode, au lieu d'une expression lambda:

int result = numbers.stream().reduce(0, Integer::sum); assertThat(result).isEqualTo(21);

Bien sûr, nous pouvons utiliser une opération de réduction () sur des flux contenant d'autres types d'éléments.

Par exemple, nous pouvons utiliser réduire () sur un tableau d' éléments String et les joindre en un seul résultat:

List letters = Arrays.asList("a", "b", "c", "d", "e"); String result = letters .stream() .reduce("", (partialString, element) -> partialString + element); assertThat(result).isEqualTo("abcde");

De même, nous pouvons passer à la version qui utilise une référence de méthode:

String result = letters.stream().reduce("", String::concat); assertThat(result).isEqualTo("abcde");

Utilisons l' opération reduction () pour joindre les éléments en majuscules du tableau de lettres :

String result = letters .stream() .reduce( "", (partialString, element) -> partialString.toUpperCase() + element.toUpperCase()); assertThat(result).isEqualTo("ABCDE");

De plus, nous pouvons utiliser réduire () dans un flux parallélisé (plus à ce sujet plus tard):

List ages = Arrays.asList(25, 30, 45, 28, 32); int computedAges = ages.parallelStream().reduce(0, a, b -> a + b, Integer::sum);

Lorsqu'un flux s'exécute en parallèle, le runtime Java divise le flux en plusieurs sous-flux. Dans de tels cas, nous devons utiliser une fonction pour combiner les résultats des sous-flux en un seul . C'est le rôle du combineur - dans l'extrait de code ci-dessus, c'est la référence de la méthode Integer :: sum .

Assez drôle, ce code ne compilera pas:

List users = Arrays.asList(new User("John", 30), new User("Julie", 35)); int computedAges = users.stream().reduce(0, (partialAgeResult, user) -> partialAgeResult + user.getAge()); 

Dans ce cas, nous avons un flux d' objets User et les types d'arguments de l'accumulateur sont Integer et User. Cependant, l'implémentation de l'accumulateur est une somme d' entiers, de sorte que le compilateur ne peut tout simplement pas déduire le type du paramètre utilisateur .

Nous pouvons résoudre ce problème en utilisant un combinateur:

int result = users.stream() .reduce(0, (partialAgeResult, user) -> partialAgeResult + user.getAge(), Integer::sum); assertThat(result).isEqualTo(65);

Pour faire simple, si nous utilisons des flux séquentiels et que les types d'arguments de l'accumulateur et les types de son implémentation correspondent, nous n'avons pas besoin d'utiliser un combineur .

4. Réduire en parallèle

Comme nous l'avons appris auparavant, nous pouvons utiliser réduire () sur des flux parallélisés.

Lorsque nous utilisons des flux parallélisés, nous devons nous assurer que reduction () ou toute autre opération d'agrégation exécutée sur les flux sont:

  • associatif : le résultat n'est pas affecté par l'ordre des opérandes
  • non interférant : l'opération n'affecte pas la source de données
  • sans état et déterministe : l'opération n'a pas d'état et produit la même sortie pour une entrée donnée

Nous devons remplir toutes ces conditions pour éviter des résultats imprévisibles.

Comme prévu, les opérations effectuées sur des flux parallélisés, y compris reduction (), sont exécutées en parallèle, tirant ainsi parti des architectures matérielles multicœurs.

Pour des raisons évidentes, les flux parallélisés sont beaucoup plus performants que leurs homologues séquentiels . Même ainsi, ils peuvent être exagérés si les opérations appliquées au flux ne sont pas coûteuses ou si le nombre d'éléments dans le flux est faible.

Bien entendu, les flux parallélisés sont la bonne solution lorsque nous devons travailler avec de grands flux et effectuer des opérations d'agrégation coûteuses.

Créons un simple test de référence JMH (Java Microbenchmark Harness) et comparons les temps d'exécution respectifs lors de l'utilisation de l' opération Reduce () sur un flux séquentiel et parallélisé:

@State(Scope.Thread) private final List userList = createUsers(); @Benchmark public Integer executeReduceOnParallelizedStream() { return this.userList .parallelStream() .reduce( 0, (partialAgeResult, user) -> partialAgeResult + user.getAge(), Integer::sum); } @Benchmark public Integer executeReduceOnSequentialStream() { return this.userList .stream() .reduce( 0, (partialAgeResult, user) -> partialAgeResult + user.getAge(), Integer::sum); } 

In the above JMH benchmark, we compare execution average times. We simply create a List containing a large number of User objects. Next, we call reduce() on a sequential and a parallelized stream and check that the latter performs faster than the former (in seconds-per-operation).

These are our benchmark results:

Benchmark Mode Cnt Score Error Units JMHStreamReduceBenchMark.executeReduceOnParallelizedStream avgt 5 0,007 ± 0,001 s/op JMHStreamReduceBenchMark.executeReduceOnSequentialStream avgt 5 0,010 ± 0,001 s/op

5. Throwing and Handling Exceptions While Reducing

In the above examples, the reduce() operation doesn't throw any exceptions. But it might, of course.

For instance, say that we need to divide all the elements of a stream by a supplied factor and then sum them:

List numbers = Arrays.asList(1, 2, 3, 4, 5, 6); int divider = 2; int result = numbers.stream().reduce(0, a / divider + b / divider); 

This will work, as long as the divider variable is not zero. But if it is zero, reduce() will throw an ArithmeticException exception: divide by zero.

We can easily catch the exception and do something useful with it, such as logging it, recovering from it and so forth, depending on the use case, by using a try/catch block:

public static int divideListElements(List values, int divider) { return values.stream() .reduce(0, (a, b) -> { try { return a / divider + b / divider; } catch (ArithmeticException e) { LOGGER.log(Level.INFO, "Arithmetic Exception: Division by Zero"); } return 0; }); }

While this approach will work, we polluted the lambda expression with the try/catch block. We no longer have the clean one-liner that we had before.

To fix this issue, we can use the extract function refactoring technique, and extract the try/catch block into a separate method:

private static int divide(int value, int factor) { int result = 0; try { result = value / factor; } catch (ArithmeticException e) { LOGGER.log(Level.INFO, "Arithmetic Exception: Division by Zero"); } return result } 

Now, the implementation of the divideListElements() method is again clean and streamlined:

public static int divideListElements(List values, int divider) { return values.stream().reduce(0, (a, b) -> divide(a, divider) + divide(b, divider)); } 

Assuming that divideListElements() is a utility method implemented by an abstract NumberUtils class, we can create a unit test to check the behavior of the divideListElements() method:

List numbers = Arrays.asList(1, 2, 3, 4, 5, 6); assertThat(NumberUtils.divideListElements(numbers, 1)).isEqualTo(21); 

Let's also test the divideListElements() method, when the supplied List of Integer values contains a 0:

List numbers = Arrays.asList(0, 1, 2, 3, 4, 5, 6); assertThat(NumberUtils.divideListElements(numbers, 1)).isEqualTo(21); 

Finally, let's test the method implementation when the divider is 0, too:

List numbers = Arrays.asList(1, 2, 3, 4, 5, 6); assertThat(NumberUtils.divideListElements(numbers, 0)).isEqualTo(0);

6. Complex Custom Objects

We can also use Stream.reduce() with custom objects that contain non-primitive fields. To do so, we need to provide a relevant identity, accumulator, and combiner for the data type.

Suppose our User is part of a review website. Each of our Users can possess one Rating, which is averaged over many Reviews.

First, let's start with our Review object. Each Review should contain a simple comment and score:

public class Review { private int points; private String review; // constructor, getters and setters }

Next, we need to define our Rating, which will hold our reviews alongside a points field. As we add more reviews, this field will increase or decrease accordingly:

public class Rating { double points; List reviews = new ArrayList(); public void add(Review review) { reviews.add(review); computeRating(); } private double computeRating() { double totalPoints = reviews.stream().map(Review::getPoints).reduce(0, Integer::sum); this.points = totalPoints / reviews.size(); return this.points; } public static Rating average(Rating r1, Rating r2) { Rating combined = new Rating(); combined.reviews = new ArrayList(r1.reviews); combined.reviews.addAll(r2.reviews); combined.computeRating(); return combined; } }

We have also added an average function to compute an average based on the two input Ratings. This will work nicely for our combiner and accumulator components.

Next, let's define a list of Users, each with their own sets of reviews.

User john = new User("John", 30); john.getRating().add(new Review(5, "")); john.getRating().add(new Review(3, "not bad")); User julie = new User("Julie", 35); john.getRating().add(new Review(4, "great!")); john.getRating().add(new Review(2, "terrible experience")); john.getRating().add(new Review(4, "")); List users = Arrays.asList(john, julie); 

Maintenant que John et Julie sont pris en compte, utilisons Stream.reduce () pour calculer une note moyenne pour les deux utilisateurs. En tant qu'identité , retournons une nouvelle note si notre liste d'entrée est vide :

Rating averageRating = users.stream() .reduce(new Rating(), (rating, user) -> Rating.average(rating, user.getRating()), Rating::average);

Si nous faisons le calcul, nous devrions trouver que le score moyen est de 3,6:

assertThat(averageRating.getPoints()).isEqualTo(3.6);

7. Conclusion

Dans ce didacticiel, nous avons appris à utiliser l' opération Stream.reduce () . De plus, nous avons appris comment effectuer des réductions sur des flux séquentiels et parallélisés, et comment gérer les exceptions tout en réduisant .

Comme d'habitude, tous les exemples de code présentés dans ce didacticiel sont disponibles à l'adresse over sur GitHub.