Code polish and refactor to support multiple types of publishing and subscribing

This commit is contained in:
nathan 2015-06-20 01:05:01 +02:00
parent 2c2801e54f
commit a790368ec4
20 changed files with 1357 additions and 633 deletions

View File

@ -80,6 +80,17 @@ public interface IMessageBus extends PubSubSupport {
ExactWithSuperTypesAndVarArgs,
}
enum SubscribeMode {
/**
* Will subscribe and publish using all provided parameters in the method signature (for subscribe), and arguments (for publish)
*/
MultiArg,
/**
* Will subscribe and publish using only the FIRST provided parameter in the method signature (for subscribe), and arguments (for publish)
*/
FirstArg,
}
/**
* Check whether any asynchronous message publications are pending to be processed

View File

@ -1,5 +1,6 @@
package dorkbox.util.messagebus;
import dorkbox.util.messagebus.common.adapter.StampedLock;
import dorkbox.util.messagebus.common.simpleq.MessageType;
import dorkbox.util.messagebus.common.simpleq.MpmcMultiTransferArrayQueue;
import dorkbox.util.messagebus.common.simpleq.MultiNode;
@ -7,11 +8,9 @@ import dorkbox.util.messagebus.common.thread.NamedThreadFactory;
import dorkbox.util.messagebus.error.DefaultErrorHandler;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.error.PublicationError;
import dorkbox.util.messagebus.publication.PublisherAll;
import dorkbox.util.messagebus.publication.PublisherExact;
import dorkbox.util.messagebus.publication.PublisherExactWithSuperTypes;
import dorkbox.util.messagebus.subscription.Publisher;
import dorkbox.util.messagebus.subscription.SubscriptionManager;
import dorkbox.util.messagebus.publication.*;
import dorkbox.util.messagebus.subscription.*;
import dorkbox.util.messagebus.utils.ClassUtils;
import org.jctools.util.Pow2;
import java.util.ArrayDeque;
@ -26,7 +25,10 @@ import java.util.Collection;
public class MessageBus implements IMessageBus {
private final ErrorHandlingSupport errorHandler;
private final MpmcMultiTransferArrayQueue dispatchQueue;
private final ClassUtils classUtils;
private final SubscriptionManager subscriptionManager;
private final Collection<Thread> threads;
private final Publisher subscriptionPublisher;
@ -46,32 +48,63 @@ public class MessageBus implements IMessageBus {
* @param numberOfThreads how many threads to have for dispatching async messages
*/
public MessageBus(int numberOfThreads) {
this(PublishMode.ExactWithSuperTypes, numberOfThreads);
this(PublishMode.ExactWithSuperTypes, SubscribeMode.MultiArg, numberOfThreads);
}
/**
* @param publishMode Specifies which publishMode to operate the publication of messages.
* @param numberOfThreads how many threads to have for dispatching async messages
*/
public MessageBus(final PublishMode publishMode, int numberOfThreads) {
public MessageBus(final PublishMode publishMode, final SubscribeMode subscribeMode, int numberOfThreads) {
numberOfThreads = Pow2.roundToPowerOfTwo(getMinNumberOfThreads(numberOfThreads));
this.errorHandler = new DefaultErrorHandler();
this.dispatchQueue = new MpmcMultiTransferArrayQueue(numberOfThreads);
this.subscriptionManager = new SubscriptionManager(numberOfThreads, errorHandler, true);
classUtils = new ClassUtils(Subscriber.LOAD_FACTOR);
final StampedLock lock = new StampedLock();
boolean isMultiArg = subscribeMode == SubscribeMode.MultiArg;
final Subscriber subscriber;
if (isMultiArg) {
subscriber = new MultiArgSubscriber(errorHandler, classUtils);
}
else {
subscriber = new FirstArgSubscriber(errorHandler, classUtils);
}
switch (publishMode) {
case Exact:
subscriptionPublisher = new PublisherExact(errorHandler);
if (isMultiArg) {
subscriptionPublisher = new PublisherExact_MultiArg(errorHandler, subscriber, lock);
}
else {
subscriptionPublisher = new PublisherExact_FirstArg(errorHandler, subscriber, lock);
}
break;
case ExactWithSuperTypes:
subscriptionPublisher = new PublisherExactWithSuperTypes(errorHandler);
if (isMultiArg) {
subscriptionPublisher = new PublisherExactWithSuperTypes_MultiArg(errorHandler, subscriber, lock);
}
else {
subscriptionPublisher = new PublisherExactWithSuperTypes_FirstArg(errorHandler, subscriber, lock);
}
break;
case ExactWithSuperTypesAndVarArgs:
default:
subscriptionPublisher = new PublisherAll(errorHandler);
if (isMultiArg) {
subscriptionPublisher = new PublisherAll_MultiArg(errorHandler, subscriber, lock);
}
else {
throw new RuntimeException("Unable to run in expected configuration");
}
}
this.subscriptionManager = new SubscriptionManager(numberOfThreads, subscriber, lock);
this.threads = new ArrayDeque<Thread>(numberOfThreads);
NamedThreadFactory dispatchThreadFactory = new NamedThreadFactory("MessageBus");
@ -157,7 +190,6 @@ public class MessageBus implements IMessageBus {
@Override
public void subscribe(final Object listener) {
MessageBus.this.subscriptionManager.subscribe(listener);
MessageBus.this.subscriptionManager.subscribe(listener);
}
@Override
@ -167,22 +199,22 @@ public class MessageBus implements IMessageBus {
@Override
public void publish(final Object message) {
subscriptionPublisher.publish(subscriptionManager, message);
subscriptionPublisher.publish(message);
}
@Override
public void publish(final Object message1, final Object message2) {
subscriptionPublisher.publish(subscriptionManager, message1, message2);
subscriptionPublisher.publish(message1, message2);
}
@Override
public void publish(final Object message1, final Object message2, final Object message3) {
subscriptionPublisher.publish(subscriptionManager, message1, message2, message3);
subscriptionPublisher.publish(message1, message2, message3);
}
@Override
public void publish(final Object[] messages) {
subscriptionPublisher.publish(subscriptionManager, messages);
subscriptionPublisher.publish(messages);
}
@Override
@ -271,5 +303,6 @@ public class MessageBus implements IMessageBus {
t.interrupt();
}
this.subscriptionManager.shutdown();
this.classUtils.clear();
}
}

View File

@ -1,5 +1,7 @@
package dorkbox.util.messagebus.common;
import java.util.Arrays;
/**
* The dead message event is published whenever no message
* handlers could be found for a given message publication.
@ -25,7 +27,7 @@ public final class DeadMessage {
this.relatedMessages[1] = message2;
}
public DeadMessage(Object message1, Object message2, Object message3 ) {
public DeadMessage(Object message1, Object message2, Object message3) {
this.relatedMessages = new Object[3];
this.relatedMessages[0] = message1;
this.relatedMessages[1] = message2;
@ -33,7 +35,7 @@ public final class DeadMessage {
}
public DeadMessage(Object[] messages) {
this.relatedMessages = messages;
this.relatedMessages = Arrays.copyOf(messages, messages.length);
}
public Object[] getMessages() {

View File

@ -9,6 +9,7 @@ import java.util.concurrent.ConcurrentMap;
public abstract class JavaVersionAdapter {
public static final JavaVersionAdapter get;
static {
// get = new Java7Adapter();
@ -17,7 +18,6 @@ public abstract class JavaVersionAdapter {
}
public static JavaVersionAdapter get;
public abstract <K, V> ConcurrentMap<K, V> concurrentMap(final int size, final float loadFactor, final int stripeSize);

View File

@ -5,27 +5,41 @@ import dorkbox.util.messagebus.common.adapter.StampedLock;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.error.PublicationError;
import dorkbox.util.messagebus.subscription.Publisher;
import dorkbox.util.messagebus.subscription.Subscriber;
import dorkbox.util.messagebus.subscription.Subscription;
import dorkbox.util.messagebus.subscription.SubscriptionManager;
import dorkbox.util.messagebus.utils.VarArgUtils;
import java.lang.reflect.Array;
import java.util.concurrent.atomic.AtomicBoolean;
public class PublisherAll implements Publisher {
public class PublisherAll_MultiArg implements Publisher {
private final ErrorHandlingSupport errorHandler;
public PublisherAll(final ErrorHandlingSupport errorHandler) {
private final Subscriber subscriber;
private final StampedLock lock;
private final AtomicBoolean varArgPossibility;
final VarArgUtils varArgUtils;
public PublisherAll_MultiArg(final ErrorHandlingSupport errorHandler, final Subscriber subscriber, final StampedLock lock) {
this.errorHandler = errorHandler;
this.subscriber = subscriber;
this.lock = lock;
varArgPossibility = subscriber.getVarArgPossibility();
varArgUtils = subscriber.getVarArgUtils();
}
@Override
public void publish(final SubscriptionManager subscriptionManager, final Object message1) {
public void publish(final Object message1) {
try {
final Class<?> messageClass = message1.getClass();
final boolean isArray = messageClass.isArray();
final StampedLock lock = subscriptionManager.getLock();
final Subscription[] subscriptions = subscriptionManager.getSubscriptionsExactAndSuper_NoLock(messageClass); // can return null
final StampedLock lock = this.lock;
long stamp = lock.readLock();
final Subscription[] subscriptions = subscriber.getExactAndSuper(messageClass); // can return null
lock.unlockRead(stamp);
boolean hasSubs = false;
// Run subscriptions
@ -41,11 +55,9 @@ public class PublisherAll implements Publisher {
// publish to var arg, only if not already an array (because that would be unnecessary)
if (subscriptionManager.canPublishVarArg() && !isArray) {
final VarArgUtils varArgUtils = subscriptionManager.getVarArgUtils();
long stamp = lock.readLock();
final Subscription[] varArgSubs = varArgUtils.getVarArgSubscriptions(messageClass); // CAN NOT RETURN NULL
if (varArgPossibility.get() && !isArray) {
stamp = lock.readLock();
final Subscription[] varArgSubs = varArgUtils.getVarArgSubscriptions(messageClass, subscriber); // CAN NOT RETURN NULL
lock.unlockRead(stamp);
Subscription sub;
@ -67,7 +79,8 @@ public class PublisherAll implements Publisher {
// now publish array based superClasses (but only if those ALSO accept vararg)
stamp = lock.readLock();
final Subscription[] varArgSuperSubs = varArgUtils.getVarArgSuperSubscriptions(messageClass); // CAN NOT RETURN NULL
final Subscription[] varArgSuperSubs = varArgUtils.getVarArgSuperSubscriptions(messageClass,
subscriber); // CAN NOT RETURN NULL
lock.unlockRead(stamp);
length = varArgSuperSubs.length;
@ -90,7 +103,10 @@ public class PublisherAll implements Publisher {
// only get here if there were no other subscriptions
// Dead Event must EXACTLY MATCH (no subclasses)
if (!hasSubs) {
final Subscription[] deadSubscriptions = subscriptionManager.getSubscriptionsExact(DeadMessage.class);
stamp = lock.readLock();
final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class);
lock.unlockRead(stamp);
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1);
@ -108,15 +124,14 @@ public class PublisherAll implements Publisher {
}
@Override
public void publish(final SubscriptionManager subscriptionManager, final Object message1, final Object message2) {
public void publish(final Object message1, final Object message2) {
try {
final Class<?> messageClass1 = message1.getClass();
final Class<?> messageClass2 = message2.getClass();
final StampedLock lock = subscriptionManager.getLock();
final StampedLock lock = this.lock;
long stamp = lock.readLock();
final Subscription[] subscriptions = subscriptionManager.getSubscriptionsExactAndSuper_NoLock(messageClass1,
messageClass2); // can return null
final Subscription[] subscriptions = subscriber.getExactAndSuper(messageClass1, messageClass2); // can return null
lock.unlockRead(stamp);
boolean hasSubs = false;
@ -132,14 +147,12 @@ public class PublisherAll implements Publisher {
}
// publish to var arg, only if not already an array AND we are all of the same type
if (subscriptionManager.canPublishVarArg() && !messageClass1.isArray() && !messageClass2.isArray()) {
final VarArgUtils varArgUtils = subscriptionManager.getVarArgUtils();
if (varArgPossibility.get() && !messageClass1.isArray() && !messageClass2.isArray()) {
// vararg can ONLY work if all types are the same
if (messageClass1 == messageClass2) {
stamp = lock.readLock();
final Subscription[] varArgSubs = varArgUtils.getVarArgSubscriptions(messageClass1); // can NOT return null
final Subscription[] varArgSubs = varArgUtils.getVarArgSubscriptions(messageClass1, subscriber); // can NOT return null
lock.unlockRead(stamp);
final int length = varArgSubs.length;
@ -160,8 +173,8 @@ public class PublisherAll implements Publisher {
// now publish array based superClasses (but only if those ALSO accept vararg)
stamp = lock.readLock();
final Subscription[] varArgSuperSubs = varArgUtils.getVarArgSuperSubscriptions(messageClass1,
messageClass2); // CAN NOT RETURN NULL
final Subscription[] varArgSuperSubs = varArgUtils.getVarArgSuperSubscriptions(messageClass1, messageClass2,
subscriber); // CAN NOT RETURN NULL
lock.unlockRead(stamp);
@ -188,7 +201,10 @@ public class PublisherAll implements Publisher {
if (!hasSubs) {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subscriptionManager.getSubscriptionsExact(DeadMessage.class); // can return null
lock.unlockRead(stamp);
final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null
lock.unlockRead(stamp);
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2);
@ -206,17 +222,15 @@ public class PublisherAll implements Publisher {
}
@Override
public void publish(final SubscriptionManager subscriptionManager, final Object message1, final Object message2,
final Object message3) {
public void publish(final Object message1, final Object message2, final Object message3) {
try {
final Class<?> messageClass1 = message1.getClass();
final Class<?> messageClass2 = message2.getClass();
final Class<?> messageClass3 = message3.getClass();
final StampedLock lock = subscriptionManager.getLock();
final StampedLock lock = this.lock;
long stamp = lock.readLock();
final Subscription[] subs = subscriptionManager.getSubscriptionsExactAndSuper_NoLock(messageClass1, messageClass2,
messageClass3); // can return null
final Subscription[] subs = subscriber.getExactAndSuper(messageClass1, messageClass2, messageClass3); // can return null
lock.unlockRead(stamp);
@ -233,15 +247,13 @@ public class PublisherAll implements Publisher {
}
// publish to var arg, only if not already an array AND we are all of the same type
if (subscriptionManager.canPublishVarArg() && !messageClass1.isArray() && !messageClass2.isArray() &&
if (varArgPossibility.get() && !messageClass1.isArray() && !messageClass2.isArray() &&
!messageClass3.isArray()) {
final VarArgUtils varArgUtils = subscriptionManager.getVarArgUtils();
// vararg can ONLY work if all types are the same
if (messageClass1 == messageClass2 && messageClass1 == messageClass3) {
stamp = lock.readLock();
final Subscription[] varArgSubs = varArgUtils.getVarArgSubscriptions(messageClass1); // can NOT return null
final Subscription[] varArgSubs = varArgUtils.getVarArgSubscriptions(messageClass1, subscriber); // can NOT return null
lock.unlockRead(stamp);
final int length = varArgSubs.length;
@ -264,8 +276,8 @@ public class PublisherAll implements Publisher {
// now publish array based superClasses (but only if those ALSO accept vararg)
stamp = lock.readLock();
final Subscription[] varArgSuperSubs = varArgUtils.getVarArgSuperSubscriptions(messageClass1, messageClass2,
messageClass3); // CAN NOT RETURN NULL
final Subscription[] varArgSuperSubs = varArgUtils.getVarArgSuperSubscriptions(messageClass1, messageClass2, messageClass3,
subscriber); // CAN NOT RETURN NULL
lock.unlockRead(stamp);
@ -293,7 +305,10 @@ public class PublisherAll implements Publisher {
if (!hasSubs) {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subscriptionManager.getSubscriptionsExact(DeadMessage.class); // can return null
stamp = lock.readLock();
final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null
lock.unlockRead(stamp);
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2, message3);
@ -311,7 +326,7 @@ public class PublisherAll implements Publisher {
}
@Override
public void publish(final SubscriptionManager subscriptionManager, final Object[] messages) {
publish(subscriptionManager, (Object) messages);
public void publish(final Object[] messages) {
publish((Object) messages);
}
}

View File

@ -0,0 +1,194 @@
package dorkbox.util.messagebus.publication;
import dorkbox.util.messagebus.common.DeadMessage;
import dorkbox.util.messagebus.common.adapter.StampedLock;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.error.PublicationError;
import dorkbox.util.messagebus.subscription.Publisher;
import dorkbox.util.messagebus.subscription.Subscriber;
import dorkbox.util.messagebus.subscription.Subscription;
import java.util.Arrays;
public class PublisherExactWithSuperTypes_FirstArg implements Publisher {
private final ErrorHandlingSupport errorHandler;
private final Subscriber subscriber;
private final StampedLock lock;
public PublisherExactWithSuperTypes_FirstArg(final ErrorHandlingSupport errorHandler, final Subscriber subscriber,
final StampedLock lock) {
this.errorHandler = errorHandler;
this.subscriber = subscriber;
this.lock = lock;
}
@Override
public void publish(final Object message1) {
try {
final Class<?> messageClass = message1.getClass();
final StampedLock lock = this.lock;
long stamp = lock.readLock();
final Subscription[] subscriptions = subscriber.getExactAndSuper(messageClass); // can return null
lock.unlockRead(stamp);
// Run subscriptions
if (subscriptions != null) {
Class<?>[] handledMessages;
Subscription sub;
for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i];
handledMessages = sub.getHandler().getHandledMessages();
if (handledMessages.length == 1) {
sub.publish(message1);
}
}
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
stamp = lock.readLock();
final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null
lock.unlockRead(stamp);
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1);
Subscription sub;
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(deadMessage);
}
}
}
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.").setCause(e)
.setPublishedObject(message1));
}
}
@Override
public void publish(final Object message1, final Object message2) {
try {
final Class<?> messageClass = message1.getClass();
final StampedLock lock = this.lock;
long stamp = lock.readLock();
final Subscription[] subscriptions = subscriber.getExactAndSuper(messageClass, null); // can return null
lock.unlockRead(stamp);
// Run subscriptions
if (subscriptions != null) {
Subscription sub;
for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i];
final Class<?>[] handledMessages = sub.getHandler().getHandledMessages();
sub.publish(message1, message2);
}
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
stamp = lock.readLock();
final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null
lock.unlockRead(stamp);
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2);
Subscription sub;
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(deadMessage);
}
}
}
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.").setCause(e)
.setPublishedObject(message1, message2));
}
}
@Override
public void publish(final Object message1, final Object message2, final Object message3) {
try {
final Class<?> messageClass = message1.getClass();
final StampedLock lock = this.lock;
long stamp = lock.readLock();
final Subscription[] subscriptions = subscriber.getExactAndSuper(messageClass); // can return null
lock.unlockRead(stamp);
// Run subscriptions
if (subscriptions != null) {
Subscription sub;
for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i];
sub.publish(message1, message2, message3);
}
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
stamp = lock.readLock();
final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null
lock.unlockRead(stamp);
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2, message3);
Subscription sub;
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(deadMessage);
}
}
}
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.").setCause(e)
.setPublishedObject(message1, message2, message3));
}
}
@Override
public void publish(final Object[] messages) {
try {
final Object message1 = messages[0];
final Class<?> messageClass = message1.getClass();
final Object[] newMessages = Arrays.copyOfRange(messages, 1, messages.length);
final StampedLock lock = this.lock;
long stamp = lock.readLock();
final Subscription[] subscriptions = subscriber.getExactAndSuper(messageClass); // can return null
lock.unlockRead(stamp);
// Run subscriptions
if (subscriptions != null) {
Subscription sub;
for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i];
sub.publish(message1, newMessages);
}
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
stamp = lock.readLock();
final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null
lock.unlockRead(stamp);
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, newMessages);
Subscription sub;
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(deadMessage);
}
}
}
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.").setCause(e)
.setPublishedObject(messages));
}
}
}

View File

@ -1,25 +1,34 @@
package dorkbox.util.messagebus.publication;
import dorkbox.util.messagebus.common.DeadMessage;
import dorkbox.util.messagebus.common.adapter.StampedLock;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.error.PublicationError;
import dorkbox.util.messagebus.subscription.Publisher;
import dorkbox.util.messagebus.subscription.Subscriber;
import dorkbox.util.messagebus.subscription.Subscription;
import dorkbox.util.messagebus.subscription.SubscriptionManager;
public class PublisherExact implements Publisher {
public class PublisherExactWithSuperTypes_MultiArg implements Publisher {
private final ErrorHandlingSupport errorHandler;
private final Subscriber subscriber;
private final StampedLock lock;
public PublisherExact(final ErrorHandlingSupport errorHandler) {
public PublisherExactWithSuperTypes_MultiArg(final ErrorHandlingSupport errorHandler, final Subscriber subscriber,
final StampedLock lock) {
this.errorHandler = errorHandler;
this.subscriber = subscriber;
this.lock = lock;
}
@Override
public void publish(final SubscriptionManager subscriptionManager, final Object message1) {
public void publish(final Object message1) {
try {
final Class<?> messageClass = message1.getClass();
final Subscription[] subscriptions = subscriptionManager.getSubscriptionsExact(messageClass); // can return null
final StampedLock lock = this.lock;
long stamp = lock.readLock();
final Subscription[] subscriptions = subscriber.getExactAndSuper(messageClass); // can return null
lock.unlockRead(stamp);
// Run subscriptions
if (subscriptions != null) {
@ -31,7 +40,10 @@ public class PublisherExact implements Publisher {
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subscriptionManager.getSubscriptionsExact(DeadMessage.class); // can return null
stamp = lock.readLock();
final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null
lock.unlockRead(stamp);
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1);
@ -49,12 +61,15 @@ public class PublisherExact implements Publisher {
}
@Override
public void publish(final SubscriptionManager subscriptionManager, final Object message1, final Object message2) {
public void publish(final Object message1, final Object message2) {
try {
final Class<?> messageClass1 = message1.getClass();
final Class<?> messageClass2 = message2.getClass();
final Subscription[] subscriptions = subscriptionManager.getSubscriptionsExact(messageClass1, messageClass2); // can return null
final StampedLock lock = this.lock;
long stamp = lock.readLock();
final Subscription[] subscriptions = subscriber.getExactAndSuper(messageClass1, messageClass2); // can return null
lock.unlockRead(stamp);
// Run subscriptions
if (subscriptions != null) {
@ -66,7 +81,10 @@ public class PublisherExact implements Publisher {
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subscriptionManager.getSubscriptionsExact(DeadMessage.class); // can return null
stamp = lock.readLock();
final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null
lock.unlockRead(stamp);
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2);
@ -84,15 +102,17 @@ public class PublisherExact implements Publisher {
}
@Override
public void publish(final SubscriptionManager subscriptionManager, final Object message1, final Object message2,
final Object message3) {
public void publish(final Object message1, final Object message2, final Object message3) {
try {
final Class<?> messageClass1 = message1.getClass();
final Class<?> messageClass2 = message2.getClass();
final Class<?> messageClass3 = message3.getClass();
final Subscription[] subscriptions = subscriptionManager.getSubscriptionsExact(messageClass1, messageClass2,
messageClass3); // can return null
final StampedLock lock = this.lock;
long stamp = lock.readLock();
final Subscription[] subscriptions = subscriber.getExactAndSuper(messageClass1, messageClass2, messageClass3); // can return null
lock.unlockRead(stamp);
// Run subscriptions
if (subscriptions != null) {
@ -104,7 +124,10 @@ public class PublisherExact implements Publisher {
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subscriptionManager.getSubscriptionsExact(DeadMessage.class); // can return null
stamp = lock.readLock();
final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null
lock.unlockRead(stamp);
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2, message3);
@ -122,7 +145,7 @@ public class PublisherExact implements Publisher {
}
@Override
public void publish(final SubscriptionManager subscriptionManager, final Object[] messages) {
publish(subscriptionManager, (Object) messages);
public void publish(final Object[] messages) {
publish((Object) messages);
}
}

View File

@ -0,0 +1,185 @@
package dorkbox.util.messagebus.publication;
import dorkbox.util.messagebus.common.DeadMessage;
import dorkbox.util.messagebus.common.adapter.StampedLock;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.error.PublicationError;
import dorkbox.util.messagebus.subscription.Publisher;
import dorkbox.util.messagebus.subscription.Subscriber;
import dorkbox.util.messagebus.subscription.Subscription;
import java.util.Arrays;
public class PublisherExact_FirstArg implements Publisher {
private final ErrorHandlingSupport errorHandler;
private final Subscriber subscriber;
private final StampedLock lock;
public PublisherExact_FirstArg(final ErrorHandlingSupport errorHandler, final Subscriber subscriber, final StampedLock lock) {
this.errorHandler = errorHandler;
this.subscriber = subscriber;
this.lock = lock;
}
@Override
public void publish(final Object message1) {
try {
final Class<?> messageClass = message1.getClass();
final StampedLock lock = this.lock;
long stamp = lock.readLock();
final Subscription[] subscriptions = subscriber.getExact(messageClass); // can return null
lock.unlockRead(stamp);
// Run subscriptions
if (subscriptions != null) {
Subscription sub;
for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i];
sub.publish(message1);
}
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
stamp = lock.readLock();
final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null
lock.unlockRead(stamp);
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1);
Subscription sub;
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(deadMessage);
}
}
}
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.").setCause(e)
.setPublishedObject(message1));
}
}
@Override
public void publish(final Object message1, final Object message2) {
try {
final Class<?> messageClass = message1.getClass();
final StampedLock lock = this.lock;
long stamp = lock.readLock();
final Subscription[] subscriptions = subscriber.getExact(messageClass); // can return null
lock.unlockRead(stamp);
// Run subscriptions
if (subscriptions != null) {
Subscription sub;
for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i];
sub.publish(message1, message2);
}
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
lock.unlockRead(stamp);
final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null
lock.unlockRead(stamp);
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2);
Subscription sub;
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(deadMessage);
}
}
}
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.").setCause(e)
.setPublishedObject(message1, message2));
}
}
@Override
public void publish(final Object message1, final Object message2, final Object message3) {
try {
final Class<?> messageClass = message1.getClass();
final StampedLock lock = this.lock;
long stamp = lock.readLock();
final Subscription[] subscriptions = subscriber.getExact(messageClass); // can return null
lock.unlockRead(stamp);
// Run subscriptions
if (subscriptions != null) {
Subscription sub;
for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i];
sub.publish(message1, message2, message3);
}
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
stamp = lock.readLock();
final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null
lock.unlockRead(stamp);
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2, message3);
Subscription sub;
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(deadMessage);
}
}
}
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.").setCause(e)
.setPublishedObject(message1, message2, message3));
}
}
@Override
public void publish(final Object[] messages) {
try {
final Object message1 = messages[0];
final Class<?> messageClass = message1.getClass();
final Object[] newMessages = Arrays.copyOfRange(messages, 1, messages.length);
final StampedLock lock = this.lock;
long stamp = lock.readLock();
final Subscription[] subscriptions = subscriber.getExact(messageClass); // can return null
lock.unlockRead(stamp);
// Run subscriptions
if (subscriptions != null) {
Subscription sub;
for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i];
sub.publish(message1, newMessages);
}
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
stamp = lock.readLock();
final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null
lock.unlockRead(stamp);
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, newMessages);
Subscription sub;
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(deadMessage);
}
}
}
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.").setCause(e)
.setPublishedObject(messages));
}
}
}

View File

@ -1,25 +1,33 @@
package dorkbox.util.messagebus.publication;
import dorkbox.util.messagebus.common.DeadMessage;
import dorkbox.util.messagebus.common.adapter.StampedLock;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.error.PublicationError;
import dorkbox.util.messagebus.subscription.Publisher;
import dorkbox.util.messagebus.subscription.Subscriber;
import dorkbox.util.messagebus.subscription.Subscription;
import dorkbox.util.messagebus.subscription.SubscriptionManager;
public class PublisherExactWithSuperTypes implements Publisher {
public class PublisherExact_MultiArg implements Publisher {
private final ErrorHandlingSupport errorHandler;
private final Subscriber subscriber;
private final StampedLock lock;
public PublisherExactWithSuperTypes(final ErrorHandlingSupport errorHandler) {
public PublisherExact_MultiArg(final ErrorHandlingSupport errorHandler, final Subscriber subscriber, final StampedLock lock) {
this.errorHandler = errorHandler;
this.subscriber = subscriber;
this.lock = lock;
}
@Override
public void publish(final SubscriptionManager subscriptionManager, final Object message1) {
public void publish(final Object message1) {
try {
final Class<?> messageClass = message1.getClass();
final Subscription[] subscriptions = subscriptionManager.getSubscriptionsExactAndSuper(messageClass); // can return null
final StampedLock lock = this.lock;
long stamp = lock.readLock();
final Subscription[] subscriptions = subscriber.getExact(messageClass); // can return null
lock.unlockRead(stamp);
// Run subscriptions
if (subscriptions != null) {
@ -31,7 +39,10 @@ public class PublisherExactWithSuperTypes implements Publisher {
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subscriptionManager.getSubscriptionsExact(DeadMessage.class); // can return null
stamp = lock.readLock();
final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null
lock.unlockRead(stamp);
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1);
@ -49,13 +60,15 @@ public class PublisherExactWithSuperTypes implements Publisher {
}
@Override
public void publish(final SubscriptionManager subscriptionManager, final Object message1, final Object message2) {
public void publish(final Object message1, final Object message2) {
try {
final Class<?> messageClass1 = message1.getClass();
final Class<?> messageClass2 = message2.getClass();
final Subscription[] subscriptions = subscriptionManager.getSubscriptionsExactAndSuper(messageClass1,
messageClass2); // can return null
final StampedLock lock = this.lock;
long stamp = lock.readLock();
final Subscription[] subscriptions = subscriber.getExact(messageClass1, messageClass2); // can return null
lock.unlockRead(stamp);
// Run subscriptions
if (subscriptions != null) {
@ -67,7 +80,10 @@ public class PublisherExactWithSuperTypes implements Publisher {
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subscriptionManager.getSubscriptionsExact(DeadMessage.class); // can return null
stamp = lock.readLock();
final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null
lock.unlockRead(stamp);
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2);
@ -85,15 +101,16 @@ public class PublisherExactWithSuperTypes implements Publisher {
}
@Override
public void publish(final SubscriptionManager subscriptionManager, final Object message1, final Object message2,
final Object message3) {
public void publish(final Object message1, final Object message2, final Object message3) {
try {
final Class<?> messageClass1 = message1.getClass();
final Class<?> messageClass2 = message2.getClass();
final Class<?> messageClass3 = message3.getClass();
final Subscription[] subscriptions = subscriptionManager.getSubscriptionsExactAndSuper(messageClass1, messageClass2,
messageClass3); // can return null
final StampedLock lock = this.lock;
long stamp = lock.readLock();
final Subscription[] subscriptions = subscriber.getExact(messageClass1, messageClass2, messageClass3); // can return null
lock.unlockRead(stamp);
// Run subscriptions
if (subscriptions != null) {
@ -105,7 +122,10 @@ public class PublisherExactWithSuperTypes implements Publisher {
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subscriptionManager.getSubscriptionsExact(DeadMessage.class); // can return null
stamp = lock.readLock();
final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null
lock.unlockRead(stamp);
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2, message3);
@ -123,7 +143,7 @@ public class PublisherExactWithSuperTypes implements Publisher {
}
@Override
public void publish(final SubscriptionManager subscriptionManager, final Object[] messages) {
publish(subscriptionManager, (Object) messages);
public void publish(final Object[] messages) {
publish((Object) messages);
}
}

View File

@ -0,0 +1,185 @@
package dorkbox.util.messagebus.subscription;
import dorkbox.util.messagebus.common.MessageHandler;
import dorkbox.util.messagebus.common.adapter.JavaVersionAdapter;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.utils.ClassUtils;
import dorkbox.util.messagebus.utils.VarArgUtils;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Permits subscriptions that only use the first parameters as the signature. The publisher MUST provide the correct additional parameters,
* and they must be of the correct type, otherwise it will throw an error.
*/
public class FirstArgSubscriber implements Subscriber {
private final ErrorHandlingSupport errorHandler;
// all subscriptions per message type. We perpetually KEEP the types, as this lowers the amount of locking required
// this is the primary list for dispatching a specific message
// write access is synchronized and happens only when a listener of a specific class is registered the first time
// the following are used ONLY for FIRST ARG subscription/publication. (subscriptionsPerMessageMulti isn't used in this case)
private final Map<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageSingle_1;
private final Map<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageSingle_2;
private final Map<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageSingle_3;
public FirstArgSubscriber(final ErrorHandlingSupport errorHandler, final ClassUtils classUtils) {
this.errorHandler = errorHandler;
// the following are used ONLY for FIRST ARG subscription/publication. (subscriptionsPerMessageMulti isn't used in this case)
this.subscriptionsPerMessageSingle_1 = JavaVersionAdapter.get.concurrentMap(32, LOAD_FACTOR, 1);
this.subscriptionsPerMessageSingle_2 = JavaVersionAdapter.get.concurrentMap(32, LOAD_FACTOR, 1);
this.subscriptionsPerMessageSingle_3 = JavaVersionAdapter.get.concurrentMap(32, LOAD_FACTOR, 1);
}
// inside a write lock
// add this subscription to each of the handled types
// to activate this sub for publication
private void registerFirst(final Subscription subscription, final Class<?> listenerClass,
final Map<Class<?>, ArrayList<Subscription>> subs_1, final Map<Class<?>, ArrayList<Subscription>> subs_2,
final Map<Class<?>, ArrayList<Subscription>> subs_3) {
final MessageHandler handler = subscription.getHandler();
final Class<?>[] messageHandlerTypes = handler.getHandledMessages();
final int size = messageHandlerTypes.length;
Class<?> type0 = messageHandlerTypes[0];
switch (size) {
case 1: {
ArrayList<Subscription> subs = subs_1.get(type0);
if (subs == null) {
subs = new ArrayList<Subscription>();
subs_1.put(type0, subs);
}
subs.add(subscription);
return;
}
case 2: {
ArrayList<Subscription> subs = subs_2.get(type0);
if (subs == null) {
subs = new ArrayList<Subscription>();
subs_2.put(type0, subs);
}
subs.add(subscription);
return;
}
case 3: {
ArrayList<Subscription> subs = subs_3.get(type0);
if (subs == null) {
subs = new ArrayList<Subscription>();
subs_3.put(type0, subs);
}
subs.add(subscription);
return;
}
case 0:
default: {
errorHandler.handleError("Error while trying to subscribe class", listenerClass);
return;
}
}
}
@Override
public void register(final Class<?> listenerClass, final int handlersSize, final Subscription[] subsPerListener) {
final Map<Class<?>, ArrayList<Subscription>> sub_1 = this.subscriptionsPerMessageSingle_1;
final Map<Class<?>, ArrayList<Subscription>> sub_2 = this.subscriptionsPerMessageSingle_2;
final Map<Class<?>, ArrayList<Subscription>> sub_3 = this.subscriptionsPerMessageSingle_3;
Subscription subscription;
for (int i = 0; i < handlersSize; i++) {
subscription = subsPerListener[i];
// activate this subscription for publication
// now add this subscription to each of the handled types
// only register based on the FIRST parameter
registerFirst(subscription, listenerClass, sub_1, sub_2, sub_3);
}
}
@Override
public AtomicBoolean getVarArgPossibility() {
return null;
}
@Override
public VarArgUtils getVarArgUtils() {
return null;
}
@Override
public void shutdown() {
this.subscriptionsPerMessageSingle_1.clear();
this.subscriptionsPerMessageSingle_2.clear();
this.subscriptionsPerMessageSingle_3.clear();
}
@Override
public void clearConcurrentCollections() {
}
@Override
public ArrayList<Subscription> getExactAsArray(final Class<?> messageClass) {
return subscriptionsPerMessageSingle_1.get(messageClass);
}
@Override
public ArrayList<Subscription> getExactAsArray(final Class<?> messageClass1, final Class<?> messageClass2) {
return subscriptionsPerMessageSingle_2.get(messageClass1);
}
@Override
public ArrayList<Subscription> getExactAsArray(final Class<?> messageClass1, final Class<?> messageClass2,
final Class<?> messageClass3) {
return subscriptionsPerMessageSingle_3.get(messageClass1);
}
@Override
public Subscription[] getExact(final Class<?> deadMessageClass) {
return new Subscription[0];
}
@Override
public Subscription[] getExact(final Class<?> messageClass1, final Class<?> messageClass2) {
return new Subscription[0];
}
@Override
public Subscription[] getExact(final Class<?> messageClass1, final Class<?> messageClass2, final Class<?> messageClass3) {
return new Subscription[0];
}
@Override
public Subscription[] getExactAndSuper(final Class<?> messageClass) {
return new Subscription[0];
}
@Override
public Subscription[] getExactAndSuper(final Class<?> messageClass1, final Class<?> messageClass2) {
return new Subscription[0];
}
@Override
public Subscription[] getExactAndSuper(final Class<?> messageClass1, final Class<?> messageClass2, final Class<?> messageClass3) {
return new Subscription[0];
}
}

View File

@ -0,0 +1,312 @@
package dorkbox.util.messagebus.subscription;
import dorkbox.util.messagebus.common.HashMapTree;
import dorkbox.util.messagebus.common.MessageHandler;
import dorkbox.util.messagebus.common.adapter.JavaVersionAdapter;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.utils.ClassUtils;
import dorkbox.util.messagebus.utils.SubscriptionUtils;
import dorkbox.util.messagebus.utils.VarArgUtils;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Permits subscriptions with a varying length of parameters as the signature, which must be match by the publisher for it to be accepted
*/
public class MultiArgSubscriber implements Subscriber {
private final ErrorHandlingSupport errorHandler;
private final SubscriptionUtils subUtils;
private final VarArgUtils varArgUtils;
// all subscriptions per message type. We perpetually KEEP the types, as this lowers the amount of locking required
// this is the primary list for dispatching a specific message
// write access is synchronized and happens only when a listener of a specific class is registered the first time
private final Map<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageSingle;
private final HashMapTree<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageMulti;
// shortcut publication if we know there is no possibility of varArg (ie: a method that has an array as arguments)
private final AtomicBoolean varArgPossibility = new AtomicBoolean(false);
public MultiArgSubscriber(final ErrorHandlingSupport errorHandler, final ClassUtils classUtils) {
this.errorHandler = errorHandler;
this.subscriptionsPerMessageSingle = JavaVersionAdapter.get.concurrentMap(32, LOAD_FACTOR, 1);
this.subscriptionsPerMessageMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>(4, LOAD_FACTOR);
this.subUtils = new SubscriptionUtils(classUtils, Subscriber.LOAD_FACTOR);
// var arg subscriptions keep track of which subscriptions can handle varArgs. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but improves performance of handlers
this.varArgUtils = new VarArgUtils(classUtils, Subscriber.LOAD_FACTOR);
}
@Override
public AtomicBoolean getVarArgPossibility() {
return varArgPossibility;
}
@Override
public VarArgUtils getVarArgUtils() {
return varArgUtils;
}
@Override
public void clearConcurrentCollections() {
this.subUtils.clear();
this.varArgUtils.clear();
}
// inside a write lock
// add this subscription to each of the handled types
// to activate this sub for publication
private void registerMulti(final Subscription subscription, final Class<?> listenerClass,
final Map<Class<?>, ArrayList<Subscription>> subsPerMessageSingle,
final HashMapTree<Class<?>, ArrayList<Subscription>> subsPerMessageMulti,
final AtomicBoolean varArgPossibility) {
final MessageHandler handler = subscription.getHandler();
final Class<?>[] messageHandlerTypes = handler.getHandledMessages();
final int size = messageHandlerTypes.length;
Class<?> type0 = messageHandlerTypes[0];
switch (size) {
case 0: {
errorHandler.handleError("Error while trying to subscribe class", listenerClass);
return;
}
case 1: {
ArrayList<Subscription> subs = subsPerMessageSingle.get(type0);
if (subs == null) {
subs = new ArrayList<Subscription>();
// is this handler able to accept var args?
if (handler.getVarArgClass() != null) {
varArgPossibility.lazySet(true);
}
subsPerMessageSingle.put(type0, subs);
}
subs.add(subscription);
return;
}
case 2: {
ArrayList<Subscription> subs = subsPerMessageMulti.get(type0, messageHandlerTypes[1]);
if (subs == null) {
subs = new ArrayList<Subscription>();
subsPerMessageMulti.put(subs, type0, messageHandlerTypes[1]);
}
subs.add(subscription);
return;
}
case 3: {
ArrayList<Subscription> subs = subsPerMessageMulti.get(type0, messageHandlerTypes[1], messageHandlerTypes[2]);
if (subs == null) {
subs = new ArrayList<Subscription>();
subsPerMessageMulti.put(subs, type0, messageHandlerTypes[1], messageHandlerTypes[2]);
}
subs.add(subscription);
return;
}
default: {
ArrayList<Subscription> subs = subsPerMessageMulti.get(messageHandlerTypes);
if (subs == null) {
subs = new ArrayList<Subscription>();
subsPerMessageMulti.put(subs, messageHandlerTypes);
}
subs.add(subscription);
}
}
}
@Override
public void register(final Class<?> listenerClass, final int handlersSize, final Subscription[] subsPerListener) {
final Map<Class<?>, ArrayList<Subscription>> subsPerMessageSingle = this.subscriptionsPerMessageSingle;
final HashMapTree<Class<?>, ArrayList<Subscription>> subsPerMessageMulti = this.subscriptionsPerMessageMulti;
final AtomicBoolean varArgPossibility = this.varArgPossibility;
Subscription subscription;
for (int i = 0; i < handlersSize; i++) {
subscription = subsPerListener[i];
// activate this subscription for publication
// now add this subscription to each of the handled types
registerMulti(subscription, listenerClass, subsPerMessageSingle, subsPerMessageMulti, varArgPossibility);
}
}
@Override
public void shutdown() {
this.subscriptionsPerMessageSingle.clear();
this.subscriptionsPerMessageMulti.clear();
clearConcurrentCollections();
}
@Override
public ArrayList<Subscription> getExactAsArray(final Class<?> messageClass) {
return subscriptionsPerMessageSingle.get(messageClass);
}
@Override
public ArrayList<Subscription> getExactAsArray(final Class<?> messageClass1, final Class<?> messageClass2) {
return subscriptionsPerMessageMulti.get(messageClass1, messageClass2);
}
@Override
public ArrayList<Subscription> getExactAsArray(final Class<?> messageClass1, final Class<?> messageClass2,
final Class<?> messageClass3) {
return subscriptionsPerMessageMulti.get(messageClass1, messageClass2, messageClass3);
}
// can return null
@Override
public Subscription[] getExactAndSuper(final Class<?> messageClass) {
ArrayList<Subscription> collection = getExactAsArray(messageClass); // can return null
// now publish superClasses
final ArrayList<Subscription> superSubscriptions = this.subUtils.getSuperSubscriptions(messageClass, this); // NOT return null
if (collection != null) {
collection = new ArrayList<Subscription>(collection);
if (!superSubscriptions.isEmpty()) {
collection.addAll(superSubscriptions);
}
}
else if (!superSubscriptions.isEmpty()) {
collection = superSubscriptions;
}
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
return subscriptions;
}
else {
return null;
}
}
// can return null
@Override
public Subscription[] getExact(final Class<?> messageClass) {
final ArrayList<Subscription> collection = getExactAsArray(messageClass);
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
return subscriptions;
}
return null;
}
// can return null
@Override
public Subscription[] getExact(final Class<?> messageClass1, final Class<?> messageClass2) {
final ArrayList<Subscription> collection = getExactAsArray(messageClass1, messageClass2);
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
return subscriptions;
}
return null;
}
// can return null
@Override
public Subscription[] getExact(final Class<?> messageClass1, final Class<?> messageClass2, final Class<?> messageClass3) {
final ArrayList<Subscription> collection = getExactAsArray(messageClass1, messageClass2, messageClass3);
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
return subscriptions;
}
return null;
}
// can return null
@Override
public Subscription[] getExactAndSuper(final Class<?> messageClass1, final Class<?> messageClass2) {
ArrayList<Subscription> collection = getExactAsArray(messageClass1, messageClass2); // can return null
// now publish superClasses
final ArrayList<Subscription> superSubs = this.subUtils.getSuperSubscriptions(messageClass1, messageClass2,
this); // NOT return null
if (collection != null) {
collection = new ArrayList<Subscription>(collection);
if (!superSubs.isEmpty()) {
collection.addAll(superSubs);
}
}
else if (!superSubs.isEmpty()) {
collection = superSubs;
}
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
return subscriptions;
}
else {
return null;
}
}
// can return null
@Override
public Subscription[] getExactAndSuper(final Class<?> messageClass1, final Class<?> messageClass2, final Class<?> messageClass3) {
ArrayList<Subscription> collection = getExactAsArray(messageClass1, messageClass2, messageClass3); // can return null
// now publish superClasses
final ArrayList<Subscription> superSubs = this.subUtils.getSuperSubscriptions(messageClass1, messageClass2, messageClass3,
this); // NOT return null
if (collection != null) {
collection = new ArrayList<Subscription>(collection);
if (!superSubs.isEmpty()) {
collection.addAll(superSubs);
}
}
else if (!superSubs.isEmpty()) {
collection = superSubs;
}
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
return subscriptions;
}
else {
return null;
}
}
}

