Construire un pipeline de données avec Flink et Kafka

1. Vue d'ensemble

Apache Flink est un framework de traitement de flux qui peut être utilisé facilement avec Java. Apache Kafka est un système de traitement de flux distribué prenant en charge une tolérance aux pannes élevée.

Dans ce didacticiel, nous allons voir comment créer un pipeline de données à l'aide de ces deux technologies.

2. Installation

Pour installer et configurer Apache Kafka, veuillez vous référer au guide officiel. Après l'installation, nous pouvons utiliser les commandes suivantes pour créer les nouveaux sujets appelés flink_input et flink_output:

 bin/kafka-topics.sh --create \ --zookeeper localhost:2181 \ --replication-factor 1 --partitions 1 \ --topic flink_output bin/kafka-topics.sh --create \ --zookeeper localhost:2181 \ --replication-factor 1 --partitions 1 \ --topic flink_input

Pour les besoins de ce didacticiel, nous utiliserons la configuration par défaut et les ports par défaut pour Apache Kafka.

3. Utilisation de Flink

Apache Flink permet une technologie de traitement de flux en temps réel. Le framework permet d'utiliser plusieurs systèmes tiers comme sources ou récepteurs de flux .

Dans Flink - il existe différents connecteurs disponibles:

  • Apache Kafka (source / récepteur)
  • Apache Cassandra (évier)
  • Amazon Kinesis Streams (source / récepteur)
  • Elasticsearch (évier)
  • Système de fichiers Hadoop (récepteur)
  • RabbitMQ (source / récepteur)
  • Apache NiFi (source / récepteur)
  • API Twitter Streaming (source)

Pour ajouter Flink à notre projet, nous devons inclure les dépendances Maven suivantes:

 org.apache.flink flink-core 1.5.0   org.apache.flink flink-connector-kafka-0.11_2.11 1.5.0 

L'ajout de ces dépendances nous permettra de consommer et de produire vers et à partir de sujets Kafka. Vous pouvez trouver la version actuelle de Flink sur Maven Central.

4. Consommateur Kafka String

Pour consommer les données de Kafka avec Flink, nous devons fournir un sujet et une adresse Kafka. Nous devrions également fournir un identifiant de groupe qui sera utilisé pour conserver les décalages afin de ne pas toujours lire toutes les données depuis le début.

Créons une méthode statique qui facilitera la création de FlinkKafkaConsumer :

public static FlinkKafkaConsumer011 createStringConsumerForTopic( String topic, String kafkaAddress, String kafkaGroup ) { Properties props = new Properties(); props.setProperty("bootstrap.servers", kafkaAddress); props.setProperty("group.id",kafkaGroup); FlinkKafkaConsumer011 consumer = new FlinkKafkaConsumer011( topic, new SimpleStringSchema(), props); return consumer; }

Cette méthode prend un sujet, kafkaAddress et kafkaGroup et crée le FlinkKafkaConsumer qui consommera les données d'un sujet donné sous forme de chaîne puisque nous avons utilisé SimpleStringSchema pour décoder les données.

Le numéro 011 dans le nom de la classe fait référence à la version Kafka.

5. Producteur de cordes Kafka

Pour produire des données à Kafka, nous devons fournir l'adresse et le sujet Kafka que nous voulons utiliser. Encore une fois, nous pouvons créer une méthode statique qui nous aidera à créer des producteurs pour différents sujets:

public static FlinkKafkaProducer011 createStringProducer( String topic, String kafkaAddress){ return new FlinkKafkaProducer011(kafkaAddress, topic, new SimpleStringSchema()); }

Cette méthode prend uniquement topic et kafkaAddress comme arguments car il n'est pas nécessaire de fournir un identifiant de groupe lorsque nous produisons un topic Kafka.

6. Traitement des flux de chaînes

Lorsque nous avons un consommateur et un producteur pleinement opérationnel, nous pouvons essayer de traiter les données de Kafka, puis enregistrer nos résultats dans Kafka. La liste complète des fonctions qui peuvent être utilisées pour le traitement de flux peut être trouvée ici.

Dans cet exemple, nous allons capitaliser les mots dans chaque entrée de Kafka, puis les réécrire dans Kafka.

