From 9acffac11ba3a776e0f4e73733ae81348954e9ee Mon Sep 17 00:00:00 2001 From: nathan Date: Sun, 1 Mar 2015 12:29:35 +0100 Subject: [PATCH] wip exchanger. wip simpleq --- .../util/messagebus/MultiMBassador.java | 25 +++++---- .../common/simpleq/ExchangerQueue.java | 51 +++++++++++++++++++ .../common/simpleq/SimpleQueue.java | 44 ++++++++-------- 3 files changed, 83 insertions(+), 37 deletions(-) create mode 100644 src/main/java/dorkbox/util/messagebus/common/simpleq/ExchangerQueue.java diff --git a/src/main/java/dorkbox/util/messagebus/MultiMBassador.java b/src/main/java/dorkbox/util/messagebus/MultiMBassador.java index 28331fb..0d35b26 100644 --- a/src/main/java/dorkbox/util/messagebus/MultiMBassador.java +++ b/src/main/java/dorkbox/util/messagebus/MultiMBassador.java @@ -4,7 +4,6 @@ import java.lang.reflect.Array; import java.util.ArrayDeque; import java.util.Collection; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.LockSupport; import com.lmax.disruptor.MessageHolder; @@ -63,7 +62,7 @@ public class MultiMBassador implements IMessageBus { * By default, will permit subTypes and VarArg matching, and will use all CPUs available for dispatching async messages */ public MultiMBassador() { - this(Runtime.getRuntime().availableProcessors()); + this(Runtime.getRuntime().availableProcessors()/2); // this(2); } @@ -108,18 +107,18 @@ public class MultiMBassador implements IMessageBus { while (true) { try { - spins = maxSpins; - while ((event = IN_QUEUE.poll()) == null) { - if (spins > 100) { - --spins; - } else if (spins > 0) { - --spins; - LockSupport.parkNanos(1L); - } else { +// spins = maxSpins; +// while ((event = IN_QUEUE.poll()) == null) { +// if (spins > 100) { +// --spins; +// } else if (spins > 0) { +// --spins; +// LockSupport.parkNanos(1L); +// } else { event = IN_QUEUE.take(); - break; - } - } +// break; +// } +// } Object message1 = event.message1; IN_QUEUE.release(event); diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/ExchangerQueue.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/ExchangerQueue.java new file mode 100644 index 0000000..f6fba4e --- /dev/null +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/ExchangerQueue.java @@ -0,0 +1,51 @@ +package dorkbox.util.messagebus.common.simpleq; + +import java.util.concurrent.Exchanger; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import com.lmax.disruptor.MessageHolder; + + + +public class ExchangerQueue { + + private final AtomicInteger availableThreads = new AtomicInteger(); + + private final Exchanger exchanger = new Exchanger(); + + ThreadLocal holder = new ThreadLocal() { + @Override + protected MessageHolder initialValue() { + return new MessageHolder(); + } + }; + + + public ExchangerQueue(int numberOfThreads) { + this.availableThreads.set(numberOfThreads); + } + + public void transfer(Object message1) throws InterruptedException { + MessageHolder messageHolder = this.holder.get(); + messageHolder.message1 = message1; + this.holder.set(this.exchanger.exchange(messageHolder)); + } + + public boolean hasPendingMessages() { + return false; + } + + public void tryTransfer(Runnable runnable, long timeout, TimeUnit unit) throws InterruptedException { + // TODO Auto-generated method stub + + } + + public MessageHolder poll() { + return null; + } + + public MessageHolder take(MessageHolder holder) throws InterruptedException { + return this.exchanger.exchange(holder); + } +} diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/SimpleQueue.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/SimpleQueue.java index f21e018..c4c8f33 100644 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/SimpleQueue.java +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/SimpleQueue.java @@ -11,7 +11,7 @@ import com.lmax.disruptor.MessageHolder; -public class SimpleQueue { +public final class SimpleQueue { private static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors(); @@ -25,7 +25,6 @@ public class SimpleQueue { private final AtomicReference consumer = new AtomicReference(); private final AtomicReference producer = new AtomicReference(); - private volatile boolean waitingProducer = false; private final AtomicInteger availableThreads = new AtomicInteger(); @@ -40,10 +39,10 @@ public class SimpleQueue { if ((holder = this.producer.getAndSet(null)) == null) { this.publisherLock.lock(); try { - while ((holder = this.producer.getAndSet(null)) == null) { - this.waitingProducer = true; + do { this.publisherNotifyCondition.await(); - } +// LockSupport.parkNanos(1L); + } while ((holder = this.producer.getAndSet(null)) == null); } finally { this.publisherLock.unlock(); } @@ -69,20 +68,22 @@ public class SimpleQueue { } - public MessageHolder poll() { - return this.consumer.getAndSet(null); - } +// public MessageHolder poll() { +// return this.consumer.getAndSet(null); +// } public MessageHolder take() throws InterruptedException { MessageHolder holder = null; - this.consumerLock.lock(); - try { - while ((holder = this.consumer.getAndSet(null)) == null) { - this.consumerNotifyCondition.await(); + if ((holder = this.consumer.getAndSet(null)) == null) { + this.consumerLock.lock(); + try { + do { + this.consumerNotifyCondition.await(); + } while ((holder = this.consumer.getAndSet(null)) == null); + } finally { + this.consumerLock.unlock(); } - } finally { - this.consumerLock.unlock(); } @@ -94,16 +95,11 @@ public class SimpleQueue { public void release(MessageHolder holder) { this.producer.set(holder); - if (this.waitingProducer) { - this.publisherLock.lock(); - try { - if (this.waitingProducer) { - this.waitingProducer = false; - this.publisherNotifyCondition.signalAll(); - } - } finally { - this.publisherLock.unlock(); - } + this.publisherLock.lock(); + try { + this.publisherNotifyCondition.signalAll(); + } finally { + this.publisherLock.unlock(); } } }