diff --git a/src/dorkbox/util/messagebus/common/MessageHandler.java b/src/dorkbox/util/messagebus/common/MessageHandler.java
index d8fdfa8..092abe9 100644
--- a/src/dorkbox/util/messagebus/common/MessageHandler.java
+++ b/src/dorkbox/util/messagebus/common/MessageHandler.java
@@ -101,9 +101,7 @@ class MessageHandler {
}
}
- final MessageHandler[] messageHandlers = new MessageHandler[finalMethods.size()];
- finalMethods.toArray(messageHandlers);
- return messageHandlers;
+ return finalMethods.toArray(new MessageHandler[0]);
}
private final MethodAccess handler;
diff --git a/src/dorkbox/util/messagebus/common/thread/ConcurrentLinkedQueue2.java b/src/dorkbox/util/messagebus/common/thread/ConcurrentLinkedQueue2.java
index 9c2c799..5b65131 100644
--- a/src/dorkbox/util/messagebus/common/thread/ConcurrentLinkedQueue2.java
+++ b/src/dorkbox/util/messagebus/common/thread/ConcurrentLinkedQueue2.java
@@ -20,9 +20,18 @@
package dorkbox.util.messagebus.common.thread;
-import org.jctools.util.UnsafeAccess;
+import com.esotericsoftware.kryo.util.IdentityMap;
-import java.util.*;
+import java.util.AbstractQueue;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Consumer;
/**
* An unbounded thread-safe {@linkplain Queue queue} based on linked nodes.
@@ -39,9 +48,9 @@ import java.util.*;
* does not permit the use of {@code null} elements.
*
*
This implementation employs an efficient non-blocking
- * algorithm based on one described in
- *
- * Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue
+ * algorithm based on one described in Simple,
+ * Fast, and Practical Non-Blocking and Blocking Concurrent Queue
* Algorithms by Maged M. Michael and Michael L. Scott.
*
*
Iterators are weakly consistent , returning elements
@@ -81,133 +90,125 @@ import java.util.*;
* @author Doug Lea
* @param the type of elements held in this collection
*/
+public class ConcurrentLinkedQueue2 extends AbstractQueue
+ implements Queue, java.io.Serializable {
+ private static final long serialVersionUID = 196745693267521676L;
-/*
- * This is a modification of the Michael & Scott algorithm,
- * adapted for a garbage-collected environment, with support for
- * interior node deletion (to support remove(Object)). For
- * explanation, read the paper.
- *
- * Note that like most non-blocking algorithms in this package,
- * this implementation relies on the fact that in garbage
- * collected systems, there is no possibility of ABA problems due
- * to recycled nodes, so there is no need to use "counted
- * pointers" or related techniques seen in versions used in
- * non-GC'ed settings.
- *
- * The fundamental invariants are:
- * - There is exactly one (last) Node with a null next reference,
- * which is CASed when enqueueing. This last Node can be
- * reached in O(1) time from tail, but tail is merely an
- * optimization - it can always be reached in O(N) time from
- * head as well.
- * - The elements contained in the queue are the non-null items in
- * Nodes that are reachable from head. CASing the item
- * reference of a Node to null atomically removes it from the
- * queue. Reachability of all elements from head must remain
- * true even in the case of concurrent modifications that cause
- * head to advance. A dequeued Node may remain in use
- * indefinitely due to creation of an Iterator or simply a
- * poll() that has lost its time slice.
- *
- * The above might appear to imply that all Nodes are GC-reachable
- * from a predecessor dequeued Node. That would cause two problems:
- * - allow a rogue Iterator to cause unbounded memory retention
- * - cause cross-generational linking of old Nodes to new Nodes if
- * a Node was tenured while live, which generational GCs have a
- * hard time dealing with, causing repeated major collections.
- * However, only non-deleted Nodes need to be reachable from
- * dequeued Nodes, and reachability does not necessarily have to
- * be of the kind understood by the GC. We use the trick of
- * linking a Node that has just been dequeued to itself. Such a
- * self-link implicitly means to advance to head.
- *
- * Both head and tail are permitted to lag. In fact, failing to
- * update them every time one could is a significant optimization
- * (fewer CASes). As with LinkedTransferQueue (see the internal
- * documentation for that class), we use a slack threshold of two;
- * that is, we update head/tail when the current pointer appears
- * to be two or more steps away from the first/last node.
- *
- * Since head and tail are updated concurrently and independently,
- * it is possible for tail to lag behind head (why not)?
- *
- * CASing a Node's item reference to null atomically removes the
- * element from the queue. Iterators skip over Nodes with null
- * items. Prior implementations of this class had a race between
- * poll() and remove(Object) where the same element would appear
- * to be successfully removed by two concurrent operations. The
- * method remove(Object) also lazily unlinks deleted Nodes, but
- * this is merely an optimization.
- *
- * When constructing a Node (before enqueuing it) we avoid paying
- * for a volatile write to item by using Unsafe.putObject instead
- * of a normal write. This allows the cost of enqueue to be
- * "one-and-a-half" CASes.
- *
- * Both head and tail may or may not point to a Node with a
- * non-null item. If the queue is empty, all items must of course
- * be null. Upon creation, both head and tail refer to a dummy
- * Node with null item. Both head and tail are only updated using
- * CAS, so they never regress, although again this is merely an
- * optimization.
- */
-abstract class Item0 {
- public volatile E item;
-}
-
-abstract class Pad0 extends Item0{
- volatile long z0, z1, z2, z4, z5, z6 = 7L;
-}
-
-abstract class Item1 extends Pad0 {
- public volatile Node next;
-}
-
-class Node extends Item1 {
- volatile long z0, z1, z2, z4, z5, z6 = 7L;
-
-
- /**
- * Constructs a new node. Uses relaxed write because item can
- * only be seen after publication via casNext.
+ /*
+ * This is a modification of the Michael & Scott algorithm,
+ * adapted for a garbage-collected environment, with support for
+ * interior node deletion (to support remove(Object)). For
+ * explanation, read the paper.
+ *
+ * Note that like most non-blocking algorithms in this package,
+ * this implementation relies on the fact that in garbage
+ * collected systems, there is no possibility of ABA problems due
+ * to recycled nodes, so there is no need to use "counted
+ * pointers" or related techniques seen in versions used in
+ * non-GC'ed settings.
+ *
+ * The fundamental invariants are:
+ * - There is exactly one (last) Node with a null next reference,
+ * which is CASed when enqueueing. This last Node can be
+ * reached in O(1) time from tail, but tail is merely an
+ * optimization - it can always be reached in O(N) time from
+ * head as well.
+ * - The elements contained in the queue are the non-null items in
+ * Nodes that are reachable from head. CASing the item
+ * reference of a Node to null atomically removes it from the
+ * queue. Reachability of all elements from head must remain
+ * true even in the case of concurrent modifications that cause
+ * head to advance. A dequeued Node may remain in use
+ * indefinitely due to creation of an Iterator or simply a
+ * poll() that has lost its time slice.
+ *
+ * The above might appear to imply that all Nodes are GC-reachable
+ * from a predecessor dequeued Node. That would cause two problems:
+ * - allow a rogue Iterator to cause unbounded memory retention
+ * - cause cross-generational linking of old Nodes to new Nodes if
+ * a Node was tenured while live, which generational GCs have a
+ * hard time dealing with, causing repeated major collections.
+ * However, only non-deleted Nodes need to be reachable from
+ * dequeued Nodes, and reachability does not necessarily have to
+ * be of the kind understood by the GC. We use the trick of
+ * linking a Node that has just been dequeued to itself. Such a
+ * self-link implicitly means to advance to head.
+ *
+ * Both head and tail are permitted to lag. In fact, failing to
+ * update them every time one could is a significant optimization
+ * (fewer CASes). As with LinkedTransferQueue (see the internal
+ * documentation for that class), we use a slack threshold of two;
+ * that is, we update head/tail when the current pointer appears
+ * to be two or more steps away from the first/last node.
+ *
+ * Since head and tail are updated concurrently and independently,
+ * it is possible for tail to lag behind head (why not)?
+ *
+ * CASing a Node's item reference to null atomically removes the
+ * element from the queue. Iterators skip over Nodes with null
+ * items. Prior implementations of this class had a race between
+ * poll() and remove(Object) where the same element would appear
+ * to be successfully removed by two concurrent operations. The
+ * method remove(Object) also lazily unlinks deleted Nodes, but
+ * this is merely an optimization.
+ *
+ * When constructing a Node (before enqueuing it) we avoid paying
+ * for a volatile write to item by using Unsafe.putObject instead
+ * of a normal write. This allows the cost of enqueue to be
+ * "one-and-a-half" CASes.
+ *
+ * Both head and tail may or may not point to a Node with a
+ * non-null item. If the queue is empty, all items must of course
+ * be null. Upon creation, both head and tail refer to a dummy
+ * Node with null item. Both head and tail are only updated using
+ * CAS, so they never regress, although again this is merely an
+ * optimization.
*/
- Node(E item) {
- UNSAFE.putObject(this, itemOffset, item);
- }
- boolean casItem(E cmp, E val) {
- return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
- }
+ private static class Node {
+ volatile E item;
+ volatile Node next;
- void lazySetNext(Node val) {
- UNSAFE.putOrderedObject(this, nextOffset, val);
- }
+ /**
+ * Constructs a new node. Uses relaxed write because item can
+ * only be seen after publication via casNext.
+ */
+ Node(E item) {
+ UNSAFE.putObject(this, itemOffset, item);
+ }
- boolean casNext(Node cmp, Node val) {
- return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
- }
+ boolean casItem(E cmp, E val) {
+ return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
+ }
- // Unsafe mechanics
+ void lazySetNext(Node val) {
+ UNSAFE.putOrderedObject(this, nextOffset, val);
+ }
- private static final sun.misc.Unsafe UNSAFE;
- private static final long itemOffset;
- private static final long nextOffset;
+ boolean casNext(Node cmp, Node val) {
+ return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
+ }
- static {
- try {
- UNSAFE = UnsafeAccess.UNSAFE;
+ // Unsafe mechanics
- itemOffset = UNSAFE.objectFieldOffset(Node.class.getField("item"));
- nextOffset = UNSAFE.objectFieldOffset(Node.class.getField("next"));
- } catch (Exception e) {
- throw new Error(e);
+ private static final sun.misc.Unsafe UNSAFE;
+ private static final long itemOffset;
+ private static final long nextOffset;
+
+ static {
+ try {
+ UNSAFE = sun.misc.Unsafe.getUnsafe();
+ Class> k = Node.class;
+ itemOffset = UNSAFE.objectFieldOffset
+ (k.getDeclaredField("item"));
+ nextOffset = UNSAFE.objectFieldOffset
+ (k.getDeclaredField("next"));
+ } catch (Exception e) {
+ throw new Error(e);
+ }
}
}
-}
-
-abstract class CLQItem0 extends AbstractQueue {
/**
* A node from which the first live (non-deleted) node (if any)
* can be reached in O(1) time.
@@ -220,15 +221,8 @@ abstract class CLQItem0 extends AbstractQueue {
* - it is permitted for tail to lag behind head, that is, for tail
* to not be reachable from head!
*/
- public transient volatile Node head;
-}
+ private transient volatile Node head;
-
-abstract class CLQPad0 extends CLQItem0 {
- volatile long z0, z1, z2, z4, z5, z6 = 7L;
-}
-
-abstract class CLQItem1 extends CLQPad0 {
/**
* A node from which the last node on list (that is, the unique
* node with node.next == null) can be reached in O(1) time.
@@ -241,21 +235,13 @@ abstract class CLQItem1 extends CLQPad0 {
* to not be reachable from head!
* - tail.next may or may not be self-pointing to tail.
*/
- public volatile Node tail;
-}
-
-
-public class ConcurrentLinkedQueue2 extends CLQItem1
- implements Queue, java.io.Serializable {
- private static final long serialVersionUID = 196745693267521676L;
-
- volatile long z0, z1, z2, z4, z5, z6 = 7L;
+ private transient volatile Node tail;
/**
* Creates a {@code ConcurrentLinkedQueue} that is initially empty.
*/
public ConcurrentLinkedQueue2() {
- this.head = this.tail = new Node(null);
+ head = tail = new Node(null);
}
/**
@@ -272,18 +258,17 @@ public class ConcurrentLinkedQueue2 extends CLQItem1
for (E e : c) {
checkNotNull(e);
Node newNode = new Node(e);
- if (h == null) {
+ if (h == null)
h = t = newNode;
- } else {
+ else {
t.lazySetNext(newNode);
t = newNode;
}
}
- if (h == null) {
+ if (h == null)
h = t = new Node(null);
- }
- this.head = h;
- this.tail = t;
+ head = h;
+ tail = t;
}
// Have to override just to update the javadoc
@@ -296,7 +281,6 @@ public class ConcurrentLinkedQueue2 extends CLQItem1
* @return {@code true} (as specified by {@link Collection#add})
* @throws NullPointerException if the specified element is null
*/
- @Override
public boolean add(E e) {
return offer(e);
}
@@ -306,9 +290,8 @@ public class ConcurrentLinkedQueue2 extends CLQItem1
* as sentinel for succ(), below.
*/
final void updateHead(Node h, Node p) {
- if (h != p && casHead(h, p)) {
+ if (h != p && casHead(h, p))
h.lazySetNext(h);
- }
}
/**
@@ -318,9 +301,15 @@ public class ConcurrentLinkedQueue2 extends CLQItem1
*/
final Node succ(Node p) {
Node next = p.next;
- return p == next ? this.head : next;
+ return (p == next) ? head : next;
}
+ // can ONLY be touched by a single reader/writer (for add/offer/remove/poll/etc)
+ private final IdentityMap> quickLookup = new IdentityMap>(32);
+
+
+
+
/**
* Inserts the specified element at the tail of this queue.
* As the queue is unbounded, this method will never return {@code false}.
@@ -328,17 +317,11 @@ public class ConcurrentLinkedQueue2 extends CLQItem1
* @return {@code true} (as specified by {@link Queue#offer})
* @throws NullPointerException if the specified element is null
*/
- @Override
public boolean offer(E e) {
- offerNode(e);
- return true;
- }
-
- public Node offerNode(E e) {
checkNotNull(e);
final Node newNode = new Node(e);
- for (Node t = this.tail, p = t;;) {
+ for (Node t = tail, p = t;;) {
Node q = p.next;
if (q == null) {
// p is last node
@@ -346,70 +329,66 @@ public class ConcurrentLinkedQueue2 extends CLQItem1
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
- if (p != t)
- {
+ if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
- }
- return newNode;
+
+ quickLookup.put(e, newNode);
+ return true;
}
// Lost CAS race to another thread; re-read next
}
- else if (p == q) {
+ else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
- p = t != (t = this.tail) ? t : this.head;
- } else {
+ p = (t != (t = tail)) ? t : head;
+ else
// Check for tail updates after two hops.
- p = p != t && t != (t = this.tail) ? t : q;
- }
+ p = (p != t && t != (t = tail)) ? t : q;
}
}
- @Override
public E poll() {
restartFromHead:
for (;;) {
- for (Node h = this.head, p = h, q;;) {
+ for (Node h = head, p = h, q;;) {
E item = p.item;
if (item != null && p.casItem(item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
- if (p != h) {
- updateHead(h, (q = p.next) != null ? q : p);
- }
+ if (p != h) // hop two nodes at a time
+ updateHead(h, ((q = p.next) != null) ? q : p);
+
+ quickLookup.remove(item);
return item;
}
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
- else if (p == q) {
+ else if (p == q)
continue restartFromHead;
- } else {
+ else
p = q;
- }
}
}
}
- @Override
public E peek() {
restartFromHead:
for (;;) {
- for (Node h = this.head, p = h, q;;) {
+ for (Node h = head, p = h, q;;) {
E item = p.item;
if (item != null || (q = p.next) == null) {
updateHead(h, p);
return item;
}
- else if (p == q) {
+ else if (p == q)
continue restartFromHead;
- } else {
+ else
p = q;
- }
}
}
}
@@ -425,17 +404,16 @@ public class ConcurrentLinkedQueue2 extends CLQItem1
Node first() {
restartFromHead:
for (;;) {
- for (Node h = this.head, p = h, q;;) {
- boolean hasItem = p.item != null;
+ for (Node h = head, p = h, q;;) {
+ boolean hasItem = (p.item != null);
if (hasItem || (q = p.next) == null) {
updateHead(h, p);
return hasItem ? p : null;
}
- else if (p == q) {
+ else if (p == q)
continue restartFromHead;
- } else {
+ else
p = q;
- }
}
}
}
@@ -445,7 +423,6 @@ public class ConcurrentLinkedQueue2 extends CLQItem1
*
* @return {@code true} if this queue contains no elements
*/
- @Override
public boolean isEmpty() {
return first() == null;
}
@@ -466,17 +443,13 @@ public class ConcurrentLinkedQueue2 extends CLQItem1
*
* @return the number of elements in this queue
*/
- @Override
public int size() {
int count = 0;
- for (Node p = first(); p != null; p = succ(p)) {
- if (p.item != null) {
+ for (Node p = first(); p != null; p = succ(p))
+ if (p.item != null)
// Collection.size() spec says to max out
- if (++count == Integer.MAX_VALUE) {
+ if (++count == Integer.MAX_VALUE)
break;
- }
- }
- }
return count;
}
@@ -488,16 +461,12 @@ public class ConcurrentLinkedQueue2 extends CLQItem1
* @param o object to be checked for containment in this queue
* @return {@code true} if this queue contains the specified element
*/
- @Override
public boolean contains(Object o) {
- if (o == null) {
- return false;
- }
+ if (o == null) return false;
for (Node p = first(); p != null; p = succ(p)) {
E item = p.item;
- if (item != null && o.equals(item)) {
+ if (item != null && o.equals(item))
return true;
- }
}
return false;
}
@@ -513,11 +482,9 @@ public class ConcurrentLinkedQueue2 extends CLQItem1
* @param o element to be removed from this queue, if present
* @return {@code true} if this queue changed as a result of the call
*/
- @Override
public boolean remove(Object o) {
- if (o == null) {
- return false;
- }
+ if (o == null) return false;
+
Node pred = null;
for (Node p = first(); p != null; p = succ(p)) {
E item = p.item;
@@ -525,9 +492,10 @@ public class ConcurrentLinkedQueue2 extends CLQItem1
o.equals(item) &&
p.casItem(item, null)) {
Node next = succ(p);
- if (pred != null && next != null) {
+ if (pred != null && next != null)
pred.casNext(p, next);
- }
+
+ quickLookup.remove(o);
return true;
}
pred = p;
@@ -547,31 +515,28 @@ public class ConcurrentLinkedQueue2 extends CLQItem1
* of its elements are null
* @throws IllegalArgumentException if the collection is this queue
*/
- @Override
public boolean addAll(Collection extends E> c) {
- if (c == this) {
+ if (c == this)
// As historically specified in AbstractQueue#addAll
throw new IllegalArgumentException();
- }
// Copy c into a private chain of Nodes
Node beginningOfTheEnd = null, last = null;
for (E e : c) {
checkNotNull(e);
Node newNode = new Node(e);
- if (beginningOfTheEnd == null) {
+ if (beginningOfTheEnd == null)
beginningOfTheEnd = last = newNode;
- } else {
+ else {
last.lazySetNext(newNode);
last = newNode;
}
}
- if (beginningOfTheEnd == null) {
+ if (beginningOfTheEnd == null)
return false;
- }
// Atomically append the chain at the tail of this collection
- for (Node t = this.tail, p = t;;) {
+ for (Node t = tail, p = t;;) {
Node q = p.next;
if (q == null) {
// p is last node
@@ -581,25 +546,23 @@ public class ConcurrentLinkedQueue2 extends CLQItem1
if (!casTail(t, last)) {
// Try a little harder to update tail,
// since we may be adding many elements.
- t = this.tail;
- if (last.next == null) {
+ t = tail;
+ if (last.next == null)
casTail(t, last);
- }
}
return true;
}
// Lost CAS race to another thread; re-read next
}
- else if (p == q) {
+ else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
- p = t != (t = this.tail) ? t : this.head;
- } else {
+ p = (t != (t = tail)) ? t : head;
+ else
// Check for tail updates after two hops.
- p = p != t && t != (t = this.tail) ? t : q;
- }
+ p = (p != t && t != (t = tail)) ? t : q;
}
}
@@ -616,15 +579,13 @@ public class ConcurrentLinkedQueue2 extends CLQItem1
*
* @return an array containing all of the elements in this queue
*/
- @Override
public Object[] toArray() {
// Use ArrayList to deal with resizing.
ArrayList al = new ArrayList();
for (Node p = first(); p != null; p = succ(p)) {
E item = p.item;
- if (item != null) {
+ if (item != null)
al.add(item);
- }
}
return al.toArray();
}
@@ -664,7 +625,6 @@ public class ConcurrentLinkedQueue2 extends CLQItem1
* this queue
* @throws NullPointerException if the specified array is null
*/
- @Override
@SuppressWarnings("unchecked")
public T[] toArray(T[] a) {
// try to use sent-in array
@@ -672,14 +632,12 @@ public class ConcurrentLinkedQueue2 extends CLQItem1
Node p;
for (p = first(); p != null && k < a.length; p = succ(p)) {
E item = p.item;
- if (item != null) {
+ if (item != null)
a[k++] = (T)item;
- }
}
if (p == null) {
- if (k < a.length) {
+ if (k < a.length)
a[k] = null;
- }
return a;
}
@@ -687,9 +645,8 @@ public class ConcurrentLinkedQueue2 extends CLQItem1
ArrayList al = new ArrayList();
for (Node q = first(); q != null; q = succ(q)) {
E item = q.item;
- if (item != null) {
+ if (item != null)
al.add(item);
- }
}
return al.toArray(a);
}
@@ -698,16 +655,11 @@ public class ConcurrentLinkedQueue2 extends CLQItem1
* Returns an iterator over the elements in this queue in proper sequence.
* The elements will be returned in order from first (head) to last (tail).
*
- * The returned iterator is a "weakly consistent" iterator that
- * will never throw {@link java.util.ConcurrentModificationException
- * ConcurrentModificationException}, and guarantees to traverse
- * elements as they existed upon construction of the iterator, and
- * may (but is not guaranteed to) reflect any modifications
- * subsequent to construction.
+ *
The returned iterator is
+ * weakly consistent .
*
* @return an iterator over the elements in this queue in proper sequence
*/
- @Override
public Iterator iterator() {
return new Itr();
}
@@ -740,73 +692,67 @@ public class ConcurrentLinkedQueue2 extends CLQItem1
* next(), or null if no such.
*/
private E advance() {
- this.lastRet = this.nextNode;
- E x = this.nextItem;
+ lastRet = nextNode;
+ E x = nextItem;
Node pred, p;
- if (this.nextNode == null) {
+ if (nextNode == null) {
p = first();
pred = null;
} else {
- pred = this.nextNode;
- p = succ(this.nextNode);
+ pred = nextNode;
+ p = succ(nextNode);
}
for (;;) {
if (p == null) {
- this.nextNode = null;
- this.nextItem = null;
+ nextNode = null;
+ nextItem = null;
return x;
}
E item = p.item;
if (item != null) {
- this.nextNode = p;
- this.nextItem = item;
+ nextNode = p;
+ nextItem = item;
return x;
} else {
// skip over nulls
Node next = succ(p);
- if (pred != null && next != null) {
+ if (pred != null && next != null)
pred.casNext(p, next);
- }
p = next;
}
}
}
- @Override
public boolean hasNext() {
- return this.nextNode != null;
+ return nextNode != null;
}
- @Override
public E next() {
- if (this.nextNode == null) {
- throw new NoSuchElementException();
- }
+ if (nextNode == null) throw new NoSuchElementException();
return advance();
}
- @Override
public void remove() {
- Node l = this.lastRet;
- if (l == null) {
- throw new IllegalStateException();
- }
+ Node l = lastRet;
+ if (l == null) throw new IllegalStateException();
// rely on a future traversal to relink.
l.item = null;
- this.lastRet = null;
+ lastRet = null;
}
}
/**
* Saves this queue to a stream (that is, serializes it).
*
+ * @param s the stream
+ * @throws java.io.IOException if an I/O error occurs
* @serialData All of the elements (each an {@code E}) in
* the proper order, followed by a null
*/
private void writeObject(java.io.ObjectOutputStream s)
- throws java.io.IOException {
+ throws java.io.IOException {
// Write out any hidden stuff
s.defaultWriteObject();
@@ -814,9 +760,8 @@ public class ConcurrentLinkedQueue2 extends CLQItem1
// Write out all elements in the proper order.
for (Node p = first(); p != null; p = succ(p)) {
Object item = p.item;
- if (item != null) {
+ if (item != null)
s.writeObject(item);
- }
}
// Use trailing null as sentinel
@@ -825,9 +770,13 @@ public class ConcurrentLinkedQueue2 extends CLQItem1
/**
* Reconstitutes this queue from a stream (that is, deserializes it).
+ * @param s the stream
+ * @throws ClassNotFoundException if the class of a serialized object
+ * could not be found
+ * @throws java.io.IOException if an I/O error occurs
*/
private void readObject(java.io.ObjectInputStream s)
- throws java.io.IOException, ClassNotFoundException {
+ throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
// Read in elements until trailing null sentinel found
@@ -836,18 +785,124 @@ public class ConcurrentLinkedQueue2 extends CLQItem1
while ((item = s.readObject()) != null) {
@SuppressWarnings("unchecked")
Node newNode = new Node((E) item);
- if (h == null) {
+ if (h == null)
h = t = newNode;
- } else {
+ else {
t.lazySetNext(newNode);
t = newNode;
}
}
- if (h == null) {
+ if (h == null)
h = t = new Node(null);
+ head = h;
+ tail = t;
+ }
+
+ /** A customized variant of Spliterators.IteratorSpliterator */
+ static final class CLQSpliterator implements Spliterator {
+ static final int MAX_BATCH = 1 << 25; // max batch array size;
+ final ConcurrentLinkedQueue2 queue;
+ Node current; // current node; null until initialized
+ int batch; // batch size for splits
+ boolean exhausted; // true when no more nodes
+ CLQSpliterator(ConcurrentLinkedQueue2 queue) {
+ this.queue = queue;
}
- this.head = h;
- this.tail = t;
+
+ public Spliterator trySplit() {
+ Node p;
+ final ConcurrentLinkedQueue2 q = this.queue;
+ int b = batch;
+ int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1;
+ if (!exhausted &&
+ ((p = current) != null || (p = q.first()) != null) &&
+ p.next != null) {
+ Object[] a = new Object[n];
+ int i = 0;
+ do {
+ if ((a[i] = p.item) != null)
+ ++i;
+ if (p == (p = p.next))
+ p = q.first();
+ } while (p != null && i < n);
+ if ((current = p) == null)
+ exhausted = true;
+ if (i > 0) {
+ batch = i;
+ return Spliterators.spliterator
+ (a, 0, i, Spliterator.ORDERED | Spliterator.NONNULL |
+ Spliterator.CONCURRENT);
+ }
+ }
+ return null;
+ }
+
+ public void forEachRemaining(Consumer super E> action) {
+ Node p;
+ if (action == null) throw new NullPointerException();
+ final ConcurrentLinkedQueue2 q = this.queue;
+ if (!exhausted &&
+ ((p = current) != null || (p = q.first()) != null)) {
+ exhausted = true;
+ do {
+ E e = p.item;
+ if (p == (p = p.next))
+ p = q.first();
+ if (e != null)
+ action.accept(e);
+ } while (p != null);
+ }
+ }
+
+ public boolean tryAdvance(Consumer super E> action) {
+ Node p;
+ if (action == null) throw new NullPointerException();
+ final ConcurrentLinkedQueue2 q = this.queue;
+ if (!exhausted &&
+ ((p = current) != null || (p = q.first()) != null)) {
+ E e;
+ do {
+ e = p.item;
+ if (p == (p = p.next))
+ p = q.first();
+ } while (e == null && p != null);
+ if ((current = p) == null)
+ exhausted = true;
+ if (e != null) {
+ action.accept(e);
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public long estimateSize() { return Long.MAX_VALUE; }
+
+ public int characteristics() {
+ return Spliterator.ORDERED | Spliterator.NONNULL |
+ Spliterator.CONCURRENT;
+ }
+ }
+
+ /**
+ * Returns a {@link Spliterator} over the elements in this queue.
+ *
+ * The returned spliterator is
+ * weakly consistent .
+ *
+ *
The {@code Spliterator} reports {@link Spliterator#CONCURRENT},
+ * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}.
+ *
+ * @implNote
+ * The {@code Spliterator} implements {@code trySplit} to permit limited
+ * parallelism.
+ *
+ * @return a {@code Spliterator} over the elements in this queue
+ * @since 1.8
+ */
+ @Override
+ public Spliterator spliterator() {
+ return new CLQSpliterator(this);
}
/**
@@ -856,9 +911,8 @@ public class ConcurrentLinkedQueue2 extends CLQItem1
* @param v the element
*/
private static void checkNotNull(Object v) {
- if (v == null) {
+ if (v == null)
throw new NullPointerException();
- }
}
private boolean casTail(Node cmp, Node val) {
@@ -876,11 +930,12 @@ public class ConcurrentLinkedQueue2 extends CLQItem1
private static final long tailOffset;
static {
try {
- UNSAFE = UnsafeAccess.UNSAFE;
- Class> k = ConcurrentLinkedQueue2.class;
-
- headOffset = UNSAFE.objectFieldOffset(k.getField("head"));
- tailOffset = UNSAFE.objectFieldOffset(k.getField("tail"));
+ UNSAFE = sun.misc.Unsafe.getUnsafe();
+ Class> k = ConcurrentLinkedQueue.class;
+ headOffset = UNSAFE.objectFieldOffset
+ (k.getDeclaredField("head"));
+ tailOffset = UNSAFE.objectFieldOffset
+ (k.getDeclaredField("tail"));
} catch (Exception e) {
throw new Error(e);
}
diff --git a/src/dorkbox/util/messagebus/common/thread/ConcurrentSet.java b/src/dorkbox/util/messagebus/common/thread/ConcurrentSet.java
deleted file mode 100644
index 865d2e3..0000000
--- a/src/dorkbox/util/messagebus/common/thread/ConcurrentSet.java
+++ /dev/null
@@ -1,315 +0,0 @@
-/*
- * Copyright 2015 dorkbox, llc
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package dorkbox.util.messagebus.common.thread;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * This data structure is optimized for non-blocking reads even when write operations occur.
- * Running read iterators will not be affected by add operations since writes always insert at the head of the
- * structure. Remove operations can affect any running iterator such that a removed element that has not yet
- * been reached by the iterator will not appear in that iterator anymore.
- */
-public
-class ConcurrentSet extends ConcurrentLinkedQueue2 {
- private static final long serialVersionUID = -2729855178402529784L;
-
- private static final AtomicLong id = new AtomicLong();
- private final transient long ID = id.getAndIncrement();
-
-
- private final Node IN_PROGRESS_MARKER = new Node(null);
- private ConcurrentMap> entries;
-
- public
- ConcurrentSet() {
- this(16, 0.75f, Runtime.getRuntime().availableProcessors());
- }
-
- public
- ConcurrentSet(int size, float loadFactor, int stripeSize) {
- super();
- this.entries = new ConcurrentHashMap<>(size, loadFactor, stripeSize);
- }
-
- @Override
- public
- boolean add(T element) {
- if (element == null) {
- return false;
- }
-
- // had to modify the super implementation so we publish Node back
- Node alreadyPresent = this.entries.putIfAbsent(element, this.IN_PROGRESS_MARKER);
- if (alreadyPresent == null) {
- // this doesn't already exist
- Node offerNode = super.offerNode(element);
-
- this.entries.put(element, offerNode);
- return true;
- }
-
- return false;
- }
-
- @Override
- public
- boolean contains(Object element) {
- if (element == null) {
- return false;
- }
-
- Node node;
- while ((node = this.entries.get(element)) == this.IN_PROGRESS_MARKER) {
- ; // data race
- }
-
- if (node == null) {
- return false;
- }
-
- return node.item != null;
- }
-
- @Override
- public
- int size() {
- return this.entries.size();
- }
-
- @Override
- public
- boolean isEmpty() {
- return super.isEmpty();
- }
-
- /**
- * @return TRUE if the element was successfully removed
- */
- @Override
- public
- boolean remove(Object element) {
- while (this.entries.get(element) == this.IN_PROGRESS_MARKER) {
- ; // data race
- }
-
- Node node = this.entries.remove(element);
- if (node == null) {
- return false;
- }
-
- Node pred = null;
- for (Node p = this.head; p != null; p = succ(p)) {
- T item = p.item;
- if (item != null &&
- element.equals(item) &&
- p.casItem(item, null)) {
- Node next = succ(p);
- if (pred != null && next != null) {
- pred.casNext(p, next);
- }
- return true;
- }
- pred = p;
- }
- return false;
- }
-
- @Override
- public
- Iterator iterator() {
- return new Itr2();
- }
-
- private
- class Itr2 implements Iterator {
- /**
- * Next node to return item for.
- */
- private Node nextNode;
-
- /**
- * Node of the last returned item, to support remove.
- */
- private Node lastRet;
-
- /**
- * nextItem holds on to item fields because once we claim
- * that an element exists in hasNext(), we must return it in
- * the following next() call even if it was in the process of
- * being removed when hasNext() was called.
- */
- private T nextItem;
-
- Itr2() {
- advance();
- }
-
- /**
- * Moves to next valid node and returns item to return for
- * next(), or null if no such.
- */
- private
- T advance() {
- this.lastRet = this.nextNode; // for removing items via iterator
- T nextItem = this.nextItem;
-
- Node pred, p;
- if (this.nextNode == null) {
- p = first();
- pred = null;
- }
- else {
- pred = this.nextNode;
- p = succ(this.nextNode);
- }
-
- for (; ; ) {
- if (p == null) {
- this.nextNode = null;
- this.nextItem = null;
- return nextItem;
- }
-
- T item = p.item;
- if (item != null) {
- this.nextNode = p;
- this.nextItem = item;
- return nextItem;
- }
- else {
- // skip over nulls
- Node next = succ(p);
- if (pred != null && next != null) {
- pred.casNext(p, next);
- }
- p = next;
- }
- }
- }
-
-
-
- @Override
- public
- boolean hasNext() {
- return this.nextNode != null;
- }
-
- @Override
- public
- T next() {
- if (this.nextNode == null) {
- throw new NoSuchElementException();
- }
- return advance();
- }
-
- @Override
- public
- void remove() {
- Node l = this.lastRet;
- if (l == null) {
- throw new IllegalStateException();
- }
-
- T value = l.item;
-
- if (value != null) {
- Map> entries2 = ConcurrentSet.this.entries;
- while (entries2.get(value) == ConcurrentSet.this.IN_PROGRESS_MARKER) {
- ; // data race
- }
-
- entries2.remove(value);
-
- // rely on a future traversal to relink.
- l.item = null;
- this.lastRet = null;
- }
- }
- }
-
- @Override
- public
- Object[] toArray() {
- return this.entries.keySet().toArray();
- }
-
- @Override
- public
- T[] toArray(T[] a) {
- return this.entries.keySet().toArray(a);
- }
-
- @Override
- public
- boolean containsAll(Collection> c) {
- throw new UnsupportedOperationException("Not implemented");
- }
-
- @Override
- public
- boolean removeAll(Collection> c) {
- throw new UnsupportedOperationException("Not implemented");
- }
-
- @Override
- public
- boolean retainAll(Collection> c) {
- throw new UnsupportedOperationException("Not implemented");
- }
-
- @Override
- public
- void clear() {
- super.clear();
- }
-
- @Override
- public
- int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + (int) (this.ID ^ this.ID >>> 32);
- return result;
- }
-
- @Override
- public
- boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null) {
- return false;
- }
- if (getClass() != obj.getClass()) {
- return false;
- }
- ConcurrentSet other = (ConcurrentSet) obj;
- if (this.ID != other.ID) {
- return false;
- }
- return true;
- }
-}
diff --git a/src/dorkbox/util/messagebus/subscription/Subscription.java b/src/dorkbox/util/messagebus/subscription/Subscription.java
index 1bca471..342101e 100644
--- a/src/dorkbox/util/messagebus/subscription/Subscription.java
+++ b/src/dorkbox/util/messagebus/subscription/Subscription.java
@@ -37,28 +37,27 @@
*/
package dorkbox.util.messagebus.subscription;
+import com.esotericsoftware.kryo.util.IdentityMap;
import com.esotericsoftware.reflectasm.MethodAccess;
import dorkbox.util.messagebus.common.MessageHandler;
import dorkbox.util.messagebus.dispatch.IHandlerInvocation;
import dorkbox.util.messagebus.dispatch.ReflectiveHandlerInvocation;
import dorkbox.util.messagebus.dispatch.SynchronizedHandlerInvocation;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
/**
- * A subscription is a thread-safe container that manages exactly one message handler of all registered
- * message listeners of the same class, i.e. all subscribed instances (excluding subclasses) of a SingleMessageHandler.class
- * will be referenced in the subscription created for SingleMessageHandler.class.
+ * A subscription is a container that manages exactly one message handler of all registered
+ * message listeners of the same class, i.e. all subscribed instances (excluding subclasses) of a message
+ * will be referenced in the subscription created for a message.
*
* There will be as many unique subscription objects per message listener class as there are message handlers
* defined in the message listeners class hierarchy.
*
- * The subscription provides functionality for message publication by means of delegation to the respective
- * message dispatcher.
+ * This class uses the "single writer principle", so that the subscription are only MODIFIED by a single thread,
+ * but are READ by X number of threads (in a safe way). This uses object thread visibility/publication to work.
*
* @author bennidi
* @author dorkbox, llc
@@ -66,37 +65,32 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public final
class Subscription {
+ private static final int GROW_SIZE = 8;
+
private static final AtomicInteger ID_COUNTER = new AtomicInteger();
public final int ID = ID_COUNTER.getAndIncrement();
// the handler's metadata -> for each handler in a listener, a unique subscription context is created
private final MessageHandler handler;
-
private final IHandlerInvocation invocation;
- private final Collection listeners;
+
+ // NOTE: this is still inside the single-writer! can use the same techniques as subscription manager (for thread safe publication)
+ private int firstFreeSpot = 0; // only touched by a single thread
+ private volatile Object[] listeners = new Object[GROW_SIZE]; // only modified by a single thread
+
+ private final IdentityMap listenerMap = new IdentityMap<>(GROW_SIZE);
+
+ // Recommended for best performance while adhering to the "single writer principle". Must be static-final
+ private static final AtomicReferenceFieldUpdater listenersREF =
+ AtomicReferenceFieldUpdater.newUpdater(Subscription.class,
+ Object[].class,
+ "listeners");
+
public
Subscription(final MessageHandler handler) {
this.handler = handler;
-
-// this.listeners = Collections.newSetFromMap(new ConcurrentHashMap(8)); // really bad performance
-// this.listeners = new StrongConcurrentSetV8(16, 0.7F, 8);
-
- ///this is by far, the fastest
- this.listeners = new ConcurrentSkipListSet<>(new Comparator() {
- @Override
- public
- int compare(final Object o1, final Object o2) {
- return Integer.compare(o1.hashCode(), o2.hashCode());
-// return 0;
- }
- });
-// this.listeners = new StrongConcurrentSet(16, 0.85F);
-// this.listeners = new ConcurrentLinkedQueue2();
-// this.listeners = new CopyOnWriteArrayList();
-// this.listeners = new CopyOnWriteArraySet(); // not very good
-
IHandlerInvocation invocation = new ReflectiveHandlerInvocation();
if (handler.isSynchronized()) {
invocation = new SynchronizedHandlerInvocation(invocation);
@@ -111,27 +105,82 @@ class Subscription {
}
public
- boolean isEmpty() {
- return this.listeners.isEmpty();
- }
+ void subscribe(final Object listener) {
+ // single writer principle!
- public
- void subscribe(Object listener) {
- this.listeners.add(listener);
+ Object[] localListeners = listenersREF.get(this);
+
+ final int length = localListeners.length;
+ int spotToPlace = firstFreeSpot;
+
+ while (true) {
+ if (spotToPlace >= length) {
+ // if we couldn't find a place to put the listener, grow the array, but it is never shrunk
+ localListeners = Arrays.copyOf(localListeners, length + GROW_SIZE, Object[].class);
+ break;
+ }
+
+ if (localListeners[spotToPlace] == null) {
+ break;
+ }
+ spotToPlace++;
+ }
+
+ listenerMap.put(listener, spotToPlace);
+ localListeners[spotToPlace] = listener;
+
+ // mark this spot as taken, so the next subscribe starts out a little ahead
+ firstFreeSpot = spotToPlace + 1;
+ listenersREF.lazySet(this, localListeners);
}
/**
* @return TRUE if the element was removed
*/
public
- boolean unsubscribe(Object existingListener) {
- return this.listeners.remove(existingListener);
+ boolean unsubscribe(final Object listener) {
+ // single writer principle!
+
+ final Integer integer = listenerMap.remove(listener);
+ Object[] localListeners = listenersREF.get(this);
+
+ if (integer != null) {
+ final int index = integer;
+ firstFreeSpot = index;
+ localListeners[index] = null;
+ listenersREF.lazySet(this, localListeners);
+ return true;
+ }
+ else {
+ for (int i = 0; i < localListeners.length; i++) {
+ if (localListeners[i] == listener) {
+ firstFreeSpot = i;
+ localListeners[i] = null;
+ listenersREF.lazySet(this, localListeners);
+ return true;
+ }
+ }
+ }
+
+ firstFreeSpot = 0;
+ return false;
}
- // only used in unit-test
+ /**
+ * only used in unit tests
+ */
public
int size() {
- return this.listeners.size();
+ // since this is ONLY used in unit tests, we count how many are non-null
+
+ int count = 0;
+ for (int i = 0; i < listeners.length; i++) {
+ if (listeners[i] != null) {
+ count++;
+ }
+ }
+
+ return count;
}
public
@@ -140,62 +189,62 @@ class Subscription {
final int handleIndex = this.handler.getMethodIndex();
final IHandlerInvocation invocation = this.invocation;
- Iterator iterator;
Object listener;
-
- for (iterator = this.listeners.iterator(); iterator.hasNext(); ) {
- listener = iterator.next();
-
- invocation.invoke(listener, handler, handleIndex, message);
+ final Object[] localListeners = listenersREF.get(this);
+ for (int i = 0; i < localListeners.length; i++) {
+ listener = localListeners[i];
+ if (listener != null) {
+ invocation.invoke(listener, handler, handleIndex, message);
+ }
}
}
public
void publish(final Object message1, final Object message2) throws Throwable {
- final MethodAccess handler = this.handler.getHandler();
- final int handleIndex = this.handler.getMethodIndex();
- final IHandlerInvocation invocation = this.invocation;
-
- Iterator iterator;
- Object listener;
-
- for (iterator = this.listeners.iterator(); iterator.hasNext(); ) {
- listener = iterator.next();
-
- invocation.invoke(listener, handler, handleIndex, message1, message2);
- }
+// final MethodAccess handler = this.handler.getHandler();
+// final int handleIndex = this.handler.getMethodIndex();
+// final IHandlerInvocation invocation = this.invocation;
+//
+// Iterator iterator;
+// Object listener;
+//
+// for (iterator = this.listeners.iterator(); iterator.hasNext(); ) {
+// listener = iterator.next();
+//
+// invocation.invoke(listener, handler, handleIndex, message1, message2);
+// }
}
public
void publish(final Object message1, final Object message2, final Object message3) throws Throwable {
- final MethodAccess handler = this.handler.getHandler();
- final int handleIndex = this.handler.getMethodIndex();
- final IHandlerInvocation invocation = this.invocation;
-
- Iterator iterator;
- Object listener;
-
- for (iterator = this.listeners.iterator(); iterator.hasNext(); ) {
- listener = iterator.next();
-
- invocation.invoke(listener, handler, handleIndex, message1, message2, message3);
- }
+// final MethodAccess handler = this.handler.getHandler();
+// final int handleIndex = this.handler.getMethodIndex();
+// final IHandlerInvocation invocation = this.invocation;
+//
+// Iterator iterator;
+// Object listener;
+//
+// for (iterator = this.listeners.iterator(); iterator.hasNext(); ) {
+// listener = iterator.next();
+//
+// invocation.invoke(listener, handler, handleIndex, message1, message2, message3);
+// }
}
public
void publish(final Object... messages) throws Throwable {
- final MethodAccess handler = this.handler.getHandler();
- final int handleIndex = this.handler.getMethodIndex();
- final IHandlerInvocation invocation = this.invocation;
-
- Iterator iterator;
- Object listener;
-
- for (iterator = this.listeners.iterator(); iterator.hasNext(); ) {
- listener = iterator.next();
-
- invocation.invoke(listener, handler, handleIndex, messages);
- }
+// final MethodAccess handler = this.handler.getHandler();
+// final int handleIndex = this.handler.getMethodIndex();
+// final IHandlerInvocation invocation = this.invocation;
+//
+// Iterator iterator;
+// Object listener;
+//
+// for (iterator = this.listeners.iterator(); iterator.hasNext(); ) {
+// listener = iterator.next();
+//
+// invocation.invoke(listener, handler, handleIndex, messages);
+// }
}
@@ -207,7 +256,7 @@ class Subscription {
@Override
public
- boolean equals(Object obj) {
+ boolean equals(final Object obj) {
if (this == obj) {
return true;
}
diff --git a/src/dorkbox/util/messagebus/subscription/SubscriptionManager.java b/src/dorkbox/util/messagebus/subscription/SubscriptionManager.java
index 04b7b07..25f3c82 100644
--- a/src/dorkbox/util/messagebus/subscription/SubscriptionManager.java
+++ b/src/dorkbox/util/messagebus/subscription/SubscriptionManager.java
@@ -24,6 +24,7 @@ import dorkbox.util.messagebus.utils.SubscriptionUtils;
import dorkbox.util.messagebus.utils.VarArgUtils;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -90,23 +91,23 @@ class SubscriptionManager {
private final ClassUtils classUtils;
- // Recommended for best performance while adhering to the "single writer principle"
- private final AtomicReferenceFieldUpdater subsSingleREF =
+ // Recommended for best performance while adhering to the "single writer principle". Must be static-final
+ private static final AtomicReferenceFieldUpdater subsSingleREF =
AtomicReferenceFieldUpdater.newUpdater(SubscriptionManager.class,
- IdentityMap.class,
- "subsSingle");
+ IdentityMap.class,
+ "subsSingle");
- private final AtomicReferenceFieldUpdater subsSuperSingleREF =
+ private static final AtomicReferenceFieldUpdater subsSuperSingleREF =
AtomicReferenceFieldUpdater.newUpdater(SubscriptionManager.class,
IdentityMap.class,
"subsSuperSingle");
- private final AtomicReferenceFieldUpdater subsVaritySingleREF =
+ private static final AtomicReferenceFieldUpdater subsVaritySingleREF =
AtomicReferenceFieldUpdater.newUpdater(SubscriptionManager.class,
IdentityMap.class,
"subsVaritySingle");
- private final AtomicReferenceFieldUpdater subsSuperVaritySingleREF =
+ private static final AtomicReferenceFieldUpdater subsSuperVaritySingleREF =
AtomicReferenceFieldUpdater.newUpdater(SubscriptionManager.class,
IdentityMap.class,
"subsSuperVaritySingle");
@@ -255,8 +256,9 @@ class SubscriptionManager {
// now subsPerMessageSingle has a unique list of subscriptions for a specific handlerType, and MAY already have subscriptions
+ // activates this sub for sub/unsub (only used by the subscription writer thread)
+ subsPerListener.put(listenerClass, subscriptions);
- subsPerListener.put(listenerClass, subscriptions); // activates this sub for sub/unsub
// we can now safely add for publication AND subscribe since the data structures are consistent
for (int i = 0; i < handlersSize; i++) {
@@ -275,10 +277,9 @@ class SubscriptionManager {
final Subscription[] currentSubs = localSubs.get(handlerType);
final int currentLength = currentSubs.length;
- // add the new subscription to the beginning of the array
- final Subscription[] newSubs = new Subscription[currentLength + 1];
- newSubs[0] = subscription;
- System.arraycopy(currentSubs, 0, newSubs, 1, currentLength);
+ // add the new subscription to the array
+ final Subscription[] newSubs = Arrays.copyOf(currentSubs, currentLength + 1, Subscription[].class);
+ newSubs[currentLength] = subscription;
localSubs.put(handlerType, newSubs);
// update the varity/super types
@@ -328,32 +329,29 @@ class SubscriptionManager {
final IdentityMap, Subscription[]> subsPerSuperMessageSingle,
final IdentityMap, Subscription[]> subsPerVarityMessageSingle) {
- final Class> arrayVersion = this.classUtils.getArrayClass(clazz); // never returns null, cached response
+// final Class> arrayVersion = this.classUtils.getArrayClass(clazz); // never returns null, cached response
final Class>[] superClasses = this.classUtils.getSuperClasses(clazz); // never returns null, cached response
Subscription sub;
-
- // Register Varity (Var-Arg) subscriptions
- final Subscription[] arraySubs = subsPerMessageSingle.get(arrayVersion);
- if (arraySubs != null) {
- final int length = arraySubs.length;
- final ArrayList varArgSubsAsList = new ArrayList(length);
-
- for (int i = 0; i < length; i++) {
- sub = arraySubs[i];
-
- if (sub.getHandler().acceptsVarArgs()) {
- varArgSubsAsList.add(sub);
- }
- }
-
- final int size = varArgSubsAsList.size();
- if (size > 0) {
- Subscription[] varArgSubs = new Subscription[size];
- varArgSubsAsList.toArray(varArgSubs);
- subsPerVarityMessageSingle.put(clazz, varArgSubs);
- }
- }
+//
+// // Register Varity (Var-Arg) subscriptions
+// final Subscription[] arraySubs = subsPerMessageSingle.get(arrayVersion);
+// if (arraySubs != null) {
+// final int length = arraySubs.length;
+// final ArrayList varArgSubsAsList = new ArrayList(length);
+//
+// for (int i = 0; i < length; i++) {
+// sub = arraySubs[i];
+//
+// if (sub.getHandler().acceptsVarArgs()) {
+// varArgSubsAsList.add(sub);
+// }
+// }
+//
+// if (!varArgSubsAsList.isEmpty()) {
+// subsPerVarityMessageSingle.put(clazz, varArgSubsAsList.toArray(new Subscription[0]));
+// }
+// }
@@ -378,16 +376,10 @@ class SubscriptionManager {
}
}
- final int size = subsAsList.size();
- if (size > 0) {
+ if (!subsAsList.isEmpty()) {
// save the subscriptions
- Subscription[] subs = new Subscription[size];
- subsAsList.toArray(subs);
- subsPerSuperMessageSingle.put(clazz, subs);
+ subsPerSuperMessageSingle.put(clazz, subsAsList.toArray(new Subscription[0]));
}
-
-
-
}
@@ -519,10 +511,7 @@ class SubscriptionManager {
final ArrayList collection = getExactAsArray(messageClass1, messageClass2);
if (collection != null) {
- final Subscription[] subscriptions = new Subscription[collection.size()];
- collection.toArray(subscriptions);
-
- return subscriptions;
+ return collection.toArray(new Subscription[0]);
}
return null;
@@ -535,10 +524,7 @@ class SubscriptionManager {
final ArrayList collection = getExactAsArray(messageClass1, messageClass2, messageClass3);
if (collection != null) {
- final Subscription[] subscriptions = new Subscription[collection.size()];
- collection.toArray(subscriptions);
-
- return subscriptions;
+ return collection.toArray(new Subscription[0]);
}
return null;
@@ -592,9 +578,7 @@ class SubscriptionManager {
}
if (collection != null) {
- final Subscription[] subscriptions = new Subscription[collection.size()];
- collection.toArray(subscriptions);
- return subscriptions;
+ return collection.toArray(new Subscription[0]);
}
else {
return null;
@@ -623,9 +607,7 @@ class SubscriptionManager {
}
if (collection != null) {
- final Subscription[] subscriptions = new Subscription[collection.size()];
- collection.toArray(subscriptions);
- return subscriptions;
+ return collection.toArray(new Subscription[0]);
}
else {
return null;
diff --git a/src/dorkbox/util/messagebus/utils/ReflectionUtils.java b/src/dorkbox/util/messagebus/utils/ReflectionUtils.java
index 0df44f4..ca50192 100644
--- a/src/dorkbox/util/messagebus/utils/ReflectionUtils.java
+++ b/src/dorkbox/util/messagebus/utils/ReflectionUtils.java
@@ -65,9 +65,7 @@ class ReflectionUtils {
ArrayList methods = new ArrayList();
getMethods(target, methods);
- final Method[] array = new Method[methods.size()];
- methods.toArray(array);
- return array;
+ return methods.toArray(new Method[0]);
}
private static
@@ -124,9 +122,7 @@ class ReflectionUtils {
collectInterfaces(from, superclasses);
}
- final Class>[] classes = new Class>[superclasses.size()];
- superclasses.toArray(classes);
- return classes;
+ return superclasses.toArray(new Class>[0]);
}
public static
diff --git a/src/dorkbox/util/messagebus/utils/VarArgUtils.java b/src/dorkbox/util/messagebus/utils/VarArgUtils.java
index 83ca194..4bca985 100644
--- a/src/dorkbox/util/messagebus/utils/VarArgUtils.java
+++ b/src/dorkbox/util/messagebus/utils/VarArgUtils.java
@@ -86,7 +86,7 @@ class VarArgUtils {
}
}
- varArgSubs = new Subscription[varArgSubsAsList.size()];
+ varArgSubs = new Subscription[0];
varArgSubsAsList.toArray(varArgSubs);
local.put(messageClass, varArgSubs);
diff --git a/test/dorkbox/util/messagebus/MultiMessageTest.java b/test/dorkbox/util/messagebus/MultiMessageTest.java
index 58690b1..ea39294 100644
--- a/test/dorkbox/util/messagebus/MultiMessageTest.java
+++ b/test/dorkbox/util/messagebus/MultiMessageTest.java
@@ -25,8 +25,23 @@ public class MultiMessageTest extends MessageBusTest {
MultiListener listener1 = new MultiListener();
bus.subscribe(listener1);
+ bus.subscribe(listener1);
+ bus.subscribe(listener1);
+ bus.subscribe(listener1);
+ bus.subscribe(listener1);
+ bus.subscribe(listener1);
+ bus.subscribe(listener1);
+ bus.subscribe(listener1);
+ bus.subscribe(listener1);
+
+ bus.unsubscribe(listener1);
+ bus.unsubscribe(listener1);
+ bus.unsubscribe(listener1);
bus.unsubscribe(listener1);
+ bus.publish("s");
+ bus.publish("s");
+ bus.publish("s");
bus.publish("s");
bus.publish("s", "s");
bus.publish("s", "s", "s");
@@ -84,40 +99,40 @@ public class MultiMessageTest extends MessageBusTest {
System.err.println("match String");
}
- @Handler
- public void handleSync(String o1, String o2) {
- count.getAndIncrement();
- System.err.println("match String, String");
- }
-
- @Handler
- public void handleSync(String o1, String o2, String o3) {
- count.getAndIncrement();
- System.err.println("match String, String, String");
- }
-
- @Handler
- public void handleSync(Integer o1, Integer o2, String o3) {
- count.getAndIncrement();
- System.err.println("match Integer, Integer, String");
- }
-
- @Handler(acceptVarargs = true)
- public void handleSync(String... o) {
- count.getAndIncrement();
- System.err.println("match String[]");
- }
-
- @Handler
- public void handleSync(Integer... o) {
- count.getAndIncrement();
- System.err.println("match Integer[]");
- }
-
- @Handler(acceptVarargs = true)
- public void handleSync(Object... o) {
- count.getAndIncrement();
- System.err.println("match Object[]");
- }
+// @Handler
+// public void handleSync(String o1, String o2) {
+// count.getAndIncrement();
+// System.err.println("match String, String");
+// }
+//
+// @Handler
+// public void handleSync(String o1, String o2, String o3) {
+// count.getAndIncrement();
+// System.err.println("match String, String, String");
+// }
+//
+// @Handler
+// public void handleSync(Integer o1, Integer o2, String o3) {
+// count.getAndIncrement();
+// System.err.println("match Integer, Integer, String");
+// }
+//
+// @Handler(acceptVarargs = true)
+// public void handleSync(String... o) {
+// count.getAndIncrement();
+// System.err.println("match String[]");
+// }
+//
+// @Handler
+// public void handleSync(Integer... o) {
+// count.getAndIncrement();
+// System.err.println("match Integer[]");
+// }
+//
+// @Handler(acceptVarargs = true)
+// public void handleSync(Object... o) {
+// count.getAndIncrement();
+// System.err.println("match Object[]");
+// }
}
}
diff --git a/test/dorkbox/util/messagebus/SubscriptionManagerTest.java b/test/dorkbox/util/messagebus/SubscriptionManagerTest.java
index 2dea0d8..d9c0d44 100644
--- a/test/dorkbox/util/messagebus/SubscriptionManagerTest.java
+++ b/test/dorkbox/util/messagebus/SubscriptionManagerTest.java
@@ -61,15 +61,22 @@ public class SubscriptionManagerTest extends AssertSupport {
@Test
public void testIMessageListener() {
- ListenerFactory listeners = listeners(IMessageListener.DefaultListener.class, IMessageListener.DisabledListener.class,
- IMessageListener.NoSubtypesListener.class);
+ ListenerFactory listeners = listeners(IMessageListener.DefaultListener.class
+// ,
+// IMessageListener.DisabledListener.class,
+// IMessageListener.NoSubtypesListener.class
+ );
- SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners).listener(IMessageListener.DefaultListener.class)
- .handles(IMessage.class, AbstractMessage.class,
- IMultipartMessage.class,
- StandardMessage.class,
- MessageTypes.class).listener(
- IMessageListener.NoSubtypesListener.class).handles(IMessage.class);
+ SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners);
+ expectedSubscriptions.listener(IMessageListener.DefaultListener.class)
+ .handles(IMessage.class,
+ AbstractMessage.class,
+ IMultipartMessage.class,
+ StandardMessage.class,
+ MessageTypes.class);
+
+// expectedSubscriptions.listener(IMessageListener.NoSubtypesListener.class)
+// .handles(IMessage.class);
runTestWith(listeners, expectedSubscriptions);
}
diff --git a/test/dorkbox/util/messagebus/common/SubscriptionValidator.java b/test/dorkbox/util/messagebus/common/SubscriptionValidator.java
index ccf3bcd..0a896d1 100644
--- a/test/dorkbox/util/messagebus/common/SubscriptionValidator.java
+++ b/test/dorkbox/util/messagebus/common/SubscriptionValidator.java
@@ -55,6 +55,7 @@ public class SubscriptionValidator extends AssertSupport {
private SubscriptionValidator expect(Class subscriber, Class messageType) {
this.validations.add(new ValidationEntry(messageType, subscriber));
this.messageTypes.add(messageType);
+
return this;
}
@@ -126,8 +127,11 @@ public class SubscriptionValidator extends AssertSupport {
- public class Expectation {
+
+
+
+ public class Expectation {
private Class listener;
private Expectation(Class listener) {
@@ -138,24 +142,19 @@ public class SubscriptionValidator extends AssertSupport {
for (Class message : messages) {
expect(this.listener, message);
}
+
return SubscriptionValidator.this;
}
}
private class ValidationEntry {
-
-
private Class subscriber;
-
private Class messageType;
private ValidationEntry(Class messageType, Class subscriber) {
this.messageType = messageType;
this.subscriber = subscriber;
}
-
-
}
-
}
diff --git a/test/dorkbox/util/messagebus/listeners/IMessageListener.java b/test/dorkbox/util/messagebus/listeners/IMessageListener.java
index b088420..5cafedd 100644
--- a/test/dorkbox/util/messagebus/listeners/IMessageListener.java
+++ b/test/dorkbox/util/messagebus/listeners/IMessageListener.java
@@ -33,22 +33,21 @@ import dorkbox.util.messagebus.messages.IMessage;
public class IMessageListener {
private static abstract class BaseListener {
-
@Handler
public void handle(IMessage message){
message.handled(this.getClass());
}
-
}
- public static class DefaultListener extends BaseListener {
+ public static class DefaultListener extends BaseListener {
@Override
public void handle(IMessage message){
super.handle(message);
}
}
+
public static class NoSubtypesListener extends BaseListener {
@Override
@@ -60,7 +59,6 @@ public class IMessageListener {
public static class DisabledListener extends BaseListener {
-
@Override
@Handler(enabled = false)
public void handle(IMessage message){
@@ -68,6 +66,4 @@ public class IMessageListener {
}
}
-
-
}
diff --git a/test/dorkbox/util/messagebus/PerfTest_Collections.java b/test/dorkbox/util/messagebus/queuePerf/PerfTest_Collections.java
similarity index 97%
rename from test/dorkbox/util/messagebus/PerfTest_Collections.java
rename to test/dorkbox/util/messagebus/queuePerf/PerfTest_Collections.java
index 3f7a601..11ca81e 100644
--- a/test/dorkbox/util/messagebus/PerfTest_Collections.java
+++ b/test/dorkbox/util/messagebus/queuePerf/PerfTest_Collections.java
@@ -13,18 +13,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package dorkbox.util.messagebus;
+package dorkbox.util.messagebus.queuePerf;
import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.common.MessageHandler;
import dorkbox.util.messagebus.common.StrongConcurrentSet;
import dorkbox.util.messagebus.common.StrongConcurrentSetV8;
import dorkbox.util.messagebus.common.thread.ConcurrentLinkedQueue2;
-import dorkbox.util.messagebus.common.thread.ConcurrentSet;
import dorkbox.util.messagebus.subscription.Subscription;
import java.lang.ref.WeakReference;
-import java.util.*;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedTransferQueue;
@@ -64,7 +69,6 @@ class PerfTest_Collections {
System.err.println("Done");
bench(size, new ArrayList(size * 2));
- bench(size, new ConcurrentSet(size * 2, LOAD_FACTOR, 5));
bench(size, new ConcurrentLinkedQueue2());
bench(size, new ConcurrentLinkedQueue());
bench(size, new LinkedTransferQueue());