Fixed issues with varity argumuent subscriptions, cleaned up model, removed "FirstArg" publisher (didn't make any sense. Publication should always use all arguments)

This commit is contained in:
nathan 2016-01-14 15:08:49 +01:00
parent be960b6913
commit 0ed9b1b243
13 changed files with 375 additions and 539 deletions

View File

@ -112,23 +112,11 @@ public interface IMessageBus extends PubSubSupport {
/**
* Will publish to listeners with this exact message signature, as well as listeners that match the super class types signatures.
* and to listeners that have matching varity arguments. (ie: a listener that matches Obeject[], will accept messages of type Object)
* and to listeners that have matching varity arguments. (ie: a listener that matches Object[], will accept messages of type Object)
*/
ExactWithSuperTypesAndVarArgs,
ExactWithSuperTypesAndVarity,
}
enum SubscribeMode {
/**
* Will subscribe and publish using all provided parameters in the method signature (for subscribe), and arguments (for publish)
*/
MultiArg,
/**
* Will subscribe and publish using only the FIRST provided parameter in the method signature (for subscribe), and arguments (for publish)
*/
FirstArg,
}
/**
* Check whether any asynchronous message publications are pending to be processed
*

View File

@ -30,13 +30,9 @@ import dorkbox.util.messagebus.error.DefaultErrorHandler;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.error.PublicationError;
import dorkbox.util.messagebus.publication.Publisher;
import dorkbox.util.messagebus.publication.PublisherAll_MultiArg;
import dorkbox.util.messagebus.publication.PublisherExactWithSuperTypes_FirstArg;
import dorkbox.util.messagebus.publication.PublisherExactWithSuperTypes_MultiArg;
import dorkbox.util.messagebus.publication.PublisherExact_FirstArg;
import dorkbox.util.messagebus.publication.PublisherExact_MultiArg;
import dorkbox.util.messagebus.subscription.FirstArgSubscriber;
import dorkbox.util.messagebus.subscription.MultiArgSubscriber;
import dorkbox.util.messagebus.publication.PublisherExact;
import dorkbox.util.messagebus.publication.PublisherExactWithSuperTypes;
import dorkbox.util.messagebus.publication.PublisherExactWithSuperTypesAndVarity;
import dorkbox.util.messagebus.subscription.Subscriber;
import dorkbox.util.messagebus.subscription.SubscriptionManager;
import dorkbox.util.messagebus.utils.ClassUtils;
@ -62,7 +58,7 @@ class MessageBus implements IMessageBus {
private final ClassUtils classUtils;
private final SubscriptionManager subscriptionManager;
private final Publisher subscriptionPublisher;
private final Publisher publisher;
/**
* Notifies the consumers during shutdown, that it's on purpose.
@ -74,30 +70,40 @@ class MessageBus implements IMessageBus {
private Sequence workSequence;
/**
* By default, will permit subTypes and VarArg matching, and will use half of CPUs available for dispatching async messages
* By default, will permit subTypes and Varity Argument matching, and will use half of CPUs available for dispatching async messages
*/
public
MessageBus() {
this(Runtime.getRuntime().availableProcessors());
this(Runtime.getRuntime().availableProcessors()/2);
}
/**
* @param numberOfThreads how many threads to have for dispatching async messages
* By default, will permit subTypes and Varity Argument matching
*
* @param numberOfThreads how many threads to use for dispatching async messages
*/
public
MessageBus(int numberOfThreads) {
this(PublishMode.ExactWithSuperTypes, SubscribeMode.FirstArg, numberOfThreads);
// this(PublishMode.ExactWithSuperTypes, SubscribeMode.MultiArg, numberOfThreads);
this(PublishMode.ExactWithSuperTypesAndVarity, numberOfThreads);
}
/**
* By default, will use half of CPUs available for dispatching async messages
*
* @param publishMode Specifies which publishMode to operate the publication of messages.
* @param numberOfThreads how many threads to have for dispatching async messages
*/
public
MessageBus(final PublishMode publishMode, final SubscribeMode subscribeMode, int numberOfThreads) {
MessageBus(final PublishMode publishMode) {
this(publishMode, Runtime.getRuntime().availableProcessors()/2);
}
/**
* @param publishMode Specifies which publishMode to operate the publication of messages.
* @param numberOfThreads how many threads to use for dispatching async messages
*/
public
MessageBus(final PublishMode publishMode, int numberOfThreads) {
// round to the nearest power of 2
numberOfThreads = 1 << (32 - Integer.numberOfLeadingZeros(getMinNumberOfThreads(numberOfThreads) - 1));
numberOfThreads = 1 << (32 - Integer.numberOfLeadingZeros(getMinNumberOfThreads(numberOfThreads)));
this.errorHandler = new DefaultErrorHandler();
// this.dispatchQueue = new ArrayBlockingQueue<Object>(6);
@ -106,43 +112,25 @@ class MessageBus implements IMessageBus {
final StampedLock lock = new StampedLock();
boolean isMultiArg = subscribeMode == SubscribeMode.MultiArg;
final Subscriber subscriber;
if (isMultiArg) {
subscriber = new MultiArgSubscriber(errorHandler, classUtils);
}
else {
subscriber = new FirstArgSubscriber(errorHandler, classUtils);
}
/**
* Will subscribe and publish using all provided parameters in the method signature (for subscribe), and arguments (for publish)
*/
subscriber = new Subscriber(errorHandler, classUtils);
switch (publishMode) {
case Exact:
if (isMultiArg) {
subscriptionPublisher = new PublisherExact_MultiArg(errorHandler, subscriber, lock);
}
else {
subscriptionPublisher = new PublisherExact_FirstArg(errorHandler, subscriber, lock);
}
publisher = new PublisherExact(errorHandler, subscriber, lock);
break;
case ExactWithSuperTypes:
if (isMultiArg) {
subscriptionPublisher = new PublisherExactWithSuperTypes_MultiArg(errorHandler, subscriber, lock);
}
else {
subscriptionPublisher = new PublisherExactWithSuperTypes_FirstArg(errorHandler, subscriber, lock);
}
publisher = new PublisherExactWithSuperTypes(errorHandler, subscriber, lock);
break;
case ExactWithSuperTypesAndVarArgs:
case ExactWithSuperTypesAndVarity:
default:
if (isMultiArg) {
subscriptionPublisher = new PublisherAll_MultiArg(errorHandler, subscriber, lock);
}
else {
throw new RuntimeException("Unable to run in expected configuration");
}
publisher = new PublisherExactWithSuperTypesAndVarity(errorHandler, subscriber, lock);
}
this.subscriptionManager = new SubscriptionManager(numberOfThreads, subscriber, lock);
@ -161,7 +149,7 @@ class MessageBus implements IMessageBus {
// setup the work handlers
handlers = new MessageHandler[numberOfThreads];
for (int i = 0; i < handlers.length; i++) {
handlers[i] = new MessageHandler(subscriptionPublisher); // exactly one per thread is used
handlers[i] = new MessageHandler(publisher); // exactly one per thread is used
}
@ -336,25 +324,25 @@ class MessageBus implements IMessageBus {
@Override
public
void publish(final Object message) {
subscriptionPublisher.publish(message);
publisher.publish(message);
}
@Override
public
void publish(final Object message1, final Object message2) {
subscriptionPublisher.publish(message1, message2);
publisher.publish(message1, message2);
}
@Override
public
void publish(final Object message1, final Object message2, final Object message3) {
subscriptionPublisher.publish(message1, message2, message3);
publisher.publish(message1, message2, message3);
}
@Override
public
void publish(final Object[] messages) {
subscriptionPublisher.publish(messages);
publisher.publish(messages);
}
@Override

View File

@ -67,10 +67,10 @@ import java.util.Arrays;
*/
public
class MessageHandler {
// publish all listeners defined by the given class (includes
// listeners defined in super classes)
public static
//cache this?
MessageHandler[] get(final Class<?> target) {
// publish all handlers (this will include all (inherited) methods directly annotated using @Handler)

View File

@ -24,13 +24,13 @@ import dorkbox.util.messagebus.subscription.Subscription;
@SuppressWarnings("Duplicates")
public
class PublisherExact_MultiArg implements Publisher {
class PublisherExact implements Publisher {
private final ErrorHandlingSupport errorHandler;
private final Subscriber subscriber;
private final StampedLock lock;
public
PublisherExact_MultiArg(final ErrorHandlingSupport errorHandler, final Subscriber subscriber, final StampedLock lock) {
PublisherExact(final ErrorHandlingSupport errorHandler, final Subscriber subscriber, final StampedLock lock) {
this.errorHandler = errorHandler;
this.subscriber = subscriber;
this.lock = lock;

View File

@ -24,13 +24,13 @@ import dorkbox.util.messagebus.subscription.Subscription;
@SuppressWarnings("Duplicates")
public
class PublisherExactWithSuperTypes_MultiArg implements Publisher {
class PublisherExactWithSuperTypes implements Publisher {
private final ErrorHandlingSupport errorHandler;
private final Subscriber subscriber;
private final StampedLock lock;
public
PublisherExactWithSuperTypes_MultiArg(final ErrorHandlingSupport errorHandler, final Subscriber subscriber, final StampedLock lock) {
PublisherExactWithSuperTypes(final ErrorHandlingSupport errorHandler, final Subscriber subscriber, final StampedLock lock) {
this.errorHandler = errorHandler;
this.subscriber = subscriber;
this.lock = lock;

View File

@ -28,7 +28,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
@SuppressWarnings("Duplicates")
public
class PublisherAll_MultiArg implements Publisher {
class PublisherExactWithSuperTypesAndVarity implements Publisher {
private final ErrorHandlingSupport errorHandler;
private final Subscriber subscriber;
@ -38,7 +38,7 @@ class PublisherAll_MultiArg implements Publisher {
final VarArgUtils varArgUtils;
public
PublisherAll_MultiArg(final ErrorHandlingSupport errorHandler, final Subscriber subscriber, final StampedLock lock) {
PublisherExactWithSuperTypesAndVarity(final ErrorHandlingSupport errorHandler, final Subscriber subscriber, final StampedLock lock) {
this.errorHandler = errorHandler;
this.subscriber = subscriber;
this.lock = lock;
@ -82,7 +82,7 @@ class PublisherAll_MultiArg implements Publisher {
int length = varArgSubs.length;
Object[] asArray = null;
if (length > 1) {
if (length > 0) {
hasSubs = true;
asArray = (Object[]) Array.newInstance(messageClass, 1);
@ -103,7 +103,7 @@ class PublisherAll_MultiArg implements Publisher {
length = varArgSuperSubs.length;
if (length > 1) {
if (length > 0) {
hasSubs = true;
if (asArray == null) {

View File

@ -1,340 +0,0 @@
/*
* Copyright 2015 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.util.messagebus.subscription;
import dorkbox.util.messagebus.common.HashMapTree;
import dorkbox.util.messagebus.common.MessageHandler;
import dorkbox.util.messagebus.common.adapter.JavaVersionAdapter;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.utils.ClassUtils;
import dorkbox.util.messagebus.utils.SubscriptionUtils;
import dorkbox.util.messagebus.utils.VarArgUtils;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Permits subscriptions with a varying length of parameters as the signature, which must be match by the publisher for it to be accepted
*/
public
class MultiArgSubscriber implements Subscriber {
private final ErrorHandlingSupport errorHandler;
private final SubscriptionUtils subUtils;
private final VarArgUtils varArgUtils;
// all subscriptions per message type. We perpetually KEEP the types, as this lowers the amount of locking required
// this is the primary list for dispatching a specific message
// write access is synchronized and happens only when a listener of a specific class is registered the first time
private final Map<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageSingle;
private final HashMapTree<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageMulti;
// shortcut publication if we know there is no possibility of varArg (ie: a method that has an array as arguments)
private final AtomicBoolean varArgPossibility = new AtomicBoolean(false);
public
MultiArgSubscriber(final ErrorHandlingSupport errorHandler, final ClassUtils classUtils) {
this.errorHandler = errorHandler;
this.subscriptionsPerMessageSingle = JavaVersionAdapter.concurrentMap(32, LOAD_FACTOR, 1);
this.subscriptionsPerMessageMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>(4, LOAD_FACTOR);
this.subUtils = new SubscriptionUtils(classUtils, Subscriber.LOAD_FACTOR);
// var arg subscriptions keep track of which subscriptions can handle varArgs. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but improves performance of handlers
this.varArgUtils = new VarArgUtils(classUtils, Subscriber.LOAD_FACTOR);
}
@Override
public
AtomicBoolean getVarArgPossibility() {
return varArgPossibility;
}
@Override
public
VarArgUtils getVarArgUtils() {
return varArgUtils;
}
@Override
public
void clear() {
this.subUtils.clear();
this.varArgUtils.clear();
}
// inside a write lock
// add this subscription to each of the handled types
// to activate this sub for publication
private
void registerMulti(final Subscription subscription, final Class<?> listenerClass,
final Map<Class<?>, ArrayList<Subscription>> subsPerMessageSingle,
final HashMapTree<Class<?>, ArrayList<Subscription>> subsPerMessageMulti, final AtomicBoolean varArgPossibility) {
final MessageHandler handler = subscription.getHandler();
final Class<?>[] messageHandlerTypes = handler.getHandledMessages();
final int size = messageHandlerTypes.length;
final Class<?> type0 = messageHandlerTypes[0];
switch (size) {
case 0: {
errorHandler.handleError("Error while trying to subscribe class", listenerClass);
return;
}
case 1: {
ArrayList<Subscription> subs = subsPerMessageSingle.get(type0);
if (subs == null) {
subs = new ArrayList<Subscription>();
// is this handler able to accept var args?
if (handler.getVarArgClass() != null) {
varArgPossibility.lazySet(true);
}
subsPerMessageSingle.put(type0, subs);
}
subs.add(subscription);
return;
}
case 2: {
ArrayList<Subscription> subs = subsPerMessageMulti.get(type0, messageHandlerTypes[1]);
if (subs == null) {
subs = new ArrayList<Subscription>();
subsPerMessageMulti.put(subs, type0, messageHandlerTypes[1]);
}
subs.add(subscription);
return;
}
case 3: {
ArrayList<Subscription> subs = subsPerMessageMulti.get(type0, messageHandlerTypes[1], messageHandlerTypes[2]);
if (subs == null) {
subs = new ArrayList<Subscription>();
subsPerMessageMulti.put(subs, type0, messageHandlerTypes[1], messageHandlerTypes[2]);
}
subs.add(subscription);
return;
}
default: {
ArrayList<Subscription> subs = subsPerMessageMulti.get(messageHandlerTypes);
if (subs == null) {
subs = new ArrayList<Subscription>();
subsPerMessageMulti.put(subs, messageHandlerTypes);
}
subs.add(subscription);
}
}
}
@Override
public
void register(final Class<?> listenerClass, final int handlersSize, final Subscription[] subsPerListener) {
final Map<Class<?>, ArrayList<Subscription>> subsPerMessageSingle = this.subscriptionsPerMessageSingle;
final HashMapTree<Class<?>, ArrayList<Subscription>> subsPerMessageMulti = this.subscriptionsPerMessageMulti;
final AtomicBoolean varArgPossibility = this.varArgPossibility;
Subscription subscription;
for (int i = 0; i < handlersSize; i++) {
subscription = subsPerListener[i];
// activate this subscription for publication
// now add this subscription to each of the handled types
registerMulti(subscription, listenerClass, subsPerMessageSingle, subsPerMessageMulti, varArgPossibility);
}
}
@Override
public
void shutdown() {
this.subscriptionsPerMessageSingle.clear();
this.subscriptionsPerMessageMulti.clear();
clear();
}
@Override
public
ArrayList<Subscription> getExactAsArray(final Class<?> messageClass) {
return subscriptionsPerMessageSingle.get(messageClass);
}
@Override
public
ArrayList<Subscription> getExactAsArray(final Class<?> messageClass1, final Class<?> messageClass2) {
return subscriptionsPerMessageMulti.get(messageClass1, messageClass2);
}
@Override
public
ArrayList<Subscription> getExactAsArray(final Class<?> messageClass1, final Class<?> messageClass2, final Class<?> messageClass3) {
return subscriptionsPerMessageMulti.get(messageClass1, messageClass2, messageClass3);
}
// can return null
@Override
public
Subscription[] getExact(final Class<?> messageClass) {
final ArrayList<Subscription> collection = getExactAsArray(messageClass);
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
return subscriptions;
}
return null;
}
// can return null
@Override
public
Subscription[] getExact(final Class<?> messageClass1, final Class<?> messageClass2) {
final ArrayList<Subscription> collection = getExactAsArray(messageClass1, messageClass2);
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
return subscriptions;
}
return null;
}
// can return null
@Override
public
Subscription[] getExact(final Class<?> messageClass1, final Class<?> messageClass2, final Class<?> messageClass3) {
final ArrayList<Subscription> collection = getExactAsArray(messageClass1, messageClass2, messageClass3);
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
return subscriptions;
}
return null;
}
// can return null
@Override
public
Subscription[] getExactAndSuper(final Class<?> messageClass) {
ArrayList<Subscription> collection = getExactAsArray(messageClass); // can return null
// now publish superClasses
final ArrayList<Subscription> superSubscriptions = this.subUtils.getSuperSubscriptions(messageClass, this); // NOT return null
if (collection != null) {
collection = new ArrayList<Subscription>(collection);
if (!superSubscriptions.isEmpty()) {
collection.addAll(superSubscriptions);
}
}
else if (!superSubscriptions.isEmpty()) {
collection = superSubscriptions;
}
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
return subscriptions;
}
else {
return null;
}
}
// can return null
@Override
public
Subscription[] getExactAndSuper(final Class<?> messageClass1, final Class<?> messageClass2) {
ArrayList<Subscription> collection = getExactAsArray(messageClass1, messageClass2); // can return null
// now publish superClasses
final ArrayList<Subscription> superSubs = this.subUtils.getSuperSubscriptions(messageClass1, messageClass2,
this); // NOT return null
if (collection != null) {
collection = new ArrayList<Subscription>(collection);
if (!superSubs.isEmpty()) {
collection.addAll(superSubs);
}
}
else if (!superSubs.isEmpty()) {
collection = superSubs;
}
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
return subscriptions;
}
else {
return null;
}
}
// can return null
@Override
public
Subscription[] getExactAndSuper(final Class<?> messageClass1, final Class<?> messageClass2, final Class<?> messageClass3) {
ArrayList<Subscription> collection = getExactAsArray(messageClass1, messageClass2, messageClass3); // can return null
// now publish superClasses
final ArrayList<Subscription> superSubs = this.subUtils.getSuperSubscriptions(messageClass1, messageClass2, messageClass3,
this); // NOT return null
if (collection != null) {
collection = new ArrayList<Subscription>(collection);
if (!superSubs.isEmpty()) {
collection.addAll(superSubs);
}
}
else if (!superSubs.isEmpty()) {
collection = superSubs;
}
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
return subscriptions;
}
else {
return null;
}
}
}

View File

@ -15,42 +15,325 @@
*/
package dorkbox.util.messagebus.subscription;
import dorkbox.util.messagebus.common.HashMapTree;
import dorkbox.util.messagebus.common.MessageHandler;
import dorkbox.util.messagebus.common.adapter.JavaVersionAdapter;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.utils.ClassUtils;
import dorkbox.util.messagebus.utils.SubscriptionUtils;
import dorkbox.util.messagebus.utils.VarArgUtils;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Permits subscriptions with a varying length of parameters as the signature, which must be match by the publisher for it to be accepted
*/
public
interface Subscriber {
float LOAD_FACTOR = 0.8F;
class Subscriber {
public static final float LOAD_FACTOR = 0.8F;
AtomicBoolean getVarArgPossibility();
private final ErrorHandlingSupport errorHandler;
VarArgUtils getVarArgUtils();
private final SubscriptionUtils subUtils;
private final VarArgUtils varArgUtils;
void register(Class<?> listenerClass, int handlersSize, Subscription[] subsPerListener);
// all subscriptions per message type. We perpetually KEEP the types, as this lowers the amount of locking required
// this is the primary list for dispatching a specific message
// write access is synchronized and happens only when a listener of a specific class is registered the first time
private final Map<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageSingle;
private final HashMapTree<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageMulti;
void shutdown();
// shortcut publication if we know there is no possibility of varArg (ie: a method that has an array as arguments)
private final AtomicBoolean varArgPossibility = new AtomicBoolean(false);
void clear();
ArrayList<Subscription> getExactAsArray(Class<?> superClass);
ArrayList<Subscription> getExactAsArray(Class<?> superClass1, Class<?> superClass2);
ArrayList<Subscription> getExactAsArray(Class<?> superClass1, Class<?> superClass2, Class<?> superClass3);
private ThreadLocal<ArrayList<Subscription>> listCache = new ThreadLocal<ArrayList<Subscription>>() {
@Override
protected
ArrayList<Subscription> initialValue() {
return new ArrayList<Subscription>(8);
}
};
Subscription[] getExact(Class<?> deadMessageClass);
Subscription[] getExact(Class<?> messageClass1, Class<?> messageClass2);
public
Subscriber(final ErrorHandlingSupport errorHandler, final ClassUtils classUtils) {
this.errorHandler = errorHandler;
Subscription[] getExact(Class<?> messageClass1, Class<?> messageClass2, Class<?> messageClass3);
this.subscriptionsPerMessageSingle = JavaVersionAdapter.concurrentMap(32, LOAD_FACTOR, 1);
this.subscriptionsPerMessageMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>(4, LOAD_FACTOR);
this.subUtils = new SubscriptionUtils(classUtils, LOAD_FACTOR);
Subscription[] getExactAndSuper(Class<?> messageClass);
// var arg subscriptions keep track of which subscriptions can handle varArgs. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but improves performance of handlers
this.varArgUtils = new VarArgUtils(classUtils, LOAD_FACTOR);
}
Subscription[] getExactAndSuper(Class<?> messageClass1, Class<?> messageClass2);
public
AtomicBoolean getVarArgPossibility() {
return varArgPossibility;
}
Subscription[] getExactAndSuper(Class<?> messageClass1, Class<?> messageClass2, Class<?> messageClass3);
public
VarArgUtils getVarArgUtils() {
return varArgUtils;
}
public
void clear() {
this.subUtils.clear();
this.varArgUtils.clear();
}
// inside a write lock
// add this subscription to each of the handled types
// to activate this sub for publication
private
void registerMulti(final Subscription subscription, final Class<?> listenerClass,
final Map<Class<?>, ArrayList<Subscription>> subsPerMessageSingle,
final HashMapTree<Class<?>, ArrayList<Subscription>> subsPerMessageMulti, final AtomicBoolean varArgPossibility) {
final MessageHandler handler = subscription.getHandler();
final Class<?>[] messageHandlerTypes = handler.getHandledMessages();
final int size = messageHandlerTypes.length;
final Class<?> type0 = messageHandlerTypes[0];
switch (size) {
case 0: {
// TODO: maybe this SHOULD be permitted? so if a publisher publishes VOID, it call's a method?
errorHandler.handleError("Error while trying to subscribe class with zero arguments", listenerClass);
return;
}
case 1: {
// using ThreadLocal cache's is SIGNIFICANTLY faster for subscribing to new types
final ArrayList<Subscription> cachedSubs = listCache.get();
ArrayList<Subscription> subs = subsPerMessageSingle.putIfAbsent(type0, cachedSubs);
if (subs == null) {
listCache.set(new ArrayList<Subscription>(8));
subs = cachedSubs;
// is this handler able to accept var args?
if (handler.getVarArgClass() != null) {
varArgPossibility.lazySet(true);
}
}
subs.add(subscription);
return;
}
case 2: {
ArrayList<Subscription> subs = subsPerMessageMulti.get(type0, messageHandlerTypes[1]);
if (subs == null) {
subs = new ArrayList<Subscription>();
subsPerMessageMulti.put(subs, type0, messageHandlerTypes[1]);
}
subs.add(subscription);
return;
}
case 3: {
ArrayList<Subscription> subs = subsPerMessageMulti.get(type0, messageHandlerTypes[1], messageHandlerTypes[2]);
if (subs == null) {
subs = new ArrayList<Subscription>();
subsPerMessageMulti.put(subs, type0, messageHandlerTypes[1], messageHandlerTypes[2]);
}
subs.add(subscription);
return;
}
default: {
ArrayList<Subscription> subs = subsPerMessageMulti.get(messageHandlerTypes);
if (subs == null) {
subs = new ArrayList<Subscription>();
subsPerMessageMulti.put(subs, messageHandlerTypes);
}
subs.add(subscription);
}
}
}
public
void register(final Class<?> listenerClass, final int handlersSize, final Subscription[] subsPerListener) {
final Map<Class<?>, ArrayList<Subscription>> subsPerMessageSingle = this.subscriptionsPerMessageSingle;
final HashMapTree<Class<?>, ArrayList<Subscription>> subsPerMessageMulti = this.subscriptionsPerMessageMulti;
final AtomicBoolean varArgPossibility = this.varArgPossibility;
Subscription subscription;
for (int i = 0; i < handlersSize; i++) {
subscription = subsPerListener[i];
// activate this subscription for publication
// now add this subscription to each of the handled types
registerMulti(subscription, listenerClass, subsPerMessageSingle, subsPerMessageMulti, varArgPossibility);
}
}
public
void shutdown() {
this.subscriptionsPerMessageSingle.clear();
this.subscriptionsPerMessageMulti.clear();
clear();
}
public
ArrayList<Subscription> getExactAsArray(final Class<?> messageClass) {
return subscriptionsPerMessageSingle.get(messageClass);
}
public
ArrayList<Subscription> getExactAsArray(final Class<?> messageClass1, final Class<?> messageClass2) {
return subscriptionsPerMessageMulti.get(messageClass1, messageClass2);
}
public
ArrayList<Subscription> getExactAsArray(final Class<?> messageClass1, final Class<?> messageClass2, final Class<?> messageClass3) {
return subscriptionsPerMessageMulti.get(messageClass1, messageClass2, messageClass3);
}
// can return null
public
Subscription[] getExact(final Class<?> messageClass) {
final ArrayList<Subscription> collection = getExactAsArray(messageClass);
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
return subscriptions;
}
return null;
}
// can return null
public
Subscription[] getExact(final Class<?> messageClass1, final Class<?> messageClass2) {
final ArrayList<Subscription> collection = getExactAsArray(messageClass1, messageClass2);
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
return subscriptions;
}
return null;
}
// can return null
public
Subscription[] getExact(final Class<?> messageClass1, final Class<?> messageClass2, final Class<?> messageClass3) {
final ArrayList<Subscription> collection = getExactAsArray(messageClass1, messageClass2, messageClass3);
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
return subscriptions;
}
return null;
}
// can return null
public
Subscription[] getExactAndSuper(final Class<?> messageClass) {
ArrayList<Subscription> collection = getExactAsArray(messageClass); // can return null
// now publish superClasses
final ArrayList<Subscription> superSubscriptions = this.subUtils.getSuperSubscriptions(messageClass, this); // NOT return null
if (collection != null) {
collection = new ArrayList<Subscription>(collection);
if (!superSubscriptions.isEmpty()) {
collection.addAll(superSubscriptions);
}
}
else if (!superSubscriptions.isEmpty()) {
collection = superSubscriptions;
}
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
return subscriptions;
}
else {
return null;
}
}
// can return null
public
Subscription[] getExactAndSuper(final Class<?> messageClass1, final Class<?> messageClass2) {
ArrayList<Subscription> collection = getExactAsArray(messageClass1, messageClass2); // can return null
// now publish superClasses
final ArrayList<Subscription> superSubs = this.subUtils.getSuperSubscriptions(messageClass1, messageClass2,
this); // NOT return null
if (collection != null) {
collection = new ArrayList<Subscription>(collection);
if (!superSubs.isEmpty()) {
collection.addAll(superSubs);
}
}
else if (!superSubs.isEmpty()) {
collection = superSubs;
}
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
return subscriptions;
}
else {
return null;
}
}
// can return null
public
Subscription[] getExactAndSuper(final Class<?> messageClass1, final Class<?> messageClass2, final Class<?> messageClass3) {
ArrayList<Subscription> collection = getExactAsArray(messageClass1, messageClass2, messageClass3); // can return null
// now publish superClasses
final ArrayList<Subscription> superSubs = this.subUtils.getSuperSubscriptions(messageClass1, messageClass2, messageClass3,
this); // NOT return null
if (collection != null) {
collection = new ArrayList<Subscription>(collection);
if (!superSubs.isEmpty()) {
collection.addAll(superSubs);
}
}
else if (!superSubs.isEmpty()) {
collection = superSubs;
}
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
return subscriptions;
}
else {
return null;
}
}
}

View File

@ -39,13 +39,14 @@ package dorkbox.util.messagebus.subscription;
import com.esotericsoftware.reflectasm.MethodAccess;
import dorkbox.util.messagebus.common.MessageHandler;
import dorkbox.util.messagebus.common.StrongConcurrentSetV8;
import dorkbox.util.messagebus.dispatch.IHandlerInvocation;
import dorkbox.util.messagebus.dispatch.ReflectiveHandlerInvocation;
import dorkbox.util.messagebus.dispatch.SynchronizedHandlerInvocation;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicInteger;
/**
@ -78,10 +79,20 @@ class Subscription {
public
Subscription(final MessageHandler handler, final float loadFactor, final int stripeSize) {
this.handlerMetadata = handler;
this.listeners = new StrongConcurrentSetV8<Object>(16, loadFactor, stripeSize);
// this.listeners = new StrongConcurrentSetV8<Object>(16, loadFactor, stripeSize);
///this is by far, the fastest
this.listeners = new ConcurrentSkipListSet<>(new Comparator() {
@Override
public
int compare(final Object o1, final Object o2) {
return Integer.compare(o1.hashCode(), o2.hashCode());
}
});
// this.listeners = new StrongConcurrentSet<Object>(16, 0.85F);
// this.listeners = new ConcurrentLinkedQueue2<Object>();
// this.listeners = new CopyOnWriteArrayList<Object>();
// this.listeners = new CopyOnWriteArraySet<Object>(); // not very good
IHandlerInvocation invocation = new ReflectiveHandlerInvocation();
if (handler.isSynchronized()) {

View File

@ -130,7 +130,7 @@ class SubscriptionManager {
// it was still null, so we actually have to create the rest of the subs
if (subscriptions == null) {
subscriber.register(listenerClass, handlersSize, subsPerListener);
subscriber.register(listenerClass, handlersSize, subsPerListener); // this adds to subscriptionsPerMessage
subsPerListenerMap.put(listenerClass, subsPerListener);
lock.unlockWrite(stamp);

View File

@ -19,7 +19,8 @@ public class MultiMessageTest extends MessageBusTest {
@Test
public void testMultiMessageSending() {
IMessageBus bus = new MessageBus();
IMessageBus bus = new MessageBus(IMessageBus.PublishMode.ExactWithSuperTypesAndVarity, Runtime.getRuntime().availableProcessors() / 2);
// IMessageBus bus = new MessageBus(); // non varity mode
bus.start();
MultiListener listener1 = new MultiListener();
@ -48,6 +49,7 @@ public class MultiMessageTest extends MessageBusTest {
count.set(0);
// test async publication
bus.publishAsync("s");
bus.publishAsync("s", "s");
bus.publishAsync("s", "s", "s");
@ -58,59 +60,7 @@ public class MultiMessageTest extends MessageBusTest {
while (bus.hasPendingMessages()) {
try {
Thread.sleep(ConcurrentUnits);
} catch (InterruptedException e) {
}
}
assertEquals(13, count.get());
bus.shutdown();
}
@Test
public void testFirstArgMultiMessageSending() {
IMessageBus bus = new MessageBus(IMessageBus.PublishMode.ExactWithSuperTypes, IMessageBus.SubscribeMode.FirstArg,
Runtime.getRuntime().availableProcessors() / 2);
bus.start();
FirstListener listener = new FirstListener();
bus.subscribe(listener);
bus.unsubscribe(listener);
bus.publish("s");
bus.publish("s", "s");
bus.publish("s", "s", "s");
bus.publish(1, "s");
bus.publish(1, 2, "s");
bus.publish(new Integer[] {1, 2, 3, 4, 5, 6});
assertEquals(0, count.get());
bus.subscribe(listener);
bus.publish("s"); // 4
bus.publish("s", "s"); // 3
bus.publish("s", "s", "s"); // 3
bus.publish(1, "s"); // 1
bus.publish(1, 2, "s"); // 2
bus.publish(new Integer[] {1, 2, 3, 4, 5, 6}); // 2
assertEquals(15, count.get());
count.set(0);
bus.publishAsync("s");
bus.publishAsync("s", "s");
bus.publishAsync("s", "s", "s");
bus.publish(1, "s");
bus.publishAsync(1, 2, "s");
bus.publishAsync(new Integer[] {1, 2, 3, 4, 5, 6});
while (bus.hasPendingMessages()) {
try {
Thread.sleep(ConcurrentUnits);
} catch (InterruptedException e) {
} catch (InterruptedException ignored) {
}
}
@ -170,53 +120,4 @@ public class MultiMessageTest extends MessageBusTest {
System.err.println("match Object[]");
}
}
public static class FirstListener {
@Handler
public void handleSync(Object o) {
count.getAndIncrement();
System.err.println("match Object");
}
@Handler
public void handleSync(String o1) {
count.getAndIncrement();
System.err.println("match String");
}
@Handler
public void handleSync(String o1, String o2) {
count.getAndIncrement();
System.err.println("match String, String");
}
// @Handler
// public void handleSync(String o1, String o2, String o3) {
// count.getAndIncrement();
// System.err.println("match String, String, String");
// }
//
// @Handler
// public void handleSync(Integer o1, Integer o2, String o3) {
// count.getAndIncrement();
// System.err.println("match Integer, Integer, String");
// }
//
// @Handler(acceptVarargs = true)
// public void handleSync(String... o) {
// count.getAndIncrement();
// System.err.println("match String[]");
// }
//
// @Handler
// public void handleSync(Integer... o) {
// count.getAndIncrement();
// System.err.println("match Integer[]");
// }
//
// @Handler(acceptVarargs = true)
// public void handleSync(Object... o) {
// count.getAndIncrement();
// System.err.println("match Object[]");
// }
}
}

View File

@ -28,7 +28,6 @@ import dorkbox.util.messagebus.error.DefaultErrorHandler;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.listeners.*;
import dorkbox.util.messagebus.messages.*;
import dorkbox.util.messagebus.subscription.MultiArgSubscriber;
import dorkbox.util.messagebus.subscription.Subscriber;
import dorkbox.util.messagebus.subscription.SubscriptionManager;
import dorkbox.util.messagebus.utils.ClassUtils;
@ -160,7 +159,7 @@ public class SubscriptionManagerTest extends AssertSupport {
final ErrorHandlingSupport errorHandler = new DefaultErrorHandler();
final StampedLock lock = new StampedLock();
final ClassUtils classUtils = new ClassUtils(Subscriber.LOAD_FACTOR);
final Subscriber subscriber = new MultiArgSubscriber(errorHandler, classUtils);
final Subscriber subscriber = new Subscriber(errorHandler, classUtils);
SubscriptionManager subscriptionManager = new SubscriptionManager(1, subscriber, lock);
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), 1);
@ -184,7 +183,7 @@ public class SubscriptionManagerTest extends AssertSupport {
final ErrorHandlingSupport errorHandler = new DefaultErrorHandler();
final StampedLock lock = new StampedLock();
final ClassUtils classUtils = new ClassUtils(Subscriber.LOAD_FACTOR);
final Subscriber subscriber = new MultiArgSubscriber(errorHandler, classUtils);
final Subscriber subscriber = new Subscriber(errorHandler, classUtils);
final SubscriptionManager subscriptionManager = new SubscriptionManager(1, subscriber, lock);

View File

@ -25,7 +25,13 @@ package dorkbox.util.messagebus.common;
import dorkbox.util.messagebus.subscription.Subscriber;
import dorkbox.util.messagebus.subscription.Subscription;
import java.util.*;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
/**
* @author bennidi