diff --git a/README.md b/README.md index c937202..80e8dd5 100644 --- a/README.md +++ b/README.md @@ -137,6 +137,7 @@ Of course you can always clone the repository and build from source

1.1.1

+ + Added support for DeadEvent + Introduced new property to @Listener annotation that allows to activate/deactivate any message handler + Full support of proxies created by cglib + Message handler inheritance changed! See wiki page about handler definition for more details. diff --git a/src/main/java/net/engio/mbassy/AbstractMessageBus.java b/src/main/java/net/engio/mbassy/AbstractMessageBus.java index e8d9897..bf17333 100644 --- a/src/main/java/net/engio/mbassy/AbstractMessageBus.java +++ b/src/main/java/net/engio/mbassy/AbstractMessageBus.java @@ -44,7 +44,7 @@ public abstract class AbstractMessageBus private final List dispatchers = new CopyOnWriteArrayList(); // all pending messages scheduled for asynchronous dispatch are queued here - private final BlockingQueue> pendingMessages; + private final BlockingQueue pendingMessages; // this factory is used to create specialized subscriptions based on the given message handler configuration // it can be customized by implementing the getSubscriptionFactory() method @@ -56,7 +56,7 @@ public abstract class AbstractMessageBus this.executor = configuration.getExecutor(); subscriptionFactory = configuration.getSubscriptionFactory(); this.metadataReader = configuration.getMetadataReader(); - pendingMessages = new LinkedBlockingQueue>(configuration.getMaximumNumberOfPendingMessages()); + pendingMessages = new LinkedBlockingQueue(configuration.getMaximumNumberOfPendingMessages()); initDispatcherThreads(configuration.getNumberOfMessageDispatchers()); addErrorHandler(new IPublicationErrorHandler.ConsoleLogger()); } @@ -151,7 +151,7 @@ public abstract class AbstractMessageBus } // this method enqueues a message delivery request - protected MessagePublication addAsynchronousDeliveryRequest(MessagePublication request){ + protected MessagePublication addAsynchronousDeliveryRequest(MessagePublication request){ try { pendingMessages.put(request); return request.markScheduled(); @@ -161,7 +161,7 @@ public abstract class AbstractMessageBus } // this method enqueues a message delivery request - protected MessagePublication addAsynchronousDeliveryRequest(MessagePublication request, long timeout, TimeUnit unit){ + protected MessagePublication addAsynchronousDeliveryRequest(MessagePublication request, long timeout, TimeUnit unit){ try { return pendingMessages.offer(request, timeout, unit) ? request.markScheduled() @@ -172,6 +172,7 @@ public abstract class AbstractMessageBus } // obtain the set of subscriptions for the given message type + // Note: never returns null! protected Collection getSubscriptionsByMessageType(Class messageType) { Set subscriptions = new TreeSet(Subscription.SubscriptionByPriorityDesc); diff --git a/src/main/java/net/engio/mbassy/IMessageBus.java b/src/main/java/net/engio/mbassy/IMessageBus.java index 4aef90a..06fbb12 100644 --- a/src/main/java/net/engio/mbassy/IMessageBus.java +++ b/src/main/java/net/engio/mbassy/IMessageBus.java @@ -136,7 +136,7 @@ public interface IMessageBus { * * @return A message publication that can be used to access information about the state of */ - public MessagePublication asynchronously(); + public MessagePublication asynchronously(); /** @@ -149,7 +149,7 @@ public interface IMessageBus { * * @return A message publication that wraps up the publication request */ - public MessagePublication asynchronously(long timeout, TimeUnit unit); + public MessagePublication asynchronously(long timeout, TimeUnit unit); } diff --git a/src/main/java/net/engio/mbassy/MBassador.java b/src/main/java/net/engio/mbassy/MBassador.java index 7f9aad8..78d359d 100644 --- a/src/main/java/net/engio/mbassy/MBassador.java +++ b/src/main/java/net/engio/mbassy/MBassador.java @@ -1,5 +1,6 @@ package net.engio.mbassy; +import net.engio.mbassy.common.DeadEvent; import net.engio.mbassy.subscription.Subscription; import java.util.Collection; @@ -13,16 +14,25 @@ public class MBassador extends AbstractMessageBus> } - public MessagePublication publishAsync(T message) { - return addAsynchronousDeliveryRequest(MessagePublication.Create( - getSubscriptionsByMessageType(message.getClass()), message)); + public MessagePublication publishAsync(T message) { + return addAsynchronousDeliveryRequest(createMessagePublication(message)); } - public MessagePublication publishAsync(T message, long timeout, TimeUnit unit) { - return addAsynchronousDeliveryRequest(MessagePublication.Create( - getSubscriptionsByMessageType(message.getClass()), message), timeout, unit); + public MessagePublication publishAsync(T message, long timeout, TimeUnit unit) { + return addAsynchronousDeliveryRequest(createMessagePublication(message), timeout, unit); } + private MessagePublication createMessagePublication(T message) { + Collection subscriptions = getSubscriptionsByMessageType(message.getClass()); + if (subscriptions == null || subscriptions.isEmpty()) { + // Dead Event + subscriptions = getSubscriptionsByMessageType(DeadEvent.class); + return MessagePublication.Create(subscriptions, new DeadEvent(message)); + } + else return MessagePublication.Create(subscriptions, message); + } + + /** * Synchronously publish a message to all registered listeners (this includes listeners defined for super types) @@ -32,13 +42,25 @@ public class MBassador extends AbstractMessageBus> */ public void publish(T message) { try { + MessagePublication publication = createMessagePublication(message); + publication.execute(); + + /* final Collection subscriptions = getSubscriptionsByMessageType(message.getClass()); - if (subscriptions == null) { - return; // TODO: Dead Event? - } - for (Subscription subscription : subscriptions) { - subscription.publish(message); + if (subscriptions == null || subscriptions.isEmpty()) { + // publish a DeadEvent since no subscriptions could be found + final Collection deadEventSubscriptions = getSubscriptionsByMessageType(DeadEvent.class); + if (deadEventSubscriptions != null && !deadEventSubscriptions.isEmpty()) { + for (Subscription subscription : deadEventSubscriptions) { + subscription.publish(new DeadEvent(message)); + } + } } + else{ + for (Subscription subscription : subscriptions) { + subscription.publish(message); + } + }*/ } catch (Throwable e) { handlePublicationError(new PublicationError() .setMessage("Error during publication of message") diff --git a/src/main/java/net/engio/mbassy/MessagePublication.java b/src/main/java/net/engio/mbassy/MessagePublication.java index 15e250b..04e7381 100644 --- a/src/main/java/net/engio/mbassy/MessagePublication.java +++ b/src/main/java/net/engio/mbassy/MessagePublication.java @@ -1,5 +1,6 @@ package net.engio.mbassy; +import net.engio.mbassy.common.DeadEvent; import net.engio.mbassy.subscription.Subscription; import java.util.Collection; @@ -12,19 +13,19 @@ import java.util.Collection; * @author bennidi * Date: 11/16/12 */ -public class MessagePublication { +public class MessagePublication { - public static MessagePublication Create(Collection subscriptions, T message){ - return new MessagePublication(subscriptions, message, State.Initial); + public static MessagePublication Create(Collection subscriptions, Object message){ + return new MessagePublication(subscriptions, message, State.Initial); } private Collection subscriptions; - private T message; + private Object message; private State state = State.Scheduled; - private MessagePublication(Collection subscriptions, T message, State initialState) { + private MessagePublication(Collection subscriptions, Object message, State initialState) { this.subscriptions = subscriptions; this.message = message; this.state = initialState; @@ -54,18 +55,22 @@ public class MessagePublication { return state.equals(State.Scheduled); } - public MessagePublication markScheduled(){ + public MessagePublication markScheduled(){ if(!state.equals(State.Initial)) return this; state = State.Scheduled; return this; } - public MessagePublication setError(){ + public MessagePublication setError(){ state = State.Error; return this; } + public boolean isDeadEvent(){ + return DeadEvent.class.isAssignableFrom(message.getClass()); + } + private enum State{ Initial,Scheduled,Running,Finished,Error; } diff --git a/src/main/java/net/engio/mbassy/SyncAsyncPostCommand.java b/src/main/java/net/engio/mbassy/SyncAsyncPostCommand.java index c468095..d29396b 100644 --- a/src/main/java/net/engio/mbassy/SyncAsyncPostCommand.java +++ b/src/main/java/net/engio/mbassy/SyncAsyncPostCommand.java @@ -24,7 +24,7 @@ public class SyncAsyncPostCommand implements IMessageBus.IPostCommand { } @Override - public MessagePublication asynchronously() { + public MessagePublication asynchronously() { return mBassador.publishAsync(message); } diff --git a/src/main/java/net/engio/mbassy/common/DeadEvent.java b/src/main/java/net/engio/mbassy/common/DeadEvent.java new file mode 100644 index 0000000..38b9988 --- /dev/null +++ b/src/main/java/net/engio/mbassy/common/DeadEvent.java @@ -0,0 +1,21 @@ +package net.engio.mbassy.common; + +/** + * The DeadEvent is delivered to all subscribed handlers (if any) whenever no message + * handlers could be found for a given message publication. + * + * @author bennidi + * Date: 1/18/13 + */ +public class DeadEvent { + + private Object event; + + public DeadEvent(Object event) { + this.event = event; + } + + public Object getEvent() { + return event; + } +} diff --git a/src/test/java/net/engio/mbassy/AllTests.java b/src/test/java/net/engio/mbassy/AllTests.java index 2b37d19..7dd199d 100644 --- a/src/test/java/net/engio/mbassy/AllTests.java +++ b/src/test/java/net/engio/mbassy/AllTests.java @@ -16,7 +16,8 @@ import org.junit.runners.Suite; FilterTest.class, MetadataReaderTest.class, ListenerSubscriptionTest.class, - MethodDispatchTest.class + MethodDispatchTest.class, + DeadEventTest.class }) public class AllTests { } diff --git a/src/test/java/net/engio/mbassy/DeadEventTest.java b/src/test/java/net/engio/mbassy/DeadEventTest.java new file mode 100644 index 0000000..a4a7bf9 --- /dev/null +++ b/src/test/java/net/engio/mbassy/DeadEventTest.java @@ -0,0 +1,48 @@ +package net.engio.mbassy; + +import net.engio.mbassy.common.ConcurrentSet; +import net.engio.mbassy.common.DeadEvent; +import net.engio.mbassy.common.UnitTest; +import net.engio.mbassy.listener.Listener; +import org.junit.Test; + +/** + * Verify correct behaviour in case of empty message publications + * + * @author bennidi + * Date: 1/18/13 + */ +public class DeadEventTest extends UnitTest{ + + + @Test + public void testDeadEvent(){ + MBassador bus = new MBassador(BusConfiguration.Default()); + DeadEventHandler deadEventHandler = new DeadEventHandler(); + bus.subscribe(deadEventHandler); + assertEquals(0, deadEventHandler.getDeadEventCount()); + bus.post(new Object()).now(); + assertEquals(1, deadEventHandler.getDeadEventCount()); + bus.post(323).now(); + assertEquals(2, deadEventHandler.getDeadEventCount()); + bus.publish("fkdfdk"); + assertEquals(3, deadEventHandler.getDeadEventCount()); + } + + public class DeadEventHandler{ + + private ConcurrentSet deadEvents = new ConcurrentSet(); + + @Listener + public void handle(DeadEvent event){ + deadEvents.add(event); + } + + + public int getDeadEventCount(){ + return deadEvents.size(); + } + + } + +}