From 6aadcbe036b535732ef54f49cebc6498eb3f2d62 Mon Sep 17 00:00:00 2001 From: benni Date: Mon, 12 Nov 2012 20:54:13 +0100 Subject: [PATCH] Bugfix of synchronization issue (race condition when inserting the same object concurrently) Extension of publishing api for more convenient usage Added some tests for the underlying data structures --- pom.xml | 8 +- src/main/java/org/mbassy/IMessageBus.java | 82 +++++ src/main/java/org/mbassy/MBassador.java | 327 +++++++++++------- .../java/org/mbassy/SimplePostCommand.java | 29 ++ .../mbassy/common/ConcurrentLinkedBag.java | 159 --------- .../java/org/mbassy/common/ConcurrentSet.java | 168 +++++++++ .../java/org/mbassy/ConcurrentSetTest.java | 190 ++++++++++ src/test/java/org/mbassy/MBassadorTest.java | 28 ++ src/test/java/org/mbassy/UnitTest.java | 34 ++ 9 files changed, 735 insertions(+), 290 deletions(-) create mode 100644 src/main/java/org/mbassy/IMessageBus.java create mode 100644 src/main/java/org/mbassy/SimplePostCommand.java delete mode 100644 src/main/java/org/mbassy/common/ConcurrentLinkedBag.java create mode 100644 src/main/java/org/mbassy/common/ConcurrentSet.java create mode 100644 src/test/java/org/mbassy/ConcurrentSetTest.java create mode 100644 src/test/java/org/mbassy/UnitTest.java diff --git a/pom.xml b/pom.xml index 425ca2f..3b8943f 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ 1.0.0.RC jar mbassador - Library for simple implementation of bidirectional conversions + Mbassador is a fast and flexible message bus system that follows the publish subscribe pattern UTF-8 @@ -35,6 +35,12 @@ 1.5.2 + + commons-collections + commons-collections + 3.2 + + diff --git a/src/main/java/org/mbassy/IMessageBus.java b/src/main/java/org/mbassy/IMessageBus.java new file mode 100644 index 0000000..768a259 --- /dev/null +++ b/src/main/java/org/mbassy/IMessageBus.java @@ -0,0 +1,82 @@ +package org.mbassy; + +/** + * + * A message bus offers facilities for publishing messages to registered listeners. Messages can be dispatched + * synchronously or asynchronously and may be of any type that is a valid sub type of the type parameter T. + * The dispatch mechanism can by controlled for each concrete message publication. + * A message publication is the publication of any message using one of the bus' publish(..) methods. + *

+ * Each message publication is isolated from all other running publications such that it does not interfere with them. + * Hence, the bus expects message handlers to be stateless as it may invoke them concurrently if multiple + * messages get published asynchronously. + *

+ * A listener is any object that defines at least one message handler and that has been subscribed to at least + * one message bus. A message handler can be any method that accepts exactly one parameter (the message) and is marked + * as a message handler using the @Listener annotation. + *

+ * The bus uses weak references to all listeners such that registered listeners do not need to + * be explicitly unregistered to be eligible for garbage collection. Dead (garbage collected) listeners are + * removed on-the-fly as messages get dispatched. + *

+ * Generally message handlers will be invoked in inverse sequence of insertion (subscription) but any + * class using this bus should not rely on this assumption. The basic contract of the bus is that it will deliver + * a specific message exactly once to each of the subscribed message handlers. + *

+ * Messages are dispatched to all listeners that accept the type or supertype of the dispatched message. Additionally + * a message handler may define filters to narrow the set of messages that it accepts. + *

+ * Subscribed message handlers are available to all pending message publications that have not yet started processing. + * Any messageHandler may only be subscribed once (subsequent subscriptions of an already subscribed messageHandler will be silently ignored) + *

+ * Removing a listener means removing all subscribed message handlers of that object. This remove operation + * immediately takes effect and on all running dispatch processes. A removed listener (a listener + * is considered removed after the remove(Object) call returned) will under no circumstances receive any message publications. + * + * NOTE: Generic type parameters of messages will not be taken into account, e.g. a List will + * get dispatched to all message handlers that take an instance of List as their parameter + * + * @Author bennidi + * Date: 2/8/12 + */ +public interface IMessageBus { + + /** + * Subscribe all listeners of the given message to receive message publications. + * Any message may only be subscribed once (subsequent subscriptions of an already subscribed + * message will be silently ignored) + * + * @param listener + */ + public void subscribe(Object listener); + + + /** + * Immediately unsubscribe all registered message handlers (if any) of the given listener. When this call returns + * have effectively been removed and will not receive any message publications (including asynchronously scheduled + * publications that have been published when the messageHandler was still subscribed). + * A call to this method passing null, an already subscribed message or any message that does not define any listeners + * will not have any effect. + * + * @param listener + */ + public void unsubscribe(Object listener); + + /** + * + * @param message + * @return + */ + public P post(T message); + + + + public static interface IPostCommand{ + + public void now(); + + public void asynchronously(); + + } + +} diff --git a/src/main/java/org/mbassy/MBassador.java b/src/main/java/org/mbassy/MBassador.java index 3be67bc..eb5e62e 100644 --- a/src/main/java/org/mbassy/MBassador.java +++ b/src/main/java/org/mbassy/MBassador.java @@ -9,46 +9,8 @@ import java.lang.reflect.Method; import java.util.*; import java.util.concurrent.*; -/** - * - * A message bus offers facilities for publishing messages to registered listeners. Messages can be dispatched - * synchronously or asynchronously and may be of any type that is a valid sub type of the type parameter T. - * The dispatch mechanism can by controlled for each concrete message publication. - * A message publication is the publication of any message using one of the bus' publish(..) methods. - *

- * Each message publication is isolated from all other running publications such that it does not interfere with them. - * Hence, the bus expects message handlers to be stateless as it may invoke them concurrently if multiple - * messages get published asynchronously. - *

- * A listener is any object that defines at least one message handler and that has been subscribed to at least - * one message bus. A message handler can be any method that accepts exactly one parameter (the message) and is marked - * as a message handler using the @Listener annotation. - *

- * The bus uses weak references to all listeners such that registered listeners do not need to - * be explicitly unregistered to be eligible for garbage collection. Dead (garbage collected) listeners are - * removed on-the-fly as messages get dispatched. - *

- * Generally message handlers will be invoked in inverse sequence of insertion (subscription) but any - * class using this bus should not rely on this assumption. The basic contract of the bus is that it will deliver - * a specific message exactly once to each of the subscribed message handlers. - *

- * Messages are dispatched to all listeners that accept the type or supertype of the dispatched message. Additionally - * a message handler may define filters to narrow the set of messages that it accepts. - *

- * Subscribed message handlers are available to all pending message publications that have not yet started processing. - * Any messageHandler may only be subscribed once (subsequent subscriptions of an already subscribed messageHandler will be silently ignored) - *

- * Removing a listener means removing all subscribed message handlers of that object. This remove operation - * immediately takes effect and on all running dispatch processes. A removed listener (a listener - * is considered removed after the remove(Object) call returned) will under no circumstances receive any message publications. - * - * NOTE: Generic type parameters of messages will not be taken into account, e.g. a List will - * get dispatched to all message handlers that take an instance of List as their parameter - * - * @Author bennidi - * Date: 2/8/12 - */ -public class MBassador{ + +public class MBassador implements IMessageBus{ // This predicate is used to find all message listeners (methods annotated with @Listener) @@ -70,17 +32,18 @@ public class MBassador{ }; // executor for asynchronous listeners using unbound queuing strategy to ensure that no events get lost - private ExecutorService executor = new ThreadPoolExecutor(5, 50, 1, TimeUnit.MINUTES, new LinkedBlockingQueue()); + private ExecutorService executor; // cache already created filter instances private final Map, MessageFilter> filterCache = new HashMap, MessageFilter>(); // all subscriptions per message type // this is the primary list for dispatching a specific message + // write access is synchronized and happens very infrequently private final Map> subscriptionsPerMessage = new HashMap(50); // all subscriptions per messageHandler type - // this list provides access for subscribing and unsubsribing + // this list provides fast access for subscribing and unsubscribing private final Map> subscriptionsPerListener = new HashMap(50); // remember already processed classes that do not contain any listeners @@ -107,7 +70,7 @@ public class MBassador{ try { publish(pendingMessages.take()); } catch (InterruptedException e) { - errorHandler.handleError(new PublicationError(e, "Asnchronous publication interupted", null, null, null)); + errorHandler.handleError(new PublicationError(e, "Asynchronous publication interrupted", null, null, null)); return; } } @@ -119,10 +82,15 @@ public class MBassador{ } public MBassador(){ - initDispatcherThreads(2); + this(2); } public MBassador(int dispatcherThreadCount){ + this(2, new ThreadPoolExecutor(5, 50, 1, TimeUnit.MINUTES, new LinkedBlockingQueue())); + } + + public MBassador(int dispatcherThreadCount, ExecutorService executor){ + this.executor = executor; initDispatcherThreads(dispatcherThreadCount > 0 ? dispatcherThreadCount : 2); } @@ -141,7 +109,12 @@ public class MBassador{ public void publish(T message){ try { final Collection subscriptions = getSubscriptionsByMessageType(message.getClass()); - for (Subscription subscription : subscriptions) subscription.publish(message); + if(subscriptions == null){ + return; // TODO: Dead Event? + } + for (Subscription subscription : subscriptions){ + subscription.publish(message); + } } catch (Throwable e) { handlePublicationError(new PublicationError() .setMessage("Error during publication of message") @@ -151,32 +124,22 @@ public class MBassador{ } - /** - * Immediately unsubscribe all registered message handlers (if any) of the given listener. When this call returns - * have effectively been removed and will not receive any message publications (including asynchronously scheduled - * publications that have been published when the messageHandler was still subscribed). - * A call to this method passing null, an already subscribed message or any message that does not define any listeners - * will not have any effect. - * - * @param listener - */ + public void unsubscribe(Object listener){ if (listener == null) return; Collection subscriptions = subscriptionsPerListener.get(listener.getClass()); - for (Subscription subscription : subscriptions) { + if(subscriptions == null)return; + for (Subscription subscription : subscriptions) { subscription.unsubscribe(listener); } } + @Override + public SimplePostCommand post(T message) { + return new SimplePostCommand(this, message); + } - /** - * Subscribe all listeners of the given message to receive message publications. - * Any message may only be subscribed once (subsequent subscriptions of an already subscribed - * message will be silently ignored) - * - * @param listener - */ - public void subscribe(Object listener){ + public void subscribe(Object listener){ Class listeningClass = listener.getClass(); if (nonListeners.contains(listeningClass)) return; // early reject of known classes that do not participate in eventing @@ -196,7 +159,7 @@ public class MBassador{ if (!isValidMessageHandler(messageHandler)) continue; // ignore invalid listeners MessageFilter[] filter = getFilter(messageHandler.getAnnotation(Listener.class)); Class eventType = getMessageType(messageHandler); - Subscription subscription = new Subscription(messageHandler, filter); + Subscription subscription = createSubscription(messageHandler, filter); subscription.subscribe(listener); addMessageTypeSubscription(eventType, subscription); subscriptionsByListener.add(subscription); @@ -206,7 +169,7 @@ public class MBassador{ } } } - // register the message to the existing subscriptions + // register the listener to the existing subscriptions for (Subscription sub : subscriptionsByListener) sub.subscribe(listener); } @@ -219,8 +182,7 @@ public class MBassador{ // obtain the set of subscriptions for the given message type private Collection getSubscriptionsByMessageType(Class messageType) { - // TODO improve with cache - Collection subscriptions = new LinkedList(); + List subscriptions = new LinkedList(); if(subscriptionsPerMessage.get(messageType) != null) { subscriptions.addAll(subscriptionsPerMessage.get(messageType)); @@ -230,8 +192,9 @@ public class MBassador{ subscriptions.addAll(subscriptionsPerMessage.get(eventSuperType)); } } - - return subscriptions; + // IMPROVEMENT: use tree list that sorts during insertion + //Collections.sort(subscriptions, new SubscriptionByPriorityDesc()); + return subscriptions; } private Collection getSuperclasses(Class from){ @@ -253,24 +216,11 @@ public class MBassador{ subscriptions.add(subscription); } - /* - private void updateMessageTypeHierarchy(Class messageType) { - for (Class existingEventType : subscriptionsPerMessage.keySet()) { - if (existingEventType.equals(messageType)) continue; - if (messageType.isAssignableFrom(existingEventType)) //message is super type of existing - messageTypeHierarchy.put(existingEventType, messageType); - else if (existingEventType.isAssignableFrom(messageType)) { // message is sub type of existing - messageTypeHierarchy.put(messageType, existingEventType); // add direct super type - messageTypeHierarchy.putAll(messageType, messageTypeHierarchy.get(existingEventType)); // add all super types of super type - } - } - }*/ - private boolean isValidMessageHandler(Method handler) { if (handler.getParameterTypes().length != 1) { // a messageHandler only defines one parameter (the message) - System.out.println("Found nono or more than one parameter in messageHandler [" + handler.getName() + System.out.println("Found no or more than one parameter in messageHandler [" + handler.getName() + "]. A messageHandler must define exactly one parameter"); return false; } @@ -324,48 +274,64 @@ public class MBassador{ } } + + private Subscription createSubscription(Method messageHandler, MessageFilter[] filter){ + if(filter == null || filter.length == 0){ + if(isAsynchronous(messageHandler)){ + return new UnfilteredAsynchronousSubscription(messageHandler); + } + else{ + return new UnfilteredSynchronousSubscription(messageHandler); + } + } + else{ + if(isAsynchronous(messageHandler)){ + return new FilteredAsynchronousSubscription(messageHandler, filter); + } + else{ + return new FilteredSynchronousSubscription(messageHandler, filter); + } + } + } + + private boolean isAsynchronous(Method messageHandler){ + return messageHandler.getAnnotation(Listener.class).mode().equals(Listener.Dispatch.Asynchronous); + } + + /** * Subscription is a thread safe container for objects that contain message handlers - * */ - private class Subscription { - - private final MessageFilter[] filter; + private abstract class Subscription { private final Method messageHandler; - private ConcurrentLinkedBag listeners = new ConcurrentLinkedBag(); + protected ConcurrentSet listeners = new ConcurrentSet(); - private boolean isAynchronous; + private int priority = 0; - private Subscription(Method messageHandler, MessageFilter[] filter) { + private Subscription(Method messageHandler) { + // TODO: init priority this.messageHandler = messageHandler; - this.filter = filter; this.messageHandler.setAccessible(true); - this.isAynchronous = messageHandler.getAnnotation(Listener.class).mode().equals(Listener.Dispatch.Asynchronous); } + protected abstract void publish(Object message); + + protected abstract void dispatch(final Object message, final Object listener); + + + public int getPriority(){ + return priority; + } + public void subscribe(Object o) { listeners.add(o); } - private void dispatch(final Object message, final Object listener){ - if(isAynchronous){ - MBassador.this.executor.execute(new Runnable() { - @Override - public void run() { - invokeHandler(message, listener); - } - }); - } - else{ - invokeHandler(message, listener); - } - } - - private void invokeHandler(final Object message, final Object listener){ + protected void invokeHandler(final Object message, final Object listener){ try { messageHandler.invoke(listener, message); }catch(IllegalAccessException e){ @@ -395,33 +361,134 @@ public class MBassador{ } } - public void publish(Object message) { - - Iterator iterator = listeners.iterator(); - Object listener = null; - while ((listener = iterator.next()) != null) { - if(passesFilter(message, listener)) { - dispatch(message, listener); - } - } - } - - private boolean passesFilter(Object message, Object listener) { - - if (filter == null) { - return true; - } - else { - for (int i = 0; i < filter.length; i++) { - if (!filter[i].accepts(message, listener)) return false; - } - return true; - } - } public void unsubscribe(Object existingListener) { listeners.remove(existingListener); } + + + + } + private abstract class UnfilteredSubscription extends Subscription{ + + + private UnfilteredSubscription(Method messageHandler) { + super(messageHandler); + } + + public void publish(Object message) { + + Iterator iterator = listeners.iterator(); + Object listener = null; + while ((listener = iterator.next()) != null) { + dispatch(message, listener); + } + } + } + + private class UnfilteredAsynchronousSubscription extends UnfilteredSubscription{ + + + private UnfilteredAsynchronousSubscription(Method messageHandler) { + super(messageHandler); + } + + protected void dispatch(final Object message, final Object listener){ + MBassador.this.executor.execute(new Runnable() { + @Override + public void run() { + invokeHandler(message, listener); + } + }); + + } + } + + private class UnfilteredSynchronousSubscription extends UnfilteredSubscription{ + + + private UnfilteredSynchronousSubscription(Method messageHandler) { + super(messageHandler); + } + + protected void dispatch(final Object message, final Object listener){ + invokeHandler(message, listener); + } + } + + private abstract class FilteredSubscription extends Subscription{ + + private final MessageFilter[] filter; + + + private FilteredSubscription(Method messageHandler, MessageFilter[] filter) { + super(messageHandler); + this.filter = filter; + } + + private boolean passesFilter(Object message, Object listener) { + + if (filter == null) { + return true; + } + else { + for (int i = 0; i < filter.length; i++) { + if (!filter[i].accepts(message, listener)) return false; + } + return true; + } + } + + protected void publish(Object message) { + + Iterator iterator = listeners.iterator(); + Object listener = null; + while ((listener = iterator.next()) != null) { + if(passesFilter(message, listener)) { + dispatch(message, listener); + } + } + } + } + + private class FilteredSynchronousSubscription extends FilteredSubscription{ + + + private FilteredSynchronousSubscription(Method messageHandler, MessageFilter[] filter) { + super(messageHandler, filter); + } + + protected void dispatch(final Object message, final Object listener){ + MBassador.this.executor.execute(new Runnable() { + @Override + public void run() { + invokeHandler(message, listener); + } + }); + + } + } + + private class FilteredAsynchronousSubscription extends FilteredSubscription{ + + + private FilteredAsynchronousSubscription(Method messageHandler, MessageFilter[] filter) { + super(messageHandler, filter); + } + + protected void dispatch(final Object message, final Object listener){ + invokeHandler(message, listener); + } + } + + + private final class SubscriptionByPriorityDesc implements Comparator { + @Override + public int compare(Subscription o1, Subscription o2) { + return o1.getPriority() - o2.getPriority(); + } + }; + } diff --git a/src/main/java/org/mbassy/SimplePostCommand.java b/src/main/java/org/mbassy/SimplePostCommand.java new file mode 100644 index 0000000..147ad31 --- /dev/null +++ b/src/main/java/org/mbassy/SimplePostCommand.java @@ -0,0 +1,29 @@ +package org.mbassy; + +/** +* Created with IntelliJ IDEA. +* User: benni +* Date: 11/12/12 +* Time: 8:44 PM +* To change this template use File | Settings | File Templates. +*/ +public class SimplePostCommand implements IMessageBus.IPostCommand { + + private T message; + private MBassador mBassador; + + public SimplePostCommand(MBassador mBassador, T message) { + this.mBassador = mBassador; + this.message = message; + } + + @Override + public void now() { + mBassador.publish(message); + } + + @Override + public void asynchronously() { + mBassador.publishAsync(message); + } +} diff --git a/src/main/java/org/mbassy/common/ConcurrentLinkedBag.java b/src/main/java/org/mbassy/common/ConcurrentLinkedBag.java deleted file mode 100644 index 1483ca7..0000000 --- a/src/main/java/org/mbassy/common/ConcurrentLinkedBag.java +++ /dev/null @@ -1,159 +0,0 @@ -package org.mbassy.common; - - -import java.lang.ref.WeakReference; -import java.util.Iterator; -import java.util.WeakHashMap; - -/** - * This data structure is optimized for non-blocking reads even when write operations occur. - * Running read iterators will not be affected by add operations since writes always insert at the head of the - * structure. Remove operations can affect any running iterator such that a removed element that has not yet - * been reached by the iterator will not appear in that iterator anymore. - * - * The structure uses weak references to the elements. Iterators automatically perform cleanups of - * garbace collect objects during iteration. - * No dedicated maintenance operations need to be called or run in background. - * - * - *

- * @author bennidi - * Date: 2/12/12 - */ -public class ConcurrentLinkedBag implements Iterable { - - - private WeakHashMap> entries = new WeakHashMap>(); // maintain a map of entries for O(log n) lookup - - private ListEntry head; // reference to the first element - - public ConcurrentLinkedBag add(T element) { - if (element == null || entries.containsKey(element)) return this; - synchronized (this) { - insert(element); - } - return this; - } - - private void insert(T element) { - if(head == null){ - head = new ListEntry(element); - } - else{ - head = new ListEntry(element, head); - } - entries.put(element, head); - } - - public ConcurrentLinkedBag addAll(Iterable elements) { - for (T element : elements) { - if (element == null || entries.containsKey(element)) return this; - synchronized (this) { - insert(element); - } - } - return this; - } - - public ConcurrentLinkedBag remove(T element) { - if (!entries.containsKey(element)) return this; - synchronized (this) { - ListEntry listelement = entries.get(element); - if(listelement != head){ - listelement.remove(); - } - else{ - head = head.next(); - } - entries.remove(element); - } - return this; - } - - public Iterator iterator() { - return new Iterator() { - - private ListEntry current = head; - - public boolean hasNext() { - if(current == null) return false; - T value = current.getValue(); - if(value == null){ // auto-removal of orphan references - remove(); - return hasNext(); - } - else{ - return true; - } - } - - public T next() { - if(current == null) return null; - T value = current.getValue(); - if(value == null){ // auto-removal of orphan references - remove(); - return next(); - } - else{ - current = current.next(); - return value; - } - } - - public void remove() { - if(current == null)return; - synchronized (ConcurrentLinkedBag.this){ - current.remove(); - current = current.next();} - } - }; - } - - - public class ListEntry { - - private WeakReference value; - - private ListEntry next; - - private ListEntry predecessor; - - - private ListEntry(T value) { - this.value = new WeakReference(value); - } - - private ListEntry(T value, ListEntry next) { - this(value); - this.next = next; - next.predecessor = this; - } - - public T getValue() { - return value.get(); - } - - public void remove(){ - if(predecessor != null){ - predecessor.setNext(next()); - } - else if(next() != null){ - next.predecessor = null; - } - } - - public void setNext(ListEntry element) { - this.next = element; - if(element != null)element.predecessor = this; - } - - public ListEntry next() { - return next; - } - - public boolean hasNext() { - return next() != null; - } - - } -} diff --git a/src/main/java/org/mbassy/common/ConcurrentSet.java b/src/main/java/org/mbassy/common/ConcurrentSet.java new file mode 100644 index 0000000..b16aeb6 --- /dev/null +++ b/src/main/java/org/mbassy/common/ConcurrentSet.java @@ -0,0 +1,168 @@ +package org.mbassy.common; + + +import java.lang.ref.WeakReference; +import java.util.Iterator; +import java.util.WeakHashMap; + +/** + * This data structure is optimized for non-blocking reads even when write operations occur. + * Running read iterators will not be affected by add operations since writes always insert at the head of the + * structure. Remove operations can affect any running iterator such that a removed element that has not yet + * been reached by the iterator will not appear in that iterator anymore. + *

+ * The structure uses weak references to the elements. Iterators automatically perform cleanups of + * garbace collect objects during iteration. + * No dedicated maintenance operations need to be called or run in background. + *

+ *

+ *

+ * + * @author bennidi + * Date: 2/12/12 + */ +public class ConcurrentSet implements Iterable { + + + private WeakHashMap> entries = new WeakHashMap>(); // maintain a map of entries for O(log n) lookup + + private Entry head; // reference to the first element + + public ConcurrentSet add(T element) { + if (element == null || entries.containsKey(element)) return this; + synchronized (this) { + insert(element); + } + return this; + } + + public boolean contains(T element){ + Entry entry = entries.get(element); + return entry != null && entry.getValue() != null; + } + + private void insert(T element) { + if (entries.containsKey(element)) return; + if (head == null) { + head = new Entry(element); + } else { + head = new Entry(element, head); + } + entries.put(element, head); + } + + public int size(){ + return entries.size(); + } + + public ConcurrentSet addAll(Iterable elements) { + synchronized (this) { + for (T element : elements) { + if (element == null || entries.containsKey(element)) return this; + + insert(element); + } + } + return this; + } + + public ConcurrentSet remove(T element) { + if (!entries.containsKey(element)) return this; + synchronized (this) { + Entry listelement = entries.get(element); + if(listelement == null)return this; + if (listelement != head) { + listelement.remove(); + } else { + head = head.next(); + } + entries.remove(element); + } + return this; + } + + public Iterator iterator() { + return new Iterator() { + + private Entry current = head; + + public boolean hasNext() { + if (current == null) return false; + T value = current.getValue(); + if (value == null) { // auto-removal of orphan references + remove(); + return hasNext(); + } else { + return true; + } + } + + public T next() { + if (current == null) return null; + T value = current.getValue(); + if (value == null) { // auto-removal of orphan references + remove(); + return next(); + } else { + current = current.next(); + return value; + } + } + + public void remove() { + if (current == null) return; + synchronized (ConcurrentSet.this) { + current.remove(); + current = current.next(); + } + } + }; + } + + + public class Entry { + + private WeakReference value; + + private Entry next; + + private Entry predecessor; + + + private Entry(T value) { + this.value = new WeakReference(value); + } + + private Entry(T value, Entry next) { + this(value); + this.next = next; + next.predecessor = this; + } + + public T getValue() { + return value.get(); + } + + public void remove() { + if (predecessor != null) { + predecessor.setNext(next()); + } else if (next() != null) { + next.predecessor = null; + } + } + + public void setNext(Entry element) { + this.next = element; + if (element != null) element.predecessor = this; + } + + public Entry next() { + return next; + } + + public boolean hasNext() { + return next() != null; + } + + } +} diff --git a/src/test/java/org/mbassy/ConcurrentSetTest.java b/src/test/java/org/mbassy/ConcurrentSetTest.java new file mode 100644 index 0000000..16e074d --- /dev/null +++ b/src/test/java/org/mbassy/ConcurrentSetTest.java @@ -0,0 +1,190 @@ +package org.mbassy; + +import junit.framework.Assert; +import org.junit.Test; +import org.mbassy.common.ConcurrentSet; + +import java.util.HashSet; +import java.util.LinkedList; +import java.util.Random; + +/** + * Created with IntelliJ IDEA. + * User: benni + * Date: 11/12/12 + * Time: 3:02 PM + * To change this template use File | Settings | File Templates. + */ +public class ConcurrentSetTest extends UnitTest{ + + private int numberOfElements = 100000; + + private int numberOfThreads = 50; + + + @Test + public void testIterator(){ + final HashSet distinct = new HashSet(); + + final ConcurrentSet target = new ConcurrentSet(); + Random rand = new Random(); + + for(int i=0;i < numberOfElements ; i++){ + Object candidate = new Object(); + + if(rand.nextInt() % 3 == 0){ + distinct.add(candidate); + } + target.add(candidate); + } + + runGC(); + + ConcurrentExecutor.runConcurrent(new Runnable() { + @Override + public void run() { + for(Object src : target){ + // do nothing + // just iterate to trigger automatic clean up + System.currentTimeMillis(); + } + } + }, numberOfThreads); + + + + + for(Object tar : target){ + Assert.assertTrue(distinct.contains(tar)); + } + + + + } + + + @Test + public void testInsert(){ + final LinkedList duplicates = new LinkedList(); + final HashSet distinct = new HashSet(); + + final ConcurrentSet target = new ConcurrentSet(); + Random rand = new Random(); + + Object candidate = new Object(); + for(int i=0;i < numberOfElements ; i++){ + if(rand.nextInt() % 3 == 0){ + candidate = new Object(); + } + duplicates.add(candidate); + distinct.add(candidate); + } + + + ConcurrentExecutor.runConcurrent(new Runnable() { + @Override + public void run() { + for(Object src : duplicates){ + target.add(src); + } + } + }, numberOfThreads); + + pause(3000); + + + for(Object tar : target){ + Assert.assertTrue(distinct.contains(tar)); + } + + for(Object src : distinct){ + Assert.assertTrue(target.contains(src)); + } + + Assert.assertEquals(distinct.size(), target.size()); + } + + + + @Test + public void testRemove1(){ + final HashSet source = new HashSet(); + final HashSet toRemove = new HashSet(); + + final ConcurrentSet target = new ConcurrentSet(); + for(int i=0;i < numberOfElements ; i++){ + Object candidate = new Object(); + source.add(candidate); + if(i % 3 == 0){ + toRemove.add(candidate); + } + } + + + ConcurrentExecutor.runConcurrent(new Runnable() { + @Override + public void run() { + for(Object src : source){ + target.add(src); + } + } + }, numberOfThreads); + + ConcurrentExecutor.runConcurrent(new Runnable() { + @Override + public void run() { + for(Object src : toRemove){ + target.remove(src); + } + } + }, numberOfThreads); + + pause(3000); + + for(Object tar : target){ + Assert.assertTrue(!toRemove.contains(tar)); + } + + for(Object src : source){ + if(!toRemove.contains(src))Assert.assertTrue(target.contains(src)); + } + } + + @Test + public void testRemove2(){ + final HashSet source = new HashSet(); + final HashSet toRemove = new HashSet(); + + final ConcurrentSet target = new ConcurrentSet(); + for(int i=0;i < numberOfElements ; i++){ + Object candidate = new Object(); + source.add(candidate); + if(i % 3 == 0){ + toRemove.add(candidate); + } + } + + + ConcurrentExecutor.runConcurrent(new Runnable() { + @Override + public void run() { + for(Object src : source){ + target.add(src); + if(toRemove.contains(src)) + target.remove(src); + } + } + }, numberOfThreads); + + pause(3000); + + for(Object tar : target){ + Assert.assertTrue(!toRemove.contains(tar)); + } + + for(Object src : source){ + if(!toRemove.contains(src))Assert.assertTrue(target.contains(src)); + } + } + +} diff --git a/src/test/java/org/mbassy/MBassadorTest.java b/src/test/java/org/mbassy/MBassadorTest.java index 8cd6fb6..7ef79ae 100644 --- a/src/test/java/org/mbassy/MBassadorTest.java +++ b/src/test/java/org/mbassy/MBassadorTest.java @@ -19,6 +19,34 @@ import java.util.concurrent.atomic.AtomicInteger; public class MBassadorTest { + + @Test + public void testSubscribe() throws InterruptedException { + + MBassador bus = new MBassador(); + int listenerCount = 1000; + + for (int i = 1; i <= listenerCount; i++) { + EventingTestBean bean = new EventingTestBean(); + bus.subscribe(bean); + bus.unsubscribe(new EventingTestBean()); + + } + } + + @Test + public void testUnSubscribe() throws InterruptedException { + + MBassador bus = new MBassador(); + int listenerCount = 1000; + + for (int i = 1; i <= listenerCount; i++) { + bus.unsubscribe(new EventingTestBean()); + + } + } + + @Test public void testAsynchronous() throws InterruptedException { diff --git a/src/test/java/org/mbassy/UnitTest.java b/src/test/java/org/mbassy/UnitTest.java new file mode 100644 index 0000000..2990bd3 --- /dev/null +++ b/src/test/java/org/mbassy/UnitTest.java @@ -0,0 +1,34 @@ +package org.mbassy; + +import java.lang.ref.WeakReference; + +/** + * Created with IntelliJ IDEA. + * User: benni + * Date: 11/12/12 + * Time: 3:16 PM + * To change this template use File | Settings | File Templates. + */ +public class UnitTest { + + + public void pause(long ms) { + try { + Thread.sleep(ms); + } catch (InterruptedException e) { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + + public void pause() { + pause(10); + } + + + public void runGC() { + WeakReference ref = new WeakReference(new Object()); + while(ref.get() != null) { + System.gc(); + } + } +}