Fixed typo in multi2 super vararg subscriptions. Code polish

This commit is contained in:
nathan 2015-05-13 20:41:36 +02:00
parent 060a00f875
commit 2b0f614595
4 changed files with 81 additions and 58 deletions

View File

@ -58,7 +58,7 @@ public class MultiMBassador implements IMessageBus {
* @param numberOfThreads how many threads to have for dispatching async messages * @param numberOfThreads how many threads to have for dispatching async messages
*/ */
public MultiMBassador(int numberOfThreads) { public MultiMBassador(int numberOfThreads) {
this(false, numberOfThreads); this(true, numberOfThreads);
} }
/** /**

View File

@ -75,7 +75,7 @@ public class SubscriptionManager {
// stripe size of maps for concurrency // stripe size of maps for concurrency
private final int STRIPE_SIZE; private final int STRIPE_SIZE;
private final SubscriptionHolder subHolder; private final SubscriptionHolder subHolderSingle;
private final SubscriptionHolder subHolderConcurrent; private final SubscriptionHolder subHolderConcurrent;
@ -111,7 +111,7 @@ public class SubscriptionManager {
this.varArgSuperClassSubscriptionsMulti = new HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>>(4, SubscriptionManager.LOAD_FACTOR); this.varArgSuperClassSubscriptionsMulti = new HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>>(4, SubscriptionManager.LOAD_FACTOR);
} }
this.subHolder = new SubscriptionHolder(SubscriptionManager.LOAD_FACTOR, 1); this.subHolderSingle = new SubscriptionHolder(SubscriptionManager.LOAD_FACTOR, 1);
this.subHolderConcurrent = new SubscriptionHolder(SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE); this.subHolderConcurrent = new SubscriptionHolder(SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
} }
@ -175,37 +175,43 @@ public class SubscriptionManager {
// now add this subscription to each of the handled types // now add this subscription to each of the handled types
Class<?>[] types = messageHandler.getHandledMessages(); Class<?>[] types = messageHandler.getHandledMessages();
int size = types.length; int size = types.length;
switch (size) { switch (size) {
case 1: { case 1: {
StrongConcurrentSetV8<Subscription> putIfAbsent = subsPerMessageSingle.putIfAbsent(types[0], this.subHolderConcurrent.get()); SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent;
StrongConcurrentSetV8<Subscription> putIfAbsent = subsPerMessageSingle.putIfAbsent(types[0], subHolderConcurrent.get());
if (putIfAbsent != null) { if (putIfAbsent != null) {
subsPerType = putIfAbsent; subsPerType = putIfAbsent;
} else { } else {
subsPerType = this.subHolderConcurrent.get(); subsPerType = subHolderConcurrent.get();
this.subHolderConcurrent.set(new StrongConcurrentSetV8<Subscription>(16, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE)); subHolderConcurrent.set(subHolderConcurrent.initialValue());
getSuperClasses(types[0]); getSuperClasses(types[0]);
} }
break; break;
} }
case 2: { case 2: {
StrongConcurrentSetV8<Subscription> putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(this.subHolder.get(), types[0], types[1]); // the HashMapTree uses read/write locks, so it is only accessible one thread at a time
SubscriptionHolder subHolderSingle = this.subHolderSingle;
StrongConcurrentSetV8<Subscription> putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(subHolderSingle.get(), types[0], types[1]);
if (putIfAbsent != null) { if (putIfAbsent != null) {
subsPerType = putIfAbsent; subsPerType = putIfAbsent;
} else { } else {
subsPerType = this.subHolder.get(); subsPerType = subHolderSingle.get();
this.subHolder.set(new StrongConcurrentSetV8<Subscription>(16, SubscriptionManager.LOAD_FACTOR, 1)); subHolderSingle.set(subHolderSingle.initialValue());
getSuperClasses(types[0]); getSuperClasses(types[0]);
getSuperClasses(types[1]); getSuperClasses(types[1]);
} }
break; break;
} }
case 3: { case 3: {
StrongConcurrentSetV8<Subscription> putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(this.subHolder.get(), types[0], types[1], types[2]); // the HashMapTree uses read/write locks, so it is only accessible one thread at a time
SubscriptionHolder subHolderSingle = this.subHolderSingle;
StrongConcurrentSetV8<Subscription> putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(subHolderSingle.get(), types[0], types[1], types[2]);
if (putIfAbsent != null) { if (putIfAbsent != null) {
subsPerType = putIfAbsent; subsPerType = putIfAbsent;
} else { } else {
subsPerType = this.subHolder.get(); subsPerType = subHolderSingle.get();
this.subHolder.set(new StrongConcurrentSetV8<Subscription>(16, SubscriptionManager.LOAD_FACTOR, 1)); subHolderSingle.set(subHolderSingle.initialValue());
getSuperClasses(types[0]); getSuperClasses(types[0]);
getSuperClasses(types[1]); getSuperClasses(types[1]);
getSuperClasses(types[2]); getSuperClasses(types[2]);
@ -213,12 +219,14 @@ public class SubscriptionManager {
break; break;
} }
default: { default: {
StrongConcurrentSetV8<Subscription> putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(this.subHolder.get(), types); // the HashMapTree uses read/write locks, so it is only accessible one thread at a time
SubscriptionHolder subHolderSingle = this.subHolderSingle;
StrongConcurrentSetV8<Subscription> putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(subHolderSingle.get(), types);
if (putIfAbsent != null) { if (putIfAbsent != null) {
subsPerType = putIfAbsent; subsPerType = putIfAbsent;
} else { } else {
subsPerType = this.subHolder.get(); subsPerType = subHolderSingle.get();
this.subHolder.set(new StrongConcurrentSetV8<Subscription>(16, SubscriptionManager.LOAD_FACTOR, 1)); subHolderSingle.set(subHolderSingle.initialValue());
Class<?> c; Class<?> c;
int length = types.length; int length = types.length;
@ -490,9 +498,25 @@ public class SubscriptionManager {
// if the leaf exists, then the value exists. // if the leaf exists, then the value exists.
subsPerType = subsPerTypeLeaf.getValue(); subsPerType = subsPerTypeLeaf.getValue();
} else { } else {
// the message class types are not the same, so look for a common superClass varArg subscription.
// this is to publish to object[] (or any class[]) handler that is common among all superTypes of the messages
StrongConcurrentSetV8<Subscription> varargSuperSubscriptions1 = getVarArgSuperSubscriptions(messageClass1);
StrongConcurrentSetV8<Subscription> varargSuperSubscriptions2 = getVarArgSuperSubscriptions(messageClass2);
subsPerType = new StrongConcurrentSetV8<Subscription>(varargSuperSubscriptions1.size(), SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
ISetEntry<Subscription> current;
Subscription sub;
current = varargSuperSubscriptions1.head;
while (current != null) {
sub = current.getValue();
current = current.next();
if (varargSuperSubscriptions2.contains(sub)) {
subsPerType.add(sub);
}
}
StrongConcurrentSetV8<Subscription> putIfAbsent = local.putIfAbsent(subsPerType, messageClass1, messageClass2); StrongConcurrentSetV8<Subscription> putIfAbsent = local.putIfAbsent(subsPerType, messageClass1, messageClass2);
if (putIfAbsent != null) { if (putIfAbsent != null) {
@ -523,20 +547,19 @@ public class SubscriptionManager {
// this is to publish to object[] (or any class[]) handler that is common among all superTypes of the messages // this is to publish to object[] (or any class[]) handler that is common among all superTypes of the messages
StrongConcurrentSetV8<Subscription> varargSuperSubscriptions1 = getVarArgSuperSubscriptions(messageClass1); StrongConcurrentSetV8<Subscription> varargSuperSubscriptions1 = getVarArgSuperSubscriptions(messageClass1);
StrongConcurrentSetV8<Subscription> varargSuperSubscriptions2 = getVarArgSuperSubscriptions(messageClass2); StrongConcurrentSetV8<Subscription> varargSuperSubscriptions2 = getVarArgSuperSubscriptions(messageClass2);
StrongConcurrentSetV8<Subscription> varargSuperSubscriptions3 = getVarArgSuperSubscriptions(messageClass3);
subsPerType = new StrongConcurrentSetV8<Subscription>(varargSuperSubscriptions1.size(), SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE); subsPerType = new StrongConcurrentSetV8<Subscription>(varargSuperSubscriptions1.size(), SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
ISetEntry<Subscription> current; ISetEntry<Subscription> current;
Subscription sub; Subscription sub;
current = varargSuperSubscriptions1.head; current = varargSuperSubscriptions1.head;
while (current != null) { while (current != null) {
sub = current.getValue(); sub = current.getValue();
current = current.next(); current = current.next();
if (varargSuperSubscriptions2.contains(sub)) { if (varargSuperSubscriptions2.contains(sub) && varargSuperSubscriptions3.contains(sub)) {
subsPerType.add(sub); subsPerType.add(sub);
} }
} }
@ -604,7 +627,7 @@ public class SubscriptionManager {
return subsPerType; return subsPerType;
} }
// must be protected by read lock // CAN NOT RETURN NULL
// ALSO checks to see if the superClass accepts subtypes. // ALSO checks to see if the superClass accepts subtypes.
public StrongConcurrentSetV8<Subscription> getSuperSubscriptions(Class<?> superType1, Class<?> superType2) { public StrongConcurrentSetV8<Subscription> getSuperSubscriptions(Class<?> superType1, Class<?> superType2) {
HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>> local = this.superClassSubscriptionsMulti; HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>> local = this.superClassSubscriptionsMulti;
@ -692,7 +715,7 @@ public class SubscriptionManager {
return subsPerType; return subsPerType;
} }
// must be protected by read lock // CAN NOT RETURN NULL
// ALSO checks to see if the superClass accepts subtypes. // ALSO checks to see if the superClass accepts subtypes.
public StrongConcurrentSetV8<Subscription> getSuperSubscriptions(Class<?> superType1, Class<?> superType2, Class<?> superType3) { public StrongConcurrentSetV8<Subscription> getSuperSubscriptions(Class<?> superType1, Class<?> superType2, Class<?> superType3) {
HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>> local = this.superClassSubscriptionsMulti; HashMapTree<Class<?>, StrongConcurrentSetV8<Subscription>> local = this.superClassSubscriptionsMulti;

View File

@ -15,7 +15,7 @@ public class SubscriptionHolder extends ThreadLocal<StrongConcurrentSetV8<Subscr
} }
@Override @Override
protected StrongConcurrentSetV8<Subscription> initialValue() { public StrongConcurrentSetV8<Subscription> initialValue() {
return new StrongConcurrentSetV8<Subscription>(16, this.loadFactor, this.stripeSize); return new StrongConcurrentSetV8<Subscription>(16, this.loadFactor, this.stripeSize);
} }
} }

View File

@ -113,43 +113,43 @@ public class Subscription {
while (current != null) { while (current != null) {
listener = current.getValue(); listener = current.getValue();
current = current.next(); current = current.next();
//count++; this.count++;
try { // try {
invocation.invoke(listener, handler, handleIndex, message); // invocation.invoke(listener, handler, handleIndex, message);
} catch (IllegalAccessException e) { // } catch (IllegalAccessException e) {
errorHandler.handlePublicationError(new PublicationError() // errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " + // .setMessage("Error during invocation of message handler. " +
"The class or method is not accessible") // "The class or method is not accessible")
.setCause(e) // .setCause(e)
.setMethodName(handler.getMethodNames()[handleIndex]) // .setMethodName(handler.getMethodNames()[handleIndex])
.setListener(listener) // .setListener(listener)
.setPublishedObject(message)); // .setPublishedObject(message));
} catch (IllegalArgumentException e) { // } catch (IllegalArgumentException e) {
errorHandler.handlePublicationError(new PublicationError() // errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " + // .setMessage("Error during invocation of message handler. " +
"Wrong arguments passed to method. Was: " + message.getClass() // "Wrong arguments passed to method. Was: " + message.getClass()
+ "Expected: " + handler.getParameterTypes()[0]) // + "Expected: " + handler.getParameterTypes()[0])
.setCause(e) // .setCause(e)
.setMethodName(handler.getMethodNames()[handleIndex]) // .setMethodName(handler.getMethodNames()[handleIndex])
.setListener(listener) // .setListener(listener)
.setPublishedObject(message)); // .setPublishedObject(message));
} catch (InvocationTargetException e) { // } catch (InvocationTargetException e) {
errorHandler.handlePublicationError(new PublicationError() // errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " + // .setMessage("Error during invocation of message handler. " +
"Message handler threw exception") // "Message handler threw exception")
.setCause(e) // .setCause(e)
.setMethodName(handler.getMethodNames()[handleIndex]) // .setMethodName(handler.getMethodNames()[handleIndex])
.setListener(listener) // .setListener(listener)
.setPublishedObject(message)); // .setPublishedObject(message));
} catch (Throwable e) { // } catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError() // errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " + // .setMessage("Error during invocation of message handler. " +
"The handler code threw an exception") // "The handler code threw an exception")
.setCause(e) // .setCause(e)
.setMethodName(handler.getMethodNames()[handleIndex]) // .setMethodName(handler.getMethodNames()[handleIndex])
.setListener(listener) // .setListener(listener)
.setPublishedObject(message)); // .setPublishedObject(message));
} // }
} }
return true; return true;
} }