View File

@ -1,11 +1,11 @@
package dorkbox.util.messagebus.subscription;
public interface Publisher {
void publish(SubscriptionManager subscriptionManager, Object message1);
void publish(Object message1);
void publish(SubscriptionManager subscriptionManager, Object message1, Object message2);
void publish(Object message1, Object message2);
void publish(SubscriptionManager subscriptionManager, Object message1, Object message2, Object message3);
void publish(Object message1, Object message2, Object message3);
void publish(SubscriptionManager subscriptionManager, Object[] messages);
void publish(Object[] messages);
}

View File

@ -0,0 +1,40 @@
package dorkbox.util.messagebus.subscription;
import dorkbox.util.messagebus.utils.VarArgUtils;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
public interface Subscriber {
float LOAD_FACTOR = 0.8F;
AtomicBoolean getVarArgPossibility();
VarArgUtils getVarArgUtils();
void register(Class<?> listenerClass, int handlersSize, Subscription[] subsPerListener);
void shutdown();
void clearConcurrentCollections();
ArrayList<Subscription> getExactAsArray(Class<?> superClass);
ArrayList<Subscription> getExactAsArray(Class<?> superClass1, Class<?> superClass2);
ArrayList<Subscription> getExactAsArray(Class<?> superClass1, Class<?> superClass2, Class<?> superClass3);
Subscription[] getExact(Class<?> deadMessageClass);
Subscription[] getExact(Class<?> messageClass1, Class<?> messageClass2);
Subscription[] getExact(Class<?> messageClass1, Class<?> messageClass2, Class<?> messageClass3);
Subscription[] getExactAndSuper(Class<?> messageClass);
Subscription[] getExactAndSuper(Class<?> messageClass1, Class<?> messageClass2);
Subscription[] getExactAndSuper(Class<?> messageClass1, Class<?> messageClass2, Class<?> messageClass3);
}

