WIP - getting better performance, using non-blocking. around 300ms for 2million, 550 for 4million
This commit is contained in:
parent
120054afd4
commit
86d04c899c
|
@ -131,13 +131,9 @@ public class MultiMBassador implements IMessageBus {
|
|||
SubscriptionManager manager = this.subscriptionManager;
|
||||
|
||||
Class<?> messageClass = message.getClass();
|
||||
// manager.readLock();
|
||||
|
||||
|
||||
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass);
|
||||
Collection<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass);
|
||||
Collection<Subscription> varArgs = manager.getVarArgs(messageClass);
|
||||
// manager.readUnLock();
|
||||
|
||||
|
||||
// Run subscriptions
|
||||
if (subscriptions != null && !subscriptions.isEmpty()) {
|
||||
for (Subscription sub : subscriptions) {
|
||||
|
@ -145,10 +141,8 @@ public class MultiMBassador implements IMessageBus {
|
|||
sub.publishToSubscription(this, message);
|
||||
}
|
||||
} else {
|
||||
// manager.readLock();
|
||||
// Dead Event must EXACTLY MATCH (no subclasses or varargs permitted)
|
||||
Collection<Subscription> deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
|
||||
// manager.readUnLock();
|
||||
|
||||
if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) {
|
||||
DeadMessage deadMessage = new DeadMessage(message);
|
||||
|
@ -161,7 +155,7 @@ public class MultiMBassador implements IMessageBus {
|
|||
}
|
||||
|
||||
|
||||
|
||||
Collection<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass);
|
||||
// now get superClasses
|
||||
if (superSubscriptions != null) {
|
||||
for (Subscription sub : superSubscriptions) {
|
||||
|
@ -169,28 +163,6 @@ public class MultiMBassador implements IMessageBus {
|
|||
sub.publishToSubscription(this, message);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
// now get varargs
|
||||
if (varArgs != null && !varArgs.isEmpty()) {
|
||||
// messy, but the ONLY way to do it.
|
||||
Object[] vararg = null;
|
||||
|
||||
for (Subscription sub : varArgs) {
|
||||
if (sub.isVarArg()) {
|
||||
if (vararg == null) {
|
||||
vararg = (Object[]) Array.newInstance(messageClass, 1);
|
||||
vararg[0] = message;
|
||||
|
||||
Object[] newInstance = new Object[1];
|
||||
newInstance[0] = vararg;
|
||||
vararg = newInstance;
|
||||
}
|
||||
sub.publishToSubscription(this, vararg);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("null")
|
||||
|
|
|
@ -228,7 +228,7 @@ public class IdentityObjectTree<KEY, VALUE> {
|
|||
IdentityObjectTree<KEY, VALUE> objectTree;
|
||||
|
||||
if (this.children == null) {
|
||||
this.children = new ConcurrentHashMap<KEY, IdentityObjectTree<KEY, VALUE>>(4, .9f, 1);
|
||||
this.children = new ConcurrentHashMap<KEY, IdentityObjectTree<KEY, VALUE>>(2, .8f, 1);
|
||||
|
||||
// might as well add too
|
||||
objectTree = new IdentityObjectTree<KEY, VALUE>();
|
||||
|
|
|
@ -33,8 +33,6 @@ public class MessageHandler {
|
|||
private final boolean acceptsSubtypes;
|
||||
private final MessageListener listenerConfig;
|
||||
|
||||
// if ONE of the handled messages is of type array, then we configure it to ALSO accept var args!
|
||||
private final boolean isVarArg;
|
||||
private final boolean isSynchronized;
|
||||
|
||||
public MessageHandler(Method handler, Handler handlerConfig, MessageListener listenerMetadata){
|
||||
|
@ -53,10 +51,6 @@ public class MessageHandler {
|
|||
this.listenerConfig = listenerMetadata;
|
||||
this.isSynchronized = ReflectionUtils.getAnnotation(handler, Synchronized.class) != null;
|
||||
this.handledMessages = handledMessages;
|
||||
|
||||
|
||||
// if ONE of the handled messages is of type array, then we configure it to ALSO accept var args!
|
||||
this.isVarArg = handledMessages.length == 1 && handledMessages[0].isArray();
|
||||
}
|
||||
|
||||
public boolean isSynchronized(){
|
||||
|
@ -75,15 +69,6 @@ public class MessageHandler {
|
|||
return this.handledMessages;
|
||||
}
|
||||
|
||||
/*
|
||||
* @author dorkbox, llc
|
||||
* Date: 2/2/15
|
||||
*/
|
||||
/** Check if this handler permits sending objects as a VarArg (variable argument) */
|
||||
public boolean isVarArg() {
|
||||
return this.isVarArg;
|
||||
}
|
||||
|
||||
public boolean acceptsSubtypes() {
|
||||
return this.acceptsSubtypes;
|
||||
}
|
||||
|
@ -93,7 +78,6 @@ public class MessageHandler {
|
|||
final int prime = 31;
|
||||
int result = 1;
|
||||
result = prime * result + (this.acceptsSubtypes ? 1231 : 1237);
|
||||
result = prime * result + (this.isVarArg ? 1231 : 1237);
|
||||
result = prime * result + Arrays.hashCode(this.handledMessages);
|
||||
result = prime * result + (this.handler == null ? 0 : this.handler.hashCode());
|
||||
result = prime * result + (this.isSynchronized ? 1231 : 1237);
|
||||
|
@ -115,9 +99,6 @@ public class MessageHandler {
|
|||
if (this.acceptsSubtypes != other.acceptsSubtypes) {
|
||||
return false;
|
||||
}
|
||||
if (this.isVarArg != other.isVarArg) {
|
||||
return false;
|
||||
}
|
||||
if (!Arrays.equals(this.handledMessages, other.handledMessages)) {
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -57,11 +57,6 @@ public class Subscription {
|
|||
return this.handlerMetadata.isFromListener(listener);
|
||||
}
|
||||
|
||||
/** Check if this subscription permits sending objects as a VarArg (variable argument) */
|
||||
public boolean isVarArg() {
|
||||
return this.handlerMetadata.isVarArg();
|
||||
}
|
||||
|
||||
public boolean acceptsSubtypes() {
|
||||
return this.handlerMetadata.acceptsSubtypes();
|
||||
}
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
package net.engio.mbassy.multi.subscription;
|
||||
|
||||
import java.lang.reflect.Array;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
|
@ -52,9 +51,6 @@ public class SubscriptionManager {
|
|||
|
||||
private final Object holder = new Object[0];
|
||||
|
||||
// remember classes that can have VarArg casting performed
|
||||
private final Map<Class<?>, Class<?>> varArgClasses;
|
||||
|
||||
private final Map<Class<?>, Collection<Class<?>>> superClassesCache;
|
||||
|
||||
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
|
||||
|
@ -80,7 +76,6 @@ public class SubscriptionManager {
|
|||
// // only used during SUB/UNSUB
|
||||
// this.subscriptionsPerListener = new IdentityHashMap<Class<?>, Collection<Subscription>>(4);
|
||||
//
|
||||
// this.varArgClasses = new IdentityHashMap<Class<?>, Class<?>>(8);
|
||||
// this.superClassesCache = new IdentityHashMap<Class<?>, Collection<Class<?>>>(8);
|
||||
|
||||
this.subscriptionsPerMessageSingle = new ConcurrentHashMap<Class<?>, Collection<Subscription>>(4, this.LOAD_FACTOR, this.MAP_STRIPING);
|
||||
|
@ -89,12 +84,17 @@ public class SubscriptionManager {
|
|||
// only used during SUB/UNSUB
|
||||
this.subscriptionsPerListener = new ConcurrentHashMap<Class<?>, Collection<Subscription>>(4, this.LOAD_FACTOR, 1);
|
||||
|
||||
this.varArgClasses = new ConcurrentHashMap<Class<?>, Class<?>>(8, this.LOAD_FACTOR, this.MAP_STRIPING);
|
||||
this.superClassesCache = new ConcurrentHashMap<Class<?>, Collection<Class<?>>>(8, this.LOAD_FACTOR, this.MAP_STRIPING);
|
||||
|
||||
this.nonListeners = new ConcurrentHashMap<Class<?>, Object>(4, this.LOAD_FACTOR, this.MAP_STRIPING);
|
||||
}
|
||||
|
||||
private final void resetSuperClassSubs() {
|
||||
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
|
||||
// it's a hit on SUB/UNSUB, but improves performance on handlers
|
||||
this.superClassSubscriptions = new ConcurrentHashMap<Class<?>, Collection<Subscription>>(8, this.LOAD_FACTOR, this.MAP_STRIPING);
|
||||
}
|
||||
|
||||
public void unsubscribe(Object listener) {
|
||||
if (listener == null) {
|
||||
return;
|
||||
|
@ -132,9 +132,7 @@ public class SubscriptionManager {
|
|||
// remove element
|
||||
this.subscriptionsPerMessageSingle.remove(clazz);
|
||||
|
||||
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
|
||||
// it's a hit on SUB/UNSUB, but improves performance on handlers
|
||||
this.superClassSubscriptions = new ConcurrentHashMap<Class<?>, Collection<Subscription>>(8, this.LOAD_FACTOR, this.MAP_STRIPING);
|
||||
resetSuperClassSubs();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
@ -199,10 +197,6 @@ public class SubscriptionManager {
|
|||
UPDATE.lock();
|
||||
subscriptions = this.subscriptionsPerListener.get(listenerClass);
|
||||
|
||||
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
|
||||
// it's a hit on SUB/UNSUB, but improves performance on handlers
|
||||
this.superClassSubscriptions = new ConcurrentHashMap<Class<?>, Collection<Subscription>>(8, this.LOAD_FACTOR, this.MAP_STRIPING);
|
||||
|
||||
if (subscriptions != null) {
|
||||
// subscriptions already exist and must only be updated
|
||||
for (Subscription subscription : subscriptions) {
|
||||
|
@ -223,6 +217,8 @@ public class SubscriptionManager {
|
|||
|
||||
subscriptions = new StrongConcurrentSet<Subscription>(8, this.LOAD_FACTOR);
|
||||
|
||||
resetSuperClassSubs();
|
||||
|
||||
// create NEW subscriptions for all detected message handlers
|
||||
for (MessageHandler messageHandler : messageHandlers) {
|
||||
// create the subscription
|
||||
|
@ -249,26 +245,7 @@ public class SubscriptionManager {
|
|||
subs.add(subscription);
|
||||
}
|
||||
|
||||
// have to save our the VarArg class types, because creating var-arg arrays for objects is expensive
|
||||
if (subscription.isVarArg()) {
|
||||
Class<?> componentType = clazz.getComponentType();
|
||||
this.varArgClasses.put(componentType, clazz);
|
||||
|
||||
// since it's vararg, this means that it's an ARRAY, so we ALSO
|
||||
// have to add the component classes of the array
|
||||
if (acceptsSubtypes) {
|
||||
Collection<Class<?>> superClasses = setupSuperClassCache(componentType);
|
||||
|
||||
// have to setup each vararg chain
|
||||
for (Class<?> superClass : superClasses) {
|
||||
if (!this.varArgClasses.containsKey(superClass)) {
|
||||
// this is expensive, so we check the cache first
|
||||
Class<?> c2 = Array.newInstance(superClass, 1).getClass();
|
||||
this.varArgClasses.put(superClass, c2);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if (acceptsSubtypes) {
|
||||
setupSuperClassCache(clazz);
|
||||
}
|
||||
}
|
||||
|
@ -357,7 +334,7 @@ public class SubscriptionManager {
|
|||
public Collection<Subscription> getSuperSubscriptions(Class<?> superType) {
|
||||
Map<Class<?>, Collection<Subscription>> superClassSubs = this.superClassSubscriptions;
|
||||
if (superClassSubs == null) {
|
||||
// we haven't created it yet
|
||||
// we haven't created it yet (via subscribe)
|
||||
return null;
|
||||
}
|
||||
|
||||
|
@ -523,57 +500,6 @@ public class SubscriptionManager {
|
|||
return types;
|
||||
}
|
||||
|
||||
// must be protected by read lock
|
||||
public Collection<Subscription> getVarArgs(Class<?> clazz) {
|
||||
Class<?> varArgClass = this.varArgClasses.get(clazz);
|
||||
if (varArgClass != null) {
|
||||
return this.subscriptionsPerMessageSingle.get(varArgClass);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
// must be protected by read lock
|
||||
public Collection<Subscription> getVarArgs(Class<?> clazz1, Class<?> clazz2) {
|
||||
if (clazz1 == clazz2) {
|
||||
Class<?> varArgClass = this.varArgClasses.get(clazz1);
|
||||
if (varArgClass != null) {
|
||||
return this.subscriptionsPerMessageSingle.get(varArgClass);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
// must be protected by read lock
|
||||
public Collection<Subscription> getVarArgs(Class<?> clazz1, Class<?> clazz2, Class<?> clazz3) {
|
||||
if (clazz1 == clazz2 && clazz2 == clazz3) {
|
||||
Class<?> varArgClass = this.varArgClasses.get(clazz1);
|
||||
if (varArgClass != null) {
|
||||
return this.subscriptionsPerMessageSingle.get(varArgClass);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
// must be protected by read lock
|
||||
public Collection<Subscription> getVarArgs(Class<?>... classes) {
|
||||
// classes IS ALREADY ALL SAME TYPE!
|
||||
Class<?> firstClass = classes[0];
|
||||
|
||||
Class<?> varArgClass = this.varArgClasses.get(firstClass);
|
||||
if (varArgClass != null) {
|
||||
return this.subscriptionsPerMessageSingle.get(varArgClass);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public void readLock() {
|
||||
this.LOCK.readLock().lock();
|
||||
}
|
||||
|
||||
public void readUnLock() {
|
||||
this.LOCK.readLock().unlock();
|
||||
}
|
||||
|
||||
public static class SuperClassIterator implements Iterator<Class<?>> {
|
||||
private final Iterator<Class<?>> iterator;
|
||||
private Class<?> clazz;
|
||||
|
|
Loading…
Reference in New Issue
Block a user