Added removeLeaf from object tree. CHANGED concurrentSet.remove() to return TRUE (as atomic operation) if the set is empty when the element is removed. Fixed unsubscript(listener).

This commit is contained in:
nathan 2015-02-07 22:26:01 +01:00
parent 113ffb5e4a
commit 45fbc56320
19 changed files with 685 additions and 281 deletions

View File

@ -51,15 +51,17 @@ public class MBassador implements IMessageBus {
private final Disruptor<MessageHolder> disruptor; private final Disruptor<MessageHolder> disruptor;
private final RingBuffer<MessageHolder> ringBuffer; private final RingBuffer<MessageHolder> ringBuffer;
private WorkerPool<MessageHolder> workerPool;
public MBassador() { public MBassador() {
this(Runtime.getRuntime().availableProcessors() - 1); this(Runtime.getRuntime().availableProcessors() * 2 - 1);
} }
public MBassador(int numberOfThreads) { public MBassador(int numberOfThreads) {
if (numberOfThreads < 1) { if (numberOfThreads < 1) {
numberOfThreads = 1; // at LEAST 1 thread. numberOfThreads = 1; // at LEAST 1 threads
} }
this.subscriptionManager = new SubscriptionManager(); this.subscriptionManager = new SubscriptionManager();
@ -74,16 +76,16 @@ public class MBassador implements IMessageBus {
handlers[i] = new EventProcessor(this); handlers[i] = new EventProcessor(this);
} }
WorkerPool<MessageHolder> workerPool = new WorkerPool<MessageHolder>(this.ringBuffer, this.workerPool = new WorkerPool<MessageHolder>(this.ringBuffer,
this.ringBuffer.newBarrier(), this.ringBuffer.newBarrier(),
loggingExceptionHandler, loggingExceptionHandler,
handlers); handlers);
this.ringBuffer.addGatingSequences(workerPool.getWorkerSequences()); this.ringBuffer.addGatingSequences(this.workerPool.getWorkerSequences());
} }
public final MBassador start() { public final MBassador start() {
// workerPool.start(this.executor); ?? this.workerPool.start(this.executor);
this.disruptor.start(); this.disruptor.start();
return this; return this;
} }
@ -104,8 +106,8 @@ public class MBassador implements IMessageBus {
} }
@Override @Override
public boolean unsubscribe(Object listener) { public void unsubscribe(Object listener) {
return this.subscriptionManager.unsubscribe(listener); this.subscriptionManager.unsubscribe(listener);
} }

View File

@ -16,36 +16,33 @@ public interface PubSubSupport {
/** /**
* Subscribe all handlers of the given listener. Any listener is only subscribed once * Subscribe all handlers of the given listener. Any listener is only subscribed once
* subsequent subscriptions of an already subscribed listener will be silently ignored * subsequent subscriptions of an already subscribed listener will be silently ignored
*
* @param listener
*/ */
void subscribe(Object listener); void subscribe(Object listener);
/** /**
* Immediately remove all registered message handlers (if any) of the given listener. When this call returns all handlers * Immediately remove all registered message handlers (if any) of the given listener.
* have effectively been removed and will not receive any messages (provided that running publications (iterators) in other threads * When this call returns all handlers have effectively been removed and will not
* receive any messages (provided that running publications (iterators) in other threads
* have not yet obtained a reference to the listener) * have not yet obtained a reference to the listener)
* <p> * <p>
* A call to this method passing any object that is not subscribed will not have any effect and is silently ignored. * A call to this method passing any object that is not subscribed will not have any effect and is silently ignored.
*
* @param listener
* @return true, if the listener was found and successfully removed
* false otherwise
*/ */
boolean unsubscribe(Object listener); void unsubscribe(Object listener);
/** /**
* Synchronously publish a message to all registered listeners. This includes listeners defined for super types of the * Synchronously publish a message to all registered listeners. This includes listeners
* given message type, provided they are not configured to reject valid subtypes. The call returns when all matching handlers * defined for super types of the given message type, provided they are not configured
* of all registered listeners have been notified (invoked) of the message. * to reject valid subtypes. The call returns when all matching handlers of all registered
* listeners have been notified (invoked) of the message.
*/ */
void publish(Object message); void publish(Object message);
/** /**
* Synchronously publish <b>TWO</b> messages to all registered listeners (that match the signature). This includes listeners * Synchronously publish <b>TWO</b> messages to all registered listeners (that match the signature). This
* defined for super types of the given message type, provided they are not configured to reject valid subtypes. The call * includes listeners defined for super types of the given message type, provided they are not configured
* returns when all matching handlers of all registered listeners have been notified (invoked) of the message. * to reject valid subtypes. The call returns when all matching handlers of all registered listeners have
* been notified (invoked) of the message.
*/ */
void publish(Object message1, Object message2); void publish(Object message1, Object message2);

View File

@ -81,12 +81,23 @@ public abstract class AbstractConcurrentSet<T> implements IConcurrentSet<T> {
} }
} }
/**
* The return on this is DIFFERENT than normal.
*
* @return TRUE if there are no more elements (aka: this set is now empty)
*/
@Override @Override
public boolean remove(T element) { public boolean remove(T element) {
if (!contains(element)) { if (!contains(element)) {
// return quickly // return quickly
return false; Lock readLock = this.lock.readLock();
readLock.lock();
boolean headIsNull = this.head == null;
readLock.unlock();
return headIsNull;
} else { } else {
boolean wasLastElement = false;
Lock writeLock = this.lock.writeLock(); Lock writeLock = this.lock.writeLock();
try { try {
writeLock.lock(); writeLock.lock();
@ -102,10 +113,14 @@ public abstract class AbstractConcurrentSet<T> implements IConcurrentSet<T> {
//oldHead.clear(); // optimize for GC not possible because of potentially running iterators //oldHead.clear(); // optimize for GC not possible because of potentially running iterators
} }
this.entries.remove(element); this.entries.remove(element);
if (this.head == null) {
wasLastElement = true;
}
} finally { } finally {
writeLock.unlock(); writeLock.unlock();
} }
return true; return wasLastElement;
} }
} }

View File

@ -16,5 +16,10 @@ public interface IConcurrentSet<T> extends Iterable<T> {
void addAll(Iterable<T> elements); void addAll(Iterable<T> elements);
/**
* The return on this is DIFFERENT than normal.
*
* @return TRUE if there are no more elements (aka: this set is now empty)
*/
boolean remove(T element); boolean remove(T element);
} }

View File

