Flux de données Spring Cloud avec Apache Spark

1. Introduction

Spring Cloud Data Flow est une boîte à outils permettant de créer des pipelines d'intégration de données et de traitement de données en temps réel.

Les pipelines, dans ce cas, sont des applications Spring Boot qui sont construites avec l'utilisation des frameworks Spring Cloud Stream ou Spring Cloud Task.

Dans ce didacticiel, nous montrerons comment utiliser Spring Cloud Data Flow avec Apache Spark.

2. Serveur local de flux de données

Tout d'abord, nous devons exécuter le serveur de flux de données pour pouvoir déployer nos travaux.

Pour exécuter le serveur de flux de données localement, nous devons créer un nouveau projet avec la dépendance spring-cloud-starter-dataflow-server-local :

 org.springframework.cloud spring-cloud-starter-dataflow-server-local 1.7.4.RELEASE 

Après cela, nous devons annoter la classe principale du serveur avec @EnableDataFlowServer :

@EnableDataFlowServer @SpringBootApplication public class SpringDataFlowServerApplication { public static void main(String[] args) { SpringApplication.run( SpringDataFlowServerApplication.class, args); } }

Une fois cette application exécutée, nous aurons un serveur de flux de données local sur le port 9393.

3. Création d'un projet

Nous allons créer un Job Spark en tant qu'application locale autonome afin de ne pas avoir besoin d'un cluster pour l'exécuter.

3.1. Dépendances

Tout d'abord, nous allons ajouter la dépendance Spark:

 org.apache.spark spark-core_2.10 2.4.0  

3.2. Créer un travail

Et pour notre travail, approximons pi:

public class PiApproximation { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("BaeldungPIApproximation"); JavaSparkContext context = new JavaSparkContext(conf); int slices = args.length >= 1 ? Integer.valueOf(args[0]) : 2; int n = (100000L * slices) > Integer.MAX_VALUE ? Integer.MAX_VALUE : 100000 * slices; List xs = IntStream.rangeClosed(0, n) .mapToObj(element -> Integer.valueOf(element)) .collect(Collectors.toList()); JavaRDD dataSet = context.parallelize(xs, slices); JavaRDD pointsInsideTheCircle = dataSet.map(integer -> { double x = Math.random() * 2 - 1; double y = Math.random() * 2 - 1; return (x * x + y * y )  integer + integer2); System.out.println("The pi was estimated as:" + count / n); context.stop(); } }

4. Shell de flux de données

Data Flow Shell est une application qui nous permettra d'interagir avec le serveur . Shell utilise les commandes DSL pour décrire les flux de données.

Pour utiliser le Data Flow Shell, nous devons créer un projet qui nous permettra de l'exécuter. Tout d'abord, nous avons besoin de la dépendance spring-cloud-dataflow-shell :

 org.springframework.cloud spring-cloud-dataflow-shell 1.7.4.RELEASE 

Après avoir ajouté la dépendance, nous pouvons créer la classe qui exécutera notre shell Data Flow:

@EnableDataFlowShell @SpringBootApplication public class SpringDataFlowShellApplication { public static void main(String[] args) { SpringApplication.run(SpringDataFlowShellApplication.class, args); } }

5. Déploiement du projet

Pour déployer notre projet, nous utiliserons le soi-disant exécuteur de tâches qui est disponible pour Apache Spark en trois versions: cluster , yarn et client . Nous allons continuer avec la version client locale .

Le lanceur de tâches est ce qui exécute notre travail Spark.

Pour ce faire, nous devons d'abord enregistrer notre tâche à l'aide de Data Flow Shell :

app register --type task --name spark-client --uri maven://org.springframework.cloud.task.app:spark-client-task:1.0.0.BUILD-SNAPSHOT 

La tâche nous permet de spécifier plusieurs paramètres différents, certains d'entre eux sont facultatifs, mais certains des paramètres sont nécessaires pour déployer correctement le travail Spark:

  • spark.app-class , la classe principale de notre travail soumis
  • spark.app-jar , un chemin vers le pot de graisse contenant notre travail
  • spark.app- name , le nom qui sera utilisé pour notre travail
  • spark.app-args , les arguments qui seront passés au travail

Nous pouvons utiliser la tâche enregistrée spark-client pour soumettre notre travail, en nous rappelant de fournir les paramètres requis:

task create spark1 --definition "spark-client \ --spark.app-name=my-test-pi --spark.app-class=com.baeldung.spring.cloud.PiApproximation \ --spark.app-jar=/apache-spark-job-0.0.1-SNAPSHOT.jar --spark.app-args=10"

Notez que spark.app-jar est le chemin vers le fat-jar avec notre travail.

Après la création réussie de la tâche, nous pouvons procéder à son exécution avec la commande suivante:

task launch spark1

Cela invoquera l'exécution de notre tâche.

6. Résumé

Dans ce didacticiel, nous avons montré comment utiliser le framework Spring Cloud Data Flow pour traiter des données avec Apache Spark. Vous trouverez plus d'informations sur le framework Spring Cloud Data Flow dans la documentation.

Tous les exemples de code peuvent être trouvés sur GitHub.