Traitement exactement une fois dans Kafka avec Java

1. Vue d'ensemble

Dans ce didacticiel, nous verrons comment Kafka assure une livraison exactement une fois entre les applications de producteur et de consommateur via la nouvelle API transactionnelle.

De plus, nous utiliserons cette API pour implémenter des producteurs et des consommateurs transactionnels afin d'obtenir une livraison de bout en bout une seule fois dans un exemple WordCount.

2. Livraison des messages à Kafka

En raison de diverses défaillances, les systèmes de messagerie ne peuvent pas garantir la livraison des messages entre les applications de production et de consommation. Selon la manière dont les applications clientes interagissent avec ces systèmes, la sémantique de message suivante est possible:

  • Si un système de messagerie ne duplique jamais un message mais peut manquer le message occasionnel, nous l'appelons au plus une fois
  • Ou, s'il ne manquera jamais un message mais pourrait dupliquer le message occasionnel, nous l'appelons au moins une fois
  • Mais, s'il délivre toujours tous les messages sans duplication, c'est exactement une fois

Au départ, Kafka ne prenait en charge que la distribution des messages au plus une fois et au moins une fois.

Cependant, l'introduction de transactions entre les courtiers Kafka et les applications clientes garantit une livraison en une seule fois dans Kafka . Pour mieux le comprendre, passons en revue rapidement l'API client transactionnel.

3. Dépendances de Maven

Pour travailler avec l'API de transaction, nous aurons besoin du client Java de Kafka dans notre pom:

 org.apache.kafka kafka-clients 2.0.0 

4. Une boucle transactionnelle consommation-transformation-production

Pour notre exemple, nous allons consommer les messages d'un sujet d'entrée, des phrases .

Ensuite, pour chaque phrase, nous compterons chaque mot et enverrons le nombre de mots individuel à un sujet de sortie, compte .

Dans l'exemple, nous supposerons qu'il existe déjà des données transactionnelles disponibles dans la rubrique phrases .

4.1. Un producteur soucieux des transactions

Ajoutons donc d'abord un producteur Kafka typique.

Properties producerProps = new Properties(); producerProps.put("bootstrap.servers", "localhost:9092");

De plus, cependant, nous devons spécifier un transactional.id et activer idempotence :

producerProps.put("enable.idempotence", "true"); producerProps.put("transactional.id", "prod-1"); KafkaProducer producer = new KafkaProducer(producerProps);

Parce que nous avons activé idempotence, Kafka utilisera ce numéro de transaction dans le cadre de son algorithme pour dédupliquer tout message producteur envoie , assurant idempotence.

En termes simples, si le producteur envoie accidentellement le même message à Kafka plus d'une fois, ces paramètres lui permettent de le remarquer.

Tout ce que nous devons faire est de nous assurer que l'ID de transaction est distinct pour chaque producteur , mais cohérent entre les redémarrages.

4.2. Activation du producteur pour les transactions

Une fois que nous sommes prêts, nous devons également appeler initTransaction pour préparer le producteur à utiliser les transactions:

producer.initTransactions();

Cela enregistre le producteur auprès du courtier comme celui qui peut utiliser des transactions, en l' identifiant par son transactional.id et un numéro de séquence, ou époque . À son tour, le courtier les utilisera pour écrire à l'avance toutes les actions dans un journal de transactions.

Et par conséquent, le courtier supprimera toutes les actions de ce journal qui appartiennent à un producteur avec le même identifiant de transaction et une époque antérieure , en supposant qu'elles proviennent de transactions obsolètes.

4.3. Un consommateur soucieux des transactions

Lorsque nous consommons, nous pouvons lire tous les messages sur une partition de sujet dans l'ordre. Cependant, nous pouvons indiquer avec isolation.level que nous devons attendre de lire les messages transactionnels jusqu'à ce que la transaction associée ait été validée :

Properties consumerProps = new Properties(); consumerProps.put("bootstrap.servers", "localhost:9092"); consumerProps.put("group.id", "my-group-id"); consumerProps.put("enable.auto.commit", "false"); consumerProps.put("isolation.level", "read_committed"); KafkaConsumer consumer = new KafkaConsumer(consumerProps); consumer.subscribe(singleton(“sentences”));

