From 17b1fee1b6b44612588e711aa0279c511c9d8d3a Mon Sep 17 00:00:00 2001 From: benni Date: Sun, 12 May 2013 14:32:16 +0200 Subject: [PATCH] Added SubsciptionManager, fixed #30 and #31 --- .../mbassy/bus/AbstractSyncMessageBus.java | 126 ++---------- .../net/engio/mbassy/bus/IMessageBus.java | 2 +- .../mbassy/common/AbstractConcurrentSet.java | 76 +++++--- .../engio/mbassy/common/ReflectionUtils.java | 10 +- .../engio/mbassy/listener/MetadataReader.java | 7 +- .../mbassy/subscription/Subscription.java | 8 + .../subscription/SubscriptionContext.java | 2 +- .../subscription/SubscriptionFactory.java | 20 +- .../subscription/SubscriptionManager.java | 181 ++++++++++++++++++ .../net/engio/mbassy/ConcurrentSetTest.java | 94 ++++++++- .../java/net/engio/mbassy/FilterTest.java | 4 +- .../engio/mbassy/MessagePublicationTest.java | 6 +- .../net/engio/mbassy/MetadataReaderTest.java | 16 +- .../engio/mbassy/SubscriptionManagerTest.java | 60 ++++++ .../mbassy/bus/ListenerSubscriptionTest.java | 4 +- .../mbassy/listeners/EventingTestBean.java | 4 +- .../mbassy/listeners/EventingTestBean2.java | 2 +- .../mbassy/listeners/EventingTestBean3.java | 2 +- .../mbassy/listeners/MultiEventHandler.java | 4 +- .../mbassy/listeners/NonListeningBean.java | 4 +- .../engio/mbassy/messages/ITestMessage.java | 10 + .../{events => messages}/SubTestMessage.java | 2 +- .../{events => messages}/TestMessage.java | 2 +- .../{events => messages}/TestMessage2.java | 2 +- 24 files changed, 458 insertions(+), 190 deletions(-) create mode 100644 src/main/java/net/engio/mbassy/subscription/SubscriptionManager.java create mode 100644 src/test/java/net/engio/mbassy/SubscriptionManagerTest.java create mode 100644 src/test/java/net/engio/mbassy/messages/ITestMessage.java rename src/test/java/net/engio/mbassy/{events => messages}/SubTestMessage.java (75%) rename src/test/java/net/engio/mbassy/{events => messages}/TestMessage.java (84%) rename src/test/java/net/engio/mbassy/{events => messages}/TestMessage2.java (84%) diff --git a/src/main/java/net/engio/mbassy/bus/AbstractSyncMessageBus.java b/src/main/java/net/engio/mbassy/bus/AbstractSyncMessageBus.java index f5a2cf1..9ae5baa 100644 --- a/src/main/java/net/engio/mbassy/bus/AbstractSyncMessageBus.java +++ b/src/main/java/net/engio/mbassy/bus/AbstractSyncMessageBus.java @@ -2,15 +2,13 @@ package net.engio.mbassy.bus; import net.engio.mbassy.IPublicationErrorHandler; import net.engio.mbassy.PublicationError; -import net.engio.mbassy.common.ReflectionUtils; -import net.engio.mbassy.listener.MessageHandlerMetadata; -import net.engio.mbassy.listener.MetadataReader; import net.engio.mbassy.subscription.Subscription; -import net.engio.mbassy.subscription.SubscriptionContext; -import net.engio.mbassy.subscription.SubscriptionFactory; +import net.engio.mbassy.subscription.SubscriptionManager; -import java.util.*; -import java.util.concurrent.*; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; /** * The base class for all message bus implementations. @@ -21,37 +19,17 @@ import java.util.concurrent.*; public abstract class AbstractSyncMessageBus implements ISyncMessageBus { - // the metadata reader that is used to parse objects passed to the subscribe method - private final MetadataReader metadataReader; - - // all subscriptions per message type - // this is the primary list for dispatching a specific message - // write access is synchronized and happens very infrequently - private final Map> subscriptionsPerMessage - = new HashMap>(50); - - // all subscriptions per messageHandler type - // this list provides fast access for subscribing and unsubscribing - // write access is synchronized and happens very infrequently - private final Map> subscriptionsPerListener - = new HashMap>(50); - - // remember already processed classes that do not contain any listeners - private final Collection nonListeners = new HashSet(); - // this handler will receive all errors that occur during message dispatch or message handling private final List errorHandlers = new CopyOnWriteArrayList(); - // this factory is used to create specialized subscriptions based on the given message handler configuration - // it can be customized by implementing the getSubscriptionFactory() method - private final SubscriptionFactory subscriptionFactory; - private final MessagePublication.Factory publicationFactory; + private final SubscriptionManager subscriptionManager; + public AbstractSyncMessageBus(SyncBusConfiguration configuration) { - subscriptionFactory = configuration.getSubscriptionFactory(); - this.metadataReader = configuration.getMetadataReader(); + this.subscriptionManager = new SubscriptionManager(configuration.getMetadataReader(), + configuration.getSubscriptionFactory().setBus(this)); this.publicationFactory = configuration.getMessagePublicationFactory(); addErrorHandler(new IPublicationErrorHandler.ConsoleLogger()); } @@ -66,63 +44,12 @@ public abstract class AbstractSyncMessageBus subscriptions = subscriptionsPerListener.get(listener.getClass()); - if (subscriptions == null) { - return false; - } - boolean isRemoved = true; - for (Subscription subscription : subscriptions) { - isRemoved = isRemoved && subscription.unsubscribe(listener); - } - return isRemoved; + return subscriptionManager.unsubscribe(listener); } public void subscribe(Object listener) { - try { - Class listeningClass = listener.getClass(); - if (nonListeners.contains(listeningClass)) { - return; // early reject of known classes that do not participate in eventing - } - Collection subscriptionsByListener = subscriptionsPerListener.get(listeningClass); - if (subscriptionsByListener == null) { // if the type is registered for the first time - synchronized (this) { // new subscriptions must be processed sequentially - subscriptionsByListener = subscriptionsPerListener.get(listeningClass); - if (subscriptionsByListener == null) { // double check (a bit ugly but works here) - List messageHandlers = metadataReader.getMessageHandlers(listeningClass); - if (messageHandlers.isEmpty()) { // remember the class as non listening class if no handlers are found - nonListeners.add(listeningClass); - return; - } - subscriptionsByListener = new ArrayList(messageHandlers.size()); // it's safe to use non-concurrent collection here (read only) - // create subscriptions for all detected listeners - for (MessageHandlerMetadata messageHandler : messageHandlers) { - // create the subscription - Subscription subscription = subscriptionFactory - .createSubscription(new SubscriptionContext(this, messageHandler)); - subscription.subscribe(listener); - subscriptionsByListener.add(subscription);// add it for the listener type (for future subscriptions) - - List> messageTypes = messageHandler.getHandledMessages(); - for (Class messageType : messageTypes) { - addMessageTypeSubscription(messageType, subscription); - } - //updateMessageTypeHierarchy(eventType); - } - subscriptionsPerListener.put(listeningClass, subscriptionsByListener); - } - } - } - // register the listener to the existing subscriptions - for (Subscription sub : subscriptionsByListener) { - sub.subscribe(listener); - } - } catch (Exception e) { - throw new RuntimeException(e); - } + subscriptionManager.subscribe(listener); } @@ -131,39 +58,10 @@ public abstract class AbstractSyncMessageBus getSubscriptionsByMessageType(Class messageType) { - Set subscriptions = new TreeSet(Subscription.SubscriptionByPriorityDesc); - - if (subscriptionsPerMessage.get(messageType) != null) { - subscriptions.addAll(subscriptionsPerMessage.get(messageType)); - } - // TODO: get superclasses is eligible for caching - for (Class eventSuperType : ReflectionUtils.getSuperclasses(messageType)) { - Collection subs = subscriptionsPerMessage.get(eventSuperType); - if (subs != null) { - for (Subscription sub : subs) { - if (sub.handlesMessageType(messageType)) { - subscriptions.add(sub); - } - } - } - } - return subscriptions; - } - - - // associate a suscription with a message type - // NOTE: Not thread-safe! must be synchronized in outer scope - private void addMessageTypeSubscription(Class messageType, Subscription subscription) { - Collection subscriptions = subscriptionsPerMessage.get(messageType); - if (subscriptions == null) { - subscriptions = new LinkedList(); - subscriptionsPerMessage.put(messageType, subscriptions); - } - subscriptions.add(subscription); + return subscriptionManager.getSubscriptionsByMessageType(messageType); } diff --git a/src/main/java/net/engio/mbassy/bus/IMessageBus.java b/src/main/java/net/engio/mbassy/bus/IMessageBus.java index f80dbda..0c9e98b 100644 --- a/src/main/java/net/engio/mbassy/bus/IMessageBus.java +++ b/src/main/java/net/engio/mbassy/bus/IMessageBus.java @@ -55,7 +55,7 @@ public interface IMessageBus extends ISyn Executor getExecutor(); /** - * Check whether any asynchronous message publications are pending for being processed + * Check whether any asynchronous message publications are pending to be processed * * @return */ diff --git a/src/main/java/net/engio/mbassy/common/AbstractConcurrentSet.java b/src/main/java/net/engio/mbassy/common/AbstractConcurrentSet.java index 3af5ea2..1db5fd0 100644 --- a/src/main/java/net/engio/mbassy/common/AbstractConcurrentSet.java +++ b/src/main/java/net/engio/mbassy/common/AbstractConcurrentSet.java @@ -2,6 +2,8 @@ package net.engio.mbassy.common; import java.util.Map; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; /** * This data structure is optimized for non-blocking reads even when write operations occur. @@ -15,7 +17,7 @@ import java.util.Map; public abstract class AbstractConcurrentSet implements IConcurrentSet { // Internal state - private final Object lock = new Object(); + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private final Map> entries; // maintain a map of entries for O(log n) lookup protected Entry head; // reference to the first element @@ -27,27 +29,38 @@ public abstract class AbstractConcurrentSet implements IConcurrentSet { @Override public IConcurrentSet add(T element) { + if (element == null) return this; + Lock writeLock = lock.writeLock(); + writeLock.lock(); if (element == null || entries.containsKey(element)) { + writeLock.unlock(); return this; - } - synchronized (lock) { + } else { insert(element); + writeLock.unlock(); } return this; } @Override public boolean contains(T element) { - ISetEntry entry = entries.get(element); + Lock readLock = lock.readLock(); + ISetEntry entry; + try { + readLock.lock(); + entry = entries.get(element); + + } finally { + readLock.unlock(); + } return entry != null && entry.getValue() != null; } private void insert(T element) { - if (entries.containsKey(element)) { - return; + if (!entries.containsKey(element)) { + head = createEntry(element, head); + entries.put(element, head); } - head = createEntry(element, head); - entries.put(element, head); } @Override @@ -57,38 +70,45 @@ public abstract class AbstractConcurrentSet implements IConcurrentSet { @Override public IConcurrentSet addAll(Iterable elements) { - synchronized (lock) { + Lock writeLock = lock.writeLock(); + try { + writeLock.lock(); for (T element : elements) { - if (element == null || entries.containsKey(element)) { - return this; + if (element != null) { + insert(element); } - - insert(element); } + } finally { + writeLock.unlock(); } return this; } @Override public boolean remove(T element) { - if (!entries.containsKey(element)) { + if (!contains(element)) { return false; - } - synchronized (lock) { - ISetEntry listelement = entries.get(element); - if (listelement == null) { - return false; //removed by other thread + } else { + Lock writeLock = lock.writeLock(); + try { + writeLock.lock(); + ISetEntry listelement = entries.get(element); + if (listelement == null) { + return false; //removed by other thread + } + if (listelement != head) { + listelement.remove(); + } else { + ISetEntry oldHead = head; + head = head.next(); + oldHead.clear(); // optimize for GC + } + entries.remove(element); + } finally { + writeLock.unlock(); } - if (listelement != head) { - listelement.remove(); - } else { - ISetEntry oldHead = head; - head = head.next(); - oldHead.clear(); // optimize for GC - } - entries.remove(element); + return true; } - return true; } diff --git a/src/main/java/net/engio/mbassy/common/ReflectionUtils.java b/src/main/java/net/engio/mbassy/common/ReflectionUtils.java index bf242c2..2c41827 100644 --- a/src/main/java/net/engio/mbassy/common/ReflectionUtils.java +++ b/src/main/java/net/engio/mbassy/common/ReflectionUtils.java @@ -62,13 +62,21 @@ public class ReflectionUtils { public static Collection getSuperclasses(Class from) { Collection superclasses = new LinkedList(); - while (!from.equals(Object.class)) { + while (!from.equals(Object.class) && !from.isInterface()) { superclasses.add(from.getSuperclass()); from = from.getSuperclass(); } + collectInterfaces(from, superclasses); return superclasses; } + public static void collectInterfaces(Class from, Collection accumulator){ + for(Class intface : from.getInterfaces()){ + accumulator.add(intface); + collectInterfaces(intface, accumulator); + } + } + public static boolean containsOverridingMethod(final List allMethods, final Method methodToCheck) { for (Method method : allMethods) { if (isOverriddenBy(methodToCheck, method)) { diff --git a/src/main/java/net/engio/mbassy/listener/MetadataReader.java b/src/main/java/net/engio/mbassy/listener/MetadataReader.java index a5bdcb7..814ad65 100644 --- a/src/main/java/net/engio/mbassy/listener/MetadataReader.java +++ b/src/main/java/net/engio/mbassy/listener/MetadataReader.java @@ -58,7 +58,8 @@ public class MetadataReader { // get all listeners defined by the given class (includes // listeners defined in super classes) - public List getMessageHandlers(Class target) { + public List getMessageHandlers(Object listener) { + Class target = listener.getClass(); Listener listenerConfig = target.getAnnotation(Listener.class); // get all handlers (this will include all (inherited) methods directly annotated using @Handler) List allHandlers = ReflectionUtils.getMethods(AllMessageHandlers, target); @@ -89,8 +90,8 @@ public class MetadataReader { } - public MessageListenerMetadata getMessageListener(Class target) { - return new MessageListenerMetadata(getMessageHandlers(target), target); + public MessageListenerMetadata getMessageListener(Object listener) { + return new MessageListenerMetadata(getMessageHandlers(listener), listener.getClass()); } diff --git a/src/main/java/net/engio/mbassy/subscription/Subscription.java b/src/main/java/net/engio/mbassy/subscription/Subscription.java index 9a48911..3d656a6 100644 --- a/src/main/java/net/engio/mbassy/subscription/Subscription.java +++ b/src/main/java/net/engio/mbassy/subscription/Subscription.java @@ -5,6 +5,7 @@ import net.engio.mbassy.common.IConcurrentSet; import net.engio.mbassy.dispatch.IMessageDispatcher; import java.util.Comparator; +import java.util.List; import java.util.UUID; /** @@ -26,11 +27,18 @@ public class Subscription { this.listeners = listeners; } + public boolean contains(Object listener){ + return listeners.contains(listener); + } public boolean handlesMessageType(Class messageType) { return context.getHandlerMetadata().handlesMessage(messageType); } + public List> getHandledMessageTypes(){ + return context.getHandlerMetadata().getHandledMessages(); + } + public void publish(MessagePublication publication, Object message) { dispatcher.dispatch(publication, message, listeners); diff --git a/src/main/java/net/engio/mbassy/subscription/SubscriptionContext.java b/src/main/java/net/engio/mbassy/subscription/SubscriptionContext.java index 122a5fe..fab5d93 100644 --- a/src/main/java/net/engio/mbassy/subscription/SubscriptionContext.java +++ b/src/main/java/net/engio/mbassy/subscription/SubscriptionContext.java @@ -7,7 +7,7 @@ import net.engio.mbassy.listener.MessageHandlerMetadata; * The subscription context holds all (meta)data/objects that are relevant to successfully publish * a message within a subscription. A one-to-one relation between a subscription and * subscription context holds -> a subscription context is created for each distinct subscription - * that lives inside a message bus. + * managed by the subscription manager. * * @author bennidi * Date: 11/23/12 diff --git a/src/main/java/net/engio/mbassy/subscription/SubscriptionFactory.java b/src/main/java/net/engio/mbassy/subscription/SubscriptionFactory.java index 0f1ea79..9a2f8e8 100644 --- a/src/main/java/net/engio/mbassy/subscription/SubscriptionFactory.java +++ b/src/main/java/net/engio/mbassy/subscription/SubscriptionFactory.java @@ -1,25 +1,31 @@ package net.engio.mbassy.subscription; import net.engio.mbassy.MessageBusException; +import net.engio.mbassy.bus.ISyncMessageBus; import net.engio.mbassy.common.StrongConcurrentSet; import net.engio.mbassy.common.WeakConcurrentSet; import net.engio.mbassy.dispatch.*; +import net.engio.mbassy.listener.MessageHandlerMetadata; import java.lang.reflect.Constructor; import java.lang.reflect.Modifier; /** - * Created with IntelliJ IDEA. - * - * @author bennidi - * Date: 11/16/12 - * Time: 10:39 AM - * To change this template use File | Settings | File Templates. + * The subscription factory is used to create an empty subscription for specific message handler. + * The message handler's configuration is evaluated and a corresponding subscription is built. */ public class SubscriptionFactory { - public Subscription createSubscription(SubscriptionContext context) throws MessageBusException{ + private ISyncMessageBus bus; + + public SubscriptionFactory setBus(ISyncMessageBus bus) { + this.bus = bus; + return this; + } + + public Subscription createSubscription(MessageHandlerMetadata handlerMetadata) throws MessageBusException{ try { + SubscriptionContext context = new SubscriptionContext(bus, handlerMetadata); IHandlerInvocation invocation = buildInvocationForHandler(context); IMessageDispatcher dispatcher = buildDispatcher(context, invocation); return new Subscription(context, dispatcher, context.getHandlerMetadata().useStrongReferences() diff --git a/src/main/java/net/engio/mbassy/subscription/SubscriptionManager.java b/src/main/java/net/engio/mbassy/subscription/SubscriptionManager.java new file mode 100644 index 0000000..dc302f7 --- /dev/null +++ b/src/main/java/net/engio/mbassy/subscription/SubscriptionManager.java @@ -0,0 +1,181 @@ +package net.engio.mbassy.subscription; + +import net.engio.mbassy.common.ReflectionUtils; +import net.engio.mbassy.common.StrongConcurrentSet; +import net.engio.mbassy.listener.MessageHandlerMetadata; +import net.engio.mbassy.listener.MetadataReader; + +import java.util.*; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * Todo: Add javadoc + * + * @author bennidi + * Date: 5/11/13 + */ +public class SubscriptionManager { + + // the metadata reader that is used to inspect objects passed to the subscribe method + private final MetadataReader metadataReader; + + // all subscriptions per message type + // this is the primary list for dispatching a specific message + // write access is synchronized and happens only when a listener of a specific class is registered the first time + private final Map> subscriptionsPerMessage + = new HashMap>(50); + + // all subscriptions per messageHandler type + // this map provides fast access for subscribing and unsubscribing + // write access is synchronized and happens very infrequently + // once a collection of subscriptions is stored it does not change + private final Map> subscriptionsPerListener + = new HashMap>(50); + + // remember already processed classes that do not contain any message handlers + private final StrongConcurrentSet nonListeners = new StrongConcurrentSet(); + + // this factory is used to create specialized subscriptions based on the given message handler configuration + // it can be customized by implementing the getSubscriptionFactory() method + private final SubscriptionFactory subscriptionFactory; + + private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + + public SubscriptionManager(MetadataReader metadataReader, SubscriptionFactory subscriptionFactory) { + this.metadataReader = metadataReader; + this.subscriptionFactory = subscriptionFactory; + } + + + public boolean unsubscribe(Object listener) { + if (listener == null) { + return false; + } + Collection subscriptions = getSubscriptionsByListener(listener); + if (subscriptions == null) { + return false; + } + boolean isRemoved = true; + for (Subscription subscription : subscriptions) { + isRemoved &= subscription.unsubscribe(listener); + } + return isRemoved; + } + + + private Collection getSubscriptionsByListener(Object listener) { + Collection subscriptions; + try { + readWriteLock.readLock().lock(); + subscriptions = subscriptionsPerListener.get(listener.getClass()); + } finally { + readWriteLock.readLock().unlock(); + } + return subscriptions; + } + + public void subscribe(Object listener) { + try { + if (isKnownNonListener(listener)) { + return; // early reject of known classes that do not define message handlers + } + Collection subscriptionsByListener = getSubscriptionsByListener(listener); + // a listener is either subscribed for the first time + if (subscriptionsByListener == null) { + List messageHandlers = metadataReader.getMessageHandlers(listener); + if (messageHandlers.isEmpty()) { // remember the class as non listening class if no handlers are found + nonListeners.add(listener.getClass()); + return; + } + subscriptionsByListener = new ArrayList(messageHandlers.size()); // it's safe to use non-concurrent collection here (read only) + // create subscriptions for all detected message handlers + for (MessageHandlerMetadata messageHandler : messageHandlers) { + // create the subscription + subscriptionsByListener.add(subscriptionFactory.createSubscription(messageHandler)); + } + // this will acquire a write lock and handle the case when another thread already subscribed + // this particular listener in the mean-time + subscribe(listener, subscriptionsByListener); + } // or the subscriptions already exist and must only be updated + else { + for (Subscription sub : subscriptionsByListener) { + sub.subscribe(listener); + } + } + + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + + private void subscribe(Object listener, Collection subscriptions) { + try { + readWriteLock.writeLock().lock(); + // basically this is a deferred double check + // it's an ugly pattern but necessary because atomic upgrade from read to write lock + // is not possible and using a write lock from the beginning with will dramatically decrease performance + Collection subscriptionsByListener = getSubscriptionsByListener(listener); + + if (subscriptionsByListener == null) { + for (Subscription subscription : subscriptions) { + subscription.subscribe(listener); + for (Class messageType : subscription.getHandledMessageTypes()) { + addMessageTypeSubscription(messageType, subscription); + } + } + subscriptionsPerListener.put(listener.getClass(), subscriptions); + } + // the rare case when multiple threads concurrently subscribed the same class for the first time + // one will be first, all others will have to subscribe to the existing instead the generated subscriptions + else { + for (Subscription existingSubscription : subscriptionsByListener) { + existingSubscription.subscribe(listener); + } + } + } finally { + readWriteLock.writeLock().unlock(); + } + + + } + + private boolean isKnownNonListener(Object listener) { + Class listeningClass = listener.getClass(); + return nonListeners.contains(listeningClass); + } + + // obtain the set of subscriptions for the given message type + // Note: never returns null! + public Collection getSubscriptionsByMessageType(Class messageType) { + Set subscriptions = new TreeSet(Subscription.SubscriptionByPriorityDesc); + readWriteLock.readLock().lock(); + if (subscriptionsPerMessage.get(messageType) != null) { + subscriptions.addAll(subscriptionsPerMessage.get(messageType)); + } + for (Class eventSuperType : ReflectionUtils.getSuperclasses(messageType)) { + Collection subs = subscriptionsPerMessage.get(eventSuperType); + if (subs != null) { + for (Subscription sub : subs) { + if (sub.handlesMessageType(messageType)) { + subscriptions.add(sub); + } + } + } + } + readWriteLock.readLock().unlock(); + return subscriptions; + } + + + // associate a suscription with a message type + // NOTE: Not thread-safe! must be synchronized in outer scope + private void addMessageTypeSubscription(Class messageType, Subscription subscription) { + Collection subscriptions = subscriptionsPerMessage.get(messageType); + if (subscriptions == null) { + subscriptions = new LinkedList(); + subscriptionsPerMessage.put(messageType, subscriptions); + } + subscriptions.add(subscription); + } +} diff --git a/src/test/java/net/engio/mbassy/ConcurrentSetTest.java b/src/test/java/net/engio/mbassy/ConcurrentSetTest.java index ea9ee0f..4768dca 100644 --- a/src/test/java/net/engio/mbassy/ConcurrentSetTest.java +++ b/src/test/java/net/engio/mbassy/ConcurrentSetTest.java @@ -6,10 +6,8 @@ import net.engio.mbassy.common.IConcurrentSet; import net.engio.mbassy.common.UnitTest; import org.junit.Test; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.Random; +import java.util.*; +import java.util.concurrent.CopyOnWriteArraySet; /** * This test ensures the correct behaviour of the set implementation that is the building @@ -218,13 +216,13 @@ public abstract class ConcurrentSetTest extends UnitTest { @Test public void testRemovalViaIterator() { final HashSet source = new HashSet(); - final IConcurrentSet testSetWeak = createSet(); + final IConcurrentSet setUnderTest = createSet(); // build set of candidates and mark subset for removal for (int i = 0; i < numberOfElements; i++) { Object candidate = new Object(); source.add(candidate); - testSetWeak.add(candidate); + setUnderTest.add(candidate); } // build test set by adding the candidates @@ -232,7 +230,7 @@ public abstract class ConcurrentSetTest extends UnitTest { ConcurrentExecutor.runConcurrent(new Runnable() { @Override public void run() { - Iterator iterator = testSetWeak.iterator(); + Iterator iterator = setUnderTest.iterator(); while(iterator.hasNext()){ iterator.remove(); } @@ -242,11 +240,89 @@ public abstract class ConcurrentSetTest extends UnitTest { // ensure that the test set still contains all objects from the source set that have not been marked // for removal - assertEquals(0, testSetWeak.size()); + assertEquals(0, setUnderTest.size()); for(Object src : source){ - assertFalse(testSetWeak.contains(src)); + assertFalse(setUnderTest.contains(src)); } } + /** + * In this test HashMap will cross capacity threshold multiple times in + * different directions which will trigger rehashing. Because rehashing + * requires modification of Entry class for all hash map entries some keys + * may temporarily disappear from the map. + *

