back to custom MPMC queue. wip locking

This commit is contained in:
nathan 2015-04-21 22:50:39 +02:00
parent b4e012c87c
commit 8b23c992e1
4 changed files with 99 additions and 77 deletions

View File

@ -1,46 +1,48 @@
package dorkbox.util.messagebus.common.simpleq;
import static dorkbox.util.messagebus.common.simpleq.jctools.UnsafeAccess.UNSAFE;
import java.util.concurrent.atomic.AtomicReference;
import dorkbox.util.messagebus.common.simpleq.jctools.MpmcArrayQueueConsumerField;
import dorkbox.util.messagebus.common.simpleq.jctools.Pow2;
public final class MpmcExchangerQueue extends MpmcArrayQueueConsumerField<Node> {
private final static long THREAD;
private static final long TYPE_OFFSET;
// private final static long THREAD;
// private static final long TYPE_OFFSET;
private static final long ITEM1_OFFSET;
static {
try {
TYPE_OFFSET = UNSAFE.objectFieldOffset(Node.class.getField("nodeType"));
// TYPE_OFFSET = UNSAFE.objectFieldOffset(Node.class.getField("isConsumer"));
ITEM1_OFFSET = UNSAFE.objectFieldOffset(Node.class.getField("item1"));
THREAD = UNSAFE.objectFieldOffset(Node.class.getField("thread"));
// THREAD = UNSAFE.objectFieldOffset(Node.class.getField("thread"));
} catch (NoSuchFieldException e) {
throw new RuntimeException(e);
}
}
private static final void setMessage1(Object node, Object item) {
private static final void spItem1(Object node, Object item) {
UNSAFE.putObject(node, ITEM1_OFFSET, item);
}
private static final Object getMessage1(Object node) {
private static final Object lpItem1(Object node) {
return UNSAFE.getObject(node, ITEM1_OFFSET);
}
public final Thread get() {
return (Thread) UNSAFE.getObject(this, THREAD);
}
public final void set(Thread newValue) {
UNSAFE.putOrderedObject(this, THREAD, newValue);
}
protected final boolean compareAndSet(Object expect, Object newValue) {
return UNSAFE.compareAndSwapObject(this, THREAD, expect, newValue);
}
// public final Thread get() {
// return (Thread) UNSAFE.getObject(this, THREAD);
// }
//
//
// public final void set(Thread newValue) {
// UNSAFE.putOrderedObject(this, THREAD, newValue);
// }
//
// protected final boolean compareAndSet(Object expect, Object newValue) {
// return UNSAFE.compareAndSwapObject(this, THREAD, expect, newValue);
// }
/** The number of CPUs */
private static final int NCPU = Runtime.getRuntime().availableProcessors();
@ -52,7 +54,29 @@ public final class MpmcExchangerQueue extends MpmcArrayQueueConsumerField<Node>
* resources. Seen differently, this value is a little over half the number of cycles of an average context switch time on most systems.
* The value here is approximately the average of those across a range of tested systems.
*/
private static final int SPINS = NCPU == 1 ? 0 : 600; // orig: 2000
private static final int SPINS = NCPU == 1 ? 0 : 512; // orig: 2000
/** The number of slots in the elimination array. */
private static final int ARENA_LENGTH = Pow2.roundToPowerOfTwo((NCPU + 1) / 2);
/** The mask value for indexing into the arena. */
private static int ARENA_MASK = ARENA_LENGTH - 1;
/** The number of times to step ahead, probe, and try to match. */
private static final int LOOKAHEAD = Math.min(4, NCPU);
/** The number of times to spin per lookahead step */
private static final int SPINS_PER_STEP = SPINS / LOOKAHEAD;
/** A marker indicating that the arena slot is free. */
private static final Object FREE = null;
/** A marker indicating that a thread is waiting in that slot to be transfered an element. */
private static final Object WAITER = new Object();
/** The arena where slots can be used to perform an exchange. */
private final AtomicReference<Object>[] arena;
long p40, p41, p42, p43, p44, p45, p46;
@ -73,19 +97,25 @@ public final class MpmcExchangerQueue extends MpmcArrayQueueConsumerField<Node>
final long elementOffset = calcElementOffset(currentProducerIndex, mask);
soElement(elementOffset, new Node());
}
this.arena = new PaddedAtomicReference[ARENA_LENGTH];
for (int i = 0; i < ARENA_LENGTH; i++) {
this.arena[i] = new PaddedAtomicReference<Object>();
}
}
/**
* PRODUCER
*/
public void put(Object item) {
final Thread thread = Thread.currentThread();
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
final long capacity = this.mask + 1;
final long[] sBuffer = this.sequenceBuffer;
long currentConsumerIndex;
// long currentConsumerIndex;
long currentProducerIndex;
long pSeqOffset;
long cIndex = Long.MAX_VALUE;// start with bogus value, hope we don't need it
@ -95,7 +125,7 @@ public final class MpmcExchangerQueue extends MpmcArrayQueueConsumerField<Node>
// Loading consumer before producer allows for producer increments after consumer index is read.
// This ensures this method is conservative in it's estimate. Note that as this is an MPMC there is
// nothing we can do to make this an exact method.
currentConsumerIndex = lvConsumerIndex(); // LoadLoad
// currentConsumerIndex = lvConsumerIndex(); // LoadLoad
currentProducerIndex = lvProducerIndex(); // LoadLoad
pSeqOffset = calcSequenceOffset(currentProducerIndex, mask);
@ -109,9 +139,8 @@ public final class MpmcExchangerQueue extends MpmcArrayQueueConsumerField<Node>
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(currentProducerIndex, mask);
Object lpElement = lpElement(offset);
setMessage1(lpElement, item);
// spElement(offset, item);
Object lpElement = lpElementNoCast(offset);
spItem1(lpElement, item);
// increment sequence by 1, the value expected by consumer
// (seeing this value from a producer will lead to retry 2)
@ -125,21 +154,21 @@ public final class MpmcExchangerQueue extends MpmcArrayQueueConsumerField<Node>
currentProducerIndex - capacity <= (cIndex = lvConsumerIndex())) { // test against latest cIndex
// Extra check required to ensure [Queue.offer == false iff queue is full]
// return null;
busySpin();
}
// another producer has moved the sequence by one, retry 2
// only producer will busySpin if contention
// busySpin();
busySpin();
}
}
/**
* CONSUMER
* @return null iff empty
*/
public Object take() {
final Thread thread = Thread.currentThread();
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
final long[] sBuffer = this.sequenceBuffer;
@ -168,8 +197,8 @@ public final class MpmcExchangerQueue extends MpmcArrayQueueConsumerField<Node>
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(currentConsumerIndex, mask);
final Object e = lpElement(offset);
Object item = getMessage1(e);
final Object e = lpElementNoCast(offset);
Object item = lpItem1(e);
// Move sequence ahead by capacity, preparing it for next offer
// (seeing this value from a consumer will lead to retry 2)
@ -184,7 +213,7 @@ public final class MpmcExchangerQueue extends MpmcArrayQueueConsumerField<Node>
// strict empty check, this ensures [Queue.poll() == null iff isEmpty()]
// return null;
// contention. we WILL have data in the Q, we just got to it too quickly
// contention.
busySpin();
}

