wip exchanger. wip simpleq

This commit is contained in:
nathan 2015-03-01 12:29:35 +01:00
parent 3c021f383c
commit 9acffac11b
3 changed files with 83 additions and 37 deletions

View File

@ -4,7 +4,6 @@ import java.lang.reflect.Array;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import com.lmax.disruptor.MessageHolder;
@ -63,7 +62,7 @@ public class MultiMBassador implements IMessageBus {
* By default, will permit subTypes and VarArg matching, and will use all CPUs available for dispatching async messages
*/
public MultiMBassador() {
this(Runtime.getRuntime().availableProcessors());
this(Runtime.getRuntime().availableProcessors()/2);
// this(2);
}
@ -108,18 +107,18 @@ public class MultiMBassador implements IMessageBus {
while (true) {
try {
spins = maxSpins;
while ((event = IN_QUEUE.poll()) == null) {
if (spins > 100) {
--spins;
} else if (spins > 0) {
--spins;
LockSupport.parkNanos(1L);
} else {
// spins = maxSpins;
// while ((event = IN_QUEUE.poll()) == null) {
// if (spins > 100) {
// --spins;
// } else if (spins > 0) {
// --spins;
// LockSupport.parkNanos(1L);
// } else {
event = IN_QUEUE.take();
break;
}
}
// break;
// }
// }
Object message1 = event.message1;
IN_QUEUE.release(event);

View File

@ -0,0 +1,51 @@
package dorkbox.util.messagebus.common.simpleq;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import com.lmax.disruptor.MessageHolder;
public class ExchangerQueue {
private final AtomicInteger availableThreads = new AtomicInteger();
private final Exchanger<MessageHolder> exchanger = new Exchanger<MessageHolder>();
ThreadLocal<MessageHolder> holder = new ThreadLocal<MessageHolder>() {
@Override
protected MessageHolder initialValue() {
return new MessageHolder();
}
};
public ExchangerQueue(int numberOfThreads) {
this.availableThreads.set(numberOfThreads);
}
public void transfer(Object message1) throws InterruptedException {
MessageHolder messageHolder = this.holder.get();
messageHolder.message1 = message1;
this.holder.set(this.exchanger.exchange(messageHolder));
}
public boolean hasPendingMessages() {
return false;
}
public void tryTransfer(Runnable runnable, long timeout, TimeUnit unit) throws InterruptedException {
// TODO Auto-generated method stub
}
public MessageHolder poll() {
return null;
}
public MessageHolder take(MessageHolder holder) throws InterruptedException {
return this.exchanger.exchange(holder);
}
}

View File

@ -11,7 +11,7 @@ import com.lmax.disruptor.MessageHolder;
public class SimpleQueue {
public final class SimpleQueue {
private static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();
@ -25,7 +25,6 @@ public class SimpleQueue {
private final AtomicReference<MessageHolder> consumer = new AtomicReference<MessageHolder>();
private final AtomicReference<MessageHolder> producer = new AtomicReference<MessageHolder>();
private volatile boolean waitingProducer = false;
private final AtomicInteger availableThreads = new AtomicInteger();
@ -40,10 +39,10 @@ public class SimpleQueue {
if ((holder = this.producer.getAndSet(null)) == null) {
this.publisherLock.lock();
try {
while ((holder = this.producer.getAndSet(null)) == null) {
this.waitingProducer = true;
do {
this.publisherNotifyCondition.await();
}
// LockSupport.parkNanos(1L);
} while ((holder = this.producer.getAndSet(null)) == null);
} finally {
this.publisherLock.unlock();
}
@ -69,20 +68,22 @@ public class SimpleQueue {
}
public MessageHolder poll() {
return this.consumer.getAndSet(null);
}
// public MessageHolder poll() {
// return this.consumer.getAndSet(null);
// }
public MessageHolder take() throws InterruptedException {
MessageHolder holder = null;
this.consumerLock.lock();
try {
while ((holder = this.consumer.getAndSet(null)) == null) {
this.consumerNotifyCondition.await();
if ((holder = this.consumer.getAndSet(null)) == null) {
this.consumerLock.lock();
try {
do {
this.consumerNotifyCondition.await();
} while ((holder = this.consumer.getAndSet(null)) == null);
} finally {
this.consumerLock.unlock();
}
} finally {
this.consumerLock.unlock();
}
@ -94,16 +95,11 @@ public class SimpleQueue {
public void release(MessageHolder holder) {
this.producer.set(holder);
if (this.waitingProducer) {
this.publisherLock.lock();
try {
if (this.waitingProducer) {
this.waitingProducer = false;
this.publisherNotifyCondition.signalAll();
}
} finally {
this.publisherLock.unlock();
}
this.publisherLock.lock();
try {
this.publisherNotifyCondition.signalAll();
} finally {
this.publisherLock.unlock();
}
}
}