removed some more code branches. pre-wip on subscribe (it's really slow)

This commit is contained in:
nathan 2015-06-05 10:47:34 +02:00
parent 30617a2b57
commit 3db34cc7dd
4 changed files with 94 additions and 66 deletions

View File

@ -59,6 +59,24 @@ import dorkbox.util.messagebus.error.ErrorHandlingSupport;
*/
public interface IMessageBus extends PubSubSupport, ErrorHandlingSupport {
enum Mode {
/**
* Will only publish to listeners with this exact message signature. This is the fastest
*/
Exact,
/**
* Will publish to listeners with this exact message signature, as well as listeners that match the super class types signatures.
*/
ExactWithSuperTypes,
/**
* Will publish to listeners with this exact message signature, as well as listeners that match the super class types signatures.
* and to listeners that have matching varity arguments. (ie: a listener that matches Obeject[], will accept messages of type Object)
*/
ExactWithSuperTypesAndVarArgs,
}
/**
* Check whether any asynchronous message publications are pending to be processed
*

View File

@ -1,13 +1,11 @@
package dorkbox.util.messagebus;
import dorkbox.util.messagebus.common.DeadMessage;
import dorkbox.util.messagebus.common.NamedThreadFactory;
import dorkbox.util.messagebus.common.simpleq.MpmcMultiTransferArrayQueue;
import dorkbox.util.messagebus.common.simpleq.MultiNode;
import dorkbox.util.messagebus.error.IPublicationErrorHandler;
import dorkbox.util.messagebus.error.PublicationError;
import dorkbox.util.messagebus.subscription.Matcher;
import dorkbox.util.messagebus.subscription.Subscription;
import dorkbox.util.messagebus.subscription.SubscriptionManager;
import org.jctools.util.Pow2;
@ -52,16 +50,14 @@ public class MultiMBassador implements IMessageBus {
* @param numberOfThreads how many threads to have for dispatching async messages
*/
public MultiMBassador(int numberOfThreads) {
this(false, numberOfThreads);
this(Mode.Exact, numberOfThreads);
}
/**
* @param forceExactMatches if TRUE, only exact matching will be performed on classes. Setting this to true
* removes the ability to have subTypes and VarArg matching, and doing so doubles the speed of the
* system. By default, this is FALSE, to support subTypes and VarArg matching.
* @param mode Specifies which mode to operate the publication of messages.
* @param numberOfThreads how many threads to have for dispatching async messages
*/
public MultiMBassador(boolean forceExactMatches, int numberOfThreads) {
public MultiMBassador(Mode mode, int numberOfThreads) {
if (numberOfThreads < 2) {
numberOfThreads = 2; // at LEAST 2 threads
}
@ -69,22 +65,31 @@ public class MultiMBassador implements IMessageBus {
this.dispatchQueue = new MpmcMultiTransferArrayQueue(numberOfThreads);
this.subscriptionManager = new SubscriptionManager(numberOfThreads);
if (forceExactMatches) {
subscriptionMatcher = new Matcher() {
@Override
public boolean publish(Object message) throws Throwable {
return subscriptionManager.publishExact(message);
}
};
}
else {
subscriptionMatcher = new Matcher() {
@Override
public boolean publish(Object message) throws Throwable {
return subscriptionManager.publish(message);
}
};
switch (mode) {
case Exact:
subscriptionMatcher = new Matcher() {
@Override
public void publish(Object message) throws Throwable {
subscriptionManager.publishExact(message);
}
};
break;
case ExactWithSuperTypes:
subscriptionMatcher = new Matcher() {
@Override
public void publish(Object message) throws Throwable {
subscriptionManager.publishExactAndSuper(message);
}
};
break;
case ExactWithSuperTypesAndVarArgs:
default:
subscriptionMatcher = new Matcher() {
@Override
public void publish(Object message) throws Throwable {
subscriptionManager.publishAll(message);
}
};
}
this.threads = new ArrayDeque<Thread>(numberOfThreads);
@ -203,23 +208,7 @@ public class MultiMBassador implements IMessageBus {
@Override
public void publish(final Object message) {
try {
boolean subsPublished = subscriptionMatcher.publish(message);
if (!subsPublished) {
// Dead Event must EXACTLY MATCH (no subclasses)
Subscription[] deadSubscriptions = subscriptionManager.getSubscriptionsForcedExact(DeadMessage.class);
if (deadSubscriptions != null) {
DeadMessage deadMessage = new DeadMessage(message);
Subscription sub;
//noinspection ForLoopReplaceableByForEach
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(deadMessage);
}
}
}
subscriptionMatcher.publish(message);
} catch (Throwable e) {
handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.").setCause(e)
.setPublishedObject(message));

View File

@ -1,5 +1,5 @@
package dorkbox.util.messagebus.subscription;
public interface Matcher {
boolean publish(Object messageClass) throws Throwable;
void publish(Object messageClass) throws Throwable;
}

View File

@ -123,8 +123,8 @@ public class SubscriptionManager {
// Lock writeLock = this.lock.writeLock();
// writeLock.lock();
ConcurrentMap<Class<?>, Subscription[]> subsPerListener2 = this.subscriptionsPerListener;
subscriptions = subsPerListener2.get(listenerClass);
ConcurrentMap<Class<?>, Subscription[]> subsPerListenerMap = this.subscriptionsPerListener;
subscriptions = subsPerListenerMap.get(listenerClass);
// it was still null, so we actually have to create the rest of the subs
if (subscriptions == null) {
@ -158,12 +158,13 @@ public class SubscriptionManager {
subsPerListener.add(subscription); // activates this sub for sub/unsub
// now add this subscription to each of the handled types
subsForPublication = getSubsForPublication(messageHandler.getHandledMessages(), subsPerMessageSingle,
subsForPublication = getSubsForPublication(messageHandler, subsPerMessageSingle,
subsPerMessageMulti);
subsForPublication.add(subscription); // activates this sub for publication
}
subsPerListener2.put(listenerClass, subsPerListener.toArray(new Subscription[subsPerListener.size()]));
subsPerListenerMap.put(listenerClass, subsPerListener.toArray(new Subscription[subsPerListener.size()]));
lock.unlockWrite(stamp);
// writeLock.unlock();
@ -187,15 +188,16 @@ public class SubscriptionManager {
// inside a write lock
// also puts it into the correct map if it's not already there
private Collection<Subscription> getSubsForPublication(final Class<?>[] messageHandlerTypes,
private Collection<Subscription> getSubsForPublication(final MessageHandler messageHandler,
final Map<Class<?>, ArrayList<Subscription>> subsPerMessageSingle,
final HashMapTree<Class<?>, ArrayList<Subscription>> subsPerMessageMulti) {
final Class<?>[] messageHandlerTypes = messageHandler.getHandledMessages();
final int size = messageHandlerTypes.length;
// ConcurrentSet<Subscription> subsPerType;
SubscriptionUtils utils = this.utils;
// SubscriptionUtils utils = this.utils;
Class<?> type0 = messageHandlerTypes[0];
switch (size) {
@ -208,10 +210,7 @@ public class SubscriptionManager {
if (isArray) {
varArgPossibility.lazySet(true);
}
// cache the super classes
//utils.cacheSuperClasses(type0, isArray);
utils.cacheSuperClasses(type0);
subsPerMessageSingle.put(type0, subs);
}
@ -329,7 +328,7 @@ public class SubscriptionManager {
// retrieves all of the appropriate subscriptions for the message type
public final Subscription[] getSubscriptionsForcedExact(final Class<?> messageClass) {
public final Subscription[] getSubscriptionsExact(final Class<?> messageClass) {
ArrayList<Subscription> collection;
Subscription[] subscriptions;
@ -354,6 +353,7 @@ public class SubscriptionManager {
}
// never return null
// used by unit tests only
public final Subscription[] getSubscriptions(final Class<?> messageClass, boolean isArray) {
StampedLock lock = this.lock;
long stamp = lock.readLock();
@ -530,17 +530,43 @@ public class SubscriptionManager {
return this.utils.getSuperSubscriptions(superType1, superType2, superType3);
}
public final void publishExact(Object message) throws Throwable {
final Class<?> messageClass = message.getClass();
final Subscription[] subscriptions = getSubscriptionsExact(messageClass);
Subscription sub;
// Run subscriptions
if (subscriptions != null) {
for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i];
sub.publish(message);
}
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = getSubscriptionsExact(DeadMessage.class);
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message);
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(deadMessage);
}
}
}
}
/**
* @return true if subscriptions were published
*/
public boolean publishExact(Object message) throws Throwable {
public final boolean publishExactAndSuper(Object message) throws Throwable {
final Class<?> messageClass = message.getClass();
final boolean isArray = messageClass.isArray();
StampedLock lock = this.lock;
long stamp = lock.readLock();
final Subscription[] subscriptions = getSubscriptions_NL(messageClass, isArray);
final Subscription[] subscriptions = getSubscriptions(messageClass, isArray);
Subscription sub;
// Run subscriptions
@ -550,25 +576,19 @@ public class SubscriptionManager {
sub.publish(message);
}
lock.unlockRead(stamp);
return true;
}
lock.unlockRead(stamp);
return false;
}
/**
* @return true if subscriptions were published
*/
public boolean publish(Object message) throws Throwable {
public boolean publishAll(Object message) throws Throwable {
final Class<?> messageClass = message.getClass();
final boolean isArray = messageClass.isArray();
StampedLock lock = this.lock;
long stamp = lock.readLock();
final Subscription[] subscriptions = getSubscriptions_NL(messageClass, isArray);
Subscription sub;
@ -581,6 +601,9 @@ public class SubscriptionManager {
// publish to var arg, only if not already an array
if (varArgPossibility.get() && !isArray) {
// StampedLock lock = this.lock;
// long stamp = lock.readLock();
final ArrayList<Subscription> varArgSubscriptions = varArgUtils.getVarArgSubscriptions(messageClass);
if (varArgSubscriptions != null && !varArgSubscriptions.isEmpty()) {
@ -621,11 +644,9 @@ public class SubscriptionManager {
}
}
lock.unlockRead(stamp);
return true;
}
lock.unlockRead(stamp);
return false;
}
}