parent
bbc2004232
commit
39ab0d61ef
40
README.md
40
README.md
@ -2,20 +2,20 @@ Mbassador
|
|||||||
=========
|
=========
|
||||||
|
|
||||||
Mbassador is a very light-weight message (event) bus implementation following the publish subscribe pattern. It is designed
|
Mbassador is a very light-weight message (event) bus implementation following the publish subscribe pattern. It is designed
|
||||||
for ease of use and aims to be feature rich, extensible while preserving resource efficiency and performance.
|
for ease of use and aims to be feature rich and extensible while preserving resource efficiency and performance. It uses a specialized
|
||||||
|
data structure to allow high throughput for concurrent access.
|
||||||
It uses a specialized data structure to allow high throughput for concurrent access.
|
|
||||||
|
|
||||||
Read this documentation to get an overview of its features and how cool this message (event) bus actually is.
|
Read this documentation to get an overview of its features and how cool this message (event) bus actually is.
|
||||||
You can also check out the <a href="http://codeblock.engio.net/?p=37" target="_blank">performance comparison</a>
|
You can also check out the <a href="http://codeblock.engio.net/?p=37" target="_blank">performance comparison</a>
|
||||||
which also contains a partial list of the features of the compared implementations.
|
which also contains a partial list of the features of the compared implementations.
|
||||||
|
|
||||||
The current version is 1.0.5.RC
|
The current version is 1.0.6.RC, see the release notes for more details.
|
||||||
|
|
||||||
Table of contents:
|
Table of contents:
|
||||||
+ [Features](#features)
|
+ [Features](#features)
|
||||||
+ [Usage](#usage)
|
+ [Usage](#usage)
|
||||||
+ [Installation](#installation)
|
+ [Installation](#installation)
|
||||||
|
+ [Release Notes](#releasenotes)
|
||||||
+ [Roadmap](#roadmap)
|
+ [Roadmap](#roadmap)
|
||||||
+ [Credits](#credits)
|
+ [Credits](#credits)
|
||||||
+ [Contribute](#contribute)
|
+ [Contribute](#contribute)
|
||||||
@ -124,26 +124,46 @@ will be done as soon as enough people use this component. Until then, the follow
|
|||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.mbassy</groupId>
|
<groupId>org.mbassy</groupId>
|
||||||
<artifactId>mbassador</artifactId>
|
<artifactId>mbassador</artifactId>
|
||||||
<version>1.0.0.RC</version>
|
<version>1.0.6.RC</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
</pre></code>
|
</pre></code>
|
||||||
3. Run mvn clean package to have maven download and install the required version into your local repository
|
3. Run mvn clean package to have maven download and install the required version into your local repository
|
||||||
|
|
||||||
Of course you can always clone the repository and build from source
|
Of course you can always clone the repository and build from source
|
||||||
|
|
||||||
|
<h2>Release Notes</h2>
|
||||||
|
|
||||||
|
<h3>1.0.6.RC</h3>
|
||||||
|
|
||||||
|
+ Fixed behaviour with capacity bound blocking queue such that there now are two methods to schedule a message
|
||||||
|
asynchronously. One will block until capacity becomes available, the other will timeout after a specified amount of
|
||||||
|
time.
|
||||||
|
+ Added unit tests
|
||||||
|
|
||||||
|
<h3>1.0.5.RC</h3>
|
||||||
|
|
||||||
|
+ Added MessageEnvelope and @Enveloped annotation to configure handlers that might receive arbitrary message type
|
||||||
|
+ Added handler configuration property to @Listener annotation to move from message filtering to more specific implementation
|
||||||
|
of this feature
|
||||||
|
|
||||||
|
<h3>1.0.4.RC</h3>
|
||||||
|
|
||||||
|
+ Introduced BusConfiguration as a central class to encapsulate configurational aspects
|
||||||
|
|
||||||
|
|
||||||
<h2>Roadmap</h2>
|
<h2>Roadmap</h2>
|
||||||
+ Checkout MBassador from one of the official maven repositories (as soon as the user base is big enough)
|
+ Checkout MBassador from one of the official maven repositories (as soon as the user base is big enough)
|
||||||
+ Spring integration with support for conditional message dispatch in transactional context (dispatch only after
|
+ Spring integration with support for conditional message dispatch in transactional context (dispatch only after
|
||||||
successful commit etc.). Currently in beta, see <a href="https://github.com/bennidi/mbassador-spring">this</a> repository
|
successful commit etc.). Currently in beta, see <a href="https://github.com/bennidi/mbassador-spring">this</a> repository
|
||||||
+ MessageEnvelope for each dispatch that is passed to the handler and can be used for communication between handlers
|
|
||||||
during the running dispatch
|
|
||||||
|
|
||||||
|
|
||||||
<h2>Credits</h2>
|
<h2>Credits</h2>
|
||||||
The initial inspiration for creating this component came from looking at Google Guava's event bus implementation. Since
|
The initial inspiration for creating this component came from looking at Google Guava's event bus implementation. Since
|
||||||
it did not provide all the features we needed in our project, I decided to create my own implementation. When I saw that
|
it did not provide all the features we needed in our project, I decided to create my own implementation. It matured to be
|
||||||
it outperformed the Guava implementation by far, I decided to share it with the community to see if others consider it worth
|
quite a feature rich and yet very efficient and performant message bus.
|
||||||
a shot.
|
|
||||||
|
I want to thank the development team from friendsurance (www.friendsurance.de) for their support and feedback on the bus
|
||||||
|
implementation and the management of friendsurance for allowing me to publish the component as an open source project.
|
||||||
|
|
||||||
<h2>Contribute</h2>
|
<h2>Contribute</h2>
|
||||||
|
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
usePlugin('java')
|
usePlugin('java')
|
||||||
|
|
||||||
group="org.mbassy"
|
group="org.mbassy"
|
||||||
version="1.0.4.RC"
|
version="1.0.6.RC"
|
||||||
|
|
||||||
dependencies {
|
dependencies {
|
||||||
addMavenRepo()
|
addMavenRepo()
|
||||||
|
BIN
maven/org/mbassy/mbassador/1.0.6.RC/mbassador-1.0.6.RC.jar
Normal file
BIN
maven/org/mbassy/mbassador/1.0.6.RC/mbassador-1.0.6.RC.jar
Normal file
Binary file not shown.
@ -0,0 +1 @@
|
|||||||
|
a2ddea6754519b5b4c029f48431e0605
|
@ -0,0 +1 @@
|
|||||||
|
f2496df5cf69076115670219c0127968e3c7ad63
|
58
maven/org/mbassy/mbassador/1.0.6.RC/mbassador-1.0.6.RC.pom
Normal file
58
maven/org/mbassy/mbassador/1.0.6.RC/mbassador-1.0.6.RC.pom
Normal file
@ -0,0 +1,58 @@
|
|||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||||
|
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
<groupId>org.mbassy</groupId>
|
||||||
|
<artifactId>mbassador</artifactId>
|
||||||
|
<version>1.0.6.RC</version>
|
||||||
|
<packaging>jar</packaging>
|
||||||
|
<name>mbassador</name>
|
||||||
|
<description>Mbassador is a fast and flexible message bus system that follows the publish subscribe pattern
|
||||||
|
</description>
|
||||||
|
|
||||||
|
<properties>
|
||||||
|
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||||
|
<project.build.java.version>1.6</project.build.java.version>
|
||||||
|
<github.url>file://${project.basedir}/maven</github.url>
|
||||||
|
</properties>
|
||||||
|
|
||||||
|
<dependencies>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>junit</groupId>
|
||||||
|
<artifactId>junit</artifactId>
|
||||||
|
<version>4.10</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
</dependencies>
|
||||||
|
|
||||||
|
<distributionManagement>
|
||||||
|
<repository>
|
||||||
|
<id>mbassador-github-repo</id>
|
||||||
|
<url>${github.url}</url>
|
||||||
|
</repository>
|
||||||
|
</distributionManagement>
|
||||||
|
|
||||||
|
<build>
|
||||||
|
<plugins>
|
||||||
|
<plugin>
|
||||||
|
<groupId>org.apache.maven.plugins</groupId>
|
||||||
|
<artifactId>maven-compiler-plugin</artifactId>
|
||||||
|
<configuration>
|
||||||
|
<source>${project.build.java.version}</source>
|
||||||
|
<target>${project.build.java.version}</target>
|
||||||
|
</configuration>
|
||||||
|
</plugin>
|
||||||
|
|
||||||
|
<plugin>
|
||||||
|
<groupId>org.apache.maven.plugins</groupId>
|
||||||
|
<artifactId>maven-surefire-plugin</artifactId>
|
||||||
|
<configuration>
|
||||||
|
<skipTests>false</skipTests>
|
||||||
|
</configuration>
|
||||||
|
</plugin>
|
||||||
|
|
||||||
|
</plugins>
|
||||||
|
</build>
|
||||||
|
</project>
|
@ -0,0 +1 @@
|
|||||||
|
ebf2e22bfe53d858092befaa865d0bf4
|
@ -0,0 +1 @@
|
|||||||
|
950ca7e831a9060060f943ca02d313f7c7f5ccbf
|
@ -11,7 +11,8 @@
|
|||||||
<version>1.0.3.RC</version>
|
<version>1.0.3.RC</version>
|
||||||
<version>1.0.4.RC</version>
|
<version>1.0.4.RC</version>
|
||||||
<version>1.0.5.RC</version>
|
<version>1.0.5.RC</version>
|
||||||
|
<version>1.0.6.RC</version>
|
||||||
</versions>
|
</versions>
|
||||||
<lastUpdated>20121212135342</lastUpdated>
|
<lastUpdated>20121225153509</lastUpdated>
|
||||||
</versioning>
|
</versioning>
|
||||||
</metadata>
|
</metadata>
|
||||||
|
@ -1 +1 @@
|
|||||||
e37de32d920e2182f2549b0500dc20d8
|
0cc21fcd52a994d6b22f9fe57f210143
|
@ -1 +1 @@
|
|||||||
6b5568c98223ec0a53f7b7ede78e965ba7f0080f
|
dc58ab03113b6edefe5f3bfdfc464e5c344b253a
|
2
pom.xml
2
pom.xml
@ -4,7 +4,7 @@
|
|||||||
<modelVersion>4.0.0</modelVersion>
|
<modelVersion>4.0.0</modelVersion>
|
||||||
<groupId>org.mbassy</groupId>
|
<groupId>org.mbassy</groupId>
|
||||||
<artifactId>mbassador</artifactId>
|
<artifactId>mbassador</artifactId>
|
||||||
<version>1.0.5.RC</version>
|
<version>1.0.6.RC</version>
|
||||||
<packaging>jar</packaging>
|
<packaging>jar</packaging>
|
||||||
<name>mbassador</name>
|
<name>mbassador</name>
|
||||||
<description>Mbassador is a fast and flexible message bus system that follows the publish subscribe pattern
|
<description>Mbassador is a fast and flexible message bus system that follows the publish subscribe pattern
|
||||||
|
@ -5,7 +5,6 @@ import org.mbassy.dispatch.MessagingContext;
|
|||||||
import org.mbassy.listener.MessageHandlerMetadata;
|
import org.mbassy.listener.MessageHandlerMetadata;
|
||||||
import org.mbassy.listener.MetadataReader;
|
import org.mbassy.listener.MetadataReader;
|
||||||
import org.mbassy.subscription.Subscription;
|
import org.mbassy.subscription.Subscription;
|
||||||
import org.mbassy.subscription.SubscriptionDeliveryRequest;
|
|
||||||
import org.mbassy.subscription.SubscriptionFactory;
|
import org.mbassy.subscription.SubscriptionFactory;
|
||||||
|
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
@ -46,23 +45,19 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
|
|||||||
private final List<Thread> dispatchers = new CopyOnWriteArrayList<Thread>();
|
private final List<Thread> dispatchers = new CopyOnWriteArrayList<Thread>();
|
||||||
|
|
||||||
// all pending messages scheduled for asynchronous dispatch are queued here
|
// all pending messages scheduled for asynchronous dispatch are queued here
|
||||||
private final LinkedBlockingQueue<SubscriptionDeliveryRequest<T>> pendingMessages;
|
private final BlockingQueue<MessagePublication<T>> pendingMessages;
|
||||||
|
|
||||||
// this factory is used to create specialized subscriptions based on the given message handler configuration
|
// this factory is used to create specialized subscriptions based on the given message handler configuration
|
||||||
// it can be customized by implementing the getSubscriptionFactory() method
|
// it can be customized by implementing the getSubscriptionFactory() method
|
||||||
private final SubscriptionFactory subscriptionFactory;
|
private final SubscriptionFactory subscriptionFactory;
|
||||||
|
|
||||||
// indicates whether the shutdown method has been invoked
|
|
||||||
// -> if true, then dispatchers will have been shutdown
|
|
||||||
private final AtomicBoolean isShutDown = new AtomicBoolean(false);
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
public AbstractMessageBus(BusConfiguration configuration) {
|
public AbstractMessageBus(BusConfiguration configuration) {
|
||||||
this.executor = configuration.getExecutor();
|
this.executor = configuration.getExecutor();
|
||||||
subscriptionFactory = configuration.getSubscriptionFactory();
|
subscriptionFactory = configuration.getSubscriptionFactory();
|
||||||
this.metadataReader = configuration.getMetadataReader();
|
this.metadataReader = configuration.getMetadataReader();
|
||||||
pendingMessages = new LinkedBlockingQueue<SubscriptionDeliveryRequest<T>>(configuration.getMaximumNumberOfPendingMessages());
|
pendingMessages = new LinkedBlockingQueue<MessagePublication<T>>(configuration.getMaximumNumberOfPendingMessages());
|
||||||
initDispatcherThreads(configuration.getNumberOfMessageDispatchers());
|
initDispatcherThreads(configuration.getNumberOfMessageDispatchers());
|
||||||
addErrorHandler(new IPublicationErrorHandler.ConsoleLogger());
|
addErrorHandler(new IPublicationErrorHandler.ConsoleLogger());
|
||||||
}
|
}
|
||||||
@ -156,8 +151,25 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
|
|||||||
errorHandlers.add(handler);
|
errorHandlers.add(handler);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void addAsynchronousDeliveryRequest(SubscriptionDeliveryRequest<T> request) {
|
// this method enqueues a message delivery request
|
||||||
pendingMessages.offer(request);
|
protected MessagePublication<T> addAsynchronousDeliveryRequest(MessagePublication<T> request){
|
||||||
|
try {
|
||||||
|
pendingMessages.put(request);
|
||||||
|
return request.markScheduled();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
return request.setError();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// this method enqueues a message delivery request
|
||||||
|
protected MessagePublication<T> addAsynchronousDeliveryRequest(MessagePublication<T> request, long timeout, TimeUnit unit){
|
||||||
|
try {
|
||||||
|
return pendingMessages.offer(request, timeout, unit)
|
||||||
|
? request.markScheduled()
|
||||||
|
: request.setError();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
return request.setError();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// obtain the set of subscriptions for the given message type
|
// obtain the set of subscriptions for the given message type
|
||||||
@ -169,8 +181,11 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
|
|||||||
}
|
}
|
||||||
// TODO: get superclasses is eligible for caching
|
// TODO: get superclasses is eligible for caching
|
||||||
for (Class eventSuperType : ReflectionUtils.getSuperclasses(messageType)) {
|
for (Class eventSuperType : ReflectionUtils.getSuperclasses(messageType)) {
|
||||||
if (subscriptionsPerMessage.get(eventSuperType) != null) {
|
Collection<Subscription> subs = subscriptionsPerMessage.get(eventSuperType);
|
||||||
subscriptions.addAll(subscriptionsPerMessage.get(eventSuperType));
|
if (subs != null) {
|
||||||
|
for(Subscription sub : subs){
|
||||||
|
if(sub.handlesMessageType(messageType))subscriptions.add(sub);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return subscriptions;
|
return subscriptions;
|
||||||
@ -209,7 +224,6 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
|
|||||||
dispatcher.interrupt();
|
dispatcher.interrupt();
|
||||||
}
|
}
|
||||||
executor.shutdown();
|
executor.shutdown();
|
||||||
isShutDown.set(true);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean hasPendingMessages(){
|
public boolean hasPendingMessages(){
|
||||||
|
@ -42,6 +42,7 @@ public class BusConfiguration {
|
|||||||
this.subscriptionFactory = new SubscriptionFactory();
|
this.subscriptionFactory = new SubscriptionFactory();
|
||||||
this.executor = new ThreadPoolExecutor(10, 10, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), DaemonThreadFactory);
|
this.executor = new ThreadPoolExecutor(10, 10, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), DaemonThreadFactory);
|
||||||
this.metadataReader = new MetadataReader();
|
this.metadataReader = new MetadataReader();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public MetadataReader getMetadataReader() {
|
public MetadataReader getMetadataReader() {
|
||||||
|
@ -2,6 +2,7 @@ package org.mbassy;
|
|||||||
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
@ -118,7 +119,7 @@ public interface IMessageBus<T, P extends IMessageBus.IPostCommand> {
|
|||||||
* Subclasses may extend this interface and add functionality, e.g. different dispatch schemes.
|
* Subclasses may extend this interface and add functionality, e.g. different dispatch schemes.
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
public static interface IPostCommand{
|
public static interface IPostCommand<T>{
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Execute the message publication immediately. This call blocks until every matching message handler
|
* Execute the message publication immediately. This call blocks until every matching message handler
|
||||||
@ -127,10 +128,28 @@ public interface IMessageBus<T, P extends IMessageBus.IPostCommand> {
|
|||||||
public void now();
|
public void now();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Execute the message publication asynchronously. This call return immediately and all matching message handlers
|
* Execute the message publication asynchronously. The behaviour of this method depends on the
|
||||||
* will be invoked in another thread.
|
* configured queuing strategy:
|
||||||
|
*
|
||||||
|
* If an unbound queuing strategy is used the call returns immediately.
|
||||||
|
* If a bounded queue is used the call might block until the message can be placed in the queue.
|
||||||
|
*
|
||||||
|
* @return A message publication that can be used to access information about the state of
|
||||||
*/
|
*/
|
||||||
public void asynchronously();
|
public MessagePublication<T> asynchronously();
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Execute the message publication asynchronously. The behaviour of this method depends on the
|
||||||
|
* configured queuing strategy:
|
||||||
|
*
|
||||||
|
* If an unbound queuing strategy is used the call returns immediately.
|
||||||
|
* If a bounded queue is used the call will block until the message can be placed in the queue
|
||||||
|
* or the timeout r
|
||||||
|
*
|
||||||
|
* @return A message publication that wraps up the publication request
|
||||||
|
*/
|
||||||
|
public MessagePublication<T> asynchronously(long timeout, TimeUnit unit);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,20 +1,26 @@
|
|||||||
package org.mbassy;
|
package org.mbassy;
|
||||||
|
|
||||||
import java.util.Collection;
|
|
||||||
|
|
||||||
import org.mbassy.subscription.Subscription;
|
import org.mbassy.subscription.Subscription;
|
||||||
import org.mbassy.subscription.SubscriptionDeliveryRequest;
|
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
|
||||||
public class MBassador<T> extends AbstractMessageBus<T, SyncAsyncPostCommand<T>>{
|
public class MBassador<T> extends AbstractMessageBus<T, SyncAsyncPostCommand<T>> {
|
||||||
|
|
||||||
public MBassador(BusConfiguration configuration){
|
public MBassador(BusConfiguration configuration) {
|
||||||
super(configuration);
|
super(configuration);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public void publishAsync(T message){
|
public MessagePublication<T> publishAsync(T message) {
|
||||||
addAsynchronousDeliveryRequest(new SubscriptionDeliveryRequest<T>(getSubscriptionsByMessageType(message.getClass()), message));
|
return addAsynchronousDeliveryRequest(MessagePublication.Create(
|
||||||
|
getSubscriptionsByMessageType(message.getClass()), message));
|
||||||
|
}
|
||||||
|
|
||||||
|
public MessagePublication<T> publishAsync(T message, long timeout, TimeUnit unit) {
|
||||||
|
return addAsynchronousDeliveryRequest(MessagePublication.Create(
|
||||||
|
getSubscriptionsByMessageType(message.getClass()), message), timeout, unit);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -24,23 +30,23 @@ public class MBassador<T> extends AbstractMessageBus<T, SyncAsyncPostCommand<T>>
|
|||||||
*
|
*
|
||||||
* @param message
|
* @param message
|
||||||
*/
|
*/
|
||||||
public void publish(T message){
|
public void publish(T message) {
|
||||||
try {
|
try {
|
||||||
final Collection<Subscription> subscriptions = getSubscriptionsByMessageType(message.getClass());
|
final Collection<Subscription> subscriptions = getSubscriptionsByMessageType(message.getClass());
|
||||||
if(subscriptions == null){
|
if (subscriptions == null) {
|
||||||
return; // TODO: Dead Event?
|
return; // TODO: Dead Event?
|
||||||
}
|
}
|
||||||
for (Subscription subscription : subscriptions){
|
for (Subscription subscription : subscriptions) {
|
||||||
subscription.publish(message);
|
subscription.publish(message);
|
||||||
}
|
}
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
handlePublicationError(new PublicationError()
|
handlePublicationError(new PublicationError()
|
||||||
.setMessage("Error during publication of message")
|
.setMessage("Error during publication of message")
|
||||||
.setCause(e)
|
.setCause(e)
|
||||||
.setPublishedObject(message));
|
.setPublishedObject(message));
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
73
src/main/java/org/mbassy/MessagePublication.java
Normal file
73
src/main/java/org/mbassy/MessagePublication.java
Normal file
@ -0,0 +1,73 @@
|
|||||||
|
package org.mbassy;
|
||||||
|
|
||||||
|
import org.mbassy.subscription.Subscription;
|
||||||
|
|
||||||
|
import java.util.Collection;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A message publication is created for each asynchronous message dispatch. It reflects the state
|
||||||
|
* of the corresponding message publication process, i.e. provides information whether the
|
||||||
|
* publication was successfully scheduled, is currently running etc.
|
||||||
|
*
|
||||||
|
* @author bennidi
|
||||||
|
* Date: 11/16/12
|
||||||
|
*/
|
||||||
|
public class MessagePublication<T> {
|
||||||
|
|
||||||
|
public static <T> MessagePublication<T> Create(Collection<Subscription> subscriptions, T message){
|
||||||
|
return new MessagePublication<T>(subscriptions, message, State.Initial);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Collection<Subscription> subscriptions;
|
||||||
|
|
||||||
|
private T message;
|
||||||
|
|
||||||
|
private State state = State.Scheduled;
|
||||||
|
|
||||||
|
private MessagePublication(Collection<Subscription> subscriptions, T message, State initialState) {
|
||||||
|
this.subscriptions = subscriptions;
|
||||||
|
this.message = message;
|
||||||
|
this.state = initialState;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean add(Subscription subscription) {
|
||||||
|
return subscriptions.add(subscription);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void execute(){
|
||||||
|
state = State.Running;
|
||||||
|
for(Subscription sub : subscriptions){
|
||||||
|
sub.publish(message);
|
||||||
|
}
|
||||||
|
state = State.Finished;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isFinished() {
|
||||||
|
return state.equals(State.Finished);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isRunning() {
|
||||||
|
return state.equals(State.Running);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isScheduled() {
|
||||||
|
return state.equals(State.Scheduled);
|
||||||
|
}
|
||||||
|
|
||||||
|
public MessagePublication<T> markScheduled(){
|
||||||
|
if(!state.equals(State.Initial))
|
||||||
|
return this;
|
||||||
|
state = State.Scheduled;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public MessagePublication<T> setError(){
|
||||||
|
state = State.Error;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
private enum State{
|
||||||
|
Initial,Scheduled,Running,Finished,Error;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -1,5 +1,7 @@
|
|||||||
package org.mbassy;
|
package org.mbassy;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This post command provides access to standard synchronous and asynchronous dispatch
|
* This post command provides access to standard synchronous and asynchronous dispatch
|
||||||
*
|
*
|
||||||
@ -22,7 +24,12 @@ public class SyncAsyncPostCommand<T> implements IMessageBus.IPostCommand {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void asynchronously() {
|
public MessagePublication<T> asynchronously() {
|
||||||
mBassador.publishAsync(message);
|
return mBassador.publishAsync(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public MessagePublication asynchronously(long timeout, TimeUnit unit) {
|
||||||
|
return mBassador.publishAsync(message, timeout, unit);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -21,4 +21,6 @@ public @interface Listener {
|
|||||||
|
|
||||||
int priority() default 0;
|
int priority() default 0;
|
||||||
|
|
||||||
|
boolean handlesSubtypes() default true;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -24,6 +24,8 @@ public class MessageHandlerMetadata {
|
|||||||
|
|
||||||
private List<Class<?>> handledMessages = new LinkedList<Class<?>>();
|
private List<Class<?>> handledMessages = new LinkedList<Class<?>>();
|
||||||
|
|
||||||
|
private boolean acceptsSubtypes = true;
|
||||||
|
|
||||||
|
|
||||||
public MessageHandlerMetadata(Method handler, IMessageFilter[] filter, Listener listenerConfig) {
|
public MessageHandlerMetadata(Method handler, IMessageFilter[] filter, Listener listenerConfig) {
|
||||||
this.handler = handler;
|
this.handler = handler;
|
||||||
@ -31,6 +33,7 @@ public class MessageHandlerMetadata {
|
|||||||
this.listenerConfig = listenerConfig;
|
this.listenerConfig = listenerConfig;
|
||||||
this.isAsynchronous = listenerConfig.dispatch().equals(Mode.Asynchronous);
|
this.isAsynchronous = listenerConfig.dispatch().equals(Mode.Asynchronous);
|
||||||
this.envelope = handler.getAnnotation(Enveloped.class);
|
this.envelope = handler.getAnnotation(Enveloped.class);
|
||||||
|
this.acceptsSubtypes = listenerConfig.handlesSubtypes();
|
||||||
if(this.envelope != null){
|
if(this.envelope != null){
|
||||||
for(Class messageType : envelope.messages())
|
for(Class messageType : envelope.messages())
|
||||||
handledMessages.add(messageType);
|
handledMessages.add(messageType);
|
||||||
@ -69,4 +72,18 @@ public class MessageHandlerMetadata {
|
|||||||
public boolean isEnveloped() {
|
public boolean isEnveloped() {
|
||||||
return envelope != null;
|
return envelope != null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean handlesMessage(Class<?> messageType){
|
||||||
|
for(Class<?> handledMessage : handledMessages){
|
||||||
|
if(handledMessage.equals(messageType))return true;
|
||||||
|
if(handledMessage.isAssignableFrom(messageType) && acceptsSubtypes()) return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean acceptsSubtypes(){
|
||||||
|
return acceptsSubtypes;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,53 @@
|
|||||||
|
package org.mbassy.listener;
|
||||||
|
|
||||||
|
import org.mbassy.common.IPredicate;
|
||||||
|
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Provides information about the message listeners of a specific class. Each message handler
|
||||||
|
* defined by the target class is represented as a single entity.
|
||||||
|
*
|
||||||
|
*
|
||||||
|
* @author bennidi
|
||||||
|
* Date: 12/16/12
|
||||||
|
*/
|
||||||
|
public class MessageListenerMetadata<T> {
|
||||||
|
|
||||||
|
|
||||||
|
public static final IPredicate<MessageHandlerMetadata> ForMessage(final Class<?> messageType){
|
||||||
|
return new IPredicate<MessageHandlerMetadata>() {
|
||||||
|
@Override
|
||||||
|
public boolean apply(MessageHandlerMetadata target) {
|
||||||
|
return target.handlesMessage(messageType);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<MessageHandlerMetadata> handlers;
|
||||||
|
|
||||||
|
private Class<T> listenerDefinition;
|
||||||
|
|
||||||
|
public MessageListenerMetadata(List<MessageHandlerMetadata> handlers, Class<T> listenerDefinition) {
|
||||||
|
this.handlers = handlers;
|
||||||
|
this.listenerDefinition = listenerDefinition;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public List<MessageHandlerMetadata> getHandlers(IPredicate<MessageHandlerMetadata> filter){
|
||||||
|
List<MessageHandlerMetadata> matching = new LinkedList<MessageHandlerMetadata>();
|
||||||
|
for(MessageHandlerMetadata handler : handlers){
|
||||||
|
if(filter.apply(handler))matching.add(handler);
|
||||||
|
}
|
||||||
|
return matching;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean handles(Class<?> messageType){
|
||||||
|
return !getHandlers(ForMessage(messageType)).isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
public Class<T> getListerDefinition(){
|
||||||
|
return listenerDefinition;
|
||||||
|
}
|
||||||
|
}
|
@ -60,35 +60,28 @@ public class MetadataReader {
|
|||||||
// get all listeners defined by the given class (includes
|
// get all listeners defined by the given class (includes
|
||||||
// listeners defined in super classes)
|
// listeners defined in super classes)
|
||||||
public List<MessageHandlerMetadata> getMessageHandlers(Class<?> target) {
|
public List<MessageHandlerMetadata> getMessageHandlers(Class<?> target) {
|
||||||
|
// get all handlers (this will include overridden handlers)
|
||||||
List<Method> allMethods = ReflectionUtils.getMethods(AllMessageHandlers, target);
|
List<Method> allMethods = ReflectionUtils.getMethods(AllMessageHandlers, target);
|
||||||
List<Method> handlers = new LinkedList<Method>();
|
List<MessageHandlerMetadata> handlers = new LinkedList<MessageHandlerMetadata>();
|
||||||
for(Method listener : allMethods){
|
for(Method handler : allMethods){
|
||||||
Method overriddenHandler = ReflectionUtils.getOverridingMethod(listener, target);
|
Method overriddenHandler = ReflectionUtils.getOverridingMethod(handler, target);
|
||||||
|
if(overriddenHandler == null && isValidMessageHandler(handler)){
|
||||||
if(overriddenHandler != null && isHandler(overriddenHandler)){
|
// add the handler only if it has not been overridden because
|
||||||
handlers.add(overriddenHandler);
|
// either the override in the subclass deactivates the handler (by not specifying the @Listener)
|
||||||
}
|
// or the handler defined in the subclass is part of the list and will be processed itself
|
||||||
if(overriddenHandler == null){
|
handlers.add(getHandlerMetadata(handler));
|
||||||
handlers.add(listener);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
handlers = ReflectionUtils.withoutOverridenSuperclassMethods(handlers);
|
return handlers;
|
||||||
List<MessageHandlerMetadata> messageHandlers = new ArrayList<MessageHandlerMetadata>(handlers.size());
|
|
||||||
for(Method handler : handlers){
|
|
||||||
if(isValidMessageHandler(handler))
|
|
||||||
messageHandlers.add(getHandlerMetadata(handler));
|
|
||||||
}
|
|
||||||
return messageHandlers;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static boolean isHandler(Method m){
|
|
||||||
Annotation[] annotations = m.getDeclaredAnnotations();
|
public <T> MessageListenerMetadata<T> getMessageListener(Class<T> target) {
|
||||||
for(Annotation annotation : annotations){
|
return new MessageListenerMetadata(getMessageHandlers(target), target);
|
||||||
if(annotation.annotationType().equals(Listener.class))return true;
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
private boolean isValidMessageHandler(Method handler) {
|
private boolean isValidMessageHandler(Method handler) {
|
||||||
if (handler.getParameterTypes().length != 1) {
|
if (handler.getParameterTypes().length != 1) {
|
||||||
// a messageHandler only defines one parameter (the message)
|
// a messageHandler only defines one parameter (the message)
|
||||||
|
@ -26,6 +26,11 @@ public class Subscription {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public boolean handlesMessageType(Class<?> messageType){
|
||||||
|
return context.getHandlerMetadata().handlesMessage(messageType);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public void publish(Object message){
|
public void publish(Object message){
|
||||||
dispatcher.dispatch(message, listeners);
|
dispatcher.dispatch(message, listeners);
|
||||||
}
|
}
|
||||||
|
@ -1,28 +0,0 @@
|
|||||||
package org.mbassy.subscription;
|
|
||||||
|
|
||||||
import java.util.Collection;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @author bennidi
|
|
||||||
* Date: 11/16/12
|
|
||||||
*/
|
|
||||||
public class SubscriptionDeliveryRequest<T> {
|
|
||||||
|
|
||||||
private Collection<Subscription> subscriptions;
|
|
||||||
|
|
||||||
private T message;
|
|
||||||
|
|
||||||
public SubscriptionDeliveryRequest(Collection<Subscription> subscriptions, T message) {
|
|
||||||
this.subscriptions = subscriptions;
|
|
||||||
this.message = message;
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean add(Subscription subscription) {
|
|
||||||
return subscriptions.add(subscription);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void execute(){
|
|
||||||
for(Subscription sub : subscriptions)
|
|
||||||
sub.publish(message);
|
|
||||||
}
|
|
||||||
}
|
|
@ -13,7 +13,8 @@ import org.junit.runners.Suite;
|
|||||||
@Suite.SuiteClasses({
|
@Suite.SuiteClasses({
|
||||||
ConcurrentSetTest.class,
|
ConcurrentSetTest.class,
|
||||||
MBassadorTest.class,
|
MBassadorTest.class,
|
||||||
FilterTest.class
|
FilterTest.class,
|
||||||
|
MetadataReaderTest.class
|
||||||
})
|
})
|
||||||
public class AllTests {
|
public class AllTests {
|
||||||
}
|
}
|
||||||
|
178
src/test/java/org/mbassy/MetadataReaderTest.java
Normal file
178
src/test/java/org/mbassy/MetadataReaderTest.java
Normal file
@ -0,0 +1,178 @@
|
|||||||
|
package org.mbassy;
|
||||||
|
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.mbassy.listener.Enveloped;
|
||||||
|
import org.mbassy.listener.Listener;
|
||||||
|
import org.mbassy.listener.MessageListenerMetadata;
|
||||||
|
import org.mbassy.listener.MetadataReader;
|
||||||
|
import org.mbassy.subscription.MessageEnvelope;
|
||||||
|
|
||||||
|
import java.io.BufferedReader;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import static org.mbassy.listener.MessageListenerMetadata.ForMessage;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Todo: Add javadoc
|
||||||
|
*
|
||||||
|
* @author bennidi
|
||||||
|
* Date: 12/16/12
|
||||||
|
*/
|
||||||
|
public class MetadataReaderTest extends UnitTest {
|
||||||
|
|
||||||
|
private MetadataReader reader = new MetadataReader();
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testListenerWithoutInheritance() {
|
||||||
|
MessageListenerMetadata<EventListener1> listener = reader.getMessageListener(EventListener1.class);
|
||||||
|
ListenerValidator validator = new ListenerValidator()
|
||||||
|
.expectHandlers(2, String.class)
|
||||||
|
.expectHandlers(2, Object.class)
|
||||||
|
.expectHandlers(1, BufferedReader.class);
|
||||||
|
validator.check(listener);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testListenerWithInheritance() {
|
||||||
|
MessageListenerMetadata<EventListener2> listener = reader.getMessageListener(EventListener2.class);
|
||||||
|
ListenerValidator validator = new ListenerValidator()
|
||||||
|
.expectHandlers(2, String.class)
|
||||||
|
.expectHandlers(2, Object.class)
|
||||||
|
.expectHandlers(1, BufferedReader.class);
|
||||||
|
validator.check(listener);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testListenerWithInheritanceOverriding() {
|
||||||
|
MessageListenerMetadata<EventListener3> listener = reader.getMessageListener(EventListener3.class);
|
||||||
|
|
||||||
|
ListenerValidator validator = new ListenerValidator()
|
||||||
|
.expectHandlers(0, String.class)
|
||||||
|
.expectHandlers(2, Object.class)
|
||||||
|
.expectHandlers(0, BufferedReader.class);
|
||||||
|
validator.check(listener);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testEnveloped() {
|
||||||
|
MessageListenerMetadata<EnvelopedListener> listener = reader.getMessageListener(EnvelopedListener.class);
|
||||||
|
ListenerValidator validator = new ListenerValidator()
|
||||||
|
.expectHandlers(1, String.class)
|
||||||
|
.expectHandlers(2, Integer.class)
|
||||||
|
.expectHandlers(2, Long.class)
|
||||||
|
.expectHandlers(1, Double.class)
|
||||||
|
.expectHandlers(1, Number.class)
|
||||||
|
.expectHandlers(0, List.class);
|
||||||
|
validator.check(listener);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testEnvelopedSubclass() {
|
||||||
|
MessageListenerMetadata<EnvelopedListenerSubclass> listener = reader.getMessageListener(EnvelopedListenerSubclass.class);
|
||||||
|
ListenerValidator validator = new ListenerValidator()
|
||||||
|
.expectHandlers(1, String.class)
|
||||||
|
.expectHandlers(2, Integer.class)
|
||||||
|
.expectHandlers(1, Long.class)
|
||||||
|
.expectHandlers(0, Double.class)
|
||||||
|
.expectHandlers(0, Number.class);
|
||||||
|
validator.check(listener);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private class ListenerValidator {
|
||||||
|
|
||||||
|
private Map<Class<?>, Integer> handlers = new HashMap<Class<?>, Integer>();
|
||||||
|
|
||||||
|
public ListenerValidator expectHandlers(Integer count, Class<?> messageType){
|
||||||
|
handlers.put(messageType, count);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void check(MessageListenerMetadata listener){
|
||||||
|
for(Map.Entry<Class<?>, Integer> expectedHandler: handlers.entrySet()){
|
||||||
|
if(expectedHandler.getValue() > 0){
|
||||||
|
assertTrue(listener.handles(expectedHandler.getKey()));
|
||||||
|
}
|
||||||
|
else{
|
||||||
|
assertFalse(listener.handles(expectedHandler.getKey()));
|
||||||
|
}
|
||||||
|
assertEquals(expectedHandler.getValue(), listener.getHandlers(ForMessage(expectedHandler.getKey())).size());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
// a simple event listener
|
||||||
|
public class EventListener1 {
|
||||||
|
|
||||||
|
@Listener(handlesSubtypes = false)
|
||||||
|
public void handleObject(Object o) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Listener
|
||||||
|
public void handleAny(Object o) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Listener
|
||||||
|
public void handleString(String s) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// the same handlers as its super class
|
||||||
|
public class EventListener2 extends EventListener1 {}
|
||||||
|
|
||||||
|
public class EventListener3 extends EventListener2 {
|
||||||
|
|
||||||
|
// narrow the handler
|
||||||
|
@Listener(handlesSubtypes = false)
|
||||||
|
public void handleAny(Object o) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// remove this handler
|
||||||
|
public void handleString(String s) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public class EnvelopedListener{
|
||||||
|
|
||||||
|
|
||||||
|
@Listener(handlesSubtypes = false)
|
||||||
|
@Enveloped(messages = {String.class, Integer.class, Long.class})
|
||||||
|
public void handleEnveloped(MessageEnvelope o) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Listener
|
||||||
|
@Enveloped(messages = {Number.class})
|
||||||
|
public void handleEnveloped2(MessageEnvelope o) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public class EnvelopedListenerSubclass extends EnvelopedListener{
|
||||||
|
|
||||||
|
// narrow to integer
|
||||||
|
@Listener
|
||||||
|
@Enveloped(messages = {Integer.class})
|
||||||
|
public void handleEnveloped2(MessageEnvelope o) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user