diff --git a/src/main/java/net/engio/mbassy/common/AbstractConcurrentSet.java b/src/main/java/net/engio/mbassy/common/AbstractConcurrentSet.java index 1db5fd0..d2aec57 100644 --- a/src/main/java/net/engio/mbassy/common/AbstractConcurrentSet.java +++ b/src/main/java/net/engio/mbassy/common/AbstractConcurrentSet.java @@ -101,7 +101,7 @@ public abstract class AbstractConcurrentSet implements IConcurrentSet { } else { ISetEntry oldHead = head; head = head.next(); - oldHead.clear(); // optimize for GC + //oldHead.clear(); // optimize for GC not possible because of potentially running iterators } entries.remove(element); } finally { @@ -137,8 +137,10 @@ public abstract class AbstractConcurrentSet implements IConcurrentSet { } else if (next != null) { next.predecessor = null; } - next = null; - predecessor = null; + // can not nullify references to help GC since running iterators might not see the entire set + // if this element is their current element + //next = null; + //predecessor = null; } @Override diff --git a/src/test/java/net/engio/mbassy/ConcurrentSetTest.java b/src/test/java/net/engio/mbassy/ConcurrentSetTest.java index 17d4a89..6ca0cbc 100644 --- a/src/test/java/net/engio/mbassy/ConcurrentSetTest.java +++ b/src/test/java/net/engio/mbassy/ConcurrentSetTest.java @@ -8,6 +8,7 @@ import org.junit.Test; import java.util.*; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicInteger; /** * This test ensures the correct behaviour of the set implementation that is the building @@ -65,6 +66,47 @@ public abstract class ConcurrentSetTest extends AssertSupport { } } + @Test + public void testIterationWithConcurrentRemoval() { + final IConcurrentSet testSetWeak = createSet(); + final Random rand = new Random(); + + for (int i = 0; i < numberOfElements; i++) { + testSetWeak.add(new AtomicInteger()); + } + + Runnable incrementer = new Runnable() { + @Override + public void run() { + while(testSetWeak.size() > 100){ + for(AtomicInteger element : testSetWeak) + element.incrementAndGet(); + } + + } + }; + + Runnable remover = new Runnable() { + @Override + public void run() { + while(testSetWeak.size() > 100){ + for(AtomicInteger element : testSetWeak) + if(rand.nextInt() % 3 == 0) + testSetWeak.remove(element); + } + } + }; + + ConcurrentExecutor.runConcurrent(20, incrementer, incrementer, remover); + + Set counts = new HashSet(); + for (AtomicInteger count : testSetWeak) { + counts.add(count.get()); + } + assertEquals(1, counts.size()); + } + + @Test public void testRandomRemoval() { @@ -248,7 +290,7 @@ public abstract class ConcurrentSetTest extends AssertSupport { // Adds and removes items // thus forcing constant rehashing of the backing hashtable - Runnable updatingThread = new Runnable() { + Runnable rehasher = new Runnable() { public void run() { Random rand = new Random(); for(int times = 0; times < 1000 ; times++){ @@ -266,7 +308,7 @@ public abstract class ConcurrentSetTest extends AssertSupport { }; }; - Runnable lookupThread = new Runnable() { + Runnable lookup = new Runnable() { @Override public void run() { for (int i = 0; i < 10000; i++) { @@ -280,7 +322,7 @@ public abstract class ConcurrentSetTest extends AssertSupport { } }; - ConcurrentExecutor.runConcurrent(updatingThread, lookupThread, lookupThread, lookupThread); + ConcurrentExecutor.runConcurrent(rehasher, lookup, lookup, lookup); assertTrue("There where items temporarily unavailable: " + missing.size(), missing.size() == 0); } @@ -298,4 +340,5 @@ public abstract class ConcurrentSetTest extends AssertSupport { return result; } + } diff --git a/src/test/java/net/engio/mbassy/common/ConcurrentExecutor.java b/src/test/java/net/engio/mbassy/common/ConcurrentExecutor.java index b887751..09c1264 100644 --- a/src/test/java/net/engio/mbassy/common/ConcurrentExecutor.java +++ b/src/test/java/net/engio/mbassy/common/ConcurrentExecutor.java @@ -23,10 +23,21 @@ public class ConcurrentExecutor { units[i] = unit; } runConcurrent(units); - } - public static void runConcurrent(final Runnable... units) { + + public static void runConcurrent(int numberOfConcurrentExecutions, final Runnable... units) { + Runnable[] runnables = new Runnable[numberOfConcurrentExecutions * units.length]; + // create the tasks and schedule for execution + for (int i = 0; i < numberOfConcurrentExecutions; i++) { + for(int k = 0; k < units.length; k++) + runnables[k * numberOfConcurrentExecutions +i] = units[k]; + } + runConcurrent(runnables); + } + + + public static void runConcurrent(final Runnable... units) { ExecutorService executor = Executors.newCachedThreadPool(); List> returnValues = new ArrayList>();