Added SubsciptionManager, fixed #30 and #31

This commit is contained in:
benni 2013-05-12 14:32:16 +02:00
parent c7922f5e7e
commit 17b1fee1b6
24 changed files with 458 additions and 190 deletions

View File

@ -2,15 +2,13 @@ package net.engio.mbassy.bus;
import net.engio.mbassy.IPublicationErrorHandler;
import net.engio.mbassy.PublicationError;
import net.engio.mbassy.common.ReflectionUtils;
import net.engio.mbassy.listener.MessageHandlerMetadata;
import net.engio.mbassy.listener.MetadataReader;
import net.engio.mbassy.subscription.Subscription;
import net.engio.mbassy.subscription.SubscriptionContext;
import net.engio.mbassy.subscription.SubscriptionFactory;
import net.engio.mbassy.subscription.SubscriptionManager;
import java.util.*;
import java.util.concurrent.*;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* The base class for all message bus implementations.
@ -21,37 +19,17 @@ import java.util.concurrent.*;
public abstract class AbstractSyncMessageBus<T, P extends ISyncMessageBus.ISyncPostCommand> implements ISyncMessageBus<T, P> {
// the metadata reader that is used to parse objects passed to the subscribe method
private final MetadataReader metadataReader;
// all subscriptions per message type
// this is the primary list for dispatching a specific message
// write access is synchronized and happens very infrequently
private final Map<Class, Collection<Subscription>> subscriptionsPerMessage
= new HashMap<Class, Collection<Subscription>>(50);
// all subscriptions per messageHandler type
// this list provides fast access for subscribing and unsubscribing
// write access is synchronized and happens very infrequently
private final Map<Class, Collection<Subscription>> subscriptionsPerListener
= new HashMap<Class, Collection<Subscription>>(50);
// remember already processed classes that do not contain any listeners
private final Collection<Class> nonListeners = new HashSet<Class>();
// this handler will receive all errors that occur during message dispatch or message handling
private final List<IPublicationErrorHandler> errorHandlers = new CopyOnWriteArrayList<IPublicationErrorHandler>();
// this factory is used to create specialized subscriptions based on the given message handler configuration
// it can be customized by implementing the getSubscriptionFactory() method
private final SubscriptionFactory subscriptionFactory;
private final MessagePublication.Factory publicationFactory;
private final SubscriptionManager subscriptionManager;
public AbstractSyncMessageBus(SyncBusConfiguration configuration) {
subscriptionFactory = configuration.getSubscriptionFactory();
this.metadataReader = configuration.getMetadataReader();
this.subscriptionManager = new SubscriptionManager(configuration.getMetadataReader(),
configuration.getSubscriptionFactory().setBus(this));
this.publicationFactory = configuration.getMessagePublicationFactory();
addErrorHandler(new IPublicationErrorHandler.ConsoleLogger());
}
@ -66,63 +44,12 @@ public abstract class AbstractSyncMessageBus<T, P extends ISyncMessageBus.ISyncP
}
public boolean unsubscribe(Object listener) {
if (listener == null) {
return false;
}
Collection<Subscription> subscriptions = subscriptionsPerListener.get(listener.getClass());
if (subscriptions == null) {
return false;
}
boolean isRemoved = true;
for (Subscription subscription : subscriptions) {
isRemoved = isRemoved && subscription.unsubscribe(listener);
}
return isRemoved;
return subscriptionManager.unsubscribe(listener);
}
public void subscribe(Object listener) {
try {
Class listeningClass = listener.getClass();
if (nonListeners.contains(listeningClass)) {
return; // early reject of known classes that do not participate in eventing
}
Collection<Subscription> subscriptionsByListener = subscriptionsPerListener.get(listeningClass);
if (subscriptionsByListener == null) { // if the type is registered for the first time
synchronized (this) { // new subscriptions must be processed sequentially
subscriptionsByListener = subscriptionsPerListener.get(listeningClass);
if (subscriptionsByListener == null) { // double check (a bit ugly but works here)
List<MessageHandlerMetadata> messageHandlers = metadataReader.getMessageHandlers(listeningClass);
if (messageHandlers.isEmpty()) { // remember the class as non listening class if no handlers are found
nonListeners.add(listeningClass);
return;
}
subscriptionsByListener = new ArrayList<Subscription>(messageHandlers.size()); // it's safe to use non-concurrent collection here (read only)
// create subscriptions for all detected listeners
for (MessageHandlerMetadata messageHandler : messageHandlers) {
// create the subscription
Subscription subscription = subscriptionFactory
.createSubscription(new SubscriptionContext(this, messageHandler));
subscription.subscribe(listener);
subscriptionsByListener.add(subscription);// add it for the listener type (for future subscriptions)
List<Class<?>> messageTypes = messageHandler.getHandledMessages();
for (Class<?> messageType : messageTypes) {
addMessageTypeSubscription(messageType, subscription);
}
//updateMessageTypeHierarchy(eventType);
}
subscriptionsPerListener.put(listeningClass, subscriptionsByListener);
}
}
}
// register the listener to the existing subscriptions
for (Subscription sub : subscriptionsByListener) {
sub.subscribe(listener);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
subscriptionManager.subscribe(listener);
}
@ -131,39 +58,10 @@ public abstract class AbstractSyncMessageBus<T, P extends ISyncMessageBus.ISyncP
}
// obtain the set of subscriptions for the given message type
// Note: never returns null!
protected Collection<Subscription> getSubscriptionsByMessageType(Class messageType) {
Set<Subscription> subscriptions = new TreeSet<Subscription>(Subscription.SubscriptionByPriorityDesc);
if (subscriptionsPerMessage.get(messageType) != null) {
subscriptions.addAll(subscriptionsPerMessage.get(messageType));
}
// TODO: get superclasses is eligible for caching
for (Class eventSuperType : ReflectionUtils.getSuperclasses(messageType)) {
Collection<Subscription> subs = subscriptionsPerMessage.get(eventSuperType);
if (subs != null) {
for (Subscription sub : subs) {
if (sub.handlesMessageType(messageType)) {
subscriptions.add(sub);
}
}
}
}
return subscriptions;
}
// associate a suscription with a message type
// NOTE: Not thread-safe! must be synchronized in outer scope
private void addMessageTypeSubscription(Class messageType, Subscription subscription) {
Collection<Subscription> subscriptions = subscriptionsPerMessage.get(messageType);
if (subscriptions == null) {
subscriptions = new LinkedList<Subscription>();
subscriptionsPerMessage.put(messageType, subscriptions);
}
subscriptions.add(subscription);
return subscriptionManager.getSubscriptionsByMessageType(messageType);
}

View File

@ -55,7 +55,7 @@ public interface IMessageBus<T, P extends IMessageBus.IPostCommand> extends ISyn
Executor getExecutor();
/**
* Check whether any asynchronous message publications are pending for being processed
* Check whether any asynchronous message publications are pending to be processed
*
* @return
*/

View File

@ -2,6 +2,8 @@ package net.engio.mbassy.common;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* This data structure is optimized for non-blocking reads even when write operations occur.
@ -15,7 +17,7 @@ import java.util.Map;
public abstract class AbstractConcurrentSet<T> implements IConcurrentSet<T> {
// Internal state
private final Object lock = new Object();
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Map<T, ISetEntry<T>> entries; // maintain a map of entries for O(log n) lookup
protected Entry<T> head; // reference to the first element
@ -27,27 +29,38 @@ public abstract class AbstractConcurrentSet<T> implements IConcurrentSet<T> {
@Override
public IConcurrentSet<T> add(T element) {
if (element == null) return this;
Lock writeLock = lock.writeLock();
writeLock.lock();
if (element == null || entries.containsKey(element)) {
writeLock.unlock();
return this;
}
synchronized (lock) {
} else {
insert(element);
writeLock.unlock();
}
return this;
}
@Override
public boolean contains(T element) {
ISetEntry<T> entry = entries.get(element);
Lock readLock = lock.readLock();
ISetEntry<T> entry;
try {
readLock.lock();
entry = entries.get(element);
} finally {
readLock.unlock();
}
return entry != null && entry.getValue() != null;
}
private void insert(T element) {
if (entries.containsKey(element)) {
return;
if (!entries.containsKey(element)) {
head = createEntry(element, head);
entries.put(element, head);
}
head = createEntry(element, head);
entries.put(element, head);
}
@Override
@ -57,38 +70,45 @@ public abstract class AbstractConcurrentSet<T> implements IConcurrentSet<T> {
@Override
public IConcurrentSet<T> addAll(Iterable<T> elements) {
synchronized (lock) {
Lock writeLock = lock.writeLock();
try {
writeLock.lock();
for (T element : elements) {
if (element == null || entries.containsKey(element)) {
return this;
if (element != null) {
insert(element);
}
insert(element);
}
} finally {
writeLock.unlock();
}
return this;
}
@Override
public boolean remove(T element) {
if (!entries.containsKey(element)) {
if (!contains(element)) {
return false;
}
synchronized (lock) {
ISetEntry<T> listelement = entries.get(element);
if (listelement == null) {
return false; //removed by other thread
} else {
Lock writeLock = lock.writeLock();
try {
writeLock.lock();
ISetEntry<T> listelement = entries.get(element);
if (listelement == null) {
return false; //removed by other thread
}
if (listelement != head) {
listelement.remove();
} else {
ISetEntry<T> oldHead = head;
head = head.next();
oldHead.clear(); // optimize for GC
}
entries.remove(element);
} finally {
writeLock.unlock();
}
if (listelement != head) {
listelement.remove();
} else {
ISetEntry<T> oldHead = head;
head = head.next();
oldHead.clear(); // optimize for GC
}
entries.remove(element);
return true;
}
return true;
}

View File

@ -62,13 +62,21 @@ public class ReflectionUtils {
public static Collection<Class> getSuperclasses(Class from) {
Collection<Class> superclasses = new LinkedList<Class>();
while (!from.equals(Object.class)) {
while (!from.equals(Object.class) && !from.isInterface()) {
superclasses.add(from.getSuperclass());
from = from.getSuperclass();
}
collectInterfaces(from, superclasses);
return superclasses;
}
public static void collectInterfaces(Class from, Collection<Class> accumulator){
for(Class intface : from.getInterfaces()){
accumulator.add(intface);
collectInterfaces(intface, accumulator);
}
}
public static boolean containsOverridingMethod(final List<Method> allMethods, final Method methodToCheck) {
for (Method method : allMethods) {
if (isOverriddenBy(methodToCheck, method)) {

View File

@ -58,7 +58,8 @@ public class MetadataReader {
// get all listeners defined by the given class (includes
// listeners defined in super classes)
public List<MessageHandlerMetadata> getMessageHandlers(Class<?> target) {
public List<MessageHandlerMetadata> getMessageHandlers(Object listener) {
Class<?> target = listener.getClass();
Listener listenerConfig = target.getAnnotation(Listener.class);
// get all handlers (this will include all (inherited) methods directly annotated using @Handler)
List<Method> allHandlers = ReflectionUtils.getMethods(AllMessageHandlers, target);
@ -89,8 +90,8 @@ public class MetadataReader {
}
public <T> MessageListenerMetadata<T> getMessageListener(Class<T> target) {
return new MessageListenerMetadata(getMessageHandlers(target), target);
public <T> MessageListenerMetadata<T> getMessageListener(Object listener) {
return new MessageListenerMetadata(getMessageHandlers(listener), listener.getClass());
}

View File

@ -5,6 +5,7 @@ import net.engio.mbassy.common.IConcurrentSet;
import net.engio.mbassy.dispatch.IMessageDispatcher;
import java.util.Comparator;
import java.util.List;
import java.util.UUID;
/**
@ -26,11 +27,18 @@ public class Subscription {
this.listeners = listeners;
}
public boolean contains(Object listener){
return listeners.contains(listener);
}
public boolean handlesMessageType(Class<?> messageType) {
return context.getHandlerMetadata().handlesMessage(messageType);
}
public List<Class<?>> getHandledMessageTypes(){
return context.getHandlerMetadata().getHandledMessages();
}
public void publish(MessagePublication publication, Object message) {
dispatcher.dispatch(publication, message, listeners);

View File

@ -7,7 +7,7 @@ import net.engio.mbassy.listener.MessageHandlerMetadata;
* The subscription context holds all (meta)data/objects that are relevant to successfully publish
* a message within a subscription. A one-to-one relation between a subscription and
* subscription context holds -> a subscription context is created for each distinct subscription
* that lives inside a message bus.
* managed by the subscription manager.
*
* @author bennidi
* Date: 11/23/12

View File

@ -1,25 +1,31 @@
package net.engio.mbassy.subscription;
import net.engio.mbassy.MessageBusException;
import net.engio.mbassy.bus.ISyncMessageBus;
import net.engio.mbassy.common.StrongConcurrentSet;
import net.engio.mbassy.common.WeakConcurrentSet;
import net.engio.mbassy.dispatch.*;
import net.engio.mbassy.listener.MessageHandlerMetadata;
import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;
/**
* Created with IntelliJ IDEA.
*
* @author bennidi
* Date: 11/16/12
* Time: 10:39 AM
* To change this template use File | Settings | File Templates.
* The subscription factory is used to create an empty subscription for specific message handler.
* The message handler's configuration is evaluated and a corresponding subscription is built.
*/
public class SubscriptionFactory {
public Subscription createSubscription(SubscriptionContext context) throws MessageBusException{
private ISyncMessageBus bus;
public SubscriptionFactory setBus(ISyncMessageBus bus) {
this.bus = bus;
return this;
}
public Subscription createSubscription(MessageHandlerMetadata handlerMetadata) throws MessageBusException{
try {
SubscriptionContext context = new SubscriptionContext(bus, handlerMetadata);
IHandlerInvocation invocation = buildInvocationForHandler(context);
IMessageDispatcher dispatcher = buildDispatcher(context, invocation);
return new Subscription(context, dispatcher, context.getHandlerMetadata().useStrongReferences()

View File

@ -0,0 +1,181 @@
package net.engio.mbassy.subscription;
import net.engio.mbassy.common.ReflectionUtils;
import net.engio.mbassy.common.StrongConcurrentSet;
import net.engio.mbassy.listener.MessageHandlerMetadata;
import net.engio.mbassy.listener.MetadataReader;
import java.util.*;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* Todo: Add javadoc
*
* @author bennidi
* Date: 5/11/13
*/
public class SubscriptionManager {
// the metadata reader that is used to inspect objects passed to the subscribe method
private final MetadataReader metadataReader;
// all subscriptions per message type
// this is the primary list for dispatching a specific message
// write access is synchronized and happens only when a listener of a specific class is registered the first time
private final Map<Class, Collection<Subscription>> subscriptionsPerMessage
= new HashMap<Class, Collection<Subscription>>(50);
// all subscriptions per messageHandler type
// this map provides fast access for subscribing and unsubscribing
// write access is synchronized and happens very infrequently
// once a collection of subscriptions is stored it does not change
private final Map<Class, Collection<Subscription>> subscriptionsPerListener
= new HashMap<Class, Collection<Subscription>>(50);
// remember already processed classes that do not contain any message handlers
private final StrongConcurrentSet<Class> nonListeners = new StrongConcurrentSet<Class>();
// this factory is used to create specialized subscriptions based on the given message handler configuration
// it can be customized by implementing the getSubscriptionFactory() method
private final SubscriptionFactory subscriptionFactory;
private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
public SubscriptionManager(MetadataReader metadataReader, SubscriptionFactory subscriptionFactory) {
this.metadataReader = metadataReader;
this.subscriptionFactory = subscriptionFactory;
}
public boolean unsubscribe(Object listener) {
if (listener == null) {
return false;
}
Collection<Subscription> subscriptions = getSubscriptionsByListener(listener);
if (subscriptions == null) {
return false;
}
boolean isRemoved = true;
for (Subscription subscription : subscriptions) {
isRemoved &= subscription.unsubscribe(listener);
}
return isRemoved;
}
private Collection<Subscription> getSubscriptionsByListener(Object listener) {
Collection<Subscription> subscriptions;
try {
readWriteLock.readLock().lock();
subscriptions = subscriptionsPerListener.get(listener.getClass());
} finally {
readWriteLock.readLock().unlock();
}
return subscriptions;
}
public void subscribe(Object listener) {
try {
if (isKnownNonListener(listener)) {
return; // early reject of known classes that do not define message handlers
}
Collection<Subscription> subscriptionsByListener = getSubscriptionsByListener(listener);
// a listener is either subscribed for the first time
if (subscriptionsByListener == null) {
List<MessageHandlerMetadata> messageHandlers = metadataReader.getMessageHandlers(listener);
if (messageHandlers.isEmpty()) { // remember the class as non listening class if no handlers are found
nonListeners.add(listener.getClass());
return;
}
subscriptionsByListener = new ArrayList<Subscription>(messageHandlers.size()); // it's safe to use non-concurrent collection here (read only)
// create subscriptions for all detected message handlers
for (MessageHandlerMetadata messageHandler : messageHandlers) {
// create the subscription
subscriptionsByListener.add(subscriptionFactory.createSubscription(messageHandler));
}
// this will acquire a write lock and handle the case when another thread already subscribed
// this particular listener in the mean-time
subscribe(listener, subscriptionsByListener);
} // or the subscriptions already exist and must only be updated
else {
for (Subscription sub : subscriptionsByListener) {
sub.subscribe(listener);
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private void subscribe(Object listener, Collection<Subscription> subscriptions) {
try {
readWriteLock.writeLock().lock();
// basically this is a deferred double check
// it's an ugly pattern but necessary because atomic upgrade from read to write lock
// is not possible and using a write lock from the beginning with will dramatically decrease performance
Collection<Subscription> subscriptionsByListener = getSubscriptionsByListener(listener);
if (subscriptionsByListener == null) {
for (Subscription subscription : subscriptions) {
subscription.subscribe(listener);
for (Class<?> messageType : subscription.getHandledMessageTypes()) {
addMessageTypeSubscription(messageType, subscription);
}
}
subscriptionsPerListener.put(listener.getClass(), subscriptions);
}
// the rare case when multiple threads concurrently subscribed the same class for the first time
// one will be first, all others will have to subscribe to the existing instead the generated subscriptions
else {
for (Subscription existingSubscription : subscriptionsByListener) {
existingSubscription.subscribe(listener);
}
}
} finally {
readWriteLock.writeLock().unlock();
}
}
private boolean isKnownNonListener(Object listener) {
Class listeningClass = listener.getClass();
return nonListeners.contains(listeningClass);
}
// obtain the set of subscriptions for the given message type
// Note: never returns null!
public Collection<Subscription> getSubscriptionsByMessageType(Class messageType) {
Set<Subscription> subscriptions = new TreeSet<Subscription>(Subscription.SubscriptionByPriorityDesc);
readWriteLock.readLock().lock();
if (subscriptionsPerMessage.get(messageType) != null) {
subscriptions.addAll(subscriptionsPerMessage.get(messageType));
}
for (Class eventSuperType : ReflectionUtils.getSuperclasses(messageType)) {
Collection<Subscription> subs = subscriptionsPerMessage.get(eventSuperType);
if (subs != null) {
for (Subscription sub : subs) {
if (sub.handlesMessageType(messageType)) {
subscriptions.add(sub);
}
}
}
}
readWriteLock.readLock().unlock();
return subscriptions;
}
// associate a suscription with a message type
// NOTE: Not thread-safe! must be synchronized in outer scope
private void addMessageTypeSubscription(Class messageType, Subscription subscription) {
Collection<Subscription> subscriptions = subscriptionsPerMessage.get(messageType);
if (subscriptions == null) {
subscriptions = new LinkedList<Subscription>();
subscriptionsPerMessage.put(messageType, subscriptions);
}
subscriptions.add(subscription);
}
}

View File

@ -6,10 +6,8 @@ import net.engio.mbassy.common.IConcurrentSet;
import net.engio.mbassy.common.UnitTest;
import org.junit.Test;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Random;
import java.util.*;
import java.util.concurrent.CopyOnWriteArraySet;
/**
* This test ensures the correct behaviour of the set implementation that is the building
@ -218,13 +216,13 @@ public abstract class ConcurrentSetTest extends UnitTest {
@Test
public void testRemovalViaIterator() {
final HashSet<Object> source = new HashSet<Object>();
final IConcurrentSet testSetWeak = createSet();
final IConcurrentSet setUnderTest = createSet();
// build set of candidates and mark subset for removal
for (int i = 0; i < numberOfElements; i++) {
Object candidate = new Object();
source.add(candidate);
testSetWeak.add(candidate);
setUnderTest.add(candidate);
}
// build test set by adding the candidates
@ -232,7 +230,7 @@ public abstract class ConcurrentSetTest extends UnitTest {
ConcurrentExecutor.runConcurrent(new Runnable() {
@Override
public void run() {
Iterator<Object> iterator = testSetWeak.iterator();
Iterator<Object> iterator = setUnderTest.iterator();
while(iterator.hasNext()){
iterator.remove();
}
@ -242,11 +240,89 @@ public abstract class ConcurrentSetTest extends UnitTest {
// ensure that the test set still contains all objects from the source set that have not been marked
// for removal
assertEquals(0, testSetWeak.size());
assertEquals(0, setUnderTest.size());
for(Object src : source){
assertFalse(testSetWeak.contains(src));
assertFalse(setUnderTest.contains(src));
}
}
/**
* In this test HashMap will cross capacity threshold multiple times in
* different directions which will trigger rehashing. Because rehashing
* requires modification of Entry class for all hash map entries some keys
* may temporarily disappear from the map.
* <p>
* For more information please take a look at transfer method in HashMap.
*
* Thanks to Ivan Koblik (http://koblik.blogspot.com) for contributing initial code and idea
*/
@Test
public void testConcurrentAddRemove() {
final IConcurrentSet set = createSet();
final List permanentObjects = createWithRandomIntegers(80, null);
final List volatileObjects = createWithRandomIntegers(10000, permanentObjects);
final CopyOnWriteArraySet missing = new CopyOnWriteArraySet();
final int mutatorThreshold = 1000;
// Add elements that will not be touched by the constantly running mutating thread
final int numItems = 8;
for (Object permanent : permanentObjects) {
set.add(permanent);
}
// Adds and removes items >= numItems
// thus forcing constant rehashing of the backing hashtable
Runnable updatingThread = new Runnable() {
public void run() {
Random rand = new Random();
for(int times = 0; times < 1000 ; times++){
System.out.println("New mutator cycle: " + times);
HashSet elements = new HashSet(mutatorThreshold);
for (int i = numItems; i < mutatorThreshold; i++) {
Object volatileObject = volatileObjects.get(Math.abs(rand.nextInt()) % volatileObjects.size());
set.add(volatileObject);
elements.add(volatileObject);
}
for (Object volObj : elements) {
set.remove(volObj);
}
}
};
};
Runnable lookupThread = new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
System.out.println("New lookup cycle: " + i);
for (Object permanent : permanentObjects) {
// permanent items are never touched,
// --> set.contains(j) should always return true
if(!set.contains(permanent))
missing.add(permanent);
}
}
}
};
ConcurrentExecutor.runConcurrent(updatingThread, lookupThread, lookupThread, lookupThread);
assertTrue("There where items temporarily unavailable: " + missing.size(), missing.size() == 0);
}
public List createWithRandomIntegers(int size, List<Integer> exluding){
if(exluding == null) exluding = new ArrayList<Integer>();
List<Integer> result = new ArrayList<Integer>(size);
Random rand = new Random();
for(int i = 0; i < size;i++){
result.add(rand.nextInt());
}
for(Integer excluded : exluding)
result.remove(excluded);
return result;
}
}

View File

@ -6,8 +6,8 @@ import net.engio.mbassy.common.DeadMessage;
import net.engio.mbassy.common.FilteredMessage;
import net.engio.mbassy.common.MessageBusTest;
import net.engio.mbassy.common.TestUtil;
import net.engio.mbassy.events.SubTestMessage;
import net.engio.mbassy.events.TestMessage;
import net.engio.mbassy.messages.SubTestMessage;
import net.engio.mbassy.messages.TestMessage;
import net.engio.mbassy.listener.*;
import net.engio.mbassy.listeners.ListenerFactory;
import org.junit.Test;

View File

@ -5,9 +5,9 @@ import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.common.ConcurrentExecutor;
import net.engio.mbassy.common.MessageBusTest;
import net.engio.mbassy.common.TestUtil;
import net.engio.mbassy.events.SubTestMessage;
import net.engio.mbassy.events.TestMessage;
import net.engio.mbassy.events.TestMessage2;
import net.engio.mbassy.messages.SubTestMessage;
import net.engio.mbassy.messages.TestMessage;
import net.engio.mbassy.messages.TestMessage2;
import net.engio.mbassy.listeners.*;
import org.junit.Test;

View File

@ -27,7 +27,7 @@ public class MetadataReaderTest extends UnitTest {
@Test
public void testListenerWithoutInheritance() {
MessageListenerMetadata<EventListener1> listener = reader.getMessageListener(EventListener1.class);
MessageListenerMetadata<MessageListener1> listener = reader.getMessageListener(new MessageListener1());
ListenerValidator validator = new ListenerValidator()
.expectHandlers(2, String.class)
.expectHandlers(2, Object.class)
@ -38,7 +38,7 @@ public class MetadataReaderTest extends UnitTest {
@Test
public void testListenerWithInheritance() {
MessageListenerMetadata<EventListener2> listener = reader.getMessageListener(EventListener2.class);
MessageListenerMetadata<MessageListener2> listener = reader.getMessageListener(new MessageListener2());
ListenerValidator validator = new ListenerValidator()
.expectHandlers(2, String.class)
.expectHandlers(2, Object.class)
@ -48,7 +48,7 @@ public class MetadataReaderTest extends UnitTest {
@Test
public void testListenerWithInheritanceOverriding() {
MessageListenerMetadata<EventListener3> listener = reader.getMessageListener(EventListener3.class);
MessageListenerMetadata<MessageListener3> listener = reader.getMessageListener(new MessageListener3());
ListenerValidator validator = new ListenerValidator()
.expectHandlers(0, String.class)
@ -59,7 +59,7 @@ public class MetadataReaderTest extends UnitTest {
@Test
public void testEnveloped() {
MessageListenerMetadata<EnvelopedListener> listener = reader.getMessageListener(EnvelopedListener.class);
MessageListenerMetadata<EnvelopedListener> listener = reader.getMessageListener(new EnvelopedListener());
ListenerValidator validator = new ListenerValidator()
.expectHandlers(1, String.class)
.expectHandlers(2, Integer.class)
@ -72,7 +72,7 @@ public class MetadataReaderTest extends UnitTest {
@Test
public void testEnvelopedSubclass() {
MessageListenerMetadata<EnvelopedListenerSubclass> listener = reader.getMessageListener(EnvelopedListenerSubclass.class);
MessageListenerMetadata<EnvelopedListenerSubclass> listener = reader.getMessageListener(new EnvelopedListenerSubclass());
ListenerValidator validator = new ListenerValidator()
.expectHandlers(1, String.class)
.expectHandlers(2, Integer.class)
@ -110,7 +110,7 @@ public class MetadataReaderTest extends UnitTest {
// a simple event listener
public class EventListener1 {
public class MessageListener1 {
@Handler(rejectSubtypes = true)
public void handleObject(Object o) {
@ -131,7 +131,7 @@ public class MetadataReaderTest extends UnitTest {
}
// the same handlers as its super class
public class EventListener2 extends EventListener1 {
public class MessageListener2 extends MessageListener1 {
// redefine handler implementation (not configuration)
public void handleString(String s) {
@ -140,7 +140,7 @@ public class MetadataReaderTest extends UnitTest {
}
public class EventListener3 extends EventListener2 {
public class MessageListener3 extends MessageListener2 {
// narrow the handler
@Handler(rejectSubtypes = true)

View File

@ -0,0 +1,60 @@
package net.engio.mbassy;
import net.engio.mbassy.common.UnitTest;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.listener.MetadataReader;
import net.engio.mbassy.messages.ITestMessage;
import net.engio.mbassy.messages.TestMessage;
import net.engio.mbassy.subscription.Subscription;
import net.engio.mbassy.subscription.SubscriptionFactory;
import net.engio.mbassy.subscription.SubscriptionManager;
import org.junit.Test;
import java.util.Collection;
/**
* Todo: Add javadoc
*
* @author bennidi
* Date: 5/12/13
*/
public class SubscriptionManagerTest extends UnitTest{
@Test
public void testSimpleSynchronousHandler(){
SubscriptionManager subMan = new SubscriptionManager(new MetadataReader(), new SubscriptionFactory());
SimpleSynchronousMessageHandler
listener1 = new SimpleSynchronousMessageHandler(),
listener2 = new SimpleSynchronousMessageHandler();
subMan.subscribe(listener1);
subMan.subscribe(listener2);
Collection<Subscription> subscriptions = subMan.getSubscriptionsByMessageType(TestMessage.class);
assertEquals(1, subscriptions.size());
for(Subscription sub : subscriptions){
assertEquals(2, sub.size());
assertTrue(sub.contains(listener1));
assertTrue(sub.contains(listener2));
}
subscriptions = subMan.getSubscriptionsByMessageType(ITestMessage.class);
assertEquals(1, subscriptions.size());
for(Subscription sub : subscriptions){
assertEquals(2, sub.size());
assertTrue(sub.contains(listener1));
assertTrue(sub.contains(listener2));
}
}
static class SimpleSynchronousMessageHandler{
@Handler
public void handle(TestMessage message) {
}
@Handler
public void handle(ITestMessage message) {
}
}
}

View File

@ -2,8 +2,8 @@ package net.engio.mbassy.bus;
import net.engio.mbassy.common.MessageBusTest;
import net.engio.mbassy.common.TestUtil;
import net.engio.mbassy.events.SubTestMessage;
import net.engio.mbassy.events.TestMessage;
import net.engio.mbassy.messages.SubTestMessage;
import net.engio.mbassy.messages.TestMessage;
import net.engio.mbassy.listeners.EventingTestBean;
import net.engio.mbassy.listeners.EventingTestBean2;
import net.engio.mbassy.listeners.EventingTestBean3;

View File

@ -1,8 +1,8 @@
package net.engio.mbassy.listeners;
import net.engio.mbassy.dispatch.HandlerInvocation;
import net.engio.mbassy.events.SubTestMessage;
import net.engio.mbassy.events.TestMessage;
import net.engio.mbassy.messages.SubTestMessage;
import net.engio.mbassy.messages.TestMessage;
import net.engio.mbassy.listener.*;
import net.engio.mbassy.subscription.SubscriptionContext;

View File

@ -1,6 +1,6 @@
package net.engio.mbassy.listeners;
import net.engio.mbassy.events.SubTestMessage;
import net.engio.mbassy.messages.SubTestMessage;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.listener.Invoke;
import net.engio.mbassy.listener.Listener;

View File

@ -1,6 +1,6 @@
package net.engio.mbassy.listeners;
import net.engio.mbassy.events.SubTestMessage;
import net.engio.mbassy.messages.SubTestMessage;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.listener.Invoke;
import net.engio.mbassy.listener.Listener;

View File

@ -1,7 +1,7 @@
package net.engio.mbassy.listeners;
import net.engio.mbassy.events.TestMessage;
import net.engio.mbassy.events.TestMessage2;
import net.engio.mbassy.messages.TestMessage;
import net.engio.mbassy.messages.TestMessage2;
import net.engio.mbassy.listener.*;
import net.engio.mbassy.listener.Invoke;
import net.engio.mbassy.subscription.MessageEnvelope;

View File

@ -1,7 +1,7 @@
package net.engio.mbassy.listeners;
import net.engio.mbassy.events.SubTestMessage;
import net.engio.mbassy.events.TestMessage;
import net.engio.mbassy.messages.SubTestMessage;
import net.engio.mbassy.messages.TestMessage;
import net.engio.mbassy.listener.Handler;
/**

View File

@ -0,0 +1,10 @@
package net.engio.mbassy.messages;
/**
* Todo: Add javadoc
*
* @author bennidi
* Date: 5/12/13
*/
public interface ITestMessage {
}

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.events;
package net.engio.mbassy.messages;
/**
*

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.events;
package net.engio.mbassy.messages;
import java.util.concurrent.atomic.AtomicInteger;

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.events;
package net.engio.mbassy.messages;
import java.util.concurrent.atomic.AtomicInteger;