added booleanHolder
This commit is contained in:
parent
47f548b5b2
commit
21541639ca
|
@ -12,6 +12,8 @@ import dorkbox.util.messagebus.common.NamedThreadFactory;
|
|||
import dorkbox.util.messagebus.common.StrongConcurrentSetV8;
|
||||
import dorkbox.util.messagebus.common.simpleq.MpmcMultiTransferArrayQueue;
|
||||
import dorkbox.util.messagebus.common.simpleq.MultiNode;
|
||||
import dorkbox.util.messagebus.common.thread.BooleanHolder;
|
||||
import dorkbox.util.messagebus.common.thread.BooleanThreadHolder;
|
||||
import dorkbox.util.messagebus.error.IPublicationErrorHandler;
|
||||
import dorkbox.util.messagebus.error.PublicationError;
|
||||
import dorkbox.util.messagebus.subscription.Subscription;
|
||||
|
@ -61,7 +63,7 @@ public class MultiMBassador implements IMessageBus {
|
|||
}
|
||||
|
||||
/**
|
||||
* @param forceExactMatches if true, only exact matching will be performed on classes. Setting this to true
|
||||
* @param forceExactMatches if TRUE, only exact matching will be performed on classes. Setting this to true
|
||||
* removes the ability to have subTypes and VarArg matching, and doing so doubles the speed of the
|
||||
* system. By default, this is FALSE, to support subTypes and VarArg matching.
|
||||
*
|
||||
|
@ -92,12 +94,11 @@ public class MultiMBassador implements IMessageBus {
|
|||
try {
|
||||
while (true) {
|
||||
IN_QUEUE.take(node);
|
||||
publish(node.item1);
|
||||
// switch (node.messageType) {
|
||||
// case 1: publish(node.item1); continue;
|
||||
// case 2: publish(node.item1, node.item2); continue;
|
||||
// case 3: publish(node.item1, node.item2, node.item3); continue;
|
||||
// }
|
||||
switch (node.messageType) {
|
||||
case 1: publish(node.item1); continue;
|
||||
case 2: publish(node.item1, node.item2); continue;
|
||||
case 3: publish(node.item1, node.item2, node.item3); continue;
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
if (!MultiMBassador.this.shuttingDown) {
|
||||
|
@ -176,13 +177,17 @@ public class MultiMBassador implements IMessageBus {
|
|||
this.subscriptionManager.shutdown();
|
||||
}
|
||||
|
||||
private final BooleanThreadHolder booleanThreadLocal = new BooleanThreadHolder();
|
||||
|
||||
@Override
|
||||
public void publish(final Object message) {
|
||||
SubscriptionManager manager = this.subscriptionManager;
|
||||
|
||||
Class<?> messageClass = message.getClass();
|
||||
StrongConcurrentSetV8<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass);
|
||||
boolean subsPublished = false;
|
||||
|
||||
BooleanHolder subsPublished = this.booleanThreadLocal.get();
|
||||
subsPublished.bool = false;
|
||||
|
||||
ISetEntry<Subscription> current;
|
||||
Subscription sub;
|
||||
|
@ -195,7 +200,7 @@ public class MultiMBassador implements IMessageBus {
|
|||
current = current.next();
|
||||
|
||||
// this catches all exception types
|
||||
subsPublished |= sub.publishToSubscription(this, message);
|
||||
sub.publishToSubscription(this, subsPublished, message);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -209,7 +214,7 @@ public class MultiMBassador implements IMessageBus {
|
|||
current = current.next();
|
||||
|
||||
// this catches all exception types
|
||||
subsPublished |= sub.publishToSubscription(this, message);
|
||||
sub.publishToSubscription(this, subsPublished, message);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -229,7 +234,7 @@ public class MultiMBassador implements IMessageBus {
|
|||
current = current.next();
|
||||
|
||||
// this catches all exception types
|
||||
subsPublished |= sub.publishToSubscription(this, asArray);
|
||||
sub.publishToSubscription(this, subsPublished, asArray);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -247,13 +252,13 @@ public class MultiMBassador implements IMessageBus {
|
|||
current = current.next();
|
||||
|
||||
// this catches all exception types
|
||||
subsPublished |= sub.publishToSubscription(this, asArray);
|
||||
sub.publishToSubscription(this, subsPublished, asArray);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!subsPublished) {
|
||||
if (!subsPublished.bool) {
|
||||
// Dead Event must EXACTLY MATCH (no subclasses)
|
||||
StrongConcurrentSetV8<Subscription> deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
|
||||
if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) {
|
||||
|
@ -265,7 +270,7 @@ public class MultiMBassador implements IMessageBus {
|
|||
current = current.next();
|
||||
|
||||
// this catches all exception types
|
||||
sub.publishToSubscription(this, deadMessage);
|
||||
sub.publishToSubscription(this, subsPublished, deadMessage);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -279,7 +284,8 @@ public class MultiMBassador implements IMessageBus {
|
|||
Class<?> messageClass2 = message2.getClass();
|
||||
|
||||
StrongConcurrentSetV8<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2);
|
||||
boolean subsPublished = false;
|
||||
BooleanHolder subsPublished = this.booleanThreadLocal.get();
|
||||
subsPublished.bool = false;
|
||||
|
||||
ISetEntry<Subscription> current;
|
||||
Subscription sub;
|
||||
|
@ -292,7 +298,7 @@ public class MultiMBassador implements IMessageBus {
|
|||
current = current.next();
|
||||
|
||||
// this catches all exception types
|
||||
subsPublished |= sub.publishToSubscription(this, message1, message2);
|
||||
sub.publishToSubscription(this, subsPublished, message1, message2);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -306,7 +312,7 @@ public class MultiMBassador implements IMessageBus {
|
|||
current = current.next();
|
||||
|
||||
// this catches all exception types
|
||||
subsPublished |= sub.publishToSubscription(this, message1, message2);
|
||||
sub.publishToSubscription(this, subsPublished, message1, message2);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -327,7 +333,7 @@ public class MultiMBassador implements IMessageBus {
|
|||
current = current.next();
|
||||
|
||||
// this catches all exception types
|
||||
subsPublished |= sub.publishToSubscription(this, asArray);
|
||||
sub.publishToSubscription(this, subsPublished, asArray);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -346,7 +352,7 @@ public class MultiMBassador implements IMessageBus {
|
|||
current = current.next();
|
||||
|
||||
// this catches all exception types
|
||||
subsPublished |= sub.publishToSubscription(this, asArray);
|
||||
sub.publishToSubscription(this, subsPublished, asArray);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
@ -367,7 +373,7 @@ public class MultiMBassador implements IMessageBus {
|
|||
asArray[1] = message2;
|
||||
|
||||
// this catches all exception types
|
||||
subsPublished |= sub.publishToSubscription(this, asArray);
|
||||
sub.publishToSubscription(this, subsPublished, asArray);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -375,7 +381,7 @@ public class MultiMBassador implements IMessageBus {
|
|||
}
|
||||
|
||||
|
||||
if (!subsPublished) {
|
||||
if (!subsPublished.bool) {
|
||||
// Dead Event must EXACTLY MATCH (no subclasses)
|
||||
StrongConcurrentSetV8<Subscription> deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
|
||||
if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) {
|
||||
|
@ -387,7 +393,7 @@ public class MultiMBassador implements IMessageBus {
|
|||
current = current.next();
|
||||
|
||||
// this catches all exception types
|
||||
sub.publishToSubscription(this, deadMessage);
|
||||
sub.publishToSubscription(this, subsPublished, deadMessage);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -402,7 +408,8 @@ public class MultiMBassador implements IMessageBus {
|
|||
Class<?> messageClass3 = message3.getClass();
|
||||
|
||||
StrongConcurrentSetV8<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2, messageClass3);
|
||||
boolean subsPublished = false;
|
||||
BooleanHolder subsPublished = this.booleanThreadLocal.get();
|
||||
subsPublished.bool = false;
|
||||
|
||||
ISetEntry<Subscription> current;
|
||||
Subscription sub;
|
||||
|
@ -415,7 +422,7 @@ public class MultiMBassador implements IMessageBus {
|
|||
current = current.next();
|
||||
|
||||
// this catches all exception types
|
||||
subsPublished |= sub.publishToSubscription(this, message1, message2, message3);
|
||||
sub.publishToSubscription(this, subsPublished, message1, message2, message3);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -430,7 +437,7 @@ public class MultiMBassador implements IMessageBus {
|
|||
current = current.next();
|
||||
|
||||
// this catches all exception types
|
||||
sub.publishToSubscription(this, message1, message2, message3);
|
||||
sub.publishToSubscription(this, subsPublished, message1, message2, message3);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -451,7 +458,7 @@ public class MultiMBassador implements IMessageBus {
|
|||
current = current.next();
|
||||
|
||||
// this catches all exception types
|
||||
subsPublished |= sub.publishToSubscription(this, asArray);
|
||||
sub.publishToSubscription(this, subsPublished, asArray);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -471,7 +478,7 @@ public class MultiMBassador implements IMessageBus {
|
|||
current = current.next();
|
||||
|
||||
// this catches all exception types
|
||||
subsPublished |= sub.publishToSubscription(this, asArray);
|
||||
sub.publishToSubscription(this, subsPublished, asArray);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
@ -493,7 +500,7 @@ public class MultiMBassador implements IMessageBus {
|
|||
asArray[2] = message3;
|
||||
|
||||
// this catches all exception types
|
||||
subsPublished |= sub.publishToSubscription(this, asArray);
|
||||
sub.publishToSubscription(this, subsPublished, asArray);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -501,7 +508,7 @@ public class MultiMBassador implements IMessageBus {
|
|||
}
|
||||
|
||||
|
||||
if (!subsPublished) {
|
||||
if (!subsPublished.bool) {
|
||||
// Dead Event must EXACTLY MATCH (no subclasses)
|
||||
StrongConcurrentSetV8<Subscription> deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
|
||||
if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) {
|
||||
|
@ -513,7 +520,7 @@ public class MultiMBassador implements IMessageBus {
|
|||
current = current.next();
|
||||
|
||||
// this catches all exception types
|
||||
sub.publishToSubscription(this, deadMessage);
|
||||
sub.publishToSubscription(this, subsPublished, deadMessage);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,14 +5,14 @@ import java.util.Collection;
|
|||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import dorkbox.util.messagebus.common.ClassHolder;
|
||||
import dorkbox.util.messagebus.common.ConcurrentHashMapV8;
|
||||
import dorkbox.util.messagebus.common.HashMapTree;
|
||||
import dorkbox.util.messagebus.common.ISetEntry;
|
||||
import dorkbox.util.messagebus.common.ReflectionUtils;
|
||||
import dorkbox.util.messagebus.common.StrongConcurrentSetV8;
|
||||
import dorkbox.util.messagebus.common.SubscriptionHolder;
|
||||
import dorkbox.util.messagebus.common.VarArgPossibility;
|
||||
import dorkbox.util.messagebus.common.thread.ClassHolder;
|
||||
import dorkbox.util.messagebus.common.thread.SubscriptionHolder;
|
||||
import dorkbox.util.messagebus.listener.MessageHandler;
|
||||
import dorkbox.util.messagebus.listener.MetadataReader;
|
||||
import dorkbox.util.messagebus.subscription.Subscription;
|
||||
|
|
|
@ -0,0 +1,11 @@
|
|||
package dorkbox.util.messagebus.common.thread;
|
||||
|
||||
|
||||
public class BooleanHolder {
|
||||
|
||||
public boolean bool = false;
|
||||
|
||||
public BooleanHolder() {
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,15 @@
|
|||
package dorkbox.util.messagebus.common.thread;
|
||||
|
||||
|
||||
public class BooleanThreadHolder extends ThreadLocal<BooleanHolder> {
|
||||
|
||||
public BooleanThreadHolder() {
|
||||
super();
|
||||
}
|
||||
|
||||
@Override
|
||||
public BooleanHolder initialValue() {
|
||||
return new BooleanHolder();
|
||||
}
|
||||
}
|
||||
|
|
@ -1,4 +1,6 @@
|
|||
package dorkbox.util.messagebus.common;
|
||||
package dorkbox.util.messagebus.common.thread;
|
||||
|
||||
import dorkbox.util.messagebus.common.StrongConcurrentSetV8;
|
||||
|
||||
|
||||
public class ClassHolder extends ThreadLocal<StrongConcurrentSetV8<Class<?>>> {
|
|
@ -1,5 +1,6 @@
|
|||
package dorkbox.util.messagebus.common;
|
||||
package dorkbox.util.messagebus.common.thread;
|
||||
|
||||
import dorkbox.util.messagebus.common.StrongConcurrentSetV8;
|
||||
import dorkbox.util.messagebus.subscription.Subscription;
|
||||
|
||||
public class SubscriptionHolder extends ThreadLocal<StrongConcurrentSetV8<Subscription>> {
|
|
@ -7,6 +7,7 @@ import com.esotericsoftware.reflectasm.MethodAccess;
|
|||
|
||||
import dorkbox.util.messagebus.common.ISetEntry;
|
||||
import dorkbox.util.messagebus.common.StrongConcurrentSet;
|
||||
import dorkbox.util.messagebus.common.thread.BooleanHolder;
|
||||
import dorkbox.util.messagebus.dispatch.IHandlerInvocation;
|
||||
import dorkbox.util.messagebus.dispatch.ReflectiveHandlerInvocation;
|
||||
import dorkbox.util.messagebus.dispatch.SynchronizedHandlerInvocation;
|
||||
|
@ -99,7 +100,7 @@ public class Subscription {
|
|||
/**
|
||||
* @return true if there were listeners for this publication, false if there was nothing
|
||||
*/
|
||||
public boolean publishToSubscription(ErrorHandlingSupport errorHandler, Object message) {
|
||||
public void publishToSubscription(ErrorHandlingSupport errorHandler, BooleanHolder booleanHolder, Object message) {
|
||||
StrongConcurrentSet<Object> listeners = this.listeners;
|
||||
|
||||
if (!listeners.isEmpty()) {
|
||||
|
@ -114,6 +115,7 @@ public class Subscription {
|
|||
listener = current.getValue();
|
||||
current = current.next();
|
||||
//this.count++;
|
||||
|
||||
try {
|
||||
invocation.invoke(listener, handler, handleIndex, message);
|
||||
} catch (IllegalAccessException e) {
|
||||
|
@ -151,15 +153,14 @@ public class Subscription {
|
|||
.setPublishedObject(message));
|
||||
}
|
||||
}
|
||||
return true;
|
||||
booleanHolder.bool = true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if there were listeners for this publication, false if there was nothing
|
||||
*/
|
||||
public boolean publishToSubscription(ErrorHandlingSupport errorHandler, Object message1, Object message2) {
|
||||
public void publishToSubscription(ErrorHandlingSupport errorHandler, BooleanHolder booleanHolder, Object message1, Object message2) {
|
||||
StrongConcurrentSet<Object> listeners = this.listeners;
|
||||
|
||||
if (!listeners.isEmpty()) {
|
||||
|
@ -215,15 +216,14 @@ public class Subscription {
|
|||
.setPublishedObject(message1, message2));
|
||||
}
|
||||
}
|
||||
return true;
|
||||
booleanHolder.bool = true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if there were listeners for this publication, false if there was nothing
|
||||
*/
|
||||
public boolean publishToSubscription(ErrorHandlingSupport errorHandler, Object message1, Object message2, Object message3) {
|
||||
public void publishToSubscription(ErrorHandlingSupport errorHandler, BooleanHolder booleanHolder, Object message1, Object message2, Object message3) {
|
||||
StrongConcurrentSet<Object> listeners = this.listeners;
|
||||
|
||||
if (!listeners.isEmpty()) {
|
||||
|
@ -281,9 +281,8 @@ public class Subscription {
|
|||
.setPublishedObject(message1, message2, message3));
|
||||
}
|
||||
}
|
||||
return true;
|
||||
booleanHolder.bool = true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -42,7 +42,7 @@ public class PerformanceTest {
|
|||
public void run() {
|
||||
Long num = Long.valueOf(7L);
|
||||
while (true) {
|
||||
bus.publishAsync(num);
|
||||
bus.publish(num);
|
||||
}
|
||||
}}, CONCURRENCY_LEVEL);
|
||||
|
||||
|
|
Loading…
Reference in New Issue
Block a user