From d0584d52b0ebdb29d1c4926ccb9f3760009d8f99 Mon Sep 17 00:00:00 2001 From: nathan Date: Fri, 20 Feb 2015 19:54:53 +0100 Subject: [PATCH] Cuncurrent sub/pub. 169ish ns/op VS 256 ns/op --- .../engio/mbassy/multi/MultiMBassador.java | 7 +- .../multi/common/StrongConcurrentSet.java | 1 - .../multi/subscription/Subscription.java | 78 ++-- .../subscription/SubscriptionManager.java | 395 ++++++++++-------- 4 files changed, 255 insertions(+), 226 deletions(-) diff --git a/src/main/java/net/engio/mbassy/multi/MultiMBassador.java b/src/main/java/net/engio/mbassy/multi/MultiMBassador.java index 161a254..6fdb73b 100644 --- a/src/main/java/net/engio/mbassy/multi/MultiMBassador.java +++ b/src/main/java/net/engio/mbassy/multi/MultiMBassador.java @@ -44,7 +44,6 @@ public class MultiMBassador implements IMessageBus { // this(2); } - public MultiMBassador(int numberOfThreads) { if (numberOfThreads < 1) { numberOfThreads = 1; // at LEAST 1 thread @@ -128,13 +127,11 @@ public class MultiMBassador implements IMessageBus { } } - @Override public void publish(Object message) { SubscriptionManager manager = this.subscriptionManager; Class messageClass = message.getClass(); -// manager.readLock(); Collection subscriptions = manager.getSubscriptionsByMessageType(messageClass); // Run subscriptions @@ -144,9 +141,8 @@ public class MultiMBassador implements IMessageBus { sub.publishToSubscription(this, message); } } else { - // Dead Event must EXACTLY MATCH (no subclasses or varargs permitted) + // Dead Event must EXACTLY MATCH (no subclasses) Collection deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); - if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) { DeadMessage deadMessage = new DeadMessage(message); for (Subscription sub : deadSubscriptions) { @@ -167,7 +163,6 @@ public class MultiMBassador implements IMessageBus { sub.publishToSubscription(this, message); } } -// manager.readUnLock(); } @SuppressWarnings("null") diff --git a/src/main/java/net/engio/mbassy/multi/common/StrongConcurrentSet.java b/src/main/java/net/engio/mbassy/multi/common/StrongConcurrentSet.java index 6ee6082..585dcae 100644 --- a/src/main/java/net/engio/mbassy/multi/common/StrongConcurrentSet.java +++ b/src/main/java/net/engio/mbassy/multi/common/StrongConcurrentSet.java @@ -18,7 +18,6 @@ public class StrongConcurrentSet extends AbstractConcurrentSet { } public StrongConcurrentSet(int size, float loadFactor) { -// super(new ConcurrentHashMap>(size, loadFactor, 1)); super(new Reference2ReferenceOpenHashMap>(size, loadFactor)); } diff --git a/src/main/java/net/engio/mbassy/multi/subscription/Subscription.java b/src/main/java/net/engio/mbassy/multi/subscription/Subscription.java index e1cb46a..6d1513c 100644 --- a/src/main/java/net/engio/mbassy/multi/subscription/Subscription.java +++ b/src/main/java/net/engio/mbassy/multi/subscription/Subscription.java @@ -91,47 +91,47 @@ public class Subscription { int handleIndex = this.handlerMetadata.getMethodIndex(); IHandlerInvocation invocation = this.invocation; - try { - for (Object listener : listeners) { + for (Object listener : listeners) { + try { invocation.invoke(listener, handler, handleIndex, message); + } catch (IllegalAccessException e) { + e.printStackTrace(); + // errorHandler.handlePublicationError(new PublicationError() + // .setMessage("Error during invocation of message handler. " + + // "The class or method is not accessible") + // .setCause(e) + // .setMethodName(handler.getName()) + //// .setListener(listener) + // .setPublishedObject(message)); + } catch (IllegalArgumentException e) { + e.printStackTrace(); + // errorHandler.handlePublicationError(new PublicationError() + // .setMessage("Error during invocation of message handler. " + + // "Wrong arguments passed to method. Was: " + message.getClass() + // + "Expected: " + handler.getParameterTypes()[0]) + // .setCause(e) + // .setMethodName(handler.getName()) + //// .setListener(listener) + // .setPublishedObject(message)); + } catch (InvocationTargetException e) { + e.printStackTrace(); + // errorHandler.handlePublicationError(new PublicationError() + // .setMessage("Error during invocation of message handler. " + + // "Message handler threw exception") + // .setCause(e) + // .setMethodName(handler.getName()) + //// .setListener(listener) + // .setPublishedObject(message)); + } catch (Throwable e) { + e.printStackTrace(); + // errorHandler.handlePublicationError(new PublicationError() + // .setMessage("Error during invocation of message handler. " + + // "The handler code threw an exception") + // .setCause(e) + // .setMethodName(handler.getName()) + //// .setListener(listener) + // .setPublishedObject(message)); } - } catch (IllegalAccessException e) { - e.printStackTrace(); -// errorHandler.handlePublicationError(new PublicationError() -// .setMessage("Error during invocation of message handler. " + -// "The class or method is not accessible") -// .setCause(e) -// .setMethodName(handler.getName()) -//// .setListener(listener) -// .setPublishedObject(message)); - } catch (IllegalArgumentException e) { - e.printStackTrace(); -// errorHandler.handlePublicationError(new PublicationError() -// .setMessage("Error during invocation of message handler. " + -// "Wrong arguments passed to method. Was: " + message.getClass() -// + "Expected: " + handler.getParameterTypes()[0]) -// .setCause(e) -// .setMethodName(handler.getName()) -//// .setListener(listener) -// .setPublishedObject(message)); - } catch (InvocationTargetException e) { - e.printStackTrace(); -// errorHandler.handlePublicationError(new PublicationError() -// .setMessage("Error during invocation of message handler. " + -// "Message handler threw exception") -// .setCause(e) -// .setMethodName(handler.getName()) -//// .setListener(listener) -// .setPublishedObject(message)); - } catch (Throwable e) { - e.printStackTrace(); -// errorHandler.handlePublicationError(new PublicationError() -// .setMessage("Error during invocation of message handler. " + -// "The handler code threw an exception") -// .setCause(e) -// .setMethodName(handler.getName()) -//// .setListener(listener) -// .setPublishedObject(message)); } } } diff --git a/src/main/java/net/engio/mbassy/multi/subscription/SubscriptionManager.java b/src/main/java/net/engio/mbassy/multi/subscription/SubscriptionManager.java index 7b0d2b3..26d4aa2 100644 --- a/src/main/java/net/engio/mbassy/multi/subscription/SubscriptionManager.java +++ b/src/main/java/net/engio/mbassy/multi/subscription/SubscriptionManager.java @@ -10,7 +10,6 @@ import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; import net.engio.mbassy.multi.common.IdentityObjectTree; @@ -40,7 +39,7 @@ public class SubscriptionManager { // the metadata reader that is used to inspect objects passed to the subscribe method private final MetadataReader metadataReader = new MetadataReader(); - // all subscriptions per message type + // all subscriptions per message type. We perpetually KEEP the types, as this lowers the amount of locking required // this is the primary list for dispatching a specific message // write access is synchronized and happens only when a listener of a specific class is registered the first time private final ConcurrentHashMap, Collection> subscriptionsPerMessageSingle; @@ -50,7 +49,7 @@ public class SubscriptionManager { // this map provides fast access for subscribing and unsubscribing // write access is synchronized and happens very infrequently // once a collection of subscriptions is stored it does not change - private final Map, Collection> subscriptionsPerListener; + private final ConcurrentHashMap, Collection> subscriptionsPerListener; private final Object holder = new Object[0]; @@ -78,7 +77,7 @@ public class SubscriptionManager { this.subscriptionsPerMessageMulti = new IdentityObjectTree, Collection>(); // only used during SUB/UNSUB - this.subscriptionsPerListener = new ConcurrentHashMap, Collection>(4, this.LOAD_FACTOR, 1); + this.subscriptionsPerListener = new ConcurrentHashMap, Collection>(4, this.LOAD_FACTOR, this.MAP_STRIPING); this.superClassesCache = new ConcurrentHashMap, FastEntrySet>>(8, this.LOAD_FACTOR, this.MAP_STRIPING); @@ -91,92 +90,69 @@ public class SubscriptionManager { this.superClassSubscriptions = new ConcurrentHashMap, FastEntrySet>(8, this.LOAD_FACTOR, this.MAP_STRIPING); } - public void readLock() { - this.LOCK.readLock().lock(); - } - - public void readUnLock() { - this.LOCK.readLock().unlock(); - } - public void unsubscribe(Object listener) { if (listener == null) { return; } Class listenerClass = listener.getClass(); - Collection subscriptions; - boolean nothingLeft = true; - Lock UPDATE = this.LOCK.writeLock(); - try { - UPDATE.lock(); + // these are a concurrent collection + Collection subscriptions = this.subscriptionsPerListener.get(listenerClass); - subscriptions = this.subscriptionsPerListener.get(listenerClass); + if (subscriptions != null) { + for (Subscription subscription : subscriptions) { + subscription.unsubscribe(listener); - if (subscriptions != null) { - for (Subscription subscription : subscriptions) { - subscription.unsubscribe(listener); - - boolean isEmpty = subscription.isEmpty(); - - if (isEmpty) { - // single or multi? - Class[] handledMessageTypes = subscription.getHandledMessageTypes(); - int size = handledMessageTypes.length; - if (size == 1) { - // single - Class clazz = handledMessageTypes[0]; - - // NOTE: Order is important for safe publication - Collection subs = this.subscriptionsPerMessageSingle.get(clazz); - if (subs != null) { - subs.remove(subscription); - - if (subs.isEmpty()) { - // remove element - this.subscriptionsPerMessageSingle.remove(clazz); - - resetSuperClassSubs(); - } - } - } else { - // NOTE: Not thread-safe! must be synchronized in outer scope - IdentityObjectTree, Collection> tree; - - switch (size) { - case 2: tree = this.subscriptionsPerMessageMulti.getLeaf(handledMessageTypes[0], handledMessageTypes[1]); break; - case 3: tree = this.subscriptionsPerMessageMulti.getLeaf(handledMessageTypes[1], handledMessageTypes[1], handledMessageTypes[2]); break; - default: tree = this.subscriptionsPerMessageMulti.getLeaf(handledMessageTypes); break; - } - - if (tree != null) { - Collection subs = tree.getValue(); - if (subs != null) { - subs.remove(subscription); - - if (subs.isEmpty()) { - // remove tree element - switch (size) { - case 2: this.subscriptionsPerMessageMulti.remove(handledMessageTypes[0], handledMessageTypes[1]); break; - case 3: this.subscriptionsPerMessageMulti.remove(handledMessageTypes[1], handledMessageTypes[1], handledMessageTypes[2]); break; - default: this.subscriptionsPerMessageMulti.remove(handledMessageTypes); break; - } - } - } - } - } - } - - nothingLeft &= isEmpty; - } +// boolean isEmpty = subscription.isEmpty(); +// +// if (isEmpty) { +// // single or multi? +// Class[] handledMessageTypes = subscription.getHandledMessageTypes(); +// int size = handledMessageTypes.length; +// if (size == 1) { +// // single +// Class clazz = handledMessageTypes[0]; +// +// // NOTE: Order is important for safe publication +// Collection subs = this.subscriptionsPerMessageSingle.get(clazz); +// if (subs != null) { +// subs.remove(subscription); +// +// if (subs.isEmpty()) { +// // remove element +// this.subscriptionsPerMessageSingle.remove(clazz); +// +// resetSuperClassSubs(); +// } +// } +// } else { +// // NOTE: Not thread-safe! must be synchronized in outer scope +// IdentityObjectTree, Collection> tree; +// +// switch (size) { +// case 2: tree = this.subscriptionsPerMessageMulti.getLeaf(handledMessageTypes[0], handledMessageTypes[1]); break; +// case 3: tree = this.subscriptionsPerMessageMulti.getLeaf(handledMessageTypes[1], handledMessageTypes[1], handledMessageTypes[2]); break; +// default: tree = this.subscriptionsPerMessageMulti.getLeaf(handledMessageTypes); break; +// } +// +// if (tree != null) { +// Collection subs = tree.getValue(); +// if (subs != null) { +// subs.remove(subscription); +// +// if (subs.isEmpty()) { +// // remove tree element +// switch (size) { +// case 2: this.subscriptionsPerMessageMulti.remove(handledMessageTypes[0], handledMessageTypes[1]); break; +// case 3: this.subscriptionsPerMessageMulti.remove(handledMessageTypes[1], handledMessageTypes[1], handledMessageTypes[2]); break; +// default: this.subscriptionsPerMessageMulti.remove(handledMessageTypes); break; +// } +// } +// } +// } +// } +// } } - - if (nothingLeft) { - this.subscriptionsPerListener.remove(listenerClass); - } - - } finally { - UPDATE.unlock(); } return; @@ -201,121 +177,180 @@ public class SubscriptionManager { return; } - Collection subscriptions; - Lock WRITE = this.LOCK.writeLock(); - try { - WRITE.lock(); - subscriptions = this.subscriptionsPerListener.get(listenerClass); + Collection subscriptions = this.subscriptionsPerListener.get(listenerClass); + if (subscriptions == null) { + // a listener is subscribed for the first time + Collection messageHandlers = this.metadataReader.getMessageListener(listenerClass).getHandlers(); + int handlersSize = messageHandlers.size(); - if (subscriptions != null) { - // subscriptions already exist and must only be updated - for (Subscription subscription : subscriptions) { - subscription.subscribe(listener); - } + if (handlersSize == 0) { + // remember the class as non listening class if no handlers are found + this.nonListeners.put(listenerClass, this.holder); } else { - // a listener is subscribed for the first time - Collection messageHandlers = this.metadataReader.getMessageListener(listenerClass).getHandlers(); - int handlersSize = messageHandlers.size(); - - if (handlersSize == 0) { - // remember the class as non listening class if no handlers are found - this.nonListeners.put(listenerClass, this.holder); + Collection putIfAbsent = this.subscriptionsPerListener.putIfAbsent(listenerClass, this.subInitialValue.get()); + if (putIfAbsent != null) { + subscriptions = putIfAbsent; } else { - subscriptions = new StrongConcurrentSet(handlersSize, this.LOAD_FACTOR); -// subscriptions = Collections.newSetFromMap(new ConcurrentHashMap(8, this.LOAD_FACTOR, this.MAP_STRIPING)); -// subscriptions = Collections.newSetFromMap(new ConcurrentHashMap(8, this.LOAD_FACTOR, 1)); -// subscriptions = Collections.newSetFromMap(new Reference2BooleanOpenHashMap(8, this.LOAD_FACTOR)); - this.subscriptionsPerListener.put(listenerClass, subscriptions); + subscriptions = this.subInitialValue.get(); + this.subInitialValue.set(new StrongConcurrentSet(8, this.LOAD_FACTOR)); + } - resetSuperClassSubs(); + resetSuperClassSubs(); - // create NEW subscriptions for all detected message handlers - for (MessageHandler messageHandler : messageHandlers) { - // create the subscription - Subscription subscription = new Subscription(messageHandler); - subscription.subscribe(listener); + // create NEW subscriptions for all detected message handlers + for (MessageHandler messageHandler : messageHandlers) { + // create the subscription + Subscription subscription = new Subscription(messageHandler); + subscription.subscribe(listener); - subscriptions.add(subscription); + subscriptions.add(subscription); - // - // save the subscription per message type - // - // single or multi? - Class[] handledMessageTypes = subscription.getHandledMessageTypes(); - int size = handledMessageTypes.length; - boolean acceptsSubtypes = subscription.acceptsSubtypes(); + // + // save the subscription per message type + // + Class[] handledMessageTypes = subscription.getHandledMessageTypes(); + int size = handledMessageTypes.length; + boolean acceptsSubtypes = subscription.acceptsSubtypes(); - if (size == 1) { - // single - Class clazz = handledMessageTypes[0]; + if (size == 1) { + // single + Class clazz = handledMessageTypes[0]; - Collection subs = this.subscriptionsPerMessageSingle.get(clazz); - if (subs == null) { - Collection putIfAbsent = this.subscriptionsPerMessageSingle.putIfAbsent(clazz, this.subInitialValue.get()); - if (putIfAbsent != null) { - subs = putIfAbsent; - } else { - subs = this.subInitialValue.get(); -// this.subInitialValue.set(Collections.newSetFromMap(new ConcurrentHashMap(8, this.LOAD_FACTOR, 1))); -// this.subInitialValue.set(Collections.newSetFromMap(new Reference2BooleanOpenHashMap(8, this.LOAD_FACTOR))); - this.subInitialValue.set(new StrongConcurrentSet(8, this.LOAD_FACTOR)); -// this.subInitialValue.set(new ArrayDeque(8)); - } - } - - subs.add(subscription); - - if (acceptsSubtypes) { - // race conditions will result in duplicate answers, which we don't care about - setupSuperClassCache(clazz); + Collection subs = this.subscriptionsPerMessageSingle.get(clazz); + if (subs == null) { + putIfAbsent = this.subscriptionsPerMessageSingle.putIfAbsent(clazz, this.subInitialValue.get()); + if (putIfAbsent != null) { + subs = putIfAbsent; + } else { + subs = this.subInitialValue.get(); + this.subInitialValue.set(new StrongConcurrentSet(8, this.LOAD_FACTOR)); } } - else { -// // NOTE: Not thread-safe! must be synchronized in outer scope -// IdentityObjectTree, Collection> tree; -// -// switch (size) { -// case 2: { -// tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes[0], handledMessageTypes[1]); -// if (acceptsSubtypes) { -// setupSuperClassCache(handledMessageTypes[0]); -// setupSuperClassCache(handledMessageTypes[1]); -// } -// break; -// } -// case 3: { -// tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes[0], handledMessageTypes[1], handledMessageTypes[2]); -// if (acceptsSubtypes) { -// setupSuperClassCache(handledMessageTypes[0]); -// setupSuperClassCache(handledMessageTypes[1]); -// setupSuperClassCache(handledMessageTypes[2]); -// } -// break; -// } -// default: { -// tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes); -// if (acceptsSubtypes) { -// for (Class c : handledMessageTypes) { -// setupSuperClassCache(c); -// } -// } -// break; -// } -// } -// -// Collection subs = tree.getValue(); -// if (subs == null) { -// subs = new StrongConcurrentSet(16, this.LOAD_FACTOR); -// tree.putValue(subs); -// } -// subs.add(subscription); + + subs.add(subscription); + + if (acceptsSubtypes) { + // race conditions will result in duplicate answers, which we don't care about + setupSuperClassCache(clazz); } } } } - } finally { - WRITE.unlock(); + } else { + // subscriptions already exist and must only be updated + for (Subscription subscription : subscriptions) { + subscription.subscribe(listener); + } } + +// +// if (subscriptions != null) { +// // subscriptions already exist and must only be updated +// for (Subscription subscription : subscriptions) { +// subscription.subscribe(listener); +// } +// } +// else { +// // a listener is subscribed for the first time +// Collection messageHandlers = this.metadataReader.getMessageListener(listenerClass).getHandlers(); +// int handlersSize = messageHandlers.size(); +// +// if (handlersSize == 0) { +// // remember the class as non listening class if no handlers are found +// this.nonListeners.put(listenerClass, this.holder); +// } else { +// subscriptions = new StrongConcurrentSet(handlersSize, this.LOAD_FACTOR); +//// subscriptions = Collections.newSetFromMap(new ConcurrentHashMap(8, this.LOAD_FACTOR, this.MAP_STRIPING)); +//// subscriptions = Collections.newSetFromMap(new ConcurrentHashMap(8, this.LOAD_FACTOR, 1)); +//// subscriptions = Collections.newSetFromMap(new Reference2BooleanOpenHashMap(8, this.LOAD_FACTOR)); +// this.subscriptionsPerListener.put(listenerClass, subscriptions); +// +// resetSuperClassSubs(); +// +// // create NEW subscriptions for all detected message handlers +// for (MessageHandler messageHandler : messageHandlers) { +// // create the subscription +// Subscription subscription = new Subscription(messageHandler); +// subscription.subscribe(listener); +// +// subscriptions.add(subscription); +// +// // +// // save the subscription per message type +// // +// // single or multi? +// Class[] handledMessageTypes = subscription.getHandledMessageTypes(); +// int size = handledMessageTypes.length; +// boolean acceptsSubtypes = subscription.acceptsSubtypes(); +// +// if (size == 1) { +// // single +// Class clazz = handledMessageTypes[0]; +// +// Collection subs = this.subscriptionsPerMessageSingle.get(clazz); +// if (subs == null) { +// Collection putIfAbsent = this.subscriptionsPerMessageSingle.putIfAbsent(clazz, this.subInitialValue.get()); +// if (putIfAbsent != null) { +// subs = putIfAbsent; +// } else { +// subs = this.subInitialValue.get(); +//// this.subInitialValue.set(Collections.newSetFromMap(new ConcurrentHashMap(8, this.LOAD_FACTOR, 1))); +//// this.subInitialValue.set(Collections.newSetFromMap(new Reference2BooleanOpenHashMap(8, this.LOAD_FACTOR))); +// this.subInitialValue.set(new StrongConcurrentSet(8, this.LOAD_FACTOR)); +//// this.subInitialValue.set(new ArrayDeque(8)); +// } +// } +// +// subs.add(subscription); +// +// if (acceptsSubtypes) { +// // race conditions will result in duplicate answers, which we don't care about +// setupSuperClassCache(clazz); +// } +// } +// else { +//// // NOTE: Not thread-safe! must be synchronized in outer scope +//// IdentityObjectTree, Collection> tree; +//// +//// switch (size) { +//// case 2: { +//// tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes[0], handledMessageTypes[1]); +//// if (acceptsSubtypes) { +//// setupSuperClassCache(handledMessageTypes[0]); +//// setupSuperClassCache(handledMessageTypes[1]); +//// } +//// break; +//// } +//// case 3: { +//// tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes[0], handledMessageTypes[1], handledMessageTypes[2]); +//// if (acceptsSubtypes) { +//// setupSuperClassCache(handledMessageTypes[0]); +//// setupSuperClassCache(handledMessageTypes[1]); +//// setupSuperClassCache(handledMessageTypes[2]); +//// } +//// break; +//// } +//// default: { +//// tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes); +//// if (acceptsSubtypes) { +//// for (Class c : handledMessageTypes) { +//// setupSuperClassCache(c); +//// } +//// } +//// break; +//// } +//// } +//// +//// Collection subs = tree.getValue(); +//// if (subs == null) { +//// subs = new StrongConcurrentSet(16, this.LOAD_FACTOR); +//// tree.putValue(subs); +//// } +//// subs.add(subscription); +// } +// } +// } +// } } // must be protected by read lock