Added JCTools MPMC queue, added more benchmark tests, WIP simpleQ

This commit is contained in:
nathan 2015-04-09 23:08:12 +02:00
parent 9acffac11b
commit f6bf52c57f
38 changed files with 3644 additions and 308 deletions

View File

@ -8,7 +8,10 @@ import java.util.concurrent.TimeUnit;
import com.lmax.disruptor.MessageHolder;
import dorkbox.util.messagebus.common.DeadMessage;
import dorkbox.util.messagebus.common.ISetEntry;
import dorkbox.util.messagebus.common.NamedThreadFactory;
import dorkbox.util.messagebus.common.StrongConcurrentSetV8;
import dorkbox.util.messagebus.common.simpleq.HandlerFactory;
import dorkbox.util.messagebus.common.simpleq.SimpleQueue;
import dorkbox.util.messagebus.error.IPublicationErrorHandler;
import dorkbox.util.messagebus.error.PublicationError;
@ -42,7 +45,7 @@ public class MultiMBassador implements IMessageBus {
// private final TransferQueue<Runnable> dispatchQueue = new LinkedTransferQueue<Runnable>();
// private final DisruptorQueue dispatchQueue;
private final SimpleQueue dispatchQueue;
private final SimpleQueue<MessageHolder> dispatchQueue;
private final SubscriptionManager subscriptionManager;
@ -62,8 +65,7 @@ public class MultiMBassador implements IMessageBus {
* By default, will permit subTypes and VarArg matching, and will use all CPUs available for dispatching async messages
*/
public MultiMBassador() {
this(Runtime.getRuntime().availableProcessors()/2);
// this(2);
this(Runtime.getRuntime().availableProcessors());
}
/**
@ -87,7 +89,15 @@ public class MultiMBassador implements IMessageBus {
this.numberOfThreads = numberOfThreads;
// this.dispatchQueue = new DisruptorQueue(this, numberOfThreads);
this.dispatchQueue = new SimpleQueue(numberOfThreads);
HandlerFactory<MessageHolder> factory = new HandlerFactory<MessageHolder>() {
@Override
public MessageHolder newInstance() {
return new MessageHolder();
}
};
this.dispatchQueue = new SimpleQueue<MessageHolder>(numberOfThreads, factory);
this.subscriptionManager = new SubscriptionManager(numberOfThreads);
this.threads = new ArrayDeque<Thread>(numberOfThreads);
@ -96,15 +106,15 @@ public class MultiMBassador implements IMessageBus {
for (int i = 0; i < numberOfThreads; i++) {
// each thread will run forever and process incoming message publication requests
Runnable runnable = new Runnable() {
@SuppressWarnings("null")
@Override
public void run() {
// TransferQueue<Runnable> IN_QUEUE = MultiMBassador.this.dispatchQueue;
SimpleQueue IN_QUEUE = MultiMBassador.this.dispatchQueue;
// Runnable event = null;
MessageHolder event = null;
int spins;
MessageHolder messageHolder = new MessageHolder();
while (true) {
try {
// spins = maxSpins;
@ -115,13 +125,13 @@ public class MultiMBassador implements IMessageBus {
// --spins;
// LockSupport.parkNanos(1L);
// } else {
event = IN_QUEUE.take();
IN_QUEUE.take(messageHolder);
// break;
// }
// }
Object message1 = event.message1;
IN_QUEUE.release(event);
Object message1 = messageHolder.message1;
// IN_QUEUE.release(event);
// event.run();
publish(message1);
@ -183,22 +193,33 @@ public class MultiMBassador implements IMessageBus {
SubscriptionManager manager = this.subscriptionManager;
Class<?> messageClass = message.getClass();
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass);
StrongConcurrentSetV8<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass);
boolean subsPublished = false;
ISetEntry<Subscription> current;
Subscription sub;
// Run subscriptions
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription sub : subscriptions) {
current = subscriptions.head;
while (current != null) {
sub = current.getValue();
current = current.next();
// this catches all exception types
subsPublished |= sub.publishToSubscription(this, message);
}
}
if (!this.forceExactMatches) {
Collection<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass);
StrongConcurrentSetV8<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass);
// now get superClasses
if (superSubscriptions != null && !superSubscriptions.isEmpty()) {
for (Subscription sub : superSubscriptions) {
current = superSubscriptions.head;
while (current != null) {
sub = current.getValue();
current = current.next();
// this catches all exception types
subsPublished |= sub.publishToSubscription(this, message);
}
@ -209,18 +230,22 @@ public class MultiMBassador implements IMessageBus {
if (!messageClass.isArray()) {
Object[] asArray = null;
Collection<Subscription> varargSubscriptions = manager.getVarArgSubscriptions(messageClass);
StrongConcurrentSetV8<Subscription> varargSubscriptions = manager.getVarArgSubscriptions(messageClass);
if (varargSubscriptions != null && !varargSubscriptions.isEmpty()) {
asArray = (Object[]) Array.newInstance(messageClass, 1);
asArray[0] = message;
for (Subscription sub : varargSubscriptions) {
current = varargSubscriptions.head;
while (current != null) {
sub = current.getValue();
current = current.next();
// this catches all exception types
subsPublished |= sub.publishToSubscription(this, asArray);
}
}
Collection<Subscription> varargSuperSubscriptions = manager.getVarArgSuperSubscriptions(messageClass);
StrongConcurrentSetV8<Subscription> varargSuperSubscriptions = manager.getVarArgSuperSubscriptions(messageClass);
// now get array based superClasses (but only if those ALSO accept vararg)
if (varargSuperSubscriptions != null && !varargSuperSubscriptions.isEmpty()) {
if (asArray == null) {
@ -228,7 +253,11 @@ public class MultiMBassador implements IMessageBus {
asArray[0] = message;
}
for (Subscription sub : varargSuperSubscriptions) {
current = varargSuperSubscriptions.head;
while (current != null) {
sub = current.getValue();
current = current.next();
// this catches all exception types
subsPublished |= sub.publishToSubscription(this, asArray);
}
@ -238,10 +267,15 @@ public class MultiMBassador implements IMessageBus {
if (!subsPublished) {
// Dead Event must EXACTLY MATCH (no subclasses)
Collection<Subscription> deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
StrongConcurrentSetV8<Subscription> deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) {
DeadMessage deadMessage = new DeadMessage(message);
for (Subscription sub : deadSubscriptions) {
current = deadSubscriptions.head;
while (current != null) {
sub = current.getValue();
current = current.next();
// this catches all exception types
sub.publishToSubscription(this, deadMessage);
}

View File

@ -2,15 +2,14 @@ package dorkbox.util.messagebus;
import java.lang.reflect.Array;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import dorkbox.util.messagebus.common.ConcurrentHashMapV8;
import dorkbox.util.messagebus.common.HashMapTree;
import dorkbox.util.messagebus.common.ISetEntry;
import dorkbox.util.messagebus.common.ReflectionUtils;
import dorkbox.util.messagebus.common.StrongConcurrentSet;
import dorkbox.util.messagebus.common.StrongConcurrentSetV8;
import dorkbox.util.messagebus.common.SubscriptionHolder;
import dorkbox.util.messagebus.common.SuperClassIterator;
@ -41,7 +40,7 @@ public class SubscriptionManager {
private static final float LOAD_FACTOR = 0.8F;
// this keeps us from having to constantly recheck our cache for subscriptions
private static final Collection<Subscription> EMPTY_SUBS = Collections.emptyList();
private static final StrongConcurrentSetV8<Subscription> EMPTY_SUBS = new StrongConcurrentSetV8<Subscription>(0, LOAD_FACTOR, 1);
// the metadata reader that is used to inspect objects passed to the subscribe method
private static final MetadataReader metadataReader = new MetadataReader();
@ -52,26 +51,26 @@ public class SubscriptionManager {
// all subscriptions per message type. We perpetually KEEP the types, as this lowers the amount of locking required
// this is the primary list for dispatching a specific message
// write access is synchronized and happens only when a listener of a specific class is registered the first time
private final ConcurrentMap<Class<?>, Collection<Subscription>> subscriptionsPerMessageSingle;
private final ConcurrentMap<Class<?>, StrongConcurrentSetV8<Subscription>> subscriptionsPerMessageSingle;
private final HashMapTree<Class<?>, Collection<Subscription>> subscriptionsPerMessageMulti;
// all subscriptions per messageHandler type
// this map provides fast access for subscribing and unsubscribing
// write access is synchronized and happens very infrequently
// once a collection of subscriptions is stored it does not change
private final ConcurrentMap<Class<?>, Collection<Subscription>> subscriptionsPerListener;
private final ConcurrentMap<Class<?>, StrongConcurrentSetV8<Subscription>> subscriptionsPerListener;
private final Map<Class<?>, Class<?>> arrayVersionCache;
private final Map<Class<?>, Collection<Class<?>>> superClassesCache;
private final Map<Class<?>, StrongConcurrentSetV8<Class<?>>> superClassesCache;
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but REALLY improves performance on handlers
// it's faster to create a new one for SUB/UNSUB than it is to clear() on the original one
private final Map<Class<?>, Collection<Subscription>> superClassSubscriptions;
private final Map<Class<?>, StrongConcurrentSetV8<Subscription>> superClassSubscriptions;
private final HashMapTree<Class<?>, Collection<Subscription>> superClassSubscriptionsMulti;
private final Map<Class<?>, Collection<Subscription>> varArgSubscriptions;
private final Map<Class<?>, Collection<Subscription>> varArgSuperClassSubscriptions;
private final Map<Class<?>, StrongConcurrentSetV8<Subscription>> varArgSubscriptions;
private final Map<Class<?>, StrongConcurrentSetV8<Subscription>> varArgSuperClassSubscriptions;
// stripe size of maps for concurrency
private final int STRIPE_SIZE;
@ -87,27 +86,27 @@ public class SubscriptionManager {
{
this.nonListeners = new ConcurrentHashMapV8<Class<?>, Boolean>(4, SubscriptionManager.LOAD_FACTOR);
this.subscriptionsPerMessageSingle = new ConcurrentHashMapV8<Class<?>, Collection<Subscription>>(64, SubscriptionManager.LOAD_FACTOR, 1);
this.subscriptionsPerMessageSingle = new ConcurrentHashMapV8<Class<?>, StrongConcurrentSetV8<Subscription>>(64, SubscriptionManager.LOAD_FACTOR, 1);
this.subscriptionsPerMessageMulti = new HashMapTree<Class<?>, Collection<Subscription>>(4, SubscriptionManager.LOAD_FACTOR);
// only used during SUB/UNSUB
this.subscriptionsPerListener = new ConcurrentHashMapV8<Class<?>, Collection<Subscription>>(64, SubscriptionManager.LOAD_FACTOR, 1);
this.subscriptionsPerListener = new ConcurrentHashMapV8<Class<?>, StrongConcurrentSetV8<Subscription>>(64, SubscriptionManager.LOAD_FACTOR, 1);
}
// modified by N threads
{
this.arrayVersionCache = new ConcurrentHashMapV8<Class<?>, Class<?>>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
this.superClassesCache = new ConcurrentHashMapV8<Class<?>, Collection<Class<?>>>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
this.superClassesCache = new ConcurrentHashMapV8<Class<?>, StrongConcurrentSetV8<Class<?>>>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but improves performance of handlers
this.superClassSubscriptions = new ConcurrentHashMapV8<Class<?>, Collection<Subscription>>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
this.superClassSubscriptions = new ConcurrentHashMapV8<Class<?>, StrongConcurrentSetV8<Subscription>>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
this.superClassSubscriptionsMulti = new HashMapTree<Class<?>, Collection<Subscription>>(4, SubscriptionManager.LOAD_FACTOR);
// var arg subscriptions keep track of which subscriptions can handle varArgs. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but improves performance of handlers
this.varArgSubscriptions = new ConcurrentHashMapV8<Class<?>, Collection<Subscription>>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
this.varArgSuperClassSubscriptions = new ConcurrentHashMapV8<Class<?>, Collection<Subscription>>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
this.varArgSubscriptions = new ConcurrentHashMapV8<Class<?>, StrongConcurrentSetV8<Subscription>>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
this.varArgSuperClassSubscriptions = new ConcurrentHashMapV8<Class<?>, StrongConcurrentSetV8<Subscription>>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
}
this.subHolder = new SubscriptionHolder(SubscriptionManager.LOAD_FACTOR, 1);
@ -150,11 +149,11 @@ public class SubscriptionManager {
// no point in locking everything. We lock on the class object being subscribed, since that is as coarse as we can go.
// the listenerClass is GUARANTEED to be unique and the same object, per classloader. We do NOT LOCK for visibility, but for concurrency
synchronized(listenerClass) {
ConcurrentMap<Class<?>, Collection<Subscription>> subsPerListener2 = this.subscriptionsPerListener;
Collection<Subscription> subsPerListener = subsPerListener2.get(listenerClass);
ConcurrentMap<Class<?>, StrongConcurrentSetV8<Subscription>> subsPerListener2 = this.subscriptionsPerListener;
StrongConcurrentSetV8<Subscription> subsPerListener = subsPerListener2.get(listenerClass);
if (subsPerListener == null) {
// a listener is subscribed for the first time
Collection<MessageHandler> messageHandlers = SubscriptionManager.metadataReader.getMessageListener(listenerClass).getHandlers();
StrongConcurrentSetV8<MessageHandler> messageHandlers = SubscriptionManager.metadataReader.getMessageListener(listenerClass).getHandlers();
int handlersSize = messageHandlers.size();
if (handlersSize == 0) {
@ -163,9 +162,14 @@ public class SubscriptionManager {
return;
} else {
subsPerListener = new StrongConcurrentSetV8<Subscription>(16, SubscriptionManager.LOAD_FACTOR, 1);
ConcurrentMap<Class<?>, Collection<Subscription>> subsPerMessageSingle = this.subscriptionsPerMessageSingle;
ConcurrentMap<Class<?>, StrongConcurrentSetV8<Subscription>> subsPerMessageSingle = this.subscriptionsPerMessageSingle;
ISetEntry<MessageHandler> current = messageHandlers.head;
MessageHandler messageHandler;
while (current != null) {
messageHandler = current.getValue();
current = current.next();
for (MessageHandler messageHandler : messageHandlers) {
Collection<Subscription> subsPerType = null;
// now add this subscription to each of the handled types
@ -173,7 +177,7 @@ public class SubscriptionManager {
int size = types.length;
switch (size) {
case 1: {
Collection<Subscription> putIfAbsent = subsPerMessageSingle.putIfAbsent(types[0], this.subHolderConcurrent.get());
StrongConcurrentSetV8<Subscription> putIfAbsent = subsPerMessageSingle.putIfAbsent(types[0], this.subHolderConcurrent.get());
if (putIfAbsent != null) {
subsPerType = putIfAbsent;
} else {
@ -235,7 +239,12 @@ public class SubscriptionManager {
}
} else {
// subscriptions already exist and must only be updated
for (Subscription subscription : subsPerListener) {
ISetEntry<Subscription> current = subsPerListener.head;
Subscription subscription;
while (current != null) {
subscription = current.getValue();
current = current.next();
subscription.subscribe(listener);
}
}
@ -258,9 +267,14 @@ public class SubscriptionManager {
this.varArgSubscriptions.clear();
this.varArgSuperClassSubscriptions.clear();
Collection<Subscription> subscriptions = this.subscriptionsPerListener.get(listenerClass);
StrongConcurrentSetV8<Subscription> subscriptions = this.subscriptionsPerListener.get(listenerClass);
if (subscriptions != null) {
for (Subscription subscription : subscriptions) {
ISetEntry<Subscription> current = subscriptions.head;
Subscription subscription;
while (current != null) {
subscription = current.getValue();
current = current.next();
subscription.unsubscribe(listener);
}
}
@ -271,19 +285,24 @@ public class SubscriptionManager {
* never returns null
* never reset, since it never needs to be reset (as the class hierarchy doesn't change at runtime)
*/
private Collection<Class<?>> getSuperClass(Class<?> clazz) {
private StrongConcurrentSetV8<Class<?>> getSuperClass(Class<?> clazz) {
// this is never reset, since it never needs to be.
Map<Class<?>, Collection<Class<?>>> local = this.superClassesCache;
Map<Class<?>, StrongConcurrentSetV8<Class<?>>> local = this.superClassesCache;
Collection<Class<?>> superTypes = local.get(clazz);
StrongConcurrentSetV8<Class<?>> superTypes = local.get(clazz);
if (superTypes == null) {
boolean isArray = clazz.isArray();
// it doesn't matter if concurrent access stomps on values, since they are always the same.
superTypes = ReflectionUtils.getSuperTypes(clazz);
StrongConcurrentSet<Class<?>> set = new StrongConcurrentSetV8<Class<?>>(superTypes.size() + 1, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
StrongConcurrentSetV8<Class<?>> set = new StrongConcurrentSetV8<Class<?>>(superTypes.size() + 1, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
ISetEntry<Class<?>> current = superTypes.head;
Class<?> c;
while (current != null) {
c = current.getValue();
current = current.next();
for (Class<?> c : superTypes) {
if (isArray) {
c = getArrayClass(c);
}
@ -316,7 +335,7 @@ public class SubscriptionManager {
// CAN RETURN NULL
public final Collection<Subscription> getSubscriptionsByMessageType(Class<?> messageType) {
public final StrongConcurrentSetV8<Subscription> getSubscriptionsByMessageType(Class<?> messageType) {
return this.subscriptionsPerMessageSingle.get(messageType);
}
@ -339,22 +358,30 @@ public class SubscriptionManager {
// CAN RETURN NULL
// check to see if the messageType can convert/publish to the "array" version, without the hit to JNI
// and then, returns the array'd version subscriptions
public Collection<Subscription> getVarArgSubscriptions(Class<?> varArgType) {
Map<Class<?>, Collection<Subscription>> local = this.varArgSubscriptions;
public StrongConcurrentSetV8<Subscription> getVarArgSubscriptions(Class<?> varArgType) {
Map<Class<?>, StrongConcurrentSetV8<Subscription>> local = this.varArgSubscriptions;
// whenever our subscriptions change, this map is cleared.
Collection<Subscription> subsPerType = local.get(varArgType);
StrongConcurrentSetV8<Subscription> subsPerType = local.get(varArgType);
if (subsPerType == null) {
// this caches our array type. This is never cleared.
Class<?> arrayVersion = getArrayClass(varArgType);
Map<Class<?>, Collection<Subscription>> local2 = this.subscriptionsPerMessageSingle;
Map<Class<?>, StrongConcurrentSetV8<Subscription>> local2 = this.subscriptionsPerMessageSingle;
subsPerType = new StrongConcurrentSetV8<Subscription>(local2.size(), SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
Collection<Subscription> subs = local2.get(arrayVersion);
ISetEntry<Subscription> current;
Subscription sub;
StrongConcurrentSetV8<Subscription> subs = local2.get(arrayVersion);
if (subs != null && !subs.isEmpty()) {
for (Subscription sub : subs) {
current = subs.head;
while (current != null) {
sub = current.getValue();
current = current.next();
if (sub.acceptsVarArgs()) {
subsPerType.add(sub);
}
@ -372,32 +399,43 @@ public class SubscriptionManager {
// CAN RETURN NULL
// check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI
// and then, returns the array'd version subscriptions
public Collection<Subscription> getVarArgSuperSubscriptions(Class<?> varArgType) {
Map<Class<?>, Collection<Subscription>> local = this.varArgSuperClassSubscriptions;
public StrongConcurrentSetV8<Subscription> getVarArgSuperSubscriptions(Class<?> varArgType) {
Map<Class<?>, StrongConcurrentSetV8<Subscription>> local = this.varArgSuperClassSubscriptions;
// whenever our subscriptions change, this map is cleared.
Collection<Subscription> subsPerType = local.get(varArgType);
StrongConcurrentSetV8<Subscription> subsPerType = local.get(varArgType);
if (subsPerType == null) {
// this caches our array type. This is never cleared.
Class<?> arrayVersion = getArrayClass(varArgType);
Collection<Class<?>> types = getSuperClass(arrayVersion);
StrongConcurrentSetV8<Class<?>> types = getSuperClass(arrayVersion);
if (types.isEmpty()) {
local.put(varArgType, EMPTY_SUBS);
return null;
}
Map<Class<?>, Collection<Subscription>> local2 = this.subscriptionsPerMessageSingle;
Map<Class<?>, StrongConcurrentSetV8<Subscription>> local2 = this.subscriptionsPerMessageSingle;
subsPerType = new StrongConcurrentSetV8<Subscription>(local2.size(), SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
ISetEntry<Subscription> current;
Subscription sub;
Iterator<Class<?>> iterator = types.iterator();
while (iterator.hasNext()) {
Class<?> superClass = iterator.next();
ISetEntry<Class<?>> current1;
Class<?> superClass;
current1 = types.head;
Collection<Subscription> subs = local2.get(superClass);
while (current1 != null) {
superClass = current1.getValue();
current1 = current1.next();
StrongConcurrentSetV8<Subscription> subs = local2.get(superClass);
if (subs != null && !subs.isEmpty()) {
for (Subscription sub : subs) {
current = subs.head;
while (current != null) {
sub = current.getValue();
current = current.next();
if (sub.acceptsSubtypes() && sub.acceptsVarArgs()) {
subsPerType.add(sub);
}
@ -415,30 +453,42 @@ public class SubscriptionManager {
// ALSO checks to see if the superClass accepts subtypes.
public final Collection<Subscription> getSuperSubscriptions(Class<?> superType) {
Map<Class<?>, Collection<Subscription>> local = this.superClassSubscriptions;
public final StrongConcurrentSetV8<Subscription> getSuperSubscriptions(Class<?> superType) {
Map<Class<?>, StrongConcurrentSetV8<Subscription>> local = this.superClassSubscriptions;
// whenever our subscriptions change, this map is cleared.
Collection<Subscription> subsPerType = local.get(superType);
StrongConcurrentSetV8<Subscription> subsPerType = local.get(superType);
if (subsPerType == null) {
// this caches our class hierarchy. This is never cleared.
Collection<Class<?>> types = getSuperClass(superType);
StrongConcurrentSetV8<Class<?>> types = getSuperClass(superType);
if (types.isEmpty()) {
local.put(superType, EMPTY_SUBS);
return null;
}
Map<Class<?>, Collection<Subscription>> local2 = this.subscriptionsPerMessageSingle;
Map<Class<?>, StrongConcurrentSetV8<Subscription>> local2 = this.subscriptionsPerMessageSingle;
subsPerType = new StrongConcurrentSetV8<Subscription>(types.size() + 1, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
Iterator<Class<?>> iterator = types.iterator();
while (iterator.hasNext()) {
Class<?> superClass = iterator.next();
ISetEntry<Subscription> current;
Subscription sub;
Collection<Subscription> subs = local2.get(superClass);
ISetEntry<Class<?>> current1;
Class<?> superClass;
current1 = types.head;
while (current1 != null) {
superClass = current1.getValue();
current1 = current1.next();
StrongConcurrentSetV8<Subscription> subs = local2.get(superClass);
if (subs != null && !subs.isEmpty()) {
for (Subscription sub : subs) {
current = subs.head;
while (current != null) {
sub = current.getValue();
current = current.next();
if (sub.acceptsSubtypes()) {
subsPerType.add(sub);
}

View File

@ -24,7 +24,7 @@ public abstract class AbstractConcurrentSet<T> implements Set<T> {
// Internal state
protected final transient ReentrantReadWriteUpdateLock lock = new ReentrantReadWriteUpdateLock();
private final transient Map<T, ISetEntry<T>> entries; // maintain a map of entries for O(log n) lookup
protected transient Entry<T> head; // reference to the first element
public transient Entry<T> head; // reference to the first element
protected AbstractConcurrentSet(Map<T, ISetEntry<T>> entries) {
this.entries = entries;

View File

@ -4,7 +4,6 @@ import java.lang.annotation.Annotation;
import java.lang.reflect.AnnotatedElement;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.Set;
import dorkbox.util.messagebus.annotations.Handler;
@ -17,11 +16,11 @@ import dorkbox.util.messagebus.annotations.Handler;
*/
public class ReflectionUtils {
public static Collection<Method> getMethods(Class<?> target) {
public static StrongConcurrentSetV8<Method> getMethods(Class<?> target) {
return getMethods(target, new StrongConcurrentSetV8<Method>(16, .8F, 1));
}
public static Collection<Method> getMethods(Class<?> target, Collection<Method> methods) {
public static StrongConcurrentSetV8<Method> getMethods(Class<?> target, StrongConcurrentSetV8<Method> methods) {
try {
for (Method method : target.getDeclaredMethods()) {
if (getAnnotation(method, Handler.class) != null) {
@ -66,8 +65,8 @@ public class ReflectionUtils {
* @param from The root class to start with
* @return A set of classes, each representing a super type of the root class
*/
public static Set<Class<?>> getSuperTypes(Class<?> from) {
Set<Class<?>> superclasses = new StrongConcurrentSetV8<Class<?>>(8, 0.8F, 1);
public static StrongConcurrentSetV8<Class<?>> getSuperTypes(Class<?> from) {
StrongConcurrentSetV8<Class<?>> superclasses = new StrongConcurrentSetV8<Class<?>>(8, 0.8F, 1);
collectInterfaces( from, superclasses );
@ -79,7 +78,7 @@ public class ReflectionUtils {
return superclasses;
}
public static void collectInterfaces( Class<?> from, Set<Class<?>> accumulator ) {
public static void collectInterfaces( Class<?> from, StrongConcurrentSetV8<Class<?>> accumulator ) {
for ( Class<?> intface : from.getInterfaces() ) {
accumulator.add( intface );
collectInterfaces( intface, accumulator );
@ -87,8 +86,14 @@ public class ReflectionUtils {
}
//
public static boolean containsOverridingMethod(final Collection<Method> allMethods, final Method methodToCheck) {
for (Method method : allMethods) {
public static boolean containsOverridingMethod(final StrongConcurrentSetV8<Method> allMethods, final Method methodToCheck) {
ISetEntry<Method> current = allMethods.head;
Method method;
while (current != null) {
method = current.getValue();
current = current.next();
if (isOverriddenBy(methodToCheck, method)) {
return true;
}

View File

@ -40,7 +40,7 @@ public class StrongConcurrentSet<T> extends AbstractConcurrentSet<T> {
if (this.current == null) {
return null;
}
else {
else {
T value = this.current.getValue();
this.current = this.current.next();
return value;

View File

@ -1,10 +1,8 @@
package dorkbox.util.messagebus.common;
import java.util.Collection;
import dorkbox.util.messagebus.subscription.Subscription;
public class SubscriptionHolder extends ThreadLocal<Collection<Subscription>> {
public class SubscriptionHolder extends ThreadLocal<StrongConcurrentSetV8<Subscription>> {
private final int stripeSize;
private final float loadFactor;
@ -17,7 +15,7 @@ public class SubscriptionHolder extends ThreadLocal<Collection<Subscription>> {
}
@Override
protected Collection<Subscription> initialValue() {
protected StrongConcurrentSetV8<Subscription> initialValue() {
return new StrongConcurrentSetV8<Subscription>(16, this.loadFactor, this.stripeSize);
}
}

View File

@ -1,51 +0,0 @@
package dorkbox.util.messagebus.common.simpleq;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import com.lmax.disruptor.MessageHolder;
/**
 * Message hand-off built on a pairwise {@link Exchanger}: a producer swaps a filled
 * MessageHolder with a consumer and receives the consumer's spare holder in return,
 * so holders are perpetually recycled between the two sides instead of allocated per message.
 *
 * NOTE(review): hasPendingMessages/tryTransfer/poll are stubs and do not reflect real
 * queue state — confirm before relying on them.
 */
public class ExchangerQueue {

    private final AtomicInteger availableThreads = new AtomicInteger();
    private final Exchanger<MessageHolder> exchanger = new Exchanger<MessageHolder>();

    // Per-thread spare holder; it is replaced with the peer's holder after every exchange.
    ThreadLocal<MessageHolder> holder = new ThreadLocal<MessageHolder>() {
        @Override
        protected MessageHolder initialValue() {
            return new MessageHolder();
        }
    };

    /**
     * @param numberOfThreads recorded as the number of threads available for exchanging
     */
    public ExchangerQueue(int numberOfThreads) {
        this.availableThreads.set(numberOfThreads);
    }

    /**
     * PRODUCER: publishes one message, blocking until a consumer arrives to take it.
     * The holder received back from the consumer becomes this thread's new spare.
     */
    public void transfer(Object message1) throws InterruptedException {
        MessageHolder outgoing = this.holder.get();
        outgoing.message1 = message1;
        MessageHolder swapped = this.exchanger.exchange(outgoing);
        this.holder.set(swapped);
    }

    /**
     * Always reports false; pending state is not tracked by this implementation.
     */
    public boolean hasPendingMessages() {
        return false;
    }

    /**
     * Unimplemented stub — intentionally a no-op.
     */
    public void tryTransfer(Runnable runnable, long timeout, TimeUnit unit) throws InterruptedException {
    }

    /**
     * Non-blocking poll is not supported; always yields null.
     */
    public MessageHolder poll() {
        return null;
    }

    /**
     * CONSUMER: swaps the supplied spare holder for a producer's filled one, blocking as needed.
     */
    public MessageHolder take(MessageHolder holder) throws InterruptedException {
        return this.exchanger.exchange(holder);
    }
}

View File

@ -0,0 +1,5 @@
package dorkbox.util.messagebus.common.simpleq;
/**
 * Factory used to pre-fill a queue with its initial elements, one instance per slot.
 *
 * @param <E> the type of element this factory creates
 */
public interface HandlerFactory<E> {
    /**
     * Creates a fresh element instance.
     *
     * @return a newly constructed instance; expected to be non-null by callers that
     *         wrap the result (e.g. queue pre-fill)
     */
    E newInstance();
}

View File

@ -0,0 +1,213 @@
package dorkbox.util.messagebus.common.simpleq;
import dorkbox.util.messagebus.common.simpleq.jctools.MpmcArrayQueueConsumerField;
public final class MpmcExchangerQueue<M> extends MpmcArrayQueueConsumerField<Node<M>> {
/** The number of CPUs */
private static final int NCPU = Runtime.getRuntime().availableProcessors();
/**
* The number of times to spin (doing nothing except polling a memory location) before giving up while waiting to eliminate an
* operation. Should be zero on uniprocessors. On multiprocessors, this value should be large enough so that two threads exchanging
* items as fast as possible block only when one of them is stalled (due to GC or preemption), but not much longer, to avoid wasting CPU
* resources. Seen differently, this value is a little over half the number of cycles of an average context switch time on most systems.
* The value here is approximately the average of those across a range of tested systems.
*/
private static final int SPINS = NCPU == 1 ? 0 : 500; // orig: 2000
long p40, p41, p42, p43, p44, p45, p46;
long p30, p31, p32, p33, p34, p35, p36, p37;
/**
 * Creates the exchanger queue and pre-fills every slot of the backing ring buffer with a
 * {@code Node} wrapping a fresh instance from {@code factory}, so elements are recycled
 * rather than allocated per message.
 *
 * NOTE(review): the original javadoc said "EliminationStack" — a copy/paste leftover.
 *
 * @param factory invoked once per slot ({@code size} times) to build the initial elements
 * @param size requested capacity of the ring buffer (handled by the superclass constructor)
 * @throws RuntimeException if any slot's sequence is not in its initial state — should be
 *         impossible for a freshly constructed buffer
 */
public MpmcExchangerQueue(final HandlerFactory<M> factory, final int size) {
super(size);
//////////////
// pre-fill our data structures
//////////////
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
final long[] sBuffer = this.sequenceBuffer;
long currentProducerIndex;
long pSeqOffset;
// walk every slot of the freshly created buffer and install a pre-allocated node
for (currentProducerIndex = 0; currentProducerIndex < size; currentProducerIndex++) {
pSeqOffset = calcSequenceOffset(currentProducerIndex, mask);
final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad
final long delta = seq - currentProducerIndex;
if (delta == 0) {
// this is expected if we see this first time around
// Successful CAS: full barrier
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long elementOffset = calcElementOffset(currentProducerIndex, mask);
spElement(elementOffset, new Node<M>(factory.newInstance()));
} else {
// something is seriously wrong. This should never happen.
throw new RuntimeException("Unable to prefill exchangerQueue");
}
}
}
/**
 * PRODUCER: claims the next ring-buffer slot and returns its pre-allocated node for the
 * caller to fill in. Lock-free MPMC claim loop using the per-slot sequence protocol
 * (JCTools style): CAS the producer index, then publish by bumping the slot sequence.
 *
 * @return the claimed node, or null iff queue is full
 */
public Node<M> put() {
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
final long capacity = this.mask + 1;
final long[] sBuffer = this.sequenceBuffer;
long currentConsumerIndex;
long currentProducerIndex;
long pSeqOffset;
long cIndex = Long.MAX_VALUE;// start with bogus value, hope we don't need it
while (true) {
// Order matters!
// Loading consumer before producer allows for producer increments after consumer index is read.
// This ensures this method is conservative in its estimate. Note that as this is an MPMC there is
// nothing we can do to make this an exact method.
// NOTE(review): currentConsumerIndex's value is never read afterwards, but the volatile
// load itself is kept for the ordering documented above — do not remove.
currentConsumerIndex = lvConsumerIndex(); // LoadLoad
currentProducerIndex = lvProducerIndex(); // LoadLoad
pSeqOffset = calcSequenceOffset(currentProducerIndex, mask);
final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad
final long delta = seq - currentProducerIndex;
if (delta == 0) {
// this is expected if we see this first time around
if (casProducerIndex(currentProducerIndex, currentProducerIndex + 1)) {
// Successful CAS: full barrier
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(currentProducerIndex, mask);
final Node<M> e = lpElement(offset);
// increment sequence by 1, the value expected by consumer
// (seeing this value from a producer will lead to retry 2)
soSequence(sBuffer, pSeqOffset, currentProducerIndex + 1); // StoreStore
return e;
}
// failed cas, retry 1
} else if (delta < 0 && // poll has not moved this value forward
currentProducerIndex - capacity <= cIndex && // test against cached cIndex
currentProducerIndex - capacity <= (cIndex = lvConsumerIndex())) { // test against latest cIndex
// Extra check required to ensure [Queue.offer == false iff queue is full]
return null;
}
// another producer has moved the sequence by one, retry 2
// only producer will busySpin if contention
// busySpin();
}
}
/**
 * CONSUMER
 *
 * Claims the next consumer slot of this pre-filled MPMC ring and returns the
 * pre-allocated {@code Node} stored there. Never allocates; never blocks —
 * fails fast when the ring is empty.
 *
 * @return null iff empty
 */
public Node<M> take() {
    // local load of field to avoid repeated loads after volatile reads
    final long mask = this.mask;
    final long[] sBuffer = this.sequenceBuffer;
    long currentConsumerIndex;
    long currentProducerIndex;
    long cSeqOffset;
    long pIndex = -1; // start with bogus value, hope we don't need it
    while (true) {
        // Order matters!
        // Loading consumer before producer allows for producer increments after consumer index is read.
        // This ensures this method is conservative in it's estimate. Note that as this is an MPMC there is
        // nothing we can do to make this an exact method.
        currentConsumerIndex = lvConsumerIndex(); // LoadLoad
        currentProducerIndex = lvProducerIndex(); // LoadLoad
        cSeqOffset = calcSequenceOffset(currentConsumerIndex, mask);
        final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad
        // delta == 0 means a producer has published the slot at currentConsumerIndex
        final long delta = seq - (currentConsumerIndex + 1);
        if (delta == 0) {
            if (casConsumerIndex(currentConsumerIndex, currentConsumerIndex + 1)) {
                // Successful CAS: full barrier
                // on 64bit(no compressed oops) JVM this is the same as seqOffset
                final long offset = calcElementOffset(currentConsumerIndex, mask);
                final Node<M> e = lpElement(offset);
                // Move sequence ahead by capacity, preparing it for next offer
                // (seeing this value from a consumer will lead to retry 2)
                soSequence(sBuffer, cSeqOffset, currentConsumerIndex + mask + 1); // StoreStore
                return e;
            }
            // failed cas, retry 1
        } else if (delta < 0 && // slot has not been moved by producer
                currentConsumerIndex >= pIndex && // test against cached pIndex
                currentConsumerIndex == (pIndex = lvProducerIndex())) { // update pIndex if we must
            // strict empty check, this ensures [Queue.poll() == null iff isEmpty()]
            // contention. we WILL have data in the Q, we just got to it too quickly
            return null;
        }
        // another consumer beat us and moved sequence ahead, retry 2
        // only producer busyspins
    }
}
/** Burns roughly one CPU-context-switch worth of cycles without yielding the thread. */
private void busySpin() {
    for (int remaining = SPINS; remaining > 0; remaining--) {
        // deliberately empty: the countdown itself is the wait
    }
}
/**
 * NOTE(review): unimplemented {@link java.util.Queue} stub — always reports failure.
 * This exchanger is driven through put()/take(); confirm no caller relies on offer().
 */
@Override
public boolean offer(Node<M> message) {
    return false;
}
/**
 * NOTE(review): unimplemented {@link java.util.Queue} stub — always returns null.
 * Use take() instead; confirm no caller relies on poll().
 */
@Override
public Node<M> poll() {
    return null;
}
/**
 * NOTE(review): unimplemented {@link java.util.Queue} stub — always returns null.
 */
@Override
public Node<M> peek() {
    return null;
}
/**
 * NOTE(review): unimplemented {@link java.util.Queue} stub — always reports 0,
 * even when the ring holds published slots.
 */
@Override
public int size() {
    return 0;
}
/**
 * Lock-free emptiness probe: empty iff the consumer has caught up with the producer.
 * The result is a conservative snapshot and may be stale by the time it is used.
 */
@Override
public boolean isEmpty() {
    // Order matters!
    // Loading consumer before producer allows for producer increments after consumer index is read.
    // This ensures this method is conservative in it's estimate. Note that as this is an MPMC there is
    // nothing we can do to make this an exact method.
    return lvConsumerIndex() == lvProducerIndex();
}
}

View File

@ -0,0 +1,195 @@
package dorkbox.util.messagebus.common.simpleq;
import dorkbox.util.messagebus.common.simpleq.jctools.MpmcArrayQueueConsumerField;
/**
 * Blocking variant of the pre-filled MPMC exchanger ring: unlike the non-Alt version,
 * {@link #put()} and {@link #take()} never return null — on full/empty they busy-spin
 * and retry until they succeed.
 *
 * NOTE(review): marked WIP in the commit — see inline review notes before relying on it.
 */
public final class MpmcExchangerQueueAlt<M> extends MpmcArrayQueueConsumerField<Node<M>> {
    /** The number of CPUs */
    private static final int NCPU = Runtime.getRuntime().availableProcessors();
    /**
     * The number of times to spin (doing nothing except polling a memory location) before giving up while waiting to eliminate an
     * operation. Should be zero on uniprocessors. On multiprocessors, this value should be large enough so that two threads exchanging
     * items as fast as possible block only when one of them is stalled (due to GC or preemption), but not much longer, to avoid wasting CPU
     * resources. Seen differently, this value is a little over half the number of cycles of an average context switch time on most systems.
     * The value here is approximately the average of those across a range of tested systems.
     */
    private static final int SPINS = NCPU == 1 ? 0 : 600; // orig: 2000
    // cache-line padding to keep this class's fields away from the hot superclass fields
    long p40, p41, p42, p43, p44, p45, p46;
    long p30, p31, p32, p33, p34, p35, p36, p37;
    // NOTE(review): this single Node is stored into EVERY ring slot below, so all slots
    // alias one node. The non-Alt queue allocates a fresh Node per slot — confirm intended.
    private final Node<M> newInstance;
    /** Creates a {@code EliminationStack} that is initially empty. */
    public MpmcExchangerQueueAlt(final HandlerFactory<M> factory, final int size) {
        super(size);
        this.newInstance = new Node<M>(factory.newInstance());
        //////////////
        // pre-fill our data structures
        //////////////
        // local load of field to avoid repeated loads after volatile reads
        final long mask = this.mask;
        final long[] sBuffer = this.sequenceBuffer;
        long pSeqOffset;
        long currentProducerIndex;
        for (currentProducerIndex = 0; currentProducerIndex < size; currentProducerIndex++) {
            pSeqOffset = calcSequenceOffset(currentProducerIndex, mask);
            // on 64bit(no compressed oops) JVM this is the same as seqOffset
            final long elementOffset = calcElementOffset(currentProducerIndex, mask);
            spElement(elementOffset, this.newInstance);
        }
        // NOTE(review): only sequence slot 0 is written here; presumably the superclass
        // constructor initializes the full sequence array — confirm, otherwise put() stalls.
        pSeqOffset = calcSequenceOffset(0, mask);
        soSequence(sBuffer, pSeqOffset, 0); // StoreStore
    }
    /**
     * PRODUCER
     *
     * Claims the next producer slot and returns its pre-allocated node.
     * NOTE(review): unlike the non-Alt version this never returns null — when the ring
     * is full it busy-spins until a consumer frees a slot (original javadoc was stale).
     */
    public Node<M> put() {
        // local load of field to avoid repeated loads after volatile reads
        final long mask = this.mask;
        final long capacity = this.mask + 1;
        final long[] sBuffer = this.sequenceBuffer;
        long currentProducerIndex;
        long pSeqOffset;
        long cIndex = Long.MAX_VALUE;// start with bogus value, hope we don't need it
        while (true) {
            currentProducerIndex = lvProducerIndex(); // LoadLoad
            pSeqOffset = calcSequenceOffset(currentProducerIndex, mask);
            final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad
            final long delta = seq - currentProducerIndex;
            if (delta == 0) {
                // this is expected if we see this first time around
                if (casProducerIndex(currentProducerIndex, currentProducerIndex + 1)) {
                    // Successful CAS: full barrier
                    // on 64bit(no compressed oops) JVM this is the same as seqOffset
                    final long offset = calcElementOffset(currentProducerIndex, mask);
                    final Node<M> e = lpElement(offset);
                    // increment sequence by 1, the value expected by consumer
                    // (seeing this value from a producer will lead to retry 2)
                    soSequence(sBuffer, pSeqOffset, currentProducerIndex + 1); // StoreStore
                    return e;
                }
                // failed cas, retry 1
            } else if (delta < 0 && // poll has not moved this value forward
                    currentProducerIndex - capacity <= cIndex && // test against cached cIndex
                    currentProducerIndex - capacity <= (cIndex = lvConsumerIndex())) { // test against latest cIndex
                // Extra check required to ensure [Queue.offer == false iff queue is full]
                // ring is full: spin and retry instead of failing
                // return null;
                busySpin();
            }
            // another producer has moved the sequence by one, retry 2
            // only producer will busySpin if contention
            // busySpin();
        }
    }
    /**
     * CONSUMER
     *
     * Claims the next published slot and returns its pre-allocated node.
     * NOTE(review): never returns null — when the ring is empty it busy-spins until a
     * producer publishes a slot (original javadoc was stale).
     */
    public Node<M> take() {
        // local load of field to avoid repeated loads after volatile reads
        final long mask = this.mask;
        final long[] sBuffer = this.sequenceBuffer;
        long currentConsumerIndex;
        long cSeqOffset;
        long pIndex = -1; // start with bogus value, hope we don't need it
        while (true) {
            currentConsumerIndex = lvConsumerIndex(); // LoadLoad
            cSeqOffset = calcSequenceOffset(currentConsumerIndex, mask);
            final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad
            final long delta = seq - (currentConsumerIndex + 1);
            if (delta == 0) {
                if (casConsumerIndex(currentConsumerIndex, currentConsumerIndex + 1)) {
                    // Successful CAS: full barrier
                    // on 64bit(no compressed oops) JVM this is the same as seqOffset
                    final long offset = calcElementOffset(currentConsumerIndex, mask);
                    final Node<M> e = lpElement(offset);
                    // Move sequence ahead by capacity, preparing it for next offer
                    // (seeing this value from a consumer will lead to retry 2)
                    soSequence(sBuffer, cSeqOffset, currentConsumerIndex + mask + 1); // StoreStore
                    return e;
                }
                // failed cas, retry 1
            } else if (delta < 0 && // slot has not been moved by producer
                    currentConsumerIndex >= pIndex && // test against cached pIndex
                    currentConsumerIndex == (pIndex = lvProducerIndex())) { // update pIndex if we must
                // strict empty check, this ensures [Queue.poll() == null iff isEmpty()]
                // return null;
                // contention. we WILL have data in the Q, we just got to it too quickly
                busySpin();
            }
            // another consumer beat us and moved sequence ahead, retry 2
            // only producer busyspins
        }
    }
    /** Burns roughly a context-switch worth of cycles without yielding the CPU. */
    private static final void busySpin() {
        // busy spin for the amount of time (roughly) of a CPU context switch
        int spins = SPINS;
        for (;;) {
            if (spins > 0) {
                --spins;
            } else {
                break;
            }
        }
    }
    // NOTE(review): the Queue methods below are unimplemented stubs; the exchanger is
    // driven exclusively through put()/take().
    @Override
    public boolean offer(Node<M> message) {
        return false;
    }
    @Override
    public Node<M> poll() {
        return null;
    }
    @Override
    public Node<M> peek() {
        return null;
    }
    @Override
    public int size() {
        return 0;
    }
    /** Conservative snapshot: empty iff the consumer index has caught the producer index. */
    @Override
    public boolean isEmpty() {
        // Order matters!
        // Loading consumer before producer allows for producer increments after consumer index is read.
        // This ensures this method is conservative in it's estimate. Note that as this is an MPMC there is
        // nothing we can do to make this an exact method.
        return lvConsumerIndex() == lvProducerIndex();
    }
}

View File

@ -0,0 +1,33 @@
package dorkbox.util.messagebus.common.simpleq;
import java.util.concurrent.atomic.AtomicReference;
/**
 * A single exchanger-ring slot: holds the handed-off item plus the thread (if any)
 * parked on this slot. The long fields before/between/after the hot fields are
 * cache-line padding so {@code item} and {@code waiter} do not false-share.
 */
public class Node<E> {
    // Improve likelihood of isolation on <= 64 byte cache lines
    // see: http://mechanitis.blogspot.de/2011/07/dissecting-disruptor-why-its-so-fast_22.html
    public long x0, x1, x2, x3, x4, x5, x6, x7;
    // the exchanged payload; volatile so a hand-off by one thread is visible to the other
    public volatile E item;
    public long y0, y1, y2, y3, y4, y5, y6, y7;
    /** The Thread waiting to be signaled to wake up*/
    public AtomicReference<Thread> waiter = new AtomicReference<Thread>();
    public long z0, z1, z2, z3, z4, z5, z6, z7;
    public Node(E item) {
        this.item = item;
    }
    // prevent JIT from optimizing away the padding
    public final long sum() {
        return this.x0 + this.x1 + this.x2 + this.x3 + this.x4 + this.x5 + this.x6 + this.x7 +
                this.y0 + this.y1 + this.y2 + this.y3 + this.y4 + this.y5 + this.y6 + this.y7 +
                this.z0 + this.z1 + this.z2 + this.z3 + this.z4 + this.z5 + this.z6 + this.z7;
    }
}

View File

@ -0,0 +1,39 @@
/*
* Copyright 2014 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.util.messagebus.common.simpleq;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * Holder wrapping a pooled value together with its availability flag and the thread
 * (if any) parked on it. The leading Object fields are cache-line padding.
 *
 * @param <T> type of the pooled value
 */
public class ObjectPoolHolder<T> {
    // enough padding for 64bytes with 4byte refs, to alleviate contention across threads CASing one vs the other.
    @SuppressWarnings("unused")
    Object p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe;

    // the wrapped value; final since it is never reassigned after construction
    private final T value;

    // true while the holder is available; CAS'ed by pool users
    AtomicBoolean state = new AtomicBoolean(true);

    transient volatile Thread waiter; // to control park/unpark

    /**
     * @param value the value this holder wraps for the pool
     */
    public ObjectPoolHolder(T value) {
        this.value = value;
    }

    /** @return the wrapped value (never reassigned) */
    public T getValue() {
        return this.value;
    }
}

View File

@ -0,0 +1,27 @@
package dorkbox.util.messagebus.common.simpleq;
import java.util.concurrent.atomic.AtomicReference;
/**
* An AtomicReference with heuristic padding to lessen cache effects of this heavily CAS'ed location. While the padding adds
* noticeable space, all slots are created only on demand, and there will be more than one of them only when it would improve throughput
* more than enough to outweigh using extra space.
*/
/**
 * AtomicReference padded with 15 longs to reduce false sharing on a heavily
 * CAS'ed slot; behaves exactly like {@link AtomicReference} otherwise.
 */
public class PaddedAtomicReference<T> extends AtomicReference<T> {
    private static final long serialVersionUID = 1L;

    // Improve likelihood of isolation on <= 64 byte cache lines
    public long z0, z1, z2, z3, z4, z5, z6, z7, z8, z9, za, zb, zc, zd, ze;

    /** Creates a padded reference initially holding {@code null}. */
    public PaddedAtomicReference() {
    }

    /** Creates a padded reference holding the given initial value. */
    public PaddedAtomicReference(T value) {
        super(value);
    }

    // prevent JIT from optimizing away the padding
    // (fixed: stray unary "+ +" typo before zc in the original)
    public final long sum() {
        return this.z0 + this.z1 + this.z2 + this.z3 + this.z4 + this.z5 + this.z6 + this.z7
                + this.z8 + this.z9 + this.za + this.zb + this.zc + this.zd + this.ze;
    }
}

View File

@ -0,0 +1,26 @@
package dorkbox.util.messagebus.common.simpleq;
import java.util.concurrent.atomic.AtomicReference;
import com.lmax.disruptor.MessageHolder;
/**
* Nodes hold partially exchanged data. This class opportunistically subclasses AtomicReference to represent the hole. So get() returns
* hole, and compareAndSet CAS'es value into hole. This class cannot be parameterized as "V" because of the use of non-V CANCEL sentinels.
*/
/**
 * Nodes hold partially exchanged data. This class opportunistically subclasses AtomicReference to represent the hole. So get() returns
 * hole, and compareAndSet CAS'es value into hole. This class cannot be parameterized as "V" because of the use of non-V CANCEL sentinels.
 */
final class ProducerNode extends AtomicReference<MessageHolder> {
    private static final long serialVersionUID = 1L;
    /** The element offered by the Thread creating this node. */
    public volatile MessageHolder item = new MessageHolder();
    /** The Thread waiting to be signalled; null until waiting. */
    public volatile Thread waiter = null;
    // for the stack, for waiting producers
    // NOTE(review): plain (non-volatile) link — presumably only mutated while the node
    // is thread-confined or under the stack's CAS protocol; confirm at the usage site.
    public ProducerNode next = null;
    public ProducerNode() {
    }
}

View File

@ -3,103 +3,223 @@ package dorkbox.util.messagebus.common.simpleq;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.LockSupport;
import com.lmax.disruptor.MessageHolder;
public final class SimpleQueue<M extends MessageHolder> {
static {
// Prevent rare disastrous classloading in first call to LockSupport.park.
// See: https://bugs.openjdk.java.net/browse/JDK-8074773
@SuppressWarnings("unused")
Class<?> ensureLoaded = LockSupport.class;
LockSupport.unpark(Thread.currentThread());
}
/** The number of CPUs */
private static final int NCPU = Runtime.getRuntime().availableProcessors();
public final class SimpleQueue {
/**
* The number of times to spin (doing nothing except polling a memory location) before giving up while waiting to eliminate an
* operation. Should be zero on uniprocessors. On multiprocessors, this value should be large enough so that two threads exchanging
* items as fast as possible block only when one of them is stalled (due to GC or preemption), but not much longer, to avoid wasting CPU
* resources. Seen differently, this value is a little over half the number of cycles of an average context switch time on most systems.
* The value here is approximately the average of those across a range of tested systems.
*/
private static final int SPINS = NCPU == 1 ? 0 : 600;
private static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();
private final MpmcExchangerQueueAlt<M> consumersWaiting;
private final MpmcExchangerQueueAlt<M> producersWaiting;
private final Lock publisherLock = new ReentrantLock();
private final Condition publisherNotifyCondition = this.publisherLock.newCondition();
private final AtomicInteger currentCount = new AtomicInteger(0);
private final Lock consumerLock = new ReentrantLock();
private final Condition consumerNotifyCondition = this.consumerLock.newCondition();
private final int numberConsumerThreads;
private final AtomicReference<MessageHolder> consumer = new AtomicReference<MessageHolder>();
private final AtomicReference<MessageHolder> producer = new AtomicReference<MessageHolder>();
public SimpleQueue(int numberConsumerThreads, HandlerFactory<M> factory) {
private final AtomicInteger availableThreads = new AtomicInteger();
public SimpleQueue(int numberOfThreads) {
this.availableThreads.set(numberOfThreads);
this.producer.set(new MessageHolder());
this.numberConsumerThreads = numberConsumerThreads;
this.consumersWaiting = new MpmcExchangerQueueAlt<M>(factory, 1<<14);
this.producersWaiting = new MpmcExchangerQueueAlt<M>(factory, 1<<14);
}
public void transfer(Object message1) throws InterruptedException {
MessageHolder holder = null;
// decrement count
// <0: no consumers available, add to Q, park and wait
// >=0: consumers available, get one from the parking lot
if ((holder = this.producer.getAndSet(null)) == null) {
this.publisherLock.lock();
try {
do {
this.publisherNotifyCondition.await();
// LockSupport.parkNanos(1L);
} while ((holder = this.producer.getAndSet(null)) == null);
} finally {
this.publisherLock.unlock();
Thread myThread = Thread.currentThread();
for (;;) {
final int count = this.currentCount.get();
if (this.currentCount.compareAndSet(count, count - 1)) {
if (count <= 0) {
// <=0: no consumers available (PUSH_P, PARK_P)
Node<M> producer = this.producersWaiting.put();
if (producer == null || producer.item == null) {
System.err.println("KAPOW");
}
producer.item.message1 = message1;
if (!park(producer, myThread)) {
throw new InterruptedException();
}
return;
} else {
// >0: consumers available (TAKE_C, UNPARK_C)
Node<M> consumer = this.consumersWaiting.take();
while (consumer == null) {
// busySpin();
consumer = this.consumersWaiting.take();
}
consumer.item.message1 = message1;
unpark(consumer, myThread);
return;
}
}
// contention
busySpin();
}
}
/**
 * Consumer side of the hand-off: atomically increments the producer/consumer balance
 * counter; a non-negative prior count means no producer is waiting, so this thread
 * enqueues itself on the consumer ring and parks; a negative prior count means a
 * waiting producer exists, so its message is copied out and the producer unparked.
 *
 * @param item destination holder; its message1 field is filled on return
 * @throws InterruptedException if interrupted while parked
 */
public void take(MessageHolder item) throws InterruptedException {
    // increment count
    // >=0: no producers available, park and wait
    // <0: producers available, get one from the Q
    Thread myThread = Thread.currentThread();
    for (;;) {
        final int count = this.currentCount.get();
        if (this.currentCount.compareAndSet(count, count + 1)) {
            if (count >= 0) {
                // >=0: no producers available (PUT_C, PARK_C)
                Node<M> consumer = this.consumersWaiting.put();
                if (!park(consumer, myThread)) {
                    throw new InterruptedException();
                }
                return;
            } else {
                // <0: producers available (TAKE_P, UNPARK_P)
                Node<M> producer = this.producersWaiting.take();
                while (producer == null) {
                    // busySpin();
                    producer = this.producersWaiting.take();
                }
                item.message1 = producer.item.message1;
                unpark(producer, myThread);
                return;
            }
        }
        // contention
        busySpin();
    }
}
/**
 * Parks the current thread on the given node until the peer thread unparks it.
 * Claims the node's waiter slot via CAS; if the slot was empty, spins briefly
 * (hoping the peer arrives) before falling back to LockSupport.park().
 *
 * @param myNode the node whose waiter slot mediates the hand-off
 * @param myThread
 * @return false if we were interrupted, true if we were unparked by another thread
 */
private boolean park(Node<M> myNode, Thread myThread) {
    AtomicReference<Thread> waiter = myNode.waiter;
    Thread thread;
    for (;;) {
        thread = waiter.get();
        if (waiter.compareAndSet(thread, myThread)) {
            if (thread == null) {
                // we won the slot first: spin briefly, then really park
                // busy spin for the amount of time (roughly) of a CPU context switch
                int spins = SPINS;
                for (;;) {
                    if (spins > 0) {
                        --spins;
                    } else if (waiter.get() != myThread) {
                        // peer already took the slot back: we were logically unparked
                        break;
                    } else {
                        // park can return for NO REASON. Subsequent loops will hit this if it has not been ACTUALLY unlocked.
                        LockSupport.park();
                        if (myThread.isInterrupted()) {
                            waiter.set(null);
                            return false;
                        }
                        break;
                    }
                }
                // do {
                //     // park can return for NO REASON. Subsequent loops will hit this if it has not been ACTUALLY unlocked.
                //     LockSupport.park();
                //     if (myThread.isInterrupted()) {
                //         myNode.waiter.set(null);
                //         return false;
                //     }
                // } while (myNode.waiter.get() == myThread);
                waiter.set(null);
                return true;
            } else if (thread != myThread) {
                // peer was already present in the slot: no need to park at all
                // no parking
                return true;
            } else {
                // contention
                busySpin();
            }
        }
    }
}
holder.message1 = message1;
this.consumer.set(holder);
/**
* Unparks the other node (if it was waiting)
*/
private void unpark(Node<M> otherNode, Thread myThread) {
AtomicReference<Thread> waiter = otherNode.waiter;
Thread thread;
this.consumerLock.lock();
try {
this.consumerNotifyCondition.signalAll();
} finally {
this.consumerLock.unlock();
for (;;) {
thread = waiter.get();
if (waiter.compareAndSet(thread, myThread)) {
if (thread == null) {
// no parking
return;
} else if (thread != myThread) {
// park will always set the waiter back to null
LockSupport.unpark(thread);
return;
} else {
// contention
busySpin();
}
}
}
}
/** Spends roughly one CPU-context-switch worth of cycles doing nothing, without yielding. */
private static final void busySpin() {
    int remaining = SPINS;
    while (remaining > 0) {
        --remaining;
    }
}
public boolean hasPendingMessages() {
return false;
// count the number of consumers waiting, it should be the same as the number of threads configured
return this.consumersWaiting.size() == this.numberConsumerThreads;
}
/**
 * NOTE(review): unimplemented stub — silently drops the runnable and ignores the
 * timeout. Callers expecting TransferQueue.tryTransfer semantics will misbehave.
 */
public void tryTransfer(Runnable runnable, long timeout, TimeUnit unit) throws InterruptedException {
    // TODO Auto-generated method stub
}
// public MessageHolder poll() {
// return this.consumer.getAndSet(null);
// }
/**
 * Blocks until a publisher hands over a MessageHolder via the consumer slot.
 * Fast path: atomically swap the slot to null and return its prior value; if the
 * slot was empty, wait on the consumer condition and re-check (guarding against
 * spurious wakeups) until a holder appears.
 *
 * @return the holder placed by a publisher; never null
 * @throws InterruptedException if interrupted while awaiting
 */
public MessageHolder take() throws InterruptedException {
    MessageHolder holder = null;
    if ((holder = this.consumer.getAndSet(null)) == null) {
        this.consumerLock.lock();
        try {
            do {
                this.consumerNotifyCondition.await();
            } while ((holder = this.consumer.getAndSet(null)) == null);
        } finally {
            this.consumerLock.unlock();
        }
    }
    return holder;
}
// release the event back to the publisher
// notify publisher in case pub was waiting
/**
 * Returns a drained holder to the publisher slot and wakes any publisher blocked
 * in transfer() waiting for a free holder.
 *
 * @param holder the holder to recycle; becomes visible to the publisher immediately
 */
public void release(MessageHolder holder) {
    this.producer.set(holder);
    this.publisherLock.lock();
    try {
        this.publisherNotifyCondition.signalAll();
    } finally {
        this.publisherLock.unlock();
    }
}
}

View File

@ -0,0 +1,458 @@
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/licenses/publicdomain
*/
package dorkbox.util.messagebus.common.simpleq.bakup;
import java.util.AbstractQueue;
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
/**
* An unbounded thread-safe {@linkplain Queue queue} based on linked nodes.
* This queue orders elements FIFO (first-in-first-out).
* The <em>head</em> of the queue is that element that has been on the
* queue the longest time.
* The <em>tail</em> of the queue is that element that has been on the
* queue the shortest time. New elements
* are inserted at the tail of the queue, and the queue retrieval
* operations obtain elements at the head of the queue.
* A <tt>ConcurrentLinkedQueue</tt> is an appropriate choice when
* many threads will share access to a common collection.
* This queue does not permit <tt>null</tt> elements.
*
* <p>This implementation employs an efficient &quot;wait-free&quot;
* algorithm based on one described in <a
* href="http://www.cs.rochester.edu/u/michael/PODC96.html"> Simple,
* Fast, and Practical Non-Blocking and Blocking Concurrent Queue
* Algorithms</a> by Maged M. Michael and Michael L. Scott.
*
* <p>Beware that, unlike in most collections, the <tt>size</tt> method
* is <em>NOT</em> a constant-time operation. Because of the
* asynchronous nature of these queues, determining the current number
* of elements requires a traversal of the elements.
*
* <p>This class and its iterator implement all of the
* <em>optional</em> methods of the {@link Collection} and {@link
* Iterator} interfaces.
*
* <p>Memory consistency effects: As with other concurrent
* collections, actions in a thread prior to placing an object into a
* {@code ConcurrentLinkedQueue}
* <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
* actions subsequent to the access or removal of that element from
* the {@code ConcurrentLinkedQueue} in another thread.
*
* <p>This class is a member of the
* <a href="{@docRoot}/../technotes/guides/collections/index.html">
* Java Collections Framework</a>.
*
* @since 1.5
* @author Doug Lea
* @param <E> the type of elements held in this collection
*
*/
public class ConcurrentLinkedQueueSimple<E> extends AbstractQueue<E> implements Queue<E> {
    /*
     * This is a straight adaptation of Michael & Scott algorithm.
     * For explanation, read the paper. The only (minor) algorithmic
     * difference is that this version supports lazy deletion of
     * internal nodes (method remove(Object)) -- remove CAS'es item
     * fields to null. The normal queue operations unlink but then
     * pass over nodes with null item fields. Similarly, iteration
     * methods ignore those with nulls.
     *
     * Also note that like most non-blocking algorithms in this
     * package, this implementation relies on the fact that in garbage
     * collected systems, there is no possibility of ABA problems due
     * to recycled nodes, so there is no need to use "counted
     * pointers" or related techniques seen in versions used in
     * non-GC'ed settings.
     */

    /** Singly-linked node; item == null marks a lazily-deleted node. */
    private static class Node<E> {
        private volatile E item;
        private volatile Node<E> next;

        // Raw Node in the updater type parameter is unavoidable with field updaters
        // on a generic class; suppressed rather than left as a compiler warning.
        @SuppressWarnings("rawtypes")
        private static final AtomicReferenceFieldUpdater<Node, Node>
            nextUpdater = AtomicReferenceFieldUpdater.newUpdater(Node.class, Node.class, "next");
        @SuppressWarnings("rawtypes")
        private static final AtomicReferenceFieldUpdater<Node, Object>
            itemUpdater = AtomicReferenceFieldUpdater.newUpdater(Node.class, Object.class, "item");

        Node(E x) { this.item = x; }

        Node(E x, Node<E> n) { this.item = x; this.next = n; }

        E getItem() {
            return this.item;
        }

        boolean casItem(E cmp, E val) {
            return itemUpdater.compareAndSet(this, cmp, val);
        }

        void setItem(E val) {
            itemUpdater.set(this, val);
        }

        Node<E> getNext() {
            return this.next;
        }

        boolean casNext(Node<E> cmp, Node<E> val) {
            return nextUpdater.compareAndSet(this, cmp, val);
        }

        void setNext(Node<E> val) {
            nextUpdater.set(this, val);
        }
    }

    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<ConcurrentLinkedQueueSimple, Node>
        tailUpdater = AtomicReferenceFieldUpdater.newUpdater(ConcurrentLinkedQueueSimple.class, Node.class, "tail");
    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<ConcurrentLinkedQueueSimple, Node>
        headUpdater = AtomicReferenceFieldUpdater.newUpdater(ConcurrentLinkedQueueSimple.class, Node.class, "head");

    private boolean casTail(Node<E> cmp, Node<E> val) {
        return tailUpdater.compareAndSet(this, cmp, val);
    }

    private boolean casHead(Node<E> cmp, Node<E> val) {
        return headUpdater.compareAndSet(this, cmp, val);
    }

    /**
     * Pointer to header node, initialized to a dummy node. The first
     * actual node is at head.getNext().
     */
    private transient volatile Node<E> head = new Node<E>(null, null);

    /** Pointer to last node on list **/
    private transient volatile Node<E> tail = this.head;

    /**
     * Creates a <tt>ConcurrentLinkedQueue</tt> that is initially empty.
     */
    public ConcurrentLinkedQueueSimple() {}

    /**
     * Creates a <tt>ConcurrentLinkedQueue</tt>
     * initially containing the elements of the given collection,
     * added in traversal order of the collection's iterator.
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public ConcurrentLinkedQueueSimple(Collection<? extends E> c) {
        for (Iterator<? extends E> it = c.iterator(); it.hasNext();) {
            add(it.next());
        }
    }

    // Have to override just to update the javadoc

    /**
     * Inserts the specified element at the tail of this queue.
     *
     * @return <tt>true</tt> (as specified by {@link Collection#add})
     * @throws NullPointerException if the specified element is null
     */
    @Override
    public boolean add(E e) {
        return offer(e);
    }

    /**
     * Inserts the specified element at the tail of this queue.
     *
     * @return <tt>true</tt> (as specified by {@link Queue#offer})
     * @throws NullPointerException if the specified element is null
     */
    @Override
    public boolean offer(E e) {
        if (e == null) {
            throw new NullPointerException();
        }
        Node<E> n = new Node<E>(e, null);
        for (;;) {
            Node<E> t = this.tail;
            Node<E> s = t.getNext();
            if (t == this.tail) {
                if (s == null) {
                    if (t.casNext(s, n)) {
                        // tail CAS may fail; another thread will advance it (helping)
                        casTail(t, n);
                        return true;
                    }
                } else {
                    // tail is lagging; help advance it before retrying
                    casTail(t, s);
                }
            }
        }
    }

    @Override
    public E poll() {
        for (;;) {
            Node<E> h = this.head;
            Node<E> t = this.tail;
            Node<E> first = h.getNext();
            if (h == this.head) {
                if (h == t) {
                    if (first == null) {
                        return null;
                    } else {
                        casTail(t, first);
                    }
                } else if (casHead(h, first)) {
                    E item = first.getItem();
                    if (item != null) {
                        first.setItem(null);
                        return item;
                    }
                    // else skip over deleted item, continue loop,
                }
            }
        }
    }

    @Override
    public E peek() { // same as poll except don't remove item
        for (;;) {
            Node<E> h = this.head;
            Node<E> t = this.tail;
            Node<E> first = h.getNext();
            if (h == this.head) {
                if (h == t) {
                    if (first == null) {
                        return null;
                    } else {
                        casTail(t, first);
                    }
                } else {
                    E item = first.getItem();
                    if (item != null) {
                        return item;
                    } else {
                        // lazily-deleted node; unlink it and retry
                        casHead(h, first);
                    }
                }
            }
        }
    }

    /**
     * Returns the first actual (non-header) node on list. This is yet
     * another variant of poll/peek; here returning out the first
     * node, not element (so we cannot collapse with peek() without
     * introducing race.)
     */
    Node<E> first() {
        for (;;) {
            Node<E> h = this.head;
            Node<E> t = this.tail;
            Node<E> first = h.getNext();
            if (h == this.head) {
                if (h == t) {
                    if (first == null) {
                        return null;
                    } else {
                        casTail(t, first);
                    }
                } else {
                    if (first.getItem() != null) {
                        return first;
                    } else {
                        casHead(h, first);
                    }
                }
            }
        }
    }

    /**
     * Returns <tt>true</tt> if this queue contains no elements.
     *
     * @return <tt>true</tt> if this queue contains no elements
     */
    @Override
    public boolean isEmpty() {
        return first() == null;
    }

    /**
     * Returns the number of elements in this queue. If this queue
     * contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
     * <tt>Integer.MAX_VALUE</tt>.
     *
     * <p>Beware that, unlike in most collections, this method is
     * <em>NOT</em> a constant-time operation. Because of the
     * asynchronous nature of these queues, determining the current
     * number of elements requires an O(n) traversal.
     *
     * @return the number of elements in this queue
     */
    @Override
    public int size() {
        int count = 0;
        for (Node<E> p = first(); p != null; p = p.getNext()) {
            if (p.getItem() != null) {
                // Collections.size() spec says to max out
                if (++count == Integer.MAX_VALUE) {
                    break;
                }
            }
        }
        return count;
    }

    /**
     * Returns <tt>true</tt> if this queue contains the specified element.
     * More formally, returns <tt>true</tt> if and only if this queue contains
     * at least one element <tt>e</tt> such that <tt>o.equals(e)</tt>.
     *
     * @param o object to be checked for containment in this queue
     * @return <tt>true</tt> if this queue contains the specified element
     */
    @Override
    public boolean contains(Object o) {
        if (o == null) {
            return false;
        }
        for (Node<E> p = first(); p != null; p = p.getNext()) {
            E item = p.getItem();
            if (item != null &&
                o.equals(item)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Removes a single instance of the specified element from this queue,
     * if it is present. More formally, removes an element <tt>e</tt> such
     * that <tt>o.equals(e)</tt>, if this queue contains one or more such
     * elements.
     * Returns <tt>true</tt> if this queue contained the specified element
     * (or equivalently, if this queue changed as a result of the call).
     *
     * @param o element to be removed from this queue, if present
     * @return <tt>true</tt> if this queue changed as a result of the call
     */
    @Override
    public boolean remove(Object o) {
        if (o == null) {
            return false;
        }
        for (Node<E> p = first(); p != null; p = p.getNext()) {
            E item = p.getItem();
            // lazy deletion: null out the item; the node is unlinked by later traversals
            if (item != null &&
                o.equals(item) &&
                p.casItem(item, null)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Returns an iterator over the elements in this queue in proper sequence.
     * The returned iterator is a "weakly consistent" iterator that
     * will never throw {@link ConcurrentModificationException},
     * and guarantees to traverse elements as they existed upon
     * construction of the iterator, and may (but is not guaranteed to)
     * reflect any modifications subsequent to construction.
     *
     * @return an iterator over the elements in this queue in proper sequence
     */
    @Override
    public Iterator<E> iterator() {
        return new Itr();
    }

    private class Itr implements Iterator<E> {
        /**
         * Next node to return item for.
         */
        private Node<E> nextNode;

        /**
         * nextItem holds on to item fields because once we claim
         * that an element exists in hasNext(), we must return it in
         * the following next() call even if it was in the process of
         * being removed when hasNext() was called.
         */
        private E nextItem;

        /**
         * Node of the last returned item, to support remove.
         */
        private Node<E> lastRet;

        Itr() {
            advance();
        }

        /**
         * Moves to next valid node and returns item to return for
         * next(), or null if no such.
         */
        private E advance() {
            this.lastRet = this.nextNode;
            E x = this.nextItem;

            Node<E> p = this.nextNode == null? first() : this.nextNode.getNext();
            for (;;) {
                if (p == null) {
                    this.nextNode = null;
                    this.nextItem = null;
                    return x;
                }
                E item = p.getItem();
                if (item != null) {
                    this.nextNode = p;
                    this.nextItem = item;
                    return x;
                } else {
                    // skip lazily-deleted nodes
                    p = p.getNext();
                }
            }
        }

        @Override
        public boolean hasNext() {
            return this.nextNode != null;
        }

        @Override
        public E next() {
            if (this.nextNode == null) {
                throw new NoSuchElementException();
            }
            return advance();
        }

        @Override
        public void remove() {
            Node<E> l = this.lastRet;
            if (l == null) {
                throw new IllegalStateException();
            }
            // rely on a future traversal to relink.
            l.setItem(null);
            this.lastRet = null;
        }
    }
}

View File

@ -0,0 +1,355 @@
/*
* Copyright 2013 Ben Manes. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package dorkbox.util.messagebus.common.simpleq.bakup;
import java.util.concurrent.atomic.AtomicReference;
/**
* An unbounded thread-safe stack based on linked nodes. This stack orders elements LIFO (last-in-first-out). The <em>top</em> of the stack
* is that element that has been on the stack the shortest time. New elements are inserted at and retrieved from the top of the stack. A
* {@code EliminationStack} is an appropriate choice when many threads will exchange elements through shared access to a common collection.
* Like most other concurrent collection implementations, this class does not permit the use of {@code null} elements.
* <p>
* This implementation employs elimination to transfer elements between threads that are pushing and popping concurrently. This technique
* avoids contention on the stack by attempting to cancel operations if an immediate update to the stack is not successful. This approach is
* described in <a href="http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.156.8728">A Scalable Lock-free Stack Algorithm</a>.
* <p>
* Iterators are <i>weakly consistent</i>, returning elements reflecting the state of the stack at some point at or since the creation of
* the iterator. They do <em>not</em> throw {@link java.util.ConcurrentModificationException}, and may proceed concurrently with other
* operations. Elements contained in the stack since the creation of the iterator will be returned exactly once.
* <p>
* Beware that, unlike in most collections, the {@code size} method is <em>NOT</em> a constant-time operation. Because of the asynchronous
* nature of these stacks, determining the current number of elements requires a traversal of the elements, and so may report inaccurate
* results if this collection is modified during traversal.
*
* @author ben.manes@gmail.com (Ben Manes)
* @see <a href="https://github.com/ben-manes/caffeine">Caffeine</a>
* @param <E> the type of elements held in this collection
*/
public final class EliminationStack<E> {
    /*
     * A Treiber's stack is represented as a singly-linked list with an atomic top reference and uses compare-and-swap to modify the value
     * atomically.
     *
     * The stack is augmented with an elimination array to minimize the top reference becoming a sequential bottleneck. Elimination allows
     * pairs of operations with reverse semantics, like pushes and pops on a stack, to complete without any central coordination, and
     * therefore substantially aids scalability [1, 2, 3]. If a thread fails to update the stack's top reference then it backs off to a
     * collision arena where a location is chosen at random and it attempts to coordinate with another operation that concurrently chose the
     * same location. If a transfer is not successful then the thread repeats the process until the element is added to the stack or a
     * cancellation occurs.
     *
     * This implementation borrows optimizations from {@link java.util.concurrent.Exchanger} for choosing an arena location and awaiting a
     * match [4].
     *
     * [1] A Scalable Lock-free Stack Algorithm http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.156.8728 [2] Concurrent Data
     * Structures http://www.cs.tau.ac.il/~shanir/concurrent-data-structures.pdf [3] Using elimination to implement scalable and lock-free
     * fifo queues http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.108.6422 [4] A Scalable Elimination-based Exchange Channel
     * http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.59.7396
     */
    /** The number of CPUs */
    static final int NCPU = Runtime.getRuntime().availableProcessors();
    /** The number of slots in the elimination array. */
    static final int ARENA_LENGTH = ceilingNextPowerOfTwo((NCPU + 1) / 2);
    /** The mask value for indexing into the arena (ARENA_LENGTH is a power of two). */
    static final int ARENA_MASK = ARENA_LENGTH - 1;
    /** The number of times to step ahead, probe, and try to match. */
    static final int LOOKAHEAD = Math.min(4, NCPU);
    /**
     * The number of times to spin (doing nothing except polling a memory location) before giving up while waiting to eliminate an
     * operation. Should be zero on uniprocessors. On multiprocessors, this value should be large enough so that two threads exchanging
     * items as fast as possible block only when one of them is stalled (due to GC or preemption), but not much longer, to avoid wasting CPU
     * resources. Seen differently, this value is a little over half the number of cycles of an average context switch time on most systems.
     * The value here is approximately the average of those across a range of tested systems.
     */
    static final int SPINS = NCPU == 1 ? 0 : 2000;
    /** The number of times to spin per lookahead step */
    static final int SPINS_PER_STEP = SPINS / LOOKAHEAD;
    /** A marker indicating that the arena slot is free. */
    static final Object FREE = null;
    /** A marker indicating that a thread is waiting in that slot to be transfered an element.
     *  NOTE(review): currently unreferenced in this class — confirm before removing. */
    static final Object CONSUMER_WAITER = new Object();
    /** A marker indicating that a thread is waiting in that slot to be transfered an element. */
    static final Object WAITER = new Object();
    /** Rounds x up to the next power of two (returns x if it already is one). */
    static int ceilingNextPowerOfTwo(int x) {
        // From Hacker's Delight, Chapter 3, Harry S. Warren Jr.
        return 1 << Integer.SIZE - Integer.numberOfLeadingZeros(x - 1);
    }
    /** The top of the stack. */
    final AtomicReference<Node<E>> top;
    /** The arena where slots can be used to perform an exchange. */
    final AtomicReference<Object>[] arena;
    /** Creates a {@code EliminationStack} that is initially empty. */
    @SuppressWarnings("unchecked")
    public EliminationStack() {
        this.top = new AtomicReference<Node<E>>();
        this.arena = new AtomicReference[ARENA_LENGTH];
        for (int i = 0; i < ARENA_LENGTH; i++) {
            this.arena[i] = new AtomicReference<Object>();
        }
    }
    /**
     * Returns <tt>true</tt> if this stack contains at least one element.
     * As a side effect, lazily unlinks top nodes whose items have been
     * logically removed (item CAS-ed to null by {@link #remove(Object)}).
     *
     * @return <tt>true</tt> if this stack contains at least one element
     */
    public boolean hasPendingMessages() {
        for (;;) {
            Node<E> node = this.top.get();
            if (node == null) {
                return false;
            }
            E e = node.get();
            if (e == null) {
                // dead node at the top: try to unlink it and re-check
                this.top.compareAndSet(node, node.next);
            } else {
                return true;
            }
        }
    }
    /**
     * Removes and returns the top element or returns <tt>null</tt> if this stack is empty.
     *
     * @return the top of this stack, or <tt>null</tt> if this stack is empty
     */
    public E pop() {
        for (;;) {
            Node<E> current = this.top.get();
            if (current == null) {
                return null;
            }
            // Attempt to pop from the stack, backing off to the elimination array if contended
            if (this.top.get() == current && this.top.compareAndSet(current, current.next)) {
                return current.get();
            }
            // CAS lost a race: try to receive an element directly from a pushing thread
            E e = tryReceive();
            if (e != null) {
                return e;
            }
        }
    }
    /**
     * Pushes an element onto the stack (in other words, adds an element at the top of this stack).
     *
     * @param e the element to push
     */
    public void push(E e) {
        Node<E> node = new Node<E>(e);
        for (;;) {
            node.next = this.top.get();
            // Attempt to push to the stack, backing off to the elimination array if contended
            if (this.top.get() == node.next && this.top.compareAndSet(node.next, node)) {
                return;
            }
            // CAS lost a race: try to hand the element directly to a popping thread
            if (tryTransfer(e)) {
                return;
            }
        }
    }
    /**
     * Logically removes a single instance of the specified element by CAS-ing the
     * containing node's item to null; the dead node is unlinked by later traversals.
     *
     * @param o element to be removed from this stack, if present (null yields <tt>false</tt>)
     * @return <tt>true</tt> if this stack changed as a result of the call
     */
    public boolean remove(Object o) {
        // guard against null: nulls are never stored, and o.equals(...) below would NPE
        if (o == null) {
            return false;
        }
        for (Node<E> node = this.top.get(); node != null; node = node.next) {
            E value = node.get();
            if (o.equals(value) && node.compareAndSet(value, null)) {
                return true;
            }
        }
        return false;
    }
    /**
     * Attempts to transfer the element to a waiting consumer.
     *
     * @param e the element to try to exchange
     * @return if the element was successfully transfered
     */
    boolean tryTransfer(E e) {
        int start = startIndex();
        return scanAndTransferToWaiter(e, start) || awaitExchange(e, start);
    }
    /**
     * Scans the arena searching for a waiting consumer to exchange with.
     *
     * @param e the element to try to exchange
     * @return if the element was successfully transfered
     */
    boolean scanAndTransferToWaiter(E e, int start) {
        for (int i = 0; i < ARENA_LENGTH; i++) {
            int index = start + i & ARENA_MASK;
            AtomicReference<Object> slot = this.arena[index];
            // if some thread is waiting to receive an element then attempt to provide it
            if (slot.get() == WAITER && slot.compareAndSet(WAITER, e)) {
                return true;
            }
        }
        return false;
    }
    /**
     * Waits for (by spinning) to have the element transfered to another thread. The element is filled into an empty slot in the arena and
     * spun on until it is transfered or a per-slot spin limit is reached. This search and wait strategy is repeated by selecting another
     * slot until a total spin limit is reached.
     *
     * @param e the element to transfer
     * @param start the arena location to start at
     * @return if an exchange was completed successfully
     */
    boolean awaitExchange(E e, int start) {
        for (int step = 0, totalSpins = 0; step < ARENA_LENGTH && totalSpins < SPINS; step++) {
            int index = start + step & ARENA_MASK;
            AtomicReference<Object> slot = this.arena[index];
            Object found = slot.get();
            if (found == WAITER && slot.compareAndSet(WAITER, e)) {
                return true;
            } else if (found == FREE && slot.compareAndSet(FREE, e)) {
                int slotSpins = 0;
                for (;;) {
                    found = slot.get();
                    if (found != e) {
                        // a consumer replaced our element: transfer complete
                        return true;
                    } else if (slotSpins >= SPINS_PER_STEP && slot.compareAndSet(e, FREE)) {
                        // failed to transfer the element; try a new slot
                        totalSpins += slotSpins;
                        break;
                    }
                    slotSpins++;
                }
            }
        }
        // failed to transfer the element; give up
        return false;
    }
    /**
     * Attempts to receive an element from a waiting provider.
     *
     * @return an element if successfully transfered or null if unsuccessful
     */
    E tryReceive() {
        int start = startIndex();
        E e = scanAndMatch(start);
        return e == null ? awaitMatch(start) : e;
    }
    /**
     * Scans the arena searching for a waiting producer to transfer from.
     *
     * @param start the arena location to start at
     * @return an element if successfully transfered or null if unsuccessful
     */
    E scanAndMatch(int start) {
        for (int i = 0; i < ARENA_LENGTH; i++) {
            int index = start + i & ARENA_MASK;
            AtomicReference<Object> slot = this.arena[index];
            // accept a transfer if an element is available
            Object found = slot.get();
            if (found != FREE && found != WAITER && slot.compareAndSet(found, FREE)) {
                @SuppressWarnings("unchecked")
                E e = (E) found;
                return e;
            }
        }
        return null;
    }
    /**
     * Waits for (by spinning) to have an element transfered from another thread. A marker is filled into an empty slot in the arena and
     * spun on until it is replaced with an element or a per-slot spin limit is reached. This search and wait strategy is repeated by
     * selecting another slot until a total spin limit is reached.
     *
     * @param start the arena location to start at
     * @return an element if successfully transfered or null if unsuccessful
     */
    E awaitMatch(int start) {
        for (int step = 0, totalSpins = 0; step < ARENA_LENGTH && totalSpins < SPINS; step++) {
            int index = start + step & ARENA_MASK;
            AtomicReference<Object> slot = this.arena[index];
            Object found = slot.get();
            if (found == FREE) {
                if (slot.compareAndSet(FREE, WAITER)) {
                    int slotSpins = 0;
                    for (;;) {
                        found = slot.get();
                        if (found != WAITER && slot.compareAndSet(found, FREE)) {
                            @SuppressWarnings("unchecked")
                            E e = (E) found;
                            return e;
                        } else if (slotSpins >= SPINS_PER_STEP && found == WAITER && slot.compareAndSet(WAITER, FREE)) {
                            // failed to receive an element; try a new slot
                            totalSpins += slotSpins;
                            break;
                        }
                        slotSpins++;
                    }
                }
            } else if (found != WAITER && slot.compareAndSet(found, FREE)) {
                @SuppressWarnings("unchecked")
                E e = (E) found;
                return e;
            }
        }
        // failed to receive an element; give up
        return null;
    }
    /**
     * Returns the start index to begin searching the arena with. Uses a one-step FNV-1a hash code
     * (http://www.isthe.com/chongo/tech/comp/fnv/) based on the current thread's Thread.getId(). These hash codes have more uniform
     * distribution properties with respect to small moduli (here 1-31) than do other simple hashing functions. This technique is a
     * simplified version borrowed from {@link java.util.concurrent.Exchanger}'s hashIndex function.
     */
    static int startIndex() {
        long id = Thread.currentThread().getId();
        return ((int) (id ^ id >>> 32) ^ 0x811c9dc5) * 0x01000193;
    }
    /**
     * An item on the stack. The node is mutable prior to being inserted to avoid object churn and is immutable by the time it has been
     * published to other threads.
     */
    static final class Node<E> extends AtomicReference<E> {
        private static final long serialVersionUID = 1L;
        Node<E> next;
        Node(E value) {
            super(value);
        }
    }
}

View File

@ -0,0 +1,492 @@
/*
* Copyright 2013 Ben Manes. All Rights Reserved.
*
 * Note that we cannot use a Treiber stack, since it suffers from ABA problems if we reuse nodes.
*
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package dorkbox.util.messagebus.common.simpleq.bakup;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import com.lmax.disruptor.MessageHolder;
import dorkbox.util.messagebus.common.simpleq.PaddedAtomicReference;
//http://www.cs.bgu.ac.il/~hendlerd/papers/EfficientCAS.pdf
//This is an implementation of the **BEST** CAS FIFO queue for XEON (INTEL) architecture.
//This is the WRONG approach for running on SPARC.
//More info at: http://java.dzone.com/articles/wanna-get-faster-wait-bit
// Copyright dorkbox, llc
/**
* An unbounded thread-safe stack based on linked nodes. This stack orders elements LIFO (last-in-first-out). The <em>top</em> of the stack
* is that element that has been on the stack the shortest time. New elements are inserted at and retrieved from the top of the stack. A
* {@code EliminationStack} is an appropriate choice when many threads will exchange elements through shared access to a common collection.
* Like most other concurrent collection implementations, this class does not permit the use of {@code null} elements.
* <p>
* This implementation employs elimination to transfer elements between threads that are pushing and popping concurrently. This technique
* avoids contention on the stack by attempting to cancel operations if an immediate update to the stack is not successful. This approach is
* described in <a href="http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.156.8728">A Scalable Lock-free Stack Algorithm</a>.
* <p>
* Iterators are <i>weakly consistent</i>, returning elements reflecting the state of the stack at some point at or since the creation of
* the iterator. They do <em>not</em> throw {@link java.util.ConcurrentModificationException}, and may proceed concurrently with other
* operations. Elements contained in the stack since the creation of the iterator will be returned exactly once.
* <p>
* Beware that, unlike in most collections, the {@code size} method is <em>NOT</em> a constant-time operation. Because of the asynchronous
* nature of these stacks, determining the current number of elements requires a traversal of the elements, and so may report inaccurate
* results if this collection is modified during traversal.
*
* @author ben.manes@gmail.com (Ben Manes)
* @see <a href="https://github.com/ben-manes/caffeine">Caffeine</a>
* @param <E> the type of elements held in this collection
*/
public final class ExchangerStackORG<E extends MessageHolder> {
    static {
        // Prevent rare disastrous classloading in first call to LockSupport.park.
        // See: https://bugs.openjdk.java.net/browse/JDK-8074773
        Class<?> ensureLoaded = LockSupport.class;
        LockSupport.unpark(Thread.currentThread());
    }
    /*
     * A Treiber's stack is represented as a singly-linked list with an atomic top reference and uses compare-and-swap to modify the value
     * atomically.
     *
     * The stack is augmented with an elimination array to minimize the top reference becoming a sequential bottleneck. Elimination allows
     * pairs of operations with reverse semantics, like pushes and pops on a stack, to complete without any central coordination, and
     * therefore substantially aids scalability [1, 2, 3]. If a thread fails to update the stack's top reference then it backs off to a
     * collision arena where a location is chosen at random and it attempts to coordinate with another operation that concurrently chose the
     * same location. If a transfer is not successful then the thread repeats the process until the element is added to the stack or a
     * cancellation occurs.
     *
     * This implementation borrows optimizations from {@link java.util.concurrent.Exchanger} for choosing an arena location and awaiting a
     * match [4].
     *
     * [1] A Scalable Lock-free Stack Algorithm http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.156.8728 [2] Concurrent Data
     * Structures http://www.cs.tau.ac.il/~shanir/concurrent-data-structures.pdf [3] Using elimination to implement scalable and lock-free
     * fifo queues http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.108.6422 [4] A Scalable Elimination-based Exchange Channel
     * http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.59.7396
     */
    /** The number of CPUs */
    private static final int NCPU = Runtime.getRuntime().availableProcessors();
    /** The number of slots in the elimination array. */
    private static final int ARENA_LENGTH = ceilingNextPowerOfTwo((NCPU + 1) / 2);
    /** The mask value for indexing into the arena.
     *  NOTE(review): not declared final, unlike the other constants — confirm this is intentional. */
    private static int ARENA_MASK = ARENA_LENGTH - 1;
    /** The number of times to step ahead, probe, and try to match. */
    private static final int LOOKAHEAD = Math.min(4, NCPU);
    /**
     * The number of times to spin (doing nothing except polling a memory location) before giving up while waiting to eliminate an
     * operation. Should be zero on uniprocessors. On multiprocessors, this value should be large enough so that two threads exchanging
     * items as fast as possible block only when one of them is stalled (due to GC or preemption), but not much longer, to avoid wasting CPU
     * resources. Seen differently, this value is a little over half the number of cycles of an average context switch time on most systems.
     * The value here is approximately the average of those across a range of tested systems.
     */
    private static final int SPINS = NCPU == 1 ? 0 : 2000;
    /** The number of times to spin per lookahead step */
    private static final int SPINS_PER_STEP = SPINS / LOOKAHEAD;
    /** A marker indicating that the arena slot is free. */
    private static final Object FREE = null;
    /** A marker indicating that a thread is waiting in that slot to be transfered an element. */
    private static final Object WAITER = new Object();
    // Park/unpark handshake states stored in NodeORG.state.
    // READY aliases null, the initial value of a fresh AtomicReference.
    // Transitions: READY -> LOCK_PARK_FIRST (parker won) -> UNLOCK (unparker) -> READY,
    // or READY -> LOCK_UNPARK_FIRST (unparker won) -> READY (parker skips parking).
    private static final Object READY = null;
    private static final Object LOCK_PARK_FIRST = new Object();
    private static final Object LOCK_UNPARK_FIRST = new Object();
    private static final Object UNLOCK = new Object();
    /** Rounds x up to the next power of two (returns x if it already is one). */
    private static int ceilingNextPowerOfTwo(int x) {
        // From Hacker's Delight, Chapter 3, Harry S. Warren Jr.
        return 1 << Integer.SIZE - Integer.numberOfLeadingZeros(x - 1);
    }
    /** The top of the stack. */
    private final AtomicReference<NodeORG<E>> top;
    /** The arena where slots can be used to perform an exchange. */
    private final AtomicReference<Object>[] arena;
    // Number of consumer threads expected to wait on this stack; consulted by hasPendingMessages().
    private final int numberConsumerThreads;
    // Copies item payloads between producer and consumer nodes so the nodes themselves can be reused.
    private final ValueCopier<E> copier;
    /** Creates a {@code EliminationStack} that is initially empty. */
    @SuppressWarnings("unchecked")
    public ExchangerStackORG(int numberConsumerThreads, ValueCopier<E> copier) {
        this.numberConsumerThreads = numberConsumerThreads;
        this.copier = copier;
        this.top = new PaddedAtomicReference<NodeORG<E>>();
        this.arena = new PaddedAtomicReference[ARENA_LENGTH];
        for (int i = 0; i < ARENA_LENGTH; i++) {
            this.arena[i] = new PaddedAtomicReference<Object>();
        }
    }
    /**
     * Hands the producer node's item to a consumer. If the top of the stack is a parked
     * consumer, its item fields are filled via the copier and it is woken; otherwise this
     * producer pushes itself onto the stack and parks until a consumer drains it.
     *
     * @param producer the node carrying the item to publish; its waiter is bound to the calling thread here
     * @throws InterruptedException if the calling thread is interrupted while parked
     */
    public void put(NodeORG<E> producer) throws InterruptedException {
        Thread producerThread = Thread.currentThread();
        producer.next = null;
        producer.waiter = producerThread;
        NodeORG<E> topNode;
        for (;;) {
            topNode = this.top.get();
            // it's a [EMPTY or PRODUCER], so we just add ourself as another producer waiting to get popped by a consumer
            if (topNode == null || !topNode.isConsumer) {
                producer.next = topNode;
                // Attempt to push to the stack, backing off to the elimination array if contended
                if (this.top.compareAndSet(topNode, producer)) {
                    // now we wait
                    if (!park(producer, producerThread)) {
                        // have to make sure to pass up exceptions
                        throw new InterruptedException();
                    }
                    return;
                }
                // Contention, so back off to the elimination array
//                busySpin();
                if (tryTransfer(producerThread, producer)) {
                    // means we got a consumer and our data has been transferred to it.
                    return;
                }
            }
            else {
                // if consumer, pop it, xfer our data to it, wake it, done
                if (this.top.compareAndSet(topNode, topNode.next)) {
                    // xfer data
                    this.copier.copyValues(producer.item, topNode.item);
                    unpark(topNode);
                    return;
                } else {
                    // contention, so back off
//                    busySpinPerStep();
                }
            }
        }
    }
    /**
     * Fills the consumer node's item from a producer. If the top of the stack is a parked
     * producer, its payload is copied into this consumer and the producer is woken; otherwise
     * this consumer pushes itself onto the stack and parks until a producer delivers data.
     *
     * @param consumer a consumer node whose waiter was bound at construction (see NodeORG.newConsumer)
     * @throws InterruptedException if the calling thread is interrupted while parked
     */
    public void take(NodeORG<E> consumer) throws InterruptedException {
        // consumers ALWAYS use the same thread for the same node, so setting the waiter again is not necessary
        Thread consumerThread = Thread.currentThread();
        NodeORG<E> topNode;
        for (;;) {
            topNode = this.top.get();
            // it's a [EMPTY or CONSUMER], so we just add ourself as another consumer waiting to get popped by a producer
            if (topNode == null || topNode.isConsumer) {
                consumer.next = topNode;
                // Attempt to push to the stack, backing off to the elimination array if contended
                if (this.top.compareAndSet(topNode, consumer)) {
                    // now we wait
                    if (!park(consumer, consumerThread)) {
                        // have to make sure to pass up exceptions
                        throw new InterruptedException();
                    }
                    return;
                }
                // Contention, so back off to the elimination array
                // NOTE(review): elimination on the consumer side is disabled (WIP) — on CAS failure we simply retry the loop.
//                busySpin();
//                node = tryReceive(consumerThread);
//                if (node != null) {
//                    // we got a PRODUCER. Have to transfer producer data data to myself
//                    this.copier.copyValues(node.item, consumer.item);
//                    return;
//                }
            }
            else {
                // if producer, pop it, xfer it's data to us, wake it, done
                if (this.top.compareAndSet(topNode, topNode.next)) {
                    // forget old head (it's still referenced by top)
                    topNode.next = null;
                    // xfer data
                    this.copier.copyValues(topNode.item, consumer.item);
                    unpark(topNode);
                    return;
                } else {
                    // contention, so back off
//                    busySpinPerStep();
                }
            }
        }
    }
    /**
     * Parks the calling thread on its node until the counterpart calls {@link #unpark}.
     * Handles the race where unpark() runs before park() via the LOCK_UNPARK_FIRST state.
     *
     * @return false if we were interrupted, true if we were unparked by another thread
     */
    private boolean park(NodeORG<E> myNode, Thread myThread) {
        for (;;) {
            if (myNode.state.compareAndSet(READY, LOCK_PARK_FIRST)) {
                do {
                    // park can return for NO REASON. Subsequent loops will hit this if it has not been ACTUALLY unlocked.
                    LockSupport.park();
                    if (myThread.isInterrupted()) {
                        myNode.state.set(READY);
                        return false;
                    }
                } while (myNode.state.get() != UNLOCK);
                myNode.state.set(READY);
                return true;
            } else if (myNode.state.compareAndSet(LOCK_UNPARK_FIRST, READY)) {
                // no parking
                return true;
            }
        }
    }
    /**
     * Unparks the other node (if it was waiting)
     */
    private void unpark(NodeORG<E> otherNode) {
        // busy spin if we are NOT in the READY or LOCK state
        for (;;) {
            if (otherNode.state.compareAndSet(READY, LOCK_UNPARK_FIRST)) {
                // no parking
                return;
            } else if (otherNode.state.compareAndSet(LOCK_PARK_FIRST, UNLOCK)) {
                LockSupport.unpark(otherNode.waiter);
                return;
            }
        }
    }
    // Backoff helper, currently only referenced from commented-out call sites above.
    private void busySpin() {
        // busy spin for the amount of time (roughly) of a CPU context switch
        int spins = SPINS;
        for (;;) {
            if (spins > 0) {
                --spins;
            } else {
                break;
            }
        }
    }
    // Shorter backoff helper, currently only referenced from commented-out call sites above.
    private void busySpinPerStep() {
        // busy spin for the amount of time (roughly) of a CPU context switch
        int spins = SPINS_PER_STEP;
        for (;;) {
            if (spins > 0) {
                --spins;
            } else {
                break;
            }
        }
    }
    /**
     * @return true if all of our consumers are currently waiting for data from producers
     *
     * NOTE(review): the code returns false when the stack is empty or the top is a producer,
     * and otherwise returns {@code size != numberConsumerThreads} — i.e. true when NOT every
     * consumer thread is parked. This disagrees with both the method name and this javadoc;
     * confirm the intended semantics with the caller before relying on it.
     */
    public boolean hasPendingMessages() {
        // count the number of consumers waiting, it should be the same as the number of threads configured
        NodeORG<E> node;
        node = this.top.get();
        if (node == null || !node.isConsumer) {
            return false;
        }
        int size = 0;
        for (node = this.top.get(); node != null; node = node.next) {
            if (node.isConsumer) {
                size++;
            }
        }
        // return true if we have ALL consumers waiting for data
        return size != this.numberConsumerThreads;
    }
    /**
     * Attempts to transfer the element to a waiting consumer.
     *
     * @param e the element to try to exchange
     * @return if the element was successfully transfered
     */
    private boolean tryTransfer(Thread thread, NodeORG<E> e) {
        int start = startIndex(thread);
        return scanAndTransferToWaiter(e, start) || awaitExchange(e, start);
    }
    /**
     * Scans the arena searching for a waiting consumer to exchange with.
     *
     * @param e the element to try to exchange
     * @return if the element was successfully transfered
     */
    private boolean scanAndTransferToWaiter(NodeORG<E> e, int start) {
        for (int i = 0; i < ARENA_LENGTH; i++) {
            int index = start + i & ARENA_MASK;
            AtomicReference<Object> slot = this.arena[index];
            // if some thread is waiting to receive an element then attempt to provide it
            if (slot.get() == WAITER && slot.compareAndSet(WAITER, e)) {
                return true;
            }
        }
        return false;
    }
    /**
     * Waits for (by spinning) to have the element transfered to another thread. The element is filled into an empty slot in the arena and
     * spun on until it is transfered or a per-slot spin limit is reached. This search and wait strategy is repeated by selecting another
     * slot until a total spin limit is reached.
     *
     * @param e the element to transfer
     * @param start the arena location to start at
     * @return if an exchange was completed successfully
     */
    private boolean awaitExchange(NodeORG<E> e, int start) {
        for (int step = 0, totalSpins = 0; step < ARENA_LENGTH && totalSpins < SPINS; step++) {
            int index = start + step & ARENA_MASK;
            AtomicReference<Object> slot = this.arena[index];
            Object found = slot.get();
            if (found == WAITER && slot.compareAndSet(WAITER, e)) {
                return true;
            } else if (found == FREE && slot.compareAndSet(FREE, e)) {
                int slotSpins = 0;
                for (;;) {
                    found = slot.get();
                    if (found != e) {
                        // a consumer replaced our node: transfer complete
                        return true;
                    } else if (slotSpins >= SPINS_PER_STEP && slot.compareAndSet(e, FREE)) {
                        // failed to transfer the element; try a new slot
                        totalSpins += slotSpins;
                        break;
                    }
                    slotSpins++;
                }
            }
        }
        // failed to transfer the element; give up
        return false;
    }
    /**
     * Attempts to receive an element from a waiting producer.
     * NOTE(review): currently unused (its call site in take() is commented out).
     *
     * @return an element if successfully transfered or null if unsuccessful
     */
    private NodeORG<E> tryReceive(Thread thread) {
        int start = startIndex(thread);
        NodeORG<E> e = scanAndMatch(start);
        if (e == null) {
            return awaitMatch(start);
        } else {
            return e;
        }
    }
    /**
     * Scans the arena searching for a waiting producer to transfer from.
     *
     * @param start the arena location to start at
     * @return an element if successfully transfered or null if unsuccessful
     */
    private NodeORG<E> scanAndMatch(int start) {
        for (int i = 0; i < ARENA_LENGTH; i++) {
            int index = start + i & ARENA_MASK;
            AtomicReference<Object> slot = this.arena[index];
            // accept a transfer if an element is available
            Object found = slot.get();
            if (found != FREE && found != WAITER && slot.compareAndSet(found, FREE)) {
                @SuppressWarnings("unchecked")
                NodeORG<E> cast = (NodeORG<E>) found;
                return cast;
            }
        }
        return null;
    }
    /**
     * Waits for (by spinning) to have an element transfered from another thread. A marker is filled into an empty slot in the arena and
     * spun on until it is replaced with an element or a per-slot spin limit is reached. This search and wait strategy is repeated by
     * selecting another slot until a total spin limit is reached.
     *
     * @param start the arena location to start at
     * @return an element if successfully transfered or null if unsuccessful
     */
    private NodeORG<E> awaitMatch(int start) {
        for (int step = 0, totalSpins = 0; step < ARENA_LENGTH && totalSpins < SPINS; step++) {
            int index = start + step & ARENA_MASK;
            AtomicReference<Object> slot = this.arena[index];
            Object found = slot.get();
            if (found == FREE) {
                if (slot.compareAndSet(FREE, WAITER)) {
                    int slotSpins = 0;
                    for (;;) {
                        found = slot.get();
                        if (found != WAITER && slot.compareAndSet(found, FREE)) {
                            @SuppressWarnings("unchecked")
                            NodeORG<E> cast = (NodeORG<E>) found;
                            return cast;
                        } else if (slotSpins >= SPINS_PER_STEP && found == WAITER && slot.compareAndSet(WAITER, FREE)) {
                            // failed to receive an element; try a new slot
                            totalSpins += slotSpins;
                            break;
                        }
                        slotSpins++;
                    }
                }
            } else if (found != WAITER && slot.compareAndSet(found, FREE)) {
                @SuppressWarnings("unchecked")
                NodeORG<E> cast = (NodeORG<E>) found;
                return cast;
            }
        }
        // failed to receive an element; give up
        return null;
    }
    /**
     * Returns the start index to begin searching the arena with. Uses a one-step FNV-1a hash code
     * (http://www.isthe.com/chongo/tech/comp/fnv/) based on the current thread's Thread.getId(). These hash codes have more uniform
     * distribution properties with respect to small moduli (here 1-31) than do other simple hashing functions. This technique is a
     * simplified version borrowed from {@link java.util.concurrent.Exchanger}'s hashIndex function.
     */
    private static int startIndex(Thread thread) {
        long id = thread.getId();
        return ((int) (id ^ id >>> 32) ^ 0x811c9dc5) * 0x01000193;
    }
}

View File

@ -0,0 +1,42 @@
package dorkbox.util.messagebus.common.simpleq.bakup;
import java.util.concurrent.atomic.AtomicReference;
/**
 * A reusable stack node representing either a parked producer (carrying an item)
 * or a parked consumer (waiting for an item). Instances are created via the
 * {@link #newProducer} / {@link #newConsumer} factories.
 */
public class NodeORG<E> {
    // Improve likelihood of isolation on <= 64 byte cache lines
    // (padding around the hot fields below to reduce false sharing).
    public long z0, z1, z2, z3, z4, z5, z6, z7, z8, z9, za, zb, zc, zd, ze;
    /** The item payload exchanged between producer and consumer. */
    public volatile E item;
    /** The Thread waiting to be signaled to wake up*/
    public volatile Thread waiter;
    /** Park/unpark handshake state; null is the initial (READY) state. */
    public AtomicReference<Object> state = new AtomicReference<Object>();
    /** Next node down the stack. */
    public volatile NodeORG<E> next;
    // NOTE(review): ID is never read by this class — confirm callers before removing.
    public volatile short ID = 0;
    /** true if this node represents a consumer, false for a producer. */
    public final boolean isConsumer;
    private NodeORG(boolean isConsumer, E item) {
        this.isConsumer = isConsumer;
        this.item = item;
    }
    // prevent JIT from optimizing away the padding
    // (fixed: the original expression contained a stray "+ +" double operator)
    public final long sum() {
        return this.z0 + this.z1 + this.z2 + this.z3 + this.z4 + this.z5 + this.z6 + this.z7
             + this.z8 + this.z9 + this.za + this.zb + this.zc + this.zd + this.ze;
    }
    /**
     * Creates a producer node carrying {@code item}.
     * The waiter thread is NOT set here because producers vary which node is
     * used on which thread; it is assigned in the put method.
     */
    public static <E> NodeORG<E> newProducer(E item) {
        // producers VARY which node is used on which thread. (so the waiter is set in the put method)
        return new NodeORG<E>(false, item);
    }
    /**
     * Creates a consumer node bound to the calling thread.
     * Consumers always reuse the same node on the same thread, so the waiter
     * can be fixed at construction time.
     */
    public static <E> NodeORG<E> newConsumer(E item) {
        // consumers will always use the SAME node in the SAME thread
        NodeORG<E> node = new NodeORG<E>(true, item);
        node.waiter = Thread.currentThread();
        return node;
    }
}

View File

@ -0,0 +1,315 @@
/*
* Modified by Dorkbox, llc
*
*
* Copyright 2013 Ben Manes. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package dorkbox.util.messagebus.common.simpleq.bakup;
import java.util.concurrent.atomic.AtomicReference;
import dorkbox.util.messagebus.common.simpleq.Node;
import dorkbox.util.messagebus.common.simpleq.PaddedAtomicReference;
/**
* An unbounded thread-safe stack based on linked nodes. This stack orders elements LIFO (last-in-first-out). The <em>top</em> of the stack
* is that element that has been on the stack the shortest time. New elements are inserted at and retrieved from the top of the stack. A
* {@code EliminationStack} is an appropriate choice when many threads will exchange elements through shared access to a common collection.
* Like most other concurrent collection implementations, this class does not permit the use of {@code null} elements.
* <p>
* This implementation employs elimination to transfer elements between threads that are pushing and popping concurrently. This technique
* avoids contention on the stack by attempting to cancel operations if an immediate update to the stack is not successful. This approach is
* described in <a href="http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.156.8728">A Scalable Lock-free Stack Algorithm</a>.
*
* @author ben.manes@gmail.com (Ben Manes)
* @see <a href="https://github.com/ben-manes/caffeine">Caffeine</a>
* @param <E> the type of elements held in this collection
*/
public final class NodePool<E> {
    /*
     * A Treiber's stack is represented as a singly-linked list with an atomic top reference and uses compare-and-swap to modify the value
     * atomically.
     *
     * The stack is augmented with an elimination array to minimize the top reference becoming a sequential bottleneck. Elimination allows
     * pairs of operations with reverse semantics, like pushes and pops on a stack, to complete without any central coordination, and
     * therefore substantially aids scalability [1, 2, 3]. If a thread fails to update the stack's top reference then it backs off to a
     * collision arena where a location is chosen at random and it attempts to coordinate with another operation that concurrently chose the
     * same location. If a transfer is not successful then the thread repeats the process until the element is added to the stack or a
     * cancellation occurs.
     *
     * This implementation borrows optimizations from {@link java.util.concurrent.Exchanger} for choosing an arena location and awaiting a
     * match [4].
     *
     * [1] A Scalable Lock-free Stack Algorithm http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.156.8728 [2] Concurrent Data
     * Structures http://www.cs.tau.ac.il/~shanir/concurrent-data-structures.pdf [3] Using elimination to implement scalable and lock-free
     * fifo queues http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.108.6422 [4] A Scalable Elimination-based Exchange Channel
     * http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.59.7396
     */

    /** The number of CPUs */
    private static final int NCPU = Runtime.getRuntime().availableProcessors();

    /** The number of slots in the elimination array. */
    private static final int ARENA_LENGTH = ceilingNextPowerOfTwo((NCPU + 1) / 2);

    /** The mask value for indexing into the arena. */
    // fixed: was a mutable 'static int' — this is a constant like its siblings
    private static final int ARENA_MASK = ARENA_LENGTH - 1;

    /** The number of times to step ahead, probe, and try to match. */
    private static final int LOOKAHEAD = Math.min(4, NCPU);

    /**
     * The number of times to spin (doing nothing except polling a memory location) before giving up while waiting to eliminate an
     * operation. Should be zero on uniprocessors. On multiprocessors, this value should be large enough so that two threads exchanging
     * items as fast as possible block only when one of them is stalled (due to GC or preemption), but not much longer, to avoid wasting CPU
     * resources. Seen differently, this value is a little over half the number of cycles of an average context switch time on most systems.
     * The value here is approximately the average of those across a range of tested systems.
     */
    private static final int SPINS = NCPU == 1 ? 0 : 2000;

    /** The number of times to spin per lookahead step */
    private static final int SPINS_PER_STEP = SPINS / LOOKAHEAD;

    /** A marker indicating that the arena slot is free. */
    @SuppressWarnings("rawtypes")
    private static final Node FREE = null;

    /** A marker indicating that a thread is waiting in that slot to be transfered an element. */
    @SuppressWarnings("rawtypes")
    private static final Node WAITER = new Node(null);

    private static int ceilingNextPowerOfTwo(int x) {
        // From Hacker's Delight, Chapter 3, Harry S. Warren Jr.
        return 1 << Integer.SIZE - Integer.numberOfLeadingZeros(x - 1);
    }

    /** The top of the stack. */
    private final AtomicReference<NodeORG<E>> top;

    /** The arena where slots can be used to perform an exchange. */
    private final AtomicReference<Object>[] arena;

    /** Creates a {@code EliminationStack} that is initially empty. */
    @SuppressWarnings("unchecked")
    public NodePool() {
        this.top = new PaddedAtomicReference<NodeORG<E>>();
        this.arena = new PaddedAtomicReference[ARENA_LENGTH];
        for (int i = 0; i < ARENA_LENGTH; i++) {
            this.arena[i] = new PaddedAtomicReference<Object>();
        }
    }

    /**
     * Removes and returns the top element or returns <tt>null</tt> if this stack is empty.
     *
     * @return the top of this stack, or <tt>null</tt> if this stack is empty
     */
    public NodeORG<E> pop() {
        NodeORG<E> current;
        for (;;) {
            current = this.top.get();
            if (current == null) {
                return null;
            }
            // Attempt to pop from the stack, backing off to the elimination array if contended
            if (this.top.compareAndSet(current, current.next)) {
                return current;
            }
            current = tryReceive();
            if (current != null) {
                return current;
            }
        }
    }

    /**
     * Pushes an element onto the stack (in other words, adds an element at the top of this stack).
     *
     * @param node the element to push
     */
    public void put(NodeORG<E> node) {
        NodeORG<E> current;
        for (;;) {
            current = this.top.get();
            node.next = current;
            // Attempt to push to the stack, backing off to the elimination array if contended
            if (this.top.compareAndSet(current, node)) {
                return;
            }
            if (tryTransfer(node)) {
                return;
            }
        }
    }

    /**
     * Attempts to transfer the element to a waiting consumer.
     *
     * @param e the element to try to exchange
     * @return if the element was successfully transfered
     */
    boolean tryTransfer(NodeORG<E> e) {
        int start = startIndex();
        return scanAndTransferToWaiter(e, start) || awaitExchange(e, start);
    }

    /**
     * Scans the arena searching for a waiting consumer to exchange with.
     *
     * @param e the element to try to exchange
     * @return if the element was successfully transfered
     */
    boolean scanAndTransferToWaiter(NodeORG<E> e, int start) {
        for (int i = 0; i < ARENA_LENGTH; i++) {
            int index = start + i & ARENA_MASK;
            AtomicReference<Object> slot = this.arena[index];
            // if some thread is waiting to receive an element then attempt to provide it
            if (slot.get() == WAITER && slot.compareAndSet(WAITER, e)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Waits for (by spinning) to have the element transfered to another thread. The element is filled into an empty slot in the arena and
     * spun on until it is transfered or a per-slot spin limit is reached. This search and wait strategy is repeated by selecting another
     * slot until a total spin limit is reached.
     *
     * @param e the element to transfer
     * @param start the arena location to start at
     * @return if an exchange was completed successfully
     */
    boolean awaitExchange(NodeORG<E> e, int start) {
        for (int step = 0, totalSpins = 0; step < ARENA_LENGTH && totalSpins < SPINS; step++) {
            int index = start + step & ARENA_MASK;
            AtomicReference<Object> slot = this.arena[index];

            Object found = slot.get();
            if (found == WAITER && slot.compareAndSet(WAITER, e)) {
                return true;
            } else if (found == FREE && slot.compareAndSet(FREE, e)) {
                int slotSpins = 0;
                for (;;) {
                    found = slot.get();
                    if (found != e) {
                        // slot no longer holds our element: a consumer took it
                        return true;
                    } else if (slotSpins >= SPINS_PER_STEP && slot.compareAndSet(e, FREE)) {
                        // failed to transfer the element; try a new slot
                        totalSpins += slotSpins;
                        break;
                    }
                    slotSpins++;
                }
            }
        }
        // failed to transfer the element; give up
        return false;
    }

    /**
     * Attempts to receive an element from a waiting provider.
     *
     * @return an element if successfully transfered or null if unsuccessful
     */
    NodeORG<E> tryReceive() {
        int start = startIndex();
        NodeORG<E> e = scanAndMatch(start);
        return e == null ? awaitMatch(start) : e;
    }

    /**
     * Scans the arena searching for a waiting producer to transfer from.
     *
     * @param start the arena location to start at
     * @return an element if successfully transfered or null if unsuccessful
     */
    NodeORG<E> scanAndMatch(int start) {
        for (int i = 0; i < ARENA_LENGTH; i++) {
            int index = start + i & ARENA_MASK;
            AtomicReference<Object> slot = this.arena[index];

            // accept a transfer if an element is available
            Object found = slot.get();
            if (found != FREE && found != WAITER && slot.compareAndSet(found, FREE)) {
                @SuppressWarnings("unchecked")
                NodeORG<E> cast = (NodeORG<E>) found;
                return cast;
            }
        }
        return null;
    }

    /**
     * Waits for (by spinning) to have an element transfered from another thread. A marker is filled into an empty slot in the arena and
     * spun on until it is replaced with an element or a per-slot spin limit is reached. This search and wait strategy is repeated by
     * selecting another slot until a total spin limit is reached.
     *
     * @param start the arena location to start at
     * @return an element if successfully transfered or null if unsuccessful
     */
    NodeORG<E> awaitMatch(int start) {
        for (int step = 0, totalSpins = 0; step < ARENA_LENGTH && totalSpins < SPINS; step++) {
            int index = start + step & ARENA_MASK;
            AtomicReference<Object> slot = this.arena[index];

            Object found = slot.get();
            if (found == FREE) {
                if (slot.compareAndSet(FREE, WAITER)) {
                    int slotSpins = 0;
                    for (;;) {
                        found = slot.get();
                        if (found != WAITER && slot.compareAndSet(found, FREE)) {
                            @SuppressWarnings("unchecked")
                            NodeORG<E> cast = (NodeORG<E>) found;
                            return cast;
                        } else if (slotSpins >= SPINS_PER_STEP && found == WAITER && slot.compareAndSet(WAITER, FREE)) {
                            // failed to receive an element; try a new slot
                            totalSpins += slotSpins;
                            break;
                        }
                        slotSpins++;
                    }
                }
            } else if (found != WAITER && slot.compareAndSet(found, FREE)) {
                @SuppressWarnings("unchecked")
                NodeORG<E> cast = (NodeORG<E>) found;
                return cast;
            }
        }
        // failed to receive an element; give up
        return null;
    }

    /**
     * Returns the start index to begin searching the arena with. Uses a one-step FNV-1a hash code
     * (http://www.isthe.com/chongo/tech/comp/fnv/) based on the current thread's Thread.getId(). These hash codes have more uniform
     * distribution properties with respect to small moduli (here 1-31) than do other simple hashing functions. This technique is a
     * simplified version borrowed from {@link java.util.concurrent.Exchanger}'s hashIndex function.
     */
    static int startIndex() {
        long id = Thread.currentThread().getId();
        return ((int) (id ^ id >>> 32) ^ 0x811c9dc5) * 0x01000193;
    }
}

View File

@ -0,0 +1,5 @@
package dorkbox.util.messagebus.common.simpleq.bakup;
/**
 * Copies the state of one message object into another, so that message
 * holder instances can be reused instead of reallocated.
 *
 * @param <T> the message type being copied
 */
public interface ValueCopier<T> {

    /**
     * Copies all relevant values from {@code source} into {@code dest}.
     *
     * @param source the object to read values from
     * @param dest   the object to write values into
     */
    void copyValues(T source, T dest);
}

View File

@ -0,0 +1,185 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.util.messagebus.common.simpleq.jctools;
import static dorkbox.util.messagebus.common.simpleq.jctools.UnsafeAccess.UNSAFE;
import java.util.AbstractQueue;
import java.util.Iterator;
/**
 * Pre-padding for {@link ConcurrentCircularArrayQueue}: 16 long fields (128 bytes)
 * isolate the subclass' hot fields from whatever object happens to be allocated
 * before the queue, reducing false sharing.
 */
abstract class ConcurrentCircularArrayQueueL0Pad<E> extends AbstractQueue<E> implements MessagePassingQueue<E> {
    long p00, p01, p02, p03, p04, p05, p06, p07;
    long p30, p31, p32, p33, p34, p35, p36, p37;
}
/**
 * A concurrent access enabling class used by circular array based queues. This class exposes an offset computation
 * method along with differently memory fenced load/store methods into the underlying array. The class is pre-padded and
 * the array is padded on either side to help with false sharing prevention. It is expected that subclasses handle post
 * padding.
 * <p>
 * Offset calculation is separate from access to enable the reuse of a given computed offset.
 * <p>
 * Load/Store methods using a <i>buffer</i> parameter are provided to allow the prevention of final field reload after a
 * LoadLoad barrier.
 * <p>
 *
 * @author nitsanw
 *
 * @param <E> the type of elements held in the queue
 */
public abstract class ConcurrentCircularArrayQueue<E> extends ConcurrentCircularArrayQueueL0Pad<E> {
    /** Extra spacing (expressed as a shift) between elements; 0 unless set via -Dsparse.shift. */
    protected static final int SPARSE_SHIFT = Integer.getInteger("sparse.shift", 0);
    /** Number of padding slots placed on each end of the element array. */
    protected static final int BUFFER_PAD = 32;
    /** Byte offset of the first usable (post-padding) element in the array. */
    private static final long REF_ARRAY_BASE;
    /** log2 of the byte stride between successive elements (reference size + sparse shift). */
    private static final int REF_ELEMENT_SHIFT;
    static {
        final int scale = UnsafeAccess.UNSAFE.arrayIndexScale(Object[].class);
        if (4 == scale) {
            // 4-byte references (32-bit VM or compressed oops)
            REF_ELEMENT_SHIFT = 2 + SPARSE_SHIFT;
        } else if (8 == scale) {
            // 8-byte references
            REF_ELEMENT_SHIFT = 3 + SPARSE_SHIFT;
        } else {
            throw new IllegalStateException("Unknown pointer size");
        }
        // Including the buffer pad in the array base offset
        REF_ARRAY_BASE = UnsafeAccess.UNSAFE.arrayBaseOffset(Object[].class)
                + (BUFFER_PAD << REF_ELEMENT_SHIFT - SPARSE_SHIFT);
    }
    /** capacity - 1; capacity is always a power of two, so (index & mask) wraps the index. */
    protected final long mask;
    // @Stable :(
    protected final E[] buffer;

    @SuppressWarnings("unchecked")
    public ConcurrentCircularArrayQueue(int capacity) {
        int actualCapacity = Pow2.roundToPowerOfTwo(capacity);
        this.mask = actualCapacity - 1;
        // pad data on either end with some empty slots.
        this.buffer = (E[]) new Object[(actualCapacity << SPARSE_SHIFT) + BUFFER_PAD * 2];
    }

    /**
     * @param index desirable element index
     * @return the offset in bytes within the array for a given index.
     */
    protected final long calcElementOffset(long index) {
        return calcElementOffset(index, this.mask);
    }

    /**
     * @param index desirable element index
     * @param mask the capacity mask used to wrap the index
     * @return the offset in bytes within the array for a given index.
     */
    protected static final long calcElementOffset(long index, long mask) {
        return REF_ARRAY_BASE + ((index & mask) << REF_ELEMENT_SHIFT);
    }

    /**
     * A plain store (no ordering/fences) of an element to a given offset
     *
     * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
     * @param e the element to store
     */
    protected final void spElement(long offset, E e) {
        spElement(this.buffer, offset, e);
    }

    /**
     * A plain store (no ordering/fences) of an element to a given offset
     *
     * @param buffer this.buffer
     * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
     * @param e the element to store
     */
    protected final void spElement(E[] buffer, long offset, E e) {
        UNSAFE.putObject(buffer, offset, e);
    }

    /**
     * An ordered store(store + StoreStore barrier) of an element to a given offset
     *
     * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
     * @param e the element to store
     */
    protected final void soElement(long offset, E e) {
        soElement(this.buffer, offset, e);
    }

    /**
     * An ordered store(store + StoreStore barrier) of an element to a given offset
     *
     * @param buffer this.buffer
     * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
     * @param e the element to store
     */
    protected final void soElement(E[] buffer, long offset, E e) {
        UNSAFE.putOrderedObject(buffer, offset, e);
    }

    /**
     * A plain load (no ordering/fences) of an element from a given offset.
     *
     * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
     * @return the element at the offset
     */
    protected final E lpElement(long offset) {
        return lpElement(this.buffer, offset);
    }

    /**
     * A plain load (no ordering/fences) of an element from a given offset.
     *
     * @param buffer this.buffer
     * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
     * @return the element at the offset
     */
    @SuppressWarnings("unchecked")
    protected final E lpElement(E[] buffer, long offset) {
        return (E) UNSAFE.getObject(buffer, offset);
    }

    /**
     * A volatile load (load + LoadLoad barrier) of an element from a given offset.
     *
     * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
     * @return the element at the offset
     */
    protected final E lvElement(long offset) {
        return lvElement(this.buffer, offset);
    }

    /**
     * A volatile load (load + LoadLoad barrier) of an element from a given offset.
     *
     * @param buffer this.buffer
     * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
     * @return the element at the offset
     */
    @SuppressWarnings("unchecked")
    protected final E lvElement(E[] buffer, long offset) {
        return (E) UNSAFE.getObjectVolatile(buffer, offset);
    }

    /** Iteration is not supported by this queue. */
    @Override
    public Iterator<E> iterator() {
        throw new UnsupportedOperationException();
    }

    @Override
    public void clear() {
        // we have to test isEmpty because of the weaker poll() guarantee
        while (poll() != null || !isEmpty()) {
            ;
        }
    }
}

View File

@ -0,0 +1,57 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.util.messagebus.common.simpleq.jctools;
import static dorkbox.util.messagebus.common.simpleq.jctools.UnsafeAccess.UNSAFE;
/**
 * Extends {@link ConcurrentCircularArrayQueue} with a second, equally padded
 * {@code long[]} of per-slot sequence numbers. Each slot's sequence is
 * initialized to its own index; producers/consumers in subclasses use these
 * values to detect whether a slot is ready for them (Vyukov-style MPMC).
 */
public abstract class ConcurrentSequencedCircularArrayQueue<E> extends ConcurrentCircularArrayQueue<E> {
    /** Byte offset of the first usable (post-padding) sequence slot. */
    private static final long ARRAY_BASE;
    /** log2 of the byte stride between successive sequence values. */
    private static final int ELEMENT_SHIFT;
    static {
        final int scale = UnsafeAccess.UNSAFE.arrayIndexScale(long[].class);
        if (8 == scale) {
            ELEMENT_SHIFT = 3 + SPARSE_SHIFT;
        } else {
            throw new IllegalStateException("Unexpected long[] element size");
        }
        // Including the buffer pad in the array base offset
        ARRAY_BASE = UnsafeAccess.UNSAFE.arrayBaseOffset(long[].class) + (BUFFER_PAD << ELEMENT_SHIFT - SPARSE_SHIFT);
    }
    protected final long[] sequenceBuffer;

    public ConcurrentSequencedCircularArrayQueue(int capacity) {
        super(capacity);
        int actualCapacity = (int) (this.mask + 1);
        // pad data on either end with some empty slots.
        this.sequenceBuffer = new long[(actualCapacity << SPARSE_SHIFT) + BUFFER_PAD * 2];
        // seed each slot's sequence with its own index: "empty, ready for producer i"
        for (long i = 0; i < actualCapacity; i++) {
            soSequence(this.sequenceBuffer, calcSequenceOffset(i), i);
        }
    }

    /**
     * @param index desirable sequence index
     * @return the offset in bytes within the sequence array for a given index.
     */
    protected final long calcSequenceOffset(long index) {
        return calcSequenceOffset(index, this.mask);
    }

    protected static final long calcSequenceOffset(long index, long mask) {
        return ARRAY_BASE + ((index & mask) << ELEMENT_SHIFT);
    }

    /** Ordered store (store + StoreStore barrier) of a sequence value. */
    protected final void soSequence(long[] buffer, long offset, long e) {
        UNSAFE.putOrderedLong(buffer, offset, e);
    }

    /** Volatile load (load + LoadLoad barrier) of a sequence value. */
    protected final long lvSequence(long[] buffer, long offset) {
        return UNSAFE.getLongVolatile(buffer, offset);
    }
}

View File

@ -0,0 +1,71 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.util.messagebus.common.simpleq.jctools;
import java.util.Queue;
/**
* This is a tagging interface for the queues in this library which implement a subset of the {@link Queue} interface
* sufficient for concurrent message passing.<br>
* Message passing queues offer happens before semantics to messages passed through, namely that writes made by the
* producer before offering the message are visible to the consuming thread after the message has been polled out of the
* queue.
*
* @author nitsanw
*
* @param <M> the event/message type
*/
/**
 * This is a tagging interface for the queues in this library which implement a subset of the {@link Queue} interface
 * sufficient for concurrent message passing.<br>
 * Message passing queues offer happens before semantics to messages passed through, namely that writes made by the
 * producer before offering the message are visible to the consuming thread after the message has been polled out of the
 * queue.
 * <p>
 * Note: package-private on purpose; implementations are exposed as {@link Queue}s.
 *
 * @author nitsanw
 *
 * @param <M> the event/message type
 */
interface MessagePassingQueue<M> {

    /**
     * Called from a producer thread subject to the restrictions appropriate to the implementation and according to the
     * {@link Queue#offer(Object)} interface.
     *
     * @param message the message to insert; implementations do not accept null
     * @return true if element was inserted into the queue, false iff full
     */
    boolean offer(M message);

    /**
     * Called from the consumer thread subject to the restrictions appropriate to the implementation and according to
     * the {@link Queue#poll()} interface.
     *
     * @return a message from the queue if one is available, null iff empty
     */
    M poll();

    /**
     * Called from the consumer thread subject to the restrictions appropriate to the implementation and according to
     * the {@link Queue#peek()} interface.
     *
     * @return a message from the queue if one is available, null iff empty
     */
    M peek();

    /**
     * This method's accuracy is subject to concurrent modifications happening as the size is estimated and as such is a
     * best effort rather than absolute value. For some implementations this method may be O(n) rather than O(1).
     *
     * @return number of messages in the queue, between 0 and queue capacity or {@link Integer#MAX_VALUE} if not bounded
     */
    int size();

    /**
     * This method's accuracy is subject to concurrent modifications happening as the observation is carried out.
     *
     * @return true if empty, false otherwise
     */
    boolean isEmpty();
}

View File

@ -0,0 +1,216 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.util.messagebus.common.simpleq.jctools;
/**
* A Multi-Producer-Multi-Consumer queue based on a {@link ConcurrentCircularArrayQueue}. This implies that
* any and all threads may call the offer/poll/peek methods and correctness is maintained. <br>
* This implementation follows patterns documented on the package level for False Sharing protection.<br>
* The algorithm for offer/poll is an adaptation of the one put forward by D. Vyukov (See <a
* href="http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue">here</a>). The original
* algorithm uses an array of structs which should offer nice locality properties but is sadly not possible in
* Java (waiting on Value Types or similar). The alternative explored here utilizes 2 arrays, one for each
* field of the struct. There is a further alternative in the experimental project which uses iteration phase
* markers to achieve the same algo and is closer structurally to the original, but sadly does not perform as
* well as this implementation.<br>
* Tradeoffs to keep in mind:
* <ol>
* <li>Padding for false sharing: counter fields and queue fields are all padded as well as either side of
* both arrays. We are trading memory to avoid false sharing(active and passive).
* <li>2 arrays instead of one: The algorithm requires an extra array of longs matching the size of the
* elements array. This is doubling/tripling the memory allocated for the buffer.
* <li>Power of 2 capacity: Actual elements buffer (and sequence buffer) is the closest power of 2 larger or
* equal to the requested capacity.
* </ol>
*
* @param <E>
* type of the element stored in the {@link java.util.Queue}
*/
public class MpmcArrayQueue<E> extends MpmcArrayQueueConsumerField<E> {
    /** The number of CPUs */
    private static final int NCPU = Runtime.getRuntime().availableProcessors();

    /**
     * The number of times to spin (doing nothing except polling a memory location) before giving up while waiting to eliminate an
     * operation. Should be zero on uniprocessors. On multiprocessors, this value should be large enough so that two threads exchanging
     * items as fast as possible block only when one of them is stalled (due to GC or preemption), but not much longer, to avoid wasting CPU
     * resources. Seen differently, this value is a little over half the number of cycles of an average context switch time on most systems.
     * The value here is approximately the average of those across a range of tested systems.
     */
    private static final int SPINS = NCPU == 1 ? 0 : 600;

    // post-padding: isolates the superclass' index fields from whatever is allocated after this object
    long p40, p41, p42, p43, p44, p45, p46;
    long p30, p31, p32, p33, p34, p35, p36, p37;

    public MpmcArrayQueue(final int capacity) {
        super(validateCapacity(capacity));
    }

    // Capacity 1 would make producer and consumer sequences collide; reject it.
    private static int validateCapacity(int capacity) {
        if(capacity < 2) {
            throw new IllegalArgumentException("Minimum size is 2");
        }
        return capacity;
    }

    /**
     * {@inheritDoc}
     * <p>
     * NOTE(review): the usual "queue full" bail-out ({@code return false}) is commented out below and replaced with
     * {@link #busySpin()}, so this method as written never returns false — it spins until space frees up. That
     * deviates from the {@link java.util.Queue#offer} / MessagePassingQueue contract ("false iff full"); confirm this
     * blocking behavior is intended (WIP).
     */
    @Override
    public boolean offer(final E e) {
        // local load of field to avoid repeated loads after volatile reads
        final long mask = this.mask;
        final long capacity = mask + 1;
        final long[] sBuffer = this.sequenceBuffer;

        long currentProducerIndex;
        long pSeqOffset;
        long cIndex = Long.MAX_VALUE;// start with bogus value, hope we don't need it

        while (true) {
            currentProducerIndex = lvProducerIndex(); // LoadLoad
            pSeqOffset = calcSequenceOffset(currentProducerIndex, mask);
            final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad
            final long delta = seq - currentProducerIndex;

            if (delta == 0) {
                // this is expected if we see this first time around
                if (casProducerIndex(currentProducerIndex, currentProducerIndex + 1)) {
                    // Successful CAS: full barrier
                    // on 64bit(no compressed oops) JVM this is the same as seqOffset
                    final long offset = calcElementOffset(currentProducerIndex, mask);
                    spElement(offset, e);

                    // increment sequence by 1, the value expected by consumer
                    // (seeing this value from a producer will lead to retry 2)
                    soSequence(sBuffer, pSeqOffset, currentProducerIndex + 1); // StoreStore
                    return true;
                }
                // failed cas, retry 1
            } else if (delta < 0 && // poll has not moved this value forward
                    currentProducerIndex - capacity <= cIndex && // test against cached cIndex
                    currentProducerIndex - capacity <= (cIndex = lvConsumerIndex())) { // test against latest cIndex
                // Extra check required to ensure [Queue.offer == false iff queue is full]
                // return false;
                busySpin();
            }
            // another producer has moved the sequence by one, retry 2
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * Because return null indicates queue is empty we cannot simply rely on next element visibility for poll
     * and must test producer index when next element is not visible.
     * <p>
     * NOTE(review): the "queue empty" bail-out ({@code return null}) is commented out below and replaced with
     * {@link #busySpin()}, so this method never returns null — on an empty queue it spins indefinitely, burning CPU.
     * Confirm this blocking behavior is intended (WIP).
     */
    @Override
    public E poll() {
        // local load of field to avoid repeated loads after volatile reads
        final long mask = this.mask;
        final long[] sBuffer = this.sequenceBuffer;

        long currentConsumerIndex;
        long cSeqOffset;
        long pIndex = -1; // start with bogus value, hope we don't need it

        while (true) {
            currentConsumerIndex = lvConsumerIndex(); // LoadLoad
            cSeqOffset = calcSequenceOffset(currentConsumerIndex, mask);
            final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad
            final long delta = seq - (currentConsumerIndex + 1);

            if (delta == 0) {
                if (casConsumerIndex(currentConsumerIndex, currentConsumerIndex + 1)) {
                    // Successful CAS: full barrier
                    // on 64bit(no compressed oops) JVM this is the same as seqOffset
                    final long offset = calcElementOffset(currentConsumerIndex, mask);
                    final E e = lpElement(offset);
                    spElement(offset, null);

                    // Move sequence ahead by capacity, preparing it for next offer
                    // (seeing this value from a consumer will lead to retry 2)
                    soSequence(sBuffer, cSeqOffset, currentConsumerIndex + mask + 1);// StoreStore
                    return e;
                }
                // failed cas, retry 1
            } else if (delta < 0 && // slot has not been moved by producer
                    currentConsumerIndex >= pIndex && // test against cached pIndex
                    currentConsumerIndex == (pIndex = lvProducerIndex())) { // update pIndex if we must
                // strict empty check, this ensures [Queue.poll() == null iff isEmpty()]
                // return null;
                busySpin();
            }
            // another consumer beat us and moved sequence ahead, retry 2
        }
    }

    @Override
    public E peek() {
        long currConsumerIndex;
        E e;
        do {
            currConsumerIndex = lvConsumerIndex();
            // other consumers may have grabbed the element, or queue might be empty
            e = lpElement(calcElementOffset(currConsumerIndex));
            // only return null if queue is empty
        } while (e == null && currConsumerIndex != lvProducerIndex());
        return e;
    }

    @Override
    public int size() {
        /*
         * It is possible for a thread to be interrupted or reschedule between the read of the producer and
         * consumer indices, therefore protection is required to ensure size is within valid range. In the
         * event of concurrent polls/offers to this method the size is OVER estimated as we read consumer
         * index BEFORE the producer index.
         */
        long after = lvConsumerIndex();
        while (true) {
            final long before = after;
            final long currentProducerIndex = lvProducerIndex();
            after = lvConsumerIndex();
            if (before == after) {
                return (int) (currentProducerIndex - after);
            }
        }
    }

    @Override
    public boolean isEmpty() {
        // Order matters!
        // Loading consumer before producer allows for producer increments after consumer index is read.
        // This ensures this method is conservative in it's estimate. Note that as this is an MPMC there is
        // nothing we can do to make this an exact method.
        return lvConsumerIndex() == lvProducerIndex();
    }

    /**
     * Busy spins for (roughly) the cost of a CPU context switch before retrying.
     * NOTE(review): the countdown body is empty — a JIT may collapse the loop entirely;
     * consider a spin hint (e.g. Thread.yield between batches) — confirm intent.
     */
    private static final void busySpin() {
        // busy spin for the amount of time (roughly) of a CPU context switch
        int spins = SPINS;
        for (;;) {
            if (spins > 0) {
                --spins;
            } else {
                break;
            }
        }
    }
}

View File

@ -0,0 +1,27 @@
package dorkbox.util.messagebus.common.simpleq.jctools;
import static dorkbox.util.messagebus.common.simpleq.jctools.UnsafeAccess.UNSAFE;
/**
 * Holds the consumer index on its own (padded) cache line and exposes the
 * volatile load / CAS operations the queue uses on it via {@code Unsafe}.
 */
public abstract class MpmcArrayQueueConsumerField<E> extends MpmcArrayQueueL2Pad<E> {
    // Field offset of 'consumerIndex' for Unsafe-based CAS; the reflective
    // lookup string must match the field name below exactly.
    private final static long C_INDEX_OFFSET;
    static {
        try {
            C_INDEX_OFFSET = UNSAFE.objectFieldOffset(MpmcArrayQueueConsumerField.class.getDeclaredField("consumerIndex"));
        } catch (NoSuchFieldException e) {
            throw new RuntimeException(e);
        }
    }
    private volatile long consumerIndex;

    public MpmcArrayQueueConsumerField(int capacity) {
        super(capacity);
    }

    /** Volatile load (load + LoadLoad barrier) of the consumer index. */
    protected final long lvConsumerIndex() {
        return this.consumerIndex;
    }

    /** CAS (full barrier on success) of the consumer index. */
    protected final boolean casConsumerIndex(long expect, long newValue) {
        return UNSAFE.compareAndSwapLong(this, C_INDEX_OFFSET, expect, newValue);
    }
}

View File

@ -0,0 +1,10 @@
package dorkbox.util.messagebus.common.simpleq.jctools;
/**
 * Padding between the sequence/element buffers and the producer index field,
 * preventing false sharing between them.
 */
abstract class MpmcArrayQueueL1Pad<E> extends ConcurrentSequencedCircularArrayQueue<E> {
    long p10, p11, p12, p13, p14, p15, p16;
    long p30, p31, p32, p33, p34, p35, p36, p37;

    public MpmcArrayQueueL1Pad(int capacity) {
        super(capacity);
    }
}

View File

@ -0,0 +1,10 @@
package dorkbox.util.messagebus.common.simpleq.jctools;
/**
 * Padding between the producer index field and the consumer index field, so
 * producers and consumers do not contend on the same cache line.
 */
abstract class MpmcArrayQueueL2Pad<E> extends MpmcArrayQueueProducerField<E> {
    long p20, p21, p22, p23, p24, p25, p26;
    long p30, p31, p32, p33, p34, p35, p36, p37;

    public MpmcArrayQueueL2Pad(int capacity) {
        super(capacity);
    }
}

View File

@ -0,0 +1,28 @@
package dorkbox.util.messagebus.common.simpleq.jctools;
import static dorkbox.util.messagebus.common.simpleq.jctools.UnsafeAccess.UNSAFE;
/**
 * Holds the producer index on its own (padded) cache line and exposes the
 * volatile load / CAS operations the queue uses on it via {@code Unsafe}.
 */
abstract class MpmcArrayQueueProducerField<E> extends MpmcArrayQueueL1Pad<E> {
    // Field offset of 'producerIndex' for Unsafe-based CAS; the reflective
    // lookup string must match the field name below exactly.
    private final static long P_INDEX_OFFSET;
    static {
        try {
            P_INDEX_OFFSET = UNSAFE.objectFieldOffset(MpmcArrayQueueProducerField.class
                    .getDeclaredField("producerIndex"));
        } catch (NoSuchFieldException e) {
            throw new RuntimeException(e);
        }
    }
    private volatile long producerIndex;

    public MpmcArrayQueueProducerField(int capacity) {
        super(capacity);
    }

    /** Volatile load (load + LoadLoad barrier) of the producer index. */
    protected final long lvProducerIndex() {
        return this.producerIndex;
    }

    /** CAS (full barrier on success) of the producer index. */
    protected final boolean casProducerIndex(long expect, long newValue) {
        return UNSAFE.compareAndSwapLong(this, P_INDEX_OFFSET, expect, newValue);
    }
}

View File

@ -0,0 +1,43 @@
/*
* https://github.com/JCTools/JCTools
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.util.messagebus.common.simpleq.jctools;
/**
* Power of 2 utility functions.
*/
/**
 * Power of 2 utility functions.
 */
public class Pow2 {

    /**
     * Find the next larger positive power of two value up from the given value. If value is a power of two then
     * this value will be returned.
     *
     * @param value from which next positive power of two will be found.
     * @return the next positive power of 2 or this value if it is a power of 2.
     */
    public static int roundToPowerOfTwo(final int value) {
        // Subtracting 1 first makes an exact power of two round to itself;
        // shifting 1 by (32 - leadingZeros) lands on the next power of two otherwise.
        return 1 << (Integer.SIZE - Integer.numberOfLeadingZeros(value - 1));
    }

    /**
     * Is this value a power of two.
     *
     * @param value to be tested to see if it is a power of two.
     * @return true if the value is a power of 2 otherwise false.
     */
    public static boolean isPowerOfTwo(final int value) {
        // a power of two has exactly one bit set, so value & (value - 1) is zero
        return (value & (value - 1)) == 0;
    }
}

View File

@ -0,0 +1,58 @@
/*
* https://github.com/JCTools/JCTools
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.util.messagebus.common.simpleq.jctools;
import java.lang.reflect.Field;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import sun.misc.Unsafe;
/**
* Why should we resort to using Unsafe?<br>
* <ol>
* <li>To construct class fields which allow volatile/ordered/plain access: This requirement is covered by
* {@link AtomicReferenceFieldUpdater} and similar but their performance is arguably worse than the DIY approach
* (depending on JVM version) while Unsafe intrinsification is a far lesser challenge for JIT compilers.
* <li>To construct flavors of {@link AtomicReferenceArray}.
* <li>Other use cases exist but are not present in this library yet.
* </ol>
*
* @author nitsanw
*
*/
public class UnsafeAccess {
    // true when Unsafe.getAndSetObject exists (JDK 8+); probed reflectively below
    public static final boolean SUPPORTS_GET_AND_SET;
    public static final Unsafe UNSAFE;
    static {
        try {
            // obtain the Unsafe singleton reflectively (we are not on the boot classpath)
            final Field field = Unsafe.class.getDeclaredField("theUnsafe");
            field.setAccessible(true);
            UNSAFE = (Unsafe) field.get(null);
        } catch (Exception e) {
            // must assign the final before throwing so definite assignment holds on this path
            SUPPORTS_GET_AND_SET = false;
            throw new RuntimeException(e);
        }
        boolean getAndSetSupport = false;
        try {
            Unsafe.class.getMethod("getAndSetObject", Object.class, Long.TYPE, Object.class);
            getAndSetSupport = true;
        } catch (Exception e) {
            // method absent on older JDKs; deliberately swallowed, flag stays false
        }
        SUPPORTS_GET_AND_SET = getAndSetSupport;
    }
}

View File

@ -1,7 +1,5 @@
package dorkbox.util.messagebus.listener;
import java.util.Collection;
import dorkbox.util.messagebus.common.StrongConcurrentSetV8;
/**
@ -20,7 +18,7 @@ import dorkbox.util.messagebus.common.StrongConcurrentSetV8;
*/
public class MessageListener {
private final Collection<MessageHandler> handlers;
private final StrongConcurrentSetV8<MessageHandler> handlers;
private Class<?> listenerDefinition;
public MessageListener(Class<?> listenerDefinition, int size) {
@ -37,7 +35,7 @@ public class MessageListener {
return this.handlers.add(messageHandler);
}
public Collection<MessageHandler> getHandlers() {
public StrongConcurrentSetV8<MessageHandler> getHandlers() {
return this.handlers;
}
}

View File

@ -1,9 +1,9 @@
package dorkbox.util.messagebus.listener;
import java.lang.reflect.Method;
import java.util.Collection;
import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.common.ISetEntry;
import dorkbox.util.messagebus.common.ReflectionUtils;
import dorkbox.util.messagebus.common.StrongConcurrentSetV8;
@ -12,6 +12,8 @@ import dorkbox.util.messagebus.common.StrongConcurrentSetV8;
*
* @author bennidi
* Date: 11/16/12
* @author dorkbox
* Date: 2/2/15
*/
public class MetadataReader {
@ -20,11 +22,18 @@ public class MetadataReader {
public MessageListener getMessageListener(Class<?> target) {
// get all handlers (this will include all (inherited) methods directly annotated using @Handler)
Collection<Method> allHandlers = ReflectionUtils.getMethods(target);
StrongConcurrentSetV8<Method> allHandlers = ReflectionUtils.getMethods(target);
// retain only those that are at the bottom of their respective class hierarchy (deepest overriding method)
Collection<Method> bottomMostHandlers = new StrongConcurrentSetV8<Method>(allHandlers.size(), 0.8F, 1);
for (Method handler : allHandlers) {
StrongConcurrentSetV8<Method> bottomMostHandlers = new StrongConcurrentSetV8<Method>(allHandlers.size(), 0.8F, 1);
ISetEntry<Method> current = allHandlers.head;
Method handler;
while (current != null) {
handler = current.getValue();
current = current.next();
if (!ReflectionUtils.containsOverridingMethod(allHandlers, handler)) {
bottomMostHandlers.add(handler);
}
@ -34,7 +43,12 @@ public class MetadataReader {
// for each handler there will be no overriding method that specifies @Handler annotation
// but an overriding method does inherit the listener configuration of the overwritten method
for (Method handler : bottomMostHandlers) {
current = bottomMostHandlers.head;
while (current != null) {
handler = current.getValue();
current = current.next();
Handler handlerConfig = ReflectionUtils.getAnnotation(handler, Handler.class);
if (handlerConfig == null || !handlerConfig.enabled()) {
continue; // disabled or invalid listeners are ignored

View File

@ -1,11 +1,11 @@
package dorkbox.util.messagebus.subscription;
import java.lang.reflect.InvocationTargetException;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
import com.esotericsoftware.reflectasm.MethodAccess;
import dorkbox.util.messagebus.common.ISetEntry;
import dorkbox.util.messagebus.common.StrongConcurrentSet;
import dorkbox.util.messagebus.dispatch.IHandlerInvocation;
import dorkbox.util.messagebus.dispatch.ReflectiveHandlerInvocation;
@ -96,14 +96,20 @@ public class Subscription {
* @return true if there were listeners for this publication, false if there was nothing
*/
public boolean publishToSubscription(ErrorHandlingSupport errorHandler, Object message) {
Collection<Object> listeners = this.listeners;
StrongConcurrentSet<Object> listeners = this.listeners;
if (!listeners.isEmpty()) {
MethodAccess handler = this.handlerMetadata.getHandler();
int handleIndex = this.handlerMetadata.getMethodIndex();
IHandlerInvocation invocation = this.invocation;
for (Object listener : listeners) {
ISetEntry<Object> current = listeners.head;
Object listener;
while (current != null) {
listener = current.getValue();
current = current.next();
try {
invocation.invoke(listener, handler, handleIndex, message);
} catch (IllegalAccessException e) {
@ -150,14 +156,20 @@ public class Subscription {
* @return true if there were listeners for this publication, false if there was nothing
*/
public boolean publishToSubscription(ErrorHandlingSupport errorHandler, Object message1, Object message2) {
Collection<Object> listeners = this.listeners;
StrongConcurrentSet<Object> listeners = this.listeners;
if (!listeners.isEmpty()) {
MethodAccess handler = this.handlerMetadata.getHandler();
int handleIndex = this.handlerMetadata.getMethodIndex();
IHandlerInvocation invocation = this.invocation;
for (Object listener : listeners) {
ISetEntry<Object> current = listeners.head;
Object listener;
while (current != null) {
listener = current.getValue();
current = current.next();
try {
invocation.invoke(listener, handler, handleIndex, message1, message2);
} catch (IllegalAccessException e) {
@ -208,14 +220,20 @@ public class Subscription {
* @return true if there were listeners for this publication, false if there was nothing
*/
public boolean publishToSubscription(ErrorHandlingSupport errorHandler, Object message1, Object message2, Object message3) {
Collection<Object> listeners = this.listeners;
StrongConcurrentSet<Object> listeners = this.listeners;
if (!listeners.isEmpty()) {
MethodAccess handler = this.handlerMetadata.getHandler();
int handleIndex = this.handlerMetadata.getMethodIndex();
IHandlerInvocation invocation = this.invocation;
for (Object listener : listeners) {
ISetEntry<Object> current = listeners.head;
Object listener;
while (current != null) {
listener = current.getValue();
current = current.next();
try {
invocation.invoke(listener, handler, handleIndex, message1, message2, message3);
} catch (IllegalAccessException e) {

View File

@ -0,0 +1,104 @@
/*
* Copyright 2012 Real Logic Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.util.messagebus;
import dorkbox.util.messagebus.common.simpleq.HandlerFactory;
import dorkbox.util.messagebus.common.simpleq.MpmcExchangerQueueAlt;
import dorkbox.util.messagebus.common.simpleq.Node;
/**
 * Throughput benchmark for {@code MpmcExchangerQueueAlt}: one producer thread
 * calls {@code put()} REPETITIONS times while the main thread calls
 * {@code take()}, and ops/sec is reported per run. 20 runs are executed and
 * only the last 10 are averaged for the summary line (the first 10 warm up
 * the JIT).
 */
public class MpmcQueueAltPerfTest {
    // default: 50 * 1000 * 1000 operations per run; override with -Dreps=N (N in millions)
    public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 1000;
    public static final Integer TEST_VALUE = Integer.valueOf(777);

    // queue capacity, always a power of two; override the exponent with -Dpow2.capacity=N
    public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17);

    public static void main(final String[] args) throws Exception {
        System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS);

        // every node handed out by the queue is pre-filled with the same boxed value
        HandlerFactory<Integer> factory = new HandlerFactory<Integer>() {
            @Override
            public Integer newInstance() {
                return Integer.valueOf(777);
            }
        };

        final MpmcExchangerQueueAlt<Integer> queue = new MpmcExchangerQueueAlt<Integer>(factory, QUEUE_CAPACITY);

        final long[] results = new long[20];
        for (int i = 0; i < 20; i++) {
            System.gc();
            results[i] = performanceRun(i, queue);
        }
        // only average last 10 results for summary
        long sum = 0;
        for (int i = 10; i < 20; i++) {
            sum += results[i];
        }
        System.out.format("summary,QueuePerfTest,%s,%d\n", queue.getClass().getSimpleName(), sum / 10);
    }

    /**
     * Runs one producer/consumer pass over the queue.
     *
     * @param runNumber index of this run, used only for log output
     * @param queue the shared queue under test
     * @return operations per second achieved by this run
     */
    private static long performanceRun(int runNumber, MpmcExchangerQueueAlt<Integer> queue) throws Exception {
        Producer p = new Producer(queue);
        Thread thread = new Thread(p);
        thread.start(); // producer will timestamp start

        MpmcExchangerQueueAlt<Integer> consumer = queue;
        Node<Integer> result;
        int i = REPETITIONS;
        // take() blocks instead of failing, so this counter is never incremented;
        // it is printed anyway for output parity with the non-blocking queue benchmarks
        int queueEmpty = 0;
        do {
            result = consumer.take();
        } while (0 != --i);
        long end = System.nanoTime();

        thread.join();
        long duration = end - p.start;
        long ops = REPETITIONS * 1000L * 1000L * 1000L / duration;
        String qName = queue.getClass().getSimpleName();
        Integer finalMessage = result.item;
        System.out.format("%d - ops/sec=%,d - %s result=%d failed.poll=%d failed.offer=%d\n", runNumber, ops,
                qName, finalMessage, queueEmpty, p.queueFull);
        return ops;
    }

    /** Producer side: puts REPETITIONS items and records its start timestamp. */
    public static class Producer implements Runnable {
        private final MpmcExchangerQueueAlt<Integer> queue;
        // put() blocks instead of failing, so this stays 0; kept for output parity
        int queueFull = 0;
        long start;

        public Producer(MpmcExchangerQueueAlt<Integer> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            MpmcExchangerQueueAlt<Integer> producer = this.queue;
            int i = REPETITIONS;
            int f = 0;
            long s = System.nanoTime();

            do {
                producer.put();
            } while (0 != --i);

            this.queueFull = f;
            this.start = s;
        }
    }
}

View File

@ -0,0 +1,96 @@
/*
* Copyright 2012 Real Logic Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.util.messagebus;
import dorkbox.util.messagebus.common.simpleq.Node;
import dorkbox.util.messagebus.common.simpleq.jctools.MpmcArrayQueue;
/**
 * Baseline throughput benchmark for the stock JCTools {@code MpmcArrayQueue}:
 * a single producer offers a shared pre-allocated node while the main thread
 * polls. Ops/sec is reported per run; 20 runs are executed and only the last
 * 10 are averaged for the summary line (the first 10 warm up the JIT).
 */
public class MpmcQueueBaselinePerfTest {
    // default: 50 * 1000 * 1000 operations per run; override with -Dreps=N (N in millions)
    public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 1000;
    public static final Node<Integer> TEST_VALUE = new Node<Integer>(Integer.valueOf(777));

    // queue capacity, always a power of two; override the exponent with -Dpow2.capacity=N
    public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17);

    public static void main(final String[] args) throws Exception {
        System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS);
        final MpmcArrayQueue<Node<Integer>> queue = new MpmcArrayQueue<Node<Integer>>(QUEUE_CAPACITY);

        final long[] runOps = new long[20];
        for (int run = 0; run < 20; run++) {
            System.gc();
            runOps[run] = performanceRun(run, queue);
        }

        // average only the last 10 runs for the summary
        long total = 0;
        for (int run = 10; run < 20; run++) {
            total += runOps[run];
        }
        System.out.format("summary,QueuePerfTest,%s,%d\n", queue.getClass().getSimpleName(), total / 10);
    }

    /**
     * Runs one producer/consumer pass and returns the achieved ops/sec.
     */
    private static long performanceRun(int runNumber, MpmcArrayQueue<Node<Integer>> queue) throws Exception {
        Producer producerTask = new Producer(queue);
        Thread producerThread = new Thread(producerTask);
        producerThread.start(); // producer will timestamp start

        MpmcArrayQueue<Node<Integer>> consumer = queue;
        Node<Integer> result = null;
        int emptyPolls = 0;
        for (int remaining = REPETITIONS; remaining != 0; remaining--) {
            // spin-yield until an element is available
            while ((result = consumer.poll()) == null) {
                emptyPolls++;
                Thread.yield();
            }
        }
        long end = System.nanoTime();

        producerThread.join();
        long duration = end - producerTask.start;
        long ops = REPETITIONS * 1000L * 1000L * 1000L / duration;
        String qName = queue.getClass().getSimpleName();
        System.out.format("%d - ops/sec=%,d - %s result=%d failed.poll=%d failed.offer=%d\n", runNumber, ops,
                qName, result.item, emptyPolls, producerTask.queueFull);
        return ops;
    }

    /** Offers TEST_VALUE REPETITIONS times, yielding whenever the queue is full. */
    public static class Producer implements Runnable {
        private final MpmcArrayQueue<Node<Integer>> queue;
        int queueFull = 0;
        long start;

        public Producer(MpmcArrayQueue<Node<Integer>> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            MpmcArrayQueue<Node<Integer>> target = this.queue;
            int rejectedOffers = 0;
            long startedAt = System.nanoTime();

            for (int remaining = REPETITIONS; remaining != 0; remaining--) {
                while (!target.offer(TEST_VALUE)) {
                    Thread.yield();
                    rejectedOffers++;
                }
            }

            this.queueFull = rejectedOffers;
            this.start = startedAt;
        }
    }
}

View File

@ -4,7 +4,11 @@
package dorkbox.util.messagebus;
import junit.framework.Assert;
import dorkbox.util.messagebus.annotations.Handler;
import com.lmax.disruptor.MessageHolder;
import dorkbox.util.messagebus.common.simpleq.HandlerFactory;
import dorkbox.util.messagebus.common.simpleq.SimpleQueue;
import dorkbox.util.messagebus.error.IPublicationErrorHandler;
import dorkbox.util.messagebus.error.PublicationError;
@ -12,6 +16,13 @@ import dorkbox.util.messagebus.error.PublicationError;
* @author dorkbox, llc Date: 2/2/15
*/
public class PerformanceTest {
// 15 == 32 * 1024
public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 1000;
public static final Integer TEST_VALUE = Integer.valueOf(777);
public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17);
public static final int CONCURRENCY_LEVEL = 1;
private static long count = 0;
@ -23,81 +34,118 @@ public class PerformanceTest {
}
};
public static void main(String[] args) {
PerformanceTest multiMessageTest = new PerformanceTest();
multiMessageTest.testMultiMessageSending();
public static void main(String[] args) throws Exception {
System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS);
HandlerFactory<MessageHolder> factory = new HandlerFactory<MessageHolder>() {
@Override
public MessageHolder newInstance() {
return new MessageHolder();
}
};
final SimpleQueue<MessageHolder> queue = new SimpleQueue<MessageHolder>(QUEUE_CAPACITY, factory);
final long[] results = new long[20];
for (int i = 0; i < 20; i++) {
System.gc();
results[i] = performanceRun(i, queue);
}
// only average last 10 results for summary
long sum = 0;
for (int i = 10; i < 20; i++) {
sum += results[i];
}
System.out.format("summary,QueuePerfTest,%s,%d\n", queue.getClass().getSimpleName(), sum / 10);
}
private static long performanceRun(int runNumber, SimpleQueue<MessageHolder> queue) throws Exception {
// for (int i=0;i<CONCURRENCY_LEVEL;i++) {
Producer p = new Producer(queue);
Thread thread = new Thread(p);
thread.start(); // producer will timestamp start
// }
SimpleQueue<MessageHolder> consumer = queue;
// Node<Integer> result;
int i = REPETITIONS;
int queueEmpty = 0;
MessageHolder messageHolder = new MessageHolder();
do {
consumer.take(messageHolder);
} while (0 != --i);
long end = System.nanoTime();
thread.join();
long duration = end - p.start;
long ops = REPETITIONS * 1000L * 1000L * 1000L / duration;
String qName = queue.getClass().getSimpleName();
Integer finalMessage = (Integer) messageHolder.message1;
System.out.format("%d - ops/sec=%,d - %s result=%d failed.poll=%d failed.offer=%d\n", runNumber, ops,
qName, finalMessage, queueEmpty, p.queueFull);
return ops;
}
public static class Producer implements Runnable {
private final SimpleQueue<MessageHolder> queue;
int queueFull = 0;
long start;
public Producer(SimpleQueue<MessageHolder> queue) {
this.queue = queue;
}
@Override
public void run() {
SimpleQueue<MessageHolder> producer = this.queue;
int i = REPETITIONS;
int f = 0;
long s = System.nanoTime();
try {
do {
producer.transfer(TEST_VALUE);
} while (0 != --i);
} catch (InterruptedException ignored) {
}
this.queueFull = f;
this.start = s;
}
}
public PerformanceTest() {
}
public void testMultiMessageSending() {
MultiMBassador bus = new MultiMBassador();
bus.addErrorHandler(TestFailingHandler);
Listener listener1 = new Listener();
bus.subscribe(listener1);
long num = 0;
while (num < Long.MAX_VALUE) {
bus.publishAsync(num++);
}
// bus.publish("s", "s");
// bus.publish("s", "s", "s");
// bus.publish("s", "s", "s", "s");
// bus.publish(1, 2, "s");
// bus.publish(1, 2, 3, 4, 5, 6);
// bus.publish(new Integer[] {1, 2, 3, 4, 5, 6});
bus.shutdown();
System.err.println("Count: " + count);
}
@SuppressWarnings("unused")
public static class Listener {
@Handler
public void handleSync(Long o1) {
count++;
}
// @Handler
// public void handleSync(String o1) {
// count++;
//// System.err.println("match String");
// }
// @Handler
// public void handleSync(String o1, String o2) {
// count.getAndIncrement();
// System.err.println("match String, String");
// }
// public void testMultiMessageSending() {
// final MultiMBassador bus = new MultiMBassador(4);
// bus.addErrorHandler(TestFailingHandler);
//
// @Handler
// public void handleSync(String o1, String o2, String o3) {
// count.getAndIncrement();
// System.err.println("match String, String, String");
// }
//
// @Handler
// public void handleSync(Integer o1, Integer o2, String o3) {
// count.getAndIncrement();
// System.err.println("match Integer, Integer, String");
// }
// Listener listener1 = new Listener();
// bus.subscribe(listener1);
//
// @Handler
// public void handleSync(String... o) {
// count.getAndIncrement();
// System.err.println("match String[]");
// }
//
// ConcurrentExecutor.runConcurrent(new Runnable() {
// @Override
// public void run() {
// long num = 0;
// while (num < Long.MAX_VALUE) {
// bus.publishAsync(num++);
// }
// }}, 4);
//
//
// bus.shutdown();
// System.err.println("Count: " + count);
// }
//
// @SuppressWarnings("unused")
// public static class Listener {
// @Handler
// public void handleSync(Integer... o) {
// count.getAndIncrement();
// System.err.println("match Integer[]");
// public void handleSync(Long o1) {
//// System.err.println(Long.toString(o1));
// count++;
// }
}
// }
}

View File

@ -1,12 +1,9 @@
package dorkbox.util.messagebus;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import dorkbox.util.messagebus.IMessageBus;
import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.annotations.Synchronized;
import dorkbox.util.messagebus.common.MessageBusTest;
@ -25,39 +22,34 @@ public class SynchronizedHandlerTest extends MessageBusTest {
@Test
public void testSynchronizedWithSynchronousInvocation(){
List<SynchronizedWithSynchronousDelivery> handlers = new LinkedList<SynchronizedWithSynchronousDelivery>();
IMessageBus bus = createBus();
for(int i = 0; i < numberOfListeners; i++){
SynchronizedWithSynchronousDelivery handler = new SynchronizedWithSynchronousDelivery();
handlers.add(handler);
bus.subscribe(handler);
}
for(int i = 0; i < numberOfMessages; i++){
bus.publishAsync(new Object());
for (int i = 0; i < numberOfMessages; i++) {
bus.publishAsync(new Object());
}
int totalCount = numberOfListeners * numberOfMessages;
int expireCount = 1000;
// wait for last publication
while (expireCount-- > 0 && counter.get() < totalCount){
while (bus.hasPendingMessages()) {
pause(100);
}
if (expireCount <= 0) {
if (totalCount != counter.get()) {
fail("Count '" + counter.get() + "' was incorrect!");
}
}
public static class SynchronizedWithSynchronousDelivery {
@Handler
@Synchronized
public void handleMessage(Object o){
counter.getAndIncrement();
counter.getAndIncrement();
// System.err.println(counter.get());
}
}
}