Merge pull request #1 from bennidi/master

Download the latest changes from bennidi
This commit is contained in:
durron597 2013-11-08 13:46:45 -08:00
commit edf4939f59
5 changed files with 65 additions and 12 deletions

View File

@ -11,7 +11,7 @@ Read this documentation to get an overview of MBassadors features. There is also
not enough to make a developer happy (work is in progress). But usage of publish subscribe pattern at its core is pretty straight forward and the basic
use cases are very easy to understand and implement.
The current version is 1.1.7 and it is available from the Maven Central Repository. See the release notes for more details.
The current version is 1.1.8 and it is available from the Maven Central Repository. See the release notes for more details.
There is also an extension available to support CDI-like transactional message sending in a Spring environment. It's beta but
stable enough to give it a try. See <a href="https://github.com/bennidi/mbassador-spring" target="_blank">here</a>.
@ -157,6 +157,14 @@ to avoid confusion and misunderstanding.
<h2>Release Notes</h2>
<h3>1.1.8</h3>
+ Internal refactorings and code improvements
+ Fixed #44 #45 #47
+ NOTE: This release has a known issue with weak references which introduces a memory leak. A fix is on its way for 1.1.9
to be released soon
<h3>1.1.7</h3>
+ Console Logger not added to message bus instances by default -> use addErrorHandler(IPublicationErrorHandler.ConsoleLogger)
@ -224,7 +232,12 @@ First stable release!
<h2>Roadmap</h2>
+ Spring integration with support for conditional message dispatch in transactional context (dispatch only after
Check the issues marked with label enhancement. Comment if you would like to see the feature in a future release.
Please understand that I have limited time to include new features and that I will focus on stability and cleaner APIs.
Adding features only works well with well designed and thoroughly tested components especially with all this multi-threaded code
and I am still not 100 percent happy with the existing test coverage.
Planned for release:Spring integration with support for conditional message dispatch in transactional context (dispatch only after
successful commit etc.). Currently in beta, see <a href="https://github.com/bennidi/mbassador-spring">this</a> repository

View File

@ -9,7 +9,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>net.engio</groupId>
<artifactId>mbassador</artifactId>
<version>1.1.8-SNAPSHOT</version>
<version>1.1.10-SNAPSHOT</version>
<packaging>bundle</packaging>
<name>mbassador</name>
<description>

View File

@ -17,7 +17,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
public abstract class AbstractConcurrentSet<T> implements IConcurrentSet<T> {
// Internal state
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Map<T, ISetEntry<T>> entries; // maintain a map of entries for O(log n) lookup
protected Entry<T> head; // reference to the first element

View File

@ -4,6 +4,7 @@ package net.engio.mbassy.common;
import java.lang.ref.WeakReference;
import java.util.Iterator;
import java.util.WeakHashMap;
import java.util.concurrent.locks.Lock;
/**
* This implementation uses weak references to the elements. Iterators automatically perform cleanups of
@ -25,15 +26,36 @@ public class WeakConcurrentSet<T> extends AbstractConcurrentSet<T>{
public Iterator<T> iterator() {
return new Iterator<T>() {
// the current listelement of this iterator
// used to keep track of the iteration process
private ISetEntry<T> current = head;
// this method will remove all orphaned entries
// until it finds the first entry whose value has not yet been garbage collected
// the method assumes that the current element is already orphaned and will remove it
private void removeOrphans(){
Lock writelock = lock.writeLock();
try{
writelock.lock();
do {
ISetEntry orphaned = current;
current = current.next();
orphaned.remove();
} while(current != null && current.getValue() == null);
}
finally {
writelock.unlock();
}
}
public boolean hasNext() {
if (current == null) return false;
if (current.getValue() == null) { // auto-removal of orphan references
do {
remove();
} while(current != null && current.getValue() == null);
return hasNext();
if (current.getValue() == null) {
// trigger removal of orphan references
// because a null value indicates that the value has been garbage collected
removeOrphans();
return current != null; // if any entry is left then it will have a value
} else {
return true;
}
@ -45,9 +67,7 @@ public class WeakConcurrentSet<T> extends AbstractConcurrentSet<T>{
}
T value = current.getValue();
if (value == null) { // auto-removal of orphan references
do {
remove();
} while(current != null && current.getValue() == null);
removeOrphans();
return next();
} else {
current = current.next();
@ -56,6 +76,7 @@ public class WeakConcurrentSet<T> extends AbstractConcurrentSet<T>{
}
public void remove() {
//throw new UnsupportedOperationException("Explicit removal of set elements is only allowed via the controlling set. Sorry!");
if (current == null) {
return;
}

View File

@ -3,6 +3,7 @@ package net.engio.mbassy.common;
import junit.framework.Assert;
import net.engio.mbassy.IPublicationErrorHandler;
import net.engio.mbassy.PublicationError;
import net.engio.mbassy.bus.MessagePublication;
import net.engio.mbassy.bus.config.BusConfiguration;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.messages.MessageTypes;
@ -33,6 +34,8 @@ public abstract class MessageBusTest extends AssertSupport {
};
private StrongConcurrentSet<MessagePublication> issuedPublications = new StrongConcurrentSet<MessagePublication>();
@Before
public void setUp(){
for(MessageTypes mes : MessageTypes.values())
@ -52,4 +55,20 @@ public abstract class MessageBusTest extends AssertSupport {
return bus;
}
public void waitForPublications(long timeOutInMs){
long start = System.currentTimeMillis();
while(issuedPublications.size() > 0 && System.currentTimeMillis() - start < timeOutInMs){
for(MessagePublication pub : issuedPublications){
if(pub.isFinished())
issuedPublications.remove(pub);
}
}
if(issuedPublications.size() > 0)
fail("Issued publications did not finish within specified timeout of " + timeOutInMs + " ms");
}
public void addPublication(MessagePublication publication){
issuedPublications.add(publication);
}
}