GREAT performance with Disruptor values tweaked. WIP getting all collections lock-free

This commit is contained in:
nathan 2016-01-15 02:26:23 +01:00
parent 0ed9b1b243
commit eae63b3f8a
21 changed files with 150 additions and 1539 deletions

View File

@ -24,7 +24,6 @@ import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.Sequencer;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.WorkProcessor;
import dorkbox.util.messagebus.common.adapter.StampedLock;
import dorkbox.util.messagebus.common.thread.NamedThreadFactory;
import dorkbox.util.messagebus.error.DefaultErrorHandler;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
@ -51,6 +50,7 @@ import java.util.concurrent.locks.LockSupport;
public
class MessageBus implements IMessageBus {
private final ErrorHandlingSupport errorHandler;
// private final LinkedBlockingQueue<Object> dispatchQueue;
// private final ArrayBlockingQueue<Object> dispatchQueue;
// private final LinkedTransferQueue<Object> dispatchQueue;
// private final Collection<Thread> threads;
@ -94,7 +94,7 @@ class MessageBus implements IMessageBus {
*/
public
MessageBus(final PublishMode publishMode) {
this(publishMode, Runtime.getRuntime().availableProcessors()/2);
this(publishMode, Runtime.getRuntime().availableProcessors());
}
/**
* @param publishMode Specifies which publishMode to operate the publication of messages.
@ -103,16 +103,14 @@ class MessageBus implements IMessageBus {
public
MessageBus(final PublishMode publishMode, int numberOfThreads) {
// round to the nearest power of 2
numberOfThreads = 1 << (32 - Integer.numberOfLeadingZeros(getMinNumberOfThreads(numberOfThreads)));
numberOfThreads = 1 << (32 - Integer.numberOfLeadingZeros(getMinNumberOfThreads(numberOfThreads) - 1));
this.errorHandler = new DefaultErrorHandler();
// this.dispatchQueue = new ArrayBlockingQueue<Object>(6);
// this.dispatchQueue = new LinkedBlockingQueue<Object>(1024);
// this.dispatchQueue = new ArrayBlockingQueue<Object>(1024);
// this.dispatchQueue = new LinkedTransferQueue<Object>();
classUtils = new ClassUtils(Subscriber.LOAD_FACTOR);
final StampedLock lock = new StampedLock();
final Subscriber subscriber;
/**
* Will subscribe and publish using all provided parameters in the method signature (for subscribe), and arguments (for publish)
@ -121,19 +119,19 @@ class MessageBus implements IMessageBus {
switch (publishMode) {
case Exact:
publisher = new PublisherExact(errorHandler, subscriber, lock);
publisher = new PublisherExact(errorHandler, subscriber);
break;
case ExactWithSuperTypes:
publisher = new PublisherExactWithSuperTypes(errorHandler, subscriber, lock);
publisher = new PublisherExactWithSuperTypes(errorHandler, subscriber);
break;
case ExactWithSuperTypesAndVarity:
default:
publisher = new PublisherExactWithSuperTypesAndVarity(errorHandler, subscriber, lock);
publisher = new PublisherExactWithSuperTypesAndVarity(errorHandler, subscriber);
}
this.subscriptionManager = new SubscriptionManager(numberOfThreads, subscriber, lock);
this.subscriptionManager = new SubscriptionManager(numberOfThreads, subscriber);
// Now we setup the disruptor and work handlers
@ -156,6 +154,7 @@ class MessageBus implements IMessageBus {
// final int BUFFER_SIZE = ringBufferSize * 64;
// final int BUFFER_SIZE = 1024 * 64;
// final int BUFFER_SIZE = 1024;
// final int BUFFER_SIZE = 16;
final int BUFFER_SIZE = 8;
@ -212,8 +211,9 @@ class MessageBus implements IMessageBus {
// @Override
// public
// void run() {
// ArrayBlockingQueue<?> IN_QUEUE = MessageBus.this.dispatchQueue;
//// LinkedTransferQueue<?> IN_QUEUE = MessageBus.this.dispatchQueue;
//// LinkedBlockingQueue<?> IN_QUEUE = MessageBus.this.dispatchQueue;
//// ArrayBlockingQueue<?> IN_QUEUE = MessageBus.this.dispatchQueue;
// LinkedTransferQueue<?> IN_QUEUE = MessageBus.this.dispatchQueue;
//
// MultiNode node = new MultiNode();
// while (!MessageBus.this.shuttingDown) {
@ -369,7 +369,8 @@ class MessageBus implements IMessageBus {
}
// try {
// this.dispatchQueue.put(message);
// this.dispatchQueue.transfer(message);
//// this.dispatchQueue.put(message);
// } catch (Exception e) {
// errorHandler.handlePublicationError(new PublicationError().setMessage(
// "Error while adding an asynchronous message").setCause(e).setPublishedObject(message));
@ -458,7 +459,6 @@ class MessageBus implements IMessageBus {
// for (Thread t : this.threads) {
// t.start();
// }
}
@Override
@ -483,6 +483,4 @@ class MessageBus implements IMessageBus {
this.subscriptionManager.shutdown();
this.classUtils.clear();
}
}

View File

@ -16,7 +16,6 @@
package dorkbox.util.messagebus.publication;
import dorkbox.util.messagebus.common.DeadMessage;
import dorkbox.util.messagebus.common.adapter.StampedLock;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.error.PublicationError;
import dorkbox.util.messagebus.subscription.Subscriber;
@ -27,13 +26,11 @@ public
class PublisherExact implements Publisher {
private final ErrorHandlingSupport errorHandler;
private final Subscriber subscriber;
private final StampedLock lock;
public
PublisherExact(final ErrorHandlingSupport errorHandler, final Subscriber subscriber, final StampedLock lock) {
PublisherExact(final ErrorHandlingSupport errorHandler, final Subscriber subscriber) {
this.errorHandler = errorHandler;
this.subscriber = subscriber;
this.lock = lock;
}
@Override
@ -42,10 +39,7 @@ class PublisherExact implements Publisher {
try {
final Class<?> messageClass = message1.getClass();
final StampedLock lock = this.lock;
long stamp = lock.readLock();
final Subscription[] subscriptions = subscriber.getExact(messageClass); // can return null
lock.unlockRead(stamp);
// Run subscriptions
if (subscriptions != null) {
@ -57,9 +51,7 @@ class PublisherExact implements Publisher {
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
stamp = lock.readLock();
final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null
lock.unlockRead(stamp);
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1);
@ -85,10 +77,10 @@ class PublisherExact implements Publisher {
final Class<?> messageClass1 = message1.getClass();
final Class<?> messageClass2 = message2.getClass();
final StampedLock lock = this.lock;
long stamp = lock.readLock();
// final StampedLock lock = this.lock;
// long stamp = lock.readLock();
final Subscription[] subscriptions = subscriber.getExact(messageClass1, messageClass2); // can return null
lock.unlockRead(stamp);
// lock.unlockRead(stamp);
// Run subscriptions
if (subscriptions != null) {
@ -100,9 +92,9 @@ class PublisherExact implements Publisher {
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
stamp = lock.readLock();
// stamp = lock.readLock();
final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null
lock.unlockRead(stamp);
// lock.unlockRead(stamp);
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2);
@ -129,10 +121,10 @@ class PublisherExact implements Publisher {
final Class<?> messageClass2 = message2.getClass();
final Class<?> messageClass3 = message3.getClass();
final StampedLock lock = this.lock;
long stamp = lock.readLock();
// final StampedLock lock = this.lock;
// long stamp = lock.readLock();
final Subscription[] subscriptions = subscriber.getExact(messageClass1, messageClass2, messageClass3); // can return null
lock.unlockRead(stamp);
// lock.unlockRead(stamp);
// Run subscriptions
if (subscriptions != null) {
@ -144,9 +136,9 @@ class PublisherExact implements Publisher {
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
stamp = lock.readLock();
// stamp = lock.readLock();
final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null
lock.unlockRead(stamp);
// lock.unlockRead(stamp);
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2, message3);

View File

@ -16,7 +16,6 @@
package dorkbox.util.messagebus.publication;
import dorkbox.util.messagebus.common.DeadMessage;
import dorkbox.util.messagebus.common.adapter.StampedLock;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.error.PublicationError;
import dorkbox.util.messagebus.subscription.Subscriber;
@ -27,13 +26,12 @@ public
class PublisherExactWithSuperTypes implements Publisher {
private final ErrorHandlingSupport errorHandler;
private final Subscriber subscriber;
private final StampedLock lock;
// private final StampedLock lock;
public
PublisherExactWithSuperTypes(final ErrorHandlingSupport errorHandler, final Subscriber subscriber, final StampedLock lock) {
PublisherExactWithSuperTypes(final ErrorHandlingSupport errorHandler, final Subscriber subscriber) {
this.errorHandler = errorHandler;
this.subscriber = subscriber;
this.lock = lock;
}
@Override
@ -42,10 +40,7 @@ class PublisherExactWithSuperTypes implements Publisher {
try {
final Class<?> messageClass = message1.getClass();
final StampedLock lock = this.lock;
long stamp = lock.readLock();
final Subscription[] subscriptions = subscriber.getExactAndSuper(messageClass); // can return null
lock.unlockRead(stamp);
// Run subscriptions
if (subscriptions != null) {
@ -57,9 +52,7 @@ class PublisherExactWithSuperTypes implements Publisher {
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
stamp = lock.readLock();
final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null
lock.unlockRead(stamp);
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1);
@ -85,10 +78,10 @@ class PublisherExactWithSuperTypes implements Publisher {
final Class<?> messageClass1 = message1.getClass();
final Class<?> messageClass2 = message2.getClass();
final StampedLock lock = this.lock;
long stamp = lock.readLock();
// final StampedLock lock = this.lock;
// long stamp = lock.readLock();
final Subscription[] subscriptions = subscriber.getExactAndSuper(messageClass1, messageClass2); // can return null
lock.unlockRead(stamp);
// lock.unlockRead(stamp);
// Run subscriptions
if (subscriptions != null) {
@ -100,9 +93,9 @@ class PublisherExactWithSuperTypes implements Publisher {
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
stamp = lock.readLock();
// stamp = lock.readLock();
final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null
lock.unlockRead(stamp);
// lock.unlockRead(stamp);
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2);
@ -130,11 +123,11 @@ class PublisherExactWithSuperTypes implements Publisher {
final Class<?> messageClass3 = message3.getClass();
final StampedLock lock = this.lock;
long stamp = lock.readLock();
// final StampedLock lock = this.lock;
// long stamp = lock.readLock();
final Subscription[] subscriptions = subscriber.getExactAndSuper(messageClass1, messageClass2,
messageClass3); // can return null
lock.unlockRead(stamp);
// lock.unlockRead(stamp);
// Run subscriptions
if (subscriptions != null) {
@ -146,9 +139,9 @@ class PublisherExactWithSuperTypes implements Publisher {
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
stamp = lock.readLock();
// stamp = lock.readLock();
final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null
lock.unlockRead(stamp);
// lock.unlockRead(stamp);
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2, message3);

View File

@ -16,7 +16,6 @@
package dorkbox.util.messagebus.publication;
import dorkbox.util.messagebus.common.DeadMessage;
import dorkbox.util.messagebus.common.adapter.StampedLock;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.error.PublicationError;
import dorkbox.util.messagebus.subscription.Subscriber;
@ -32,16 +31,14 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher {
private final ErrorHandlingSupport errorHandler;
private final Subscriber subscriber;
private final StampedLock lock;
private final AtomicBoolean varArgPossibility;
final VarArgUtils varArgUtils;
public
PublisherExactWithSuperTypesAndVarity(final ErrorHandlingSupport errorHandler, final Subscriber subscriber, final StampedLock lock) {
PublisherExactWithSuperTypesAndVarity(final ErrorHandlingSupport errorHandler, final Subscriber subscriber) {
this.errorHandler = errorHandler;
this.subscriber = subscriber;
this.lock = lock;
varArgPossibility = subscriber.getVarArgPossibility();
varArgUtils = subscriber.getVarArgUtils();
@ -54,10 +51,7 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher {
final Class<?> messageClass = message1.getClass();
final boolean isArray = messageClass.isArray();
final StampedLock lock = this.lock;
long stamp = lock.readLock();
final Subscription[] subscriptions = subscriber.getExactAndSuper(messageClass); // can return null
lock.unlockRead(stamp);
boolean hasSubs = false;
// Run subscriptions
@ -74,9 +68,7 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher {
// publish to var arg, only if not already an array (because that would be unnecessary)
if (varArgPossibility.get() && !isArray) {
stamp = lock.readLock();
final Subscription[] varArgSubs = varArgUtils.getVarArgSubscriptions(messageClass, subscriber); // CAN NOT RETURN NULL
lock.unlockRead(stamp);
Subscription sub;
int length = varArgSubs.length;
@ -96,10 +88,8 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher {
// now publish array based superClasses (but only if those ALSO accept vararg)
stamp = lock.readLock();
final Subscription[] varArgSuperSubs = varArgUtils.getVarArgSuperSubscriptions(messageClass,
subscriber); // CAN NOT RETURN NULL
lock.unlockRead(stamp);
length = varArgSuperSubs.length;
@ -121,9 +111,7 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher {
// only get here if there were no other subscriptions
// Dead Event must EXACTLY MATCH (no subclasses)
if (!hasSubs) {
stamp = lock.readLock();
final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class);
lock.unlockRead(stamp);
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1);
@ -149,10 +137,10 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher {
final Class<?> messageClass1 = message1.getClass();
final Class<?> messageClass2 = message2.getClass();
final StampedLock lock = this.lock;
long stamp = lock.readLock();
// final StampedLock lock = this.lock;
// long stamp = lock.readLock();
final Subscription[] subscriptions = subscriber.getExactAndSuper(messageClass1, messageClass2); // can return null
lock.unlockRead(stamp);
// lock.unlockRead(stamp);
boolean hasSubs = false;
// Run subscriptions
@ -171,9 +159,9 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher {
// vararg can ONLY work if all types are the same
if (messageClass1 == messageClass2) {
stamp = lock.readLock();
// stamp = lock.readLock();
final Subscription[] varArgSubs = varArgUtils.getVarArgSubscriptions(messageClass1, subscriber); // can NOT return null
lock.unlockRead(stamp);
// lock.unlockRead(stamp);
final int length = varArgSubs.length;
if (length > 0) {
@ -192,10 +180,10 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher {
}
// now publish array based superClasses (but only if those ALSO accept vararg)
stamp = lock.readLock();
// stamp = lock.readLock();
final Subscription[] varArgSuperSubs = varArgUtils.getVarArgSuperSubscriptions(messageClass1, messageClass2,
subscriber); // CAN NOT RETURN NULL
lock.unlockRead(stamp);
// lock.unlockRead(stamp);
final int length = varArgSuperSubs.length;
@ -221,9 +209,9 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher {
if (!hasSubs) {
// Dead Event must EXACTLY MATCH (no subclasses)
lock.unlockRead(stamp);
// lock.unlockRead(stamp);
final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null
lock.unlockRead(stamp);
// lock.unlockRead(stamp);
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2);
@ -250,10 +238,10 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher {
final Class<?> messageClass2 = message2.getClass();
final Class<?> messageClass3 = message3.getClass();
final StampedLock lock = this.lock;
long stamp = lock.readLock();
// final StampedLock lock = this.lock;
// long stamp = lock.readLock();
final Subscription[] subs = subscriber.getExactAndSuper(messageClass1, messageClass2, messageClass3); // can return null
lock.unlockRead(stamp);
// lock.unlockRead(stamp);
boolean hasSubs = false;
@ -274,9 +262,9 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher {
// vararg can ONLY work if all types are the same
if (messageClass1 == messageClass2 && messageClass1 == messageClass3) {
stamp = lock.readLock();
// stamp = lock.readLock();
final Subscription[] varArgSubs = varArgUtils.getVarArgSubscriptions(messageClass1, subscriber); // can NOT return null
lock.unlockRead(stamp);
// lock.unlockRead(stamp);
final int length = varArgSubs.length;
if (length > 0) {
@ -297,10 +285,10 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher {
// now publish array based superClasses (but only if those ALSO accept vararg)
stamp = lock.readLock();
// stamp = lock.readLock();
final Subscription[] varArgSuperSubs = varArgUtils.getVarArgSuperSubscriptions(messageClass1, messageClass2, messageClass3,
subscriber); // CAN NOT RETURN NULL
lock.unlockRead(stamp);
// lock.unlockRead(stamp);
final int length = varArgSuperSubs.length;
@ -327,9 +315,9 @@ class PublisherExactWithSuperTypesAndVarity implements Publisher {
if (!hasSubs) {
// Dead Event must EXACTLY MATCH (no subclasses)
stamp = lock.readLock();
// stamp = lock.readLock();
final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null
lock.unlockRead(stamp);
// lock.unlockRead(stamp);
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2, message3);

View File

@ -1,231 +0,0 @@
/*
* Copyright 2015 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.util.messagebus.publication;
import dorkbox.util.messagebus.common.DeadMessage;
import dorkbox.util.messagebus.common.adapter.StampedLock;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.error.PublicationError;
import dorkbox.util.messagebus.subscription.Subscriber;
import dorkbox.util.messagebus.subscription.Subscription;
import java.util.Arrays;
@SuppressWarnings("Duplicates")
public
class PublisherExactWithSuperTypes_FirstArg implements Publisher {
private final ErrorHandlingSupport errorHandler;
private final Subscriber subscriber;
private final StampedLock lock;
public
PublisherExactWithSuperTypes_FirstArg(final ErrorHandlingSupport errorHandler, final Subscriber subscriber, final StampedLock lock) {
this.errorHandler = errorHandler;
this.subscriber = subscriber;
this.lock = lock;
}
@Override
public
void publish(final Object message1) {
try {
final Class<?> messageClass = message1.getClass();
final StampedLock lock = this.lock;
long stamp = lock.readLock();
final Subscription[] subscriptions = subscriber.getExactAndSuper(messageClass); // can return null
lock.unlockRead(stamp);
// Run subscriptions
if (subscriptions != null) {
Class<?>[] handledMessages;
Subscription sub;
for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i];
handledMessages = sub.getHandler().getHandledMessages();
if (handledMessages.length == 1) {
sub.publish(message1);
}
}
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
stamp = lock.readLock();
final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null
lock.unlockRead(stamp);
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1);
Subscription sub;
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(deadMessage);
}
}
}
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.")
.setCause(e)
.setPublishedObject(message1));
}
}
@Override
public
void publish(final Object message1, final Object message2) {
try {
final Class<?> messageClass = message1.getClass();
final StampedLock lock = this.lock;
long stamp = lock.readLock();
final Subscription[] subscriptions = subscriber.getExactAndSuper(messageClass, null); // can return null
lock.unlockRead(stamp);
// Run subscriptions
if (subscriptions != null) {
Class<?>[] handledMessages;
Subscription sub;
for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i];
handledMessages = sub.getHandler().getHandledMessages();
if (handledMessages.length == 2) {
sub.publish(message1, message2);
}
}
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
stamp = lock.readLock();
final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null
lock.unlockRead(stamp);
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2);
Subscription sub;
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(deadMessage);
}
}
}
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.")
.setCause(e)
.setPublishedObject(message1, message2));
}
}
@Override
public
void publish(final Object message1, final Object message2, final Object message3) {
try {
final Class<?> messageClass = message1.getClass();
final StampedLock lock = this.lock;
long stamp = lock.readLock();
final Subscription[] subscriptions = subscriber.getExactAndSuper(messageClass); // can return null
lock.unlockRead(stamp);
// Run subscriptions
if (subscriptions != null) {
Class<?>[] handledMessages;
Subscription sub;
for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i];
handledMessages = sub.getHandler().getHandledMessages();
if (handledMessages.length == 3) {
sub.publish(message1, message2, message3);
}
}
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
stamp = lock.readLock();
final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null
lock.unlockRead(stamp);
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2, message3);
Subscription sub;
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(deadMessage);
}
}
}
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.")
.setCause(e)
.setPublishedObject(message1, message2, message3));
}
}
@Override
public
void publish(final Object[] messages) {
try {
final Object message1 = messages[0];
final Class<?> messageClass = message1.getClass();
final int length = messages.length;
final Object[] newMessages = Arrays.copyOfRange(messages, 1, length);
final StampedLock lock = this.lock;
long stamp = lock.readLock();
final Subscription[] subscriptions = subscriber.getExactAndSuper(messageClass); // can return null
lock.unlockRead(stamp);
// Run subscriptions
if (subscriptions != null) {
Class<?>[] handledMessages;
Subscription sub;
for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i];
handledMessages = sub.getHandler().getHandledMessages();
if (handledMessages.length == length) {
sub.publish(message1, newMessages);
}
}
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
stamp = lock.readLock();
final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null
lock.unlockRead(stamp);
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, newMessages);
Subscription sub;
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(deadMessage);
}
}
}
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.")
.setCause(e)
.setPublishedObject(messages));
}
}
}

View File

@ -1,227 +0,0 @@
/*
* Copyright 2015 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.util.messagebus.publication;
import dorkbox.util.messagebus.common.DeadMessage;
import dorkbox.util.messagebus.common.adapter.StampedLock;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.error.PublicationError;
import dorkbox.util.messagebus.subscription.Subscriber;
import dorkbox.util.messagebus.subscription.Subscription;
@SuppressWarnings("Duplicates")
public
class PublisherExact_FirstArg implements Publisher {
private final ErrorHandlingSupport errorHandler;
private final Subscriber subscriber;
private final StampedLock lock;
public
PublisherExact_FirstArg(final ErrorHandlingSupport errorHandler, final Subscriber subscriber, final StampedLock lock) {
this.errorHandler = errorHandler;
this.subscriber = subscriber;
this.lock = lock;
}
@Override
public
void publish(final Object message1) {
try {
final Class<?> messageClass = message1.getClass();
final StampedLock lock = this.lock;
long stamp = lock.readLock();
final Subscription[] subscriptions = subscriber.getExact(messageClass); // can return null
lock.unlockRead(stamp);
// Run subscriptions
if (subscriptions != null) {
Class<?>[] handledMessages;
Subscription sub;
for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i];
handledMessages = sub.getHandler().getHandledMessages();
if (handledMessages.length == 1) {
sub.publish(message1);
}
}
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
stamp = lock.readLock();
final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null
lock.unlockRead(stamp);
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1);
Subscription sub;
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(deadMessage);
}
}
}
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.")
.setCause(e)
.setPublishedObject(message1));
}
}
@Override
public
void publish(final Object message1, final Object message2) {
try {
final Class<?> messageClass = message1.getClass();
final StampedLock lock = this.lock;
long stamp = lock.readLock();
final Subscription[] subscriptions = subscriber.getExact(messageClass); // can return null
lock.unlockRead(stamp);
// Run subscriptions
if (subscriptions != null) {
Class<?>[] handledMessages;
Subscription sub;
for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i];
handledMessages = sub.getHandler().getHandledMessages();
if (handledMessages.length == 2) {
sub.publish(message1, message2);
}
}
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
lock.unlockRead(stamp);
final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null
lock.unlockRead(stamp);
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2);
Subscription sub;
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(deadMessage);
}
}
}
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.")
.setCause(e)
.setPublishedObject(message1, message2));
}
}
@Override
public
void publish(final Object message1, final Object message2, final Object message3) {
try {
final Class<?> messageClass = message1.getClass();
final StampedLock lock = this.lock;
long stamp = lock.readLock();
final Subscription[] subscriptions = subscriber.getExact(messageClass); // can return null
lock.unlockRead(stamp);
// Run subscriptions
if (subscriptions != null) {
Class<?>[] handledMessages;
Subscription sub;
for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i];
handledMessages = sub.getHandler().getHandledMessages();
if (handledMessages.length == 3) {
sub.publish(message1, message2, message3);
}
}
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
stamp = lock.readLock();
final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null
lock.unlockRead(stamp);
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2, message3);
Subscription sub;
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(deadMessage);
}
}
}
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.")
.setCause(e)
.setPublishedObject(message1, message2, message3));
}
}
@Override
public
void publish(final Object[] messages) {
try {
final Class<?> messageClass = messages[0].getClass();
final int length = messages.length;
final StampedLock lock = this.lock;
long stamp = lock.readLock();
final Subscription[] subscriptions = subscriber.getExact(messageClass); // can return null
lock.unlockRead(stamp);
// Run subscriptions
if (subscriptions != null) {
Class<?>[] handledMessages;
Subscription sub;
for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i];
handledMessages = sub.getHandler().getHandledMessages();
if (handledMessages.length == length) {
sub.publish(messages);
}
}
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
stamp = lock.readLock();
final Subscription[] deadSubscriptions = subscriber.getExact(DeadMessage.class); // can return null
lock.unlockRead(stamp);
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(messages);
Subscription sub;
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(deadMessage);
}
}
}
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.")
.setCause(e)
.setPublishedObject(messages));
}
}
}

View File

@ -1,255 +0,0 @@
/*
* Copyright 2015 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.util.messagebus.subscription;
import dorkbox.util.messagebus.common.MessageHandler;
import dorkbox.util.messagebus.common.adapter.JavaVersionAdapter;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.utils.ClassUtils;
import dorkbox.util.messagebus.utils.SubscriptionUtils;
import dorkbox.util.messagebus.utils.VarArgUtils;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Permits subscriptions that only use the first parameters as the signature. The publisher MUST provide the correct additional parameters,
* and they must be of the correct type, otherwise it will throw an error.
* </p>
* Parameter length checking during publication is performed, so that you can have multiple handlers with the same signature, but each
* with a different number of parameters
*/
public class FirstArgSubscriber implements Subscriber {
private final ErrorHandlingSupport errorHandler;
private final SubscriptionUtils subUtils;
// all subscriptions per message type. We perpetually KEEP the types, as this lowers the amount of locking required
// this is the primary list for dispatching a specific message
// write access is synchronized and happens only when a listener of a specific class is registered the first time
// the following are used ONLY for FIRST ARG subscription/publication. (subscriptionsPerMessageMulti isn't used in this case)
private final Map<Class<?>, ArrayList<Subscription>> subscriptionsPerMessage;
public FirstArgSubscriber(final ErrorHandlingSupport errorHandler, final ClassUtils classUtils) {
this.errorHandler = errorHandler;
// the following are used ONLY for FIRST ARG subscription/publication. (subscriptionsPerMessageMulti isn't used in this case)
this.subscriptionsPerMessage = JavaVersionAdapter.concurrentMap(32, LOAD_FACTOR, 1);
this.subUtils = new SubscriptionUtils(classUtils, Subscriber.LOAD_FACTOR);
}
// inside a write lock
// add this subscription to each of the handled types
// to activate this sub for publication
@Override
public void register(final Class<?> listenerClass, final int handlersSize, final Subscription[] subsPerListener) {
final Map<Class<?>, ArrayList<Subscription>> subscriptions = this.subscriptionsPerMessage;
Subscription subscription;
MessageHandler handler;
Class<?>[] messageHandlerTypes;
int size;
Class<?> type0;
ArrayList<Subscription> subs;
for (int i = 0; i < handlersSize; i++) {
subscription = subsPerListener[i];
// activate this subscription for publication
// now add this subscription to each of the handled types
// only register based on the FIRST parameter
handler = subscription.getHandler();
messageHandlerTypes = handler.getHandledMessages();
size = messageHandlerTypes.length;
if (size == 0) {
errorHandler.handleError("Error while trying to subscribe class: " + messageHandlerTypes.getClass(), listenerClass);
continue;
}
type0 = messageHandlerTypes[0];
subs = subscriptions.get(type0);
if (subs == null) {
subs = new ArrayList<Subscription>();
subscriptions.put(type0, subs);
}
subs.add(subscription);
}
}
@Override
public AtomicBoolean getVarArgPossibility() {
return null;
}
@Override
public VarArgUtils getVarArgUtils() {
return null;
}
@Override
public void shutdown() {
this.subscriptionsPerMessage.clear();
}
@Override
public void clear() {
}
// can return null
@Override
public ArrayList<Subscription> getExactAsArray(final Class<?> messageClass) {
return subscriptionsPerMessage.get(messageClass);
}
// can return null
@Override
public ArrayList<Subscription> getExactAsArray(final Class<?> messageClass1, final Class<?> messageClass2) {
return subscriptionsPerMessage.get(messageClass1);
}
// can return null
@Override
public ArrayList<Subscription> getExactAsArray(final Class<?> messageClass1, final Class<?> messageClass2,
final Class<?> messageClass3) {
return subscriptionsPerMessage.get(messageClass1);
}
// can return null
@Override
public
Subscription[] getExact(final Class<?> messageClass) {
final ArrayList<Subscription> collection = getExactAsArray(messageClass);
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
return subscriptions;
}
return null;
}
// can return null
@Override
public Subscription[] getExact(final Class<?> messageClass1, final Class<?> messageClass2) {
return null;
}
// can return null
@Override
public Subscription[] getExact(final Class<?> messageClass1, final Class<?> messageClass2, final Class<?> messageClass3) {
return null;
}
// can return null
@Override
public Subscription[] getExactAndSuper(final Class<?> messageClass) {
ArrayList<Subscription> collection = getExactAsArray(messageClass); // can return null
// now publish superClasses
final ArrayList<Subscription> superSubscriptions = this.subUtils.getSuperSubscriptions(messageClass, this); // NOT return null
if (collection != null) {
collection = new ArrayList<Subscription>(collection);
if (!superSubscriptions.isEmpty()) {
collection.addAll(superSubscriptions);
}
}
else if (!superSubscriptions.isEmpty()) {
collection = superSubscriptions;
}
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
return subscriptions;
}
else {
return null;
}
}
@Override
public Subscription[] getExactAndSuper(final Class<?> messageClass1, final Class<?> messageClass2) {
ArrayList<Subscription> collection = getExactAsArray(messageClass1); // can return null
// now publish superClasses
final ArrayList<Subscription> superSubscriptions = this.subUtils.getSuperSubscriptions(messageClass1, this); // NOT return null
if (collection != null) {
collection = new ArrayList<Subscription>(collection);
if (!superSubscriptions.isEmpty()) {
collection.addAll(superSubscriptions);
}
}
else if (!superSubscriptions.isEmpty()) {
collection = superSubscriptions;
}
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
return subscriptions;
}
else {
return null;
}
}
@Override
public Subscription[] getExactAndSuper(final Class<?> messageClass1, final Class<?> messageClass2, final Class<?> messageClass3) {
ArrayList<Subscription> collection = getExactAsArray(messageClass1); // can return null
// now publish superClasses
final ArrayList<Subscription> superSubscriptions = this.subUtils.getSuperSubscriptions(messageClass1, this); // NOT return null
if (collection != null) {
collection = new ArrayList<Subscription>(collection);
if (!superSubscriptions.isEmpty()) {
collection.addAll(superSubscriptions);
}
}
else if (!superSubscriptions.isEmpty()) {
collection = superSubscriptions;
}
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
return subscriptions;
}
else {
return null;
}
}
}

