WIP getting 2/3 super vararg implemented

This commit is contained in:
nathan 2015-06-07 00:55:32 +02:00
parent cc9cb15440
commit 56728b327d
9 changed files with 430 additions and 271 deletions

View File

@ -60,7 +60,7 @@ import dorkbox.util.messagebus.error.ErrorHandlingSupport;
public interface IMessageBus extends PubSubSupport, ErrorHandlingSupport {
enum Mode {
enum PublishMode {
/**
* Will only publish to listeners with this exact message signature. This is the fastest
*/

View File

@ -0,0 +1,27 @@
package dorkbox.util.messagebus;
import dorkbox.util.messagebus.subscription.Matcher;
import dorkbox.util.messagebus.subscription.SubscriptionManager;
public class MatcherAll implements Matcher {
@Override
public void publish(final SubscriptionManager subscriptionManager, final Object message1) throws Throwable {
subscriptionManager.publishAll(message1);
}
@Override
public void publish(final SubscriptionManager subscriptionManager, final Object message1, final Object message2) throws Throwable {
subscriptionManager.publishAll(message1, message2);
}
@Override
public void publish(final SubscriptionManager subscriptionManager, final Object message1, final Object message2, final Object message3)
throws Throwable {
subscriptionManager.publishAll(message1, message2, message3);
}
@Override
public void publish(final SubscriptionManager subscriptionManager, final Object[] messages) throws Throwable {
subscriptionManager.publishAll(messages);
}
}

View File

@ -0,0 +1,27 @@
package dorkbox.util.messagebus;
import dorkbox.util.messagebus.subscription.Matcher;
import dorkbox.util.messagebus.subscription.SubscriptionManager;
public class MatcherExact implements Matcher {
@Override
public void publish(final SubscriptionManager subscriptionManager, final Object message1) throws Throwable {
subscriptionManager.publishExact(message1);
}
@Override
public void publish(final SubscriptionManager subscriptionManager, final Object message1, final Object message2) throws Throwable {
subscriptionManager.publishExact(message1, message2);
}
@Override
public void publish(final SubscriptionManager subscriptionManager, final Object message1, final Object message2, final Object message3)
throws Throwable {
subscriptionManager.publishExact(message1, message2, message3);
}
@Override
public void publish(final SubscriptionManager subscriptionManager, final Object[] messages) throws Throwable {
subscriptionManager.publishExact(messages);
}
}

View File

@ -0,0 +1,27 @@
package dorkbox.util.messagebus;
import dorkbox.util.messagebus.subscription.Matcher;
import dorkbox.util.messagebus.subscription.SubscriptionManager;
public class MatcherExactWithSuperTypes implements Matcher {
@Override
public void publish(final SubscriptionManager subscriptionManager, final Object message1) throws Throwable {
subscriptionManager.publishExactAndSuper(message1);
}
@Override
public void publish(final SubscriptionManager subscriptionManager, final Object message1, final Object message2) throws Throwable {
subscriptionManager.publishExactAndSuper(message1, message2);
}
@Override
public void publish(final SubscriptionManager subscriptionManager, final Object message1, final Object message2, final Object message3)
throws Throwable {
subscriptionManager.publishExactAndSuper(message1, message2, message3);
}
@Override
public void publish(final SubscriptionManager subscriptionManager, final Object[] messages) throws Throwable {
subscriptionManager.publishExactAndSuper(messages);
}
}

View File

@ -51,14 +51,14 @@ public class MultiMBassador implements IMessageBus {
* @param numberOfThreads how many threads to have for dispatching async messages
*/
public MultiMBassador(int numberOfThreads) {
this(Mode.ExactWithSuperTypes, numberOfThreads);
this(PublishMode.ExactWithSuperTypesAndVarArgs, numberOfThreads);
}
/**
* @param mode Specifies which mode to operate the publication of messages.
* @param publishMode Specifies which publishMode to operate the publication of messages.
* @param numberOfThreads how many threads to have for dispatching async messages
*/
public MultiMBassador(Mode mode, int numberOfThreads) {
public MultiMBassador(final PublishMode publishMode, int numberOfThreads) {
if (numberOfThreads < 2) {
numberOfThreads = 2; // at LEAST 2 threads
}
@ -66,80 +66,16 @@ public class MultiMBassador implements IMessageBus {
this.dispatchQueue = new MpmcMultiTransferArrayQueue(numberOfThreads);
this.subscriptionManager = new SubscriptionManager(numberOfThreads);
switch (mode) {
switch (publishMode) {
case Exact:
subscriptionMatcher = new Matcher() {
@Override
public void publish(final Object message1) throws Throwable {
subscriptionManager.publishExact(message1);
}
@Override
public void publish(final Object message1, final Object message2) throws Throwable {
subscriptionManager.publishExact(message1, message2);
}
@Override
public void publish(final Object message1, final Object message2, final Object message3) throws Throwable {
subscriptionManager.publishExact(message1, message2, message3);
}
@Override
public void publish(final Object[] messages) throws Throwable {
subscriptionManager.publishExact(messages);
}
};
subscriptionMatcher = new MatcherExact();
break;
case ExactWithSuperTypes:
subscriptionMatcher = new Matcher() {
@Override
public void publish(final Object message1) throws Throwable {
subscriptionManager.publishExactAndSuper(message1);
}
@Override
public void publish(final Object message1, final Object message2) throws Throwable {
subscriptionManager.publishExactAndSuper(message1, message2);
}
@Override
public void publish(final Object message1, final Object message2, final Object message3) throws Throwable {
subscriptionManager.publishExactAndSuper(message1, message2, message3);
}
@Override
public void publish(final Object[] messages) throws Throwable {
subscriptionManager.publishExactAndSuper(messages);
}
};
subscriptionMatcher = new MatcherExactWithSuperTypes();
break;
case ExactWithSuperTypesAndVarArgs:
default:
subscriptionMatcher = new Matcher() {
@Override
public void publish(final Object message1) throws Throwable {
subscriptionManager.publishAll(message1);
}
@Override
public void publish(final Object message1, final Object message2) throws Throwable {
// we don't support var-args for multiple messages (var-args can only be a single type)
subscriptionManager.publishExactAndSuper(message1, message2);
}
@Override
public void publish(final Object message1, final Object message2, final Object message3) throws Throwable {
// we don't support var-args for multiple messages (var-args can only be a single type)
subscriptionManager.publishExactAndSuper(message1, message2, message3);
}
@Override
public void publish(final Object[] messages) throws Throwable {
// we don't support var-args for multiple messages (var-args can only be a single type)
subscriptionManager.publishExactAndSuper(messages);
}
};
subscriptionMatcher = new MatcherAll();
}
this.threads = new ArrayDeque<Thread>(numberOfThreads);
@ -191,11 +127,17 @@ public class MultiMBassador implements IMessageBus {
.setCause(e).setPublishedObject(node.item1, node.item2));
break;
}
default: {
case 3: {
handlePublicationError(
new PublicationError().setMessage("Thread interrupted while processing message")
.setCause(e)
.setPublishedObject(node.item1, node.item2, node.item3));
break;
}
default: {
handlePublicationError(
new PublicationError().setMessage("Thread interrupted while processing message")
.setCause(e).setPublishedObject(node.item1));
}
}
}
@ -265,7 +207,7 @@ public class MultiMBassador implements IMessageBus {
@Override
public void publish(final Object message) {
try {
subscriptionMatcher.publish(message);
subscriptionMatcher.publish(subscriptionManager, message);
} catch (Throwable e) {
handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.").setCause(e)
.setPublishedObject(message));
@ -275,7 +217,7 @@ public class MultiMBassador implements IMessageBus {
@Override
public void publish(final Object message1, final Object message2) {
try {
subscriptionMatcher.publish(message1, message2);
subscriptionMatcher.publish(subscriptionManager, message1, message2);
} catch (Throwable e) {
handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.").setCause(e)
.setPublishedObject(message1, message2));
@ -285,7 +227,7 @@ public class MultiMBassador implements IMessageBus {
@Override
public void publish(final Object message1, final Object message2, final Object message3) {
try {
subscriptionMatcher.publish(message1, message2, message3);
subscriptionMatcher.publish(subscriptionManager, message1, message2, message3);
} catch (Throwable e) {
handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.").setCause(e)
.setPublishedObject(message1, message2, message3));
@ -295,7 +237,7 @@ public class MultiMBassador implements IMessageBus {
@Override
public void publish(final Object[] messages) {
try {
subscriptionMatcher.publish(messages);
subscriptionMatcher.publish(subscriptionManager, messages);
} catch (Throwable e) {
handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.").setCause(e)
.setPublishedObject(messages));

View File

@ -1,18 +1,16 @@
package dorkbox.util.messagebus.common;
import dorkbox.util.messagebus.common.thread.ConcurrentSet;
import dorkbox.util.messagebus.common.thread.SubscriptionHolder;
import dorkbox.util.messagebus.subscription.Subscription;
import dorkbox.util.messagebus.subscription.SubscriptionUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class VarArgUtils {
public final class VarArgUtils {
private final Map<Class<?>, ArrayList<Subscription>> varArgSubscriptions;
private final Map<Class<?>, List<Subscription>> varArgSuperClassSubscriptions;
private final HashMapTree<Class<?>, ConcurrentSet<Subscription>> varArgSuperClassSubscriptionsMulti;
private final Map<Class<?>, ArrayList<Subscription>> varArgSuperClassSubscriptions;
private final HashMapTree<Class<?>, ArrayList<Subscription>> varArgSuperClassSubscriptionsMulti;
private final SubscriptionHolder subHolderConcurrent;
@ -25,8 +23,7 @@ public class VarArgUtils {
public VarArgUtils(SubscriptionUtils utils, SuperClassUtils superClassUtils,
Map<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageSingle, float loadFactor,
int stripeSize) {
Map<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageSingle, float loadFactor, int stripeSize) {
this.utils = utils;
this.superClassUtils = superClassUtils;
@ -35,8 +32,8 @@ public class VarArgUtils {
this.stripeSize = stripeSize;
this.varArgSubscriptions = new ConcurrentHashMapV8<Class<?>, ArrayList<Subscription>>(16, loadFactor, stripeSize);
this.varArgSuperClassSubscriptions = new ConcurrentHashMapV8<Class<?>, List<Subscription>>(16, loadFactor, stripeSize);
this.varArgSuperClassSubscriptionsMulti = new HashMapTree<Class<?>, ConcurrentSet<Subscription>>(4, loadFactor);
this.varArgSuperClassSubscriptions = new ConcurrentHashMapV8<Class<?>, ArrayList<Subscription>>(16, loadFactor, stripeSize);
this.varArgSuperClassSubscriptionsMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>(4, loadFactor);
this.subHolderConcurrent = new SubscriptionHolder();
}
@ -53,7 +50,8 @@ public class VarArgUtils {
// check to see if the messageType can convert/publish to the "array" version, without the hit to JNI
// and then, returns the array'd version subscriptions
public Subscription[] getVarArgSubscriptions(Class<?> messageClass) {
Map<Class<?>, ArrayList<Subscription>> local = this.varArgSubscriptions;
// whenever our subscriptions change, this map is cleared.
final Map<Class<?>, ArrayList<Subscription>> local = this.varArgSubscriptions;
ArrayList<Subscription> varArgSubs = local.get(messageClass);
@ -63,7 +61,7 @@ public class VarArgUtils {
ArrayList<Subscription> subs = this.subscriptionsPerMessageSingle.get(arrayVersion);
if (subs != null) {
int length = subs.size();
final int length = subs.size();
varArgSubs = new ArrayList<Subscription>(length);
Subscription sub;
@ -79,194 +77,163 @@ public class VarArgUtils {
}
}
// return varArgSubs;
return null;
final Subscription[] subscriptions = new Subscription[varArgSubs.size()];
varArgSubs.toArray(subscriptions);
return subscriptions;
}
// CAN NOT RETURN NULL
// check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI
// and then, returns the array'd version superclass subscriptions
public Subscription[] getVarArgSuperSubscriptions(final Class<?> messageClass) {
final ArrayList<Subscription> subs = getVarArgSuperSubscriptions_List(messageClass);
final Subscription[] subscriptions = new Subscription[subs.size()];
subs.toArray(subscriptions);
return subscriptions;
}
private ArrayList<Subscription> getVarArgSuperSubscriptions_List(final Class<?> messageClass) {
// whenever our subscriptions change, this map is cleared.
// SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent;
// ConcurrentSet<Subscription> subsPerType = subHolderConcurrent.publish();
//
// // cache our subscriptions for super classes, so that their access can be fast!
// ConcurrentSet<Subscription> putIfAbsent = local.putIfAbsent(messageClass, subsPerType);
// if (putIfAbsent == null) {
// // we are the first one in the map
// subHolderConcurrent.set(subHolderConcurrent.initialValue());
//
//
// Iterator<Subscription> iterator;
// Subscription sub;
//
// Collection<Subscription> subs = this.subscriptionsPerMessageSingle.publish(arrayVersion);
// if (subs != null) {
// for (iterator = subs.iterator(); iterator.hasNext();) {
// sub = iterator.next();
// if (sub.acceptsVarArgs()) {
// subsPerType.add(sub);
// }
// }
// }
// return subsPerType;
// } else {
// // someone beat us
// return putIfAbsent;
// }
final Map<Class<?>, ArrayList<Subscription>> local = this.varArgSuperClassSubscriptions;
// return null;
}
ArrayList<Subscription> varArgSuperSubs = local.get(messageClass);
if (varArgSuperSubs == null) {
// this gets (and caches) our array type. This is never cleared.
final Class<?> arrayVersion = this.superClassUtils.getArrayClass(messageClass);
final Class<?>[] types = this.superClassUtils.getSuperClasses(arrayVersion);
final int typesLength = types.length;
varArgSuperSubs = new ArrayList<Subscription>(typesLength);
if (typesLength == 0) {
local.put(messageClass, varArgSuperSubs);
return varArgSuperSubs;
}
Class<?> type;
Subscription sub;
ArrayList<Subscription> subs;
int length;
// CAN RETURN NULL
// check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI
// and then, returns the array'd version subscriptions
public Subscription[] getVarArgSuperSubscriptions(Class<?> messageClass) {
// // whenever our subscriptions change, this map is cleared.
// ConcurrentMap<Class<?>, ConcurrentSet<Subscription>> local = this.varArgSuperClassSubscriptions;
//
// SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent;
// ConcurrentSet<Subscription> subsPerType = subHolderConcurrent.publish();
//
// // cache our subscriptions for super classes, so that their access can be fast!
// ConcurrentSet<Subscription> putIfAbsent = local.putIfAbsent(messageClass, subsPerType);
//
// if (putIfAbsent == null) {
// // we are the first one in the map
// subHolderConcurrent.set(subHolderConcurrent.initialValue());
//
// Class<?> arrayVersion = this.utils.getArrayClass(messageClass);
// Collection<Class<?>> types = this.utils.getSuperClasses(arrayVersion, true);
// if (types.isEmpty()) {
// return subsPerType;
// }
//
// Map<Class<?>, Collection<Subscription>> local2 = this.subscriptionsPerMessageSingle;
//
// Iterator<Class<?>> iterator;
// Class<?> superClass;
//
// Iterator<Subscription> subIterator;
// Subscription sub;
//
//
// for (iterator = types.iterator(); iterator.hasNext();) {
// superClass = iterator.next();
//
// Collection<Subscription> subs = local2.publish(superClass);
// if (subs != null) {
// for (subIterator = subs.iterator(); subIterator.hasNext();) {
// sub = subIterator.next();
// if (sub.acceptsSubtypes() && sub.acceptsVarArgs()) {
// subsPerType.add(sub);
// }
// }
// }
// }
// return subsPerType;
// } else {
// // someone beat us
// return putIfAbsent;
// }
for (int i = 0; i < typesLength; i++) {
type = types[i];
subs = this.subscriptionsPerMessageSingle.get(type);
return null;
if (subs != null) {
length = subs.size();
varArgSuperSubs = new ArrayList<Subscription>(length);
for (int j = 0; j < length; j++) {
sub = subs.get(j);
if (sub.acceptsSubtypes() && sub.acceptsVarArgs()) {
varArgSuperSubs.add(sub);
}
}
}
}
local.put(messageClass, varArgSuperSubs);
}
return varArgSuperSubs;
}
// CAN NOT RETURN NULL
// check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI
// and then, returns the array'd version subscriptions
public ConcurrentSet<Subscription> getVarArgSuperSubscriptions(Class<?> messageClass1, Class<?> messageClass2) {
// HashMapTree<Class<?>, ConcurrentSet<Subscription>> local = this.varArgSuperClassSubscriptionsMulti;
//
// // whenever our subscriptions change, this map is cleared.
// HashMapTree<Class<?>, ConcurrentSet<Subscription>> subsPerTypeLeaf = local.getLeaf(messageClass1, messageClass2);
// ConcurrentSet<Subscription> subsPerType = null;
//
// // we DO NOT care about duplicate, because the answers will be the same
// if (subsPerTypeLeaf != null) {
// // if the leaf exists, then the value exists.
// subsPerType = subsPerTypeLeaf.getValue();
// } else {
// SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent;
// subsPerType = subHolderConcurrent.publish();
//
// ConcurrentSet<Subscription> putIfAbsent = local.putIfAbsent(subsPerType, messageClass1, messageClass2);
// if (putIfAbsent != null) {
// // someone beat us
// subsPerType = putIfAbsent;
// } else {
// // the message class types are not the same, so look for a common superClass varArg subscription.
// // this is to publish to object[] (or any class[]) handler that is common among all superTypes of the messages
// ConcurrentSet<Subscription> varargSuperSubscriptions1 = getVarArgSuperSubscriptions(messageClass1);
// ConcurrentSet<Subscription> varargSuperSubscriptions2 = getVarArgSuperSubscriptions(messageClass2);
//
// Iterator<Subscription> iterator;
// Subscription sub;
//
// for (iterator = varargSuperSubscriptions1.iterator(); iterator.hasNext();) {
// sub = iterator.next();
// if (varargSuperSubscriptions2.contains(sub)) {
// subsPerType.add(sub);
// }
// }
//
// subHolderConcurrent.set(subHolderConcurrent.initialValue());
// }
// }
//
// return subsPerType;
return null;
// and then, returns the array'd version superclass subscriptions
public Subscription[] getVarArgSuperSubscriptions(final Class<?> messageClass1, final Class<?> messageClass2) {
// whenever our subscriptions change, this map is cleared.
final HashMapTree<Class<?>, ArrayList<Subscription>> local = this.varArgSuperClassSubscriptionsMulti;
HashMapTree<Class<?>, ArrayList<Subscription>> subsPerTypeLeaf = local.getLeaf(messageClass1, messageClass2);
ArrayList<Subscription> subsPerType;
if (subsPerTypeLeaf != null) {
// if the leaf exists, then the value exists.
subsPerType = subsPerTypeLeaf.getValue();
}
else {
// the message class types are not the same, so look for a common superClass varArg subscription.
// this is to publish to object[] (or any class[]) handler that is common among all superTypes of the messages
final ArrayList<Subscription> varargSuperSubscriptions1 = getVarArgSuperSubscriptions_List(messageClass1);
final ArrayList<Subscription> varargSuperSubscriptions2 = getVarArgSuperSubscriptions_List(messageClass2);
final int size1 = varargSuperSubscriptions1.size();
final int size2 = varargSuperSubscriptions2.size();
subsPerType = new ArrayList<Subscription>(size1 + size2);
Subscription sub;
for (int i = 0; i < size1; i++) {
sub = varargSuperSubscriptions1.get(i);
if (varargSuperSubscriptions2.contains(sub)) {
subsPerType.add(sub);
}
}
subsPerType.trimToSize();
local.put(subsPerType, messageClass1, messageClass2);
}
final Subscription[] subscriptions = new Subscription[subsPerType.size()];
subsPerType.toArray(subscriptions);
return subscriptions;
}
// CAN NOT RETURN NULL
// check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI
// and then, returns the array'd version subscriptions
public ConcurrentSet<Subscription> getVarArgSuperSubscriptions(final Class<?> messageClass1, final Class<?> messageClass2,
final Class<?> messageClass3) {
// HashMapTree<Class<?>, ConcurrentSet<Subscription>> local = this.varArgSuperClassSubscriptionsMulti;
//
// // whenever our subscriptions change, this map is cleared.
// HashMapTree<Class<?>, ConcurrentSet<Subscription>> subsPerTypeLeaf = local.getLeaf(messageClass1, messageClass2, messageClass3);
// ConcurrentSet<Subscription> subsPerType = null;
//
// // we DO NOT care about duplicate, because the answers will be the same
// if (subsPerTypeLeaf != null) {
// // if the leaf exists, then the value exists.
// subsPerType = subsPerTypeLeaf.getValue();
// } else {
// SubscriptionHolder subHolderConcurrent = this.subHolderConcurrent;
// subsPerType = subHolderConcurrent.publish();
//
// ConcurrentSet<Subscription> putIfAbsent = local.putIfAbsent(subsPerType, messageClass1, messageClass2, messageClass3);
// if (putIfAbsent != null) {
// // someone beat us
// subsPerType = putIfAbsent;
// } else {
// // the message class types are not the same, so look for a common superClass varArg subscription.
// // this is to publish to object[] (or any class[]) handler that is common among all superTypes of the messages
// ConcurrentSet<Subscription> varargSuperSubscriptions1 = getVarArgSuperSubscriptions(messageClass1);
// ConcurrentSet<Subscription> varargSuperSubscriptions2 = getVarArgSuperSubscriptions(messageClass2);
// ConcurrentSet<Subscription> varargSuperSubscriptions3 = getVarArgSuperSubscriptions(messageClass3);
//
// Iterator<Subscription> iterator;
// Subscription sub;
//
// for (iterator = varargSuperSubscriptions1.iterator(); iterator.hasNext();) {
// sub = iterator.next();
// if (varargSuperSubscriptions2.contains(sub) && varargSuperSubscriptions3.contains(sub)) {
// subsPerType.add(sub);
// }
// }
//
// subHolderConcurrent.set(subHolderConcurrent.initialValue());
// }
// }
//
// return subsPerType;
// and then, returns the array'd version superclass subscriptions
public Subscription[] getVarArgSuperSubscriptions(final Class<?> messageClass1, final Class<?> messageClass2,
final Class<?> messageClass3) {
// whenever our subscriptions change, this map is cleared.
final HashMapTree<Class<?>, ArrayList<Subscription>> local = this.varArgSuperClassSubscriptionsMulti;
return null;
HashMapTree<Class<?>, ArrayList<Subscription>> subsPerTypeLeaf = local.getLeaf(messageClass1, messageClass2, messageClass3);
ArrayList<Subscription> subsPerType;
if (subsPerTypeLeaf != null) {
// if the leaf exists, then the value exists.
subsPerType = subsPerTypeLeaf.getValue();
}
else {
// the message class types are not the same, so look for a common superClass varArg subscription.
// this is to publish to object[] (or any class[]) handler that is common among all superTypes of the messages
final ArrayList<Subscription> varargSuperSubscriptions1 = getVarArgSuperSubscriptions_List(messageClass1);
final ArrayList<Subscription> varargSuperSubscriptions2 = getVarArgSuperSubscriptions_List(messageClass2);
final ArrayList<Subscription> varargSuperSubscriptions3 = getVarArgSuperSubscriptions_List(messageClass3);
final int size1 = varargSuperSubscriptions1.size();
final int size2 = varargSuperSubscriptions2.size();
subsPerType = new ArrayList<Subscription>(size1 + size2);
Subscription sub;
for (int i = 0; i < size1; i++) {
sub = varargSuperSubscriptions1.get(i);
if (varargSuperSubscriptions2.contains(sub) && varargSuperSubscriptions3.contains(sub)) {
subsPerType.add(sub);
}
}
subsPerType.trimToSize();
local.put(subsPerType, messageClass1, messageClass2, messageClass3);
}
final Subscription[] subscriptions = new Subscription[subsPerType.size()];
subsPerType.toArray(subscriptions);
return subscriptions;
}
}

View File

@ -1,11 +1,11 @@
package dorkbox.util.messagebus.subscription;
public interface Matcher {
void publish(Object message) throws Throwable;
void publish(SubscriptionManager subscriptionManager, Object message1) throws Throwable;
void publish(Object message1, Object message2) throws Throwable;
void publish(SubscriptionManager subscriptionManager, Object message1, Object message2) throws Throwable;
void publish(Object message1, Object message2, Object message3) throws Throwable;
void publish(SubscriptionManager subscriptionManager, Object message1, Object message2, Object message3) throws Throwable;
void publish(Object[] messages) throws Throwable;
void publish(SubscriptionManager subscriptionManager, Object[] messages) throws Throwable;
}

View File

@ -336,7 +336,7 @@ public final class SubscriptionManager {
}
}
else if (!superSubscriptions.isEmpty()) {
collection = superSubscriptions;
collection = superSubscriptions;
}
if (collection != null) {
@ -622,7 +622,7 @@ public final class SubscriptionManager {
for (int i = 0; i < length; i++) {
sub = varArgSubscriptions[i];
sub.publish(message);
sub.publish(asArray);
}
stamp = lock.readLock();
@ -672,6 +672,167 @@ public final class SubscriptionManager {
}
}
public void publishAll(final Object message1, final Object message2) throws Throwable {
final Class<?> messageClass1 = message1.getClass();
final Class<?> messageClass2 = message2.getClass();
final Subscription[] subscriptions = getSubscriptionsExactAndSuper(messageClass1, messageClass2); // can return null
// Run subscriptions
if (subscriptions != null) {
Subscription sub;
for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i];
sub.publish(message1, message2);
}
// publish to var arg, only if not already an array AND we are all of the same type
if (varArgPossibility.get() && messageClass1 == messageClass2 && !messageClass1.isArray()) {
long stamp = lock.readLock();
final Subscription[] varArgSubscriptions = varArgUtils.getVarArgSubscriptions(messageClass1); // can return null
lock.unlockRead(stamp);
if (varArgSubscriptions != null) {
final int length = varArgSubscriptions.length;
Object[] asArray = (Object[]) Array.newInstance(messageClass1, 2);
asArray[0] = message1;
asArray[1] = message2;
for (int i = 0; i < length; i++) {
sub = varArgSubscriptions[i];
sub.publish(asArray);
}
stamp = lock.readLock();
// now publish array based superClasses (but only if those ALSO accept vararg)
final Subscription[] varArgSuperSubscriptions = this.varArgUtils.getVarArgSuperSubscriptions(messageClass1);
lock.unlockRead(stamp);
if (varArgSuperSubscriptions != null) {
for (int i = 0; i < length; i++) {
sub = varArgSuperSubscriptions[i];
sub.publish(asArray);
}
}
}
else {
stamp = lock.readLock();
// now publish array based superClasses (but only if those ALSO accept vararg)
final Subscription[] varArgSuperSubscriptions = this.varArgUtils.getVarArgSuperSubscriptions(messageClass1);
lock.unlockRead(stamp);
if (varArgSuperSubscriptions != null) {
Object[] asArray = (Object[]) Array.newInstance(messageClass1, 2);
asArray[0] = message1;
asArray[1] = message2;
for (int i = 0; i < varArgSuperSubscriptions.length; i++) {
sub = varArgSuperSubscriptions[i];
sub.publish(asArray);
}
}
}
}
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = getSubscriptionsExact(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2);
Subscription sub;
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(deadMessage);
}
}
}
}
public void publishAll(final Object message1, final Object message2, final Object message3) throws Throwable {
final Class<?> messageClass1 = message1.getClass();
final Class<?> messageClass2 = message2.getClass();
final Class<?> messageClass3 = message3.getClass();
final Subscription[] subscriptions = getSubscriptionsExactAndSuper(messageClass1, messageClass2, messageClass3); // can return null
// Run subscriptions
if (subscriptions != null) {
Subscription sub;
for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i];
sub.publish(message1, message2, message3);
}
// publish to var arg, only if not already an array AND we are all of the same type
if (varArgPossibility.get() && messageClass1 == messageClass2 && messageClass1 == messageClass3 && !messageClass1.isArray()) {
long stamp = lock.readLock();
final Subscription[] varArgSubscriptions = varArgUtils.getVarArgSubscriptions(messageClass1); // can return null
lock.unlockRead(stamp);
if (varArgSubscriptions != null) {
final int length = varArgSubscriptions.length;
Object[] asArray = (Object[]) Array.newInstance(messageClass1, 3);
asArray[0] = message1;
asArray[1] = message2;
asArray[2] = message3;
for (int i = 0; i < length; i++) {
sub = varArgSubscriptions[i];
sub.publish(asArray);
}
stamp = lock.readLock();
// now publish array based superClasses (but only if those ALSO accept vararg)
final Subscription[] varArgSuperSubscriptions = this.varArgUtils.getVarArgSuperSubscriptions(messageClass1);
lock.unlockRead(stamp);
if (varArgSuperSubscriptions != null) {
for (int i = 0; i < length; i++) {
sub = varArgSuperSubscriptions[i];
sub.publish(asArray);
}
}
}
else {
stamp = lock.readLock();
// now publish array based superClasses (but only if those ALSO accept vararg)
final Subscription[] varArgSuperSubscriptions = this.varArgUtils.getVarArgSuperSubscriptions(messageClass1);
lock.unlockRead(stamp);
if (varArgSuperSubscriptions != null) {
Object[] asArray = (Object[]) Array.newInstance(messageClass1, 2);
asArray[0] = message1;
asArray[1] = message2;
asArray[2] = message3;
for (int i = 0; i < varArgSuperSubscriptions.length; i++) {
sub = varArgSuperSubscriptions[i];
sub.publish(asArray);
}
}
}
}
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = getSubscriptionsExact(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2, message3);
Subscription sub;
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(deadMessage);
}
}
}
}
// public static final Comparator<Subscription> SubscriptionByPriorityDesc = new Comparator<Subscription>() {
// @Override

View File

@ -3,12 +3,11 @@
*/
package dorkbox.util.messagebus;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.common.MessageBusTest;
import org.junit.Test;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author dorkbox, llc
@ -29,6 +28,7 @@ public class MultiMessageTest extends MessageBusTest {
bus.publish("s");
bus.publish("s", "s");
bus.publish("s", "s", "s");
bus.publish(1, "s");
bus.publish(1, 2, "s");
bus.publish(new Integer[] {1, 2, 3, 4, 5, 6});
@ -36,9 +36,10 @@ public class MultiMessageTest extends MessageBusTest {
bus.subscribe(listener1);
bus.publish("s");
bus.publish("s", "s");
bus.publish("s", "s", "s");
bus.publish("s"); // 4
bus.publish("s", "s"); // 3
bus.publish("s", "s", "s"); // 3
bus.publish(1, "s");
bus.publish(1, 2, "s");
bus.publish(new Integer[] {1, 2, 3, 4, 5, 6});
@ -49,6 +50,7 @@ public class MultiMessageTest extends MessageBusTest {
bus.publishAsync("s");
bus.publishAsync("s", "s");
bus.publishAsync("s", "s", "s");
bus.publish(1, "s");
bus.publishAsync(1, 2, "s");
bus.publishAsync(new Integer[] {1, 2, 3, 4, 5, 6});
@ -70,6 +72,12 @@ public class MultiMessageTest extends MessageBusTest {
@SuppressWarnings("unused")
public static class Listener {
@Handler
public void handleSync(Object o) {
count.getAndIncrement();
System.err.println("match Object");
}
@Handler
public void handleSync(String o1) {
count.getAndIncrement();