View File

@ -1,19 +1,14 @@
package dorkbox.util.messagebus.subscription;
import com.esotericsoftware.reflectasm.MethodAccess;
import dorkbox.util.messagebus.common.HashMapTree;
import dorkbox.util.messagebus.common.MessageHandler;
import dorkbox.util.messagebus.common.StrongConcurrentSetV8;
import dorkbox.util.messagebus.dispatch.IHandlerInvocation;
import dorkbox.util.messagebus.dispatch.ReflectiveHandlerInvocation;
import dorkbox.util.messagebus.dispatch.SynchronizedHandlerInvocation;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
@ -161,107 +156,4 @@ public final class Subscription {
Subscription other = (Subscription) obj;
return this.ID == other.ID;
}
// inside a write lock
// add this subscription to each of the handled types
// to activate this sub for publication
public void registerMulti(final ErrorHandlingSupport errorHandler, final Class<?> listenerClass, final Map<Class<?>, ArrayList<Subscription>> subsPerMessageSingle,
final HashMapTree<Class<?>, ArrayList<Subscription>> subsPerMessageMulti,
final AtomicBoolean varArgPossibility) {
final Class<?>[] messageHandlerTypes = handlerMetadata.getHandledMessages();
final int size = messageHandlerTypes.length;
Class<?> type0 = messageHandlerTypes[0];
switch (size) {
case 0: {
errorHandler.handleError("Error while trying to subscribe class", listenerClass);
return;
}
case 1: {
ArrayList<Subscription> subs = subsPerMessageSingle.get(type0);
if (subs == null) {
subs = new ArrayList<Subscription>();
// is this handler able to accept var args?
if (handlerMetadata.getVarArgClass() != null) {
varArgPossibility.lazySet(true);
}
subsPerMessageSingle.put(type0, subs);
}
subs.add(this);
return;
}
case 2: {
ArrayList<Subscription> subs = subsPerMessageMulti.get(type0, messageHandlerTypes[1]);
if (subs == null) {
subs = new ArrayList<Subscription>();
subsPerMessageMulti.put(subs, type0, messageHandlerTypes[1]);
}
subs.add(this);
return;
}
case 3: {
ArrayList<Subscription> subs = subsPerMessageMulti.get(type0, messageHandlerTypes[1], messageHandlerTypes[2]);
if (subs == null) {
subs = new ArrayList<Subscription>();
subsPerMessageMulti.put(subs, type0, messageHandlerTypes[1], messageHandlerTypes[2]);
}
subs.add(this);
return;
}
default: {
ArrayList<Subscription> subs = subsPerMessageMulti.get(messageHandlerTypes);
if (subs == null) {
subs = new ArrayList<Subscription>();
subsPerMessageMulti.put(subs, messageHandlerTypes);
}
subs.add(this);
}
}
}
// inside a write lock
// add this subscription to each of the handled types
// to activate this sub for publication
public void registerFirst(final ErrorHandlingSupport errorHandler, final Class<?> listenerClass, final Map<Class<?>, ArrayList<Subscription>> subsPerMessageSingle,
final AtomicBoolean varArgPossibility) {
final Class<?>[] messageHandlerTypes = handlerMetadata.getHandledMessages();
final int size = messageHandlerTypes.length;
Class<?> type0 = messageHandlerTypes[0];
switch (size) {
case 0: {
errorHandler.handleError("Error while trying to subscribe class", listenerClass);
return;
}
default: {
ArrayList<Subscription> subs = subsPerMessageSingle.get(type0);
if (subs == null) {
subs = new ArrayList<Subscription>();
// is this handler able to accept var args?
if (handlerMetadata.getVarArgClass() != null) {
varArgPossibility.lazySet(true);
}
subsPerMessageSingle.put(type0, subs);
}
subs.add(this);
return;
}
}
}
}

