From b4247731dbe2c999edcacdbc3e1f404e279c01f5 Mon Sep 17 00:00:00 2001 From: nathan Date: Mon, 25 Jan 2016 12:24:41 +0100 Subject: [PATCH] WIP, getting subscriptions to use the single writer principle --- .../messagebus/common/MessageHandler.java | 4 +- .../common/thread/ConcurrentLinkedQueue2.java | 603 ++++++++++-------- .../common/thread/ConcurrentSet.java | 315 --------- .../messagebus/subscription/Subscription.java | 213 ++++--- .../subscription/SubscriptionManager.java | 96 ++- .../messagebus/utils/ReflectionUtils.java | 8 +- .../util/messagebus/utils/VarArgUtils.java | 2 +- .../util/messagebus/MultiMessageTest.java | 85 ++- .../messagebus/SubscriptionManagerTest.java | 23 +- .../common/SubscriptionValidator.java | 13 +- .../listeners/IMessageListener.java | 8 +- .../{ => queuePerf}/PerfTest_Collections.java | 12 +- 12 files changed, 584 insertions(+), 798 deletions(-) delete mode 100644 src/dorkbox/util/messagebus/common/thread/ConcurrentSet.java rename test/dorkbox/util/messagebus/{ => queuePerf}/PerfTest_Collections.java (97%) diff --git a/src/dorkbox/util/messagebus/common/MessageHandler.java b/src/dorkbox/util/messagebus/common/MessageHandler.java index d8fdfa8..092abe9 100644 --- a/src/dorkbox/util/messagebus/common/MessageHandler.java +++ b/src/dorkbox/util/messagebus/common/MessageHandler.java @@ -101,9 +101,7 @@ class MessageHandler { } } - final MessageHandler[] messageHandlers = new MessageHandler[finalMethods.size()]; - finalMethods.toArray(messageHandlers); - return messageHandlers; + return finalMethods.toArray(new MessageHandler[0]); } private final MethodAccess handler; diff --git a/src/dorkbox/util/messagebus/common/thread/ConcurrentLinkedQueue2.java b/src/dorkbox/util/messagebus/common/thread/ConcurrentLinkedQueue2.java index 9c2c799..5b65131 100644 --- a/src/dorkbox/util/messagebus/common/thread/ConcurrentLinkedQueue2.java +++ b/src/dorkbox/util/messagebus/common/thread/ConcurrentLinkedQueue2.java @@ -20,9 +20,18 @@ package dorkbox.util.messagebus.common.thread; -import org.jctools.util.UnsafeAccess; +import com.esotericsoftware.kryo.util.IdentityMap; -import java.util.*; +import java.util.AbstractQueue; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Queue; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.function.Consumer; /** * An unbounded thread-safe {@linkplain Queue queue} based on linked nodes. @@ -39,9 +48,9 @@ import java.util.*; * does not permit the use of {@code null} elements. * *

This implementation employs an efficient non-blocking - * algorithm based on one described in - * - * Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue + * algorithm based on one described in Simple, + * Fast, and Practical Non-Blocking and Blocking Concurrent Queue * Algorithms by Maged M. Michael and Michael L. Scott. * *

Iterators are weakly consistent, returning elements @@ -81,133 +90,125 @@ import java.util.*; * @author Doug Lea * @param the type of elements held in this collection */ +public class ConcurrentLinkedQueue2 extends AbstractQueue + implements Queue, java.io.Serializable { + private static final long serialVersionUID = 196745693267521676L; -/* - * This is a modification of the Michael & Scott algorithm, - * adapted for a garbage-collected environment, with support for - * interior node deletion (to support remove(Object)). For - * explanation, read the paper. - * - * Note that like most non-blocking algorithms in this package, - * this implementation relies on the fact that in garbage - * collected systems, there is no possibility of ABA problems due - * to recycled nodes, so there is no need to use "counted - * pointers" or related techniques seen in versions used in - * non-GC'ed settings. - * - * The fundamental invariants are: - * - There is exactly one (last) Node with a null next reference, - * which is CASed when enqueueing. This last Node can be - * reached in O(1) time from tail, but tail is merely an - * optimization - it can always be reached in O(N) time from - * head as well. - * - The elements contained in the queue are the non-null items in - * Nodes that are reachable from head. CASing the item - * reference of a Node to null atomically removes it from the - * queue. Reachability of all elements from head must remain - * true even in the case of concurrent modifications that cause - * head to advance. A dequeued Node may remain in use - * indefinitely due to creation of an Iterator or simply a - * poll() that has lost its time slice. - * - * The above might appear to imply that all Nodes are GC-reachable - * from a predecessor dequeued Node. That would cause two problems: - * - allow a rogue Iterator to cause unbounded memory retention - * - cause cross-generational linking of old Nodes to new Nodes if - * a Node was tenured while live, which generational GCs have a - * hard time dealing with, causing repeated major collections. - * However, only non-deleted Nodes need to be reachable from - * dequeued Nodes, and reachability does not necessarily have to - * be of the kind understood by the GC. We use the trick of - * linking a Node that has just been dequeued to itself. Such a - * self-link implicitly means to advance to head. - * - * Both head and tail are permitted to lag. In fact, failing to - * update them every time one could is a significant optimization - * (fewer CASes). As with LinkedTransferQueue (see the internal - * documentation for that class), we use a slack threshold of two; - * that is, we update head/tail when the current pointer appears - * to be two or more steps away from the first/last node. - * - * Since head and tail are updated concurrently and independently, - * it is possible for tail to lag behind head (why not)? - * - * CASing a Node's item reference to null atomically removes the - * element from the queue. Iterators skip over Nodes with null - * items. Prior implementations of this class had a race between - * poll() and remove(Object) where the same element would appear - * to be successfully removed by two concurrent operations. The - * method remove(Object) also lazily unlinks deleted Nodes, but - * this is merely an optimization. - * - * When constructing a Node (before enqueuing it) we avoid paying - * for a volatile write to item by using Unsafe.putObject instead - * of a normal write. This allows the cost of enqueue to be - * "one-and-a-half" CASes. - * - * Both head and tail may or may not point to a Node with a - * non-null item. If the queue is empty, all items must of course - * be null. Upon creation, both head and tail refer to a dummy - * Node with null item. Both head and tail are only updated using - * CAS, so they never regress, although again this is merely an - * optimization. - */ -abstract class Item0 { - public volatile E item; -} - -abstract class Pad0 extends Item0{ - volatile long z0, z1, z2, z4, z5, z6 = 7L; -} - -abstract class Item1 extends Pad0 { - public volatile Node next; -} - -class Node extends Item1 { - volatile long z0, z1, z2, z4, z5, z6 = 7L; - - - /** - * Constructs a new node. Uses relaxed write because item can - * only be seen after publication via casNext. + /* + * This is a modification of the Michael & Scott algorithm, + * adapted for a garbage-collected environment, with support for + * interior node deletion (to support remove(Object)). For + * explanation, read the paper. + * + * Note that like most non-blocking algorithms in this package, + * this implementation relies on the fact that in garbage + * collected systems, there is no possibility of ABA problems due + * to recycled nodes, so there is no need to use "counted + * pointers" or related techniques seen in versions used in + * non-GC'ed settings. + * + * The fundamental invariants are: + * - There is exactly one (last) Node with a null next reference, + * which is CASed when enqueueing. This last Node can be + * reached in O(1) time from tail, but tail is merely an + * optimization - it can always be reached in O(N) time from + * head as well. + * - The elements contained in the queue are the non-null items in + * Nodes that are reachable from head. CASing the item + * reference of a Node to null atomically removes it from the + * queue. Reachability of all elements from head must remain + * true even in the case of concurrent modifications that cause + * head to advance. A dequeued Node may remain in use + * indefinitely due to creation of an Iterator or simply a + * poll() that has lost its time slice. + * + * The above might appear to imply that all Nodes are GC-reachable + * from a predecessor dequeued Node. That would cause two problems: + * - allow a rogue Iterator to cause unbounded memory retention + * - cause cross-generational linking of old Nodes to new Nodes if + * a Node was tenured while live, which generational GCs have a + * hard time dealing with, causing repeated major collections. + * However, only non-deleted Nodes need to be reachable from + * dequeued Nodes, and reachability does not necessarily have to + * be of the kind understood by the GC. We use the trick of + * linking a Node that has just been dequeued to itself. Such a + * self-link implicitly means to advance to head. + * + * Both head and tail are permitted to lag. In fact, failing to + * update them every time one could is a significant optimization + * (fewer CASes). As with LinkedTransferQueue (see the internal + * documentation for that class), we use a slack threshold of two; + * that is, we update head/tail when the current pointer appears + * to be two or more steps away from the first/last node. + * + * Since head and tail are updated concurrently and independently, + * it is possible for tail to lag behind head (why not)? + * + * CASing a Node's item reference to null atomically removes the + * element from the queue. Iterators skip over Nodes with null + * items. Prior implementations of this class had a race between + * poll() and remove(Object) where the same element would appear + * to be successfully removed by two concurrent operations. The + * method remove(Object) also lazily unlinks deleted Nodes, but + * this is merely an optimization. + * + * When constructing a Node (before enqueuing it) we avoid paying + * for a volatile write to item by using Unsafe.putObject instead + * of a normal write. This allows the cost of enqueue to be + * "one-and-a-half" CASes. + * + * Both head and tail may or may not point to a Node with a + * non-null item. If the queue is empty, all items must of course + * be null. Upon creation, both head and tail refer to a dummy + * Node with null item. Both head and tail are only updated using + * CAS, so they never regress, although again this is merely an + * optimization. */ - Node(E item) { - UNSAFE.putObject(this, itemOffset, item); - } - boolean casItem(E cmp, E val) { - return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); - } + private static class Node { + volatile E item; + volatile Node next; - void lazySetNext(Node val) { - UNSAFE.putOrderedObject(this, nextOffset, val); - } + /** + * Constructs a new node. Uses relaxed write because item can + * only be seen after publication via casNext. + */ + Node(E item) { + UNSAFE.putObject(this, itemOffset, item); + } - boolean casNext(Node cmp, Node val) { - return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); - } + boolean casItem(E cmp, E val) { + return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); + } - // Unsafe mechanics + void lazySetNext(Node val) { + UNSAFE.putOrderedObject(this, nextOffset, val); + } - private static final sun.misc.Unsafe UNSAFE; - private static final long itemOffset; - private static final long nextOffset; + boolean casNext(Node cmp, Node val) { + return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); + } - static { - try { - UNSAFE = UnsafeAccess.UNSAFE; + // Unsafe mechanics - itemOffset = UNSAFE.objectFieldOffset(Node.class.getField("item")); - nextOffset = UNSAFE.objectFieldOffset(Node.class.getField("next")); - } catch (Exception e) { - throw new Error(e); + private static final sun.misc.Unsafe UNSAFE; + private static final long itemOffset; + private static final long nextOffset; + + static { + try { + UNSAFE = sun.misc.Unsafe.getUnsafe(); + Class k = Node.class; + itemOffset = UNSAFE.objectFieldOffset + (k.getDeclaredField("item")); + nextOffset = UNSAFE.objectFieldOffset + (k.getDeclaredField("next")); + } catch (Exception e) { + throw new Error(e); + } } } -} - -abstract class CLQItem0 extends AbstractQueue { /** * A node from which the first live (non-deleted) node (if any) * can be reached in O(1) time. @@ -220,15 +221,8 @@ abstract class CLQItem0 extends AbstractQueue { * - it is permitted for tail to lag behind head, that is, for tail * to not be reachable from head! */ - public transient volatile Node head; -} + private transient volatile Node head; - -abstract class CLQPad0 extends CLQItem0 { - volatile long z0, z1, z2, z4, z5, z6 = 7L; -} - -abstract class CLQItem1 extends CLQPad0 { /** * A node from which the last node on list (that is, the unique * node with node.next == null) can be reached in O(1) time. @@ -241,21 +235,13 @@ abstract class CLQItem1 extends CLQPad0 { * to not be reachable from head! * - tail.next may or may not be self-pointing to tail. */ - public volatile Node tail; -} - - -public class ConcurrentLinkedQueue2 extends CLQItem1 - implements Queue, java.io.Serializable { - private static final long serialVersionUID = 196745693267521676L; - - volatile long z0, z1, z2, z4, z5, z6 = 7L; + private transient volatile Node tail; /** * Creates a {@code ConcurrentLinkedQueue} that is initially empty. */ public ConcurrentLinkedQueue2() { - this.head = this.tail = new Node(null); + head = tail = new Node(null); } /** @@ -272,18 +258,17 @@ public class ConcurrentLinkedQueue2 extends CLQItem1 for (E e : c) { checkNotNull(e); Node newNode = new Node(e); - if (h == null) { + if (h == null) h = t = newNode; - } else { + else { t.lazySetNext(newNode); t = newNode; } } - if (h == null) { + if (h == null) h = t = new Node(null); - } - this.head = h; - this.tail = t; + head = h; + tail = t; } // Have to override just to update the javadoc @@ -296,7 +281,6 @@ public class ConcurrentLinkedQueue2 extends CLQItem1 * @return {@code true} (as specified by {@link Collection#add}) * @throws NullPointerException if the specified element is null */ - @Override public boolean add(E e) { return offer(e); } @@ -306,9 +290,8 @@ public class ConcurrentLinkedQueue2 extends CLQItem1 * as sentinel for succ(), below. */ final void updateHead(Node h, Node p) { - if (h != p && casHead(h, p)) { + if (h != p && casHead(h, p)) h.lazySetNext(h); - } } /** @@ -318,9 +301,15 @@ public class ConcurrentLinkedQueue2 extends CLQItem1 */ final Node succ(Node p) { Node next = p.next; - return p == next ? this.head : next; + return (p == next) ? head : next; } + // can ONLY be touched by a single reader/writer (for add/offer/remove/poll/etc) + private final IdentityMap> quickLookup = new IdentityMap>(32); + + + + /** * Inserts the specified element at the tail of this queue. * As the queue is unbounded, this method will never return {@code false}. @@ -328,17 +317,11 @@ public class ConcurrentLinkedQueue2 extends CLQItem1 * @return {@code true} (as specified by {@link Queue#offer}) * @throws NullPointerException if the specified element is null */ - @Override public boolean offer(E e) { - offerNode(e); - return true; - } - - public Node offerNode(E e) { checkNotNull(e); final Node newNode = new Node(e); - for (Node t = this.tail, p = t;;) { + for (Node t = tail, p = t;;) { Node q = p.next; if (q == null) { // p is last node @@ -346,70 +329,66 @@ public class ConcurrentLinkedQueue2 extends CLQItem1 // Successful CAS is the linearization point // for e to become an element of this queue, // and for newNode to become "live". - if (p != t) - { + if (p != t) // hop two nodes at a time casTail(t, newNode); // Failure is OK. - } - return newNode; + + quickLookup.put(e, newNode); + return true; } // Lost CAS race to another thread; re-read next } - else if (p == q) { + else if (p == q) // We have fallen off list. If tail is unchanged, it // will also be off-list, in which case we need to // jump to head, from which all live nodes are always // reachable. Else the new tail is a better bet. - p = t != (t = this.tail) ? t : this.head; - } else { + p = (t != (t = tail)) ? t : head; + else // Check for tail updates after two hops. - p = p != t && t != (t = this.tail) ? t : q; - } + p = (p != t && t != (t = tail)) ? t : q; } } - @Override public E poll() { restartFromHead: for (;;) { - for (Node h = this.head, p = h, q;;) { + for (Node h = head, p = h, q;;) { E item = p.item; if (item != null && p.casItem(item, null)) { // Successful CAS is the linearization point // for item to be removed from this queue. - if (p != h) { - updateHead(h, (q = p.next) != null ? q : p); - } + if (p != h) // hop two nodes at a time + updateHead(h, ((q = p.next) != null) ? q : p); + + quickLookup.remove(item); return item; } else if ((q = p.next) == null) { updateHead(h, p); return null; } - else if (p == q) { + else if (p == q) continue restartFromHead; - } else { + else p = q; - } } } } - @Override public E peek() { restartFromHead: for (;;) { - for (Node h = this.head, p = h, q;;) { + for (Node h = head, p = h, q;;) { E item = p.item; if (item != null || (q = p.next) == null) { updateHead(h, p); return item; } - else if (p == q) { + else if (p == q) continue restartFromHead; - } else { + else p = q; - } } } } @@ -425,17 +404,16 @@ public class ConcurrentLinkedQueue2 extends CLQItem1 Node first() { restartFromHead: for (;;) { - for (Node h = this.head, p = h, q;;) { - boolean hasItem = p.item != null; + for (Node h = head, p = h, q;;) { + boolean hasItem = (p.item != null); if (hasItem || (q = p.next) == null) { updateHead(h, p); return hasItem ? p : null; } - else if (p == q) { + else if (p == q) continue restartFromHead; - } else { + else p = q; - } } } } @@ -445,7 +423,6 @@ public class ConcurrentLinkedQueue2 extends CLQItem1 * * @return {@code true} if this queue contains no elements */ - @Override public boolean isEmpty() { return first() == null; } @@ -466,17 +443,13 @@ public class ConcurrentLinkedQueue2 extends CLQItem1 * * @return the number of elements in this queue */ - @Override public int size() { int count = 0; - for (Node p = first(); p != null; p = succ(p)) { - if (p.item != null) { + for (Node p = first(); p != null; p = succ(p)) + if (p.item != null) // Collection.size() spec says to max out - if (++count == Integer.MAX_VALUE) { + if (++count == Integer.MAX_VALUE) break; - } - } - } return count; } @@ -488,16 +461,12 @@ public class ConcurrentLinkedQueue2 extends CLQItem1 * @param o object to be checked for containment in this queue * @return {@code true} if this queue contains the specified element */ - @Override public boolean contains(Object o) { - if (o == null) { - return false; - } + if (o == null) return false; for (Node p = first(); p != null; p = succ(p)) { E item = p.item; - if (item != null && o.equals(item)) { + if (item != null && o.equals(item)) return true; - } } return false; } @@ -513,11 +482,9 @@ public class ConcurrentLinkedQueue2 extends CLQItem1 * @param o element to be removed from this queue, if present * @return {@code true} if this queue changed as a result of the call */ - @Override public boolean remove(Object o) { - if (o == null) { - return false; - } + if (o == null) return false; + Node pred = null; for (Node p = first(); p != null; p = succ(p)) { E item = p.item; @@ -525,9 +492,10 @@ public class ConcurrentLinkedQueue2 extends CLQItem1 o.equals(item) && p.casItem(item, null)) { Node next = succ(p); - if (pred != null && next != null) { + if (pred != null && next != null) pred.casNext(p, next); - } + + quickLookup.remove(o); return true; } pred = p; @@ -547,31 +515,28 @@ public class ConcurrentLinkedQueue2 extends CLQItem1 * of its elements are null * @throws IllegalArgumentException if the collection is this queue */ - @Override public boolean addAll(Collection c) { - if (c == this) { + if (c == this) // As historically specified in AbstractQueue#addAll throw new IllegalArgumentException(); - } // Copy c into a private chain of Nodes Node beginningOfTheEnd = null, last = null; for (E e : c) { checkNotNull(e); Node newNode = new Node(e); - if (beginningOfTheEnd == null) { + if (beginningOfTheEnd == null) beginningOfTheEnd = last = newNode; - } else { + else { last.lazySetNext(newNode); last = newNode; } } - if (beginningOfTheEnd == null) { + if (beginningOfTheEnd == null) return false; - } // Atomically append the chain at the tail of this collection - for (Node t = this.tail, p = t;;) { + for (Node t = tail, p = t;;) { Node q = p.next; if (q == null) { // p is last node @@ -581,25 +546,23 @@ public class ConcurrentLinkedQueue2 extends CLQItem1 if (!casTail(t, last)) { // Try a little harder to update tail, // since we may be adding many elements. - t = this.tail; - if (last.next == null) { + t = tail; + if (last.next == null) casTail(t, last); - } } return true; } // Lost CAS race to another thread; re-read next } - else if (p == q) { + else if (p == q) // We have fallen off list. If tail is unchanged, it // will also be off-list, in which case we need to // jump to head, from which all live nodes are always // reachable. Else the new tail is a better bet. - p = t != (t = this.tail) ? t : this.head; - } else { + p = (t != (t = tail)) ? t : head; + else // Check for tail updates after two hops. - p = p != t && t != (t = this.tail) ? t : q; - } + p = (p != t && t != (t = tail)) ? t : q; } } @@ -616,15 +579,13 @@ public class ConcurrentLinkedQueue2 extends CLQItem1 * * @return an array containing all of the elements in this queue */ - @Override public Object[] toArray() { // Use ArrayList to deal with resizing. ArrayList al = new ArrayList(); for (Node p = first(); p != null; p = succ(p)) { E item = p.item; - if (item != null) { + if (item != null) al.add(item); - } } return al.toArray(); } @@ -664,7 +625,6 @@ public class ConcurrentLinkedQueue2 extends CLQItem1 * this queue * @throws NullPointerException if the specified array is null */ - @Override @SuppressWarnings("unchecked") public T[] toArray(T[] a) { // try to use sent-in array @@ -672,14 +632,12 @@ public class ConcurrentLinkedQueue2 extends CLQItem1 Node p; for (p = first(); p != null && k < a.length; p = succ(p)) { E item = p.item; - if (item != null) { + if (item != null) a[k++] = (T)item; - } } if (p == null) { - if (k < a.length) { + if (k < a.length) a[k] = null; - } return a; } @@ -687,9 +645,8 @@ public class ConcurrentLinkedQueue2 extends CLQItem1 ArrayList al = new ArrayList(); for (Node q = first(); q != null; q = succ(q)) { E item = q.item; - if (item != null) { + if (item != null) al.add(item); - } } return al.toArray(a); } @@ -698,16 +655,11 @@ public class ConcurrentLinkedQueue2 extends CLQItem1 * Returns an iterator over the elements in this queue in proper sequence. * The elements will be returned in order from first (head) to last (tail). * - *

