Construire un pipeline de données avec Kafka, Spark Streaming et Cassandra

1. Vue d'ensemble

Apache Kafka est une plate-forme évolutive, hautes performances et à faible latence qui permet de lire et d'écrire des flux de données comme un système de messagerie . Nous pouvons commencer avec Kafka en Java assez facilement.

Spark Streaming fait partie de la plate-forme Apache Spark qui permet un traitement évolutif, à haut débit et tolérant aux pannes des flux de données . Bien qu'écrit en Scala, Spark propose des API Java avec lesquelles travailler.

Apache Cassandra est un magasin de données NoSQL distribué et à colonnes étendues . Plus de détails sur Cassandra sont disponibles dans notre article précédent.

Dans ce didacticiel, nous les combinerons pour créer un pipeline de données hautement évolutif et tolérant aux pannes pour un flux de données en temps réel .

2. Installations

Pour commencer, nous aurons besoin de Kafka, Spark et Cassandra installés localement sur notre machine pour exécuter l'application. Nous verrons comment développer un pipeline de données en utilisant ces plates-formes au fur et à mesure.

Cependant, nous laisserons toutes les configurations par défaut, y compris les ports pour toutes les installations, ce qui aidera à faire fonctionner correctement le didacticiel.

2.1. Kafka

L'installation de Kafka sur notre machine locale est assez simple et peut être trouvée dans la documentation officielle. Nous utiliserons la version 2.1.0 de Kafka.

De plus, Kafka nécessite Apache Zookeeper pour s'exécuter, mais pour les besoins de ce didacticiel, nous allons exploiter l'instance de Zookeeper à nœud unique fournie avec Kafka.

Une fois que nous avons réussi à démarrer Zookeeper et Kafka localement en suivant le guide officiel, nous pouvons procéder à la création de notre sujet, nommé «messages»:

 $KAFKA_HOME$\bin\windows\kafka-topics.bat --create \ --zookeeper localhost:2181 \ --replication-factor 1 --partitions 1 \ --topic messages

Notez que le script ci-dessus est pour la plate-forme Windows, mais il existe également des scripts similaires pour les plates-formes de type Unix.

2.2. Étincelle

Spark utilise les bibliothèques clientes de Hadoop pour HDFS et YARN. Par conséquent, il peut être très délicat d'assembler les versions compatibles de tous ces éléments . Cependant, le téléchargement officiel de Spark est pré-emballé avec les versions populaires de Hadoop. Pour ce tutoriel, nous utiliserons le package de la version 2.3.0 «pré-construit pour Apache Hadoop 2.7 et versions ultérieures».

Une fois le bon package de Spark décompressé, les scripts disponibles peuvent être utilisés pour soumettre des candidatures. Nous verrons cela plus tard lorsque nous développerons notre application dans Spring Boot.

2.3. Cassandra

DataStax met à disposition une édition communautaire de Cassandra pour différentes plates-formes, y compris Windows. Nous pouvons le télécharger et l'installer sur notre machine locale très facilement en suivant la documentation officielle. Nous utiliserons la version 3.9.0.

Une fois que nous avons réussi à installer et démarrer Cassandra sur notre machine locale, nous pouvons procéder à la création de notre espace de clés et de notre table. Cela peut être fait en utilisant le shell CQL fourni avec notre installation:

CREATE KEYSPACE vocabulary WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }; USE vocabulary; CREATE TABLE words (word text PRIMARY KEY, count int);

Notez que nous avons créé un espace de noms appelé vocabulaire et une table à l'intérieur appelée mots avec deux colonnes, mot et nombre .

3. Dépendances

Nous pouvons intégrer les dépendances Kafka et Spark dans notre application via Maven. Nous allons extraire ces dépendances de Maven Central:

  • Noyau Spark
  • SQL Spark
  • Streaming Spark
  • Streaming Kafka Spark
  • Cassandra Spark
  • Cassandra Java Spark

Et nous pouvons les ajouter à notre pom en conséquence:

 org.apache.spark spark-core_2.11 2.3.0 provided   org.apache.spark spark-sql_2.11 2.3.0 provided   org.apache.spark spark-streaming_2.11 2.3.0 provided   org.apache.spark spark-streaming-kafka-0-10_2.11 2.3.0   com.datastax.spark spark-cassandra-connector_2.11 2.3.0   com.datastax.spark spark-cassandra-connector-java_2.11 1.5.2 

