diff --git a/src/main/java/net/engio/mbassy/multi/DispatchRunnable.java b/src/main/java/net/engio/mbassy/multi/DispatchRunnable.java index 79cf4a3..31226a1 100644 --- a/src/main/java/net/engio/mbassy/multi/DispatchRunnable.java +++ b/src/main/java/net/engio/mbassy/multi/DispatchRunnable.java @@ -1,5 +1,6 @@ package net.engio.mbassy.multi; +import java.lang.reflect.Array; import java.util.Collection; import java.util.concurrent.locks.LockSupport; @@ -18,11 +19,11 @@ public class DispatchRunnable implements Runnable { private ErrorHandlingSupport errorHandler; private TransferQueue dispatchQueue; - private TransferQueue invokeQueue; + private TransferQueue invokeQueue; private SubscriptionManager manager; public DispatchRunnable(ErrorHandlingSupport errorHandler, SubscriptionManager subscriptionManager, - TransferQueue dispatchQueue, TransferQueue invokeQueue) { + TransferQueue dispatchQueue, TransferQueue invokeQueue) { this.errorHandler = errorHandler; this.manager = subscriptionManager; @@ -35,19 +36,22 @@ public class DispatchRunnable implements Runnable { final SubscriptionManager manager = this.manager; final ErrorHandlingSupport errorHandler = this.errorHandler; final TransferQueue IN_queue = this.dispatchQueue; - final TransferQueue OUT_queue = this.invokeQueue; + final TransferQueue OUT_queue = this.invokeQueue; + + final Runnable dummyRunnable = new Runnable() { + @Override + public void run() { + } + }; Object message = null; int counter; while (true) { try { - counter = MultiMBassador.WORK_RUN_BLITZ; + counter = MultiMBassador.WORKER_BLITZ; while ((message = IN_queue.poll()) == null) { - if (counter > MultiMBassador.WORK_RUN_BLITZ_DIV2) { - --counter; - Thread.yield(); - } else if (counter > 0) { + if (counter > 0) { --counter; LockSupport.parkNanos(1L); } else { @@ -56,23 +60,76 @@ public class DispatchRunnable implements Runnable { } } + @SuppressWarnings("null") Class messageClass = message.getClass(); + + manager.readLock(); + Collection subscriptions = manager.getSubscriptionsByMessageType(messageClass); - boolean empty = subscriptions.isEmpty(); - if (empty) { - // Dead Event - subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); - DeadMessage deadMessage = new DeadMessage(message); - message = deadMessage; - empty = subscriptions.isEmpty(); + Collection deadSubscriptions = null; + if (empty) { + // Dead Event. must EXACTLY MATCH (no subclasses or varargs) + deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); } + Collection> superClasses = manager.getSuperClasses(messageClass); + Collection varArgs = manager.getVarArgs(messageClass); + + manager.readUnLock(); + if (!empty) { - Runnable e = new InvokeRunnable(errorHandler, subscriptions, message); - OUT_queue.transfer(e); + for (Subscription sub : subscriptions) { + sub.publishToSubscriptionSingle(OUT_queue, errorHandler, message); + } + +// OUT_queue.put(new InvokeRunnable(errorHandler, subscriptions, message)); + } else if (deadSubscriptions != null) { + if (!deadSubscriptions.isEmpty()) { + DeadMessage deadMessage = new DeadMessage(message); + + for (Subscription sub : deadSubscriptions) { + sub.publishToSubscriptionSingle(OUT_queue, errorHandler, deadMessage); + } + +// OUT_queue.put(new InvokeRunnable(errorHandler, deadSubscriptions, deadMessage)); + } } + + // now get superClasses + for (Class superClass : superClasses) { + subscriptions = manager.getSubscriptionsByMessageType(superClass); + + if (!subscriptions.isEmpty()) { + for (Subscription sub : subscriptions) { + sub.publishToSubscriptionSingle(OUT_queue, errorHandler, message); + } + +// OUT_queue.put(new InvokeRunnable(errorHandler, subscriptions, message)); + } + } + + // now get varargs + if (!varArgs.isEmpty()) { + // messy, but the ONLY way to do it. + Object[] vararg = (Object[]) Array.newInstance(message.getClass(), 1); + vararg[0] = message; + + Object[] newInstance = new Object[1]; + newInstance[0] = vararg; + vararg = newInstance; + + for (Subscription sub : varArgs) { + sub.publishToSubscriptionSingle(OUT_queue, errorHandler, vararg); + } + +// OUT_queue.put(new InvokeRunnable(errorHandler, varArgs, vararg)); + } + + // make sure it's synced at this point +// OUT_queue.transfer(dummyRunnable); + } catch (InterruptedException e) { return; } catch (Throwable e) { diff --git a/src/main/java/net/engio/mbassy/multi/InvokeRunnable.java b/src/main/java/net/engio/mbassy/multi/InvokeRunnable.java index ff1b85c..261fa81 100644 --- a/src/main/java/net/engio/mbassy/multi/InvokeRunnable.java +++ b/src/main/java/net/engio/mbassy/multi/InvokeRunnable.java @@ -1,6 +1,5 @@ package net.engio.mbassy.multi; -import java.lang.reflect.Array; import java.util.Collection; import net.engio.mbassy.multi.error.ErrorHandlingSupport; @@ -27,28 +26,9 @@ public class InvokeRunnable implements Runnable { ErrorHandlingSupport errorHandler = this.errorHandler; Collection subs = this.subscriptions; Object message = this.message; - Object[] vararg = null; - for (Subscription sub : subs) { - boolean handled = false; - if (sub.isVarArg()) { - // messageClass will NEVER be an array to begin with, since that will call the multi-arg method - if (vararg == null) { - // messy, but the ONLY way to do it. - vararg = (Object[]) Array.newInstance(message.getClass(), 1); - vararg[0] = message; - - Object[] newInstance = new Object[1]; - newInstance[0] = vararg; - vararg = newInstance; - } - handled = true; - sub.publishToSubscription(errorHandler, vararg); - } - - if (!handled) { - sub.publishToSubscription(errorHandler, message); - } - } +// for (Subscription sub : subs) { +// sub.publishToSubscriptionSingle(errorHandler, message); +// } } } diff --git a/src/main/java/net/engio/mbassy/multi/MultiMBassador.java b/src/main/java/net/engio/mbassy/multi/MultiMBassador.java index 84d9e38..66dd607 100644 --- a/src/main/java/net/engio/mbassy/multi/MultiMBassador.java +++ b/src/main/java/net/engio/mbassy/multi/MultiMBassador.java @@ -1,13 +1,12 @@ package net.engio.mbassy.multi; +import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.LockSupport; import net.engio.mbassy.multi.common.DisruptorThreadFactory; import net.engio.mbassy.multi.common.LinkedTransferQueue; -import net.engio.mbassy.multi.common.Pow2; import net.engio.mbassy.multi.common.TransferQueue; import net.engio.mbassy.multi.error.IPublicationErrorHandler; import net.engio.mbassy.multi.error.PublicationError; @@ -31,7 +30,7 @@ public class MultiMBassador implements IMessageBus { // private final Queue dispatchQueue; // private final BlockingQueue dispatchQueue; private final TransferQueue dispatchQueue; - private final TransferQueue invokeQueue; + private final TransferQueue invokeQueue; // all threads that are available for asynchronous message dispatching @@ -42,17 +41,16 @@ public class MultiMBassador implements IMessageBus { } - public static final int WORK_RUN_BLITZ = 50; - public static final int WORK_RUN_BLITZ_DIV2 = WORK_RUN_BLITZ/2; + public static final int WORKER_BLITZ = 10; public MultiMBassador(int numberOfThreads) { if (numberOfThreads < 1) { - numberOfThreads = 1; // at LEAST 1 threads + numberOfThreads = 1; // at LEAST 1 thread } // this.objectQueue = new LinkedTransferQueue(); this.dispatchQueue = new LinkedTransferQueue(); - this.invokeQueue = new LinkedTransferQueue(); + this.invokeQueue = new LinkedTransferQueue(); // this.invokeQueue = new BoundedTransferQueue(numberOfThreads); // this.dispatchQueue = new BoundedTransferQueue(numberOfThreads); // this.dispatchQueue = new MpmcArrayQueue(Pow2.roundToPowerOfTwo(numberOfThreads/2)); @@ -65,7 +63,8 @@ public class MultiMBassador implements IMessageBus { int dispatchSize = 2; - int invokeSize = Pow2.roundToPowerOfTwo(numberOfThreads*2); +// int invokeSize = Pow2.roundToPowerOfTwo(numberOfThreads); + int invokeSize = 0; this.threads = new ArrayList(dispatchSize + invokeSize); @@ -88,30 +87,91 @@ public class MultiMBassador implements IMessageBus { @Override public void run() { final MultiMBassador mbassador = MultiMBassador.this; - final TransferQueue IN_queue = mbassador.invokeQueue; + final TransferQueue IN_queue = mbassador.invokeQueue; try { - Runnable runnable = null; + SubRunnable runnable = null; int counter; while (true) { runnable = null; - counter = WORK_RUN_BLITZ; + counter = WORKER_BLITZ; - while ((runnable = IN_queue.poll()) == null) { - if (counter > WORK_RUN_BLITZ_DIV2) { - --counter; - Thread.yield(); - } else if (counter > 0) { - --counter; - LockSupport.parkNanos(1L); - } else { +// while ((runnable = IN_queue.poll()) == null) { +// if (counter > 0) { +// --counter; +// LockSupport.parkNanos(1L); +// } else { runnable = IN_queue.take(); - break; - } - } +// break; +// } +// } - runnable.run(); + + try { + + runnable.handler.invoke(runnable.listener, runnable.message); + +// this.invocation.invoke(listener, handler, message); + } catch (IllegalAccessException e) { +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "The class or method is not accessible") +// .setCause(e) +// .setMethodName(handler.getName()) +// .setListener(listener) +// .setPublishedObject(message)); + } catch (IllegalArgumentException e) { +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "Wrong arguments passed to method. Was: " + message.getClass() +// + "Expected: " + handler.getParameterTypes()[0]) +// .setCause(e) +// .setMethodName(handler.getName()) +// .setListener(listener) +// .setPublishedObject(message)); + } catch (InvocationTargetException e) { +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "Message handler threw exception") +// .setCause(e) +// .setMethodName(handler.getName()) +// .setListener(listener) +// .setPublishedObject(message)); + } catch (Throwable e) { +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "The handler code threw an exception") +// .setCause(e) +// .setMethodName(handler.getName()) +// .setListener(listener) +// .setPublishedObject(message)); + } + + + + + + + + + + + + + + + + + + + + + + + + +// runnable.run(); } } catch (InterruptedException e) { return; @@ -152,8 +212,7 @@ public class MultiMBassador implements IMessageBus { @Override public boolean hasPendingMessages() { -// return this.dispatch_RingBuffer.remainingCapacity() < this.dispatch_RingBufferSize; - return !(this.dispatchQueue.isEmpty() && this.invokeQueue.isEmpty()); + return !this.dispatchQueue.isEmpty() || !this.invokeQueue.isEmpty(); } @Override @@ -161,15 +220,6 @@ public class MultiMBassador implements IMessageBus { for (Thread t : this.threads) { t.interrupt(); } - -// System.err.println(this.counter); - -// for (InterruptRunnable runnable : this.invokeRunners) { -// runnable.stop(); -// } - -// this.dispatch_Disruptor.shutdown(); -// this.dispatch_Executor.shutdown(); } @@ -449,61 +499,18 @@ public class MultiMBassador implements IMessageBus { @Override public void publishAsync(Object message) { if (message != null) { -// // put this on the disruptor ring buffer -// final RingBuffer ringBuffer = this.dispatch_RingBuffer; -// -// // setup the job -// final long seq = ringBuffer.next(); -// try { -// DispatchHolder eventJob = ringBuffer.get(seq); -// eventJob.messageType = MessageType.ONE; -// eventJob.message1 = message; -// } catch (Throwable e) { -// handlePublicationError(new PublicationError() -// .setMessage("Error while adding an asynchronous message") -// .setCause(e) -// .setPublishedObject(message)); -// } finally { -// // always publish the job -// ringBuffer.publish(seq); -// } + try { + this.dispatchQueue.transfer(message); + return; + } catch (InterruptedException e) { + e.printStackTrace(); + // log.error(e); -// MessageHolder messageHolder = new MessageHolder(); -// messageHolder.messageType = MessageType.ONE; -// messageHolder.message1 = message; - - -// new Runnable() { -// @Override -// public void run() { -// -// } -// }; - - // faster if we can skip locking -// int counter = 200; -// while (!this.dispatchQueue.offer(message)) { -// if (counter > 100) { -// --counter; -// Thread.yield(); -// } else if (counter > 0) { -// --counter; -// LockSupport.parkNanos(1L); -// } else { - try { - this.dispatchQueue.transfer(message); - return; - } catch (InterruptedException e) { - e.printStackTrace(); - // log.error(e); - - handlePublicationError(new PublicationError() - .setMessage("Error while adding an asynchronous message") - .setCause(e) - .setPublishedObject(message)); - } -// } -// } + handlePublicationError(new PublicationError() + .setMessage("Error while adding an asynchronous message") + .setCause(e) + .setPublishedObject(message)); + } } } diff --git a/src/main/java/net/engio/mbassy/multi/SubRunnable.java b/src/main/java/net/engio/mbassy/multi/SubRunnable.java new file mode 100644 index 0000000..4fcaf76 --- /dev/null +++ b/src/main/java/net/engio/mbassy/multi/SubRunnable.java @@ -0,0 +1,20 @@ +package net.engio.mbassy.multi; + +import java.lang.reflect.Method; + + +/** + * @author dorkbox, llc Date: 2/2/15 + */ +public class SubRunnable { + + public Method handler; + public Object listener; + public Object message; + + public SubRunnable(Method handler, Object listener, Object message) { + this.handler = handler; + this.listener = listener; + this.message = message; + } +} diff --git a/src/main/java/net/engio/mbassy/multi/common/AbstractConcurrentSet.java b/src/main/java/net/engio/mbassy/multi/common/AbstractConcurrentSet.java index a21c471..21b9c1c 100644 --- a/src/main/java/net/engio/mbassy/multi/common/AbstractConcurrentSet.java +++ b/src/main/java/net/engio/mbassy/multi/common/AbstractConcurrentSet.java @@ -1,8 +1,11 @@ package net.engio.mbassy.multi.common; +import java.util.Collection; import java.util.Map; import java.util.concurrent.locks.Lock; +import sun.reflect.generics.reflectiveObjects.NotImplementedException; + import com.googlecode.concurentlocks.ReentrantReadWriteUpdateLock; /** @@ -14,7 +17,7 @@ import com.googlecode.concurentlocks.ReentrantReadWriteUpdateLock; * @author bennidi * Date: 2/12/12 */ -public abstract class AbstractConcurrentSet implements IConcurrentSet { +public abstract class AbstractConcurrentSet implements Collection { // Internal state protected final ReentrantReadWriteUpdateLock lock = new ReentrantReadWriteUpdateLock(); @@ -28,21 +31,25 @@ public abstract class AbstractConcurrentSet implements IConcurrentSet { protected abstract Entry createEntry(T value, Entry next); @Override - public void add(T element) { + public boolean add(T element) { if (element == null) { - return; + return false; } Lock writeLock = this.lock.writeLock(); + boolean changed = false; writeLock.lock(); if (this.entries.containsKey(element)) { } else { insert(element); + changed = true; } writeLock.unlock(); + + return changed; } @Override - public boolean contains(T element) { + public boolean contains(Object element) { Lock readLock = this.lock.readLock(); ISetEntry entry; try { @@ -73,25 +80,28 @@ public abstract class AbstractConcurrentSet implements IConcurrentSet { } @Override - public void addAll(Iterable elements) { + public boolean addAll(Collection elements) { + boolean changed = false; Lock writeLock = this.lock.writeLock(); try { writeLock.lock(); for (T element : elements) { if (element != null) { insert(element); + changed = true; } } } finally { writeLock.unlock(); } + return changed; } /** * @return TRUE if the element was successfully removed */ @Override - public boolean remove(T element) { + public boolean remove(Object element) { Lock updateLock = this.lock.updateLock(); boolean isNull; @@ -100,15 +110,12 @@ public abstract class AbstractConcurrentSet implements IConcurrentSet { ISetEntry entry = this.entries.get(element); isNull = entry == null || entry.getValue() == null; - if (isNull) { + if (!isNull) { Lock writeLock = this.lock.writeLock(); try { writeLock.lock(); - ISetEntry listelement = this.entries.get(element); - if (listelement == null) { - return false; //removed by other thread in the meantime - } else if (listelement != this.head) { - listelement.remove(); + if (entry != this.head) { + entry.remove(); } else { // if it was second, now it's first this.head = this.head.next(); @@ -127,6 +134,36 @@ public abstract class AbstractConcurrentSet implements IConcurrentSet { } } + @Override + public Object[] toArray() { + throw new NotImplementedException(); + } + + @Override + public T[] toArray(T[] a) { + throw new NotImplementedException(); + } + + @Override + public boolean containsAll(Collection c) { + throw new NotImplementedException(); + } + + @Override + public boolean removeAll(Collection c) { + throw new NotImplementedException(); + } + + @Override + public boolean retainAll(Collection c) { + throw new NotImplementedException(); + } + + @Override + public void clear() { + throw new NotImplementedException(); + } + public abstract static class Entry implements ISetEntry { diff --git a/src/main/java/net/engio/mbassy/multi/common/IConcurrentSet.java b/src/main/java/net/engio/mbassy/multi/common/IConcurrentSet.java deleted file mode 100644 index cbfeff5..0000000 --- a/src/main/java/net/engio/mbassy/multi/common/IConcurrentSet.java +++ /dev/null @@ -1,25 +0,0 @@ -package net.engio.mbassy.multi.common; - -/** - * Todo: Add javadoc - * - * @author bennidi - * Date: 3/29/13 - */ -public interface IConcurrentSet extends Iterable { - - void add(T element); - - boolean contains(T element); - - int size(); - - boolean isEmpty(); - - void addAll(Iterable elements); - - /** - * @return TRUE if the element was removed - */ - boolean remove(T element); -} diff --git a/src/main/java/net/engio/mbassy/multi/common/StrongConcurrentSet.java b/src/main/java/net/engio/mbassy/multi/common/StrongConcurrentSet.java index 455a97f..db821fb 100644 --- a/src/main/java/net/engio/mbassy/multi/common/StrongConcurrentSet.java +++ b/src/main/java/net/engio/mbassy/multi/common/StrongConcurrentSet.java @@ -13,7 +13,11 @@ public class StrongConcurrentSet extends AbstractConcurrentSet { public StrongConcurrentSet() { - super(new IdentityHashMap>()); + this(16); + } + + public StrongConcurrentSet(int size) { + super(new IdentityHashMap>(size)); } @Override @@ -75,9 +79,5 @@ public class StrongConcurrentSet extends AbstractConcurrentSet { public T getValue() { return this.value; } - - - - } } \ No newline at end of file diff --git a/src/main/java/net/engio/mbassy/multi/common/WeakConcurrentSet.java b/src/main/java/net/engio/mbassy/multi/common/WeakConcurrentSet.java index 41deb3a..5f0f565 100644 --- a/src/main/java/net/engio/mbassy/multi/common/WeakConcurrentSet.java +++ b/src/main/java/net/engio/mbassy/multi/common/WeakConcurrentSet.java @@ -116,9 +116,5 @@ public class WeakConcurrentSet extends AbstractConcurrentSet{ public T getValue() { return this.value.get(); } - - - - } } diff --git a/src/main/java/net/engio/mbassy/multi/subscription/Subscription.java b/src/main/java/net/engio/mbassy/multi/subscription/Subscription.java index 604db7b..b334b10 100644 --- a/src/main/java/net/engio/mbassy/multi/subscription/Subscription.java +++ b/src/main/java/net/engio/mbassy/multi/subscription/Subscription.java @@ -2,10 +2,12 @@ package net.engio.mbassy.multi.subscription; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.atomic.AtomicLong; -import net.engio.mbassy.multi.common.IConcurrentSet; +import net.engio.mbassy.multi.SubRunnable; import net.engio.mbassy.multi.common.StrongConcurrentSet; +import net.engio.mbassy.multi.common.TransferQueue; import net.engio.mbassy.multi.dispatch.IHandlerInvocation; import net.engio.mbassy.multi.dispatch.ReflectiveHandlerInvocation; import net.engio.mbassy.multi.dispatch.SynchronizedHandlerInvocation; @@ -34,17 +36,16 @@ public class Subscription { private final MessageHandler handlerMetadata; private final IHandlerInvocation invocation; -// protected final Collection listeners; // protected final Map, Boolean> listeners; // protected final Map listeners; - protected final IConcurrentSet listeners; + protected final Collection listeners; Subscription(MessageHandler handler) { // this.listeners = new WeakConcurrentSet(); this.listeners = new StrongConcurrentSet(); -// this.listeners = new ConcurrentHashMap(); // this.listeners = new CopyOnWriteArrayList(); -// this.listeners = new ConcurrentSkipListSet(); +// this.listeners = new ConcurrentSkipListSet(); // requires listener object to be comparable +// this.listeners = new ConcurrentHashMap(); // this.listeners = new ConcurrentWeakHashMap, Boolean>(); this.handlerMetadata = handler; @@ -131,17 +132,30 @@ public class Subscription { return this.listeners.size(); } -// private AtomicLong counter = new AtomicLong(); - public void publishToSubscription(ErrorHandlingSupport errorHandler, Object message) { + public void pin() { + System.err.println(this.counter.get()); + } + + private AtomicLong counter = new AtomicLong(); + public void publishToSubscriptionSingle(TransferQueue OUT_queue, ErrorHandlingSupport errorHandler, Object message) { // Collection listeners = this.listeners.keySet(); -// Collection listeners = this.listeners; - IConcurrentSet listeners = this.listeners; + Collection listeners = this.listeners; if (listeners.size() > 0) { Method handler = this.handlerMetadata.getHandler(); // int count = 0; + +// Iterator iterator = listeners.iterator(); for (Object listener : listeners) { // count++; +// this.counter.getAndIncrement(); +// try { +// OUT_queue.transfer(new SubRunnable(handler, listener, message)); +// } catch (InterruptedException e1) { +// return; +// } + + try { this.invocation.invoke(listener, handler, message); } catch (IllegalAccessException e) { @@ -186,158 +200,154 @@ public class Subscription { public void publishToSubscription(ErrorHandlingSupport errorHandler, Object message1, Object message2) { // Collection listeners = this.listeners.keySet(); // Collection listeners = this.listeners; - IConcurrentSet listeners = this.listeners; - - if (listeners.size() > 0) { - Method handler = this.handlerMetadata.getHandler(); - - for (Object listener : listeners) { - try { - this.invocation.invoke(listener, handler, message1, message2); - } catch (IllegalAccessException e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "The class or method is not accessible") - .setCause(e) - .setMethodName(handler.getName()) - .setListener(listener) - .setPublishedObject(message1, message2)); - } catch (IllegalArgumentException e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "Wrong arguments passed to method. Was: " + - message1.getClass() + ", " + - message2.getClass() - + ". Expected: " + handler.getParameterTypes()[0] + ", " + - handler.getParameterTypes()[1] - ) - .setCause(e) - .setMethodName(handler.getName()) - .setListener(listener) - .setPublishedObject(message1, message2)); - } catch (InvocationTargetException e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "Message handler threw exception") - .setCause(e) - .setMethodName(handler.getName()) - .setListener(listener) - .setPublishedObject(message1, message2)); - } catch (Throwable e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "The handler code threw an exception") - .setCause(e) - .setMethodName(handler.getName()) - .setListener(listener) - .setPublishedObject(message1, message2)); - } - } - } +// +// if (listeners.size() > 0) { +// Method handler = this.handlerMetadata.getHandler(); +// +// for (Object listener : listeners) { +// try { +// this.invocation.invoke(listener, handler, message1, message2); +// } catch (IllegalAccessException e) { +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "The class or method is not accessible") +// .setCause(e) +// .setMethodName(handler.getName()) +// .setListener(listener) +// .setPublishedObject(message1, message2)); +// } catch (IllegalArgumentException e) { +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "Wrong arguments passed to method. Was: " + +// message1.getClass() + ", " + +// message2.getClass() +// + ". Expected: " + handler.getParameterTypes()[0] + ", " + +// handler.getParameterTypes()[1] +// ) +// .setCause(e) +// .setMethodName(handler.getName()) +// .setListener(listener) +// .setPublishedObject(message1, message2)); +// } catch (InvocationTargetException e) { +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "Message handler threw exception") +// .setCause(e) +// .setMethodName(handler.getName()) +// .setListener(listener) +// .setPublishedObject(message1, message2)); +// } catch (Throwable e) { +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "The handler code threw an exception") +// .setCause(e) +// .setMethodName(handler.getName()) +// .setListener(listener) +// .setPublishedObject(message1, message2)); +// } +// } +// } } public void publishToSubscription(ErrorHandlingSupport errorHandler, Object message1, Object message2, Object message3) { // Collection listeners = this.listeners.keySet(); -// Collection listeners = this.listeners; - IConcurrentSet listeners = this.listeners; - - if (listeners.size() > 0) { - Method handler = this.handlerMetadata.getHandler(); - - for (Object listener : listeners) { - try { - this.invocation.invoke(listener, handler, message1, message2, message3); - } catch (IllegalAccessException e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "The class or method is not accessible") - .setCause(e) - .setMethodName(handler.getName()) - .setListener(listener) - .setPublishedObject(message1, message2, message3)); - } catch (IllegalArgumentException e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "Wrong arguments passed to method. Was: " + - message1.getClass() + ", " + - message2.getClass() + ", " + - message3.getClass() - + ". Expected: " + handler.getParameterTypes()[0] + ", " + - handler.getParameterTypes()[1] + ", " + - handler.getParameterTypes()[2] - ) - .setCause(e) - .setMethodName(handler.getName()) - .setListener(listener) - .setPublishedObject(message1, message2, message3)); - } catch (InvocationTargetException e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "Message handler threw exception") - .setCause(e) - .setMethodName(handler.getName()) - .setListener(listener) - .setPublishedObject(message1, message2, message3)); - } catch (Throwable e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "The handler code threw an exception") - .setCause(e) - .setMethodName(handler.getName()) - .setListener(listener) - .setPublishedObject(message1, message2, message3)); - } - } - } +// +// if (this.listeners.size() > 0) { +// Method handler = this.handlerMetadata.getHandler(); +// +// for (Object listener : this.listeners) { +// try { +// this.invocation.invoke(listener, handler, message1, message2, message3); +// } catch (IllegalAccessException e) { +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "The class or method is not accessible") +// .setCause(e) +// .setMethodName(handler.getName()) +// .setListener(listener) +// .setPublishedObject(message1, message2, message3)); +// } catch (IllegalArgumentException e) { +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "Wrong arguments passed to method. Was: " + +// message1.getClass() + ", " + +// message2.getClass() + ", " + +// message3.getClass() +// + ". Expected: " + handler.getParameterTypes()[0] + ", " + +// handler.getParameterTypes()[1] + ", " + +// handler.getParameterTypes()[2] +// ) +// .setCause(e) +// .setMethodName(handler.getName()) +// .setListener(listener) +// .setPublishedObject(message1, message2, message3)); +// } catch (InvocationTargetException e) { +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "Message handler threw exception") +// .setCause(e) +// .setMethodName(handler.getName()) +// .setListener(listener) +// .setPublishedObject(message1, message2, message3)); +// } catch (Throwable e) { +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "The handler code threw an exception") +// .setCause(e) +// .setMethodName(handler.getName()) +// .setListener(listener) +// .setPublishedObject(message1, message2, message3)); +// } +// } +// } } public void publishToSubscription(ErrorHandlingSupport errorHandler, Object... messages) { // Collection listeners = this.listeners.keySet(); // Collection listeners = this.listeners; - IConcurrentSet listeners = this.listeners; - - if (listeners.size() > 0) { - Method handler = this.handlerMetadata.getHandler(); - - for (Object listener : listeners) { - try { - this.invocation.invoke(listener, handler, messages); - } catch (IllegalAccessException e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "The class or method is not accessible") - .setCause(e) - .setMethodName(handler.getName()) - .setListener(listener) - .setPublishedObject(messages)); - } catch (IllegalArgumentException e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "Wrong arguments passed to method. Was: " + Arrays.deepToString(messages) - + "Expected: " + Arrays.deepToString(handler.getParameterTypes())) - .setCause(e) - .setMethodName(handler.getName()) - .setListener(listener) - .setPublishedObject(messages)); - } catch (InvocationTargetException e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "Message handler threw exception") - .setCause(e) - .setMethodName(handler.getName()) - .setListener(listener) - .setPublishedObject(messages)); - } catch (Throwable e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "The handler code threw an exception") - .setCause(e) - .setMethodName(handler.getName()) - .setListener(listener) - .setPublishedObject(messages)); - } - } - } +// +// if (listeners.size() > 0) { +// Method handler = this.handlerMetadata.getHandler(); +// +// for (Object listener : listeners) { +// try { +// this.invocation.invoke(listener, handler, messages); +// } catch (IllegalAccessException e) { +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "The class or method is not accessible") +// .setCause(e) +// .setMethodName(handler.getName()) +// .setListener(listener) +// .setPublishedObject(messages)); +// } catch (IllegalArgumentException e) { +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "Wrong arguments passed to method. Was: " + Arrays.deepToString(messages) +// + "Expected: " + Arrays.deepToString(handler.getParameterTypes())) +// .setCause(e) +// .setMethodName(handler.getName()) +// .setListener(listener) +// .setPublishedObject(messages)); +// } catch (InvocationTargetException e) { +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "Message handler threw exception") +// .setCause(e) +// .setMethodName(handler.getName()) +// .setListener(listener) +// .setPublishedObject(messages)); +// } catch (Throwable e) { +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "The handler code threw an exception") +// .setCause(e) +// .setMethodName(handler.getName()) +// .setListener(listener) +// .setPublishedObject(messages)); +// } +// } +// } } @Override diff --git a/src/main/java/net/engio/mbassy/multi/subscription/SubscriptionManager.java b/src/main/java/net/engio/mbassy/multi/subscription/SubscriptionManager.java index 45b1e77..f3b9b86 100644 --- a/src/main/java/net/engio/mbassy/multi/subscription/SubscriptionManager.java +++ b/src/main/java/net/engio/mbassy/multi/subscription/SubscriptionManager.java @@ -4,6 +4,7 @@ import java.lang.reflect.Array; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.IdentityHashMap; import java.util.LinkedList; import java.util.List; @@ -13,6 +14,7 @@ import java.util.concurrent.ConcurrentHashMap; import net.engio.mbassy.multi.common.IdentityObjectTree; import net.engio.mbassy.multi.common.ReflectionUtils; +import net.engio.mbassy.multi.common.StrongConcurrentSet; import net.engio.mbassy.multi.listener.MessageHandler; import net.engio.mbassy.multi.listener.MetadataReader; @@ -31,18 +33,13 @@ import com.googlecode.concurentlocks.ReentrantReadWriteUpdateLock; */ public class SubscriptionManager { - public static class SubHolder { - public int count = 0; - public Collection subs = new ArrayDeque(0); - } - // the metadata reader that is used to inspect objects passed to the subscribe method private final MetadataReader metadataReader = new MetadataReader(); // all subscriptions per message type // this is the primary list for dispatching a specific message // write access is synchronized and happens only when a listener of a specific class is registered the first time - private final Map, SubHolder> subscriptionsPerMessageSingle = new IdentityHashMap, SubHolder>(50); + private final Map, Collection> subscriptionsPerMessageSingle = new IdentityHashMap, Collection>(50); private final IdentityObjectTree, Collection> subscriptionsPerMessageMulti = new IdentityObjectTree, Collection>(); // all subscriptions per messageHandler type @@ -94,17 +91,14 @@ public class SubscriptionManager { // single Class clazz = handledMessageTypes[0]; - // NOTE: Not thread-safe! must be synchronized in outer scope - SubHolder subHolder = this.subscriptionsPerMessageSingle.get(clazz); - if (subHolder != null) { - Collection subs = subHolder.subs; - if (subs != null) { - subs.remove(subscription); + // NOTE: Order is important for safe publication + Collection subs = this.subscriptionsPerMessageSingle.get(clazz); + if (subs != null) { + subs.remove(subscription); - if (subs.isEmpty()) { - // remove element - this.subscriptionsPerMessageSingle.remove(clazz); - } + if (subs.isEmpty()) { + // remove element + this.subscriptionsPerMessageSingle.remove(clazz); } } } else { @@ -188,7 +182,7 @@ public class SubscriptionManager { } // it's SAFE to use non-concurrent collection here (read only). Same thread LOCKS on this with a write lock - subscriptions = new ArrayDeque(messageHandlers.size()); + subscriptions = new StrongConcurrentSet(messageHandlers.size()); // create subscriptions for all detected message handlers for (MessageHandler messageHandler : messageHandlers) { @@ -203,15 +197,16 @@ public class SubscriptionManager { // single Class clazz = handledMessageTypes[0]; - // NOTE: Not thread-safe! must be synchronized in outer scope - SubHolder subHolder = this.subscriptionsPerMessageSingle.get(clazz); - if (subHolder == null) { - subHolder = new SubHolder(); - this.subscriptionsPerMessageSingle.put(clazz, subHolder); + // NOTE: Order is important for safe publication + Collection subs = this.subscriptionsPerMessageSingle.get(clazz); + if (subs == null) { + subs = new StrongConcurrentSet(2); +// subs = new CopyOnWriteArrayList(); + subs.add(subscription); + this.subscriptionsPerMessageSingle.put(clazz, subs); + } else { + subs.add(subscription); } - Collection subs = subHolder.subs; - subs.add(subscription); - subHolder.count++; // have to save our the VarArg class types, because creating var-arg arrays for objects is expensive if (subscription.isVarArg()) { @@ -221,7 +216,7 @@ public class SubscriptionManager { // since it's vararg, this means that it's an ARRAY, so we ALSO // have to add the component classes of the array if (subscription.acceptsSubtypes()) { - ArrayList> setupSuperClassCache2 = superClassCache(componentType); + ArrayList> setupSuperClassCache2 = setupSuperClassCache(componentType); // have to setup each vararg chain for (int i = 0; i < setupSuperClassCache2.size(); i++) { Class superClass = setupSuperClassCache2.get(i); @@ -234,7 +229,7 @@ public class SubscriptionManager { } } } else if (subscription.acceptsSubtypes()) { - superClassCache(clazz); + setupSuperClassCache(clazz); } } else { @@ -245,17 +240,17 @@ public class SubscriptionManager { case 2: { tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes[0], handledMessageTypes[1]); if (subscription.acceptsSubtypes()) { - superClassCache(handledMessageTypes[0]); - superClassCache(handledMessageTypes[1]); + setupSuperClassCache(handledMessageTypes[0]); + setupSuperClassCache(handledMessageTypes[1]); } break; } case 3: { tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes[0], handledMessageTypes[1], handledMessageTypes[2]); if (subscription.acceptsSubtypes()) { - superClassCache(handledMessageTypes[0]); - superClassCache(handledMessageTypes[1]); - superClassCache(handledMessageTypes[2]); + setupSuperClassCache(handledMessageTypes[0]); + setupSuperClassCache(handledMessageTypes[1]); + setupSuperClassCache(handledMessageTypes[2]); } break; } @@ -263,7 +258,7 @@ public class SubscriptionManager { tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes); if (subscription.acceptsSubtypes()) { for (Class c : handledMessageTypes) { - superClassCache(c); + setupSuperClassCache(c); } } break; @@ -291,10 +286,24 @@ public class SubscriptionManager { } } + private final Collection EMPTY_LIST = Collections.emptyList(); + + + // cannot return null, must be protected by read lock + public Collection getSubscriptionsByMessageType(Class messageType) { + Collection subs = this.subscriptionsPerMessageSingle.get(messageType); + if (subs != null) { + + return subs; + } else { + return this.EMPTY_LIST; + } + } + // obtain the set of subscriptions for the given message type // Note: never returns null! - public Collection getSubscriptionsByMessageType(Class messageType) { + public Collection DEPRECATED_getSubscriptionsByMessageType(Class messageType) { // thread safe publication Collection subscriptions; @@ -302,50 +311,35 @@ public class SubscriptionManager { this.LOCK.readLock().lock(); int count = 0; - Collection subs; - SubHolder primaryHolder = this.subscriptionsPerMessageSingle.get(messageType); - if (primaryHolder != null) { + Collection subs = this.subscriptionsPerMessageSingle.get(messageType); + if (subs != null) { subscriptions = new ArrayDeque(count); - subs = primaryHolder.subs; - count = primaryHolder.count; - if (subs != null) { - subscriptions.addAll(subs); - } + subscriptions.addAll(subs); } else { subscriptions = new ArrayDeque(16); } // also add all subscriptions that match super types - SubHolder subHolder; - ArrayList> types1 = superClassCache(messageType); + ArrayList> types1 = setupSuperClassCache(messageType); if (types1 != null) { Class eventSuperType; int i; for (i = 0; i < types1.size(); i++) { eventSuperType = types1.get(i); - subHolder = this.subscriptionsPerMessageSingle.get(eventSuperType); - if (subHolder != null) { - subs = subHolder.subs; - count += subHolder.count; - - if (subs != null) { - for (Subscription sub : subs) { - if (sub.handlesMessageType(messageType)) { - subscriptions.add(sub); - } + subs = this.subscriptionsPerMessageSingle.get(eventSuperType); + if (subs != null) { + for (Subscription sub : subs) { + if (sub.handlesMessageType(messageType)) { + subscriptions.add(sub); } } } - count += addVarArgClass(subscriptions, eventSuperType); + addVarArgClass(subscriptions, eventSuperType); } } - count += addVarArgClass(subscriptions, messageType); + addVarArgClass(subscriptions, messageType); - if (primaryHolder != null) { - // save off our count, so our collection creation size is optimal. - primaryHolder.count = count; - } } finally { this.LOCK.readLock().unlock(); } @@ -364,8 +358,8 @@ public class SubscriptionManager { this.LOCK.readLock().lock(); // also add all subscriptions that match super types - ArrayList> types1 = superClassCache(messageType1); - ArrayList> types2 = superClassCache(messageType2); + ArrayList> types1 = setupSuperClassCache(messageType1); + ArrayList> types2 = setupSuperClassCache(messageType2); Collection subs; Class eventSuperType1 = messageType1; @@ -427,9 +421,9 @@ public class SubscriptionManager { this.LOCK.readLock().lock(); // also add all subscriptions that match super types - ArrayList> types1 = superClassCache(messageType1); - ArrayList> types2 = superClassCache(messageType2); - ArrayList> types3 = superClassCache(messageType3); + ArrayList> types1 = setupSuperClassCache(messageType1); + ArrayList> types2 = setupSuperClassCache(messageType2); + ArrayList> types3 = setupSuperClassCache(messageType3); Class eventSuperType1 = messageType1; IdentityObjectTree, Collection> leaf1; @@ -513,7 +507,6 @@ public class SubscriptionManager { } } - SubHolder subHolder; int size = messageTypes.length; if (size > 0) { boolean allSameType = true; @@ -537,7 +530,7 @@ public class SubscriptionManager { if (allSameType) { // do we have a var-arg (it shows as an array) subscribed? - ArrayList> superClasses = superClassCache(firstType); + ArrayList> superClasses = setupSuperClassCache(firstType); Class eventSuperType = firstType; int j; @@ -552,14 +545,10 @@ public class SubscriptionManager { eventSuperType = Array.newInstance(eventSuperType, 1).getClass(); // also add all subscriptions that match super types - subHolder = this.subscriptionsPerMessageSingle.get(eventSuperType); - if (subHolder != null) { - subs = subHolder.subs; - count += subHolder.count; - if (subs != null) { - for (Subscription sub : subs) { - subscriptions.add(sub); - } + subs = this.subscriptionsPerMessageSingle.get(eventSuperType); + if (subs != null) { + for (Subscription sub : subs) { + subscriptions.add(sub); } } } @@ -576,12 +565,30 @@ public class SubscriptionManager { return subscriptions; } - private ArrayList> superClassCache(Class clazz) { + + private final Collection> EMPTY_LIST_CLASSES = Collections.emptyList(); + // must be protected by read lock + public Collection> getSuperClasses(Class clazz) { + // not thread safe. DO NOT MODIFY ArrayList> types = this.superClassesCache.get(clazz); + + if (types != null) { + return types; + } + + return this.EMPTY_LIST_CLASSES; + } + + + // not a thread safe collection. must be locked by caller + private ArrayList> setupSuperClassCache(Class clazz) { + ArrayList> types = this.superClassesCache.get(clazz); + if (types == null) { // it doesn't matter if concurrent access stomps on values, since they are always the same. Set> superTypes = ReflectionUtils.getSuperTypes(clazz); types = new ArrayList>(superTypes); + // NOTE: no need to write lock, since race conditions will result in duplicate answers this.superClassesCache.put(clazz, types); } @@ -593,54 +600,49 @@ public class SubscriptionManager { /////////////// // a var-arg handler might match /////////////// - private int addVarArgClass(Collection subscriptions, Class messageType) { + private void addVarArgClass(Collection subscriptions, Class messageType) { // tricky part. We have to check the ARRAY version - SubHolder subHolder; Collection subs; - int count = 0; Class varArgClass = this.varArgClasses.get(messageType); if (varArgClass != null) { // also add all subscriptions that match super types - subHolder = this.subscriptionsPerMessageSingle.get(varArgClass); - if (subHolder != null) { - subs = subHolder.subs; - count += subHolder.count; - if (subs != null) { - for (Subscription sub : subs) { - subscriptions.add(sub); - } + subs = this.subscriptionsPerMessageSingle.get(varArgClass); + if (subs != null) { + for (Subscription sub : subs) { + subscriptions.add(sub); } } } - return count; } - public Class getVarArg(Class clazz) { - return this.varArgClasses.get(clazz); + // must be protected by read lock + public Collection getVarArgs(Class clazz) { + Class varArgClass = this.varArgClasses.get(clazz); + if (varArgClass != null) { + Collection subs = this.subscriptionsPerMessageSingle.get(varArgClass); + if (subs != null) { + return subs; + } + } + + return this.EMPTY_LIST; } /////////////// // a var-arg handler might match // tricky part. We have to check the ARRAY version /////////////// - private int addVarArgClasses(Collection subscriptions, Class messageType, ArrayList> types1) { + private void addVarArgClasses(Collection subscriptions, Class messageType, ArrayList> types1) { Collection subs; - SubHolder subHolder; - int count = 0; Class varArgClass = this.varArgClasses.get(messageType); if (varArgClass != null) { // also add all subscriptions that match super types - subHolder = this.subscriptionsPerMessageSingle.get(varArgClass); - if (subHolder != null) { - subs = subHolder.subs; - count += subHolder.count; - - if (subs != null) { - for (Subscription sub : subs) { - subscriptions.add(sub); - } + subs = this.subscriptionsPerMessageSingle.get(varArgClass); + if (subs != null) { + for (Subscription sub : subs) { + subscriptions.add(sub); } } } @@ -649,21 +651,14 @@ public class SubscriptionManager { varArgClass = this.varArgClasses.get(eventSuperType); if (varArgClass != null) { // also add all subscriptions that match super types - subHolder = this.subscriptionsPerMessageSingle.get(varArgClass); - if (subHolder != null) { - subs = subHolder.subs; - count += subHolder.count; - - if (subs != null) { - for (Subscription sub : subs) { - subscriptions.add(sub); - } + subs = this.subscriptionsPerMessageSingle.get(varArgClass); + if (subs != null) { + for (Subscription sub : subs) { + subscriptions.add(sub); } } } } - - return count; } private void getSubsVarArg(Collection subscriptions, int length, int index, @@ -671,7 +666,7 @@ public class SubscriptionManager { Class classType = messageTypes[index]; // get all the super types, if there are any. - ArrayList> superClasses = superClassCache(classType); + ArrayList> superClasses = setupSuperClassCache(classType); IdentityObjectTree, Collection> leaf; Collection subs; @@ -702,4 +697,12 @@ public class SubscriptionManager { } } } + + public void readLock() { + this.LOCK.readLock().lock(); + } + + public void readUnLock() { + this.LOCK.readLock().unlock(); + } } diff --git a/src/test/java/net/engio/mbassy/multi/ConcurrentSetTest.java b/src/test/java/net/engio/mbassy/multi/ConcurrentSetTest.java index 10963ea..3bb7ca3 100644 --- a/src/test/java/net/engio/mbassy/multi/ConcurrentSetTest.java +++ b/src/test/java/net/engio/mbassy/multi/ConcurrentSetTest.java @@ -1,17 +1,23 @@ package net.engio.mbassy.multi; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicInteger; + import junit.framework.Assert; import net.engio.mbassy.multi.common.AssertSupport; import net.engio.mbassy.multi.common.ConcurrentExecutor; -import net.engio.mbassy.multi.common.IConcurrentSet; import org.junit.Before; import org.junit.Test; -import java.util.*; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.atomic.AtomicInteger; - /** * This test ensures the correct behaviour of the set implementation that is the building * block of the subscription implementations used by the Mbassador message bus. @@ -31,13 +37,14 @@ public abstract class ConcurrentSetTest extends AssertSupport { protected Set gcProtector = new HashSet(); + @Override @Before public void beforeTest(){ super.beforeTest(); - gcProtector = new HashSet(); + this.gcProtector = new HashSet(); } - - protected abstract IConcurrentSet createSet(); + + protected abstract Collection createSet(); @Test @@ -45,12 +52,12 @@ public abstract class ConcurrentSetTest extends AssertSupport { final LinkedList duplicates = new LinkedList(); final HashSet distinct = new HashSet(); - final IConcurrentSet testSet = createSet(); + final Collection testSet = createSet(); Random rand = new Random(); // getAll set of distinct objects and list of duplicates Object candidate = new Object(); - for (int i = 0; i < numberOfElements; i++) { + for (int i = 0; i < this.numberOfElements; i++) { if (rand.nextInt() % 3 == 0) { candidate = new Object(); } @@ -66,7 +73,7 @@ public abstract class ConcurrentSetTest extends AssertSupport { testSet.add(src); } } - }, numberOfThreads); + }, this.numberOfThreads); // check that the control set and the test set contain the exact same elements assertEquals(distinct.size(), testSet.size()); @@ -77,21 +84,22 @@ public abstract class ConcurrentSetTest extends AssertSupport { @Test() public void testIterationWithConcurrentRemoval() { - final IConcurrentSet testSet = createSet(); + final Collection testSet = createSet(); final Random rand = new Random(); - for (int i = 0; i < numberOfElements; i++) { + for (int i = 0; i < this.numberOfElements; i++) { AtomicInteger element = new AtomicInteger(); testSet.add(element); - gcProtector.add(element); + this.gcProtector.add(element); } Runnable incrementer = new Runnable() { @Override public void run() { while(testSet.size() > 100){ - for(AtomicInteger element : testSet) + for(AtomicInteger element : testSet) { element.incrementAndGet(); + } } } @@ -101,9 +109,11 @@ public abstract class ConcurrentSetTest extends AssertSupport { @Override public void run() { while(testSet.size() > 100){ - for(AtomicInteger element : testSet) - if(rand.nextInt() % 3 == 0 && testSet.size() > 100) + for(AtomicInteger element : testSet) { + if(rand.nextInt() % 3 == 0 && testSet.size() > 100) { testSet.remove(element); + } + } } } }; @@ -128,9 +138,9 @@ public abstract class ConcurrentSetTest extends AssertSupport { final HashSet source = new HashSet(); final HashSet toRemove = new HashSet(); - final IConcurrentSet testSet = createSet(); + final Collection testSet = createSet(); // getAll set of distinct objects and mark a subset of those for removal - for (int i = 0; i < numberOfElements; i++) { + for (int i = 0; i < this.numberOfElements; i++) { Object candidate = new Object(); source.add(candidate); if (i % 3 == 0) { @@ -146,7 +156,7 @@ public abstract class ConcurrentSetTest extends AssertSupport { testSet.add(src); } } - }, numberOfThreads); + }, this.numberOfThreads); // remove all candidates that have previously been marked for removal from the test set ConcurrentExecutor.runConcurrent(new Runnable() { @@ -156,7 +166,7 @@ public abstract class ConcurrentSetTest extends AssertSupport { testSet.remove(src); } } - }, numberOfThreads); + }, this.numberOfThreads); // ensure that the test set does not contain any of the elements that have been removed from it for (Object tar : testSet) { @@ -166,7 +176,9 @@ public abstract class ConcurrentSetTest extends AssertSupport { // for removal assertEquals(source.size() - toRemove.size(), testSet.size()); for (Object src : source) { - if (!toRemove.contains(src)) assertTrue(testSet.contains(src)); + if (!toRemove.contains(src)) { + assertTrue(testSet.contains(src)); + } } } @@ -175,9 +187,9 @@ public abstract class ConcurrentSetTest extends AssertSupport { final HashSet source = new HashSet(); final HashSet toRemove = new HashSet(); - final IConcurrentSet testSet = createSet(); + final Collection testSet = createSet(); // getAll set of candidates and mark subset for removal - for (int i = 0; i < numberOfElements; i++) { + for (int i = 0; i < this.numberOfElements; i++) { Object candidate = new Object(); source.add(candidate); if (i % 3 == 0) { @@ -192,11 +204,12 @@ public abstract class ConcurrentSetTest extends AssertSupport { public void run() { for (Object src : source) { testSet.add(src); - if (toRemove.contains(src)) + if (toRemove.contains(src)) { testSet.remove(src); + } } } - }, numberOfThreads); + }, this.numberOfThreads); // ensure that the test set does not contain any of the elements that have been removed from it for (Object tar : testSet) { @@ -206,17 +219,19 @@ public abstract class ConcurrentSetTest extends AssertSupport { // for removal assertEquals(source.size() - toRemove.size(), testSet.size()); for (Object src : source) { - if (!toRemove.contains(src)) assertTrue(testSet.contains(src)); + if (!toRemove.contains(src)) { + assertTrue(testSet.contains(src)); + } } } @Test public void testCompleteRemoval() { final HashSet source = new HashSet(); - final IConcurrentSet testSet = createSet(); + final Collection testSet = createSet(); // getAll set of candidates and mark subset for removal - for (int i = 0; i < numberOfElements; i++) { + for (int i = 0; i < this.numberOfElements; i++) { Object candidate = new Object(); source.add(candidate); testSet.add(candidate); @@ -231,7 +246,7 @@ public abstract class ConcurrentSetTest extends AssertSupport { testSet.remove(src); } } - }, numberOfThreads); + }, this.numberOfThreads); // ensure that the test set still contains all objects from the source set that have not been marked @@ -245,10 +260,10 @@ public abstract class ConcurrentSetTest extends AssertSupport { @Test public void testRemovalViaIterator() { final HashSet source = new HashSet(); - final IConcurrentSet setUnderTest = createSet(); + final Collection setUnderTest = createSet(); // getAll set of candidates and mark subset for removal - for (int i = 0; i < numberOfElements; i++) { + for (int i = 0; i < this.numberOfElements; i++) { Object candidate = new Object(); source.add(candidate); setUnderTest.add(candidate); @@ -264,7 +279,7 @@ public abstract class ConcurrentSetTest extends AssertSupport { iterator.remove(); } } - }, numberOfThreads); + }, this.numberOfThreads); // ensure that the test set still contains all objects from the source set that have not been marked @@ -288,7 +303,7 @@ public abstract class ConcurrentSetTest extends AssertSupport { */ @Test public void testConcurrentAddRemove() { - final IConcurrentSet testSet = createSet(); + final Collection testSet = createSet(); // a set of unique integers that will stay permanently in the test set final List permanentObjects = new ArrayList(); // a set of objects that will be added and removed at random to the test set to force rehashing @@ -306,6 +321,7 @@ public abstract class ConcurrentSetTest extends AssertSupport { // Adds and removes items // thus forcing constant rehashing of the backing hashtable Runnable rehasher = new Runnable() { + @Override public void run() { Random rand = new Random(); for(int times = 0; times < 1000 ; times++){ @@ -330,8 +346,9 @@ public abstract class ConcurrentSetTest extends AssertSupport { for (Object permanent : permanentObjects) { // permanent items are never touched, // --> set.contains(j) should always return true - if(!testSet.contains(permanent)) + if(!testSet.contains(permanent)) { missing.add(permanent); + } } } } @@ -344,19 +361,23 @@ public abstract class ConcurrentSetTest extends AssertSupport { public Set createWithRandomIntegers(int size, List excluding){ - if(excluding == null) excluding = new ArrayList(); + if(excluding == null) { + excluding = new ArrayList(); + } Set result = new HashSet(size); Random rand = new Random(); while(result.size() < size){ result.add(rand.nextInt()); } - for(Integer excluded : excluding) + for(Integer excluded : excluding) { result.remove(excluded); + } return result; } protected void protectFromGarbageCollector(Set elements){ - for(Object element : elements) - gcProtector.add(element); + for(Object element : elements) { + this.gcProtector.add(element); + } } } diff --git a/src/test/java/net/engio/mbassy/multi/WeakConcurrentSetTest.java b/src/test/java/net/engio/mbassy/multi/WeakConcurrentSetTest.java index 5a1f615..cbe2ca1 100644 --- a/src/test/java/net/engio/mbassy/multi/WeakConcurrentSetTest.java +++ b/src/test/java/net/engio/mbassy/multi/WeakConcurrentSetTest.java @@ -1,15 +1,13 @@ package net.engio.mbassy.multi; -import net.engio.mbassy.multi.common.ConcurrentExecutor; -import net.engio.mbassy.multi.common.IConcurrentSet; -import net.engio.mbassy.multi.common.WeakConcurrentSet; - -import org.junit.Before; -import org.junit.Test; - +import java.util.Collection; import java.util.HashSet; import java.util.Random; -import java.util.Set; + +import net.engio.mbassy.multi.common.ConcurrentExecutor; +import net.engio.mbassy.multi.common.WeakConcurrentSet; + +import org.junit.Test; /** * @@ -24,7 +22,7 @@ public class WeakConcurrentSetTest extends ConcurrentSetTest{ @Override - protected IConcurrentSet createSet() { + protected Collection createSet() { return new WeakConcurrentSet(); } @@ -33,10 +31,10 @@ public class WeakConcurrentSetTest extends ConcurrentSetTest{ // Assemble final HashSet permanentElements = new HashSet(); - final IConcurrentSet testSetWeak = createSet(); + final Collection testSetWeak = createSet(); final Random rand = new Random(); - for (int i = 0; i < numberOfElements; i++) { + for (int i = 0; i < this.numberOfElements; i++) { Object candidate = new Object(); if (rand.nextInt() % 3 == 0) { @@ -58,13 +56,13 @@ public class WeakConcurrentSetTest extends ConcurrentSetTest{ System.currentTimeMillis(); } } - }, numberOfThreads); + }, this.numberOfThreads); // the set should have cleaned up the garbage collected elements // it must still contain all of the permanent objects // since different GC mechanisms can be used (not necessarily full, stop-the-world) not all dead objects // must have been collected - assertTrue(permanentElements.size() <= testSetWeak.size() && testSetWeak.size() < numberOfElements); + assertTrue(permanentElements.size() <= testSetWeak.size() && testSetWeak.size() < this.numberOfElements); for (Object test : testSetWeak) { assertTrue(permanentElements.contains(test)); } diff --git a/src/test/java/net/engio/mbassy/multi/common/SubscriptionValidator.java b/src/test/java/net/engio/mbassy/multi/common/SubscriptionValidator.java index 6410a79..a55f4c0 100644 --- a/src/test/java/net/engio/mbassy/multi/common/SubscriptionValidator.java +++ b/src/test/java/net/engio/mbassy/multi/common/SubscriptionValidator.java @@ -39,7 +39,7 @@ public class SubscriptionValidator extends AssertSupport{ // for each tuple of subscriber and message type the specified number of listeners must exist public void validate(SubscriptionManager manager) { for (Class messageType : this.messageTypes) { - Collection subscriptions = manager.getSubscriptionsByMessageType(messageType); + Collection subscriptions = manager.DEPRECATED_getSubscriptionsByMessageType(messageType); Collection validationEntries = getEntries(messageType); assertEquals(subscriptions.size(), validationEntries.size()); for(ValidationEntry validationValidationEntry : validationEntries){