+ * For more information please take a look at transfer method in HashMap. + * + * Thanks to Ivan Koblik (http://koblik.blogspot.com) for contributing initial code and idea + */ + @Test + public void testConcurrentAddRemove() { + final IConcurrentSet set = createSet(); + final List permanentObjects = createWithRandomIntegers(80, null); + final List volatileObjects = createWithRandomIntegers(10000, permanentObjects); + final CopyOnWriteArraySet missing = new CopyOnWriteArraySet(); + final int mutatorThreshold = 1000; + + // Add elements that will not be touched by the constantly running mutating thread + final int numItems = 8; + for (Object permanent : permanentObjects) { + set.add(permanent); + } + + // Adds and removes items >= numItems + // thus forcing constant rehashing of the backing hashtable + Runnable updatingThread = new Runnable() { + public void run() { + Random rand = new Random(); + for(int times = 0; times < 1000 ; times++){ + System.out.println("New mutator cycle: " + times); + HashSet elements = new HashSet(mutatorThreshold); + + for (int i = numItems; i < mutatorThreshold; i++) { + Object volatileObject = volatileObjects.get(Math.abs(rand.nextInt()) % volatileObjects.size()); + set.add(volatileObject); + elements.add(volatileObject); + } + for (Object volObj : elements) { + set.remove(volObj); + } + } + }; + }; + + Runnable lookupThread = new Runnable() { + @Override + public void run() { + for (int i = 0; i < 10000; i++) { + System.out.println("New lookup cycle: " + i); + for (Object permanent : permanentObjects) { + // permanent items are never touched, + // --> set.contains(j) should always return true + if(!set.contains(permanent)) + missing.add(permanent); + } + } + } + }; + + ConcurrentExecutor.runConcurrent(updatingThread, lookupThread, lookupThread, lookupThread); + assertTrue("There where items temporarily unavailable: " + missing.size(), missing.size() == 0); + + } + + + public List createWithRandomIntegers(int size, List exluding){ + if(exluding == null) exluding = new ArrayList(); + List result = new ArrayList(size); + Random rand = new Random(); + for(int i = 0; i < size;i++){ + result.add(rand.nextInt()); + } + for(Integer excluded : exluding) + result.remove(excluded); + return result; + } + } diff --git a/src/test/java/net/engio/mbassy/FilterTest.java b/src/test/java/net/engio/mbassy/FilterTest.java index cda9d8a..a7b0d5f 100644 --- a/src/test/java/net/engio/mbassy/FilterTest.java +++ b/src/test/java/net/engio/mbassy/FilterTest.java @@ -6,8 +6,8 @@ import net.engio.mbassy.common.DeadMessage; import net.engio.mbassy.common.FilteredMessage; import net.engio.mbassy.common.MessageBusTest; import net.engio.mbassy.common.TestUtil; -import net.engio.mbassy.events.SubTestMessage; -import net.engio.mbassy.events.TestMessage; +import net.engio.mbassy.messages.SubTestMessage; +import net.engio.mbassy.messages.TestMessage; import net.engio.mbassy.listener.*; import net.engio.mbassy.listeners.ListenerFactory; import org.junit.Test; diff --git a/src/test/java/net/engio/mbassy/MessagePublicationTest.java b/src/test/java/net/engio/mbassy/MessagePublicationTest.java index aa5b373..1bf1c0f 100644 --- a/src/test/java/net/engio/mbassy/MessagePublicationTest.java +++ b/src/test/java/net/engio/mbassy/MessagePublicationTest.java @@ -5,9 +5,9 @@ import net.engio.mbassy.bus.MBassador; import net.engio.mbassy.common.ConcurrentExecutor; import net.engio.mbassy.common.MessageBusTest; import net.engio.mbassy.common.TestUtil; -import net.engio.mbassy.events.SubTestMessage; -import net.engio.mbassy.events.TestMessage; -import net.engio.mbassy.events.TestMessage2; +import net.engio.mbassy.messages.SubTestMessage; +import net.engio.mbassy.messages.TestMessage; +import net.engio.mbassy.messages.TestMessage2; import net.engio.mbassy.listeners.*; import org.junit.Test; diff --git a/src/test/java/net/engio/mbassy/MetadataReaderTest.java b/src/test/java/net/engio/mbassy/MetadataReaderTest.java index 99b345a..8507c3d 100644 --- a/src/test/java/net/engio/mbassy/MetadataReaderTest.java +++ b/src/test/java/net/engio/mbassy/MetadataReaderTest.java @@ -27,7 +27,7 @@ public class MetadataReaderTest extends UnitTest { @Test public void testListenerWithoutInheritance() { - MessageListenerMetadata listener = reader.getMessageListener(EventListener1.class); + MessageListenerMetadata listener = reader.getMessageListener(new MessageListener1()); ListenerValidator validator = new ListenerValidator() .expectHandlers(2, String.class) .expectHandlers(2, Object.class) @@ -38,7 +38,7 @@ public class MetadataReaderTest extends UnitTest { @Test public void testListenerWithInheritance() { - MessageListenerMetadata listener = reader.getMessageListener(EventListener2.class); + MessageListenerMetadata listener = reader.getMessageListener(new MessageListener2()); ListenerValidator validator = new ListenerValidator() .expectHandlers(2, String.class) .expectHandlers(2, Object.class) @@ -48,7 +48,7 @@ public class MetadataReaderTest extends UnitTest { @Test public void testListenerWithInheritanceOverriding() { - MessageListenerMetadata listener = reader.getMessageListener(EventListener3.class); + MessageListenerMetadata listener = reader.getMessageListener(new MessageListener3()); ListenerValidator validator = new ListenerValidator() .expectHandlers(0, String.class) @@ -59,7 +59,7 @@ public class MetadataReaderTest extends UnitTest { @Test public void testEnveloped() { - MessageListenerMetadata listener = reader.getMessageListener(EnvelopedListener.class); + MessageListenerMetadata listener = reader.getMessageListener(new EnvelopedListener()); ListenerValidator validator = new ListenerValidator() .expectHandlers(1, String.class) .expectHandlers(2, Integer.class) @@ -72,7 +72,7 @@ public class MetadataReaderTest extends UnitTest { @Test public void testEnvelopedSubclass() { - MessageListenerMetadata listener = reader.getMessageListener(EnvelopedListenerSubclass.class); + MessageListenerMetadata listener = reader.getMessageListener(new EnvelopedListenerSubclass()); ListenerValidator validator = new ListenerValidator() .expectHandlers(1, String.class) .expectHandlers(2, Integer.class) @@ -110,7 +110,7 @@ public class MetadataReaderTest extends UnitTest { // a simple event listener - public class EventListener1 { + public class MessageListener1 { @Handler(rejectSubtypes = true) public void handleObject(Object o) { @@ -131,7 +131,7 @@ public class MetadataReaderTest extends UnitTest { } // the same handlers as its super class - public class EventListener2 extends EventListener1 { + public class MessageListener2 extends MessageListener1 { // redefine handler implementation (not configuration) public void handleString(String s) { @@ -140,7 +140,7 @@ public class MetadataReaderTest extends UnitTest { } - public class EventListener3 extends EventListener2 { + public class MessageListener3 extends MessageListener2 { // narrow the handler @Handler(rejectSubtypes = true) diff --git a/src/test/java/net/engio/mbassy/SubscriptionManagerTest.java b/src/test/java/net/engio/mbassy/SubscriptionManagerTest.java new file mode 100644 index 0000000..98437e6 --- /dev/null +++ b/src/test/java/net/engio/mbassy/SubscriptionManagerTest.java @@ -0,0 +1,60 @@ +package net.engio.mbassy; + +import net.engio.mbassy.common.UnitTest; +import net.engio.mbassy.listener.Handler; +import net.engio.mbassy.listener.MetadataReader; +import net.engio.mbassy.messages.ITestMessage; +import net.engio.mbassy.messages.TestMessage; +import net.engio.mbassy.subscription.Subscription; +import net.engio.mbassy.subscription.SubscriptionFactory; +import net.engio.mbassy.subscription.SubscriptionManager; +import org.junit.Test; + +import java.util.Collection; + +/** + * Todo: Add javadoc + * + * @author bennidi + * Date: 5/12/13 + */ +public class SubscriptionManagerTest extends UnitTest{ + + @Test + public void testSimpleSynchronousHandler(){ + SubscriptionManager subMan = new SubscriptionManager(new MetadataReader(), new SubscriptionFactory()); + SimpleSynchronousMessageHandler + listener1 = new SimpleSynchronousMessageHandler(), + listener2 = new SimpleSynchronousMessageHandler(); + subMan.subscribe(listener1); + subMan.subscribe(listener2); + + Collection subscriptions = subMan.getSubscriptionsByMessageType(TestMessage.class); + assertEquals(1, subscriptions.size()); + for(Subscription sub : subscriptions){ + assertEquals(2, sub.size()); + assertTrue(sub.contains(listener1)); + assertTrue(sub.contains(listener2)); + } + + subscriptions = subMan.getSubscriptionsByMessageType(ITestMessage.class); + assertEquals(1, subscriptions.size()); + for(Subscription sub : subscriptions){ + assertEquals(2, sub.size()); + assertTrue(sub.contains(listener1)); + assertTrue(sub.contains(listener2)); + } + } + + + static class SimpleSynchronousMessageHandler{ + + @Handler + public void handle(TestMessage message) { + } + + @Handler + public void handle(ITestMessage message) { + } + } +} diff --git a/src/test/java/net/engio/mbassy/bus/ListenerSubscriptionTest.java b/src/test/java/net/engio/mbassy/bus/ListenerSubscriptionTest.java index 842a39a..ad5327f 100644 --- a/src/test/java/net/engio/mbassy/bus/ListenerSubscriptionTest.java +++ b/src/test/java/net/engio/mbassy/bus/ListenerSubscriptionTest.java @@ -2,8 +2,8 @@ package net.engio.mbassy.bus; import net.engio.mbassy.common.MessageBusTest; import net.engio.mbassy.common.TestUtil; -import net.engio.mbassy.events.SubTestMessage; -import net.engio.mbassy.events.TestMessage; +import net.engio.mbassy.messages.SubTestMessage; +import net.engio.mbassy.messages.TestMessage; import net.engio.mbassy.listeners.EventingTestBean; import net.engio.mbassy.listeners.EventingTestBean2; import net.engio.mbassy.listeners.EventingTestBean3; diff --git a/src/test/java/net/engio/mbassy/listeners/EventingTestBean.java b/src/test/java/net/engio/mbassy/listeners/EventingTestBean.java index 35ef3d1..7843e53 100644 --- a/src/test/java/net/engio/mbassy/listeners/EventingTestBean.java +++ b/src/test/java/net/engio/mbassy/listeners/EventingTestBean.java @@ -1,8 +1,8 @@ package net.engio.mbassy.listeners; import net.engio.mbassy.dispatch.HandlerInvocation; -import net.engio.mbassy.events.SubTestMessage; -import net.engio.mbassy.events.TestMessage; +import net.engio.mbassy.messages.SubTestMessage; +import net.engio.mbassy.messages.TestMessage; import net.engio.mbassy.listener.*; import net.engio.mbassy.subscription.SubscriptionContext; diff --git a/src/test/java/net/engio/mbassy/listeners/EventingTestBean2.java b/src/test/java/net/engio/mbassy/listeners/EventingTestBean2.java index 3ac9395..637fc44 100644 --- a/src/test/java/net/engio/mbassy/listeners/EventingTestBean2.java +++ b/src/test/java/net/engio/mbassy/listeners/EventingTestBean2.java @@ -1,6 +1,6 @@ package net.engio.mbassy.listeners; -import net.engio.mbassy.events.SubTestMessage; +import net.engio.mbassy.messages.SubTestMessage; import net.engio.mbassy.listener.Handler; import net.engio.mbassy.listener.Invoke; import net.engio.mbassy.listener.Listener; diff --git a/src/test/java/net/engio/mbassy/listeners/EventingTestBean3.java b/src/test/java/net/engio/mbassy/listeners/EventingTestBean3.java index aa50901..86ccf29 100644 --- a/src/test/java/net/engio/mbassy/listeners/EventingTestBean3.java +++ b/src/test/java/net/engio/mbassy/listeners/EventingTestBean3.java @@ -1,6 +1,6 @@ package net.engio.mbassy.listeners; -import net.engio.mbassy.events.SubTestMessage; +import net.engio.mbassy.messages.SubTestMessage; import net.engio.mbassy.listener.Handler; import net.engio.mbassy.listener.Invoke; import net.engio.mbassy.listener.Listener; diff --git a/src/test/java/net/engio/mbassy/listeners/MultiEventHandler.java b/src/test/java/net/engio/mbassy/listeners/MultiEventHandler.java index 446577e..43c8149 100644 --- a/src/test/java/net/engio/mbassy/listeners/MultiEventHandler.java +++ b/src/test/java/net/engio/mbassy/listeners/MultiEventHandler.java @@ -1,7 +1,7 @@ package net.engio.mbassy.listeners; -import net.engio.mbassy.events.TestMessage; -import net.engio.mbassy.events.TestMessage2; +import net.engio.mbassy.messages.TestMessage; +import net.engio.mbassy.messages.TestMessage2; import net.engio.mbassy.listener.*; import net.engio.mbassy.listener.Invoke; import net.engio.mbassy.subscription.MessageEnvelope; diff --git a/src/test/java/net/engio/mbassy/listeners/NonListeningBean.java b/src/test/java/net/engio/mbassy/listeners/NonListeningBean.java index 6537b4c..3bc5f34 100644 --- a/src/test/java/net/engio/mbassy/listeners/NonListeningBean.java +++ b/src/test/java/net/engio/mbassy/listeners/NonListeningBean.java @@ -1,7 +1,7 @@ package net.engio.mbassy.listeners; -import net.engio.mbassy.events.SubTestMessage; -import net.engio.mbassy.events.TestMessage; +import net.engio.mbassy.messages.SubTestMessage; +import net.engio.mbassy.messages.TestMessage; import net.engio.mbassy.listener.Handler; /** diff --git a/src/test/java/net/engio/mbassy/messages/ITestMessage.java b/src/test/java/net/engio/mbassy/messages/ITestMessage.java new file mode 100644 index 0000000..45eaefb --- /dev/null +++ b/src/test/java/net/engio/mbassy/messages/ITestMessage.java @@ -0,0 +1,10 @@ +package net.engio.mbassy.messages; + +/** + * Todo: Add javadoc + * + * @author bennidi + * Date: 5/12/13 + */ +public interface ITestMessage { +} diff --git a/src/test/java/net/engio/mbassy/events/SubTestMessage.java b/src/test/java/net/engio/mbassy/messages/SubTestMessage.java similarity index 75% rename from src/test/java/net/engio/mbassy/events/SubTestMessage.java rename to src/test/java/net/engio/mbassy/messages/SubTestMessage.java index ef295ec..293bba3 100644 --- a/src/test/java/net/engio/mbassy/events/SubTestMessage.java +++ b/src/test/java/net/engio/mbassy/messages/SubTestMessage.java @@ -1,4 +1,4 @@ -package net.engio.mbassy.events; +package net.engio.mbassy.messages; /** * diff --git a/src/test/java/net/engio/mbassy/events/TestMessage.java b/src/test/java/net/engio/mbassy/messages/TestMessage.java similarity index 84% rename from src/test/java/net/engio/mbassy/events/TestMessage.java rename to src/test/java/net/engio/mbassy/messages/TestMessage.java index 12e69dd..d0cbeea 100644 --- a/src/test/java/net/engio/mbassy/events/TestMessage.java +++ b/src/test/java/net/engio/mbassy/messages/TestMessage.java @@ -1,4 +1,4 @@ -package net.engio.mbassy.events; +package net.engio.mbassy.messages; import java.util.concurrent.atomic.AtomicInteger; diff --git a/src/test/java/net/engio/mbassy/events/TestMessage2.java b/src/test/java/net/engio/mbassy/messages/TestMessage2.java similarity index 84% rename from src/test/java/net/engio/mbassy/events/TestMessage2.java rename to src/test/java/net/engio/mbassy/messages/TestMessage2.java index 1270529..33b9edc 100644 --- a/src/test/java/net/engio/mbassy/events/TestMessage2.java +++ b/src/test/java/net/engio/mbassy/messages/TestMessage2.java @@ -1,4 +1,4 @@ -package net.engio.mbassy.events; +package net.engio.mbassy.messages; import java.util.concurrent.atomic.AtomicInteger;