polish, pre-work for contention

This commit is contained in:
nathan 2015-05-20 11:53:57 +02:00
parent 9a787506ee
commit 4defde737a
5 changed files with 187 additions and 174 deletions

View File

@ -6,7 +6,6 @@ import java.util.Iterator;
import org.jctools.util.Pow2;
import dorkbox.util.messagebus.common.DeadMessage;
import dorkbox.util.messagebus.common.NamedThreadFactory;
import dorkbox.util.messagebus.common.simpleq.MpmcMultiTransferArrayQueue;
import dorkbox.util.messagebus.common.simpleq.MultiNode;
@ -208,6 +207,7 @@ public class MultiMBassador implements IMessageBus {
Subscription sub;
// Run subscriptions
if (subscriptions != null) {
for (iterator = subscriptions.iterator(); iterator.hasNext();) {
@ -218,17 +218,17 @@ public class MultiMBassador implements IMessageBus {
}
}
if (!this.forceExactMatches) {
Collection<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass);
// now get superClasses
if (superSubscriptions != null) {
for (iterator = superSubscriptions.iterator(); iterator.hasNext();) {
sub = iterator.next();
// this catches all exception types
sub.publishToSubscription(this, subsPublished, message);
}
}
// if (!this.forceExactMatches) {
// Collection<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass);
// // now get superClasses
// if (superSubscriptions != null) {
// for (iterator = superSubscriptions.iterator(); iterator.hasNext();) {
// sub = iterator.next();
//
// // this catches all exception types
// sub.publishToSubscription(this, subsPublished, message);
// }
// }
// // publish to var arg, only if not already an array
@ -261,21 +261,21 @@ public class MultiMBassador implements IMessageBus {
// }
// }
// }
}
if (!subsPublished.bool) {
// Dead Event must EXACTLY MATCH (no subclasses)
Collection<Subscription> deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) {
DeadMessage deadMessage = new DeadMessage(message);
for (iterator = deadSubscriptions.iterator(); iterator.hasNext();) {
sub = iterator.next();
// this catches all exception types
sub.publishToSubscription(this, subsPublished, deadMessage);
}
}
}
// }
//
// if (!subsPublished.bool) {
// // Dead Event must EXACTLY MATCH (no subclasses)
// Collection<Subscription> deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
// if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) {
// DeadMessage deadMessage = new DeadMessage(message);
//
// for (iterator = deadSubscriptions.iterator(); iterator.hasNext();) {
// sub = iterator.next();
// // this catches all exception types
// sub.publishToSubscription(this, subsPublished, deadMessage);
// }
// }
// }
}
@Override

View File

