Bugfix of synchronization issue (race condition when inserting the same object concurrently)

Extension of publishing api for more convenient usage
Added some tests for the underlying data structures
This commit is contained in:
benni 2012-11-12 20:54:13 +01:00
parent 3f7b80bdeb
commit 6aadcbe036
9 changed files with 735 additions and 290 deletions

View File

@ -7,7 +7,7 @@
<version>1.0.0.RC</version>
<packaging>jar</packaging>
<name>mbassador</name>
<description>Library for simple implementation of bidirectional conversions</description>
<description>Mbassador is a fast and flexible message bus system that follows the publish subscribe pattern</description>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@ -35,6 +35,12 @@
<version>1.5.2</version>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>3.2</version>
</dependency>
</dependencies>
<build>

View File

@ -0,0 +1,82 @@
package org.mbassy;
/**
*
* A message bus offers facilities for publishing messages to registered listeners. Messages can be dispatched
* synchronously or asynchronously and may be of any type that is a valid sub type of the type parameter T.
* The dispatch mechanism can by controlled for each concrete message publication.
* A message publication is the publication of any message using one of the bus' publish(..) methods.
* <p/>
* Each message publication is isolated from all other running publications such that it does not interfere with them.
* Hence, the bus expects message handlers to be stateless as it may invoke them concurrently if multiple
* messages get published asynchronously.
* <p/>
* A listener is any object that defines at least one message handler and that has been subscribed to at least
* one message bus. A message handler can be any method that accepts exactly one parameter (the message) and is marked
* as a message handler using the @Listener annotation.
* <p/>
* The bus uses weak references to all listeners such that registered listeners do not need to
* be explicitly unregistered to be eligible for garbage collection. Dead (garbage collected) listeners are
* removed on-the-fly as messages get dispatched.
* <p/>
* Generally message handlers will be invoked in inverse sequence of insertion (subscription) but any
* class using this bus should not rely on this assumption. The basic contract of the bus is that it will deliver
* a specific message exactly once to each of the subscribed message handlers.
* <p/>
* Messages are dispatched to all listeners that accept the type or supertype of the dispatched message. Additionally
* a message handler may define filters to narrow the set of messages that it accepts.
* <p/>
* Subscribed message handlers are available to all pending message publications that have not yet started processing.
* Any messageHandler may only be subscribed once (subsequent subscriptions of an already subscribed messageHandler will be silently ignored)
* <p/>
* Removing a listener means removing all subscribed message handlers of that object. This remove operation
* immediately takes effect and on all running dispatch processes. A removed listener (a listener
* is considered removed after the remove(Object) call returned) will under no circumstances receive any message publications.
*
* NOTE: Generic type parameters of messages will not be taken into account, e.g. a List<Long> will
* get dispatched to all message handlers that take an instance of List as their parameter
*
* @Author bennidi
* Date: 2/8/12
*/
public interface IMessageBus<T, P extends IMessageBus.IPostCommand> {
/**
* Subscribe all listeners of the given message to receive message publications.
* Any message may only be subscribed once (subsequent subscriptions of an already subscribed
* message will be silently ignored)
*
* @param listener
*/
public void subscribe(Object listener);
/**
* Immediately unsubscribe all registered message handlers (if any) of the given listener. When this call returns
* have effectively been removed and will not receive any message publications (including asynchronously scheduled
* publications that have been published when the messageHandler was still subscribed).
* A call to this method passing null, an already subscribed message or any message that does not define any listeners
* will not have any effect.
*
* @param listener
*/
public void unsubscribe(Object listener);
/**
*
* @param message
* @return
*/
public P post(T message);
public static interface IPostCommand{
public void now();
public void asynchronously();
}
}

View File

