Introduction à Apache Flink avec Java

1. Vue d'ensemble

Apache Flink est un framework de traitement Big Data qui permet aux programmeurs de traiter la grande quantité de données de manière très efficace et évolutive.

Dans cet article, nous présenterons certains des principaux concepts d'API et des transformations de données standard disponibles dans l' API Java Apache Flink . Le style fluide de cette API facilite le travail avec la construction centrale de Flink - la collection distribuée.

Tout d'abord, nous examinerons les transformations de l'API DataSet de Flink et les utiliserons pour implémenter un programme de comptage de mots. Ensuite, nous examinerons brièvement l' API DataStream de Flink , qui vous permet de traiter des flux d'événements en temps réel.

2. Dépendance de Maven

Pour commencer, nous devrons ajouter des dépendances Maven aux bibliothèques flink-java et flink-test-utils :

 org.apache.flink flink-java 1.2.0   org.apache.flink flink-test-utils_2.10 1.2.0 test 

3. Concepts principaux de l'API

Lorsque nous travaillons avec Flink, nous devons connaître quelques éléments liés à son API:

  • Chaque programme Flink effectue des transformations sur des collections de données distribuées. Une variété de fonctions pour transformer les données sont fournies, y compris le filtrage, le mappage, la jonction, le regroupement et l'agrégation
  • Une opération de puits dans Flink déclenche l'exécution d'un flux pour produire le résultat souhaité du programme , tel que l'enregistrement du résultat dans le système de fichiers ou son impression sur la sortie standard
  • Les transformations Flink sont paresseuses, ce qui signifie qu'elles ne sont pas exécutées tant qu'une opération de puits n'est pas appelée
  • L'API Apache Flink prend en charge deux modes d'opérations: par lots et en temps réel. Si vous avez affaire à une source de données limitée pouvant être traitée en mode batch, vous utiliserez l' API DataSet . Si vous souhaitez traiter des flux de données illimités en temps réel, vous devez utiliser l' API DataStream

4. Transformations de l'API DataSet

Le point d'entrée du programme Flink est une instance de la classe ExecutionEnvironment - cela définit le contexte dans lequel un programme est exécuté.

Créons un ExecutionEnvironment pour démarrer notre traitement:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

Notez que lorsque vous lancez l'application sur la machine locale, elle effectuera le traitement sur la JVM locale. Si vous souhaitez démarrer le traitement sur un cluster de machines, vous devez installer Apache Flink sur ces machines et configurer ExecutionEnvironment en conséquence.

4.1. Créer un DataSet

Pour commencer à effectuer des transformations de données, nous devons fournir à notre programme les données.

Créons une instance de la classe DataSet à l' aide de notre ExecutionEnvironement :

DataSet amounts = env.fromElements(1, 29, 40, 50);

Vous pouvez créer un DataSet à partir de plusieurs sources, telles qu'Apache Kafka, un CSV, un fichier ou pratiquement toute autre source de données.

4.2. Filtrer et réduire

Une fois que vous avez créé une instance de la classe DataSet , vous pouvez lui appliquer des transformations.

Supposons que vous souhaitiez filtrer les nombres supérieurs à un certain seuil et les additionner ensuite tous . Vous pouvez utiliser les transformations filter () et reduction () pour y parvenir:

int threshold = 30; List collect = amounts .filter(a -> a > threshold) .reduce((integer, t1) -> integer + t1) .collect(); assertThat(collect.get(0)).isEqualTo(90); 

Notez que la méthode collect () est une opération de puits qui déclenche les transformations de données réelles.

4.3. Carte

Supposons que vous ayez un DataSet d' objets Person :

private static class Person { private int age; private String name; // standard constructors/getters/setters }

Ensuite, créons un DataSet de ces objets:

DataSet personDataSource = env.fromCollection( Arrays.asList( new Person(23, "Tom"), new Person(75, "Michael")));

Supposons que vous souhaitiez extraire uniquement le champ age de chaque objet de la collection. Vous pouvez utiliser la transformation map () pour obtenir uniquement un champ spécifique de la classe Person :

List ages = personDataSource .map(p -> p.age) .collect(); assertThat(ages).hasSize(2); assertThat(ages).contains(23, 75);

4.4. Joindre

Lorsque vous avez deux ensembles de données, vous souhaiterez peut-être les joindre sur un champ d' identifiant . Pour cela, vous pouvez utiliser la transformation join () .

Créons des collections de transactions et d'adresses d'un utilisateur:

Tuple3 address = new Tuple3(1, "5th Avenue", "London"); DataSet
    
      addresses = env.fromElements(address); Tuple2 firstTransaction = new Tuple2(1, "Transaction_1"); DataSet
     
       transactions = env.fromElements(firstTransaction, new Tuple2(12, "Transaction_2")); 
     
    

Le premier champ des deux tuples est de type Integer , et c'est un champ id sur lequel nous voulons joindre les deux ensembles de données.

Pour exécuter la logique de jonction réelle, nous devons implémenter une interface KeySelector pour l'adresse et la transaction:

private static class IdKeySelectorTransaction implements KeySelector
    
      { @Override public Integer getKey(Tuple2 value) { return value.f0; } } private static class IdKeySelectorAddress implements KeySelector
     
       { @Override public Integer getKey(Tuple3 value) { return value.f0; } }
     
    

Chaque sélecteur renvoie uniquement le champ sur lequel la jointure doit être effectuée.

Malheureusement, il n'est pas possible d'utiliser des expressions lambda ici car Flink a besoin d'informations de type générique.

Ensuite, implémentons la logique de fusion à l'aide de ces sélecteurs:

List
    
