Moved project to dorkbox name. WIP multi-messages, and vararg/vararg-superclass

This commit is contained in:
nathan 2015-02-27 00:01:38 +01:00
parent 01f87da371
commit dd31c45dc3
80 changed files with 1787 additions and 1274 deletions

View File

@ -1,6 +1,6 @@
package net.engio.mbassy.multi;
package dorkbox.util.messagebus;
import net.engio.mbassy.multi.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
/**
* A message bus offers facilities for publishing messages to the message handlers of registered listeners.

View File

@ -1,19 +1,18 @@
package net.engio.mbassy.multi;
package dorkbox.util.messagebus;
import java.lang.reflect.Array;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import net.engio.mbassy.multi.common.DeadMessage;
import net.engio.mbassy.multi.common.LinkedTransferQueue;
import net.engio.mbassy.multi.common.NamedThreadFactory;
import net.engio.mbassy.multi.common.TransferQueue;
import net.engio.mbassy.multi.error.IPublicationErrorHandler;
import net.engio.mbassy.multi.error.PublicationError;
import net.engio.mbassy.multi.subscription.Subscription;
import dorkbox.util.messagebus.common.DeadMessage;
import dorkbox.util.messagebus.common.LinkedTransferQueue;
import dorkbox.util.messagebus.common.NamedThreadFactory;
import dorkbox.util.messagebus.common.TransferQueue;
import dorkbox.util.messagebus.error.IPublicationErrorHandler;
import dorkbox.util.messagebus.error.PublicationError;
import dorkbox.util.messagebus.subscription.Subscription;
/**
* The base class for all message bus implementations with support for asynchronous message dispatch
@ -43,7 +42,6 @@ public class MultiMBassador implements IMessageBus {
private final TransferQueue<Runnable> dispatchQueue = new LinkedTransferQueue<Runnable>();
private final SubscriptionManager subscriptionManager;
// all threads that are available for asynchronous message dispatching
@ -58,11 +56,17 @@ public class MultiMBassador implements IMessageBus {
private final boolean forceExactMatches = false;
/**
* By default, will permit subTypes and VarArg matching, and will use all CPUs available for dispatching async messages
*/
public MultiMBassador() {
this(Runtime.getRuntime().availableProcessors());
// this(4);
}
/**
* @param numberOfThreads how many threads to have for dispatching async messages
*/
public MultiMBassador(int numberOfThreads) {
this(false, numberOfThreads);
}
@ -134,8 +138,10 @@ public class MultiMBassador implements IMessageBus {
@Override
public final void handlePublicationError(PublicationError error) {
for (IPublicationErrorHandler errorHandler : this.errorHandlers) {
errorHandler.handleError(error);
synchronized (this.errorHandlers) {
for (IPublicationErrorHandler errorHandler : this.errorHandlers) {
errorHandler.handleError(error);
}
}
}
@ -149,7 +155,6 @@ public class MultiMBassador implements IMessageBus {
MultiMBassador.this.subscriptionManager.unsubscribe(listener);
}
@Override
public boolean hasPendingMessages() {
return this.dispatchQueue.getWaitingConsumerCount() != this.numberOfThreads;
@ -160,6 +165,7 @@ public class MultiMBassador implements IMessageBus {
for (Thread t : this.threads) {
t.interrupt();
}
this.subscriptionManager.shutdown();
}
@Override
@ -173,6 +179,7 @@ public class MultiMBassador implements IMessageBus {
// Run subscriptions
if (subscriptions != null && !subscriptions.isEmpty()) {
publishDeadSubs = false;
for (Subscription sub : subscriptions) {
// this catches all exception types
sub.publishToSubscription(this, message);
@ -184,11 +191,8 @@ public class MultiMBassador implements IMessageBus {
// now get superClasses
if (superSubscriptions != null && !superSubscriptions.isEmpty()) {
publishDeadSubs = false;
Iterator<Subscription> iterator = superSubscriptions.iterator();
while (iterator.hasNext()) {
Subscription sub = iterator.next();
for (Subscription sub : superSubscriptions) {
// this catches all exception types
sub.publishToSubscription(this, message);
}
@ -197,18 +201,32 @@ public class MultiMBassador implements IMessageBus {
// publish to var arg, only if not already an array
if (!messageClass.isArray()) {
Object[] asArray = null;
Collection<Subscription> varargSubscriptions = manager.getVarArgSubscriptions(messageClass);
if (varargSubscriptions != null && !varargSubscriptions.isEmpty()) {
publishDeadSubs = false;
Object[] asArray = (Object[]) Array.newInstance(messageClass, 1);
asArray = (Object[]) Array.newInstance(messageClass, 1);
asArray[0] = message;
Iterator<Subscription> iterator = varargSubscriptions.iterator();
for (Subscription sub : varargSubscriptions) {
// this catches all exception types
sub.publishToSubscription(this, asArray);
}
}
while (iterator.hasNext()) {
Subscription sub = iterator.next();
Collection<Subscription> varargSuperSubscriptions = manager.getVarArgSuperSubscriptions(messageClass);
// now get array based superClasses (but only if those ALSO accept vararg)
if (varargSuperSubscriptions != null && !varargSuperSubscriptions.isEmpty()) {
publishDeadSubs = false;
if (asArray == null) {
asArray = (Object[]) Array.newInstance(messageClass, 1);
asArray[0] = message;
}
for (Subscription sub : varargSuperSubscriptions) {
// this catches all exception types
sub.publishToSubscription(this, asArray);
}
@ -237,14 +255,68 @@ public class MultiMBassador implements IMessageBus {
Class<?> messageClass2 = message2.getClass();
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2);
boolean publishDeadSubs = true;
// Run subscriptions
if (subscriptions != null && !subscriptions.isEmpty()) {
publishDeadSubs = false;
for (Subscription sub : subscriptions) {
// this catches all exception types
sub.publishToSubscription(this, message1, message2);
}
} else {
}
if (!this.forceExactMatches) {
Collection<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass1, messageClass2);
// now get superClasses
if (superSubscriptions != null && !superSubscriptions.isEmpty()) {
publishDeadSubs = false;
for (Subscription sub : superSubscriptions) {
// this catches all exception types
sub.publishToSubscription(this, message1, message2);
}
}
// publish to var arg, only if not already an array
if (messageClass1 == messageClass2 && !messageClass1.isArray()) {
Object[] asArray = null;
Collection<Subscription> varargSubscriptions = manager.getVarArgSubscriptions(messageClass1);
if (varargSubscriptions != null && !varargSubscriptions.isEmpty()) {
publishDeadSubs = false;
asArray = (Object[]) Array.newInstance(messageClass1, 2);
asArray[0] = message1;
asArray[1] = message2;
for (Subscription sub : varargSubscriptions) {
// this catches all exception types
sub.publishToSubscription(this, asArray);
}
}
Collection<Subscription> varargSuperSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1);
// now get array based superClasses (but only if those ALSO accept vararg)
if (varargSuperSubscriptions != null && !varargSuperSubscriptions.isEmpty()) {
publishDeadSubs = false;
if (asArray == null) {
asArray = (Object[]) Array.newInstance(messageClass1, 2);
asArray[0] = message1;
asArray[1] = message2;
}
for (Subscription sub : varargSuperSubscriptions) {
// this catches all exception types
sub.publishToSubscription(this, asArray);
}
}
}
}
if (publishDeadSubs) {
// Dead Event must EXACTLY MATCH (no subclasses)
Collection<Subscription> deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) {
@ -255,20 +327,6 @@ public class MultiMBassador implements IMessageBus {
}
}
}
Collection<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass1, messageClass2);
// now get superClasses
if (superSubscriptions != null && !superSubscriptions.isEmpty()) {
Iterator<Subscription> fastIterator = superSubscriptions.iterator();
while (fastIterator.hasNext()) {
Subscription sub = fastIterator.next();
// this catches all exception types
sub.publishToSubscription(this, message1, message2);
}
}
}
@Override
@ -280,14 +338,68 @@ public class MultiMBassador implements IMessageBus {
Class<?> messageClass3 = message3.getClass();
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2, messageClass3);
boolean publishDeadSubs = true;
// Run subscriptions
if (subscriptions != null && !subscriptions.isEmpty()) {
publishDeadSubs = false;
for (Subscription sub : subscriptions) {
// this catches all exception types
sub.publishToSubscription(this, message1, message2, message3);
}
} else {
}
if (!this.forceExactMatches) {
Collection<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass1, messageClass2, messageClass3);
// now get superClasses
if (superSubscriptions != null && !superSubscriptions.isEmpty()) {
for (Subscription sub : superSubscriptions) {
// this catches all exception types
sub.publishToSubscription(this, message1, message2, message3);
}
}
// publish to var arg, only if not already an array
if (messageClass1 == messageClass2 && messageClass1 == messageClass3 && !messageClass1.isArray()) {
Object[] asArray = null;
Collection<Subscription> varargSubscriptions = manager.getVarArgSubscriptions(messageClass1);
if (varargSubscriptions != null && !varargSubscriptions.isEmpty()) {
publishDeadSubs = false;
asArray = (Object[]) Array.newInstance(messageClass1, 3);
asArray[0] = message1;
asArray[1] = message2;
asArray[2] = message3;
for (Subscription sub : varargSubscriptions) {
// this catches all exception types
sub.publishToSubscription(this, asArray);
}
}
Collection<Subscription> varargSuperSubscriptions = manager.getVarArgSuperSubscriptions(messageClass1);
// now get array based superClasses (but only if those ALSO accept vararg)
if (varargSuperSubscriptions != null && !varargSuperSubscriptions.isEmpty()) {
publishDeadSubs = false;
if (asArray == null) {
asArray = (Object[]) Array.newInstance(messageClass1, 3);
asArray[0] = message1;
asArray[1] = message2;
asArray[2] = message3;
}
for (Subscription sub : varargSuperSubscriptions) {
// this catches all exception types
sub.publishToSubscription(this, asArray);
}
}
}
}
if (publishDeadSubs) {
// Dead Event must EXACTLY MATCH (no subclasses)
Collection<Subscription> deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) {
@ -298,19 +410,6 @@ public class MultiMBassador implements IMessageBus {
}
}
}
Collection<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass1, messageClass2, messageClass3);
// now get superClasses
if (superSubscriptions != null && !superSubscriptions.isEmpty()) {
Iterator<Subscription> fastIterator = superSubscriptions.iterator();
while (fastIterator.hasNext()) {
Subscription sub = fastIterator.next();
// this catches all exception types
sub.publishToSubscription(this, message1, message2, message3);
}
}
}
@Override
@ -323,40 +422,14 @@ public class MultiMBassador implements IMessageBus {
}
};
// try {
// this.dispatchQueue.put(runnable);
// } catch (InterruptedException e) {
// e.printStackTrace();
// // log.error(e);
//
// handlePublicationError(new PublicationError()
// .setMessage("Error while adding an asynchronous message")
// .setCause(e)
// .setPublishedObject(message));
// }
// int counter = 200;
// while (!this.dispatchQueue.offer(runnable)) {
// if (counter > 0) {
// --counter;
// LockSupport.parkNanos(1L);
// } else {
try {
this.dispatchQueue.transfer(runnable);
} catch (InterruptedException e) {
e.printStackTrace();
// log.error(e);
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
.setCause(e)
.setPublishedObject(message));
}
// break;
// }
// }
try {
this.dispatchQueue.transfer(runnable);
} catch (InterruptedException e) {
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
.setCause(e)
.setPublishedObject(message));
}
}
}
@ -373,13 +446,10 @@ public class MultiMBassador implements IMessageBus {
try {
this.dispatchQueue.transfer(runnable);
} catch (InterruptedException e) {
e.printStackTrace();
// log.error(e);
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
.setCause(e)
.setPublishedObject(message1, message2));
.setMessage("Error while adding an asynchronous message")
.setCause(e)
.setPublishedObject(message1, message2));
}
}
}
@ -398,13 +468,10 @@ public class MultiMBassador implements IMessageBus {
try {
this.dispatchQueue.transfer(runnable);
} catch (InterruptedException e) {
e.printStackTrace();
// log.error(e);
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
.setCause(e)
.setPublishedObject(message1, message2, message3));
.setMessage("Error while adding an asynchronous message")
.setCause(e)
.setPublishedObject(message1, message2, message3));
}
}
}
@ -422,13 +489,10 @@ public class MultiMBassador implements IMessageBus {
try {
this.dispatchQueue.tryTransfer(runnable, timeout, unit);
} catch (InterruptedException e) {
e.printStackTrace();
// log.error(e);
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
.setCause(e)
.setPublishedObject(message));
.setMessage("Error while adding an asynchronous message")
.setCause(e)
.setPublishedObject(message));
}
}
}
@ -445,13 +509,10 @@ public class MultiMBassador implements IMessageBus {
try {
this.dispatchQueue.tryTransfer(runnable, timeout, unit);
} catch (InterruptedException e) {
e.printStackTrace();
// log.error(e);
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
.setCause(e)
.setPublishedObject(message1, message2));
.setMessage("Error while adding an asynchronous message")
.setCause(e)
.setPublishedObject(message1, message2));
}
}
}
@ -470,13 +531,10 @@ public class MultiMBassador implements IMessageBus {
try {
this.dispatchQueue.tryTransfer(runnable, timeout, unit);
} catch (InterruptedException e) {
e.printStackTrace();
// log.error(e);
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
.setCause(e)
.setPublishedObject(message1, message2, message3));
.setMessage("Error while adding an asynchronous message")
.setCause(e)
.setPublishedObject(message1, message2, message3));
}
}
}

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.multi;
package dorkbox.util.messagebus;
import java.util.concurrent.TimeUnit;

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.multi;
package dorkbox.util.messagebus;
import java.lang.reflect.Array;
import java.util.Collection;
@ -7,14 +7,15 @@ import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import net.engio.mbassy.multi.common.ConcurrentHashMapV8;
import net.engio.mbassy.multi.common.IdentityObjectTree;
import net.engio.mbassy.multi.common.ReflectionUtils;
import net.engio.mbassy.multi.common.StrongConcurrentSet;
import net.engio.mbassy.multi.common.StrongConcurrentSetV8;
import net.engio.mbassy.multi.listener.MessageHandler;
import net.engio.mbassy.multi.listener.MetadataReader;
import net.engio.mbassy.multi.subscription.Subscription;
import dorkbox.util.messagebus.common.ConcurrentHashMapV8;
import dorkbox.util.messagebus.common.HashMapTree;
import dorkbox.util.messagebus.common.ReflectionUtils;
import dorkbox.util.messagebus.common.StrongConcurrentSet;
import dorkbox.util.messagebus.common.StrongConcurrentSetV8;
import dorkbox.util.messagebus.common.SuperClassIterator;
import dorkbox.util.messagebus.listener.MessageHandler;
import dorkbox.util.messagebus.listener.MetadataReader;
import dorkbox.util.messagebus.subscription.Subscription;
/**
* The subscription managers responsibility is to consistently handle and synchronize the message listener subscription process.
@ -36,11 +37,10 @@ import net.engio.mbassy.multi.subscription.Subscription;
* Date: 2/2/15
*/
public class SubscriptionManager {
private final int STRIPE_SIZE;
private static final float LOAD_FACTOR = 0.8F;
// this keeps us from having to constantly recheck our cache for subscriptions
private static final Collection<Subscription> EMPTY_SUBS = Collections.emptyList();
private static final Collection<Class<?>> EMPTY_CLASSES = Collections.emptyList();
// the metadata reader that is used to inspect objects passed to the subscribe method
private static final MetadataReader metadataReader = new MetadataReader();
@ -52,7 +52,7 @@ public class SubscriptionManager {
// this is the primary list for dispatching a specific message
// write access is synchronized and happens only when a listener of a specific class is registered the first time
private final Map<Class<?>, Collection<Subscription>> subscriptionsPerMessageSingle;
private final IdentityObjectTree<Class<?>, Collection<Subscription>> subscriptionsPerMessageMulti;
private final HashMapTree<Class<?>, Collection<Subscription>> subscriptionsPerMessageMulti;
// synchronize read/write access to the subscription maps
private final ReentrantLock lock = new ReentrantLock();
@ -70,10 +70,14 @@ public class SubscriptionManager {
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but REALLY improves performance on handlers
// it's faster to create a new one for SUB/UNSUB than it is to clear() on the original one
private Map<Class<?>, Collection<Subscription>> superClassSubscriptions;
private Map<Class<?>, Collection<Subscription>> varArgSubscriptions;
// private final IdentityObjectTree<Class<?>, Collection<Subscription>> superClassSubscriptionsMulti = new IdentityObjectTree<Class<?>, Collection<Subscription>>();
private final Map<Class<?>, Collection<Subscription>> superClassSubscriptions;
private final HashMapTree<Class<?>, Collection<Subscription>> superClassSubscriptionsMulti;
private final Map<Class<?>, Collection<Subscription>> varArgSubscriptions;
private final Map<Class<?>, Collection<Subscription>> varArgSuperClassSubscriptions;
// stripe size of maps for concurrency
private final int STRIPE_SIZE;
SubscriptionManager(int numberOfThreads) {
@ -83,8 +87,8 @@ public class SubscriptionManager {
{
this.nonListeners = new ConcurrentHashMapV8<Class<?>, Boolean>(4, SubscriptionManager.LOAD_FACTOR);
this.subscriptionsPerMessageSingle = new ConcurrentHashMapV8<Class<?>, Collection<Subscription>>(64, SubscriptionManager.LOAD_FACTOR);
this.subscriptionsPerMessageMulti = new IdentityObjectTree<Class<?>, Collection<Subscription>>();
this.subscriptionsPerMessageSingle = new ConcurrentHashMapV8<Class<?>, Collection<Subscription>>(64, SubscriptionManager.LOAD_FACTOR, 1);
this.subscriptionsPerMessageMulti = new HashMapTree<Class<?>, Collection<Subscription>>(4, SubscriptionManager.LOAD_FACTOR);
// only used during SUB/UNSUB
this.subscriptionsPerListener = new ConcurrentHashMapV8<Class<?>, Collection<Subscription>>(64, SubscriptionManager.LOAD_FACTOR, 1);
@ -98,10 +102,29 @@ public class SubscriptionManager {
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but improves performance of handlers
this.superClassSubscriptions = new ConcurrentHashMapV8<Class<?>, Collection<Subscription>>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
this.superClassSubscriptionsMulti = new HashMapTree<Class<?>, Collection<Subscription>>(4, SubscriptionManager.LOAD_FACTOR);
// var arg subscriptions keep track of which subscriptions can handle varArgs. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but improves performance of handlers
this.varArgSubscriptions = new ConcurrentHashMapV8<Class<?>, Collection<Subscription>>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
this.varArgSuperClassSubscriptions = new ConcurrentHashMapV8<Class<?>, Collection<Subscription>>(64, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
}
}
public void shutdown() {
this.lock.lock();
try {
this.nonListeners.clear();
this.subscriptionsPerMessageSingle.clear();
this.subscriptionsPerMessageMulti.clear();
this.subscriptionsPerListener.clear();
this.arrayVersionCache.clear();
this.superClassesCache.clear();
this.superClassSubscriptions.clear();
this.varArgSubscriptions.clear();
this.varArgSuperClassSubscriptions.clear();
} finally {
this.lock.unlock();
}
}
@ -127,6 +150,8 @@ public class SubscriptionManager {
// a listener is subscribed for the first time
this.superClassSubscriptions.clear();
this.varArgSubscriptions.clear();
this.varArgSuperClassSubscriptions.clear();
Collection<MessageHandler> messageHandlers = SubscriptionManager.metadataReader.getMessageListener(listenerClass).getHandlers();
int handlersSize = messageHandlers.size();
@ -162,7 +187,7 @@ public class SubscriptionManager {
getSuperClass(clazz);
} else {
// multiversion
IdentityObjectTree<Class<?>, Collection<Subscription>> tree;
HashMapTree<Class<?>, Collection<Subscription>> tree;
switch (size) {
case 2: {
@ -302,6 +327,7 @@ public class SubscriptionManager {
try {
this.superClassSubscriptions.clear();
this.varArgSubscriptions.clear();
this.varArgSuperClassSubscriptions.clear();
Map<Class<?>, Collection<Subscription>> localSingle = this.subscriptionsPerMessageSingle;
@ -361,222 +387,6 @@ public class SubscriptionManager {
}
}
// CAN RETURN NULL
public final Collection<Subscription> getSubscriptionsByMessageType(Class<?> messageType) {
return this.subscriptionsPerMessageSingle.get(messageType);
}
// CAN RETURN NULL
public final Collection<Subscription> getSubscriptionsByMessageType(Class<?> messageType1, Class<?> messageType2) {
return this.subscriptionsPerMessageMulti.getValue(messageType1, messageType2);
}
// CAN RETURN NULL
public final Collection<Subscription> getSubscriptionsByMessageType(Class<?> messageType1, Class<?> messageType2, Class<?> messageType3) {
return this.subscriptionsPerMessageMulti.getValue(messageType1, messageType2, messageType3);
}
// CAN RETURN NULL
public final Collection<Subscription> getSubscriptionsByMessageType(Class<?>... messageTypes) {
return this.subscriptionsPerMessageMulti.getValue(messageTypes);
}
// ALSO checks to see if the superClass accepts subtypes.
public final Collection<Subscription> getSuperSubscriptions(Class<?> superType) {
Map<Class<?>, Collection<Subscription>> local = this.superClassSubscriptions;
// whenever our subscriptions change, this map is cleared.
Collection<Subscription> subsPerType = local.get(superType);
if (subsPerType == null) {
// this caches our class hierarchy. This is never cleared.
Collection<Class<?>> types = getSuperClass(superType);
if (types.isEmpty()) {
local.put(superType, EMPTY_SUBS);
return null;
}
subsPerType = new StrongConcurrentSetV8<Subscription>(types.size() + 1, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
Map<Class<?>, Collection<Subscription>> local2 = this.subscriptionsPerMessageSingle;
Iterator<Class<?>> iterator = types.iterator();
while (iterator.hasNext()) {
Class<?> superClass = iterator.next();
Collection<Subscription> subs = local2.get(superClass);
if (subs != null && !subs.isEmpty()) {
for (Subscription sub : subs) {
if (sub.acceptsSubtypes()) {
subsPerType.add(sub);
}
}
}
}
// cache our subscriptions for super classes, so that their access can be fast!
// duplicates are OK.
local.put(superType, subsPerType);
}
return subsPerType;
}
// CAN RETURN NULL
// check to see if the messageType can convert/publish to the "array" version, without the hit to JNI
public Collection<Subscription> getVarArgSubscriptions(Class<?> varArgType) {
Map<Class<?>, Collection<Subscription>> local = this.varArgSubscriptions;
// whenever our subscriptions change, this map is cleared.
Collection<Subscription> subsPerType = local.get(varArgType);
if (subsPerType == null) {
// this caches our array type. This is never cleared.
Class<?> arrayVersion = getArrayClass(varArgType);
Map<Class<?>, Collection<Subscription>> local2 = this.subscriptionsPerMessageSingle;
subsPerType = new StrongConcurrentSetV8<Subscription>(local2.size(), SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
Collection<Subscription> subs = local2.get(arrayVersion);
if (subs != null && !subs.isEmpty()) {
for (Subscription sub : subs) {
if (sub.acceptsVarArgs()) {
subsPerType.add(sub);
}
}
}
// cache our subscriptions for super classes, so that their access can be fast!
// duplicates are OK.
local.put(varArgType, subsPerType);
}
return subsPerType;
}
// must be protected by read lock
// ALSO checks to see if the superClass accepts subtypes.
public Collection<Subscription> getSuperSubscriptions(Class<?> superType1, Class<?> superType2) {
// Collection<Subscription> subsPerType2 = this.superClassSubscriptions.get();
//
//
// // not thread safe. DO NOT MODIFY
// Collection<Class<?>> types1 = this.superClassesCache.get(superType1);
// Collection<Class<?>> types2 = this.superClassesCache.get(superType2);
//
// Collection<Subscription> subsPerType = new ArrayDeque<Subscription>(DEFAULT_SUPER_CLASS_TREE_SIZE);
//
// Collection<Subscription> subs;
// IdentityObjectTree<Class<?>, Collection<Subscription>> leaf1;
// IdentityObjectTree<Class<?>, Collection<Subscription>> leaf2;
//
// Iterator<Class<?>> iterator1 = new SuperClassIterator(superType1, types1);
// Iterator<Class<?>> iterator2;
//
// Class<?> eventSuperType1;
// Class<?> eventSuperType2;
//
// while (iterator1.hasNext()) {
// eventSuperType1 = iterator1.next();
// boolean type1Matches = eventSuperType1 == superType1;
//
// leaf1 = this.subscriptionsPerMessageMulti.getLeaf(eventSuperType1);
// if (leaf1 != null) {
// iterator2 = new SuperClassIterator(superType2, types2);
//
// while (iterator2.hasNext()) {
// eventSuperType2 = iterator2.next();
// if (type1Matches && eventSuperType2 == superType2) {
// continue;
// }
//
// leaf2 = leaf1.getLeaf(eventSuperType2);
//
// if (leaf2 != null) {
// subs = leaf2.getValue();
// if (subs != null) {
// for (Subscription sub : subs) {
// if (sub.acceptsSubtypes()) {
// subsPerType.add(sub);
// }
// }
// }
// }
// }
// }
// }
// return subsPerType;
return null;
}
// must be protected by read lock
// ALSO checks to see if the superClass accepts subtypes.
public Collection<Subscription> getSuperSubscriptions(Class<?> superType1, Class<?> superType2, Class<?> superType3) {
// // not thread safe. DO NOT MODIFY
// Collection<Class<?>> types1 = this.superClassesCache.get(superType1);
// Collection<Class<?>> types2 = this.superClassesCache.get(superType2);
// Collection<Class<?>> types3 = this.superClassesCache.get(superType3);
//
// Collection<Subscription> subsPerType = new ArrayDeque<Subscription>(DEFAULT_SUPER_CLASS_TREE_SIZE);
//
// Collection<Subscription> subs;
// IdentityObjectTree<Class<?>, Collection<Subscription>> leaf1;
// IdentityObjectTree<Class<?>, Collection<Subscription>> leaf2;
// IdentityObjectTree<Class<?>, Collection<Subscription>> leaf3;
//
// Iterator<Class<?>> iterator1 = new SuperClassIterator(superType1, types1);
// Iterator<Class<?>> iterator2;
// Iterator<Class<?>> iterator3;
//
// Class<?> eventSuperType1;
// Class<?> eventSuperType2;
// Class<?> eventSuperType3;
//
// while (iterator1.hasNext()) {
// eventSuperType1 = iterator1.next();
// boolean type1Matches = eventSuperType1 == superType1;
//
// leaf1 = this.subscriptionsPerMessageMulti.getLeaf(eventSuperType1);
// if (leaf1 != null) {
// iterator2 = new SuperClassIterator(superType2, types2);
//
// while (iterator2.hasNext()) {
// eventSuperType2 = iterator2.next();
// boolean type12Matches = type1Matches && eventSuperType2 == superType2;
//
// leaf2 = leaf1.getLeaf(eventSuperType2);
//
// if (leaf2 != null) {
// iterator3 = new SuperClassIterator(superType3, types3);
//
// while (iterator3.hasNext()) {
// eventSuperType3 = iterator3.next();
// if (type12Matches && eventSuperType3 == superType3) {
// continue;
// }
//
// leaf3 = leaf2.getLeaf(eventSuperType3);
//
// subs = leaf3.getValue();
// if (subs != null) {
// for (Subscription sub : subs) {
// if (sub.acceptsSubtypes()) {
// subsPerType.add(sub);
// }
// }
// }
// }
// }
// }
// }
// }
//
// return subsPerType;
return null;
}
/**
* race conditions will result in duplicate answers, which we don't care if happens
* never returns null
@ -613,7 +423,7 @@ public class SubscriptionManager {
* never returns null
* never reset
*/
public Class<?> getArrayClass(Class<?> c) {
private Class<?> getArrayClass(Class<?> c) {
Class<?> clazz = this.arrayVersionCache.get(c);
if (clazz == null) {
// messy, but the ONLY way to do it. Array super types are also arrays
@ -624,4 +434,300 @@ public class SubscriptionManager {
return clazz;
}
// CAN RETURN NULL
public final Collection<Subscription> getSubscriptionsByMessageType(Class<?> messageType) {
return this.subscriptionsPerMessageSingle.get(messageType);
}
// CAN RETURN NULL
public final Collection<Subscription> getSubscriptionsByMessageType(Class<?> messageType1, Class<?> messageType2) {
return this.subscriptionsPerMessageMulti.get(messageType1, messageType2);
}
// CAN RETURN NULL
public final Collection<Subscription> getSubscriptionsByMessageType(Class<?> messageType1, Class<?> messageType2, Class<?> messageType3) {
return this.subscriptionsPerMessageMulti.getValue(messageType1, messageType2, messageType3);
}
// CAN RETURN NULL
public final Collection<Subscription> getSubscriptionsByMessageType(Class<?>... messageTypes) {
return this.subscriptionsPerMessageMulti.get(messageTypes);
}
// CAN RETURN NULL
// check to see if the messageType can convert/publish to the "array" version, without the hit to JNI
// and then, returns the array'd version subscriptions
public Collection<Subscription> getVarArgSubscriptions(Class<?> varArgType) {
Map<Class<?>, Collection<Subscription>> local = this.varArgSubscriptions;
// whenever our subscriptions change, this map is cleared.
Collection<Subscription> subsPerType = local.get(varArgType);
if (subsPerType == null) {
// this caches our array type. This is never cleared.
Class<?> arrayVersion = getArrayClass(varArgType);
Map<Class<?>, Collection<Subscription>> local2 = this.subscriptionsPerMessageSingle;
subsPerType = new StrongConcurrentSetV8<Subscription>(local2.size(), SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
Collection<Subscription> subs = local2.get(arrayVersion);
if (subs != null && !subs.isEmpty()) {
for (Subscription sub : subs) {
if (sub.acceptsVarArgs()) {
subsPerType.add(sub);
}
}
}
// cache our subscriptions for super classes, so that their access can be fast!
// duplicates are OK.
local.put(varArgType, subsPerType);
}
return subsPerType;
}
// CAN RETURN NULL
// check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI
// and then, returns the array'd version subscriptions
public Collection<Subscription> getVarArgSuperSubscriptions(Class<?> varArgType) {
Map<Class<?>, Collection<Subscription>> local = this.varArgSuperClassSubscriptions;
// whenever our subscriptions change, this map is cleared.
Collection<Subscription> subsPerType = local.get(varArgType);
if (subsPerType == null) {
// this caches our array type. This is never cleared.
Class<?> arrayVersion = getArrayClass(varArgType);
Collection<Class<?>> types = getSuperClass(arrayVersion);
if (types.isEmpty()) {
local.put(varArgType, EMPTY_SUBS);
return null;
}
Map<Class<?>, Collection<Subscription>> local2 = this.subscriptionsPerMessageSingle;
subsPerType = new StrongConcurrentSetV8<Subscription>(local2.size(), SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
Iterator<Class<?>> iterator = types.iterator();
while (iterator.hasNext()) {
Class<?> superClass = iterator.next();
Collection<Subscription> subs = local2.get(superClass);
if (subs != null && !subs.isEmpty()) {
for (Subscription sub : subs) {
if (sub.acceptsSubtypes() && sub.acceptsVarArgs()) {
subsPerType.add(sub);
}
}
}
}
// cache our subscriptions for super classes, so that their access can be fast!
// duplicates are OK.
local.put(varArgType, subsPerType);
}
return subsPerType;
}
// ALSO checks to see if the superClass accepts subtypes.
public final Collection<Subscription> getSuperSubscriptions(Class<?> superType) {
Map<Class<?>, Collection<Subscription>> local = this.superClassSubscriptions;
// whenever our subscriptions change, this map is cleared.
Collection<Subscription> subsPerType = local.get(superType);
if (subsPerType == null) {
// this caches our class hierarchy. This is never cleared.
Collection<Class<?>> types = getSuperClass(superType);
if (types.isEmpty()) {
local.put(superType, EMPTY_SUBS);
return null;
}
Map<Class<?>, Collection<Subscription>> local2 = this.subscriptionsPerMessageSingle;
subsPerType = new StrongConcurrentSetV8<Subscription>(types.size() + 1, SubscriptionManager.LOAD_FACTOR, this.STRIPE_SIZE);
Iterator<Class<?>> iterator = types.iterator();
while (iterator.hasNext()) {
Class<?> superClass = iterator.next();
Collection<Subscription> subs = local2.get(superClass);
if (subs != null && !subs.isEmpty()) {
for (Subscription sub : subs) {
if (sub.acceptsSubtypes()) {
subsPerType.add(sub);
}
}
}
}
// cache our subscriptions for super classes, so that their access can be fast!
// duplicates are OK.
local.put(superType, subsPerType);
}
return subsPerType;
}
// must be protected by read lock
// ALSO checks to see if the superClass accepts subtypes.
public Collection<Subscription> getSuperSubscriptions(Class<?> superType1, Class<?> superType2) {
HashMapTree<Class<?>, Collection<Subscription>> local = this.superClassSubscriptionsMulti;
// whenever our subscriptions change, this map is cleared.
HashMapTree<Class<?>, Collection<Subscription>> subsPerTypeLeaf = local.getLeaf(superType1, superType2);
Collection<Subscription> subsPerType = null;
// we DO NOT care about duplicate, because the answers will be the same
if (subsPerTypeLeaf == null) {
subsPerType = new StrongConcurrentSetV8<Subscription>(16, LOAD_FACTOR, this.STRIPE_SIZE);
// whenever our subscriptions change, this map is cleared.
Collection<Class<?>> types1 = this.superClassesCache.get(superType1);
Collection<Class<?>> types2 = this.superClassesCache.get(superType2);
Collection<Subscription> subs;
HashMapTree<Class<?>, Collection<Subscription>> leaf1;
HashMapTree<Class<?>, Collection<Subscription>> leaf2;
Iterator<Class<?>> iterator1 = new SuperClassIterator(superType1, types1);
Iterator<Class<?>> iterator2;
Class<?> eventSuperType1;
Class<?> eventSuperType2;
while (iterator1.hasNext()) {
eventSuperType1 = iterator1.next();
boolean type1Matches = eventSuperType1 == superType1;
if (type1Matches) {
continue;
}
leaf1 = this.subscriptionsPerMessageMulti.getLeaf(eventSuperType1);
if (leaf1 != null) {
iterator2 = new SuperClassIterator(superType2, types2);
while (iterator2.hasNext()) {
eventSuperType2 = iterator2.next();
if (type1Matches && eventSuperType2 == superType2) {
continue;
}
leaf2 = leaf1.getLeaf(eventSuperType2);
if (leaf2 != null) {
subs = leaf2.getValue();
if (subs != null) {
for (Subscription sub : subs) {
if (sub.acceptsSubtypes()) {
subsPerType.add(sub);
}
}
}
}
}
}
}
Collection<Subscription> putIfAbsent = local.putIfAbsent(subsPerType, superType1, superType2);
if (putIfAbsent != null) {
// someone beat us
subsPerType = putIfAbsent;
}
}
return subsPerType;
}
// must be protected by read lock
// ALSO checks to see if the superClass accepts subtypes.
public Collection<Subscription> getSuperSubscriptions(Class<?> superType1, Class<?> superType2, Class<?> superType3) {
HashMapTree<Class<?>, Collection<Subscription>> local = this.superClassSubscriptionsMulti;
// whenever our subscriptions change, this map is cleared.
HashMapTree<Class<?>, Collection<Subscription>> subsPerTypeLeaf = local.getLeaf(superType1, superType2, superType3);
Collection<Subscription> subsPerType = null;
// we DO NOT care about duplicate, because the answers will be the same
if (subsPerTypeLeaf == null) {
Collection<Class<?>> types1 = this.superClassesCache.get(superType1);
Collection<Class<?>> types2 = this.superClassesCache.get(superType2);
Collection<Class<?>> types3 = this.superClassesCache.get(superType3);
subsPerType = new StrongConcurrentSetV8<Subscription>(16, LOAD_FACTOR, this.STRIPE_SIZE);
Collection<Subscription> subs;
HashMapTree<Class<?>, Collection<Subscription>> leaf1;
HashMapTree<Class<?>, Collection<Subscription>> leaf2;
HashMapTree<Class<?>, Collection<Subscription>> leaf3;
Iterator<Class<?>> iterator1 = new SuperClassIterator(superType1, types1);
Iterator<Class<?>> iterator2;
Iterator<Class<?>> iterator3;
Class<?> eventSuperType1;
Class<?> eventSuperType2;
Class<?> eventSuperType3;
while (iterator1.hasNext()) {
eventSuperType1 = iterator1.next();
boolean type1Matches = eventSuperType1 == superType1;
leaf1 = this.subscriptionsPerMessageMulti.getLeaf(eventSuperType1);
if (leaf1 != null) {
iterator2 = new SuperClassIterator(superType2, types2);
while (iterator2.hasNext()) {
eventSuperType2 = iterator2.next();
boolean type12Matches = type1Matches && eventSuperType2 == superType2;
if (type12Matches) {
continue;
}
leaf2 = leaf1.getLeaf(eventSuperType2);
if (leaf2 != null) {
iterator3 = new SuperClassIterator(superType3, types3);
while (iterator3.hasNext()) {
eventSuperType3 = iterator3.next();
if (type12Matches && eventSuperType3 == superType3) {
continue;
}
leaf3 = leaf2.getLeaf(eventSuperType3);
if (leaf3 != null) {
subs = leaf3.getValue();
if (subs != null) {
for (Subscription sub : subs) {
if (sub.acceptsSubtypes()) {
subsPerType.add(sub);
}
}
}
}
}
}
}
}
}
Collection<Subscription> putIfAbsent = local.putIfAbsent(subsPerType, superType1, superType2);
if (putIfAbsent != null) {
// someone beat us
subsPerType = putIfAbsent;
}
}
return subsPerType;
}
}

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.multi.annotations;
package dorkbox.util.messagebus.annotations;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.multi.annotations;
package dorkbox.util.messagebus.annotations;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.multi.annotations;
package dorkbox.util.messagebus.annotations;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.multi.common;
package dorkbox.util.messagebus.common;
import java.util.Collection;
import java.util.Map;

View File

@ -3,7 +3,7 @@
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package net.engio.mbassy.multi.common;
package dorkbox.util.messagebus.common;
import java.io.ObjectStreamField;
import java.io.Serializable;

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.multi.common;
package dorkbox.util.messagebus.common;
/**
* The dead message event is published whenever no message

View File

@ -0,0 +1,699 @@
package dorkbox.util.messagebus.common;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import com.googlecode.concurentlocks.ReentrantReadWriteUpdateLock;
/**
* Simple tree structure that is a map that contains a chain of keys to get to a value.
* <p>
* THREAD SAFE, each level in the tree has it's own write lock, and there a tree-global read lock, to prevent writes
*
* @author dorkbox, llc
* Date: 2/2/15
*/
public class HashMapTree<KEY, VALUE> {
private final ReentrantReadWriteUpdateLock lock = new ReentrantReadWriteUpdateLock();
private ConcurrentMap<KEY, HashMapTree<KEY, VALUE>> children; // protected by read/write lock
private volatile VALUE value; // protected by read/write lock
private final int defaultSize;
private final float loadFactor;
public HashMapTree(int defaultSize, float loadFactor) {
this.defaultSize = defaultSize;
this.loadFactor = loadFactor;
}
/**
* can be overridden to provide a custom backing map
*/
protected ConcurrentMap<KEY, HashMapTree<KEY, VALUE>> createChildren(int defaultSize, float loadFactor) {
return new ConcurrentHashMapV8<KEY, HashMapTree<KEY, VALUE>>(defaultSize, loadFactor, 1);
}
public VALUE getValue() {
VALUE returnValue = this.value;
return returnValue;
}
public void putValue(VALUE value) {
this.value = value;
}
public void removeValue() {
this.value = null;
}
public void clear() {
Lock WRITE = this.lock.writeLock();
WRITE.lock(); // upgrade to the write lock, at this point blocks other readers
if (this.children != null) {
Set<Entry<KEY, HashMapTree<KEY, VALUE>>> entrySet = this.children.entrySet();
for (Entry<KEY, HashMapTree<KEY, VALUE>> entry : entrySet) {
entry.getValue().clear();
}
this.children.clear();
this.value = null;
}
WRITE.unlock();
}
/**
* This <b>IS NOT</b> safe to call outside of root.remove(...)
* <p>
* Removes a branch from the tree, and cleans up, if necessary
*/
public void remove(KEY key) {
if (key != null) {
removeLeaf(key);
}
}
/**
* This <b>IS NOT</b> safe to call outside of root.remove(...)
* <p>
* Removes a branch from the tree, and cleans up, if necessary
*/
public void remove(KEY key1, KEY key2) {
if (key1 == null || key2 == null) {
return;
}
Lock UPDATE = this.lock.updateLock();
UPDATE.lock(); // allows other readers, blocks others from acquiring update or write locks
HashMapTree<KEY, VALUE> leaf = null;
if (this.children != null) {
leaf = this.children.get(key1);
if (leaf != null) {
// promote to writelock and try again - Concurrency in Practice,16.4.2, last sentence on page. Careful for stale state
Lock WRITE = this.lock.writeLock();
WRITE.lock(); // upgrade to the write lock, at this point blocks other readers
leaf.removeLeaf(key2);
this.children.remove(key1);
if (this.children.isEmpty()) {
this.children = null;
}
WRITE.unlock();
}
}
UPDATE.unlock();
}
/**
* This <b>IS NOT</b> safe to call outside of root.remove(...)
* <p>
* Removes a branch from the tree, and cleans up, if necessary
*/
public void remove(KEY key1, KEY key2, KEY key3) {
if (key1 == null || key2 == null) {
return;
}
Lock UPDATE = this.lock.updateLock();
UPDATE.lock(); // allows other readers, blocks others from acquiring update or write locks
HashMapTree<KEY, VALUE> leaf = null;
if (this.children != null) {
leaf = this.children.get(key1);
if (leaf != null) {
// promote to writelock and try again - Concurrency in Practice,16.4.2, last sentence on page. Careful for stale state
Lock WRITE = this.lock.writeLock();
WRITE.lock(); // upgrade to the write lock, at this point blocks other readers
leaf.remove(key2, key3);
this.children.remove(key1);
if (this.children.isEmpty()) {
this.children = null;
}
WRITE.unlock();
}
}
UPDATE.unlock();
}
/**
* This <b>IS NOT</b> safe to call outside of root.remove(...)
* <p>
* Removes a branch from the tree, and cleans up, if necessary
*/
@SuppressWarnings("unchecked")
public void remove(KEY... keys) {
if (keys == null) {
return;
}
removeLeaf(0, keys);
}
/**
* Removes a branch from the tree, and cleans up, if necessary
*/
private final void removeLeaf(KEY key) {
if (key != null) {
Lock WRITE = this.lock.writeLock();
WRITE.lock(); // upgrade to the write lock, at this point blocks other readers
if (this.children != null) {
HashMapTree<KEY, VALUE> leaf = this.children.get(key);
if (leaf != null) {
leaf = this.children.get(key);
if (leaf != null) {
if (leaf.children == null && leaf.value == null) {
this.children.remove(key);
}
if (this.children.isEmpty()) {
this.children = null;
}
}
}
}
WRITE.unlock();
}
}
// keys CANNOT be null here!
private final void removeLeaf(int index, KEY[] keys) {
Lock WRITE = this.lock.writeLock();
WRITE.lock(); // upgrade to the write lock, at this point blocks other readers
if (index == keys.length) {
// we have reached the leaf to remove!
this.value = null;
this.children = null;
} else if (this.children != null) {
HashMapTree<KEY, VALUE> leaf = this.children.get(keys[index]);
if (leaf != null) {
leaf.removeLeaf(index+1, keys);
if (leaf.children == null && leaf.value == null) {
this.children.remove(keys[index]);
}
if (this.children.isEmpty()) {
this.children = null;
}
}
}
WRITE.unlock();
}
public VALUE put(VALUE value, KEY key) {
if (key == null) {
throw new NullPointerException("keys");
}
Lock WRITE = this.lock.writeLock();
WRITE.lock(); // upgrade to the write lock, at this point blocks other readers
// have to put value into our children
HashMapTree<KEY, VALUE> leaf = createLeaf_NL(key);
VALUE prev = leaf.value;
leaf.value = value;
WRITE.unlock();
return prev;
}
public VALUE putIfAbsent(VALUE value, KEY key) {
if (key == null) {
throw new NullPointerException("keys");
}
Lock WRITE = this.lock.writeLock();
WRITE.lock(); // upgrade to the write lock, at this point blocks other readers
// have to put value into our children
HashMapTree<KEY, VALUE> leaf = createLeaf_NL(key);
VALUE prev = leaf.value;
if (prev == null) {
leaf.value = value;
}
WRITE.unlock();
return prev;
}
public VALUE put(VALUE value, KEY key1, KEY key2) {
if (key1 == null || key2 == null) {
throw new NullPointerException("keys");
}
Lock WRITE = this.lock.writeLock();
WRITE.lock(); // upgrade to the write lock, at this point blocks other readers
// have to put value into our children
HashMapTree<KEY, VALUE> leaf = createLeaf_NL(key1);
Lock WRITE2 = leaf.lock.writeLock();
WRITE2.lock();
leaf = leaf.createLeaf_NL(key2);
WRITE2.unlock();
VALUE prev = leaf.value;
leaf.value = value;
WRITE.unlock();
return prev;
}
public VALUE putIfAbsent(VALUE value, KEY key1, KEY key2) {
if (key1 == null || key2 == null) {
throw new NullPointerException("keys");
}
Lock WRITE = this.lock.writeLock();
WRITE.lock(); // upgrade to the write lock, at this point blocks other readers
// have to put value into our children
HashMapTree<KEY, VALUE> leaf = createLeaf_NL(key1);
Lock WRITE2 = leaf.lock.writeLock();
WRITE2.lock();
leaf = leaf.createLeaf_NL(key2);
WRITE2.unlock();
VALUE prev = leaf.value;
if (prev == null) {
leaf.value = value;
}
WRITE.unlock();
return prev;
}
public VALUE put(VALUE value, KEY key1, KEY key2, KEY key3) {
if (key1 == null || key2 == null || key3 == null) {
throw new NullPointerException("keys");
}
Lock WRITE = this.lock.writeLock();
WRITE.lock(); // upgrade to the write lock, at this point blocks other readers
// have to put value into our children
HashMapTree<KEY, VALUE> leaf = createLeaf_NL(key1);
Lock WRITE2 = leaf.lock.writeLock();
WRITE2.lock();
leaf = leaf.createLeaf_NL(key2);
Lock WRITE3 = leaf.lock.writeLock();
WRITE3.lock();
leaf = leaf.createLeaf_NL(key3);
WRITE3.unlock();
WRITE2.unlock();
VALUE prev = leaf.value;
leaf.value = value;
WRITE.unlock();
return prev;
}
public VALUE putIfAbsent(VALUE value, KEY key1, KEY key2, KEY key3) {
if (key1 == null || key2 == null || key3 == null) {
throw new NullPointerException("keys");
}
Lock WRITE = this.lock.writeLock();
WRITE.lock(); // upgrade to the write lock, at this point blocks other readers
// have to put value into our children
HashMapTree<KEY, VALUE> leaf = createLeaf_NL(key1);
Lock WRITE2 = leaf.lock.writeLock();
WRITE2.lock();
leaf = leaf.createLeaf_NL(key2);
Lock WRITE3 = leaf.lock.writeLock();
WRITE3.lock();
leaf = leaf.createLeaf_NL(key3);
WRITE3.unlock();
WRITE2.unlock();
VALUE prev = leaf.value;
if (prev == null) {
leaf.value = value;
}
WRITE.unlock();
return prev;
}
@SuppressWarnings("unchecked")
public VALUE put(VALUE value, KEY... keys) {
if (keys == null) {
throw new NullPointerException("keys");
}
int length = keys.length;
Lock[] locks = new Lock[length];
Lock WRITE = this.lock.writeLock();
WRITE.lock(); // upgrade to the write lock, at this point blocks other readers
// have to put value into our children
HashMapTree<KEY, VALUE> leaf = createLeaf_NL(keys[0]);
for (int i=1;i<length;i++) {
locks[i] = leaf.lock.writeLock();
locks[i].lock();
leaf = leaf.createLeaf_NL(keys[i]);
}
for (int i=length-1;i>0;i--) {
locks[i].unlock();
}
VALUE prev = leaf.value;
leaf.value = value;
WRITE.unlock();
return prev;
}
@SuppressWarnings("unchecked")
public VALUE putIfAbsent(VALUE value, KEY... keys) {
if (keys == null) {
throw new NullPointerException("keys");
}
int length = keys.length;
Lock[] locks = new Lock[length];
Lock WRITE = this.lock.writeLock();
WRITE.lock(); // upgrade to the write lock, at this point blocks other readers
// have to put value into our children
HashMapTree<KEY, VALUE> leaf = createLeaf_NL(keys[0]);
for (int i=1;i<length;i++) {
locks[i] = leaf.lock.writeLock();
locks[i].lock();
leaf = leaf.createLeaf_NL(keys[i]);
}
for (int i=length-1;i>0;i--) {
locks[i].unlock();
}
VALUE prev = leaf.value;
if (prev == null) {
leaf.value = value;
}
WRITE.unlock();
return prev;
}
@SuppressWarnings("unchecked")
public HashMapTree<KEY, VALUE> createLeaf(KEY... keys) {
if (keys == null) {
return this;
}
int length = keys.length;
Lock[] locks = new Lock[length];
Lock WRITE = this.lock.writeLock();
WRITE.lock(); // upgrade to the write lock, at this point blocks other readers
// have to put value into our children
HashMapTree<KEY, VALUE> leaf = createLeaf_NL(keys[0]);
for (int i=1;i<length;i++) {
locks[i] = leaf.lock.writeLock();
locks[i].lock();
leaf = leaf.createLeaf_NL(keys[i]);
}
for (int i=length-1;i>0;i--) {
locks[i].unlock();
}
WRITE.unlock();
return leaf;
}
private final HashMapTree<KEY, VALUE> createLeaf_NL(KEY key) {
if (key == null) {
return null;
}
HashMapTree<KEY, VALUE> objectTree;
if (this.children == null) {
this.children = createChildren(this.defaultSize, this.loadFactor);
}
objectTree = this.children.get(key);
// make sure we have a tree for the specified node
if (objectTree == null) {
objectTree = new HashMapTree<KEY, VALUE>(this.defaultSize, this.loadFactor);
HashMapTree<KEY, VALUE> putIfAbsent = this.children.putIfAbsent(key, objectTree);
if (putIfAbsent != null) {
// some other thread beat us.
objectTree = putIfAbsent;
}
}
return objectTree;
}
/////////////////////////////////////////
/////////////////////////////////////////
/////////////////////////////////////////
/////////////////////////////////////////
public VALUE get(KEY key) {
if (key == null) {
return null;
}
Lock READ = this.lock.readLock();
READ.lock(); // allows other readers, blocks others from acquiring update or write locks
HashMapTree<KEY, VALUE> objectTree = null;
// get value from our children
objectTree = getLeaf_NL(key); // protected by lock
if (objectTree == null) {
READ.unlock();
return null;
}
VALUE returnValue = objectTree.value;
READ.unlock();
return returnValue;
}
public VALUE get(KEY key1, KEY key2) {
Lock READ = this.lock.readLock();
READ.lock(); // allows other readers, blocks others from acquiring update or write locks
HashMapTree<KEY, VALUE> tree = null;
// get value from our children
tree = getLeaf_NL(key1); // protected by lock
if (tree != null) {
tree = tree.getLeaf_NL(key2); // protected by lock
}
if (tree == null) {
READ.unlock();
return null;
}
VALUE returnValue = tree.value;
READ.unlock();
return returnValue;
}
public VALUE getValue(KEY key1, KEY key2, KEY key3) {
Lock READ = this.lock.readLock();
READ.lock(); // allows other readers, blocks others from acquiring update or write locks
HashMapTree<KEY, VALUE> tree = null;
// get value from our children
tree = getLeaf_NL(key1);
if (tree != null) {
tree = tree.getLeaf_NL(key2);
}
if (tree != null) {
tree = tree.getLeaf_NL(key3);
}
if (tree == null) {
READ.unlock();
return null;
}
VALUE returnValue = tree.value;
READ.unlock();
return returnValue;
}
@SuppressWarnings("unchecked")
public VALUE get(KEY... keys) {
Lock READ = this.lock.readLock();
READ.lock(); // allows other readers, blocks others from acquiring update or write locks
HashMapTree<KEY, VALUE> tree = null;
// get value from our children
tree = getLeaf_NL(keys[0]);
int size = keys.length;
for (int i=1;i<size;i++) {
if (tree != null) {
tree = tree.getLeaf_NL(keys[i]);
} else {
READ.unlock();
return null;
}
}
if (tree == null) {
READ.unlock();
return null;
}
VALUE returnValue = tree.value;
READ.unlock();
return returnValue;
}
public final HashMapTree<KEY, VALUE> getLeaf(KEY key) {
if (key == null) {
return null;
}
HashMapTree<KEY, VALUE> tree;
Lock READ = this.lock.readLock();
READ.lock(); // allows other readers, blocks others from acquiring update or write locks
if (this.children == null) {
tree = null;
} else {
tree = this.children.get(key);
}
READ.unlock();
return tree;
}
public final HashMapTree<KEY, VALUE> getLeaf(KEY key1, KEY key2) {
HashMapTree<KEY, VALUE> tree = null;
Lock READ = this.lock.readLock();
READ.lock(); // allows other readers, blocks others from acquiring update or write locks
// get value from our children
tree = getLeaf_NL(key1);
if (tree != null) {
tree = tree.getLeaf_NL(key2);
}
READ.unlock();
return tree;
}
public final HashMapTree<KEY, VALUE> getLeaf(KEY key1, KEY key2, KEY key3) {
HashMapTree<KEY, VALUE> tree = null;
Lock READ = this.lock.readLock();
READ.lock(); // allows other readers, blocks others from acquiring update or write locks
// get value from our children
tree = getLeaf_NL(key1);
if (tree != null) {
tree = tree.getLeaf_NL(key2);
}
if (tree != null) {
tree = tree.getLeaf_NL(key3);
}
READ.unlock();
return tree;
}
@SuppressWarnings("unchecked")
public final HashMapTree<KEY, VALUE> getLeaf(KEY... keys) {
int size = keys.length;
if (size == 0) {
return null;
}
Lock READ = this.lock.readLock();
READ.lock(); // allows other readers, blocks others from acquiring update or write locks
HashMapTree<KEY, VALUE> tree = null;
// get value from our children
tree = getLeaf_NL(keys[0]);
for (int i=1;i<size;i++) {
if (tree != null) {
tree = tree.getLeaf_NL(keys[i]);
} else {
READ.unlock();
return null;
}
}
READ.unlock();
return tree;
}
private final HashMapTree<KEY, VALUE> getLeaf_NL(KEY key) {
HashMapTree<KEY, VALUE> tree;
if (this.children == null) {
tree = null;
} else {
tree = this.children.get(key);
}
return tree;
}
}

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.multi.common;
package dorkbox.util.messagebus.common;
/**
* Todo: Add javadoc

View File

@ -40,7 +40,7 @@
* http://creativecommons.org/licenses/publicdomain
*/
package net.engio.mbassy.multi.common;
package dorkbox.util.messagebus.common;
import java.util.AbstractQueue;
import java.util.Collection;

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.multi.common;
package dorkbox.util.messagebus.common;
import java.lang.management.RuntimeMXBean;
import java.util.List;

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.multi.common;
package dorkbox.util.messagebus.common;
import java.lang.annotation.Annotation;
import java.lang.reflect.AnnotatedElement;
@ -6,7 +6,7 @@ import java.lang.reflect.Method;
import java.util.Collection;
import java.util.Set;
import net.engio.mbassy.multi.annotations.Handler;
import dorkbox.util.messagebus.annotations.Handler;
/**
* @author bennidi

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.multi.common;
package dorkbox.util.messagebus.common;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.multi.common;
package dorkbox.util.messagebus.common;
/**
* This implementation uses strong references to the elements, uses an IdentityHashMap

View File

@ -0,0 +1,54 @@
package dorkbox.util.messagebus.common;
import java.util.Collection;
import java.util.Iterator;
import sun.reflect.generics.reflectiveObjects.NotImplementedException;
public class SuperClassIterator implements Iterator<Class<?>> {
private final Iterator<Class<?>> iterator;
private Class<?> clazz;
public SuperClassIterator(Class<?> clazz, Collection<Class<?>> types) {
this.clazz = clazz;
if (types != null) {
this.iterator = types.iterator();
} else {
this.iterator = null;
}
}
@Override
public boolean hasNext() {
if (this.clazz != null) {
return true;
}
if (this.iterator != null) {
return this.iterator.hasNext();
}
return false;
}
@Override
public Class<?> next() {
if (this.clazz != null) {
Class<?> clazz2 = this.clazz;
this.clazz = null;
return clazz2;
}
if (this.iterator != null) {
return this.iterator.next();
}
return null;
}
@Override
public void remove() {
throw new NotImplementedException();
}
}

View File

@ -3,7 +3,7 @@
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/licenses/publicdomain
*/
package net.engio.mbassy.multi.common;
package dorkbox.util.messagebus.common;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.multi.common;
package dorkbox.util.messagebus.common;
import java.lang.ref.WeakReference;
import java.util.Iterator;

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.multi.dispatch;
package dorkbox.util.messagebus.dispatch;
import com.esotericsoftware.reflectasm.MethodAccess;

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.multi.dispatch;
package dorkbox.util.messagebus.dispatch;
import com.esotericsoftware.reflectasm.MethodAccess;

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.multi.dispatch;
package dorkbox.util.messagebus.dispatch;
import com.esotericsoftware.reflectasm.MethodAccess;

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.multi.error;
package dorkbox.util.messagebus.error;
/**
* @author bennidi

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.multi.error;
package dorkbox.util.messagebus.error;
/**
* Publication error handlers are provided with a publication error every time an

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.multi.error;
package dorkbox.util.messagebus.error;
/**
* The universal exception type for message bus implementations.

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.multi.error;
package dorkbox.util.messagebus.error;
import java.util.Arrays;

View File

@ -1,14 +1,14 @@
package net.engio.mbassy.multi.listener;
package dorkbox.util.messagebus.listener;
import java.lang.reflect.Method;
import java.util.Arrays;
import net.engio.mbassy.multi.annotations.Handler;
import net.engio.mbassy.multi.annotations.Synchronized;
import net.engio.mbassy.multi.common.ReflectionUtils;
import com.esotericsoftware.reflectasm.MethodAccess;
import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.annotations.Synchronized;
import dorkbox.util.messagebus.common.ReflectionUtils;
/**
* Any method in any class annotated with the @Handler annotation represents a message handler. The class that contains
* the handler is called a message listener and more generally, any class containing a message handler in its class hierarchy

View File

@ -1,8 +1,8 @@
package net.engio.mbassy.multi.listener;
package dorkbox.util.messagebus.listener;
import java.util.Collection;
import net.engio.mbassy.multi.common.StrongConcurrentSetV8;
import dorkbox.util.messagebus.common.StrongConcurrentSetV8;
/**
* All instances of any class that defines at least one message handler (see @MessageHandler) are message listeners. Thus, a message

View File

@ -1,11 +1,11 @@
package net.engio.mbassy.multi.listener;
package dorkbox.util.messagebus.listener;
import java.lang.reflect.Method;
import java.util.Collection;
import net.engio.mbassy.multi.annotations.Handler;
import net.engio.mbassy.multi.common.ReflectionUtils;
import net.engio.mbassy.multi.common.StrongConcurrentSetV8;
import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.common.ReflectionUtils;
import dorkbox.util.messagebus.common.StrongConcurrentSetV8;
/**
* The meta data reader is responsible for parsing and validating message handler configurations.

View File

@ -1,9 +1,9 @@
package net.engio.mbassy.multi.subscription;
import net.engio.mbassy.multi.PubSubSupport;
package dorkbox.util.messagebus.subscription;
import com.lmax.disruptor.WorkHandler;
import dorkbox.util.messagebus.PubSubSupport;
/**
* @author dorkbox, llc
* Date: 2/2/15

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.multi.subscription;
package dorkbox.util.messagebus.subscription;
/**

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.multi.subscription;
package dorkbox.util.messagebus.subscription;
/**
* @author dorkbox, llc
* Date: 2/2/15

View File

@ -0,0 +1,272 @@
package dorkbox.util.messagebus.subscription;
import java.lang.reflect.InvocationTargetException;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
import com.esotericsoftware.reflectasm.MethodAccess;
import dorkbox.util.messagebus.common.StrongConcurrentSet;
import dorkbox.util.messagebus.dispatch.IHandlerInvocation;
import dorkbox.util.messagebus.dispatch.ReflectiveHandlerInvocation;
import dorkbox.util.messagebus.dispatch.SynchronizedHandlerInvocation;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.error.PublicationError;
import dorkbox.util.messagebus.listener.MessageHandler;
/**
* A subscription is a thread-safe container that manages exactly one message handler of all registered
* message listeners of the same class, i.e. all subscribed instances (excluding subclasses) of a SingleMessageHandler.class
* will be referenced in the subscription created for SingleMessageHandler.class.
*
* There will be as many unique subscription objects per message listener class as there are message handlers
* defined in the message listeners class hierarchy.
*
* The subscription provides functionality for message publication by means of delegation to the respective
* message dispatcher.
*
* @author bennidi
* @author dorkbox, llc
* Date: 2/2/15
*/
public class Subscription {
private static AtomicInteger ID_COUNTER = new AtomicInteger();
private final int ID = ID_COUNTER.getAndIncrement();
// the handler's metadata -> for each handler in a listener, a unique subscription context is created
private final MessageHandler handlerMetadata;
private final IHandlerInvocation invocation;
private final StrongConcurrentSet<Object> listeners;
public Subscription(MessageHandler handler) {
this.handlerMetadata = handler;
this.listeners = new StrongConcurrentSet<Object>();
IHandlerInvocation invocation = new ReflectiveHandlerInvocation();
if (handler.isSynchronized()) {
invocation = new SynchronizedHandlerInvocation(invocation);
}
this.invocation = invocation;
}
public Class<?>[] getHandledMessageTypes() {
return this.handlerMetadata.getHandledMessages();
}
public boolean acceptsSubtypes() {
return this.handlerMetadata.acceptsSubtypes();
}
public boolean acceptsVarArgs() {
return this.handlerMetadata.acceptsVarArgs();
}
public boolean isEmpty() {
return this.listeners.isEmpty();
}
public void subscribe(Object listener) {
this.listeners.add(listener);
}
/**
* @return TRUE if the element was removed
*/
public boolean unsubscribe(Object existingListener) {
return this.listeners.remove(existingListener);
}
/**
* Check whether this subscription manages a message handler of the given message listener class
*/
// only in unit test
public boolean belongsTo(Class<?> listener){
return this.handlerMetadata.isFromListener(listener);
}
// only used in unit-test
public int size() {
return this.listeners.size();
}
public void publishToSubscription(ErrorHandlingSupport errorHandler, Object message) {
Collection<Object> listeners = this.listeners;
if (!listeners.isEmpty()) {
MethodAccess handler = this.handlerMetadata.getHandler();
int handleIndex = this.handlerMetadata.getMethodIndex();
IHandlerInvocation invocation = this.invocation;
for (Object listener : listeners) {
try {
invocation.invoke(listener, handler, handleIndex, message);
} catch (IllegalAccessException e) {
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
"The class or method is not accessible")
.setCause(e)
.setMethodName(handler.getMethodNames()[handleIndex])
.setListener(listener)
.setPublishedObject(message));
} catch (IllegalArgumentException e) {
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
"Wrong arguments passed to method. Was: " + message.getClass()
+ "Expected: " + handler.getParameterTypes()[0])
.setCause(e)
.setMethodName(handler.getMethodNames()[handleIndex])
.setListener(listener)
.setPublishedObject(message));
} catch (InvocationTargetException e) {
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
"Message handler threw exception")
.setCause(e)
.setMethodName(handler.getMethodNames()[handleIndex])
.setListener(listener)
.setPublishedObject(message));
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
"The handler code threw an exception")
.setCause(e)
.setMethodName(handler.getMethodNames()[handleIndex])
.setListener(listener)
.setPublishedObject(message));
}
}
}
}
public void publishToSubscription(ErrorHandlingSupport errorHandler, Object message1, Object message2) {
Collection<Object> listeners = this.listeners;
if (!listeners.isEmpty()) {
MethodAccess handler = this.handlerMetadata.getHandler();
int handleIndex = this.handlerMetadata.getMethodIndex();
IHandlerInvocation invocation = this.invocation;
for (Object listener : listeners) {
try {
invocation.invoke(listener, handler, handleIndex, message1, message2);
} catch (IllegalAccessException e) {
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
"The class or method is not accessible")
.setCause(e)
.setMethodName(handler.getMethodNames()[handleIndex])
.setListener(listener)
.setPublishedObject(message1, message2));
} catch (IllegalArgumentException e) {
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
"Wrong arguments passed to method. Was: " +
message1.getClass() + ", " +
message2.getClass()
+ ". Expected: " + handler.getParameterTypes()[0] + ", " +
handler.getParameterTypes()[1]
)
.setCause(e)
.setMethodName(handler.getMethodNames()[handleIndex])
.setListener(listener)
.setPublishedObject(message1, message2));
} catch (InvocationTargetException e) {
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
"Message handler threw exception")
.setCause(e)
.setMethodName(handler.getMethodNames()[handleIndex])
.setListener(listener)
.setPublishedObject(message1, message2));
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
"The handler code threw an exception")
.setCause(e)
.setMethodName(handler.getMethodNames()[handleIndex])
.setListener(listener)
.setPublishedObject(message1, message2));
}
}
}
}
public void publishToSubscription(ErrorHandlingSupport errorHandler, Object message1, Object message2, Object message3) {
Collection<Object> listeners = this.listeners;
if (!listeners.isEmpty()) {
MethodAccess handler = this.handlerMetadata.getHandler();
int handleIndex = this.handlerMetadata.getMethodIndex();
IHandlerInvocation invocation = this.invocation;
for (Object listener : listeners) {
try {
invocation.invoke(listener, handler, handleIndex, message1, message2, message3);
} catch (IllegalAccessException e) {
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
"The class or method is not accessible")
.setCause(e)
.setMethodName(handler.getMethodNames()[handleIndex])
.setListener(listener)
.setPublishedObject(message1, message2, message3));
} catch (IllegalArgumentException e) {
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
"Wrong arguments passed to method. Was: " +
message1.getClass() + ", " +
message2.getClass() + ", " +
message3.getClass()
+ ". Expected: " + handler.getParameterTypes()[0] + ", " +
handler.getParameterTypes()[1] + ", " +
handler.getParameterTypes()[2]
)
.setCause(e)
.setMethodName(handler.getMethodNames()[handleIndex])
.setListener(listener)
.setPublishedObject(message1, message2, message3));
} catch (InvocationTargetException e) {
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
"Message handler threw exception")
.setCause(e)
.setMethodName(handler.getMethodNames()[handleIndex])
.setListener(listener)
.setPublishedObject(message1, message2, message3));
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
"The handler code threw an exception")
.setCause(e)
.setMethodName(handler.getMethodNames()[handleIndex])
.setListener(listener)
.setPublishedObject(message1, message2, message3));
}
}
}
}
@Override
public int hashCode() {
return this.ID;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Subscription other = (Subscription) obj;
return this.ID == other.ID;
}
}

