Added FilteredMessage, interface optimization, refactorings

This commit is contained in:
benni 2013-03-04 09:14:08 +01:00
parent eb50dbc96e
commit e7a76287cb
51 changed files with 541 additions and 299 deletions

117
README.md
View File

@ -5,8 +5,7 @@ MBassador is a very light-weight message (event) bus implementation following th
for ease of use and aims to be feature rich and extensible while preserving resource efficiency and performance. It uses a specialized
data structure to allow high throughput for concurrent access.
Read this documentation to get an overview of its features and how cool this message (event) bus actually is.
You can also check out the <a href="http://codeblock.engio.net/?p=37" target="_blank">performance comparison</a>
Read this documentation to get an overview of its features. You can also check out the <a href="http://codeblock.engio.net/?p=37" target="_blank">performance comparison</a>
which also contains a partial list of the features of the compared implementations.
The current version is 1.1.1 and it is available from the Maven Central Repository. See the release notes for more details.
@ -15,6 +14,7 @@ Table of contents:
+ [Features](#features)
+ [Usage](#usage)
+ [Installation](#installation)
+ [Wiki](#wiki)
+ [Release Notes](#releasenotes)
+ [Roadmap](#roadmap)
+ [Credits](#credits)
@ -26,26 +26,33 @@ Table of contents:
At its core MBassador offers the following features:
+ <em><strong>Annotation driven</em></strong>: To define and customize a message handler simply mark it with @Listener annotation
+ <em><strong>Delivers everything</em></strong>: Messages must not implement any interface and can be of any type (-> message bus is typed using generics with upper
bound being Object.class). The class hierarchy of a message is considered during message delivery. This means that listeners will also receive
subtypes of the message type they are listening for, e.g. a listener for Object.class receives everything.
+ <em><strong>Annotation driven</em></strong>: To define and customize a message handler simply mark it with @Handler annotation
+ <em><strong>Delivers everything</em></strong>: Messages must not implement any interface and can be of any type. It is
possible though to define an upper bound of the message type using generics. The class hierarchy of a message is considered during message delivery.
This means that listeners will also receivesubtypes of the message type they are listening for, e.g. a listener for Object.class receives everything.
+ <em><strong>Synchronous and asynchronous message delivery</em></strong>: A handler can be invoked to handle a message either synchronously or
asynchronously. This is configurable for each handler via annotations. Message publication itself supports synchronous (method
blocks until messages are delivered to all handlers) or asynchronous (fire and forget) dispatch
+ <em><strong>Weak references</em></strong>: Mbassador uses weak references to all listening objects to relieve the programmer of the burden to explicitly unregister
+ <em><strong>Weak references</em></strong>: MBassador uses weak references to all listening objects to relieve the programmer of the burden to explicitly unregister
listeners that are not used anymore (of course it is also possible to explicitly unregister a listener if needed). This is very comfortable
in certain environments where objects are created by frameworks, i.e. spring, guice etc. Just stuff everything into the message bus, it will
in certain environments where listeners are managed by frameworks, i.e. spring, guice etc. Just stuff everything into the message bus, it will
ignore objects without message handlers and automatically clean-up orphaned weak references after the garbage collector has done its job.
+ <em><strong>Filtering</em></strong>: Mbassador offers static message filtering. Filters are configured using annotations and multiple filters can be attached to
+ <em><strong>Filtering</em></strong>: MBassador offers static message filtering. Filters are configured using annotations and multiple filters can be attached to
a single message handler
+ <em><strong>Message envelopes</em></strong>: Message handlers can declare to receive an enveloped message. The envelope can wrap around different
types of messages. This allows for a single handler to handle multiple message types
+ <em><strong>Handler priorities</em></strong>: A listener can be associated with a priority to influence the order of the message delivery
+ <em><strong>Error handling</em></strong>: Errors during message delivery are sent to an error handler of which a custom implementation can easily be plugged-in.
+ <em><strong>Ease of Use</em></strong>: Using Mbassador in your project is very easy. Create as many instances of Mbassador as you like (usually a singleton will do),
mark and configure your message handlers with @Listener annotations and finally register the listeners at any Mbassador instance. Start
sending messages to your listeners using one of Mbassador's publication methods (sync or async). Done!
+ <em><strong>Message envelopes</em></strong>: Message handlers can declare to receive an enveloped message. The envelope can wrap different
types of messages. This allows for a single handler to handle multiple (unrelated) message types.
+ <em><strong>Handler priorities</em></strong>: A listener can be associated with a priority to influence the order in which messages are delivered when multiple handlers exist
+ <em><strong>Custom error handling</em></strong>: Errors during message delivery are sent to all registered error handlers which can be added to the bus as necessary.
+ <em><strong>Dead Event</em></strong>: Messages that do not match any handler result in the publication of a DeadEvent object which wraps the original message. Dead events
can be handled by registering listeners that handle DeadEvent.
+ <em><strong>Dead Event</em></strong>: Messages that have matching handlers but do not pass the configured filters result in the publication of a FilteredEvent object which wraps the original message.
Filtered events can be handled by registering listeners that handle FilteredEvent.
+ <em><strong>Extensibility</em></strong>:MBassador is designed to be extensible with custom implementations of various components like message
dispatchers and handler invocations (using the decorator pattern), metadata reader (you can add your own annotations) and factories for different
kinds of object.A configuration object can be passed to on instance creation to specify the different configurable parts
+ <em><strong>Ease of Use</em></strong>: Using MBassador in your project is very easy. Create as many instances of MBassador as you like (usually a singleton will do),
mark and configure your message handlers with @Handler annotations and finally register the listeners at any MBassador instance. Start
sending messages to your listeners using one of MBassador's publication methods (sync or async). Done!
@ -53,33 +60,33 @@ sending messages to your listeners using one of Mbassador's publication methods
Listener definition (in any bean):
// every event of type TestEvent or any subtype will be delivered
// every event of type TestMessage or any subtype will be delivered
// to this handler
@Listener
public void handleTestEvent(TestEvent event) {
@Handler
public void handleTestMessage(TestMessage event) {
// do something
}
// this handler will be invoked concurrently
@Listener(delivery = Mode.Concurrent)
public void handleSubTestEvent(SubTestEvent event) {
@Handler(delivery = Mode.Concurrent)
public void handleSubTestMessage(SubTestMessage event) {
// do something more expensive here
}
// this handler will receive messages of type SubTestEvent
// this handler will receive messages of type SubTestMessage
// or any of its sub types that passe the given filter(s)
@Listener(priority = 10,
@Handler(priority = 10,
delivery = Mode.Sequential,
filters = {@Filter(Filters.SpecialEvent.class)})
public void handleFiltered(SubTestEvent event) {
filters = {@Filter(Filters.SpecialMessage.class)})
public void handleFiltered(SubTestMessage event) {
//do something special here
}
@Listener(delivery = Mode.Sequential, rejectSubtypes = true)
@Enveloped(messages = {TestEvent.class, TestEvent2.class})
@Handler(delivery = Mode.Sequential, rejectSubtypes = true)
@Enveloped(messages = {TestMessage.class, TestMessage2.class})
public void handleVariousEvents(MessageEnvelope envelope) {
// the envelope will contain either an instance of TestEvent or TestEvent2
// if rejectSubtypes were set to 'false' (default) also subtypes of TestEvent or TestEvent2 would be allowed
// the envelope will contain either an instance of TestMessage or TestMessage2
// if rejectSubtypes were set to 'false' (default) also subtypes of TestMessage or TestMessage2 would be allowed
}
@ -87,7 +94,7 @@ Creation of message bus and registration of listeners:
// create as many instances as necessary
// bind it to any upper bound
MBassador<TestEvent> bus = new MBassador<TestEvent>(BusConfiguration.Default());
MBassador<TestMessage> bus = new MBassador<TestMessage>(BusConfiguration.Default());
ListeningBean listener = new ListeningBean();
// the listener will be registered using a weak-reference
bus.subscribe(listener);
@ -97,18 +104,18 @@ Creation of message bus and registration of listeners:
Message publication:
TestEvent event = new TestEvent();
TestEvent subEvent = new SubTestEvent();
TestMessage message = new TestMessage();
TestMessage subMessage = new SubTestMessage();
bus.publishAsync(event); //returns immediately, publication will continue asynchronously
bus.post(event).asynchronously(); // same as above
bus.publish(subEvent); // will return after each handler has been invoked
bus.post(subEvent).now(); // same as above
bus.publishAsync(message); //returns immediately, publication will continue asynchronously
bus.post(message).asynchronously(); // same as above
bus.publish(subMessage); // will return after each handler has been invoked
bus.post(subMessage).now(); // same as above
<h2>Installation</h2>
Beginning with version 1.1.0 MBassador is available from the Maven Central Repository (Hooray!). Older versions are
still available from the included maven repository in this github repo but will be deleted in the future.
The preferred way of using MBassador is to simply add the dependency as shown in step two. Step one is only necessary
The recommended way of using MBassador in your project is to add the dependency as shown in step two. Step one is only necessary
if you want to use an older version that is not available in the central repository.
1. Add the repository location to your pom.xml
@ -131,17 +138,31 @@ if you want to use an older version that is not available in the central reposit
</pre></code>
3. Run mvn clean package to have maven download and install the required version into your local repository
Of course you can always clone the repository and build from source
Of course you can always clone the repository and build from source.
<h2>Wiki</h2>
There is ongoing afford to extend documentation and provide code samples and detailed explanations of how the message bus
works. Code samples can also be found in the various test cases. Please read about the terminology used in this project
to avoid confusion and misunderstanding.
<h2>Release Notes</h2>
<h3>1.1.2</h3>
+ Added support for FilteredMessage event
+ Renamed @Listener to @Handler and DeadEvent to DeadMessage to increase alignment with the established terminology.
Sorry for the inconvenience since this will lead to compile errors but good old find&replace will do
+ Repackaging and refactoring of some parts
+ Introduced message publication factories as configurable components to make MBassador more extensible/customizable
+ Added more documentation and unit tests
<h3>1.1.1</h3>
+ Added support for DeadEvent
+ Introduced new property to @Listener annotation that allows to activate/deactivate any message handler
+ Added support for DeadMessage event
+ Introduced new property to @Handler annotation that allows to activate/deactivate any message handler
+ Full support of proxies created by cglib
+ Message handler inheritance changed! See wiki page about handler definition for more details.
+ Changed @Listener property "dispatch" to "delivery" and renamed the associated enumeration values to
+ Changed @Handler property "dispatch" to "delivery" and renamed the associated enumeration values to
more precisely indicate their meaning
+ Added more unit tests
@ -163,7 +184,7 @@ First stable release!
<h3>1.0.5.RC</h3>
+ Added MessageEnvelope and @Enveloped annotation to configure handlers that might receive arbitrary message type
+ Added handler configuration property to @Listener annotation to move from message filtering to more specific implementation
+ Added handler configuration property to @Handler annotation to move from message filtering to more specific implementation
of this feature
<h3>1.0.4.RC</h3>
@ -177,9 +198,12 @@ successful commit etc.). Currently in beta, see <a href="https://github.com/benn
<h2>Credits</h2>
The initial inspiration for creating this component came from looking at Google Guava's event bus implementation. Since
it did not provide all the features we needed in our project, I decided to create my own implementation. It matured to be
quite a feature rich and yet very efficient and performant implementation.
The initial inspiration for creating this component came from trying out Google Guava's event bus implementation.
I liked the simplicity of its design and I do trust the developers at Google a lot, so I was happy to find that they also
provided an event bus system. The main reason it proved to be unusable for our scenario was that it uses strong references
to the listeners such that every object has to be explicitly deregistered. This was difficult in our Spring managed environment.
Finally, I decided to create a custom implementation, which then matured to be stable, extensibel and yet very efficient
and well performing.
I want to thank the development team from friendsurance (www.friendsurance.de) for their support and feedback on the bus
implementation and the management of friendsurance for allowing me to publish the component as an open source project.
@ -189,6 +213,9 @@ implementation and the management of friendsurance for allowing me to publish th
Any feature requests and feedback are more than welcome. You may suggest improvements either by submitting an
issue or by forking the repo and creating a pull request. I will try to respond as quickly as possible.
Sample code and documentation are both very appreciated contributions. Especially integration with different frameworks
such as Spring, Guice or other is of great value. Feel free and welcome to create Wiki pages to share your code and ideas.
<h2>License</h2>
This project is distributed under the terms of the MIT License. See file "LICENSE" for further reference.

View File

@ -1,7 +1,7 @@
usePlugin('java')
group="org.mbassy"
version="1.10.0-SNAPSHOT"
version="1.1.2"
dependencies {
addMavenRepo()

View File

@ -0,0 +1,28 @@
Note: Please refer to the terminology wiki page before reading the following explanations..
A message bus offers facilities for publishing messages to registered listeners. Messages can be dispatched
synchronously or asynchronously and the dispatch mechanism can by controlled for each message handler and per message publication.
Each message publication is isolated from all other running publications such that it does not interfere with them.
Hence, the bus expects message handlers to be stateless as they may be invoked concurrently if multiple
messages of the same type get published asynchronously.
Messages are published to all listeners that accept the type or super type of the published message. Additionally
a message handler may define filters to narrow the set of messages that it accepts.
Subscribed listeners are available to all pending message publications that have not yet started processing.
Any message listener may only be subscribed once - subsequent subscriptions of an already subscribed message listener
will be silently ignored.
The basic contract of the bus is that it will deliver a specific message exactly once to each of the subscribed message handlers.
Currently, message handlers will be invoked in inverse sequence of subscription but any
client using this bus should not rely on this assumption.
The bus uses weak references to all listeners such that registered listeners do not need to
be explicitly unregistered to be eligible for garbage collection. Dead (garbage collected) listeners are
removed on-the-fly as messages get published.
Unsubscribing a listener means removing all subscribed message handlers of that listener. This remove operation
immediately effects all running publications processes -> A removed listener will under no circumstances receive any message publications.
A listener is considered removed after the unsubscribe(Object) call returned.Any running message publication that has not yet delivered
the message to the recently removed listener will not see the listener after the remove operation completed.

View File

@ -52,7 +52,7 @@ receive all messages of type TestEvent or any subtype sequentially.
// every message of type TestEvent or any subtype will be delivered
// to this handler
@Listener
@Handler
public void handleTestEvent(TestEvent event) {
// do something
}
@ -62,7 +62,7 @@ receive all messages of type TestEvent or any subtype sequentially.
This handler will receive all messages of type SubTestEvent or any subtype concurrently
// this handler will be invoked concurrently
@Listener(delivery = Mode.Concurrent)
@Handler(delivery = Mode.Concurrent)
public void handleSubTestEvent(SubTestEvent event) {
// do something more expensive here
}
@ -73,7 +73,7 @@ defined one, since it specifies a higher priority
// this handler will receive messages of type SubTestEvent
// or any of its sub types that passe the given filter(s)
@Listener(priority = 10,
@Handler(priority = 10,
dispatch = Mode.Synchronous,
filters = {@Filter(Filters.SpecialEvent.class)})
public void handleFiltered(SubTestEvent event) {
@ -81,7 +81,7 @@ defined one, since it specifies a higher priority
}
@Listener(dispatch = Mode.Synchronous, rejectSubtypes = true)
@Handler(dispatch = Mode.Synchronous, rejectSubtypes = true)
@Enveloped(messages = {TestEvent.class, TestEvent2.class})
public void handleVariousEvents(MessageEnvelope envelope) {
// the envelope will contain either an instance of TestEvent or TestEvent2
@ -95,10 +95,10 @@ defined one, since it specifies a higher priority
Since one parameter (the message) does not offer a great deal of flexibility if different types
of messages should be consumed, there exists the possibility to wrap a message inside an envelope.
An enveloped message handler specifies the message type it consumes by using the @Enveloped annotation
in addition to the @Listener annotation. All configurations of @Listener apply to each of the specified
in addition to the @Handler annotation. All configurations of @Handler apply to each of the specified
message types.
@Listener(dispatch = Mode.Synchronous, rejectSubtypes = true)
@Handler(dispatch = Mode.Synchronous, rejectSubtypes = true)
@Enveloped(messages = {TestEvent.class, TestEvent2.class})
public void handleVariousEvents(MessageEnvelope envelope) {
// the envelope will contain either an instance of TestEvent or TestEvent2

View File

@ -0,0 +1,39 @@
<h2>Terminology</h2>
To avoid confusion and increase precision of the available documentation a common vocabulary of the most relevant concepts is necessary.
Specifically, the terms "event" and "message" have their own definition within the context of the message bus system and as such require
some clarification.
<h3>Message</h3>
A message is an object used for communication between multiple other objects.Other libraries established the term "event" which essentially
refers to the same idea (an event occurs at some point in the system and is published to other components such that they might react to it).
MBassador uses the term message instead of event since the object sent through it does not necessarily represent an event. It might merely represent
data to be processed, e.g. stored or transformed.
A message can be any object, no restrictions or assumptions are made. A message can be sent by any object that has access to the bus
and is delivered to all registered listeners that consume the type of message.
<h3>Message handler</h3>
A message handler is a method that defines exactly one parameter (the message) and is marked with @Handler. A handler has a message type
that is implicitly defined in the method signature (the parameter type). A message handler will be invoked for each message that has a compatible
type.
<h3>Message listener</h3>
An object that defines one or more message handlers and that has been subscribed at the message bus is referred to as (message) listener.
<h3>Subscription</h3>
Subscription is the process of adding a listener to the message bus, such that it might receive messages. It is used interchangeably with the
term "registration"
<h3>Message publication|Message dispatch</h3>
The process of delivering a message from the sender to all registered listeners is called message publication.
The initial phase of this process, that lasts until the message is actually delivered to the handlers is called message dispatch.
The distinction is necessarily drawn as all method publications share a common scheme but may vary in the way how the dispatching works.
<h3>Event</h3>
The term "event" refers to events that can occur during message publication. Currently there are two types of events:
+ DeadMessage: Whenever a message is published and no listeners exist that defined matching handlers, a DeadMessage event will be created and published
using the common message publication mechanism. Listeners with handlers for DeadEvent can be subscribed to listen for and react to dead
+ FilteredMessage: Since handlers can define filters to narrow the set of messages it consumes, it is possible that a message is not delivered
to any handler. In such a case the FilteredMessage event is published,

View File

@ -1,7 +1,9 @@
package net.engio.mbassy;
package net.engio.mbassy.bus;
import net.engio.mbassy.IPublicationErrorHandler;
import net.engio.mbassy.PublicationError;
import net.engio.mbassy.common.ReflectionUtils;
import net.engio.mbassy.dispatch.SubscriptionContext;
import net.engio.mbassy.subscription.SubscriptionContext;
import net.engio.mbassy.listener.MessageHandlerMetadata;
import net.engio.mbassy.listener.MetadataReader;
import net.engio.mbassy.subscription.Subscription;
@ -50,12 +52,14 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
// it can be customized by implementing the getSubscriptionFactory() method
private final SubscriptionFactory subscriptionFactory;
private final MessagePublication.Factory publicationFactory;
public AbstractMessageBus(BusConfiguration configuration) {
this.executor = configuration.getExecutor();
subscriptionFactory = configuration.getSubscriptionFactory();
this.metadataReader = configuration.getMetadataReader();
this.publicationFactory = configuration.getMessagePublicationFactory();
pendingMessages = new LinkedBlockingQueue<MessagePublication>(configuration.getMaximumNumberOfPendingMessages());
initDispatcherThreads(configuration.getNumberOfMessageDispatchers());
addErrorHandler(new IPublicationErrorHandler.ConsoleLogger());
@ -85,6 +89,10 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
}
}
protected MessagePublication.Factory getPublicationFactory(){
return publicationFactory;
}
@Override
public Collection<IPublicationErrorHandler> getRegisteredErrorHandlers() {
return Collections.unmodifiableCollection(errorHandlers);

View File

@ -1,4 +1,4 @@
package net.engio.mbassy;
package net.engio.mbassy.bus;
import net.engio.mbassy.listener.MetadataReader;
import net.engio.mbassy.subscription.SubscriptionFactory;
@ -6,7 +6,7 @@ import net.engio.mbassy.subscription.SubscriptionFactory;
import java.util.concurrent.*;
/**
* The bus configuration holds various parameters that can be used to customize the bus' runtime behaviour. *
* The bus configuration holds various parameters that can be used to customize the bus' runtime behaviour.
*
* @author bennidi
* Date: 12/8/12
@ -36,13 +36,23 @@ public class BusConfiguration {
private MetadataReader metadataReader;
private MessagePublication.Factory messagePublicationFactory;
public BusConfiguration() {
this.numberOfMessageDispatchers = 2;
this.maximumNumberOfPendingMessages = Integer.MAX_VALUE;
this.subscriptionFactory = new SubscriptionFactory();
this.executor = new ThreadPoolExecutor(10, 10, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), DaemonThreadFactory);
this.metadataReader = new MetadataReader();
this.messagePublicationFactory = new MessagePublication.Factory();
}
public MessagePublication.Factory getMessagePublicationFactory() {
return messagePublicationFactory;
}
public void setMessagePublicationFactory(MessagePublication.Factory messagePublicationFactory) {
this.messagePublicationFactory = messagePublicationFactory;
}
public MetadataReader getMetadataReader() {

View File

@ -1,4 +1,6 @@
package net.engio.mbassy;
package net.engio.mbassy.bus;
import net.engio.mbassy.IPublicationErrorHandler;
import java.util.Collection;
import java.util.concurrent.Executor;
@ -17,7 +19,7 @@ import java.util.concurrent.TimeUnit;
* <p/>
* A listener is any object that defines at least one message handler and that has been subscribed to at least
* one message bus. A message handler can be any method that accepts exactly one parameter (the message) and is marked
* as a message handler using the @Listener annotation.
* as a message handler using the @Handler annotation.
* <p/>
* The bus uses weak references to all listeners such that registered listeners do not need to
* be explicitly unregistered to be eligible for garbage collection. Dead (garbage collected) listeners are
@ -145,7 +147,7 @@ public interface IMessageBus<T, P extends IMessageBus.IPostCommand> {
*
* If an unbound queuing strategy is used the call returns immediately.
* If a bounded queue is used the call will block until the message can be placed in the queue
* or the timeout r
* or the timeout is reached.
*
* @return A message publication that wraps up the publication request
*/

View File

@ -1,6 +1,7 @@
package net.engio.mbassy;
package net.engio.mbassy.bus;
import net.engio.mbassy.common.DeadEvent;
import net.engio.mbassy.PublicationError;
import net.engio.mbassy.common.DeadMessage;
import net.engio.mbassy.subscription.Subscription;
import java.util.Collection;
@ -24,12 +25,12 @@ public class MBassador<T> extends AbstractMessageBus<T, SyncAsyncPostCommand<T>>
private MessagePublication createMessagePublication(T message) {
Collection<Subscription> subscriptions = getSubscriptionsByMessageType(message.getClass());
if (subscriptions == null || subscriptions.isEmpty()) {
if ((subscriptions == null || subscriptions.isEmpty()) && !message.getClass().equals(DeadMessage.class)) {
// Dead Event
subscriptions = getSubscriptionsByMessageType(DeadEvent.class);
return MessagePublication.Create(this, subscriptions, new DeadEvent(message));
subscriptions = getSubscriptionsByMessageType(DeadMessage.class);
return getPublicationFactory().createPublication(this, subscriptions, new DeadMessage(message));
}
else return MessagePublication.Create(this, subscriptions, message);
else return getPublicationFactory().createPublication(this, subscriptions, message);
}

View File

@ -1,7 +1,7 @@
package net.engio.mbassy;
package net.engio.mbassy.bus;
import net.engio.mbassy.common.DeadEvent;
import net.engio.mbassy.common.FilteredEvent;
import net.engio.mbassy.common.DeadMessage;
import net.engio.mbassy.common.FilteredMessage;
import net.engio.mbassy.subscription.Subscription;
import java.util.Collection;
@ -19,8 +19,12 @@ import java.util.Collection;
*/
public class MessagePublication {
public static MessagePublication Create(IMessageBus bus, Collection<Subscription> subscriptions, Object message){
return new MessagePublication(bus,subscriptions, message, State.Initial);
public static class Factory{
public MessagePublication createPublication(IMessageBus owningBus, Collection<Subscription> subscriptions, Object message){
return new MessagePublication(owningBus, subscriptions, message, State.Initial);
}
}
private Collection<Subscription> subscriptions;
@ -33,7 +37,7 @@ public class MessagePublication {
private IMessageBus bus;
private MessagePublication(IMessageBus bus, Collection<Subscription> subscriptions, Object message, State initialState) {
public MessagePublication(IMessageBus bus, Collection<Subscription> subscriptions, Object message, State initialState) {
this.bus = bus;
this.subscriptions = subscriptions;
this.message = message;
@ -50,8 +54,14 @@ public class MessagePublication {
sub.publish(this, message);
}
state = State.Finished;
if(!delivered && !isFilteredEvent() && !isDeadEvent()){
bus.post(new FilteredEvent(message)).now();
// if the message has not been marked delivered by the dispatcher
if(!delivered){
if(!isFilteredEvent() && !isDeadEvent()){
bus.post(new FilteredMessage(message)).now();
}else if(!isDeadEvent()){
bus.post(new DeadMessage(message)).now();
}
}
}
@ -84,11 +94,11 @@ public class MessagePublication {
}
public boolean isDeadEvent(){
return DeadEvent.class.isAssignableFrom(message.getClass());
return DeadMessage.class.isAssignableFrom(message.getClass());
}
public boolean isFilteredEvent(){
return FilteredEvent.class.isAssignableFrom(message.getClass());
return FilteredMessage.class.isAssignableFrom(message.getClass());
}
private enum State{

View File

@ -1,4 +1,4 @@
package net.engio.mbassy;
package net.engio.mbassy.bus;
import java.util.concurrent.TimeUnit;

View File

@ -7,9 +7,9 @@ package net.engio.mbassy.common;
* @author bennidi
* Date: 1/18/13
*/
public class DeadEvent extends PublicationEvent {
public final class DeadMessage extends PublicationEvent {
public DeadEvent(Object message) {
public DeadMessage(Object message) {
super(message);
}

View File

@ -8,10 +8,10 @@ package net.engio.mbassy.common;
* @author bennidi
* Date: 3/1/13
*/
public class FilteredEvent extends PublicationEvent {
public class FilteredMessage extends PublicationEvent {
public FilteredEvent(Object event) {
public FilteredMessage(Object event) {
super(event);
}
}

View File

@ -15,7 +15,7 @@ public abstract class PublicationEvent {
this.event = message;
}
public Object getEvent() {
public Object getMessage() {
return event;
}
}

View File

@ -1,6 +1,5 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.MessagePublication;
import net.engio.mbassy.subscription.AbstractSubscriptionContextAware;
/**

View File

@ -1,6 +1,6 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.MessagePublication;
import net.engio.mbassy.bus.MessagePublication;
import net.engio.mbassy.common.ConcurrentSet;
import net.engio.mbassy.subscription.MessageEnvelope;

View File

@ -1,6 +1,6 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.MessagePublication;
import net.engio.mbassy.bus.MessagePublication;
import net.engio.mbassy.common.ConcurrentSet;
import net.engio.mbassy.listener.IMessageFilter;

View File

@ -1,7 +1,5 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.MessagePublication;
/**
* A handler invocation encapsulates the logic that is used to invoke a single
* message handler to process a given message.

View File

@ -1,6 +1,6 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.IMessageBus;
import net.engio.mbassy.bus.IMessageBus;
/**
* This interface marks components that have access to the message bus that they belong to.

View File

@ -1,6 +1,6 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.MessagePublication;
import net.engio.mbassy.bus.MessagePublication;
import net.engio.mbassy.common.ConcurrentSet;
/**

View File

@ -1,5 +1,7 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.subscription.SubscriptionContext;
/**
* This interface marks components that have access to the subscription context.
*

View File

@ -1,11 +1,9 @@
package net.engio.mbassy.dispatch;
import java.lang.reflect.Method;
import net.engio.mbassy.IMessageBus;
import net.engio.mbassy.MessagePublication;
import net.engio.mbassy.bus.MessagePublication;
import net.engio.mbassy.common.ConcurrentSet;
import net.engio.mbassy.subscription.AbstractSubscriptionContextAware;
import net.engio.mbassy.subscription.SubscriptionContext;
/**
* Standard implementation for direct, unfiltered message delivery.

View File

@ -1,9 +1,9 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.IPublicationErrorHandler;
import net.engio.mbassy.MessagePublication;
import net.engio.mbassy.PublicationError;
import net.engio.mbassy.subscription.AbstractSubscriptionContextAware;
import net.engio.mbassy.subscription.SubscriptionContext;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

View File

@ -4,7 +4,7 @@ import java.lang.annotation.*;
/**
* Configure a handler to receive an enveloped message as a wrapper around the source
* message. An enveloped message can be
* message. An enveloped message can contain any type of message
*
* @author bennidi
* Date: 2/8/12

View File

@ -6,10 +6,10 @@ import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* THe filter annotation is used to add filters to message listeners.
* It references a class that implements the MessageFilter interface.
* The object filter will be used to check whether a message should be delivered
* to the message listener or not.
* The filter annotation is used to add filters to message listeners.
* It references a class that implements the IMessageFilter interface.
* The filter will be used to check whether a message should be delivered
* to the listener or not.
*
* <p/>
* @author bennidi
@ -19,5 +19,10 @@ import java.lang.annotation.Target;
@Target(value = {ElementType.ANNOTATION_TYPE})
public @interface Filter {
/**
* The class that implements the filter.
* Note: A filter always needs to provide a non-arg constructor
* @return
*/
Class<? extends IMessageFilter> value();
}

View File

@ -1,7 +1,8 @@
package net.engio.mbassy.listener;
/**
* Todo: Add javadoc
* Some sample filters that are not particularly useful in production environment
* but illustrate how filters are meant to be used.
*
* @author bennidi
* Date: 12/12/12

View File

@ -0,0 +1,51 @@
package net.engio.mbassy.listener;
import java.lang.annotation.*;
/**
* Mark any method of any object(=listener) as a message handler and configure the handler
* using different properties.
*
* @author bennidi
* Date: 2/8/12
*/
@Retention(value = RetentionPolicy.RUNTIME)
@Inherited
@Target(value = {ElementType.METHOD})
public @interface Handler {
/**
* Add any numbers of filters to the handler. All filters are evaluated before the handler
* is actually invoked, which is only if all the filters accept the message.
*/
Filter[] filters() default {};
/**
* Define the mode in which a message is delivered to each listener. Listeners can be notified
* sequentially or concurrently.
*
*/
Mode delivery() default Mode.Sequential;
/**
* Handlers are ordered by priority and handlers with higher priority are processed before
* those with lower priority, i.e. Influence the order in which different handlers that consume
* the same message type are invoked.
*/
int priority() default 0;
/**
* Define whether or not the handler accepts sub types of the message type it declares in its
* signature.
*/
boolean rejectSubtypes() default false;
/**
* Enable or disable the handler. Disabled handlers do not receive any messages.
* This property is useful for quick changes in configuration and necessary to disable
* handlers that have been declared by a superclass but do not apply to the subclass
*/
boolean enabled() default true;
}

View File

@ -1,28 +0,0 @@
package net.engio.mbassy.listener;
import java.lang.annotation.*;
/**
* Mark any method of any object as a message handler and configure the handler
* using different properties.
*
* @author bennidi
* Date: 2/8/12
*/
@Retention(value = RetentionPolicy.RUNTIME)
@Inherited
@Target(value = {ElementType.METHOD})
public @interface Listener {
Filter[] filters() default {}; // no filters by default
Mode delivery() default Mode.Sequential;
int priority() default 0;
boolean rejectSubtypes() default false;
boolean enabled() default true;
}

View File

@ -16,7 +16,7 @@ public class MessageHandlerMetadata {
private IMessageFilter[] filter;
private Listener listenerConfig;
private Handler handlerConfig;
private boolean isAsynchronous = false;
@ -27,13 +27,13 @@ public class MessageHandlerMetadata {
private boolean acceptsSubtypes = true;
public MessageHandlerMetadata(Method handler, IMessageFilter[] filter, Listener listenerConfig) {
public MessageHandlerMetadata(Method handler, IMessageFilter[] filter, Handler handlerConfig) {
this.handler = handler;
this.filter = filter;
this.listenerConfig = listenerConfig;
this.isAsynchronous = listenerConfig.delivery().equals(Mode.Concurrent);
this.handlerConfig = handlerConfig;
this.isAsynchronous = handlerConfig.delivery().equals(Mode.Concurrent);
this.envelope = handler.getAnnotation(Enveloped.class);
this.acceptsSubtypes = !listenerConfig.rejectSubtypes();
this.acceptsSubtypes = !handlerConfig.rejectSubtypes();
if(this.envelope != null){
for(Class messageType : envelope.messages())
handledMessages.add(messageType);
@ -54,7 +54,7 @@ public class MessageHandlerMetadata {
}
public int getPriority(){
return listenerConfig.priority();
return handlerConfig.priority();
}
public Method getHandler() {
@ -87,6 +87,6 @@ public class MessageHandlerMetadata {
public boolean isEnabled() {
return listenerConfig.enabled();
return handlerConfig.enabled();
}
}

View File

@ -16,11 +16,11 @@ import java.util.*;
*/
public class MetadataReader {
// This predicate is used to find all message listeners (methods annotated with @Listener)
// This predicate is used to find all message listeners (methods annotated with @Handler)
private static final IPredicate<Method> AllMessageHandlers = new IPredicate<Method>() {
@Override
public boolean apply(Method target) {
return target.getAnnotation(Listener.class) != null;
return target.getAnnotation(Handler.class) != null;
}
};
@ -28,7 +28,7 @@ public class MetadataReader {
private final Map<Class<? extends IMessageFilter>, IMessageFilter> filterCache = new HashMap<Class<? extends IMessageFilter>, IMessageFilter>();
// retrieve all instances of filters associated with the given subscription
private IMessageFilter[] getFilter(Listener subscription){
private IMessageFilter[] getFilter(Handler subscription){
if (subscription.filters().length == 0) return null;
IMessageFilter[] filters = new IMessageFilter[subscription.filters().length];
int i = 0;
@ -52,14 +52,14 @@ public class MetadataReader {
public MessageHandlerMetadata getHandlerMetadata(Method messageHandler){
Listener config = messageHandler.getAnnotation(Listener.class);
Handler config = messageHandler.getAnnotation(Handler.class);
return new MessageHandlerMetadata(messageHandler, getFilter(config), config);
}
// get all listeners defined by the given class (includes
// listeners defined in super classes)
public List<MessageHandlerMetadata> getMessageHandlers(Class<?> target) {
// get all handlers (this will include all (inherited) methods directly annotated using @Listener)
// get all handlers (this will include all (inherited) methods directly annotated using @Handler)
List<Method> allHandlers = ReflectionUtils.getMethods(AllMessageHandlers, target);
// retain only those that are at the bottom of their respective class hierarchy (deepest overriding method)
List<Method> bottomMostHandlers = new LinkedList<Method>();
@ -71,15 +71,15 @@ public class MetadataReader {
List<MessageHandlerMetadata> filteredHandlers = new LinkedList<MessageHandlerMetadata>();
// for each handler there will be no overriding method that specifies @Listener annotation
// for each handler there will be no overriding method that specifies @Handler annotation
// but an overriding method does inherit the listener configuration of the overwritten method
for(Method handler : bottomMostHandlers){
Listener listener = handler.getAnnotation(Listener.class);
if(!listener.enabled() || !isValidMessageHandler(handler)) continue; // disabled or invalid listeners are ignored
Handler handle = handler.getAnnotation(Handler.class);
if(!handle.enabled() || !isValidMessageHandler(handler)) continue; // disabled or invalid listeners are ignored
Method overriddenHandler = ReflectionUtils.getOverridingMethod(handler, target);
// if a handler is overwritten it inherits the configuration of its parent method
MessageHandlerMetadata handlerMetadata = new MessageHandlerMetadata(overriddenHandler == null ? handler : overriddenHandler,
getFilter(listener), listener);
getFilter(handle), handle);
filteredHandlers.add(handlerMetadata);
}
@ -94,7 +94,7 @@ public class MetadataReader {
private boolean isValidMessageHandler(Method handler) {
if(handler == null || handler.getAnnotation(Listener.class) == null){
if(handler == null || handler.getAnnotation(Handler.class) == null){
return false;
}
if (handler.getParameterTypes().length != 1) {

View File

@ -1,8 +1,7 @@
package net.engio.mbassy.subscription;
import net.engio.mbassy.IMessageBus;
import net.engio.mbassy.bus.IMessageBus;
import net.engio.mbassy.dispatch.ISubscriptionContextAware;
import net.engio.mbassy.dispatch.SubscriptionContext;
/**
* The base implementation for subscription context aware objects (mightily obvious :)

View File

@ -3,10 +3,9 @@ package net.engio.mbassy.subscription;
import java.util.Comparator;
import java.util.UUID;
import net.engio.mbassy.MessagePublication;
import net.engio.mbassy.bus.MessagePublication;
import net.engio.mbassy.common.ConcurrentSet;
import net.engio.mbassy.dispatch.IMessageDispatcher;
import net.engio.mbassy.dispatch.SubscriptionContext;
/**
* A subscription is a thread safe container for objects that contain message handlers

View File

@ -1,6 +1,6 @@
package net.engio.mbassy.dispatch;
package net.engio.mbassy.subscription;
import net.engio.mbassy.IMessageBus;
import net.engio.mbassy.bus.IMessageBus;
import net.engio.mbassy.listener.MessageHandlerMetadata;
/**

View File

@ -1,7 +1,6 @@
package net.engio.mbassy.subscription;
import net.engio.mbassy.dispatch.*;
import net.engio.mbassy.dispatch.SubscriptionContext;
/**
* Created with IntelliJ IDEA.

View File

@ -1,5 +1,6 @@
package net.engio.mbassy;
import net.engio.mbassy.bus.ListenerSubscriptionTest;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;

View File

@ -1,9 +1,11 @@
package net.engio.mbassy;
import net.engio.mbassy.bus.BusConfiguration;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.common.ConcurrentSet;
import net.engio.mbassy.common.DeadEvent;
import net.engio.mbassy.common.UnitTest;
import net.engio.mbassy.listener.Listener;
import net.engio.mbassy.common.DeadMessage;
import net.engio.mbassy.common.MessageBusTest;
import net.engio.mbassy.listener.Handler;
import org.junit.Test;
/**
@ -12,12 +14,12 @@ import org.junit.Test;
* @author bennidi
* Date: 1/18/13
*/
public class DeadEventTest extends UnitTest{
public class DeadEventTest extends MessageBusTest{
@Test
public void testDeadEvent(){
MBassador bus = new MBassador(BusConfiguration.Default());
MBassador bus = getBus(BusConfiguration.Default());
DeadEventHandler deadEventHandler = new DeadEventHandler();
bus.subscribe(deadEventHandler);
assertEquals(0, deadEventHandler.getDeadEventCount());
@ -33,9 +35,9 @@ public class DeadEventTest extends UnitTest{
private ConcurrentSet deadEvents = new ConcurrentSet();
@Listener
public void handle(DeadEvent event){
deadEvents.add(event);
@Handler
public void handle(DeadMessage message){
deadEvents.add(message);
}

View File

@ -1,17 +1,19 @@
package net.engio.mbassy;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import net.engio.mbassy.bus.BusConfiguration;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.common.DeadMessage;
import net.engio.mbassy.common.FilteredMessage;
import net.engio.mbassy.common.MessageBusTest;
import net.engio.mbassy.events.SubTestMessage;
import net.engio.mbassy.listener.*;
import org.junit.Test;
import net.engio.mbassy.common.TestUtil;
import net.engio.mbassy.common.UnitTest;
import net.engio.mbassy.events.SubTestEvent;
import net.engio.mbassy.events.TestEvent;
import net.engio.mbassy.listener.Filter;
import net.engio.mbassy.listener.Filters;
import net.engio.mbassy.listener.Listener;
import net.engio.mbassy.events.TestMessage;
import net.engio.mbassy.listeners.ListenerFactory;
import net.engio.mbassy.listeners.NonListeningBean;
/**
* Testing of filter functionality
@ -19,42 +21,94 @@ import net.engio.mbassy.listeners.NonListeningBean;
* @author bennidi
* Date: 11/26/12
*/
public class FilterTest extends UnitTest {
public class FilterTest extends MessageBusTest {
private static final AtomicInteger FilteredEventCounter = new AtomicInteger(0);
private static final AtomicInteger DeadEventCounter = new AtomicInteger(0);
@Test
public void testSubclassFilter() throws Exception {
MBassador bus = new MBassador(new BusConfiguration());
MBassador bus = getBus(new BusConfiguration());
ListenerFactory listenerFactory = new ListenerFactory()
.create(100, FilteredMessageListener.class)
.create(100, Object.class)
.create(100, NonListeningBean.class);
.create(100, FilteredMessageListener.class);
List<Object> listeners = listenerFactory.build();
// this will subscribe the listeners concurrently to the bus
TestUtil.setup(bus, listeners, 10);
TestEvent event = new TestEvent();
TestEvent subTestEvent = new SubTestEvent();
TestMessage message = new TestMessage();
TestMessage subTestMessage = new SubTestMessage();
bus.post(event).now();
bus.post(subTestEvent).now();
assertEquals(100, event.counter.get());
assertEquals(0, subTestEvent.counter.get());
bus.post(message).now();
bus.post(subTestMessage).now();
assertEquals(100, message.counter.get());
assertEquals(0, subTestMessage.counter.get());
assertEquals(100, FilteredEventCounter.get());
}
@Test
public void testFilteredFilteredEvent() throws Exception {
FilteredEventCounter.set(0);
DeadEventCounter.set(0);
MBassador bus = getBus(new BusConfiguration());
ListenerFactory listenerFactory = new ListenerFactory()
.create(100, FilteredMessageListener.class);
List<Object> listeners = listenerFactory.build();
// this will subscribe the listeners concurrently to the bus
TestUtil.setup(bus, listeners, 10);
bus.post(new Object()).now();
bus.post(new SubTestMessage()).now();
assertEquals(100, FilteredEventCounter.get()); // the SubTestMessage should have been republished as a filtered event
assertEquals(100, DeadEventCounter.get()); // Object.class was filtered and the fil
}
public static class FilteredMessageListener{
@Listener(filters = {@Filter(Filters.RejectSubtypes.class)})
public void handleTestEvent(TestEvent event){
event.counter.incrementAndGet();
// NOTE: Use rejectSubtypes property of @Handler to achieve the same functionality but with better performance
// and more concise syntax
@Handler(filters = {@Filter(Filters.RejectSubtypes.class)})
public void handleTestMessage(TestMessage message){
message.counter.incrementAndGet();
}
// FilteredEvents that contain messages of class Object will be filtered (again) and should cause a DeadEvent to be thrown
@Handler(filters = {@Filter(RejectFilteredObjects.class)})
public void handleFilteredEvent(FilteredMessage filtered){
FilteredEventCounter.incrementAndGet();
}
// will cause republication of a FilteredEvent
@Handler(filters = {@Filter(Filters.RejectAll.class)})
public void handleNone(Object any){
FilteredEventCounter.incrementAndGet();
}
// will cause republication of a FilteredEvent
@Handler
public void handleDead(DeadMessage dead){
DeadEventCounter.incrementAndGet();
}
}
public static class RejectFilteredObjects implements IMessageFilter{
@Override
public boolean accepts(Object message, MessageHandlerMetadata metadata) {
if(message.getClass().equals(FilteredMessage.class) && ((FilteredMessage)message).getMessage().getClass().equals(Object.class)){
return false;
}
return true;
}
}
}

View File

@ -3,13 +3,15 @@ package net.engio.mbassy;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import net.engio.mbassy.bus.BusConfiguration;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.common.MessageBusTest;
import net.engio.mbassy.events.SubTestMessage;
import org.junit.Test;
import net.engio.mbassy.common.ConcurrentExecutor;
import net.engio.mbassy.common.TestUtil;
import net.engio.mbassy.common.UnitTest;
import net.engio.mbassy.events.SubTestEvent;
import net.engio.mbassy.events.TestEvent;
import net.engio.mbassy.events.TestEvent2;
import net.engio.mbassy.events.TestMessage;
import net.engio.mbassy.events.TestMessage2;
import net.engio.mbassy.listeners.EventingTestBean;
import net.engio.mbassy.listeners.EventingTestBean2;
import net.engio.mbassy.listeners.EventingTestBean3;
@ -23,7 +25,7 @@ import net.engio.mbassy.listeners.NonListeningBean;
* @author bennidi
* Date: 2/8/12
*/
public class MessagePublicationTest extends UnitTest {
public class MessagePublicationTest extends MessageBusTest {
// this value probably needs to be adjusted depending on the performance of the underlying plattform
// otherwise the tests will fail since asynchronous processing might not have finished when
@ -34,7 +36,7 @@ public class MessagePublicationTest extends UnitTest {
@Test
public void testAsynchronousMessagePublication() throws Exception {
MBassador bus = new MBassador(new BusConfiguration());
MBassador bus = getBus(new BusConfiguration());
ListenerFactory listenerFactory = new ListenerFactory()
.create(10000, EventingTestBean.class)
.create(10000, EventingTestBean2.class)
@ -48,26 +50,26 @@ public class MessagePublicationTest extends UnitTest {
// this will subscribe the listeners concurrently to the bus
TestUtil.setup(bus, listeners, 10);
TestEvent event = new TestEvent();
TestEvent subEvent = new SubTestEvent();
TestEvent2 event2 = new TestEvent2();
TestMessage message = new TestMessage();
TestMessage subMessage = new SubTestMessage();
TestMessage2 message2 = new TestMessage2();
bus.publishAsync(event);
bus.publishAsync(subEvent);
bus.publishAsync(event2);
bus.publishAsync(message);
bus.publishAsync(subMessage);
bus.publishAsync(message2);
pause(processingTimeInMS);
assertEquals(50000, event.counter.get());
assertEquals(80000, subEvent.counter.get());
assertEquals(20000, event2.counter.get());
assertEquals(50000, message.counter.get());
assertEquals(80000, subMessage.counter.get());
assertEquals(20000, message2.counter.get());
}
@Test
public void testSynchronousMessagePublication() throws Exception {
MBassador bus = new MBassador(new BusConfiguration());
MBassador bus = getBus(new BusConfiguration());
ListenerFactory listenerFactory = new ListenerFactory()
.create(10000, EventingTestBean.class)
.create(10000, EventingTestBean2.class)
@ -80,27 +82,27 @@ public class MessagePublicationTest extends UnitTest {
// this will subscribe the listeners concurrently to the bus
TestUtil.setup(bus, listeners, 10);
TestEvent event = new TestEvent();
TestEvent subEvent = new SubTestEvent();
TestMessage message = new TestMessage();
TestMessage subMessage = new SubTestMessage();
bus.publish(event);
bus.publish(subEvent);
bus.publish(message);
bus.publish(subMessage);
pause(processingTimeInMS);
assertEquals(30000, event.counter.get());
assertEquals(70000, subEvent.counter.get());
assertEquals(30000, message.counter.get());
assertEquals(70000, subMessage.counter.get());
}
@Test
public void testConcurrentMixedMessagePublication() throws Exception {
final CopyOnWriteArrayList<TestEvent> testEvents = new CopyOnWriteArrayList<TestEvent>();
final CopyOnWriteArrayList<SubTestEvent> subtestEvents = new CopyOnWriteArrayList<SubTestEvent>();
final CopyOnWriteArrayList<TestMessage> testMessages = new CopyOnWriteArrayList<TestMessage>();
final CopyOnWriteArrayList<SubTestMessage> subtestMessages = new CopyOnWriteArrayList<SubTestMessage>();
final int eventLoopsPerTHread = 100;
final MBassador bus = new MBassador(new BusConfiguration());
final MBassador bus = getBus(new BusConfiguration());
ListenerFactory listenerFactory = new ListenerFactory()
.create(10000, EventingTestBean.class)
.create(10000, EventingTestBean2.class)
@ -117,25 +119,25 @@ public class MessagePublicationTest extends UnitTest {
@Override
public void run() {
for (int i = 0; i < eventLoopsPerTHread; i++) {
TestEvent event = new TestEvent();
SubTestEvent subEvent = new SubTestEvent();
testEvents.add(event);
subtestEvents.add(subEvent);
TestMessage message = new TestMessage();
SubTestMessage subMessage = new SubTestMessage();
testMessages.add(message);
subtestMessages.add(subMessage);
bus.publishAsync(event);
bus.publish(subEvent);
bus.publishAsync(message);
bus.publish(subMessage);
}
}
}, 10);
pause(processingTimeInMS);
for (TestEvent event : testEvents) {
assertEquals(30000, event.counter.get());
for (TestMessage message : testMessages) {
assertEquals(30000, message.counter.get());
}
for (SubTestEvent event : subtestEvents) {
assertEquals(70000, event.counter.get());
for (SubTestMessage message : subtestMessages) {
assertEquals(70000, message.counter.get());
}
}

View File

@ -3,7 +3,7 @@ package net.engio.mbassy;
import org.junit.Test;
import net.engio.mbassy.common.UnitTest;
import net.engio.mbassy.listener.Enveloped;
import net.engio.mbassy.listener.Listener;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.listener.MessageListenerMetadata;
import net.engio.mbassy.listener.MetadataReader;
import net.engio.mbassy.subscription.MessageEnvelope;
@ -112,18 +112,18 @@ public class MetadataReaderTest extends UnitTest {
// a simple event listener
public class EventListener1 {
@Listener(rejectSubtypes = true)
@Handler(rejectSubtypes = true)
public void handleObject(Object o) {
}
@Listener
@Handler
public void handleAny(Object o) {
}
@Listener
@Handler
public void handleString(String s) {
}
@ -143,12 +143,12 @@ public class MetadataReaderTest extends UnitTest {
public class EventListener3 extends EventListener2 {
// narrow the handler
@Listener(rejectSubtypes = true)
@Handler(rejectSubtypes = true)
public void handleAny(Object o) {
}
@Listener(enabled = false)
@Handler(enabled = false)
public void handleString(String s) {
}
@ -158,13 +158,13 @@ public class MetadataReaderTest extends UnitTest {
public class EnvelopedListener{
@Listener(rejectSubtypes = true)
@Handler(rejectSubtypes = true)
@Enveloped(messages = {String.class, Integer.class, Long.class})
public void handleEnveloped(MessageEnvelope o) {
}
@Listener
@Handler
@Enveloped(messages = {Number.class})
public void handleEnveloped2(MessageEnvelope o) {
@ -175,7 +175,7 @@ public class MetadataReaderTest extends UnitTest {
public class EnvelopedListenerSubclass extends EnvelopedListener{
// narrow to integer
@Listener
@Handler
@Enveloped(messages = {Integer.class})
public void handleEnveloped2(MessageEnvelope o) {

View File

@ -1,7 +1,9 @@
package net.engio.mbassy;
import net.engio.mbassy.common.UnitTest;
import net.engio.mbassy.listener.Listener;
import net.engio.mbassy.bus.BusConfiguration;
import net.engio.mbassy.bus.IMessageBus;
import net.engio.mbassy.common.MessageBusTest;
import net.engio.mbassy.listener.Handler;
import org.junit.Test;
/**
@ -10,7 +12,7 @@ import org.junit.Test;
* @author bennidi
* Date: 1/17/13
*/
public class MethodDispatchTest extends UnitTest{
public class MethodDispatchTest extends MessageBusTest{
private boolean listener1Called = false;
private boolean listener2Called = false;
@ -20,7 +22,7 @@ public class MethodDispatchTest extends UnitTest{
// a simple event listener
public class EventListener1 {
@Listener
@Handler
public void handleString(String s) {
listener1Called = true;
}
@ -39,7 +41,7 @@ public class MethodDispatchTest extends UnitTest{
@Test
public void testDispatch1(){
MBassador bus = new MBassador(BusConfiguration.Default());
IMessageBus bus = getBus(BusConfiguration.Default());
EventListener2 listener2 = new EventListener2();
bus.subscribe(listener2);
bus.post("jfndf").now();

View File

@ -1,10 +1,12 @@
package net.engio.mbassy;
package net.engio.mbassy.bus;
import net.engio.mbassy.bus.BusConfiguration;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.common.MessageBusTest;
import net.engio.mbassy.events.SubTestMessage;
import org.junit.Test;
import net.engio.mbassy.common.TestUtil;
import net.engio.mbassy.common.UnitTest;
import net.engio.mbassy.events.SubTestEvent;
import net.engio.mbassy.events.TestEvent;
import net.engio.mbassy.events.TestMessage;
import net.engio.mbassy.listeners.*;
import net.engio.mbassy.subscription.Subscription;
@ -18,13 +20,13 @@ import java.util.List;
* @author bennidi
* Date: 1/9/13
*/
public class ListenerSubscriptionTest extends UnitTest{
public class ListenerSubscriptionTest extends MessageBusTest{
// this is a single threaded test for subscribing and unsubscribing of a single listener
@Test
public void testSubscribeSimple() throws InterruptedException {
MBassador bus = new MBassador(new BusConfiguration());
MBassador bus = getBus(new BusConfiguration());
List<Object> listeners = new LinkedList<Object>();
int listenerCount = 200000;
@ -43,11 +45,11 @@ public class ListenerSubscriptionTest extends UnitTest{
}
// check the generated subscriptions for existence of all previously subscribed valid listeners
Collection<Subscription> testEventsubscriptions = bus.getSubscriptionsByMessageType(TestEvent.class);
Collection<Subscription> testEventsubscriptions = bus.getSubscriptionsByMessageType(TestMessage.class);
assertEquals(1, testEventsubscriptions.size());
assertEquals(listenerCount, getNumberOfSubscribedListeners(testEventsubscriptions));
Collection<Subscription> subTestEventsubscriptions = bus.getSubscriptionsByMessageType(SubTestEvent.class);
Collection<Subscription> subTestEventsubscriptions = bus.getSubscriptionsByMessageType(SubTestMessage.class);
assertEquals(3, subTestEventsubscriptions.size());
assertEquals(3 * listenerCount, getNumberOfSubscribedListeners(subTestEventsubscriptions));
@ -57,11 +59,11 @@ public class ListenerSubscriptionTest extends UnitTest{
}
// no listener should be left
testEventsubscriptions = bus.getSubscriptionsByMessageType(TestEvent.class);
testEventsubscriptions = bus.getSubscriptionsByMessageType(TestMessage.class);
assertEquals(1, testEventsubscriptions.size());
assertEquals(0, getNumberOfSubscribedListeners(testEventsubscriptions));
subTestEventsubscriptions = bus.getSubscriptionsByMessageType(SubTestEvent.class);
subTestEventsubscriptions = bus.getSubscriptionsByMessageType(SubTestMessage.class);
assertEquals(3, subTestEventsubscriptions.size());
assertEquals(0, getNumberOfSubscribedListeners(subTestEventsubscriptions));
@ -78,7 +80,7 @@ public class ListenerSubscriptionTest extends UnitTest{
@Test
public void testConcurrentSubscription() throws Exception {
MBassador bus = new MBassador(new BusConfiguration());
MBassador bus = getBus(new BusConfiguration());
ListenerFactory listenerFactory = new ListenerFactory()
.create(10000, EventingTestBean.class)
.create(10000, EventingTestBean2.class)
@ -92,11 +94,11 @@ public class ListenerSubscriptionTest extends UnitTest{
TestUtil.setup(bus, listeners, 10);
// check the generated subscriptions for existence of all previously subscribed valid listeners
Collection<Subscription> testEventsubscriptions = bus.getSubscriptionsByMessageType(TestEvent.class);
Collection<Subscription> testEventsubscriptions = bus.getSubscriptionsByMessageType(TestMessage.class);
assertEquals(3, testEventsubscriptions.size());
assertEquals(30000, getNumberOfSubscribedListeners(testEventsubscriptions));
Collection<Subscription> subTestEventsubscriptions = bus.getSubscriptionsByMessageType(SubTestEvent.class);
Collection<Subscription> subTestEventsubscriptions = bus.getSubscriptionsByMessageType(SubTestMessage.class);
assertEquals(10, subTestEventsubscriptions.size());
assertEquals(100000, getNumberOfSubscribedListeners(subTestEventsubscriptions));

View File

@ -0,0 +1,31 @@
package net.engio.mbassy.common;
import junit.framework.Assert;
import net.engio.mbassy.*;
import net.engio.mbassy.bus.BusConfiguration;
import net.engio.mbassy.bus.MBassador;
/**
* A base test that provides a factory for message bus that makes tests fail if any
* publication error occurs
*
* @author bennidi
* Date: 3/2/13
*/
public class MessageBusTest extends UnitTest{
private static final IPublicationErrorHandler TestFailingHandler = new IPublicationErrorHandler() {
@Override
public void handleError(PublicationError error) {
Assert.fail();
}
};
public MBassador getBus(BusConfiguration configuration){
MBassador bus = new MBassador(configuration);
bus.addErrorHandler(TestFailingHandler);
return bus;
}
}

View File

@ -1,6 +1,6 @@
package net.engio.mbassy.common;
import net.engio.mbassy.IMessageBus;
import net.engio.mbassy.bus.IMessageBus;
import java.util.List;

View File

@ -5,6 +5,6 @@ package net.engio.mbassy.events;
* @author bennidi
* Date: 11/22/12
*/
public class SubTestEvent extends TestEvent {
public class SubTestMessage extends TestMessage {
}

View File

@ -8,7 +8,7 @@ import java.util.concurrent.atomic.AtomicInteger;
* @author bennidi
* Date: 11/22/12
*/
public class TestEvent2 {
public class TestMessage {
public AtomicInteger counter = new AtomicInteger();

View File

@ -8,7 +8,7 @@ import java.util.concurrent.atomic.AtomicInteger;
* @author bennidi
* Date: 11/22/12
*/
public class TestEvent {
public class TestMessage2 {
public AtomicInteger counter = new AtomicInteger();

View File

@ -1,7 +1,7 @@
package net.engio.mbassy.listeners;
import net.engio.mbassy.events.SubTestEvent;
import net.engio.mbassy.events.TestEvent;
import net.engio.mbassy.events.SubTestMessage;
import net.engio.mbassy.events.TestMessage;
import net.engio.mbassy.listener.*;
/**
@ -14,25 +14,25 @@ public class EventingTestBean {
// every event of type TestEvent or any subtype will be delivered
// to this listener
@Listener
public void handleTestEvent(TestEvent event) {
event.counter.incrementAndGet();
@Handler
public void handleTestEvent(TestMessage message) {
message.counter.incrementAndGet();
}
// this handler will be invoked asynchronously
@Listener(priority = 0, delivery = Mode.Concurrent)
public void handleSubTestEvent(SubTestEvent event) {
event.counter.incrementAndGet();
@Handler(priority = 0, delivery = Mode.Concurrent)
public void handleSubTestEvent(SubTestMessage message) {
message.counter.incrementAndGet();
}
// this handler will receive events of type SubTestEvent
// or any subtabe and that passes the given filter
@Listener(
@Handler(
priority = 10,
delivery = Mode.Sequential,
filters = {@Filter(Filters.RejectAll.class), @Filter(Filters.AllowAll.class)})
public void handleFiltered(SubTestEvent event) {
event.counter.incrementAndGet();
public void handleFiltered(SubTestMessage message) {
message.counter.incrementAndGet();
}

View File

@ -1,7 +1,7 @@
package net.engio.mbassy.listeners;
import net.engio.mbassy.events.SubTestEvent;
import net.engio.mbassy.listener.Listener;
import net.engio.mbassy.events.SubTestMessage;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.listener.Mode;
/**
@ -11,8 +11,8 @@ import net.engio.mbassy.listener.Mode;
public class EventingTestBean2 extends EventingTestBean{
// redefine the configuration for this handler
@Listener(delivery = Mode.Sequential)
public void handleSubTestEvent(SubTestEvent event) {
super.handleSubTestEvent(event);
@Handler(delivery = Mode.Sequential)
public void handleSubTestEvent(SubTestMessage message) {
super.handleSubTestEvent(message);
}
}

View File

@ -1,7 +1,7 @@
package net.engio.mbassy.listeners;
import net.engio.mbassy.events.SubTestEvent;
import net.engio.mbassy.listener.Listener;
import net.engio.mbassy.events.SubTestMessage;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.listener.Mode;
/**
@ -12,9 +12,9 @@ public class EventingTestBean3 extends EventingTestBean2{
// this handler will be invoked asynchronously
@Listener(priority = 0, delivery = Mode.Sequential)
public void handleSubTestEventAgain(SubTestEvent event) {
event.counter.incrementAndGet();
@Handler(priority = 0, delivery = Mode.Sequential)
public void handleSubTestEventAgain(SubTestMessage message) {
message.counter.incrementAndGet();
}
}

View File

@ -1,11 +1,11 @@
package net.engio.mbassy.listeners;
import net.engio.mbassy.events.TestEvent;
import net.engio.mbassy.events.TestEvent2;
import net.engio.mbassy.events.TestMessage;
import net.engio.mbassy.events.TestMessage2;
import net.engio.mbassy.listener.Enveloped;
import net.engio.mbassy.listener.Filter;
import net.engio.mbassy.listener.Filters;
import net.engio.mbassy.listener.Listener;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.listener.Mode;
import net.engio.mbassy.subscription.MessageEnvelope;
@ -18,29 +18,29 @@ import net.engio.mbassy.subscription.MessageEnvelope;
public class MultiEventHandler {
@Listener(delivery = Mode.Sequential)
@Enveloped(messages = {TestEvent.class, TestEvent2.class})
@Handler(delivery = Mode.Sequential)
@Enveloped(messages = {TestMessage.class, TestMessage2.class})
public void handleEvents(MessageEnvelope envelope) {
if(TestEvent.class.isAssignableFrom(envelope.getMessage().getClass())){
TestEvent event = envelope.getMessage();
event.counter.incrementAndGet();
if(TestMessage.class.isAssignableFrom(envelope.getMessage().getClass())){
TestMessage message = envelope.getMessage();
message.counter.incrementAndGet();
}
if(envelope.getMessage().getClass().equals(TestEvent2.class)){
TestEvent2 event = envelope.getMessage();
event.counter.incrementAndGet();
if(envelope.getMessage().getClass().equals(TestMessage2.class)){
TestMessage2 message = envelope.getMessage();
message.counter.incrementAndGet();
}
}
@Listener(delivery = Mode.Sequential, filters = @Filter(Filters.RejectSubtypes.class))
@Enveloped(messages = {TestEvent.class, TestEvent2.class})
@Handler(delivery = Mode.Sequential, filters = @Filter(Filters.RejectSubtypes.class))
@Enveloped(messages = {TestMessage.class, TestMessage2.class})
public void handleSuperTypeEvents(MessageEnvelope envelope) {
if(TestEvent.class.isAssignableFrom(envelope.getMessage().getClass())){
TestEvent event = envelope.getMessage();
event.counter.incrementAndGet();
if(TestMessage.class.isAssignableFrom(envelope.getMessage().getClass())){
TestMessage message = envelope.getMessage();
message.counter.incrementAndGet();
}
if(envelope.getMessage().getClass().equals(TestEvent2.class)){
TestEvent2 event = envelope.getMessage();
event.counter.incrementAndGet();
if(envelope.getMessage().getClass().equals(TestMessage2.class)){
TestMessage2 message = envelope.getMessage();
message.counter.incrementAndGet();
}
}

View File

@ -1,8 +1,8 @@
package net.engio.mbassy.listeners;
import net.engio.mbassy.events.SubTestEvent;
import net.engio.mbassy.events.TestEvent;
import net.engio.mbassy.listener.Listener;
import net.engio.mbassy.events.SubTestMessage;
import net.engio.mbassy.events.TestMessage;
import net.engio.mbassy.listener.Handler;
/**
* This bean overrides all the handlers defined in its superclass. Since it does not specify any annotations
@ -15,20 +15,20 @@ public class NonListeningBean extends EventingTestBean{
@Override
@Listener(enabled = false)
public void handleTestEvent(TestEvent event) {
event.counter.incrementAndGet(); // should never be called
@Handler(enabled = false)
public void handleTestEvent(TestMessage message) {
message.counter.incrementAndGet(); // should never be called
}
@Override
@Listener(enabled = false)
public void handleSubTestEvent(SubTestEvent event) {
event.counter.incrementAndGet(); // should never be called
@Handler(enabled = false)
public void handleSubTestEvent(SubTestMessage message) {
message.counter.incrementAndGet(); // should never be called
}
@Override
@Listener(enabled = false)
public void handleFiltered(SubTestEvent event) {
event.counter.incrementAndGet(); // should never be called
@Handler(enabled = false)
public void handleFiltered(SubTestMessage message) {
message.counter.incrementAndGet(); // should never be called
}
}