diff --git a/src/main/java/net/engio/mbassy/multi/DispatchRunnable.java b/src/main/java/net/engio/mbassy/multi/DispatchRunnable.java index 1fe2edd..59e365e 100644 --- a/src/main/java/net/engio/mbassy/multi/DispatchRunnable.java +++ b/src/main/java/net/engio/mbassy/multi/DispatchRunnable.java @@ -19,14 +19,16 @@ public class DispatchRunnable implements Runnable { private ErrorHandlingSupport errorHandler; private TransferQueue dispatchQueue; + private TransferQueue invokeQueue; private SubscriptionManager manager; public DispatchRunnable(ErrorHandlingSupport errorHandler, SubscriptionManager subscriptionManager, - TransferQueue dispatchQueue) { + TransferQueue dispatchQueue, TransferQueue invokeQueue) { this.errorHandler = errorHandler; this.manager = subscriptionManager; this.dispatchQueue = dispatchQueue; + this.invokeQueue = invokeQueue; } @Override @@ -34,19 +36,22 @@ public class DispatchRunnable implements Runnable { final SubscriptionManager manager = this.manager; final ErrorHandlingSupport errorHandler = this.errorHandler; final TransferQueue IN_queue = this.dispatchQueue; + final TransferQueue OUT_queue = this.invokeQueue; + + final Runnable dummyRunnable = new Runnable() { + @Override + public void run() { + } + }; Object message = null; int counter; while (true) { try { - counter = MultiMBassador.WORK_RUN_BLITZ; + counter = 200; while ((message = IN_queue.poll()) == null) { -// if (counter > 100) { -// --counter; -// Thread.yield(); -// } else - if (counter > 0) { + if (counter > 0) { --counter; LockSupport.parkNanos(1L); } else { @@ -55,43 +60,76 @@ public class DispatchRunnable implements Runnable { } } + @SuppressWarnings("null") Class messageClass = message.getClass(); + + manager.readLock(); + Collection subscriptions = manager.getSubscriptionsByMessageType(messageClass); - boolean empty = subscriptions.isEmpty(); - if (empty) { - // Dead Event - subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); - DeadMessage deadMessage = new DeadMessage(message); - message = deadMessage; - empty = subscriptions.isEmpty(); + Collection deadSubscriptions = null; + if (empty) { + // Dead Event. must EXACTLY MATCH (no subclasses or varargs) + deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); } + Collection> superClasses = manager.getSuperClasses(messageClass); + Collection varArgs = manager.getVarArgs(messageClass); + + manager.readUnLock(); + if (!empty) { - Object[] vararg = null; for (Subscription sub : subscriptions) { - boolean handled = false; - if (sub.isVarArg()) { - // messageClass will NEVER be an array to begin with, since that will call the multi-arg method - if (vararg == null) { - // messy, but the ONLY way to do it. - vararg = (Object[]) Array.newInstance(message.getClass(), 1); - vararg[0] = message; + sub.publishToSubscriptionSingle(OUT_queue, errorHandler, message); + } - Object[] newInstance = new Object[1]; - newInstance[0] = vararg; - vararg = newInstance; - } - handled = true; - sub.publishToSubscription(errorHandler, vararg); +// OUT_queue.put(new InvokeRunnable(errorHandler, subscriptions, message)); + } else if (deadSubscriptions != null) { + if (!deadSubscriptions.isEmpty()) { + DeadMessage deadMessage = new DeadMessage(message); + + for (Subscription sub : deadSubscriptions) { + sub.publishToSubscriptionSingle(OUT_queue, errorHandler, deadMessage); } - if (!handled) { - sub.publishToSubscription(errorHandler, message); - } +// OUT_queue.put(new InvokeRunnable(errorHandler, deadSubscriptions, deadMessage)); } } + + // now get superClasses + for (Class superClass : superClasses) { + subscriptions = manager.getSubscriptionsByMessageType(superClass); + + if (!subscriptions.isEmpty()) { + for (Subscription sub : subscriptions) { + sub.publishToSubscriptionSingle(OUT_queue, errorHandler, message); + } + +// OUT_queue.put(new InvokeRunnable(errorHandler, subscriptions, message)); + } + } + + // now get varargs + if (!varArgs.isEmpty()) { + // messy, but the ONLY way to do it. + Object[] vararg = (Object[]) Array.newInstance(message.getClass(), 1); + vararg[0] = message; + + Object[] newInstance = new Object[1]; + newInstance[0] = vararg; + vararg = newInstance; + + for (Subscription sub : varArgs) { + sub.publishToSubscription(OUT_queue, errorHandler, vararg); + } + +// OUT_queue.put(new InvokeRunnable(errorHandler, varArgs, vararg)); + } + + // make sure it's synced at this point +// OUT_queue.transfer(dummyRunnable); + } catch (InterruptedException e) { return; } catch (Throwable e) { diff --git a/src/main/java/net/engio/mbassy/multi/MultiMBassador.java b/src/main/java/net/engio/mbassy/multi/MultiMBassador.java index 4dbd58c..e382750 100644 --- a/src/main/java/net/engio/mbassy/multi/MultiMBassador.java +++ b/src/main/java/net/engio/mbassy/multi/MultiMBassador.java @@ -1,14 +1,19 @@ package net.engio.mbassy.multi; +import java.lang.reflect.Array; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; +import net.engio.mbassy.multi.common.DeadMessage; import net.engio.mbassy.multi.common.DisruptorThreadFactory; import net.engio.mbassy.multi.common.LinkedTransferQueue; import net.engio.mbassy.multi.common.TransferQueue; import net.engio.mbassy.multi.error.IPublicationErrorHandler; import net.engio.mbassy.multi.error.PublicationError; +import net.engio.mbassy.multi.subscription.Subscription; import net.engio.mbassy.multi.subscription.SubscriptionManager; /** @@ -26,7 +31,7 @@ public class MultiMBassador implements IMessageBus { private final SubscriptionManager subscriptionManager; - private final TransferQueue dispatchQueue; + private final TransferQueue dispatchQueue; // all threads that are available for asynchronous message dispatching @@ -37,9 +42,6 @@ public class MultiMBassador implements IMessageBus { } - public static final int WORK_RUN_BLITZ = 50; - public static final int WORK_RUN_BLITZ_DIV2 = WORK_RUN_BLITZ/2; - public MultiMBassador(int numberOfThreads) { if (numberOfThreads < 1) { numberOfThreads = 1; // at LEAST 1 threads @@ -47,17 +49,43 @@ public class MultiMBassador implements IMessageBus { this.subscriptionManager = new SubscriptionManager(); - this.dispatchQueue = new LinkedTransferQueue<>(); + this.dispatchQueue = new LinkedTransferQueue(); - int dispatchSize = 8; + int dispatchSize = numberOfThreads; this.threads = new ArrayList(); DisruptorThreadFactory dispatchThreadFactory = new DisruptorThreadFactory("MB_Dispatch"); for (int i = 0; i < dispatchSize; i++) { // each thread will run forever and process incoming message publication requests - Runnable runnable = new DispatchRunnable(this, this.subscriptionManager, this.dispatchQueue); + Runnable runnable = new Runnable() { + @Override + public void run() { + TransferQueue IN_QUEUE= MultiMBassador.this.dispatchQueue; + Runnable event = null; + int counter; + + while (true) { + try { + counter = 200; + while ((event = IN_QUEUE.poll()) == null) { + if (counter > 0) { + --counter; + LockSupport.parkNanos(1L); + } else { + event = IN_QUEUE.take(); + break; + } + } + + event.run(); + } catch (InterruptedException e) { + return; + } + } + } + }; Thread runner = dispatchThreadFactory.newThread(runnable); this.threads.add(runner); @@ -92,7 +120,6 @@ public class MultiMBassador implements IMessageBus { @Override public boolean hasPendingMessages() { -// return this.dispatch_RingBuffer.remainingCapacity() < this.dispatch_RingBufferSize; return !this.dispatchQueue.isEmpty(); } @@ -101,136 +128,144 @@ public class MultiMBassador implements IMessageBus { for (Thread t : this.threads) { t.interrupt(); } - -// System.err.println(this.counter); - -// for (InterruptRunnable runnable : this.invokeRunners) { -// runnable.stop(); -// } - -// this.dispatch_Disruptor.shutdown(); -// this.dispatch_Executor.shutdown(); } @Override public void publish(Object message) { -// Class messageClass = message.getClass(); -// -// SubscriptionManager manager = this.subscriptionManager; -// Collection subscriptions = manager.getSubscriptionsByMessageType(messageClass); -// -// try { -// if (subscriptions.isEmpty()) { -// // Dead Event -// subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); -// -// DeadMessage deadMessage = new DeadMessage(message); + Class messageClass = message.getClass(); + + SubscriptionManager manager = this.subscriptionManager; +// Collection subscriptions = subscriptionManager.getSubscriptionsByMessageType(messageClass); + + manager.readLock(); + + Collection subscriptions = manager.getSubscriptionsByMessageType(messageClass); + boolean empty = subscriptions.isEmpty(); + + Collection deadSubscriptions = null; + if (empty) { + // Dead Event. must EXACTLY MATCH (no subclasses or varargs) + deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); + } + Collection> superClasses = manager.getSuperClasses(messageClass); + Collection varArgs = manager.getVarArgs(messageClass); + + manager.readUnLock(); + + if (!empty) { + for (Subscription sub : subscriptions) { + // this catches all exception types + sub.publishToSubscription(this, message); + } + } else if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) { + DeadMessage deadMessage = new DeadMessage(message); + + for (Subscription sub : deadSubscriptions) { + // this catches all exception types + sub.publishToSubscription(this, deadMessage); + } + } + + + + + + + + + + +// if (subscriptions.isEmpty()) { +// // Dead Event. only matches EXACT handlers (no vararg, no subclasses) +// subscriptions = this.subscriptionManager.getSubscriptionsByMessageType(DeadMessage.class); // +// DeadMessage deadMessage = new DeadMessage(message); +// if (!subscriptions.isEmpty()) { // for (Subscription sub : subscriptions) { +// // this catches all exception types // sub.publishToSubscription(this, deadMessage); // } -// } else { -// Object[] vararg = null; -// -// for (Subscription sub : subscriptions) { -// boolean handled = false; -// if (sub.isVarArg()) { -// // messageClass will NEVER be an array to begin with, since that will call the multi-arg method -// if (vararg == null) { -// // messy, but the ONLY way to do it. -// vararg = (Object[]) Array.newInstance(messageClass, 1); -// vararg[0] = message; -// -// Object[] newInstance = new Object[1]; -// newInstance[0] = vararg; -// vararg = newInstance; -// } -// -// handled = true; -// sub.publishToSubscription(this, vararg); -// } -// -// if (!handled) { -// sub.publishToSubscription(this, message); -// } -// } // } -// } catch (Throwable e) { -// handlePublicationError(new PublicationError() -// .setMessage("Error during publication of message") -// .setCause(e) -// .setPublishedObject(message)); +// } +// else { +//// Object[] vararg = null; +// for (Subscription sub : subscriptions) { +// // this catches all exception types +// sub.publishToSubscription(this, message); +// +//// if (sub.isVarArg()) { +//// // messageClass will NEVER be an array to begin with, since that will call the multi-arg method +//// if (vararg == null) { +//// // messy, but the ONLY way to do it. +//// vararg = (Object[]) Array.newInstance(message.getClass(), 1); +//// vararg[0] = message; +//// +//// Object[] newInstance = new Object[1]; +//// newInstance[0] = vararg; +//// vararg = newInstance; +//// } +//// +//// // this catches all exception types +//// sub.publishToSubscription(this, vararg); +//// continue; +//// } +// } // } } @Override public void publish(Object message1, Object message2) { -// try { -// Class messageClass1 = message1.getClass(); -// Class messageClass2 = message2.getClass(); -// -// SubscriptionManager manager = this.subscriptionManager; -// Collection subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2); -// -// if (subscriptions == null || subscriptions.isEmpty()) { -// // Dead Event -// subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); -// -// DeadMessage deadMessage = new DeadMessage(message1, message2); -// -// for (Subscription sub : subscriptions) { -// sub.publishToSubscription(this, deadMessage); -// } -// } else { -// Object[] vararg = null; -// -// for (Subscription sub : subscriptions) { -// boolean handled = false; -// if (sub.isVarArg()) { -// Class class1 = message1.getClass(); -// Class class2 = message2.getClass(); -// if (!class1.isArray() && class1 == class2) { -// if (vararg == null) { -// // messy, but the ONLY way to do it. -// vararg = (Object[]) Array.newInstance(class1, 2); -// vararg[0] = message1; -// vararg[1] = message2; -// -// Object[] newInstance = (Object[]) Array.newInstance(vararg.getClass(), 1); -// newInstance[0] = vararg; -// vararg = newInstance; -// } -// -// handled = true; -// sub.publishToSubscription(this, vararg); -// } -// } -// -// if (!handled) { -// sub.publishToSubscription(this, message1, message2); -// } -// } -// -// // if the message did not have any listener/handler accept it -// if (subscriptions.isEmpty()) { -// // cannot have DeadMessage published to this, so no extra check necessary -// // Dead Event -// subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); -// DeadMessage deadMessage = new DeadMessage(message1, message2); -// -// for (Subscription sub : subscriptions) { -// sub.publishToSubscription(this, deadMessage); -// } -// } -// } -// } catch (Throwable e) { -// handlePublicationError(new PublicationError() -// .setMessage("Error during publication of message") -// .setCause(e) -// .setPublishedObject(message1, message2)); -// } + try { + Class messageClass1 = message1.getClass(); + Class messageClass2 = message2.getClass(); + + SubscriptionManager manager = this.subscriptionManager; + Collection subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2); + + if (subscriptions == null || subscriptions.isEmpty()) { + subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); + + DeadMessage deadMessage = new DeadMessage(message1, message2); + + for (Subscription sub : subscriptions) { + sub.publishToSubscription(this, deadMessage); + } + } + else { + Object[] vararg = null; + + for (Subscription sub : subscriptions) { + if (sub.isVarArg()) { + Class class1 = message1.getClass(); + Class class2 = message2.getClass(); + if (!class1.isArray() && class1 == class2) { + if (vararg == null) { + // messy, but the ONLY way to do it. + vararg = (Object[]) Array.newInstance(class1, 2); + vararg[0] = message1; + vararg[1] = message2; + + Object[] newInstance = (Object[]) Array.newInstance(vararg.getClass(), 1); + newInstance[0] = vararg; + vararg = newInstance; + } + + sub.publishToSubscription(this, vararg); + continue; + } + } + + sub.publishToSubscription(this, message1, message2); + } + } + } catch (Throwable e) { + handlePublicationError(new PublicationError() + .setMessage("Error during publication of message") + .setCause(e) + .setPublishedObject(message1, message2)); + } } @Override @@ -387,51 +422,30 @@ public class MultiMBassador implements IMessageBus { } @Override - public void publishAsync(Object message) { + public void publishAsync(final Object message) { if (message != null) { -// // put this on the disruptor ring buffer -// final RingBuffer ringBuffer = this.dispatch_RingBuffer; -// -// // setup the job -// final long seq = ringBuffer.next(); -// try { -// DispatchHolder eventJob = ringBuffer.get(seq); -// eventJob.messageType = MessageType.ONE; -// eventJob.message1 = message; -// } catch (Throwable e) { -// handlePublicationError(new PublicationError() -// .setMessage("Error while adding an asynchronous message") -// .setCause(e) -// .setPublishedObject(message)); -// } finally { -// // always publish the job -// ringBuffer.publish(seq); -// } - -// MessageHolder messageHolder = new MessageHolder(); -// messageHolder.messageType = MessageType.ONE; -// messageHolder.message1 = message; + Runnable runnable = new Runnable() { + @Override + public void run() { + MultiMBassador.this.publish(message); + } + }; -// new Runnable() { -// @Override -// public void run() { -// -// } -// }; // faster if we can skip locking // int counter = 200; // while (!this.dispatchQueue.offer(message)) { -// if (counter > 100) { -// --counter; -// Thread.yield(); -// } else if (counter > 0) { +//// if (counter > 100) { +//// --counter; +//// Thread.yield(); +//// } else +// if (counter > 0) { // --counter; // LockSupport.parkNanos(1L); // } else { try { - this.dispatchQueue.transfer(message); + this.dispatchQueue.transfer(runnable); return; } catch (InterruptedException e) { e.printStackTrace(); @@ -442,9 +456,9 @@ public class MultiMBassador implements IMessageBus { .setCause(e) .setPublishedObject(message)); } -// } + } // } - } +// } } @Override diff --git a/src/main/java/net/engio/mbassy/multi/common/AbstractConcurrentSet.java b/src/main/java/net/engio/mbassy/multi/common/AbstractConcurrentSet.java index a21c471..9e9d023 100644 --- a/src/main/java/net/engio/mbassy/multi/common/AbstractConcurrentSet.java +++ b/src/main/java/net/engio/mbassy/multi/common/AbstractConcurrentSet.java @@ -1,8 +1,11 @@ package net.engio.mbassy.multi.common; +import java.util.Collection; import java.util.Map; import java.util.concurrent.locks.Lock; +import sun.reflect.generics.reflectiveObjects.NotImplementedException; + import com.googlecode.concurentlocks.ReentrantReadWriteUpdateLock; /** @@ -14,7 +17,7 @@ import com.googlecode.concurentlocks.ReentrantReadWriteUpdateLock; * @author bennidi * Date: 2/12/12 */ -public abstract class AbstractConcurrentSet implements IConcurrentSet { +public abstract class AbstractConcurrentSet implements Collection { // Internal state protected final ReentrantReadWriteUpdateLock lock = new ReentrantReadWriteUpdateLock(); @@ -28,21 +31,25 @@ public abstract class AbstractConcurrentSet implements IConcurrentSet { protected abstract Entry createEntry(T value, Entry next); @Override - public void add(T element) { + public boolean add(T element) { if (element == null) { - return; + return false; } + boolean changed = false; + Lock writeLock = this.lock.writeLock(); writeLock.lock(); - if (this.entries.containsKey(element)) { - } else { + if (!this.entries.containsKey(element)) { insert(element); + changed = true; } writeLock.unlock(); + + return changed; } @Override - public boolean contains(T element) { + public boolean contains(Object element) { Lock readLock = this.lock.readLock(); ISetEntry entry; try { @@ -73,42 +80,40 @@ public abstract class AbstractConcurrentSet implements IConcurrentSet { } @Override - public void addAll(Iterable elements) { + public boolean addAll(Collection elements) { + boolean changed = false; Lock writeLock = this.lock.writeLock(); try { writeLock.lock(); for (T element : elements) { if (element != null) { insert(element); + changed = true; } } } finally { writeLock.unlock(); } + return changed; } /** * @return TRUE if the element was successfully removed */ @Override - public boolean remove(T element) { + public boolean remove(Object element) { Lock updateLock = this.lock.updateLock(); - boolean isNull; try { updateLock.lock(); ISetEntry entry = this.entries.get(element); - isNull = entry == null || entry.getValue() == null; - if (isNull) { + if (entry != null && entry.getValue() != null) { Lock writeLock = this.lock.writeLock(); try { writeLock.lock(); - ISetEntry listelement = this.entries.get(element); - if (listelement == null) { - return false; //removed by other thread in the meantime - } else if (listelement != this.head) { - listelement.remove(); + if (entry != this.head) { + entry.remove(); } else { // if it was second, now it's first this.head = this.head.next(); @@ -127,6 +132,37 @@ public abstract class AbstractConcurrentSet implements IConcurrentSet { } } + @Override + public Object[] toArray() { + throw new NotImplementedException(); + } + + @SuppressWarnings("hiding") + @Override + public T[] toArray(T[] a) { + throw new NotImplementedException(); + } + + @Override + public boolean containsAll(Collection c) { + throw new NotImplementedException(); + } + + @Override + public boolean removeAll(Collection c) { + throw new NotImplementedException(); + } + + @Override + public boolean retainAll(Collection c) { + throw new NotImplementedException(); + } + + @Override + public void clear() { + throw new NotImplementedException(); + } + public abstract static class Entry implements ISetEntry { diff --git a/src/main/java/net/engio/mbassy/multi/common/IConcurrentSet.java b/src/main/java/net/engio/mbassy/multi/common/IConcurrentSet.java index cbfeff5..b60269e 100644 --- a/src/main/java/net/engio/mbassy/multi/common/IConcurrentSet.java +++ b/src/main/java/net/engio/mbassy/multi/common/IConcurrentSet.java @@ -1,19 +1,24 @@ package net.engio.mbassy.multi.common; +import java.util.Collection; + /** * Todo: Add javadoc * * @author bennidi * Date: 3/29/13 */ -public interface IConcurrentSet extends Iterable { +public interface IConcurrentSet extends Collection { - void add(T element); + @Override + boolean add(T element); - boolean contains(T element); + boolean contains(Object element); + @Override int size(); + @Override boolean isEmpty(); void addAll(Iterable elements); @@ -21,5 +26,5 @@ public interface IConcurrentSet extends Iterable { /** * @return TRUE if the element was removed */ - boolean remove(T element); + boolean remove(Object element); } diff --git a/src/main/java/net/engio/mbassy/multi/common/StrongConcurrentSet.java b/src/main/java/net/engio/mbassy/multi/common/StrongConcurrentSet.java index 455a97f..26c2a46 100644 --- a/src/main/java/net/engio/mbassy/multi/common/StrongConcurrentSet.java +++ b/src/main/java/net/engio/mbassy/multi/common/StrongConcurrentSet.java @@ -13,7 +13,11 @@ public class StrongConcurrentSet extends AbstractConcurrentSet { public StrongConcurrentSet() { - super(new IdentityHashMap>()); + this(16); + } + + public StrongConcurrentSet(int size) { + super(new IdentityHashMap>(size)); } @Override diff --git a/src/main/java/net/engio/mbassy/multi/subscription/Subscription.java b/src/main/java/net/engio/mbassy/multi/subscription/Subscription.java index 604db7b..77bb28a 100644 --- a/src/main/java/net/engio/mbassy/multi/subscription/Subscription.java +++ b/src/main/java/net/engio/mbassy/multi/subscription/Subscription.java @@ -3,8 +3,8 @@ package net.engio.mbassy.multi.subscription; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.Arrays; +import java.util.Collection; -import net.engio.mbassy.multi.common.IConcurrentSet; import net.engio.mbassy.multi.common.StrongConcurrentSet; import net.engio.mbassy.multi.dispatch.IHandlerInvocation; import net.engio.mbassy.multi.dispatch.ReflectiveHandlerInvocation; @@ -34,22 +34,15 @@ public class Subscription { private final MessageHandler handlerMetadata; private final IHandlerInvocation invocation; -// protected final Collection listeners; -// protected final Map, Boolean> listeners; -// protected final Map listeners; - protected final IConcurrentSet listeners; + private final Collection listeners; Subscription(MessageHandler handler) { // this.listeners = new WeakConcurrentSet(); this.listeners = new StrongConcurrentSet(); -// this.listeners = new ConcurrentHashMap(); -// this.listeners = new CopyOnWriteArrayList(); -// this.listeners = new ConcurrentSkipListSet(); -// this.listeners = new ConcurrentWeakHashMap, Boolean>(); this.handlerMetadata = handler; IHandlerInvocation invocation = new ReflectiveHandlerInvocation(); - if (handler.isSynchronized()){ + if (handler.isSynchronized()) { invocation = new SynchronizedHandlerInvocation(invocation); } @@ -114,11 +107,6 @@ public class Subscription { * @return TRUE if the element was removed */ public boolean unsubscribe(Object existingListener) { -// Boolean remove = this.listeners.remove(existingListener); -// if (remove != null) { -// return true; -// } -// return false; return this.listeners.remove(existingListener); } @@ -133,9 +121,7 @@ public class Subscription { // private AtomicLong counter = new AtomicLong(); public void publishToSubscription(ErrorHandlingSupport errorHandler, Object message) { -// Collection listeners = this.listeners.keySet(); -// Collection listeners = this.listeners; - IConcurrentSet listeners = this.listeners; + Collection listeners = this.listeners; if (listeners.size() > 0) { Method handler = this.handlerMetadata.getHandler(); @@ -185,8 +171,8 @@ public class Subscription { public void publishToSubscription(ErrorHandlingSupport errorHandler, Object message1, Object message2) { // Collection listeners = this.listeners.keySet(); -// Collection listeners = this.listeners; - IConcurrentSet listeners = this.listeners; + Collection listeners = this.listeners; +// IConcurrentSet listeners = this.listeners; if (listeners.size() > 0) { Method handler = this.handlerMetadata.getHandler(); @@ -238,8 +224,8 @@ public class Subscription { public void publishToSubscription(ErrorHandlingSupport errorHandler, Object message1, Object message2, Object message3) { // Collection listeners = this.listeners.keySet(); -// Collection listeners = this.listeners; - IConcurrentSet listeners = this.listeners; + Collection listeners = this.listeners; +// IConcurrentSet listeners = this.listeners; if (listeners.size() > 0) { Method handler = this.handlerMetadata.getHandler(); @@ -293,8 +279,8 @@ public class Subscription { public void publishToSubscription(ErrorHandlingSupport errorHandler, Object... messages) { // Collection listeners = this.listeners.keySet(); -// Collection listeners = this.listeners; - IConcurrentSet listeners = this.listeners; + Collection listeners = this.listeners; +// IConcurrentSet listeners = this.listeners; if (listeners.size() > 0) { Method handler = this.handlerMetadata.getHandler(); @@ -327,6 +313,9 @@ public class Subscription { .setMethodName(handler.getName()) .setListener(listener) .setPublishedObject(messages)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; } catch (Throwable e) { errorHandler.handlePublicationError(new PublicationError() .setMessage("Error during invocation of message handler. " + diff --git a/src/main/java/net/engio/mbassy/multi/subscription/SubscriptionManager.java b/src/main/java/net/engio/mbassy/multi/subscription/SubscriptionManager.java index 45b1e77..a2bc552 100644 --- a/src/main/java/net/engio/mbassy/multi/subscription/SubscriptionManager.java +++ b/src/main/java/net/engio/mbassy/multi/subscription/SubscriptionManager.java @@ -4,6 +4,7 @@ import java.lang.reflect.Array; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.IdentityHashMap; import java.util.LinkedList; import java.util.List; @@ -13,6 +14,7 @@ import java.util.concurrent.ConcurrentHashMap; import net.engio.mbassy.multi.common.IdentityObjectTree; import net.engio.mbassy.multi.common.ReflectionUtils; +import net.engio.mbassy.multi.common.StrongConcurrentSet; import net.engio.mbassy.multi.listener.MessageHandler; import net.engio.mbassy.multi.listener.MetadataReader; @@ -31,18 +33,13 @@ import com.googlecode.concurentlocks.ReentrantReadWriteUpdateLock; */ public class SubscriptionManager { - public static class SubHolder { - public int count = 0; - public Collection subs = new ArrayDeque(0); - } - // the metadata reader that is used to inspect objects passed to the subscribe method private final MetadataReader metadataReader = new MetadataReader(); // all subscriptions per message type // this is the primary list for dispatching a specific message // write access is synchronized and happens only when a listener of a specific class is registered the first time - private final Map, SubHolder> subscriptionsPerMessageSingle = new IdentityHashMap, SubHolder>(50); + private final Map, Collection> subscriptionsPerMessageSingle = new IdentityHashMap, Collection>(50); private final IdentityObjectTree, Collection> subscriptionsPerMessageMulti = new IdentityObjectTree, Collection>(); // all subscriptions per messageHandler type @@ -94,17 +91,14 @@ public class SubscriptionManager { // single Class clazz = handledMessageTypes[0]; - // NOTE: Not thread-safe! must be synchronized in outer scope - SubHolder subHolder = this.subscriptionsPerMessageSingle.get(clazz); - if (subHolder != null) { - Collection subs = subHolder.subs; - if (subs != null) { - subs.remove(subscription); + // NOTE: Order is important for safe publication + Collection subs = this.subscriptionsPerMessageSingle.get(clazz); + if (subs != null) { + subs.remove(subscription); - if (subs.isEmpty()) { - // remove element - this.subscriptionsPerMessageSingle.remove(clazz); - } + if (subs.isEmpty()) { + // remove element + this.subscriptionsPerMessageSingle.remove(clazz); } } } else { @@ -188,7 +182,7 @@ public class SubscriptionManager { } // it's SAFE to use non-concurrent collection here (read only). Same thread LOCKS on this with a write lock - subscriptions = new ArrayDeque(messageHandlers.size()); + subscriptions = new StrongConcurrentSet(messageHandlers.size()); // create subscriptions for all detected message handlers for (MessageHandler messageHandler : messageHandlers) { @@ -203,15 +197,15 @@ public class SubscriptionManager { // single Class clazz = handledMessageTypes[0]; - // NOTE: Not thread-safe! must be synchronized in outer scope - SubHolder subHolder = this.subscriptionsPerMessageSingle.get(clazz); - if (subHolder == null) { - subHolder = new SubHolder(); - this.subscriptionsPerMessageSingle.put(clazz, subHolder); + // NOTE: Order is important for safe publication + Collection subs = this.subscriptionsPerMessageSingle.get(clazz); + if (subs == null) { + subs = new StrongConcurrentSet(2); + subs.add(subscription); + this.subscriptionsPerMessageSingle.put(clazz, subs); + } else { + subs.add(subscription); } - Collection subs = subHolder.subs; - subs.add(subscription); - subHolder.count++; // have to save our the VarArg class types, because creating var-arg arrays for objects is expensive if (subscription.isVarArg()) { @@ -221,7 +215,7 @@ public class SubscriptionManager { // since it's vararg, this means that it's an ARRAY, so we ALSO // have to add the component classes of the array if (subscription.acceptsSubtypes()) { - ArrayList> setupSuperClassCache2 = superClassCache(componentType); + ArrayList> setupSuperClassCache2 = setupSuperClassCache(componentType); // have to setup each vararg chain for (int i = 0; i < setupSuperClassCache2.size(); i++) { Class superClass = setupSuperClassCache2.get(i); @@ -234,7 +228,7 @@ public class SubscriptionManager { } } } else if (subscription.acceptsSubtypes()) { - superClassCache(clazz); + setupSuperClassCache(clazz); } } else { @@ -245,17 +239,17 @@ public class SubscriptionManager { case 2: { tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes[0], handledMessageTypes[1]); if (subscription.acceptsSubtypes()) { - superClassCache(handledMessageTypes[0]); - superClassCache(handledMessageTypes[1]); + setupSuperClassCache(handledMessageTypes[0]); + setupSuperClassCache(handledMessageTypes[1]); } break; } case 3: { tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes[0], handledMessageTypes[1], handledMessageTypes[2]); if (subscription.acceptsSubtypes()) { - superClassCache(handledMessageTypes[0]); - superClassCache(handledMessageTypes[1]); - superClassCache(handledMessageTypes[2]); + setupSuperClassCache(handledMessageTypes[0]); + setupSuperClassCache(handledMessageTypes[1]); + setupSuperClassCache(handledMessageTypes[2]); } break; } @@ -263,7 +257,7 @@ public class SubscriptionManager { tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes); if (subscription.acceptsSubtypes()) { for (Class c : handledMessageTypes) { - superClassCache(c); + setupSuperClassCache(c); } } break; @@ -291,10 +285,24 @@ public class SubscriptionManager { } } + private final Collection EMPTY_LIST = Collections.emptyList(); + + + // cannot return null, not thread safe. + public Collection getSubscriptionsByMessageType(Class messageType) { + Collection subs = this.subscriptionsPerMessageSingle.get(messageType); + if (subs != null) { + + return subs; + } else { + return this.EMPTY_LIST; + } + } + // obtain the set of subscriptions for the given message type // Note: never returns null! - public Collection getSubscriptionsByMessageType(Class messageType) { + public Collection DEPRECATED_getSubscriptionsByMessageType(Class messageType) { // thread safe publication Collection subscriptions; @@ -302,50 +310,35 @@ public class SubscriptionManager { this.LOCK.readLock().lock(); int count = 0; - Collection subs; - SubHolder primaryHolder = this.subscriptionsPerMessageSingle.get(messageType); - if (primaryHolder != null) { + Collection subs = this.subscriptionsPerMessageSingle.get(messageType); + if (subs != null) { subscriptions = new ArrayDeque(count); - subs = primaryHolder.subs; - count = primaryHolder.count; - if (subs != null) { - subscriptions.addAll(subs); - } + subscriptions.addAll(subs); } else { subscriptions = new ArrayDeque(16); } // also add all subscriptions that match super types - SubHolder subHolder; - ArrayList> types1 = superClassCache(messageType); + ArrayList> types1 = setupSuperClassCache(messageType); if (types1 != null) { Class eventSuperType; int i; for (i = 0; i < types1.size(); i++) { eventSuperType = types1.get(i); - subHolder = this.subscriptionsPerMessageSingle.get(eventSuperType); - if (subHolder != null) { - subs = subHolder.subs; - count += subHolder.count; - - if (subs != null) { - for (Subscription sub : subs) { - if (sub.handlesMessageType(messageType)) { - subscriptions.add(sub); - } + subs = this.subscriptionsPerMessageSingle.get(eventSuperType); + if (subs != null) { + for (Subscription sub : subs) { + if (sub.handlesMessageType(messageType)) { + subscriptions.add(sub); } } } - count += addVarArgClass(subscriptions, eventSuperType); + addVarArgClass(subscriptions, eventSuperType); } } - count += addVarArgClass(subscriptions, messageType); + addVarArgClass(subscriptions, messageType); - if (primaryHolder != null) { - // save off our count, so our collection creation size is optimal. - primaryHolder.count = count; - } } finally { this.LOCK.readLock().unlock(); } @@ -364,8 +357,8 @@ public class SubscriptionManager { this.LOCK.readLock().lock(); // also add all subscriptions that match super types - ArrayList> types1 = superClassCache(messageType1); - ArrayList> types2 = superClassCache(messageType2); + ArrayList> types1 = setupSuperClassCache(messageType1); + ArrayList> types2 = setupSuperClassCache(messageType2); Collection subs; Class eventSuperType1 = messageType1; @@ -427,9 +420,9 @@ public class SubscriptionManager { this.LOCK.readLock().lock(); // also add all subscriptions that match super types - ArrayList> types1 = superClassCache(messageType1); - ArrayList> types2 = superClassCache(messageType2); - ArrayList> types3 = superClassCache(messageType3); + ArrayList> types1 = setupSuperClassCache(messageType1); + ArrayList> types2 = setupSuperClassCache(messageType2); + ArrayList> types3 = setupSuperClassCache(messageType3); Class eventSuperType1 = messageType1; IdentityObjectTree, Collection> leaf1; @@ -513,7 +506,6 @@ public class SubscriptionManager { } } - SubHolder subHolder; int size = messageTypes.length; if (size > 0) { boolean allSameType = true; @@ -537,7 +529,7 @@ public class SubscriptionManager { if (allSameType) { // do we have a var-arg (it shows as an array) subscribed? - ArrayList> superClasses = superClassCache(firstType); + ArrayList> superClasses = setupSuperClassCache(firstType); Class eventSuperType = firstType; int j; @@ -552,14 +544,10 @@ public class SubscriptionManager { eventSuperType = Array.newInstance(eventSuperType, 1).getClass(); // also add all subscriptions that match super types - subHolder = this.subscriptionsPerMessageSingle.get(eventSuperType); - if (subHolder != null) { - subs = subHolder.subs; - count += subHolder.count; - if (subs != null) { - for (Subscription sub : subs) { - subscriptions.add(sub); - } + subs = this.subscriptionsPerMessageSingle.get(eventSuperType); + if (subs != null) { + for (Subscription sub : subs) { + subscriptions.add(sub); } } } @@ -576,12 +564,30 @@ public class SubscriptionManager { return subscriptions; } - private ArrayList> superClassCache(Class clazz) { + + private final Collection> EMPTY_LIST_CLASSES = Collections.emptyList(); + // must be protected by read lock + public Collection> getSuperClasses(Class clazz) { + // not thread safe. DO NOT MODIFY ArrayList> types = this.superClassesCache.get(clazz); + + if (types != null) { + return types; + } + + return this.EMPTY_LIST_CLASSES; + } + + + // not a thread safe collection. must be locked by caller + private ArrayList> setupSuperClassCache(Class clazz) { + ArrayList> types = this.superClassesCache.get(clazz); + if (types == null) { // it doesn't matter if concurrent access stomps on values, since they are always the same. Set> superTypes = ReflectionUtils.getSuperTypes(clazz); types = new ArrayList>(superTypes); + // NOTE: no need to write lock, since race conditions will result in duplicate answers this.superClassesCache.put(clazz, types); } @@ -593,54 +599,49 @@ public class SubscriptionManager { /////////////// // a var-arg handler might match /////////////// - private int addVarArgClass(Collection subscriptions, Class messageType) { + private void addVarArgClass(Collection subscriptions, Class messageType) { // tricky part. We have to check the ARRAY version - SubHolder subHolder; Collection subs; - int count = 0; Class varArgClass = this.varArgClasses.get(messageType); if (varArgClass != null) { // also add all subscriptions that match super types - subHolder = this.subscriptionsPerMessageSingle.get(varArgClass); - if (subHolder != null) { - subs = subHolder.subs; - count += subHolder.count; - if (subs != null) { - for (Subscription sub : subs) { - subscriptions.add(sub); - } + subs = this.subscriptionsPerMessageSingle.get(varArgClass); + if (subs != null) { + for (Subscription sub : subs) { + subscriptions.add(sub); } } } - return count; } - public Class getVarArg(Class clazz) { - return this.varArgClasses.get(clazz); + // must be protected by read lock + public Collection getVarArgs(Class clazz) { + Class varArgClass = this.varArgClasses.get(clazz); + if (varArgClass != null) { + Collection subs = this.subscriptionsPerMessageSingle.get(varArgClass); + if (subs != null) { + return subs; + } + } + + return this.EMPTY_LIST; } /////////////// // a var-arg handler might match // tricky part. We have to check the ARRAY version /////////////// - private int addVarArgClasses(Collection subscriptions, Class messageType, ArrayList> types1) { + private void addVarArgClasses(Collection subscriptions, Class messageType, ArrayList> types1) { Collection subs; - SubHolder subHolder; - int count = 0; Class varArgClass = this.varArgClasses.get(messageType); if (varArgClass != null) { // also add all subscriptions that match super types - subHolder = this.subscriptionsPerMessageSingle.get(varArgClass); - if (subHolder != null) { - subs = subHolder.subs; - count += subHolder.count; - - if (subs != null) { - for (Subscription sub : subs) { - subscriptions.add(sub); - } + subs = this.subscriptionsPerMessageSingle.get(varArgClass); + if (subs != null) { + for (Subscription sub : subs) { + subscriptions.add(sub); } } } @@ -649,21 +650,14 @@ public class SubscriptionManager { varArgClass = this.varArgClasses.get(eventSuperType); if (varArgClass != null) { // also add all subscriptions that match super types - subHolder = this.subscriptionsPerMessageSingle.get(varArgClass); - if (subHolder != null) { - subs = subHolder.subs; - count += subHolder.count; - - if (subs != null) { - for (Subscription sub : subs) { - subscriptions.add(sub); - } + subs = this.subscriptionsPerMessageSingle.get(varArgClass); + if (subs != null) { + for (Subscription sub : subs) { + subscriptions.add(sub); } } } } - - return count; } private void getSubsVarArg(Collection subscriptions, int length, int index, @@ -671,7 +665,7 @@ public class SubscriptionManager { Class classType = messageTypes[index]; // get all the super types, if there are any. - ArrayList> superClasses = superClassCache(classType); + ArrayList> superClasses = setupSuperClassCache(classType); IdentityObjectTree, Collection> leaf; Collection subs; @@ -702,4 +696,12 @@ public class SubscriptionManager { } } } + + public void readLock() { + this.LOCK.readLock().lock(); + } + + public void readUnLock() { + this.LOCK.readLock().unlock(); + } }