WIP - single writer principle. Disruptor added, WIP optimizing data structures for hit on writer, instead of reader

This commit is contained in:
nathan 2016-01-19 00:12:10 +01:00
parent e8efa4390b
commit 0401a6a164
17 changed files with 336 additions and 6010 deletions

View File

@ -17,14 +17,15 @@ package dorkbox.util.messagebus;
import dorkbox.util.messagebus.error.DefaultErrorHandler; import dorkbox.util.messagebus.error.DefaultErrorHandler;
import dorkbox.util.messagebus.error.ErrorHandlingSupport; import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.synchrony.AsyncDisruptor;
import dorkbox.util.messagebus.synchrony.Sync;
import dorkbox.util.messagebus.synchrony.Synchrony;
import dorkbox.util.messagebus.publication.Publisher; import dorkbox.util.messagebus.publication.Publisher;
import dorkbox.util.messagebus.publication.PublisherExact; import dorkbox.util.messagebus.publication.PublisherExact;
import dorkbox.util.messagebus.publication.PublisherExactWithSuperTypes; import dorkbox.util.messagebus.publication.PublisherExactWithSuperTypes;
import dorkbox.util.messagebus.publication.PublisherExactWithSuperTypesAndVarity; import dorkbox.util.messagebus.publication.PublisherExactWithSuperTypesAndVarity;
import dorkbox.util.messagebus.subscription.SubscriptionManager; import dorkbox.util.messagebus.subscription.SubscriptionManager;
import dorkbox.util.messagebus.subscription.WriterDistruptor;
import dorkbox.util.messagebus.synchrony.AsyncDisruptor;
import dorkbox.util.messagebus.synchrony.Sync;
import dorkbox.util.messagebus.synchrony.Synchrony;
/** /**
* The base class for all message bus implementations with support for asynchronous message dispatch. * The base class for all message bus implementations with support for asynchronous message dispatch.
@ -39,6 +40,8 @@ public
class MessageBus implements IMessageBus { class MessageBus implements IMessageBus {
private final ErrorHandlingSupport errorHandler; private final ErrorHandlingSupport errorHandler;
private final WriterDistruptor subscriptionWriter;
private final SubscriptionManager subscriptionManager; private final SubscriptionManager subscriptionManager;
private final Publisher publisher; private final Publisher publisher;
@ -90,6 +93,9 @@ class MessageBus implements IMessageBus {
*/ */
this.subscriptionManager = new SubscriptionManager(numberOfThreads, errorHandler); this.subscriptionManager = new SubscriptionManager(numberOfThreads, errorHandler);
subscriptionWriter = new WriterDistruptor(errorHandler, subscriptionManager);
switch (publishMode) { switch (publishMode) {
case Exact: case Exact:
publisher = new PublisherExact(errorHandler, subscriptionManager); publisher = new PublisherExact(errorHandler, subscriptionManager);
@ -123,13 +129,23 @@ class MessageBus implements IMessageBus {
@Override @Override
public public
void subscribe(final Object listener) { void subscribe(final Object listener) {
MessageBus.this.subscriptionManager.subscribe(listener); if (listener == null) {
return;
}
subscriptionManager.subscribe(listener);
// subscriptionWriter.subscribe(listener);
} }
@Override @Override
public public
void unsubscribe(final Object listener) { void unsubscribe(final Object listener) {
MessageBus.this.subscriptionManager.unsubscribe(listener); if (listener == null) {
return;
}
subscriptionManager.unsubscribe(listener);
// subscriptionWriter.unsubscribe(listener);
} }
@Override @Override
@ -232,6 +248,7 @@ class MessageBus implements IMessageBus {
@Override @Override
public public
void shutdown() { void shutdown() {
this.subscriptionWriter.shutdown();
this.asyncPublication.shutdown(); this.asyncPublication.shutdown();
this.subscriptionManager.shutdown(); this.subscriptionManager.shutdown();
} }

View File

@ -15,8 +15,7 @@
*/ */
package dorkbox.util.messagebus.common; package dorkbox.util.messagebus.common;
import dorkbox.util.messagebus.common.adapter.JavaVersionAdapter; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -37,7 +36,7 @@ public class HashMapTree<KEY, VALUE> {
@Override @Override
protected protected
Object initialValue() { Object initialValue() {
return JavaVersionAdapter.concurrentMap(INITIAL_SIZE, LOAD_FACTOR, 1); return new ConcurrentHashMap(INITIAL_SIZE, LOAD_FACTOR, 1);
} }
}; };
@ -197,7 +196,7 @@ public class HashMapTree<KEY, VALUE> {
if (checked == null) { if (checked == null) {
final boolean success = children.compareAndSet(null, cached); final boolean success = children.compareAndSet(null, cached);
if (success) { if (success) {
keyCache.set(JavaVersionAdapter.concurrentMap(INITIAL_SIZE, LOAD_FACTOR, 1)); keyCache.set(new ConcurrentHashMap(INITIAL_SIZE, LOAD_FACTOR, 1));
kids = cast(cached); kids = cast(cached);
} }
else { else {

View File

@ -15,7 +15,7 @@
*/ */
package dorkbox.util.messagebus.common; package dorkbox.util.messagebus.common;
import dorkbox.util.messagebus.common.adapter.JavaVersionAdapter; import java.util.concurrent.ConcurrentHashMap;
/** /**
* This implementation uses strong references to the elements, uses an IdentityHashMap * This implementation uses strong references to the elements, uses an IdentityHashMap
@ -28,11 +28,11 @@ public class StrongConcurrentSetV8<T> extends StrongConcurrentSet<T> {
public StrongConcurrentSetV8(int size, float loadFactor) { public StrongConcurrentSetV8(int size, float loadFactor) {
// 1 for the stripe size, because that is the max concurrency with our concurrent set (since it uses R/W locks) // 1 for the stripe size, because that is the max concurrency with our concurrent set (since it uses R/W locks)
super(JavaVersionAdapter.<T, ISetEntry<T>>concurrentMap(size, loadFactor, 16)); super(new ConcurrentHashMap<T, ISetEntry<T>>(size, loadFactor, 16));
} }
public StrongConcurrentSetV8(int size, float loadFactor, int stripeSize) { public StrongConcurrentSetV8(int size, float loadFactor, int stripeSize) {
// 1 for the stripe size, because that is the max concurrency with our concurrent set (since it uses R/W locks) // 1 for the stripe size, because that is the max concurrency with our concurrent set (since it uses R/W locks)
super(JavaVersionAdapter.<T, ISetEntry<T>>concurrentMap(size, loadFactor, stripeSize)); super(new ConcurrentHashMap<T, ISetEntry<T>>(size, loadFactor, stripeSize));
} }
} }

View File

@ -15,6 +15,7 @@
*/ */
package dorkbox.util.messagebus.common.adapter; package dorkbox.util.messagebus.common.adapter;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
public public
@ -23,6 +24,6 @@ class Java6Adapter implements MapAdapter {
@Override @Override
public final public final
<K, V> ConcurrentMap<K, V> concurrentMap(final int size, final float loadFactor, final int stripeSize) { <K, V> ConcurrentMap<K, V> concurrentMap(final int size, final float loadFactor, final int stripeSize) {
return new ConcurrentHashMapV8<K, V>(size, loadFactor, stripeSize); return new ConcurrentHashMap<K, V>(size, loadFactor, stripeSize);
} }
} }

File diff suppressed because it is too large Load Diff

View File

@ -15,12 +15,11 @@
*/ */
package dorkbox.util.messagebus.common.thread; package dorkbox.util.messagebus.common.thread;
import dorkbox.util.messagebus.common.adapter.JavaVersionAdapter;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -49,7 +48,7 @@ class ConcurrentSet<T> extends ConcurrentLinkedQueue2<T> {
public public
ConcurrentSet(int size, float loadFactor, int stripeSize) { ConcurrentSet(int size, float loadFactor, int stripeSize) {
super(); super();
this.entries = JavaVersionAdapter.concurrentMap(size, loadFactor, 32); this.entries = new ConcurrentHashMap<>(size, loadFactor, stripeSize);
} }
@Override @Override

View File

@ -39,6 +39,7 @@ class PublisherExactWithSuperTypes implements Publisher {
public public
void publish(final Synchrony synchrony, final Object message1) { void publish(final Synchrony synchrony, final Object message1) {
try { try {
final SubscriptionManager subManager = this.subManager;
final Class<?> messageClass = message1.getClass(); final Class<?> messageClass = message1.getClass();
final Subscription[] subscriptions = subManager.getExactAndSuper(messageClass); // can return null final Subscription[] subscriptions = subManager.getExactAndSuper(messageClass); // can return null

View File

@ -0,0 +1,20 @@
package dorkbox.util.messagebus.subscription;
import com.lmax.disruptor.EventFactory;
/**
* @author dorkbox, llc
* Date: 1/15/16
*/
public class SubscriptionFactory implements EventFactory<SubscriptionHolder> {
public
SubscriptionFactory() {
}
@Override
public
SubscriptionHolder newInstance() {
return new SubscriptionHolder();
}
}

View File

@ -0,0 +1,49 @@
package dorkbox.util.messagebus.subscription;
import com.lmax.disruptor.LifecycleAware;
import com.lmax.disruptor.WorkHandler;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* @author dorkbox, llc
* Date: 2/2/15
*/
public
class SubscriptionHandler implements WorkHandler<SubscriptionHolder>, LifecycleAware {
private final SubscriptionManager subscriptionManager;
AtomicBoolean shutdown = new AtomicBoolean(false);
public
SubscriptionHandler(final SubscriptionManager subscriptionManager) {
this.subscriptionManager = subscriptionManager;
}
@Override
public
void onEvent(final SubscriptionHolder event) throws Exception {
if (event.doSubscribe) {
subscriptionManager.subscribe(event.listener);
}
else {
subscriptionManager.unsubscribe(event.listener);
}
}
@Override
public
void onStart() {
}
@Override
public synchronized
void onShutdown() {
shutdown.set(true);
}
public
boolean isShutdown() {
return shutdown.get();
}
}

View File

@ -0,0 +1,13 @@
package dorkbox.util.messagebus.subscription;
/**
* @author dorkbox, llc Date: 1/15/16
*/
public
class SubscriptionHolder {
public boolean doSubscribe;
public Object listener;
public
SubscriptionHolder() {}
}

View File

@ -17,7 +17,6 @@ package dorkbox.util.messagebus.subscription;
import dorkbox.util.messagebus.common.HashMapTree; import dorkbox.util.messagebus.common.HashMapTree;
import dorkbox.util.messagebus.common.MessageHandler; import dorkbox.util.messagebus.common.MessageHandler;
import dorkbox.util.messagebus.common.adapter.JavaVersionAdapter;
import dorkbox.util.messagebus.error.ErrorHandlingSupport; import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.utils.ClassUtils; import dorkbox.util.messagebus.utils.ClassUtils;
import dorkbox.util.messagebus.utils.SubscriptionUtils; import dorkbox.util.messagebus.utils.SubscriptionUtils;
@ -26,6 +25,7 @@ import dorkbox.util.messagebus.utils.VarArgUtils;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -45,13 +45,15 @@ public final
class SubscriptionManager { class SubscriptionManager {
public static final float LOAD_FACTOR = 0.8F; public static final float LOAD_FACTOR = 0.8F;
// TODO: during startup, precalculate the number of subscription listeners and x2 to save as subsPerListener expected max size
// ONLY used by SUB/UNSUB
// remember already processed classes that do not contain any message handlers // remember already processed classes that do not contain any message handlers
private final Map<Class<?>, Boolean> nonListeners; private final ConcurrentMap<Class<?>, Boolean> nonListeners;
// all subscriptions per messageHandler type // all subscriptions per messageHandler type
// this map provides fast access for subscribing and unsubscribing // this map provides fast access for subscribing and unsubscribing
// write access is synchronized and happens very infrequently
// once a collection of subscriptions is stored it does not change // once a collection of subscriptions is stored it does not change
private final ConcurrentMap<Class<?>, Subscription[]> subscriptionsPerListener; private final ConcurrentMap<Class<?>, Subscription[]> subscriptionsPerListener;
@ -69,15 +71,8 @@ class SubscriptionManager {
private final HashMapTree<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageMulti; private final HashMapTree<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageMulti;
// shortcut publication if we know there is no possibility of varArg (ie: a method that has an array as arguments) // shortcut publication if we know there is no possibility of varArg (ie: a method that has an array as arguments)
final AtomicBoolean varArgPossibility = new AtomicBoolean(false); private final AtomicBoolean varArgPossibility = new AtomicBoolean(false);
ThreadLocal<List<Subscription>> listCache = new ThreadLocal<List<Subscription>>() {
@Override
protected
List<Subscription> initialValue() {
return new CopyOnWriteArrayList<Subscription>();
}
};
private final ClassUtils classUtils; private final ClassUtils classUtils;
//NOTE for multiArg, can use the memory address concatenated with other ones and then just put it in the 'single" map (convert single to //NOTE for multiArg, can use the memory address concatenated with other ones and then just put it in the 'single" map (convert single to
@ -91,20 +86,18 @@ class SubscriptionManager {
classUtils = new ClassUtils(SubscriptionManager.LOAD_FACTOR); classUtils = new ClassUtils(SubscriptionManager.LOAD_FACTOR);
// modified ONLY during SUB/UNSUB // modified ONLY during SUB/UNSUB
this.nonListeners = JavaVersionAdapter.concurrentMap(4, LOAD_FACTOR, numberOfThreads); this.nonListeners = new ConcurrentHashMap<Class<?>, Boolean>(4, LOAD_FACTOR, numberOfThreads);
// only used during SUB/UNSUB subscriptionsPerListener = new ConcurrentHashMap<Class<?>, Subscription[]>(32, LOAD_FACTOR, numberOfThreads);
this.subscriptionsPerListener = JavaVersionAdapter.concurrentMap(32, LOAD_FACTOR, numberOfThreads); subscriptionsPerMessageSingle = new ConcurrentHashMap<Class<?>, List<Subscription>>(32, LOAD_FACTOR, numberOfThreads);
this.subscriptionsPerMessageSingle = JavaVersionAdapter.concurrentMap(32, LOAD_FACTOR, numberOfThreads);
this.subscriptionsPerMessageMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>(); this.subscriptionsPerMessageMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>();
this.subUtils = new SubscriptionUtils(classUtils, numberOfThreads, LOAD_FACTOR); this.subUtils = new SubscriptionUtils(classUtils, LOAD_FACTOR, numberOfThreads);
// var arg subscriptions keep track of which subscriptions can handle varArgs. SUB/UNSUB dumps it, so it is recreated dynamically. // var arg subscriptions keep track of which subscriptions can handle varArgs. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but improves performance of handlers // it's a hit on SUB/UNSUB, but improves performance of handlers
this.varArgUtils = new VarArgUtils(classUtils, numberOfThreads, LOAD_FACTOR); this.varArgUtils = new VarArgUtils(classUtils, LOAD_FACTOR, numberOfThreads);
} }
public public
@ -115,16 +108,12 @@ class SubscriptionManager {
this.subscriptionsPerMessageMulti.clear(); this.subscriptionsPerMessageMulti.clear();
this.subscriptionsPerListener.clear(); this.subscriptionsPerListener.clear();
this.classUtils.clear(); this.classUtils.shutdown();
clear(); clear();
} }
public public
void subscribe(final Object listener) { void subscribe(final Object listener) {
if (listener == null) {
return;
}
// when subscribing, this is a GREAT opportunity to figure out the classes/objects loaded -- their hierarchy, AND generate UUIDs // when subscribing, this is a GREAT opportunity to figure out the classes/objects loaded -- their hierarchy, AND generate UUIDs
// for each CLASS that can be accessed. This then lets us lookup a UUID for each object that comes in -- if an ID is found (for // for each CLASS that can be accessed. This then lets us lookup a UUID for each object that comes in -- if an ID is found (for
// any part of it's object hierarchy) -- it means that we have that listeners for that object. this is MUCH faster checking if // any part of it's object hierarchy) -- it means that we have that listeners for that object. this is MUCH faster checking if
@ -184,7 +173,6 @@ class SubscriptionManager {
final AtomicBoolean varArgPossibility = this.varArgPossibility;
Subscription subscription; Subscription subscription;
MessageHandler messageHandler; MessageHandler messageHandler;
@ -209,18 +197,14 @@ class SubscriptionManager {
messageHandlerTypes = messageHandler.getHandledMessages(); messageHandlerTypes = messageHandler.getHandledMessages();
handlerType = messageHandlerTypes[0]; handlerType = messageHandlerTypes[0];
// using ThreadLocal cache's is SIGNIFICANTLY faster for subscribing to new types if (!subsPerMessageSingle.containsKey(handlerType)) {
final List<Subscription> cachedSubs = listCache.get(); subsPerMessageSingle.put(handlerType, new CopyOnWriteArrayList<Subscription>());
List<Subscription> subs = subsPerMessageSingle.putIfAbsent(handlerType, cachedSubs);
if (subs == null) {
listCache.set(new CopyOnWriteArrayList<Subscription>());
} }
// create the subscription. This can be thrown away if the subscription succeeds in another thread // create the subscription. This can be thrown away if the subscription succeeds in another thread
subscription = new Subscription(messageHandler); subscription = new Subscription(messageHandler);
subscriptions[i] = subscription; subscriptions[i] = subscription;
// now add this subscription to each of the handled types
} }
// now subsPerMessageSingle has a unique list of subscriptions for a specific handlerType, and MAY already have subscriptions // now subsPerMessageSingle has a unique list of subscriptions for a specific handlerType, and MAY already have subscriptions
@ -264,11 +248,8 @@ class SubscriptionManager {
public public
void unsubscribe(final Object listener) { void unsubscribe(final Object listener) {
if (listener == null) {
return;
}
final Class<?> listenerClass = listener.getClass(); final Class<?> listenerClass = listener.getClass();
if (this.nonListeners.containsKey(listenerClass)) { if (this.nonListeners.containsKey(listenerClass)) {
// early reject of known classes that do not define message handlers // early reject of known classes that do not define message handlers
return; return;
@ -302,7 +283,7 @@ class SubscriptionManager {
public public
void clear() { void clear() {
this.subUtils.clear(); this.subUtils.clear();
this.varArgUtils.clear(); // this.varArgUtils.clear();
} }
// inside a write lock // inside a write lock
@ -326,21 +307,21 @@ class SubscriptionManager {
return; return;
} }
case 1: { case 1: {
// using ThreadLocal cache's is SIGNIFICANTLY faster for subscribing to new types // // using ThreadLocal cache's is SIGNIFICANTLY faster for subscribing to new types
final List<Subscription> cachedSubs = listCache.get(); // final List<Subscription> cachedSubs = listCache.get();
List<Subscription> subs = subsPerMessageSingle.putIfAbsent(type0, cachedSubs); // List<Subscription> subs = subsPerMessageSingle.putIfAbsent(type0, cachedSubs);
if (subs == null) { // if (subs == null) {
listCache.set(new CopyOnWriteArrayList<Subscription>()); // listCache.set(new CopyOnWriteArrayList<Subscription>());
// listCache.set(new ArrayList<Subscription>(8)); //// listCache.set(new ArrayList<Subscription>(8));
subs = cachedSubs; // subs = cachedSubs;
//
// is this handler able to accept var args? // // is this handler able to accept var args?
if (handler.getVarArgClass() != null) { // if (handler.getVarArgClass() != null) {
varArgPossibility.lazySet(true); // varArgPossibility.lazySet(true);
} // }
} // }
//
subs.add(subscription); // subs.add(subscription);
return; return;
} }
case 2: { case 2: {

View File

@ -0,0 +1,159 @@
package dorkbox.util.messagebus.subscription;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.LiteBlockingWaitStrategy;
import com.lmax.disruptor.PhasedBackoffWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.Sequencer;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.WorkProcessor;
import dorkbox.util.messagebus.common.thread.NamedThreadFactory;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.publication.disruptor.PublicationExceptionHandler;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
/**
* Objective of this class is to conform to the "single writer principle", in order to maintain CLEAN AND SIMPLE concurrency for the
* subscriptions. Even with concurrent hashMaps, there is still locks happening during contention.
*/
public
class WriterDistruptor {
private WorkProcessor workProcessor;
private SubscriptionHandler handler;
private RingBuffer<SubscriptionHolder> ringBuffer;
private Sequence workSequence;
public
WriterDistruptor(final ErrorHandlingSupport errorHandler, final SubscriptionManager subscriptionManager) {
// Now we setup the disruptor and work handlers
ExecutorService executor = new ThreadPoolExecutor(1, 1,
0, TimeUnit.NANOSECONDS, // handlers are never idle, so this doesn't matter
new java.util.concurrent.LinkedTransferQueue<Runnable>(),
new NamedThreadFactory("MessageBus-Subscriber"));
final PublicationExceptionHandler<SubscriptionHolder> exceptionHandler = new PublicationExceptionHandler<SubscriptionHolder>(errorHandler);
EventFactory<SubscriptionHolder> factory = new SubscriptionFactory();
// setup the work handlers
handler = new SubscriptionHandler(subscriptionManager);
// final int BUFFER_SIZE = ringBufferSize * 64;
final int BUFFER_SIZE = 1024 * 64;
// final int BUFFER_SIZE = 1024;
// final int BUFFER_SIZE = 32;
// final int BUFFER_SIZE = 16;
// final int BUFFER_SIZE = 8;
// final int BUFFER_SIZE = 4;
WaitStrategy consumerWaitStrategy;
// consumerWaitStrategy = new LiteBlockingWaitStrategy(); // good one
// consumerWaitStrategy = new BlockingWaitStrategy();
// consumerWaitStrategy = new YieldingWaitStrategy();
// consumerWaitStrategy = new BusySpinWaitStrategy();
// consumerWaitStrategy = new SleepingWaitStrategy();
// consumerWaitStrategy = new PhasedBackoffWaitStrategy(20, 50, TimeUnit.MILLISECONDS, new SleepingWaitStrategy(0));
// consumerWaitStrategy = new PhasedBackoffWaitStrategy(20, 50, TimeUnit.MILLISECONDS, new BlockingWaitStrategy());
consumerWaitStrategy = new PhasedBackoffWaitStrategy(2, 5, TimeUnit.MILLISECONDS, new LiteBlockingWaitStrategy());
ringBuffer = RingBuffer.createMultiProducer(factory, BUFFER_SIZE, consumerWaitStrategy);
SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
// setup the WorkProcessors (these consume from the ring buffer -- one at a time) and tell the "handler" to execute the item
workSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
workProcessor = new WorkProcessor<SubscriptionHolder>(ringBuffer, sequenceBarrier, handler, exceptionHandler, workSequence);
// setup the WorkProcessor sequences (control what is consumed from the ring buffer)
final Sequence[] sequences = getSequences();
ringBuffer.addGatingSequences(sequences);
// configure the start position for the WorkProcessors, and start them
final long cursor = ringBuffer.getCursor();
workSequence.set(cursor);
workProcessor.getSequence()
.set(cursor);
executor.execute(workProcessor);
}
/**
* @param listener is never null
*/
public
void subscribe(final Object listener) {
long seq = ringBuffer.next();
SubscriptionHolder job = ringBuffer.get(seq);
job.doSubscribe = true;
job.listener = listener;
ringBuffer.publish(seq);
}
/**
* @param listener is never null
*/
public
void unsubscribe(final Object listener) {
long seq = ringBuffer.next();
SubscriptionHolder job = ringBuffer.get(seq);
job.doSubscribe = false;
job.listener = listener;
ringBuffer.publish(seq);
}
// gets the sequences used for processing work
private
Sequence[] getSequences() {
final Sequence[] sequences = new Sequence[2];
sequences[0] = workProcessor.getSequence();
sequences[1] = workSequence; // always add the work sequence
return sequences;
}
public
void start() {
}
public
void shutdown() {
workProcessor.halt();
while (!handler.isShutdown()) {
LockSupport.parkNanos(100L); // wait 100ms for handlers to quit
}
}
public
boolean hasPendingMessages() {
// from workerPool.drainAndHalt()
Sequence[] workerSequences = getSequences();
final long cursor = ringBuffer.getCursor();
for (Sequence s : workerSequences) {
if (cursor > s.get()) {
return true;
}
}
return false;
}
}

View File

@ -77,9 +77,10 @@ class AsyncDisruptor implements Synchrony {
// setup the WorkProcessors (these consume from the ring buffer -- one at a time) and tell the "handler" to execute the item // setup the WorkProcessors (these consume from the ring buffer -- one at a time) and tell the "handler" to execute the item
workSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
final int numWorkers = handlers.length; final int numWorkers = handlers.length;
workProcessors = new WorkProcessor[numWorkers]; workProcessors = new WorkProcessor[numWorkers];
workSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
for (int i = 0; i < numWorkers; i++) { for (int i = 0; i < numWorkers; i++) {
workProcessors[i] = new WorkProcessor<MessageHolder>(ringBuffer, workProcessors[i] = new WorkProcessor<MessageHolder>(ringBuffer,

View File

@ -15,11 +15,10 @@
*/ */
package dorkbox.util.messagebus.utils; package dorkbox.util.messagebus.utils;
import dorkbox.util.messagebus.common.adapter.JavaVersionAdapter;
import java.lang.reflect.Array; import java.lang.reflect.Array;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Map; import java.util.Map;
public final public final
@ -28,26 +27,30 @@ class ClassUtils {
private final Map<Class<?>, Class<?>> arrayCache; private final Map<Class<?>, Class<?>> arrayCache;
private final Map<Class<?>, Class<?>[]> superClassesCache; private final Map<Class<?>, Class<?>[]> superClassesCache;
/**
* These data structures are never reset because the class hierarchy doesn't change at runtime
*/
public public
ClassUtils(final float loadFactor) { ClassUtils(final float loadFactor) {
this.arrayCache = JavaVersionAdapter.concurrentMap(32, loadFactor, 1); // this.arrayCache = JavaVersionAdapter.concurrentMap(32, loadFactor, 1);
this.superClassesCache = JavaVersionAdapter.concurrentMap(32, loadFactor, 1); // this.superClassesCache = JavaVersionAdapter.concurrentMap(32, loadFactor, 1);
this.arrayCache = new IdentityHashMap<Class<?>, Class<?>>(32);
this.superClassesCache = new IdentityHashMap<Class<?>, Class<?>[]>(32);
} }
/** /**
* never returns null
* never reset, since it never needs to be reset (as the class hierarchy doesn't change at runtime)
* <p/>
* if parameter clazz is of type array, then the super classes are of array type as well * if parameter clazz is of type array, then the super classes are of array type as well
* <p/> * <p>
* protected by read lock by caller. The cache version is called first, by write lock * race conditions will result in duplicate answers, which we don't care if happens
* never returns null
* never reset
*/ */
public public
Class<?>[] getSuperClasses(final Class<?> clazz) { Class<?>[] getSuperClasses(final Class<?> clazz) {
// this is never reset, since it never needs to be. // this is never reset, since it never needs to be.
final Map<Class<?>, Class<?>[]> local = this.superClassesCache; final Map<Class<?>, Class<?>[]> cache = this.superClassesCache;
Class<?>[] classes = local.get(clazz); Class<?>[] classes = cache.get(clazz);
if (classes == null) { if (classes == null) {
// publish all super types of class // publish all super types of class
@ -82,7 +85,7 @@ class ClassUtils {
classes = new Class<?>[newList.size()]; classes = new Class<?>[newList.size()];
newList.toArray(classes); newList.toArray(classes);
local.put(clazz, classes); cache.put(clazz, classes);
} }
return classes; return classes;
@ -91,18 +94,18 @@ class ClassUtils {
/** /**
* race conditions will result in duplicate answers, which we don't care if happens * race conditions will result in duplicate answers, which we don't care if happens
* never returns null * never returns null
* never reset * never resets
*/ */
public public
Class<?> getArrayClass(final Class<?> c) { Class<?> getArrayClass(final Class<?> c) {
final Map<Class<?>, Class<?>> arrayCache = this.arrayCache; final Map<Class<?>, Class<?>> cache = this.arrayCache;
Class<?> clazz = arrayCache.get(c); Class<?> clazz = cache.get(c);
if (clazz == null) { if (clazz == null) {
// messy, but the ONLY way to do it. Array super types are also arrays // messy, but the ONLY way to do it. Array super types are also arrays
final Object[] newInstance = (Object[]) Array.newInstance(c, 1); final Object[] newInstance = (Object[]) Array.newInstance(c, 1);
clazz = newInstance.getClass(); clazz = newInstance.getClass();
arrayCache.put(c, clazz); cache.put(c, clazz);
} }
return clazz; return clazz;
@ -110,10 +113,10 @@ class ClassUtils {
/** /**
* Clears the caches * Clears the caches, should only be called on shutdown
*/ */
public public
void clear() { void shutdown() {
this.arrayCache.clear(); this.arrayCache.clear();
this.superClassesCache.clear(); this.superClassesCache.clear();
} }

View File

@ -16,13 +16,13 @@
package dorkbox.util.messagebus.utils; package dorkbox.util.messagebus.utils;
import dorkbox.util.messagebus.common.HashMapTree; import dorkbox.util.messagebus.common.HashMapTree;
import dorkbox.util.messagebus.common.adapter.JavaVersionAdapter;
import dorkbox.util.messagebus.subscription.Subscription; import dorkbox.util.messagebus.subscription.Subscription;
import dorkbox.util.messagebus.subscription.SubscriptionManager; import dorkbox.util.messagebus.subscription.SubscriptionManager;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public final public final
class SubscriptionUtils { class SubscriptionUtils {
@ -36,13 +36,13 @@ class SubscriptionUtils {
public public
SubscriptionUtils(final ClassUtils superClass, final int numberOfThreads, final float loadFactor) { SubscriptionUtils(final ClassUtils superClass, final float loadFactor, final int numberOfThreads) {
this.superClass = superClass; this.superClass = superClass;
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically. // superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but improves performance of handlers // it's a hit on SUB/UNSUB, but improves performance of handlers
this.superClassSubscriptions = JavaVersionAdapter.concurrentMap(8, loadFactor, numberOfThreads); this.superClassSubscriptions = new ConcurrentHashMap<Class<?>, ArrayList<Subscription>>(8, loadFactor, numberOfThreads);
this.superClassSubscriptionsMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>(); this.superClassSubscriptionsMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>();
} }
@ -57,8 +57,6 @@ class SubscriptionUtils {
* Returns an array COPY of the super subscriptions for the specified type. * Returns an array COPY of the super subscriptions for the specified type.
* <p/> * <p/>
* This ALSO checks to see if the superClass accepts subtypes. * This ALSO checks to see if the superClass accepts subtypes.
* <p/>
* protected by read lock by caller
* *
* @return CAN NOT RETURN NULL * @return CAN NOT RETURN NULL
*/ */
@ -110,8 +108,6 @@ class SubscriptionUtils {
* Returns an array COPY of the super subscriptions for the specified type. * Returns an array COPY of the super subscriptions for the specified type.
* <p/> * <p/>
* This ALSO checks to see if the superClass accepts subtypes. * This ALSO checks to see if the superClass accepts subtypes.
* <p/>
* protected by read lock by caller
* *
* @return CAN NOT RETURN NULL * @return CAN NOT RETURN NULL
*/ */
@ -178,8 +174,6 @@ class SubscriptionUtils {
* Returns an array COPY of the super subscriptions for the specified type. * Returns an array COPY of the super subscriptions for the specified type.
* <p/> * <p/>
* This ALSO checks to see if the superClass accepts subtypes. * This ALSO checks to see if the superClass accepts subtypes.
* <p/>
* protected by read lock by caller
* *
* @return CAN NOT RETURN NULL * @return CAN NOT RETURN NULL
*/ */

View File

@ -17,13 +17,13 @@ package dorkbox.util.messagebus.utils;
import dorkbox.util.messagebus.common.HashMapTree; import dorkbox.util.messagebus.common.HashMapTree;
import dorkbox.util.messagebus.common.MessageHandler; import dorkbox.util.messagebus.common.MessageHandler;
import dorkbox.util.messagebus.common.adapter.JavaVersionAdapter;
import dorkbox.util.messagebus.subscription.Subscription; import dorkbox.util.messagebus.subscription.Subscription;
import dorkbox.util.messagebus.subscription.SubscriptionManager; import dorkbox.util.messagebus.subscription.SubscriptionManager;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public final public final
class VarArgUtils { class VarArgUtils {
@ -37,14 +37,14 @@ class VarArgUtils {
public public
VarArgUtils(final ClassUtils superClassUtils, final int numberOfThreads, final float loadFactor) { VarArgUtils(final ClassUtils superClassUtils, final float loadFactor, final int numberOfThreads) {
this.superClassUtils = superClassUtils; this.superClassUtils = superClassUtils;
this.varArgSubscriptionsSingle = JavaVersionAdapter.concurrentMap(16, loadFactor, numberOfThreads); this.varArgSubscriptionsSingle = new ConcurrentHashMap<Class<?>, ArrayList<Subscription>>(16, loadFactor, numberOfThreads);
this.varArgSubscriptionsMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>(); this.varArgSubscriptionsMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>();
this.varArgSuperSubscriptionsSingle = JavaVersionAdapter.concurrentMap(16, loadFactor, numberOfThreads); this.varArgSuperSubscriptionsSingle = new ConcurrentHashMap<Class<?>, ArrayList<Subscription>>(16, loadFactor, numberOfThreads);
this.varArgSuperSubscriptionsMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>(); this.varArgSuperSubscriptionsMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>();
} }