WIP, converting parts to array for sub/unsub

This commit is contained in:
nathan 2015-05-30 03:27:33 +02:00
parent df14de3252
commit bbade8aa72
9 changed files with 437 additions and 491 deletions

View File

@ -199,24 +199,25 @@ public class MultiMBassador implements IMessageBus {
Subscription[] subscriptions;
Subscription sub;
subscriptions = manager.getSubscriptionsByMessageType(messageClass); // can return null
if (this.forceExactMatches) {
subscriptions = manager.getSubscriptionsForcedExact(messageClass);
} else {
subscriptions = manager.getSubscriptions(messageClass);
}
// Run subscriptions
if (subscriptions != null) {
int length = subscriptions.length;
if (length > 0) {
for (int i=0;i<length;i++) {
sub = subscriptions[i];
int length = subscriptions.length;
if (length > 0) {
for (int i=0;i<length;i++) {
sub = subscriptions[i];
sub.publish(message);
}
subsPublished = true;
sub.publish(message);
}
subsPublished = true;
}
if (!this.forceExactMatches) {
// if (!this.forceExactMatches) {
// Subscription[] superSubscriptions = manager.getSuperSubscriptions(messageClass); // NOT return null
// // now get superClasses
// int length = superSubscriptions.length;
@ -273,21 +274,19 @@ public class MultiMBassador implements IMessageBus {
// }
// }
// }
}
// }
if (!subsPublished) {
// Dead Event must EXACTLY MATCH (no subclasses)
Subscription[] deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
if (deadSubscriptions != null) {
int length = deadSubscriptions.length;
if (length > 0) {
DeadMessage deadMessage = new DeadMessage(message);
Subscription[] deadSubscriptions = manager.getSubscriptionsForcedExact(DeadMessage.class);
length = deadSubscriptions.length;
if (length > 0) {
DeadMessage deadMessage = new DeadMessage(message);
for (int i=0;i<length;i++) {
sub = deadSubscriptions[i];
for (int i=0;i<length;i++) {
sub = deadSubscriptions[i];
sub.publish(deadMessage);
}
sub.publish(deadMessage);
}
}
}

View File

@ -2,9 +2,11 @@ package dorkbox.util.messagebus;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import dorkbox.util.messagebus.common.ConcurrentHashMapV8;
import dorkbox.util.messagebus.common.HashMapTree;
@ -12,8 +14,6 @@ import dorkbox.util.messagebus.common.SubscriptionUtils;
import dorkbox.util.messagebus.common.VarArgPossibility;
import dorkbox.util.messagebus.common.VarArgUtils;
import dorkbox.util.messagebus.common.thread.ConcurrentSet;
import dorkbox.util.messagebus.common.thread.StampedLock;
import dorkbox.util.messagebus.common.thread.SubscriptionHolder;
import dorkbox.util.messagebus.listener.MessageHandler;
import dorkbox.util.messagebus.listener.MetadataReader;
import dorkbox.util.messagebus.subscription.Subscription;
@ -61,7 +61,7 @@ public class SubscriptionManager {
// this map provides fast access for subscribing and unsubscribing
// write access is synchronized and happens very infrequently
// once a collection of subscriptions is stored it does not change
private final ConcurrentMap<Class<?>, ConcurrentSet<Subscription>> subscriptionsPerListener;
private final ConcurrentMap<Class<?>, Subscription[]> subscriptionsPerListener;
private final VarArgUtils varArgUtils;
@ -70,12 +70,8 @@ public class SubscriptionManager {
private final int STRIPE_SIZE;
private final StampedLock lock = new StampedLock();
// private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final SubscriptionHolder subHolderSingle;
private final SubscriptionHolder subHolderConcurrent;
// private final StampedLock lock = new StampedLock();
private final ReadWriteLock lock = new ReentrantReadWriteLock();
SubscriptionManager(int numberOfThreads) {
this.STRIPE_SIZE = numberOfThreads;
@ -90,7 +86,7 @@ public class SubscriptionManager {
this.subscriptionsPerMessageMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>(4, loadFactor);
// only used during SUB/UNSUB
this.subscriptionsPerListener = new ConcurrentHashMapV8<Class<?>, ConcurrentSet<Subscription>>(4, loadFactor, this.STRIPE_SIZE);
this.subscriptionsPerListener = new ConcurrentHashMapV8<Class<?>, Subscription[]>();
}
this.utils = new SubscriptionUtils(this.subscriptionsPerMessageSingle, this.subscriptionsPerMessageMulti, loadFactor, numberOfThreads);
@ -98,9 +94,6 @@ public class SubscriptionManager {
// var arg subscriptions keep track of which subscriptions can handle varArgs. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but improves performance of handlers
this.varArgUtils = new VarArgUtils(this.utils, this.subscriptionsPerMessageSingle, loadFactor, this.STRIPE_SIZE);
this.subHolderSingle = new SubscriptionHolder();
this.subHolderConcurrent = new SubscriptionHolder();
}
public void shutdown() {
@ -133,124 +126,92 @@ public class SubscriptionManager {
// these are concurrent collections
clearConcurrentCollections();
Subscription[] subscriptions = getListenerSubs(listenerClass);
ConcurrentMap<Class<?>, ConcurrentSet<Subscription>> subsPerListener2 = this.subscriptionsPerListener;
ConcurrentSet<Subscription> subsPerListener;
// the subscriptions from the map were null, so create them
if (subscriptions == null) {
// now write lock for the least expensive part. This is a deferred "double checked lock", but is necessary because
// of the huge number of reads compared to writes.
Lock writeLock = this.lock.writeLock();
writeLock.lock();
// no point in locking everything. We lock on the class object being subscribed, since that is as coarse as we can go.
// the listenerClass is GUARANTEED to be unique and the same object, per classloader. We do NOT LOCK for visibility,
// but for concurrency because there are race conditions here if we don't.
StampedLock lock = this.lock;
long stamp = lock.writeLock();
// Lock writeLock = this.lock.writeLock();
// writeLock.lock();
ConcurrentMap<Class<?>, Subscription[]> subsPerListener2 = this.subscriptionsPerListener;
subscriptions = subsPerListener2.get(listenerClass);
subsPerListener = subsPerListener2.get(listenerClass);
// it was still null, so we actually have to create the rest of the subs
if (subscriptions == null) {
MessageHandler[] messageHandlers = MessageHandler.get(listenerClass);
int handlersSize = messageHandlers.length;
if (subsPerListener != null) {
// subscriptions already exist and must only be updated
Iterator<Subscription> iterator;
Subscription sub;
// remember the class as non listening class if no handlers are found
if (handlersSize == 0) {
this.nonListeners.put(listenerClass, Boolean.TRUE);
writeLock.unlock();
return;
}
for (iterator = subsPerListener.iterator(); iterator.hasNext();) {
sub = iterator.next();
sub.subscribe(listener);
}
ArrayList<Subscription> subsPerListener = new ArrayList<Subscription>();
Collection<Subscription> subsForPublication = null;
this.lock.unlockWrite(stamp);
// writeLock.unlock();
return;
}
VarArgPossibility varArgPossibility = this.varArgPossibility;
Map<Class<?>, ArrayList<Subscription>> subsPerMessageSingle = this.subscriptionsPerMessageSingle;
HashMapTree<Class<?>, ArrayList<Subscription>> subsPerMessageMulti = this.subscriptionsPerMessageMulti;
// subsPerListener == null, so we now enter an exclusive write lock and double check.
long origStamp = stamp;
if ((stamp = this.lock.tryConvertToWriteLock(stamp)) == 0) {
this.lock.unlockRead(origStamp);
stamp = this.lock.writeLock();
}
subsPerListener = subsPerListener2.get(listenerClass);
if (subsPerListener != null) {
// subscriptions already exist and must only be updated
Iterator<Subscription> iterator;
Subscription sub;
for (iterator = subsPerListener.iterator(); iterator.hasNext();) {
sub = iterator.next();
sub.subscribe(listener);
}
this.lock.unlockWrite(stamp);
// writeLock.unlock();
return;
}
// subsPerListener == null, which means we really do have to create it.
// a listener is subscribed for the first time
Collection<MessageHandler> messageHandlers = SubscriptionManager.metadataReader.getMessageListener(listenerClass, LOAD_FACTOR, this.STRIPE_SIZE).getHandlers();
int handlersSize = messageHandlers.size();
if (handlersSize == 0) {
// remember the class as non listening class if no handlers are found
this.nonListeners.put(listenerClass, Boolean.TRUE);
this.lock.unlockWrite(stamp);
// writeLock.unlock();
return;
} else {
subsPerListener = new ConcurrentSet<Subscription>(16, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
VarArgPossibility varArgPossibility = this.varArgPossibility;
Map<Class<?>, ArrayList<Subscription>> subsPerMessageSingle = this.subscriptionsPerMessageSingle;
HashMapTree<Class<?>, ArrayList<Subscription>> subsPerMessageMulti = this.subscriptionsPerMessageMulti;
Iterator<MessageHandler> iterator;
MessageHandler messageHandler;
Collection<Subscription> subsForPublication = null;
for (iterator = messageHandlers.iterator(); iterator.hasNext();) {
messageHandler = iterator.next();
// now add this subscription to each of the handled types
// this can safely be called concurrently
subsForPublication = getSubsForPublication(messageHandler, subsPerMessageSingle, subsPerMessageMulti, varArgPossibility);
// create the subscription
Subscription subscription = new Subscription(messageHandler);
subscription.subscribe(listener);
MessageHandler messageHandler;
subsPerListener.add(subscription); // activates this sub for sub/unsub
subsForPublication.add(subscription); // activates this sub for publication
for (int i=0;i<handlersSize;i++) {
messageHandler = messageHandlers[i];
// create the subscription
Subscription subscription = new Subscription(messageHandler);
subscription.subscribe(listener);
// now add this subscription to each of the handled types
subsForPublication = getSubsForPublication(messageHandler.getHandledMessages(), subsPerMessageSingle, subsPerMessageMulti, varArgPossibility);
subsPerListener.add(subscription); // activates this sub for sub/unsub
subsForPublication.add(subscription); // activates this sub for publication
}
subsPerListener2.put(listenerClass, subsPerListener.toArray(new Subscription[subsPerListener.size()]));
writeLock.unlock();
return;
} else {
writeLock.unlock();
}
}
subsPerListener2.put(listenerClass, subsPerListener);
// subscriptions already exist and must only be updated
// only get here if our single-check was OK, or our double-check was OK
Subscription subscription;
this.lock.unlockWrite(stamp);
// writeLock.unlock();
for (int i=0;i<subscriptions.length;i++) {
subscription = subscriptions[i];
subscription.subscribe(listener);
}
}
// inside a write lock
private final Collection<Subscription> getSubsForPublication(MessageHandler messageHandler,
Map<Class<?>, ArrayList<Subscription>> subsPerMessageSingle,
HashMapTree<Class<?>, ArrayList<Subscription>> subsPerMessageMulti,
VarArgPossibility varArgPossibility) {
private final Collection<Subscription> getSubsForPublication(final Class<?>[] messageHandlerTypes,
final Map<Class<?>, ArrayList<Subscription>> subsPerMessageSingle,
final HashMapTree<Class<?>, ArrayList<Subscription>> subsPerMessageMulti,
final VarArgPossibility varArgPossibility) {
Class<?>[] types = messageHandler.getHandledMessages();
int size = types.length;
final int size = messageHandlerTypes.length;
// ConcurrentSet<Subscription> subsPerType;
SubscriptionUtils utils = this.utils;
Class<?> type0 = types[0];
Class<?> type0 = messageHandlerTypes[0];
switch (size) {
case 1: {
ArrayList<Subscription> subs = subsPerMessageSingle.get(type0);
if (subs == null) {
subs = new ArrayList<>(8);
subs = new ArrayList<Subscription>(8);
boolean isArray = utils.isArray(type0);
if (isArray) {
@ -341,26 +302,17 @@ public class SubscriptionManager {
}
// these are concurrent collections
// clearConcurrentCollections();
clearConcurrentCollections();
long stamp = this.lock.writeLock();
// Lock writeLock = this.lock.writeLock();
// writeLock.lock();
Collection<Subscription> subscriptions = this.subscriptionsPerListener.get(listenerClass);
Subscription[] subscriptions = getListenerSubs(listenerClass);
if (subscriptions != null) {
Iterator<Subscription> iterator;
Subscription sub;
Subscription subscription;
for (iterator = subscriptions.iterator(); iterator.hasNext();) {
sub = iterator.next();
sub.unsubscribe(listener);
for (int i=0;i<subscriptions.length;i++) {
subscription = subscriptions[i];
subscription.unsubscribe(listener);
}
}
this.lock.unlockWrite(stamp);
// writeLock.unlock();
}
private void clearConcurrentCollections() {
@ -368,60 +320,122 @@ public class SubscriptionManager {
this.varArgUtils.clear();
}
// CAN RETURN NULL
public final Subscription[] getSubscriptionsByMessageType(Class<?> messageType) {
Collection<Subscription> collection;
Subscription[] subscriptions = null;
private final Subscription[] getListenerSubs(Class<?> listenerClass) {
Subscription[] subscriptions;
long stamp = this.lock.readLock();
// Lock writeLock = this.lock.readLock();
// writeLock.lock();
try {
collection = this.subscriptionsPerMessageSingle.get(messageType);
if (collection != null) {
subscriptions = collection.toArray(EMPTY);
}
}
finally {
this.lock.unlockRead(stamp);
// writeLock.unlock();
}
// long stamp = this.lock.tryOptimisticRead(); // non blocking
//
// collection = this.subscriptionsPerMessageSingle.get(messageType);
// if (collection != null) {
//// subscriptions = new ArrayDeque<>(collection);
// subscriptions = new ArrayList<>(collection);
//// subscriptions = new LinkedList<>();
//// subscriptions = new TreeSet<Subscription>(SubscriptionByPriorityDesc);
//
//// subscriptions.addAll(collection);
// }
//
// if (!this.lock.validate(stamp)) { // if a write occurred, try again with a read lock
// stamp = this.lock.readLock();
// try {
// collection = this.subscriptionsPerMessageSingle.get(messageType);
// if (collection != null) {
//// subscriptions = new ArrayDeque<>(collection);
// subscriptions = new ArrayList<>(collection);
//// subscriptions = new LinkedList<>();
//// subscriptions = new TreeSet<Subscription>(SubscriptionByPriorityDesc);
//
//// subscriptions.addAll(collection);
// }
// }
// finally {
// this.lock.unlockRead(stamp);
// }
// }
Lock readLock = this.lock.readLock();
readLock.lock();
subscriptions = this.subscriptionsPerListener.get(listenerClass);
readLock.unlock();
return subscriptions;
}
// retrieves all of the appropriate subscriptions for the message type
public final Subscription[] getSubscriptionsForcedExact(final Class<?> messageClass) {
ArrayList<Subscription> collection;
Subscription[] subscriptions;
Lock readLock = this.lock.readLock();
readLock.lock();
collection = this.subscriptionsPerMessageSingle.get(messageClass);
if (collection != null) {
subscriptions = collection.toArray(new Subscription[collection.size()]);
} else {
subscriptions = EMPTY;
}
readLock.unlock();
return subscriptions;
}
// never return null
public final Subscription[] getSubscriptions(final Class<?> messageClass) {
ArrayList<Subscription> collection;
Subscription[] subscriptions = null;
Lock readLock = this.lock.readLock();
readLock.lock();
collection = this.subscriptionsPerMessageSingle.get(messageClass);
if (collection != null) {
collection = new ArrayList<Subscription>(collection);
// now get superClasses
ArrayList<Subscription> superSubscriptions = this.utils.getSuperSubscriptions(messageClass); // NOT return null
collection.addAll(superSubscriptions);
} else {
// now get superClasses
collection = this.utils.getSuperSubscriptions(messageClass); // NOT return null
}
subscriptions = collection.toArray(new Subscription[collection.size()]);
readLock.unlock();
return subscriptions;
}
// CAN RETURN NULL
// public final Subscription[] getSubscriptionsByMessageType(final Class<?> messageType) {
// Collection<Subscription> collection;
// Subscription[] subscriptions = null;
//
//// long stamp = this.lock.readLock();
// Lock readLock = this.lock.readLock();
// readLock.lock();
//
// try {
// collection = this.subscriptionsPerMessageSingle.get(messageType);
// if (collection != null) {
// subscriptions = collection.toArray(EMPTY);
// }
// }
// finally {
//// this.lock.unlockRead(stamp);
// readLock.unlock();
// }
//
//
//// long stamp = this.lock.tryOptimisticRead(); // non blocking
////
//// collection = this.subscriptionsPerMessageSingle.get(messageType);
//// if (collection != null) {
////// subscriptions = new ArrayDeque<>(collection);
//// subscriptions = new ArrayList<>(collection);
////// subscriptions = new LinkedList<>();
////// subscriptions = new TreeSet<Subscription>(SubscriptionByPriorityDesc);
////
////// subscriptions.addAll(collection);
//// }
////
//// if (!this.lock.validate(stamp)) { // if a write occurred, try again with a read lock
//// stamp = this.lock.readLock();
//// try {
//// collection = this.subscriptionsPerMessageSingle.get(messageType);
//// if (collection != null) {
////// subscriptions = new ArrayDeque<>(collection);
//// subscriptions = new ArrayList<>(collection);
////// subscriptions = new LinkedList<>();
////// subscriptions = new TreeSet<Subscription>(SubscriptionByPriorityDesc);
////
////// subscriptions.addAll(collection);
//// }
//// }
//// finally {
//// this.lock.unlockRead(stamp);
//// }
//// }
//
// return subscriptions;
// }
// public static final Comparator<Subscription> SubscriptionByPriorityDesc = new Comparator<Subscription>() {
// @Override
// public int compare(Subscription o1, Subscription o2) {
@ -483,11 +497,11 @@ public class SubscriptionManager {
}
// CAN NOT RETURN NULL
// ALSO checks to see if the superClass accepts subtypes.
public final Subscription[] getSuperSubscriptions(Class<?> superType) {
return this.utils.getSuperSubscriptions(superType);
}
// // CAN NOT RETURN NULL
// // ALSO checks to see if the superClass accepts subtypes.
// public final Subscription[] getSuperSubscriptions(Class<?> superType) {
// return this.utils.getSuperSubscriptions(superType);
// }
// CAN NOT RETURN NULL
// ALSO checks to see if the superClass accepts subtypes.

View File

@ -3,8 +3,8 @@ package dorkbox.util.messagebus.common;
import java.lang.annotation.Annotation;
import java.lang.reflect.AnnotatedElement;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.common.thread.ConcurrentSet;
@ -18,13 +18,14 @@ import dorkbox.util.messagebus.common.thread.ConcurrentSet;
*/
public class ReflectionUtils {
public static Collection<Method> getMethods(Class<?> target) {
Collection<Method> hashSet = new ConcurrentSet<Method>(16, .8F, 1);
getMethods(target, hashSet);
return hashSet;
public static Method[] getMethods(Class<?> target) {
ArrayList<Method> methods = new ArrayList<Method>();
getMethods(target, methods);
return methods.toArray(new Method[methods.size()]);
}
private static void getMethods(Class<?> target, Collection<Method> methods) {
private static void getMethods(Class<?> target, ArrayList<Method> methods) {
try {
for (Method method : target.getDeclaredMethods()) {
if (getAnnotation(method, Handler.class) != null) {
@ -34,6 +35,7 @@ public class ReflectionUtils {
} catch (Exception ignored) {
}
// recursively go until root
if (!target.equals(Object.class)) {
getMethods(target.getSuperclass(), methods);
}
@ -68,8 +70,8 @@ public class ReflectionUtils {
* @param from The root class to start with
* @return A set of classes, each representing a super type of the root class
*/
public static Collection<Class<?>> getSuperTypes(Class<?> from) {
Collection<Class<?>> superclasses = new ConcurrentSet<Class<?>>(8, 0.8F, 1);
public static ArrayList<Class<?>> getSuperTypes(Class<?> from) {
ArrayList<Class<?>> superclasses = new ArrayList<Class<?>>();
collectInterfaces( from, superclasses );
@ -78,6 +80,7 @@ public class ReflectionUtils {
from = from.getSuperclass();
collectInterfaces( from, superclasses );
}
return superclasses;
}
@ -88,22 +91,22 @@ public class ReflectionUtils {
}
}
public static boolean containsOverridingMethod(final Collection<Method> allMethods, final Method methodToCheck) {
Iterator<Method> iterator;
public static final boolean containsOverridingMethod(final Method[] allMethods, final Method methodToCheck) {
final int length = allMethods.length;
Method method;
for (iterator = allMethods.iterator(); iterator.hasNext();) {
method = iterator.next();
for (int i=0;i<length;i++) {
method = allMethods[i];
if (isOverriddenBy(methodToCheck, method)) {
return true;
}
}
return false;
}
/**
* Searches for an Annotation of the given type on the class. Supports meta annotations.
*
@ -136,7 +139,7 @@ public class ReflectionUtils {
}
//
private static boolean isOverriddenBy( Method superclassMethod, Method subclassMethod ) {
private static boolean isOverriddenBy(final Method superclassMethod, final Method subclassMethod ) {
// if the declaring classes are the same or the subclass method is not defined in the subclass
// hierarchy of the given superclass method or the method names are not the same then
// subclassMethod does not override superclassMethod
@ -146,8 +149,8 @@ public class ReflectionUtils {
return false;
}
Class<?>[] superClassMethodParameters = superclassMethod.getParameterTypes();
Class<?>[] subClassMethodParameters = subclassMethod.getParameterTypes();
final Class<?>[] superClassMethodParameters = superclassMethod.getParameterTypes();
final Class<?>[] subClassMethodParameters = subclassMethod.getParameterTypes();
// method must specify the same number of parameters
//the parameters must occur in the exact same order

View File

@ -5,7 +5,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import dorkbox.util.messagebus.common.thread.ClassHolder;
import dorkbox.util.messagebus.common.thread.ConcurrentSet;
@ -22,13 +21,13 @@ public class SubscriptionUtils {
private final Map<Class<?>, Class<?>> arrayVersionCache;
private final Map<Class<?>, Boolean> isArrayCache;
private final ConcurrentMap<Class<?>, ArrayList<Class<?>>> superClassesCache;
private final Map<Class<?>, Class<?>[]> superClassesCache;
private final ClassHolder classHolderSingle;
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but REALLY improves performance on handlers
// it's faster to create a new one for SUB/UNSUB than it is to clear() on the original one
private final ConcurrentMap<Class<?>, ArrayList<Subscription>> superClassSubscriptions;
private final Map<Class<?>, ArrayList<Subscription>> superClassSubscriptions;
private final HashMapTree<Class<?>, ConcurrentSet<Subscription>> superClassSubscriptionsMulti;
private final Map<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageSingle;
@ -49,7 +48,7 @@ public class SubscriptionUtils {
this.arrayVersionCache = new ConcurrentHashMapV8<Class<?>, Class<?>>(32, loadFactor, stripeSize);
this.isArrayCache = new ConcurrentHashMapV8<Class<?>, Boolean>(32, loadFactor, stripeSize);
this.superClassesCache = new ConcurrentHashMapV8<Class<?>, ArrayList<Class<?>>>(32, loadFactor, 1);
this.superClassesCache = new ConcurrentHashMapV8<Class<?>, Class<?>[]>(32, loadFactor, 1);
this.classHolderSingle = new ClassHolder(loadFactor, stripeSize);
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
@ -66,23 +65,23 @@ public class SubscriptionUtils {
}
/**
* race conditions will result in duplicate answers, which we don't care if happens
* never returns null
* never reset, since it never needs to be reset (as the class hierarchy doesn't change at runtime)
*
* if parameter clazz is of type array, then the super classes are of array type as well
*
* protected by read lock by caller
*/
public final Class<?>[] getSuperClasses_NL(Class<?> clazz, boolean isArray) {
public final Class<?>[] getSuperClasses_NL(final Class<?> clazz, final boolean isArray) {
// this is never reset, since it never needs to be.
ConcurrentMap<Class<?>, ArrayList<Class<?>>> local = this.superClassesCache;
Class<?>[] classes;
final Map<Class<?>, Class<?>[]> local = this.superClassesCache;
Class<?>[] classes = local.get(clazz);
ArrayList<Class<?>> arrayList = local.get(clazz);
if (arrayList != null) {
classes = arrayList.toArray(SUPER_CLASS_EMPTY);
} else {
if (classes == null) {
// get all super types of class
Collection<Class<?>> superTypes = ReflectionUtils.getSuperTypes(clazz);
arrayList = new ArrayList<Class<?>>(superTypes.size());
final Collection<Class<?>> superTypes = ReflectionUtils.getSuperTypes(clazz);
ArrayList<Class<?>> newList = new ArrayList<Class<?>>(superTypes.size());
Iterator<Class<?>> iterator;
Class<?> c;
@ -94,102 +93,101 @@ public class SubscriptionUtils {
}
if (c != clazz) {
arrayList.add(c);
newList.add(c);
}
}
local.put(clazz, arrayList);
classes = arrayList.toArray(SUPER_CLASS_EMPTY);
classes = newList.toArray(new Class<?>[newList.size()]);
local.put(clazz, classes);
}
return classes;
}
// called inside sub/unsub write lock
public void cacheSuperClasses(Class<?> clazz) {
// TODO Auto-generated method stub
public final void cacheSuperClasses(final Class<?> clazz) {
getSuperClasses_NL(clazz, isArray(clazz));
}
// called inside sub/unsub write lock
public void cacheSuperClasses(Class<?> clazz, boolean isArray) {
// TODO Auto-generated method stub
public final void cacheSuperClasses(final Class<?> clazz, final boolean isArray) {
getSuperClasses_NL(clazz, isArray);
}
public final Class<?>[] getSuperClasses(Class<?> clazz, boolean isArray) {
// this is never reset, since it never needs to be.
ConcurrentMap<Class<?>, ArrayList<Class<?>>> local = this.superClassesCache;
Class<?>[] classes;
StampedLock lock = this.superClassLock;
long stamp = lock.tryOptimisticRead();
if (stamp > 0) {
ArrayList<Class<?>> arrayList = local.get(clazz);
if (arrayList != null) {
classes = arrayList.toArray(SUPER_CLASS_EMPTY);
if (lock.validate(stamp)) {
return classes;
} else {
stamp = lock.readLock();
arrayList = local.get(clazz);
if (arrayList != null) {
classes = arrayList.toArray(SUPER_CLASS_EMPTY);
lock.unlockRead(stamp);
return classes;
}
}
}
}
// unable to get a valid subscription. Have to acquire a write lock
long origStamp = stamp;
if ((stamp = lock.tryConvertToWriteLock(stamp)) == 0) {
lock.unlockRead(origStamp);
stamp = lock.writeLock();
}
// get all super types of class
Collection<Class<?>> superTypes = ReflectionUtils.getSuperTypes(clazz);
ArrayList<Class<?>> arrayList = new ArrayList<Class<?>>(superTypes.size());
Iterator<Class<?>> iterator;
Class<?> c;
for (iterator = superTypes.iterator(); iterator.hasNext();) {
c = iterator.next();
if (isArray) {
c = getArrayClass(c);
}
if (c != clazz) {
arrayList.add(c);
}
}
local.put(clazz, arrayList);
classes = arrayList.toArray(SUPER_CLASS_EMPTY);
lock.unlockWrite(stamp);
return classes;
}
// public final Class<?>[] getSuperClasses(Class<?> clazz, boolean isArray) {
// // this is never reset, since it never needs to be.
// Map<Class<?>, ArrayList<Class<?>>> local = this.superClassesCache;
// Class<?>[] classes;
//
// StampedLock lock = this.superClassLock;
// long stamp = lock.tryOptimisticRead();
//
// if (stamp > 0) {
// ArrayList<Class<?>> arrayList = local.get(clazz);
// if (arrayList != null) {
// classes = arrayList.toArray(SUPER_CLASS_EMPTY);
//
// if (lock.validate(stamp)) {
// return classes;
// } else {
// stamp = lock.readLock();
//
// arrayList = local.get(clazz);
// if (arrayList != null) {
// classes = arrayList.toArray(SUPER_CLASS_EMPTY);
// lock.unlockRead(stamp);
// return classes;
// }
// }
// }
// }
//
// // unable to get a valid subscription. Have to acquire a write lock
// long origStamp = stamp;
// if ((stamp = lock.tryConvertToWriteLock(stamp)) == 0) {
// lock.unlockRead(origStamp);
// stamp = lock.writeLock();
// }
//
//
// // get all super types of class
// Collection<Class<?>> superTypes = ReflectionUtils.getSuperTypes(clazz);
// ArrayList<Class<?>> arrayList = new ArrayList<Class<?>>(superTypes.size());
// Iterator<Class<?>> iterator;
// Class<?> c;
//
// for (iterator = superTypes.iterator(); iterator.hasNext();) {
// c = iterator.next();
//
// if (isArray) {
// c = getArrayClass(c);
// }
//
// if (c != clazz) {
// arrayList.add(c);
// }
// }
//
// local.put(clazz, arrayList);
// classes = arrayList.toArray(SUPER_CLASS_EMPTY);
//
// lock.unlockWrite(stamp);
//
// return classes;
// }
/**
* race conditions will result in duplicate answers, which we don't care if happens
* never returns null
* never reset
*/
public final Class<?> getArrayClass(Class<?> c) {
Map<Class<?>, Class<?>> arrayVersionCache = this.arrayVersionCache;
public final Class<?> getArrayClass(final Class<?> c) {
final Map<Class<?>, Class<?>> arrayVersionCache = this.arrayVersionCache;
Class<?> clazz = arrayVersionCache.get(c);
if (clazz == null) {
// messy, but the ONLY way to do it. Array super types are also arrays
Object[] newInstance = (Object[]) Array.newInstance(c, 1);
final Object[] newInstance = (Object[]) Array.newInstance(c, 1);
clazz = newInstance.getClass();
arrayVersionCache.put(c, clazz);
}
@ -202,10 +200,10 @@ public class SubscriptionUtils {
* @return true if the class c is an array type
*/
@SuppressWarnings("boxing")
public final boolean isArray(Class<?> c) {
Map<Class<?>, Boolean> isArrayCache = this.isArrayCache;
public final boolean isArray(final Class<?> c) {
final Map<Class<?>, Boolean> isArrayCache = this.isArrayCache;
Boolean isArray = isArrayCache.get(c);
final Boolean isArray = isArrayCache.get(c);
if (isArray == null) {
boolean b = c.isArray();
isArrayCache.put(c, b);
@ -223,108 +221,56 @@ public class SubscriptionUtils {
private static Subscription[] EMPTY = new Subscription[0];
private static Class<?>[] EMPTY2 = new Class<?>[0];
private StampedLock superSubLock = new StampedLock();
/**
* Returns an array copy of the super subscriptions for the specified type.
* Returns an array COPY of the super subscriptions for the specified type.
*
* This ALSO checks to see if the superClass accepts subtypes.
*
* protected by read lock by caller
*
* @return CAN NOT RETURN NULL
*/
public final Subscription[] getSuperSubscriptions(Class<?> superType) {
public final ArrayList<Subscription> getSuperSubscriptions(final Class<?> superType) {
// whenever our subscriptions change, this map is cleared.
ConcurrentMap<Class<?>, ArrayList<Subscription>> local = this.superClassSubscriptions;
Subscription[] subscriptions;
//
// StampedLock lock = this.superSubLock;
// long stamp = lock.tryOptimisticRead();
//
// if (stamp > 0) {
// ArrayList<Subscription> arrayList = local.get(superType);
// if (arrayList != null) {
// subscriptions = arrayList.toArray(EMPTY);
//
// if (lock.validate(stamp)) {
// return subscriptions;
// } else {
// stamp = lock.readLock();
//
// arrayList = local.get(superType);
// if (arrayList != null) {
// subscriptions = arrayList.toArray(EMPTY);
// lock.unlockRead(stamp);
// return subscriptions;
// }
//
// // unable to get a valid subscription. Have to acquire a write lock
// long origStamp = stamp;
// if ((stamp = lock.tryConvertToWriteLock(stamp)) == 0) {
// lock.unlock(origStamp);
// stamp = lock.writeLock();
// }
// }
// } else {
// // unable to get a valid subscription. Have to acquire a write lock
// if ((stamp = lock.tryConvertToWriteLock(stamp)) == 0) {
// stamp = lock.writeLock();
// }
// }
// } else {
// // unable to get a valid subscription. Have to acquire a write lock
// if ((stamp = lock.tryConvertToWriteLock(stamp)) == 0) {
// stamp = lock.writeLock();
// }
// }
final Map<Class<?>, ArrayList<Subscription>> local = this.superClassSubscriptions;
ArrayList<Subscription> arrayList = local.get(superType);
if (arrayList != null) {
subscriptions = arrayList.toArray(EMPTY);
ArrayList<Subscription> superSubscriptions = local.get(superType);
if (superSubscriptions == null) {
final Class<?>[] superClasses = getSuperClasses_NL(superType, isArray(superType)); // never returns null, cached response
final int length = superClasses.length;
// lock.unlockWrite(stamp);
return subscriptions;
}
// types was not empty, so get subscriptions for each type and collate them
final Map<Class<?>, ArrayList<Subscription>> local2 = this.subscriptionsPerMessageSingle;
Class<?> superClass;
// array was null, return EMPTY collection
Class<?>[] types = getSuperClasses_NL(superType, isArray(superType));
int length = types.length;
if (length == 0) {
local.put(superType, new ArrayList<Subscription>(0));
// lock.unlockWrite(stamp);
return EMPTY;
}
ArrayList<Subscription> subs;
Subscription sub;
// types was not empty, so get subscriptions for each type and collate them
Map<Class<?>, ArrayList<Subscription>> local2 = this.subscriptionsPerMessageSingle;
Class<?> superClass;
superSubscriptions = new ArrayList<Subscription>(length);
ArrayList<Subscription> subs;
Subscription sub;
arrayList = new ArrayList<Subscription>(16);
for (int i=0;i<length;i++) {
superClass = superClasses[i];
subs = local2.get(superClass);
if (subs != null) {
for (int j=0;j<subs.size();j++) {
sub = subs.get(j);
for (int i=0;i<length;i++) {
superClass = types[i];
subs = local2.get(superClass);
if (subs != null) {
for (int j=0;j<subs.size();j++) {
sub = subs.get(j);
if (sub.acceptsSubtypes()) {
arrayList.add(sub);
if (sub.acceptsSubtypes()) {
superSubscriptions.add(sub);
}
}
}
}
local.put(superType, superSubscriptions);
}
local.put(superType, arrayList);
subscriptions = arrayList.toArray(EMPTY);
// lock.unlockWrite(stamp);
return subscriptions;
return superSubscriptions;
}
// CAN NOT RETURN NULL

View File

@ -1,6 +1,7 @@
package dorkbox.util.messagebus.listener;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import com.esotericsoftware.reflectasm.MethodAccess;
@ -30,17 +31,55 @@ import dorkbox.util.messagebus.common.ReflectionUtils;
*/
public class MessageHandler {
// get all listeners defined by the given class (includes
// listeners defined in super classes)
public static final MessageHandler[] get(final Class<?> target) {
// get all handlers (this will include all (inherited) methods directly annotated using @Handler)
final Method[] allMethods = ReflectionUtils.getMethods(target);
final int length = allMethods.length;
final ArrayList<MessageHandler> finalMethods = new ArrayList<MessageHandler>(length);
Method method;
for (int i=0;i<length;i++) {
method = allMethods[i];
// retain only those that are at the bottom of their respective class hierarchy (deepest overriding method)
if (!ReflectionUtils.containsOverridingMethod(allMethods, method)) {
// for each handler there will be no overriding method that specifies @Handler annotation
// but an overriding method does inherit the listener configuration of the overwritten method
final Handler handler = ReflectionUtils.getAnnotation(method, Handler.class);
if (handler == null || !handler.enabled()) {
// disabled or invalid listeners are ignored
continue;
}
Method overriddenHandler = ReflectionUtils.getOverridingMethod(method, target);
if (overriddenHandler == null) {
overriddenHandler = method;
}
// if a handler is overwritten it inherits the configuration of its parent method
finalMethods.add(new MessageHandler(overriddenHandler, handler));
}
}
MessageHandler[] array = finalMethods.toArray(new MessageHandler[finalMethods.size()]);
return array;
}
private final MethodAccess handler;
private final int methodIndex;
private final Class<?>[] handledMessages;
private final boolean acceptsSubtypes;
private final boolean acceptsVarArgs;
private final MessageListener listenerConfig;
private final boolean isSynchronized;
public MessageHandler(Method handler, Handler handlerConfig, MessageListener listenerMetadata) {
public MessageHandler(Method handler, Handler handlerConfig) {
super();
if (handler == null) {
@ -53,7 +92,6 @@ public class MessageHandler {
this.methodIndex = this.handler.getIndex(handler.getName(), handledMessages);
this.acceptsSubtypes = handlerConfig.acceptSubtypes();
this.listenerConfig = listenerMetadata;
this.isSynchronized = ReflectionUtils.getAnnotation(handler, Synchronized.class) != null;
this.handledMessages = handledMessages;
@ -64,11 +102,6 @@ public class MessageHandler {
return this.isSynchronized;
}
// only in unit test
public boolean isFromListener(Class<?> listener){
return this.listenerConfig.isFromListener(listener);
}
public MethodAccess getHandler() {
return this.handler;
}

View File

@ -1,44 +0,0 @@
package dorkbox.util.messagebus.listener;
import java.util.Collection;
import dorkbox.util.messagebus.common.thread.ConcurrentSet;
/**
* All instances of any class that defines at least one message handler (see @MessageHandler) are message listeners. Thus, a message
* listener is any object capable of receiving messages by means of defined message handlers. There are no restrictions about the number of
* allowed message handlers in a message listener.
*
* A message listener can be configured using the @Listener annotation but is always implicitly configured by the handler definition it
* contains.
*
* This class is an internal representation of a message listener used to encapsulate all relevant objects and data about that message
* listener, especially all its handlers. There will be only one instance of MessageListener per message listener class and message bus
* instance.
*
* @author bennidi Date: 12/16/12
*/
public class MessageListener {
private final Collection<MessageHandler> handlers;
private Class<?> listenerDefinition;
public MessageListener(Class<?> listenerDefinition, int size, float loadFactor, int stripeSize) {
this.handlers = new ConcurrentSet<MessageHandler>(size, loadFactor, stripeSize);
this.listenerDefinition = listenerDefinition;
}
// only in unit test
public boolean isFromListener(Class<?> listener) {
return this.listenerDefinition.equals(listener);
}
public boolean addHandler(MessageHandler messageHandler) {
return this.handlers.add(messageHandler);
}
public Collection<MessageHandler> getHandlers() {
return this.handlers;
}
}

View File

@ -1,12 +1,10 @@
package dorkbox.util.messagebus.listener;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.Iterator;
import java.util.ArrayList;
import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.common.ReflectionUtils;
import dorkbox.util.messagebus.common.thread.ConcurrentSet;
/**
* The meta data reader is responsible for parsing and validating message handler configurations.
@ -20,46 +18,40 @@ public class MetadataReader {
// get all listeners defined by the given class (includes
// listeners defined in super classes)
public MessageListener getMessageListener(Class<?> target, float loadFactor, int stripeSize) {
public MessageHandler[] getMessageHandlers(final Class<?> target) {
// get all handlers (this will include all (inherited) methods directly annotated using @Handler)
Collection<Method> allHandlers = ReflectionUtils.getMethods(target);
final Method[] allMethods = ReflectionUtils.getMethods(target);
final int length = allMethods.length;
// retain only those that are at the bottom of their respective class hierarchy (deepest overriding method)
Collection<Method> bottomMostHandlers = new ConcurrentSet<Method>(allHandlers.size(), loadFactor, stripeSize);
Iterator<Method> iterator;
Method handler;
final ArrayList<MessageHandler> finalMethods = new ArrayList<MessageHandler>(length);
Method method;
for (iterator = allHandlers.iterator(); iterator.hasNext();) {
handler = iterator.next();
for (int i=0;i<length;i++) {
method = allMethods[i];
if (!ReflectionUtils.containsOverridingMethod(allHandlers, handler)) {
bottomMostHandlers.add(handler);
// retain only those that are at the bottom of their respective class hierarchy (deepest overriding method)
if (!ReflectionUtils.containsOverridingMethod(allMethods, method)) {
// for each handler there will be no overriding method that specifies @Handler annotation
// but an overriding method does inherit the listener configuration of the overwritten method
final Handler handler = ReflectionUtils.getAnnotation(method, Handler.class);
if (handler == null || !handler.enabled()) {
// disabled or invalid listeners are ignored
continue;
}
Method overriddenHandler = ReflectionUtils.getOverridingMethod(method, target);
if (overriddenHandler == null) {
overriddenHandler = method;
}
// if a handler is overwritten it inherits the configuration of its parent method
finalMethods.add(new MessageHandler(overriddenHandler, handler));
}
}
MessageListener listenerMetadata = new MessageListener(target, bottomMostHandlers.size(), loadFactor, stripeSize);
// for each handler there will be no overriding method that specifies @Handler annotation
// but an overriding method does inherit the listener configuration of the overwritten method
for (iterator = bottomMostHandlers.iterator(); iterator.hasNext();) {
handler = iterator.next();
Handler handlerConfig = ReflectionUtils.getAnnotation(handler, Handler.class);
if (handlerConfig == null || !handlerConfig.enabled()) {
continue; // disabled or invalid listeners are ignored
}
Method overriddenHandler = ReflectionUtils.getOverridingMethod(handler, target);
if (overriddenHandler == null) {
overriddenHandler = handler;
}
// if a handler is overwritten it inherits the configuration of its parent method
MessageHandler handlerMetadata = new MessageHandler(overriddenHandler, handlerConfig, listenerMetadata);
listenerMetadata.addHandler(handlerMetadata);
}
return listenerMetadata;
MessageHandler[] array = finalMethods.toArray(new MessageHandler[finalMethods.size()]);
return array;
}
}

View File

@ -82,14 +82,6 @@ public class Subscription {
return this.listeners.remove(existingListener);
}
/**
* Check whether this subscription manages a message handler of the given message listener class
*/
// only in unit test
public boolean belongsTo(Class<?> listener){
return this.handlerMetadata.isFromListener(listener);
}
// only used in unit-test
public int size() {
return this.listeners.size();

View File

@ -1,16 +1,10 @@
package dorkbox.util.messagebus.common;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import dorkbox.util.messagebus.SubscriptionManager;
import dorkbox.util.messagebus.subscription.Subscription;
import java.util.*;
/**
*
* @author bennidi
@ -45,14 +39,10 @@ public class SubscriptionValidator extends AssertSupport{
// we split subs + superSubs into TWO calls.
Collection<Subscription> collection = new ArrayDeque<Subscription>(8);
Subscription[] subscriptions = manager.getSubscriptionsByMessageType(messageType);
Subscription[] subscriptions = manager.getSubscriptions(messageType);
if (subscriptions != null) {
collection.addAll(Arrays.asList(subscriptions));
}
Subscription[] superSubs = manager.getSuperSubscriptions(messageType);
if (superSubs != null) {
collection.addAll(Arrays.asList(superSubs));
}
assertEquals(validationEntries.size(), collection.size());
@ -73,6 +63,27 @@ public class SubscriptionValidator extends AssertSupport{
}
/**
* Check whether this subscription manages a message handler of the given message listener class
*/
// only in unit test
public boolean belongsTo(Class<?> listener){
return this.handlerMetadata.isFromListener(listener);
// only in unit test
public boolean isFromListener(Class<?> listener){
return this.listenerConfig.isFromListener(listener);
}
}
// only in unit test
public boolean isFromListener(Class<?> listener) {
return this.listenerDefinition.equals(listener);
}
private Collection<ValidationEntry> getEntries(Class<?> messageType){
Collection<ValidationEntry> matching = new LinkedList<ValidationEntry>();
for (ValidationEntry validationValidationEntry : this.validations){