Introduction à Netflix Mantis

1. Vue d'ensemble

Dans cet article, nous allons jeter un œil à la plateforme Mantis développée par Netflix.

Nous explorerons les principaux concepts de Mantis en créant, exécutant et analysant un travail de traitement de flux.

2. Qu'est-ce que la mante?

Mantis est une plateforme de création d'applications de traitement de flux (jobs). Il offre un moyen simple de gérer le déploiement et le cycle de vie des travaux. De plus, il facilite l'allocation des ressources, la découverte et la communication entre ces travaux.

Par conséquent, les développeurs peuvent se concentrer sur la logique métier réelle, tout en bénéficiant du support d'une plate-forme robuste et évolutive pour exécuter leurs applications non bloquantes à volume élevé, à faible latence.

Un travail Mantis se compose de trois parties distinctes:

  • la source , chargée de récupérer les données à partir d'une source externe
  • une ou plusieurs étapes , responsables du traitement des flux d'événements entrants
  • et un puits qui collecte les données traitées

Explorons maintenant chacun d'eux.

3. Configuration et dépendances

Commençons par ajouter les dépendances mantis-runtime et jackson-databind :

 io.mantisrx mantis-runtime   com.fasterxml.jackson.core jackson-databind 

Maintenant, pour configurer la source de données de notre travail, implémentons l' interface Mantis Source :

public class RandomLogSource implements Source { @Override public Observable
    
