Added jctools library, cleaned up. Slow performance for 4x4??

This commit is contained in:
nathan 2015-05-08 13:32:56 +02:00
parent 1a591165a8
commit b967733a63
23 changed files with 575 additions and 1179 deletions

View File

@ -5,12 +5,13 @@ import java.util.ArrayDeque;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import org.jctools.util.Pow2;
import dorkbox.util.messagebus.common.DeadMessage;
import dorkbox.util.messagebus.common.ISetEntry;
import dorkbox.util.messagebus.common.NamedThreadFactory;
import dorkbox.util.messagebus.common.StrongConcurrentSetV8;
import dorkbox.util.messagebus.common.simpleq.jctools.MpmcTransferArrayQueue;
import dorkbox.util.messagebus.common.simpleq.jctools.Pow2;
import dorkbox.util.messagebus.error.IPublicationErrorHandler;
import dorkbox.util.messagebus.error.PublicationError;
import dorkbox.util.messagebus.subscription.Subscription;

View File

@ -1,155 +0,0 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.util.messagebus.common.simpleq.jctools;
import static dorkbox.util.messagebus.common.simpleq.jctools.UnsafeAccess.UNSAFE;
import java.util.AbstractQueue;
import java.util.Iterator;
/**
* Base layer of the circular array queue hierarchy. It ties the hierarchy to {@link AbstractQueue} and
* {@link MessagePassingQueue} and declares sixteen {@code long} fields that nothing in this class reads or writes.
* <p>
* NOTE(review): the unused fields appear to exist purely as padding ahead of the fields declared by subclasses
* (presumably for false sharing protection); the actual field layout is decided by the JVM, so confirm before relying
* on it.
*
* @param <E> the type of element held in the queue
*/
abstract class ConcurrentCircularArrayQueueL0Pad<E> extends AbstractQueue<E> implements MessagePassingQueue<E> {
// padding only: these fields are never accessed
long p00, p01, p02, p03, p04, p05, p06, p07;
long p30, p31, p32, p33, p34, p35, p36, p37;
}
/**
* A concurrent access enabling class used by circular array based queues. This class exposes an offset computation
* method along with differently memory fenced load/store methods into the underlying array. The class is pre-padded and
* the array is padded on either side to help with false sharing prevention. It is expected that subclasses handle post
* padding.
* <p>
* Offset calculation is separate from access to enable the reuse of a given computed offset.
* <p>
* Load/Store methods using a <i>buffer</i> parameter are provided to allow the prevention of final field reload after a
* LoadLoad barrier.
*
* @author nitsanw
*
* @param <E> the type of element held in the queue
*/
public abstract class ConcurrentCircularArrayQueue<E> extends ConcurrentCircularArrayQueueL0Pad<E> {
/**
* log2 of the number of array slots reserved per element. Read from the {@code sparse.shift} system property;
* 0 when the property is absent or not a valid integer.
*/
protected static final int SPARSE_SHIFT = Integer.getInteger("sparse.shift", 0);
/** Number of unused array slots left at each end of the buffer: 128 bytes' worth of object references. */
protected static final int BUFFER_PAD;
/** Byte offset of logical element 0: the {@code Object[]} base offset plus the leading pad. */
private static final long REF_ARRAY_BASE;
/** Shift turning a masked index into a byte offset: log2(reference size in bytes) + {@link #SPARSE_SHIFT}. */
private static final int REF_ELEMENT_SHIFT;
static {
// size in bytes of one reference slot in an Object[]
final int scale = UnsafeAccess.UNSAFE.arrayIndexScale(Object[].class);
if (4 == scale) {
// 4 byte references: index << 2 gives bytes
REF_ELEMENT_SHIFT = 2 + SPARSE_SHIFT;
} else if (8 == scale) {
// 8 byte references: index << 3 gives bytes
REF_ELEMENT_SHIFT = 3 + SPARSE_SHIFT;
} else {
throw new IllegalStateException("Unknown pointer size");
}
BUFFER_PAD = 128 / scale;
// Including the buffer pad in the array base offset.
// Note: '-' binds tighter than '<<', so this is BUFFER_PAD << (REF_ELEMENT_SHIFT - SPARSE_SHIFT),
// i.e. the pad is converted to bytes using the plain (non-sparse) reference size.
REF_ARRAY_BASE = UnsafeAccess.UNSAFE.arrayBaseOffset(Object[].class)
+ (BUFFER_PAD << REF_ELEMENT_SHIFT - SPARSE_SHIFT);
}
/** Actual capacity minus one; AND-ed with an index to wrap it into the buffer. */
protected final long mask;
// @Stable :(
/** Element storage, including {@link #BUFFER_PAD} unused slots at each end. */
protected final E[] buffer;
/**
* Allocates the element buffer.
*
* @param capacity requested capacity; the actual capacity is whatever {@code Pow2.roundToPowerOfTwo(capacity)}
* returns
*/
@SuppressWarnings("unchecked")
public ConcurrentCircularArrayQueue(int capacity) {
int actualCapacity = Pow2.roundToPowerOfTwo(capacity);
this.mask = actualCapacity - 1;
// pad data on either end with some empty slots.
this.buffer = (E[]) new Object[(actualCapacity << SPARSE_SHIFT) + BUFFER_PAD * 2];
}
/**
* Computes the byte offset of an element using this queue's own {@link #mask}.
*
* @param index desirable element index
* @return the offset in bytes within the array for a given index.
*/
protected final long calcElementOffset(long index) {
return calcElementOffset(index, this.mask);
}
/**
* Computes the byte offset of an element using a caller supplied mask.
*
* @param index desirable element index
* @param mask value AND-ed with {@code index} to wrap it into the buffer
* @return the offset in bytes within the array for a given index.
*/
protected static final long calcElementOffset(long index, long mask) {
return REF_ARRAY_BASE + ((index & mask) << REF_ELEMENT_SHIFT);
}
/**
* A plain store (no ordering/fences) of an element to a given offset
*
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @param e the element to store (may be {@code null} to clear the slot)
*/
protected final void spElement(long offset, Object e) {
UNSAFE.putObject(this.buffer, offset, e);
}
/**
* An ordered store(store + StoreStore barrier) of an element to a given offset
*
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @param e the element to store
*/
protected final void soElement(long offset, E e) {
UNSAFE.putOrderedObject(this.buffer, offset, e);
}
/**
* A plain load (no ordering/fences) of an element from a given offset.
*
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @return the element at the offset
*/
@SuppressWarnings("unchecked")
protected final E lpElement(long offset) {
return (E) UNSAFE.getObject(this.buffer, offset);
}
/**
* A plain load (no ordering/fences) of an element from a given offset, returned as {@code Object} without the
* unchecked cast to {@code E} performed by {@link #lpElement(long)}.
*
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @return the element at the offset
*/
protected final Object lpElementNoCast(long offset) {
return UNSAFE.getObject(this.buffer, offset);
}
/**
* A volatile load (load + LoadLoad barrier) of an element from a given offset.
*
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @return the element at the offset
*/
@SuppressWarnings("unchecked")
protected final E lvElement(long offset) {
return (E) UNSAFE.getObjectVolatile(this.buffer, offset);
}
/**
* Iteration is not supported.
*
* @throws UnsupportedOperationException always
*/
@Override
public Iterator<E> iterator() {
throw new UnsupportedOperationException();
}
/**
* Empties the queue by polling until {@code poll()} returns {@code null} and {@code isEmpty()} reports true.
*/
@Override
public void clear() {
// we have to test isEmpty because of the weaker poll() guarantee
while (poll() != null || !isEmpty()) {
;
}
}
}

View File

@ -1,57 +0,0 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.util.messagebus.common.simpleq.jctools;
import static dorkbox.util.messagebus.common.simpleq.jctools.UnsafeAccess.UNSAFE;
/**
* Extends {@link ConcurrentCircularArrayQueue} with a second, {@code long[]} buffer holding one sequence value per
* element slot, plus offset computation and fenced load/store helpers for that buffer.
* <p>
* The sequence buffer is sized and padded like the element buffer: {@code (capacity << SPARSE_SHIFT)} slots plus
* {@code BUFFER_PAD} unused slots at each end. On construction the sequence of slot {@code i} is set to {@code i}.
*
* @param <E> the type of element held in the queue
*/
public abstract class ConcurrentSequencedCircularArrayQueue<E> extends ConcurrentCircularArrayQueue<E> {
/** Byte offset of logical sequence slot 0: the {@code long[]} base offset plus the leading pad. */
private static final long ARRAY_BASE;
/** Shift turning a masked index into a byte offset within the sequence buffer: 3 + SPARSE_SHIFT. */
private static final int ELEMENT_SHIFT;
static {
// size in bytes of one long[] slot; anything other than 8 is rejected
final int scale = UnsafeAccess.UNSAFE.arrayIndexScale(long[].class);
if (8 == scale) {
ELEMENT_SHIFT = 3 + SPARSE_SHIFT;
} else {
throw new IllegalStateException("Unexpected long[] element size");
}
// Including the buffer pad in the array base offset.
// '-' binds tighter than '<<': this is BUFFER_PAD << (ELEMENT_SHIFT - SPARSE_SHIFT), i.e. BUFFER_PAD * 8 bytes.
ARRAY_BASE = UnsafeAccess.UNSAFE.arrayBaseOffset(long[].class) + (BUFFER_PAD << ELEMENT_SHIFT - SPARSE_SHIFT);
}
/** Per-slot sequence values, including {@code BUFFER_PAD} unused slots at each end. */
protected final long[] sequenceBuffer;
/**
* Allocates the element buffer (via the superclass) and the sequence buffer, then seeds each slot's sequence with
* its own index.
*
* @param capacity requested capacity, passed to the superclass; the actual capacity is {@code mask + 1}
*/
public ConcurrentSequencedCircularArrayQueue(int capacity) {
super(capacity);
int actualCapacity = (int) (this.mask + 1);
// pad data on either end with some empty slots.
this.sequenceBuffer = new long[(actualCapacity << SPARSE_SHIFT) + BUFFER_PAD * 2];
// initial state: sequence of slot i is i
for (long i = 0; i < actualCapacity; i++) {
soSequence(this.sequenceBuffer, calcSequenceOffset(i), i);
}
}
/**
* Computes the byte offset of a sequence slot using this queue's own mask.
*
* @param index desirable sequence index
* @return the offset in bytes within the sequence buffer for the given index
*/
protected final long calcSequenceOffset(long index) {
return calcSequenceOffset(index, this.mask);
}
/**
* Computes the byte offset of a sequence slot using a caller supplied mask.
*
* @param index desirable sequence index
* @param mask value AND-ed with {@code index} to wrap it into the buffer
* @return the offset in bytes within the sequence buffer for the given index
*/
protected static final long calcSequenceOffset(long index, long mask) {
return ARRAY_BASE + ((index & mask) << ELEMENT_SHIFT);
}
/**
* An ordered store (via {@code putOrderedLong}) of a sequence value.
*
* @param buffer the sequence buffer to write into
* @param offset computed via {@link #calcSequenceOffset(long)}
* @param e the sequence value to store
*/
protected final void soSequence(long[] buffer, long offset, long e) {
UNSAFE.putOrderedLong(buffer, offset, e);
}
/**
* A volatile load (via {@code getLongVolatile}) of a sequence value.
*
* @param buffer the sequence buffer to read from
* @param offset computed via {@link #calcSequenceOffset(long)}
* @return the sequence value at the offset
*/
protected final long lvSequence(long[] buffer, long offset) {
return UNSAFE.getLongVolatile(buffer, offset);
}
}

