diff --git a/src/main/java/net/engio/mbassy/multi/DispatchRunnable.java b/src/main/java/net/engio/mbassy/multi/DispatchRunnable.java index 31226a1..1fe2edd 100644 --- a/src/main/java/net/engio/mbassy/multi/DispatchRunnable.java +++ b/src/main/java/net/engio/mbassy/multi/DispatchRunnable.java @@ -19,16 +19,14 @@ public class DispatchRunnable implements Runnable { private ErrorHandlingSupport errorHandler; private TransferQueue dispatchQueue; - private TransferQueue invokeQueue; private SubscriptionManager manager; public DispatchRunnable(ErrorHandlingSupport errorHandler, SubscriptionManager subscriptionManager, - TransferQueue dispatchQueue, TransferQueue invokeQueue) { + TransferQueue dispatchQueue) { this.errorHandler = errorHandler; this.manager = subscriptionManager; this.dispatchQueue = dispatchQueue; - this.invokeQueue = invokeQueue; } @Override @@ -36,22 +34,19 @@ public class DispatchRunnable implements Runnable { final SubscriptionManager manager = this.manager; final ErrorHandlingSupport errorHandler = this.errorHandler; final TransferQueue IN_queue = this.dispatchQueue; - final TransferQueue OUT_queue = this.invokeQueue; - - final Runnable dummyRunnable = new Runnable() { - @Override - public void run() { - } - }; Object message = null; int counter; while (true) { try { - counter = MultiMBassador.WORKER_BLITZ; + counter = MultiMBassador.WORK_RUN_BLITZ; while ((message = IN_queue.poll()) == null) { - if (counter > 0) { +// if (counter > 100) { +// --counter; +// Thread.yield(); +// } else + if (counter > 0) { --counter; LockSupport.parkNanos(1L); } else { @@ -60,76 +55,43 @@ public class DispatchRunnable implements Runnable { } } - @SuppressWarnings("null") Class messageClass = message.getClass(); - - manager.readLock(); - Collection subscriptions = manager.getSubscriptionsByMessageType(messageClass); + boolean empty = subscriptions.isEmpty(); - - Collection deadSubscriptions = null; if (empty) { - // Dead Event. must EXACTLY MATCH (no subclasses or varargs) - deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); + // Dead Event + subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); + + DeadMessage deadMessage = new DeadMessage(message); + message = deadMessage; + empty = subscriptions.isEmpty(); } - Collection> superClasses = manager.getSuperClasses(messageClass); - Collection varArgs = manager.getVarArgs(messageClass); - - manager.readUnLock(); - if (!empty) { + Object[] vararg = null; for (Subscription sub : subscriptions) { - sub.publishToSubscriptionSingle(OUT_queue, errorHandler, message); - } + boolean handled = false; + if (sub.isVarArg()) { + // messageClass will NEVER be an array to begin with, since that will call the multi-arg method + if (vararg == null) { + // messy, but the ONLY way to do it. + vararg = (Object[]) Array.newInstance(message.getClass(), 1); + vararg[0] = message; -// OUT_queue.put(new InvokeRunnable(errorHandler, subscriptions, message)); - } else if (deadSubscriptions != null) { - if (!deadSubscriptions.isEmpty()) { - DeadMessage deadMessage = new DeadMessage(message); - - for (Subscription sub : deadSubscriptions) { - sub.publishToSubscriptionSingle(OUT_queue, errorHandler, deadMessage); + Object[] newInstance = new Object[1]; + newInstance[0] = vararg; + vararg = newInstance; + } + handled = true; + sub.publishToSubscription(errorHandler, vararg); } -// OUT_queue.put(new InvokeRunnable(errorHandler, deadSubscriptions, deadMessage)); - } - } - - // now get superClasses - for (Class superClass : superClasses) { - subscriptions = manager.getSubscriptionsByMessageType(superClass); - - if (!subscriptions.isEmpty()) { - for (Subscription sub : subscriptions) { - sub.publishToSubscriptionSingle(OUT_queue, errorHandler, message); + if (!handled) { + sub.publishToSubscription(errorHandler, message); } - -// OUT_queue.put(new InvokeRunnable(errorHandler, subscriptions, message)); } } - - // now get varargs - if (!varArgs.isEmpty()) { - // messy, but the ONLY way to do it. - Object[] vararg = (Object[]) Array.newInstance(message.getClass(), 1); - vararg[0] = message; - - Object[] newInstance = new Object[1]; - newInstance[0] = vararg; - vararg = newInstance; - - for (Subscription sub : varArgs) { - sub.publishToSubscriptionSingle(OUT_queue, errorHandler, vararg); - } - -// OUT_queue.put(new InvokeRunnable(errorHandler, varArgs, vararg)); - } - - // make sure it's synced at this point -// OUT_queue.transfer(dummyRunnable); - } catch (InterruptedException e) { return; } catch (Throwable e) { diff --git a/src/main/java/net/engio/mbassy/multi/InvokeRunnable.java b/src/main/java/net/engio/mbassy/multi/InvokeRunnable.java index 261fa81..ff1b85c 100644 --- a/src/main/java/net/engio/mbassy/multi/InvokeRunnable.java +++ b/src/main/java/net/engio/mbassy/multi/InvokeRunnable.java @@ -1,5 +1,6 @@ package net.engio.mbassy.multi; +import java.lang.reflect.Array; import java.util.Collection; import net.engio.mbassy.multi.error.ErrorHandlingSupport; @@ -26,9 +27,28 @@ public class InvokeRunnable implements Runnable { ErrorHandlingSupport errorHandler = this.errorHandler; Collection subs = this.subscriptions; Object message = this.message; + Object[] vararg = null; -// for (Subscription sub : subs) { -// sub.publishToSubscriptionSingle(errorHandler, message); -// } + for (Subscription sub : subs) { + boolean handled = false; + if (sub.isVarArg()) { + // messageClass will NEVER be an array to begin with, since that will call the multi-arg method + if (vararg == null) { + // messy, but the ONLY way to do it. + vararg = (Object[]) Array.newInstance(message.getClass(), 1); + vararg[0] = message; + + Object[] newInstance = new Object[1]; + newInstance[0] = vararg; + vararg = newInstance; + } + handled = true; + sub.publishToSubscription(errorHandler, vararg); + } + + if (!handled) { + sub.publishToSubscription(errorHandler, message); + } + } } } diff --git a/src/main/java/net/engio/mbassy/multi/MultiMBassador.java b/src/main/java/net/engio/mbassy/multi/MultiMBassador.java index 66dd607..4dbd58c 100644 --- a/src/main/java/net/engio/mbassy/multi/MultiMBassador.java +++ b/src/main/java/net/engio/mbassy/multi/MultiMBassador.java @@ -1,6 +1,5 @@ package net.engio.mbassy.multi; -import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; @@ -27,10 +26,7 @@ public class MultiMBassador implements IMessageBus { private final SubscriptionManager subscriptionManager; -// private final Queue dispatchQueue; -// private final BlockingQueue dispatchQueue; private final TransferQueue dispatchQueue; - private final TransferQueue invokeQueue; // all threads that are available for asynchronous message dispatching @@ -41,148 +37,32 @@ public class MultiMBassador implements IMessageBus { } - public static final int WORKER_BLITZ = 10; + public static final int WORK_RUN_BLITZ = 50; + public static final int WORK_RUN_BLITZ_DIV2 = WORK_RUN_BLITZ/2; public MultiMBassador(int numberOfThreads) { if (numberOfThreads < 1) { - numberOfThreads = 1; // at LEAST 1 thread + numberOfThreads = 1; // at LEAST 1 threads } -// this.objectQueue = new LinkedTransferQueue(); - this.dispatchQueue = new LinkedTransferQueue(); - this.invokeQueue = new LinkedTransferQueue(); -// this.invokeQueue = new BoundedTransferQueue(numberOfThreads); -// this.dispatchQueue = new BoundedTransferQueue(numberOfThreads); -// this.dispatchQueue = new MpmcArrayQueue(Pow2.roundToPowerOfTwo(numberOfThreads/2)); -// this.dispatchQueue = new PTLQueue(Pow2.roundToPowerOfTwo(numberOfThreads/2)); -// this.dispatchQueue = new ArrayBlockingQueue(4); -// this.dispatchQueue = new SynchronousQueue(); -// this.dispatchQueue = new LinkedBlockingQueue(Pow2.roundToPowerOfTwo(numberOfThreads)); this.subscriptionManager = new SubscriptionManager(); + this.dispatchQueue = new LinkedTransferQueue<>(); - int dispatchSize = 2; -// int invokeSize = Pow2.roundToPowerOfTwo(numberOfThreads); - int invokeSize = 0; - this.threads = new ArrayList(dispatchSize + invokeSize); + int dispatchSize = 8; + this.threads = new ArrayList(); DisruptorThreadFactory dispatchThreadFactory = new DisruptorThreadFactory("MB_Dispatch"); for (int i = 0; i < dispatchSize; i++) { // each thread will run forever and process incoming message publication requests - Runnable runnable = new DispatchRunnable(this, this.subscriptionManager, this.dispatchQueue, this.invokeQueue); + Runnable runnable = new DispatchRunnable(this, this.subscriptionManager, this.dispatchQueue); Thread runner = dispatchThreadFactory.newThread(runnable); this.threads.add(runner); runner.start(); } -////////////////////////////////////////////////////// - - DisruptorThreadFactory invokeThreadFactory = new DisruptorThreadFactory("MB_Invoke"); - for (int i = 0; i < invokeSize; i++) { - // each thread will run forever and process incoming message publication requests - Runnable runnable = new Runnable() { - @SuppressWarnings("null") - @Override - public void run() { - final MultiMBassador mbassador = MultiMBassador.this; - final TransferQueue IN_queue = mbassador.invokeQueue; - - try { - SubRunnable runnable = null; - int counter; - - while (true) { - runnable = null; - counter = WORKER_BLITZ; - -// while ((runnable = IN_queue.poll()) == null) { -// if (counter > 0) { -// --counter; -// LockSupport.parkNanos(1L); -// } else { - runnable = IN_queue.take(); -// break; -// } -// } - - - try { - - runnable.handler.invoke(runnable.listener, runnable.message); - -// this.invocation.invoke(listener, handler, message); - } catch (IllegalAccessException e) { -// errorHandler.handlePublicationError(new PublicationError() -// .setMessage("Error during invocation of message handler. " + -// "The class or method is not accessible") -// .setCause(e) -// .setMethodName(handler.getName()) -// .setListener(listener) -// .setPublishedObject(message)); - } catch (IllegalArgumentException e) { -// errorHandler.handlePublicationError(new PublicationError() -// .setMessage("Error during invocation of message handler. " + -// "Wrong arguments passed to method. Was: " + message.getClass() -// + "Expected: " + handler.getParameterTypes()[0]) -// .setCause(e) -// .setMethodName(handler.getName()) -// .setListener(listener) -// .setPublishedObject(message)); - } catch (InvocationTargetException e) { -// errorHandler.handlePublicationError(new PublicationError() -// .setMessage("Error during invocation of message handler. " + -// "Message handler threw exception") -// .setCause(e) -// .setMethodName(handler.getName()) -// .setListener(listener) -// .setPublishedObject(message)); - } catch (Throwable e) { -// errorHandler.handlePublicationError(new PublicationError() -// .setMessage("Error during invocation of message handler. " + -// "The handler code threw an exception") -// .setCause(e) -// .setMethodName(handler.getName()) -// .setListener(listener) -// .setPublishedObject(message)); - } - - - - - - - - - - - - - - - - - - - - - - - - -// runnable.run(); - } - } catch (InterruptedException e) { - return; - } - } - }; - - Thread runner = invokeThreadFactory.newThread(runnable); - this.threads.add(runner); - runner.start(); - } } @Override @@ -212,7 +92,8 @@ public class MultiMBassador implements IMessageBus { @Override public boolean hasPendingMessages() { - return !this.dispatchQueue.isEmpty() || !this.invokeQueue.isEmpty(); +// return this.dispatch_RingBuffer.remainingCapacity() < this.dispatch_RingBufferSize; + return !this.dispatchQueue.isEmpty(); } @Override @@ -220,6 +101,15 @@ public class MultiMBassador implements IMessageBus { for (Thread t : this.threads) { t.interrupt(); } + +// System.err.println(this.counter); + +// for (InterruptRunnable runnable : this.invokeRunners) { +// runnable.stop(); +// } + +// this.dispatch_Disruptor.shutdown(); +// this.dispatch_Executor.shutdown(); } @@ -499,18 +389,61 @@ public class MultiMBassador implements IMessageBus { @Override public void publishAsync(Object message) { if (message != null) { - try { - this.dispatchQueue.transfer(message); - return; - } catch (InterruptedException e) { - e.printStackTrace(); - // log.error(e); +// // put this on the disruptor ring buffer +// final RingBuffer ringBuffer = this.dispatch_RingBuffer; +// +// // setup the job +// final long seq = ringBuffer.next(); +// try { +// DispatchHolder eventJob = ringBuffer.get(seq); +// eventJob.messageType = MessageType.ONE; +// eventJob.message1 = message; +// } catch (Throwable e) { +// handlePublicationError(new PublicationError() +// .setMessage("Error while adding an asynchronous message") +// .setCause(e) +// .setPublishedObject(message)); +// } finally { +// // always publish the job +// ringBuffer.publish(seq); +// } - handlePublicationError(new PublicationError() - .setMessage("Error while adding an asynchronous message") - .setCause(e) - .setPublishedObject(message)); - } +// MessageHolder messageHolder = new MessageHolder(); +// messageHolder.messageType = MessageType.ONE; +// messageHolder.message1 = message; + + +// new Runnable() { +// @Override +// public void run() { +// +// } +// }; + + // faster if we can skip locking +// int counter = 200; +// while (!this.dispatchQueue.offer(message)) { +// if (counter > 100) { +// --counter; +// Thread.yield(); +// } else if (counter > 0) { +// --counter; +// LockSupport.parkNanos(1L); +// } else { + try { + this.dispatchQueue.transfer(message); + return; + } catch (InterruptedException e) { + e.printStackTrace(); + // log.error(e); + + handlePublicationError(new PublicationError() + .setMessage("Error while adding an asynchronous message") + .setCause(e) + .setPublishedObject(message)); + } +// } +// } } } diff --git a/src/main/java/net/engio/mbassy/multi/common/AbstractConcurrentSet.java b/src/main/java/net/engio/mbassy/multi/common/AbstractConcurrentSet.java index 21b9c1c..a21c471 100644 --- a/src/main/java/net/engio/mbassy/multi/common/AbstractConcurrentSet.java +++ b/src/main/java/net/engio/mbassy/multi/common/AbstractConcurrentSet.java @@ -1,11 +1,8 @@ package net.engio.mbassy.multi.common; -import java.util.Collection; import java.util.Map; import java.util.concurrent.locks.Lock; -import sun.reflect.generics.reflectiveObjects.NotImplementedException; - import com.googlecode.concurentlocks.ReentrantReadWriteUpdateLock; /** @@ -17,7 +14,7 @@ import com.googlecode.concurentlocks.ReentrantReadWriteUpdateLock; * @author bennidi * Date: 2/12/12 */ -public abstract class AbstractConcurrentSet implements Collection { +public abstract class AbstractConcurrentSet implements IConcurrentSet { // Internal state protected final ReentrantReadWriteUpdateLock lock = new ReentrantReadWriteUpdateLock(); @@ -31,25 +28,21 @@ public abstract class AbstractConcurrentSet implements Collection { protected abstract Entry createEntry(T value, Entry next); @Override - public boolean add(T element) { + public void add(T element) { if (element == null) { - return false; + return; } Lock writeLock = this.lock.writeLock(); - boolean changed = false; writeLock.lock(); if (this.entries.containsKey(element)) { } else { insert(element); - changed = true; } writeLock.unlock(); - - return changed; } @Override - public boolean contains(Object element) { + public boolean contains(T element) { Lock readLock = this.lock.readLock(); ISetEntry entry; try { @@ -80,28 +73,25 @@ public abstract class AbstractConcurrentSet implements Collection { } @Override - public boolean addAll(Collection elements) { - boolean changed = false; + public void addAll(Iterable elements) { Lock writeLock = this.lock.writeLock(); try { writeLock.lock(); for (T element : elements) { if (element != null) { insert(element); - changed = true; } } } finally { writeLock.unlock(); } - return changed; } /** * @return TRUE if the element was successfully removed */ @Override - public boolean remove(Object element) { + public boolean remove(T element) { Lock updateLock = this.lock.updateLock(); boolean isNull; @@ -110,12 +100,15 @@ public abstract class AbstractConcurrentSet implements Collection { ISetEntry entry = this.entries.get(element); isNull = entry == null || entry.getValue() == null; - if (!isNull) { + if (isNull) { Lock writeLock = this.lock.writeLock(); try { writeLock.lock(); - if (entry != this.head) { - entry.remove(); + ISetEntry listelement = this.entries.get(element); + if (listelement == null) { + return false; //removed by other thread in the meantime + } else if (listelement != this.head) { + listelement.remove(); } else { // if it was second, now it's first this.head = this.head.next(); @@ -134,36 +127,6 @@ public abstract class AbstractConcurrentSet implements Collection { } } - @Override - public Object[] toArray() { - throw new NotImplementedException(); - } - - @Override - public T[] toArray(T[] a) { - throw new NotImplementedException(); - } - - @Override - public boolean containsAll(Collection c) { - throw new NotImplementedException(); - } - - @Override - public boolean removeAll(Collection c) { - throw new NotImplementedException(); - } - - @Override - public boolean retainAll(Collection c) { - throw new NotImplementedException(); - } - - @Override - public void clear() { - throw new NotImplementedException(); - } - public abstract static class Entry implements ISetEntry { diff --git a/src/main/java/net/engio/mbassy/multi/common/IConcurrentSet.java b/src/main/java/net/engio/mbassy/multi/common/IConcurrentSet.java new file mode 100644 index 0000000..cbfeff5 --- /dev/null +++ b/src/main/java/net/engio/mbassy/multi/common/IConcurrentSet.java @@ -0,0 +1,25 @@ +package net.engio.mbassy.multi.common; + +/** + * Todo: Add javadoc + * + * @author bennidi + * Date: 3/29/13 + */ +public interface IConcurrentSet extends Iterable { + + void add(T element); + + boolean contains(T element); + + int size(); + + boolean isEmpty(); + + void addAll(Iterable elements); + + /** + * @return TRUE if the element was removed + */ + boolean remove(T element); +} diff --git a/src/main/java/net/engio/mbassy/multi/common/StrongConcurrentSet.java b/src/main/java/net/engio/mbassy/multi/common/StrongConcurrentSet.java index db821fb..455a97f 100644 --- a/src/main/java/net/engio/mbassy/multi/common/StrongConcurrentSet.java +++ b/src/main/java/net/engio/mbassy/multi/common/StrongConcurrentSet.java @@ -13,11 +13,7 @@ public class StrongConcurrentSet extends AbstractConcurrentSet { public StrongConcurrentSet() { - this(16); - } - - public StrongConcurrentSet(int size) { - super(new IdentityHashMap>(size)); + super(new IdentityHashMap>()); } @Override @@ -79,5 +75,9 @@ public class StrongConcurrentSet extends AbstractConcurrentSet { public T getValue() { return this.value; } + + + + } } \ No newline at end of file diff --git a/src/main/java/net/engio/mbassy/multi/common/WeakConcurrentSet.java b/src/main/java/net/engio/mbassy/multi/common/WeakConcurrentSet.java index 5f0f565..41deb3a 100644 --- a/src/main/java/net/engio/mbassy/multi/common/WeakConcurrentSet.java +++ b/src/main/java/net/engio/mbassy/multi/common/WeakConcurrentSet.java @@ -116,5 +116,9 @@ public class WeakConcurrentSet extends AbstractConcurrentSet{ public T getValue() { return this.value.get(); } + + + + } } diff --git a/src/main/java/net/engio/mbassy/multi/subscription/Subscription.java b/src/main/java/net/engio/mbassy/multi/subscription/Subscription.java index b334b10..604db7b 100644 --- a/src/main/java/net/engio/mbassy/multi/subscription/Subscription.java +++ b/src/main/java/net/engio/mbassy/multi/subscription/Subscription.java @@ -2,12 +2,10 @@ package net.engio.mbassy.multi.subscription; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.util.Collection; -import java.util.concurrent.atomic.AtomicLong; +import java.util.Arrays; -import net.engio.mbassy.multi.SubRunnable; +import net.engio.mbassy.multi.common.IConcurrentSet; import net.engio.mbassy.multi.common.StrongConcurrentSet; -import net.engio.mbassy.multi.common.TransferQueue; import net.engio.mbassy.multi.dispatch.IHandlerInvocation; import net.engio.mbassy.multi.dispatch.ReflectiveHandlerInvocation; import net.engio.mbassy.multi.dispatch.SynchronizedHandlerInvocation; @@ -36,16 +34,17 @@ public class Subscription { private final MessageHandler handlerMetadata; private final IHandlerInvocation invocation; +// protected final Collection listeners; // protected final Map, Boolean> listeners; // protected final Map listeners; - protected final Collection listeners; + protected final IConcurrentSet listeners; Subscription(MessageHandler handler) { // this.listeners = new WeakConcurrentSet(); this.listeners = new StrongConcurrentSet(); -// this.listeners = new CopyOnWriteArrayList(); -// this.listeners = new ConcurrentSkipListSet(); // requires listener object to be comparable // this.listeners = new ConcurrentHashMap(); +// this.listeners = new CopyOnWriteArrayList(); +// this.listeners = new ConcurrentSkipListSet(); // this.listeners = new ConcurrentWeakHashMap, Boolean>(); this.handlerMetadata = handler; @@ -132,30 +131,17 @@ public class Subscription { return this.listeners.size(); } - public void pin() { - System.err.println(this.counter.get()); - } - - private AtomicLong counter = new AtomicLong(); - public void publishToSubscriptionSingle(TransferQueue OUT_queue, ErrorHandlingSupport errorHandler, Object message) { +// private AtomicLong counter = new AtomicLong(); + public void publishToSubscription(ErrorHandlingSupport errorHandler, Object message) { // Collection listeners = this.listeners.keySet(); - Collection listeners = this.listeners; +// Collection listeners = this.listeners; + IConcurrentSet listeners = this.listeners; if (listeners.size() > 0) { Method handler = this.handlerMetadata.getHandler(); // int count = 0; - -// Iterator iterator = listeners.iterator(); for (Object listener : listeners) { // count++; -// this.counter.getAndIncrement(); -// try { -// OUT_queue.transfer(new SubRunnable(handler, listener, message)); -// } catch (InterruptedException e1) { -// return; -// } - - try { this.invocation.invoke(listener, handler, message); } catch (IllegalAccessException e) { @@ -200,154 +186,158 @@ public class Subscription { public void publishToSubscription(ErrorHandlingSupport errorHandler, Object message1, Object message2) { // Collection listeners = this.listeners.keySet(); // Collection listeners = this.listeners; -// -// if (listeners.size() > 0) { -// Method handler = this.handlerMetadata.getHandler(); -// -// for (Object listener : listeners) { -// try { -// this.invocation.invoke(listener, handler, message1, message2); -// } catch (IllegalAccessException e) { -// errorHandler.handlePublicationError(new PublicationError() -// .setMessage("Error during invocation of message handler. " + -// "The class or method is not accessible") -// .setCause(e) -// .setMethodName(handler.getName()) -// .setListener(listener) -// .setPublishedObject(message1, message2)); -// } catch (IllegalArgumentException e) { -// errorHandler.handlePublicationError(new PublicationError() -// .setMessage("Error during invocation of message handler. " + -// "Wrong arguments passed to method. Was: " + -// message1.getClass() + ", " + -// message2.getClass() -// + ". Expected: " + handler.getParameterTypes()[0] + ", " + -// handler.getParameterTypes()[1] -// ) -// .setCause(e) -// .setMethodName(handler.getName()) -// .setListener(listener) -// .setPublishedObject(message1, message2)); -// } catch (InvocationTargetException e) { -// errorHandler.handlePublicationError(new PublicationError() -// .setMessage("Error during invocation of message handler. " + -// "Message handler threw exception") -// .setCause(e) -// .setMethodName(handler.getName()) -// .setListener(listener) -// .setPublishedObject(message1, message2)); -// } catch (Throwable e) { -// errorHandler.handlePublicationError(new PublicationError() -// .setMessage("Error during invocation of message handler. " + -// "The handler code threw an exception") -// .setCause(e) -// .setMethodName(handler.getName()) -// .setListener(listener) -// .setPublishedObject(message1, message2)); -// } -// } -// } + IConcurrentSet listeners = this.listeners; + + if (listeners.size() > 0) { + Method handler = this.handlerMetadata.getHandler(); + + for (Object listener : listeners) { + try { + this.invocation.invoke(listener, handler, message1, message2); + } catch (IllegalAccessException e) { + errorHandler.handlePublicationError(new PublicationError() + .setMessage("Error during invocation of message handler. " + + "The class or method is not accessible") + .setCause(e) + .setMethodName(handler.getName()) + .setListener(listener) + .setPublishedObject(message1, message2)); + } catch (IllegalArgumentException e) { + errorHandler.handlePublicationError(new PublicationError() + .setMessage("Error during invocation of message handler. " + + "Wrong arguments passed to method. Was: " + + message1.getClass() + ", " + + message2.getClass() + + ". Expected: " + handler.getParameterTypes()[0] + ", " + + handler.getParameterTypes()[1] + ) + .setCause(e) + .setMethodName(handler.getName()) + .setListener(listener) + .setPublishedObject(message1, message2)); + } catch (InvocationTargetException e) { + errorHandler.handlePublicationError(new PublicationError() + .setMessage("Error during invocation of message handler. " + + "Message handler threw exception") + .setCause(e) + .setMethodName(handler.getName()) + .setListener(listener) + .setPublishedObject(message1, message2)); + } catch (Throwable e) { + errorHandler.handlePublicationError(new PublicationError() + .setMessage("Error during invocation of message handler. " + + "The handler code threw an exception") + .setCause(e) + .setMethodName(handler.getName()) + .setListener(listener) + .setPublishedObject(message1, message2)); + } + } + } } public void publishToSubscription(ErrorHandlingSupport errorHandler, Object message1, Object message2, Object message3) { // Collection listeners = this.listeners.keySet(); -// -// if (this.listeners.size() > 0) { -// Method handler = this.handlerMetadata.getHandler(); -// -// for (Object listener : this.listeners) { -// try { -// this.invocation.invoke(listener, handler, message1, message2, message3); -// } catch (IllegalAccessException e) { -// errorHandler.handlePublicationError(new PublicationError() -// .setMessage("Error during invocation of message handler. " + -// "The class or method is not accessible") -// .setCause(e) -// .setMethodName(handler.getName()) -// .setListener(listener) -// .setPublishedObject(message1, message2, message3)); -// } catch (IllegalArgumentException e) { -// errorHandler.handlePublicationError(new PublicationError() -// .setMessage("Error during invocation of message handler. " + -// "Wrong arguments passed to method. Was: " + -// message1.getClass() + ", " + -// message2.getClass() + ", " + -// message3.getClass() -// + ". Expected: " + handler.getParameterTypes()[0] + ", " + -// handler.getParameterTypes()[1] + ", " + -// handler.getParameterTypes()[2] -// ) -// .setCause(e) -// .setMethodName(handler.getName()) -// .setListener(listener) -// .setPublishedObject(message1, message2, message3)); -// } catch (InvocationTargetException e) { -// errorHandler.handlePublicationError(new PublicationError() -// .setMessage("Error during invocation of message handler. " + -// "Message handler threw exception") -// .setCause(e) -// .setMethodName(handler.getName()) -// .setListener(listener) -// .setPublishedObject(message1, message2, message3)); -// } catch (Throwable e) { -// errorHandler.handlePublicationError(new PublicationError() -// .setMessage("Error during invocation of message handler. " + -// "The handler code threw an exception") -// .setCause(e) -// .setMethodName(handler.getName()) -// .setListener(listener) -// .setPublishedObject(message1, message2, message3)); -// } -// } -// } +// Collection listeners = this.listeners; + IConcurrentSet listeners = this.listeners; + + if (listeners.size() > 0) { + Method handler = this.handlerMetadata.getHandler(); + + for (Object listener : listeners) { + try { + this.invocation.invoke(listener, handler, message1, message2, message3); + } catch (IllegalAccessException e) { + errorHandler.handlePublicationError(new PublicationError() + .setMessage("Error during invocation of message handler. " + + "The class or method is not accessible") + .setCause(e) + .setMethodName(handler.getName()) + .setListener(listener) + .setPublishedObject(message1, message2, message3)); + } catch (IllegalArgumentException e) { + errorHandler.handlePublicationError(new PublicationError() + .setMessage("Error during invocation of message handler. " + + "Wrong arguments passed to method. Was: " + + message1.getClass() + ", " + + message2.getClass() + ", " + + message3.getClass() + + ". Expected: " + handler.getParameterTypes()[0] + ", " + + handler.getParameterTypes()[1] + ", " + + handler.getParameterTypes()[2] + ) + .setCause(e) + .setMethodName(handler.getName()) + .setListener(listener) + .setPublishedObject(message1, message2, message3)); + } catch (InvocationTargetException e) { + errorHandler.handlePublicationError(new PublicationError() + .setMessage("Error during invocation of message handler. " + + "Message handler threw exception") + .setCause(e) + .setMethodName(handler.getName()) + .setListener(listener) + .setPublishedObject(message1, message2, message3)); + } catch (Throwable e) { + errorHandler.handlePublicationError(new PublicationError() + .setMessage("Error during invocation of message handler. " + + "The handler code threw an exception") + .setCause(e) + .setMethodName(handler.getName()) + .setListener(listener) + .setPublishedObject(message1, message2, message3)); + } + } + } } public void publishToSubscription(ErrorHandlingSupport errorHandler, Object... messages) { // Collection listeners = this.listeners.keySet(); // Collection listeners = this.listeners; -// -// if (listeners.size() > 0) { -// Method handler = this.handlerMetadata.getHandler(); -// -// for (Object listener : listeners) { -// try { -// this.invocation.invoke(listener, handler, messages); -// } catch (IllegalAccessException e) { -// errorHandler.handlePublicationError(new PublicationError() -// .setMessage("Error during invocation of message handler. " + -// "The class or method is not accessible") -// .setCause(e) -// .setMethodName(handler.getName()) -// .setListener(listener) -// .setPublishedObject(messages)); -// } catch (IllegalArgumentException e) { -// errorHandler.handlePublicationError(new PublicationError() -// .setMessage("Error during invocation of message handler. " + -// "Wrong arguments passed to method. Was: " + Arrays.deepToString(messages) -// + "Expected: " + Arrays.deepToString(handler.getParameterTypes())) -// .setCause(e) -// .setMethodName(handler.getName()) -// .setListener(listener) -// .setPublishedObject(messages)); -// } catch (InvocationTargetException e) { -// errorHandler.handlePublicationError(new PublicationError() -// .setMessage("Error during invocation of message handler. " + -// "Message handler threw exception") -// .setCause(e) -// .setMethodName(handler.getName()) -// .setListener(listener) -// .setPublishedObject(messages)); -// } catch (Throwable e) { -// errorHandler.handlePublicationError(new PublicationError() -// .setMessage("Error during invocation of message handler. " + -// "The handler code threw an exception") -// .setCause(e) -// .setMethodName(handler.getName()) -// .setListener(listener) -// .setPublishedObject(messages)); -// } -// } -// } + IConcurrentSet listeners = this.listeners; + + if (listeners.size() > 0) { + Method handler = this.handlerMetadata.getHandler(); + + for (Object listener : listeners) { + try { + this.invocation.invoke(listener, handler, messages); + } catch (IllegalAccessException e) { + errorHandler.handlePublicationError(new PublicationError() + .setMessage("Error during invocation of message handler. " + + "The class or method is not accessible") + .setCause(e) + .setMethodName(handler.getName()) + .setListener(listener) + .setPublishedObject(messages)); + } catch (IllegalArgumentException e) { + errorHandler.handlePublicationError(new PublicationError() + .setMessage("Error during invocation of message handler. " + + "Wrong arguments passed to method. Was: " + Arrays.deepToString(messages) + + "Expected: " + Arrays.deepToString(handler.getParameterTypes())) + .setCause(e) + .setMethodName(handler.getName()) + .setListener(listener) + .setPublishedObject(messages)); + } catch (InvocationTargetException e) { + errorHandler.handlePublicationError(new PublicationError() + .setMessage("Error during invocation of message handler. " + + "Message handler threw exception") + .setCause(e) + .setMethodName(handler.getName()) + .setListener(listener) + .setPublishedObject(messages)); + } catch (Throwable e) { + errorHandler.handlePublicationError(new PublicationError() + .setMessage("Error during invocation of message handler. " + + "The handler code threw an exception") + .setCause(e) + .setMethodName(handler.getName()) + .setListener(listener) + .setPublishedObject(messages)); + } + } + } } @Override diff --git a/src/main/java/net/engio/mbassy/multi/subscription/SubscriptionManager.java b/src/main/java/net/engio/mbassy/multi/subscription/SubscriptionManager.java index f3b9b86..45b1e77 100644 --- a/src/main/java/net/engio/mbassy/multi/subscription/SubscriptionManager.java +++ b/src/main/java/net/engio/mbassy/multi/subscription/SubscriptionManager.java @@ -4,7 +4,6 @@ import java.lang.reflect.Array; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.IdentityHashMap; import java.util.LinkedList; import java.util.List; @@ -14,7 +13,6 @@ import java.util.concurrent.ConcurrentHashMap; import net.engio.mbassy.multi.common.IdentityObjectTree; import net.engio.mbassy.multi.common.ReflectionUtils; -import net.engio.mbassy.multi.common.StrongConcurrentSet; import net.engio.mbassy.multi.listener.MessageHandler; import net.engio.mbassy.multi.listener.MetadataReader; @@ -33,13 +31,18 @@ import com.googlecode.concurentlocks.ReentrantReadWriteUpdateLock; */ public class SubscriptionManager { + public static class SubHolder { + public int count = 0; + public Collection subs = new ArrayDeque(0); + } + // the metadata reader that is used to inspect objects passed to the subscribe method private final MetadataReader metadataReader = new MetadataReader(); // all subscriptions per message type // this is the primary list for dispatching a specific message // write access is synchronized and happens only when a listener of a specific class is registered the first time - private final Map, Collection> subscriptionsPerMessageSingle = new IdentityHashMap, Collection>(50); + private final Map, SubHolder> subscriptionsPerMessageSingle = new IdentityHashMap, SubHolder>(50); private final IdentityObjectTree, Collection> subscriptionsPerMessageMulti = new IdentityObjectTree, Collection>(); // all subscriptions per messageHandler type @@ -91,14 +94,17 @@ public class SubscriptionManager { // single Class clazz = handledMessageTypes[0]; - // NOTE: Order is important for safe publication - Collection subs = this.subscriptionsPerMessageSingle.get(clazz); - if (subs != null) { - subs.remove(subscription); + // NOTE: Not thread-safe! must be synchronized in outer scope + SubHolder subHolder = this.subscriptionsPerMessageSingle.get(clazz); + if (subHolder != null) { + Collection subs = subHolder.subs; + if (subs != null) { + subs.remove(subscription); - if (subs.isEmpty()) { - // remove element - this.subscriptionsPerMessageSingle.remove(clazz); + if (subs.isEmpty()) { + // remove element + this.subscriptionsPerMessageSingle.remove(clazz); + } } } } else { @@ -182,7 +188,7 @@ public class SubscriptionManager { } // it's SAFE to use non-concurrent collection here (read only). Same thread LOCKS on this with a write lock - subscriptions = new StrongConcurrentSet(messageHandlers.size()); + subscriptions = new ArrayDeque(messageHandlers.size()); // create subscriptions for all detected message handlers for (MessageHandler messageHandler : messageHandlers) { @@ -197,16 +203,15 @@ public class SubscriptionManager { // single Class clazz = handledMessageTypes[0]; - // NOTE: Order is important for safe publication - Collection subs = this.subscriptionsPerMessageSingle.get(clazz); - if (subs == null) { - subs = new StrongConcurrentSet(2); -// subs = new CopyOnWriteArrayList(); - subs.add(subscription); - this.subscriptionsPerMessageSingle.put(clazz, subs); - } else { - subs.add(subscription); + // NOTE: Not thread-safe! must be synchronized in outer scope + SubHolder subHolder = this.subscriptionsPerMessageSingle.get(clazz); + if (subHolder == null) { + subHolder = new SubHolder(); + this.subscriptionsPerMessageSingle.put(clazz, subHolder); } + Collection subs = subHolder.subs; + subs.add(subscription); + subHolder.count++; // have to save our the VarArg class types, because creating var-arg arrays for objects is expensive if (subscription.isVarArg()) { @@ -216,7 +221,7 @@ public class SubscriptionManager { // since it's vararg, this means that it's an ARRAY, so we ALSO // have to add the component classes of the array if (subscription.acceptsSubtypes()) { - ArrayList> setupSuperClassCache2 = setupSuperClassCache(componentType); + ArrayList> setupSuperClassCache2 = superClassCache(componentType); // have to setup each vararg chain for (int i = 0; i < setupSuperClassCache2.size(); i++) { Class superClass = setupSuperClassCache2.get(i); @@ -229,7 +234,7 @@ public class SubscriptionManager { } } } else if (subscription.acceptsSubtypes()) { - setupSuperClassCache(clazz); + superClassCache(clazz); } } else { @@ -240,17 +245,17 @@ public class SubscriptionManager { case 2: { tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes[0], handledMessageTypes[1]); if (subscription.acceptsSubtypes()) { - setupSuperClassCache(handledMessageTypes[0]); - setupSuperClassCache(handledMessageTypes[1]); + superClassCache(handledMessageTypes[0]); + superClassCache(handledMessageTypes[1]); } break; } case 3: { tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes[0], handledMessageTypes[1], handledMessageTypes[2]); if (subscription.acceptsSubtypes()) { - setupSuperClassCache(handledMessageTypes[0]); - setupSuperClassCache(handledMessageTypes[1]); - setupSuperClassCache(handledMessageTypes[2]); + superClassCache(handledMessageTypes[0]); + superClassCache(handledMessageTypes[1]); + superClassCache(handledMessageTypes[2]); } break; } @@ -258,7 +263,7 @@ public class SubscriptionManager { tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes); if (subscription.acceptsSubtypes()) { for (Class c : handledMessageTypes) { - setupSuperClassCache(c); + superClassCache(c); } } break; @@ -286,24 +291,10 @@ public class SubscriptionManager { } } - private final Collection EMPTY_LIST = Collections.emptyList(); - - - // cannot return null, must be protected by read lock - public Collection getSubscriptionsByMessageType(Class messageType) { - Collection subs = this.subscriptionsPerMessageSingle.get(messageType); - if (subs != null) { - - return subs; - } else { - return this.EMPTY_LIST; - } - } - // obtain the set of subscriptions for the given message type // Note: never returns null! - public Collection DEPRECATED_getSubscriptionsByMessageType(Class messageType) { + public Collection getSubscriptionsByMessageType(Class messageType) { // thread safe publication Collection subscriptions; @@ -311,35 +302,50 @@ public class SubscriptionManager { this.LOCK.readLock().lock(); int count = 0; - Collection subs = this.subscriptionsPerMessageSingle.get(messageType); - if (subs != null) { + Collection subs; + SubHolder primaryHolder = this.subscriptionsPerMessageSingle.get(messageType); + if (primaryHolder != null) { subscriptions = new ArrayDeque(count); - subscriptions.addAll(subs); + subs = primaryHolder.subs; + count = primaryHolder.count; + if (subs != null) { + subscriptions.addAll(subs); + } } else { subscriptions = new ArrayDeque(16); } // also add all subscriptions that match super types - ArrayList> types1 = setupSuperClassCache(messageType); + SubHolder subHolder; + ArrayList> types1 = superClassCache(messageType); if (types1 != null) { Class eventSuperType; int i; for (i = 0; i < types1.size(); i++) { eventSuperType = types1.get(i); - subs = this.subscriptionsPerMessageSingle.get(eventSuperType); - if (subs != null) { - for (Subscription sub : subs) { - if (sub.handlesMessageType(messageType)) { - subscriptions.add(sub); + subHolder = this.subscriptionsPerMessageSingle.get(eventSuperType); + if (subHolder != null) { + subs = subHolder.subs; + count += subHolder.count; + + if (subs != null) { + for (Subscription sub : subs) { + if (sub.handlesMessageType(messageType)) { + subscriptions.add(sub); + } } } } - addVarArgClass(subscriptions, eventSuperType); + count += addVarArgClass(subscriptions, eventSuperType); } } - addVarArgClass(subscriptions, messageType); + count += addVarArgClass(subscriptions, messageType); + if (primaryHolder != null) { + // save off our count, so our collection creation size is optimal. + primaryHolder.count = count; + } } finally { this.LOCK.readLock().unlock(); } @@ -358,8 +364,8 @@ public class SubscriptionManager { this.LOCK.readLock().lock(); // also add all subscriptions that match super types - ArrayList> types1 = setupSuperClassCache(messageType1); - ArrayList> types2 = setupSuperClassCache(messageType2); + ArrayList> types1 = superClassCache(messageType1); + ArrayList> types2 = superClassCache(messageType2); Collection subs; Class eventSuperType1 = messageType1; @@ -421,9 +427,9 @@ public class SubscriptionManager { this.LOCK.readLock().lock(); // also add all subscriptions that match super types - ArrayList> types1 = setupSuperClassCache(messageType1); - ArrayList> types2 = setupSuperClassCache(messageType2); - ArrayList> types3 = setupSuperClassCache(messageType3); + ArrayList> types1 = superClassCache(messageType1); + ArrayList> types2 = superClassCache(messageType2); + ArrayList> types3 = superClassCache(messageType3); Class eventSuperType1 = messageType1; IdentityObjectTree, Collection> leaf1; @@ -507,6 +513,7 @@ public class SubscriptionManager { } } + SubHolder subHolder; int size = messageTypes.length; if (size > 0) { boolean allSameType = true; @@ -530,7 +537,7 @@ public class SubscriptionManager { if (allSameType) { // do we have a var-arg (it shows as an array) subscribed? - ArrayList> superClasses = setupSuperClassCache(firstType); + ArrayList> superClasses = superClassCache(firstType); Class eventSuperType = firstType; int j; @@ -545,10 +552,14 @@ public class SubscriptionManager { eventSuperType = Array.newInstance(eventSuperType, 1).getClass(); // also add all subscriptions that match super types - subs = this.subscriptionsPerMessageSingle.get(eventSuperType); - if (subs != null) { - for (Subscription sub : subs) { - subscriptions.add(sub); + subHolder = this.subscriptionsPerMessageSingle.get(eventSuperType); + if (subHolder != null) { + subs = subHolder.subs; + count += subHolder.count; + if (subs != null) { + for (Subscription sub : subs) { + subscriptions.add(sub); + } } } } @@ -565,30 +576,12 @@ public class SubscriptionManager { return subscriptions; } - - private final Collection> EMPTY_LIST_CLASSES = Collections.emptyList(); - // must be protected by read lock - public Collection> getSuperClasses(Class clazz) { - // not thread safe. DO NOT MODIFY + private ArrayList> superClassCache(Class clazz) { ArrayList> types = this.superClassesCache.get(clazz); - - if (types != null) { - return types; - } - - return this.EMPTY_LIST_CLASSES; - } - - - // not a thread safe collection. must be locked by caller - private ArrayList> setupSuperClassCache(Class clazz) { - ArrayList> types = this.superClassesCache.get(clazz); - if (types == null) { // it doesn't matter if concurrent access stomps on values, since they are always the same. Set> superTypes = ReflectionUtils.getSuperTypes(clazz); types = new ArrayList>(superTypes); - // NOTE: no need to write lock, since race conditions will result in duplicate answers this.superClassesCache.put(clazz, types); } @@ -600,49 +593,54 @@ public class SubscriptionManager { /////////////// // a var-arg handler might match /////////////// - private void addVarArgClass(Collection subscriptions, Class messageType) { + private int addVarArgClass(Collection subscriptions, Class messageType) { // tricky part. We have to check the ARRAY version + SubHolder subHolder; Collection subs; + int count = 0; Class varArgClass = this.varArgClasses.get(messageType); if (varArgClass != null) { // also add all subscriptions that match super types - subs = this.subscriptionsPerMessageSingle.get(varArgClass); - if (subs != null) { - for (Subscription sub : subs) { - subscriptions.add(sub); + subHolder = this.subscriptionsPerMessageSingle.get(varArgClass); + if (subHolder != null) { + subs = subHolder.subs; + count += subHolder.count; + if (subs != null) { + for (Subscription sub : subs) { + subscriptions.add(sub); + } } } } + return count; } - // must be protected by read lock - public Collection getVarArgs(Class clazz) { - Class varArgClass = this.varArgClasses.get(clazz); - if (varArgClass != null) { - Collection subs = this.subscriptionsPerMessageSingle.get(varArgClass); - if (subs != null) { - return subs; - } - } - - return this.EMPTY_LIST; + public Class getVarArg(Class clazz) { + return this.varArgClasses.get(clazz); } /////////////// // a var-arg handler might match // tricky part. We have to check the ARRAY version /////////////// - private void addVarArgClasses(Collection subscriptions, Class messageType, ArrayList> types1) { + private int addVarArgClasses(Collection subscriptions, Class messageType, ArrayList> types1) { Collection subs; + SubHolder subHolder; + int count = 0; Class varArgClass = this.varArgClasses.get(messageType); if (varArgClass != null) { // also add all subscriptions that match super types - subs = this.subscriptionsPerMessageSingle.get(varArgClass); - if (subs != null) { - for (Subscription sub : subs) { - subscriptions.add(sub); + subHolder = this.subscriptionsPerMessageSingle.get(varArgClass); + if (subHolder != null) { + subs = subHolder.subs; + count += subHolder.count; + + if (subs != null) { + for (Subscription sub : subs) { + subscriptions.add(sub); + } } } } @@ -651,14 +649,21 @@ public class SubscriptionManager { varArgClass = this.varArgClasses.get(eventSuperType); if (varArgClass != null) { // also add all subscriptions that match super types - subs = this.subscriptionsPerMessageSingle.get(varArgClass); - if (subs != null) { - for (Subscription sub : subs) { - subscriptions.add(sub); + subHolder = this.subscriptionsPerMessageSingle.get(varArgClass); + if (subHolder != null) { + subs = subHolder.subs; + count += subHolder.count; + + if (subs != null) { + for (Subscription sub : subs) { + subscriptions.add(sub); + } } } } } + + return count; } private void getSubsVarArg(Collection subscriptions, int length, int index, @@ -666,7 +671,7 @@ public class SubscriptionManager { Class classType = messageTypes[index]; // get all the super types, if there are any. - ArrayList> superClasses = setupSuperClassCache(classType); + ArrayList> superClasses = superClassCache(classType); IdentityObjectTree, Collection> leaf; Collection subs; @@ -697,12 +702,4 @@ public class SubscriptionManager { } } } - - public void readLock() { - this.LOCK.readLock().lock(); - } - - public void readUnLock() { - this.LOCK.readLock().unlock(); - } } diff --git a/src/test/java/net/engio/mbassy/multi/ConcurrentSetTest.java b/src/test/java/net/engio/mbassy/multi/ConcurrentSetTest.java index 3bb7ca3..10963ea 100644 --- a/src/test/java/net/engio/mbassy/multi/ConcurrentSetTest.java +++ b/src/test/java/net/engio/mbassy/multi/ConcurrentSetTest.java @@ -1,23 +1,17 @@ package net.engio.mbassy.multi; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Random; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.atomic.AtomicInteger; - import junit.framework.Assert; import net.engio.mbassy.multi.common.AssertSupport; import net.engio.mbassy.multi.common.ConcurrentExecutor; +import net.engio.mbassy.multi.common.IConcurrentSet; import org.junit.Before; import org.junit.Test; +import java.util.*; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicInteger; + /** * This test ensures the correct behaviour of the set implementation that is the building * block of the subscription implementations used by the Mbassador message bus. @@ -37,14 +31,13 @@ public abstract class ConcurrentSetTest extends AssertSupport { protected Set gcProtector = new HashSet(); - @Override @Before public void beforeTest(){ super.beforeTest(); - this.gcProtector = new HashSet(); + gcProtector = new HashSet(); } - - protected abstract Collection createSet(); + + protected abstract IConcurrentSet createSet(); @Test @@ -52,12 +45,12 @@ public abstract class ConcurrentSetTest extends AssertSupport { final LinkedList duplicates = new LinkedList(); final HashSet distinct = new HashSet(); - final Collection testSet = createSet(); + final IConcurrentSet testSet = createSet(); Random rand = new Random(); // getAll set of distinct objects and list of duplicates Object candidate = new Object(); - for (int i = 0; i < this.numberOfElements; i++) { + for (int i = 0; i < numberOfElements; i++) { if (rand.nextInt() % 3 == 0) { candidate = new Object(); } @@ -73,7 +66,7 @@ public abstract class ConcurrentSetTest extends AssertSupport { testSet.add(src); } } - }, this.numberOfThreads); + }, numberOfThreads); // check that the control set and the test set contain the exact same elements assertEquals(distinct.size(), testSet.size()); @@ -84,22 +77,21 @@ public abstract class ConcurrentSetTest extends AssertSupport { @Test() public void testIterationWithConcurrentRemoval() { - final Collection testSet = createSet(); + final IConcurrentSet testSet = createSet(); final Random rand = new Random(); - for (int i = 0; i < this.numberOfElements; i++) { + for (int i = 0; i < numberOfElements; i++) { AtomicInteger element = new AtomicInteger(); testSet.add(element); - this.gcProtector.add(element); + gcProtector.add(element); } Runnable incrementer = new Runnable() { @Override public void run() { while(testSet.size() > 100){ - for(AtomicInteger element : testSet) { + for(AtomicInteger element : testSet) element.incrementAndGet(); - } } } @@ -109,11 +101,9 @@ public abstract class ConcurrentSetTest extends AssertSupport { @Override public void run() { while(testSet.size() > 100){ - for(AtomicInteger element : testSet) { - if(rand.nextInt() % 3 == 0 && testSet.size() > 100) { + for(AtomicInteger element : testSet) + if(rand.nextInt() % 3 == 0 && testSet.size() > 100) testSet.remove(element); - } - } } } }; @@ -138,9 +128,9 @@ public abstract class ConcurrentSetTest extends AssertSupport { final HashSet source = new HashSet(); final HashSet toRemove = new HashSet(); - final Collection testSet = createSet(); + final IConcurrentSet testSet = createSet(); // getAll set of distinct objects and mark a subset of those for removal - for (int i = 0; i < this.numberOfElements; i++) { + for (int i = 0; i < numberOfElements; i++) { Object candidate = new Object(); source.add(candidate); if (i % 3 == 0) { @@ -156,7 +146,7 @@ public abstract class ConcurrentSetTest extends AssertSupport { testSet.add(src); } } - }, this.numberOfThreads); + }, numberOfThreads); // remove all candidates that have previously been marked for removal from the test set ConcurrentExecutor.runConcurrent(new Runnable() { @@ -166,7 +156,7 @@ public abstract class ConcurrentSetTest extends AssertSupport { testSet.remove(src); } } - }, this.numberOfThreads); + }, numberOfThreads); // ensure that the test set does not contain any of the elements that have been removed from it for (Object tar : testSet) { @@ -176,9 +166,7 @@ public abstract class ConcurrentSetTest extends AssertSupport { // for removal assertEquals(source.size() - toRemove.size(), testSet.size()); for (Object src : source) { - if (!toRemove.contains(src)) { - assertTrue(testSet.contains(src)); - } + if (!toRemove.contains(src)) assertTrue(testSet.contains(src)); } } @@ -187,9 +175,9 @@ public abstract class ConcurrentSetTest extends AssertSupport { final HashSet source = new HashSet(); final HashSet toRemove = new HashSet(); - final Collection testSet = createSet(); + final IConcurrentSet testSet = createSet(); // getAll set of candidates and mark subset for removal - for (int i = 0; i < this.numberOfElements; i++) { + for (int i = 0; i < numberOfElements; i++) { Object candidate = new Object(); source.add(candidate); if (i % 3 == 0) { @@ -204,12 +192,11 @@ public abstract class ConcurrentSetTest extends AssertSupport { public void run() { for (Object src : source) { testSet.add(src); - if (toRemove.contains(src)) { + if (toRemove.contains(src)) testSet.remove(src); - } } } - }, this.numberOfThreads); + }, numberOfThreads); // ensure that the test set does not contain any of the elements that have been removed from it for (Object tar : testSet) { @@ -219,19 +206,17 @@ public abstract class ConcurrentSetTest extends AssertSupport { // for removal assertEquals(source.size() - toRemove.size(), testSet.size()); for (Object src : source) { - if (!toRemove.contains(src)) { - assertTrue(testSet.contains(src)); - } + if (!toRemove.contains(src)) assertTrue(testSet.contains(src)); } } @Test public void testCompleteRemoval() { final HashSet source = new HashSet(); - final Collection testSet = createSet(); + final IConcurrentSet testSet = createSet(); // getAll set of candidates and mark subset for removal - for (int i = 0; i < this.numberOfElements; i++) { + for (int i = 0; i < numberOfElements; i++) { Object candidate = new Object(); source.add(candidate); testSet.add(candidate); @@ -246,7 +231,7 @@ public abstract class ConcurrentSetTest extends AssertSupport { testSet.remove(src); } } - }, this.numberOfThreads); + }, numberOfThreads); // ensure that the test set still contains all objects from the source set that have not been marked @@ -260,10 +245,10 @@ public abstract class ConcurrentSetTest extends AssertSupport { @Test public void testRemovalViaIterator() { final HashSet source = new HashSet(); - final Collection setUnderTest = createSet(); + final IConcurrentSet setUnderTest = createSet(); // getAll set of candidates and mark subset for removal - for (int i = 0; i < this.numberOfElements; i++) { + for (int i = 0; i < numberOfElements; i++) { Object candidate = new Object(); source.add(candidate); setUnderTest.add(candidate); @@ -279,7 +264,7 @@ public abstract class ConcurrentSetTest extends AssertSupport { iterator.remove(); } } - }, this.numberOfThreads); + }, numberOfThreads); // ensure that the test set still contains all objects from the source set that have not been marked @@ -303,7 +288,7 @@ public abstract class ConcurrentSetTest extends AssertSupport { */ @Test public void testConcurrentAddRemove() { - final Collection testSet = createSet(); + final IConcurrentSet testSet = createSet(); // a set of unique integers that will stay permanently in the test set final List permanentObjects = new ArrayList(); // a set of objects that will be added and removed at random to the test set to force rehashing @@ -321,7 +306,6 @@ public abstract class ConcurrentSetTest extends AssertSupport { // Adds and removes items // thus forcing constant rehashing of the backing hashtable Runnable rehasher = new Runnable() { - @Override public void run() { Random rand = new Random(); for(int times = 0; times < 1000 ; times++){ @@ -346,9 +330,8 @@ public abstract class ConcurrentSetTest extends AssertSupport { for (Object permanent : permanentObjects) { // permanent items are never touched, // --> set.contains(j) should always return true - if(!testSet.contains(permanent)) { + if(!testSet.contains(permanent)) missing.add(permanent); - } } } } @@ -361,23 +344,19 @@ public abstract class ConcurrentSetTest extends AssertSupport { public Set createWithRandomIntegers(int size, List excluding){ - if(excluding == null) { - excluding = new ArrayList(); - } + if(excluding == null) excluding = new ArrayList(); Set result = new HashSet(size); Random rand = new Random(); while(result.size() < size){ result.add(rand.nextInt()); } - for(Integer excluded : excluding) { + for(Integer excluded : excluding) result.remove(excluded); - } return result; } protected void protectFromGarbageCollector(Set elements){ - for(Object element : elements) { - this.gcProtector.add(element); - } + for(Object element : elements) + gcProtector.add(element); } } diff --git a/src/test/java/net/engio/mbassy/multi/WeakConcurrentSetTest.java b/src/test/java/net/engio/mbassy/multi/WeakConcurrentSetTest.java index cbe2ca1..5a1f615 100644 --- a/src/test/java/net/engio/mbassy/multi/WeakConcurrentSetTest.java +++ b/src/test/java/net/engio/mbassy/multi/WeakConcurrentSetTest.java @@ -1,14 +1,16 @@ package net.engio.mbassy.multi; -import java.util.Collection; -import java.util.HashSet; -import java.util.Random; - import net.engio.mbassy.multi.common.ConcurrentExecutor; +import net.engio.mbassy.multi.common.IConcurrentSet; import net.engio.mbassy.multi.common.WeakConcurrentSet; +import org.junit.Before; import org.junit.Test; +import java.util.HashSet; +import java.util.Random; +import java.util.Set; + /** * * @@ -22,7 +24,7 @@ public class WeakConcurrentSetTest extends ConcurrentSetTest{ @Override - protected Collection createSet() { + protected IConcurrentSet createSet() { return new WeakConcurrentSet(); } @@ -31,10 +33,10 @@ public class WeakConcurrentSetTest extends ConcurrentSetTest{ // Assemble final HashSet permanentElements = new HashSet(); - final Collection testSetWeak = createSet(); + final IConcurrentSet testSetWeak = createSet(); final Random rand = new Random(); - for (int i = 0; i < this.numberOfElements; i++) { + for (int i = 0; i < numberOfElements; i++) { Object candidate = new Object(); if (rand.nextInt() % 3 == 0) { @@ -56,13 +58,13 @@ public class WeakConcurrentSetTest extends ConcurrentSetTest{ System.currentTimeMillis(); } } - }, this.numberOfThreads); + }, numberOfThreads); // the set should have cleaned up the garbage collected elements // it must still contain all of the permanent objects // since different GC mechanisms can be used (not necessarily full, stop-the-world) not all dead objects // must have been collected - assertTrue(permanentElements.size() <= testSetWeak.size() && testSetWeak.size() < this.numberOfElements); + assertTrue(permanentElements.size() <= testSetWeak.size() && testSetWeak.size() < numberOfElements); for (Object test : testSetWeak) { assertTrue(permanentElements.contains(test)); } diff --git a/src/test/java/net/engio/mbassy/multi/common/SubscriptionValidator.java b/src/test/java/net/engio/mbassy/multi/common/SubscriptionValidator.java index a55f4c0..6410a79 100644 --- a/src/test/java/net/engio/mbassy/multi/common/SubscriptionValidator.java +++ b/src/test/java/net/engio/mbassy/multi/common/SubscriptionValidator.java @@ -39,7 +39,7 @@ public class SubscriptionValidator extends AssertSupport{ // for each tuple of subscriber and message type the specified number of listeners must exist public void validate(SubscriptionManager manager) { for (Class messageType : this.messageTypes) { - Collection subscriptions = manager.DEPRECATED_getSubscriptionsByMessageType(messageType); + Collection subscriptions = manager.getSubscriptionsByMessageType(messageType); Collection validationEntries = getEntries(messageType); assertEquals(subscriptions.size(), validationEntries.size()); for(ValidationEntry validationValidationEntry : validationEntries){