View File

@ -1,416 +0,0 @@
package net.engio.mbassy.multi.common;
import java.util.Map;
/**
* Simple tree structure that is a map that contains a chain of keys to get to a value.
* <p>
* NOT THREAD SAFE
* <p>
* Comparisons for the KEY are '==', NOT '.equals()'
*
* @author dorkbox, llc
* Date: 2/2/15
*/
public class IdentityObjectTree<KEY, VALUE> {
private Map<KEY, IdentityObjectTree<KEY, VALUE>> children;
private volatile VALUE value;
public IdentityObjectTree() {
}
/**
* can be overridden to provide a custom backing map
*/
protected Map<KEY, IdentityObjectTree<KEY, VALUE>> createChildren() {
return new ConcurrentHashMapV8<KEY, IdentityObjectTree<KEY, VALUE>>(4);
}
public VALUE getValue() {
VALUE returnValue = this.value;
return returnValue;
}
public void putValue(VALUE value) {
this.value = value;
}
public void removeValue() {
this.value = null;
}
/**
* Removes a branch from the tree, and cleans up, if necessary
*/
public void remove(KEY key) {
if (key == null) {
removeLeaf(key);
}
}
/**
* Removes a branch from the tree, and cleans up, if necessary
*/
public void remove(KEY key1, KEY key2) {
if (key1 == null || key2 == null) {
return;
}
IdentityObjectTree<KEY, VALUE> leaf = null;
if (this.children != null) {
leaf = this.children.get(key1);
if (leaf != null) {
leaf.removeLeaf(key2);
this.children.remove(key1);
if (this.children.isEmpty()) {
this.children = null;
}
}
}
}
/**
* Removes a branch from the tree, and cleans up, if necessary
*/
public void remove(KEY key1, KEY key2, KEY key3) {
if (key1 == null || key2 == null) {
return;
}
IdentityObjectTree<KEY, VALUE> leaf = null;
if (this.children != null) {
leaf = this.children.get(key1);
if (leaf != null) {
leaf.remove(key2, key3);
this.children.remove(key1);
if (this.children.isEmpty()) {
this.children = null;
}
}
}
}
/**
* Removes a branch from the tree, and cleans up, if necessary
*/
@SuppressWarnings("unchecked")
public void remove(KEY... keys) {
if (keys == null) {
return;
}
removeLeaf(0, keys);
}
/**
* Removes a branch from the tree, and cleans up, if necessary
*/
private final void removeLeaf(KEY key) {
if (key != null) {
if (this.children != null) {
IdentityObjectTree<KEY, VALUE> leaf = this.children.get(key);
if (leaf != null) {
if (leaf.children == null && leaf.value == null) {
this.children.remove(key);
}
if (this.children.isEmpty()) {
this.children = null;
}
}
}
}
}
// keys CANNOT be null here!
private final void removeLeaf(int index, KEY[] keys) {
if (index == keys.length) {
// we have reached the leaf to remove!
this.value = null;
this.children = null;
} else if (this.children != null) {
IdentityObjectTree<KEY, VALUE> leaf = this.children.get(keys[index]);
if (leaf != null) {
leaf.removeLeaf(index+1, keys);
if (leaf.children == null && leaf.value == null) {
this.children.remove(keys[index]);
}
if (this.children.isEmpty()) {
this.children = null;
}
}
}
}
/**
* BACKWARDS, because our signature must allow for N keys...
*/
public void put(VALUE value, KEY key) {
// have to put value into our children
createLeaf(key, value, true);
}
/**
* BACKWARDS, because our signature must allow for N keys...
*/
public void put(VALUE value, KEY key1, KEY key2) {
// have to put value into our children
IdentityObjectTree<KEY, VALUE> leaf = createLeaf(key1, value, false);
if (leaf != null) {
leaf.createLeaf(key2, value, true);
}
}
/**
* BACKWARDS, because our signature must allow for N keys...
*/
public void put(VALUE value, KEY key1, KEY key2, KEY key3) {
// have to put value into our children
IdentityObjectTree<KEY, VALUE> leaf = createLeaf(key1, value, false);
if (leaf != null) {
leaf = leaf.createLeaf(key2, value, false);
}
if (leaf != null) {
leaf.createLeaf(key3, value, true);
}
}
/**
* BACKWARDS, because our signature must allow for N keys...
*/
@SuppressWarnings("unchecked")
public void put(VALUE value, KEY... keys) {
if (keys == null) {
return;
}
int length = keys.length;
int length_1 = length - 1;
boolean setFirstValue = length == 1;
// have to put value into our children
IdentityObjectTree<KEY, VALUE> leaf = createLeaf(keys[0], value, setFirstValue);
for (int i=1;i<length;i++) {
if (leaf != null) {
leaf = leaf.createLeaf(keys[i], value, i == length_1);
}
}
}
/**
* BACKWARDS, because our signature must allow for N keys...
*/
@SuppressWarnings("unchecked")
public IdentityObjectTree<KEY, VALUE> createLeaf(KEY... keys) {
if (keys == null) {
return this;
}
int length = keys.length;
// have to put value into our children
IdentityObjectTree<KEY, VALUE> leaf = createLeaf(keys[0], null, false);
for (int i=1;i<length;i++) {
if (leaf != null) {
leaf = leaf.createLeaf(keys[i], null, false);
}
}
return leaf;
}
public final IdentityObjectTree<KEY, VALUE> createLeaf(KEY key, VALUE value, boolean setValue) {
if (key == null) {
return null;
}
IdentityObjectTree<KEY, VALUE> objectTree;
if (this.children == null) {
this.children = createChildren();
// might as well add too
objectTree = new IdentityObjectTree<KEY, VALUE>();
if (setValue) {
objectTree.value = value;
}
this.children.put(key, objectTree);
} else {
objectTree = this.children.get(key);
// make sure we have a tree for the specified node
if (objectTree == null) {
objectTree = new IdentityObjectTree<KEY, VALUE>();
if (setValue) {
objectTree.value = value;
}
this.children.put(key, objectTree);
} else if (setValue) {
objectTree.value = value;
}
}
return objectTree;
}
/////////////////////////////////////////
/////////////////////////////////////////
/////////////////////////////////////////
/////////////////////////////////////////
public VALUE get(KEY key) {
IdentityObjectTree<KEY, VALUE> objectTree = null;
// get value from our children
objectTree = getLeaf(key);
if (objectTree == null) {
return null;
}
VALUE returnValue = objectTree.value;
return returnValue;
}
public VALUE getValue(KEY key1, KEY key2) {
IdentityObjectTree<KEY, VALUE> tree = null;
// get value from our children
tree = getLeaf(key1);
if (tree != null) {
tree = tree.getLeaf(key2);
}
if (tree == null) {
return null;
}
VALUE returnValue = tree.value;
return returnValue;
}
public VALUE getValue(KEY key1, KEY key2, KEY key3) {
IdentityObjectTree<KEY, VALUE> tree = null;
// get value from our children
tree = getLeaf(key1);
if (tree != null) {
tree = tree.getLeaf(key2);
}
if (tree != null) {
tree = tree.getLeaf(key3);
}
if (tree == null) {
return null;
}
VALUE returnValue = tree.value;
return returnValue;
}
@SuppressWarnings("unchecked")
public VALUE getValue(KEY... keys) {
IdentityObjectTree<KEY, VALUE> tree = null;
// get value from our children
tree = getLeaf(keys[0]);
int size = keys.length;
for (int i=1;i<size;i++) {
if (tree != null) {
tree = tree.getLeaf(keys[i]);
} else {
return null;
}
}
if (tree == null) {
return null;
}
VALUE returnValue = tree.value;
return returnValue;
}
public final IdentityObjectTree<KEY, VALUE> getLeaf(KEY key) {
if (key == null) {
return null;
}
IdentityObjectTree<KEY, VALUE> tree;
if (this.children == null) {
tree = null;
} else {
tree = this.children.get(key);
}
return tree;
}
public final IdentityObjectTree<KEY, VALUE> getLeaf(KEY key1, KEY key2) {
IdentityObjectTree<KEY, VALUE> tree = null;
// get value from our children
tree = getLeaf(key1);
if (tree != null) {
tree = tree.getLeaf(key2);
}
if (tree == null) {
return null;
}
return tree;
}
public final IdentityObjectTree<KEY, VALUE> getLeaf(KEY key1, KEY key2, KEY key3) {
IdentityObjectTree<KEY, VALUE> tree = null;
// get value from our children
tree = getLeaf(key1);
if (tree != null) {
tree = tree.getLeaf(key2);
}
if (tree != null) {
tree = tree.getLeaf(key3);
}
if (tree == null) {
return null;
}
return tree;
}
@SuppressWarnings("unchecked")
public final IdentityObjectTree<KEY, VALUE> getLeaf(KEY... keys) {
int size = keys.length;
if (size == 0) {
return null;
}
IdentityObjectTree<KEY, VALUE> tree = null;
// get value from our children
tree = getLeaf(keys[0]);
for (int i=1;i<size;i++) {
if (tree != null) {
tree = tree.getLeaf(keys[i]);
} else {
return null;
}
}
if (tree == null) {
return null;
}
return tree;
}
}

