fixed race condition in set iterators

This commit is contained in:
benjamin 2013-07-03 15:06:29 +02:00
parent 7bfc426a67
commit 46cedcee46
3 changed files with 64 additions and 8 deletions

View File

@ -101,7 +101,7 @@ public abstract class AbstractConcurrentSet<T> implements IConcurrentSet<T> {
} else { } else {
ISetEntry<T> oldHead = head; ISetEntry<T> oldHead = head;
head = head.next(); head = head.next();
oldHead.clear(); // optimize for GC //oldHead.clear(); // optimize for GC not possible because of potentially running iterators
} }
entries.remove(element); entries.remove(element);
} finally { } finally {
@ -137,8 +137,10 @@ public abstract class AbstractConcurrentSet<T> implements IConcurrentSet<T> {
} else if (next != null) { } else if (next != null) {
next.predecessor = null; next.predecessor = null;
} }
next = null; // can not nullify references to help GC since running iterators might not see the entire set
predecessor = null; // if this element is their current element
//next = null;
//predecessor = null;
} }
@Override @Override

View File

@ -8,6 +8,7 @@ import org.junit.Test;
import java.util.*; import java.util.*;
import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
* This test ensures the correct behaviour of the set implementation that is the building * This test ensures the correct behaviour of the set implementation that is the building
@ -65,6 +66,47 @@ public abstract class ConcurrentSetTest extends AssertSupport {
} }
} }
@Test
public void testIterationWithConcurrentRemoval() {
final IConcurrentSet<AtomicInteger> testSetWeak = createSet();
final Random rand = new Random();
for (int i = 0; i < numberOfElements; i++) {
testSetWeak.add(new AtomicInteger());
}
Runnable incrementer = new Runnable() {
@Override
public void run() {
while(testSetWeak.size() > 100){
for(AtomicInteger element : testSetWeak)
element.incrementAndGet();
}
}
};
Runnable remover = new Runnable() {
@Override
public void run() {
while(testSetWeak.size() > 100){
for(AtomicInteger element : testSetWeak)
if(rand.nextInt() % 3 == 0)
testSetWeak.remove(element);
}
}
};
ConcurrentExecutor.runConcurrent(20, incrementer, incrementer, remover);
Set<Integer> counts = new HashSet<Integer>();
for (AtomicInteger count : testSetWeak) {
counts.add(count.get());
}
assertEquals(1, counts.size());
}
@Test @Test
public void testRandomRemoval() { public void testRandomRemoval() {
@ -248,7 +290,7 @@ public abstract class ConcurrentSetTest extends AssertSupport {
// Adds and removes items // Adds and removes items
// thus forcing constant rehashing of the backing hashtable // thus forcing constant rehashing of the backing hashtable
Runnable updatingThread = new Runnable() { Runnable rehasher = new Runnable() {
public void run() { public void run() {
Random rand = new Random(); Random rand = new Random();
for(int times = 0; times < 1000 ; times++){ for(int times = 0; times < 1000 ; times++){
@ -266,7 +308,7 @@ public abstract class ConcurrentSetTest extends AssertSupport {
}; };
}; };
Runnable lookupThread = new Runnable() { Runnable lookup = new Runnable() {
@Override @Override
public void run() { public void run() {
for (int i = 0; i < 10000; i++) { for (int i = 0; i < 10000; i++) {
@ -280,7 +322,7 @@ public abstract class ConcurrentSetTest extends AssertSupport {
} }
}; };
ConcurrentExecutor.runConcurrent(updatingThread, lookupThread, lookupThread, lookupThread); ConcurrentExecutor.runConcurrent(rehasher, lookup, lookup, lookup);
assertTrue("There where items temporarily unavailable: " + missing.size(), missing.size() == 0); assertTrue("There where items temporarily unavailable: " + missing.size(), missing.size() == 0);
} }
@ -298,4 +340,5 @@ public abstract class ConcurrentSetTest extends AssertSupport {
return result; return result;
} }
} }

View File

@ -23,10 +23,21 @@ public class ConcurrentExecutor {
units[i] = unit; units[i] = unit;
} }
runConcurrent(units); runConcurrent(units);
} }
public static void runConcurrent(final Runnable... units) {
public static void runConcurrent(int numberOfConcurrentExecutions, final Runnable... units) {
Runnable[] runnables = new Runnable[numberOfConcurrentExecutions * units.length];
// create the tasks and schedule for execution
for (int i = 0; i < numberOfConcurrentExecutions; i++) {
for(int k = 0; k < units.length; k++)
runnables[k * numberOfConcurrentExecutions +i] = units[k];
}
runConcurrent(runnables);
}
public static void runConcurrent(final Runnable... units) {
ExecutorService executor = Executors.newCachedThreadPool(); ExecutorService executor = Executors.newCachedThreadPool();
List<Future<Long>> returnValues = new ArrayList<Future<Long>>(); List<Future<Long>> returnValues = new ArrayList<Future<Long>>();