Notez que certaines de ces dépendances sont marquées comme fournies dans la portée. En effet, ils seront rendus disponibles par l'installation Spark où nous soumettrons l'application pour exécution à l'aide de spark-submit.

4. Spark Streaming - Stratégies d'intégration Kafka

À ce stade, il vaut la peine de parler brièvement des stratégies d'intégration pour Spark et Kafka.

Kafka a introduit une nouvelle API grand public entre les versions 0.8 et 0.10. Par conséquent, les packages Spark Streaming correspondants sont disponibles pour les deux versions du courtier. Il est important de choisir le bon forfait en fonction du courtier disponible et des fonctionnalités souhaitées.

4.1. Spark Streaming Kafka 0.8

La version 0.8 est l'API d'intégration stable avec des options d'utilisation de la méthode Receiver ou de l'approche directe . Nous n'entrerons pas dans les détails de ces approches que nous pouvons trouver dans la documentation officielle. Un point important à noter ici est que ce package est compatible avec les versions 0.8.2.1 ou supérieures de Kafka Broker.

4.2. Spark Streaming Kafka 0.10

Ceci est actuellement dans un état expérimental et est compatible avec les versions 0.10.0 ou supérieures de Kafka Broker uniquement. Ce package propose uniquement l'approche directe, utilisant désormais la nouvelle API grand public Kafka . Nous pouvons trouver plus de détails à ce sujet dans la documentation officielle. Surtout, il n'est pas rétrocompatible avec les anciennes versions de Kafka Broker .

Veuillez noter que pour ce tutoriel, nous utiliserons le package 0.10. La dépendance mentionnée dans la section précédente fait uniquement référence à cela.

5. Développement d'un pipeline de données

We'll create a simple application in Java using Spark which will integrate with the Kafka topic we created earlier. The application will read the messages as posted and count the frequency of words in every message. This will then be updated in the Cassandra table we created earlier.

Let's quickly visualize how the data will flow:

5.1. Getting JavaStreamingContext

Firstly, we'll begin by initializing the JavaStreamingContext which is the entry point for all Spark Streaming applications:

SparkConf sparkConf = new SparkConf(); sparkConf.setAppName("WordCountingApp"); sparkConf.set("spark.cassandra.connection.host", "127.0.0.1"); JavaStreamingContext streamingContext = new JavaStreamingContext( sparkConf, Durations.seconds(1));

5.2. Getting DStream from Kafka

Now, we can connect to the Kafka topic from the JavaStreamingContext:

Map kafkaParams = new HashMap(); kafkaParams.put("bootstrap.servers", "localhost:9092"); kafkaParams.put("key.deserializer", StringDeserializer.class); kafkaParams.put("value.deserializer", StringDeserializer.class); kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream"); kafkaParams.put("auto.offset.reset", "latest"); kafkaParams.put("enable.auto.commit", false); Collection topics = Arrays.asList("messages"); JavaInputDStream
    
      messages = KafkaUtils.createDirectStream( streamingContext, LocationStrategies.PreferConsistent(), ConsumerStrategies. Subscribe(topics, kafkaParams));
    

Please note that we've to provide deserializers for key and value here. For common data types like String, the deserializer is available by default. However, if we wish to retrieve custom data types, we'll have to provide custom deserializers.

Here, we've obtained JavaInputDStream which is an implementation of Discretized Streams or DStreams, the basic abstraction provided by Spark Streaming. Internally DStreams is nothing but a continuous series of RDDs.

5.3. Processing Obtained DStream

We'll now perform a series of operations on the JavaInputDStream to obtain word frequencies in the messages:

JavaPairDStream results = messages .mapToPair( record -> new Tuple2(record.key(), record.value()) ); JavaDStream lines = results .map( tuple2 -> tuple2._2() ); JavaDStream words = lines .flatMap( x -> Arrays.asList(x.split("\\s+")).iterator() ); JavaPairDStream wordCounts = words .mapToPair( s -> new Tuple2(s, 1) ).reduceByKey( (i1, i2) -> i1 + i2 );

