Pools de threads personnalisés dans les flux parallèles Java 8

1. Vue d'ensemble

Java 8 a introduit le concept de treams S comme un moyen efficace d'effectuer des opérations en masse sur des données. Et des flux parallèles peuvent être obtenus dans des environnements prenant en charge la concurrence.

Ces flux peuvent s'accompagner de performances améliorées - au prix d'une surcharge multi-threading.

Dans ce rapide didacticiel, nous examinerons l' une des plus grandes limitations de l' API Stream et verrons comment faire fonctionner un flux parallèle avec une instance ThreadPool personnalisée, sinon , il existe une bibliothèque qui gère cela.

2. Flux parallèle

Commençons par un exemple simple - appelant la méthode parallelStream sur l'un des types Collection - qui renverra un Stream éventuellement parallèle :

@Test public void givenList_whenCallingParallelStream_shouldBeParallelStream(){ List aList = new ArrayList(); Stream parallelStream = aList.parallelStream(); assertTrue(parallelStream.isParallel()); }

Le traitement par défaut qui se produit dans un tel flux utilise ForkJoinPool.commonPool (), un pool de threads partagé par l'ensemble de l'application.

3. Pool de threads personnalisés

Nous pouvons en fait passer un ThreadPool personnalisé lors du traitement du flux .

L'exemple suivant permet à un flux parallèle d' utiliser un pool de threads personnalisé pour calculer la somme des valeurs longues de 1 à 1 000 000, inclus:

@Test public void giveRangeOfLongs_whenSummedInParallel_shouldBeEqualToExpectedTotal() throws InterruptedException, ExecutionException { long firstNum = 1; long lastNum = 1_000_000; List aList = LongStream.rangeClosed(firstNum, lastNum).boxed() .collect(Collectors.toList()); ForkJoinPool customThreadPool = new ForkJoinPool(4); long actualTotal = customThreadPool.submit( () -> aList.parallelStream().reduce(0L, Long::sum)).get(); assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal); }

Nous avons utilisé le constructeur ForkJoinPool avec un niveau de parallélisme de 4. Une certaine expérimentation est nécessaire pour déterminer la valeur optimale pour différents environnements, mais une bonne règle d'or consiste simplement à choisir le nombre en fonction du nombre de cœurs de votre CPU.

Ensuite, nous avons traité le contenu du Stream parallèle , en les résumant dans l' appel de réduction .

Cet exemple simple peut ne pas démontrer toute l'utilité de l'utilisation d'un pool de threads personnalisé , mais les avantages deviennent évidents dans les situations où nous ne voulons pas lier le pool de threads commun à des tâches de longue durée (par exemple, le traitement de données à partir d'une source réseau) , ou le pool de threads commun est utilisé par d'autres composants de l'application.

4. Conclusion

Nous avons brièvement examiné comment exécuter un flux parallèle à l' aide d'un pool de threads personnalisé . Dans le bon environnement et avec une utilisation correcte du niveau de parallélisme, des gains de performances peuvent être obtenus dans certaines situations.

Les exemples de code complets référencés dans cet article sont disponibles à l'adresse over sur Github.