Utilitaire de concurrence Java avec JCTools

1. Vue d'ensemble

Dans ce didacticiel, nous présenterons la bibliothèque JCTools (Java Concurrency Tools).

En termes simples, cela fournit un certain nombre de structures de données utilitaires adaptées au travail dans un environnement multi-thread.

2. Algorithmes non bloquants

Traditionnellement, le code multithread qui fonctionne sur un état partagé mutable utilise des verrous pour assurer la cohérence des données et les publications (modifications apportées par un thread qui sont visibles par un autre).

Cette approche présente un certain nombre d'inconvénients:

  • les threads peuvent être bloqués lors d'une tentative d'acquisition d'un verrou, ne faisant aucun progrès jusqu'à ce que l'opération d'un autre thread soit terminée - cela empêche effectivement le parallélisme
  • plus la contention de verrouillage est importante, plus la JVM passe de temps à gérer les threads de planification, à gérer les conflits et les files d'attente des threads en attente et moins elle effectue de travail réel
  • des blocages sont possibles si plus d'un verrou est impliqué et qu'ils sont acquis / libérés dans le mauvais ordre
  • un risque d'inversion de priorité est possible - un thread de haute priorité est verrouillé dans une tentative d'obtenir un verrou maintenu par un thread de basse priorité
  • la plupart du temps, des verrous à gros grain sont utilisés, ce qui nuit beaucoup au parallélisme - le verrouillage à grain fin nécessite une conception plus soignée, augmente la surcharge de verrouillage et est plus sujet aux erreurs

Une alternative est d'utiliser un algorithme non bloquant, c'est-à-dire un algorithme où l'échec ou la suspension d'un thread ne peut pas provoquer l'échec ou la suspension d'un autre thread .

Un algorithme non bloquant est sans verrouillage si au moins un des threads impliqués est assuré de progresser sur une période de temps arbitraire, c'est-à-dire que des blocages ne peuvent pas survenir pendant le traitement.

De plus, ces algorithmes sont sans attente s'il existe également une progression garantie par thread.

Voici un exemple de pile non bloquant tiré de l'excellent livre Java Concurrency in Practice; il définit l'état de base:

public class ConcurrentStack { AtomicReference
    
      top = new AtomicReference
     
