From eae63b3f8afb9458ae0a2dc4be2ec1d890c67627 Mon Sep 17 00:00:00 2001 From: nathan Date: Fri, 15 Jan 2016 02:26:23 +0100 Subject: [PATCH] GREAT performance with Disruptor values tweaked. WIP getting all collections lock-free --- src/dorkbox/util/messagebus/MessageBus.java | 32 ++- .../publication/PublisherExact.java | 30 +-- .../PublisherExactWithSuperTypes.java | 31 +-- ...PublisherExactWithSuperTypesAndVarity.java | 50 ++-- ...PublisherExactWithSuperTypes_FirstArg.java | 231 ---------------- .../publication/PublisherExact_FirstArg.java | 227 ---------------- .../subscription/FirstArgSubscriber.java | 255 ------------------ .../messagebus/subscription/Subscriber.java | 28 +- .../messagebus/subscription/Subscription.java | 29 +- .../subscription/SubscriptionManager.java | 108 ++++---- .../messagebus/utils/SubscriptionUtils.java | 2 +- .../util/messagebus/utils/VarArgUtils.java | 4 +- .../util/messagebus/ObjectTreeTest.java | 2 +- .../util/messagebus/PerfTest_Collections.java | 2 +- .../messagebus/SubscriptionManagerTest.java | 4 +- .../PerfTest_LinkedBlockingQueue.java | 28 -- .../PerfTest_LinkedBlockingQueue_Block.java | 161 ----------- ...PerfTest_LinkedBlockingQueue_NonBlock.java | 148 ---------- .../PerfTest_LinkedTransferQueue.java | 28 -- .../PerfTest_LinkedTransferQueue_Block.java | 145 ---------- ...PerfTest_LinkedTransferQueue_NonBlock.java | 144 ---------- 21 files changed, 150 insertions(+), 1539 deletions(-) delete mode 100644 src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypes_FirstArg.java delete mode 100644 src/dorkbox/util/messagebus/publication/PublisherExact_FirstArg.java delete mode 100644 src/dorkbox/util/messagebus/subscription/FirstArgSubscriber.java delete mode 100644 test/dorkbox/util/messagebus/queuePerf/PerfTest_LinkedBlockingQueue.java delete mode 100644 test/dorkbox/util/messagebus/queuePerf/PerfTest_LinkedBlockingQueue_Block.java delete mode 100644 test/dorkbox/util/messagebus/queuePerf/PerfTest_LinkedBlockingQueue_NonBlock.java delete mode 100644 test/dorkbox/util/messagebus/queuePerf/PerfTest_LinkedTransferQueue.java delete mode 100644 test/dorkbox/util/messagebus/queuePerf/PerfTest_LinkedTransferQueue_Block.java delete mode 100644 test/dorkbox/util/messagebus/queuePerf/PerfTest_LinkedTransferQueue_NonBlock.java diff --git a/src/dorkbox/util/messagebus/MessageBus.java b/src/dorkbox/util/messagebus/MessageBus.java index 4b6dc89..842cd5d 100644 --- a/src/dorkbox/util/messagebus/MessageBus.java +++ b/src/dorkbox/util/messagebus/MessageBus.java @@ -24,7 +24,6 @@ import com.lmax.disruptor.SequenceBarrier; import com.lmax.disruptor.Sequencer; import com.lmax.disruptor.WaitStrategy; import com.lmax.disruptor.WorkProcessor; -import dorkbox.util.messagebus.common.adapter.StampedLock; import dorkbox.util.messagebus.common.thread.NamedThreadFactory; import dorkbox.util.messagebus.error.DefaultErrorHandler; import dorkbox.util.messagebus.error.ErrorHandlingSupport; @@ -51,6 +50,7 @@ import java.util.concurrent.locks.LockSupport; public class MessageBus implements IMessageBus { private final ErrorHandlingSupport errorHandler; +// private final LinkedBlockingQueue dispatchQueue; // private final ArrayBlockingQueue dispatchQueue; // private final LinkedTransferQueue dispatchQueue; // private final Collection threads; @@ -94,7 +94,7 @@ class MessageBus implements IMessageBus { */ public MessageBus(final PublishMode publishMode) { - this(publishMode, Runtime.getRuntime().availableProcessors()/2); + this(publishMode, Runtime.getRuntime().availableProcessors()); } /** * @param publishMode Specifies which publishMode to operate the publication of messages. @@ -103,16 +103,14 @@ class MessageBus implements IMessageBus { public MessageBus(final PublishMode publishMode, int numberOfThreads) { // round to the nearest power of 2 - numberOfThreads = 1 << (32 - Integer.numberOfLeadingZeros(getMinNumberOfThreads(numberOfThreads))); + numberOfThreads = 1 << (32 - Integer.numberOfLeadingZeros(getMinNumberOfThreads(numberOfThreads) - 1)); this.errorHandler = new DefaultErrorHandler(); -// this.dispatchQueue = new ArrayBlockingQueue(6); +// this.dispatchQueue = new LinkedBlockingQueue(1024); +// this.dispatchQueue = new ArrayBlockingQueue(1024); // this.dispatchQueue = new LinkedTransferQueue(); classUtils = new ClassUtils(Subscriber.LOAD_FACTOR); - final StampedLock lock = new StampedLock(); - - final Subscriber subscriber; /** * Will subscribe and publish using all provided parameters in the method signature (for subscribe), and arguments (for publish) @@ -121,19 +119,19 @@ class MessageBus implements IMessageBus { switch (publishMode) { case Exact: - publisher = new PublisherExact(errorHandler, subscriber, lock); + publisher = new PublisherExact(errorHandler, subscriber); break; case ExactWithSuperTypes: - publisher = new PublisherExactWithSuperTypes(errorHandler, subscriber, lock); + publisher = new PublisherExactWithSuperTypes(errorHandler, subscriber); break; case ExactWithSuperTypesAndVarity: default: - publisher = new PublisherExactWithSuperTypesAndVarity(errorHandler, subscriber, lock); + publisher = new PublisherExactWithSuperTypesAndVarity(errorHandler, subscriber); } - this.subscriptionManager = new SubscriptionManager(numberOfThreads, subscriber, lock); + this.subscriptionManager = new SubscriptionManager(numberOfThreads, subscriber); // Now we setup the disruptor and work handlers @@ -156,6 +154,7 @@ class MessageBus implements IMessageBus { // final int BUFFER_SIZE = ringBufferSize * 64; // final int BUFFER_SIZE = 1024 * 64; // final int BUFFER_SIZE = 1024; +// final int BUFFER_SIZE = 16; final int BUFFER_SIZE = 8; @@ -212,8 +211,9 @@ class MessageBus implements IMessageBus { // @Override // public // void run() { -// ArrayBlockingQueue IN_QUEUE = MessageBus.this.dispatchQueue; -//// LinkedTransferQueue IN_QUEUE = MessageBus.this.dispatchQueue; +//// LinkedBlockingQueue IN_QUEUE = MessageBus.this.dispatchQueue; +//// ArrayBlockingQueue IN_QUEUE = MessageBus.this.dispatchQueue; +// LinkedTransferQueue IN_QUEUE = MessageBus.this.dispatchQueue; // // MultiNode node = new MultiNode(); // while (!MessageBus.this.shuttingDown) { @@ -369,7 +369,8 @@ class MessageBus implements IMessageBus { } // try { -// this.dispatchQueue.put(message); +// this.dispatchQueue.transfer(message); +//// this.dispatchQueue.put(message); // } catch (Exception e) { // errorHandler.handlePublicationError(new PublicationError().setMessage( // "Error while adding an asynchronous message").setCause(e).setPublishedObject(message)); @@ -458,7 +459,6 @@ class MessageBus implements IMessageBus { // for (Thread t : this.threads) { // t.start(); // } - } @Override @@ -483,6 +483,4 @@ class MessageBus implements IMessageBus { this.subscriptionManager.shutdown(); this.classUtils.clear(); } - - } diff --git a/src/dorkbox/util/messagebus/publication/PublisherExact.java b/src/dorkbox/util/messagebus/publication/PublisherExact.java index 0981677..8381a39 100644 --- a/src/dorkbox/util/messagebus/publication/PublisherExact.java +++ b/src/dorkbox/util/messagebus/publication/PublisherExact.java @@ -16,7 +16,6 @@ package dorkbox.util.messagebus.publication; import dorkbox.util.messagebus.common.DeadMessage; -import dorkbox.util.messagebus.common.adapter.StampedLock; import dorkbox.util.messagebus.error.ErrorHandlingSupport; import dorkbox.util.messagebus.error.PublicationError; import dorkbox.util.messagebus.subscription.Subscriber; @@ -27,13 +26,11 @@ public class PublisherExact implements Publisher { private final ErrorHandlingSupport errorHandler; private final Subscriber subscriber; - private final StampedLock lock; public - PublisherExact(final ErrorHandlingSupport errorHandler, final Subscriber subscriber, final StampedLock lock) { + PublisherExact(final ErrorHandlingSupport errorHandler, final Subscriber subscriber) { this.errorHandler = errorHandler; this.subscriber = subscriber; - this.lock = lock; } @Override @@ -42,10 +39,7 @@ class PublisherExact implements Publisher { try { final Class messageClass = message1.getClass(); - final StampedLock lock = this.lock; - long stamp = lock.readLock(); final Subscription[] subscriptions = subscriber.getExact(messageClass); // can return null - lock.unlockRead(stamp); // Run subscriptions if (subscriptions != null) { @@ -57,9 +51,7 @@ class PublisherExact implements Publisher { } else { // Dead Event must EXACTLY MATCH (no subclasses) - stamp = lock.readLock(); final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null - lock.unlockRead(stamp); if (deadSubscriptions != null) { final DeadMessage deadMessage = new DeadMessage(message1); @@ -85,10 +77,10 @@ class PublisherExact implements Publisher { final Class messageClass1 = message1.getClass(); final Class messageClass2 = message2.getClass(); - final StampedLock lock = this.lock; - long stamp = lock.readLock(); +// final StampedLock lock = this.lock; +// long stamp = lock.readLock(); final Subscription[] subscriptions = subscriber.getExact(messageClass1, messageClass2); // can return null - lock.unlockRead(stamp); +// lock.unlockRead(stamp); // Run subscriptions if (subscriptions != null) { @@ -100,9 +92,9 @@ class PublisherExact implements Publisher { } else { // Dead Event must EXACTLY MATCH (no subclasses) - stamp = lock.readLock(); +// stamp = lock.readLock(); final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null - lock.unlockRead(stamp); +// lock.unlockRead(stamp); if (deadSubscriptions != null) { final DeadMessage deadMessage = new DeadMessage(message1, message2); @@ -129,10 +121,10 @@ class PublisherExact implements Publisher { final Class messageClass2 = message2.getClass(); final Class messageClass3 = message3.getClass(); - final StampedLock lock = this.lock; - long stamp = lock.readLock(); +// final StampedLock lock = this.lock; +// long stamp = lock.readLock(); final Subscription[] subscriptions = subscriber.getExact(messageClass1, messageClass2, messageClass3); // can return null - lock.unlockRead(stamp); +// lock.unlockRead(stamp); // Run subscriptions if (subscriptions != null) { @@ -144,9 +136,9 @@ class PublisherExact implements Publisher { } else { // Dead Event must EXACTLY MATCH (no subclasses) - stamp = lock.readLock(); +// stamp = lock.readLock(); final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null - lock.unlockRead(stamp); +// lock.unlockRead(stamp); if (deadSubscriptions != null) { final DeadMessage deadMessage = new DeadMessage(message1, message2, message3); diff --git a/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypes.java b/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypes.java index 7748d02..5423a32 100644 --- a/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypes.java +++ b/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypes.java @@ -16,7 +16,6 @@ package dorkbox.util.messagebus.publication; import dorkbox.util.messagebus.common.DeadMessage; -import dorkbox.util.messagebus.common.adapter.StampedLock; import dorkbox.util.messagebus.error.ErrorHandlingSupport; import dorkbox.util.messagebus.error.PublicationError; import dorkbox.util.messagebus.subscription.Subscriber; @@ -27,13 +26,12 @@ public class PublisherExactWithSuperTypes implements Publisher { private final ErrorHandlingSupport errorHandler; private final Subscriber subscriber; - private final StampedLock lock; +// private final StampedLock lock; public - PublisherExactWithSuperTypes(final ErrorHandlingSupport errorHandler, final Subscriber subscriber, final StampedLock lock) { + PublisherExactWithSuperTypes(final ErrorHandlingSupport errorHandler, final Subscriber subscriber) { this.errorHandler = errorHandler; this.subscriber = subscriber; - this.lock = lock; } @Override @@ -42,10 +40,7 @@ class PublisherExactWithSuperTypes implements Publisher { try { final Class messageClass = message1.getClass(); - final StampedLock lock = this.lock; - long stamp = lock.readLock(); final Subscription[] subscriptions = subscriber.getExactAndSuper(messageClass); // can return null - lock.unlockRead(stamp); // Run subscriptions if (subscriptions != null) { @@ -57,9 +52,7 @@ class PublisherExactWithSuperTypes implements Publisher { } else { // Dead Event must EXACTLY MATCH (no subclasses) - stamp = lock.readLock(); final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null - lock.unlockRead(stamp); if (deadSubscriptions != null) { final DeadMessage deadMessage = new DeadMessage(message1); @@ -85,10 +78,10 @@ class PublisherExactWithSuperTypes implements Publisher { final Class messageClass1 = message1.getClass(); final Class messageClass2 = message2.getClass(); - final StampedLock lock = this.lock; - long stamp = lock.readLock(); +// final StampedLock lock = this.lock; +// long stamp = lock.readLock(); final Subscription[] subscriptions = subscriber.getExactAndSuper(messageClass1, messageClass2); // can return null - lock.unlockRead(stamp); +// lock.unlockRead(stamp); // Run subscriptions if (subscriptions != null) { @@ -100,9 +93,9 @@ class PublisherExactWithSuperTypes implements Publisher { } else { // Dead Event must EXACTLY MATCH (no subclasses) - stamp = lock.readLock(); +// stamp = lock.readLock(); final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null - lock.unlockRead(stamp); +// lock.unlockRead(stamp); if (deadSubscriptions != null) { final DeadMessage deadMessage = new DeadMessage(message1, message2); @@ -130,11 +123,11 @@ class PublisherExactWithSuperTypes implements Publisher { final Class messageClass3 = message3.getClass(); - final StampedLock lock = this.lock; - long stamp = lock.readLock(); +// final StampedLock lock = this.lock; +// long stamp = lock.readLock(); final Subscription[] subscriptions = subscriber.getExactAndSuper(messageClass1, messageClass2, messageClass3); // can return null - lock.unlockRead(stamp); +// lock.unlockRead(stamp); // Run subscriptions if (subscriptions != null) { @@ -146,9 +139,9 @@ class PublisherExactWithSuperTypes implements Publisher { } else { // Dead Event must EXACTLY MATCH (no subclasses) - stamp = lock.readLock(); +// stamp = lock.readLock(); final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null - lock.unlockRead(stamp); +// lock.unlockRead(stamp); if (deadSubscriptions != null) { final DeadMessage deadMessage = new DeadMessage(message1, message2, message3); diff --git a/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypesAndVarity.java b/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypesAndVarity.java index d7030f7..cc1500e 100644 --- a/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypesAndVarity.java +++ b/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypesAndVarity.java @@ -16,7 +16,6 @@ package dorkbox.util.messagebus.publication; import dorkbox.util.messagebus.common.DeadMessage; -import dorkbox.util.messagebus.common.adapter.StampedLock; import dorkbox.util.messagebus.error.ErrorHandlingSupport; import dorkbox.util.messagebus.error.PublicationError; import dorkbox.util.messagebus.subscription.Subscriber; @@ -32,16 +31,14 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher { private final ErrorHandlingSupport errorHandler; private final Subscriber subscriber; - private final StampedLock lock; private final AtomicBoolean varArgPossibility; final VarArgUtils varArgUtils; public - PublisherExactWithSuperTypesAndVarity(final ErrorHandlingSupport errorHandler, final Subscriber subscriber, final StampedLock lock) { + PublisherExactWithSuperTypesAndVarity(final ErrorHandlingSupport errorHandler, final Subscriber subscriber) { this.errorHandler = errorHandler; this.subscriber = subscriber; - this.lock = lock; varArgPossibility = subscriber.getVarArgPossibility(); varArgUtils = subscriber.getVarArgUtils(); @@ -54,10 +51,7 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher { final Class messageClass = message1.getClass(); final boolean isArray = messageClass.isArray(); - final StampedLock lock = this.lock; - long stamp = lock.readLock(); final Subscription[] subscriptions = subscriber.getExactAndSuper(messageClass); // can return null - lock.unlockRead(stamp); boolean hasSubs = false; // Run subscriptions @@ -74,9 +68,7 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher { // publish to var arg, only if not already an array (because that would be unnecessary) if (varArgPossibility.get() && !isArray) { - stamp = lock.readLock(); final Subscription[] varArgSubs = varArgUtils.getVarArgSubscriptions(messageClass, subscriber); // CAN NOT RETURN NULL - lock.unlockRead(stamp); Subscription sub; int length = varArgSubs.length; @@ -96,10 +88,8 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher { // now publish array based superClasses (but only if those ALSO accept vararg) - stamp = lock.readLock(); final Subscription[] varArgSuperSubs = varArgUtils.getVarArgSuperSubscriptions(messageClass, subscriber); // CAN NOT RETURN NULL - lock.unlockRead(stamp); length = varArgSuperSubs.length; @@ -121,9 +111,7 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher { // only get here if there were no other subscriptions // Dead Event must EXACTLY MATCH (no subclasses) if (!hasSubs) { - stamp = lock.readLock(); final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); - lock.unlockRead(stamp); if (deadSubscriptions != null) { final DeadMessage deadMessage = new DeadMessage(message1); @@ -149,10 +137,10 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher { final Class messageClass1 = message1.getClass(); final Class messageClass2 = message2.getClass(); - final StampedLock lock = this.lock; - long stamp = lock.readLock(); +// final StampedLock lock = this.lock; +// long stamp = lock.readLock(); final Subscription[] subscriptions = subscriber.getExactAndSuper(messageClass1, messageClass2); // can return null - lock.unlockRead(stamp); +// lock.unlockRead(stamp); boolean hasSubs = false; // Run subscriptions @@ -171,9 +159,9 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher { // vararg can ONLY work if all types are the same if (messageClass1 == messageClass2) { - stamp = lock.readLock(); +// stamp = lock.readLock(); final Subscription[] varArgSubs = varArgUtils.getVarArgSubscriptions(messageClass1, subscriber); // can NOT return null - lock.unlockRead(stamp); +// lock.unlockRead(stamp); final int length = varArgSubs.length; if (length > 0) { @@ -192,10 +180,10 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher { } // now publish array based superClasses (but only if those ALSO accept vararg) - stamp = lock.readLock(); +// stamp = lock.readLock(); final Subscription[] varArgSuperSubs = varArgUtils.getVarArgSuperSubscriptions(messageClass1, messageClass2, subscriber); // CAN NOT RETURN NULL - lock.unlockRead(stamp); +// lock.unlockRead(stamp); final int length = varArgSuperSubs.length; @@ -221,9 +209,9 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher { if (!hasSubs) { // Dead Event must EXACTLY MATCH (no subclasses) - lock.unlockRead(stamp); +// lock.unlockRead(stamp); final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null - lock.unlockRead(stamp); +// lock.unlockRead(stamp); if (deadSubscriptions != null) { final DeadMessage deadMessage = new DeadMessage(message1, message2); @@ -250,10 +238,10 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher { final Class messageClass2 = message2.getClass(); final Class messageClass3 = message3.getClass(); - final StampedLock lock = this.lock; - long stamp = lock.readLock(); +// final StampedLock lock = this.lock; +// long stamp = lock.readLock(); final Subscription[] subs = subscriber.getExactAndSuper(messageClass1, messageClass2, messageClass3); // can return null - lock.unlockRead(stamp); +// lock.unlockRead(stamp); boolean hasSubs = false; @@ -274,9 +262,9 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher { // vararg can ONLY work if all types are the same if (messageClass1 == messageClass2 && messageClass1 == messageClass3) { - stamp = lock.readLock(); +// stamp = lock.readLock(); final Subscription[] varArgSubs = varArgUtils.getVarArgSubscriptions(messageClass1, subscriber); // can NOT return null - lock.unlockRead(stamp); +// lock.unlockRead(stamp); final int length = varArgSubs.length; if (length > 0) { @@ -297,10 +285,10 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher { // now publish array based superClasses (but only if those ALSO accept vararg) - stamp = lock.readLock(); +// stamp = lock.readLock(); final Subscription[] varArgSuperSubs = varArgUtils.getVarArgSuperSubscriptions(messageClass1, messageClass2, messageClass3, subscriber); // CAN NOT RETURN NULL - lock.unlockRead(stamp); +// lock.unlockRead(stamp); final int length = varArgSuperSubs.length; @@ -327,9 +315,9 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher { if (!hasSubs) { // Dead Event must EXACTLY MATCH (no subclasses) - stamp = lock.readLock(); +// stamp = lock.readLock(); final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null - lock.unlockRead(stamp); +// lock.unlockRead(stamp); if (deadSubscriptions != null) { final DeadMessage deadMessage = new DeadMessage(message1, message2, message3); diff --git a/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypes_FirstArg.java b/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypes_FirstArg.java deleted file mode 100644 index 601e8b1..0000000 --- a/src/dorkbox/util/messagebus/publication/PublisherExactWithSuperTypes_FirstArg.java +++ /dev/null @@ -1,231 +0,0 @@ -/* - * Copyright 2015 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dorkbox.util.messagebus.publication; - -import dorkbox.util.messagebus.common.DeadMessage; -import dorkbox.util.messagebus.common.adapter.StampedLock; -import dorkbox.util.messagebus.error.ErrorHandlingSupport; -import dorkbox.util.messagebus.error.PublicationError; -import dorkbox.util.messagebus.subscription.Subscriber; -import dorkbox.util.messagebus.subscription.Subscription; - -import java.util.Arrays; - -@SuppressWarnings("Duplicates") -public -class PublisherExactWithSuperTypes_FirstArg implements Publisher { - private final ErrorHandlingSupport errorHandler; - private final Subscriber subscriber; - private final StampedLock lock; - - public - PublisherExactWithSuperTypes_FirstArg(final ErrorHandlingSupport errorHandler, final Subscriber subscriber, final StampedLock lock) { - this.errorHandler = errorHandler; - this.subscriber = subscriber; - this.lock = lock; - } - - @Override - public - void publish(final Object message1) { - try { - final Class messageClass = message1.getClass(); - - final StampedLock lock = this.lock; - long stamp = lock.readLock(); - final Subscription[] subscriptions = subscriber.getExactAndSuper(messageClass); // can return null - lock.unlockRead(stamp); - - // Run subscriptions - if (subscriptions != null) { - Class[] handledMessages; - Subscription sub; - for (int i = 0; i < subscriptions.length; i++) { - sub = subscriptions[i]; - - handledMessages = sub.getHandler().getHandledMessages(); - if (handledMessages.length == 1) { - sub.publish(message1); - } - } - } - else { - // Dead Event must EXACTLY MATCH (no subclasses) - stamp = lock.readLock(); - final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null - lock.unlockRead(stamp); - - if (deadSubscriptions != null) { - final DeadMessage deadMessage = new DeadMessage(message1); - - Subscription sub; - for (int i = 0; i < deadSubscriptions.length; i++) { - sub = deadSubscriptions[i]; - sub.publish(deadMessage); - } - } - } - } catch (Throwable e) { - errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.") - .setCause(e) - .setPublishedObject(message1)); - } - } - - @Override - public - void publish(final Object message1, final Object message2) { - try { - final Class messageClass = message1.getClass(); - - final StampedLock lock = this.lock; - long stamp = lock.readLock(); - final Subscription[] subscriptions = subscriber.getExactAndSuper(messageClass, null); // can return null - lock.unlockRead(stamp); - - // Run subscriptions - if (subscriptions != null) { - Class[] handledMessages; - Subscription sub; - for (int i = 0; i < subscriptions.length; i++) { - sub = subscriptions[i]; - - handledMessages = sub.getHandler().getHandledMessages(); - if (handledMessages.length == 2) { - sub.publish(message1, message2); - } - } - } - else { - // Dead Event must EXACTLY MATCH (no subclasses) - stamp = lock.readLock(); - final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null - lock.unlockRead(stamp); - - if (deadSubscriptions != null) { - final DeadMessage deadMessage = new DeadMessage(message1, message2); - - Subscription sub; - for (int i = 0; i < deadSubscriptions.length; i++) { - sub = deadSubscriptions[i]; - sub.publish(deadMessage); - } - } - } - } catch (Throwable e) { - errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.") - .setCause(e) - .setPublishedObject(message1, message2)); - } - } - - @Override - public - void publish(final Object message1, final Object message2, final Object message3) { - try { - final Class messageClass = message1.getClass(); - - final StampedLock lock = this.lock; - long stamp = lock.readLock(); - final Subscription[] subscriptions = subscriber.getExactAndSuper(messageClass); // can return null - lock.unlockRead(stamp); - - // Run subscriptions - if (subscriptions != null) { - Class[] handledMessages; - Subscription sub; - for (int i = 0; i < subscriptions.length; i++) { - sub = subscriptions[i]; - - handledMessages = sub.getHandler().getHandledMessages(); - if (handledMessages.length == 3) { - sub.publish(message1, message2, message3); - } - } - } - else { - // Dead Event must EXACTLY MATCH (no subclasses) - stamp = lock.readLock(); - final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null - lock.unlockRead(stamp); - - if (deadSubscriptions != null) { - final DeadMessage deadMessage = new DeadMessage(message1, message2, message3); - - Subscription sub; - for (int i = 0; i < deadSubscriptions.length; i++) { - sub = deadSubscriptions[i]; - sub.publish(deadMessage); - } - } - } - } catch (Throwable e) { - errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.") - .setCause(e) - .setPublishedObject(message1, message2, message3)); - } - } - - @Override - public - void publish(final Object[] messages) { - try { - final Object message1 = messages[0]; - final Class messageClass = message1.getClass(); - final int length = messages.length; - final Object[] newMessages = Arrays.copyOfRange(messages, 1, length); - - final StampedLock lock = this.lock; - long stamp = lock.readLock(); - final Subscription[] subscriptions = subscriber.getExactAndSuper(messageClass); // can return null - lock.unlockRead(stamp); - - // Run subscriptions - if (subscriptions != null) { - Class[] handledMessages; - Subscription sub; - for (int i = 0; i < subscriptions.length; i++) { - sub = subscriptions[i]; - - handledMessages = sub.getHandler().getHandledMessages(); - if (handledMessages.length == length) { - sub.publish(message1, newMessages); - } - } - } - else { - // Dead Event must EXACTLY MATCH (no subclasses) - stamp = lock.readLock(); - final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null - lock.unlockRead(stamp); - - if (deadSubscriptions != null) { - final DeadMessage deadMessage = new DeadMessage(message1, newMessages); - - Subscription sub; - for (int i = 0; i < deadSubscriptions.length; i++) { - sub = deadSubscriptions[i]; - sub.publish(deadMessage); - } - } - } - } catch (Throwable e) { - errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.") - .setCause(e) - .setPublishedObject(messages)); - } - } -} diff --git a/src/dorkbox/util/messagebus/publication/PublisherExact_FirstArg.java b/src/dorkbox/util/messagebus/publication/PublisherExact_FirstArg.java deleted file mode 100644 index 034b358..0000000 --- a/src/dorkbox/util/messagebus/publication/PublisherExact_FirstArg.java +++ /dev/null @@ -1,227 +0,0 @@ -/* - * Copyright 2015 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dorkbox.util.messagebus.publication; - -import dorkbox.util.messagebus.common.DeadMessage; -import dorkbox.util.messagebus.common.adapter.StampedLock; -import dorkbox.util.messagebus.error.ErrorHandlingSupport; -import dorkbox.util.messagebus.error.PublicationError; -import dorkbox.util.messagebus.subscription.Subscriber; -import dorkbox.util.messagebus.subscription.Subscription; - -@SuppressWarnings("Duplicates") -public -class PublisherExact_FirstArg implements Publisher { - private final ErrorHandlingSupport errorHandler; - private final Subscriber subscriber; - private final StampedLock lock; - - public - PublisherExact_FirstArg(final ErrorHandlingSupport errorHandler, final Subscriber subscriber, final StampedLock lock) { - this.errorHandler = errorHandler; - this.subscriber = subscriber; - this.lock = lock; - } - - @Override - public - void publish(final Object message1) { - try { - final Class messageClass = message1.getClass(); - - final StampedLock lock = this.lock; - long stamp = lock.readLock(); - final Subscription[] subscriptions = subscriber.getExact(messageClass); // can return null - lock.unlockRead(stamp); - - // Run subscriptions - if (subscriptions != null) { - Class[] handledMessages; - Subscription sub; - for (int i = 0; i < subscriptions.length; i++) { - sub = subscriptions[i]; - - handledMessages = sub.getHandler().getHandledMessages(); - if (handledMessages.length == 1) { - sub.publish(message1); - } - } - } - else { - // Dead Event must EXACTLY MATCH (no subclasses) - stamp = lock.readLock(); - final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null - lock.unlockRead(stamp); - - if (deadSubscriptions != null) { - final DeadMessage deadMessage = new DeadMessage(message1); - - Subscription sub; - for (int i = 0; i < deadSubscriptions.length; i++) { - sub = deadSubscriptions[i]; - sub.publish(deadMessage); - } - } - } - } catch (Throwable e) { - errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.") - .setCause(e) - .setPublishedObject(message1)); - } - } - - @Override - public - void publish(final Object message1, final Object message2) { - try { - final Class messageClass = message1.getClass(); - - final StampedLock lock = this.lock; - long stamp = lock.readLock(); - final Subscription[] subscriptions = subscriber.getExact(messageClass); // can return null - lock.unlockRead(stamp); - - // Run subscriptions - if (subscriptions != null) { - Class[] handledMessages; - Subscription sub; - for (int i = 0; i < subscriptions.length; i++) { - sub = subscriptions[i]; - - handledMessages = sub.getHandler().getHandledMessages(); - if (handledMessages.length == 2) { - sub.publish(message1, message2); - } - } - } - else { - // Dead Event must EXACTLY MATCH (no subclasses) - lock.unlockRead(stamp); - final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null - lock.unlockRead(stamp); - - if (deadSubscriptions != null) { - final DeadMessage deadMessage = new DeadMessage(message1, message2); - - Subscription sub; - for (int i = 0; i < deadSubscriptions.length; i++) { - sub = deadSubscriptions[i]; - sub.publish(deadMessage); - } - } - } - } catch (Throwable e) { - errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.") - .setCause(e) - .setPublishedObject(message1, message2)); - } - } - - @Override - public - void publish(final Object message1, final Object message2, final Object message3) { - try { - final Class messageClass = message1.getClass(); - - final StampedLock lock = this.lock; - long stamp = lock.readLock(); - final Subscription[] subscriptions = subscriber.getExact(messageClass); // can return null - lock.unlockRead(stamp); - - // Run subscriptions - if (subscriptions != null) { - Class[] handledMessages; - Subscription sub; - for (int i = 0; i < subscriptions.length; i++) { - sub = subscriptions[i]; - - handledMessages = sub.getHandler().getHandledMessages(); - if (handledMessages.length == 3) { - sub.publish(message1, message2, message3); - } - } - } - else { - // Dead Event must EXACTLY MATCH (no subclasses) - stamp = lock.readLock(); - final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null - lock.unlockRead(stamp); - - if (deadSubscriptions != null) { - final DeadMessage deadMessage = new DeadMessage(message1, message2, message3); - - Subscription sub; - for (int i = 0; i < deadSubscriptions.length; i++) { - sub = deadSubscriptions[i]; - sub.publish(deadMessage); - } - } - } - } catch (Throwable e) { - errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.") - .setCause(e) - .setPublishedObject(message1, message2, message3)); - } - } - - @Override - public - void publish(final Object[] messages) { - try { - final Class messageClass = messages[0].getClass(); - final int length = messages.length; - - final StampedLock lock = this.lock; - long stamp = lock.readLock(); - final Subscription[] subscriptions = subscriber.getExact(messageClass); // can return null - lock.unlockRead(stamp); - - // Run subscriptions - if (subscriptions != null) { - Class[] handledMessages; - Subscription sub; - for (int i = 0; i < subscriptions.length; i++) { - sub = subscriptions[i]; - - handledMessages = sub.getHandler().getHandledMessages(); - if (handledMessages.length == length) { - sub.publish(messages); - } - } - } - else { - // Dead Event must EXACTLY MATCH (no subclasses) - stamp = lock.readLock(); - final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null - lock.unlockRead(stamp); - - if (deadSubscriptions != null) { - final DeadMessage deadMessage = new DeadMessage(messages); - - Subscription sub; - for (int i = 0; i < deadSubscriptions.length; i++) { - sub = deadSubscriptions[i]; - sub.publish(deadMessage); - } - } - } - } catch (Throwable e) { - errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.") - .setCause(e) - .setPublishedObject(messages)); - } - } -} diff --git a/src/dorkbox/util/messagebus/subscription/FirstArgSubscriber.java b/src/dorkbox/util/messagebus/subscription/FirstArgSubscriber.java deleted file mode 100644 index 94e517b..0000000 --- a/src/dorkbox/util/messagebus/subscription/FirstArgSubscriber.java +++ /dev/null @@ -1,255 +0,0 @@ -/* - * Copyright 2015 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dorkbox.util.messagebus.subscription; - -import dorkbox.util.messagebus.common.MessageHandler; -import dorkbox.util.messagebus.common.adapter.JavaVersionAdapter; -import dorkbox.util.messagebus.error.ErrorHandlingSupport; -import dorkbox.util.messagebus.utils.ClassUtils; -import dorkbox.util.messagebus.utils.SubscriptionUtils; -import dorkbox.util.messagebus.utils.VarArgUtils; - -import java.util.ArrayList; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; - - -/** - * Permits subscriptions that only use the first parameters as the signature. The publisher MUST provide the correct additional parameters, - * and they must be of the correct type, otherwise it will throw an error. - *

- * Parameter length checking during publication is performed, so that you can have multiple handlers with the same signature, but each - * with a different number of parameters - */ -public class FirstArgSubscriber implements Subscriber { - - private final ErrorHandlingSupport errorHandler; - - private final SubscriptionUtils subUtils; - - // all subscriptions per message type. We perpetually KEEP the types, as this lowers the amount of locking required - // this is the primary list for dispatching a specific message - // write access is synchronized and happens only when a listener of a specific class is registered the first time - - // the following are used ONLY for FIRST ARG subscription/publication. (subscriptionsPerMessageMulti isn't used in this case) - private final Map, ArrayList> subscriptionsPerMessage; - - - public FirstArgSubscriber(final ErrorHandlingSupport errorHandler, final ClassUtils classUtils) { - this.errorHandler = errorHandler; - - // the following are used ONLY for FIRST ARG subscription/publication. (subscriptionsPerMessageMulti isn't used in this case) - this.subscriptionsPerMessage = JavaVersionAdapter.concurrentMap(32, LOAD_FACTOR, 1); - - this.subUtils = new SubscriptionUtils(classUtils, Subscriber.LOAD_FACTOR); - } - - // inside a write lock - // add this subscription to each of the handled types - // to activate this sub for publication - @Override - public void register(final Class listenerClass, final int handlersSize, final Subscription[] subsPerListener) { - - final Map, ArrayList> subscriptions = this.subscriptionsPerMessage; - - Subscription subscription; - MessageHandler handler; - Class[] messageHandlerTypes; - int size; - - Class type0; - ArrayList subs; - - for (int i = 0; i < handlersSize; i++) { - subscription = subsPerListener[i]; - - // activate this subscription for publication - // now add this subscription to each of the handled types - - // only register based on the FIRST parameter - handler = subscription.getHandler(); - messageHandlerTypes = handler.getHandledMessages(); - size = messageHandlerTypes.length; - - if (size == 0) { - errorHandler.handleError("Error while trying to subscribe class: " + messageHandlerTypes.getClass(), listenerClass); - continue; - } - - type0 = messageHandlerTypes[0]; - subs = subscriptions.get(type0); - if (subs == null) { - subs = new ArrayList(); - - subscriptions.put(type0, subs); - } - - subs.add(subscription); - } - } - - @Override - public AtomicBoolean getVarArgPossibility() { - return null; - } - - @Override - public VarArgUtils getVarArgUtils() { - return null; - } - - @Override - public void shutdown() { - this.subscriptionsPerMessage.clear(); - } - - @Override - public void clear() { - - } - - // can return null - @Override - public ArrayList getExactAsArray(final Class messageClass) { - return subscriptionsPerMessage.get(messageClass); - } - - // can return null - @Override - public ArrayList getExactAsArray(final Class messageClass1, final Class messageClass2) { - return subscriptionsPerMessage.get(messageClass1); - } - - // can return null - @Override - public ArrayList getExactAsArray(final Class messageClass1, final Class messageClass2, - final Class messageClass3) { - return subscriptionsPerMessage.get(messageClass1); - } - - // can return null - @Override - public - Subscription[] getExact(final Class messageClass) { - final ArrayList collection = getExactAsArray(messageClass); - - if (collection != null) { - final Subscription[] subscriptions = new Subscription[collection.size()]; - collection.toArray(subscriptions); - - return subscriptions; - } - - return null; - } - - // can return null - @Override - public Subscription[] getExact(final Class messageClass1, final Class messageClass2) { - return null; - } - - // can return null - @Override - public Subscription[] getExact(final Class messageClass1, final Class messageClass2, final Class messageClass3) { - return null; - } - - // can return null - @Override - public Subscription[] getExactAndSuper(final Class messageClass) { - ArrayList collection = getExactAsArray(messageClass); // can return null - - // now publish superClasses - final ArrayList superSubscriptions = this.subUtils.getSuperSubscriptions(messageClass, this); // NOT return null - - if (collection != null) { - collection = new ArrayList(collection); - - if (!superSubscriptions.isEmpty()) { - collection.addAll(superSubscriptions); - } - } - else if (!superSubscriptions.isEmpty()) { - collection = superSubscriptions; - } - - if (collection != null) { - final Subscription[] subscriptions = new Subscription[collection.size()]; - collection.toArray(subscriptions); - return subscriptions; - } - else { - return null; - } - } - - @Override - public Subscription[] getExactAndSuper(final Class messageClass1, final Class messageClass2) { - ArrayList collection = getExactAsArray(messageClass1); // can return null - - // now publish superClasses - final ArrayList superSubscriptions = this.subUtils.getSuperSubscriptions(messageClass1, this); // NOT return null - - if (collection != null) { - collection = new ArrayList(collection); - - if (!superSubscriptions.isEmpty()) { - collection.addAll(superSubscriptions); - } - } - else if (!superSubscriptions.isEmpty()) { - collection = superSubscriptions; - } - - if (collection != null) { - final Subscription[] subscriptions = new Subscription[collection.size()]; - collection.toArray(subscriptions); - return subscriptions; - } - else { - return null; - } - } - - @Override - public Subscription[] getExactAndSuper(final Class messageClass1, final Class messageClass2, final Class messageClass3) { - ArrayList collection = getExactAsArray(messageClass1); // can return null - - // now publish superClasses - final ArrayList superSubscriptions = this.subUtils.getSuperSubscriptions(messageClass1, this); // NOT return null - - if (collection != null) { - collection = new ArrayList(collection); - - if (!superSubscriptions.isEmpty()) { - collection.addAll(superSubscriptions); - } - } - else if (!superSubscriptions.isEmpty()) { - collection = superSubscriptions; - } - - if (collection != null) { - final Subscription[] subscriptions = new Subscription[collection.size()]; - collection.toArray(subscriptions); - return subscriptions; - } - else { - return null; - } - } -} diff --git a/src/dorkbox/util/messagebus/subscription/Subscriber.java b/src/dorkbox/util/messagebus/subscription/Subscriber.java index 4687ada..2eb8c65 100644 --- a/src/dorkbox/util/messagebus/subscription/Subscriber.java +++ b/src/dorkbox/util/messagebus/subscription/Subscriber.java @@ -25,11 +25,13 @@ import dorkbox.util.messagebus.utils.VarArgUtils; import java.util.ArrayList; import java.util.Map; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; /** * Permits subscriptions with a varying length of parameters as the signature, which must be match by the publisher for it to be accepted */ +@SuppressWarnings("Duplicates") public class Subscriber { public static final float LOAD_FACTOR = 0.8F; @@ -42,13 +44,13 @@ class Subscriber { // all subscriptions per message type. We perpetually KEEP the types, as this lowers the amount of locking required // this is the primary list for dispatching a specific message // write access is synchronized and happens only when a listener of a specific class is registered the first time - private final Map, ArrayList> subscriptionsPerMessageSingle; + final ConcurrentMap, ArrayList> subscriptionsPerMessageSingle; private final HashMapTree, ArrayList> subscriptionsPerMessageMulti; // shortcut publication if we know there is no possibility of varArg (ie: a method that has an array as arguments) - private final AtomicBoolean varArgPossibility = new AtomicBoolean(false); + final AtomicBoolean varArgPossibility = new AtomicBoolean(false); - private ThreadLocal> listCache = new ThreadLocal>() { + ThreadLocal> listCache = new ThreadLocal>() { @Override protected ArrayList initialValue() { @@ -63,7 +65,7 @@ class Subscriber { this.errorHandler = errorHandler; this.subscriptionsPerMessageSingle = JavaVersionAdapter.concurrentMap(32, LOAD_FACTOR, 1); - this.subscriptionsPerMessageMulti = new HashMapTree, ArrayList>(4, LOAD_FACTOR); + this.subscriptionsPerMessageMulti = new HashMapTree, ArrayList>(); this.subUtils = new SubscriptionUtils(classUtils, LOAD_FACTOR); @@ -160,23 +162,6 @@ class Subscriber { } } - public - void register(final Class listenerClass, final int handlersSize, final Subscription[] subsPerListener) { - - final Map, ArrayList> subsPerMessageSingle = this.subscriptionsPerMessageSingle; - final HashMapTree, ArrayList> subsPerMessageMulti = this.subscriptionsPerMessageMulti; - final AtomicBoolean varArgPossibility = this.varArgPossibility; - - Subscription subscription; - - for (int i = 0; i < handlersSize; i++) { - subscription = subsPerListener[i]; - - // activate this subscription for publication - // now add this subscription to each of the handled types - registerMulti(subscription, listenerClass, subsPerMessageSingle, subsPerMessageMulti, varArgPossibility); - } - } public void shutdown() { @@ -207,6 +192,7 @@ class Subscriber { final ArrayList collection = getExactAsArray(messageClass); if (collection != null) { + // convert to Array because the subscriptions can change and we want safe iteration over the list final Subscription[] subscriptions = new Subscription[collection.size()]; collection.toArray(subscriptions); diff --git a/src/dorkbox/util/messagebus/subscription/Subscription.java b/src/dorkbox/util/messagebus/subscription/Subscription.java index b762abb..c98e21b 100644 --- a/src/dorkbox/util/messagebus/subscription/Subscription.java +++ b/src/dorkbox/util/messagebus/subscription/Subscription.java @@ -71,15 +71,17 @@ class Subscription { // the handler's metadata -> for each handler in a listener, a unique subscription context is created - private final MessageHandler handlerMetadata; + private final MessageHandler handler; private final IHandlerInvocation invocation; private final Collection listeners; public - Subscription(final MessageHandler handler, final float loadFactor, final int stripeSize) { - this.handlerMetadata = handler; -// this.listeners = new StrongConcurrentSetV8(16, loadFactor, stripeSize); + Subscription(final MessageHandler handler) { + this.handler = handler; + +// this.listeners = Collections.newSetFromMap(new ConcurrentHashMap(8)); // really bad performance +// this.listeners = new StrongConcurrentSetV8(16, 0.7F, 8); ///this is by far, the fastest this.listeners = new ConcurrentSkipListSet<>(new Comparator() { @@ -87,6 +89,7 @@ class Subscription { public int compare(final Object o1, final Object o2) { return Integer.compare(o1.hashCode(), o2.hashCode()); +// return 0; } }); // this.listeners = new StrongConcurrentSet(16, 0.85F); @@ -104,7 +107,7 @@ class Subscription { public MessageHandler getHandler() { - return handlerMetadata; + return handler; } public @@ -133,8 +136,8 @@ class Subscription { public void publish(final Object message) throws Throwable { - final MethodAccess handler = this.handlerMetadata.getHandler(); - final int handleIndex = this.handlerMetadata.getMethodIndex(); + final MethodAccess handler = this.handler.getHandler(); + final int handleIndex = this.handler.getMethodIndex(); final IHandlerInvocation invocation = this.invocation; Iterator iterator; @@ -149,8 +152,8 @@ class Subscription { public void publish(final Object message1, final Object message2) throws Throwable { - final MethodAccess handler = this.handlerMetadata.getHandler(); - final int handleIndex = this.handlerMetadata.getMethodIndex(); + final MethodAccess handler = this.handler.getHandler(); + final int handleIndex = this.handler.getMethodIndex(); final IHandlerInvocation invocation = this.invocation; Iterator iterator; @@ -165,8 +168,8 @@ class Subscription { public void publish(final Object message1, final Object message2, final Object message3) throws Throwable { - final MethodAccess handler = this.handlerMetadata.getHandler(); - final int handleIndex = this.handlerMetadata.getMethodIndex(); + final MethodAccess handler = this.handler.getHandler(); + final int handleIndex = this.handler.getMethodIndex(); final IHandlerInvocation invocation = this.invocation; Iterator iterator; @@ -181,8 +184,8 @@ class Subscription { public void publishToSubscription(final Object... messages) throws Throwable { - final MethodAccess handler = this.handlerMetadata.getHandler(); - final int handleIndex = this.handlerMetadata.getMethodIndex(); + final MethodAccess handler = this.handler.getHandler(); + final int handleIndex = this.handler.getMethodIndex(); final IHandlerInvocation invocation = this.invocation; Iterator iterator; diff --git a/src/dorkbox/util/messagebus/subscription/SubscriptionManager.java b/src/dorkbox/util/messagebus/subscription/SubscriptionManager.java index caafe6a..7a18b41 100644 --- a/src/dorkbox/util/messagebus/subscription/SubscriptionManager.java +++ b/src/dorkbox/util/messagebus/subscription/SubscriptionManager.java @@ -17,9 +17,11 @@ package dorkbox.util.messagebus.subscription; import dorkbox.util.messagebus.common.MessageHandler; import dorkbox.util.messagebus.common.adapter.JavaVersionAdapter; -import dorkbox.util.messagebus.common.adapter.StampedLock; +import java.util.ArrayList; import java.util.Map; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; /** * The subscription managers responsibility is to consistently handle and synchronize the message listener subscription process. @@ -39,19 +41,15 @@ class SubscriptionManager { // this map provides fast access for subscribing and unsubscribing // write access is synchronized and happens very infrequently // once a collection of subscriptions is stored it does not change - private final Map, Subscription[]> subscriptionsPerListener; + private final ConcurrentMap, Subscription[]> subscriptionsPerListener; - private final StampedLock lock; - private final int numberOfThreads; private final Subscriber subscriber; public - SubscriptionManager(final int numberOfThreads, final Subscriber subscriber, final StampedLock lock) { - this.numberOfThreads = numberOfThreads; + SubscriptionManager(final int numberOfThreads, final Subscriber subscriber) { this.subscriber = subscriber; - this.lock = lock; // modified ONLY during SUB/UNSUB @@ -85,11 +83,13 @@ class SubscriptionManager { // these are concurrent collections subscriber.clear(); - Subscription[] subscriptions = getListenerSubs(listenerClass); + // this is an array, because subscriptions for a specific listener CANNOT change, either they exist or do not exist. + // ONCE subscriptions are in THIS map, they are considered AVAILABLE. + Subscription[] subscriptions = this.subscriptionsPerListener.get(listenerClass); // the subscriptions from the map were null, so create them if (subscriptions == null) { - // it is important to note that this section CAN be repeated, however the write lock is gained before + // it is important to note that this section CAN be repeated. // anything 'permanent' is saved. This is so the time spent inside the writelock is minimized. final MessageHandler[] messageHandlers = MessageHandler.get(listenerClass); @@ -101,50 +101,80 @@ class SubscriptionManager { return; } - final Subscription[] subsPerListener = new Subscription[handlersSize]; - // create the subscription - MessageHandler messageHandler; + final AtomicBoolean varArgPossibility = subscriber.varArgPossibility; Subscription subscription; + MessageHandler messageHandler; + Class[] messageHandlerTypes; + Class handlerType; + + // create the subscriptions + final ConcurrentMap, ArrayList> subsPerMessageSingle = subscriber.subscriptionsPerMessageSingle; + subscriptions = new Subscription[handlersSize]; + for (int i = 0; i < handlersSize; i++) { + // THE HANDLER IS THE SAME FOR ALL SUBSCRIPTIONS OF THE SAME TYPE! messageHandler = messageHandlers[i]; - // create the subscription - subscription = new Subscription(messageHandler, Subscriber.LOAD_FACTOR, numberOfThreads); - subscription.subscribe(listener); + // is this handler able to accept var args? + if (messageHandler.getVarArgClass() != null) { + varArgPossibility.lazySet(true); + } - subsPerListener[i] = subscription; // activates this sub for sub/unsub + // now create a list of subscriptions for this specific handlerType (but don't add anything yet). + // we only store things based on the FIRST type (for lookup) then parse the rest of the types during publication + messageHandlerTypes = messageHandler.getHandledMessages(); + handlerType = messageHandlerTypes[0]; + + // using ThreadLocal cache's is SIGNIFICANTLY faster for subscribing to new types + final ArrayList cachedSubs = subscriber.listCache.get(); + ArrayList subs = subsPerMessageSingle.putIfAbsent(handlerType, cachedSubs); + if (subs == null) { + subscriber.listCache.set(new ArrayList(8)); + } + + // create the subscription. This can be thrown away if the subscription succeeds in another thread + subscription = new Subscription(messageHandler); + subscriptions[i] = subscription; + + // now add this subscription to each of the handled types } - final Map, Subscription[]> subsPerListenerMap = this.subscriptionsPerListener; + // now subsPerMessageSingle has a unique list of subscriptions for a specific handlerType, and MAY already have subscriptions - // now write lock for the least expensive part. This is a deferred "double checked lock", but is necessary because - // of the huge number of reads compared to writes. + // putIfAbsent + final Subscription[] previousSubs = subscriptionsPerListener.putIfAbsent(listenerClass, subscriptions); // activates this sub for sub/unsub + if (previousSubs != null) { + // another thread beat us to creating subs (for this exact listenerClass). Since another thread won, we have to make sure + // all of the subscriptions are correct for a specific handler type, so we have to RECONSTRUT the correct list again. + // This is to make sure that "invalid" subscriptions don't exist in subsPerMessageSingle. - final StampedLock lock = this.lock; - final long stamp = lock.writeLock(); + // since nothing is yet "subscribed" we can assign the correct values for everything now + subscriptions = previousSubs; + } else { + // we can now safely add for publication AND subscribe since the data structures are consistent + for (int i = 0; i < handlersSize; i++) { + subscription = subscriptions[i]; + subscription.subscribe(listener); // register this callback listener to this subscription - subscriptions = subsPerListenerMap.get(listenerClass); + // THE HANDLER IS THE SAME FOR ALL SUBSCRIPTIONS OF THE SAME TYPE! + messageHandler = messageHandlers[i]; - // it was still null, so we actually have to create the rest of the subs - if (subscriptions == null) { - subscriber.register(listenerClass, handlersSize, subsPerListener); // this adds to subscriptionsPerMessage + // register for publication + messageHandlerTypes = messageHandler.getHandledMessages(); + handlerType = messageHandlerTypes[0]; - subsPerListenerMap.put(listenerClass, subsPerListener); - lock.unlockWrite(stamp); + // makes this subscription visible for publication + subsPerMessageSingle.get(handlerType).add(subscription); + } return; } - else { - // continue to subscription - lock.unlockWrite(stamp); - } } // subscriptions already exist and must only be updated - // only publish here if our single-check was OK, or our double-check was OK Subscription subscription; for (int i = 0; i < subscriptions.length; i++) { subscription = subscriptions[i]; @@ -167,7 +197,7 @@ class SubscriptionManager { // these are concurrent collections subscriber.clear(); - final Subscription[] subscriptions = getListenerSubs(listenerClass); + final Subscription[] subscriptions = this.subscriptionsPerListener.get(listenerClass); if (subscriptions != null) { Subscription subscription; @@ -177,16 +207,4 @@ class SubscriptionManager { } } } - - private - Subscription[] getListenerSubs(final Class listenerClass) { - - final StampedLock lock = this.lock; - final long stamp = lock.readLock(); - - final Subscription[] subscriptions = this.subscriptionsPerListener.get(listenerClass); - - lock.unlockRead(stamp); - return subscriptions; - } } diff --git a/src/dorkbox/util/messagebus/utils/SubscriptionUtils.java b/src/dorkbox/util/messagebus/utils/SubscriptionUtils.java index 9dd212e..8cf2bca 100644 --- a/src/dorkbox/util/messagebus/utils/SubscriptionUtils.java +++ b/src/dorkbox/util/messagebus/utils/SubscriptionUtils.java @@ -42,7 +42,7 @@ class SubscriptionUtils { // superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically. // it's a hit on SUB/UNSUB, but improves performance of handlers this.superClassSubscriptions = JavaVersionAdapter.concurrentMap(8, loadFactor, 1); - this.superClassSubscriptionsMulti = new HashMapTree, ArrayList>(4, loadFactor); + this.superClassSubscriptionsMulti = new HashMapTree, ArrayList>(); } public diff --git a/src/dorkbox/util/messagebus/utils/VarArgUtils.java b/src/dorkbox/util/messagebus/utils/VarArgUtils.java index 0254bfa..6960a7b 100644 --- a/src/dorkbox/util/messagebus/utils/VarArgUtils.java +++ b/src/dorkbox/util/messagebus/utils/VarArgUtils.java @@ -41,10 +41,10 @@ class VarArgUtils { this.superClassUtils = superClassUtils; this.varArgSubscriptionsSingle = JavaVersionAdapter.concurrentMap(16, loadFactor, 1); - this.varArgSubscriptionsMulti = new HashMapTree, ArrayList>(4, loadFactor); + this.varArgSubscriptionsMulti = new HashMapTree, ArrayList>(); this.varArgSuperSubscriptionsSingle = JavaVersionAdapter.concurrentMap(16, loadFactor, 1); - this.varArgSuperSubscriptionsMulti = new HashMapTree, ArrayList>(4, loadFactor); + this.varArgSuperSubscriptionsMulti = new HashMapTree, ArrayList>(); } diff --git a/test/dorkbox/util/messagebus/ObjectTreeTest.java b/test/dorkbox/util/messagebus/ObjectTreeTest.java index 5642d3a..c9d36e0 100644 --- a/test/dorkbox/util/messagebus/ObjectTreeTest.java +++ b/test/dorkbox/util/messagebus/ObjectTreeTest.java @@ -43,7 +43,7 @@ public class ObjectTreeTest extends AssertSupport { @Test public void testObjectTree() { - HashMapTree, String> tree = new HashMapTree, String>(8, 0.8F); + HashMapTree, String> tree = new HashMapTree, String>(); test(tree, "s", String.class); test(tree, "x", String.class); diff --git a/test/dorkbox/util/messagebus/PerfTest_Collections.java b/test/dorkbox/util/messagebus/PerfTest_Collections.java index 07c2424..3f7a601 100644 --- a/test/dorkbox/util/messagebus/PerfTest_Collections.java +++ b/test/dorkbox/util/messagebus/PerfTest_Collections.java @@ -89,7 +89,7 @@ class PerfTest_Collections { for (int i = 0; i < size; i++) { for (MessageHandler x : allHandlers) { - set.add(new Subscription(x, .85F, 1)); + set.add(new Subscription(x)); } } diff --git a/test/dorkbox/util/messagebus/SubscriptionManagerTest.java b/test/dorkbox/util/messagebus/SubscriptionManagerTest.java index 248b386..5ed422a 100644 --- a/test/dorkbox/util/messagebus/SubscriptionManagerTest.java +++ b/test/dorkbox/util/messagebus/SubscriptionManagerTest.java @@ -161,7 +161,7 @@ public class SubscriptionManagerTest extends AssertSupport { final ClassUtils classUtils = new ClassUtils(Subscriber.LOAD_FACTOR); final Subscriber subscriber = new Subscriber(errorHandler, classUtils); - SubscriptionManager subscriptionManager = new SubscriptionManager(1, subscriber, lock); + SubscriptionManager subscriptionManager = new SubscriptionManager(1, subscriber); ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), 1); SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners).listener(Overloading.ListenerBase.class).handles( @@ -185,7 +185,7 @@ public class SubscriptionManagerTest extends AssertSupport { final ClassUtils classUtils = new ClassUtils(Subscriber.LOAD_FACTOR); final Subscriber subscriber = new Subscriber(errorHandler, classUtils); - final SubscriptionManager subscriptionManager = new SubscriptionManager(1, subscriber, lock); + final SubscriptionManager subscriptionManager = new SubscriptionManager(1, subscriber); ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), 1); diff --git a/test/dorkbox/util/messagebus/queuePerf/PerfTest_LinkedBlockingQueue.java b/test/dorkbox/util/messagebus/queuePerf/PerfTest_LinkedBlockingQueue.java deleted file mode 100644 index 39b1ae3..0000000 --- a/test/dorkbox/util/messagebus/queuePerf/PerfTest_LinkedBlockingQueue.java +++ /dev/null @@ -1,28 +0,0 @@ -package dorkbox.util.messagebus.queuePerf; - -import java.util.concurrent.LinkedBlockingQueue; - -public class PerfTest_LinkedBlockingQueue { - public static final int REPETITIONS = 50 * 1000 * 100; - public static final Integer TEST_VALUE = Integer.valueOf(777); - - - public static void main(final String[] args) throws Exception { - System.out.println("reps:" + REPETITIONS); - - final int warmupRuns = 4; - final int runs = 3; - - for (int concurrency = 1; concurrency < 5; concurrency++) { - final LinkedBlockingQueue queue = new LinkedBlockingQueue(Integer.MAX_VALUE); - long average = PerfTest_LinkedBlockingQueue_Block.averageRun(warmupRuns, runs, queue, false, concurrency, REPETITIONS); - System.out.format("PerfTest_LinkedBlockingQueue_Block %,d (%dx%d)\n", average, concurrency, concurrency); - } - - for (int concurrency = 1; concurrency < 5; concurrency++) { - final LinkedBlockingQueue queue = new LinkedBlockingQueue(Integer.MAX_VALUE); - long average = PerfTest_LinkedBlockingQueue_NonBlock.averageRun(warmupRuns, runs, queue, false, concurrency, REPETITIONS); - System.out.format("PerfTest_MpmcTransferArrayQueue_NonBlock %,d (%dx%d)\n", average, concurrency, concurrency); - } - } -} diff --git a/test/dorkbox/util/messagebus/queuePerf/PerfTest_LinkedBlockingQueue_Block.java b/test/dorkbox/util/messagebus/queuePerf/PerfTest_LinkedBlockingQueue_Block.java deleted file mode 100644 index 45a4f2d..0000000 --- a/test/dorkbox/util/messagebus/queuePerf/PerfTest_LinkedBlockingQueue_Block.java +++ /dev/null @@ -1,161 +0,0 @@ -package dorkbox.util.messagebus.queuePerf; - -import java.util.concurrent.LinkedBlockingQueue; - -public -class PerfTest_LinkedBlockingQueue_Block { - public static final int REPETITIONS = 50 * 1000 * 100; - public static final Integer TEST_VALUE = Integer.valueOf(777); - - private static final int concurrency = 4; - - public static - void main(final String[] args) throws Exception { - System.out.println("reps:" + REPETITIONS + " Concurrency " + concurrency); - - final int warmupRuns = 4; - final int runs = 5; - - final LinkedBlockingQueue queue = new LinkedBlockingQueue(concurrency); - long average = averageRun(warmupRuns, runs, queue, true, concurrency, REPETITIONS); - - System.out.format("summary,QueuePerfTest,%s %,d\n", - queue.getClass() - .getSimpleName(), - average); - } - - public static - long averageRun(int warmUpRuns, int sumCount, LinkedBlockingQueue queue, boolean showStats, int concurrency, int repetitions) - throws Exception { - int runs = warmUpRuns + sumCount; - final long[] results = new long[runs]; - for (int i = 0; i < runs; i++) { - System.gc(); - results[i] = performanceRun(i, queue, showStats, concurrency, repetitions); - } - // only average last X results for summary - long sum = 0; - for (int i = warmUpRuns; i < runs; i++) { - sum += results[i]; - } - - return sum / sumCount; - } - - private static - long performanceRun(int runNumber, LinkedBlockingQueue queue, boolean showStats, int concurrency, int repetitions) - throws Exception { - - Producer[] producers = new Producer[concurrency]; - Consumer[] consumers = new Consumer[concurrency]; - Thread[] threads = new Thread[concurrency * 2]; - - for (int i = 0; i < concurrency; i++) { - producers[i] = new Producer(queue, repetitions); - consumers[i] = new Consumer(queue, repetitions); - } - - for (int j = 0, i = 0; i < concurrency; i++, j += 2) { - threads[j] = new Thread(producers[i], "Producer " + i); - threads[j + 1] = new Thread(consumers[i], "Consumer " + i); - } - - for (int i = 0; i < concurrency * 2; i += 2) { - threads[i].start(); - threads[i + 1].start(); - } - - for (int i = 0; i < concurrency * 2; i += 2) { - threads[i].join(); - threads[i + 1].join(); - } - - long start = Long.MAX_VALUE; - long end = -1; - - for (int i = 0; i < concurrency; i++) { - if (producers[i].start < start) { - start = producers[i].start; - } - - if (consumers[i].end > end) { - end = consumers[i].end; - } - } - - - long duration = end - start; - long ops = repetitions * 1_000_000_000L / duration; - String qName = queue.getClass() - .getSimpleName(); - - if (showStats) { - System.out.format("%d - ops/sec=%,d - %s\n", runNumber, ops, qName); - } - return ops; - } - - public static - class Producer implements Runnable { - private final LinkedBlockingQueue queue; - volatile long start; - private int repetitions; - - public - Producer(LinkedBlockingQueue queue, int repetitions) { - this.queue = queue; - this.repetitions = repetitions; - } - - @Override - public - void run() { - LinkedBlockingQueue producer = this.queue; - int i = this.repetitions; - this.start = System.nanoTime(); - - try { - do { - producer.put(TEST_VALUE); - } while (0 != --i); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } - - - public static - class Consumer implements Runnable { - private final LinkedBlockingQueue queue; - Object result; - volatile long end; - private int repetitions; - - public - Consumer(LinkedBlockingQueue queue, int repetitions) { - this.queue = queue; - this.repetitions = repetitions; - } - - @Override - public - void run() { - LinkedBlockingQueue consumer = this.queue; - Object result = null; - int i = this.repetitions; - - try { - do { - result = consumer.take(); - } while (0 != --i); - } catch (InterruptedException e) { - e.printStackTrace(); - } - - this.result = result; - this.end = System.nanoTime(); - } - } -} diff --git a/test/dorkbox/util/messagebus/queuePerf/PerfTest_LinkedBlockingQueue_NonBlock.java b/test/dorkbox/util/messagebus/queuePerf/PerfTest_LinkedBlockingQueue_NonBlock.java deleted file mode 100644 index 151976d..0000000 --- a/test/dorkbox/util/messagebus/queuePerf/PerfTest_LinkedBlockingQueue_NonBlock.java +++ /dev/null @@ -1,148 +0,0 @@ -package dorkbox.util.messagebus.queuePerf; - -import java.util.concurrent.LinkedBlockingQueue; - -public class PerfTest_LinkedBlockingQueue_NonBlock { - public static final int REPETITIONS = 50 * 1000 * 100; - public static final Integer TEST_VALUE = Integer.valueOf(777); - - public static final int QUEUE_CAPACITY = 1 << 17; - - private static final int concurrency = 4; - - public static void main(final String[] args) throws Exception { - System.out.println("reps:" + REPETITIONS + " Concurrency " + concurrency); - - final int warmupRuns = 5; - final int runs = 5; - - long average = 0; - - final LinkedBlockingQueue queue = new LinkedBlockingQueue(Integer.MAX_VALUE); - average = averageRun(warmupRuns, runs, queue, true, concurrency, REPETITIONS); - - System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), average); - } - - public static long averageRun(int warmUpRuns, int sumCount, LinkedBlockingQueue queue, boolean showStats, int concurrency, int repetitions) throws Exception { - int runs = warmUpRuns + sumCount; - final long[] results = new long[runs]; - for (int i = 0; i < runs; i++) { - System.gc(); - results[i] = performanceRun(i, queue, showStats, concurrency, repetitions); - } - // only average last X results for summary - long sum = 0; - for (int i = warmUpRuns; i < runs; i++) { - sum += results[i]; - } - - return sum/sumCount; - } - - private static long performanceRun(int runNumber, LinkedBlockingQueue queue, boolean showStats, int concurrency, int repetitions) throws Exception { - - Producer[] producers = new Producer[concurrency]; - Consumer[] consumers = new Consumer[concurrency]; - Thread[] threads = new Thread[concurrency*2]; - - for (int i=0;i end) { - end = consumers[i].end; - } - } - - - long duration = end - start; - long ops = repetitions * 1_000_000_000L / duration; - String qName = queue.getClass().getSimpleName(); - - if (showStats) { - System.out.format("%d - ops/sec=%,d - %s\n", runNumber, ops, qName); - } - return ops; - } - - private - PerfTest_LinkedBlockingQueue_NonBlock() { - } - - public static class Producer implements Runnable { - private final LinkedBlockingQueue queue; - volatile long start; - private int repetitions; - - public Producer(LinkedBlockingQueue queue, int repetitions) { - this.queue = queue; - this.repetitions = repetitions; - } - - @Override - public void run() { - LinkedBlockingQueue producer = this.queue; - int i = this.repetitions; - this.start = System.nanoTime(); - - do { - while (!producer.offer(TEST_VALUE)) { - Thread.yield(); - } - } while (0 != --i); - } - } - - public static class Consumer implements Runnable { - private final LinkedBlockingQueue queue; - Object result; - volatile long end; - private int repetitions; - - public Consumer(LinkedBlockingQueue queue, int repetitions) { - this.queue = queue; - this.repetitions = repetitions; - } - - @Override - public void run() { - LinkedBlockingQueue consumer = this.queue; - Object result = null; - int i = this.repetitions; - - do { - while (null == (result = consumer.poll())) { - Thread.yield(); - } - } while (0 != --i); - - this.result = result; - this.end = System.nanoTime(); - } - } -} diff --git a/test/dorkbox/util/messagebus/queuePerf/PerfTest_LinkedTransferQueue.java b/test/dorkbox/util/messagebus/queuePerf/PerfTest_LinkedTransferQueue.java deleted file mode 100644 index 1167f2a..0000000 --- a/test/dorkbox/util/messagebus/queuePerf/PerfTest_LinkedTransferQueue.java +++ /dev/null @@ -1,28 +0,0 @@ -package dorkbox.util.messagebus.queuePerf; - -import java.util.concurrent.LinkedTransferQueue; - -public class PerfTest_LinkedTransferQueue { - public static final int REPETITIONS = 50 * 1000 * 100; - public static final Integer TEST_VALUE = Integer.valueOf(777); - - - public static void main(final String[] args) throws Exception { - System.out.println("reps:" + REPETITIONS); - - final int warmupRuns = 4; - final int runs = 3; - - for (int concurrency = 1; concurrency < 5; concurrency++) { - final LinkedTransferQueue queue = new LinkedTransferQueue(); - long average = PerfTest_LinkedTransferQueue_Block.averageRun(warmupRuns, runs, queue, false, concurrency, REPETITIONS); - System.out.format("PerfTest_LinkedTransferQueue_Block %,d (%dx%d)\n", average, concurrency, concurrency); - } - - for (int concurrency = 1; concurrency < 5; concurrency++) { - final LinkedTransferQueue queue = new LinkedTransferQueue(); - long average = PerfTest_LinkedTransferQueue_NonBlock.averageRun(warmupRuns, runs, queue, false, concurrency, REPETITIONS); - System.out.format("PerfTest_LinkedTransferQueue_NonBlock %,d (%dx%d)\n", average, concurrency, concurrency); - } - } -} diff --git a/test/dorkbox/util/messagebus/queuePerf/PerfTest_LinkedTransferQueue_Block.java b/test/dorkbox/util/messagebus/queuePerf/PerfTest_LinkedTransferQueue_Block.java deleted file mode 100644 index c69ea68..0000000 --- a/test/dorkbox/util/messagebus/queuePerf/PerfTest_LinkedTransferQueue_Block.java +++ /dev/null @@ -1,145 +0,0 @@ -package dorkbox.util.messagebus.queuePerf; - -import java.util.concurrent.LinkedTransferQueue; - -public class PerfTest_LinkedTransferQueue_Block { - public static final int REPETITIONS = 50 * 1000 * 100; - public static final Integer TEST_VALUE = Integer.valueOf(777); - - private static final int concurrency = 4; - - public static void main(final String[] args) throws Exception { - System.out.println("reps:" + REPETITIONS + " Concurrency " + concurrency); - - final int warmupRuns = 4; - final int runs = 5; - - final LinkedTransferQueue queue = new LinkedTransferQueue(); - long average = averageRun(warmupRuns, runs, queue, true, concurrency, REPETITIONS); - - - System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), average); - } - - public static long averageRun(int warmUpRuns, int sumCount, LinkedTransferQueue queue, boolean showStats, int concurrency, int repetitions) throws Exception { - int runs = warmUpRuns + sumCount; - final long[] results = new long[runs]; - for (int i = 0; i < runs; i++) { - System.gc(); - results[i] = performanceRun(i, queue, showStats, concurrency, repetitions); - } - // only average last X results for summary - long sum = 0; - for (int i = warmUpRuns; i < runs; i++) { - sum += results[i]; - } - - return sum/sumCount; - } - - private static long performanceRun(int runNumber, LinkedTransferQueue queue, boolean showStats, int concurrency, int repetitions) throws Exception { - - Producer[] producers = new Producer[concurrency]; - Consumer[] consumers = new Consumer[concurrency]; - Thread[] threads = new Thread[concurrency*2]; - - for (int i=0;i end) { - end = consumers[i].end; - } - } - - - long duration = end - start; - long ops = repetitions * 1_000_000_000L / duration; - String qName = queue.getClass().getSimpleName(); - - if (showStats) { - System.out.format("%d - ops/sec=%,d - %s\n", runNumber, ops, qName); - } - return ops; - } - - public static class Producer implements Runnable { - private final LinkedTransferQueue queue; - volatile long start; - private int repetitions; - - public Producer(LinkedTransferQueue queue, int repetitions) { - this.queue = queue; - this.repetitions = repetitions; - } - - @Override - public void run() { - LinkedTransferQueue producer = this.queue; - int i = this.repetitions; - this.start = System.nanoTime(); - - try { - do { - producer.transfer(TEST_VALUE); - } while (0 != --i); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } - - public static class Consumer implements Runnable { - private final LinkedTransferQueue queue; - Object result; - volatile long end; - private int repetitions; - - public Consumer(LinkedTransferQueue queue, int repetitions) { - this.queue = queue; - this.repetitions = repetitions; - } - - @Override - public void run() { - LinkedTransferQueue consumer = this.queue; - Object result = null; - int i = this.repetitions; - - try { - do { - result = consumer.take(); - } while (0 != --i); - } catch (InterruptedException e) { - e.printStackTrace(); - } - - this.result = result; - this.end = System.nanoTime(); - } - } -} diff --git a/test/dorkbox/util/messagebus/queuePerf/PerfTest_LinkedTransferQueue_NonBlock.java b/test/dorkbox/util/messagebus/queuePerf/PerfTest_LinkedTransferQueue_NonBlock.java deleted file mode 100644 index 36dddbb..0000000 --- a/test/dorkbox/util/messagebus/queuePerf/PerfTest_LinkedTransferQueue_NonBlock.java +++ /dev/null @@ -1,144 +0,0 @@ -package dorkbox.util.messagebus.queuePerf; - -import java.util.concurrent.LinkedTransferQueue; - -public class PerfTest_LinkedTransferQueue_NonBlock { - public static final int REPETITIONS = 50 * 1000 * 100; - public static final Integer TEST_VALUE = Integer.valueOf(777); - - public static final int QUEUE_CAPACITY = 1 << 17; - - private static final int concurrency = 4; - - public static void main(final String[] args) throws Exception { - System.out.println("reps:" + REPETITIONS + " Concurrency " + concurrency); - - final int warmupRuns = 5; - final int runs = 5; - - long average = 0; - - final LinkedTransferQueue queue = new LinkedTransferQueue(); - average = averageRun(warmupRuns, runs, queue, true, concurrency, REPETITIONS); - - System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), average); - } - - public static long averageRun(int warmUpRuns, int sumCount, LinkedTransferQueue queue, boolean showStats, int concurrency, int repetitions) throws Exception { - int runs = warmUpRuns + sumCount; - final long[] results = new long[runs]; - for (int i = 0; i < runs; i++) { - System.gc(); - results[i] = performanceRun(i, queue, showStats, concurrency, repetitions); - } - // only average last X results for summary - long sum = 0; - for (int i = warmUpRuns; i < runs; i++) { - sum += results[i]; - } - - return sum/sumCount; - } - - private static long performanceRun(int runNumber, LinkedTransferQueue queue, boolean showStats, int concurrency, int repetitions) throws Exception { - - Producer[] producers = new Producer[concurrency]; - Consumer[] consumers = new Consumer[concurrency]; - Thread[] threads = new Thread[concurrency*2]; - - for (int i=0;i end) { - end = consumers[i].end; - } - } - - - long duration = end - start; - long ops = repetitions * 1_000_000_000L / duration; - String qName = queue.getClass().getSimpleName(); - - if (showStats) { - System.out.format("%d - ops/sec=%,d - %s\n", runNumber, ops, qName); - } - return ops; - } - - public static class Producer implements Runnable { - private final LinkedTransferQueue queue; - volatile long start; - private int repetitions; - - public Producer(LinkedTransferQueue queue, int repetitions) { - this.queue = queue; - this.repetitions = repetitions; - } - - @Override - public void run() { - LinkedTransferQueue producer = this.queue; - int i = this.repetitions; - this.start = System.nanoTime(); - - do { - while (!producer.offer(TEST_VALUE)) { - Thread.yield(); - } - } while (0 != --i); - } - } - - public static class Consumer implements Runnable { - private final LinkedTransferQueue queue; - Object result; - volatile long end; - private int repetitions; - - public Consumer(LinkedTransferQueue queue, int repetitions) { - this.queue = queue; - this.repetitions = repetitions; - } - - @Override - public void run() { - LinkedTransferQueue consumer = this.queue; - Object result = null; - int i = this.repetitions; - - do { - while (null == (result = consumer.poll())) { - Thread.yield(); - } - } while (0 != --i); - - this.result = result; - this.end = System.nanoTime(); - } - } -}