diff --git a/src/main/java/dorkbox/util/messagebus/common/HashMapTree.java b/src/main/java/dorkbox/util/messagebus/common/HashMapTree.java index 84bdc73..402f49a 100644 --- a/src/main/java/dorkbox/util/messagebus/common/HashMapTree.java +++ b/src/main/java/dorkbox/util/messagebus/common/HashMapTree.java @@ -1,11 +1,8 @@ package dorkbox.util.messagebus.common; -import com.googlecode.concurentlocks.ReentrantReadWriteUpdateLock; - +import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.locks.Lock; /** @@ -17,14 +14,13 @@ import java.util.concurrent.locks.Lock; * Date: 2/2/15 */ public class HashMapTree { - private final ReentrantReadWriteUpdateLock lock = new ReentrantReadWriteUpdateLock(); + private Map> children; + private volatile VALUE value; - private ConcurrentMap> children; // protected by read/write lock - private volatile VALUE value; // protected by read/write lock private final int defaultSize; private final float loadFactor; - public HashMapTree(int defaultSize, float loadFactor) { + public HashMapTree(final int defaultSize, final float loadFactor) { this.defaultSize = defaultSize; this.loadFactor = loadFactor; } @@ -33,30 +29,26 @@ public class HashMapTree { /** * can be overridden to provide a custom backing map */ - protected ConcurrentMap> createChildren(int defaultSize, float loadFactor) { + protected Map> createChildren(int defaultSize, float loadFactor) { return new ConcurrentHashMapV8>(defaultSize, loadFactor, 1); } - public VALUE getValue() { - VALUE returnValue = this.value; - return returnValue; + public final VALUE getValue() { + return this.value; } - public void putValue(VALUE value) { + public final void putValue(VALUE value) { this.value = value; } - public void removeValue() { + public final void removeValue() { this.value = null; } - public void clear() { - Lock WRITE = this.lock.writeLock(); - WRITE.lock(); // upgrade to the write lock, at this point blocks other readers - + public final void clear() { if (this.children != null) { Set>> entrySet = this.children.entrySet(); for (Entry> entry : entrySet) { @@ -66,8 +58,6 @@ public class HashMapTree { this.children.clear(); this.value = null; } - - WRITE.unlock(); } @@ -76,7 +66,7 @@ public class HashMapTree { *

* Removes a branch from the tree, and cleans up, if necessary */ - public void remove(KEY key) { + public final void remove(KEY key) { if (key != null) { removeLeaf(key); } @@ -88,35 +78,24 @@ public class HashMapTree { *

* Removes a branch from the tree, and cleans up, if necessary */ - public void remove(KEY key1, KEY key2) { + public final void remove(KEY key1, KEY key2) { if (key1 == null || key2 == null) { return; } - Lock UPDATE = this.lock.updateLock(); - UPDATE.lock(); // allows other readers, blocks others from acquiring update or write locks - - HashMapTree leaf = null; + HashMapTree leaf; if (this.children != null) { leaf = this.children.get(key1); if (leaf != null) { - // promote to writelock and try again - Concurrency in Practice,16.4.2, last sentence on page. Careful for stale state - Lock WRITE = this.lock.writeLock(); - WRITE.lock(); // upgrade to the write lock, at this point blocks other readers - leaf.removeLeaf(key2); this.children.remove(key1); if (this.children.isEmpty()) { this.children = null; } - - WRITE.unlock(); } } - - UPDATE.unlock(); } /** @@ -124,35 +103,24 @@ public class HashMapTree { *

* Removes a branch from the tree, and cleans up, if necessary */ - public void remove(KEY key1, KEY key2, KEY key3) { + public final void remove(KEY key1, KEY key2, KEY key3) { if (key1 == null || key2 == null) { return; } - Lock UPDATE = this.lock.updateLock(); - UPDATE.lock(); // allows other readers, blocks others from acquiring update or write locks - - HashMapTree leaf = null; + HashMapTree leaf; if (this.children != null) { leaf = this.children.get(key1); if (leaf != null) { - // promote to writelock and try again - Concurrency in Practice,16.4.2, last sentence on page. Careful for stale state - Lock WRITE = this.lock.writeLock(); - WRITE.lock(); // upgrade to the write lock, at this point blocks other readers - leaf.remove(key2, key3); this.children.remove(key1); if (this.children.isEmpty()) { this.children = null; } - - WRITE.unlock(); } } - - UPDATE.unlock(); } @@ -162,7 +130,7 @@ public class HashMapTree { * Removes a branch from the tree, and cleans up, if necessary */ @SuppressWarnings("unchecked") - public void remove(KEY... keys) { + public final void remove(KEY... keys) { if (keys == null) { return; } @@ -174,11 +142,8 @@ public class HashMapTree { /** * Removes a branch from the tree, and cleans up, if necessary */ - private final void removeLeaf(KEY key) { + private void removeLeaf(KEY key) { if (key != null) { - Lock WRITE = this.lock.writeLock(); - WRITE.lock(); // upgrade to the write lock, at this point blocks other readers - if (this.children != null) { HashMapTree leaf = this.children.get(key); @@ -195,16 +160,11 @@ public class HashMapTree { } } } - - WRITE.unlock(); } } // keys CANNOT be null here! - private final void removeLeaf(int index, KEY[] keys) { - Lock WRITE = this.lock.writeLock(); - WRITE.lock(); // upgrade to the write lock, at this point blocks other readers - + private void removeLeaf(int index, KEY[] keys) { if (index == keys.length) { // we have reached the leaf to remove! this.value = null; @@ -223,249 +183,89 @@ public class HashMapTree { } } } - - WRITE.unlock(); } - public VALUE put(VALUE value, KEY key) { + public final VALUE put(VALUE value, KEY key) { if (key == null) { throw new NullPointerException("keys"); } - Lock WRITE = this.lock.writeLock(); - WRITE.lock(); // upgrade to the write lock, at this point blocks other readers - // have to put value into our children - HashMapTree leaf = createLeaf_NL(key); + HashMapTree leaf = createLeaf(key); VALUE prev = leaf.value; leaf.value = value; - WRITE.unlock(); - return prev; } - public VALUE putIfAbsent(VALUE value, KEY key) { - if (key == null) { - throw new NullPointerException("keys"); - } - - Lock WRITE = this.lock.writeLock(); - WRITE.lock(); // upgrade to the write lock, at this point blocks other readers - - // have to put value into our children - HashMapTree leaf = createLeaf_NL(key); - - VALUE prev = leaf.value; - if (prev == null) { - leaf.value = value; - } - - WRITE.unlock(); - - return prev; - } - - public VALUE put(VALUE value, KEY key1, KEY key2) { + public final VALUE put(VALUE value, KEY key1, KEY key2) { if (key1 == null || key2 == null) { throw new NullPointerException("keys"); } - Lock WRITE = this.lock.writeLock(); - WRITE.lock(); // upgrade to the write lock, at this point blocks other readers - // have to put value into our children - HashMapTree leaf = createLeaf_NL(key1); - Lock WRITE2 = leaf.lock.writeLock(); - WRITE2.lock(); - leaf = leaf.createLeaf_NL(key2); - WRITE2.unlock(); + HashMapTree leaf = createLeaf(key1); + leaf = leaf.createLeaf(key2); VALUE prev = leaf.value; leaf.value = value; - WRITE.unlock(); - return prev; } - public VALUE putIfAbsent(VALUE value, KEY key1, KEY key2) { - if (key1 == null || key2 == null) { - throw new NullPointerException("keys"); - } - - Lock WRITE = this.lock.writeLock(); - WRITE.lock(); // upgrade to the write lock, at this point blocks other readers - - // have to put value into our children - HashMapTree leaf = createLeaf_NL(key1); - Lock WRITE2 = leaf.lock.writeLock(); - WRITE2.lock(); - leaf = leaf.createLeaf_NL(key2); - WRITE2.unlock(); - - VALUE prev = leaf.value; - if (prev == null) { - leaf.value = value; - } - - WRITE.unlock(); - - return prev; - } - - public VALUE put(VALUE value, KEY key1, KEY key2, KEY key3) { + public final VALUE put(VALUE value, KEY key1, KEY key2, KEY key3) { if (key1 == null || key2 == null || key3 == null) { throw new NullPointerException("keys"); } - Lock WRITE = this.lock.writeLock(); - WRITE.lock(); // upgrade to the write lock, at this point blocks other readers - // have to put value into our children - HashMapTree leaf = createLeaf_NL(key1); - Lock WRITE2 = leaf.lock.writeLock(); - WRITE2.lock(); - leaf = leaf.createLeaf_NL(key2); - Lock WRITE3 = leaf.lock.writeLock(); - WRITE3.lock(); - leaf = leaf.createLeaf_NL(key3); - WRITE3.unlock(); - WRITE2.unlock(); + HashMapTree leaf = createLeaf(key1); + leaf = leaf.createLeaf(key2); + leaf = leaf.createLeaf(key3); VALUE prev = leaf.value; leaf.value = value; - WRITE.unlock(); - return prev; } - public VALUE putIfAbsent(VALUE value, KEY key1, KEY key2, KEY key3) { - if (key1 == null || key2 == null || key3 == null) { - throw new NullPointerException("keys"); - } - - Lock WRITE = this.lock.writeLock(); - WRITE.lock(); // upgrade to the write lock, at this point blocks other readers - - // have to put value into our children - HashMapTree leaf = createLeaf_NL(key1); - Lock WRITE2 = leaf.lock.writeLock(); - WRITE2.lock(); - leaf = leaf.createLeaf_NL(key2); - Lock WRITE3 = leaf.lock.writeLock(); - WRITE3.lock(); - leaf = leaf.createLeaf_NL(key3); - WRITE3.unlock(); - WRITE2.unlock(); - - VALUE prev = leaf.value; - if (prev == null) { - leaf.value = value; - } - - WRITE.unlock(); - - return prev; - } - - @SuppressWarnings("unchecked") - public VALUE put(VALUE value, KEY... keys) { + public final VALUE put(VALUE value, KEY... keys) { if (keys == null) { throw new NullPointerException("keys"); } int length = keys.length; - Lock[] locks = new Lock[length]; - - Lock WRITE = this.lock.writeLock(); - WRITE.lock(); // upgrade to the write lock, at this point blocks other readers // have to put value into our children - HashMapTree leaf = createLeaf_NL(keys[0]); + HashMapTree leaf = createLeaf(keys[0]); for (int i=1;i0;i--) { - locks[i].unlock(); + leaf = leaf.createLeaf(keys[i]); } VALUE prev = leaf.value; leaf.value = value; - WRITE.unlock(); - return prev; } - @SuppressWarnings("unchecked") - public VALUE putIfAbsent(VALUE value, KEY... keys) { - if (keys == null) { - throw new NullPointerException("keys"); - } - - int length = keys.length; - Lock[] locks = new Lock[length]; - - Lock WRITE = this.lock.writeLock(); - WRITE.lock(); // upgrade to the write lock, at this point blocks other readers - - // have to put value into our children - HashMapTree leaf = createLeaf_NL(keys[0]); - for (int i=1;i0;i--) { - locks[i].unlock(); - } - - VALUE prev = leaf.value; - if (prev == null) { - leaf.value = value; - } - - WRITE.unlock(); - - return prev; - } - - @SuppressWarnings("unchecked") - public HashMapTree createLeaf(KEY... keys) { + public final HashMapTree createLeaf(KEY... keys) { if (keys == null) { return this; } - int length = keys.length; - Lock[] locks = new Lock[length]; - Lock WRITE = this.lock.writeLock(); - WRITE.lock(); // upgrade to the write lock, at this point blocks other readers + int length = keys.length; // have to put value into our children - HashMapTree leaf = createLeaf_NL(keys[0]); + HashMapTree leaf = createLeaf(keys[0]); for (int i=1;i0;i--) { - locks[i].unlock(); - } - - WRITE.unlock(); - return leaf; } - private final HashMapTree createLeaf_NL(KEY key) { + private HashMapTree createLeaf(KEY key) { if (key == null) { return null; } @@ -497,105 +297,76 @@ public class HashMapTree { ///////////////////////////////////////// ///////////////////////////////////////// - public VALUE get(KEY key) { + public final VALUE get(KEY key) { if (key == null) { return null; } - Lock READ = this.lock.readLock(); - READ.lock(); // allows other readers, blocks others from acquiring update or write locks - - HashMapTree objectTree = null; + HashMapTree objectTree; // publish value from our children - objectTree = getLeaf_NL(key); // protected by lock + objectTree = getLeaf(key); // protected by lock if (objectTree == null) { - READ.unlock(); return null; } - VALUE returnValue = objectTree.value; - - READ.unlock(); - return returnValue; + return objectTree.value; } - public VALUE get(KEY key1, KEY key2) { - Lock READ = this.lock.readLock(); - READ.lock(); // allows other readers, blocks others from acquiring update or write locks - - HashMapTree tree = null; + public final VALUE get(KEY key1, KEY key2) { + HashMapTree tree; // publish value from our children - tree = getLeaf_NL(key1); // protected by lock + tree = getLeaf(key1); // protected by lock if (tree != null) { - tree = tree.getLeaf_NL(key2); // protected by lock + tree = tree.getLeaf(key2); // protected by lock } if (tree == null) { - READ.unlock(); return null; } - VALUE returnValue = tree.value; - - READ.unlock(); - return returnValue; + return tree.value; } - public VALUE getValue(KEY key1, KEY key2, KEY key3) { - Lock READ = this.lock.readLock(); - READ.lock(); // allows other readers, blocks others from acquiring update or write locks - - HashMapTree tree = null; + public final VALUE getValue(KEY key1, KEY key2, KEY key3) { + HashMapTree tree; // publish value from our children - tree = getLeaf_NL(key1); + tree = getLeaf(key1); if (tree != null) { - tree = tree.getLeaf_NL(key2); + tree = tree.getLeaf(key2); } if (tree != null) { - tree = tree.getLeaf_NL(key3); + tree = tree.getLeaf(key3); } if (tree == null) { - READ.unlock(); return null; } - VALUE returnValue = tree.value; - - READ.unlock(); - return returnValue; + return tree.value; } @SuppressWarnings("unchecked") - public VALUE get(KEY... keys) { - Lock READ = this.lock.readLock(); - READ.lock(); // allows other readers, blocks others from acquiring update or write locks + public final VALUE get(KEY... keys) { + HashMapTree tree; - HashMapTree tree = null; // publish value from our children - tree = getLeaf_NL(keys[0]); + tree = getLeaf(keys[0]); int size = keys.length; for (int i=1;i getLeaf(KEY key) { @@ -605,54 +376,39 @@ public class HashMapTree { HashMapTree tree; - Lock READ = this.lock.readLock(); - READ.lock(); // allows other readers, blocks others from acquiring update or write locks - if (this.children == null) { tree = null; } else { tree = this.children.get(key); } - READ.unlock(); - return tree; } public final HashMapTree getLeaf(KEY key1, KEY key2) { - HashMapTree tree = null; - - Lock READ = this.lock.readLock(); - READ.lock(); // allows other readers, blocks others from acquiring update or write locks + HashMapTree tree; // publish value from our children - tree = getLeaf_NL(key1); + tree = getLeaf(key1); if (tree != null) { - tree = tree.getLeaf_NL(key2); + tree = tree.getLeaf(key2); } - READ.unlock(); - return tree; } public final HashMapTree getLeaf(KEY key1, KEY key2, KEY key3) { - HashMapTree tree = null; - - Lock READ = this.lock.readLock(); - READ.lock(); // allows other readers, blocks others from acquiring update or write locks + HashMapTree tree; // publish value from our children - tree = getLeaf_NL(key1); + tree = getLeaf(key1); if (tree != null) { - tree = tree.getLeaf_NL(key2); + tree = tree.getLeaf(key2); } if (tree != null) { - tree = tree.getLeaf_NL(key3); + tree = tree.getLeaf(key3); } - READ.unlock(); - return tree; } @@ -664,36 +420,18 @@ public class HashMapTree { return null; } - Lock READ = this.lock.readLock(); - READ.lock(); // allows other readers, blocks others from acquiring update or write locks - - HashMapTree tree = null; + HashMapTree tree; // publish value from our children - tree = getLeaf_NL(keys[0]); + tree = getLeaf(keys[0]); for (int i=1;i getLeaf_NL(KEY key) { - HashMapTree tree; - - if (this.children == null) { - tree = null; - } else { - tree = this.children.get(key); - } - return tree; } } diff --git a/src/main/java/dorkbox/util/messagebus/dispatch/IHandlerInvocation.java b/src/main/java/dorkbox/util/messagebus/dispatch/IHandlerInvocation.java index 283a61e..0fb0588 100644 --- a/src/main/java/dorkbox/util/messagebus/dispatch/IHandlerInvocation.java +++ b/src/main/java/dorkbox/util/messagebus/dispatch/IHandlerInvocation.java @@ -61,5 +61,5 @@ public interface IHandlerInvocation { * type that the handler consumes * @param handler The handler (method) that will be called via reflection */ - void invoke(Object listener, MethodAccess handler, int methodIndex, Object... message) throws Throwable; + void invoke(Object listener, MethodAccess handler, int methodIndex, Object... messages) throws Throwable; } diff --git a/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java b/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java index dd27e75..90bad21 100644 --- a/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java +++ b/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java @@ -7,8 +7,6 @@ import dorkbox.util.messagebus.common.StrongConcurrentSetV8; import dorkbox.util.messagebus.dispatch.IHandlerInvocation; import dorkbox.util.messagebus.dispatch.ReflectiveHandlerInvocation; import dorkbox.util.messagebus.dispatch.SynchronizedHandlerInvocation; -import dorkbox.util.messagebus.error.ErrorHandlingSupport; -import org.omg.CORBA.BooleanHolder; import java.util.ArrayList; import java.util.Collection; @@ -43,9 +41,9 @@ public class Subscription { private final IHandlerInvocation invocation; private final Collection listeners; - public Subscription(MessageHandler handler) { + public Subscription(final MessageHandler handler, final float loadFactor, final int stripeSize) { this.handlerMetadata = handler; - this.listeners = new StrongConcurrentSetV8(16, 0.85F, 15); + this.listeners = new StrongConcurrentSetV8(16, loadFactor, stripeSize); // this.listeners = new StrongConcurrentSet(16, 0.85F); // this.listeners = new ConcurrentLinkedQueue2(); // this.listeners = new CopyOnWriteArrayList(); @@ -62,7 +60,7 @@ public class Subscription { return handlerMetadata; } - public Class[] getHandledMessageTypes() { + public final Class[] getHandledMessageTypes() { return this.handlerMetadata.getHandledMessages(); } @@ -90,13 +88,10 @@ public class Subscription { } // only used in unit-test - public int size() { + public final int size() { return this.listeners.size(); } - /** - * @return true if there were listeners for this publication, false if there was nothing - */ public final void publish(final Object message) throws Throwable { final MethodAccess handler = this.handlerMetadata.getHandler(); final int handleIndex = this.handlerMetadata.getMethodIndex(); @@ -112,126 +107,59 @@ public class Subscription { } } - /** - * @return true if there were listeners for this publication, false if there was nothing - */ - public void publishToSubscription(ErrorHandlingSupport errorHandler, BooleanHolder booleanHolder, Object message1, Object message2) { -// StrongConcurrentSet listeners = this.listeners; -// -// if (!listeners.isEmpty()) { -// MethodAccess handler = this.handlerMetadata.getHandler(); -// int handleIndex = this.handlerMetadata.getMethodIndex(); -// IHandlerInvocation invocation = this.invocation; -// -// -// ISetEntry current = listeners.head; -// Object listener; -// while (current != null) { -// listener = current.getValue(); -// current = current.next(); -// -// try { -// invocation.invoke(listener, handler, handleIndex, message1, message2); -// } catch (IllegalAccessException e) { -// errorHandler.handlePublicationError(new PublicationError() -// .setMessage("Error during invocation of message handler. " + -// "The class or method is not accessible") -// .setCause(e) -// .setMethodName(handler.getMethodNames()[handleIndex]) -// .setListener(listener) -// .setPublishedObject(message1, message2)); -// } catch (IllegalArgumentException e) { -// errorHandler.handlePublicationError(new PublicationError() -// .setMessage("Error during invocation of message handler. " + -// "Wrong arguments passed to method. Was: " + -// message1.getClass() + ", " + -// message2.getClass() -// + ". Expected: " + handler.getParameterTypes()[0] + ", " + -// handler.getParameterTypes()[1] -// ) -// .setCause(e) -// .setMethodName(handler.getMethodNames()[handleIndex]) -// .setListener(listener) -// .setPublishedObject(message1, message2)); -// } catch (Throwable e) { -// errorHandler.handlePublicationError(new PublicationError() -// .setMessage("Error during invocation of message handler. " + -// "The Message handler code threw an exception") -// .setCause(e) -// .setMethodName(handler.getMethodNames()[handleIndex]) -// .setListener(listener) -// .setPublishedObject(message1, message2)); -// } -// } -// booleanHolder.bool = true; -// } + public final void publishToSubscription(final Object message1, final Object message2) throws Throwable { + final MethodAccess handler = this.handlerMetadata.getHandler(); + final int handleIndex = this.handlerMetadata.getMethodIndex(); + final IHandlerInvocation invocation = this.invocation; + + Iterator iterator; + Object listener; + + for (iterator = this.listeners.iterator(); iterator.hasNext(); ) { + listener = iterator.next(); + + invocation.invoke(listener, handler, handleIndex, message1, message2); + } } - /** - * @return true if there were listeners for this publication, false if there was nothing - */ - public void publishToSubscription(ErrorHandlingSupport errorHandler, BooleanHolder booleanHolder, Object message1, Object message2, Object message3) { -// StrongConcurrentSet listeners = this.listeners; -// -// if (!listeners.isEmpty()) { -// MethodAccess handler = this.handlerMetadata.getHandler(); -// int handleIndex = this.handlerMetadata.getMethodIndex(); -// IHandlerInvocation invocation = this.invocation; -// -// -// ISetEntry current = listeners.head; -// Object listener; -// while (current != null) { -// listener = current.getValue(); -// current = current.next(); -// -// try { -// invocation.invoke(listener, handler, handleIndex, message1, message2, message3); -// } catch (IllegalAccessException e) { -// errorHandler.handlePublicationError(new PublicationError() -// .setMessage("Error during invocation of message handler. " + -// "The class or method is not accessible") -// .setCause(e) -// .setMethodName(handler.getMethodNames()[handleIndex]) -// .setListener(listener) -// .setPublishedObject(message1, message2, message3)); -// } catch (IllegalArgumentException e) { -// errorHandler.handlePublicationError(new PublicationError() -// .setMessage("Error during invocation of message handler. " + -// "Wrong arguments passed to method. Was: " + -// message1.getClass() + ", " + -// message2.getClass() + ", " + -// message3.getClass() -// + ". Expected: " + handler.getParameterTypes()[0] + ", " + -// handler.getParameterTypes()[1] + ", " + -// handler.getParameterTypes()[2] -// ) -// .setCause(e) -// .setMethodName(handler.getMethodNames()[handleIndex]) -// .setListener(listener) -// .setPublishedObject(message1, message2, message3)); -// } catch (Throwable e) { -// errorHandler.handlePublicationError(new PublicationError() -// .setMessage("Error during invocation of message handler. " + -// "The Message handler code threw an exception") -// .setCause(e) -// .setMethodName(handler.getMethodNames()[handleIndex]) -// .setListener(listener) -// .setPublishedObject(message1, message2, message3)); -// } -// } -// booleanHolder.bool = true; -// } + public final void publishToSubscription(final Object message1, final Object message2, final Object message3) throws Throwable { + final MethodAccess handler = this.handlerMetadata.getHandler(); + final int handleIndex = this.handlerMetadata.getMethodIndex(); + final IHandlerInvocation invocation = this.invocation; + + Iterator iterator; + Object listener; + + for (iterator = this.listeners.iterator(); iterator.hasNext(); ) { + listener = iterator.next(); + + invocation.invoke(listener, handler, handleIndex, message1, message2, message3); + } + } + + public final void publishToSubscription(final Object... messages) throws Throwable { + final MethodAccess handler = this.handlerMetadata.getHandler(); + final int handleIndex = this.handlerMetadata.getMethodIndex(); + final IHandlerInvocation invocation = this.invocation; + + Iterator iterator; + Object listener; + + for (iterator = this.listeners.iterator(); iterator.hasNext(); ) { + listener = iterator.next(); + + invocation.invoke(listener, handler, handleIndex, messages); + } } @Override - public int hashCode() { + public final int hashCode() { return this.ID; } @Override - public boolean equals(Object obj) { + public final boolean equals(Object obj) { if (this == obj) { return true; } @@ -246,17 +174,15 @@ public class Subscription { } // inside a write lock - // also puts it into the correct map if it's not already there - public Collection createPublicationSubscriptions(final Map, ArrayList> subsPerMessageSingle, - final HashMapTree, ArrayList> subsPerMessageMulti, - AtomicBoolean varArgPossibility, SubscriptionUtils utils) { + // add this subscription to each of the handled types + // to activate this sub for publication + public void registerForPublication(final Map, ArrayList> subsPerMessageSingle, + final HashMapTree, ArrayList> subsPerMessageMulti, + final AtomicBoolean varArgPossibility, final SubscriptionUtils utils) { final Class[] messageHandlerTypes = handlerMetadata.getHandledMessages(); final int size = messageHandlerTypes.length; -// ConcurrentSet subsPerType; - -// SubscriptionUtils utils = this.utils; Class type0 = messageHandlerTypes[0]; switch (size) { @@ -269,73 +195,45 @@ public class Subscription { if (isArray) { varArgPossibility.lazySet(true); } - utils.cacheSuperClasses(type0); subsPerMessageSingle.put(type0, subs); } - return subs; + subs.add(this); + return; } case 2: { - // the HashMapTree uses read/write locks, so it is only accessible one thread at a time -// SubscriptionHolder subHolderSingle = this.subHolderSingle; -// subsPerType = subHolderSingle.publish(); -// -// Collection putIfAbsent = subsPerMessageMulti.putIfAbsent(subsPerType, type0, types[1]); -// if (putIfAbsent != null) { -// return putIfAbsent; -// } else { -// subHolderSingle.set(subHolderSingle.initialValue()); -// -// // cache the super classes -// utils.cacheSuperClasses(type0); -// utils.cacheSuperClasses(types[1]); -// -// return subsPerType; -// } + ArrayList subs = subsPerMessageMulti.get(type0, messageHandlerTypes[1]); + if (subs == null) { + subs = new ArrayList(); + + subsPerMessageMulti.put(subs, type0, messageHandlerTypes[1]); + } + + subs.add(this); + return; } case 3: { - // the HashMapTree uses read/write locks, so it is only accessible one thread at a time -// SubscriptionHolder subHolderSingle = this.subHolderSingle; -// subsPerType = subHolderSingle.publish(); -// -// Collection putIfAbsent = subsPerMessageMulti.putIfAbsent(subsPerType, type0, types[1], types[2]); -// if (putIfAbsent != null) { -// return putIfAbsent; -// } else { -// subHolderSingle.set(subHolderSingle.initialValue()); -// -// // cache the super classes -// utils.cacheSuperClasses(type0); -// utils.cacheSuperClasses(types[1]); -// utils.cacheSuperClasses(types[2]); -// -// return subsPerType; -// } + ArrayList subs = subsPerMessageMulti.get(type0, messageHandlerTypes[1], messageHandlerTypes[2]); + if (subs == null) { + subs = new ArrayList(); + + subsPerMessageMulti.put(subs, type0, messageHandlerTypes[1], messageHandlerTypes[2]); + } + + subs.add(this); + return; } default: { - // the HashMapTree uses read/write locks, so it is only accessible one thread at a time -// SubscriptionHolder subHolderSingle = this.subHolderSingle; -// subsPerType = subHolderSingle.publish(); -// -// Collection putIfAbsent = subsPerMessageMulti.putIfAbsent(subsPerType, types); -// if (putIfAbsent != null) { -// return putIfAbsent; -// } else { -// subHolderSingle.set(subHolderSingle.initialValue()); -// -// Class c; -// int length = types.length; -// for (int i = 0; i < length; i++) { -// c = types[i]; -// -// // cache the super classes -// utils.cacheSuperClasses(c); -// } -// -// return subsPerType; -// } - return null; + ArrayList subs = subsPerMessageMulti.get(messageHandlerTypes); + if (subs == null) { + subs = new ArrayList(); + + subsPerMessageMulti.put(subs, messageHandlerTypes); + } + + subs.add(this); + return; } } } diff --git a/src/main/java/dorkbox/util/messagebus/subscription/SubscriptionManager.java b/src/main/java/dorkbox/util/messagebus/subscription/SubscriptionManager.java index a59f54b..d653404 100644 --- a/src/main/java/dorkbox/util/messagebus/subscription/SubscriptionManager.java +++ b/src/main/java/dorkbox/util/messagebus/subscription/SubscriptionManager.java @@ -54,28 +54,29 @@ public final class SubscriptionManager { private final VarArgUtils varArgUtils; private final StampedLock lock = new StampedLock(); + private final int numberOfThreads; public SubscriptionManager(int numberOfThreads) { - float loadFactor = SubscriptionManager.LOAD_FACTOR; + this.numberOfThreads = numberOfThreads; // modified ONLY during SUB/UNSUB { - this.nonListeners = new ConcurrentHashMapV8, Boolean>(4, loadFactor, numberOfThreads); + this.nonListeners = new ConcurrentHashMapV8, Boolean>(4, LOAD_FACTOR, numberOfThreads); this.subscriptionsPerMessageSingle = new ConcurrentHashMapV8, ArrayList>(32, LOAD_FACTOR, 1); - this.subscriptionsPerMessageMulti = new HashMapTree, ArrayList>(4, loadFactor); + this.subscriptionsPerMessageMulti = new HashMapTree, ArrayList>(4, LOAD_FACTOR); // only used during SUB/UNSUB this.subscriptionsPerListener = new ConcurrentHashMapV8, Subscription[]>(32, LOAD_FACTOR, 1); } - final SuperClassUtils superClass = new SuperClassUtils(loadFactor, 1); - this.utils = new SubscriptionUtils(superClass, this.subscriptionsPerMessageSingle, this.subscriptionsPerMessageMulti, loadFactor, + final SuperClassUtils superClass = new SuperClassUtils(LOAD_FACTOR, 1); + this.utils = new SubscriptionUtils(superClass, this.subscriptionsPerMessageSingle, this.subscriptionsPerMessageMulti, LOAD_FACTOR, numberOfThreads); // var arg subscriptions keep track of which subscriptions can handle varArgs. SUB/UNSUB dumps it, so it is recreated dynamically. // it's a hit on SUB/UNSUB, but improves performance of handlers - this.varArgUtils = new VarArgUtils(this.utils, superClass, this.subscriptionsPerMessageSingle, loadFactor, numberOfThreads); + this.varArgUtils = new VarArgUtils(this.utils, superClass, this.subscriptionsPerMessageSingle, LOAD_FACTOR, numberOfThreads); } public final void shutdown() { @@ -126,7 +127,6 @@ public final class SubscriptionManager { final HashMapTree, ArrayList> subsPerMessageMulti = this.subscriptionsPerMessageMulti; final Subscription[] subsPerListener = new Subscription[handlersSize]; - Collection subsForPublication; // create the subscription MessageHandler messageHandler; @@ -136,12 +136,15 @@ public final class SubscriptionManager { messageHandler = messageHandlers[i]; // create the subscription - subscription = new Subscription(messageHandler); + subscription = new Subscription(messageHandler, LOAD_FACTOR, numberOfThreads); subscription.subscribe(listener); subsPerListener[i] = subscription; // activates this sub for sub/unsub } + final ConcurrentMap, Subscription[]> subsPerListenerMap = this.subscriptionsPerListener; + final AtomicBoolean varArgPossibility = this.varArgPossibility; + final SubscriptionUtils utils = this.utils; // now write lock for the least expensive part. This is a deferred "double checked lock", but is necessary because // of the huge number of reads compared to writes. @@ -149,23 +152,16 @@ public final class SubscriptionManager { final StampedLock lock = this.lock; final long stamp = lock.writeLock(); - final ConcurrentMap, Subscription[]> subsPerListenerMap = this.subscriptionsPerListener; subscriptions = subsPerListenerMap.get(listenerClass); // it was still null, so we actually have to create the rest of the subs if (subscriptions == null) { - final AtomicBoolean varArgPossibility = this.varArgPossibility; - final SubscriptionUtils utils = this.utils; - for (int i = 0; i < handlersSize; i++) { subscription = subsPerListener[i]; // now add this subscription to each of the handled types - subsForPublication = subscription - .createPublicationSubscriptions(subsPerMessageSingle, subsPerMessageMulti, varArgPossibility, utils); - - //noinspection ConstantConditions - subsForPublication.add(subscription); // activates this sub for publication + // to activate this sub for publication + subscription.registerForPublication(subsPerMessageSingle, subsPerMessageMulti, varArgPossibility, utils); } subsPerListenerMap.put(listenerClass, subsPerListener); @@ -182,7 +178,6 @@ public final class SubscriptionManager { // subscriptions already exist and must only be updated // only publish here if our single-check was OK, or our double-check was OK Subscription subscription; - for (int i = 0; i < subscriptions.length; i++) { subscription = subscriptions[i]; subscription.subscribe(listener); diff --git a/src/test/java/dorkbox/util/messagebus/PerfTest_Collections.java b/src/test/java/dorkbox/util/messagebus/PerfTest_Collections.java index e6530c6..6bb57da 100644 --- a/src/test/java/dorkbox/util/messagebus/PerfTest_Collections.java +++ b/src/test/java/dorkbox/util/messagebus/PerfTest_Collections.java @@ -1,16 +1,5 @@ package dorkbox.util.messagebus; -import java.lang.ref.WeakReference; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.LinkedTransferQueue; - import dorkbox.util.messagebus.annotations.Handler; import dorkbox.util.messagebus.common.ConcurrentHashMapV8; import dorkbox.util.messagebus.common.MessageHandler; @@ -20,6 +9,11 @@ import dorkbox.util.messagebus.common.thread.ConcurrentLinkedQueue2; import dorkbox.util.messagebus.common.thread.ConcurrentSet; import dorkbox.util.messagebus.subscription.Subscription; +import java.lang.ref.WeakReference; +import java.util.*; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedTransferQueue; + public class PerfTest_Collections { public static final int REPETITIONS = 10 * 1000 * 100; @@ -75,7 +69,7 @@ public class PerfTest_Collections { for (int i=0;i