@ -142,10 +142,12 @@ public class SubscriptionManager {
this.nonListeners.put(listenerClass, Boolean.TRUE);
return;
} else {
VarArgPossibility varArgPossibility = this.varArgPossibility;
subsPerListener = new ConcurrentSet<Subscription>(16, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
VarArgPossibility varArgPossibility = this.varArgPossibility;
ConcurrentMap<Class<?>, Collection<Subscription>> subsPerMessageSingle = this.subscriptionsPerMessageSingle;
HashMapTree<Class<?>, Collection<Subscription>> subsPerMessageMulti = this.subscriptionsPerMessageMulti;
Iterator<MessageHandler> iterator;
MessageHandler messageHandler;
@ -154,84 +156,9 @@ public class SubscriptionManager {
for (iterator = messageHandlers.iterator(); iterator.hasNext();) {
messageHandler = iterator.next();
// now add this subscription to each of the handled types
Class<?>[] types = messageHandler.getHandledMessages();
int size = types.length;
switch (size) {
case 1: {
SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent;
subsPerType = subHolderConcurrent.get();
Collection<Subscription> putIfAbsent = subsPerMessageSingle.putIfAbsent(types[0], subsPerType);
if (putIfAbsent != null) {
subsPerType = putIfAbsent;
} else {
subHolderConcurrent.set(subHolderConcurrent.initialValue());
boolean isArray = this.utils.isArray(types[0]);
// cache the super classes
this.utils.getSuperClasses(types[0], isArray);
if (isArray) {
varArgPossibility.set(true);
}
}
break;
}
case 2: {
// the HashMapTree uses read/write locks, so it is only accessible one thread at a time
SubscriptionHolder subHolderSingle = this.subHolderSingle;
subsPerType = subHolderSingle.get();
Collection<Subscription> putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(subsPerType, types[0], types[1]);
if (putIfAbsent != null) {
subsPerType = putIfAbsent;
} else {
subHolderSingle.set(subHolderSingle.initialValue());
// cache the super classes
this.utils.getSuperClasses(types[0]);
this.utils.getSuperClasses(types[1]);
}
break;
}
case 3: {
// the HashMapTree uses read/write locks, so it is only accessible one thread at a time
SubscriptionHolder subHolderSingle = this.subHolderSingle;
subsPerType = subHolderSingle.get();
Collection<Subscription> putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(subsPerType, types[0], types[1], types[2]);
if (putIfAbsent != null) {
subsPerType = putIfAbsent;
} else {
subHolderSingle.set(subHolderSingle.initialValue());
// cache the super classes
this.utils.getSuperClasses(types[0]);
this.utils.getSuperClasses(types[1]);
this.utils.getSuperClasses(types[2]);
}
break;
}
default: {
// the HashMapTree uses read/write locks, so it is only accessible one thread at a time
SubscriptionHolder subHolderSingle = this.subHolderSingle;
subsPerType = subHolderSingle.get();
Collection<Subscription> putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(subsPerType, types);
if (putIfAbsent != null) {
subsPerType = putIfAbsent;
} else {
subHolderSingle.set(subHolderSingle.initialValue());
Class<?> c;
int length = types.length;
for (int i = 0; i < length; i++) {
c = types[i];
this.utils.getSuperClasses(c);
}
}
break;
}
}
// this can safely be called concurrently
subsPerType = getSubsPerType(messageHandler, subsPerMessageSingle, subsPerMessageMulti, varArgPossibility);
// create the subscription
Subscription subscription = new Subscription(messageHandler);
@ -255,6 +182,101 @@ public class SubscriptionManager {
}
}
private final Collection<Subscription> getSubsPerType(MessageHandler messageHandler,
ConcurrentMap<Class<?>, Collection<Subscription>> subsPerMessageSingle,
HashMapTree<Class<?>, Collection<Subscription>> subsPerMessageMulti,
VarArgPossibility varArgPossibility) {
Class<?>[] types = messageHandler.getHandledMessages();
int size = types.length;
ConcurrentSet<Subscription> subsPerType;
SubscriptionUtils utils = this.utils;
switch (size) {
case 1: {
SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent;
subsPerType = subHolderConcurrent.get();
Collection<Subscription> putIfAbsent = subsPerMessageSingle.putIfAbsent(types[0], subsPerType);
if (putIfAbsent != null) {
return putIfAbsent;
} else {
subHolderConcurrent.set(subHolderConcurrent.initialValue());
boolean isArray = utils.isArray(types[0]);
if (isArray) {
varArgPossibility.set(true);
}
// cache the super classes
utils.getSuperClasses(types[0], isArray);
return subsPerType;
}
}
case 2: {
// the HashMapTree uses read/write locks, so it is only accessible one thread at a time
SubscriptionHolder subHolderSingle = this.subHolderSingle;
subsPerType = subHolderSingle.get();
Collection<Subscription> putIfAbsent = subsPerMessageMulti.putIfAbsent(subsPerType, types[0], types[1]);
if (putIfAbsent != null) {
return putIfAbsent;
} else {
subHolderSingle.set(subHolderSingle.initialValue());
// cache the super classes
utils.getSuperClasses(types[0]);
utils.getSuperClasses(types[1]);
return subsPerType;
}
}
case 3: {
// the HashMapTree uses read/write locks, so it is only accessible one thread at a time
SubscriptionHolder subHolderSingle = this.subHolderSingle;
subsPerType = subHolderSingle.get();
Collection<Subscription> putIfAbsent = subsPerMessageMulti.putIfAbsent(subsPerType, types[0], types[1], types[2]);
if (putIfAbsent != null) {
return putIfAbsent;
} else {
subHolderSingle.set(subHolderSingle.initialValue());
// cache the super classes
utils.getSuperClasses(types[0]);
utils.getSuperClasses(types[1]);
utils.getSuperClasses(types[2]);
return subsPerType;
}
}
default: {
// the HashMapTree uses read/write locks, so it is only accessible one thread at a time
SubscriptionHolder subHolderSingle = this.subHolderSingle;
subsPerType = subHolderSingle.get();
Collection<Subscription> putIfAbsent = subsPerMessageMulti.putIfAbsent(subsPerType, types);
if (putIfAbsent != null) {
return putIfAbsent;
} else {
subHolderSingle.set(subHolderSingle.initialValue());
Class<?> c;
int length = types.length;
for (int i = 0; i < length; i++) {
c = types[i];
// cache the super classes
utils.getSuperClasses(c);
}
return subsPerType;
}
}
}
}
public final void unsubscribe(Object listener) {
if (listener == null) {
return;
@ -277,6 +299,7 @@ public class SubscriptionManager {
for (iterator = subscriptions.iterator(); iterator.hasNext();) {
sub = iterator.next();
sub.unsubscribe(listener);
}
}

View File

@ -30,7 +30,7 @@ public class ConcurrentSet<T> extends ConcurrentLinkedQueue2<T> {
public ConcurrentSet(int size, float loadFactor, int stripeSize) {
super();
this.entries = new ConcurrentHashMapV8<T, Node<T>>(size, loadFactor, stripeSize);
this.entries = new ConcurrentHashMapV8<T, Node<T>>(size, loadFactor, 32);
}
@Override
@ -39,6 +39,7 @@ public class ConcurrentSet<T> extends ConcurrentLinkedQueue2<T> {
return false;
}
// had to modify the super implementation so we get Node<T> back
Node<T> alreadyPresent = this.entries.putIfAbsent(element, this.IN_PROGRESS_MARKER);
if (alreadyPresent == null) {
// this doesn't already exist
@ -76,7 +77,7 @@ public class ConcurrentSet<T> extends ConcurrentLinkedQueue2<T> {
@Override
public boolean isEmpty() {
return this.entries.isEmpty();
return super.isEmpty();
}
/**
@ -219,12 +220,12 @@ public class ConcurrentSet<T> extends ConcurrentLinkedQueue2<T> {
@Override
public Object[] toArray() {
return this.entries.entrySet().toArray();
return this.entries.keySet().toArray();
}
@Override
public <T> T[] toArray(T[] a) {
return this.entries.entrySet().toArray(a);
return this.entries.keySet().toArray(a);
}
@Override

View File

@ -1,6 +1,5 @@
package dorkbox.util.messagebus.subscription;
import java.lang.reflect.InvocationTargetException;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
@ -12,7 +11,6 @@ import dorkbox.util.messagebus.dispatch.IHandlerInvocation;
import dorkbox.util.messagebus.dispatch.ReflectiveHandlerInvocation;
import dorkbox.util.messagebus.dispatch.SynchronizedHandlerInvocation;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.error.PublicationError;
import dorkbox.util.messagebus.listener.MessageHandler;
/**
@ -93,6 +91,11 @@ public class Subscription {
return this.listeners.size();
}
int c = 0;
public int c() {
return this.c;
}
/**
* @return true if there were listeners for this publication, false if there was nothing
*/
@ -110,42 +113,36 @@ public class Subscription {
for (iterator = listeners.iterator(); iterator.hasNext();) {
listener = iterator.next();
try {
invocation.invoke(listener, handler, handleIndex, message);
} catch (IllegalAccessException e) {
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
"The class or method is not accessible")
.setCause(e)
.setMethodName(handler.getMethodNames()[handleIndex])
.setListener(listener)
.setPublishedObject(message));
} catch (IllegalArgumentException e) {
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
"Wrong arguments passed to method. Was: " + message.getClass()
+ "Expected: " + handler.getParameterTypes()[0])
.setCause(e)
.setMethodName(handler.getMethodNames()[handleIndex])
.setListener(listener)
.setPublishedObject(message));
} catch (InvocationTargetException e) {
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
"Message handler threw exception")
.setCause(e)
.setMethodName(handler.getMethodNames()[handleIndex])
.setListener(listener)
.setPublishedObject(message));
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
"The handler code threw an exception")
.setCause(e)
.setMethodName(handler.getMethodNames()[handleIndex])
.setListener(listener)
.setPublishedObject(message));
}
this.c++;
// try {
// invocation.invoke(listener, handler, handleIndex, message);
// } catch (IllegalAccessException e) {
// errorHandler.handlePublicationError(new PublicationError()
// .setMessage("Error during invocation of message handler. " +
// "The class or method is not accessible")
// .setCause(e)
// .setMethodName(handler.getMethodNames()[handleIndex])
// .setListener(listener)
// .setPublishedObject(message));
// } catch (IllegalArgumentException e) {
// errorHandler.handlePublicationError(new PublicationError()
// .setMessage("Error during invocation of message handler. " +
// "Wrong arguments passed to method. Was: " + message.getClass()
// + "Expected: " + handler.getParameterTypes()[0])
// .setCause(e)
// .setMethodName(handler.getMethodNames()[handleIndex])
// .setListener(listener)
// .setPublishedObject(message));
// } catch (Throwable e) {
// errorHandler.handlePublicationError(new PublicationError()
// .setMessage("Error during invocation of message handler. " +
// "The Message handler implementation threw an exception")
// .setCause(e)
// .setMethodName(handler.getMethodNames()[handleIndex])
// .setListener(listener)
// .setPublishedObject(message));
// }
}
booleanHolder.bool = true;
}
@ -192,18 +189,10 @@ public class Subscription {
// .setMethodName(handler.getMethodNames()[handleIndex])
// .setListener(listener)
// .setPublishedObject(message1, message2));
// } catch (InvocationTargetException e) {
// errorHandler.handlePublicationError(new PublicationError()
// .setMessage("Error during invocation of message handler. " +
// "Message handler threw exception")
// .setCause(e)
// .setMethodName(handler.getMethodNames()[handleIndex])
// .setListener(listener)
// .setPublishedObject(message1, message2));
// } catch (Throwable e) {
// errorHandler.handlePublicationError(new PublicationError()
// .setMessage("Error during invocation of message handler. " +
// "The handler code threw an exception")
// "The Message handler code threw an exception")
// .setCause(e)
// .setMethodName(handler.getMethodNames()[handleIndex])
// .setListener(listener)
@ -257,18 +246,10 @@ public class Subscription {
// .setMethodName(handler.getMethodNames()[handleIndex])
// .setListener(listener)
// .setPublishedObject(message1, message2, message3));
// } catch (InvocationTargetException e) {
// errorHandler.handlePublicationError(new PublicationError()
// .setMessage("Error during invocation of message handler. " +
// "Message handler threw exception")
// .setCause(e)
// .setMethodName(handler.getMethodNames()[handleIndex])
// .setListener(listener)
// .setPublishedObject(message1, message2, message3));
// } catch (Throwable e) {
// errorHandler.handlePublicationError(new PublicationError()
// .setMessage("Error during invocation of message handler. " +
// "The handler code threw an exception")
// "The Message handler code threw an exception")
// .setCause(e)
// .setMethodName(handler.getMethodNames()[handleIndex])
// .setListener(listener)

View File

@ -6,6 +6,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedTransferQueue;
@ -51,17 +52,16 @@ public class PerfTest_Collections {
// }
System.err.println("Done");
bench(size, new ConcurrentLinkedQueue2<Subscription>());
bench(size, new ConcurrentSet<Subscription>(size*2, LOAD_FACTOR, 5));
bench(size, Collections.<Subscription>newSetFromMap(new ConcurrentHashMapV8<Subscription, Boolean>(size*2, LOAD_FACTOR, 1)));
bench(size, new ArrayList<Subscription>(size*2));
bench(size, new ConcurrentSet<Subscription>(size*2, LOAD_FACTOR, 5));
bench(size, new ConcurrentLinkedQueue2<Subscription>());
bench(size, new ConcurrentLinkedQueue<Subscription>());
bench(size, new LinkedTransferQueue<Subscription>());
bench(size, new ArrayDeque<Subscription>(size*2));
bench(size, new ConcurrentLinkedQueue<Subscription>());
bench(size, new LinkedList<Subscription>());
bench(size, new StrongConcurrentSetV8<Subscription>(size*2, LOAD_FACTOR));
bench(size, new StrongConcurrentSet<Subscription>(size*2, LOAD_FACTOR));
bench(size, Collections.<Subscription>newSetFromMap(new ConcurrentHashMapV8<Subscription, Boolean>(size*2, LOAD_FACTOR, 1)));
bench(size, new HashSet<Subscription>());
// bench(size, new ConcurrentSkipListSet<Subscription>()); // needs comparable
}
@ -81,9 +81,13 @@ public class PerfTest_Collections {
}
}
for (int i=2;i<6;i++) {
long average = averageRun(warmupRuns, runs, set, false, i, REPETITIONS);
if (showOutput) {
if (!showOutput) {
for (int i=2;i<6;i++) {
averageRun(warmupRuns, runs, set, false, i, REPETITIONS);
}
} else {
for (int i=1;i<10;i++) {
long average = averageRun(warmupRuns, runs, set, false, i, REPETITIONS);
System.out.format("summary,IteratorPerfTest,%s - %,d (%d)\n", set.getClass().getSimpleName(), average, i);
}
}
@ -174,12 +178,16 @@ public class PerfTest_Collections {
int i = this.repetitions;
this.start = System.nanoTime();
Iterator<Subscription> iterator;
Subscription sub;
// Entry<Subscription> current;
// Subscription sub;
int count = 0;
do {
for (Subscription sub : set) {
for (iterator = set.iterator(); iterator.hasNext();) {
sub = iterator.next();
// if (sub.acceptsSubtypes()) {
// count--;
// } else {