Changed to single access for sub/unsub (now passes unit tests). Changed to ConcurrentHashMapV8, 174/87 ns/op

This commit is contained in:
nathan 2015-02-24 14:26:32 +01:00
parent 4eef758e89
commit b8f06b7c33
19 changed files with 4715 additions and 223 deletions

View File

@ -1,10 +1,8 @@
package net.engio.mbassy.multi;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
@ -15,7 +13,6 @@ import net.engio.mbassy.multi.common.TransferQueue;
import net.engio.mbassy.multi.error.IPublicationErrorHandler;
import net.engio.mbassy.multi.error.PublicationError;
import net.engio.mbassy.multi.subscription.Subscription;
import net.engio.mbassy.multi.subscription.SubscriptionManager;
/**
* The base class for all message bus implementations with support for asynchronous message dispatch
@ -30,12 +27,16 @@ public class MultiMBassador implements IMessageBus {
// this handler will receive all errors that occur during message dispatch or message handling
private final Collection<IPublicationErrorHandler> errorHandlers = new ArrayDeque<IPublicationErrorHandler>();
private final SubscriptionManager subscriptionManager;
private final TransferQueue<Runnable> dispatchQueue = new LinkedTransferQueue<Runnable>();
private final SubscriptionManager subscriptionManager;
// all threads that are available for asynchronous message dispatching
private List<Thread> threads;
private final int numberOfThreads;
private final Collection<Thread> threads;
public MultiMBassador() {
this(Runtime.getRuntime().availableProcessors());
@ -46,9 +47,10 @@ public class MultiMBassador implements IMessageBus {
if (numberOfThreads < 1) {
numberOfThreads = 1; // at LEAST 1 thread
}
this.numberOfThreads = numberOfThreads;
this.subscriptionManager = new SubscriptionManager(numberOfThreads);
this.threads = new ArrayList<Thread>(numberOfThreads);
this.threads = new ArrayDeque<Thread>(numberOfThreads);
NamedThreadFactory dispatchThreadFactory = new NamedThreadFactory("MessageBus");
for (int i = 0; i < numberOfThreads; i++) {
@ -57,7 +59,7 @@ public class MultiMBassador implements IMessageBus {
@SuppressWarnings("null")
@Override
public void run() {
TransferQueue<Runnable> IN_QUEUE= MultiMBassador.this.dispatchQueue;
TransferQueue<Runnable> IN_QUEUE = MultiMBassador.this.dispatchQueue;
Runnable event = null;
int counter;
@ -88,6 +90,8 @@ public class MultiMBassador implements IMessageBus {
this.threads.add(runner);
runner.start();
}
}
@Override
@ -105,19 +109,19 @@ public class MultiMBassador implements IMessageBus {
}
@Override
public void unsubscribe(Object listener) {
this.subscriptionManager.unsubscribe(listener);
public void subscribe(final Object listener) {
MultiMBassador.this.subscriptionManager.subscribe(listener);
}
@Override
public void subscribe(Object listener) {
this.subscriptionManager.subscribe(listener);
public void unsubscribe(final Object listener) {
MultiMBassador.this.subscriptionManager.unsubscribe(listener);
}
@Override
public boolean hasPendingMessages() {
return !this.dispatchQueue.isEmpty();
return this.dispatchQueue.getWaitingConsumerCount() != this.numberOfThreads;
}
@Override

View File

@ -1,17 +1,18 @@
package net.engio.mbassy.multi.subscription;
package net.engio.mbassy.multi;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import net.engio.mbassy.multi.common.ConcurrentHashMapV8;
import net.engio.mbassy.multi.common.IdentityObjectTree;
import net.engio.mbassy.multi.common.ReflectionUtils;
import net.engio.mbassy.multi.common.StrongConcurrentSet;
import net.engio.mbassy.multi.listener.MessageHandler;
import net.engio.mbassy.multi.listener.MetadataReader;
import sun.reflect.generics.reflectiveObjects.NotImplementedException;
import net.engio.mbassy.multi.subscription.Subscription;
/**
* The subscription managers responsibility is to consistently handle and synchronize the message listener subscription process.
@ -19,31 +20,43 @@ import sun.reflect.generics.reflectiveObjects.NotImplementedException;
* listener is subscribed and takes care of creating new set of subscriptions for any unknown class that defines
* message handlers.
*
*
* Subscribe/Unsubscribe, while it is possible for them to be 100% concurrent (in relation to listeners per subscription),
* getting an accurate reflection of the number of subscriptions, or guaranteeing a "HAPPENS-BEFORE" relationship really
* complicates this.
*
* Given these restrictions and complexity, it is much easier to create a MPSC blocking queue, and have a single thread
* manage sub/unsub.
*
* @author bennidi
* Date: 5/11/13
* @author dorkbox, llc
* Date: 2/2/15
*/
public class SubscriptionManager {
private final int MAP_STRIPING;
private final int STRIPE_SIZE;
private float LOAD_FACTOR;
// the metadata reader that is used to inspect objects passed to the subscribe method
private static final MetadataReader metadataReader = new MetadataReader();
// remember already processed classes that do not contain any message handlers
private final Map<Class<?>, Boolean> nonListeners;
// all subscriptions per message type. We perpetually KEEP the types, as this lowers the amount of locking required
// this is the primary list for dispatching a specific message
// write access is synchronized and happens only when a listener of a specific class is registered the first time
private final ConcurrentHashMap<Class<?>, Collection<Subscription>> subscriptionsPerMessageSingle;
private final Map<Class<?>, Collection<Subscription>> subscriptionsPerMessageSingle;
private final IdentityObjectTree<Class<?>, Collection<Subscription>> subscriptionsPerMessageMulti;
// synchronize read/write access to the subscription maps
private final ReentrantLock lock = new ReentrantLock();
// all subscriptions per messageHandler type
// this map provides fast access for subscribing and unsubscribing
// write access is synchronized and happens very infrequently
// once a collection of subscriptions is stored it does not change
private final ConcurrentHashMap<Class<?>, Collection<Subscription>> subscriptionsPerListener;
private final Object holder = new Object[0];
private final Map<Class<?>, Collection<Subscription>> subscriptionsPerListener;
private final Map<Class<?>, Set<Class<?>>> superClassesCache;
@ -54,62 +67,36 @@ public class SubscriptionManager {
// private final IdentityObjectTree<Class<?>, Collection<Subscription>> superClassSubscriptionsMulti = new IdentityObjectTree<Class<?>, Collection<Subscription>>();
// remember already processed classes that do not contain any message handlers
private final Map<Class<?>, Object> nonListeners;
public SubscriptionManager(int numberOfThreads) {
this.MAP_STRIPING = numberOfThreads;
SubscriptionManager(int numberOfThreads) {
this.STRIPE_SIZE = numberOfThreads;
this.LOAD_FACTOR = 0.8f;
this.subscriptionsPerMessageSingle = new ConcurrentHashMap<Class<?>, Collection<Subscription>>(4, this.LOAD_FACTOR, this.MAP_STRIPING);
this.nonListeners = new ConcurrentHashMapV8<Class<?>, Boolean>(4, 0.8F, this.STRIPE_SIZE);
this.subscriptionsPerMessageSingle = new ConcurrentHashMapV8<Class<?>, Collection<Subscription>>(4, this.LOAD_FACTOR);
this.subscriptionsPerMessageMulti = new IdentityObjectTree<Class<?>, Collection<Subscription>>();
// only used during SUB/UNSUB
this.subscriptionsPerListener = new ConcurrentHashMap<Class<?>, Collection<Subscription>>(4, this.LOAD_FACTOR, this.MAP_STRIPING);
this.subscriptionsPerListener = new ConcurrentHashMapV8<Class<?>, Collection<Subscription>>(4, this.LOAD_FACTOR);
this.superClassesCache = new ConcurrentHashMap<Class<?>, Set<Class<?>>>(8, this.LOAD_FACTOR, this.MAP_STRIPING);
this.superClassesCache = new ConcurrentHashMapV8<Class<?>, Set<Class<?>>>(8, this.LOAD_FACTOR, this.STRIPE_SIZE);
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but improves performance on handlers
this.superClassSubscriptions = new ConcurrentHashMap<Class<?>, Set<Subscription>>(8, this.LOAD_FACTOR, this.MAP_STRIPING);
this.nonListeners = new ConcurrentHashMap<Class<?>, Object>(4, this.LOAD_FACTOR, this.MAP_STRIPING);
this.superClassSubscriptions = new ConcurrentHashMapV8<Class<?>, Set<Subscription>>(8, this.LOAD_FACTOR, this.STRIPE_SIZE);
}
private final void resetSuperClassSubs() {
this.superClassSubscriptions.clear();
}
public void unsubscribe(Object listener) {
/**
* Can ONLY be called by a single thread, in order to guarantee a "happens-before" relationship to subscriptions
*/
public void subscribe(Object listener) {
if (listener == null) {
return;
}
Class<?> listenerClass = listener.getClass();
// these are a concurrent collection
Collection<Subscription> subscriptions = this.subscriptionsPerListener.get(listenerClass);
if (subscriptions != null) {
for (Subscription subscription : subscriptions) {
subscription.unsubscribe(listener);
// purposefully DO NOT do anything else. We keep references to Class<?>/subscription, because
// it acts as a "cache" of sorts, so that future add operations are quicker.
}
}
return;
}
private final ThreadLocal<Collection<Subscription>> subInitialValue = new ThreadLocal<Collection<Subscription>>() {
@Override
protected java.util.Collection<Subscription> initialValue() {
return new StrongConcurrentSet<Subscription>(8, SubscriptionManager.this.LOAD_FACTOR);
}
};
// when a class is subscribed, the registrations for that class are permanent in the "subscriptionsPerListener"?
public void subscribe(Object listener) {
Class<?> listenerClass = listener.getClass();
if (this.nonListeners.containsKey(listenerClass)) {
@ -117,72 +104,76 @@ public class SubscriptionManager {
return;
}
Collection<Subscription> subscriptions = this.subscriptionsPerListener.get(listenerClass);
if (subscriptions == null) {
// a listener is subscribed for the first time
Collection<MessageHandler> messageHandlers = SubscriptionManager.metadataReader.getMessageListener(listenerClass).getHandlers();
int handlersSize = messageHandlers.size();
if (handlersSize == 0) {
// remember the class as non listening class if no handlers are found
this.nonListeners.put(listenerClass, this.holder);
} else {
Collection<Subscription> putIfAbsent = this.subscriptionsPerListener.putIfAbsent(listenerClass, this.subInitialValue.get());
if (putIfAbsent != null) {
subscriptions = putIfAbsent;
} else {
subscriptions = this.subInitialValue.get();
this.subInitialValue.set(new StrongConcurrentSet<Subscription>(8, this.LOAD_FACTOR));
}
this.lock.lock();
try {
Collection<Subscription> subscriptions = this.subscriptionsPerListener.get(listenerClass);
if (subscriptions == null) {
// a listener is subscribed for the first time
resetSuperClassSubs();
// create NEW subscriptions for all detected message handlers
for (MessageHandler messageHandler : messageHandlers) {
// create the subscription
Subscription subscription = new Subscription(messageHandler);
subscription.subscribe(listener);
Collection<MessageHandler> messageHandlers = SubscriptionManager.metadataReader.getMessageListener(listenerClass).getHandlers();
int handlersSize = messageHandlers.size();
subscriptions.add(subscription);
if (handlersSize == 0) {
// remember the class as non listening class if no handlers are found
this.nonListeners.put(listener.getClass(), Boolean.TRUE);
return;
} else {
subscriptions = new StrongConcurrentSet<Subscription>(messageHandlers.size(), this.LOAD_FACTOR);
//
// save the subscription per message type
//
Class<?>[] handledMessageTypes = subscription.getHandledMessageTypes();
int size = handledMessageTypes.length;
boolean acceptsSubtypes = subscription.acceptsSubtypes();
// create NEW subscriptions for all detected message handlers
for (MessageHandler messageHandler : messageHandlers) {
// create the subscription
Subscription subscription = new Subscription(messageHandler);
subscription.subscribe(listener);
if (size == 1) {
// single
Class<?> clazz = handledMessageTypes[0];
subscriptions.add(subscription);
Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(clazz);
if (subs == null) {
putIfAbsent = this.subscriptionsPerMessageSingle.putIfAbsent(clazz, this.subInitialValue.get());
if (putIfAbsent != null) {
subs = putIfAbsent;
} else {
subs = this.subInitialValue.get();
this.subInitialValue.set(new StrongConcurrentSet<Subscription>(8, this.LOAD_FACTOR));
// now add this subscription to each of the handled types
Class<?>[] handledMessageTypes = subscription.getHandledMessageTypes();
int size = handledMessageTypes.length;
if (size == 1) {
// single
Class<?> clazz = handledMessageTypes[0];
Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(clazz);
if (subs == null || subs.isEmpty()) {
subs = new StrongConcurrentSet<Subscription>(8, this.LOAD_FACTOR);
this.subscriptionsPerMessageSingle.put(clazz, subs);
}
}
subs.add(subscription);
if (acceptsSubtypes) {
// race conditions will result in duplicate answers, which we don't care about
subs.add(subscription);
setupSuperClassCache(clazz);
} else {
// multiversion
}
}
// order is critical for safe publication
this.subscriptionsPerListener.put(listenerClass, subscriptions);
}
} else {
// subscriptions already exist and must only be updated
for (Subscription subscription : subscriptions) {
subscription.subscribe(listener);
}
}
} else {
// subscriptions already exist and must only be updated
for (Subscription subscription : subscriptions) {
subscription.subscribe(listener);
}
} finally {
this.lock.unlock();
}
// } else {
// // subscriptions already exist and must only be updated
// for (Subscription subscription : subscriptions) {
// subscription.subscribe(listener);
// }
// }
//
// if (subscriptions != null) {
// // subscriptions already exist and must only be updated
@ -293,6 +284,85 @@ public class SubscriptionManager {
// }
}
/**
* Can ONLY be called by a single thread, in order to guarantee a "happens-before" relationship to subscriptions
*/
public final void unsubscribe(Object listener) {
if (listener == null) {
return;
}
Class<?> listenerClass = listener.getClass();
if (this.nonListeners.containsKey(listenerClass)) {
// early reject of known classes that do not define message handlers
return;
}
resetSuperClassSubs();
this.lock.lock();
try {
// these are a concurrent collection
Collection<Subscription> subscriptions = this.subscriptionsPerListener.get(listenerClass);
if (subscriptions != null) {
for (Subscription subscription : subscriptions) {
subscription.unsubscribe(listener); // this is thread safe, but the following stuff is NOT thread safe.
boolean isEmpty = subscription.isEmpty();
if (isEmpty) {
// single or multi?
Class<?>[] handledMessageTypes = subscription.getHandledMessageTypes();
int size = handledMessageTypes.length;
if (size == 1) {
// single
Class<?> clazz = handledMessageTypes[0];
Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(clazz);
if (subs != null) {
subs.remove(subscription);
if (subs.isEmpty()) {
// remove element
this.subscriptionsPerMessageSingle.remove(clazz);
}
}
} else {
// // NOTE: Not thread-safe! must be synchronized in outer scope
// IdentityObjectTree<Class<?>, Collection<Subscription>> tree;
//
// switch (size) {
// case 2: tree = this.subscriptionsPerMessageMulti.getLeaf(handledMessageTypes[0], handledMessageTypes[1]); break;
// case 3: tree = this.subscriptionsPerMessageMulti.getLeaf(handledMessageTypes[1], handledMessageTypes[1], handledMessageTypes[2]); break;
// default: tree = this.subscriptionsPerMessageMulti.getLeaf(handledMessageTypes); break;
// }
//
// if (tree != null) {
// Collection<Subscription> subs = tree.getValue();
// if (subs != null) {
// subs.remove(subscription);
//
// if (subs.isEmpty()) {
// // remove tree element
// switch (size) {
// case 2: this.subscriptionsPerMessageMulti.remove(handledMessageTypes[0], handledMessageTypes[1]); break;
// case 3: this.subscriptionsPerMessageMulti.remove(handledMessageTypes[1], handledMessageTypes[1], handledMessageTypes[2]); break;
// default: this.subscriptionsPerMessageMulti.remove(handledMessageTypes); break;
// }
// }
// }
// }
}
}
}
}
} finally {
this.lock.unlock();
}
}
// CAN RETURN NULL
public final Collection<Subscription> getSubscriptionsByMessageType(Class<?> messageType) {
return this.subscriptionsPerMessageSingle.get(messageType);
@ -323,11 +393,10 @@ public class SubscriptionManager {
if (subsPerType == null) {
// this caches our class hierarchy. This is never cleared.
Set<Class<?>> types = this.superClassesCache.get(superType);
if (types == null || types.isEmpty()) {
Collection<Class<?>> types = setupSuperClassCache(superType);
if (types.isEmpty()) {
return null;
}
subsPerType = new StrongConcurrentSet<Subscription>(types.size() + 1, this.LOAD_FACTOR);
Iterator<Class<?>> iterator = types.iterator();
@ -474,11 +543,13 @@ public class SubscriptionManager {
/**
* race conditions will result in duplicate answers, which we don't care if happens
* never returns null
*/
private void setupSuperClassCache(Class<?> clazz) {
if (!this.superClassesCache.containsKey(clazz)) {
private Collection<Class<?>> setupSuperClassCache(Class<?> clazz) {
Collection<Class<?>> superTypes = this.superClassesCache.get(clazz);
if (superTypes == null) {
// it doesn't matter if concurrent access stomps on values, since they are always the same.
Set<Class<?>> superTypes = ReflectionUtils.getSuperTypes(clazz);
superTypes = ReflectionUtils.getSuperTypes(clazz);
StrongConcurrentSet<Class<?>> set = new StrongConcurrentSet<Class<?>>(superTypes.size() + 1, this.LOAD_FACTOR);
for (Class<?> c : superTypes) {
set.add(c);
@ -487,53 +558,7 @@ public class SubscriptionManager {
// race conditions will result in duplicate answers, which we don't care about
this.superClassesCache.put(clazz, set);
}
}
public static class SuperClassIterator implements Iterator<Class<?>> {
private final Iterator<Class<?>> iterator;
private Class<?> clazz;
public SuperClassIterator(Class<?> clazz, Collection<Class<?>> types) {
this.clazz = clazz;
if (types != null) {
this.iterator = types.iterator();
} else {
this.iterator = null;
}
}
@Override
public boolean hasNext() {
if (this.clazz != null) {
return true;
}
if (this.iterator != null) {
return this.iterator.hasNext();
}
return false;
}
@Override
public Class<?> next() {
if (this.clazz != null) {
Class<?> clazz2 = this.clazz;
this.clazz = null;
return clazz2;
}
if (this.iterator != null) {
return this.iterator.next();
}
return null;
}
@Override
public void remove() {
throw new NotImplementedException();
}
return superTypes;
}
}

View File

@ -3,6 +3,7 @@ package net.engio.mbassy.multi.common;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import sun.reflect.generics.reflectiveObjects.NotImplementedException;
@ -19,6 +20,8 @@ import com.googlecode.concurentlocks.ReentrantReadWriteUpdateLock;
* Date: 2/12/12
*/
public abstract class AbstractConcurrentSet<T> implements Set<T> {
private static final AtomicLong id = new AtomicLong();
private final long ID = id.getAndIncrement();
// Internal state
protected final transient ReentrantReadWriteUpdateLock lock = new ReentrantReadWriteUpdateLock();
@ -135,13 +138,12 @@ public abstract class AbstractConcurrentSet<T> implements Set<T> {
@Override
public Object[] toArray() {
throw new NotImplementedException();
return this.entries.entrySet().toArray();
}
@SuppressWarnings("hiding")
@Override
public <T> T[] toArray(T[] a) {
throw new NotImplementedException();
return this.entries.entrySet().toArray(a);
}
@Override
@ -164,6 +166,32 @@ public abstract class AbstractConcurrentSet<T> implements Set<T> {
throw new NotImplementedException();
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + (int) (this.ID ^ this.ID >>> 32);
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
AbstractConcurrentSet other = (AbstractConcurrentSet) obj;
if (this.ID != other.ID) {
return false;
}
return true;
}
public abstract static class Entry<T> implements ISetEntry<T> {

File diff suppressed because it is too large Load Diff

View File

@ -21,12 +21,12 @@ public class IdentityObjectTree<KEY, VALUE> {
public IdentityObjectTree() {
}
// can be overridded to provide a custom backing map
/**
* can be overridden to provide a custom backing map
*/
protected Map<KEY, IdentityObjectTree<KEY, VALUE>> createChildren() {
//TODO: this needs to be concurrent?!?!?
return new ConcurrentHashMap<KEY, IdentityObjectTree<KEY, VALUE>>(2, 0.75f, 1);
// return new Reference2ReferenceOpenHashMap<KEY, IdentityObjectTree<KEY, VALUE>>(2, 0.75f);
}
}
public VALUE getValue() {
VALUE returnValue = this.value;

View File

@ -76,11 +76,16 @@ public class NamedThreadFactory implements ThreadFactory {
stringBuilder.append('-');
stringBuilder.append(this.threadID.getAndIncrement());
// stack size is arbitrary based on JVM implementation. Default is 0
return newThread(stringBuilder.toString(), r);
}
public Thread newThread(String name, Runnable r) {
// stack size is arbitrary based on JVM implementation. Default is 0
// 8k is the size of the android stack. Depending on the version of android, this can either change, or will always be 8k
// To be honest, 8k is pretty reasonable for an asynchronous/event based system (32bit) or 16k (64bit)
// Setting the size MAY or MAY NOT have any effect!!!
Thread t = new Thread(this.group, r, stringBuilder.toString(), NamedThreadFactory.stackSizeForThreads);
Thread t = new Thread(this.group, r, name, NamedThreadFactory.stackSizeForThreads);
t.setDaemon(true);// FORCE these threads to finish before allowing the JVM to exit
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);

View File

@ -1,6 +1,5 @@
package net.engio.mbassy.multi.common;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
/**
* This implementation uses strong references to the elements.
@ -17,7 +16,7 @@ public class StrongConcurrentSet<T> extends AbstractConcurrentSet<T> {
}
public StrongConcurrentSet(int size, float loadFactor) {
super(new ConcurrentHashMap<T, ISetEntry<T>>(size, loadFactor, 1));
super(new ConcurrentHashMapV8<T, ISetEntry<T>>(size, loadFactor));
}
@Override

View File

@ -61,6 +61,7 @@ public class MessageHandler {
return this.isSynchronized;
}
// only in unit test
public boolean isFromListener(Class<?> listener){
return this.listenerConfig.isFromListener(listener);
}

View File

@ -28,15 +28,11 @@ public class MessageListener {
this.listenerDefinition = listenerDefinition;
}
// only in unit test
public boolean isFromListener(Class<?> listener) {
return this.listenerDefinition.equals(listener);
}
public MessageListener addHandlers(Collection<? extends MessageHandler> c) {
this.handlers.addAll(c);
return this;
}
public boolean addHandler(MessageHandler messageHandler) {
return this.handlers.add(messageHandler);
}
@ -44,8 +40,4 @@ public class MessageListener {
public Collection<MessageHandler> getHandlers() {
return this.handlers;
}
public Class<?> getListerDefinition() {
return this.listenerDefinition;
}
}

View File

@ -0,0 +1,47 @@
package net.engio.mbassy.multi.subscription;
import net.engio.mbassy.multi.PubSubSupport;
import com.lmax.disruptor.WorkHandler;
/**
* @author dorkbox, llc
* Date: 2/2/15
*/
public class EventProcessor implements WorkHandler<MessageHolder> {
private final PubSubSupport publisher;
public EventProcessor(PubSubSupport publisher) {
this.publisher = publisher;
}
@Override
public void onEvent(MessageHolder event) throws Exception {
MessageType messageType = event.messageType;
switch (messageType) {
case ONE: {
this.publisher.publish(event.message1);
event.message1 = null; // cleanup
return;
}
case TWO: {
this.publisher.publish(event.message1, event.message2);
event.message1 = null; // cleanup
event.message2 = null; // cleanup
return;
}
case THREE: {
this.publisher.publish(event.message1, event.message2, event.message3);
event.message1 = null; // cleanup
event.message2 = null; // cleanup
event.message3 = null; // cleanup
return;
}
case ARRAY: {
this.publisher.publish(event.messages);
event.messages = null; // cleanup
return;
}
}
}
}

View File

@ -0,0 +1,18 @@
package net.engio.mbassy.multi.subscription;
/**
* @author dorkbox, llc
* Date: 2/2/15
*/
public class MessageHolder {
public MessageType messageType = MessageType.ONE;
public Object message1 = null;
public Object message2 = null;
public Object message3 = null;
public Object[] messages = null;
public MessageHolder() {
}
}

View File

@ -0,0 +1,8 @@
package net.engio.mbassy.multi.subscription;
/**
* @author dorkbox, llc
* Date: 2/2/15
*/
public enum MessageType {
ONE, TWO, THREE, ARRAY
}

View File

@ -38,9 +38,9 @@ public class Subscription {
private final MessageHandler handlerMetadata;
private final IHandlerInvocation invocation;
private final Collection<Object> listeners;
private final StrongConcurrentSet<Object> listeners;
Subscription(MessageHandler handler) {
public Subscription(MessageHandler handler) {
this.handlerMetadata = handler;
this.listeners = new StrongConcurrentSet<Object>();
@ -60,6 +60,10 @@ public class Subscription {
return this.handlerMetadata.acceptsSubtypes();
}
public boolean isEmpty() {
return this.listeners.isEmpty();
}
public void subscribe(Object listener) {
this.listeners.add(listener);
}
@ -96,38 +100,34 @@ public class Subscription {
try {
invocation.invoke(listener, handler, handleIndex, message);
} catch (IllegalAccessException e) {
e.printStackTrace();
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
"The class or method is not accessible")
.setMessage("Error during invocation of message handler. " +
"The class or method is not accessible")
.setCause(e)
.setMethodName(handler.getMethodNames()[handleIndex])
.setListener(listener)
.setPublishedObject(message));
} catch (IllegalArgumentException e) {
e.printStackTrace();
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
"Wrong arguments passed to method. Was: " + message.getClass()
+ "Expected: " + handler.getParameterTypes()[0])
.setMessage("Error during invocation of message handler. " +
"Wrong arguments passed to method. Was: " + message.getClass()
+ "Expected: " + handler.getParameterTypes()[0])
.setCause(e)
.setMethodName(handler.getMethodNames()[handleIndex])
.setListener(listener)
.setPublishedObject(message));
} catch (InvocationTargetException e) {
e.printStackTrace();
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
"Message handler threw exception")
.setMessage("Error during invocation of message handler. " +
"Message handler threw exception")
.setCause(e)
.setMethodName(handler.getMethodNames()[handleIndex])
.setListener(listener)
.setPublishedObject(message));
} catch (Throwable e) {
e.printStackTrace();
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
"The handler code threw an exception")
.setMessage("Error during invocation of message handler. " +
"The handler code threw an exception")
.setCause(e)
.setMethodName(handler.getMethodNames()[handleIndex])
.setListener(listener)

View File

@ -2,7 +2,6 @@ package net.engio.mbassy.multi;
import java.util.concurrent.atomic.AtomicInteger;
import net.engio.mbassy.multi.MultiMBassador;
import net.engio.mbassy.multi.common.ConcurrentExecutor;
import net.engio.mbassy.multi.common.ListenerFactory;
import net.engio.mbassy.multi.common.MessageBusTest;
@ -77,19 +76,22 @@ public class MBassadorTest extends MessageBusTest {
Runnable publishAndCheck = new Runnable() {
@Override
public void run() {
bus.publishAsync(MessageTypes.Simple);
}
};
ConcurrentExecutor.runConcurrent(publishAndCheck, 1);
messageManager.waitForMessages(processingTimeInMS);
while (bus.hasPendingMessages()) {
pause(10);
}
MessageTypes.resetAll();
ConcurrentExecutor.runConcurrent(publishAndCheck, ConcurrentUnits);
messageManager.waitForMessages(processingTimeInMS);
while (bus.hasPendingMessages()) {
pause(10);
}
messageManager.waitForMessages(600);
}
@ -119,14 +121,18 @@ public class MBassadorTest extends MessageBusTest {
// single threaded
ConcurrentExecutor.runConcurrent(publishAndCheck, 1);
pause(processingTimeInMS);
while (bus.hasPendingMessages()) {
pause(10);
}
assertEquals(InstancesPerListener, exceptionCount.get());
// multi threaded
exceptionCount.set(0);
ConcurrentExecutor.runConcurrent(publishAndCheck, ConcurrentUnits);
pause(processingTimeInMS);
while (bus.hasPendingMessages()) {
pause(10);
}
assertEquals(InstancesPerListener * ConcurrentUnits, exceptionCount.get());
bus.shutdown();

View File

@ -1,6 +1,5 @@
package net.engio.mbassy.multi;
import net.engio.mbassy.multi.IMessageBus;
import net.engio.mbassy.multi.annotations.Handler;
import net.engio.mbassy.multi.common.MessageBusTest;

View File

@ -20,7 +20,6 @@ import net.engio.mbassy.multi.messages.IMultipartMessage;
import net.engio.mbassy.multi.messages.MessageTypes;
import net.engio.mbassy.multi.messages.MultipartMessage;
import net.engio.mbassy.multi.messages.StandardMessage;
import net.engio.mbassy.multi.subscription.SubscriptionManager;
import org.junit.Test;
@ -162,8 +161,8 @@ public class SubscriptionManagerTest extends AssertSupport {
Overloading.ListenerBase.class,
Overloading.ListenerSub.class);
SubscriptionManager subscriptionManager = new SubscriptionManager(ConcurrentUnits);
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), ConcurrentUnits);
SubscriptionManager subscriptionManager = new SubscriptionManager(1);
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), 1);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners)
.listener(Overloading.ListenerBase.class).handles(Overloading.TestMessageA.class, Overloading.TestMessageA.class)
@ -181,13 +180,13 @@ public class SubscriptionManagerTest extends AssertSupport {
}
private void runTestWith(final ListenerFactory listeners, final SubscriptionValidator validator){
final SubscriptionManager subscriptionManager = new SubscriptionManager(ConcurrentUnits);
final SubscriptionManager subscriptionManager = new SubscriptionManager(1);
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), ConcurrentUnits);
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), 1);
validator.validate(subscriptionManager);
ConcurrentExecutor.runConcurrent(TestUtil.unsubscriber(subscriptionManager, listeners), ConcurrentUnits);
ConcurrentExecutor.runConcurrent(TestUtil.unsubscriber(subscriptionManager, listeners), 1);
listeners.clear();
validator.clear();

View File

@ -20,7 +20,6 @@ public abstract class MessageBusTest extends AssertSupport {
// this value probably needs to be adjusted depending on the performance of the underlying plattform
// otherwise the tests will fail since asynchronous processing might not have finished when
// evaluation is run
protected static final int processingTimeInMS = 6000;
protected static final int InstancesPerListener = 5000;
protected static final int ConcurrentUnits = 10;
protected static final int IterationsPerThread = 100;

View File

@ -1,13 +1,14 @@
package net.engio.mbassy.multi.common;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import net.engio.mbassy.multi.SubscriptionManager;
import net.engio.mbassy.multi.subscription.Subscription;
import net.engio.mbassy.multi.subscription.SubscriptionManager;
/**
*
@ -39,13 +40,24 @@ public class SubscriptionValidator extends AssertSupport{
// for each tuple of subscriber and message type the specified number of listeners must exist
public void validate(SubscriptionManager manager) {
for (Class<?> messageType : this.messageTypes) {
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageType);
Collection<ValidationEntry> validationEntries = getEntries(messageType);
assertEquals(subscriptions.size(), validationEntries.size());
// we split subs + superSubs into TWO calls.
Collection<Subscription> collection = new ArrayDeque<Subscription>(8);
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageType);
if (subscriptions != null) {
collection.addAll(subscriptions);
}
Collection<Subscription> superSubs = manager.getSuperSubscriptions(messageType);
if (superSubs != null) {
collection.addAll(superSubs);
}
assertEquals(validationEntries.size(), collection.size());
for(ValidationEntry validationValidationEntry : validationEntries){
Subscription matchingSub = null;
// one of the subscriptions must belong to the subscriber type
for(Subscription sub : subscriptions){
for(Subscription sub : collection){
if(sub.belongsTo(validationValidationEntry.subscriber)){
matchingSub = sub;
break;

View File

@ -1,12 +1,12 @@
package net.engio.mbassy.multi.common;
import net.engio.mbassy.multi.MultiMBassador;
import net.engio.mbassy.multi.PubSubSupport;
import net.engio.mbassy.multi.subscription.SubscriptionManager;
import java.util.Iterator;
import java.util.List;
import net.engio.mbassy.multi.MultiMBassador;
import net.engio.mbassy.multi.PubSubSupport;
import net.engio.mbassy.multi.SubscriptionManager;
/**
* Todo: Add javadoc
*
@ -81,15 +81,16 @@ public class TestUtil {
for(int i = 0; i < numberOfThreads; i++){
final int partitionStart = i * partitionSize;
final int partitionEnd = (i+1 < numberOfThreads)
final int partitionEnd = i+1 < numberOfThreads
? partitionStart + partitionSize + 1
: listeners.size();
setupUnits[i] = new Runnable() {
private List<Object> listenerSubset = listeners.subList(partitionStart, partitionEnd);
@Override
public void run() {
for(Object listener : listenerSubset){
for(Object listener : this.listenerSubset){
bus.subscribe(listener);
}
}