Added more direct iteration

This commit is contained in:
nathan 2015-05-09 15:06:37 +02:00
parent aaf8e0ddf4
commit dd35477346
2 changed files with 124 additions and 58 deletions

View File

@ -39,19 +39,19 @@ public class MultiMBassador implements IMessageBus {
* removes the ability to have subTypes and VarArg matching, and doing so doubles the speed of the
* system. By default, this is FALSE, to support subTypes and VarArg matching.
*/
private final boolean forceExactMatches = false;
private final boolean forceExactMatches;
/**
* Notifies the consumers that the messagebus is shutting down.
* Notifies the consumers during shutdown, that it's on purpose.
*/
private volatile boolean shuttingDown;
/**
* By default, will permit subTypes and VarArg matching, and will use all CPUs available for dispatching async messages
* By default, will permit subTypes and VarArg matching, and will use half of CPUs available for dispatching async messages
*/
public MultiMBassador() {
this(Runtime.getRuntime().availableProcessors());
this(Runtime.getRuntime().availableProcessors()/2);
}
/**
@ -73,6 +73,7 @@ public class MultiMBassador implements IMessageBus {
numberOfThreads = 2; // at LEAST 2 threads
}
numberOfThreads = Pow2.roundToPowerOfTwo(numberOfThreads);
this.forceExactMatches = forceExactMatches;
this.dispatchQueue = new MpmcMultiTransferArrayQueue(numberOfThreads);
@ -163,7 +164,7 @@ public class MultiMBassador implements IMessageBus {
}
@Override
public boolean hasPendingMessages() {
public final boolean hasPendingMessages() {
return this.dispatchQueue.hasPendingMessages();
}
@ -177,7 +178,7 @@ public class MultiMBassador implements IMessageBus {
}
@Override
public void publish(Object message) {
public void publish(final Object message) {
SubscriptionManager manager = this.subscriptionManager;
Class<?> messageClass = message.getClass();
@ -272,29 +273,39 @@ public class MultiMBassador implements IMessageBus {
}
@Override
public void publish(Object message1, Object message2) {
public void publish(final Object message1, final Object message2) {
SubscriptionManager manager = this.subscriptionManager;
Class<?> messageClass1 = message1.getClass();
Class<?> messageClass2 = message2.getClass();
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2);
StrongConcurrentSetV8<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2);
boolean subsPublished = false;
ISetEntry<Subscription> current;
Subscription sub;
// Run subscriptions
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription sub : subscriptions) {
current = subscriptions.head;
while (current != null) {
sub = current.getValue();
current = current.next();
// this catches all exception types
subsPublished |= sub.publishToSubscription(this, message1, message2);
}
}
if (!this.forceExactMatches) {
Collection<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass1, messageClass2);
StrongConcurrentSetV8<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass1, messageClass2);
// now get superClasses
if (superSubscriptions != null && !superSubscriptions.isEmpty()) {
for (Subscription sub : superSubscriptions) {
current = superSubscriptions.head;
while (current != null) {
sub = current.getValue();
current = current.next();
// this catches all exception types
subsPublished |= sub.publishToSubscription(this, message1, message2);
}
@ -304,19 +315,23 @@ public class MultiMBassador implements IMessageBus {
if (messageClass1 == messageClass2 && !messageClass1.isArray()) {
Object[] asArray = null;
Collection<Subscription> varargSubscriptions = manager.getVarArgSubscriptions(messageClass1);
StrongConcurrentSetV8<Subscription> varargSubscriptions = manager.getVarArgSubscriptions(messageClass1);
if (varargSubscriptions != null && !varargSubscriptions.isEmpty()) {
asArray = (Object[]) Array.newInstance(messageClass1, 2);
asArray[0] = message1;
asArray[1] = message2;
for (Subscription sub : varargSubscriptions) {
current = varargSubscriptions.head;
while (current != null) {
sub = current.getValue();
current = current.next();
// this catches all exception types
subsPublished |= sub.publishToSubscription(this, asArray);
}
}
Collection<Subscription> varargSuperSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1);
StrongConcurrentSetV8<Subscription> varargSuperSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1);
// now get array based superClasses (but only if those ALSO accept vararg)
if (varargSuperSubscriptions != null && !varargSuperSubscriptions.isEmpty()) {
if (asArray == null) {
@ -325,7 +340,11 @@ public class MultiMBassador implements IMessageBus {
asArray[1] = message2;
}
for (Subscription sub : varargSuperSubscriptions) {
current = varargSuperSubscriptions.head;
while (current != null) {
sub = current.getValue();
current = current.next();
// this catches all exception types
subsPublished |= sub.publishToSubscription(this, asArray);
}
@ -336,10 +355,15 @@ public class MultiMBassador implements IMessageBus {
if (!subsPublished) {
// Dead Event must EXACTLY MATCH (no subclasses)
Collection<Subscription> deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
StrongConcurrentSetV8<Subscription> deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) {
DeadMessage deadMessage = new DeadMessage(message1, message2);
for (Subscription sub : deadSubscriptions) {
current = deadSubscriptions.head;
while (current != null) {
sub = current.getValue();
current = current.next();
// this catches all exception types
sub.publishToSubscription(this, deadMessage);
}
@ -348,19 +372,26 @@ public class MultiMBassador implements IMessageBus {
}
@Override
public void publish(Object message1, Object message2, Object message3) {
public void publish(final Object message1, final Object message2, final Object message3) {
SubscriptionManager manager = this.subscriptionManager;
Class<?> messageClass1 = message1.getClass();
Class<?> messageClass2 = message2.getClass();
Class<?> messageClass3 = message3.getClass();
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2, messageClass3);
StrongConcurrentSetV8<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2, messageClass3);
boolean subsPublished = false;
ISetEntry<Subscription> current;
Subscription sub;
// Run subscriptions
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription sub : subscriptions) {
current = subscriptions.head;
while (current != null) {
sub = current.getValue();
current = current.next();
// this catches all exception types
subsPublished |= sub.publishToSubscription(this, message1, message2, message3);
}
@ -368,10 +399,14 @@ public class MultiMBassador implements IMessageBus {
if (!this.forceExactMatches) {
Collection<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass1, messageClass2, messageClass3);
StrongConcurrentSetV8<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass1, messageClass2, messageClass3);
// now get superClasses
if (superSubscriptions != null && !superSubscriptions.isEmpty()) {
for (Subscription sub : superSubscriptions) {
current = superSubscriptions.head;
while (current != null) {
sub = current.getValue();
current = current.next();
// this catches all exception types
sub.publishToSubscription(this, message1, message2, message3);
}
@ -380,20 +415,24 @@ public class MultiMBassador implements IMessageBus {
// publish to var arg, only if not already an array
if (messageClass1 == messageClass2 && messageClass1 == messageClass3 && !messageClass1.isArray()) {
Object[] asArray = null;
Collection<Subscription> varargSubscriptions = manager.getVarArgSubscriptions(messageClass1);
StrongConcurrentSetV8<Subscription> varargSubscriptions = manager.getVarArgSubscriptions(messageClass1);
if (varargSubscriptions != null && !varargSubscriptions.isEmpty()) {
asArray = (Object[]) Array.newInstance(messageClass1, 3);
asArray[0] = message1;
asArray[1] = message2;
asArray[2] = message3;
for (Subscription sub : varargSubscriptions) {
current = varargSubscriptions.head;
while (current != null) {
sub = current.getValue();
current = current.next();
// this catches all exception types
subsPublished |= sub.publishToSubscription(this, asArray);
}
}
Collection<Subscription> varargSuperSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1);
StrongConcurrentSetV8<Subscription> varargSuperSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1);
// now get array based superClasses (but only if those ALSO accept vararg)
if (varargSuperSubscriptions != null && !varargSuperSubscriptions.isEmpty()) {
if (asArray == null) {
@ -403,7 +442,11 @@ public class MultiMBassador implements IMessageBus {
asArray[2] = message3;
}
for (Subscription sub : varargSuperSubscriptions) {
current = varargSuperSubscriptions.head;
while (current != null) {
sub = current.getValue();
current = current.next();
// this catches all exception types
subsPublished |= sub.publishToSubscription(this, asArray);
}
@ -414,10 +457,15 @@ public class MultiMBassador implements IMessageBus {
if (!subsPublished) {
// Dead Event must EXACTLY MATCH (no subclasses)
Collection<Subscription> deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
StrongConcurrentSetV8<Subscription> deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) {
DeadMessage deadMessage = new DeadMessage(message1, message2, message3);
for (Subscription sub : deadSubscriptions) {
current = deadSubscriptions.head;
while (current != null) {
sub = current.getValue();
current = current.next();
// this catches all exception types
sub.publishToSubscription(this, deadMessage);
}
@ -430,12 +478,14 @@ public class MultiMBassador implements IMessageBus {
if (message != null) {
try {
this.dispatchQueue.transfer(message);
} catch (InterruptedException e) {
} catch (Exception e) {
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
.setCause(e)
.setPublishedObject(message));
.setMessage("Error while adding an asynchronous message")
.setCause(e)
.setPublishedObject(message));
}
} else {
throw new NullPointerException("Message cannot be null.");
}
}
@ -444,12 +494,14 @@ public class MultiMBassador implements IMessageBus {
if (message1 != null && message2 != null) {
try {
this.dispatchQueue.transfer(message1, message2);
} catch (InterruptedException e) {
} catch (Exception e) {
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
.setCause(e)
.setPublishedObject(message1, message2));
}
} else {
throw new NullPointerException("Messages cannot be null.");
}
}
@ -458,12 +510,14 @@ public class MultiMBassador implements IMessageBus {
if (message1 != null || message2 != null | message3 != null) {
try {
this.dispatchQueue.transfer(message1, message2, message3);
} catch (InterruptedException e) {
} catch (Exception e) {
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
.setCause(e)
.setPublishedObject(message1, message2, message3));
}
} else {
throw new NullPointerException("Messages cannot be null.");
}
}
}

View File

@ -52,7 +52,7 @@ public class SubscriptionManager {
// this is the primary list for dispatching a specific message
// write access is synchronized and happens only when a listener of a specific class is registered the first time
private final ConcurrentMap<Class<?>, StrongConcurrentSetV8<Subscription>> subscriptionsPerMessageSingle;
private final HashMapTree<Class<?>, Collection<Subscription>> subscriptionsPerMessageMulti;
private final HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>> subscriptionsPerMessageMulti;
// all subscriptions per messageHandler type
// this map provides fast access for subscribing and unsubscribing
@ -67,7 +67,7 @@ public class SubscriptionManager {
// it's a hit on SUB/UNSUB, but REALLY improves performance on handlers
// it's faster to create a new one for SUB/UNSUB than it is to clear() on the original one
private final Map<Class<?>, StrongConcurrentSetV8<Subscription>> superClassSubscriptions;
private final HashMapTree<Class<?>, Collection<Subscription>> superClassSubscriptionsMulti;
private final HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>> superClassSubscriptionsMulti;
private final Map<Class<?>, StrongConcurrentSetV8<Subscription>> varArgSubscriptions;
private final Map<Class<?>, StrongConcurrentSetV8<Subscription>> varArgSuperClassSubscriptions;
@ -87,7 +87,7 @@ public class SubscriptionManager {
this.nonListeners = new ConcurrentHashMapV8<Class<?>, Boolean>(4, SubscriptionManager.LOAD_FACTOR);
this.subscriptionsPerMessageSingle = new ConcurrentHashMapV8<Class<?>, StrongConcurrentSetV8<Subscription>>(64, SubscriptionManager.LOAD_FACTOR, 1);
this.subscriptionsPerMessageMulti = new HashMapTree<Class<?>, Collection<Subscription>>(4, SubscriptionManager.LOAD_FACTOR);
this.subscriptionsPerMessageMulti = new HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>>(4, SubscriptionManager.LOAD_FACTOR);
// only used during SUB/UNSUB
this.subscriptionsPerListener = new ConcurrentHashMapV8<Class<?>, StrongConcurrentSetV8<Subscription>>(64, SubscriptionManager.LOAD_FACTOR, 1);
@ -101,7 +101,7 @@ public class SubscriptionManager {
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but improves performance of handlers
this.superClassSubscriptions = new ConcurrentHashMapV8<Class<?>, StrongConcurrentSetV8<Subscription>>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
this.superClassSubscriptionsMulti = new HashMapTree<Class<?>, Collection<Subscription>>(4, SubscriptionManager.LOAD_FACTOR);
this.superClassSubscriptionsMulti = new HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>>(4, SubscriptionManager.LOAD_FACTOR);
// var arg subscriptions keep track of which subscriptions can handle varArgs. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but improves performance of handlers
@ -340,13 +340,13 @@ public class SubscriptionManager {
}
// CAN RETURN NULL
public final Collection<Subscription> getSubscriptionsByMessageType(Class<?> messageType1, Class<?> messageType2) {
public final StrongConcurrentSetV8<Subscription> getSubscriptionsByMessageType(Class<?> messageType1, Class<?> messageType2) {
return this.subscriptionsPerMessageMulti.get(messageType1, messageType2);
}
// CAN RETURN NULL
public final Collection<Subscription> getSubscriptionsByMessageType(Class<?> messageType1, Class<?> messageType2, Class<?> messageType3) {
public final StrongConcurrentSetV8<Subscription> getSubscriptionsByMessageType(Class<?> messageType1, Class<?> messageType2, Class<?> messageType3) {
return this.subscriptionsPerMessageMulti.getValue(messageType1, messageType2, messageType3);
}
@ -506,12 +506,12 @@ public class SubscriptionManager {
// must be protected by read lock
// ALSO checks to see if the superClass accepts subtypes.
public Collection<Subscription> getSuperSubscriptions(Class<?> superType1, Class<?> superType2) {
HashMapTree<Class<?>, Collection<Subscription>> local = this.superClassSubscriptionsMulti;
public StrongConcurrentSetV8<Subscription> getSuperSubscriptions(Class<?> superType1, Class<?> superType2) {
HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>> local = this.superClassSubscriptionsMulti;
// whenever our subscriptions change, this map is cleared.
HashMapTree<Class<?>, Collection<Subscription>> subsPerTypeLeaf = local.getLeaf(superType1, superType2);
Collection<Subscription> subsPerType = null;
HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>> subsPerTypeLeaf = local.getLeaf(superType1, superType2);
StrongConcurrentSetV8<Subscription> subsPerType = null;
// we DO NOT care about duplicate, because the answers will be the same
if (subsPerTypeLeaf != null) {
@ -521,12 +521,24 @@ public class SubscriptionManager {
subsPerType = new StrongConcurrentSetV8<Subscription>(16, LOAD_FACTOR, this.STRIPE_SIZE);
// whenever our subscriptions change, this map is cleared.
Collection<Class<?>> types1 = this.superClassesCache.get(superType1);
Collection<Class<?>> types2 = this.superClassesCache.get(superType2);
StrongConcurrentSetV8<Class<?>> types1 = this.superClassesCache.get(superType1);
StrongConcurrentSetV8<Class<?>> types2 = this.superClassesCache.get(superType2);
StrongConcurrentSetV8<Subscription> subs;
HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>> leaf1;
HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>> leaf2;
// ISetEntry<Class<?>> current1;
// Class<?> eventSuperType1;
//
// current1 = types1.head;
// while (current1 != null) {
// eventSuperType1 = current1.getValue();
// current1 = current1.next();
//
// }
Collection<Subscription> subs;
HashMapTree<Class<?>, Collection<Subscription>> leaf1;
HashMapTree<Class<?>, Collection<Subscription>> leaf2;
Iterator<Class<?>> iterator1 = new SuperClassIterator(superType1, types1);
Iterator<Class<?>> iterator2;
@ -567,7 +579,7 @@ public class SubscriptionManager {
}
}
Collection<Subscription> putIfAbsent = local.putIfAbsent(subsPerType, superType1, superType2);
StrongConcurrentSetV8<Subscription> putIfAbsent = local.putIfAbsent(subsPerType, superType1, superType2);
if (putIfAbsent != null) {
// someone beat us
subsPerType = putIfAbsent;
@ -579,12 +591,12 @@ public class SubscriptionManager {
// must be protected by read lock
// ALSO checks to see if the superClass accepts subtypes.
public Collection<Subscription> getSuperSubscriptions(Class<?> superType1, Class<?> superType2, Class<?> superType3) {
HashMapTree<Class<?>, Collection<Subscription>> local = this.superClassSubscriptionsMulti;
public StrongConcurrentSetV8<Subscription> getSuperSubscriptions(Class<?> superType1, Class<?> superType2, Class<?> superType3) {
HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>> local = this.superClassSubscriptionsMulti;
// whenever our subscriptions change, this map is cleared.
HashMapTree<Class<?>, Collection<Subscription>> subsPerTypeLeaf = local.getLeaf(superType1, superType2, superType3);
Collection<Subscription> subsPerType;
HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>> subsPerTypeLeaf = local.getLeaf(superType1, superType2, superType3);
StrongConcurrentSetV8<Subscription> subsPerType;
// we DO NOT care about duplicate, because the answers will be the same
@ -599,9 +611,9 @@ public class SubscriptionManager {
subsPerType = new StrongConcurrentSetV8<Subscription>(16, LOAD_FACTOR, this.STRIPE_SIZE);
Collection<Subscription> subs;
HashMapTree<Class<?>, Collection<Subscription>> leaf1;
HashMapTree<Class<?>, Collection<Subscription>> leaf2;
HashMapTree<Class<?>, Collection<Subscription>> leaf3;
HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>> leaf1;
HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>> leaf2;
HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>> leaf3;
Iterator<Class<?>> iterator1 = new SuperClassIterator(superType1, types1);
Iterator<Class<?>> iterator2;
@ -658,7 +670,7 @@ public class SubscriptionManager {
}
}
Collection<Subscription> putIfAbsent = local.putIfAbsent(subsPerType, superType1, superType2, superType3);
StrongConcurrentSetV8<Subscription> putIfAbsent = local.putIfAbsent(subsPerType, superType1, superType2, superType3);
if (putIfAbsent != null) {
// someone beat us
subsPerType = putIfAbsent;