Moved queue perf tests into own package. Added SubscriptionUtils helper to clean up code size

This commit is contained in:
nathan 2015-05-14 01:57:03 +02:00
parent 21541639ca
commit d0dfafb5a5
15 changed files with 268 additions and 130 deletions

View File

@ -220,7 +220,7 @@ public class MultiMBassador implements IMessageBus {
// publish to var arg, only if not already an array
if (manager.hasVarArgPossibility() && !manager.isArray(messageClass)) {
if (manager.hasVarArgPossibility() && !manager.utils.isArray(messageClass)) {
Object[] asArray = null;
StrongConcurrentSetV8<Subscription> varargSubscriptions = manager.getVarArgSubscriptions(messageClass);

View File

@ -1,6 +1,5 @@
package dorkbox.util.messagebus;
import java.lang.reflect.Array;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
@ -8,10 +7,9 @@ import java.util.concurrent.ConcurrentMap;
import dorkbox.util.messagebus.common.ConcurrentHashMapV8;
import dorkbox.util.messagebus.common.HashMapTree;
import dorkbox.util.messagebus.common.ISetEntry;
import dorkbox.util.messagebus.common.ReflectionUtils;
import dorkbox.util.messagebus.common.StrongConcurrentSetV8;
import dorkbox.util.messagebus.common.SubscriptionUtils;
import dorkbox.util.messagebus.common.VarArgPossibility;
import dorkbox.util.messagebus.common.thread.ClassHolder;
import dorkbox.util.messagebus.common.thread.SubscriptionHolder;
import dorkbox.util.messagebus.listener.MessageHandler;
import dorkbox.util.messagebus.listener.MetadataReader;
@ -40,6 +38,8 @@ public class SubscriptionManager {
// the metadata reader that is used to inspect objects passed to the subscribe method
private static final MetadataReader metadataReader = new MetadataReader();
final SubscriptionUtils utils;
// remember already processed classes that do not contain any message handlers
private final Map<Class<?>, Boolean> nonListeners;
@ -58,9 +58,6 @@ public class SubscriptionManager {
// once a collection of subscriptions is stored it does not change
private final ConcurrentMap<Class<?>, StrongConcurrentSetV8<Subscription>> subscriptionsPerListener;
private final Map<Class<?>, Class<?>> arrayVersionCache;
private final Map<Class<?>, Boolean> isArrayCache;
private final ConcurrentMap<Class<?>, StrongConcurrentSetV8<Class<?>>> superClassesCache;
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but REALLY improves performance on handlers
@ -78,12 +75,11 @@ public class SubscriptionManager {
private final SubscriptionHolder subHolderSingle;
private final SubscriptionHolder subHolderConcurrent;
private final ClassHolder classHolderSingle;
SubscriptionManager(int numberOfThreads) {
this.STRIPE_SIZE = numberOfThreads;
this.utils = new SubscriptionUtils(LOAD_FACTOR, numberOfThreads);
// modified ONLY during SUB/UNSUB
{
this.nonListeners = new ConcurrentHashMapV8<Class<?>, Boolean>(4, SubscriptionManager.LOAD_FACTOR);
@ -97,10 +93,6 @@ public class SubscriptionManager {
// modified by N threads
{
this.arrayVersionCache = new ConcurrentHashMapV8<Class<?>, Class<?>>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
this.isArrayCache = new ConcurrentHashMapV8<Class<?>, Boolean>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
this.superClassesCache = new ConcurrentHashMapV8<Class<?>, StrongConcurrentSetV8<Class<?>>>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but improves performance of handlers
this.superClassSubscriptions = new ConcurrentHashMapV8<Class<?>, StrongConcurrentSetV8<Subscription>>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
@ -115,8 +107,6 @@ public class SubscriptionManager {
this.subHolderSingle = new SubscriptionHolder(SubscriptionManager.LOAD_FACTOR, 1);
this.subHolderConcurrent = new SubscriptionHolder(SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
this.classHolderSingle = new ClassHolder(SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
}
public void shutdown() {
@ -126,10 +116,7 @@ public class SubscriptionManager {
this.subscriptionsPerMessageMulti.clear();
this.subscriptionsPerListener.clear();
this.superClassesCache.clear();
this.arrayVersionCache.clear();
this.isArrayCache.clear();
this.utils.shutdown();
clearConcurrentCollections();
}
@ -196,8 +183,8 @@ public class SubscriptionManager {
subsPerType = putIfAbsent;
} else {
subHolderConcurrent.set(subHolderConcurrent.initialValue());
boolean isArray = isArray(types[0]);
getSuperClasses(types[0], isArray);
boolean isArray = this.utils.isArray(types[0]);
this.utils.getSuperClasses(types[0], isArray);
if (isArray) {
varArgPossibility.set(true);
}
@ -214,8 +201,8 @@ public class SubscriptionManager {
subsPerType = putIfAbsent;
} else {
subHolderSingle.set(subHolderSingle.initialValue());
getSuperClasses(types[0]);
getSuperClasses(types[1]);
this.utils.getSuperClasses(types[0]);
this.utils.getSuperClasses(types[1]);
}
break;
}
@ -229,9 +216,9 @@ public class SubscriptionManager {
subsPerType = putIfAbsent;
} else {
subHolderSingle.set(subHolderSingle.initialValue());
getSuperClasses(types[0]);
getSuperClasses(types[1]);
getSuperClasses(types[2]);
this.utils.getSuperClasses(types[0]);
this.utils.getSuperClasses(types[1]);
this.utils.getSuperClasses(types[2]);
}
break;
}
@ -250,7 +237,7 @@ public class SubscriptionManager {
int length = types.length;
for (int i = 0; i < length; i++) {
c = types[i];
getSuperClasses(c);
this.utils.getSuperClasses(c);
}
}
break;
@ -317,88 +304,6 @@ public class SubscriptionManager {
this.varArgSuperClassSubscriptionsMulti.clear();
}
/**
* race conditions will result in duplicate answers, which we don't care if happens
* never returns null
* never reset, since it never needs to be reset (as the class hierarchy doesn't change at runtime)
*/
private StrongConcurrentSetV8<Class<?>> getSuperClasses(Class<?> clazz) {
return getSuperClasses(clazz, isArray(clazz));
}
private StrongConcurrentSetV8<Class<?>> getSuperClasses(Class<?> clazz, boolean isArray) {
// this is never reset, since it never needs to be.
ConcurrentMap<Class<?>, StrongConcurrentSetV8<Class<?>>> local = this.superClassesCache;
ClassHolder classHolderSingle = this.classHolderSingle;
StrongConcurrentSetV8<Class<?>> classes = classHolderSingle.get();
StrongConcurrentSetV8<Class<?>> putIfAbsent = local.putIfAbsent(clazz, classes);
if (putIfAbsent == null) {
// we are the first one in the map
classHolderSingle.set(classHolderSingle.initialValue());
// it doesn't matter if concurrent access stomps on values, since they are always the same.
StrongConcurrentSetV8<Class<?>> superTypes = ReflectionUtils.getSuperTypes(clazz);
ISetEntry<Class<?>> current = superTypes.head;
Class<?> c;
while (current != null) {
c = current.getValue();
current = current.next();
if (isArray) {
c = getArrayClass(c);
}
if (c != clazz) {
classes.add(c);
}
}
return classes;
} else {
// someone beat us
return putIfAbsent;
}
}
/**
* race conditions will result in duplicate answers, which we don't care if happens
* never returns null
* never reset
*/
private Class<?> getArrayClass(Class<?> c) {
Map<Class<?>, Class<?>> arrayVersionCache = this.arrayVersionCache;
Class<?> clazz = arrayVersionCache.get(c);
if (clazz == null) {
// messy, but the ONLY way to do it. Array super types are also arrays
Object[] newInstance = (Object[]) Array.newInstance(c, 1);
clazz = newInstance.getClass();
arrayVersionCache.put(c, clazz);
}
return clazz;
}
/**
* Cache the values of JNI method, isArray(c)
* @return true if the class c is an array type
*/
@SuppressWarnings("boxing")
public final boolean isArray(Class<?> c) {
Map<Class<?>, Boolean> isArrayCache = this.isArrayCache;
Boolean isArray = isArrayCache.get(c);
if (isArray == null) {
boolean b = c.isArray();
isArrayCache.put(c, b);
return b;
}
return isArray;
}
// CAN RETURN NULL
public final StrongConcurrentSetV8<Subscription> getSubscriptionsByMessageType(Class<?> messageType) {
return this.subscriptionsPerMessageSingle.get(messageType);
@ -437,7 +342,7 @@ public class SubscriptionManager {
subHolderConcurrent.set(subHolderConcurrent.initialValue());
// this caches our array type. This is never cleared.
Class<?> arrayVersion = getArrayClass(messageClass);
Class<?> arrayVersion = this.utils.getArrayClass(messageClass);
Map<Class<?>, StrongConcurrentSetV8<Subscription>> local2 = this.subscriptionsPerMessageSingle;
@ -480,8 +385,8 @@ public class SubscriptionManager {
// we are the first one in the map
subHolderConcurrent.set(subHolderConcurrent.initialValue());
Class<?> arrayVersion = getArrayClass(messageClass);
StrongConcurrentSetV8<Class<?>> types = getSuperClasses(arrayVersion, true);
Class<?> arrayVersion = this.utils.getArrayClass(messageClass);
StrongConcurrentSetV8<Class<?>> types = this.utils.getSuperClasses(arrayVersion, true);
if (types.isEmpty()) {
return null;
}
@ -631,7 +536,7 @@ public class SubscriptionManager {
StrongConcurrentSetV8<Subscription> putIfAbsent = local.putIfAbsent(superType, subsPerType);
if (putIfAbsent == null) {
// we are the first one in the map
StrongConcurrentSetV8<Class<?>> types = getSuperClasses(superType);
StrongConcurrentSetV8<Class<?>> types = this.utils.getSuperClasses(superType);
if (types.isEmpty()) {
return null;
}
@ -697,8 +602,8 @@ public class SubscriptionManager {
subHolderSingle.set(subHolderSingle.initialValue());
// whenever our subscriptions change, this map is cleared.
StrongConcurrentSetV8<Class<?>> types1 = this.superClassesCache.get(superType1);
StrongConcurrentSetV8<Class<?>> types2 = this.superClassesCache.get(superType2);
StrongConcurrentSetV8<Class<?>> types1 = this.utils.getSuperClasses(superType1);
StrongConcurrentSetV8<Class<?>> types2 = this.utils.getSuperClasses(superType2);
StrongConcurrentSetV8<Subscription> subs;
HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>> leaf1;
@ -790,9 +695,9 @@ public class SubscriptionManager {
// we are the first one in the map
subHolderSingle.set(subHolderSingle.initialValue());
StrongConcurrentSetV8<Class<?>> types1 = this.superClassesCache.get(superType1);
StrongConcurrentSetV8<Class<?>> types2 = this.superClassesCache.get(superType2);
StrongConcurrentSetV8<Class<?>> types3 = this.superClassesCache.get(superType3);
StrongConcurrentSetV8<Class<?>> types1 = this.utils.getSuperClasses(superType1);
StrongConcurrentSetV8<Class<?>> types2 = this.utils.getSuperClasses(superType2);
StrongConcurrentSetV8<Class<?>> types3 = this.utils.getSuperClasses(superType3);
StrongConcurrentSetV8<Subscription> subs;
HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>> leaf1;

View File

@ -0,0 +1,114 @@
package dorkbox.util.messagebus.common;
import java.lang.reflect.Array;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import dorkbox.util.messagebus.common.thread.ClassHolder;
public class SubscriptionUtils {
private final Map<Class<?>, Class<?>> arrayVersionCache;
private final Map<Class<?>, Boolean> isArrayCache;
private final ConcurrentMap<Class<?>, StrongConcurrentSetV8<Class<?>>> superClassesCache;
private final ClassHolder classHolderSingle;
public SubscriptionUtils(float loadFactor, int stripeSize) {
this.arrayVersionCache = new ConcurrentHashMapV8<Class<?>, Class<?>>(64, loadFactor, stripeSize);
this.isArrayCache = new ConcurrentHashMapV8<Class<?>, Boolean>(64, loadFactor, stripeSize);
this.superClassesCache = new ConcurrentHashMapV8<Class<?>, StrongConcurrentSetV8<Class<?>>>(64, loadFactor, stripeSize);
this.classHolderSingle = new ClassHolder(loadFactor, stripeSize);
}
/**
* race conditions will result in duplicate answers, which we don't care if happens
* never returns null
* never reset, since it never needs to be reset (as the class hierarchy doesn't change at runtime)
*/
public StrongConcurrentSetV8<Class<?>> getSuperClasses(Class<?> clazz) {
return getSuperClasses(clazz, isArray(clazz));
}
public final StrongConcurrentSetV8<Class<?>> getSuperClasses(Class<?> clazz, boolean isArray) {
// this is never reset, since it never needs to be.
ConcurrentMap<Class<?>, StrongConcurrentSetV8<Class<?>>> local = this.superClassesCache;
ClassHolder classHolderSingle = this.classHolderSingle;
StrongConcurrentSetV8<Class<?>> classes = classHolderSingle.get();
StrongConcurrentSetV8<Class<?>> putIfAbsent = local.putIfAbsent(clazz, classes);
if (putIfAbsent == null) {
// we are the first one in the map
classHolderSingle.set(classHolderSingle.initialValue());
// it doesn't matter if concurrent access stomps on values, since they are always the same.
StrongConcurrentSetV8<Class<?>> superTypes = ReflectionUtils.getSuperTypes(clazz);
ISetEntry<Class<?>> current = superTypes.head;
Class<?> c;
while (current != null) {
c = current.getValue();
current = current.next();
if (isArray) {
c = getArrayClass(c);
}
if (c != clazz) {
classes.add(c);
}
}
return classes;
} else {
// someone beat us
return putIfAbsent;
}
}
/**
* race conditions will result in duplicate answers, which we don't care if happens
* never returns null
* never reset
*/
public final Class<?> getArrayClass(Class<?> c) {
Map<Class<?>, Class<?>> arrayVersionCache = this.arrayVersionCache;
Class<?> clazz = arrayVersionCache.get(c);
if (clazz == null) {
// messy, but the ONLY way to do it. Array super types are also arrays
Object[] newInstance = (Object[]) Array.newInstance(c, 1);
clazz = newInstance.getClass();
arrayVersionCache.put(c, clazz);
}
return clazz;
}
/**
* Cache the values of JNI method, isArray(c)
* @return true if the class c is an array type
*/
@SuppressWarnings("boxing")
public final boolean isArray(Class<?> c) {
Map<Class<?>, Boolean> isArrayCache = this.isArrayCache;
Boolean isArray = isArrayCache.get(c);
if (isArray == null) {
boolean b = c.isArray();
isArrayCache.put(c, b);
return b;
}
return isArray;
}
public void shutdown() {
this.isArrayCache.clear();
this.arrayVersionCache.clear();
this.superClassesCache.clear();
}
}

View File

@ -0,0 +1,119 @@
package dorkbox.util.messagebus;
import dorkbox.util.messagebus.annotations.Handler;
public class PerfTest_MBassador {
public static final int REPETITIONS = 50 * 1000 * 100;
public static final Integer TEST_VALUE = Integer.valueOf(777);
private static final int concurrency = 10;
public static void main(final String[] args) throws Exception {
System.out.println("reps:" + REPETITIONS + " Concurrency " + concurrency);
final int warmupRuns = 4;
final int runs = 5;
MultiMBassador bus = new MultiMBassador(2);
Listener listener1 = new Listener();
bus.subscribe(listener1);
long average = averageRun(warmupRuns, runs, bus, true, concurrency, REPETITIONS);
System.out.format("summary,PublishPerfTest, %,d\n", average);
}
public static long averageRun(int warmUpRuns, int sumCount, MultiMBassador bus, boolean showStats, int concurrency, int repetitions) throws Exception {
int runs = warmUpRuns + sumCount;
final long[] results = new long[runs];
for (int i = 0; i < runs; i++) {
System.gc();
results[i] = performanceRun(i, bus, showStats, concurrency, repetitions);
}
// only average last X results for summary
long sum = 0;
for (int i = warmUpRuns; i < runs; i++) {
sum += results[i];
}
return sum/sumCount;
}
private static long performanceRun(int runNumber, MultiMBassador bus, boolean showStats, int concurrency, int repetitions) throws Exception {
Producer[] producers = new Producer[concurrency];
Thread[] threads = new Thread[concurrency*2];
for (int i=0;i<concurrency;i++) {
producers[i] = new Producer(bus, repetitions);
threads[i] = new Thread(producers[i], "Producer " + i);
}
for (int i=0;i<concurrency;i++) {
threads[i].start();
}
for (int i=0;i<concurrency;i++) {
threads[i].join();
}
long start = Long.MAX_VALUE;
long end = -1;
for (int i=0;i<concurrency;i++) {
if (producers[i].start < start) {
start = producers[i].start;
}
if (producers[i].end > end) {
end = producers[i].end;
}
}
long duration = end - start;
long ops = repetitions * 1_000_000_000L / duration;
if (showStats) {
System.out.format("%d - ops/sec=%,d\n", runNumber, ops);
}
return ops;
}
public static class Producer implements Runnable {
private final MultiMBassador bus;
volatile long start;
volatile long end;
private int repetitions;
public Producer(MultiMBassador bus, int repetitions) {
this.bus = bus;
this.repetitions = repetitions;
}
@Override
public void run() {
MultiMBassador bus = this.bus;
int i = this.repetitions;
this.start = System.nanoTime();
do {
bus.publish(TEST_VALUE);
} while (0 != --i);
this.end = System.nanoTime();
}
}
@SuppressWarnings("unused")
public static class Listener {
@Handler
public void handleSync(Integer o1) {
}
@Handler(acceptVarargs=true)
public void handleSync(Object... o) {
}
}
}

View File

@ -1,4 +1,4 @@
package dorkbox.util.messagebus;
package dorkbox.util.messagebus.queuePerf;
import java.util.concurrent.LinkedBlockingQueue;

View File

@ -1,4 +1,4 @@
package dorkbox.util.messagebus;
package dorkbox.util.messagebus.queuePerf;
import java.util.concurrent.LinkedBlockingQueue;

View File

@ -1,4 +1,4 @@
package dorkbox.util.messagebus;
package dorkbox.util.messagebus.queuePerf;
import java.util.concurrent.LinkedBlockingQueue;

View File

@ -1,4 +1,4 @@
package dorkbox.util.messagebus;
package dorkbox.util.messagebus.queuePerf;
import java.util.concurrent.LinkedTransferQueue;

View File

@ -1,4 +1,4 @@
package dorkbox.util.messagebus;
package dorkbox.util.messagebus.queuePerf;
import java.util.concurrent.LinkedTransferQueue;

View File

@ -1,4 +1,4 @@
package dorkbox.util.messagebus;
package dorkbox.util.messagebus.queuePerf;
import java.util.concurrent.LinkedTransferQueue;

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.util.messagebus;
package dorkbox.util.messagebus.queuePerf;
import org.jctools.queues.MpmcArrayQueue;

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.util.messagebus;
package dorkbox.util.messagebus.queuePerf;
import org.jctools.queues.MpmcArrayQueue;

View File

@ -1,4 +1,4 @@
package dorkbox.util.messagebus;
package dorkbox.util.messagebus.queuePerf;
import dorkbox.util.messagebus.common.simpleq.MpmcMultiTransferArrayQueue;

View File

@ -1,4 +1,4 @@
package dorkbox.util.messagebus;
package dorkbox.util.messagebus.queuePerf;
import dorkbox.util.messagebus.common.simpleq.MpmcMultiTransferArrayQueue;
import dorkbox.util.messagebus.common.simpleq.MultiNode;

View File

@ -1,4 +1,4 @@
package dorkbox.util.messagebus;
package dorkbox.util.messagebus.queuePerf;
import dorkbox.util.messagebus.common.simpleq.MpmcMultiTransferArrayQueue;