From bd25415dffdf1a6fbdeb2311ce31226b99c51d06 Mon Sep 17 00:00:00 2001 From: nathan Date: Fri, 5 Jun 2015 12:41:47 +0200 Subject: [PATCH] Really fast, not sure why exactly --- .../messagebus/subscription/Subscription.java | 103 +++++++++++ .../subscription/SubscriptionManager.java | 171 +++++------------- 2 files changed, 146 insertions(+), 128 deletions(-) diff --git a/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java b/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java index 9a158c4..dd27e75 100644 --- a/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java +++ b/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java @@ -1,6 +1,7 @@ package dorkbox.util.messagebus.subscription; import com.esotericsoftware.reflectasm.MethodAccess; +import dorkbox.util.messagebus.common.HashMapTree; import dorkbox.util.messagebus.common.MessageHandler; import dorkbox.util.messagebus.common.StrongConcurrentSetV8; import dorkbox.util.messagebus.dispatch.IHandlerInvocation; @@ -9,8 +10,11 @@ import dorkbox.util.messagebus.dispatch.SynchronizedHandlerInvocation; import dorkbox.util.messagebus.error.ErrorHandlingSupport; import org.omg.CORBA.BooleanHolder; +import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; /** @@ -54,6 +58,10 @@ public class Subscription { this.invocation = invocation; } + public final MessageHandler getHandlerMetadata() { + return handlerMetadata; + } + public Class[] getHandledMessageTypes() { return this.handlerMetadata.getHandledMessages(); } @@ -236,4 +244,99 @@ public class Subscription { Subscription other = (Subscription) obj; return this.ID == other.ID; } + + // inside a write lock + // also puts it into the correct map if it's not already there + public Collection createPublicationSubscriptions(final Map, ArrayList> subsPerMessageSingle, + final HashMapTree, ArrayList> subsPerMessageMulti, + AtomicBoolean varArgPossibility, SubscriptionUtils utils) { + + final Class[] messageHandlerTypes = handlerMetadata.getHandledMessages(); + final int size = messageHandlerTypes.length; + +// ConcurrentSet subsPerType; + +// SubscriptionUtils utils = this.utils; + Class type0 = messageHandlerTypes[0]; + + switch (size) { + case 1: { + ArrayList subs = subsPerMessageSingle.get(type0); + if (subs == null) { + subs = new ArrayList(); + + boolean isArray = type0.isArray(); + if (isArray) { + varArgPossibility.lazySet(true); + } + utils.cacheSuperClasses(type0); + + subsPerMessageSingle.put(type0, subs); + } + + return subs; + } + case 2: { + // the HashMapTree uses read/write locks, so it is only accessible one thread at a time +// SubscriptionHolder subHolderSingle = this.subHolderSingle; +// subsPerType = subHolderSingle.publish(); +// +// Collection putIfAbsent = subsPerMessageMulti.putIfAbsent(subsPerType, type0, types[1]); +// if (putIfAbsent != null) { +// return putIfAbsent; +// } else { +// subHolderSingle.set(subHolderSingle.initialValue()); +// +// // cache the super classes +// utils.cacheSuperClasses(type0); +// utils.cacheSuperClasses(types[1]); +// +// return subsPerType; +// } + } + case 3: { + // the HashMapTree uses read/write locks, so it is only accessible one thread at a time +// SubscriptionHolder subHolderSingle = this.subHolderSingle; +// subsPerType = subHolderSingle.publish(); +// +// Collection putIfAbsent = subsPerMessageMulti.putIfAbsent(subsPerType, type0, types[1], types[2]); +// if (putIfAbsent != null) { +// return putIfAbsent; +// } else { +// subHolderSingle.set(subHolderSingle.initialValue()); +// +// // cache the super classes +// utils.cacheSuperClasses(type0); +// utils.cacheSuperClasses(types[1]); +// utils.cacheSuperClasses(types[2]); +// +// return subsPerType; +// } + } + default: { + // the HashMapTree uses read/write locks, so it is only accessible one thread at a time +// SubscriptionHolder subHolderSingle = this.subHolderSingle; +// subsPerType = subHolderSingle.publish(); +// +// Collection putIfAbsent = subsPerMessageMulti.putIfAbsent(subsPerType, types); +// if (putIfAbsent != null) { +// return putIfAbsent; +// } else { +// subHolderSingle.set(subHolderSingle.initialValue()); +// +// Class c; +// int length = types.length; +// for (int i = 0; i < length; i++) { +// c = types[i]; +// +// // cache the super classes +// utils.cacheSuperClasses(c); +// } +// +// return subsPerType; +// } + return null; + } + } + } } diff --git a/src/main/java/dorkbox/util/messagebus/subscription/SubscriptionManager.java b/src/main/java/dorkbox/util/messagebus/subscription/SubscriptionManager.java index a50c8f9..afba0e4 100644 --- a/src/main/java/dorkbox/util/messagebus/subscription/SubscriptionManager.java +++ b/src/main/java/dorkbox/util/messagebus/subscription/SubscriptionManager.java @@ -60,7 +60,7 @@ public class SubscriptionManager { private final StampedLock lock = new StampedLock(); -// private final ReadWriteLock lock = new ReentrantReadWriteLock(); + // private final ReadWriteLock lock = new ReentrantReadWriteLock(); public SubscriptionManager(int numberOfThreads) { float loadFactor = SubscriptionManager.LOAD_FACTOR; @@ -115,64 +115,72 @@ public class SubscriptionManager { // the subscriptions from the map were null, so create them if (subscriptions == null) { + // it is important to note that this section CAN be repeated, however the write lock is gained before + // anything 'permanent' is saved. This is so the time spent inside the writelock is minimized. + + MessageHandler[] messageHandlers = MessageHandler.get(listenerClass); + int handlersSize = messageHandlers.length; + + // remember the class as non listening class if no handlers are found + if (handlersSize == 0) { + this.nonListeners.put(listenerClass, Boolean.TRUE); + return; + } + + + final Map, ArrayList> subsPerMessageSingle = this.subscriptionsPerMessageSingle; + final HashMapTree, ArrayList> subsPerMessageMulti = this.subscriptionsPerMessageMulti; + + final ArrayList subsPerListener = new ArrayList(handlersSize); + Collection subsForPublication; + + // create the subscription + MessageHandler messageHandler; + Subscription subscription; + + for (int i = 0; i < handlersSize; i++) { + messageHandler = messageHandlers[i]; + + // create the subscription + subscription = new Subscription(messageHandler); + subscription.subscribe(listener); + + subsPerListener.add(subscription); // activates this sub for sub/unsub + } + + // now write lock for the least expensive part. This is a deferred "double checked lock", but is necessary because // of the huge number of reads compared to writes. StampedLock lock = this.lock; long stamp = lock.writeLock(); -// Lock writeLock = this.lock.writeLock(); -// writeLock.lock(); - ConcurrentMap, Subscription[]> subsPerListenerMap = this.subscriptionsPerListener; + final ConcurrentMap, Subscription[]> subsPerListenerMap = this.subscriptionsPerListener; subscriptions = subsPerListenerMap.get(listenerClass); // it was still null, so we actually have to create the rest of the subs if (subscriptions == null) { - MessageHandler[] messageHandlers = MessageHandler.get(listenerClass); - int handlersSize = messageHandlers.length; - - // remember the class as non listening class if no handlers are found - if (handlersSize == 0) { - this.nonListeners.put(listenerClass, Boolean.TRUE); - lock.unlockWrite(stamp); -// writeLock.unlock(); - return; - } - - Map, ArrayList> subsPerMessageSingle = this.subscriptionsPerMessageSingle; - HashMapTree, ArrayList> subsPerMessageMulti = this.subscriptionsPerMessageMulti; - - ArrayList subsPerListener = new ArrayList(handlersSize); - Collection subsForPublication = null; - - // create the subscription - MessageHandler messageHandler; + final AtomicBoolean varArgPossibility = this.varArgPossibility; + final SubscriptionUtils utils = this.utils; for (int i = 0; i < handlersSize; i++) { - messageHandler = messageHandlers[i]; - - // create the subscription - Subscription subscription = new Subscription(messageHandler); - subscription.subscribe(listener); - - subsPerListener.add(subscription); // activates this sub for sub/unsub + subscription = subsPerListener.get(i); // now add this subscription to each of the handled types - subsForPublication = getSubsForPublication(messageHandler, subsPerMessageSingle, - subsPerMessageMulti); + subsForPublication = subscription + .createPublicationSubscriptions(subsPerMessageSingle, subsPerMessageMulti, varArgPossibility, utils); + //noinspection ConstantConditions subsForPublication.add(subscription); // activates this sub for publication } subsPerListenerMap.put(listenerClass, subsPerListener.toArray(new Subscription[subsPerListener.size()])); lock.unlockWrite(stamp); -// writeLock.unlock(); return; } else { lock.unlockWrite(stamp); -// writeLock.unlock(); } } @@ -186,100 +194,6 @@ public class SubscriptionManager { } } - // inside a write lock - // also puts it into the correct map if it's not already there - private Collection getSubsForPublication(final MessageHandler messageHandler, - final Map, ArrayList> subsPerMessageSingle, - final HashMapTree, ArrayList> subsPerMessageMulti) { - - final Class[] messageHandlerTypes = messageHandler.getHandledMessages(); - final int size = messageHandlerTypes.length; - -// ConcurrentSet subsPerType; - -// SubscriptionUtils utils = this.utils; - Class type0 = messageHandlerTypes[0]; - - switch (size) { - case 1: { - ArrayList subs = subsPerMessageSingle.get(type0); - if (subs == null) { - subs = new ArrayList(); - - boolean isArray = type0.isArray(); - if (isArray) { - varArgPossibility.lazySet(true); - } - utils.cacheSuperClasses(type0); - subsPerMessageSingle.put(type0, subs); - } - - return subs; - } - case 2: { - // the HashMapTree uses read/write locks, so it is only accessible one thread at a time -// SubscriptionHolder subHolderSingle = this.subHolderSingle; -// subsPerType = subHolderSingle.publish(); -// -// Collection putIfAbsent = subsPerMessageMulti.putIfAbsent(subsPerType, type0, types[1]); -// if (putIfAbsent != null) { -// return putIfAbsent; -// } else { -// subHolderSingle.set(subHolderSingle.initialValue()); -// -// // cache the super classes -// utils.cacheSuperClasses(type0); -// utils.cacheSuperClasses(types[1]); -// -// return subsPerType; -// } - } - case 3: { - // the HashMapTree uses read/write locks, so it is only accessible one thread at a time -// SubscriptionHolder subHolderSingle = this.subHolderSingle; -// subsPerType = subHolderSingle.publish(); -// -// Collection putIfAbsent = subsPerMessageMulti.putIfAbsent(subsPerType, type0, types[1], types[2]); -// if (putIfAbsent != null) { -// return putIfAbsent; -// } else { -// subHolderSingle.set(subHolderSingle.initialValue()); -// -// // cache the super classes -// utils.cacheSuperClasses(type0); -// utils.cacheSuperClasses(types[1]); -// utils.cacheSuperClasses(types[2]); -// -// return subsPerType; -// } - } - default: { - // the HashMapTree uses read/write locks, so it is only accessible one thread at a time -// SubscriptionHolder subHolderSingle = this.subHolderSingle; -// subsPerType = subHolderSingle.publish(); -// -// Collection putIfAbsent = subsPerMessageMulti.putIfAbsent(subsPerType, types); -// if (putIfAbsent != null) { -// return putIfAbsent; -// } else { -// subHolderSingle.set(subHolderSingle.initialValue()); -// -// Class c; -// int length = types.length; -// for (int i = 0; i < length; i++) { -// c = types[i]; -// -// // cache the super classes -// utils.cacheSuperClasses(c); -// } -// -// return subsPerType; -// } - return null; - } - } - } - public final void unsubscribe(Object listener) { if (listener == null) { return; @@ -305,6 +219,7 @@ public class SubscriptionManager { } } + private void clearConcurrentCollections() { this.utils.clear(); this.varArgUtils.clear();