View File

@ -1,268 +0,0 @@
package net.engio.mbassy.multi.subscription;
import java.lang.reflect.InvocationTargetException;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
import net.engio.mbassy.multi.common.StrongConcurrentSet;
import net.engio.mbassy.multi.dispatch.IHandlerInvocation;
import net.engio.mbassy.multi.dispatch.ReflectiveHandlerInvocation;
import net.engio.mbassy.multi.dispatch.SynchronizedHandlerInvocation;
import net.engio.mbassy.multi.error.ErrorHandlingSupport;
import net.engio.mbassy.multi.error.PublicationError;
import net.engio.mbassy.multi.listener.MessageHandler;
import com.esotericsoftware.reflectasm.MethodAccess;
/**
* A subscription is a thread-safe container that manages exactly one message handler of all registered
* message listeners of the same class, i.e. all subscribed instances (excluding subclasses) of a SingleMessageHandler.class
* will be referenced in the subscription created for SingleMessageHandler.class.
*
* There will be as many unique subscription objects per message listener class as there are message handlers
* defined in the message listeners class hierarchy.
*
* The subscription provides functionality for message publication by means of delegation to the respective
* message dispatcher.
*
* @author bennidi
* @author dorkbox, llc
* Date: 2/2/15
*/
public class Subscription {
private static AtomicInteger ID_COUNTER = new AtomicInteger();
private final int ID = ID_COUNTER.getAndIncrement();
// the handler's metadata -> for each handler in a listener, a unique subscription context is created
private final MessageHandler handlerMetadata;
private final IHandlerInvocation invocation;
private final StrongConcurrentSet<Object> listeners;
public Subscription(MessageHandler handler) {
this.handlerMetadata = handler;
this.listeners = new StrongConcurrentSet<Object>();
IHandlerInvocation invocation = new ReflectiveHandlerInvocation();
if (handler.isSynchronized()) {
invocation = new SynchronizedHandlerInvocation(invocation);
}
this.invocation = invocation;
}
public Class<?>[] getHandledMessageTypes() {
return this.handlerMetadata.getHandledMessages();
}
public boolean acceptsSubtypes() {
return this.handlerMetadata.acceptsSubtypes();
}
public boolean acceptsVarArgs() {
return this.handlerMetadata.acceptsVarArgs();
}
public boolean isEmpty() {
return this.listeners.isEmpty();
}
public void subscribe(Object listener) {
this.listeners.add(listener);
}
/**
* @return TRUE if the element was removed
*/
public boolean unsubscribe(Object existingListener) {
return this.listeners.remove(existingListener);
}
/**
* Check whether this subscription manages a message handler of the given message listener class
*/
// only in unit test
public boolean belongsTo(Class<?> listener){
return this.handlerMetadata.isFromListener(listener);
}
// only used in unit-test
public int size() {
return this.listeners.size();
}
public void publishToSubscription(ErrorHandlingSupport errorHandler, Object message) {
Collection<Object> listeners = this.listeners;
if (!listeners.isEmpty()) {
MethodAccess handler = this.handlerMetadata.getHandler();
int handleIndex = this.handlerMetadata.getMethodIndex();
IHandlerInvocation invocation = this.invocation;
for (Object listener : listeners) {
try {
invocation.invoke(listener, handler, handleIndex, message);
} catch (IllegalAccessException e) {
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
"The class or method is not accessible")
.setCause(e)
.setMethodName(handler.getMethodNames()[handleIndex])
.setListener(listener)
.setPublishedObject(message));
} catch (IllegalArgumentException e) {
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
"Wrong arguments passed to method. Was: " + message.getClass()
+ "Expected: " + handler.getParameterTypes()[0])
.setCause(e)
.setMethodName(handler.getMethodNames()[handleIndex])
.setListener(listener)
.setPublishedObject(message));
} catch (InvocationTargetException e) {
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
"Message handler threw exception")
.setCause(e)
.setMethodName(handler.getMethodNames()[handleIndex])
.setListener(listener)
.setPublishedObject(message));
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
"The handler code threw an exception")
.setCause(e)
.setMethodName(handler.getMethodNames()[handleIndex])
.setListener(listener)
.setPublishedObject(message));
}
}
}
}
public void publishToSubscription(ErrorHandlingSupport errorHandler, Object message1, Object message2) {
// Collection<Object> listeners = this.listeners;
//
// if (listeners.size() > 0) {
// MethodHandle handler = this.handlerMetadata.getHandler();
//
// for (Object listener : listeners) {
// try {
// this.invocation.invoke(listener, handler, message1, message2);
// } catch (IllegalAccessException e) {
// errorHandler.handlePublicationError(new PublicationError()
// .setMessage("Error during invocation of message handler. " +
// "The class or method is not accessible")
// .setCause(e)
// .setMethodName(handler.getName())
// .setListener(listener)
// .setPublishedObject(message1, message2));
// } catch (IllegalArgumentException e) {
// errorHandler.handlePublicationError(new PublicationError()
// .setMessage("Error during invocation of message handler. " +
// "Wrong arguments passed to method. Was: " +
// message1.getClass() + ", " +
// message2.getClass()
// + ". Expected: " + handler.getParameterTypes()[0] + ", " +
// handler.getParameterTypes()[1]
// )
// .setCause(e)
// .setMethodName(handler.getName())
// .setListener(listener)
// .setPublishedObject(message1, message2));
// } catch (InvocationTargetException e) {
// errorHandler.handlePublicationError(new PublicationError()
// .setMessage("Error during invocation of message handler. " +
// "Message handler threw exception")
// .setCause(e)
// .setMethodName(handler.getName())
// .setListener(listener)
// .setPublishedObject(message1, message2));
// } catch (Throwable e) {
// errorHandler.handlePublicationError(new PublicationError()
// .setMessage("Error during invocation of message handler. " +
// "The handler code threw an exception")
// .setCause(e)
// .setMethodName(handler.getName())
// .setListener(listener)
// .setPublishedObject(message1, message2));
// }
// }
// }
}
public void publishToSubscription(ErrorHandlingSupport errorHandler, Object message1, Object message2, Object message3) {
// Collection<Object> listeners = this.listeners;
//
// if (listeners.size() > 0) {
// Method handler = this.handlerMetadata.getHandler();
//
// for (Object listener : listeners) {
// try {
// this.invocation.invoke(listener, handler, message1, message2, message3);
// } catch (IllegalAccessException e) {
// errorHandler.handlePublicationError(new PublicationError()
// .setMessage("Error during invocation of message handler. " +
// "The class or method is not accessible")
// .setCause(e)
// .setMethodName(handler.getName())
// .setListener(listener)
// .setPublishedObject(message1, message2, message3));
// } catch (IllegalArgumentException e) {
// errorHandler.handlePublicationError(new PublicationError()
// .setMessage("Error during invocation of message handler. " +
// "Wrong arguments passed to method. Was: " +
// message1.getClass() + ", " +
// message2.getClass() + ", " +
// message3.getClass()
// + ". Expected: " + handler.getParameterTypes()[0] + ", " +
// handler.getParameterTypes()[1] + ", " +
// handler.getParameterTypes()[2]
// )
// .setCause(e)
// .setMethodName(handler.getName())
// .setListener(listener)
// .setPublishedObject(message1, message2, message3));
// } catch (InvocationTargetException e) {
// errorHandler.handlePublicationError(new PublicationError()
// .setMessage("Error during invocation of message handler. " +
// "Message handler threw exception")
// .setCause(e)
// .setMethodName(handler.getName())
// .setListener(listener)
// .setPublishedObject(message1, message2, message3));
// } catch (Throwable e) {
// errorHandler.handlePublicationError(new PublicationError()
// .setMessage("Error during invocation of message handler. " +
// "The handler code threw an exception")
// .setCause(e)
// .setMethodName(handler.getName())
// .setListener(listener)
// .setPublishedObject(message1, message2, message3));
// }
// }
// }
}
@Override
public int hashCode() {
return this.ID;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Subscription other = (Subscription) obj;
return this.ID == other.ID;
}
}

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.multi;
package dorkbox.util.messagebus;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;

