From f96a2f4c195f0f5584ce2750ddd21d7a856c0e07 Mon Sep 17 00:00:00 2001 From: nathan Date: Mon, 4 May 2015 19:03:50 +0200 Subject: [PATCH] Improved simpleq, now beats LTQ in concurrent tests --- .../java/com/lmax/disruptor/MessageType.java | 8 +- .../util/messagebus/MultiMBassador.java | 128 ++- .../common/LinkedTransferQueue.java | 839 ------------------ .../util/messagebus/common/TransferQueue.java | 119 --- .../common/simpleq/HandlerFactory.java | 5 - .../util/messagebus/common/simpleq/Node.java | 64 -- .../common/simpleq/ObjectPoolHolder.java | 39 - .../common/simpleq/PaddedAtomicReference.java | 27 - .../common/simpleq/PaddedObject.java | 45 - .../common/simpleq/ProducerNode.java | 26 - .../bakup/ConcurrentLinkedQueueSimple.java | 458 ---------- .../simpleq/bakup/EliminationStack.java | 355 -------- .../simpleq/bakup/ExchangerStackORG.java | 492 ---------- .../common/simpleq/bakup/NodeORG.java | 42 - .../common/simpleq/bakup/NodePool.java | 315 ------- .../jctools/MpmcArrayTransferQueue.java | 261 ------ .../common/simpleq/jctools/Node.java | 106 +++ .../common/simpleq/jctools/SimpleQueue.java | 484 +++++----- .../LinkTransferQueueConcurrentPerfTest.java | 9 +- .../messagebus/LinkTransferQueuePerfTest.java | 2 +- .../MpmcQueueAltConcurrentPerfTest.java | 2 +- .../util/messagebus/MpmcQueueAltPerfTest.java | 2 +- .../MpmcQueueBaselineNodePerfTest.java | 2 +- .../MpmcQueueConcurrentPerfTest.java | 2 +- .../messagebus/SimpleQueueAltPerfTest.java | 86 +- 25 files changed, 450 insertions(+), 3468 deletions(-) delete mode 100644 src/main/java/dorkbox/util/messagebus/common/LinkedTransferQueue.java delete mode 100644 src/main/java/dorkbox/util/messagebus/common/TransferQueue.java delete mode 100644 src/main/java/dorkbox/util/messagebus/common/simpleq/HandlerFactory.java delete mode 100644 src/main/java/dorkbox/util/messagebus/common/simpleq/Node.java delete mode 100644 src/main/java/dorkbox/util/messagebus/common/simpleq/ObjectPoolHolder.java delete mode 100644 src/main/java/dorkbox/util/messagebus/common/simpleq/PaddedAtomicReference.java delete mode 100644 src/main/java/dorkbox/util/messagebus/common/simpleq/PaddedObject.java delete mode 100644 src/main/java/dorkbox/util/messagebus/common/simpleq/ProducerNode.java delete mode 100644 src/main/java/dorkbox/util/messagebus/common/simpleq/bakup/ConcurrentLinkedQueueSimple.java delete mode 100644 src/main/java/dorkbox/util/messagebus/common/simpleq/bakup/EliminationStack.java delete mode 100644 src/main/java/dorkbox/util/messagebus/common/simpleq/bakup/ExchangerStackORG.java delete mode 100644 src/main/java/dorkbox/util/messagebus/common/simpleq/bakup/NodeORG.java delete mode 100644 src/main/java/dorkbox/util/messagebus/common/simpleq/bakup/NodePool.java create mode 100644 src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/Node.java diff --git a/src/main/java/com/lmax/disruptor/MessageType.java b/src/main/java/com/lmax/disruptor/MessageType.java index 5aadb86..43c13ce 100644 --- a/src/main/java/com/lmax/disruptor/MessageType.java +++ b/src/main/java/com/lmax/disruptor/MessageType.java @@ -4,8 +4,8 @@ package com.lmax.disruptor; * Date: 2/2/15 */ public class MessageType { - public static final short ONE = 1; - public static final short TWO = 2; - public static final short THREE = 3; - public static final short ARRAY = 4; + public static final int ONE = 1; + public static final int TWO = 2; + public static final int THREE = 3; + public static final int ARRAY = 4; } \ No newline at end of file diff --git a/src/main/java/dorkbox/util/messagebus/MultiMBassador.java b/src/main/java/dorkbox/util/messagebus/MultiMBassador.java index 4d44bf5..f40646c 100644 --- a/src/main/java/dorkbox/util/messagebus/MultiMBassador.java +++ b/src/main/java/dorkbox/util/messagebus/MultiMBassador.java @@ -24,19 +24,6 @@ import dorkbox.util.messagebus.subscription.Subscription; */ public class MultiMBassador implements IMessageBus { - /** The number of CPUs, for spin control */ - private static final int NCPUS = Runtime.getRuntime().availableProcessors(); - - /** - * The number of times to spin before blocking. - * - * The value is empirically derived -- it works well across a - * variety of processors and OSes. Empirically, the best value - * seems not to vary with number of CPUs (beyond 2) so is just - * a constant. - */ - private static final int maxSpins = NCPUS < 2 ? 0 : 32*16; - // error handling is first-class functionality // this handler will receive all errors that occur during message dispatch or message handling private final Collection errorHandlers = new ArrayDeque(); @@ -103,15 +90,10 @@ public class MultiMBassador implements IMessageBus { // TransferQueue IN_QUEUE = MultiMBassador.this.dispatchQueue; Object message1; - try { - while (true) { - message1 = IN_QUEUE.take(); - publish(message1); - } - } catch (InterruptedException e) { - Thread.interrupted(); - return; - } + while (true) { + message1 = IN_QUEUE.take(); + publish(message1); + } } }; @@ -413,22 +395,22 @@ public class MultiMBassador implements IMessageBus { @Override public void publishAsync(final Object message) { if (message != null) { - Runnable runnable = new Runnable() { - @Override - public void run() { - MultiMBassador.this.publish(message); - } - }; +// Runnable runnable = new Runnable() { +// @Override +// public void run() { +// MultiMBassador.this.publish(message); +// } +// }; - try { +// try { // this.dispatchQueue.transfer(runnable); - this.dispatchQueue.put(message); - } catch (InterruptedException e) { - handlePublicationError(new PublicationError() - .setMessage("Error while adding an asynchronous message") - .setCause(e) - .setPublishedObject(message)); - } + this.dispatchQueue.transfer(message); +// } catch (InterruptedException e) { +// handlePublicationError(new PublicationError() +// .setMessage("Error while adding an asynchronous message") +// .setCause(e) +// .setPublishedObject(message)); +// } } } @@ -442,14 +424,14 @@ public class MultiMBassador implements IMessageBus { } }; - try { - this.dispatchQueue.put(runnable); - } catch (InterruptedException e) { - handlePublicationError(new PublicationError() - .setMessage("Error while adding an asynchronous message") - .setCause(e) - .setPublishedObject(message1, message2)); - } +// try { + this.dispatchQueue.transfer(runnable); +// } catch (InterruptedException e) { +// handlePublicationError(new PublicationError() +// .setMessage("Error while adding an asynchronous message") +// .setCause(e) +// .setPublishedObject(message1, message2)); +// } } } @@ -464,14 +446,14 @@ public class MultiMBassador implements IMessageBus { }; - try { - this.dispatchQueue.put(runnable); - } catch (InterruptedException e) { - handlePublicationError(new PublicationError() - .setMessage("Error while adding an asynchronous message") - .setCause(e) - .setPublishedObject(message1, message2, message3)); - } +// try { + this.dispatchQueue.transfer(runnable); +// } catch (InterruptedException e) { +// handlePublicationError(new PublicationError() +// .setMessage("Error while adding an asynchronous message") +// .setCause(e) +// .setPublishedObject(message1, message2, message3)); +// } } } @@ -485,14 +467,14 @@ public class MultiMBassador implements IMessageBus { } }; - try { +// try { this.dispatchQueue.tryTransfer(runnable, timeout, unit); - } catch (InterruptedException e) { - handlePublicationError(new PublicationError() - .setMessage("Error while adding an asynchronous message") - .setCause(e) - .setPublishedObject(message)); - } +// } catch (InterruptedException e) { +// handlePublicationError(new PublicationError() +// .setMessage("Error while adding an asynchronous message") +// .setCause(e) +// .setPublishedObject(message)); +// } } } @Override @@ -505,14 +487,14 @@ public class MultiMBassador implements IMessageBus { } }; - try { +// try { this.dispatchQueue.tryTransfer(runnable, timeout, unit); - } catch (InterruptedException e) { - handlePublicationError(new PublicationError() - .setMessage("Error while adding an asynchronous message") - .setCause(e) - .setPublishedObject(message1, message2)); - } +// } catch (InterruptedException e) { +// handlePublicationError(new PublicationError() +// .setMessage("Error while adding an asynchronous message") +// .setCause(e) +// .setPublishedObject(message1, message2)); +// } } } @@ -527,14 +509,14 @@ public class MultiMBassador implements IMessageBus { } }; - try { +// try { this.dispatchQueue.tryTransfer(runnable, timeout, unit); - } catch (InterruptedException e) { - handlePublicationError(new PublicationError() - .setMessage("Error while adding an asynchronous message") - .setCause(e) - .setPublishedObject(message1, message2, message3)); - } +// } catch (InterruptedException e) { +// handlePublicationError(new PublicationError() +// .setMessage("Error while adding an asynchronous message") +// .setCause(e) +// .setPublishedObject(message1, message2, message3)); +// } } } } diff --git a/src/main/java/dorkbox/util/messagebus/common/LinkedTransferQueue.java b/src/main/java/dorkbox/util/messagebus/common/LinkedTransferQueue.java deleted file mode 100644 index 4258b7e..0000000 --- a/src/main/java/dorkbox/util/messagebus/common/LinkedTransferQueue.java +++ /dev/null @@ -1,839 +0,0 @@ -/* - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER. - * - * Copyright 1997-2008 Sun Microsystems, Inc. All rights reserved. - * - * The contents of this file are subject to the terms of either the GNU - * General Public License Version 2 only ("GPL") or the Common Development - * and Distribution License("CDDL") (collectively, the "License"). You - * may not use this file except in compliance with the License. You can obtain - * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html - * or glassfish/bootstrap/legal/LICENSE.txt. See the License for the specific - * language governing permissions and limitations under the License. - * - * When distributing the software, include this License Header Notice in each - * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt. - * Sun designates this particular file as subject to the "Classpath" exception - * as provided by Sun in the GPL Version 2 section of the License file that - * accompanied this code. If applicable, add the following below the License - * Header, with the fields enclosed by brackets [] replaced by your own - * identifying information: "Portions Copyrighted [year] - * [name of copyright owner]" - * - * Contributor(s): - * - * If you wish your version of this file to be governed by only the CDDL or - * only the GPL Version 2, indicate your decision by adding "[Contributor] - * elects to include this software in this distribution under the [CDDL or GPL - * Version 2] license." If you don't indicate a single choice of license, a - * recipient has the option to distribute your version of this file under - * either the CDDL, the GPL Version 2 or to extend the choice of license to - * its licensees as provided above. However, if you add GPL Version 2 code - * and therefore, elected the GPL Version 2 license, then the option applies - * only if the new code is made subject to such option by the copyright - * holder. - */ - -/* - * Written by Doug Lea with assistance from members of JCP JSR-166 - * Expert Group and released to the public domain, as explained at - * http://creativecommons.org/licenses/publicdomain - */ - -package dorkbox.util.messagebus.common; - -import java.util.AbstractQueue; -import java.util.Collection; -import java.util.Iterator; -import java.util.NoSuchElementException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -import java.util.concurrent.locks.LockSupport; - -/** - * An unbounded TransferQueue based on linked nodes. - * This queue orders elements FIFO (first-in-first-out) with respect - * to any given producer. The head of the queue is that - * element that has been on the queue the longest time for some - * producer. The tail of the queue is that element that has - * been on the queue the shortest time for some producer. - * - *

Beware that, unlike in most collections, the size - * method is NOT a constant-time operation. Because of the - * asynchronous nature of these queues, determining the current number - * of elements requires a traversal of the elements. - * - *

This class and its iterator implement all of the - * optional methods of the {@link Collection} and {@link - * Iterator} interfaces. - * - *

Memory consistency effects: As with other concurrent - * collections, actions in a thread prior to placing an object into a - * {@code LinkedTransferQueue} - * happen-before - * actions subsequent to the access or removal of that element from - * the {@code LinkedTransferQueue} in another thread. - * - * @author Doug Lea - * @author The Netty Project (netty-dev@lists.jboss.org) - * @author Trustin Lee (tlee@redhat.com) - * - * @param the type of elements held in this collection - * - */ -public class LinkedTransferQueue extends AbstractQueue implements TransferQueue { - - /* - * This class extends the approach used in FIFO-mode - * SynchronousQueues. See the internal documentation, as well as - * the PPoPP 2006 paper "Scalable Synchronous Queues" by Scherer, - * Lea & Scott - * (http://www.cs.rice.edu/~wns1/papers/2006-PPoPP-SQ.pdf) - * - * The main extension is to provide different Wait modes for the - * main "xfer" method that puts or takes items. These don't - * impact the basic dual-queue logic, but instead control whether - * or how threads block upon insertion of request or data nodes - * into the dual queue. It also uses slightly different - * conventions for tracking whether nodes are off-list or - * cancelled. - */ - - // Wait modes for xfer method - private static final int NOWAIT = 0; - private static final int TIMEOUT = 1; - private static final int WAIT = 2; - - /** The number of CPUs, for spin control */ - private static final int NCPUS = Runtime.getRuntime().availableProcessors(); - - /** - * The number of times to spin before blocking in timed waits. - * The value is empirically derived -- it works well across a - * variety of processors and OSes. Empirically, the best value - * seems not to vary with number of CPUs (beyond 2) so is just - * a constant. - */ - private static final int maxTimedSpins = NCPUS < 2? 0 : 32; - - /** - * The number of times to spin before blocking in untimed waits. - * This is greater than timed value because untimed waits spin - * faster since they don't need to check times on each spin. - */ - private static final int maxUntimedSpins = maxTimedSpins * 16; - - /** - * The number of nanoseconds for which it is faster to spin - * rather than to use timed park. A rough estimate suffices. - */ - private static final long spinForTimeoutThreshold = 1000L; - - /** - * Node class for LinkedTransferQueue. Opportunistically - * subclasses from AtomicReference to represent item. Uses Object, - * not E, to allow setting item to "this" after use, to avoid - * garbage retention. Similarly, setting the next field to this is - * used as sentinel that node is off list. - */ - private static final class QNode extends AtomicReference { - private static final long serialVersionUID = 5925596372370723938L; - - transient volatile QNode next; - transient volatile Thread waiter; // to control park/unpark - final boolean isData; - QNode(Object item, boolean isData) { - super(item); - this.isData = isData; - } - - private static final AtomicReferenceFieldUpdater nextUpdater; - static { - AtomicReferenceFieldUpdater tmp = null; - try { - tmp = AtomicReferenceFieldUpdater.newUpdater( - QNode.class, QNode.class, "next"); - - // Test if AtomicReferenceFieldUpdater is really working. - QNode testNode = new QNode(null, false); - tmp.set(testNode, testNode); - if (testNode.next != testNode) { - // Not set as expected - fall back to the safe mode. - throw new Exception(); - } - } catch (Throwable t) { - // Running in a restricted environment with a security manager. - tmp = null; - } - nextUpdater = tmp; - } - - boolean casNext(QNode cmp, QNode val) { - if (nextUpdater != null) { - return nextUpdater.compareAndSet(this, cmp, val); - } else { - return alternativeCasNext(cmp, val); - } - } - - private synchronized boolean alternativeCasNext(QNode cmp, QNode val) { - if (this.next == cmp) { - this.next = val; - return true; - } else { - return false; - } - } - } - - /** - * Padded version of AtomicReference used for head, tail and - * cleanMe, to alleviate contention across threads CASing one vs - * the other. - */ - private static final class PaddedAtomicReference extends AtomicReference { - private static final long serialVersionUID = 4684288940772921317L; - - // enough padding for 64bytes with 4byte refs - @SuppressWarnings("unused") - Object p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe; - PaddedAtomicReference(T r) { super(r); } - } - - - /** head of the queue */ - private final PaddedAtomicReference head; - /** tail of the queue */ - private final PaddedAtomicReference tail; - - /** - * Reference to a cancelled node that might not yet have been - * unlinked from queue because it was the last inserted node - * when it cancelled. - */ - private final PaddedAtomicReference cleanMe; - - /** - * Tries to cas nh as new head; if successful, unlink - * old head's next node to avoid garbage retention. - */ - private boolean advanceHead(QNode h, QNode nh) { - if (h == this.head.get() && this.head.compareAndSet(h, nh)) { - h.next = h; // forget old next - return true; - } - return false; - } - - /** - * Puts or takes an item. Used for most queue operations (except - * poll() and tryTransfer()). See the similar code in - * SynchronousQueue for detailed explanation. - * @param e the item or if null, signifies that this is a take - * @param mode the wait mode: NOWAIT, TIMEOUT, WAIT - * @param nanos timeout in nanosecs, used only if mode is TIMEOUT - * @return an item, or null on failure - */ - private Object xfer(Object e, int mode, long nanos) { - boolean isData = e != null; - QNode s = null; - final PaddedAtomicReference head = this.head; - final PaddedAtomicReference tail = this.tail; - - for (;;) { - QNode t = tail.get(); - QNode h = head.get(); - - if (t != null && (t == h || t.isData == isData)) { - if (s == null) { - s = new QNode(e, isData); - } - QNode last = t.next; - if (last != null) { - if (t == tail.get()) { - tail.compareAndSet(t, last); - } - } - else if (t.casNext(null, s)) { - tail.compareAndSet(t, s); - return awaitFulfill(t, s, e, mode, nanos); - } - } - - else if (h != null) { - QNode first = h.next; - if (t == tail.get() && first != null && - advanceHead(h, first)) { - Object x = first.get(); - if (x != first && first.compareAndSet(x, e)) { - LockSupport.unpark(first.waiter); - return isData? e : x; - } - } - } - } - } - - - /** - * Version of xfer for poll() and tryTransfer, which - * simplifies control paths both here and in xfer - */ - private Object fulfill(Object e) { - boolean isData = e != null; - final PaddedAtomicReference head = this.head; - final PaddedAtomicReference tail = this.tail; - - for (;;) { - QNode t = tail.get(); - QNode h = head.get(); - - if (t != null && (t == h || t.isData == isData)) { - QNode last = t.next; - if (t == tail.get()) { - if (last != null) { - tail.compareAndSet(t, last); - } else { - return null; - } - } - } - else if (h != null) { - QNode first = h.next; - if (t == tail.get() && - first != null && - advanceHead(h, first)) { - Object x = first.get(); - if (x != first && first.compareAndSet(x, e)) { - LockSupport.unpark(first.waiter); - return isData? e : x; - } - } - } - } - } - - /** - * Spins/blocks until node s is fulfilled or caller gives up, - * depending on wait mode. - * - * @param pred the predecessor of waiting node - * @param s the waiting node - * @param e the comparison value for checking match - * @param mode mode - * @param nanos timeout value - * @return matched item, or s if cancelled - */ - private Object awaitFulfill(QNode pred, QNode s, Object e, - int mode, long nanos) { - if (mode == NOWAIT) { - return null; - } - - long lastTime = mode == TIMEOUT? System.nanoTime() : 0; - Thread w = Thread.currentThread(); - int spins = -1; // set to desired spin count below - for (;;) { - if (w.isInterrupted()) { - s.compareAndSet(e, s); - } - Object x = s.get(); - if (x != e) { // Node was matched or cancelled - advanceHead(pred, s); // unlink if head - if (x == s) { // was cancelled - clean(pred, s); - return null; - } - else if (x != null) { - s.set(s); // avoid garbage retention - return x; - } else { - return e; - } - } - if (mode == TIMEOUT) { - long now = System.nanoTime(); - nanos -= now - lastTime; - lastTime = now; - if (nanos <= 0) { - s.compareAndSet(e, s); // try to cancel - continue; - } - } - if (spins < 0) { - QNode h = this.head.get(); // only spin if at head - spins = h != null && h.next == s ? - mode == TIMEOUT? - maxTimedSpins : maxUntimedSpins : 0; - } - if (spins > 0) { - --spins; - } else if (s.waiter == null) { - s.waiter = w; - } else if (mode != TIMEOUT) { - // LockSupport.park(this); - LockSupport.park(); // allows run on java5 - s.waiter = null; - spins = -1; - } - else if (nanos > spinForTimeoutThreshold) { - // LockSupport.parkNanos(this, nanos); - LockSupport.parkNanos(nanos); - s.waiter = null; - spins = -1; - } - } - } - - /** - * Returns validated tail for use in cleaning methods - */ - private QNode getValidatedTail() { - for (;;) { - QNode h = this.head.get(); - QNode first = h.next; - if (first != null && first.next == first) { // help advance - advanceHead(h, first); - continue; - } - QNode t = this.tail.get(); - QNode last = t.next; - if (t == this.tail.get()) { - if (last != null) { - this.tail.compareAndSet(t, last); // help advance - } else { - return t; - } - } - } - } - - /** - * Gets rid of cancelled node s with original predecessor pred. - * @param pred predecessor of cancelled node - * @param s the cancelled node - */ - void clean(QNode pred, QNode s) { - Thread w = s.waiter; - if (w != null) { // Wake up thread - s.waiter = null; - if (w != Thread.currentThread()) { - LockSupport.unpark(w); - } - } - /* - * At any given time, exactly one node on list cannot be - * deleted -- the last inserted node. To accommodate this, if - * we cannot delete s, we save its predecessor as "cleanMe", - * processing the previously saved version first. At least one - * of node s or the node previously saved can always be - * processed, so this always terminates. - */ - while (pred.next == s) { - QNode oldpred = reclean(); // First, help get rid of cleanMe - QNode t = getValidatedTail(); - if (s != t) { // If not tail, try to unsplice - QNode sn = s.next; // s.next == s means s already off list - if (sn == s || pred.casNext(s, sn)) { - break; - } - } - else if (oldpred == pred || // Already saved - oldpred == null && this.cleanMe.compareAndSet(null, pred)) { - break; // Postpone cleaning - } - } - } - - /** - * Tries to unsplice the cancelled node held in cleanMe that was - * previously uncleanable because it was at tail. - * @return current cleanMe node (or null) - */ - private QNode reclean() { - /* - * cleanMe is, or at one time was, predecessor of cancelled - * node s that was the tail so could not be unspliced. If s - * is no longer the tail, try to unsplice if necessary and - * make cleanMe slot available. This differs from similar - * code in clean() because we must check that pred still - * points to a cancelled node that must be unspliced -- if - * not, we can (must) clear cleanMe without unsplicing. - * This can loop only due to contention on casNext or - * clearing cleanMe. - */ - QNode pred; - while ((pred = this.cleanMe.get()) != null) { - QNode t = getValidatedTail(); - QNode s = pred.next; - if (s != t) { - QNode sn; - if (s == null || s == pred || s.get() != s || - (sn = s.next) == s || pred.casNext(s, sn)) { - this.cleanMe.compareAndSet(pred, null); - } - } else { - break; - } - } - return pred; - } - - @SuppressWarnings("unchecked") - E cast(Object e) { - return (E)e; - } - - /** - * Creates an initially empty LinkedTransferQueue. - */ - public LinkedTransferQueue() { - QNode dummy = new QNode(null, false); - this.head = new PaddedAtomicReference(dummy); - this.tail = new PaddedAtomicReference(dummy); - this.cleanMe = new PaddedAtomicReference(null); - } - - /** - * Creates a LinkedTransferQueue - * initially containing the elements of the given collection, - * added in traversal order of the collection's iterator. - * @param c the collection of elements to initially contain - * @throws NullPointerException if the specified collection or any - * of its elements are null - */ - public LinkedTransferQueue(Collection c) { - this(); - addAll(c); - } - - @Override - public void put(E e) throws InterruptedException { - if (e == null) { - throw new NullPointerException(); - } - if (Thread.interrupted()) { - throw new InterruptedException(); - } - xfer(e, NOWAIT, 0); - } - - @Override - public boolean offer(E e, long timeout, TimeUnit unit) - throws InterruptedException { - if (e == null) { - throw new NullPointerException(); - } - if (Thread.interrupted()) { - throw new InterruptedException(); - } - xfer(e, NOWAIT, 0); - return true; - } - - @Override - public boolean offer(E e) { - if (e == null) { - throw new NullPointerException(); - } - xfer(e, NOWAIT, 0); - return true; - } - - @Override - public void transfer(E e) throws InterruptedException { - if (e == null) { - throw new NullPointerException(); - } - if (xfer(e, WAIT, 0) == null) { - Thread.interrupted(); - throw new InterruptedException(); - } - } - - @Override - public boolean tryTransfer(E e, long timeout, TimeUnit unit) - throws InterruptedException { - if (e == null) { - throw new NullPointerException(); - } - if (xfer(e, TIMEOUT, unit.toNanos(timeout)) != null) { - return true; - } - if (!Thread.interrupted()) { - return false; - } - throw new InterruptedException(); - } - - @Override - public boolean tryTransfer(E e) { - if (e == null) { - throw new NullPointerException(); - } - return fulfill(e) != null; - } - - @Override - public E take() throws InterruptedException { - Object e = xfer(null, WAIT, 0); - if (e != null) { - return cast(e); - } - Thread.interrupted(); - throw new InterruptedException(); - } - - @Override - public E poll(long timeout, TimeUnit unit) throws InterruptedException { - Object e = xfer(null, TIMEOUT, unit.toNanos(timeout)); - if (e != null || !Thread.interrupted()) { - return cast(e); - } - throw new InterruptedException(); - } - - @Override - public E poll() { - return cast(fulfill(null)); - } - - @Override - public int drainTo(Collection c) { - if (c == null) { - throw new NullPointerException(); - } - if (c == this) { - throw new IllegalArgumentException(); - } - int n = 0; - E e; - while ( (e = poll()) != null) { - c.add(e); - ++n; - } - return n; - } - - @Override - public int drainTo(Collection c, int maxElements) { - if (c == null) { - throw new NullPointerException(); - } - if (c == this) { - throw new IllegalArgumentException(); - } - int n = 0; - E e; - while (n < maxElements && (e = poll()) != null) { - c.add(e); - ++n; - } - return n; - } - - // Traversal-based methods - - /** - * Return head after performing any outstanding helping steps - */ - QNode traversalHead() { - for (;;) { - QNode t = this.tail.get(); - QNode h = this.head.get(); - if (h != null && t != null) { - QNode last = t.next; - QNode first = h.next; - if (t == this.tail.get()) { - if (last != null) { - this.tail.compareAndSet(t, last); - } else if (first != null) { - Object x = first.get(); - if (x == first) { - advanceHead(h, first); - } else { - return h; - } - } else { - return h; - } - } - } - } - } - - @Override - public Iterator iterator() { - return new Itr(); - } - - /** - * Iterators. Basic strategy is to traverse list, treating - * non-data (i.e., request) nodes as terminating list. - * Once a valid data node is found, the item is cached - * so that the next call to next() will return it even - * if subsequently removed. - */ - class Itr implements Iterator { - QNode nextNode; // Next node to return next - QNode currentNode; // last returned node, for remove() - QNode prevNode; // predecessor of last returned node - E nextItem; // Cache of next item, once commited to in next - - Itr() { - this.nextNode = traversalHead(); - advance(); - } - - E advance() { - this.prevNode = this.currentNode; - this.currentNode = this.nextNode; - E x = this.nextItem; - - QNode p = this.nextNode.next; - for (;;) { - if (p == null || !p.isData) { - this.nextNode = null; - this.nextItem = null; - return x; - } - Object item = p.get(); - if (item != p && item != null) { - this.nextNode = p; - this.nextItem = cast(item); - return x; - } - this.prevNode = p; - p = p.next; - } - } - - @Override - public boolean hasNext() { - return this.nextNode != null; - } - - @Override - public E next() { - if (this.nextNode == null) { - throw new NoSuchElementException(); - } - return advance(); - } - - @Override - public void remove() { - QNode p = this.currentNode; - QNode prev = this.prevNode; - if (prev == null || p == null) { - throw new IllegalStateException(); - } - Object x = p.get(); - if (x != null && x != p && p.compareAndSet(x, p)) { - clean(prev, p); - } - } - } - - @Override - public E peek() { - for (;;) { - QNode h = traversalHead(); - QNode p = h.next; - if (p == null) { - return null; - } - Object x = p.get(); - if (p != x) { - if (!p.isData) { - return null; - } - if (x != null) { - return cast(x); - } - } - } - } - - @Override - public boolean isEmpty() { - for (;;) { - QNode h = traversalHead(); - QNode p = h.next; - if (p == null) { - return true; - } - Object x = p.get(); - if (p != x) { - if (!p.isData) { - return true; - } - if (x != null) { - return false; - } - } - } - } - - @Override - public boolean hasWaitingConsumer() { - for (;;) { - QNode h = traversalHead(); - QNode p = h.next; - if (p == null) { - return false; - } - Object x = p.get(); - if (p != x) { - return !p.isData; - } - } - } - - /** - * Returns the number of elements in this queue. If this queue - * contains more than Integer.MAX_VALUE elements, returns - * Integer.MAX_VALUE. - * - *

Beware that, unlike in most collections, this method is - * NOT a constant-time operation. Because of the - * asynchronous nature of these queues, determining the current - * number of elements requires an O(n) traversal. - * - * @return the number of elements in this queue - */ - @Override - public int size() { - int count = 0; - QNode h = traversalHead(); - for (QNode p = h.next; p != null && p.isData; p = p.next) { - Object x = p.get(); - if (x != null && x != p) { - if (++count == Integer.MAX_VALUE) { - break; - } - } - } - return count; - } - - @Override - public int getWaitingConsumerCount() { - int count = 0; - QNode h = traversalHead(); - for (QNode p = h.next; p != null && !p.isData; p = p.next) { - if (p.get() == null) { - if (++count == Integer.MAX_VALUE) { - break; - } - } - } - return count; - } - - @Override - public int remainingCapacity() { - return Integer.MAX_VALUE; - } -} diff --git a/src/main/java/dorkbox/util/messagebus/common/TransferQueue.java b/src/main/java/dorkbox/util/messagebus/common/TransferQueue.java deleted file mode 100644 index 25d7353..0000000 --- a/src/main/java/dorkbox/util/messagebus/common/TransferQueue.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Written by Doug Lea with assistance from members of JCP JSR-166 - * Expert Group and released to the public domain, as explained at - * http://creativecommons.org/licenses/publicdomain - */ -package dorkbox.util.messagebus.common; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.TimeUnit; - -/** - * A {@link BlockingQueue} in which producers may wait for consumers - * to receive elements. A {@code TransferQueue} may be useful for - * example in message passing applications in which producers - * sometimes (using method {@code transfer}) await receipt of - * elements by consumers invoking {@code take} or {@code poll}, - * while at other times enqueue elements (via method {@code put}) - * without waiting for receipt. Non-blocking and time-out versions of - * {@code tryTransfer} are also available. A TransferQueue may also - * be queried via {@code hasWaitingConsumer} whether there are any - * threads waiting for items, which is a converse analogy to a - * {@code peek} operation. - * - *

Like any {@code BlockingQueue}, a {@code TransferQueue} may be - * capacity bounded. If so, an attempted {@code transfer} operation - * may initially block waiting for available space, and/or - * subsequently block waiting for reception by a consumer. Note that - * in a queue with zero capacity, such as {@link SynchronousQueue}, - * {@code put} and {@code transfer} are effectively synonymous. - * - *

This interface is a member of the - * - * Java Collections Framework. - * - * @since 1.7 - * @author Doug Lea - * @param the type of elements held in this collection - */ -public interface TransferQueue extends BlockingQueue { - /** - * Transfers the specified element if there exists a consumer - * already waiting to receive it, otherwise returning {@code false} - * without enqueuing the element. - * - * @param e the element to transfer - * @return {@code true} if the element was transferred, else - * {@code false} - * @throws ClassCastException if the class of the specified element - * prevents it from being added to this queue - * @throws NullPointerException if the specified element is null - * @throws IllegalArgumentException if some property of the specified - * element prevents it from being added to this queue - */ - boolean tryTransfer(E e); - - /** - * Inserts the specified element into this queue, waiting if - * necessary for space to become available and the element to be - * dequeued by a consumer invoking {@code take} or {@code poll}. - * - * @param e the element to transfer - * @throws InterruptedException if interrupted while waiting, - * in which case the element is not enqueued. - * @throws ClassCastException if the class of the specified element - * prevents it from being added to this queue - * @throws NullPointerException if the specified element is null - * @throws IllegalArgumentException if some property of the specified - * element prevents it from being added to this queue - */ - void transfer(E e) throws InterruptedException; - - /** - * Inserts the specified element into this queue, waiting up to - * the specified wait time if necessary for space to become - * available and the element to be dequeued by a consumer invoking - * {@code take} or {@code poll}. - * - * @param e the element to transfer - * @param timeout how long to wait before giving up, in units of - * {@code unit} - * @param unit a {@code TimeUnit} determining how to interpret the - * {@code timeout} parameter - * @return {@code true} if successful, or {@code false} if - * the specified waiting time elapses before completion, - * in which case the element is not enqueued. - * @throws InterruptedException if interrupted while waiting, - * in which case the element is not enqueued. - * @throws ClassCastException if the class of the specified element - * prevents it from being added to this queue - * @throws NullPointerException if the specified element is null - * @throws IllegalArgumentException if some property of the specified - * element prevents it from being added to this queue - */ - boolean tryTransfer(E e, long timeout, TimeUnit unit) - throws InterruptedException; - - /** - * Returns {@code true} if there is at least one consumer waiting - * to dequeue an element via {@code take} or {@code poll}. - * The return value represents a momentary state of affairs. - * - * @return {@code true} if there is at least one waiting consumer - */ - boolean hasWaitingConsumer(); - - /** - * Returns an estimate of the number of consumers waiting to - * dequeue elements via {@code take} or {@code poll}. The return - * value is an approximation of a momentary state of affairs, that - * may be inaccurate if consumers have completed or given up - * waiting. The value may be useful for monitoring and heuristics, - * but not for synchronization control. Implementations of this - * method are likely to be noticeably slower than those for - * {@link #hasWaitingConsumer}. - * - * @return the number of consumers waiting to dequeue elements - */ - int getWaitingConsumerCount(); -} \ No newline at end of file diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/HandlerFactory.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/HandlerFactory.java deleted file mode 100644 index aedeaf2..0000000 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/HandlerFactory.java +++ /dev/null @@ -1,5 +0,0 @@ -package dorkbox.util.messagebus.common.simpleq; - -public interface HandlerFactory { - public E newInstance(); -} diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/Node.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/Node.java deleted file mode 100644 index a6422f6..0000000 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/Node.java +++ /dev/null @@ -1,64 +0,0 @@ -package dorkbox.util.messagebus.common.simpleq; - - - - -// mpmc sparse.shift = 2, for this to be fast. - -abstract class PrePad { -// volatile long y0, y1, y2, y4, y5, y6 = 7L; - volatile long z0, z1, z2, z4, z5, z6 = 7L; -} - -abstract class ColdItems extends PrePad { -// private static AtomicInteger count = new AtomicInteger(); -// public final int ID = count.getAndIncrement(); - - public int type = 0; - -// public short messageType = MessageType.ONE; - public Object item1 = null; -// public Object item2 = null; -// public Object item3 = null; -// public Object[] item4 = null; -} - -abstract class Pad0 extends ColdItems { -// volatile long y0, y1, y2, y4, y5, y6 = 7L; -// volatile long z0, z1, z2, z4, z5, z6 = 7L; -} - -abstract class HotItem1 extends Pad0 { -} - -abstract class Pad1 extends HotItem1 { -// volatile long y0, y1, y2, y4, y5, y6 = 7L; - volatile long z0, z1, z2, z4, z5, z6 = 7L; -} - -abstract class HotItem2 extends Pad1 { - public Thread thread; -} - -abstract class Pad2 extends HotItem2 { -// volatile long y0, y1, y2, y4, y5, y6 = 7L; -// volatile long z0, z1, z2, z4, z5, z6 = 7L; -} - -abstract class HotItem3 extends Pad2 { -// public transient volatile Node next; -} - -public class Node extends HotItem3 { - // post-padding -// volatile long y0, y1, y2, y4, y5, y6 = 7L; - volatile long z0, z1, z2, z4, z5, z6 = 7L; - - public Node() { - } - -// @Override -// public String toString() { -// return "[" + this.ID + "]"; -// } -} diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/ObjectPoolHolder.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/ObjectPoolHolder.java deleted file mode 100644 index edeb32d..0000000 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/ObjectPoolHolder.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright 2014 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dorkbox.util.messagebus.common.simpleq; - -import java.util.concurrent.atomic.AtomicBoolean; - -public class ObjectPoolHolder { - - // enough padding for 64bytes with 4byte refs, to alleviate contention across threads CASing one vs the other. - @SuppressWarnings("unused") - Object p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe; - - private T value; - - AtomicBoolean state = new AtomicBoolean(true); - transient volatile Thread waiter; // to control park/unpark - - - public ObjectPoolHolder(T value) { - this.value = value; - } - - public T getValue() { - return this.value; - } -} diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/PaddedAtomicReference.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/PaddedAtomicReference.java deleted file mode 100644 index f0857ba..0000000 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/PaddedAtomicReference.java +++ /dev/null @@ -1,27 +0,0 @@ -package dorkbox.util.messagebus.common.simpleq; - -import java.util.concurrent.atomic.AtomicReference; - -/** - * An AtomicReference with heuristic padding to lessen cache effects of this heavily CAS'ed location. While the padding adds - * noticeable space, all slots are created only on demand, and there will be more than one of them only when it would improve throughput - * more than enough to outweigh using extra space. - */ -public class PaddedAtomicReference extends AtomicReference { - private static final long serialVersionUID = 1L; - - // Improve likelihood of isolation on <= 64 byte cache lines - public long z0, z1, z2, z3, z4, z5, z6, z7, z8, z9, za, zb, zc, zd, ze; - - public PaddedAtomicReference() { - } - - public PaddedAtomicReference(T value) { - super(value); - } - - // prevent JIT from optimizing away the padding - public final long sum() { - return this.z0 + this.z1 + this.z2 + this.z3 + this.z4 + this.z5 + this.z6 + this.z7 + this.z8 + this.z9 + this.za + this.zb + + this.zc + this.zd + this.ze; - } -} diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/PaddedObject.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/PaddedObject.java deleted file mode 100644 index fa0dd27..0000000 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/PaddedObject.java +++ /dev/null @@ -1,45 +0,0 @@ -package dorkbox.util.messagebus.common.simpleq; - -import static dorkbox.util.messagebus.common.simpleq.jctools.UnsafeAccess.UNSAFE; - -// Improve likelihood of isolation on <= 64 byte cache lines - -// java classes DO NOT mix their fields, so if we want specific padding, in a specific order, we must -// subclass them. ALl intel CPU L1/2/2 cache line size are all 64 bytes - -// see: http://psy-lob-saw.blogspot.de/2013/05/know-thy-java-object-memory-layout.html -// see: http://mechanitis.blogspot.de/2011/07/dissecting-disruptor-why-its-so-fast_22.html - -abstract class AtomicPaddedObject { - volatile long z0, z1, z2, z4, z5, z6 = 7L; - volatile O object = null; -} - -public class PaddedObject extends AtomicPaddedObject { - private final static long OFFSET; - static { - try { - OFFSET = UNSAFE.objectFieldOffset(AtomicPaddedObject.class.getDeclaredField("object")); - } catch (NoSuchFieldException e) { - throw new RuntimeException(e); - } - } - - // volatile long c0, c1, c2, c4, c5, c6, c7 = 7L; - volatile long z0, z1, z2, z4, z5, z6 = 7L; - - public PaddedObject() { - } - - public O get() { - return this.object; - } - - public final void set(Object newValue) { - UNSAFE.putOrderedObject(this, OFFSET, newValue); - } - - protected final boolean compareAndSet(Object expect, Object newValue) { - return UNSAFE.compareAndSwapObject(this, OFFSET, expect, newValue); - } -} diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/ProducerNode.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/ProducerNode.java deleted file mode 100644 index b3744bc..0000000 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/ProducerNode.java +++ /dev/null @@ -1,26 +0,0 @@ -package dorkbox.util.messagebus.common.simpleq; - -import java.util.concurrent.atomic.AtomicReference; - -import com.lmax.disruptor.MessageHolder; - -/** - * Nodes hold partially exchanged data. This class opportunistically subclasses AtomicReference to represent the hole. So get() returns - * hole, and compareAndSet CAS'es value into hole. This class cannot be parameterized as "V" because of the use of non-V CANCEL sentinels. - */ -final class ProducerNode extends AtomicReference { - private static final long serialVersionUID = 1L; - - - /** The element offered by the Thread creating this node. */ - public volatile MessageHolder item = new MessageHolder(); - - /** The Thread waiting to be signalled; null until waiting. */ - public volatile Thread waiter = null; - - // for the stack, for waiting producers - public ProducerNode next = null; - - public ProducerNode() { - } -} diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/bakup/ConcurrentLinkedQueueSimple.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/bakup/ConcurrentLinkedQueueSimple.java deleted file mode 100644 index 9cf9dcb..0000000 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/bakup/ConcurrentLinkedQueueSimple.java +++ /dev/null @@ -1,458 +0,0 @@ -/* - * Written by Doug Lea with assistance from members of JCP JSR-166 - * Expert Group and released to the public domain, as explained at - * http://creativecommons.org/licenses/publicdomain - */ - -package dorkbox.util.messagebus.common.simpleq.bakup; - -import java.util.AbstractQueue; -import java.util.Collection; -import java.util.ConcurrentModificationException; -import java.util.Iterator; -import java.util.NoSuchElementException; -import java.util.Queue; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; - - -/** - * An unbounded thread-safe {@linkplain Queue queue} based on linked nodes. - * This queue orders elements FIFO (first-in-first-out). - * The head of the queue is that element that has been on the - * queue the longest time. - * The tail of the queue is that element that has been on the - * queue the shortest time. New elements - * are inserted at the tail of the queue, and the queue retrieval - * operations obtain elements at the head of the queue. - * A ConcurrentLinkedQueue is an appropriate choice when - * many threads will share access to a common collection. - * This queue does not permit null elements. - * - *

This implementation employs an efficient "wait-free" - * algorithm based on one described in Simple, - * Fast, and Practical Non-Blocking and Blocking Concurrent Queue - * Algorithms by Maged M. Michael and Michael L. Scott. - * - *

Beware that, unlike in most collections, the size method - * is NOT a constant-time operation. Because of the - * asynchronous nature of these queues, determining the current number - * of elements requires a traversal of the elements. - * - *

This class and its iterator implement all of the - * optional methods of the {@link Collection} and {@link - * Iterator} interfaces. - * - *

Memory consistency effects: As with other concurrent - * collections, actions in a thread prior to placing an object into a - * {@code ConcurrentLinkedQueue} - * happen-before - * actions subsequent to the access or removal of that element from - * the {@code ConcurrentLinkedQueue} in another thread. - * - *

This class is a member of the - * - * Java Collections Framework. - * - * @since 1.5 - * @author Doug Lea - * @param the type of elements held in this collection - * - */ -public class ConcurrentLinkedQueueSimple extends AbstractQueue implements Queue { - /* - * This is a straight adaptation of Michael & Scott algorithm. - * For explanation, read the paper. The only (minor) algorithmic - * difference is that this version supports lazy deletion of - * internal nodes (method remove(Object)) -- remove CAS'es item - * fields to null. The normal queue operations unlink but then - * pass over nodes with null item fields. Similarly, iteration - * methods ignore those with nulls. - * - * Also note that like most non-blocking algorithms in this - * package, this implementation relies on the fact that in garbage - * collected systems, there is no possibility of ABA problems due - * to recycled nodes, so there is no need to use "counted - * pointers" or related techniques seen in versions used in - * non-GC'ed settings. - */ - private static class Node { - private volatile E item; - private volatile Node next; - - private static final AtomicReferenceFieldUpdater - nextUpdater = AtomicReferenceFieldUpdater.newUpdater(Node.class, Node.class, "next"); - - private static final AtomicReferenceFieldUpdater - itemUpdater = AtomicReferenceFieldUpdater.newUpdater(Node.class, Object.class, "item"); - - Node(E x) { this.item = x; } - - Node(E x, Node n) { this.item = x; this.next = n; } - - E getItem() { - return this.item; - } - - boolean casItem(E cmp, E val) { - return itemUpdater.compareAndSet(this, cmp, val); - } - - void setItem(E val) { - itemUpdater.set(this, val); - } - - Node getNext() { - return this.next; - } - - boolean casNext(Node cmp, Node val) { - return nextUpdater.compareAndSet(this, cmp, val); - } - - void setNext(Node val) { - nextUpdater.set(this, val); - } - } - - private static final AtomicReferenceFieldUpdater - tailUpdater = AtomicReferenceFieldUpdater.newUpdater(ConcurrentLinkedQueueSimple.class, Node.class, "tail"); - - private static final AtomicReferenceFieldUpdater - headUpdater = AtomicReferenceFieldUpdater.newUpdater(ConcurrentLinkedQueueSimple.class, Node.class, "head"); - - private boolean casTail(Node cmp, Node val) { - return tailUpdater.compareAndSet(this, cmp, val); - } - - private boolean casHead(Node cmp, Node val) { - return headUpdater.compareAndSet(this, cmp, val); - } - - - /** - * Pointer to header node, initialized to a dummy node. The first - * actual node is at head.getNext(). - */ - private transient volatile Node head = new Node(null, null); - - /** Pointer to last node on list **/ - private transient volatile Node tail = this.head; - - - /** - * Creates a ConcurrentLinkedQueue that is initially empty. - */ - public ConcurrentLinkedQueueSimple() {} - - /** - * Creates a ConcurrentLinkedQueue - * initially containing the elements of the given collection, - * added in traversal order of the collection's iterator. - * @param c the collection of elements to initially contain - * @throws NullPointerException if the specified collection or any - * of its elements are null - */ - public ConcurrentLinkedQueueSimple(Collection c) { - for (Iterator it = c.iterator(); it.hasNext();) { - add(it.next()); - } - } - - // Have to override just to update the javadoc - - /** - * Inserts the specified element at the tail of this queue. - * - * @return true (as specified by {@link Collection#add}) - * @throws NullPointerException if the specified element is null - */ - @Override - public boolean add(E e) { - return offer(e); - } - - /** - * Inserts the specified element at the tail of this queue. - * - * @return true (as specified by {@link Queue#offer}) - * @throws NullPointerException if the specified element is null - */ - @Override - public boolean offer(E e) { - if (e == null) { - throw new NullPointerException(); - } - - Node n = new Node(e, null); - for (;;) { - Node t = this.tail; - Node s = t.getNext(); - if (t == this.tail) { - if (s == null) { - if (t.casNext(s, n)) { - casTail(t, n); - return true; - } - } else { - casTail(t, s); - } - } - } - } - - @Override - public E poll() { - for (;;) { - Node h = this.head; - Node t = this.tail; - Node first = h.getNext(); - if (h == this.head) { - if (h == t) { - if (first == null) { - return null; - } else { - casTail(t, first); - } - } else if (casHead(h, first)) { - E item = first.getItem(); - if (item != null) { - first.setItem(null); - return item; - } - // else skip over deleted item, continue loop, - } - } - } - } - - @Override - public E peek() { // same as poll except don't remove item - for (;;) { - Node h = this.head; - Node t = this.tail; - Node first = h.getNext(); - if (h == this.head) { - if (h == t) { - if (first == null) { - return null; - } else { - casTail(t, first); - } - } else { - E item = first.getItem(); - if (item != null) { - return item; - } else { - casHead(h, first); - } - } - } - } - } - - /** - * Returns the first actual (non-header) node on list. This is yet - * another variant of poll/peek; here returning out the first - * node, not element (so we cannot collapse with peek() without - * introducing race.) - */ - Node first() { - for (;;) { - Node h = this.head; - Node t = this.tail; - Node first = h.getNext(); - if (h == this.head) { - if (h == t) { - if (first == null) { - return null; - } else { - casTail(t, first); - } - } else { - if (first.getItem() != null) { - return first; - } else { - casHead(h, first); - } - } - } - } - } - - - /** - * Returns true if this queue contains no elements. - * - * @return true if this queue contains no elements - */ - @Override - public boolean isEmpty() { - return first() == null; - } - - /** - * Returns the number of elements in this queue. If this queue - * contains more than Integer.MAX_VALUE elements, returns - * Integer.MAX_VALUE. - * - *

Beware that, unlike in most collections, this method is - * NOT a constant-time operation. Because of the - * asynchronous nature of these queues, determining the current - * number of elements requires an O(n) traversal. - * - * @return the number of elements in this queue - */ - @Override - public int size() { - int count = 0; - for (Node p = first(); p != null; p = p.getNext()) { - if (p.getItem() != null) { - // Collections.size() spec says to max out - if (++count == Integer.MAX_VALUE) { - break; - } - } - } - return count; - } - - /** - * Returns true if this queue contains the specified element. - * More formally, returns true if and only if this queue contains - * at least one element e such that o.equals(e). - * - * @param o object to be checked for containment in this queue - * @return true if this queue contains the specified element - */ - @Override - public boolean contains(Object o) { - if (o == null) { - return false; - } - for (Node p = first(); p != null; p = p.getNext()) { - E item = p.getItem(); - if (item != null && - o.equals(item)) { - return true; - } - } - return false; - } - - /** - * Removes a single instance of the specified element from this queue, - * if it is present. More formally, removes an element e such - * that o.equals(e), if this queue contains one or more such - * elements. - * Returns true if this queue contained the specified element - * (or equivalently, if this queue changed as a result of the call). - * - * @param o element to be removed from this queue, if present - * @return true if this queue changed as a result of the call - */ - @Override - public boolean remove(Object o) { - if (o == null) { - return false; - } - for (Node p = first(); p != null; p = p.getNext()) { - E item = p.getItem(); - if (item != null && - o.equals(item) && - p.casItem(item, null)) { - return true; - } - } - return false; - } - - /** - * Returns an iterator over the elements in this queue in proper sequence. - * The returned iterator is a "weakly consistent" iterator that - * will never throw {@link ConcurrentModificationException}, - * and guarantees to traverse elements as they existed upon - * construction of the iterator, and may (but is not guaranteed to) - * reflect any modifications subsequent to construction. - * - * @return an iterator over the elements in this queue in proper sequence - */ - @Override - public Iterator iterator() { - return new Itr(); - } - - private class Itr implements Iterator { - /** - * Next node to return item for. - */ - private Node nextNode; - - /** - * nextItem holds on to item fields because once we claim - * that an element exists in hasNext(), we must return it in - * the following next() call even if it was in the process of - * being removed when hasNext() was called. - */ - private E nextItem; - - /** - * Node of the last returned item, to support remove. - */ - private Node lastRet; - - Itr() { - advance(); - } - - /** - * Moves to next valid node and returns item to return for - * next(), or null if no such. - */ - private E advance() { - this.lastRet = this.nextNode; - E x = this.nextItem; - - Node p = this.nextNode == null? first() : this.nextNode.getNext(); - for (;;) { - if (p == null) { - this.nextNode = null; - this.nextItem = null; - return x; - } - E item = p.getItem(); - if (item != null) { - this.nextNode = p; - this.nextItem = item; - return x; - } else { - p = p.getNext(); - } - } - } - - @Override - public boolean hasNext() { - return this.nextNode != null; - } - - @Override - public E next() { - if (this.nextNode == null) { - throw new NoSuchElementException(); - } - return advance(); - } - - @Override - public void remove() { - Node l = this.lastRet; - if (l == null) { - throw new IllegalStateException(); - } - // rely on a future traversal to relink. - l.setItem(null); - this.lastRet = null; - } - } -} \ No newline at end of file diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/bakup/EliminationStack.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/bakup/EliminationStack.java deleted file mode 100644 index 57d27fc..0000000 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/bakup/EliminationStack.java +++ /dev/null @@ -1,355 +0,0 @@ -/* - * Copyright 2013 Ben Manes. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may - * obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - */ -package dorkbox.util.messagebus.common.simpleq.bakup; - -import java.util.concurrent.atomic.AtomicReference; - -/** - * An unbounded thread-safe stack based on linked nodes. This stack orders elements LIFO (last-in-first-out). The top of the stack - * is that element that has been on the stack the shortest time. New elements are inserted at and retrieved from the top of the stack. A - * {@code EliminationStack} is an appropriate choice when many threads will exchange elements through shared access to a common collection. - * Like most other concurrent collection implementations, this class does not permit the use of {@code null} elements. - *

- * This implementation employs elimination to transfer elements between threads that are pushing and popping concurrently. This technique - * avoids contention on the stack by attempting to cancel operations if an immediate update to the stack is not successful. This approach is - * described in A Scalable Lock-free Stack Algorithm. - *

- * Iterators are weakly consistent, returning elements reflecting the state of the stack at some point at or since the creation of - * the iterator. They do not throw {@link java.util.ConcurrentModificationException}, and may proceed concurrently with other - * operations. Elements contained in the stack since the creation of the iterator will be returned exactly once. - *

- * Beware that, unlike in most collections, the {@code size} method is NOT a constant-time operation. Because of the asynchronous - * nature of these stacks, determining the current number of elements requires a traversal of the elements, and so may report inaccurate - * results if this collection is modified during traversal. - * - * @author ben.manes@gmail.com (Ben Manes) - * @see Caffeine - * @param the type of elements held in this collection - */ -public final class EliminationStack { - - /* - * A Treiber's stack is represented as a singly-linked list with an atomic top reference and uses compare-and-swap to modify the value - * atomically. - * - * The stack is augmented with an elimination array to minimize the top reference becoming a sequential bottleneck. Elimination allows - * pairs of operations with reverse semantics, like pushes and pops on a stack, to complete without any central coordination, and - * therefore substantially aids scalability [1, 2, 3]. If a thread fails to update the stack's top reference then it backs off to a - * collision arena where a location is chosen at random and it attempts to coordinate with another operation that concurrently chose the - * same location. If a transfer is not successful then the thread repeats the process until the element is added to the stack or a - * cancellation occurs. - * - * This implementation borrows optimizations from {@link java.util.concurrent.Exchanger} for choosing an arena location and awaiting a - * match [4]. - * - * [1] A Scalable Lock-free Stack Algorithm http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.156.8728 [2] Concurrent Data - * Structures http://www.cs.tau.ac.il/~shanir/concurrent-data-structures.pdf [3] Using elimination to implement scalable and lock-free - * fifo queues http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.108.6422 [4] A Scalable Elimination-based Exchange Channel - * http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.59.7396 - */ - - /** The number of CPUs */ - static final int NCPU = Runtime.getRuntime().availableProcessors(); - - /** The number of slots in the elimination array. */ - static final int ARENA_LENGTH = ceilingNextPowerOfTwo((NCPU + 1) / 2); - - /** The mask value for indexing into the arena. */ - static int ARENA_MASK = ARENA_LENGTH - 1; - - /** The number of times to step ahead, probe, and try to match. */ - static final int LOOKAHEAD = Math.min(4, NCPU); - - /** - * The number of times to spin (doing nothing except polling a memory location) before giving up while waiting to eliminate an - * operation. Should be zero on uniprocessors. On multiprocessors, this value should be large enough so that two threads exchanging - * items as fast as possible block only when one of them is stalled (due to GC or preemption), but not much longer, to avoid wasting CPU - * resources. Seen differently, this value is a little over half the number of cycles of an average context switch time on most systems. - * The value here is approximately the average of those across a range of tested systems. - */ - static final int SPINS = NCPU == 1 ? 0 : 2000; - - /** The number of times to spin per lookahead step */ - static final int SPINS_PER_STEP = SPINS / LOOKAHEAD; - - /** A marker indicating that the arena slot is free. */ - static final Object FREE = null; - - /** A marker indicating that a thread is waiting in that slot to be transfered an element. */ - static final Object CONSUMER_WAITER = new Object(); - - /** A marker indicating that a thread is waiting in that slot to be transfered an element. */ - static final Object WAITER = new Object(); - - static int ceilingNextPowerOfTwo(int x) { - // From Hacker's Delight, Chapter 3, Harry S. Warren Jr. - return 1 << Integer.SIZE - Integer.numberOfLeadingZeros(x - 1); - } - - /** The top of the stack. */ - final AtomicReference> top; - - /** The arena where slots can be used to perform an exchange. */ - final AtomicReference[] arena; - - /** Creates a {@code EliminationStack} that is initially empty. */ - @SuppressWarnings("unchecked") - public EliminationStack() { - this.top = new AtomicReference>(); - this.arena = new AtomicReference[ARENA_LENGTH]; - for (int i = 0; i < ARENA_LENGTH; i++) { - this.arena[i] = new AtomicReference(); - } - } - - /** - * Returns true if this stack contains no elements. - * - * @return true if this stack contains no elements - */ - public boolean hasPendingMessages() { - for (;;) { - Node node = this.top.get(); - if (node == null) { - return false; - } - E e = node.get(); - if (e == null) { - this.top.compareAndSet(node, node.next); - } else { - return true; - } - } - } - - /** - * Removes and returns the top element or returns null if this stack is empty. - * - * @return the top of this stack, or null if this stack is empty - */ - public E pop() { - for (;;) { - Node current = this.top.get(); - if (current == null) { - return null; - } - - // Attempt to pop from the stack, backing off to the elimination array if contended - if (this.top.get() == current && this.top.compareAndSet(current, current.next)) { - return current.get(); - } - - E e = tryReceive(); - if (e != null) { - return e; - } - } - } - - /** - * Pushes an element onto the stack (in other words, adds an element at the top of this stack). - * - * @param e the element to push - */ - public void push(E e) { - Node node = new Node(e); - for (;;) { - node.next = this.top.get(); - - // Attempt to push to the stack, backing off to the elimination array if contended - if (this.top.get() == node.next && this.top.compareAndSet(node.next, node)) { - return; - } - if (tryTransfer(e)) { - return; - } - } - } - - - public boolean remove(Object o) { - for (Node node = this.top.get(); node != null; node = node.next) { - E value = node.get(); - if (o.equals(value) && node.compareAndSet(value, null)) { - return true; - } - } - return false; - } - - - /** - * Attempts to transfer the element to a waiting consumer. - * - * @param e the element to try to exchange - * @return if the element was successfully transfered - */ - boolean tryTransfer(E e) { - int start = startIndex(); - return scanAndTransferToWaiter(e, start) || awaitExchange(e, start); - } - - /** - * Scans the arena searching for a waiting consumer to exchange with. - * - * @param e the element to try to exchange - * @return if the element was successfully transfered - */ - boolean scanAndTransferToWaiter(E e, int start) { - for (int i = 0; i < ARENA_LENGTH; i++) { - int index = start + i & ARENA_MASK; - AtomicReference slot = this.arena[index]; - // if some thread is waiting to receive an element then attempt to provide it - if (slot.get() == WAITER && slot.compareAndSet(WAITER, e)) { - return true; - } - } - return false; - } - - /** - * Waits for (by spinning) to have the element transfered to another thread. The element is filled into an empty slot in the arena and - * spun on until it is transfered or a per-slot spin limit is reached. This search and wait strategy is repeated by selecting another - * slot until a total spin limit is reached. - * - * @param e the element to transfer - * @param start the arena location to start at - * @return if an exchange was completed successfully - */ - boolean awaitExchange(E e, int start) { - for (int step = 0, totalSpins = 0; step < ARENA_LENGTH && totalSpins < SPINS; step++) { - int index = start + step & ARENA_MASK; - AtomicReference slot = this.arena[index]; - - Object found = slot.get(); - if (found == WAITER && slot.compareAndSet(WAITER, e)) { - return true; - } else if (found == FREE && slot.compareAndSet(FREE, e)) { - int slotSpins = 0; - for (;;) { - found = slot.get(); - if (found != e) { - return true; - } else if (slotSpins >= SPINS_PER_STEP && slot.compareAndSet(e, FREE)) { - // failed to transfer the element; try a new slot - totalSpins += slotSpins; - break; - } - slotSpins++; - } - } - } - // failed to transfer the element; give up - return false; - } - - /** - * Attempts to receive an element from a waiting provider. - * - * @return an element if successfully transfered or null if unsuccessful - */ - E tryReceive() { - int start = startIndex(); - E e = scanAndMatch(start); - return e == null ? awaitMatch(start) : e; - } - - /** - * Scans the arena searching for a waiting producer to transfer from. - * - * @param start the arena location to start at - * @return an element if successfully transfered or null if unsuccessful - */ - E scanAndMatch(int start) { - for (int i = 0; i < ARENA_LENGTH; i++) { - int index = start + i & ARENA_MASK; - AtomicReference slot = this.arena[index]; - - // accept a transfer if an element is available - Object found = slot.get(); - if (found != FREE && found != WAITER && slot.compareAndSet(found, FREE)) { - @SuppressWarnings("unchecked") - E e = (E) found; - return e; - } - } - return null; - } - - /** - * Waits for (by spinning) to have an element transfered from another thread. A marker is filled into an empty slot in the arena and - * spun on until it is replaced with an element or a per-slot spin limit is reached. This search and wait strategy is repeated by - * selecting another slot until a total spin limit is reached. - * - * @param start the arena location to start at - * @return an element if successfully transfered or null if unsuccessful - */ - E awaitMatch(int start) { - for (int step = 0, totalSpins = 0; step < ARENA_LENGTH && totalSpins < SPINS; step++) { - int index = start + step & ARENA_MASK; - AtomicReference slot = this.arena[index]; - Object found = slot.get(); - - if (found == FREE) { - if (slot.compareAndSet(FREE, WAITER)) { - int slotSpins = 0; - for (;;) { - found = slot.get(); - if (found != WAITER && slot.compareAndSet(found, FREE)) { - @SuppressWarnings("unchecked") - E e = (E) found; - return e; - } else if (slotSpins >= SPINS_PER_STEP && found == WAITER && slot.compareAndSet(WAITER, FREE)) { - // failed to receive an element; try a new slot - totalSpins += slotSpins; - break; - } - slotSpins++; - } - } - } else if (found != WAITER && slot.compareAndSet(found, FREE)) { - @SuppressWarnings("unchecked") - E e = (E) found; - return e; - } - } - - // failed to receive an element; give up - return null; - } - - /** - * Returns the start index to begin searching the arena with. Uses a one-step FNV-1a hash code - * (http://www.isthe.com/chongo/tech/comp/fnv/) based on the current thread's Thread.getId(). These hash codes have more uniform - * distribution properties with respect to small moduli (here 1-31) than do other simple hashing functions. This technique is a - * simplified version borrowed from {@link java.util.concurrent.Exchanger}'s hashIndex function. - */ - static int startIndex() { - long id = Thread.currentThread().getId(); - return ((int) (id ^ id >>> 32) ^ 0x811c9dc5) * 0x01000193; - } - - /** - * An item on the stack. The node is mutable prior to being inserted to avoid object churn and is immutable by the time it has been - * published to other threads. - */ - static final class Node extends AtomicReference { - private static final long serialVersionUID = 1L; - - Node next; - - Node(E value) { - super(value); - } - } -} diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/bakup/ExchangerStackORG.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/bakup/ExchangerStackORG.java deleted file mode 100644 index 3f55774..0000000 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/bakup/ExchangerStackORG.java +++ /dev/null @@ -1,492 +0,0 @@ -/* - * Copyright 2013 Ben Manes. All Rights Reserved. - * - * Note, that we cannot use a Treiber stack, since that suffers from ABA problems if we reuse nodes. - * - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may - * obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - */ -package dorkbox.util.messagebus.common.simpleq.bakup; - -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.LockSupport; - -import com.lmax.disruptor.MessageHolder; - -import dorkbox.util.messagebus.common.simpleq.PaddedAtomicReference; - -//http://www.cs.bgu.ac.il/~hendlerd/papers/EfficientCAS.pdf -//This is an implementation of the **BEST** CAS FIFO queue for XEON (INTEL) architecture. -//This is the WRONG approach for running on SPARC. -//More info at: http://java.dzone.com/articles/wanna-get-faster-wait-bit - -// copywrite dorkbox, llc - -/** - * An unbounded thread-safe stack based on linked nodes. This stack orders elements LIFO (last-in-first-out). The top of the stack - * is that element that has been on the stack the shortest time. New elements are inserted at and retrieved from the top of the stack. A - * {@code EliminationStack} is an appropriate choice when many threads will exchange elements through shared access to a common collection. - * Like most other concurrent collection implementations, this class does not permit the use of {@code null} elements. - *

- * This implementation employs elimination to transfer elements between threads that are pushing and popping concurrently. This technique - * avoids contention on the stack by attempting to cancel operations if an immediate update to the stack is not successful. This approach is - * described in A Scalable Lock-free Stack Algorithm. - *

- * Iterators are weakly consistent, returning elements reflecting the state of the stack at some point at or since the creation of - * the iterator. They do not throw {@link java.util.ConcurrentModificationException}, and may proceed concurrently with other - * operations. Elements contained in the stack since the creation of the iterator will be returned exactly once. - *

- * Beware that, unlike in most collections, the {@code size} method is NOT a constant-time operation. Because of the asynchronous - * nature of these stacks, determining the current number of elements requires a traversal of the elements, and so may report inaccurate - * results if this collection is modified during traversal. - * - * @author ben.manes@gmail.com (Ben Manes) - * @see Caffeine - * @param the type of elements held in this collection - */ -public final class ExchangerStackORG { - - static { - // Prevent rare disastrous classloading in first call to LockSupport.park. - // See: https://bugs.openjdk.java.net/browse/JDK-8074773 - Class ensureLoaded = LockSupport.class; - LockSupport.unpark(Thread.currentThread()); - } - - /* - * A Treiber's stack is represented as a singly-linked list with an atomic top reference and uses compare-and-swap to modify the value - * atomically. - * - * The stack is augmented with an elimination array to minimize the top reference becoming a sequential bottleneck. Elimination allows - * pairs of operations with reverse semantics, like pushes and pops on a stack, to complete without any central coordination, and - * therefore substantially aids scalability [1, 2, 3]. If a thread fails to update the stack's top reference then it backs off to a - * collision arena where a location is chosen at random and it attempts to coordinate with another operation that concurrently chose the - * same location. If a transfer is not successful then the thread repeats the process until the element is added to the stack or a - * cancellation occurs. - * - * This implementation borrows optimizations from {@link java.util.concurrent.Exchanger} for choosing an arena location and awaiting a - * match [4]. - * - * [1] A Scalable Lock-free Stack Algorithm http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.156.8728 [2] Concurrent Data - * Structures http://www.cs.tau.ac.il/~shanir/concurrent-data-structures.pdf [3] Using elimination to implement scalable and lock-free - * fifo queues http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.108.6422 [4] A Scalable Elimination-based Exchange Channel - * http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.59.7396 - */ - - /** The number of CPUs */ - private static final int NCPU = Runtime.getRuntime().availableProcessors(); - - /** The number of slots in the elimination array. */ - private static final int ARENA_LENGTH = ceilingNextPowerOfTwo((NCPU + 1) / 2); - - /** The mask value for indexing into the arena. */ - private static int ARENA_MASK = ARENA_LENGTH - 1; - - /** The number of times to step ahead, probe, and try to match. */ - private static final int LOOKAHEAD = Math.min(4, NCPU); - - /** - * The number of times to spin (doing nothing except polling a memory location) before giving up while waiting to eliminate an - * operation. Should be zero on uniprocessors. On multiprocessors, this value should be large enough so that two threads exchanging - * items as fast as possible block only when one of them is stalled (due to GC or preemption), but not much longer, to avoid wasting CPU - * resources. Seen differently, this value is a little over half the number of cycles of an average context switch time on most systems. - * The value here is approximately the average of those across a range of tested systems. - */ - private static final int SPINS = NCPU == 1 ? 0 : 2000; - - /** The number of times to spin per lookahead step */ - private static final int SPINS_PER_STEP = SPINS / LOOKAHEAD; - - /** A marker indicating that the arena slot is free. */ - private static final Object FREE = null; - - /** A marker indicating that a thread is waiting in that slot to be transfered an element. */ - private static final Object WAITER = new Object(); - - - private static final Object READY = null; - private static final Object LOCK_PARK_FIRST = new Object(); - private static final Object LOCK_UNPARK_FIRST = new Object(); - private static final Object UNLOCK = new Object(); - - private static int ceilingNextPowerOfTwo(int x) { - // From Hacker's Delight, Chapter 3, Harry S. Warren Jr. - return 1 << Integer.SIZE - Integer.numberOfLeadingZeros(x - 1); - } - - - /** The top of the stack. */ - private final AtomicReference> top; - - /** The arena where slots can be used to perform an exchange. */ - private final AtomicReference[] arena; - - private final int numberConsumerThreads; - - private final ValueCopier copier; - - /** Creates a {@code EliminationStack} that is initially empty. */ - @SuppressWarnings("unchecked") - public ExchangerStackORG(int numberConsumerThreads, ValueCopier copier) { - this.numberConsumerThreads = numberConsumerThreads; - this.copier = copier; - - this.top = new PaddedAtomicReference>(); - - this.arena = new PaddedAtomicReference[ARENA_LENGTH]; - for (int i = 0; i < ARENA_LENGTH; i++) { - this.arena[i] = new PaddedAtomicReference(); - } - } - - public void put(NodeORG producer) throws InterruptedException { - Thread producerThread = Thread.currentThread(); - producer.next = null; - producer.waiter = producerThread; - - NodeORG topNode; - for (;;) { - topNode = this.top.get(); - - // it's a [EMPTY or PRODUCER], so we just add ourself as another producer waiting to get popped by a consumer - if (topNode == null || !topNode.isConsumer) { - producer.next = topNode; - - // Attempt to push to the stack, backing off to the elimination array if contended - if (this.top.compareAndSet(topNode, producer)) { - // now we wait - if (!park(producer, producerThread)) { - // have to make sure to pass up exceptions - throw new InterruptedException(); - } - return; - } - - // Contention, so back off to the elimination array -// busySpin(); - if (tryTransfer(producerThread, producer)) { - // means we got a consumer and our data has been transferred to it. - return; - } - } - - else { - // if consumer, pop it, xfer our data to it, wake it, done - if (this.top.compareAndSet(topNode, topNode.next)) { - // xfer data - this.copier.copyValues(producer.item, topNode.item); - - unpark(topNode); - return; - } else { - // contention, so back off -// busySpinPerStep(); - } - } - } - } - - - public void take(NodeORG consumer) throws InterruptedException { - // consumers ALWAYS use the same thread for the same node, so setting the waiter again is not necessary - Thread consumerThread = Thread.currentThread(); - - NodeORG topNode; - for (;;) { - topNode = this.top.get(); - - // it's a [EMPTY or CONSUMER], so we just add ourself as another consumer waiting to get popped by a producer - if (topNode == null || topNode.isConsumer) { - consumer.next = topNode; - - // Attempt to push to the stack, backing off to the elimination array if contended - if (this.top.compareAndSet(topNode, consumer)) { - // now we wait - if (!park(consumer, consumerThread)) { - // have to make sure to pass up exceptions - throw new InterruptedException(); - } - return; - } - - // Contention, so back off to the elimination array -// busySpin(); - -// node = tryReceive(consumerThread); -// if (node != null) { -// // we got a PRODUCER. Have to transfer producer data data to myself -// this.copier.copyValues(node.item, consumer.item); -// return; -// } - } - else { - // if producer, pop it, xfer it's data to us, wake it, done - if (this.top.compareAndSet(topNode, topNode.next)) { - // forget old head (it's still referenced by top) - topNode.next = null; - - // xfer data - this.copier.copyValues(topNode.item, consumer.item); - - unpark(topNode); - return; - } else { - // contention, so back off -// busySpinPerStep(); - } - } - } - } - - - /** - * @return false if we were interrupted, true if we were unparked by another thread - */ - private boolean park(NodeORG myNode, Thread myThread) { - for (;;) { - if (myNode.state.compareAndSet(READY, LOCK_PARK_FIRST)) { - do { - // park can return for NO REASON. Subsequent loops will hit this if it has not been ACTUALLY unlocked. - LockSupport.park(); - if (myThread.isInterrupted()) { - myNode.state.set(READY); - return false; - } - } while (myNode.state.get() != UNLOCK); - myNode.state.set(READY); - return true; - } else if (myNode.state.compareAndSet(LOCK_UNPARK_FIRST, READY)) { - // no parking - return true; - } - } - } - - /** - * Unparks the other node (if it was waiting) - */ - private void unpark(NodeORG otherNode) { - // busy spin if we are NOT in the READY or LOCK state - for (;;) { - if (otherNode.state.compareAndSet(READY, LOCK_UNPARK_FIRST)) { - // no parking - return; - } else if (otherNode.state.compareAndSet(LOCK_PARK_FIRST, UNLOCK)) { - LockSupport.unpark(otherNode.waiter); - return; - } - } - } - - private void busySpin() { - // busy spin for the amount of time (roughly) of a CPU context switch - int spins = SPINS; - for (;;) { - if (spins > 0) { - --spins; - } else { - break; - } - } - } - - private void busySpinPerStep() { - // busy spin for the amount of time (roughly) of a CPU context switch - int spins = SPINS_PER_STEP; - for (;;) { - if (spins > 0) { - --spins; - } else { - break; - } - } - } - - /** - * @return true if all of our consumers are currently waiting for data from producers - */ - public boolean hasPendingMessages() { - // count the number of consumers waiting, it should be the same as the number of threads configured - NodeORG node; - - node = this.top.get(); - if (node == null || !node.isConsumer) { - return false; - } - - int size = 0; - for (node = this.top.get(); node != null; node = node.next) { - if (node.isConsumer) { - size++; - } - } - - // return true if we have ALL consumers waiting for data - return size != this.numberConsumerThreads; - } - - /** - * Attempts to transfer the element to a waiting consumer. - * - * @param e the element to try to exchange - * @return if the element was successfully transfered - */ - private boolean tryTransfer(Thread thread, NodeORG e) { - int start = startIndex(thread); - return scanAndTransferToWaiter(e, start) || awaitExchange(e, start); - } - - /** - * Scans the arena searching for a waiting consumer to exchange with. - * - * @param e the element to try to exchange - * @return if the element was successfully transfered - */ - private boolean scanAndTransferToWaiter(NodeORG e, int start) { - for (int i = 0; i < ARENA_LENGTH; i++) { - int index = start + i & ARENA_MASK; - AtomicReference slot = this.arena[index]; - - // if some thread is waiting to receive an element then attempt to provide it - if (slot.get() == WAITER && slot.compareAndSet(WAITER, e)) { - return true; - } - } - return false; - } - - /** - * Waits for (by spinning) to have the element transfered to another thread. The element is filled into an empty slot in the arena and - * spun on until it is transfered or a per-slot spin limit is reached. This search and wait strategy is repeated by selecting another - * slot until a total spin limit is reached. - * - * @param e the element to transfer - * @param start the arena location to start at - * @return if an exchange was completed successfully - */ - private boolean awaitExchange(NodeORG e, int start) { - for (int step = 0, totalSpins = 0; step < ARENA_LENGTH && totalSpins < SPINS; step++) { - int index = start + step & ARENA_MASK; - AtomicReference slot = this.arena[index]; - - Object found = slot.get(); - if (found == WAITER && slot.compareAndSet(WAITER, e)) { - return true; - } else if (found == FREE && slot.compareAndSet(FREE, e)) { - int slotSpins = 0; - for (;;) { - found = slot.get(); - if (found != e) { - return true; - } else if (slotSpins >= SPINS_PER_STEP && slot.compareAndSet(e, FREE)) { - // failed to transfer the element; try a new slot - totalSpins += slotSpins; - break; - } - slotSpins++; - } - } - } - // failed to transfer the element; give up - return false; - } - - /** - * Attempts to receive an element from a waiting producer. - * - * @return an element if successfully transfered or null if unsuccessful - */ - private NodeORG tryReceive(Thread thread) { - int start = startIndex(thread); - NodeORG e = scanAndMatch(start); - if (e == null) { - return awaitMatch(start); - } else { - return e; - } - } - - /** - * Scans the arena searching for a waiting producer to transfer from. - * - * @param start the arena location to start at - * @return an element if successfully transfered or null if unsuccessful - */ - private NodeORG scanAndMatch(int start) { - for (int i = 0; i < ARENA_LENGTH; i++) { - int index = start + i & ARENA_MASK; - AtomicReference slot = this.arena[index]; - - // accept a transfer if an element is available - Object found = slot.get(); - if (found != FREE && found != WAITER && slot.compareAndSet(found, FREE)) { - @SuppressWarnings("unchecked") - NodeORG cast = (NodeORG) found; - return cast; - } - } - return null; - } - - /** - * Waits for (by spinning) to have an element transfered from another thread. A marker is filled into an empty slot in the arena and - * spun on until it is replaced with an element or a per-slot spin limit is reached. This search and wait strategy is repeated by - * selecting another slot until a total spin limit is reached. - * - * @param start the arena location to start at - * @param node - * @return an element if successfully transfered or null if unsuccessful - */ - private NodeORG awaitMatch(int start) { - for (int step = 0, totalSpins = 0; step < ARENA_LENGTH && totalSpins < SPINS; step++) { - int index = start + step & ARENA_MASK; - - AtomicReference slot = this.arena[index]; - Object found = slot.get(); - - if (found == FREE) { - if (slot.compareAndSet(FREE, WAITER)) { - int slotSpins = 0; - for (;;) { - found = slot.get(); - if (found != WAITER && slot.compareAndSet(found, FREE)) { - @SuppressWarnings("unchecked") - NodeORG cast = (NodeORG) found; - return cast; - } else if (slotSpins >= SPINS_PER_STEP && found == WAITER && slot.compareAndSet(WAITER, FREE)) { - // failed to receive an element; try a new slot - totalSpins += slotSpins; - break; - } - slotSpins++; - } - } - } else if (found != WAITER && slot.compareAndSet(found, FREE)) { - @SuppressWarnings("unchecked") - NodeORG cast = (NodeORG) found; - return cast; - } - } - - // failed to receive an element; give up - return null; - } - - /** - * Returns the start index to begin searching the arena with. Uses a one-step FNV-1a hash code - * (http://www.isthe.com/chongo/tech/comp/fnv/) based on the current thread's Thread.getId(). These hash codes have more uniform - * distribution properties with respect to small moduli (here 1-31) than do other simple hashing functions. This technique is a - * simplified version borrowed from {@link java.util.concurrent.Exchanger}'s hashIndex function. - */ - private static int startIndex(Thread thread) { - long id = thread.getId(); - return ((int) (id ^ id >>> 32) ^ 0x811c9dc5) * 0x01000193; - } -} diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/bakup/NodeORG.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/bakup/NodeORG.java deleted file mode 100644 index 244d67f..0000000 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/bakup/NodeORG.java +++ /dev/null @@ -1,42 +0,0 @@ -package dorkbox.util.messagebus.common.simpleq.bakup; - -import java.util.concurrent.atomic.AtomicReference; - -public class NodeORG { - // Improve likelihood of isolation on <= 64 byte cache lines - public long z0, z1, z2, z3, z4, z5, z6, z7, z8, z9, za, zb, zc, zd, ze; - - - public volatile E item; - - /** The Thread waiting to be signaled to wake up*/ - public volatile Thread waiter; - public AtomicReference state = new AtomicReference(); - - public volatile NodeORG next; - public volatile short ID = 0; - - public final boolean isConsumer; - - private NodeORG(boolean isConsumer, E item) { - this.isConsumer = isConsumer; - this.item = item; - } - - // prevent JIT from optimizing away the padding - public final long sum() { - return this.z0 + this.z1 + this.z2 + this.z3 + this.z4 + this.z5 + this.z6 + this.z7 + this.z8 + this.z9 + this.za + this.zb + + this.zc + this.zd + this.ze; - } - - public static NodeORG newProducer(E item) { - // producers VARY which node is used on which thread. (so the waiter is set in the put method) - return new NodeORG(false, item); - } - - public static NodeORG newConsumer(E item) { - // consumers will always use the SAME node in the SAME thread - NodeORG node = new NodeORG(true, item); - node.waiter = Thread.currentThread(); - return node; - } -} diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/bakup/NodePool.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/bakup/NodePool.java deleted file mode 100644 index adf03fa..0000000 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/bakup/NodePool.java +++ /dev/null @@ -1,315 +0,0 @@ -/* - * Modified by Dorkbox, llc - * - * - * Copyright 2013 Ben Manes. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may - * obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - */ -package dorkbox.util.messagebus.common.simpleq.bakup; - -import java.util.concurrent.atomic.AtomicReference; - -import dorkbox.util.messagebus.common.simpleq.Node; -import dorkbox.util.messagebus.common.simpleq.PaddedAtomicReference; - -/** - * An unbounded thread-safe stack based on linked nodes. This stack orders elements LIFO (last-in-first-out). The top of the stack - * is that element that has been on the stack the shortest time. New elements are inserted at and retrieved from the top of the stack. A - * {@code EliminationStack} is an appropriate choice when many threads will exchange elements through shared access to a common collection. - * Like most other concurrent collection implementations, this class does not permit the use of {@code null} elements. - *

- * This implementation employs elimination to transfer elements between threads that are pushing and popping concurrently. This technique - * avoids contention on the stack by attempting to cancel operations if an immediate update to the stack is not successful. This approach is - * described in A Scalable Lock-free Stack Algorithm. - * - * @author ben.manes@gmail.com (Ben Manes) - * @see Caffeine - * @param the type of elements held in this collection - */ -public final class NodePool { - - /* - * A Treiber's stack is represented as a singly-linked list with an atomic top reference and uses compare-and-swap to modify the value - * atomically. - * - * The stack is augmented with an elimination array to minimize the top reference becoming a sequential bottleneck. Elimination allows - * pairs of operations with reverse semantics, like pushes and pops on a stack, to complete without any central coordination, and - * therefore substantially aids scalability [1, 2, 3]. If a thread fails to update the stack's top reference then it backs off to a - * collision arena where a location is chosen at random and it attempts to coordinate with another operation that concurrently chose the - * same location. If a transfer is not successful then the thread repeats the process until the element is added to the stack or a - * cancellation occurs. - * - * This implementation borrows optimizations from {@link java.util.concurrent.Exchanger} for choosing an arena location and awaiting a - * match [4]. - * - * [1] A Scalable Lock-free Stack Algorithm http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.156.8728 [2] Concurrent Data - * Structures http://www.cs.tau.ac.il/~shanir/concurrent-data-structures.pdf [3] Using elimination to implement scalable and lock-free - * fifo queues http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.108.6422 [4] A Scalable Elimination-based Exchange Channel - * http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.59.7396 - */ - - /** The number of CPUs */ - private static final int NCPU = Runtime.getRuntime().availableProcessors(); - - /** The number of slots in the elimination array. */ - private static final int ARENA_LENGTH = ceilingNextPowerOfTwo((NCPU + 1) / 2); - - /** The mask value for indexing into the arena. */ - private static int ARENA_MASK = ARENA_LENGTH - 1; - - /** The number of times to step ahead, probe, and try to match. */ - private static final int LOOKAHEAD = Math.min(4, NCPU); - - /** - * The number of times to spin (doing nothing except polling a memory location) before giving up while waiting to eliminate an - * operation. Should be zero on uniprocessors. On multiprocessors, this value should be large enough so that two threads exchanging - * items as fast as possible block only when one of them is stalled (due to GC or preemption), but not much longer, to avoid wasting CPU - * resources. Seen differently, this value is a little over half the number of cycles of an average context switch time on most systems. - * The value here is approximately the average of those across a range of tested systems. - */ - private static final int SPINS = NCPU == 1 ? 0 : 2000; - - /** The number of times to spin per lookahead step */ - private static final int SPINS_PER_STEP = SPINS / LOOKAHEAD; - - /** A marker indicating that the arena slot is free. */ - @SuppressWarnings("rawtypes") - private static final Node FREE = null; - - /** A marker indicating that a thread is waiting in that slot to be transfered an element. */ - @SuppressWarnings("rawtypes") - private static final Node WAITER = new Node(); - - private static int ceilingNextPowerOfTwo(int x) { - // From Hacker's Delight, Chapter 3, Harry S. Warren Jr. - return 1 << Integer.SIZE - Integer.numberOfLeadingZeros(x - 1); - } - - - - - - /** The top of the stack. */ - private final AtomicReference> top; - - /** The arena where slots can be used to perform an exchange. */ - private final AtomicReference[] arena; - - /** Creates a {@code EliminationStack} that is initially empty. */ - @SuppressWarnings("unchecked") - public NodePool() { - this.top = new PaddedAtomicReference>(); - - this.arena = new PaddedAtomicReference[ARENA_LENGTH]; - for (int i = 0; i < ARENA_LENGTH; i++) { - this.arena[i] = new PaddedAtomicReference(); - } - } - - /** - * Removes and returns the top element or returns null if this stack is empty. - * - * @return the top of this stack, or null if this stack is empty - */ - public NodeORG pop() { - NodeORG current; - - for (;;) { - current = this.top.get(); - if (current == null) { - return null; - } - - // Attempt to pop from the stack, backing off to the elimination array if contended - if (this.top.compareAndSet(current, current.next)) { - return current; - } - - current = tryReceive(); - if (current != null) { - return current; - } - } - } - - /** - * Pushes an element onto the stack (in other words, adds an element at the top of this stack). - * - * @param e the element to push - */ - public void put(NodeORG node) { - NodeORG current; - for (;;) { - current = this.top.get(); - node.next = current; - - // Attempt to push to the stack, backing off to the elimination array if contended - if (this.top.compareAndSet(current, node)) { - return; - } - if (tryTransfer(node)) { - return; - } - } - } - - /** - * Attempts to transfer the element to a waiting consumer. - * - * @param e the element to try to exchange - * @return if the element was successfully transfered - */ - boolean tryTransfer(NodeORG e) { - int start = startIndex(); - return scanAndTransferToWaiter(e, start) || awaitExchange(e, start); - } - - /** - * Scans the arena searching for a waiting consumer to exchange with. - * - * @param e the element to try to exchange - * @return if the element was successfully transfered - */ - boolean scanAndTransferToWaiter(NodeORG e, int start) { - for (int i = 0; i < ARENA_LENGTH; i++) { - int index = start + i & ARENA_MASK; - AtomicReference slot = this.arena[index]; - // if some thread is waiting to receive an element then attempt to provide it - if (slot.get() == WAITER && slot.compareAndSet(WAITER, e)) { - return true; - } - } - return false; - } - - /** - * Waits for (by spinning) to have the element transfered to another thread. The element is filled into an empty slot in the arena and - * spun on until it is transfered or a per-slot spin limit is reached. This search and wait strategy is repeated by selecting another - * slot until a total spin limit is reached. - * - * @param e the element to transfer - * @param start the arena location to start at - * @return if an exchange was completed successfully - */ - boolean awaitExchange(NodeORG e, int start) { - for (int step = 0, totalSpins = 0; step < ARENA_LENGTH && totalSpins < SPINS; step++) { - int index = start + step & ARENA_MASK; - AtomicReference slot = this.arena[index]; - - Object found = slot.get(); - if (found == WAITER && slot.compareAndSet(WAITER, e)) { - return true; - } else if (found == FREE && slot.compareAndSet(FREE, e)) { - int slotSpins = 0; - for (;;) { - found = slot.get(); - if (found != e) { - return true; - } else if (slotSpins >= SPINS_PER_STEP && slot.compareAndSet(e, FREE)) { - // failed to transfer the element; try a new slot - totalSpins += slotSpins; - break; - } - slotSpins++; - } - } - } - // failed to transfer the element; give up - return false; - } - - /** - * Attempts to receive an element from a waiting provider. - * - * @return an element if successfully transfered or null if unsuccessful - */ - NodeORG tryReceive() { - int start = startIndex(); - NodeORG e = scanAndMatch(start); - return e == null ? awaitMatch(start) : e; - } - - /** - * Scans the arena searching for a waiting producer to transfer from. - * - * @param start the arena location to start at - * @return an element if successfully transfered or null if unsuccessful - */ - NodeORG scanAndMatch(int start) { - for (int i = 0; i < ARENA_LENGTH; i++) { - int index = start + i & ARENA_MASK; - AtomicReference slot = this.arena[index]; - - // accept a transfer if an element is available - Object found = slot.get(); - if (found != FREE && found != WAITER && slot.compareAndSet(found, FREE)) { - @SuppressWarnings("unchecked") - NodeORG cast = (NodeORG) found; - return cast; - } - } - return null; - } - - /** - * Waits for (by spinning) to have an element transfered from another thread. A marker is filled into an empty slot in the arena and - * spun on until it is replaced with an element or a per-slot spin limit is reached. This search and wait strategy is repeated by - * selecting another slot until a total spin limit is reached. - * - * @param start the arena location to start at - * @return an element if successfully transfered or null if unsuccessful - */ - NodeORG awaitMatch(int start) { - for (int step = 0, totalSpins = 0; step < ARENA_LENGTH && totalSpins < SPINS; step++) { - int index = start + step & ARENA_MASK; - - AtomicReference slot = this.arena[index]; - Object found = slot.get(); - - if (found == FREE) { - if (slot.compareAndSet(FREE, WAITER)) { - int slotSpins = 0; - for (;;) { - found = slot.get(); - if (found != WAITER && slot.compareAndSet(found, FREE)) { - @SuppressWarnings("unchecked") - NodeORG cast = (NodeORG) found; - return cast; - } else if (slotSpins >= SPINS_PER_STEP && found == WAITER && slot.compareAndSet(WAITER, FREE)) { - // failed to receive an element; try a new slot - totalSpins += slotSpins; - break; - } - slotSpins++; - } - } - } else if (found != WAITER && slot.compareAndSet(found, FREE)) { - @SuppressWarnings("unchecked") - NodeORG cast = (NodeORG) found; - return cast; - } - } - - // failed to receive an element; give up - return null; - } - - /** - * Returns the start index to begin searching the arena with. Uses a one-step FNV-1a hash code - * (http://www.isthe.com/chongo/tech/comp/fnv/) based on the current thread's Thread.getId(). These hash codes have more uniform - * distribution properties with respect to small moduli (here 1-31) than do other simple hashing functions. This technique is a - * simplified version borrowed from {@link java.util.concurrent.Exchanger}'s hashIndex function. - */ - static int startIndex() { - long id = Thread.currentThread().getId(); - return ((int) (id ^ id >>> 32) ^ 0x811c9dc5) * 0x01000193; - } -} diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcArrayTransferQueue.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcArrayTransferQueue.java index c28d439..48bb6c1 100644 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcArrayTransferQueue.java +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcArrayTransferQueue.java @@ -2,8 +2,6 @@ package dorkbox.util.messagebus.common.simpleq.jctools; import java.util.concurrent.ThreadLocalRandom; -import dorkbox.util.messagebus.common.simpleq.Node; - public final class MpmcArrayTransferQueue extends MpmcArrayQueueConsumerField { /** The number of CPUs */ @@ -133,265 +131,6 @@ public final class MpmcArrayTransferQueue extends MpmcArrayQueueConsumerField= currentProducerIndex && // test against cached pIndex -//// currentConsumerIndex == (currentProducerIndex = lvProducerIndex())) { // update pIndex if we must -//// // strict empty check, this ensures [Queue.poll() == null iff isEmpty()] -//// // return null; -//// -//// // spin -////// busySpin(prevElementType, incomingType); -//// } -//// -//// // another consumer beat us and moved sequence ahead, retry 2 -//// // only producer busyspins -//// continue; -//// } -// -// -// // it is possible that two threads check the queue at the exact same time, -// // BOTH can think that the queue is empty, resulting in a deadlock between threads -// // it is ALSO possible that the consumer pops the previous node, and so we thought it was not-empty, when -// // in reality, it is. -// -// if (prevElementType == TYPE_FREE || prevElementType == incomingType) { -// // empty or same mode = push+park onto queue -// -// if (timed && nanos <= 0) { -// // can't wait -// return null; -// } -// -// final long delta = pSeq - currentProducerIndex; -// if (delta == 0) { -// if (prevElementType != lpElementType(prevElementOffset)) { -// // inconsistent read of type. -// continue; -// } -// -// // this is expected if we see this first time around -// final long nextProducerIndex = currentProducerIndex + 1; -// if (casProducerIndex(currentProducerIndex, nextProducerIndex)) { -// // Successful CAS: full barrier -// -// // on 64bit(no compressed oops) JVM this is the same as seqOffset -// final long offset = calcElementOffset(currentProducerIndex, mask); -// -// if (prevElementType == TYPE_FREE && currentProducerIndex != currentConsumerIndex) { -// // inconsistent read -// -// // try to undo, if possible. -// if (!casProducerIndex(nextProducerIndex, currentProducerIndex)) { -//// spElementType(offset, TYPE_CANCELED); -// -// // increment sequence by 1, the value expected by consumer -// // (seeing this value from a producer will lead to retry 2) -// soSequence(sBuffer, pSeqOffset, nextProducerIndex); // StoreStore -// } -// -// continue; -// } -// -// spElementType(offset, incomingType); -// spElementThread(offset, Thread.currentThread()); -// -// if (incomingType == TYPE_CONSUMER) { -// // increment sequence by 1, the value expected by consumer -// // (seeing this value from a producer will lead to retry 2) -// soSequence(sBuffer, pSeqOffset, nextProducerIndex); // StoreStore -// -// park(offset, timed, nanos); -// -// Object lpElement = lpElementNoCast(offset); -// Object lpItem1 = lpItem1(lpElement); -// -// return lpItem1; -// } else { -// // producer -// Object lpElement = lpElementNoCast(offset); -// spItem1(lpElement, item); -// -// // increment sequence by 1, the value expected by consumer -// // (seeing this value from a producer will lead to retry 2) -// soSequence(sBuffer, pSeqOffset, nextProducerIndex); // StoreStore -// -// park(offset, timed, nanos); -// -// return null; -// } -// } -// // failed cas, retry 1 -// } else if (delta < 0 && // poll has not moved this value forward -// currentProducerIndex - capacity <= currentConsumerIndex && // test against cached cIndex -// currentProducerIndex - capacity <= (currentConsumerIndex = lvConsumerIndex())) { // test against latest cIndex -// // Extra check required to ensure [Queue.offer == false iff queue is full] -// // return false; -// // we spin if the queue is full -// } -// -// // another producer has moved the sequence by one, retry 2 -// -// // only producer will busySpin -//// busySpin(prevElementType, incomingType); -// } -// else { -// // complimentary mode = unpark+pop off queue -// -// cSeqOffset = calcSequenceOffset(currentConsumerIndex, mask); -// final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad -// -// final long nextConsumerIndex = currentConsumerIndex + 1; -// final long delta = seq - nextConsumerIndex; -// -// if (delta == 0) { -// if (prevElementType != lpElementType(prevElementOffset)) { -// // inconsistent read of type. -// continue; -// } -// -// if (casConsumerIndex(currentConsumerIndex, nextConsumerIndex)) { -// // Successful CAS: full barrier -// -// if (prevElementType != lpElementType(prevElementOffset)) { -// System.err.println("WAHT"); -// -// // Move sequence ahead by capacity, preparing it for next offer -// // (seeing this value from a consumer will lead to retry 2) -// soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore -// continue; -// } -// -// // on 64bit(no compressed oops) JVM this is the same as seqOffset -// final long offset = calcElementOffset(currentConsumerIndex, mask); -// spElementType(offset, TYPE_FREE); -// -// final Object thread = lpElementThread(offset); -// final Object lpElement = lpElementNoCast(offset); -// -// if (incomingType == TYPE_CONSUMER) { -// // is already cancelled/fulfilled -// if (thread == null || -// !casElementThread(offset, thread, null)) { // FULL barrier -// -// // Move sequence ahead by capacity, preparing it for next offer -// // (seeing this value from a consumer will lead to retry 2) -// soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore -// -// continue; -// } -// -// Object returnItem = lpItem1(lpElement); -// spItem1(lpElement, null); -// -// UNSAFE.unpark(thread); -// -// // Move sequence ahead by capacity, preparing it for next offer -// // (seeing this value from a consumer will lead to retry 2) -// soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore -// -// return returnItem; -// } else { -// // producer -// spItem1(lpElement, item); -// -// // is already cancelled/fulfilled -// if (thread == null || -// !casElementThread(offset, thread, null)) { // FULL barrier -// -// // Move sequence ahead by capacity, preparing it for next offer -// // (seeing this value from a consumer will lead to retry 2) -// soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore -// -// continue; -// } -// -// UNSAFE.unpark(thread); -// -// // Move sequence ahead by capacity, preparing it for next offer -// // (seeing this value from a consumer will lead to retry 2) -// soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore -// -// return null; -// } -// } -// // failed cas, retry 1 -// } else if (delta < 0 && // slot has not been moved by producer -// currentConsumerIndex >= currentProducerIndex && // test against cached pIndex -// currentConsumerIndex == (currentProducerIndex = lvProducerIndex())) { // update pIndex if we must -// // strict empty check, this ensures [Queue.poll() == null iff isEmpty()] -// // return null; -// -// // spin -//// busySpin(prevElementType, incomingType); -// } -// -// // another consumer beat us and moved sequence ahead, retry 2 -// // only producer busyspins -// } -// } -// } - private static final void busySpin() { ThreadLocalRandom randomYields = ThreadLocalRandom.current(); diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/Node.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/Node.java new file mode 100644 index 0000000..f4f21ac --- /dev/null +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/Node.java @@ -0,0 +1,106 @@ +package dorkbox.util.messagebus.common.simpleq.jctools; + +import static dorkbox.util.messagebus.common.simpleq.jctools.UnsafeAccess.UNSAFE; + +import com.lmax.disruptor.MessageType; + +abstract class ColdItems { + public int type = 0; + + public int messageType = MessageType.ONE; + public Object item1 = null; + public Object item2 = null; + public Object item3 = null; + public Object[] item4 = null; +} + +abstract class Pad0 extends ColdItems { +// volatile long y0, y1, y2, y4, y5, y6 = 7L; + volatile long z0, z1, z2, z4, z5, z6 = 7L; +} + +abstract class HotItem1 extends Pad0 { + public Thread thread; +} + +public class Node extends HotItem1 { + private static final long ITEM1; + private static final long THREAD; + private static final long TYPE; + + static { + try { + TYPE = UNSAFE.objectFieldOffset(Node.class.getField("type")); + ITEM1 = UNSAFE.objectFieldOffset(Node.class.getField("item1")); + THREAD = UNSAFE.objectFieldOffset(Node.class.getField("thread")); + + // now make sure we can access UNSAFE + Node node = new Node(); + Object o = new Object(); + spItem1(node, o); + Object lpItem1 = lpItem1(node); + spItem1(node, null); + + if (lpItem1 != o) { + throw new Exception("Cannot access unsafe fields"); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + static final void spItem1(Object node, Object item) { + UNSAFE.putObject(node, ITEM1, item); + } + + static final void soItem1(Object node, Object item) { + UNSAFE.putOrderedObject(node, ITEM1, item); + } + + static final Object lpItem1(Object node) { + return UNSAFE.getObject(node, ITEM1); + } + + static final Object lvItem1(Object node) { + return UNSAFE.getObjectVolatile(node, ITEM1); + } + + ////////////// + static final void spType(Object node, int type) { + UNSAFE.putInt(node, TYPE, type); + } + + static final int lpType(Object node) { + return UNSAFE.getInt(node, TYPE); + } + + /////////// + static final void spThread(Object node, Object thread) { + UNSAFE.putObject(node, THREAD, thread); + } + + static final void soThread(Object node, Object thread) { + UNSAFE.putOrderedObject(node, THREAD, thread); + } + + static final Object lpThread(Object node) { + return UNSAFE.getObject(node, THREAD); + } + + static final Object lvThread(Object node) { + return UNSAFE.getObjectVolatile(node, THREAD); + } + + + // post-padding +// volatile long y0, y1, y2, y4, y5, y6 = 7L; + volatile long z0, z1, z2, z4, z5, z6 = 7L; + + public Node() { + } + +// @Override +// public String toString() { +// return "[" + this.ID + "]"; +// } +} diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/SimpleQueue.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/SimpleQueue.java index 2b8258c..faaa1fc 100644 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/SimpleQueue.java +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/SimpleQueue.java @@ -1,106 +1,35 @@ package dorkbox.util.messagebus.common.simpleq.jctools; +import static dorkbox.util.messagebus.common.simpleq.jctools.Node.lpItem1; +import static dorkbox.util.messagebus.common.simpleq.jctools.Node.lpThread; +import static dorkbox.util.messagebus.common.simpleq.jctools.Node.lpType; +import static dorkbox.util.messagebus.common.simpleq.jctools.Node.lvItem1; +import static dorkbox.util.messagebus.common.simpleq.jctools.Node.lvThread; +import static dorkbox.util.messagebus.common.simpleq.jctools.Node.soItem1; +import static dorkbox.util.messagebus.common.simpleq.jctools.Node.soThread; +import static dorkbox.util.messagebus.common.simpleq.jctools.Node.spItem1; +import static dorkbox.util.messagebus.common.simpleq.jctools.Node.spThread; +import static dorkbox.util.messagebus.common.simpleq.jctools.Node.spType; import static dorkbox.util.messagebus.common.simpleq.jctools.UnsafeAccess.UNSAFE; +import java.util.Collection; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; - -import dorkbox.util.messagebus.common.simpleq.Node; - -public final class SimpleQueue extends MpmcArrayQueueConsumerField { - public static final int TYPE_EMPTY = 0; - public static final int TYPE_CONSUMER = 1; - public static final int TYPE_PRODUCER = 2; - - private static final long ITEM1_OFFSET; - private static final long THREAD; - private static final long TYPE; - - static { - try { - TYPE = UNSAFE.objectFieldOffset(Node.class.getField("type")); - ITEM1_OFFSET = UNSAFE.objectFieldOffset(Node.class.getField("item1")); - THREAD = UNSAFE.objectFieldOffset(Node.class.getField("thread")); - } catch (NoSuchFieldException e) { - throw new RuntimeException(e); - } - } - - private static final void spItem1(Object node, Object item) { - UNSAFE.putObject(node, ITEM1_OFFSET, item); - } - - private static final void soItem1(Object node, Object item) { - UNSAFE.putOrderedObject(node, ITEM1_OFFSET, item); - } - - private static final Object lpItem1(Object node) { - return UNSAFE.getObject(node, ITEM1_OFFSET); - } - - private static final Object lvItem1(Object node) { - return UNSAFE.getObjectVolatile(node, ITEM1_OFFSET); - } +import java.util.concurrent.TransferQueue; - private static final void spType(Object node, int type) { - UNSAFE.putInt(node, TYPE, type); - } +public final class SimpleQueue extends MpmcArrayQueueConsumerField implements TransferQueue { + private static final int TYPE_EMPTY = 0; + private static final int TYPE_CONSUMER = 1; + private static final int TYPE_PRODUCER = 2; - private static final int lpType(Object node) { - return UNSAFE.getInt(node, TYPE); - } - - private static final void spThread(Object node, Object thread) { - UNSAFE.putObject(node, THREAD, thread); - } - - private static final void soThread(Object node, Object thread) { - UNSAFE.putOrderedObject(node, THREAD, thread); - } - - private static final Object lpThread(Object node) { - return UNSAFE.getObject(node, THREAD); - } - - private static final Object lvThread(Object node) { - return UNSAFE.getObjectVolatile(node, THREAD); - } - - /** The number of CPUs */ + /** Is it multi-processor? */ private static final boolean MP = Runtime.getRuntime().availableProcessors() > 1; - /** - * The number of times to spin (with randomly interspersed calls - * to Thread.yield) on multiprocessor before blocking when a node - * is apparently the first waiter in the queue. See above for - * explanation. Must be a power of two. The value is empirically - * derived -- it works pretty well across a variety of processors, - * numbers of CPUs, and OSes. - */ - private static final int FRONT_SPINS = 1 << 7; + private static int INPROGRESS_SPINS = MP ? 32 : 0; + private static int PUSH_SPINS = MP ? 512 : 0; + private static int POP_SPINS = MP ? 512 : 0; - /** - * The number of times to spin before blocking when a node is - * preceded by another node that is apparently spinning. Also - * serves as an increment to FRONT_SPINS on phase changes, and as - * base average frequency for yielding during spins. Must be a - * power of two. - */ - private static final int CHAINED_SPINS = FRONT_SPINS >>> 1; - - - /** The number of CPUs */ - private static final int NCPU = Runtime.getRuntime().availableProcessors(); - - /** - * The number of times to spin (doing nothing except polling a memory location) before giving up while waiting to eliminate an - * operation. Should be zero on uniprocessors. On multiprocessors, this value should be large enough so that two threads exchanging - * items as fast as possible block only when one of them is stalled (due to GC or preemption), but not much longer, to avoid wasting CPU - * resources. Seen differently, this value is a little over half the number of cycles of an average context switch time on most systems. - * The value here is approximately the average of those across a range of tested systems. - */ - private static final int SPINS = MP ? 0 : 512; // orig: 2000 /** * The number of times to spin before blocking in timed waits. @@ -109,26 +38,26 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField { * seems not to vary with number of CPUs (beyond 2) so is just * a constant. */ - static final int maxTimedSpins = NCPU < 2 ? 0 : 32; + private static int PARK_TIMED_SPINS = MP ? 32 : 0; /** * The number of times to spin before blocking in untimed waits. * This is greater than timed value because untimed waits spin * faster since they don't need to check times on each spin. */ - static final int maxUntimedSpins = maxTimedSpins * 16; - static final int negMaxUntimedSpins = -maxUntimedSpins; + private static int PARK_UNTIMED_SPINS = PARK_TIMED_SPINS * 16; /** * The number of nanoseconds for which it is faster to spin * rather than to use timed park. A rough estimate suffices. */ - static final long spinForTimeoutThreshold = 1000L; - private int size; + private static final long SPIN_THRESHOLD = 1000L; - public SimpleQueue(final int size) { - super(1 << 17); - this.size = size; + private final int consumerCount; + + public SimpleQueue(final int consumerCount) { + super(Pow2.roundToPowerOfTwo(consumerCount*Runtime.getRuntime().availableProcessors())); + this.consumerCount = consumerCount; } private final static ThreadLocal nodeThreadLocal = new ThreadLocal() { @@ -140,9 +69,12 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField { /** - * PRODUCER + * PRODUCER method + *

+ * Place an item on the queue, and wait (if necessary) for a corresponding consumer to take it. This will wait as long as necessary. */ - public void put(Object item) throws InterruptedException { + @Override + public final void transfer(final Object item) { // local load of field to avoid repeated loads after volatile reads final long mask = this.mask; @@ -164,7 +96,7 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField { previousElement = lpElementNoCast(calcElementOffset(producerIndex-1)); if (previousElement == null) { // the last producer hasn't finished setting the object yet - busySpin_InProgress(); + busySpin(INPROGRESS_SPINS); continue; } @@ -207,7 +139,7 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField { } // whoops, inconsistent state - busySpin_pushConflict(); + busySpin(PUSH_SPINS); continue; } case TYPE_CONSUMER: { @@ -237,7 +169,7 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField { } // whoops, inconsistent state - busySpin_popConflict(); + busySpin(POP_SPINS); continue; } } @@ -247,7 +179,8 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField { /** * CONSUMER */ - public Object take() throws InterruptedException { + @Override + public final Object take() { // local load of field to avoid repeated loads after volatile reads final long mask = this.mask; final long[] sBuffer = this.sequenceBuffer; @@ -268,7 +201,7 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField { previousElement = lpElementNoCast(calcElementOffset(producerIndex-1)); if (previousElement == null) { // the last producer hasn't finished setting the object yet - busySpin_InProgress(); + busySpin(INPROGRESS_SPINS); continue; } @@ -313,7 +246,7 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField { } // whoops, inconsistent state - busySpin_pushConflict(); + busySpin(PUSH_SPINS); continue; } case TYPE_PRODUCER: { @@ -343,138 +276,90 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField { } // whoops, inconsistent state - busySpin_popConflict(); + busySpin(POP_SPINS); continue; } } } } - /** - * Spin for when the current thread is waiting for the item to be set. The producer index has incremented, but the - * item isn't present yet. - * @param random - */ - private static final void busySpin_InProgress() { -// ThreadLocalRandom randomYields = ThreadLocalRandom.current(); -// -// if (randomYields.nextInt(1) != 0) { -////// LockSupport.parkNanos(1); // occasionally yield -// Thread.yield(); -////// break; -// } - - // busy spin for the amount of time (roughly) of a CPU context switch - int spins = 128; + private static final void busySpin(int spins) { for (;;) { if (spins > 0) { -// if (randomYields.nextInt(CHAINED_SPINS) == 0) { -//// LockSupport.parkNanos(1); // occasionally yield -// Thread.yield(); -//// break; -// } --spins; } else { - break; + return; } } } - private static final void busySpin_pushConflict() { -// ThreadLocalRandom randomYields = ThreadLocalRandom.current(); + @SuppressWarnings("null") + private final void park(final Object node, final Thread myThread, final boolean timed, long nanos) { + long lastTime = timed ? System.nanoTime() : 0L; + int spins = -1; // initialized after first item and cancel checks + ThreadLocalRandom randomYields = null; // bound if needed - // busy spin for the amount of time (roughly) of a CPU context switch - int spins = 256; for (;;) { - if (spins > 0) { -// if (randomYields.nextInt(1) != 0) { -////// LockSupport.parkNanos(1); // occasionally yield -// Thread.yield(); -//// break; -// } - --spins; - } else { - break; - } - } - } + if (lvThread(node) == null) { + return; + } else if (myThread.isInterrupted() || timed && nanos <= 0) { + return; + } else if (spins < 0) { + if (timed) { + spins = PARK_TIMED_SPINS; + } else { + spins = PARK_UNTIMED_SPINS; + } - private static final void busySpin_popConflict() { -// ThreadLocalRandom randomYields = ThreadLocalRandom.current(); - - // busy spin for the amount of time (roughly) of a CPU context switch - int spins = 64; - for (;;) { - if (spins > 0) { -// if (randomYields.nextInt(1) != 0) { -////// LockSupport.parkNanos(1); // occasionally yield -// Thread.yield(); -//// break; -// } - --spins; - } else { - break; - } - } - } - - private static final void busySpin2() { - ThreadLocalRandom randomYields = ThreadLocalRandom.current(); - - // busy spin for the amount of time (roughly) of a CPU context switch - int spins = 64; - for (;;) { - if (spins > 0) { - if (randomYields.nextInt(1) != 0) { -//// LockSupport.parkNanos(1); // occasionally yield - Thread.yield(); -// break; + if (spins > 0) { + randomYields = ThreadLocalRandom.current(); + } + } else if (spins > 0) { + if (randomYields.nextInt(256) == 0) { + Thread.yield(); // occasionally yield } --spins; - } else { - break; - } - } - } - - private static final void busySpin(ThreadLocalRandom random) { - // busy spin for the amount of time (roughly) of a CPU context switch -// int spins = spinsFor(); - int spins = 128; - for (;;) { - if (spins > 0) { - --spins; - if (random.nextInt(CHAINED_SPINS) == 0) { -//// LockSupport.parkNanos(1); // occasionally yield -// Thread.yield(); - break; + } else if (timed) { + long now = System.nanoTime(); + long remaining = nanos -= now - lastTime; + if (remaining > 0) { + if (remaining < SPIN_THRESHOLD) { + busySpin(PARK_UNTIMED_SPINS); + } else { + UNSAFE.park(false, nanos); + } } + lastTime = now; } else { - break; + // park can return for NO REASON (must check for thread values) + UNSAFE.park(false, 0L); } } } -// @Override -// public boolean isEmpty() { -// // Order matters! -// // Loading consumer before producer allows for producer increments after consumer index is read. -// // This ensures this method is conservative in it's estimate. Note that as this is an MPMC there is -// // nothing we can do to make this an exact method. -// return lvConsumerIndex() == lvProducerIndex(); -// } + private final void unpark(Object node) { + final Object thread = lpThread(node); + soThread(node, null); + UNSAFE.unpark(thread); + } - public boolean hasPendingMessages() { + @Override + public final boolean isEmpty() { + // Order matters! + // Loading consumer before producer allows for producer increments after consumer index is read. + // This ensures this method is conservative in it's estimate. Note that as this is an MPMC there is + // nothing we can do to make this an exact method. + return lvConsumerIndex() == lvProducerIndex(); + } + + public final boolean hasPendingMessages() { // count the number of consumers waiting, it should be the same as the number of threads configured -// return this.consumersWaiting.size() == this.numberConsumerThreads; -// return false; - long consumerIndex = lvConsumerIndex(); long producerIndex = lvProducerIndex(); if (consumerIndex != producerIndex) { final Object previousElement = lpElementNoCast(calcElementOffset(producerIndex-1)); - if (previousElement != null && lpType(previousElement) == TYPE_CONSUMER && consumerIndex + this.size == producerIndex) { + if (previousElement != null && lpType(previousElement) == TYPE_CONSUMER && consumerIndex + this.consumerCount == producerIndex) { return false; } } @@ -482,75 +367,158 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField { return true; } - public void tryTransfer(Runnable runnable, long timeout, TimeUnit unit) throws InterruptedException { - // TODO Auto-generated method stub + + + + + + + + public void tryTransfer(Runnable runnable, long timeout, TimeUnit unit) { } - public final void park(final Object node, final Thread myThread, final boolean timed, final long nanos) throws InterruptedException { - ThreadLocalRandom randomYields = null; // bound if needed - - -// long lastTime = timed ? System.nanoTime() : 0; -// int spins = timed ? maxTimedSpins : maxUntimedSpins; -// int spins = maxTimedSpins; - int spins = 51200; - -// if (timed) { -// long now = System.nanoTime(); -// nanos -= now - lastTime; -// lastTime = now; -// if (nanos <= 0) { -//// s.tryCancel(e); -// continue; -// } -// } - - for (;;) { - if (lvThread(node) == null) { - return; - } else if (spins > 0) { -// if (randomYields == null) { -// randomYields = ThreadLocalRandom.current(); -// } else if (randomYields.nextInt(spins) == 0) { -// Thread.yield(); // occasionally yield -// } - --spins; - } else if (myThread.isInterrupted()) { - Thread.interrupted(); - throw new InterruptedException(); - } else { - // park can return for NO REASON (must check for thread values) - UNSAFE.park(false, 0L); - } - } - } - - public void unpark(Object node) { - final Object thread = lpThread(node); - soThread(node, null); - UNSAFE.unpark(thread); - } @Override - public boolean offer(Node message) { + public boolean offer(Object message) { // TODO Auto-generated method stub return false; } @Override - public Node poll() { + public Object poll() { // TODO Auto-generated method stub return null; } @Override - public Node peek() { - // TODO Auto-generated method stub - return null; + public Object peek() { + long currConsumerIndex; + Object e; + do { + currConsumerIndex = lvConsumerIndex(); + // other consumers may have grabbed the element, or queue might be empty + e = lpElementNoCast(calcElementOffset(currConsumerIndex)); + // only return null if queue is empty + } while (e == null && currConsumerIndex != lvProducerIndex()); + return e; } @Override public int size() { + /* + * It is possible for a thread to be interrupted or reschedule between the read of the producer and + * consumer indices, therefore protection is required to ensure size is within valid range. In the + * event of concurrent polls/offers to this method the size is OVER estimated as we read consumer + * index BEFORE the producer index. + */ + long after = lvConsumerIndex(); + while (true) { + final long before = after; + final long currentProducerIndex = lvProducerIndex(); + after = lvConsumerIndex(); + if (before == after) { + return (int) (currentProducerIndex - after); + } + } + } + + + + + + + + + + + + + @Override + public void put(Object e) throws InterruptedException { + // TODO Auto-generated method stub + + } + + @Override + public boolean offer(Object e, long timeout, TimeUnit unit) throws InterruptedException { + // TODO Auto-generated method stub + return false; + } + + @Override + public Object poll(long timeout, TimeUnit unit) throws InterruptedException { + // TODO Auto-generated method stub + return null; + } + + @Override + public int remainingCapacity() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public int drainTo(Collection c) { + // TODO Auto-generated method stub + return 0; + } + + @Override + public int drainTo(Collection c, int maxElements) { + // TODO Auto-generated method stub + return 0; + } + + @Override + public Object[] toArray(Object[] a) { + // TODO Auto-generated method stub + return null; + } + + @Override + public boolean containsAll(Collection c) { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean addAll(Collection c) { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean removeAll(Collection c) { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean retainAll(Collection c) { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean tryTransfer(Object e) { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean tryTransfer(Object e, long timeout, TimeUnit unit) throws InterruptedException { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean hasWaitingConsumer() { + // TODO Auto-generated method stub + return false; + } + + @Override + public int getWaitingConsumerCount() { // TODO Auto-generated method stub return 0; } diff --git a/src/test/java/dorkbox/util/messagebus/LinkTransferQueueConcurrentPerfTest.java b/src/test/java/dorkbox/util/messagebus/LinkTransferQueueConcurrentPerfTest.java index 244d594..ffccddb 100644 --- a/src/test/java/dorkbox/util/messagebus/LinkTransferQueueConcurrentPerfTest.java +++ b/src/test/java/dorkbox/util/messagebus/LinkTransferQueueConcurrentPerfTest.java @@ -12,20 +12,21 @@ */ package dorkbox.util.messagebus; +import java.util.concurrent.LinkedTransferQueue; + import org.openjdk.jol.info.ClassLayout; import org.openjdk.jol.util.VMSupport; -import dorkbox.util.messagebus.common.LinkedTransferQueue; -import dorkbox.util.messagebus.common.simpleq.Node; +import dorkbox.util.messagebus.common.simpleq.jctools.Node; public class LinkTransferQueueConcurrentPerfTest { // 15 == 32 * 1024 - public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 10; + public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 100; public static final Integer TEST_VALUE = Integer.valueOf(777); public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17); - private static final int concurrency = 8; + private static final int concurrency = 2; public static void main(final String[] args) throws Exception { System.out.println(VMSupport.vmDetails()); diff --git a/src/test/java/dorkbox/util/messagebus/LinkTransferQueuePerfTest.java b/src/test/java/dorkbox/util/messagebus/LinkTransferQueuePerfTest.java index dda3de7..c6d3c46 100644 --- a/src/test/java/dorkbox/util/messagebus/LinkTransferQueuePerfTest.java +++ b/src/test/java/dorkbox/util/messagebus/LinkTransferQueuePerfTest.java @@ -20,7 +20,7 @@ import java.util.concurrent.LinkedTransferQueue; import org.openjdk.jol.info.ClassLayout; import org.openjdk.jol.util.VMSupport; -import dorkbox.util.messagebus.common.simpleq.Node; +import dorkbox.util.messagebus.common.simpleq.jctools.Node; public class LinkTransferQueuePerfTest { // 15 == 32 * 1024 diff --git a/src/test/java/dorkbox/util/messagebus/MpmcQueueAltConcurrentPerfTest.java b/src/test/java/dorkbox/util/messagebus/MpmcQueueAltConcurrentPerfTest.java index 653ee29..5c4439c 100644 --- a/src/test/java/dorkbox/util/messagebus/MpmcQueueAltConcurrentPerfTest.java +++ b/src/test/java/dorkbox/util/messagebus/MpmcQueueAltConcurrentPerfTest.java @@ -18,8 +18,8 @@ package dorkbox.util.messagebus; import org.openjdk.jol.info.ClassLayout; import org.openjdk.jol.util.VMSupport; -import dorkbox.util.messagebus.common.simpleq.Node; import dorkbox.util.messagebus.common.simpleq.jctools.MpmcArrayTransferQueue; +import dorkbox.util.messagebus.common.simpleq.jctools.Node; public class MpmcQueueAltConcurrentPerfTest { // 15 == 32 * 1024 diff --git a/src/test/java/dorkbox/util/messagebus/MpmcQueueAltPerfTest.java b/src/test/java/dorkbox/util/messagebus/MpmcQueueAltPerfTest.java index 515c457..b30fbf7 100644 --- a/src/test/java/dorkbox/util/messagebus/MpmcQueueAltPerfTest.java +++ b/src/test/java/dorkbox/util/messagebus/MpmcQueueAltPerfTest.java @@ -18,9 +18,9 @@ package dorkbox.util.messagebus; import org.openjdk.jol.info.ClassLayout; import org.openjdk.jol.util.VMSupport; -import dorkbox.util.messagebus.common.simpleq.Node; import dorkbox.util.messagebus.common.simpleq.jctools.MpmcArrayQueue; import dorkbox.util.messagebus.common.simpleq.jctools.MpmcArrayTransferQueue; +import dorkbox.util.messagebus.common.simpleq.jctools.Node; public class MpmcQueueAltPerfTest { // 15 == 32 * 1024 diff --git a/src/test/java/dorkbox/util/messagebus/MpmcQueueBaselineNodePerfTest.java b/src/test/java/dorkbox/util/messagebus/MpmcQueueBaselineNodePerfTest.java index 207baa9..c894fa8 100644 --- a/src/test/java/dorkbox/util/messagebus/MpmcQueueBaselineNodePerfTest.java +++ b/src/test/java/dorkbox/util/messagebus/MpmcQueueBaselineNodePerfTest.java @@ -18,8 +18,8 @@ package dorkbox.util.messagebus; import org.openjdk.jol.info.ClassLayout; import org.openjdk.jol.util.VMSupport; -import dorkbox.util.messagebus.common.simpleq.Node; import dorkbox.util.messagebus.common.simpleq.jctools.MpmcArrayQueue; +import dorkbox.util.messagebus.common.simpleq.jctools.Node; public class MpmcQueueBaselineNodePerfTest { // 15 == 32 * 1024 diff --git a/src/test/java/dorkbox/util/messagebus/MpmcQueueConcurrentPerfTest.java b/src/test/java/dorkbox/util/messagebus/MpmcQueueConcurrentPerfTest.java index ea63269..b98f0f3 100644 --- a/src/test/java/dorkbox/util/messagebus/MpmcQueueConcurrentPerfTest.java +++ b/src/test/java/dorkbox/util/messagebus/MpmcQueueConcurrentPerfTest.java @@ -18,8 +18,8 @@ package dorkbox.util.messagebus; import org.openjdk.jol.info.ClassLayout; import org.openjdk.jol.util.VMSupport; -import dorkbox.util.messagebus.common.simpleq.Node; import dorkbox.util.messagebus.common.simpleq.jctools.MpmcArrayQueue; +import dorkbox.util.messagebus.common.simpleq.jctools.Node; public class MpmcQueueConcurrentPerfTest { // 15 == 32 * 1024 diff --git a/src/test/java/dorkbox/util/messagebus/SimpleQueueAltPerfTest.java b/src/test/java/dorkbox/util/messagebus/SimpleQueueAltPerfTest.java index 9de2822..b26c6c0 100644 --- a/src/test/java/dorkbox/util/messagebus/SimpleQueueAltPerfTest.java +++ b/src/test/java/dorkbox/util/messagebus/SimpleQueueAltPerfTest.java @@ -1,50 +1,70 @@ -/* - * Copyright 2012 Real Logic Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may - * obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - */ package dorkbox.util.messagebus; import org.openjdk.jol.info.ClassLayout; import org.openjdk.jol.util.VMSupport; -import dorkbox.util.messagebus.common.simpleq.Node; +import dorkbox.util.messagebus.common.simpleq.jctools.Node; import dorkbox.util.messagebus.common.simpleq.jctools.SimpleQueue; public class SimpleQueueAltPerfTest { - // 15 == 32 * 1024 - public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 10; + public static final int REPETITIONS = 50 * 1000 * 100; public static final Integer TEST_VALUE = Integer.valueOf(777); - public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17); + public static final int QUEUE_CAPACITY = 1 << 17; - private static final int concurrency = 8; + private static final int concurrency = 2; public static void main(final String[] args) throws Exception { System.out.println(VMSupport.vmDetails()); System.out.println(ClassLayout.parseClass(Node.class).toPrintable()); System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS + " Concurrency " + concurrency); - final SimpleQueue queue = new SimpleQueue(QUEUE_CAPACITY); - final long[] results = new long[20]; - for (int i = 0; i < 20; i++) { + final int warmupRuns = 2; + final int runs = 5; + + long average = 0; + + final SimpleQueue queue = new SimpleQueue(QUEUE_CAPACITY); + average = averageRun(warmupRuns, runs, queue); + +// SimpleQueue.INPROGRESS_SPINS = 64; +// SimpleQueue.POP_SPINS = 512; +// SimpleQueue.PUSH_SPINS = 512; +// SimpleQueue.PARK_SPINS = 512; +// +// for (int i = 128; i< 10000;i++) { +// int full = 2*i; +// +// final SimpleQueue queue = new SimpleQueue(QUEUE_CAPACITY); +// SimpleQueue.PARK_SPINS = full; +// +// +// long newAverage = averageRun(warmupRuns, runs, queue); +// if (newAverage > average) { +// average = newAverage; +// System.err.println("Best value: " + i + " : " + newAverage); +// } +// } + + + System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), average); + } + + private static long averageRun(int warmUpRuns, int sumCount, SimpleQueue queue) throws Exception { + int runs = warmUpRuns + sumCount; + final long[] results = new long[runs]; + for (int i = 0; i < runs; i++) { System.gc(); results[i] = performanceRun(i, queue); } - // only average last 10 results for summary + // only average last X results for summary long sum = 0; - for (int i = 10; i < 20; i++) { + for (int i = warmUpRuns; i < runs; i++) { sum += results[i]; } - System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), sum / 10); + + return sum/sumCount; } private static long performanceRun(int runNumber, SimpleQueue queue) throws Exception { @@ -109,13 +129,9 @@ public class SimpleQueueAltPerfTest { int i = REPETITIONS; this.start = System.nanoTime(); - try { - do { - producer.put(TEST_VALUE); - } while (0 != --i); - } catch (InterruptedException e) { - e.printStackTrace(); - } + do { + producer.transfer(TEST_VALUE); + } while (0 != --i); } } @@ -134,13 +150,9 @@ public class SimpleQueueAltPerfTest { Object result = null; int i = REPETITIONS; - try { - do { - result = consumer.take(); - } while (0 != --i); - } catch (InterruptedException e) { - e.printStackTrace(); - } + do { + result = consumer.take(); + } while (0 != --i); this.result = result; this.end = System.nanoTime();