Code cleanup

This commit is contained in:
nathan 2016-01-20 13:48:47 +01:00
parent 3674d6031b
commit 61756547bb
15 changed files with 206 additions and 178 deletions

View File

@ -22,7 +22,7 @@ import dorkbox.util.messagebus.publication.PublisherExact;
import dorkbox.util.messagebus.publication.PublisherExactWithSuperTypes;
import dorkbox.util.messagebus.publication.PublisherExactWithSuperTypesAndVarity;
import dorkbox.util.messagebus.subscription.SubscriptionManager;
import dorkbox.util.messagebus.subscription.WriterDistruptor;
import dorkbox.util.messagebus.subscription.SubscriptionWriterDistruptor;
import dorkbox.util.messagebus.synchrony.AsyncDisruptor;
import dorkbox.util.messagebus.synchrony.Sync;
import dorkbox.util.messagebus.synchrony.Synchrony;
@ -31,7 +31,7 @@ import dorkbox.util.messagebus.synchrony.Synchrony;
* The base class for all message bus implementations with support for asynchronous message dispatch.
*
* See this post for insight on how it operates: http://psy-lob-saw.blogspot.com/2012/12/atomiclazyset-is-performance-win-for.html
* tldr; we use single-writer-principle + Atomic.lazySet
* tldr; we use single-writer-principle + lazySet/get
*
* @author dorkbox, llc
* Date: 2/2/15
@ -40,7 +40,7 @@ public
class MessageBus implements IMessageBus {
private final ErrorHandlingSupport errorHandler;
private final WriterDistruptor subscriptionWriter;
private final SubscriptionWriterDistruptor subscriptionWriter;
private final SubscriptionManager subscriptionManager;
@ -93,7 +93,7 @@ class MessageBus implements IMessageBus {
*/
this.subscriptionManager = new SubscriptionManager(numberOfThreads, errorHandler);
subscriptionWriter = new WriterDistruptor(errorHandler, subscriptionManager);
subscriptionWriter = new SubscriptionWriterDistruptor(errorHandler, subscriptionManager);
switch (publishMode) {
@ -133,7 +133,7 @@ class MessageBus implements IMessageBus {
return;
}
// subscriptionManager.subscribe(listener);
// single writer principle
subscriptionWriter.subscribe(listener);
}
@ -144,7 +144,7 @@ class MessageBus implements IMessageBus {
return;
}
// subscriptionManager.unsubscribe(listener);
// single writer principle
subscriptionWriter.unsubscribe(listener);
}

View File

@ -58,7 +58,6 @@ import java.util.Arrays;
* <p/>
* BECAUSE OF THIS, we always treat the two the same
* <p/>
* <p/>
*
* @author bennidi
* Date: 11/14/12
@ -67,14 +66,12 @@ import java.util.Arrays;
*/
public
class MessageHandler {
// publish all listeners defined by the given class (includes
// listeners defined in super classes)
// publish all listeners defined by the given class (includes listeners defined in super classes)
public static
//cache this?
MessageHandler[] get(final Class<?> target) {
MessageHandler[] get(final Class<?> messageClass) {
// publish all handlers (this will include all (inherited) methods directly annotated using @Handler)
final Method[] allMethods = ReflectionUtils.getMethods(target);
final Method[] allMethods = ReflectionUtils.getMethods(messageClass);
final int length = allMethods.length;
final ArrayList<MessageHandler> finalMethods = new ArrayList<MessageHandler>(length);
@ -94,7 +91,7 @@ class MessageHandler {
continue;
}
Method overriddenHandler = ReflectionUtils.getOverridingMethod(method, target);
Method overriddenHandler = ReflectionUtils.getOverridingMethod(method, messageClass);
if (overriddenHandler == null) {
overriddenHandler = method;
}

View File

@ -1,29 +0,0 @@
/*
* Copyright 2015 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.util.messagebus.common.adapter;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public
class Java6Adapter implements MapAdapter {
@Override
public final
<K, V> ConcurrentMap<K, V> concurrentMap(final int size, final float loadFactor, final int stripeSize) {
return new ConcurrentHashMap<K, V>(size, loadFactor, stripeSize);
}
}

View File

@ -1,28 +0,0 @@
/*
* Copyright 2015 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.util.messagebus.common.adapter;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public
class Java8Adapter implements MapAdapter {
@Override
public final
<K, V> ConcurrentMap<K, V> concurrentMap(final int size, final float loadFactor, final int stripeSize) {
return new ConcurrentHashMap<K, V>(size, loadFactor, stripeSize);
}
}

View File

@ -1,42 +0,0 @@
/*
* Copyright 2015 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.util.messagebus.common.adapter;
import java.util.concurrent.ConcurrentMap;
public
class JavaVersionAdapter {
private static final MapAdapter get;
static {
MapAdapter adapter;
try {
Class.forName("java.util.concurrent.locks.StampedLock");
adapter = new Java8Adapter();
} catch (Exception e) {
adapter = new Java6Adapter();
}
get = adapter;
}
public static
<K, V> ConcurrentMap<K, V> concurrentMap(final int size, final float loadFactor, final int stripeSize) {
return get.concurrentMap(size, loadFactor, stripeSize);
}
}

View File

@ -1,23 +0,0 @@
/*
* Copyright 2015 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.util.messagebus.common.adapter;
import java.util.concurrent.ConcurrentMap;
public
interface MapAdapter {
<K, V> ConcurrentMap<K, V> concurrentMap(final int size, final float loadFactor, final int stripeSize);
}

View File

@ -35,7 +35,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.util.messagebus.common;
package dorkbox.util.messagebus.error;
import java.util.Arrays;

View File

@ -15,7 +15,7 @@
*/
package dorkbox.util.messagebus.publication;
import dorkbox.util.messagebus.common.DeadMessage;
import dorkbox.util.messagebus.error.DeadMessage;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.error.PublicationError;
import dorkbox.util.messagebus.subscription.Subscription;

View File

@ -15,7 +15,7 @@
*/
package dorkbox.util.messagebus.publication;
import dorkbox.util.messagebus.common.DeadMessage;
import dorkbox.util.messagebus.error.DeadMessage;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.error.PublicationError;
import dorkbox.util.messagebus.subscription.Subscription;
@ -45,7 +45,7 @@ class PublisherExactWithSuperTypes implements Publisher {
// Run subscriptions
final Subscription[] subscriptions = subManager.getExactAsArray(message1Class); // can return null
final Subscription[] subscriptions = subManager.getExact(message1Class); // can return null
if (subscriptions != null) {
hasSubs = true;
synchrony.publish(subscriptions, message1);
@ -62,7 +62,7 @@ class PublisherExactWithSuperTypes implements Publisher {
// Run dead message subscriptions
if (!hasSubs) {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subManager.getExactAsArray(DeadMessage.class); // can return null
final Subscription[] deadSubscriptions = subManager.getExact(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
synchrony.publish(deadSubscriptions, new DeadMessage(message1));
}

View File

@ -15,7 +15,7 @@
*/
package dorkbox.util.messagebus.publication;
import dorkbox.util.messagebus.common.DeadMessage;
import dorkbox.util.messagebus.error.DeadMessage;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.error.PublicationError;
import dorkbox.util.messagebus.subscription.Subscription;

View File

@ -63,7 +63,7 @@ class SubscriptionManager {
private volatile IdentityMap<Class<?>, Subscription[]> subsPerMessageSingle;
// keeps track of all subscriptions of the super classes of a message type.
private volatile IdentityMap<Class<?>, Subscription[]> subsPerMessageSuperSingle;
private volatile IdentityMap<Class<?>, Subscription[]> subsPerSuperMessageSingle;
@ -88,10 +88,10 @@ class SubscriptionManager {
IdentityMap.class,
"subsPerMessageSingle");
private final AtomicReferenceFieldUpdater<SubscriptionManager, IdentityMap> subsPerMessageSuperSingleRef =
private final AtomicReferenceFieldUpdater<SubscriptionManager, IdentityMap> subsPerSuperMessageSingleREF =
AtomicReferenceFieldUpdater.newUpdater(SubscriptionManager.class,
IdentityMap.class,
"subsPerMessageSuperSingle");
"subsPerSuperMessageSingle");
//NOTE for multiArg, can use the memory address concatenated with other ones and then just put it in the 'single" map (convert single to
// use this too). it would likely have to be longs no idea what to do for arrays?? (arrays should verify all the elements are the
@ -107,7 +107,7 @@ class SubscriptionManager {
nonListeners = new IdentityMap<Class<?>, Boolean>(16, LOAD_FACTOR);
subsPerListener = new IdentityMap<>(32, LOAD_FACTOR);
subsPerMessageSingle = new IdentityMap<Class<?>, Subscription[]>(32, LOAD_FACTOR);
subsPerMessageSuperSingle = new IdentityMap<Class<?>, Subscription[]>(32, LOAD_FACTOR);
subsPerSuperMessageSingle = new IdentityMap<Class<?>, Subscription[]>(32, LOAD_FACTOR);
@ -128,7 +128,7 @@ class SubscriptionManager {
this.nonListeners.clear();
this.subsPerMessageSingle.clear();
this.subsPerMessageSuperSingle.clear();
this.subsPerSuperMessageSingle.clear();
this.subscriptionsPerMessageMulti.clear();
this.subsPerListener.clear();
@ -195,8 +195,8 @@ class SubscriptionManager {
// create the subscriptions
subscriptions = new Subscription[handlersSize];
final IdentityMap<Class<?>, Subscription[]> subsPerMessageSingle = subsPerMessageSingleREF.get(this);
// final IdentityMap<Class<?>, Subscription[]> subsPerMessageSingle = this.subsPerMessageSingle;
// access a snapshot of the subscriptions (single-writer-principle)
final IdentityMap<Class<?>, Subscription[]> local = subsPerMessageSingleREF.get(this);
Subscription subscription;
@ -218,11 +218,10 @@ class SubscriptionManager {
messageHandlerTypes = messageHandler.getHandledMessages();
handlerType = messageHandlerTypes[0];
if (!subsPerMessageSingle.containsKey(handlerType)) {
subsPerMessageSingle.put(handlerType, SUBSCRIPTIONS); // this is copied to a larger array if necessary
if (!local.containsKey(handlerType)) {
local.put(handlerType, SUBSCRIPTIONS); // this is copied to a larger array if necessary
}
// create the subscription. This can be thrown away if the subscription succeeds in another thread
subscription = new Subscription(messageHandler);
subscriptions[i] = subscription;
@ -247,22 +246,21 @@ class SubscriptionManager {
// makes this subscription visible for publication
final Subscription[] currentSubs = subsPerMessageSingle.get(handlerType);
final Subscription[] currentSubs = local.get(handlerType);
final int currentLength = currentSubs.length;
// add the new subscription to the beginning of the array
final Subscription[] newSubs = new Subscription[currentLength + 1];
newSubs[0] = subscription;
System.arraycopy(currentSubs, 0, newSubs, 1, currentLength);
subsPerMessageSingle.put(handlerType, newSubs);
local.put(handlerType, newSubs);
// update the super types/varity types
registerSuperSubs(handlerType);
// update the super types
registerSuperSubs(handlerType, local);
}
subsPerMessageSingleREF.lazySet(this, subsPerMessageSingle);
// SUBS_SINGLE.set(this, subsPerMessageSingle);
// this.subsPerMessageSingle = subsPerMessageSingle;
// save this snapshot back to the original (single writer principle)
subsPerMessageSingleREF.lazySet(this, local);
}
else {
// subscriptions already exist and must only be updated
@ -298,11 +296,11 @@ class SubscriptionManager {
}
private
void registerSuperSubs(final Class<?> clazz) {
void registerSuperSubs(final Class<?> clazz, final IdentityMap<Class<?>, Subscription[]> subsPerMessageSingle) {
final Class<?>[] superClasses = this.classUtils.getSuperClasses(clazz); // never returns null, cached response
final IdentityMap<Class<?>, Subscription[]> local = subsPerMessageSuperSingleRef.get(this);
// final IdentityMap<Class<?>, Subscription[]> local = this.subsPerMessageSuperSingle;
// access a snapshot of the subscriptions (single-writer-principle)
final IdentityMap<Class<?>, Subscription[]> local = subsPerSuperMessageSingleREF.get(this);
// types was not empty, so collect subscriptions for each type and collate them
// save the subscriptions
@ -318,7 +316,7 @@ class SubscriptionManager {
// walks through all of the subscriptions that might exist for super types, and if applicable, save them
for (int i = 0; i < length; i++) {
superClass = superClasses[i];
superSubs = getExactAsArray(superClass);
superSubs = subsPerMessageSingle.get(superClass);
if (superSubs != null) {
superSubLength = superSubs.length;
@ -338,9 +336,8 @@ class SubscriptionManager {
subsAsList.toArray(subs);
local.put(clazz, subs);
subsPerMessageSuperSingleRef.lazySet(this, local);
// subsPerMessageSuperSingle = local;
// save this snapshot back to the original (single writer principle)
subsPerSuperMessageSingleREF.lazySet(this, local);
}
}
@ -448,17 +445,18 @@ class SubscriptionManager {
}
}
// can return null
public
Subscription[] getExactAsArray(final Class<?> messageClass) {
Subscription[] getExact(final Class<?> messageClass) {
return (Subscription[]) subsPerMessageSingleREF.get(this).get(messageClass);
// return subsPerMessageSingle.get(messageClass);
}
// can return null
public
Subscription[] getSuperExactAsArray(final Class<?> messageClass) {
// whenever our subscriptions change, this map is cleared.
return (Subscription[]) subsPerMessageSuperSingleRef.get(this).get(messageClass);
return (Subscription[]) subsPerSuperMessageSingleREF.get(this).get(messageClass);
// return this.subsPerMessageSuperSingle.get(messageClass);
}
@ -473,11 +471,6 @@ class SubscriptionManager {
return subscriptionsPerMessageMulti.get(messageClass1, messageClass2, messageClass3);
}
// can return null
public
Subscription[] getExact(final Class<?> messageClass) {
return getExactAsArray(messageClass);
}
// can return null
public
@ -513,7 +506,7 @@ class SubscriptionManager {
// can return null
public
Subscription[] getExactAndSuper(final Class<?> messageClass) {
Subscription[] collection = getExactAsArray(messageClass); // can return null
Subscription[] collection = getExact(messageClass); // can return null
// now publish superClasses
final Subscription[] superSubscriptions = getSuperExactAsArray(messageClass); // can return null

View File

@ -24,7 +24,7 @@ import java.util.concurrent.locks.LockSupport;
* subscriptions. Even with concurrent hashMaps, there is still locks happening during contention.
*/
public
class WriterDistruptor {
class SubscriptionWriterABQ {
private WorkProcessor workProcessor;
private SubscriptionHandler handler;
@ -32,7 +32,7 @@ class WriterDistruptor {
private Sequence workSequence;
public
WriterDistruptor(final ErrorHandlingSupport errorHandler, final SubscriptionManager subscriptionManager) {
SubscriptionWriterABQ(final ErrorHandlingSupport errorHandler, final SubscriptionManager subscriptionManager) {
// Now we setup the disruptor and work handlers
ExecutorService executor = new ThreadPoolExecutor(1, 1,

View File

@ -0,0 +1,159 @@
package dorkbox.util.messagebus.subscription;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.LiteBlockingWaitStrategy;
import com.lmax.disruptor.PhasedBackoffWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.Sequencer;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.WorkProcessor;
import dorkbox.util.messagebus.common.thread.NamedThreadFactory;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.publication.disruptor.PublicationExceptionHandler;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
/**
* Objective of this class is to conform to the "single writer principle", in order to maintain CLEAN AND SIMPLE concurrency for the
* subscriptions. Even with concurrent hashMaps, there is still locks happening during contention.
*/
public
class SubscriptionWriterDistruptor {
private WorkProcessor workProcessor;
private SubscriptionHandler handler;
private RingBuffer<SubscriptionHolder> ringBuffer;
private Sequence workSequence;
public
SubscriptionWriterDistruptor(final ErrorHandlingSupport errorHandler, final SubscriptionManager subscriptionManager) {
// Now we setup the disruptor and work handlers
ExecutorService executor = new ThreadPoolExecutor(1, 1,
0, TimeUnit.NANOSECONDS, // handlers are never idle, so this doesn't matter
new java.util.concurrent.LinkedTransferQueue<Runnable>(),
new NamedThreadFactory("MessageBus-Subscriber"));
final PublicationExceptionHandler<SubscriptionHolder> exceptionHandler = new PublicationExceptionHandler<SubscriptionHolder>(errorHandler);
EventFactory<SubscriptionHolder> factory = new SubscriptionFactory();
// setup the work handlers
handler = new SubscriptionHandler(subscriptionManager);
// final int BUFFER_SIZE = ringBufferSize * 64;
final int BUFFER_SIZE = 1024 * 64;
// final int BUFFER_SIZE = 1024;
// final int BUFFER_SIZE = 32;
// final int BUFFER_SIZE = 16;
// final int BUFFER_SIZE = 8;
// final int BUFFER_SIZE = 4;
WaitStrategy consumerWaitStrategy;
// consumerWaitStrategy = new LiteBlockingWaitStrategy(); // good one
// consumerWaitStrategy = new BlockingWaitStrategy();
// consumerWaitStrategy = new YieldingWaitStrategy();
// consumerWaitStrategy = new BusySpinWaitStrategy();
// consumerWaitStrategy = new SleepingWaitStrategy();
// consumerWaitStrategy = new PhasedBackoffWaitStrategy(20, 50, TimeUnit.MILLISECONDS, new SleepingWaitStrategy(0));
// consumerWaitStrategy = new PhasedBackoffWaitStrategy(20, 50, TimeUnit.MILLISECONDS, new BlockingWaitStrategy());
consumerWaitStrategy = new PhasedBackoffWaitStrategy(2, 5, TimeUnit.MILLISECONDS, new LiteBlockingWaitStrategy());
ringBuffer = RingBuffer.createMultiProducer(factory, BUFFER_SIZE, consumerWaitStrategy);
SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
// setup the WorkProcessors (these consume from the ring buffer -- one at a time) and tell the "handler" to execute the item
workSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
workProcessor = new WorkProcessor<SubscriptionHolder>(ringBuffer, sequenceBarrier, handler, exceptionHandler, workSequence);
// setup the WorkProcessor sequences (control what is consumed from the ring buffer)
final Sequence[] sequences = getSequences();
ringBuffer.addGatingSequences(sequences);
// configure the start position for the WorkProcessors, and start them
final long cursor = ringBuffer.getCursor();
workSequence.set(cursor);
workProcessor.getSequence()
.set(cursor);
executor.execute(workProcessor);
}
/**
* @param listener is never null
*/
public
void subscribe(final Object listener) {
long seq = ringBuffer.next();
SubscriptionHolder job = ringBuffer.get(seq);
job.doSubscribe = true;
job.listener = listener;
ringBuffer.publish(seq);
}
/**
* @param listener is never null
*/
public
void unsubscribe(final Object listener) {
long seq = ringBuffer.next();
SubscriptionHolder job = ringBuffer.get(seq);
job.doSubscribe = false;
job.listener = listener;
ringBuffer.publish(seq);
}
// gets the sequences used for processing work
private
Sequence[] getSequences() {
final Sequence[] sequences = new Sequence[2];
sequences[0] = workProcessor.getSequence();
sequences[1] = workSequence; // always add the work sequence
return sequences;
}
public
void start() {
}
public
void shutdown() {
workProcessor.halt();
while (!handler.isShutdown()) {
LockSupport.parkNanos(100L); // wait 100ms for handlers to quit
}
}
public
boolean hasPendingMessages() {
// from workerPool.drainAndHalt()
Sequence[] workerSequences = getSequences();
final long cursor = ringBuffer.getCursor();
for (Sequence s : workerSequences) {
if (cursor > s.get()) {
return true;
}
}
return false;
}
}

View File

@ -72,7 +72,7 @@ class VarArgUtils {
// this gets (and caches) our array type. This is never cleared.
final Class<?> arrayVersion = this.superClassUtils.getArrayClass(messageClass);
final Subscription[] subs = subManager.getExactAsArray(arrayVersion);
final Subscription[] subs = subManager.getExact(arrayVersion);
if (subs != null) {
final int length = subs.length;
final ArrayList<Subscription> varArgSubsAsList = new ArrayList<Subscription>(length);
@ -134,7 +134,7 @@ class VarArgUtils {
for (int i = 0; i < typesLength; i++) {
type = types[i];
subs = subManager.getExactAsArray(type);
subs = subManager.getExact(type);
if (subs != null) {
length = subs.length;

View File

@ -24,6 +24,7 @@ package dorkbox.util.messagebus;
import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.common.*;
import dorkbox.util.messagebus.error.DeadMessage;
import dorkbox.util.messagebus.listeners.IMessageListener;
import dorkbox.util.messagebus.listeners.MessagesListener;
import dorkbox.util.messagebus.listeners.ObjectListener;