Intégration de Spring avec AWS Kinesis

1. Introduction

Kinesis est un outil de collecte, de traitement et d'analyse des flux de données en temps réel, développé chez Amazon. L'un de ses principaux avantages est qu'il aide au développement d'applications événementielles.

Dans ce didacticiel, nous explorerons quelques bibliothèques qui permettent à notre application Spring de produire et de consommer des enregistrements à partir d'un flux Kinesis . Les exemples de code montrent les fonctionnalités de base mais ne représentent pas le code prêt pour la production.

2. Prérequis

Avant d'aller plus loin, nous devons faire deux choses.

Le premier est de créer un projet Spring, car l'objectif ici est d'interagir avec Kinesis à partir d'un projet Spring.

La seconde consiste à créer un flux de données Kinesis. Nous pouvons le faire à partir d'un navigateur Web dans notre compte AWS. Une alternative pour les fans de l'AWS CLI parmi nous consiste à utiliser la ligne de commande. Comme nous interagirons avec lui à partir du code, nous devons également avoir à portée de main les informations d'identification AWS IAM, la clé d'accès et la clé secrète, ainsi que la région.

Tous nos producteurs créeront des enregistrements d'adresses IP factices, tandis que les consommateurs liront ces valeurs et les listeront dans la console d'application.

3. AWS SDK pour Java

La toute première bibliothèque que nous utiliserons est l'AWS SDK for Java. Son avantage est qu'il nous permet de gérer de nombreuses parties du travail avec Kinesis Data Streams. Nous pouvons lire des données, produire des données, créer des flux de données et reshardir des flux de données . L'inconvénient est que pour avoir du code prêt pour la production, nous devrons coder des aspects tels que le redistribution, la gestion des erreurs ou un démon pour maintenir le consommateur en vie.

3.1. Dépendance de Maven

La dépendance Maven amazon-kinesis-client apportera tout ce dont nous avons besoin pour avoir des exemples fonctionnels. Nous allons maintenant l'ajouter à notre fichier pom.xml :

 com.amazonaws amazon-kinesis-client 1.11.2 

3.2. Configuration du printemps

Réutilisons l' objet AmazonKinesis nécessaire pour interagir avec notre flux Kinesis. Nous allons le créer en tant que @Bean dans notre classe @SpringBootApplication :

@Bean public AmazonKinesis buildAmazonKinesis() { BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey); return AmazonKinesisClientBuilder.standard() .withCredentials(new AWSStaticCredentialsProvider(awsCredentials)) .withRegion(Regions.EU_CENTRAL_1) .build(); }

Ensuite, définissons les aws.access.key et aws.secret.key , nécessaires pour la machine locale, dans application.properties :

aws.access.key=my-aws-access-key-goes-here aws.secret.key=my-aws-secret-key-goes-here

Et nous les lirons à l'aide de l' annotation @Value :

@Value("${aws.access.key}") private String accessKey; @Value("${aws.secret.key}") private String secretKey;

Par souci de simplicité, nous allons nous appuyer sur les méthodes @Scheduled pour créer et consommer des enregistrements.

3.3. Consommateur

Le AWS SDK Kinesis Consumer utilise un modèle d'extraction , ce qui signifie que notre code tirera des enregistrements à partir des fragments du flux de données Kinesis:

GetRecordsRequest recordsRequest = new GetRecordsRequest(); recordsRequest.setShardIterator(shardIterator.getShardIterator()); recordsRequest.setLimit(25); GetRecordsResult recordsResult = kinesis.getRecords(recordsRequest); while (!recordsResult.getRecords().isEmpty()) { recordsResult.getRecords().stream() .map(record -> new String(record.getData().array())) .forEach(System.out::println); recordsRequest.setShardIterator(recordsResult.getNextShardIterator()); recordsResult = kinesis.getRecords(recordsRequest); }

L' objet GetRecordsRequest génère la demande de données de flux . Dans notre exemple, nous avons défini une limite de 25 enregistrements par requête, et nous continuons à lire jusqu'à ce qu'il n'y ait plus rien à lire.