View File

@ -1,127 +1,54 @@
package dorkbox.util.messagebus.subscription;
import dorkbox.util.messagebus.common.HashMapTree;
import dorkbox.util.messagebus.common.MessageHandler;
import dorkbox.util.messagebus.common.adapter.JavaVersionAdapter;
import dorkbox.util.messagebus.common.adapter.StampedLock;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.utils.ClassUtils;
import dorkbox.util.messagebus.utils.SubscriptionUtils;
import dorkbox.util.messagebus.utils.VarArgUtils;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* The subscription managers responsibility is to consistently handle and synchronize the message listener subscription process.
* It provides fast lookup of existing subscriptions when another instance of an already known
* listener is subscribed and takes care of creating new set of subscriptions for any unknown class that defines
* message handlers.
* <p>
* <p>
* Subscribe/Unsubscribe, while it is possible for them to be 100% concurrent (in relation to listeners per subscription),
* getting an accurate reflection of the number of subscriptions, or guaranteeing a "HAPPENS-BEFORE" relationship really
* complicates this, so it has been modified for subscribe/unsubscibe to be mutually exclusive.
* <p>
* Given these restrictions and complexity, it is much easier to create a MPSC blocking queue, and have a single thread
* manage sub/unsub.
*
* @author dorkbox, llc
* Date: 2/2/15
*/
public final class SubscriptionManager {
private static final float LOAD_FACTOR = 0.8F;
// remember already processed classes that do not contain any message handlers
private final Map<Class<?>, Boolean> nonListeners;
// shortcut publication if we know there is no possibility of varArg (ie: a method that has an array as arguments)
private final AtomicBoolean varArgPossibility = new AtomicBoolean(false);
// all subscriptions per message type. We perpetually KEEP the types, as this lowers the amount of locking required
// this is the primary list for dispatching a specific message
// write access is synchronized and happens only when a listener of a specific class is registered the first time
private final Map<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageSingle;
private final HashMapTree<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageMulti;
// all subscriptions per messageHandler type
// this map provides fast access for subscribing and unsubscribing
// write access is synchronized and happens very infrequently
// once a collection of subscriptions is stored it does not change
private final Map<Class<?>, Subscription[]> subscriptionsPerListener;
private final ClassUtils classUtils;
private final SubscriptionUtils subUtils;
private final VarArgUtils varArgUtils;
private final StampedLock lock = new StampedLock();
private final StampedLock lock;
private final int numberOfThreads;
private final Subscriber subscriber;
public SubscriptionManager(int numberOfThreads, final ErrorHandlingSupport errorHandler, boolean isMultiMode) {
public SubscriptionManager(final int numberOfThreads, final Subscriber subscriber, final StampedLock lock) {
this.numberOfThreads = numberOfThreads;
this.subscriber = subscriber;
this.lock = lock;
// modified ONLY during SUB/UNSUB
{
this.nonListeners = JavaVersionAdapter.get.concurrentMap(4, LOAD_FACTOR, numberOfThreads);
this.nonListeners = JavaVersionAdapter.get.concurrentMap(4, Subscriber.LOAD_FACTOR, numberOfThreads);
this.subscriptionsPerMessageSingle = JavaVersionAdapter.get.concurrentMap(32, LOAD_FACTOR, 1);
this.subscriptionsPerMessageMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>(4, LOAD_FACTOR);
// only used during SUB/UNSUB
this.subscriptionsPerListener = JavaVersionAdapter.get.concurrentMap(32, LOAD_FACTOR, 1);
}
classUtils = new ClassUtils(LOAD_FACTOR);
this.subUtils = new SubscriptionUtils(classUtils, this.subscriptionsPerMessageSingle, this.subscriptionsPerMessageMulti,
LOAD_FACTOR, numberOfThreads);
// var arg subscriptions keep track of which subscriptions can handle varArgs. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but improves performance of handlers
this.varArgUtils = new VarArgUtils(classUtils, this.subscriptionsPerMessageSingle, LOAD_FACTOR, numberOfThreads);
if (isMultiMode) {
subscriber = new Subscriber() {
@Override
public void register(final Class<?> listenerClass, final Subscription subscription,
final Map<Class<?>, ArrayList<Subscription>> subsPerMessageSingle,
final HashMapTree<Class<?>, ArrayList<Subscription>> subsPerMessageMulti,
final AtomicBoolean varArgPossibility) {
// now add this subscription to each of the handled types
subscription.registerMulti(errorHandler, listenerClass, subsPerMessageSingle, subsPerMessageMulti, varArgPossibility);
}
};
}
else {
subscriber = new Subscriber() {
@Override
public void register(final Class<?> listenerClass, final Subscription subscription,
final Map<Class<?>, ArrayList<Subscription>> subsPerMessageSingle,
final HashMapTree<Class<?>, ArrayList<Subscription>> subsPerMessageMulti,
final AtomicBoolean varArgPossibility) {
// only register based on the FIRST parameter
subscription.registerFirst(errorHandler, listenerClass, subsPerMessageSingle, varArgPossibility);
}
};
}
// only used during SUB/UNSUB, in a rw lock
this.subscriptionsPerListener = JavaVersionAdapter.get.concurrentMap(32, Subscriber.LOAD_FACTOR, 1);
}
public void shutdown() {
this.nonListeners.clear();
this.subscriptionsPerMessageSingle.clear();
this.subscriptionsPerMessageMulti.clear();
subscriber.shutdown();
this.subscriptionsPerListener.clear();
clearConcurrentCollections();
this.classUtils.clear();
}
public void subscribe(final Object listener) {
@ -137,7 +64,7 @@ public final class SubscriptionManager {
}
// these are concurrent collections
clearConcurrentCollections();
subscriber.clearConcurrentCollections();
Subscription[] subscriptions = getListenerSubs(listenerClass);
@ -155,9 +82,6 @@ public final class SubscriptionManager {
return;
}
final Map<Class<?>, ArrayList<Subscription>> subsPerMessageSingle = this.subscriptionsPerMessageSingle;
final HashMapTree<Class<?>, ArrayList<Subscription>> subsPerMessageMulti = this.subscriptionsPerMessageMulti;
final Subscription[] subsPerListener = new Subscription[handlersSize];
@ -169,14 +93,13 @@ public final class SubscriptionManager {
messageHandler = messageHandlers[i];
// create the subscription
subscription = new Subscription(messageHandler, LOAD_FACTOR, numberOfThreads);
subscription = new Subscription(messageHandler, Subscriber.LOAD_FACTOR, numberOfThreads);
subscription.subscribe(listener);
subsPerListener[i] = subscription; // activates this sub for sub/unsub
}
final Map<Class<?>, Subscription[]> subsPerListenerMap = this.subscriptionsPerListener;
final AtomicBoolean varArgPossibility = this.varArgPossibility;
// now write lock for the least expensive part. This is a deferred "double checked lock", but is necessary because
// of the huge number of reads compared to writes.
@ -188,12 +111,7 @@ public final class SubscriptionManager {
// it was still null, so we actually have to create the rest of the subs
if (subscriptions == null) {
for (int i = 0; i < handlersSize; i++) {
subscription = subsPerListener[i];
// activate this subscription for publication
subscriber.register(listenerClass, subscription, subsPerMessageSingle, subsPerMessageMulti, varArgPossibility);
}
subscriber.register(listenerClass, handlersSize, subsPerListener);
subsPerListenerMap.put(listenerClass, subsPerListener);
lock.unlockWrite(stamp);
@ -227,7 +145,7 @@ public final class SubscriptionManager {
}
// these are concurrent collections
clearConcurrentCollections();
subscriber.clearConcurrentCollections();
final Subscription[] subscriptions = getListenerSubs(listenerClass);
if (subscriptions != null) {
@ -240,11 +158,6 @@ public final class SubscriptionManager {
}
}
private void clearConcurrentCollections() {
this.subUtils.clear();
this.varArgUtils.clear();
}
private Subscription[] getListenerSubs(final Class<?> listenerClass) {
final StampedLock lock = this.lock;
@ -255,221 +168,4 @@ public final class SubscriptionManager {
lock.unlockRead(stamp);
return subscriptions;
}
// can return null
public Subscription[] getSubscriptionsExact(final Class<?> messageClass) {
final StampedLock lock = this.lock;
final long stamp = lock.readLock();
final ArrayList<Subscription> collection = this.subscriptionsPerMessageSingle.get(messageClass);
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
lock.unlockRead(stamp);
return subscriptions;
}
lock.unlockRead(stamp);
return null;
}
// can return null
public Subscription[] getSubscriptionsExact(final Class<?> messageClass1, final Class<?> messageClass2) {
final StampedLock lock = this.lock;
final long stamp = lock.readLock();
final ArrayList<Subscription> collection = this.subscriptionsPerMessageMulti.get(messageClass1, messageClass2);
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
lock.unlockRead(stamp);
return subscriptions;
}
lock.unlockRead(stamp);
return null;
}
// can return null
public Subscription[] getSubscriptionsExact(final Class<?> messageClass1, final Class<?> messageClass2, final Class<?> messageClass3) {
final StampedLock lock = this.lock;
final long stamp = lock.readLock();
final ArrayList<Subscription> collection = this.subscriptionsPerMessageMulti.get(messageClass1, messageClass2, messageClass3);
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
lock.unlockRead(stamp);
return subscriptions;
}
lock.unlockRead(stamp);
return null;
}
// can return null
// public because it is also used by unit tests
public Subscription[] getSubscriptionsExactAndSuper(final Class<?> messageClass) {
final StampedLock lock = this.lock;
final long stamp = lock.readLock();
final Subscription[] subscriptions = getSubscriptionsExactAndSuper_NoLock(messageClass);
lock.unlockRead(stamp);
return subscriptions;
}
// can return null
public Subscription[] getSubscriptionsExactAndSuper(final Class<?> messageClass1, final Class<?> messageClass2) {
final StampedLock lock = this.lock;
final long stamp = lock.readLock();
final Subscription[] subscriptions = getSubscriptionsExactAndSuper_NoLock(messageClass1, messageClass2);
lock.unlockRead(stamp);
return subscriptions;
}
// can return null
public Subscription[] getSubscriptionsExactAndSuper(final Class<?> messageClass1, final Class<?> messageClass2,
final Class<?> messageClass3) {
final StampedLock lock = this.lock;
final long stamp = lock.readLock();
final Subscription[] subscriptions = getSubscriptionsExactAndSuper_NoLock(messageClass1, messageClass2, messageClass3);
lock.unlockRead(stamp);
return subscriptions;
}
// can return null
public Subscription[] getSubscriptionsExactAndSuper_NoLock(final Class<?> messageClass) {
ArrayList<Subscription> collection = this.subscriptionsPerMessageSingle.get(messageClass); // can return null
// now publish superClasses
final ArrayList<Subscription> superSubscriptions = this.subUtils.getSuperSubscriptions(messageClass); // NOT return null
if (collection != null) {
collection = new ArrayList<Subscription>(collection);
if (!superSubscriptions.isEmpty()) {
collection.addAll(superSubscriptions);
}
}
else if (!superSubscriptions.isEmpty()) {
collection = superSubscriptions;
}
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
return subscriptions;
}
else {
return null;
}
}
// can return null
public Subscription[] getSubscriptionsExactAndSuper_NoLock(final Class<?> messageClass1, final Class<?> messageClass2) {
ArrayList<Subscription> collection = this.subscriptionsPerMessageMulti.get(messageClass1, messageClass2); // can return null
// now publish superClasses
final ArrayList<Subscription> superSubs = this.subUtils.getSuperSubscriptions(messageClass1, messageClass2); // NOT return null
if (collection != null) {
collection = new ArrayList<Subscription>(collection);
if (!superSubs.isEmpty()) {
collection.addAll(superSubs);
}
}
else if (!superSubs.isEmpty()) {
collection = superSubs;
}
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
return subscriptions;
}
else {
return null;
}
}
// can return null
public Subscription[] getSubscriptionsExactAndSuper_NoLock(final Class<?> messageClass1, final Class<?> messageClass2,
final Class<?> messageClass3) {
ArrayList<Subscription> collection = this.subscriptionsPerMessageMulti.get(messageClass1, messageClass2,
messageClass3); // can return null
// now publish superClasses
final ArrayList<Subscription> superSubs = this.subUtils.getSuperSubscriptions(messageClass1, messageClass2,
messageClass3); // NOT return null
if (collection != null) {
collection = new ArrayList<Subscription>(collection);
if (!superSubs.isEmpty()) {
collection.addAll(superSubs);
}
}
else if (!superSubs.isEmpty()) {
collection = superSubs;
}
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
return subscriptions;
}
else {
return null;
}
}
public boolean canPublishVarArg() {
return varArgPossibility.get();
}
public VarArgUtils getVarArgUtils() {
return varArgUtils;
}
public StampedLock getLock() {
return lock;
}
interface Subscriber {
void register(final Class<?> listenerClass, Subscription subscription,
Map<Class<?>, ArrayList<Subscription>> subsPerMessageSingle,
HashMapTree<Class<?>, ArrayList<Subscription>> subsPerMessageMulti, AtomicBoolean varArgPossibility);
}
// public static final Comparator<Subscription> SubscriptionByPriorityDesc = new Comparator<Subscription>() {
// @Override
// public int compare(Subscription o1, Subscription o2) {
//// int byPriority = ((Integer)o2.getPriority()).compareTo(o1.getPriority());
//// return byPriority == 0 ? o2.id.compareTo(o1.id) : byPriority;
// if (o2.ID > o1.ID) {
// return 1;
// } else if (o2.ID < o1.ID) {
// return -1;
// } else {
// return 0;
// }
// }
// };
}

