diff --git a/src/main/java/net/engio/mbassy/MessageBusException.java b/src/main/java/net/engio/mbassy/MessageBusException.java index 7fcde70..c5a5473 100644 --- a/src/main/java/net/engio/mbassy/MessageBusException.java +++ b/src/main/java/net/engio/mbassy/MessageBusException.java @@ -23,7 +23,5 @@ public class MessageBusException extends Exception{ super(cause); } - public MessageBusException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { - super(message, cause, enableSuppression, writableStackTrace); - } + } diff --git a/src/main/java/net/engio/mbassy/bus/AbstractSyncAsyncMessageBus.java b/src/main/java/net/engio/mbassy/bus/AbstractSyncAsyncMessageBus.java index bacfddc..bb3a3e6 100644 --- a/src/main/java/net/engio/mbassy/bus/AbstractSyncAsyncMessageBus.java +++ b/src/main/java/net/engio/mbassy/bus/AbstractSyncAsyncMessageBus.java @@ -5,7 +5,7 @@ import java.util.List; import java.util.concurrent.*; /** - * The base class for all message bus implementations. + * The base class for all async message bus implementations. * * @param * @param

diff --git a/src/main/java/net/engio/mbassy/bus/AbstractSyncMessageBus.java b/src/main/java/net/engio/mbassy/bus/AbstractSyncMessageBus.java index f5a2cf1..e6683f0 100644 --- a/src/main/java/net/engio/mbassy/bus/AbstractSyncMessageBus.java +++ b/src/main/java/net/engio/mbassy/bus/AbstractSyncMessageBus.java @@ -2,6 +2,7 @@ package net.engio.mbassy.bus; import net.engio.mbassy.IPublicationErrorHandler; import net.engio.mbassy.PublicationError; +import net.engio.mbassy.common.DeadMessage; import net.engio.mbassy.common.ReflectionUtils; import net.engio.mbassy.listener.MessageHandlerMetadata; import net.engio.mbassy.listener.MetadataReader; @@ -130,7 +131,16 @@ public abstract class AbstractSyncMessageBus subscriptions = getSubscriptionsByMessageType(message.getClass()); + if ((subscriptions == null || subscriptions.isEmpty()) && !message.getClass().equals(DeadMessage.class)) { + // Dead Event + subscriptions = getSubscriptionsByMessageType(DeadMessage.class); + return getPublicationFactory().createPublication(this, subscriptions, new DeadMessage(message)); + } else { + return getPublicationFactory().createPublication(this, subscriptions, message); + } + } // obtain the set of subscriptions for the given message type // Note: never returns null! diff --git a/src/main/java/net/engio/mbassy/bus/MBassador.java b/src/main/java/net/engio/mbassy/bus/MBassador.java index 6bf1941..1e7b446 100644 --- a/src/main/java/net/engio/mbassy/bus/MBassador.java +++ b/src/main/java/net/engio/mbassy/bus/MBassador.java @@ -23,17 +23,6 @@ public class MBassador extends AbstractSyncAsyncMessageBus subscriptions = getSubscriptionsByMessageType(message.getClass()); - if ((subscriptions == null || subscriptions.isEmpty()) && !message.getClass().equals(DeadMessage.class)) { - // Dead Event - subscriptions = getSubscriptionsByMessageType(DeadMessage.class); - return getPublicationFactory().createPublication(this, subscriptions, new DeadMessage(message)); - } else { - return getPublicationFactory().createPublication(this, subscriptions, message); - } - } - /** * Synchronously publish a message to all registered listeners (this includes listeners defined for super types) diff --git a/src/main/java/net/engio/mbassy/bus/MessagePublication.java b/src/main/java/net/engio/mbassy/bus/MessagePublication.java index d126e87..f0d8318 100644 --- a/src/main/java/net/engio/mbassy/bus/MessagePublication.java +++ b/src/main/java/net/engio/mbassy/bus/MessagePublication.java @@ -21,7 +21,7 @@ public class MessagePublication { public static class Factory { - public MessagePublication createPublication(IMessageBus owningBus, Collection subscriptions, Object message) { + public MessagePublication createPublication(ISyncMessageBus owningBus, Collection subscriptions, Object message) { return new MessagePublication(owningBus, subscriptions, message, State.Initial); } @@ -35,9 +35,9 @@ public class MessagePublication { private boolean delivered = false; - private IMessageBus bus; + private ISyncMessageBus bus; - public MessagePublication(IMessageBus bus, Collection subscriptions, Object message, State initialState) { + public MessagePublication(ISyncMessageBus bus, Collection subscriptions, Object message, State initialState) { this.bus = bus; this.subscriptions = subscriptions; this.message = message; diff --git a/src/main/java/net/engio/mbassy/bus/SyncMessageBus.java b/src/main/java/net/engio/mbassy/bus/SyncMessageBus.java new file mode 100644 index 0000000..89c10fe --- /dev/null +++ b/src/main/java/net/engio/mbassy/bus/SyncMessageBus.java @@ -0,0 +1,57 @@ +package net.engio.mbassy.bus; + +import net.engio.mbassy.PublicationError; + +/** + * Created with IntelliJ IDEA. + * User: benjamin + * Date: 4/3/13 + * Time: 9:02 AM + * To change this template use File | Settings | File Templates. + */ +public class SyncMessageBus extends AbstractSyncMessageBus{ + + + public SyncMessageBus(SyncBusConfiguration configuration) { + super(configuration); + } + + /** + * Synchronously publish a message to all registered listeners (this includes listeners defined for super types) + * The call blocks until every messageHandler has processed the message. + * + * @param message + */ + public void publish(T message) { + try { + MessagePublication publication = createMessagePublication(message); + publication.execute(); + } catch (Throwable e) { + handlePublicationError(new PublicationError() + .setMessage("Error during publication of message") + .setCause(e) + .setPublishedObject(message)); + } + + } + + @Override + public SyncPostCommand post(T message) { + return new SyncPostCommand(message); + } + + public class SyncPostCommand implements ISyncMessageBus.ISyncPostCommand{ + + + private T message; + + public SyncPostCommand(T message) { + this.message = message; + } + + @Override + public void now() { + publish(message); + } + } +} diff --git a/src/test/java/net/engio/mbassy/AllTests.java b/src/test/java/net/engio/mbassy/AllTests.java index 756e141..fe842ea 100644 --- a/src/test/java/net/engio/mbassy/AllTests.java +++ b/src/test/java/net/engio/mbassy/AllTests.java @@ -14,7 +14,9 @@ import org.junit.runners.Suite; @Suite.SuiteClasses({ StrongConcurrentSetTest.class, WeakConcurrentSetTest.class, - MessagePublicationTest.class, + AsynchronousMessageBusTest.class, + SyncBusTest.MBassadorTest.class, + SyncBusTest.SyncMessageBusTest.class, FilterTest.class, MetadataReaderTest.class, ListenerSubscriptionTest.class, diff --git a/src/test/java/net/engio/mbassy/MessagePublicationTest.java b/src/test/java/net/engio/mbassy/AsynchronousMessageBusTest.java similarity index 70% rename from src/test/java/net/engio/mbassy/MessagePublicationTest.java rename to src/test/java/net/engio/mbassy/AsynchronousMessageBusTest.java index aa5b373..f3b7bc5 100644 --- a/src/test/java/net/engio/mbassy/MessagePublicationTest.java +++ b/src/test/java/net/engio/mbassy/AsynchronousMessageBusTest.java @@ -20,7 +20,7 @@ import java.util.concurrent.CopyOnWriteArrayList; * @author bennidi * Date: 2/8/12 */ -public class MessagePublicationTest extends MessageBusTest { +public class AsynchronousMessageBusTest extends MessageBusTest { // this value probably needs to be adjusted depending on the performance of the underlying plattform // otherwise the tests will fail since asynchronous processing might not have finished when @@ -61,59 +61,6 @@ public class MessagePublicationTest extends MessageBusTest { } - @Test - public void testSynchronousMessagePublication() throws Exception { - - MBassador bus = getBus(new BusConfiguration()); - ListenerFactory listenerFactory = new ListenerFactory() - .create(10000, EventingTestBean.class) - .create(10000, EventingTestBean2.class) - .create(10000, EventingTestBean3.class) - .create(10000, Object.class) - .create(10000, NonListeningBean.class); - - List listeners = listenerFactory.build(); - - // this will subscribe the listeners concurrently to the bus - TestUtil.setup(bus, listeners, 10); - - TestMessage message = new TestMessage(); - TestMessage subMessage = new SubTestMessage(); - - bus.publish(message); - bus.publish(subMessage); - - pause(processingTimeInMS); - - assertEquals(30000, message.counter.get()); - assertEquals(70000, subMessage.counter.get()); - - } - - @Test - public void testStrongListenerSubscription() throws Exception { - - MBassador bus = getBus(new BusConfiguration()); - - - for(int i = 0; i< 10000; i++){ - bus.subscribe(new EventingTestBean2()); - } - - runGC(); - - TestMessage message = new TestMessage(); - TestMessage subMessage = new SubTestMessage(); - - bus.publish(message); - bus.publish(subMessage); - - pause(processingTimeInMS); - - assertEquals(10000, message.counter.get()); - assertEquals(20000, subMessage.counter.get()); - - } @Test public void testConcurrentMixedMessagePublication() throws Exception { diff --git a/src/test/java/net/engio/mbassy/SyncBusTest.java b/src/test/java/net/engio/mbassy/SyncBusTest.java new file mode 100644 index 0000000..0147965 --- /dev/null +++ b/src/test/java/net/engio/mbassy/SyncBusTest.java @@ -0,0 +1,170 @@ +package net.engio.mbassy; + +import net.engio.mbassy.bus.*; +import net.engio.mbassy.common.MessageBusTest; +import net.engio.mbassy.common.TestUtil; +import net.engio.mbassy.dispatch.HandlerInvocation; +import net.engio.mbassy.events.SubTestMessage; +import net.engio.mbassy.events.TestMessage; +import net.engio.mbassy.listener.*; +import net.engio.mbassy.listeners.*; +import net.engio.mbassy.subscription.SubscriptionContext; +import org.junit.Test; + +import java.util.List; + +/** + * Test synchronous and asynchronous dispatch in single and multi-threaded scenario. + * + * @author bennidi + * Date: 2/8/12 + */ +public abstract class SyncBusTest extends MessageBusTest { + + // this value probably needs to be adjusted depending on the performance of the underlying plattform + // otherwise the tests will fail since asynchronous processing might not have finished when + // evaluation is run + private int processingTimeInMS = 4000; + + + @Test + public void testSynchronousMessagePublication() throws Exception { + + ISyncMessageBus bus = getSyncMessageBus(); + ListenerFactory listenerFactory = new ListenerFactory() + .create(10000, MessageListener1.class) + .create(10000, MessageListener2.class) + .create(10000, MessageListener3.class) + .create(10000, Object.class) + .create(10000, NonListeningBean.class); + + List listeners = listenerFactory.build(); + + // this will subscribe the listeners concurrently to the bus + TestUtil.setup(bus, listeners, 10); + + TestMessage message = new TestMessage(); + TestMessage subMessage = new SubTestMessage(); + + bus.post(message).now(); + bus.post(subMessage).now(); + + pause(processingTimeInMS); + + assertEquals(30000, message.counter.get()); + assertEquals(70000, subMessage.counter.get()); + + } + + @Test + public void testStrongListenerSubscription() throws Exception { + + ISyncMessageBus bus = getSyncMessageBus(); + + + for(int i = 0; i< 10000; i++){ + bus.subscribe(new MessageListener2()); + } + + runGC(); + + TestMessage message = new TestMessage(); + TestMessage subMessage = new SubTestMessage(); + + bus.post(message).now(); + bus.post(subMessage).now(); + + pause(processingTimeInMS); + + assertEquals(10000, message.counter.get()); + assertEquals(20000, subMessage.counter.get()); + + } + + protected abstract ISyncMessageBus getSyncMessageBus(); + + + public static class MessageListener1 { + + // every event of type TestEvent or any subtype will be delivered + // to this listener + @Handler + public void handleTestEvent(TestMessage message) { + message.counter.incrementAndGet(); + } + + // this handler will be invoked asynchronously + @Handler(priority = 0, invocation = HandleSubTestEventInvocation.class) + public void handleSubTestEvent(SubTestMessage message) { + message.counter.incrementAndGet(); + } + + // this handler will receive events of type SubTestEvent + // or any subtabe and that passes the given filter + @Handler( + priority = 10, + delivery = Invoke.Synchronously, + filters = {@Filter(Filters.RejectAll.class), @Filter(Filters.AllowAll.class)}) + public void handleFiltered(SubTestMessage message) { + message.counter.incrementAndGet(); + } + + + } + + public static class HandleSubTestEventInvocation extends HandlerInvocation { + + public HandleSubTestEventInvocation(SubscriptionContext context) { + super(context); + } + + @Override + public void invoke(MessageListener1 listener, SubTestMessage message) { + listener.handleSubTestEvent(message); + } + } + + @Listener(references = References.Strong) + public static class MessageListener2 extends net.engio.mbassy.listeners.EventingTestBean { + + // redefine the configuration for this handler + @Handler(delivery = Invoke.Synchronously) + public void handleSubTestEvent(SubTestMessage message) { + super.handleSubTestEvent(message); + } + + } + + @Listener(references = References.Strong) + public static class MessageListener3 extends net.engio.mbassy.listeners.EventingTestBean2 { + + + // this handler will be invoked asynchronously + @Handler(priority = 0, delivery = Invoke.Synchronously) + public void handleSubTestEventAgain(SubTestMessage message) { + message.counter.incrementAndGet(); + } + + } + + + public static class MBassadorTest extends SyncBusTest { + + + @Override + protected ISyncMessageBus getSyncMessageBus() { + return new MBassador(BusConfiguration.Default()); + } + + } + + public static class SyncMessageBusTest extends SyncBusTest { + + + @Override + protected ISyncMessageBus getSyncMessageBus() { + return new SyncMessageBus(new SyncBusConfiguration()); + } + } + +} diff --git a/src/test/java/net/engio/mbassy/common/MessageBusTest.java b/src/test/java/net/engio/mbassy/common/MessageBusTest.java index f4ca632..bdd0f21 100644 --- a/src/test/java/net/engio/mbassy/common/MessageBusTest.java +++ b/src/test/java/net/engio/mbassy/common/MessageBusTest.java @@ -4,6 +4,7 @@ import junit.framework.Assert; import net.engio.mbassy.IPublicationErrorHandler; import net.engio.mbassy.PublicationError; import net.engio.mbassy.bus.BusConfiguration; +import net.engio.mbassy.bus.ISyncMessageBus; import net.engio.mbassy.bus.MBassador; /** @@ -13,9 +14,9 @@ import net.engio.mbassy.bus.MBassador; * @author bennidi * Date: 3/2/13 */ -public class MessageBusTest extends UnitTest { +public class MessageBusTest extends UnitTest { - private static final IPublicationErrorHandler TestFailingHandler = new IPublicationErrorHandler() { + protected static final IPublicationErrorHandler TestFailingHandler = new IPublicationErrorHandler() { @Override public void handleError(PublicationError error) { Assert.fail(); @@ -27,4 +28,5 @@ public class MessageBusTest extends UnitTest { bus.addErrorHandler(TestFailingHandler); return bus; } + } diff --git a/src/test/java/net/engio/mbassy/common/TestUtil.java b/src/test/java/net/engio/mbassy/common/TestUtil.java index 6a7edc0..c7ff235 100644 --- a/src/test/java/net/engio/mbassy/common/TestUtil.java +++ b/src/test/java/net/engio/mbassy/common/TestUtil.java @@ -1,6 +1,7 @@ package net.engio.mbassy.common; import net.engio.mbassy.bus.IMessageBus; +import net.engio.mbassy.bus.ISyncMessageBus; import java.util.List; @@ -13,7 +14,7 @@ import java.util.List; public class TestUtil { - public static void setup(final IMessageBus bus, final List listeners, int numberOfThreads) { + public static void setup(final ISyncMessageBus bus, final List listeners, int numberOfThreads) { Runnable[] setupUnits = new Runnable[numberOfThreads]; int partitionSize; if(listeners.size() >= numberOfThreads){