Really fast, not sure why exactly
This commit is contained in:
parent
3db34cc7dd
commit
bd25415dff
|
@ -1,6 +1,7 @@
|
|||
package dorkbox.util.messagebus.subscription;
|
||||
|
||||
import com.esotericsoftware.reflectasm.MethodAccess;
|
||||
import dorkbox.util.messagebus.common.HashMapTree;
|
||||
import dorkbox.util.messagebus.common.MessageHandler;
|
||||
import dorkbox.util.messagebus.common.StrongConcurrentSetV8;
|
||||
import dorkbox.util.messagebus.dispatch.IHandlerInvocation;
|
||||
|
@ -9,8 +10,11 @@ import dorkbox.util.messagebus.dispatch.SynchronizedHandlerInvocation;
|
|||
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
|
||||
import org.omg.CORBA.BooleanHolder;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
|
@ -54,6 +58,10 @@ public class Subscription {
|
|||
this.invocation = invocation;
|
||||
}
|
||||
|
||||
public final MessageHandler getHandlerMetadata() {
|
||||
return handlerMetadata;
|
||||
}
|
||||
|
||||
public Class<?>[] getHandledMessageTypes() {
|
||||
return this.handlerMetadata.getHandledMessages();
|
||||
}
|
||||
|
@ -236,4 +244,99 @@ public class Subscription {
|
|||
Subscription other = (Subscription) obj;
|
||||
return this.ID == other.ID;
|
||||
}
|
||||
|
||||
// inside a write lock
|
||||
// also puts it into the correct map if it's not already there
|
||||
public Collection<Subscription> createPublicationSubscriptions(final Map<Class<?>, ArrayList<Subscription>> subsPerMessageSingle,
|
||||
final HashMapTree<Class<?>, ArrayList<Subscription>> subsPerMessageMulti,
|
||||
AtomicBoolean varArgPossibility, SubscriptionUtils utils) {
|
||||
|
||||
final Class<?>[] messageHandlerTypes = handlerMetadata.getHandledMessages();
|
||||
final int size = messageHandlerTypes.length;
|
||||
|
||||
// ConcurrentSet<Subscription> subsPerType;
|
||||
|
||||
// SubscriptionUtils utils = this.utils;
|
||||
Class<?> type0 = messageHandlerTypes[0];
|
||||
|
||||
switch (size) {
|
||||
case 1: {
|
||||
ArrayList<Subscription> subs = subsPerMessageSingle.get(type0);
|
||||
if (subs == null) {
|
||||
subs = new ArrayList<Subscription>();
|
||||
|
||||
boolean isArray = type0.isArray();
|
||||
if (isArray) {
|
||||
varArgPossibility.lazySet(true);
|
||||
}
|
||||
utils.cacheSuperClasses(type0);
|
||||
|
||||
subsPerMessageSingle.put(type0, subs);
|
||||
}
|
||||
|
||||
return subs;
|
||||
}
|
||||
case 2: {
|
||||
// the HashMapTree uses read/write locks, so it is only accessible one thread at a time
|
||||
// SubscriptionHolder subHolderSingle = this.subHolderSingle;
|
||||
// subsPerType = subHolderSingle.publish();
|
||||
//
|
||||
// Collection<Subscription> putIfAbsent = subsPerMessageMulti.putIfAbsent(subsPerType, type0, types[1]);
|
||||
// if (putIfAbsent != null) {
|
||||
// return putIfAbsent;
|
||||
// } else {
|
||||
// subHolderSingle.set(subHolderSingle.initialValue());
|
||||
//
|
||||
// // cache the super classes
|
||||
// utils.cacheSuperClasses(type0);
|
||||
// utils.cacheSuperClasses(types[1]);
|
||||
//
|
||||
// return subsPerType;
|
||||
// }
|
||||
}
|
||||
case 3: {
|
||||
// the HashMapTree uses read/write locks, so it is only accessible one thread at a time
|
||||
// SubscriptionHolder subHolderSingle = this.subHolderSingle;
|
||||
// subsPerType = subHolderSingle.publish();
|
||||
//
|
||||
// Collection<Subscription> putIfAbsent = subsPerMessageMulti.putIfAbsent(subsPerType, type0, types[1], types[2]);
|
||||
// if (putIfAbsent != null) {
|
||||
// return putIfAbsent;
|
||||
// } else {
|
||||
// subHolderSingle.set(subHolderSingle.initialValue());
|
||||
//
|
||||
// // cache the super classes
|
||||
// utils.cacheSuperClasses(type0);
|
||||
// utils.cacheSuperClasses(types[1]);
|
||||
// utils.cacheSuperClasses(types[2]);
|
||||
//
|
||||
// return subsPerType;
|
||||
// }
|
||||
}
|
||||
default: {
|
||||
// the HashMapTree uses read/write locks, so it is only accessible one thread at a time
|
||||
// SubscriptionHolder subHolderSingle = this.subHolderSingle;
|
||||
// subsPerType = subHolderSingle.publish();
|
||||
//
|
||||
// Collection<Subscription> putIfAbsent = subsPerMessageMulti.putIfAbsent(subsPerType, types);
|
||||
// if (putIfAbsent != null) {
|
||||
// return putIfAbsent;
|
||||
// } else {
|
||||
// subHolderSingle.set(subHolderSingle.initialValue());
|
||||
//
|
||||
// Class<?> c;
|
||||
// int length = types.length;
|
||||
// for (int i = 0; i < length; i++) {
|
||||
// c = types[i];
|
||||
//
|
||||
// // cache the super classes
|
||||
// utils.cacheSuperClasses(c);
|
||||
// }
|
||||
//
|
||||
// return subsPerType;
|
||||
// }
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -115,64 +115,72 @@ public class SubscriptionManager {
|
|||
|
||||
// the subscriptions from the map were null, so create them
|
||||
if (subscriptions == null) {
|
||||
// now write lock for the least expensive part. This is a deferred "double checked lock", but is necessary because
|
||||
// of the huge number of reads compared to writes.
|
||||
// it is important to note that this section CAN be repeated, however the write lock is gained before
|
||||
// anything 'permanent' is saved. This is so the time spent inside the writelock is minimized.
|
||||
|
||||
StampedLock lock = this.lock;
|
||||
long stamp = lock.writeLock();
|
||||
// Lock writeLock = this.lock.writeLock();
|
||||
// writeLock.lock();
|
||||
|
||||
ConcurrentMap<Class<?>, Subscription[]> subsPerListenerMap = this.subscriptionsPerListener;
|
||||
subscriptions = subsPerListenerMap.get(listenerClass);
|
||||
|
||||
// it was still null, so we actually have to create the rest of the subs
|
||||
if (subscriptions == null) {
|
||||
MessageHandler[] messageHandlers = MessageHandler.get(listenerClass);
|
||||
int handlersSize = messageHandlers.length;
|
||||
|
||||
// remember the class as non listening class if no handlers are found
|
||||
if (handlersSize == 0) {
|
||||
this.nonListeners.put(listenerClass, Boolean.TRUE);
|
||||
lock.unlockWrite(stamp);
|
||||
// writeLock.unlock();
|
||||
return;
|
||||
}
|
||||
|
||||
Map<Class<?>, ArrayList<Subscription>> subsPerMessageSingle = this.subscriptionsPerMessageSingle;
|
||||
HashMapTree<Class<?>, ArrayList<Subscription>> subsPerMessageMulti = this.subscriptionsPerMessageMulti;
|
||||
|
||||
ArrayList<Subscription> subsPerListener = new ArrayList<Subscription>(handlersSize);
|
||||
Collection<Subscription> subsForPublication = null;
|
||||
final Map<Class<?>, ArrayList<Subscription>> subsPerMessageSingle = this.subscriptionsPerMessageSingle;
|
||||
final HashMapTree<Class<?>, ArrayList<Subscription>> subsPerMessageMulti = this.subscriptionsPerMessageMulti;
|
||||
|
||||
final ArrayList<Subscription> subsPerListener = new ArrayList<Subscription>(handlersSize);
|
||||
Collection<Subscription> subsForPublication;
|
||||
|
||||
// create the subscription
|
||||
MessageHandler messageHandler;
|
||||
Subscription subscription;
|
||||
|
||||
for (int i = 0; i < handlersSize; i++) {
|
||||
messageHandler = messageHandlers[i];
|
||||
|
||||
// create the subscription
|
||||
Subscription subscription = new Subscription(messageHandler);
|
||||
subscription = new Subscription(messageHandler);
|
||||
subscription.subscribe(listener);
|
||||
|
||||
subsPerListener.add(subscription); // activates this sub for sub/unsub
|
||||
}
|
||||
|
||||
|
||||
// now write lock for the least expensive part. This is a deferred "double checked lock", but is necessary because
|
||||
// of the huge number of reads compared to writes.
|
||||
|
||||
StampedLock lock = this.lock;
|
||||
long stamp = lock.writeLock();
|
||||
|
||||
final ConcurrentMap<Class<?>, Subscription[]> subsPerListenerMap = this.subscriptionsPerListener;
|
||||
subscriptions = subsPerListenerMap.get(listenerClass);
|
||||
|
||||
// it was still null, so we actually have to create the rest of the subs
|
||||
if (subscriptions == null) {
|
||||
final AtomicBoolean varArgPossibility = this.varArgPossibility;
|
||||
final SubscriptionUtils utils = this.utils;
|
||||
|
||||
for (int i = 0; i < handlersSize; i++) {
|
||||
subscription = subsPerListener.get(i);
|
||||
|
||||
// now add this subscription to each of the handled types
|
||||
subsForPublication = getSubsForPublication(messageHandler, subsPerMessageSingle,
|
||||
subsPerMessageMulti);
|
||||
subsForPublication = subscription
|
||||
.createPublicationSubscriptions(subsPerMessageSingle, subsPerMessageMulti, varArgPossibility, utils);
|
||||
|
||||
//noinspection ConstantConditions
|
||||
subsForPublication.add(subscription); // activates this sub for publication
|
||||
}
|
||||
|
||||
subsPerListenerMap.put(listenerClass, subsPerListener.toArray(new Subscription[subsPerListener.size()]));
|
||||
lock.unlockWrite(stamp);
|
||||
// writeLock.unlock();
|
||||
|
||||
return;
|
||||
}
|
||||
else {
|
||||
lock.unlockWrite(stamp);
|
||||
// writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -186,100 +194,6 @@ public class SubscriptionManager {
|
|||
}
|
||||
}
|
||||
|
||||
// inside a write lock
|
||||
// also puts it into the correct map if it's not already there
|
||||
private Collection<Subscription> getSubsForPublication(final MessageHandler messageHandler,
|
||||
final Map<Class<?>, ArrayList<Subscription>> subsPerMessageSingle,
|
||||
final HashMapTree<Class<?>, ArrayList<Subscription>> subsPerMessageMulti) {
|
||||
|
||||
final Class<?>[] messageHandlerTypes = messageHandler.getHandledMessages();
|
||||
final int size = messageHandlerTypes.length;
|
||||
|
||||
// ConcurrentSet<Subscription> subsPerType;
|
||||
|
||||
// SubscriptionUtils utils = this.utils;
|
||||
Class<?> type0 = messageHandlerTypes[0];
|
||||
|
||||
switch (size) {
|
||||
case 1: {
|
||||
ArrayList<Subscription> subs = subsPerMessageSingle.get(type0);
|
||||
if (subs == null) {
|
||||
subs = new ArrayList<Subscription>();
|
||||
|
||||
boolean isArray = type0.isArray();
|
||||
if (isArray) {
|
||||
varArgPossibility.lazySet(true);
|
||||
}
|
||||
utils.cacheSuperClasses(type0);
|
||||
subsPerMessageSingle.put(type0, subs);
|
||||
}
|
||||
|
||||
return subs;
|
||||
}
|
||||
case 2: {
|
||||
// the HashMapTree uses read/write locks, so it is only accessible one thread at a time
|
||||
// SubscriptionHolder subHolderSingle = this.subHolderSingle;
|
||||
// subsPerType = subHolderSingle.publish();
|
||||
//
|
||||
// Collection<Subscription> putIfAbsent = subsPerMessageMulti.putIfAbsent(subsPerType, type0, types[1]);
|
||||
// if (putIfAbsent != null) {
|
||||
// return putIfAbsent;
|
||||
// } else {
|
||||
// subHolderSingle.set(subHolderSingle.initialValue());
|
||||
//
|
||||
// // cache the super classes
|
||||
// utils.cacheSuperClasses(type0);
|
||||
// utils.cacheSuperClasses(types[1]);
|
||||
//
|
||||
// return subsPerType;
|
||||
// }
|
||||
}
|
||||
case 3: {
|
||||
// the HashMapTree uses read/write locks, so it is only accessible one thread at a time
|
||||
// SubscriptionHolder subHolderSingle = this.subHolderSingle;
|
||||
// subsPerType = subHolderSingle.publish();
|
||||
//
|
||||
// Collection<Subscription> putIfAbsent = subsPerMessageMulti.putIfAbsent(subsPerType, type0, types[1], types[2]);
|
||||
// if (putIfAbsent != null) {
|
||||
// return putIfAbsent;
|
||||
// } else {
|
||||
// subHolderSingle.set(subHolderSingle.initialValue());
|
||||
//
|
||||
// // cache the super classes
|
||||
// utils.cacheSuperClasses(type0);
|
||||
// utils.cacheSuperClasses(types[1]);
|
||||
// utils.cacheSuperClasses(types[2]);
|
||||
//
|
||||
// return subsPerType;
|
||||
// }
|
||||
}
|
||||
default: {
|
||||
// the HashMapTree uses read/write locks, so it is only accessible one thread at a time
|
||||
// SubscriptionHolder subHolderSingle = this.subHolderSingle;
|
||||
// subsPerType = subHolderSingle.publish();
|
||||
//
|
||||
// Collection<Subscription> putIfAbsent = subsPerMessageMulti.putIfAbsent(subsPerType, types);
|
||||
// if (putIfAbsent != null) {
|
||||
// return putIfAbsent;
|
||||
// } else {
|
||||
// subHolderSingle.set(subHolderSingle.initialValue());
|
||||
//
|
||||
// Class<?> c;
|
||||
// int length = types.length;
|
||||
// for (int i = 0; i < length; i++) {
|
||||
// c = types[i];
|
||||
//
|
||||
// // cache the super classes
|
||||
// utils.cacheSuperClasses(c);
|
||||
// }
|
||||
//
|
||||
// return subsPerType;
|
||||
// }
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public final void unsubscribe(Object listener) {
|
||||
if (listener == null) {
|
||||
return;
|
||||
|
@ -305,6 +219,7 @@ public class SubscriptionManager {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
private void clearConcurrentCollections() {
|
||||
this.utils.clear();
|
||||
this.varArgUtils.clear();
|
||||
|
|
Loading…
Reference in New Issue
Block a user