Much better results. Almost the same as LTQ (LinkedTransferQueue), but with no heap objects

nathan 2015-05-03 22:43:35 +02:00
parent 8c01959080
commit 2705daa073
6 changed files with 346 additions and 285 deletions

View File

@ -7,11 +7,10 @@ import java.util.concurrent.TimeUnit;
import dorkbox.util.messagebus.common.DeadMessage;
import dorkbox.util.messagebus.common.ISetEntry;
import dorkbox.util.messagebus.common.LinkedTransferQueue;
import dorkbox.util.messagebus.common.NamedThreadFactory;
import dorkbox.util.messagebus.common.StrongConcurrentSetV8;
import dorkbox.util.messagebus.common.TransferQueue;
import dorkbox.util.messagebus.common.simpleq.jctools.Pow2;
import dorkbox.util.messagebus.common.simpleq.jctools.SimpleQueue;
import dorkbox.util.messagebus.error.IPublicationErrorHandler;
import dorkbox.util.messagebus.error.PublicationError;
import dorkbox.util.messagebus.subscription.Subscription;
@ -42,8 +41,8 @@ public class MultiMBassador implements IMessageBus {
// this handler will receive all errors that occur during message dispatch or message handling
private final Collection<IPublicationErrorHandler> errorHandlers = new ArrayDeque<IPublicationErrorHandler>();
private final TransferQueue<Runnable> dispatchQueue;
// private final SimpleQueue dispatchQueue;
// private final TransferQueue<Runnable> dispatchQueue;
private final SimpleQueue dispatchQueue;
private final SubscriptionManager subscriptionManager;
@ -88,8 +87,8 @@ public class MultiMBassador implements IMessageBus {
this.numberOfThreads = numberOfThreads;
this.dispatchQueue = new LinkedTransferQueue<Runnable>();
// this.dispatchQueue = new SimpleQueue(numberOfThreads);
// this.dispatchQueue = new LinkedTransferQueue<Runnable>();
this.dispatchQueue = new SimpleQueue(numberOfThreads);
this.subscriptionManager = new SubscriptionManager(numberOfThreads);
this.threads = new ArrayDeque<Thread>(numberOfThreads);
@ -100,8 +99,8 @@ public class MultiMBassador implements IMessageBus {
Runnable runnable = new Runnable() {
@Override
public void run() {
// SimpleQueue IN_QUEUE = MultiMBassador.this.dispatchQueue;
TransferQueue<Runnable> IN_QUEUE = MultiMBassador.this.dispatchQueue;
SimpleQueue IN_QUEUE = MultiMBassador.this.dispatchQueue;
// TransferQueue<Runnable> IN_QUEUE = MultiMBassador.this.dispatchQueue;
Object message1;
try {
@ -150,8 +149,8 @@ public class MultiMBassador implements IMessageBus {
@Override
public boolean hasPendingMessages() {
return this.dispatchQueue.getWaitingConsumerCount() != this.numberOfThreads;
// return this.dispatchQueue.hasPendingMessages();
// return this.dispatchQueue.getWaitingConsumerCount() != this.numberOfThreads;
return this.dispatchQueue.hasPendingMessages();
}
@Override
@ -422,8 +421,8 @@ public class MultiMBassador implements IMessageBus {
};
try {
this.dispatchQueue.transfer(runnable);
// this.dispatchQueue.put(message);
// this.dispatchQueue.transfer(runnable);
this.dispatchQueue.put(message);
} catch (InterruptedException e) {
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")

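For context, this is the shape of the dispatch loop after the swap: each worker thread now blocks on SimpleQueue.take() instead of TransferQueue.take(). A minimal sketch, assuming publish(Object) dispatches to matching subscriptions; the loop body is an illustration, not the exact code of this commit.

    Runnable runnable = new Runnable() {
        @Override
        public void run() {
            final SimpleQueue IN_QUEUE = MultiMBassador.this.dispatchQueue;
            try {
                while (true) {
                    Object message = IN_QUEUE.take(); // parks until a producer hands off a message
                    publish(message);                 // assumed: dispatch to subscriptions
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();   // dispatch thread shuts down
            }
        }
    };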
View File

@ -7,7 +7,7 @@ package dorkbox.util.messagebus.common.simpleq;
abstract class PrePad {
// volatile long y0, y1, y2, y4, y5, y6 = 7L;
// volatile long z0, z1, z2, z4, z5, z6 = 7L;
volatile long z0, z1, z2, z4, z5, z6 = 7L;
}
abstract class ColdItems extends PrePad {

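The PrePad padding above exists to prevent false sharing: without it, the hot fields of ColdItems could land on the same cache line as a neighboring object, and every write would invalidate the other core's copy of that line. A minimal sketch of the same idea, assuming 64-byte cache lines; the class and field names are illustrative, not the ones in this commit.

    // Seven longs (56 bytes) on each side keep the hot field on its own
    // 64-byte cache line, so unrelated writes cannot invalidate it.
    abstract class LeftPad { volatile long p0, p1, p2, p3, p4, p5, p6; }
    abstract class Hot extends LeftPad { volatile long hotField; }
    final class Padded extends Hot { volatile long q0, q1, q2, q3, q4, q5, q6; }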
View File

@ -1,7 +1,6 @@
package dorkbox.util.messagebus.common.simpleq.jctools;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.LockSupport;
import dorkbox.util.messagebus.common.simpleq.Node;
@ -35,67 +34,6 @@ public final class MpmcArrayTransferQueue extends MpmcArrayQueueConsumerField<No
/** Creates an {@code MpmcArrayTransferQueue} that is initially empty. */
public MpmcArrayTransferQueue(final int size) {
super(size);
//
// // pre-fill our data structures
//
// // local load of field to avoid repeated loads after volatile reads
// final long mask = this.mask;
// long currentProducerIndex;
//
// for (currentProducerIndex = 0; currentProducerIndex < size; currentProducerIndex++) {
// // on 64bit(no compressed oops) JVM this is the same as seqOffset
// final long elementOffset = calcElementOffset(currentProducerIndex, mask);
// soElement(elementOffset, new Node());
// }
}
/**
* Attempt to place an element into the queue at the given, exact producer index.
* @param producerIndex the producer index to claim for this element
* @param item the element to store
* @param timed true if the operation is bounded by a timeout
* @param nanos the timeout in nanoseconds, used only when timed
* @return true if the element was stored at the claimed index; false if the CAS lost and the caller must retry
*/
public boolean putExact(long producerIndex, final Object item, final boolean timed, final long nanos) {
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
// final long capacity = mask + 1;
final long[] sBuffer = this.sequenceBuffer;
long pSeqOffset;
// long consumerIndex = Long.MAX_VALUE;// start with bogus value, hope we don't need it
pSeqOffset = calcSequenceOffset(producerIndex, mask);
final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad
final long delta = seq - producerIndex;
if (delta == 0) {
// this is expected if we see this first time around
if (casProducerIndex(producerIndex, producerIndex + 1)) {
// Successful CAS: full barrier
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(producerIndex, mask);
spElement(offset, item);
// increment sequence by 1, the value expected by consumer
// (seeing this value from a producer will lead to retry 2)
soSequence(sBuffer, pSeqOffset, producerIndex + 1); // StoreStore
return true;
}
// failed cas, retry 1
// } else if (delta < 0 && // poll has not moved this value forward
// producerIndex - capacity <= consumerIndex && // test against cached cIndex
// producerIndex - capacity <= (consumerIndex = lvConsumerIndex())) { // test against latest cIndex
//
// // Extra check required to ensure [Queue.offer == false iff queue is full]
// return false;
}
return false;
}
/**
@ -151,45 +89,6 @@ public final class MpmcArrayTransferQueue extends MpmcArrayQueueConsumerField<No
}
}
public Object takeExact(long consumerIndex, final boolean timed, final long nanos) {
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
final long[] sBuffer = this.sequenceBuffer;
long cSeqOffset;
// long producerIndex = -1; // start with bogus value, hope we don't need it
cSeqOffset = calcSequenceOffset(consumerIndex, mask);
final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad
final long delta = seq - (consumerIndex + 1);
if (delta == 0) {
if (casConsumerIndex(consumerIndex, consumerIndex + 1)) {
// Successful CAS: full barrier
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(consumerIndex, mask);
final Object e = lpElementNoCast(offset);
spElement(offset, null);
// Move sequence ahead by capacity, preparing it for next offer
// (seeing this value from a consumer will lead to retry 2)
soSequence(sBuffer, cSeqOffset, consumerIndex + mask + 1); // StoreStore
return e;
}
// failed cas, retry 1
// } else if (delta < 0 && // slot has not been moved by producer
// consumerIndex >= producerIndex && // test against cached pIndex
// consumerIndex == (producerIndex = lvProducerIndex())) { // update pIndex if we must
// // strict empty check, this ensures [Queue.poll() == null iff isEmpty()]
// return null;
}
// another consumer beat us and moved sequence ahead, retry 2
return null;
}
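Both putExact and takeExact follow the JCTools sequenced-array protocol: every slot carries a sequence number, a producer may claim slot i only while its sequence equals i, and a consumer may read it only once the sequence reaches i + 1. A self-contained model of that handshake, with AtomicLongArray standing in for the Unsafe-backed sequenceBuffer; the names are illustrative and the timed paths are omitted.

    import java.util.concurrent.atomic.AtomicLong;
    import java.util.concurrent.atomic.AtomicLongArray;

    final class SeqBufferModel {
        final int mask;
        final Object[] buffer;
        final AtomicLongArray seq;                        // stands in for sequenceBuffer
        final AtomicLong producerIndex = new AtomicLong();
        final AtomicLong consumerIndex = new AtomicLong();

        SeqBufferModel(int capacityPow2) {
            this.mask = capacityPow2 - 1;
            this.buffer = new Object[capacityPow2];
            this.seq = new AtomicLongArray(capacityPow2);
            for (int i = 0; i < capacityPow2; i++) {
                seq.set(i, i);                            // slot i first expects producer index i
            }
        }

        boolean offerAt(long pIndex, Object item) {       // models putExact
            int s = (int) (pIndex & this.mask);
            if (seq.get(s) != pIndex) return false;       // slot not yet recycled for this lap
            if (!producerIndex.compareAndSet(pIndex, pIndex + 1)) return false; // lost the CAS, caller retries
            buffer[s] = item;
            seq.lazySet(s, pIndex + 1);                   // publish: consumer expects pIndex + 1
            return true;
        }

        Object pollAt(long cIndex) {                      // models takeExact
            int s = (int) (cIndex & this.mask);
            if (seq.get(s) != cIndex + 1) return null;    // producer has not published yet
            if (!consumerIndex.compareAndSet(cIndex, cIndex + 1)) return null; // lost the CAS, caller retries
            Object e = buffer[s];
            buffer[s] = null;
            seq.lazySet(s, cIndex + this.mask + 1);       // recycle the slot for the next lap
            return e;
        }
    }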
public Object take(final boolean timed, final long nanos) {
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
@ -494,18 +393,19 @@ public final class MpmcArrayTransferQueue extends MpmcArrayQueueConsumerField<No
// }
private static final void busySpin() {
ThreadLocalRandom randomYields = null; // bound if needed
randomYields = ThreadLocalRandom.current();
ThreadLocalRandom randomYields = ThreadLocalRandom.current();
// busy spin for the amount of time (roughly) of a CPU context switch
int spins = spinsFor();
// int spins = 512;
// int spins = spinsFor();
int spins = CHAINED_SPINS;
for (;;) {
if (spins > 0) {
--spins;
if (randomYields.nextInt(CHAINED_SPINS) == 0) {
LockSupport.parkNanos(1); // occasionally yield
// LockSupport.parkNanos(1); // occasionally yield
// Thread.yield();
break;
}
--spins;
} else {
break;
}

View File

@ -2,11 +2,12 @@ package dorkbox.util.messagebus.common.simpleq.jctools;
import static dorkbox.util.messagebus.common.simpleq.jctools.UnsafeAccess.UNSAFE;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import dorkbox.util.messagebus.common.simpleq.Node;
public final class SimpleQueue {
public final class SimpleQueue extends MpmcArrayQueueConsumerField<Node> {
public static final int TYPE_EMPTY = 0;
public static final int TYPE_CONSUMER = 1;
public static final int TYPE_PRODUCER = 2;
@ -46,10 +47,6 @@ public final class SimpleQueue {
UNSAFE.putInt(node, TYPE, type);
}
private static final void soType(Object node, int type) {
UNSAFE.putOrderedInt(node, TYPE, type);
}
private static final int lpType(Object node) {
return UNSAFE.getInt(node, TYPE);
}
@ -70,9 +67,27 @@ public final class SimpleQueue {
return UNSAFE.getObjectVolatile(node, THREAD);
}
private static final boolean casThread(Object node, Object expect, Object newValue) {
return UNSAFE.compareAndSwapObject(node, THREAD, expect, newValue);
}
/** True if this machine has more than one CPU (multiprocessor) */
private static final boolean MP = Runtime.getRuntime().availableProcessors() > 1;
/**
* The number of times to spin (with randomly interspersed calls
* to Thread.yield) on multiprocessor before blocking when a node
* is apparently the first waiter in the queue. See above for
* explanation. Must be a power of two. The value is empirically
* derived -- it works pretty well across a variety of processors,
* numbers of CPUs, and OSes.
*/
private static final int FRONT_SPINS = 1 << 7;
/**
* The number of times to spin before blocking when a node is
* preceded by another node that is apparently spinning. Also
* serves as an increment to FRONT_SPINS on phase changes, and as
* base average frequency for yielding during spins. Must be a
* power of two.
*/
private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
/** The number of CPUs */
@ -85,7 +100,7 @@ public final class SimpleQueue {
* resources. Seen differently, this value is a little over half the number of cycles of an average context switch time on most systems.
* The value here is approximately the average of those across a range of tested systems.
*/
private static final int SPINS = NCPU == 1 ? 0 : 512; // orig: 2000
private static final int SPINS = MP ? 512 : 0; // orig: 2000
/**
* The number of times to spin before blocking in timed waits.
@ -101,7 +116,7 @@ public final class SimpleQueue {
* This is greater than timed value because untimed waits spin
* faster since they don't need to check times on each spin.
*/
static final int maxUntimedSpins = maxTimedSpins * 32;
static final int maxUntimedSpins = maxTimedSpins * 16;
static final int negMaxUntimedSpins = -maxUntimedSpins;
/**
@ -109,201 +124,255 @@ public final class SimpleQueue {
* rather than to use timed park. A rough estimate suffices.
*/
static final long spinForTimeoutThreshold = 1000L;
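spinForTimeoutThreshold is the cutoff below which a timed park costs more than it saves. A minimal sketch of how such a constant is typically applied; the helper name is hypothetical.

    static void awaitDeadline(long deadlineNanos) {
        while (true) {
            long remaining = deadlineNanos - System.nanoTime();
            if (remaining <= 0) {
                return;                                   // timed out
            }
            if (remaining >= spinForTimeoutThreshold) {
                java.util.concurrent.locks.LockSupport.parkNanos(remaining);
            }
            // otherwise: under the threshold, just keep spinning on the clock
        }
    }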
private MpmcArrayTransferQueue queue;
private MpmcArrayTransferQueue pool;
private int size;
public SimpleQueue(final int size) {
int roundToPowerOfTwo = Pow2.roundToPowerOfTwo(size);
this.queue = new MpmcArrayTransferQueue(roundToPowerOfTwo);
this.pool = new MpmcArrayTransferQueue(roundToPowerOfTwo);
for (int i=0;i<roundToPowerOfTwo;i++) {
this.pool.put(new Node(), false, 0);
}
super(1 << 17);
this.size = size;
}
private final static ThreadLocal<Object> nodeThreadLocal = new ThreadLocal<Object>() {
@Override
protected Object initialValue() {
return new Node();
}
};
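This ThreadLocal is the "no heap objects" of the commit message: instead of allocating a Node per transfer, or renting one from the removed MpmcArrayTransferQueue pool, each thread re-stamps a single scratch node. A sketch of the reuse pattern; the comment states the assumption that makes it safe.

    // Safe only because a transfer fully completes (the counterpart reads the
    // item/thread fields and unparks us) before put()/take() returns, so the
    // queue never holds a reference to the scratch node across calls.
    final Object node = nodeThreadLocal.get();            // no allocation on the hot path
    spType(node, TYPE_PRODUCER);                          // re-stamp for this operation
    spThread(node, Thread.currentThread());
    spItem1(node, item);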
/**
* PRODUCER
*/
public void put(Object item) throws InterruptedException {
final MpmcArrayTransferQueue queue = this.queue;
final MpmcArrayTransferQueue pool = this.pool;
final Thread myThread = Thread.currentThread();
Object node = null;
// local load of field to avoid repeated loads after volatile reads
final long mask = queue.mask;
final long[] sBuffer = queue.sequenceBuffer;
final long mask = this.mask;
final long[] sBuffer = this.sequenceBuffer;
long cSeqOffset;
long currConsumerIndex;
long currProducerIndex;
long consumerIndex;
long producerIndex;
int lastType;
while (true) {
currConsumerIndex = queue.lvConsumerIndex();
currProducerIndex = queue.lvProducerIndex();
consumerIndex = lvConsumerIndex();
producerIndex = lvProducerIndex();
if (currConsumerIndex == currProducerIndex) {
final Object previousElement;
if (consumerIndex == producerIndex) {
lastType = TYPE_EMPTY;
previousElement = null;
} else {
cSeqOffset = ConcurrentSequencedCircularArrayQueue.calcSequenceOffset(currConsumerIndex, mask);
final long seq = queue.lvSequence(sBuffer, cSeqOffset); // LoadLoad
final long delta = seq - (currConsumerIndex + 1);
if (delta == 0) {
final Object lpElementNoCast = queue.lpElementNoCast(queue.calcElementOffset(currConsumerIndex));
if (lpElementNoCast == null) {
continue;
}
lastType = lpType(lpElementNoCast);
} else {
previousElement = lpElementNoCast(calcElementOffset(producerIndex-1));
if (previousElement == null) {
// the last producer hasn't finished setting the object yet
busySpin_InProgress();
continue;
}
lastType = lpType(previousElement);
}
switch (lastType) {
case TYPE_EMPTY:
case TYPE_PRODUCER: {
// empty or same mode = push+park onto queue
if (node == null) {
node = pool.take(false, 0);
long pSeqOffset = calcSequenceOffset(producerIndex, mask);
final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad
final long delta = seq - producerIndex;
spType(node, TYPE_PRODUCER);
spThread(node, myThread);
spItem1(node, item);
if (delta == 0) {
// this is expected if we see this first time around
if (casProducerIndex(producerIndex, producerIndex + 1)) {
// Successful CAS: full barrier
final Thread myThread = Thread.currentThread();
final Object node = nodeThreadLocal.get();
spType(node, TYPE_PRODUCER);
spThread(node, myThread);
spItem1(node, item);
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(producerIndex, mask);
spElement(offset, node);
// increment sequence by 1, the value expected by consumer
// (seeing this value from a producer will lead to retry 2)
soSequence(sBuffer, pSeqOffset, producerIndex + 1); // StoreStore
park(node, myThread, false, 0);
return;
}
}
if (!queue.putExact(currProducerIndex, node, false, 0)) {
// whoops, inconsistent state
// busySpin2();
continue;
}
park(node, myThread, false, 0);
return;
// whoops, inconsistent state
busySpin_pushConflict();
continue;
}
case TYPE_CONSUMER: {
// complementary mode = pop+unpark off queue
if (node != null) {
pool.put(node, false, 0);
}
// complementary mode = pop+unpark off queue
long cSeqOffset = calcSequenceOffset(consumerIndex, mask);
final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad
final long delta = seq - (consumerIndex + 1);
node = queue.takeExact(currConsumerIndex, false, 0);
if (node == null) {
// whoops, inconsistent state
// busySpin2();
continue;
}
if (delta == 0) {
if (casConsumerIndex(consumerIndex, consumerIndex + 1)) {
// Successful CAS: full barrier
soItem1(node, item);
unpark(node);
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(consumerIndex, mask);
final Object e = lpElementNoCast(offset);
spElement(offset, null);
pool.put(node, false, 0);
return;
// Move sequence ahead by capacity, preparing it for next offer
// (seeing this value from a consumer will lead to retry 2)
soSequence(sBuffer, cSeqOffset, consumerIndex + mask + 1); // StoreStore
soItem1(e, item);
unpark(e);
return;
}
}
// whoops, inconsistent state
busySpin_popConflict();
continue;
}
}
}
}
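The producer path above reduces to a three-way decision on the type of the newest queued node. A compact restatement of the control flow; the helper names are hypothetical, and the real code inlines the sequence-buffer protocol shown earlier instead.

    while (true) {
        int lastType = typeOfNewestNode();                // TYPE_EMPTY when cIdx == pIdx
        switch (lastType) {
            case TYPE_EMPTY:
            case TYPE_PRODUCER:
                // same mode (or empty): enqueue our node, then park until matched
                if (enqueueProducerNode(item)) { parkUntilTaken(); return; }
                break;                                    // lost the CAS: spin, then retry
            case TYPE_CONSUMER:
                // complementary mode: hand the item straight to a parked consumer
                if (fillAndUnparkConsumer(item)) { return; }
                break;                                    // lost the CAS: spin, then retry
        }
    }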
/**
* CONSUMER
*/
public Object take() throws InterruptedException {
final MpmcArrayTransferQueue queue = this.queue;
final MpmcArrayTransferQueue pool = this.pool;
final Thread myThread = Thread.currentThread();
Object node = null;
// local load of field to avoid repeated loads after volatile reads
final long mask = queue.mask;
final long[] sBuffer = queue.sequenceBuffer;
final long mask = this.mask;
final long[] sBuffer = this.sequenceBuffer;
long cSeqOffset;
long currConsumerIndex;
long currProducerIndex;
long consumerIndex;
long producerIndex;
int lastType;
while (true) {
currConsumerIndex = queue.lvConsumerIndex();
currProducerIndex = queue.lvProducerIndex();
consumerIndex = lvConsumerIndex();
producerIndex = lvProducerIndex();
if (currConsumerIndex == currProducerIndex) {
final Object previousElement;
if (consumerIndex == producerIndex) {
lastType = TYPE_EMPTY;
previousElement = null;
} else {
cSeqOffset = ConcurrentSequencedCircularArrayQueue.calcSequenceOffset(currConsumerIndex, mask);
final long seq = queue.lvSequence(sBuffer, cSeqOffset); // LoadLoad
final long delta = seq - (currConsumerIndex + 1);
if (delta == 0) {
final Object lpElementNoCast = queue.lpElementNoCast(queue.calcElementOffset(currConsumerIndex));
if (lpElementNoCast == null) {
continue;
}
lastType = lpType(lpElementNoCast);
} else {
previousElement = lpElementNoCast(calcElementOffset(producerIndex-1));
if (previousElement == null) {
// the last producer hasn't finished setting the object yet
busySpin_InProgress();
continue;
}
lastType = lpType(previousElement);
}
switch (lastType) {
case TYPE_EMPTY:
case TYPE_CONSUMER:
{
case TYPE_CONSUMER: {
// empty or same mode = push+park onto queue
if (node == null) {
node = pool.take(false, 0);
long pSeqOffset = calcSequenceOffset(producerIndex, mask);
final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad
final long delta = seq - producerIndex;
spType(node, TYPE_CONSUMER);
spThread(node, myThread);
if (delta == 0) {
// this is expected if we see this first time around
if (casProducerIndex(producerIndex, producerIndex + 1)) {
// Successful CAS: full barrier
final Thread myThread = Thread.currentThread();
final Object node = nodeThreadLocal.get();
spType(node, TYPE_CONSUMER);
spThread(node, myThread);
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(producerIndex, mask);
spElement(offset, node);
// increment sequence by 1, the value expected by consumer
// (seeing this value from a producer will lead to retry 2)
soSequence(sBuffer, pSeqOffset, producerIndex + 1); // StoreStore
park(node, myThread, false, 0);
Object item1 = lvItem1(node);
return item1;
}
}
if (!queue.putExact(currProducerIndex, node, false, 0)) {
// whoops, inconsistent state
// busySpin2();
continue;
}
park(node, myThread, false, 0);
Object item1 = lvItem1(node);
return item1;
// whoops, inconsistent state
busySpin_pushConflict();
continue;
}
case TYPE_PRODUCER: {
// complementary mode = pop+unpark off queue
if (node != null) {
pool.put(node, false, 0);
long cSeqOffset = calcSequenceOffset(consumerIndex, mask);
final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad
final long delta = seq - (consumerIndex + 1);
if (delta == 0) {
if (casConsumerIndex(consumerIndex, consumerIndex + 1)) {
// Successful CAS: full barrier
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(consumerIndex, mask);
final Object e = lpElementNoCast(offset);
spElement(offset, null);
// Move sequence ahead by capacity, preparing it for next offer
// (seeing this value from a consumer will lead to retry 2)
soSequence(sBuffer, cSeqOffset, consumerIndex + mask + 1); // StoreStore
final Object lvItem1 = lpItem1(e);
unpark(e);
return lvItem1;
}
}
node = queue.takeExact(currConsumerIndex, false, 0);
if (node == null) {
// whoops, inconsistent state
// busySpin2();
continue;
}
final Object lvItem1 = lpItem1(node);
unpark(node);
pool.put(node, false, 0);
return lvItem1;
// whoops, inconsistent state
busySpin_popConflict();
continue;
}
}
}
}
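Taken together, put() and take() behave like a hand-off exchanger: whichever side arrives first parks, and the other side completes the transfer and unparks it. A minimal usage sketch (Java 8 syntax for brevity):

    public static void main(String[] args) throws Exception {
        SimpleQueue queue = new SimpleQueue(2);
        Thread consumer = new Thread(() -> {
            try {
                System.out.println("got " + queue.take()); // parks until a producer arrives
            } catch (InterruptedException ignored) {
            }
        });
        consumer.start();
        queue.put("hello");                                // matches the parked consumer, unparks it
        consumer.join();
    }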
private static final void busySpin2() {
/**
* Spin while the current thread waits for the item to be set: the producer index has been
* incremented, but the item itself is not visible yet.
*/
private static final void busySpin_InProgress() {
// ThreadLocalRandom randomYields = ThreadLocalRandom.current();
//
// if (randomYields.nextInt(1) != 0) {
////// LockSupport.parkNanos(1); // occasionally yield
// Thread.yield();
////// break;
// }
// busy spin for the amount of time (roughly) of a CPU context switch
int spins = maxUntimedSpins;
int spins = 128;
for (;;) {
if (spins > 0) {
// if (randomYields.nextInt(CHAINED_SPINS) == 0) {
//// LockSupport.parkNanos(1); // occasionally yield
// Thread.yield();
//// break;
// }
--spins;
} else {
break;
@ -311,12 +380,75 @@ public final class SimpleQueue {
}
}
private static final void busySpin() {
private static final void busySpin_pushConflict() {
// ThreadLocalRandom randomYields = ThreadLocalRandom.current();
// busy spin for the amount of time (roughly) of a CPU context switch
int spins = SPINS;
int spins = 256;
for (;;) {
if (spins > 0) {
// if (randomYields.nextInt(1) != 0) {
////// LockSupport.parkNanos(1); // occasionally yield
// Thread.yield();
//// break;
// }
--spins;
} else {
break;
}
}
}
private static final void busySpin_popConflict() {
// ThreadLocalRandom randomYields = ThreadLocalRandom.current();
// busy spin for the amount of time (roughly) of a CPU context switch
int spins = 64;
for (;;) {
if (spins > 0) {
// if (randomYields.nextInt(1) != 0) {
////// LockSupport.parkNanos(1); // occasionally yield
// Thread.yield();
//// break;
// }
--spins;
} else {
break;
}
}
}
private static final void busySpin2() {
ThreadLocalRandom randomYields = ThreadLocalRandom.current();
// busy spin for the amount of time (roughly) of a CPU context switch
int spins = 64;
for (;;) {
if (spins > 0) {
if (randomYields.nextInt(1) != 0) { // note: nextInt(1) is always 0, so this branch never yields
//// LockSupport.parkNanos(1); // occasionally yield
Thread.yield();
// break;
}
--spins;
} else {
break;
}
}
}
private static final void busySpin(ThreadLocalRandom random) {
// busy spin for the amount of time (roughly) of a CPU context switch
// int spins = spinsFor();
int spins = 128;
for (;;) {
if (spins > 0) {
--spins;
if (random.nextInt(CHAINED_SPINS) == 0) {
//// LockSupport.parkNanos(1); // occasionally yield
// Thread.yield();
break;
}
} else {
break;
}
@ -335,7 +467,19 @@ public final class SimpleQueue {
public boolean hasPendingMessages() {
// count the number of consumers waiting; it should equal the number of threads configured
// return this.consumersWaiting.size() == this.numberConsumerThreads;
return false;
// return false;
long consumerIndex = lvConsumerIndex();
long producerIndex = lvProducerIndex();
if (consumerIndex != producerIndex) {
final Object previousElement = lpElementNoCast(calcElementOffset(producerIndex-1));
if (previousElement != null && lpType(previousElement) == TYPE_CONSUMER && consumerIndex + this.size == producerIndex) {
return false;
}
}
return true;
}
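Read as a predicate: the bus is idle only when the dual queue holds exactly size nodes and the newest of them is a parked consumer, i.e. every dispatch thread is queued and waiting. An equivalent restatement with the branches flattened; names as in the method above.

    boolean idle = consumerIndex != producerIndex          // some nodes are queued...
            && previousElement != null
            && lpType(previousElement) == TYPE_CONSUMER    // ...the newest is a waiting consumer...
            && consumerIndex + this.size == producerIndex; // ...and all dispatch threads are queued
    return !idle;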
public void tryTransfer(Runnable runnable, long timeout, TimeUnit unit) throws InterruptedException {
@ -343,12 +487,13 @@ public final class SimpleQueue {
}
public final void park(final Object node, final Thread myThread, final boolean timed, final long nanos) throws InterruptedException {
// if (casThread(node, null, myThread)) {
// we won against the other thread
ThreadLocalRandom randomYields = null; // bound if needed
// long lastTime = timed ? System.nanoTime() : 0;
// int spins = timed ? maxTimedSpins : maxUntimedSpins;
int spins = maxTimedSpins;
// int spins = maxTimedSpins;
int spins = 51200;
// if (timed) {
// long now = System.nanoTime();
@ -360,36 +505,53 @@ public final class SimpleQueue {
// }
// }
// busy spin for the amount of time (roughly) of a CPU context switch
// then park (if necessary)
for (;;) {
if (lpThread(node) == null) {
return;
} else if (spins > 0) {
--spins;
// } else if (spins > negMaxUntimedSpins) {
// --spins;
// UNSAFE.park(false, 1L);
} else {
// park can return for NO REASON. Subsequent loops will hit this if it has not been ACTUALLY unlocked.
UNSAFE.park(false, 0L);
if (myThread.isInterrupted()) {
// casThread(node, myThread, null);
Thread.interrupted();
throw new InterruptedException();
}
}
for (;;) {
if (lvThread(node) == null) {
return;
} else if (spins > 0) {
// if (randomYields == null) {
// randomYields = ThreadLocalRandom.current();
// } else if (randomYields.nextInt(spins) == 0) {
// Thread.yield(); // occasionally yield
// }
--spins;
} else if (myThread.isInterrupted()) {
Thread.interrupted();
throw new InterruptedException();
} else {
// park can return for NO REASON (must check for thread values)
UNSAFE.park(false, 0L);
}
// }
}
}
public void unpark(Object node) {
final Object thread = lpThread(node);
soThread(node, null);
UNSAFE.unpark(thread);
}
// if (thread != null && casThread(node, thread, Thread.currentThread())) {
// }
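park() above tolerates spurious returns by re-checking the node's thread slot on every loop, and unpark() clears that slot before unparking so the check terminates. A self-contained model of the handshake, with AtomicReference standing in for the Unsafe-written THREAD field.

    import java.util.concurrent.atomic.AtomicReference;
    import java.util.concurrent.locks.LockSupport;

    final class HandoffSlot {
        private final AtomicReference<Thread> waiter = new AtomicReference<Thread>();

        void await(int spins) throws InterruptedException {
            waiter.set(Thread.currentThread());           // publish ourselves, like spThread(node, myThread)
            while (waiter.get() != null) {
                if (spins > 0) {
                    --spins;                              // brief busy spin before parking
                } else if (Thread.interrupted()) {
                    throw new InterruptedException();
                } else {
                    LockSupport.park();                   // may return spuriously; the loop re-checks
                }
            }
        }

        void signal() {
            Thread t = waiter.getAndSet(null);            // clear first, like soThread(node, null)
            if (t != null) {
                LockSupport.unpark(t);
            }
        }
    }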
@Override
public boolean offer(Node message) {
// TODO Auto-generated method stub
return false;
}
@Override
public Node poll() {
// TODO Auto-generated method stub
return null;
}
@Override
public Node peek() {
// TODO Auto-generated method stub
return null;
}
@Override
public int size() {
// TODO Auto-generated method stub
return 0;
}
}

View File

@ -20,12 +20,12 @@ import dorkbox.util.messagebus.common.simpleq.Node;
public class LinkTransferQueueConcurrentPerfTest {
// 1 << 15 == 32 * 1024
public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 100;
public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 10;
public static final Integer TEST_VALUE = Integer.valueOf(777);
public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17);
private static final int concurrency = 2;
private static final int concurrency = 8;
public static void main(final String[] args) throws Exception {
System.out.println(VMSupport.vmDetails());

View File

@ -20,12 +20,12 @@ import dorkbox.util.messagebus.common.simpleq.jctools.SimpleQueue;
public class SimpleQueueAltPerfTest {
// 1 << 15 == 32 * 1024
public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 1;
public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 10;
public static final Integer TEST_VALUE = Integer.valueOf(777);
public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17);
private static final int concurrency = 2;
private static final int concurrency = 8;
public static void main(final String[] args) throws Exception {
System.out.println(VMSupport.vmDetails());
@ -88,7 +88,7 @@ public class SimpleQueueAltPerfTest {
long duration = end - start;
long ops = REPETITIONS * 1000L * 1000L * 1000L / duration;
long ops = REPETITIONS * 1_000_000_000L / duration;
String qName = queue.getClass().getSimpleName();
System.out.format("%d - ops/sec=%,d - %s\n", runNumber, ops, qName);
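The throughput line converts a nanosecond duration into operations per second: ops = REPETITIONS * 1e9 / elapsed nanoseconds. A quick overflow check with the new values: at REPETITIONS = 500,000 the numerator is 5e14, comfortably below Long.MAX_VALUE (about 9.2e18), so the long arithmetic is safe.

    long start = System.nanoTime();
    // ... run REPETITIONS transfers ...
    long duration = System.nanoTime() - start;
    long ops = REPETITIONS * 1_000_000_000L / duration;   // operations per second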