diff --git a/src/dorkbox/util/messagebus/IMessageBus.java b/src/dorkbox/util/messagebus/IMessageBus.java index 19dbf59..1cc1818 100644 --- a/src/dorkbox/util/messagebus/IMessageBus.java +++ b/src/dorkbox/util/messagebus/IMessageBus.java @@ -80,6 +80,17 @@ public interface IMessageBus extends PubSubSupport { ExactWithSuperTypesAndVarArgs, } + enum SubscribeMode { + /** + * Will subscribe and publish using all provided parameters in the method signature (for subscribe), and arguments (for publish) + */ + MultiArg, + /** + * Will subscribe and publish using only the FIRST provided parameter in the method signature (for subscribe), and arguments (for publish) + */ + FirstArg, + } + /** * Check whether any asynchronous message publications are pending to be processed diff --git a/src/dorkbox/util/messagebus/MessageBus.java b/src/dorkbox/util/messagebus/MessageBus.java index 35e9705..439ca8d 100644 --- a/src/dorkbox/util/messagebus/MessageBus.java +++ b/src/dorkbox/util/messagebus/MessageBus.java @@ -1,5 +1,6 @@ package dorkbox.util.messagebus; +import dorkbox.util.messagebus.common.adapter.StampedLock; import dorkbox.util.messagebus.common.simpleq.MessageType; import dorkbox.util.messagebus.common.simpleq.MpmcMultiTransferArrayQueue; import dorkbox.util.messagebus.common.simpleq.MultiNode; @@ -7,11 +8,9 @@ import dorkbox.util.messagebus.common.thread.NamedThreadFactory; import dorkbox.util.messagebus.error.DefaultErrorHandler; import dorkbox.util.messagebus.error.ErrorHandlingSupport; import dorkbox.util.messagebus.error.PublicationError; -import dorkbox.util.messagebus.publication.PublisherAll; -import dorkbox.util.messagebus.publication.PublisherExact; -import dorkbox.util.messagebus.publication.PublisherExactWithSuperTypes; -import dorkbox.util.messagebus.subscription.Publisher; -import dorkbox.util.messagebus.subscription.SubscriptionManager; +import dorkbox.util.messagebus.publication.*; +import dorkbox.util.messagebus.subscription.*; +import dorkbox.util.messagebus.utils.ClassUtils; import org.jctools.util.Pow2; import java.util.ArrayDeque; @@ -26,7 +25,10 @@ import java.util.Collection; public class MessageBus implements IMessageBus { private final ErrorHandlingSupport errorHandler; private final MpmcMultiTransferArrayQueue dispatchQueue; + + private final ClassUtils classUtils; private final SubscriptionManager subscriptionManager; + private final Collection threads; private final Publisher subscriptionPublisher; @@ -46,32 +48,63 @@ public class MessageBus implements IMessageBus { * @param numberOfThreads how many threads to have for dispatching async messages */ public MessageBus(int numberOfThreads) { - this(PublishMode.ExactWithSuperTypes, numberOfThreads); + this(PublishMode.ExactWithSuperTypes, SubscribeMode.MultiArg, numberOfThreads); } /** * @param publishMode Specifies which publishMode to operate the publication of messages. * @param numberOfThreads how many threads to have for dispatching async messages */ - public MessageBus(final PublishMode publishMode, int numberOfThreads) { + public MessageBus(final PublishMode publishMode, final SubscribeMode subscribeMode, int numberOfThreads) { numberOfThreads = Pow2.roundToPowerOfTwo(getMinNumberOfThreads(numberOfThreads)); this.errorHandler = new DefaultErrorHandler(); this.dispatchQueue = new MpmcMultiTransferArrayQueue(numberOfThreads); - this.subscriptionManager = new SubscriptionManager(numberOfThreads, errorHandler, true); + classUtils = new ClassUtils(Subscriber.LOAD_FACTOR); + + final StampedLock lock = new StampedLock(); + + boolean isMultiArg = subscribeMode == SubscribeMode.MultiArg; + + final Subscriber subscriber; + if (isMultiArg) { + subscriber = new MultiArgSubscriber(errorHandler, classUtils); + } + else { + subscriber = new FirstArgSubscriber(errorHandler, classUtils); + } switch (publishMode) { case Exact: - subscriptionPublisher = new PublisherExact(errorHandler); + if (isMultiArg) { + subscriptionPublisher = new PublisherExact_MultiArg(errorHandler, subscriber, lock); + } + else { + subscriptionPublisher = new PublisherExact_FirstArg(errorHandler, subscriber, lock); + } break; + case ExactWithSuperTypes: - subscriptionPublisher = new PublisherExactWithSuperTypes(errorHandler); + if (isMultiArg) { + + subscriptionPublisher = new PublisherExactWithSuperTypes_MultiArg(errorHandler, subscriber, lock); + } + else { + subscriptionPublisher = new PublisherExactWithSuperTypes_FirstArg(errorHandler, subscriber, lock); + } break; + case ExactWithSuperTypesAndVarArgs: default: - subscriptionPublisher = new PublisherAll(errorHandler); + if (isMultiArg) { + subscriptionPublisher = new PublisherAll_MultiArg(errorHandler, subscriber, lock); + } + else { + throw new RuntimeException("Unable to run in expected configuration"); + } } + this.subscriptionManager = new SubscriptionManager(numberOfThreads, subscriber, lock); this.threads = new ArrayDeque(numberOfThreads); NamedThreadFactory dispatchThreadFactory = new NamedThreadFactory("MessageBus"); @@ -157,7 +190,6 @@ public class MessageBus implements IMessageBus { @Override public void subscribe(final Object listener) { MessageBus.this.subscriptionManager.subscribe(listener); - MessageBus.this.subscriptionManager.subscribe(listener); } @Override @@ -167,22 +199,22 @@ public class MessageBus implements IMessageBus { @Override public void publish(final Object message) { - subscriptionPublisher.publish(subscriptionManager, message); + subscriptionPublisher.publish(message); } @Override public void publish(final Object message1, final Object message2) { - subscriptionPublisher.publish(subscriptionManager, message1, message2); + subscriptionPublisher.publish(message1, message2); } @Override public void publish(final Object message1, final Object message2, final Object message3) { - subscriptionPublisher.publish(subscriptionManager, message1, message2, message3); + subscriptionPublisher.publish(message1, message2, message3); } @Override public void publish(final Object[] messages) { - subscriptionPublisher.publish(subscriptionManager, messages); + subscriptionPublisher.publish(messages); } @Override @@ -271,5 +303,6 @@ public class MessageBus implements IMessageBus { t.interrupt(); } this.subscriptionManager.shutdown(); + this.classUtils.clear(); } } diff --git a/src/dorkbox/util/messagebus/common/DeadMessage.java b/src/dorkbox/util/messagebus/common/DeadMessage.java index 9e9f8df..65238aa 100644 --- a/src/dorkbox/util/messagebus/common/DeadMessage.java +++ b/src/dorkbox/util/messagebus/common/DeadMessage.java @@ -1,5 +1,7 @@ package dorkbox.util.messagebus.common; +import java.util.Arrays; + /** * The dead message event is published whenever no message * handlers could be found for a given message publication. @@ -25,7 +27,7 @@ public final class DeadMessage { this.relatedMessages[1] = message2; } - public DeadMessage(Object message1, Object message2, Object message3 ) { + public DeadMessage(Object message1, Object message2, Object message3) { this.relatedMessages = new Object[3]; this.relatedMessages[0] = message1; this.relatedMessages[1] = message2; @@ -33,7 +35,7 @@ public final class DeadMessage { } public DeadMessage(Object[] messages) { - this.relatedMessages = messages; + this.relatedMessages = Arrays.copyOf(messages, messages.length); } public Object[] getMessages() { diff --git a/src/dorkbox/util/messagebus/common/adapter/JavaVersionAdapter.java b/src/dorkbox/util/messagebus/common/adapter/JavaVersionAdapter.java index c45a85a..716b7c1 100644 --- a/src/dorkbox/util/messagebus/common/adapter/JavaVersionAdapter.java +++ b/src/dorkbox/util/messagebus/common/adapter/JavaVersionAdapter.java @@ -9,6 +9,7 @@ import java.util.concurrent.ConcurrentMap; public abstract class JavaVersionAdapter { + public static final JavaVersionAdapter get; static { // get = new Java7Adapter(); @@ -17,7 +18,6 @@ public abstract class JavaVersionAdapter { } - public static JavaVersionAdapter get; public abstract ConcurrentMap concurrentMap(final int size, final float loadFactor, final int stripeSize); diff --git a/src/dorkbox/util/messagebus/publication/PublisherAll.java b/src/dorkbox/util/messagebus/publication/PublisherAll_MultiArg.java similarity index 79% rename from src/dorkbox/util/messagebus/publication/PublisherAll.java rename to src/dorkbox/util/messagebus/publication/PublisherAll_MultiArg.java index 12728d9..24b2b1c 100644 --- a/src/dorkbox/util/messagebus/publication/PublisherAll.java +++ b/src/dorkbox/util/messagebus/publication/PublisherAll_MultiArg.java @@ -5,27 +5,41 @@ import dorkbox.util.messagebus.common.adapter.StampedLock; import dorkbox.util.messagebus.error.ErrorHandlingSupport; import dorkbox.util.messagebus.error.PublicationError; import dorkbox.util.messagebus.subscription.Publisher; +import dorkbox.util.messagebus.subscription.Subscriber; import dorkbox.util.messagebus.subscription.Subscription; -import dorkbox.util.messagebus.subscription.SubscriptionManager; import dorkbox.util.messagebus.utils.VarArgUtils; import java.lang.reflect.Array; +import java.util.concurrent.atomic.AtomicBoolean; -public class PublisherAll implements Publisher { +public class PublisherAll_MultiArg implements Publisher { private final ErrorHandlingSupport errorHandler; - public PublisherAll(final ErrorHandlingSupport errorHandler) { + private final Subscriber subscriber; + private final StampedLock lock; + + private final AtomicBoolean varArgPossibility; + final VarArgUtils varArgUtils; + + public PublisherAll_MultiArg(final ErrorHandlingSupport errorHandler, final Subscriber subscriber, final StampedLock lock) { this.errorHandler = errorHandler; + this.subscriber = subscriber; + this.lock = lock; + + varArgPossibility = subscriber.getVarArgPossibility(); + varArgUtils = subscriber.getVarArgUtils(); } @Override - public void publish(final SubscriptionManager subscriptionManager, final Object message1) { + public void publish(final Object message1) { try { final Class messageClass = message1.getClass(); final boolean isArray = messageClass.isArray(); - final StampedLock lock = subscriptionManager.getLock(); - final Subscription[] subscriptions = subscriptionManager.getSubscriptionsExactAndSuper_NoLock(messageClass); // can return null + final StampedLock lock = this.lock; + long stamp = lock.readLock(); + final Subscription[] subscriptions = subscriber.getExactAndSuper(messageClass); // can return null + lock.unlockRead(stamp); boolean hasSubs = false; // Run subscriptions @@ -41,11 +55,9 @@ public class PublisherAll implements Publisher { // publish to var arg, only if not already an array (because that would be unnecessary) - if (subscriptionManager.canPublishVarArg() && !isArray) { - final VarArgUtils varArgUtils = subscriptionManager.getVarArgUtils(); - - long stamp = lock.readLock(); - final Subscription[] varArgSubs = varArgUtils.getVarArgSubscriptions(messageClass); // CAN NOT RETURN NULL + if (varArgPossibility.get() && !isArray) { + stamp = lock.readLock(); + final Subscription[] varArgSubs = varArgUtils.getVarArgSubscriptions(messageClass, subscriber); // CAN NOT RETURN NULL lock.unlockRead(stamp); Subscription sub; @@ -67,7 +79,8 @@ public class PublisherAll implements Publisher { // now publish array based superClasses (but only if those ALSO accept vararg) stamp = lock.readLock(); - final Subscription[] varArgSuperSubs = varArgUtils.getVarArgSuperSubscriptions(messageClass); // CAN NOT RETURN NULL + final Subscription[] varArgSuperSubs = varArgUtils.getVarArgSuperSubscriptions(messageClass, + subscriber); // CAN NOT RETURN NULL lock.unlockRead(stamp); length = varArgSuperSubs.length; @@ -90,7 +103,10 @@ public class PublisherAll implements Publisher { // only get here if there were no other subscriptions // Dead Event must EXACTLY MATCH (no subclasses) if (!hasSubs) { - final Subscription[] deadSubscriptions = subscriptionManager.getSubscriptionsExact(DeadMessage.class); + stamp = lock.readLock(); + final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); + lock.unlockRead(stamp); + if (deadSubscriptions != null) { final DeadMessage deadMessage = new DeadMessage(message1); @@ -108,15 +124,14 @@ public class PublisherAll implements Publisher { } @Override - public void publish(final SubscriptionManager subscriptionManager, final Object message1, final Object message2) { + public void publish(final Object message1, final Object message2) { try { final Class messageClass1 = message1.getClass(); final Class messageClass2 = message2.getClass(); - final StampedLock lock = subscriptionManager.getLock(); + final StampedLock lock = this.lock; long stamp = lock.readLock(); - final Subscription[] subscriptions = subscriptionManager.getSubscriptionsExactAndSuper_NoLock(messageClass1, - messageClass2); // can return null + final Subscription[] subscriptions = subscriber.getExactAndSuper(messageClass1, messageClass2); // can return null lock.unlockRead(stamp); boolean hasSubs = false; @@ -132,14 +147,12 @@ public class PublisherAll implements Publisher { } // publish to var arg, only if not already an array AND we are all of the same type - if (subscriptionManager.canPublishVarArg() && !messageClass1.isArray() && !messageClass2.isArray()) { - - final VarArgUtils varArgUtils = subscriptionManager.getVarArgUtils(); + if (varArgPossibility.get() && !messageClass1.isArray() && !messageClass2.isArray()) { // vararg can ONLY work if all types are the same if (messageClass1 == messageClass2) { stamp = lock.readLock(); - final Subscription[] varArgSubs = varArgUtils.getVarArgSubscriptions(messageClass1); // can NOT return null + final Subscription[] varArgSubs = varArgUtils.getVarArgSubscriptions(messageClass1, subscriber); // can NOT return null lock.unlockRead(stamp); final int length = varArgSubs.length; @@ -160,8 +173,8 @@ public class PublisherAll implements Publisher { // now publish array based superClasses (but only if those ALSO accept vararg) stamp = lock.readLock(); - final Subscription[] varArgSuperSubs = varArgUtils.getVarArgSuperSubscriptions(messageClass1, - messageClass2); // CAN NOT RETURN NULL + final Subscription[] varArgSuperSubs = varArgUtils.getVarArgSuperSubscriptions(messageClass1, messageClass2, + subscriber); // CAN NOT RETURN NULL lock.unlockRead(stamp); @@ -188,7 +201,10 @@ public class PublisherAll implements Publisher { if (!hasSubs) { // Dead Event must EXACTLY MATCH (no subclasses) - final Subscription[] deadSubscriptions = subscriptionManager.getSubscriptionsExact(DeadMessage.class); // can return null + lock.unlockRead(stamp); + final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null + lock.unlockRead(stamp); + if (deadSubscriptions != null) { final DeadMessage deadMessage = new DeadMessage(message1, message2); @@ -206,17 +222,15 @@ public class PublisherAll implements Publisher { } @Override - public void publish(final SubscriptionManager subscriptionManager, final Object message1, final Object message2, - final Object message3) { + public void publish(final Object message1, final Object message2, final Object message3) { try { final Class messageClass1 = message1.getClass(); final Class messageClass2 = message2.getClass(); final Class messageClass3 = message3.getClass(); - final StampedLock lock = subscriptionManager.getLock(); + final StampedLock lock = this.lock; long stamp = lock.readLock(); - final Subscription[] subs = subscriptionManager.getSubscriptionsExactAndSuper_NoLock(messageClass1, messageClass2, - messageClass3); // can return null + final Subscription[] subs = subscriber.getExactAndSuper(messageClass1, messageClass2, messageClass3); // can return null lock.unlockRead(stamp); @@ -233,15 +247,13 @@ public class PublisherAll implements Publisher { } // publish to var arg, only if not already an array AND we are all of the same type - if (subscriptionManager.canPublishVarArg() && !messageClass1.isArray() && !messageClass2.isArray() && + if (varArgPossibility.get() && !messageClass1.isArray() && !messageClass2.isArray() && !messageClass3.isArray()) { - final VarArgUtils varArgUtils = subscriptionManager.getVarArgUtils(); - // vararg can ONLY work if all types are the same if (messageClass1 == messageClass2 && messageClass1 == messageClass3) { stamp = lock.readLock(); - final Subscription[] varArgSubs = varArgUtils.getVarArgSubscriptions(messageClass1); // can NOT return null + final Subscription[] varArgSubs = varArgUtils.getVarArgSubscriptions(messageClass1, subscriber); // can NOT return null lock.unlockRead(stamp); final int length = varArgSubs.length; @@ -264,8 +276,8 @@ public class PublisherAll implements Publisher { // now publish array based superClasses (but only if those ALSO accept vararg) stamp = lock.readLock(); - final Subscription[] varArgSuperSubs = varArgUtils.getVarArgSuperSubscriptions(messageClass1, messageClass2, - messageClass3); // CAN NOT RETURN NULL + final Subscription[] varArgSuperSubs = varArgUtils.getVarArgSuperSubscriptions(messageClass1, messageClass2, messageClass3, + subscriber); // CAN NOT RETURN NULL lock.unlockRead(stamp); @@ -293,7 +305,10 @@ public class PublisherAll implements Publisher { if (!hasSubs) { // Dead Event must EXACTLY MATCH (no subclasses) - final Subscription[] deadSubscriptions = subscriptionManager.getSubscriptionsExact(DeadMessage.class); // can return null + stamp = lock.readLock(); + final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null + lock.unlockRead(stamp); + if (deadSubscriptions != null) { final DeadMessage deadMessage = new DeadMessage(message1, message2, message3); @@ -311,7 +326,7 @@ public class PublisherAll implements Publisher { } @Override - public void publish(final SubscriptionManager subscriptionManager, final Object[] messages) { - publish(subscriptionManager, (Object) messages); + public void publish(final Object[] messages) { + publish((Object) messages); } } diff --git a/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypes_FirstArg.java b/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypes_FirstArg.java new file mode 100644 index 0000000..e0dd54d --- /dev/null +++ b/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypes_FirstArg.java @@ -0,0 +1,194 @@ +package dorkbox.util.messagebus.publication; + +import dorkbox.util.messagebus.common.DeadMessage; +import dorkbox.util.messagebus.common.adapter.StampedLock; +import dorkbox.util.messagebus.error.ErrorHandlingSupport; +import dorkbox.util.messagebus.error.PublicationError; +import dorkbox.util.messagebus.subscription.Publisher; +import dorkbox.util.messagebus.subscription.Subscriber; +import dorkbox.util.messagebus.subscription.Subscription; + +import java.util.Arrays; + +public class PublisherExactWithSuperTypes_FirstArg implements Publisher { + private final ErrorHandlingSupport errorHandler; + private final Subscriber subscriber; + private final StampedLock lock; + + public PublisherExactWithSuperTypes_FirstArg(final ErrorHandlingSupport errorHandler, final Subscriber subscriber, + final StampedLock lock) { + this.errorHandler = errorHandler; + this.subscriber = subscriber; + this.lock = lock; + } + + @Override + public void publish(final Object message1) { + try { + final Class messageClass = message1.getClass(); + + final StampedLock lock = this.lock; + long stamp = lock.readLock(); + final Subscription[] subscriptions = subscriber.getExactAndSuper(messageClass); // can return null + lock.unlockRead(stamp); + + // Run subscriptions + if (subscriptions != null) { + Class[] handledMessages; + Subscription sub; + for (int i = 0; i < subscriptions.length; i++) { + sub = subscriptions[i]; + + handledMessages = sub.getHandler().getHandledMessages(); + if (handledMessages.length == 1) { + sub.publish(message1); + } + } + } + else { + // Dead Event must EXACTLY MATCH (no subclasses) + stamp = lock.readLock(); + final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null + lock.unlockRead(stamp); + + if (deadSubscriptions != null) { + final DeadMessage deadMessage = new DeadMessage(message1); + + Subscription sub; + for (int i = 0; i < deadSubscriptions.length; i++) { + sub = deadSubscriptions[i]; + sub.publish(deadMessage); + } + } + } + } catch (Throwable e) { + errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.").setCause(e) + .setPublishedObject(message1)); + } + } + + @Override + public void publish(final Object message1, final Object message2) { + try { + final Class messageClass = message1.getClass(); + + final StampedLock lock = this.lock; + long stamp = lock.readLock(); + final Subscription[] subscriptions = subscriber.getExactAndSuper(messageClass, null); // can return null + lock.unlockRead(stamp); + + // Run subscriptions + if (subscriptions != null) { + Subscription sub; + for (int i = 0; i < subscriptions.length; i++) { + sub = subscriptions[i]; + + final Class[] handledMessages = sub.getHandler().getHandledMessages(); + + sub.publish(message1, message2); + } + } + else { + // Dead Event must EXACTLY MATCH (no subclasses) + stamp = lock.readLock(); + final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null + lock.unlockRead(stamp); + + if (deadSubscriptions != null) { + final DeadMessage deadMessage = new DeadMessage(message1, message2); + + Subscription sub; + for (int i = 0; i < deadSubscriptions.length; i++) { + sub = deadSubscriptions[i]; + sub.publish(deadMessage); + } + } + } + } catch (Throwable e) { + errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.").setCause(e) + .setPublishedObject(message1, message2)); + } + } + + @Override + public void publish(final Object message1, final Object message2, final Object message3) { + try { + final Class messageClass = message1.getClass(); + + final StampedLock lock = this.lock; + long stamp = lock.readLock(); + final Subscription[] subscriptions = subscriber.getExactAndSuper(messageClass); // can return null + lock.unlockRead(stamp); + + // Run subscriptions + if (subscriptions != null) { + Subscription sub; + for (int i = 0; i < subscriptions.length; i++) { + sub = subscriptions[i]; + sub.publish(message1, message2, message3); + } + } + else { + // Dead Event must EXACTLY MATCH (no subclasses) + stamp = lock.readLock(); + final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null + lock.unlockRead(stamp); + + if (deadSubscriptions != null) { + final DeadMessage deadMessage = new DeadMessage(message1, message2, message3); + + Subscription sub; + for (int i = 0; i < deadSubscriptions.length; i++) { + sub = deadSubscriptions[i]; + sub.publish(deadMessage); + } + } + } + } catch (Throwable e) { + errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.").setCause(e) + .setPublishedObject(message1, message2, message3)); + } + } + + @Override + public void publish(final Object[] messages) { + try { + final Object message1 = messages[0]; + final Class messageClass = message1.getClass(); + final Object[] newMessages = Arrays.copyOfRange(messages, 1, messages.length); + + final StampedLock lock = this.lock; + long stamp = lock.readLock(); + final Subscription[] subscriptions = subscriber.getExactAndSuper(messageClass); // can return null + lock.unlockRead(stamp); + + // Run subscriptions + if (subscriptions != null) { + Subscription sub; + for (int i = 0; i < subscriptions.length; i++) { + sub = subscriptions[i]; + sub.publish(message1, newMessages); + } + } + else { + // Dead Event must EXACTLY MATCH (no subclasses) + stamp = lock.readLock(); + final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null + lock.unlockRead(stamp); + + if (deadSubscriptions != null) { + final DeadMessage deadMessage = new DeadMessage(message1, newMessages); + + Subscription sub; + for (int i = 0; i < deadSubscriptions.length; i++) { + sub = deadSubscriptions[i]; + sub.publish(deadMessage); + } + } + } + } catch (Throwable e) { + errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.").setCause(e) + .setPublishedObject(messages)); + } + } +} diff --git a/src/dorkbox/util/messagebus/publication/PublisherExact.java b/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypes_MultiArg.java similarity index 68% rename from src/dorkbox/util/messagebus/publication/PublisherExact.java rename to src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypes_MultiArg.java index 40e9b8e..84267ac 100644 --- a/src/dorkbox/util/messagebus/publication/PublisherExact.java +++ b/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypes_MultiArg.java @@ -1,25 +1,34 @@ package dorkbox.util.messagebus.publication; import dorkbox.util.messagebus.common.DeadMessage; +import dorkbox.util.messagebus.common.adapter.StampedLock; import dorkbox.util.messagebus.error.ErrorHandlingSupport; import dorkbox.util.messagebus.error.PublicationError; import dorkbox.util.messagebus.subscription.Publisher; +import dorkbox.util.messagebus.subscription.Subscriber; import dorkbox.util.messagebus.subscription.Subscription; -import dorkbox.util.messagebus.subscription.SubscriptionManager; -public class PublisherExact implements Publisher { +public class PublisherExactWithSuperTypes_MultiArg implements Publisher { private final ErrorHandlingSupport errorHandler; + private final Subscriber subscriber; + private final StampedLock lock; - public PublisherExact(final ErrorHandlingSupport errorHandler) { + public PublisherExactWithSuperTypes_MultiArg(final ErrorHandlingSupport errorHandler, final Subscriber subscriber, + final StampedLock lock) { this.errorHandler = errorHandler; + this.subscriber = subscriber; + this.lock = lock; } @Override - public void publish(final SubscriptionManager subscriptionManager, final Object message1) { + public void publish(final Object message1) { try { final Class messageClass = message1.getClass(); - final Subscription[] subscriptions = subscriptionManager.getSubscriptionsExact(messageClass); // can return null + final StampedLock lock = this.lock; + long stamp = lock.readLock(); + final Subscription[] subscriptions = subscriber.getExactAndSuper(messageClass); // can return null + lock.unlockRead(stamp); // Run subscriptions if (subscriptions != null) { @@ -31,7 +40,10 @@ public class PublisherExact implements Publisher { } else { // Dead Event must EXACTLY MATCH (no subclasses) - final Subscription[] deadSubscriptions = subscriptionManager.getSubscriptionsExact(DeadMessage.class); // can return null + stamp = lock.readLock(); + final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null + lock.unlockRead(stamp); + if (deadSubscriptions != null) { final DeadMessage deadMessage = new DeadMessage(message1); @@ -49,12 +61,15 @@ public class PublisherExact implements Publisher { } @Override - public void publish(final SubscriptionManager subscriptionManager, final Object message1, final Object message2) { + public void publish(final Object message1, final Object message2) { try { final Class messageClass1 = message1.getClass(); final Class messageClass2 = message2.getClass(); - final Subscription[] subscriptions = subscriptionManager.getSubscriptionsExact(messageClass1, messageClass2); // can return null + final StampedLock lock = this.lock; + long stamp = lock.readLock(); + final Subscription[] subscriptions = subscriber.getExactAndSuper(messageClass1, messageClass2); // can return null + lock.unlockRead(stamp); // Run subscriptions if (subscriptions != null) { @@ -66,7 +81,10 @@ public class PublisherExact implements Publisher { } else { // Dead Event must EXACTLY MATCH (no subclasses) - final Subscription[] deadSubscriptions = subscriptionManager.getSubscriptionsExact(DeadMessage.class); // can return null + stamp = lock.readLock(); + final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null + lock.unlockRead(stamp); + if (deadSubscriptions != null) { final DeadMessage deadMessage = new DeadMessage(message1, message2); @@ -84,15 +102,17 @@ public class PublisherExact implements Publisher { } @Override - public void publish(final SubscriptionManager subscriptionManager, final Object message1, final Object message2, - final Object message3) { + public void publish(final Object message1, final Object message2, final Object message3) { try { final Class messageClass1 = message1.getClass(); final Class messageClass2 = message2.getClass(); final Class messageClass3 = message3.getClass(); - final Subscription[] subscriptions = subscriptionManager.getSubscriptionsExact(messageClass1, messageClass2, - messageClass3); // can return null + + final StampedLock lock = this.lock; + long stamp = lock.readLock(); + final Subscription[] subscriptions = subscriber.getExactAndSuper(messageClass1, messageClass2, messageClass3); // can return null + lock.unlockRead(stamp); // Run subscriptions if (subscriptions != null) { @@ -104,7 +124,10 @@ public class PublisherExact implements Publisher { } else { // Dead Event must EXACTLY MATCH (no subclasses) - final Subscription[] deadSubscriptions = subscriptionManager.getSubscriptionsExact(DeadMessage.class); // can return null + stamp = lock.readLock(); + final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null + lock.unlockRead(stamp); + if (deadSubscriptions != null) { final DeadMessage deadMessage = new DeadMessage(message1, message2, message3); @@ -122,7 +145,7 @@ public class PublisherExact implements Publisher { } @Override - public void publish(final SubscriptionManager subscriptionManager, final Object[] messages) { - publish(subscriptionManager, (Object) messages); + public void publish(final Object[] messages) { + publish((Object) messages); } } diff --git a/src/dorkbox/util/messagebus/publication/PublisherExact_FirstArg.java b/src/dorkbox/util/messagebus/publication/PublisherExact_FirstArg.java new file mode 100644 index 0000000..d2900fe --- /dev/null +++ b/src/dorkbox/util/messagebus/publication/PublisherExact_FirstArg.java @@ -0,0 +1,185 @@ +package dorkbox.util.messagebus.publication; + +import dorkbox.util.messagebus.common.DeadMessage; +import dorkbox.util.messagebus.common.adapter.StampedLock; +import dorkbox.util.messagebus.error.ErrorHandlingSupport; +import dorkbox.util.messagebus.error.PublicationError; +import dorkbox.util.messagebus.subscription.Publisher; +import dorkbox.util.messagebus.subscription.Subscriber; +import dorkbox.util.messagebus.subscription.Subscription; + +import java.util.Arrays; + +public class PublisherExact_FirstArg implements Publisher { + private final ErrorHandlingSupport errorHandler; + private final Subscriber subscriber; + private final StampedLock lock; + + public PublisherExact_FirstArg(final ErrorHandlingSupport errorHandler, final Subscriber subscriber, final StampedLock lock) { + this.errorHandler = errorHandler; + this.subscriber = subscriber; + this.lock = lock; + } + + @Override + public void publish(final Object message1) { + try { + final Class messageClass = message1.getClass(); + + final StampedLock lock = this.lock; + long stamp = lock.readLock(); + final Subscription[] subscriptions = subscriber.getExact(messageClass); // can return null + lock.unlockRead(stamp); + + // Run subscriptions + if (subscriptions != null) { + Subscription sub; + for (int i = 0; i < subscriptions.length; i++) { + sub = subscriptions[i]; + sub.publish(message1); + } + } + else { + // Dead Event must EXACTLY MATCH (no subclasses) + stamp = lock.readLock(); + final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null + lock.unlockRead(stamp); + + if (deadSubscriptions != null) { + final DeadMessage deadMessage = new DeadMessage(message1); + + Subscription sub; + for (int i = 0; i < deadSubscriptions.length; i++) { + sub = deadSubscriptions[i]; + sub.publish(deadMessage); + } + } + } + } catch (Throwable e) { + errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.").setCause(e) + .setPublishedObject(message1)); + } + } + + @Override + public void publish(final Object message1, final Object message2) { + try { + final Class messageClass = message1.getClass(); + + final StampedLock lock = this.lock; + long stamp = lock.readLock(); + final Subscription[] subscriptions = subscriber.getExact(messageClass); // can return null + lock.unlockRead(stamp); + + // Run subscriptions + if (subscriptions != null) { + Subscription sub; + for (int i = 0; i < subscriptions.length; i++) { + sub = subscriptions[i]; + sub.publish(message1, message2); + } + } + else { + // Dead Event must EXACTLY MATCH (no subclasses) + lock.unlockRead(stamp); + final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null + lock.unlockRead(stamp); + + if (deadSubscriptions != null) { + final DeadMessage deadMessage = new DeadMessage(message1, message2); + + Subscription sub; + for (int i = 0; i < deadSubscriptions.length; i++) { + sub = deadSubscriptions[i]; + sub.publish(deadMessage); + } + } + } + } catch (Throwable e) { + errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.").setCause(e) + .setPublishedObject(message1, message2)); + } + } + + @Override + public void publish(final Object message1, final Object message2, final Object message3) { + try { + final Class messageClass = message1.getClass(); + + final StampedLock lock = this.lock; + long stamp = lock.readLock(); + final Subscription[] subscriptions = subscriber.getExact(messageClass); // can return null + lock.unlockRead(stamp); + + // Run subscriptions + if (subscriptions != null) { + Subscription sub; + for (int i = 0; i < subscriptions.length; i++) { + sub = subscriptions[i]; + sub.publish(message1, message2, message3); + } + } + else { + // Dead Event must EXACTLY MATCH (no subclasses) + stamp = lock.readLock(); + final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null + lock.unlockRead(stamp); + + if (deadSubscriptions != null) { + final DeadMessage deadMessage = new DeadMessage(message1, message2, message3); + + Subscription sub; + for (int i = 0; i < deadSubscriptions.length; i++) { + sub = deadSubscriptions[i]; + sub.publish(deadMessage); + } + } + } + } catch (Throwable e) { + errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.").setCause(e) + .setPublishedObject(message1, message2, message3)); + } + } + + @Override + public void publish(final Object[] messages) { + try { + final Object message1 = messages[0]; + final Class messageClass = message1.getClass(); + final Object[] newMessages = Arrays.copyOfRange(messages, 1, messages.length); + + final StampedLock lock = this.lock; + long stamp = lock.readLock(); + final Subscription[] subscriptions = subscriber.getExact(messageClass); // can return null + lock.unlockRead(stamp); + + // Run subscriptions + if (subscriptions != null) { + Subscription sub; + for (int i = 0; i < subscriptions.length; i++) { + sub = subscriptions[i]; + sub.publish(message1, newMessages); + } + } + else { + // Dead Event must EXACTLY MATCH (no subclasses) + stamp = lock.readLock(); + final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null + lock.unlockRead(stamp); + + if (deadSubscriptions != null) { + final DeadMessage deadMessage = new DeadMessage(message1, newMessages); + + Subscription sub; + for (int i = 0; i < deadSubscriptions.length; i++) { + sub = deadSubscriptions[i]; + sub.publish(deadMessage); + } + } + } + } catch (Throwable e) { + errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.").setCause(e) + .setPublishedObject(messages)); + } + } +} diff --git a/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypes.java b/src/dorkbox/util/messagebus/publication/PublisherExact_MultiArg.java similarity index 70% rename from src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypes.java rename to src/dorkbox/util/messagebus/publication/PublisherExact_MultiArg.java index fedb964..5199ebd 100644 --- a/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypes.java +++ b/src/dorkbox/util/messagebus/publication/PublisherExact_MultiArg.java @@ -1,25 +1,33 @@ package dorkbox.util.messagebus.publication; import dorkbox.util.messagebus.common.DeadMessage; +import dorkbox.util.messagebus.common.adapter.StampedLock; import dorkbox.util.messagebus.error.ErrorHandlingSupport; import dorkbox.util.messagebus.error.PublicationError; import dorkbox.util.messagebus.subscription.Publisher; +import dorkbox.util.messagebus.subscription.Subscriber; import dorkbox.util.messagebus.subscription.Subscription; -import dorkbox.util.messagebus.subscription.SubscriptionManager; -public class PublisherExactWithSuperTypes implements Publisher { +public class PublisherExact_MultiArg implements Publisher { private final ErrorHandlingSupport errorHandler; + private final Subscriber subscriber; + private final StampedLock lock; - public PublisherExactWithSuperTypes(final ErrorHandlingSupport errorHandler) { + public PublisherExact_MultiArg(final ErrorHandlingSupport errorHandler, final Subscriber subscriber, final StampedLock lock) { this.errorHandler = errorHandler; + this.subscriber = subscriber; + this.lock = lock; } @Override - public void publish(final SubscriptionManager subscriptionManager, final Object message1) { + public void publish(final Object message1) { try { final Class messageClass = message1.getClass(); - final Subscription[] subscriptions = subscriptionManager.getSubscriptionsExactAndSuper(messageClass); // can return null + final StampedLock lock = this.lock; + long stamp = lock.readLock(); + final Subscription[] subscriptions = subscriber.getExact(messageClass); // can return null + lock.unlockRead(stamp); // Run subscriptions if (subscriptions != null) { @@ -31,7 +39,10 @@ public class PublisherExactWithSuperTypes implements Publisher { } else { // Dead Event must EXACTLY MATCH (no subclasses) - final Subscription[] deadSubscriptions = subscriptionManager.getSubscriptionsExact(DeadMessage.class); // can return null + stamp = lock.readLock(); + final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null + lock.unlockRead(stamp); + if (deadSubscriptions != null) { final DeadMessage deadMessage = new DeadMessage(message1); @@ -49,13 +60,15 @@ public class PublisherExactWithSuperTypes implements Publisher { } @Override - public void publish(final SubscriptionManager subscriptionManager, final Object message1, final Object message2) { + public void publish(final Object message1, final Object message2) { try { final Class messageClass1 = message1.getClass(); final Class messageClass2 = message2.getClass(); - final Subscription[] subscriptions = subscriptionManager.getSubscriptionsExactAndSuper(messageClass1, - messageClass2); // can return null + final StampedLock lock = this.lock; + long stamp = lock.readLock(); + final Subscription[] subscriptions = subscriber.getExact(messageClass1, messageClass2); // can return null + lock.unlockRead(stamp); // Run subscriptions if (subscriptions != null) { @@ -67,7 +80,10 @@ public class PublisherExactWithSuperTypes implements Publisher { } else { // Dead Event must EXACTLY MATCH (no subclasses) - final Subscription[] deadSubscriptions = subscriptionManager.getSubscriptionsExact(DeadMessage.class); // can return null + stamp = lock.readLock(); + final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null + lock.unlockRead(stamp); + if (deadSubscriptions != null) { final DeadMessage deadMessage = new DeadMessage(message1, message2); @@ -85,15 +101,16 @@ public class PublisherExactWithSuperTypes implements Publisher { } @Override - public void publish(final SubscriptionManager subscriptionManager, final Object message1, final Object message2, - final Object message3) { + public void publish(final Object message1, final Object message2, final Object message3) { try { final Class messageClass1 = message1.getClass(); final Class messageClass2 = message2.getClass(); final Class messageClass3 = message3.getClass(); - final Subscription[] subscriptions = subscriptionManager.getSubscriptionsExactAndSuper(messageClass1, messageClass2, - messageClass3); // can return null + final StampedLock lock = this.lock; + long stamp = lock.readLock(); + final Subscription[] subscriptions = subscriber.getExact(messageClass1, messageClass2, messageClass3); // can return null + lock.unlockRead(stamp); // Run subscriptions if (subscriptions != null) { @@ -105,7 +122,10 @@ public class PublisherExactWithSuperTypes implements Publisher { } else { // Dead Event must EXACTLY MATCH (no subclasses) - final Subscription[] deadSubscriptions = subscriptionManager.getSubscriptionsExact(DeadMessage.class); // can return null + stamp = lock.readLock(); + final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null + lock.unlockRead(stamp); + if (deadSubscriptions != null) { final DeadMessage deadMessage = new DeadMessage(message1, message2, message3); @@ -123,7 +143,7 @@ public class PublisherExactWithSuperTypes implements Publisher { } @Override - public void publish(final SubscriptionManager subscriptionManager, final Object[] messages) { - publish(subscriptionManager, (Object) messages); + public void publish(final Object[] messages) { + publish((Object) messages); } } diff --git a/src/dorkbox/util/messagebus/subscription/FirstArgSubscriber.java b/src/dorkbox/util/messagebus/subscription/FirstArgSubscriber.java new file mode 100644 index 0000000..db1304d --- /dev/null +++ b/src/dorkbox/util/messagebus/subscription/FirstArgSubscriber.java @@ -0,0 +1,185 @@ +package dorkbox.util.messagebus.subscription; + +import dorkbox.util.messagebus.common.MessageHandler; +import dorkbox.util.messagebus.common.adapter.JavaVersionAdapter; +import dorkbox.util.messagebus.error.ErrorHandlingSupport; +import dorkbox.util.messagebus.utils.ClassUtils; +import dorkbox.util.messagebus.utils.VarArgUtils; + +import java.util.ArrayList; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + + +/** + * Permits subscriptions that only use the first parameters as the signature. The publisher MUST provide the correct additional parameters, + * and they must be of the correct type, otherwise it will throw an error. + */ +public class FirstArgSubscriber implements Subscriber { + + private final ErrorHandlingSupport errorHandler; + + // all subscriptions per message type. We perpetually KEEP the types, as this lowers the amount of locking required + // this is the primary list for dispatching a specific message + // write access is synchronized and happens only when a listener of a specific class is registered the first time + + // the following are used ONLY for FIRST ARG subscription/publication. (subscriptionsPerMessageMulti isn't used in this case) + private final Map, ArrayList> subscriptionsPerMessageSingle_1; + private final Map, ArrayList> subscriptionsPerMessageSingle_2; + private final Map, ArrayList> subscriptionsPerMessageSingle_3; + + + public FirstArgSubscriber(final ErrorHandlingSupport errorHandler, final ClassUtils classUtils) { + this.errorHandler = errorHandler; + + // the following are used ONLY for FIRST ARG subscription/publication. (subscriptionsPerMessageMulti isn't used in this case) + this.subscriptionsPerMessageSingle_1 = JavaVersionAdapter.get.concurrentMap(32, LOAD_FACTOR, 1); + this.subscriptionsPerMessageSingle_2 = JavaVersionAdapter.get.concurrentMap(32, LOAD_FACTOR, 1); + this.subscriptionsPerMessageSingle_3 = JavaVersionAdapter.get.concurrentMap(32, LOAD_FACTOR, 1); + } + + // inside a write lock + // add this subscription to each of the handled types + // to activate this sub for publication + private void registerFirst(final Subscription subscription, final Class listenerClass, + final Map, ArrayList> subs_1, final Map, ArrayList> subs_2, + final Map, ArrayList> subs_3) { + + final MessageHandler handler = subscription.getHandler(); + final Class[] messageHandlerTypes = handler.getHandledMessages(); + final int size = messageHandlerTypes.length; + + Class type0 = messageHandlerTypes[0]; + + switch (size) { + case 1: { + ArrayList subs = subs_1.get(type0); + if (subs == null) { + subs = new ArrayList(); + + subs_1.put(type0, subs); + } + + subs.add(subscription); + return; + } + case 2: { + ArrayList subs = subs_2.get(type0); + if (subs == null) { + subs = new ArrayList(); + + subs_2.put(type0, subs); + } + + subs.add(subscription); + return; + } + case 3: { + ArrayList subs = subs_3.get(type0); + if (subs == null) { + subs = new ArrayList(); + + subs_3.put(type0, subs); + } + + subs.add(subscription); + return; + } + case 0: + default: { + errorHandler.handleError("Error while trying to subscribe class", listenerClass); + return; + } + } + } + + @Override + public void register(final Class listenerClass, final int handlersSize, final Subscription[] subsPerListener) { + + final Map, ArrayList> sub_1 = this.subscriptionsPerMessageSingle_1; + final Map, ArrayList> sub_2 = this.subscriptionsPerMessageSingle_2; + final Map, ArrayList> sub_3 = this.subscriptionsPerMessageSingle_3; + + + Subscription subscription; + + for (int i = 0; i < handlersSize; i++) { + subscription = subsPerListener[i]; + + // activate this subscription for publication + // now add this subscription to each of the handled types + + // only register based on the FIRST parameter + registerFirst(subscription, listenerClass, sub_1, sub_2, sub_3); + } + } + + @Override + public AtomicBoolean getVarArgPossibility() { + return null; + } + + @Override + public VarArgUtils getVarArgUtils() { + return null; + } + + @Override + public void shutdown() { + this.subscriptionsPerMessageSingle_1.clear(); + this.subscriptionsPerMessageSingle_2.clear(); + this.subscriptionsPerMessageSingle_3.clear(); + } + + @Override + public void clearConcurrentCollections() { + + } + + @Override + public ArrayList getExactAsArray(final Class messageClass) { + return subscriptionsPerMessageSingle_1.get(messageClass); + } + + @Override + public ArrayList getExactAsArray(final Class messageClass1, final Class messageClass2) { + return subscriptionsPerMessageSingle_2.get(messageClass1); + } + + @Override + public ArrayList getExactAsArray(final Class messageClass1, final Class messageClass2, + final Class messageClass3) { + return subscriptionsPerMessageSingle_3.get(messageClass1); + } + + @Override + public Subscription[] getExact(final Class deadMessageClass) { + return new Subscription[0]; + } + + @Override + public Subscription[] getExact(final Class messageClass1, final Class messageClass2) { + return new Subscription[0]; + } + + @Override + public Subscription[] getExact(final Class messageClass1, final Class messageClass2, final Class messageClass3) { + return new Subscription[0]; + } + + + @Override + public Subscription[] getExactAndSuper(final Class messageClass) { + return new Subscription[0]; + } + + @Override + public Subscription[] getExactAndSuper(final Class messageClass1, final Class messageClass2) { + return new Subscription[0]; + } + + @Override + public Subscription[] getExactAndSuper(final Class messageClass1, final Class messageClass2, final Class messageClass3) { + return new Subscription[0]; + } +} diff --git a/src/dorkbox/util/messagebus/subscription/MultiArgSubscriber.java b/src/dorkbox/util/messagebus/subscription/MultiArgSubscriber.java new file mode 100644 index 0000000..b772310 --- /dev/null +++ b/src/dorkbox/util/messagebus/subscription/MultiArgSubscriber.java @@ -0,0 +1,312 @@ +package dorkbox.util.messagebus.subscription; + +import dorkbox.util.messagebus.common.HashMapTree; +import dorkbox.util.messagebus.common.MessageHandler; +import dorkbox.util.messagebus.common.adapter.JavaVersionAdapter; +import dorkbox.util.messagebus.error.ErrorHandlingSupport; +import dorkbox.util.messagebus.utils.ClassUtils; +import dorkbox.util.messagebus.utils.SubscriptionUtils; +import dorkbox.util.messagebus.utils.VarArgUtils; + +import java.util.ArrayList; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Permits subscriptions with a varying length of parameters as the signature, which must be match by the publisher for it to be accepted + */ +public class MultiArgSubscriber implements Subscriber { + + private final ErrorHandlingSupport errorHandler; + + private final SubscriptionUtils subUtils; + private final VarArgUtils varArgUtils; + + // all subscriptions per message type. We perpetually KEEP the types, as this lowers the amount of locking required + // this is the primary list for dispatching a specific message + // write access is synchronized and happens only when a listener of a specific class is registered the first time + private final Map, ArrayList> subscriptionsPerMessageSingle; + private final HashMapTree, ArrayList> subscriptionsPerMessageMulti; + + // shortcut publication if we know there is no possibility of varArg (ie: a method that has an array as arguments) + private final AtomicBoolean varArgPossibility = new AtomicBoolean(false); + + public MultiArgSubscriber(final ErrorHandlingSupport errorHandler, final ClassUtils classUtils) { + this.errorHandler = errorHandler; + + this.subscriptionsPerMessageSingle = JavaVersionAdapter.get.concurrentMap(32, LOAD_FACTOR, 1); + this.subscriptionsPerMessageMulti = new HashMapTree, ArrayList>(4, LOAD_FACTOR); + + this.subUtils = new SubscriptionUtils(classUtils, Subscriber.LOAD_FACTOR); + + // var arg subscriptions keep track of which subscriptions can handle varArgs. SUB/UNSUB dumps it, so it is recreated dynamically. + // it's a hit on SUB/UNSUB, but improves performance of handlers + this.varArgUtils = new VarArgUtils(classUtils, Subscriber.LOAD_FACTOR); + } + + @Override + public AtomicBoolean getVarArgPossibility() { + return varArgPossibility; + } + + @Override + public VarArgUtils getVarArgUtils() { + return varArgUtils; + } + + @Override + public void clearConcurrentCollections() { + this.subUtils.clear(); + this.varArgUtils.clear(); + } + + // inside a write lock + // add this subscription to each of the handled types + // to activate this sub for publication + private void registerMulti(final Subscription subscription, final Class listenerClass, + final Map, ArrayList> subsPerMessageSingle, + final HashMapTree, ArrayList> subsPerMessageMulti, + final AtomicBoolean varArgPossibility) { + + final MessageHandler handler = subscription.getHandler(); + final Class[] messageHandlerTypes = handler.getHandledMessages(); + final int size = messageHandlerTypes.length; + + Class type0 = messageHandlerTypes[0]; + + switch (size) { + case 0: { + errorHandler.handleError("Error while trying to subscribe class", listenerClass); + return; + } + case 1: { + ArrayList subs = subsPerMessageSingle.get(type0); + if (subs == null) { + subs = new ArrayList(); + + // is this handler able to accept var args? + if (handler.getVarArgClass() != null) { + varArgPossibility.lazySet(true); + } + + subsPerMessageSingle.put(type0, subs); + } + + subs.add(subscription); + return; + } + case 2: { + ArrayList subs = subsPerMessageMulti.get(type0, messageHandlerTypes[1]); + if (subs == null) { + subs = new ArrayList(); + + subsPerMessageMulti.put(subs, type0, messageHandlerTypes[1]); + } + + subs.add(subscription); + return; + } + case 3: { + ArrayList subs = subsPerMessageMulti.get(type0, messageHandlerTypes[1], messageHandlerTypes[2]); + if (subs == null) { + subs = new ArrayList(); + + subsPerMessageMulti.put(subs, type0, messageHandlerTypes[1], messageHandlerTypes[2]); + } + + subs.add(subscription); + return; + } + default: { + ArrayList subs = subsPerMessageMulti.get(messageHandlerTypes); + if (subs == null) { + subs = new ArrayList(); + + subsPerMessageMulti.put(subs, messageHandlerTypes); + } + + subs.add(subscription); + } + } + } + + @Override + public void register(final Class listenerClass, final int handlersSize, final Subscription[] subsPerListener) { + + final Map, ArrayList> subsPerMessageSingle = this.subscriptionsPerMessageSingle; + final HashMapTree, ArrayList> subsPerMessageMulti = this.subscriptionsPerMessageMulti; + final AtomicBoolean varArgPossibility = this.varArgPossibility; + + Subscription subscription; + + for (int i = 0; i < handlersSize; i++) { + subscription = subsPerListener[i]; + + // activate this subscription for publication + // now add this subscription to each of the handled types + registerMulti(subscription, listenerClass, subsPerMessageSingle, subsPerMessageMulti, varArgPossibility); + } + } + + @Override + public void shutdown() { + this.subscriptionsPerMessageSingle.clear(); + this.subscriptionsPerMessageMulti.clear(); + + clearConcurrentCollections(); + } + + @Override + public ArrayList getExactAsArray(final Class messageClass) { + return subscriptionsPerMessageSingle.get(messageClass); + } + + @Override + public ArrayList getExactAsArray(final Class messageClass1, final Class messageClass2) { + return subscriptionsPerMessageMulti.get(messageClass1, messageClass2); + } + + @Override + public ArrayList getExactAsArray(final Class messageClass1, final Class messageClass2, + final Class messageClass3) { + return subscriptionsPerMessageMulti.get(messageClass1, messageClass2, messageClass3); + } + + // can return null + @Override + public Subscription[] getExactAndSuper(final Class messageClass) { + ArrayList collection = getExactAsArray(messageClass); // can return null + + // now publish superClasses + final ArrayList superSubscriptions = this.subUtils.getSuperSubscriptions(messageClass, this); // NOT return null + + if (collection != null) { + collection = new ArrayList(collection); + + if (!superSubscriptions.isEmpty()) { + collection.addAll(superSubscriptions); + } + } + else if (!superSubscriptions.isEmpty()) { + collection = superSubscriptions; + } + + if (collection != null) { + final Subscription[] subscriptions = new Subscription[collection.size()]; + collection.toArray(subscriptions); + return subscriptions; + } + else { + return null; + } + } + + // can return null + @Override + public Subscription[] getExact(final Class messageClass) { + final ArrayList collection = getExactAsArray(messageClass); + + if (collection != null) { + final Subscription[] subscriptions = new Subscription[collection.size()]; + collection.toArray(subscriptions); + + return subscriptions; + } + + return null; + } + + // can return null + @Override + public Subscription[] getExact(final Class messageClass1, final Class messageClass2) { + final ArrayList collection = getExactAsArray(messageClass1, messageClass2); + + if (collection != null) { + final Subscription[] subscriptions = new Subscription[collection.size()]; + collection.toArray(subscriptions); + + return subscriptions; + } + + return null; + } + + // can return null + @Override + public Subscription[] getExact(final Class messageClass1, final Class messageClass2, final Class messageClass3) { + + final ArrayList collection = getExactAsArray(messageClass1, messageClass2, messageClass3); + + if (collection != null) { + final Subscription[] subscriptions = new Subscription[collection.size()]; + collection.toArray(subscriptions); + + return subscriptions; + } + + return null; + } + + + + // can return null + @Override + public Subscription[] getExactAndSuper(final Class messageClass1, final Class messageClass2) { + ArrayList collection = getExactAsArray(messageClass1, messageClass2); // can return null + + // now publish superClasses + final ArrayList superSubs = this.subUtils.getSuperSubscriptions(messageClass1, messageClass2, + this); // NOT return null + + if (collection != null) { + collection = new ArrayList(collection); + + if (!superSubs.isEmpty()) { + collection.addAll(superSubs); + } + } + else if (!superSubs.isEmpty()) { + collection = superSubs; + } + + if (collection != null) { + final Subscription[] subscriptions = new Subscription[collection.size()]; + collection.toArray(subscriptions); + return subscriptions; + } + else { + return null; + } + } + + // can return null + @Override + public Subscription[] getExactAndSuper(final Class messageClass1, final Class messageClass2, final Class messageClass3) { + + ArrayList collection = getExactAsArray(messageClass1, messageClass2, messageClass3); // can return null + + // now publish superClasses + final ArrayList superSubs = this.subUtils.getSuperSubscriptions(messageClass1, messageClass2, messageClass3, + this); // NOT return null + + if (collection != null) { + collection = new ArrayList(collection); + + if (!superSubs.isEmpty()) { + collection.addAll(superSubs); + } + } + else if (!superSubs.isEmpty()) { + collection = superSubs; + } + + if (collection != null) { + final Subscription[] subscriptions = new Subscription[collection.size()]; + collection.toArray(subscriptions); + return subscriptions; + } + else { + return null; + } + } +} diff --git a/src/dorkbox/util/messagebus/subscription/Publisher.java b/src/dorkbox/util/messagebus/subscription/Publisher.java index 548722e..7953086 100644 --- a/src/dorkbox/util/messagebus/subscription/Publisher.java +++ b/src/dorkbox/util/messagebus/subscription/Publisher.java @@ -1,11 +1,11 @@ package dorkbox.util.messagebus.subscription; public interface Publisher { - void publish(SubscriptionManager subscriptionManager, Object message1); + void publish(Object message1); - void publish(SubscriptionManager subscriptionManager, Object message1, Object message2); + void publish(Object message1, Object message2); - void publish(SubscriptionManager subscriptionManager, Object message1, Object message2, Object message3); + void publish(Object message1, Object message2, Object message3); - void publish(SubscriptionManager subscriptionManager, Object[] messages); + void publish(Object[] messages); } diff --git a/src/dorkbox/util/messagebus/subscription/Subscriber.java b/src/dorkbox/util/messagebus/subscription/Subscriber.java new file mode 100644 index 0000000..c7d1e1d --- /dev/null +++ b/src/dorkbox/util/messagebus/subscription/Subscriber.java @@ -0,0 +1,40 @@ +package dorkbox.util.messagebus.subscription; + +import dorkbox.util.messagebus.utils.VarArgUtils; + +import java.util.ArrayList; +import java.util.concurrent.atomic.AtomicBoolean; + +public interface Subscriber { + float LOAD_FACTOR = 0.8F; + + AtomicBoolean getVarArgPossibility(); + + VarArgUtils getVarArgUtils(); + + void register(Class listenerClass, int handlersSize, Subscription[] subsPerListener); + + void shutdown(); + + void clearConcurrentCollections(); + + ArrayList getExactAsArray(Class superClass); + + ArrayList getExactAsArray(Class superClass1, Class superClass2); + + ArrayList getExactAsArray(Class superClass1, Class superClass2, Class superClass3); + + + Subscription[] getExact(Class deadMessageClass); + + Subscription[] getExact(Class messageClass1, Class messageClass2); + + Subscription[] getExact(Class messageClass1, Class messageClass2, Class messageClass3); + + + Subscription[] getExactAndSuper(Class messageClass); + + Subscription[] getExactAndSuper(Class messageClass1, Class messageClass2); + + Subscription[] getExactAndSuper(Class messageClass1, Class messageClass2, Class messageClass3); +} diff --git a/src/dorkbox/util/messagebus/subscription/Subscription.java b/src/dorkbox/util/messagebus/subscription/Subscription.java index 72318b9..689a1e7 100644 --- a/src/dorkbox/util/messagebus/subscription/Subscription.java +++ b/src/dorkbox/util/messagebus/subscription/Subscription.java @@ -1,19 +1,14 @@ package dorkbox.util.messagebus.subscription; import com.esotericsoftware.reflectasm.MethodAccess; -import dorkbox.util.messagebus.common.HashMapTree; import dorkbox.util.messagebus.common.MessageHandler; import dorkbox.util.messagebus.common.StrongConcurrentSetV8; import dorkbox.util.messagebus.dispatch.IHandlerInvocation; import dorkbox.util.messagebus.dispatch.ReflectiveHandlerInvocation; import dorkbox.util.messagebus.dispatch.SynchronizedHandlerInvocation; -import dorkbox.util.messagebus.error.ErrorHandlingSupport; -import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; /** @@ -161,107 +156,4 @@ public final class Subscription { Subscription other = (Subscription) obj; return this.ID == other.ID; } - - // inside a write lock - // add this subscription to each of the handled types - // to activate this sub for publication - public void registerMulti(final ErrorHandlingSupport errorHandler, final Class listenerClass, final Map, ArrayList> subsPerMessageSingle, - final HashMapTree, ArrayList> subsPerMessageMulti, - final AtomicBoolean varArgPossibility) { - - final Class[] messageHandlerTypes = handlerMetadata.getHandledMessages(); - final int size = messageHandlerTypes.length; - - Class type0 = messageHandlerTypes[0]; - - switch (size) { - case 0: { - errorHandler.handleError("Error while trying to subscribe class", listenerClass); - return; - } - case 1: { - ArrayList subs = subsPerMessageSingle.get(type0); - if (subs == null) { - subs = new ArrayList(); - - // is this handler able to accept var args? - if (handlerMetadata.getVarArgClass() != null) { - varArgPossibility.lazySet(true); - } - - subsPerMessageSingle.put(type0, subs); - } - - subs.add(this); - return; - } - case 2: { - ArrayList subs = subsPerMessageMulti.get(type0, messageHandlerTypes[1]); - if (subs == null) { - subs = new ArrayList(); - - subsPerMessageMulti.put(subs, type0, messageHandlerTypes[1]); - } - - subs.add(this); - return; - } - case 3: { - ArrayList subs = subsPerMessageMulti.get(type0, messageHandlerTypes[1], messageHandlerTypes[2]); - if (subs == null) { - subs = new ArrayList(); - - subsPerMessageMulti.put(subs, type0, messageHandlerTypes[1], messageHandlerTypes[2]); - } - - subs.add(this); - return; - } - default: { - ArrayList subs = subsPerMessageMulti.get(messageHandlerTypes); - if (subs == null) { - subs = new ArrayList(); - - subsPerMessageMulti.put(subs, messageHandlerTypes); - } - - subs.add(this); - } - } - } - - // inside a write lock - // add this subscription to each of the handled types - // to activate this sub for publication - public void registerFirst(final ErrorHandlingSupport errorHandler, final Class listenerClass, final Map, ArrayList> subsPerMessageSingle, - final AtomicBoolean varArgPossibility) { - - final Class[] messageHandlerTypes = handlerMetadata.getHandledMessages(); - final int size = messageHandlerTypes.length; - - Class type0 = messageHandlerTypes[0]; - - switch (size) { - case 0: { - errorHandler.handleError("Error while trying to subscribe class", listenerClass); - return; - } - default: { - ArrayList subs = subsPerMessageSingle.get(type0); - if (subs == null) { - subs = new ArrayList(); - - // is this handler able to accept var args? - if (handlerMetadata.getVarArgClass() != null) { - varArgPossibility.lazySet(true); - } - - subsPerMessageSingle.put(type0, subs); - } - - subs.add(this); - return; - } - } - } } diff --git a/src/dorkbox/util/messagebus/subscription/SubscriptionManager.java b/src/dorkbox/util/messagebus/subscription/SubscriptionManager.java index da3d997..e088a66 100644 --- a/src/dorkbox/util/messagebus/subscription/SubscriptionManager.java +++ b/src/dorkbox/util/messagebus/subscription/SubscriptionManager.java @@ -1,127 +1,54 @@ package dorkbox.util.messagebus.subscription; -import dorkbox.util.messagebus.common.HashMapTree; import dorkbox.util.messagebus.common.MessageHandler; import dorkbox.util.messagebus.common.adapter.JavaVersionAdapter; import dorkbox.util.messagebus.common.adapter.StampedLock; -import dorkbox.util.messagebus.error.ErrorHandlingSupport; -import dorkbox.util.messagebus.utils.ClassUtils; -import dorkbox.util.messagebus.utils.SubscriptionUtils; -import dorkbox.util.messagebus.utils.VarArgUtils; -import java.util.ArrayList; import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; /** * The subscription managers responsibility is to consistently handle and synchronize the message listener subscription process. * It provides fast lookup of existing subscriptions when another instance of an already known * listener is subscribed and takes care of creating new set of subscriptions for any unknown class that defines * message handlers. - *

- *

- * Subscribe/Unsubscribe, while it is possible for them to be 100% concurrent (in relation to listeners per subscription), - * getting an accurate reflection of the number of subscriptions, or guaranteeing a "HAPPENS-BEFORE" relationship really - * complicates this, so it has been modified for subscribe/unsubscibe to be mutually exclusive. - *

- * Given these restrictions and complexity, it is much easier to create a MPSC blocking queue, and have a single thread - * manage sub/unsub. * * @author dorkbox, llc * Date: 2/2/15 */ public final class SubscriptionManager { - private static final float LOAD_FACTOR = 0.8F; - - // remember already processed classes that do not contain any message handlers private final Map, Boolean> nonListeners; - // shortcut publication if we know there is no possibility of varArg (ie: a method that has an array as arguments) - private final AtomicBoolean varArgPossibility = new AtomicBoolean(false); - - // all subscriptions per message type. We perpetually KEEP the types, as this lowers the amount of locking required - // this is the primary list for dispatching a specific message - // write access is synchronized and happens only when a listener of a specific class is registered the first time - private final Map, ArrayList> subscriptionsPerMessageSingle; - private final HashMapTree, ArrayList> subscriptionsPerMessageMulti; - // all subscriptions per messageHandler type // this map provides fast access for subscribing and unsubscribing // write access is synchronized and happens very infrequently // once a collection of subscriptions is stored it does not change private final Map, Subscription[]> subscriptionsPerListener; - private final ClassUtils classUtils; - private final SubscriptionUtils subUtils; - private final VarArgUtils varArgUtils; - private final StampedLock lock = new StampedLock(); + private final StampedLock lock; private final int numberOfThreads; private final Subscriber subscriber; - public SubscriptionManager(int numberOfThreads, final ErrorHandlingSupport errorHandler, boolean isMultiMode) { + public SubscriptionManager(final int numberOfThreads, final Subscriber subscriber, final StampedLock lock) { this.numberOfThreads = numberOfThreads; + this.subscriber = subscriber; + this.lock = lock; + // modified ONLY during SUB/UNSUB - { - this.nonListeners = JavaVersionAdapter.get.concurrentMap(4, LOAD_FACTOR, numberOfThreads); + this.nonListeners = JavaVersionAdapter.get.concurrentMap(4, Subscriber.LOAD_FACTOR, numberOfThreads); - this.subscriptionsPerMessageSingle = JavaVersionAdapter.get.concurrentMap(32, LOAD_FACTOR, 1); - this.subscriptionsPerMessageMulti = new HashMapTree, ArrayList>(4, LOAD_FACTOR); - - // only used during SUB/UNSUB - this.subscriptionsPerListener = JavaVersionAdapter.get.concurrentMap(32, LOAD_FACTOR, 1); - } - - classUtils = new ClassUtils(LOAD_FACTOR); - - this.subUtils = new SubscriptionUtils(classUtils, this.subscriptionsPerMessageSingle, this.subscriptionsPerMessageMulti, - LOAD_FACTOR, numberOfThreads); - - // var arg subscriptions keep track of which subscriptions can handle varArgs. SUB/UNSUB dumps it, so it is recreated dynamically. - // it's a hit on SUB/UNSUB, but improves performance of handlers - this.varArgUtils = new VarArgUtils(classUtils, this.subscriptionsPerMessageSingle, LOAD_FACTOR, numberOfThreads); - - if (isMultiMode) { - subscriber = new Subscriber() { - @Override - public void register(final Class listenerClass, final Subscription subscription, - final Map, ArrayList> subsPerMessageSingle, - final HashMapTree, ArrayList> subsPerMessageMulti, - final AtomicBoolean varArgPossibility) { - - // now add this subscription to each of the handled types - subscription.registerMulti(errorHandler, listenerClass, subsPerMessageSingle, subsPerMessageMulti, varArgPossibility); - } - }; - } - else { - subscriber = new Subscriber() { - @Override - public void register(final Class listenerClass, final Subscription subscription, - final Map, ArrayList> subsPerMessageSingle, - final HashMapTree, ArrayList> subsPerMessageMulti, - final AtomicBoolean varArgPossibility) { - - // only register based on the FIRST parameter - subscription.registerFirst(errorHandler, listenerClass, subsPerMessageSingle, varArgPossibility); - } - }; - } + // only used during SUB/UNSUB, in a rw lock + this.subscriptionsPerListener = JavaVersionAdapter.get.concurrentMap(32, Subscriber.LOAD_FACTOR, 1); } public void shutdown() { this.nonListeners.clear(); - this.subscriptionsPerMessageSingle.clear(); - this.subscriptionsPerMessageMulti.clear(); + subscriber.shutdown(); this.subscriptionsPerListener.clear(); - - clearConcurrentCollections(); - - this.classUtils.clear(); } public void subscribe(final Object listener) { @@ -137,7 +64,7 @@ public final class SubscriptionManager { } // these are concurrent collections - clearConcurrentCollections(); + subscriber.clearConcurrentCollections(); Subscription[] subscriptions = getListenerSubs(listenerClass); @@ -155,9 +82,6 @@ public final class SubscriptionManager { return; } - final Map, ArrayList> subsPerMessageSingle = this.subscriptionsPerMessageSingle; - final HashMapTree, ArrayList> subsPerMessageMulti = this.subscriptionsPerMessageMulti; - final Subscription[] subsPerListener = new Subscription[handlersSize]; @@ -169,14 +93,13 @@ public final class SubscriptionManager { messageHandler = messageHandlers[i]; // create the subscription - subscription = new Subscription(messageHandler, LOAD_FACTOR, numberOfThreads); + subscription = new Subscription(messageHandler, Subscriber.LOAD_FACTOR, numberOfThreads); subscription.subscribe(listener); subsPerListener[i] = subscription; // activates this sub for sub/unsub } final Map, Subscription[]> subsPerListenerMap = this.subscriptionsPerListener; - final AtomicBoolean varArgPossibility = this.varArgPossibility; // now write lock for the least expensive part. This is a deferred "double checked lock", but is necessary because // of the huge number of reads compared to writes. @@ -188,12 +111,7 @@ public final class SubscriptionManager { // it was still null, so we actually have to create the rest of the subs if (subscriptions == null) { - for (int i = 0; i < handlersSize; i++) { - subscription = subsPerListener[i]; - - // activate this subscription for publication - subscriber.register(listenerClass, subscription, subsPerMessageSingle, subsPerMessageMulti, varArgPossibility); - } + subscriber.register(listenerClass, handlersSize, subsPerListener); subsPerListenerMap.put(listenerClass, subsPerListener); lock.unlockWrite(stamp); @@ -227,7 +145,7 @@ public final class SubscriptionManager { } // these are concurrent collections - clearConcurrentCollections(); + subscriber.clearConcurrentCollections(); final Subscription[] subscriptions = getListenerSubs(listenerClass); if (subscriptions != null) { @@ -240,11 +158,6 @@ public final class SubscriptionManager { } } - private void clearConcurrentCollections() { - this.subUtils.clear(); - this.varArgUtils.clear(); - } - private Subscription[] getListenerSubs(final Class listenerClass) { final StampedLock lock = this.lock; @@ -255,221 +168,4 @@ public final class SubscriptionManager { lock.unlockRead(stamp); return subscriptions; } - - // can return null - public Subscription[] getSubscriptionsExact(final Class messageClass) { - final StampedLock lock = this.lock; - final long stamp = lock.readLock(); - - final ArrayList collection = this.subscriptionsPerMessageSingle.get(messageClass); - - if (collection != null) { - final Subscription[] subscriptions = new Subscription[collection.size()]; - collection.toArray(subscriptions); - - lock.unlockRead(stamp); - return subscriptions; - } - - lock.unlockRead(stamp); - return null; - } - - // can return null - public Subscription[] getSubscriptionsExact(final Class messageClass1, final Class messageClass2) { - final StampedLock lock = this.lock; - final long stamp = lock.readLock(); - - final ArrayList collection = this.subscriptionsPerMessageMulti.get(messageClass1, messageClass2); - - if (collection != null) { - final Subscription[] subscriptions = new Subscription[collection.size()]; - collection.toArray(subscriptions); - - lock.unlockRead(stamp); - return subscriptions; - } - - lock.unlockRead(stamp); - return null; - } - - // can return null - public Subscription[] getSubscriptionsExact(final Class messageClass1, final Class messageClass2, final Class messageClass3) { - final StampedLock lock = this.lock; - final long stamp = lock.readLock(); - - final ArrayList collection = this.subscriptionsPerMessageMulti.get(messageClass1, messageClass2, messageClass3); - - if (collection != null) { - final Subscription[] subscriptions = new Subscription[collection.size()]; - collection.toArray(subscriptions); - - lock.unlockRead(stamp); - return subscriptions; - } - - lock.unlockRead(stamp); - return null; - } - - // can return null - // public because it is also used by unit tests - public Subscription[] getSubscriptionsExactAndSuper(final Class messageClass) { - final StampedLock lock = this.lock; - final long stamp = lock.readLock(); - - final Subscription[] subscriptions = getSubscriptionsExactAndSuper_NoLock(messageClass); - - lock.unlockRead(stamp); - return subscriptions; - } - - // can return null - public Subscription[] getSubscriptionsExactAndSuper(final Class messageClass1, final Class messageClass2) { - final StampedLock lock = this.lock; - final long stamp = lock.readLock(); - - final Subscription[] subscriptions = getSubscriptionsExactAndSuper_NoLock(messageClass1, messageClass2); - - lock.unlockRead(stamp); - return subscriptions; - } - - // can return null - public Subscription[] getSubscriptionsExactAndSuper(final Class messageClass1, final Class messageClass2, - final Class messageClass3) { - final StampedLock lock = this.lock; - final long stamp = lock.readLock(); - - final Subscription[] subscriptions = getSubscriptionsExactAndSuper_NoLock(messageClass1, messageClass2, messageClass3); - - lock.unlockRead(stamp); - return subscriptions; - } - - // can return null - public Subscription[] getSubscriptionsExactAndSuper_NoLock(final Class messageClass) { - ArrayList collection = this.subscriptionsPerMessageSingle.get(messageClass); // can return null - - // now publish superClasses - final ArrayList superSubscriptions = this.subUtils.getSuperSubscriptions(messageClass); // NOT return null - - if (collection != null) { - collection = new ArrayList(collection); - - if (!superSubscriptions.isEmpty()) { - collection.addAll(superSubscriptions); - } - } - else if (!superSubscriptions.isEmpty()) { - collection = superSubscriptions; - } - - if (collection != null) { - final Subscription[] subscriptions = new Subscription[collection.size()]; - collection.toArray(subscriptions); - return subscriptions; - } - else { - return null; - } - } - - // can return null - public Subscription[] getSubscriptionsExactAndSuper_NoLock(final Class messageClass1, final Class messageClass2) { - - ArrayList collection = this.subscriptionsPerMessageMulti.get(messageClass1, messageClass2); // can return null - - // now publish superClasses - final ArrayList superSubs = this.subUtils.getSuperSubscriptions(messageClass1, messageClass2); // NOT return null - - if (collection != null) { - collection = new ArrayList(collection); - - if (!superSubs.isEmpty()) { - collection.addAll(superSubs); - } - } - else if (!superSubs.isEmpty()) { - collection = superSubs; - } - - if (collection != null) { - final Subscription[] subscriptions = new Subscription[collection.size()]; - collection.toArray(subscriptions); - return subscriptions; - } - else { - return null; - } - } - - // can return null - public Subscription[] getSubscriptionsExactAndSuper_NoLock(final Class messageClass1, final Class messageClass2, - final Class messageClass3) { - - ArrayList collection = this.subscriptionsPerMessageMulti.get(messageClass1, messageClass2, - messageClass3); // can return null - - // now publish superClasses - final ArrayList superSubs = this.subUtils.getSuperSubscriptions(messageClass1, messageClass2, - messageClass3); // NOT return null - - if (collection != null) { - collection = new ArrayList(collection); - - if (!superSubs.isEmpty()) { - collection.addAll(superSubs); - } - } - else if (!superSubs.isEmpty()) { - collection = superSubs; - } - - if (collection != null) { - final Subscription[] subscriptions = new Subscription[collection.size()]; - collection.toArray(subscriptions); - return subscriptions; - } - else { - return null; - } - } - - public boolean canPublishVarArg() { - return varArgPossibility.get(); - } - - public VarArgUtils getVarArgUtils() { - return varArgUtils; - } - - public StampedLock getLock() { - return lock; - } - - interface Subscriber { - void register(final Class listenerClass, Subscription subscription, - Map, ArrayList> subsPerMessageSingle, - HashMapTree, ArrayList> subsPerMessageMulti, AtomicBoolean varArgPossibility); - } - - -// public static final Comparator SubscriptionByPriorityDesc = new Comparator() { -// @Override -// public int compare(Subscription o1, Subscription o2) { -//// int byPriority = ((Integer)o2.getPriority()).compareTo(o1.getPriority()); -//// return byPriority == 0 ? o2.id.compareTo(o1.id) : byPriority; -// if (o2.ID > o1.ID) { -// return 1; -// } else if (o2.ID < o1.ID) { -// return -1; -// } else { -// return 0; -// } -// } -// }; - - } diff --git a/src/dorkbox/util/messagebus/utils/SubscriptionUtils.java b/src/dorkbox/util/messagebus/utils/SubscriptionUtils.java index 5b36bff..399d398 100644 --- a/src/dorkbox/util/messagebus/utils/SubscriptionUtils.java +++ b/src/dorkbox/util/messagebus/utils/SubscriptionUtils.java @@ -1,7 +1,8 @@ package dorkbox.util.messagebus.utils; import dorkbox.util.messagebus.common.HashMapTree; -import dorkbox.util.messagebus.common.adapter.ConcurrentHashMapV8; +import dorkbox.util.messagebus.common.adapter.JavaVersionAdapter; +import dorkbox.util.messagebus.subscription.Subscriber; import dorkbox.util.messagebus.subscription.Subscription; import java.util.ArrayList; @@ -16,24 +17,14 @@ public final class SubscriptionUtils { private final Map, ArrayList> superClassSubscriptions; private final HashMapTree, ArrayList> superClassSubscriptionsMulti; - private final Map, ArrayList> subscriptionsPerMessageSingle; - - private final HashMapTree, ArrayList> subscriptionsPerMessageMulti; - - - public SubscriptionUtils(final ClassUtils superClass, Map, ArrayList> subscriptionsPerMessageSingle, - final HashMapTree, ArrayList> subscriptionsPerMessageMulti, final float loadFactor, - final int stripeSize) { + public SubscriptionUtils(final ClassUtils superClass, final float loadFactor) { this.superClass = superClass; - this.subscriptionsPerMessageSingle = subscriptionsPerMessageSingle; - this.subscriptionsPerMessageMulti = subscriptionsPerMessageMulti; - // superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically. // it's a hit on SUB/UNSUB, but improves performance of handlers - this.superClassSubscriptions = new ConcurrentHashMapV8, ArrayList>(); + this.superClassSubscriptions = JavaVersionAdapter.get.concurrentMap(8, loadFactor, 1); this.superClassSubscriptionsMulti = new HashMapTree, ArrayList>(4, loadFactor); } @@ -45,14 +36,14 @@ public final class SubscriptionUtils { /** * Returns an array COPY of the super subscriptions for the specified type. - *

+ *

* This ALSO checks to see if the superClass accepts subtypes. - *

+ *

* protected by read lock by caller * * @return CAN NOT RETURN NULL */ - public ArrayList getSuperSubscriptions(final Class clazz) { + public ArrayList getSuperSubscriptions(final Class clazz, final Subscriber subscriber) { // whenever our subscriptions change, this map is cleared. final Map, ArrayList> local = this.superClassSubscriptions; @@ -60,7 +51,6 @@ public final class SubscriptionUtils { if (subs == null) { // types was not empty, so collect subscriptions for each type and collate them - final Map, ArrayList> local2 = this.subscriptionsPerMessageSingle; // save the subscriptions final Class[] superClasses = this.superClass.getSuperClasses(clazz); // never returns null, cached response @@ -75,7 +65,7 @@ public final class SubscriptionUtils { for (int i = 0; i < length; i++) { superClass = superClasses[i]; - superSubs = local2.get(superClass); + superSubs = subscriber.getExactAsArray(superClass); if (superSubs != null) { superSubLength = superSubs.size(); @@ -98,22 +88,21 @@ public final class SubscriptionUtils { /** * Returns an array COPY of the super subscriptions for the specified type. - *

+ *

* This ALSO checks to see if the superClass accepts subtypes. - *

+ *

* protected by read lock by caller * * @return CAN NOT RETURN NULL */ - public ArrayList getSuperSubscriptions(final Class clazz1, final Class clazz2) { + public ArrayList getSuperSubscriptions(final Class clazz1, final Class clazz2, final Subscriber subscriber) { // whenever our subscriptions change, this map is cleared. - final HashMapTree, ArrayList> local = this.superClassSubscriptionsMulti; + final HashMapTree, ArrayList> cached = this.superClassSubscriptionsMulti; - ArrayList subs = local.get(clazz1, clazz2); + ArrayList subs = cached.get(clazz1, clazz2); if (subs == null) { // types was not empty, so collect subscriptions for each type and collate them - final HashMapTree, ArrayList> local2 = this.subscriptionsPerMessageMulti; // save the subscriptions final Class[] superClasses1 = this.superClass.getSuperClasses(clazz1); // never returns null, cached response @@ -145,7 +134,7 @@ public final class SubscriptionUtils { continue; } - superSubs = local2.get(superClass1, superClass2); + superSubs = subscriber.getExactAsArray(superClass1, superClass2); if (superSubs != null) { for (int k = 0; k < superSubs.size(); k++) { sub = superSubs.get(k); @@ -158,7 +147,7 @@ public final class SubscriptionUtils { } } subs.trimToSize(); - local.put(subs, clazz1, clazz2); + cached.put(subs, clazz1, clazz2); } return subs; @@ -166,14 +155,15 @@ public final class SubscriptionUtils { /** * Returns an array COPY of the super subscriptions for the specified type. - *

+ *

* This ALSO checks to see if the superClass accepts subtypes. - *

+ *

* protected by read lock by caller * * @return CAN NOT RETURN NULL */ - public ArrayList getSuperSubscriptions(final Class clazz1, final Class clazz2, final Class clazz3) { + public ArrayList getSuperSubscriptions(final Class clazz1, final Class clazz2, final Class clazz3, + final Subscriber subscriber) { // whenever our subscriptions change, this map is cleared. final HashMapTree, ArrayList> local = this.superClassSubscriptionsMulti; @@ -181,7 +171,6 @@ public final class SubscriptionUtils { if (subs == null) { // types was not empty, so collect subscriptions for each type and collate them - final HashMapTree, ArrayList> local2 = this.subscriptionsPerMessageMulti; // save the subscriptions final Class[] superClasses1 = this.superClass.getSuperClasses(clazz1); // never returns null, cached response @@ -224,7 +213,7 @@ public final class SubscriptionUtils { continue; } - superSubs = local2.get(superClass1, superClass2); + superSubs = subscriber.getExactAsArray(superClass1, superClass2, superClass3); if (superSubs != null) { for (int m = 0; m < superSubs.size(); m++) { sub = superSubs.get(m); diff --git a/src/dorkbox/util/messagebus/utils/VarArgUtils.java b/src/dorkbox/util/messagebus/utils/VarArgUtils.java index cb6feb4..2851e07 100644 --- a/src/dorkbox/util/messagebus/utils/VarArgUtils.java +++ b/src/dorkbox/util/messagebus/utils/VarArgUtils.java @@ -1,8 +1,9 @@ package dorkbox.util.messagebus.utils; -import dorkbox.util.messagebus.common.adapter.JavaVersionAdapter; import dorkbox.util.messagebus.common.HashMapTree; import dorkbox.util.messagebus.common.MessageHandler; +import dorkbox.util.messagebus.common.adapter.JavaVersionAdapter; +import dorkbox.util.messagebus.subscription.Subscriber; import dorkbox.util.messagebus.subscription.Subscription; import java.util.ArrayList; @@ -16,19 +17,16 @@ public final class VarArgUtils { private final HashMapTree, ArrayList> varArgSuperSubscriptionsMulti; private final ClassUtils superClassUtils; - private final Map, ArrayList> subscriptionsPerMessageSingle; - public VarArgUtils(final ClassUtils superClassUtils, final Map, ArrayList> subscriptionsPerMessageSingle, - final float loadFactor, final int stripeSize) { + public VarArgUtils(final ClassUtils superClassUtils, final float loadFactor) { this.superClassUtils = superClassUtils; - this.subscriptionsPerMessageSingle = subscriptionsPerMessageSingle; - this.varArgSubscriptionsSingle = JavaVersionAdapter.get.concurrentMap(16, loadFactor, stripeSize); + this.varArgSubscriptionsSingle = JavaVersionAdapter.get.concurrentMap(16, loadFactor, 1); this.varArgSubscriptionsMulti = new HashMapTree, ArrayList>(4, loadFactor); - this.varArgSuperSubscriptionsSingle = JavaVersionAdapter.get.concurrentMap(16, loadFactor, stripeSize); + this.varArgSuperSubscriptionsSingle = JavaVersionAdapter.get.concurrentMap(16, loadFactor, 1); this.varArgSuperSubscriptionsMulti = new HashMapTree, ArrayList>(4, loadFactor); } @@ -45,7 +43,7 @@ public final class VarArgUtils { // CAN NOT RETURN NULL // check to see if the messageType can convert/publish to the "array" version, without the hit to JNI // and then, returns the array'd version subscriptions - public Subscription[] getVarArgSubscriptions(final Class messageClass) { + public Subscription[] getVarArgSubscriptions(final Class messageClass, final Subscriber subscriber) { // whenever our subscriptions change, this map is cleared. final Map, ArrayList> local = this.varArgSubscriptionsSingle; @@ -55,7 +53,7 @@ public final class VarArgUtils { // this gets (and caches) our array type. This is never cleared. final Class arrayVersion = this.superClassUtils.getArrayClass(messageClass); - ArrayList subs = this.subscriptionsPerMessageSingle.get(arrayVersion); + final ArrayList subs = subscriber.getExactAsArray(arrayVersion); if (subs != null) { final int length = subs.size(); varArgSubs = new ArrayList(length); @@ -70,7 +68,8 @@ public final class VarArgUtils { } local.put(messageClass, varArgSubs); - } else { + } + else { varArgSubs = new ArrayList(0); } } @@ -85,16 +84,16 @@ public final class VarArgUtils { // CAN NOT RETURN NULL // check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI // and then, returns the array'd version superclass subscriptions - public Subscription[] getVarArgSuperSubscriptions(final Class messageClass) { - final ArrayList subs = getVarArgSuperSubscriptions_List(messageClass); + public Subscription[] getVarArgSuperSubscriptions(final Class messageClass, final Subscriber subscriber) { + final ArrayList subs = getVarArgSuperSubscriptions_List(messageClass, subscriber); - final Subscription[] subscriptions = new Subscription[subs.size()]; - subs.toArray(subscriptions); - return subscriptions; + final Subscription[] returnedSubscriptions = new Subscription[subs.size()]; + subs.toArray(returnedSubscriptions); + return returnedSubscriptions; } // CAN NOT RETURN NULL - private ArrayList getVarArgSuperSubscriptions_List(final Class messageClass) { + private ArrayList getVarArgSuperSubscriptions_List(final Class messageClass, final Subscriber subscriber) { // whenever our subscriptions change, this map is cleared. final Map, ArrayList> local = this.varArgSuperSubscriptionsSingle; @@ -122,7 +121,7 @@ public final class VarArgUtils { for (int i = 0; i < typesLength; i++) { type = types[i]; - subs = this.subscriptionsPerMessageSingle.get(type); + subs = subscriber.getExactAsArray(type); if (subs != null) { length = subs.size(); @@ -136,7 +135,9 @@ public final class VarArgUtils { varArgSuperSubs.add(sub); } } - + } + else { + varArgSuperSubs = new ArrayList(0); } } @@ -150,7 +151,7 @@ public final class VarArgUtils { // CAN NOT RETURN NULL // check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI // and then, returns the array'd version superclass subscriptions - public Subscription[] getVarArgSuperSubscriptions(final Class messageClass1, final Class messageClass2) { + public Subscription[] getVarArgSuperSubscriptions(final Class messageClass1, final Class messageClass2, final Subscriber subscriber) { // whenever our subscriptions change, this map is cleared. final HashMapTree, ArrayList> local = this.varArgSuperSubscriptionsMulti; @@ -159,8 +160,8 @@ public final class VarArgUtils { if (subs == null) { // the message class types are not the same, so look for a common superClass varArg subscription. // this is to publish to object[] (or any class[]) handler that is common among all superTypes of the messages - final ArrayList varargSuperSubscriptions1 = getVarArgSuperSubscriptions_List(messageClass1); - final ArrayList varargSuperSubscriptions2 = getVarArgSuperSubscriptions_List(messageClass2); + final ArrayList varargSuperSubscriptions1 = getVarArgSuperSubscriptions_List(messageClass1, subscriber); + final ArrayList varargSuperSubscriptions2 = getVarArgSuperSubscriptions_List(messageClass2, subscriber); subs = ClassUtils.findCommon(varargSuperSubscriptions1, varargSuperSubscriptions2); @@ -168,9 +169,9 @@ public final class VarArgUtils { local.put(subs, messageClass1, messageClass2); } - final Subscription[] subscriptions = new Subscription[subs.size()]; - subs.toArray(subscriptions); - return subscriptions; + final Subscription[] returnedSubscriptions = new Subscription[subs.size()]; + subs.toArray(returnedSubscriptions); + return returnedSubscriptions; } @@ -178,7 +179,7 @@ public final class VarArgUtils { // check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI // and then, returns the array'd version superclass subscriptions public Subscription[] getVarArgSuperSubscriptions(final Class messageClass1, final Class messageClass2, - final Class messageClass3) { + final Class messageClass3, final Subscriber subscriber) { // whenever our subscriptions change, this map is cleared. final HashMapTree, ArrayList> local = this.varArgSuperSubscriptionsMulti; @@ -187,9 +188,9 @@ public final class VarArgUtils { if (subs == null) { // the message class types are not the same, so look for a common superClass varArg subscription. // this is to publish to object[] (or any class[]) handler that is common among all superTypes of the messages - final ArrayList varargSuperSubscriptions1 = getVarArgSuperSubscriptions_List(messageClass1); - final ArrayList varargSuperSubscriptions2 = getVarArgSuperSubscriptions_List(messageClass2); - final ArrayList varargSuperSubscriptions3 = getVarArgSuperSubscriptions_List(messageClass3); + final ArrayList varargSuperSubscriptions1 = getVarArgSuperSubscriptions_List(messageClass1, subscriber); + final ArrayList varargSuperSubscriptions2 = getVarArgSuperSubscriptions_List(messageClass2, subscriber); + final ArrayList varargSuperSubscriptions3 = getVarArgSuperSubscriptions_List(messageClass3, subscriber); subs = ClassUtils.findCommon(varargSuperSubscriptions1, varargSuperSubscriptions2); subs = ClassUtils.findCommon(subs, varargSuperSubscriptions3); @@ -198,10 +199,8 @@ public final class VarArgUtils { local.put(subs, messageClass1, messageClass2, messageClass3); } - final Subscription[] subscriptions = new Subscription[subs.size()]; - subs.toArray(subscriptions); - return subscriptions; + final Subscription[] returnedSubscriptions = new Subscription[subs.size()]; + subs.toArray(returnedSubscriptions); + return returnedSubscriptions; } - - } diff --git a/test/dorkbox/util/messagebus/MultiMessageTest.java b/test/dorkbox/util/messagebus/MultiMessageTest.java index efc0037..76c8663 100644 --- a/test/dorkbox/util/messagebus/MultiMessageTest.java +++ b/test/dorkbox/util/messagebus/MultiMessageTest.java @@ -18,11 +18,11 @@ public class MultiMessageTest extends MessageBusTest { private static AtomicInteger count = new AtomicInteger(0); @Test - public void testMultiMessageSending(){ + public void testMultiMessageSending() { IMessageBus bus = new MessageBus(); bus.start(); - Listener listener1 = new Listener(); + MultiListener listener1 = new MultiListener(); bus.subscribe(listener1); bus.unsubscribe(listener1); @@ -68,8 +68,60 @@ public class MultiMessageTest extends MessageBusTest { bus.shutdown(); } + @Test + public void testFirstArgMultiMessageSending() { + IMessageBus bus = new MessageBus(IMessageBus.PublishMode.ExactWithSuperTypes, IMessageBus.SubscribeMode.FirstArg, + Runtime.getRuntime().availableProcessors() / 2); + bus.start(); + + FirstListener listener = new FirstListener(); + bus.subscribe(listener); + bus.unsubscribe(listener); + + bus.publish("s"); + bus.publish("s", "s"); + bus.publish("s", "s", "s"); + bus.publish(1, "s"); + bus.publish(1, 2, "s"); + bus.publish(new Integer[] {1, 2, 3, 4, 5, 6}); + + assertEquals(0, count.get()); + + bus.subscribe(listener); + + bus.publish("s"); // 4 + bus.publish("s", "s"); // 3 + bus.publish("s", "s", "s"); // 3 + bus.publish(1, "s"); // 1 + bus.publish(1, 2, "s"); // 2 + bus.publish(new Integer[] {1, 2, 3, 4, 5, 6}); // 2 + + assertEquals(15, count.get()); + count.set(0); + + + bus.publishAsync("s"); + bus.publishAsync("s", "s"); + bus.publishAsync("s", "s", "s"); + bus.publish(1, "s"); + bus.publishAsync(1, 2, "s"); + bus.publishAsync(new Integer[] {1, 2, 3, 4, 5, 6}); + + while (bus.hasPendingMessages()) { + try { + Thread.sleep(ConcurrentUnits); + } catch (InterruptedException e) { + } + } + + assertEquals(13, count.get()); + + + bus.shutdown(); + } + @SuppressWarnings("unused") - public static class Listener { + public static class MultiListener { @Handler public void handleSync(Object o) { count.getAndIncrement(); @@ -100,7 +152,7 @@ public class MultiMessageTest extends MessageBusTest { System.err.println("match Integer, Integer, String"); } - @Handler(acceptVarargs=true) + @Handler(acceptVarargs = true) public void handleSync(String... o) { count.getAndIncrement(); System.err.println("match String[]"); @@ -112,10 +164,59 @@ public class MultiMessageTest extends MessageBusTest { System.err.println("match Integer[]"); } - @Handler(acceptVarargs=true) + @Handler(acceptVarargs = true) public void handleSync(Object... o) { count.getAndIncrement(); System.err.println("match Object[]"); } } + public static class FirstListener { + @Handler + public void handleSync(Object o) { + count.getAndIncrement(); + System.err.println("match Object"); + } + + @Handler + public void handleSync(String o1) { + count.getAndIncrement(); + System.err.println("match String"); + } + + @Handler + public void handleSync(String o1, String o2) { + count.getAndIncrement(); + System.err.println("match String, String"); + } + +// @Handler +// public void handleSync(String o1, String o2, String o3) { +// count.getAndIncrement(); +// System.err.println("match String, String, String"); +// } +// +// @Handler +// public void handleSync(Integer o1, Integer o2, String o3) { +// count.getAndIncrement(); +// System.err.println("match Integer, Integer, String"); +// } +// +// @Handler(acceptVarargs = true) +// public void handleSync(String... o) { +// count.getAndIncrement(); +// System.err.println("match String[]"); +// } +// +// @Handler +// public void handleSync(Integer... o) { +// count.getAndIncrement(); +// System.err.println("match Integer[]"); +// } +// +// @Handler(acceptVarargs = true) +// public void handleSync(Object... o) { +// count.getAndIncrement(); +// System.err.println("match Object[]"); +// } + } } diff --git a/test/dorkbox/util/messagebus/SubscriptionManagerTest.java b/test/dorkbox/util/messagebus/SubscriptionManagerTest.java index 5dee2e8..ebb2f12 100644 --- a/test/dorkbox/util/messagebus/SubscriptionManagerTest.java +++ b/test/dorkbox/util/messagebus/SubscriptionManagerTest.java @@ -1,10 +1,15 @@ package dorkbox.util.messagebus; import dorkbox.util.messagebus.common.*; +import dorkbox.util.messagebus.common.adapter.StampedLock; import dorkbox.util.messagebus.error.DefaultErrorHandler; +import dorkbox.util.messagebus.error.ErrorHandlingSupport; import dorkbox.util.messagebus.listeners.*; import dorkbox.util.messagebus.messages.*; +import dorkbox.util.messagebus.subscription.MultiArgSubscriber; +import dorkbox.util.messagebus.subscription.Subscriber; import dorkbox.util.messagebus.subscription.SubscriptionManager; +import dorkbox.util.messagebus.utils.ClassUtils; import org.junit.Test; /** @@ -19,111 +24,128 @@ public class SubscriptionManagerTest extends AssertSupport { private static final int InstancesPerListener = 5000; - @Test public void testIMessageListener() { + @Test + public void testIMessageListener() { ListenerFactory listeners = listeners(IMessageListener.DefaultListener.class, IMessageListener.DisabledListener.class, IMessageListener.NoSubtypesListener.class); SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners).listener(IMessageListener.DefaultListener.class) - .handles(IMessage.class, AbstractMessage.class, IMultipartMessage.class, StandardMessage.class, MessageTypes.class) - .listener(IMessageListener.NoSubtypesListener.class).handles(IMessage.class); + .handles(IMessage.class, AbstractMessage.class, + IMultipartMessage.class, + StandardMessage.class, + MessageTypes.class).listener( + IMessageListener.NoSubtypesListener.class).handles(IMessage.class); runTestWith(listeners, expectedSubscriptions); } - @Test public void testAbstractMessageListener() { + @Test + public void testAbstractMessageListener() { ListenerFactory listeners = listeners(AbstractMessageListener.DefaultListener.class, AbstractMessageListener.DisabledListener.class, AbstractMessageListener.NoSubtypesListener.class); - SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners) - .listener(AbstractMessageListener.NoSubtypesListener.class).handles(AbstractMessage.class) - .listener(AbstractMessageListener.DefaultListener.class).handles(StandardMessage.class, AbstractMessage.class); + SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners).listener( + AbstractMessageListener.NoSubtypesListener.class).handles(AbstractMessage.class).listener( + AbstractMessageListener.DefaultListener.class).handles(StandardMessage.class, AbstractMessage.class); runTestWith(listeners, expectedSubscriptions); } - @Test public void testMessagesListener() { + @Test + public void testMessagesListener() { ListenerFactory listeners = listeners(MessagesListener.DefaultListener.class, MessagesListener.DisabledListener.class, MessagesListener.NoSubtypesListener.class); - SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners) - .listener(MessagesListener.NoSubtypesListener.class).handles(MessageTypes.class) - .listener(MessagesListener.DefaultListener.class).handles(MessageTypes.class); + SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners).listener( + MessagesListener.NoSubtypesListener.class).handles(MessageTypes.class).listener( + MessagesListener.DefaultListener.class).handles(MessageTypes.class); runTestWith(listeners, expectedSubscriptions); } - @Test public void testMultipartMessageListener() { + @Test + public void testMultipartMessageListener() { ListenerFactory listeners = listeners(MultipartMessageListener.DefaultListener.class, MultipartMessageListener.DisabledListener.class, MultipartMessageListener.NoSubtypesListener.class); - SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners) - .listener(MultipartMessageListener.NoSubtypesListener.class).handles(MultipartMessage.class) - .listener(MultipartMessageListener.DefaultListener.class).handles(MultipartMessage.class); + SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners).listener( + MultipartMessageListener.NoSubtypesListener.class).handles(MultipartMessage.class).listener( + MultipartMessageListener.DefaultListener.class).handles(MultipartMessage.class); runTestWith(listeners, expectedSubscriptions); } - @Test public void testIMultipartMessageListener() { + @Test + public void testIMultipartMessageListener() { ListenerFactory listeners = listeners(IMultipartMessageListener.DefaultListener.class, IMultipartMessageListener.DisabledListener.class, IMultipartMessageListener.NoSubtypesListener.class); - SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners) - .listener(IMultipartMessageListener.NoSubtypesListener.class).handles(IMultipartMessage.class) - .listener(IMultipartMessageListener.DefaultListener.class).handles(MultipartMessage.class, IMultipartMessage.class); + SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners).listener( + IMultipartMessageListener.NoSubtypesListener.class).handles(IMultipartMessage.class).listener( + IMultipartMessageListener.DefaultListener.class).handles(MultipartMessage.class, IMultipartMessage.class); runTestWith(listeners, expectedSubscriptions); } - @Test public void testStandardMessageListener() { + @Test + public void testStandardMessageListener() { ListenerFactory listeners = listeners(StandardMessageListener.DefaultListener.class, StandardMessageListener.DisabledListener.class, StandardMessageListener.NoSubtypesListener.class); - SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners) - .listener(StandardMessageListener.NoSubtypesListener.class).handles(StandardMessage.class) - .listener(StandardMessageListener.DefaultListener.class).handles(StandardMessage.class); + SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners).listener( + StandardMessageListener.NoSubtypesListener.class).handles(StandardMessage.class).listener( + StandardMessageListener.DefaultListener.class).handles(StandardMessage.class); runTestWith(listeners, expectedSubscriptions); } - @Test public void testICountableListener() { + @Test + public void testICountableListener() { ListenerFactory listeners = listeners(ICountableListener.DefaultListener.class, ICountableListener.DisabledListener.class, ICountableListener.NoSubtypesListener.class); - SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners) - .listener(ICountableListener.DefaultListener.class).handles(ICountable.class) - .listener(ICountableListener.DefaultListener.class) - .handles(MultipartMessage.class, IMultipartMessage.class, ICountable.class, StandardMessage.class); + SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners).listener( + ICountableListener.DefaultListener.class).handles(ICountable.class).listener( + ICountableListener.DefaultListener.class).handles(MultipartMessage.class, IMultipartMessage.class, ICountable.class, + StandardMessage.class); runTestWith(listeners, expectedSubscriptions); } - @Test public void testMultipleMessageListeners() { + @Test + public void testMultipleMessageListeners() { ListenerFactory listeners = listeners(ICountableListener.DefaultListener.class, ICountableListener.DisabledListener.class, IMultipartMessageListener.DefaultListener.class, IMultipartMessageListener.DisabledListener.class, MessagesListener.DefaultListener.class, MessagesListener.DisabledListener.class); - SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners) - .listener(ICountableListener.DefaultListener.class) - .handles(MultipartMessage.class, IMultipartMessage.class, ICountable.class, StandardMessage.class) - .listener(IMultipartMessageListener.DefaultListener.class).handles(MultipartMessage.class, IMultipartMessage.class) - .listener(MessagesListener.DefaultListener.class).handles(MessageTypes.class); + SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners).listener( + ICountableListener.DefaultListener.class).handles(MultipartMessage.class, IMultipartMessage.class, ICountable.class, + StandardMessage.class).listener( + IMultipartMessageListener.DefaultListener.class).handles(MultipartMessage.class, IMultipartMessage.class).listener( + MessagesListener.DefaultListener.class).handles(MessageTypes.class); runTestWith(listeners, expectedSubscriptions); } - @Test public void testOverloadedMessageHandlers() { + @Test + public void testOverloadedMessageHandlers() { ListenerFactory listeners = listeners(Overloading.ListenerBase.class, Overloading.ListenerSub.class); - SubscriptionManager subscriptionManager = new SubscriptionManager(1, new DefaultErrorHandler(), true); + final ErrorHandlingSupport errorHandler = new DefaultErrorHandler(); + final StampedLock lock = new StampedLock(); + final ClassUtils classUtils = new ClassUtils(Subscriber.LOAD_FACTOR); + final Subscriber subscriber = new MultiArgSubscriber(errorHandler, classUtils); + + SubscriptionManager subscriptionManager = new SubscriptionManager(1, subscriber, lock); ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), 1); - SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners).listener(Overloading.ListenerBase.class) - .handles(Overloading.TestMessageA.class, Overloading.TestMessageA.class).listener(Overloading.ListenerSub.class) - .handles(Overloading.TestMessageA.class, Overloading.TestMessageA.class, Overloading.TestMessageB.class); + SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners).listener(Overloading.ListenerBase.class).handles( + Overloading.TestMessageA.class, Overloading.TestMessageA.class).listener(Overloading.ListenerSub.class).handles( + Overloading.TestMessageA.class, Overloading.TestMessageA.class, Overloading.TestMessageB.class); runTestWith(listeners, expectedSubscriptions); } @@ -137,17 +159,22 @@ public class SubscriptionManagerTest extends AssertSupport { } private void runTestWith(final ListenerFactory listeners, final SubscriptionValidator validator) { - final SubscriptionManager subscriptionManager = new SubscriptionManager(1, new DefaultErrorHandler(), true); + final ErrorHandlingSupport errorHandler = new DefaultErrorHandler(); + final StampedLock lock = new StampedLock(); + final ClassUtils classUtils = new ClassUtils(Subscriber.LOAD_FACTOR); + final Subscriber subscriber = new MultiArgSubscriber(errorHandler, classUtils); + + final SubscriptionManager subscriptionManager = new SubscriptionManager(1, subscriber, lock); ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), 1); - validator.validate(subscriptionManager); + validator.validate(subscriber); ConcurrentExecutor.runConcurrent(TestUtil.unsubscriber(subscriptionManager, listeners), 1); listeners.clear(); - validator.validate(subscriptionManager); + validator.validate(subscriber); } } diff --git a/test/dorkbox/util/messagebus/common/SubscriptionValidator.java b/test/dorkbox/util/messagebus/common/SubscriptionValidator.java index 9339cb3..a7fa8ae 100644 --- a/test/dorkbox/util/messagebus/common/SubscriptionValidator.java +++ b/test/dorkbox/util/messagebus/common/SubscriptionValidator.java @@ -1,7 +1,7 @@ package dorkbox.util.messagebus.common; +import dorkbox.util.messagebus.subscription.Subscriber; import dorkbox.util.messagebus.subscription.Subscription; -import dorkbox.util.messagebus.subscription.SubscriptionManager; import java.util.*; @@ -32,13 +32,13 @@ public class SubscriptionValidator extends AssertSupport { // match subscriptions with existing validation entries // for each tuple of subscriber and message type the specified number of listeners must exist - public void validate(SubscriptionManager manager) { + public void validate(Subscriber subscriber) { for (Class messageType : this.messageTypes) { Collection validationEntries = getEntries(messageType); // we split subs + superSubs into TWO calls. Collection collection = new ArrayDeque(8); - Subscription[] subscriptions = manager.getSubscriptionsExactAndSuper(messageType); + Subscription[] subscriptions = subscriber.getExactAndSuper(messageType); if (subscriptions != null) { collection.addAll(Arrays.asList(subscriptions)); }