View File

@ -1,15 +1,15 @@
package net.engio.mbassy.multi;
package dorkbox.util.messagebus;
import java.util.LinkedList;
import java.util.List;
import net.engio.mbassy.multi.IMessageBus;
import net.engio.mbassy.multi.MultiMBassador;
import net.engio.mbassy.multi.annotations.Handler;
import net.engio.mbassy.multi.common.MessageBusTest;
import org.junit.Test;
import dorkbox.util.messagebus.IMessageBus;
import dorkbox.util.messagebus.MultiMBassador;
import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.common.MessageBusTest;
/**
*
* @author bennidi

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.multi;
package dorkbox.util.messagebus;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.reflect.Method;

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.multi;
package dorkbox.util.messagebus;
import java.util.ArrayList;
import java.util.Collection;
@ -12,12 +12,13 @@ import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
import net.engio.mbassy.multi.common.AssertSupport;
import net.engio.mbassy.multi.common.ConcurrentExecutor;
import org.junit.Before;
import org.junit.Test;
import dorkbox.util.messagebus.common.AssertSupport;
import dorkbox.util.messagebus.common.ConcurrentExecutor;
/**
* This test ensures the correct behaviour of the set implementation that is the building
* block of the subscription implementations used by the Mbassador message bus.

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.multi;
package dorkbox.util.messagebus;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
@ -8,13 +8,13 @@ import java.lang.annotation.Target;
import java.util.HashSet;
import java.util.Set;
import net.engio.mbassy.multi.MultiMBassador;
import net.engio.mbassy.multi.annotations.Handler;
import net.engio.mbassy.multi.annotations.Synchronized;
import net.engio.mbassy.multi.common.MessageBusTest;
import org.junit.Test;
import dorkbox.util.messagebus.MultiMBassador;
import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.annotations.Synchronized;
import dorkbox.util.messagebus.common.MessageBusTest;
/**
* Tests a custom handler annotation with a @Handler meta annotation and a default filter.
*/

