Premiers pas avec le traitement de flux avec Spring Cloud Data Flow

1. Introduction

Spring Cloud Data Flow est un modèle de programmation et d'exploitation cloud natif pour les microservices de données composables.

Avec Spring Cloud Data Flow , les développeurs peuvent créer et orchestrer des pipelines de données pour des cas d'utilisation courants tels que l'ingestion de données, l'analyse en temps réel et l'importation / exportation de données.

Ces pipelines de données sont disponibles en deux versions, les pipelines de données en continu et par lots.

Dans le premier cas, une quantité illimitée de données est consommée ou produite via un middleware de messagerie. Alors que dans le second cas, la tâche de courte durée traite un ensemble fini de données puis se termine.

Cet article se concentrera sur le traitement en continu.

2. Aperçu de l'architecture

Les composants clés de ce type d'architecture sont les applications , le serveur de flux de données et l'environnement d'exécution cible.

En plus de ces composants clés, nous avons également généralement un Data Flow Shell et un courtier de messages au sein de l'architecture.

Voyons tous ces composants plus en détail.

2.1. Applications

En règle générale, un pipeline de données en continu comprend la consommation d'événements provenant de systèmes externes, le traitement des données et la persistance polyglotte. Ces phases sont communément appelées source , processeur et récepteur dans la terminologie Spring Cloud :

  • Source: est l'application qui consomme des événements
  • Processeur: consomme les données de la source , y effectue des traitements et émet les données traitées vers l'application suivante dans le pipeline
  • Sink: consomme à partir d'une source ou d'un processeur et écrit les données dans la couche de persistance souhaitée

Ces applications peuvent être regroupées de deux manières:

  • Spring Boot uber-jar qui est hébergé dans un référentiel maven, un fichier, http ou toute autre implémentation de ressource Spring (cette méthode sera utilisée dans cet article)
  • Docker

De nombreuses applications sources, processeurs et récepteurs pour des cas d'utilisation courants (par exemple, jdbc, hdfs, http, routeur) sont déjà fournies et prêtes à être utilisées par l'équipe Spring Cloud Data Flow .

2.2. Durée

En outre, un runtime est nécessaire pour que ces applications s'exécutent. Les environnements d'exécution pris en charge sont:

  • Fonderie de nuages
  • Apache YARN
  • Kubernetes
  • Apache Mesos
  • Serveur local pour le développement (qui sera utilisé dans cet article)

2.3. Serveur de flux de données

Le composant responsable du déploiement des applications sur un environnement d'exécution est le serveur de flux de données . Un fichier jar exécutable Data Flow Server est fourni pour chacun des environnements d'exécution cibles.

Le serveur de flux de données est responsable de l'interprétation:

  • Un flux DSL qui décrit le flux logique de données via plusieurs applications.
  • Un manifeste de déploiement qui décrit le mappage des applications sur l'environnement d'exécution.

2.4. Shell de flux de données

Le Data Flow Shell est un client du serveur de flux de données. Le shell nous permet d'exécuter la commande DSL nécessaire pour interagir avec le serveur.

À titre d'exemple, le DSL pour décrire le flux de données d'une source http vers un récepteur jdbc s'écrirait comme suit: «http | jdbc ». Ces noms dans la DSL sont enregistrés auprès du serveur de flux de données et mappent sur des artefacts d'application qui peuvent être hébergés dans des référentiels Maven ou Docker.

Spring propose également une interface graphique, nommée Flo , pour créer et surveiller des pipelines de données en continu. Cependant, son utilisation est en dehors de la discussion de cet article.

2.5. Courtier de messages

Comme nous l'avons vu dans l'exemple de la section précédente, nous avons utilisé le symbole de tuyau dans la définition du flux de données. Le symbole de canal représente la communication entre les deux applications via un middleware de messagerie.

Cela signifie que nous avons besoin d'un courtier de messages opérationnel dans l'environnement cible.

Les deux courtiers middleware de messagerie pris en charge sont:

  • Apache Kafka
  • RabbitMQ

