From 758a1c837a5455ad926ad7b617cb7c25f747d3d5 Mon Sep 17 00:00:00 2001 From: nathan Date: Fri, 20 Feb 2015 12:17:07 +0100 Subject: [PATCH] Changed superClassSubs to use fast iterator --- .../engio/mbassy/multi/MultiMBassador.java | 18 ++-- .../multi/subscription/Subscription.java | 2 +- .../subscription/SubscriptionManager.java | 35 ++++--- .../engio/mbassy/multi/PerformanceTest.java | 97 +++++++++++++++++++ 4 files changed, 133 insertions(+), 19 deletions(-) create mode 100644 src/test/java/net/engio/mbassy/multi/PerformanceTest.java diff --git a/src/main/java/net/engio/mbassy/multi/MultiMBassador.java b/src/main/java/net/engio/mbassy/multi/MultiMBassador.java index 9e28e20..b87160a 100644 --- a/src/main/java/net/engio/mbassy/multi/MultiMBassador.java +++ b/src/main/java/net/engio/mbassy/multi/MultiMBassador.java @@ -1,5 +1,9 @@ package net.engio.mbassy.multi; +import it.unimi.dsi.fastutil.objects.ObjectIterator; +import it.unimi.dsi.fastutil.objects.Reference2BooleanMap.Entry; +import it.unimi.dsi.fastutil.objects.Reference2BooleanMap.FastEntrySet; + import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -36,7 +40,7 @@ public class MultiMBassador implements IMessageBus { private List threads; public MultiMBassador() { - this(Runtime.getRuntime().availableProcessors()); + this(Runtime.getRuntime().availableProcessors()*2); // this(2); } @@ -130,9 +134,8 @@ public class MultiMBassador implements IMessageBus { SubscriptionManager manager = this.subscriptionManager; Class messageClass = message.getClass(); - - Collection subscriptions = manager.getSubscriptionsByMessageType(messageClass); + // Run subscriptions if (subscriptions != null && !subscriptions.isEmpty()) { for (Subscription sub : subscriptions) { @@ -152,10 +155,13 @@ public class MultiMBassador implements IMessageBus { } } - Collection superSubscriptions = manager.getSuperSubscriptions(messageClass); + FastEntrySet superSubscriptions = manager.getSuperSubscriptions(messageClass); // now get superClasses - if (superSubscriptions != null) { - for (Subscription sub : superSubscriptions) { + if (superSubscriptions != null && !superSubscriptions.isEmpty()) { + ObjectIterator> fastIterator = superSubscriptions.fastIterator(); + + while (fastIterator.hasNext()) { + Subscription sub = fastIterator.next().getKey(); // this catches all exception types sub.publishToSubscription(this, message); } diff --git a/src/main/java/net/engio/mbassy/multi/subscription/Subscription.java b/src/main/java/net/engio/mbassy/multi/subscription/Subscription.java index d8e91c8..e8997a1 100644 --- a/src/main/java/net/engio/mbassy/multi/subscription/Subscription.java +++ b/src/main/java/net/engio/mbassy/multi/subscription/Subscription.java @@ -87,7 +87,7 @@ public class Subscription { public void publishToSubscription(ErrorHandlingSupport errorHandler, Object message) { Collection listeners = this.listeners; - if (listeners.size() > 0) { + if (!listeners.isEmpty()) { Method handler = this.handlerMetadata.getHandler(); IHandlerInvocation invocation = this.invocation; diff --git a/src/main/java/net/engio/mbassy/multi/subscription/SubscriptionManager.java b/src/main/java/net/engio/mbassy/multi/subscription/SubscriptionManager.java index 559a7bd..05f65b1 100644 --- a/src/main/java/net/engio/mbassy/multi/subscription/SubscriptionManager.java +++ b/src/main/java/net/engio/mbassy/multi/subscription/SubscriptionManager.java @@ -1,5 +1,8 @@ package net.engio.mbassy.multi.subscription; +import it.unimi.dsi.fastutil.objects.Reference2BooleanArrayMap; +import it.unimi.dsi.fastutil.objects.Reference2BooleanMap.FastEntrySet; + import java.util.ArrayDeque; import java.util.Collection; import java.util.Iterator; @@ -7,6 +10,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import net.engio.mbassy.multi.common.IdentityObjectTree; import net.engio.mbassy.multi.common.ReflectionUtils; @@ -15,8 +19,6 @@ import net.engio.mbassy.multi.listener.MessageHandler; import net.engio.mbassy.multi.listener.MetadataReader; import sun.reflect.generics.reflectiveObjects.NotImplementedException; -import com.googlecode.concurentlocks.ReentrantReadWriteUpdateLock; - /** * The subscription managers responsibility is to consistently handle and synchronize the message listener subscription process. * It provides fast lookup of existing subscriptions when another instance of an already known @@ -56,7 +58,7 @@ public class SubscriptionManager { // superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically. // it's a hit on SUB/UNSUB, but REALLY improves performance on handlers // it's faster to create a new one for SUB/UNSUB than it is to clear() on the original one - private volatile Map, Collection> superClassSubscriptions; + private volatile Map, FastEntrySet> superClassSubscriptions; // private final IdentityObjectTree, Collection> superClassSubscriptionsMulti = new IdentityObjectTree, Collection>(); @@ -64,7 +66,7 @@ public class SubscriptionManager { private final Map, Object> nonListeners; // synchronize read/write acces to the subscription maps - private final ReentrantReadWriteUpdateLock LOCK = new ReentrantReadWriteUpdateLock(); + private final ReentrantReadWriteLock LOCK = new ReentrantReadWriteLock(); public SubscriptionManager(int numberOfThreads) { this.MAP_STRIPING = numberOfThreads; @@ -84,7 +86,15 @@ public class SubscriptionManager { private final void resetSuperClassSubs() { // superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically. // it's a hit on SUB/UNSUB, but improves performance on handlers - this.superClassSubscriptions = new ConcurrentHashMap, Collection>(8, this.LOAD_FACTOR, this.MAP_STRIPING); + this.superClassSubscriptions = new ConcurrentHashMap, FastEntrySet>(8, this.LOAD_FACTOR, this.MAP_STRIPING); + } + + public void readLock() { + this.LOCK.readLock().lock(); + } + + public void readUnLock() { + this.LOCK.readLock().unlock(); } public void unsubscribe(Object listener) { @@ -332,15 +342,16 @@ public class SubscriptionManager { } + // ALSO checks to see if the superClass accepts subtypes. - public Collection getSuperSubscriptions(Class superType) { - Map, Collection> superClassSubs = this.superClassSubscriptions; + public FastEntrySet getSuperSubscriptions(Class superType) { + Map, FastEntrySet> superClassSubs = this.superClassSubscriptions; if (superClassSubs == null) { // we haven't created it yet (via subscribe) return null; } - Collection subsPerType = superClassSubs.get(superType); + FastEntrySet subsPerType = superClassSubs.get(superType); if (subsPerType == null) { Collection> types = this.superClassesCache.get(superType); @@ -348,20 +359,20 @@ public class SubscriptionManager { return null; } -// subsPerType = new StrongConcurrentSet(types.size(), this.LOAD_FACTOR); - subsPerType = new ArrayDeque(types.size() + 1); + Reference2BooleanArrayMap map = new Reference2BooleanArrayMap(types.size() + 1); for (Class superClass : types) { Collection subs = this.subscriptionsPerMessageSingle.get(superClass); - if (subs != null) { + if (subs != null && !subs.isEmpty()) { for (Subscription sub : subs) { if (sub.acceptsSubtypes()) { - subsPerType.add(sub); + map.put(sub, Boolean.TRUE); } } } } + subsPerType = map.reference2BooleanEntrySet(); superClassSubs.put(superType, subsPerType); } diff --git a/src/test/java/net/engio/mbassy/multi/PerformanceTest.java b/src/test/java/net/engio/mbassy/multi/PerformanceTest.java new file mode 100644 index 0000000..55371c2 --- /dev/null +++ b/src/test/java/net/engio/mbassy/multi/PerformanceTest.java @@ -0,0 +1,97 @@ +/* + * Copyright 2015 dorkbox, llc + */ +package net.engio.mbassy.multi; + +import junit.framework.Assert; +import net.engio.mbassy.multi.annotations.Handler; +import net.engio.mbassy.multi.error.IPublicationErrorHandler; +import net.engio.mbassy.multi.error.PublicationError; + +/** + * @author dorkbox, llc Date: 2/2/15 + */ +public class PerformanceTest { + + private static long count = 0; + + protected static final IPublicationErrorHandler TestFailingHandler = new IPublicationErrorHandler() { + @Override + public void handleError(PublicationError error) { + error.getCause().printStackTrace(); + Assert.fail(); + } + }; + + public static void main(String[] args) { + PerformanceTest multiMessageTest = new PerformanceTest(); + multiMessageTest.testMultiMessageSending(); + } + + + public PerformanceTest() { + } + + public void testMultiMessageSending() { + MultiMBassador bus = new MultiMBassador(); + bus.addErrorHandler(TestFailingHandler); + + + Listener listener1 = new Listener(); + bus.subscribe(listener1); + + long num = Long.MAX_VALUE; + while (num-- > 0) { + bus.publish("s"); + } + +// bus.publish("s", "s"); +// bus.publish("s", "s", "s"); +// bus.publish("s", "s", "s", "s"); +// bus.publish(1, 2, "s"); +// bus.publish(1, 2, 3, 4, 5, 6); +// bus.publish(new Integer[] {1, 2, 3, 4, 5, 6}); + + bus.shutdown(); + System.err.println("Count: " + count); + } + + @SuppressWarnings("unused") + public static class Listener { + @Handler + public void handleSync(String o1) { + count++; +// System.err.println("match String"); + } + +// @Handler +// public void handleSync(String o1, String o2) { +// count.getAndIncrement(); +// System.err.println("match String, String"); +// } +// +// @Handler +// public void handleSync(String o1, String o2, String o3) { +// count.getAndIncrement(); +// System.err.println("match String, String, String"); +// } +// +// @Handler +// public void handleSync(Integer o1, Integer o2, String o3) { +// count.getAndIncrement(); +// System.err.println("match Integer, Integer, String"); +// } +// +// @Handler +// public void handleSync(String... o) { +// count.getAndIncrement(); +// System.err.println("match String[]"); +// } +// +// @Handler +// public void handleSync(Integer... o) { +// count.getAndIncrement(); +// System.err.println("match Integer[]"); +// } + } +}