diff --git a/pom.xml b/pom.xml
index 425ca2f..3b8943f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -7,7 +7,7 @@
1.0.0.RCjarmbassador
- Library for simple implementation of bidirectional conversions
+ Mbassador is a fast and flexible message bus system that follows the publish subscribe patternUTF-8
@@ -35,6 +35,12 @@
1.5.2
+
+ commons-collections
+ commons-collections
+ 3.2
+
+
diff --git a/src/main/java/org/mbassy/IMessageBus.java b/src/main/java/org/mbassy/IMessageBus.java
new file mode 100644
index 0000000..768a259
--- /dev/null
+++ b/src/main/java/org/mbassy/IMessageBus.java
@@ -0,0 +1,82 @@
+package org.mbassy;
+
+/**
+ *
+ * A message bus offers facilities for publishing messages to registered listeners. Messages can be dispatched
+ * synchronously or asynchronously and may be of any type that is a valid sub type of the type parameter T.
+ * The dispatch mechanism can by controlled for each concrete message publication.
+ * A message publication is the publication of any message using one of the bus' publish(..) methods.
+ *
+ * Each message publication is isolated from all other running publications such that it does not interfere with them.
+ * Hence, the bus expects message handlers to be stateless as it may invoke them concurrently if multiple
+ * messages get published asynchronously.
+ *
+ * A listener is any object that defines at least one message handler and that has been subscribed to at least
+ * one message bus. A message handler can be any method that accepts exactly one parameter (the message) and is marked
+ * as a message handler using the @Listener annotation.
+ *
+ * The bus uses weak references to all listeners such that registered listeners do not need to
+ * be explicitly unregistered to be eligible for garbage collection. Dead (garbage collected) listeners are
+ * removed on-the-fly as messages get dispatched.
+ *
+ * Generally message handlers will be invoked in inverse sequence of insertion (subscription) but any
+ * class using this bus should not rely on this assumption. The basic contract of the bus is that it will deliver
+ * a specific message exactly once to each of the subscribed message handlers.
+ *
+ * Messages are dispatched to all listeners that accept the type or supertype of the dispatched message. Additionally
+ * a message handler may define filters to narrow the set of messages that it accepts.
+ *
+ * Subscribed message handlers are available to all pending message publications that have not yet started processing.
+ * Any messageHandler may only be subscribed once (subsequent subscriptions of an already subscribed messageHandler will be silently ignored)
+ *
+ * Removing a listener means removing all subscribed message handlers of that object. This remove operation
+ * immediately takes effect and on all running dispatch processes. A removed listener (a listener
+ * is considered removed after the remove(Object) call returned) will under no circumstances receive any message publications.
+ *
+ * NOTE: Generic type parameters of messages will not be taken into account, e.g. a List will
+ * get dispatched to all message handlers that take an instance of List as their parameter
+ *
+ * @Author bennidi
+ * Date: 2/8/12
+ */
+public interface IMessageBus {
+
+ /**
+ * Subscribe all listeners of the given message to receive message publications.
+ * Any message may only be subscribed once (subsequent subscriptions of an already subscribed
+ * message will be silently ignored)
+ *
+ * @param listener
+ */
+ public void subscribe(Object listener);
+
+
+ /**
+ * Immediately unsubscribe all registered message handlers (if any) of the given listener. When this call returns
+ * have effectively been removed and will not receive any message publications (including asynchronously scheduled
+ * publications that have been published when the messageHandler was still subscribed).
+ * A call to this method passing null, an already subscribed message or any message that does not define any listeners
+ * will not have any effect.
+ *
+ * @param listener
+ */
+ public void unsubscribe(Object listener);
+
+ /**
+ *
+ * @param message
+ * @return
+ */
+ public P post(T message);
+
+
+
+ public static interface IPostCommand{
+
+ public void now();
+
+ public void asynchronously();
+
+ }
+
+}
diff --git a/src/main/java/org/mbassy/MBassador.java b/src/main/java/org/mbassy/MBassador.java
index 3be67bc..eb5e62e 100644
--- a/src/main/java/org/mbassy/MBassador.java
+++ b/src/main/java/org/mbassy/MBassador.java
@@ -9,46 +9,8 @@ import java.lang.reflect.Method;
import java.util.*;
import java.util.concurrent.*;
-/**
- *
- * A message bus offers facilities for publishing messages to registered listeners. Messages can be dispatched
- * synchronously or asynchronously and may be of any type that is a valid sub type of the type parameter T.
- * The dispatch mechanism can by controlled for each concrete message publication.
- * A message publication is the publication of any message using one of the bus' publish(..) methods.
- *
- * Each message publication is isolated from all other running publications such that it does not interfere with them.
- * Hence, the bus expects message handlers to be stateless as it may invoke them concurrently if multiple
- * messages get published asynchronously.
- *
- * A listener is any object that defines at least one message handler and that has been subscribed to at least
- * one message bus. A message handler can be any method that accepts exactly one parameter (the message) and is marked
- * as a message handler using the @Listener annotation.
- *
- * The bus uses weak references to all listeners such that registered listeners do not need to
- * be explicitly unregistered to be eligible for garbage collection. Dead (garbage collected) listeners are
- * removed on-the-fly as messages get dispatched.
- *
- * Generally message handlers will be invoked in inverse sequence of insertion (subscription) but any
- * class using this bus should not rely on this assumption. The basic contract of the bus is that it will deliver
- * a specific message exactly once to each of the subscribed message handlers.
- *
- * Messages are dispatched to all listeners that accept the type or supertype of the dispatched message. Additionally
- * a message handler may define filters to narrow the set of messages that it accepts.
- *
- * Subscribed message handlers are available to all pending message publications that have not yet started processing.
- * Any messageHandler may only be subscribed once (subsequent subscriptions of an already subscribed messageHandler will be silently ignored)
- *
- * Removing a listener means removing all subscribed message handlers of that object. This remove operation
- * immediately takes effect and on all running dispatch processes. A removed listener (a listener
- * is considered removed after the remove(Object) call returned) will under no circumstances receive any message publications.
- *
- * NOTE: Generic type parameters of messages will not be taken into account, e.g. a List will
- * get dispatched to all message handlers that take an instance of List as their parameter
- *
- * @Author bennidi
- * Date: 2/8/12
- */
-public class MBassador{
+
+public class MBassador implements IMessageBus{
// This predicate is used to find all message listeners (methods annotated with @Listener)
@@ -70,17 +32,18 @@ public class MBassador{
};
// executor for asynchronous listeners using unbound queuing strategy to ensure that no events get lost
- private ExecutorService executor = new ThreadPoolExecutor(5, 50, 1, TimeUnit.MINUTES, new LinkedBlockingQueue());
+ private ExecutorService executor;
// cache already created filter instances
private final Map, MessageFilter> filterCache = new HashMap, MessageFilter>();
// all subscriptions per message type
// this is the primary list for dispatching a specific message
+ // write access is synchronized and happens very infrequently
private final Map> subscriptionsPerMessage = new HashMap(50);
// all subscriptions per messageHandler type
- // this list provides access for subscribing and unsubsribing
+ // this list provides fast access for subscribing and unsubscribing
private final Map> subscriptionsPerListener = new HashMap(50);
// remember already processed classes that do not contain any listeners
@@ -107,7 +70,7 @@ public class MBassador{
try {
publish(pendingMessages.take());
} catch (InterruptedException e) {
- errorHandler.handleError(new PublicationError(e, "Asnchronous publication interupted", null, null, null));
+ errorHandler.handleError(new PublicationError(e, "Asynchronous publication interrupted", null, null, null));
return;
}
}
@@ -119,10 +82,15 @@ public class MBassador{
}
public MBassador(){
- initDispatcherThreads(2);
+ this(2);
}
public MBassador(int dispatcherThreadCount){
+ this(2, new ThreadPoolExecutor(5, 50, 1, TimeUnit.MINUTES, new LinkedBlockingQueue()));
+ }
+
+ public MBassador(int dispatcherThreadCount, ExecutorService executor){
+ this.executor = executor;
initDispatcherThreads(dispatcherThreadCount > 0 ? dispatcherThreadCount : 2);
}
@@ -141,7 +109,12 @@ public class MBassador{
public void publish(T message){
try {
final Collection subscriptions = getSubscriptionsByMessageType(message.getClass());
- for (Subscription subscription : subscriptions) subscription.publish(message);
+ if(subscriptions == null){
+ return; // TODO: Dead Event?
+ }
+ for (Subscription subscription : subscriptions){
+ subscription.publish(message);
+ }
} catch (Throwable e) {
handlePublicationError(new PublicationError()
.setMessage("Error during publication of message")
@@ -151,32 +124,22 @@ public class MBassador{
}
- /**
- * Immediately unsubscribe all registered message handlers (if any) of the given listener. When this call returns
- * have effectively been removed and will not receive any message publications (including asynchronously scheduled
- * publications that have been published when the messageHandler was still subscribed).
- * A call to this method passing null, an already subscribed message or any message that does not define any listeners
- * will not have any effect.
- *
- * @param listener
- */
+
public void unsubscribe(Object listener){
if (listener == null) return;
Collection subscriptions = subscriptionsPerListener.get(listener.getClass());
- for (Subscription subscription : subscriptions) {
+ if(subscriptions == null)return;
+ for (Subscription subscription : subscriptions) {
subscription.unsubscribe(listener);
}
}
+ @Override
+ public SimplePostCommand post(T message) {
+ return new SimplePostCommand(this, message);
+ }
- /**
- * Subscribe all listeners of the given message to receive message publications.
- * Any message may only be subscribed once (subsequent subscriptions of an already subscribed
- * message will be silently ignored)
- *
- * @param listener
- */
- public void subscribe(Object listener){
+ public void subscribe(Object listener){
Class listeningClass = listener.getClass();
if (nonListeners.contains(listeningClass))
return; // early reject of known classes that do not participate in eventing
@@ -196,7 +159,7 @@ public class MBassador{
if (!isValidMessageHandler(messageHandler)) continue; // ignore invalid listeners
MessageFilter[] filter = getFilter(messageHandler.getAnnotation(Listener.class));
Class eventType = getMessageType(messageHandler);
- Subscription subscription = new Subscription(messageHandler, filter);
+ Subscription subscription = createSubscription(messageHandler, filter);
subscription.subscribe(listener);
addMessageTypeSubscription(eventType, subscription);
subscriptionsByListener.add(subscription);
@@ -206,7 +169,7 @@ public class MBassador{
}
}
}
- // register the message to the existing subscriptions
+ // register the listener to the existing subscriptions
for (Subscription sub : subscriptionsByListener) sub.subscribe(listener);
}
@@ -219,8 +182,7 @@ public class MBassador{
// obtain the set of subscriptions for the given message type
private Collection getSubscriptionsByMessageType(Class messageType) {
- // TODO improve with cache
- Collection subscriptions = new LinkedList();
+ List subscriptions = new LinkedList();
if(subscriptionsPerMessage.get(messageType) != null) {
subscriptions.addAll(subscriptionsPerMessage.get(messageType));
@@ -230,8 +192,9 @@ public class MBassador{
subscriptions.addAll(subscriptionsPerMessage.get(eventSuperType));
}
}
-
- return subscriptions;
+ // IMPROVEMENT: use tree list that sorts during insertion
+ //Collections.sort(subscriptions, new SubscriptionByPriorityDesc());
+ return subscriptions;
}
private Collection getSuperclasses(Class from){
@@ -253,24 +216,11 @@ public class MBassador{
subscriptions.add(subscription);
}
- /*
- private void updateMessageTypeHierarchy(Class messageType) {
- for (Class existingEventType : subscriptionsPerMessage.keySet()) {
- if (existingEventType.equals(messageType)) continue;
- if (messageType.isAssignableFrom(existingEventType)) //message is super type of existing
- messageTypeHierarchy.put(existingEventType, messageType);
- else if (existingEventType.isAssignableFrom(messageType)) { // message is sub type of existing
- messageTypeHierarchy.put(messageType, existingEventType); // add direct super type
- messageTypeHierarchy.putAll(messageType, messageTypeHierarchy.get(existingEventType)); // add all super types of super type
- }
- }
- }*/
-
private boolean isValidMessageHandler(Method handler) {
if (handler.getParameterTypes().length != 1) {
// a messageHandler only defines one parameter (the message)
- System.out.println("Found nono or more than one parameter in messageHandler [" + handler.getName()
+ System.out.println("Found no or more than one parameter in messageHandler [" + handler.getName()
+ "]. A messageHandler must define exactly one parameter");
return false;
}
@@ -324,48 +274,64 @@ public class MBassador{
}
}
+
+ private Subscription createSubscription(Method messageHandler, MessageFilter[] filter){
+ if(filter == null || filter.length == 0){
+ if(isAsynchronous(messageHandler)){
+ return new UnfilteredAsynchronousSubscription(messageHandler);
+ }
+ else{
+ return new UnfilteredSynchronousSubscription(messageHandler);
+ }
+ }
+ else{
+ if(isAsynchronous(messageHandler)){
+ return new FilteredAsynchronousSubscription(messageHandler, filter);
+ }
+ else{
+ return new FilteredSynchronousSubscription(messageHandler, filter);
+ }
+ }
+ }
+
+ private boolean isAsynchronous(Method messageHandler){
+ return messageHandler.getAnnotation(Listener.class).mode().equals(Listener.Dispatch.Asynchronous);
+ }
+
+
/**
* Subscription is a thread safe container for objects that contain message handlers
- *
*/
- private class Subscription {
-
- private final MessageFilter[] filter;
+ private abstract class Subscription {
private final Method messageHandler;
- private ConcurrentLinkedBag