5.4. Persisting Processed DStream into Cassandra

Finally, we can iterate over the processed JavaPairDStream to insert them into our Cassandra table:

wordCounts.foreachRDD( javaRdd -> { Map wordCountMap = javaRdd.collectAsMap(); for (String key : wordCountMap.keySet()) { List wordList = Arrays.asList(new Word(key, wordCountMap.get(key))); JavaRDD rdd = streamingContext.sparkContext().parallelize(wordList); javaFunctions(rdd).writerBuilder( "vocabulary", "words", mapToRow(Word.class)).saveToCassandra(); } } );

5.5. Running the Application

As this is a stream processing application, we would want to keep this running:

streamingContext.start(); streamingContext.awaitTermination();

6. Leveraging Checkpoints

In a stream processing application, it's often useful to retain state between batches of data being processed.

For example, in our previous attempt, we are only able to store the current frequency of the words. What if we want to store the cumulative frequency instead? Spark Streaming makes it possible through a concept called checkpoints.

We'll now modify the pipeline we created earlier to leverage checkpoints:

Please note that we'll be using checkpoints only for the session of data processing. This does not provide fault-tolerance. However, checkpointing can be used for fault tolerance as well.

There are a few changes we'll have to make in our application to leverage checkpoints. This includes providing the JavaStreamingContext with a checkpoint location:

streamingContext.checkpoint("./.checkpoint");

Here, we are using the local filesystem to store checkpoints. However, for robustness, this should be stored in a location like HDFS, S3 or Kafka. More on this is available in the official documentation.

Next, we'll have to fetch the checkpoint and create a cumulative count of words while processing every partition using a mapping function:

JavaMapWithStateDStream
    
      cumulativeWordCounts = wordCounts .mapWithState( StateSpec.function( (word, one, state) -> { int sum = one.orElse(0) + (state.exists() ? state.get() : 0); Tuple2 output = new Tuple2(word, sum); state.update(sum); return output; } ) );
    

Once we get the cumulative word counts, we can proceed to iterate and save them in Cassandra as before.

Please note that while data checkpointing is useful for stateful processing, it comes with a latency cost. Hence, it's necessary to use this wisely along with an optimal checkpointing interval.

7. Understanding Offsets

If we recall some of the Kafka parameters we set earlier:

kafkaParams.put("auto.offset.reset", "latest"); kafkaParams.put("enable.auto.commit", false);

These basically mean that we don't want to auto-commit for the offset and would like to pick the latest offset every time a consumer group is initialized. Consequently, our application will only be able to consume messages posted during the period it is running.

If we want to consume all messages posted irrespective of whether the application was running or not and also want to keep track of the messages already posted, we'll have to configure the offset appropriately along with saving the offset state, though this is a bit out of scope for this tutorial.

This is also a way in which Spark Streaming offers a particular level of guarantee like “exactly once”. This basically means that each message posted on Kafka topic will only be processed exactly once by Spark Streaming.

8. Deploying Application

We can deploy our application using the Spark-submit script which comes pre-packed with the Spark installation:

$SPARK_HOME$\bin\spark-submit \ --class com.baeldung.data.pipeline.WordCountingAppWithCheckpoint \ --master local[2] \target\spark-streaming-app-0.0.1-SNAPSHOT-jar-with-dependencies.jar

Veuillez noter que le fichier jar que nous créons à l'aide de Maven doit contenir les dépendances qui ne sont pas marquées comme fournies dans la portée.

Une fois que nous soumettons cette application et postons quelques messages dans la rubrique Kafka que nous avons créée précédemment, nous devrions voir le nombre de mots cumulatif affiché dans la table Cassandra que nous avons créée précédemment.

9. Conclusion

Pour résumer, dans ce didacticiel, nous avons appris à créer un pipeline de données simple à l'aide de Kafka, Spark Streaming et Cassandra. Nous avons également appris à tirer parti des points de contrôle dans Spark Streaming pour maintenir l'état entre les lots.

Comme toujours, le code des exemples est disponible à l'adresse over sur GitHub.