Et donc, maintenant que nous avons une vue d'ensemble des composants architecturaux, il est temps de construire notre premier pipeline de traitement de flux.

3. Installez un courtier de messages

Comme nous l'avons vu, les applications en pipeline ont besoin d'un middleware de messagerie pour communiquer. Pour les besoins de cet article, nous utiliserons RabbitMQ .

Pour les détails complets de l'installation, vous pouvez suivre les instructions sur le site officiel.

4. Le serveur de flux de données local

Pour accélérer le processus de génération de nos applications, nous utiliserons Spring Initializr; avec son aide, nous pouvons obtenir nos applications Spring Boot en quelques minutes.

Après avoir atteint le site Web, choisissez simplement un groupe et un nom d' artefact .

Une fois que cela est fait, cliquez sur le bouton Générer un projet pour démarrer le téléchargement de l'artefact Maven.

Une fois le téléchargement terminé, décompressez le projet et importez-le en tant que projet Maven dans votre IDE de votre choix.

Ajoutons une dépendance Maven au projet. Comme nous aurons besoin des bibliothèques Dataflow Local Server , ajoutons la dépendance spring-cloud-starter-dataflow-server-local:

 org.springframework.cloud spring-cloud-starter-dataflow-server-local 

Nous devons maintenant annoter la classe principale de Spring Boot avec l' annotation @EnableDataFlowServer :

@EnableDataFlowServer @SpringBootApplication public class SpringDataFlowServerApplication { public static void main(String[] args) { SpringApplication.run( SpringDataFlowServerApplication.class, args); } } 

C'est tout. Notre serveur de flux de données local est prêt à être exécuté:

mvn spring-boot:run

L'application démarrera sur le port 9393.

5. Le Data Flow Shell

Encore une fois, allez dans Spring Initializr et choisissez un nom de groupe et d' artefact .

Une fois que nous avons téléchargé et importé le projet, ajoutons une dépendance spring-cloud-dataflow-shell:

 org.springframework.cloud spring-cloud-dataflow-shell 

Nous devons maintenant ajouter l' annotation @EnableDataFlowShell à la classe principale Spring Boot :

@EnableDataFlowShell @SpringBootApplication public class SpringDataFlowShellApplication { public static void main(String[] args) { SpringApplication.run(SpringDataFlowShellApplication.class, args); } } 

Nous pouvons maintenant exécuter le shell:

mvn spring-boot:run

After the shell is running, we can type the help command in the prompt to see a complete list of command that we can perform.

6. The Source Application

Similarly, on Initializr, we'll now create a simple application and add a Stream Rabbit dependency called spring-cloud-starter-stream-rabbit:

 org.springframework.cloud spring-cloud-starter-stream-rabbit 

We'll then add the @EnableBinding(Source.class) annotation to the Spring Boot main class:

@EnableBinding(Source.class) @SpringBootApplication public class SpringDataFlowTimeSourceApplication { public static void main(String[] args) { SpringApplication.run( SpringDataFlowTimeSourceApplication.class, args); } }

Now we need to define the source of the data that must be processed. This source could be any potentially endless workload (internet-of-things sensor data, 24/7 event processing, online transaction data ingest).

In our sample application, we produce one event (for simplicity a new timestamp) every 10 seconds with a Poller.

The @InboundChannelAdapter annotation sends a message to the source’s output channel, using the return value as the payload of the message:

@Bean @InboundChannelAdapter( value = Source.OUTPUT, poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1") ) public MessageSource timeMessageSource() { return () -> MessageBuilder.withPayload(new Date().getTime()).build(); } 

Our data source is ready.

7. The Processor Application

Next- we'll create an application and add a Stream Rabbit dependency.

We'll then add the @EnableBinding(Processor.class) annotation to the Spring Boot main class:

@EnableBinding(Processor.class) @SpringBootApplication public class SpringDataFlowTimeProcessorApplication { public static void main(String[] args) { SpringApplication.run( SpringDataFlowTimeProcessorApplication.class, args); } }

Next, we need to define a method to process the data that coming from the source application.

To define a transformer, we need to annotate this method with @Transformer annotation:

@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT) public Object transform(Long timestamp) { DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd hh:mm:yy"); String date = dateFormat.format(timestamp); return date; }

It converts a timestamp from the ‘input' channel to a formatted date which will be sent to the ‘output' channel.

8. The Sink Application

The last application to create is the Sink application.

Again, go to the Spring Initializr and choose a Group, an Artifact name. After downloading the project let's add a Stream Rabbit dependency.

Then add the @EnableBinding(Sink.class) annotation to the Spring Boot main class:

@EnableBinding(Sink.class) @SpringBootApplication public class SpringDataFlowLoggingSinkApplication { public static void main(String[] args) { SpringApplication.run( SpringDataFlowLoggingSinkApplication.class, args); } }

Now we need a method to intercept the messages coming from the processor application.

To do this, we need to add the @StreamListener(Sink.INPUT) annotation to our method:

@StreamListener(Sink.INPUT) public void loggerSink(String date) { logger.info("Received: " + date); }

The method simply prints the timestamp transformed in a formatted date to a log file.

9. Register a Stream App

The Spring Cloud Data Flow Shell allow us to Register a Stream App with the App Registry using the app register command.

We must provide a unique name, application type, and a URI that can be resolved to the app artifact. For the type, specify “source“, “processor“, or “sink“.

When providing a URI with the maven scheme, the format should conform to the following:

maven://:[:[:]]:

To register the Source, Processor and Sink applications previously created , go to the Spring Cloud Data Flow Shell and issue the following commands from the prompt:

app register --name time-source --type source --uri maven://com.baeldung.spring.cloud:spring-data-flow-time-source:jar:0.0.1-SNAPSHOT app register --name time-processor --type processor --uri maven://com.baeldung.spring.cloud:spring-data-flow-time-processor:jar:0.0.1-SNAPSHOT app register --name logging-sink --type sink --uri maven://com.baeldung.spring.cloud:spring-data-flow-logging-sink:jar:0.0.1-SNAPSHOT 

10. Create and Deploy the Stream

To create a new stream definition go to the Spring Cloud Data Flow Shell and execute the following shell command:

stream create --name time-to-log --definition 'time-source | time-processor | logging-sink'

This defines a stream named time-to-log based on the DSL expression ‘time-source | time-processor | logging-sink'.

Then to deploy the stream execute the following shell command:

stream deploy --name time-to-log

The Data Flow Server resolves time-source, time-processor, and logging-sink to maven coordinates and uses those to launch the time-source, time-processor and logging-sink applications of the stream.

If the stream is correctly deployed you’ll see in the Data Flow Server logs that the modules have been started and tied together:

2016-08-24 12:29:10.516 INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer: deploying app time-to-log.logging-sink instance 0 Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034549734/time-to-log.logging-sink 2016-08-24 12:29:17.600 INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer : deploying app time-to-log.time-processor instance 0 Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034556862/time-to-log.time-processor 2016-08-24 12:29:23.280 INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer : deploying app time-to-log.time-source instance 0 Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034562861/time-to-log.time-source

11. Reviewing the Result

In this example, the source simply sends the current timestamp as a message each second, the processor format it and the log sink outputs the formatted timestamp using the logging framework.

The log files are located within the directory displayed in the Data Flow Server’s log output, as shown above. To see the result, we can tail the log:

tail -f PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034549734/time-to-log.logging-sink/stdout_0.log 2016-08-24 12:40:42.029 INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:01 2016-08-24 12:40:52.035 INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:11 2016-08-24 12:41:02.030 INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:21

12. Conclusion

In this article, we have seen how to build a data pipeline for stream processing through the use of Spring Cloud Data Flow.

Nous avons également vu le rôle des applications Source , Processor et Sink dans le flux et comment brancher et lier ce module dans un Data Flow Server grâce à l'utilisation de Data Flow Shell .

L'exemple de code se trouve dans le projet GitHub.