Traitement par lots Java EE 7

1. Introduction

Imaginez que nous devions effectuer manuellement des tâches telles que le traitement des fiches de paie, le calcul des intérêts et la génération de factures. Cela deviendrait assez ennuyeux, sujet aux erreurs et une liste interminable de tâches manuelles!

Dans ce didacticiel, nous examinerons le traitement par lots Java (JSR 352), une partie de la plate-forme Jakarta EE, et une excellente spécification pour automatiser des tâches comme celles-ci. Il offre aux développeurs d'applications un modèle pour développer des systèmes de traitement par lots robustes afin qu'ils puissent se concentrer sur la logique métier.

2. Dépendances de Maven

Puisque JSR 352 n'est qu'une spécification, nous devrons inclure son API et son implémentation, comme jberet :

 javax.batch javax.batch-api 1.0.1   org.jberet jberet-core 1.0.2.Final   org.jberet jberet-support 1.0.2.Final   org.jberet jberet-se 1.0.2.Final 

Nous ajouterons également une base de données en mémoire afin que nous puissions examiner des scénarios plus réalistes.

3. Concepts clés

JSR 352 présente quelques concepts que nous pouvons examiner de cette façon:

Définissons d'abord chaque pièce:

  • En commençant à gauche, nous avons le JobOperator . Il gère tous les aspects du traitement des travaux tels que le démarrage, l'arrêt et le redémarrage
  • Ensuite, nous avons le Job . Un travail est un ensemble logique d'étapes; il encapsule tout un processus par lots
  • Un travail contiendra entre 1 et n étapes . Chaque étape est une unité de travail indépendante et séquentielle. Une étape est composée de la lecture de l' entrée, du traitement de cette entrée et de l' écriture de la sortie
  • Enfin, nous avons le JobRepository qui stocke les informations en cours d'exécution des jobs. Il aide à garder une trace des travaux, de leur état et de leurs résultats d'achèvement

Les étapes ont un peu plus de détails que cela, alors jetons un coup d'œil à cela ensuite. Nous allons d'abord examiner les étapes Chunk , puis les Batchlet s.

4. Création d'un morceau

Comme indiqué précédemment, un morceau est une sorte d'étape . Nous utiliserons souvent un bloc pour exprimer une opération qui est effectuée à plusieurs reprises, par exemple sur un ensemble d'éléments. C'est un peu comme des opérations intermédiaires de Java Streams.

Lors de la description d'un morceau, nous devons indiquer où prendre les éléments, comment les traiter et où les envoyer par la suite.

4.1. Articles de lecture

Pour lire les éléments, nous devons implémenter ItemReader.

Dans ce cas, nous allons créer un lecteur qui émettra simplement les nombres 1 à 10:

@Named public class SimpleChunkItemReader extends AbstractItemReader { private Integer[] tokens; private Integer count; @Inject JobContext jobContext; @Override public Integer readItem() throws Exception { if (count >= tokens.length) { return null; } jobContext.setTransientUserData(count); return tokens[count++]; } @Override public void open(Serializable checkpoint) throws Exception { tokens = new Integer[] { 1,2,3,4,5,6,7,8,9,10 }; count = 0; } }

Maintenant, nous lisons simplement l'état interne de la classe ici. Mais, bien sûr, readItem pourrait extraire d'une base de données , du système de fichiers ou d'une autre source externe.

Notez que nous sauvegardons une partie de cet état interne en utilisant JobContext # setTransientUserData () qui sera utile plus tard.

Notez également le paramètre de point de contrôle . Nous reprendrons cela aussi.

4.2. Traitement des articles

Bien sûr, la raison pour laquelle nous segmentons est que nous voulons effectuer une sorte d'opération sur nos articles!

Chaque fois que nous retournons null d'un processeur d'article, nous supprimons cet article du lot.

Alors, disons ici que nous voulons ne garder que les nombres pairs. Nous pouvons utiliser un ItemProcessor qui rejette les impairs en renvoyant null :

@Named public class SimpleChunkItemProcessor implements ItemProcessor { @Override public Integer processItem(Object t) { Integer item = (Integer) t; return item % 2 == 0 ? item : null; } }