@ -41,6 +41,166 @@ public class ObjectTree<KEY, VALUE> {
WRITE.unlock(); WRITE.unlock();
} }
public void removeValue() {
Lock WRITE = this.lock.writeLock();
WRITE.lock(); // upgrade to the write lock, at this point blocks other readers
this.value = null;
WRITE.unlock();
}
/**
* Removes a branch from the tree, and cleans up, if necessary
*/
public void remove(KEY key) {
if (key == null) {
removeLeaf(key);
}
}
/**
* Removes a branch from the tree, and cleans up, if necessary
*/
public void remove(KEY key1, KEY key2) {
if (key1 == null || key2 == null) {
return;
}
Lock UPDATE = this.lock.updateLock();
UPDATE.lock(); // allows other readers, blocks others from acquiring update or write locks
ObjectTree<KEY, VALUE> leaf = null;
if (this.children != null) {
leaf = this.children.get(key1);
if (leaf != null) {
// promote to writelock and try again - Concurrency in Practice,16.4.2, last sentence on page. Careful for stale state
Lock WRITE = this.lock.writeLock();
WRITE.lock(); // upgrade to the write lock, at this point blocks other readers
leaf.removeLeaf(key2);
this.children.remove(key1);
if (this.children.isEmpty()) {
this.children = null;
}
WRITE.unlock();
}
}
UPDATE.unlock();
}
/**
* Removes a branch from the tree, and cleans up, if necessary
*/
public void remove(KEY key1, KEY key2, KEY key3) {
if (key1 == null || key2 == null) {
return;
}
Lock UPDATE = this.lock.updateLock();
UPDATE.lock(); // allows other readers, blocks others from acquiring update or write locks
ObjectTree<KEY, VALUE> leaf = null;
if (this.children != null) {
leaf = this.children.get(key1);
if (leaf != null) {
// promote to writelock and try again - Concurrency in Practice,16.4.2, last sentence on page. Careful for stale state
Lock WRITE = this.lock.writeLock();
WRITE.lock(); // upgrade to the write lock, at this point blocks other readers
leaf.remove(key2, key3);
this.children.remove(key1);
if (this.children.isEmpty()) {
this.children = null;
}
WRITE.unlock();
}
}
UPDATE.unlock();
}
/**
* Removes a branch from the tree, and cleans up, if necessary
*/
@SuppressWarnings("unchecked")
public void remove(KEY... keys) {
if (keys == null) {
return;
}
removeLeaf(0, keys);
}
/**
* Removes a branch from the tree, and cleans up, if necessary
*/
private final void removeLeaf(KEY key) {
if (key != null) {
// promote to writelock and try again - Concurrency in Practice,16.4.2, last sentence on page. Careful for stale state
Lock WRITE = this.lock.writeLock();
WRITE.lock(); // upgrade to the write lock, at this point blocks other readers
if (this.children != null) {
ObjectTree<KEY, VALUE> leaf = this.children.get(key);
if (leaf != null) {
if (leaf.children == null && leaf.value == null) {
this.children.remove(key);
}
if (this.children.isEmpty()) {
this.children = null;
}
}
}
WRITE.unlock();
}
}
// keys CANNOT be null here!
private final void removeLeaf(int index, KEY[] keys) {
Lock UPDATE = this.lock.updateLock();
UPDATE.lock(); // allows other readers, blocks others from acquiring update or write locks
if (index == keys.length) {
// promote to writelock and try again - Concurrency in Practice,16.4.2, last sentence on page. Careful for stale state
Lock WRITE = this.lock.writeLock();
WRITE.lock(); // upgrade to the write lock, at this point blocks other readers
// we have reached the leaf to remove!
this.value = null;
this.children = null;
WRITE.unlock();
} else if (this.children != null) {
// promote to writelock and try again - Concurrency in Practice,16.4.2, last sentence on page. Careful for stale state
Lock WRITE = this.lock.writeLock();
WRITE.lock(); // upgrade to the write lock, at this point blocks other readers
if (this.children != null) {
ObjectTree<KEY, VALUE> leaf = this.children.get(keys[index]);
if (leaf != null) {
leaf.removeLeaf(index+1, keys);
if (leaf.children == null && leaf.value == null) {
this.children.remove(keys[index]);
}
if (this.children.isEmpty()) {
this.children = null;
}
}
}
WRITE.unlock();
}
UPDATE.unlock();
}
/** /**
* BACKWARDS, because our signature must allow for N keys... * BACKWARDS, because our signature must allow for N keys...
@ -101,7 +261,7 @@ public class ObjectTree<KEY, VALUE> {
* BACKWARDS, because our signature must allow for N keys... * BACKWARDS, because our signature must allow for N keys...
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public ObjectTree<KEY, VALUE> createLeaves(KEY... keys) { public ObjectTree<KEY, VALUE> createLeaf(KEY... keys) {
if (keys == null) { if (keys == null) {
return this; return this;
} }
@ -210,7 +370,7 @@ public class ObjectTree<KEY, VALUE> {
return returnValue; return returnValue;
} }
public VALUE get(KEY key1, KEY key2) { public VALUE getValue(KEY key1, KEY key2) {
ObjectTree<KEY, VALUE> tree = null; ObjectTree<KEY, VALUE> tree = null;
// get value from our children // get value from our children
tree = getLeaf(key1); tree = getLeaf(key1);
@ -230,7 +390,7 @@ public class ObjectTree<KEY, VALUE> {
return returnValue; return returnValue;
} }
public VALUE get(KEY key1, KEY key2, KEY key3) { public VALUE getValue(KEY key1, KEY key2, KEY key3) {
ObjectTree<KEY, VALUE> tree = null; ObjectTree<KEY, VALUE> tree = null;
// get value from our children // get value from our children
tree = getLeaf(key1); tree = getLeaf(key1);
@ -254,7 +414,7 @@ public class ObjectTree<KEY, VALUE> {
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public VALUE get(KEY... keys) { public VALUE getValue(KEY... keys) {
ObjectTree<KEY, VALUE> tree = null; ObjectTree<KEY, VALUE> tree = null;
// get value from our children // get value from our children
tree = getLeaf(keys[0]); tree = getLeaf(keys[0]);
@ -300,4 +460,64 @@ public class ObjectTree<KEY, VALUE> {
return tree; return tree;
} }
public final ObjectTree<KEY, VALUE> getLeaf(KEY key1, KEY key2) {
ObjectTree<KEY, VALUE> tree = null;
// get value from our children
tree = getLeaf(key1);
if (tree != null) {
tree = tree.getLeaf(key2);
}
if (tree == null) {
return null;
}
return tree;
}
public final ObjectTree<KEY, VALUE> getLeaf(KEY key1, KEY key2, KEY key3) {
ObjectTree<KEY, VALUE> tree = null;
// get value from our children
tree = getLeaf(key1);
if (tree != null) {
tree = tree.getLeaf(key2);
}
if (tree != null) {
tree = tree.getLeaf(key3);
}
if (tree == null) {
return null;
}
return tree;
}
@SuppressWarnings("unchecked")
public final ObjectTree<KEY, VALUE> getLeaf(KEY... keys) {
int size = keys.length;
if (size == 0) {
return null;
}
ObjectTree<KEY, VALUE> tree = null;
// get value from our children
tree = getLeaf(keys[0]);
for (int i=1;i<size;i++) {
if (tree != null) {
tree = tree.getLeaf(keys[i]);
} else {
return null;
}
}
if (tree == null) {
return null;
}
return tree;
}
} }

View File

@ -3,9 +3,9 @@ package net.engio.mbassy.common;
import java.lang.annotation.Annotation; import java.lang.annotation.Annotation;
import java.lang.reflect.AnnotatedElement; import java.lang.reflect.AnnotatedElement;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set; import java.util.Set;
import net.engio.mbassy.annotations.Handler; import net.engio.mbassy.annotations.Handler;
@ -19,8 +19,8 @@ public class ReflectionUtils
{ {
// modified by dorkbox, llc 2015 // modified by dorkbox, llc 2015
public static List<Method> getMethods(Class<?> target) { public static Collection<Method> getMethods(Class<?> target) {
List<Method> methods = new LinkedList<Method>(); Collection<Method> methods = new ArrayDeque<Method>();
try { try {
for (Method method : target.getDeclaredMethods()) { for (Method method : target.getDeclaredMethods()) {
if (getAnnotation(method, Handler.class) != null) { if (getAnnotation(method, Handler.class) != null) {
@ -85,7 +85,7 @@ public class ReflectionUtils
} }
// //
public static boolean containsOverridingMethod(final List<Method> allMethods, final Method methodToCheck) { public static boolean containsOverridingMethod(final Collection<Method> allMethods, final Method methodToCheck) {
for (Method method : allMethods) { for (Method method : allMethods) {
if (isOverriddenBy(methodToCheck, method)) { if (isOverriddenBy(methodToCheck, method)) {
return true; return true;

View File

@ -2,6 +2,7 @@ package net.engio.mbassy.listener;
import java.lang.annotation.Annotation; import java.lang.annotation.Annotation;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.Arrays;
import net.engio.mbassy.annotations.Handler; import net.engio.mbassy.annotations.Handler;
import net.engio.mbassy.annotations.Synchronized; import net.engio.mbassy.annotations.Synchronized;
@ -34,7 +35,7 @@ public class MessageHandler {
private final MessageListener listenerConfig; private final MessageListener listenerConfig;
// if ONE of the handled messages is of type array, then we configure it to ALSO accept var args! // if ONE of the handled messages is of type array, then we configure it to ALSO accept var args!
private final boolean acceptsVarArg; private final boolean isVarArg;
private final boolean isSynchronized; private final boolean isSynchronized;
public MessageHandler(Method handler, Handler handlerConfig, MessageListener listenerMetadata){ public MessageHandler(Method handler, Handler handlerConfig, MessageListener listenerMetadata){
@ -55,7 +56,7 @@ public class MessageHandler {
// if ONE of the handled messages is of type array, then we configure it to ALSO accept var args! // if ONE of the handled messages is of type array, then we configure it to ALSO accept var args!
this.acceptsVarArg = handledMessages.length == 1 && handledMessages[0].isArray(); this.isVarArg = handledMessages.length == 1 && handledMessages[0].isArray();
} }
public <A extends Annotation> A getAnnotation(Class<A> annotationType){ public <A extends Annotation> A getAnnotation(Class<A> annotationType){
@ -82,6 +83,10 @@ public class MessageHandler {
* @author dorkbox, llc * @author dorkbox, llc
* Date: 2/2/15 * Date: 2/2/15
*/ */
/** Check if this handler permits sending objects as a VarArg (variable argument) */
public boolean isVarArg() {
return this.isVarArg;
}
/** /**
* @return true if the message types are handled * @return true if the message types are handled
@ -279,8 +284,49 @@ public class MessageHandler {
} }
} }
/** Check if this handler permits sending objects as a VarArg (variable argument) */ @Override
public boolean isVarArg() { public int hashCode() {
return this.acceptsVarArg; final int prime = 31;
int result = 1;
result = prime * result + (this.acceptsSubtypes ? 1231 : 1237);
result = prime * result + (this.isVarArg ? 1231 : 1237);
result = prime * result + Arrays.hashCode(this.handledMessages);
result = prime * result + (this.handler == null ? 0 : this.handler.hashCode());
result = prime * result + (this.isSynchronized ? 1231 : 1237);
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
MessageHandler other = (MessageHandler) obj;
if (this.acceptsSubtypes != other.acceptsSubtypes) {
return false;
}
if (this.isVarArg != other.isVarArg) {
return false;
}
if (!Arrays.equals(this.handledMessages, other.handledMessages)) {
return false;
}
if (this.handler == null) {
if (other.handler != null) {
return false;
}
} else if (!this.handler.equals(other.handler)) {
return false;
}
if (this.isSynchronized != other.isSynchronized) {
return false;
}
return true;
} }
} }