Pour cela, nous devons créer une MapFunction personnalisée :

public class WordsCapitalizer implements MapFunction { @Override public String map(String s) { return s.toUpperCase(); } }

Après avoir créé la fonction, nous pouvons l'utiliser dans le traitement de flux:

public static void capitalize() { String inputTopic = "flink_input"; String outputTopic = "flink_output"; String consumerGroup = "baeldung"; String address = "localhost:9092"; StreamExecutionEnvironment environment = StreamExecutionEnvironment .getExecutionEnvironment(); FlinkKafkaConsumer011 flinkKafkaConsumer = createStringConsumerForTopic( inputTopic, address, consumerGroup); DataStream stringInputStream = environment .addSource(flinkKafkaConsumer); FlinkKafkaProducer011 flinkKafkaProducer = createStringProducer( outputTopic, address); stringInputStream .map(new WordsCapitalizer()) .addSink(flinkKafkaProducer); }

L'application lira les données de la rubrique flink_input , effectuera des opérations sur le flux, puis enregistrera les résultats dans la rubrique flink_output de Kafka.

Nous avons vu comment gérer les chaînes en utilisant Flink et Kafka. Mais il est souvent nécessaire d'effectuer des opérations sur des objets personnalisés. Nous verrons comment procéder dans les prochains chapitres.

7. Désérialisation d'objets personnalisés

La classe suivante représente un message simple avec des informations sur l'expéditeur et le destinataire:

@JsonSerialize public class InputMessage { String sender; String recipient; LocalDateTime sentAt; String message; }

Auparavant, nous utilisions SimpleStringSchema pour désérialiser les messages de Kafka, mais maintenant nous voulons désérialiser les données directement en objets personnalisés .

Pour ce faire, nous avons besoin d'un DeserializationSchema personnalisé :

public class InputMessageDeserializationSchema implements DeserializationSchema { static ObjectMapper objectMapper = new ObjectMapper() .registerModule(new JavaTimeModule()); @Override public InputMessage deserialize(byte[] bytes) throws IOException { return objectMapper.readValue(bytes, InputMessage.class); } @Override public boolean isEndOfStream(InputMessage inputMessage) { return false; } @Override public TypeInformation getProducedType() { return TypeInformation.of(InputMessage.class); } }

Nous supposons ici que les messages sont conservés au format JSON dans Kafka.

Puisque nous avons un champ de type LocalDateTime , nous devons spécifier le JavaTimeModule, qui se charge de mapper les objets LocalDateTime vers JSON.

Les schémas Flink ne peuvent pas avoir de champs non sérialisables car tous les opérateurs (comme les schémas ou les fonctions) sont sérialisés au début du travail.

Il existe des problèmes similaires dans Apache Spark. L'un des correctifs connus pour ce problème consiste à initialiser les champs en tant que statiques , comme nous l'avons fait avec ObjectMapper ci-dessus. Ce n'est pas la solution la plus jolie, mais elle est relativement simple et fait le travail.

La méthode isEndOfStream peut être utilisée pour le cas particulier où le flux doit être traité uniquement jusqu'à ce que certaines données spécifiques soient reçues. Mais ce n'est pas nécessaire dans notre cas.

8. Sérialisation d'objets personnalisés

Maintenant, supposons que nous voulons que notre système ait la possibilité de créer une sauvegarde des messages. Nous voulons que le processus soit automatique, et chaque sauvegarde doit être composée de messages envoyés pendant une journée entière.

En outre, un message de sauvegarde doit avoir un identifiant unique attribué.

Pour cela, nous pouvons créer la classe suivante:

public class Backup { @JsonProperty("inputMessages") List inputMessages; @JsonProperty("backupTimestamp") LocalDateTime backupTimestamp; @JsonProperty("uuid") UUID uuid; public Backup(List inputMessages, LocalDateTime backupTimestamp) { this.inputMessages = inputMessages; this.backupTimestamp = backupTimestamp; this.uuid = UUID.randomUUID(); } }

Veuillez noter que le mécanisme de génération d'UUID n'est pas parfait, car il autorise les doublons. Cependant, cela suffit pour la portée de cet exemple.

