introduced configuration object, small performance tweaks, more tests

This commit is contained in:
benni 2012-12-08 09:10:30 +01:00
parent 31e6ca18bb
commit 21385b605e
10 changed files with 199 additions and 84 deletions

View File

@ -4,7 +4,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.mbassy</groupId>
<artifactId>mbassador</artifactId>
<version>1.0.2.RC</version>
<version>1.0.3.RC</version>
<packaging>jar</packaging>
<name>mbassador</name>
<description>Mbassador is a fast and flexible message bus system that follows the publish subscribe pattern

View File

@ -2,6 +2,7 @@ package org.mbassy;
import org.mbassy.common.IPredicate;
import org.mbassy.common.ReflectionUtils;
import org.mbassy.dispatch.MessagingContext;
import org.mbassy.listener.Listener;
import org.mbassy.listener.MetadataReader;
import org.mbassy.subscription.Subscription;
@ -21,10 +22,10 @@ import java.util.concurrent.*;
public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand> implements IMessageBus<T, P> {
// executor for asynchronous listeners using unbound queuing strategy to ensure that no events get lost
private ExecutorService executor;
private final ExecutorService executor;
// the metadata reader that is used to parse objects passed to the subscribe method
private MetadataReader metadataReader = new MetadataReader();
private final MetadataReader metadataReader;
// all subscriptions per message type
// this is the primary list for dispatching a specific message
@ -40,13 +41,13 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
private final Collection<Class> nonListeners = new HashSet();
// this handler will receive all errors that occur during message dispatch or message handling
private CopyOnWriteArrayList<IPublicationErrorHandler> errorHandlers = new CopyOnWriteArrayList<IPublicationErrorHandler>();
private final List<IPublicationErrorHandler> errorHandlers = new CopyOnWriteArrayList<IPublicationErrorHandler>();
// all threads that are available for asynchronous message dispatching
private final CopyOnWriteArrayList<Thread> dispatchers = new CopyOnWriteArrayList<Thread>();
private final List<Thread> dispatchers = new CopyOnWriteArrayList<Thread>();
// all pending messages scheduled for asynchronous dispatch are queued here
private final LinkedBlockingQueue<SubscriptionDeliveryRequest<T>> pendingMessages = new LinkedBlockingQueue<SubscriptionDeliveryRequest<T>>();
private final LinkedBlockingQueue<SubscriptionDeliveryRequest<T>> pendingMessages;
// this factory is used to create specialized subscriptions based on the given message handler configuration
// it can be customized by implementing the getSubscriptionFactory() method
@ -54,24 +55,15 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
public AbstractMessageBus() {
this(2);
}
public AbstractMessageBus(int dispatcherThreadCount) {
this(dispatcherThreadCount, new ThreadPoolExecutor(5, 50, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>()));
}
public AbstractMessageBus(int dispatcherThreadCount, ExecutorService executor) {
this.executor = executor;
initDispatcherThreads(dispatcherThreadCount > 0 ? dispatcherThreadCount : 2);
public AbstractMessageBus(BusConfiguration configuration) {
this.executor = configuration.getExecutor();
subscriptionFactory = configuration.getSubscriptionFactory();
this.metadataReader = configuration.getMetadataReader();
pendingMessages = new LinkedBlockingQueue<SubscriptionDeliveryRequest<T>>(configuration.getMaximumNumberOfPendingMessages());
initDispatcherThreads(configuration.getNumberOfMessageDispatchers());
addErrorHandler(new IPublicationErrorHandler.ConsoleLogger());
subscriptionFactory = getSubscriptionFactory();
}
// use this method to introduce a custom subscription factory for extension
protected abstract SubscriptionFactory getSubscriptionFactory();
// initialize the dispatch workers
private void initDispatcherThreads(int numberOfThreads) {
@ -119,20 +111,21 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
return; // early reject of known classes that do not participate in eventing
Collection<Subscription> subscriptionsByListener = subscriptionsPerListener.get(listeningClass);
if (subscriptionsByListener == null) { // if the type is registered for the first time
synchronized (this) { // new subscriptions must be processed sequentially for each class
synchronized (this) { // new subscriptions must be processed sequentially
subscriptionsByListener = subscriptionsPerListener.get(listeningClass);
if (subscriptionsByListener == null) { // double check (a bit ugly but works here)
List<Method> messageHandlers = metadataReader.getListeners(listeningClass); // get all methods with subscriptions
subscriptionsByListener = new ArrayList<Subscription>(messageHandlers.size()); // it's safe to use non-concurrent collection here (read only)
if (messageHandlers.isEmpty()) { // remember the class as non listening class
nonListeners.add(listeningClass);
return;
}
subscriptionsByListener = new ArrayList<Subscription>(messageHandlers.size()); // it's safe to use non-concurrent collection here (read only)
// create subscriptions for all detected listeners
for (Method messageHandler : messageHandlers) {
if (!isValidMessageHandler(messageHandler)) continue; // ignore invalid listeners
Class eventType = getMessageType(messageHandler);
Subscription subscription = subscriptionFactory.createSubscription(metadataReader.getHandlerMetadata(messageHandler));
Subscription subscription = subscriptionFactory
.createSubscription(new MessagingContext(this, metadataReader.getHandlerMetadata(messageHandler)));
subscription.subscribe(listener);
addMessageTypeSubscription(eventType, subscription);
subscriptionsByListener.add(subscription);

View File

@ -0,0 +1,85 @@
package org.mbassy;
import org.mbassy.listener.MetadataReader;
import org.mbassy.subscription.SubscriptionFactory;
import java.util.concurrent.*;
/**
*
*
*
* @author bennidi
* Date: 12/8/12
*/
public class BusConfiguration {
public static final BusConfiguration Default(){
return new BusConfiguration();
}
private int numberOfMessageDispatchers;
private ExecutorService executor;
private int maximumNumberOfPendingMessages;
private SubscriptionFactory subscriptionFactory;
private MetadataReader metadataReader;
public BusConfiguration() {
this.numberOfMessageDispatchers = 2;
this.maximumNumberOfPendingMessages = Integer.MAX_VALUE;
this.subscriptionFactory = new SubscriptionFactory();
this.executor = new ThreadPoolExecutor(5, 20, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>());
this.metadataReader = new MetadataReader();
}
public MetadataReader getMetadataReader() {
return metadataReader;
}
public BusConfiguration setMetadataReader(MetadataReader metadataReader) {
this.metadataReader = metadataReader;
return this;
}
public int getNumberOfMessageDispatchers() {
return numberOfMessageDispatchers > 0 ? numberOfMessageDispatchers : 2;
}
public BusConfiguration setNumberOfMessageDispatchers(int numberOfMessageDispatchers) {
this.numberOfMessageDispatchers = numberOfMessageDispatchers;
return this;
}
public ExecutorService getExecutor() {
return executor;
}
public BusConfiguration setExecutor(ExecutorService executor) {
this.executor = executor;
return this;
}
public int getMaximumNumberOfPendingMessages() {
return maximumNumberOfPendingMessages;
}
public BusConfiguration setMaximumNumberOfPendingMessages(int maximumNumberOfPendingMessages) {
this.maximumNumberOfPendingMessages = maximumNumberOfPendingMessages > 0
? maximumNumberOfPendingMessages
: Integer.MAX_VALUE;
return this;
}
public SubscriptionFactory getSubscriptionFactory() {
return subscriptionFactory;
}
public BusConfiguration setSubscriptionFactory(SubscriptionFactory subscriptionFactory) {
this.subscriptionFactory = subscriptionFactory;
return this;
}
}

View File

@ -8,22 +8,10 @@ import java.util.concurrent.*;
public class MBassador<T> extends AbstractMessageBus<T, SyncAsyncPostCommand<T>>{
public MBassador(){
this(2);
public MBassador(BusConfiguration configuration){
super(configuration);
}
public MBassador(int dispatcherThreadCount){
super(dispatcherThreadCount);
}
public MBassador(int dispatcherThreadCount, ExecutorService executor){
super(dispatcherThreadCount,executor);
}
@Override
protected SubscriptionFactory getSubscriptionFactory() {
return new SubscriptionFactory(this);
}
public void publishAsync(T message){
addAsynchronousDeliveryRequest(new SubscriptionDeliveryRequest<T>(getSubscriptionsByMessageType(message.getClass()), message));

View File

@ -3,6 +3,7 @@ package org.mbassy.common;
import java.lang.ref.WeakReference;
import java.util.Iterator;
import java.util.Set;
import java.util.WeakHashMap;
/**
@ -12,8 +13,7 @@ import java.util.WeakHashMap;
* been reached by the iterator will not appear in that iterator anymore.
* <p/>
* The structure uses weak references to the elements. Iterators automatically perform cleanups of
* garbace collect objects during iteration.
* No dedicated maintenance operations need to be called or run in background.
* garbage collected objects during iteration -> no dedicated maintenance operations need to be called or run in background.
* <p/>
* <p/>
* <p/>
@ -21,8 +21,7 @@ import java.util.WeakHashMap;
* @author bennidi
* Date: 2/12/12
*/
public class ConcurrentSet<T> implements Iterable<T> {
public class ConcurrentSet<T> implements Iterable<T>{
private WeakHashMap<T, Entry<T>> entries = new WeakHashMap<T, Entry<T>>(); // maintain a map of entries for O(log n) lookup
@ -70,11 +69,13 @@ public class ConcurrentSet<T> implements Iterable<T> {
if (!entries.containsKey(element)) return false;
synchronized (this) {
Entry<T> listelement = entries.get(element);
if(listelement == null)return false;
if(listelement == null)return false; //removed by other thread
if (listelement != head) {
listelement.remove();
} else {
Entry<T> oldHead = head;
head = head.next();
oldHead.next = null; // optimize for GC
}
entries.remove(element);
}
@ -111,10 +112,9 @@ public class ConcurrentSet<T> implements Iterable<T> {
public void remove() {
if (current == null) return;
synchronized (ConcurrentSet.this) {
current.remove();
current = current.next();
}
Entry<T> newCurrent = current.next();
ConcurrentSet.this.remove(current.getValue());
current = newCurrent;
}
};
}
@ -143,26 +143,22 @@ public class ConcurrentSet<T> implements Iterable<T> {
return value.get();
}
// not thread-safe! must be synchronized in enclosing context
public void remove() {
if (predecessor != null) {
predecessor.setNext(next());
} else if (next() != null) {
predecessor.next = next;
if(next != null)next.predecessor = predecessor;
} else if (next != null) {
next.predecessor = null;
}
}
public void setNext(Entry<T> element) {
this.next = element;
if (element != null) element.predecessor = this;
next = null;
predecessor = null;
}
public Entry<T> next() {
return next;
}
public boolean hasNext() {
return next() != null;
}
}
}

View File

@ -18,15 +18,6 @@ public class SubscriptionDeliveryRequest<T> {
this.message = message;
}
public SubscriptionDeliveryRequest(T message){
this.message = message;
subscriptions = new LinkedList<Subscription>();
}
public boolean addAll(Collection<? extends Subscription> c) {
return subscriptions.addAll(c);
}
public boolean add(Subscription subscription) {
return subscriptions.add(subscription);
}

View File

@ -16,14 +16,7 @@ import java.util.Collection;
*/
public class SubscriptionFactory {
private IMessageBus owner;
public SubscriptionFactory(IMessageBus owner) {
this.owner = owner;
}
public Subscription createSubscription(MessageHandlerMetadata messageHandlerMetadata){
MessagingContext context = new MessagingContext(owner, messageHandlerMetadata);
public Subscription createSubscription(MessagingContext context){
IHandlerInvocation invocation = buildInvocationForHandler(context);
IMessageDispatcher dispatcher = buildDispatcher(context, invocation);
return new Subscription(context, dispatcher);

View File

@ -5,13 +5,14 @@ import org.junit.Test;
import org.mbassy.common.ConcurrentSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Random;
/**
* This test ensures the correct behaviour of the set implementation that is the building
* block of the subscription implementations used by the Mbassador message bus.
*
* <p/>
* It should behave exactly like other set implementations do and as such all tests are based
* on comparing the outcome of sequence of operations applied to a standard set implementation
* and the concurrent set.
@ -102,7 +103,7 @@ public class ConcurrentSetTest extends UnitTest {
}
@Test
public void testPerformance(){
public void testPerformance() {
final HashSet<Object> source = new HashSet<Object>();
final HashSet<Object> hashSet = new HashSet<Object>();
@ -115,14 +116,14 @@ public class ConcurrentSetTest extends UnitTest {
long start = System.currentTimeMillis();
for(Object o: source){
for (Object o : source) {
hashSet.add(o);
}
long duration = System.currentTimeMillis() - start;
System.out.println("Performance of HashSet for 1.000.000 object insertions " + duration);
start = System.currentTimeMillis();
for(Object o: source){
for (Object o : source) {
concurrentSet.add(o);
}
duration = System.currentTimeMillis() - start;
@ -217,4 +218,70 @@ public class ConcurrentSetTest extends UnitTest {
}
}
@Test
public void testCompleteRemoval() {
final HashSet<Object> source = new HashSet<Object>();
final ConcurrentSet<Object> testSet = new ConcurrentSet<Object>();
// build set of candidates and mark subset for removal
for (int i = 0; i < numberOfElements; i++) {
Object candidate = new Object();
source.add(candidate);
testSet.add(candidate);
}
// build test set by adding the candidates
// and subsequently removing those marked for removal
ConcurrentExecutor.runConcurrent(new Runnable() {
@Override
public void run() {
for (Object src : source) {
testSet.remove(src);
}
}
}, numberOfThreads);
// ensure that the test set still contains all objects from the source set that have not been marked
// for removal
assertEquals(0, testSet.size());
for(Object src : source){
assertFalse(testSet.contains(src));
}
}
@Test
public void testRemovalViaIterator() {
final HashSet<Object> source = new HashSet<Object>();
final ConcurrentSet<Object> testSet = new ConcurrentSet<Object>();
// build set of candidates and mark subset for removal
for (int i = 0; i < numberOfElements; i++) {
Object candidate = new Object();
source.add(candidate);
testSet.add(candidate);
}
// build test set by adding the candidates
// and subsequently removing those marked for removal
ConcurrentExecutor.runConcurrent(new Runnable() {
@Override
public void run() {
Iterator<Object> iterator = testSet.iterator();
while(iterator.hasNext()){
iterator.remove();
}
}
}, numberOfThreads);
// ensure that the test set still contains all objects from the source set that have not been marked
// for removal
assertEquals(0, testSet.size());
for(Object src : source){
assertFalse(testSet.contains(src));
}
}
}

View File

@ -21,7 +21,7 @@ public class FilterTest extends UnitTest{
@Test
public void testSubclassFilter() throws Exception {
MBassador bus = new MBassador();
MBassador bus = new MBassador(new BusConfiguration());
ListenerFactory listenerFactory = new ListenerFactory()
.create(100, FilteredMessageListener.class)
.create(100, Object.class)

View File

@ -1,5 +1,6 @@
package org.mbassy;
import org.junit.Before;
import org.junit.Test;
import org.mbassy.events.SubTestEvent;
import org.mbassy.events.TestEvent;
@ -20,10 +21,11 @@ import java.util.concurrent.CopyOnWriteArrayList;
*/
public class MBassadorTest extends UnitTest {
// this is a single threaded test for subscribing and unsubscribing of a single listener
@Test
public void testSubscribeSimple() throws InterruptedException {
MBassador bus = new MBassador();
MBassador bus = new MBassador(new BusConfiguration());
List<Object> listeners = new LinkedList<Object>();
int listenerCount = 1000;
@ -77,7 +79,7 @@ public class MBassadorTest extends UnitTest {
@Test
public void testConcurrentSubscription() throws Exception {
MBassador bus = new MBassador();
MBassador bus = new MBassador(new BusConfiguration());
ListenerFactory listenerFactory = new ListenerFactory()
.create(100, EventingTestBean.class)
.create(100, EventingTestBean2.class)
@ -105,7 +107,7 @@ public class MBassadorTest extends UnitTest {
@Test
public void testAsynchronousMessagePublication() throws Exception {
MBassador bus = new MBassador();
MBassador bus = new MBassador(new BusConfiguration());
ListenerFactory listenerFactory = new ListenerFactory()
.create(100, EventingTestBean.class)
.create(100, EventingTestBean2.class)
@ -134,7 +136,7 @@ public class MBassadorTest extends UnitTest {
@Test
public void testSynchronousMessagePublication() throws Exception {
MBassador bus = new MBassador();
MBassador bus = new MBassador(new BusConfiguration());
ListenerFactory listenerFactory = new ListenerFactory()
.create(100, EventingTestBean.class)
.create(100, EventingTestBean2.class)
@ -167,7 +169,7 @@ public class MBassadorTest extends UnitTest {
final int eventLoopsPerTHread = 100;
final MBassador bus = new MBassador();
final MBassador bus = new MBassador(new BusConfiguration());
ListenerFactory listenerFactory = new ListenerFactory()
.create(100, EventingTestBean.class)
.create(100, EventingTestBean2.class)