L'utilisation de la valeur read_committed garantit que nous ne lirons aucun message transactionnel avant la fin de la transaction.

La valeur par défaut de isolation.level est read_uncommitted.

4.4. Consommer et transformer par transaction

Maintenant que le producteur et le consommateur sont tous deux configurés pour écrire et lire de manière transactionnelle, nous pouvons consommer les enregistrements de notre rubrique d'entrée et compter chaque mot dans chaque enregistrement:

ConsumerRecords records = consumer.poll(ofSeconds(60)); Map wordCountMap = records.records(new TopicPartition("input", 0)) .stream() .flatMap(record -> Stream.of(record.value().split(" "))) .map(word -> Tuple.of(word, 1)) .collect(Collectors.toMap(tuple -> tuple.getKey(), t1 -> t1.getValue(), (v1, v2) -> v1 + v2));

Notez qu'il n'y a rien de transactionnel dans le code ci-dessus. Mais, puisque nous avons utilisé read_committed, cela signifie qu'aucun message qui a été écrit dans la rubrique d'entrée dans la même transaction ne sera lu par ce consommateur jusqu'à ce qu'ils soient tous écrits.

Maintenant, nous pouvons envoyer le nombre de mots calculé au sujet de sortie.

Voyons comment nous pouvons produire nos résultats, également de manière transactionnelle.

4.5. Envoyer l'API

Pour envoyer nos comptes comme de nouveaux messages, mais dans la même transaction, nous appelons beginTransaction :

producer.beginTransaction();

Ensuite, nous pouvons écrire chacun dans notre rubrique «comptes» avec la clé étant le mot et le compte étant la valeur:

wordCountMap.forEach((key,value) -> producer.send(new ProducerRecord("counts",key,value.toString())));

Note that because the producer can partition the data by the key, this means that transactional messages can span multiple partitions, each being read by separate consumers. Therefore, Kafka broker will store a list of all updated partitions for a transaction.

Note also that, within a transaction, a producer can use multiple threads to send records in parallel.

4.6. Committing Offsets

And finally, we need to commit our offsets that we just finished consuming. With transactions, we commit the offsets back to the input topic we read them from, like normal. Also though, we send them to the producer's transaction.

We can do all of this in a single call, but we first need to calculate the offsets for each topic partition:

Map offsetsToCommit = new HashMap(); for (TopicPartition partition : records.partitions()) { List
    
      partitionedRecords = records.records(partition); long offset = partitionedRecords.get(partitionedRecords.size() - 1).offset(); offsetsToCommit.put(partition, new OffsetAndMetadata(offset + 1)); }
    

Note that what we commit to the transaction is the upcoming offset, meaning we need to add 1.

Then we can send our calculated offsets to the transaction:

producer.sendOffsetsToTransaction(offsetsToCommit, "my-group-id");

4.7. Committing or Aborting the Transaction

And, finally, we can commit the transaction, which will atomically write the offsets to the consumer_offsets topic as well as to the transaction itself:

producer.commitTransaction();

This flushes any buffered message to the respective partitions. In addition, the Kafka broker makes all messages in that transaction available to the consumers.

Of course, if anything goes wrong while we are processing, for example, if we catch an exception, we can call abortTransaction:

try { // ... read from input topic // ... transform // ... write to output topic producer.commitTransaction(); } catch ( Exception e ) { producer.abortTransaction(); }

And drop any buffered messages and remove the transaction from the broker.

If we neither commit nor abort before the broker-configured max.transaction.timeout.ms, the Kafka broker will abort the transaction itself. The default value for this property is 900,000 milliseconds or 15 minutes.

5. Other consume-transform-produce Loops

What we've just seen is a basic consume-transform-produce loop which reads and writes to the same Kafka cluster.

Conversely, applications that must read and write to different Kafka clusters must use the older commitSync and commitAsync API. Typically, applications will store consumer offsets into their external state storage to maintain transactionality.

6. Conclusion

Pour les applications critiques pour les données, un traitement de bout en bout une seule fois est souvent impératif.

Dans ce tutoriel, nous avons vu comment nous utilisons Kafka pour faire exactement cela, en utilisant des transactions , et nous avons implémenté un exemple de comptage de mots basé sur les transactions pour illustrer le principe.

N'hésitez pas à consulter tous les exemples de code sur GitHub.