View File

@ -25,11 +25,13 @@ import dorkbox.util.messagebus.utils.VarArgUtils;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Permits subscriptions with a varying length of parameters as the signature, which must be match by the publisher for it to be accepted
*/
@SuppressWarnings("Duplicates")
public
class Subscriber {
public static final float LOAD_FACTOR = 0.8F;
@ -42,13 +44,13 @@ class Subscriber {
// all subscriptions per message type. We perpetually KEEP the types, as this lowers the amount of locking required
// this is the primary list for dispatching a specific message
// write access is synchronized and happens only when a listener of a specific class is registered the first time
private final Map<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageSingle;
final ConcurrentMap<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageSingle;
private final HashMapTree<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageMulti;
// shortcut publication if we know there is no possibility of varArg (ie: a method that has an array as arguments)
private final AtomicBoolean varArgPossibility = new AtomicBoolean(false);
final AtomicBoolean varArgPossibility = new AtomicBoolean(false);
private ThreadLocal<ArrayList<Subscription>> listCache = new ThreadLocal<ArrayList<Subscription>>() {
ThreadLocal<ArrayList<Subscription>> listCache = new ThreadLocal<ArrayList<Subscription>>() {
@Override
protected
ArrayList<Subscription> initialValue() {
@ -63,7 +65,7 @@ class Subscriber {
this.errorHandler = errorHandler;
this.subscriptionsPerMessageSingle = JavaVersionAdapter.concurrentMap(32, LOAD_FACTOR, 1);
this.subscriptionsPerMessageMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>(4, LOAD_FACTOR);
this.subscriptionsPerMessageMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>();
this.subUtils = new SubscriptionUtils(classUtils, LOAD_FACTOR);
@ -160,23 +162,6 @@ class Subscriber {
}
}
public
void register(final Class<?> listenerClass, final int handlersSize, final Subscription[] subsPerListener) {
final Map<Class<?>, ArrayList<Subscription>> subsPerMessageSingle = this.subscriptionsPerMessageSingle;
final HashMapTree<Class<?>, ArrayList<Subscription>> subsPerMessageMulti = this.subscriptionsPerMessageMulti;
final AtomicBoolean varArgPossibility = this.varArgPossibility;
Subscription subscription;
for (int i = 0; i < handlersSize; i++) {
subscription = subsPerListener[i];
// activate this subscription for publication
// now add this subscription to each of the handled types
registerMulti(subscription, listenerClass, subsPerMessageSingle, subsPerMessageMulti, varArgPossibility);
}
}
public
void shutdown() {
@ -207,6 +192,7 @@ class Subscriber {
final ArrayList<Subscription> collection = getExactAsArray(messageClass);
if (collection != null) {
// convert to Array because the subscriptions can change and we want safe iteration over the list
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);

View File

@ -71,15 +71,17 @@ class Subscription {
// the handler's metadata -> for each handler in a listener, a unique subscription context is created
private final MessageHandler handlerMetadata;
private final MessageHandler handler;
private final IHandlerInvocation invocation;
private final Collection<Object> listeners;
public
Subscription(final MessageHandler handler, final float loadFactor, final int stripeSize) {
this.handlerMetadata = handler;
// this.listeners = new StrongConcurrentSetV8<Object>(16, loadFactor, stripeSize);
Subscription(final MessageHandler handler) {
this.handler = handler;
// this.listeners = Collections.newSetFromMap(new ConcurrentHashMap<Object, Boolean>(8)); // really bad performance
// this.listeners = new StrongConcurrentSetV8<Object>(16, 0.7F, 8);
///this is by far, the fastest
this.listeners = new ConcurrentSkipListSet<>(new Comparator() {
@ -87,6 +89,7 @@ class Subscription {
public
int compare(final Object o1, final Object o2) {
return Integer.compare(o1.hashCode(), o2.hashCode());
// return 0;
}
});
// this.listeners = new StrongConcurrentSet<Object>(16, 0.85F);
@ -104,7 +107,7 @@ class Subscription {
public
MessageHandler getHandler() {
return handlerMetadata;
return handler;
}
public
@ -133,8 +136,8 @@ class Subscription {
public
void publish(final Object message) throws Throwable {
final MethodAccess handler = this.handlerMetadata.getHandler();
final int handleIndex = this.handlerMetadata.getMethodIndex();
final MethodAccess handler = this.handler.getHandler();
final int handleIndex = this.handler.getMethodIndex();
final IHandlerInvocation invocation = this.invocation;
Iterator<Object> iterator;
@ -149,8 +152,8 @@ class Subscription {
public
void publish(final Object message1, final Object message2) throws Throwable {
final MethodAccess handler = this.handlerMetadata.getHandler();
final int handleIndex = this.handlerMetadata.getMethodIndex();
final MethodAccess handler = this.handler.getHandler();
final int handleIndex = this.handler.getMethodIndex();
final IHandlerInvocation invocation = this.invocation;
Iterator<Object> iterator;
@ -165,8 +168,8 @@ class Subscription {
public
void publish(final Object message1, final Object message2, final Object message3) throws Throwable {
final MethodAccess handler = this.handlerMetadata.getHandler();
final int handleIndex = this.handlerMetadata.getMethodIndex();
final MethodAccess handler = this.handler.getHandler();
final int handleIndex = this.handler.getMethodIndex();
final IHandlerInvocation invocation = this.invocation;
Iterator<Object> iterator;
@ -181,8 +184,8 @@ class Subscription {
public
void publishToSubscription(final Object... messages) throws Throwable {
final MethodAccess handler = this.handlerMetadata.getHandler();
final int handleIndex = this.handlerMetadata.getMethodIndex();
final MethodAccess handler = this.handler.getHandler();
final int handleIndex = this.handler.getMethodIndex();
final IHandlerInvocation invocation = this.invocation;
Iterator<Object> iterator;

View File

@ -17,9 +17,11 @@ package dorkbox.util.messagebus.subscription;
import dorkbox.util.messagebus.common.MessageHandler;
import dorkbox.util.messagebus.common.adapter.JavaVersionAdapter;
import dorkbox.util.messagebus.common.adapter.StampedLock;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* The subscription managers responsibility is to consistently handle and synchronize the message listener subscription process.
@ -39,19 +41,15 @@ class SubscriptionManager {
// this map provides fast access for subscribing and unsubscribing
// write access is synchronized and happens very infrequently
// once a collection of subscriptions is stored it does not change
private final Map<Class<?>, Subscription[]> subscriptionsPerListener;
private final ConcurrentMap<Class<?>, Subscription[]> subscriptionsPerListener;
private final StampedLock lock;
private final int numberOfThreads;
private final Subscriber subscriber;
public
SubscriptionManager(final int numberOfThreads, final Subscriber subscriber, final StampedLock lock) {
this.numberOfThreads = numberOfThreads;
SubscriptionManager(final int numberOfThreads, final Subscriber subscriber) {
this.subscriber = subscriber;
this.lock = lock;
// modified ONLY during SUB/UNSUB
@ -85,11 +83,13 @@ class SubscriptionManager {
// these are concurrent collections
subscriber.clear();
Subscription[] subscriptions = getListenerSubs(listenerClass);
// this is an array, because subscriptions for a specific listener CANNOT change, either they exist or do not exist.
// ONCE subscriptions are in THIS map, they are considered AVAILABLE.
Subscription[] subscriptions = this.subscriptionsPerListener.get(listenerClass);
// the subscriptions from the map were null, so create them
if (subscriptions == null) {
// it is important to note that this section CAN be repeated, however the write lock is gained before
// it is important to note that this section CAN be repeated.
// anything 'permanent' is saved. This is so the time spent inside the writelock is minimized.
final MessageHandler[] messageHandlers = MessageHandler.get(listenerClass);
@ -101,50 +101,80 @@ class SubscriptionManager {
return;
}
final Subscription[] subsPerListener = new Subscription[handlersSize];
// create the subscription
MessageHandler messageHandler;
final AtomicBoolean varArgPossibility = subscriber.varArgPossibility;
Subscription subscription;
MessageHandler messageHandler;
Class<?>[] messageHandlerTypes;
Class<?> handlerType;
// create the subscriptions
final ConcurrentMap<Class<?>, ArrayList<Subscription>> subsPerMessageSingle = subscriber.subscriptionsPerMessageSingle;
subscriptions = new Subscription[handlersSize];
for (int i = 0; i < handlersSize; i++) {
// THE HANDLER IS THE SAME FOR ALL SUBSCRIPTIONS OF THE SAME TYPE!
messageHandler = messageHandlers[i];
// create the subscription
subscription = new Subscription(messageHandler, Subscriber.LOAD_FACTOR, numberOfThreads);
subscription.subscribe(listener);
// is this handler able to accept var args?
if (messageHandler.getVarArgClass() != null) {
varArgPossibility.lazySet(true);
}
subsPerListener[i] = subscription; // activates this sub for sub/unsub
// now create a list of subscriptions for this specific handlerType (but don't add anything yet).
// we only store things based on the FIRST type (for lookup) then parse the rest of the types during publication
messageHandlerTypes = messageHandler.getHandledMessages();
handlerType = messageHandlerTypes[0];
// using ThreadLocal cache's is SIGNIFICANTLY faster for subscribing to new types
final ArrayList<Subscription> cachedSubs = subscriber.listCache.get();
ArrayList<Subscription> subs = subsPerMessageSingle.putIfAbsent(handlerType, cachedSubs);
if (subs == null) {
subscriber.listCache.set(new ArrayList<Subscription>(8));
}
// create the subscription. This can be thrown away if the subscription succeeds in another thread
subscription = new Subscription(messageHandler);
subscriptions[i] = subscription;
// now add this subscription to each of the handled types
}
final Map<Class<?>, Subscription[]> subsPerListenerMap = this.subscriptionsPerListener;
// now subsPerMessageSingle has a unique list of subscriptions for a specific handlerType, and MAY already have subscriptions
// now write lock for the least expensive part. This is a deferred "double checked lock", but is necessary because
// of the huge number of reads compared to writes.
// putIfAbsent
final Subscription[] previousSubs = subscriptionsPerListener.putIfAbsent(listenerClass, subscriptions); // activates this sub for sub/unsub
if (previousSubs != null) {
// another thread beat us to creating subs (for this exact listenerClass). Since another thread won, we have to make sure
// all of the subscriptions are correct for a specific handler type, so we have to RECONSTRUT the correct list again.
// This is to make sure that "invalid" subscriptions don't exist in subsPerMessageSingle.
final StampedLock lock = this.lock;
final long stamp = lock.writeLock();
// since nothing is yet "subscribed" we can assign the correct values for everything now
subscriptions = previousSubs;
} else {
// we can now safely add for publication AND subscribe since the data structures are consistent
for (int i = 0; i < handlersSize; i++) {
subscription = subscriptions[i];
subscription.subscribe(listener); // register this callback listener to this subscription
subscriptions = subsPerListenerMap.get(listenerClass);
// THE HANDLER IS THE SAME FOR ALL SUBSCRIPTIONS OF THE SAME TYPE!
messageHandler = messageHandlers[i];
// it was still null, so we actually have to create the rest of the subs
if (subscriptions == null) {
subscriber.register(listenerClass, handlersSize, subsPerListener); // this adds to subscriptionsPerMessage
// register for publication
messageHandlerTypes = messageHandler.getHandledMessages();
handlerType = messageHandlerTypes[0];
subsPerListenerMap.put(listenerClass, subsPerListener);
lock.unlockWrite(stamp);
// makes this subscription visible for publication
subsPerMessageSingle.get(handlerType).add(subscription);
}
return;
}
else {
// continue to subscription
lock.unlockWrite(stamp);
}
}
// subscriptions already exist and must only be updated
// only publish here if our single-check was OK, or our double-check was OK
Subscription subscription;
for (int i = 0; i < subscriptions.length; i++) {
subscription = subscriptions[i];
@ -167,7 +197,7 @@ class SubscriptionManager {
// these are concurrent collections
subscriber.clear();
final Subscription[] subscriptions = getListenerSubs(listenerClass);
final Subscription[] subscriptions = this.subscriptionsPerListener.get(listenerClass);
if (subscriptions != null) {
Subscription subscription;
@ -177,16 +207,4 @@ class SubscriptionManager {
}
}
}
private
Subscription[] getListenerSubs(final Class<?> listenerClass) {
final StampedLock lock = this.lock;
final long stamp = lock.readLock();
final Subscription[] subscriptions = this.subscriptionsPerListener.get(listenerClass);
lock.unlockRead(stamp);
return subscriptions;
}
}

View File

@ -42,7 +42,7 @@ class SubscriptionUtils {
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but improves performance of handlers
this.superClassSubscriptions = JavaVersionAdapter.concurrentMap(8, loadFactor, 1);
this.superClassSubscriptionsMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>(4, loadFactor);
this.superClassSubscriptionsMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>();
}
public

View File

@ -41,10 +41,10 @@ class VarArgUtils {
this.superClassUtils = superClassUtils;
this.varArgSubscriptionsSingle = JavaVersionAdapter.concurrentMap(16, loadFactor, 1);
this.varArgSubscriptionsMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>(4, loadFactor);
this.varArgSubscriptionsMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>();
this.varArgSuperSubscriptionsSingle = JavaVersionAdapter.concurrentMap(16, loadFactor, 1);
this.varArgSuperSubscriptionsMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>(4, loadFactor);
this.varArgSuperSubscriptionsMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>();
}

View File

@ -43,7 +43,7 @@ public class ObjectTreeTest extends AssertSupport {
@Test
public void testObjectTree() {
HashMapTree<Class<?>, String> tree = new HashMapTree<Class<?>, String>(8, 0.8F);
HashMapTree<Class<?>, String> tree = new HashMapTree<Class<?>, String>();
test(tree, "s", String.class);
test(tree, "x", String.class);

View File

@ -89,7 +89,7 @@ class PerfTest_Collections {
for (int i = 0; i < size; i++) {
for (MessageHandler x : allHandlers) {
set.add(new Subscription(x, .85F, 1));
set.add(new Subscription(x));
}
}

View File

@ -161,7 +161,7 @@ public class SubscriptionManagerTest extends AssertSupport {
final ClassUtils classUtils = new ClassUtils(Subscriber.LOAD_FACTOR);
final Subscriber subscriber = new Subscriber(errorHandler, classUtils);
SubscriptionManager subscriptionManager = new SubscriptionManager(1, subscriber, lock);
SubscriptionManager subscriptionManager = new SubscriptionManager(1, subscriber);
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), 1);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners).listener(Overloading.ListenerBase.class).handles(
@ -185,7 +185,7 @@ public class SubscriptionManagerTest extends AssertSupport {
final ClassUtils classUtils = new ClassUtils(Subscriber.LOAD_FACTOR);
final Subscriber subscriber = new Subscriber(errorHandler, classUtils);
final SubscriptionManager subscriptionManager = new SubscriptionManager(1, subscriber, lock);
final SubscriptionManager subscriptionManager = new SubscriptionManager(1, subscriber);
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), 1);

View File

@ -1,28 +0,0 @@
package dorkbox.util.messagebus.queuePerf;
import java.util.concurrent.LinkedBlockingQueue;
public class PerfTest_LinkedBlockingQueue {
public static final int REPETITIONS = 50 * 1000 * 100;
public static final Integer TEST_VALUE = Integer.valueOf(777);
public static void main(final String[] args) throws Exception {
System.out.println("reps:" + REPETITIONS);
final int warmupRuns = 4;
final int runs = 3;
for (int concurrency = 1; concurrency < 5; concurrency++) {
final LinkedBlockingQueue queue = new LinkedBlockingQueue(Integer.MAX_VALUE);
long average = PerfTest_LinkedBlockingQueue_Block.averageRun(warmupRuns, runs, queue, false, concurrency, REPETITIONS);
System.out.format("PerfTest_LinkedBlockingQueue_Block %,d (%dx%d)\n", average, concurrency, concurrency);
}
for (int concurrency = 1; concurrency < 5; concurrency++) {
final LinkedBlockingQueue queue = new LinkedBlockingQueue(Integer.MAX_VALUE);
long average = PerfTest_LinkedBlockingQueue_NonBlock.averageRun(warmupRuns, runs, queue, false, concurrency, REPETITIONS);
System.out.format("PerfTest_MpmcTransferArrayQueue_NonBlock %,d (%dx%d)\n", average, concurrency, concurrency);
}
}
}

View File

@ -1,161 +0,0 @@
package dorkbox.util.messagebus.queuePerf;
import java.util.concurrent.LinkedBlockingQueue;
public
class PerfTest_LinkedBlockingQueue_Block {
public static final int REPETITIONS = 50 * 1000 * 100;
public static final Integer TEST_VALUE = Integer.valueOf(777);
private static final int concurrency = 4;
public static
void main(final String[] args) throws Exception {
System.out.println("reps:" + REPETITIONS + " Concurrency " + concurrency);
final int warmupRuns = 4;
final int runs = 5;
final LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>(concurrency);
long average = averageRun(warmupRuns, runs, queue, true, concurrency, REPETITIONS);
System.out.format("summary,QueuePerfTest,%s %,d\n",
queue.getClass()
.getSimpleName(),
average);
}
public static
long averageRun(int warmUpRuns, int sumCount, LinkedBlockingQueue<Integer> queue, boolean showStats, int concurrency, int repetitions)
throws Exception {
int runs = warmUpRuns + sumCount;
final long[] results = new long[runs];
for (int i = 0; i < runs; i++) {
System.gc();
results[i] = performanceRun(i, queue, showStats, concurrency, repetitions);
}
// only average last X results for summary
long sum = 0;
for (int i = warmUpRuns; i < runs; i++) {
sum += results[i];
}
return sum / sumCount;
}
private static
long performanceRun(int runNumber, LinkedBlockingQueue<Integer> queue, boolean showStats, int concurrency, int repetitions)
throws Exception {
Producer[] producers = new Producer[concurrency];
Consumer[] consumers = new Consumer[concurrency];
Thread[] threads = new Thread[concurrency * 2];
for (int i = 0; i < concurrency; i++) {
producers[i] = new Producer(queue, repetitions);
consumers[i] = new Consumer(queue, repetitions);
}
for (int j = 0, i = 0; i < concurrency; i++, j += 2) {
threads[j] = new Thread(producers[i], "Producer " + i);
threads[j + 1] = new Thread(consumers[i], "Consumer " + i);
}
for (int i = 0; i < concurrency * 2; i += 2) {
threads[i].start();
threads[i + 1].start();
}
for (int i = 0; i < concurrency * 2; i += 2) {
threads[i].join();
threads[i + 1].join();
}
long start = Long.MAX_VALUE;
long end = -1;
for (int i = 0; i < concurrency; i++) {
if (producers[i].start < start) {
start = producers[i].start;
}
if (consumers[i].end > end) {
end = consumers[i].end;
}
}
long duration = end - start;
long ops = repetitions * 1_000_000_000L / duration;
String qName = queue.getClass()
.getSimpleName();
if (showStats) {
System.out.format("%d - ops/sec=%,d - %s\n", runNumber, ops, qName);
}
return ops;
}
public static
class Producer implements Runnable {
private final LinkedBlockingQueue<Integer> queue;
volatile long start;
private int repetitions;
public
Producer(LinkedBlockingQueue<Integer> queue, int repetitions) {
this.queue = queue;
this.repetitions = repetitions;
}
@Override
public
void run() {
LinkedBlockingQueue<Integer> producer = this.queue;
int i = this.repetitions;
this.start = System.nanoTime();
try {
do {
producer.put(TEST_VALUE);
} while (0 != --i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static
class Consumer implements Runnable {
private final LinkedBlockingQueue<Integer> queue;
Object result;
volatile long end;
private int repetitions;
public
Consumer(LinkedBlockingQueue<Integer> queue, int repetitions) {
this.queue = queue;
this.repetitions = repetitions;
}
@Override
public
void run() {
LinkedBlockingQueue<Integer> consumer = this.queue;
Object result = null;
int i = this.repetitions;
try {
do {
result = consumer.take();
} while (0 != --i);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.result = result;
this.end = System.nanoTime();
}
}
}

View File

@ -1,148 +0,0 @@
package dorkbox.util.messagebus.queuePerf;
import java.util.concurrent.LinkedBlockingQueue;
public class PerfTest_LinkedBlockingQueue_NonBlock {
public static final int REPETITIONS = 50 * 1000 * 100;
public static final Integer TEST_VALUE = Integer.valueOf(777);
public static final int QUEUE_CAPACITY = 1 << 17;
private static final int concurrency = 4;
public static void main(final String[] args) throws Exception {
System.out.println("reps:" + REPETITIONS + " Concurrency " + concurrency);
final int warmupRuns = 5;
final int runs = 5;
long average = 0;
final LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>(Integer.MAX_VALUE);
average = averageRun(warmupRuns, runs, queue, true, concurrency, REPETITIONS);
System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), average);
}
public static long averageRun(int warmUpRuns, int sumCount, LinkedBlockingQueue<Integer> queue, boolean showStats, int concurrency, int repetitions) throws Exception {
int runs = warmUpRuns + sumCount;
final long[] results = new long[runs];
for (int i = 0; i < runs; i++) {
System.gc();
results[i] = performanceRun(i, queue, showStats, concurrency, repetitions);
}
// only average last X results for summary
long sum = 0;
for (int i = warmUpRuns; i < runs; i++) {
sum += results[i];
}
return sum/sumCount;
}
private static long performanceRun(int runNumber, LinkedBlockingQueue<Integer> queue, boolean showStats, int concurrency, int repetitions) throws Exception {
Producer[] producers = new Producer[concurrency];
Consumer[] consumers = new Consumer[concurrency];
Thread[] threads = new Thread[concurrency*2];
for (int i=0;i<concurrency;i++) {
producers[i] = new Producer(queue, repetitions);
consumers[i] = new Consumer(queue, repetitions);
}
for (int j=0,i=0;i<concurrency;i++,j+=2) {
threads[j] = new Thread(producers[i], "Producer " + i);
threads[j+1] = new Thread(consumers[i], "Consumer " + i);
}
for (int i=0;i<concurrency*2;i+=2) {
threads[i].start();
threads[i+1].start();
}
for (int i=0;i<concurrency*2;i+=2) {
threads[i].join();
threads[i+1].join();
}
long start = Long.MAX_VALUE;
long end = -1;
for (int i=0;i<concurrency;i++) {
if (producers[i].start < start) {
start = producers[i].start;
}
if (consumers[i].end > end) {
end = consumers[i].end;
}
}
long duration = end - start;
long ops = repetitions * 1_000_000_000L / duration;
String qName = queue.getClass().getSimpleName();
if (showStats) {
System.out.format("%d - ops/sec=%,d - %s\n", runNumber, ops, qName);
}
return ops;
}
private
PerfTest_LinkedBlockingQueue_NonBlock() {
}
public static class Producer implements Runnable {
private final LinkedBlockingQueue<Integer> queue;
volatile long start;
private int repetitions;
public Producer(LinkedBlockingQueue<Integer> queue, int repetitions) {
this.queue = queue;
this.repetitions = repetitions;
}
@Override
public void run() {
LinkedBlockingQueue<Integer> producer = this.queue;
int i = this.repetitions;
this.start = System.nanoTime();
do {
while (!producer.offer(TEST_VALUE)) {
Thread.yield();
}
} while (0 != --i);
}
}
public static class Consumer implements Runnable {
private final LinkedBlockingQueue<Integer> queue;
Object result;
volatile long end;
private int repetitions;
public Consumer(LinkedBlockingQueue<Integer> queue, int repetitions) {
this.queue = queue;
this.repetitions = repetitions;
}
@Override
public void run() {
LinkedBlockingQueue<Integer> consumer = this.queue;
Object result = null;
int i = this.repetitions;
do {
while (null == (result = consumer.poll())) {
Thread.yield();
}
} while (0 != --i);
this.result = result;
this.end = System.nanoTime();
}
}
}

View File

@ -1,28 +0,0 @@
package dorkbox.util.messagebus.queuePerf;
import java.util.concurrent.LinkedTransferQueue;
public class PerfTest_LinkedTransferQueue {
public static final int REPETITIONS = 50 * 1000 * 100;
public static final Integer TEST_VALUE = Integer.valueOf(777);
public static void main(final String[] args) throws Exception {
System.out.println("reps:" + REPETITIONS);
final int warmupRuns = 4;
final int runs = 3;
for (int concurrency = 1; concurrency < 5; concurrency++) {
final LinkedTransferQueue queue = new LinkedTransferQueue();
long average = PerfTest_LinkedTransferQueue_Block.averageRun(warmupRuns, runs, queue, false, concurrency, REPETITIONS);
System.out.format("PerfTest_LinkedTransferQueue_Block %,d (%dx%d)\n", average, concurrency, concurrency);
}
for (int concurrency = 1; concurrency < 5; concurrency++) {
final LinkedTransferQueue queue = new LinkedTransferQueue();
long average = PerfTest_LinkedTransferQueue_NonBlock.averageRun(warmupRuns, runs, queue, false, concurrency, REPETITIONS);
System.out.format("PerfTest_LinkedTransferQueue_NonBlock %,d (%dx%d)\n", average, concurrency, concurrency);
}
}
}

View File

@ -1,145 +0,0 @@
package dorkbox.util.messagebus.queuePerf;
import java.util.concurrent.LinkedTransferQueue;
public class PerfTest_LinkedTransferQueue_Block {
public static final int REPETITIONS = 50 * 1000 * 100;
public static final Integer TEST_VALUE = Integer.valueOf(777);
private static final int concurrency = 4;
public static void main(final String[] args) throws Exception {
System.out.println("reps:" + REPETITIONS + " Concurrency " + concurrency);
final int warmupRuns = 4;
final int runs = 5;
final LinkedTransferQueue queue = new LinkedTransferQueue();
long average = averageRun(warmupRuns, runs, queue, true, concurrency, REPETITIONS);
System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), average);
}
public static long averageRun(int warmUpRuns, int sumCount, LinkedTransferQueue queue, boolean showStats, int concurrency, int repetitions) throws Exception {
int runs = warmUpRuns + sumCount;
final long[] results = new long[runs];
for (int i = 0; i < runs; i++) {
System.gc();
results[i] = performanceRun(i, queue, showStats, concurrency, repetitions);
}
// only average last X results for summary
long sum = 0;
for (int i = warmUpRuns; i < runs; i++) {
sum += results[i];
}
return sum/sumCount;
}
private static long performanceRun(int runNumber, LinkedTransferQueue queue, boolean showStats, int concurrency, int repetitions) throws Exception {
Producer[] producers = new Producer[concurrency];
Consumer[] consumers = new Consumer[concurrency];
Thread[] threads = new Thread[concurrency*2];
for (int i=0;i<concurrency;i++) {
producers[i] = new Producer(queue, repetitions);
consumers[i] = new Consumer(queue, repetitions);
}
for (int j=0,i=0;i<concurrency;i++,j+=2) {
threads[j] = new Thread(producers[i], "Producer " + i);
threads[j+1] = new Thread(consumers[i], "Consumer " + i);
}
for (int i=0;i<concurrency*2;i+=2) {
threads[i].start();
threads[i+1].start();
}
for (int i=0;i<concurrency*2;i+=2) {
threads[i].join();
threads[i+1].join();
}
long start = Long.MAX_VALUE;
long end = -1;
for (int i=0;i<concurrency;i++) {
if (producers[i].start < start) {
start = producers[i].start;
}
if (consumers[i].end > end) {
end = consumers[i].end;
}
}
long duration = end - start;
long ops = repetitions * 1_000_000_000L / duration;
String qName = queue.getClass().getSimpleName();
if (showStats) {
System.out.format("%d - ops/sec=%,d - %s\n", runNumber, ops, qName);
}
return ops;
}
public static class Producer implements Runnable {
private final LinkedTransferQueue queue;
volatile long start;
private int repetitions;
public Producer(LinkedTransferQueue queue, int repetitions) {
this.queue = queue;
this.repetitions = repetitions;
}
@Override
public void run() {
LinkedTransferQueue producer = this.queue;
int i = this.repetitions;
this.start = System.nanoTime();
try {
do {
producer.transfer(TEST_VALUE);
} while (0 != --i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static class Consumer implements Runnable {
private final LinkedTransferQueue queue;
Object result;
volatile long end;
private int repetitions;
public Consumer(LinkedTransferQueue queue, int repetitions) {
this.queue = queue;
this.repetitions = repetitions;
}
@Override
public void run() {
LinkedTransferQueue consumer = this.queue;
Object result = null;
int i = this.repetitions;
try {
do {
result = consumer.take();
} while (0 != --i);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.result = result;
this.end = System.nanoTime();
}
}
}

View File

@ -1,144 +0,0 @@
package dorkbox.util.messagebus.queuePerf;
import java.util.concurrent.LinkedTransferQueue;
public class PerfTest_LinkedTransferQueue_NonBlock {
public static final int REPETITIONS = 50 * 1000 * 100;
public static final Integer TEST_VALUE = Integer.valueOf(777);
public static final int QUEUE_CAPACITY = 1 << 17;
private static final int concurrency = 4;
public static void main(final String[] args) throws Exception {
System.out.println("reps:" + REPETITIONS + " Concurrency " + concurrency);
final int warmupRuns = 5;
final int runs = 5;
long average = 0;
final LinkedTransferQueue queue = new LinkedTransferQueue();
average = averageRun(warmupRuns, runs, queue, true, concurrency, REPETITIONS);
System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), average);
}
public static long averageRun(int warmUpRuns, int sumCount, LinkedTransferQueue queue, boolean showStats, int concurrency, int repetitions) throws Exception {
int runs = warmUpRuns + sumCount;
final long[] results = new long[runs];
for (int i = 0; i < runs; i++) {
System.gc();
results[i] = performanceRun(i, queue, showStats, concurrency, repetitions);
}
// only average last X results for summary
long sum = 0;
for (int i = warmUpRuns; i < runs; i++) {
sum += results[i];
}
return sum/sumCount;
}
private static long performanceRun(int runNumber, LinkedTransferQueue queue, boolean showStats, int concurrency, int repetitions) throws Exception {
Producer[] producers = new Producer[concurrency];
Consumer[] consumers = new Consumer[concurrency];
Thread[] threads = new Thread[concurrency*2];
for (int i=0;i<concurrency;i++) {
producers[i] = new Producer(queue, repetitions);
consumers[i] = new Consumer(queue, repetitions);
}
for (int j=0,i=0;i<concurrency;i++,j+=2) {
threads[j] = new Thread(producers[i], "Producer " + i);
threads[j+1] = new Thread(consumers[i], "Consumer " + i);
}
for (int i=0;i<concurrency*2;i+=2) {
threads[i].start();
threads[i+1].start();
}
for (int i=0;i<concurrency*2;i+=2) {
threads[i].join();
threads[i+1].join();
}
long start = Long.MAX_VALUE;
long end = -1;
for (int i=0;i<concurrency;i++) {
if (producers[i].start < start) {
start = producers[i].start;
}
if (consumers[i].end > end) {
end = consumers[i].end;
}
}
long duration = end - start;
long ops = repetitions * 1_000_000_000L / duration;
String qName = queue.getClass().getSimpleName();
if (showStats) {
System.out.format("%d - ops/sec=%,d - %s\n", runNumber, ops, qName);
}
return ops;
}
public static class Producer implements Runnable {
private final LinkedTransferQueue queue;
volatile long start;
private int repetitions;
public Producer(LinkedTransferQueue queue, int repetitions) {
this.queue = queue;
this.repetitions = repetitions;
}
@Override
public void run() {
LinkedTransferQueue producer = this.queue;
int i = this.repetitions;
this.start = System.nanoTime();
do {
while (!producer.offer(TEST_VALUE)) {
Thread.yield();
}
} while (0 != --i);
}
}
public static class Consumer implements Runnable {
private final LinkedTransferQueue queue;
Object result;
volatile long end;
private int repetitions;
public Consumer(LinkedTransferQueue queue, int repetitions) {
this.queue = queue;
this.repetitions = repetitions;
}
@Override
public void run() {
LinkedTransferQueue consumer = this.queue;
Object result = null;
int i = this.repetitions;
do {
while (null == (result = consumer.poll())) {
Thread.yield();
}
} while (0 != --i);
this.result = result;
this.end = System.nanoTime();
}
}
}