processItem sera appelé une fois pour chaque élément émis par notre ItemReader .

4.3. Articles d'écriture

Enfin, le travail appellera ItemWriter afin que nous puissions écrire nos éléments transformés:

@Named public class SimpleChunkWriter extends AbstractItemWriter { List processed = new ArrayList(); @Override public void writeItems(List items) throws Exception { items.stream().map(Integer.class::cast).forEach(processed::add); } } 

Combien de temps dure les articles ? Dans un instant, nous définirons la taille d'un morceau, qui déterminera la taille de la liste envoyée à writeItems .

4.4. Définition d'un bloc dans un travail

Maintenant, nous rassemblons tout cela dans un fichier XML utilisant JSL ou Job Specification Language. Notez que nous allons lister notre lecteur, processeur, bloc et également une taille de bloc:

La taille du bloc correspond à la fréquence à laquelle la progression du bloc est validée dans le référentiel de travaux , ce qui est important pour garantir l'achèvement, en cas de défaillance d'une partie du système.

Nous devrons placer ce fichier dans META-INF / batch-jobs pour. jar et dans WEB-INF / classes / META-INF / batch-jobs pour les fichiers .war .

Nous avons donné à notre travail l'identifiant «simpleChunk», alors essayons cela dans un test unitaire.

Désormais, les tâches sont exécutées de manière asynchrone, ce qui les rend difficiles à tester. Dans l'exemple, assurez-vous de consulter notre BatchTestHelper qui interroge et attend que le travail soit terminé:

@Test public void givenChunk_thenBatch_completesWithSuccess() throws Exception { JobOperator jobOperator = BatchRuntime.getJobOperator(); Long executionId = jobOperator.start("simpleChunk", new Properties()); JobExecution jobExecution = jobOperator.getJobExecution(executionId); jobExecution = BatchTestHelper.keepTestAlive(jobExecution); assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED); } 

Voilà donc ce que sont les morceaux. Maintenant, jetons un œil aux batchlets.

5. Création d'un lot

Tout ne s'intègre pas parfaitement dans un modèle itératif. Par exemple, nous pouvons avoir une tâche que nous devons simplement appeler une fois, exécuter jusqu'à la fin et renvoyer un statut de sortie.

Le contrat pour un batchlet est assez simple:

@Named public class SimpleBatchLet extends AbstractBatchlet { @Override public String process() throws Exception { return BatchStatus.COMPLETED.toString(); } }

Tout comme le JSL:

Et nous pouvons le tester en utilisant la même approche que précédemment:

@Test public void givenBatchlet_thenBatch_completeWithSuccess() throws Exception { JobOperator jobOperator = BatchRuntime.getJobOperator(); Long executionId = jobOperator.start("simpleBatchLet", new Properties()); JobExecution jobExecution = jobOperator.getJobExecution(executionId); jobExecution = BatchTestHelper.keepTestAlive(jobExecution); assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED); }

Nous avons donc examiné différentes manières de mettre en œuvre les étapes.

Regardons maintenant les mécanismes de marquage et de garantie de progrès.

6. Point de contrôle personnalisé

Les échecs se produisent inévitablement au milieu d'un travail. Devrions-nous tout recommencer ou pouvons-nous en quelque sorte recommencer là où nous nous sommes arrêtés?

Comme son nom l'indique, les points de contrôle nous aident à définir périodiquement un signet en cas d'échec.

Par défaut, la fin du traitement des blocs est un point de contrôle naturel .

Cependant, nous pouvons le personnaliser avec notre propre CheckpointAlgorithm :

@Named public class CustomCheckPoint extends AbstractCheckpointAlgorithm { @Inject JobContext jobContext; @Override public boolean isReadyToCheckpoint() throws Exception { int counterRead = (Integer) jobContext.getTransientUserData(); return counterRead % 5 == 0; } }

Rappelez-vous le décompte que nous avons placé dans les données transitoires plus tôt? Ici, nous pouvons le retirer avec JobContext # getTransientUserDatapour indiquer que nous voulons nous engager sur tous les 5 numéros traités.

Sans cela, un commit se produirait à la fin de chaque morceau, ou dans notre cas, tous les 3 nombres.

