Guide de CompletableFuture

1. Introduction

Ce didacticiel est un guide des fonctionnalités et des cas d'utilisation de la classe CompletableFuture qui a été introduite en tant qu'amélioration de l'API Java 8 Concurrency.

2. Calcul asynchrone en Java

Le calcul asynchrone est difficile à raisonner. Habituellement, nous voulons penser à tout calcul comme une série d'étapes, mais dans le cas d'un calcul asynchrone, les actions représentées comme des rappels ont tendance à être soit dispersées dans le code, soit profondément imbriquées les unes dans les autres . Les choses empirent encore lorsque nous devons gérer des erreurs qui pourraient survenir au cours de l'une des étapes.

L' interface Future a été ajoutée dans Java 5 pour servir de résultat d'un calcul asynchrone, mais elle ne disposait d'aucune méthode pour combiner ces calculs ou gérer d'éventuelles erreurs.

Java 8 a introduit la classe CompletableFuture . En plus de l' interface Future , il a également implémenté l' interface CompletionStage . Cette interface définit le contrat d'une étape de calcul asynchrone que nous pouvons combiner avec d'autres étapes.

CompletableFuture est à la fois un bloc de construction et un framework, avec environ 50 méthodes différentes pour composer, combiner et exécuter des étapes de calcul asynchrones et gérer les erreurs .

Une API aussi volumineuse peut être écrasante, mais celles-ci relèvent principalement de plusieurs cas d'utilisation clairs et distincts.

3. Utiliser CompletableFuture comme un avenir simple

Tout d'abord, la classe CompletableFuture implémente l' interface Future , nous pouvons donc l' utiliser comme implémentation Future , mais avec une logique d'achèvement supplémentaire .

Par exemple, nous pouvons créer une instance de cette classe avec un constructeur sans argument pour représenter un résultat futur, la distribuer aux consommateurs et la compléter à un moment donné dans le futur en utilisant la méthode complete . Les consommateurs peuvent utiliser la méthode get pour bloquer le thread actuel jusqu'à ce que ce résultat soit fourni.

Dans l'exemple ci-dessous, nous avons une méthode qui crée une instance CompletableFuture , puis effectue un certain calcul dans un autre thread et renvoie immédiatement le Future .

Lorsque le calcul est terminé, la méthode complète le futur en fournissant le résultat à la méthode complète :

public Future calculateAsync() throws InterruptedException { CompletableFuture completableFuture = new CompletableFuture(); Executors.newCachedThreadPool().submit(() -> { Thread.sleep(500); completableFuture.complete("Hello"); return null; }); return completableFuture; }

Pour effectuer le calcul, nous utilisons l' API Executor . Cette méthode de création et de réalisation d'un CompletableFuture peut être utilisée avec n'importe quel mécanisme de concurrence ou API, y compris les threads bruts.

Notez que la méthode CalculateAsync renvoie une instance Future .

Nous appelons simplement la méthode, recevons l' instance Future et appelons la méthode get dessus lorsque nous sommes prêts à bloquer le résultat.

