Introduction aux types de données répliquées sans conflit

1. Vue d'ensemble

Dans cet article, nous examinerons les types de données répliquées sans conflit (CRDT) et comment les utiliser en Java. Pour nos exemples, nous utiliserons des implémentations de la bibliothèque wurmloch-crdt .

Lorsque nous avons un cluster de N nœuds de réplique dans un système distribué, nous pouvons rencontrer une partition réseau - certains nœuds sont temporairement incapables de communiquer entre eux . Cette situation s'appelle un cerveau divisé.

Lorsque nous avons un cerveau divisé dans notre système, certaines demandes d'écriture - même pour le même utilisateur - peuvent aller vers différentes répliques qui ne sont pas connectées les unes aux autres . Lorsqu'une telle situation se produit, notre système est toujours disponible mais n'est pas cohérent .

Nous devons décider quoi faire des écritures et des données qui ne sont pas cohérentes lorsque le réseau entre deux clusters séparés recommence à fonctionner.

2. Types de données répliqués sans conflit à la rescousse

Considérons deux nœuds, A et B , qui se sont déconnectés en raison d'un cerveau divisé.

Disons qu'un utilisateur change son login et qu'une demande va au nœud A . Puis il / elle décide de changer à nouveau, mais cette fois la demande va au nœud B .

En raison du cerveau divisé, les deux nœuds ne sont pas connectés. Nous devons décider à quoi doit ressembler la connexion de cet utilisateur lorsque le réseau fonctionne à nouveau.

Nous pouvons utiliser quelques stratégies: nous pouvons donner la possibilité de résoudre les conflits à l'utilisateur (comme cela se fait dans Google Docs), ou nous pouvons utiliser un CRDT pour fusionner les données de répliques divergentes pour nous.

3. Dépendance de Maven

Tout d'abord, ajoutons une dépendance à la bibliothèque qui fournit un ensemble de CRDT utiles:

 com.netopyr.wurmloch wurmloch-crdt 0.1.0 

La dernière version est disponible sur Maven Central.

4. Ensemble de culture uniquement

Le CRDT le plus basique est un ensemble de culture uniquement. Les éléments ne peuvent être ajoutés qu'à un GSet et ne peuvent jamais être supprimés. Lorsque le GSet diverge, il peut être facilement fusionné en calculant l'union de deux ensembles.

Commençons par créer deux réplicas pour simuler une structure de données distribuée et connecter ces deux répliques à l'aide de la méthode connect () :

LocalCrdtStore crdtStore1 = new LocalCrdtStore(); LocalCrdtStore crdtStore2 = new LocalCrdtStore(); crdtStore1.connect(crdtStore2);

Une fois que nous obtenons deux réplicas dans notre cluster, nous pouvons créer un GSet sur le premier réplica et le référencer sur le second réplica:

GSet replica1 = crdtStore1.createGSet("ID_1"); GSet replica2 = crdtStore2.findGSet("ID_1").get();

À ce stade, notre cluster fonctionne comme prévu et il existe une connexion active entre deux réplicas. Nous pouvons ajouter deux éléments à l'ensemble à partir de deux répliques différentes et affirmer que l'ensemble contient les mêmes éléments sur les deux répliques:

replica1.add("apple"); replica2.add("banana"); assertThat(replica1).contains("apple", "banana"); assertThat(replica2).contains("apple", "banana");

Disons que tout à coup, nous avons une partition réseau et qu'il n'y a pas de connexion entre les premier et deuxième répliques. Nous pouvons simuler la partition réseau en utilisant la méthode disconnect () :

crdtStore1.disconnect(crdtStore2);

Ensuite, lorsque nous ajoutons des éléments à l'ensemble de données à partir des deux répliques, ces modifications ne sont pas visibles globalement car il n'y a pas de connexion entre elles:

replica1.add("strawberry"); replica2.add("pear"); assertThat(replica1).contains("apple", "banana", "strawberry"); assertThat(replica2).contains("apple", "banana", "pear");

Une fois la connexion entre les deux membres du cluster établie à nouveau, le GSet est fusionné en interne à l'aide d'une union sur les deux ensembles, et les deux répliques sont à nouveau cohérentes:

crdtStore1.connect(crdtStore2); assertThat(replica1) .contains("apple", "banana", "strawberry", "pear"); assertThat(replica2) .contains("apple", "banana", "strawberry", "pear");

5. Compteur d'incrément uniquement

Le compteur d'incrément uniquement est un CRDT qui regroupe tous les incréments localement sur chaque nœud.

Lorsque les répliques se synchronisent, après une partition réseau, la valeur résultante est calculée en additionnant tous les incréments sur tous les nœuds - ceci est similaire à LongAdder de java.concurrent mais à un niveau d'abstraction plus élevé.