      call(Context context, Index index) { return Observable.just( Observable .interval(250, TimeUnit.MILLISECONDS) .map(this::createRandomLogEvent)); } private String createRandomLogEvent(Long tick) { // generate a random log entry string ... } }
    

Comme nous pouvons le voir, il génère simplement des entrées de journal aléatoires plusieurs fois par seconde.

4. Notre premier emploi

Créons maintenant un travail Mantis qui collecte simplement les événements de journal de notre RandomLogSource . Plus tard, nous ajouterons des transformations de groupe et d'agrégation pour un résultat plus complexe et intéressant.

Pour commencer, créons une entité LogEvent :

public class LogEvent implements JsonType { private Long index; private String level; private String message; // ... }

Ensuite, ajoutons notre TransformLogStage.

C'est une étape simple qui implémente l'interface ScalarComputation et divise une entrée de journal pour créer un LogEvent . En outre, il filtre toutes les chaînes mal formatées:

public class TransformLogStage implements ScalarComputation { @Override public Observable call(Context context, Observable logEntry) { return logEntry .map(log -> log.split("#")) .filter(parts -> parts.length == 3) .map(LogEvent::new); } }

4.1. Exécution du travail

À ce stade, nous avons suffisamment de blocs de construction pour mettre en place notre travail Mantis:

public class LogCollectingJob extends MantisJobProvider { @Override public Job getJobInstance() { return MantisJob .source(new RandomLogSource()) .stage(new TransformLogStage(), new ScalarToScalar.Config()) .sink(Sinks.eagerSubscribe(Sinks.sse(LogEvent::toJsonString))) .metadata(new Metadata.Builder().build()) .create(); } }

Regardons de plus près notre travail.

Comme nous pouvons le voir, il étend MantisJobProvider. Au début, il récupère les données de notre RandomLogSource et applique le TransformLogStage aux données extraites. Enfin, il envoie les données traitées au récepteur intégré qui s'abonne avec empressement et fournit des données via SSE.

Maintenant, configurons notre travail pour qu'il s'exécute localement au démarrage:

@SpringBootApplication public class MantisApplication implements CommandLineRunner { // ... @Override public void run(String... args) { LocalJobExecutorNetworked.execute(new LogCollectingJob().getJobInstance()); } }

Lançons l'application. Nous verrons un message de journal comme:

... Serving modern HTTP SSE server sink on port: 86XX

Connectons-nous maintenant au lavabo en utilisant curl :

$ curl localhost:86XX data: {"index":86,"level":"WARN","message":"login attempt"} data: {"index":87,"level":"ERROR","message":"user created"} data: {"index":88,"level":"INFO","message":"user created"} data: {"index":89,"level":"INFO","message":"login attempt"} data: {"index":90,"level":"INFO","message":"user created"} data: {"index":91,"level":"ERROR","message":"user created"} data: {"index":92,"level":"WARN","message":"login attempt"} data: {"index":93,"level":"INFO","message":"user created"} ...

4.2. Configurer l'évier

Jusqu'à présent, nous avons utilisé le puits intégré pour collecter nos données traitées. Voyons si nous pouvons ajouter plus de flexibilité à notre scénario en fournissant un récepteur personnalisé.

Et si, par exemple, nous souhaitons filtrer les journaux par message ?

Créons un LogSink qui implémente l' interface Sink :

public class LogSink implements Sink { @Override public void call(Context context, PortRequest portRequest, Observable logEventObservable) { SelfDocumentingSink sink = new ServerSentEventsSink.Builder() .withEncoder(LogEvent::toJsonString) .withPredicate(filterByLogMessage()) .build(); logEventObservable.subscribe(); sink.call(context, portRequest, logEventObservable); } private Predicate filterByLogMessage() { return new Predicate("filter by message", parameters -> { if (parameters != null && parameters.containsKey("filter")) { return logEvent -> logEvent.getMessage().contains(parameters.get("filter").get(0)); } return logEvent -> true; }); } }

Dans cette implémentation de récepteur, nous avons configuré un prédicat qui utilise le paramètre de filtre pour récupérer uniquement les journaux contenant le texte défini dans le paramètre de filtre :

$ curl localhost:8874?filter=login data: {"index":93,"level":"ERROR","message":"login attempt"} data: {"index":95,"level":"INFO","message":"login attempt"} data: {"index":97,"level":"ERROR","message":"login attempt"} ...

Remarque Mantis propose également un puissant langage de requête, MQL, qui peut être utilisé pour interroger, transformer et analyser les données de flux de manière SQL.

5. Chaînage d'étapes

Supposons maintenant que nous souhaitons connaître le nombre d' entrées de journal ERROR , WARN ou INFO que nous avons dans un intervalle de temps donné. Pour cela, nous allons ajouter deux étapes supplémentaires à notre travail et les enchaîner.

5.1. Regroupement

Tout d'abord, créons un GroupLogStage.

This stage is a ToGroupComputation implementation that receives a LogEvent stream data from the existing TransformLogStage. After that, it groups entries by logging level and sends them to the next stage:

public class GroupLogStage implements ToGroupComputation { @Override public Observable
    
      call(Context context, Observable logEvent) { return logEvent.map(log -> new MantisGroup(log.getLevel(), log)); } public static ScalarToGroup.Config config(){ return new ScalarToGroup.Config() .description("Group event data by level") .codec(JacksonCodecs.pojo(LogEvent.class)) .concurrentInput(); } }
    

We've also created a custom stage config by providing a description, the codec to use for serializing the output, and allowed this stage's call method to run concurrently by using concurrentInput().

One thing to note is that this stage is horizontally scalable. Meaning we can run as many instances of this stage as needed. Also worth mentioning, when deployed in a Mantis cluster, this stage sends data to the next stage so that all events belonging to a particular group will land on the same worker of the next stage.

5.2. Aggregating

Before we move on and create the next stage, let's first add a LogAggregate entity:

public class LogAggregate implements JsonType { private final Integer count; private final String level; }

Now, let's create the last stage in the chain.

This stage implements GroupToScalarComputation and transforms a stream of log groups to a scalar LogAggregate. It does this by counting how many times each type of log appears in the stream. In addition, it also has a LogAggregationDuration parameter, which can be used to control the size of the aggregation window:

public class CountLogStage implements GroupToScalarComputation { private int duration; @Override public void init(Context context) { duration = (int)context.getParameters().get("LogAggregationDuration", 1000); } @Override public Observable call(Context context, Observable
    
      mantisGroup) { return mantisGroup .window(duration, TimeUnit.MILLISECONDS) .flatMap(o -> o.groupBy(MantisGroup::getKeyValue) .flatMap(group -> group.reduce(0, (count, value) -> count = count + 1) .map((count) -> new LogAggregate(count, group.getKey())) )); } public static GroupToScalar.Config config(){ return new GroupToScalar.Config() .description("sum events for a log level") .codec(JacksonCodecs.pojo(LogAggregate.class)) .withParameters(getParameters()); } public static List
     
       getParameters() { List
      
        params = new ArrayList(); params.add(new IntParameter() .name("LogAggregationDuration") .description("window size for aggregation in milliseconds") .validator(Validators.range(100, 10000)) .defaultValue(5000) .build()); return params; } }
      
     
    

5.3. Configure and Run the Job

The only thing left to do now is to configure our job:

public class LogAggregationJob extends MantisJobProvider { @Override public Job getJobInstance() { return MantisJob .source(new RandomLogSource()) .stage(new TransformLogStage(), TransformLogStage.stageConfig()) .stage(new GroupLogStage(), GroupLogStage.config()) .stage(new CountLogStage(), CountLogStage.config()) .sink(Sinks.eagerSubscribe(Sinks.sse(LogAggregate::toJsonString))) .metadata(new Metadata.Builder().build()) .create(); } }

As soon as we run the application and execute our new job, we can see the log counts being retrieved every few seconds:

$ curl localhost:8133 data: {"count":3,"level":"ERROR"} data: {"count":13,"level":"INFO"} data: {"count":4,"level":"WARN"} data: {"count":8,"level":"ERROR"} data: {"count":5,"level":"INFO"} data: {"count":7,"level":"WARN"} ...

6. Conclusion

To sum up, in this article, we've seen what Netflix Mantis is and what it can be used for. Furthermore, we looked at the main concepts, used them to build jobs, and explored custom configurations for different scenarios.

Comme toujours, le code complet est disponible à l'adresse over sur GitHub.