Introduction à Apache Kafka avec Spring

Haut de persistance

Je viens d'annoncer le nouveau cours Learn Spring , axé sur les principes de base de Spring 5 et Spring Boot 2:

>> VOIR LE COURS

1. Vue d'ensemble

Apache Kafka est un système de traitement de flux distribué et tolérant aux pannes.

Dans cet article, nous aborderons la prise en charge de Spring pour Kafka et le niveau d'abstractions qu'il fournit sur les API client Kafka Java natives.

Spring Kafka apporte le modèle de programmation de modèle Spring simple et typique avec un KafkaTemplate et des POJO pilotés par message via l' annotation @KafkaListener .

2. Installation et configuration

Pour télécharger et installer Kafka, veuillez vous référer au guide officiel ici.

Nous devons également ajouter la dépendance spring-kafka à notre pom.xml:

 org.springframework.kafka spring-kafka 2.3.7.RELEASE 

La dernière version de cet artefact peut être trouvée ici.

Notre exemple d'application sera une application Spring Boot.

Cet article suppose que le serveur est démarré à l'aide de la configuration par défaut et qu'aucun port de serveur n'est modifié.

3. Configuration des sujets

Auparavant, nous exécutions des outils de ligne de commande pour créer des rubriques dans Kafka telles que:

$ bin/kafka-topics.sh --create \ --zookeeper localhost:2181 \ --replication-factor 1 --partitions 1 \ --topic mytopic

Mais avec l'introduction d' AdminClient dans Kafka, nous pouvons désormais créer des rubriques par programmation.

Nous devons ajouter le bean KafkaAdmin Spring, qui ajoutera automatiquement des rubriques pour tous les beans de type NewTopic:

@Configuration public class KafkaTopicConfig { @Value(value = "${kafka.bootstrapAddress}") private String bootstrapAddress; @Bean public KafkaAdmin kafkaAdmin() { Map configs = new HashMap(); configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); return new KafkaAdmin(configs); } @Bean public NewTopic topic1() { return new NewTopic("baeldung", 1, (short) 1); } }

4. Produire des messages

Pour créer des messages, nous devons d'abord configurer une ProducerFactory qui définit la stratégie de création d' instances de Kafka Producer .

Ensuite, nous avons besoin d'un KafkaTemplate qui encapsule une instance Producer et fournit des méthodes pratiques pour envoyer des messages aux sujets Kafka.

Les instances Producer sont thread-safe et par conséquent, l'utilisation d'une seule instance dans un contexte d'application donnera de meilleures performances. Par conséquent, les instances de KakfaTemplate sont également thread-safe et l'utilisation d'une instance est recommandée.

4.1. Configuration du producteur

@Configuration public class KafkaProducerConfig { @Bean public ProducerFactory producerFactory() { Map configProps = new HashMap(); configProps.put( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); configProps.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return new DefaultKafkaProducerFactory(configProps); } @Bean public KafkaTemplate kafkaTemplate() { return new KafkaTemplate(producerFactory()); } }

4.2. Publication de messages

Nous pouvons envoyer des messages en utilisant la classe KafkaTemplate :

@Autowired private KafkaTemplate kafkaTemplate; public void sendMessage(String msg) { kafkaTemplate.send(topicName, msg); }

L' API d' envoi renvoie un objet ListenableFuture . Si nous voulons bloquer le thread d'envoi et obtenir le résultat du message envoyé, nous pouvons appeler l' API get de l' objet ListenableFuture . Le thread attendra le résultat, mais cela ralentira le producteur.

Kafka est une plateforme de traitement de flux rapide. Il est donc préférable de gérer les résultats de manière asynchrone afin que les messages suivants n'attendent pas le résultat du message précédent. Nous pouvons le faire via un rappel:

public void sendMessage(String message) { ListenableFuture
    
      future = kafkaTemplate.send(topicName, message); future.addCallback(new ListenableFutureCallback
     
      () { @Override public void onSuccess(SendResult result) { System.out.println("Sent message=[" + message + "] with offset=[" + result.getRecordMetadata().offset() + "]"); } @Override public void onFailure(Throwable ex) { System.out.println("Unable to send message=[" + message + "] due to : " + ex.getMessage()); } }); }
     
    

5. Consommer des messages

5.1. Configuration du consommateur

Pour consommer des messages, nous devons configurer une ConsumerFactory et une KafkaListenerContainerFactory . Une fois que ces beans sont disponibles dans l'usine de haricots Spring, les consommateurs basés sur POJO peuvent être configurés à l'aide de l' annotation @KafkaListener .

L' annotation @EnableKafka est requise sur la classe de configuration pour activer la détection de l' annotation @KafkaListener sur les beans gérés par spring:

@EnableKafka @Configuration public class KafkaConsumerConfig { @Bean public ConsumerFactory consumerFactory() { Map props = new HashMap(); props.put( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); props.put( ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put( ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put( ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return new DefaultKafkaConsumerFactory(props); } @Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory()); return factory; } }

5.2. Consommer des messages

@KafkaListener(topics = "topicName", groupId = "foo") public void listenGroupFoo(String message) { System.out.println("Received Message in group foo: " + message); }

Plusieurs écouteurs peuvent être implémentés pour un sujet , chacun avec un ID de groupe différent. De plus, un consommateur peut écouter les messages de différents sujets:

@KafkaListener(topics = "topic1, topic2", groupId = "foo")

Spring prend également en charge la récupération d'un ou plusieurs en-têtes de message à l'aide de l' annotation @Header dans l'écouteur:

@KafkaListener(topics = "topicName") public void listenWithHeaders( @Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) { System.out.println( "Received Message: " + message" + "from partition: " + partition); }

5.3. Consuming Messages from a Specific Partition

As you may have noticed, we had created the topic baeldung with only one partition. However, for a topic with multiple partitions, a @KafkaListener can explicitly subscribe to a particular partition of a topic with an initial offset:

@KafkaListener( topicPartitions = @TopicPartition(topic = "topicName", partitionOffsets = { @PartitionOffset(partition = "0", initialOffset = "0"), @PartitionOffset(partition = "3", initialOffset = "0")}), containerFactory = "partitionsKafkaListenerContainerFactory") public void listenToPartition( @Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) { System.out.println( "Received Message: " + message" + "from partition: " + partition); }

Since the initialOffset has been sent to 0 in this listener, all the previously consumed messages from partitions 0 and three will be re-consumed every time this listener is initialized. If setting the offset is not required, we can use the partitions property of @TopicPartition annotation to set only the partitions without the offset:

@KafkaListener(topicPartitions = @TopicPartition(topic = "topicName", partitions = { "0", "1" }))

5.4. Adding Message Filter for Listeners

Listeners can be configured to consume specific types of messages by adding a custom filter. This can be done by setting a RecordFilterStrategy to the KafkaListenerContainerFactory:

@Bean public ConcurrentKafkaListenerContainerFactory filterKafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory()); factory.setRecordFilterStrategy( record -> record.value().contains("World")); return factory; }

A listener can then be configured to use this container factory:

@KafkaListener( topics = "topicName", containerFactory = "filterKafkaListenerContainerFactory") public void listenWithFilter(String message) { System.out.println("Received Message in filtered listener: " + message); }

In this listener, all the messages matching the filter will be discarded.

6. Custom Message Converters

So far we have only covered sending and receiving Strings as messages. However, we can also send and receive custom Java objects. This requires configuring appropriate serializer in ProducerFactory and deserializer in ConsumerFactory.

Let's look at a simple bean class, which we will send as messages:

public class Greeting { private String msg; private String name; // standard getters, setters and constructor }

6.1. Producing Custom Messages

In this example, we will use JsonSerializer. Let's look at the code for ProducerFactory and KafkaTemplate:

@Bean public ProducerFactory greetingProducerFactory() { // ... configProps.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return new DefaultKafkaProducerFactory(configProps); } @Bean public KafkaTemplate greetingKafkaTemplate() { return new KafkaTemplate(greetingProducerFactory()); }

This new KafkaTemplate can be used to send the Greeting message:

kafkaTemplate.send(topicName, new Greeting("Hello", "World"));

6.2. Consuming Custom Messages

Similarly, let's modify the ConsumerFactory and KafkaListenerContainerFactory to deserialize the Greeting message correctly:

@Bean public ConsumerFactory greetingConsumerFactory() { // ... return new DefaultKafkaConsumerFactory( props, new StringDeserializer(), new JsonDeserializer(Greeting.class)); } @Bean public ConcurrentKafkaListenerContainerFactory greetingKafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(greetingConsumerFactory()); return factory; }

The spring-kafka JSON serializer and deserializer uses the Jackson library which is also an optional maven dependency for the spring-kafka project. So let's add it to our pom.xml:

 com.fasterxml.jackson.core jackson-databind 2.9.7 

Au lieu d'utiliser la dernière version de Jackson, il est recommandé d'utiliser la version qui est ajoutée au pom.xml de spring-kafka.

Enfin, nous devons écrire un auditeur pour consommer les messages d' accueil :

@KafkaListener( topics = "topicName", containerFactory = "greetingKafkaListenerContainerFactory") public void greetingListener(Greeting greeting) { // process greeting message }

7. Conclusion

Dans cet article, nous avons couvert les bases de la prise en charge de Spring pour Apache Kafka. Nous avons examiné brièvement les classes utilisées pour envoyer et recevoir des messages.

Le code source complet de cet article est disponible à l'adresse over sur GitHub. Avant d'exécuter le code, assurez-vous que le serveur Kafka est en cours d'exécution et que les rubriques sont créées manuellement.

Fond de persistance

Je viens d'annoncer le nouveau cours Learn Spring , axé sur les principes de base de Spring 5 et Spring Boot 2:

>> VOIR LE COURS