Créons un compteur d'incrémentation uniquement à l'aide de GCounter et incrémentons-le à partir des deux répliques. Nous pouvons voir que la somme est calculée correctement:

LocalCrdtStore crdtStore1 = new LocalCrdtStore(); LocalCrdtStore crdtStore2 = new LocalCrdtStore(); crdtStore1.connect(crdtStore2); GCounter replica1 = crdtStore1.createGCounter("ID_1"); GCounter replica2 = crdtStore2.findGCounter("ID_1").get(); replica1.increment(); replica2.increment(2L); assertThat(replica1.get()).isEqualTo(3L); assertThat(replica2.get()).isEqualTo(3L); 

Lorsque nous déconnectons les deux membres du cluster et effectuons des opérations d'incrémentation locales, nous pouvons voir que les valeurs sont incohérentes:

crdtStore1.disconnect(crdtStore2); replica1.increment(3L); replica2.increment(5L); assertThat(replica1.get()).isEqualTo(6L); assertThat(replica2.get()).isEqualTo(8L);

Mais une fois que le cluster est à nouveau sain, les incréments seront fusionnés, donnant la valeur appropriée:

crdtStore1.connect(crdtStore2); assertThat(replica1.get()) .isEqualTo(11L); assertThat(replica2.get()) .isEqualTo(11L);

6. Compteur PN

En utilisant une règle similaire pour le compteur d'incrément uniquement, nous pouvons créer un compteur qui peut être à la fois incrémenté et décrémenté. Le PNCounter stocke tous les incréments et décrémentations séparément.

Lorsque les réplicas se synchronisent, la valeur résultante sera égale à la somme de tous les incréments moins la somme de tous les décrémentations :

@Test public void givenPNCounter_whenReplicasDiverge_thenMergesWithoutConflict() { LocalCrdtStore crdtStore1 = new LocalCrdtStore(); LocalCrdtStore crdtStore2 = new LocalCrdtStore(); crdtStore1.connect(crdtStore2); PNCounter replica1 = crdtStore1.createPNCounter("ID_1"); PNCounter replica2 = crdtStore2.findPNCounter("ID_1").get(); replica1.increment(); replica2.decrement(2L); assertThat(replica1.get()).isEqualTo(-1L); assertThat(replica2.get()).isEqualTo(-1L); crdtStore1.disconnect(crdtStore2); replica1.decrement(3L); replica2.increment(5L); assertThat(replica1.get()).isEqualTo(-4L); assertThat(replica2.get()).isEqualTo(4L); crdtStore1.connect(crdtStore2); assertThat(replica1.get()).isEqualTo(1L); assertThat(replica2.get()).isEqualTo(1L); }

7. Registre des derniers gagnants

Sometimes, we have more complex business rules, and operating on sets or counters is insufficient. We can use the Last-Writer-Wins Register, which keeps only the last updated value when merging diverged data sets. Cassandra uses this strategy to resolve conflicts.

We need to be very cautious when using this strategy because it drops changes that occurred in the meantime.

Let's create a cluster of two replicas and instances of the LWWRegister class:

LocalCrdtStore crdtStore1 = new LocalCrdtStore("N_1"); LocalCrdtStore crdtStore2 = new LocalCrdtStore("N_2"); crdtStore1.connect(crdtStore2); LWWRegister replica1 = crdtStore1.createLWWRegister("ID_1"); LWWRegister replica2 = crdtStore2.findLWWRegister("ID_1").get(); replica1.set("apple"); replica2.set("banana"); assertThat(replica1.get()).isEqualTo("banana"); assertThat(replica2.get()).isEqualTo("banana"); 

When the first replica sets the value to apple and the second one changes it to banana, the LWWRegister keeps only the last value.

Let's see what happens if the cluster disconnects:

crdtStore1.disconnect(crdtStore2); replica1.set("strawberry"); replica2.set("pear"); assertThat(replica1.get()).isEqualTo("strawberry"); assertThat(replica2.get()).isEqualTo("pear");

Each replica keeps its local copy of data that is inconsistent. When we call the set() method, the LWWRegister internally assigns a special version value that identifies the specific update to every using a VectorClock algorithm.

When the cluster synchronizes, it takes the value with the highest versionanddiscards every previous update:

crdtStore1.connect(crdtStore2); assertThat(replica1.get()).isEqualTo("pear"); assertThat(replica2.get()).isEqualTo("pear");

8. Conclusion

In this article, we showed the problem of consistency of distributed systems while maintaining availability.

In case of network partitions, we need to merge the diverged data when the cluster is synchronized. We saw how to use CRDTs to perform a merge of diverged data.

Tous ces exemples et extraits de code peuvent être trouvés dans le projet GitHub - il s'agit d'un projet Maven, il devrait donc être facile à importer et à exécuter tel quel.