The returned iterator is a "weakly consistent" iterator that - * will never throw {@link java.util.ConcurrentModificationException - * ConcurrentModificationException}, and guarantees to traverse - * elements as they existed upon construction of the iterator, and - * may (but is not guaranteed to) reflect any modifications - * subsequent to construction. + *

The returned iterator is + * weakly consistent. * * @return an iterator over the elements in this queue in proper sequence */ - @Override public Iterator iterator() { return new Itr(); } @@ -740,73 +692,67 @@ public class ConcurrentLinkedQueue2 extends CLQItem1 * next(), or null if no such. */ private E advance() { - this.lastRet = this.nextNode; - E x = this.nextItem; + lastRet = nextNode; + E x = nextItem; Node pred, p; - if (this.nextNode == null) { + if (nextNode == null) { p = first(); pred = null; } else { - pred = this.nextNode; - p = succ(this.nextNode); + pred = nextNode; + p = succ(nextNode); } for (;;) { if (p == null) { - this.nextNode = null; - this.nextItem = null; + nextNode = null; + nextItem = null; return x; } E item = p.item; if (item != null) { - this.nextNode = p; - this.nextItem = item; + nextNode = p; + nextItem = item; return x; } else { // skip over nulls Node next = succ(p); - if (pred != null && next != null) { + if (pred != null && next != null) pred.casNext(p, next); - } p = next; } } } - @Override public boolean hasNext() { - return this.nextNode != null; + return nextNode != null; } - @Override public E next() { - if (this.nextNode == null) { - throw new NoSuchElementException(); - } + if (nextNode == null) throw new NoSuchElementException(); return advance(); } - @Override public void remove() { - Node l = this.lastRet; - if (l == null) { - throw new IllegalStateException(); - } + Node l = lastRet; + if (l == null) throw new IllegalStateException(); // rely on a future traversal to relink. l.item = null; - this.lastRet = null; + lastRet = null; } } /** * Saves this queue to a stream (that is, serializes it). * + * @param s the stream + * @throws java.io.IOException if an I/O error occurs * @serialData All of the elements (each an {@code E}) in * the proper order, followed by a null */ private void writeObject(java.io.ObjectOutputStream s) - throws java.io.IOException { + throws java.io.IOException { // Write out any hidden stuff s.defaultWriteObject(); @@ -814,9 +760,8 @@ public class ConcurrentLinkedQueue2 extends CLQItem1 // Write out all elements in the proper order. for (Node p = first(); p != null; p = succ(p)) { Object item = p.item; - if (item != null) { + if (item != null) s.writeObject(item); - } } // Use trailing null as sentinel @@ -825,9 +770,13 @@ public class ConcurrentLinkedQueue2 extends CLQItem1 /** * Reconstitutes this queue from a stream (that is, deserializes it). + * @param s the stream + * @throws ClassNotFoundException if the class of a serialized object + * could not be found + * @throws java.io.IOException if an I/O error occurs */ private void readObject(java.io.ObjectInputStream s) - throws java.io.IOException, ClassNotFoundException { + throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); // Read in elements until trailing null sentinel found @@ -836,18 +785,124 @@ public class ConcurrentLinkedQueue2 extends CLQItem1 while ((item = s.readObject()) != null) { @SuppressWarnings("unchecked") Node newNode = new Node((E) item); - if (h == null) { + if (h == null) h = t = newNode; - } else { + else { t.lazySetNext(newNode); t = newNode; } } - if (h == null) { + if (h == null) h = t = new Node(null); + head = h; + tail = t; + } + + /** A customized variant of Spliterators.IteratorSpliterator */ + static final class CLQSpliterator implements Spliterator { + static final int MAX_BATCH = 1 << 25; // max batch array size; + final ConcurrentLinkedQueue2 queue; + Node current; // current node; null until initialized + int batch; // batch size for splits + boolean exhausted; // true when no more nodes + CLQSpliterator(ConcurrentLinkedQueue2 queue) { + this.queue = queue; } - this.head = h; - this.tail = t; + + public Spliterator trySplit() { + Node p; + final ConcurrentLinkedQueue2 q = this.queue; + int b = batch; + int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1; + if (!exhausted && + ((p = current) != null || (p = q.first()) != null) && + p.next != null) { + Object[] a = new Object[n]; + int i = 0; + do { + if ((a[i] = p.item) != null) + ++i; + if (p == (p = p.next)) + p = q.first(); + } while (p != null && i < n); + if ((current = p) == null) + exhausted = true; + if (i > 0) { + batch = i; + return Spliterators.spliterator + (a, 0, i, Spliterator.ORDERED | Spliterator.NONNULL | + Spliterator.CONCURRENT); + } + } + return null; + } + + public void forEachRemaining(Consumer action) { + Node p; + if (action == null) throw new NullPointerException(); + final ConcurrentLinkedQueue2 q = this.queue; + if (!exhausted && + ((p = current) != null || (p = q.first()) != null)) { + exhausted = true; + do { + E e = p.item; + if (p == (p = p.next)) + p = q.first(); + if (e != null) + action.accept(e); + } while (p != null); + } + } + + public boolean tryAdvance(Consumer action) { + Node p; + if (action == null) throw new NullPointerException(); + final ConcurrentLinkedQueue2 q = this.queue; + if (!exhausted && + ((p = current) != null || (p = q.first()) != null)) { + E e; + do { + e = p.item; + if (p == (p = p.next)) + p = q.first(); + } while (e == null && p != null); + if ((current = p) == null) + exhausted = true; + if (e != null) { + action.accept(e); + return true; + } + } + return false; + } + + public long estimateSize() { return Long.MAX_VALUE; } + + public int characteristics() { + return Spliterator.ORDERED | Spliterator.NONNULL | + Spliterator.CONCURRENT; + } + } + + /** + * Returns a {@link Spliterator} over the elements in this queue. + * + *

The returned spliterator is + * weakly consistent. + * + *

The {@code Spliterator} reports {@link Spliterator#CONCURRENT}, + * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}. + * + * @implNote + * The {@code Spliterator} implements {@code trySplit} to permit limited + * parallelism. + * + * @return a {@code Spliterator} over the elements in this queue + * @since 1.8 + */ + @Override + public Spliterator spliterator() { + return new CLQSpliterator(this); } /** @@ -856,9 +911,8 @@ public class ConcurrentLinkedQueue2 extends CLQItem1 * @param v the element */ private static void checkNotNull(Object v) { - if (v == null) { + if (v == null) throw new NullPointerException(); - } } private boolean casTail(Node cmp, Node val) { @@ -876,11 +930,12 @@ public class ConcurrentLinkedQueue2 extends CLQItem1 private static final long tailOffset; static { try { - UNSAFE = UnsafeAccess.UNSAFE; - Class k = ConcurrentLinkedQueue2.class; - - headOffset = UNSAFE.objectFieldOffset(k.getField("head")); - tailOffset = UNSAFE.objectFieldOffset(k.getField("tail")); + UNSAFE = sun.misc.Unsafe.getUnsafe(); + Class k = ConcurrentLinkedQueue.class; + headOffset = UNSAFE.objectFieldOffset + (k.getDeclaredField("head")); + tailOffset = UNSAFE.objectFieldOffset + (k.getDeclaredField("tail")); } catch (Exception e) { throw new Error(e); } diff --git a/src/dorkbox/util/messagebus/common/thread/ConcurrentSet.java b/src/dorkbox/util/messagebus/common/thread/ConcurrentSet.java deleted file mode 100644 index 865d2e3..0000000 --- a/src/dorkbox/util/messagebus/common/thread/ConcurrentSet.java +++ /dev/null @@ -1,315 +0,0 @@ -/* - * Copyright 2015 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dorkbox.util.messagebus.common.thread; - -import java.util.Collection; -import java.util.Iterator; -import java.util.Map; -import java.util.NoSuchElementException; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicLong; - -/** - * This data structure is optimized for non-blocking reads even when write operations occur. - * Running read iterators will not be affected by add operations since writes always insert at the head of the - * structure. Remove operations can affect any running iterator such that a removed element that has not yet - * been reached by the iterator will not appear in that iterator anymore. - */ -public -class ConcurrentSet extends ConcurrentLinkedQueue2 { - private static final long serialVersionUID = -2729855178402529784L; - - private static final AtomicLong id = new AtomicLong(); - private final transient long ID = id.getAndIncrement(); - - - private final Node IN_PROGRESS_MARKER = new Node(null); - private ConcurrentMap> entries; - - public - ConcurrentSet() { - this(16, 0.75f, Runtime.getRuntime().availableProcessors()); - } - - public - ConcurrentSet(int size, float loadFactor, int stripeSize) { - super(); - this.entries = new ConcurrentHashMap<>(size, loadFactor, stripeSize); - } - - @Override - public - boolean add(T element) { - if (element == null) { - return false; - } - - // had to modify the super implementation so we publish Node back - Node alreadyPresent = this.entries.putIfAbsent(element, this.IN_PROGRESS_MARKER); - if (alreadyPresent == null) { - // this doesn't already exist - Node offerNode = super.offerNode(element); - - this.entries.put(element, offerNode); - return true; - } - - return false; - } - - @Override - public - boolean contains(Object element) { - if (element == null) { - return false; - } - - Node node; - while ((node = this.entries.get(element)) == this.IN_PROGRESS_MARKER) { - ; // data race - } - - if (node == null) { - return false; - } - - return node.item != null; - } - - @Override - public - int size() { - return this.entries.size(); - } - - @Override - public - boolean isEmpty() { - return super.isEmpty(); - } - - /** - * @return TRUE if the element was successfully removed - */ - @Override - public - boolean remove(Object element) { - while (this.entries.get(element) == this.IN_PROGRESS_MARKER) { - ; // data race - } - - Node node = this.entries.remove(element); - if (node == null) { - return false; - } - - Node pred = null; - for (Node p = this.head; p != null; p = succ(p)) { - T item = p.item; - if (item != null && - element.equals(item) && - p.casItem(item, null)) { - Node next = succ(p); - if (pred != null && next != null) { - pred.casNext(p, next); - } - return true; - } - pred = p; - } - return false; - } - - @Override - public - Iterator iterator() { - return new Itr2(); - } - - private - class Itr2 implements Iterator { - /** - * Next node to return item for. - */ - private Node nextNode; - - /** - * Node of the last returned item, to support remove. - */ - private Node lastRet; - - /** - * nextItem holds on to item fields because once we claim - * that an element exists in hasNext(), we must return it in - * the following next() call even if it was in the process of - * being removed when hasNext() was called. - */ - private T nextItem; - - Itr2() { - advance(); - } - - /** - * Moves to next valid node and returns item to return for - * next(), or null if no such. - */ - private - T advance() { - this.lastRet = this.nextNode; // for removing items via iterator - T nextItem = this.nextItem; - - Node pred, p; - if (this.nextNode == null) { - p = first(); - pred = null; - } - else { - pred = this.nextNode; - p = succ(this.nextNode); - } - - for (; ; ) { - if (p == null) { - this.nextNode = null; - this.nextItem = null; - return nextItem; - } - - T item = p.item; - if (item != null) { - this.nextNode = p; - this.nextItem = item; - return nextItem; - } - else { - // skip over nulls - Node next = succ(p); - if (pred != null && next != null) { - pred.casNext(p, next); - } - p = next; - } - } - } - - - - @Override - public - boolean hasNext() { - return this.nextNode != null; - } - - @Override - public - T next() { - if (this.nextNode == null) { - throw new NoSuchElementException(); - } - return advance(); - } - - @Override - public - void remove() { - Node l = this.lastRet; - if (l == null) { - throw new IllegalStateException(); - } - - T value = l.item; - - if (value != null) { - Map> entries2 = ConcurrentSet.this.entries; - while (entries2.get(value) == ConcurrentSet.this.IN_PROGRESS_MARKER) { - ; // data race - } - - entries2.remove(value); - - // rely on a future traversal to relink. - l.item = null; - this.lastRet = null; - } - } - } - - @Override - public - Object[] toArray() { - return this.entries.keySet().toArray(); - } - - @Override - public - T[] toArray(T[] a) { - return this.entries.keySet().toArray(a); - } - - @Override - public - boolean containsAll(Collection c) { - throw new UnsupportedOperationException("Not implemented"); - } - - @Override - public - boolean removeAll(Collection c) { - throw new UnsupportedOperationException("Not implemented"); - } - - @Override - public - boolean retainAll(Collection c) { - throw new UnsupportedOperationException("Not implemented"); - } - - @Override - public - void clear() { - super.clear(); - } - - @Override - public - int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + (int) (this.ID ^ this.ID >>> 32); - return result; - } - - @Override - public - boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; - } - ConcurrentSet other = (ConcurrentSet) obj; - if (this.ID != other.ID) { - return false; - } - return true; - } -} diff --git a/src/dorkbox/util/messagebus/subscription/Subscription.java b/src/dorkbox/util/messagebus/subscription/Subscription.java index 1bca471..342101e 100644 --- a/src/dorkbox/util/messagebus/subscription/Subscription.java +++ b/src/dorkbox/util/messagebus/subscription/Subscription.java @@ -37,28 +37,27 @@ */ package dorkbox.util.messagebus.subscription; +import com.esotericsoftware.kryo.util.IdentityMap; import com.esotericsoftware.reflectasm.MethodAccess; import dorkbox.util.messagebus.common.MessageHandler; import dorkbox.util.messagebus.dispatch.IHandlerInvocation; import dorkbox.util.messagebus.dispatch.ReflectiveHandlerInvocation; import dorkbox.util.messagebus.dispatch.SynchronizedHandlerInvocation; -import java.util.Collection; -import java.util.Comparator; -import java.util.Iterator; -import java.util.concurrent.ConcurrentSkipListSet; +import java.util.Arrays; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; /** - * A subscription is a thread-safe container that manages exactly one message handler of all registered - * message listeners of the same class, i.e. all subscribed instances (excluding subclasses) of a SingleMessageHandler.class - * will be referenced in the subscription created for SingleMessageHandler.class. + * A subscription is a container that manages exactly one message handler of all registered + * message listeners of the same class, i.e. all subscribed instances (excluding subclasses) of a message + * will be referenced in the subscription created for a message. *

* There will be as many unique subscription objects per message listener class as there are message handlers * defined in the message listeners class hierarchy. *

- * The subscription provides functionality for message publication by means of delegation to the respective - * message dispatcher. + * This class uses the "single writer principle", so that the subscription are only MODIFIED by a single thread, + * but are READ by X number of threads (in a safe way). This uses object thread visibility/publication to work. * * @author bennidi * @author dorkbox, llc @@ -66,37 +65,32 @@ import java.util.concurrent.atomic.AtomicInteger; */ public final class Subscription { + private static final int GROW_SIZE = 8; + private static final AtomicInteger ID_COUNTER = new AtomicInteger(); public final int ID = ID_COUNTER.getAndIncrement(); // the handler's metadata -> for each handler in a listener, a unique subscription context is created private final MessageHandler handler; - private final IHandlerInvocation invocation; - private final Collection listeners; + + // NOTE: this is still inside the single-writer! can use the same techniques as subscription manager (for thread safe publication) + private int firstFreeSpot = 0; // only touched by a single thread + private volatile Object[] listeners = new Object[GROW_SIZE]; // only modified by a single thread + + private final IdentityMap listenerMap = new IdentityMap<>(GROW_SIZE); + + // Recommended for best performance while adhering to the "single writer principle". Must be static-final + private static final AtomicReferenceFieldUpdater listenersREF = + AtomicReferenceFieldUpdater.newUpdater(Subscription.class, + Object[].class, + "listeners"); + public Subscription(final MessageHandler handler) { this.handler = handler; - -// this.listeners = Collections.newSetFromMap(new ConcurrentHashMap(8)); // really bad performance -// this.listeners = new StrongConcurrentSetV8(16, 0.7F, 8); - - ///this is by far, the fastest - this.listeners = new ConcurrentSkipListSet<>(new Comparator() { - @Override - public - int compare(final Object o1, final Object o2) { - return Integer.compare(o1.hashCode(), o2.hashCode()); -// return 0; - } - }); -// this.listeners = new StrongConcurrentSet(16, 0.85F); -// this.listeners = new ConcurrentLinkedQueue2(); -// this.listeners = new CopyOnWriteArrayList(); -// this.listeners = new CopyOnWriteArraySet(); // not very good - IHandlerInvocation invocation = new ReflectiveHandlerInvocation(); if (handler.isSynchronized()) { invocation = new SynchronizedHandlerInvocation(invocation); @@ -111,27 +105,82 @@ class Subscription { } public - boolean isEmpty() { - return this.listeners.isEmpty(); - } + void subscribe(final Object listener) { + // single writer principle! - public - void subscribe(Object listener) { - this.listeners.add(listener); + Object[] localListeners = listenersREF.get(this); + + final int length = localListeners.length; + int spotToPlace = firstFreeSpot; + + while (true) { + if (spotToPlace >= length) { + // if we couldn't find a place to put the listener, grow the array, but it is never shrunk + localListeners = Arrays.copyOf(localListeners, length + GROW_SIZE, Object[].class); + break; + } + + if (localListeners[spotToPlace] == null) { + break; + } + spotToPlace++; + } + + listenerMap.put(listener, spotToPlace); + localListeners[spotToPlace] = listener; + + // mark this spot as taken, so the next subscribe starts out a little ahead + firstFreeSpot = spotToPlace + 1; + listenersREF.lazySet(this, localListeners); } /** * @return TRUE if the element was removed */ public - boolean unsubscribe(Object existingListener) { - return this.listeners.remove(existingListener); + boolean unsubscribe(final Object listener) { + // single writer principle! + + final Integer integer = listenerMap.remove(listener); + Object[] localListeners = listenersREF.get(this); + + if (integer != null) { + final int index = integer; + firstFreeSpot = index; + localListeners[index] = null; + listenersREF.lazySet(this, localListeners); + return true; + } + else { + for (int i = 0; i < localListeners.length; i++) { + if (localListeners[i] == listener) { + firstFreeSpot = i; + localListeners[i] = null; + listenersREF.lazySet(this, localListeners); + return true; + } + } + } + + firstFreeSpot = 0; + return false; } - // only used in unit-test + /** + * only used in unit tests + */ public int size() { - return this.listeners.size(); + // since this is ONLY used in unit tests, we count how many are non-null + + int count = 0; + for (int i = 0; i < listeners.length; i++) { + if (listeners[i] != null) { + count++; + } + } + + return count; } public @@ -140,62 +189,62 @@ class Subscription { final int handleIndex = this.handler.getMethodIndex(); final IHandlerInvocation invocation = this.invocation; - Iterator iterator; Object listener; - - for (iterator = this.listeners.iterator(); iterator.hasNext(); ) { - listener = iterator.next(); - - invocation.invoke(listener, handler, handleIndex, message); + final Object[] localListeners = listenersREF.get(this); + for (int i = 0; i < localListeners.length; i++) { + listener = localListeners[i]; + if (listener != null) { + invocation.invoke(listener, handler, handleIndex, message); + } } } public void publish(final Object message1, final Object message2) throws Throwable { - final MethodAccess handler = this.handler.getHandler(); - final int handleIndex = this.handler.getMethodIndex(); - final IHandlerInvocation invocation = this.invocation; - - Iterator iterator; - Object listener; - - for (iterator = this.listeners.iterator(); iterator.hasNext(); ) { - listener = iterator.next(); - - invocation.invoke(listener, handler, handleIndex, message1, message2); - } +// final MethodAccess handler = this.handler.getHandler(); +// final int handleIndex = this.handler.getMethodIndex(); +// final IHandlerInvocation invocation = this.invocation; +// +// Iterator iterator; +// Object listener; +// +// for (iterator = this.listeners.iterator(); iterator.hasNext(); ) { +// listener = iterator.next(); +// +// invocation.invoke(listener, handler, handleIndex, message1, message2); +// } } public void publish(final Object message1, final Object message2, final Object message3) throws Throwable { - final MethodAccess handler = this.handler.getHandler(); - final int handleIndex = this.handler.getMethodIndex(); - final IHandlerInvocation invocation = this.invocation; - - Iterator iterator; - Object listener; - - for (iterator = this.listeners.iterator(); iterator.hasNext(); ) { - listener = iterator.next(); - - invocation.invoke(listener, handler, handleIndex, message1, message2, message3); - } +// final MethodAccess handler = this.handler.getHandler(); +// final int handleIndex = this.handler.getMethodIndex(); +// final IHandlerInvocation invocation = this.invocation; +// +// Iterator iterator; +// Object listener; +// +// for (iterator = this.listeners.iterator(); iterator.hasNext(); ) { +// listener = iterator.next(); +// +// invocation.invoke(listener, handler, handleIndex, message1, message2, message3); +// } } public void publish(final Object... messages) throws Throwable { - final MethodAccess handler = this.handler.getHandler(); - final int handleIndex = this.handler.getMethodIndex(); - final IHandlerInvocation invocation = this.invocation; - - Iterator iterator; - Object listener; - - for (iterator = this.listeners.iterator(); iterator.hasNext(); ) { - listener = iterator.next(); - - invocation.invoke(listener, handler, handleIndex, messages); - } +// final MethodAccess handler = this.handler.getHandler(); +// final int handleIndex = this.handler.getMethodIndex(); +// final IHandlerInvocation invocation = this.invocation; +// +// Iterator iterator; +// Object listener; +// +// for (iterator = this.listeners.iterator(); iterator.hasNext(); ) { +// listener = iterator.next(); +// +// invocation.invoke(listener, handler, handleIndex, messages); +// } } @@ -207,7 +256,7 @@ class Subscription { @Override public - boolean equals(Object obj) { + boolean equals(final Object obj) { if (this == obj) { return true; } diff --git a/src/dorkbox/util/messagebus/subscription/SubscriptionManager.java b/src/dorkbox/util/messagebus/subscription/SubscriptionManager.java index 04b7b07..25f3c82 100644 --- a/src/dorkbox/util/messagebus/subscription/SubscriptionManager.java +++ b/src/dorkbox/util/messagebus/subscription/SubscriptionManager.java @@ -24,6 +24,7 @@ import dorkbox.util.messagebus.utils.SubscriptionUtils; import dorkbox.util.messagebus.utils.VarArgUtils; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; @@ -90,23 +91,23 @@ class SubscriptionManager { private final ClassUtils classUtils; - // Recommended for best performance while adhering to the "single writer principle" - private final AtomicReferenceFieldUpdater subsSingleREF = + // Recommended for best performance while adhering to the "single writer principle". Must be static-final + private static final AtomicReferenceFieldUpdater subsSingleREF = AtomicReferenceFieldUpdater.newUpdater(SubscriptionManager.class, - IdentityMap.class, - "subsSingle"); + IdentityMap.class, + "subsSingle"); - private final AtomicReferenceFieldUpdater subsSuperSingleREF = + private static final AtomicReferenceFieldUpdater subsSuperSingleREF = AtomicReferenceFieldUpdater.newUpdater(SubscriptionManager.class, IdentityMap.class, "subsSuperSingle"); - private final AtomicReferenceFieldUpdater subsVaritySingleREF = + private static final AtomicReferenceFieldUpdater subsVaritySingleREF = AtomicReferenceFieldUpdater.newUpdater(SubscriptionManager.class, IdentityMap.class, "subsVaritySingle"); - private final AtomicReferenceFieldUpdater subsSuperVaritySingleREF = + private static final AtomicReferenceFieldUpdater subsSuperVaritySingleREF = AtomicReferenceFieldUpdater.newUpdater(SubscriptionManager.class, IdentityMap.class, "subsSuperVaritySingle"); @@ -255,8 +256,9 @@ class SubscriptionManager { // now subsPerMessageSingle has a unique list of subscriptions for a specific handlerType, and MAY already have subscriptions + // activates this sub for sub/unsub (only used by the subscription writer thread) + subsPerListener.put(listenerClass, subscriptions); - subsPerListener.put(listenerClass, subscriptions); // activates this sub for sub/unsub // we can now safely add for publication AND subscribe since the data structures are consistent for (int i = 0; i < handlersSize; i++) { @@ -275,10 +277,9 @@ class SubscriptionManager { final Subscription[] currentSubs = localSubs.get(handlerType); final int currentLength = currentSubs.length; - // add the new subscription to the beginning of the array - final Subscription[] newSubs = new Subscription[currentLength + 1]; - newSubs[0] = subscription; - System.arraycopy(currentSubs, 0, newSubs, 1, currentLength); + // add the new subscription to the array + final Subscription[] newSubs = Arrays.copyOf(currentSubs, currentLength + 1, Subscription[].class); + newSubs[currentLength] = subscription; localSubs.put(handlerType, newSubs); // update the varity/super types @@ -328,32 +329,29 @@ class SubscriptionManager { final IdentityMap, Subscription[]> subsPerSuperMessageSingle, final IdentityMap, Subscription[]> subsPerVarityMessageSingle) { - final Class arrayVersion = this.classUtils.getArrayClass(clazz); // never returns null, cached response +// final Class arrayVersion = this.classUtils.getArrayClass(clazz); // never returns null, cached response final Class[] superClasses = this.classUtils.getSuperClasses(clazz); // never returns null, cached response Subscription sub; - - // Register Varity (Var-Arg) subscriptions - final Subscription[] arraySubs = subsPerMessageSingle.get(arrayVersion); - if (arraySubs != null) { - final int length = arraySubs.length; - final ArrayList varArgSubsAsList = new ArrayList(length); - - for (int i = 0; i < length; i++) { - sub = arraySubs[i]; - - if (sub.getHandler().acceptsVarArgs()) { - varArgSubsAsList.add(sub); - } - } - - final int size = varArgSubsAsList.size(); - if (size > 0) { - Subscription[] varArgSubs = new Subscription[size]; - varArgSubsAsList.toArray(varArgSubs); - subsPerVarityMessageSingle.put(clazz, varArgSubs); - } - } +// +// // Register Varity (Var-Arg) subscriptions +// final Subscription[] arraySubs = subsPerMessageSingle.get(arrayVersion); +// if (arraySubs != null) { +// final int length = arraySubs.length; +// final ArrayList varArgSubsAsList = new ArrayList(length); +// +// for (int i = 0; i < length; i++) { +// sub = arraySubs[i]; +// +// if (sub.getHandler().acceptsVarArgs()) { +// varArgSubsAsList.add(sub); +// } +// } +// +// if (!varArgSubsAsList.isEmpty()) { +// subsPerVarityMessageSingle.put(clazz, varArgSubsAsList.toArray(new Subscription[0])); +// } +// } @@ -378,16 +376,10 @@ class SubscriptionManager { } } - final int size = subsAsList.size(); - if (size > 0) { + if (!subsAsList.isEmpty()) { // save the subscriptions - Subscription[] subs = new Subscription[size]; - subsAsList.toArray(subs); - subsPerSuperMessageSingle.put(clazz, subs); + subsPerSuperMessageSingle.put(clazz, subsAsList.toArray(new Subscription[0])); } - - - } @@ -519,10 +511,7 @@ class SubscriptionManager { final ArrayList collection = getExactAsArray(messageClass1, messageClass2); if (collection != null) { - final Subscription[] subscriptions = new Subscription[collection.size()]; - collection.toArray(subscriptions); - - return subscriptions; + return collection.toArray(new Subscription[0]); } return null; @@ -535,10 +524,7 @@ class SubscriptionManager { final ArrayList collection = getExactAsArray(messageClass1, messageClass2, messageClass3); if (collection != null) { - final Subscription[] subscriptions = new Subscription[collection.size()]; - collection.toArray(subscriptions); - - return subscriptions; + return collection.toArray(new Subscription[0]); } return null; @@ -592,9 +578,7 @@ class SubscriptionManager { } if (collection != null) { - final Subscription[] subscriptions = new Subscription[collection.size()]; - collection.toArray(subscriptions); - return subscriptions; + return collection.toArray(new Subscription[0]); } else { return null; @@ -623,9 +607,7 @@ class SubscriptionManager { } if (collection != null) { - final Subscription[] subscriptions = new Subscription[collection.size()]; - collection.toArray(subscriptions); - return subscriptions; + return collection.toArray(new Subscription[0]); } else { return null; diff --git a/src/dorkbox/util/messagebus/utils/ReflectionUtils.java b/src/dorkbox/util/messagebus/utils/ReflectionUtils.java index 0df44f4..ca50192 100644 --- a/src/dorkbox/util/messagebus/utils/ReflectionUtils.java +++ b/src/dorkbox/util/messagebus/utils/ReflectionUtils.java @@ -65,9 +65,7 @@ class ReflectionUtils { ArrayList methods = new ArrayList(); getMethods(target, methods); - final Method[] array = new Method[methods.size()]; - methods.toArray(array); - return array; + return methods.toArray(new Method[0]); } private static @@ -124,9 +122,7 @@ class ReflectionUtils { collectInterfaces(from, superclasses); } - final Class[] classes = new Class[superclasses.size()]; - superclasses.toArray(classes); - return classes; + return superclasses.toArray(new Class[0]); } public static diff --git a/src/dorkbox/util/messagebus/utils/VarArgUtils.java b/src/dorkbox/util/messagebus/utils/VarArgUtils.java index 83ca194..4bca985 100644 --- a/src/dorkbox/util/messagebus/utils/VarArgUtils.java +++ b/src/dorkbox/util/messagebus/utils/VarArgUtils.java @@ -86,7 +86,7 @@ class VarArgUtils { } } - varArgSubs = new Subscription[varArgSubsAsList.size()]; + varArgSubs = new Subscription[0]; varArgSubsAsList.toArray(varArgSubs); local.put(messageClass, varArgSubs); diff --git a/test/dorkbox/util/messagebus/MultiMessageTest.java b/test/dorkbox/util/messagebus/MultiMessageTest.java index 58690b1..ea39294 100644 --- a/test/dorkbox/util/messagebus/MultiMessageTest.java +++ b/test/dorkbox/util/messagebus/MultiMessageTest.java @@ -25,8 +25,23 @@ public class MultiMessageTest extends MessageBusTest { MultiListener listener1 = new MultiListener(); bus.subscribe(listener1); + bus.subscribe(listener1); + bus.subscribe(listener1); + bus.subscribe(listener1); + bus.subscribe(listener1); + bus.subscribe(listener1); + bus.subscribe(listener1); + bus.subscribe(listener1); + bus.subscribe(listener1); + + bus.unsubscribe(listener1); + bus.unsubscribe(listener1); + bus.unsubscribe(listener1); bus.unsubscribe(listener1); + bus.publish("s"); + bus.publish("s"); + bus.publish("s"); bus.publish("s"); bus.publish("s", "s"); bus.publish("s", "s", "s"); @@ -84,40 +99,40 @@ public class MultiMessageTest extends MessageBusTest { System.err.println("match String"); } - @Handler - public void handleSync(String o1, String o2) { - count.getAndIncrement(); - System.err.println("match String, String"); - } - - @Handler - public void handleSync(String o1, String o2, String o3) { - count.getAndIncrement(); - System.err.println("match String, String, String"); - } - - @Handler - public void handleSync(Integer o1, Integer o2, String o3) { - count.getAndIncrement(); - System.err.println("match Integer, Integer, String"); - } - - @Handler(acceptVarargs = true) - public void handleSync(String... o) { - count.getAndIncrement(); - System.err.println("match String[]"); - } - - @Handler - public void handleSync(Integer... o) { - count.getAndIncrement(); - System.err.println("match Integer[]"); - } - - @Handler(acceptVarargs = true) - public void handleSync(Object... o) { - count.getAndIncrement(); - System.err.println("match Object[]"); - } +// @Handler +// public void handleSync(String o1, String o2) { +// count.getAndIncrement(); +// System.err.println("match String, String"); +// } +// +// @Handler +// public void handleSync(String o1, String o2, String o3) { +// count.getAndIncrement(); +// System.err.println("match String, String, String"); +// } +// +// @Handler +// public void handleSync(Integer o1, Integer o2, String o3) { +// count.getAndIncrement(); +// System.err.println("match Integer, Integer, String"); +// } +// +// @Handler(acceptVarargs = true) +// public void handleSync(String... o) { +// count.getAndIncrement(); +// System.err.println("match String[]"); +// } +// +// @Handler +// public void handleSync(Integer... o) { +// count.getAndIncrement(); +// System.err.println("match Integer[]"); +// } +// +// @Handler(acceptVarargs = true) +// public void handleSync(Object... o) { +// count.getAndIncrement(); +// System.err.println("match Object[]"); +// } } } diff --git a/test/dorkbox/util/messagebus/SubscriptionManagerTest.java b/test/dorkbox/util/messagebus/SubscriptionManagerTest.java index 2dea0d8..d9c0d44 100644 --- a/test/dorkbox/util/messagebus/SubscriptionManagerTest.java +++ b/test/dorkbox/util/messagebus/SubscriptionManagerTest.java @@ -61,15 +61,22 @@ public class SubscriptionManagerTest extends AssertSupport { @Test public void testIMessageListener() { - ListenerFactory listeners = listeners(IMessageListener.DefaultListener.class, IMessageListener.DisabledListener.class, - IMessageListener.NoSubtypesListener.class); + ListenerFactory listeners = listeners(IMessageListener.DefaultListener.class +// , +// IMessageListener.DisabledListener.class, +// IMessageListener.NoSubtypesListener.class + ); - SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners).listener(IMessageListener.DefaultListener.class) - .handles(IMessage.class, AbstractMessage.class, - IMultipartMessage.class, - StandardMessage.class, - MessageTypes.class).listener( - IMessageListener.NoSubtypesListener.class).handles(IMessage.class); + SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners); + expectedSubscriptions.listener(IMessageListener.DefaultListener.class) + .handles(IMessage.class, + AbstractMessage.class, + IMultipartMessage.class, + StandardMessage.class, + MessageTypes.class); + +// expectedSubscriptions.listener(IMessageListener.NoSubtypesListener.class) +// .handles(IMessage.class); runTestWith(listeners, expectedSubscriptions); } diff --git a/test/dorkbox/util/messagebus/common/SubscriptionValidator.java b/test/dorkbox/util/messagebus/common/SubscriptionValidator.java index ccf3bcd..0a896d1 100644 --- a/test/dorkbox/util/messagebus/common/SubscriptionValidator.java +++ b/test/dorkbox/util/messagebus/common/SubscriptionValidator.java @@ -55,6 +55,7 @@ public class SubscriptionValidator extends AssertSupport { private SubscriptionValidator expect(Class subscriber, Class messageType) { this.validations.add(new ValidationEntry(messageType, subscriber)); this.messageTypes.add(messageType); + return this; } @@ -126,8 +127,11 @@ public class SubscriptionValidator extends AssertSupport { - public class Expectation { + + + + public class Expectation { private Class listener; private Expectation(Class listener) { @@ -138,24 +142,19 @@ public class SubscriptionValidator extends AssertSupport { for (Class message : messages) { expect(this.listener, message); } + return SubscriptionValidator.this; } } private class ValidationEntry { - - private Class subscriber; - private Class messageType; private ValidationEntry(Class messageType, Class subscriber) { this.messageType = messageType; this.subscriber = subscriber; } - - } - } diff --git a/test/dorkbox/util/messagebus/listeners/IMessageListener.java b/test/dorkbox/util/messagebus/listeners/IMessageListener.java index b088420..5cafedd 100644 --- a/test/dorkbox/util/messagebus/listeners/IMessageListener.java +++ b/test/dorkbox/util/messagebus/listeners/IMessageListener.java @@ -33,22 +33,21 @@ import dorkbox.util.messagebus.messages.IMessage; public class IMessageListener { private static abstract class BaseListener { - @Handler public void handle(IMessage message){ message.handled(this.getClass()); } - } - public static class DefaultListener extends BaseListener { + public static class DefaultListener extends BaseListener { @Override public void handle(IMessage message){ super.handle(message); } } + public static class NoSubtypesListener extends BaseListener { @Override @@ -60,7 +59,6 @@ public class IMessageListener { public static class DisabledListener extends BaseListener { - @Override @Handler(enabled = false) public void handle(IMessage message){ @@ -68,6 +66,4 @@ public class IMessageListener { } } - - } diff --git a/test/dorkbox/util/messagebus/PerfTest_Collections.java b/test/dorkbox/util/messagebus/queuePerf/PerfTest_Collections.java similarity index 97% rename from test/dorkbox/util/messagebus/PerfTest_Collections.java rename to test/dorkbox/util/messagebus/queuePerf/PerfTest_Collections.java index 3f7a601..11ca81e 100644 --- a/test/dorkbox/util/messagebus/PerfTest_Collections.java +++ b/test/dorkbox/util/messagebus/queuePerf/PerfTest_Collections.java @@ -13,18 +13,23 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package dorkbox.util.messagebus; +package dorkbox.util.messagebus.queuePerf; import dorkbox.util.messagebus.annotations.Handler; import dorkbox.util.messagebus.common.MessageHandler; import dorkbox.util.messagebus.common.StrongConcurrentSet; import dorkbox.util.messagebus.common.StrongConcurrentSetV8; import dorkbox.util.messagebus.common.thread.ConcurrentLinkedQueue2; -import dorkbox.util.messagebus.common.thread.ConcurrentSet; import dorkbox.util.messagebus.subscription.Subscription; import java.lang.ref.WeakReference; -import java.util.*; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.LinkedTransferQueue; @@ -64,7 +69,6 @@ class PerfTest_Collections { System.err.println("Done"); bench(size, new ArrayList(size * 2)); - bench(size, new ConcurrentSet(size * 2, LOAD_FACTOR, 5)); bench(size, new ConcurrentLinkedQueue2()); bench(size, new ConcurrentLinkedQueue()); bench(size, new LinkedTransferQueue());