View File

@ -6,15 +6,15 @@ package dorkbox.util.messagebus.common.simpleq;
abstract class PrePad {
// volatile long y0, y1, y2, y4, y5, y6 = 7L;
volatile long z0, z1, z2, z4, z5, z6 = 7L;
// volatile long z0, z1, z2, z4, z5, z6 = 7L;
}
abstract class ColdItems {
abstract class ColdItems extends PrePad {
// private static AtomicInteger count = new AtomicInteger();
// public final int ID = count.getAndIncrement();
// public short type = MessageType.ONE;
public transient volatile Object item1 = null;
public Object item1 = null;
// public Object item2 = null;
// public Object item3 = null;
// public Object[] item4 = null;
@ -22,35 +22,35 @@ abstract class ColdItems {
abstract class Pad0 extends ColdItems {
// volatile long y0, y1, y2, y4, y5, y6 = 7L;
volatile long z0, z1, z2, z4, z5, z6 = 7L;
// volatile long z0, z1, z2, z4, z5, z6 = 7L;
}
abstract class HotItem1 extends Pad0 {
public transient volatile boolean isConsumer = false;
// public transient volatile boolean isConsumer = false;
}
abstract class Pad1 extends HotItem1 {
// volatile long y0, y1, y2, y4, y5, y6 = 7L;
volatile long z0, z1, z2, z4, z5, z6 = 7L;
// volatile long z0, z1, z2, z4, z5, z6 = 7L;
}
abstract class HotItem2 extends Pad1 {
public transient volatile Thread thread;
// public transient volatile Thread thread;
}
abstract class Pad2 extends HotItem2 {
// volatile long y0, y1, y2, y4, y5, y6 = 7L;
volatile long z0, z1, z2, z4, z5, z6 = 7L;
// volatile long z0, z1, z2, z4, z5, z6 = 7L;
}
abstract class HotItem3 extends Pad2 {
public transient volatile Node next;
// public transient volatile Node next;
}
public class Node extends HotItem3 {
// post-padding
// volatile long y0, y1, y2, y4, y5, y6 = 7L;
volatile long z0, z1, z2, z4, z5, z6 = 7L;
// volatile long z0, z1, z2, z4, z5, z6 = 7L;
public Node() {
}

View File

@ -41,7 +41,7 @@ abstract class ConcurrentCircularArrayQueueL0Pad<E> extends AbstractQueue<E> imp
* @param <E>
*/
public abstract class ConcurrentCircularArrayQueue<E> extends ConcurrentCircularArrayQueueL0Pad<E> {
protected static final int SPARSE_SHIFT = Integer.getInteger("sparse.shift", 2);
protected static final int SPARSE_SHIFT = Integer.getInteger("sparse.shift", 0);
protected static final int BUFFER_PAD;
private static final long REF_ARRAY_BASE;
private static final int REF_ELEMENT_SHIFT;
@ -186,6 +186,10 @@ public abstract class ConcurrentCircularArrayQueue<E> extends ConcurrentCircular
return lpElement(this.buffer, offset);
}
protected final Object lpElementNoCast(long offset) {
return lpElementNoCast(this.buffer, offset);
}
/**
* A plain load (no ordering/fences) of an element from a given offset.
*
@ -198,6 +202,17 @@ public abstract class ConcurrentCircularArrayQueue<E> extends ConcurrentCircular
return (E) UNSAFE.getObject(buffer, offset);
}
/**
* A plain load (no ordering/fences) of an element from a given offset.
*
* @param buffer this.buffer
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @return the element at the offset
*/
protected final Object lpElementNoCast(E[] buffer, long offset) {
return UNSAFE.getObject(buffer, offset);
}
/**
* A plain load (no ordering/fences) of an element TYPE from a given offset.
*

View File

@ -18,12 +18,12 @@ package dorkbox.util.messagebus;
import org.openjdk.jol.info.ClassLayout;
import org.openjdk.jol.util.VMSupport;
import dorkbox.util.messagebus.common.simpleq.MpmcExchangerQueue;
import dorkbox.util.messagebus.common.simpleq.Node;
import dorkbox.util.messagebus.common.simpleq.jctools.MpmcArrayQueue;
public class MpmcQueueAltPerfTest {
// 15 == 32 * 1024
public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 1000;
public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 100;
public static final Integer TEST_VALUE = Integer.valueOf(777);
public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17);
@ -33,7 +33,7 @@ public class MpmcQueueAltPerfTest {
System.out.println(ClassLayout.parseClass(Node.class).toPrintable());
System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS);
final MpmcArrayQueue<Node> queue = new MpmcArrayQueue<Node>(QUEUE_CAPACITY);
final MpmcExchangerQueue queue = new MpmcExchangerQueue(QUEUE_CAPACITY);
final long[] results = new long[20];
for (int i = 0; i < 20; i++) {
@ -48,20 +48,16 @@ public class MpmcQueueAltPerfTest {
System.out.format("summary,QueuePerfTest,%s,%d\n", queue.getClass().getSimpleName(), sum / 10);
}
private static long performanceRun(int runNumber, MpmcArrayQueue<Node> queue) throws Exception {
private static long performanceRun(int runNumber, MpmcExchangerQueue queue) throws Exception {
Producer p = new Producer(queue);
Thread thread = new Thread(p);
thread.start(); // producer will timestamp start
MpmcArrayQueue<Node> consumer = queue;
Node result;
MpmcExchangerQueue consumer = queue;
Object result;
int i = REPETITIONS;
int queueEmpty = 0;
do {
while (null == (result = consumer.poll())) {
queueEmpty++;
Thread.yield();
}
result = consumer.take();
} while (0 != --i);
long end = System.nanoTime();
@ -69,45 +65,27 @@ public class MpmcQueueAltPerfTest {
long duration = end - p.start;
long ops = REPETITIONS * 1000L * 1000L * 1000L / duration;
String qName = queue.getClass().getSimpleName();
System.out.format("%d - ops/sec=%,d - %s result=%d failed.poll=%d failed.offer=%d\n", runNumber, ops,
qName, result.item1, queueEmpty, p.queueFull);
System.out.format("%d - ops/sec=%,d - %s result=%d\n", runNumber, ops, qName, result);
return ops;
}
@SuppressWarnings("rawtypes")
public static class Producer implements Runnable {
private final MpmcArrayQueue queue;
int queueFull = 0;
private final MpmcExchangerQueue queue;
long start;
public Producer(MpmcArrayQueue queue) {
public Producer(MpmcExchangerQueue queue) {
this.queue = queue;
}
@Override
public void run() {
MpmcArrayQueue producer = this.queue;
MpmcExchangerQueue producer = this.queue;
int i = REPETITIONS;
int f = 0;
long s = System.nanoTime();
MpmcArrayQueue<Node> pool = new MpmcArrayQueue<Node>(2);
pool.offer(new Node());
pool.offer(new Node());
Node node;
do {
node = pool.poll();
node.item1 = TEST_VALUE;
while (!producer.offer(node)) {
Thread.yield();
f++;
}
pool.offer(node);
producer.put(TEST_VALUE);
} while (0 != --i);
this.queueFull = f;
this.start = s;
}
}