And then, we match that up with the checkout-algorithm directive in our XML underneath our chunk:

Let's test the code, again noting that some of the boilerplate steps are hidden away in BatchTestHelper:

@Test public void givenChunk_whenCustomCheckPoint_thenCommitCountIsThree() throws Exception { // ... start job and wait for completion jobOperator.getStepExecutions(executionId) .stream() .map(BatchTestHelper::getCommitCount) .forEach(count -> assertEquals(3L, count.longValue())); assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED); }

So, we might be expecting a commit count of 2 since we have ten items and configured the commits to be every 5th item. But, the framework does one more final read commit at the end to ensure everything has been processed, which is what brings us up to 3.

Next, let's look at how to handle errors.

7. Exception Handling

By default, the job operator will mark our job as FAILED in case of an exception.

Let's change our item reader to make sure that it fails:

@Override public Integer readItem() throws Exception { if (tokens.hasMoreTokens()) { String tempTokenize = tokens.nextToken(); throw new RuntimeException(); } return null; }

And then test:

@Test public void whenChunkError_thenBatch_CompletesWithFailed() throws Exception { // ... start job and wait for completion assertEquals(jobExecution.getBatchStatus(), BatchStatus.FAILED); }

But, we can override this default behavior in a number of ways:

  • skip-limit specifies the number of exceptions this step will ignore before failing
  • retry-limit specifies the number of times the job operator should retry the step before failing
  • skippable-exception-class specifies a set of exceptions that chunk processing will ignore

So, we can edit our job so that it ignores RuntimeException, as well as a few others, just for illustration:

And now our code will pass:

@Test public void givenChunkError_thenErrorSkipped_CompletesWithSuccess() throws Exception { // ... start job and wait for completion jobOperator.getStepExecutions(executionId).stream() .map(BatchTestHelper::getProcessSkipCount) .forEach(skipCount -> assertEquals(1L, skipCount.longValue())); assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED); }

8. Executing Multiple Steps

We mentioned earlier that a job can have any number of steps, so let's see that now.

8.1. Firing the Next Step

By default, each step is the last step in the job.

In order to execute the next step within a batch job, we'll have to explicitly specify by using the next attribute within the step definition:

If we forget this attribute, then the next step in sequence will not get executed.

And we can see what this looks like in the API:

@Test public void givenTwoSteps_thenBatch_CompleteWithSuccess() throws Exception { // ... start job and wait for completion assertEquals(2 , jobOperator.getStepExecutions(executionId).size()); assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED); }

8.2. Flows

A sequence of steps can also be encapsulated into a flow. When the flow is finished, it is the entire flow that transitions to the execution element. Also, elements inside the flow can't transition to elements outside the flow.

We can, say, execute two steps inside a flow, and then have that flow transition to an isolated step:

And we can still see each step execution independently:

@Test public void givenFlow_thenBatch_CompleteWithSuccess() throws Exception { // ... start job and wait for completion assertEquals(3, jobOperator.getStepExecutions(executionId).size()); assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED); }

8.3. Decisions

We also have if/else support in the form of decisions. Decisions provide a customized way of determining a sequence among steps, flows, and splits.

Like steps, it works on transition elements such as next which can direct or terminate job execution.

Let's see how the job can be configured:

Any decision element needs to be configured with a class that implements Decider. Its job is to return a decision as a String.

Each next inside decision is like a case in a switch statement.

8.4. Splits

Splits are handy since they allow us to execute flows concurrently:

Of course, this means that the order isn't guaranteed.

Let's confirm that they still all get run. The flow steps will be performed in an arbitrary order, but the isolated step will always be last:

@Test public void givenSplit_thenBatch_CompletesWithSuccess() throws Exception { // ... start job and wait for completion List stepExecutions = jobOperator.getStepExecutions(executionId); assertEquals(3, stepExecutions.size()); assertEquals("splitJobSequenceStep3", stepExecutions.get(2).getStepName()); assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED); }

9. Partitioning a Job

We can also consume the batch properties within our Java code which have been defined in our job.

They can be scoped at three levels – the job, the step, and the batch-artifact.

Let's see some examples of how they consumed.

When we want to consume the properties at job level:

@Inject JobContext jobContext; ... jobProperties = jobContext.getProperties(); ...

This can be consumed at a step level as well:

@Inject StepContext stepContext; ... stepProperties = stepContext.getProperties(); ...

When we want to consume the properties at batch-artifact level:

@Inject @BatchProperty(name = "name") private String nameString;

This comes in handy with partitions.

See, with splits, we can run flows concurrently. But we can also partition a step into n sets of items or set separate inputs, allowing us another way to split up the work across multiple threads.

To comprehend the segment of work each partition should do, we can combine properties with partitions:

10. Stop and Restart

Now, that's it for defining jobs. Now let's talk for a minute about managing them.

We've already seen in our unit tests that we can get an instance of JobOperator from BatchRuntime:

JobOperator jobOperator = BatchRuntime.getJobOperator();

And then, we can start the job:

Long executionId = jobOperator.start("simpleBatchlet", new Properties());

However, we can also stop the job:

jobOperator.stop(executionId);

And lastly, we can restart the job:

executionId = jobOperator.restart(executionId, new Properties());

Let's see how we can stop a running job:

@Test public void givenBatchLetStarted_whenStopped_thenBatchStopped() throws Exception { JobOperator jobOperator = BatchRuntime.getJobOperator(); Long executionId = jobOperator.start("simpleBatchLet", new Properties()); JobExecution jobExecution = jobOperator.getJobExecution(executionId); jobOperator.stop(executionId); jobExecution = BatchTestHelper.keepTestStopped(jobExecution); assertEquals(jobExecution.getBatchStatus(), BatchStatus.STOPPED); }

And if a batch is STOPPED, then we can restart it:

@Test public void givenBatchLetStopped_whenRestarted_thenBatchCompletesSuccess() { // ... start and stop the job assertEquals(jobExecution.getBatchStatus(), BatchStatus.STOPPED); executionId = jobOperator.restart(jobExecution.getExecutionId(), new Properties()); jobExecution = BatchTestHelper.keepTestAlive(jobOperator.getJobExecution(executionId)); assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED); }

11. Fetching Jobs

When a batch job is submitted then the batch runtime creates an instance of JobExecution to track it.

To obtain the JobExecution for an execution id, we can use the JobOperator#getJobExecution(executionId) method.

And, StepExecution provides helpful information for tracking a step's execution.

To obtain the StepExecution for an execution id, we can use the JobOperator#getStepExecutions(executionId) method.

And from that, we can get several metrics about the step via StepExecution#getMetrics:

@Test public void givenChunk_whenJobStarts_thenStepsHaveMetrics() throws Exception { // ... start job and wait for completion assertTrue(jobOperator.getJobNames().contains("simpleChunk")); assertTrue(jobOperator.getParameters(executionId).isEmpty()); StepExecution stepExecution = jobOperator.getStepExecutions(executionId).get(0); Map metricTest = BatchTestHelper.getMetricsMap(stepExecution.getMetrics()); assertEquals(10L, metricTest.get(Metric.MetricType.READ_COUNT).longValue()); assertEquals(5L, metricTest.get(Metric.MetricType.FILTER_COUNT).longValue()); assertEquals(4L, metricTest.get(Metric.MetricType.COMMIT_COUNT).longValue()); assertEquals(5L, metricTest.get(Metric.MetricType.WRITE_COUNT).longValue()); // ... and many more! }

12. Disadvantages

JSR 352 is powerful, though it is lacking in a number of areas:

  • Il semble y avoir un manque de lecteurs et d'écrivains capables de traiter d'autres formats tels que JSON
  • Il n'y a pas de support des génériques
  • Le partitionnement ne prend en charge qu'une seule étape
  • L'API n'offre rien pour prendre en charge la planification (bien que J2EE dispose d'un module de planification séparé)
  • En raison de sa nature asynchrone, les tests peuvent être un défi
  • L'API est assez verbeuse

13. Conclusion

Dans cet article, nous avons examiné JSR 352 et découvert les blocs, les batchlets, les fractionnements, les flux et bien plus encore. Pourtant, nous avons à peine effleuré la surface.

Comme toujours, le code de démonstration peut être trouvé sur GitHub.