     > joined = transactions.join(addresses) .where(new IdKeySelectorTransaction()) .equalTo(new IdKeySelectorAddress()) .collect(); assertThat(joined).hasSize(1); assertThat(joined).contains(new Tuple2(firstTransaction, address)); 
    

4.5. Trier

Disons que vous avez la collection suivante de Tuple2:

Tuple2 secondPerson = new Tuple2(4, "Tom"); Tuple2 thirdPerson = new Tuple2(5, "Scott"); Tuple2 fourthPerson = new Tuple2(200, "Michael"); Tuple2 firstPerson = new Tuple2(1, "Jack"); DataSet
    
      transactions = env.fromElements( fourthPerson, secondPerson, thirdPerson, firstPerson); 
    

Si vous souhaitez trier cette collection par le premier champ du tuple, vous pouvez utiliser la transformation sortPartitions () :

List
    
      sorted = transactions .sortPartition(new IdKeySelectorTransaction(), Order.ASCENDING) .collect(); assertThat(sorted) .containsExactly(firstPerson, secondPerson, thirdPerson, fourthPerson);
    

5. Nombre de mots

Le problème du nombre de mots est un problème couramment utilisé pour présenter les capacités des cadres de traitement Big Data. La solution de base consiste à compter les occurrences de mots dans une entrée de texte. Utilisons Flink pour implémenter une solution à ce problème.

Comme première étape de notre solution, nous créons une classe LineSplitter qui divise notre entrée en jetons (mots), collectant pour chaque jeton un Tuple2 de paires clé-valeur. Dans chacun de ces tuples, la clé est un mot trouvé dans le texte et la valeur est l'entier un (1).

This class implements the FlatMapFunction interface that takes String as an input and produces a Tuple2:

public class LineSplitter implements FlatMapFunction
    
      { @Override public void flatMap(String value, Collector
     
       out) { Stream.of(value.toLowerCase().split("\\W+")) .filter(t -> t.length() > 0) .forEach(token -> out.collect(new Tuple2(token, 1))); } }
     
    

We call the collect() method on the Collector class to push data forward in the processing pipeline.

Our next and final step is to group the tuples by their first elements (words) and then perform a sum aggregate on the second elements to produce a count of the word occurrences:

public static DataSet
    
      startWordCount( ExecutionEnvironment env, List lines) throws Exception { DataSet text = env.fromCollection(lines); return text.flatMap(new LineSplitter()) .groupBy(0) .aggregate(Aggregations.SUM, 1); }
    

We are using three types of the Flink transformations: flatMap(), groupBy(), and aggregate().

Let's write a test to assert that the word count implementation is working as expected:

List lines = Arrays.asList( "This is a first sentence", "This is a second sentence with a one word"); DataSet
    
      result = WordCount.startWordCount(env, lines); List
     
       collect = result.collect(); assertThat(collect).containsExactlyInAnyOrder( new Tuple2("a", 3), new Tuple2("sentence", 2), new Tuple2("word", 1), new Tuple2("is", 2), new Tuple2("this", 2), new Tuple2("second", 1), new Tuple2("first", 1), new Tuple2("with", 1), new Tuple2("one", 1));
     
    

6. DataStream API

6.1. Creating a DataStream

Apache Flink also supports the processing of streams of events through its DataStream API. If we want to start consuming events, we first need to use the StreamExecutionEnvironment class:

StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

Next, we can create a stream of events using the executionEnvironment from a variety of sources. It could be some message bus like Apache Kafka, but in this example, we will simply create a source from a couple of string elements:

DataStream dataStream = executionEnvironment.fromElements( "This is a first sentence", "This is a second sentence with a one word");

We can apply transformations to every element of the DataStream like in the normal DataSet class:

SingleOutputStreamOperator upperCase = text.map(String::toUpperCase);

To trigger the execution, we need to invoke a sink operation such as print() that will just print the result of transformations to the standard output, following with the execute() method on the StreamExecutionEnvironment class:

upperCase.print(); env.execute();

It will produce the following output:

1> THIS IS A FIRST SENTENCE 2> THIS IS A SECOND SENTENCE WITH A ONE WORD

6.2. Windowing of Events

When processing a stream of events in real time, you may sometimes need to group events together and apply some computation on a window of those events.

Suppose we have a stream of events, where each event is a pair consisting of the event number and the timestamp when the event was sent to our system, and that we can tolerate events that are out-of-order but only if they are no more than twenty seconds late.

For this example, let's first create a stream simulating two events that are several minutes apart and define a timestamp extractor that specifies our lateness threshold:

SingleOutputStreamOperator
    
      windowed = env.fromElements( new Tuple2(16, ZonedDateTime.now().plusMinutes(25).toInstant().getEpochSecond()), new Tuple2(15, ZonedDateTime.now().plusMinutes(2).toInstant().getEpochSecond())) .assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor 
     
      (Time.seconds(20)) { @Override public long extractTimestamp(Tuple2 element) { return element.f1 * 1000; } });
     
    

Next, let's define a window operation to group our events into five-second windows and apply a transformation on those events:

SingleOutputStreamOperator
    
      reduced = windowed .windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) .maxBy(0, true); reduced.print();
    

It will get the last element of every five-second window, so it prints out:

1> (15,1491221519)

Note that we do not see the second event because it arrived later than the specified lateness threshold.

7. Conclusion

In this article, we introduced the Apache Flink framework and looked at some of the transformations supplied with its API.

We implemented a word count program using Flink's fluent and functional DataSet API. Then we looked at the DataStream API and implemented a simple real-time transformation on a stream of events.

L'implémentation de tous ces exemples et extraits de code peut être trouvée sur GitHub - il s'agit d'un projet Maven, il devrait donc être facile à importer et à exécuter tel quel.