it's slower, not sure if correct direction

This commit is contained in:
nathan 2015-02-16 23:37:41 +01:00
parent d1338ad46c
commit a91b3af27f
13 changed files with 598 additions and 494 deletions

View File

@ -1,5 +1,6 @@
package net.engio.mbassy.multi; package net.engio.mbassy.multi;
import java.lang.reflect.Array;
import java.util.Collection; import java.util.Collection;
import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.LockSupport;
@ -18,11 +19,11 @@ public class DispatchRunnable implements Runnable {
private ErrorHandlingSupport errorHandler; private ErrorHandlingSupport errorHandler;
private TransferQueue<Object> dispatchQueue; private TransferQueue<Object> dispatchQueue;
private TransferQueue<Runnable> invokeQueue; private TransferQueue<SubRunnable> invokeQueue;
private SubscriptionManager manager; private SubscriptionManager manager;
public DispatchRunnable(ErrorHandlingSupport errorHandler, SubscriptionManager subscriptionManager, public DispatchRunnable(ErrorHandlingSupport errorHandler, SubscriptionManager subscriptionManager,
TransferQueue<Object> dispatchQueue, TransferQueue<Runnable> invokeQueue) { TransferQueue<Object> dispatchQueue, TransferQueue<SubRunnable> invokeQueue) {
this.errorHandler = errorHandler; this.errorHandler = errorHandler;
this.manager = subscriptionManager; this.manager = subscriptionManager;
@ -35,19 +36,22 @@ public class DispatchRunnable implements Runnable {
final SubscriptionManager manager = this.manager; final SubscriptionManager manager = this.manager;
final ErrorHandlingSupport errorHandler = this.errorHandler; final ErrorHandlingSupport errorHandler = this.errorHandler;
final TransferQueue<Object> IN_queue = this.dispatchQueue; final TransferQueue<Object> IN_queue = this.dispatchQueue;
final TransferQueue<Runnable> OUT_queue = this.invokeQueue; final TransferQueue<SubRunnable> OUT_queue = this.invokeQueue;
final Runnable dummyRunnable = new Runnable() {
@Override
public void run() {
}
};
Object message = null; Object message = null;
int counter; int counter;
while (true) { while (true) {
try { try {
counter = MultiMBassador.WORK_RUN_BLITZ; counter = MultiMBassador.WORKER_BLITZ;
while ((message = IN_queue.poll()) == null) { while ((message = IN_queue.poll()) == null) {
if (counter > MultiMBassador.WORK_RUN_BLITZ_DIV2) { if (counter > 0) {
--counter;
Thread.yield();
} else if (counter > 0) {
--counter; --counter;
LockSupport.parkNanos(1L); LockSupport.parkNanos(1L);
} else { } else {
@ -56,23 +60,76 @@ public class DispatchRunnable implements Runnable {
} }
} }
@SuppressWarnings("null")
Class<?> messageClass = message.getClass(); Class<?> messageClass = message.getClass();
manager.readLock();
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass); Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass);
boolean empty = subscriptions.isEmpty(); boolean empty = subscriptions.isEmpty();
if (empty) {
// Dead Event
subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
DeadMessage deadMessage = new DeadMessage(message); Collection<Subscription> deadSubscriptions = null;
message = deadMessage; if (empty) {
empty = subscriptions.isEmpty(); // Dead Event. must EXACTLY MATCH (no subclasses or varargs)
deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
} }
Collection<Class<?>> superClasses = manager.getSuperClasses(messageClass);
Collection<Subscription> varArgs = manager.getVarArgs(messageClass);
manager.readUnLock();
if (!empty) { if (!empty) {
Runnable e = new InvokeRunnable(errorHandler, subscriptions, message); for (Subscription sub : subscriptions) {
OUT_queue.transfer(e); sub.publishToSubscriptionSingle(OUT_queue, errorHandler, message);
}
// OUT_queue.put(new InvokeRunnable(errorHandler, subscriptions, message));
} else if (deadSubscriptions != null) {
if (!deadSubscriptions.isEmpty()) {
DeadMessage deadMessage = new DeadMessage(message);
for (Subscription sub : deadSubscriptions) {
sub.publishToSubscriptionSingle(OUT_queue, errorHandler, deadMessage);
}
// OUT_queue.put(new InvokeRunnable(errorHandler, deadSubscriptions, deadMessage));
}
} }
// now get superClasses
for (Class<?> superClass : superClasses) {
subscriptions = manager.getSubscriptionsByMessageType(superClass);
if (!subscriptions.isEmpty()) {
for (Subscription sub : subscriptions) {
sub.publishToSubscriptionSingle(OUT_queue, errorHandler, message);
}
// OUT_queue.put(new InvokeRunnable(errorHandler, subscriptions, message));
}
}
// now get varargs
if (!varArgs.isEmpty()) {
// messy, but the ONLY way to do it.
Object[] vararg = (Object[]) Array.newInstance(message.getClass(), 1);
vararg[0] = message;
Object[] newInstance = new Object[1];
newInstance[0] = vararg;
vararg = newInstance;
for (Subscription sub : varArgs) {
sub.publishToSubscriptionSingle(OUT_queue, errorHandler, vararg);
}
// OUT_queue.put(new InvokeRunnable(errorHandler, varArgs, vararg));
}
// make sure it's synced at this point
// OUT_queue.transfer(dummyRunnable);
} catch (InterruptedException e) { } catch (InterruptedException e) {
return; return;
} catch (Throwable e) { } catch (Throwable e) {

View File

@ -1,6 +1,5 @@
package net.engio.mbassy.multi; package net.engio.mbassy.multi;
import java.lang.reflect.Array;
import java.util.Collection; import java.util.Collection;
import net.engio.mbassy.multi.error.ErrorHandlingSupport; import net.engio.mbassy.multi.error.ErrorHandlingSupport;
@ -27,28 +26,9 @@ public class InvokeRunnable implements Runnable {
ErrorHandlingSupport errorHandler = this.errorHandler; ErrorHandlingSupport errorHandler = this.errorHandler;
Collection<Subscription> subs = this.subscriptions; Collection<Subscription> subs = this.subscriptions;
Object message = this.message; Object message = this.message;
Object[] vararg = null;
for (Subscription sub : subs) { // for (Subscription sub : subs) {
boolean handled = false; // sub.publishToSubscriptionSingle(errorHandler, message);
if (sub.isVarArg()) { // }
// messageClass will NEVER be an array to begin with, since that will call the multi-arg method
if (vararg == null) {
// messy, but the ONLY way to do it.
vararg = (Object[]) Array.newInstance(message.getClass(), 1);
vararg[0] = message;
Object[] newInstance = new Object[1];
newInstance[0] = vararg;
vararg = newInstance;
}
handled = true;
sub.publishToSubscription(errorHandler, vararg);
}
if (!handled) {
sub.publishToSubscription(errorHandler, message);
}
}
} }
} }

View File

@ -1,13 +1,12 @@
package net.engio.mbassy.multi; package net.engio.mbassy.multi;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import net.engio.mbassy.multi.common.DisruptorThreadFactory; import net.engio.mbassy.multi.common.DisruptorThreadFactory;
import net.engio.mbassy.multi.common.LinkedTransferQueue; import net.engio.mbassy.multi.common.LinkedTransferQueue;
import net.engio.mbassy.multi.common.Pow2;
import net.engio.mbassy.multi.common.TransferQueue; import net.engio.mbassy.multi.common.TransferQueue;
import net.engio.mbassy.multi.error.IPublicationErrorHandler; import net.engio.mbassy.multi.error.IPublicationErrorHandler;
import net.engio.mbassy.multi.error.PublicationError; import net.engio.mbassy.multi.error.PublicationError;
@ -31,7 +30,7 @@ public class MultiMBassador implements IMessageBus {
// private final Queue<MessageHolder> dispatchQueue; // private final Queue<MessageHolder> dispatchQueue;
// private final BlockingQueue<MessageHolder> dispatchQueue; // private final BlockingQueue<MessageHolder> dispatchQueue;
private final TransferQueue<Object> dispatchQueue; private final TransferQueue<Object> dispatchQueue;
private final TransferQueue<Runnable> invokeQueue; private final TransferQueue<SubRunnable> invokeQueue;
// all threads that are available for asynchronous message dispatching // all threads that are available for asynchronous message dispatching
@ -42,17 +41,16 @@ public class MultiMBassador implements IMessageBus {
} }
public static final int WORK_RUN_BLITZ = 50; public static final int WORKER_BLITZ = 10;
public static final int WORK_RUN_BLITZ_DIV2 = WORK_RUN_BLITZ/2;
public MultiMBassador(int numberOfThreads) { public MultiMBassador(int numberOfThreads) {
if (numberOfThreads < 1) { if (numberOfThreads < 1) {
numberOfThreads = 1; // at LEAST 1 threads numberOfThreads = 1; // at LEAST 1 thread
} }
// this.objectQueue = new LinkedTransferQueue<MessageHolder>(); // this.objectQueue = new LinkedTransferQueue<MessageHolder>();
this.dispatchQueue = new LinkedTransferQueue<Object>(); this.dispatchQueue = new LinkedTransferQueue<Object>();
this.invokeQueue = new LinkedTransferQueue<Runnable>(); this.invokeQueue = new LinkedTransferQueue<SubRunnable>();
// this.invokeQueue = new BoundedTransferQueue<Runnable>(numberOfThreads); // this.invokeQueue = new BoundedTransferQueue<Runnable>(numberOfThreads);
// this.dispatchQueue = new BoundedTransferQueue<MessageHolder>(numberOfThreads); // this.dispatchQueue = new BoundedTransferQueue<MessageHolder>(numberOfThreads);
// this.dispatchQueue = new MpmcArrayQueue<MessageHolder>(Pow2.roundToPowerOfTwo(numberOfThreads/2)); // this.dispatchQueue = new MpmcArrayQueue<MessageHolder>(Pow2.roundToPowerOfTwo(numberOfThreads/2));
@ -65,7 +63,8 @@ public class MultiMBassador implements IMessageBus {
int dispatchSize = 2; int dispatchSize = 2;
int invokeSize = Pow2.roundToPowerOfTwo(numberOfThreads*2); // int invokeSize = Pow2.roundToPowerOfTwo(numberOfThreads);
int invokeSize = 0;
this.threads = new ArrayList<Thread>(dispatchSize + invokeSize); this.threads = new ArrayList<Thread>(dispatchSize + invokeSize);
@ -88,30 +87,91 @@ public class MultiMBassador implements IMessageBus {
@Override @Override
public void run() { public void run() {
final MultiMBassador mbassador = MultiMBassador.this; final MultiMBassador mbassador = MultiMBassador.this;
final TransferQueue<Runnable> IN_queue = mbassador.invokeQueue; final TransferQueue<SubRunnable> IN_queue = mbassador.invokeQueue;
try { try {
Runnable runnable = null; SubRunnable runnable = null;
int counter; int counter;
while (true) { while (true) {
runnable = null; runnable = null;
counter = WORK_RUN_BLITZ; counter = WORKER_BLITZ;
while ((runnable = IN_queue.poll()) == null) { // while ((runnable = IN_queue.poll()) == null) {
if (counter > WORK_RUN_BLITZ_DIV2) { // if (counter > 0) {
--counter; // --counter;
Thread.yield(); // LockSupport.parkNanos(1L);
} else if (counter > 0) { // } else {
--counter;
LockSupport.parkNanos(1L);
} else {
runnable = IN_queue.take(); runnable = IN_queue.take();
break; // break;
} // }
} // }
runnable.run();
try {
runnable.handler.invoke(runnable.listener, runnable.message);
// this.invocation.invoke(listener, handler, message);
} catch (IllegalAccessException e) {
// errorHandler.handlePublicationError(new PublicationError()
// .setMessage("Error during invocation of message handler. " +
// "The class or method is not accessible")
// .setCause(e)
// .setMethodName(handler.getName())
// .setListener(listener)
// .setPublishedObject(message));
} catch (IllegalArgumentException e) {
// errorHandler.handlePublicationError(new PublicationError()
// .setMessage("Error during invocation of message handler. " +
// "Wrong arguments passed to method. Was: " + message.getClass()
// + "Expected: " + handler.getParameterTypes()[0])
// .setCause(e)
// .setMethodName(handler.getName())
// .setListener(listener)
// .setPublishedObject(message));
} catch (InvocationTargetException e) {
// errorHandler.handlePublicationError(new PublicationError()
// .setMessage("Error during invocation of message handler. " +
// "Message handler threw exception")
// .setCause(e)
// .setMethodName(handler.getName())
// .setListener(listener)
// .setPublishedObject(message));
} catch (Throwable e) {
// errorHandler.handlePublicationError(new PublicationError()
// .setMessage("Error during invocation of message handler. " +
// "The handler code threw an exception")
// .setCause(e)
// .setMethodName(handler.getName())
// .setListener(listener)
// .setPublishedObject(message));
}
// runnable.run();
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
return; return;
@ -152,8 +212,7 @@ public class MultiMBassador implements IMessageBus {
@Override @Override
public boolean hasPendingMessages() { public boolean hasPendingMessages() {
// return this.dispatch_RingBuffer.remainingCapacity() < this.dispatch_RingBufferSize; return !this.dispatchQueue.isEmpty() || !this.invokeQueue.isEmpty();
return !(this.dispatchQueue.isEmpty() && this.invokeQueue.isEmpty());
} }
@Override @Override
@ -161,15 +220,6 @@ public class MultiMBassador implements IMessageBus {
for (Thread t : this.threads) { for (Thread t : this.threads) {
t.interrupt(); t.interrupt();
} }
// System.err.println(this.counter);
// for (InterruptRunnable runnable : this.invokeRunners) {
// runnable.stop();
// }
// this.dispatch_Disruptor.shutdown();
// this.dispatch_Executor.shutdown();
} }
@ -449,61 +499,18 @@ public class MultiMBassador implements IMessageBus {
@Override @Override
public void publishAsync(Object message) { public void publishAsync(Object message) {
if (message != null) { if (message != null) {
// // put this on the disruptor ring buffer try {
// final RingBuffer<DispatchHolder> ringBuffer = this.dispatch_RingBuffer; this.dispatchQueue.transfer(message);
// return;
// // setup the job } catch (InterruptedException e) {
// final long seq = ringBuffer.next(); e.printStackTrace();
// try { // log.error(e);
// DispatchHolder eventJob = ringBuffer.get(seq);
// eventJob.messageType = MessageType.ONE;
// eventJob.message1 = message;
// } catch (Throwable e) {
// handlePublicationError(new PublicationError()
// .setMessage("Error while adding an asynchronous message")
// .setCause(e)
// .setPublishedObject(message));
// } finally {
// // always publish the job
// ringBuffer.publish(seq);
// }
// MessageHolder messageHolder = new MessageHolder(); handlePublicationError(new PublicationError()
// messageHolder.messageType = MessageType.ONE; .setMessage("Error while adding an asynchronous message")
// messageHolder.message1 = message; .setCause(e)
.setPublishedObject(message));
}
// new Runnable() {
// @Override
// public void run() {
//
// }
// };
// faster if we can skip locking
// int counter = 200;
// while (!this.dispatchQueue.offer(message)) {
// if (counter > 100) {
// --counter;
// Thread.yield();
// } else if (counter > 0) {
// --counter;
// LockSupport.parkNanos(1L);
// } else {
try {
this.dispatchQueue.transfer(message);
return;
} catch (InterruptedException e) {
e.printStackTrace();
// log.error(e);
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
.setCause(e)
.setPublishedObject(message));
}
// }
// }
} }
} }

View File

@ -0,0 +1,20 @@
package net.engio.mbassy.multi;
import java.lang.reflect.Method;
/**
* @author dorkbox, llc Date: 2/2/15
*/
public class SubRunnable {
public Method handler;
public Object listener;
public Object message;
public SubRunnable(Method handler, Object listener, Object message) {
this.handler = handler;
this.listener = listener;
this.message = message;
}
}

View File

@ -1,8 +1,11 @@
package net.engio.mbassy.multi.common; package net.engio.mbassy.multi.common;
import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import sun.reflect.generics.reflectiveObjects.NotImplementedException;
import com.googlecode.concurentlocks.ReentrantReadWriteUpdateLock; import com.googlecode.concurentlocks.ReentrantReadWriteUpdateLock;
/** /**
@ -14,7 +17,7 @@ import com.googlecode.concurentlocks.ReentrantReadWriteUpdateLock;
* @author bennidi * @author bennidi
* Date: 2/12/12 * Date: 2/12/12
*/ */
public abstract class AbstractConcurrentSet<T> implements IConcurrentSet<T> { public abstract class AbstractConcurrentSet<T> implements Collection<T> {
// Internal state // Internal state
protected final ReentrantReadWriteUpdateLock lock = new ReentrantReadWriteUpdateLock(); protected final ReentrantReadWriteUpdateLock lock = new ReentrantReadWriteUpdateLock();
@ -28,21 +31,25 @@ public abstract class AbstractConcurrentSet<T> implements IConcurrentSet<T> {
protected abstract Entry<T> createEntry(T value, Entry<T> next); protected abstract Entry<T> createEntry(T value, Entry<T> next);
@Override @Override
public void add(T element) { public boolean add(T element) {
if (element == null) { if (element == null) {
return; return false;
} }
Lock writeLock = this.lock.writeLock(); Lock writeLock = this.lock.writeLock();
boolean changed = false;
writeLock.lock(); writeLock.lock();
if (this.entries.containsKey(element)) { if (this.entries.containsKey(element)) {
} else { } else {
insert(element); insert(element);
changed = true;
} }
writeLock.unlock(); writeLock.unlock();
return changed;
} }
@Override @Override
public boolean contains(T element) { public boolean contains(Object element) {
Lock readLock = this.lock.readLock(); Lock readLock = this.lock.readLock();
ISetEntry<T> entry; ISetEntry<T> entry;
try { try {
@ -73,25 +80,28 @@ public abstract class AbstractConcurrentSet<T> implements IConcurrentSet<T> {
} }
@Override @Override
public void addAll(Iterable<T> elements) { public boolean addAll(Collection<? extends T> elements) {
boolean changed = false;
Lock writeLock = this.lock.writeLock(); Lock writeLock = this.lock.writeLock();
try { try {
writeLock.lock(); writeLock.lock();
for (T element : elements) { for (T element : elements) {
if (element != null) { if (element != null) {
insert(element); insert(element);
changed = true;
} }
} }
} finally { } finally {
writeLock.unlock(); writeLock.unlock();
} }
return changed;
} }
/** /**
* @return TRUE if the element was successfully removed * @return TRUE if the element was successfully removed
*/ */
@Override @Override
public boolean remove(T element) { public boolean remove(Object element) {
Lock updateLock = this.lock.updateLock(); Lock updateLock = this.lock.updateLock();
boolean isNull; boolean isNull;
@ -100,15 +110,12 @@ public abstract class AbstractConcurrentSet<T> implements IConcurrentSet<T> {
ISetEntry<T> entry = this.entries.get(element); ISetEntry<T> entry = this.entries.get(element);
isNull = entry == null || entry.getValue() == null; isNull = entry == null || entry.getValue() == null;
if (isNull) { if (!isNull) {
Lock writeLock = this.lock.writeLock(); Lock writeLock = this.lock.writeLock();
try { try {
writeLock.lock(); writeLock.lock();
ISetEntry<T> listelement = this.entries.get(element); if (entry != this.head) {
if (listelement == null) { entry.remove();
return false; //removed by other thread in the meantime
} else if (listelement != this.head) {
listelement.remove();
} else { } else {
// if it was second, now it's first // if it was second, now it's first
this.head = this.head.next(); this.head = this.head.next();
@ -127,6 +134,36 @@ public abstract class AbstractConcurrentSet<T> implements IConcurrentSet<T> {
} }
} }
@Override
public Object[] toArray() {
throw new NotImplementedException();
}
@Override
public <T> T[] toArray(T[] a) {
throw new NotImplementedException();
}
@Override
public boolean containsAll(Collection<?> c) {
throw new NotImplementedException();
}
@Override
public boolean removeAll(Collection<?> c) {
throw new NotImplementedException();
}
@Override
public boolean retainAll(Collection<?> c) {
throw new NotImplementedException();
}
@Override
public void clear() {
throw new NotImplementedException();
}
public abstract static class Entry<T> implements ISetEntry<T> { public abstract static class Entry<T> implements ISetEntry<T> {

View File

@ -1,25 +0,0 @@
package net.engio.mbassy.multi.common;
/**
* Todo: Add javadoc
*
* @author bennidi
* Date: 3/29/13
*/
public interface IConcurrentSet<T> extends Iterable<T> {
void add(T element);
boolean contains(T element);
int size();
boolean isEmpty();
void addAll(Iterable<T> elements);
/**
* @return TRUE if the element was removed
*/
boolean remove(T element);
}

View File

@ -13,7 +13,11 @@ public class StrongConcurrentSet<T> extends AbstractConcurrentSet<T> {
public StrongConcurrentSet() { public StrongConcurrentSet() {
super(new IdentityHashMap<T, ISetEntry<T>>()); this(16);
}
public StrongConcurrentSet(int size) {
super(new IdentityHashMap<T, ISetEntry<T>>(size));
} }
@Override @Override
@ -75,9 +79,5 @@ public class StrongConcurrentSet<T> extends AbstractConcurrentSet<T> {
public T getValue() { public T getValue() {
return this.value; return this.value;
} }
} }
} }

View File

@ -116,9 +116,5 @@ public class WeakConcurrentSet<T> extends AbstractConcurrentSet<T>{
public T getValue() { public T getValue() {
return this.value.get(); return this.value.get();
} }
} }
} }

View File

@ -2,10 +2,12 @@ package net.engio.mbassy.multi.subscription;
import java.lang.reflect.InvocationTargetException; import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.Arrays; import java.util.Collection;
import java.util.concurrent.atomic.AtomicLong;
import net.engio.mbassy.multi.common.IConcurrentSet; import net.engio.mbassy.multi.SubRunnable;
import net.engio.mbassy.multi.common.StrongConcurrentSet; import net.engio.mbassy.multi.common.StrongConcurrentSet;
import net.engio.mbassy.multi.common.TransferQueue;
import net.engio.mbassy.multi.dispatch.IHandlerInvocation; import net.engio.mbassy.multi.dispatch.IHandlerInvocation;
import net.engio.mbassy.multi.dispatch.ReflectiveHandlerInvocation; import net.engio.mbassy.multi.dispatch.ReflectiveHandlerInvocation;
import net.engio.mbassy.multi.dispatch.SynchronizedHandlerInvocation; import net.engio.mbassy.multi.dispatch.SynchronizedHandlerInvocation;
@ -34,17 +36,16 @@ public class Subscription {
private final MessageHandler handlerMetadata; private final MessageHandler handlerMetadata;
private final IHandlerInvocation invocation; private final IHandlerInvocation invocation;
// protected final Collection<Object> listeners;
// protected final Map<WeakReference<Object>, Boolean> listeners; // protected final Map<WeakReference<Object>, Boolean> listeners;
// protected final Map<Object, Boolean> listeners; // protected final Map<Object, Boolean> listeners;
protected final IConcurrentSet<Object> listeners; protected final Collection<Object> listeners;
Subscription(MessageHandler handler) { Subscription(MessageHandler handler) {
// this.listeners = new WeakConcurrentSet<Object>(); // this.listeners = new WeakConcurrentSet<Object>();
this.listeners = new StrongConcurrentSet<Object>(); this.listeners = new StrongConcurrentSet<Object>();
// this.listeners = new ConcurrentHashMap<Object, Boolean>();
// this.listeners = new CopyOnWriteArrayList<Object>(); // this.listeners = new CopyOnWriteArrayList<Object>();
// this.listeners = new ConcurrentSkipListSet<Object>(); // this.listeners = new ConcurrentSkipListSet<Object>(); // requires listener object to be comparable
// this.listeners = new ConcurrentHashMap<Object, Boolean>();
// this.listeners = new ConcurrentWeakHashMap<WeakReference<Object>, Boolean>(); // this.listeners = new ConcurrentWeakHashMap<WeakReference<Object>, Boolean>();
this.handlerMetadata = handler; this.handlerMetadata = handler;
@ -131,17 +132,30 @@ public class Subscription {
return this.listeners.size(); return this.listeners.size();
} }
// private AtomicLong counter = new AtomicLong(); public void pin() {
public void publishToSubscription(ErrorHandlingSupport errorHandler, Object message) { System.err.println(this.counter.get());
}
private AtomicLong counter = new AtomicLong();
public void publishToSubscriptionSingle(TransferQueue<SubRunnable> OUT_queue, ErrorHandlingSupport errorHandler, Object message) {
// Collection<Object> listeners = this.listeners.keySet(); // Collection<Object> listeners = this.listeners.keySet();
// Collection<Object> listeners = this.listeners; Collection<Object> listeners = this.listeners;
IConcurrentSet<Object> listeners = this.listeners;
if (listeners.size() > 0) { if (listeners.size() > 0) {
Method handler = this.handlerMetadata.getHandler(); Method handler = this.handlerMetadata.getHandler();
// int count = 0; // int count = 0;
// Iterator<Object> iterator = listeners.iterator();
for (Object listener : listeners) { for (Object listener : listeners) {
// count++; // count++;
// this.counter.getAndIncrement();
// try {
// OUT_queue.transfer(new SubRunnable(handler, listener, message));
// } catch (InterruptedException e1) {
// return;
// }
try { try {
this.invocation.invoke(listener, handler, message); this.invocation.invoke(listener, handler, message);
} catch (IllegalAccessException e) { } catch (IllegalAccessException e) {
@ -186,158 +200,154 @@ public class Subscription {
public void publishToSubscription(ErrorHandlingSupport errorHandler, Object message1, Object message2) { public void publishToSubscription(ErrorHandlingSupport errorHandler, Object message1, Object message2) {
// Collection<Object> listeners = this.listeners.keySet(); // Collection<Object> listeners = this.listeners.keySet();
// Collection<Object> listeners = this.listeners; // Collection<Object> listeners = this.listeners;
IConcurrentSet<Object> listeners = this.listeners; //
// if (listeners.size() > 0) {
if (listeners.size() > 0) { // Method handler = this.handlerMetadata.getHandler();
Method handler = this.handlerMetadata.getHandler(); //
// for (Object listener : listeners) {
for (Object listener : listeners) { // try {
try { // this.invocation.invoke(listener, handler, message1, message2);
this.invocation.invoke(listener, handler, message1, message2); // } catch (IllegalAccessException e) {
} catch (IllegalAccessException e) { // errorHandler.handlePublicationError(new PublicationError()
errorHandler.handlePublicationError(new PublicationError() // .setMessage("Error during invocation of message handler. " +
.setMessage("Error during invocation of message handler. " + // "The class or method is not accessible")
"The class or method is not accessible") // .setCause(e)
.setCause(e) // .setMethodName(handler.getName())
.setMethodName(handler.getName()) // .setListener(listener)
.setListener(listener) // .setPublishedObject(message1, message2));
.setPublishedObject(message1, message2)); // } catch (IllegalArgumentException e) {
} catch (IllegalArgumentException e) { // errorHandler.handlePublicationError(new PublicationError()
errorHandler.handlePublicationError(new PublicationError() // .setMessage("Error during invocation of message handler. " +
.setMessage("Error during invocation of message handler. " + // "Wrong arguments passed to method. Was: " +
"Wrong arguments passed to method. Was: " + // message1.getClass() + ", " +
message1.getClass() + ", " + // message2.getClass()
message2.getClass() // + ". Expected: " + handler.getParameterTypes()[0] + ", " +
+ ". Expected: " + handler.getParameterTypes()[0] + ", " + // handler.getParameterTypes()[1]
handler.getParameterTypes()[1] // )
) // .setCause(e)
.setCause(e) // .setMethodName(handler.getName())
.setMethodName(handler.getName()) // .setListener(listener)
.setListener(listener) // .setPublishedObject(message1, message2));
.setPublishedObject(message1, message2)); // } catch (InvocationTargetException e) {
} catch (InvocationTargetException e) { // errorHandler.handlePublicationError(new PublicationError()
errorHandler.handlePublicationError(new PublicationError() // .setMessage("Error during invocation of message handler. " +
.setMessage("Error during invocation of message handler. " + // "Message handler threw exception")
"Message handler threw exception") // .setCause(e)
.setCause(e) // .setMethodName(handler.getName())
.setMethodName(handler.getName()) // .setListener(listener)
.setListener(listener) // .setPublishedObject(message1, message2));
.setPublishedObject(message1, message2)); // } catch (Throwable e) {
} catch (Throwable e) { // errorHandler.handlePublicationError(new PublicationError()
errorHandler.handlePublicationError(new PublicationError() // .setMessage("Error during invocation of message handler. " +
.setMessage("Error during invocation of message handler. " + // "The handler code threw an exception")
"The handler code threw an exception") // .setCause(e)
.setCause(e) // .setMethodName(handler.getName())
.setMethodName(handler.getName()) // .setListener(listener)
.setListener(listener) // .setPublishedObject(message1, message2));
.setPublishedObject(message1, message2)); // }
} // }
} // }
}
} }
public void publishToSubscription(ErrorHandlingSupport errorHandler, Object message1, Object message2, Object message3) { public void publishToSubscription(ErrorHandlingSupport errorHandler, Object message1, Object message2, Object message3) {
// Collection<Object> listeners = this.listeners.keySet(); // Collection<Object> listeners = this.listeners.keySet();
// Collection<Object> listeners = this.listeners; //
IConcurrentSet<Object> listeners = this.listeners; // if (this.listeners.size() > 0) {
// Method handler = this.handlerMetadata.getHandler();
if (listeners.size() > 0) { //
Method handler = this.handlerMetadata.getHandler(); // for (Object listener : this.listeners) {
// try {
for (Object listener : listeners) { // this.invocation.invoke(listener, handler, message1, message2, message3);
try { // } catch (IllegalAccessException e) {
this.invocation.invoke(listener, handler, message1, message2, message3); // errorHandler.handlePublicationError(new PublicationError()
} catch (IllegalAccessException e) { // .setMessage("Error during invocation of message handler. " +
errorHandler.handlePublicationError(new PublicationError() // "The class or method is not accessible")
.setMessage("Error during invocation of message handler. " + // .setCause(e)
"The class or method is not accessible") // .setMethodName(handler.getName())
.setCause(e) // .setListener(listener)
.setMethodName(handler.getName()) // .setPublishedObject(message1, message2, message3));
.setListener(listener) // } catch (IllegalArgumentException e) {
.setPublishedObject(message1, message2, message3)); // errorHandler.handlePublicationError(new PublicationError()
} catch (IllegalArgumentException e) { // .setMessage("Error during invocation of message handler. " +
errorHandler.handlePublicationError(new PublicationError() // "Wrong arguments passed to method. Was: " +
.setMessage("Error during invocation of message handler. " + // message1.getClass() + ", " +
"Wrong arguments passed to method. Was: " + // message2.getClass() + ", " +
message1.getClass() + ", " + // message3.getClass()
message2.getClass() + ", " + // + ". Expected: " + handler.getParameterTypes()[0] + ", " +
message3.getClass() // handler.getParameterTypes()[1] + ", " +
+ ". Expected: " + handler.getParameterTypes()[0] + ", " + // handler.getParameterTypes()[2]
handler.getParameterTypes()[1] + ", " + // )
handler.getParameterTypes()[2] // .setCause(e)
) // .setMethodName(handler.getName())
.setCause(e) // .setListener(listener)
.setMethodName(handler.getName()) // .setPublishedObject(message1, message2, message3));
.setListener(listener) // } catch (InvocationTargetException e) {
.setPublishedObject(message1, message2, message3)); // errorHandler.handlePublicationError(new PublicationError()
} catch (InvocationTargetException e) { // .setMessage("Error during invocation of message handler. " +
errorHandler.handlePublicationError(new PublicationError() // "Message handler threw exception")
.setMessage("Error during invocation of message handler. " + // .setCause(e)
"Message handler threw exception") // .setMethodName(handler.getName())
.setCause(e) // .setListener(listener)
.setMethodName(handler.getName()) // .setPublishedObject(message1, message2, message3));
.setListener(listener) // } catch (Throwable e) {
.setPublishedObject(message1, message2, message3)); // errorHandler.handlePublicationError(new PublicationError()
} catch (Throwable e) { // .setMessage("Error during invocation of message handler. " +
errorHandler.handlePublicationError(new PublicationError() // "The handler code threw an exception")
.setMessage("Error during invocation of message handler. " + // .setCause(e)
"The handler code threw an exception") // .setMethodName(handler.getName())
.setCause(e) // .setListener(listener)
.setMethodName(handler.getName()) // .setPublishedObject(message1, message2, message3));
.setListener(listener) // }
.setPublishedObject(message1, message2, message3)); // }
} // }
}
}
} }
public void publishToSubscription(ErrorHandlingSupport errorHandler, Object... messages) { public void publishToSubscription(ErrorHandlingSupport errorHandler, Object... messages) {
// Collection<Object> listeners = this.listeners.keySet(); // Collection<Object> listeners = this.listeners.keySet();
// Collection<Object> listeners = this.listeners; // Collection<Object> listeners = this.listeners;
IConcurrentSet<Object> listeners = this.listeners; //
// if (listeners.size() > 0) {
if (listeners.size() > 0) { // Method handler = this.handlerMetadata.getHandler();
Method handler = this.handlerMetadata.getHandler(); //
// for (Object listener : listeners) {
for (Object listener : listeners) { // try {
try { // this.invocation.invoke(listener, handler, messages);
this.invocation.invoke(listener, handler, messages); // } catch (IllegalAccessException e) {
} catch (IllegalAccessException e) { // errorHandler.handlePublicationError(new PublicationError()
errorHandler.handlePublicationError(new PublicationError() // .setMessage("Error during invocation of message handler. " +
.setMessage("Error during invocation of message handler. " + // "The class or method is not accessible")
"The class or method is not accessible") // .setCause(e)
.setCause(e) // .setMethodName(handler.getName())
.setMethodName(handler.getName()) // .setListener(listener)
.setListener(listener) // .setPublishedObject(messages));
.setPublishedObject(messages)); // } catch (IllegalArgumentException e) {
} catch (IllegalArgumentException e) { // errorHandler.handlePublicationError(new PublicationError()
errorHandler.handlePublicationError(new PublicationError() // .setMessage("Error during invocation of message handler. " +
.setMessage("Error during invocation of message handler. " + // "Wrong arguments passed to method. Was: " + Arrays.deepToString(messages)
"Wrong arguments passed to method. Was: " + Arrays.deepToString(messages) // + "Expected: " + Arrays.deepToString(handler.getParameterTypes()))
+ "Expected: " + Arrays.deepToString(handler.getParameterTypes())) // .setCause(e)
.setCause(e) // .setMethodName(handler.getName())
.setMethodName(handler.getName()) // .setListener(listener)
.setListener(listener) // .setPublishedObject(messages));
.setPublishedObject(messages)); // } catch (InvocationTargetException e) {
} catch (InvocationTargetException e) { // errorHandler.handlePublicationError(new PublicationError()
errorHandler.handlePublicationError(new PublicationError() // .setMessage("Error during invocation of message handler. " +
.setMessage("Error during invocation of message handler. " + // "Message handler threw exception")
"Message handler threw exception") // .setCause(e)
.setCause(e) // .setMethodName(handler.getName())
.setMethodName(handler.getName()) // .setListener(listener)
.setListener(listener) // .setPublishedObject(messages));
.setPublishedObject(messages)); // } catch (Throwable e) {
} catch (Throwable e) { // errorHandler.handlePublicationError(new PublicationError()
errorHandler.handlePublicationError(new PublicationError() // .setMessage("Error during invocation of message handler. " +
.setMessage("Error during invocation of message handler. " + // "The handler code threw an exception")
"The handler code threw an exception") // .setCause(e)
.setCause(e) // .setMethodName(handler.getName())
.setMethodName(handler.getName()) // .setListener(listener)
.setListener(listener) // .setPublishedObject(messages));
.setPublishedObject(messages)); // }
} // }
} // }
}
} }
@Override @Override

View File

@ -4,6 +4,7 @@ import java.lang.reflect.Array;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.IdentityHashMap; import java.util.IdentityHashMap;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
@ -13,6 +14,7 @@ import java.util.concurrent.ConcurrentHashMap;
import net.engio.mbassy.multi.common.IdentityObjectTree; import net.engio.mbassy.multi.common.IdentityObjectTree;
import net.engio.mbassy.multi.common.ReflectionUtils; import net.engio.mbassy.multi.common.ReflectionUtils;
import net.engio.mbassy.multi.common.StrongConcurrentSet;
import net.engio.mbassy.multi.listener.MessageHandler; import net.engio.mbassy.multi.listener.MessageHandler;
import net.engio.mbassy.multi.listener.MetadataReader; import net.engio.mbassy.multi.listener.MetadataReader;
@ -31,18 +33,13 @@ import com.googlecode.concurentlocks.ReentrantReadWriteUpdateLock;
*/ */
public class SubscriptionManager { public class SubscriptionManager {
public static class SubHolder {
public int count = 0;
public Collection<Subscription> subs = new ArrayDeque<Subscription>(0);
}
// the metadata reader that is used to inspect objects passed to the subscribe method // the metadata reader that is used to inspect objects passed to the subscribe method
private final MetadataReader metadataReader = new MetadataReader(); private final MetadataReader metadataReader = new MetadataReader();
// all subscriptions per message type // all subscriptions per message type
// this is the primary list for dispatching a specific message // this is the primary list for dispatching a specific message
// write access is synchronized and happens only when a listener of a specific class is registered the first time // write access is synchronized and happens only when a listener of a specific class is registered the first time
private final Map<Class<?>, SubHolder> subscriptionsPerMessageSingle = new IdentityHashMap<Class<?>, SubHolder>(50); private final Map<Class<?>, Collection<Subscription>> subscriptionsPerMessageSingle = new IdentityHashMap<Class<?>, Collection<Subscription>>(50);
private final IdentityObjectTree<Class<?>, Collection<Subscription>> subscriptionsPerMessageMulti = new IdentityObjectTree<Class<?>, Collection<Subscription>>(); private final IdentityObjectTree<Class<?>, Collection<Subscription>> subscriptionsPerMessageMulti = new IdentityObjectTree<Class<?>, Collection<Subscription>>();
// all subscriptions per messageHandler type // all subscriptions per messageHandler type
@ -94,17 +91,14 @@ public class SubscriptionManager {
// single // single
Class<?> clazz = handledMessageTypes[0]; Class<?> clazz = handledMessageTypes[0];
// NOTE: Not thread-safe! must be synchronized in outer scope // NOTE: Order is important for safe publication
SubHolder subHolder = this.subscriptionsPerMessageSingle.get(clazz); Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(clazz);
if (subHolder != null) { if (subs != null) {
Collection<Subscription> subs = subHolder.subs; subs.remove(subscription);
if (subs != null) {
subs.remove(subscription);
if (subs.isEmpty()) { if (subs.isEmpty()) {
// remove element // remove element
this.subscriptionsPerMessageSingle.remove(clazz); this.subscriptionsPerMessageSingle.remove(clazz);
}
} }
} }
} else { } else {
@ -188,7 +182,7 @@ public class SubscriptionManager {
} }
// it's SAFE to use non-concurrent collection here (read only). Same thread LOCKS on this with a write lock // it's SAFE to use non-concurrent collection here (read only). Same thread LOCKS on this with a write lock
subscriptions = new ArrayDeque<Subscription>(messageHandlers.size()); subscriptions = new StrongConcurrentSet<Subscription>(messageHandlers.size());
// create subscriptions for all detected message handlers // create subscriptions for all detected message handlers
for (MessageHandler messageHandler : messageHandlers) { for (MessageHandler messageHandler : messageHandlers) {
@ -203,15 +197,16 @@ public class SubscriptionManager {
// single // single
Class<?> clazz = handledMessageTypes[0]; Class<?> clazz = handledMessageTypes[0];
// NOTE: Not thread-safe! must be synchronized in outer scope // NOTE: Order is important for safe publication
SubHolder subHolder = this.subscriptionsPerMessageSingle.get(clazz); Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(clazz);
if (subHolder == null) { if (subs == null) {
subHolder = new SubHolder(); subs = new StrongConcurrentSet<Subscription>(2);
this.subscriptionsPerMessageSingle.put(clazz, subHolder); // subs = new CopyOnWriteArrayList<Subscription>();
subs.add(subscription);
this.subscriptionsPerMessageSingle.put(clazz, subs);
} else {
subs.add(subscription);
} }
Collection<Subscription> subs = subHolder.subs;
subs.add(subscription);
subHolder.count++;
// have to save our the VarArg class types, because creating var-arg arrays for objects is expensive // have to save our the VarArg class types, because creating var-arg arrays for objects is expensive
if (subscription.isVarArg()) { if (subscription.isVarArg()) {
@ -221,7 +216,7 @@ public class SubscriptionManager {
// since it's vararg, this means that it's an ARRAY, so we ALSO // since it's vararg, this means that it's an ARRAY, so we ALSO
// have to add the component classes of the array // have to add the component classes of the array
if (subscription.acceptsSubtypes()) { if (subscription.acceptsSubtypes()) {
ArrayList<Class<?>> setupSuperClassCache2 = superClassCache(componentType); ArrayList<Class<?>> setupSuperClassCache2 = setupSuperClassCache(componentType);
// have to setup each vararg chain // have to setup each vararg chain
for (int i = 0; i < setupSuperClassCache2.size(); i++) { for (int i = 0; i < setupSuperClassCache2.size(); i++) {
Class<?> superClass = setupSuperClassCache2.get(i); Class<?> superClass = setupSuperClassCache2.get(i);
@ -234,7 +229,7 @@ public class SubscriptionManager {
} }
} }
} else if (subscription.acceptsSubtypes()) { } else if (subscription.acceptsSubtypes()) {
superClassCache(clazz); setupSuperClassCache(clazz);
} }
} }
else { else {
@ -245,17 +240,17 @@ public class SubscriptionManager {
case 2: { case 2: {
tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes[0], handledMessageTypes[1]); tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes[0], handledMessageTypes[1]);
if (subscription.acceptsSubtypes()) { if (subscription.acceptsSubtypes()) {
superClassCache(handledMessageTypes[0]); setupSuperClassCache(handledMessageTypes[0]);
superClassCache(handledMessageTypes[1]); setupSuperClassCache(handledMessageTypes[1]);
} }
break; break;
} }
case 3: { case 3: {
tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes[0], handledMessageTypes[1], handledMessageTypes[2]); tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes[0], handledMessageTypes[1], handledMessageTypes[2]);
if (subscription.acceptsSubtypes()) { if (subscription.acceptsSubtypes()) {
superClassCache(handledMessageTypes[0]); setupSuperClassCache(handledMessageTypes[0]);
superClassCache(handledMessageTypes[1]); setupSuperClassCache(handledMessageTypes[1]);
superClassCache(handledMessageTypes[2]); setupSuperClassCache(handledMessageTypes[2]);
} }
break; break;
} }
@ -263,7 +258,7 @@ public class SubscriptionManager {
tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes); tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes);
if (subscription.acceptsSubtypes()) { if (subscription.acceptsSubtypes()) {
for (Class<?> c : handledMessageTypes) { for (Class<?> c : handledMessageTypes) {
superClassCache(c); setupSuperClassCache(c);
} }
} }
break; break;
@ -291,10 +286,24 @@ public class SubscriptionManager {
} }
} }
private final Collection<Subscription> EMPTY_LIST = Collections.emptyList();
// cannot return null, must be protected by read lock
public Collection<Subscription> getSubscriptionsByMessageType(Class<?> messageType) {
Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(messageType);
if (subs != null) {
return subs;
} else {
return this.EMPTY_LIST;
}
}
// obtain the set of subscriptions for the given message type // obtain the set of subscriptions for the given message type
// Note: never returns null! // Note: never returns null!
public Collection<Subscription> getSubscriptionsByMessageType(Class<?> messageType) { public Collection<Subscription> DEPRECATED_getSubscriptionsByMessageType(Class<?> messageType) {
// thread safe publication // thread safe publication
Collection<Subscription> subscriptions; Collection<Subscription> subscriptions;
@ -302,50 +311,35 @@ public class SubscriptionManager {
this.LOCK.readLock().lock(); this.LOCK.readLock().lock();
int count = 0; int count = 0;
Collection<Subscription> subs; Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(messageType);
SubHolder primaryHolder = this.subscriptionsPerMessageSingle.get(messageType); if (subs != null) {
if (primaryHolder != null) {
subscriptions = new ArrayDeque<Subscription>(count); subscriptions = new ArrayDeque<Subscription>(count);
subs = primaryHolder.subs; subscriptions.addAll(subs);
count = primaryHolder.count;
if (subs != null) {
subscriptions.addAll(subs);
}
} else { } else {
subscriptions = new ArrayDeque<Subscription>(16); subscriptions = new ArrayDeque<Subscription>(16);
} }
// also add all subscriptions that match super types // also add all subscriptions that match super types
SubHolder subHolder; ArrayList<Class<?>> types1 = setupSuperClassCache(messageType);
ArrayList<Class<?>> types1 = superClassCache(messageType);
if (types1 != null) { if (types1 != null) {
Class<?> eventSuperType; Class<?> eventSuperType;
int i; int i;
for (i = 0; i < types1.size(); i++) { for (i = 0; i < types1.size(); i++) {
eventSuperType = types1.get(i); eventSuperType = types1.get(i);
subHolder = this.subscriptionsPerMessageSingle.get(eventSuperType); subs = this.subscriptionsPerMessageSingle.get(eventSuperType);
if (subHolder != null) { if (subs != null) {
subs = subHolder.subs; for (Subscription sub : subs) {
count += subHolder.count; if (sub.handlesMessageType(messageType)) {
subscriptions.add(sub);
if (subs != null) {
for (Subscription sub : subs) {
if (sub.handlesMessageType(messageType)) {
subscriptions.add(sub);
}
} }
} }
} }
count += addVarArgClass(subscriptions, eventSuperType); addVarArgClass(subscriptions, eventSuperType);
} }
} }
count += addVarArgClass(subscriptions, messageType); addVarArgClass(subscriptions, messageType);
if (primaryHolder != null) {
// save off our count, so our collection creation size is optimal.
primaryHolder.count = count;
}
} finally { } finally {
this.LOCK.readLock().unlock(); this.LOCK.readLock().unlock();
} }
@ -364,8 +358,8 @@ public class SubscriptionManager {
this.LOCK.readLock().lock(); this.LOCK.readLock().lock();
// also add all subscriptions that match super types // also add all subscriptions that match super types
ArrayList<Class<?>> types1 = superClassCache(messageType1); ArrayList<Class<?>> types1 = setupSuperClassCache(messageType1);
ArrayList<Class<?>> types2 = superClassCache(messageType2); ArrayList<Class<?>> types2 = setupSuperClassCache(messageType2);
Collection<Subscription> subs; Collection<Subscription> subs;
Class<?> eventSuperType1 = messageType1; Class<?> eventSuperType1 = messageType1;
@ -427,9 +421,9 @@ public class SubscriptionManager {
this.LOCK.readLock().lock(); this.LOCK.readLock().lock();
// also add all subscriptions that match super types // also add all subscriptions that match super types
ArrayList<Class<?>> types1 = superClassCache(messageType1); ArrayList<Class<?>> types1 = setupSuperClassCache(messageType1);
ArrayList<Class<?>> types2 = superClassCache(messageType2); ArrayList<Class<?>> types2 = setupSuperClassCache(messageType2);
ArrayList<Class<?>> types3 = superClassCache(messageType3); ArrayList<Class<?>> types3 = setupSuperClassCache(messageType3);
Class<?> eventSuperType1 = messageType1; Class<?> eventSuperType1 = messageType1;
IdentityObjectTree<Class<?>, Collection<Subscription>> leaf1; IdentityObjectTree<Class<?>, Collection<Subscription>> leaf1;
@ -513,7 +507,6 @@ public class SubscriptionManager {
} }
} }
SubHolder subHolder;
int size = messageTypes.length; int size = messageTypes.length;
if (size > 0) { if (size > 0) {
boolean allSameType = true; boolean allSameType = true;
@ -537,7 +530,7 @@ public class SubscriptionManager {
if (allSameType) { if (allSameType) {
// do we have a var-arg (it shows as an array) subscribed? // do we have a var-arg (it shows as an array) subscribed?
ArrayList<Class<?>> superClasses = superClassCache(firstType); ArrayList<Class<?>> superClasses = setupSuperClassCache(firstType);
Class<?> eventSuperType = firstType; Class<?> eventSuperType = firstType;
int j; int j;
@ -552,14 +545,10 @@ public class SubscriptionManager {
eventSuperType = Array.newInstance(eventSuperType, 1).getClass(); eventSuperType = Array.newInstance(eventSuperType, 1).getClass();
// also add all subscriptions that match super types // also add all subscriptions that match super types
subHolder = this.subscriptionsPerMessageSingle.get(eventSuperType); subs = this.subscriptionsPerMessageSingle.get(eventSuperType);
if (subHolder != null) { if (subs != null) {
subs = subHolder.subs; for (Subscription sub : subs) {
count += subHolder.count; subscriptions.add(sub);
if (subs != null) {
for (Subscription sub : subs) {
subscriptions.add(sub);
}
} }
} }
} }
@ -576,12 +565,30 @@ public class SubscriptionManager {
return subscriptions; return subscriptions;
} }
private ArrayList<Class<?>> superClassCache(Class<?> clazz) {
private final Collection<Class<?>> EMPTY_LIST_CLASSES = Collections.emptyList();
// must be protected by read lock
public Collection<Class<?>> getSuperClasses(Class<?> clazz) {
// not thread safe. DO NOT MODIFY
ArrayList<Class<?>> types = this.superClassesCache.get(clazz); ArrayList<Class<?>> types = this.superClassesCache.get(clazz);
if (types != null) {
return types;
}
return this.EMPTY_LIST_CLASSES;
}
// not a thread safe collection. must be locked by caller
private ArrayList<Class<?>> setupSuperClassCache(Class<?> clazz) {
ArrayList<Class<?>> types = this.superClassesCache.get(clazz);
if (types == null) { if (types == null) {
// it doesn't matter if concurrent access stomps on values, since they are always the same. // it doesn't matter if concurrent access stomps on values, since they are always the same.
Set<Class<?>> superTypes = ReflectionUtils.getSuperTypes(clazz); Set<Class<?>> superTypes = ReflectionUtils.getSuperTypes(clazz);
types = new ArrayList<Class<?>>(superTypes); types = new ArrayList<Class<?>>(superTypes);
// NOTE: no need to write lock, since race conditions will result in duplicate answers
this.superClassesCache.put(clazz, types); this.superClassesCache.put(clazz, types);
} }
@ -593,54 +600,49 @@ public class SubscriptionManager {
/////////////// ///////////////
// a var-arg handler might match // a var-arg handler might match
/////////////// ///////////////
private int addVarArgClass(Collection<Subscription> subscriptions, Class<?> messageType) { private void addVarArgClass(Collection<Subscription> subscriptions, Class<?> messageType) {
// tricky part. We have to check the ARRAY version // tricky part. We have to check the ARRAY version
SubHolder subHolder;
Collection<Subscription> subs; Collection<Subscription> subs;
int count = 0;
Class<?> varArgClass = this.varArgClasses.get(messageType); Class<?> varArgClass = this.varArgClasses.get(messageType);
if (varArgClass != null) { if (varArgClass != null) {
// also add all subscriptions that match super types // also add all subscriptions that match super types
subHolder = this.subscriptionsPerMessageSingle.get(varArgClass); subs = this.subscriptionsPerMessageSingle.get(varArgClass);
if (subHolder != null) { if (subs != null) {
subs = subHolder.subs; for (Subscription sub : subs) {
count += subHolder.count; subscriptions.add(sub);
if (subs != null) {
for (Subscription sub : subs) {
subscriptions.add(sub);
}
} }
} }
} }
return count;
} }
public Class<?> getVarArg(Class<?> clazz) { // must be protected by read lock
return this.varArgClasses.get(clazz); public Collection<Subscription> getVarArgs(Class<?> clazz) {
Class<?> varArgClass = this.varArgClasses.get(clazz);
if (varArgClass != null) {
Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(varArgClass);
if (subs != null) {
return subs;
}
}
return this.EMPTY_LIST;
} }
/////////////// ///////////////
// a var-arg handler might match // a var-arg handler might match
// tricky part. We have to check the ARRAY version // tricky part. We have to check the ARRAY version
/////////////// ///////////////
private int addVarArgClasses(Collection<Subscription> subscriptions, Class<?> messageType, ArrayList<Class<?>> types1) { private void addVarArgClasses(Collection<Subscription> subscriptions, Class<?> messageType, ArrayList<Class<?>> types1) {
Collection<Subscription> subs; Collection<Subscription> subs;
SubHolder subHolder;
int count = 0;
Class<?> varArgClass = this.varArgClasses.get(messageType); Class<?> varArgClass = this.varArgClasses.get(messageType);
if (varArgClass != null) { if (varArgClass != null) {
// also add all subscriptions that match super types // also add all subscriptions that match super types
subHolder = this.subscriptionsPerMessageSingle.get(varArgClass); subs = this.subscriptionsPerMessageSingle.get(varArgClass);
if (subHolder != null) { if (subs != null) {
subs = subHolder.subs; for (Subscription sub : subs) {
count += subHolder.count; subscriptions.add(sub);
if (subs != null) {
for (Subscription sub : subs) {
subscriptions.add(sub);
}
} }
} }
} }
@ -649,21 +651,14 @@ public class SubscriptionManager {
varArgClass = this.varArgClasses.get(eventSuperType); varArgClass = this.varArgClasses.get(eventSuperType);
if (varArgClass != null) { if (varArgClass != null) {
// also add all subscriptions that match super types // also add all subscriptions that match super types
subHolder = this.subscriptionsPerMessageSingle.get(varArgClass); subs = this.subscriptionsPerMessageSingle.get(varArgClass);
if (subHolder != null) { if (subs != null) {
subs = subHolder.subs; for (Subscription sub : subs) {
count += subHolder.count; subscriptions.add(sub);
if (subs != null) {
for (Subscription sub : subs) {
subscriptions.add(sub);
}
} }
} }
} }
} }
return count;
} }
private void getSubsVarArg(Collection<Subscription> subscriptions, int length, int index, private void getSubsVarArg(Collection<Subscription> subscriptions, int length, int index,
@ -671,7 +666,7 @@ public class SubscriptionManager {
Class<?> classType = messageTypes[index]; Class<?> classType = messageTypes[index];
// get all the super types, if there are any. // get all the super types, if there are any.
ArrayList<Class<?>> superClasses = superClassCache(classType); ArrayList<Class<?>> superClasses = setupSuperClassCache(classType);
IdentityObjectTree<Class<?>, Collection<Subscription>> leaf; IdentityObjectTree<Class<?>, Collection<Subscription>> leaf;
Collection<Subscription> subs; Collection<Subscription> subs;
@ -702,4 +697,12 @@ public class SubscriptionManager {
} }
} }
} }
public void readLock() {
this.LOCK.readLock().lock();
}
public void readUnLock() {
this.LOCK.readLock().unlock();
}
} }

View File

@ -1,17 +1,23 @@
package net.engio.mbassy.multi; package net.engio.mbassy.multi;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert; import junit.framework.Assert;
import net.engio.mbassy.multi.common.AssertSupport; import net.engio.mbassy.multi.common.AssertSupport;
import net.engio.mbassy.multi.common.ConcurrentExecutor; import net.engio.mbassy.multi.common.ConcurrentExecutor;
import net.engio.mbassy.multi.common.IConcurrentSet;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.util.*;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
* This test ensures the correct behaviour of the set implementation that is the building * This test ensures the correct behaviour of the set implementation that is the building
* block of the subscription implementations used by the Mbassador message bus. * block of the subscription implementations used by the Mbassador message bus.
@ -31,13 +37,14 @@ public abstract class ConcurrentSetTest extends AssertSupport {
protected Set gcProtector = new HashSet(); protected Set gcProtector = new HashSet();
@Override
@Before @Before
public void beforeTest(){ public void beforeTest(){
super.beforeTest(); super.beforeTest();
gcProtector = new HashSet(); this.gcProtector = new HashSet();
} }
protected abstract IConcurrentSet createSet(); protected abstract Collection createSet();
@Test @Test
@ -45,12 +52,12 @@ public abstract class ConcurrentSetTest extends AssertSupport {
final LinkedList<Object> duplicates = new LinkedList<Object>(); final LinkedList<Object> duplicates = new LinkedList<Object>();
final HashSet<Object> distinct = new HashSet<Object>(); final HashSet<Object> distinct = new HashSet<Object>();
final IConcurrentSet testSet = createSet(); final Collection testSet = createSet();
Random rand = new Random(); Random rand = new Random();
// getAll set of distinct objects and list of duplicates // getAll set of distinct objects and list of duplicates
Object candidate = new Object(); Object candidate = new Object();
for (int i = 0; i < numberOfElements; i++) { for (int i = 0; i < this.numberOfElements; i++) {
if (rand.nextInt() % 3 == 0) { if (rand.nextInt() % 3 == 0) {
candidate = new Object(); candidate = new Object();
} }
@ -66,7 +73,7 @@ public abstract class ConcurrentSetTest extends AssertSupport {
testSet.add(src); testSet.add(src);
} }
} }
}, numberOfThreads); }, this.numberOfThreads);
// check that the control set and the test set contain the exact same elements // check that the control set and the test set contain the exact same elements
assertEquals(distinct.size(), testSet.size()); assertEquals(distinct.size(), testSet.size());
@ -77,21 +84,22 @@ public abstract class ConcurrentSetTest extends AssertSupport {
@Test() @Test()
public void testIterationWithConcurrentRemoval() { public void testIterationWithConcurrentRemoval() {
final IConcurrentSet<AtomicInteger> testSet = createSet(); final Collection<AtomicInteger> testSet = createSet();
final Random rand = new Random(); final Random rand = new Random();
for (int i = 0; i < numberOfElements; i++) { for (int i = 0; i < this.numberOfElements; i++) {
AtomicInteger element = new AtomicInteger(); AtomicInteger element = new AtomicInteger();
testSet.add(element); testSet.add(element);
gcProtector.add(element); this.gcProtector.add(element);
} }
Runnable incrementer = new Runnable() { Runnable incrementer = new Runnable() {
@Override @Override
public void run() { public void run() {
while(testSet.size() > 100){ while(testSet.size() > 100){
for(AtomicInteger element : testSet) for(AtomicInteger element : testSet) {
element.incrementAndGet(); element.incrementAndGet();
}
} }
} }
@ -101,9 +109,11 @@ public abstract class ConcurrentSetTest extends AssertSupport {
@Override @Override
public void run() { public void run() {
while(testSet.size() > 100){ while(testSet.size() > 100){
for(AtomicInteger element : testSet) for(AtomicInteger element : testSet) {
if(rand.nextInt() % 3 == 0 && testSet.size() > 100) if(rand.nextInt() % 3 == 0 && testSet.size() > 100) {
testSet.remove(element); testSet.remove(element);
}
}
} }
} }
}; };
@ -128,9 +138,9 @@ public abstract class ConcurrentSetTest extends AssertSupport {
final HashSet<Object> source = new HashSet<Object>(); final HashSet<Object> source = new HashSet<Object>();
final HashSet<Object> toRemove = new HashSet<Object>(); final HashSet<Object> toRemove = new HashSet<Object>();
final IConcurrentSet testSet = createSet(); final Collection testSet = createSet();
// getAll set of distinct objects and mark a subset of those for removal // getAll set of distinct objects and mark a subset of those for removal
for (int i = 0; i < numberOfElements; i++) { for (int i = 0; i < this.numberOfElements; i++) {
Object candidate = new Object(); Object candidate = new Object();
source.add(candidate); source.add(candidate);
if (i % 3 == 0) { if (i % 3 == 0) {
@ -146,7 +156,7 @@ public abstract class ConcurrentSetTest extends AssertSupport {
testSet.add(src); testSet.add(src);
} }
} }
}, numberOfThreads); }, this.numberOfThreads);
// remove all candidates that have previously been marked for removal from the test set // remove all candidates that have previously been marked for removal from the test set
ConcurrentExecutor.runConcurrent(new Runnable() { ConcurrentExecutor.runConcurrent(new Runnable() {
@ -156,7 +166,7 @@ public abstract class ConcurrentSetTest extends AssertSupport {
testSet.remove(src); testSet.remove(src);
} }
} }
}, numberOfThreads); }, this.numberOfThreads);
// ensure that the test set does not contain any of the elements that have been removed from it // ensure that the test set does not contain any of the elements that have been removed from it
for (Object tar : testSet) { for (Object tar : testSet) {
@ -166,7 +176,9 @@ public abstract class ConcurrentSetTest extends AssertSupport {
// for removal // for removal
assertEquals(source.size() - toRemove.size(), testSet.size()); assertEquals(source.size() - toRemove.size(), testSet.size());
for (Object src : source) { for (Object src : source) {
if (!toRemove.contains(src)) assertTrue(testSet.contains(src)); if (!toRemove.contains(src)) {
assertTrue(testSet.contains(src));
}
} }
} }
@ -175,9 +187,9 @@ public abstract class ConcurrentSetTest extends AssertSupport {
final HashSet<Object> source = new HashSet<Object>(); final HashSet<Object> source = new HashSet<Object>();
final HashSet<Object> toRemove = new HashSet<Object>(); final HashSet<Object> toRemove = new HashSet<Object>();
final IConcurrentSet testSet = createSet(); final Collection testSet = createSet();
// getAll set of candidates and mark subset for removal // getAll set of candidates and mark subset for removal
for (int i = 0; i < numberOfElements; i++) { for (int i = 0; i < this.numberOfElements; i++) {
Object candidate = new Object(); Object candidate = new Object();
source.add(candidate); source.add(candidate);
if (i % 3 == 0) { if (i % 3 == 0) {
@ -192,11 +204,12 @@ public abstract class ConcurrentSetTest extends AssertSupport {
public void run() { public void run() {
for (Object src : source) { for (Object src : source) {
testSet.add(src); testSet.add(src);
if (toRemove.contains(src)) if (toRemove.contains(src)) {
testSet.remove(src); testSet.remove(src);
}
} }
} }
}, numberOfThreads); }, this.numberOfThreads);
// ensure that the test set does not contain any of the elements that have been removed from it // ensure that the test set does not contain any of the elements that have been removed from it
for (Object tar : testSet) { for (Object tar : testSet) {
@ -206,17 +219,19 @@ public abstract class ConcurrentSetTest extends AssertSupport {
// for removal // for removal
assertEquals(source.size() - toRemove.size(), testSet.size()); assertEquals(source.size() - toRemove.size(), testSet.size());
for (Object src : source) { for (Object src : source) {
if (!toRemove.contains(src)) assertTrue(testSet.contains(src)); if (!toRemove.contains(src)) {
assertTrue(testSet.contains(src));
}
} }
} }
@Test @Test
public void testCompleteRemoval() { public void testCompleteRemoval() {
final HashSet<Object> source = new HashSet<Object>(); final HashSet<Object> source = new HashSet<Object>();
final IConcurrentSet testSet = createSet(); final Collection testSet = createSet();
// getAll set of candidates and mark subset for removal // getAll set of candidates and mark subset for removal
for (int i = 0; i < numberOfElements; i++) { for (int i = 0; i < this.numberOfElements; i++) {
Object candidate = new Object(); Object candidate = new Object();
source.add(candidate); source.add(candidate);
testSet.add(candidate); testSet.add(candidate);
@ -231,7 +246,7 @@ public abstract class ConcurrentSetTest extends AssertSupport {
testSet.remove(src); testSet.remove(src);
} }
} }
}, numberOfThreads); }, this.numberOfThreads);
// ensure that the test set still contains all objects from the source set that have not been marked // ensure that the test set still contains all objects from the source set that have not been marked
@ -245,10 +260,10 @@ public abstract class ConcurrentSetTest extends AssertSupport {
@Test @Test
public void testRemovalViaIterator() { public void testRemovalViaIterator() {
final HashSet<Object> source = new HashSet<Object>(); final HashSet<Object> source = new HashSet<Object>();
final IConcurrentSet setUnderTest = createSet(); final Collection setUnderTest = createSet();
// getAll set of candidates and mark subset for removal // getAll set of candidates and mark subset for removal
for (int i = 0; i < numberOfElements; i++) { for (int i = 0; i < this.numberOfElements; i++) {
Object candidate = new Object(); Object candidate = new Object();
source.add(candidate); source.add(candidate);
setUnderTest.add(candidate); setUnderTest.add(candidate);
@ -264,7 +279,7 @@ public abstract class ConcurrentSetTest extends AssertSupport {
iterator.remove(); iterator.remove();
} }
} }
}, numberOfThreads); }, this.numberOfThreads);
// ensure that the test set still contains all objects from the source set that have not been marked // ensure that the test set still contains all objects from the source set that have not been marked
@ -288,7 +303,7 @@ public abstract class ConcurrentSetTest extends AssertSupport {
*/ */
@Test @Test
public void testConcurrentAddRemove() { public void testConcurrentAddRemove() {
final IConcurrentSet testSet = createSet(); final Collection testSet = createSet();
// a set of unique integers that will stay permanently in the test set // a set of unique integers that will stay permanently in the test set
final List permanentObjects = new ArrayList(); final List permanentObjects = new ArrayList();
// a set of objects that will be added and removed at random to the test set to force rehashing // a set of objects that will be added and removed at random to the test set to force rehashing
@ -306,6 +321,7 @@ public abstract class ConcurrentSetTest extends AssertSupport {
// Adds and removes items // Adds and removes items
// thus forcing constant rehashing of the backing hashtable // thus forcing constant rehashing of the backing hashtable
Runnable rehasher = new Runnable() { Runnable rehasher = new Runnable() {
@Override
public void run() { public void run() {
Random rand = new Random(); Random rand = new Random();
for(int times = 0; times < 1000 ; times++){ for(int times = 0; times < 1000 ; times++){
@ -330,8 +346,9 @@ public abstract class ConcurrentSetTest extends AssertSupport {
for (Object permanent : permanentObjects) { for (Object permanent : permanentObjects) {
// permanent items are never touched, // permanent items are never touched,
// --> set.contains(j) should always return true // --> set.contains(j) should always return true
if(!testSet.contains(permanent)) if(!testSet.contains(permanent)) {
missing.add(permanent); missing.add(permanent);
}
} }
} }
} }
@ -344,19 +361,23 @@ public abstract class ConcurrentSetTest extends AssertSupport {
public Set createWithRandomIntegers(int size, List<Integer> excluding){ public Set createWithRandomIntegers(int size, List<Integer> excluding){
if(excluding == null) excluding = new ArrayList<Integer>(); if(excluding == null) {
excluding = new ArrayList<Integer>();
}
Set<Integer> result = new HashSet<Integer>(size); Set<Integer> result = new HashSet<Integer>(size);
Random rand = new Random(); Random rand = new Random();
while(result.size() < size){ while(result.size() < size){
result.add(rand.nextInt()); result.add(rand.nextInt());
} }
for(Integer excluded : excluding) for(Integer excluded : excluding) {
result.remove(excluded); result.remove(excluded);
}
return result; return result;
} }
protected void protectFromGarbageCollector(Set elements){ protected void protectFromGarbageCollector(Set elements){
for(Object element : elements) for(Object element : elements) {
gcProtector.add(element); this.gcProtector.add(element);
}
} }
} }

View File

@ -1,15 +1,13 @@
package net.engio.mbassy.multi; package net.engio.mbassy.multi;
import net.engio.mbassy.multi.common.ConcurrentExecutor; import java.util.Collection;
import net.engio.mbassy.multi.common.IConcurrentSet;
import net.engio.mbassy.multi.common.WeakConcurrentSet;
import org.junit.Before;
import org.junit.Test;
import java.util.HashSet; import java.util.HashSet;
import java.util.Random; import java.util.Random;
import java.util.Set;
import net.engio.mbassy.multi.common.ConcurrentExecutor;
import net.engio.mbassy.multi.common.WeakConcurrentSet;
import org.junit.Test;
/** /**
* *
@ -24,7 +22,7 @@ public class WeakConcurrentSetTest extends ConcurrentSetTest{
@Override @Override
protected IConcurrentSet createSet() { protected Collection createSet() {
return new WeakConcurrentSet(); return new WeakConcurrentSet();
} }
@ -33,10 +31,10 @@ public class WeakConcurrentSetTest extends ConcurrentSetTest{
// Assemble // Assemble
final HashSet<Object> permanentElements = new HashSet<Object>(); final HashSet<Object> permanentElements = new HashSet<Object>();
final IConcurrentSet testSetWeak = createSet(); final Collection testSetWeak = createSet();
final Random rand = new Random(); final Random rand = new Random();
for (int i = 0; i < numberOfElements; i++) { for (int i = 0; i < this.numberOfElements; i++) {
Object candidate = new Object(); Object candidate = new Object();
if (rand.nextInt() % 3 == 0) { if (rand.nextInt() % 3 == 0) {
@ -58,13 +56,13 @@ public class WeakConcurrentSetTest extends ConcurrentSetTest{
System.currentTimeMillis(); System.currentTimeMillis();
} }
} }
}, numberOfThreads); }, this.numberOfThreads);
// the set should have cleaned up the garbage collected elements // the set should have cleaned up the garbage collected elements
// it must still contain all of the permanent objects // it must still contain all of the permanent objects
// since different GC mechanisms can be used (not necessarily full, stop-the-world) not all dead objects // since different GC mechanisms can be used (not necessarily full, stop-the-world) not all dead objects
// must have been collected // must have been collected
assertTrue(permanentElements.size() <= testSetWeak.size() && testSetWeak.size() < numberOfElements); assertTrue(permanentElements.size() <= testSetWeak.size() && testSetWeak.size() < this.numberOfElements);
for (Object test : testSetWeak) { for (Object test : testSetWeak) {
assertTrue(permanentElements.contains(test)); assertTrue(permanentElements.contains(test));
} }

View File

@ -39,7 +39,7 @@ public class SubscriptionValidator extends AssertSupport{
// for each tuple of subscriber and message type the specified number of listeners must exist // for each tuple of subscriber and message type the specified number of listeners must exist
public void validate(SubscriptionManager manager) { public void validate(SubscriptionManager manager) {
for (Class<?> messageType : this.messageTypes) { for (Class<?> messageType : this.messageTypes) {
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageType); Collection<Subscription> subscriptions = manager.DEPRECATED_getSubscriptionsByMessageType(messageType);
Collection<ValidationEntry> validationEntries = getEntries(messageType); Collection<ValidationEntry> validationEntries = getEntries(messageType);
assertEquals(subscriptions.size(), validationEntries.size()); assertEquals(subscriptions.size(), validationEntries.size());
for(ValidationEntry validationValidationEntry : validationEntries){ for(ValidationEntry validationValidationEntry : validationEntries){