View File

@ -1,7 +1,8 @@
package dorkbox.util.messagebus.utils;
import dorkbox.util.messagebus.common.HashMapTree;
import dorkbox.util.messagebus.common.adapter.ConcurrentHashMapV8;
import dorkbox.util.messagebus.common.adapter.JavaVersionAdapter;
import dorkbox.util.messagebus.subscription.Subscriber;
import dorkbox.util.messagebus.subscription.Subscription;
import java.util.ArrayList;
@ -16,24 +17,14 @@ public final class SubscriptionUtils {
private final Map<Class<?>, ArrayList<Subscription>> superClassSubscriptions;
private final HashMapTree<Class<?>, ArrayList<Subscription>> superClassSubscriptionsMulti;
private final Map<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageSingle;
private final HashMapTree<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageMulti;
public SubscriptionUtils(final ClassUtils superClass, Map<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageSingle,
final HashMapTree<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageMulti, final float loadFactor,
final int stripeSize) {
public SubscriptionUtils(final ClassUtils superClass, final float loadFactor) {
this.superClass = superClass;
this.subscriptionsPerMessageSingle = subscriptionsPerMessageSingle;
this.subscriptionsPerMessageMulti = subscriptionsPerMessageMulti;
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but improves performance of handlers
this.superClassSubscriptions = new ConcurrentHashMapV8<Class<?>, ArrayList<Subscription>>();
this.superClassSubscriptions = JavaVersionAdapter.get.concurrentMap(8, loadFactor, 1);
this.superClassSubscriptionsMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>(4, loadFactor);
}
@ -45,14 +36,14 @@ public final class SubscriptionUtils {
/**
* Returns an array COPY of the super subscriptions for the specified type.
* <p>
* <p/>
* This ALSO checks to see if the superClass accepts subtypes.
* <p>
* <p/>
* protected by read lock by caller
*
* @return CAN NOT RETURN NULL
*/
public ArrayList<Subscription> getSuperSubscriptions(final Class<?> clazz) {
public ArrayList<Subscription> getSuperSubscriptions(final Class<?> clazz, final Subscriber subscriber) {
// whenever our subscriptions change, this map is cleared.
final Map<Class<?>, ArrayList<Subscription>> local = this.superClassSubscriptions;
@ -60,7 +51,6 @@ public final class SubscriptionUtils {
if (subs == null) {
// types was not empty, so collect subscriptions for each type and collate them
final Map<Class<?>, ArrayList<Subscription>> local2 = this.subscriptionsPerMessageSingle;
// save the subscriptions
final Class<?>[] superClasses = this.superClass.getSuperClasses(clazz); // never returns null, cached response
@ -75,7 +65,7 @@ public final class SubscriptionUtils {
for (int i = 0; i < length; i++) {
superClass = superClasses[i];
superSubs = local2.get(superClass);
superSubs = subscriber.getExactAsArray(superClass);
if (superSubs != null) {
superSubLength = superSubs.size();
@ -98,22 +88,21 @@ public final class SubscriptionUtils {
/**
* Returns an array COPY of the super subscriptions for the specified type.
* <p>
* <p/>
* This ALSO checks to see if the superClass accepts subtypes.
* <p>
* <p/>
* protected by read lock by caller
*
* @return CAN NOT RETURN NULL
*/
public ArrayList<Subscription> getSuperSubscriptions(final Class<?> clazz1, final Class<?> clazz2) {
public ArrayList<Subscription> getSuperSubscriptions(final Class<?> clazz1, final Class<?> clazz2, final Subscriber subscriber) {
// whenever our subscriptions change, this map is cleared.
final HashMapTree<Class<?>, ArrayList<Subscription>> local = this.superClassSubscriptionsMulti;
final HashMapTree<Class<?>, ArrayList<Subscription>> cached = this.superClassSubscriptionsMulti;
ArrayList<Subscription> subs = local.get(clazz1, clazz2);
ArrayList<Subscription> subs = cached.get(clazz1, clazz2);
if (subs == null) {
// types was not empty, so collect subscriptions for each type and collate them
final HashMapTree<Class<?>, ArrayList<Subscription>> local2 = this.subscriptionsPerMessageMulti;
// save the subscriptions
final Class<?>[] superClasses1 = this.superClass.getSuperClasses(clazz1); // never returns null, cached response
@ -145,7 +134,7 @@ public final class SubscriptionUtils {
continue;
}
superSubs = local2.get(superClass1, superClass2);
superSubs = subscriber.getExactAsArray(superClass1, superClass2);
if (superSubs != null) {
for (int k = 0; k < superSubs.size(); k++) {
sub = superSubs.get(k);
@ -158,7 +147,7 @@ public final class SubscriptionUtils {
}
}
subs.trimToSize();
local.put(subs, clazz1, clazz2);
cached.put(subs, clazz1, clazz2);
}
return subs;
@ -166,14 +155,15 @@ public final class SubscriptionUtils {
/**
* Returns an array COPY of the super subscriptions for the specified type.
* <p>
* <p/>
* This ALSO checks to see if the superClass accepts subtypes.
* <p>
* <p/>
* protected by read lock by caller
*
* @return CAN NOT RETURN NULL
*/
public ArrayList<Subscription> getSuperSubscriptions(final Class<?> clazz1, final Class<?> clazz2, final Class<?> clazz3) {
public ArrayList<Subscription> getSuperSubscriptions(final Class<?> clazz1, final Class<?> clazz2, final Class<?> clazz3,
final Subscriber subscriber) {
// whenever our subscriptions change, this map is cleared.
final HashMapTree<Class<?>, ArrayList<Subscription>> local = this.superClassSubscriptionsMulti;
@ -181,7 +171,6 @@ public final class SubscriptionUtils {
if (subs == null) {
// types was not empty, so collect subscriptions for each type and collate them
final HashMapTree<Class<?>, ArrayList<Subscription>> local2 = this.subscriptionsPerMessageMulti;
// save the subscriptions
final Class<?>[] superClasses1 = this.superClass.getSuperClasses(clazz1); // never returns null, cached response
@ -224,7 +213,7 @@ public final class SubscriptionUtils {
continue;
}
superSubs = local2.get(superClass1, superClass2);
superSubs = subscriber.getExactAsArray(superClass1, superClass2, superClass3);
if (superSubs != null) {
for (int m = 0; m < superSubs.size(); m++) {
sub = superSubs.get(m);

View File

@ -1,8 +1,9 @@
package dorkbox.util.messagebus.utils;
import dorkbox.util.messagebus.common.adapter.JavaVersionAdapter;
import dorkbox.util.messagebus.common.HashMapTree;
import dorkbox.util.messagebus.common.MessageHandler;
import dorkbox.util.messagebus.common.adapter.JavaVersionAdapter;
import dorkbox.util.messagebus.subscription.Subscriber;
import dorkbox.util.messagebus.subscription.Subscription;
import java.util.ArrayList;
@ -16,19 +17,16 @@ public final class VarArgUtils {
private final HashMapTree<Class<?>, ArrayList<Subscription>> varArgSuperSubscriptionsMulti;
private final ClassUtils superClassUtils;
private final Map<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageSingle;
public VarArgUtils(final ClassUtils superClassUtils, final Map<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageSingle,
final float loadFactor, final int stripeSize) {
public VarArgUtils(final ClassUtils superClassUtils, final float loadFactor) {
this.superClassUtils = superClassUtils;
this.subscriptionsPerMessageSingle = subscriptionsPerMessageSingle;
this.varArgSubscriptionsSingle = JavaVersionAdapter.get.concurrentMap(16, loadFactor, stripeSize);
this.varArgSubscriptionsSingle = JavaVersionAdapter.get.concurrentMap(16, loadFactor, 1);
this.varArgSubscriptionsMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>(4, loadFactor);
this.varArgSuperSubscriptionsSingle = JavaVersionAdapter.get.concurrentMap(16, loadFactor, stripeSize);
this.varArgSuperSubscriptionsSingle = JavaVersionAdapter.get.concurrentMap(16, loadFactor, 1);
this.varArgSuperSubscriptionsMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>(4, loadFactor);
}
@ -45,7 +43,7 @@ public final class VarArgUtils {
// CAN NOT RETURN NULL
// check to see if the messageType can convert/publish to the "array" version, without the hit to JNI
// and then, returns the array'd version subscriptions
public Subscription[] getVarArgSubscriptions(final Class<?> messageClass) {
public Subscription[] getVarArgSubscriptions(final Class<?> messageClass, final Subscriber subscriber) {
// whenever our subscriptions change, this map is cleared.
final Map<Class<?>, ArrayList<Subscription>> local = this.varArgSubscriptionsSingle;
@ -55,7 +53,7 @@ public final class VarArgUtils {
// this gets (and caches) our array type. This is never cleared.
final Class<?> arrayVersion = this.superClassUtils.getArrayClass(messageClass);
ArrayList<Subscription> subs = this.subscriptionsPerMessageSingle.get(arrayVersion);
final ArrayList<Subscription> subs = subscriber.getExactAsArray(arrayVersion);
if (subs != null) {
final int length = subs.size();
varArgSubs = new ArrayList<Subscription>(length);
@ -70,7 +68,8 @@ public final class VarArgUtils {
}
local.put(messageClass, varArgSubs);
} else {
}
else {
varArgSubs = new ArrayList<Subscription>(0);
}
}
@ -85,16 +84,16 @@ public final class VarArgUtils {
// CAN NOT RETURN NULL
// check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI
// and then, returns the array'd version superclass subscriptions
public Subscription[] getVarArgSuperSubscriptions(final Class<?> messageClass) {
final ArrayList<Subscription> subs = getVarArgSuperSubscriptions_List(messageClass);
public Subscription[] getVarArgSuperSubscriptions(final Class<?> messageClass, final Subscriber subscriber) {
final ArrayList<Subscription> subs = getVarArgSuperSubscriptions_List(messageClass, subscriber);
final Subscription[] subscriptions = new Subscription[subs.size()];
subs.toArray(subscriptions);
return subscriptions;
final Subscription[] returnedSubscriptions = new Subscription[subs.size()];
subs.toArray(returnedSubscriptions);
return returnedSubscriptions;
}
// CAN NOT RETURN NULL
private ArrayList<Subscription> getVarArgSuperSubscriptions_List(final Class<?> messageClass) {
private ArrayList<Subscription> getVarArgSuperSubscriptions_List(final Class<?> messageClass, final Subscriber subscriber) {
// whenever our subscriptions change, this map is cleared.
final Map<Class<?>, ArrayList<Subscription>> local = this.varArgSuperSubscriptionsSingle;
@ -122,7 +121,7 @@ public final class VarArgUtils {
for (int i = 0; i < typesLength; i++) {
type = types[i];
subs = this.subscriptionsPerMessageSingle.get(type);
subs = subscriber.getExactAsArray(type);
if (subs != null) {
length = subs.size();
@ -136,7 +135,9 @@ public final class VarArgUtils {
varArgSuperSubs.add(sub);
}
}
}
else {
varArgSuperSubs = new ArrayList<Subscription>(0);
}
}
@ -150,7 +151,7 @@ public final class VarArgUtils {
// CAN NOT RETURN NULL
// check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI
// and then, returns the array'd version superclass subscriptions
public Subscription[] getVarArgSuperSubscriptions(final Class<?> messageClass1, final Class<?> messageClass2) {
public Subscription[] getVarArgSuperSubscriptions(final Class<?> messageClass1, final Class<?> messageClass2, final Subscriber subscriber) {
// whenever our subscriptions change, this map is cleared.
final HashMapTree<Class<?>, ArrayList<Subscription>> local = this.varArgSuperSubscriptionsMulti;
@ -159,8 +160,8 @@ public final class VarArgUtils {
if (subs == null) {
// the message class types are not the same, so look for a common superClass varArg subscription.
// this is to publish to object[] (or any class[]) handler that is common among all superTypes of the messages
final ArrayList<Subscription> varargSuperSubscriptions1 = getVarArgSuperSubscriptions_List(messageClass1);
final ArrayList<Subscription> varargSuperSubscriptions2 = getVarArgSuperSubscriptions_List(messageClass2);
final ArrayList<Subscription> varargSuperSubscriptions1 = getVarArgSuperSubscriptions_List(messageClass1, subscriber);
final ArrayList<Subscription> varargSuperSubscriptions2 = getVarArgSuperSubscriptions_List(messageClass2, subscriber);
subs = ClassUtils.findCommon(varargSuperSubscriptions1, varargSuperSubscriptions2);
@ -168,9 +169,9 @@ public final class VarArgUtils {
local.put(subs, messageClass1, messageClass2);
}
final Subscription[] subscriptions = new Subscription[subs.size()];
subs.toArray(subscriptions);
return subscriptions;
final Subscription[] returnedSubscriptions = new Subscription[subs.size()];
subs.toArray(returnedSubscriptions);
return returnedSubscriptions;
}
@ -178,7 +179,7 @@ public final class VarArgUtils {
// check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI
// and then, returns the array'd version superclass subscriptions
public Subscription[] getVarArgSuperSubscriptions(final Class<?> messageClass1, final Class<?> messageClass2,
final Class<?> messageClass3) {
final Class<?> messageClass3, final Subscriber subscriber) {
// whenever our subscriptions change, this map is cleared.
final HashMapTree<Class<?>, ArrayList<Subscription>> local = this.varArgSuperSubscriptionsMulti;
@ -187,9 +188,9 @@ public final class VarArgUtils {
if (subs == null) {
// the message class types are not the same, so look for a common superClass varArg subscription.
// this is to publish to object[] (or any class[]) handler that is common among all superTypes of the messages
final ArrayList<Subscription> varargSuperSubscriptions1 = getVarArgSuperSubscriptions_List(messageClass1);
final ArrayList<Subscription> varargSuperSubscriptions2 = getVarArgSuperSubscriptions_List(messageClass2);
final ArrayList<Subscription> varargSuperSubscriptions3 = getVarArgSuperSubscriptions_List(messageClass3);
final ArrayList<Subscription> varargSuperSubscriptions1 = getVarArgSuperSubscriptions_List(messageClass1, subscriber);
final ArrayList<Subscription> varargSuperSubscriptions2 = getVarArgSuperSubscriptions_List(messageClass2, subscriber);
final ArrayList<Subscription> varargSuperSubscriptions3 = getVarArgSuperSubscriptions_List(messageClass3, subscriber);
subs = ClassUtils.findCommon(varargSuperSubscriptions1, varargSuperSubscriptions2);
subs = ClassUtils.findCommon(subs, varargSuperSubscriptions3);
@ -198,10 +199,8 @@ public final class VarArgUtils {
local.put(subs, messageClass1, messageClass2, messageClass3);
}
final Subscription[] subscriptions = new Subscription[subs.size()];
subs.toArray(subscriptions);
return subscriptions;
final Subscription[] returnedSubscriptions = new Subscription[subs.size()];
subs.toArray(returnedSubscriptions);
return returnedSubscriptions;
}
}

View File

@ -18,11 +18,11 @@ public class MultiMessageTest extends MessageBusTest {
private static AtomicInteger count = new AtomicInteger(0);
@Test
public void testMultiMessageSending(){
public void testMultiMessageSending() {
IMessageBus bus = new MessageBus();
bus.start();
Listener listener1 = new Listener();
MultiListener listener1 = new MultiListener();
bus.subscribe(listener1);
bus.unsubscribe(listener1);
@ -68,8 +68,60 @@ public class MultiMessageTest extends MessageBusTest {
bus.shutdown();
}
@Test
public void testFirstArgMultiMessageSending() {
IMessageBus bus = new MessageBus(IMessageBus.PublishMode.ExactWithSuperTypes, IMessageBus.SubscribeMode.FirstArg,
Runtime.getRuntime().availableProcessors() / 2);
bus.start();
FirstListener listener = new FirstListener();
bus.subscribe(listener);
bus.unsubscribe(listener);
bus.publish("s");
bus.publish("s", "s");
bus.publish("s", "s", "s");
bus.publish(1, "s");
bus.publish(1, 2, "s");
bus.publish(new Integer[] {1, 2, 3, 4, 5, 6});
assertEquals(0, count.get());
bus.subscribe(listener);
bus.publish("s"); // 4
bus.publish("s", "s"); // 3
bus.publish("s", "s", "s"); // 3
bus.publish(1, "s"); // 1
bus.publish(1, 2, "s"); // 2
bus.publish(new Integer[] {1, 2, 3, 4, 5, 6}); // 2
assertEquals(15, count.get());
count.set(0);
bus.publishAsync("s");
bus.publishAsync("s", "s");
bus.publishAsync("s", "s", "s");
bus.publish(1, "s");
bus.publishAsync(1, 2, "s");
bus.publishAsync(new Integer[] {1, 2, 3, 4, 5, 6});
while (bus.hasPendingMessages()) {
try {
Thread.sleep(ConcurrentUnits);
} catch (InterruptedException e) {
}
}
assertEquals(13, count.get());
bus.shutdown();
}
@SuppressWarnings("unused")
public static class Listener {
public static class MultiListener {
@Handler
public void handleSync(Object o) {
count.getAndIncrement();
@ -100,7 +152,7 @@ public class MultiMessageTest extends MessageBusTest {
System.err.println("match Integer, Integer, String");
}
@Handler(acceptVarargs=true)
@Handler(acceptVarargs = true)
public void handleSync(String... o) {
count.getAndIncrement();
System.err.println("match String[]");
@ -112,10 +164,59 @@ public class MultiMessageTest extends MessageBusTest {
System.err.println("match Integer[]");
}
@Handler(acceptVarargs=true)
@Handler(acceptVarargs = true)
public void handleSync(Object... o) {
count.getAndIncrement();
System.err.println("match Object[]");
}
}
public static class FirstListener {
@Handler
public void handleSync(Object o) {
count.getAndIncrement();
System.err.println("match Object");
}
@Handler
public void handleSync(String o1) {
count.getAndIncrement();
System.err.println("match String");
}
@Handler
public void handleSync(String o1, String o2) {
count.getAndIncrement();
System.err.println("match String, String");
}
// @Handler
// public void handleSync(String o1, String o2, String o3) {
// count.getAndIncrement();
// System.err.println("match String, String, String");
// }
//
// @Handler
// public void handleSync(Integer o1, Integer o2, String o3) {
// count.getAndIncrement();
// System.err.println("match Integer, Integer, String");
// }
//
// @Handler(acceptVarargs = true)
// public void handleSync(String... o) {
// count.getAndIncrement();
// System.err.println("match String[]");
// }
//
// @Handler
// public void handleSync(Integer... o) {
// count.getAndIncrement();
// System.err.println("match Integer[]");
// }
//
// @Handler(acceptVarargs = true)
// public void handleSync(Object... o) {
// count.getAndIncrement();
// System.err.println("match Object[]");
// }
}
}

View File

@ -1,10 +1,15 @@
package dorkbox.util.messagebus;
import dorkbox.util.messagebus.common.*;
import dorkbox.util.messagebus.common.adapter.StampedLock;
import dorkbox.util.messagebus.error.DefaultErrorHandler;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.listeners.*;
import dorkbox.util.messagebus.messages.*;
import dorkbox.util.messagebus.subscription.MultiArgSubscriber;
import dorkbox.util.messagebus.subscription.Subscriber;
import dorkbox.util.messagebus.subscription.SubscriptionManager;
import dorkbox.util.messagebus.utils.ClassUtils;
import org.junit.Test;
/**
@ -19,111 +24,128 @@ public class SubscriptionManagerTest extends AssertSupport {
private static final int InstancesPerListener = 5000;
@Test public void testIMessageListener() {
@Test
public void testIMessageListener() {
ListenerFactory listeners = listeners(IMessageListener.DefaultListener.class, IMessageListener.DisabledListener.class,
IMessageListener.NoSubtypesListener.class);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners).listener(IMessageListener.DefaultListener.class)
.handles(IMessage.class, AbstractMessage.class, IMultipartMessage.class, StandardMessage.class, MessageTypes.class)
.listener(IMessageListener.NoSubtypesListener.class).handles(IMessage.class);
.handles(IMessage.class, AbstractMessage.class,
IMultipartMessage.class,
StandardMessage.class,
MessageTypes.class).listener(
IMessageListener.NoSubtypesListener.class).handles(IMessage.class);
runTestWith(listeners, expectedSubscriptions);
}
@Test public void testAbstractMessageListener() {
@Test
public void testAbstractMessageListener() {
ListenerFactory listeners = listeners(AbstractMessageListener.DefaultListener.class, AbstractMessageListener.DisabledListener.class,
AbstractMessageListener.NoSubtypesListener.class);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners)
.listener(AbstractMessageListener.NoSubtypesListener.class).handles(AbstractMessage.class)
.listener(AbstractMessageListener.DefaultListener.class).handles(StandardMessage.class, AbstractMessage.class);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners).listener(
AbstractMessageListener.NoSubtypesListener.class).handles(AbstractMessage.class).listener(
AbstractMessageListener.DefaultListener.class).handles(StandardMessage.class, AbstractMessage.class);
runTestWith(listeners, expectedSubscriptions);
}
@Test public void testMessagesListener() {
@Test
public void testMessagesListener() {
ListenerFactory listeners = listeners(MessagesListener.DefaultListener.class, MessagesListener.DisabledListener.class,
MessagesListener.NoSubtypesListener.class);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners)
.listener(MessagesListener.NoSubtypesListener.class).handles(MessageTypes.class)
.listener(MessagesListener.DefaultListener.class).handles(MessageTypes.class);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners).listener(
MessagesListener.NoSubtypesListener.class).handles(MessageTypes.class).listener(
MessagesListener.DefaultListener.class).handles(MessageTypes.class);
runTestWith(listeners, expectedSubscriptions);
}
@Test public void testMultipartMessageListener() {
@Test
public void testMultipartMessageListener() {
ListenerFactory listeners = listeners(MultipartMessageListener.DefaultListener.class,
MultipartMessageListener.DisabledListener.class,
MultipartMessageListener.NoSubtypesListener.class);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners)
.listener(MultipartMessageListener.NoSubtypesListener.class).handles(MultipartMessage.class)
.listener(MultipartMessageListener.DefaultListener.class).handles(MultipartMessage.class);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners).listener(
MultipartMessageListener.NoSubtypesListener.class).handles(MultipartMessage.class).listener(
MultipartMessageListener.DefaultListener.class).handles(MultipartMessage.class);
runTestWith(listeners, expectedSubscriptions);
}
@Test public void testIMultipartMessageListener() {
@Test
public void testIMultipartMessageListener() {
ListenerFactory listeners = listeners(IMultipartMessageListener.DefaultListener.class,
IMultipartMessageListener.DisabledListener.class,
IMultipartMessageListener.NoSubtypesListener.class);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners)
.listener(IMultipartMessageListener.NoSubtypesListener.class).handles(IMultipartMessage.class)
.listener(IMultipartMessageListener.DefaultListener.class).handles(MultipartMessage.class, IMultipartMessage.class);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners).listener(
IMultipartMessageListener.NoSubtypesListener.class).handles(IMultipartMessage.class).listener(
IMultipartMessageListener.DefaultListener.class).handles(MultipartMessage.class, IMultipartMessage.class);
runTestWith(listeners, expectedSubscriptions);
}
@Test public void testStandardMessageListener() {
@Test
public void testStandardMessageListener() {
ListenerFactory listeners = listeners(StandardMessageListener.DefaultListener.class, StandardMessageListener.DisabledListener.class,
StandardMessageListener.NoSubtypesListener.class);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners)
.listener(StandardMessageListener.NoSubtypesListener.class).handles(StandardMessage.class)
.listener(StandardMessageListener.DefaultListener.class).handles(StandardMessage.class);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners).listener(
StandardMessageListener.NoSubtypesListener.class).handles(StandardMessage.class).listener(
StandardMessageListener.DefaultListener.class).handles(StandardMessage.class);
runTestWith(listeners, expectedSubscriptions);
}
@Test public void testICountableListener() {
@Test
public void testICountableListener() {
ListenerFactory listeners = listeners(ICountableListener.DefaultListener.class, ICountableListener.DisabledListener.class,
ICountableListener.NoSubtypesListener.class);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners)
.listener(ICountableListener.DefaultListener.class).handles(ICountable.class)
.listener(ICountableListener.DefaultListener.class)
.handles(MultipartMessage.class, IMultipartMessage.class, ICountable.class, StandardMessage.class);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners).listener(
ICountableListener.DefaultListener.class).handles(ICountable.class).listener(
ICountableListener.DefaultListener.class).handles(MultipartMessage.class, IMultipartMessage.class, ICountable.class,
StandardMessage.class);
runTestWith(listeners, expectedSubscriptions);
}
@Test public void testMultipleMessageListeners() {
@Test
public void testMultipleMessageListeners() {
ListenerFactory listeners = listeners(ICountableListener.DefaultListener.class, ICountableListener.DisabledListener.class,
IMultipartMessageListener.DefaultListener.class,
IMultipartMessageListener.DisabledListener.class, MessagesListener.DefaultListener.class,
MessagesListener.DisabledListener.class);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners)
.listener(ICountableListener.DefaultListener.class)
.handles(MultipartMessage.class, IMultipartMessage.class, ICountable.class, StandardMessage.class)
.listener(IMultipartMessageListener.DefaultListener.class).handles(MultipartMessage.class, IMultipartMessage.class)
.listener(MessagesListener.DefaultListener.class).handles(MessageTypes.class);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners).listener(
ICountableListener.DefaultListener.class).handles(MultipartMessage.class, IMultipartMessage.class, ICountable.class,
StandardMessage.class).listener(
IMultipartMessageListener.DefaultListener.class).handles(MultipartMessage.class, IMultipartMessage.class).listener(
MessagesListener.DefaultListener.class).handles(MessageTypes.class);
runTestWith(listeners, expectedSubscriptions);
}
@Test public void testOverloadedMessageHandlers() {
@Test
public void testOverloadedMessageHandlers() {
ListenerFactory listeners = listeners(Overloading.ListenerBase.class, Overloading.ListenerSub.class);
SubscriptionManager subscriptionManager = new SubscriptionManager(1, new DefaultErrorHandler(), true);
final ErrorHandlingSupport errorHandler = new DefaultErrorHandler();
final StampedLock lock = new StampedLock();
final ClassUtils classUtils = new ClassUtils(Subscriber.LOAD_FACTOR);
final Subscriber subscriber = new MultiArgSubscriber(errorHandler, classUtils);
SubscriptionManager subscriptionManager = new SubscriptionManager(1, subscriber, lock);
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), 1);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners).listener(Overloading.ListenerBase.class)
.handles(Overloading.TestMessageA.class, Overloading.TestMessageA.class).listener(Overloading.ListenerSub.class)
.handles(Overloading.TestMessageA.class, Overloading.TestMessageA.class, Overloading.TestMessageB.class);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners).listener(Overloading.ListenerBase.class).handles(
Overloading.TestMessageA.class, Overloading.TestMessageA.class).listener(Overloading.ListenerSub.class).handles(
Overloading.TestMessageA.class, Overloading.TestMessageA.class, Overloading.TestMessageB.class);
runTestWith(listeners, expectedSubscriptions);
}
@ -137,17 +159,22 @@ public class SubscriptionManagerTest extends AssertSupport {
}
private void runTestWith(final ListenerFactory listeners, final SubscriptionValidator validator) {
final SubscriptionManager subscriptionManager = new SubscriptionManager(1, new DefaultErrorHandler(), true);
final ErrorHandlingSupport errorHandler = new DefaultErrorHandler();
final StampedLock lock = new StampedLock();
final ClassUtils classUtils = new ClassUtils(Subscriber.LOAD_FACTOR);
final Subscriber subscriber = new MultiArgSubscriber(errorHandler, classUtils);
final SubscriptionManager subscriptionManager = new SubscriptionManager(1, subscriber, lock);
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), 1);
validator.validate(subscriptionManager);
validator.validate(subscriber);
ConcurrentExecutor.runConcurrent(TestUtil.unsubscriber(subscriptionManager, listeners), 1);
listeners.clear();
validator.validate(subscriptionManager);
validator.validate(subscriber);
}
}

View File

@ -1,7 +1,7 @@
package dorkbox.util.messagebus.common;
import dorkbox.util.messagebus.subscription.Subscriber;
import dorkbox.util.messagebus.subscription.Subscription;
import dorkbox.util.messagebus.subscription.SubscriptionManager;
import java.util.*;
@ -32,13 +32,13 @@ public class SubscriptionValidator extends AssertSupport {
// match subscriptions with existing validation entries
// for each tuple of subscriber and message type the specified number of listeners must exist
public void validate(SubscriptionManager manager) {
public void validate(Subscriber subscriber) {
for (Class messageType : this.messageTypes) {
Collection<ValidationEntry> validationEntries = getEntries(messageType);
// we split subs + superSubs into TWO calls.
Collection<Subscription> collection = new ArrayDeque<Subscription>(8);
Subscription[] subscriptions = manager.getSubscriptionsExactAndSuper(messageType);
Subscription[] subscriptions = subscriber.getExactAndSuper(messageType);
if (subscriptions != null) {
collection.addAll(Arrays.asList(subscriptions));
}