Improved simpleq, now beats LTQ in concurrent tests

nathan 2015-05-04 19:03:50 +02:00
parent 2705daa073
commit f96a2f4c19
25 changed files with 450 additions and 3468 deletions

View File

@@ -4,8 +4,8 @@ package com.lmax.disruptor;
* Date: 2/2/15
*/
public class MessageType {
public static final short ONE = 1;
public static final short TWO = 2;
public static final short THREE = 3;
public static final short ARRAY = 4;
public static final int ONE = 1;
public static final int TWO = 2;
public static final int THREE = 3;
public static final int ARRAY = 4;
}

View File

@@ -24,19 +24,6 @@ import dorkbox.util.messagebus.subscription.Subscription;
*/
public class MultiMBassador implements IMessageBus {
/** The number of CPUs, for spin control */
private static final int NCPUS = Runtime.getRuntime().availableProcessors();
/**
* The number of times to spin before blocking.
*
* The value is empirically derived -- it works well across a
* variety of processors and OSes. Empirically, the best value
* seems not to vary with number of CPUs (beyond 2) so is just
* a constant.
*/
private static final int maxSpins = NCPUS < 2 ? 0 : 32*16;
// error handling is first-class functionality
// this handler will receive all errors that occur during message dispatch or message handling
private final Collection<IPublicationErrorHandler> errorHandlers = new ArrayDeque<IPublicationErrorHandler>();
@@ -103,15 +90,10 @@ public class MultiMBassador implements IMessageBus {
// TransferQueue<Runnable> IN_QUEUE = MultiMBassador.this.dispatchQueue;
Object message1;
try {
while (true) {
message1 = IN_QUEUE.take();
publish(message1);
}
} catch (InterruptedException e) {
Thread.interrupted();
return;
}
while (true) {
message1 = IN_QUEUE.take();
publish(message1);
}
}
};
@@ -413,22 +395,22 @@ public class MultiMBassador implements IMessageBus {
@Override
public void publishAsync(final Object message) {
if (message != null) {
Runnable runnable = new Runnable() {
@Override
public void run() {
MultiMBassador.this.publish(message);
}
};
// Runnable runnable = new Runnable() {
// @Override
// public void run() {
// MultiMBassador.this.publish(message);
// }
// };
try {
// try {
// this.dispatchQueue.transfer(runnable);
this.dispatchQueue.put(message);
} catch (InterruptedException e) {
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
.setCause(e)
.setPublishedObject(message));
}
this.dispatchQueue.transfer(message);
// } catch (InterruptedException e) {
// handlePublicationError(new PublicationError()
// .setMessage("Error while adding an asynchronous message")
// .setCause(e)
// .setPublishedObject(message));
// }
}
}
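
The behavioural change in this hunk is the switch from put() to transfer(). Assuming dispatchQueue follows java.util.concurrent.TransferQueue semantics (the queue's own type is not shown in this diff), put() enqueues and returns immediately, while transfer() parks the publishing thread until a dispatch thread has actually taken the message. The commented-out catch blocks also suggest the replacement queue's transfer() no longer declares InterruptedException; that is an inference from the diff, not something stated here. A minimal sketch of the put/transfer difference, using the JDK class and an invented name (PutVsTransferDemo) purely for illustration:

import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

public class PutVsTransferDemo {
    public static void main(String[] args) throws InterruptedException {
        TransferQueue<Object> queue = new LinkedTransferQueue<>();

        queue.put("a");                       // returns immediately; the element just sits in the queue
        System.out.println("put returned, size=" + queue.size());

        Thread consumer = new Thread(() -> {
            try {
                Thread.sleep(500);            // simulate a busy dispatch thread
                System.out.println("took: " + queue.take());
                System.out.println("took: " + queue.take());
            } catch (InterruptedException ignored) {
            }
        });
        consumer.start();

        long start = System.nanoTime();
        queue.transfer("b");                  // blocks until the consumer has taken "b"
        System.out.println("transfer returned after ~" + (System.nanoTime() - start) / 1_000_000 + " ms");
        consumer.join();
    }
}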
@@ -442,14 +424,14 @@ public class MultiMBassador implements IMessageBus {
}
};
try {
this.dispatchQueue.put(runnable);
} catch (InterruptedException e) {
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
.setCause(e)
.setPublishedObject(message1, message2));
}
// try {
this.dispatchQueue.transfer(runnable);
// } catch (InterruptedException e) {
// handlePublicationError(new PublicationError()
// .setMessage("Error while adding an asynchronous message")
// .setCause(e)
// .setPublishedObject(message1, message2));
// }
}
}
@@ -464,14 +446,14 @@ public class MultiMBassador implements IMessageBus {
};
try {
this.dispatchQueue.put(runnable);
} catch (InterruptedException e) {
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
.setCause(e)
.setPublishedObject(message1, message2, message3));
}
// try {
this.dispatchQueue.transfer(runnable);
// } catch (InterruptedException e) {
// handlePublicationError(new PublicationError()
// .setMessage("Error while adding an asynchronous message")
// .setCause(e)
// .setPublishedObject(message1, message2, message3));
// }
}
}
@@ -485,14 +467,14 @@ public class MultiMBassador implements IMessageBus {
}
};
try {
// try {
this.dispatchQueue.tryTransfer(runnable, timeout, unit);
} catch (InterruptedException e) {
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
.setCause(e)
.setPublishedObject(message));
}
// } catch (InterruptedException e) {
// handlePublicationError(new PublicationError()
// .setMessage("Error while adding an asynchronous message")
// .setCause(e)
// .setPublishedObject(message));
// }
}
}
@Override
@@ -505,14 +487,14 @@ public class MultiMBassador implements IMessageBus {
}
};
try {
// try {
this.dispatchQueue.tryTransfer(runnable, timeout, unit);
} catch (InterruptedException e) {
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
.setCause(e)
.setPublishedObject(message1, message2));
}
// } catch (InterruptedException e) {
// handlePublicationError(new PublicationError()
// .setMessage("Error while adding an asynchronous message")
// .setCause(e)
// .setPublishedObject(message1, message2));
// }
}
}
@@ -527,14 +509,14 @@ public class MultiMBassador implements IMessageBus {
}
};
try {
// try {
this.dispatchQueue.tryTransfer(runnable, timeout, unit);
} catch (InterruptedException e) {
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
.setCause(e)
.setPublishedObject(message1, message2, message3));
}
// } catch (InterruptedException e) {
// handlePublicationError(new PublicationError()
// .setMessage("Error while adding an asynchronous message")
// .setCause(e)
// .setPublishedObject(message1, message2, message3));
// }
}
}
}
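
All three timed publishAsync variants now call tryTransfer(runnable, timeout, unit) where the old code used put(runnable). On a TransferQueue the timed form only succeeds if a consumer receives the element within the timeout; otherwise it returns false and leaves nothing enqueued, so a slow dispatcher silently drops the message unless the boolean result is checked. A small sketch of that contrast, using the JDK LinkedTransferQueue as a stand-in for the bus's own queue (the class name TimedPublishDemo is invented for illustration):

import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;

public class TimedPublishDemo {
    public static void main(String[] args) throws InterruptedException {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();

        // Old behaviour: put() always enqueues, even when no dispatch thread is ready.
        queue.put("kept");
        System.out.println("after put: size=" + queue.size());                        // 1

        // New behaviour: the timed hand-off gives up after the timeout and leaves
        // nothing behind, so the caller must check the return value.
        boolean delivered = queue.tryTransfer("dropped", 50, TimeUnit.MILLISECONDS);
        System.out.println("delivered=" + delivered + ", size=" + queue.size());      // delivered=false, size=1
    }
}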

View File

@@ -1,839 +0,0 @@
/*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
*
* Copyright 1997-2008 Sun Microsystems, Inc. All rights reserved.
*
* The contents of this file are subject to the terms of either the GNU
* General Public License Version 2 only ("GPL") or the Common Development
* and Distribution License("CDDL") (collectively, the "License"). You
* may not use this file except in compliance with the License. You can obtain
* a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html
* or glassfish/bootstrap/legal/LICENSE.txt. See the License for the specific
* language governing permissions and limitations under the License.
*
* When distributing the software, include this License Header Notice in each
* file and include the License file at glassfish/bootstrap/legal/LICENSE.txt.
* Sun designates this particular file as subject to the "Classpath" exception
* as provided by Sun in the GPL Version 2 section of the License file that
* accompanied this code. If applicable, add the following below the License
* Header, with the fields enclosed by brackets [] replaced by your own
* identifying information: "Portions Copyrighted [year]
* [name of copyright owner]"
*
* Contributor(s):
*
* If you wish your version of this file to be governed by only the CDDL or
* only the GPL Version 2, indicate your decision by adding "[Contributor]
* elects to include this software in this distribution under the [CDDL or GPL
* Version 2] license." If you don't indicate a single choice of license, a
* recipient has the option to distribute your version of this file under
* either the CDDL, the GPL Version 2 or to extend the choice of license to
* its licensees as provided above. However, if you add GPL Version 2 code
* and therefore, elected the GPL Version 2 license, then the option applies
* only if the new code is made subject to such option by the copyright
* holder.
*/
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/licenses/publicdomain
*/
package dorkbox.util.messagebus.common;
import java.util.AbstractQueue;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.LockSupport;
/**
* An unbounded <tt>TransferQueue</tt> based on linked nodes.
* This queue orders elements FIFO (first-in-first-out) with respect
* to any given producer. The <em>head</em> of the queue is that
* element that has been on the queue the longest time for some
* producer. The <em>tail</em> of the queue is that element that has
* been on the queue the shortest time for some producer.
*
* <p>Beware that, unlike in most collections, the <tt>size</tt>
* method is <em>NOT</em> a constant-time operation. Because of the
* asynchronous nature of these queues, determining the current number
* of elements requires a traversal of the elements.
*
* <p>This class and its iterator implement all of the
* <em>optional</em> methods of the {@link Collection} and {@link
* Iterator} interfaces.
*
* <p>Memory consistency effects: As with other concurrent
* collections, actions in a thread prior to placing an object into a
* {@code LinkedTransferQueue}
* <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
* actions subsequent to the access or removal of that element from
* the {@code LinkedTransferQueue} in another thread.
*
* @author Doug Lea
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (tlee@redhat.com)
*
* @param <E> the type of elements held in this collection
*
*/
public class LinkedTransferQueue<E> extends AbstractQueue<E> implements TransferQueue<E> {
/*
* This class extends the approach used in FIFO-mode
* SynchronousQueues. See the internal documentation, as well as
* the PPoPP 2006 paper "Scalable Synchronous Queues" by Scherer,
* Lea & Scott
* (http://www.cs.rice.edu/~wns1/papers/2006-PPoPP-SQ.pdf)
*
* The main extension is to provide different Wait modes for the
* main "xfer" method that puts or takes items. These don't
* impact the basic dual-queue logic, but instead control whether
* or how threads block upon insertion of request or data nodes
* into the dual queue. It also uses slightly different
* conventions for tracking whether nodes are off-list or
* cancelled.
*/
// Wait modes for xfer method
private static final int NOWAIT = 0;
private static final int TIMEOUT = 1;
private static final int WAIT = 2;
/** The number of CPUs, for spin control */
private static final int NCPUS = Runtime.getRuntime().availableProcessors();
/**
* The number of times to spin before blocking in timed waits.
* The value is empirically derived -- it works well across a
* variety of processors and OSes. Empirically, the best value
* seems not to vary with number of CPUs (beyond 2) so is just
* a constant.
*/
private static final int maxTimedSpins = NCPUS < 2? 0 : 32;
/**
* The number of times to spin before blocking in untimed waits.
* This is greater than timed value because untimed waits spin
* faster since they don't need to check times on each spin.
*/
private static final int maxUntimedSpins = maxTimedSpins * 16;
/**
* The number of nanoseconds for which it is faster to spin
* rather than to use timed park. A rough estimate suffices.
*/
private static final long spinForTimeoutThreshold = 1000L;
/**
* Node class for LinkedTransferQueue. Opportunistically
* subclasses from AtomicReference to represent item. Uses Object,
* not E, to allow setting item to "this" after use, to avoid
* garbage retention. Similarly, setting the next field to this is
* used as sentinel that node is off list.
*/
private static final class QNode extends AtomicReference<Object> {
private static final long serialVersionUID = 5925596372370723938L;
transient volatile QNode next;
transient volatile Thread waiter; // to control park/unpark
final boolean isData;
QNode(Object item, boolean isData) {
super(item);
this.isData = isData;
}
private static final AtomicReferenceFieldUpdater<QNode, QNode> nextUpdater;
static {
AtomicReferenceFieldUpdater<QNode, QNode> tmp = null;
try {
tmp = AtomicReferenceFieldUpdater.newUpdater(
QNode.class, QNode.class, "next");
// Test if AtomicReferenceFieldUpdater is really working.
QNode testNode = new QNode(null, false);
tmp.set(testNode, testNode);
if (testNode.next != testNode) {
// Not set as expected - fall back to the safe mode.
throw new Exception();
}
} catch (Throwable t) {
// Running in a restricted environment with a security manager.
tmp = null;
}
nextUpdater = tmp;
}
boolean casNext(QNode cmp, QNode val) {
if (nextUpdater != null) {
return nextUpdater.compareAndSet(this, cmp, val);
} else {
return alternativeCasNext(cmp, val);
}
}
private synchronized boolean alternativeCasNext(QNode cmp, QNode val) {
if (this.next == cmp) {
this.next = val;
return true;
} else {
return false;
}
}
}
/**
* Padded version of AtomicReference used for head, tail and
* cleanMe, to alleviate contention across threads CASing one vs
* the other.
*/
private static final class PaddedAtomicReference<T> extends AtomicReference<T> {
private static final long serialVersionUID = 4684288940772921317L;
// enough padding for 64bytes with 4byte refs
@SuppressWarnings("unused")
Object p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe;
PaddedAtomicReference(T r) { super(r); }
}
/** head of the queue */
private final PaddedAtomicReference<QNode> head;
/** tail of the queue */
private final PaddedAtomicReference<QNode> tail;
/**
* Reference to a cancelled node that might not yet have been
* unlinked from queue because it was the last inserted node
* when it cancelled.
*/
private final PaddedAtomicReference<QNode> cleanMe;
/**
* Tries to cas nh as new head; if successful, unlink
* old head's next node to avoid garbage retention.
*/
private boolean advanceHead(QNode h, QNode nh) {
if (h == this.head.get() && this.head.compareAndSet(h, nh)) {
h.next = h; // forget old next
return true;
}
return false;
}
/**
* Puts or takes an item. Used for most queue operations (except
* poll() and tryTransfer()). See the similar code in
* SynchronousQueue for detailed explanation.
* @param e the item or if null, signifies that this is a take
* @param mode the wait mode: NOWAIT, TIMEOUT, WAIT
* @param nanos timeout in nanosecs, used only if mode is TIMEOUT
* @return an item, or null on failure
*/
private Object xfer(Object e, int mode, long nanos) {
boolean isData = e != null;
QNode s = null;
final PaddedAtomicReference<QNode> head = this.head;
final PaddedAtomicReference<QNode> tail = this.tail;
for (;;) {
QNode t = tail.get();
QNode h = head.get();
if (t != null && (t == h || t.isData == isData)) {
if (s == null) {
s = new QNode(e, isData);
}
QNode last = t.next;
if (last != null) {
if (t == tail.get()) {
tail.compareAndSet(t, last);
}
}
else if (t.casNext(null, s)) {
tail.compareAndSet(t, s);
return awaitFulfill(t, s, e, mode, nanos);
}
}
else if (h != null) {
QNode first = h.next;
if (t == tail.get() && first != null &&
advanceHead(h, first)) {
Object x = first.get();
if (x != first && first.compareAndSet(x, e)) {
LockSupport.unpark(first.waiter);
return isData? e : x;
}
}
}
}
}
/**
* Version of xfer for poll() and tryTransfer, which
* simplifies control paths both here and in xfer
*/
private Object fulfill(Object e) {
boolean isData = e != null;
final PaddedAtomicReference<QNode> head = this.head;
final PaddedAtomicReference<QNode> tail = this.tail;
for (;;) {
QNode t = tail.get();
QNode h = head.get();
if (t != null && (t == h || t.isData == isData)) {
QNode last = t.next;
if (t == tail.get()) {
if (last != null) {
tail.compareAndSet(t, last);
} else {
return null;
}
}
}
else if (h != null) {
QNode first = h.next;
if (t == tail.get() &&
first != null &&
advanceHead(h, first)) {
Object x = first.get();
if (x != first && first.compareAndSet(x, e)) {
LockSupport.unpark(first.waiter);
return isData? e : x;
}
}
}
}
}
/**
* Spins/blocks until node s is fulfilled or caller gives up,
* depending on wait mode.
*
* @param pred the predecessor of waiting node
* @param s the waiting node
* @param e the comparison value for checking match
* @param mode mode
* @param nanos timeout value
* @return matched item, or s if cancelled
*/
private Object awaitFulfill(QNode pred, QNode s, Object e,
int mode, long nanos) {
if (mode == NOWAIT) {
return null;
}
long lastTime = mode == TIMEOUT? System.nanoTime() : 0;
Thread w = Thread.currentThread();
int spins = -1; // set to desired spin count below
for (;;) {
if (w.isInterrupted()) {
s.compareAndSet(e, s);
}
Object x = s.get();
if (x != e) { // Node was matched or cancelled
advanceHead(pred, s); // unlink if head
if (x == s) { // was cancelled
clean(pred, s);
return null;
}
else if (x != null) {
s.set(s); // avoid garbage retention
return x;
} else {
return e;
}
}
if (mode == TIMEOUT) {
long now = System.nanoTime();
nanos -= now - lastTime;
lastTime = now;
if (nanos <= 0) {
s.compareAndSet(e, s); // try to cancel
continue;
}
}
if (spins < 0) {
QNode h = this.head.get(); // only spin if at head
spins = h != null && h.next == s ?
mode == TIMEOUT?
maxTimedSpins : maxUntimedSpins : 0;
}
if (spins > 0) {
--spins;
} else if (s.waiter == null) {
s.waiter = w;
} else if (mode != TIMEOUT) {
// LockSupport.park(this);
LockSupport.park(); // allows run on java5
s.waiter = null;
spins = -1;
}
else if (nanos > spinForTimeoutThreshold) {
// LockSupport.parkNanos(this, nanos);
LockSupport.parkNanos(nanos);
s.waiter = null;
spins = -1;
}
}
}
/**
* Returns validated tail for use in cleaning methods
*/
private QNode getValidatedTail() {
for (;;) {
QNode h = this.head.get();
QNode first = h.next;
if (first != null && first.next == first) { // help advance
advanceHead(h, first);
continue;
}
QNode t = this.tail.get();
QNode last = t.next;
if (t == this.tail.get()) {
if (last != null) {
this.tail.compareAndSet(t, last); // help advance
} else {
return t;
}
}
}
}
/**
* Gets rid of cancelled node s with original predecessor pred.
* @param pred predecessor of cancelled node
* @param s the cancelled node
*/
void clean(QNode pred, QNode s) {
Thread w = s.waiter;
if (w != null) { // Wake up thread
s.waiter = null;
if (w != Thread.currentThread()) {
LockSupport.unpark(w);
}
}
/*
* At any given time, exactly one node on list cannot be
* deleted -- the last inserted node. To accommodate this, if
* we cannot delete s, we save its predecessor as "cleanMe",
* processing the previously saved version first. At least one
* of node s or the node previously saved can always be
* processed, so this always terminates.
*/
while (pred.next == s) {
QNode oldpred = reclean(); // First, help get rid of cleanMe
QNode t = getValidatedTail();
if (s != t) { // If not tail, try to unsplice
QNode sn = s.next; // s.next == s means s already off list
if (sn == s || pred.casNext(s, sn)) {
break;
}
}
else if (oldpred == pred || // Already saved
oldpred == null && this.cleanMe.compareAndSet(null, pred)) {
break; // Postpone cleaning
}
}
}
/**
* Tries to unsplice the cancelled node held in cleanMe that was
* previously uncleanable because it was at tail.
* @return current cleanMe node (or null)
*/
private QNode reclean() {
/*
* cleanMe is, or at one time was, predecessor of cancelled
* node s that was the tail so could not be unspliced. If s
* is no longer the tail, try to unsplice if necessary and
* make cleanMe slot available. This differs from similar
* code in clean() because we must check that pred still
* points to a cancelled node that must be unspliced -- if
* not, we can (must) clear cleanMe without unsplicing.
* This can loop only due to contention on casNext or
* clearing cleanMe.
*/
QNode pred;
while ((pred = this.cleanMe.get()) != null) {
QNode t = getValidatedTail();
QNode s = pred.next;
if (s != t) {
QNode sn;
if (s == null || s == pred || s.get() != s ||
(sn = s.next) == s || pred.casNext(s, sn)) {
this.cleanMe.compareAndSet(pred, null);
}
} else {
break;
}
}
return pred;
}
@SuppressWarnings("unchecked")
E cast(Object e) {
return (E)e;
}
/**
* Creates an initially empty <tt>LinkedTransferQueue</tt>.
*/
public LinkedTransferQueue() {
QNode dummy = new QNode(null, false);
this.head = new PaddedAtomicReference<QNode>(dummy);
this.tail = new PaddedAtomicReference<QNode>(dummy);
this.cleanMe = new PaddedAtomicReference<QNode>(null);
}
/**
* Creates a <tt>LinkedTransferQueue</tt>
* initially containing the elements of the given collection,
* added in traversal order of the collection's iterator.
* @param c the collection of elements to initially contain
* @throws NullPointerException if the specified collection or any
* of its elements are null
*/
public LinkedTransferQueue(Collection<? extends E> c) {
this();
addAll(c);
}
@Override
public void put(E e) throws InterruptedException {
if (e == null) {
throw new NullPointerException();
}
if (Thread.interrupted()) {
throw new InterruptedException();
}
xfer(e, NOWAIT, 0);
}
@Override
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) {
throw new NullPointerException();
}
if (Thread.interrupted()) {
throw new InterruptedException();
}
xfer(e, NOWAIT, 0);
return true;
}
@Override
public boolean offer(E e) {
if (e == null) {
throw new NullPointerException();
}
xfer(e, NOWAIT, 0);
return true;
}
@Override
public void transfer(E e) throws InterruptedException {
if (e == null) {
throw new NullPointerException();
}
if (xfer(e, WAIT, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
@Override
public boolean tryTransfer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) {
throw new NullPointerException();
}
if (xfer(e, TIMEOUT, unit.toNanos(timeout)) != null) {
return true;
}
if (!Thread.interrupted()) {
return false;
}
throw new InterruptedException();
}
@Override
public boolean tryTransfer(E e) {
if (e == null) {
throw new NullPointerException();
}
return fulfill(e) != null;
}
@Override
public E take() throws InterruptedException {
Object e = xfer(null, WAIT, 0);
if (e != null) {
return cast(e);
}
Thread.interrupted();
throw new InterruptedException();
}
@Override
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
Object e = xfer(null, TIMEOUT, unit.toNanos(timeout));
if (e != null || !Thread.interrupted()) {
return cast(e);
}
throw new InterruptedException();
}
@Override
public E poll() {
return cast(fulfill(null));
}
@Override
public int drainTo(Collection<? super E> c) {
if (c == null) {
throw new NullPointerException();
}
if (c == this) {
throw new IllegalArgumentException();
}
int n = 0;
E e;
while ( (e = poll()) != null) {
c.add(e);
++n;
}
return n;
}
@Override
public int drainTo(Collection<? super E> c, int maxElements) {
if (c == null) {
throw new NullPointerException();
}
if (c == this) {
throw new IllegalArgumentException();
}
int n = 0;
E e;
while (n < maxElements && (e = poll()) != null) {
c.add(e);
++n;
}
return n;
}
// Traversal-based methods
/**
* Return head after performing any outstanding helping steps
*/
QNode traversalHead() {
for (;;) {
QNode t = this.tail.get();
QNode h = this.head.get();
if (h != null && t != null) {
QNode last = t.next;
QNode first = h.next;
if (t == this.tail.get()) {
if (last != null) {
this.tail.compareAndSet(t, last);
} else if (first != null) {
Object x = first.get();
if (x == first) {
advanceHead(h, first);
} else {
return h;
}
} else {
return h;
}
}
}
}
}
@Override
public Iterator<E> iterator() {
return new Itr();
}
/**
* Iterators. Basic strategy is to traverse list, treating
* non-data (i.e., request) nodes as terminating list.
* Once a valid data node is found, the item is cached
* so that the next call to next() will return it even
* if subsequently removed.
*/
class Itr implements Iterator<E> {
QNode nextNode; // Next node to return next
QNode currentNode; // last returned node, for remove()
QNode prevNode; // predecessor of last returned node
E nextItem; // Cache of next item, once committed to in next
Itr() {
this.nextNode = traversalHead();
advance();
}
E advance() {
this.prevNode = this.currentNode;
this.currentNode = this.nextNode;
E x = this.nextItem;
QNode p = this.nextNode.next;
for (;;) {
if (p == null || !p.isData) {
this.nextNode = null;
this.nextItem = null;
return x;
}
Object item = p.get();
if (item != p && item != null) {
this.nextNode = p;
this.nextItem = cast(item);
return x;
}
this.prevNode = p;
p = p.next;
}
}
@Override
public boolean hasNext() {
return this.nextNode != null;
}
@Override
public E next() {
if (this.nextNode == null) {
throw new NoSuchElementException();
}
return advance();
}
@Override
public void remove() {
QNode p = this.currentNode;
QNode prev = this.prevNode;
if (prev == null || p == null) {
throw new IllegalStateException();
}
Object x = p.get();
if (x != null && x != p && p.compareAndSet(x, p)) {
clean(prev, p);
}
}
}
@Override
public E peek() {
for (;;) {
QNode h = traversalHead();
QNode p = h.next;
if (p == null) {
return null;
}
Object x = p.get();
if (p != x) {
if (!p.isData) {
return null;
}
if (x != null) {
return cast(x);
}
}
}
}
@Override
public boolean isEmpty() {
for (;;) {
QNode h = traversalHead();
QNode p = h.next;
if (p == null) {
return true;
}
Object x = p.get();
if (p != x) {
if (!p.isData) {
return true;
}
if (x != null) {
return false;
}
}
}
}
@Override
public boolean hasWaitingConsumer() {
for (;;) {
QNode h = traversalHead();
QNode p = h.next;
if (p == null) {
return false;
}
Object x = p.get();
if (p != x) {
return !p.isData;
}
}
}
/**
* Returns the number of elements in this queue. If this queue
* contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
* <tt>Integer.MAX_VALUE</tt>.
*
* <p>Beware that, unlike in most collections, this method is
* <em>NOT</em> a constant-time operation. Because of the
* asynchronous nature of these queues, determining the current
* number of elements requires an O(n) traversal.
*
* @return the number of elements in this queue
*/
@Override
public int size() {
int count = 0;
QNode h = traversalHead();
for (QNode p = h.next; p != null && p.isData; p = p.next) {
Object x = p.get();
if (x != null && x != p) {
if (++count == Integer.MAX_VALUE) {
break;
}
}
}
return count;
}
@Override
public int getWaitingConsumerCount() {
int count = 0;
QNode h = traversalHead();
for (QNode p = h.next; p != null && !p.isData; p = p.next) {
if (p.get() == null) {
if (++count == Integer.MAX_VALUE) {
break;
}
}
}
return count;
}
@Override
public int remainingCapacity() {
return Integer.MAX_VALUE;
}
}
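
The awaitFulfill method above is the heart of this (now removed) backport: a waiter spins a bounded number of times while its node is at the head of the queue, registers itself in the node's waiter field, and only then falls back to LockSupport.park(), with maxTimedSpins/maxUntimedSpins tuning the trade-off. Below is a stripped-down, single-use sketch of that spin-then-park handshake; the class and method names (SpinThenPark, fulfill, await) are invented for illustration and this is not the class's actual code path:

import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

final class SpinThenPark<V> {
    private static final int MAX_SPINS =
            Runtime.getRuntime().availableProcessors() < 2 ? 0 : 32 * 16;

    private final AtomicReference<V> slot = new AtomicReference<V>();
    private volatile Thread waiter;

    /** Producer side: publish a value, then wake the parked consumer if one has registered. */
    void fulfill(V value) {
        slot.set(value);                          // volatile write publishes the value
        Thread w = waiter;
        if (w != null) {
            LockSupport.unpark(w);
        }
    }

    /** Consumer side: spin briefly, register as waiter, then park until fulfill() supplies a value. */
    V await() {
        int spins = MAX_SPINS;
        V v;
        while ((v = slot.get()) == null) {
            if (spins > 0) {
                --spins;                          // cheap busy-wait while a producer is likely imminent
            } else if (waiter == null) {
                waiter = Thread.currentThread();  // register before parking so the unpark cannot be missed
            } else {
                LockSupport.park();               // spurious wake-ups are fine: the loop re-checks the slot
            }
        }
        waiter = null;
        return v;
    }
}

Because both slot and waiter are volatile, the producer's value write and the consumer's waiter registration cannot both be missed, so the consumer never parks past a published value.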

View File

@@ -1,119 +0,0 @@
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/licenses/publicdomain
*/
package dorkbox.util.messagebus.common;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
/**
* A {@link BlockingQueue} in which producers may wait for consumers
* to receive elements. A {@code TransferQueue} may be useful for
* example in message passing applications in which producers
* sometimes (using method {@code transfer}) await receipt of
* elements by consumers invoking {@code take} or {@code poll},
* while at other times enqueue elements (via method {@code put})
* without waiting for receipt. Non-blocking and time-out versions of
* {@code tryTransfer} are also available. A TransferQueue may also
* be queried via {@code hasWaitingConsumer} whether there are any
* threads waiting for items, which is a converse analogy to a
* {@code peek} operation.
*
* <p>Like any {@code BlockingQueue}, a {@code TransferQueue} may be
* capacity bounded. If so, an attempted {@code transfer} operation
* may initially block waiting for available space, and/or
* subsequently block waiting for reception by a consumer. Note that
* in a queue with zero capacity, such as {@link SynchronousQueue},
* {@code put} and {@code transfer} are effectively synonymous.
*
* <p>This interface is a member of the
* <a href="{@docRoot}/../technotes/guides/collections/index.html">
* Java Collections Framework</a>.
*
* @since 1.7
* @author Doug Lea
* @param <E> the type of elements held in this collection
*/
public interface TransferQueue<E> extends BlockingQueue<E> {
/**
* Transfers the specified element if there exists a consumer
* already waiting to receive it, otherwise returning {@code false}
* without enqueuing the element.
*
* @param e the element to transfer
* @return {@code true} if the element was transferred, else
* {@code false}
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null
* @throws IllegalArgumentException if some property of the specified
* element prevents it from being added to this queue
*/
boolean tryTransfer(E e);
/**
* Inserts the specified element into this queue, waiting if
* necessary for space to become available and the element to be
* dequeued by a consumer invoking {@code take} or {@code poll}.
*
* @param e the element to transfer
* @throws InterruptedException if interrupted while waiting,
* in which case the element is not enqueued.
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null
* @throws IllegalArgumentException if some property of the specified
* element prevents it from being added to this queue
*/
void transfer(E e) throws InterruptedException;
/**
* Inserts the specified element into this queue, waiting up to
* the specified wait time if necessary for space to become
* available and the element to be dequeued by a consumer invoking
* {@code take} or {@code poll}.
*
* @param e the element to transfer
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @return {@code true} if successful, or {@code false} if
* the specified waiting time elapses before completion,
* in which case the element is not enqueued.
* @throws InterruptedException if interrupted while waiting,
* in which case the element is not enqueued.
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null
* @throws IllegalArgumentException if some property of the specified
* element prevents it from being added to this queue
*/
boolean tryTransfer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
/**
* Returns {@code true} if there is at least one consumer waiting
* to dequeue an element via {@code take} or {@code poll}.
* The return value represents a momentary state of affairs.
*
* @return {@code true} if there is at least one waiting consumer
*/
boolean hasWaitingConsumer();
/**
* Returns an estimate of the number of consumers waiting to
* dequeue elements via {@code take} or {@code poll}. The return
* value is an approximation of a momentary state of affairs, that
* may be inaccurate if consumers have completed or given up
* waiting. The value may be useful for monitoring and heuristics,
* but not for synchronization control. Implementations of this
* method are likely to be noticeably slower than those for
* {@link #hasWaitingConsumer}.
*
* @return the number of consumers waiting to dequeue elements
*/
int getWaitingConsumerCount();
}
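
This interface is a pre-Java-7 backport of java.util.concurrent.TransferQueue, so with this commit callers can rely on the JDK type (or the project's replacement queue) under the same contract. The javadoc's "converse of peek" remark is the useful part in practice: hasWaitingConsumer() reports whether a receiver is parked rather than whether an element is available, and untimed tryTransfer() succeeds only in that case. A brief sketch against the JDK classes; the class name WaitingConsumerDemo is invented for illustration:

import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

public class WaitingConsumerDemo {
    public static void main(String[] args) throws InterruptedException {
        TransferQueue<Integer> queue = new LinkedTransferQueue<Integer>();

        // Untimed tryTransfer only succeeds when a consumer is already waiting.
        System.out.println("tryTransfer with no consumer: " + queue.tryTransfer(1));  // false, nothing enqueued

        Thread consumer = new Thread(() -> {
            try {
                System.out.println("consumer received: " + queue.take());
            } catch (InterruptedException ignored) {
            }
        });
        consumer.start();

        while (!queue.hasWaitingConsumer()) {     // wait until the consumer is parked in take()
            Thread.yield();
        }
        System.out.println("waiting consumers: " + queue.getWaitingConsumerCount());  // roughly 1
        System.out.println("tryTransfer with a consumer: " + queue.tryTransfer(2));   // true
        consumer.join();
    }
}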

View File

@@ -1,5 +0,0 @@
package dorkbox.util.messagebus.common.simpleq;
public interface HandlerFactory<E> {
public E newInstance();
}

View File

@@ -1,64 +0,0 @@
package dorkbox.util.messagebus.common.simpleq;
// mpmc sparse.shift = 2, for this to be fast.
abstract class PrePad {
// volatile long y0, y1, y2, y4, y5, y6 = 7L;
volatile long z0, z1, z2, z4, z5, z6 = 7L;
}
abstract class ColdItems extends PrePad {
// private static AtomicInteger count = new AtomicInteger();
// public final int ID = count.getAndIncrement();
public int type = 0;
// public short messageType = MessageType.ONE;
public Object item1 = null;
// public Object item2 = null;
// public Object item3 = null;
// public Object[] item4 = null;
}
abstract class Pad0 extends ColdItems {
// volatile long y0, y1, y2, y4, y5, y6 = 7L;
// volatile long z0, z1, z2, z4, z5, z6 = 7L;
}
abstract class HotItem1 extends Pad0 {
}
abstract class Pad1 extends HotItem1 {
// volatile long y0, y1, y2, y4, y5, y6 = 7L;
volatile long z0, z1, z2, z4, z5, z6 = 7L;
}
abstract class HotItem2 extends Pad1 {
public Thread thread;
}
abstract class Pad2 extends HotItem2 {
// volatile long y0, y1, y2, y4, y5, y6 = 7L;
// volatile long z0, z1, z2, z4, z5, z6 = 7L;
}
abstract class HotItem3 extends Pad2 {
// public transient volatile Node next;
}
public class Node extends HotItem3 {
// post-padding
// volatile long y0, y1, y2, y4, y5, y6 = 7L;
volatile long z0, z1, z2, z4, z5, z6 = 7L;
public Node() {
}
// @Override
// public String toString() {
// return "[" + this.ID + "]";
// }
}
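
The PrePad/Pad0/Pad1/Pad2 ladder above exploits the fact that HotSpot lays out each class's own fields together, so interposing subclasses full of unused longs keeps the hot fields (item1, thread) on separate 64-byte cache lines. One way to verify that the padding survives into the real object layout is the OpenJDK JOL tool; the jol-core dependency and this check are assumptions for illustration, not something this repository uses:

// A sketch assuming org.openjdk.jol:jol-core is on the classpath.
import org.openjdk.jol.info.ClassLayout;

import dorkbox.util.messagebus.common.simpleq.Node;

public class NodeLayoutCheck {
    public static void main(String[] args) {
        // Prints every field with its offset, making it easy to confirm that the
        // padding longs really separate item1 from thread by at least 64 bytes.
        System.out.println(ClassLayout.parseClass(Node.class).toPrintable());
    }
}

On Java 8, the @sun.misc.Contended annotation (enabled with -XX:-RestrictContended) achieves the same isolation without the subclass ladder, at the cost of a JVM-specific flag.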

View File

@@ -1,39 +0,0 @@
/*
* Copyright 2014 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.util.messagebus.common.simpleq;
import java.util.concurrent.atomic.AtomicBoolean;
public class ObjectPoolHolder<T> {
// enough padding for 64bytes with 4byte refs, to alleviate contention across threads CASing one vs the other.
@SuppressWarnings("unused")
Object p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe;
private T value;
AtomicBoolean state = new AtomicBoolean(true);
transient volatile Thread waiter; // to control park/unpark
public ObjectPoolHolder(T value) {
this.value = value;
}
public T getValue() {
return this.value;
}
}

View File

@@ -1,27 +0,0 @@
package dorkbox.util.messagebus.common.simpleq;
import java.util.concurrent.atomic.AtomicReference;
/**
* An AtomicReference with heuristic padding to lessen cache effects of this heavily CAS'ed location. While the padding adds
* noticeable space, all slots are created only on demand, and there will be more than one of them only when it would improve throughput
* more than enough to outweigh using extra space.
*/
public class PaddedAtomicReference<T> extends AtomicReference<T> {
private static final long serialVersionUID = 1L;
// Improve likelihood of isolation on <= 64 byte cache lines
public long z0, z1, z2, z3, z4, z5, z6, z7, z8, z9, za, zb, zc, zd, ze;
public PaddedAtomicReference() {
}
public PaddedAtomicReference(T value) {
super(value);
}
// prevent JIT from optimizing away the padding
public final long sum() {
return this.z0 + this.z1 + this.z2 + this.z3 + this.z4 + this.z5 + this.z6 + this.z7 + this.z8 + this.z9 + this.za + this.zb + this.zc + this.zd + this.ze;
}
}

View File

@@ -1,45 +0,0 @@
package dorkbox.util.messagebus.common.simpleq;
import static dorkbox.util.messagebus.common.simpleq.jctools.UnsafeAccess.UNSAFE;
// Improve likelihood of isolation on <= 64 byte cache lines
// java classes DO NOT mix their fields, so if we want specific padding, in a specific order, we must
// subclass them. All Intel CPU L1/L2/L3 cache line sizes are 64 bytes
// see: http://psy-lob-saw.blogspot.de/2013/05/know-thy-java-object-memory-layout.html
// see: http://mechanitis.blogspot.de/2011/07/dissecting-disruptor-why-its-so-fast_22.html
abstract class AtomicPaddedObject<O> {
volatile long z0, z1, z2, z4, z5, z6 = 7L;
volatile O object = null;
}
public class PaddedObject<O> extends AtomicPaddedObject<O> {
private final static long OFFSET;
static {
try {
OFFSET = UNSAFE.objectFieldOffset(AtomicPaddedObject.class.getDeclaredField("object"));
} catch (NoSuchFieldException e) {
throw new RuntimeException(e);
}
}
// volatile long c0, c1, c2, c4, c5, c6, c7 = 7L;
volatile long z0, z1, z2, z4, z5, z6 = 7L;
public PaddedObject() {
}
public O get() {
return this.object;
}
public final void set(Object newValue) {
UNSAFE.putOrderedObject(this, OFFSET, newValue);
}
protected final boolean compareAndSet(Object expect, Object newValue) {
return UNSAFE.compareAndSwapObject(this, OFFSET, expect, newValue);
}
}
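
PaddedObject reaches for sun.misc.Unsafe to get an ordered store (putOrderedObject, which skips the full volatile StoreLoad fence) plus a CAS on the padded slot. The same semantics are available without Unsafe through AtomicReferenceFieldUpdater, whose lazySet corresponds to putOrderedObject. The equivalent below is a sketch with an invented name (PaddedBox), not code from the repository:

import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

public class PaddedBox<O> {
    // Padding fields as in the original; only the access mechanism differs.
    volatile long z0, z1, z2, z3, z4, z5, z6;

    private volatile O object;

    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<PaddedBox, Object> OBJECT =
            AtomicReferenceFieldUpdater.newUpdater(PaddedBox.class, Object.class, "object");

    public O get() {
        return this.object;
    }

    public void set(O newValue) {
        OBJECT.lazySet(this, newValue);           // ordered write, like UNSAFE.putOrderedObject
    }

    public boolean compareAndSet(O expect, O newValue) {
        return OBJECT.compareAndSet(this, expect, newValue);
    }
}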

View File

@@ -1,26 +0,0 @@
package dorkbox.util.messagebus.common.simpleq;
import java.util.concurrent.atomic.AtomicReference;
import com.lmax.disruptor.MessageHolder;
/**
* Nodes hold partially exchanged data. This class opportunistically subclasses AtomicReference to represent the hole. So get() returns
* hole, and compareAndSet CAS'es value into hole. This class cannot be parameterized as "V" because of the use of non-V CANCEL sentinels.
*/
final class ProducerNode extends AtomicReference<MessageHolder> {
private static final long serialVersionUID = 1L;
/** The element offered by the Thread creating this node. */
public volatile MessageHolder item = new MessageHolder();
/** The Thread waiting to be signalled; null until waiting. */
public volatile Thread waiter = null;
// for the stack, for waiting producers
public ProducerNode next = null;
public ProducerNode() {
}
}

View File

@@ -1,458 +0,0 @@
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/licenses/publicdomain
*/
package dorkbox.util.messagebus.common.simpleq.bakup;
import java.util.AbstractQueue;
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
/**
* An unbounded thread-safe {@linkplain Queue queue} based on linked nodes.
* This queue orders elements FIFO (first-in-first-out).
* The <em>head</em> of the queue is that element that has been on the
* queue the longest time.
* The <em>tail</em> of the queue is that element that has been on the
* queue the shortest time. New elements
* are inserted at the tail of the queue, and the queue retrieval
* operations obtain elements at the head of the queue.
* A <tt>ConcurrentLinkedQueue</tt> is an appropriate choice when
* many threads will share access to a common collection.
* This queue does not permit <tt>null</tt> elements.
*
* <p>This implementation employs an efficient &quot;wait-free&quot;
* algorithm based on one described in <a
* href="http://www.cs.rochester.edu/u/michael/PODC96.html"> Simple,
* Fast, and Practical Non-Blocking and Blocking Concurrent Queue
* Algorithms</a> by Maged M. Michael and Michael L. Scott.
*
* <p>Beware that, unlike in most collections, the <tt>size</tt> method
* is <em>NOT</em> a constant-time operation. Because of the
* asynchronous nature of these queues, determining the current number
* of elements requires a traversal of the elements.
*
* <p>This class and its iterator implement all of the
* <em>optional</em> methods of the {@link Collection} and {@link
* Iterator} interfaces.
*
* <p>Memory consistency effects: As with other concurrent
* collections, actions in a thread prior to placing an object into a
* {@code ConcurrentLinkedQueue}
* <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
* actions subsequent to the access or removal of that element from
* the {@code ConcurrentLinkedQueue} in another thread.
*
* <p>This class is a member of the
* <a href="{@docRoot}/../technotes/guides/collections/index.html">
* Java Collections Framework</a>.
*
* @since 1.5
* @author Doug Lea
* @param <E> the type of elements held in this collection
*
*/
public class ConcurrentLinkedQueueSimple<E> extends AbstractQueue<E> implements Queue<E> {
/*
* This is a straight adaptation of Michael & Scott algorithm.
* For explanation, read the paper. The only (minor) algorithmic
* difference is that this version supports lazy deletion of
* internal nodes (method remove(Object)) -- remove CAS'es item
* fields to null. The normal queue operations unlink but then
* pass over nodes with null item fields. Similarly, iteration
* methods ignore those with nulls.
*
* Also note that like most non-blocking algorithms in this
* package, this implementation relies on the fact that in garbage
* collected systems, there is no possibility of ABA problems due
* to recycled nodes, so there is no need to use "counted
* pointers" or related techniques seen in versions used in
* non-GC'ed settings.
*/
private static class Node<E> {
private volatile E item;
private volatile Node<E> next;
private static final AtomicReferenceFieldUpdater<Node, Node>
nextUpdater = AtomicReferenceFieldUpdater.newUpdater(Node.class, Node.class, "next");
private static final AtomicReferenceFieldUpdater<Node, Object>
itemUpdater = AtomicReferenceFieldUpdater.newUpdater(Node.class, Object.class, "item");
Node(E x) { this.item = x; }
Node(E x, Node<E> n) { this.item = x; this.next = n; }
E getItem() {
return this.item;
}
boolean casItem(E cmp, E val) {
return itemUpdater.compareAndSet(this, cmp, val);
}
void setItem(E val) {
itemUpdater.set(this, val);
}
Node<E> getNext() {
return this.next;
}
boolean casNext(Node<E> cmp, Node<E> val) {
return nextUpdater.compareAndSet(this, cmp, val);
}
void setNext(Node<E> val) {
nextUpdater.set(this, val);
}
}
private static final AtomicReferenceFieldUpdater<ConcurrentLinkedQueueSimple, Node>
tailUpdater = AtomicReferenceFieldUpdater.newUpdater(ConcurrentLinkedQueueSimple.class, Node.class, "tail");
private static final AtomicReferenceFieldUpdater<ConcurrentLinkedQueueSimple, Node>
headUpdater = AtomicReferenceFieldUpdater.newUpdater(ConcurrentLinkedQueueSimple.class, Node.class, "head");
private boolean casTail(Node<E> cmp, Node<E> val) {
return tailUpdater.compareAndSet(this, cmp, val);
}
private boolean casHead(Node<E> cmp, Node<E> val) {
return headUpdater.compareAndSet(this, cmp, val);
}
/**
* Pointer to header node, initialized to a dummy node. The first
* actual node is at head.getNext().
*/
private transient volatile Node<E> head = new Node<E>(null, null);
/** Pointer to last node on list **/
private transient volatile Node<E> tail = this.head;
/**
* Creates a <tt>ConcurrentLinkedQueue</tt> that is initially empty.
*/
public ConcurrentLinkedQueueSimple() {}
/**
* Creates a <tt>ConcurrentLinkedQueue</tt>
* initially containing the elements of the given collection,
* added in traversal order of the collection's iterator.
* @param c the collection of elements to initially contain
* @throws NullPointerException if the specified collection or any
* of its elements are null
*/
public ConcurrentLinkedQueueSimple(Collection<? extends E> c) {
for (Iterator<? extends E> it = c.iterator(); it.hasNext();) {
add(it.next());
}
}
// Have to override just to update the javadoc
/**
* Inserts the specified element at the tail of this queue.
*
* @return <tt>true</tt> (as specified by {@link Collection#add})
* @throws NullPointerException if the specified element is null
*/
@Override
public boolean add(E e) {
return offer(e);
}
/**
* Inserts the specified element at the tail of this queue.
*
* @return <tt>true</tt> (as specified by {@link Queue#offer})
* @throws NullPointerException if the specified element is null
*/
@Override
public boolean offer(E e) {
if (e == null) {
throw new NullPointerException();
}
Node<E> n = new Node<E>(e, null);
for (;;) {
Node<E> t = this.tail;
Node<E> s = t.getNext();
if (t == this.tail) {
if (s == null) {
if (t.casNext(s, n)) {
casTail(t, n);
return true;
}
} else {
casTail(t, s);
}
}
}
}
@Override
public E poll() {
for (;;) {
Node<E> h = this.head;
Node<E> t = this.tail;
Node<E> first = h.getNext();
if (h == this.head) {
if (h == t) {
if (first == null) {
return null;
} else {
casTail(t, first);
}
} else if (casHead(h, first)) {
E item = first.getItem();
if (item != null) {
first.setItem(null);
return item;
}
// else skip over deleted item, continue loop,
}
}
}
}
@Override
public E peek() { // same as poll except don't remove item
for (;;) {
Node<E> h = this.head;
Node<E> t = this.tail;
Node<E> first = h.getNext();
if (h == this.head) {
if (h == t) {
if (first == null) {
return null;
} else {
casTail(t, first);
}
} else {
E item = first.getItem();
if (item != null) {
return item;
} else {
casHead(h, first);
}
}
}
}
}
/**
* Returns the first actual (non-header) node on list. This is yet
* another variant of poll/peek; here returning out the first
* node, not element (so we cannot collapse with peek() without
* introducing race.)
*/
Node<E> first() {
for (;;) {
Node<E> h = this.head;
Node<E> t = this.tail;
Node<E> first = h.getNext();
if (h == this.head) {
if (h == t) {
if (first == null) {
return null;
} else {
casTail(t, first);
}
} else {
if (first.getItem() != null) {
return first;
} else {
casHead(h, first);
}
}
}
}
}
/**
* Returns <tt>true</tt> if this queue contains no elements.
*
* @return <tt>true</tt> if this queue contains no elements
*/
@Override
public boolean isEmpty() {
return first() == null;
}
/**
* Returns the number of elements in this queue. If this queue
* contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
* <tt>Integer.MAX_VALUE</tt>.
*
* <p>Beware that, unlike in most collections, this method is
* <em>NOT</em> a constant-time operation. Because of the
* asynchronous nature of these queues, determining the current
* number of elements requires an O(n) traversal.
*
* @return the number of elements in this queue
*/
@Override
public int size() {
int count = 0;
for (Node<E> p = first(); p != null; p = p.getNext()) {
if (p.getItem() != null) {
// Collections.size() spec says to max out
if (++count == Integer.MAX_VALUE) {
break;
}
}
}
return count;
}
/**
* Returns <tt>true</tt> if this queue contains the specified element.
* More formally, returns <tt>true</tt> if and only if this queue contains
* at least one element <tt>e</tt> such that <tt>o.equals(e)</tt>.
*
* @param o object to be checked for containment in this queue
* @return <tt>true</tt> if this queue contains the specified element
*/
@Override
public boolean contains(Object o) {
if (o == null) {
return false;
}
for (Node<E> p = first(); p != null; p = p.getNext()) {
E item = p.getItem();
if (item != null &&
o.equals(item)) {
return true;
}
}
return false;
}
/**
* Removes a single instance of the specified element from this queue,
* if it is present. More formally, removes an element <tt>e</tt> such
* that <tt>o.equals(e)</tt>, if this queue contains one or more such
* elements.
* Returns <tt>true</tt> if this queue contained the specified element
* (or equivalently, if this queue changed as a result of the call).
*
* @param o element to be removed from this queue, if present
* @return <tt>true</tt> if this queue changed as a result of the call
*/
@Override
public boolean remove(Object o) {
if (o == null) {
return false;
}
for (Node<E> p = first(); p != null; p = p.getNext()) {
E item = p.getItem();
if (item != null &&
o.equals(item) &&
p.casItem(item, null)) {
return true;
}
}
return false;
}
/**
* Returns an iterator over the elements in this queue in proper sequence.
* The returned iterator is a "weakly consistent" iterator that
* will never throw {@link ConcurrentModificationException},
* and guarantees to traverse elements as they existed upon
* construction of the iterator, and may (but is not guaranteed to)
* reflect any modifications subsequent to construction.
*
* @return an iterator over the elements in this queue in proper sequence
*/
@Override
public Iterator<E> iterator() {
return new Itr();
}
private class Itr implements Iterator<E> {
/**
* Next node to return item for.
*/
private Node<E> nextNode;
/**
* nextItem holds on to item fields because once we claim
* that an element exists in hasNext(), we must return it in
* the following next() call even if it was in the process of
* being removed when hasNext() was called.
*/
private E nextItem;
/**
* Node of the last returned item, to support remove.
*/
private Node<E> lastRet;
Itr() {
advance();
}
/**
* Moves to next valid node and returns item to return for
* next(), or null if no such.
*/
private E advance() {
this.lastRet = this.nextNode;
E x = this.nextItem;
Node<E> p = this.nextNode == null? first() : this.nextNode.getNext();
for (;;) {
if (p == null) {
this.nextNode = null;
this.nextItem = null;
return x;
}
E item = p.getItem();
if (item != null) {
this.nextNode = p;
this.nextItem = item;
return x;
} else {
p = p.getNext();
}
}
}
@Override
public boolean hasNext() {
return this.nextNode != null;
}
@Override
public E next() {
if (this.nextNode == null) {
throw new NoSuchElementException();
}
return advance();
}
@Override
public void remove() {
Node<E> l = this.lastRet;
if (l == null) {
throw new IllegalStateException();
}
// rely on a future traversal to relink.
l.setItem(null);
this.lastRet = null;
}
}
}
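
The size() caveat in the javadoc above applies to every linked queue touched by this commit: counting requires an O(n) traversal, so emptiness checks should go through isEmpty() rather than size() == 0. A tiny illustration using the JDK's ConcurrentLinkedQueue as a stand-in (SizeCostDemo is an invented name; the timings are machine-dependent):

import java.util.concurrent.ConcurrentLinkedQueue;

public class SizeCostDemo {
    public static void main(String[] args) {
        ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<Integer>();
        for (int i = 0; i < 1_000_000; i++) {
            queue.add(i);
        }

        long t0 = System.nanoTime();
        boolean empty = queue.isEmpty();          // inspects only the first live node
        long t1 = System.nanoTime();
        int size = queue.size();                  // walks all one million nodes
        long t2 = System.nanoTime();

        System.out.println("isEmpty=" + empty + " in " + (t1 - t0) / 1_000 + " us; "
                + "size=" + size + " in " + (t2 - t1) / 1_000 + " us");
    }
}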

View File

@@ -1,355 +0,0 @@
/*
* Copyright 2013 Ben Manes. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package dorkbox.util.messagebus.common.simpleq.bakup;
import java.util.concurrent.atomic.AtomicReference;
/**
* An unbounded thread-safe stack based on linked nodes. This stack orders elements LIFO (last-in-first-out). The <em>top</em> of the stack
* is that element that has been on the stack the shortest time. New elements are inserted at and retrieved from the top of the stack. A
* {@code EliminationStack} is an appropriate choice when many threads will exchange elements through shared access to a common collection.
* Like most other concurrent collection implementations, this class does not permit the use of {@code null} elements.
* <p>
* This implementation employs elimination to transfer elements between threads that are pushing and popping concurrently. This technique
* avoids contention on the stack by attempting to cancel operations if an immediate update to the stack is not successful. This approach is
* described in <a href="http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.156.8728">A Scalable Lock-free Stack Algorithm</a>.
* <p>
* Iterators are <i>weakly consistent</i>, returning elements reflecting the state of the stack at some point at or since the creation of
* the iterator. They do <em>not</em> throw {@link java.util.ConcurrentModificationException}, and may proceed concurrently with other
* operations. Elements contained in the stack since the creation of the iterator will be returned exactly once.
* <p>
* Beware that, unlike in most collections, the {@code size} method is <em>NOT</em> a constant-time operation. Because of the asynchronous
* nature of these stacks, determining the current number of elements requires a traversal of the elements, and so may report inaccurate
* results if this collection is modified during traversal.
*
* @author ben.manes@gmail.com (Ben Manes)
* @see <a href="https://github.com/ben-manes/caffeine">Caffeine</a>
* @param <E> the type of elements held in this collection
*/
public final class EliminationStack<E> {
/*
* A Treiber's stack is represented as a singly-linked list with an atomic top reference and uses compare-and-swap to modify the value
* atomically.
*
* The stack is augmented with an elimination array to minimize the top reference becoming a sequential bottleneck. Elimination allows
* pairs of operations with reverse semantics, like pushes and pops on a stack, to complete without any central coordination, and
* therefore substantially aids scalability [1, 2, 3]. If a thread fails to update the stack's top reference then it backs off to a
* collision arena where a location is chosen at random and it attempts to coordinate with another operation that concurrently chose the
* same location. If a transfer is not successful then the thread repeats the process until the element is added to the stack or a
* cancellation occurs.
*
* This implementation borrows optimizations from {@link java.util.concurrent.Exchanger} for choosing an arena location and awaiting a
* match [4].
*
* [1] A Scalable Lock-free Stack Algorithm http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.156.8728 [2] Concurrent Data
* Structures http://www.cs.tau.ac.il/~shanir/concurrent-data-structures.pdf [3] Using elimination to implement scalable and lock-free
* fifo queues http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.108.6422 [4] A Scalable Elimination-based Exchange Channel
* http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.59.7396
*/
/** The number of CPUs */
static final int NCPU = Runtime.getRuntime().availableProcessors();
/** The number of slots in the elimination array. */
static final int ARENA_LENGTH = ceilingNextPowerOfTwo((NCPU + 1) / 2);
/** The mask value for indexing into the arena. */
static int ARENA_MASK = ARENA_LENGTH - 1;
/** The number of times to step ahead, probe, and try to match. */
static final int LOOKAHEAD = Math.min(4, NCPU);
/**
* The number of times to spin (doing nothing except polling a memory location) before giving up while waiting to eliminate an
* operation. Should be zero on uniprocessors. On multiprocessors, this value should be large enough so that two threads exchanging
* items as fast as possible block only when one of them is stalled (due to GC or preemption), but not much longer, to avoid wasting CPU
* resources. Seen differently, this value is a little over half the number of cycles of an average context switch time on most systems.
* The value here is approximately the average of those across a range of tested systems.
*/
static final int SPINS = NCPU == 1 ? 0 : 2000;
/** The number of times to spin per lookahead step */
static final int SPINS_PER_STEP = SPINS / LOOKAHEAD;
/** A marker indicating that the arena slot is free. */
static final Object FREE = null;
/** A marker indicating that a thread is waiting in that slot to be transferred an element. */
static final Object CONSUMER_WAITER = new Object();
/** A marker indicating that a thread is waiting in that slot to be transferred an element. */
static final Object WAITER = new Object();
static int ceilingNextPowerOfTwo(int x) {
// From Hacker's Delight, Chapter 3, Harry S. Warren Jr.
return 1 << Integer.SIZE - Integer.numberOfLeadingZeros(x - 1);
}
/** The top of the stack. */
final AtomicReference<Node<E>> top;
/** The arena where slots can be used to perform an exchange. */
final AtomicReference<Object>[] arena;
/** Creates a {@code EliminationStack} that is initially empty. */
@SuppressWarnings("unchecked")
public EliminationStack() {
this.top = new AtomicReference<Node<E>>();
this.arena = new AtomicReference[ARENA_LENGTH];
for (int i = 0; i < ARENA_LENGTH; i++) {
this.arena[i] = new AtomicReference<Object>();
}
}
/**
* Returns <tt>true</tt> if this stack contains no elements.
*
* @return <tt>true</tt> if this stack contains no elements
*/
public boolean hasPendingMessages() {
for (;;) {
Node<E> node = this.top.get();
if (node == null) {
return false;
}
E e = node.get();
if (e == null) {
this.top.compareAndSet(node, node.next);
} else {
return true;
}
}
}
/**
* Removes and returns the top element or returns <tt>null</tt> if this stack is empty.
*
* @return the top of this stack, or <tt>null</tt> if this stack is empty
*/
public E pop() {
for (;;) {
Node<E> current = this.top.get();
if (current == null) {
return null;
}
// Attempt to pop from the stack, backing off to the elimination array if contended
if (this.top.get() == current && this.top.compareAndSet(current, current.next)) {
return current.get();
}
E e = tryReceive();
if (e != null) {
return e;
}
}
}
/**
* Pushes an element onto the stack (in other words, adds an element at the top of this stack).
*
* @param e the element to push
*/
public void push(E e) {
Node<E> node = new Node<E>(e);
for (;;) {
node.next = this.top.get();
// Attempt to push to the stack, backing off to the elimination array if contended
if (this.top.get() == node.next && this.top.compareAndSet(node.next, node)) {
return;
}
if (tryTransfer(e)) {
return;
}
}
}
public boolean remove(Object o) {
for (Node<E> node = this.top.get(); node != null; node = node.next) {
E value = node.get();
if (o.equals(value) && node.compareAndSet(value, null)) {
return true;
}
}
return false;
}
/**
* Attempts to transfer the element to a waiting consumer.
*
* @param e the element to try to exchange
* @return if the element was successfully transferred
*/
boolean tryTransfer(E e) {
int start = startIndex();
return scanAndTransferToWaiter(e, start) || awaitExchange(e, start);
}
/**
 * Scans the arena searching for a waiting consumer to exchange with.
 *
 * @param e the element to try to exchange
 * @param start the arena location to start at
 * @return true if the element was successfully transferred
 */
boolean scanAndTransferToWaiter(E e, int start) {
for (int i = 0; i < ARENA_LENGTH; i++) {
int index = start + i & ARENA_MASK;
AtomicReference<Object> slot = this.arena[index];
// if some thread is waiting to receive an element then attempt to provide it
if (slot.get() == WAITER && slot.compareAndSet(WAITER, e)) {
return true;
}
}
return false;
}
/**
 * Waits (by spinning) for the element to be transferred to another thread. The element is placed into an empty slot in the arena and
 * spun on until it is transferred or a per-slot spin limit is reached. This search-and-wait strategy is repeated with another slot
 * until a total spin limit is reached.
 *
 * @param e the element to transfer
 * @param start the arena location to start at
 * @return true if an exchange was completed successfully
 */
boolean awaitExchange(E e, int start) {
for (int step = 0, totalSpins = 0; step < ARENA_LENGTH && totalSpins < SPINS; step++) {
int index = start + step & ARENA_MASK;
AtomicReference<Object> slot = this.arena[index];
Object found = slot.get();
if (found == WAITER && slot.compareAndSet(WAITER, e)) {
return true;
} else if (found == FREE && slot.compareAndSet(FREE, e)) {
int slotSpins = 0;
for (;;) {
found = slot.get();
if (found != e) {
return true;
} else if (slotSpins >= SPINS_PER_STEP && slot.compareAndSet(e, FREE)) {
// failed to transfer the element; try a new slot
totalSpins += slotSpins;
break;
}
slotSpins++;
}
}
}
// failed to transfer the element; give up
return false;
}
/**
 * Attempts to receive an element from a waiting provider.
 *
 * @return an element if successfully transferred, or null if unsuccessful
 */
E tryReceive() {
int start = startIndex();
E e = scanAndMatch(start);
return e == null ? awaitMatch(start) : e;
}
/**
 * Scans the arena searching for a waiting producer to transfer from.
 *
 * @param start the arena location to start at
 * @return an element if successfully transferred, or null if unsuccessful
 */
E scanAndMatch(int start) {
for (int i = 0; i < ARENA_LENGTH; i++) {
int index = start + i & ARENA_MASK;
AtomicReference<Object> slot = this.arena[index];
// accept a transfer if an element is available
Object found = slot.get();
if (found != FREE && found != WAITER && slot.compareAndSet(found, FREE)) {
@SuppressWarnings("unchecked")
E e = (E) found;
return e;
}
}
return null;
}
/**
 * Waits (by spinning) for an element to be transferred from another thread. A marker is placed into an empty slot in the arena and
 * spun on until it is replaced with an element or a per-slot spin limit is reached. This search-and-wait strategy is repeated with
 * another slot until a total spin limit is reached.
 *
 * @param start the arena location to start at
 * @return an element if successfully transferred, or null if unsuccessful
 */
E awaitMatch(int start) {
for (int step = 0, totalSpins = 0; step < ARENA_LENGTH && totalSpins < SPINS; step++) {
int index = start + step & ARENA_MASK;
AtomicReference<Object> slot = this.arena[index];
Object found = slot.get();
if (found == FREE) {
if (slot.compareAndSet(FREE, WAITER)) {
int slotSpins = 0;
for (;;) {
found = slot.get();
if (found != WAITER && slot.compareAndSet(found, FREE)) {
@SuppressWarnings("unchecked")
E e = (E) found;
return e;
} else if (slotSpins >= SPINS_PER_STEP && found == WAITER && slot.compareAndSet(WAITER, FREE)) {
// failed to receive an element; try a new slot
totalSpins += slotSpins;
break;
}
slotSpins++;
}
}
} else if (found != WAITER && slot.compareAndSet(found, FREE)) {
@SuppressWarnings("unchecked")
E e = (E) found;
return e;
}
}
// failed to receive an element; give up
return null;
}
/**
* Returns the start index to begin searching the arena with. Uses a one-step FNV-1a hash code
* (http://www.isthe.com/chongo/tech/comp/fnv/) based on the current thread's Thread.getId(). These hash codes have more uniform
* distribution properties with respect to small moduli (here 1-31) than do other simple hashing functions. This technique is a
* simplified version borrowed from {@link java.util.concurrent.Exchanger}'s hashIndex function.
*/
static int startIndex() {
long id = Thread.currentThread().getId();
return ((int) (id ^ id >>> 32) ^ 0x811c9dc5) * 0x01000193;
}
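// Note (added for clarity): the result may be negative; callers always mask it, e.g.
// int index = (startIndex() + i) & ARENA_MASK, which stays within [0, ARENA_LENGTH)
// because ARENA_LENGTH is a power of two.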
/**
* An item on the stack. The node is mutable prior to being inserted to avoid object churn and is immutable by the time it has been
* published to other threads.
*/
static final class Node<E> extends AtomicReference<E> {
private static final long serialVersionUID = 1L;
Node<E> next;
Node(E value) {
super(value);
}
}
}
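/*
 * Usage sketch (added for illustration, not part of this commit): one producer and one consumer
 * exchanging a String through the EliminationStack above. The thread names and the String payload
 * are assumptions made only for this example.
 *
 *   final EliminationStack<String> stack = new EliminationStack<String>();
 *
 *   new Thread(new Runnable() {
 *       @Override public void run() {
 *           stack.push("payload");           // either lands on the stack or eliminates against a concurrent pop()
 *       }
 *   }, "producer").start();
 *
 *   new Thread(new Runnable() {
 *       @Override public void run() {
 *           String item;
 *           while ((item = stack.pop()) == null) {
 *               Thread.yield();              // pop() returns null while the stack is empty
 *           }
 *           System.out.println(item);
 *       }
 *   }, "consumer").start();
 */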

View File

@ -1,492 +0,0 @@
/*
* Copyright 2013 Ben Manes. All Rights Reserved.
*
 * Note that we cannot use a Treiber stack, since it suffers from ABA problems if we reuse nodes.
*
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package dorkbox.util.messagebus.common.simpleq.bakup;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import com.lmax.disruptor.MessageHolder;
import dorkbox.util.messagebus.common.simpleq.PaddedAtomicReference;
//http://www.cs.bgu.ac.il/~hendlerd/papers/EfficientCAS.pdf
//This implementation of a CAS-based FIFO hand-off is tuned for Xeon (Intel) architectures.
//It is the WRONG approach for running on SPARC.
//More info at: http://java.dzone.com/articles/wanna-get-faster-wait-bit
// copyright dorkbox, llc
/**
* An unbounded thread-safe stack based on linked nodes. This stack orders elements LIFO (last-in-first-out). The <em>top</em> of the stack
* is that element that has been on the stack the shortest time. New elements are inserted at and retrieved from the top of the stack. A
* {@code EliminationStack} is an appropriate choice when many threads will exchange elements through shared access to a common collection.
* Like most other concurrent collection implementations, this class does not permit the use of {@code null} elements.
* <p>
* This implementation employs elimination to transfer elements between threads that are pushing and popping concurrently. This technique
* avoids contention on the stack by attempting to cancel operations if an immediate update to the stack is not successful. This approach is
* described in <a href="http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.156.8728">A Scalable Lock-free Stack Algorithm</a>.
* <p>
* Iterators are <i>weakly consistent</i>, returning elements reflecting the state of the stack at some point at or since the creation of
* the iterator. They do <em>not</em> throw {@link java.util.ConcurrentModificationException}, and may proceed concurrently with other
* operations. Elements contained in the stack since the creation of the iterator will be returned exactly once.
* <p>
* Beware that, unlike in most collections, the {@code size} method is <em>NOT</em> a constant-time operation. Because of the asynchronous
* nature of these stacks, determining the current number of elements requires a traversal of the elements, and so may report inaccurate
* results if this collection is modified during traversal.
*
* @author ben.manes@gmail.com (Ben Manes)
* @see <a href="https://github.com/ben-manes/caffeine">Caffeine</a>
* @param <E> the type of elements held in this collection
*/
public final class ExchangerStackORG<E extends MessageHolder> {
static {
// Prevent rare disastrous classloading in first call to LockSupport.park.
// See: https://bugs.openjdk.java.net/browse/JDK-8074773
Class<?> ensureLoaded = LockSupport.class;
LockSupport.unpark(Thread.currentThread());
}
/*
* A Treiber's stack is represented as a singly-linked list with an atomic top reference and uses compare-and-swap to modify the value
* atomically.
*
* The stack is augmented with an elimination array to minimize the top reference becoming a sequential bottleneck. Elimination allows
* pairs of operations with reverse semantics, like pushes and pops on a stack, to complete without any central coordination, and
* therefore substantially aids scalability [1, 2, 3]. If a thread fails to update the stack's top reference then it backs off to a
* collision arena where a location is chosen at random and it attempts to coordinate with another operation that concurrently chose the
* same location. If a transfer is not successful then the thread repeats the process until the element is added to the stack or a
* cancellation occurs.
*
* This implementation borrows optimizations from {@link java.util.concurrent.Exchanger} for choosing an arena location and awaiting a
* match [4].
*
* [1] A Scalable Lock-free Stack Algorithm http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.156.8728 [2] Concurrent Data
* Structures http://www.cs.tau.ac.il/~shanir/concurrent-data-structures.pdf [3] Using elimination to implement scalable and lock-free
* fifo queues http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.108.6422 [4] A Scalable Elimination-based Exchange Channel
* http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.59.7396
*/
/** The number of CPUs */
private static final int NCPU = Runtime.getRuntime().availableProcessors();
/** The number of slots in the elimination array. */
private static final int ARENA_LENGTH = ceilingNextPowerOfTwo((NCPU + 1) / 2);
/** The mask value for indexing into the arena. */
private static final int ARENA_MASK = ARENA_LENGTH - 1;
/** The number of times to step ahead, probe, and try to match. */
private static final int LOOKAHEAD = Math.min(4, NCPU);
/**
* The number of times to spin (doing nothing except polling a memory location) before giving up while waiting to eliminate an
* operation. Should be zero on uniprocessors. On multiprocessors, this value should be large enough so that two threads exchanging
* items as fast as possible block only when one of them is stalled (due to GC or preemption), but not much longer, to avoid wasting CPU
* resources. Seen differently, this value is a little over half the number of cycles of an average context switch time on most systems.
* The value here is approximately the average of those across a range of tested systems.
*/
private static final int SPINS = NCPU == 1 ? 0 : 2000;
/** The number of times to spin per lookahead step */
private static final int SPINS_PER_STEP = SPINS / LOOKAHEAD;
/** A marker indicating that the arena slot is free. */
private static final Object FREE = null;
/** A marker indicating that a thread is waiting in that slot to be transferred an element. */
private static final Object WAITER = new Object();
private static final Object READY = null;
private static final Object LOCK_PARK_FIRST = new Object();
private static final Object LOCK_UNPARK_FIRST = new Object();
private static final Object UNLOCK = new Object();
private static int ceilingNextPowerOfTwo(int x) {
// From Hacker's Delight, Chapter 3, Harry S. Warren Jr.
return 1 << Integer.SIZE - Integer.numberOfLeadingZeros(x - 1);
}
/** The top of the stack. */
private final AtomicReference<NodeORG<E>> top;
/** The arena where slots can be used to perform an exchange. */
private final AtomicReference<Object>[] arena;
private final int numberConsumerThreads;
private final ValueCopier<E> copier;
/** Creates a {@code EliminationStack} that is initially empty. */
@SuppressWarnings("unchecked")
public ExchangerStackORG(int numberConsumerThreads, ValueCopier<E> copier) {
this.numberConsumerThreads = numberConsumerThreads;
this.copier = copier;
this.top = new PaddedAtomicReference<NodeORG<E>>();
this.arena = new PaddedAtomicReference[ARENA_LENGTH];
for (int i = 0; i < ARENA_LENGTH; i++) {
this.arena[i] = new PaddedAtomicReference<Object>();
}
}
public void put(NodeORG<E> producer) throws InterruptedException {
Thread producerThread = Thread.currentThread();
producer.next = null;
producer.waiter = producerThread;
NodeORG<E> topNode;
for (;;) {
topNode = this.top.get();
// it's a [EMPTY or PRODUCER], so we just add ourself as another producer waiting to get popped by a consumer
if (topNode == null || !topNode.isConsumer) {
producer.next = topNode;
// Attempt to push to the stack, backing off to the elimination array if contended
if (this.top.compareAndSet(topNode, producer)) {
// now we wait
if (!park(producer, producerThread)) {
// have to make sure to pass up exceptions
throw new InterruptedException();
}
return;
}
// Contention, so back off to the elimination array
// busySpin();
if (tryTransfer(producerThread, producer)) {
// means we got a consumer and our data has been transferred to it.
return;
}
}
else {
// if consumer, pop it, xfer our data to it, wake it, done
if (this.top.compareAndSet(topNode, topNode.next)) {
// xfer data
this.copier.copyValues(producer.item, topNode.item);
unpark(topNode);
return;
} else {
// contention, so back off
// busySpinPerStep();
}
}
}
}
public void take(NodeORG<E> consumer) throws InterruptedException {
// consumers ALWAYS use the same thread for the same node, so setting the waiter again is not necessary
Thread consumerThread = Thread.currentThread();
NodeORG<E> topNode;
for (;;) {
topNode = this.top.get();
// it's a [EMPTY or CONSUMER], so we just add ourself as another consumer waiting to get popped by a producer
if (topNode == null || topNode.isConsumer) {
consumer.next = topNode;
// Attempt to push to the stack, backing off to the elimination array if contended
if (this.top.compareAndSet(topNode, consumer)) {
// now we wait
if (!park(consumer, consumerThread)) {
// have to make sure to pass up exceptions
throw new InterruptedException();
}
return;
}
// Contention, so back off to the elimination array
// busySpin();
// node = tryReceive(consumerThread);
// if (node != null) {
// // we got a PRODUCER. Have to transfer producer data data to myself
// this.copier.copyValues(node.item, consumer.item);
// return;
// }
}
else {
// if producer, pop it, xfer it's data to us, wake it, done
if (this.top.compareAndSet(topNode, topNode.next)) {
// unlink the popped node from the rest of the stack (helps the GC)
topNode.next = null;
// xfer data
this.copier.copyValues(topNode.item, consumer.item);
unpark(topNode);
return;
} else {
// contention, so back off
// busySpinPerStep();
}
}
}
}
/**
* @return false if we were interrupted, true if we were unparked by another thread
*/
private boolean park(NodeORG<E> myNode, Thread myThread) {
for (;;) {
if (myNode.state.compareAndSet(READY, LOCK_PARK_FIRST)) {
do {
// park can return for NO REASON. Subsequent loops will hit this if it has not been ACTUALLY unlocked.
LockSupport.park();
if (myThread.isInterrupted()) {
myNode.state.set(READY);
return false;
}
} while (myNode.state.get() != UNLOCK);
myNode.state.set(READY);
return true;
} else if (myNode.state.compareAndSet(LOCK_UNPARK_FIRST, READY)) {
// no parking
return true;
}
}
}
/**
* Unparks the other node (if it was waiting)
*/
private void unpark(NodeORG<E> otherNode) {
// busy spin if we are NOT in the READY or LOCK state
for (;;) {
if (otherNode.state.compareAndSet(READY, LOCK_UNPARK_FIRST)) {
// no parking
return;
} else if (otherNode.state.compareAndSet(LOCK_PARK_FIRST, UNLOCK)) {
LockSupport.unpark(otherNode.waiter);
return;
}
}
}
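// State-machine summary (added for clarity): each node's state cycles either
//   READY -> LOCK_PARK_FIRST -> UNLOCK -> READY   (the waiter won the race and parks until unpark() flips the state), or
//   READY -> LOCK_UNPARK_FIRST -> READY           (the unparking side won the race, so the waiter never parks).
// Both park() and unpark() CAS the same state field, which is what resolves the race between them.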
private void busySpin() {
// busy spin for the amount of time (roughly) of a CPU context switch
int spins = SPINS;
for (;;) {
if (spins > 0) {
--spins;
} else {
break;
}
}
}
private void busySpinPerStep() {
// busy spin for the amount of time (roughly) of a CPU context switch
int spins = SPINS_PER_STEP;
for (;;) {
if (spins > 0) {
--spins;
} else {
break;
}
}
}
/**
 * @return false only when every configured consumer thread is waiting for data from producers (i.e. nothing is pending); true otherwise
 */
public boolean hasPendingMessages() {
// count the number of consumers waiting, it should be the same as the number of threads configured
NodeORG<E> node;
node = this.top.get();
if (node == null || !node.isConsumer) {
return false;
}
int size = 0;
for (node = this.top.get(); node != null; node = node.next) {
if (node.isConsumer) {
size++;
}
}
// return false only if ALL consumers are waiting for data (nothing is pending); true otherwise
return size != this.numberConsumerThreads;
}
/**
 * Attempts to transfer the element to a waiting consumer.
 *
 * @param thread the calling thread, used to pick a starting arena slot
 * @param e the element to try to exchange
 * @return true if the element was successfully transferred
 */
private boolean tryTransfer(Thread thread, NodeORG<E> e) {
int start = startIndex(thread);
return scanAndTransferToWaiter(e, start) || awaitExchange(e, start);
}
/**
 * Scans the arena searching for a waiting consumer to exchange with.
 *
 * @param e the element to try to exchange
 * @param start the arena location to start at
 * @return true if the element was successfully transferred
 */
private boolean scanAndTransferToWaiter(NodeORG<E> e, int start) {
for (int i = 0; i < ARENA_LENGTH; i++) {
int index = start + i & ARENA_MASK;
AtomicReference<Object> slot = this.arena[index];
// if some thread is waiting to receive an element then attempt to provide it
if (slot.get() == WAITER && slot.compareAndSet(WAITER, e)) {
return true;
}
}
return false;
}
/**
 * Waits (by spinning) for the element to be transferred to another thread. The element is placed into an empty slot in the arena and
 * spun on until it is transferred or a per-slot spin limit is reached. This search-and-wait strategy is repeated with another slot
 * until a total spin limit is reached.
 *
 * @param e the element to transfer
 * @param start the arena location to start at
 * @return true if an exchange was completed successfully
 */
private boolean awaitExchange(NodeORG<E> e, int start) {
for (int step = 0, totalSpins = 0; step < ARENA_LENGTH && totalSpins < SPINS; step++) {
int index = start + step & ARENA_MASK;
AtomicReference<Object> slot = this.arena[index];
Object found = slot.get();
if (found == WAITER && slot.compareAndSet(WAITER, e)) {
return true;
} else if (found == FREE && slot.compareAndSet(FREE, e)) {
int slotSpins = 0;
for (;;) {
found = slot.get();
if (found != e) {
return true;
} else if (slotSpins >= SPINS_PER_STEP && slot.compareAndSet(e, FREE)) {
// failed to transfer the element; try a new slot
totalSpins += slotSpins;
break;
}
slotSpins++;
}
}
}
// failed to transfer the element; give up
return false;
}
/**
 * Attempts to receive an element from a waiting producer.
 *
 * @param thread the calling thread, used to pick a starting arena slot
 * @return an element if successfully transferred, or null if unsuccessful
 */
private NodeORG<E> tryReceive(Thread thread) {
int start = startIndex(thread);
NodeORG<E> e = scanAndMatch(start);
if (e == null) {
return awaitMatch(start);
} else {
return e;
}
}
/**
 * Scans the arena searching for a waiting producer to transfer from.
 *
 * @param start the arena location to start at
 * @return an element if successfully transferred, or null if unsuccessful
 */
private NodeORG<E> scanAndMatch(int start) {
for (int i = 0; i < ARENA_LENGTH; i++) {
int index = start + i & ARENA_MASK;
AtomicReference<Object> slot = this.arena[index];
// accept a transfer if an element is available
Object found = slot.get();
if (found != FREE && found != WAITER && slot.compareAndSet(found, FREE)) {
@SuppressWarnings("unchecked")
NodeORG<E> cast = (NodeORG<E>) found;
return cast;
}
}
return null;
}
/**
 * Waits (by spinning) for an element to be transferred from another thread. A marker is placed into an empty slot in the arena and
 * spun on until it is replaced with an element or a per-slot spin limit is reached. This search-and-wait strategy is repeated with
 * another slot until a total spin limit is reached.
 *
 * @param start the arena location to start at
 * @return an element if successfully transferred, or null if unsuccessful
 */
private NodeORG<E> awaitMatch(int start) {
for (int step = 0, totalSpins = 0; step < ARENA_LENGTH && totalSpins < SPINS; step++) {
int index = start + step & ARENA_MASK;
AtomicReference<Object> slot = this.arena[index];
Object found = slot.get();
if (found == FREE) {
if (slot.compareAndSet(FREE, WAITER)) {
int slotSpins = 0;
for (;;) {
found = slot.get();
if (found != WAITER && slot.compareAndSet(found, FREE)) {
@SuppressWarnings("unchecked")
NodeORG<E> cast = (NodeORG<E>) found;
return cast;
} else if (slotSpins >= SPINS_PER_STEP && found == WAITER && slot.compareAndSet(WAITER, FREE)) {
// failed to receive an element; try a new slot
totalSpins += slotSpins;
break;
}
slotSpins++;
}
}
} else if (found != WAITER && slot.compareAndSet(found, FREE)) {
@SuppressWarnings("unchecked")
NodeORG<E> cast = (NodeORG<E>) found;
return cast;
}
}
// failed to receive an element; give up
return null;
}
/**
* Returns the start index to begin searching the arena with. Uses a one-step FNV-1a hash code
* (http://www.isthe.com/chongo/tech/comp/fnv/) based on the current thread's Thread.getId(). These hash codes have more uniform
* distribution properties with respect to small moduli (here 1-31) than do other simple hashing functions. This technique is a
* simplified version borrowed from {@link java.util.concurrent.Exchanger}'s hashIndex function.
*/
private static int startIndex(Thread thread) {
long id = thread.getId();
return ((int) (id ^ id >>> 32) ^ 0x811c9dc5) * 0x01000193;
}
}
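/*
 * Usage sketch (added for illustration; MessageHolder, ValueCopier, the copier and the holder instances
 * below are assumed to be provided elsewhere in this repository):
 *
 *   ExchangerStackORG<MessageHolder> stack =
 *           new ExchangerStackORG<MessageHolder>(numberOfConsumerThreads, copier);
 *
 *   // consumer thread: reuses one node per thread and blocks until a producer hands it data
 *   NodeORG<MessageHolder> consumerNode = NodeORG.newConsumer(consumerHolder);
 *   stack.take(consumerNode);                         // consumerNode.item now holds the copied values
 *
 *   // producer thread: transfers its item to a waiting consumer, or parks until one arrives
 *   stack.put(NodeORG.newProducer(producerHolder));
 *
 * Both put() and take() throw InterruptedException if the waiting thread is interrupted.
 */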

View File

@ -1,42 +0,0 @@
package dorkbox.util.messagebus.common.simpleq.bakup;
import java.util.concurrent.atomic.AtomicReference;
public class NodeORG<E> {
// Improve likelihood of isolation on <= 64 byte cache lines
public long z0, z1, z2, z3, z4, z5, z6, z7, z8, z9, za, zb, zc, zd, ze;
public volatile E item;
/** The Thread waiting to be signaled to wake up*/
public volatile Thread waiter;
public AtomicReference<Object> state = new AtomicReference<Object>();
public volatile NodeORG<E> next;
public volatile short ID = 0;
public final boolean isConsumer;
private NodeORG(boolean isConsumer, E item) {
this.isConsumer = isConsumer;
this.item = item;
}
// prevent JIT from optimizing away the padding
public final long sum() {
return this.z0 + this.z1 + this.z2 + this.z3 + this.z4 + this.z5 + this.z6 + this.z7 + this.z8 + this.z9 + this.za + this.zb + this.zc + this.zd + this.ze;
}
public static <E> NodeORG<E> newProducer(E item) {
// producers VARY which node is used on which thread. (so the waiter is set in the put method)
return new NodeORG<E>(false, item);
}
public static <E> NodeORG<E> newConsumer(E item) {
// consumers will always use the SAME node in the SAME thread
NodeORG<E> node = new NodeORG<E>(true, item);
node.waiter = Thread.currentThread();
return node;
}
}
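// Note (added for clarity): the z0..ze longs pad the hot fields away from neighbouring objects on
// 64-byte cache lines to reduce false sharing; sum() only exists so the JIT cannot eliminate the
// otherwise-unused padding fields.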

View File

@ -1,315 +0,0 @@
/*
* Modified by Dorkbox, llc
*
*
* Copyright 2013 Ben Manes. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package dorkbox.util.messagebus.common.simpleq.bakup;
import java.util.concurrent.atomic.AtomicReference;
import dorkbox.util.messagebus.common.simpleq.Node;
import dorkbox.util.messagebus.common.simpleq.PaddedAtomicReference;
/**
* An unbounded thread-safe stack based on linked nodes. This stack orders elements LIFO (last-in-first-out). The <em>top</em> of the stack
* is that element that has been on the stack the shortest time. New elements are inserted at and retrieved from the top of the stack. A
* {@code EliminationStack} is an appropriate choice when many threads will exchange elements through shared access to a common collection.
* Like most other concurrent collection implementations, this class does not permit the use of {@code null} elements.
* <p>
* This implementation employs elimination to transfer elements between threads that are pushing and popping concurrently. This technique
* avoids contention on the stack by attempting to cancel operations if an immediate update to the stack is not successful. This approach is
* described in <a href="http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.156.8728">A Scalable Lock-free Stack Algorithm</a>.
*
* @author ben.manes@gmail.com (Ben Manes)
* @see <a href="https://github.com/ben-manes/caffeine">Caffeine</a>
* @param <E> the type of elements held in this collection
*/
public final class NodePool<E> {
/*
* A Treiber's stack is represented as a singly-linked list with an atomic top reference and uses compare-and-swap to modify the value
* atomically.
*
* The stack is augmented with an elimination array to minimize the top reference becoming a sequential bottleneck. Elimination allows
* pairs of operations with reverse semantics, like pushes and pops on a stack, to complete without any central coordination, and
* therefore substantially aids scalability [1, 2, 3]. If a thread fails to update the stack's top reference then it backs off to a
* collision arena where a location is chosen at random and it attempts to coordinate with another operation that concurrently chose the
* same location. If a transfer is not successful then the thread repeats the process until the element is added to the stack or a
* cancellation occurs.
*
* This implementation borrows optimizations from {@link java.util.concurrent.Exchanger} for choosing an arena location and awaiting a
* match [4].
*
* [1] A Scalable Lock-free Stack Algorithm http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.156.8728 [2] Concurrent Data
* Structures http://www.cs.tau.ac.il/~shanir/concurrent-data-structures.pdf [3] Using elimination to implement scalable and lock-free
* fifo queues http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.108.6422 [4] A Scalable Elimination-based Exchange Channel
* http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.59.7396
*/
/** The number of CPUs */
private static final int NCPU = Runtime.getRuntime().availableProcessors();
/** The number of slots in the elimination array. */
private static final int ARENA_LENGTH = ceilingNextPowerOfTwo((NCPU + 1) / 2);
/** The mask value for indexing into the arena. */
private static final int ARENA_MASK = ARENA_LENGTH - 1;
/** The number of times to step ahead, probe, and try to match. */
private static final int LOOKAHEAD = Math.min(4, NCPU);
/**
* The number of times to spin (doing nothing except polling a memory location) before giving up while waiting to eliminate an
* operation. Should be zero on uniprocessors. On multiprocessors, this value should be large enough so that two threads exchanging
* items as fast as possible block only when one of them is stalled (due to GC or preemption), but not much longer, to avoid wasting CPU
* resources. Seen differently, this value is a little over half the number of cycles of an average context switch time on most systems.
* The value here is approximately the average of those across a range of tested systems.
*/
private static final int SPINS = NCPU == 1 ? 0 : 2000;
/** The number of times to spin per lookahead step */
private static final int SPINS_PER_STEP = SPINS / LOOKAHEAD;
/** A marker indicating that the arena slot is free. */
@SuppressWarnings("rawtypes")
private static final Node FREE = null;
/** A marker indicating that a thread is waiting in that slot to be transferred an element. */
@SuppressWarnings("rawtypes")
private static final Node WAITER = new Node();
private static int ceilingNextPowerOfTwo(int x) {
// From Hacker's Delight, Chapter 3, Harry S. Warren Jr.
return 1 << Integer.SIZE - Integer.numberOfLeadingZeros(x - 1);
}
/** The top of the stack. */
private final AtomicReference<NodeORG<E>> top;
/** The arena where slots can be used to perform an exchange. */
private final AtomicReference<Object>[] arena;
/** Creates a {@code EliminationStack} that is initially empty. */
@SuppressWarnings("unchecked")
public NodePool() {
this.top = new PaddedAtomicReference<NodeORG<E>>();
this.arena = new PaddedAtomicReference[ARENA_LENGTH];
for (int i = 0; i < ARENA_LENGTH; i++) {
this.arena[i] = new PaddedAtomicReference<Object>();
}
}
/**
* Removes and returns the top element or returns <tt>null</tt> if this stack is empty.
*
* @return the top of this stack, or <tt>null</tt> if this stack is empty
*/
public NodeORG<E> pop() {
NodeORG<E> current;
for (;;) {
current = this.top.get();
if (current == null) {
return null;
}
// Attempt to pop from the stack, backing off to the elimination array if contended
if (this.top.compareAndSet(current, current.next)) {
return current;
}
current = tryReceive();
if (current != null) {
return current;
}
}
}
/**
* Pushes an element onto the stack (in other words, adds an element at the top of this stack).
*
* @param e the element to push
*/
public void put(NodeORG<E> node) {
NodeORG<E> current;
for (;;) {
current = this.top.get();
node.next = current;
// Attempt to push to the stack, backing off to the elimination array if contended
if (this.top.compareAndSet(current, node)) {
return;
}
if (tryTransfer(node)) {
return;
}
}
}
/**
* Attempts to transfer the element to a waiting consumer.
*
* @param e the element to try to exchange
* @return if the element was successfully transfered
*/
boolean tryTransfer(NodeORG<E> e) {
int start = startIndex();
return scanAndTransferToWaiter(e, start) || awaitExchange(e, start);
}
/**
* Scans the arena searching for a waiting consumer to exchange with.
*
* @param e the element to try to exchange
* @return if the element was successfully transfered
*/
boolean scanAndTransferToWaiter(NodeORG<E> e, int start) {
for (int i = 0; i < ARENA_LENGTH; i++) {
int index = start + i & ARENA_MASK;
AtomicReference<Object> slot = this.arena[index];
// if some thread is waiting to receive an element then attempt to provide it
if (slot.get() == WAITER && slot.compareAndSet(WAITER, e)) {
return true;
}
}
return false;
}
/**
* Waits for (by spinning) to have the element transfered to another thread. The element is filled into an empty slot in the arena and
* spun on until it is transfered or a per-slot spin limit is reached. This search and wait strategy is repeated by selecting another
* slot until a total spin limit is reached.
*
* @param e the element to transfer
* @param start the arena location to start at
* @return if an exchange was completed successfully
*/
boolean awaitExchange(NodeORG<E> e, int start) {
for (int step = 0, totalSpins = 0; step < ARENA_LENGTH && totalSpins < SPINS; step++) {
int index = start + step & ARENA_MASK;
AtomicReference<Object> slot = this.arena[index];
Object found = slot.get();
if (found == WAITER && slot.compareAndSet(WAITER, e)) {
return true;
} else if (found == FREE && slot.compareAndSet(FREE, e)) {
int slotSpins = 0;
for (;;) {
found = slot.get();
if (found != e) {
return true;
} else if (slotSpins >= SPINS_PER_STEP && slot.compareAndSet(e, FREE)) {
// failed to transfer the element; try a new slot
totalSpins += slotSpins;
break;
}
slotSpins++;
}
}
}
// failed to transfer the element; give up
return false;
}
/**
* Attempts to receive an element from a waiting provider.
*
* @return an element if successfully transfered or null if unsuccessful
*/
NodeORG<E> tryReceive() {
int start = startIndex();
NodeORG<E> e = scanAndMatch(start);
return e == null ? awaitMatch(start) : e;
}
/**
* Scans the arena searching for a waiting producer to transfer from.
*
* @param start the arena location to start at
* @return an element if successfully transfered or null if unsuccessful
*/
NodeORG<E> scanAndMatch(int start) {
for (int i = 0; i < ARENA_LENGTH; i++) {
int index = start + i & ARENA_MASK;
AtomicReference<Object> slot = this.arena[index];
// accept a transfer if an element is available
Object found = slot.get();
if (found != FREE && found != WAITER && slot.compareAndSet(found, FREE)) {
@SuppressWarnings("unchecked")
NodeORG<E> cast = (NodeORG<E>) found;
return cast;
}
}
return null;
}
/**
* Waits for (by spinning) to have an element transfered from another thread. A marker is filled into an empty slot in the arena and
* spun on until it is replaced with an element or a per-slot spin limit is reached. This search and wait strategy is repeated by
* selecting another slot until a total spin limit is reached.
*
* @param start the arena location to start at
* @return an element if successfully transfered or null if unsuccessful
*/
NodeORG<E> awaitMatch(int start) {
for (int step = 0, totalSpins = 0; step < ARENA_LENGTH && totalSpins < SPINS; step++) {
int index = start + step & ARENA_MASK;
AtomicReference<Object> slot = this.arena[index];
Object found = slot.get();
if (found == FREE) {
if (slot.compareAndSet(FREE, WAITER)) {
int slotSpins = 0;
for (;;) {
found = slot.get();
if (found != WAITER && slot.compareAndSet(found, FREE)) {
@SuppressWarnings("unchecked")
NodeORG<E> cast = (NodeORG<E>) found;
return cast;
} else if (slotSpins >= SPINS_PER_STEP && found == WAITER && slot.compareAndSet(WAITER, FREE)) {
// failed to receive an element; try a new slot
totalSpins += slotSpins;
break;
}
slotSpins++;
}
}
} else if (found != WAITER && slot.compareAndSet(found, FREE)) {
@SuppressWarnings("unchecked")
NodeORG<E> cast = (NodeORG<E>) found;
return cast;
}
}
// failed to receive an element; give up
return null;
}
/**
* Returns the start index to begin searching the arena with. Uses a one-step FNV-1a hash code
* (http://www.isthe.com/chongo/tech/comp/fnv/) based on the current thread's Thread.getId(). These hash codes have more uniform
* distribution properties with respect to small moduli (here 1-31) than do other simple hashing functions. This technique is a
* simplified version borrowed from {@link java.util.concurrent.Exchanger}'s hashIndex function.
*/
static int startIndex() {
long id = Thread.currentThread().getId();
return ((int) (id ^ id >>> 32) ^ 0x811c9dc5) * 0x01000193;
}
}
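// Note (added for clarity): NodePool appears to act as a free-list for NodeORG instances: pop() hands out
// a recycled node (or null when none is cached) and put() returns one, so callers can avoid allocating a
// node per operation. This reading is inferred from the code above rather than stated by the author.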

View File

@ -2,8 +2,6 @@ package dorkbox.util.messagebus.common.simpleq.jctools;
import java.util.concurrent.ThreadLocalRandom;
import dorkbox.util.messagebus.common.simpleq.Node;
public final class MpmcArrayTransferQueue extends MpmcArrayQueueConsumerField<Node> {
/** The number of CPUs */
@ -133,265 +131,6 @@ public final class MpmcArrayTransferQueue extends MpmcArrayQueueConsumerField<No
}
}
//
// public Object xfer(final Object item, final boolean timed, final long nanos, final int incomingType) throws InterruptedException {
// // empty or same mode = push+park onto queue
// // complimentary mode = unpark+pop off queue
//
// // local load of field to avoid repeated loads after volatile reads
// final long mask = this.mask;
// final long capacity = this.mask + 1;
// final long[] sBuffer = this.sequenceBuffer;
//
// long currentProducerIndex = -1; // start with bogus value, hope we don't need it
// long pSeqOffset;
//
// long currentConsumerIndex = Long.MAX_VALUE; // start with bogus value, hope we don't need it
// long cSeqOffset;
//
// int prevElementType;
//
// while (true) {
// currentConsumerIndex = lvConsumerIndex(); // LoadLoad
// currentProducerIndex = lvProducerIndex(); // LoadLoad
//
//
// // the sequence must be loaded before we get the previous node type, otherwise previousNodeType can be stale/
// pSeqOffset = calcSequenceOffset(currentProducerIndex, mask);
// final long pSeq = lvSequence(sBuffer, pSeqOffset); // LoadLoad
//
// // if the queue is EMPTY, this will return TYPE_FREE
// // other consumers may have grabbed the element, or queue might be empty
// // may be FREE or CONSUMER/PRODUCER
// final long prevElementOffset = calcElementOffset(currentProducerIndex - 1, mask);
// prevElementType = lpElementType(prevElementOffset);
//
//// if (prevElementType == TYPE_CANCELED) {
//// // pop off and continue.
////
//// cSeqOffset = calcSequenceOffset(currentConsumerIndex, mask);
//// final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad
////
//// final long nextConsumerIndex = currentConsumerIndex + 1;
//// final long delta = seq - nextConsumerIndex;
////
//// if (delta == 0) {
//// if (prevElementType != lpElementType(prevElementOffset)) {
//// // inconsistent read of type.
//// continue;
//// }
////
//// if (casConsumerIndex(currentConsumerIndex, nextConsumerIndex)) {
//// // Successful CAS: full barrier
////
//// // on 64bit(no compressed oops) JVM this is the same as seqOffset
//// final long offset = calcElementOffset(currentConsumerIndex, mask);
//// spElementType(offset, TYPE_FREE);
////
//// // Move sequence ahead by capacity, preparing it for next offer
//// // (seeing this value from a consumer will lead to retry 2)
//// soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore
//// }
//// // failed cas, retry 1
//// } else if (delta < 0 && // slot has not been moved by producer
//// currentConsumerIndex >= currentProducerIndex && // test against cached pIndex
//// currentConsumerIndex == (currentProducerIndex = lvProducerIndex())) { // update pIndex if we must
//// // strict empty check, this ensures [Queue.poll() == null iff isEmpty()]
//// // return null;
////
//// // spin
////// busySpin(prevElementType, incomingType);
//// }
////
//// // another consumer beat us and moved sequence ahead, retry 2
//// // only producer busyspins
//// continue;
//// }
//
//
// // it is possible that two threads check the queue at the exact same time,
// // BOTH can think that the queue is empty, resulting in a deadlock between threads
// // it is ALSO possible that the consumer pops the previous node, and so we thought it was not-empty, when
// // in reality, it is.
//
// if (prevElementType == TYPE_FREE || prevElementType == incomingType) {
// // empty or same mode = push+park onto queue
//
// if (timed && nanos <= 0) {
// // can't wait
// return null;
// }
//
// final long delta = pSeq - currentProducerIndex;
// if (delta == 0) {
// if (prevElementType != lpElementType(prevElementOffset)) {
// // inconsistent read of type.
// continue;
// }
//
// // this is expected if we see this first time around
// final long nextProducerIndex = currentProducerIndex + 1;
// if (casProducerIndex(currentProducerIndex, nextProducerIndex)) {
// // Successful CAS: full barrier
//
// // on 64bit(no compressed oops) JVM this is the same as seqOffset
// final long offset = calcElementOffset(currentProducerIndex, mask);
//
// if (prevElementType == TYPE_FREE && currentProducerIndex != currentConsumerIndex) {
// // inconsistent read
//
// // try to undo, if possible.
// if (!casProducerIndex(nextProducerIndex, currentProducerIndex)) {
//// spElementType(offset, TYPE_CANCELED);
//
// // increment sequence by 1, the value expected by consumer
// // (seeing this value from a producer will lead to retry 2)
// soSequence(sBuffer, pSeqOffset, nextProducerIndex); // StoreStore
// }
//
// continue;
// }
//
// spElementType(offset, incomingType);
// spElementThread(offset, Thread.currentThread());
//
// if (incomingType == TYPE_CONSUMER) {
// // increment sequence by 1, the value expected by consumer
// // (seeing this value from a producer will lead to retry 2)
// soSequence(sBuffer, pSeqOffset, nextProducerIndex); // StoreStore
//
// park(offset, timed, nanos);
//
// Object lpElement = lpElementNoCast(offset);
// Object lpItem1 = lpItem1(lpElement);
//
// return lpItem1;
// } else {
// // producer
// Object lpElement = lpElementNoCast(offset);
// spItem1(lpElement, item);
//
// // increment sequence by 1, the value expected by consumer
// // (seeing this value from a producer will lead to retry 2)
// soSequence(sBuffer, pSeqOffset, nextProducerIndex); // StoreStore
//
// park(offset, timed, nanos);
//
// return null;
// }
// }
// // failed cas, retry 1
// } else if (delta < 0 && // poll has not moved this value forward
// currentProducerIndex - capacity <= currentConsumerIndex && // test against cached cIndex
// currentProducerIndex - capacity <= (currentConsumerIndex = lvConsumerIndex())) { // test against latest cIndex
// // Extra check required to ensure [Queue.offer == false iff queue is full]
// // return false;
// // we spin if the queue is full
// }
//
// // another producer has moved the sequence by one, retry 2
//
// // only producer will busySpin
//// busySpin(prevElementType, incomingType);
// }
// else {
// // complimentary mode = unpark+pop off queue
//
// cSeqOffset = calcSequenceOffset(currentConsumerIndex, mask);
// final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad
//
// final long nextConsumerIndex = currentConsumerIndex + 1;
// final long delta = seq - nextConsumerIndex;
//
// if (delta == 0) {
// if (prevElementType != lpElementType(prevElementOffset)) {
// // inconsistent read of type.
// continue;
// }
//
// if (casConsumerIndex(currentConsumerIndex, nextConsumerIndex)) {
// // Successful CAS: full barrier
//
// if (prevElementType != lpElementType(prevElementOffset)) {
// System.err.println("WHAT");
//
// // Move sequence ahead by capacity, preparing it for next offer
// // (seeing this value from a consumer will lead to retry 2)
// soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore
// continue;
// }
//
// // on 64bit(no compressed oops) JVM this is the same as seqOffset
// final long offset = calcElementOffset(currentConsumerIndex, mask);
// spElementType(offset, TYPE_FREE);
//
// final Object thread = lpElementThread(offset);
// final Object lpElement = lpElementNoCast(offset);
//
// if (incomingType == TYPE_CONSUMER) {
// // is already cancelled/fulfilled
// if (thread == null ||
// !casElementThread(offset, thread, null)) { // FULL barrier
//
// // Move sequence ahead by capacity, preparing it for next offer
// // (seeing this value from a consumer will lead to retry 2)
// soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore
//
// continue;
// }
//
// Object returnItem = lpItem1(lpElement);
// spItem1(lpElement, null);
//
// UNSAFE.unpark(thread);
//
// // Move sequence ahead by capacity, preparing it for next offer
// // (seeing this value from a consumer will lead to retry 2)
// soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore
//
// return returnItem;
// } else {
// // producer
// spItem1(lpElement, item);
//
// // is already cancelled/fulfilled
// if (thread == null ||
// !casElementThread(offset, thread, null)) { // FULL barrier
//
// // Move sequence ahead by capacity, preparing it for next offer
// // (seeing this value from a consumer will lead to retry 2)
// soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore
//
// continue;
// }
//
// UNSAFE.unpark(thread);
//
// // Move sequence ahead by capacity, preparing it for next offer
// // (seeing this value from a consumer will lead to retry 2)
// soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore
//
// return null;
// }
// }
// // failed cas, retry 1
// } else if (delta < 0 && // slot has not been moved by producer
// currentConsumerIndex >= currentProducerIndex && // test against cached pIndex
// currentConsumerIndex == (currentProducerIndex = lvProducerIndex())) { // update pIndex if we must
// // strict empty check, this ensures [Queue.poll() == null iff isEmpty()]
// // return null;
//
// // spin
//// busySpin(prevElementType, incomingType);
// }
//
// // another consumer beat us and moved sequence ahead, retry 2
// // only producer busyspins
// }
// }
// }
private static final void busySpin() {
ThreadLocalRandom randomYields = ThreadLocalRandom.current();

View File

@ -0,0 +1,106 @@
package dorkbox.util.messagebus.common.simpleq.jctools;
import static dorkbox.util.messagebus.common.simpleq.jctools.UnsafeAccess.UNSAFE;
import com.lmax.disruptor.MessageType;
abstract class ColdItems {
public int type = 0;
public int messageType = MessageType.ONE;
public Object item1 = null;
public Object item2 = null;
public Object item3 = null;
public Object[] item4 = null;
}
abstract class Pad0 extends ColdItems {
// volatile long y0, y1, y2, y4, y5, y6 = 7L;
volatile long z0, z1, z2, z4, z5, z6 = 7L;
}
abstract class HotItem1 extends Pad0 {
public Thread thread;
}
public class Node extends HotItem1 {
private static final long ITEM1;
private static final long THREAD;
private static final long TYPE;
static {
try {
TYPE = UNSAFE.objectFieldOffset(Node.class.getField("type"));
ITEM1 = UNSAFE.objectFieldOffset(Node.class.getField("item1"));
THREAD = UNSAFE.objectFieldOffset(Node.class.getField("thread"));
// now make sure we can access UNSAFE
Node node = new Node();
Object o = new Object();
spItem1(node, o);
Object lpItem1 = lpItem1(node);
spItem1(node, null);
if (lpItem1 != o) {
throw new Exception("Cannot access unsafe fields");
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
static final void spItem1(Object node, Object item) {
UNSAFE.putObject(node, ITEM1, item);
}
static final void soItem1(Object node, Object item) {
UNSAFE.putOrderedObject(node, ITEM1, item);
}
static final Object lpItem1(Object node) {
return UNSAFE.getObject(node, ITEM1);
}
static final Object lvItem1(Object node) {
return UNSAFE.getObjectVolatile(node, ITEM1);
}
//////////////
static final void spType(Object node, int type) {
UNSAFE.putInt(node, TYPE, type);
}
static final int lpType(Object node) {
return UNSAFE.getInt(node, TYPE);
}
///////////
static final void spThread(Object node, Object thread) {
UNSAFE.putObject(node, THREAD, thread);
}
static final void soThread(Object node, Object thread) {
UNSAFE.putOrderedObject(node, THREAD, thread);
}
static final Object lpThread(Object node) {
return UNSAFE.getObject(node, THREAD);
}
static final Object lvThread(Object node) {
return UNSAFE.getObjectVolatile(node, THREAD);
}
// post-padding
// volatile long y0, y1, y2, y4, y5, y6 = 7L;
volatile long z0, z1, z2, z4, z5, z6 = 7L;
public Node() {
}
// @Override
// public String toString() {
// return "[" + this.ID + "]";
// }
}
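// Naming convention used above (added for clarity, matching the JCTools style this package borrows):
//   sp* = store plain (putObject/putInt), so* = store ordered (putOrderedObject, a lazySet-style write),
//   lp* = load plain (getObject/getInt), lv* = load volatile (getObjectVolatile).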

View File

@ -1,106 +1,35 @@
package dorkbox.util.messagebus.common.simpleq.jctools;
import static dorkbox.util.messagebus.common.simpleq.jctools.Node.lpItem1;
import static dorkbox.util.messagebus.common.simpleq.jctools.Node.lpThread;
import static dorkbox.util.messagebus.common.simpleq.jctools.Node.lpType;
import static dorkbox.util.messagebus.common.simpleq.jctools.Node.lvItem1;
import static dorkbox.util.messagebus.common.simpleq.jctools.Node.lvThread;
import static dorkbox.util.messagebus.common.simpleq.jctools.Node.soItem1;
import static dorkbox.util.messagebus.common.simpleq.jctools.Node.soThread;
import static dorkbox.util.messagebus.common.simpleq.jctools.Node.spItem1;
import static dorkbox.util.messagebus.common.simpleq.jctools.Node.spThread;
import static dorkbox.util.messagebus.common.simpleq.jctools.Node.spType;
import static dorkbox.util.messagebus.common.simpleq.jctools.UnsafeAccess.UNSAFE;
import java.util.Collection;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import dorkbox.util.messagebus.common.simpleq.Node;
public final class SimpleQueue extends MpmcArrayQueueConsumerField<Node> {
public static final int TYPE_EMPTY = 0;
public static final int TYPE_CONSUMER = 1;
public static final int TYPE_PRODUCER = 2;
private static final long ITEM1_OFFSET;
private static final long THREAD;
private static final long TYPE;
static {
try {
TYPE = UNSAFE.objectFieldOffset(Node.class.getField("type"));
ITEM1_OFFSET = UNSAFE.objectFieldOffset(Node.class.getField("item1"));
THREAD = UNSAFE.objectFieldOffset(Node.class.getField("thread"));
} catch (NoSuchFieldException e) {
throw new RuntimeException(e);
}
}
private static final void spItem1(Object node, Object item) {
UNSAFE.putObject(node, ITEM1_OFFSET, item);
}
private static final void soItem1(Object node, Object item) {
UNSAFE.putOrderedObject(node, ITEM1_OFFSET, item);
}
private static final Object lpItem1(Object node) {
return UNSAFE.getObject(node, ITEM1_OFFSET);
}
private static final Object lvItem1(Object node) {
return UNSAFE.getObjectVolatile(node, ITEM1_OFFSET);
}
import java.util.concurrent.TransferQueue;
private static final void spType(Object node, int type) {
UNSAFE.putInt(node, TYPE, type);
}
public final class SimpleQueue extends MpmcArrayQueueConsumerField<Object> implements TransferQueue<Object> {
private static final int TYPE_EMPTY = 0;
private static final int TYPE_CONSUMER = 1;
private static final int TYPE_PRODUCER = 2;
private static final int lpType(Object node) {
return UNSAFE.getInt(node, TYPE);
}
private static final void spThread(Object node, Object thread) {
UNSAFE.putObject(node, THREAD, thread);
}
private static final void soThread(Object node, Object thread) {
UNSAFE.putOrderedObject(node, THREAD, thread);
}
private static final Object lpThread(Object node) {
return UNSAFE.getObject(node, THREAD);
}
private static final Object lvThread(Object node) {
return UNSAFE.getObjectVolatile(node, THREAD);
}
/** The number of CPUs */
/** Is it multi-processor? */
private static final boolean MP = Runtime.getRuntime().availableProcessors() > 1;
/**
* The number of times to spin (with randomly interspersed calls
* to Thread.yield) on multiprocessor before blocking when a node
* is apparently the first waiter in the queue. See above for
* explanation. Must be a power of two. The value is empirically
* derived -- it works pretty well across a variety of processors,
* numbers of CPUs, and OSes.
*/
private static final int FRONT_SPINS = 1 << 7;
private static int INPROGRESS_SPINS = MP ? 32 : 0;
private static int PUSH_SPINS = MP ? 512 : 0;
private static int POP_SPINS = MP ? 512 : 0;
/**
* The number of times to spin before blocking when a node is
* preceded by another node that is apparently spinning. Also
* serves as an increment to FRONT_SPINS on phase changes, and as
* base average frequency for yielding during spins. Must be a
* power of two.
*/
private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
/** The number of CPUs */
private static final int NCPU = Runtime.getRuntime().availableProcessors();
/**
* The number of times to spin (doing nothing except polling a memory location) before giving up while waiting to eliminate an
* operation. Should be zero on uniprocessors. On multiprocessors, this value should be large enough so that two threads exchanging
* items as fast as possible block only when one of them is stalled (due to GC or preemption), but not much longer, to avoid wasting CPU
* resources. Seen differently, this value is a little over half the number of cycles of an average context switch time on most systems.
* The value here is approximately the average of those across a range of tested systems.
*/
private static final int SPINS = MP ? 512 : 0; // orig: 2000
/**
* The number of times to spin before blocking in timed waits.
@ -109,26 +38,26 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField<Node> {
* seems not to vary with number of CPUs (beyond 2) so is just
* a constant.
*/
static final int maxTimedSpins = NCPU < 2 ? 0 : 32;
private static int PARK_TIMED_SPINS = MP ? 32 : 0;
/**
* The number of times to spin before blocking in untimed waits.
* This is greater than timed value because untimed waits spin
* faster since they don't need to check times on each spin.
*/
static final int maxUntimedSpins = maxTimedSpins * 16;
static final int negMaxUntimedSpins = -maxUntimedSpins;
private static int PARK_UNTIMED_SPINS = PARK_TIMED_SPINS * 16;
/**
* The number of nanoseconds for which it is faster to spin
* rather than to use timed park. A rough estimate suffices.
*/
static final long spinForTimeoutThreshold = 1000L;
private int size;
private static final long SPIN_THRESHOLD = 1000L;
public SimpleQueue(final int size) {
super(1 << 17);
this.size = size;
private final int consumerCount;
public SimpleQueue(final int consumerCount) {
super(Pow2.roundToPowerOfTwo(consumerCount*Runtime.getRuntime().availableProcessors()));
this.consumerCount = consumerCount;
}
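// Sizing example (added for illustration): with 4 consumer threads on an 8-core machine the backing array
// becomes Pow2.roundToPowerOfTwo(4 * 8) = 32 slots, instead of the fixed 1 << 17 used previously.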
private final static ThreadLocal<Object> nodeThreadLocal = new ThreadLocal<Object>() {
@ -140,9 +69,12 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField<Node> {
/**
* PRODUCER
* PRODUCER method
* <p>
* Place an item on the queue, and wait (if necessary) for a corresponding consumer to take it. This will wait as long as necessary.
*/
public void put(Object item) throws InterruptedException {
@Override
public final void transfer(final Object item) {
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
@ -164,7 +96,7 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField<Node> {
previousElement = lpElementNoCast(calcElementOffset(producerIndex-1));
if (previousElement == null) {
// the last producer hasn't finished setting the object yet
busySpin_InProgress();
busySpin(INPROGRESS_SPINS);
continue;
}
@ -207,7 +139,7 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField<Node> {
}
// whoops, inconsistent state
busySpin_pushConflict();
busySpin(PUSH_SPINS);
continue;
}
case TYPE_CONSUMER: {
@ -237,7 +169,7 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField<Node> {
}
// whoops, inconsistent state
busySpin_popConflict();
busySpin(POP_SPINS);
continue;
}
}
@ -247,7 +179,8 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField<Node> {
/**
* CONSUMER
*/
public Object take() throws InterruptedException {
@Override
public final Object take() {
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
final long[] sBuffer = this.sequenceBuffer;
@ -268,7 +201,7 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField<Node> {
previousElement = lpElementNoCast(calcElementOffset(producerIndex-1));
if (previousElement == null) {
// the last producer hasn't finished setting the object yet
busySpin_InProgress();
busySpin(INPROGRESS_SPINS);
continue;
}
@ -313,7 +246,7 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField<Node> {
}
// whoops, inconsistent state
busySpin_pushConflict();
busySpin(PUSH_SPINS);
continue;
}
case TYPE_PRODUCER: {
@ -343,138 +276,90 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField<Node> {
}
// whoops, inconsistent state
busySpin_popConflict();
busySpin(POP_SPINS);
continue;
}
}
}
}
/**
* Spin for when the current thread is waiting for the item to be set. The producer index has incremented, but the
* item isn't present yet.
* @param random
*/
private static final void busySpin_InProgress() {
// ThreadLocalRandom randomYields = ThreadLocalRandom.current();
//
// if (randomYields.nextInt(1) != 0) {
////// LockSupport.parkNanos(1); // occasionally yield
// Thread.yield();
////// break;
// }
// busy spin for the amount of time (roughly) of a CPU context switch
int spins = 128;
private static final void busySpin(int spins) {
for (;;) {
if (spins > 0) {
// if (randomYields.nextInt(CHAINED_SPINS) == 0) {
//// LockSupport.parkNanos(1); // occasionally yield
// Thread.yield();
//// break;
// }
--spins;
} else {
break;
return;
}
}
}
private static final void busySpin_pushConflict() {
// ThreadLocalRandom randomYields = ThreadLocalRandom.current();
@SuppressWarnings("null")
private final void park(final Object node, final Thread myThread, final boolean timed, long nanos) {
long lastTime = timed ? System.nanoTime() : 0L;
int spins = -1; // initialized after first item and cancel checks
ThreadLocalRandom randomYields = null; // bound if needed
// busy spin for the amount of time (roughly) of a CPU context switch
int spins = 256;
for (;;) {
if (spins > 0) {
// if (randomYields.nextInt(1) != 0) {
////// LockSupport.parkNanos(1); // occasionally yield
// Thread.yield();
//// break;
// }
--spins;
} else {
break;
}
}
}
if (lvThread(node) == null) {
return;
} else if (myThread.isInterrupted() || timed && nanos <= 0) {
return;
} else if (spins < 0) {
if (timed) {
spins = PARK_TIMED_SPINS;
} else {
spins = PARK_UNTIMED_SPINS;
}
private static final void busySpin_popConflict() {
// ThreadLocalRandom randomYields = ThreadLocalRandom.current();
// busy spin for the amount of time (roughly) of a CPU context switch
int spins = 64;
for (;;) {
if (spins > 0) {
// if (randomYields.nextInt(1) != 0) {
////// LockSupport.parkNanos(1); // occasionally yield
// Thread.yield();
//// break;
// }
--spins;
} else {
break;
}
}
}
private static final void busySpin2() {
ThreadLocalRandom randomYields = ThreadLocalRandom.current();
// busy spin for the amount of time (roughly) of a CPU context switch
int spins = 64;
for (;;) {
if (spins > 0) {
if (randomYields.nextInt(1) != 0) {
//// LockSupport.parkNanos(1); // occasionally yield
Thread.yield();
// break;
if (spins > 0) {
randomYields = ThreadLocalRandom.current();
}
} else if (spins > 0) {
if (randomYields.nextInt(256) == 0) {
Thread.yield(); // occasionally yield
}
--spins;
} else {
break;
}
}
}
private static final void busySpin(ThreadLocalRandom random) {
// busy spin for the amount of time (roughly) of a CPU context switch
// int spins = spinsFor();
int spins = 128;
for (;;) {
if (spins > 0) {
--spins;
if (random.nextInt(CHAINED_SPINS) == 0) {
//// LockSupport.parkNanos(1); // occasionally yield
// Thread.yield();
break;
} else if (timed) {
long now = System.nanoTime();
long remaining = nanos -= now - lastTime;
if (remaining > 0) {
if (remaining < SPIN_THRESHOLD) {
busySpin(PARK_UNTIMED_SPINS);
} else {
UNSAFE.park(false, nanos);
}
}
lastTime = now;
} else {
break;
// park can return for NO REASON (must check for thread values)
UNSAFE.park(false, 0L);
}
}
}
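For context, the waiting strategy above is the usual spin-then-park idiom: spin a bounded number of times in case the hand-off is imminent, then park; because park can return for no reason, the waiter must re-check its condition in a loop. A generic sketch of that idiom using LockSupport (the code above uses Unsafe.park on a per-node thread field; the class and method names below are illustrative, not from this commit):

import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

// Illustrative waiter: spin briefly, then park until another thread clears the slot.
final class SpinThenPark {
    static void await(AtomicReference<Thread> waiter, int maxSpins) {
        int spins = maxSpins;
        while (waiter.get() != null) {           // condition: we are still registered as waiting
            if (Thread.currentThread().isInterrupted()) {
                return;                          // caller decides how to handle interruption
            }
            if (spins > 0) {
                --spins;                         // cheap busy wait for short stalls
            } else {
                LockSupport.park();              // may return spuriously, hence the outer re-check
            }
        }
    }

    static void signal(AtomicReference<Thread> waiter) {
        Thread t = waiter.getAndSet(null);       // clear the slot first, then wake the thread
        if (t != null) {
            LockSupport.unpark(t);
        }
    }
}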
// @Override
// public boolean isEmpty() {
// // Order matters!
// // Loading consumer before producer allows for producer increments after consumer index is read.
// // This ensures this method is conservative in it's estimate. Note that as this is an MPMC there is
// // nothing we can do to make this an exact method.
// return lvConsumerIndex() == lvProducerIndex();
// }
private final void unpark(Object node) {
final Object thread = lpThread(node);
soThread(node, null);
UNSAFE.unpark(thread);
}
public boolean hasPendingMessages() {
@Override
public final boolean isEmpty() {
// Order matters!
// Loading consumer before producer allows for producer increments after consumer index is read.
// This ensures this method is conservative in its estimate. Note that as this is an MPMC there is
// nothing we can do to make this an exact method.
return lvConsumerIndex() == lvProducerIndex();
}
public final boolean hasPendingMessages() {
// count the number of consumers waiting; it should match the number of consumer threads configured
// return this.consumersWaiting.size() == this.numberConsumerThreads;
// return false;
long consumerIndex = lvConsumerIndex();
long producerIndex = lvProducerIndex();
if (consumerIndex != producerIndex) {
final Object previousElement = lpElementNoCast(calcElementOffset(producerIndex-1));
if (previousElement != null && lpType(previousElement) == TYPE_CONSUMER && consumerIndex + this.size == producerIndex) {
if (previousElement != null && lpType(previousElement) == TYPE_CONSUMER && consumerIndex + this.consumerCount == producerIndex) {
return false;
}
}
@ -482,75 +367,158 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField<Node> {
return true;
}
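A hypothetical caller-side use of hasPendingMessages(): waiting for the queue to drain before shutting down. The loop below is illustrative and not part of this commit; 'queue' stands for a SimpleQueue instance.

// Hypothetical drain-wait loop (the polling/back-off choice is arbitrary).
while (queue.hasPendingMessages()) {
    Thread.yield();
}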
public void tryTransfer(Runnable runnable, long timeout, TimeUnit unit) throws InterruptedException {
// TODO Auto-generated method stub
public void tryTransfer(Runnable runnable, long timeout, TimeUnit unit) {
}
public final void park(final Object node, final Thread myThread, final boolean timed, final long nanos) throws InterruptedException {
ThreadLocalRandom randomYields = null; // bound if needed
// long lastTime = timed ? System.nanoTime() : 0;
// int spins = timed ? maxTimedSpins : maxUntimedSpins;
// int spins = maxTimedSpins;
int spins = 51200;
// if (timed) {
// long now = System.nanoTime();
// nanos -= now - lastTime;
// lastTime = now;
// if (nanos <= 0) {
//// s.tryCancel(e);
// continue;
// }
// }
for (;;) {
if (lvThread(node) == null) {
return;
} else if (spins > 0) {
// if (randomYields == null) {
// randomYields = ThreadLocalRandom.current();
// } else if (randomYields.nextInt(spins) == 0) {
// Thread.yield(); // occasionally yield
// }
--spins;
} else if (myThread.isInterrupted()) {
Thread.interrupted();
throw new InterruptedException();
} else {
// park can return for NO REASON (must check for thread values)
UNSAFE.park(false, 0L);
}
}
}
public void unpark(Object node) {
final Object thread = lpThread(node);
soThread(node, null);
UNSAFE.unpark(thread);
}
@Override
public boolean offer(Node message) {
public boolean offer(Object message) {
// TODO Auto-generated method stub
return false;
}
@Override
public Node poll() {
public Object poll() {
// TODO Auto-generated method stub
return null;
}
@Override
public Node peek() {
// TODO Auto-generated method stub
return null;
public Object peek() {
long currConsumerIndex;
Object e;
do {
currConsumerIndex = lvConsumerIndex();
// other consumers may have grabbed the element, or queue might be empty
e = lpElementNoCast(calcElementOffset(currConsumerIndex));
// only return null if queue is empty
} while (e == null && currConsumerIndex != lvProducerIndex());
return e;
}
@Override
public int size() {
/*
* It is possible for a thread to be interrupted or rescheduled between the reads of the producer and
* consumer indices, so protection is required to ensure the size stays within a valid range. In the
* event of concurrent polls/offers to this method the size is OVER estimated as we read consumer
* index BEFORE the producer index.
*/
long after = lvConsumerIndex();
while (true) {
final long before = after;
final long currentProducerIndex = lvProducerIndex();
after = lvConsumerIndex();
if (before == after) {
return (int) (currentProducerIndex - after);
}
}
}
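A quick worked example of the stable-read loop above (the numbers are made up for illustration):

// Suppose lvProducerIndex() returns 1000 and lvConsumerIndex() returns 993 both before and after that read:
// before == after, so size() returns (int) (1000 - 993) == 7.
// If a concurrent poll advanced the consumer index between the two reads, before != after and the loop retries.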
@Override
public void put(Object e) throws InterruptedException {
// TODO Auto-generated method stub
}
@Override
public boolean offer(Object e, long timeout, TimeUnit unit) throws InterruptedException {
// TODO Auto-generated method stub
return false;
}
@Override
public Object poll(long timeout, TimeUnit unit) throws InterruptedException {
// TODO Auto-generated method stub
return null;
}
@Override
public int remainingCapacity() {
// TODO Auto-generated method stub
return 0;
}
@Override
public int drainTo(Collection c) {
// TODO Auto-generated method stub
return 0;
}
@Override
public int drainTo(Collection c, int maxElements) {
// TODO Auto-generated method stub
return 0;
}
@Override
public Object[] toArray(Object[] a) {
// TODO Auto-generated method stub
return null;
}
@Override
public boolean containsAll(Collection c) {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean addAll(Collection c) {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean removeAll(Collection c) {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean retainAll(Collection c) {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean tryTransfer(Object e) {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean tryTransfer(Object e, long timeout, TimeUnit unit) throws InterruptedException {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean hasWaitingConsumer() {
// TODO Auto-generated method stub
return false;
}
@Override
public int getWaitingConsumerCount() {
// TODO Auto-generated method stub
return 0;
}

View File

@ -12,20 +12,21 @@
*/
package dorkbox.util.messagebus;
import java.util.concurrent.LinkedTransferQueue;
import org.openjdk.jol.info.ClassLayout;
import org.openjdk.jol.util.VMSupport;
import dorkbox.util.messagebus.common.LinkedTransferQueue;
import dorkbox.util.messagebus.common.simpleq.Node;
import dorkbox.util.messagebus.common.simpleq.jctools.Node;
public class LinkTransferQueueConcurrentPerfTest {
// 15 == 32 * 1024
public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 10;
public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 100;
public static final Integer TEST_VALUE = Integer.valueOf(777);
public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17);
private static final int concurrency = 8;
private static final int concurrency = 2;
public static void main(final String[] args) throws Exception {
System.out.println(VMSupport.vmDetails());

View File

@ -20,7 +20,7 @@ import java.util.concurrent.LinkedTransferQueue;
import org.openjdk.jol.info.ClassLayout;
import org.openjdk.jol.util.VMSupport;
import dorkbox.util.messagebus.common.simpleq.Node;
import dorkbox.util.messagebus.common.simpleq.jctools.Node;
public class LinkTransferQueuePerfTest {
// 15 == 32 * 1024

View File

@ -18,8 +18,8 @@ package dorkbox.util.messagebus;
import org.openjdk.jol.info.ClassLayout;
import org.openjdk.jol.util.VMSupport;
import dorkbox.util.messagebus.common.simpleq.Node;
import dorkbox.util.messagebus.common.simpleq.jctools.MpmcArrayTransferQueue;
import dorkbox.util.messagebus.common.simpleq.jctools.Node;
public class MpmcQueueAltConcurrentPerfTest {
// 15 == 32 * 1024

View File

@ -18,9 +18,9 @@ package dorkbox.util.messagebus;
import org.openjdk.jol.info.ClassLayout;
import org.openjdk.jol.util.VMSupport;
import dorkbox.util.messagebus.common.simpleq.Node;
import dorkbox.util.messagebus.common.simpleq.jctools.MpmcArrayQueue;
import dorkbox.util.messagebus.common.simpleq.jctools.MpmcArrayTransferQueue;
import dorkbox.util.messagebus.common.simpleq.jctools.Node;
public class MpmcQueueAltPerfTest {
// 15 == 32 * 1024

View File

@ -18,8 +18,8 @@ package dorkbox.util.messagebus;
import org.openjdk.jol.info.ClassLayout;
import org.openjdk.jol.util.VMSupport;
import dorkbox.util.messagebus.common.simpleq.Node;
import dorkbox.util.messagebus.common.simpleq.jctools.MpmcArrayQueue;
import dorkbox.util.messagebus.common.simpleq.jctools.Node;
public class MpmcQueueBaselineNodePerfTest {
// 15 == 32 * 1024

View File

@ -18,8 +18,8 @@ package dorkbox.util.messagebus;
import org.openjdk.jol.info.ClassLayout;
import org.openjdk.jol.util.VMSupport;
import dorkbox.util.messagebus.common.simpleq.Node;
import dorkbox.util.messagebus.common.simpleq.jctools.MpmcArrayQueue;
import dorkbox.util.messagebus.common.simpleq.jctools.Node;
public class MpmcQueueConcurrentPerfTest {
// 15 == 32 * 1024

View File

@ -1,50 +1,70 @@
/*
* Copyright 2012 Real Logic Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package dorkbox.util.messagebus;
import org.openjdk.jol.info.ClassLayout;
import org.openjdk.jol.util.VMSupport;
import dorkbox.util.messagebus.common.simpleq.Node;
import dorkbox.util.messagebus.common.simpleq.jctools.Node;
import dorkbox.util.messagebus.common.simpleq.jctools.SimpleQueue;
public class SimpleQueueAltPerfTest {
// 15 == 32 * 1024
public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 10;
public static final int REPETITIONS = 50 * 1000 * 100;
public static final Integer TEST_VALUE = Integer.valueOf(777);
public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17);
public static final int QUEUE_CAPACITY = 1 << 17;
private static final int concurrency = 8;
private static final int concurrency = 2;
public static void main(final String[] args) throws Exception {
System.out.println(VMSupport.vmDetails());
System.out.println(ClassLayout.parseClass(Node.class).toPrintable());
System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS + " Concurrency " + concurrency);
final SimpleQueue queue = new SimpleQueue(QUEUE_CAPACITY);
final long[] results = new long[20];
for (int i = 0; i < 20; i++) {
final int warmupRuns = 2;
final int runs = 5;
long average = 0;
final SimpleQueue queue = new SimpleQueue(QUEUE_CAPACITY);
average = averageRun(warmupRuns, runs, queue);
// SimpleQueue.INPROGRESS_SPINS = 64;
// SimpleQueue.POP_SPINS = 512;
// SimpleQueue.PUSH_SPINS = 512;
// SimpleQueue.PARK_SPINS = 512;
//
// for (int i = 128; i< 10000;i++) {
// int full = 2*i;
//
// final SimpleQueue queue = new SimpleQueue(QUEUE_CAPACITY);
// SimpleQueue.PARK_SPINS = full;
//
//
// long newAverage = averageRun(warmupRuns, runs, queue);
// if (newAverage > average) {
// average = newAverage;
// System.err.println("Best value: " + i + " : " + newAverage);
// }
// }
System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), average);
}
private static long averageRun(int warmUpRuns, int sumCount, SimpleQueue queue) throws Exception {
int runs = warmUpRuns + sumCount;
final long[] results = new long[runs];
for (int i = 0; i < runs; i++) {
System.gc();
results[i] = performanceRun(i, queue);
}
// only average last 10 results for summary
// only average last X results for summary
long sum = 0;
for (int i = 10; i < 20; i++) {
for (int i = warmUpRuns; i < runs; i++) {
sum += results[i];
}
System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), sum / 10);
return sum/sumCount;
}
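A worked example of the averaging above, using the values set in main (warmupRuns = 2, runs = 5):

// runs = 2 + 5 = 7, so performanceRun executes 7 times;
// results[0] and results[1] are treated as warm-up and discarded;
// the summary averages results[2]..results[6], i.e. sum / 5.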
private static long performanceRun(int runNumber, SimpleQueue queue) throws Exception {
@ -109,13 +129,9 @@ public class SimpleQueueAltPerfTest {
int i = REPETITIONS;
this.start = System.nanoTime();
try {
do {
producer.put(TEST_VALUE);
} while (0 != --i);
} catch (InterruptedException e) {
e.printStackTrace();
}
do {
producer.transfer(TEST_VALUE);
} while (0 != --i);
}
}
@ -134,13 +150,9 @@ public class SimpleQueueAltPerfTest {
Object result = null;
int i = REPETITIONS;
try {
do {
result = consumer.take();
} while (0 != --i);
} catch (InterruptedException e) {
e.printStackTrace();
}
do {
result = consumer.take();
} while (0 != --i);
this.result = result;
this.end = System.nanoTime();