Présentation de Project Reactor Bus

1. Vue d'ensemble

Dans cet article rapide, nous présenterons le bus-réacteur en mettant en place un scénario réel pour une application réactive et événementielle.

2. Les bases du projet Reactor

2.1. Pourquoi Reactor?

Les applications modernes doivent traiter un grand nombre de demandes simultanées et traiter une quantité importante de données. Le code de blocage standard n'est plus suffisant pour répondre à ces exigences.

Le modèle de conception réactive est une approche architecturale basée sur les événements pour la gestion asynchrone d'un grand volume de demandes de service simultanées provenant d'un ou de plusieurs gestionnaires de services.

Le Project Reactor est basé sur ce modèle et a pour objectif clair et ambitieux de créer des applications réactives non bloquantes sur la JVM .

2.2. Exemples de scénarios

Avant de commencer, voici quelques scénarios intéressants où tirer parti du style architectural réactif aurait du sens, juste pour avoir une idée de l'endroit où nous pourrions l'appliquer:

  • Services de notification pour une grande plateforme d'achat en ligne comme Amazon
  • D'énormes services de traitement des transactions pour le secteur bancaire
  • Sociétés de négoce d'actions où les prix des actions changent simultanément

3. Dépendances de Maven

Commençons à utiliser Project Reactor Bus en ajoutant la dépendance suivante dans notre pom.xml:

 io.projectreactor reactor-bus 2.0.8.RELEASE 

Nous pouvons vérifier la dernière version du réacteur-bus dans Maven Central.

4. Création d'une application de démonstration

Pour mieux comprendre les avantages de l'approche par réacteur, prenons un exemple pratique.

Nous allons créer une application simple chargée d'envoyer des notifications aux utilisateurs d'une plateforme d'achat en ligne. Par exemple, si un utilisateur passe une nouvelle commande, l'application envoie une confirmation de commande par e-mail ou SMS.

Une implémentation synchrone typique serait naturellement limitée par le débit du service de messagerie électronique ou SMS. Par conséquent, les pics de trafic, tels que les vacances, seraient généralement problématiques.

Avec une approche réactive, nous pouvons concevoir notre système pour qu'il soit plus flexible et pour mieux s'adapter aux pannes ou aux délais d'attente qui peuvent survenir dans les systèmes externes, tels que les serveurs de passerelle.

Jetons un coup d'œil à l'application - en commençant par les aspects les plus traditionnels et en passant aux constructions les plus réactives.

4.1. POJO simple

Tout d'abord, créons une classe POJO pour représenter les données de notification:

public class NotificationData { private long id; private String name; private String email; private String mobile; // getter and setter methods }

4.2. La couche de service

Définissons maintenant une couche de service simple:

public interface NotificationService { void initiateNotification(NotificationData notificationData) throws InterruptedException; }

Et la mise en œuvre, simulant une opération de longue durée:

@Service public class NotificationServiceimpl implements NotificationService { @Override public void initiateNotification(NotificationData notificationData) throws InterruptedException { System.out.println("Notification service started for " + "Notification ID: " + notificationData.getId()); Thread.sleep(5000); System.out.println("Notification service ended for " + "Notification ID: " + notificationData.getId()); } }

Notez que pour illustrer un scénario réel d'envoi de messages via une passerelle SMS ou e-mail, nous introduisons intentionnellement un délai de cinq secondes dans la méthode initiateNotification avec Thread.sleep (5000).

Par conséquent, lorsqu'un thread atteint le service, il sera bloqué pendant cinq secondes.

4.3. Le consommateur

Passons maintenant aux aspects les plus réactifs de notre application et implémentons un consommateur - que nous mapperons ensuite au bus d'événements du réacteur:

@Service public class NotificationConsumer implements Consumer
    