Notez également que la méthode get lève des exceptions vérifiées, à savoir ExecutionException (encapsulant une exception qui s'est produite lors d'un calcul) et InterruptedException (une exception signifiant qu'un thread exécutant une méthode a été interrompu):

Future completableFuture = calculateAsync(); // ... String result = completableFuture.get(); assertEquals("Hello", result);

Si nous connaissons déjà le résultat d'un calcul , nous pouvons utiliser la méthode statique completedFuture avec un argument qui représente un résultat de ce calcul. Par conséquent, la méthode get du Future ne bloquera jamais, renvoyant immédiatement ce résultat à la place:

Future completableFuture = CompletableFuture.completedFuture("Hello"); // ... String result = completableFuture.get(); assertEquals("Hello", result);

Comme scénario alternatif, nous pouvons souhaiter annuler l'exécution d'un Future .

4. CompletableFuture avec logique de calcul encapsulée

Le code ci-dessus nous permet de choisir n'importe quel mécanisme d'exécution simultanée, mais que se passe-t-il si nous voulons ignorer ce passe-partout et simplement exécuter du code de manière asynchrone?

Les méthodes statiques runAsync et supplyAsync nous permettent de créer une instance CompletableFuture à partir des types fonctionnels Runnable et Supplier en conséquence.

Les deux Runnable et fournisseurs sont des interfaces fonctionnelles qui permettent passer leurs instances comme des expressions lambda grâce à la nouvelle fonctionnalité Java 8.

L' interface Runnable est la même ancienne interface qui est utilisée dans les threads et elle ne permet pas de renvoyer une valeur.

L' interface fournisseur est une interface fonctionnelle générique avec une seule méthode qui n'a pas d'arguments et renvoie une valeur de type paramétré.

Cela nous permet de fournir une instance du fournisseur sous la forme d'une expression lambda qui effectue le calcul et renvoie le résultat . C'est aussi simple que:

CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello"); // ... assertEquals("Hello", future.get());

5. Traitement des résultats des calculs asynchrones

La manière la plus générique de traiter le résultat d'un calcul est de le transmettre à une fonction. La méthode thenApply fait exactement cela; il accepte une instance de Function , l'utilise pour traiter le résultat et renvoie un Future qui contient une valeur retournée par une fonction:

CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture future = completableFuture .thenApply(s -> s + " World"); assertEquals("Hello World", future.get());

Si nous n'avons pas besoin de renvoyer une valeur dans la chaîne Future , nous pouvons utiliser une instance de l' interface fonctionnelle Consumer . Sa méthode unique prend un paramètre et renvoie void .

Il existe une méthode pour ce cas d'utilisation dans CompletableFuture. La méthode thenAccept reçoit un Consumer et lui transmet le résultat du calcul. Ensuite, l' appel final future.get () renvoie une instance du type Void :

CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture future = completableFuture .thenAccept(s -> System.out.println("Computation returned: " + s)); future.get();

Enfin, si nous n'avons pas besoin de la valeur du calcul, ni ne voulons renvoyer une valeur à la fin de la chaîne, alors nous pouvons passer un lambda Runnable à la méthode thenRun . Dans l'exemple suivant, nous imprimons simplement une ligne dans la console après avoir appelé future.get ():

CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture future = completableFuture .thenRun(() -> System.out.println("Computation finished.")); future.get();

6. Combinaison de contrats à terme

The best part of the CompletableFuture API is the ability to combine CompletableFuture instances in a chain of computation steps.

The result of this chaining is itself a CompletableFuture that allows further chaining and combining. This approach is ubiquitous in functional languages and is often referred to as a monadic design pattern.

In the following example we use the thenCompose method to chain two Futures sequentially.

Notice that this method takes a function that returns a CompletableFuture instance. The argument of this function is the result of the previous computation step. This allows us to use this value inside the next CompletableFuture‘s lambda:

CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> "Hello") .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World")); assertEquals("Hello World", completableFuture.get());

The thenCompose method, together with thenApply, implement basic building blocks of the monadic pattern. They closely relate to the map and flatMap methods of Stream and Optional classes also available in Java 8.

Both methods receive a function and apply it to the computation result, but the thenCompose (flatMap) method receives a function that returns another object of the same type. This functional structure allows composing the instances of these classes as building blocks.

If we want to execute two independent Futures and do something with their results, we can use the thenCombine method that accepts a Future and a Function with two arguments to process both results:

CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> "Hello") .thenCombine(CompletableFuture.supplyAsync( () -> " World"), (s1, s2) -> s1 + s2)); assertEquals("Hello World", completableFuture.get());

A simpler case is when we want to do something with two Futures‘ results, but don't need to pass any resulting value down a Future chain. The thenAcceptBoth method is there to help:

CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello") .thenAcceptBoth(CompletableFuture.supplyAsync(() -> " World"), (s1, s2) -> System.out.println(s1 + s2));

7. Difference Between thenApply() and thenCompose()

In our previous sections, we've shown examples regarding thenApply() and thenCompose(). Both APIs help chain different CompletableFuture calls, but the usage of these 2 functions is different.

7.1. thenApply()

We can use this method to work with a result of the previous call. However, a key point to remember is that the return type will be combined of all calls.

So this method is useful when we want to transform the result of a CompletableFuture call:

CompletableFuture finalResult = compute().thenApply(s-> s + 1);

7.2. thenCompose()

The thenCompose() method is similar to thenApply() in that both return a new Completion Stage. However, thenCompose() uses the previous stage as the argument. It will flatten and return a Future with the result directly, rather than a nested future as we observed in thenApply():

CompletableFuture computeAnother(Integer i){ return CompletableFuture.supplyAsync(() -> 10 + i); } CompletableFuture finalResult = compute().thenCompose(this::computeAnother);

So if the idea is to chain CompletableFuture methods then it’s better to use thenCompose().

Also, note that the difference between these two methods is analogous to the difference between map() and flatMap().

8. Running Multiple Futures in Parallel

When we need to execute multiple Futures in parallel, we usually want to wait for all of them to execute and then process their combined results.

The CompletableFuture.allOf static method allows to wait for completion of all of the Futures provided as a var-arg:

CompletableFuture future1 = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture future2 = CompletableFuture.supplyAsync(() -> "Beautiful"); CompletableFuture future3 = CompletableFuture.supplyAsync(() -> "World"); CompletableFuture combinedFuture = CompletableFuture.allOf(future1, future2, future3); // ... combinedFuture.get(); assertTrue(future1.isDone()); assertTrue(future2.isDone()); assertTrue(future3.isDone());

Notice that the return type of the CompletableFuture.allOf() is a CompletableFuture. The limitation of this method is that it does not return the combined results of all Futures. Instead, we have to manually get results from Futures. Fortunately, CompletableFuture.join() method and Java 8 Streams API makes it simple:

String combined = Stream.of(future1, future2, future3) .map(CompletableFuture::join) .collect(Collectors.joining(" ")); assertEquals("Hello Beautiful World", combined);

The CompletableFuture.join() method is similar to the get method, but it throws an unchecked exception in case the Future does not complete normally. This makes it possible to use it as a method reference in the Stream.map() method.

9. Handling Errors

For error handling in a chain of asynchronous computation steps, we have to adapt the throw/catch idiom in a similar fashion.

Instead of catching an exception in a syntactic block, the CompletableFuture class allows us to handle it in a special handle method. This method receives two parameters: a result of a computation (if it finished successfully), and the exception thrown (if some computation step did not complete normally).

In the following example, we use the handle method to provide a default value when the asynchronous computation of a greeting was finished with an error because no name was provided:

String name = null; // ... CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> { if (name == null) { throw new RuntimeException("Computation error!"); } return "Hello, " + name; })}).handle((s, t) -> s != null ? s : "Hello, Stranger!"); assertEquals("Hello, Stranger!", completableFuture.get());

As an alternative scenario, suppose we want to manually complete the Future with a value, as in the first example, but also have the ability to complete it with an exception. The completeExceptionally method is intended for just that. The completableFuture.get() method in the following example throws an ExecutionException with a RuntimeException as its cause:

CompletableFuture completableFuture = new CompletableFuture(); // ... completableFuture.completeExceptionally( new RuntimeException("Calculation failed!")); // ... completableFuture.get(); // ExecutionException

In the example above, we could have handled the exception with the handle method asynchronously, but with the get method we can use the more typical approach of a synchronous exception processing.

10. Async Methods

Most methods of the fluent API in CompletableFuture class have two additional variants with the Async postfix. These methods are usually intended for running a corresponding step of execution in another thread.

The methods without the Async postfix run the next execution stage using a calling thread. In contrast, the Async method without the Executor argument runs a step using the common fork/join pool implementation of Executor that is accessed with the ForkJoinPool.commonPool() method. Finally, the Async method with an Executor argument runs a step using the passed Executor.

Here's a modified example that processes the result of a computation with a Function instance. The only visible difference is the thenApplyAsync method, but under the hood the application of a function is wrapped into a ForkJoinTask instance (for more information on the fork/join framework, see the article “Guide to the Fork/Join Framework in Java”). This allows us to parallelize our computation even more and use system resources more efficiently:

CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture future = completableFuture .thenApplyAsync(s -> s + " World"); assertEquals("Hello World", future.get());

11. JDK 9 CompletableFuture API

Java 9 enhances the CompletableFuture API with the following changes:

  • New factory methods added
  • Support for delays and timeouts
  • Improved support for subclassing

and new instance APIs:

  • Executor defaultExecutor()
  • CompletableFuture newIncompleteFuture()
  • CompletableFuture copy()
  • CompletionStage minimalCompletionStage()
  • CompletableFuture completeAsync(Supplier supplier, Executor executor)
  • CompletableFuture completeAsync(Supplier supplier)
  • CompletableFuture orTimeout(long timeout, TimeUnit unit)
  • CompletableFuture completeOnTimeout(T value, long timeout, TimeUnit unit)

We also now have a few static utility methods:

  • Executor delayedExecutor(long delay, TimeUnit unit, Executor executor)
  • Executor delayedExecutor(long delay, TimeUnit unit)
  • CompletionStage completedStage(U value)
  • CompletionStage failedStage(Throwable ex)
  • CompletableFuture failedFuture(Throwable ex)

Finally, to address timeout, Java 9 has introduced two more new functions:

  • orTimeout()
  • completeOnTimeout()

Here's the detailed article for further reading: Java 9 CompletableFuture API Improvements.

12. Conclusion

Dans cet article, nous avons décrit les méthodes et les cas d'utilisation typiques de la classe CompletableFuture .

Le code source de l'article est disponible à l'adresse over sur GitHub.