diff --git a/src/main/java/net/engio/mbassy/MBassador.java b/src/main/java/net/engio/mbassy/MBassador.java index 40e2f4a..a8013af 100644 --- a/src/main/java/net/engio/mbassy/MBassador.java +++ b/src/main/java/net/engio/mbassy/MBassador.java @@ -51,15 +51,17 @@ public class MBassador implements IMessageBus { private final Disruptor disruptor; private final RingBuffer ringBuffer; + private WorkerPool workerPool; + public MBassador() { - this(Runtime.getRuntime().availableProcessors() - 1); + this(Runtime.getRuntime().availableProcessors() * 2 - 1); } public MBassador(int numberOfThreads) { if (numberOfThreads < 1) { - numberOfThreads = 1; // at LEAST 1 thread. + numberOfThreads = 1; // at LEAST 1 threads } this.subscriptionManager = new SubscriptionManager(); @@ -74,16 +76,16 @@ public class MBassador implements IMessageBus { handlers[i] = new EventProcessor(this); } - WorkerPool workerPool = new WorkerPool(this.ringBuffer, - this.ringBuffer.newBarrier(), - loggingExceptionHandler, - handlers); + this.workerPool = new WorkerPool(this.ringBuffer, + this.ringBuffer.newBarrier(), + loggingExceptionHandler, + handlers); - this.ringBuffer.addGatingSequences(workerPool.getWorkerSequences()); + this.ringBuffer.addGatingSequences(this.workerPool.getWorkerSequences()); } public final MBassador start() { -// workerPool.start(this.executor); ?? + this.workerPool.start(this.executor); this.disruptor.start(); return this; } @@ -104,8 +106,8 @@ public class MBassador implements IMessageBus { } @Override - public boolean unsubscribe(Object listener) { - return this.subscriptionManager.unsubscribe(listener); + public void unsubscribe(Object listener) { + this.subscriptionManager.unsubscribe(listener); } diff --git a/src/main/java/net/engio/mbassy/PubSubSupport.java b/src/main/java/net/engio/mbassy/PubSubSupport.java index 01e234f..c6c6ad5 100644 --- a/src/main/java/net/engio/mbassy/PubSubSupport.java +++ b/src/main/java/net/engio/mbassy/PubSubSupport.java @@ -16,36 +16,33 @@ public interface PubSubSupport { /** * Subscribe all handlers of the given listener. Any listener is only subscribed once * subsequent subscriptions of an already subscribed listener will be silently ignored - * - * @param listener */ void subscribe(Object listener); /** - * Immediately remove all registered message handlers (if any) of the given listener. When this call returns all handlers - * have effectively been removed and will not receive any messages (provided that running publications (iterators) in other threads + * Immediately remove all registered message handlers (if any) of the given listener. + * When this call returns all handlers have effectively been removed and will not + * receive any messages (provided that running publications (iterators) in other threads * have not yet obtained a reference to the listener) *

* A call to this method passing any object that is not subscribed will not have any effect and is silently ignored. - * - * @param listener - * @return true, if the listener was found and successfully removed - * false otherwise */ - boolean unsubscribe(Object listener); + void unsubscribe(Object listener); /** - * Synchronously publish a message to all registered listeners. This includes listeners defined for super types of the - * given message type, provided they are not configured to reject valid subtypes. The call returns when all matching handlers - * of all registered listeners have been notified (invoked) of the message. + * Synchronously publish a message to all registered listeners. This includes listeners + * defined for super types of the given message type, provided they are not configured + * to reject valid subtypes. The call returns when all matching handlers of all registered + * listeners have been notified (invoked) of the message. */ void publish(Object message); /** - * Synchronously publish TWO messages to all registered listeners (that match the signature). This includes listeners - * defined for super types of the given message type, provided they are not configured to reject valid subtypes. The call - * returns when all matching handlers of all registered listeners have been notified (invoked) of the message. + * Synchronously publish TWO messages to all registered listeners (that match the signature). This + * includes listeners defined for super types of the given message type, provided they are not configured + * to reject valid subtypes. The call returns when all matching handlers of all registered listeners have + * been notified (invoked) of the message. */ void publish(Object message1, Object message2); diff --git a/src/main/java/net/engio/mbassy/common/AbstractConcurrentSet.java b/src/main/java/net/engio/mbassy/common/AbstractConcurrentSet.java index 021874f..c2fb037 100644 --- a/src/main/java/net/engio/mbassy/common/AbstractConcurrentSet.java +++ b/src/main/java/net/engio/mbassy/common/AbstractConcurrentSet.java @@ -81,12 +81,23 @@ public abstract class AbstractConcurrentSet implements IConcurrentSet { } } + /** + * The return on this is DIFFERENT than normal. + * + * @return TRUE if there are no more elements (aka: this set is now empty) + */ @Override public boolean remove(T element) { if (!contains(element)) { // return quickly - return false; + Lock readLock = this.lock.readLock(); + readLock.lock(); + boolean headIsNull = this.head == null; + readLock.unlock(); + + return headIsNull; } else { + boolean wasLastElement = false; Lock writeLock = this.lock.writeLock(); try { writeLock.lock(); @@ -102,10 +113,14 @@ public abstract class AbstractConcurrentSet implements IConcurrentSet { //oldHead.clear(); // optimize for GC not possible because of potentially running iterators } this.entries.remove(element); + + if (this.head == null) { + wasLastElement = true; + } } finally { writeLock.unlock(); } - return true; + return wasLastElement; } } diff --git a/src/main/java/net/engio/mbassy/common/IConcurrentSet.java b/src/main/java/net/engio/mbassy/common/IConcurrentSet.java index 8afd72a..6b99afc 100644 --- a/src/main/java/net/engio/mbassy/common/IConcurrentSet.java +++ b/src/main/java/net/engio/mbassy/common/IConcurrentSet.java @@ -16,5 +16,10 @@ public interface IConcurrentSet extends Iterable { void addAll(Iterable elements); + /** + * The return on this is DIFFERENT than normal. + * + * @return TRUE if there are no more elements (aka: this set is now empty) + */ boolean remove(T element); } diff --git a/src/main/java/net/engio/mbassy/common/ObjectTree.java b/src/main/java/net/engio/mbassy/common/ObjectTree.java index e362257..685ccc6 100644 --- a/src/main/java/net/engio/mbassy/common/ObjectTree.java +++ b/src/main/java/net/engio/mbassy/common/ObjectTree.java @@ -41,6 +41,166 @@ public class ObjectTree { WRITE.unlock(); } + public void removeValue() { + Lock WRITE = this.lock.writeLock(); + WRITE.lock(); // upgrade to the write lock, at this point blocks other readers + + this.value = null; + + WRITE.unlock(); + } + + /** + * Removes a branch from the tree, and cleans up, if necessary + */ + public void remove(KEY key) { + if (key == null) { + removeLeaf(key); + } + } + + /** + * Removes a branch from the tree, and cleans up, if necessary + */ + public void remove(KEY key1, KEY key2) { + if (key1 == null || key2 == null) { + return; + } + + Lock UPDATE = this.lock.updateLock(); + UPDATE.lock(); // allows other readers, blocks others from acquiring update or write locks + + ObjectTree leaf = null; + if (this.children != null) { + leaf = this.children.get(key1); + + if (leaf != null) { + // promote to writelock and try again - Concurrency in Practice,16.4.2, last sentence on page. Careful for stale state + Lock WRITE = this.lock.writeLock(); + WRITE.lock(); // upgrade to the write lock, at this point blocks other readers + + leaf.removeLeaf(key2); + this.children.remove(key1); + + if (this.children.isEmpty()) { + this.children = null; + } + WRITE.unlock(); + } + } + + UPDATE.unlock(); + } + + /** + * Removes a branch from the tree, and cleans up, if necessary + */ + public void remove(KEY key1, KEY key2, KEY key3) { + if (key1 == null || key2 == null) { + return; + } + + Lock UPDATE = this.lock.updateLock(); + UPDATE.lock(); // allows other readers, blocks others from acquiring update or write locks + + ObjectTree leaf = null; + if (this.children != null) { + leaf = this.children.get(key1); + + if (leaf != null) { + // promote to writelock and try again - Concurrency in Practice,16.4.2, last sentence on page. Careful for stale state + Lock WRITE = this.lock.writeLock(); + WRITE.lock(); // upgrade to the write lock, at this point blocks other readers + + leaf.remove(key2, key3); + this.children.remove(key1); + + if (this.children.isEmpty()) { + this.children = null; + } + WRITE.unlock(); + } + } + + UPDATE.unlock(); + } + + + /** + * Removes a branch from the tree, and cleans up, if necessary + */ + @SuppressWarnings("unchecked") + public void remove(KEY... keys) { + if (keys == null) { + return; + } + + removeLeaf(0, keys); + } + + /** + * Removes a branch from the tree, and cleans up, if necessary + */ + private final void removeLeaf(KEY key) { + if (key != null) { + // promote to writelock and try again - Concurrency in Practice,16.4.2, last sentence on page. Careful for stale state + Lock WRITE = this.lock.writeLock(); + WRITE.lock(); // upgrade to the write lock, at this point blocks other readers + + if (this.children != null) { + ObjectTree leaf = this.children.get(key); + if (leaf != null) { + if (leaf.children == null && leaf.value == null) { + this.children.remove(key); + } + + if (this.children.isEmpty()) { + this.children = null; + } + } + } + WRITE.unlock(); + } + } + + // keys CANNOT be null here! + private final void removeLeaf(int index, KEY[] keys) { + Lock UPDATE = this.lock.updateLock(); + UPDATE.lock(); // allows other readers, blocks others from acquiring update or write locks + + if (index == keys.length) { + // promote to writelock and try again - Concurrency in Practice,16.4.2, last sentence on page. Careful for stale state + Lock WRITE = this.lock.writeLock(); + WRITE.lock(); // upgrade to the write lock, at this point blocks other readers + + // we have reached the leaf to remove! + this.value = null; + this.children = null; + + WRITE.unlock(); + } else if (this.children != null) { + // promote to writelock and try again - Concurrency in Practice,16.4.2, last sentence on page. Careful for stale state + Lock WRITE = this.lock.writeLock(); + WRITE.lock(); // upgrade to the write lock, at this point blocks other readers + + if (this.children != null) { + ObjectTree leaf = this.children.get(keys[index]); + if (leaf != null) { + leaf.removeLeaf(index+1, keys); + if (leaf.children == null && leaf.value == null) { + this.children.remove(keys[index]); + } + + if (this.children.isEmpty()) { + this.children = null; + } + } + } + WRITE.unlock(); + } + + UPDATE.unlock(); + } /** * BACKWARDS, because our signature must allow for N keys... @@ -101,7 +261,7 @@ public class ObjectTree { * BACKWARDS, because our signature must allow for N keys... */ @SuppressWarnings("unchecked") - public ObjectTree createLeaves(KEY... keys) { + public ObjectTree createLeaf(KEY... keys) { if (keys == null) { return this; } @@ -210,7 +370,7 @@ public class ObjectTree { return returnValue; } - public VALUE get(KEY key1, KEY key2) { + public VALUE getValue(KEY key1, KEY key2) { ObjectTree tree = null; // get value from our children tree = getLeaf(key1); @@ -230,7 +390,7 @@ public class ObjectTree { return returnValue; } - public VALUE get(KEY key1, KEY key2, KEY key3) { + public VALUE getValue(KEY key1, KEY key2, KEY key3) { ObjectTree tree = null; // get value from our children tree = getLeaf(key1); @@ -254,7 +414,7 @@ public class ObjectTree { } @SuppressWarnings("unchecked") - public VALUE get(KEY... keys) { + public VALUE getValue(KEY... keys) { ObjectTree tree = null; // get value from our children tree = getLeaf(keys[0]); @@ -300,4 +460,64 @@ public class ObjectTree { return tree; } + + public final ObjectTree getLeaf(KEY key1, KEY key2) { + ObjectTree tree = null; + // get value from our children + tree = getLeaf(key1); + if (tree != null) { + tree = tree.getLeaf(key2); + } + + if (tree == null) { + return null; + } + + return tree; + } + + public final ObjectTree getLeaf(KEY key1, KEY key2, KEY key3) { + ObjectTree tree = null; + // get value from our children + tree = getLeaf(key1); + if (tree != null) { + tree = tree.getLeaf(key2); + } + if (tree != null) { + tree = tree.getLeaf(key3); + } + + if (tree == null) { + return null; + } + + return tree; + } + + @SuppressWarnings("unchecked") + public final ObjectTree getLeaf(KEY... keys) { + int size = keys.length; + + if (size == 0) { + return null; + } + + ObjectTree tree = null; + // get value from our children + tree = getLeaf(keys[0]); + + for (int i=1;i getMethods(Class target) { - List methods = new LinkedList(); + public static Collection getMethods(Class target) { + Collection methods = new ArrayDeque(); try { for (Method method : target.getDeclaredMethods()) { if (getAnnotation(method, Handler.class) != null) { @@ -85,7 +85,7 @@ public class ReflectionUtils } // - public static boolean containsOverridingMethod(final List allMethods, final Method methodToCheck) { + public static boolean containsOverridingMethod(final Collection allMethods, final Method methodToCheck) { for (Method method : allMethods) { if (isOverriddenBy(methodToCheck, method)) { return true; diff --git a/src/main/java/net/engio/mbassy/listener/MessageHandler.java b/src/main/java/net/engio/mbassy/listener/MessageHandler.java index 9637d29..dbe4af3 100644 --- a/src/main/java/net/engio/mbassy/listener/MessageHandler.java +++ b/src/main/java/net/engio/mbassy/listener/MessageHandler.java @@ -2,6 +2,7 @@ package net.engio.mbassy.listener; import java.lang.annotation.Annotation; import java.lang.reflect.Method; +import java.util.Arrays; import net.engio.mbassy.annotations.Handler; import net.engio.mbassy.annotations.Synchronized; @@ -34,7 +35,7 @@ public class MessageHandler { private final MessageListener listenerConfig; // if ONE of the handled messages is of type array, then we configure it to ALSO accept var args! - private final boolean acceptsVarArg; + private final boolean isVarArg; private final boolean isSynchronized; public MessageHandler(Method handler, Handler handlerConfig, MessageListener listenerMetadata){ @@ -55,7 +56,7 @@ public class MessageHandler { // if ONE of the handled messages is of type array, then we configure it to ALSO accept var args! - this.acceptsVarArg = handledMessages.length == 1 && handledMessages[0].isArray(); + this.isVarArg = handledMessages.length == 1 && handledMessages[0].isArray(); } public A getAnnotation(Class annotationType){ @@ -82,6 +83,10 @@ public class MessageHandler { * @author dorkbox, llc * Date: 2/2/15 */ + /** Check if this handler permits sending objects as a VarArg (variable argument) */ + public boolean isVarArg() { + return this.isVarArg; + } /** * @return true if the message types are handled @@ -279,8 +284,49 @@ public class MessageHandler { } } - /** Check if this handler permits sending objects as a VarArg (variable argument) */ - public boolean isVarArg() { - return this.acceptsVarArg; + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + (this.acceptsSubtypes ? 1231 : 1237); + result = prime * result + (this.isVarArg ? 1231 : 1237); + result = prime * result + Arrays.hashCode(this.handledMessages); + result = prime * result + (this.handler == null ? 0 : this.handler.hashCode()); + result = prime * result + (this.isSynchronized ? 1231 : 1237); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + MessageHandler other = (MessageHandler) obj; + if (this.acceptsSubtypes != other.acceptsSubtypes) { + return false; + } + if (this.isVarArg != other.isVarArg) { + return false; + } + if (!Arrays.equals(this.handledMessages, other.handledMessages)) { + return false; + } + if (this.handler == null) { + if (other.handler != null) { + return false; + } + } else if (!this.handler.equals(other.handler)) { + return false; + } + if (this.isSynchronized != other.isSynchronized) { + return false; + } + return true; } } diff --git a/src/main/java/net/engio/mbassy/listener/MetadataReader.java b/src/main/java/net/engio/mbassy/listener/MetadataReader.java index f570e16..10936a8 100644 --- a/src/main/java/net/engio/mbassy/listener/MetadataReader.java +++ b/src/main/java/net/engio/mbassy/listener/MetadataReader.java @@ -1,8 +1,8 @@ package net.engio.mbassy.listener; import java.lang.reflect.Method; -import java.util.LinkedList; -import java.util.List; +import java.util.ArrayDeque; +import java.util.Collection; import net.engio.mbassy.annotations.Handler; import net.engio.mbassy.common.ReflectionUtils; @@ -20,10 +20,10 @@ public class MetadataReader { public MessageListener getMessageListener(Class target) { // get all handlers (this will include all (inherited) methods directly annotated using @Handler) - List allHandlers = ReflectionUtils.getMethods(target); + Collection allHandlers = ReflectionUtils.getMethods(target); // retain only those that are at the bottom of their respective class hierarchy (deepest overriding method) - List bottomMostHandlers = new LinkedList(); + Collection bottomMostHandlers = new ArrayDeque(); for (Method handler : allHandlers) { if (!ReflectionUtils.containsOverridingMethod(allHandlers, handler)) { bottomMostHandlers.add(handler); diff --git a/src/main/java/net/engio/mbassy/subscription/Subscription.java b/src/main/java/net/engio/mbassy/subscription/Subscription.java index d15dd0b..91e3e32 100644 --- a/src/main/java/net/engio/mbassy/subscription/Subscription.java +++ b/src/main/java/net/engio/mbassy/subscription/Subscription.java @@ -5,7 +5,10 @@ import java.lang.reflect.Method; import java.util.Arrays; import net.engio.mbassy.common.IConcurrentSet; +import net.engio.mbassy.common.WeakConcurrentSet; import net.engio.mbassy.dispatch.IHandlerInvocation; +import net.engio.mbassy.dispatch.ReflectiveHandlerInvocation; +import net.engio.mbassy.dispatch.SynchronizedHandlerInvocation; import net.engio.mbassy.error.ErrorHandlingSupport; import net.engio.mbassy.error.PublicationError; import net.engio.mbassy.listener.MessageHandler; @@ -33,10 +36,16 @@ public class Subscription { private final IHandlerInvocation invocation; protected final IConcurrentSet listeners; - Subscription(MessageHandler handler, IHandlerInvocation invocation, IConcurrentSet listeners) { + Subscription(MessageHandler handler) { + this.listeners = new WeakConcurrentSet(); this.handlerMetadata = handler; + + IHandlerInvocation invocation = new ReflectiveHandlerInvocation(); + if (handler.isSynchronized()){ + invocation = new SynchronizedHandlerInvocation(invocation); + } + this.invocation = invocation; - this.listeners = listeners; } /** @@ -95,12 +104,16 @@ public class Subscription { return this.handlerMetadata.getHandledMessages(); } - public void subscribe(Object o) { - this.listeners.add(o); + public void subscribe(Object listener) { + this.listeners.add(listener); } + /** + * @return TRUE if there are no listeners subscribed + */ public boolean unsubscribe(Object existingListener) { + // TRUE if there are no more elements (aka: this set is empty) return this.listeners.remove(existingListener); } @@ -337,4 +350,34 @@ public class Subscription { } } } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + (this.handlerMetadata == null ? 0 : this.handlerMetadata.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + Subscription other = (Subscription) obj; + if (this.handlerMetadata == null) { + if (other.handlerMetadata != null) { + return false; + } + } else if (!this.handlerMetadata.equals(other.handlerMetadata)) { + return false; + } + return true; + } } diff --git a/src/main/java/net/engio/mbassy/subscription/SubscriptionManager.java b/src/main/java/net/engio/mbassy/subscription/SubscriptionManager.java index 903f293..3dabf24 100644 --- a/src/main/java/net/engio/mbassy/subscription/SubscriptionManager.java +++ b/src/main/java/net/engio/mbassy/subscription/SubscriptionManager.java @@ -1,26 +1,23 @@ package net.engio.mbassy.subscription; import java.lang.reflect.Array; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.ReentrantReadWriteLock; import net.engio.mbassy.common.ObjectTree; import net.engio.mbassy.common.ReflectionUtils; -import net.engio.mbassy.common.WeakConcurrentSet; -import net.engio.mbassy.dispatch.IHandlerInvocation; -import net.engio.mbassy.dispatch.ReflectiveHandlerInvocation; -import net.engio.mbassy.dispatch.SynchronizedHandlerInvocation; import net.engio.mbassy.error.MessageBusException; import net.engio.mbassy.listener.MessageHandler; import net.engio.mbassy.listener.MetadataReader; +import com.googlecode.concurentlocks.ReentrantReadWriteUpdateLock; + /** * The subscription managers responsibility is to consistently handle and synchronize the message listener subscription process. * It provides fast lookup of existing subscriptions when another instance of an already known @@ -55,40 +52,93 @@ public class SubscriptionManager { private final ConcurrentHashMap, Object> nonListeners = new ConcurrentHashMap, Object>(); // synchronize read/write acces to the subscription maps - private final ReentrantReadWriteLock LOCK = new ReentrantReadWriteLock(); + private final ReentrantReadWriteUpdateLock LOCK = new ReentrantReadWriteUpdateLock(); public SubscriptionManager() { } - public boolean unsubscribe(Object listener) { + public void unsubscribe(Object listener) { if (listener == null) { - return false; + return; } - Collection subscriptions = getSubscriptionsByListener(listener); - if (subscriptions == null) { - return false; - } - boolean isRemoved = true; - for (Subscription subscription : subscriptions) { - isRemoved &= subscription.unsubscribe(listener); - } - return isRemoved; - } - - private Collection getSubscriptionsByListener(Object listener) { + boolean nothingLeft = true; Collection subscriptions; try { - this.LOCK.readLock().lock(); - subscriptions = this.subscriptionsPerListener.get(listener.getClass()); + this.LOCK.writeLock().lock(); + Class listenerClass = listener.getClass(); + subscriptions = this.subscriptionsPerListener.get(listenerClass); + + if (subscriptions != null) { + for (Subscription subscription : subscriptions) { + boolean isEmpty = subscription.unsubscribe(listener); + + if (isEmpty) { + // single or multi? + Class[] handledMessageTypes = subscription.getHandledMessageTypes(); + int size = handledMessageTypes.length; + if (size == 1) { + // single + Class clazz = handledMessageTypes[0]; + + // NOTE: Not thread-safe! must be synchronized in outer scope + Collection subs = this.subscriptionsPerMessageSingle.get(clazz); + if (subs != null) { + subs.remove(subscription); + + if (subs.isEmpty()) { + // remove element + this.subscriptionsPerMessageSingle.remove(clazz); + } + } + } else { + // multi (is thread safe) + ObjectTree, Collection> tree; + + switch (size) { + case 2: tree = this.subscriptionsPerMessageMulti.getLeaf(handledMessageTypes[0], handledMessageTypes[1]); break; + case 3: tree = this.subscriptionsPerMessageMulti.getLeaf(handledMessageTypes[1], handledMessageTypes[1], handledMessageTypes[2]); break; + default: tree = this.subscriptionsPerMessageMulti.getLeaf(handledMessageTypes); break; + } + + if (tree != null) { + Collection subs = tree.getValue(); + if (subs != null) { + subs.remove(subscription); + + if (subs.isEmpty()) { + // remove tree element + switch (size) { + case 2: this.subscriptionsPerMessageMulti.remove(handledMessageTypes[0], handledMessageTypes[1]); break; + case 3: this.subscriptionsPerMessageMulti.remove(handledMessageTypes[1], handledMessageTypes[1], handledMessageTypes[2]); break; + default: this.subscriptionsPerMessageMulti.remove(handledMessageTypes); break; + } + } + } + } + } + } + + nothingLeft &= isEmpty; + } + } + + if (nothingLeft) { + // now we have to clean up + this.subscriptionsPerListener.remove(listenerClass); + } + } finally { - this.LOCK.readLock().unlock(); + this.LOCK.writeLock().unlock(); } - return subscriptions; + + return; } + + // when a class is subscribed, the registrations for that class are permanent in the "subscriptionsPerListener"? public void subscribe(Object listener) { try { Class listenerClass = listener.getClass(); @@ -98,45 +148,97 @@ public class SubscriptionManager { return; } - Collection subscriptionsByListener = getSubscriptionsByListener(listener); - // a listener is either subscribed for the first time - if (subscriptionsByListener == null) { - List messageHandlers = this.metadataReader.getMessageListener(listenerClass).getHandlers(); - if (messageHandlers.isEmpty()) { - // remember the class as non listening class if no handlers are found - this.nonListeners.put(listenerClass, this.nonListeners); - return; - } + Collection subscriptions; + try { + this.LOCK.updateLock().lock(); + boolean hasSubs = false; + subscriptions = this.subscriptionsPerListener.get(listenerClass); - // it's safe to use non-concurrent collection here (read only) - subscriptionsByListener = new ArrayList(messageHandlers.size()); - - // create subscriptions for all detected message handlers - for (MessageHandler messageHandler : messageHandlers) { - // create the subscription + if (subscriptions != null) { + hasSubs = true; + } else { + // a listener is either subscribed for the first time try { - IHandlerInvocation invocation = new ReflectiveHandlerInvocation(); + this.LOCK.writeLock().lock(); // upgrade updatelock to write lock, Avoid DCL - if (messageHandler.isSynchronized()){ - invocation = new SynchronizedHandlerInvocation(invocation); + subscriptions = this.subscriptionsPerListener.get(listenerClass); + + if (subscriptions != null) { + hasSubs = true; + } else { + // a listener is either subscribed for the first time + List messageHandlers = this.metadataReader.getMessageListener(listenerClass).getHandlers(); + if (messageHandlers.isEmpty()) { + // remember the class as non listening class if no handlers are found + this.nonListeners.put(listenerClass, this.nonListeners); + return; + } + + // it's SAFE to use non-concurrent collection here (read only). Same thread LOCKS on this with a write lock + subscriptions = new ArrayList(messageHandlers.size()); + + // create subscriptions for all detected message handlers + for (MessageHandler messageHandler : messageHandlers) { + // create the subscription + try { + Subscription subscription = new Subscription(messageHandler); + subscriptions.add(subscription); + } catch (Exception e) { + throw new MessageBusException(e); + } + } + + for (Subscription sub : subscriptions) { + sub.subscribe(listener); + + // single or multi? + Class[] handledMessageTypes = sub.getHandledMessageTypes(); + int size = handledMessageTypes.length; + if (size == 1) { + // single + Class clazz = handledMessageTypes[0]; + + // NOTE: Not thread-safe! must be synchronized in outer scope + Collection subs = this.subscriptionsPerMessageSingle.get(clazz); + if (subs == null) { + subs = new ArrayDeque(); + this.subscriptionsPerMessageSingle.put(clazz, subs); + } + subs.add(sub); + } else { + // multi (is thread safe) + ObjectTree, Collection> tree; + + switch (size) { + case 2: tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes[0], handledMessageTypes[1]); break; + case 3: tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes[1], handledMessageTypes[1], handledMessageTypes[2]); break; + default: tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes); break; + } + + Collection subs = tree.getValue(); + if (subs == null) { + subs = new ArrayDeque(); + tree.putValue(subs); + } + subs.add(sub); + } + } + + this.subscriptionsPerListener.put(listenerClass, subscriptions); } - - Subscription subscription = new Subscription(messageHandler, invocation, new WeakConcurrentSet()); - subscriptionsByListener.add(subscription); - } catch (Exception e) { - throw new MessageBusException(e); + } finally { + this.LOCK.writeLock().unlock(); } } - // this will acquire a write lock and handle the case when another thread already subscribed - // this particular listener in the mean-time - subscribe(listener, subscriptionsByListener); - } - else { - // or the subscriptions already exist and must only be updated - for (Subscription sub : subscriptionsByListener) { - sub.subscribe(listener); + if (hasSubs) { + // or the subscriptions already exist and must only be updated + for (Subscription sub : subscriptions) { + sub.subscribe(listener); + } } + } finally { + this.LOCK.updateLock().unlock(); } } catch (Exception e) { throw new RuntimeException(e); @@ -144,88 +246,29 @@ public class SubscriptionManager { } - private void subscribe(Object listener, Collection subscriptions) { - try { - this.LOCK.writeLock().lock(); - // basically this is a deferred double check - // it's an ugly pattern but necessary because atomic upgrade from read to write lock - // is not possible - // the alternative of using a write lock from the beginning would decrease performance dramatically - // because of the huge number of reads compared to writes - Collection subscriptionsByListener = getSubscriptionsByListener(listener); - - if (subscriptionsByListener == null) { - for (Subscription subscription : subscriptions) { - subscription.subscribe(listener); - - // single or multi? - Class[] handledMessageTypes = subscription.getHandledMessageTypes(); - int size = handledMessageTypes.length; - if (size == 1) { - // single - Class clazz = handledMessageTypes[0]; - - // NOTE: Not thread-safe! must be synchronized in outer scope - Collection subs = this.subscriptionsPerMessageSingle.get(clazz); - if (subs == null) { - subs = new LinkedList(); - this.subscriptionsPerMessageSingle.put(clazz, subs); - } - subs.add(subscription); - } else { - // multi (is thread safe) - ObjectTree, Collection> tree = this.subscriptionsPerMessageMulti.createLeaves(handledMessageTypes); - Collection subs = tree.getValue(); - if (subs == null) { - subs = new LinkedList(); - tree.putValue(subs); - } - subs.add(subscription); - } - } - this.subscriptionsPerListener.put(listener.getClass(), subscriptions); - } - // the rare case when multiple threads concurrently subscribed the same class for the first time - // one will be first, all others will have to subscribe to the existing instead the generated subscriptions - else { - for (Subscription existingSubscription : subscriptionsByListener) { - existingSubscription.subscribe(listener); - } - } - } finally { - this.LOCK.writeLock().unlock(); - } - } - - - - - - - - - - // obtain the set of subscriptions for the given message type // Note: never returns null! public Collection getSubscriptionsByMessageType(Class messageType) { // thread safe publication - Collection subscriptions = new LinkedList(); + Collection subscriptions = new ArrayDeque(); try { this.LOCK.readLock().lock(); - Set> types1 = ReflectionUtils.getSuperTypes(messageType); - types1.add(messageType); + Collection subs = this.subscriptionsPerMessageSingle.get(messageType); + if (subs != null) { + subscriptions.addAll(subs); + } // also add all subscriptions that match super types + Set> types1 = ReflectionUtils.getSuperTypes(messageType); for (Class eventSuperType : types1) { - Collection subs = this.subscriptionsPerMessageSingle.get(eventSuperType); + subs = this.subscriptionsPerMessageSingle.get(eventSuperType); if (subs != null) { for (Subscription sub : subs) { - if (sub.handlesMessageType(eventSuperType)) { + if (sub.handlesMessageType(messageType)) { subscriptions.add(sub); } } @@ -235,14 +278,15 @@ public class SubscriptionManager { /////////////// // a var-arg handler might match /////////////// - // tricky part. We have to check the ARRAY version, + // tricky part. We have to check the ARRAY version + types1.add(messageType); for (Class eventSuperType : types1) { // messy, but the ONLY way to do it. // NOTE: this will NEVER be an array to begin with, since that will call a DIFFERENT method eventSuperType = Array.newInstance(eventSuperType, 1).getClass(); // also add all subscriptions that match super types - Collection subs = this.subscriptionsPerMessageSingle.get(eventSuperType); + subs = this.subscriptionsPerMessageSingle.get(eventSuperType); if (subs != null) { for (Subscription sub : subs) { subscriptions.add(sub); @@ -260,17 +304,20 @@ public class SubscriptionManager { // Note: never returns null! public Collection getSubscriptionsByMessageType(Class messageType1, Class messageType2) { // thread safe publication - Collection subscriptions = new LinkedList(); + Collection subscriptions = new ArrayDeque(); try { this.LOCK.readLock().lock(); + Collection subs = this.subscriptionsPerMessageMulti.getValue(messageType1, messageType2); + if (subs != null) { + subscriptions.addAll(subs); + } + + + // also add all subscriptions that match super types Set> types1 = ReflectionUtils.getSuperTypes(messageType1); - types1.add(messageType1); - Set> types2 = ReflectionUtils.getSuperTypes(messageType2); - types2.add(messageType2); - // also add all subscriptions that match super types for (Class eventSuperType1 : types1) { ObjectTree, Collection> leaf1 = this.subscriptionsPerMessageMulti.getLeaf(eventSuperType1); @@ -280,10 +327,10 @@ public class SubscriptionManager { ObjectTree, Collection> leaf2 = leaf1.getLeaf(eventSuperType2); if (leaf2 != null) { - Collection subs = leaf2.getValue(); + subs = leaf2.getValue(); if (subs != null) { for (Subscription sub : subs) { - if (sub.handlesMessageType(eventSuperType1, eventSuperType2)) { + if (sub.handlesMessageType(messageType1, messageType2)) { subscriptions.add(sub); } } @@ -297,6 +344,8 @@ public class SubscriptionManager { // if they are ALL the same type, a var-arg handler might match /////////////// if (messageType1 == messageType2) { + types1.add(messageType1); + types1.add(messageType2); // tricky part. We have to check the ARRAY version for (Class eventSuperType : types1) { // messy, but the ONLY way to do it. @@ -304,7 +353,7 @@ public class SubscriptionManager { eventSuperType = Array.newInstance(eventSuperType, 1).getClass(); // also add all subscriptions that match super types - Collection subs = this.subscriptionsPerMessageSingle.get(eventSuperType); + subs = this.subscriptionsPerMessageSingle.get(eventSuperType); if (subs != null) { for (Subscription sub : subs) { subscriptions.add(sub); @@ -324,19 +373,21 @@ public class SubscriptionManager { // Note: never returns null! public Collection getSubscriptionsByMessageType(Class messageType1, Class messageType2, Class messageType3) { // thread safe publication - Collection subscriptions = new LinkedList(); + Collection subscriptions = new ArrayDeque(); try { this.LOCK.readLock().lock(); + Collection subs = this.subscriptionsPerMessageMulti.getValue(messageType1, messageType2, messageType3); + if (subs != null) { + subscriptions.addAll(subs); + } + + + // also add all subscriptions that match super types Set> types1 = ReflectionUtils.getSuperTypes(messageType1); - types1.add(messageType1); - Set> types2 = ReflectionUtils.getSuperTypes(messageType2); - types2.add(messageType2); - Set> types3 = ReflectionUtils.getSuperTypes(messageType3); - types3.add(messageType3); // also add all subscriptions that match super types for (Class eventSuperType1 : types1) { @@ -351,10 +402,10 @@ public class SubscriptionManager { ObjectTree, Collection> leaf3 = leaf2.getLeaf(eventSuperType3); if (leaf3 != null) { - Collection subs = leaf3.getValue(); + subs = leaf3.getValue(); if (subs != null) { for (Subscription sub : subs) { - if (sub.handlesMessageType(eventSuperType1, eventSuperType2, eventSuperType3)) { + if (sub.handlesMessageType(messageType1, messageType2, messageType3)) { subscriptions.add(sub); } } @@ -371,13 +422,16 @@ public class SubscriptionManager { /////////////// if (messageType1 == messageType2 && messageType2 == messageType3) { // tricky part. We have to check the ARRAY version + types1.add(messageType1); + types1.add(messageType2); + types1.add(messageType3); for (Class eventSuperType : types1) { // messy, but the ONLY way to do it. // NOTE: this will NEVER be an array to begin with, since that will call a DIFFERENT method eventSuperType = Array.newInstance(eventSuperType, 1).getClass(); // also add all subscriptions that match super types - Collection subs = this.subscriptionsPerMessageSingle.get(eventSuperType); + subs = this.subscriptionsPerMessageSingle.get(eventSuperType); if (subs != null) { for (Subscription sub : subs) { subscriptions.add(sub); @@ -396,56 +450,64 @@ public class SubscriptionManager { // Note: never returns null! public Collection getSubscriptionsByMessageType(Class... messageTypes) { // thread safe publication - Collection subscriptions = new LinkedList(); + Collection subscriptions = new ArrayDeque(); try { this.LOCK.readLock().lock(); - int size = messageTypes.length; - if (size == 0) { - this.LOCK.readLock().unlock(); - return subscriptions; - } - - boolean allSameType = true; - Class firstType = messageTypes[0]; - - @SuppressWarnings("unchecked") - Set>[] types = new Set[size]; - for (int i=0;i from = messageTypes[i]; - types[i] = ReflectionUtils.getSuperTypes(from); - types[i].add(from); - if (from != firstType) { - allSameType = false; + Collection subs = this.subscriptionsPerMessageMulti.getValue(messageTypes); + if (subs != null) { + for (Subscription sub : subs) { + if (sub.handlesMessageType(messageTypes)) { + subscriptions.add(sub); + } } } - // add all subscriptions that match super types combinations - // have to use recursion for this. BLEH - getSubsVarArg(subscriptions, types, size-1, 0, this.subscriptionsPerMessageMulti, messageTypes); + int size = messageTypes.length; + if (size > 0) { + boolean allSameType = true; + Class firstType = messageTypes[0]; - /////////////// - // if they are ALL the same type, a var-arg handler might match - /////////////// - if (allSameType) { - // do we have a var-arg (it shows as an array) subscribed? + @SuppressWarnings("unchecked") + Set>[] types = new Set[size]; + for (int i=0;i from = messageTypes[i]; + types[i] = ReflectionUtils.getSuperTypes(from); + types[i].add(from); + if (from != firstType) { + allSameType = false; + } + } - // tricky part. We have to check the ARRAY version - for (Class eventSuperType : types[0]) { - // messy, but the ONLY way to do it. - // NOTE: this will NEVER be an array to begin with, since that will call a DIFFERENT method - eventSuperType = Array.newInstance(eventSuperType, 1).getClass(); - // also add all subscriptions that match super types - Collection subs = this.subscriptionsPerMessageSingle.get(eventSuperType); - if (subs != null) { - for (Subscription sub : subs) { - subscriptions.add(sub); + // add all subscriptions that match super types combinations + // have to use recursion for this. BLEH + getSubsVarArg(subscriptions, types, size-1, 0, this.subscriptionsPerMessageMulti, messageTypes); + + /////////////// + // if they are ALL the same type, a var-arg handler might match + /////////////// + if (allSameType) { + // do we have a var-arg (it shows as an array) subscribed? + + // tricky part. We have to check the ARRAY version + for (Class eventSuperType : types[0]) { + // messy, but the ONLY way to do it. + // NOTE: this will NEVER be an array to begin with, since that will call a DIFFERENT method + eventSuperType = Array.newInstance(eventSuperType, 1).getClass(); + + // also add all subscriptions that match super types + subs = this.subscriptionsPerMessageSingle.get(eventSuperType); + if (subs != null) { + for (Subscription sub : subs) { + subscriptions.add(sub); + } } } } + } } finally { this.LOCK.readLock().unlock(); diff --git a/src/test/java/net/engio/mbassy/AllTests.java b/src/test/java/net/engio/mbassy/AllTests.java index 974da92..7bc3773 100644 --- a/src/test/java/net/engio/mbassy/AllTests.java +++ b/src/test/java/net/engio/mbassy/AllTests.java @@ -13,7 +13,6 @@ import org.junit.runners.Suite; @Suite.SuiteClasses({ WeakConcurrentSetTest.class, MBassadorTest.class, - SyncBusTest.MBassadorTest.class, MetadataReaderTest.class, MethodDispatchTest.class, DeadMessageTest.class, diff --git a/src/test/java/net/engio/mbassy/AsyncFIFOBusTest.java b/src/test/java/net/engio/mbassy/AsyncFIFOBusTest.java index 40aae59..7c25442 100644 --- a/src/test/java/net/engio/mbassy/AsyncFIFOBusTest.java +++ b/src/test/java/net/engio/mbassy/AsyncFIFOBusTest.java @@ -47,7 +47,7 @@ public class AsyncFIFOBusTest extends MessageBusTest { assertEquals(messages[i], listener.receivedSync.get(i)); } } - + fifoBUs.shutdown(); } @Test @@ -76,7 +76,7 @@ public class AsyncFIFOBusTest extends MessageBusTest { pause(2000); } - for(Listener listener : listeners) { + for (Listener listener : listeners) { List receivedSync = listener.receivedSync; synchronized (receivedSync) { @@ -88,6 +88,7 @@ public class AsyncFIFOBusTest extends MessageBusTest { } } + fifoBUs.shutdown(); } public static class Listener { diff --git a/src/test/java/net/engio/mbassy/DeadMessageTest.java b/src/test/java/net/engio/mbassy/DeadMessageTest.java index cac3e92..fb37c98 100644 --- a/src/test/java/net/engio/mbassy/DeadMessageTest.java +++ b/src/test/java/net/engio/mbassy/DeadMessageTest.java @@ -22,10 +22,12 @@ import org.junit.Test; */ public class DeadMessageTest extends MessageBusTest{ + private static final AtomicInteger deadMessages = new AtomicInteger(0); + @Override @Before public void beforeTest(){ - DeadMessagHandler.deadMessages.set(0); + deadMessages.set(0); } @@ -60,7 +62,7 @@ public class DeadMessageTest extends MessageBusTest{ ConcurrentExecutor.runConcurrent(publishUnhandledMessage, ConcurrentUnits); - assertEquals(InstancesPerListener * IterationsPerThread * ConcurrentUnits, DeadMessagHandler.deadMessages.get()); + assertEquals(InstancesPerListener * IterationsPerThread * ConcurrentUnits, deadMessages.get()); } @@ -69,27 +71,28 @@ public class DeadMessageTest extends MessageBusTest{ public void testUnsubscribingAllListeners() { final MBassador bus = createBus(); ListenerFactory deadMessageListener = new ListenerFactory() - .create(InstancesPerListener, DeadMessagHandler.class) - .create(InstancesPerListener, Object.class); + .create(InstancesPerListener, DeadMessagHandler.class); + ListenerFactory objectListener = new ListenerFactory() .create(InstancesPerListener, ObjectListener.class); + ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, deadMessageListener), ConcurrentUnits); // Only dead message handlers available bus.publish(new Object()); // The message should be caught as dead message since there are no subscribed listeners - assertEquals(InstancesPerListener, DeadMessagHandler.deadMessages.get()); + assertEquals(InstancesPerListener, deadMessages.get()); // Clear deadmessage for future tests - DeadMessagHandler.deadMessages.set(0); + deadMessages.set(0); // Add object listeners and publish again ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, objectListener), ConcurrentUnits); bus.publish(new Object()); // verify that no dead message events were produced - assertEquals(0, DeadMessagHandler.deadMessages.get()); + assertEquals(0, deadMessages.get()); // Unsubscribe all object listeners ConcurrentExecutor.runConcurrent(TestUtil.unsubscriber(bus, objectListener), ConcurrentUnits); @@ -98,18 +101,14 @@ public class DeadMessageTest extends MessageBusTest{ bus.publish(new Object()); // The message should be caught, as it's the only listener - assertEquals(InstancesPerListener, DeadMessagHandler.deadMessages.get()); + assertEquals(0, deadMessages.get()); } public static class DeadMessagHandler { - - private static final AtomicInteger deadMessages = new AtomicInteger(0); - + @SuppressWarnings("unused") @Handler public void handle(DeadMessage message){ deadMessages.incrementAndGet(); } - } - } diff --git a/src/test/java/net/engio/mbassy/MBassadorTest.java b/src/test/java/net/engio/mbassy/MBassadorTest.java index 4c0d921..5d7b42d 100644 --- a/src/test/java/net/engio/mbassy/MBassadorTest.java +++ b/src/test/java/net/engio/mbassy/MBassadorTest.java @@ -128,6 +128,7 @@ public class MBassadorTest extends MessageBusTest { pause(processingTimeInMS); assertEquals(InstancesPerListener * ConcurrentUnits, exceptionCount.get()); + bus.shutdown(); } diff --git a/src/test/java/net/engio/mbassy/MultiMessageTest.java b/src/test/java/net/engio/mbassy/MultiMessageTest.java index fdc9e8d..f839659 100644 --- a/src/test/java/net/engio/mbassy/MultiMessageTest.java +++ b/src/test/java/net/engio/mbassy/MultiMessageTest.java @@ -22,8 +22,9 @@ public class MultiMessageTest extends MessageBusTest { public void testMultiMessageSending(){ IMessageBus bus = new MBassador().start(); - Listener listener = new Listener(); - bus.subscribe(listener); + Listener listener1 = new Listener(); + bus.subscribe(listener1); + bus.unsubscribe(listener1); bus.publish("s"); bus.publish("s", "s"); @@ -33,7 +34,22 @@ public class MultiMessageTest extends MessageBusTest { bus.publish(1, 2, 3, 4, 5, 6); bus.publish(new Integer[] {1, 2, 3, 4, 5, 6}); - assertEquals(count.get(), 10); + assertEquals(0, count.get()); + + bus.subscribe(listener1); + + bus.publish("s"); + bus.publish("s", "s"); + bus.publish("s", "s", "s"); + bus.publish("s", "s", "s", "s"); + bus.publish(1, 2, "s"); + bus.publish(1, 2, 3, 4, 5, 6); + bus.publish(new Integer[] {1, 2, 3, 4, 5, 6}); + + assertEquals(10, count.get()); + + + bus.shutdown(); } @SuppressWarnings("unused") @@ -47,31 +63,31 @@ public class MultiMessageTest extends MessageBusTest { @Handler public void handleSync(String o1, String o2) { count.getAndIncrement(); - System.err.println("match String, String"); +// System.err.println("match String, String"); } @Handler public void handleSync(String o1, String o2, String o3) { count.getAndIncrement(); - System.err.println("match String, String, String"); +// System.err.println("match String, String, String"); } @Handler public void handleSync(Integer o1, Integer o2, String o3) { count.getAndIncrement(); - System.err.println("match Integer, Integer, String"); +// System.err.println("match Integer, Integer, String"); } @Handler public void handleSync(String... o) { count.getAndIncrement(); - System.err.println("match String[]"); +// System.err.println("match String[]"); } @Handler public void handleSync(Integer... o) { count.getAndIncrement(); - System.err.println("match Integer[]"); +// System.err.println("match Integer[]"); } } } diff --git a/src/test/java/net/engio/mbassy/ObjectTreeTest.java b/src/test/java/net/engio/mbassy/ObjectTreeTest.java index 076d5aa..cd575c5 100644 --- a/src/test/java/net/engio/mbassy/ObjectTreeTest.java +++ b/src/test/java/net/engio/mbassy/ObjectTreeTest.java @@ -29,17 +29,17 @@ public class ObjectTreeTest extends AssertSupport { public void test(ObjectTree, String> tree, String string, Class clazz1, Class clazz2) { tree.put(string, clazz1, clazz2); - assertEquals(string, tree.get(clazz1, clazz2)); + assertEquals(string, tree.getValue(clazz1, clazz2)); } public void test(ObjectTree, String> tree, String string, Class clazz1, Class clazz2, Class clazz3) { tree.put(string, clazz1, clazz2, clazz3); - assertEquals(string, tree.get(clazz1, clazz2, clazz3)); + assertEquals(string, tree.getValue(clazz1, clazz2, clazz3)); } public void test(ObjectTree, String> tree, String string, Class... clazzes) { tree.put(string, clazzes); - assertEquals(string, tree.get(clazzes)); + assertEquals(string, tree.getValue(clazzes)); } @Test @@ -58,5 +58,15 @@ public class ObjectTreeTest extends AssertSupport { test(tree, "ssss", String.class, String.class, String.class, String.class); test(tree, "oosif", Object.class, Object.class, String.class, Integer.class, Float.class); + + + // now make sure we can REMOVE the tree elements + tree.remove(Object.class, Object.class, String.class, Integer.class, Float.class); + ObjectTree, String> leaf = tree.getLeaf(Object.class, Object.class, String.class, Integer.class); + assertNull(leaf); + leaf = tree.getLeaf(Object.class, Object.class); + assertNotNull(leaf); + + assertEquals("xo", leaf.getValue()); } } diff --git a/src/test/java/net/engio/mbassy/SyncBusTest.java b/src/test/java/net/engio/mbassy/SyncBusTest.java index 5558007..b7e55db 100644 --- a/src/test/java/net/engio/mbassy/SyncBusTest.java +++ b/src/test/java/net/engio/mbassy/SyncBusTest.java @@ -24,15 +24,12 @@ import org.junit.Test; * @author bennidi * Date: 2/8/12 */ -public abstract class SyncBusTest extends MessageBusTest { - - - protected abstract IMessageBus getSyncMessageBus(); +public class SyncBusTest extends MessageBusTest { @Test public void testSynchronousMessagePublication() throws Exception { - final IMessageBus bus = getSyncMessageBus(); + final IMessageBus bus = new MBassador().start(); ListenerFactory listeners = new ListenerFactory() .create(InstancesPerListener, IMessageListener.DefaultListener.class) .create(InstancesPerListener, IMessageListener.DisabledListener.class) @@ -69,6 +66,8 @@ public abstract class SyncBusTest extends MessageBusTest { assertEquals(InstancesPerListener * ConcurrentUnits, MessageTypes.Multipart.getTimesHandled(IMessageListener.DefaultListener.class)); assertEquals(InstancesPerListener * ConcurrentUnits, MessageTypes.Simple.getTimesHandled(MessagesListener.DefaultListener.class)); assertEquals(InstancesPerListener * ConcurrentUnits, MessageTypes.Multipart.getTimesHandled(MessagesListener.DefaultListener.class)); + + bus.shutdown(); } @@ -83,7 +82,7 @@ public abstract class SyncBusTest extends MessageBusTest { } }; - final IMessageBus bus = getSyncMessageBus(); + final IMessageBus bus = new MBassador().start(); bus.addErrorHandler(ExceptionCounter); ListenerFactory listeners = new ListenerFactory() .create(InstancesPerListener, ExceptionThrowingListener.class); @@ -105,19 +104,11 @@ public abstract class SyncBusTest extends MessageBusTest { // multi threaded ConcurrentExecutor.runConcurrent(publish, ConcurrentUnits); assertEquals(InstancesPerListener * ConcurrentUnits, exceptionCount.get()); + + bus.shutdown(); } - public static class MBassadorTest extends SyncBusTest { - - - @Override - protected IMessageBus getSyncMessageBus() { - return new MBassador().start(); - } - - } - static class IncrementingMessage{ private int count = 1; diff --git a/src/test/java/net/engio/mbassy/common/AssertSupport.java b/src/test/java/net/engio/mbassy/common/AssertSupport.java index f02560e..4a0e0c4 100644 --- a/src/test/java/net/engio/mbassy/common/AssertSupport.java +++ b/src/test/java/net/engio/mbassy/common/AssertSupport.java @@ -1,5 +1,7 @@ package net.engio.mbassy.common; +import java.lang.ref.WeakReference; + import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -8,8 +10,6 @@ import org.junit.rules.TestName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.ref.WeakReference; - /** * @author bennidi */ @@ -25,13 +25,13 @@ public class AssertSupport { @Before public void beforeTest(){ - logger.info("Running test " + getTestName()); - testExecutionStart = System.currentTimeMillis(); + this.logger.info("Running test " + getTestName()); + this.testExecutionStart = System.currentTimeMillis(); } @After public void afterTest(){ - logger.info(String.format("Finished " + getTestName() + ": " + (System.currentTimeMillis() - testExecutionStart) + " ms")); + this.logger.info(String.format("Finished " + getTestName() + ": " + (System.currentTimeMillis() - this.testExecutionStart) + " ms")); } @@ -48,15 +48,14 @@ public class AssertSupport { } public String getTestName(){ - return getClass().getSimpleName() + "." + name.getMethodName(); + return getClass().getSimpleName() + "." + this.name.getMethodName(); } public void runGC() { WeakReference ref = new WeakReference(new Object()); - pause(100); while(ref.get() != null) { - pause(10); - runtime.gc(); + this.runtime.gc(); + pause(); } } diff --git a/src/test/java/net/engio/mbassy/common/MessageBusTest.java b/src/test/java/net/engio/mbassy/common/MessageBusTest.java index 35b5326..324f2c5 100644 --- a/src/test/java/net/engio/mbassy/common/MessageBusTest.java +++ b/src/test/java/net/engio/mbassy/common/MessageBusTest.java @@ -33,8 +33,6 @@ public abstract class MessageBusTest extends AssertSupport { } }; - private static final Object mapObject = new Object(); - @Before public void setUp(){ for(MessageTypes mes : MessageTypes.values()) {