Introduction à MBassador

1. Vue d'ensemble

En termes simples, MBassador est un bus d'événements haute performance utilisant la sémantique de publication-abonnement.

Les messages sont diffusés à un ou plusieurs homologues sans que l'on sache au préalable combien d'abonnés il y a ou comment ils utilisent le message.

2. Dépendance de Maven

Avant de pouvoir utiliser la bibliothèque, nous devons ajouter la dépendance mbassador:

 net.engio mbassador 1.3.1 

3. Gestion des événements de base

3.1. Exemple simple

Nous allons commencer par un exemple simple de publication d'un message:

private MBassador dispatcher = new MBassador(); private String messageString; @Before public void prepareTests() { dispatcher.subscribe(this); } @Test public void whenStringDispatched_thenHandleString() { dispatcher.post("TestString").now(); assertNotNull(messageString); assertEquals("TestString", messageString); } @Handler public void handleString(String message) { messageString = message; } 

En haut de cette classe de test, nous voyons la création d'un MBassador avec son constructeur par défaut. Ensuite, dans la méthode @Before , nous appelons subscribe () et passons une référence à la classe elle-même.

Dans subscribe (), le répartiteur inspecte l'abonné pour les annotations @Handler .

Et, dans le premier test, nous appelons dispatcher.post (…) .now () pour envoyer le message - ce qui entraîne l' appel de handleString () .

Ce test initial démontre plusieurs concepts importants. Tout objet peut être un abonné, à condition qu'il ait une ou plusieurs méthodes annotées avec @Handler . Un abonné peut avoir n'importe quel nombre de gestionnaires.

Nous utilisons des objets de test qui s'abonnent à eux-mêmes par souci de simplicité, mais dans la plupart des scénarios de production, les répartiteurs de messages seront dans des classes différentes de celles des consommateurs.

Les méthodes de gestion n'ont qu'un seul paramètre d'entrée - le message, et ne peuvent pas lever d'exceptions vérifiées.

Semblable à la méthode subscribe () , la méthode post accepte n'importe quel objet . Cet objet est livré aux abonnés.

Lorsqu'un message est publié, il est remis à tous les écouteurs qui se sont abonnés au type de message.

Ajoutons un autre gestionnaire de messages et envoyons un type de message différent:

private Integer messageInteger; @Test public void whenIntegerDispatched_thenHandleInteger() { dispatcher.post(42).now(); assertNull(messageString); assertNotNull(messageInteger); assertTrue(42 == messageInteger); } @Handler public void handleInteger(Integer message) { messageInteger = message; } 

Comme prévu, lorsque nous expédionsun Integer , handleInteger () est appelé et handleString () ne l'est pas. Un seul répartiteur peut être utilisé pour envoyer plus d'un type de message.

3.2. Messages morts

Alors, où va un message lorsqu'il n'y a pas de gestionnaire pour lui? Ajoutons un nouveau gestionnaire d'événements, puis envoyons un troisième type de message:

private Object deadEvent; @Test public void whenLongDispatched_thenDeadEvent() { dispatcher.post(42L).now(); assertNull(messageString); assertNull(messageInteger); assertNotNull(deadEvent); assertTrue(deadEvent instanceof Long); assertTrue(42L == (Long) deadEvent); } @Handler public void handleDeadEvent(DeadMessage message) { deadEvent = message.getMessage(); } 

Dans ce test, nous distribuons un Long au lieu d'un Integer. Ni handleInteger () ni handleString () ne sont appelés, mais handleDeadEvent () l' est.

Lorsqu'il n'y a aucun gestionnaire pour un message, il est encapsulé dans un objet DeadMessage . Depuis que nous avons ajouté un gestionnaire pour Deadmessage , nous le capturons.

DeadMessage peut être ignoré en toute sécurité; si une application n'a pas besoin de suivre les messages morts, elle peut être autorisée à aller nulle part.

4. Utilisation d'une hiérarchie d'événements

L'envoi d' événements String et Integer est limité. Créons quelques classes de messages:

public class Message {} public class AckMessage extends Message {} public class RejectMessage extends Message { int code; // setters and getters }

Nous avons une classe de base simple et deux classes qui l'étendent.

4.1. Envoi d'un message de classe de base

Nous allons commencer par les événements Message :

private MBassador dispatcher = new MBassador(); private Message message; private AckMessage ackMessage; private RejectMessage rejectMessage; @Before public void prepareTests() { dispatcher.subscribe(this); } @Test public void whenMessageDispatched_thenMessageHandled() { dispatcher.post(new Message()).now(); assertNotNull(message); assertNull(ackMessage); assertNull(rejectMessage); } @Handler public void handleMessage(Message message) { this.message = message; } @Handler public void handleRejectMessage(RejectMessage message) { rejectMessage = message; } @Handler public void handleAckMessage(AckMessage message) { ackMessage = message; }

Découvrez MBassador - un bus événementiel ultra-performant. Cela nous limite à l'utilisation de Messages mais ajoute une couche supplémentaire de sécurité de type.

Lorsque nous envoyons un message , handleMessage () le reçoit. Les deux autres gestionnaires ne le font pas.

4.2. Envoi d'un message de sous-classe

Envoyons un RejectMessage :

@Test public void whenRejectDispatched_thenMessageAndRejectHandled() { dispatcher.post(new RejectMessage()).now(); assertNotNull(message); assertNotNull(rejectMessage); assertNull(ackMessage); }

Lorsque nous envoyons un RejectMessage à la fois handleRejectMessage () et handleMessage () recevoir.

Étant donné que RejectMessage étend Message, le gestionnaire de messages l'a reçu, en plus du gestionnaire R ejectMessage .

Vérifions ce comportement avec un AckMessage :

@Test public void whenAckDispatched_thenMessageAndAckHandled() { dispatcher.post(new AckMessage()).now(); assertNotNull(message); assertNotNull(ackMessage); assertNull(rejectMessage); }

Comme nous nous y attendions, lorsque nous envoyons un AckMessage , handleAckMessage () et handleMessage () le reçoivent.

5. Filtrage des messages

L'organisation des messages par type est déjà une fonctionnalité puissante, mais nous pouvons les filtrer encore plus.

5.1. Filtrer par classe et sous-classe

Lorsque nous avons publié un RejectMessage ou un AckMessage , nous avons reçu l'événement à la fois dans le gestionnaire d'événements pour le type particulier et dans la classe de base.

We can solve this type hierarchy issue by making Message abstract and creating a class such as GenericMessage. But what if we don't have this luxury?

We can use message filters:

private Message baseMessage; private Message subMessage; @Test public void whenMessageDispatched_thenMessageFiltered() { dispatcher.post(new Message()).now(); assertNotNull(baseMessage); assertNull(subMessage); } @Test public void whenRejectDispatched_thenRejectFiltered() { dispatcher.post(new RejectMessage()).now(); assertNotNull(subMessage); assertNull(baseMessage); } @Handler(filters = { @Filter(Filters.RejectSubtypes.class) }) public void handleBaseMessage(Message message) { this.baseMessage = message; } @Handler(filters = { @Filter(Filters.SubtypesOnly.class) }) public void handleSubMessage(Message message) { this.subMessage = message; }

The filters parameter for the @Handler annotation accepts a Class that implements IMessageFilter. The library offers two examples:

The Filters.RejectSubtypes does as its name suggests: it will filter out any subtypes. In this case, we see that RejectMessage is not handled by handleBaseMessage().

The Filters.SubtypesOnly also does as its name suggests: it will filter out any base types. In this case, we see that Message is not handled by handleSubMessage().

5.2. IMessageFilter

The Filters.RejectSubtypes and the Filters.SubtypesOnly both implement IMessageFilter.

RejectSubTypes compares the class of the message to its defined message types and will only allow through messages that equal one of its types, as opposed to any subclasses.

5.3. Filter With Conditions

Fortunately, there is an easier way of filtering messages. MBassador supports a subset of Java EL expressions as conditions for filtering messages.

Let's filter a String message based on its length:

private String testString; @Test public void whenLongStringDispatched_thenStringFiltered() { dispatcher.post("foobar!").now(); assertNull(testString); } @Handler(condition = "msg.length() < 7") public void handleStringMessage(String message) { this.testString = message; }

The “foobar!” message is seven characters long and is filtered. Let's send a shorter String:

 @Test public void whenShortStringDispatched_thenStringHandled() { dispatcher.post("foobar").now(); assertNotNull(testString); }

Now, the “foobar” is only six characters long and is passed through.

Our RejectMessage contains a field with an accessor. Let's write a filter for that:

private RejectMessage rejectMessage; @Test public void whenWrongRejectDispatched_thenRejectFiltered() { RejectMessage testReject = new RejectMessage(); testReject.setCode(-1); dispatcher.post(testReject).now(); assertNull(rejectMessage); assertNotNull(subMessage); assertEquals(-1, ((RejectMessage) subMessage).getCode()); } @Handler(condition = "msg.getCode() != -1") public void handleRejectMessage(RejectMessage rejectMessage) { this.rejectMessage = rejectMessage; }

Here again, we can query a method on an object and either filter the message or not.

5.4. Capture Filtered Messages

Similar to DeadEvents, we may want to capture and process filtered messages. There is a dedicated mechanism for capturing filtered events too. Filtered events are treated differently from “dead” events.

Let's write a test that illustrates this:

private String testString; private FilteredMessage filteredMessage; private DeadMessage deadMessage; @Test public void whenLongStringDispatched_thenStringFiltered() { dispatcher.post("foobar!").now(); assertNull(testString); assertNotNull(filteredMessage); assertTrue(filteredMessage.getMessage() instanceof String); assertNull(deadMessage); } @Handler(condition = "msg.length() < 7") public void handleStringMessage(String message) { this.testString = message; } @Handler public void handleFilterMessage(FilteredMessage message) { this.filteredMessage = message; } @Handler public void handleDeadMessage(DeadMessage deadMessage) { this.deadMessage = deadMessage; } 

With the addition of a FilteredMessage handler, we can track Strings that are filtered because of their length. The filterMessage contains our too-long String while deadMessage remains null.

6. Asynchronous Message Dispatch and Handling

So far all of our examples have used synchronous message dispatch; when we called post.now() the messages were delivered to each handler in the same thread we called post() from.

6.1. Asynchronous Dispatch

The MBassador.post() returns a SyncAsyncPostCommand. This class offers several methods, including:

  • now() – dispatch messages synchronously; the call will block until all messages have been delivered
  • asynchronously() – executes the message publication asynchronously

Let's use asynchronous dispatch in a sample class. We'll use Awaitility in these tests to simplify the code:

private MBassador dispatcher = new MBassador(); private String testString; private AtomicBoolean ready = new AtomicBoolean(false); @Test public void whenAsyncDispatched_thenMessageReceived() { dispatcher.post("foobar").asynchronously(); await().untilAtomic(ready, equalTo(true)); assertNotNull(testString); } @Handler public void handleStringMessage(String message) { this.testString = message; ready.set(true); }

We call asynchronously() in this test, and use an AtomicBoolean as a flag with await() to wait for the delivery thread to deliver the message.

If we comment out the call to await(), we risk the test failing, because we check testString before the delivery thread completes.

6.2. Asynchronous Handler Invocation

Asynchronous dispatch allows the message provider to return to message processing before the messages are delivered to each handler, but it still calls each handler in order, and each handler has to wait for the previous to finish.

This can lead to problems if one handler performs an expensive operation.

MBassador provides a mechanism for asynchronous handler invocation. Handlers configured for this receive messages in their thread:

private Integer testInteger; private String invocationThreadName; private AtomicBoolean ready = new AtomicBoolean(false); @Test public void whenHandlerAsync_thenHandled() { dispatcher.post(42).now(); await().untilAtomic(ready, equalTo(true)); assertNotNull(testInteger); assertFalse(Thread.currentThread().getName().equals(invocationThreadName)); } @Handler(delivery = Invoke.Asynchronously) public void handleIntegerMessage(Integer message) { this.invocationThreadName = Thread.currentThread().getName(); this.testInteger = message; ready.set(true); }

Handlers can request asynchronous invocation with the delivery = Invoke.Asynchronously property on the Handler annotation. We verify this in our test by comparing the Thread names in the dispatching method and the handler.

7. Customizing MBassador

So far we've been using an instance of MBassador with its default configuration. The dispatcher's behavior can be modified with annotations, similar to those we have seen so far; we'll cover a few more to finish this tutorial.

7.1. Exception Handling

Handlers cannot define checked exceptions. Instead, the dispatcher can be provided with an IPublicationErrorHandler as an argument to its constructor:

public class MBassadorConfigurationTest implements IPublicationErrorHandler { private MBassador dispatcher; private String messageString; private Throwable errorCause; @Before public void prepareTests() { dispatcher = new MBassador(this); dispatcher.subscribe(this); } @Test public void whenErrorOccurs_thenErrorHandler() { dispatcher.post("Error").now(); assertNull(messageString); assertNotNull(errorCause); } @Test public void whenNoErrorOccurs_thenStringHandler() { dispatcher.post("Error").now(); assertNull(errorCause); assertNotNull(messageString); } @Handler public void handleString(String message) { if ("Error".equals(message)) { throw new Error("BOOM"); } messageString = message; } @Override public void handleError(PublicationError error) { errorCause = error.getCause().getCause(); } }

When handleString() throws an Error, it is saved to errorCause.

7.2. Handler Priority

Handlers are called in reverse order of how they are added, but this isn't behavior we want to rely on. Even with the ability to call handlers in their threads, we may still need to know what order they will be called in.

We can set handler priority explicitly:

private LinkedList list = new LinkedList(); @Test public void whenRejectDispatched_thenPriorityHandled() { dispatcher.post(new RejectMessage()).now(); // Items should pop() off in reverse priority order assertTrue(1 == list.pop()); assertTrue(3 == list.pop()); assertTrue(5 == list.pop()); } @Handler(priority = 5) public void handleRejectMessage5(RejectMessage rejectMessage) { list.push(5); } @Handler(priority = 3) public void handleRejectMessage3(RejectMessage rejectMessage) { list.push(3); } @Handler(priority = 2, rejectSubtypes = true) public void handleMessage(Message rejectMessage) logger.error("Reject handler #3"); list.push(3); } @Handler(priority = 0) public void handleRejectMessage0(RejectMessage rejectMessage) { list.push(1); } 

Handlers are called from highest priority to lowest. Handlers with the default priority, which is zero, are called last. We see that the handler numbers pop() off in reverse order.

7.3. Reject Subtypes, the Easy Way

What happened to handleMessage() in the test above? We don't have to use RejectSubTypes.class to filter our sub types.

RejectSubTypes is a boolean flag that provides the same filtering as the class, but with better performance than the IMessageFilter implementation.

We still need to use the filter-based implementation for accepting subtypes only, though.

8. Conclusion

MBassador est une bibliothèque simple et directe pour passer des messages entre objets. Les messages peuvent être organisés de différentes manières et peuvent être distribués de manière synchrone ou asynchrone.

Et, comme toujours, l'exemple est disponible dans ce projet GitHub.