Nous voulons enregistrer notre objet Backup au format JSON dans Kafka, nous devons donc créer notre SerializationSchema :

public class BackupSerializationSchema implements SerializationSchema { ObjectMapper objectMapper; Logger logger = LoggerFactory.getLogger(BackupSerializationSchema.class); @Override public byte[] serialize(Backup backupMessage) { if(objectMapper == null) { objectMapper = new ObjectMapper() .registerModule(new JavaTimeModule()); } try { return objectMapper.writeValueAsString(backupMessage).getBytes(); } catch (com.fasterxml.jackson.core.JsonProcessingException e) { logger.error("Failed to parse JSON", e); } return new byte[0]; } }

9. Timestamping Messages

Since we want to create a backup for all messages of each day, messages need a timestamp.

Flink provides the three different time characteristics EventTime, ProcessingTime, and IngestionTime.

In our case, we need to use the time at which the message has been sent, so we'll use EventTime.

To use EventTimewe need a TimestampAssigner which will extract timestamps from our input data:

public class InputMessageTimestampAssigner implements AssignerWithPunctuatedWatermarks { @Override public long extractTimestamp(InputMessage element, long previousElementTimestamp) { ZoneId zoneId = ZoneId.systemDefault(); return element.getSentAt().atZone(zoneId).toEpochSecond() * 1000; } @Nullable @Override public Watermark checkAndGetNextWatermark(InputMessage lastElement, long extractedTimestamp) { return new Watermark(extractedTimestamp - 1500); } }

We need to transform our LocalDateTime to EpochSecond as this is the format expected by Flink. After assigning timestamps, all time-based operations will use time from sentAt field to operate.

Since Flink expects timestamps to be in milliseconds and toEpochSecond() returns time in seconds we needed to multiply it by 1000, so Flink will create windows correctly.

Flink defines the concept of a Watermark. Watermarks are useful in case of data that don't arrive in the order they were sent. A watermark defines the maximum lateness that is allowed for elements to be processed.

Elements that have timestamps lower than the watermark won't be processed at all.

10. Creating Time Windows

To assure that our backup gathers only messages sent during one day, we can use the timeWindowAll method on the stream, which will split messages into windows.

Cependant, nous aurons toujours besoin d'agréger les messages de chaque fenêtre et de les renvoyer en tant que sauvegarde .

Pour ce faire, nous aurons besoin d'une fonction AggregateFunction personnalisée :

public class BackupAggregator implements AggregateFunction
    
      { @Override public List createAccumulator() { return new ArrayList(); } @Override public List add( InputMessage inputMessage, List inputMessages) { inputMessages.add(inputMessage); return inputMessages; } @Override public Backup getResult(List inputMessages) { return new Backup(inputMessages, LocalDateTime.now()); } @Override public List merge(List inputMessages, List acc1) { inputMessages.addAll(acc1); return inputMessages; } }
    

11. Agrégation des sauvegardes

Après avoir attribué des horodatages appropriés et implémenté notre AggregateFunction , nous pouvons enfin prendre notre entrée Kafka et la traiter:

public static void createBackup () throws Exception { String inputTopic = "flink_input"; String outputTopic = "flink_output"; String consumerGroup = "baeldung"; String kafkaAddress = "192.168.99.100:9092"; StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); FlinkKafkaConsumer011 flinkKafkaConsumer = createInputMessageConsumer(inputTopic, kafkaAddress, consumerGroup); flinkKafkaConsumer.setStartFromEarliest(); flinkKafkaConsumer.assignTimestampsAndWatermarks( new InputMessageTimestampAssigner()); FlinkKafkaProducer011 flinkKafkaProducer = createBackupProducer(outputTopic, kafkaAddress); DataStream inputMessagesStream = environment.addSource(flinkKafkaConsumer); inputMessagesStream .timeWindowAll(Time.hours(24)) .aggregate(new BackupAggregator()) .addSink(flinkKafkaProducer); environment.execute(); }

12. Conclusion

Dans cet article, nous avons présenté comment créer un pipeline de données simple avec Apache Flink et Apache Kafka.

Comme toujours, le code peut être trouvé sur Github.