View File

@ -1,71 +0,0 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.util.messagebus.common.simpleq.jctools;
import java.util.Queue;
/**
* This is a tagging interface for the queues in this library which implement a subset of the {@link Queue} interface
* sufficient for concurrent message passing.<br>
* Message passing queues offer happens-before semantics to messages passed through, namely that writes made by the
* producer before offering the message are visible to the consuming thread after the message has been polled out of the
* queue.
*
* @author nitsanw
*
* @param <M> the event/message type
*/
interface MessagePassingQueue<M> {
/**
* Called from a producer thread subject to the restrictions appropriate to the implementation and according to the
* {@link Queue#offer(Object)} interface.
*
* @param message the message to insert into the queue
* @return true if element was inserted into the queue, false iff full
*/
boolean offer(M message);
/**
* Called from the consumer thread subject to the restrictions appropriate to the implementation and according to
* the {@link Queue#poll()} interface.
*
* @return a message from the queue if one is available, null iff empty
*/
M poll();
/**
* Called from the consumer thread subject to the restrictions appropriate to the implementation and according to
* the {@link Queue#peek()} interface.
*
* @return a message from the queue if one is available, null iff empty
*/
M peek();
/**
* This method's accuracy is subject to concurrent modifications happening as the size is estimated and as such is a
* best effort rather than absolute value. For some implementations this method may be O(n) rather than O(1).
*
* @return number of messages in the queue, between 0 and queue capacity or {@link Integer#MAX_VALUE} if not bounded
*/
int size();
/**
* This method's accuracy is subject to concurrent modifications happening as the observation is carried out.
*
* @return true if empty, false otherwise
*/
boolean isEmpty();
}

View File

@ -1,214 +0,0 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.util.messagebus.common.simpleq.jctools;
/**
* A Multi-Producer-Multi-Consumer queue based on a {@link ConcurrentCircularArrayQueue}. This implies that
* any and all threads may call the offer/poll/peek methods and correctness is maintained. <br>
* This implementation follows patterns documented on the package level for False Sharing protection.<br>
* The algorithm for offer/poll is an adaptation of the one put forward by D. Vyukov (See <a
* href="http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue">here</a>). The original
* algorithm uses an array of structs which should offer nice locality properties but is sadly not possible in
* Java (waiting on Value Types or similar). The alternative explored here utilizes 2 arrays, one for each
* field of the struct. There is a further alternative in the experimental project which uses iteration phase
* markers to achieve the same algo and is closer structurally to the original, but sadly does not perform as
* well as this implementation.<br>
* Tradeoffs to keep in mind:
* <ol>
* <li>Padding for false sharing: counter fields and queue fields are all padded as well as either side of
* both arrays. We are trading memory to avoid false sharing(active and passive).
* <li>2 arrays instead of one: The algorithm requires an extra array of longs matching the size of the
* elements array. This is doubling/tripling the memory allocated for the buffer.
* <li>Power of 2 capacity: Actual elements buffer (and sequence buffer) is the closest power of 2 larger or
* equal to the requested capacity.
* </ol>
*
* @param <T> the type of element held in the queue
*/
public class MpmcArrayQueue<T> extends MpmcArrayQueueConsumerField<T> {
/** The number of CPUs */
private static final int NCPU = Runtime.getRuntime().availableProcessors();
/**
* The number of times to spin (doing nothing except polling a memory location) before giving up while waiting to eliminate an
* operation. Should be zero on uniprocessors. On multiprocessors, this value should be large enough so that two threads exchanging
* items as fast as possible block only when one of them is stalled (due to GC or preemption), but not much longer, to avoid wasting CPU
* resources. Seen differently, this value is a little over half the number of cycles of an average context switch time on most systems.
* The value here is approximately the average of those across a range of tested systems.
*/
private static final int SPINS = NCPU == 1 ? 0 : 600;
// never accessed in this class; NOTE(review): presumably trailing padding after the consumer index - confirm
long p40, p41, p42, p43, p44, p45, p46;
long p30, p31, p32, p33, p34, p35, p36, p37;
/**
* Creates a queue with at least the requested capacity.
*
* @param capacity requested capacity, must be at least 2
* @throws IllegalArgumentException if {@code capacity < 2}
*/
public MpmcArrayQueue(final int capacity) {
super(validateCapacity(capacity));
}
/**
* Rejects capacities below 2; static so that it can run before the superclass constructor call.
*
* @param capacity the requested capacity
* @return {@code capacity} unchanged
* @throws IllegalArgumentException if {@code capacity < 2}
*/
private static int validateCapacity(int capacity) {
if(capacity < 2) {
throw new IllegalArgumentException("Minimum size is 2");
}
return capacity;
}
/**
* Inserts an element, retrying (with a short busy spin between attempts) while other threads are mid-operation.
*
* @param e the element to insert
* @return true if the element was inserted, false if the queue was observed to be full
*/
@Override
public boolean offer(final T e) {
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
final long capacity = mask + 1;
final long[] sBuffer = this.sequenceBuffer;
long producerIndex;
long pSeqOffset;
long consumerIndex = Long.MAX_VALUE;// start with bogus value, hope we don't need it
while (true) {
producerIndex = lvProducerIndex(); // LoadLoad
pSeqOffset = calcSequenceOffset(producerIndex, mask);
final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad
// delta == 0: slot is free for this producer index; delta < 0: slot not yet released by a consumer
final long delta = seq - producerIndex;
if (delta == 0) {
// this is expected if we see this first time around
if (casProducerIndex(producerIndex, producerIndex + 1)) {
// Successful CAS: full barrier
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(producerIndex, mask);
spElement(offset, e);
// increment sequence by 1, the value expected by consumer
// (seeing this value from a producer will lead to retry 2)
soSequence(sBuffer, pSeqOffset, producerIndex + 1); // StoreStore
return true;
}
// failed cas, retry 1
} else if (delta < 0 && // poll has not moved this value forward
producerIndex - capacity <= consumerIndex && // test against cached cIndex
producerIndex - capacity <= (consumerIndex = lvConsumerIndex())) { // test against latest cIndex
// Extra check required to ensure [Queue.offer == false iff queue is full]
return false;
}
// another producer has moved the sequence by one, retry 2
busySpin();
}
}
/**
* {@inheritDoc}
* <p>
* Because return null indicates queue is empty we cannot simply rely on next element visibility for poll
* and must test producer index when next element is not visible.
*/
@Override
public T poll() {
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
final long[] sBuffer = this.sequenceBuffer;
long consumerIndex;
long cSeqOffset;
long producerIndex = -1; // start with bogus value, hope we don't need it
while (true) {
consumerIndex = lvConsumerIndex(); // LoadLoad
cSeqOffset = calcSequenceOffset(consumerIndex, mask);
final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad
// delta == 0: a producer has published this slot; delta < 0: nothing published here yet
final long delta = seq - (consumerIndex + 1);
if (delta == 0) {
if (casConsumerIndex(consumerIndex, consumerIndex + 1)) {
// Successful CAS: full barrier
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(consumerIndex, mask);
final T e = lpElement(offset);
spElement(offset, null);
// Move sequence ahead by capacity, preparing it for next offer
// (seeing this value from a consumer will lead to retry 2)
soSequence(sBuffer, cSeqOffset, consumerIndex + mask + 1); // StoreStore
return e;
}
// failed cas, retry 1
} else if (delta < 0 && // slot has not been moved by producer
consumerIndex >= producerIndex && // test against cached pIndex
consumerIndex == (producerIndex = lvProducerIndex())) { // update pIndex if we must
// strict empty check, this ensures [Queue.poll() == null iff isEmpty()]
return null;
}
// another consumer beat us and moved sequence ahead, retry 2
// only producer will busy spin
}
}
/**
* Returns the element at the current consumer index without removing it, retrying while the slot reads as
* {@code null} but the consumer index differs from the producer index.
*
* @return the element at the head of the queue, or null if the queue was observed to be empty
*/
@Override
public T peek() {
long currConsumerIndex;
T e;
do {
currConsumerIndex = lvConsumerIndex();
// other consumers may have grabbed the element, or queue might be empty
e = lpElement(calcElementOffset(currConsumerIndex));
// only return null if queue is empty
} while (e == null && currConsumerIndex != lvProducerIndex());
return e;
}
/**
* Estimates the number of elements as producer index minus consumer index, re-reading until the consumer index
* is the same before and after the producer index read.
*
* @return the estimated number of elements in the queue
*/
@Override
public int size() {
/*
* It is possible for a thread to be interrupted or reschedule between the read of the producer and
* consumer indices, therefore protection is required to ensure size is within valid range. In the
* event of concurrent polls/offers to this method the size is OVER estimated as we read consumer
* index BEFORE the producer index.
*/
long after = lvConsumerIndex();
while (true) {
final long before = after;
final long currentProducerIndex = lvProducerIndex();
after = lvConsumerIndex();
if (before == after) {
return (int) (currentProducerIndex - after);
}
}
}
/**
* @return true if the consumer index equals the producer index at the time of the two reads
*/
@Override
public boolean isEmpty() {
// Order matters!
// Loading consumer before producer allows for producer increments after consumer index is read.
// This ensures this method is conservative in its estimate. Note that as this is an MPMC there is
// nothing we can do to make this an exact method.
return lvConsumerIndex() == lvProducerIndex();
}
/**
* Counts down from {@link #SPINS} to zero and returns; used by {@link #offer(Object)} between retries.
* <p>
* NOTE(review): the loop has no observable side effects, so a JIT compiler may be free to remove it entirely;
* verify that it actually introduces a delay on the target JVM.
*/
private static final void busySpin() {
// busy spin for the amount of time (roughly) of a CPU context switch
int spins = SPINS;
for (;;) {
if (spins > 0) {
--spins;
} else {
break;
}
}
}
}

