From 21541639caa4c0d08f2363885d5503e2a830ca75 Mon Sep 17 00:00:00 2001 From: nathan Date: Wed, 13 May 2015 23:55:54 +0200 Subject: [PATCH] added booleanHolder --- .../util/messagebus/MultiMBassador.java | 67 ++++++++++--------- .../util/messagebus/SubscriptionManager.java | 4 +- .../common/thread/BooleanHolder.java | 11 +++ .../common/thread/BooleanThreadHolder.java | 15 +++++ .../common/{ => thread}/ClassHolder.java | 4 +- .../{ => thread}/SubscriptionHolder.java | 3 +- .../messagebus/subscription/Subscription.java | 17 +++-- .../util/messagebus/PerformanceTest.java | 2 +- 8 files changed, 79 insertions(+), 44 deletions(-) create mode 100644 src/main/java/dorkbox/util/messagebus/common/thread/BooleanHolder.java create mode 100644 src/main/java/dorkbox/util/messagebus/common/thread/BooleanThreadHolder.java rename src/main/java/dorkbox/util/messagebus/common/{ => thread}/ClassHolder.java (81%) rename src/main/java/dorkbox/util/messagebus/common/{ => thread}/SubscriptionHolder.java (84%) diff --git a/src/main/java/dorkbox/util/messagebus/MultiMBassador.java b/src/main/java/dorkbox/util/messagebus/MultiMBassador.java index db61b2b..4e950c9 100644 --- a/src/main/java/dorkbox/util/messagebus/MultiMBassador.java +++ b/src/main/java/dorkbox/util/messagebus/MultiMBassador.java @@ -12,6 +12,8 @@ import dorkbox.util.messagebus.common.NamedThreadFactory; import dorkbox.util.messagebus.common.StrongConcurrentSetV8; import dorkbox.util.messagebus.common.simpleq.MpmcMultiTransferArrayQueue; import dorkbox.util.messagebus.common.simpleq.MultiNode; +import dorkbox.util.messagebus.common.thread.BooleanHolder; +import dorkbox.util.messagebus.common.thread.BooleanThreadHolder; import dorkbox.util.messagebus.error.IPublicationErrorHandler; import dorkbox.util.messagebus.error.PublicationError; import dorkbox.util.messagebus.subscription.Subscription; @@ -61,7 +63,7 @@ public class MultiMBassador implements IMessageBus { } /** - * @param forceExactMatches if true, only exact matching will be performed on classes. Setting this to true + * @param forceExactMatches if TRUE, only exact matching will be performed on classes. Setting this to true * removes the ability to have subTypes and VarArg matching, and doing so doubles the speed of the * system. By default, this is FALSE, to support subTypes and VarArg matching. * @@ -92,12 +94,11 @@ public class MultiMBassador implements IMessageBus { try { while (true) { IN_QUEUE.take(node); - publish(node.item1); -// switch (node.messageType) { -// case 1: publish(node.item1); continue; -// case 2: publish(node.item1, node.item2); continue; -// case 3: publish(node.item1, node.item2, node.item3); continue; -// } + switch (node.messageType) { + case 1: publish(node.item1); continue; + case 2: publish(node.item1, node.item2); continue; + case 3: publish(node.item1, node.item2, node.item3); continue; + } } } catch (InterruptedException e) { if (!MultiMBassador.this.shuttingDown) { @@ -176,13 +177,17 @@ public class MultiMBassador implements IMessageBus { this.subscriptionManager.shutdown(); } + private final BooleanThreadHolder booleanThreadLocal = new BooleanThreadHolder(); + @Override public void publish(final Object message) { SubscriptionManager manager = this.subscriptionManager; Class messageClass = message.getClass(); StrongConcurrentSetV8 subscriptions = manager.getSubscriptionsByMessageType(messageClass); - boolean subsPublished = false; + + BooleanHolder subsPublished = this.booleanThreadLocal.get(); + subsPublished.bool = false; ISetEntry current; Subscription sub; @@ -195,7 +200,7 @@ public class MultiMBassador implements IMessageBus { current = current.next(); // this catches all exception types - subsPublished |= sub.publishToSubscription(this, message); + sub.publishToSubscription(this, subsPublished, message); } } @@ -209,7 +214,7 @@ public class MultiMBassador implements IMessageBus { current = current.next(); // this catches all exception types - subsPublished |= sub.publishToSubscription(this, message); + sub.publishToSubscription(this, subsPublished, message); } } @@ -229,7 +234,7 @@ public class MultiMBassador implements IMessageBus { current = current.next(); // this catches all exception types - subsPublished |= sub.publishToSubscription(this, asArray); + sub.publishToSubscription(this, subsPublished, asArray); } } @@ -247,13 +252,13 @@ public class MultiMBassador implements IMessageBus { current = current.next(); // this catches all exception types - subsPublished |= sub.publishToSubscription(this, asArray); + sub.publishToSubscription(this, subsPublished, asArray); } } } } - if (!subsPublished) { + if (!subsPublished.bool) { // Dead Event must EXACTLY MATCH (no subclasses) StrongConcurrentSetV8 deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) { @@ -265,7 +270,7 @@ public class MultiMBassador implements IMessageBus { current = current.next(); // this catches all exception types - sub.publishToSubscription(this, deadMessage); + sub.publishToSubscription(this, subsPublished, deadMessage); } } } @@ -279,7 +284,8 @@ public class MultiMBassador implements IMessageBus { Class messageClass2 = message2.getClass(); StrongConcurrentSetV8 subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2); - boolean subsPublished = false; + BooleanHolder subsPublished = this.booleanThreadLocal.get(); + subsPublished.bool = false; ISetEntry current; Subscription sub; @@ -292,7 +298,7 @@ public class MultiMBassador implements IMessageBus { current = current.next(); // this catches all exception types - subsPublished |= sub.publishToSubscription(this, message1, message2); + sub.publishToSubscription(this, subsPublished, message1, message2); } } @@ -306,7 +312,7 @@ public class MultiMBassador implements IMessageBus { current = current.next(); // this catches all exception types - subsPublished |= sub.publishToSubscription(this, message1, message2); + sub.publishToSubscription(this, subsPublished, message1, message2); } } @@ -327,7 +333,7 @@ public class MultiMBassador implements IMessageBus { current = current.next(); // this catches all exception types - subsPublished |= sub.publishToSubscription(this, asArray); + sub.publishToSubscription(this, subsPublished, asArray); } } @@ -346,7 +352,7 @@ public class MultiMBassador implements IMessageBus { current = current.next(); // this catches all exception types - subsPublished |= sub.publishToSubscription(this, asArray); + sub.publishToSubscription(this, subsPublished, asArray); } } } else { @@ -367,7 +373,7 @@ public class MultiMBassador implements IMessageBus { asArray[1] = message2; // this catches all exception types - subsPublished |= sub.publishToSubscription(this, asArray); + sub.publishToSubscription(this, subsPublished, asArray); } } } @@ -375,7 +381,7 @@ public class MultiMBassador implements IMessageBus { } - if (!subsPublished) { + if (!subsPublished.bool) { // Dead Event must EXACTLY MATCH (no subclasses) StrongConcurrentSetV8 deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) { @@ -387,7 +393,7 @@ public class MultiMBassador implements IMessageBus { current = current.next(); // this catches all exception types - sub.publishToSubscription(this, deadMessage); + sub.publishToSubscription(this, subsPublished, deadMessage); } } } @@ -402,7 +408,8 @@ public class MultiMBassador implements IMessageBus { Class messageClass3 = message3.getClass(); StrongConcurrentSetV8 subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2, messageClass3); - boolean subsPublished = false; + BooleanHolder subsPublished = this.booleanThreadLocal.get(); + subsPublished.bool = false; ISetEntry current; Subscription sub; @@ -415,7 +422,7 @@ public class MultiMBassador implements IMessageBus { current = current.next(); // this catches all exception types - subsPublished |= sub.publishToSubscription(this, message1, message2, message3); + sub.publishToSubscription(this, subsPublished, message1, message2, message3); } } @@ -430,7 +437,7 @@ public class MultiMBassador implements IMessageBus { current = current.next(); // this catches all exception types - sub.publishToSubscription(this, message1, message2, message3); + sub.publishToSubscription(this, subsPublished, message1, message2, message3); } } @@ -451,7 +458,7 @@ public class MultiMBassador implements IMessageBus { current = current.next(); // this catches all exception types - subsPublished |= sub.publishToSubscription(this, asArray); + sub.publishToSubscription(this, subsPublished, asArray); } } @@ -471,7 +478,7 @@ public class MultiMBassador implements IMessageBus { current = current.next(); // this catches all exception types - subsPublished |= sub.publishToSubscription(this, asArray); + sub.publishToSubscription(this, subsPublished, asArray); } } } else { @@ -493,7 +500,7 @@ public class MultiMBassador implements IMessageBus { asArray[2] = message3; // this catches all exception types - subsPublished |= sub.publishToSubscription(this, asArray); + sub.publishToSubscription(this, subsPublished, asArray); } } } @@ -501,7 +508,7 @@ public class MultiMBassador implements IMessageBus { } - if (!subsPublished) { + if (!subsPublished.bool) { // Dead Event must EXACTLY MATCH (no subclasses) StrongConcurrentSetV8 deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) { @@ -513,7 +520,7 @@ public class MultiMBassador implements IMessageBus { current = current.next(); // this catches all exception types - sub.publishToSubscription(this, deadMessage); + sub.publishToSubscription(this, subsPublished, deadMessage); } } } diff --git a/src/main/java/dorkbox/util/messagebus/SubscriptionManager.java b/src/main/java/dorkbox/util/messagebus/SubscriptionManager.java index 677fea7..1900f3e 100644 --- a/src/main/java/dorkbox/util/messagebus/SubscriptionManager.java +++ b/src/main/java/dorkbox/util/messagebus/SubscriptionManager.java @@ -5,14 +5,14 @@ import java.util.Collection; import java.util.Map; import java.util.concurrent.ConcurrentMap; -import dorkbox.util.messagebus.common.ClassHolder; import dorkbox.util.messagebus.common.ConcurrentHashMapV8; import dorkbox.util.messagebus.common.HashMapTree; import dorkbox.util.messagebus.common.ISetEntry; import dorkbox.util.messagebus.common.ReflectionUtils; import dorkbox.util.messagebus.common.StrongConcurrentSetV8; -import dorkbox.util.messagebus.common.SubscriptionHolder; import dorkbox.util.messagebus.common.VarArgPossibility; +import dorkbox.util.messagebus.common.thread.ClassHolder; +import dorkbox.util.messagebus.common.thread.SubscriptionHolder; import dorkbox.util.messagebus.listener.MessageHandler; import dorkbox.util.messagebus.listener.MetadataReader; import dorkbox.util.messagebus.subscription.Subscription; diff --git a/src/main/java/dorkbox/util/messagebus/common/thread/BooleanHolder.java b/src/main/java/dorkbox/util/messagebus/common/thread/BooleanHolder.java new file mode 100644 index 0000000..2d9ef08 --- /dev/null +++ b/src/main/java/dorkbox/util/messagebus/common/thread/BooleanHolder.java @@ -0,0 +1,11 @@ +package dorkbox.util.messagebus.common.thread; + + +public class BooleanHolder { + + public boolean bool = false; + + public BooleanHolder() { + } +} + diff --git a/src/main/java/dorkbox/util/messagebus/common/thread/BooleanThreadHolder.java b/src/main/java/dorkbox/util/messagebus/common/thread/BooleanThreadHolder.java new file mode 100644 index 0000000..a5b9640 --- /dev/null +++ b/src/main/java/dorkbox/util/messagebus/common/thread/BooleanThreadHolder.java @@ -0,0 +1,15 @@ +package dorkbox.util.messagebus.common.thread; + + +public class BooleanThreadHolder extends ThreadLocal { + + public BooleanThreadHolder() { + super(); + } + + @Override + public BooleanHolder initialValue() { + return new BooleanHolder(); + } +} + diff --git a/src/main/java/dorkbox/util/messagebus/common/ClassHolder.java b/src/main/java/dorkbox/util/messagebus/common/thread/ClassHolder.java similarity index 81% rename from src/main/java/dorkbox/util/messagebus/common/ClassHolder.java rename to src/main/java/dorkbox/util/messagebus/common/thread/ClassHolder.java index 0968e33..e116d3b 100644 --- a/src/main/java/dorkbox/util/messagebus/common/ClassHolder.java +++ b/src/main/java/dorkbox/util/messagebus/common/thread/ClassHolder.java @@ -1,4 +1,6 @@ -package dorkbox.util.messagebus.common; +package dorkbox.util.messagebus.common.thread; + +import dorkbox.util.messagebus.common.StrongConcurrentSetV8; public class ClassHolder extends ThreadLocal>> { diff --git a/src/main/java/dorkbox/util/messagebus/common/SubscriptionHolder.java b/src/main/java/dorkbox/util/messagebus/common/thread/SubscriptionHolder.java similarity index 84% rename from src/main/java/dorkbox/util/messagebus/common/SubscriptionHolder.java rename to src/main/java/dorkbox/util/messagebus/common/thread/SubscriptionHolder.java index a32eae0..ae9a47a 100644 --- a/src/main/java/dorkbox/util/messagebus/common/SubscriptionHolder.java +++ b/src/main/java/dorkbox/util/messagebus/common/thread/SubscriptionHolder.java @@ -1,5 +1,6 @@ -package dorkbox.util.messagebus.common; +package dorkbox.util.messagebus.common.thread; +import dorkbox.util.messagebus.common.StrongConcurrentSetV8; import dorkbox.util.messagebus.subscription.Subscription; public class SubscriptionHolder extends ThreadLocal> { diff --git a/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java b/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java index e3e1f64..0bb3298 100644 --- a/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java +++ b/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java @@ -7,6 +7,7 @@ import com.esotericsoftware.reflectasm.MethodAccess; import dorkbox.util.messagebus.common.ISetEntry; import dorkbox.util.messagebus.common.StrongConcurrentSet; +import dorkbox.util.messagebus.common.thread.BooleanHolder; import dorkbox.util.messagebus.dispatch.IHandlerInvocation; import dorkbox.util.messagebus.dispatch.ReflectiveHandlerInvocation; import dorkbox.util.messagebus.dispatch.SynchronizedHandlerInvocation; @@ -99,7 +100,7 @@ public class Subscription { /** * @return true if there were listeners for this publication, false if there was nothing */ - public boolean publishToSubscription(ErrorHandlingSupport errorHandler, Object message) { + public void publishToSubscription(ErrorHandlingSupport errorHandler, BooleanHolder booleanHolder, Object message) { StrongConcurrentSet listeners = this.listeners; if (!listeners.isEmpty()) { @@ -114,6 +115,7 @@ public class Subscription { listener = current.getValue(); current = current.next(); //this.count++; + try { invocation.invoke(listener, handler, handleIndex, message); } catch (IllegalAccessException e) { @@ -151,15 +153,14 @@ public class Subscription { .setPublishedObject(message)); } } - return true; + booleanHolder.bool = true; } - return false; } /** * @return true if there were listeners for this publication, false if there was nothing */ - public boolean publishToSubscription(ErrorHandlingSupport errorHandler, Object message1, Object message2) { + public void publishToSubscription(ErrorHandlingSupport errorHandler, BooleanHolder booleanHolder, Object message1, Object message2) { StrongConcurrentSet listeners = this.listeners; if (!listeners.isEmpty()) { @@ -215,15 +216,14 @@ public class Subscription { .setPublishedObject(message1, message2)); } } - return true; + booleanHolder.bool = true; } - return false; } /** * @return true if there were listeners for this publication, false if there was nothing */ - public boolean publishToSubscription(ErrorHandlingSupport errorHandler, Object message1, Object message2, Object message3) { + public void publishToSubscription(ErrorHandlingSupport errorHandler, BooleanHolder booleanHolder, Object message1, Object message2, Object message3) { StrongConcurrentSet listeners = this.listeners; if (!listeners.isEmpty()) { @@ -281,9 +281,8 @@ public class Subscription { .setPublishedObject(message1, message2, message3)); } } - return true; + booleanHolder.bool = true; } - return false; } diff --git a/src/test/java/dorkbox/util/messagebus/PerformanceTest.java b/src/test/java/dorkbox/util/messagebus/PerformanceTest.java index 81fd205..40397d5 100644 --- a/src/test/java/dorkbox/util/messagebus/PerformanceTest.java +++ b/src/test/java/dorkbox/util/messagebus/PerformanceTest.java @@ -42,7 +42,7 @@ public class PerformanceTest { public void run() { Long num = Long.valueOf(7L); while (true) { - bus.publishAsync(num); + bus.publish(num); } }}, CONCURRENCY_LEVEL);