Guide du framework Fork / Join en Java

1. Vue d'ensemble

Le framework fork / join a été présenté dans Java 7. Il fournit des outils pour accélérer le traitement parallèle en essayant d'utiliser tous les cœurs de processeur disponibles - ce qui est accompli par une approche de division et de conquête .

En pratique, cela signifie que le framework «fourche» d'abord , divisant récursivement la tâche en sous-tâches indépendantes plus petites jusqu'à ce qu'elles soient suffisamment simples pour être exécutées de manière asynchrone.

Après cela, la partie «join» commence , dans laquelle les résultats de toutes les sous-tâches sont joints de manière récursive en un seul résultat, ou dans le cas d'une tâche qui retourne void, le programme attend simplement que chaque sous-tâche soit exécutée.

Pour fournir une exécution parallèle efficace, le framework fork / join utilise un pool de threads appelé ForkJoinPool , qui gère les threads de travail de type ForkJoinWorkerThread .

2. ForkJoinPool

Le ForkJoinPool est le cœur du framework. Il s'agit d'une implémentation d' ExecutorService qui gère les threads de travail et nous fournit des outils pour obtenir des informations sur l'état et les performances du pool de threads.

Les threads de travail ne peuvent exécuter qu'une seule tâche à la fois, mais ForkJoinPool ne crée pas de thread distinct pour chaque sous-tâche. Au lieu de cela, chaque thread du pool a sa propre file d'attente à deux extrémités (ou deque, prononcé deck ) qui stocke les tâches.

Cette architecture est vitale pour équilibrer la charge de travail du thread à l'aide de l' algorithme de vol de travail.

2.1. Algorithme de vol de travail

En termes simples, les threads libres essaient de «voler» le travail de plusieurs threads occupés.

Par défaut, un thread de travail obtient des tâches de la tête de son propre deque. Lorsqu'il est vide, le thread prend une tâche à partir de la queue du deque d'un autre thread occupé ou de la file d'attente d'entrée globale, car c'est là que les plus gros travaux sont susceptibles de se trouver.

Cette approche minimise la possibilité que les threads soient en compétition pour les tâches. Cela réduit également le nombre de fois où le thread devra chercher du travail, car il travaille d'abord sur les plus gros morceaux de travail disponibles.

2.2. Instanciation ForkJoinPool

Dans Java 8, le moyen le plus pratique d'accéder à l'instance de ForkJoinPool est d'utiliser sa méthode statique commonPool (). Comme son nom l'indique, cela fournira une référence au pool commun, qui est un pool de threads par défaut pour chaque ForkJoinTask .

Selon la documentation d'Oracle, l'utilisation du pool commun prédéfini réduit la consommation de ressources, car cela décourage la création d'un pool de threads séparé par tâche.

ForkJoinPool commonPool = ForkJoinPool.commonPool();

Le même comportement peut être obtenu dans Java 7 en créant un ForkJoinPool et en l'attribuant à un champ statique public d'une classe utilitaire:

public static ForkJoinPool forkJoinPool = new ForkJoinPool(2);

Maintenant, il est facilement accessible:

ForkJoinPool forkJoinPool = PoolUtil.forkJoinPool;

Avec les constructeurs de ForkJoinPool , il est possible de créer un pool de threads personnalisé avec un niveau spécifique de parallélisme, de fabrique de threads et de gestionnaire d'exceptions. Dans l'exemple ci-dessus, le pool a un niveau de parallélisme de 2. Cela signifie que le pool utilisera 2 cœurs de processeur.

3. ForkJoinTask

ForkJoinTask est le type de base des tâches exécutées dans ForkJoinPool. En pratique, l'une de ses deux sous-classes doit être étendue: la RecursiveAction pour les tâches void et la RecursiveTask pour les tâches qui renvoient une valeur.Ils ont tous deux une méthode abstraite compute () dans laquelle la logique de la tâche est définie.

3.1. RecursiveAction - un exemple

Dans l'exemple ci-dessous, l'unité de travail à traiter est représentée par une chaîne appelée workload . À des fins de démonstration, la tâche est insensée: elle met simplement en majuscule son entrée et la consigne.

Pour illustrer le comportement de fourche de l'infrastructure, l'exemple divise la tâche si la charge de travail .length () est supérieure à un seuil spécifiéen utilisant la méthode createSubtask () .

La chaîne est divisée de manière récursive en sous-chaînes, créant des instances CustomRecursiveTask basées sur ces sous-chaînes.

Par conséquent, la méthode renvoie un List.

La liste est soumise à ForkJoinPool à l'aide de la méthode invokeAll () :

