From 8b69d53263e688f9dde2ef5265bee39c47074825 Mon Sep 17 00:00:00 2001 From: nathan Date: Tue, 17 Feb 2015 22:43:48 +0100 Subject: [PATCH] WIP, cleaning code. Getting multi-arg versions working --- .../engio/mbassy/multi/DispatchRunnable.java | 140 --- .../engio/mbassy/multi/InvokeRunnable.java | 54 -- .../engio/mbassy/multi/MultiMBassador.java | 896 ++++++++---------- .../net/engio/mbassy/multi/SubRunnable.java | 20 - .../common/ConcurrentCircularArrayQueue.java | 184 ---- ...ConcurrentSequencedCircularArrayQueue.java | 54 -- .../mbassy/multi/common/FrontPadding.java | 31 - .../mbassy/multi/common/IConcurrentSet.java | 30 - .../multi/common/InterruptRunnable.java | 13 - .../multi/common/MessagePassingQueue.java | 71 -- .../mbassy/multi/common/MpmcArrayQueue.java | 250 ----- ...adFactory.java => NamedThreadFactory.java} | 6 +- .../engio/mbassy/multi/common/PTLQueue.java | 158 --- .../mbassy/multi/common/PaddedAtomicLong.java | 31 - .../multi/common/PaddedAtomicLongBase.java | 84 -- .../net/engio/mbassy/multi/common/Pow2.java | 41 - .../mbassy/multi/common/UnsafeAccess.java | 56 -- .../mbassy/multi/listener/MessageHandler.java | 196 ---- .../multi/subscription/Subscription.java | 37 +- .../subscription/SubscriptionManager.java | 570 ++++++----- .../engio/mbassy/multi/ConcurrentSetTest.java | 99 +- .../mbassy/multi/MetadataReaderTest.java | 199 +++- .../mbassy/multi/WeakConcurrentSetTest.java | 24 +- 23 files changed, 968 insertions(+), 2276 deletions(-) delete mode 100644 src/main/java/net/engio/mbassy/multi/DispatchRunnable.java delete mode 100644 src/main/java/net/engio/mbassy/multi/InvokeRunnable.java delete mode 100644 src/main/java/net/engio/mbassy/multi/SubRunnable.java delete mode 100644 src/main/java/net/engio/mbassy/multi/common/ConcurrentCircularArrayQueue.java delete mode 100644 src/main/java/net/engio/mbassy/multi/common/ConcurrentSequencedCircularArrayQueue.java delete mode 100644 src/main/java/net/engio/mbassy/multi/common/FrontPadding.java delete mode 100644 src/main/java/net/engio/mbassy/multi/common/IConcurrentSet.java delete mode 100644 src/main/java/net/engio/mbassy/multi/common/InterruptRunnable.java delete mode 100644 src/main/java/net/engio/mbassy/multi/common/MessagePassingQueue.java delete mode 100644 src/main/java/net/engio/mbassy/multi/common/MpmcArrayQueue.java rename src/main/java/net/engio/mbassy/multi/common/{DisruptorThreadFactory.java => NamedThreadFactory.java} (95%) delete mode 100644 src/main/java/net/engio/mbassy/multi/common/PTLQueue.java delete mode 100644 src/main/java/net/engio/mbassy/multi/common/PaddedAtomicLong.java delete mode 100644 src/main/java/net/engio/mbassy/multi/common/PaddedAtomicLongBase.java delete mode 100644 src/main/java/net/engio/mbassy/multi/common/Pow2.java delete mode 100644 src/main/java/net/engio/mbassy/multi/common/UnsafeAccess.java diff --git a/src/main/java/net/engio/mbassy/multi/DispatchRunnable.java b/src/main/java/net/engio/mbassy/multi/DispatchRunnable.java deleted file mode 100644 index 59e365e..0000000 --- a/src/main/java/net/engio/mbassy/multi/DispatchRunnable.java +++ /dev/null @@ -1,140 +0,0 @@ -package net.engio.mbassy.multi; - -import java.lang.reflect.Array; -import java.util.Collection; -import java.util.concurrent.locks.LockSupport; - -import net.engio.mbassy.multi.common.DeadMessage; -import net.engio.mbassy.multi.common.TransferQueue; -import net.engio.mbassy.multi.error.ErrorHandlingSupport; -import net.engio.mbassy.multi.error.PublicationError; -import net.engio.mbassy.multi.subscription.Subscription; -import net.engio.mbassy.multi.subscription.SubscriptionManager; - -/** - * @author dorkbox, llc - * Date: 2/2/15 - */ -public class DispatchRunnable implements Runnable { - - private ErrorHandlingSupport errorHandler; - private TransferQueue dispatchQueue; - private TransferQueue invokeQueue; - private SubscriptionManager manager; - - public DispatchRunnable(ErrorHandlingSupport errorHandler, SubscriptionManager subscriptionManager, - TransferQueue dispatchQueue, TransferQueue invokeQueue) { - - this.errorHandler = errorHandler; - this.manager = subscriptionManager; - this.dispatchQueue = dispatchQueue; - this.invokeQueue = invokeQueue; - } - - @Override - public void run() { - final SubscriptionManager manager = this.manager; - final ErrorHandlingSupport errorHandler = this.errorHandler; - final TransferQueue IN_queue = this.dispatchQueue; - final TransferQueue OUT_queue = this.invokeQueue; - - final Runnable dummyRunnable = new Runnable() { - @Override - public void run() { - } - }; - - Object message = null; - int counter; - - while (true) { - try { - counter = 200; - while ((message = IN_queue.poll()) == null) { - if (counter > 0) { - --counter; - LockSupport.parkNanos(1L); - } else { - message = IN_queue.take(); - break; - } - } - - @SuppressWarnings("null") - Class messageClass = message.getClass(); - - manager.readLock(); - - Collection subscriptions = manager.getSubscriptionsByMessageType(messageClass); - boolean empty = subscriptions.isEmpty(); - - Collection deadSubscriptions = null; - if (empty) { - // Dead Event. must EXACTLY MATCH (no subclasses or varargs) - deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); - } - Collection> superClasses = manager.getSuperClasses(messageClass); - Collection varArgs = manager.getVarArgs(messageClass); - - manager.readUnLock(); - - - if (!empty) { - for (Subscription sub : subscriptions) { - sub.publishToSubscriptionSingle(OUT_queue, errorHandler, message); - } - -// OUT_queue.put(new InvokeRunnable(errorHandler, subscriptions, message)); - } else if (deadSubscriptions != null) { - if (!deadSubscriptions.isEmpty()) { - DeadMessage deadMessage = new DeadMessage(message); - - for (Subscription sub : deadSubscriptions) { - sub.publishToSubscriptionSingle(OUT_queue, errorHandler, deadMessage); - } - -// OUT_queue.put(new InvokeRunnable(errorHandler, deadSubscriptions, deadMessage)); - } - } - - // now get superClasses - for (Class superClass : superClasses) { - subscriptions = manager.getSubscriptionsByMessageType(superClass); - - if (!subscriptions.isEmpty()) { - for (Subscription sub : subscriptions) { - sub.publishToSubscriptionSingle(OUT_queue, errorHandler, message); - } - -// OUT_queue.put(new InvokeRunnable(errorHandler, subscriptions, message)); - } - } - - // now get varargs - if (!varArgs.isEmpty()) { - // messy, but the ONLY way to do it. - Object[] vararg = (Object[]) Array.newInstance(message.getClass(), 1); - vararg[0] = message; - - Object[] newInstance = new Object[1]; - newInstance[0] = vararg; - vararg = newInstance; - - for (Subscription sub : varArgs) { - sub.publishToSubscription(OUT_queue, errorHandler, vararg); - } - -// OUT_queue.put(new InvokeRunnable(errorHandler, varArgs, vararg)); - } - - // make sure it's synced at this point -// OUT_queue.transfer(dummyRunnable); - - } catch (InterruptedException e) { - return; - } catch (Throwable e) { - errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message").setCause(e).setPublishedObject(message)); - } - } - } -} \ No newline at end of file diff --git a/src/main/java/net/engio/mbassy/multi/InvokeRunnable.java b/src/main/java/net/engio/mbassy/multi/InvokeRunnable.java deleted file mode 100644 index ff1b85c..0000000 --- a/src/main/java/net/engio/mbassy/multi/InvokeRunnable.java +++ /dev/null @@ -1,54 +0,0 @@ -package net.engio.mbassy.multi; - -import java.lang.reflect.Array; -import java.util.Collection; - -import net.engio.mbassy.multi.error.ErrorHandlingSupport; -import net.engio.mbassy.multi.subscription.Subscription; - - -/** - * @author dorkbox, llc Date: 2/2/15 - */ -public class InvokeRunnable implements Runnable { - - private ErrorHandlingSupport errorHandler; - private Collection subscriptions; - private Object message; - - public InvokeRunnable(ErrorHandlingSupport errorHandler, Collection subscriptions, Object message) { - this.errorHandler = errorHandler; - this.subscriptions = subscriptions; - this.message = message; - } - - @Override - public void run() { - ErrorHandlingSupport errorHandler = this.errorHandler; - Collection subs = this.subscriptions; - Object message = this.message; - Object[] vararg = null; - - for (Subscription sub : subs) { - boolean handled = false; - if (sub.isVarArg()) { - // messageClass will NEVER be an array to begin with, since that will call the multi-arg method - if (vararg == null) { - // messy, but the ONLY way to do it. - vararg = (Object[]) Array.newInstance(message.getClass(), 1); - vararg[0] = message; - - Object[] newInstance = new Object[1]; - newInstance[0] = vararg; - vararg = newInstance; - } - handled = true; - sub.publishToSubscription(errorHandler, vararg); - } - - if (!handled) { - sub.publishToSubscription(errorHandler, message); - } - } - } -} diff --git a/src/main/java/net/engio/mbassy/multi/MultiMBassador.java b/src/main/java/net/engio/mbassy/multi/MultiMBassador.java index e382750..09aadf9 100644 --- a/src/main/java/net/engio/mbassy/multi/MultiMBassador.java +++ b/src/main/java/net/engio/mbassy/multi/MultiMBassador.java @@ -8,7 +8,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.LockSupport; import net.engio.mbassy.multi.common.DeadMessage; -import net.engio.mbassy.multi.common.DisruptorThreadFactory; +import net.engio.mbassy.multi.common.NamedThreadFactory; import net.engio.mbassy.multi.common.LinkedTransferQueue; import net.engio.mbassy.multi.common.TransferQueue; import net.engio.mbassy.multi.error.IPublicationErrorHandler; @@ -29,9 +29,8 @@ public class MultiMBassador implements IMessageBus { // this handler will receive all errors that occur during message dispatch or message handling private final List errorHandlers = new ArrayList(); - private final SubscriptionManager subscriptionManager; - - private final TransferQueue dispatchQueue; + private final SubscriptionManager subscriptionManager = new SubscriptionManager(); + private final TransferQueue dispatchQueue = new LinkedTransferQueue(); // all threads that are available for asynchronous message dispatching @@ -44,22 +43,16 @@ public class MultiMBassador implements IMessageBus { public MultiMBassador(int numberOfThreads) { if (numberOfThreads < 1) { - numberOfThreads = 1; // at LEAST 1 threads + numberOfThreads = 1; // at LEAST 1 thread } + this.threads = new ArrayList(numberOfThreads); - this.subscriptionManager = new SubscriptionManager(); - this.dispatchQueue = new LinkedTransferQueue(); - - - int dispatchSize = numberOfThreads; - this.threads = new ArrayList(); - - - DisruptorThreadFactory dispatchThreadFactory = new DisruptorThreadFactory("MB_Dispatch"); - for (int i = 0; i < dispatchSize; i++) { + NamedThreadFactory dispatchThreadFactory = new NamedThreadFactory("MessageBus"); + for (int i = 0; i < numberOfThreads; i++) { // each thread will run forever and process incoming message publication requests Runnable runnable = new Runnable() { + @SuppressWarnings("null") @Override public void run() { TransferQueue IN_QUEUE= MultiMBassador.this.dispatchQueue; @@ -131,29 +124,29 @@ public class MultiMBassador implements IMessageBus { } + @SuppressWarnings("null") @Override public void publish(Object message) { - Class messageClass = message.getClass(); - SubscriptionManager manager = this.subscriptionManager; -// Collection subscriptions = subscriptionManager.getSubscriptionsByMessageType(messageClass); + Class messageClass = message.getClass(); manager.readLock(); + Collection subscriptions = manager.getSubscriptionsByMessageType(messageClass); + boolean validSubs = subscriptions != null && !subscriptions.isEmpty(); - Collection subscriptions = manager.getSubscriptionsByMessageType(messageClass); - boolean empty = subscriptions.isEmpty(); - - Collection deadSubscriptions = null; - if (empty) { - // Dead Event. must EXACTLY MATCH (no subclasses or varargs) - deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); - } - Collection> superClasses = manager.getSuperClasses(messageClass); - Collection varArgs = manager.getVarArgs(messageClass); + Collection deadSubscriptions = null; + if (!validSubs) { + // Dead Event. must EXACTLY MATCH (no subclasses or varargs) + deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); + } + Collection superSubscriptions = manager.getSuperSubscriptions(messageClass); + Collection varArgs = manager.getVarArgs(messageClass); manager.readUnLock(); - if (!empty) { + + // Run subscriptions + if (validSubs) { for (Subscription sub : subscriptions) { // this catches all exception types sub.publishToSubscription(this, message); @@ -165,260 +158,268 @@ public class MultiMBassador implements IMessageBus { // this catches all exception types sub.publishToSubscription(this, deadMessage); } + // Dead Event. only matches EXACT handlers (no vararg, no subclasses) + return; } + // now get superClasses + if (superSubscriptions != null) { + for (Subscription sub : superSubscriptions) { + // this catches all exception types + sub.publishToSubscription(this, message); + } + } + // now get varargs + if (varArgs != null && !varArgs.isEmpty()) { + // messy, but the ONLY way to do it. + Object[] vararg = null; + for (Subscription sub : varArgs) { + if (sub.isVarArg()) { + if (vararg == null) { + vararg = (Object[]) Array.newInstance(messageClass, 1); + vararg[0] = message; - - - - - -// if (subscriptions.isEmpty()) { -// // Dead Event. only matches EXACT handlers (no vararg, no subclasses) -// subscriptions = this.subscriptionManager.getSubscriptionsByMessageType(DeadMessage.class); -// -// DeadMessage deadMessage = new DeadMessage(message); -// if (!subscriptions.isEmpty()) { -// for (Subscription sub : subscriptions) { -// // this catches all exception types -// sub.publishToSubscription(this, deadMessage); -// } -// } -// } -// else { -//// Object[] vararg = null; -// for (Subscription sub : subscriptions) { -// // this catches all exception types -// sub.publishToSubscription(this, message); -// -//// if (sub.isVarArg()) { -//// // messageClass will NEVER be an array to begin with, since that will call the multi-arg method -//// if (vararg == null) { -//// // messy, but the ONLY way to do it. -//// vararg = (Object[]) Array.newInstance(message.getClass(), 1); -//// vararg[0] = message; -//// -//// Object[] newInstance = new Object[1]; -//// newInstance[0] = vararg; -//// vararg = newInstance; -//// } -//// -//// // this catches all exception types -//// sub.publishToSubscription(this, vararg); -//// continue; -//// } -// } -// } + Object[] newInstance = new Object[1]; + newInstance[0] = vararg; + vararg = newInstance; + } + sub.publishToSubscription(this, vararg); + } + } + } } - + @SuppressWarnings("null") @Override public void publish(Object message1, Object message2) { - try { - Class messageClass1 = message1.getClass(); - Class messageClass2 = message2.getClass(); + SubscriptionManager manager = this.subscriptionManager; - SubscriptionManager manager = this.subscriptionManager; + Class messageClass1 = message1.getClass(); + Class messageClass2 = message2.getClass(); + manager.readLock(); Collection subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2); + boolean validSubs = subscriptions != null && !subscriptions.isEmpty(); - if (subscriptions == null || subscriptions.isEmpty()) { - subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); - - DeadMessage deadMessage = new DeadMessage(message1, message2); - - for (Subscription sub : subscriptions) { - sub.publishToSubscription(this, deadMessage); - } + Collection deadSubscriptions = null; + if (!validSubs) { + // Dead Event. must EXACTLY MATCH (no subclasses or varargs) + deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); } - else { - Object[] vararg = null; - for (Subscription sub : subscriptions) { - if (sub.isVarArg()) { - Class class1 = message1.getClass(); - Class class2 = message2.getClass(); - if (!class1.isArray() && class1 == class2) { - if (vararg == null) { - // messy, but the ONLY way to do it. - vararg = (Object[]) Array.newInstance(class1, 2); - vararg[0] = message1; - vararg[1] = message2; + Collection superSubscriptions = manager.getSuperSubscriptions(messageClass1, messageClass2); + Collection varArgs = manager.getVarArgs(messageClass1, messageClass2); + manager.readUnLock(); - Object[] newInstance = (Object[]) Array.newInstance(vararg.getClass(), 1); - newInstance[0] = vararg; - vararg = newInstance; - } - sub.publishToSubscription(this, vararg); - continue; - } + // Run subscriptions + if (validSubs) { + for (Subscription sub : subscriptions) { + // this catches all exception types + sub.publishToSubscription(this, message1, message2); + } + } else if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) { + DeadMessage deadMessage = new DeadMessage(message1, message2); + + for (Subscription sub : deadSubscriptions) { + // this catches all exception types + sub.publishToSubscription(this, deadMessage); + } + // Dead Event. only matches EXACT handlers (no vararg, no subclasses) + return; + } + + + // now get superClasses + if (superSubscriptions != null) { + for (Subscription sub : superSubscriptions) { + // this catches all exception types + sub.publishToSubscription(this, message1, message2); + } + } + + // now get varargs + if (varArgs != null && !varArgs.isEmpty()) { + // messy, but the ONLY way to do it. + Object[] vararg = null; + + for (Subscription sub : varArgs) { + if (sub.isVarArg()) { + if (vararg == null) { + vararg = (Object[]) Array.newInstance(messageClass1, 2); + vararg[0] = message1; + vararg[1] = message2; + + Object[] newInstance = new Object[1]; + newInstance[0] = vararg; + vararg = newInstance; } - - sub.publishToSubscription(this, message1, message2); + sub.publishToSubscription(this, vararg); } } - } catch (Throwable e) { - handlePublicationError(new PublicationError() - .setMessage("Error during publication of message") - .setCause(e) - .setPublishedObject(message1, message2)); } } + @SuppressWarnings("null") @Override public void publish(Object message1, Object message2, Object message3) { -// try { -// Class messageClass1 = message1.getClass(); -// Class messageClass2 = message2.getClass(); -// Class messageClass3 = message3.getClass(); -// -// SubscriptionManager manager = this.subscriptionManager; -// Collection subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2, messageClass3); -// -// if (subscriptions == null || subscriptions.isEmpty()) { -// // Dead Event -// subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); -// DeadMessage deadMessage = new DeadMessage(message1, message2, message3); -// -// for (Subscription sub : subscriptions) { -// sub.publishToSubscription(this, deadMessage); -// } -// } else { -// Object[] vararg = null; -// -// for (Subscription sub : subscriptions) { -// boolean handled = false; -// if (sub.isVarArg()) { -// Class class1 = message1.getClass(); -// Class class2 = message2.getClass(); -// Class class3 = message3.getClass(); -// if (!class1.isArray() && class1 == class2 && class2 == class3) { -// // messy, but the ONLY way to do it. -// if (vararg == null) { -// vararg = (Object[]) Array.newInstance(class1, 3); -// vararg[0] = message1; -// vararg[1] = message2; -// vararg[2] = message3; -// -// Object[] newInstance = (Object[]) Array.newInstance(vararg.getClass(), 1); -// newInstance[0] = vararg; -// vararg = newInstance; -// } -// -// handled = true; -// sub.publishToSubscription(this, vararg); -// } -// } -// -// if (!handled) { -// sub.publishToSubscription(this, message1, message2, message3); -// } -// } -// -// // if the message did not have any listener/handler accept it -// if (subscriptions.isEmpty()) { -// // cannot have DeadMessage published to this, so no extra check necessary -// // Dead Event -// subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); -// DeadMessage deadMessage = new DeadMessage(message1, message2, message3); -// -// for (Subscription sub : subscriptions) { -// sub.publishToSubscription(this, deadMessage); -// } -// -// // cleanup -// deadMessage = null; -// } -// } -// } catch (Throwable e) { -// handlePublicationError(new PublicationError() -// .setMessage("Error during publication of message") -// .setCause(e) -// .setPublishedObject(message1, message2, message3)); -// } + SubscriptionManager manager = this.subscriptionManager; + + Class messageClass1 = message1.getClass(); + Class messageClass2 = message2.getClass(); + Class messageClass3 = message3.getClass(); + manager.readLock(); + Collection subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2, messageClass3); + boolean validSubs = subscriptions != null && !subscriptions.isEmpty(); + + Collection deadSubscriptions = null; + if (!validSubs) { + // Dead Event. must EXACTLY MATCH (no subclasses or varargs) + deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); + } + + Collection superSubscriptions = manager.getSuperSubscriptions(messageClass1, messageClass2, messageClass3); + Collection varArgs = manager.getVarArgs(messageClass1, messageClass2, messageClass3); + manager.readUnLock(); + + + // Run subscriptions + if (validSubs) { + for (Subscription sub : subscriptions) { + // this catches all exception types + sub.publishToSubscription(this, message1, message2, message3); + } + } else if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) { + DeadMessage deadMessage = new DeadMessage(message1, message2, message3); + + for (Subscription sub : deadSubscriptions) { + // this catches all exception types + sub.publishToSubscription(this, deadMessage); + } + // Dead Event. only matches EXACT handlers (no vararg, no subclasses) + return; + } + + + // now get superClasses + if (superSubscriptions != null) { + for (Subscription sub : superSubscriptions) { + // this catches all exception types + sub.publishToSubscription(this, message1, message2, message3); + } + } + + // now get varargs + if (varArgs != null && !varArgs.isEmpty()) { + // messy, but the ONLY way to do it. + Object[] vararg = null; + + for (Subscription sub : varArgs) { + if (sub.isVarArg()) { + if (vararg == null) { + vararg = (Object[]) Array.newInstance(messageClass1, 3); + vararg[0] = message1; + vararg[0] = message2; + vararg[0] = message3; + + Object[] newInstance = new Object[1]; + newInstance[0] = vararg; + vararg = newInstance; + } + sub.publishToSubscription(this, vararg); + } + } + } } + @SuppressWarnings("null") @Override public void publish(Object... messages) { -// try { -// // cannot have DeadMessage published to this! -// int size = messages.length; -// boolean allSameType = true; -// -// Class[] messageClasses = new Class[size]; -// Class first = null; -// if (size > 0) { -// first = messageClasses[0] = messages[0].getClass(); + SubscriptionManager manager = this.subscriptionManager; + + int size = messages.length; + boolean allSameType = true; + + Class[] messageClasses = new Class[size]; + Class first = null; + if (size > 0) { + first = messageClasses[0] = messages[0].getClass(); + } + + for (int i=1;i subscriptions = manager.getSubscriptionsByMessageType(messageClasses); + boolean validSubs = subscriptions != null && !subscriptions.isEmpty(); + + Collection deadSubscriptions = null; + if (!validSubs) { + // Dead Event. must EXACTLY MATCH (no subclasses or varargs) + deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); + } + + // we don't support super subscriptions for var-args + // Collection superSubscriptions = manager.getSuperSubscriptions(messageClasses); + Collection varArgs = null; + if (allSameType) { + varArgs = manager.getVarArgs(messageClasses); + } + manager.readUnLock(); + + // Run subscriptions + if (validSubs) { + for (Subscription sub : subscriptions) { + // this catches all exception types + sub.publishToSubscription(this, messages); + } + } else if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) { + DeadMessage deadMessage = new DeadMessage(messages); + + for (Subscription sub : deadSubscriptions) { + // this catches all exception types + sub.publishToSubscription(this, deadMessage); + } + // Dead Event. only matches EXACT handlers (no vararg, no subclasses) + return; + } + + + // now get superClasses (not supported) +// if (superSubscriptions != null) { +// for (Subscription sub : superSubscriptions) { +// // this catches all exception types +// sub.publishToSubscription(this, message); // } -// -// for (int i=1;i subscriptions = manager.getSubscriptionsByMessageType(messageClasses); -// -// if (subscriptions == null || subscriptions.isEmpty()) { -// // Dead Event -// subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class); -// DeadMessage deadMessage = new DeadMessage(messages); -// -// for (Subscription sub : subscriptions) { -// sub.publishToSubscription(this, deadMessage); -// } -// } else { -// Object[] vararg = null; -// -// for (Subscription sub : subscriptions) { -// boolean handled = false; -// if (first != null && allSameType && sub.isVarArg()) { -// if (vararg == null) { -// // messy, but the ONLY way to do it. -// vararg = (Object[]) Array.newInstance(first, size); -// -// for (int i=0;i 100) { -//// --counter; -//// Thread.yield(); -//// } else -// if (counter > 0) { -// --counter; -// LockSupport.parkNanos(1L); -// } else { - try { - this.dispatchQueue.transfer(runnable); - return; - } catch (InterruptedException e) { - e.printStackTrace(); - // log.error(e); - - handlePublicationError(new PublicationError() - .setMessage("Error while adding an asynchronous message") - .setCause(e) - .setPublishedObject(message)); - } + @Override + public void publishAsync(final Object message1, final Object message2) { + if (message1 != null && message2 != null) { + Runnable runnable = new Runnable() { + @Override + public void run() { + MultiMBassador.this.publish(message1, message2); } -// } -// } + }; + + try { + this.dispatchQueue.transfer(runnable); + } catch (InterruptedException e) { + e.printStackTrace(); + // log.error(e); + + handlePublicationError(new PublicationError() + .setMessage("Error while adding an asynchronous message") + .setCause(e) + .setPublishedObject(message1, message2)); + } + } } @Override - public void publishAsync(Object message1, Object message2) { -// // put this on the disruptor ring buffer -// final RingBuffer ringBuffer = this.ringBuffer; -// -// // setup the job -// final long seq = ringBuffer.next(); -// try { -// MessageHolder eventJob = ringBuffer.get(seq); -// eventJob.messageType = MessageType.TWO; -// eventJob.message1 = message1; -// eventJob.message2 = message2; -// } catch (Exception e) { -// handlePublicationError(new PublicationError() -// .setMessage("Error while adding an asynchronous message") -// .setCause(e) -// .setPublishedObject(message1, message2)); -// } finally { -// // always publish the job -// ringBuffer.publish(seq); -// } + public void publishAsync(final Object message1, final Object message2, final Object message3) { + if (message1 != null || message2 != null | message3 != null) { + Runnable runnable = new Runnable() { + @Override + public void run() { + MultiMBassador.this.publish(message1, message2, message3); + } + }; + + + try { + this.dispatchQueue.transfer(runnable); + } catch (InterruptedException e) { + e.printStackTrace(); + // log.error(e); + + handlePublicationError(new PublicationError() + .setMessage("Error while adding an asynchronous message") + .setCause(e) + .setPublishedObject(message1, message2, message3)); + } + } } @Override - public void publishAsync(Object message1, Object message2, Object message3) { -// // put this on the disruptor ring buffer -// final RingBuffer ringBuffer = this.ringBuffer; -// -// // setup the job -// final long seq = ringBuffer.next(); -// try { -// MessageHolder eventJob = ringBuffer.get(seq); -// eventJob.messageType = MessageType.THREE; -// eventJob.message1 = message1; -// eventJob.message2 = message2; -// eventJob.message3 = message3; -// } catch (Exception e) { -// handlePublicationError(new PublicationError() -// .setMessage("Error while adding an asynchronous message") -// .setCause(e) -// .setPublishedObject(new Object[] {message1, message2, message3})); -// } finally { -// // always publish the job -// ringBuffer.publish(seq); -// } + public void publishAsync(final Object... messages) { + if (messages != null) { + Runnable runnable = new Runnable() { + @Override + public void run() { + MultiMBassador.this.publish(messages); + } + }; + + try { + this.dispatchQueue.transfer(runnable); + } catch (InterruptedException e) { + e.printStackTrace(); + // log.error(e); + + handlePublicationError(new PublicationError() + .setMessage("Error while adding an asynchronous message") + .setCause(e) + .setPublishedObject(messages)); + } + } } @Override - public void publishAsync(Object... messages) { -// // put this on the disruptor ring buffer -// final RingBuffer ringBuffer = this.ringBuffer; -// -// // setup the job -// final long seq = ringBuffer.next(); -// try { -// MessageHolder eventJob = ringBuffer.get(seq); -// eventJob.messageType = MessageType.ARRAY; -// eventJob.messages = messages; -// } catch (Exception e) { -// handlePublicationError(new PublicationError() -// .setMessage("Error while adding an asynchronous message") -// .setCause(e) -// .setPublishedObject(messages)); -// } finally { -// // always publish the job -// ringBuffer.publish(seq); -// } + public void publishAsync(long timeout, TimeUnit unit, final Object message) { + if (message != null) { + Runnable runnable = new Runnable() { + @Override + public void run() { + MultiMBassador.this.publish(message); + } + }; + + try { + this.dispatchQueue.tryTransfer(runnable, timeout, unit); + } catch (InterruptedException e) { + e.printStackTrace(); + // log.error(e); + + handlePublicationError(new PublicationError() + .setMessage("Error while adding an asynchronous message") + .setCause(e) + .setPublishedObject(message)); + } + } + } + @Override + public void publishAsync(long timeout, TimeUnit unit, final Object message1, final Object message2) { + if (message1 != null && message2 != null) { + Runnable runnable = new Runnable() { + @Override + public void run() { + MultiMBassador.this.publish(message1, message2); + } + }; + + try { + this.dispatchQueue.tryTransfer(runnable, timeout, unit); + } catch (InterruptedException e) { + e.printStackTrace(); + // log.error(e); + + handlePublicationError(new PublicationError() + .setMessage("Error while adding an asynchronous message") + .setCause(e) + .setPublishedObject(message1, message2)); + } + } + } + + + @Override + public void publishAsync(long timeout, TimeUnit unit, final Object message1, final Object message2, final Object message3) { + if (message1 != null && message2 != null && message3 != null) { + Runnable runnable = new Runnable() { + @Override + public void run() { + MultiMBassador.this.publish(message1, message2, message3); + } + }; + + try { + this.dispatchQueue.tryTransfer(runnable, timeout, unit); + } catch (InterruptedException e) { + e.printStackTrace(); + // log.error(e); + + handlePublicationError(new PublicationError() + .setMessage("Error while adding an asynchronous message") + .setCause(e) + .setPublishedObject(message1, message2, message3)); + } + } } @Override - public void publishAsync(long timeout, TimeUnit unit, Object message) { -// // put this on the disruptor ring buffer -// final RingBuffer ringBuffer = this.ringBuffer; -// final long expireTimestamp = TimeUnit.MILLISECONDS.convert(timeout, unit) + System.currentTimeMillis(); -// -// // Inserts the specified element into this buffer, waiting up to the specified wait time if necessary for space -// // to become available. -// while (!ringBuffer.hasAvailableCapacity(1)) { -// LockSupport.parkNanos(10L); -// if (expireTimestamp <= System.currentTimeMillis()) { -// handlePublicationError(new PublicationError() -// .setMessage("Error while adding an asynchronous message") -// .setCause(new Exception("Timeout")) -// .setPublishedObject(message)); -// return; -// } -// } -// -// // setup the job -// final long seq = ringBuffer.next(); -// try { -// MessageHolder eventJob = ringBuffer.get(seq); -// eventJob.messageType = MessageType.ONE; -// eventJob.message1 = message; -// } catch (Exception e) { -// handlePublicationError(new PublicationError() -// .setMessage("Error while adding an asynchronous message") -// .setCause(e) -// .setPublishedObject(message)); -// } finally { -// // always publish the job -// ringBuffer.publish(seq); -// } - } - @Override - public void publishAsync(long timeout, TimeUnit unit, Object message1, Object message2) { -// // put this on the disruptor ring buffer -// final RingBuffer ringBuffer = this.ringBuffer; -// final long expireTimestamp = TimeUnit.MILLISECONDS.convert(timeout, unit) + System.currentTimeMillis(); -// -// // Inserts the specified element into this buffer, waiting up to the specified wait time if necessary for space -// // to become available. -// while (!ringBuffer.hasAvailableCapacity(1)) { -// LockSupport.parkNanos(10L); -// if (expireTimestamp <= System.currentTimeMillis()) { -// handlePublicationError(new PublicationError() -// .setMessage("Error while adding an asynchronous message") -// .setCause(new Exception("Timeout")) -// .setPublishedObject(message1, message2)); -// return; -// } -// } -// -// // setup the job -// final long seq = ringBuffer.next(); -// try { -// MessageHolder eventJob = ringBuffer.get(seq); -// eventJob.messageType = MessageType.TWO; -// eventJob.message1 = message1; -// eventJob.message2 = message2; -// } catch (Exception e) { -// handlePublicationError(new PublicationError() -// .setMessage("Error while adding an asynchronous message") -// .setCause(e) -// .setPublishedObject(message1, message2)); -// } finally { -// // always publish the job -// ringBuffer.publish(seq); -// } - } - @Override - public void publishAsync(long timeout, TimeUnit unit, Object message1, Object message2, Object message3) { -// // put this on the disruptor ring buffer -// final RingBuffer ringBuffer = this.ringBuffer; -// final long expireTimestamp = TimeUnit.MILLISECONDS.convert(timeout, unit) + System.currentTimeMillis(); -// -// // Inserts the specified element into this buffer, waiting up to the specified wait time if necessary for space -// // to become available. -// while (!ringBuffer.hasAvailableCapacity(1)) { -// LockSupport.parkNanos(10L); -// if (expireTimestamp <= System.currentTimeMillis()) { -// handlePublicationError(new PublicationError() -// .setMessage("Error while adding an asynchronous message") -// .setCause(new Exception("Timeout")) -// .setPublishedObject(message1, message2, message3)); -// return; -// } -// } -// -// // setup the job -// final long seq = ringBuffer.next(); -// try { -// MessageHolder eventJob = ringBuffer.get(seq); -// eventJob.messageType = MessageType.THREE; -// eventJob.message1 = message1; -// eventJob.message2 = message2; -// eventJob.message3 = message3; -// } catch (Exception e) { -// handlePublicationError(new PublicationError() -// .setMessage("Error while adding an asynchronous message") -// .setCause(e) -// .setPublishedObject(message1, message2, message3)); -// } finally { -// // always publish the job -// ringBuffer.publish(seq); -// } - } + public void publishAsync(long timeout, TimeUnit unit, final Object... messages) { + if (messages != null) { + Runnable runnable = new Runnable() { + @Override + public void run() { + MultiMBassador.this.publish(messages); + } + }; - @Override - public void publishAsync(long timeout, TimeUnit unit, Object... messages) { -// // put this on the disruptor ring buffer -// final RingBuffer ringBuffer = this.ringBuffer; -// final long expireTimestamp = TimeUnit.MILLISECONDS.convert(timeout, unit) + System.currentTimeMillis(); -// -// // Inserts the specified element into this buffer, waiting up to the specified wait time if necessary for space -// // to become available. -// while (!ringBuffer.hasAvailableCapacity(1)) { -// LockSupport.parkNanos(10L); -// if (expireTimestamp <= System.currentTimeMillis()) { -// handlePublicationError(new PublicationError() -// .setMessage("Error while adding an asynchronous message") -// .setCause(new Exception("Timeout")) -// .setPublishedObject(messages)); -// return; -// } -// } -// -// // setup the job -// final long seq = ringBuffer.next(); -// try { -// MessageHolder eventJob = ringBuffer.get(seq); -// eventJob.messageType = MessageType.ARRAY; -// eventJob.messages = messages; -// } catch (Exception e) { -// handlePublicationError(new PublicationError() -// .setMessage("Error while adding an asynchronous message") -// .setCause(e) -// .setPublishedObject(messages)); -// } finally { -// // always publish the job -// ringBuffer.publish(seq); -// } + try { + this.dispatchQueue.tryTransfer(runnable, timeout, unit); + } catch (InterruptedException e) { + e.printStackTrace(); + // log.error(e); + + handlePublicationError(new PublicationError() + .setMessage("Error while adding an asynchronous message") + .setCause(e) + .setPublishedObject(messages)); + } + } } } diff --git a/src/main/java/net/engio/mbassy/multi/SubRunnable.java b/src/main/java/net/engio/mbassy/multi/SubRunnable.java deleted file mode 100644 index 4fcaf76..0000000 --- a/src/main/java/net/engio/mbassy/multi/SubRunnable.java +++ /dev/null @@ -1,20 +0,0 @@ -package net.engio.mbassy.multi; - -import java.lang.reflect.Method; - - -/** - * @author dorkbox, llc Date: 2/2/15 - */ -public class SubRunnable { - - public Method handler; - public Object listener; - public Object message; - - public SubRunnable(Method handler, Object listener, Object message) { - this.handler = handler; - this.listener = listener; - this.message = message; - } -} diff --git a/src/main/java/net/engio/mbassy/multi/common/ConcurrentCircularArrayQueue.java b/src/main/java/net/engio/mbassy/multi/common/ConcurrentCircularArrayQueue.java deleted file mode 100644 index 2e0b511..0000000 --- a/src/main/java/net/engio/mbassy/multi/common/ConcurrentCircularArrayQueue.java +++ /dev/null @@ -1,184 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package net.engio.mbassy.multi.common; - -import static net.engio.mbassy.multi.common.UnsafeAccess.UNSAFE; - -import java.util.AbstractQueue; -import java.util.Iterator; - -abstract class ConcurrentCircularArrayQueueL0Pad extends AbstractQueue implements MessagePassingQueue { - long p00, p01, p02, p03, p04, p05, p06, p07; - long p30, p31, p32, p33, p34, p35, p36, p37; -} - -/** - * A concurrent access enabling class used by circular array based queues this class exposes an offset computation - * method along with differently memory fenced load/store methods into the underlying array. The class is pre-padded and - * the array is padded on either side to help with False sharing prvention. It is expected theat subclasses handle post - * padding. - *

- * Offset calculation is separate from access to enable the reuse of a give compute offset. - *

- * Load/Store methods using a buffer parameter are provided to allow the prevention of final field reload after a - * LoadLoad barrier. - *

- * - * @author nitsanw - * - * @param - */ -public abstract class ConcurrentCircularArrayQueue extends ConcurrentCircularArrayQueueL0Pad { - protected static final int SPARSE_SHIFT = Integer.getInteger("sparse.shift", 0); - protected static final int BUFFER_PAD = 32; - private static final long REF_ARRAY_BASE; - private static final int REF_ELEMENT_SHIFT; - static { - final int scale = UnsafeAccess.UNSAFE.arrayIndexScale(Object[].class); - if (4 == scale) { - REF_ELEMENT_SHIFT = 2 + SPARSE_SHIFT; - } else if (8 == scale) { - REF_ELEMENT_SHIFT = 3 + SPARSE_SHIFT; - } else { - throw new IllegalStateException("Unknown pointer size"); - } - // Including the buffer pad in the array base offset - REF_ARRAY_BASE = UnsafeAccess.UNSAFE.arrayBaseOffset(Object[].class) - + (BUFFER_PAD << REF_ELEMENT_SHIFT - SPARSE_SHIFT); - } - protected final long mask; - // @Stable :( - protected final E[] buffer; - - @SuppressWarnings("unchecked") - public ConcurrentCircularArrayQueue(int capacity) { - int actualCapacity = Pow2.roundToPowerOfTwo(capacity); - this.mask = actualCapacity - 1; - // pad data on either end with some empty slots. - this.buffer = (E[]) new Object[(actualCapacity << SPARSE_SHIFT) + BUFFER_PAD * 2]; - } - - /** - * @param index desirable element index - * @return the offset in bytes within the array for a given index. - */ - protected final long calcElementOffset(long index) { - return calcElementOffset(index, this.mask); - } - /** - * @param index desirable element index - * @param mask - * @return the offset in bytes within the array for a given index. - */ - protected final long calcElementOffset(long index, long mask) { - return REF_ARRAY_BASE + ((index & mask) << REF_ELEMENT_SHIFT); - } - /** - * A plain store (no ordering/fences) of an element to a given offset - * - * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} - * @param e a kitty - */ - protected final void spElement(long offset, E e) { - spElement(this.buffer, offset, e); - } - - /** - * A plain store (no ordering/fences) of an element to a given offset - * - * @param buffer this.buffer - * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} - * @param e an orderly kitty - */ - protected final void spElement(E[] buffer, long offset, E e) { - UNSAFE.putObject(buffer, offset, e); - } - - /** - * An ordered store(store + StoreStore barrier) of an element to a given offset - * - * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} - * @param e an orderly kitty - */ - protected final void soElement(long offset, E e) { - soElement(this.buffer, offset, e); - } - - /** - * An ordered store(store + StoreStore barrier) of an element to a given offset - * - * @param buffer this.buffer - * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} - * @param e an orderly kitty - */ - protected final void soElement(E[] buffer, long offset, E e) { - UNSAFE.putOrderedObject(buffer, offset, e); - } - - /** - * A plain load (no ordering/fences) of an element from a given offset. - * - * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} - * @return the element at the offset - */ - protected final E lpElement(long offset) { - return lpElement(this.buffer, offset); - } - - /** - * A plain load (no ordering/fences) of an element from a given offset. - * - * @param buffer this.buffer - * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} - * @return the element at the offset - */ - @SuppressWarnings("unchecked") - protected final E lpElement(E[] buffer, long offset) { - return (E) UNSAFE.getObject(buffer, offset); - } - - /** - * A volatile load (load + LoadLoad barrier) of an element from a given offset. - * - * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} - * @return the element at the offset - */ - protected final E lvElement(long offset) { - return lvElement(this.buffer, offset); - } - - /** - * A volatile load (load + LoadLoad barrier) of an element from a given offset. - * - * @param buffer this.buffer - * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} - * @return the element at the offset - */ - @SuppressWarnings("unchecked") - protected final E lvElement(E[] buffer, long offset) { - return (E) UNSAFE.getObjectVolatile(buffer, offset); - } - - @Override - public Iterator iterator() { - throw new UnsupportedOperationException(); - } - @Override - public void clear() { - // we have to test isEmpty because of the weaker poll() guarantee - while (poll() != null || !isEmpty()) { - ; - } - } -} \ No newline at end of file diff --git a/src/main/java/net/engio/mbassy/multi/common/ConcurrentSequencedCircularArrayQueue.java b/src/main/java/net/engio/mbassy/multi/common/ConcurrentSequencedCircularArrayQueue.java deleted file mode 100644 index 52b4103..0000000 --- a/src/main/java/net/engio/mbassy/multi/common/ConcurrentSequencedCircularArrayQueue.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package net.engio.mbassy.multi.common; - - -public abstract class ConcurrentSequencedCircularArrayQueue extends ConcurrentCircularArrayQueue { - private static final long ARRAY_BASE; - private static final int ELEMENT_SHIFT; - static { - final int scale = UnsafeAccess.UNSAFE.arrayIndexScale(long[].class); - if (8 == scale) { - ELEMENT_SHIFT = 3 + SPARSE_SHIFT; - } else { - throw new IllegalStateException("Unexpected long[] element size"); - } - // Including the buffer pad in the array base offset - ARRAY_BASE = UnsafeAccess.UNSAFE.arrayBaseOffset(long[].class) + (BUFFER_PAD << ELEMENT_SHIFT - SPARSE_SHIFT); - } - protected final long[] sequenceBuffer; - - public ConcurrentSequencedCircularArrayQueue(int capacity) { - super(capacity); - int actualCapacity = (int) (this.mask + 1); - // pad data on either end with some empty slots. - this.sequenceBuffer = new long[(actualCapacity << SPARSE_SHIFT) + BUFFER_PAD * 2]; - for (long i = 0; i < actualCapacity; i++) { - soSequence(this.sequenceBuffer, calcSequenceOffset(i), i); - } - } - - protected final long calcSequenceOffset(long index) { - return ARRAY_BASE + ((index & this.mask) << ELEMENT_SHIFT); - } - - protected final void soSequence(long[] buffer, long offset, long e) { - UnsafeAccess.UNSAFE.putOrderedLong(buffer, offset, e); - } - - protected final long lvSequence(long[] buffer, long offset) { - return UnsafeAccess.UNSAFE.getLongVolatile(buffer, offset); - } - -} \ No newline at end of file diff --git a/src/main/java/net/engio/mbassy/multi/common/FrontPadding.java b/src/main/java/net/engio/mbassy/multi/common/FrontPadding.java deleted file mode 100644 index a5c008d..0000000 --- a/src/main/java/net/engio/mbassy/multi/common/FrontPadding.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright 2014 David Karnok - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package net.engio.mbassy.multi.common; - -import java.io.Serializable; - -/** - * Padding up to 128 bytes at the front. - * Based on netty's implementation - */ -abstract class FrontPadding implements Serializable { - /** */ - private static final long serialVersionUID = -596356687591714352L; - /** Padding. */ - public transient long p1, p2, p3, p4, p5, p6; // 48 bytes (header is 16 bytes) - /** Padding. */ - public transient long p8, p9, p10, p11, p12, p13, p14, p15; // 64 bytes -} diff --git a/src/main/java/net/engio/mbassy/multi/common/IConcurrentSet.java b/src/main/java/net/engio/mbassy/multi/common/IConcurrentSet.java deleted file mode 100644 index b60269e..0000000 --- a/src/main/java/net/engio/mbassy/multi/common/IConcurrentSet.java +++ /dev/null @@ -1,30 +0,0 @@ -package net.engio.mbassy.multi.common; - -import java.util.Collection; - -/** - * Todo: Add javadoc - * - * @author bennidi - * Date: 3/29/13 - */ -public interface IConcurrentSet extends Collection { - - @Override - boolean add(T element); - - boolean contains(Object element); - - @Override - int size(); - - @Override - boolean isEmpty(); - - void addAll(Iterable elements); - - /** - * @return TRUE if the element was removed - */ - boolean remove(Object element); -} diff --git a/src/main/java/net/engio/mbassy/multi/common/InterruptRunnable.java b/src/main/java/net/engio/mbassy/multi/common/InterruptRunnable.java deleted file mode 100644 index ece9a02..0000000 --- a/src/main/java/net/engio/mbassy/multi/common/InterruptRunnable.java +++ /dev/null @@ -1,13 +0,0 @@ -package net.engio.mbassy.multi.common; - -public abstract class InterruptRunnable implements Runnable { - protected volatile boolean running = true; - - public InterruptRunnable() { - super(); - } - - public void stop() { - this.running = false; - } -} diff --git a/src/main/java/net/engio/mbassy/multi/common/MessagePassingQueue.java b/src/main/java/net/engio/mbassy/multi/common/MessagePassingQueue.java deleted file mode 100644 index 2ba4e4f..0000000 --- a/src/main/java/net/engio/mbassy/multi/common/MessagePassingQueue.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package net.engio.mbassy.multi.common; - -import java.util.Queue; - -/** - * This is a tagging interface for the queues in this library which implement a subset of the {@link Queue} interface - * sufficient for concurrent message passing.
- * Message passing queues offer happens before semantics to messages passed through, namely that writes made by the - * producer before offering the message are visible to the consuming thread after the message has been polled out of the - * queue. - * - * @author nitsanw - * - * @param the event/message type - */ -interface MessagePassingQueue { - - /** - * Called from a producer thread subject to the restrictions appropriate to the implementation and according to the - * {@link Queue#offer(Object)} interface. - * - * @param message - * @return true if element was inserted into the queue, false iff full - */ - boolean offer(M message); - - /** - * Called from the consumer thread subject to the restrictions appropriate to the implementation and according to - * the {@link Queue#poll()} interface. - * - * @return a message from the queue if one is available, null iff empty - */ - M poll(); - - /** - * Called from the consumer thread subject to the restrictions appropriate to the implementation and according to - * the {@link Queue#peek()} interface. - * - * @return a message from the queue if one is available, null iff empty - */ - M peek(); - - /** - * This method's accuracy is subject to concurrent modifications happening as the size is estimated and as such is a - * best effort rather than absolute value. For some implementations this method may be O(n) rather than O(1). - * - * @return number of messages in the queue, between 0 and queue capacity or {@link Integer#MAX_VALUE} if not bounded - */ - int size(); - - /** - * This method's accuracy is subject to concurrent modifications happening as the observation is carried out. - * - * @return true if empty, false otherwise - */ - boolean isEmpty(); - -} \ No newline at end of file diff --git a/src/main/java/net/engio/mbassy/multi/common/MpmcArrayQueue.java b/src/main/java/net/engio/mbassy/multi/common/MpmcArrayQueue.java deleted file mode 100644 index 1b5cf5c..0000000 --- a/src/main/java/net/engio/mbassy/multi/common/MpmcArrayQueue.java +++ /dev/null @@ -1,250 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package net.engio.mbassy.multi.common; - - -abstract class MpmcArrayQueueL1Pad extends ConcurrentSequencedCircularArrayQueue { - long p10, p11, p12, p13, p14, p15, p16; - long p30, p31, p32, p33, p34, p35, p36, p37; - - public MpmcArrayQueueL1Pad(int capacity) { - super(capacity); - } -} - -abstract class MpmcArrayQueueProducerField extends MpmcArrayQueueL1Pad { - private final static long P_INDEX_OFFSET; - static { - try { - P_INDEX_OFFSET = UnsafeAccess.UNSAFE.objectFieldOffset(MpmcArrayQueueProducerField.class - .getDeclaredField("producerIndex")); - } catch (NoSuchFieldException e) { - throw new RuntimeException(e); - } - } - private volatile long producerIndex; - - public MpmcArrayQueueProducerField(int capacity) { - super(capacity); - } - - protected final long lvProducerIndex() { - return this.producerIndex; - } - - protected final boolean casProducerIndex(long expect, long newValue) { - return UnsafeAccess.UNSAFE.compareAndSwapLong(this, P_INDEX_OFFSET, expect, newValue); - } -} - -abstract class MpmcArrayQueueL2Pad extends MpmcArrayQueueProducerField { - long p20, p21, p22, p23, p24, p25, p26; - long p30, p31, p32, p33, p34, p35, p36, p37; - - public MpmcArrayQueueL2Pad(int capacity) { - super(capacity); - } -} - -abstract class MpmcArrayQueueConsumerField extends MpmcArrayQueueL2Pad { - private final static long C_INDEX_OFFSET; - static { - try { - C_INDEX_OFFSET = UnsafeAccess.UNSAFE.objectFieldOffset(MpmcArrayQueueConsumerField.class - .getDeclaredField("consumerIndex")); - } catch (NoSuchFieldException e) { - throw new RuntimeException(e); - } - } - private volatile long consumerIndex; - - public MpmcArrayQueueConsumerField(int capacity) { - super(capacity); - } - - protected final long lvConsumerIndex() { - return this.consumerIndex; - } - - protected final boolean casConsumerIndex(long expect, long newValue) { - return UnsafeAccess.UNSAFE.compareAndSwapLong(this, C_INDEX_OFFSET, expect, newValue); - } -} - -/** - * A Multi-Producer-Multi-Consumer queue based on a {@link ConcurrentCircularArrayQueue}. This implies that - * any and all threads may call the offer/poll/peek methods and correctness is maintained.
- * This implementation follows patterns documented on the package level for False Sharing protection.
- * The algorithm for offer/poll is an adaptation of the one put forward by D. Vyukov (See here). The original - * algorithm uses an array of structs which should offer nice locality properties but is sadly not possible in - * Java (waiting on Value Types or similar). The alternative explored here utilizes 2 arrays, one for each - * field of the struct. There is a further alternative in the experimental project which uses iteration phase - * markers to achieve the same algo and is closer structurally to the original, but sadly does not perform as - * well as this implementation.
- * Tradeoffs to keep in mind: - *

    - *
  1. Padding for false sharing: counter fields and queue fields are all padded as well as either side of - * both arrays. We are trading memory to avoid false sharing(active and passive). - *
  2. 2 arrays instead of one: The algorithm requires an extra array of longs matching the size of the - * elements array. This is doubling/tripling the memory allocated for the buffer. - *
  3. Power of 2 capacity: Actual elements buffer (and sequence buffer) is the closest power of 2 larger or - * equal to the requested capacity. - *
- * - * @param - * type of the element stored in the {@link java.util.Queue} - */ -public class MpmcArrayQueue extends MpmcArrayQueueConsumerField { - long p40, p41, p42, p43, p44, p45, p46; - long p30, p31, p32, p33, p34, p35, p36, p37; - - public MpmcArrayQueue(final int capacity) { - super(Math.max(2, capacity)); - } - - @Override - public boolean offer(final E e) { - if (null == e) { - throw new NullPointerException("Null is not a valid element"); - } - - // local load of field to avoid repeated loads after volatile reads - final long capacity = this.mask + 1; - final long[] lSequenceBuffer = this.sequenceBuffer; - long currentProducerIndex; - long seqOffset; - long cIndex = Long.MAX_VALUE;// start with bogus value, hope we don't need it - while (true) { - currentProducerIndex = lvProducerIndex(); // LoadLoad - seqOffset = calcSequenceOffset(currentProducerIndex); - final long seq = lvSequence(lSequenceBuffer, seqOffset); // LoadLoad - final long delta = seq - currentProducerIndex; - - if (delta == 0) { - // this is expected if we see this first time around - if (casProducerIndex(currentProducerIndex, currentProducerIndex + 1)) { - // Successful CAS: full barrier - break; - } - // failed cas, retry 1 - } else if (delta < 0 && // poll has not moved this value forward - currentProducerIndex - capacity <= cIndex && // test against cached cIndex - currentProducerIndex - capacity <= (cIndex = lvConsumerIndex())) { // test against latest cIndex - // Extra check required to ensure [Queue.offer == false iff queue is full] - return false; - } - - // another producer has moved the sequence by one, retry 2 - } - - // on 64bit(no compressed oops) JVM this is the same as seqOffset - final long elementOffset = calcElementOffset(currentProducerIndex); - spElement(elementOffset, e); - - // increment sequence by 1, the value expected by consumer - // (seeing this value from a producer will lead to retry 2) - soSequence(lSequenceBuffer, seqOffset, currentProducerIndex + 1); // StoreStore - - return true; - } - - /** - * {@inheritDoc} - *

- * Because return null indicates queue is empty we cannot simply rely on next element visibility for poll - * and must test producer index when next element is not visible. - */ - @Override - public E poll() { - // local load of field to avoid repeated loads after volatile reads - final long[] lSequenceBuffer = this.sequenceBuffer; - long currentConsumerIndex; - long seqOffset; - long pIndex = -1; // start with bogus value, hope we don't need it - while (true) { - currentConsumerIndex = lvConsumerIndex();// LoadLoad - seqOffset = calcSequenceOffset(currentConsumerIndex); - final long seq = lvSequence(lSequenceBuffer, seqOffset);// LoadLoad - final long delta = seq - (currentConsumerIndex + 1); - - if (delta == 0) { - if (casConsumerIndex(currentConsumerIndex, currentConsumerIndex + 1)) { - // Successful CAS: full barrier - break; - } - // failed cas, retry 1 - } else if (delta < 0 && // slot has not been moved by producer - currentConsumerIndex >= pIndex && // test against cached pIndex - currentConsumerIndex == (pIndex = lvProducerIndex())) { // update pIndex if we must - // strict empty check, this ensures [Queue.poll() == null iff isEmpty()] - return null; - } - - // another consumer beat us and moved sequence ahead, retry 2 - } - - // on 64bit(no compressed oops) JVM this is the same as seqOffset - final long offset = calcElementOffset(currentConsumerIndex); - final E e = lpElement(offset); - spElement(offset, null); - - // Move sequence ahead by capacity, preparing it for next offer - // (seeing this value from a consumer will lead to retry 2) - soSequence(lSequenceBuffer, seqOffset, currentConsumerIndex + this.mask + 1);// StoreStore - - return e; - } - - @Override - public E peek() { - long currConsumerIndex; - E e; - do { - currConsumerIndex = lvConsumerIndex(); - // other consumers may have grabbed the element, or queue might be empty - e = lpElement(calcElementOffset(currConsumerIndex)); - // only return null if queue is empty - } while (e == null && currConsumerIndex != lvProducerIndex()); - return e; - } - - @Override - public int size() { - /* - * It is possible for a thread to be interrupted or reschedule between the read of the producer and - * consumer indices, therefore protection is required to ensure size is within valid range. In the - * event of concurrent polls/offers to this method the size is OVER estimated as we read consumer - * index BEFORE the producer index. - */ - long after = lvConsumerIndex(); - while (true) { - final long before = after; - final long currentProducerIndex = lvProducerIndex(); - after = lvConsumerIndex(); - if (before == after) { - return (int) (currentProducerIndex - after); - } - } - } - - @Override - public boolean isEmpty() { - // Order matters! - // Loading consumer before producer allows for producer increments after consumer index is read. - // This ensures this method is conservative in it's estimate. Note that as this is an MPMC there is - // nothing we can do to make this an exact method. - return lvConsumerIndex() == lvProducerIndex(); - } -} \ No newline at end of file diff --git a/src/main/java/net/engio/mbassy/multi/common/DisruptorThreadFactory.java b/src/main/java/net/engio/mbassy/multi/common/NamedThreadFactory.java similarity index 95% rename from src/main/java/net/engio/mbassy/multi/common/DisruptorThreadFactory.java rename to src/main/java/net/engio/mbassy/multi/common/NamedThreadFactory.java index 7828989..189e2f4 100644 --- a/src/main/java/net/engio/mbassy/multi/common/DisruptorThreadFactory.java +++ b/src/main/java/net/engio/mbassy/multi/common/NamedThreadFactory.java @@ -10,7 +10,7 @@ import java.util.concurrent.atomic.AtomicInteger; * @author dorkbox, llc * Date: 2/2/15 */ -public class DisruptorThreadFactory implements ThreadFactory { +public class NamedThreadFactory implements ThreadFactory { /** * The stack size is arbitrary based on JVM implementation. Default is 0 * 8k is the size of the android stack. Depending on the version of android, this can either change, or will always be 8k @@ -64,7 +64,7 @@ public class DisruptorThreadFactory implements ThreadFactory { private final ThreadGroup group; private final String groupName; - public DisruptorThreadFactory(String groupName) { + public NamedThreadFactory(String groupName) { this.groupName = groupName; this.group = new ThreadGroup(groupName); } @@ -80,7 +80,7 @@ public class DisruptorThreadFactory implements ThreadFactory { // 8k is the size of the android stack. Depending on the version of android, this can either change, or will always be 8k // To be honest, 8k is pretty reasonable for an asynchronous/event based system (32bit) or 16k (64bit) // Setting the size MAY or MAY NOT have any effect!!! - Thread t = new Thread(this.group, r, stringBuilder.toString(), DisruptorThreadFactory.stackSizeForThreads); + Thread t = new Thread(this.group, r, stringBuilder.toString(), NamedThreadFactory.stackSizeForThreads); t.setDaemon(true);// FORCE these threads to finish before allowing the JVM to exit if (t.getPriority() != Thread.NORM_PRIORITY) { t.setPriority(Thread.NORM_PRIORITY); diff --git a/src/main/java/net/engio/mbassy/multi/common/PTLQueue.java b/src/main/java/net/engio/mbassy/multi/common/PTLQueue.java deleted file mode 100644 index 80beda3..0000000 --- a/src/main/java/net/engio/mbassy/multi/common/PTLQueue.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Copyright 2014 David Karnok - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package net.engio.mbassy.multi.common; - -import java.util.concurrent.atomic.AtomicLongArray; -import java.util.concurrent.atomic.AtomicReferenceArray; - -/** - * A bounded MPMC queue implementation based on - * https://blogs.oracle.com/dave/entry/ptlqueue_a_scalable_bounded_capacity - *

- * Does not support null values. - */ -public final class PTLQueue { - private final int mask; - private final int length; - - private final PaddedAtomicLong offerCursor; - private final PaddedAtomicLong pollCursor; - private final AtomicLongArray turns; - private final AtomicReferenceArray values; - - public PTLQueue(int capacity) { - capacity = Pow2.roundToPowerOfTwo(capacity); - this.mask = capacity - 1; - this.length = capacity; - this.offerCursor = new PaddedAtomicLong(); - this.pollCursor = new PaddedAtomicLong(); - this.turns = new AtomicLongArray(capacity); - this.values = new AtomicReferenceArray<>(capacity); - for (int i = 0; i < this.length - 1; i++) { - this.turns.lazySet(i, i); - } - this.turns.set(this.length - 1, this.length - 1); - } - - public void put(E value, Runnable ifWait) { - nullCheck(value); - long ticket = this.offerCursor.getAndIncrement(); - int slot = (int)ticket & this.mask; - while (this.turns.get(slot) != ticket) { - ifWait.run(); - } - this.values.set(slot, value); - } - - public boolean offer(E value) { - nullCheck(value); - for (;;) { - long ticket = this.offerCursor.get(); - int slot = (int)ticket & this.mask; - if (this.turns.get(slot) != ticket) { - return false; - } - if (this.offerCursor.compareAndSet(ticket, ticket + 1)) { - this.values.set(slot, value); - return true; - } - } - } - - private void nullCheck(E value) { - if (value == null) { - throw new NullPointerException("Null values not allowed here!"); - } - } - - public E take(Runnable ifWait) { - long ticket = this.pollCursor.getAndIncrement(); - int slot = (int)ticket & this.mask; - while (this.turns.get(slot) != ticket) { - ifWait.run(); - } - for (;;) { - E v = this.values.get(slot); - if (v != null) { - this.values.lazySet(slot, null); - this.turns.set(slot, ticket /* + mask + 1*/); - return v; - } - ifWait.run(); - } - } - - public boolean isEmpty() { - for (;;) { - long ticket = this.pollCursor.get(); - int slot = (int)ticket & this.mask; - if (this.turns.get(slot) != ticket) { - return true; - } - E v = this.values.get(slot); - if (v == null) { - return true; - } - - return false; - } - } - - public E poll() { - for (;;) { - long ticket = this.pollCursor.get(); - int slot = (int)ticket & this.mask; - if (this.turns.get(slot) != ticket) { - return null; - } - E v = this.values.get(slot); - if (v == null) { - return null; - } - if (this.pollCursor.compareAndSet(ticket, ticket + 1)) { - this.values.lazySet(slot, null); - this.turns.set(slot, ticket + this.length); - return v; - } - } - } - - public E pollStrong() { - for (;;) { - long ticket = this.pollCursor.get(); - int slot = (int)ticket & this.mask; - if (this.turns.get(slot) != ticket) { - if (this.pollCursor.get() != ticket) { - continue; - } - return null; - } - E v = this.values.get(slot); - if (v == null) { - if ((this.pollCursor.get() ^ ticket | this.turns.get(slot) ^ ticket) != 0) { - continue; - } - return null; - } - if (this.pollCursor.compareAndSet(ticket, ticket + 1)) { - this.values.lazySet(slot, null); - this.turns.set(slot, ticket + this.length); - return v; - } - } - } -} diff --git a/src/main/java/net/engio/mbassy/multi/common/PaddedAtomicLong.java b/src/main/java/net/engio/mbassy/multi/common/PaddedAtomicLong.java deleted file mode 100644 index 163a221..0000000 --- a/src/main/java/net/engio/mbassy/multi/common/PaddedAtomicLong.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright 2014 David Karnok - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package net.engio.mbassy.multi.common; - -/** - * A padded atomic integer to fill in 4 cache lines to avoid any false sharing or - * adjacent prefetch. - * Based on Netty's implementation. - */ -public final class PaddedAtomicLong extends PaddedAtomicLongBase { - /** */ - private static final long serialVersionUID = 8781891581317286855L; - /** Padding. */ - public transient long p16, p17, p18, p19, p20, p21, p22; // 56 bytes (the remaining 8 is in the base) - /** Padding. */ - public transient long p24, p25, p26, p27, p28, p29, p30, p31; // 64 bytes -} diff --git a/src/main/java/net/engio/mbassy/multi/common/PaddedAtomicLongBase.java b/src/main/java/net/engio/mbassy/multi/common/PaddedAtomicLongBase.java deleted file mode 100644 index 30f7540..0000000 --- a/src/main/java/net/engio/mbassy/multi/common/PaddedAtomicLongBase.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Copyright 2014 David Karnok - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package net.engio.mbassy.multi.common; - -import java.util.concurrent.atomic.AtomicLongFieldUpdater; - -/** - * The atomic reference base padded at the front. - * Based on Netty's implementation. - */ -abstract class PaddedAtomicLongBase extends FrontPadding { - - private static final long serialVersionUID = 6513142711280243198L; - - private static final AtomicLongFieldUpdater updater; - - static { - updater = AtomicLongFieldUpdater.newUpdater(PaddedAtomicLongBase.class, "value"); - } - - private volatile long value; // 8-byte object field (or 4-byte + padding) - - public final long get() { - return this.value; - } - - public final void set(long referent) { - this.value = referent; - } - - public final void lazySet(long referent) { - updater.lazySet(this, referent); - } - - public final boolean compareAndSet(long expect, long update) { - return updater.compareAndSet(this, expect, update); - } - - public final boolean weakCompareAndSet(long expect, long update) { - return updater.weakCompareAndSet(this, expect, update); - } - - public final long getAndSet(long newValue) { - return updater.getAndSet(this, this.value); - } - - public final long getAndAdd(long delta) { - return updater.getAndAdd(this, delta); - } - public final long incrementAndGet() { - return updater.incrementAndGet(this); - } - public final long decrementAndGet() { - return updater.decrementAndGet(this); - } - public final long getAndIncrement() { - return updater.getAndIncrement(this); - } - public final long getAndDecrement() { - return updater.getAndDecrement(this); - } - public final long addAndGet(long delta) { - return updater.addAndGet(this, delta); - } - - @Override - public String toString() { - return String.valueOf(get()); - } -} diff --git a/src/main/java/net/engio/mbassy/multi/common/Pow2.java b/src/main/java/net/engio/mbassy/multi/common/Pow2.java deleted file mode 100644 index daa1740..0000000 --- a/src/main/java/net/engio/mbassy/multi/common/Pow2.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package net.engio.mbassy.multi.common; - -/** - * Power of 2 utility functions. - */ -public class Pow2 { - - /** - * Find the next larger positive power of two value up from the given value. If value is a power of two then - * this value will be returned. - * - * @param value from which next positive power of two will be found. - * @return the next positive power of 2 or this value if it is a power of 2. - */ - public static int roundToPowerOfTwo(final int value) { - return 1 << 32 - Integer.numberOfLeadingZeros(value - 1); - } - - /** - * Is this value a power of two. - * - * @param value to be tested to see if it is a power of two. - * @return true if the value is a power of 2 otherwise false. - */ - public static boolean isPowerOfTwo(final int value) { - return (value & value - 1) == 0; - } -} \ No newline at end of file diff --git a/src/main/java/net/engio/mbassy/multi/common/UnsafeAccess.java b/src/main/java/net/engio/mbassy/multi/common/UnsafeAccess.java deleted file mode 100644 index 7dda536..0000000 --- a/src/main/java/net/engio/mbassy/multi/common/UnsafeAccess.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package net.engio.mbassy.multi.common; - -import java.lang.reflect.Field; -import java.util.concurrent.atomic.AtomicReferenceArray; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; - -import sun.misc.Unsafe; - -/** - * Why should we resort to using Unsafe?
- *

    - *
  1. To construct class fields which allow volatile/ordered/plain access: This requirement is covered by - * {@link AtomicReferenceFieldUpdater} and similar but their performance is arguably worse than the DIY approach - * (depending on JVM version) while Unsafe intrinsification is a far lesser challenge for JIT compilers. - *
  2. To construct flavors of {@link AtomicReferenceArray}. - *
  3. Other use cases exist but are not present in this library yet. - *
- * - * @author nitsanw - * - */ -public class UnsafeAccess { - public static final boolean SUPPORTS_GET_AND_SET; - public static final Unsafe UNSAFE; - static { - try { - final Field field = Unsafe.class.getDeclaredField("theUnsafe"); - field.setAccessible(true); - UNSAFE = (Unsafe) field.get(null); - } catch (Exception e) { - SUPPORTS_GET_AND_SET = false; - throw new RuntimeException(e); - } - boolean getAndSetSupport = false; - try { - Unsafe.class.getMethod("getAndSetObject", Object.class, Long.TYPE,Object.class); - getAndSetSupport = true; - } catch (Exception e) { - } - SUPPORTS_GET_AND_SET = getAndSetSupport; - } - -} \ No newline at end of file diff --git a/src/main/java/net/engio/mbassy/multi/listener/MessageHandler.java b/src/main/java/net/engio/mbassy/multi/listener/MessageHandler.java index a39ad5e..526dd09 100644 --- a/src/main/java/net/engio/mbassy/multi/listener/MessageHandler.java +++ b/src/main/java/net/engio/mbassy/multi/listener/MessageHandler.java @@ -88,202 +88,6 @@ public class MessageHandler { return this.acceptsSubtypes; } - /** - * @return true if the message types are handled - */ - public boolean handlesMessage(Class messageType) { - Class[] handledMessages = this.handledMessages; - int handledLength = handledMessages.length; - - if (handledLength != 1) { - return false; - } - - if (this.acceptsSubtypes) { - if (!handledMessages[0].isAssignableFrom(messageType)) { - return false; - } - } else { - if (handledMessages[0] != messageType) { - return false; - } - } - - return true; - } - - /** - * @return true if the message types are handled - */ - public boolean handlesMessage(Class messageType1, Class messageType2) { - Class[] handledMessages = this.handledMessages; - int handledLength = handledMessages.length; - - if (handledLength != 2) { - return false; - } - - if (this.acceptsSubtypes) { - if (!handledMessages[0].isAssignableFrom(messageType1)) { - return false; - } - if (!handledMessages[1].isAssignableFrom(messageType2)) { - return false; - } - } else { - if (handledMessages[0] != messageType1) { - return false; - } - if (handledMessages[1] != messageType2) { - return false; - } - } - - return true; - } - - /** - * @return true if the message types are handled - */ - public boolean handlesMessage(Class messageType1, Class messageType2, Class messageType3) { - Class[] handledMessages = this.handledMessages; - int handledLength = handledMessages.length; - - if (handledLength != 3) { - return false; - } - - if (this.acceptsSubtypes) { - if (!handledMessages[0].isAssignableFrom(messageType1)) { - return false; - } - if (!handledMessages[1].isAssignableFrom(messageType2)) { - return false; - } - if (!handledMessages[2].isAssignableFrom(messageType3)) { - return false; - } - } else { - if (handledMessages[0] != messageType1) { - return false; - } - if (handledMessages[1] != messageType2) { - return false; - } - if (handledMessages[2] != messageType3) { - return false; - } - } - - return true; - } - - /** - * @return true if the message types are handled - */ - public boolean handlesMessage(Class... messageTypes) { - Class[] handledMessages = this.handledMessages; - int handledLength = handledMessages.length; - int handledLengthMinusVarArg = handledLength-1; - - int messagesLength = messageTypes.length; - - // do we even have enough to even CHECK the var-arg? - if (messagesLength < handledLengthMinusVarArg) { - // totally wrong number of args - return false; - } - - // check BEFORE var-arg in handler (var-arg can ONLY be last element in array) - if (handledLengthMinusVarArg <= messagesLength) { - if (this.acceptsSubtypes) { - for (int i = 0; i < handledLengthMinusVarArg; i++) { - Class handledMessage = handledMessages[i]; - Class messageType = messageTypes[i]; - - if (!handledMessage.isAssignableFrom(messageType)) { - return false; - } - } - } else { - for (int i = 0; i < handledLengthMinusVarArg; i++) { - Class handledMessage = handledMessages[i]; - Class messageType = messageTypes[i]; - - if (handledMessage != messageType) { - return false; - } - } - } - } - - // do we even HAVE var-arg? - if (!handledMessages[handledLengthMinusVarArg].isArray()) { - // DO NOT HAVE VAR_ARG PRESENT IN HANDLERS - - // fast exit - if (handledLength != messagesLength) { - return false; - } - - // compare remaining arg - Class handledMessage = handledMessages[handledLengthMinusVarArg]; - Class messageType = messageTypes[handledLengthMinusVarArg]; - - if (this.acceptsSubtypes) { - if (!handledMessage.isAssignableFrom(messageType)) { - return false; - } - } else { - if (handledMessage != messageType) { - return false; - } - } - // all args are dandy - return true; - } - - // WE HAVE VAR_ARG PRESENT IN HANDLER - - // do we have enough args to NEED to check the var-arg? - if (handledLengthMinusVarArg == messagesLength) { - // var-arg doesn't need checking - return true; - } - - // then check var-arg in handler - - // all the args to check for the var-arg MUST be the same! (ONLY ONE ARRAY THOUGH CAN BE PRESENT) - int messagesLengthMinusVarArg = messagesLength-1; - - Class typeCheck = messageTypes[handledLengthMinusVarArg]; - for (int i = handledLengthMinusVarArg; i < messagesLength; i++) { - Class t1 = messageTypes[i]; - if (t1 != typeCheck) { - return false; - } - } - - // if we got this far, then the args are the same type. IF we have more than one, AND they are arrays, NOPE! - if (messagesLength - handledLengthMinusVarArg > 1 && messageTypes[messagesLengthMinusVarArg].isArray()) { - return false; - } - - // are we comparing array -> array or string -> array - Class componentType; - if (messageTypes[messagesLengthMinusVarArg].isArray()) { - componentType = handledMessages[handledLengthMinusVarArg]; - } else { - componentType = handledMessages[handledLengthMinusVarArg].getComponentType(); - } - - if (this.acceptsSubtypes) { - return componentType.isAssignableFrom(typeCheck); - } else { - return typeCheck == componentType; - } - } - @Override public int hashCode() { final int prime = 31; diff --git a/src/main/java/net/engio/mbassy/multi/subscription/Subscription.java b/src/main/java/net/engio/mbassy/multi/subscription/Subscription.java index 77bb28a..a9877cf 100644 --- a/src/main/java/net/engio/mbassy/multi/subscription/Subscription.java +++ b/src/main/java/net/engio/mbassy/multi/subscription/Subscription.java @@ -66,40 +66,11 @@ public class Subscription { return this.handlerMetadata.acceptsSubtypes(); } - /** - * Check whether this subscription manages a message handler - */ - public boolean handlesMessageType(Class messageType) { - return this.handlerMetadata.handlesMessage(messageType); - } - - /** - * Check whether this subscription manages a message handler - */ - public boolean handlesMessageType(Class messageType1, Class messageType2) { - return this.handlerMetadata.handlesMessage(messageType1, messageType2); - } - - /** - * Check whether this subscription manages a message handler - */ - public boolean handlesMessageType(Class messageType1, Class messageType2, Class messageType3) { - return this.handlerMetadata.handlesMessage(messageType1, messageType2, messageType3); - } - - /** - * Check whether this subscription manages a message handler - */ - public boolean handlesMessageType(Class... messageTypes) { - return this.handlerMetadata.handlesMessage(messageTypes); - } - public Class[] getHandledMessageTypes() { return this.handlerMetadata.getHandledMessages(); } public void subscribe(Object listener) { -// this.listeners.put(listener, Boolean.TRUE); this.listeners.add(listener); } @@ -119,17 +90,16 @@ public class Subscription { return this.listeners.size(); } -// private AtomicLong counter = new AtomicLong(); public void publishToSubscription(ErrorHandlingSupport errorHandler, Object message) { Collection listeners = this.listeners; if (listeners.size() > 0) { Method handler = this.handlerMetadata.getHandler(); -// int count = 0; + IHandlerInvocation invocation = this.invocation; + for (Object listener : listeners) { -// count++; try { - this.invocation.invoke(listener, handler, message); + invocation.invoke(listener, handler, message); } catch (IllegalAccessException e) { errorHandler.handlePublicationError(new PublicationError() .setMessage("Error during invocation of message handler. " + @@ -165,7 +135,6 @@ public class Subscription { .setPublishedObject(message)); } } -// this.counter.getAndAdd(count); } } diff --git a/src/main/java/net/engio/mbassy/multi/subscription/SubscriptionManager.java b/src/main/java/net/engio/mbassy/multi/subscription/SubscriptionManager.java index a2bc552..3728da2 100644 --- a/src/main/java/net/engio/mbassy/multi/subscription/SubscriptionManager.java +++ b/src/main/java/net/engio/mbassy/multi/subscription/SubscriptionManager.java @@ -2,10 +2,9 @@ package net.engio.mbassy.multi.subscription; import java.lang.reflect.Array; import java.util.ArrayDeque; -import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.IdentityHashMap; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -17,6 +16,7 @@ import net.engio.mbassy.multi.common.ReflectionUtils; import net.engio.mbassy.multi.common.StrongConcurrentSet; import net.engio.mbassy.multi.listener.MessageHandler; import net.engio.mbassy.multi.listener.MetadataReader; +import sun.reflect.generics.reflectiveObjects.NotImplementedException; import com.googlecode.concurentlocks.ReentrantReadWriteUpdateLock; @@ -32,6 +32,8 @@ import com.googlecode.concurentlocks.ReentrantReadWriteUpdateLock; * Date: 2/2/15 */ public class SubscriptionManager { + private static final int DEFAULT_SUPER_CLASS_TREE_SIZE = 4; + // the metadata reader that is used to inspect objects passed to the subscribe method private final MetadataReader metadataReader = new MetadataReader(); @@ -48,13 +50,15 @@ public class SubscriptionManager { // once a collection of subscriptions is stored it does not change private final Map, Collection> subscriptionsPerListener = new IdentityHashMap, Collection>(50); - private final Object holder = new Object[0]; // remember classes that can have VarArg casting performed private final ConcurrentHashMap, Class> varArgClasses = new ConcurrentHashMap, Class>(); - private final Map, ArrayList>> superClassesCache = new IdentityHashMap, ArrayList>>(); + private final Map, ArrayDeque>> superClassesCache = new IdentityHashMap, ArrayDeque>>(); +// private final Map, Collection> superClassSubscriptionsPerMessageSingle = new IdentityHashMap, Collection>(50); + + // remember already processed classes that do not contain any message handlers private final ConcurrentHashMap, Object> nonListeners = new ConcurrentHashMap, Object>(); @@ -101,6 +105,15 @@ public class SubscriptionManager { this.subscriptionsPerMessageSingle.remove(clazz); } } +// Collection superSubs = this.superClassSubscriptionsPerMessageSingle.get(clazz); +// if (superSubs != null) { +// superSubs.remove(subscription); +// +// if (superSubs.isEmpty()) { +// // remove element +// this.superClassSubscriptionsPerMessageSingle.remove(clazz); +// } +// } } else { // NOTE: Not thread-safe! must be synchronized in outer scope IdentityObjectTree, Collection> tree; @@ -197,14 +210,25 @@ public class SubscriptionManager { // single Class clazz = handledMessageTypes[0]; - // NOTE: Order is important for safe publication Collection subs = this.subscriptionsPerMessageSingle.get(clazz); +// Collection superSubs = this.superClassSubscriptionsPerMessageSingle.get(clazz); if (subs == null) { + // NOTE: Order is important for safe publication subs = new StrongConcurrentSet(2); subs.add(subscription); this.subscriptionsPerMessageSingle.put(clazz, subs); + +// if (subscription.acceptsSubtypes()) { +// superSubs = new StrongConcurrentSet(2); +// superSubs.add(subscription); +// this.superClassSubscriptionsPerMessageSingle.put(clazz, superSubs); +// } } else { subs.add(subscription); + +// if (subscription.acceptsSubtypes()) { +// superSubs.add(subscription); +// } } // have to save our the VarArg class types, because creating var-arg arrays for objects is expensive @@ -215,11 +239,10 @@ public class SubscriptionManager { // since it's vararg, this means that it's an ARRAY, so we ALSO // have to add the component classes of the array if (subscription.acceptsSubtypes()) { - ArrayList> setupSuperClassCache2 = setupSuperClassCache(componentType); - // have to setup each vararg chain - for (int i = 0; i < setupSuperClassCache2.size(); i++) { - Class superClass = setupSuperClassCache2.get(i); + ArrayDeque> superClasses = setupSuperClassCache(componentType); + // have to setup each vararg chain + for (Class superClass : superClasses) { if (!this.varArgClasses.containsKey(superClass)) { // this is expensive, so we check the cache first Class c2 = Array.newInstance(superClass, 1).getClass(); @@ -285,309 +308,182 @@ public class SubscriptionManager { } } - private final Collection EMPTY_LIST = Collections.emptyList(); - - - // cannot return null, not thread safe. + // must be protected by read lock + // CAN RETURN NULL - not thread safe. public Collection getSubscriptionsByMessageType(Class messageType) { - Collection subs = this.subscriptionsPerMessageSingle.get(messageType); - if (subs != null) { - - return subs; - } else { - return this.EMPTY_LIST; - } + return this.subscriptionsPerMessageSingle.get(messageType); } - - // obtain the set of subscriptions for the given message type - // Note: never returns null! - public Collection DEPRECATED_getSubscriptionsByMessageType(Class messageType) { - // thread safe publication - Collection subscriptions; - - try { - this.LOCK.readLock().lock(); - - int count = 0; - Collection subs = this.subscriptionsPerMessageSingle.get(messageType); - if (subs != null) { - subscriptions = new ArrayDeque(count); - subscriptions.addAll(subs); - } else { - subscriptions = new ArrayDeque(16); - } - - // also add all subscriptions that match super types - ArrayList> types1 = setupSuperClassCache(messageType); - if (types1 != null) { - Class eventSuperType; - int i; - for (i = 0; i < types1.size(); i++) { - eventSuperType = types1.get(i); - subs = this.subscriptionsPerMessageSingle.get(eventSuperType); - if (subs != null) { - for (Subscription sub : subs) { - if (sub.handlesMessageType(messageType)) { - subscriptions.add(sub); - } - } - } - addVarArgClass(subscriptions, eventSuperType); - } - } - - addVarArgClass(subscriptions, messageType); - - } finally { - this.LOCK.readLock().unlock(); - } - - return subscriptions; - } - - - // obtain the set of subscriptions for the given message types - // Note: never returns null! + // must be protected by read lock + // CAN RETURN NULL - not thread safe. public Collection getSubscriptionsByMessageType(Class messageType1, Class messageType2) { - // thread safe publication - Collection subscriptions = new ArrayDeque(); - - try { - this.LOCK.readLock().lock(); - - // also add all subscriptions that match super types - ArrayList> types1 = setupSuperClassCache(messageType1); - ArrayList> types2 = setupSuperClassCache(messageType2); - - Collection subs; - Class eventSuperType1 = messageType1; - IdentityObjectTree, Collection> leaf1; - Class eventSuperType2 = messageType1; - IdentityObjectTree, Collection> leaf2; - - int i; - int j; - for (i = -1; i < types1.size(); i++) { - if (i > -1) { - eventSuperType1 = types1.get(i); - } - leaf1 = this.subscriptionsPerMessageMulti.getLeaf(eventSuperType1); - - if (leaf1 != null) { - for (j = -1; j < types2.size(); j++) { - if (j > -1) { - eventSuperType2 = types2.get(j); - } - leaf2 = leaf1.getLeaf(eventSuperType2); - - if (leaf2 != null) { - subs = leaf2.getValue(); - if (subs != null) { - for (Subscription sub : subs) { - if (sub.handlesMessageType(messageType1, messageType2)) { - subscriptions.add(sub); - } - } - } - } - } - } - } - - - /////////////// - // if they are ALL the same type, a var-arg handler might match - /////////////// - if (messageType1 == messageType2) { - addVarArgClasses(subscriptions, messageType1, types1); - } - } finally { - this.LOCK.readLock().unlock(); - } - - return subscriptions; + return this.subscriptionsPerMessageMulti.getValue(messageType1, messageType2); } - // obtain the set of subscriptions for the given message types - // Note: never returns null! + // must be protected by read lock + // CAN RETURN NULL - not thread safe. public Collection getSubscriptionsByMessageType(Class messageType1, Class messageType2, Class messageType3) { - // thread safe publication - Collection subscriptions = new ArrayDeque(); - - try { - this.LOCK.readLock().lock(); - - // also add all subscriptions that match super types - ArrayList> types1 = setupSuperClassCache(messageType1); - ArrayList> types2 = setupSuperClassCache(messageType2); - ArrayList> types3 = setupSuperClassCache(messageType3); - - Class eventSuperType1 = messageType1; - IdentityObjectTree, Collection> leaf1; - Class eventSuperType2 = messageType2; - IdentityObjectTree, Collection> leaf2; - Class eventSuperType3 = messageType3; - IdentityObjectTree, Collection> leaf3; - - Collection subs; - int i; - int j; - int k; - for (i = -1; i < types1.size(); i++) { - if (i > -1) { - eventSuperType1 = types1.get(i); - } - leaf1 = this.subscriptionsPerMessageMulti.getLeaf(eventSuperType1); - - if (leaf1 != null) { - for (j = -1; j < types2.size(); j++) { - if (j > -1) { - eventSuperType2 = types2.get(j); - } - leaf2 = leaf1.getLeaf(eventSuperType2); - - if (leaf2 != null) { - for (k = -1; k < types3.size(); k++) { - if (k > -1) { - eventSuperType3 = types3.get(k); - } - leaf3 = leaf2.getLeaf(eventSuperType3); - - if (leaf3 != null) { - subs = leaf3.getValue(); - if (subs != null) { - for (Subscription sub : subs) { - if (sub.handlesMessageType(messageType1, messageType2, messageType3)) { - subscriptions.add(sub); - } - } - } - } - } - } - } - } - } - - - /////////////// - // if they are ALL the same type, a var-arg handler might match - /////////////// - if (messageType1 == messageType2 && messageType2 == messageType3) { - addVarArgClasses(subscriptions, messageType1, types1); - } - } finally { - this.LOCK.readLock().unlock(); - } - - return subscriptions; + return this.subscriptionsPerMessageMulti.getValue(messageType1, messageType2, messageType3); } - // obtain the set of subscriptions for the given message types - // Note: never returns null! + // must be protected by read lock + // CAN RETURN NULL - not thread safe. public Collection getSubscriptionsByMessageType(Class... messageTypes) { - // thread safe publication - Collection subscriptions = new ArrayDeque(); + return this.subscriptionsPerMessageMulti.getValue(messageTypes); + } - try { - this.LOCK.readLock().lock(); - int count = 16; + // must be protected by read lock + // ALSO checks to see if the superClass accepts subtypes. + public Collection getSuperSubscriptions(Class superType) { + Collection> types = this.superClassesCache.get(superType); + if (types == null || types.isEmpty()) { + return null; + } - // NOTE: Not thread-safe! must be synchronized in outer scope - Collection subs = this.subscriptionsPerMessageMulti.getValue(messageTypes); + Collection subsPerType = new ArrayDeque(DEFAULT_SUPER_CLASS_TREE_SIZE); + + for (Class superClass : types) { + Collection subs = this.subscriptionsPerMessageSingle.get(superClass); if (subs != null) { for (Subscription sub : subs) { - if (sub.handlesMessageType(messageTypes)) { - subscriptions.add(sub); + if (sub.acceptsSubtypes()) { + subsPerType.add(sub); } } } + } - int size = messageTypes.length; - if (size > 0) { - boolean allSameType = true; - Class firstType = messageTypes[0]; + return subsPerType; + } - int i; - for (i = 0;i getSuperSubscriptions(Class superType1, Class superType2) { + // not thread safe. DO NOT MODIFY + Collection> types1 = this.superClassesCache.get(superType1); + Collection> types2 = this.superClassesCache.get(superType2); + + Collection subsPerType = new ArrayDeque(DEFAULT_SUPER_CLASS_TREE_SIZE); + + Collection subs; + IdentityObjectTree, Collection> leaf1; + IdentityObjectTree, Collection> leaf2; + + Iterator> iterator1 = new SuperClassIterator(superType1, types1); + Iterator> iterator2; + + Class eventSuperType1; + Class eventSuperType2; + + while (iterator1.hasNext()) { + eventSuperType1 = iterator1.next(); + boolean type1Matches = eventSuperType1 == superType1; + + leaf1 = this.subscriptionsPerMessageMulti.getLeaf(eventSuperType1); + if (leaf1 != null) { + iterator2 = new SuperClassIterator(superType2, types2); + + while (iterator2.hasNext()) { + eventSuperType2 = iterator2.next(); + if (type1Matches && eventSuperType2 == superType2) { + continue; } - } + leaf2 = leaf1.getLeaf(eventSuperType2); - // add all subscriptions that match super types combinations - // have to use recursion for this. BLEH - getSubsVarArg(subscriptions, size-1, 0, this.subscriptionsPerMessageMulti, messageTypes); - - /////////////// - // if they are ALL the same type, a var-arg handler might match - /////////////// - if (allSameType) { - // do we have a var-arg (it shows as an array) subscribed? - - ArrayList> superClasses = setupSuperClassCache(firstType); - - Class eventSuperType = firstType; - int j; - - for (j = -1; j < superClasses.size(); j++) { - if (j > -1) { - eventSuperType = superClasses.get(j); - } - if (this.varArgClasses.containsKey(eventSuperType)) { - // messy, but the ONLY way to do it. - // NOTE: this will NEVER be an array to begin with, since that will call a DIFFERENT method - eventSuperType = Array.newInstance(eventSuperType, 1).getClass(); - - // also add all subscriptions that match super types - subs = this.subscriptionsPerMessageSingle.get(eventSuperType); - if (subs != null) { - for (Subscription sub : subs) { - subscriptions.add(sub); + if (leaf2 != null) { + subs = leaf2.getValue(); + if (subs != null) { + for (Subscription sub : subs) { + if (sub.acceptsSubtypes()) { + subsPerType.add(sub); } } } } } - } - - - } finally { - this.LOCK.readLock().unlock(); } - return subscriptions; + return subsPerType; } - - private final Collection> EMPTY_LIST_CLASSES = Collections.emptyList(); // must be protected by read lock - public Collection> getSuperClasses(Class clazz) { + // ALSO checks to see if the superClass accepts subtypes. + public Collection getSuperSubscriptions(Class superType1, Class superType2, Class superType3) { // not thread safe. DO NOT MODIFY - ArrayList> types = this.superClassesCache.get(clazz); + Collection> types1 = this.superClassesCache.get(superType1); + Collection> types2 = this.superClassesCache.get(superType2); + Collection> types3 = this.superClassesCache.get(superType3); - if (types != null) { - return types; + Collection subsPerType = new ArrayDeque(DEFAULT_SUPER_CLASS_TREE_SIZE); + + Collection subs; + IdentityObjectTree, Collection> leaf1; + IdentityObjectTree, Collection> leaf2; + IdentityObjectTree, Collection> leaf3; + + Iterator> iterator1 = new SuperClassIterator(superType1, types1); + Iterator> iterator2; + Iterator> iterator3; + + Class eventSuperType1; + Class eventSuperType2; + Class eventSuperType3; + + while (iterator1.hasNext()) { + eventSuperType1 = iterator1.next(); + boolean type1Matches = eventSuperType1 == superType1; + + leaf1 = this.subscriptionsPerMessageMulti.getLeaf(eventSuperType1); + if (leaf1 != null) { + iterator2 = new SuperClassIterator(superType2, types2); + + while (iterator2.hasNext()) { + eventSuperType2 = iterator2.next(); + boolean type12Matches = type1Matches && eventSuperType2 == superType2; + + leaf2 = leaf1.getLeaf(eventSuperType2); + + if (leaf2 != null) { + iterator3 = new SuperClassIterator(superType3, types3); + + while (iterator3.hasNext()) { + eventSuperType3 = iterator3.next(); + if (type12Matches && eventSuperType3 == superType3) { + continue; + } + + leaf3 = leaf2.getLeaf(eventSuperType3); + + subs = leaf3.getValue(); + if (subs != null) { + for (Subscription sub : subs) { + if (sub.acceptsSubtypes()) { + subsPerType.add(sub); + } + } + } + } + } + } + } } - return this.EMPTY_LIST_CLASSES; + return subsPerType; } - - // not a thread safe collection. must be locked by caller - private ArrayList> setupSuperClassCache(Class clazz) { - ArrayList> types = this.superClassesCache.get(clazz); + // not a thread safe collection, but it doesn't matter + private ArrayDeque> setupSuperClassCache(Class clazz) { + ArrayDeque> types = this.superClassesCache.get(clazz); if (types == null) { // it doesn't matter if concurrent access stomps on values, since they are always the same. Set> superTypes = ReflectionUtils.getSuperTypes(clazz); - types = new ArrayList>(superTypes); - // NOTE: no need to write lock, since race conditions will result in duplicate answers + types = new ArrayDeque>(superTypes); + // NOTE: no need to write lock, since race conditions will result in duplicate answers (which we don't care about) this.superClassesCache.put(clazz, types); } @@ -619,20 +515,56 @@ public class SubscriptionManager { public Collection getVarArgs(Class clazz) { Class varArgClass = this.varArgClasses.get(clazz); if (varArgClass != null) { - Collection subs = this.subscriptionsPerMessageSingle.get(varArgClass); - if (subs != null) { - return subs; + return this.subscriptionsPerMessageSingle.get(varArgClass); + } + return null; + } + + // must be protected by read lock + public Collection getVarArgs(Class clazz1, Class clazz2) { + if (clazz1 == clazz2) { + Class varArgClass = this.varArgClasses.get(clazz1); + if (varArgClass != null) { + return this.subscriptionsPerMessageSingle.get(varArgClass); } } - - return this.EMPTY_LIST; + return null; } + // must be protected by read lock + public Collection getVarArgs(Class clazz1, Class clazz2, Class clazz3) { + if (clazz1 == clazz2 && clazz2 == clazz3) { + Class varArgClass = this.varArgClasses.get(clazz1); + if (varArgClass != null) { + return this.subscriptionsPerMessageSingle.get(varArgClass); + } + } + return null; + } + + // must be protected by read lock + public Collection getVarArgs(Class... classes) { + // classes IS ALREADY ALL SAME TYPE! + Class firstClass = classes[0]; + + Class varArgClass = this.varArgClasses.get(firstClass); + if (varArgClass != null) { + return this.subscriptionsPerMessageSingle.get(varArgClass); + } + return null; + } + + + + + + + /////////////// // a var-arg handler might match // tricky part. We have to check the ARRAY version /////////////// - private void addVarArgClasses(Collection subscriptions, Class messageType, ArrayList> types1) { + private void addVarArgClasses(Collection subscriptions, Class messageType, ArrayDeque> types1) { Collection subs; Class varArgClass = this.varArgClasses.get(messageType); @@ -665,7 +597,7 @@ public class SubscriptionManager { Class classType = messageTypes[index]; // get all the super types, if there are any. - ArrayList> superClasses = setupSuperClassCache(classType); + ArrayDeque> superClasses = setupSuperClassCache(classType); IdentityObjectTree, Collection> leaf; Collection subs; @@ -674,27 +606,27 @@ public class SubscriptionManager { int i; int newIndex; - for (i = -1; i < superClasses.size(); i++) { - if (i > -1) { - superClass = superClasses.get(i); - } - leaf = tree.getLeaf(superClass); - if (leaf != null) { - newIndex = index+1; - if (index == length) { - subs = leaf.getValue(); - if (subs != null) { - for (Subscription sub : subs) { - if (sub.handlesMessageType(messageTypes)) { - subscriptions.add(sub); - } - } - } - } else { - getSubsVarArg(subscriptions, length, newIndex, leaf, messageTypes); - } - } - } +// for (i = -1; i < superClasses.size(); i++) { +// if (i > -1) { +// superClass = superClasses.get(i); +// } +// leaf = tree.getLeaf(superClass); +// if (leaf != null) { +// newIndex = index+1; +// if (index == length) { +// subs = leaf.getValue(); +// if (subs != null) { +// for (Subscription sub : subs) { +// if (sub.handlesMessageType(messageTypes)) { +// subscriptions.add(sub); +// } +// } +// } +// } else { +// getSubsVarArg(subscriptions, length, newIndex, leaf, messageTypes); +// } +// } +// } } public void readLock() { @@ -704,4 +636,52 @@ public class SubscriptionManager { public void readUnLock() { this.LOCK.readLock().unlock(); } + + public static class SuperClassIterator implements Iterator> { + private final Iterator> iterator; + private Class clazz; + + public SuperClassIterator(Class clazz, Collection> types) { + this.clazz = clazz; + if (types != null) { + this.iterator = types.iterator(); + } else { + this.iterator = null; + } + } + + @Override + public boolean hasNext() { + if (this.clazz != null) { + return true; + } + + if (this.iterator != null) { + return this.iterator.hasNext(); + } + + return false; + } + + @Override + public Class next() { + if (this.clazz != null) { + Class clazz2 = this.clazz; + this.clazz = null; + + return clazz2; + } + + if (this.iterator != null) { + return this.iterator.next(); + } + + return null; + } + + @Override + public void remove() { + throw new NotImplementedException(); + } + } } diff --git a/src/test/java/net/engio/mbassy/multi/ConcurrentSetTest.java b/src/test/java/net/engio/mbassy/multi/ConcurrentSetTest.java index 10963ea..d84fa4d 100644 --- a/src/test/java/net/engio/mbassy/multi/ConcurrentSetTest.java +++ b/src/test/java/net/engio/mbassy/multi/ConcurrentSetTest.java @@ -1,17 +1,23 @@ package net.engio.mbassy.multi; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicInteger; + import junit.framework.Assert; import net.engio.mbassy.multi.common.AssertSupport; import net.engio.mbassy.multi.common.ConcurrentExecutor; -import net.engio.mbassy.multi.common.IConcurrentSet; import org.junit.Before; import org.junit.Test; -import java.util.*; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.atomic.AtomicInteger; - /** * This test ensures the correct behaviour of the set implementation that is the building * block of the subscription implementations used by the Mbassador message bus. @@ -31,13 +37,14 @@ public abstract class ConcurrentSetTest extends AssertSupport { protected Set gcProtector = new HashSet(); + @Override @Before public void beforeTest(){ super.beforeTest(); - gcProtector = new HashSet(); + this.gcProtector = new HashSet(); } - - protected abstract IConcurrentSet createSet(); + + protected abstract Collection createSet(); @Test @@ -45,12 +52,12 @@ public abstract class ConcurrentSetTest extends AssertSupport { final LinkedList duplicates = new LinkedList(); final HashSet distinct = new HashSet(); - final IConcurrentSet testSet = createSet(); + final Collection testSet = createSet(); Random rand = new Random(); // getAll set of distinct objects and list of duplicates Object candidate = new Object(); - for (int i = 0; i < numberOfElements; i++) { + for (int i = 0; i < this.numberOfElements; i++) { if (rand.nextInt() % 3 == 0) { candidate = new Object(); } @@ -66,7 +73,7 @@ public abstract class ConcurrentSetTest extends AssertSupport { testSet.add(src); } } - }, numberOfThreads); + }, this.numberOfThreads); // check that the control set and the test set contain the exact same elements assertEquals(distinct.size(), testSet.size()); @@ -77,21 +84,22 @@ public abstract class ConcurrentSetTest extends AssertSupport { @Test() public void testIterationWithConcurrentRemoval() { - final IConcurrentSet testSet = createSet(); + final Collection testSet = createSet(); final Random rand = new Random(); - for (int i = 0; i < numberOfElements; i++) { + for (int i = 0; i < this.numberOfElements; i++) { AtomicInteger element = new AtomicInteger(); testSet.add(element); - gcProtector.add(element); + this.gcProtector.add(element); } Runnable incrementer = new Runnable() { @Override public void run() { while(testSet.size() > 100){ - for(AtomicInteger element : testSet) + for(AtomicInteger element : testSet) { element.incrementAndGet(); + } } } @@ -101,9 +109,11 @@ public abstract class ConcurrentSetTest extends AssertSupport { @Override public void run() { while(testSet.size() > 100){ - for(AtomicInteger element : testSet) - if(rand.nextInt() % 3 == 0 && testSet.size() > 100) + for(AtomicInteger element : testSet) { + if(rand.nextInt() % 3 == 0 && testSet.size() > 100) { testSet.remove(element); + } + } } } }; @@ -128,9 +138,9 @@ public abstract class ConcurrentSetTest extends AssertSupport { final HashSet source = new HashSet(); final HashSet toRemove = new HashSet(); - final IConcurrentSet testSet = createSet(); + final Collection testSet = createSet(); // getAll set of distinct objects and mark a subset of those for removal - for (int i = 0; i < numberOfElements; i++) { + for (int i = 0; i < this.numberOfElements; i++) { Object candidate = new Object(); source.add(candidate); if (i % 3 == 0) { @@ -146,7 +156,7 @@ public abstract class ConcurrentSetTest extends AssertSupport { testSet.add(src); } } - }, numberOfThreads); + }, this.numberOfThreads); // remove all candidates that have previously been marked for removal from the test set ConcurrentExecutor.runConcurrent(new Runnable() { @@ -156,7 +166,7 @@ public abstract class ConcurrentSetTest extends AssertSupport { testSet.remove(src); } } - }, numberOfThreads); + }, this.numberOfThreads); // ensure that the test set does not contain any of the elements that have been removed from it for (Object tar : testSet) { @@ -166,7 +176,9 @@ public abstract class ConcurrentSetTest extends AssertSupport { // for removal assertEquals(source.size() - toRemove.size(), testSet.size()); for (Object src : source) { - if (!toRemove.contains(src)) assertTrue(testSet.contains(src)); + if (!toRemove.contains(src)) { + assertTrue(testSet.contains(src)); + } } } @@ -175,9 +187,9 @@ public abstract class ConcurrentSetTest extends AssertSupport { final HashSet source = new HashSet(); final HashSet toRemove = new HashSet(); - final IConcurrentSet testSet = createSet(); + final Collection testSet = createSet(); // getAll set of candidates and mark subset for removal - for (int i = 0; i < numberOfElements; i++) { + for (int i = 0; i < this.numberOfElements; i++) { Object candidate = new Object(); source.add(candidate); if (i % 3 == 0) { @@ -192,11 +204,12 @@ public abstract class ConcurrentSetTest extends AssertSupport { public void run() { for (Object src : source) { testSet.add(src); - if (toRemove.contains(src)) + if (toRemove.contains(src)) { testSet.remove(src); + } } } - }, numberOfThreads); + }, this.numberOfThreads); // ensure that the test set does not contain any of the elements that have been removed from it for (Object tar : testSet) { @@ -206,17 +219,19 @@ public abstract class ConcurrentSetTest extends AssertSupport { // for removal assertEquals(source.size() - toRemove.size(), testSet.size()); for (Object src : source) { - if (!toRemove.contains(src)) assertTrue(testSet.contains(src)); + if (!toRemove.contains(src)) { + assertTrue(testSet.contains(src)); + } } } @Test public void testCompleteRemoval() { final HashSet source = new HashSet(); - final IConcurrentSet testSet = createSet(); + final Collection testSet = createSet(); // getAll set of candidates and mark subset for removal - for (int i = 0; i < numberOfElements; i++) { + for (int i = 0; i < this.numberOfElements; i++) { Object candidate = new Object(); source.add(candidate); testSet.add(candidate); @@ -231,7 +246,7 @@ public abstract class ConcurrentSetTest extends AssertSupport { testSet.remove(src); } } - }, numberOfThreads); + }, this.numberOfThreads); // ensure that the test set still contains all objects from the source set that have not been marked @@ -245,10 +260,10 @@ public abstract class ConcurrentSetTest extends AssertSupport { @Test public void testRemovalViaIterator() { final HashSet source = new HashSet(); - final IConcurrentSet setUnderTest = createSet(); + final Collection setUnderTest = createSet(); // getAll set of candidates and mark subset for removal - for (int i = 0; i < numberOfElements; i++) { + for (int i = 0; i < this.numberOfElements; i++) { Object candidate = new Object(); source.add(candidate); setUnderTest.add(candidate); @@ -264,7 +279,7 @@ public abstract class ConcurrentSetTest extends AssertSupport { iterator.remove(); } } - }, numberOfThreads); + }, this.numberOfThreads); // ensure that the test set still contains all objects from the source set that have not been marked @@ -288,7 +303,7 @@ public abstract class ConcurrentSetTest extends AssertSupport { */ @Test public void testConcurrentAddRemove() { - final IConcurrentSet testSet = createSet(); + final Collection testSet = createSet(); // a set of unique integers that will stay permanently in the test set final List permanentObjects = new ArrayList(); // a set of objects that will be added and removed at random to the test set to force rehashing @@ -306,6 +321,7 @@ public abstract class ConcurrentSetTest extends AssertSupport { // Adds and removes items // thus forcing constant rehashing of the backing hashtable Runnable rehasher = new Runnable() { + @Override public void run() { Random rand = new Random(); for(int times = 0; times < 1000 ; times++){ @@ -330,8 +346,9 @@ public abstract class ConcurrentSetTest extends AssertSupport { for (Object permanent : permanentObjects) { // permanent items are never touched, // --> set.contains(j) should always return true - if(!testSet.contains(permanent)) + if(!testSet.contains(permanent)) { missing.add(permanent); + } } } } @@ -344,19 +361,23 @@ public abstract class ConcurrentSetTest extends AssertSupport { public Set createWithRandomIntegers(int size, List excluding){ - if(excluding == null) excluding = new ArrayList(); + if(excluding == null) { + excluding = new ArrayList(); + } Set result = new HashSet(size); Random rand = new Random(); while(result.size() < size){ result.add(rand.nextInt()); } - for(Integer excluded : excluding) + for(Integer excluded : excluding) { result.remove(excluded); + } return result; } protected void protectFromGarbageCollector(Set elements){ - for(Object element : elements) - gcProtector.add(element); + for(Object element : elements) { + this.gcProtector.add(element); + } } } diff --git a/src/test/java/net/engio/mbassy/multi/MetadataReaderTest.java b/src/test/java/net/engio/mbassy/multi/MetadataReaderTest.java index 61fd576..48edc29 100644 --- a/src/test/java/net/engio/mbassy/multi/MetadataReaderTest.java +++ b/src/test/java/net/engio/mbassy/multi/MetadataReaderTest.java @@ -138,7 +138,7 @@ public class MetadataReaderTest extends AssertSupport { public List getHandlers(MessageListener listener, Class... messageTypes) { List matching = new LinkedList(); for (MessageHandler handler : listener.getHandlers()) { - if (handler.handlesMessage(messageTypes)) { + if (handlesMessage(handler, messageTypes)) { matching.add(handler); } } @@ -244,4 +244,201 @@ public class MetadataReaderTest extends AssertSupport { @Handler public void handleStringXplus1plusN(String[] s, Object o, Object... o1) {} } + + /** + * @return true if the message types are handled + */ + private static boolean handlesMessage(MessageHandler handler, Class messageType) { + Class[] handledMessages = handler.getHandledMessages(); + int handledLength = handledMessages.length; + + if (handledLength != 1) { + return false; + } + + if (handler.acceptsSubtypes()) { + if (!handledMessages[0].isAssignableFrom(messageType)) { + return false; + } + } else { + if (handledMessages[0] != messageType) { + return false; + } + } + + return true; + } + + /** + * @return true if the message types are handled + */ + private static boolean handlesMessage(MessageHandler handler, Class messageType1, Class messageType2) { + Class[] handledMessages = handler.getHandledMessages(); + int handledLength = handledMessages.length; + + if (handledLength != 2) { + return false; + } + + if (handler.acceptsSubtypes()) { + if (!handledMessages[0].isAssignableFrom(messageType1)) { + return false; + } + if (!handledMessages[1].isAssignableFrom(messageType2)) { + return false; + } + } else { + if (handledMessages[0] != messageType1) { + return false; + } + if (handledMessages[1] != messageType2) { + return false; + } + } + + return true; + } + + /** + * @return true if the message types are handled + */ + private static boolean handlesMessage(MessageHandler handler, Class messageType1, Class messageType2, Class messageType3) { + Class[] handledMessages = handler.getHandledMessages(); + int handledLength = handledMessages.length; + + if (handledLength != 3) { + return false; + } + + if (handler.acceptsSubtypes()) { + if (!handledMessages[0].isAssignableFrom(messageType1)) { + return false; + } + if (!handledMessages[1].isAssignableFrom(messageType2)) { + return false; + } + if (!handledMessages[2].isAssignableFrom(messageType3)) { + return false; + } + } else { + if (handledMessages[0] != messageType1) { + return false; + } + if (handledMessages[1] != messageType2) { + return false; + } + if (handledMessages[2] != messageType3) { + return false; + } + } + + return true; + } + + /** + * @return true if the message types are handled + */ + private static boolean handlesMessage(MessageHandler handler, Class... messageTypes) { + Class[] handledMessages = handler.getHandledMessages(); + + int handledLength = handledMessages.length; + int handledLengthMinusVarArg = handledLength-1; + + int messagesLength = messageTypes.length; + + // do we even have enough to even CHECK the var-arg? + if (messagesLength < handledLengthMinusVarArg) { + // totally wrong number of args + return false; + } + + // check BEFORE var-arg in handler (var-arg can ONLY be last element in array) + if (handledLengthMinusVarArg <= messagesLength) { + if (handler.acceptsSubtypes()) { + for (int i = 0; i < handledLengthMinusVarArg; i++) { + Class handledMessage = handledMessages[i]; + Class messageType = messageTypes[i]; + + if (!handledMessage.isAssignableFrom(messageType)) { + return false; + } + } + } else { + for (int i = 0; i < handledLengthMinusVarArg; i++) { + Class handledMessage = handledMessages[i]; + Class messageType = messageTypes[i]; + + if (handledMessage != messageType) { + return false; + } + } + } + } + + // do we even HAVE var-arg? + if (!handledMessages[handledLengthMinusVarArg].isArray()) { + // DO NOT HAVE VAR_ARG PRESENT IN HANDLERS + + // fast exit + if (handledLength != messagesLength) { + return false; + } + + // compare remaining arg + Class handledMessage = handledMessages[handledLengthMinusVarArg]; + Class messageType = messageTypes[handledLengthMinusVarArg]; + + if (handler.acceptsSubtypes()) { + if (!handledMessage.isAssignableFrom(messageType)) { + return false; + } + } else { + if (handledMessage != messageType) { + return false; + } + } + // all args are dandy + return true; + } + + // WE HAVE VAR_ARG PRESENT IN HANDLER + + // do we have enough args to NEED to check the var-arg? + if (handledLengthMinusVarArg == messagesLength) { + // var-arg doesn't need checking + return true; + } + + // then check var-arg in handler + + // all the args to check for the var-arg MUST be the same! (ONLY ONE ARRAY THOUGH CAN BE PRESENT) + int messagesLengthMinusVarArg = messagesLength-1; + + Class typeCheck = messageTypes[handledLengthMinusVarArg]; + for (int i = handledLengthMinusVarArg; i < messagesLength; i++) { + Class t1 = messageTypes[i]; + if (t1 != typeCheck) { + return false; + } + } + + // if we got this far, then the args are the same type. IF we have more than one, AND they are arrays, NOPE! + if (messagesLength - handledLengthMinusVarArg > 1 && messageTypes[messagesLengthMinusVarArg].isArray()) { + return false; + } + + // are we comparing array -> array or string -> array + Class componentType; + if (messageTypes[messagesLengthMinusVarArg].isArray()) { + componentType = handledMessages[handledLengthMinusVarArg]; + } else { + componentType = handledMessages[handledLengthMinusVarArg].getComponentType(); + } + + if (handler.acceptsSubtypes()) { + return componentType.isAssignableFrom(typeCheck); + } else { + return typeCheck == componentType; + } + } } diff --git a/src/test/java/net/engio/mbassy/multi/WeakConcurrentSetTest.java b/src/test/java/net/engio/mbassy/multi/WeakConcurrentSetTest.java index 5a1f615..cbe2ca1 100644 --- a/src/test/java/net/engio/mbassy/multi/WeakConcurrentSetTest.java +++ b/src/test/java/net/engio/mbassy/multi/WeakConcurrentSetTest.java @@ -1,15 +1,13 @@ package net.engio.mbassy.multi; -import net.engio.mbassy.multi.common.ConcurrentExecutor; -import net.engio.mbassy.multi.common.IConcurrentSet; -import net.engio.mbassy.multi.common.WeakConcurrentSet; - -import org.junit.Before; -import org.junit.Test; - +import java.util.Collection; import java.util.HashSet; import java.util.Random; -import java.util.Set; + +import net.engio.mbassy.multi.common.ConcurrentExecutor; +import net.engio.mbassy.multi.common.WeakConcurrentSet; + +import org.junit.Test; /** * @@ -24,7 +22,7 @@ public class WeakConcurrentSetTest extends ConcurrentSetTest{ @Override - protected IConcurrentSet createSet() { + protected Collection createSet() { return new WeakConcurrentSet(); } @@ -33,10 +31,10 @@ public class WeakConcurrentSetTest extends ConcurrentSetTest{ // Assemble final HashSet permanentElements = new HashSet(); - final IConcurrentSet testSetWeak = createSet(); + final Collection testSetWeak = createSet(); final Random rand = new Random(); - for (int i = 0; i < numberOfElements; i++) { + for (int i = 0; i < this.numberOfElements; i++) { Object candidate = new Object(); if (rand.nextInt() % 3 == 0) { @@ -58,13 +56,13 @@ public class WeakConcurrentSetTest extends ConcurrentSetTest{ System.currentTimeMillis(); } } - }, numberOfThreads); + }, this.numberOfThreads); // the set should have cleaned up the garbage collected elements // it must still contain all of the permanent objects // since different GC mechanisms can be used (not necessarily full, stop-the-world) not all dead objects // must have been collected - assertTrue(permanentElements.size() <= testSetWeak.size() && testSetWeak.size() < numberOfElements); + assertTrue(permanentElements.size() <= testSetWeak.size() && testSetWeak.size() < this.numberOfElements); for (Object test : testSetWeak) { assertTrue(permanentElements.contains(test)); }