Test de Kafka et Spring Boot

1. Vue d'ensemble

Apache Kafka est un système de traitement de flux puissant, distribué et tolérant aux pannes. Dans un tutoriel précédent, nous avons appris à travailler avec Spring et Kafka.

Dans ce didacticiel, nous allons construire sur le précédent et apprendre à écrire des tests d'intégration fiables et autonomes qui ne reposent pas sur un serveur Kafka externe en cours d'exécution .

Tout d'abord, nous allons commencer par regarder comment utiliser et configurer une instance intégrée de Kafka. Ensuite, nous verrons comment nous pouvons utiliser le framework Testcontainers populaire de nos tests.

2. Dépendances

Bien sûr, nous devrons ajouter la dépendance standard spring-kafka à notre pom.xml :

 org.springframework.kafka spring-kafka 2.6.3.RELEASE 

Ensuite, nous aurons besoin de deux dépendances supplémentaires spécifiquement pour nos tests . Tout d'abord, nous allons ajouter l' artefact spring-kafka-test :

 org.springframework.kafka spring-kafka-test 2.6.3.RELEASE test 

Et enfin, nous ajouterons la dépendance Testcontainers Kafka, qui est également disponible sur Maven Central:

 org.testcontainers kafka 1.15.0 test 

Maintenant que toutes les dépendances nécessaires sont configurées, nous pouvons écrire une simple application Spring Boot à l'aide de Kafka.

3. Une application producteur-consommateur Kafka simple

Tout au long de ce didacticiel, nos tests se concentreront sur une simple application Spring Boot Kafka producteur-consommateur.

Commençons par définir notre point d'entrée d'application:

@SpringBootApplication @EnableAutoConfiguration public class KafkaProducerConsumerApplication { public static void main(String[] args) { SpringApplication.run(KafkaProducerConsumerApplication.class, args); } }

Comme nous pouvons le voir, il s'agit d'une application Spring Boot standard. Dans la mesure du possible, nous voulons utiliser les valeurs de configuration par défaut . Dans cet esprit, nous utilisons l' annotation @EnableAutoConfiguration pour configurer automatiquement notre application.

3.1. Configuration du producteur

Ensuite, considérons un bean producteur que nous utiliserons pour envoyer des messages à un sujet Kafka donné:

@Component public class KafkaProducer { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class); @Autowired private KafkaTemplate kafkaTemplate; public void send(String topic, String payload) { LOGGER.info("sending payload="{}" to topic="{}"", payload, topic); kafkaTemplate.send(topic, payload); } }

Notre bean KafkaProducer défini ci-dessus est simplement un wrapper autour de la classe KafkaTemplate . Cette classe fournit des opérations thread-safe de haut niveau, telles que l'envoi de données à la rubrique fournie, ce que nous faisons exactement dans notre méthode d' envoi .

3.2. Configuration du consommateur

De même, nous allons maintenant définir un bean consommateur simple qui écoutera un sujet Kafka et recevra des messages:

@Component public class KafkaConsumer { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class); private CountDownLatch latch = new CountDownLatch(1); private String payload = null; @KafkaListener(topics = "${test.topic}") public void receive(ConsumerRecord consumerRecord) { LOGGER.info("received payload="{}"", consumerRecord.toString()); setPayload(consumerRecord.toString()); latch.countDown(); } public CountDownLatch getLatch() { return latch; } public String getPayload() { return payload; } }

Notre consommateur simple utilise l' annotation @KafkaListener sur la méthode de réception pour écouter les messages sur un sujet donné. Nous verrons plus tard comment nous configurons le test.topic à partir de nos tests.

De plus, la méthode de réception stocke le contenu du message dans notre bean et décrémente le décompte de la variable de verrouillage . Cette variable est un simple champ de compteur thread-safe que nous utiliserons plus tard à partir de nos tests pour nous assurer que nous avons bien reçu un message .

Maintenant que notre application Kafka simple utilisant Spring Boot est implémentée, voyons comment nous pouvons écrire des tests d'intégration.

