Tweaked publication/dispatch and which threads (if async) get the subscriptions for the messages (previously, the calling thread did this. NOW whatever thread the dispatcher is in does this

This commit is contained in:
nathan 2016-02-08 02:12:08 +01:00
parent 95697164f5
commit 4686461bc7
15 changed files with 461 additions and 399 deletions

View File

@ -100,7 +100,7 @@ import dorkbox.messagebus.error.IPublicationErrorHandler;
*/
public interface IMessageBus extends PubSubSupport {
enum PublishMode {
enum DispatchMode {
/**
* Will only publish to listeners with this exact message signature. This is the fastest
*/

View File

@ -17,9 +17,9 @@ package dorkbox.messagebus;
import dorkbox.messagebus.error.ErrorHandler;
import dorkbox.messagebus.error.IPublicationErrorHandler;
import dorkbox.messagebus.publication.Publisher;
import dorkbox.messagebus.publication.PublisherExact;
import dorkbox.messagebus.publication.PublisherExactWithSuperTypes;
import dorkbox.messagebus.dispatch.Dispatch;
import dorkbox.messagebus.dispatch.DispatchExact;
import dorkbox.messagebus.dispatch.DispatchExactWithSuperTypes;
import dorkbox.messagebus.subscription.SubscriptionManager;
import dorkbox.messagebus.synchrony.AsyncABQ;
import dorkbox.messagebus.synchrony.AsyncABQ_noGc;
@ -106,7 +106,7 @@ class MessageBus implements IMessageBus {
private final SubscriptionManager subscriptionManager;
private final Publisher publisher;
private final Dispatch dispatch;
private final Synchrony syncPublication;
private final Synchrony asyncPublication;
@ -124,26 +124,26 @@ class MessageBus implements IMessageBus {
* @param numberOfThreads how many threads to use for dispatching async messages
*/
public
MessageBus(int numberOfThreads) {
this(PublishMode.ExactWithSuperTypes, numberOfThreads);
MessageBus(final int numberOfThreads) {
this(DispatchMode.ExactWithSuperTypes, numberOfThreads);
}
/**
* By default, will use half of CPUs available for dispatching async messages
*
* @param publishMode Specifies which publishMode to operate the publication of messages.
* @param dispatchMode Specifies which publishMode to operate the publication of messages.
*/
public
MessageBus(final PublishMode publishMode) {
this(publishMode, Runtime.getRuntime().availableProcessors());
MessageBus(final DispatchMode dispatchMode) {
this(dispatchMode, Runtime.getRuntime().availableProcessors());
}
/**
* @param publishMode Specifies which publishMode to operate the publication of messages.
* @param dispatchMode Specifies which publishMode to operate the publication of messages.
* @param numberOfThreads how many threads to use for dispatching async messages
*/
public
MessageBus(final PublishMode publishMode, int numberOfThreads) {
MessageBus(final DispatchMode dispatchMode, int numberOfThreads) {
// round to the nearest power of 2
numberOfThreads = 1 << (32 - Integer.numberOfLeadingZeros(getMinNumberOfThreads(numberOfThreads) - 1));
@ -154,22 +154,22 @@ class MessageBus implements IMessageBus {
*/
this.subscriptionManager = new SubscriptionManager(useStrongReferencesByDefault);
switch (publishMode) {
switch (dispatchMode) {
case Exact:
publisher = new PublisherExact(subscriptionManager);
dispatch = new DispatchExact(errorHandler, subscriptionManager);
break;
case ExactWithSuperTypes:
default:
publisher = new PublisherExactWithSuperTypes(errorHandler, subscriptionManager);
dispatch = new DispatchExactWithSuperTypes(errorHandler, subscriptionManager);
break;
}
syncPublication = new Sync(errorHandler, subscriptionManager);
syncPublication = new Sync();
// the disruptor is preferred, but if it cannot be loaded -- we want to try to continue working, hence the use of ArrayBlockingQueue
if (useDisruptorForAsyncPublish) {
asyncPublication = new AsyncDisruptor(numberOfThreads, errorHandler, syncPublication);
asyncPublication = new AsyncDisruptor(numberOfThreads, errorHandler, syncPublication, subscriptionManager);
} else {
if (useZeroGarbageVersionOfABQ) {
// no garbage is created, but this is slow (but faster than other messagebus implementations)
@ -239,8 +239,8 @@ class MessageBus implements IMessageBus {
*/
@Override
public
void publish(final Object message) {
publisher.publish(syncPublication, message);
void publish(final Object message1) {
syncPublication.publish(dispatch, message1);
}
@ -253,7 +253,7 @@ class MessageBus implements IMessageBus {
@Override
public
void publish(final Object message1, final Object message2) {
publisher.publish(syncPublication, message1, message2);
syncPublication.publish(dispatch, message1, message2);
}
@ -266,7 +266,7 @@ class MessageBus implements IMessageBus {
@Override
public
void publish(final Object message1, final Object message2, final Object message3) {
publisher.publish(syncPublication, message1, message2, message3);
syncPublication.publish(dispatch, message1, message2, message3);
}
@ -278,7 +278,7 @@ class MessageBus implements IMessageBus {
@Override
public
void publishAsync(final Object message) {
publisher.publish(asyncPublication, message);
asyncPublication.publish(dispatch, message);
}
@ -290,7 +290,7 @@ class MessageBus implements IMessageBus {
@Override
public
void publishAsync(final Object message1, final Object message2) {
publisher.publish(asyncPublication, message1, message2);
asyncPublication.publish(dispatch, message1, message2);
}
@ -302,7 +302,7 @@ class MessageBus implements IMessageBus {
@Override
public
void publishAsync(final Object message1, final Object message2, final Object message3) {
publisher.publish(asyncPublication, message1, message2, message3);
asyncPublication.publish(dispatch, message1, message2, message3);
}

View File

@ -13,12 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.messagebus.publication;
package dorkbox.messagebus.dispatch;
import dorkbox.messagebus.synchrony.Synchrony;
public interface Publisher {
void publish(final Synchrony synchrony, Object message1);
void publish(final Synchrony synchrony, Object message1, Object message2);
void publish(final Synchrony synchrony, Object message1, Object message2, Object message3);
public interface Dispatch {
void publish(Object message1);
void publish(Object message1, Object message2);
void publish(Object message1, Object message2, Object message3);
}

View File

@ -0,0 +1,153 @@
/*
* Copyright 2015 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.messagebus.dispatch;
import dorkbox.messagebus.error.DeadMessage;
import dorkbox.messagebus.error.ErrorHandler;
import dorkbox.messagebus.subscription.Subscription;
import dorkbox.messagebus.subscription.SubscriptionManager;
/**
* By default, it is the calling thread that has to get the subscriptions, which the sync/async logic then uses.
*
* The exception to this rule is when checking/calling DeadMessage publication.
*/
@SuppressWarnings("Duplicates")
public
class DispatchExact implements Dispatch {
private final ErrorHandler errorHandler;
private final SubscriptionManager subManager;
public
DispatchExact(final ErrorHandler errorHandler, final SubscriptionManager subManager) {
this.errorHandler = errorHandler;
this.subManager = subManager;
}
@Override
public
void publish(final Object message1) {
final ErrorHandler errorHandler = this.errorHandler;
final SubscriptionManager subManager = this.subManager;
final Class<?> messageClass1 = message1.getClass();
final Subscription[] subscriptions = subManager.getSubs(messageClass1); // can return null
Subscription sub;
int subLength;
boolean hasSubs = false;
// Run subscriptions. if the subscriptions are NULL or length == 0, it means we don't have any that were ever subscribed.
if (subscriptions != null && (subLength = subscriptions.length) > 0) {
// even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point.
// so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered
for (int i = 0; i < subLength; i++) {
sub = subscriptions[i];
hasSubs |= sub.publish(errorHandler, message1);
}
}
if (!hasSubs) {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1);
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(errorHandler, deadMessage);
}
}
}
}
@Override
public
void publish(final Object message1, final Object message2) {
final ErrorHandler errorHandler = this.errorHandler;
final SubscriptionManager subManager = this.subManager;
final Class<?> messageClass1 = message1.getClass();
final Class<?> messageClass2 = message2.getClass();
final Subscription[] subscriptions = subManager.getSubs(messageClass1, messageClass2); // can return null
Subscription sub;
int subLength;
boolean hasSubs = false;
// Run subscriptions. if the subscriptions are NULL or length == 0, it means we don't have any that were ever subscribed.
if (subscriptions != null && (subLength = subscriptions.length) > 0) {
// even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point.
// so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered
for (int i = 0; i < subLength; i++) {
sub = subscriptions[i];
hasSubs |= sub.publish(errorHandler, message1, message2);
}
}
if (!hasSubs) {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2);
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(errorHandler, deadMessage);
}
}
}
}
@Override
public
void publish(final Object message1, final Object message2, final Object message3) {
final ErrorHandler errorHandler = this.errorHandler;
final SubscriptionManager subManager = this.subManager;
final Class<?> messageClass1 = message1.getClass();
final Class<?> messageClass2 = message2.getClass();
final Class<?> messageClass3 = message3.getClass();
final Subscription[] subscriptions = subManager.getSubs(messageClass1, messageClass2, messageClass3); // can return null
Subscription sub;
int subLength;
boolean hasSubs = false;
// Run subscriptions. if the subscriptions are NULL or length == 0, it means we don't have any that were ever subscribed.
if (subscriptions != null && (subLength = subscriptions.length) > 0) {
// even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point.
// so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered
for (int i = 0; i < subLength; i++) {
sub = subscriptions[i];
hasSubs |= sub.publish(errorHandler, message1, message2, message3);
}
}
if (!hasSubs) {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2, message3);
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(errorHandler, deadMessage);
}
}
}
}
}

View File

@ -0,0 +1,179 @@
/*
* Copyright 2015 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.messagebus.dispatch;
import dorkbox.messagebus.error.DeadMessage;
import dorkbox.messagebus.error.ErrorHandler;
import dorkbox.messagebus.subscription.Subscription;
import dorkbox.messagebus.subscription.SubscriptionManager;
@SuppressWarnings("Duplicates")
public
class DispatchExactWithSuperTypes implements Dispatch {
private final ErrorHandler errorHandler;
private final SubscriptionManager subManager;
public
DispatchExactWithSuperTypes(final ErrorHandler errorHandler, final SubscriptionManager subManager) {
this.errorHandler = errorHandler;
this.subManager = subManager;
}
@Override
public
void publish(final Object message1) {
final ErrorHandler errorHandler = this.errorHandler;
final SubscriptionManager subManager = this.subManager;
final Class<?> messageClass1 = message1.getClass();
final Subscription[] subscriptions = subManager.getSubs(messageClass1); // can return null
final Subscription[] superSubscriptions = subManager.getSuperSubs(messageClass1); // NOT return null
Subscription sub;
int subLength;
boolean hasSubs = false;
// Run subscriptions. if the subscriptions are NULL or length == 0, it means we don't have any that were ever subscribed.
if (subscriptions != null && (subLength = subscriptions.length) > 0) {
// even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point.
// so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered
for (int i = 0; i < subLength; i++) {
sub = subscriptions[i];
hasSubs |= sub.publish(errorHandler, message1);
}
}
if ((subLength = superSubscriptions.length) > 0) {
// even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point.
// so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered
for (int i = 0; i < subLength; i++) {
sub = superSubscriptions[i];
hasSubs |= sub.publish(errorHandler, message1);
}
}
if (!hasSubs) {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1);
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(errorHandler, deadMessage);
}
}
}
}
@Override
public
void publish(final Object message1, final Object message2) {
final ErrorHandler errorHandler = this.errorHandler;
final SubscriptionManager subManager = this.subManager;
final Class<?> messageClass1 = message1.getClass();
final Class<?> messageClass2 = message2.getClass();
final Subscription[] subscriptions = subManager.getSubs(messageClass1, messageClass2); // can return null
final Subscription[] superSubscriptions = subManager.getSuperSubs(messageClass1, messageClass2); // NOT return null
Subscription sub;
int subLength;
boolean hasSubs = false;
// Run subscriptions. if the subscriptions are NULL or length == 0, it means we don't have any that were ever subscribed.
if (subscriptions != null && (subLength = subscriptions.length) > 0) {
// even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point.
// so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered
for (int i = 0; i < subLength; i++) {
sub = subscriptions[i];
hasSubs |= sub.publish(errorHandler, message1, message2);
}
}
if ((subLength = superSubscriptions.length) > 0) {
// even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point.
// so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered
for (int i = 0; i < subLength; i++) {
sub = superSubscriptions[i];
hasSubs |= sub.publish(errorHandler, message1, message2);
}
}
if (!hasSubs) {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2);
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(errorHandler, deadMessage);
}
}
}
}
@Override
public
void publish(final Object message1, final Object message2, final Object message3) {
final ErrorHandler errorHandler = this.errorHandler;
final SubscriptionManager subManager = this.subManager;
final Class<?> messageClass1 = message1.getClass();
final Class<?> messageClass2 = message2.getClass();
final Class<?> messageClass3 = message3.getClass();
final Subscription[] subscriptions = subManager.getSubs(messageClass1, messageClass2, messageClass3); // can return null
final Subscription[] superSubscriptions = subManager.getSuperSubs(messageClass1, messageClass2, messageClass3); // NOT return null
Subscription sub;
int subLength;
boolean hasSubs = false;
// Run subscriptions. if the subscriptions are NULL or length == 0, it means we don't have any that were ever subscribed.
if (subscriptions != null && (subLength = subscriptions.length) > 0) {
// even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point.
// so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered
for (int i = 0; i < subLength; i++) {
sub = subscriptions[i];
hasSubs |= sub.publish(errorHandler, message1, message2, message3);
}
}
if ((subLength = superSubscriptions.length) > 0) {
// even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point.
// so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered
for (int i = 0; i < subLength; i++) {
sub = superSubscriptions[i];
hasSubs |= sub.publish(errorHandler, message1, message2, message3);
}
}
if (!hasSubs) {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2, message3);
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(errorHandler, deadMessage);
}
}
}
}
}

View File

@ -1,66 +0,0 @@
/*
* Copyright 2015 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.messagebus.publication;
import dorkbox.messagebus.subscription.Subscription;
import dorkbox.messagebus.subscription.SubscriptionManager;
import dorkbox.messagebus.synchrony.Synchrony;
/**
* By default, it is the calling thread that has to get the subscriptions, which the sync/async logic then uses.
*
* The exception to this rule is when checking/calling DeadMessage publication.
*/
@SuppressWarnings("Duplicates")
public
class PublisherExact implements Publisher {
private final SubscriptionManager subManager;
public
PublisherExact(final SubscriptionManager subManager) {
this.subManager = subManager;
}
@Override
public
void publish(final Synchrony synchrony, final Object message1) {
final Class<?> messageClass = message1.getClass();
final Subscription[] subscriptions = subManager.getSubs(messageClass); // can return null
synchrony.publish(subscriptions, null, message1);
}
@Override
public
void publish(final Synchrony synchrony, final Object message1, final Object message2) {
final Class<?> messageClass1 = message1.getClass();
final Class<?> messageClass2 = message2.getClass();
final Subscription[] subscriptions = subManager.getSubs(messageClass1, messageClass2); // can return null
synchrony.publish(subscriptions, null, message1, message2);
}
@Override
public
void publish(final Synchrony synchrony, final Object message1, final Object message2, final Object message3) {
final Class<?> messageClass1 = message1.getClass();
final Class<?> messageClass2 = message2.getClass();
final Class<?> messageClass3 = message3.getClass();
final Subscription[] subscriptions = subManager.getSubs(messageClass1, messageClass2, messageClass3); // can return null
synchrony.publish(subscriptions, null, message1, message2, message3);
}
}

View File

@ -1,72 +0,0 @@
/*
* Copyright 2015 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.messagebus.publication;
import dorkbox.messagebus.error.ErrorHandler;
import dorkbox.messagebus.subscription.Subscription;
import dorkbox.messagebus.subscription.SubscriptionManager;
import dorkbox.messagebus.synchrony.Synchrony;
@SuppressWarnings("Duplicates")
public
class PublisherExactWithSuperTypes implements Publisher {
private final ErrorHandler errorHandler;
private final SubscriptionManager subManager;
public
PublisherExactWithSuperTypes(final ErrorHandler errorHandler, final SubscriptionManager subManager) {
this.errorHandler = errorHandler;
this.subManager = subManager;
}
@Override
public
void publish(final Synchrony synchrony, final Object message1) {
final SubscriptionManager subManager = this.subManager;
final Class<?> messageClass1 = message1.getClass();
final Subscription[] subscriptions = subManager.getSubs(messageClass1); // can return null
final Subscription[] superSubscriptions = subManager.getSuperSubs(messageClass1); // NOT return null
synchrony.publish(subscriptions, superSubscriptions, message1);
}
@Override
public
void publish(final Synchrony synchrony, final Object message1, final Object message2) {
final SubscriptionManager subManager = this.subManager;
final Class<?> messageClass1 = message1.getClass();
final Class<?> messageClass2 = message2.getClass();
final Subscription[] subscriptions = subManager.getSubs(messageClass1, messageClass2); // can return null
final Subscription[] superSubscriptions = subManager.getSuperSubs(messageClass1, messageClass2); // NOT return null
synchrony.publish(subscriptions, superSubscriptions, message1, message2);
}
@Override
public
void publish(final Synchrony synchrony, final Object message1, final Object message2, final Object message3) {
final SubscriptionManager subManager = this.subManager;
final Class<?> messageClass1 = message1.getClass();
final Class<?> messageClass2 = message2.getClass();
final Class<?> messageClass3 = message3.getClass();
final Subscription[] subscriptions = subManager.getSubs(messageClass1, messageClass2, messageClass3); // can return null
final Subscription[] superSubscriptions = subManager.getSuperSubs(messageClass1, messageClass2, messageClass3); // NOT return null
synchrony.publish(subscriptions, superSubscriptions, message1, message2, message3);
}
}

View File

@ -18,7 +18,7 @@ package dorkbox.messagebus.synchrony;
import dorkbox.messagebus.common.NamedThreadFactory;
import dorkbox.messagebus.error.ErrorHandler;
import dorkbox.messagebus.error.PublicationError;
import dorkbox.messagebus.subscription.Subscription;
import dorkbox.messagebus.dispatch.Dispatch;
import dorkbox.messagebus.synchrony.disruptor.MessageType;
import java.util.ArrayDeque;
@ -70,7 +70,7 @@ class AsyncABQ implements Synchrony {
final ErrorHandler errorHandler1 = errorHandler;
while (!AsyncABQ.this.shuttingDown) {
process(IN_QUEUE, syncPublication1, errorHandler1);
process(IN_QUEUE, errorHandler1);
}
synchronized (shutdown) {
@ -92,12 +92,11 @@ class AsyncABQ implements Synchrony {
@SuppressWarnings("Duplicates")
private
void process(final ArrayBlockingQueue<MessageHolder> queue, final Synchrony sync, final ErrorHandler errorHandler) {
void process(final ArrayBlockingQueue<MessageHolder> queue, final ErrorHandler errorHandler) {
MessageHolder event;
int messageType = MessageType.ONE;
Subscription[] subs;
Subscription[] superSubs;
Dispatch dispatch;
Object message1 = null;
Object message2 = null;
Object message3 = null;
@ -106,23 +105,22 @@ class AsyncABQ implements Synchrony {
event = queue.take();
messageType = event.type;
subs = event.subs;
superSubs = event.superSubs;
dispatch = event.dispatch;
message1 = event.message1;
message2 = event.message2;
message3 = event.message3;
switch (messageType) {
case MessageType.ONE: {
sync.publish(subs, superSubs, message1);
dispatch.publish(message1);
return;
}
case MessageType.TWO: {
sync.publish(subs, superSubs, message1, message2);
dispatch.publish(message1, message2);
return;
}
case MessageType.THREE: {
sync.publish(subs, superSubs, message1, message2, message3);
dispatch.publish(message1, message2, message3);
//noinspection UnnecessaryReturnStatement
return;
}
@ -156,16 +154,16 @@ class AsyncABQ implements Synchrony {
@Override
public
void publish(final Subscription[] subscriptions, final Subscription[] superSubscriptions, final Object message1) {
MessageHolder take = new MessageHolder();
void publish(final Dispatch dispatch, final Object message1) {
MessageHolder job = new MessageHolder();
take.type = MessageType.ONE;
take.subs = subscriptions;
take.superSubs = superSubscriptions;
take.message1 = message1;
job.type = MessageType.ONE;
job.dispatch = dispatch;
job.message1 = message1;
try {
this.dispatchQueue.put(take);
this.dispatchQueue.put(job);
} catch (InterruptedException e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Interrupted error during message queue.")
.setCause(e)
@ -175,17 +173,17 @@ class AsyncABQ implements Synchrony {
@Override
public
void publish(final Subscription[] subscriptions, final Subscription[] superSubscriptions, final Object message1, final Object message2) {
MessageHolder take = new MessageHolder();
void publish(final Dispatch dispatch, final Object message1, final Object message2) {
MessageHolder job = new MessageHolder();
take.type = MessageType.TWO;
take.subs = subscriptions;
take.superSubs = superSubscriptions;
take.message1 = message1;
take.message2 = message2;
job.type = MessageType.TWO;
job.dispatch = dispatch;
job.message1 = message1;
job.message2 = message2;
try {
this.dispatchQueue.put(take);
this.dispatchQueue.put(job);
} catch (InterruptedException e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Interrupted error during message queue.")
.setCause(e)
@ -195,18 +193,18 @@ class AsyncABQ implements Synchrony {
@Override
public
void publish(final Subscription[] subscriptions, final Subscription[] superSubscriptions, final Object message1, final Object message2, final Object message3) {
MessageHolder take = new MessageHolder();
void publish(final Dispatch dispatch, final Object message1, final Object message2, final Object message3) {
MessageHolder job = new MessageHolder();
take.type = MessageType.THREE;
take.subs = subscriptions;
take.superSubs = superSubscriptions;
take.message1 = message1;
take.message2 = message2;
take.message3 = message3;
job.type = MessageType.THREE;
job.dispatch = dispatch;
job.message1 = message1;
job.message2 = message2;
job.message3 = message3;
try {
this.dispatchQueue.put(take);
this.dispatchQueue.put(job);
} catch (InterruptedException e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Interrupted error during message queue.")
.setCause(e)

View File

@ -18,7 +18,7 @@ package dorkbox.messagebus.synchrony;
import dorkbox.messagebus.common.NamedThreadFactory;
import dorkbox.messagebus.error.ErrorHandler;
import dorkbox.messagebus.error.PublicationError;
import dorkbox.messagebus.subscription.Subscription;
import dorkbox.messagebus.dispatch.Dispatch;
import dorkbox.messagebus.synchrony.disruptor.MessageType;
import java.util.ArrayDeque;
@ -82,7 +82,7 @@ class AsyncABQ_noGc implements Synchrony {
final ErrorHandler errorHandler1 = errorHandler;
while (!AsyncABQ_noGc.this.shuttingDown) {
process(IN_QUEUE, OUT_QUEUE, syncPublication1, errorHandler1);
process(IN_QUEUE, OUT_QUEUE, errorHandler1);
}
synchronized (shutdown) {
@ -105,15 +105,12 @@ class AsyncABQ_noGc implements Synchrony {
@SuppressWarnings("Duplicates")
private
void process(final ArrayBlockingQueue<MessageHolder> queue,
final ArrayBlockingQueue<MessageHolder> gcQueue,
final Synchrony sync,
final ErrorHandler errorHandler) {
final ArrayBlockingQueue<MessageHolder> gcQueue, final ErrorHandler errorHandler) {
MessageHolder event;
int messageType = MessageType.ONE;
Subscription[] subs;
Subscription[] superSubs;
Dispatch dispatch;
Object message1 = null;
Object message2 = null;
Object message3 = null;
@ -122,8 +119,7 @@ class AsyncABQ_noGc implements Synchrony {
event = queue.take();
messageType = event.type;
subs = event.subs;
superSubs = event.superSubs;
dispatch = event.dispatch;
message1 = event.message1;
message2 = event.message2;
message3 = event.message3;
@ -132,15 +128,15 @@ class AsyncABQ_noGc implements Synchrony {
switch (messageType) {
case MessageType.ONE: {
sync.publish(subs, superSubs, message1);
dispatch.publish(message1);
return;
}
case MessageType.TWO: {
sync.publish(subs, superSubs, message1, message2);
dispatch.publish(message1, message2);
return;
}
case MessageType.THREE: {
sync.publish(subs, superSubs, message1, message2, message3);
dispatch.publish(message1, message2, message3);
//noinspection UnnecessaryReturnStatement
return;
}
@ -174,15 +170,16 @@ class AsyncABQ_noGc implements Synchrony {
@Override
public
void publish(final Subscription[] subscriptions, final Subscription[] superSubscriptions, final Object message1) {
void publish(final Dispatch dispatch, final Object message1) {
try {
MessageHolder take = gcQueue.take();
MessageHolder job = gcQueue.take();
take.type = MessageType.ONE;
take.subs = subscriptions;
take.message1 = message1;
job.type = MessageType.ONE;
job.dispatch = dispatch;
this.dispatchQueue.put(take);
job.message1 = message1;
this.dispatchQueue.put(job);
} catch (InterruptedException e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Interrupted error during message queue.")
.setCause(e)
@ -192,17 +189,17 @@ class AsyncABQ_noGc implements Synchrony {
@Override
public
void publish(final Subscription[] subscriptions, final Subscription[] superSubscriptions, final Object message1, final Object message2) {
void publish(final Dispatch dispatch, final Object message1, final Object message2) {
try {
MessageHolder take = gcQueue.take();
MessageHolder job = gcQueue.take();
take.type = MessageType.TWO;
take.subs = subscriptions;
take.superSubs = superSubscriptions;
take.message1 = message1;
take.message2 = message2;
job.type = MessageType.TWO;
job.dispatch = dispatch;
this.dispatchQueue.put(take);
job.message1 = message1;
job.message2 = message2;
this.dispatchQueue.put(job);
} catch (InterruptedException e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Interrupted error during message queue.")
.setCause(e)
@ -212,18 +209,18 @@ class AsyncABQ_noGc implements Synchrony {
@Override
public
void publish(final Subscription[] subscriptions, final Subscription[] superSubscriptions, final Object message1, final Object message2, final Object message3) {
void publish(final Dispatch dispatch, final Object message1, final Object message2, final Object message3) {
try {
MessageHolder take = gcQueue.take();
MessageHolder job = gcQueue.take();
take.type = MessageType.THREE;
take.subs = subscriptions;
take.superSubs = superSubscriptions;
take.message1 = message1;
take.message2 = message2;
take.message3 = message3;
job.type = MessageType.THREE;
job.dispatch = dispatch;
this.dispatchQueue.put(take);
job.message1 = message1;
job.message2 = message2;
job.message3 = message3;
this.dispatchQueue.put(job);
} catch (InterruptedException e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Interrupted error during message queue.")
.setCause(e)

View File

@ -25,7 +25,8 @@ import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.WorkProcessor;
import dorkbox.messagebus.common.NamedThreadFactory;
import dorkbox.messagebus.error.ErrorHandler;
import dorkbox.messagebus.subscription.Subscription;
import dorkbox.messagebus.dispatch.Dispatch;
import dorkbox.messagebus.subscription.SubscriptionManager;
import dorkbox.messagebus.synchrony.disruptor.EventBusFactory;
import dorkbox.messagebus.synchrony.disruptor.MessageHandler;
import dorkbox.messagebus.synchrony.disruptor.MessageType;
@ -53,7 +54,7 @@ class AsyncDisruptor implements Synchrony {
private final Sequence workSequence;
public
AsyncDisruptor(final int numberOfThreads, final ErrorHandler errorHandler, final Synchrony syncPublication) {
AsyncDisruptor(final int numberOfThreads, final ErrorHandler errorHandler, final Synchrony syncPublication, final SubscriptionManager subManager) {
// Now we setup the disruptor and work handlers
ExecutorService executor = new ThreadPoolExecutor(numberOfThreads, numberOfThreads,
@ -67,7 +68,7 @@ class AsyncDisruptor implements Synchrony {
// setup the work handlers
handlers = new MessageHandler[numberOfThreads];
for (int i = 0; i < handlers.length; i++) {
handlers[i] = new MessageHandler(syncPublication); // exactly one per thread is used
handlers[i] = new MessageHandler(); // exactly one per thread is used
}
@ -126,13 +127,14 @@ class AsyncDisruptor implements Synchrony {
@Override
public
void publish(final Subscription[] subscriptions, final Subscription[] superSubscriptions, final Object message1) {
void publish(final Dispatch dispatch, final Object message1) {
long seq = ringBuffer.next();
MessageHolder job = ringBuffer.get(seq);
job.type = MessageType.ONE;
job.subs = subscriptions;
job.superSubs = superSubscriptions;
job.dispatch = dispatch;
job.message1 = message1;
ringBuffer.publish(seq);
@ -140,13 +142,14 @@ class AsyncDisruptor implements Synchrony {
@Override
public
void publish(final Subscription[] subscriptions, final Subscription[] superSubscriptions, final Object message1, final Object message2) {
void publish(final Dispatch dispatch, final Object message1, final Object message2) {
long seq = ringBuffer.next();
MessageHolder job = ringBuffer.get(seq);
job.type = MessageType.TWO;
job.subs = subscriptions;
job.superSubs = superSubscriptions;
job.dispatch = dispatch;
job.message1 = message1;
job.message2 = message2;
@ -155,13 +158,14 @@ class AsyncDisruptor implements Synchrony {
@Override
public
void publish(final Subscription[] subscriptions, final Subscription[] superSubscriptions, final Object message1, final Object message2, final Object message3) {
void publish(final Dispatch dispatch, final Object message1, final Object message2, final Object message3) {
long seq = ringBuffer.next();
MessageHolder job = ringBuffer.get(seq);
job.type = MessageType.THREE;
job.subs = subscriptions;
job.superSubs = superSubscriptions;
job.dispatch = dispatch;
job.message1 = message1;
job.message3 = message2;
job.message2 = message3;

View File

@ -15,7 +15,7 @@
*/
package dorkbox.messagebus.synchrony;
import dorkbox.messagebus.subscription.Subscription;
import dorkbox.messagebus.dispatch.Dispatch;
import dorkbox.messagebus.synchrony.disruptor.MessageType;
/**
@ -24,8 +24,7 @@ import dorkbox.messagebus.synchrony.disruptor.MessageType;
public
class MessageHolder {
public int type = MessageType.ONE;
public Subscription[] subs;
public Subscription[] superSubs;
public Dispatch dispatch = null;
public Object message1 = null;
public Object message2 = null;

View File

@ -15,155 +15,35 @@
*/
package dorkbox.messagebus.synchrony;
import dorkbox.messagebus.error.DeadMessage;
import dorkbox.messagebus.error.ErrorHandler;
import dorkbox.messagebus.subscription.Subscription;
import dorkbox.messagebus.subscription.SubscriptionManager;
import dorkbox.messagebus.dispatch.Dispatch;
/**
* By default, it is the calling thread that has to get the subscriptions, which the sync/async logic then uses.
*
* The exception to this rule is when checking/calling DeadMessage publication.
*
* @author dorkbox, llc Date: 2/2/15
*/
@SuppressWarnings("Duplicates")
public final
class Sync implements Synchrony {
private final ErrorHandler errorHandler;
private final SubscriptionManager subManager;
public
Sync(final ErrorHandler errorHandler, final SubscriptionManager subManager) {
this.errorHandler = errorHandler;
this.subManager = subManager;
Sync() {
}
public
void publish(final Subscription[] subscriptions, final Subscription[] superSubscriptions, final Object message1) {
final ErrorHandler errorHandler = this.errorHandler;
Subscription sub;
int subLength;
boolean hasSubs = false;
// Run subscriptions. if the subscriptions are NULL or length == 0, it means we don't have any that were ever subscribed.
if (subscriptions != null && (subLength = subscriptions.length) > 0) {
// even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point.
// so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered
for (int i = 0; i < subLength; i++) {
sub = subscriptions[i];
hasSubs |= sub.publish(errorHandler, message1);
}
}
// the only time superSubscriptions is NULL, is if we are not publishing with superSubscriptions (otherwise it is non-NULL)
if (superSubscriptions != null && (subLength = superSubscriptions.length) > 0) {
// even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point.
// so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered
for (int i = 0; i < subLength; i++) {
sub = superSubscriptions[i];
hasSubs |= sub.publish(errorHandler, message1);
}
}
if (!hasSubs) {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1);
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(errorHandler, deadMessage);
}
}
}
void publish(final Dispatch dispatch, final Object message1) {
dispatch.publish(message1);
}
@Override
public
void publish(final Subscription[] subscriptions, Subscription[] superSubscriptions, final Object message1, final Object message2) {
final ErrorHandler errorHandler = this.errorHandler;
Subscription sub;
int subLength;
boolean hasSubs = false;
// Run subscriptions. if the subscriptions are NULL or length == 0, it means we don't have any that were ever subscribed.
if (subscriptions != null && (subLength = subscriptions.length) > 0) {
// even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point.
// so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered
for (int i = 0; i < subLength; i++) {
sub = subscriptions[i];
hasSubs |= sub.publish(errorHandler, message1, message2);
}
}
// the only time superSubscriptions is NULL, is if we are not publishing with superSubscriptions (otherwise it is non-NULL)
if (superSubscriptions != null && (subLength = superSubscriptions.length) > 0) {
// even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point.
// so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered
for (int i = 0; i < subLength; i++) {
sub = superSubscriptions[i];
hasSubs |= sub.publish(errorHandler, message1, message2);
}
}
if (!hasSubs) {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2);
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(errorHandler, deadMessage);
}
}
}
void publish(final Dispatch dispatch, final Object message1, final Object message2) {
dispatch.publish(message1, message2);
}
@Override
public
void publish(final Subscription[] subscriptions, Subscription[] superSubscriptions, final Object message1, final Object message2, final Object message3) {
final ErrorHandler errorHandler = this.errorHandler;
Subscription sub;
int subLength;
boolean hasSubs = false;
// Run subscriptions. if the subscriptions are NULL or length == 0, it means we don't have any that were ever subscribed.
if (subscriptions != null && (subLength = subscriptions.length) > 0) {
// even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point.
// so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered
for (int i = 0; i < subLength; i++) {
sub = subscriptions[i];
hasSubs |= sub.publish(errorHandler, message1, message2, message3);
}
}
// the only time superSubscriptions is NULL, is if we are not publishing with superSubscriptions (otherwise it is non-NULL)
if (superSubscriptions != null && (subLength = superSubscriptions.length) > 0) {
// even though they are non-null, and have length > 0 --- it is still possible the subscription was REMOVED at some point.
// so there won't be any object/method this publishes to AND there won't be any "dead messages" triggered
for (int i = 0; i < subLength; i++) {
sub = superSubscriptions[i];
hasSubs |= sub.publish(errorHandler, message1, message2, message3);
}
}
if (!hasSubs) {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subManager.getSubs(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2, message3);
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(errorHandler, deadMessage);
}
}
}
void publish(final Dispatch dispatch, final Object message1, final Object message2, final Object message3) {
dispatch.publish(message1, message2, message3);
}
@Override

View File

@ -15,16 +15,16 @@
*/
package dorkbox.messagebus.synchrony;
import dorkbox.messagebus.subscription.Subscription;
import dorkbox.messagebus.dispatch.Dispatch;
/**
* @author dorkbox, llc Date: 2/3/16
*/
public
interface Synchrony {
void publish(Subscription[] subscriptions, Subscription[] superSubscriptions, Object message1);
void publish(Subscription[] subscriptions, Subscription[] superSubscriptions, Object message1, Object message2);
void publish(Subscription[] subscriptions, Subscription[] superSubscriptions, Object message1, Object message2, Object message3);
void publish(Dispatch dispatch, Object message1);
void publish(Dispatch dispatch, Object message1, Object message2);
void publish(Dispatch dispatch, Object message1, Object message2, Object message3);
void shutdown();
boolean hasPendingMessages();

View File

@ -17,9 +17,7 @@ package dorkbox.messagebus.synchrony.disruptor;
import com.lmax.disruptor.LifecycleAware;
import com.lmax.disruptor.WorkHandler;
import dorkbox.messagebus.subscription.Subscription;
import dorkbox.messagebus.synchrony.MessageHolder;
import dorkbox.messagebus.synchrony.Synchrony;
import java.util.concurrent.atomic.AtomicBoolean;
@ -29,34 +27,28 @@ import java.util.concurrent.atomic.AtomicBoolean;
public
class MessageHandler implements WorkHandler<MessageHolder>, LifecycleAware {
private final Synchrony syncPublication;
private final AtomicBoolean shutdown = new AtomicBoolean(false);
public
MessageHandler(final Synchrony syncPublication) {
this.syncPublication = syncPublication;
MessageHandler() {
}
@SuppressWarnings("Duplicates")
@Override
public
void onEvent(final MessageHolder event) throws Exception {
final int messageType = event.type;
final Subscription[] subs = event.subs;
final Subscription[] superSubs = event.superSubs;
switch (messageType) {
case MessageType.ONE: {
syncPublication.publish(subs, superSubs, event.message1);
event.dispatch.publish(event.message1);
return;
}
case MessageType.TWO: {
syncPublication.publish(subs, superSubs, event.message1, event.message2);
event.dispatch.publish(event.message1, event.message2);
return;
}
case MessageType.THREE: {
syncPublication.publish(subs, superSubs, event.message1, event.message2, event.message3);
event.dispatch.publish(event.message1, event.message2, event.message3);
//noinspection UnnecessaryReturnStatement
return;
}

View File

@ -22,7 +22,7 @@ public class MultiMessageTest extends MessageBusTest {
@Test
public void testMultiMessageSendingExact() {
IMessageBus bus = new MessageBus(IMessageBus.PublishMode.Exact,
IMessageBus bus = new MessageBus(IMessageBus.DispatchMode.Exact,
Runtime.getRuntime()
.availableProcessors() / 2);
MultiListener listener1 = new MultiListener();
@ -53,7 +53,7 @@ public class MultiMessageTest extends MessageBusTest {
@Test
public void testMultiMessageSendingExactAndSuper() {
IMessageBus bus = new MessageBus(IMessageBus.PublishMode.ExactWithSuperTypes,
IMessageBus bus = new MessageBus(IMessageBus.DispatchMode.ExactWithSuperTypes,
Runtime.getRuntime()
.availableProcessors() / 2);
MultiListener listener1 = new MultiListener();