From cc9cb1544001390fd5ee49ba1879c1085ef236ec Mon Sep 17 00:00:00 2001 From: nathan Date: Sat, 6 Jun 2015 21:29:30 +0200 Subject: [PATCH] Added publish array. WIP --- .../dorkbox/util/messagebus/IMessageBus.java | 1 + .../util/messagebus/MultiMBassador.java | 67 ++++++++++++++++--- .../util/messagebus/PubSubSupport.java | 45 ++++++++----- .../common/simpleq/MessageType.java | 1 + .../simpleq/MpmcMultiTransferArrayQueue.java | 6 +- .../util/messagebus/subscription/Matcher.java | 2 + .../subscription/SubscriptionManager.java | 7 -- ...PerfTest_MpmcTransferArrayQueue_Block.java | 5 +- 8 files changed, 97 insertions(+), 37 deletions(-) diff --git a/src/main/java/dorkbox/util/messagebus/IMessageBus.java b/src/main/java/dorkbox/util/messagebus/IMessageBus.java index 2554e86..8557729 100644 --- a/src/main/java/dorkbox/util/messagebus/IMessageBus.java +++ b/src/main/java/dorkbox/util/messagebus/IMessageBus.java @@ -59,6 +59,7 @@ import dorkbox.util.messagebus.error.ErrorHandlingSupport; */ public interface IMessageBus extends PubSubSupport, ErrorHandlingSupport { + enum Mode { /** * Will only publish to listeners with this exact message signature. This is the fastest diff --git a/src/main/java/dorkbox/util/messagebus/MultiMBassador.java b/src/main/java/dorkbox/util/messagebus/MultiMBassador.java index daccc61..8286719 100644 --- a/src/main/java/dorkbox/util/messagebus/MultiMBassador.java +++ b/src/main/java/dorkbox/util/messagebus/MultiMBassador.java @@ -70,8 +70,8 @@ public class MultiMBassador implements IMessageBus { case Exact: subscriptionMatcher = new Matcher() { @Override - public void publish(final Object message) throws Throwable { - subscriptionManager.publishExact(message); + public void publish(final Object message1) throws Throwable { + subscriptionManager.publishExact(message1); } @Override @@ -83,13 +83,19 @@ public class MultiMBassador implements IMessageBus { public void publish(final Object message1, final Object message2, final Object message3) throws Throwable { subscriptionManager.publishExact(message1, message2, message3); } + + @Override + public void publish(final Object[] messages) throws Throwable { + subscriptionManager.publishExact(messages); + } + }; break; case ExactWithSuperTypes: subscriptionMatcher = new Matcher() { @Override - public void publish(final Object message) throws Throwable { - subscriptionManager.publishExactAndSuper(message); + public void publish(final Object message1) throws Throwable { + subscriptionManager.publishExactAndSuper(message1); } @Override @@ -101,24 +107,37 @@ public class MultiMBassador implements IMessageBus { public void publish(final Object message1, final Object message2, final Object message3) throws Throwable { subscriptionManager.publishExactAndSuper(message1, message2, message3); } + + @Override + public void publish(final Object[] messages) throws Throwable { + subscriptionManager.publishExactAndSuper(messages); + } }; break; case ExactWithSuperTypesAndVarArgs: default: subscriptionMatcher = new Matcher() { @Override - public void publish(final Object message) throws Throwable { - subscriptionManager.publishAll(message); + public void publish(final Object message1) throws Throwable { + subscriptionManager.publishAll(message1); } @Override public void publish(final Object message1, final Object message2) throws Throwable { - subscriptionManager.publishAll(message1, message2); + // we don't support var-args for multiple messages (var-args can only be a single type) + subscriptionManager.publishExactAndSuper(message1, message2); } @Override public void publish(final Object message1, final Object message2, final Object message3) throws Throwable { - subscriptionManager.publishAll(message1, message2, message3); + // we don't support var-args for multiple messages (var-args can only be a single type) + subscriptionManager.publishExactAndSuper(message1, message2, message3); + } + + @Override + public void publish(final Object[] messages) throws Throwable { + // we don't support var-args for multiple messages (var-args can only be a single type) + subscriptionManager.publishExactAndSuper(messages); } }; } @@ -148,8 +167,12 @@ public class MultiMBassador implements IMessageBus { publish(node.item1, node.item2); break; } - default: { + case 3: { publish(node.item1, node.item2, node.item3); + break; + } + default: { + publish(node.item1); } } } @@ -269,6 +292,16 @@ public class MultiMBassador implements IMessageBus { } } + @Override + public void publish(final Object[] messages) { + try { + subscriptionMatcher.publish(messages); + } catch (Throwable e) { + handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.").setCause(e) + .setPublishedObject(messages)); + } + } + @Override public void publishAsync(final Object message) { if (message != null) { @@ -313,4 +346,20 @@ public class MultiMBassador implements IMessageBus { throw new NullPointerException("Messages cannot be null."); } } + + @Override + public void publishAsync(final Object[] messages) { + if (messages != null) { + try { + this.dispatchQueue.transfer(messages, MessageType.ARRAY); + } catch (Exception e) { + handlePublicationError(new PublicationError().setMessage("Error while adding an asynchronous message").setCause(e) + .setPublishedObject(messages)); + } + } + else { + throw new NullPointerException("Message cannot be null."); + } + } + } diff --git a/src/main/java/dorkbox/util/messagebus/PubSubSupport.java b/src/main/java/dorkbox/util/messagebus/PubSubSupport.java index d0f34a6..caf9acf 100644 --- a/src/main/java/dorkbox/util/messagebus/PubSubSupport.java +++ b/src/main/java/dorkbox/util/messagebus/PubSubSupport.java @@ -53,6 +53,14 @@ public interface PubSubSupport { */ void publish(Object message1, Object message2, Object message3); + /** + * Synchronously publish AN ARRAY of messages to all registered listeners (that match the signature). This + * includes listeners defined for super types of the given message type, provided they are not configured + * to reject valid subtypes. The call returns when all matching handlers of all registered listeners have + * been notified (invoked) of the message. + */ + void publish(Object[] message); + /** * Publish the message asynchronously to all registered listeners (that match the signature). This includes * listeners defined for super types of the given message type, provided they are not configured to reject @@ -60,11 +68,9 @@ public interface PubSubSupport { * (invoked) of the message. *

*

- * The behavior of this method depends on the configured queuing strategy: - *

- *

- * If an unbound queuing strategy is used the call returns immediately. - * If a bounded queue is used the call might block until the message can be placed in the queue. + * The behavior of this method depends on availability of workers. If all workers are busy, then this method + * will block until there is an available worker. If workers are available, then this method will immediately + * return. */ void publishAsync(Object message); @@ -75,11 +81,9 @@ public interface PubSubSupport { * been notified (invoked) of the message. *

*

- * The behavior of this method depends on the configured queuing strategy: - *

- *

- * If an unbound queuing strategy is used the call returns immediately. - * If a bounded queue is used the call might block until the message can be placed in the queue. + * The behavior of this method depends on availability of workers. If all workers are busy, then this method + * will block until there is an available worker. If workers are available, then this method will immediately + * return. */ void publishAsync(Object message1, Object message2); @@ -90,11 +94,22 @@ public interface PubSubSupport { * notified (invoked) of the message. *

*

- * The behavior of this method depends on the configured queuing strategy: - *

- *

- * If an unbound queuing strategy is used the call returns immediately. - * If a bounded queue is used the call might block until the message can be placed in the queue. + * The behavior of this method depends on availability of workers. If all workers are busy, then this method + * will block until there is an available worker. If workers are available, then this method will immediately + * return. */ void publishAsync(Object message1, Object message2, Object message3); + + /** + * Publish AN ARRAY of messages asynchronously to all registered listeners (that match the signature). This + * includes listeners defined for super types of the given message type, provided they are not configured to + * reject valid subtypes. The call returns when all matching handlers of all registered listeners have been + * notified (invoked) of the message. + *

+ *

+ * The behavior of this method depends on availability of workers. If all workers are busy, then this method + * will block until there is an available worker. If workers are available, then this method will immediately + * return. + */ + void publishAsync(Object[] messages); } diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/MessageType.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/MessageType.java index 8a8c8d6..ddb566e 100644 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/MessageType.java +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/MessageType.java @@ -4,4 +4,5 @@ public class MessageType { public static final int ONE = 1; public static final int TWO = 2; public static final int THREE = 3; + public static final int ARRAY = 4; } diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/MpmcMultiTransferArrayQueue.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/MpmcMultiTransferArrayQueue.java index 05b1f0d..bc1a8cb 100644 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/MpmcMultiTransferArrayQueue.java +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/MpmcMultiTransferArrayQueue.java @@ -60,7 +60,7 @@ public final class MpmcMultiTransferArrayQueue extends MpmcArrayQueue { * The item can be a single object (MessageType.ONE), or an array object (MessageType.ARRAY) *

*/ - public void transfer(final Object item) throws InterruptedException { + public void transfer(final Object item, final int messageType) throws InterruptedException { // local load of field to avoid repeated loads after volatile reads final long mask = this.mask; final Object[] buffer = this.buffer; @@ -106,7 +106,7 @@ public final class MpmcMultiTransferArrayQueue extends MpmcArrayQueue { spType(node, TYPE_PRODUCER); spThread(node, myThread); - spMessageType(node, MessageType.ONE); + spMessageType(node, messageType); spItem1(node, item); @@ -148,7 +148,7 @@ public final class MpmcMultiTransferArrayQueue extends MpmcArrayQueue { // (seeing this value from a consumer will lead to retry 2) soSequence(sBuffer, cSeqOffset, mask + newConsumerIndex); // StoreStore - spMessageType(e, MessageType.ONE); + spMessageType(e, messageType); spItem1(e, item); unpark(e); // StoreStore diff --git a/src/main/java/dorkbox/util/messagebus/subscription/Matcher.java b/src/main/java/dorkbox/util/messagebus/subscription/Matcher.java index 1c7514a..6b1fc5c 100644 --- a/src/main/java/dorkbox/util/messagebus/subscription/Matcher.java +++ b/src/main/java/dorkbox/util/messagebus/subscription/Matcher.java @@ -6,4 +6,6 @@ public interface Matcher { void publish(Object message1, Object message2) throws Throwable; void publish(Object message1, Object message2, Object message3) throws Throwable; + + void publish(Object[] messages) throws Throwable; } diff --git a/src/main/java/dorkbox/util/messagebus/subscription/SubscriptionManager.java b/src/main/java/dorkbox/util/messagebus/subscription/SubscriptionManager.java index afea1e5..ec9c9e3 100644 --- a/src/main/java/dorkbox/util/messagebus/subscription/SubscriptionManager.java +++ b/src/main/java/dorkbox/util/messagebus/subscription/SubscriptionManager.java @@ -672,13 +672,6 @@ public final class SubscriptionManager { } } - public void publishAll(final Object message1, final Object message2) throws Throwable { - } - - public void publishAll(final Object message1, final Object message2, final Object message3) throws Throwable { - } - - // public static final Comparator SubscriptionByPriorityDesc = new Comparator() { // @Override diff --git a/src/test/java/dorkbox/util/messagebus/queuePerf/PerfTest_MpmcTransferArrayQueue_Block.java b/src/test/java/dorkbox/util/messagebus/queuePerf/PerfTest_MpmcTransferArrayQueue_Block.java index afd79ca..758fc3f 100644 --- a/src/test/java/dorkbox/util/messagebus/queuePerf/PerfTest_MpmcTransferArrayQueue_Block.java +++ b/src/test/java/dorkbox/util/messagebus/queuePerf/PerfTest_MpmcTransferArrayQueue_Block.java @@ -1,5 +1,6 @@ package dorkbox.util.messagebus.queuePerf; +import dorkbox.util.messagebus.common.simpleq.MessageType; import dorkbox.util.messagebus.common.simpleq.MpmcMultiTransferArrayQueue; import dorkbox.util.messagebus.common.simpleq.MultiNode; @@ -132,12 +133,10 @@ public class PerfTest_MpmcTransferArrayQueue_Block { try { do { - producer.transfer(TEST_VALUE); + producer.transfer(TEST_VALUE, MessageType.ONE); } while (0 != --i); } catch (InterruptedException e) { - // TODO Auto-generated catch block e.printStackTrace(); - // log.error(e); } } }