Spring Integration Java DSL

1. Introduction

Dans ce didacticiel, nous découvrirons le DSL Java Spring Integration pour la création d'intégrations d'applications.

Nous allons prendre l'intégration de déplacement de fichiers que nous avons construite dans Introduction to Spring Integration et utiliser le DSL à la place.

2. Dépendances

Le DSL Java Spring Integration fait partie de Spring Integration Core.

Donc, nous pouvons ajouter cette dépendance:

 org.springframework.integration spring-integration-core 5.0.6.RELEASE 

Et pour travailler sur notre application de déplacement de fichiers, nous aurons également besoin de Spring Integration File:

 org.springframework.integration spring-integration-file 5.0.6.RELEASE 

3. Spring Integration Java DSL

Avant le Java DSL, les utilisateurs configuraient les composants Spring Integration en XML.

Le DSL introduit des générateurs fluides à partir desquels nous pouvons facilement créer un pipeline d'intégration Spring complet uniquement en Java.

Alors, disons que nous voulions créer un canal qui majuscule toutes les données passant par le tuyau.

Dans le passé, nous aurions pu faire:

Et maintenant, nous pouvons à la place faire:

@Bean public IntegrationFlow upcaseFlow() { return IntegrationFlows.from("input") .transform(String::toUpperCase) .get(); }

4. L'application de déplacement de fichiers

Pour commencer notre intégration de déplacement de fichiers, nous aurons besoin de quelques blocs de construction simples.

4.1. Flux d'intégration

Le premier bloc de construction dont nous avons besoin est un flux d'intégration, que nous pouvons obtenir du générateur IntegrationFlows :

IntegrationFlows.from(...)

from peut prendre plusieurs types, mais dans ce tutoriel, nous n'en examinerons que trois:

  • MessageSource s
  • MessageChannel s et
  • Chaîne s

Nous parlerons des trois sous peu.

Après avoir appelé à partir de , certaines méthodes de personnalisation sont désormais disponibles:

IntegrationFlow flow = IntegrationFlows.from(sourceDirectory()) .filter(onlyJpgs()) .handle(targetDirectory()) // add more components .get();

En fin de compte, IntegrationFlows produira toujours une instance de IntegrationFlow, qui est le produit final de toute application Spring Integration.

Ce modèle de prise d'entrée, d'exécution des transformations appropriées et d'émission des résultats est fondamental pour toutes les applications Spring Integration .

4.2. Décrire une source d'entrée

Tout d'abord, pour déplacer des fichiers, nous devons indiquer à notre flux d'intégration où il doit les rechercher, et pour cela, nous avons besoin d'un MessageSource:

@Bean public MessageSource sourceDirectory() { // .. create a message source }

En termes simples, un MessageSource est un endroit d'où peuvent provenir des messages externes à l'application.

Plus précisément, nous avons besoin de quelque chose qui puisse adapter cette source externe dans la représentation de messagerie Spring. Et comme cette adaptation est axée sur l' entrée , on les appelle souvent des adaptateurs de canal d'entrée.

La dépendance spring-integration-file nous donne un adaptateur de canal d'entrée idéal pour notre cas d'utilisation: FileReadingMessageSource:

@Bean public MessageSource sourceDirectory() { FileReadingMessageSource messageSource = new FileReadingMessageSource(); messageSource.setDirectory(new File(INPUT_DIR)); return messageSource; }

Ici, notre FileReadingMessageSource lira un répertoire donné par INPUT_DIR et créera un MessageSource à partir de celui-ci.

Spécifions ceci comme notre source dans un appel IntegrationFlows.from :

IntegrationFlows.from(sourceDirectory());

4.3. Configuration d'une source d'entrée

Maintenant, si nous considérons cela comme une application à longue durée de vie, nous voudrons probablement être en mesure de remarquer les fichiers au fur et à mesure qu'ils arrivent , pas seulement de déplacer les fichiers qui sont déjà là au démarrage.

Pour faciliter cela, from peut également prendre des configurateurs supplémentaires pour personnaliser davantage la source d'entrée:

IntegrationFlows.from(sourceDirectory(), configurer -> configurer.poller(Pollers.fixedDelay(10000)));

Dans ce cas, nous pouvons rendre notre source d'entrée plus résiliente en demandant à Spring Integration d'interroger cette source - notre système de fichiers dans ce cas - toutes les 10 secondes.

Et, bien sûr, cela ne s'applique pas uniquement à notre source d'entrée de fichier, nous pourrions ajouter ce poller à n'importe quel MessageSource .

4.4. Filtrage des messages à partir d'une source d'entrée

Next, let's suppose we want our file-moving application to move specific files only, say image files having jpg extension.

For this, we can use GenericSelector:

@Bean public GenericSelector onlyJpgs() { return new GenericSelector() { @Override public boolean accept(File source) { return source.getName().endsWith(".jpg"); } }; }

So, let's update our integration flow again:

IntegrationFlows.from(sourceDirectory()) .filter(onlyJpgs());

Or, because this filter is so simple, we could have instead defined it using a lambda:

IntegrationFlows.from(sourceDirectory()) .filter(source -> ((File) source).getName().endsWith(".jpg"));

4.5. Handling Messages With Service Activators

Now that we have a filtered list of files, we need to write them to a new location.

Service Activators are what we turn to when we're thinking about outputs in Spring Integration.

Let's use the FileWritingMessageHandler service activator from spring-integration-file:

@Bean public MessageHandler targetDirectory() { FileWritingMessageHandler handler = new FileWritingMessageHandler(new File(OUTPUT_DIR)); handler.setFileExistsMode(FileExistsMode.REPLACE); handler.setExpectReply(false); return handler; }

Here, our FileWritingMessageHandler will write each Message payload it receives to OUTPUT_DIR.

Again, let's update:

IntegrationFlows.from(sourceDirectory()) .filter(onlyJpgs()) .handle(targetDirectory());

And notice, by the way, the usage of setExpectReply. Because integration flows can bebidirectional, this invocation indicates that this particular pipe is one way.

4.6. Activating Our Integration Flow

When we have added all our components we need to register our IntegrationFlow as a bean to activate it:

@Bean public IntegrationFlow fileMover() { return IntegrationFlows.from(sourceDirectory(), c -> c.poller(Pollers.fixedDelay(10000))) .filter(onlyJpgs()) .handle(targetDirectory()) .get(); }

The get method extracts an IntegrationFlow instance that we need to register as a Spring Bean.

As soon as our application context loads, all our components contained in our IntegrationFlow gets activated.

And now, our application will start moving files from the source directory to target directory.

5. Additional Components

In our DSL-based file-moving application, we created an Inbound Channel Adapter, a Message Filter, and a Service Activator.

Let's look at a few other common Spring Integration components and see how we might use them.

5.1. Message Channels

As mentioned earlier, a Message Channel is another way to initialize a flow:

IntegrationFlows.from("anyChannel")

We can read this as “please find or create a channel bean called anyChannel. Then, read any data that is fed into anyChannel from other flows.”

But, really it is more general-purpose than that.

Simply put, a channel abstracts away producers from consumers, and we can think of it as a Java Queue. A channel can be inserted at any point in the flow.

Let's say, for example, that we want to prioritize the files as they get moved from one directory to the next:

@Bean public PriorityChannel alphabetically() { return new PriorityChannel(1000, (left, right) -> ((File)left.getPayload()).getName().compareTo( ((File)right.getPayload()).getName())); }

Then, we can insert an invocation to channel in between our flow:

@Bean public IntegrationFlow fileMover() { return IntegrationFlows.from(sourceDirectory()) .filter(onlyJpgs()) .channel("alphabetically") .handle(targetDirectory()) .get(); }

There are dozens of channels to pick from, some of the more handy ones being for concurrency, auditing, or intermediate persistence (think Kafka or JMS buffers).

Also, channels can be powerful when combined with Bridges.

5.2. Bridge

When we want to combine two channels, we use a Bridge.

Let's imagine that instead of writing directly to an output directory, we instead had our file-moving app write to another channel:

@Bean public IntegrationFlow fileReader() { return IntegrationFlows.from(sourceDirectory()) .filter(onlyJpgs()) .channel("holdingTank") .get(); }

Now, because we've simply written it to a channel, we can bridge from there to other flows.

Let's create a bridge that polls our holding tank for messages and writes them to a destination:

@Bean public IntegrationFlow fileWriter() { return IntegrationFlows.from("holdingTank") .bridge(e -> e.poller(Pollers.fixedRate(1, TimeUnit.SECONDS, 20))) .handle(targetDirectory()) .get(); }

Again, because we wrote to an intermediate channel, now we can add another flow that takes these same files and writes them at a different rate:

@Bean public IntegrationFlow anotherFileWriter() { return IntegrationFlows.from("holdingTank") .bridge(e -> e.poller(Pollers.fixedRate(2, TimeUnit.SECONDS, 10))) .handle(anotherTargetDirectory()) .get(); }

As we can see, individual bridges can control the polling configuration for different handlers.

As soon as our application context is loaded, we now have a more complex app in action that will start moving files from the source directory to two target directories.

6. Conclusion

Dans cet article, nous avons vu différentes façons d'utiliser le DSL Java Spring Integration pour créer différents pipelines d'intégration.

Essentiellement, nous avons pu recréer l'application de déplacement de fichiers à partir d'un tutoriel précédent, cette fois en utilisant java pur.

Nous avons également examiné quelques autres composants comme les canaux et les ponts.

Le code source complet utilisé dans ce tutoriel est disponible à l'adresse over sur Github.