View File

@ -1,21 +1,21 @@
package net.engio.mbassy.multi;
package dorkbox.util.messagebus;
import java.util.concurrent.atomic.AtomicInteger;
import net.engio.mbassy.multi.MultiMBassador;
import net.engio.mbassy.multi.annotations.Handler;
import net.engio.mbassy.multi.common.ConcurrentExecutor;
import net.engio.mbassy.multi.common.DeadMessage;
import net.engio.mbassy.multi.common.ListenerFactory;
import net.engio.mbassy.multi.common.MessageBusTest;
import net.engio.mbassy.multi.common.TestUtil;
import net.engio.mbassy.multi.listeners.IMessageListener;
import net.engio.mbassy.multi.listeners.MessagesListener;
import net.engio.mbassy.multi.listeners.ObjectListener;
import org.junit.Before;
import org.junit.Test;
import dorkbox.util.messagebus.MultiMBassador;
import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.common.ConcurrentExecutor;
import dorkbox.util.messagebus.common.DeadMessage;
import dorkbox.util.messagebus.common.ListenerFactory;
import dorkbox.util.messagebus.common.MessageBusTest;
import dorkbox.util.messagebus.common.TestUtil;
import dorkbox.util.messagebus.listeners.IMessageListener;
import dorkbox.util.messagebus.listeners.MessagesListener;
import dorkbox.util.messagebus.listeners.ObjectListener;
/**
* Verify correct behaviour in case of message publications that do not have any matching subscriptions
*

View File

@ -1,25 +1,25 @@
package net.engio.mbassy.multi;
package dorkbox.util.messagebus;
import java.util.concurrent.atomic.AtomicInteger;
import net.engio.mbassy.multi.MultiMBassador;
import net.engio.mbassy.multi.common.ConcurrentExecutor;
import net.engio.mbassy.multi.common.ListenerFactory;
import net.engio.mbassy.multi.common.MessageBusTest;
import net.engio.mbassy.multi.common.MessageManager;
import net.engio.mbassy.multi.common.TestUtil;
import net.engio.mbassy.multi.error.IPublicationErrorHandler;
import net.engio.mbassy.multi.error.PublicationError;
import net.engio.mbassy.multi.listeners.ExceptionThrowingListener;
import net.engio.mbassy.multi.listeners.IMessageListener;
import net.engio.mbassy.multi.listeners.Listeners;
import net.engio.mbassy.multi.listeners.MessagesListener;
import net.engio.mbassy.multi.messages.MessageTypes;
import net.engio.mbassy.multi.messages.MultipartMessage;
import net.engio.mbassy.multi.messages.StandardMessage;
import org.junit.Test;
import dorkbox.util.messagebus.MultiMBassador;
import dorkbox.util.messagebus.common.ConcurrentExecutor;
import dorkbox.util.messagebus.common.ListenerFactory;
import dorkbox.util.messagebus.common.MessageBusTest;
import dorkbox.util.messagebus.common.MessageManager;
import dorkbox.util.messagebus.common.TestUtil;
import dorkbox.util.messagebus.error.IPublicationErrorHandler;
import dorkbox.util.messagebus.error.PublicationError;
import dorkbox.util.messagebus.listeners.ExceptionThrowingListener;
import dorkbox.util.messagebus.listeners.IMessageListener;
import dorkbox.util.messagebus.listeners.Listeners;
import dorkbox.util.messagebus.listeners.MessagesListener;
import dorkbox.util.messagebus.messages.MessageTypes;
import dorkbox.util.messagebus.messages.MultipartMessage;
import dorkbox.util.messagebus.messages.StandardMessage;
/**
* Test synchronous and asynchronous dispatch in single and multi-threaded scenario.
*

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.multi;
package dorkbox.util.messagebus;
import java.io.BufferedReader;
import java.util.Arrays;
@ -7,14 +7,14 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import net.engio.mbassy.multi.annotations.Handler;
import net.engio.mbassy.multi.common.AssertSupport;
import net.engio.mbassy.multi.listener.MessageHandler;
import net.engio.mbassy.multi.listener.MessageListener;
import net.engio.mbassy.multi.listener.MetadataReader;
import org.junit.Test;
import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.common.AssertSupport;
import dorkbox.util.messagebus.listener.MessageHandler;
import dorkbox.util.messagebus.listener.MessageListener;
import dorkbox.util.messagebus.listener.MetadataReader;
/**
*
* @author bennidi

View File

@ -1,11 +1,11 @@
package net.engio.mbassy.multi;
import net.engio.mbassy.multi.IMessageBus;
import net.engio.mbassy.multi.annotations.Handler;
import net.engio.mbassy.multi.common.MessageBusTest;
package dorkbox.util.messagebus;
import org.junit.Test;
import dorkbox.util.messagebus.IMessageBus;
import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.common.MessageBusTest;
/**
* Very simple test to verify dispatch to correct message handler
*

View File

@ -1,15 +1,15 @@
/*
* Copyright 2015 dorkbox, llc
*/
package net.engio.mbassy.multi;
package dorkbox.util.messagebus;
import java.util.concurrent.atomic.AtomicInteger;
import net.engio.mbassy.multi.annotations.Handler;
import net.engio.mbassy.multi.common.MessageBusTest;
import org.junit.Test;
import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.common.MessageBusTest;
/**
* @author dorkbox, llc
* Date: 2/2/15
@ -55,13 +55,13 @@ public class MultiMessageTest extends MessageBusTest {
// count.getAndIncrement();
// System.err.println("match String");
// }
//
// @Handler
// public void handleSync(String o1, String o2) {
// count.getAndIncrement();
// System.err.println("match String, String");
// }
//
@Handler
public void handleSync(String o1, String o2) {
count.getAndIncrement();
System.err.println("match String, String");
}
// @Handler
// public void handleSync(String o1, String o2, String o3) {
// count.getAndIncrement();
@ -80,10 +80,16 @@ public class MultiMessageTest extends MessageBusTest {
System.err.println("match String[]");
}
// @Handler
// public void handleSync(Integer... o) {
// count.getAndIncrement();
// System.err.println("match Integer[]");
// }
@Handler
public void handleSync(Integer... o) {
count.getAndIncrement();
System.err.println("match Integer[]");
}
@Handler(acceptVarargs=true)
public void handleSync(Object... o) {
count.getAndIncrement();
System.err.println("match Object[]");
}
}
}

View File

@ -13,38 +13,38 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.engio.mbassy.multi;
import net.engio.mbassy.multi.common.AssertSupport;
import net.engio.mbassy.multi.common.IdentityObjectTree;
package dorkbox.util.messagebus;
import org.junit.Test;
import dorkbox.util.messagebus.common.AssertSupport;
import dorkbox.util.messagebus.common.HashMapTree;
public class ObjectTreeTest extends AssertSupport {
public void test(IdentityObjectTree<Class<?>, String> tree, String string, Class<?> clazz) {
public void test(HashMapTree<Class<?>, String> tree, String string, Class<?> clazz) {
tree.put(string, clazz);
assertEquals(string, tree.get(clazz));
}
public void test(IdentityObjectTree<Class<?>, String> tree, String string, Class<?> clazz1, Class<?> clazz2) {
public void test(HashMapTree<Class<?>, String> tree, String string, Class<?> clazz1, Class<?> clazz2) {
tree.put(string, clazz1, clazz2);
assertEquals(string, tree.getValue(clazz1, clazz2));
assertEquals(string, tree.get(clazz1, clazz2));
}
public void test(IdentityObjectTree<Class<?>, String> tree, String string, Class<?> clazz1, Class<?> clazz2, Class<?> clazz3) {
public void test(HashMapTree<Class<?>, String> tree, String string, Class<?> clazz1, Class<?> clazz2, Class<?> clazz3) {
tree.put(string, clazz1, clazz2, clazz3);
assertEquals(string, tree.getValue(clazz1, clazz2, clazz3));
}
public void test(IdentityObjectTree<Class<?>, String> tree, String string, Class<?>... clazzes) {
public void test(HashMapTree<Class<?>, String> tree, String string, Class<?>... clazzes) {
tree.put(string, clazzes);
assertEquals(string, tree.getValue(clazzes));
assertEquals(string, tree.get(clazzes));
}
@Test
public void testObjectTree() {
IdentityObjectTree<Class<?>, String> tree = new IdentityObjectTree<Class<?>, String>();
HashMapTree<Class<?>, String> tree = new HashMapTree<Class<?>, String>(8, 0.8F);
test(tree, "s", String.class);
test(tree, "x", String.class);
@ -62,7 +62,7 @@ public class ObjectTreeTest extends AssertSupport {
// now make sure we can REMOVE the tree elements
tree.remove(Object.class, Object.class, String.class, Integer.class, Float.class);
IdentityObjectTree<Class<?>, String> leaf = tree.getLeaf(Object.class, Object.class, String.class, Integer.class);
HashMapTree<Class<?>, String> leaf = tree.getLeaf(Object.class, Object.class, String.class, Integer.class);
assertNull(leaf);
leaf = tree.getLeaf(Object.class, Object.class);
assertNotNull(leaf);

View File

@ -1,13 +1,13 @@
/*
* Copyright 2015 dorkbox, llc
*/
package net.engio.mbassy.multi;
package dorkbox.util.messagebus;
import dorkbox.util.messagebus.MultiMBassador;
import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.error.IPublicationErrorHandler;
import dorkbox.util.messagebus.error.PublicationError;
import junit.framework.Assert;
import net.engio.mbassy.multi.MultiMBassador;
import net.engio.mbassy.multi.annotations.Handler;
import net.engio.mbassy.multi.error.IPublicationErrorHandler;
import net.engio.mbassy.multi.error.PublicationError;
/**
* @author dorkbox, llc Date: 2/2/15

View File

@ -1,29 +1,29 @@
package net.engio.mbassy.multi;
import net.engio.mbassy.multi.SubscriptionManager;
import net.engio.mbassy.multi.common.AssertSupport;
import net.engio.mbassy.multi.common.ConcurrentExecutor;
import net.engio.mbassy.multi.common.ListenerFactory;
import net.engio.mbassy.multi.common.SubscriptionValidator;
import net.engio.mbassy.multi.common.TestUtil;
import net.engio.mbassy.multi.listeners.AbstractMessageListener;
import net.engio.mbassy.multi.listeners.ICountableListener;
import net.engio.mbassy.multi.listeners.IMessageListener;
import net.engio.mbassy.multi.listeners.IMultipartMessageListener;
import net.engio.mbassy.multi.listeners.MessagesListener;
import net.engio.mbassy.multi.listeners.MultipartMessageListener;
import net.engio.mbassy.multi.listeners.Overloading;
import net.engio.mbassy.multi.listeners.StandardMessageListener;
import net.engio.mbassy.multi.messages.AbstractMessage;
import net.engio.mbassy.multi.messages.ICountable;
import net.engio.mbassy.multi.messages.IMessage;
import net.engio.mbassy.multi.messages.IMultipartMessage;
import net.engio.mbassy.multi.messages.MessageTypes;
import net.engio.mbassy.multi.messages.MultipartMessage;
import net.engio.mbassy.multi.messages.StandardMessage;
package dorkbox.util.messagebus;
import org.junit.Test;
import dorkbox.util.messagebus.SubscriptionManager;
import dorkbox.util.messagebus.common.AssertSupport;
import dorkbox.util.messagebus.common.ConcurrentExecutor;
import dorkbox.util.messagebus.common.ListenerFactory;
import dorkbox.util.messagebus.common.SubscriptionValidator;
import dorkbox.util.messagebus.common.TestUtil;
import dorkbox.util.messagebus.listeners.AbstractMessageListener;
import dorkbox.util.messagebus.listeners.ICountableListener;
import dorkbox.util.messagebus.listeners.IMessageListener;
import dorkbox.util.messagebus.listeners.IMultipartMessageListener;
import dorkbox.util.messagebus.listeners.MessagesListener;
import dorkbox.util.messagebus.listeners.MultipartMessageListener;
import dorkbox.util.messagebus.listeners.Overloading;
import dorkbox.util.messagebus.listeners.StandardMessageListener;
import dorkbox.util.messagebus.messages.AbstractMessage;
import dorkbox.util.messagebus.messages.ICountable;
import dorkbox.util.messagebus.messages.IMessage;
import dorkbox.util.messagebus.messages.IMultipartMessage;
import dorkbox.util.messagebus.messages.MessageTypes;
import dorkbox.util.messagebus.messages.MultipartMessage;
import dorkbox.util.messagebus.messages.StandardMessage;
/**
*
* Test the subscriptions as generated and organized by the subscription manager. Tests use different sets of listeners

View File

@ -1,25 +1,25 @@
package net.engio.mbassy.multi;
package dorkbox.util.messagebus;
import java.util.concurrent.atomic.AtomicInteger;
import net.engio.mbassy.multi.IMessageBus;
import net.engio.mbassy.multi.MultiMBassador;
import net.engio.mbassy.multi.common.ConcurrentExecutor;
import net.engio.mbassy.multi.common.ListenerFactory;
import net.engio.mbassy.multi.common.MessageBusTest;
import net.engio.mbassy.multi.common.TestUtil;
import net.engio.mbassy.multi.error.IPublicationErrorHandler;
import net.engio.mbassy.multi.error.PublicationError;
import net.engio.mbassy.multi.listeners.ExceptionThrowingListener;
import net.engio.mbassy.multi.listeners.IMessageListener;
import net.engio.mbassy.multi.listeners.MessagesListener;
import net.engio.mbassy.multi.messages.MessageTypes;
import net.engio.mbassy.multi.messages.MultipartMessage;
import net.engio.mbassy.multi.messages.StandardMessage;
import org.junit.Assert;
import org.junit.Test;
import dorkbox.util.messagebus.IMessageBus;
import dorkbox.util.messagebus.MultiMBassador;
import dorkbox.util.messagebus.common.ConcurrentExecutor;
import dorkbox.util.messagebus.common.ListenerFactory;
import dorkbox.util.messagebus.common.MessageBusTest;
import dorkbox.util.messagebus.common.TestUtil;
import dorkbox.util.messagebus.error.IPublicationErrorHandler;
import dorkbox.util.messagebus.error.PublicationError;
import dorkbox.util.messagebus.listeners.ExceptionThrowingListener;
import dorkbox.util.messagebus.listeners.IMessageListener;
import dorkbox.util.messagebus.listeners.MessagesListener;
import dorkbox.util.messagebus.messages.MessageTypes;
import dorkbox.util.messagebus.messages.MultipartMessage;
import dorkbox.util.messagebus.messages.StandardMessage;
/**
* Test synchronous and asynchronous dispatch in single and multi-threaded scenario.
*

View File

@ -1,16 +1,16 @@
package net.engio.mbassy.multi;
package dorkbox.util.messagebus;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import net.engio.mbassy.multi.IMessageBus;
import net.engio.mbassy.multi.annotations.Handler;
import net.engio.mbassy.multi.annotations.Synchronized;
import net.engio.mbassy.multi.common.MessageBusTest;
import org.junit.Test;
import dorkbox.util.messagebus.IMessageBus;
import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.annotations.Synchronized;
import dorkbox.util.messagebus.common.MessageBusTest;
/**
* Todo: Add javadoc
*

View File

@ -1,14 +1,14 @@
package net.engio.mbassy.multi;
package dorkbox.util.messagebus;
import java.util.Collection;
import java.util.HashSet;
import java.util.Random;
import net.engio.mbassy.multi.common.ConcurrentExecutor;
import net.engio.mbassy.multi.common.WeakConcurrentSet;
import org.junit.Test;
import dorkbox.util.messagebus.common.ConcurrentExecutor;
import dorkbox.util.messagebus.common.WeakConcurrentSet;
/**
*
*

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.multi.common;
package dorkbox.util.messagebus.common;
import java.lang.ref.WeakReference;

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.multi.common;
package dorkbox.util.messagebus.common;
import java.util.ArrayList;
import java.util.List;

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.multi.common;
package dorkbox.util.messagebus.common;
import junit.framework.Assert;

View File

@ -1,13 +1,14 @@
package net.engio.mbassy.multi.common;
package dorkbox.util.messagebus.common;
import junit.framework.Assert;
import net.engio.mbassy.multi.MultiMBassador;
import net.engio.mbassy.multi.error.IPublicationErrorHandler;
import net.engio.mbassy.multi.error.PublicationError;
import net.engio.mbassy.multi.messages.MessageTypes;
import org.junit.Before;
import dorkbox.util.messagebus.MultiMBassador;
import dorkbox.util.messagebus.error.IPublicationErrorHandler;
import dorkbox.util.messagebus.error.PublicationError;
import dorkbox.util.messagebus.messages.MessageTypes;
/**
* A base test that provides a factory for message bus that makes tests fail if any
* publication error occurs

View File

@ -1,13 +1,13 @@
package net.engio.mbassy.multi.common;
package dorkbox.util.messagebus.common;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import net.engio.mbassy.multi.messages.IMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import dorkbox.util.messagebus.messages.IMessage;
/**
* Created with IntelliJ IDEA.
* User: benjamin

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.multi.common;
package dorkbox.util.messagebus.common;
import java.util.ArrayDeque;
import java.util.Collection;
@ -7,8 +7,8 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import net.engio.mbassy.multi.SubscriptionManager;
import net.engio.mbassy.multi.subscription.Subscription;
import dorkbox.util.messagebus.SubscriptionManager;
import dorkbox.util.messagebus.subscription.Subscription;
/**
*

View File

@ -1,11 +1,11 @@
package net.engio.mbassy.multi.common;
package dorkbox.util.messagebus.common;
import java.util.Iterator;
import java.util.List;
import net.engio.mbassy.multi.MultiMBassador;
import net.engio.mbassy.multi.PubSubSupport;
import net.engio.mbassy.multi.SubscriptionManager;
import dorkbox.util.messagebus.MultiMBassador;
import dorkbox.util.messagebus.PubSubSupport;
import dorkbox.util.messagebus.SubscriptionManager;
/**
* Todo: Add javadoc

View File

@ -1,7 +1,7 @@
package net.engio.mbassy.multi.listeners;
package dorkbox.util.messagebus.listeners;
import net.engio.mbassy.multi.annotations.Handler;
import net.engio.mbassy.multi.messages.AbstractMessage;
import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.messages.AbstractMessage;
/**
*

View File

@ -1,8 +1,8 @@
package net.engio.mbassy.multi.listeners;
package dorkbox.util.messagebus.listeners;
import net.engio.mbassy.multi.annotations.Handler;
import net.engio.mbassy.multi.annotations.Listener;
import net.engio.mbassy.multi.messages.StandardMessage;
import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.annotations.Listener;
import dorkbox.util.messagebus.messages.StandardMessage;
/**
* @author bennidi

View File

@ -1,7 +1,7 @@
package net.engio.mbassy.multi.listeners;
package dorkbox.util.messagebus.listeners;
import net.engio.mbassy.multi.annotations.Handler;
import net.engio.mbassy.multi.messages.ICountable;
import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.messages.ICountable;
/**
*

View File

@ -1,7 +1,7 @@
package net.engio.mbassy.multi.listeners;
package dorkbox.util.messagebus.listeners;
import net.engio.mbassy.multi.annotations.Handler;
import net.engio.mbassy.multi.messages.IMessage;
import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.messages.IMessage;
/**
*

View File

@ -1,7 +1,7 @@
package net.engio.mbassy.multi.listeners;
package dorkbox.util.messagebus.listeners;
import net.engio.mbassy.multi.annotations.Handler;
import net.engio.mbassy.multi.messages.IMultipartMessage;
import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.messages.IMultipartMessage;
/**
*

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.multi.listeners;
package dorkbox.util.messagebus.listeners;
import java.util.Arrays;
import java.util.Collection;

View File

@ -1,7 +1,7 @@
package net.engio.mbassy.multi.listeners;
package dorkbox.util.messagebus.listeners;
import net.engio.mbassy.multi.annotations.Handler;
import net.engio.mbassy.multi.messages.MessageTypes;
import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.messages.MessageTypes;
/**
*

View File

@ -1,7 +1,7 @@
package net.engio.mbassy.multi.listeners;
package dorkbox.util.messagebus.listeners;
import net.engio.mbassy.multi.annotations.Handler;
import net.engio.mbassy.multi.messages.MultipartMessage;
import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.messages.MultipartMessage;
/**
*

View File

@ -1,10 +1,10 @@
package net.engio.mbassy.multi.listeners;
package dorkbox.util.messagebus.listeners;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import net.engio.mbassy.multi.annotations.Handler;
import dorkbox.util.messagebus.annotations.Handler;
public class ObjectListener {

View File

@ -1,8 +1,8 @@
package net.engio.mbassy.multi.listeners;
package dorkbox.util.messagebus.listeners;
import net.engio.mbassy.multi.annotations.Handler;
import net.engio.mbassy.multi.annotations.Listener;
import net.engio.mbassy.multi.messages.AbstractMessage;
import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.annotations.Listener;
import dorkbox.util.messagebus.messages.AbstractMessage;
/**
* Some handlers and message types to test correct functioning of overloaded

View File

@ -1,7 +1,7 @@
package net.engio.mbassy.multi.listeners;
package dorkbox.util.messagebus.listeners;
import net.engio.mbassy.multi.annotations.Handler;
import net.engio.mbassy.multi.messages.StandardMessage;
import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.messages.StandardMessage;
/**
*

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.multi.messages;
package dorkbox.util.messagebus.messages;
import java.util.HashMap;
import java.util.Map;

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.multi.messages;
package dorkbox.util.messagebus.messages;
/**
*

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.multi.messages;
package dorkbox.util.messagebus.messages;
/**
* Interface analogous to IMessage. Exists to test more complex class/interface hierarchies

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.multi.messages;
package dorkbox.util.messagebus.messages;
/**
*

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.multi.messages;
package dorkbox.util.messagebus.messages;
/**
*

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.multi.messages;
package dorkbox.util.messagebus.messages;
import java.util.HashMap;
import java.util.Map;

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.multi.messages;
package dorkbox.util.messagebus.messages;
/**
*

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.multi.messages;
package dorkbox.util.messagebus.messages;
/**
* @author bennidi

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.multi.messages;
package dorkbox.util.messagebus.messages;
/**
*

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.multi.messages;
package dorkbox.util.messagebus.messages;
import java.util.concurrent.atomic.AtomicInteger;