diff --git a/src/main/java/dorkbox/util/messagebus/MultiMBassador.java b/src/main/java/dorkbox/util/messagebus/MultiMBassador.java index 013372c..df3fc75 100644 --- a/src/main/java/dorkbox/util/messagebus/MultiMBassador.java +++ b/src/main/java/dorkbox/util/messagebus/MultiMBassador.java @@ -1,19 +1,17 @@ package dorkbox.util.messagebus; -import java.lang.reflect.Array; import java.util.ArrayDeque; import java.util.Collection; +import java.util.Iterator; import org.jctools.util.Pow2; import dorkbox.util.messagebus.common.DeadMessage; import dorkbox.util.messagebus.common.NamedThreadFactory; -import dorkbox.util.messagebus.common.StrongConcurrentSet; import dorkbox.util.messagebus.common.simpleq.MpmcMultiTransferArrayQueue; import dorkbox.util.messagebus.common.simpleq.MultiNode; import dorkbox.util.messagebus.common.thread.BooleanHolder; import dorkbox.util.messagebus.common.thread.BooleanThreadHolder; -import dorkbox.util.messagebus.common.thread.ConcurrentSet; import dorkbox.util.messagebus.error.IPublicationErrorHandler; import dorkbox.util.messagebus.error.PublicationError; import dorkbox.util.messagebus.subscription.Subscription; @@ -157,21 +155,6 @@ public class MultiMBassador implements IMessageBus { } } - @Override - public void subscribe(final Object listener) { - MultiMBassador.this.subscriptionManager.subscribe(listener); - } - - @Override - public void unsubscribe(final Object listener) { - MultiMBassador.this.subscriptionManager.unsubscribe(listener); - } - - @Override - public final boolean hasPendingMessages() { - return this.dispatchQueue.hasPendingMessages(); - } - @Override public void start() { for (Thread t : this.threads) { @@ -194,6 +177,21 @@ public class MultiMBassador implements IMessageBus { this.subscriptionManager.shutdown(); } + @Override + public void subscribe(final Object listener) { + MultiMBassador.this.subscriptionManager.subscribe(listener); + } + + @Override + public void unsubscribe(final Object listener) { + MultiMBassador.this.subscriptionManager.unsubscribe(listener); + } + + @Override + public final boolean hasPendingMessages() { + return this.dispatchQueue.hasPendingMessages(); + } + private final BooleanThreadHolder booleanThreadLocal = new BooleanThreadHolder(); @Override @@ -201,70 +199,80 @@ public class MultiMBassador implements IMessageBus { SubscriptionManager manager = this.subscriptionManager; Class messageClass = message.getClass(); - ConcurrentSet subscriptions = manager.getSubscriptionsByMessageType(messageClass); + Collection subscriptions = manager.getSubscriptionsByMessageType(messageClass); // can return null BooleanHolder subsPublished = this.booleanThreadLocal.get(); subsPublished.bool = false; + Iterator iterator; + Subscription sub; + // Run subscriptions if (subscriptions != null) { - for (Subscription sub : subscriptions) { + for (iterator = subscriptions.iterator(); iterator.hasNext();) { + sub = iterator.next(); + // this catches all exception types sub.publishToSubscription(this, subsPublished, message); } } if (!this.forceExactMatches) { - ConcurrentSet superSubscriptions = manager.getSuperSubscriptions(messageClass); + Collection superSubscriptions = manager.getSuperSubscriptions(messageClass); // now get superClasses if (superSubscriptions != null) { - for (Subscription sub : superSubscriptions) { + for (iterator = superSubscriptions.iterator(); iterator.hasNext();) { + sub = iterator.next(); + // this catches all exception types sub.publishToSubscription(this, subsPublished, message); } } - // publish to var arg, only if not already an array - if (manager.hasVarArgPossibility() && !manager.utils.isArray(messageClass)) { - Object[] asArray = null; - - ConcurrentSet varargSubscriptions = manager.getVarArgSubscriptions(messageClass); - if (varargSubscriptions != null && !varargSubscriptions.isEmpty()) { - asArray = (Object[]) Array.newInstance(messageClass, 1); - asArray[0] = message; - - for (Subscription sub : varargSubscriptions) { - // this catches all exception types - sub.publishToSubscription(this, subsPublished, asArray); - } - } - - ConcurrentSet varargSuperSubscriptions = manager.getVarArgSuperSubscriptions(messageClass); - // now get array based superClasses (but only if those ALSO accept vararg) - if (varargSuperSubscriptions != null && !varargSuperSubscriptions.isEmpty()) { - if (asArray == null) { - asArray = (Object[]) Array.newInstance(messageClass, 1); - asArray[0] = message; - } - for (Subscription sub : varargSuperSubscriptions) { - // this catches all exception types - sub.publishToSubscription(this, subsPublished, asArray); - } - } - } +// // publish to var arg, only if not already an array +// if (manager.hasVarArgPossibility() && !manager.utils.isArray(messageClass)) { +// Object[] asArray = null; +// +// ConcurrentSet varargSubscriptions = manager.getVarArgSubscriptions(messageClass); +// if (varargSubscriptions != null && !varargSubscriptions.isEmpty()) { +// asArray = (Object[]) Array.newInstance(messageClass, 1); +// asArray[0] = message; +// +// for (iterator = varargSubscriptions.iterator(); iterator.hasNext();) { +// sub = iterator.next(); +// // this catches all exception types +// sub.publishToSubscription(this, subsPublished, asArray); +// } +// } +// +// ConcurrentSet varargSuperSubscriptions = manager.getVarArgSuperSubscriptions(messageClass); +// // now get array based superClasses (but only if those ALSO accept vararg) +// if (varargSuperSubscriptions != null && !varargSuperSubscriptions.isEmpty()) { +// if (asArray == null) { +// asArray = (Object[]) Array.newInstance(messageClass, 1); +// asArray[0] = message; +// } +// for (iterator = varargSuperSubscriptions.iterator(); iterator.hasNext();) { +// sub = iterator.next(); +// // this catches all exception types +// sub.publishToSubscription(this, subsPublished, asArray); +// } +// } +// } } if (!subsPublished.bool) { // Dead Event must EXACTLY MATCH (no subclasses) - ConcurrentSet deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); + Collection deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) { DeadMessage deadMessage = new DeadMessage(message); - for (Subscription sub2 : deadSubscriptions) { + for (iterator = deadSubscriptions.iterator(); iterator.hasNext();) { + sub = iterator.next(); // this catches all exception types - sub2.publishToSubscription(this, subsPublished, deadMessage); + sub.publishToSubscription(this, subsPublished, deadMessage); } } } diff --git a/src/main/java/dorkbox/util/messagebus/SubscriptionManager.java b/src/main/java/dorkbox/util/messagebus/SubscriptionManager.java index 150e4ac..341fa20 100644 --- a/src/main/java/dorkbox/util/messagebus/SubscriptionManager.java +++ b/src/main/java/dorkbox/util/messagebus/SubscriptionManager.java @@ -1,13 +1,12 @@ package dorkbox.util.messagebus; import java.util.Collection; +import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentMap; import dorkbox.util.messagebus.common.ConcurrentHashMapV8; import dorkbox.util.messagebus.common.HashMapTree; -import dorkbox.util.messagebus.common.ISetEntry; -import dorkbox.util.messagebus.common.StrongConcurrentSet; import dorkbox.util.messagebus.common.SubscriptionUtils; import dorkbox.util.messagebus.common.VarArgPossibility; import dorkbox.util.messagebus.common.VarArgUtils; @@ -51,8 +50,8 @@ public class SubscriptionManager { // all subscriptions per message type. We perpetually KEEP the types, as this lowers the amount of locking required // this is the primary list for dispatching a specific message // write access is synchronized and happens only when a listener of a specific class is registered the first time - private final ConcurrentMap, ConcurrentSet> subscriptionsPerMessageSingle; - private final HashMapTree, ConcurrentSet> subscriptionsPerMessageMulti; + private final ConcurrentMap, Collection> subscriptionsPerMessageSingle; + private final HashMapTree, Collection> subscriptionsPerMessageMulti; // all subscriptions per messageHandler type // this map provides fast access for subscribing and unsubscribing @@ -61,12 +60,6 @@ public class SubscriptionManager { private final ConcurrentMap, ConcurrentSet> subscriptionsPerListener; - // superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically. - // it's a hit on SUB/UNSUB, but REALLY improves performance on handlers - // it's faster to create a new one for SUB/UNSUB than it is to clear() on the original one - private final ConcurrentMap, ConcurrentSet> superClassSubscriptions; - private final HashMapTree, ConcurrentSet> superClassSubscriptionsMulti; - private final VarArgUtils varArgUtils; // stripe size of maps for concurrency @@ -75,36 +68,31 @@ public class SubscriptionManager { private final SubscriptionHolder subHolderSingle; private final SubscriptionHolder subHolderConcurrent; + SubscriptionManager(int numberOfThreads) { this.STRIPE_SIZE = numberOfThreads; - this.utils = new SubscriptionUtils(LOAD_FACTOR, numberOfThreads); + float loadFactor = SubscriptionManager.LOAD_FACTOR; // modified ONLY during SUB/UNSUB { - this.nonListeners = new ConcurrentHashMapV8, Boolean>(4, SubscriptionManager.LOAD_FACTOR); + this.nonListeners = new ConcurrentHashMapV8, Boolean>(4, loadFactor, this.STRIPE_SIZE); - this.subscriptionsPerMessageSingle = new ConcurrentHashMapV8, ConcurrentSet>(32, SubscriptionManager.LOAD_FACTOR, 1); - this.subscriptionsPerMessageMulti = new HashMapTree, ConcurrentSet>(4, SubscriptionManager.LOAD_FACTOR); + this.subscriptionsPerMessageSingle = new ConcurrentHashMapV8, Collection>(4, loadFactor, this.STRIPE_SIZE); + this.subscriptionsPerMessageMulti = new HashMapTree, Collection>(4, loadFactor); // only used during SUB/UNSUB - this.subscriptionsPerListener = new ConcurrentHashMapV8, ConcurrentSet>(32, SubscriptionManager.LOAD_FACTOR, 1); + this.subscriptionsPerListener = new ConcurrentHashMapV8, ConcurrentSet>(4, loadFactor, this.STRIPE_SIZE); } - // modified by N threads - { - // superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically. - // it's a hit on SUB/UNSUB, but improves performance of handlers - this.superClassSubscriptions = new ConcurrentHashMapV8, ConcurrentSet>(32, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE); - this.superClassSubscriptionsMulti = new HashMapTree, ConcurrentSet>(4, SubscriptionManager.LOAD_FACTOR); + this.utils = new SubscriptionUtils(this.subscriptionsPerMessageSingle, this.subscriptionsPerMessageMulti, loadFactor, numberOfThreads); - // var arg subscriptions keep track of which subscriptions can handle varArgs. SUB/UNSUB dumps it, so it is recreated dynamically. - // it's a hit on SUB/UNSUB, but improves performance of handlers - this.varArgUtils = new VarArgUtils(this.utils, this.subscriptionsPerMessageSingle, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE); - } + // var arg subscriptions keep track of which subscriptions can handle varArgs. SUB/UNSUB dumps it, so it is recreated dynamically. + // it's a hit on SUB/UNSUB, but improves performance of handlers + this.varArgUtils = new VarArgUtils(this.utils, this.subscriptionsPerMessageSingle, loadFactor, this.STRIPE_SIZE); - this.subHolderSingle = new SubscriptionHolder(SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE); - this.subHolderConcurrent = new SubscriptionHolder(SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE); + this.subHolderSingle = new SubscriptionHolder(loadFactor, this.STRIPE_SIZE); + this.subHolderConcurrent = new SubscriptionHolder(loadFactor, this.STRIPE_SIZE); } public void shutdown() { @@ -146,7 +134,7 @@ public class SubscriptionManager { ConcurrentSet subsPerListener = subsPerListener2.get(listenerClass); if (subsPerListener == null) { // a listener is subscribed for the first time - StrongConcurrentSet messageHandlers = SubscriptionManager.metadataReader.getMessageListener(listenerClass).getHandlers(); + Collection messageHandlers = SubscriptionManager.metadataReader.getMessageListener(listenerClass, LOAD_FACTOR, this.STRIPE_SIZE).getHandlers(); int handlersSize = messageHandlers.size(); if (handlersSize == 0) { @@ -157,15 +145,15 @@ public class SubscriptionManager { VarArgPossibility varArgPossibility = this.varArgPossibility; subsPerListener = new ConcurrentSet(16, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE); - ConcurrentMap, ConcurrentSet> subsPerMessageSingle = this.subscriptionsPerMessageSingle; + ConcurrentMap, Collection> subsPerMessageSingle = this.subscriptionsPerMessageSingle; - ISetEntry current = messageHandlers.head; + Iterator iterator; MessageHandler messageHandler; - while (current != null) { - messageHandler = current.getValue(); - current = current.next(); + Collection subsPerType = null; + + for (iterator = messageHandlers.iterator(); iterator.hasNext();) { + messageHandler = iterator.next(); - ConcurrentSet subsPerType = null; // now add this subscription to each of the handled types Class[] types = messageHandler.getHandledMessages(); @@ -176,12 +164,13 @@ public class SubscriptionManager { SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent; subsPerType = subHolderConcurrent.get(); - ConcurrentSet putIfAbsent = subsPerMessageSingle.putIfAbsent(types[0], subsPerType); + Collection putIfAbsent = subsPerMessageSingle.putIfAbsent(types[0], subsPerType); if (putIfAbsent != null) { subsPerType = putIfAbsent; } else { subHolderConcurrent.set(subHolderConcurrent.initialValue()); boolean isArray = this.utils.isArray(types[0]); + // cache the super classes this.utils.getSuperClasses(types[0], isArray); if (isArray) { varArgPossibility.set(true); @@ -194,11 +183,12 @@ public class SubscriptionManager { SubscriptionHolder subHolderSingle = this.subHolderSingle; subsPerType = subHolderSingle.get(); - ConcurrentSet putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(subsPerType, types[0], types[1]); + Collection putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(subsPerType, types[0], types[1]); if (putIfAbsent != null) { subsPerType = putIfAbsent; } else { subHolderSingle.set(subHolderSingle.initialValue()); + // cache the super classes this.utils.getSuperClasses(types[0]); this.utils.getSuperClasses(types[1]); } @@ -209,11 +199,12 @@ public class SubscriptionManager { SubscriptionHolder subHolderSingle = this.subHolderSingle; subsPerType = subHolderSingle.get(); - ConcurrentSet putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(subsPerType, types[0], types[1], types[2]); + Collection putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(subsPerType, types[0], types[1], types[2]); if (putIfAbsent != null) { subsPerType = putIfAbsent; } else { subHolderSingle.set(subHolderSingle.initialValue()); + // cache the super classes this.utils.getSuperClasses(types[0]); this.utils.getSuperClasses(types[1]); this.utils.getSuperClasses(types[2]); @@ -225,7 +216,7 @@ public class SubscriptionManager { SubscriptionHolder subHolderSingle = this.subHolderSingle; subsPerType = subHolderSingle.get(); - ConcurrentSet putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(subsPerType, types); + Collection putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(subsPerType, types); if (putIfAbsent != null) { subsPerType = putIfAbsent; } else { @@ -253,8 +244,11 @@ public class SubscriptionManager { subsPerListener2.put(listenerClass, subsPerListener); } } else { - // subscriptions already exist and must only be updated - for (Subscription sub : subsPerListener) { + Iterator iterator; + Subscription sub; + + for (iterator = subsPerListener.iterator(); iterator.hasNext();) { + sub = iterator.next(); sub.subscribe(listener); } } @@ -276,9 +270,13 @@ public class SubscriptionManager { clearConcurrentCollections(); synchronized(listenerClass) { - ConcurrentSet subscriptions = this.subscriptionsPerListener.get(listenerClass); + Collection subscriptions = this.subscriptionsPerListener.get(listenerClass); if (subscriptions != null) { - for (Subscription sub : subscriptions) { + Iterator iterator; + Subscription sub; + + for (iterator = subscriptions.iterator(); iterator.hasNext();) { + sub = iterator.next(); sub.unsubscribe(listener); } } @@ -286,23 +284,23 @@ public class SubscriptionManager { } private void clearConcurrentCollections() { - this.superClassSubscriptions.clear(); + this.utils.clear(); this.varArgUtils.clear(); } // CAN RETURN NULL - public final ConcurrentSet getSubscriptionsByMessageType(Class messageType) { + public final Collection getSubscriptionsByMessageType(Class messageType) { return this.subscriptionsPerMessageSingle.get(messageType); } // CAN RETURN NULL - public final ConcurrentSet getSubscriptionsByMessageType(Class messageType1, Class messageType2) { + public final Collection getSubscriptionsByMessageType(Class messageType1, Class messageType2) { return this.subscriptionsPerMessageMulti.get(messageType1, messageType2); } // CAN RETURN NULL - public final ConcurrentSet getSubscriptionsByMessageType(Class messageType1, Class messageType2, Class messageType3) { + public final Collection getSubscriptionsByMessageType(Class messageType1, Class messageType2, Class messageType3) { return this.subscriptionsPerMessageMulti.getValue(messageType1, messageType2, messageType3); } @@ -321,263 +319,40 @@ public class SubscriptionManager { // CAN NOT RETURN NULL // check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI // and then, returns the array'd version subscriptions - public ConcurrentSet getVarArgSuperSubscriptions(Class messageClass) { + public Collection getVarArgSuperSubscriptions(Class messageClass) { return this.varArgUtils.getVarArgSuperSubscriptions(messageClass); } // CAN NOT RETURN NULL // check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI // and then, returns the array'd version subscriptions - public ConcurrentSet getVarArgSuperSubscriptions(Class messageClass1, Class messageClass2) { + public Collection getVarArgSuperSubscriptions(Class messageClass1, Class messageClass2) { return this.varArgUtils.getVarArgSuperSubscriptions(messageClass1, messageClass2); } // CAN NOT RETURN NULL // check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI // and then, returns the array'd version subscriptions - public ConcurrentSet getVarArgSuperSubscriptions(final Class messageClass1, final Class messageClass2, final Class messageClass3) { + public Collection getVarArgSuperSubscriptions(final Class messageClass1, final Class messageClass2, final Class messageClass3) { return this.varArgUtils.getVarArgSuperSubscriptions(messageClass1, messageClass2, messageClass3); } // CAN NOT RETURN NULL // ALSO checks to see if the superClass accepts subtypes. - public final ConcurrentSet getSuperSubscriptions(Class superType) { - // whenever our subscriptions change, this map is cleared. - ConcurrentMap, ConcurrentSet> local = this.superClassSubscriptions; - - SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent; - ConcurrentSet subsPerType = subHolderConcurrent.get(); - - // cache our subscriptions for super classes, so that their access can be fast! - ConcurrentSet putIfAbsent = local.putIfAbsent(superType, subsPerType); - if (putIfAbsent == null) { - // we are the first one in the map - subHolderConcurrent.set(subHolderConcurrent.initialValue()); - - StrongConcurrentSet> types = this.utils.getSuperClasses(superType); - if (types.isEmpty()) { - return subsPerType; - } - - Map, ConcurrentSet> local2 = this.subscriptionsPerMessageSingle; - - ISetEntry> current1 = null; - Class superClass; - - current1 = types.head; - while (current1 != null) { - superClass = current1.getValue(); - current1 = current1.next(); - - ConcurrentSet subs = local2.get(superClass); - if (subs != null) { - for (Subscription sub : subs) { - if (sub.acceptsSubtypes()) { - subsPerType.add(sub); - } - } - } - } - return subsPerType; - } else { - // someone beat us - return putIfAbsent; - } - + public final Collection getSuperSubscriptions(Class superType) { + return this.utils.getSuperSubscriptions(superType); } // CAN NOT RETURN NULL // ALSO checks to see if the superClass accepts subtypes. - public ConcurrentSet getSuperSubscriptions(Class superType1, Class superType2) { - HashMapTree, ConcurrentSet> local = this.superClassSubscriptionsMulti; - - // whenever our subscriptions change, this map is cleared. - HashMapTree, ConcurrentSet> subsPerTypeLeaf = local.getLeaf(superType1, superType2); - ConcurrentSet subsPerType = null; - - // we DO NOT care about duplicate, because the answers will be the same - if (subsPerTypeLeaf != null) { - // if the leaf exists, then the value exists. - subsPerType = subsPerTypeLeaf.getValue(); - } else { - SubscriptionHolder subHolderSingle = this.subHolderSingle; - subsPerType = subHolderSingle.get(); - - // cache our subscriptions for super classes, so that their access can be fast! - ConcurrentSet putIfAbsent = local.putIfAbsent(subsPerType, superType1, superType2); - if (putIfAbsent == null) { - // we are the first one in the map - subHolderSingle.set(subHolderSingle.initialValue()); - - // whenever our subscriptions change, this map is cleared. - StrongConcurrentSet> types1 = this.utils.getSuperClasses(superType1); - StrongConcurrentSet> types2 = this.utils.getSuperClasses(superType2); - - ConcurrentSet subs; - HashMapTree, ConcurrentSet> leaf1; - HashMapTree, ConcurrentSet> leaf2; - - ISetEntry> current1 = null; - Class eventSuperType1; - - ISetEntry> current2 = null; - Class eventSuperType2; - - if (types1 != null) { - current1 = types1.head; - } - while (current1 != null) { - eventSuperType1 = current1.getValue(); - current1 = current1.next(); - - boolean type1Matches = eventSuperType1 == superType1; - if (type1Matches) { - continue; - } - - leaf1 = this.subscriptionsPerMessageMulti.getLeaf(eventSuperType1); - if (leaf1 != null) { - if (types2 != null) { - current2 = types2.head; - } - while (current2 != null) { - eventSuperType2 = current2.getValue(); - current2 = current2.next(); - - if (type1Matches && eventSuperType2 == superType2) { - continue; - } - - leaf2 = leaf1.getLeaf(eventSuperType2); - - if (leaf2 != null) { - subs = leaf2.getValue(); - if (subs != null) { - for (Subscription sub : subs) { - if (sub.acceptsSubtypes()) { - subsPerType.add(sub); - } - } - } - } - } - } - } - } else { - // someone beat us - subsPerType = putIfAbsent; - } - } - - return subsPerType; + public Collection getSuperSubscriptions(Class superType1, Class superType2) { + return this.utils.getSuperSubscriptions(superType1, superType2); } // CAN NOT RETURN NULL // ALSO checks to see if the superClass accepts subtypes. - public ConcurrentSet getSuperSubscriptions(Class superType1, Class superType2, Class superType3) { - HashMapTree, ConcurrentSet> local = this.superClassSubscriptionsMulti; - - // whenever our subscriptions change, this map is cleared. - HashMapTree, ConcurrentSet> subsPerTypeLeaf = local.getLeaf(superType1, superType2, superType3); - ConcurrentSet subsPerType; - - - // we DO NOT care about duplicate, because the answers will be the same - if (subsPerTypeLeaf != null) { - // if the leaf exists, then the value exists. - subsPerType = subsPerTypeLeaf.getValue(); - } else { - SubscriptionHolder subHolderSingle = this.subHolderSingle; - subsPerType = subHolderSingle.get(); - - // cache our subscriptions for super classes, so that their access can be fast! - ConcurrentSet putIfAbsent = local.putIfAbsent(subsPerType, superType1, superType2, superType3); - if (putIfAbsent == null) { - // we are the first one in the map - subHolderSingle.set(subHolderSingle.initialValue()); - - StrongConcurrentSet> types1 = this.utils.getSuperClasses(superType1); - StrongConcurrentSet> types2 = this.utils.getSuperClasses(superType2); - StrongConcurrentSet> types3 = this.utils.getSuperClasses(superType3); - - ConcurrentSet subs; - HashMapTree, ConcurrentSet> leaf1; - HashMapTree, ConcurrentSet> leaf2; - HashMapTree, ConcurrentSet> leaf3; - - ISetEntry> current1 = null; - Class eventSuperType1; - - ISetEntry> current2 = null; - Class eventSuperType2; - - ISetEntry> current3 = null; - Class eventSuperType3; - - if (types1 != null) { - current1 = types1.head; - } - while (current1 != null) { - eventSuperType1 = current1.getValue(); - current1 = current1.next(); - - boolean type1Matches = eventSuperType1 == superType1; - if (type1Matches) { - continue; - } - - leaf1 = this.subscriptionsPerMessageMulti.getLeaf(eventSuperType1); - if (leaf1 != null) { - if (types2 != null) { - current2 = types2.head; - } - while (current2 != null) { - eventSuperType2 = current2.getValue(); - current2 = current2.next(); - - boolean type12Matches = type1Matches && eventSuperType2 == superType2; - if (type12Matches) { - continue; - } - - leaf2 = leaf1.getLeaf(eventSuperType2); - - if (leaf2 != null) { - if (types3 != null) { - current3 = types3.head; - } - while (current3 != null) { - eventSuperType3 = current3.getValue(); - current3 = current3.next(); - - if (type12Matches && eventSuperType3 == superType3) { - continue; - } - - leaf3 = leaf2.getLeaf(eventSuperType3); - - if (leaf3 != null) { - subs = leaf3.getValue(); - if (subs != null) { - for (Subscription sub : subs) { - if (sub.acceptsSubtypes()) { - subsPerType.add(sub); - } - } - } - } - } - } - } - } - } - } else { - // someone beat us - subsPerType = putIfAbsent; - } - } - - return subsPerType; + public Collection getSuperSubscriptions(Class superType1, Class superType2, Class superType3) { + return this.utils.getSuperSubscriptions(superType1, superType2, superType3); } } diff --git a/src/main/java/dorkbox/util/messagebus/common/AbstractConcurrentSet.java b/src/main/java/dorkbox/util/messagebus/common/AbstractConcurrentSet.java index 0125083..9e0dd14 100644 --- a/src/main/java/dorkbox/util/messagebus/common/AbstractConcurrentSet.java +++ b/src/main/java/dorkbox/util/messagebus/common/AbstractConcurrentSet.java @@ -4,9 +4,8 @@ import java.util.Collection; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import com.googlecode.concurentlocks.ReentrantReadWriteUpdateLock; +import dorkbox.util.messagebus.common.thread.StampedLock; abstract class pad extends item { @@ -27,7 +26,7 @@ public abstract class AbstractConcurrentSet extends pad implements Set private final transient long ID = id.getAndIncrement(); // Internal state - protected final transient ReentrantReadWriteUpdateLock lock = new ReentrantReadWriteUpdateLock(); + protected final StampedLock lock = new StampedLock(); private final transient Map> entries; // maintain a map of entries for O(log n) lookup volatile long y0, y1, y2, y4, y5, y6 = 7L; @@ -47,25 +46,38 @@ public abstract class AbstractConcurrentSet extends pad implements Set } boolean changed = false; - Lock writeLock = this.lock.writeLock(); - writeLock.lock(); + long stamp = this.lock.readLock(); + if (this.entries.containsKey(element)) { + this.lock.unlockRead(stamp); + return false; + } + + long newStamp = 0L; + while ((newStamp = this.lock.tryConvertToWriteLock(stamp)) == 0) { + this.lock.unlockRead(stamp); + stamp = this.lock.writeLock(); + } + stamp = newStamp; + + changed = insert(element); - writeLock.unlock(); + this.lock.unlock(stamp); return changed; } @Override public boolean contains(Object element) { - Lock readLock = this.lock.readLock(); - ISetEntry entry; - try { - readLock.lock(); - entry = this.entries.get(element); + long stamp = this.lock.tryOptimisticRead(); - } finally { - readLock.unlock(); + ISetEntry entry = this.entries.get(element); + + if (!this.lock.validate(stamp)) { + stamp = this.lock.readLock(); + entry = this.entries.get(element); + this.lock.unlockRead(stamp); } + return entry != null && entry.getValue() != null; } @@ -90,18 +102,21 @@ public abstract class AbstractConcurrentSet extends pad implements Set @Override public boolean addAll(Collection elements) { + StampedLock lock = this.lock; + boolean changed = false; - Lock writeLock = this.lock.writeLock(); + long stamp = lock.writeLock(); + try { - writeLock.lock(); for (T element : elements) { if (element != null) { changed |= insert(element); } } } finally { - writeLock.unlock(); + lock.unlockWrite(stamp); } + return changed; } @@ -110,33 +125,35 @@ public abstract class AbstractConcurrentSet extends pad implements Set */ @Override public boolean remove(Object element) { + StampedLock lock = this.lock; + long stamp = lock.tryOptimisticRead(); - Lock updateLock = this.lock.updateLock(); - try { - updateLock.lock(); - ISetEntry entry = this.entries.get(element); + ISetEntry entry = this.entries.get(element); - if (entry != null && entry.getValue() != null) { - Lock writeLock = this.lock.writeLock(); - try { - writeLock.lock(); - if (entry != this.head) { - entry.remove(); - } else { - // if it was second, now it's first - this.head = this.head.next(); - //oldHead.clear(); // optimize for GC not possible because of potentially running iterators - } - this.entries.remove(element); - return true; - } finally { - writeLock.unlock(); + if (!lock.validate(stamp)) { + stamp = lock.readLock(); + entry = this.entries.get(element); + lock.unlockRead(stamp); + } + + if (entry == null || entry.getValue() == null) { + return false; // fast exit + } else { + stamp = lock.writeLock(); + + try { + if (entry != this.head) { + entry.remove(); + } else { + // if it was second, now it's first + this.head = this.head.next(); + //oldHead.clear(); // optimize for GC not possible because of potentially running iterators } - } else { - return false; // fast exit + this.entries.remove(element); + return true; + } finally { + lock.unlockWrite(stamp); } - } finally { - updateLock.unlock(); } } @@ -146,7 +163,7 @@ public abstract class AbstractConcurrentSet extends pad implements Set } @Override - public T[] toArray(T[] a) { + public T2[] toArray(T2[] a) { return this.entries.entrySet().toArray(a); } @@ -167,14 +184,12 @@ public abstract class AbstractConcurrentSet extends pad implements Set @Override public void clear() { - Lock writeLock = this.lock.writeLock(); - try { - writeLock.lock(); - this.head = null; - this.entries.clear(); - } finally { - writeLock.unlock(); - } + StampedLock lock = this.lock; + + long stamp = lock.writeLock(); + this.head = null; + this.entries.clear(); + lock.unlockWrite(stamp); } @Override @@ -196,6 +211,7 @@ public abstract class AbstractConcurrentSet extends pad implements Set if (getClass() != obj.getClass()) { return false; } + @SuppressWarnings("rawtypes") AbstractConcurrentSet other = (AbstractConcurrentSet) obj; if (this.ID != other.ID) { return false; diff --git a/src/main/java/dorkbox/util/messagebus/common/ReflectionUtils.java b/src/main/java/dorkbox/util/messagebus/common/ReflectionUtils.java index fb1e5c6..89c7886 100644 --- a/src/main/java/dorkbox/util/messagebus/common/ReflectionUtils.java +++ b/src/main/java/dorkbox/util/messagebus/common/ReflectionUtils.java @@ -4,8 +4,10 @@ import java.lang.annotation.Annotation; import java.lang.reflect.AnnotatedElement; import java.lang.reflect.Method; import java.util.Collection; +import java.util.Iterator; import dorkbox.util.messagebus.annotations.Handler; +import dorkbox.util.messagebus.common.thread.ConcurrentSet; /** * @author bennidi @@ -16,13 +18,13 @@ import dorkbox.util.messagebus.annotations.Handler; */ public class ReflectionUtils { - public static StrongConcurrentSetV8 getMethods(Class target) { - StrongConcurrentSetV8 hashSet = new StrongConcurrentSetV8(16, .8F); + public static Collection getMethods(Class target) { + Collection hashSet = new ConcurrentSet(16, .8F, 1); getMethods(target, hashSet); return hashSet; } - private static void getMethods(Class target, StrongConcurrentSetV8 methods) { + private static void getMethods(Class target, Collection methods) { try { for (Method method : target.getDeclaredMethods()) { if (getAnnotation(method, Handler.class) != null) { @@ -66,8 +68,8 @@ public class ReflectionUtils { * @param from The root class to start with * @return A set of classes, each representing a super type of the root class */ - public static StrongConcurrentSetV8> getSuperTypes(Class from) { - StrongConcurrentSetV8> superclasses = new StrongConcurrentSetV8>(8, 0.8F); + public static Collection> getSuperTypes(Class from) { + Collection> superclasses = new ConcurrentSet>(8, 0.8F, 1); collectInterfaces( from, superclasses ); @@ -79,21 +81,19 @@ public class ReflectionUtils { return superclasses; } - public static void collectInterfaces( Class from, StrongConcurrentSetV8> accumulator ) { + public static void collectInterfaces( Class from, Collection> accumulator ) { for ( Class intface : from.getInterfaces() ) { accumulator.add( intface ); collectInterfaces( intface, accumulator ); } } - // - public static boolean containsOverridingMethod(final StrongConcurrentSetV8 allMethods, final Method methodToCheck) { - - ISetEntry current = allMethods.head; + public static boolean containsOverridingMethod(final Collection allMethods, final Method methodToCheck) { + Iterator iterator; Method method; - while (current != null) { - method = current.getValue(); - current = current.next(); + + for (iterator = allMethods.iterator(); iterator.hasNext();) { + method = iterator.next(); if (isOverriddenBy(methodToCheck, method)) { return true; @@ -131,7 +131,7 @@ public class ReflectionUtils { } public static A getAnnotation( AnnotatedElement from, Class annotationType) { - A annotation = getAnnotation(from, annotationType, new StrongConcurrentSetV8(16, .8F)); + A annotation = getAnnotation(from, annotationType, new ConcurrentSet(16, .8F, 1)); return annotation; } diff --git a/src/main/java/dorkbox/util/messagebus/common/SubscriptionUtils.java b/src/main/java/dorkbox/util/messagebus/common/SubscriptionUtils.java index 30df869..b8a22f8 100644 --- a/src/main/java/dorkbox/util/messagebus/common/SubscriptionUtils.java +++ b/src/main/java/dorkbox/util/messagebus/common/SubscriptionUtils.java @@ -1,26 +1,61 @@ package dorkbox.util.messagebus.common; import java.lang.reflect.Array; +import java.util.Collection; +import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentMap; import dorkbox.util.messagebus.common.thread.ClassHolder; +import dorkbox.util.messagebus.common.thread.ConcurrentSet; +import dorkbox.util.messagebus.common.thread.SubscriptionHolder; +import dorkbox.util.messagebus.subscription.Subscription; public class SubscriptionUtils { private final Map, Class> arrayVersionCache; private final Map, Boolean> isArrayCache; - private final ConcurrentMap, StrongConcurrentSet>> superClassesCache; + private final ConcurrentMap, ConcurrentSet>> superClassesCache; private final ClassHolder classHolderSingle; + // superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically. + // it's a hit on SUB/UNSUB, but REALLY improves performance on handlers + // it's faster to create a new one for SUB/UNSUB than it is to clear() on the original one + private final ConcurrentMap, ConcurrentSet> superClassSubscriptions; + private final HashMapTree, ConcurrentSet> superClassSubscriptionsMulti; - public SubscriptionUtils(float loadFactor, int stripeSize) { - this.arrayVersionCache = new ConcurrentHashMapV8, Class>(64, loadFactor, stripeSize); - this.isArrayCache = new ConcurrentHashMapV8, Boolean>(64, loadFactor, stripeSize); + private final Map, Collection> subscriptionsPerMessageSingle; - this.superClassesCache = new ConcurrentHashMapV8, StrongConcurrentSet>>(64, loadFactor, stripeSize); - this.classHolderSingle = new ClassHolder(loadFactor); + + private final SubscriptionHolder subHolderSingle; + private final SubscriptionHolder subHolderConcurrent; + private final HashMapTree, Collection> subscriptionsPerMessageMulti; + + + public SubscriptionUtils(Map, Collection> subscriptionsPerMessageSingle, + HashMapTree, Collection> subscriptionsPerMessageMulti, + float loadFactor, int stripeSize) { + + this.subscriptionsPerMessageSingle = subscriptionsPerMessageSingle; + this.subscriptionsPerMessageMulti = subscriptionsPerMessageMulti; + this.arrayVersionCache = new ConcurrentHashMapV8, Class>(32, loadFactor, stripeSize); + this.isArrayCache = new ConcurrentHashMapV8, Boolean>(32, loadFactor, stripeSize); + + this.superClassesCache = new ConcurrentHashMapV8, ConcurrentSet>>(32, loadFactor, stripeSize); + this.classHolderSingle = new ClassHolder(loadFactor, stripeSize); + + // superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically. + // it's a hit on SUB/UNSUB, but improves performance of handlers + this.superClassSubscriptions = new ConcurrentHashMapV8, ConcurrentSet>(32, loadFactor, stripeSize); + this.superClassSubscriptionsMulti = new HashMapTree, ConcurrentSet>(4, loadFactor); + + this.subHolderSingle = new SubscriptionHolder(loadFactor, stripeSize); + this.subHolderConcurrent = new SubscriptionHolder(loadFactor, stripeSize); + } + + public void clear() { + this.superClassSubscriptions.clear(); } /** @@ -28,31 +63,29 @@ public class SubscriptionUtils { * never returns null * never reset, since it never needs to be reset (as the class hierarchy doesn't change at runtime) */ - public StrongConcurrentSet> getSuperClasses(Class clazz) { + public Collection> getSuperClasses(Class clazz) { return getSuperClasses(clazz, isArray(clazz)); } - public final StrongConcurrentSet> getSuperClasses(Class clazz, boolean isArray) { + public final Collection> getSuperClasses(Class clazz, boolean isArray) { // this is never reset, since it never needs to be. - ConcurrentMap, StrongConcurrentSet>> local = this.superClassesCache; + ConcurrentMap, ConcurrentSet>> local = this.superClassesCache; ClassHolder classHolderSingle = this.classHolderSingle; - StrongConcurrentSet> classes = classHolderSingle.get(); + ConcurrentSet> classes = classHolderSingle.get(); - StrongConcurrentSet> putIfAbsent = local.putIfAbsent(clazz, classes); + ConcurrentSet> putIfAbsent = local.putIfAbsent(clazz, classes); if (putIfAbsent == null) { // we are the first one in the map classHolderSingle.set(classHolderSingle.initialValue()); // it doesn't matter if concurrent access stomps on values, since they are always the same. - StrongConcurrentSet> superTypes = ReflectionUtils.getSuperTypes(clazz); - - ISetEntry> current = superTypes.head; + Collection> superTypes = ReflectionUtils.getSuperTypes(clazz); + Iterator> iterator; Class c; - while (current != null) { - c = current.getValue(); - current = current.next(); + for (iterator = superTypes.iterator(); iterator.hasNext();) { + c = iterator.next(); if (isArray) { c = getArrayClass(c); } @@ -111,4 +144,239 @@ public class SubscriptionUtils { this.superClassesCache.clear(); } + + // CAN NOT RETURN NULL + // ALSO checks to see if the superClass accepts subtypes. + public final Collection getSuperSubscriptions(Class superType) { + // whenever our subscriptions change, this map is cleared. + ConcurrentMap, ConcurrentSet> local = this.superClassSubscriptions; + + SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent; + ConcurrentSet subsPerType = subHolderConcurrent.get(); + + // cache our subscriptions for super classes, so that their access can be fast! + ConcurrentSet putIfAbsent = local.putIfAbsent(superType, subsPerType); + if (putIfAbsent == null) { + // we are the first one in the map + subHolderConcurrent.set(subHolderConcurrent.initialValue()); + + Collection> types = getSuperClasses(superType); + if (types.isEmpty()) { + return subsPerType; + } + + Map, Collection> local2 = this.subscriptionsPerMessageSingle; + + Class superClass; + Iterator> iterator; + + Iterator subIterator; + Subscription sub; + + for (iterator = types.iterator(); iterator.hasNext();) { + superClass = iterator.next(); + + Collection subs = local2.get(superClass); + if (subs != null) { + for (subIterator = subs.iterator(); subIterator.hasNext();) { + sub = subIterator.next(); + if (sub.acceptsSubtypes()) { + subsPerType.add(sub); + } + } + } + } + return subsPerType; + } else { + // someone beat us + return putIfAbsent; + } + } + + // CAN NOT RETURN NULL + // ALSO checks to see if the superClass accepts subtypes. + public Collection getSuperSubscriptions(Class superType1, Class superType2) { + HashMapTree, ConcurrentSet> local = this.superClassSubscriptionsMulti; + + // whenever our subscriptions change, this map is cleared. + HashMapTree, ConcurrentSet> subsPerTypeLeaf = local.getLeaf(superType1, superType2); + ConcurrentSet subsPerType = null; + + // we DO NOT care about duplicate, because the answers will be the same + if (subsPerTypeLeaf != null) { + // if the leaf exists, then the value exists. + subsPerType = subsPerTypeLeaf.getValue(); + } else { + SubscriptionHolder subHolderSingle = this.subHolderSingle; + subsPerType = subHolderSingle.get(); + + // cache our subscriptions for super classes, so that their access can be fast! + ConcurrentSet putIfAbsent = local.putIfAbsent(subsPerType, superType1, superType2); + if (putIfAbsent == null) { + // we are the first one in the map + subHolderSingle.set(subHolderSingle.initialValue()); + + // whenever our subscriptions change, this map is cleared. + Collection> types1 = getSuperClasses(superType1); + + if (types1 != null) { + Collection> types2 = getSuperClasses(superType2); + + Collection subs; + HashMapTree, Collection> leaf1; + HashMapTree, Collection> leaf2; + + Class eventSuperType1; + Class eventSuperType2; + + Iterator> iterator1; + Iterator> iterator2; + + Iterator subIterator; + Subscription sub; + + for (iterator1 = types1.iterator(); iterator1.hasNext();) { + eventSuperType1 = iterator1.next(); + + boolean type1Matches = eventSuperType1 == superType1; + if (type1Matches) { + continue; + } + + leaf1 = this.subscriptionsPerMessageMulti.getLeaf(eventSuperType1); + if (leaf1 != null && types2 != null) { + for (iterator2 = types2.iterator(); iterator2.hasNext();) { + eventSuperType2 = iterator2.next(); + + if (type1Matches && eventSuperType2 == superType2) { + continue; + } + + leaf2 = leaf1.getLeaf(eventSuperType2); + + if (leaf2 != null) { + subs = leaf2.getValue(); + if (subs != null) { + for (subIterator = subs.iterator(); subIterator.hasNext();) { + sub = subIterator.next(); + if (sub.acceptsSubtypes()) { + subsPerType.add(sub); + } + } + } + } + } + } + } + } + } else { + // someone beat us + subsPerType = putIfAbsent; + } + } + + return subsPerType; + } + + // CAN NOT RETURN NULL + // ALSO checks to see if the superClass accepts subtypes. + public Collection getSuperSubscriptions(Class superType1, Class superType2, Class superType3) { + HashMapTree, ConcurrentSet> local = this.superClassSubscriptionsMulti; + + // whenever our subscriptions change, this map is cleared. + HashMapTree, ConcurrentSet> subsPerTypeLeaf = local.getLeaf(superType1, superType2, superType3); + ConcurrentSet subsPerType; + + + // we DO NOT care about duplicate, because the answers will be the same + if (subsPerTypeLeaf != null) { + // if the leaf exists, then the value exists. + subsPerType = subsPerTypeLeaf.getValue(); + } else { + SubscriptionHolder subHolderSingle = this.subHolderSingle; + subsPerType = subHolderSingle.get(); + + // cache our subscriptions for super classes, so that their access can be fast! + ConcurrentSet putIfAbsent = local.putIfAbsent(subsPerType, superType1, superType2, superType3); + if (putIfAbsent == null) { + // we are the first one in the map + subHolderSingle.set(subHolderSingle.initialValue()); + + Collection> types1 = getSuperClasses(superType1); + + if (types1 != null) { + Collection> types2 = getSuperClasses(superType2); + Collection> types3 = getSuperClasses(superType3); + + Collection subs; + HashMapTree, Collection> leaf1; + HashMapTree, Collection> leaf2; + HashMapTree, Collection> leaf3; + + Class eventSuperType1; + Class eventSuperType2; + Class eventSuperType3; + + Iterator> iterator1; + Iterator> iterator2; + Iterator> iterator3; + + Iterator subIterator; + Subscription sub; + + for (iterator1 = types1.iterator(); iterator1.hasNext();) { + eventSuperType1 = iterator1.next(); + + boolean type1Matches = eventSuperType1 == superType1; + if (type1Matches) { + continue; + } + + leaf1 = this.subscriptionsPerMessageMulti.getLeaf(eventSuperType1); + if (leaf1 != null && types2 != null) { + for (iterator2 = types2.iterator(); iterator2.hasNext();) { + eventSuperType2 = iterator2.next(); + + boolean type12Matches = type1Matches && eventSuperType2 == superType2; + if (type12Matches) { + continue; + } + + leaf2 = leaf1.getLeaf(eventSuperType2); + + if (leaf2 != null && types3 != null) { + for (iterator3 = types3.iterator(); iterator3.hasNext();) { + eventSuperType3 = iterator3.next(); + + if (type12Matches && eventSuperType3 == superType3) { + continue; + } + + leaf3 = leaf2.getLeaf(eventSuperType3); + + if (leaf3 != null) { + subs = leaf3.getValue(); + if (subs != null) { + for (subIterator = subs.iterator(); subIterator.hasNext();) { + sub = subIterator.next(); + if (sub.acceptsSubtypes()) { + subsPerType.add(sub); + } + } + } + } + } + } + } + } + } + } + } else { + // someone beat us + subsPerType = putIfAbsent; + } + } + + return subsPerType; + } } diff --git a/src/main/java/dorkbox/util/messagebus/common/VarArgUtils.java b/src/main/java/dorkbox/util/messagebus/common/VarArgUtils.java index ef0c663..b1d9da3 100644 --- a/src/main/java/dorkbox/util/messagebus/common/VarArgUtils.java +++ b/src/main/java/dorkbox/util/messagebus/common/VarArgUtils.java @@ -1,5 +1,7 @@ package dorkbox.util.messagebus.common; +import java.util.Collection; +import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentMap; @@ -18,10 +20,10 @@ public class VarArgUtils { private final int stripeSize; private final SubscriptionUtils utils; - private final ConcurrentMap, ConcurrentSet> subscriptionsPerMessageSingle; + private final Map, Collection> subscriptionsPerMessageSingle; - public VarArgUtils(SubscriptionUtils utils, ConcurrentMap, ConcurrentSet> subscriptionsPerMessageSingle, + public VarArgUtils(SubscriptionUtils utils, Map, Collection> subscriptionsPerMessageSingle, float loadFactor, int stripeSize) { this.utils = utils; @@ -63,9 +65,13 @@ public class VarArgUtils { // this caches our array type. This is never cleared. Class arrayVersion = this.utils.getArrayClass(messageClass); - ConcurrentSet subs = this.subscriptionsPerMessageSingle.get(arrayVersion); + Iterator iterator; + Subscription sub; + + Collection subs = this.subscriptionsPerMessageSingle.get(arrayVersion); if (subs != null) { - for (Subscription sub : subs) { + for (iterator = subs.iterator(); iterator.hasNext();) { + sub = iterator.next(); if (sub.acceptsVarArgs()) { subsPerType.add(sub); } @@ -98,24 +104,27 @@ public class VarArgUtils { subHolderConcurrent.set(subHolderConcurrent.initialValue()); Class arrayVersion = this.utils.getArrayClass(messageClass); - StrongConcurrentSet> types = this.utils.getSuperClasses(arrayVersion, true); + Collection> types = this.utils.getSuperClasses(arrayVersion, true); if (types.isEmpty()) { - return null; + return subsPerType; } - Map, ConcurrentSet> local2 = this.subscriptionsPerMessageSingle; + Map, Collection> local2 = this.subscriptionsPerMessageSingle; - ISetEntry> current1; + Iterator> iterator; Class superClass; - current1 = types.head; - while (current1 != null) { - superClass = current1.getValue(); - current1 = current1.next(); + Iterator subIterator; + Subscription sub; - ConcurrentSet subs = local2.get(superClass); + + for (iterator = types.iterator(); iterator.hasNext();) { + superClass = iterator.next(); + + Collection subs = local2.get(superClass); if (subs != null) { - for (Subscription sub : subs) { + for (subIterator = subs.iterator(); subIterator.hasNext();) { + sub = subIterator.next(); if (sub.acceptsSubtypes() && sub.acceptsVarArgs()) { subsPerType.add(sub); } @@ -159,7 +168,11 @@ public class VarArgUtils { ConcurrentSet varargSuperSubscriptions1 = getVarArgSuperSubscriptions(messageClass1); ConcurrentSet varargSuperSubscriptions2 = getVarArgSuperSubscriptions(messageClass2); - for (Subscription sub : varargSuperSubscriptions1) { + Iterator iterator; + Subscription sub; + + for (iterator = varargSuperSubscriptions1.iterator(); iterator.hasNext();) { + sub = iterator.next(); if (varargSuperSubscriptions2.contains(sub)) { subsPerType.add(sub); } @@ -202,7 +215,11 @@ public class VarArgUtils { ConcurrentSet varargSuperSubscriptions2 = getVarArgSuperSubscriptions(messageClass2); ConcurrentSet varargSuperSubscriptions3 = getVarArgSuperSubscriptions(messageClass3); - for (Subscription sub : varargSuperSubscriptions1) { + Iterator iterator; + Subscription sub; + + for (iterator = varargSuperSubscriptions1.iterator(); iterator.hasNext();) { + sub = iterator.next(); if (varargSuperSubscriptions2.contains(sub) && varargSuperSubscriptions3.contains(sub)) { subsPerType.add(sub); } diff --git a/src/main/java/dorkbox/util/messagebus/common/WeakConcurrentSet.java b/src/main/java/dorkbox/util/messagebus/common/WeakConcurrentSet.java index 418b77a..6d0585a 100644 --- a/src/main/java/dorkbox/util/messagebus/common/WeakConcurrentSet.java +++ b/src/main/java/dorkbox/util/messagebus/common/WeakConcurrentSet.java @@ -3,7 +3,8 @@ package dorkbox.util.messagebus.common; import java.lang.ref.WeakReference; import java.util.Iterator; import java.util.WeakHashMap; -import java.util.concurrent.locks.Lock; + +import dorkbox.util.messagebus.common.thread.StampedLock; /** * This implementation uses weak references to the elements. Iterators automatically perform cleanups of @@ -34,9 +35,10 @@ public class WeakConcurrentSet extends AbstractConcurrentSet{ // until it finds the first entry whose value has not yet been garbage collected // the method assumes that the current element is already orphaned and will remove it private void removeOrphans(){ - Lock writelock = WeakConcurrentSet.this.lock.writeLock(); + + StampedLock lock = WeakConcurrentSet.this.lock; + long stamp = lock.writeLock(); try{ - writelock.lock(); do { ISetEntry orphaned = this.current; this.current = this.current.next(); @@ -44,7 +46,7 @@ public class WeakConcurrentSet extends AbstractConcurrentSet{ } while(this.current != null && this.current.getValue() == null); } finally { - writelock.unlock(); + lock.unlockWrite(stamp); } } diff --git a/src/main/java/dorkbox/util/messagebus/common/thread/ClassHolder.java b/src/main/java/dorkbox/util/messagebus/common/thread/ClassHolder.java index ae192e5..7a724e5 100644 --- a/src/main/java/dorkbox/util/messagebus/common/thread/ClassHolder.java +++ b/src/main/java/dorkbox/util/messagebus/common/thread/ClassHolder.java @@ -1,21 +1,22 @@ package dorkbox.util.messagebus.common.thread; -import dorkbox.util.messagebus.common.StrongConcurrentSetV8; -public class ClassHolder extends ThreadLocal>> { +public class ClassHolder extends ThreadLocal>> { private final float loadFactor; + private final int stripeSize; - public ClassHolder(float loadFactor) { + public ClassHolder(float loadFactor, int stripeSize) { super(); this.loadFactor = loadFactor; + this.stripeSize = stripeSize; } @Override - public StrongConcurrentSetV8> initialValue() { - return new StrongConcurrentSetV8>(16, this.loadFactor); + public ConcurrentSet> initialValue() { + return new ConcurrentSet>(16, this.loadFactor, this.stripeSize); } } diff --git a/src/main/java/dorkbox/util/messagebus/common/thread/ConcurrentSet.java b/src/main/java/dorkbox/util/messagebus/common/thread/ConcurrentSet.java index af25dd1..1ec5329 100644 --- a/src/main/java/dorkbox/util/messagebus/common/thread/ConcurrentSet.java +++ b/src/main/java/dorkbox/util/messagebus/common/thread/ConcurrentSet.java @@ -94,7 +94,7 @@ public class ConcurrentSet extends ConcurrentLinkedQueue2 { } Node pred = null; - for (Node p = node; p != null; p = succ(p)) { + for (Node p = this.head; p != null; p = succ(p)) { T item = p.item; if (item != null && element.equals(item) && diff --git a/src/main/java/dorkbox/util/messagebus/common/thread/StampedLock.java b/src/main/java/dorkbox/util/messagebus/common/thread/StampedLock.java new file mode 100644 index 0000000..383da0a --- /dev/null +++ b/src/main/java/dorkbox/util/messagebus/common/thread/StampedLock.java @@ -0,0 +1,1526 @@ +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/publicdomain/zero/1.0/ + */ + +package dorkbox.util.messagebus.common.thread; + +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; + +/** + * A capability-based lock with three modes for controlling read/write + * access. The state of a StampedLock consists of a version and mode. + * Lock acquisition methods return a stamp that represents and + * controls access with respect to a lock state; "try" versions of + * these methods may instead return the special value zero to + * represent failure to acquire access. Lock release and conversion + * methods require stamps as arguments, and fail if they do not match + * the state of the lock. The three modes are: + * + *
    + * + *
  • Writing. Method {@link #writeLock} possibly blocks + * waiting for exclusive access, returning a stamp that can be used + * in method {@link #unlockWrite} to release the lock. Untimed and + * timed versions of {@code tryWriteLock} are also provided. When + * the lock is held in write mode, no read locks may be obtained, + * and all optimistic read validations will fail.
  • + * + *
  • Reading. Method {@link #readLock} possibly blocks + * waiting for non-exclusive access, returning a stamp that can be + * used in method {@link #unlockRead} to release the lock. Untimed + * and timed versions of {@code tryReadLock} are also provided.
  • + * + *
  • Optimistic Reading. Method {@link #tryOptimisticRead} + * returns a non-zero stamp only if the lock is not currently held + * in write mode. Method {@link #validate} returns true if the lock + * has not been acquired in write mode since obtaining a given + * stamp. This mode can be thought of as an extremely weak version + * of a read-lock, that can be broken by a writer at any time. The + * use of optimistic mode for short read-only code segments often + * reduces contention and improves throughput. However, its use is + * inherently fragile. Optimistic read sections should only read + * fields and hold them in local variables for later use after + * validation. Fields read while in optimistic mode may be wildly + * inconsistent, so usage applies only when you are familiar enough + * with data representations to check consistency and/or repeatedly + * invoke method {@code validate()}. For example, such steps are + * typically required when first reading an object or array + * reference, and then accessing one of its fields, elements or + * methods.
  • + * + *
+ * + *

This class also supports methods that conditionally provide + * conversions across the three modes. For example, method {@link + * #tryConvertToWriteLock} attempts to "upgrade" a mode, returning + * a valid write stamp if (1) already in writing mode (2) in reading + * mode and there are no other readers or (3) in optimistic mode and + * the lock is available. The forms of these methods are designed to + * help reduce some of the code bloat that otherwise occurs in + * retry-based designs. + * + *

StampedLocks are designed for use as internal utilities in the + * development of thread-safe components. Their use relies on + * knowledge of the internal properties of the data, objects, and + * methods they are protecting. They are not reentrant, so locked + * bodies should not call other unknown methods that may try to + * re-acquire locks (although you may pass a stamp to other methods + * that can use or convert it). The use of read lock modes relies on + * the associated code sections being side-effect-free. Unvalidated + * optimistic read sections cannot call methods that are not known to + * tolerate potential inconsistencies. Stamps use finite + * representations, and are not cryptographically secure (i.e., a + * valid stamp may be guessable). Stamp values may recycle after (no + * sooner than) one year of continuous operation. A stamp held without + * use or validation for longer than this period may fail to validate + * correctly. StampedLocks are serializable, but always deserialize + * into initial unlocked state, so they are not useful for remote + * locking. + * + *

The scheduling policy of StampedLock does not consistently + * prefer readers over writers or vice versa. All "try" methods are + * best-effort and do not necessarily conform to any scheduling or + * fairness policy. A zero return from any "try" method for acquiring + * or converting locks does not carry any information about the state + * of the lock; a subsequent invocation may succeed. + * + *

Because it supports coordinated usage across multiple lock + * modes, this class does not directly implement the {@link Lock} or + * {@link ReadWriteLock} interfaces. However, a StampedLock may be + * viewed {@link #asReadLock()}, {@link #asWriteLock()}, or {@link + * #asReadWriteLock()} in applications requiring only the associated + * set of functionality. + * + *

Sample Usage. The following illustrates some usage idioms + * in a class that maintains simple two-dimensional points. The sample + * code illustrates some try/catch conventions even though they are + * not strictly needed here because no exceptions can occur in their + * bodies.
+ * + *

{@code
+ * class Point {
+ *   private double x, y;
+ *   private final StampedLock sl = new StampedLock();
+ *
+ *   void move(double deltaX, double deltaY) { // an exclusively locked method
+ *     long stamp = sl.writeLock();
+ *     try {
+ *       x += deltaX;
+ *       y += deltaY;
+ *     } finally {
+ *       sl.unlockWrite(stamp);
+ *     }
+ *   }
+ *
+ *   double distanceFromOrigin() { // A read-only method
+ *     long stamp = sl.tryOptimisticRead();
+ *     double currentX = x, currentY = y;
+ *     if (!sl.validate(stamp)) {
+ *        stamp = sl.readLock();
+ *        try {
+ *          currentX = x;
+ *          currentY = y;
+ *        } finally {
+ *           sl.unlockRead(stamp);
+ *        }
+ *     }
+ *     return Math.sqrt(currentX * currentX + currentY * currentY);
+ *   }
+ *
+ *   void moveIfAtOrigin(double newX, double newY) { // upgrade
+ *     // Could instead start with optimistic, not read mode
+ *     long stamp = sl.readLock();
+ *     try {
+ *       while (x == 0.0 && y == 0.0) {
+ *         long ws = sl.tryConvertToWriteLock(stamp);
+ *         if (ws != 0L) {
+ *           stamp = ws;
+ *           x = newX;
+ *           y = newY;
+ *           break;
+ *         }
+ *         else {
+ *           sl.unlockRead(stamp);
+ *           stamp = sl.writeLock();
+ *         }
+ *       }
+ *     } finally {
+ *       sl.unlock(stamp);
+ *     }
+ *   }
+ * }}
+ * + * @since 1.8 + * @author Doug Lea + */ +public class StampedLock implements java.io.Serializable { + /* + * Algorithmic notes: + * + * The design employs elements of Sequence locks + * (as used in linux kernels; see Lameter's + * http://www.lameter.com/gelato2005.pdf + * and elsewhere; see + * Boehm's http://www.hpl.hp.com/techreports/2012/HPL-2012-68.html) + * and Ordered RW locks (see Shirako et al + * http://dl.acm.org/citation.cfm?id=2312015) + * + * Conceptually, the primary state of the lock includes a sequence + * number that is odd when write-locked and even otherwise. + * However, this is offset by a reader count that is non-zero when + * read-locked. The read count is ignored when validating + * "optimistic" seqlock-reader-style stamps. Because we must use + * a small finite number of bits (currently 7) for readers, a + * supplementary reader overflow word is used when the number of + * readers exceeds the count field. We do this by treating the max + * reader count value (RBITS) as a spinlock protecting overflow + * updates. + * + * Waiters use a modified form of CLH lock used in + * AbstractQueuedSynchronizer (see its internal documentation for + * a fuller account), where each node is tagged (field mode) as + * either a reader or writer. Sets of waiting readers are grouped + * (linked) under a common node (field cowait) so act as a single + * node with respect to most CLH mechanics. By virtue of the + * queue structure, wait nodes need not actually carry sequence + * numbers; we know each is greater than its predecessor. This + * simplifies the scheduling policy to a mainly-FIFO scheme that + * incorporates elements of Phase-Fair locks (see Brandenburg & + * Anderson, especially http://www.cs.unc.edu/~bbb/diss/). In + * particular, we use the phase-fair anti-barging rule: If an + * incoming reader arrives while read lock is held but there is a + * queued writer, this incoming reader is queued. (This rule is + * responsible for some of the complexity of method acquireRead, + * but without it, the lock becomes highly unfair.) Method release + * does not (and sometimes cannot) itself wake up cowaiters. This + * is done by the primary thread, but helped by any other threads + * with nothing better to do in methods acquireRead and + * acquireWrite. + * + * These rules apply to threads actually queued. All tryLock forms + * opportunistically try to acquire locks regardless of preference + * rules, and so may "barge" their way in. Randomized spinning is + * used in the acquire methods to reduce (increasingly expensive) + * context switching while also avoiding sustained memory + * thrashing among many threads. We limit spins to the head of + * queue. A thread spin-waits up to SPINS times (where each + * iteration decreases spin count with 50% probability) before + * blocking. If, upon wakening it fails to obtain lock, and is + * still (or becomes) the first waiting thread (which indicates + * that some other thread barged and obtained lock), it escalates + * spins (up to MAX_HEAD_SPINS) to reduce the likelihood of + * continually losing to barging threads. + * + * Nearly all of these mechanics are carried out in methods + * acquireWrite and acquireRead, that, as typical of such code, + * sprawl out because actions and retries rely on consistent sets + * of locally cached reads. + * + * As noted in Boehm's paper (above), sequence validation (mainly + * method validate()) requires stricter ordering rules than apply + * to normal volatile reads (of "state"). In the absence of (but + * continual hope for) explicit JVM support of intrinsics with + * double-sided reordering prohibition, or corresponding fence + * intrinsics, we for now uncomfortably rely on the fact that the + * Unsafe.getXVolatile intrinsic must have this property + * (syntactic volatile reads do not) for internal purposes anyway, + * even though it is not documented. + * + * The memory layout keeps lock state and queue pointers together + * (normally on the same cache line). This usually works well for + * read-mostly loads. In most other cases, the natural tendency of + * adaptive-spin CLH locks to reduce memory contention lessens + * motivation to further spread out contended locations, but might + * be subject to future improvements. + */ + + private static final long serialVersionUID = -6001602636862214147L; + + /** Number of processors, for spin control */ + private static final int NCPU = Runtime.getRuntime().availableProcessors(); + + /** Maximum number of retries before enqueuing on acquisition */ + private static final int SPINS = NCPU > 1 ? 1 << 6 : 0; + + /** Maximum number of retries before blocking at head on acquisition */ + private static final int HEAD_SPINS = NCPU > 1 ? 1 << 10 : 0; + + /** Maximum number of retries before re-blocking */ + private static final int MAX_HEAD_SPINS = NCPU > 1 ? 1 << 16 : 0; + + /** The period for yielding when waiting for overflow spinlock */ + private static final int OVERFLOW_YIELD_RATE = 7; // must be power 2 - 1 + + /** The number of bits to use for reader count before overflowing */ + private static final int LG_READERS = 7; + + // Values for lock state and stamp operations + private static final long RUNIT = 1L; + private static final long WBIT = 1L << LG_READERS; + private static final long RBITS = WBIT - 1L; + private static final long RFULL = RBITS - 1L; + private static final long ABITS = RBITS | WBIT; + private static final long SBITS = ~RBITS; // note overlap with ABITS + + // Initial value for lock state; avoid failure value zero + private static final long ORIGIN = WBIT << 1; + + // Special value from cancelled acquire methods so caller can throw IE + private static final long INTERRUPTED = 1L; + + // Values for node status; order matters + private static final int WAITING = -1; + private static final int CANCELLED = 1; + + // Modes for nodes (int not boolean to allow arithmetic) + private static final int RMODE = 0; + private static final int WMODE = 1; + + /** Wait nodes */ + static final class WNode { + volatile WNode prev; + volatile WNode next; + volatile WNode cowait; // list of linked readers + volatile Thread thread; // non-null while possibly parked + volatile int status; // 0, WAITING, or CANCELLED + final int mode; // RMODE or WMODE + WNode(int m, WNode p) { this.mode = m; this.prev = p; } + } + + /** Head of CLH queue */ + private transient volatile WNode whead; + /** Tail (last) of CLH queue */ + private transient volatile WNode wtail; + + // views + transient ReadLockView readLockView; + transient WriteLockView writeLockView; + transient ReadWriteLockView readWriteLockView; + + /** Lock sequence/state */ + private transient volatile long state; + /** extra reader count when state read count saturated */ + private transient int readerOverflow; + + /** + * Creates a new lock, initially in unlocked state. + */ + public StampedLock() { + this.state = ORIGIN; + } + + /** + * Exclusively acquires the lock, blocking if necessary + * until available. + * + * @return a stamp that can be used to unlock or convert mode + */ + public long writeLock() { + long s, next; // bypass acquireWrite in fully unlocked case only + return ((s = this.state) & ABITS) == 0L && + U.compareAndSwapLong(this, STATE, s, next = s + WBIT) ? + next : acquireWrite(false, 0L); + } + + /** + * Exclusively acquires the lock if it is immediately available. + * + * @return a stamp that can be used to unlock or convert mode, + * or zero if the lock is not available + */ + public long tryWriteLock() { + long s, next; + return ((s = this.state) & ABITS) == 0L && + U.compareAndSwapLong(this, STATE, s, next = s + WBIT) ? + next : 0L; + } + + /** + * Exclusively acquires the lock if it is available within the + * given time and the current thread has not been interrupted. + * Behavior under timeout and interruption matches that specified + * for method {@link Lock#tryLock(long,TimeUnit)}. + * + * @param time the maximum time to wait for the lock + * @param unit the time unit of the {@code time} argument + * @return a stamp that can be used to unlock or convert mode, + * or zero if the lock is not available + * @throws InterruptedException if the current thread is interrupted + * before acquiring the lock + */ + public long tryWriteLock(long time, TimeUnit unit) + throws InterruptedException { + long nanos = unit.toNanos(time); + if (!Thread.interrupted()) { + long next, deadline; + if ((next = tryWriteLock()) != 0L) { + return next; + } + if (nanos <= 0L) { + return 0L; + } + if ((deadline = System.nanoTime() + nanos) == 0L) { + deadline = 1L; + } + if ((next = acquireWrite(true, deadline)) != INTERRUPTED) { + return next; + } + } + throw new InterruptedException(); + } + + /** + * Exclusively acquires the lock, blocking if necessary + * until available or the current thread is interrupted. + * Behavior under interruption matches that specified + * for method {@link Lock#lockInterruptibly()}. + * + * @return a stamp that can be used to unlock or convert mode + * @throws InterruptedException if the current thread is interrupted + * before acquiring the lock + */ + public long writeLockInterruptibly() throws InterruptedException { + long next; + if (!Thread.interrupted() && + (next = acquireWrite(true, 0L)) != INTERRUPTED) { + return next; + } + throw new InterruptedException(); + } + + /** + * Non-exclusively acquires the lock, blocking if necessary + * until available. + * + * @return a stamp that can be used to unlock or convert mode + */ + public long readLock() { + long s = this.state, next; // bypass acquireRead on common uncontended case + return this.whead == this.wtail && (s & ABITS) < RFULL && + U.compareAndSwapLong(this, STATE, s, next = s + RUNIT) ? + next : acquireRead(false, 0L); + } + + /** + * Non-exclusively acquires the lock if it is immediately available. + * + * @return a stamp that can be used to unlock or convert mode, + * or zero if the lock is not available + */ + public long tryReadLock() { + for (;;) { + long s, m, next; + if ((m = (s = this.state) & ABITS) == WBIT) { + return 0L; + } else if (m < RFULL) { + if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) { + return next; + } + } + else if ((next = tryIncReaderOverflow(s)) != 0L) { + return next; + } + } + } + + /** + * Non-exclusively acquires the lock if it is available within the + * given time and the current thread has not been interrupted. + * Behavior under timeout and interruption matches that specified + * for method {@link Lock#tryLock(long,TimeUnit)}. + * + * @param time the maximum time to wait for the lock + * @param unit the time unit of the {@code time} argument + * @return a stamp that can be used to unlock or convert mode, + * or zero if the lock is not available + * @throws InterruptedException if the current thread is interrupted + * before acquiring the lock + */ + public long tryReadLock(long time, TimeUnit unit) + throws InterruptedException { + long s, m, next, deadline; + long nanos = unit.toNanos(time); + if (!Thread.interrupted()) { + if ((m = (s = this.state) & ABITS) != WBIT) { + if (m < RFULL) { + if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) { + return next; + } + } + else if ((next = tryIncReaderOverflow(s)) != 0L) { + return next; + } + } + if (nanos <= 0L) { + return 0L; + } + if ((deadline = System.nanoTime() + nanos) == 0L) { + deadline = 1L; + } + if ((next = acquireRead(true, deadline)) != INTERRUPTED) { + return next; + } + } + throw new InterruptedException(); + } + + /** + * Non-exclusively acquires the lock, blocking if necessary + * until available or the current thread is interrupted. + * Behavior under interruption matches that specified + * for method {@link Lock#lockInterruptibly()}. + * + * @return a stamp that can be used to unlock or convert mode + * @throws InterruptedException if the current thread is interrupted + * before acquiring the lock + */ + public long readLockInterruptibly() throws InterruptedException { + long next; + if (!Thread.interrupted() && + (next = acquireRead(true, 0L)) != INTERRUPTED) { + return next; + } + throw new InterruptedException(); + } + + /** + * Returns a stamp that can later be validated, or zero + * if exclusively locked. + * + * @return a stamp, or zero if exclusively locked + */ + public long tryOptimisticRead() { + long s; + return ((s = this.state) & WBIT) == 0L ? s & SBITS : 0L; + } + + /** + * Returns true if the lock has not been exclusively acquired + * since issuance of the given stamp. Always returns false if the + * stamp is zero. Always returns true if the stamp represents a + * currently held lock. Invoking this method with a value not + * obtained from {@link #tryOptimisticRead} or a locking method + * for this lock has no defined effect or result. + * + * @param stamp a stamp + * @return {@code true} if the lock has not been exclusively acquired + * since issuance of the given stamp; else false + */ + public boolean validate(long stamp) { + // See above about current use of getLongVolatile here + return (stamp & SBITS) == (U.getLongVolatile(this, STATE) & SBITS); + } + + /** + * If the lock state matches the given stamp, releases the + * exclusive lock. + * + * @param stamp a stamp returned by a write-lock operation + * @throws IllegalMonitorStateException if the stamp does + * not match the current state of this lock + */ + public void unlockWrite(long stamp) { + WNode h; + if (this.state != stamp || (stamp & WBIT) == 0L) { + throw new IllegalMonitorStateException(); + } + this.state = (stamp += WBIT) == 0L ? ORIGIN : stamp; + if ((h = this.whead) != null && h.status != 0) { + release(h); + } + } + + /** + * If the lock state matches the given stamp, releases the + * non-exclusive lock. + * + * @param stamp a stamp returned by a read-lock operation + * @throws IllegalMonitorStateException if the stamp does + * not match the current state of this lock + */ + public void unlockRead(long stamp) { + long s, m; WNode h; + for (;;) { + if (((s = this.state) & SBITS) != (stamp & SBITS) || + (stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT) { + throw new IllegalMonitorStateException(); + } + if (m < RFULL) { + if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) { + if (m == RUNIT && (h = this.whead) != null && h.status != 0) { + release(h); + } + break; + } + } + else if (tryDecReaderOverflow(s) != 0L) { + break; + } + } + } + + /** + * If the lock state matches the given stamp, releases the + * corresponding mode of the lock. + * + * @param stamp a stamp returned by a lock operation + * @throws IllegalMonitorStateException if the stamp does + * not match the current state of this lock + */ + public void unlock(long stamp) { + long a = stamp & ABITS, m, s; WNode h; + while (((s = this.state) & SBITS) == (stamp & SBITS)) { + if ((m = s & ABITS) == 0L) { + break; + } else if (m == WBIT) { + if (a != m) { + break; + } + this.state = (s += WBIT) == 0L ? ORIGIN : s; + if ((h = this.whead) != null && h.status != 0) { + release(h); + } + return; + } + else if (a == 0L || a >= WBIT) { + break; + } else if (m < RFULL) { + if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) { + if (m == RUNIT && (h = this.whead) != null && h.status != 0) { + release(h); + } + return; + } + } + else if (tryDecReaderOverflow(s) != 0L) { + return; + } + } + throw new IllegalMonitorStateException(); + } + + /** + * If the lock state matches the given stamp, performs one of + * the following actions. If the stamp represents holding a write + * lock, returns it. Or, if a read lock, if the write lock is + * available, releases the read lock and returns a write stamp. + * Or, if an optimistic read, returns a write stamp only if + * immediately available. This method returns zero in all other + * cases. + * + * @param stamp a stamp + * @return a valid write stamp, or zero on failure + */ + public long tryConvertToWriteLock(long stamp) { + long a = stamp & ABITS, m, s, next; + while (((s = this.state) & SBITS) == (stamp & SBITS)) { + if ((m = s & ABITS) == 0L) { + if (a != 0L) { + break; + } + if (U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) { + return next; + } + } + else if (m == WBIT) { + if (a != m) { + break; + } + return stamp; + } + else if (m == RUNIT && a != 0L) { + if (U.compareAndSwapLong(this, STATE, s, + next = s - RUNIT + WBIT)) { + return next; + } + } else { + break; + } + } + return 0L; + } + + /** + * If the lock state matches the given stamp, performs one of + * the following actions. If the stamp represents holding a write + * lock, releases it and obtains a read lock. Or, if a read lock, + * returns it. Or, if an optimistic read, acquires a read lock and + * returns a read stamp only if immediately available. This method + * returns zero in all other cases. + * + * @param stamp a stamp + * @return a valid read stamp, or zero on failure + */ + public long tryConvertToReadLock(long stamp) { + long a = stamp & ABITS, m, s, next; WNode h; + while (((s = this.state) & SBITS) == (stamp & SBITS)) { + if ((m = s & ABITS) == 0L) { + if (a != 0L) { + break; + } else if (m < RFULL) { + if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) { + return next; + } + } + else if ((next = tryIncReaderOverflow(s)) != 0L) { + return next; + } + } + else if (m == WBIT) { + if (a != m) { + break; + } + this.state = next = s + WBIT + RUNIT; + if ((h = this.whead) != null && h.status != 0) { + release(h); + } + return next; + } + else if (a != 0L && a < WBIT) { + return stamp; + } else { + break; + } + } + return 0L; + } + + /** + * If the lock state matches the given stamp then, if the stamp + * represents holding a lock, releases it and returns an + * observation stamp. Or, if an optimistic read, returns it if + * validated. This method returns zero in all other cases, and so + * may be useful as a form of "tryUnlock". + * + * @param stamp a stamp + * @return a valid optimistic read stamp, or zero on failure + */ + public long tryConvertToOptimisticRead(long stamp) { + long a = stamp & ABITS, m, s, next; WNode h; + for (;;) { + s = U.getLongVolatile(this, STATE); // see above + if (((s = this.state) & SBITS) != (stamp & SBITS)) { + break; + } + if ((m = s & ABITS) == 0L) { + if (a != 0L) { + break; + } + return s; + } + else if (m == WBIT) { + if (a != m) { + break; + } + this.state = next = (s += WBIT) == 0L ? ORIGIN : s; + if ((h = this.whead) != null && h.status != 0) { + release(h); + } + return next; + } + else if (a == 0L || a >= WBIT) { + break; + } else if (m < RFULL) { + if (U.compareAndSwapLong(this, STATE, s, next = s - RUNIT)) { + if (m == RUNIT && (h = this.whead) != null && h.status != 0) { + release(h); + } + return next & SBITS; + } + } + else if ((next = tryDecReaderOverflow(s)) != 0L) { + return next & SBITS; + } + } + return 0L; + } + + /** + * Releases the write lock if it is held, without requiring a + * stamp value. This method may be useful for recovery after + * errors. + * + * @return {@code true} if the lock was held, else false + */ + public boolean tryUnlockWrite() { + long s; WNode h; + if (((s = this.state) & WBIT) != 0L) { + this.state = (s += WBIT) == 0L ? ORIGIN : s; + if ((h = this.whead) != null && h.status != 0) { + release(h); + } + return true; + } + return false; + } + + /** + * Releases one hold of the read lock if it is held, without + * requiring a stamp value. This method may be useful for recovery + * after errors. + * + * @return {@code true} if the read lock was held, else false + */ + public boolean tryUnlockRead() { + long s, m; WNode h; + while ((m = (s = this.state) & ABITS) != 0L && m < WBIT) { + if (m < RFULL) { + if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) { + if (m == RUNIT && (h = this.whead) != null && h.status != 0) { + release(h); + } + return true; + } + } + else if (tryDecReaderOverflow(s) != 0L) { + return true; + } + } + return false; + } + + // status monitoring methods + + /** + * Returns combined state-held and overflow read count for given + * state s. + */ + private int getReadLockCount(long s) { + long readers; + if ((readers = s & RBITS) >= RFULL) { + readers = RFULL + this.readerOverflow; + } + return (int) readers; + } + + /** + * Returns {@code true} if the lock is currently held exclusively. + * + * @return {@code true} if the lock is currently held exclusively + */ + public boolean isWriteLocked() { + return (this.state & WBIT) != 0L; + } + + /** + * Returns {@code true} if the lock is currently held non-exclusively. + * + * @return {@code true} if the lock is currently held non-exclusively + */ + public boolean isReadLocked() { + return (this.state & RBITS) != 0L; + } + + /** + * Queries the number of read locks held for this lock. This + * method is designed for use in monitoring system state, not for + * synchronization control. + * @return the number of read locks held + */ + public int getReadLockCount() { + return getReadLockCount(this.state); + } + + /** + * Returns a string identifying this lock, as well as its lock + * state. The state, in brackets, includes the String {@code + * "Unlocked"} or the String {@code "Write-locked"} or the String + * {@code "Read-locks:"} followed by the current number of + * read-locks held. + * + * @return a string identifying this lock, as well as its lock state + */ + @Override + public String toString() { + long s = this.state; + return super.toString() + + ((s & ABITS) == 0L ? "[Unlocked]" : + (s & WBIT) != 0L ? "[Write-locked]" : + "[Read-locks:" + getReadLockCount(s) + "]"); + } + + // views + + /** + * Returns a plain {@link Lock} view of this StampedLock in which + * the {@link Lock#lock} method is mapped to {@link #readLock}, + * and similarly for other methods. The returned Lock does not + * support a {@link Condition}; method {@link + * Lock#newCondition()} throws {@code + * UnsupportedOperationException}. + * + * @return the lock + */ + public Lock asReadLock() { + ReadLockView v; + return (v = this.readLockView) != null ? v : + (this.readLockView = new ReadLockView()); + } + + /** + * Returns a plain {@link Lock} view of this StampedLock in which + * the {@link Lock#lock} method is mapped to {@link #writeLock}, + * and similarly for other methods. The returned Lock does not + * support a {@link Condition}; method {@link + * Lock#newCondition()} throws {@code + * UnsupportedOperationException}. + * + * @return the lock + */ + public Lock asWriteLock() { + WriteLockView v; + return (v = this.writeLockView) != null ? v : + (this.writeLockView = new WriteLockView()); + } + + /** + * Returns a {@link ReadWriteLock} view of this StampedLock in + * which the {@link ReadWriteLock#readLock()} method is mapped to + * {@link #asReadLock()}, and {@link ReadWriteLock#writeLock()} to + * {@link #asWriteLock()}. + * + * @return the lock + */ + public ReadWriteLock asReadWriteLock() { + ReadWriteLockView v; + return (v = this.readWriteLockView) != null ? v : + (this.readWriteLockView = new ReadWriteLockView()); + } + + // view classes + + final class ReadLockView implements Lock { + @Override + public void lock() { readLock(); } + @Override + public void lockInterruptibly() throws InterruptedException { + readLockInterruptibly(); + } + @Override + public boolean tryLock() { return tryReadLock() != 0L; } + @Override + public boolean tryLock(long time, TimeUnit unit) + throws InterruptedException { + return tryReadLock(time, unit) != 0L; + } + @Override + public void unlock() { unstampedUnlockRead(); } + @Override + public Condition newCondition() { + throw new UnsupportedOperationException(); + } + } + + final class WriteLockView implements Lock { + @Override + public void lock() { writeLock(); } + @Override + public void lockInterruptibly() throws InterruptedException { + writeLockInterruptibly(); + } + @Override + public boolean tryLock() { return tryWriteLock() != 0L; } + @Override + public boolean tryLock(long time, TimeUnit unit) + throws InterruptedException { + return tryWriteLock(time, unit) != 0L; + } + @Override + public void unlock() { unstampedUnlockWrite(); } + @Override + public Condition newCondition() { + throw new UnsupportedOperationException(); + } + } + + final class ReadWriteLockView implements ReadWriteLock { + @Override + public Lock readLock() { return asReadLock(); } + @Override + public Lock writeLock() { return asWriteLock(); } + } + + // Unlock methods without stamp argument checks for view classes. + // Needed because view-class lock methods throw away stamps. + + final void unstampedUnlockWrite() { + WNode h; long s; + if (((s = this.state) & WBIT) == 0L) { + throw new IllegalMonitorStateException(); + } + this.state = (s += WBIT) == 0L ? ORIGIN : s; + if ((h = this.whead) != null && h.status != 0) { + release(h); + } + } + + final void unstampedUnlockRead() { + for (;;) { + long s, m; WNode h; + if ((m = (s = this.state) & ABITS) == 0L || m >= WBIT) { + throw new IllegalMonitorStateException(); + } else if (m < RFULL) { + if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) { + if (m == RUNIT && (h = this.whead) != null && h.status != 0) { + release(h); + } + break; + } + } + else if (tryDecReaderOverflow(s) != 0L) { + break; + } + } + } + + private void readObject(java.io.ObjectInputStream s) + throws java.io.IOException, ClassNotFoundException { + s.defaultReadObject(); + this.state = ORIGIN; // reset to unlocked state + } + + // internals + + /** + * Tries to increment readerOverflow by first setting state + * access bits value to RBITS, indicating hold of spinlock, + * then updating, then releasing. + * + * @param s a reader overflow stamp: (s & ABITS) >= RFULL + * @return new stamp on success, else zero + */ + private long tryIncReaderOverflow(long s) { + // assert (s & ABITS) >= RFULL; + if ((s & ABITS) == RFULL) { + if (U.compareAndSwapLong(this, STATE, s, s | RBITS)) { + ++this.readerOverflow; + this.state = s; + return s; + } + } + else if ((ThreadLocalRandom.current().nextInt() & + OVERFLOW_YIELD_RATE) == 0) { + Thread.yield(); + } + return 0L; + } + + /** + * Tries to decrement readerOverflow. + * + * @param s a reader overflow stamp: (s & ABITS) >= RFULL + * @return new stamp on success, else zero + */ + private long tryDecReaderOverflow(long s) { + // assert (s & ABITS) >= RFULL; + if ((s & ABITS) == RFULL) { + if (U.compareAndSwapLong(this, STATE, s, s | RBITS)) { + int r; long next; + if ((r = this.readerOverflow) > 0) { + this.readerOverflow = r - 1; + next = s; + } else { + next = s - RUNIT; + } + this.state = next; + return next; + } + } + else if ((ThreadLocalRandom.current().nextInt() & + OVERFLOW_YIELD_RATE) == 0) { + Thread.yield(); + } + return 0L; + } + + /** + * Wakes up the successor of h (normally whead). This is normally + * just h.next, but may require traversal from wtail if next + * pointers are lagging. This may fail to wake up an acquiring + * thread when one or more have been cancelled, but the cancel + * methods themselves provide extra safeguards to ensure liveness. + */ + private void release(WNode h) { + if (h != null) { + WNode q; Thread w; + U.compareAndSwapInt(h, WSTATUS, WAITING, 0); + if ((q = h.next) == null || q.status == CANCELLED) { + for (WNode t = this.wtail; t != null && t != h; t = t.prev) { + if (t.status <= 0) { + q = t; + } + } + } + if (q != null && (w = q.thread) != null) { + U.unpark(w); + } + } + } + + /** + * See above for explanation. + * + * @param interruptible true if should check interrupts and if so + * return INTERRUPTED + * @param deadline if nonzero, the System.nanoTime value to timeout + * at (and return zero) + * @return next state, or INTERRUPTED + */ + private long acquireWrite(boolean interruptible, long deadline) { + WNode node = null, p; + for (int spins = -1;;) { // spin while enqueuing + long m, s, ns; + if ((m = (s = this.state) & ABITS) == 0L) { + if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT)) { + return ns; + } + } + else if (spins < 0) { + spins = m == WBIT && this.wtail == this.whead ? SPINS : 0; + } else if (spins > 0) { + if (ThreadLocalRandom.current().nextInt() >= 0) { + --spins; + } + } + else if ((p = this.wtail) == null) { // initialize queue + WNode hd = new WNode(WMODE, null); + if (U.compareAndSwapObject(this, WHEAD, null, hd)) { + this.wtail = hd; + } + } + else if (node == null) { + node = new WNode(WMODE, p); + } else if (node.prev != p) { + node.prev = p; + } else if (U.compareAndSwapObject(this, WTAIL, p, node)) { + p.next = node; + break; + } + } + + for (int spins = -1;;) { + WNode h, np, pp; int ps; + if ((h = this.whead) == p) { + if (spins < 0) { + spins = HEAD_SPINS; + } else if (spins < MAX_HEAD_SPINS) { + spins <<= 1; + } + for (int k = spins;;) { // spin at head + long s, ns; + if (((s = this.state) & ABITS) == 0L) { + if (U.compareAndSwapLong(this, STATE, s, + ns = s + WBIT)) { + this.whead = node; + node.prev = null; + return ns; + } + } + else if (ThreadLocalRandom.current().nextInt() >= 0 && + --k <= 0) { + break; + } + } + } + else if (h != null) { // help release stale waiters + WNode c; Thread w; + while ((c = h.cowait) != null) { + if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) && + (w = c.thread) != null) { + U.unpark(w); + } + } + } + if (this.whead == h) { + if ((np = node.prev) != p) { + if (np != null) + { + (p = np).next = node; // stale + } + } + else if ((ps = p.status) == 0) { + U.compareAndSwapInt(p, WSTATUS, 0, WAITING); + } else if (ps == CANCELLED) { + if ((pp = p.prev) != null) { + node.prev = pp; + pp.next = node; + } + } + else { + long time; // 0 argument to park means no timeout + if (deadline == 0L) { + time = 0L; + } else if ((time = deadline - System.nanoTime()) <= 0L) { + return cancelWaiter(node, node, false); + } + Thread wt = Thread.currentThread(); + U.putObject(wt, PARKBLOCKER, this); + node.thread = wt; + if (p.status < 0 && (p != h || (this.state & ABITS) != 0L) && + this.whead == h && node.prev == p) + { + U.park(false, time); // emulate LockSupport.park + } + node.thread = null; + U.putObject(wt, PARKBLOCKER, null); + if (interruptible && Thread.interrupted()) { + return cancelWaiter(node, node, true); + } + } + } + } + } + + /** + * See above for explanation. + * + * @param interruptible true if should check interrupts and if so + * return INTERRUPTED + * @param deadline if nonzero, the System.nanoTime value to timeout + * at (and return zero) + * @return next state, or INTERRUPTED + */ + private long acquireRead(boolean interruptible, long deadline) { + WNode node = null, p; + for (int spins = -1;;) { + WNode h; + if ((h = this.whead) == (p = this.wtail)) { + for (long m, s, ns;;) { + if ((m = (s = this.state) & ABITS) < RFULL ? + U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) : + m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L) { + return ns; + } else if (m >= WBIT) { + if (spins > 0) { + if (ThreadLocalRandom.current().nextInt() >= 0) { + --spins; + } + } + else { + if (spins == 0) { + WNode nh = this.whead, np = this.wtail; + if (nh == h && np == p || (h = nh) != (p = np)) { + break; + } + } + spins = SPINS; + } + } + } + } + if (p == null) { // initialize queue + WNode hd = new WNode(WMODE, null); + if (U.compareAndSwapObject(this, WHEAD, null, hd)) { + this.wtail = hd; + } + } + else if (node == null) { + node = new WNode(RMODE, p); + } else if (h == p || p.mode != RMODE) { + if (node.prev != p) { + node.prev = p; + } else if (U.compareAndSwapObject(this, WTAIL, p, node)) { + p.next = node; + break; + } + } + else if (!U.compareAndSwapObject(p, WCOWAIT, + node.cowait = p.cowait, node)) { + node.cowait = null; + } else { + for (;;) { + WNode pp, c; Thread w; + if ((h = this.whead) != null && (c = h.cowait) != null && + U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) && + (w = c.thread) != null) { + U.unpark(w); + } + if (h == (pp = p.prev) || h == p || pp == null) { + long m, s, ns; + do { + if ((m = (s = this.state) & ABITS) < RFULL ? + U.compareAndSwapLong(this, STATE, s, + ns = s + RUNIT) : + m < WBIT && + (ns = tryIncReaderOverflow(s)) != 0L) { + return ns; + } + } while (m < WBIT); + } + if (this.whead == h && p.prev == pp) { + long time; + if (pp == null || h == p || p.status > 0) { + node = null; // throw away + break; + } + if (deadline == 0L) { + time = 0L; + } else if ((time = deadline - System.nanoTime()) <= 0L) { + return cancelWaiter(node, p, false); + } + Thread wt = Thread.currentThread(); + U.putObject(wt, PARKBLOCKER, this); + node.thread = wt; + if ((h != pp || (this.state & ABITS) == WBIT) && + this.whead == h && p.prev == pp) { + U.park(false, time); + } + node.thread = null; + U.putObject(wt, PARKBLOCKER, null); + if (interruptible && Thread.interrupted()) { + return cancelWaiter(node, p, true); + } + } + } + } + } + + for (int spins = -1;;) { + WNode h, np, pp; int ps; + if ((h = this.whead) == p) { + if (spins < 0) { + spins = HEAD_SPINS; + } else if (spins < MAX_HEAD_SPINS) { + spins <<= 1; + } + for (int k = spins;;) { // spin at head + long m, s, ns; + if ((m = (s = this.state) & ABITS) < RFULL ? + U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) : + m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L) { + WNode c; Thread w; + this.whead = node; + node.prev = null; + while ((c = node.cowait) != null) { + if (U.compareAndSwapObject(node, WCOWAIT, + c, c.cowait) && + (w = c.thread) != null) { + U.unpark(w); + } + } + return ns; + } + else if (m >= WBIT && + ThreadLocalRandom.current().nextInt() >= 0 && --k <= 0) { + break; + } + } + } + else if (h != null) { + WNode c; Thread w; + while ((c = h.cowait) != null) { + if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) && + (w = c.thread) != null) { + U.unpark(w); + } + } + } + if (this.whead == h) { + if ((np = node.prev) != p) { + if (np != null) + { + (p = np).next = node; // stale + } + } + else if ((ps = p.status) == 0) { + U.compareAndSwapInt(p, WSTATUS, 0, WAITING); + } else if (ps == CANCELLED) { + if ((pp = p.prev) != null) { + node.prev = pp; + pp.next = node; + } + } + else { + long time; + if (deadline == 0L) { + time = 0L; + } else if ((time = deadline - System.nanoTime()) <= 0L) { + return cancelWaiter(node, node, false); + } + Thread wt = Thread.currentThread(); + U.putObject(wt, PARKBLOCKER, this); + node.thread = wt; + if (p.status < 0 && + (p != h || (this.state & ABITS) == WBIT) && + this.whead == h && node.prev == p) { + U.park(false, time); + } + node.thread = null; + U.putObject(wt, PARKBLOCKER, null); + if (interruptible && Thread.interrupted()) { + return cancelWaiter(node, node, true); + } + } + } + } + } + + /** + * If node non-null, forces cancel status and unsplices it from + * queue if possible and wakes up any cowaiters (of the node, or + * group, as applicable), and in any case helps release current + * first waiter if lock is free. (Calling with null arguments + * serves as a conditional form of release, which is not currently + * needed but may be needed under possible future cancellation + * policies). This is a variant of cancellation methods in + * AbstractQueuedSynchronizer (see its detailed explanation in AQS + * internal documentation). + * + * @param node if nonnull, the waiter + * @param group either node or the group node is cowaiting with + * @param interrupted if already interrupted + * @return INTERRUPTED if interrupted or Thread.interrupted, else zero + */ + private long cancelWaiter(WNode node, WNode group, boolean interrupted) { + if (node != null && group != null) { + Thread w; + node.status = CANCELLED; + // unsplice cancelled nodes from group + for (WNode p = group, q; (q = p.cowait) != null;) { + if (q.status == CANCELLED) { + U.compareAndSwapObject(p, WCOWAIT, q, q.cowait); + p = group; // restart + } else { + p = q; + } + } + if (group == node) { + for (WNode r = group.cowait; r != null; r = r.cowait) { + if ((w = r.thread) != null) + { + U.unpark(w); // wake up uncancelled co-waiters + } + } + for (WNode pred = node.prev; pred != null; ) { // unsplice + WNode succ, pp; // find valid successor + while ((succ = node.next) == null || + succ.status == CANCELLED) { + WNode q = null; // find successor the slow way + for (WNode t = this.wtail; t != null && t != node; t = t.prev) + { + if (t.status != CANCELLED) + { + q = t; // don't link if succ cancelled + } + } + if (succ == q || // ensure accurate successor + U.compareAndSwapObject(node, WNEXT, + succ, succ = q)) { + if (succ == null && node == this.wtail) { + U.compareAndSwapObject(this, WTAIL, node, pred); + } + break; + } + } + if (pred.next == node) { + U.compareAndSwapObject(pred, WNEXT, node, succ); + } + if (succ != null && (w = succ.thread) != null) { + succ.thread = null; + U.unpark(w); // wake up succ to observe new pred + } + if (pred.status != CANCELLED || (pp = pred.prev) == null) { + break; + } + node.prev = pp; // repeat if new pred wrong/cancelled + U.compareAndSwapObject(pp, WNEXT, pred, succ); + pred = pp; + } + } + } + WNode h; // Possibly release first waiter + while ((h = this.whead) != null) { + long s; WNode q; // similar to release() but check eligibility + if ((q = h.next) == null || q.status == CANCELLED) { + for (WNode t = this.wtail; t != null && t != h; t = t.prev) { + if (t.status <= 0) { + q = t; + } + } + } + if (h == this.whead) { + if (q != null && h.status == 0 && + ((s = this.state) & ABITS) != WBIT && // waiter is eligible + (s == 0L || q.mode == RMODE)) { + release(h); + } + break; + } + } + return interrupted || Thread.interrupted() ? INTERRUPTED : 0L; + } + + // Unsafe mechanics + private static final sun.misc.Unsafe U; + private static final long STATE; + private static final long WHEAD; + private static final long WTAIL; + private static final long WNEXT; + private static final long WSTATUS; + private static final long WCOWAIT; + private static final long PARKBLOCKER; + + static { + try { + U = getUnsafe(); + Class k = StampedLock.class; + Class wk = WNode.class; + STATE = U.objectFieldOffset + (k.getDeclaredField("state")); + WHEAD = U.objectFieldOffset + (k.getDeclaredField("whead")); + WTAIL = U.objectFieldOffset + (k.getDeclaredField("wtail")); + WSTATUS = U.objectFieldOffset + (wk.getDeclaredField("status")); + WNEXT = U.objectFieldOffset + (wk.getDeclaredField("next")); + WCOWAIT = U.objectFieldOffset + (wk.getDeclaredField("cowait")); + Class tk = Thread.class; + PARKBLOCKER = U.objectFieldOffset + (tk.getDeclaredField("parkBlocker")); + + } catch (Exception e) { + throw new Error(e); + } + } + + /** + * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package. + * Replace with a simple call to Unsafe.getUnsafe when integrating + * into a jdk. + * + * @return a sun.misc.Unsafe + */ + private static sun.misc.Unsafe getUnsafe() { + try { + return sun.misc.Unsafe.getUnsafe(); + } catch (SecurityException tryReflectionInstead) {} + try { + return java.security.AccessController.doPrivileged + (new java.security.PrivilegedExceptionAction() { + @Override + public sun.misc.Unsafe run() throws Exception { + Class k = sun.misc.Unsafe.class; + for (java.lang.reflect.Field f : k.getDeclaredFields()) { + f.setAccessible(true); + Object x = f.get(null); + if (k.isInstance(x)) { + return k.cast(x); + } + } + throw new NoSuchFieldError("the Unsafe"); + }}); + } catch (java.security.PrivilegedActionException e) { + throw new RuntimeException("Could not initialize intrinsics", + e.getCause()); + } + } +} \ No newline at end of file diff --git a/src/main/java/dorkbox/util/messagebus/listener/MessageListener.java b/src/main/java/dorkbox/util/messagebus/listener/MessageListener.java index 2b99949..4b63bd3 100644 --- a/src/main/java/dorkbox/util/messagebus/listener/MessageListener.java +++ b/src/main/java/dorkbox/util/messagebus/listener/MessageListener.java @@ -1,6 +1,9 @@ package dorkbox.util.messagebus.listener; -import dorkbox.util.messagebus.common.StrongConcurrentSetV8; +import java.util.Collection; + +import dorkbox.util.messagebus.common.thread.ConcurrentSet; + /** * All instances of any class that defines at least one message handler (see @MessageHandler) are message listeners. Thus, a message @@ -18,11 +21,11 @@ import dorkbox.util.messagebus.common.StrongConcurrentSetV8; */ public class MessageListener { - private final StrongConcurrentSetV8 handlers; + private final Collection handlers; private Class listenerDefinition; - public MessageListener(Class listenerDefinition, int size) { - this.handlers = new StrongConcurrentSetV8(size, 0.8F); + public MessageListener(Class listenerDefinition, int size, float loadFactor, int stripeSize) { + this.handlers = new ConcurrentSet(size, loadFactor, stripeSize); this.listenerDefinition = listenerDefinition; } @@ -35,7 +38,7 @@ public class MessageListener { return this.handlers.add(messageHandler); } - public StrongConcurrentSetV8 getHandlers() { + public Collection getHandlers() { return this.handlers; } } diff --git a/src/main/java/dorkbox/util/messagebus/listener/MetadataReader.java b/src/main/java/dorkbox/util/messagebus/listener/MetadataReader.java index 6165138..b2941f7 100644 --- a/src/main/java/dorkbox/util/messagebus/listener/MetadataReader.java +++ b/src/main/java/dorkbox/util/messagebus/listener/MetadataReader.java @@ -1,11 +1,12 @@ package dorkbox.util.messagebus.listener; import java.lang.reflect.Method; +import java.util.Collection; +import java.util.Iterator; import dorkbox.util.messagebus.annotations.Handler; -import dorkbox.util.messagebus.common.ISetEntry; import dorkbox.util.messagebus.common.ReflectionUtils; -import dorkbox.util.messagebus.common.StrongConcurrentSetV8; +import dorkbox.util.messagebus.common.thread.ConcurrentSet; /** * The meta data reader is responsible for parsing and validating message handler configurations. @@ -19,35 +20,31 @@ public class MetadataReader { // get all listeners defined by the given class (includes // listeners defined in super classes) - public MessageListener getMessageListener(Class target) { + public MessageListener getMessageListener(Class target, float loadFactor, int stripeSize) { // get all handlers (this will include all (inherited) methods directly annotated using @Handler) - StrongConcurrentSetV8 allHandlers = ReflectionUtils.getMethods(target); + Collection allHandlers = ReflectionUtils.getMethods(target); // retain only those that are at the bottom of their respective class hierarchy (deepest overriding method) - StrongConcurrentSetV8 bottomMostHandlers = new StrongConcurrentSetV8(allHandlers.size(), 0.8F); - - - ISetEntry current = allHandlers.head; + Collection bottomMostHandlers = new ConcurrentSet(allHandlers.size(), loadFactor, stripeSize); + Iterator iterator; Method handler; - while (current != null) { - handler = current.getValue(); - current = current.next(); + + for (iterator = allHandlers.iterator(); iterator.hasNext();) { + handler = iterator.next(); if (!ReflectionUtils.containsOverridingMethod(allHandlers, handler)) { bottomMostHandlers.add(handler); } } - MessageListener listenerMetadata = new MessageListener(target, bottomMostHandlers.size()); + MessageListener listenerMetadata = new MessageListener(target, bottomMostHandlers.size(), loadFactor, stripeSize); // for each handler there will be no overriding method that specifies @Handler annotation // but an overriding method does inherit the listener configuration of the overwritten method - current = bottomMostHandlers.head; - while (current != null) { - handler = current.getValue(); - current = current.next(); + for (iterator = bottomMostHandlers.iterator(); iterator.hasNext();) { + handler = iterator.next(); Handler handlerConfig = ReflectionUtils.getAnnotation(handler, Handler.class); if (handlerConfig == null || !handlerConfig.enabled()) { diff --git a/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java b/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java index 9a7b461..07afd32 100644 --- a/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java +++ b/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java @@ -1,13 +1,13 @@ package dorkbox.util.messagebus.subscription; import java.lang.reflect.InvocationTargetException; +import java.util.Iterator; import java.util.concurrent.atomic.AtomicInteger; import com.esotericsoftware.reflectasm.MethodAccess; -import dorkbox.util.messagebus.common.ISetEntry; -import dorkbox.util.messagebus.common.StrongConcurrentSet; import dorkbox.util.messagebus.common.thread.BooleanHolder; +import dorkbox.util.messagebus.common.thread.ConcurrentSet; import dorkbox.util.messagebus.dispatch.IHandlerInvocation; import dorkbox.util.messagebus.dispatch.ReflectiveHandlerInvocation; import dorkbox.util.messagebus.dispatch.SynchronizedHandlerInvocation; @@ -39,11 +39,11 @@ public class Subscription { private final MessageHandler handlerMetadata; private final IHandlerInvocation invocation; - private final StrongConcurrentSet listeners; + private final ConcurrentSet listeners; public Subscription(MessageHandler handler) { this.handlerMetadata = handler; - this.listeners = new StrongConcurrentSet(); + this.listeners = new ConcurrentSet(); IHandlerInvocation invocation = new ReflectiveHandlerInvocation(); if (handler.isSynchronized()) { @@ -93,27 +93,22 @@ public class Subscription { return this.listeners.size(); } - public int count =0; - public int getCount() { - return this.count; - } /** * @return true if there were listeners for this publication, false if there was nothing */ public void publishToSubscription(ErrorHandlingSupport errorHandler, BooleanHolder booleanHolder, Object message) { - StrongConcurrentSet listeners = this.listeners; + ConcurrentSet listeners = this.listeners; if (!listeners.isEmpty()) { MethodAccess handler = this.handlerMetadata.getHandler(); int handleIndex = this.handlerMetadata.getMethodIndex(); IHandlerInvocation invocation = this.invocation; - - ISetEntry current = listeners.head; + Iterator iterator; Object listener; - while (current != null) { - listener = current.getValue(); - current = current.next(); + + for (iterator = listeners.iterator(); iterator.hasNext();) { + listener = iterator.next(); try { invocation.invoke(listener, handler, handleIndex, message); @@ -160,128 +155,128 @@ public class Subscription { * @return true if there were listeners for this publication, false if there was nothing */ public void publishToSubscription(ErrorHandlingSupport errorHandler, BooleanHolder booleanHolder, Object message1, Object message2) { - StrongConcurrentSet listeners = this.listeners; - - if (!listeners.isEmpty()) { - MethodAccess handler = this.handlerMetadata.getHandler(); - int handleIndex = this.handlerMetadata.getMethodIndex(); - IHandlerInvocation invocation = this.invocation; - - - ISetEntry current = listeners.head; - Object listener; - while (current != null) { - listener = current.getValue(); - current = current.next(); - - try { - invocation.invoke(listener, handler, handleIndex, message1, message2); - } catch (IllegalAccessException e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "The class or method is not accessible") - .setCause(e) - .setMethodName(handler.getMethodNames()[handleIndex]) - .setListener(listener) - .setPublishedObject(message1, message2)); - } catch (IllegalArgumentException e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "Wrong arguments passed to method. Was: " + - message1.getClass() + ", " + - message2.getClass() - + ". Expected: " + handler.getParameterTypes()[0] + ", " + - handler.getParameterTypes()[1] - ) - .setCause(e) - .setMethodName(handler.getMethodNames()[handleIndex]) - .setListener(listener) - .setPublishedObject(message1, message2)); - } catch (InvocationTargetException e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "Message handler threw exception") - .setCause(e) - .setMethodName(handler.getMethodNames()[handleIndex]) - .setListener(listener) - .setPublishedObject(message1, message2)); - } catch (Throwable e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "The handler code threw an exception") - .setCause(e) - .setMethodName(handler.getMethodNames()[handleIndex]) - .setListener(listener) - .setPublishedObject(message1, message2)); - } - } - booleanHolder.bool = true; - } +// StrongConcurrentSet listeners = this.listeners; +// +// if (!listeners.isEmpty()) { +// MethodAccess handler = this.handlerMetadata.getHandler(); +// int handleIndex = this.handlerMetadata.getMethodIndex(); +// IHandlerInvocation invocation = this.invocation; +// +// +// ISetEntry current = listeners.head; +// Object listener; +// while (current != null) { +// listener = current.getValue(); +// current = current.next(); +// +// try { +// invocation.invoke(listener, handler, handleIndex, message1, message2); +// } catch (IllegalAccessException e) { +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "The class or method is not accessible") +// .setCause(e) +// .setMethodName(handler.getMethodNames()[handleIndex]) +// .setListener(listener) +// .setPublishedObject(message1, message2)); +// } catch (IllegalArgumentException e) { +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "Wrong arguments passed to method. Was: " + +// message1.getClass() + ", " + +// message2.getClass() +// + ". Expected: " + handler.getParameterTypes()[0] + ", " + +// handler.getParameterTypes()[1] +// ) +// .setCause(e) +// .setMethodName(handler.getMethodNames()[handleIndex]) +// .setListener(listener) +// .setPublishedObject(message1, message2)); +// } catch (InvocationTargetException e) { +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "Message handler threw exception") +// .setCause(e) +// .setMethodName(handler.getMethodNames()[handleIndex]) +// .setListener(listener) +// .setPublishedObject(message1, message2)); +// } catch (Throwable e) { +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "The handler code threw an exception") +// .setCause(e) +// .setMethodName(handler.getMethodNames()[handleIndex]) +// .setListener(listener) +// .setPublishedObject(message1, message2)); +// } +// } +// booleanHolder.bool = true; +// } } /** * @return true if there were listeners for this publication, false if there was nothing */ public void publishToSubscription(ErrorHandlingSupport errorHandler, BooleanHolder booleanHolder, Object message1, Object message2, Object message3) { - StrongConcurrentSet listeners = this.listeners; - - if (!listeners.isEmpty()) { - MethodAccess handler = this.handlerMetadata.getHandler(); - int handleIndex = this.handlerMetadata.getMethodIndex(); - IHandlerInvocation invocation = this.invocation; - - - ISetEntry current = listeners.head; - Object listener; - while (current != null) { - listener = current.getValue(); - current = current.next(); - - try { - invocation.invoke(listener, handler, handleIndex, message1, message2, message3); - } catch (IllegalAccessException e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "The class or method is not accessible") - .setCause(e) - .setMethodName(handler.getMethodNames()[handleIndex]) - .setListener(listener) - .setPublishedObject(message1, message2, message3)); - } catch (IllegalArgumentException e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "Wrong arguments passed to method. Was: " + - message1.getClass() + ", " + - message2.getClass() + ", " + - message3.getClass() - + ". Expected: " + handler.getParameterTypes()[0] + ", " + - handler.getParameterTypes()[1] + ", " + - handler.getParameterTypes()[2] - ) - .setCause(e) - .setMethodName(handler.getMethodNames()[handleIndex]) - .setListener(listener) - .setPublishedObject(message1, message2, message3)); - } catch (InvocationTargetException e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "Message handler threw exception") - .setCause(e) - .setMethodName(handler.getMethodNames()[handleIndex]) - .setListener(listener) - .setPublishedObject(message1, message2, message3)); - } catch (Throwable e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "The handler code threw an exception") - .setCause(e) - .setMethodName(handler.getMethodNames()[handleIndex]) - .setListener(listener) - .setPublishedObject(message1, message2, message3)); - } - } - booleanHolder.bool = true; - } +// StrongConcurrentSet listeners = this.listeners; +// +// if (!listeners.isEmpty()) { +// MethodAccess handler = this.handlerMetadata.getHandler(); +// int handleIndex = this.handlerMetadata.getMethodIndex(); +// IHandlerInvocation invocation = this.invocation; +// +// +// ISetEntry current = listeners.head; +// Object listener; +// while (current != null) { +// listener = current.getValue(); +// current = current.next(); +// +// try { +// invocation.invoke(listener, handler, handleIndex, message1, message2, message3); +// } catch (IllegalAccessException e) { +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "The class or method is not accessible") +// .setCause(e) +// .setMethodName(handler.getMethodNames()[handleIndex]) +// .setListener(listener) +// .setPublishedObject(message1, message2, message3)); +// } catch (IllegalArgumentException e) { +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "Wrong arguments passed to method. Was: " + +// message1.getClass() + ", " + +// message2.getClass() + ", " + +// message3.getClass() +// + ". Expected: " + handler.getParameterTypes()[0] + ", " + +// handler.getParameterTypes()[1] + ", " + +// handler.getParameterTypes()[2] +// ) +// .setCause(e) +// .setMethodName(handler.getMethodNames()[handleIndex]) +// .setListener(listener) +// .setPublishedObject(message1, message2, message3)); +// } catch (InvocationTargetException e) { +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "Message handler threw exception") +// .setCause(e) +// .setMethodName(handler.getMethodNames()[handleIndex]) +// .setListener(listener) +// .setPublishedObject(message1, message2, message3)); +// } catch (Throwable e) { +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "The handler code threw an exception") +// .setCause(e) +// .setMethodName(handler.getMethodNames()[handleIndex]) +// .setListener(listener) +// .setPublishedObject(message1, message2, message3)); +// } +// } +// booleanHolder.bool = true; +// } } diff --git a/src/test/java/dorkbox/util/messagebus/MetadataReaderTest.java b/src/test/java/dorkbox/util/messagebus/MetadataReaderTest.java index 46d9255..38ab18f 100644 --- a/src/test/java/dorkbox/util/messagebus/MetadataReaderTest.java +++ b/src/test/java/dorkbox/util/messagebus/MetadataReaderTest.java @@ -26,7 +26,7 @@ public class MetadataReaderTest extends AssertSupport { @Test public void testListenerWithoutInheritance() { - MessageListener listener = this.reader.getMessageListener(MessageListener1.class); + MessageListener listener = this.reader.getMessageListener(MessageListener1.class, 0.85F, 4); ListenerValidator validator = new ListenerValidator() .expectHandlers(2, String.class) .expectHandlers(2, Object.class) @@ -45,7 +45,7 @@ public class MetadataReaderTest extends AssertSupport { @Test public void testListenerWithInheritance() { - MessageListener listener = this.reader.getMessageListener(MessageListener2.class); + MessageListener listener = this.reader.getMessageListener(MessageListener2.class, 0.85F, 4); ListenerValidator validator = new ListenerValidator() .expectHandlers(2, String.class) .expectHandlers(2, Object.class) @@ -55,7 +55,7 @@ public class MetadataReaderTest extends AssertSupport { @Test public void testListenerWithInheritanceOverriding() { - MessageListener listener = this.reader.getMessageListener(MessageListener3.class); + MessageListener listener = this.reader.getMessageListener(MessageListener3.class, 0.85F, 4); ListenerValidator validator = new ListenerValidator() .expectHandlers(0, String.class) @@ -198,7 +198,7 @@ public class MetadataReaderTest extends AssertSupport { @Test public void testMultipleSignatureListenerWithoutInheritance() { - MessageListener listener = this.reader.getMessageListener(MultiMessageListener1.class); + MessageListener listener = this.reader.getMessageListener(MultiMessageListener1.class, 0.85F, 4); ListenerValidator validator = new ListenerValidator() .expectHandlers(7, String.class) .expectHandlers(9, String.class, String.class) diff --git a/src/test/java/dorkbox/util/messagebus/PerfTest_Collections.java b/src/test/java/dorkbox/util/messagebus/PerfTest_Collections.java index 40b0c96..a9610b5 100644 --- a/src/test/java/dorkbox/util/messagebus/PerfTest_Collections.java +++ b/src/test/java/dorkbox/util/messagebus/PerfTest_Collections.java @@ -27,11 +27,12 @@ public class PerfTest_Collections { public static final Integer TEST_VALUE = Integer.valueOf(777); private static final float LOAD_FACTOR = 0.8F; - private static MessageListener messageListener = new MetadataReader().getMessageListener(Listener.class); + private static final MessageListener messageListener = new MetadataReader().getMessageListener(Listener.class, LOAD_FACTOR, 8); public static void main(final String[] args) throws Exception { final int size = 16; + System.out.println("reps:" + REPETITIONS + " size: " + size); // have to warm-up the JVM. @@ -72,8 +73,10 @@ public class PerfTest_Collections { final int warmupRuns = 2; final int runs = 3; + Collection handlers = messageListener.getHandlers(); + for (int i=0;i