WIP var-arg/super-var-arg implementation

This commit is contained in:
nathan 2015-06-03 14:47:12 +02:00
parent 67639ffb48
commit 30617a2b57
18 changed files with 257 additions and 280 deletions

View File

@ -14,7 +14,7 @@ import dorkbox.util.messagebus.error.ErrorHandlingSupport;
* <p/> * <p/>
* Each message publication is isolated from all other running publications such that it does not interfere with them. * Each message publication is isolated from all other running publications such that it does not interfere with them.
* Hence, the bus generally expects message handlers to be stateless as it may invoke them concurrently if multiple * Hence, the bus generally expects message handlers to be stateless as it may invoke them concurrently if multiple
* messages getSubscriptions published asynchronously. If handlers are stateful and not thread-safe they can be marked to be invoked * messages publish published asynchronously. If handlers are stateful and not thread-safe they can be marked to be invoked
* in a synchronized fashion using @Synchronized annotation * in a synchronized fashion using @Synchronized annotation
* *
* <p/> * <p/>
@ -25,7 +25,7 @@ import dorkbox.util.messagebus.error.ErrorHandlingSupport;
* <p/> * <p/>
* By default, the bus uses weak references to all listeners such that registered listeners do not need to * By default, the bus uses weak references to all listeners such that registered listeners do not need to
* be explicitly unregistered to be eligible for garbage collection. Dead (garbage collected) listeners are * be explicitly unregistered to be eligible for garbage collection. Dead (garbage collected) listeners are
* removed on-the-fly as messages getSubscriptions dispatched. This can be changed using the @Listener annotation. * removed on-the-fly as messages publish dispatched. This can be changed using the @Listener annotation.
* *
* <p/> * <p/>
* Generally message handlers will be invoked in inverse sequence of subscription but any * Generally message handlers will be invoked in inverse sequence of subscription but any
@ -50,7 +50,7 @@ import dorkbox.util.messagebus.error.ErrorHandlingSupport;
* *
* <p/> * <p/>
* NOTE: Generic type parameters of messages will not be taken into account, e.g. a List<Long> will * NOTE: Generic type parameters of messages will not be taken into account, e.g. a List<Long> will
* getSubscriptions dispatched to all message handlers that take an instance of List as their parameter * publish dispatched to all message handlers that take an instance of List as their parameter
* *
* @Author bennidi * @Author bennidi
* Date: 2/8/12 * Date: 2/8/12

View File

