From 120054afd4c53a80fe4b0769b9d9be119e5a1d28 Mon Sep 17 00:00:00 2001 From: nathan Date: Thu, 19 Feb 2015 17:52:48 +0100 Subject: [PATCH] WIP - playing with concurrent maps + ability to have non-blocking for updates --- .../engio/mbassy/multi/MultiMBassador.java | 43 +- .../common/ConcurrentReferenceHashMap.java | 1914 +++++++++++++++++ .../multi/common/IdentityObjectTree.java | 4 +- .../multi/common/StrongConcurrentSet.java | 4 +- .../multi/common/SubscriptionPoolable.java | 14 - .../multi/listener/MessageListener.java | 7 +- .../subscription/SubscriptionManager.java | 263 +-- .../mbassy/multi/SubscriptionManagerTest.java | 4 +- 8 files changed, 2045 insertions(+), 208 deletions(-) create mode 100644 src/main/java/net/engio/mbassy/multi/common/ConcurrentReferenceHashMap.java delete mode 100644 src/main/java/net/engio/mbassy/multi/common/SubscriptionPoolable.java diff --git a/src/main/java/net/engio/mbassy/multi/MultiMBassador.java b/src/main/java/net/engio/mbassy/multi/MultiMBassador.java index 09aadf9..0aa3100 100644 --- a/src/main/java/net/engio/mbassy/multi/MultiMBassador.java +++ b/src/main/java/net/engio/mbassy/multi/MultiMBassador.java @@ -8,8 +8,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.LockSupport; import net.engio.mbassy.multi.common.DeadMessage; -import net.engio.mbassy.multi.common.NamedThreadFactory; import net.engio.mbassy.multi.common.LinkedTransferQueue; +import net.engio.mbassy.multi.common.NamedThreadFactory; import net.engio.mbassy.multi.common.TransferQueue; import net.engio.mbassy.multi.error.IPublicationErrorHandler; import net.engio.mbassy.multi.error.PublicationError; @@ -29,7 +29,7 @@ public class MultiMBassador implements IMessageBus { // this handler will receive all errors that occur during message dispatch or message handling private final List errorHandlers = new ArrayList(); - private final SubscriptionManager subscriptionManager = new SubscriptionManager(); + private final SubscriptionManager subscriptionManager; private final TransferQueue dispatchQueue = new LinkedTransferQueue(); @@ -38,6 +38,7 @@ public class MultiMBassador implements IMessageBus { public MultiMBassador() { this(Runtime.getRuntime().availableProcessors()); +// this(2); } @@ -46,6 +47,7 @@ public class MultiMBassador implements IMessageBus { numberOfThreads = 1; // at LEAST 1 thread } + this.subscriptionManager = new SubscriptionManager(numberOfThreads); this.threads = new ArrayList(numberOfThreads); NamedThreadFactory dispatchThreadFactory = new NamedThreadFactory("MessageBus"); @@ -124,45 +126,42 @@ public class MultiMBassador implements IMessageBus { } - @SuppressWarnings("null") @Override public void publish(Object message) { SubscriptionManager manager = this.subscriptionManager; Class messageClass = message.getClass(); - manager.readLock(); +// manager.readLock(); Collection subscriptions = manager.getSubscriptionsByMessageType(messageClass); - boolean validSubs = subscriptions != null && !subscriptions.isEmpty(); - - Collection deadSubscriptions = null; - if (!validSubs) { - // Dead Event. must EXACTLY MATCH (no subclasses or varargs) - deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); - } - Collection superSubscriptions = manager.getSuperSubscriptions(messageClass); Collection varArgs = manager.getVarArgs(messageClass); - manager.readUnLock(); +// manager.readUnLock(); // Run subscriptions - if (validSubs) { + if (subscriptions != null && !subscriptions.isEmpty()) { for (Subscription sub : subscriptions) { // this catches all exception types sub.publishToSubscription(this, message); } - } else if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) { - DeadMessage deadMessage = new DeadMessage(message); + } else { +// manager.readLock(); + // Dead Event must EXACTLY MATCH (no subclasses or varargs permitted) + Collection deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); +// manager.readUnLock(); + + if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) { + DeadMessage deadMessage = new DeadMessage(message); + for (Subscription sub : deadSubscriptions) { + // this catches all exception types + sub.publishToSubscription(this, deadMessage); + } - for (Subscription sub : deadSubscriptions) { - // this catches all exception types - sub.publishToSubscription(this, deadMessage); } - // Dead Event. only matches EXACT handlers (no vararg, no subclasses) - return; } + // now get superClasses if (superSubscriptions != null) { for (Subscription sub : superSubscriptions) { @@ -171,6 +170,8 @@ public class MultiMBassador implements IMessageBus { } } + + // now get varargs if (varArgs != null && !varArgs.isEmpty()) { // messy, but the ONLY way to do it. diff --git a/src/main/java/net/engio/mbassy/multi/common/ConcurrentReferenceHashMap.java b/src/main/java/net/engio/mbassy/multi/common/ConcurrentReferenceHashMap.java new file mode 100644 index 0000000..b49dbb6 --- /dev/null +++ b/src/main/java/net/engio/mbassy/multi/common/ConcurrentReferenceHashMap.java @@ -0,0 +1,1914 @@ +/* + * Copyright (c) 2008-2013, Hazelcast, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package net.engio.mbassy.multi.common; + +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ + +import java.io.IOException; +import java.io.Serializable; +import java.lang.ref.Reference; +import java.lang.ref.ReferenceQueue; +import java.lang.ref.SoftReference; +import java.lang.ref.WeakReference; +import java.util.AbstractCollection; +import java.util.AbstractMap; +import java.util.AbstractSet; +import java.util.Collection; +import java.util.ConcurrentModificationException; +import java.util.EnumSet; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Hashtable; +import java.util.IdentityHashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.concurrent.locks.ReentrantLock; + +/** + * An advanced hash table supporting configurable garbage collection semantics + * of keys and values, optional referential-equality, full concurrency of + * retrievals, and adjustable expected concurrency for updates. + *

+ * This table is designed around specific advanced use-cases. If there is any + * doubt whether this table is for you, you most likely should be using + * {@link java.util.concurrent.ConcurrentHashMap} instead. + *

+ * This table supports strong, weak, and soft keys and values. By default keys + * are weak, and values are strong. Such a configuration offers similar behavior + * to {@link java.util.WeakHashMap}, entries of this table are periodically + * removed once their corresponding keys are no longer referenced outside of + * this table. In other words, this table will not prevent a key from being + * discarded by the garbage collector. Once a key has been discarded by the + * collector, the corresponding entry is no longer visible to this table; + * however, the entry may occupy space until a future table operation decides to + * reclaim it. For this reason, summary functions such as size and + * isEmpty might return a value greater than the observed number of + * entries. In order to support a high level of concurrency, stale entries are + * only reclaimed during blocking (usually mutating) operations. + *

+ * Enabling soft keys allows entries in this table to remain until their space + * is absolutely needed by the garbage collector. This is unlike weak keys which + * can be reclaimed as soon as they are no longer referenced by a normal strong + * reference. The primary use case for soft keys is a cache, which ideally + * occupies memory that is not in use for as long as possible. + *

+ * By default, values are held using a normal strong reference. This provides + * the commonly desired guarantee that a value will always have at least the + * same life-span as it's key. For this reason, care should be taken to ensure + * that a value never refers, either directly or indirectly, to its key, thereby + * preventing reclamation. If this is unavoidable, then it is recommended to use + * the same reference type in use for the key. However, it should be noted that + * non-strong values may disappear before their corresponding key. + *

+ * While this table does allow the use of both strong keys and values, it is + * recommended you use {@link java.util.concurrent.ConcurrentHashMap} for such a + * configuration, since it is optimized for that case. + *

+ * Just like {@link java.util.concurrent.ConcurrentHashMap}, this class obeys + * the same functional specification as {@link java.util.Hashtable}, and + * includes versions of methods corresponding to each method of + * Hashtable. However, even though all operations are thread-safe, + * retrieval operations do not entail locking, and there is + * not any support for locking the entire table in a way that + * prevents all access. This class is fully interoperable with + * Hashtable in programs that rely on its thread safety but not on + * its synchronization details. + *

+ *

+ * Retrieval operations (including get) generally do not block, so they + * may overlap with update operations (including put and + * remove). Retrievals reflect the results of the most recently + * completed update operations holding upon their onset. For + * aggregate operations such as putAll and clear, + * concurrent retrievals may reflect insertion or removal of only some entries. + * Similarly, Iterators and Enumerations return elements reflecting the state of + * the hash table at some point at or since the creation of the + * iterator/enumeration. They do not throw + * {@link ConcurrentModificationException}. However, iterators are designed to + * be used by only one thread at a time. + *

+ *

+ * The allowed concurrency among update operations is guided by the optional + * concurrencyLevel constructor argument (default 16), + * which is used as a hint for internal sizing. The table is internally + * partitioned to try to permit the indicated number of concurrent updates + * without contention. Because placement in hash tables is essentially random, + * the actual concurrency will vary. Ideally, you should choose a value to + * accommodate as many threads as will ever concurrently modify the table. Using + * a significantly higher value than you need can waste space and time, and a + * significantly lower value can lead to thread contention. But overestimates + * and underestimates within an order of magnitude do not usually have much + * noticeable impact. A value of one is appropriate when it is known that only + * one thread will modify and all others will only read. Also, resizing this or + * any other kind of hash table is a relatively slow operation, so, when + * possible, it is a good idea that you provide estimates of expected table sizes in + * constructors. + *

+ *

+ * This class and its views and iterators implement all of the optional + * methods of the {@link Map} and {@link Iterator} interfaces. + *

+ *

+ * Like {@link Hashtable} but unlike {@link HashMap}, this class does + * not allow null to be used as a key or value. + *

+ *

+ * This class is a member of the + * Java Collections Framework. + * + * @param the type of keys maintained by this map + * @param the type of mapped values + * @author Doug Lea + * @author Jason T. Greene + */ +@SuppressWarnings("all") +public class ConcurrentReferenceHashMap extends AbstractMap + implements java.util.concurrent.ConcurrentMap, Serializable { + private static final long serialVersionUID = 7249069246763182397L; + + /* + * The basic strategy is to subdivide the table among Segments, + * each of which itself is a concurrently readable hash table. + */ + + /** + * An option specifying which Java reference type should be used to refer + * to a key and/or value. + */ + public static enum ReferenceType { + /** + * Indicates a normal Java strong reference should be used + */ + STRONG, + /** + * Indicates a {@link WeakReference} should be used + */ + WEAK, + /** + * Indicates a {@link SoftReference} should be used + */ + SOFT + } + + ; + + + public static enum Option { + /** + * Indicates that referential-equality (== instead of .equals()) should + * be used when locating keys. This offers similar behavior to {@link IdentityHashMap} + */ + IDENTITY_COMPARISONS + } + + ; + + /* ---------------- Constants -------------- */ + + static final ReferenceType DEFAULT_KEY_TYPE = ReferenceType.WEAK; + + static final ReferenceType DEFAULT_VALUE_TYPE = ReferenceType.STRONG; + + + /** + * The default initial capacity for this table, + * used when not otherwise specified in a constructor. + */ + static final int DEFAULT_INITIAL_CAPACITY = 16; + + /** + * The default load factor for this table, used when not + * otherwise specified in a constructor. + */ + static final float DEFAULT_LOAD_FACTOR = 0.75f; + + /** + * The default concurrency level for this table, used when not + * otherwise specified in a constructor. + */ + static final int DEFAULT_CONCURRENCY_LEVEL = 16; + + /** + * The maximum capacity, used if a higher value is implicitly + * specified by either of the constructors with arguments. MUST + * be a power of two <= 1<<30 to ensure that entries are indexable + * using ints. + */ + static final int MAXIMUM_CAPACITY = 1 << 30; + + /** + * The maximum number of segments to allow; used to bound + * constructor arguments. + */ + static final int MAX_SEGMENTS = 1 << 16; // slightly conservative + + /** + * Number of unsynchronized retries in size and containsValue + * methods before resorting to locking. This is used to avoid + * unbounded retries if tables undergo continuous modification + * which would make it impossible to obtain an accurate result. + */ + static final int RETRIES_BEFORE_LOCK = 2; + + /* ---------------- Fields -------------- */ + + /** + * Mask value for indexing into segments. The upper bits of a + * key's hash code are used to choose the segment. + */ + final int segmentMask; + + /** + * Shift value for indexing within segments. + */ + final int segmentShift; + + /** + * The segments, each of which is a specialized hash table + */ + final Segment[] segments; + + boolean identityComparisons; + + transient Set keySet; + transient Set> entrySet; + transient Collection values; + + /* ---------------- Small Utilities -------------- */ + + /** + * Applies a supplemental hash function to a given hashCode, which + * defends against poor quality hash functions. This is critical + * because ConcurrentReferenceHashMap uses power-of-two length hash tables, + * that otherwise encounter collisions for hashCodes that do not + * differ in lower or upper bits. + */ + private static int hash(int h) { + // Spread bits to regularize both segment and index locations, + // using variant of single-word Wang/Jenkins hash. + h += h << 15 ^ 0xffffcd7d; + h ^= h >>> 10; + h += h << 3; + h ^= h >>> 6; + h += (h << 2) + (h << 14); + return h ^ h >>> 16; + } + + /** + * Returns the segment that should be used for key with given hash + * + * @param hash the hash code for the key + * @return the segment + */ + final Segment segmentFor(int hash) { + return this.segments[hash >>> this.segmentShift & this.segmentMask]; + } + + private int hashOf(Object key) { + return hash(this.identityComparisons ? + System.identityHashCode(key) : key.hashCode()); + } + + /* ---------------- Inner Classes -------------- */ + + static interface KeyReference { + int keyHash(); + + Object keyRef(); + } + + /** + * A weak-key reference which stores the key hash needed for reclamation. + */ + static final class WeakKeyReference extends WeakReference implements KeyReference { + final int hash; + + WeakKeyReference(K key, int hash, ReferenceQueue refQueue) { + super(key, refQueue); + this.hash = hash; + } + + @Override + public final int keyHash() { + return this.hash; + } + + @Override + public final Object keyRef() { + return this; + } + } + + /** + * A soft-key reference which stores the key hash needed for reclamation. + */ + static final class SoftKeyReference extends SoftReference implements KeyReference { + final int hash; + + SoftKeyReference(K key, int hash, ReferenceQueue refQueue) { + super(key, refQueue); + this.hash = hash; + } + + @Override + public final int keyHash() { + return this.hash; + } + + @Override + public final Object keyRef() { + return this; + } + } + + static final class WeakValueReference extends WeakReference implements KeyReference { + final Object keyRef; + final int hash; + + WeakValueReference(V value, Object keyRef, int hash, ReferenceQueue refQueue) { + super(value, refQueue); + this.keyRef = keyRef; + this.hash = hash; + } + + @Override + public final int keyHash() { + return this.hash; + } + + @Override + public final Object keyRef() { + return this.keyRef; + } + } + + static final class SoftValueReference extends SoftReference implements KeyReference { + final Object keyRef; + final int hash; + + SoftValueReference(V value, Object keyRef, int hash, ReferenceQueue refQueue) { + super(value, refQueue); + this.keyRef = keyRef; + this.hash = hash; + } + + @Override + public final int keyHash() { + return this.hash; + } + + @Override + public final Object keyRef() { + return this.keyRef; + } + } + + /** + * ConcurrentReferenceHashMap list entry. Note that this is never exported + * out as a user-visible Map.Entry. + *

+ * Because the value field is volatile, not final, it is legal wrt + * the Java Memory Model for an unsynchronized reader to see null + * instead of initial value when read via a data race. Although a + * reordering leading to this is not likely to ever actually + * occur, the Segment.readValueUnderLock method is used as a + * backup in case a null (pre-initialized) value is ever seen in + * an unsynchronized access method. + */ + static final class HashEntry { + final Object keyRef; + final int hash; + volatile Object valueRef; + final HashEntry next; + + HashEntry(K key, int hash, HashEntry next, V value, + ReferenceType keyType, ReferenceType valueType, + ReferenceQueue refQueue) { + this.hash = hash; + this.next = next; + this.keyRef = newKeyReference(key, keyType, refQueue); + this.valueRef = newValueReference(value, valueType, refQueue); + } + + final Object newKeyReference(K key, ReferenceType keyType, + ReferenceQueue refQueue) { + if (keyType == ReferenceType.WEAK) { + return new WeakKeyReference(key, this.hash, refQueue); + } + if (keyType == ReferenceType.SOFT) { + return new SoftKeyReference(key, this.hash, refQueue); + } + + return key; + } + + final Object newValueReference(V value, ReferenceType valueType, + ReferenceQueue refQueue) { + if (valueType == ReferenceType.WEAK) { + return new WeakValueReference(value, this.keyRef, this.hash, refQueue); + } + if (valueType == ReferenceType.SOFT) { + return new SoftValueReference(value, this.keyRef, this.hash, refQueue); + } + + return value; + } + + @SuppressWarnings("unchecked") + final K key() { + if (this.keyRef instanceof KeyReference) { + return ((Reference) this.keyRef).get(); + } + + return (K) this.keyRef; + } + + final V value() { + return dereferenceValue(this.valueRef); + } + + @SuppressWarnings("unchecked") + final V dereferenceValue(Object value) { + if (value instanceof KeyReference) { + return ((Reference) value).get(); + } + + return (V) value; + } + + final void setValue(V value, ReferenceType valueType, ReferenceQueue refQueue) { + this.valueRef = newValueReference(value, valueType, refQueue); + } + + @SuppressWarnings("unchecked") + static final HashEntry[] newArray(int i) { + return new HashEntry[i]; + } + } + + /** + * Segments are specialized versions of hash tables. This + * subclasses from ReentrantLock opportunistically, just to + * simplify some locking and avoid separate construction. + */ + static final class Segment extends ReentrantLock implements Serializable { + /* + * Segments maintain a table of entry lists that are ALWAYS + * kept in a consistent state, so they can be read without locking. + * Next fields of nodes are immutable (final). All list + * additions are performed at the front of each bin. This + * makes it easy to check changes, and also fast to traverse. + * When nodes would otherwise be changed, new nodes are + * created to replace them. This works well for hash tables + * since the bin lists tend to be short. (The average length + * is less than two for the default load factor threshold.) + * + * Read operations can thus proceed without locking, but rely + * on selected uses of volatiles to ensure that completed + * write operations performed by other threads are + * noticed. For most purposes, the "count" field, tracking the + * number of elements, serves as that volatile variable + * ensuring visibility. This is convenient because this field + * needs to be read in many read operations anyway: + * + * - All (unsynchronized) read operations must first read the + * "count" field, and should not look at table entries if + * it is 0. + * + * - All (synchronized) write operations should write to + * the "count" field after structurally changing any bin. + * The operations must not take any action that could even + * momentarily cause a concurrent read operation to see + * inconsistent data. This is made easier by the nature of + * the read operations in Map. For example, no operation + * can reveal that the table has grown but the threshold + * has not yet been updated, so there are no atomicity + * requirements for this with respect to reads. + * + * As a guide, all critical volatile reads and writes to the + * count field are marked in code comments. + */ + + private static final long serialVersionUID = 2249069246763182397L; + + /** + * The number of elements in this segment's region. + */ + // I have faith in Doug Lea's techical decision +// @edu.umd.cs.findbugs.annotations.SuppressWarnings("SE_TRANSIENT_FIELD_NOT_RESTORED") + transient volatile int count; + + /** + * Number of updates that alter the size of the table. This is + * used during bulk-read methods to make sure they see a + * consistent snapshot: If modCounts change during a traversal + * of segments computing size or checking containsValue, then + * we might have an inconsistent view of state so (usually) we + * must retry. + */ + // I have faith in Doug Lea's techical decision +// @edu.umd.cs.findbugs.annotations.SuppressWarnings("SE_TRANSIENT_FIELD_NOT_RESTORED") + transient int modCount; + + /** + * The table is rehashed when its size exceeds this threshold. + * (The value of this field is always (int)(capacity * + * loadFactor).) + */ + transient int threshold; + + /** + * The per-segment table. + */ + transient volatile HashEntry[] table; + + /** + * The load factor for the hash table. Even though this value + * is same for all segments, it is replicated to avoid needing + * links to outer object. + * + * @serial + */ + final float loadFactor; + + /** + * The collected weak-key reference queue for this segment. + * This should be (re)initialized whenever table is assigned, + */ + transient volatile ReferenceQueue refQueue; + + final ReferenceType keyType; + + final ReferenceType valueType; + + final boolean identityComparisons; + + Segment(int initialCapacity, float lf, ReferenceType keyType, + ReferenceType valueType, boolean identityComparisons) { + this.loadFactor = lf; + this.keyType = keyType; + this.valueType = valueType; + this.identityComparisons = identityComparisons; + setTable(HashEntry.newArray(initialCapacity)); + } + + @SuppressWarnings("unchecked") + static final Segment[] newArray(int i) { + return new Segment[i]; + } + + private boolean keyEq(Object src, Object dest) { + return this.identityComparisons ? src == dest : src.equals(dest); + } + + /** + * Sets table to new HashEntry array. + * Call only while holding lock or in constructor. + */ + void setTable(HashEntry[] newTable) { + this.threshold = (int) (newTable.length * this.loadFactor); + this.table = newTable; + this.refQueue = new ReferenceQueue(); + } + + /** + * Returns properly casted first entry of bin for given hash. + */ + HashEntry getFirst(int hash) { + HashEntry[] tab = this.table; + return tab[hash & tab.length - 1]; + } + + HashEntry newHashEntry(K key, int hash, HashEntry next, V value) { + return new HashEntry(key, hash, next, value, this.keyType, this.valueType, this.refQueue); + } + + /** + * Reads value field of an entry under lock. Called if value + * field ever appears to be null. This is possible only if a + * compiler happens to reorder a HashEntry initialization with + * its table assignment, which is legal under memory model + * but is not known to ever occur. + */ + V readValueUnderLock(HashEntry e) { + lock(); + try { + removeStale(); + return e.value(); + } finally { + unlock(); + } + } + + /* Specialized implementations of map methods */ + + V get(Object key, int hash) { + if (this.count != 0) { // read-volatile + HashEntry e = getFirst(hash); + while (e != null) { + if (e.hash == hash && keyEq(key, e.key())) { + Object opaque = e.valueRef; + if (opaque != null) { + return e.dereferenceValue(opaque); + } + + return readValueUnderLock(e); // recheck + } + e = e.next; + } + } + return null; + } + + boolean containsKey(Object key, int hash) { + if (this.count != 0) { // read-volatile + HashEntry e = getFirst(hash); + while (e != null) { + if (e.hash == hash && keyEq(key, e.key())) { + return true; + } + e = e.next; + } + } + return false; + } + + boolean containsValue(Object value) { + if (this.count != 0) { // read-volatile + HashEntry[] tab = this.table; + int len = tab.length; + for (int i = 0; i < len; i++) { + for (HashEntry e = tab[i]; e != null; e = e.next) { + Object opaque = e.valueRef; + V v; + + if (opaque == null) { + v = readValueUnderLock(e); // recheck + } else { + v = e.dereferenceValue(opaque); + } + + if (value.equals(v)) { + return true; + } + } + } + } + return false; + } + + boolean replace(K key, int hash, V oldValue, V newValue) { + lock(); + try { + removeStale(); + HashEntry e = getFirst(hash); + while (e != null && (e.hash != hash || !keyEq(key, e.key()))) { + e = e.next; + } + + boolean replaced = false; + if (e != null && oldValue.equals(e.value())) { + replaced = true; + e.setValue(newValue, this.valueType, this.refQueue); + } + return replaced; + } finally { + unlock(); + } + } + + V replace(K key, int hash, V newValue) { + lock(); + try { + removeStale(); + HashEntry e = getFirst(hash); + while (e != null && (e.hash != hash || !keyEq(key, e.key()))) { + e = e.next; + } + + V oldValue = null; + if (e != null) { + oldValue = e.value(); + e.setValue(newValue, this.valueType, this.refQueue); + } + return oldValue; + } finally { + unlock(); + } + } + + + V put(K key, int hash, V value, boolean onlyIfAbsent) { + lock(); + try { + removeStale(); + int c = this.count; + if (c++ > this.threshold) {// ensure capacity + int reduced = rehash(); + if (reduced > 0) + { + this.count = (c -= reduced) - 1; // write-volatile + } + } + + HashEntry[] tab = this.table; + int index = hash & tab.length - 1; + HashEntry first = tab[index]; + HashEntry e = first; + while (e != null && (e.hash != hash || !keyEq(key, e.key()))) { + e = e.next; + } + + V oldValue; + if (e != null) { + oldValue = e.value(); + if (!onlyIfAbsent) { + e.setValue(value, this.valueType, this.refQueue); + } + } else { + oldValue = null; + ++this.modCount; + tab[index] = newHashEntry(key, hash, first, value); + this.count = c; // write-volatile + } + return oldValue; + } finally { + unlock(); + } + } + + int rehash() { + HashEntry[] oldTable = this.table; + int oldCapacity = oldTable.length; + if (oldCapacity >= MAXIMUM_CAPACITY) { + return 0; + } + + /* + * Reclassify nodes in each list to new Map. Because we are + * using power-of-two expansion, the elements from each bin + * must either stay at the same index, or move with a power of two + * offset. We eliminate unnecessary node creation by catching + * cases where old nodes can be reused because their next + * fields won't change. Statistically, at the default + * threshold, only about one-sixth of them need cloning when + * a table doubles. The nodes they replace will be garbage + * collectable as soon as they are no longer referenced by any + * reader thread that may be in the midst of traversing table + * right now. + */ + + HashEntry[] newTable = HashEntry.newArray(oldCapacity << 1); + this.threshold = (int) (newTable.length * this.loadFactor); + int sizeMask = newTable.length - 1; + int reduce = 0; + for (int i = 0; i < oldCapacity; i++) { + // We need to guarantee that any existing reads of old Map can + // proceed. So we cannot yet null out each bin. + HashEntry e = oldTable[i]; + + if (e != null) { + HashEntry next = e.next; + int idx = e.hash & sizeMask; + + // Single node on list + if (next == null) { + newTable[idx] = e; + } else { + // Reuse trailing consecutive sequence at same slot + HashEntry lastRun = e; + int lastIdx = idx; + for (HashEntry last = next; + last != null; + last = last.next) { + int k = last.hash & sizeMask; + if (k != lastIdx) { + lastIdx = k; + lastRun = last; + } + } + newTable[lastIdx] = lastRun; + // Clone all remaining nodes + for (HashEntry p = e; p != lastRun; p = p.next) { + // Skip GC'd weak refs + K key = p.key(); + if (key == null) { + reduce++; + continue; + } + int k = p.hash & sizeMask; + HashEntry n = newTable[k]; + newTable[k] = newHashEntry(key, p.hash, n, p.value()); + } + } + } + } + this.table = newTable; + return reduce; + } + + /** + * Remove: match on key only if value is null, else match both. + */ + V remove(Object key, int hash, Object value, boolean refRemove) { + lock(); + try { + if (!refRemove) { + removeStale(); + } + int c = this.count - 1; + HashEntry[] tab = this.table; + int index = hash & tab.length - 1; + HashEntry first = tab[index]; + HashEntry e = first; + // a ref remove operation compares the Reference instance + while (e != null && key != e.keyRef + && (refRemove || hash != e.hash || !keyEq(key, e.key()))) { + e = e.next; + } + + V oldValue = null; + if (e != null) { + V v = e.value(); + if (value == null || value.equals(v)) { + oldValue = v; + // All entries following removed node can stay + // in list, but all preceding ones need to be + // cloned. + ++this.modCount; + HashEntry newFirst = e.next; + for (HashEntry p = first; p != e; p = p.next) { + K pKey = p.key(); + if (pKey == null) { // Skip GC'd keys + c--; + continue; + } + + newFirst = newHashEntry(pKey, p.hash, newFirst, p.value()); + } + tab[index] = newFirst; + this.count = c; // write-volatile + } + } + return oldValue; + } finally { + unlock(); + } + } + + final void removeStale() { + KeyReference ref; + while ((ref = (KeyReference) this.refQueue.poll()) != null) { + remove(ref.keyRef(), ref.keyHash(), null, true); + } + } + + void clear() { + if (this.count != 0) { + lock(); + try { + HashEntry[] tab = this.table; + for (int i = 0; i < tab.length; i++) { + tab[i] = null; + } + ++this.modCount; + // replace the reference queue to avoid unnecessary stale cleanups + this.refQueue = new ReferenceQueue(); + this.count = 0; // write-volatile + } finally { + unlock(); + } + } + } + } + + + + /* ---------------- Public operations -------------- */ + + /** + * Creates a new, empty map with the specified initial + * capacity, reference types, load factor, and concurrency level. + *

+ * Behavioral changing options such as {@link Option#IDENTITY_COMPARISONS} + * can also be specified. + * + * @param initialCapacity the initial capacity. The implementation + * performs internal sizing to accommodate this many elements. + * @param loadFactor the load factor threshold, used to control resizing. + * Resizing may be performed when the average number of elements per + * bin exceeds this threshold. + * @param concurrencyLevel the estimated number of concurrently + * updating threads. The implementation performs internal sizing + * to try to accommodate this many threads. + * @param keyType the reference type to use for keys + * @param valueType the reference type to use for values + * @param options the behavioral options + * @throws IllegalArgumentException if the initial capacity is + * negative or the load factor or concurrencyLevel are + * nonpositive. + */ + public ConcurrentReferenceHashMap(int initialCapacity, + float loadFactor, int concurrencyLevel, + ReferenceType keyType, ReferenceType valueType, + EnumSet

+ *

If this map contains a mapping from a key + * {@code k} to a value {@code v} such that {@code key.equals(k)}, + * then this method returns {@code v}; otherwise it returns + * {@code null}. (There can be at most one such mapping.) + * + * @throws NullPointerException if the specified key is null + */ + @Override + public V get(Object key) { + int hash = hashOf(key); + return segmentFor(hash).get(key, hash); + } + + /** + * Tests if the specified object is a key in this table. + * + * @param key possible key + * @return true if and only if the specified object + * is a key in this table, as determined by the + * equals method; false otherwise. + * @throws NullPointerException if the specified key is null + */ + @Override + public boolean containsKey(Object key) { + int hash = hashOf(key); + return segmentFor(hash).containsKey(key, hash); + } + + /** + * Returns true if this map maps one or more keys to the + * specified value. Note: This method requires a full internal + * traversal of the hash table, therefore it is much slower than the + * method containsKey. + * + * @param value value whose presence in this map is to be tested + * @return true if this map maps one or more keys to the + * specified value + * @throws NullPointerException if the specified value is null + */ + @Override + public boolean containsValue(Object value) { + if (value == null) { + throw new NullPointerException(); + } + + // See explanation of modCount use above + + final Segment[] segments = this.segments; + int[] mc = new int[segments.length]; + + // Try a few times without locking + for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) { + int sum = 0; + int mcsum = 0; + for (int i = 0; i < segments.length; ++i) { + int c = segments[i].count; + mcsum += mc[i] = segments[i].modCount; + if (segments[i].containsValue(value)) { + return true; + } + } + boolean cleanSweep = true; + if (mcsum != 0) { + for (int i = 0; i < segments.length; ++i) { + int c = segments[i].count; + if (mc[i] != segments[i].modCount) { + cleanSweep = false; + break; + } + } + } + if (cleanSweep) { + return false; + } + } + // Resort to locking all segments + for (int i = 0; i < segments.length; ++i) { + segments[i].lock(); + } + boolean found = false; + try { + for (int i = 0; i < segments.length; ++i) { + if (segments[i].containsValue(value)) { + found = true; + break; + } + } + } finally { + for (int i = 0; i < segments.length; ++i) { + segments[i].unlock(); + } + } + return found; + } + + /** + * Legacy method testing if some key maps into the specified value + * in this table. This method is identical in functionality to + * {@link #containsValue}, and exists solely to ensure + * full compatibility with class {@link java.util.Hashtable}, + * which supported this method prior to introduction of the + * Java Collections framework. + * + * @param value a value to search for + * @return true if and only if some key maps to the + * value argument in this table as + * determined by the equals method; + * false otherwise + * @throws NullPointerException if the specified value is null + */ + public boolean contains(Object value) { + return containsValue(value); + } + + /** + * Maps the specified key to the specified value in this table. + * Neither the key nor the value can be null. + *

+ *

The value can be retrieved by calling the get method + * with a key that is equal to the original key. + * + * @param key key with which the specified value is to be associated + * @param value value to be associated with the specified key + * @return the previous value associated with key, or + * null if there was no mapping for key + * @throws NullPointerException if the specified key or value is null + */ + @Override + public V put(K key, V value) { + if (value == null) { + throw new NullPointerException(); + } + int hash = hashOf(key); + return segmentFor(hash).put(key, hash, value, false); + } + + /** + * {@inheritDoc} + * + * @return the previous value associated with the specified key, + * or null if there was no mapping for the key + * @throws NullPointerException if the specified key or value is null + */ + @Override + public V putIfAbsent(K key, V value) { + if (value == null) { + throw new NullPointerException(); + } + int hash = hashOf(key); + return segmentFor(hash).put(key, hash, value, true); + } + + /** + * Copies all of the mappings from the specified map to this one. + * These mappings replace any mappings that this map had for any of the + * keys currently in the specified map. + * + * @param m mappings to be stored in this map + */ + @Override + public void putAll(Map m) { + for (Map.Entry e : m.entrySet()) { + put(e.getKey(), e.getValue()); + } + } + + /** + * Removes the key (and its corresponding value) from this map. + * This method does nothing if the key is not in the map. + * + * @param key the key that needs to be removed + * @return the previous value associated with key, or + * null if there was no mapping for key + * @throws NullPointerException if the specified key is null + */ + @Override + public V remove(Object key) { + int hash = hashOf(key); + return segmentFor(hash).remove(key, hash, null, false); + } + + /** + * {@inheritDoc} + * + * @throws NullPointerException if the specified key is null + */ + @Override + public boolean remove(Object key, Object value) { + int hash = hashOf(key); + if (value == null) { + return false; + } + return segmentFor(hash).remove(key, hash, value, false) != null; + } + + /** + * {@inheritDoc} + * + * @throws NullPointerException if any of the arguments are null + */ + @Override + public boolean replace(K key, V oldValue, V newValue) { + if (oldValue == null || newValue == null) { + throw new NullPointerException(); + } + int hash = hashOf(key); + return segmentFor(hash).replace(key, hash, oldValue, newValue); + } + + /** + * {@inheritDoc} + * + * @return the previous value associated with the specified key, + * or null if there was no mapping for the key + * @throws NullPointerException if the specified key or value is null + */ + @Override + public V replace(K key, V value) { + if (value == null) { + throw new NullPointerException(); + } + int hash = hashOf(key); + return segmentFor(hash).replace(key, hash, value); + } + + /** + * Removes all of the mappings from this map. + */ + @Override + public void clear() { + for (int i = 0; i < this.segments.length; ++i) { + this.segments[i].clear(); + } + } + + /** + * Removes any stale entries whose keys have been finalized. Use of this + * method is normally not necessary since stale entries are automatically + * removed lazily, when blocking operations are required. However, there + * are some cases where this operation should be performed eagerly, such + * as cleaning up old references to a ClassLoader in a multi-classloader + * environment. + *

+ * Note: this method will acquire locks one at a time across all segments + * of this table, so this method should be used sparingly. + */ + public void purgeStaleEntries() { + for (int i = 0; i < this.segments.length; ++i) { + this.segments[i].removeStale(); + } + } + + + /** + * Returns a {@link Set} view of the keys contained in this map. + * The set is backed by the map, so changes to the map are + * reflected in the set, and vice-versa. The set supports element + * removal, which removes the corresponding mapping from this map, + * via the Iterator.remove, Set.remove, + * removeAll, retainAll, and clear + * operations. It does not support the add or + * addAll operations. + *

+ *

The view's iterator is a "weakly consistent" iterator + * that will never throw {@link ConcurrentModificationException}, + * and guarantees to traverse elements as they existed upon + * construction of the iterator, and may (but is not guaranteed to) + * reflect any modifications subsequent to construction. + */ + @Override + public Set keySet() { + Set ks = this.keySet; + return ks != null ? ks : (this.keySet = new KeySet()); + } + + /** + * Returns a {@link Collection} view of the values contained in this map. + * The collection is backed by the map, so changes to the map are + * reflected in the collection, and vice-versa. The collection + * supports element removal, which removes the corresponding + * mapping from this map, via the Iterator.remove, + * Collection.remove, removeAll, + * retainAll, and clear operations. It does not + * support the add or addAll operations. + *

+ *

The view's iterator is a "weakly consistent" iterator + * that will never throw {@link ConcurrentModificationException}, + * and guarantees to traverse elements as they existed upon + * construction of the iterator, and may (but is not guaranteed to) + * reflect any modifications subsequent to construction. + */ + @Override + public Collection values() { + Collection vs = this.values; + return vs != null ? vs : (this.values = new Values()); + } + + /** + * Returns a {@link Set} view of the mappings contained in this map. + * The set is backed by the map, so changes to the map are + * reflected in the set, and vice-versa. The set supports element + * removal, which removes the corresponding mapping from the map, + * via the Iterator.remove, Set.remove, + * removeAll, retainAll, and clear + * operations. It does not support the add or + * addAll operations. + *

+ *

The view's iterator is a "weakly consistent" iterator + * that will never throw {@link ConcurrentModificationException}, + * and is guaranteed to traverse elements as they existed upon + * construction of the iterator, and may (but is not guaranteed to) + * reflect any modifications subsequent to construction. + */ + @Override + public Set> entrySet() { + Set> es = this.entrySet; + return es != null ? es : (this.entrySet = new EntrySet()); + } + + /** + * Returns an enumeration of the keys in this table. + * + * @return an enumeration of the keys in this table + * @see #keySet() + */ + public Enumeration keys() { + return new KeyIterator(); + } + + /** + * Returns an enumeration of the values in this table. + * + * @return an enumeration of the values in this table + * @see #values() + */ + public Enumeration elements() { + return new ValueIterator(); + } + + /* ---------------- Iterator Support -------------- */ + + abstract class HashIterator { + int nextSegmentIndex; + int nextTableIndex; + HashEntry[] currentTable; + HashEntry nextEntry; + HashEntry lastReturned; + K currentKey; // Strong reference to weak key (prevents gc) + + HashIterator() { + this.nextSegmentIndex = ConcurrentReferenceHashMap.this.segments.length - 1; + this.nextTableIndex = -1; + advance(); + } + + public boolean hasMoreElements() { + return hasNext(); + } + + final void advance() { + if (this.nextEntry != null && (this.nextEntry = this.nextEntry.next) != null) { + return; + } + + while (this.nextTableIndex >= 0) { + if ((this.nextEntry = this.currentTable[this.nextTableIndex--]) != null) { + return; + } + } + + while (this.nextSegmentIndex >= 0) { + Segment seg = ConcurrentReferenceHashMap.this.segments[this.nextSegmentIndex--]; + if (seg.count != 0) { + this.currentTable = seg.table; + for (int j = this.currentTable.length - 1; j >= 0; --j) { + if ((this.nextEntry = this.currentTable[j]) != null) { + this.nextTableIndex = j - 1; + return; + } + } + } + } + } + + public boolean hasNext() { + while (this.nextEntry != null) { + if (this.nextEntry.key() != null) { + return true; + } + advance(); + } + + return false; + } + + HashEntry nextEntry() { + do { + if (this.nextEntry == null) { + throw new NoSuchElementException(); + } + + this.lastReturned = this.nextEntry; + this.currentKey = this.lastReturned.key(); + advance(); + } while (this.currentKey == null); // Skip GC'd keys + + return this.lastReturned; + } + + public void remove() { + if (this.lastReturned == null) { + throw new IllegalStateException(); + } + ConcurrentReferenceHashMap.this.remove(this.currentKey); + this.lastReturned = null; + } + } + + final class KeyIterator + extends HashIterator + implements Iterator, Enumeration { + @Override + public K next() { + return super.nextEntry().key(); + } + + @Override + public K nextElement() { + return super.nextEntry().key(); + } + } + + final class ValueIterator + extends HashIterator + implements Iterator, Enumeration { + @Override + public V next() { + return super.nextEntry().value(); + } + + @Override + public V nextElement() { + return super.nextEntry().value(); + } + } + + /* + * This class is needed for JDK5 compatibility. + */ + protected static class SimpleEntry implements Entry, + java.io.Serializable { + private static final long serialVersionUID = -8499721149061103585L; + + protected final K key; + protected V value; + + public SimpleEntry(K key, V value) { + this.key = key; + this.value = value; + } + + public SimpleEntry(Entry entry) { + this.key = entry.getKey(); + this.value = entry.getValue(); + } + + @Override + public K getKey() { + return this.key; + } + + @Override + public V getValue() { + return this.value; + } + + @Override + public V setValue(V value) { + V oldValue = this.value; + this.value = value; + return oldValue; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof Map.Entry)) { + return false; + } + @SuppressWarnings("unchecked") + Map.Entry e = (Map.Entry) o; + return eq(this.key, e.getKey()) && eq(this.value, e.getValue()); + } + + @Override + public int hashCode() { + return (this.key == null ? 0 : this.key.hashCode()) + ^ (this.value == null ? 0 : this.value.hashCode()); + } + + @Override + public String toString() { + return this.key + "=" + this.value; + } + + private static boolean eq(Object o1, Object o2) { + return o1 == null ? o2 == null : o1.equals(o2); + } + } + + + /** + * Custom Entry class used by EntryIterator.next(), that relays setValue + * changes to the underlying map. + */ + protected class WriteThroughEntry extends SimpleEntry { + private static final long serialVersionUID = -7900634345345313646L; + + protected WriteThroughEntry(K k, V v) { + super(k, v); + } + + /** + * Set our entry's value and writes it through to the map. The + * value to return is somewhat arbitrary: since a + * WriteThroughEntry does not necessarily track asynchronous + * changes, the most recent "previous" value could be + * different from what we return (or could even have been + * removed in which case the put will re-establish). We do not + * and cannot guarantee more. + */ + @Override + public V setValue(V value) { + if (value == null) { + throw new NullPointerException(); + } + V v = super.setValue(value); + ConcurrentReferenceHashMap.this.put(getKey(), value); + return v; + } + } + + final class EntryIterator + extends HashIterator + implements Iterator> { + @Override + public Map.Entry next() { + HashEntry e = super.nextEntry(); + return new WriteThroughEntry(e.key(), e.value()); + } + } + + final class KeySet extends AbstractSet { + @Override + public Iterator iterator() { + return new KeyIterator(); + } + + @Override + public int size() { + return ConcurrentReferenceHashMap.this.size(); + } + + @Override + public boolean isEmpty() { + return ConcurrentReferenceHashMap.this.isEmpty(); + } + + @Override + public boolean contains(Object o) { + return ConcurrentReferenceHashMap.this.containsKey(o); + } + + @Override + public boolean remove(Object o) { + return ConcurrentReferenceHashMap.this.remove(o) != null; + } + + @Override + public void clear() { + ConcurrentReferenceHashMap.this.clear(); + } + } + + final class Values extends AbstractCollection { + @Override + public Iterator iterator() { + return new ValueIterator(); + } + + @Override + public int size() { + return ConcurrentReferenceHashMap.this.size(); + } + + @Override + public boolean isEmpty() { + return ConcurrentReferenceHashMap.this.isEmpty(); + } + + @Override + public boolean contains(Object o) { + return ConcurrentReferenceHashMap.this.containsValue(o); + } + + @Override + public void clear() { + ConcurrentReferenceHashMap.this.clear(); + } + } + + final class EntrySet extends AbstractSet> { + @Override + public Iterator> iterator() { + return new EntryIterator(); + } + + @Override + public boolean contains(Object o) { + if (!(o instanceof Map.Entry)) { + return false; + } + Map.Entry e = (Map.Entry) o; + V v = ConcurrentReferenceHashMap.this.get(e.getKey()); + return v != null && v.equals(e.getValue()); + } + + @Override + public boolean remove(Object o) { + if (!(o instanceof Map.Entry)) { + return false; + } + Map.Entry e = (Map.Entry) o; + return ConcurrentReferenceHashMap.this.remove(e.getKey(), e.getValue()); + } + + @Override + public int size() { + return ConcurrentReferenceHashMap.this.size(); + } + + @Override + public boolean isEmpty() { + return ConcurrentReferenceHashMap.this.isEmpty(); + } + + @Override + public void clear() { + ConcurrentReferenceHashMap.this.clear(); + } + } + + /* ---------------- Serialization Support -------------- */ + + /** + * Save the state of the ConcurrentReferenceHashMap instance to a + * stream (i.e., serialize it). + * + * @param s the stream + * @serialData the key (Object) and value (Object) + * for each key-value mapping, followed by a null pair. + * The key-value mappings are emitted in no particular order. + */ + private void writeObject(java.io.ObjectOutputStream s) throws IOException { + s.defaultWriteObject(); + + for (int k = 0; k < this.segments.length; ++k) { + Segment seg = this.segments[k]; + seg.lock(); + try { + HashEntry[] tab = seg.table; + for (int i = 0; i < tab.length; ++i) { + for (HashEntry e = tab[i]; e != null; e = e.next) { + K key = e.key(); + if (key == null) { + continue; + } + + s.writeObject(key); + s.writeObject(e.value()); + } + } + } finally { + seg.unlock(); + } + } + s.writeObject(null); + s.writeObject(null); + } + + /** + * Reconstitute the ConcurrentReferenceHashMap instance from a + * stream (i.e., deserialize it). + * + * @param s the stream + */ + @SuppressWarnings("unchecked") + private void readObject(java.io.ObjectInputStream s) + throws IOException, ClassNotFoundException { + s.defaultReadObject(); + + // Initialize each segment to be minimally sized, and let grow. + for (int i = 0; i < this.segments.length; ++i) { + this.segments[i].setTable(new HashEntry[1]); + } + + // Read the keys and values, and put the mappings in the table + for (; ; ) { + K key = (K) s.readObject(); + V value = (V) s.readObject(); + if (key == null) { + break; + } + put(key, value); + } + } +} \ No newline at end of file diff --git a/src/main/java/net/engio/mbassy/multi/common/IdentityObjectTree.java b/src/main/java/net/engio/mbassy/multi/common/IdentityObjectTree.java index ae16c8e..e788053 100644 --- a/src/main/java/net/engio/mbassy/multi/common/IdentityObjectTree.java +++ b/src/main/java/net/engio/mbassy/multi/common/IdentityObjectTree.java @@ -1,7 +1,7 @@ package net.engio.mbassy.multi.common; -import java.util.IdentityHashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /** @@ -228,7 +228,7 @@ public class IdentityObjectTree { IdentityObjectTree objectTree; if (this.children == null) { - this.children = new IdentityHashMap>(2); + this.children = new ConcurrentHashMap>(4, .9f, 1); // might as well add too objectTree = new IdentityObjectTree(); diff --git a/src/main/java/net/engio/mbassy/multi/common/StrongConcurrentSet.java b/src/main/java/net/engio/mbassy/multi/common/StrongConcurrentSet.java index 26c2a46..1f0a31b 100644 --- a/src/main/java/net/engio/mbassy/multi/common/StrongConcurrentSet.java +++ b/src/main/java/net/engio/mbassy/multi/common/StrongConcurrentSet.java @@ -13,10 +13,10 @@ public class StrongConcurrentSet extends AbstractConcurrentSet { public StrongConcurrentSet() { - this(16); + this(16, .75f); } - public StrongConcurrentSet(int size) { + public StrongConcurrentSet(int size, float lOAD_FACTOR) { super(new IdentityHashMap>(size)); } diff --git a/src/main/java/net/engio/mbassy/multi/common/SubscriptionPoolable.java b/src/main/java/net/engio/mbassy/multi/common/SubscriptionPoolable.java deleted file mode 100644 index 59e840c..0000000 --- a/src/main/java/net/engio/mbassy/multi/common/SubscriptionPoolable.java +++ /dev/null @@ -1,14 +0,0 @@ -package net.engio.mbassy.multi.common; - -import java.util.ArrayDeque; -import java.util.Collection; - -import net.engio.mbassy.multi.subscription.Subscription; -import dorkbox.util.objectPool.PoolableObject; - -public class SubscriptionPoolable implements PoolableObject> { - @Override - public Collection create() { - return new ArrayDeque(64); - } -} diff --git a/src/main/java/net/engio/mbassy/multi/listener/MessageListener.java b/src/main/java/net/engio/mbassy/multi/listener/MessageListener.java index d5df961..d29b5c0 100644 --- a/src/main/java/net/engio/mbassy/multi/listener/MessageListener.java +++ b/src/main/java/net/engio/mbassy/multi/listener/MessageListener.java @@ -1,8 +1,7 @@ package net.engio.mbassy.multi.listener; -import java.util.ArrayList; +import java.util.ArrayDeque; import java.util.Collection; -import java.util.List; /** * All instances of any class that defines at least one message handler (see @MessageHandler) are message listeners. Thus, @@ -21,7 +20,7 @@ import java.util.List; */ public class MessageListener { - private List handlers = new ArrayList(); + private Collection handlers = new ArrayDeque(); private Class listenerDefinition; public MessageListener(Class listenerDefinition) { @@ -41,7 +40,7 @@ public class MessageListener { return this.handlers.add(messageHandler); } - public List getHandlers() { + public Collection getHandlers() { return this.handlers; } diff --git a/src/main/java/net/engio/mbassy/multi/subscription/SubscriptionManager.java b/src/main/java/net/engio/mbassy/multi/subscription/SubscriptionManager.java index 3728da2..d04bcd3 100644 --- a/src/main/java/net/engio/mbassy/multi/subscription/SubscriptionManager.java +++ b/src/main/java/net/engio/mbassy/multi/subscription/SubscriptionManager.java @@ -3,13 +3,11 @@ package net.engio.mbassy.multi.subscription; import java.lang.reflect.Array; import java.util.ArrayDeque; import java.util.Collection; -import java.util.IdentityHashMap; import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.Lock; import net.engio.mbassy.multi.common.IdentityObjectTree; import net.engio.mbassy.multi.common.ReflectionUtils; @@ -34,6 +32,8 @@ import com.googlecode.concurentlocks.ReentrantReadWriteUpdateLock; public class SubscriptionManager { private static final int DEFAULT_SUPER_CLASS_TREE_SIZE = 4; + private final int MAP_STRIPING; + private float LOAD_FACTOR; // the metadata reader that is used to inspect objects passed to the subscribe method private final MetadataReader metadataReader = new MetadataReader(); @@ -41,32 +41,58 @@ public class SubscriptionManager { // all subscriptions per message type // this is the primary list for dispatching a specific message // write access is synchronized and happens only when a listener of a specific class is registered the first time - private final Map, Collection> subscriptionsPerMessageSingle = new IdentityHashMap, Collection>(50); - private final IdentityObjectTree, Collection> subscriptionsPerMessageMulti = new IdentityObjectTree, Collection>(); + private final Map, Collection> subscriptionsPerMessageSingle; + private final IdentityObjectTree, Collection> subscriptionsPerMessageMulti; // all subscriptions per messageHandler type // this map provides fast access for subscribing and unsubscribing // write access is synchronized and happens very infrequently // once a collection of subscriptions is stored it does not change - private final Map, Collection> subscriptionsPerListener = new IdentityHashMap, Collection>(50); + private final Map, Collection> subscriptionsPerListener; private final Object holder = new Object[0]; // remember classes that can have VarArg casting performed - private final ConcurrentHashMap, Class> varArgClasses = new ConcurrentHashMap, Class>(); + private final Map, Class> varArgClasses; - private final Map, ArrayDeque>> superClassesCache = new IdentityHashMap, ArrayDeque>>(); -// private final Map, Collection> superClassSubscriptionsPerMessageSingle = new IdentityHashMap, Collection>(50); + private final Map, Collection>> superClassesCache; + // superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically. + // it's a hit on SUB/UNSUB, but REALLY improves performance on handlers + // it's faster to create a new one for SUB/UNSUB than it is to clear() on the original one + private volatile Map, Collection> superClassSubscriptions; +// private final IdentityObjectTree, Collection> superClassSubscriptionsMulti = new IdentityObjectTree, Collection>(); // remember already processed classes that do not contain any message handlers - private final ConcurrentHashMap, Object> nonListeners = new ConcurrentHashMap, Object>(); + private final Map, Object> nonListeners; // synchronize read/write acces to the subscription maps private final ReentrantReadWriteUpdateLock LOCK = new ReentrantReadWriteUpdateLock(); - public SubscriptionManager() { + public SubscriptionManager(int numberOfThreads) { + this.MAP_STRIPING = 1; + this.LOAD_FACTOR = 0.8f; + +// this.subscriptionsPerMessageSingle = new IdentityHashMap, Collection>(4); +// this.subscriptionsPerMessageMulti = new IdentityObjectTree, Collection>(); +// +// // only used during SUB/UNSUB +// this.subscriptionsPerListener = new IdentityHashMap, Collection>(4); +// +// this.varArgClasses = new IdentityHashMap, Class>(8); +// this.superClassesCache = new IdentityHashMap, Collection>>(8); + + this.subscriptionsPerMessageSingle = new ConcurrentHashMap, Collection>(4, this.LOAD_FACTOR, this.MAP_STRIPING); + this.subscriptionsPerMessageMulti = new IdentityObjectTree, Collection>(); + + // only used during SUB/UNSUB + this.subscriptionsPerListener = new ConcurrentHashMap, Collection>(4, this.LOAD_FACTOR, 1); + + this.varArgClasses = new ConcurrentHashMap, Class>(8, this.LOAD_FACTOR, this.MAP_STRIPING); + this.superClassesCache = new ConcurrentHashMap, Collection>>(8, this.LOAD_FACTOR, this.MAP_STRIPING); + + this.nonListeners = new ConcurrentHashMap, Object>(4, this.LOAD_FACTOR, this.MAP_STRIPING); } public void unsubscribe(Object listener) { @@ -77,8 +103,9 @@ public class SubscriptionManager { Class listenerClass = listener.getClass(); Collection subscriptions; boolean nothingLeft = true; + Lock UPDATE = this.LOCK.updateLock(); try { - this.LOCK.updateLock().lock(); + UPDATE.lock(); subscriptions = this.subscriptionsPerListener.get(listenerClass); @@ -87,6 +114,7 @@ public class SubscriptionManager { subscription.unsubscribe(listener); boolean isEmpty = subscription.isEmpty(); + if (isEmpty) { // single or multi? Class[] handledMessageTypes = subscription.getHandledMessageTypes(); @@ -103,17 +131,12 @@ public class SubscriptionManager { if (subs.isEmpty()) { // remove element this.subscriptionsPerMessageSingle.remove(clazz); + + // superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically. + // it's a hit on SUB/UNSUB, but improves performance on handlers + this.superClassSubscriptions = new ConcurrentHashMap, Collection>(8, this.LOAD_FACTOR, this.MAP_STRIPING); } } -// Collection superSubs = this.superClassSubscriptionsPerMessageSingle.get(clazz); -// if (superSubs != null) { -// superSubs.remove(subscription); -// -// if (superSubs.isEmpty()) { -// // remove element -// this.superClassSubscriptionsPerMessageSingle.remove(clazz); -// } -// } } else { // NOTE: Not thread-safe! must be synchronized in outer scope IdentityObjectTree, Collection> tree; @@ -130,14 +153,12 @@ public class SubscriptionManager { subs.remove(subscription); if (subs.isEmpty()) { - this.LOCK.writeLock().lock(); // remove tree element switch (size) { case 2: this.subscriptionsPerMessageMulti.remove(handledMessageTypes[0], handledMessageTypes[1]); break; case 3: this.subscriptionsPerMessageMulti.remove(handledMessageTypes[1], handledMessageTypes[1], handledMessageTypes[2]); break; default: this.subscriptionsPerMessageMulti.remove(handledMessageTypes); break; } - this.LOCK.writeLock().unlock(); } } } @@ -149,13 +170,14 @@ public class SubscriptionManager { } if (nothingLeft) { - this.LOCK.writeLock().lock(); + Lock WRITE = this.LOCK.writeLock(); + WRITE.lock(); this.subscriptionsPerListener.remove(listenerClass); - this.LOCK.writeLock().unlock(); + WRITE.unlock(); } } finally { - this.LOCK.updateLock().unlock(); + UPDATE.unlock(); } return; @@ -172,32 +194,36 @@ public class SubscriptionManager { } Collection subscriptions; + Lock UPDATE = this.LOCK.updateLock(); try { - this.LOCK.updateLock().lock(); + UPDATE.lock(); subscriptions = this.subscriptionsPerListener.get(listenerClass); + // superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically. + // it's a hit on SUB/UNSUB, but improves performance on handlers + this.superClassSubscriptions = new ConcurrentHashMap, Collection>(8, this.LOAD_FACTOR, this.MAP_STRIPING); + if (subscriptions != null) { // subscriptions already exist and must only be updated for (Subscription subscription : subscriptions) { subscription.subscribe(listener); } } else { - // a listener is subscribed for the first time + Lock WRITE = this.LOCK.writeLock(); try { - this.LOCK.writeLock().lock(); // upgrade updatelock to write lock, Avoid DCL + WRITE.lock(); // upgrade updatelock to write lock, Avoid DCL // a listener is subscribed for the first time - List messageHandlers = this.metadataReader.getMessageListener(listenerClass).getHandlers(); + Collection messageHandlers = this.metadataReader.getMessageListener(listenerClass).getHandlers(); if (messageHandlers.isEmpty()) { // remember the class as non listening class if no handlers are found this.nonListeners.put(listenerClass, this.holder); return; } - // it's SAFE to use non-concurrent collection here (read only). Same thread LOCKS on this with a write lock - subscriptions = new StrongConcurrentSet(messageHandlers.size()); + subscriptions = new StrongConcurrentSet(8, this.LOAD_FACTOR); - // create subscriptions for all detected message handlers + // create NEW subscriptions for all detected message handlers for (MessageHandler messageHandler : messageHandlers) { // create the subscription Subscription subscription = new Subscription(messageHandler); @@ -206,40 +232,32 @@ public class SubscriptionManager { // single or multi? Class[] handledMessageTypes = subscription.getHandledMessageTypes(); int size = handledMessageTypes.length; + boolean acceptsSubtypes = subscription.acceptsSubtypes(); + if (size == 1) { // single Class clazz = handledMessageTypes[0]; Collection subs = this.subscriptionsPerMessageSingle.get(clazz); -// Collection superSubs = this.superClassSubscriptionsPerMessageSingle.get(clazz); if (subs == null) { // NOTE: Order is important for safe publication - subs = new StrongConcurrentSet(2); + subs = new StrongConcurrentSet(8, this.LOAD_FACTOR); subs.add(subscription); this.subscriptionsPerMessageSingle.put(clazz, subs); -// if (subscription.acceptsSubtypes()) { -// superSubs = new StrongConcurrentSet(2); -// superSubs.add(subscription); -// this.superClassSubscriptionsPerMessageSingle.put(clazz, superSubs); -// } } else { subs.add(subscription); - -// if (subscription.acceptsSubtypes()) { -// superSubs.add(subscription); -// } } // have to save our the VarArg class types, because creating var-arg arrays for objects is expensive if (subscription.isVarArg()) { Class componentType = clazz.getComponentType(); - this.varArgClasses.putIfAbsent(componentType, clazz); + this.varArgClasses.put(componentType, clazz); // since it's vararg, this means that it's an ARRAY, so we ALSO // have to add the component classes of the array - if (subscription.acceptsSubtypes()) { - ArrayDeque> superClasses = setupSuperClassCache(componentType); + if (acceptsSubtypes) { + Collection> superClasses = setupSuperClassCache(componentType); // have to setup each vararg chain for (Class superClass : superClasses) { @@ -250,7 +268,7 @@ public class SubscriptionManager { } } } - } else if (subscription.acceptsSubtypes()) { + } else if (acceptsSubtypes) { setupSuperClassCache(clazz); } } @@ -261,7 +279,7 @@ public class SubscriptionManager { switch (size) { case 2: { tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes[0], handledMessageTypes[1]); - if (subscription.acceptsSubtypes()) { + if (acceptsSubtypes) { setupSuperClassCache(handledMessageTypes[0]); setupSuperClassCache(handledMessageTypes[1]); } @@ -269,7 +287,7 @@ public class SubscriptionManager { } case 3: { tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes[0], handledMessageTypes[1], handledMessageTypes[2]); - if (subscription.acceptsSubtypes()) { + if (acceptsSubtypes) { setupSuperClassCache(handledMessageTypes[0]); setupSuperClassCache(handledMessageTypes[1]); setupSuperClassCache(handledMessageTypes[2]); @@ -278,7 +296,7 @@ public class SubscriptionManager { } default: { tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes); - if (subscription.acceptsSubtypes()) { + if (acceptsSubtypes) { for (Class c : handledMessageTypes) { setupSuperClassCache(c); } @@ -289,7 +307,7 @@ public class SubscriptionManager { Collection subs = tree.getValue(); if (subs == null) { - subs = new LinkedList(); + subs = new StrongConcurrentSet(16, this.LOAD_FACTOR); tree.putValue(subs); } subs.add(subscription); @@ -300,11 +318,11 @@ public class SubscriptionManager { this.subscriptionsPerListener.put(listenerClass, subscriptions); } finally { - this.LOCK.writeLock().unlock(); + WRITE.unlock(); } } } finally { - this.LOCK.updateLock().unlock(); + UPDATE.unlock(); } } @@ -337,22 +355,34 @@ public class SubscriptionManager { // must be protected by read lock // ALSO checks to see if the superClass accepts subtypes. public Collection getSuperSubscriptions(Class superType) { - Collection> types = this.superClassesCache.get(superType); - if (types == null || types.isEmpty()) { + Map, Collection> superClassSubs = this.superClassSubscriptions; + if (superClassSubs == null) { + // we haven't created it yet return null; } - Collection subsPerType = new ArrayDeque(DEFAULT_SUPER_CLASS_TREE_SIZE); + Collection subsPerType = superClassSubs.get(superType); - for (Class superClass : types) { - Collection subs = this.subscriptionsPerMessageSingle.get(superClass); - if (subs != null) { - for (Subscription sub : subs) { - if (sub.acceptsSubtypes()) { - subsPerType.add(sub); + if (subsPerType == null) { + Collection> types = this.superClassesCache.get(superType); + if (types == null || types.isEmpty()) { + return null; + } + + subsPerType = new StrongConcurrentSet(16, this.LOAD_FACTOR); + + for (Class superClass : types) { + Collection subs = this.subscriptionsPerMessageSingle.get(superClass); + if (subs != null) { + for (Subscription sub : subs) { + if (sub.acceptsSubtypes()) { + subsPerType.add(sub); + } } } } + + superClassSubs.put(superType, subsPerType); } return subsPerType; @@ -361,6 +391,9 @@ public class SubscriptionManager { // must be protected by read lock // ALSO checks to see if the superClass accepts subtypes. public Collection getSuperSubscriptions(Class superType1, Class superType2) { +// Collection subsPerType2 = this.superClassSubscriptions.get(); + + // not thread safe. DO NOT MODIFY Collection> types1 = this.superClassesCache.get(superType1); Collection> types2 = this.superClassesCache.get(superType2); @@ -475,42 +508,21 @@ public class SubscriptionManager { return subsPerType; } - // not a thread safe collection, but it doesn't matter - private ArrayDeque> setupSuperClassCache(Class clazz) { - ArrayDeque> types = this.superClassesCache.get(clazz); + private Collection> setupSuperClassCache(Class clazz) { + Collection> types = this.superClassesCache.get(clazz); if (types == null) { // it doesn't matter if concurrent access stomps on values, since they are always the same. Set> superTypes = ReflectionUtils.getSuperTypes(clazz); types = new ArrayDeque>(superTypes); - // NOTE: no need to write lock, since race conditions will result in duplicate answers (which we don't care about) + + // race conditions will result in duplicate answers, which we don't care about this.superClassesCache.put(clazz, types); } return types; } - - - /////////////// - // a var-arg handler might match - /////////////// - private void addVarArgClass(Collection subscriptions, Class messageType) { - // tricky part. We have to check the ARRAY version - Collection subs; - - Class varArgClass = this.varArgClasses.get(messageType); - if (varArgClass != null) { - // also add all subscriptions that match super types - subs = this.subscriptionsPerMessageSingle.get(varArgClass); - if (subs != null) { - for (Subscription sub : subs) { - subscriptions.add(sub); - } - } - } - } - // must be protected by read lock public Collection getVarArgs(Class clazz) { Class varArgClass = this.varArgClasses.get(clazz); @@ -554,81 +566,6 @@ public class SubscriptionManager { return null; } - - - - - - - /////////////// - // a var-arg handler might match - // tricky part. We have to check the ARRAY version - /////////////// - private void addVarArgClasses(Collection subscriptions, Class messageType, ArrayDeque> types1) { - Collection subs; - - Class varArgClass = this.varArgClasses.get(messageType); - if (varArgClass != null) { - // also add all subscriptions that match super types - subs = this.subscriptionsPerMessageSingle.get(varArgClass); - if (subs != null) { - for (Subscription sub : subs) { - subscriptions.add(sub); - } - } - } - - for (Class eventSuperType : types1) { - varArgClass = this.varArgClasses.get(eventSuperType); - if (varArgClass != null) { - // also add all subscriptions that match super types - subs = this.subscriptionsPerMessageSingle.get(varArgClass); - if (subs != null) { - for (Subscription sub : subs) { - subscriptions.add(sub); - } - } - } - } - } - - private void getSubsVarArg(Collection subscriptions, int length, int index, - IdentityObjectTree, Collection> tree, Class[] messageTypes) { - - Class classType = messageTypes[index]; - // get all the super types, if there are any. - ArrayDeque> superClasses = setupSuperClassCache(classType); - - IdentityObjectTree, Collection> leaf; - Collection subs; - - Class superClass = classType; - int i; - int newIndex; - -// for (i = -1; i < superClasses.size(); i++) { -// if (i > -1) { -// superClass = superClasses.get(i); -// } -// leaf = tree.getLeaf(superClass); -// if (leaf != null) { -// newIndex = index+1; -// if (index == length) { -// subs = leaf.getValue(); -// if (subs != null) { -// for (Subscription sub : subs) { -// if (sub.handlesMessageType(messageTypes)) { -// subscriptions.add(sub); -// } -// } -// } -// } else { -// getSubsVarArg(subscriptions, length, newIndex, leaf, messageTypes); -// } -// } -// } - } - public void readLock() { this.LOCK.readLock().lock(); } diff --git a/src/test/java/net/engio/mbassy/multi/SubscriptionManagerTest.java b/src/test/java/net/engio/mbassy/multi/SubscriptionManagerTest.java index 0399178..7bf6939 100644 --- a/src/test/java/net/engio/mbassy/multi/SubscriptionManagerTest.java +++ b/src/test/java/net/engio/mbassy/multi/SubscriptionManagerTest.java @@ -162,7 +162,7 @@ public class SubscriptionManagerTest extends AssertSupport { Overloading.ListenerBase.class, Overloading.ListenerSub.class); - SubscriptionManager subscriptionManager = new SubscriptionManager(); + SubscriptionManager subscriptionManager = new SubscriptionManager(ConcurrentUnits); ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), ConcurrentUnits); SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners) @@ -181,7 +181,7 @@ public class SubscriptionManagerTest extends AssertSupport { } private void runTestWith(final ListenerFactory listeners, final SubscriptionValidator validator){ - final SubscriptionManager subscriptionManager = new SubscriptionManager(); + final SubscriptionManager subscriptionManager = new SubscriptionManager(ConcurrentUnits); ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), ConcurrentUnits);