From 6100c657187dd654474fe2db2badab7876f919b7 Mon Sep 17 00:00:00 2001 From: bennidi Date: Sun, 26 May 2013 17:38:16 +0200 Subject: [PATCH] fixed #27 #29 #30 #31, higher test coverage, stabilized tests, --- .../bus/AbstractSyncAsyncMessageBus.java | 6 +- .../mbassy/bus/AbstractSyncMessageBus.java | 1 - .../net/engio/mbassy/bus/ISyncMessageBus.java | 26 +- .../net/engio/mbassy/bus/PostCommand.java | 9 + .../engio/mbassy/common/ReflectionUtils.java | 2 +- .../subscription/SubscriptionFactory.java | 15 +- src/test/java/net/engio/mbassy/AllTests.java | 2 +- .../mbassy/AsynchronousMessageBusTest.java | 113 ------- .../net/engio/mbassy/ConcurrentSetTest.java | 22 +- .../java/net/engio/mbassy/FilterTest.java | 4 +- .../java/net/engio/mbassy/MBassadorTest.java | 166 ++++++++++ .../net/engio/mbassy/MetadataReaderTest.java | 5 +- .../engio/mbassy/SubscriptionManagerTest.java | 298 ++++++++++-------- .../java/net/engio/mbassy/SyncBusTest.java | 222 ++++++------- .../engio/mbassy/SynchronizedHandlerTest.java | 43 +-- .../mbassy/bus/ListenerSubscriptionTest.java | 2 +- .../{UnitTest.java => AssertSupport.java} | 8 +- .../mbassy/common/ConcurrentExecutor.java | 10 + .../engio/mbassy/common/MessageBusTest.java | 18 +- .../mbassy/common/SubscriptionValidator.java | 117 +++++++ .../net/engio/mbassy/common/TestUtil.java | 56 +++- .../listeners/AbstractMessageListener.java | 58 ++++ .../listeners/CustomInvocationListener.java | 37 +++ .../listeners/ExceptionThrowingListener.java | 23 ++ .../mbassy/listeners/ICountableListener.java | 58 ++++ .../mbassy/listeners/IMessageListener.java | 58 ++++ .../listeners/IMultipartMessageListener.java | 58 ++++ .../mbassy/listeners/ListenerFactory.java | 76 ++++- .../mbassy/listeners/MessagesListener.java | 58 ++++ .../listeners/MultipartMessageListener.java | 58 ++++ .../listeners/StandardMessageListener.java | 58 ++++ .../mbassy/messages/AbstractMessage.java | 55 ++++ .../mbassy/messages/CountableMessage.java | 9 + .../net/engio/mbassy/messages/ICountable.java | 16 + .../net/engio/mbassy/messages/IMessage.java | 16 + .../mbassy/messages/IMultipartMessage.java | 9 + .../engio/mbassy/messages/ITestMessage.java | 10 - .../engio/mbassy/messages/MessageTypes.java | 62 ++++ .../mbassy/messages/MultipartMessage.java | 9 + .../mbassy/messages/StandardMessage.java | 8 + .../engio/mbassy/messages/TestMessage3.java | 10 - 41 files changed, 1407 insertions(+), 484 deletions(-) create mode 100644 src/main/java/net/engio/mbassy/bus/PostCommand.java delete mode 100644 src/test/java/net/engio/mbassy/AsynchronousMessageBusTest.java create mode 100644 src/test/java/net/engio/mbassy/MBassadorTest.java rename src/test/java/net/engio/mbassy/common/{UnitTest.java => AssertSupport.java} (90%) create mode 100644 src/test/java/net/engio/mbassy/common/SubscriptionValidator.java create mode 100644 src/test/java/net/engio/mbassy/listeners/AbstractMessageListener.java create mode 100644 src/test/java/net/engio/mbassy/listeners/CustomInvocationListener.java create mode 100644 src/test/java/net/engio/mbassy/listeners/ExceptionThrowingListener.java create mode 100644 src/test/java/net/engio/mbassy/listeners/ICountableListener.java create mode 100644 src/test/java/net/engio/mbassy/listeners/IMessageListener.java create mode 100644 src/test/java/net/engio/mbassy/listeners/IMultipartMessageListener.java create mode 100644 src/test/java/net/engio/mbassy/listeners/MessagesListener.java create mode 100644 src/test/java/net/engio/mbassy/listeners/MultipartMessageListener.java create mode 100644 src/test/java/net/engio/mbassy/listeners/StandardMessageListener.java create mode 100644 src/test/java/net/engio/mbassy/messages/AbstractMessage.java create mode 100644 src/test/java/net/engio/mbassy/messages/CountableMessage.java create mode 100644 src/test/java/net/engio/mbassy/messages/ICountable.java create mode 100644 src/test/java/net/engio/mbassy/messages/IMessage.java create mode 100644 src/test/java/net/engio/mbassy/messages/IMultipartMessage.java delete mode 100644 src/test/java/net/engio/mbassy/messages/ITestMessage.java create mode 100644 src/test/java/net/engio/mbassy/messages/MessageTypes.java create mode 100644 src/test/java/net/engio/mbassy/messages/MultipartMessage.java create mode 100644 src/test/java/net/engio/mbassy/messages/StandardMessage.java delete mode 100644 src/test/java/net/engio/mbassy/messages/TestMessage3.java diff --git a/src/main/java/net/engio/mbassy/bus/AbstractSyncAsyncMessageBus.java b/src/main/java/net/engio/mbassy/bus/AbstractSyncAsyncMessageBus.java index bb3a3e6..66efa31 100644 --- a/src/main/java/net/engio/mbassy/bus/AbstractSyncAsyncMessageBus.java +++ b/src/main/java/net/engio/mbassy/bus/AbstractSyncAsyncMessageBus.java @@ -1,5 +1,7 @@ package net.engio.mbassy.bus; +import net.engio.mbassy.PublicationError; + import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; @@ -43,6 +45,8 @@ public abstract class AbstractSyncAsyncMessageBus { +public interface ISyncMessageBus extends PubSubSupport{ - /** - * Subscribe all listeners of the given message to receive message publications. - * Any message may only be subscribed once (subsequent subscriptions of an already subscribed - * message will be silently ignored) - * - * @param listener - */ - void subscribe(Object listener); - - /** - * Immediately remove all registered message handlers (if any) of the given listener. When this call returns all handlers - * have effectively been removed and will not receive any message publications (including asynchronously scheduled - * publications that have been published when the message listener was still subscribed). - *