View File

@ -1,8 +1,8 @@
package net.engio.mbassy.listener; package net.engio.mbassy.listener;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.LinkedList; import java.util.ArrayDeque;
import java.util.List; import java.util.Collection;
import net.engio.mbassy.annotations.Handler; import net.engio.mbassy.annotations.Handler;
import net.engio.mbassy.common.ReflectionUtils; import net.engio.mbassy.common.ReflectionUtils;
@ -20,10 +20,10 @@ public class MetadataReader {
public MessageListener getMessageListener(Class<?> target) { public MessageListener getMessageListener(Class<?> target) {
// get all handlers (this will include all (inherited) methods directly annotated using @Handler) // get all handlers (this will include all (inherited) methods directly annotated using @Handler)
List<Method> allHandlers = ReflectionUtils.getMethods(target); Collection<Method> allHandlers = ReflectionUtils.getMethods(target);
// retain only those that are at the bottom of their respective class hierarchy (deepest overriding method) // retain only those that are at the bottom of their respective class hierarchy (deepest overriding method)
List<Method> bottomMostHandlers = new LinkedList<Method>(); Collection<Method> bottomMostHandlers = new ArrayDeque<Method>();
for (Method handler : allHandlers) { for (Method handler : allHandlers) {
if (!ReflectionUtils.containsOverridingMethod(allHandlers, handler)) { if (!ReflectionUtils.containsOverridingMethod(allHandlers, handler)) {
bottomMostHandlers.add(handler); bottomMostHandlers.add(handler);

View File

@ -5,7 +5,10 @@ import java.lang.reflect.Method;
import java.util.Arrays; import java.util.Arrays;
import net.engio.mbassy.common.IConcurrentSet; import net.engio.mbassy.common.IConcurrentSet;
import net.engio.mbassy.common.WeakConcurrentSet;
import net.engio.mbassy.dispatch.IHandlerInvocation; import net.engio.mbassy.dispatch.IHandlerInvocation;
import net.engio.mbassy.dispatch.ReflectiveHandlerInvocation;
import net.engio.mbassy.dispatch.SynchronizedHandlerInvocation;
import net.engio.mbassy.error.ErrorHandlingSupport; import net.engio.mbassy.error.ErrorHandlingSupport;
import net.engio.mbassy.error.PublicationError; import net.engio.mbassy.error.PublicationError;
import net.engio.mbassy.listener.MessageHandler; import net.engio.mbassy.listener.MessageHandler;
@ -33,10 +36,16 @@ public class Subscription {
private final IHandlerInvocation invocation; private final IHandlerInvocation invocation;
protected final IConcurrentSet<Object> listeners; protected final IConcurrentSet<Object> listeners;
Subscription(MessageHandler handler, IHandlerInvocation invocation, IConcurrentSet<Object> listeners) { Subscription(MessageHandler handler) {
this.listeners = new WeakConcurrentSet<Object>();
this.handlerMetadata = handler; this.handlerMetadata = handler;
IHandlerInvocation invocation = new ReflectiveHandlerInvocation();
if (handler.isSynchronized()){
invocation = new SynchronizedHandlerInvocation(invocation);
}
this.invocation = invocation; this.invocation = invocation;
this.listeners = listeners;
} }
/** /**
@ -95,12 +104,16 @@ public class Subscription {
return this.handlerMetadata.getHandledMessages(); return this.handlerMetadata.getHandledMessages();
} }
public void subscribe(Object o) { public void subscribe(Object listener) {
this.listeners.add(o); this.listeners.add(listener);
} }
/**
* @return TRUE if there are no listeners subscribed
*/
public boolean unsubscribe(Object existingListener) { public boolean unsubscribe(Object existingListener) {
// TRUE if there are no more elements (aka: this set is empty)
return this.listeners.remove(existingListener); return this.listeners.remove(existingListener);
} }
@ -337,4 +350,34 @@ public class Subscription {
} }
} }
} }
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + (this.handlerMetadata == null ? 0 : this.handlerMetadata.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Subscription other = (Subscription) obj;
if (this.handlerMetadata == null) {
if (other.handlerMetadata != null) {
return false;
}
} else if (!this.handlerMetadata.equals(other.handlerMetadata)) {
return false;
}
return true;
}
} }

View File

@ -1,26 +1,23 @@
package net.engio.mbassy.subscription; package net.engio.mbassy.subscription;
import java.lang.reflect.Array; import java.lang.reflect.Array;
import java.util.ArrayDeque;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import net.engio.mbassy.common.ObjectTree; import net.engio.mbassy.common.ObjectTree;
import net.engio.mbassy.common.ReflectionUtils; import net.engio.mbassy.common.ReflectionUtils;
import net.engio.mbassy.common.WeakConcurrentSet;
import net.engio.mbassy.dispatch.IHandlerInvocation;
import net.engio.mbassy.dispatch.ReflectiveHandlerInvocation;
import net.engio.mbassy.dispatch.SynchronizedHandlerInvocation;
import net.engio.mbassy.error.MessageBusException; import net.engio.mbassy.error.MessageBusException;
import net.engio.mbassy.listener.MessageHandler; import net.engio.mbassy.listener.MessageHandler;
import net.engio.mbassy.listener.MetadataReader; import net.engio.mbassy.listener.MetadataReader;
import com.googlecode.concurentlocks.ReentrantReadWriteUpdateLock;
/** /**
* The subscription managers responsibility is to consistently handle and synchronize the message listener subscription process. * The subscription managers responsibility is to consistently handle and synchronize the message listener subscription process.
* It provides fast lookup of existing subscriptions when another instance of an already known * It provides fast lookup of existing subscriptions when another instance of an already known
@ -55,40 +52,93 @@ public class SubscriptionManager {
private final ConcurrentHashMap<Class<?>, Object> nonListeners = new ConcurrentHashMap<Class<?>, Object>(); private final ConcurrentHashMap<Class<?>, Object> nonListeners = new ConcurrentHashMap<Class<?>, Object>();
// synchronize read/write acces to the subscription maps // synchronize read/write acces to the subscription maps
private final ReentrantReadWriteLock LOCK = new ReentrantReadWriteLock(); private final ReentrantReadWriteUpdateLock LOCK = new ReentrantReadWriteUpdateLock();
public SubscriptionManager() { public SubscriptionManager() {
} }
public boolean unsubscribe(Object listener) { public void unsubscribe(Object listener) {
if (listener == null) { if (listener == null) {
return false; return;
} }
Collection<Subscription> subscriptions = getSubscriptionsByListener(listener);
if (subscriptions == null) {
return false;
}
boolean isRemoved = true;
for (Subscription subscription : subscriptions) {
isRemoved &= subscription.unsubscribe(listener);
}
return isRemoved;
}
boolean nothingLeft = true;
private Collection<Subscription> getSubscriptionsByListener(Object listener) {
Collection<Subscription> subscriptions; Collection<Subscription> subscriptions;
try { try {
this.LOCK.readLock().lock(); this.LOCK.writeLock().lock();
subscriptions = this.subscriptionsPerListener.get(listener.getClass()); Class<? extends Object> listenerClass = listener.getClass();
subscriptions = this.subscriptionsPerListener.get(listenerClass);
if (subscriptions != null) {
for (Subscription subscription : subscriptions) {
boolean isEmpty = subscription.unsubscribe(listener);
if (isEmpty) {
// single or multi?
Class<?>[] handledMessageTypes = subscription.getHandledMessageTypes();
int size = handledMessageTypes.length;
if (size == 1) {
// single
Class<?> clazz = handledMessageTypes[0];
// NOTE: Not thread-safe! must be synchronized in outer scope
Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(clazz);
if (subs != null) {
subs.remove(subscription);
if (subs.isEmpty()) {
// remove element
this.subscriptionsPerMessageSingle.remove(clazz);
}
}
} else {
// multi (is thread safe)
ObjectTree<Class<?>, Collection<Subscription>> tree;
switch (size) {
case 2: tree = this.subscriptionsPerMessageMulti.getLeaf(handledMessageTypes[0], handledMessageTypes[1]); break;
case 3: tree = this.subscriptionsPerMessageMulti.getLeaf(handledMessageTypes[1], handledMessageTypes[1], handledMessageTypes[2]); break;
default: tree = this.subscriptionsPerMessageMulti.getLeaf(handledMessageTypes); break;
}
if (tree != null) {
Collection<Subscription> subs = tree.getValue();
if (subs != null) {
subs.remove(subscription);
if (subs.isEmpty()) {
// remove tree element
switch (size) {
case 2: this.subscriptionsPerMessageMulti.remove(handledMessageTypes[0], handledMessageTypes[1]); break;
case 3: this.subscriptionsPerMessageMulti.remove(handledMessageTypes[1], handledMessageTypes[1], handledMessageTypes[2]); break;
default: this.subscriptionsPerMessageMulti.remove(handledMessageTypes); break;
}
}
}
}
}
}
nothingLeft &= isEmpty;
}
}
if (nothingLeft) {
// now we have to clean up
this.subscriptionsPerListener.remove(listenerClass);
}
} finally { } finally {
this.LOCK.readLock().unlock(); this.LOCK.writeLock().unlock();
} }
return subscriptions;
return;
} }
// when a class is subscribed, the registrations for that class are permanent in the "subscriptionsPerListener"?
public void subscribe(Object listener) { public void subscribe(Object listener) {
try { try {
Class<? extends Object> listenerClass = listener.getClass(); Class<? extends Object> listenerClass = listener.getClass();
@ -98,45 +148,97 @@ public class SubscriptionManager {
return; return;
} }
Collection<Subscription> subscriptionsByListener = getSubscriptionsByListener(listener); Collection<Subscription> subscriptions;
// a listener is either subscribed for the first time try {
if (subscriptionsByListener == null) { this.LOCK.updateLock().lock();
List<MessageHandler> messageHandlers = this.metadataReader.getMessageListener(listenerClass).getHandlers(); boolean hasSubs = false;
if (messageHandlers.isEmpty()) { subscriptions = this.subscriptionsPerListener.get(listenerClass);
// remember the class as non listening class if no handlers are found
this.nonListeners.put(listenerClass, this.nonListeners);
return;
}
// it's safe to use non-concurrent collection here (read only) if (subscriptions != null) {
subscriptionsByListener = new ArrayList<Subscription>(messageHandlers.size()); hasSubs = true;
} else {
// create subscriptions for all detected message handlers // a listener is either subscribed for the first time
for (MessageHandler messageHandler : messageHandlers) {
// create the subscription
try { try {
IHandlerInvocation invocation = new ReflectiveHandlerInvocation(); this.LOCK.writeLock().lock(); // upgrade updatelock to write lock, Avoid DCL
if (messageHandler.isSynchronized()){ subscriptions = this.subscriptionsPerListener.get(listenerClass);
invocation = new SynchronizedHandlerInvocation(invocation);
if (subscriptions != null) {
hasSubs = true;
} else {
// a listener is either subscribed for the first time
List<MessageHandler> messageHandlers = this.metadataReader.getMessageListener(listenerClass).getHandlers();
if (messageHandlers.isEmpty()) {
// remember the class as non listening class if no handlers are found
this.nonListeners.put(listenerClass, this.nonListeners);
return;
}
// it's SAFE to use non-concurrent collection here (read only). Same thread LOCKS on this with a write lock
subscriptions = new ArrayList<Subscription>(messageHandlers.size());
// create subscriptions for all detected message handlers
for (MessageHandler messageHandler : messageHandlers) {
// create the subscription
try {
Subscription subscription = new Subscription(messageHandler);
subscriptions.add(subscription);
} catch (Exception e) {
throw new MessageBusException(e);
}
}
for (Subscription sub : subscriptions) {
sub.subscribe(listener);
// single or multi?
Class<?>[] handledMessageTypes = sub.getHandledMessageTypes();
int size = handledMessageTypes.length;
if (size == 1) {
// single
Class<?> clazz = handledMessageTypes[0];
// NOTE: Not thread-safe! must be synchronized in outer scope
Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(clazz);
if (subs == null) {
subs = new ArrayDeque<Subscription>();
this.subscriptionsPerMessageSingle.put(clazz, subs);
}
subs.add(sub);
} else {
// multi (is thread safe)
ObjectTree<Class<?>, Collection<Subscription>> tree;
switch (size) {
case 2: tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes[0], handledMessageTypes[1]); break;
case 3: tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes[1], handledMessageTypes[1], handledMessageTypes[2]); break;
default: tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes); break;
}
Collection<Subscription> subs = tree.getValue();
if (subs == null) {
subs = new ArrayDeque<Subscription>();
tree.putValue(subs);
}
subs.add(sub);
}
}
this.subscriptionsPerListener.put(listenerClass, subscriptions);
} }
} finally {
Subscription subscription = new Subscription(messageHandler, invocation, new WeakConcurrentSet<Object>()); this.LOCK.writeLock().unlock();
subscriptionsByListener.add(subscription);
} catch (Exception e) {
throw new MessageBusException(e);
} }
} }
// this will acquire a write lock and handle the case when another thread already subscribed if (hasSubs) {
// this particular listener in the mean-time // or the subscriptions already exist and must only be updated
subscribe(listener, subscriptionsByListener); for (Subscription sub : subscriptions) {
} sub.subscribe(listener);
else { }
// or the subscriptions already exist and must only be updated
for (Subscription sub : subscriptionsByListener) {
sub.subscribe(listener);
} }
} finally {
this.LOCK.updateLock().unlock();
} }
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
@ -144,88 +246,29 @@ public class SubscriptionManager {
} }
private void subscribe(Object listener, Collection<Subscription> subscriptions) {
try {
this.LOCK.writeLock().lock();
// basically this is a deferred double check
// it's an ugly pattern but necessary because atomic upgrade from read to write lock
// is not possible
// the alternative of using a write lock from the beginning would decrease performance dramatically
// because of the huge number of reads compared to writes
Collection<Subscription> subscriptionsByListener = getSubscriptionsByListener(listener);
if (subscriptionsByListener == null) {
for (Subscription subscription : subscriptions) {
subscription.subscribe(listener);
// single or multi?
Class<?>[] handledMessageTypes = subscription.getHandledMessageTypes();
int size = handledMessageTypes.length;
if (size == 1) {
// single
Class<?> clazz = handledMessageTypes[0];
// NOTE: Not thread-safe! must be synchronized in outer scope
Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(clazz);
if (subs == null) {
subs = new LinkedList<Subscription>();
this.subscriptionsPerMessageSingle.put(clazz, subs);
}
subs.add(subscription);
} else {
// multi (is thread safe)
ObjectTree<Class<?>, Collection<Subscription>> tree = this.subscriptionsPerMessageMulti.createLeaves(handledMessageTypes);
Collection<Subscription> subs = tree.getValue();
if (subs == null) {
subs = new LinkedList<Subscription>();
tree.putValue(subs);
}
subs.add(subscription);
}
}
this.subscriptionsPerListener.put(listener.getClass(), subscriptions);
}
// the rare case when multiple threads concurrently subscribed the same class for the first time
// one will be first, all others will have to subscribe to the existing instead the generated subscriptions
else {
for (Subscription existingSubscription : subscriptionsByListener) {
existingSubscription.subscribe(listener);
}
}
} finally {
this.LOCK.writeLock().unlock();
}
}
// obtain the set of subscriptions for the given message type // obtain the set of subscriptions for the given message type
// Note: never returns null! // Note: never returns null!
public Collection<Subscription> getSubscriptionsByMessageType(Class<?> messageType) { public Collection<Subscription> getSubscriptionsByMessageType(Class<?> messageType) {
// thread safe publication // thread safe publication
Collection<Subscription> subscriptions = new LinkedList<Subscription>(); Collection<Subscription> subscriptions = new ArrayDeque<Subscription>();
try { try {
this.LOCK.readLock().lock(); this.LOCK.readLock().lock();
Set<Class<?>> types1 = ReflectionUtils.getSuperTypes(messageType); Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(messageType);
types1.add(messageType); if (subs != null) {
subscriptions.addAll(subs);
}
// also add all subscriptions that match super types // also add all subscriptions that match super types
Set<Class<?>> types1 = ReflectionUtils.getSuperTypes(messageType);
for (Class<?> eventSuperType : types1) { for (Class<?> eventSuperType : types1) {
Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(eventSuperType); subs = this.subscriptionsPerMessageSingle.get(eventSuperType);
if (subs != null) { if (subs != null) {
for (Subscription sub : subs) { for (Subscription sub : subs) {
if (sub.handlesMessageType(eventSuperType)) { if (sub.handlesMessageType(messageType)) {
subscriptions.add(sub); subscriptions.add(sub);
} }
} }
@ -235,14 +278,15 @@ public class SubscriptionManager {
/////////////// ///////////////
// a var-arg handler might match // a var-arg handler might match
/////////////// ///////////////
// tricky part. We have to check the ARRAY version, // tricky part. We have to check the ARRAY version
types1.add(messageType);
for (Class<?> eventSuperType : types1) { for (Class<?> eventSuperType : types1) {
// messy, but the ONLY way to do it. // messy, but the ONLY way to do it.
// NOTE: this will NEVER be an array to begin with, since that will call a DIFFERENT method // NOTE: this will NEVER be an array to begin with, since that will call a DIFFERENT method
eventSuperType = Array.newInstance(eventSuperType, 1).getClass(); eventSuperType = Array.newInstance(eventSuperType, 1).getClass();
// also add all subscriptions that match super types // also add all subscriptions that match super types
Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(eventSuperType); subs = this.subscriptionsPerMessageSingle.get(eventSuperType);
if (subs != null) { if (subs != null) {
for (Subscription sub : subs) { for (Subscription sub : subs) {
subscriptions.add(sub); subscriptions.add(sub);
@ -260,17 +304,20 @@ public class SubscriptionManager {
// Note: never returns null! // Note: never returns null!
public Collection<Subscription> getSubscriptionsByMessageType(Class<?> messageType1, Class<?> messageType2) { public Collection<Subscription> getSubscriptionsByMessageType(Class<?> messageType1, Class<?> messageType2) {
// thread safe publication // thread safe publication
Collection<Subscription> subscriptions = new LinkedList<Subscription>(); Collection<Subscription> subscriptions = new ArrayDeque<Subscription>();
try { try {
this.LOCK.readLock().lock(); this.LOCK.readLock().lock();
Collection<Subscription> subs = this.subscriptionsPerMessageMulti.getValue(messageType1, messageType2);
if (subs != null) {
subscriptions.addAll(subs);
}
// also add all subscriptions that match super types
Set<Class<?>> types1 = ReflectionUtils.getSuperTypes(messageType1); Set<Class<?>> types1 = ReflectionUtils.getSuperTypes(messageType1);
types1.add(messageType1);
Set<Class<?>> types2 = ReflectionUtils.getSuperTypes(messageType2); Set<Class<?>> types2 = ReflectionUtils.getSuperTypes(messageType2);
types2.add(messageType2);
// also add all subscriptions that match super types // also add all subscriptions that match super types
for (Class<?> eventSuperType1 : types1) { for (Class<?> eventSuperType1 : types1) {
ObjectTree<Class<?>, Collection<Subscription>> leaf1 = this.subscriptionsPerMessageMulti.getLeaf(eventSuperType1); ObjectTree<Class<?>, Collection<Subscription>> leaf1 = this.subscriptionsPerMessageMulti.getLeaf(eventSuperType1);
@ -280,10 +327,10 @@ public class SubscriptionManager {
ObjectTree<Class<?>, Collection<Subscription>> leaf2 = leaf1.getLeaf(eventSuperType2); ObjectTree<Class<?>, Collection<Subscription>> leaf2 = leaf1.getLeaf(eventSuperType2);
if (leaf2 != null) { if (leaf2 != null) {
Collection<Subscription> subs = leaf2.getValue(); subs = leaf2.getValue();
if (subs != null) { if (subs != null) {
for (Subscription sub : subs) { for (Subscription sub : subs) {
if (sub.handlesMessageType(eventSuperType1, eventSuperType2)) { if (sub.handlesMessageType(messageType1, messageType2)) {
subscriptions.add(sub); subscriptions.add(sub);
} }
} }
@ -297,6 +344,8 @@ public class SubscriptionManager {
// if they are ALL the same type, a var-arg handler might match // if they are ALL the same type, a var-arg handler might match
/////////////// ///////////////
if (messageType1 == messageType2) { if (messageType1 == messageType2) {
types1.add(messageType1);
types1.add(messageType2);
// tricky part. We have to check the ARRAY version // tricky part. We have to check the ARRAY version
for (Class<?> eventSuperType : types1) { for (Class<?> eventSuperType : types1) {
// messy, but the ONLY way to do it. // messy, but the ONLY way to do it.
@ -304,7 +353,7 @@ public class SubscriptionManager {
eventSuperType = Array.newInstance(eventSuperType, 1).getClass(); eventSuperType = Array.newInstance(eventSuperType, 1).getClass();
// also add all subscriptions that match super types // also add all subscriptions that match super types
Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(eventSuperType); subs = this.subscriptionsPerMessageSingle.get(eventSuperType);
if (subs != null) { if (subs != null) {
for (Subscription sub : subs) { for (Subscription sub : subs) {
subscriptions.add(sub); subscriptions.add(sub);
@ -324,19 +373,21 @@ public class SubscriptionManager {
// Note: never returns null! // Note: never returns null!
public Collection<Subscription> getSubscriptionsByMessageType(Class<?> messageType1, Class<?> messageType2, Class<?> messageType3) { public Collection<Subscription> getSubscriptionsByMessageType(Class<?> messageType1, Class<?> messageType2, Class<?> messageType3) {
// thread safe publication // thread safe publication
Collection<Subscription> subscriptions = new LinkedList<Subscription>(); Collection<Subscription> subscriptions = new ArrayDeque<Subscription>();
try { try {
this.LOCK.readLock().lock(); this.LOCK.readLock().lock();
Collection<Subscription> subs = this.subscriptionsPerMessageMulti.getValue(messageType1, messageType2, messageType3);
if (subs != null) {
subscriptions.addAll(subs);
}
// also add all subscriptions that match super types
Set<Class<?>> types1 = ReflectionUtils.getSuperTypes(messageType1); Set<Class<?>> types1 = ReflectionUtils.getSuperTypes(messageType1);
types1.add(messageType1);
Set<Class<?>> types2 = ReflectionUtils.getSuperTypes(messageType2); Set<Class<?>> types2 = ReflectionUtils.getSuperTypes(messageType2);
types2.add(messageType2);
Set<Class<?>> types3 = ReflectionUtils.getSuperTypes(messageType3); Set<Class<?>> types3 = ReflectionUtils.getSuperTypes(messageType3);
types3.add(messageType3);
// also add all subscriptions that match super types // also add all subscriptions that match super types
for (Class<?> eventSuperType1 : types1) { for (Class<?> eventSuperType1 : types1) {
@ -351,10 +402,10 @@ public class SubscriptionManager {
ObjectTree<Class<?>, Collection<Subscription>> leaf3 = leaf2.getLeaf(eventSuperType3); ObjectTree<Class<?>, Collection<Subscription>> leaf3 = leaf2.getLeaf(eventSuperType3);
if (leaf3 != null) { if (leaf3 != null) {
Collection<Subscription> subs = leaf3.getValue(); subs = leaf3.getValue();
if (subs != null) { if (subs != null) {
for (Subscription sub : subs) { for (Subscription sub : subs) {
if (sub.handlesMessageType(eventSuperType1, eventSuperType2, eventSuperType3)) { if (sub.handlesMessageType(messageType1, messageType2, messageType3)) {
subscriptions.add(sub); subscriptions.add(sub);
} }
} }
@ -371,13 +422,16 @@ public class SubscriptionManager {
/////////////// ///////////////
if (messageType1 == messageType2 && messageType2 == messageType3) { if (messageType1 == messageType2 && messageType2 == messageType3) {
// tricky part. We have to check the ARRAY version // tricky part. We have to check the ARRAY version
types1.add(messageType1);
types1.add(messageType2);
types1.add(messageType3);
for (Class<?> eventSuperType : types1) { for (Class<?> eventSuperType : types1) {
// messy, but the ONLY way to do it. // messy, but the ONLY way to do it.
// NOTE: this will NEVER be an array to begin with, since that will call a DIFFERENT method // NOTE: this will NEVER be an array to begin with, since that will call a DIFFERENT method
eventSuperType = Array.newInstance(eventSuperType, 1).getClass(); eventSuperType = Array.newInstance(eventSuperType, 1).getClass();
// also add all subscriptions that match super types // also add all subscriptions that match super types
Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(eventSuperType); subs = this.subscriptionsPerMessageSingle.get(eventSuperType);
if (subs != null) { if (subs != null) {
for (Subscription sub : subs) { for (Subscription sub : subs) {
subscriptions.add(sub); subscriptions.add(sub);
@ -396,56 +450,64 @@ public class SubscriptionManager {
// Note: never returns null! // Note: never returns null!
public Collection<Subscription> getSubscriptionsByMessageType(Class<?>... messageTypes) { public Collection<Subscription> getSubscriptionsByMessageType(Class<?>... messageTypes) {
// thread safe publication // thread safe publication
Collection<Subscription> subscriptions = new LinkedList<Subscription>(); Collection<Subscription> subscriptions = new ArrayDeque<Subscription>();
try { try {
this.LOCK.readLock().lock(); this.LOCK.readLock().lock();
int size = messageTypes.length; Collection<Subscription> subs = this.subscriptionsPerMessageMulti.getValue(messageTypes);
if (size == 0) { if (subs != null) {
this.LOCK.readLock().unlock(); for (Subscription sub : subs) {
return subscriptions; if (sub.handlesMessageType(messageTypes)) {
} subscriptions.add(sub);
}
boolean allSameType = true;
Class<?> firstType = messageTypes[0];
@SuppressWarnings("unchecked")
Set<Class<?>>[] types = new Set[size];
for (int i=0;i<size;i++) {
Class<?> from = messageTypes[i];
types[i] = ReflectionUtils.getSuperTypes(from);
types[i].add(from);
if (from != firstType) {
allSameType = false;
} }
} }
// add all subscriptions that match super types combinations int size = messageTypes.length;
// have to use recursion for this. BLEH if (size > 0) {
getSubsVarArg(subscriptions, types, size-1, 0, this.subscriptionsPerMessageMulti, messageTypes); boolean allSameType = true;
Class<?> firstType = messageTypes[0];
/////////////// @SuppressWarnings("unchecked")
// if they are ALL the same type, a var-arg handler might match Set<Class<?>>[] types = new Set[size];
/////////////// for (int i=0;i<size;i++) {
if (allSameType) { Class<?> from = messageTypes[i];
// do we have a var-arg (it shows as an array) subscribed? types[i] = ReflectionUtils.getSuperTypes(from);
types[i].add(from);
if (from != firstType) {
allSameType = false;
}
}
// tricky part. We have to check the ARRAY version
for (Class<?> eventSuperType : types[0]) {
// messy, but the ONLY way to do it.
// NOTE: this will NEVER be an array to begin with, since that will call a DIFFERENT method
eventSuperType = Array.newInstance(eventSuperType, 1).getClass();
// also add all subscriptions that match super types // add all subscriptions that match super types combinations
Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(eventSuperType); // have to use recursion for this. BLEH
if (subs != null) { getSubsVarArg(subscriptions, types, size-1, 0, this.subscriptionsPerMessageMulti, messageTypes);
for (Subscription sub : subs) {
subscriptions.add(sub); ///////////////
// if they are ALL the same type, a var-arg handler might match
///////////////
if (allSameType) {
// do we have a var-arg (it shows as an array) subscribed?
// tricky part. We have to check the ARRAY version
for (Class<?> eventSuperType : types[0]) {
// messy, but the ONLY way to do it.
// NOTE: this will NEVER be an array to begin with, since that will call a DIFFERENT method
eventSuperType = Array.newInstance(eventSuperType, 1).getClass();
// also add all subscriptions that match super types
subs = this.subscriptionsPerMessageSingle.get(eventSuperType);
if (subs != null) {
for (Subscription sub : subs) {
subscriptions.add(sub);
}
} }
} }
} }
} }
} finally { } finally {
this.LOCK.readLock().unlock(); this.LOCK.readLock().unlock();

View File

@ -13,7 +13,6 @@ import org.junit.runners.Suite;
@Suite.SuiteClasses({ @Suite.SuiteClasses({
WeakConcurrentSetTest.class, WeakConcurrentSetTest.class,
MBassadorTest.class, MBassadorTest.class,
SyncBusTest.MBassadorTest.class,
MetadataReaderTest.class, MetadataReaderTest.class,
MethodDispatchTest.class, MethodDispatchTest.class,
DeadMessageTest.class, DeadMessageTest.class,

View File

@ -47,7 +47,7 @@ public class AsyncFIFOBusTest extends MessageBusTest {
assertEquals(messages[i], listener.receivedSync.get(i)); assertEquals(messages[i], listener.receivedSync.get(i));
} }
} }
fifoBUs.shutdown();
} }
@Test @Test
@ -76,7 +76,7 @@ public class AsyncFIFOBusTest extends MessageBusTest {
pause(2000); pause(2000);
} }
for(Listener listener : listeners) { for (Listener listener : listeners) {
List<Integer> receivedSync = listener.receivedSync; List<Integer> receivedSync = listener.receivedSync;
synchronized (receivedSync) { synchronized (receivedSync) {
@ -88,6 +88,7 @@ public class AsyncFIFOBusTest extends MessageBusTest {
} }
} }
fifoBUs.shutdown();
} }
public static class Listener { public static class Listener {

View File

@ -22,10 +22,12 @@ import org.junit.Test;
*/ */
public class DeadMessageTest extends MessageBusTest{ public class DeadMessageTest extends MessageBusTest{
private static final AtomicInteger deadMessages = new AtomicInteger(0);
@Override @Override
@Before @Before
public void beforeTest(){ public void beforeTest(){
DeadMessagHandler.deadMessages.set(0); deadMessages.set(0);
} }
@ -60,7 +62,7 @@ public class DeadMessageTest extends MessageBusTest{
ConcurrentExecutor.runConcurrent(publishUnhandledMessage, ConcurrentUnits); ConcurrentExecutor.runConcurrent(publishUnhandledMessage, ConcurrentUnits);
assertEquals(InstancesPerListener * IterationsPerThread * ConcurrentUnits, DeadMessagHandler.deadMessages.get()); assertEquals(InstancesPerListener * IterationsPerThread * ConcurrentUnits, deadMessages.get());
} }
@ -69,27 +71,28 @@ public class DeadMessageTest extends MessageBusTest{
public void testUnsubscribingAllListeners() { public void testUnsubscribingAllListeners() {
final MBassador bus = createBus(); final MBassador bus = createBus();
ListenerFactory deadMessageListener = new ListenerFactory() ListenerFactory deadMessageListener = new ListenerFactory()
.create(InstancesPerListener, DeadMessagHandler.class) .create(InstancesPerListener, DeadMessagHandler.class);
.create(InstancesPerListener, Object.class);
ListenerFactory objectListener = new ListenerFactory() ListenerFactory objectListener = new ListenerFactory()
.create(InstancesPerListener, ObjectListener.class); .create(InstancesPerListener, ObjectListener.class);
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, deadMessageListener), ConcurrentUnits); ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, deadMessageListener), ConcurrentUnits);
// Only dead message handlers available // Only dead message handlers available
bus.publish(new Object()); bus.publish(new Object());
// The message should be caught as dead message since there are no subscribed listeners // The message should be caught as dead message since there are no subscribed listeners
assertEquals(InstancesPerListener, DeadMessagHandler.deadMessages.get()); assertEquals(InstancesPerListener, deadMessages.get());
// Clear deadmessage for future tests // Clear deadmessage for future tests
DeadMessagHandler.deadMessages.set(0); deadMessages.set(0);
// Add object listeners and publish again // Add object listeners and publish again
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, objectListener), ConcurrentUnits); ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, objectListener), ConcurrentUnits);
bus.publish(new Object()); bus.publish(new Object());
// verify that no dead message events were produced // verify that no dead message events were produced
assertEquals(0, DeadMessagHandler.deadMessages.get()); assertEquals(0, deadMessages.get());
// Unsubscribe all object listeners // Unsubscribe all object listeners
ConcurrentExecutor.runConcurrent(TestUtil.unsubscriber(bus, objectListener), ConcurrentUnits); ConcurrentExecutor.runConcurrent(TestUtil.unsubscriber(bus, objectListener), ConcurrentUnits);
@ -98,18 +101,14 @@ public class DeadMessageTest extends MessageBusTest{
bus.publish(new Object()); bus.publish(new Object());
// The message should be caught, as it's the only listener // The message should be caught, as it's the only listener
assertEquals(InstancesPerListener, DeadMessagHandler.deadMessages.get()); assertEquals(0, deadMessages.get());
} }
public static class DeadMessagHandler { public static class DeadMessagHandler {
@SuppressWarnings("unused")
private static final AtomicInteger deadMessages = new AtomicInteger(0);
@Handler @Handler
public void handle(DeadMessage message){ public void handle(DeadMessage message){
deadMessages.incrementAndGet(); deadMessages.incrementAndGet();
} }
} }
} }