      (); private static class Node { public E item; public Node next; // standard constructor } }
     
    

Et aussi quelques méthodes API:

public void push(E item){ Node newHead = new Node(item); Node oldHead; do { oldHead = top.get(); newHead.next = oldHead; } while(!top.compareAndSet(oldHead, newHead)); } public E pop() { Node oldHead; Node newHead; do { oldHead = top.get(); if (oldHead == null) { return null; } newHead = oldHead.next; } while (!top.compareAndSet(oldHead, newHead)); return oldHead.item; }

Nous pouvons voir que l'algorithme utilise des instructions de comparaison et d'échange (CAS) à granularité fine et qu'il est sans verrouillage (même si plusieurs threads appellent simultanément top.compareAndSet () , l'un d'eux est garanti pour réussir) mais pas attendre- free car il n'y a aucune garantie que CAS réussisse finalement pour un thread particulier.

3. Dépendance

Tout d'abord, ajoutons la dépendance JCTools à notre pom.xml :

 org.jctools jctools-core 2.1.2 

Veuillez noter que la dernière version disponible est disponible sur Maven Central.

4. Files d'attente JCTools

La bibliothèque offre un certain nombre de files d'attente à utiliser dans un environnement multithread, c'est-à-dire qu'un ou plusieurs threads écrivent dans une file d'attente et un ou plusieurs threads lisent à partir de celle-ci de manière thread-safe sans verrouillage.

L'interface commune pour toutes les implémentations de file d'attente est org.jctools.queues.MessagePassingQueue .

4.1. Types de files d'attente

Toutes les files d'attente peuvent être catégorisées selon leurs politiques de producteur / consommateur:

  • producteur unique, consommateur unique - ces classes sont nommées en utilisant le préfixe Spsc , par exemple SpscArrayQueue
  • producteur unique, plusieurs consommateurs - utilisez le préfixe Spmc , par exemple SpmcArrayQueue
  • plusieurs producteurs, un seul consommateur - utilisez le préfixe Mpsc , par exemple MpscArrayQueue
  • plusieurs producteurs, plusieurs consommateurs - utilisez le préfixe Mpmc , par exemple MpmcArrayQueue

Il est important de noter qu'il n'y a pas de vérification de politique en interne, c'est-à-dire qu'une file d'attente peut fonctionner en silence en cas d'utilisation incorrecte .

Par exemple, le test ci-dessous remplit une file d'attente à un seul producteur à partir de deux threads et réussit même si le consommateur n'est pas assuré de voir les données de différents producteurs:

SpscArrayQueue queue = new SpscArrayQueue(2); Thread producer1 = new Thread(() -> queue.offer(1)); producer1.start(); producer1.join(); Thread producer2 = new Thread(() -> queue.offer(2)); producer2.start(); producer2.join(); Set fromQueue = new HashSet(); Thread consumer = new Thread(() -> queue.drain(fromQueue::add)); consumer.start(); consumer.join(); assertThat(fromQueue).containsOnly(1, 2);

4.2. Implémentations de file d'attente

Pour résumer les classifications ci-dessus, voici la liste des files d'attente JCTools:

  • SpscArrayQueue - producteur unique, consommateur unique, utilise un tableau en interne, capacité liée
  • SpscLinkedQueue - producteur unique, consommateur unique, utilise une liste chaînée en interne, capacité non liée
  • SpscChunkedArrayQueue - producteur unique, consommateur unique, commence avec la capacité initiale et augmente jusqu'à la capacité maximale
  • SpscGrowableArrayQueue - un seul producteur, un seul consommateur, commence avec la capacité initiale et augmente jusqu'à la capacité maximale. Il s'agit du même contrat que SpscChunkedArrayQueue , la seule différence est la gestion des blocs internes. Il est recommandé d'utiliser SpscChunkedArrayQueue car il a une implémentation simplifiée
  • SpscUnboundedArrayQueue - producteur unique, consommateur unique, utilise un tableau en interne, capacité non liée
  • SpmcArrayQueue - producteur unique, plusieurs consommateurs, utilise un tableau en interne, capacité liée
  • MpscArrayQueue - plusieurs producteurs, un seul consommateur, utilise un tableau en interne, capacité liée
  • MpscLinkedQueue - plusieurs producteurs, un seul consommateur, utilise une liste chaînée en interne, capacité non liée
  • MpmcArrayQueue - plusieurs producteurs, plusieurs consommateurs, utilise un tableau en interne, capacité liée

4.3. Files d'attente atomiques

Toutes les files d'attente mentionnées dans la section précédente utilisent sun.misc.Unsafe . Cependant, avec l'avènement de Java 9 et du JEP-260, cette API devient inaccessible par défaut.

Il existe donc des files d'attente alternatives qui utilisent java.util.concurrent.atomic.AtomicLongFieldUpdater (API publique, moins performante) au lieu de sun.misc.Unsafe .

They are generated from the queues above and their names have the word Atomic inserted in between, e.g. SpscChunkedAtomicArrayQueue or MpmcAtomicArrayQueue.

It's recommended to use ‘regular' queues if possible and resort to AtomicQueues only in environments where sun.misc.Unsafe is prohibited/ineffective like HotSpot Java9+ and JRockit.

4.4. Capacity

All JCTools queues might also have a maximum capacity or be unbound. When a queue is full and it's bound by capacity, it stops accepting new elements.

In the following example, we:

  • fill the queue
  • ensure that it stops accepting new elements after that
  • drain from it and ensure that it's possible to add more elements afterward

Please note that a couple of code statements are dropped for readability. The complete implementation can be found over on GitHub:

SpscChunkedArrayQueue queue = new SpscChunkedArrayQueue(8, 16); CountDownLatch startConsuming = new CountDownLatch(1); CountDownLatch awakeProducer = new CountDownLatch(1); Thread producer = new Thread(() -> { IntStream.range(0, queue.capacity()).forEach(i -> { assertThat(queue.offer(i)).isTrue(); }); assertThat(queue.offer(queue.capacity())).isFalse(); startConsuming.countDown(); awakeProducer.await(); assertThat(queue.offer(queue.capacity())).isTrue(); }); producer.start(); startConsuming.await(); Set fromQueue = new HashSet(); queue.drain(fromQueue::add); awakeProducer.countDown(); producer.join(); queue.drain(fromQueue::add); assertThat(fromQueue).containsAll( IntStream.range(0, 17).boxed().collect(toSet()));

5. Other JCTools Data Structures

JCTools offers a couple of non-Queue data structures as well.

All of them are listed below:

  • NonBlockingHashMap a lock-free ConcurrentHashMap alternative with better-scaling properties and generally lower mutation costs. It's implemented via sun.misc.Unsafe, so, it's not recommended to use this class in a HotSpot Java9+ or JRockit environment
  • NonBlockingHashMapLong like NonBlockingHashMap but uses primitive long keys
  • NonBlockingHashSet a simple wrapper around NonBlockingHashMaplike JDK's java.util.Collections.newSetFromMap()
  • NonBlockingIdentityHashMap like NonBlockingHashMap but compares keys by identity.
  • NonBlockingSetInta multi-threaded bit-vector set implemented as an array of primitive longs. Works ineffectively in case of silent autoboxing

6. Performance Testing

Let's use JMH for comparing the JDK's ArrayBlockingQueue vs. JCTools queue's performance. JMH is an open-source micro-benchmark framework from Sun/Oracle JVM gurus which protects us from indeterminism of compiler/jvm optimization algorithms). Please feel free to get more details on it in this article.

Note that the code snippet below misses a couple of statements in order to improve readability. Please find the complete source code on GitHub:

public class MpmcBenchmark { @Param({PARAM_UNSAFE, PARAM_AFU, PARAM_JDK}) public volatile String implementation; public volatile Queue queue; @Benchmark @Group(GROUP_NAME) @GroupThreads(PRODUCER_THREADS_NUMBER) public void write(Control control) { // noinspection StatementWithEmptyBody while (!control.stopMeasurement && !queue.offer(1L)) { // intentionally left blank } } @Benchmark @Group(GROUP_NAME) @GroupThreads(CONSUMER_THREADS_NUMBER) public void read(Control control) { // noinspection StatementWithEmptyBody while (!control.stopMeasurement && queue.poll() == null) { // intentionally left blank } } }

Results (excerpt for the 95th percentile, nanoseconds per-operation):

MpmcBenchmark.MyGroup:MyGroup·p0.95 MpmcArrayQueue sample 1052.000 ns/op MpmcBenchmark.MyGroup:MyGroup·p0.95 MpmcAtomicArrayQueue sample 1106.000 ns/op MpmcBenchmark.MyGroup:MyGroup·p0.95 ArrayBlockingQueue sample 2364.000 ns/op

We can see thatMpmcArrayQueue performs just slightly better than MpmcAtomicArrayQueue and ArrayBlockingQueue is slower by a factor of two.

7. Drawbacks of Using JCTools

Using JCTools has an important drawback – it's not possible to enforce that the library classes are used correctly. For example, consider a situation when we start using MpscArrayQueue in our large and mature project (note that there must be a single consumer).

Unfortunately, as the project is big, there is a possibility that someone makes a programming or configuration error and the queue is now read from more than one thread. The system seems to work as before but now there is a chance that consumers miss some messages. That is a real problem which might have a big impact and is very hard to debug.

Ideally, it should be possible to run a system with a particular system property which forces JCTools to ensure thread access policy. E.g. local/test/staging environments (but not production) might have it turned on. Sadly, JCTools does not provide such a property.

Une autre considération est que même si nous nous sommes assurés que JCTools est nettement plus rapide que son homologue JDK, cela ne signifie pas que notre application gagne la même vitesse que nous commençons à utiliser les implémentations de file d'attente personnalisées. La plupart des applications n'échangent pas beaucoup d'objets entre les threads et sont principalement liées aux E / S.

8. Conclusion

Nous avons maintenant une compréhension de base des classes d'utilité offertes par JCTools et avons vu à quel point elles fonctionnent, par rapport aux homologues du JDK sous une charge lourde.

En conclusion, cela vaut la peine d'utiliser la bibliothèque uniquement si nous échangeons beaucoup d'objets entre les threads et même dans ce cas, il faut faire très attention à préserver la politique d'accès aux threads.

Comme toujours, le code source complet des exemples ci-dessus se trouve à l'adresse over sur GitHub.