@ -9,46 +9,8 @@ import java.lang.reflect.Method;
import java.util.*;
import java.util.concurrent.*;
/**
*
* A message bus offers facilities for publishing messages to registered listeners. Messages can be dispatched
* synchronously or asynchronously and may be of any type that is a valid sub type of the type parameter T.
* The dispatch mechanism can by controlled for each concrete message publication.
* A message publication is the publication of any message using one of the bus' publish(..) methods.
* <p/>
* Each message publication is isolated from all other running publications such that it does not interfere with them.
* Hence, the bus expects message handlers to be stateless as it may invoke them concurrently if multiple
* messages get published asynchronously.
* <p/>
* A listener is any object that defines at least one message handler and that has been subscribed to at least
* one message bus. A message handler can be any method that accepts exactly one parameter (the message) and is marked
* as a message handler using the @Listener annotation.
* <p/>
* The bus uses weak references to all listeners such that registered listeners do not need to
* be explicitly unregistered to be eligible for garbage collection. Dead (garbage collected) listeners are
* removed on-the-fly as messages get dispatched.
* <p/>
* Generally message handlers will be invoked in inverse sequence of insertion (subscription) but any
* class using this bus should not rely on this assumption. The basic contract of the bus is that it will deliver
* a specific message exactly once to each of the subscribed message handlers.
* <p/>
* Messages are dispatched to all listeners that accept the type or supertype of the dispatched message. Additionally
* a message handler may define filters to narrow the set of messages that it accepts.
* <p/>
* Subscribed message handlers are available to all pending message publications that have not yet started processing.
* Any messageHandler may only be subscribed once (subsequent subscriptions of an already subscribed messageHandler will be silently ignored)
* <p/>
* Removing a listener means removing all subscribed message handlers of that object. This remove operation
* immediately takes effect and on all running dispatch processes. A removed listener (a listener
* is considered removed after the remove(Object) call returned) will under no circumstances receive any message publications.
*
* NOTE: Generic type parameters of messages will not be taken into account, e.g. a List<Long> will
* get dispatched to all message handlers that take an instance of List as their parameter
*
* @Author bennidi
* Date: 2/8/12
*/
public class MBassador<T>{
public class MBassador<T> implements IMessageBus<T, SimplePostCommand>{
// This predicate is used to find all message listeners (methods annotated with @Listener)
@ -70,17 +32,18 @@ public class MBassador<T>{
};
// executor for asynchronous listeners using unbound queuing strategy to ensure that no events get lost
private ExecutorService executor = new ThreadPoolExecutor(5, 50, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>());
private ExecutorService executor;
// cache already created filter instances
private final Map<Class<? extends MessageFilter>, MessageFilter> filterCache = new HashMap<Class<? extends MessageFilter>, MessageFilter>();
// all subscriptions per message type
// this is the primary list for dispatching a specific message
// write access is synchronized and happens very infrequently
private final Map<Class, Collection<Subscription>> subscriptionsPerMessage = new HashMap(50);
// all subscriptions per messageHandler type
// this list provides access for subscribing and unsubsribing
// this list provides fast access for subscribing and unsubscribing
private final Map<Class, Collection<Subscription>> subscriptionsPerListener = new HashMap(50);
// remember already processed classes that do not contain any listeners
@ -107,7 +70,7 @@ public class MBassador<T>{
try {
publish(pendingMessages.take());
} catch (InterruptedException e) {
errorHandler.handleError(new PublicationError(e, "Asnchronous publication interupted", null, null, null));
errorHandler.handleError(new PublicationError(e, "Asynchronous publication interrupted", null, null, null));
return;
}
}
@ -119,10 +82,15 @@ public class MBassador<T>{
}
public MBassador(){
initDispatcherThreads(2);
this(2);
}
public MBassador(int dispatcherThreadCount){
this(2, new ThreadPoolExecutor(5, 50, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>()));
}
public MBassador(int dispatcherThreadCount, ExecutorService executor){
this.executor = executor;
initDispatcherThreads(dispatcherThreadCount > 0 ? dispatcherThreadCount : 2);
}
@ -141,7 +109,12 @@ public class MBassador<T>{
public void publish(T message){
try {
final Collection<Subscription> subscriptions = getSubscriptionsByMessageType(message.getClass());
for (Subscription subscription : subscriptions) subscription.publish(message);
if(subscriptions == null){
return; // TODO: Dead Event?
}
for (Subscription subscription : subscriptions){
subscription.publish(message);
}
} catch (Throwable e) {
handlePublicationError(new PublicationError()
.setMessage("Error during publication of message")
@ -151,32 +124,22 @@ public class MBassador<T>{
}
/**
* Immediately unsubscribe all registered message handlers (if any) of the given listener. When this call returns
* have effectively been removed and will not receive any message publications (including asynchronously scheduled
* publications that have been published when the messageHandler was still subscribed).
* A call to this method passing null, an already subscribed message or any message that does not define any listeners
* will not have any effect.
*
* @param listener
*/
public void unsubscribe(Object listener){
if (listener == null) return;
Collection<Subscription> subscriptions = subscriptionsPerListener.get(listener.getClass());
for (Subscription subscription : subscriptions) {
if(subscriptions == null)return;
for (Subscription subscription : subscriptions) {
subscription.unsubscribe(listener);
}
}
@Override
public SimplePostCommand post(T message) {
return new SimplePostCommand(this, message);
}
/**
* Subscribe all listeners of the given message to receive message publications.
* Any message may only be subscribed once (subsequent subscriptions of an already subscribed
* message will be silently ignored)
*
* @param listener
*/
public void subscribe(Object listener){
public void subscribe(Object listener){
Class listeningClass = listener.getClass();
if (nonListeners.contains(listeningClass))
return; // early reject of known classes that do not participate in eventing
@ -196,7 +159,7 @@ public class MBassador<T>{
if (!isValidMessageHandler(messageHandler)) continue; // ignore invalid listeners
MessageFilter[] filter = getFilter(messageHandler.getAnnotation(Listener.class));
Class eventType = getMessageType(messageHandler);
Subscription subscription = new Subscription(messageHandler, filter);
Subscription subscription = createSubscription(messageHandler, filter);
subscription.subscribe(listener);
addMessageTypeSubscription(eventType, subscription);
subscriptionsByListener.add(subscription);
@ -206,7 +169,7 @@ public class MBassador<T>{
}
}
}
// register the message to the existing subscriptions
// register the listener to the existing subscriptions
for (Subscription sub : subscriptionsByListener) sub.subscribe(listener);
}
@ -219,8 +182,7 @@ public class MBassador<T>{
// obtain the set of subscriptions for the given message type
private Collection<Subscription> getSubscriptionsByMessageType(Class messageType) {
// TODO improve with cache
Collection<Subscription> subscriptions = new LinkedList<Subscription>();
List<Subscription> subscriptions = new LinkedList<Subscription>();
if(subscriptionsPerMessage.get(messageType) != null) {
subscriptions.addAll(subscriptionsPerMessage.get(messageType));
@ -230,8 +192,9 @@ public class MBassador<T>{
subscriptions.addAll(subscriptionsPerMessage.get(eventSuperType));
}
}
return subscriptions;
// IMPROVEMENT: use tree list that sorts during insertion
//Collections.sort(subscriptions, new SubscriptionByPriorityDesc());
return subscriptions;
}
private Collection<Class> getSuperclasses(Class from){
@ -253,24 +216,11 @@ public class MBassador<T>{
subscriptions.add(subscription);
}
/*
private void updateMessageTypeHierarchy(Class messageType) {
for (Class existingEventType : subscriptionsPerMessage.keySet()) {
if (existingEventType.equals(messageType)) continue;
if (messageType.isAssignableFrom(existingEventType)) //message is super type of existing
messageTypeHierarchy.put(existingEventType, messageType);
else if (existingEventType.isAssignableFrom(messageType)) { // message is sub type of existing
messageTypeHierarchy.put(messageType, existingEventType); // add direct super type
messageTypeHierarchy.putAll(messageType, messageTypeHierarchy.get(existingEventType)); // add all super types of super type
}
}
}*/
private boolean isValidMessageHandler(Method handler) {
if (handler.getParameterTypes().length != 1) {
// a messageHandler only defines one parameter (the message)
System.out.println("Found nono or more than one parameter in messageHandler [" + handler.getName()
System.out.println("Found no or more than one parameter in messageHandler [" + handler.getName()
+ "]. A messageHandler must define exactly one parameter");
return false;
}
@ -324,48 +274,64 @@ public class MBassador<T>{
}
}
private Subscription createSubscription(Method messageHandler, MessageFilter[] filter){
if(filter == null || filter.length == 0){
if(isAsynchronous(messageHandler)){
return new UnfilteredAsynchronousSubscription(messageHandler);
}
else{
return new UnfilteredSynchronousSubscription(messageHandler);
}
}
else{
if(isAsynchronous(messageHandler)){
return new FilteredAsynchronousSubscription(messageHandler, filter);
}
else{
return new FilteredSynchronousSubscription(messageHandler, filter);
}
}
}
private boolean isAsynchronous(Method messageHandler){
return messageHandler.getAnnotation(Listener.class).mode().equals(Listener.Dispatch.Asynchronous);
}
/**
* Subscription is a thread safe container for objects that contain message handlers
*
*/
private class Subscription {
private final MessageFilter[] filter;
private abstract class Subscription {
private final Method messageHandler;
private ConcurrentLinkedBag<Object> listeners = new ConcurrentLinkedBag<Object>();
protected ConcurrentSet<Object> listeners = new ConcurrentSet<Object>();
private boolean isAynchronous;
private int priority = 0;
private Subscription(Method messageHandler, MessageFilter[] filter) {
private Subscription(Method messageHandler) {
// TODO: init priority
this.messageHandler = messageHandler;
this.filter = filter;
this.messageHandler.setAccessible(true);
this.isAynchronous = messageHandler.getAnnotation(Listener.class).mode().equals(Listener.Dispatch.Asynchronous);
}
protected abstract void publish(Object message);
protected abstract void dispatch(final Object message, final Object listener);
public int getPriority(){
return priority;
}
public void subscribe(Object o) {
listeners.add(o);
}
private void dispatch(final Object message, final Object listener){
if(isAynchronous){
MBassador.this.executor.execute(new Runnable() {
@Override
public void run() {
invokeHandler(message, listener);
}
});
}
else{
invokeHandler(message, listener);
}
}
private void invokeHandler(final Object message, final Object listener){
protected void invokeHandler(final Object message, final Object listener){
try {
messageHandler.invoke(listener, message);
}catch(IllegalAccessException e){
@ -395,33 +361,134 @@ public class MBassador<T>{
}
}
public void publish(Object message) {
Iterator<Object> iterator = listeners.iterator();
Object listener = null;
while ((listener = iterator.next()) != null) {
if(passesFilter(message, listener)) {
dispatch(message, listener);
}
}
}
private boolean passesFilter(Object message, Object listener) {
if (filter == null) {
return true;
}
else {
for (int i = 0; i < filter.length; i++) {
if (!filter[i].accepts(message, listener)) return false;
}
return true;
}
}
public void unsubscribe(Object existingListener) {
listeners.remove(existingListener);
}
}
private abstract class UnfilteredSubscription extends Subscription{
private UnfilteredSubscription(Method messageHandler) {
super(messageHandler);
}
public void publish(Object message) {
Iterator<Object> iterator = listeners.iterator();
Object listener = null;
while ((listener = iterator.next()) != null) {
dispatch(message, listener);
}
}
}
private class UnfilteredAsynchronousSubscription extends UnfilteredSubscription{
private UnfilteredAsynchronousSubscription(Method messageHandler) {
super(messageHandler);
}
protected void dispatch(final Object message, final Object listener){
MBassador.this.executor.execute(new Runnable() {
@Override
public void run() {
invokeHandler(message, listener);
}
});
}
}
private class UnfilteredSynchronousSubscription extends UnfilteredSubscription{
private UnfilteredSynchronousSubscription(Method messageHandler) {
super(messageHandler);
}
protected void dispatch(final Object message, final Object listener){
invokeHandler(message, listener);
}
}
private abstract class FilteredSubscription extends Subscription{
private final MessageFilter[] filter;
private FilteredSubscription(Method messageHandler, MessageFilter[] filter) {
super(messageHandler);
this.filter = filter;
}
private boolean passesFilter(Object message, Object listener) {
if (filter == null) {
return true;
}
else {
for (int i = 0; i < filter.length; i++) {
if (!filter[i].accepts(message, listener)) return false;
}
return true;
}
}
protected void publish(Object message) {
Iterator<Object> iterator = listeners.iterator();
Object listener = null;
while ((listener = iterator.next()) != null) {
if(passesFilter(message, listener)) {
dispatch(message, listener);
}
}
}
}
private class FilteredSynchronousSubscription extends FilteredSubscription{
private FilteredSynchronousSubscription(Method messageHandler, MessageFilter[] filter) {
super(messageHandler, filter);
}
protected void dispatch(final Object message, final Object listener){
MBassador.this.executor.execute(new Runnable() {
@Override
public void run() {
invokeHandler(message, listener);
}
});
}
}
private class FilteredAsynchronousSubscription extends FilteredSubscription{
private FilteredAsynchronousSubscription(Method messageHandler, MessageFilter[] filter) {
super(messageHandler, filter);
}
protected void dispatch(final Object message, final Object listener){
invokeHandler(message, listener);
}
}
private final class SubscriptionByPriorityDesc implements Comparator<Subscription> {
@Override
public int compare(Subscription o1, Subscription o2) {
return o1.getPriority() - o2.getPriority();
}
};
}

View File

@ -0,0 +1,29 @@
package org.mbassy;
/**
* Created with IntelliJ IDEA.
* User: benni
* Date: 11/12/12
* Time: 8:44 PM
* To change this template use File | Settings | File Templates.
*/
public class SimplePostCommand<T> implements IMessageBus.IPostCommand {
private T message;
private MBassador mBassador;
public SimplePostCommand(MBassador mBassador, T message) {
this.mBassador = mBassador;
this.message = message;
}
@Override
public void now() {
mBassador.publish(message);
}
@Override
public void asynchronously() {
mBassador.publishAsync(message);
}
}

View File

@ -1,159 +0,0 @@
package org.mbassy.common;
import java.lang.ref.WeakReference;
import java.util.Iterator;
import java.util.WeakHashMap;
/**
* This data structure is optimized for non-blocking reads even when write operations occur.
* Running read iterators will not be affected by add operations since writes always insert at the head of the
* structure. Remove operations can affect any running iterator such that a removed element that has not yet
* been reached by the iterator will not appear in that iterator anymore.
*
* The structure uses weak references to the elements. Iterators automatically perform cleanups of
* garbace collect objects during iteration.
* No dedicated maintenance operations need to be called or run in background.
*
*
* <p/>
* @author bennidi
* Date: 2/12/12
*/
public class ConcurrentLinkedBag<T> implements Iterable<T> {
private WeakHashMap<T, ListEntry<T>> entries = new WeakHashMap<T, ListEntry<T>>(); // maintain a map of entries for O(log n) lookup
private ListEntry<T> head; // reference to the first element
public ConcurrentLinkedBag<T> add(T element) {
if (element == null || entries.containsKey(element)) return this;
synchronized (this) {
insert(element);
}
return this;
}
private void insert(T element) {
if(head == null){
head = new ListEntry<T>(element);
}
else{
head = new ListEntry<T>(element, head);
}
entries.put(element, head);
}
public ConcurrentLinkedBag<T> addAll(Iterable<T> elements) {
for (T element : elements) {
if (element == null || entries.containsKey(element)) return this;
synchronized (this) {
insert(element);
}
}
return this;
}
public ConcurrentLinkedBag<T> remove(T element) {
if (!entries.containsKey(element)) return this;
synchronized (this) {
ListEntry<T> listelement = entries.get(element);
if(listelement != head){
listelement.remove();
}
else{
head = head.next();
}
entries.remove(element);
}
return this;
}
public Iterator<T> iterator() {
return new Iterator<T>() {
private ListEntry<T> current = head;
public boolean hasNext() {
if(current == null) return false;
T value = current.getValue();
if(value == null){ // auto-removal of orphan references
remove();
return hasNext();
}
else{
return true;
}
}
public T next() {
if(current == null) return null;
T value = current.getValue();
if(value == null){ // auto-removal of orphan references
remove();
return next();
}
else{
current = current.next();
return value;
}
}
public void remove() {
if(current == null)return;
synchronized (ConcurrentLinkedBag.this){
current.remove();
current = current.next();}
}
};
}
public class ListEntry<T> {
private WeakReference<T> value;
private ListEntry<T> next;
private ListEntry<T> predecessor;
private ListEntry(T value) {
this.value = new WeakReference<T>(value);
}
private ListEntry(T value, ListEntry<T> next) {
this(value);
this.next = next;
next.predecessor = this;
}
public T getValue() {
return value.get();
}
public void remove(){
if(predecessor != null){
predecessor.setNext(next());
}
else if(next() != null){
next.predecessor = null;
}
}
public void setNext(ListEntry<T> element) {
this.next = element;
if(element != null)element.predecessor = this;
}
public ListEntry<T> next() {
return next;
}
public boolean hasNext() {
return next() != null;
}
}
}

View File

@ -0,0 +1,168 @@
package org.mbassy.common;
import java.lang.ref.WeakReference;
import java.util.Iterator;
import java.util.WeakHashMap;
/**
* This data structure is optimized for non-blocking reads even when write operations occur.
* Running read iterators will not be affected by add operations since writes always insert at the head of the
* structure. Remove operations can affect any running iterator such that a removed element that has not yet
* been reached by the iterator will not appear in that iterator anymore.
* <p/>
* The structure uses weak references to the elements. Iterators automatically perform cleanups of
* garbace collect objects during iteration.
* No dedicated maintenance operations need to be called or run in background.
* <p/>
* <p/>
* <p/>
*
* @author bennidi
* Date: 2/12/12
*/
public class ConcurrentSet<T> implements Iterable<T> {
private WeakHashMap<T, Entry<T>> entries = new WeakHashMap<T, Entry<T>>(); // maintain a map of entries for O(log n) lookup
private Entry<T> head; // reference to the first element
public ConcurrentSet<T> add(T element) {
if (element == null || entries.containsKey(element)) return this;
synchronized (this) {
insert(element);
}
return this;
}
public boolean contains(T element){
Entry<T> entry = entries.get(element);
return entry != null && entry.getValue() != null;
}
private void insert(T element) {
if (entries.containsKey(element)) return;
if (head == null) {
head = new Entry<T>(element);
} else {
head = new Entry<T>(element, head);
}
entries.put(element, head);
}
public int size(){
return entries.size();
}
public ConcurrentSet<T> addAll(Iterable<T> elements) {
synchronized (this) {
for (T element : elements) {
if (element == null || entries.containsKey(element)) return this;
insert(element);
}
}
return this;
}
public ConcurrentSet<T> remove(T element) {
if (!entries.containsKey(element)) return this;
synchronized (this) {
Entry<T> listelement = entries.get(element);
if(listelement == null)return this;
if (listelement != head) {
listelement.remove();
} else {
head = head.next();
}
entries.remove(element);
}
return this;
}
public Iterator<T> iterator() {
return new Iterator<T>() {
private Entry<T> current = head;
public boolean hasNext() {
if (current == null) return false;
T value = current.getValue();
if (value == null) { // auto-removal of orphan references
remove();
return hasNext();
} else {
return true;
}
}
public T next() {
if (current == null) return null;
T value = current.getValue();
if (value == null) { // auto-removal of orphan references
remove();
return next();
} else {
current = current.next();
return value;
}
}
public void remove() {
if (current == null) return;
synchronized (ConcurrentSet.this) {
current.remove();
current = current.next();
}
}
};
}
public class Entry<T> {
private WeakReference<T> value;
private Entry<T> next;
private Entry<T> predecessor;
private Entry(T value) {
this.value = new WeakReference<T>(value);
}
private Entry(T value, Entry<T> next) {
this(value);
this.next = next;
next.predecessor = this;
}
public T getValue() {
return value.get();
}
public void remove() {
if (predecessor != null) {
predecessor.setNext(next());
} else if (next() != null) {
next.predecessor = null;
}
}
public void setNext(Entry<T> element) {
this.next = element;
if (element != null) element.predecessor = this;
}
public Entry<T> next() {
return next;
}
public boolean hasNext() {
return next() != null;
}
}
}

View File

@ -0,0 +1,190 @@
package org.mbassy;
import junit.framework.Assert;
import org.junit.Test;
import org.mbassy.common.ConcurrentSet;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Random;
/**
* Created with IntelliJ IDEA.
* User: benni
* Date: 11/12/12
* Time: 3:02 PM
* To change this template use File | Settings | File Templates.
*/
public class ConcurrentSetTest extends UnitTest{
private int numberOfElements = 100000;
private int numberOfThreads = 50;
@Test
public void testIterator(){
final HashSet<Object> distinct = new HashSet<Object>();
final ConcurrentSet<Object> target = new ConcurrentSet<Object>();
Random rand = new Random();
for(int i=0;i < numberOfElements ; i++){
Object candidate = new Object();
if(rand.nextInt() % 3 == 0){
distinct.add(candidate);
}
target.add(candidate);
}
runGC();
ConcurrentExecutor.runConcurrent(new Runnable() {
@Override
public void run() {
for(Object src : target){
// do nothing
// just iterate to trigger automatic clean up
System.currentTimeMillis();
}
}
}, numberOfThreads);
for(Object tar : target){
Assert.assertTrue(distinct.contains(tar));
}
}
@Test
public void testInsert(){
final LinkedList<Object> duplicates = new LinkedList<Object>();
final HashSet<Object> distinct = new HashSet<Object>();
final ConcurrentSet<Object> target = new ConcurrentSet<Object>();
Random rand = new Random();
Object candidate = new Object();
for(int i=0;i < numberOfElements ; i++){
if(rand.nextInt() % 3 == 0){
candidate = new Object();
}
duplicates.add(candidate);
distinct.add(candidate);
}
ConcurrentExecutor.runConcurrent(new Runnable() {
@Override
public void run() {
for(Object src : duplicates){
target.add(src);
}
}
}, numberOfThreads);
pause(3000);
for(Object tar : target){
Assert.assertTrue(distinct.contains(tar));
}
for(Object src : distinct){
Assert.assertTrue(target.contains(src));
}
Assert.assertEquals(distinct.size(), target.size());
}
@Test
public void testRemove1(){
final HashSet<Object> source = new HashSet<Object>();
final HashSet<Object> toRemove = new HashSet<Object>();
final ConcurrentSet<Object> target = new ConcurrentSet<Object>();
for(int i=0;i < numberOfElements ; i++){
Object candidate = new Object();
source.add(candidate);
if(i % 3 == 0){
toRemove.add(candidate);
}
}
ConcurrentExecutor.runConcurrent(new Runnable() {
@Override
public void run() {
for(Object src : source){
target.add(src);
}
}
}, numberOfThreads);
ConcurrentExecutor.runConcurrent(new Runnable() {
@Override
public void run() {
for(Object src : toRemove){
target.remove(src);
}
}
}, numberOfThreads);
pause(3000);
for(Object tar : target){
Assert.assertTrue(!toRemove.contains(tar));
}
for(Object src : source){
if(!toRemove.contains(src))Assert.assertTrue(target.contains(src));
}
}
@Test
public void testRemove2(){
final HashSet<Object> source = new HashSet<Object>();
final HashSet<Object> toRemove = new HashSet<Object>();
final ConcurrentSet<Object> target = new ConcurrentSet<Object>();
for(int i=0;i < numberOfElements ; i++){
Object candidate = new Object();
source.add(candidate);
if(i % 3 == 0){
toRemove.add(candidate);
}
}
ConcurrentExecutor.runConcurrent(new Runnable() {
@Override
public void run() {
for(Object src : source){
target.add(src);
if(toRemove.contains(src))
target.remove(src);
}
}
}, numberOfThreads);
pause(3000);
for(Object tar : target){
Assert.assertTrue(!toRemove.contains(tar));
}
for(Object src : source){
if(!toRemove.contains(src))Assert.assertTrue(target.contains(src));
}
}
}

View File

@ -19,6 +19,34 @@ import java.util.concurrent.atomic.AtomicInteger;
public class MBassadorTest {
@Test
public void testSubscribe() throws InterruptedException {
MBassador bus = new MBassador();
int listenerCount = 1000;
for (int i = 1; i <= listenerCount; i++) {
EventingTestBean bean = new EventingTestBean();
bus.subscribe(bean);
bus.unsubscribe(new EventingTestBean());
}
}
@Test
public void testUnSubscribe() throws InterruptedException {
MBassador bus = new MBassador();
int listenerCount = 1000;
for (int i = 1; i <= listenerCount; i++) {
bus.unsubscribe(new EventingTestBean());
}
}
@Test
public void testAsynchronous() throws InterruptedException {

View File

@ -0,0 +1,34 @@
package org.mbassy;
import java.lang.ref.WeakReference;
/**
* Created with IntelliJ IDEA.
* User: benni
* Date: 11/12/12
* Time: 3:16 PM
* To change this template use File | Settings | File Templates.
*/
public class UnitTest {
public void pause(long ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
}
public void pause() {
pause(10);
}
public void runGC() {
WeakReference ref = new WeakReference<Object>(new Object());
while(ref.get() != null) {
System.gc();
}
}
}