View File

@ -1,31 +0,0 @@
package dorkbox.util.messagebus.common.simpleq.jctools;
import static dorkbox.util.messagebus.common.simpleq.jctools.UnsafeAccess.UNSAFE;
/**
 * Layer of the MPMC queue hierarchy that declares the {@code consumerIndex} field together with its volatile read,
 * plain read and compare-and-swap accessors.
 *
 * @param <E> the type of element held in the queue
 */
public abstract class MpmcArrayQueueConsumerField<E> extends MpmcArrayQueueL2Pad<E> {
    /** Offset of {@code consumerIndex} within an instance, as reported by {@code UNSAFE.objectFieldOffset}. */
    private static final long C_INDEX_OFFSET = locateConsumerIndex();

    private volatile long consumerIndex;

    /**
     * @param capacity requested capacity, handed to the superclass unchanged
     */
    public MpmcArrayQueueConsumerField(int capacity) {
        super(capacity);
    }

    /**
     * Looks up the field offset used by the Unsafe based accessors below.
     *
     * @return the offset of the {@code consumerIndex} field
     * @throws RuntimeException wrapping the {@link NoSuchFieldException} if the field cannot be found
     */
    private static long locateConsumerIndex() {
        try {
            return UNSAFE.objectFieldOffset(MpmcArrayQueueConsumerField.class.getDeclaredField("consumerIndex"));
        } catch (NoSuchFieldException missing) {
            throw new RuntimeException(missing);
        }
    }

    /** @return the consumer index, read with volatile semantics */
    protected final long lvConsumerIndex() {
        return this.consumerIndex;
    }

    /** @return the consumer index, read through {@code UNSAFE.getLong} without volatile semantics */
    protected final long lpConsumerIndex() {
        return UNSAFE.getLong(this, C_INDEX_OFFSET);
    }

    /**
     * Atomically replaces the consumer index if it still holds the expected value.
     *
     * @param expect the value the consumer index is expected to hold
     * @param newValue the value to install
     * @return true if the swap happened
     */
    protected final boolean casConsumerIndex(long expect, long newValue) {
        return UNSAFE.compareAndSwapLong(this, C_INDEX_OFFSET, expect, newValue);
    }
}

View File

@ -1,10 +0,0 @@
package dorkbox.util.messagebus.common.simpleq.jctools;
/**
* Layer of the MPMC queue hierarchy that declares fifteen {@code long} fields which nothing in this class reads or
* writes, and otherwise just forwards construction to the superclass.
* <p>
* NOTE(review): the unused fields appear to be padding between the superclass fields and the producer index declared
* by the next layer (presumably for false sharing protection); actual layout is decided by the JVM - confirm.
*
* @param <E> the type of element held in the queue
*/
abstract class MpmcArrayQueueL1Pad<E> extends ConcurrentSequencedCircularArrayQueue<E> {
// padding only: these fields are never accessed
long p10, p11, p12, p13, p14, p15, p16;
long p30, p31, p32, p33, p34, p35, p36, p37;
/**
* @param capacity requested capacity, handed to the superclass unchanged
*/
public MpmcArrayQueueL1Pad(int capacity) {
super(capacity);
}
}

View File

@ -1,10 +0,0 @@
package dorkbox.util.messagebus.common.simpleq.jctools;
/**
* Layer of the MPMC queue hierarchy that declares fifteen {@code long} fields which nothing in this class reads or
* writes, and otherwise just forwards construction to the superclass.
* <p>
* NOTE(review): the unused fields appear to be padding between the producer index declared by the superclass and the
* consumer index declared by the next layer (presumably for false sharing protection); actual layout is decided by
* the JVM - confirm.
*
* @param <E> the type of element held in the queue
*/
abstract class MpmcArrayQueueL2Pad<E> extends MpmcArrayQueueProducerField<E> {
// padding only: these fields are never accessed
long p20, p21, p22, p23, p24, p25, p26;
long p30, p31, p32, p33, p34, p35, p36, p37;
/**
* @param capacity requested capacity, handed to the superclass unchanged
*/
public MpmcArrayQueueL2Pad(int capacity) {
super(capacity);
}
}

View File

@ -1,28 +0,0 @@
package dorkbox.util.messagebus.common.simpleq.jctools;
import static dorkbox.util.messagebus.common.simpleq.jctools.UnsafeAccess.UNSAFE;
/**
 * Layer of the MPMC queue hierarchy that declares the {@code producerIndex} field together with its volatile read
 * and compare-and-swap accessors.
 *
 * @param <E> the type of element held in the queue
 */
abstract class MpmcArrayQueueProducerField<E> extends MpmcArrayQueueL1Pad<E> {
    /** Offset of {@code producerIndex} within an instance, as reported by {@code UNSAFE.objectFieldOffset}. */
    private static final long P_INDEX_OFFSET = locateProducerIndex();

    private volatile long producerIndex;

    /**
     * @param capacity requested capacity, handed to the superclass unchanged
     */
    public MpmcArrayQueueProducerField(int capacity) {
        super(capacity);
    }

    /**
     * Looks up the field offset used by the compare-and-swap accessor below.
     *
     * @return the offset of the {@code producerIndex} field
     * @throws RuntimeException wrapping the {@link NoSuchFieldException} if the field cannot be found
     */
    private static long locateProducerIndex() {
        try {
            return UNSAFE.objectFieldOffset(MpmcArrayQueueProducerField.class.getDeclaredField("producerIndex"));
        } catch (NoSuchFieldException missing) {
            throw new RuntimeException(missing);
        }
    }

    /** @return the producer index, read with volatile semantics */
    protected final long lvProducerIndex() {
        return this.producerIndex;
    }

    /**
     * Atomically replaces the producer index if it still holds the expected value.
     *
     * @param expect the value the producer index is expected to hold
     * @param newValue the value to install
     * @return true if the swap happened
     */
    protected final boolean casProducerIndex(long expect, long newValue) {
        return UNSAFE.compareAndSwapLong(this, P_INDEX_OFFSET, expect, newValue);
    }
}

View File