View File

@ -128,6 +128,7 @@ public class MBassadorTest extends MessageBusTest {
pause(processingTimeInMS); pause(processingTimeInMS);
assertEquals(InstancesPerListener * ConcurrentUnits, exceptionCount.get()); assertEquals(InstancesPerListener * ConcurrentUnits, exceptionCount.get());
bus.shutdown();
} }

View File

@ -22,8 +22,9 @@ public class MultiMessageTest extends MessageBusTest {
public void testMultiMessageSending(){ public void testMultiMessageSending(){
IMessageBus bus = new MBassador().start(); IMessageBus bus = new MBassador().start();
Listener listener = new Listener(); Listener listener1 = new Listener();
bus.subscribe(listener); bus.subscribe(listener1);
bus.unsubscribe(listener1);
bus.publish("s"); bus.publish("s");
bus.publish("s", "s"); bus.publish("s", "s");
@ -33,7 +34,22 @@ public class MultiMessageTest extends MessageBusTest {
bus.publish(1, 2, 3, 4, 5, 6); bus.publish(1, 2, 3, 4, 5, 6);
bus.publish(new Integer[] {1, 2, 3, 4, 5, 6}); bus.publish(new Integer[] {1, 2, 3, 4, 5, 6});
assertEquals(count.get(), 10); assertEquals(0, count.get());
bus.subscribe(listener1);
bus.publish("s");
bus.publish("s", "s");
bus.publish("s", "s", "s");
bus.publish("s", "s", "s", "s");
bus.publish(1, 2, "s");
bus.publish(1, 2, 3, 4, 5, 6);
bus.publish(new Integer[] {1, 2, 3, 4, 5, 6});
assertEquals(10, count.get());
bus.shutdown();
} }
@SuppressWarnings("unused") @SuppressWarnings("unused")
@ -47,31 +63,31 @@ public class MultiMessageTest extends MessageBusTest {
@Handler @Handler
public void handleSync(String o1, String o2) { public void handleSync(String o1, String o2) {
count.getAndIncrement(); count.getAndIncrement();
System.err.println("match String, String"); // System.err.println("match String, String");
} }
@Handler @Handler
public void handleSync(String o1, String o2, String o3) { public void handleSync(String o1, String o2, String o3) {
count.getAndIncrement(); count.getAndIncrement();
System.err.println("match String, String, String"); // System.err.println("match String, String, String");
} }
@Handler @Handler
public void handleSync(Integer o1, Integer o2, String o3) { public void handleSync(Integer o1, Integer o2, String o3) {
count.getAndIncrement(); count.getAndIncrement();
System.err.println("match Integer, Integer, String"); // System.err.println("match Integer, Integer, String");
} }
@Handler @Handler
public void handleSync(String... o) { public void handleSync(String... o) {
count.getAndIncrement(); count.getAndIncrement();
System.err.println("match String[]"); // System.err.println("match String[]");
} }
@Handler @Handler
public void handleSync(Integer... o) { public void handleSync(Integer... o) {
count.getAndIncrement(); count.getAndIncrement();
System.err.println("match Integer[]"); // System.err.println("match Integer[]");
} }
} }
} }

