diff --git a/README.md b/README.md index 8a15438..807ec86 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ Read this documentation to get an overview of MBassadors features. There is also not enough to make a developer happy (work is in progress). But usage of publish subscribe pattern at its core is pretty straight forward and the basic use cases are very easy to understand and implement. -The current version is 1.1.7 and it is available from the Maven Central Repository. See the release notes for more details. +The current version is 1.1.8 and it is available from the Maven Central Repository. See the release notes for more details. There is also an extension available to support CDI-like transactional message sending in a Spring environment. It's beta but stable enough to give it a try. See here. @@ -157,6 +157,14 @@ to avoid confusion and misunderstanding.

Release Notes

+

1.1.8

+ + + Internal refactorings and code improvements + + Fixed #44 #45 #47 + + NOTE: This release has a known issue with weak references which introduces a memory leak. A fix is on its way for 1.1.9 + to be released soon + +

1.1.7

+ Console Logger not added to message bus instances by default -> use addErrorHandler(IPublicationErrorHandler.ConsoleLogger) @@ -224,7 +232,12 @@ First stable release!

Roadmap

-+ Spring integration with support for conditional message dispatch in transactional context (dispatch only after +Check the issues marked with label enhancement. Comment if you would like to see the feature in a future release. +Please understand that I have limited time to include new features and that I will focus on stability and cleaner APIs. +Adding features only works well with well designed and thoroughly tested components especially with all this multi-threaded code +and I am still not 100 percent happy with the existing test coverage. + +Planned for release:Spring integration with support for conditional message dispatch in transactional context (dispatch only after successful commit etc.). Currently in beta, see this repository diff --git a/src/main/java/net/engio/mbassy/common/AbstractConcurrentSet.java b/src/main/java/net/engio/mbassy/common/AbstractConcurrentSet.java index d2aec57..b916a82 100644 --- a/src/main/java/net/engio/mbassy/common/AbstractConcurrentSet.java +++ b/src/main/java/net/engio/mbassy/common/AbstractConcurrentSet.java @@ -17,7 +17,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; public abstract class AbstractConcurrentSet implements IConcurrentSet { // Internal state - private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private final Map> entries; // maintain a map of entries for O(log n) lookup protected Entry head; // reference to the first element diff --git a/src/main/java/net/engio/mbassy/common/WeakConcurrentSet.java b/src/main/java/net/engio/mbassy/common/WeakConcurrentSet.java index d5caf9a..0741cf1 100644 --- a/src/main/java/net/engio/mbassy/common/WeakConcurrentSet.java +++ b/src/main/java/net/engio/mbassy/common/WeakConcurrentSet.java @@ -4,6 +4,7 @@ package net.engio.mbassy.common; import java.lang.ref.WeakReference; import java.util.Iterator; import java.util.WeakHashMap; +import java.util.concurrent.locks.Lock; /** * This implementation uses weak references to the elements. Iterators automatically perform cleanups of @@ -25,15 +26,36 @@ public class WeakConcurrentSet extends AbstractConcurrentSet{ public Iterator iterator() { return new Iterator() { + // the current listelement of this iterator + // used to keep track of the iteration process private ISetEntry current = head; + // this method will remove all orphaned entries + // until it finds the first entry whose value has not yet been garbage collected + // the method assumes that the current element is already orphaned and will remove it + private void removeOrphans(){ + Lock writelock = lock.writeLock(); + try{ + writelock.lock(); + do { + ISetEntry orphaned = current; + current = current.next(); + orphaned.remove(); + } while(current != null && current.getValue() == null); + } + finally { + writelock.unlock(); + } + } + + public boolean hasNext() { if (current == null) return false; - if (current.getValue() == null) { // auto-removal of orphan references - do { - remove(); - } while(current != null && current.getValue() == null); - return hasNext(); + if (current.getValue() == null) { + // trigger removal of orphan references + // because a null value indicates that the value has been garbage collected + removeOrphans(); + return current != null; // if any entry is left then it will have a value } else { return true; } @@ -45,9 +67,7 @@ public class WeakConcurrentSet extends AbstractConcurrentSet{ } T value = current.getValue(); if (value == null) { // auto-removal of orphan references - do { - remove(); - } while(current != null && current.getValue() == null); + removeOrphans(); return next(); } else { current = current.next(); @@ -56,6 +76,7 @@ public class WeakConcurrentSet extends AbstractConcurrentSet{ } public void remove() { + //throw new UnsupportedOperationException("Explicit removal of set elements is only allowed via the controlling set. Sorry!"); if (current == null) { return; }