Implémentation d'un tampon en anneau en Java

1. Vue d'ensemble

Dans ce didacticiel, nous allons apprendre à implémenter un tampon en anneau en Java.

2. Tampon circulaire

Le tampon en anneau (ou tampon circulaire) est une structure de données circulaire limitée qui est utilisée pour la mise en mémoire tampon des données entre deux ou plusieurs threads . Au fur et à mesure que nous écrivons dans un tampon en anneau, il s'enroule à la fin.

2.1. Comment ça fonctionne

Un tampon en anneau est implémenté à l'aide d'un tableau de taille fixe qui s'enroule aux limites .

Outre le tableau, il garde une trace de trois choses:

  • le prochain emplacement disponible dans le tampon pour insérer un élément,
  • le prochain élément non lu dans le tampon,
  • et la fin du tableau - le point auquel le tampon s'enroule jusqu'au début du tableau

La mécanique de la façon dont un tampon en anneau gère ces exigences varie avec l'implémentation. Par exemple, l'entrée Wikipedia sur le sujet montre une méthode utilisant quatre pointeurs.

Nous emprunterons l'approche de l'implémentation par Disruptor du tampon en anneau à l'aide de séquences.

La première chose que nous devons savoir est la capacité - la taille maximale fixe du tampon. Ensuite, nous utiliserons deux séquences à croissance monotone :

  • Séquence d'écriture: à partir de -1, incrémente de 1 lorsque nous insérons un élément
  • Séquence de lecture: à partir de 0, incrémente de 1 lorsque nous consommons un élément

Nous pouvons mapper une séquence à un index dans le tableau en utilisant une opération mod:

arrayIndex = sequence % capacity 

L' opération mod enveloppe la séquence autour des limites pour dériver un emplacement dans le tampon :

Voyons comment nous insérerions un élément:

buffer[++writeSequence % capacity] = element 

Nous pré-incrémentons la séquence avant d'insérer un élément.

Pour consommer un élément, nous faisons un post-incrémentation:

element = buffer[readSequence++ % capacity] 

Dans ce cas, nous effectuons un post-incrémentation sur la séquence. Consommer un élément ne le supprime pas du tampon - il reste simplement dans le tableau jusqu'à ce qu'il soit écrasé .

2.2. Tampons vides et pleins

En faisant le tour du tableau, nous commencerons à écraser les données dans le tampon. Si le tampon est plein, nous pouvons choisir d'écraser les données les plus anciennes, que le lecteur les ait consommées ou non, ou d'empêcher l'écrasement des données qui n'ont pas été lues .

Si le lecteur peut se permettre de rater les valeurs intermédiaires ou anciennes (par exemple, un symbole boursier), nous pouvons écraser les données sans attendre qu'elles soient consommées. En revanche, si le lecteur doit consommer toutes les valeurs (comme pour les transactions e-commerce), il faut attendre (blocage / occupé-attente) jusqu'à ce que le tampon ait un slot disponible.

Le tampon est plein si la taille du tampon est égale à sa capacité , où sa taille est égale au nombre d'éléments non lus:

size = (writeSequence - readSequence) + 1 isFull = (size == capacity) 

Si la séquence d'écriture est en retard par rapport à la séquence de lecture, le tampon est vide :

isEmpty = writeSequence < readSequence 

Le tampon renvoie une valeur nulle s'il est vide.

2.2. Avantages et inconvénients

A ring buffer is an efficient FIFO buffer. It uses a fixed-size array that can be pre-allocated upfront and allows an efficient memory access pattern. All the buffer operations are constant time O(1), including consuming an element, as it doesn't require a shifting of elements.

On the flip side, determining the correct size of the ring buffer is critical. For instance, the write operations may block for a long time if the buffer is under-sized and the reads are slow. We can use dynamic sizing, but it would require moving data around and we'll miss out on most of the advantages discussed above.

3. Implementation in Java

Now that we understand how a ring buffer works, let's proceed to implement it in Java.

3.1. Initialization

First, let's define a constructor that initializes the buffer with a predefined capacity:

public CircularBuffer(int capacity) { this.capacity = capacity; this.data = (E[]) new Object[capacity]; this.readSequence = 0; this.writeSequence = -1; } 

This will create an empty buffer and initialize the sequence fields as discussed in the previous section.

3.3. Offer

Next, we'll implement the offer operation that inserts an element into the buffer at the next available slot and returns true on success. It returns false if the buffer can't find an empty slot, that is, we can't overwrite unread values.