View File

@ -29,17 +29,17 @@ public class ObjectTreeTest extends AssertSupport {
public void test(ObjectTree<Class<?>, String> tree, String string, Class<?> clazz1, Class<?> clazz2) { public void test(ObjectTree<Class<?>, String> tree, String string, Class<?> clazz1, Class<?> clazz2) {
tree.put(string, clazz1, clazz2); tree.put(string, clazz1, clazz2);
assertEquals(string, tree.get(clazz1, clazz2)); assertEquals(string, tree.getValue(clazz1, clazz2));
} }
public void test(ObjectTree<Class<?>, String> tree, String string, Class<?> clazz1, Class<?> clazz2, Class<?> clazz3) { public void test(ObjectTree<Class<?>, String> tree, String string, Class<?> clazz1, Class<?> clazz2, Class<?> clazz3) {
tree.put(string, clazz1, clazz2, clazz3); tree.put(string, clazz1, clazz2, clazz3);
assertEquals(string, tree.get(clazz1, clazz2, clazz3)); assertEquals(string, tree.getValue(clazz1, clazz2, clazz3));
} }
public void test(ObjectTree<Class<?>, String> tree, String string, Class<?>... clazzes) { public void test(ObjectTree<Class<?>, String> tree, String string, Class<?>... clazzes) {
tree.put(string, clazzes); tree.put(string, clazzes);
assertEquals(string, tree.get(clazzes)); assertEquals(string, tree.getValue(clazzes));
} }
@Test @Test
@ -58,5 +58,15 @@ public class ObjectTreeTest extends AssertSupport {
test(tree, "ssss", String.class, String.class, String.class, String.class); test(tree, "ssss", String.class, String.class, String.class, String.class);
test(tree, "oosif", Object.class, Object.class, String.class, Integer.class, Float.class); test(tree, "oosif", Object.class, Object.class, String.class, Integer.class, Float.class);
// now make sure we can REMOVE the tree elements
tree.remove(Object.class, Object.class, String.class, Integer.class, Float.class);
ObjectTree<Class<?>, String> leaf = tree.getLeaf(Object.class, Object.class, String.class, Integer.class);
assertNull(leaf);
leaf = tree.getLeaf(Object.class, Object.class);
assertNotNull(leaf);
assertEquals("xo", leaf.getValue());
} }
} }

