Guide du vol de travail en Java

1. Vue d'ensemble

Dans ce didacticiel, nous examinerons le concept de vol de travail en Java .

2. Qu'est-ce que le vol de travail?

Le vol de travail a été introduit en Java dans le but de réduire les conflits dans les applications multithreads . Ceci est fait en utilisant le framework fork / join.

2.1. Approche Diviser pour Conquérir

Dans le framework fork / join, les problèmes ou les tâches sont décomposés récursivement en sous-tâches . Les sous-tâches sont ensuite résolues individuellement, les sous-résultats étant combinés pour former le résultat:

Result solve(Problem problem) { if (problem is small) directly solve problem else { split problem into independent parts fork new subtasks to solve each part join all subtasks compose result from subresults } }

2.2. Fils de travail

La tâche décomposée est résolue à l'aide de threads de travail fournis par un pool de threads . Chaque thread de travail aura des sous-tâches dont il est responsable. Ceux-ci sont stockés dans des files d'attente doubles (deques).

Chaque thread de travail obtient des sous-tâches à partir de son deque en sautant continuellement une sous-tâche en haut de la deque. Lorsque la deque d'un thread de travail est vide, cela signifie que toutes les sous-tâches ont été supprimées et terminées.

À ce stade, le thread de travail sélectionne au hasard un thread de pool de threads homologue à partir duquel il peut «voler» du travail. Il utilise ensuite l'approche premier entré, premier sorti (FIFO) pour prendre des sous-tâches à partir de la fin du deque de la victime.

3. Implémentation du framework Fork / Join

Nous pouvons créer un pool de threads volants à l' aide de la classe ForkJoinPool ou de la classe Executors :

ForkJoinPool commonPool = ForkJoinPool.commonPool(); ExecutorService workStealingPool = Executors.newWorkStealingPool();

La classe Executors a une méthode newWorkStealingPool surchargée , qui prend un argument entier représentant le niveau de parallélisme .

Executors.newWorkStealingPool est une abstraction de ForkJoinPool.commonPool . La seule différence est que Executors.newWorkStealingPool crée un pool en mode asynchrone, contrairement à ForkJoinPool.commonPool .

4. Pools de threads synchrones et asynchrones

ForkJoinPool.commonPool utilise une configuration de file d'attente dernier entré, premier sorti (LIFO), tandis que Executors.newWorkStealingPool utilise une approche premier entré, premier sorti (FIFO).

Selon Doug Lea, l'approche FIFO présente ces avantages par rapport à LIFO:

  • Cela réduit les conflits en faisant opérer les voleurs du côté opposé du deque en tant que propriétaires
  • Il exploite la propriété des algorithmes récursifs de division et de conquête de générer tôt de «grandes» tâches

Le deuxième point ci-dessus signifie qu'il est possible de décomposer davantage une tâche volée plus ancienne par un thread qui l'a volée.

Selon la documentation Java, la définition de asyncMode sur true peut être appropriée pour une utilisation avec des tâches de style événement qui ne sont jamais jointes.

5. Exemple pratique - Recherche de nombres premiers

Nous utiliserons l'exemple de recherche de nombres premiers à partir d'une collection de nombres pour montrer les avantages en temps de calcul du cadre de vol de travail . Nous montrerons également les différences entre l'utilisation de pools de threads synchrones et asynchrones.

5.1. Le problème des nombres premiers

Trouver des nombres premiers à partir d'une collection de nombres peut être un processus coûteux en calcul. Cela est principalement dû à la taille de la collection de nombres.

La classe PrimeNumbers nous aide à trouver des nombres premiers:

public class PrimeNumbers extends RecursiveAction { private int lowerBound; private int upperBound; private int granularity; static final List GRANULARITIES = Arrays.asList(1, 10, 100, 1000, 10000); private AtomicInteger noOfPrimeNumbers; PrimeNumbers(int lowerBound, int upperBound, int granularity, AtomicInteger noOfPrimeNumbers) { this.lowerBound = lowerBound; this.upperBound = upperBound; this.granularity = granularity; this.noOfPrimeNumbers = noOfPrimeNumbers; } // other constructors and methods private List subTasks() { List subTasks = new ArrayList(); for (int i = 1; i <= this.upperBound / granularity; i++) { int upper = i * granularity; int lower = (upper - granularity) + 1; subTasks.add(new PrimeNumbers(lower, upper, noOfPrimeNumbers)); } return subTasks; } @Override protected void compute() { if (((upperBound + 1) - lowerBound)> granularity) { ForkJoinTask.invokeAll(subTasks()); } else { findPrimeNumbers(); } } void findPrimeNumbers() { for (int num = lowerBound; num <= upperBound; num++) { if (isPrime(num)) { noOfPrimeNumbers.getAndIncrement(); } } } public int noOfPrimeNumbers() { return noOfPrimeNumbers.intValue(); } }

Quelques points importants à noter à propos de cette classe:

  • Il étend RecursiveAction , qui nous permet d'implémenter la méthode de calcul utilisée dans le calcul des tâches à l'aide d'un pool de threads
  • Il décompose de manière récursive les tâches en sous-tâches en fonction de la valeur de granularité
  • Les constructeurs prennent des valeurs limites inférieures et supérieures qui contrôlent la plage de nombres pour laquelle nous voulons déterminer les nombres premiers
  • Il nous permet de déterminer les nombres premiers à l'aide d'un pool de threads volants ou d'un seul thread

5.2. Résoudre le problème plus rapidement avec les pools de threads

Déterminons les nombres premiers de manière mono-thread et en utilisant également des pools de threads voleurs de travail.

Tout d'abord, voyons l' approche monothread :

PrimeNumbers primes = new PrimeNumbers(10000); primes.findPrimeNumbers();

Et maintenant, l' approche ForkJoinPool.commonPool :

PrimeNumbers primes = new PrimeNumbers(10000); ForkJoinPool pool = ForkJoinPool.commonPool(); pool.invoke(primes); pool.shutdown();

Enfin, nous examinerons l' approche Executors.newWorkStealingPool :

PrimeNumbers primes = new PrimeNumbers(10000); int parallelism = ForkJoinPool.getCommonPoolParallelism(); ForkJoinPool stealer = (ForkJoinPool) Executors.newWorkStealingPool(parallelism); stealer.invoke(primes); stealer.shutdown();

Nous utilisons la méthode invoke de la classe ForkJoinPool pour passer des tâches au pool de threads. Cette méthode prend des instances de sous-classes de RecursiveAction . À l'aide de Java Microbench Harness, nous comparons ces différentes approches les unes aux autres en termes de temps moyen par opération:

# Run complete. Total time: 00:04:50 Benchmark Mode Cnt Score Error Units PrimeNumbersUnitTest.Benchmarker.commonPoolBenchmark avgt 20 119.885 ± 9.917 ms/op PrimeNumbersUnitTest.Benchmarker.newWorkStealingPoolBenchmark avgt 20 119.791 ± 7.811 ms/op PrimeNumbersUnitTest.Benchmarker.singleThread avgt 20 475.964 ± 7.929 ms/op

Il est clair que ForkJoinPool.commonPool et Executors.newWorkStealingPool nous permettent de déterminer les nombres premiers plus rapidement qu'avec une approche à un seul thread.

Le framework fork / join pool nous permet de décomposer la tâche en sous-tâches. Nous avons divisé la collection de 10 000 entiers en lots de 1 à 100, 101 à 200, 201 à 300 et ainsi de suite. Nous avons ensuite déterminé les nombres premiers pour chaque lot et rendu le nombre total de nombres premiers disponible avec notre méthode noOfPrimeNumbers .

5.3. Voler du travail pour calculer

Avec un pool de threads synchrones, ForkJoinPool.commonPool place les threads dans le pool tant que la tâche est toujours en cours. En conséquence, le niveau de vol de travail ne dépend pas du niveau de granularité des tâches.

Les exécuteurs asynchrones.newWorkStealingPoolest mieux géré, ce qui permet au niveau de vol de travail de dépendre du niveau de granularité des tâches.

Nous obtenons le niveau de vol de travail en utilisant le getStealCount de la classe ForkJoinPool :

long steals = forkJoinPool.getStealCount();

La détermination du nombre de vols de travail pour Executors.newWorkStealingPool et ForkJoinPool.commonPool nous donne un comportement différent:

Executors.newWorkStealingPool -> Granularity: [1], Steals: [6564] Granularity: [10], Steals: [572] Granularity: [100], Steals: [56] Granularity: [1000], Steals: [60] Granularity: [10000], Steals: [1] ForkJoinPool.commonPool -> Granularity: [1], Steals: [6923] Granularity: [10], Steals: [7540] Granularity: [100], Steals: [7605] Granularity: [1000], Steals: [7681] Granularity: [10000], Steals: [7681]

Lorsque la granularité passe de fine à grossière (1 à 10 000) pour Executors.newWorkStealingPool , le niveau de vol de travail diminue . Par conséquent, le nombre de vols est un lorsque la tâche n'est pas ventilée (granularité de 10 000).

Le ForkJoinPool.commonPool a un comportement différent. Le niveau de vol de travail est toujours élevé et peu influencé par le changement de granularité des tâches.

Techniquement parlant, notre exemple de nombres premiers est celui qui prend en charge le traitement asynchrone des tâches de style événement. C'est parce que notre implémentation n'impose pas la jonction des résultats.

Un cas peut être fait que Executors.newWorkStealingPool offre la meilleure utilisation des ressources pour résoudre le problème.

6. Conclusion

Dans cet article, nous avons examiné le vol de travail et comment l'appliquer à l'aide du framework fork / join. Nous avons également examiné les exemples de vol de travail et comment il peut améliorer le temps de traitement et l'utilisation des ressources.

Comme toujours, le code source complet de l'exemple est disponible à l'adresse over sur GitHub.