Added publish array. WIP

This commit is contained in:
nathan 2015-06-06 21:29:30 +02:00
parent 4bd2b24f25
commit cc9cb15440
8 changed files with 97 additions and 37 deletions

View File

@ -59,6 +59,7 @@ import dorkbox.util.messagebus.error.ErrorHandlingSupport;
*/
public interface IMessageBus extends PubSubSupport, ErrorHandlingSupport {
enum Mode {
/**
* Will only publish to listeners with this exact message signature. This is the fastest

View File

@ -70,8 +70,8 @@ public class MultiMBassador implements IMessageBus {
case Exact:
subscriptionMatcher = new Matcher() {
@Override
public void publish(final Object message) throws Throwable {
subscriptionManager.publishExact(message);
public void publish(final Object message1) throws Throwable {
subscriptionManager.publishExact(message1);
}
@Override
@ -83,13 +83,19 @@ public class MultiMBassador implements IMessageBus {
public void publish(final Object message1, final Object message2, final Object message3) throws Throwable {
subscriptionManager.publishExact(message1, message2, message3);
}
@Override
public void publish(final Object[] messages) throws Throwable {
subscriptionManager.publishExact(messages);
}
};
break;
case ExactWithSuperTypes:
subscriptionMatcher = new Matcher() {
@Override
public void publish(final Object message) throws Throwable {
subscriptionManager.publishExactAndSuper(message);
public void publish(final Object message1) throws Throwable {
subscriptionManager.publishExactAndSuper(message1);
}
@Override
@ -101,24 +107,37 @@ public class MultiMBassador implements IMessageBus {
public void publish(final Object message1, final Object message2, final Object message3) throws Throwable {
subscriptionManager.publishExactAndSuper(message1, message2, message3);
}
@Override
public void publish(final Object[] messages) throws Throwable {
subscriptionManager.publishExactAndSuper(messages);
}
};
break;
case ExactWithSuperTypesAndVarArgs:
default:
subscriptionMatcher = new Matcher() {
@Override
public void publish(final Object message) throws Throwable {
subscriptionManager.publishAll(message);
public void publish(final Object message1) throws Throwable {
subscriptionManager.publishAll(message1);
}
@Override
public void publish(final Object message1, final Object message2) throws Throwable {
subscriptionManager.publishAll(message1, message2);
// we don't support var-args for multiple messages (var-args can only be a single type)
subscriptionManager.publishExactAndSuper(message1, message2);
}
@Override
public void publish(final Object message1, final Object message2, final Object message3) throws Throwable {
subscriptionManager.publishAll(message1, message2, message3);
// we don't support var-args for multiple messages (var-args can only be a single type)
subscriptionManager.publishExactAndSuper(message1, message2, message3);
}
@Override
public void publish(final Object[] messages) throws Throwable {
// we don't support var-args for multiple messages (var-args can only be a single type)
subscriptionManager.publishExactAndSuper(messages);
}
};
}
@ -148,8 +167,12 @@ public class MultiMBassador implements IMessageBus {
publish(node.item1, node.item2);
break;
}
default: {
case 3: {
publish(node.item1, node.item2, node.item3);
break;
}
default: {
publish(node.item1);
}
}
}
@ -269,6 +292,16 @@ public class MultiMBassador implements IMessageBus {
}
}
@Override
public void publish(final Object[] messages) {
try {
subscriptionMatcher.publish(messages);
} catch (Throwable e) {
handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.").setCause(e)
.setPublishedObject(messages));
}
}
@Override
public void publishAsync(final Object message) {
if (message != null) {
@ -313,4 +346,20 @@ public class MultiMBassador implements IMessageBus {
throw new NullPointerException("Messages cannot be null.");
}
}
@Override
public void publishAsync(final Object[] messages) {
if (messages != null) {
try {
this.dispatchQueue.transfer(messages, MessageType.ARRAY);
} catch (Exception e) {
handlePublicationError(new PublicationError().setMessage("Error while adding an asynchronous message").setCause(e)
.setPublishedObject(messages));
}
}
else {
throw new NullPointerException("Message cannot be null.");
}
}
}

View File

@ -53,6 +53,14 @@ public interface PubSubSupport {
*/
void publish(Object message1, Object message2, Object message3);
/**
* Synchronously publish <b>AN ARRAY</b> of messages to all registered listeners (that match the signature). This
* includes listeners defined for super types of the given message type, provided they are not configured
* to reject valid subtypes. The call returns when all matching handlers of all registered listeners have
* been notified (invoked) of the message.
*/
void publish(Object[] message);
/**
* Publish the message asynchronously to all registered listeners (that match the signature). This includes
* listeners defined for super types of the given message type, provided they are not configured to reject
@ -60,11 +68,9 @@ public interface PubSubSupport {
* (invoked) of the message.
* <p>
* <p>
* The behavior of this method depends on the configured queuing strategy:
* <p>
* <p>
* If an unbound queuing strategy is used the call returns immediately.
* If a bounded queue is used the call might block until the message can be placed in the queue.
* The behavior of this method depends on availability of workers. If all workers are busy, then this method
* will block until there is an available worker. If workers are available, then this method will immediately
* return.
*/
void publishAsync(Object message);
@ -75,11 +81,9 @@ public interface PubSubSupport {
* been notified (invoked) of the message.
* <p>
* <p>
* The behavior of this method depends on the configured queuing strategy:
* <p>
* <p>
* If an unbound queuing strategy is used the call returns immediately.
* If a bounded queue is used the call might block until the message can be placed in the queue.
* The behavior of this method depends on availability of workers. If all workers are busy, then this method
* will block until there is an available worker. If workers are available, then this method will immediately
* return.
*/
void publishAsync(Object message1, Object message2);
@ -90,11 +94,22 @@ public interface PubSubSupport {
* notified (invoked) of the message.
* <p>
* <p>
* The behavior of this method depends on the configured queuing strategy:
* <p>
* <p>
* If an unbound queuing strategy is used the call returns immediately.
* If a bounded queue is used the call might block until the message can be placed in the queue.
* The behavior of this method depends on availability of workers. If all workers are busy, then this method
* will block until there is an available worker. If workers are available, then this method will immediately
* return.
*/
void publishAsync(Object message1, Object message2, Object message3);
/**
* Publish <b>AN ARRAY</b> of messages asynchronously to all registered listeners (that match the signature). This
* includes listeners defined for super types of the given message type, provided they are not configured to
* reject valid subtypes. The call returns when all matching handlers of all registered listeners have been
* notified (invoked) of the message.
* <p>
* <p>
* The behavior of this method depends on availability of workers. If all workers are busy, then this method
* will block until there is an available worker. If workers are available, then this method will immediately
* return.
*/
void publishAsync(Object[] messages);
}

View File

@ -4,4 +4,5 @@ public class MessageType {
public static final int ONE = 1;
public static final int TWO = 2;
public static final int THREE = 3;
public static final int ARRAY = 4;
}

View File

@ -60,7 +60,7 @@ public final class MpmcMultiTransferArrayQueue extends MpmcArrayQueue<Object> {
* The item can be a single object (MessageType.ONE), or an array object (MessageType.ARRAY)
* </p>
*/
public void transfer(final Object item) throws InterruptedException {
public void transfer(final Object item, final int messageType) throws InterruptedException {
// local load of field to avoid repeated loads after volatile reads
final long mask = this.mask;
final Object[] buffer = this.buffer;
@ -106,7 +106,7 @@ public final class MpmcMultiTransferArrayQueue extends MpmcArrayQueue<Object> {
spType(node, TYPE_PRODUCER);
spThread(node, myThread);
spMessageType(node, MessageType.ONE);
spMessageType(node, messageType);
spItem1(node, item);
@ -148,7 +148,7 @@ public final class MpmcMultiTransferArrayQueue extends MpmcArrayQueue<Object> {
// (seeing this value from a consumer will lead to retry 2)
soSequence(sBuffer, cSeqOffset, mask + newConsumerIndex); // StoreStore
spMessageType(e, MessageType.ONE);
spMessageType(e, messageType);
spItem1(e, item);
unpark(e); // StoreStore

View File

@ -6,4 +6,6 @@ public interface Matcher {
void publish(Object message1, Object message2) throws Throwable;
void publish(Object message1, Object message2, Object message3) throws Throwable;
void publish(Object[] messages) throws Throwable;
}

View File

@ -672,13 +672,6 @@ public final class SubscriptionManager {
}
}
public void publishAll(final Object message1, final Object message2) throws Throwable {
}
public void publishAll(final Object message1, final Object message2, final Object message3) throws Throwable {
}
// public static final Comparator<Subscription> SubscriptionByPriorityDesc = new Comparator<Subscription>() {
// @Override

View File

@ -1,5 +1,6 @@
package dorkbox.util.messagebus.queuePerf;
import dorkbox.util.messagebus.common.simpleq.MessageType;
import dorkbox.util.messagebus.common.simpleq.MpmcMultiTransferArrayQueue;
import dorkbox.util.messagebus.common.simpleq.MultiNode;
@ -132,12 +133,10 @@ public class PerfTest_MpmcTransferArrayQueue_Block {
try {
do {
producer.transfer(TEST_VALUE);
producer.transfer(TEST_VALUE, MessageType.ONE);
} while (0 != --i);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
// log.error(e);
}
}
}