Guide de la bibliothèque de collecteurs parallèles Java

1. Introduction

Parallel-collectors est une petite bibliothèque qui fournit un ensemble de collecteurs API Java Stream qui permettent le traitement parallèle, tout en contournant les principales lacunes des flux parallèles standard.

2. Dépendances de Maven

Si nous voulons commencer à utiliser la bibliothèque, nous devons ajouter une seule entrée dans le fichier pom.xml de Maven :

 com.pivovarit parallel-collectors 1.1.0 

Ou une seule ligne dans le fichier de construction de Gradle:

compile 'com.pivovarit:parallel-collectors:1.1.0'

La dernière version est disponible sur Maven Central.

3. Avertissements relatifs aux flux parallèles

Les flux parallèles étaient l'un des points forts de Java 8, mais ils se sont avérés applicables exclusivement au traitement intensif du processeur.

La raison en était que les flux parallèles étaient sauvegardés en interne par un ForkJoinPool partagé à l'échelle de la JVM , qui fournissait un parallélisme limité et était utilisé par tous les flux parallèles s'exécutant sur une seule instance JVM.

Par exemple, imaginons que nous ayons une liste d'identifiants et que nous voulions les utiliser pour récupérer une liste d'utilisateurs et que cette opération coûte cher.

Nous pourrions utiliser Parallel Streams pour cela:

List ids = Arrays.asList(1, 2, 3); List results = ids.parallelStream() .map(i -> fetchById(i)) // each operation takes one second .collect(Collectors.toList()); System.out.println(results); // [user-1, user-2, user-3]

Et en effet, nous pouvons voir qu'il y a une accélération notable. Mais cela devient problématique si nous commençons à exécuter plusieurs opérations de blocage parallèles… en parallèle. Cela pourrait rapidement saturer le pool et entraîner des latences potentiellement énormes. C'est pourquoi il est important de créer des cloisons en créant des pools de threads séparés - pour éviter que des tâches non liées n'influencent l'exécution de l'autre.

Afin de fournir une instance ForkJoinPool personnalisée , nous pourrions tirer parti de l'astuce décrite ici, mais cette approche reposait sur un piratage non documenté et était défectueuse jusqu'au JDK10. Nous pouvons en savoir plus dans le numéro lui-même - [JDK8190974].

4. Collecteurs parallèles en action

Les collecteurs parallèles, comme son nom l'indique, ne sont que des collecteurs API Stream standard qui permettent d'effectuer des opérations supplémentaires en parallèle lors de la phase collect () .

La classe ParallelCollectors (qui reflète la classe Collectors ) est une façade donnant accès à l'ensemble des fonctionnalités de la bibliothèque.

Si nous voulions refaire l'exemple ci-dessus, nous pourrions simplement écrire:

ExecutorService executor = Executors.newFixedThreadPool(10); List ids = Arrays.asList(1, 2, 3); CompletableFuture
    
      results = ids.stream() .collect(ParallelCollectors.parallelToList(i -> fetchById(i), executor, 4)); System.out.println(results.join()); // [user-1, user-2, user-3]
    

Le résultat est le même, cependant, nous avons pu fournir notre pool de threads personnalisé, spécifier notre niveau de parallélisme personnalisé et le résultat est arrivé enveloppé dans une instance CompletableFuture sans bloquer le thread actuel.

Par contre, les flux parallèles standard ne pouvaient atteindre aucun de ces objectifs.

4.1. ParallelCollectors.parallelToList / ToSet ()

Aussi intuitif que cela puisse paraître, si nous voulons traiter un Stream en parallèle et collecter les résultats dans une liste ou un ensemble , nous pouvons simplement utiliser ParallelCollectors.parallelToList ou parallelToSet :

List ids = Arrays.asList(1, 2, 3); List results = ids.stream() .collect(parallelToList(i -> fetchById(i), executor, 4)) .join();

4.2. ParallelCollectors.parallelToMap ()

Si nous voulons collecter des éléments Stream dans une instance Map , tout comme avec l'API Stream, nous devons fournir deux mappeurs:

List ids = Arrays.asList(1, 2, 3); Map results = ids.stream() .collect(parallelToMap(i -> i, i -> fetchById(i), executor, 4)) .join(); // {1=user-1, 2=user-2, 3=user-3}

Nous pouvons également fournir un fournisseur d' instance de carte personnalisé :

Map results = ids.stream() .collect(parallelToMap(i -> i, i -> fetchById(i), TreeMap::new, executor, 4)) .join(); 

Et une stratégie de résolution de conflit personnalisée:

List ids = Arrays.asList(1, 2, 3); Map results = ids.stream() .collect(parallelToMap(i -> i, i -> fetchById(i), TreeMap::new, (s1, s2) -> s1, executor, 4)) .join();

4.3. ParallelCollectors.parallelToCollection ()

De même que ci-dessus, nous pouvons transmettre notre fournisseur de collecte personnalisé si nous voulons obtenir des résultats emballés dans notre conteneur personnalisé:

List results = ids.stream() .collect(parallelToCollection(i -> fetchById(i), LinkedList::new, executor, 4)) .join();

4.4. ParallelCollectors.parallelToStream ()

Si ce qui précède ne suffit pas, nous pouvons en fait obtenir une instance Stream et y poursuivre le traitement personnalisé:

Map
    
      results = ids.stream() .collect(parallelToStream(i -> fetchById(i), executor, 4)) .thenApply(stream -> stream.collect(Collectors.groupingBy(i -> i.length()))) .join();
    

4.5. ParallelCollectors.parallel ()

Celui-ci nous permet de diffuser les résultats dans l'ordre d'achèvement:

ids.stream() .collect(parallel(i -> fetchByIdWithRandomDelay(i), executor, 4)) .forEach(System.out::println); // user-1 // user-3 // user-2 

Dans ce cas, nous pouvons nous attendre à ce que le collecteur renvoie des résultats différents à chaque fois depuis que nous avons introduit un délai de traitement aléatoire.

4.6. ParallelCollectors.parallelOrdered ()

Cette fonction permet de diffuser des résultats comme ci-dessus, mais conserve l'ordre d'origine:

ids.stream() .collect(parallelOrdered(i -> fetchByIdWithRandomDelay(i), executor, 4)) .forEach(System.out::println); // user-1 // user-2 // user-3 

Dans ce cas, le collecteur maintiendra toujours l'ordre mais pourrait être plus lent que ce qui précède.

5. Limitations

At the point of writing, parallel-collectors don't work with infinite streams even if short-circuiting operations are used – it's a design limitation imposed by Stream API internals. Simply put, Streams treat collectors as non-short-circuiting operations so the stream needs to process all upstream elements before getting terminated.

The other limitation is that short-circuiting operations don't interrupt the remaining tasks after short-circuiting.

6. Conclusion

Nous avons vu comment la bibliothèque de collecteurs parallèles nous permet d'effectuer un traitement parallèle en utilisant des collecteurs API Java Stream personnalisés et CompletableFutures pour utiliser des pools de threads personnalisés, le parallélisme et le style non bloquant de CompletableFutures.

Comme toujours, des extraits de code sont disponibles à l'adresse over sur GitHub.

Pour plus d'informations, consultez la bibliothèque des collecteurs parallèles sur GitHub, le blog de l'auteur et le compte Twitter de l'auteur.