- * A call to this method passing null, an already unsubscribed listener or any object that does not define any message - * handlers will not have any effect and is silently ignored. - * - * @param listener - * @return true, if the listener was found and successfully removed - * false otherwise - */ - boolean unsubscribe(Object listener); - /** * @param message * @return diff --git a/src/main/java/net/engio/mbassy/bus/PostCommand.java b/src/main/java/net/engio/mbassy/bus/PostCommand.java new file mode 100644 index 0000000..f21463a --- /dev/null +++ b/src/main/java/net/engio/mbassy/bus/PostCommand.java @@ -0,0 +1,9 @@ +package net.engio.mbassy.bus; + +/** + * + * @author bennidi + * Date: 5/25/13 + */ +public interface PostCommand { +} diff --git a/src/main/java/net/engio/mbassy/common/ReflectionUtils.java b/src/main/java/net/engio/mbassy/common/ReflectionUtils.java index a56891a..57c7890 100644 --- a/src/main/java/net/engio/mbassy/common/ReflectionUtils.java +++ b/src/main/java/net/engio/mbassy/common/ReflectionUtils.java @@ -53,8 +53,8 @@ public class ReflectionUtils { collectInterfaces(from, superclasses); while (!from.equals(Object.class) && !from.isInterface()) { superclasses.add(from.getSuperclass()); - from = from.getSuperclass(); + collectInterfaces(from, superclasses); } return superclasses; } diff --git a/src/main/java/net/engio/mbassy/subscription/SubscriptionFactory.java b/src/main/java/net/engio/mbassy/subscription/SubscriptionFactory.java index 5d817cd..7ba50d8 100644 --- a/src/main/java/net/engio/mbassy/subscription/SubscriptionFactory.java +++ b/src/main/java/net/engio/mbassy/subscription/SubscriptionFactory.java @@ -8,6 +8,7 @@ import net.engio.mbassy.dispatch.*; import net.engio.mbassy.listener.MessageHandlerMetadata; import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Modifier; /** @@ -58,12 +59,20 @@ public class SubscriptionFactory { return dispatcher; } - protected IHandlerInvocation createBaseHandlerInvocation(SubscriptionContext context) throws Exception { + protected IHandlerInvocation createBaseHandlerInvocation(SubscriptionContext context) throws MessageBusException { Class invocation = context.getHandlerMetadata().getHandlerInvocation(); if(invocation.isMemberClass() && !Modifier.isStatic(invocation.getModifiers())){ throw new MessageBusException("The handler invocation must be top level class or nested STATIC inner class"); } - Constructor constructor = invocation.getConstructor(SubscriptionContext.class); - return constructor.newInstance(context); + try { + Constructor constructor = invocation.getConstructor(SubscriptionContext.class); + return constructor.newInstance(context); + } catch (NoSuchMethodException e) { + throw new MessageBusException("The provided handler invocation did not specify the necessary constructor " + + invocation.getSimpleName() + "(SubscriptionContext);", e); + } catch (Exception e) { + throw new MessageBusException("Could not instantiate the provided handler invocation " + + invocation.getSimpleName(), e); + } } } diff --git a/src/test/java/net/engio/mbassy/AllTests.java b/src/test/java/net/engio/mbassy/AllTests.java index f517f9f..00c586e 100644 --- a/src/test/java/net/engio/mbassy/AllTests.java +++ b/src/test/java/net/engio/mbassy/AllTests.java @@ -14,7 +14,7 @@ import org.junit.runners.Suite; @Suite.SuiteClasses({ StrongConcurrentSetTest.class, WeakConcurrentSetTest.class, - AsynchronousMessageBusTest.class, + MBassadorTest.class, SyncBusTest.MBassadorTest.class, SyncBusTest.SyncMessageBusTest.class, FilterTest.class, diff --git a/src/test/java/net/engio/mbassy/AsynchronousMessageBusTest.java b/src/test/java/net/engio/mbassy/AsynchronousMessageBusTest.java deleted file mode 100644 index c398f07..0000000 --- a/src/test/java/net/engio/mbassy/AsynchronousMessageBusTest.java +++ /dev/null @@ -1,113 +0,0 @@ -package net.engio.mbassy; - -import net.engio.mbassy.bus.BusConfiguration; -import net.engio.mbassy.bus.MBassador; -import net.engio.mbassy.common.ConcurrentExecutor; -import net.engio.mbassy.common.MessageBusTest; -import net.engio.mbassy.common.TestUtil; -import net.engio.mbassy.messages.SubTestMessage; -import net.engio.mbassy.messages.TestMessage; -import net.engio.mbassy.messages.TestMessage2; -import net.engio.mbassy.listeners.*; -import org.junit.Test; - -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; - -/** - * Test synchronous and asynchronous dispatch in single and multi-threaded scenario. - * - * @author bennidi - * Date: 2/8/12 - */ -public class AsynchronousMessageBusTest extends MessageBusTest { - - // this value probably needs to be adjusted depending on the performance of the underlying plattform - // otherwise the tests will fail since asynchronous processing might not have finished when - // evaluation is run - private int processingTimeInMS = 4000; - - - @Test - public void testAsynchronousMessagePublication() throws Exception { - - MBassador bus = getBus(new BusConfiguration()); - ListenerFactory listenerFactory = new ListenerFactory() - .create(10000, EventingTestBean.class) - .create(10000, EventingTestBean2.class) - .create(10000, EventingTestBean3.class) - .create(10000, Object.class) - .create(10000, NonListeningBean.class) - .create(10000, MultiEventHandler.class); - - List listeners = listenerFactory.build(); - - // this will subscribe the listeners concurrently to the bus - TestUtil.setup(bus, listeners, 10); - - TestMessage message = new TestMessage(); - TestMessage subMessage = new SubTestMessage(); - TestMessage2 message2 = new TestMessage2(); - - bus.publishAsync(message); - bus.publishAsync(subMessage); - bus.publishAsync(message2); - - pause(processingTimeInMS); - - assertEquals(50000, message.counter.get()); - assertEquals(80000, subMessage.counter.get()); - assertEquals(20000, message2.counter.get()); - - } - - - @Test - public void testConcurrentMixedMessagePublication() throws Exception { - final CopyOnWriteArrayList testMessages = new CopyOnWriteArrayList(); - final CopyOnWriteArrayList subtestMessages = new CopyOnWriteArrayList(); - final int eventLoopsPerTHread = 100; - - - final MBassador bus = getBus(new BusConfiguration()); - ListenerFactory listenerFactory = new ListenerFactory() - .create(10000, EventingTestBean.class) - .create(10000, EventingTestBean2.class) - .create(10000, EventingTestBean3.class) - .create(10000, Object.class) - .create(10000, NonListeningBean.class); - - List listeners = listenerFactory.build(); - - // this will subscribe the listeners concurrently to the bus - TestUtil.setup(bus, listeners, 10); - - ConcurrentExecutor.runConcurrent(new Runnable() { - @Override - public void run() { - for (int i = 0; i < eventLoopsPerTHread; i++) { - TestMessage message = new TestMessage(); - SubTestMessage subMessage = new SubTestMessage(); - testMessages.add(message); - subtestMessages.add(subMessage); - - bus.publishAsync(message); - bus.publish(subMessage); - } - } - }, 10); - - pause(processingTimeInMS); - - for (TestMessage message : testMessages) { - assertEquals(30000, message.counter.get()); - } - - for (SubTestMessage message : subtestMessages) { - assertEquals(70000, message.counter.get()); - } - - } - - -} diff --git a/src/test/java/net/engio/mbassy/ConcurrentSetTest.java b/src/test/java/net/engio/mbassy/ConcurrentSetTest.java index 8aa95b4..17d4a89 100644 --- a/src/test/java/net/engio/mbassy/ConcurrentSetTest.java +++ b/src/test/java/net/engio/mbassy/ConcurrentSetTest.java @@ -1,9 +1,9 @@ package net.engio.mbassy; import junit.framework.Assert; +import net.engio.mbassy.common.AssertSupport; import net.engio.mbassy.common.ConcurrentExecutor; import net.engio.mbassy.common.IConcurrentSet; -import net.engio.mbassy.common.UnitTest; import org.junit.Test; import java.util.*; @@ -20,7 +20,7 @@ import java.util.concurrent.CopyOnWriteArraySet; * @author bennidi * Date: 11/12/12 */ -public abstract class ConcurrentSetTest extends UnitTest { +public abstract class ConcurrentSetTest extends AssertSupport { // Shared state protected final int numberOfElements = 100000; @@ -38,7 +38,7 @@ public abstract class ConcurrentSetTest extends UnitTest { final IConcurrentSet testSetWeak = createSet(); Random rand = new Random(); - // build set of distinct objects and list of duplicates + // getAll set of distinct objects and list of duplicates Object candidate = new Object(); for (int i = 0; i < numberOfElements; i++) { if (rand.nextInt() % 3 == 0) { @@ -72,7 +72,7 @@ public abstract class ConcurrentSetTest extends UnitTest { final HashSet toRemove = new HashSet(); final IConcurrentSet testSetWeak = createSet(); - // build set of distinct objects and mark a subset of those for removal + // getAll set of distinct objects and mark a subset of those for removal for (int i = 0; i < numberOfElements; i++) { Object candidate = new Object(); source.add(candidate); @@ -81,7 +81,7 @@ public abstract class ConcurrentSetTest extends UnitTest { } } - // build the test set from the set of candidates + // getAll the test set from the set of candidates ConcurrentExecutor.runConcurrent(new Runnable() { @Override public void run() { @@ -119,7 +119,7 @@ public abstract class ConcurrentSetTest extends UnitTest { final HashSet toRemove = new HashSet(); final IConcurrentSet testSetWeak = createSet(); - // build set of candidates and mark subset for removal + // getAll set of candidates and mark subset for removal for (int i = 0; i < numberOfElements; i++) { Object candidate = new Object(); source.add(candidate); @@ -128,7 +128,7 @@ public abstract class ConcurrentSetTest extends UnitTest { } } - // build test set by adding the candidates + // getAll test set by adding the candidates // and subsequently removing those marked for removal ConcurrentExecutor.runConcurrent(new Runnable() { @Override @@ -158,14 +158,14 @@ public abstract class ConcurrentSetTest extends UnitTest { final HashSet source = new HashSet(); final IConcurrentSet testSetWeak = createSet(); - // build set of candidates and mark subset for removal + // getAll set of candidates and mark subset for removal for (int i = 0; i < numberOfElements; i++) { Object candidate = new Object(); source.add(candidate); testSetWeak.add(candidate); } - // build test set by adding the candidates + // getAll test set by adding the candidates // and subsequently removing those marked for removal ConcurrentExecutor.runConcurrent(new Runnable() { @Override @@ -190,14 +190,14 @@ public abstract class ConcurrentSetTest extends UnitTest { final HashSet source = new HashSet(); final IConcurrentSet setUnderTest = createSet(); - // build set of candidates and mark subset for removal + // getAll set of candidates and mark subset for removal for (int i = 0; i < numberOfElements; i++) { Object candidate = new Object(); source.add(candidate); setUnderTest.add(candidate); } - // build test set by adding the candidates + // getAll test set by adding the candidates // and subsequently removing those marked for removal ConcurrentExecutor.runConcurrent(new Runnable() { @Override diff --git a/src/test/java/net/engio/mbassy/FilterTest.java b/src/test/java/net/engio/mbassy/FilterTest.java index a7b0d5f..43630d8 100644 --- a/src/test/java/net/engio/mbassy/FilterTest.java +++ b/src/test/java/net/engio/mbassy/FilterTest.java @@ -35,7 +35,7 @@ public class FilterTest extends MessageBusTest { ListenerFactory listenerFactory = new ListenerFactory() .create(100, FilteredMessageListener.class); - List listeners = listenerFactory.build(); + List listeners = listenerFactory.getAll(); // this will subscribe the listeners concurrently to the bus TestUtil.setup(bus, listeners, 10); @@ -60,7 +60,7 @@ public class FilterTest extends MessageBusTest { ListenerFactory listenerFactory = new ListenerFactory() .create(100, FilteredMessageListener.class); - List listeners = listenerFactory.build(); + List listeners = listenerFactory.getAll(); // this will subscribe the listeners concurrently to the bus TestUtil.setup(bus, listeners, 10); diff --git a/src/test/java/net/engio/mbassy/MBassadorTest.java b/src/test/java/net/engio/mbassy/MBassadorTest.java new file mode 100644 index 0000000..4ab45cf --- /dev/null +++ b/src/test/java/net/engio/mbassy/MBassadorTest.java @@ -0,0 +1,166 @@ +package net.engio.mbassy; + +import net.engio.mbassy.bus.BusConfiguration; +import net.engio.mbassy.bus.MBassador; +import net.engio.mbassy.common.ConcurrentExecutor; +import net.engio.mbassy.common.MessageBusTest; +import net.engio.mbassy.common.TestUtil; +import net.engio.mbassy.listeners.ExceptionThrowingListener; +import net.engio.mbassy.listeners.IMessageListener; +import net.engio.mbassy.listeners.ListenerFactory; +import net.engio.mbassy.listeners.MessagesListener; +import net.engio.mbassy.messages.MessageTypes; +import net.engio.mbassy.messages.MultipartMessage; +import net.engio.mbassy.messages.StandardMessage; +import org.junit.Test; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Test synchronous and asynchronous dispatch in single and multi-threaded scenario. + * + * @author bennidi + * Date: 2/8/12 + */ +public class MBassadorTest extends MessageBusTest { + + + @Test + public void testSynchronousMessagePublication() throws Exception { + + final MBassador bus = getBus(new BusConfiguration()); + ListenerFactory listeners = new ListenerFactory() + .create(InstancesPerListener, IMessageListener.DefaultListener.class) + .create(InstancesPerListener, IMessageListener.AsyncListener.class) + .create(InstancesPerListener, IMessageListener.DisabledListener.class) + .create(InstancesPerListener, MessagesListener.DefaultListener.class) + .create(InstancesPerListener, MessagesListener.AsyncListener.class) + .create(InstancesPerListener, MessagesListener.DisabledListener.class) + .create(InstancesPerListener, Object.class); + + + ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, listeners), ConcurrentUnits); + + Runnable publishAndCheck = new Runnable() { + @Override + public void run() { + StandardMessage standardMessage = new StandardMessage(); + MultipartMessage multipartMessage = new MultipartMessage(); + + bus.post(standardMessage).now(); + bus.post(multipartMessage).now(); + bus.post(MessageTypes.Simple).now(); + + pause(processingTimeInMS); + + assertEquals(InstancesPerListener, standardMessage.getTimesHandled(IMessageListener.DefaultListener.class)); + assertEquals(InstancesPerListener, standardMessage.getTimesHandled(IMessageListener.AsyncListener.class)); + + assertEquals(InstancesPerListener, multipartMessage.getTimesHandled(IMessageListener.DefaultListener.class)); + assertEquals(InstancesPerListener, multipartMessage.getTimesHandled(IMessageListener.AsyncListener.class)); + + + } + }; + + ConcurrentExecutor.runConcurrent(publishAndCheck, 1); + + MessageTypes.resetAll(); + ConcurrentExecutor.runConcurrent(publishAndCheck, ConcurrentUnits); + assertEquals(InstancesPerListener * ConcurrentUnits, MessageTypes.Simple.getTimesHandled(IMessageListener.DefaultListener.class)); + assertEquals(InstancesPerListener * ConcurrentUnits, MessageTypes.Simple.getTimesHandled(IMessageListener.AsyncListener.class)); + assertEquals(InstancesPerListener * ConcurrentUnits, MessageTypes.Simple.getTimesHandled(MessagesListener.DefaultListener.class)); + assertEquals(InstancesPerListener * ConcurrentUnits, MessageTypes.Simple.getTimesHandled(MessagesListener.AsyncListener.class)); + } + + @Test + public void testAsynchronousMessagePublication() throws Exception { + + final MBassador bus = getBus(new BusConfiguration()); + ListenerFactory listeners = new ListenerFactory() + .create(InstancesPerListener, IMessageListener.DefaultListener.class) + .create(InstancesPerListener, IMessageListener.AsyncListener.class) + .create(InstancesPerListener, IMessageListener.DisabledListener.class) + .create(InstancesPerListener, MessagesListener.DefaultListener.class) + .create(InstancesPerListener, MessagesListener.AsyncListener.class) + .create(InstancesPerListener, MessagesListener.DisabledListener.class) + .create(InstancesPerListener, Object.class); + + + ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, listeners), ConcurrentUnits); + + Runnable publishAndCheck = new Runnable() { + @Override + public void run() { + StandardMessage standardMessage = new StandardMessage(); + MultipartMessage multipartMessage = new MultipartMessage(); + + bus.post(standardMessage).asynchronously(); + bus.post(multipartMessage).asynchronously(); + bus.post(MessageTypes.Simple).asynchronously(); + + pause(processingTimeInMS); + + assertEquals(InstancesPerListener, standardMessage.getTimesHandled(IMessageListener.DefaultListener.class)); + assertEquals(InstancesPerListener, standardMessage.getTimesHandled(IMessageListener.AsyncListener.class)); + + assertEquals(InstancesPerListener, multipartMessage.getTimesHandled(IMessageListener.DefaultListener.class)); + assertEquals(InstancesPerListener, multipartMessage.getTimesHandled(IMessageListener.AsyncListener.class)); + + + } + }; + + ConcurrentExecutor.runConcurrent(publishAndCheck, 1); + + MessageTypes.resetAll(); + ConcurrentExecutor.runConcurrent(publishAndCheck, ConcurrentUnits); + assertEquals(InstancesPerListener * ConcurrentUnits, MessageTypes.Simple.getTimesHandled(IMessageListener.DefaultListener.class)); + assertEquals(InstancesPerListener * ConcurrentUnits, MessageTypes.Simple.getTimesHandled(IMessageListener.AsyncListener.class)); + assertEquals(InstancesPerListener * ConcurrentUnits, MessageTypes.Simple.getTimesHandled(MessagesListener.DefaultListener.class)); + assertEquals(InstancesPerListener * ConcurrentUnits, MessageTypes.Simple.getTimesHandled(MessagesListener.AsyncListener.class)); + } + + + @Test + public void testExceptionInHandlerInvocation(){ + final AtomicInteger exceptionCount = new AtomicInteger(0); + IPublicationErrorHandler ExceptionCounter = new IPublicationErrorHandler() { + @Override + public void handleError(PublicationError error) { + exceptionCount.incrementAndGet(); + } + }; + + final MBassador bus = new MBassador(new BusConfiguration()); + bus.addErrorHandler(ExceptionCounter); + ListenerFactory listeners = new ListenerFactory() + .create(InstancesPerListener, ExceptionThrowingListener.class); + ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, listeners), ConcurrentUnits); + + Runnable publishAndCheck = new Runnable() { + @Override + public void run() { + bus.post(new StandardMessage()).asynchronously(); + + } + }; + + // single threaded + ConcurrentExecutor.runConcurrent(publishAndCheck, 1); + pause(processingTimeInMS); + assertEquals(InstancesPerListener, exceptionCount.get()); + + + // multi threaded + exceptionCount.set(0); + ConcurrentExecutor.runConcurrent(publishAndCheck, ConcurrentUnits); + pause(processingTimeInMS); + assertEquals(InstancesPerListener * ConcurrentUnits, exceptionCount.get()); + + } + + + + +} diff --git a/src/test/java/net/engio/mbassy/MetadataReaderTest.java b/src/test/java/net/engio/mbassy/MetadataReaderTest.java index 88d7f76..e86d188 100644 --- a/src/test/java/net/engio/mbassy/MetadataReaderTest.java +++ b/src/test/java/net/engio/mbassy/MetadataReaderTest.java @@ -1,7 +1,7 @@ package net.engio.mbassy; +import net.engio.mbassy.common.AssertSupport; import org.junit.Test; -import net.engio.mbassy.common.UnitTest; import net.engio.mbassy.listener.Enveloped; import net.engio.mbassy.listener.Handler; import net.engio.mbassy.listener.MessageListenerMetadata; @@ -16,12 +16,11 @@ import java.util.Map; import static net.engio.mbassy.listener.MessageListenerMetadata.ForMessage; /** - * Todo: Add javadoc * * @author bennidi * Date: 12/16/12 */ -public class MetadataReaderTest extends UnitTest { +public class MetadataReaderTest extends AssertSupport { private MetadataReader reader = new MetadataReader(); diff --git a/src/test/java/net/engio/mbassy/SubscriptionManagerTest.java b/src/test/java/net/engio/mbassy/SubscriptionManagerTest.java index dbf964c..205e4d3 100644 --- a/src/test/java/net/engio/mbassy/SubscriptionManagerTest.java +++ b/src/test/java/net/engio/mbassy/SubscriptionManagerTest.java @@ -1,167 +1,209 @@ package net.engio.mbassy; +import net.engio.mbassy.common.AssertSupport; import net.engio.mbassy.common.ConcurrentExecutor; -import net.engio.mbassy.common.IPredicate; -import net.engio.mbassy.common.UnitTest; -import net.engio.mbassy.listener.Handler; +import net.engio.mbassy.common.SubscriptionValidator; +import net.engio.mbassy.common.TestUtil; import net.engio.mbassy.listener.MetadataReader; -import net.engio.mbassy.messages.ITestMessage; -import net.engio.mbassy.messages.TestMessage; +import net.engio.mbassy.listeners.*; +import net.engio.mbassy.messages.*; import net.engio.mbassy.subscription.Subscription; import net.engio.mbassy.subscription.SubscriptionFactory; import net.engio.mbassy.subscription.SubscriptionManager; import org.junit.Test; -import java.util.*; +import java.util.Collection; /** - * Todo: Add javadoc - * * @author bennidi * Date: 5/12/13 */ -public class SubscriptionManagerTest extends UnitTest{ +public class SubscriptionManagerTest extends AssertSupport { + private static final int InstancesPerListener = 5000; + private static final int ConcurrentUnits = 10; @Test - public void testSimpleSynchronousHandler(){ - final SubscriptionManager subMan = new SubscriptionManager(new MetadataReader(), new SubscriptionFactory()); - final Set listeners = Collections.synchronizedSet(new HashSet()); - final int concurrentUnits = 5; - final int numberOfLoops = 100; - final int numberOfListeners = numberOfLoops * concurrentUnits; + public void testIMessageListener(){ + ListenerFactory listeners = listeners( + IMessageListener.DefaultListener.class, + IMessageListener.AsyncListener.class, + IMessageListener.DisabledListener.class, + IMessageListener.NoSubtypesListener.class); - ConcurrentExecutor.runConcurrent(new Runnable() { - @Override - public void run() { - for(int i = 0 ; i < numberOfLoops ; i++){ - SimpleSynchronousMessageHandler - listener1 = new SimpleSynchronousMessageHandler(); - SimpleSynchronousMessageHandler2 listener2 = new SimpleSynchronousMessageHandler2(); - subMan.subscribe(listener1); - subMan.subscribe(listener2); - listeners.add(listener1); - listeners.add(listener2); - } + SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners) + .listener(IMessageListener.DefaultListener.class).handles(IMessage.class, + AbstractMessage.class, IMultipartMessage.class, StandardMessage.class, MessageTypes.class) + .listener(IMessageListener.AsyncListener.class).handles(IMessage.class, + AbstractMessage.class, IMultipartMessage.class, StandardMessage.class, MessageTypes.class) + .listener(IMessageListener.NoSubtypesListener.class).handles(IMessage.class); - } - }, concurrentUnits); + runTestWith(listeners, expectedSubscriptions); + } - SubscriptionValidator validator = new SubscriptionValidator(); - validator.expect(numberOfListeners, SimpleSynchronousMessageHandler.class, ITestMessage.class); - validator.expect(numberOfListeners, SimpleSynchronousMessageHandler2.class, ITestMessage.class); - validator.expect(numberOfListeners, SimpleSynchronousMessageHandler.class, TestMessage.class); - validator.expect(numberOfListeners, SimpleSynchronousMessageHandler2.class, TestMessage.class); + @Test + public void testAbstractMessageListener(){ + ListenerFactory listeners = listeners( + AbstractMessageListener.DefaultListener.class, + AbstractMessageListener.AsyncListener.class, + AbstractMessageListener.DisabledListener.class, + AbstractMessageListener.NoSubtypesListener.class); - validator.validate(subMan); + SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners) + .listener(AbstractMessageListener.NoSubtypesListener.class).handles(AbstractMessage.class) + .listener(AbstractMessageListener.DefaultListener.class).handles(StandardMessage.class, AbstractMessage.class) + .listener(AbstractMessageListener.AsyncListener.class).handles(StandardMessage.class, AbstractMessage.class); + runTestWith(listeners, expectedSubscriptions); + } + + @Test + public void testMessagesListener(){ + ListenerFactory listeners = listeners( + MessagesListener.DefaultListener.class, + MessagesListener.AsyncListener.class, + MessagesListener.DisabledListener.class, + MessagesListener.NoSubtypesListener.class); + + SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners) + .listener(MessagesListener.NoSubtypesListener.class).handles(MessageTypes.class) + .listener(MessagesListener.DefaultListener.class).handles(MessageTypes.class) + .listener(MessagesListener.AsyncListener.class).handles(MessageTypes.class); + + runTestWith(listeners, expectedSubscriptions); + } + + @Test + public void testMultipartMessageListener(){ + ListenerFactory listeners = listeners( + MultipartMessageListener.DefaultListener.class, + MultipartMessageListener.AsyncListener.class, + MultipartMessageListener.DisabledListener.class, + MultipartMessageListener.NoSubtypesListener.class); + + SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners) + .listener(MultipartMessageListener.NoSubtypesListener.class).handles(MultipartMessage.class) + .listener(MultipartMessageListener.DefaultListener.class).handles(MultipartMessage.class) + .listener(MultipartMessageListener.AsyncListener.class).handles(MultipartMessage.class); + + runTestWith(listeners, expectedSubscriptions); + } + + @Test + public void testIMultipartMessageListener(){ + ListenerFactory listeners = listeners( + IMultipartMessageListener.DefaultListener.class, + IMultipartMessageListener.AsyncListener.class, + IMultipartMessageListener.DisabledListener.class, + IMultipartMessageListener.NoSubtypesListener.class); + + SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners) + .listener(IMultipartMessageListener.NoSubtypesListener.class).handles(IMultipartMessage.class) + .listener(IMultipartMessageListener.DefaultListener.class).handles(MultipartMessage.class, IMultipartMessage.class) + .listener(IMultipartMessageListener.AsyncListener.class).handles(MultipartMessage.class, IMultipartMessage.class); + + runTestWith(listeners, expectedSubscriptions); + } + + @Test + public void testStandardMessageListener(){ + ListenerFactory listeners = listeners( + StandardMessageListener.DefaultListener.class, + StandardMessageListener.AsyncListener.class, + StandardMessageListener.DisabledListener.class, + StandardMessageListener.NoSubtypesListener.class); + + SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners) + .listener(StandardMessageListener.NoSubtypesListener.class).handles(StandardMessage.class) + .listener(StandardMessageListener.DefaultListener.class).handles(StandardMessage.class) + .listener(StandardMessageListener.AsyncListener.class).handles(StandardMessage.class); + + runTestWith(listeners, expectedSubscriptions); + } + + @Test + public void testICountableListener(){ + ListenerFactory listeners = listeners( + ICountableListener.DefaultListener.class, + ICountableListener.AsyncListener.class, + ICountableListener.DisabledListener.class, + ICountableListener.NoSubtypesListener.class); + + SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners) + .listener(ICountableListener.DefaultListener.class).handles(ICountable.class) + .listener(ICountableListener.DefaultListener.class).handles(MultipartMessage.class, IMultipartMessage.class, ICountable.class, StandardMessage.class) + .listener(ICountableListener.AsyncListener.class).handles(MultipartMessage.class, IMultipartMessage.class, ICountable.class, StandardMessage.class); + + runTestWith(listeners, expectedSubscriptions); + } + + @Test + public void testMultipleMessageListeners(){ + ListenerFactory listeners = listeners( + ICountableListener.DefaultListener.class, + ICountableListener.AsyncListener.class, + ICountableListener.DisabledListener.class, + IMultipartMessageListener.DefaultListener.class, + IMultipartMessageListener.AsyncListener.class, + IMultipartMessageListener.DisabledListener.class, + MessagesListener.DefaultListener.class, + MessagesListener.AsyncListener.class, + MessagesListener.DisabledListener.class); + + SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners) + .listener(ICountableListener.DefaultListener.class) + .handles(MultipartMessage.class, IMultipartMessage.class, ICountable.class, StandardMessage.class) + .listener(ICountableListener.AsyncListener.class) + .handles(MultipartMessage.class, IMultipartMessage.class, ICountable.class, StandardMessage.class) + .listener(IMultipartMessageListener.DefaultListener.class).handles(MultipartMessage.class, IMultipartMessage.class) + .listener(IMultipartMessageListener.AsyncListener.class).handles(MultipartMessage.class, IMultipartMessage.class) + .listener(MessagesListener.DefaultListener.class).handles(MessageTypes.class) + .listener(MessagesListener.AsyncListener.class).handles(MessageTypes.class); + + runTestWith(listeners, expectedSubscriptions); + } + + @Test + public void testStrongListenerSubscription() throws Exception { + ListenerFactory listeners = listeners(CustomInvocationListener.class); + SubscriptionManager subscriptionManager = new SubscriptionManager(new MetadataReader(), new SubscriptionFactory()); + ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), ConcurrentUnits); + + + listeners.clear(); + runGC(); + + Collection subscriptions = subscriptionManager.getSubscriptionsByMessageType(StandardMessage.class); + assertEquals(1, subscriptions.size()); + for(Subscription sub : subscriptions) + assertEquals(InstancesPerListener, sub.size()); } - class SubscriptionValidator{ - - - private List validations = new LinkedList(); - private Set messageTypes = new HashSet(); - private Set subsribers = new HashSet(); - - - public SubscriptionValidator expect(int numberOfSubscriber, Class subscriber, Class messageType){ - validations.add(new Entry(messageType, numberOfSubscriber, subscriber)); - messageTypes.add(messageType); - subsribers.add(subscriber); - return this; + private ListenerFactory listeners(Class ...listeners){ + ListenerFactory factory = new ListenerFactory(); + for(Class listener : listeners){ + factory.create(InstancesPerListener, listener); } + return factory; + } - public void validate(SubscriptionManager manager){ - for(Class messageType : messageTypes){ - Collection subscriptions = manager.getSubscriptionsByMessageType(messageType); - Collection validationEntries = getEntries(EntriesByMessageType(messageType)); - assertEquals(subscriptions.size(), validationEntries.size()); - for(Entry validationEntry : validationEntries){ - Subscription matchingSub = null; - // one of the subscriptions must belong to the subscriber type - for(Subscription sub : subscriptions){ - if(sub.belongsTo(validationEntry.subscriber)){ - matchingSub = sub; - break; - } - } - assertNotNull(matchingSub); - assertEquals(validationEntry.numberOfSubscribers, matchingSub.size()); - } - } - } + private void runTestWith(final ListenerFactory listeners, final SubscriptionValidator validator){ + final SubscriptionManager subscriptionManager = new SubscriptionManager(new MetadataReader(), new SubscriptionFactory()); + ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), ConcurrentUnits); - private Collection getEntries(IPredicate filter){ - Collection matching = new LinkedList(); - for (Entry validationEntry : validations){ - if(filter.apply(validationEntry))matching.add(validationEntry); - } - return matching; - } + validator.validate(subscriptionManager); - private IPredicate EntriesByMessageType(final Class messageType){ - return new IPredicate() { - @Override - public boolean apply(Entry target) { - return target.messageType.equals(messageType); - } - }; - } + ConcurrentExecutor.runConcurrent(TestUtil.unsubscriber(subscriptionManager, listeners), ConcurrentUnits); - private IPredicate EntriesBySubscriberType(final Class subscriberType){ - return new IPredicate() { - @Override - public boolean apply(Entry target) { - return target.subscriber.equals(subscriberType); - } - }; - } - - - - private class Entry{ - - private int numberOfSubscribers; - - private Class subscriber; - - private Class messageType; - - private Entry(Class messageType, int numberOfSubscribers, Class subscriber) { - this.messageType = messageType; - this.numberOfSubscribers = numberOfSubscribers; - this.subscriber = subscriber; - } - - - } + listeners.clear(); + validator.validate(subscriptionManager); } - static class SimpleSynchronousMessageHandler{ - - @Handler - public void handle(TestMessage message) { - } - - @Handler - public void handle(ITestMessage message) { - } - } - static class SimpleSynchronousMessageHandler2{ - @Handler - public void handle(TestMessage message) { - } - - @Handler - public void handle(ITestMessage message) { - } - } } diff --git a/src/test/java/net/engio/mbassy/SyncBusTest.java b/src/test/java/net/engio/mbassy/SyncBusTest.java index d0c0901..28df663 100644 --- a/src/test/java/net/engio/mbassy/SyncBusTest.java +++ b/src/test/java/net/engio/mbassy/SyncBusTest.java @@ -1,21 +1,16 @@ package net.engio.mbassy; import net.engio.mbassy.bus.*; -import net.engio.mbassy.common.DeadMessage; +import net.engio.mbassy.common.ConcurrentExecutor; import net.engio.mbassy.common.MessageBusTest; import net.engio.mbassy.common.TestUtil; -import net.engio.mbassy.dispatch.HandlerInvocation; -import net.engio.mbassy.messages.ITestMessage; -import net.engio.mbassy.messages.SubTestMessage; -import net.engio.mbassy.messages.TestMessage; -import net.engio.mbassy.listener.*; import net.engio.mbassy.listeners.*; -import net.engio.mbassy.messages.TestMessage3; -import net.engio.mbassy.subscription.SubscriptionContext; -import org.junit.Assert; +import net.engio.mbassy.messages.MessageTypes; +import net.engio.mbassy.messages.MultipartMessage; +import net.engio.mbassy.messages.StandardMessage; import org.junit.Test; -import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; /** * Test synchronous and asynchronous dispatch in single and multi-threaded scenario. @@ -25,152 +20,117 @@ import java.util.List; */ public abstract class SyncBusTest extends MessageBusTest { - // this value probably needs to be adjusted depending on the performance of the underlying plattform - // otherwise the tests will fail since asynchronous processing might not have finished when - // evaluation is run - private int processingTimeInMS = 4000; + protected abstract ISyncMessageBus getSyncMessageBus(); @Test public void testSynchronousMessagePublication() throws Exception { - ISyncMessageBus bus = getSyncMessageBus(); - ListenerFactory listenerFactory = new ListenerFactory() - .create(10000, MessageListener1.class) - .create(10000, MessageListener2.class) - .create(10000, MessageListener3.class) - .create(10000, Object.class) - .create(10000, NonListeningBean.class); + final ISyncMessageBus bus = getSyncMessageBus(); + ListenerFactory listeners = new ListenerFactory() + .create(InstancesPerListener, IMessageListener.DefaultListener.class) + .create(InstancesPerListener, IMessageListener.DisabledListener.class) + .create(InstancesPerListener, MessagesListener.DefaultListener.class) + .create(InstancesPerListener, MessagesListener.DisabledListener.class) + .create(InstancesPerListener, Object.class); - List listeners = listenerFactory.build(); - // this will subscribe the listeners concurrently to the bus - TestUtil.setup(bus, listeners, 10); + ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, listeners), ConcurrentUnits); - TestMessage message = new TestMessage(); - TestMessage subMessage = new SubTestMessage(); + Runnable publishAndCheck = new Runnable() { + @Override + public void run() { + StandardMessage standardMessage = new StandardMessage(); + MultipartMessage multipartMessage = new MultipartMessage(); - bus.post(message).now(); - bus.post(subMessage).now(); + bus.post(standardMessage).now(); + bus.post(multipartMessage).now(); + bus.post(MessageTypes.Simple).now(); + bus.post(MessageTypes.Multipart).now(); - pause(processingTimeInMS); + assertEquals(InstancesPerListener, standardMessage.getTimesHandled(IMessageListener.DefaultListener.class)); + assertEquals(InstancesPerListener, multipartMessage.getTimesHandled(IMessageListener.DefaultListener.class)); + } + }; - assertEquals(30000, message.counter.get()); - assertEquals(70000, subMessage.counter.get()); + // single threaded + ConcurrentExecutor.runConcurrent(publishAndCheck, 1); + // multi threaded + MessageTypes.resetAll(); + ConcurrentExecutor.runConcurrent(publishAndCheck, ConcurrentUnits); + assertEquals(InstancesPerListener * ConcurrentUnits, MessageTypes.Simple.getTimesHandled(IMessageListener.DefaultListener.class)); + assertEquals(InstancesPerListener * ConcurrentUnits, MessageTypes.Multipart.getTimesHandled(IMessageListener.DefaultListener.class)); + assertEquals(InstancesPerListener * ConcurrentUnits, MessageTypes.Simple.getTimesHandled(MessagesListener.DefaultListener.class)); + assertEquals(InstancesPerListener * ConcurrentUnits, MessageTypes.Multipart.getTimesHandled(MessagesListener.DefaultListener.class)); } - @Test - public void testStrongListenerSubscription() throws Exception { - - ISyncMessageBus bus = getSyncMessageBus(); - - for(int i = 0; i< 10000; i++){ - bus.subscribe(new MessageListener2()); - } - - runGC(); - - TestMessage message = new TestMessage(); - TestMessage subMessage = new SubTestMessage(); - - bus.post(message).now(); - bus.post(subMessage).now(); - - pause(processingTimeInMS); - - assertEquals(10000, message.counter.get()); - assertEquals(20000, subMessage.counter.get()); - - } - - protected abstract ISyncMessageBus getSyncMessageBus(); - - @Test - public void testHandlerUsingInterface() { - MBassador bus = new MBassador(BusConfiguration.Default()); - bus.subscribe(new InterfaceMessageListener()); - bus.publish(new TestMessage3()); + public void testExceptionInHandlerInvocation(){ + final AtomicInteger exceptionCount = new AtomicInteger(0); + IPublicationErrorHandler ExceptionCounter = new IPublicationErrorHandler() { + @Override + public void handleError(PublicationError error) { + exceptionCount.incrementAndGet(); + } + }; + + final ISyncMessageBus bus = getSyncMessageBus(); + bus.addErrorHandler(ExceptionCounter); + ListenerFactory listeners = new ListenerFactory() + .create(InstancesPerListener, ExceptionThrowingListener.class); + + ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, listeners), ConcurrentUnits); + + Runnable publish = new Runnable() { + @Override + public void run() { + bus.post(new StandardMessage()).now(); + } + }; + + // single threaded + ConcurrentExecutor.runConcurrent(publish, 1); + + exceptionCount.set(0); + + // multi threaded + ConcurrentExecutor.runConcurrent(publish, ConcurrentUnits); + assertEquals(InstancesPerListener * ConcurrentUnits, exceptionCount.get()); } - @Listener(references = References.Strong) - static class InterfaceMessageListener{ - - @Handler - public void handleFoo(ITestMessage f) { - Assert.assertTrue(f instanceof TestMessage3); - } - - @Handler - public void handleDead(DeadMessage d) { - Assert.fail("This class should handle this message appropriately!"); - } - - } + @Test + public void testCustomHandlerInvocation(){ + final ISyncMessageBus bus = getSyncMessageBus(); + ListenerFactory listeners = new ListenerFactory() + .create(InstancesPerListener, CustomInvocationListener.class) + .create(InstancesPerListener, Object.class); - public static class MessageListener1 { + ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, listeners), ConcurrentUnits); - // every event of type TestEvent or any subtype will be delivered - // to this listener - @Handler - public void handleTestEvent(TestMessage message) { - message.counter.incrementAndGet(); - } + Runnable publishAndCheck = new Runnable() { + @Override + public void run() { + StandardMessage standardMessage = new StandardMessage(); + MultipartMessage multipartMessage = new MultipartMessage(); - // this handler will be invoked asynchronously - @Handler(priority = 0, invocation = HandleSubTestEventInvocation.class) - public void handleSubTestEvent(SubTestMessage message) { - message.counter.incrementAndGet(); - } + bus.post(standardMessage).now(); + bus.post(multipartMessage).now(); + bus.post(MessageTypes.Simple).now(); - // this handler will receive events of type SubTestEvent - // or any subtabe and that passes the given filter - @Handler( - priority = 10, - delivery = Invoke.Synchronously, - filters = {@Filter(Filters.RejectAll.class), @Filter(Filters.AllowAll.class)}) - public void handleFiltered(SubTestMessage message) { - message.counter.incrementAndGet(); - } + assertEquals(InstancesPerListener * 2, standardMessage.getTimesHandled(CustomInvocationListener.class)); + assertEquals(0, multipartMessage.getTimesHandled(CustomInvocationListener.class)); + assertEquals(0, MessageTypes.Simple.getTimesHandled(CustomInvocationListener.class)); + } + }; + // single threaded + ConcurrentExecutor.runConcurrent(publishAndCheck, 1); - } - - public static class HandleSubTestEventInvocation extends HandlerInvocation { - - public HandleSubTestEventInvocation(SubscriptionContext context) { - super(context); - } - - @Override - public void invoke(MessageListener1 listener, SubTestMessage message) { - listener.handleSubTestEvent(message); - } - } - - @Listener(references = References.Strong) - public static class MessageListener2 extends net.engio.mbassy.listeners.EventingTestBean { - - // redefine the configuration for this handler - @Handler(delivery = Invoke.Synchronously) - public void handleSubTestEvent(SubTestMessage message) { - super.handleSubTestEvent(message); - } - - } - - @Listener(references = References.Strong) - public static class MessageListener3 extends net.engio.mbassy.listeners.EventingTestBean2 { - - - // this handler will be invoked asynchronously - @Handler(priority = 0, delivery = Invoke.Synchronously) - public void handleSubTestEventAgain(SubTestMessage message) { - message.counter.incrementAndGet(); - } + // multi threaded + ConcurrentExecutor.runConcurrent(publishAndCheck, ConcurrentUnits); } diff --git a/src/test/java/net/engio/mbassy/SynchronizedHandlerTest.java b/src/test/java/net/engio/mbassy/SynchronizedHandlerTest.java index d015b88..c6c97e1 100644 --- a/src/test/java/net/engio/mbassy/SynchronizedHandlerTest.java +++ b/src/test/java/net/engio/mbassy/SynchronizedHandlerTest.java @@ -21,17 +21,17 @@ import java.util.List; public class SynchronizedHandlerTest extends MessageBusTest { - private static int incrementsPerHandler = 10000; + private static int incrementsPerMessage = 10000; private static int numberOfMessages = 1000; - private static int numberOfHandlers = 1000; + private static int numberOfListeners = 1000; @Test public void testSynchronizedWithSynchronousInvocation(){ - List handlers = new LinkedList(); + List handlers = new LinkedList(); IMessageBus bus = getBus(BusConfiguration.Default() .setNumberOfMessageDispatchers(6)); - for(int i = 0; i < numberOfHandlers; i++){ - SynchronizedMessageHandlerSync handler = new SynchronizedMessageHandlerSync(); + for(int i = 0; i < numberOfListeners; i++){ + SynchronizedWithSynchronousDelivery handler = new SynchronizedWithSynchronousDelivery(); handlers.add(handler); bus.subscribe(handler); } @@ -40,23 +40,24 @@ public class SynchronizedHandlerTest extends MessageBusTest { for(int i = 0; i < numberOfMessages; i++){ publication = bus.post(new Object()).asynchronously(); } + // wait for last publication while (!publication.isFinished()){ - pause(2000); + pause(100); } - for(SynchronizedMessageHandlerSync handler : handlers){ - assertEquals(incrementsPerHandler * numberOfMessages, handler.Counter); + for(SynchronizedWithSynchronousDelivery handler : handlers){ + assertEquals(incrementsPerMessage * numberOfMessages, handler.counter); } } @Test public void testSynchronizedWithAsSynchronousInvocation(){ - List handlers = new LinkedList(); + List handlers = new LinkedList(); IMessageBus bus = getBus(BusConfiguration.Default() .setNumberOfMessageDispatchers(6)); - for(int i = 0; i < numberOfHandlers; i++){ - SynchronizedMessageHandlerAsyn handler = new SynchronizedMessageHandlerAsyn(); + for(int i = 0; i < numberOfListeners; i++){ + SynchronizedWithAsynchronousDelivery handler = new SynchronizedWithAsynchronousDelivery(); handlers.add(handler); bus.subscribe(handler); } @@ -67,35 +68,35 @@ public class SynchronizedHandlerTest extends MessageBusTest { pause(10000); - for(SynchronizedMessageHandlerAsyn handler : handlers){ - assertEquals(incrementsPerHandler * numberOfMessages, handler.Counter); + for(SynchronizedWithAsynchronousDelivery handler : handlers){ + assertEquals(incrementsPerMessage * numberOfMessages, handler.counter); } } - public static class SynchronizedMessageHandlerSync{ + public static class SynchronizedWithSynchronousDelivery { - private int Counter = 0; + private int counter = 0; @Handler @Synchronized public void handleMessage(Object o){ - for(int i = 0; i < incrementsPerHandler; i++){ - Counter++; + for(int i = 0; i < incrementsPerMessage; i++){ + counter++; } } } - public static class SynchronizedMessageHandlerAsyn{ + public static class SynchronizedWithAsynchronousDelivery { - private int Counter = 0; + private int counter = 0; @Handler(delivery = Invoke.Asynchronously) @Synchronized public void handleMessage(Object o){ - for(int i = 0; i < incrementsPerHandler; i++){ - Counter++; + for(int i = 0; i < incrementsPerMessage; i++){ + counter++; } } diff --git a/src/test/java/net/engio/mbassy/bus/ListenerSubscriptionTest.java b/src/test/java/net/engio/mbassy/bus/ListenerSubscriptionTest.java index ad5327f..0c2d080 100644 --- a/src/test/java/net/engio/mbassy/bus/ListenerSubscriptionTest.java +++ b/src/test/java/net/engio/mbassy/bus/ListenerSubscriptionTest.java @@ -90,7 +90,7 @@ public class ListenerSubscriptionTest extends MessageBusTest{ .create(10000, Object.class) .create(10000, NonListeningBean.class); - List listeners = listenerFactory.build(); + List listeners = listenerFactory.getAll(); // this will subscribe the listeners concurrently to the bus TestUtil.setup(bus, listeners, 10); diff --git a/src/test/java/net/engio/mbassy/common/UnitTest.java b/src/test/java/net/engio/mbassy/common/AssertSupport.java similarity index 90% rename from src/test/java/net/engio/mbassy/common/UnitTest.java rename to src/test/java/net/engio/mbassy/common/AssertSupport.java index cf3a146..aed8ae0 100644 --- a/src/test/java/net/engio/mbassy/common/UnitTest.java +++ b/src/test/java/net/engio/mbassy/common/AssertSupport.java @@ -5,13 +5,9 @@ import org.junit.Assert; import java.lang.ref.WeakReference; /** - * Created with IntelliJ IDEA. * @author bennidi - * Date: 11/12/12 - * Time: 3:16 PM - * To change this template use File | Settings | File Templates. */ -public class UnitTest { +public class AssertSupport { // Internal state private Runtime runtime = Runtime.getRuntime(); @@ -31,7 +27,9 @@ public class UnitTest { public void runGC() { WeakReference ref = new WeakReference(new Object()); + pause(100); while(ref.get() != null) { + pause(10); runtime.gc(); } } diff --git a/src/test/java/net/engio/mbassy/common/ConcurrentExecutor.java b/src/test/java/net/engio/mbassy/common/ConcurrentExecutor.java index be248bb..b887751 100644 --- a/src/test/java/net/engio/mbassy/common/ConcurrentExecutor.java +++ b/src/test/java/net/engio/mbassy/common/ConcurrentExecutor.java @@ -43,6 +43,7 @@ public class ConcurrentExecutor { returnValues.add(executor.submit(wrapper)); } + // wait until all tasks have been executed try { executor.shutdown();// tells the thread pool to execute all waiting tasks @@ -51,6 +52,15 @@ public class ConcurrentExecutor { // unlikely that this will happen e.printStackTrace(); } + + for(Future task : returnValues){ + try { + task.get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } diff --git a/src/test/java/net/engio/mbassy/common/MessageBusTest.java b/src/test/java/net/engio/mbassy/common/MessageBusTest.java index bdd0f21..3a56ea2 100644 --- a/src/test/java/net/engio/mbassy/common/MessageBusTest.java +++ b/src/test/java/net/engio/mbassy/common/MessageBusTest.java @@ -6,6 +6,8 @@ import net.engio.mbassy.PublicationError; import net.engio.mbassy.bus.BusConfiguration; import net.engio.mbassy.bus.ISyncMessageBus; import net.engio.mbassy.bus.MBassador; +import net.engio.mbassy.messages.MessageTypes; +import org.junit.Before; /** * A base test that provides a factory for message bus that makes tests fail if any @@ -14,7 +16,14 @@ import net.engio.mbassy.bus.MBassador; * @author bennidi * Date: 3/2/13 */ -public class MessageBusTest extends UnitTest { +public abstract class MessageBusTest extends AssertSupport { + + // this value probably needs to be adjusted depending on the performance of the underlying plattform + // otherwise the tests will fail since asynchronous processing might not have finished when + // evaluation is run + protected static final int processingTimeInMS = 6000; + protected static final int InstancesPerListener = 5000; + protected static final int ConcurrentUnits = 10; protected static final IPublicationErrorHandler TestFailingHandler = new IPublicationErrorHandler() { @Override @@ -23,6 +32,13 @@ public class MessageBusTest extends UnitTest { } }; + + @Before + public void setUp(){ + for(MessageTypes mes : MessageTypes.values()) + mes.reset(); + } + public MBassador getBus(BusConfiguration configuration) { MBassador bus = new MBassador(configuration); bus.addErrorHandler(TestFailingHandler); diff --git a/src/test/java/net/engio/mbassy/common/SubscriptionValidator.java b/src/test/java/net/engio/mbassy/common/SubscriptionValidator.java new file mode 100644 index 0000000..16b4968 --- /dev/null +++ b/src/test/java/net/engio/mbassy/common/SubscriptionValidator.java @@ -0,0 +1,117 @@ +package net.engio.mbassy.common; + +import net.engio.mbassy.listeners.ListenerFactory; +import net.engio.mbassy.subscription.Subscription; +import net.engio.mbassy.subscription.SubscriptionManager; + +import java.util.*; + +/** +* Todo: Add javadoc +* +* @author bennidi +* Date: 5/25/13 +*/ +public class SubscriptionValidator extends AssertSupport{ + + + private List validations = new LinkedList(); + private Set messageTypes = new HashSet(); + private Set subscribers = new HashSet(); + private ListenerFactory subscribedListener; + + public SubscriptionValidator(ListenerFactory subscribedListener) { + this.subscribedListener = subscribedListener; + } + + public Expectation listener(Class subscriber){ + return new Expectation(subscriber); + } + + private SubscriptionValidator expect(Class subscriber, Class messageType){ + validations.add(new ValidationEntry(messageType, subscriber)); + messageTypes.add(messageType); + subscribers.add(subscriber); + return this; + } + + // match subscriptions with existing validation entries + // for each tuple of subscriber and message type the specified number of listeners must exist + public void validate(SubscriptionManager manager){ + for(Class messageType : messageTypes){ + Collection subscriptions = manager.getSubscriptionsByMessageType(messageType); + Collection validationEntries = getEntries(EntriesByMessageType(messageType)); + assertEquals(subscriptions.size(), validationEntries.size()); + for(ValidationEntry validationValidationEntry : validationEntries){ + Subscription matchingSub = null; + // one of the subscriptions must belong to the subscriber type + for(Subscription sub : subscriptions){ + if(sub.belongsTo(validationValidationEntry.subscriber)){ + matchingSub = sub; + break; + } + } + assertNotNull(matchingSub); + assertEquals(subscribedListener.getNumberOfListeners(validationValidationEntry.subscriber), matchingSub.size()); + } + } + } + + + private Collection getEntries(IPredicate filter){ + Collection matching = new LinkedList(); + for (ValidationEntry validationValidationEntry : validations){ + if(filter.apply(validationValidationEntry))matching.add(validationValidationEntry); + } + return matching; + } + + private IPredicate EntriesByMessageType(final Class messageType){ + return new IPredicate() { + @Override + public boolean apply(ValidationEntry target) { + return target.messageType.equals(messageType); + } + }; + } + + private IPredicate EntriesBySubscriberType(final Class subscriberType){ + return new IPredicate() { + @Override + public boolean apply(ValidationEntry target) { + return target.subscriber.equals(subscriberType); + } + }; + } + + public class Expectation{ + + private Class listener; + + private Expectation(Class listener) { + this.listener = listener; + } + + public SubscriptionValidator handles(Class ...messages){ + for(Class message : messages) + expect(listener, message); + return SubscriptionValidator.this; + } + } + + private class ValidationEntry { + + + private Class subscriber; + + private Class messageType; + + private ValidationEntry(Class messageType, Class subscriber) { + this.messageType = messageType; + this.subscriber = subscriber; + } + + + } + +} diff --git a/src/test/java/net/engio/mbassy/common/TestUtil.java b/src/test/java/net/engio/mbassy/common/TestUtil.java index c7ff235..5a1e3b8 100644 --- a/src/test/java/net/engio/mbassy/common/TestUtil.java +++ b/src/test/java/net/engio/mbassy/common/TestUtil.java @@ -1,8 +1,10 @@ package net.engio.mbassy.common; -import net.engio.mbassy.bus.IMessageBus; import net.engio.mbassy.bus.ISyncMessageBus; +import net.engio.mbassy.listeners.ListenerFactory; +import net.engio.mbassy.subscription.SubscriptionManager; +import java.util.Iterator; import java.util.List; /** @@ -14,6 +16,58 @@ import java.util.List; public class TestUtil { + public static Runnable subscriber(final SubscriptionManager manager, final ListenerFactory listeners){ + final Iterator source = listeners.iterator(); + return new Runnable() { + @Override + public void run() { + Object next; + while((next = source.next()) != null){ + manager.subscribe(next); + } + } + }; + } + + public static Runnable unsubscriber(final SubscriptionManager manager, final ListenerFactory listeners){ + final Iterator source = listeners.iterator(); + return new Runnable() { + @Override + public void run() { + Object next; + while((next = source.next()) != null){ + manager.unsubscribe(next); + } + } + }; + } + + public static Runnable subscriber(final ISyncMessageBus bus, final ListenerFactory listeners){ + final Iterator source = listeners.iterator(); + return new Runnable() { + @Override + public void run() { + Object next; + while((next = source.next()) != null){ + bus.subscribe(next); + } + } + }; + } + + public static Runnable unsubscriber(final ISyncMessageBus bus, final ListenerFactory listeners){ + final Iterator source = listeners.iterator(); + return new Runnable() { + @Override + public void run() { + Object next; + while((next = source.next()) != null){ + bus.unsubscribe(next); + } + } + }; + } + public static void setup(final ISyncMessageBus bus, final List listeners, int numberOfThreads) { Runnable[] setupUnits = new Runnable[numberOfThreads]; int partitionSize; diff --git a/src/test/java/net/engio/mbassy/listeners/AbstractMessageListener.java b/src/test/java/net/engio/mbassy/listeners/AbstractMessageListener.java new file mode 100644 index 0000000..33823d5 --- /dev/null +++ b/src/test/java/net/engio/mbassy/listeners/AbstractMessageListener.java @@ -0,0 +1,58 @@ +package net.engio.mbassy.listeners; + +import net.engio.mbassy.listener.Handler; +import net.engio.mbassy.listener.Invoke; +import net.engio.mbassy.messages.AbstractMessage; + +/** + * + * @author bennidi + * Date: 5/24/13 + */ +public class AbstractMessageListener { + + private static abstract class BaseListener { + + @Handler + public void handle(AbstractMessage message){ + message.handled(this.getClass()); + } + + } + + public static class DefaultListener extends BaseListener { + + public void handle(AbstractMessage message){ + super.handle(message); + } + } + + public static class NoSubtypesListener extends BaseListener { + + @Handler(rejectSubtypes = true) + public void handle(AbstractMessage message){ + super.handle(message); + } + } + + + public static class AsyncListener extends BaseListener { + + @Handler(delivery = Invoke.Asynchronously) + public void handle(AbstractMessage message){ + super.handle(message); + } + + } + + public static class DisabledListener extends BaseListener { + + @Handler(enabled = false) + public void handle(AbstractMessage message){ + super.handle(message); + } + + } + + +} diff --git a/src/test/java/net/engio/mbassy/listeners/CustomInvocationListener.java b/src/test/java/net/engio/mbassy/listeners/CustomInvocationListener.java new file mode 100644 index 0000000..c62a6d5 --- /dev/null +++ b/src/test/java/net/engio/mbassy/listeners/CustomInvocationListener.java @@ -0,0 +1,37 @@ +package net.engio.mbassy.listeners; + +import net.engio.mbassy.dispatch.HandlerInvocation; +import net.engio.mbassy.listener.Handler; +import net.engio.mbassy.listener.Listener; +import net.engio.mbassy.listener.References; +import net.engio.mbassy.messages.StandardMessage; +import net.engio.mbassy.subscription.SubscriptionContext; + +/** + * @author bennidi + * Date: 5/25/13 + */ +@Listener(references = References.Strong) +public class CustomInvocationListener { + + + // this handler will be invoked asynchronously + @Handler(invocation = HandleSubTestEventInvocation.class) + public void handle(StandardMessage message) { + message.handled(this.getClass()); + message.handled(this.getClass()); + } + + public static class HandleSubTestEventInvocation extends HandlerInvocation { + + public HandleSubTestEventInvocation(SubscriptionContext context) { + super(context); + } + + @Override + public void invoke(CustomInvocationListener listener, StandardMessage message) { + listener.handle(message); + } + } + +} diff --git a/src/test/java/net/engio/mbassy/listeners/ExceptionThrowingListener.java b/src/test/java/net/engio/mbassy/listeners/ExceptionThrowingListener.java new file mode 100644 index 0000000..1d92d4d --- /dev/null +++ b/src/test/java/net/engio/mbassy/listeners/ExceptionThrowingListener.java @@ -0,0 +1,23 @@ +package net.engio.mbassy.listeners; + +import net.engio.mbassy.listener.Handler; +import net.engio.mbassy.listener.Listener; +import net.engio.mbassy.listener.References; +import net.engio.mbassy.messages.StandardMessage; + +/** + * @author bennidi + * Date: 5/25/13 + */ +@Listener(references = References.Strong) +public class ExceptionThrowingListener { + + + // this handler will be invoked asynchronously + @Handler() + public void handle(StandardMessage message) { + throw new RuntimeException("This is an expected exception"); + } + + +} diff --git a/src/test/java/net/engio/mbassy/listeners/ICountableListener.java b/src/test/java/net/engio/mbassy/listeners/ICountableListener.java new file mode 100644 index 0000000..f2e242e --- /dev/null +++ b/src/test/java/net/engio/mbassy/listeners/ICountableListener.java @@ -0,0 +1,58 @@ +package net.engio.mbassy.listeners; + +import net.engio.mbassy.listener.Handler; +import net.engio.mbassy.listener.Invoke; +import net.engio.mbassy.messages.ICountable; + +/** + * + * @author bennidi + * Date: 5/24/13 + */ +public class ICountableListener { + + private static abstract class BaseListener { + + @Handler + public void handle(ICountable message){ + message.handled(this.getClass()); + } + + } + + public static class DefaultListener extends BaseListener { + + public void handle(ICountable message){ + super.handle(message); + } + } + + public static class NoSubtypesListener extends BaseListener { + + @Handler(rejectSubtypes = true) + public void handle(ICountable message){ + super.handle(message); + } + } + + + public static class AsyncListener extends BaseListener { + + @Handler(delivery = Invoke.Asynchronously) + public void handle(ICountable message){ + super.handle(message); + } + + } + + public static class DisabledListener extends BaseListener { + + @Handler(enabled = false) + public void handle(ICountable message){ + super.handle(message); + } + + } + + +} diff --git a/src/test/java/net/engio/mbassy/listeners/IMessageListener.java b/src/test/java/net/engio/mbassy/listeners/IMessageListener.java new file mode 100644 index 0000000..108372b --- /dev/null +++ b/src/test/java/net/engio/mbassy/listeners/IMessageListener.java @@ -0,0 +1,58 @@ +package net.engio.mbassy.listeners; + +import net.engio.mbassy.listener.Handler; +import net.engio.mbassy.listener.Invoke; +import net.engio.mbassy.messages.IMessage; + +/** + * + * @author bennidi + * Date: 5/24/13 + */ +public class IMessageListener { + + private static abstract class BaseListener { + + @Handler + public void handle(IMessage message){ + message.handled(this.getClass()); + } + + } + + public static class DefaultListener extends BaseListener { + + public void handle(IMessage message){ + super.handle(message); + } + } + + public static class NoSubtypesListener extends BaseListener { + + @Handler(rejectSubtypes = true) + public void handle(IMessage message){ + super.handle(message); + } + } + + + public static class AsyncListener extends BaseListener { + + @Handler(delivery = Invoke.Asynchronously) + public void handle(IMessage message){ + super.handle(message); + } + + } + + public static class DisabledListener extends BaseListener { + + @Handler(enabled = false) + public void handle(IMessage message){ + super.handle(message); + } + + } + + +} diff --git a/src/test/java/net/engio/mbassy/listeners/IMultipartMessageListener.java b/src/test/java/net/engio/mbassy/listeners/IMultipartMessageListener.java new file mode 100644 index 0000000..d728ddb --- /dev/null +++ b/src/test/java/net/engio/mbassy/listeners/IMultipartMessageListener.java @@ -0,0 +1,58 @@ +package net.engio.mbassy.listeners; + +import net.engio.mbassy.listener.Handler; +import net.engio.mbassy.listener.Invoke; +import net.engio.mbassy.messages.IMultipartMessage; + +/** + * + * @author bennidi + * Date: 5/24/13 + */ +public class IMultipartMessageListener { + + private static abstract class BaseListener { + + @Handler + public void handle(IMultipartMessage message){ + message.handled(this.getClass()); + } + + } + + public static class DefaultListener extends BaseListener { + + public void handle(IMultipartMessage message){ + super.handle(message); + } + } + + public static class NoSubtypesListener extends BaseListener { + + @Handler(rejectSubtypes = true) + public void handle(IMultipartMessage message){ + super.handle(message); + } + } + + + public static class AsyncListener extends BaseListener { + + @Handler(delivery = Invoke.Asynchronously) + public void handle(IMultipartMessage message){ + super.handle(message); + } + + } + + public static class DisabledListener extends BaseListener { + + @Handler(enabled = false) + public void handle(IMultipartMessage message){ + super.handle(message); + } + + } + + +} diff --git a/src/test/java/net/engio/mbassy/listeners/ListenerFactory.java b/src/test/java/net/engio/mbassy/listeners/ListenerFactory.java index 1bb3603..9dc0b8f 100644 --- a/src/test/java/net/engio/mbassy/listeners/ListenerFactory.java +++ b/src/test/java/net/engio/mbassy/listeners/ListenerFactory.java @@ -1,13 +1,13 @@ package net.engio.mbassy.listeners; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; /** - * This factory will create a list of beans according to some specified configuration. - * It can be used to setup different test scenarios. + * The factory can be used to declaratively specify how many instances of some given classes + * should be created. It will create those instances using reflection and provide a list containing those instances. + * The factory also holds strong references to the instances such that GC will not interfere with tests unless the + * factory is explicitly cleared. * * @author bennidi * Date: 11/22/12 @@ -15,24 +15,68 @@ import java.util.Map; public class ListenerFactory { private Map requiredBeans = new HashMap(); + private List generatedListeners; + private int requiredSize = 0; + public int getNumberOfListeners(Class listener){ + return requiredBeans.containsKey(listener) ? requiredBeans.get(listener) : 0; + } - - public ListenerFactory create(int numberOfInstance, Class clazz){ - requiredBeans.put(clazz, numberOfInstance); + public ListenerFactory create(int numberOfInstances, Class clazz){ + requiredBeans.put(clazz, numberOfInstances); + requiredSize +=numberOfInstances; return this; } - public List build() throws Exception{ - List beans = new LinkedList(); - for(Class clazz : requiredBeans.keySet()){ - int numberOfRequiredBeans = requiredBeans.get(clazz); - for(int i = 0; i < numberOfRequiredBeans; i++){ - beans.add(clazz.newInstance()); + public List getAll(){ + generatedListeners = new ArrayList(requiredSize); + try { + for(Class clazz : requiredBeans.keySet()){ + int numberOfRequiredBeans = requiredBeans.get(clazz); + for(int i = 0; i < numberOfRequiredBeans; i++){ + generatedListeners.add(clazz.newInstance()); + } } + } catch (Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); } - return beans; + Collections.shuffle(generatedListeners); + return generatedListeners; + } + + // not thread-safe but not yet used concurrently + public void clear(){ + generatedListeners = null; + requiredBeans.clear(); + } + + /** + * Create a thread-safe read-only iterator + * @return + */ + public Iterator iterator(){ + if(generatedListeners == null)getAll(); + final AtomicInteger current = new AtomicInteger(0); + + return new Iterator() { + @Override + public boolean hasNext() { + return current.get() < generatedListeners.size(); + } + + @Override + public Object next() { + int index = current.getAndIncrement(); + return index < generatedListeners.size() ? generatedListeners.get(index) : null; + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Iterator is read only"); + } + }; } diff --git a/src/test/java/net/engio/mbassy/listeners/MessagesListener.java b/src/test/java/net/engio/mbassy/listeners/MessagesListener.java new file mode 100644 index 0000000..c48bd76 --- /dev/null +++ b/src/test/java/net/engio/mbassy/listeners/MessagesListener.java @@ -0,0 +1,58 @@ +package net.engio.mbassy.listeners; + +import net.engio.mbassy.listener.Handler; +import net.engio.mbassy.listener.Invoke; +import net.engio.mbassy.messages.MessageTypes; + +/** + * + * @author bennidi + * Date: 5/24/13 + */ +public class MessagesListener { + + private static abstract class BaseListener { + + @Handler + public void handle(MessageTypes message){ + message.handled(this.getClass()); + } + + } + + public static class DefaultListener extends BaseListener { + + public void handle(MessageTypes message){ + super.handle(message); + } + } + + public static class NoSubtypesListener extends BaseListener { + + @Handler(rejectSubtypes = true) + public void handle(MessageTypes message){ + super.handle(message); + } + } + + + public static class AsyncListener extends BaseListener { + + @Handler(delivery = Invoke.Asynchronously) + public void handle(MessageTypes message){ + super.handle(message); + } + + } + + public static class DisabledListener extends BaseListener { + + @Handler(enabled = false) + public void handle(MessageTypes message){ + super.handle(message); + } + + } + + +} diff --git a/src/test/java/net/engio/mbassy/listeners/MultipartMessageListener.java b/src/test/java/net/engio/mbassy/listeners/MultipartMessageListener.java new file mode 100644 index 0000000..ede0d95 --- /dev/null +++ b/src/test/java/net/engio/mbassy/listeners/MultipartMessageListener.java @@ -0,0 +1,58 @@ +package net.engio.mbassy.listeners; + +import net.engio.mbassy.listener.Handler; +import net.engio.mbassy.listener.Invoke; +import net.engio.mbassy.messages.MultipartMessage; + +/** + * + * @author bennidi + * Date: 5/24/13 + */ +public class MultipartMessageListener { + + private static abstract class BaseListener { + + @Handler + public void handle(MultipartMessage message){ + message.handled(this.getClass()); + } + + } + + public static class DefaultListener extends BaseListener { + + public void handle(MultipartMessage message){ + super.handle(message); + } + } + + public static class NoSubtypesListener extends BaseListener { + + @Handler(rejectSubtypes = true) + public void handle(MultipartMessage message){ + super.handle(message); + } + } + + + public static class AsyncListener extends BaseListener { + + @Handler(delivery = Invoke.Asynchronously) + public void handle(MultipartMessage message){ + super.handle(message); + } + + } + + public static class DisabledListener extends BaseListener { + + @Handler(enabled = false) + public void handle(MultipartMessage message){ + super.handle(message); + } + + } + + +} diff --git a/src/test/java/net/engio/mbassy/listeners/StandardMessageListener.java b/src/test/java/net/engio/mbassy/listeners/StandardMessageListener.java new file mode 100644 index 0000000..41f8f1a --- /dev/null +++ b/src/test/java/net/engio/mbassy/listeners/StandardMessageListener.java @@ -0,0 +1,58 @@ +package net.engio.mbassy.listeners; + +import net.engio.mbassy.listener.Handler; +import net.engio.mbassy.listener.Invoke; +import net.engio.mbassy.messages.StandardMessage; + +/** + * + * @author bennidi + * Date: 5/24/13 + */ +public class StandardMessageListener { + + private static abstract class BaseListener { + + @Handler + public void handle(StandardMessage message){ + message.handled(this.getClass()); + } + + } + + public static class DefaultListener extends BaseListener { + + public void handle(StandardMessage message){ + super.handle(message); + } + } + + public static class NoSubtypesListener extends BaseListener { + + @Handler(rejectSubtypes = true) + public void handle(StandardMessage message){ + super.handle(message); + } + } + + + public static class AsyncListener extends BaseListener { + + @Handler(delivery = Invoke.Asynchronously) + public void handle(StandardMessage message){ + super.handle(message); + } + + } + + public static class DisabledListener extends BaseListener { + + @Handler(enabled = false) + public void handle(StandardMessage message){ + super.handle(message); + } + + } + + +} diff --git a/src/test/java/net/engio/mbassy/messages/AbstractMessage.java b/src/test/java/net/engio/mbassy/messages/AbstractMessage.java new file mode 100644 index 0000000..322b41d --- /dev/null +++ b/src/test/java/net/engio/mbassy/messages/AbstractMessage.java @@ -0,0 +1,55 @@ +package net.engio.mbassy.messages; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * + * @author bennidi + * Date: 5/24/13 + */ +public abstract class AbstractMessage implements IMessage{ + + private Map handledByListener = new HashMap(); + private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + + + @Override + public void reset() { + lock.writeLock().lock(); + try { + handledByListener.clear(); + }finally { + lock.writeLock().unlock(); + } + } + + @Override + public void handled(Class listener) { + lock.writeLock().lock(); + try { + Integer count = handledByListener.get(listener); + if(count == null){ + handledByListener.put(listener, 1); + } + else{ + handledByListener.put(listener, count + 1); + } + }finally { + lock.writeLock().unlock(); + } + } + + @Override + public int getTimesHandled(Class listener) { + lock.readLock().lock(); + try { + return handledByListener.containsKey(listener) + ? handledByListener.get(listener) + : 0; + }finally { + lock.readLock().unlock(); + } + } +} diff --git a/src/test/java/net/engio/mbassy/messages/CountableMessage.java b/src/test/java/net/engio/mbassy/messages/CountableMessage.java new file mode 100644 index 0000000..2d35b97 --- /dev/null +++ b/src/test/java/net/engio/mbassy/messages/CountableMessage.java @@ -0,0 +1,9 @@ +package net.engio.mbassy.messages; + +/** + * + * @author bennidi + * Date: 5/24/13 + */ +public class CountableMessage extends AbstractMessage implements ICountable{ +} diff --git a/src/test/java/net/engio/mbassy/messages/ICountable.java b/src/test/java/net/engio/mbassy/messages/ICountable.java new file mode 100644 index 0000000..18a5090 --- /dev/null +++ b/src/test/java/net/engio/mbassy/messages/ICountable.java @@ -0,0 +1,16 @@ +package net.engio.mbassy.messages; + +/** + * Interface analogous to IMessage. Exists to test more complex class/interface hierarchies + * + * @author bennidi + * Date: 5/24/13 + */ +public interface ICountable { + + void reset(); + + void handled(Class listener); + + int getTimesHandled(Class listener); +} diff --git a/src/test/java/net/engio/mbassy/messages/IMessage.java b/src/test/java/net/engio/mbassy/messages/IMessage.java new file mode 100644 index 0000000..5cf50c4 --- /dev/null +++ b/src/test/java/net/engio/mbassy/messages/IMessage.java @@ -0,0 +1,16 @@ +package net.engio.mbassy.messages; + +/** + * + * @author bennidi + * Date: 5/24/13 + */ +public interface IMessage { + + void reset(); + + void handled(Class listener); + + int getTimesHandled(Class listener); + +} diff --git a/src/test/java/net/engio/mbassy/messages/IMultipartMessage.java b/src/test/java/net/engio/mbassy/messages/IMultipartMessage.java new file mode 100644 index 0000000..a1aa4ed --- /dev/null +++ b/src/test/java/net/engio/mbassy/messages/IMultipartMessage.java @@ -0,0 +1,9 @@ +package net.engio.mbassy.messages; + +/** + * + * @author bennidi + * Date: 5/24/13 + */ +public interface IMultipartMessage extends IMessage, ICountable{ +} diff --git a/src/test/java/net/engio/mbassy/messages/ITestMessage.java b/src/test/java/net/engio/mbassy/messages/ITestMessage.java deleted file mode 100644 index 45eaefb..0000000 --- a/src/test/java/net/engio/mbassy/messages/ITestMessage.java +++ /dev/null @@ -1,10 +0,0 @@ -package net.engio.mbassy.messages; - -/** - * Todo: Add javadoc - * - * @author bennidi - * Date: 5/12/13 - */ -public interface ITestMessage { -} diff --git a/src/test/java/net/engio/mbassy/messages/MessageTypes.java b/src/test/java/net/engio/mbassy/messages/MessageTypes.java new file mode 100644 index 0000000..5abbcde --- /dev/null +++ b/src/test/java/net/engio/mbassy/messages/MessageTypes.java @@ -0,0 +1,62 @@ +package net.engio.mbassy.messages; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * Enum used to test handlers that consume enumerations. + * + * @author bennidi + * Date: 5/24/13 + */ +public enum MessageTypes implements IMessage{ + Simple,Persistent,Multipart; + + public static void resetAll(){ + for(MessageTypes m : values()) + m.reset(); + } + + private Map handledByListener = new HashMap(); + private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + + + @Override + public void reset() { + lock.writeLock().lock(); + try { + handledByListener.clear(); + }finally { + lock.writeLock().unlock(); + } + } + + @Override + public void handled(Class listener) { + lock.writeLock().lock(); + try { + Integer count = handledByListener.get(listener); + if(count == null){ + handledByListener.put(listener, 1); + } + else{ + handledByListener.put(listener, count + 1); + } + }finally { + lock.writeLock().unlock(); + } + } + + @Override + public int getTimesHandled(Class listener) { + lock.readLock().lock(); + try { + return handledByListener.containsKey(listener) + ? handledByListener.get(listener) + : 0; + }finally { + lock.readLock().unlock(); + } + } +} diff --git a/src/test/java/net/engio/mbassy/messages/MultipartMessage.java b/src/test/java/net/engio/mbassy/messages/MultipartMessage.java new file mode 100644 index 0000000..c2d3710 --- /dev/null +++ b/src/test/java/net/engio/mbassy/messages/MultipartMessage.java @@ -0,0 +1,9 @@ +package net.engio.mbassy.messages; + +/** + * + * @author bennidi + * Date: 5/24/13 + */ +public class MultipartMessage extends AbstractMessage implements IMultipartMessage, ICountable{ +} diff --git a/src/test/java/net/engio/mbassy/messages/StandardMessage.java b/src/test/java/net/engio/mbassy/messages/StandardMessage.java new file mode 100644 index 0000000..b546f56 --- /dev/null +++ b/src/test/java/net/engio/mbassy/messages/StandardMessage.java @@ -0,0 +1,8 @@ +package net.engio.mbassy.messages; + +/** + * @author bennidi + * Date: 5/24/13 + */ +public class StandardMessage extends AbstractMessage implements ICountable{ +} diff --git a/src/test/java/net/engio/mbassy/messages/TestMessage3.java b/src/test/java/net/engio/mbassy/messages/TestMessage3.java deleted file mode 100644 index 5409beb..0000000 --- a/src/test/java/net/engio/mbassy/messages/TestMessage3.java +++ /dev/null @@ -1,10 +0,0 @@ -package net.engio.mbassy.messages; - -/** - * A test message that uses an interface - * - * @author durron597 - */ -public class TestMessage3 implements ITestMessage { - -}