Let's implement the offer method in Java:

public boolean offer(E element) { boolean isFull = (writeSequence - readSequence) + 1 == capacity; if (!isFull) { int nextWriteSeq = writeSequence + 1; data[nextWriteSeq % capacity] = element; writeSequence++; return true; } return false; } 

So, we're incrementing the write sequence and computing the index in the array for the next available slot. Then, we're writing the data to the buffer and storing the updated write sequence.

Let's try it out:

@Test public void givenCircularBuffer_whenAnElementIsEnqueued_thenSizeIsOne() { CircularBuffer buffer = new CircularBuffer(defaultCapacity); assertTrue(buffer.offer("Square")); assertEquals(1, buffer.size()); } 

3.4. Poll

Finally, we'll implement the poll operation that retrieves and removes the next unread element. The poll operation doesn't remove the element but increments the read sequence.

Let's implement it:

public E poll() { boolean isEmpty = writeSequence < readSequence; if (!isEmpty) { E nextValue = data[readSequence % capacity]; readSequence++; return nextValue; } return null; } 

Here, we're reading the data at the current read sequence by computing the index in the array. Then, we're incrementing the sequence and returning the value, if the buffer is not empty.

Let's test it out:

@Test public void givenCircularBuffer_whenAnElementIsDequeued_thenElementMatchesEnqueuedElement() { CircularBuffer buffer = new CircularBuffer(defaultCapacity); buffer.offer("Triangle"); String shape = buffer.poll(); assertEquals("Triangle", shape); } 

4. Producer-Consumer Problem

We've talked about the use of a ring buffer for exchanging data between two or more threads, which is an example of a synchronization problem called the Producer-Consumer problem. In Java, we can solve the producer-consumer problem in various ways using semaphores, bounded queues, ring buffers, etc.

Let's implement a solution based on a ring buffer.

4.1. volatile Sequence Fields

Our implementation of the ring buffer is not thread-safe. Let's make it thread-safe for the simple single-producer and single-consumer case.

The producer writes data to the buffer and increments the writeSequence, while the consumer only reads from the buffer and increments the readSequence. So, the backing array is contention-free and we can get away without any synchronization.

But we still need to ensure that the consumer can see the latest value of the writeSequence field (visibility) and that the writeSequence is not updated before the data is actually available in the buffer (ordering).

We can make the ring buffer concurrent and lock-free in this case by making the sequence fields volatile:

private volatile int writeSequence = -1, readSequence = 0; 

In the offer method, a write to the volatile field writeSequence guarantees that the writes to the buffer happen before updating the sequence. At the same time, the volatile visibility guarantee ensures that the consumer will always see the latest value of writeSequence.

4.2. Producer

Let's implement a simple producer Runnable that writes to the ring buffer:

public void run() { for (int i = 0; i < items.length;) { if (buffer.offer(items[i])) { System.out.println("Produced: " + items[i]); i++; } } } 

The producer thread would wait for an empty slot in a loop (busy-waiting).

4.3. Consumer

We'll implement a consumer Callable that reads from the buffer:

public T[] call() { T[] items = (T[]) new Object[expectedCount]; for (int i = 0; i < items.length;) { T item = buffer.poll(); if (item != null) { items[i++] = item; System.out.println("Consumed: " + item); } } return items; } 

Le thread consommateur continue sans imprimer s'il reçoit une valeur nulle du tampon.

Écrivons notre code de pilote:

executorService.submit(new Thread(new Producer(buffer))); executorService.submit(new Thread(new Consumer(buffer))); 

L'exécution de notre programme producteur-consommateur produit une sortie comme ci-dessous:

Produced: Circle Produced: Triangle Consumed: Circle Produced: Rectangle Consumed: Triangle Consumed: Rectangle Produced: Square Produced: Rhombus Consumed: Square Produced: Trapezoid Consumed: Rhombus Consumed: Trapezoid Produced: Pentagon Produced: Pentagram Produced: Hexagon Consumed: Pentagon Consumed: Pentagram Produced: Hexagram Consumed: Hexagon Consumed: Hexagram 

5. Conclusion

Dans ce didacticiel, nous avons appris à implémenter un tampon en anneau et exploré comment il peut être utilisé pour résoudre le problème producteur-consommateur.

Comme d'habitude, le code source de tous les exemples est disponible à l'adresse over sur GitHub.