@ -10,15 +10,18 @@ import static dorkbox.util.messagebus.common.simpleq.jctools.Node.soThread;
import static dorkbox.util.messagebus.common.simpleq.jctools.Node.spItem1;
import static dorkbox.util.messagebus.common.simpleq.jctools.Node.spThread;
import static dorkbox.util.messagebus.common.simpleq.jctools.Node.spType;
import static dorkbox.util.messagebus.common.simpleq.jctools.UnsafeAccess.UNSAFE;
import java.util.Collection;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;
import org.jctools.queues.MpmcArrayQueue;
import org.jctools.util.Pow2;
import org.jctools.util.UnsafeAccess;
public final class MpmcTransferArrayQueue extends MpmcArrayQueueConsumerField<Object> implements TransferQueue<Object> {
public final class MpmcTransferArrayQueue extends MpmcArrayQueue<Object> implements TransferQueue<Object> {
private static final int TYPE_EMPTY = 0;
private static final int TYPE_CONSUMER = 1;
private static final int TYPE_PRODUCER = 2;
@ -79,19 +82,127 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueueConsumerField<Ob
*/
@Override
public final void transfer(final Object item) {
producerWait(item, false, 0L);
producerXfer(item, false, 0L);
}
/**
* CONSUMER
* <p>
* Remove an item from the queue. If there are no items on the queue, wait for a producer to place an item on the queue. This will
* wait as long as necessary.
*/
@Override
public final Object take() {
return consumerWait(false, 0L);
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
final long[] sBuffer = this.sequenceBuffer;
long consumerIndex;
long producerIndex;
int lastType;
while (true) {
consumerIndex = lvConsumerIndex();
producerIndex = lvProducerIndex();
final Object previousElement;
if (consumerIndex == producerIndex) {
lastType = TYPE_EMPTY;
previousElement = null;
} else {
previousElement = lpElement(calcElementOffset(producerIndex-1));
if (previousElement == null) {
// the last producer hasn't finished setting the object yet
busySpin(INPROGRESS_SPINS);
continue;
}
lastType = lpType(previousElement);
}
switch (lastType) {
case TYPE_EMPTY:
case TYPE_CONSUMER: {
// if (lastType != TYPE_PRODUCER) {
// TYPE_EMPTY, TYPE_CONSUMER
// empty or same mode = push+park onto queue
long pSeqOffset = calcSequenceOffset(producerIndex, mask);
final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad
final long delta = seq - producerIndex;
if (delta == 0) {
// this is expected if we see this first time around
if (casProducerIndex(producerIndex, producerIndex + 1)) {
// Successful CAS: full barrier
final Thread myThread = Thread.currentThread();
final Object node = nodeThreadLocal.get();
spType(node, TYPE_CONSUMER);
spThread(node, myThread);
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(producerIndex, mask);
spElement(offset, node);
// increment sequence by 1, the value expected by consumer
// (seeing this value from a producer will lead to retry 2)
soSequence(sBuffer, pSeqOffset, producerIndex + 1); // StoreStore
park(node, myThread, false, 0L);
Object item1 = lvItem1(node);
return item1;
}
}
// whoops, inconsistent state
busySpin(PUSH_SPINS);
continue;
}
// else {
case TYPE_PRODUCER: {
// TYPE_PRODUCER
// complementary mode = pop+unpark off queue
long cSeqOffset = calcSequenceOffset(consumerIndex, mask);
final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad
final long delta = seq - (consumerIndex + 1);
if (delta == 0) {
if (casConsumerIndex(consumerIndex, consumerIndex + 1)) {
// Successful CAS: full barrier
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(consumerIndex, mask);
final Object e = lpElement(offset);
spElement(offset, null);
// Move sequence ahead by capacity, preparing it for next offer
// (seeing this value from a consumer will lead to retry 2)
soSequence(sBuffer, cSeqOffset, consumerIndex + mask + 1); // StoreStore
final Object lvItem1 = lpItem1(e);
unpark(e);
return lvItem1;
}
}
// whoops, inconsistent state
busySpin(POP_SPINS);
continue;
}
}
}
}
// modification of super implementation, as to include a small busySpin on contention
@Override
public boolean offer(Object item) {
public final boolean offer(Object item) {
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
final long capacity = mask + 1;
@ -137,6 +248,9 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueueConsumerField<Ob
}
}
/**
* Will busy spin until timeout
*/
@Override
public boolean offer(Object item, long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
@ -188,7 +302,7 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueueConsumerField<Ob
if (remaining < SPIN_THRESHOLD) {
busySpin(PARK_UNTIMED_SPINS);
} else {
UNSAFE.park(false, 1L);
UnsafeAccess.UNSAFE.park(false, 1L);
}
} else {
return false;
@ -251,7 +365,7 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueueConsumerField<Ob
// modification of super implementation, as to include a small busySpin on contention
@Override
public Object poll() {
// local load of field to avoid repeated loads after volatile reads
@ -274,7 +388,7 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueueConsumerField<Ob
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(consumerIndex, mask);
final Object e = lpElementNoCast(offset);
final Object e = lpElement(offset);
spElement(offset, null);
// Move sequence ahead by capacity, preparing it for next offer
@ -312,7 +426,7 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueueConsumerField<Ob
do {
currConsumerIndex = lvConsumerIndex();
// other consumers may have grabbed the element, or queue might be empty
e = lpElementNoCast(calcElementOffset(currConsumerIndex));
e = lpElement(calcElementOffset(currConsumerIndex));
// only return null if queue is empty
} while (e == null && currConsumerIndex != lvProducerIndex());
return e;
@ -356,7 +470,7 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueueConsumerField<Ob
lastType = TYPE_EMPTY;
previousElement = null;
} else {
previousElement = lpElementNoCast(calcElementOffset(producerIndex-1));
previousElement = lpElement(calcElementOffset(producerIndex-1));
if (previousElement == null) {
// the last producer hasn't finished setting the object yet
busySpin(INPROGRESS_SPINS);
@ -366,52 +480,49 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueueConsumerField<Ob
lastType = lpType(previousElement);
}
switch (lastType) {
case TYPE_EMPTY:
case TYPE_PRODUCER: {
// empty or same mode = push+park onto queue
long pSeqOffset = calcSequenceOffset(producerIndex, mask);
final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad
final long delta = seq - producerIndex;
if (lastType != TYPE_CONSUMER) {
// TYPE_EMPTY, TYPE_PRODUCER
// empty or same mode = push+park onto queue
long pSeqOffset = calcSequenceOffset(producerIndex, mask);
final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad
final long delta = seq - producerIndex;
if (delta == 0) {
return false;
}
// whoops, inconsistent state
busySpin(PUSH_SPINS);
continue;
if (delta == 0) {
// don't add to queue, abort!!
return false;
}
case TYPE_CONSUMER: {
// complimentary mode = pop+unpark off queue
long cSeqOffset = calcSequenceOffset(consumerIndex, mask);
final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad
final long delta = seq - (consumerIndex + 1);
if (delta == 0) {
if (casConsumerIndex(consumerIndex, consumerIndex + 1)) {
// Successful CAS: full barrier
// whoops, inconsistent state
busySpin(PUSH_SPINS);
} else {
// TYPE_CONSUMER
// complementary mode = pop+unpark off queue
long cSeqOffset = calcSequenceOffset(consumerIndex, mask);
final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad
final long delta = seq - (consumerIndex + 1);
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(consumerIndex, mask);
final Object e = lpElementNoCast(offset);
spElement(offset, null);
if (delta == 0) {
if (casConsumerIndex(consumerIndex, consumerIndex + 1)) {
// Successful CAS: full barrier
// Move sequence ahead by capacity, preparing it for next offer
// (seeing this value from a consumer will lead to retry 2)
soSequence(sBuffer, cSeqOffset, consumerIndex + mask + 1); // StoreStore
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(consumerIndex, mask);
final Object e = lpElement(offset);
spElement(offset, null);
soItem1(e, item);
unpark(e);
// Move sequence ahead by capacity, preparing it for next offer
// (seeing this value from a consumer will lead to retry 2)
soSequence(sBuffer, cSeqOffset, consumerIndex + mask + 1); // StoreStore
return true;
}
soItem1(e, item);
unpark(e);
return true;
}
// whoops, inconsistent state
busySpin(POP_SPINS);
continue;
}
// whoops, inconsistent state
busySpin(POP_SPINS);
}
}
}
@ -419,97 +530,7 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueueConsumerField<Ob
/**
 * Attempts to hand {@code item} to a waiting consumer, giving up once the timeout has elapsed.
 * <p>
 * Each pass re-reads the consumer and producer indices and inspects the node stored in the slot just
 * before the producer index to decide which mode the queue is in:
 * <ul>
 * <li>empty, or holding producer nodes: nothing is enqueued here; the remaining time is recomputed and
 * the thread spins or parks for 1ns before retrying, returning {@code false} once no time is left;</li>
 * <li>holding consumer nodes: the head slot is claimed with a CAS on the consumer index, {@code item}
 * is stored into the claimed node, that node is unparked and {@code true} is returned.</li>
 * </ul>
 * NOTE(review): the loop below contains no {@code break}, so the trailing
 * {@code return producerXfer(...)} statement cannot be reached. This span looks like it carries both an
 * inline loop and a delegating call - confirm which of the two is intended.
 * <p>
 * NOTE(review): nothing in this body checks the thread's interrupt status, although the signature
 * declares {@link InterruptedException}.
 *
 * @param item the element to transfer
 * @param timeout how long to keep trying, expressed in {@code unit}
 * @param unit the time unit of {@code timeout}
 * @return {@code true} if the item was handed to a waiting consumer, {@code false} if the time ran out
 * @throws InterruptedException declared in the signature; not thrown by the code visible here
 */
@Override
public boolean tryTransfer(Object item, long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
long lastTime = System.nanoTime();
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
final long[] sBuffer = this.sequenceBuffer;
long consumerIndex;
long producerIndex;
int lastType;
while (true) {
consumerIndex = lvConsumerIndex();
producerIndex = lvProducerIndex();
final Object previousElement;
if (consumerIndex == producerIndex) {
// both indices are equal: there is no node to inspect
lastType = TYPE_EMPTY;
previousElement = null;
} else {
// look at the slot just before the producer index to learn the current mode
previousElement = lpElementNoCast(calcElementOffset(producerIndex-1));
if (previousElement == null) {
// the last producer hasn't finished setting the object yet
busySpin(INPROGRESS_SPINS);
continue;
}
lastType = lpType(previousElement);
}
switch (lastType) {
case TYPE_EMPTY:
case TYPE_PRODUCER: {
// empty or same mode: this branch does not claim a slot, it only waits (bounded by the timeout)
// and then re-examines the queue on the next pass
long pSeqOffset = calcSequenceOffset(producerIndex, mask);
final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad
final long delta = seq - producerIndex;
if (delta == 0) {
// charge the time elapsed since the previous check against the remaining budget
long now = System.nanoTime();
long remaining = nanos -= now - lastTime;
lastTime = now;
if (remaining > 0) {
if (remaining < SPIN_THRESHOLD) {
busySpin(PARK_UNTIMED_SPINS);
} else {
// relative park of 1 nanosecond
UNSAFE.park(false, 1L);
}
// make sure to continue here (so we don't spin twice)
continue;
} else {
// timeout elapsed without finding a consumer node
return false;
}
}
// whoops, inconsistent state
busySpin(PUSH_SPINS);
continue;
}
case TYPE_CONSUMER: {
// complementary mode = pop+unpark off queue
long cSeqOffset = calcSequenceOffset(consumerIndex, mask);
final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad
final long delta = seq - (consumerIndex + 1);
if (delta == 0) {
if (casConsumerIndex(consumerIndex, consumerIndex + 1)) {
// Successful CAS: full barrier
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(consumerIndex, mask);
final Object e = lpElementNoCast(offset);
spElement(offset, null);
// Move sequence ahead by capacity, preparing it for next offer
// (seeing this value from a consumer will lead to retry 2)
soSequence(sBuffer, cSeqOffset, consumerIndex + mask + 1); // StoreStore
// publish the item into the claimed node, then wake the thread recorded in it
soItem1(e, item);
unpark(e);
return true;
}
}
// whoops, inconsistent state
busySpin(POP_SPINS);
continue;
}
}
}
return producerXfer(item, true, nanos);
}
@Override
@ -537,7 +558,7 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueueConsumerField<Ob
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(consumerIndex, mask);
final Object e = lpElementNoCast(offset);
final Object e = lpElement(offset);
spElement(offset, null);
// Move sequence ahead by capacity, preparing it for next offer
@ -560,7 +581,7 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueueConsumerField<Ob
if (remaining < SPIN_THRESHOLD) {
busySpin(PARK_UNTIMED_SPINS);
} else {
UNSAFE.park(false, 1L);
UnsafeAccess.UNSAFE.park(false, 1L);
}
// make sure to continue here (so we don't spin twice)
continue;
@ -635,7 +656,7 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueueConsumerField<Ob
if (consumerIndex == producerIndex) {
return false;
} else {
previousElement = lpElementNoCast(calcElementOffset(producerIndex-1));
previousElement = lpElement(calcElementOffset(producerIndex-1));
if (previousElement == null) {
// the last producer hasn't finished setting the object yet
busySpin(INPROGRESS_SPINS);
@ -660,7 +681,7 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueueConsumerField<Ob
if (consumerIndex == producerIndex) {
return 0;
} else {
previousElement = lpElementNoCast(calcElementOffset(producerIndex-1));
previousElement = lpElement(calcElementOffset(producerIndex-1));
if (previousElement == null) {
// the last producer hasn't finished setting the object yet
busySpin(INPROGRESS_SPINS);
@ -689,7 +710,7 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueueConsumerField<Ob
if (consumerIndex == producerIndex) {
return true;
} else {
previousElement = lpElementNoCast(calcElementOffset(producerIndex-1));
previousElement = lpElement(calcElementOffset(producerIndex-1));
if (previousElement == null) {
// the last producer hasn't finished setting the object yet
busySpin(INPROGRESS_SPINS);
@ -711,17 +732,21 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueueConsumerField<Ob
}
}
/**
* @return true if the park was correctly unparked. false if there was an interruption or a timed-park expired
*/
@SuppressWarnings("null")
private final void park(final Object node, final Thread myThread, final boolean timed, long nanos) {
long lastTime = timed ? System.nanoTime() : 0L;
private final boolean park(final Object node, final Thread myThread, final boolean timed, long nanos) {
long lastTime = System.nanoTime();
int spins = -1; // initialized after first item and cancel checks
ThreadLocalRandom randomYields = null; // bound if needed
for (;;) {
if (lvThread(node) == null) {
return;
return true;
} else if (myThread.isInterrupted() || timed && nanos <= 0) {
return;
return false;
} else if (spins < 0) {
if (timed) {
spins = PARK_TIMED_SPINS;
@ -743,16 +768,17 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueueConsumerField<Ob
lastTime = now;
if (remaining > 0) {
if (remaining < SPIN_THRESHOLD) {
// a park is too slow for this number, so just busy spin instead
busySpin(PARK_UNTIMED_SPINS);
} else {
UNSAFE.park(false, nanos);
UnsafeAccess.UNSAFE.park(false, nanos);
}
} else {
return;
return false;
}
} else {
// park can return for NO REASON (must check for thread values)
UNSAFE.park(false, 0L);
UnsafeAccess.UNSAFE.park(false, 0L);
}
}
}
@ -760,10 +786,14 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueueConsumerField<Ob
/**
 * Releases the thread recorded in {@code node}: reads the node's thread, clears the node's thread
 * slot with an ordered store, then unparks that thread exactly once.
 *
 * @param node the queue node whose waiting thread should be woken
 */
private final void unpark(Object node) {
final Object thread = lpThread(node);
// park() returns true once it observes a null thread in the node, so clear it before waking
soThread(node, null);
// a single unpark is enough; a second one would only leave a stale permit behind
UnsafeAccess.UNSAFE.unpark(thread);
}
private final void producerWait(final Object item, final boolean timed, final long nanos) {
/**
* @return true if the producer successfully transferred an item (either through waiting for a consumer that took it, or transferring
* directly to an already waiting consumer). false if the thread was interrupted, or it was timed and has expired.
*/
private final boolean producerXfer(final Object item, final boolean timed, final long nanos) {
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
final long[] sBuffer = this.sequenceBuffer;
@ -781,7 +811,7 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueueConsumerField<Ob
lastType = TYPE_EMPTY;
previousElement = null;
} else {
previousElement = lpElementNoCast(calcElementOffset(producerIndex-1));
previousElement = lpElement(calcElementOffset(producerIndex-1));
if (previousElement == null) {
// the last producer hasn't finished setting the object yet
busySpin(INPROGRESS_SPINS);
@ -791,177 +821,70 @@ public final class MpmcTransferArrayQueue extends MpmcArrayQueueConsumerField<Ob
lastType = lpType(previousElement);
}
switch (lastType) {
case TYPE_EMPTY:
case TYPE_PRODUCER: {
// empty or same mode = push+park onto queue
long pSeqOffset = calcSequenceOffset(producerIndex, mask);
final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad
final long delta = seq - producerIndex;
if (lastType != TYPE_CONSUMER) {
// TYPE_EMPTY, TYPE_PRODUCER
// empty or same mode = push+park onto queue
long pSeqOffset = calcSequenceOffset(producerIndex, mask);
final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad
final long delta = seq - producerIndex;
if (delta == 0) {
// this is expected if we see this first time around
if (casProducerIndex(producerIndex, producerIndex + 1)) {
// Successful CAS: full barrier
if (delta == 0) {
// this is expected if we see this first time around
if (casProducerIndex(producerIndex, producerIndex + 1)) {
// Successful CAS: full barrier
final Thread myThread = Thread.currentThread();
final Object node = nodeThreadLocal.get();
final Thread myThread = Thread.currentThread();
final Object node = nodeThreadLocal.get();
spType(node, TYPE_PRODUCER);
spThread(node, myThread);
spItem1(node, item);
spType(node, TYPE_PRODUCER);
spThread(node, myThread);
spItem1(node, item);
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(producerIndex, mask);
spElement(offset, node);
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(producerIndex, mask);
spElement(offset, node);
// increment sequence by 1, the value expected by consumer
// (seeing this value from a producer will lead to retry 2)
soSequence(sBuffer, pSeqOffset, producerIndex + 1); // StoreStore
// increment sequence by 1, the value expected by consumer
// (seeing this value from a producer will lead to retry 2)
soSequence(sBuffer, pSeqOffset, producerIndex + 1); // StoreStore
park(node, myThread, timed, nanos);
return;
}
return park(node, myThread, timed, nanos);
}
// whoops, inconsistent state
busySpin(PUSH_SPINS);
continue;
}
case TYPE_CONSUMER: {
// complimentary mode = pop+unpark off queue
long cSeqOffset = calcSequenceOffset(consumerIndex, mask);
final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad
final long delta = seq - (consumerIndex + 1);
if (delta == 0) {
if (casConsumerIndex(consumerIndex, consumerIndex + 1)) {
// Successful CAS: full barrier
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(consumerIndex, mask);
final Object e = lpElementNoCast(offset);
spElement(offset, null);
// Move sequence ahead by capacity, preparing it for next offer
// (seeing this value from a consumer will lead to retry 2)
soSequence(sBuffer, cSeqOffset, consumerIndex + mask + 1); // StoreStore
soItem1(e, item);
unpark(e);
return;
}
}
// whoops, inconsistent state
busySpin(POP_SPINS);
continue;
}
}
}
}
private final Object consumerWait(final boolean timed, final long nanos) {
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
final long[] sBuffer = this.sequenceBuffer;
long consumerIndex;
long producerIndex;
int lastType;
while (true) {
consumerIndex = lvConsumerIndex();
producerIndex = lvProducerIndex();
final Object previousElement;
if (consumerIndex == producerIndex) {
lastType = TYPE_EMPTY;
previousElement = null;
// whoops, inconsistent state
busySpin(PUSH_SPINS);
} else {
previousElement = lpElementNoCast(calcElementOffset(producerIndex-1));
if (previousElement == null) {
// the last producer hasn't finished setting the object yet
busySpin(INPROGRESS_SPINS);
continue;
}
// TYPE_CONSUMER
// complimentary mode = pop+unpark off queue
long cSeqOffset = calcSequenceOffset(consumerIndex, mask);
final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad
final long delta = seq - (consumerIndex + 1);
lastType = lpType(previousElement);
}
if (delta == 0) {
if (casConsumerIndex(consumerIndex, consumerIndex + 1)) {
// Successful CAS: full barrier
switch (lastType) {
case TYPE_EMPTY:
case TYPE_CONSUMER: {
// empty or same mode = push+park onto queue
long pSeqOffset = calcSequenceOffset(producerIndex, mask);
final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad
final long delta = seq - producerIndex;
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(consumerIndex, mask);
final Object e = lpElement(offset);
spElement(offset, null);
if (delta == 0) {
// this is expected if we see this first time around
if (casProducerIndex(producerIndex, producerIndex + 1)) {
// Successful CAS: full barrier
// Move sequence ahead by capacity, preparing it for next offer
// (seeing this value from a consumer will lead to retry 2)
soSequence(sBuffer, cSeqOffset, consumerIndex + mask + 1); // StoreStore
final Thread myThread = Thread.currentThread();
final Object node = nodeThreadLocal.get();
soItem1(e, item);
unpark(e);
spType(node, TYPE_CONSUMER);
spThread(node, myThread);
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(producerIndex, mask);
spElement(offset, node);
// increment sequence by 1, the value expected by consumer
// (seeing this value from a producer will lead to retry 2)
soSequence(sBuffer, pSeqOffset, producerIndex + 1); // StoreStore
park(node, myThread, timed, nanos);
Object item1 = lvItem1(node);
return item1;
}
return true;
}
// whoops, inconsistent state
busySpin(PUSH_SPINS);
continue;
}
case TYPE_PRODUCER: {
// complimentary mode = pop+unpark off queue
long cSeqOffset = calcSequenceOffset(consumerIndex, mask);
final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad
final long delta = seq - (consumerIndex + 1);
if (delta == 0) {
if (casConsumerIndex(consumerIndex, consumerIndex + 1)) {
// Successful CAS: full barrier
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(consumerIndex, mask);
final Object e = lpElementNoCast(offset);
spElement(offset, null);
// Move sequence ahead by capacity, preparing it for next offer
// (seeing this value from a consumer will lead to retry 2)
soSequence(sBuffer, cSeqOffset, consumerIndex + mask + 1); // StoreStore
final Object lvItem1 = lpItem1(e);
unpark(e);
return lvItem1;
}
}
// whoops, inconsistent state
busySpin(POP_SPINS);
continue;
}
// whoops, inconsistent state
busySpin(POP_SPINS);
}
}
}

View File

@ -1,6 +1,7 @@
package dorkbox.util.messagebus.common.simpleq.jctools;
import static dorkbox.util.messagebus.common.simpleq.jctools.UnsafeAccess.UNSAFE;
import org.jctools.util.UnsafeAccess;
import dorkbox.util.messagebus.common.simpleq.MessageType;
abstract class ColdItems {
@ -29,9 +30,9 @@ public class Node extends HotItem1 {
static {
try {
TYPE = UNSAFE.objectFieldOffset(Node.class.getField("type"));
ITEM1 = UNSAFE.objectFieldOffset(Node.class.getField("item1"));
THREAD = UNSAFE.objectFieldOffset(Node.class.getField("thread"));
TYPE = UnsafeAccess.UNSAFE.objectFieldOffset(Node.class.getField("type"));
ITEM1 = UnsafeAccess.UNSAFE.objectFieldOffset(Node.class.getField("item1"));
THREAD = UnsafeAccess.UNSAFE.objectFieldOffset(Node.class.getField("thread"));
// now make sure we can access UNSAFE
Node node = new Node();
@ -49,45 +50,45 @@ public class Node extends HotItem1 {
}
/**
 * Plain store of {@code item} into the {@code item1} field of {@code node}.
 * The store is issued once (the span previously performed it twice).
 */
static final void spItem1(Object node, Object item) {
UnsafeAccess.UNSAFE.putObject(node, ITEM1, item);
}
/**
 * Ordered store ({@code putOrderedObject}) of {@code item} into the {@code item1} field of {@code node}.
 * The store is issued once (the span previously performed it twice).
 */
static final void soItem1(Object node, Object item) {
UnsafeAccess.UNSAFE.putOrderedObject(node, ITEM1, item);
}
/**
 * Plain load of the {@code item1} field of {@code node}.
 * A single return is kept; the duplicated second return was unreachable and does not compile.
 *
 * @return the current value of {@code item1}
 */
static final Object lpItem1(Object node) {
return UnsafeAccess.UNSAFE.getObject(node, ITEM1);
}
/**
 * Volatile load ({@code getObjectVolatile}) of the {@code item1} field of {@code node}.
 * A single return is kept; the duplicated second return was unreachable and does not compile.
 *
 * @return the current value of {@code item1}
 */
static final Object lvItem1(Object node) {
return UnsafeAccess.UNSAFE.getObjectVolatile(node, ITEM1);
}
//////////////
/**
 * Plain store of {@code type} into the int {@code type} field of {@code node}.
 * The store is issued once (the span previously performed it twice).
 */
static final void spType(Object node, int type) {
UnsafeAccess.UNSAFE.putInt(node, TYPE, type);
}
/**
 * Plain load of the int {@code type} field of {@code node}.
 * A single return is kept; the duplicated second return was unreachable and does not compile.
 *
 * @return the current value of {@code type}
 */
static final int lpType(Object node) {
return UnsafeAccess.UNSAFE.getInt(node, TYPE);
}
///////////
/**
 * Plain store of {@code thread} into the {@code thread} field of {@code node}.
 * The store is issued once (the span previously performed it twice).
 */
static final void spThread(Object node, Object thread) {
UnsafeAccess.UNSAFE.putObject(node, THREAD, thread);
}
/**
 * Ordered store ({@code putOrderedObject}) of {@code thread} into the {@code thread} field of {@code node}.
 * The store is issued once (the span previously performed it twice).
 */
static final void soThread(Object node, Object thread) {
UnsafeAccess.UNSAFE.putOrderedObject(node, THREAD, thread);
}
/**
 * Plain load of the {@code thread} field of {@code node}.
 * A single return is kept; the duplicated second return was unreachable and does not compile.
 *
 * @return the current value of {@code thread}
 */
static final Object lpThread(Object node) {
return UnsafeAccess.UNSAFE.getObject(node, THREAD);
}
/**
 * Volatile load ({@code getObjectVolatile}) of the {@code thread} field of {@code node}.
 * A single return is kept; the duplicated second return was unreachable and does not compile.
 *
 * @return the current value of {@code thread}
 */
static final Object lvThread(Object node) {
return UnsafeAccess.UNSAFE.getObjectVolatile(node, THREAD);
}

View File

@ -1,43 +0,0 @@
/*
* https://github.com/JCTools/JCTools
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.util.messagebus.common.simpleq.jctools;
/**
 * Power of 2 utility functions.
 */
public class Pow2 {
    /** Largest power of two that fits in a positive {@code int} (2^30). */
    private static final int MAX_POW2 = 1 << 30;

    /**
     * Find the next larger positive power of two value up from the given value. If value is a power of two then
     * this value will be returned. Values of 1 or less (including negative values) yield 1.
     *
     * @param value from which next positive power of two will be found.
     * @return the next positive power of 2 or this value if it is a power of 2.
     * @throws IllegalArgumentException if {@code value} exceeds 2^30, because no larger power of two
     *         can be represented as a positive {@code int}
     */
    public static int roundToPowerOfTwo(final int value) {
        if (value > MAX_POW2) {
            // the shift below would wrap to a negative number instead of a power of two
            throw new IllegalArgumentException(
                    "There is no larger power of 2 int for value:" + value + " since it exceeds 2^30.");
        }
        if (value <= 1) {
            // also covers Integer.MIN_VALUE, where value - 1 overflows
            return 1;
        }
        return 1 << 32 - Integer.numberOfLeadingZeros(value - 1);
    }

    /**
     * Is this value a power of two.
     * <p>
     * The check is purely bit-based: it is true whenever at most one bit is set, so {@code 0} and
     * {@code Integer.MIN_VALUE} are also reported as {@code true}.
     *
     * @param value to be tested to see if it is a power of two.
     * @return true if the value is a power of 2 otherwise false.
     */
    public static boolean isPowerOfTwo(final int value) {
        return (value & value - 1) == 0;
    }
}

View File

@ -1,58 +0,0 @@
/*
* https://github.com/JCTools/JCTools
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.util.messagebus.common.simpleq.jctools;
import java.lang.reflect.Field;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import sun.misc.Unsafe;
/**
 * Central access point for {@link Unsafe}.
 * <p>
 * Why should we resort to using Unsafe?<br>
 * <ol>
 * <li>To construct class fields which allow volatile/ordered/plain access: This requirement is covered by
 * {@link AtomicReferenceFieldUpdater} and similar but their performance is arguably worse than the DIY approach
 * (depending on JVM version) while Unsafe intrinsification is a far lesser challenge for JIT compilers.
 * <li>To construct flavors of {@link AtomicReferenceArray}.
 * <li>Other use cases exist but are not present in this library yet.
 * </ol>
 *
 * @author nitsanw
 *
 */
public class UnsafeAccess {
    /** True when {@code Unsafe} exposes {@code getAndSetObject(Object, long, Object)}. */
    public static final boolean SUPPORTS_GET_AND_SET;
    /** The {@code Unsafe} singleton, read reflectively from its {@code theUnsafe} field. */
    public static final Unsafe UNSAFE;

    static {
        UNSAFE = acquireUnsafe();
        SUPPORTS_GET_AND_SET = probeGetAndSetObject();
    }

    /** Reads the private static {@code theUnsafe} field; any failure aborts class initialisation. */
    private static Unsafe acquireUnsafe() {
        try {
            final Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
            theUnsafe.setAccessible(true);
            return (Unsafe) theUnsafe.get(null);
        } catch (Exception cause) {
            throw new RuntimeException(cause);
        }
    }

    /** Feature probe: a failed lookup simply means the method is not available. */
    private static boolean probeGetAndSetObject() {
        try {
            Unsafe.class.getMethod("getAndSetObject", Object.class, Long.TYPE, Object.class);
            return true;
        } catch (Exception absent) {
            return false;
        }
    }
}

View File

@ -1,107 +0,0 @@
/*
* Copyright 2012 Real Logic Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.util.messagebus;
import java.util.concurrent.LinkedTransferQueue;
import org.openjdk.jol.info.ClassLayout;
import org.openjdk.jol.util.VMSupport;
import dorkbox.util.messagebus.common.simpleq.jctools.Node;
/**
 * One-producer / one-consumer throughput benchmark for {@link LinkedTransferQueue}.
 * <p>
 * {@link #main} performs 20 runs and reports the average of the last 10. In each run a producer thread
 * calls {@code transfer} {@link #REPETITIONS} times while the calling thread calls {@code take} the same
 * number of times.
 */
public class LinkTransferQueuePerfTest {
    // default: 50 * 1000 * 100 = 5,000,000 transfers per run (override with -Dreps=N)
    public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 100;
    public static final Integer TEST_VALUE = Integer.valueOf(777);
    // only printed in the banner; the queue below is constructed without a capacity argument
    public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17);

    public static void main(final String[] args) throws Exception {
        System.out.println(VMSupport.vmDetails());
        System.out.println(ClassLayout.parseClass(Node.class).toPrintable());
        System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS);
        final LinkedTransferQueue<Integer> queue = new LinkedTransferQueue<Integer>();
        final long[] results = new long[20];
        for (int i = 0; i < 20; i++) {
            System.gc();
            results[i] = performanceRun(i, queue);
        }
        // only average last 10 results for summary
        long sum = 0;
        for (int i = 10; i < 20; i++) {
            sum += results[i];
        }
        System.out.format("summary,QueuePerfTest,%s,%d\n", queue.getClass().getSimpleName(), sum / 10);
    }

    /**
     * Runs one producer thread against the calling thread acting as consumer.
     *
     * @return operations per second for this run
     */
    private static long performanceRun(int runNumber, LinkedTransferQueue<Integer> queue) throws Exception {
        Producer p = new Producer(queue);
        Thread thread = new Thread(p);
        thread.start(); // producer will timestamp start
        LinkedTransferQueue<Integer> consumer = queue;
        Object result;
        int i = REPETITIONS;
        int queueEmpty = 0;
        do {
            result = consumer.take();
        } while (0 != --i);
        long end = System.nanoTime();
        // join before reading p.start / p.queueFull so the producer's writes are visible
        thread.join();
        long duration = end - p.start;
        long ops = REPETITIONS * 1000L * 1000L * 1000L / duration;
        String qName = queue.getClass().getSimpleName();
        System.out.format("%d - ops/sec=%,d - %s result=%d failed.poll=%d failed.offer=%d\n", runNumber, ops,
                        qName, result, queueEmpty, p.queueFull);
        return ops;
    }

    /** Transfers {@link #TEST_VALUE} {@link #REPETITIONS} times and records when it started. */
    public static class Producer implements Runnable {
        private final LinkedTransferQueue<Integer> queue;
        int queueFull = 0;
        long start;

        public Producer(LinkedTransferQueue<Integer> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            LinkedTransferQueue<Integer> producer = this.queue;
            int i = REPETITIONS;
            int f = 0;
            long s = System.nanoTime();
            try {
                do {
                    producer.transfer(TEST_VALUE);
                } while (0 != --i);
            } catch (InterruptedException e) {
                e.printStackTrace();
                // keep the interrupt visible to whoever owns this thread instead of swallowing it
                Thread.currentThread().interrupt();
            }
            this.queueFull = f;
            this.start = s;
        }
    }
}

View File

@ -0,0 +1,149 @@
/*
* Copyright 2012 Real Logic Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.util.messagebus;
import java.util.concurrent.LinkedBlockingQueue;
/**
 * Blocking ({@code put}/{@code take}) throughput benchmark for {@link LinkedBlockingQueue}.
 * <p>
 * {@link #main} performs 20 runs and reports the average of the last 10. Each run starts
 * {@code concurrency} producers and the same number of consumers; every one of them performs
 * {@link #REPETITIONS} operations on a single shared queue.
 */
public class PerfTest_LinkedBlockingQueue {
    // default: 50 * 1000 * 100 = 5,000,000 operations per producer/consumer (override with -Dreps=N)
    public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 100;
    public static final Integer TEST_VALUE = Integer.valueOf(777);
    // default: 1 << 17 slots (override with -Dpow2.capacity=N)
    public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17);

    private static final int concurrency = 2;

    public static void main(final String[] args) throws Exception {
        System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS + " Concurrency " + concurrency);
        // size the queue from QUEUE_CAPACITY so the banner above matches what is actually measured
        final LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>(QUEUE_CAPACITY);
        final long[] results = new long[20];
        for (int i = 0; i < 20; i++) {
            System.gc();
            results[i] = performanceRun(i, queue);
        }
        // only average last 10 results for summary
        long sum = 0;
        for (int i = 10; i < 20; i++) {
            sum += results[i];
        }
        System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), sum / 10);
    }

    /**
     * Runs all producers and consumers to completion and derives the throughput from the earliest
     * producer start and the latest consumer end.
     *
     * @return operations per second, computed from {@link #REPETITIONS} and the measured duration
     */
    private static long performanceRun(int runNumber, LinkedBlockingQueue<Integer> queue) throws Exception {
        Producer[] producers = new Producer[concurrency];
        Consumer[] consumers = new Consumer[concurrency];
        Thread[] threads = new Thread[concurrency * 2];
        for (int i = 0; i < concurrency; i++) {
            producers[i] = new Producer(queue);
            consumers[i] = new Consumer(queue);
        }
        // even slots hold producers, odd slots hold consumers
        for (int j = 0, i = 0; i < concurrency; i++, j += 2) {
            threads[j] = new Thread(producers[i], "Producer " + i);
            threads[j + 1] = new Thread(consumers[i], "Consumer " + i);
        }
        for (int i = 0; i < concurrency * 2; i += 2) {
            threads[i].start();
            threads[i + 1].start();
        }
        for (int i = 0; i < concurrency * 2; i += 2) {
            threads[i].join();
            threads[i + 1].join();
        }
        long start = Long.MAX_VALUE;
        long end = -1;
        for (int i = 0; i < concurrency; i++) {
            if (producers[i].start < start) {
                start = producers[i].start;
            }
            if (consumers[i].end > end) {
                end = consumers[i].end;
            }
        }
        long duration = end - start;
        long ops = REPETITIONS * 1000L * 1000L * 1000L / duration;
        String qName = queue.getClass().getSimpleName();
        System.out.format("%d - ops/sec=%,d - %s\n", runNumber, ops, qName);
        return ops;
    }

    /** Puts {@link #TEST_VALUE} into the queue {@link #REPETITIONS} times, recording its start time. */
    public static class Producer implements Runnable {
        private final LinkedBlockingQueue<Integer> queue;
        volatile long start;

        public Producer(LinkedBlockingQueue<Integer> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            LinkedBlockingQueue<Integer> producer = this.queue;
            int i = REPETITIONS;
            this.start = System.nanoTime();
            try {
                do {
                    producer.put(TEST_VALUE);
                } while (0 != --i);
            } catch (InterruptedException e) {
                e.printStackTrace();
                // keep the interrupt visible to whoever owns this thread instead of swallowing it
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Takes {@link #REPETITIONS} elements from the queue, recording the last one and its end time. */
    public static class Consumer implements Runnable {
        private final LinkedBlockingQueue<Integer> queue;
        Object result;
        volatile long end;

        public Consumer(LinkedBlockingQueue<Integer> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            LinkedBlockingQueue<Integer> consumer = this.queue;
            Object result = null;
            int i = REPETITIONS;
            try {
                do {
                    result = consumer.take();
                } while (0 != --i);
            } catch (InterruptedException e) {
                e.printStackTrace();
                // keep the interrupt visible to whoever owns this thread instead of swallowing it
                Thread.currentThread().interrupt();
            }
            this.result = result;
            this.end = System.nanoTime();
        }
    }
}

View File

@ -0,0 +1,141 @@
/*
* Copyright 2012 Real Logic Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.util.messagebus;
import java.util.concurrent.LinkedBlockingQueue;
/**
 * Non-blocking ({@code offer}/{@code poll} with {@code Thread.yield()} on failure) throughput benchmark
 * for {@link LinkedBlockingQueue}.
 * <p>
 * {@link #main} performs 20 runs and reports the average of the last 10. Each run starts
 * {@code concurrency} producers and the same number of consumers; every one of them performs
 * {@link #REPETITIONS} successful operations on a single shared queue.
 */
public class PerfTest_LinkedBlockingQueue_NonBlock {
    // default: 50 * 1000 * 100 = 5,000,000 operations per producer/consumer (override with -Dreps=N)
    public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 100;
    public static final Integer TEST_VALUE = Integer.valueOf(777);
    // default: 1 << 17 slots (override with -Dpow2.capacity=N)
    public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17);

    private static final int concurrency = 2;

    public static void main(final String[] args) throws Exception {
        System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS + " Concurrency " + concurrency);
        // size the queue from QUEUE_CAPACITY so the banner above matches what is actually measured
        final LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>(QUEUE_CAPACITY);
        final long[] results = new long[20];
        for (int i = 0; i < 20; i++) {
            System.gc();
            results[i] = performanceRun(i, queue);
        }
        // only average last 10 results for summary
        long sum = 0;
        for (int i = 10; i < 20; i++) {
            sum += results[i];
        }
        System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), sum / 10);
    }

    /**
     * Runs all producers and consumers to completion and derives the throughput from the earliest
     * producer start and the latest consumer end.
     *
     * @return operations per second, computed from {@link #REPETITIONS} and the measured duration
     */
    private static long performanceRun(int runNumber, LinkedBlockingQueue<Integer> queue) throws Exception {
        Producer[] producers = new Producer[concurrency];
        Consumer[] consumers = new Consumer[concurrency];
        Thread[] threads = new Thread[concurrency * 2];
        for (int i = 0; i < concurrency; i++) {
            producers[i] = new Producer(queue);
            consumers[i] = new Consumer(queue);
        }
        // even slots hold producers, odd slots hold consumers
        for (int j = 0, i = 0; i < concurrency; i++, j += 2) {
            threads[j] = new Thread(producers[i], "Producer " + i);
            threads[j + 1] = new Thread(consumers[i], "Consumer " + i);
        }
        for (int i = 0; i < concurrency * 2; i += 2) {
            threads[i].start();
            threads[i + 1].start();
        }
        for (int i = 0; i < concurrency * 2; i += 2) {
            threads[i].join();
            threads[i + 1].join();
        }
        long start = Long.MAX_VALUE;
        long end = -1;
        for (int i = 0; i < concurrency; i++) {
            if (producers[i].start < start) {
                start = producers[i].start;
            }
            if (consumers[i].end > end) {
                end = consumers[i].end;
            }
        }
        long duration = end - start;
        long ops = REPETITIONS * 1000L * 1000L * 1000L / duration;
        String qName = queue.getClass().getSimpleName();
        System.out.format("%d - ops/sec=%,d - %s\n", runNumber, ops, qName);
        return ops;
    }

    /** Offers {@link #TEST_VALUE} until {@link #REPETITIONS} offers have succeeded. */
    public static class Producer implements Runnable {
        private final LinkedBlockingQueue<Integer> queue;
        volatile long start;

        public Producer(LinkedBlockingQueue<Integer> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            LinkedBlockingQueue<Integer> producer = this.queue;
            int i = REPETITIONS;
            this.start = System.nanoTime();
            do {
                // queue full: give other threads a chance, then retry the same element
                while (!producer.offer(TEST_VALUE)) {
                    Thread.yield();
                }
            } while (0 != --i);
        }
    }

    /** Polls until {@link #REPETITIONS} elements have been received, recording the last one. */
    public static class Consumer implements Runnable {
        private final LinkedBlockingQueue<Integer> queue;
        Object result;
        volatile long end;

        public Consumer(LinkedBlockingQueue<Integer> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            LinkedBlockingQueue<Integer> consumer = this.queue;
            Object result = null;
            int i = REPETITIONS;
            do {
                // queue empty: give other threads a chance, then poll again
                while (null == (result = consumer.poll())) {
                    Thread.yield();
                }
            } while (0 != --i);
            this.result = result;
            this.end = System.nanoTime();
        }
    }
}

View File

@ -14,24 +14,16 @@ package dorkbox.util.messagebus;
import java.util.concurrent.LinkedTransferQueue;
import org.openjdk.jol.info.ClassLayout;
import org.openjdk.jol.util.VMSupport;
import dorkbox.util.messagebus.common.simpleq.jctools.Node;
public class LinkTransferQueueConcurrentPerfTest {
public class PerfTest_LinkedTransferQueue {
// 15 == 32 * 1024
public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 100;
public static final Integer TEST_VALUE = Integer.valueOf(777);
public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17);
private static final int concurrency = 2;
private static final int concurrency = 1;
public static void main(final String[] args) throws Exception {
System.out.println(VMSupport.vmDetails());
System.out.println(ClassLayout.parseClass(Node.class).toPrintable());
System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS + " Concurrency " + concurrency);
final LinkedTransferQueue queue = new LinkedTransferQueue();

View File

@ -14,12 +14,7 @@ package dorkbox.util.messagebus;
import java.util.concurrent.LinkedTransferQueue;
import org.openjdk.jol.info.ClassLayout;
import org.openjdk.jol.util.VMSupport;
import dorkbox.util.messagebus.common.simpleq.jctools.Node;
public class LinkTransferQueueConcurrentNonBlockPerfTest {
public class PerfTest_LinkedTransferQueue_NonBlock {
// 15 == 32 * 1024
public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 100;
public static final Integer TEST_VALUE = Integer.valueOf(777);
@ -29,9 +24,6 @@ public class LinkTransferQueueConcurrentNonBlockPerfTest {
private static final int concurrency = 4;
public static void main(final String[] args) throws Exception {
System.out.println(VMSupport.vmDetails());
System.out.println(ClassLayout.parseClass(Node.class).toPrintable());
System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS + " Concurrency " + concurrency);
final LinkedTransferQueue queue = new LinkedTransferQueue();

View File

@ -15,12 +15,14 @@
*/
package dorkbox.util.messagebus;
import org.openjdk.jol.info.ClassLayout;
import org.openjdk.jol.util.VMSupport;
import org.jctools.queues.MpmcArrayQueue;
import dorkbox.util.messagebus.common.simpleq.jctools.MpmcArrayQueue;
public class MpmcQueueBaselinePerfTest {
public class PerfTest_MpmcArrayQueue_Baseline {
static {
System.setProperty("sparse.shift", "2");
}
// 15 == 32 * 1024
public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 1000;
public static final Integer TEST_VALUE = Integer.valueOf(777);
@ -28,11 +30,8 @@ public class MpmcQueueBaselinePerfTest {
public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17);
public static void main(final String[] args) throws Exception {
System.out.println(VMSupport.vmDetails());
System.out.println(ClassLayout.parseClass(Integer.class).toPrintable());
System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS);
final MpmcArrayQueue queue = new MpmcArrayQueue(QUEUE_CAPACITY);
final MpmcArrayQueue<Integer> queue = new MpmcArrayQueue<Integer>(QUEUE_CAPACITY);
final long[] results = new long[20];
for (int i = 0; i < 20; i++) {
@ -47,13 +46,13 @@ public class MpmcQueueBaselinePerfTest {
System.out.format("summary,QueuePerfTest,%s,%d\n", queue.getClass().getSimpleName(), sum / 10);
}
private static long performanceRun(int runNumber, MpmcArrayQueue queue) throws Exception {
private static long performanceRun(int runNumber, MpmcArrayQueue<Integer> queue) throws Exception {
Producer p = new Producer(queue);
Thread thread = new Thread(p);
thread.start(); // producer will timestamp start
MpmcArrayQueue consumer = queue;
Object result;
MpmcArrayQueue<Integer> consumer = queue;
Integer result;
int i = REPETITIONS;
int queueEmpty = 0;
do {
@ -74,17 +73,17 @@ public class MpmcQueueBaselinePerfTest {
}
public static class Producer implements Runnable {
private final MpmcArrayQueue queue;
private final MpmcArrayQueue<Integer> queue;
int queueFull = 0;
long start;
public Producer(MpmcArrayQueue queue) {
public Producer(MpmcArrayQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
MpmcArrayQueue producer = this.queue;
MpmcArrayQueue<Integer> producer = this.queue;
int i = REPETITIONS;
int f = 0;
long s = System.nanoTime();

View File

@ -15,13 +15,11 @@
*/
package dorkbox.util.messagebus;
import org.openjdk.jol.info.ClassLayout;
import org.openjdk.jol.util.VMSupport;
import org.jctools.queues.MpmcArrayQueue;
import dorkbox.util.messagebus.common.simpleq.jctools.MpmcArrayQueue;
import dorkbox.util.messagebus.common.simpleq.jctools.Node;
public class MpmcQueueBaselineNodePerfTest {
public class PerfTest_MpmcArrayQueue_Baseline_Node {
// 15 == 32 * 1024
public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 1000;
public static final Integer TEST_VALUE = Integer.valueOf(777);
@ -29,9 +27,6 @@ public class MpmcQueueBaselineNodePerfTest {
public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17);
public static void main(final String[] args) throws Exception {
System.out.println(VMSupport.vmDetails());
System.out.println(ClassLayout.parseClass(Node.class).toPrintable());
System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS);
final MpmcArrayQueue<Node> queue = new MpmcArrayQueue<Node>(QUEUE_CAPACITY);

View File

@ -15,25 +15,18 @@
*/
package dorkbox.util.messagebus;
import org.openjdk.jol.info.ClassLayout;
import org.openjdk.jol.util.VMSupport;
import org.jctools.queues.MpmcArrayQueue;
import dorkbox.util.messagebus.common.simpleq.jctools.MpmcArrayQueue;
import dorkbox.util.messagebus.common.simpleq.jctools.Node;
public class MpmcQueueConcurrentPerfTest {
public class PerfTest_MpmcArrayQueue_Concurrent {
// 15 == 32 * 1024
public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 100;
public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 1000;
public static final Integer TEST_VALUE = Integer.valueOf(777);
public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17);
private static final int concurrency = 2;
private static final int concurrency = 1;
public static void main(final String[] args) throws Exception {
System.out.println(VMSupport.vmDetails());
System.out.println(ClassLayout.parseClass(Node.class).toPrintable());
System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS + " Concurrency " + concurrency);
final MpmcArrayQueue queue = new MpmcArrayQueue(1 << 17);
@ -113,7 +106,9 @@ public class MpmcQueueConcurrentPerfTest {
this.start = System.nanoTime();
do {
producer.offer(TEST_VALUE);
while (!producer.offer(TEST_VALUE)) {
Thread.yield();
}
} while (0 != --i);
}
}
@ -134,7 +129,9 @@ public class MpmcQueueConcurrentPerfTest {
int i = REPETITIONS;
do {
result = consumer.poll();
while (null == (result = consumer.poll())) {
Thread.yield();
}
} while (0 != --i);
this.result = result;

View File

@ -6,15 +6,16 @@ import org.openjdk.jol.util.VMSupport;
import dorkbox.util.messagebus.common.simpleq.jctools.MpmcTransferArrayQueue;
import dorkbox.util.messagebus.common.simpleq.jctools.Node;
public class MpmcTransferPerfTest {
public class PerfTest_MpmcTransferArrayQueue {
public static final int REPETITIONS = 50 * 1000 * 100;
public static final Integer TEST_VALUE = Integer.valueOf(777);
public static final int QUEUE_CAPACITY = 1 << 17;
private static final int concurrency = 2;
private static final int concurrency = 4;
public static void main(final String[] args) throws Exception {
System.out.println(VMSupport.vmDetails());
System.out.println(ClassLayout.parseClass(Node.class).toPrintable());

View File

@ -1,25 +1,16 @@
package dorkbox.util.messagebus;
import org.openjdk.jol.info.ClassLayout;
import org.openjdk.jol.util.VMSupport;
import dorkbox.util.messagebus.common.simpleq.jctools.MpmcTransferArrayQueue;
import dorkbox.util.messagebus.common.simpleq.jctools.Node;
public class MpmcNonBlockPerfTest {
public static final int REPETITIONS = 50 * 1000 * 100;
public class PerfTest_MpmcTransferArrayQueue_NonBlock {
public static final int REPETITIONS = 50 * 1000 * 1000;
public static final Integer TEST_VALUE = Integer.valueOf(777);
public static final int QUEUE_CAPACITY = 1 << 17;
private static final int concurrency = 4;
private static final int concurrency = 1;
public static void main(final String[] args) throws Exception {
System.out.println(VMSupport.vmDetails());
System.out.println(ClassLayout.parseClass(Node.class).toPrintable());
System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS + " Concurrency " + concurrency);
final int warmupRuns = 2;
final int runs = 5;
@ -129,13 +120,11 @@ public class MpmcNonBlockPerfTest {
int i = REPETITIONS;
this.start = System.nanoTime();
try {
do {
producer.put(TEST_VALUE);
} while (0 != --i);
} catch (InterruptedException e) {
e.printStackTrace();
}
do {
while (!producer.offer(TEST_VALUE)) {
Thread.yield();
}
} while (0 != --i);
}
}
@ -155,8 +144,8 @@ public class MpmcNonBlockPerfTest {
int i = REPETITIONS;
do {
while((result = consumer.poll()) == null) {
Thread.yield();
while (null == (result = consumer.poll())) {
Thread.yield();
}
} while (0 != --i);