      { @Autowired private NotificationService notificationService; @Override public void accept(Event notificationDataEvent) { NotificationData notificationData = notificationDataEvent.getData(); try { notificationService.initiateNotification(notificationData); } catch (InterruptedException e) { // ignore } } }
    

Comme nous pouvons le voir, le consommateur que nous avons créé implémente l' interface Consumer . La logique principale réside dans la méthode accept .

C'est une approche similaire que nous pouvons rencontrer dans une implémentation d'écouteur Spring typique.

4.4. Le controlle

Enfin, maintenant que nous sommes en mesure de consommer les événements, générons-les également.

Nous allons le faire dans un simple contrôleur:

@Controller public class NotificationController { @Autowired private EventBus eventBus; @GetMapping("/startNotification/{param}") public void startNotification(@PathVariable Integer param) { for (int i = 0; i < param; i++) { NotificationData data = new NotificationData(); data.setId(i); eventBus.notify("notificationConsumer", Event.wrap(data)); System.out.println( "Notification " + i + ": notification task submitted successfully"); } } }

C'est assez explicite - nous émettons des événements via l' EventBus ici.

Par exemple, si un client accède à l'URL avec une valeur de paramètre de dix, dix événements seront envoyés via le bus d'événements.

4.5. La configuration Java

Mettons maintenant tout ensemble et créons une simple application Spring Boot.

Tout d'abord, nous devons configurer les beans EventBus et Environment :

@Configuration public class Config { @Bean public Environment env() { return Environment.initializeIfEmpty().assignErrorJournal(); } @Bean public EventBus createEventBus(Environment env) { return EventBus.create(env, Environment.THREAD_POOL); } }

Dans notre cas, nous instancions l' EventBus avec un pool de threads par défaut disponible dans l'environnement .

Alternativement, nous pouvons utiliser une instance Dispatcher personnalisée :

EventBus evBus = EventBus.create( env, Environment.newDispatcher( REACTOR_CAPACITY,REACTOR_CONSUMERS_COUNT, DispatcherType.THREAD_POOL_EXECUTOR));

Maintenant, nous sommes prêts à créer un code d'application principal:

import static reactor.bus.selector.Selectors.$; @SpringBootApplication public class NotificationApplication implements CommandLineRunner { @Autowired private EventBus eventBus; @Autowired private NotificationConsumer notificationConsumer; @Override public void run(String... args) throws Exception { eventBus.on($("notificationConsumer"), notificationConsumer); } public static void main(String[] args) { SpringApplication.run(NotificationApplication.class, args); } }

Dans notre méthode d' exécution , nous enregistrons le notificationConsumer pour qu'il soit déclenché lorsque la notification correspond à un sélecteur donné .

Remarquez comment nous utilisons l'importation statique de l' attribut $ pour créer un objet Selector .

5. Testez l'application

Créons maintenant un test pour voir notre NotificationApplication en action:

@RunWith(SpringRunner.class) @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) public class NotificationApplicationIntegrationTest { @LocalServerPort private int port; @Test public void givenAppStarted_whenNotificationTasksSubmitted_thenProcessed() { RestTemplate restTemplate = new RestTemplate(); restTemplate.getForObject("//localhost:" + port + "/startNotification/10", String.class); } }

Comme nous pouvons le voir, dès que la requête est exécutée, les dix tâches sont soumises instantanément sans créer de blocage . Et une fois soumis, les événements de notification sont traités en parallèle.

Notification 0: notification task submitted successfully Notification 1: notification task submitted successfully Notification 2: notification task submitted successfully Notification 3: notification task submitted successfully Notification 4: notification task submitted successfully Notification 5: notification task submitted successfully Notification 6: notification task submitted successfully Notification 7: notification task submitted successfully Notification 8: notification task submitted successfully Notification 9: notification task submitted successfully Notification service started for Notification ID: 1 Notification service started for Notification ID: 2 Notification service started for Notification ID: 3 Notification service started for Notification ID: 0 Notification service ended for Notification ID: 1 Notification service ended for Notification ID: 0 Notification service started for Notification ID: 4 Notification service ended for Notification ID: 3 Notification service ended for Notification ID: 2 Notification service started for Notification ID: 6 Notification service started for Notification ID: 5 Notification service started for Notification ID: 7 Notification service ended for Notification ID: 4 Notification service started for Notification ID: 8 Notification service ended for Notification ID: 6 Notification service ended for Notification ID: 5 Notification service started for Notification ID: 9 Notification service ended for Notification ID: 7 Notification service ended for Notification ID: 8 Notification service ended for Notification ID: 9

Il est important de garder à l'esprit que dans notre scénario, il n'est pas nécessaire de traiter ces événements dans un ordre particulier.

6. Conclusion

Dans ce tutoriel rapide, nous avons créé une application simple basée sur les événements . Nous avons également vu comment commencer à écrire un code plus réactif et non bloquant.

Cependant, ce scénario ne fait qu'effleurer la surface du sujet et ne représente qu'une bonne base pour commencer à expérimenter le paradigme réactif .

Comme toujours, le code source est disponible à l'adresse over sur GitHub.