Introduction à Hazelcast Jet

1. Introduction

Dans ce tutoriel, nous en apprendrons davantage sur Hazelcast Jet. Il s'agit d'un moteur de traitement de données distribué fourni par Hazelcast, Inc. et basé sur Hazelcast IMDG.

Si vous souhaitez en savoir plus sur Hazelcast IMDG, voici un article pour commencer.

2. Qu'est-ce que Hazelcast Jet?

Hazelcast Jet est un moteur de traitement de données distribué qui traite les données comme des flux. Il peut traiter les données stockées dans une base de données ou des fichiers ainsi que les données diffusées par un serveur Kafka.

De plus, il peut exécuter des fonctions d'agrégation sur des flux de données infinis en divisant les flux en sous-ensembles et en appliquant une agrégation sur chaque sous-ensemble. Ce concept est connu sous le nom de fenêtrage dans la terminologie Jet.

Nous pouvons déployer Jet dans un cluster de machines puis lui soumettre nos travaux de traitement de données. Jet obligera tous les membres du cluster à traiter automatiquement les données. Chaque membre du cluster consomme une partie des données, ce qui facilite la mise à l'échelle à n'importe quel niveau de débit.

Voici les cas d'utilisation typiques de Hazelcast Jet:

  • Traitement de flux en temps réel
  • Traitement par lots rapide
  • Traitement des flux Java 8 de manière distribuée
  • Traitement des données dans les microservices

3. Configuration

Pour configurer Hazelcast Jet dans notre environnement, il suffit d'ajouter une seule dépendance Maven à notre pom.xml .

Voici comment nous procédons:

 com.hazelcast.jet hazelcast-jet 4.2 

L'inclusion de cette dépendance va télécharger un fichier jar de 10 Mo qui nous fournit toute l'infrastructure dont nous avons besoin pour construire un pipeline de traitement de données distribué.

La dernière version de Hazelcast Jet peut être trouvée ici.

4. Exemple d'application

Afin d'en savoir plus sur Hazelcast Jet, nous allons créer un exemple d'application qui prend une entrée de phrases et un mot à trouver dans ces phrases et renvoie le nombre du mot spécifié dans ces phrases.

4.1. Le pipeline

Un pipeline constitue la structure de base d'une application Jet. Le traitement au sein d'un pipeline suit ces étapes:

  • lire des données à partir d'une source
  • transformer les données
  • écrire des données dans un puits

Pour notre application, le pipeline lira à partir d'une liste distribuée , appliquera la transformation de regroupement et d'agrégation et enfin écrira dans une carte distribuée .

Voici comment nous écrivons notre pipeline:

private Pipeline createPipeLine() { Pipeline p = Pipeline.create(); p.readFrom(Sources.list(LIST_NAME)) .flatMap(word -> traverseArray(word.toLowerCase().split("\\W+"))) .filter(word -> !word.isEmpty()) .groupingKey(wholeItem()) .aggregate(counting()) .writeTo(Sinks.map(MAP_NAME)); return p; }

Une fois que nous avons lu à partir de la source, nous parcourons les données et les divisons autour de l'espace à l'aide d'une expression régulière. Après cela, nous filtrons les blancs.

Enfin, nous regroupons les mots, les agrégons et écrivons les résultats sur une carte.

4.2. Le travail

Maintenant que notre pipeline est défini, nous créons un travail pour exécuter le pipeline.

Voici comment nous écrivons une fonction countWord qui accepte les paramètres et renvoie le nombre:

public Long countWord(List sentences, String word) { long count = 0; JetInstance jet = Jet.newJetInstance(); try { List textList = jet.getList(LIST_NAME); textList.addAll(sentences); Pipeline p = createPipeLine(); jet.newJob(p).join(); Map counts = jet.getMap(MAP_NAME); count = counts.get(word); } finally { Jet.shutdownAll(); } return count; }

Nous créons d'abord une instance Jet afin de créer notre travail et d'utiliser le pipeline. Ensuite, nous copions la liste d' entrée dans une liste distribuée afin qu'elle soit disponible sur toutes les instances.

Nous soumettons ensuite un travail en utilisant le pipeline que nous avons construit ci-dessus. La méthode newJob () renvoie un travail exécutable qui est démarré par Jet de manière asynchrone. La méthode de jointure attend la fin du travail et lève une exception si le travail est terminé avec une erreur.

Lorsque le travail est terminé, les résultats sont récupérés dans une carte distribuée , comme nous l'avons défini dans notre pipeline. Ainsi, nous obtenons la carte de l'instance Jet et obtenons le nombre de mots par rapport à elle.

Enfin, nous arrêtons l'instance Jet. Il est important de l'arrêter après la fin de notre exécution, car l' instance Jet démarre ses propres threads . Sinon, notre processus Java sera toujours actif même après la fermeture de notre méthode.

Voici un test unitaire qui teste le code que nous avons écrit pour Jet:

@Test public void whenGivenSentencesAndWord_ThenReturnCountOfWord() { List sentences = new ArrayList(); sentences.add("The first second was alright, but the second second was tough."); WordCounter wordCounter = new WordCounter(); long countSecond = wordCounter.countWord(sentences, "second"); assertEquals(3, countSecond); }

5. Conclusion

Dans cet article, nous avons découvert Hazelcast Jet. Pour en savoir plus sur celui-ci et ses fonctionnalités, reportez-vous au manuel.

Comme d'habitude, le code des exemples utilisés dans cet article se trouve à l'adresse over sur Github.