@ -73,21 +73,20 @@ public class MultiMBassador implements IMessageBus {
if (forceExactMatches) { if (forceExactMatches) {
subscriptionMatcher = new Matcher() { subscriptionMatcher = new Matcher() {
@Override @Override
public Subscription[] getSubscriptions(Class<?> messageClass) { public boolean publish(Object message) throws Throwable {
return subscriptionManager.getSubscriptionsForcedExact(messageClass); return subscriptionManager.publishExact(message);
} }
}; };
} }
else { else {
subscriptionMatcher = new Matcher() { subscriptionMatcher = new Matcher() {
@Override @Override
public Subscription[] getSubscriptions(Class<?> messageClass) { public boolean publish(Object message) throws Throwable {
return subscriptionManager.getSubscriptions(messageClass); return subscriptionManager.publish(message);
} }
}; };
} }
this.threads = new ArrayDeque<Thread>(numberOfThreads); this.threads = new ArrayDeque<Thread>(numberOfThreads);
NamedThreadFactory dispatchThreadFactory = new NamedThreadFactory("MessageBus"); NamedThreadFactory dispatchThreadFactory = new NamedThreadFactory("MessageBus");
@ -101,6 +100,7 @@ public class MultiMBassador implements IMessageBus {
MultiNode node = new MultiNode(); MultiNode node = new MultiNode();
while (!MultiMBassador.this.shuttingDown) { while (!MultiMBassador.this.shuttingDown) {
try { try {
//noinspection InfiniteLoopStatement
while (true) { while (true) {
IN_QUEUE.take(node); IN_QUEUE.take(node);
switch (node.messageType) { switch (node.messageType) {
@ -112,7 +112,6 @@ public class MultiMBassador implements IMessageBus {
continue; continue;
case 3: case 3:
publish(node.item1, node.item2, node.item3); publish(node.item1, node.item2, node.item3);
continue;
} }
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -120,22 +119,21 @@ public class MultiMBassador implements IMessageBus {
switch (node.messageType) { switch (node.messageType) {
case 1: { case 1: {
handlePublicationError( handlePublicationError(
new PublicationError().setMessage("Thread interupted while processing message") new PublicationError().setMessage("Thread interrupted while processing message")
.setCause(e).setPublishedObject(node.item1)); .setCause(e).setPublishedObject(node.item1));
continue; continue;
} }
case 2: { case 2: {
handlePublicationError( handlePublicationError(
new PublicationError().setMessage("Thread interupted while processing message") new PublicationError().setMessage("Thread interrupted while processing message")
.setCause(e).setPublishedObject(node.item1, node.item2)); .setCause(e).setPublishedObject(node.item1, node.item2));
continue; continue;
} }
case 3: { case 3: {
handlePublicationError( handlePublicationError(
new PublicationError().setMessage("Thread interupted while processing message") new PublicationError().setMessage("Thread interrupted while processing message")
.setCause(e) .setCause(e)
.setPublishedObject(node.item1, node.item2, node.item3)); .setPublishedObject(node.item1, node.item2, node.item3));
continue;
} }
} }
} }
@ -202,97 +200,19 @@ public class MultiMBassador implements IMessageBus {
return this.dispatchQueue.hasPendingMessages(); return this.dispatchQueue.hasPendingMessages();
} }
@Override @Override
public void publish(final Object message) { public void publish(final Object message) {
try { try {
boolean subsPublished = false; boolean subsPublished = subscriptionMatcher.publish(message);
final SubscriptionManager manager = this.subscriptionManager;
final Class<?> messageClass = message.getClass();
Subscription[] subscriptions; if (!subsPublished) {
Subscription sub;
subscriptions = subscriptionMatcher.getSubscriptions(messageClass);
int c = 0;
// Run subscriptions
if (subscriptions != null) {
subsPublished = true;
for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i];
sub.publish(message);
c = sub.c();
}
}
// if (!this.forceExactMatches) {
// Subscription[] superSubscriptions = manager.getSuperSubscriptions(messageClass); // NOT return null
// // now getSubscriptions superClasses
// int length = superSubscriptions.length;
//
// if (length > 0) {
// for (int i=0;i<length;i++) {
// sub = superSubscriptions[i];
//
// sub.publish(message);
// }
//
// subsPublished = true;
// }
// if (superSubscriptions != null && !superSubscriptions.isEmpty()) {
// for (iterator = superSubscriptions.iterator(); iterator.hasNext();) {
// sub = iterator.next();
//
// // this catches all exception types
// sub.publishToSubscription(this, message);
// }
// subsPublished = true;
// }
// // publish to var arg, only if not already an array
// if (manager.hasVarArgPossibility() && !manager.utils.isArray(messageClass)) {
// Object[] asArray = null;
//
// ConcurrentSet<Subscription> varargSubscriptions = manager.getVarArgSubscriptions(messageClass);
// if (varargSubscriptions != null && !varargSubscriptions.isEmpty()) {
// asArray = (Object[]) Array.newInstance(messageClass, 1);
// asArray[0] = message;
//
// for (iterator = varargSubscriptions.iterator(); iterator.hasNext();) {
// sub = iterator.next();
// // this catches all exception types
// sub.publishToSubscription(this, subsPublished, asArray);
// }
// }
//
// ConcurrentSet<Subscription> varargSuperSubscriptions = manager.getVarArgSuperSubscriptions(messageClass);
// // now getSubscriptions array based superClasses (but only if those ALSO accept vararg)
// if (varargSuperSubscriptions != null && !varargSuperSubscriptions.isEmpty()) {
// if (asArray == null) {
// asArray = (Object[]) Array.newInstance(messageClass, 1);
// asArray[0] = message;
// }
// for (iterator = varargSuperSubscriptions.iterator(); iterator.hasNext();) {
// sub = iterator.next();
// // this catches all exception types
// sub.publishToSubscription(this, subsPublished, asArray);
// }
// }
// }
// }
if (c == 0 && !subsPublished) {
// Dead Event must EXACTLY MATCH (no subclasses) // Dead Event must EXACTLY MATCH (no subclasses)
Subscription[] deadSubscriptions = manager.getSubscriptionsForcedExact(DeadMessage.class); Subscription[] deadSubscriptions = subscriptionManager.getSubscriptionsForcedExact(DeadMessage.class);
if (deadSubscriptions != null) { if (deadSubscriptions != null) {
DeadMessage deadMessage = new DeadMessage(message); DeadMessage deadMessage = new DeadMessage(message);
Subscription sub;
//noinspection ForLoopReplaceableByForEach
for (int i = 0; i < deadSubscriptions.length; i++) { for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i]; sub = deadSubscriptions[i];
@ -314,7 +234,7 @@ public class MultiMBassador implements IMessageBus {
// Class<?> messageClass2 = message2.getClass(); // Class<?> messageClass2 = message2.getClass();
// //
// StrongConcurrentSet<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2); // StrongConcurrentSet<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2);
// BooleanHolder subsPublished = this.booleanThreadLocal.getSubscriptions(); // BooleanHolder subsPublished = this.booleanThreadLocal.publish();
// subsPublished.bool = false; // subsPublished.bool = false;
// //
// ISetEntry<Subscription> current; // ISetEntry<Subscription> current;
@ -334,7 +254,7 @@ public class MultiMBassador implements IMessageBus {
// //
// if (!this.forceExactMatches) { // if (!this.forceExactMatches) {
// StrongConcurrentSet<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass1, messageClass2); // StrongConcurrentSet<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass1, messageClass2);
// // now getSubscriptions superClasses // // now publish superClasses
// if (superSubscriptions != null) { // if (superSubscriptions != null) {
// current = superSubscriptions.head; // current = superSubscriptions.head;
// while (current != null) { // while (current != null) {
@ -368,7 +288,7 @@ public class MultiMBassador implements IMessageBus {
// } // }
// //
// StrongConcurrentSet<Subscription> varargSuperSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1); // StrongConcurrentSet<Subscription> varargSuperSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1);
// // now getSubscriptions array based superClasses (but only if those ALSO accept vararg) // // now publish array based superClasses (but only if those ALSO accept vararg)
// if (varargSuperSubscriptions != null && !varargSuperSubscriptions.isEmpty()) { // if (varargSuperSubscriptions != null && !varargSuperSubscriptions.isEmpty()) {
// if (asArray == null) { // if (asArray == null) {
// asArray = (Object[]) Array.newInstance(messageClass1, 2); // asArray = (Object[]) Array.newInstance(messageClass1, 2);
@ -388,7 +308,7 @@ public class MultiMBassador implements IMessageBus {
// } else { // } else {
// StrongConcurrentSet<Subscription> varargSuperMultiSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1, messageClass2); // StrongConcurrentSet<Subscription> varargSuperMultiSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1, messageClass2);
// //
// // now getSubscriptions array based superClasses (but only if those ALSO accept vararg) // // now publish array based superClasses (but only if those ALSO accept vararg)
// if (varargSuperMultiSubscriptions != null && !varargSuperMultiSubscriptions.isEmpty()) { // if (varargSuperMultiSubscriptions != null && !varargSuperMultiSubscriptions.isEmpty()) {
// current = varargSuperMultiSubscriptions.head; // current = varargSuperMultiSubscriptions.head;
// while (current != null) { // while (current != null) {
@ -438,7 +358,7 @@ public class MultiMBassador implements IMessageBus {
// Class<?> messageClass3 = message3.getClass(); // Class<?> messageClass3 = message3.getClass();
// //
// StrongConcurrentSet<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2, messageClass3); // StrongConcurrentSet<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2, messageClass3);
// BooleanHolder subsPublished = this.booleanThreadLocal.getSubscriptions(); // BooleanHolder subsPublished = this.booleanThreadLocal.publish();
// subsPublished.bool = false; // subsPublished.bool = false;
// //
// ISetEntry<Subscription> current; // ISetEntry<Subscription> current;
@ -459,7 +379,7 @@ public class MultiMBassador implements IMessageBus {
// //
// if (!this.forceExactMatches) { // if (!this.forceExactMatches) {
// StrongConcurrentSet<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass1, messageClass2, messageClass3); // StrongConcurrentSet<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass1, messageClass2, messageClass3);
// // now getSubscriptions superClasses // // now publish superClasses
// if (superSubscriptions != null) { // if (superSubscriptions != null) {
// current = superSubscriptions.head; // current = superSubscriptions.head;
// while (current != null) { // while (current != null) {
@ -493,7 +413,7 @@ public class MultiMBassador implements IMessageBus {
// } // }
// //
// StrongConcurrentSet<Subscription> varargSuperSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1); // StrongConcurrentSet<Subscription> varargSuperSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1);
// // now getSubscriptions array based superClasses (but only if those ALSO accept vararg) // // now publish array based superClasses (but only if those ALSO accept vararg)
// if (varargSuperSubscriptions != null && !varargSuperSubscriptions.isEmpty()) { // if (varargSuperSubscriptions != null && !varargSuperSubscriptions.isEmpty()) {
// if (asArray == null) { // if (asArray == null) {
// asArray = (Object[]) Array.newInstance(messageClass1, 3); // asArray = (Object[]) Array.newInstance(messageClass1, 3);
@ -514,7 +434,7 @@ public class MultiMBassador implements IMessageBus {
// } else { // } else {
// StrongConcurrentSet<Subscription> varargSuperMultiSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1, messageClass2, messageClass3); // StrongConcurrentSet<Subscription> varargSuperMultiSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1, messageClass2, messageClass3);
// //
// // now getSubscriptions array based superClasses (but only if those ALSO accept vararg) // // now publish array based superClasses (but only if those ALSO accept vararg)
// if (varargSuperMultiSubscriptions != null && !varargSuperMultiSubscriptions.isEmpty()) { // if (varargSuperMultiSubscriptions != null && !varargSuperMultiSubscriptions.isEmpty()) {
// current = varargSuperMultiSubscriptions.head; // current = varargSuperMultiSubscriptions.head;
// while (current != null) { // while (current != null) {

View File

@ -27,14 +27,14 @@ import java.util.concurrent.ForkJoinPool;
* interoperable with {@code Hashtable} in programs that rely on its * interoperable with {@code Hashtable} in programs that rely on its
* thread safety but not on its synchronization details. * thread safety but not on its synchronization details.
* *
* <p>Retrieval operations (including {@code getSubscriptions}) generally do not * <p>Retrieval operations (including {@code get}) generally do not
* block, so may overlap with update operations (including {@code put} * block, so may overlap with update operations (including {@code put}
* and {@code remove}). Retrievals reflect the results of the most * and {@code remove}). Retrievals reflect the results of the most
* recently <em>completed</em> update operations holding upon their * recently <em>completed</em> update operations holding upon their
* onset. (More formally, an update operation for a given key bears a * onset. (More formally, an update operation for a given key bears a
* <em>happens-before</em> relation with any (non-null) retrieval for * <em>happens-before</em> relation with any (non-null) retrieval for
* that key reporting the updated value.) For aggregate operations * that key reporting the updated value.) For aggregate operations
* such as {@code putAll} and {@code shutdown}, concurrent retrievals may * such as {@code putAll} and {@code clear}, concurrent retrievals may
* reflect insertion or removal of only some entries. Similarly, * reflect insertion or removal of only some entries. Similarly,
* Iterators and Enumerations return elements reflecting the state of * Iterators and Enumerations return elements reflecting the state of
* the hash table at some point at or since the creation of the * the hash table at some point at or since the creation of the
@ -141,7 +141,7 @@ import java.util.concurrent.ForkJoinPool;
* *
* <p>The concurrency properties of bulk operations follow * <p>The concurrency properties of bulk operations follow
* from those of ConcurrentHashMapV8: Any non-null result returned * from those of ConcurrentHashMapV8: Any non-null result returned
* from {@code getSubscriptions(key)} and related access methods bears a * from {@code get(key)} and related access methods bears a
* happens-before relation with the associated insertion or * happens-before relation with the associated insertion or
* update. The result of any bulk operation reflects the * update. The result of any bulk operation reflects the
* composition of these per-element relations (but is not * composition of these per-element relations (but is not
@ -275,7 +275,7 @@ public class ConcurrentHashMapV8<K, V> extends ConcurrentHashMap<K, V>
// * Overview: // * Overview:
// * // *
// * The primary design goal of this hash table is to maintain // * The primary design goal of this hash table is to maintain
// * concurrent readability (typically method getSubscriptions(), but also // * concurrent readability (typically method publish(), but also
// * iterators and related methods) while minimizing update // * iterators and related methods) while minimizing update
// * contention. Secondary goals are to keep space consumption about // * contention. Secondary goals are to keep space consumption about
// * the same or better than java.util.HashMap, and to support high // * the same or better than java.util.HashMap, and to support high
@ -632,7 +632,7 @@ public class ConcurrentHashMapV8<K, V> extends ConcurrentHashMap<K, V>
// } // }
// //
// /** // /**
// * Virtualized support for map.getSubscriptions(); overridden in subclasses. // * Virtualized support for map.publish(); overridden in subclasses.
// */ // */
// Node<K,V> find(int h, Object k) { // Node<K,V> find(int h, Object k) {
// Node<K,V> e = this; // Node<K,V> e = this;
@ -931,7 +931,7 @@ public class ConcurrentHashMapV8<K, V> extends ConcurrentHashMap<K, V>
// * @throws NullPointerException if the specified key is null // * @throws NullPointerException if the specified key is null
// */ // */
// @Override // @Override
// public V getSubscriptions(Object key) { // public V publish(Object key) {
// Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; // Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// int h = spread(key.hashCode()); // int h = spread(key.hashCode());
// if ((tab = this.table) != null && (n = tab.length) > 0 && // if ((tab = this.table) != null && (n = tab.length) > 0 &&
@ -965,7 +965,7 @@ public class ConcurrentHashMapV8<K, V> extends ConcurrentHashMap<K, V>
// */ // */
// @Override // @Override
// public boolean containsKey(Object key) { // public boolean containsKey(Object key) {
// return getSubscriptions(key) != null; // return publish(key) != null;
// } // }
// //
// /** // /**
@ -1000,7 +1000,7 @@ public class ConcurrentHashMapV8<K, V> extends ConcurrentHashMap<K, V>
// * Maps the specified key to the specified value in this table. // * Maps the specified key to the specified value in this table.
// * Neither the key nor the value can be null. // * Neither the key nor the value can be null.
// * // *
// * <p>The value can be retrieved by calling the {@code getSubscriptions} method // * <p>The value can be retrieved by calling the {@code publish} method
// * with a key that is equal to the original key. // * with a key that is equal to the original key.
// * // *
// * @param key key with which the specified value is to be associated // * @param key key with which the specified value is to be associated
@ -1381,7 +1381,7 @@ public class ConcurrentHashMapV8<K, V> extends ConcurrentHashMap<K, V>
// Traverser<K,V> it = new Traverser<K,V>(t, f, 0, f); // Traverser<K,V> it = new Traverser<K,V>(t, f, 0, f);
// for (Node<K,V> p; (p = it.advance()) != null; ) { // for (Node<K,V> p; (p = it.advance()) != null; ) {
// V val = p.val; // V val = p.val;
// Object v = m.getSubscriptions(p.key); // Object v = m.publish(p.key);
// if (v == null || v != val && !v.equals(val)) { // if (v == null || v != val && !v.equals(val)) {
// return false; // return false;
// } // }
@ -1390,7 +1390,7 @@ public class ConcurrentHashMapV8<K, V> extends ConcurrentHashMap<K, V>
// Object mk, mv, v; // Object mk, mv, v;
// if ((mk = e.getKey()) == null || // if ((mk = e.getKey()) == null ||
// (mv = e.getValue()) == null || // (mv = e.getValue()) == null ||
// (v = getSubscriptions(mk)) == null || // (v = publish(mk)) == null ||
// mv != v && !mv.equals(v)) { // mv != v && !mv.equals(v)) {
// return false; // return false;
// } // }
@ -1628,7 +1628,7 @@ public class ConcurrentHashMapV8<K, V> extends ConcurrentHashMap<K, V>
// */ // */
// public V getOrDefault(Object key, V defaultValue) { // public V getOrDefault(Object key, V defaultValue) {
// V v; // V v;
// return (v = getSubscriptions(key)) == null ? defaultValue : v; // return (v = publish(key)) == null ? defaultValue : v;
// } // }
// //
// public void forEach(BiAction<? super K, ? super V> action) { // public void forEach(BiAction<? super K, ? super V> action) {
@ -1659,7 +1659,7 @@ public class ConcurrentHashMapV8<K, V> extends ConcurrentHashMap<K, V>
// throw new NullPointerException(); // throw new NullPointerException();
// } // }
// if (replaceNode(key, newValue, oldValue) != null || // if (replaceNode(key, newValue, oldValue) != null ||
// (oldValue = getSubscriptions(key)) == null) { // (oldValue = publish(key)) == null) {
// break; // break;
// } // }
// } // }
@ -2328,7 +2328,7 @@ public class ConcurrentHashMapV8<K, V> extends ConcurrentHashMap<K, V>
// !U.compareAndSwapLong(this, BASECOUNT, b = this.baseCount, s = b + x)) { // !U.compareAndSwapLong(this, BASECOUNT, b = this.baseCount, s = b + x)) {
// CounterHashCode hc; CounterCell a; long v; int m; // CounterHashCode hc; CounterCell a; long v; int m;
// boolean uncontended = true; // boolean uncontended = true;
// if ((hc = threadCounterHashCode.getSubscriptions()) == null || // if ((hc = threadCounterHashCode.publish()) == null ||
// as == null || (m = as.length - 1) < 0 || // as == null || (m = as.length - 1) < 0 ||
// (a = as[m & hc.code]) == null || // (a = as[m & hc.code]) == null ||
// !(uncontended = // !(uncontended =
@ -4037,7 +4037,7 @@ public class ConcurrentHashMapV8<K, V> extends ConcurrentHashMap<K, V>
// Object k, v, r; Map.Entry<?,?> e; // Object k, v, r; Map.Entry<?,?> e;
// return o instanceof Map.Entry && // return o instanceof Map.Entry &&
// (k = (e = (Map.Entry<?,?>)o).getKey()) != null && // (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
// (r = this.map.getSubscriptions(k)) != null && // (r = this.map.publish(k)) != null &&
// (v = e.getValue()) != null && // (v = e.getValue()) != null &&
// (v == r || v.equals(r)); // (v == r || v.equals(r));
// } // }
@ -4328,7 +4328,7 @@ public class ConcurrentHashMapV8<K, V> extends ConcurrentHashMap<K, V>
// Class<sun.misc.Unsafe> k = sun.misc.Unsafe.class; // Class<sun.misc.Unsafe> k = sun.misc.Unsafe.class;
// for (java.lang.reflect.Field f : k.getDeclaredFields()) { // for (java.lang.reflect.Field f : k.getDeclaredFields()) {
// f.setAccessible(true); // f.setAccessible(true);
// Object x = f.getSubscriptions(null); // Object x = f.publish(null);
// if (k.isInstance(x)) { // if (k.isInstance(x)) {
// return k.cast(x); // return k.cast(x);
// } // }

View File

@ -9,7 +9,7 @@ import java.util.concurrent.locks.Lock;
/** /**
* Simple tree structure that is a map that contains a chain of keys to getSubscriptions to a value. * Simple tree structure that is a map that contains a chain of keys to publish to a value.
* <p> * <p>
* THREAD SAFE, each level in the tree has it's own write lock, and there a tree-global read lock, to prevent writes * THREAD SAFE, each level in the tree has it's own write lock, and there a tree-global read lock, to prevent writes
* *
@ -506,7 +506,7 @@ public class HashMapTree<KEY, VALUE> {
READ.lock(); // allows other readers, blocks others from acquiring update or write locks READ.lock(); // allows other readers, blocks others from acquiring update or write locks
HashMapTree<KEY, VALUE> objectTree = null; HashMapTree<KEY, VALUE> objectTree = null;
// getSubscriptions value from our children // publish value from our children
objectTree = getLeaf_NL(key); // protected by lock objectTree = getLeaf_NL(key); // protected by lock
if (objectTree == null) { if (objectTree == null) {
@ -525,7 +525,7 @@ public class HashMapTree<KEY, VALUE> {
READ.lock(); // allows other readers, blocks others from acquiring update or write locks READ.lock(); // allows other readers, blocks others from acquiring update or write locks
HashMapTree<KEY, VALUE> tree = null; HashMapTree<KEY, VALUE> tree = null;
// getSubscriptions value from our children // publish value from our children
tree = getLeaf_NL(key1); // protected by lock tree = getLeaf_NL(key1); // protected by lock
if (tree != null) { if (tree != null) {
tree = tree.getLeaf_NL(key2); // protected by lock tree = tree.getLeaf_NL(key2); // protected by lock
@ -547,7 +547,7 @@ public class HashMapTree<KEY, VALUE> {
READ.lock(); // allows other readers, blocks others from acquiring update or write locks READ.lock(); // allows other readers, blocks others from acquiring update or write locks
HashMapTree<KEY, VALUE> tree = null; HashMapTree<KEY, VALUE> tree = null;
// getSubscriptions value from our children // publish value from our children
tree = getLeaf_NL(key1); tree = getLeaf_NL(key1);
if (tree != null) { if (tree != null) {
tree = tree.getLeaf_NL(key2); tree = tree.getLeaf_NL(key2);
@ -573,7 +573,7 @@ public class HashMapTree<KEY, VALUE> {
READ.lock(); // allows other readers, blocks others from acquiring update or write locks READ.lock(); // allows other readers, blocks others from acquiring update or write locks
HashMapTree<KEY, VALUE> tree = null; HashMapTree<KEY, VALUE> tree = null;
// getSubscriptions value from our children // publish value from our children
tree = getLeaf_NL(keys[0]); tree = getLeaf_NL(keys[0]);
int size = keys.length; int size = keys.length;
@ -625,7 +625,7 @@ public class HashMapTree<KEY, VALUE> {
Lock READ = this.lock.readLock(); Lock READ = this.lock.readLock();
READ.lock(); // allows other readers, blocks others from acquiring update or write locks READ.lock(); // allows other readers, blocks others from acquiring update or write locks
// getSubscriptions value from our children // publish value from our children
tree = getLeaf_NL(key1); tree = getLeaf_NL(key1);
if (tree != null) { if (tree != null) {
tree = tree.getLeaf_NL(key2); tree = tree.getLeaf_NL(key2);
@ -642,7 +642,7 @@ public class HashMapTree<KEY, VALUE> {
Lock READ = this.lock.readLock(); Lock READ = this.lock.readLock();
READ.lock(); // allows other readers, blocks others from acquiring update or write locks READ.lock(); // allows other readers, blocks others from acquiring update or write locks
// getSubscriptions value from our children // publish value from our children
tree = getLeaf_NL(key1); tree = getLeaf_NL(key1);
if (tree != null) { if (tree != null) {
tree = tree.getLeaf_NL(key2); tree = tree.getLeaf_NL(key2);
@ -668,7 +668,7 @@ public class HashMapTree<KEY, VALUE> {
READ.lock(); // allows other readers, blocks others from acquiring update or write locks READ.lock(); // allows other readers, blocks others from acquiring update or write locks
HashMapTree<KEY, VALUE> tree = null; HashMapTree<KEY, VALUE> tree = null;
// getSubscriptions value from our children // publish value from our children
tree = getLeaf_NL(keys[0]); tree = getLeaf_NL(keys[0]);
for (int i=1;i<size;i++) { for (int i=1;i<size;i++) {

View File

@ -29,11 +29,11 @@ import java.util.Arrays;
*/ */
public class MessageHandler { public class MessageHandler {
// getSubscriptions all listeners defined by the given class (includes // publish all listeners defined by the given class (includes
// listeners defined in super classes) // listeners defined in super classes)
public static final MessageHandler[] get(final Class<?> target) { public static final MessageHandler[] get(final Class<?> target) {
// getSubscriptions all handlers (this will include all (inherited) methods directly annotated using @Handler) // publish all handlers (this will include all (inherited) methods directly annotated using @Handler)
final Method[] allMethods = ReflectionUtils.getMethods(target); final Method[] allMethods = ReflectionUtils.getMethods(target);
final int length = allMethods.length; final int length = allMethods.length;

View File

@ -29,7 +29,7 @@ public class SuperClassUtils {
Class<?>[] classes = local.get(clazz); Class<?>[] classes = local.get(clazz);
if (classes == null) { if (classes == null) {
// getSubscriptions all super types of class // publish all super types of class
final Class<?>[] superTypes = ReflectionUtils.getSuperTypes(clazz); final Class<?>[] superTypes = ReflectionUtils.getSuperTypes(clazz);
final int length = superTypes.length; final int length = superTypes.length;

View File

@ -1,49 +0,0 @@
package dorkbox.util.messagebus.common;
import org.jctools.util.UnsafeAccess;
abstract class VarArgPossibility_P0 {
// pre-padding
volatile long y0, y1, y2, y4, y5, y6 = 7L;
}
abstract class VarArgPossibility_I0 extends VarArgPossibility_P0 {
public boolean hasVarArgPossibility = false;
}
public class VarArgPossibility extends VarArgPossibility_I0 {
private static final long BOOL;
static {
try {
BOOL = UnsafeAccess.UNSAFE.objectFieldOffset(VarArgPossibility.class.getField("hasVarArgPossibility"));
// now make sure we can access UNSAFE
VarArgPossibility bool = new VarArgPossibility();
boolean o = true;
bool.set( o);
boolean lpItem1 = bool.get();
bool.set(false);
if (lpItem1 != o) {
throw new Exception("Cannot access unsafe fields");
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public final void set(boolean bool) {
UnsafeAccess.UNSAFE.putBoolean(this, BOOL, bool);
}
public final boolean get() {
return UnsafeAccess.UNSAFE.getBoolean(this, BOOL);
}
// post-padding
volatile long z0, z1, z2, z4, z5, z6 = 7L;
public VarArgPossibility() {
}
}

View File

@ -6,12 +6,12 @@ import dorkbox.util.messagebus.subscription.Subscription;
import dorkbox.util.messagebus.subscription.SubscriptionUtils; import dorkbox.util.messagebus.subscription.SubscriptionUtils;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentMap;
public class VarArgUtils { public class VarArgUtils {
private final ConcurrentMap<Class<?>, ConcurrentSet<Subscription>> varArgSubscriptions; private final Map<Class<?>, ArrayList<Subscription>> varArgSubscriptions;
private final ConcurrentMap<Class<?>, ConcurrentSet<Subscription>> varArgSuperClassSubscriptions; private final Map<Class<?>, List<Subscription>> varArgSuperClassSubscriptions;
private final HashMapTree<Class<?>, ConcurrentSet<Subscription>> varArgSuperClassSubscriptionsMulti; private final HashMapTree<Class<?>, ConcurrentSet<Subscription>> varArgSuperClassSubscriptionsMulti;
private final SubscriptionHolder subHolderConcurrent; private final SubscriptionHolder subHolderConcurrent;
@ -20,19 +20,22 @@ public class VarArgUtils {
private final int stripeSize; private final int stripeSize;
private final SubscriptionUtils utils; private final SubscriptionUtils utils;
private final SuperClassUtils superClassUtils;
private final Map<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageSingle; private final Map<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageSingle;
public VarArgUtils(SubscriptionUtils utils, Map<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageSingle, float loadFactor, public VarArgUtils(SubscriptionUtils utils, SuperClassUtils superClassUtils,
Map<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageSingle, float loadFactor,
int stripeSize) { int stripeSize) {
this.utils = utils; this.utils = utils;
this.superClassUtils = superClassUtils;
this.subscriptionsPerMessageSingle = subscriptionsPerMessageSingle; this.subscriptionsPerMessageSingle = subscriptionsPerMessageSingle;
this.loadFactor = loadFactor; this.loadFactor = loadFactor;
this.stripeSize = stripeSize; this.stripeSize = stripeSize;
this.varArgSubscriptions = new ConcurrentHashMapV8<Class<?>, ConcurrentSet<Subscription>>(16, loadFactor, stripeSize); this.varArgSubscriptions = new ConcurrentHashMapV8<Class<?>, ArrayList<Subscription>>(16, loadFactor, stripeSize);
this.varArgSuperClassSubscriptions = new ConcurrentHashMapV8<Class<?>, ConcurrentSet<Subscription>>(16, loadFactor, stripeSize); this.varArgSuperClassSubscriptions = new ConcurrentHashMapV8<Class<?>, List<Subscription>>(16, loadFactor, stripeSize);
this.varArgSuperClassSubscriptionsMulti = new HashMapTree<Class<?>, ConcurrentSet<Subscription>>(4, loadFactor); this.varArgSuperClassSubscriptionsMulti = new HashMapTree<Class<?>, ConcurrentSet<Subscription>>(4, loadFactor);
this.subHolderConcurrent = new SubscriptionHolder(); this.subHolderConcurrent = new SubscriptionHolder();
@ -49,12 +52,38 @@ public class VarArgUtils {
// CAN NOT RETURN NULL // CAN NOT RETURN NULL
// check to see if the messageType can convert/publish to the "array" version, without the hit to JNI // check to see if the messageType can convert/publish to the "array" version, without the hit to JNI
// and then, returns the array'd version subscriptions // and then, returns the array'd version subscriptions
public ConcurrentSet<Subscription> getVarArgSubscriptions(Class<?> messageClass) { public ArrayList<Subscription> getVarArgSubscriptions(Class<?> messageClass) {
// ConcurrentMap<Class<?>, ConcurrentSet<Subscription>> local = this.varArgSubscriptions; Map<Class<?>, ArrayList<Subscription>> local = this.varArgSubscriptions;
//
// // whenever our subscriptions change, this map is cleared. ArrayList<Subscription> varArgSubs = local.get(messageClass);
if (varArgSubs == null) {
// this gets (and caches) our array type. This is never cleared.
final Class<?> arrayVersion = this.superClassUtils.getArrayClass(messageClass);
ArrayList<Subscription> subs = this.subscriptionsPerMessageSingle.get(arrayVersion);
if (subs != null) {
int length = subs.size();
varArgSubs = new ArrayList<Subscription>(length);
Subscription sub;
for (int i = 0; i < length; i++) {
sub = subs.get(i);
if (sub.acceptsVarArgs()) {
varArgSubs.add(sub);
}
}
local.put(messageClass, varArgSubs);
}
}
return varArgSubs;
// whenever our subscriptions change, this map is cleared.
// SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent; // SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent;
// ConcurrentSet<Subscription> subsPerType = subHolderConcurrent.getSubscriptions(); // ConcurrentSet<Subscription> subsPerType = subHolderConcurrent.publish();
// //
// // cache our subscriptions for super classes, so that their access can be fast! // // cache our subscriptions for super classes, so that their access can be fast!
// ConcurrentSet<Subscription> putIfAbsent = local.putIfAbsent(messageClass, subsPerType); // ConcurrentSet<Subscription> putIfAbsent = local.putIfAbsent(messageClass, subsPerType);
@ -62,13 +91,11 @@ public class VarArgUtils {
// // we are the first one in the map // // we are the first one in the map
// subHolderConcurrent.set(subHolderConcurrent.initialValue()); // subHolderConcurrent.set(subHolderConcurrent.initialValue());
// //
// // this caches our array type. This is never cleared.
// Class<?> arrayVersion = this.utils.getArrayClass(messageClass);
// //
// Iterator<Subscription> iterator; // Iterator<Subscription> iterator;
// Subscription sub; // Subscription sub;
// //
// Collection<Subscription> subs = this.subscriptionsPerMessageSingle.getSubscriptions(arrayVersion); // Collection<Subscription> subs = this.subscriptionsPerMessageSingle.publish(arrayVersion);
// if (subs != null) { // if (subs != null) {
// for (iterator = subs.iterator(); iterator.hasNext();) { // for (iterator = subs.iterator(); iterator.hasNext();) {
// sub = iterator.next(); // sub = iterator.next();
@ -83,7 +110,7 @@ public class VarArgUtils {
// return putIfAbsent; // return putIfAbsent;
// } // }
return null; // return null;
} }
@ -96,7 +123,7 @@ public class VarArgUtils {
// ConcurrentMap<Class<?>, ConcurrentSet<Subscription>> local = this.varArgSuperClassSubscriptions; // ConcurrentMap<Class<?>, ConcurrentSet<Subscription>> local = this.varArgSuperClassSubscriptions;
// //
// SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent; // SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent;
// ConcurrentSet<Subscription> subsPerType = subHolderConcurrent.getSubscriptions(); // ConcurrentSet<Subscription> subsPerType = subHolderConcurrent.publish();
// //
// // cache our subscriptions for super classes, so that their access can be fast! // // cache our subscriptions for super classes, so that their access can be fast!
// ConcurrentSet<Subscription> putIfAbsent = local.putIfAbsent(messageClass, subsPerType); // ConcurrentSet<Subscription> putIfAbsent = local.putIfAbsent(messageClass, subsPerType);
@ -123,7 +150,7 @@ public class VarArgUtils {
// for (iterator = types.iterator(); iterator.hasNext();) { // for (iterator = types.iterator(); iterator.hasNext();) {
// superClass = iterator.next(); // superClass = iterator.next();
// //
// Collection<Subscription> subs = local2.getSubscriptions(superClass); // Collection<Subscription> subs = local2.publish(superClass);
// if (subs != null) { // if (subs != null) {
// for (subIterator = subs.iterator(); subIterator.hasNext();) { // for (subIterator = subs.iterator(); subIterator.hasNext();) {
// sub = subIterator.next(); // sub = subIterator.next();
@ -159,7 +186,7 @@ public class VarArgUtils {
// subsPerType = subsPerTypeLeaf.getValue(); // subsPerType = subsPerTypeLeaf.getValue();
// } else { // } else {
// SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent; // SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent;
// subsPerType = subHolderConcurrent.getSubscriptions(); // subsPerType = subHolderConcurrent.publish();
// //
// ConcurrentSet<Subscription> putIfAbsent = local.putIfAbsent(subsPerType, messageClass1, messageClass2); // ConcurrentSet<Subscription> putIfAbsent = local.putIfAbsent(subsPerType, messageClass1, messageClass2);
// if (putIfAbsent != null) { // if (putIfAbsent != null) {
@ -207,7 +234,7 @@ public class VarArgUtils {
// subsPerType = subsPerTypeLeaf.getValue(); // subsPerType = subsPerTypeLeaf.getValue();
// } else { // } else {
// SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent; // SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent;
// subsPerType = subHolderConcurrent.getSubscriptions(); // subsPerType = subHolderConcurrent.publish();
// //
// ConcurrentSet<Subscription> putIfAbsent = local.putIfAbsent(subsPerType, messageClass1, messageClass2, messageClass3); // ConcurrentSet<Subscription> putIfAbsent = local.putIfAbsent(subsPerType, messageClass1, messageClass2, messageClass3);
// if (putIfAbsent != null) { // if (putIfAbsent != null) {

View File

@ -1,5 +0,0 @@
package dorkbox.util.messagebus.common;
public abstract class item<T> {
public volatile Entry<T> head; // reference to the first element
}

View File

@ -1,11 +0,0 @@
package dorkbox.util.messagebus.common;
import java.util.Iterator;
abstract class item3<T> implements Iterator<T> {
public ISetEntry<T> current;
public item3(ISetEntry<T> current) {
this.current = current;
}
}

View File

@ -432,7 +432,7 @@ public final class MpmcMultiTransferArrayQueue extends MpmcArrayQueue<Object> {
// Successful CAS: full barrier // Successful CAS: full barrier
final Thread myThread = Thread.currentThread(); final Thread myThread = Thread.currentThread();
// final Object node = nodeThreadLocal.getSubscriptions(); // final Object node = nodeThreadLocal.publish();
spType(node, TYPE_CONSUMER); spType(node, TYPE_CONSUMER);
spThread(node, myThread); spThread(node, myThread);

View File

@ -39,7 +39,7 @@ public class ConcurrentSet<T> extends ConcurrentLinkedQueue2<T> {
return false; return false;
} }
// had to modify the super implementation so we getSubscriptions Node<T> back // had to modify the super implementation so we publish Node<T> back
Node<T> alreadyPresent = this.entries.putIfAbsent(element, this.IN_PROGRESS_MARKER); Node<T> alreadyPresent = this.entries.putIfAbsent(element, this.IN_PROGRESS_MARKER);
if (alreadyPresent == null) { if (alreadyPresent == null) {
// this doesn't already exist // this doesn't already exist

View File

@ -1,5 +1,5 @@
package dorkbox.util.messagebus.subscription; package dorkbox.util.messagebus.subscription;
public interface Matcher { public interface Matcher {
Subscription[] getSubscriptions(Class<?> messageClass); boolean publish(Object messageClass) throws Throwable;
} }

View File

@ -29,7 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger;
* Date: 2/2/15 * Date: 2/2/15
*/ */
public class Subscription { public class Subscription {
private static AtomicInteger ID_COUNTER = new AtomicInteger(); private static final AtomicInteger ID_COUNTER = new AtomicInteger();
public final int ID = ID_COUNTER.getAndIncrement(); public final int ID = ID_COUNTER.getAndIncrement();
@ -86,11 +86,6 @@ public class Subscription {
return this.listeners.size(); return this.listeners.size();
} }
int c = 0;
public int c() {
return this.c;
}
/** /**
* @return true if there were listeners for this publication, false if there was nothing * @return true if there were listeners for this publication, false if there was nothing
*/ */
@ -105,7 +100,6 @@ public class Subscription {
for (iterator = this.listeners.iterator(); iterator.hasNext();) { for (iterator = this.listeners.iterator(); iterator.hasNext();) {
listener = iterator.next(); listener = iterator.next();
// this.c++;
invocation.invoke(listener, handler, handleIndex, message); invocation.invoke(listener, handler, handleIndex, message);
} }
} }

View File

@ -1,12 +1,14 @@
package dorkbox.util.messagebus.subscription; package dorkbox.util.messagebus.subscription;
import dorkbox.util.messagebus.common.*; import dorkbox.util.messagebus.common.*;
import dorkbox.util.messagebus.common.thread.ConcurrentSet;
import java.lang.reflect.Array;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.StampedLock; import java.util.concurrent.locks.StampedLock;
/** /**
@ -37,7 +39,7 @@ public class SubscriptionManager {
private final Map<Class<?>, Boolean> nonListeners; private final Map<Class<?>, Boolean> nonListeners;
// shortcut publication if we know there is no possibility of varArg (ie: a method that has an array as arguments) // shortcut publication if we know there is no possibility of varArg (ie: a method that has an array as arguments)
private final VarArgPossibility varArgPossibility = new VarArgPossibility(); private AtomicBoolean varArgPossibility = new AtomicBoolean(false);
// all subscriptions per message type. We perpetually KEEP the types, as this lowers the amount of locking required // all subscriptions per message type. We perpetually KEEP the types, as this lowers the amount of locking required
// this is the primary list for dispatching a specific message // this is the primary list for dispatching a specific message
@ -52,9 +54,11 @@ public class SubscriptionManager {
private final ConcurrentMap<Class<?>, Subscription[]> subscriptionsPerListener; private final ConcurrentMap<Class<?>, Subscription[]> subscriptionsPerListener;
private final SuperClassUtils superClass;
private final VarArgUtils varArgUtils; private final VarArgUtils varArgUtils;
private final StampedLock lock = new StampedLock(); private final StampedLock lock = new StampedLock();
// private final ReadWriteLock lock = new ReentrantReadWriteLock(); // private final ReadWriteLock lock = new ReentrantReadWriteLock();
@ -72,12 +76,13 @@ public class SubscriptionManager {
this.subscriptionsPerListener = new ConcurrentHashMapV8<Class<?>, Subscription[]>(); this.subscriptionsPerListener = new ConcurrentHashMapV8<Class<?>, Subscription[]>();
} }
this.utils = new SubscriptionUtils(this.subscriptionsPerMessageSingle, this.subscriptionsPerMessageMulti, loadFactor, this.superClass = new SuperClassUtils(loadFactor, 1);
this.utils = new SubscriptionUtils(superClass, this.subscriptionsPerMessageSingle, this.subscriptionsPerMessageMulti, loadFactor,
numberOfThreads); numberOfThreads);
// var arg subscriptions keep track of which subscriptions can handle varArgs. SUB/UNSUB dumps it, so it is recreated dynamically. // var arg subscriptions keep track of which subscriptions can handle varArgs. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but improves performance of handlers // it's a hit on SUB/UNSUB, but improves performance of handlers
this.varArgUtils = new VarArgUtils(this.utils, this.subscriptionsPerMessageSingle, loadFactor, numberOfThreads); this.varArgUtils = new VarArgUtils(this.utils, superClass, this.subscriptionsPerMessageSingle, loadFactor, numberOfThreads);
} }
public void shutdown() { public void shutdown() {
@ -91,10 +96,6 @@ public class SubscriptionManager {
clearConcurrentCollections(); clearConcurrentCollections();
} }
public boolean hasVarArgPossibility() {
return this.varArgPossibility.get();
}
public void subscribe(Object listener) { public void subscribe(Object listener) {
if (listener == null) { if (listener == null) {
return; return;
@ -138,8 +139,6 @@ public class SubscriptionManager {
return; return;
} }
VarArgPossibility varArgPossibility = this.varArgPossibility;
Map<Class<?>, ArrayList<Subscription>> subsPerMessageSingle = this.subscriptionsPerMessageSingle; Map<Class<?>, ArrayList<Subscription>> subsPerMessageSingle = this.subscriptionsPerMessageSingle;
HashMapTree<Class<?>, ArrayList<Subscription>> subsPerMessageMulti = this.subscriptionsPerMessageMulti; HashMapTree<Class<?>, ArrayList<Subscription>> subsPerMessageMulti = this.subscriptionsPerMessageMulti;
@ -160,7 +159,7 @@ public class SubscriptionManager {
// now add this subscription to each of the handled types // now add this subscription to each of the handled types
subsForPublication = getSubsForPublication(messageHandler.getHandledMessages(), subsPerMessageSingle, subsForPublication = getSubsForPublication(messageHandler.getHandledMessages(), subsPerMessageSingle,
subsPerMessageMulti, varArgPossibility); subsPerMessageMulti);
subsForPublication.add(subscription); // activates this sub for publication subsForPublication.add(subscription); // activates this sub for publication
} }
@ -177,7 +176,7 @@ public class SubscriptionManager {
} }
// subscriptions already exist and must only be updated // subscriptions already exist and must only be updated
// only getSubscriptions here if our single-check was OK, or our double-check was OK // only publish here if our single-check was OK, or our double-check was OK
Subscription subscription; Subscription subscription;
for (int i = 0; i < subscriptions.length; i++) { for (int i = 0; i < subscriptions.length; i++) {
@ -190,8 +189,7 @@ public class SubscriptionManager {
// also puts it into the correct map if it's not already there // also puts it into the correct map if it's not already there
private Collection<Subscription> getSubsForPublication(final Class<?>[] messageHandlerTypes, private Collection<Subscription> getSubsForPublication(final Class<?>[] messageHandlerTypes,
final Map<Class<?>, ArrayList<Subscription>> subsPerMessageSingle, final Map<Class<?>, ArrayList<Subscription>> subsPerMessageSingle,
final HashMapTree<Class<?>, ArrayList<Subscription>> subsPerMessageMulti, final HashMapTree<Class<?>, ArrayList<Subscription>> subsPerMessageMulti) {
final VarArgPossibility varArgPossibility) {
final int size = messageHandlerTypes.length; final int size = messageHandlerTypes.length;
@ -208,11 +206,11 @@ public class SubscriptionManager {
boolean isArray = type0.isArray(); boolean isArray = type0.isArray();
if (isArray) { if (isArray) {
varArgPossibility.set(true); varArgPossibility.lazySet(true);
} }
// cache the super classes // cache the super classes
// utils.cacheSuperClasses(type0, isArray); //utils.cacheSuperClasses(type0, isArray);
subsPerMessageSingle.put(type0, subs); subsPerMessageSingle.put(type0, subs);
} }
@ -222,7 +220,7 @@ public class SubscriptionManager {
case 2: { case 2: {
// the HashMapTree uses read/write locks, so it is only accessible one thread at a time // the HashMapTree uses read/write locks, so it is only accessible one thread at a time
// SubscriptionHolder subHolderSingle = this.subHolderSingle; // SubscriptionHolder subHolderSingle = this.subHolderSingle;
// subsPerType = subHolderSingle.getSubscriptions(); // subsPerType = subHolderSingle.publish();
// //
// Collection<Subscription> putIfAbsent = subsPerMessageMulti.putIfAbsent(subsPerType, type0, types[1]); // Collection<Subscription> putIfAbsent = subsPerMessageMulti.putIfAbsent(subsPerType, type0, types[1]);
// if (putIfAbsent != null) { // if (putIfAbsent != null) {
@ -240,7 +238,7 @@ public class SubscriptionManager {
case 3: { case 3: {
// the HashMapTree uses read/write locks, so it is only accessible one thread at a time // the HashMapTree uses read/write locks, so it is only accessible one thread at a time
// SubscriptionHolder subHolderSingle = this.subHolderSingle; // SubscriptionHolder subHolderSingle = this.subHolderSingle;
// subsPerType = subHolderSingle.getSubscriptions(); // subsPerType = subHolderSingle.publish();
// //
// Collection<Subscription> putIfAbsent = subsPerMessageMulti.putIfAbsent(subsPerType, type0, types[1], types[2]); // Collection<Subscription> putIfAbsent = subsPerMessageMulti.putIfAbsent(subsPerType, type0, types[1], types[2]);
// if (putIfAbsent != null) { // if (putIfAbsent != null) {
@ -259,7 +257,7 @@ public class SubscriptionManager {
default: { default: {
// the HashMapTree uses read/write locks, so it is only accessible one thread at a time // the HashMapTree uses read/write locks, so it is only accessible one thread at a time
// SubscriptionHolder subHolderSingle = this.subHolderSingle; // SubscriptionHolder subHolderSingle = this.subHolderSingle;
// subsPerType = subHolderSingle.getSubscriptions(); // subsPerType = subHolderSingle.publish();
// //
// Collection<Subscription> putIfAbsent = subsPerMessageMulti.putIfAbsent(subsPerType, types); // Collection<Subscription> putIfAbsent = subsPerMessageMulti.putIfAbsent(subsPerType, types);
// if (putIfAbsent != null) { // if (putIfAbsent != null) {
@ -356,26 +354,39 @@ public class SubscriptionManager {
} }
// never return null // never return null
public final Subscription[] getSubscriptions(final Class<?> messageClass) { public final Subscription[] getSubscriptions(final Class<?> messageClass, boolean isArray) {
ArrayList<Subscription> collection;
StampedLock lock = this.lock; StampedLock lock = this.lock;
long stamp = lock.readLock(); long stamp = lock.readLock();
// Lock readLock = this.lock.readLock(); // Lock readLock = this.lock.readLock();
// readLock.lock(); // readLock.lock();
final Subscription[] subscriptions = getSubscriptions_NL(messageClass, isArray);
lock.unlockRead(stamp);
// readLock.unlock();
return subscriptions;
}
// never return null
private Subscription[] getSubscriptions_NL(final Class<?> messageClass, boolean isArray) {
ArrayList<Subscription> collection;
collection = this.subscriptionsPerMessageSingle.get(messageClass); // can return null collection = this.subscriptionsPerMessageSingle.get(messageClass); // can return null
// now getSubscriptions superClasses // now publish superClasses
ArrayList<Subscription> superSubscriptions = this.utils.getSuperSubscriptions(messageClass); // NOT return null ArrayList<Subscription> superSubscriptions = this.utils.getSuperSubscriptions(messageClass, isArray); // NOT return null
if (collection != null) { if (collection != null) {
collection = new ArrayList<Subscription>(collection); collection = new ArrayList<Subscription>(collection);
collection.addAll(superSubscriptions); if (!superSubscriptions.isEmpty()) {
collection.addAll(superSubscriptions);
}
} }
else { else {
collection = superSubscriptions; if (!superSubscriptions.isEmpty()) {
collection = superSubscriptions;
}
} }
final Subscription[] subscriptions; final Subscription[] subscriptions;
@ -386,9 +397,6 @@ public class SubscriptionManager {
subscriptions = null; subscriptions = null;
} }
lock.unlockRead(stamp);
// readLock.unlock();
return subscriptions; return subscriptions;
} }
@ -403,7 +411,7 @@ public class SubscriptionManager {
// readLock.lock(); // readLock.lock();
// //
// try { // try {
// collection = this.subscriptionsPerMessageSingle.getSubscriptions(messageType); // collection = this.subscriptionsPerMessageSingle.publish(messageType);
// if (collection != null) { // if (collection != null) {
// subscriptions = collection.toArray(EMPTY); // subscriptions = collection.toArray(EMPTY);
// } // }
@ -416,7 +424,7 @@ public class SubscriptionManager {
// //
//// long stamp = this.lock.tryOptimisticRead(); // non blocking //// long stamp = this.lock.tryOptimisticRead(); // non blocking
//// ////
//// collection = this.subscriptionsPerMessageSingle.getSubscriptions(messageType); //// collection = this.subscriptionsPerMessageSingle.publish(messageType);
//// if (collection != null) { //// if (collection != null) {
////// subscriptions = new ArrayDeque<>(collection); ////// subscriptions = new ArrayDeque<>(collection);
//// subscriptions = new ArrayList<>(collection); //// subscriptions = new ArrayList<>(collection);
@ -429,7 +437,7 @@ public class SubscriptionManager {
//// if (!this.lock.validate(stamp)) { // if a write occurred, try again with a read lock //// if (!this.lock.validate(stamp)) { // if a write occurred, try again with a read lock
//// stamp = this.lock.readLock(); //// stamp = this.lock.readLock();
//// try { //// try {
//// collection = this.subscriptionsPerMessageSingle.getSubscriptions(messageType); //// collection = this.subscriptionsPerMessageSingle.publish(messageType);
//// if (collection != null) { //// if (collection != null) {
////// subscriptions = new ArrayDeque<>(collection); ////// subscriptions = new ArrayDeque<>(collection);
//// subscriptions = new ArrayList<>(collection); //// subscriptions = new ArrayList<>(collection);
@ -480,12 +488,6 @@ public class SubscriptionManager {
return this.subscriptionsPerMessageMulti.get(messageTypes); return this.subscriptionsPerMessageMulti.get(messageTypes);
} }
// CAN NOT RETURN NULL
// check to see if the messageType can convert/publish to the "array" version, without the hit to JNI
// and then, returns the array'd version subscriptions
public ConcurrentSet<Subscription> getVarArgSubscriptions(Class<?> messageClass) {
return this.varArgUtils.getVarArgSubscriptions(messageClass);
}
// CAN NOT RETURN NULL // CAN NOT RETURN NULL
// check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI // check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI
@ -527,4 +529,103 @@ public class SubscriptionManager {
public Collection<Subscription> getSuperSubscriptions(Class<?> superType1, Class<?> superType2, Class<?> superType3) { public Collection<Subscription> getSuperSubscriptions(Class<?> superType1, Class<?> superType2, Class<?> superType3) {
return this.utils.getSuperSubscriptions(superType1, superType2, superType3); return this.utils.getSuperSubscriptions(superType1, superType2, superType3);
} }
/**
* @return true if subscriptions were published
*/
public boolean publishExact(Object message) throws Throwable {
final Class<?> messageClass = message.getClass();
final boolean isArray = messageClass.isArray();
StampedLock lock = this.lock;
long stamp = lock.readLock();
final Subscription[] subscriptions = getSubscriptions_NL(messageClass, isArray);
Subscription sub;
// Run subscriptions
if (subscriptions != null) {
for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i];
sub.publish(message);
}
lock.unlockRead(stamp);
return true;
}
lock.unlockRead(stamp);
return false;
}
/**
* @return true if subscriptions were published
*/
public boolean publish(Object message) throws Throwable {
final Class<?> messageClass = message.getClass();
final boolean isArray = messageClass.isArray();
StampedLock lock = this.lock;
long stamp = lock.readLock();
final Subscription[] subscriptions = getSubscriptions_NL(messageClass, isArray);
Subscription sub;
// Run subscriptions
if (subscriptions != null) {
for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i];
sub.publish(message);
}
// publish to var arg, only if not already an array
if (varArgPossibility.get() && !isArray) {
final ArrayList<Subscription> varArgSubscriptions = varArgUtils.getVarArgSubscriptions(messageClass);
if (varArgSubscriptions != null && !varArgSubscriptions.isEmpty()) {
Object[] asArray = (Object[]) Array.newInstance(messageClass, 1);
asArray[0] = message;
Iterator<Subscription> iterator;
for (iterator = varArgSubscriptions.iterator(); iterator.hasNext(); ) {
sub = iterator.next();
sub.publish(asArray);
}
// now publish array based superClasses (but only if those ALSO accept vararg)
final Collection<Subscription> varArgSuperSubscriptions = getVarArgSuperSubscriptions(messageClass);
if (varArgSuperSubscriptions != null && !varArgSuperSubscriptions.isEmpty()) {
for (iterator = varArgSubscriptions.iterator(); iterator.hasNext(); ) {
sub = iterator.next();
sub.publish(asArray);
}
}
}
else {
// now publish array based superClasses (but only if those ALSO accept vararg)
final Collection<Subscription> varArgSuperSubscriptions = getVarArgSuperSubscriptions(messageClass);
if (varArgSuperSubscriptions != null && !varArgSuperSubscriptions.isEmpty()) {
Object[] asArray = (Object[]) Array.newInstance(messageClass, 1);
asArray[0] = message;
Iterator<Subscription> iterator;
for (iterator = varArgSuperSubscriptions.iterator(); iterator.hasNext(); ) {
sub = iterator.next();
sub.publish(asArray);
}
}
}
}
lock.unlockRead(stamp);
return true;
}
lock.unlockRead(stamp);
return false;
}
} }

View File

@ -30,11 +30,10 @@ public class SubscriptionUtils {
private final HashMapTree<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageMulti; private final HashMapTree<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageMulti;
public SubscriptionUtils(Map<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageSingle, public SubscriptionUtils(SuperClassUtils superClass, Map<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageSingle,
HashMapTree<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageMulti, float loadFactor, HashMapTree<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageMulti, float loadFactor,
int stripeSize) { int stripeSize) {
this.superClass = superClass;
this.superClass = new SuperClassUtils(loadFactor, 1);
this.subscriptionsPerMessageSingle = subscriptionsPerMessageSingle; this.subscriptionsPerMessageSingle = subscriptionsPerMessageSingle;
this.subscriptionsPerMessageMulti = subscriptionsPerMessageMulti; this.subscriptionsPerMessageMulti = subscriptionsPerMessageMulti;
@ -75,7 +74,7 @@ public class SubscriptionUtils {
// long stamp = lock.tryOptimisticRead(); // long stamp = lock.tryOptimisticRead();
// //
// if (stamp > 0) { // if (stamp > 0) {
// ArrayList<Class<?>> arrayList = local.getSubscriptions(clazz); // ArrayList<Class<?>> arrayList = local.publish(clazz);
// if (arrayList != null) { // if (arrayList != null) {
// classes = arrayList.toArray(SUPER_CLASS_EMPTY); // classes = arrayList.toArray(SUPER_CLASS_EMPTY);
// //
@ -84,7 +83,7 @@ public class SubscriptionUtils {
// } else { // } else {
// stamp = lock.readLock(); // stamp = lock.readLock();
// //
// arrayList = local.getSubscriptions(clazz); // arrayList = local.publish(clazz);
// if (arrayList != null) { // if (arrayList != null) {
// classes = arrayList.toArray(SUPER_CLASS_EMPTY); // classes = arrayList.toArray(SUPER_CLASS_EMPTY);
// lock.unlockRead(stamp); // lock.unlockRead(stamp);
@ -94,7 +93,7 @@ public class SubscriptionUtils {
// } // }
// } // }
// //
// // unable to getSubscriptions a valid subscription. Have to acquire a write lock // // unable to publish a valid subscription. Have to acquire a write lock
// long origStamp = stamp; // long origStamp = stamp;
// if ((stamp = lock.tryConvertToWriteLock(stamp)) == 0) { // if ((stamp = lock.tryConvertToWriteLock(stamp)) == 0) {
// lock.unlockRead(origStamp); // lock.unlockRead(origStamp);
@ -102,7 +101,7 @@ public class SubscriptionUtils {
// } // }
// //
// //
// // getSubscriptions all super types of class // // publish all super types of class
// Collection<Class<?>> superTypes = ReflectionUtils.getSuperTypes(clazz); // Collection<Class<?>> superTypes = ReflectionUtils.getSuperTypes(clazz);
// ArrayList<Class<?>> arrayList = new ArrayList<Class<?>>(superTypes.size()); // ArrayList<Class<?>> arrayList = new ArrayList<Class<?>>(superTypes.size());
// Iterator<Class<?>> iterator; // Iterator<Class<?>> iterator;
@ -144,25 +143,25 @@ public class SubscriptionUtils {
* *
* @return CAN NOT RETURN NULL * @return CAN NOT RETURN NULL
*/ */
public final ArrayList<Subscription> getSuperSubscriptions(final Class<?> clazz) { public final ArrayList<Subscription> getSuperSubscriptions(final Class<?> clazz, boolean isArray) {
// whenever our subscriptions change, this map is cleared. // whenever our subscriptions change, this map is cleared.
final Map<Class<?>, ArrayList<Subscription>> local = this.superClassSubscriptions; final Map<Class<?>, ArrayList<Subscription>> local = this.superClassSubscriptions;
ArrayList<Subscription> superSubscriptions = local.get(clazz); ArrayList<Subscription> superSubscriptions = local.get(clazz);
if (superSubscriptions == null) { if (superSubscriptions == null) {
// types was not empty, so getSubscriptions subscriptions for each type and collate them // types was not empty, so publish subscriptions for each type and collate them
final Map<Class<?>, ArrayList<Subscription>> local2 = this.subscriptionsPerMessageSingle; final Map<Class<?>, ArrayList<Subscription>> local2 = this.subscriptionsPerMessageSingle;
// save the subscriptions // save the subscriptions
final Class<?>[] superClasses = this.superClass.getSuperClasses(clazz, clazz.isArray()); // never returns null, cached response final Class<?>[] superClasses = this.superClass.getSuperClasses(clazz, isArray); // never returns null, cached response
Class<?> superClass; Class<?> superClass;
ArrayList<Subscription> superSubs; ArrayList<Subscription> superSubs;
Subscription sub; Subscription sub;
final int length = superClasses.length; final int length = superClasses.length;
int superSubLengh; int superSubLength;
superSubscriptions = new ArrayList<Subscription>(length); superSubscriptions = new ArrayList<Subscription>(length);
for (int i = 0; i < length; i++) { for (int i = 0; i < length; i++) {
@ -170,8 +169,8 @@ public class SubscriptionUtils {
superSubs = local2.get(superClass); superSubs = local2.get(superClass);
if (superSubs != null) { if (superSubs != null) {
superSubLengh = superSubs.size(); superSubLength = superSubs.size();
for (int j = 0; j < superSubLengh; j++) { for (int j = 0; j < superSubLength; j++) {
sub = superSubs.get(j); sub = superSubs.get(j);
if (sub.acceptsSubtypes()) { if (sub.acceptsSubtypes()) {
@ -181,6 +180,7 @@ public class SubscriptionUtils {
} }
} }
superSubscriptions.trimToSize();
local.put(clazz, superSubscriptions); local.put(clazz, superSubscriptions);
} }
@ -202,7 +202,7 @@ public class SubscriptionUtils {
// subsPerType = subsPerTypeLeaf.getValue(); // subsPerType = subsPerTypeLeaf.getValue();
// } else { // } else {
// SubscriptionHolder subHolderSingle = this.subHolderSingle; // SubscriptionHolder subHolderSingle = this.subHolderSingle;
// subsPerType = subHolderSingle.getSubscriptions(); // subsPerType = subHolderSingle.publish();
// //
// // cache our subscriptions for super classes, so that their access can be fast! // // cache our subscriptions for super classes, so that their access can be fast!
// ConcurrentSet<Subscription> putIfAbsent = local.putIfAbsent(subsPerType, superType1, superType2); // ConcurrentSet<Subscription> putIfAbsent = local.putIfAbsent(subsPerType, superType1, superType2);
@ -288,7 +288,7 @@ public class SubscriptionUtils {
// subsPerType = subsPerTypeLeaf.getValue(); // subsPerType = subsPerTypeLeaf.getValue();
// } else { // } else {
// SubscriptionHolder subHolderSingle = this.subHolderSingle; // SubscriptionHolder subHolderSingle = this.subHolderSingle;
// subsPerType = subHolderSingle.getSubscriptions(); // subsPerType = subHolderSingle.publish();
// //
// // cache our subscriptions for super classes, so that their access can be fast! // // cache our subscriptions for super classes, so that their access can be fast!
// ConcurrentSet<Subscription> putIfAbsent = local.putIfAbsent(subsPerType, superType1, superType2, superType3); // ConcurrentSet<Subscription> putIfAbsent = local.putIfAbsent(subsPerType, superType1, superType2, superType3);

View File

@ -48,7 +48,7 @@ public class SynchronizedHandlerTest extends MessageBusTest {
@Synchronized @Synchronized
public void handleMessage(Object o){ public void handleMessage(Object o){
counter.getAndIncrement(); counter.getAndIncrement();
// System.err.println(counter.getSubscriptions()); // System.err.println(counter.publish());
} }
} }
} }

View File

@ -38,7 +38,7 @@ public class SubscriptionValidator extends AssertSupport {
// we split subs + superSubs into TWO calls. // we split subs + superSubs into TWO calls.
Collection<Subscription> collection = new ArrayDeque<Subscription>(8); Collection<Subscription> collection = new ArrayDeque<Subscription>(8);
Subscription[] subscriptions = manager.getSubscriptions(messageType); Subscription[] subscriptions = manager.getSubscriptions(messageType, messageType.isArray());
if (subscriptions != null) { if (subscriptions != null) {
collection.addAll(Arrays.asList(subscriptions)); collection.addAll(Arrays.asList(subscriptions));
} }