View File

@ -24,15 +24,12 @@ import org.junit.Test;
* @author bennidi * @author bennidi
* Date: 2/8/12 * Date: 2/8/12
*/ */
public abstract class SyncBusTest extends MessageBusTest { public class SyncBusTest extends MessageBusTest {
protected abstract IMessageBus getSyncMessageBus();
@Test @Test
public void testSynchronousMessagePublication() throws Exception { public void testSynchronousMessagePublication() throws Exception {
final IMessageBus bus = getSyncMessageBus(); final IMessageBus bus = new MBassador().start();
ListenerFactory listeners = new ListenerFactory() ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, IMessageListener.DefaultListener.class) .create(InstancesPerListener, IMessageListener.DefaultListener.class)
.create(InstancesPerListener, IMessageListener.DisabledListener.class) .create(InstancesPerListener, IMessageListener.DisabledListener.class)
@ -69,6 +66,8 @@ public abstract class SyncBusTest extends MessageBusTest {
assertEquals(InstancesPerListener * ConcurrentUnits, MessageTypes.Multipart.getTimesHandled(IMessageListener.DefaultListener.class)); assertEquals(InstancesPerListener * ConcurrentUnits, MessageTypes.Multipart.getTimesHandled(IMessageListener.DefaultListener.class));
assertEquals(InstancesPerListener * ConcurrentUnits, MessageTypes.Simple.getTimesHandled(MessagesListener.DefaultListener.class)); assertEquals(InstancesPerListener * ConcurrentUnits, MessageTypes.Simple.getTimesHandled(MessagesListener.DefaultListener.class));
assertEquals(InstancesPerListener * ConcurrentUnits, MessageTypes.Multipart.getTimesHandled(MessagesListener.DefaultListener.class)); assertEquals(InstancesPerListener * ConcurrentUnits, MessageTypes.Multipart.getTimesHandled(MessagesListener.DefaultListener.class));
bus.shutdown();
} }
@ -83,7 +82,7 @@ public abstract class SyncBusTest extends MessageBusTest {
} }
}; };
final IMessageBus bus = getSyncMessageBus(); final IMessageBus bus = new MBassador().start();
bus.addErrorHandler(ExceptionCounter); bus.addErrorHandler(ExceptionCounter);
ListenerFactory listeners = new ListenerFactory() ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, ExceptionThrowingListener.class); .create(InstancesPerListener, ExceptionThrowingListener.class);
@ -105,19 +104,11 @@ public abstract class SyncBusTest extends MessageBusTest {
// multi threaded // multi threaded
ConcurrentExecutor.runConcurrent(publish, ConcurrentUnits); ConcurrentExecutor.runConcurrent(publish, ConcurrentUnits);
assertEquals(InstancesPerListener * ConcurrentUnits, exceptionCount.get()); assertEquals(InstancesPerListener * ConcurrentUnits, exceptionCount.get());
bus.shutdown();
} }
public static class MBassadorTest extends SyncBusTest {
@Override
protected IMessageBus getSyncMessageBus() {
return new MBassador().start();
}
}
static class IncrementingMessage{ static class IncrementingMessage{
private int count = 1; private int count = 1;

View File

@ -1,5 +1,7 @@
package net.engio.mbassy.common; package net.engio.mbassy.common;
import java.lang.ref.WeakReference;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -8,8 +10,6 @@ import org.junit.rules.TestName;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.lang.ref.WeakReference;
/** /**
* @author bennidi * @author bennidi
*/ */
@ -25,13 +25,13 @@ public class AssertSupport {
@Before @Before
public void beforeTest(){ public void beforeTest(){
logger.info("Running test " + getTestName()); this.logger.info("Running test " + getTestName());
testExecutionStart = System.currentTimeMillis(); this.testExecutionStart = System.currentTimeMillis();
} }
@After @After
public void afterTest(){ public void afterTest(){
logger.info(String.format("Finished " + getTestName() + ": " + (System.currentTimeMillis() - testExecutionStart) + " ms")); this.logger.info(String.format("Finished " + getTestName() + ": " + (System.currentTimeMillis() - this.testExecutionStart) + " ms"));
} }
@ -48,15 +48,14 @@ public class AssertSupport {
} }
public String getTestName(){ public String getTestName(){
return getClass().getSimpleName() + "." + name.getMethodName(); return getClass().getSimpleName() + "." + this.name.getMethodName();
} }
public void runGC() { public void runGC() {
WeakReference ref = new WeakReference<Object>(new Object()); WeakReference ref = new WeakReference<Object>(new Object());
pause(100);
while(ref.get() != null) { while(ref.get() != null) {
pause(10); this.runtime.gc();
runtime.gc(); pause();
} }
} }

View File

@ -33,8 +33,6 @@ public abstract class MessageBusTest extends AssertSupport {
} }
}; };
private static final Object mapObject = new Object();
@Before @Before
public void setUp(){ public void setUp(){
for(MessageTypes mes : MessageTypes.values()) { for(MessageTypes mes : MessageTypes.values()) {