4. Un mot sur les tests

En général, lors de l'écriture de tests d'intégration propres, nous ne devrions pas dépendre de services externes que nous ne pourrions pas contrôler ou qui pourraient soudainement cesser de fonctionner . Cela pourrait avoir des effets néfastes sur les résultats de nos tests.

De même, si nous dépendons d'un service externe, dans ce cas, un courtier Kafka en cours d'exécution, nous ne serons probablement pas en mesure de le configurer, de le contrôler et de le démonter comme nous le souhaitons dans nos tests.

4.1. Propriétés de l'application

Nous allons utiliser un ensemble très léger de propriétés de configuration d'application à partir de nos tests. Nous définirons ces propriétés dans notre fichier src / test / resources / application.yml :

spring: kafka: consumer: auto-offset-reset: earliest group-id: baeldung test: topic: embedded-test-topic

Il s'agit de l'ensemble minimal de propriétés dont nous avons besoin lorsque vous travaillez avec une instance intégrée de Kafka ou un courtier local.

La plupart d'entre eux sont explicites, mais celui que nous devrions souligner d'une importance particulière est la réinitialisation automatique de l'offset de la propriété du consommateur : le plus tôt . Cette propriété garantit que notre groupe de consommateurs obtient les messages que nous envoyons, car le conteneur peut démarrer une fois les envois terminés.

De plus, nous configurons une propriété de rubrique avec la valeur embedded-test-topic , qui est la rubrique que nous utiliserons à partir de nos tests.

5. Test à l'aide de Kafka intégré

Dans cette section, nous verrons comment utiliser une instance Kafka en mémoire pour exécuter nos tests. Ceci est également connu sous le nom de Kafka intégré.

La dépendance spring-kafka-test que nous avons ajoutée précédemment contient des utilitaires utiles pour vous aider à tester notre application. Plus particulièrement, il contient la classe EmbeddedKafkaBroker .

Dans cet esprit, allons-y et écrivons notre premier test d'intégration:

@SpringBootTest @DirtiesContext @EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" }) class EmbeddedKafkaIntegrationTest { @Autowired private KafkaConsumer consumer; @Autowired private KafkaProducer producer; @Value("${test.topic}") private String topic; @Test public void givenEmbeddedKafkaBroker_whenSendingtoSimpleProducer_thenMessageReceived() throws Exception { producer.send(topic, "Sending with own simple KafkaProducer"); consumer.getLatch().await(10000, TimeUnit.MILLISECONDS); assertThat(consumer.getLatch().getCount(), equalTo(0L)); assertThat(consumer.getPayload(), containsString("embedded-test-topic")); } }

Passons en revue les éléments clés de notre test. Tout d'abord, nous commençons par décorer notre classe de test avec deux annotations Spring assez standard:

  • L' annotation @SpringBootTest garantira que notre test amorce le contexte de l'application Spring
  • Nous utilisons également l' annotation @DirtiesContext , qui s'assurera que ce contexte est nettoyé et réinitialisé entre les différents tests

Here comes the crucial part, we use the @EmbeddedKafka annotation to inject an instance of an EmbeddedKafkaBroker into our tests. Moreover, there are several properties available we can use to configure the embedded Kafka node:

  • partitions – this is the number of partitions used per topic. To keep things nice and simple, we only want one to be used from our tests
  • brokerProperties – additional properties for the Kafka broker. Again we keep things simple and specify a plain text listener and a port number

Next, we auto-wire our consumer and producer classes and configure a topic to use the value from our application.properties.

For the final piece of the jigsaw, we simply send a message to our test topic and verify that the message has been received and contains the name of our test topic.

When we run our test, we'll see amongst the verbose Spring output:

... 12:45:35.099 [main] INFO c.b.kafka.embedded.KafkaProducer - sending payload="Sending with our own simple KafkaProducer" to topic="embedded-test-topic" ... 12:45:35.103 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.b.kafka.embedded.KafkaConsumer - received payload= 'ConsumerRecord(topic = embedded-test-topic, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1605267935099, serialized key size = -1, serialized value size = 41, headers = RecordHeaders(headers = [], isReadOnly = false),  key = null, value = Sending with our own simple KafkaProducer)' 