Nous pouvons également remarquer que, pour notre itération, nous avons utilisé un objet GetShardIteratorResult . Nous avons créé cet objet dans une méthode @PostConstruc t afin de commencer à suivre les enregistrements immédiatement:

private GetShardIteratorResult shardIterator; @PostConstruct private void buildShardIterator() { GetShardIteratorRequest readShardsRequest = new GetShardIteratorRequest(); readShardsRequest.setStreamName(IPS_STREAM); readShardsRequest.setShardIteratorType(ShardIteratorType.LATEST); readShardsRequest.setShardId(IPS_SHARD_ID); this.shardIterator = kinesis.getShardIterator(readShardsRequest); }

3.4. Producteur

Voyons maintenant comment gérer la création d'enregistrements pour notre flux de données Kinesis .

Nous insérons des données à l'aide d'un objet PutRecordsRequest . Pour ce nouvel objet, nous ajoutons une liste qui comprend plusieurs objets PutRecordsRequestEntry :

List entries = IntStream.range(1, 200).mapToObj(ipSuffix -> { PutRecordsRequestEntry entry = new PutRecordsRequestEntry(); entry.setData(ByteBuffer.wrap(("192.168.0." + ipSuffix).getBytes())); entry.setPartitionKey(IPS_PARTITION_KEY); return entry; }).collect(Collectors.toList()); PutRecordsRequest createRecordsRequest = new PutRecordsRequest(); createRecordsRequest.setStreamName(IPS_STREAM); createRecordsRequest.setRecords(entries); kinesis.putRecords(createRecordsRequest);

We've created a basic consumer and a producer of simulated IP records. All that's left to do now is to run our Spring project and see IPs listed in our application console.

4. KCL and KPL

Kinesis Client Library (KCL) is a library that simplifies the consuming of records. It’s also a layer of abstraction over the AWS SDK Java APIs for Kinesis Data Streams. Behind the scenes, the library handles load balancing across many instances, responding to instance failures, checkpointing processed records, and reacting to resharding.

Kinesis Producer Library (KPL) is a library useful for writing to a Kinesis data stream. It also provides a layer of abstraction that sits over the AWS SDK Java APIs for Kinesis Data Streams. For better performance, the library automatically handles batching, multi-threading, and retry logic.

KCL and KPL both have the main advantage that they're easy to use so that we can focus on producing and consuming records.

4.1. Maven Dependencies

The two libraries can be brought separately in our project if needed. To include KPL and KCL in our Maven project, we need to update our pom.xml file:

 com.amazonaws amazon-kinesis-producer 0.13.1   com.amazonaws amazon-kinesis-client 1.11.2 

4.2. Spring Setup

The only Spring preparation we need is to make sure we have the IAM credentials at hand. The values for aws.access.key and aws.secret.key are defined in our application.properties file so we can read them with @Value when needed.

4.3. Consumer

First, we'll create a class that implements the IRecordProcessor interface and defines our logic for how to handle Kinesis data stream records, which is to print them in the console:

public class IpProcessor implements IRecordProcessor { @Override public void initialize(InitializationInput initializationInput) { } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { processRecordsInput.getRecords() .forEach(record -> System.out.println(new String(record.getData().array()))); } @Override public void shutdown(ShutdownInput shutdownInput) { } }

The next step is to define a factory class that implements the IRecordProcessorFactory interface and returns a previously created IpProcessor object:

public class IpProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new IpProcessor(); } }

And now for the final step, we’ll use a Worker object to define our consumer pipeline. We need a KinesisClientLibConfiguration object that will define, if needed, the IAM Credentials and AWS Region.

We’ll pass the KinesisClientLibConfiguration, and our IpProcessorFactory object, to our Worker and then start it in a separate thread. We keep this logic of consuming records always alive with the use of the Worker class, so we’re continuously reading new records now:

BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey); KinesisClientLibConfiguration consumerConfig = new KinesisClientLibConfiguration( APP_NAME, IPS_STREAM, new AWSStaticCredentialsProvider(awsCredentials), IPS_WORKER) .withRegionName(Regions.EU_CENTRAL_1.getName()); final Worker worker = new Worker.Builder() .recordProcessorFactory(new IpProcessorFactory()) .config(consumerConfig) .build(); CompletableFuture.runAsync(worker.run());

4.4. Producer

Let's now define the KinesisProducerConfiguration object, adding the IAM Credentials and the AWS Region:

BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey); KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration() .setCredentialsProvider(new AWSStaticCredentialsProvider(awsCredentials)) .setVerifyCertificate(false) .setRegion(Regions.EU_CENTRAL_1.getName()); this.kinesisProducer = new KinesisProducer(producerConfig);

We'll include the kinesisProducer object previously created in a @Scheduled job and produce records for our Kinesis data stream continuously:

IntStream.range(1, 200).mapToObj(ipSuffix -> ByteBuffer.wrap(("192.168.0." + ipSuffix).getBytes())) .forEach(entry -> kinesisProducer.addUserRecord(IPS_STREAM, IPS_PARTITION_KEY, entry));

5. Spring Cloud Stream Binder Kinesis

We’ve already seen two libraries, both created outside of the Spring ecosystem. We’ll now see how the Spring Cloud Stream Binder Kinesis can simplify our life further while building on top of Spring Cloud Stream.

5.1. Maven Dependency

The Maven dependency we need to define in our application for the Spring Cloud Stream Binder Kinesis is:

 org.springframework.cloud spring-cloud-stream-binder-kinesis 1.2.1.RELEASE 

5.2. Spring Setup

When running on EC2, the required AWS properties are automatically discovered, so there is no need to define them. Since we're running our examples on a local machine, we need to define our IAM access key, secret key, and region for our AWS account. We've also disabled the automatic CloudFormation stack name detection for the application:

cloud.aws.credentials.access-key=my-aws-access-key cloud.aws.credentials.secret-key=my-aws-secret-key cloud.aws.region.static=eu-central-1 cloud.aws.stack.auto=false

Spring Cloud Stream is bundled with three interfaces that we can use in our stream binding:

  • The Sink is for data ingestion
  • The Source is used for publishing records
  • The Processor is a combination of both

We can also define our own interfaces if we need to.

5.3. Consumer

Defining a consumer is a two-part job. First, we'll define, in the application.properties, the data stream from which we'll consume:

spring.cloud.stream.bindings.input.destination=live-ips spring.cloud.stream.bindings.input.group=live-ips-group spring.cloud.stream.bindings.input.content-type=text/plain

And next, let's define a Spring @Component class. The annotation @EnableBinding(Sink.class) will allow us to read from the Kinesis stream using the method annotated with @StreamListener(Sink.INPUT):

@EnableBinding(Sink.class) public class IpConsumer { @StreamListener(Sink.INPUT) public void consume(String ip) { System.out.println(ip); } }

5.4. Producer

The producer can also be split in two. First, we have to define our stream properties inside application.properties:

spring.cloud.stream.bindings.output.destination=live-ips spring.cloud.stream.bindings.output.content-type=text/plain

And then we add @EnableBinding(Source.class) on a Spring @Component and create new test messages every few seconds:

@Component @EnableBinding(Source.class) public class IpProducer { @Autowired private Source source; @Scheduled(fixedDelay = 3000L) private void produce() { IntStream.range(1, 200).mapToObj(ipSuffix -> "192.168.0." + ipSuffix) .forEach(entry -> source.output().send(MessageBuilder.withPayload(entry).build())); } }

C'est tout ce dont nous avons besoin pour que Spring Cloud Stream Binder Kinesis fonctionne. Nous pouvons simplement démarrer l'application maintenant.

6. Conclusion

Dans cet article, nous avons vu comment intégrer notre projet Spring avec deux bibliothèques AWS pour interagir avec un flux de données Kinesis. Nous avons également vu comment utiliser la bibliothèque Spring Cloud Stream Binder Kinesis pour rendre la mise en œuvre encore plus facile.

Le code source de cet article se trouve à l'adresse over sur Github.