public class CustomRecursiveAction extends RecursiveAction { private String workload = ""; private static final int THRESHOLD = 4; private static Logger logger = Logger.getAnonymousLogger(); public CustomRecursiveAction(String workload) { this.workload = workload; } @Override protected void compute() { if (workload.length() > THRESHOLD) { ForkJoinTask.invokeAll(createSubtasks()); } else { processing(workload); } } private List createSubtasks() { List subtasks = new ArrayList(); String partOne = workload.substring(0, workload.length() / 2); String partTwo = workload.substring(workload.length() / 2, workload.length()); subtasks.add(new CustomRecursiveAction(partOne)); subtasks.add(new CustomRecursiveAction(partTwo)); return subtasks; } private void processing(String work) { String result = work.toUpperCase(); logger.info("This result - (" + result + ") - was processed by " + Thread.currentThread().getName()); } }

Ce modèle peut être utilisé pour développer vos propres classes RecursiveAction . Pour ce faire, créez un objet qui représente la quantité totale de travail, choisissez un seuil approprié, définissez une méthode pour diviser le travail et définissez une méthode pour effectuer le travail.

3.2. Tâche récursive

Pour les tâches qui renvoient une valeur, la logique ici est similaire, sauf que le résultat de chaque sous-tâche est réuni en un seul résultat:

public class CustomRecursiveTask extends RecursiveTask { private int[] arr; private static final int THRESHOLD = 20; public CustomRecursiveTask(int[] arr) { this.arr = arr; } @Override protected Integer compute() { if (arr.length > THRESHOLD) { return ForkJoinTask.invokeAll(createSubtasks()) .stream() .mapToInt(ForkJoinTask::join) .sum(); } else { return processing(arr); } } private Collection createSubtasks() { List dividedTasks = new ArrayList(); dividedTasks.add(new CustomRecursiveTask( Arrays.copyOfRange(arr, 0, arr.length / 2))); dividedTasks.add(new CustomRecursiveTask( Arrays.copyOfRange(arr, arr.length / 2, arr.length))); return dividedTasks; } private Integer processing(int[] arr) { return Arrays.stream(arr) .filter(a -> a > 10 && a  a * 10) .sum(); } }

Dans cet exemple, le travail est représenté par un tableau stocké dans le champ arr de la classe CustomRecursiveTask . La méthode createSubtasks () divise récursivement la tâche en plus petits morceaux de travail jusqu'à ce que chaque morceau soit plus petit que le seuil . Ensuite, la méthode invokeAll () soumet les sous-tâches au pool commun et renvoie une liste de Future .

Pour déclencher l'exécution, la méthode join () est appelée pour chaque sous-tâche.

Dans cet exemple, cela est accompli à l'aide de l' API Stream de Java 8 ; la méthode sum () est utilisée comme représentation de la combinaison de sous-résultats dans le résultat final.

4. Soumission de tâches à ForkJoinPool

Pour soumettre des tâches au pool de threads, peu d'approches peuvent être utilisées.

La méthode submit () ou execute () (leurs cas d'utilisation sont les mêmes):

forkJoinPool.execute(customRecursiveTask); int result = customRecursiveTask.join();

La méthode invoke () forque la tâche et attend le résultat, et n'a pas besoin de jointure manuelle:

int result = forkJoinPool.invoke(customRecursiveTask);

La méthode invokeAll () est le moyen le plus pratique de soumettre une séquence de ForkJoinTasks au ForkJoinPool. Il prend les tâches comme paramètres (deux tâches, var args ou une collection), fourches retourne ensuite une collection d' objets Future dans l'ordre dans lequel ils ont été produits.

Alternatively, you can use separate fork() and join() methods. The fork() method submits a task to a pool, but it doesn't trigger its execution. The join() method must be used for this purpose. In the case of RecursiveAction, the join() returns nothing but null; for RecursiveTask, it returns the result of the task's execution:

customRecursiveTaskFirst.fork(); result = customRecursiveTaskLast.join();

In our RecursiveTask example we used the invokeAll() method to submit a sequence of subtasks to the pool. The same job can be done with fork() and join(), though this has consequences for the ordering of the results.

Pour éviter toute confusion, il est généralement judicieux d'utiliser la méthode invokeAll () pour soumettre plus d'une tâche à ForkJoinPool.

5. Conclusions

L'utilisation du framework fork / join peut accélérer le traitement de tâches volumineuses, mais pour atteindre ce résultat, certaines directives doivent être suivies:

  • Utilisez le moins de pools de threads possible - dans la plupart des cas, la meilleure décision est d'utiliser un pool de threads par application ou système
  • Utilisez le pool de threads communs par défaut, si aucun réglage spécifique n'est nécessaire
  • Utilisez un seuil raisonnable pour diviser ForkJoinTask en sous-tâches
  • Évitez tout blocage dans vos ForkJoinTasks

Les exemples utilisés dans cet article sont disponibles dans le référentiel GitHub lié.