This confirms that our test is working properly. Awesome! We now have a way to write self-contained, independent integration tests using an in-memory Kafka broker.

6. Testing Kafka With TestContainers

Sometimes we might see small differences between a real external service vs. an embedded in-memory instance of a service that has been specifically provided for testing purposes. Although unlikely, it could also be that the port used from our test might be occupied, causing a failure.

With that in mind, in this section, we'll see a variation on our previous approach to testing using the Testcontainers framework. We'll see how to instantiate and manage an external Apache Kafka broker hosted inside a Docker container from our integration test.

Let's define another integration test which will be quite similar to the one we saw in the previous section:

@RunWith(SpringRunner.class) @Import(com.baeldung.kafka.testcontainers.KafkaTestContainersLiveTest.KafkaTestContainersConfiguration.class) @SpringBootTest(classes = KafkaProducerConsumerApplication.class) @DirtiesContext public class KafkaTestContainersLiveTest { @ClassRule public static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3")); @Autowired private KafkaConsumer consumer; @Autowired private KafkaProducer producer; @Value("${test.topic}") private String topic; @Test public void givenKafkaDockerContainer_whenSendingtoSimpleProducer_thenMessageReceived() throws Exception { producer.send(topic, "Sending with own controller"); consumer.getLatch().await(10000, TimeUnit.MILLISECONDS); assertThat(consumer.getLatch().getCount(), equalTo(0L)); assertThat(consumer.getPayload(), containsString("embedded-test-topic")); } }

Let's take a look at the differences this time around. We're declaring the kafka field, which is a standard JUnit @ClassRule. This field is an instance of the KafkaContainer class that will prepare and manage the lifecycle of our container running Kafka.

To avoid port clashes, Testcontainers allocates a port number dynamically when our docker container starts. For this reason, we provide a custom consumer and producer factory configuration using the class KafkaTestContainersConfiguration:

@Bean public Map consumerConfigs() { Map props = new HashMap(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "baeldung"); // more standard configuration return props; } @Bean public ProducerFactory producerFactory() { Map configProps = new HashMap(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); // more standard configuration return new DefaultKafkaProducerFactory(configProps); }

We then reference this configuration via the @Import annotation at the beginning of our test.

The reason for this is that we need a way to inject the server address into our application, which as previously mentioned, is generated dynamically. We achieve this by calling the getBootstrapServers() method, which will return the bootstrap server location:

bootstrap.servers = [PLAINTEXT://localhost:32789]

Now when we run our test, we should see that Testcontainers does several things:

  • Checks our local Docker setup.
  • Pulls the confluentinc/cp-kafka:5.4.3 docker image if necessary
  • Starts a new container and waits for it to be ready
  • Finally, shuts down and deletes the container after our test finishes

Again, this is confirmed by inspecting the test output:

13:33:10.396 [main] INFO ? [confluentinc/cp-kafka:5.4.3] - Creating container for image: confluentinc/cp-kafka:5.4.3 13:33:10.454 [main] INFO ? [confluentinc/cp-kafka:5.4.3] - Starting container with ID: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3 13:33:10.785 [main] INFO ? [confluentinc/cp-kafka:5.4.3] - Container confluentinc/cp-kafka:5.4.3 is starting: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3

Presto! A working integration test using a Kafka docker container.

7. Conclusion

Dans cet article, nous avons découvert quelques approches pour tester les applications Kafka avec Spring Boot. Dans la première approche, nous avons vu comment configurer et utiliser un courtier Kafka en mémoire local.

Ensuite, nous avons vu comment utiliser Testcontainers pour configurer un courtier Kafka externe s'exécutant dans un conteneur docker à partir de nos tests.

Comme toujours, le code source complet de l'article est disponible à l'adresse over sur GitHub.