WIP, not liking results

This commit is contained in:
nathan 2015-04-21 19:47:34 +02:00
parent 5be16c6345
commit b4e012c87c
3 changed files with 83 additions and 88 deletions

View File

@ -65,6 +65,7 @@ public class LinkedArrayList extends PadA1 {
final Object lvHead() { final Object lvHead() {
return UNSAFE.getObjectVolatile(this, HEAD); return UNSAFE.getObjectVolatile(this, HEAD);
// return this.head;
} }
final Object lpHead() { final Object lpHead() {
@ -76,48 +77,51 @@ public class LinkedArrayList extends PadA1 {
} }
final Object lpTail() { final Object lpTail() {
return UNSAFE.getObject(this, TAIL); return lpTail(this);
} }
final Object lpNext(Object node) { static final Object lpTail(Object source) {
return UNSAFE.getObject(source, TAIL);
}
static final Object lpNext(Object node) {
return UNSAFE.getObject(node, NEXT); return UNSAFE.getObject(node, NEXT);
} }
final boolean advanceTail(Object expected, Object newTail) { final boolean advanceTail(Object expected, Object newTail) {
// if (expected == lvTail()) { final Object that = this;
return UNSAFE.compareAndSwapObject(this, TAIL, expected, newTail); return advanceTail(that, lpTail(that), expected, newTail);
// } }
// return false;
static final boolean advanceTail(Object source, Object objectActualTail, Object expected, Object newTail) {
return expected == objectActualTail && UNSAFE.compareAndSwapObject(source, TAIL, expected, newTail);
} }
final boolean advanceHead(Object expected, Object newHead) { final boolean advanceHead(Object expected, Object newHead) {
// if (expected == lvHead()) { return expected == lpHead() && UNSAFE.compareAndSwapObject(this, HEAD, expected, newHead);
return UNSAFE.compareAndSwapObject(this, HEAD, expected, newHead);
// }
// return false;
} }
final Object lvThread(Object node) { static final Object lvThread(Object node) {
return UNSAFE.getObjectVolatile(node, THREAD); return UNSAFE.getObjectVolatile(node, THREAD);
} }
final Object lpThread(Object node) { static final Object lpThread(Object node) {
return UNSAFE.getObject(node, THREAD); return UNSAFE.getObject(node, THREAD);
} }
final void spThread(Object node, Thread thread) { static final void spThread(Object node, Thread thread) {
UNSAFE.putObject(node, THREAD, thread); UNSAFE.putObject(node, THREAD, thread);
} }
final boolean casThread(Object node, Object expected, Object newThread) { static final boolean casThread(Object node, Object expected, Object newThread) {
return UNSAFE.compareAndSwapObject(node, THREAD, expected, newThread); return UNSAFE.compareAndSwapObject(node, THREAD, expected, newThread);
} }
final boolean lpType(Object node) { static final boolean lpType(Object node) {
return UNSAFE.getBoolean(node, IS_CONSUMER); return UNSAFE.getBoolean(node, IS_CONSUMER);
} }
final void spType(Object node, boolean isConsumer) { static final void spType(Object node, boolean isConsumer) {
UNSAFE.putBoolean(node, IS_CONSUMER, isConsumer); UNSAFE.putBoolean(node, IS_CONSUMER, isConsumer);
} }

View File

@ -5,7 +5,7 @@ package dorkbox.util.messagebus.common.simpleq;
// mpmc sparse.shift = 2, for this to be fast. // mpmc sparse.shift = 2, for this to be fast.
abstract class PrePad { abstract class PrePad {
volatile long y0, y1, y2, y4, y5, y6 = 7L; // volatile long y0, y1, y2, y4, y5, y6 = 7L;
volatile long z0, z1, z2, z4, z5, z6 = 7L; volatile long z0, z1, z2, z4, z5, z6 = 7L;
} }
@ -14,42 +14,42 @@ abstract class ColdItems {
// public final int ID = count.getAndIncrement(); // public final int ID = count.getAndIncrement();
// public short type = MessageType.ONE; // public short type = MessageType.ONE;
public volatile Object item1 = null; public transient volatile Object item1 = null;
// public Object item2 = null; // public Object item2 = null;
// public Object item3 = null; // public Object item3 = null;
// public Object[] item4 = null; // public Object[] item4 = null;
} }
abstract class Pad0 extends ColdItems { abstract class Pad0 extends ColdItems {
volatile long y0, y1, y2, y4, y5, y6 = 7L; // volatile long y0, y1, y2, y4, y5, y6 = 7L;
volatile long z0, z1, z2, z4, z5, z6 = 7L; volatile long z0, z1, z2, z4, z5, z6 = 7L;
} }
abstract class HotItem1 extends Pad0 { abstract class HotItem1 extends Pad0 {
public volatile boolean isConsumer = false; public transient volatile boolean isConsumer = false;
} }
abstract class Pad1 extends HotItem1 { abstract class Pad1 extends HotItem1 {
volatile long y0, y1, y2, y4, y5, y6 = 7L; // volatile long y0, y1, y2, y4, y5, y6 = 7L;
volatile long z0, z1, z2, z4, z5, z6 = 7L; volatile long z0, z1, z2, z4, z5, z6 = 7L;
} }
abstract class HotItem2 extends Pad1 { abstract class HotItem2 extends Pad1 {
public volatile Thread thread; public transient volatile Thread thread;
} }
abstract class Pad2 extends HotItem2 { abstract class Pad2 extends HotItem2 {
volatile long y0, y1, y2, y4, y5, y6 = 7L; // volatile long y0, y1, y2, y4, y5, y6 = 7L;
volatile long z0, z1, z2, z4, z5, z6 = 7L; volatile long z0, z1, z2, z4, z5, z6 = 7L;
} }
abstract class HotItem3 extends Pad2 { abstract class HotItem3 extends Pad2 {
public volatile Node next; public transient volatile Node next;
} }
public class Node extends HotItem3 { public class Node extends HotItem3 {
// post-padding // post-padding
volatile long y0, y1, y2, y4, y5, y6 = 7L; // volatile long y0, y1, y2, y4, y5, y6 = 7L;
volatile long z0, z1, z2, z4, z5, z6 = 7L; volatile long z0, z1, z2, z4, z5, z6 = 7L;
public Node() { public Node() {

View File

@ -31,7 +31,7 @@ public final class SimpleQueue extends LinkedArrayList {
* This is greater than timed value because untimed waits spin * This is greater than timed value because untimed waits spin
* faster since they don't need to check times on each spin. * faster since they don't need to check times on each spin.
*/ */
static final int maxUntimedSpins = maxTimedSpins * 64; static final int maxUntimedSpins = maxTimedSpins * 32;
static final int negMaxUntimedSpins = -maxUntimedSpins; static final int negMaxUntimedSpins = -maxUntimedSpins;
/** /**
@ -77,8 +77,17 @@ public final class SimpleQueue extends LinkedArrayList {
// BOTH can think that the queue is empty, resulting in a deadlock between threads // BOTH can think that the queue is empty, resulting in a deadlock between threads
// it is ALSO possible that the consumer pops the previous node, and so we thought it was not-empty, when // it is ALSO possible that the consumer pops the previous node, and so we thought it was not-empty, when
// in reality, it is. // in reality, it is.
final boolean empty = head == lpNext(tail); boolean empty = head == lpNext(tail);
final boolean sameMode = lpType(tail) == isConsumer; boolean sameMode = lpType(tail) == isConsumer;
// check to make sure we are not "double dipping" on our head
thread = lpThread(head);
if (empty && thread != null) {
empty = false;
} else if (sameMode && thread == null) {
busySpin();
continue;
}
// empty or same mode = push+park onto queue // empty or same mode = push+park onto queue
if (empty || sameMode) { if (empty || sameMode) {
@ -94,26 +103,13 @@ public final class SimpleQueue extends LinkedArrayList {
continue; continue;
} }
thread = lpThread(head);
if (thread != null) {
if (empty) {
busySpin();
continue;
}
} else {
if (sameMode) {
busySpin();
continue;
}
}
if (isConsumer) { if (isConsumer) {
spType(tNext, isConsumer); spType(tNext, isConsumer);
spThread(tNext, Thread.currentThread()); spThread(tNext, Thread.currentThread());
if (!advanceTail(tail, tNext)) { // FULL barrier if (!advanceTail(tail, tNext)) { // FULL barrier
// failed to link in // failed to link in
busySpin(); // busySpin();
continue; continue;
} }
@ -121,23 +117,24 @@ public final class SimpleQueue extends LinkedArrayList {
// this will only advance head if necessary // this will only advance head if necessary
// advanceHead(tail, tNext); // advanceHead(tail, tNext);
return lpItem1(tNext); Object lpItem1 = lpItem1(tNext);
spItem1(tNext, null);
return lpItem1;
} else { } else {
// producer
spType(tNext, isConsumer); spType(tNext, isConsumer);
spItem1(tNext, item); spItem1(tNext, item);
spThread(tNext, Thread.currentThread()); spThread(tNext, Thread.currentThread());
if (!advanceTail(tail, tNext)) { // FULL barrier if (!advanceTail(tail, tNext)) { // FULL barrier
// failed to link in // failed to link in
busySpin();
continue; continue;
} }
park(tNext, timed, nanos); park(tNext, timed, nanos);
// this will only advance head if necessary // this will only advance head if necessary
// advanceHead(tail, tNext); advanceHead(tail, tNext);
return null; return null;
} }
} }
@ -153,55 +150,37 @@ public final class SimpleQueue extends LinkedArrayList {
thread = lpThread(head); thread = lpThread(head);
if (isConsumer) { if (isConsumer) {
Object returnVal; Object returnVal = lpItem1(head);
// while (true) {
returnVal = lpItem1(head);
// is already cancelled/fulfilled // is already cancelled/fulfilled
if (thread == null || if (thread == null ||
!casThread(head, thread, null)) { // FULL barrier !casThread(head, thread, null)) { // FULL barrier
// move head forward to look for next "ready" node // head was already used by a different thread
if (!advanceHead(head, next)) { // FULL barrier advanceHead(head, next); // FULL barrier
busySpin();
// head = next;
// next = lpNext(head);
}
// thread = lpThread(head); continue;
}
continue;
}
// break;
// }
spItem1(head, null);
LockSupport.unpark((Thread) thread); LockSupport.unpark((Thread) thread);
advanceHead(head, next); // FULL barrier advanceHead(head, next); // FULL barrier
return returnVal; return returnVal;
} else { } else {
// while (true) { // producer
spItem1(head, item); // StoreStore spItem1(head, item); // StoreStore
// is already cancelled/fulfilled // is already cancelled/fulfilled
if (thread == null || if (thread == null ||
!casThread(head, thread, null)) { // FULL barrier !casThread(head, thread, null)) { // FULL barrier
// move head forward to look for next "ready" node // head was already used by a different thread
if (!advanceHead(head, next)) { // FULL barrier advanceHead(head, next); // FULL barrier
// head = next;
// next = lpNext(head);
busySpin();
}
// thread = lpThread(head); continue;
}
continue;
}
// break;
// }
LockSupport.unpark((Thread) thread); LockSupport.unpark((Thread) thread);
@ -212,6 +191,18 @@ public final class SimpleQueue extends LinkedArrayList {
} }
} }
private static final void busySpin2() {
// busy spin for the amount of time (roughly) of a CPU context switch
int spins = maxUntimedSpins;
for (;;) {
if (spins > 0) {
--spins;
} else {
break;
}
}
}
private static final void busySpin() { private static final void busySpin() {
// busy spin for the amount of time (roughly) of a CPU context switch // busy spin for the amount of time (roughly) of a CPU context switch
int spins = SPINS; int spins = SPINS;
@ -243,13 +234,13 @@ public final class SimpleQueue extends LinkedArrayList {
// busy spin for the amount of time (roughly) of a CPU context switch // busy spin for the amount of time (roughly) of a CPU context switch
// then park (if necessary) // then park (if necessary)
for (;;) { for (;;) {
if (lvThread(myNode) == null) { if (lpThread(myNode) == null) {
return; return;
} else if (spins > 0) { // } else if (spins > 0) {
--spins; // --spins;
} else if (spins > negMaxUntimedSpins) { } else if (spins > negMaxUntimedSpins) {
--spins; --spins;
LockSupport.parkNanos(1); // LockSupport.parkNanos(1);
} else { } else {
// park can return for NO REASON. Subsequent loops will hit this if it has not been ACTUALLY unlocked. // park can return for NO REASON. Subsequent loops will hit this if it has not been ACTUALLY unlocked.
LockSupport.park(); LockSupport.park();