From 2b0f614595e94206dae0f52a92ae6e3b31548ea8 Mon Sep 17 00:00:00 2001 From: nathan Date: Wed, 13 May 2015 20:41:36 +0200 Subject: [PATCH] Fixed typo in multi2 super vararg subscriptions. Code polish --- .../util/messagebus/MultiMBassador.java | 2 +- .../util/messagebus/SubscriptionManager.java | 61 ++++++++++----- .../messagebus/common/SubscriptionHolder.java | 2 +- .../messagebus/subscription/Subscription.java | 74 +++++++++---------- 4 files changed, 81 insertions(+), 58 deletions(-) diff --git a/src/main/java/dorkbox/util/messagebus/MultiMBassador.java b/src/main/java/dorkbox/util/messagebus/MultiMBassador.java index eceee0c..0ee9db4 100644 --- a/src/main/java/dorkbox/util/messagebus/MultiMBassador.java +++ b/src/main/java/dorkbox/util/messagebus/MultiMBassador.java @@ -58,7 +58,7 @@ public class MultiMBassador implements IMessageBus { * @param numberOfThreads how many threads to have for dispatching async messages */ public MultiMBassador(int numberOfThreads) { - this(false, numberOfThreads); + this(true, numberOfThreads); } /** diff --git a/src/main/java/dorkbox/util/messagebus/SubscriptionManager.java b/src/main/java/dorkbox/util/messagebus/SubscriptionManager.java index 836e73c..e0e2c57 100644 --- a/src/main/java/dorkbox/util/messagebus/SubscriptionManager.java +++ b/src/main/java/dorkbox/util/messagebus/SubscriptionManager.java @@ -75,7 +75,7 @@ public class SubscriptionManager { // stripe size of maps for concurrency private final int STRIPE_SIZE; - private final SubscriptionHolder subHolder; + private final SubscriptionHolder subHolderSingle; private final SubscriptionHolder subHolderConcurrent; @@ -111,7 +111,7 @@ public class SubscriptionManager { this.varArgSuperClassSubscriptionsMulti = new HashMapTree, StrongConcurrentSetV8>(4, SubscriptionManager.LOAD_FACTOR); } - this.subHolder = new SubscriptionHolder(SubscriptionManager.LOAD_FACTOR, 1); + this.subHolderSingle = new SubscriptionHolder(SubscriptionManager.LOAD_FACTOR, 1); this.subHolderConcurrent = new SubscriptionHolder(SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE); } @@ -175,37 +175,43 @@ public class SubscriptionManager { // now add this subscription to each of the handled types Class[] types = messageHandler.getHandledMessages(); int size = types.length; + switch (size) { case 1: { - StrongConcurrentSetV8 putIfAbsent = subsPerMessageSingle.putIfAbsent(types[0], this.subHolderConcurrent.get()); + SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent; + StrongConcurrentSetV8 putIfAbsent = subsPerMessageSingle.putIfAbsent(types[0], subHolderConcurrent.get()); if (putIfAbsent != null) { subsPerType = putIfAbsent; } else { - subsPerType = this.subHolderConcurrent.get(); - this.subHolderConcurrent.set(new StrongConcurrentSetV8(16, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE)); + subsPerType = subHolderConcurrent.get(); + subHolderConcurrent.set(subHolderConcurrent.initialValue()); getSuperClasses(types[0]); } break; } case 2: { - StrongConcurrentSetV8 putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(this.subHolder.get(), types[0], types[1]); + // the HashMapTree uses read/write locks, so it is only accessible one thread at a time + SubscriptionHolder subHolderSingle = this.subHolderSingle; + StrongConcurrentSetV8 putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(subHolderSingle.get(), types[0], types[1]); if (putIfAbsent != null) { subsPerType = putIfAbsent; } else { - subsPerType = this.subHolder.get(); - this.subHolder.set(new StrongConcurrentSetV8(16, SubscriptionManager.LOAD_FACTOR, 1)); + subsPerType = subHolderSingle.get(); + subHolderSingle.set(subHolderSingle.initialValue()); getSuperClasses(types[0]); getSuperClasses(types[1]); } break; } case 3: { - StrongConcurrentSetV8 putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(this.subHolder.get(), types[0], types[1], types[2]); + // the HashMapTree uses read/write locks, so it is only accessible one thread at a time + SubscriptionHolder subHolderSingle = this.subHolderSingle; + StrongConcurrentSetV8 putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(subHolderSingle.get(), types[0], types[1], types[2]); if (putIfAbsent != null) { subsPerType = putIfAbsent; } else { - subsPerType = this.subHolder.get(); - this.subHolder.set(new StrongConcurrentSetV8(16, SubscriptionManager.LOAD_FACTOR, 1)); + subsPerType = subHolderSingle.get(); + subHolderSingle.set(subHolderSingle.initialValue()); getSuperClasses(types[0]); getSuperClasses(types[1]); getSuperClasses(types[2]); @@ -213,12 +219,14 @@ public class SubscriptionManager { break; } default: { - StrongConcurrentSetV8 putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(this.subHolder.get(), types); + // the HashMapTree uses read/write locks, so it is only accessible one thread at a time + SubscriptionHolder subHolderSingle = this.subHolderSingle; + StrongConcurrentSetV8 putIfAbsent = this.subscriptionsPerMessageMulti.putIfAbsent(subHolderSingle.get(), types); if (putIfAbsent != null) { subsPerType = putIfAbsent; } else { - subsPerType = this.subHolder.get(); - this.subHolder.set(new StrongConcurrentSetV8(16, SubscriptionManager.LOAD_FACTOR, 1)); + subsPerType = subHolderSingle.get(); + subHolderSingle.set(subHolderSingle.initialValue()); Class c; int length = types.length; @@ -490,9 +498,25 @@ public class SubscriptionManager { // if the leaf exists, then the value exists. subsPerType = subsPerTypeLeaf.getValue(); } else { + // the message class types are not the same, so look for a common superClass varArg subscription. + // this is to publish to object[] (or any class[]) handler that is common among all superTypes of the messages + StrongConcurrentSetV8 varargSuperSubscriptions1 = getVarArgSuperSubscriptions(messageClass1); + StrongConcurrentSetV8 varargSuperSubscriptions2 = getVarArgSuperSubscriptions(messageClass2); + subsPerType = new StrongConcurrentSetV8(varargSuperSubscriptions1.size(), SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE); + ISetEntry current; + Subscription sub; + current = varargSuperSubscriptions1.head; + while (current != null) { + sub = current.getValue(); + current = current.next(); + + if (varargSuperSubscriptions2.contains(sub)) { + subsPerType.add(sub); + } + } StrongConcurrentSetV8 putIfAbsent = local.putIfAbsent(subsPerType, messageClass1, messageClass2); if (putIfAbsent != null) { @@ -523,20 +547,19 @@ public class SubscriptionManager { // this is to publish to object[] (or any class[]) handler that is common among all superTypes of the messages StrongConcurrentSetV8 varargSuperSubscriptions1 = getVarArgSuperSubscriptions(messageClass1); StrongConcurrentSetV8 varargSuperSubscriptions2 = getVarArgSuperSubscriptions(messageClass2); + StrongConcurrentSetV8 varargSuperSubscriptions3 = getVarArgSuperSubscriptions(messageClass3); subsPerType = new StrongConcurrentSetV8(varargSuperSubscriptions1.size(), SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE); ISetEntry current; Subscription sub; - - current = varargSuperSubscriptions1.head; while (current != null) { sub = current.getValue(); current = current.next(); - if (varargSuperSubscriptions2.contains(sub)) { + if (varargSuperSubscriptions2.contains(sub) && varargSuperSubscriptions3.contains(sub)) { subsPerType.add(sub); } } @@ -604,7 +627,7 @@ public class SubscriptionManager { return subsPerType; } - // must be protected by read lock + // CAN NOT RETURN NULL // ALSO checks to see if the superClass accepts subtypes. public StrongConcurrentSetV8 getSuperSubscriptions(Class superType1, Class superType2) { HashMapTree, StrongConcurrentSetV8> local = this.superClassSubscriptionsMulti; @@ -692,7 +715,7 @@ public class SubscriptionManager { return subsPerType; } - // must be protected by read lock + // CAN NOT RETURN NULL // ALSO checks to see if the superClass accepts subtypes. public StrongConcurrentSetV8 getSuperSubscriptions(Class superType1, Class superType2, Class superType3) { HashMapTree, StrongConcurrentSetV8> local = this.superClassSubscriptionsMulti; diff --git a/src/main/java/dorkbox/util/messagebus/common/SubscriptionHolder.java b/src/main/java/dorkbox/util/messagebus/common/SubscriptionHolder.java index a6b0f00..a32eae0 100644 --- a/src/main/java/dorkbox/util/messagebus/common/SubscriptionHolder.java +++ b/src/main/java/dorkbox/util/messagebus/common/SubscriptionHolder.java @@ -15,7 +15,7 @@ public class SubscriptionHolder extends ThreadLocal initialValue() { + public StrongConcurrentSetV8 initialValue() { return new StrongConcurrentSetV8(16, this.loadFactor, this.stripeSize); } } diff --git a/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java b/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java index 0436d67..a082dd8 100644 --- a/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java +++ b/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java @@ -113,43 +113,43 @@ public class Subscription { while (current != null) { listener = current.getValue(); current = current.next(); -//count++; - try { - invocation.invoke(listener, handler, handleIndex, message); - } catch (IllegalAccessException e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "The class or method is not accessible") - .setCause(e) - .setMethodName(handler.getMethodNames()[handleIndex]) - .setListener(listener) - .setPublishedObject(message)); - } catch (IllegalArgumentException e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "Wrong arguments passed to method. Was: " + message.getClass() - + "Expected: " + handler.getParameterTypes()[0]) - .setCause(e) - .setMethodName(handler.getMethodNames()[handleIndex]) - .setListener(listener) - .setPublishedObject(message)); - } catch (InvocationTargetException e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "Message handler threw exception") - .setCause(e) - .setMethodName(handler.getMethodNames()[handleIndex]) - .setListener(listener) - .setPublishedObject(message)); - } catch (Throwable e) { - errorHandler.handlePublicationError(new PublicationError() - .setMessage("Error during invocation of message handler. " + - "The handler code threw an exception") - .setCause(e) - .setMethodName(handler.getMethodNames()[handleIndex]) - .setListener(listener) - .setPublishedObject(message)); - } +this.count++; +// try { +// invocation.invoke(listener, handler, handleIndex, message); +// } catch (IllegalAccessException e) { +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "The class or method is not accessible") +// .setCause(e) +// .setMethodName(handler.getMethodNames()[handleIndex]) +// .setListener(listener) +// .setPublishedObject(message)); +// } catch (IllegalArgumentException e) { +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "Wrong arguments passed to method. Was: " + message.getClass() +// + "Expected: " + handler.getParameterTypes()[0]) +// .setCause(e) +// .setMethodName(handler.getMethodNames()[handleIndex]) +// .setListener(listener) +// .setPublishedObject(message)); +// } catch (InvocationTargetException e) { +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "Message handler threw exception") +// .setCause(e) +// .setMethodName(handler.getMethodNames()[handleIndex]) +// .setListener(listener) +// .setPublishedObject(message)); +// } catch (Throwable e) { +// errorHandler.handlePublicationError(new PublicationError() +// .setMessage("Error during invocation of message handler. " + +// "The handler code threw an exception") +// .setCause(e) +// .setMethodName(handler.getMethodNames()[handleIndex]) +// .setListener(listener) +// .setPublishedObject(message)); +// } } return true; }