Updated to use non-deprecated junit. added missing generic parameters

This commit is contained in:
nathan 2015-07-07 00:36:23 +02:00
parent d57dca4226
commit 4e75e28802
7 changed files with 149 additions and 103 deletions

View File

@ -1,23 +1,13 @@
package dorkbox.util.messagebus; package dorkbox.util.messagebus;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
import org.junit.Before;
import org.junit.Test;
import dorkbox.util.messagebus.common.AssertSupport; import dorkbox.util.messagebus.common.AssertSupport;
import dorkbox.util.messagebus.common.ConcurrentExecutor; import dorkbox.util.messagebus.common.ConcurrentExecutor;
import org.junit.Before;
import org.junit.Test;
import java.util.*;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
* This test ensures the correct behaviour of the set implementation that is the building * This test ensures the correct behaviour of the set implementation that is the building
@ -171,7 +161,7 @@ public abstract class ConcurrentSetTest extends AssertSupport {
// ensure that the test set does not contain any of the elements that have been removed from it // ensure that the test set does not contain any of the elements that have been removed from it
for (Object tar : testSet) { for (Object tar : testSet) {
Assert.assertTrue(!toRemove.contains(tar)); assertTrue(!toRemove.contains(tar));
} }
// ensure that the test set still contains all objects from the source set that have not been marked // ensure that the test set still contains all objects from the source set that have not been marked
// for removal // for removal
@ -214,7 +204,7 @@ public abstract class ConcurrentSetTest extends AssertSupport {
// ensure that the test set does not contain any of the elements that have been removed from it // ensure that the test set does not contain any of the elements that have been removed from it
for (Object tar : testSet) { for (Object tar : testSet) {
Assert.assertTrue(!toRemove.contains(tar)); assertTrue(!toRemove.contains(tar));
} }
// ensure that the test set still contains all objects from the source set that have not been marked // ensure that the test set still contains all objects from the source set that have not been marked
// for removal // for removal

View File

@ -7,12 +7,14 @@ import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.common.ConcurrentExecutor; import dorkbox.util.messagebus.common.ConcurrentExecutor;
import dorkbox.util.messagebus.error.IPublicationErrorHandler; import dorkbox.util.messagebus.error.IPublicationErrorHandler;
import dorkbox.util.messagebus.error.PublicationError; import dorkbox.util.messagebus.error.PublicationError;
import junit.framework.Assert;
import static org.junit.Assert.*;
/** /**
* @author dorkbox, llc Date: 2/2/15 * @author dorkbox, llc Date: 2/2/15
*/ */
public class PerformanceTest { public
class PerformanceTest {
// 15 == 32 * 1024 // 15 == 32 * 1024
public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 1000; public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 1000;
@ -22,21 +24,29 @@ public class PerformanceTest {
protected static final IPublicationErrorHandler TestFailingHandler = new IPublicationErrorHandler() { protected static final IPublicationErrorHandler TestFailingHandler = new IPublicationErrorHandler() {
@Override @Override
public void handleError(PublicationError error) { public
error.getCause().printStackTrace(); void handleError(PublicationError error) {
Assert.fail(); error.getCause()
.printStackTrace();
fail();
} }
@Override @Override
public void handleError(final String error, final Class<?> listenerClass) { public
void handleError(final String error, final Class<?> listenerClass) {
// Printout the error itself // Printout the error itself
System.out.println(new StringBuilder().append(error).append(": ").append(listenerClass.getSimpleName()).toString()); System.out.println(new StringBuilder().append(error)
.append(": ")
.append(listenerClass.getSimpleName())
.toString());
} }
}; };
public static void main(String[] args) throws Exception { public static
void main(String[] args) throws Exception {
final MessageBus bus = new MessageBus(CONCURRENCY_LEVEL); final MessageBus bus = new MessageBus(CONCURRENCY_LEVEL);
bus.getErrorHandler().addErrorHandler(TestFailingHandler); bus.getErrorHandler()
.addErrorHandler(TestFailingHandler);
Listener listener1 = new Listener(); Listener listener1 = new Listener();
@ -45,7 +55,8 @@ public class PerformanceTest {
ConcurrentExecutor.runConcurrent(new Runnable() { ConcurrentExecutor.runConcurrent(new Runnable() {
@Override @Override
public void run() { public
void run() {
Long num = Long.valueOf(7L); Long num = Long.valueOf(7L);
while (true) { while (true) {
bus.publish(num); bus.publish(num);
@ -57,13 +68,16 @@ public class PerformanceTest {
bus.shutdown(); bus.shutdown();
} }
public PerformanceTest() { public
PerformanceTest() {
} }
@SuppressWarnings("unused") @SuppressWarnings("unused")
public static class Listener { public static
class Listener {
@Handler @Handler
public void handleSync(Long o1) { public
void handleSync(Long o1) {
// System.err.println(Long.toString(o1)); // System.err.println(Long.toString(o1));
} }
} }

View File

@ -63,7 +63,8 @@ public class AssertSupport {
Assert.fail(message); Assert.fail(message);
} }
public void fail() { public static
void fail() {
Assert.fail(); Assert.fail();
} }

View File

@ -1,10 +1,10 @@
package dorkbox.util.messagebus.common; package dorkbox.util.messagebus.common;
import junit.framework.Assert;
import java.util.*; import java.util.*;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.fail;
/** /**
* The factory can be used to declaratively specify how many instances of some given classes * The factory can be used to declaratively specify how many instances of some given classes
* should be created. It will create those instances using reflection and provide a list containing those instances. * should be created. It will create those instances using reflection and provide a list containing those instances.
@ -14,87 +14,103 @@ import java.util.concurrent.atomic.AtomicInteger;
* @author bennidi * @author bennidi
* Date: 11/22/12 * Date: 11/22/12
*/ */
public class ListenerFactory { public
class ListenerFactory {
private Map<Class, Integer> requiredBeans = new HashMap<Class, Integer>(); private Map<Class, Integer> requiredBeans = new HashMap<Class, Integer>();
private volatile List generatedListeners; private volatile List generatedListeners;
private int requiredSize = 0; private int requiredSize = 0;
public int getNumberOfListeners(Class listener){ public
int getNumberOfListeners(Class listener) {
return requiredBeans.containsKey(listener) ? requiredBeans.get(listener) : 0; return requiredBeans.containsKey(listener) ? requiredBeans.get(listener) : 0;
} }
public ListenerFactory create(int numberOfInstances, Class clazz){ public
ListenerFactory create(int numberOfInstances, Class clazz) {
requiredBeans.put(clazz, numberOfInstances); requiredBeans.put(clazz, numberOfInstances);
requiredSize +=numberOfInstances; requiredSize += numberOfInstances;
return this; return this;
} }
public ListenerFactory create(int numberOfInstances, Class ...classes){ public
for(Class clazz : classes) ListenerFactory create(int numberOfInstances, Class... classes) {
create(numberOfInstances,clazz); for (Class clazz : classes) {
create(numberOfInstances, clazz);
}
return this; return this;
} }
public ListenerFactory create(int numberOfInstances, Collection<Class> classes){ public
for(Class clazz : classes) ListenerFactory create(int numberOfInstances, Collection<Class> classes) {
create(numberOfInstances,clazz); for (Class clazz : classes) {
create(numberOfInstances, clazz);
}
return this; return this;
} }
public synchronized List<Object> getAll(){ @SuppressWarnings("unchecked")
if(generatedListeners != null) public synchronized
List<Object> getAll() {
if (generatedListeners != null) {
return generatedListeners; return generatedListeners;
}
List listeners = new ArrayList(requiredSize); List listeners = new ArrayList(requiredSize);
try { try {
for(Class clazz : requiredBeans.keySet()){ for (Class clazz : requiredBeans.keySet()) {
int numberOfRequiredBeans = requiredBeans.get(clazz); int numberOfRequiredBeans = requiredBeans.get(clazz);
for(int i = 0; i < numberOfRequiredBeans; i++){ for (int i = 0; i < numberOfRequiredBeans; i++) {
listeners.add(clazz.newInstance()); listeners.add(clazz.newInstance());
} }
} }
} catch (Exception e) { } catch (Exception e) {
// if instantiation fails, counts will be incorrect // if instantiation fails, counts will be incorrect
// -> fail early here // -> fail early here
Assert.fail("There was a problem instantiating a listener " + e); fail("There was a problem instantiating a listener " + e);
} }
Collections.shuffle(listeners); Collections.shuffle(listeners);
generatedListeners = Collections.unmodifiableList(listeners); generatedListeners = Collections.unmodifiableList(listeners);
return generatedListeners; return generatedListeners;
} }
// not thread-safe but not yet used concurrently // not thread-safe but not yet used concurrently
public synchronized void clear(){ public synchronized
void clear() {
generatedListeners = null; generatedListeners = null;
requiredBeans.clear(); requiredBeans.clear();
} }
/** /**
* Create a thread-safe read-only iterator * Create a thread-safe read-only iterator
* * <p/>
* NOTE: Iterator is not perfectly synchronized with mutator methods of the list of generated listeners * NOTE: Iterator is not perfectly synchronized with mutator methods of the list of generated listeners
* In theory, it is possible that the list is changed while iterators are still running which should be avoided. * In theory, it is possible that the list is changed while iterators are still running which should be avoided.
*
* @return * @return
*/ */
public Iterator iterator(){ public
Iterator iterator() {
getAll(); getAll();
final AtomicInteger current = new AtomicInteger(0); final AtomicInteger current = new AtomicInteger(0);
return new Iterator() { return new Iterator() {
@Override @Override
public boolean hasNext() { public
boolean hasNext() {
return current.get() < generatedListeners.size(); return current.get() < generatedListeners.size();
} }
@Override @Override
public Object next() { public
int index = current.getAndIncrement(); Object next() {
int index = current.getAndIncrement();
return index < generatedListeners.size() ? generatedListeners.get(index) : null; return index < generatedListeners.size() ? generatedListeners.get(index) : null;
} }
@Override @Override
public void remove() { public
void remove() {
throw new UnsupportedOperationException("Iterator is read only"); throw new UnsupportedOperationException("Iterator is read only");
} }
}; };

View File

@ -4,7 +4,6 @@ import dorkbox.util.messagebus.MessageBus;
import dorkbox.util.messagebus.error.IPublicationErrorHandler; import dorkbox.util.messagebus.error.IPublicationErrorHandler;
import dorkbox.util.messagebus.error.PublicationError; import dorkbox.util.messagebus.error.PublicationError;
import dorkbox.util.messagebus.messages.MessageTypes; import dorkbox.util.messagebus.messages.MessageTypes;
import junit.framework.Assert;
import org.junit.Before; import org.junit.Before;
/** /**
@ -14,7 +13,8 @@ import org.junit.Before;
* @author bennidi * @author bennidi
* Date: 3/2/13 * Date: 3/2/13
*/ */
public abstract class MessageBusTest extends AssertSupport { public abstract
class MessageBusTest extends AssertSupport {
// this value probably needs to be adjusted depending on the performance of the underlying plattform // this value probably needs to be adjusted depending on the performance of the underlying plattform
// otherwise the tests will fail since asynchronous processing might not have finished when // otherwise the tests will fail since asynchronous processing might not have finished when
@ -25,35 +25,43 @@ public abstract class MessageBusTest extends AssertSupport {
protected static final IPublicationErrorHandler TestFailingHandler = new IPublicationErrorHandler() { protected static final IPublicationErrorHandler TestFailingHandler = new IPublicationErrorHandler() {
@Override @Override
public void handleError(PublicationError error) { public
error.getCause().printStackTrace(); void handleError(PublicationError error) {
Assert.fail(); error.getCause()
.printStackTrace();
fail();
} }
@Override @Override
public void handleError(final String error, final Class<?> listenerClass) { public
void handleError(final String error, final Class<?> listenerClass) {
System.err.println(error + " " + listenerClass); System.err.println(error + " " + listenerClass);
} }
}; };
@Before @Before
public void setUp() { public
void setUp() {
for (MessageTypes mes : MessageTypes.values()) { for (MessageTypes mes : MessageTypes.values()) {
mes.reset(); mes.reset();
} }
} }
public MessageBus createBus() { public
MessageBus createBus() {
MessageBus bus = new MessageBus(); MessageBus bus = new MessageBus();
bus.getErrorHandler().addErrorHandler(TestFailingHandler); bus.getErrorHandler()
.addErrorHandler(TestFailingHandler);
bus.start(); bus.start();
return bus; return bus;
} }
public MessageBus createBus(ListenerFactory listeners) { public
MessageBus createBus(ListenerFactory listeners) {
MessageBus bus = new MessageBus(); MessageBus bus = new MessageBus();
bus.getErrorHandler().addErrorHandler(TestFailingHandler); bus.getErrorHandler()
.addErrorHandler(TestFailingHandler);
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, listeners), ConcurrentUnits); ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, listeners), ConcurrentUnits);
bus.start(); bus.start();
return bus; return bus;

View File

@ -2,13 +2,15 @@ package dorkbox.util.messagebus.queuePerf;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
public class PerfTest_LinkedBlockingQueue_Block { public
class PerfTest_LinkedBlockingQueue_Block {
public static final int REPETITIONS = 50 * 1000 * 100; public static final int REPETITIONS = 50 * 1000 * 100;
public static final Integer TEST_VALUE = Integer.valueOf(777); public static final Integer TEST_VALUE = Integer.valueOf(777);
private static final int concurrency = 4; private static final int concurrency = 4;
public static void main(final String[] args) throws Exception { public static
void main(final String[] args) throws Exception {
System.out.println("reps:" + REPETITIONS + " Concurrency " + concurrency); System.out.println("reps:" + REPETITIONS + " Concurrency " + concurrency);
final int warmupRuns = 4; final int warmupRuns = 4;
@ -17,10 +19,15 @@ public class PerfTest_LinkedBlockingQueue_Block {
final LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>(concurrency); final LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>(concurrency);
long average = averageRun(warmupRuns, runs, queue, true, concurrency, REPETITIONS); long average = averageRun(warmupRuns, runs, queue, true, concurrency, REPETITIONS);
System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), average); System.out.format("summary,QueuePerfTest,%s %,d\n",
queue.getClass()
.getSimpleName(),
average);
} }
public static long averageRun(int warmUpRuns, int sumCount, LinkedBlockingQueue queue, boolean showStats, int concurrency, int repetitions) throws Exception { public static
long averageRun(int warmUpRuns, int sumCount, LinkedBlockingQueue<Integer> queue, boolean showStats, int concurrency, int repetitions)
throws Exception {
int runs = warmUpRuns + sumCount; int runs = warmUpRuns + sumCount;
final long[] results = new long[runs]; final long[] results = new long[runs];
for (int i = 0; i < runs; i++) { for (int i = 0; i < runs; i++) {
@ -33,39 +40,41 @@ public class PerfTest_LinkedBlockingQueue_Block {
sum += results[i]; sum += results[i];
} }
return sum/sumCount; return sum / sumCount;
} }
private static long performanceRun(int runNumber, LinkedBlockingQueue queue, boolean showStats, int concurrency, int repetitions) throws Exception { private static
long performanceRun(int runNumber, LinkedBlockingQueue<Integer> queue, boolean showStats, int concurrency, int repetitions)
throws Exception {
Producer[] producers = new Producer[concurrency]; Producer[] producers = new Producer[concurrency];
Consumer[] consumers = new Consumer[concurrency]; Consumer[] consumers = new Consumer[concurrency];
Thread[] threads = new Thread[concurrency*2]; Thread[] threads = new Thread[concurrency * 2];
for (int i=0;i<concurrency;i++) { for (int i = 0; i < concurrency; i++) {
producers[i] = new Producer(queue, repetitions); producers[i] = new Producer(queue, repetitions);
consumers[i] = new Consumer(queue, repetitions); consumers[i] = new Consumer(queue, repetitions);
} }
for (int j=0,i=0;i<concurrency;i++,j+=2) { for (int j = 0, i = 0; i < concurrency; i++, j += 2) {
threads[j] = new Thread(producers[i], "Producer " + i); threads[j] = new Thread(producers[i], "Producer " + i);
threads[j+1] = new Thread(consumers[i], "Consumer " + i); threads[j + 1] = new Thread(consumers[i], "Consumer " + i);
} }
for (int i=0;i<concurrency*2;i+=2) { for (int i = 0; i < concurrency * 2; i += 2) {
threads[i].start(); threads[i].start();
threads[i+1].start(); threads[i + 1].start();
} }
for (int i=0;i<concurrency*2;i+=2) { for (int i = 0; i < concurrency * 2; i += 2) {
threads[i].join(); threads[i].join();
threads[i+1].join(); threads[i + 1].join();
} }
long start = Long.MAX_VALUE; long start = Long.MAX_VALUE;
long end = -1; long end = -1;
for (int i=0;i<concurrency;i++) { for (int i = 0; i < concurrency; i++) {
if (producers[i].start < start) { if (producers[i].start < start) {
start = producers[i].start; start = producers[i].start;
} }
@ -78,7 +87,8 @@ public class PerfTest_LinkedBlockingQueue_Block {
long duration = end - start; long duration = end - start;
long ops = repetitions * 1_000_000_000L / duration; long ops = repetitions * 1_000_000_000L / duration;
String qName = queue.getClass().getSimpleName(); String qName = queue.getClass()
.getSimpleName();
if (showStats) { if (showStats) {
System.out.format("%d - ops/sec=%,d - %s\n", runNumber, ops, qName); System.out.format("%d - ops/sec=%,d - %s\n", runNumber, ops, qName);
@ -86,19 +96,22 @@ public class PerfTest_LinkedBlockingQueue_Block {
return ops; return ops;
} }
public static class Producer implements Runnable { public static
private final LinkedBlockingQueue queue; class Producer implements Runnable {
private final LinkedBlockingQueue<Integer> queue;
volatile long start; volatile long start;
private int repetitions; private int repetitions;
public Producer(LinkedBlockingQueue queue, int repetitions) { public
Producer(LinkedBlockingQueue<Integer> queue, int repetitions) {
this.queue = queue; this.queue = queue;
this.repetitions = repetitions; this.repetitions = repetitions;
} }
@Override @Override
public void run() { public
LinkedBlockingQueue producer = this.queue; void run() {
LinkedBlockingQueue<Integer> producer = this.queue;
int i = this.repetitions; int i = this.repetitions;
this.start = System.nanoTime(); this.start = System.nanoTime();
@ -112,20 +125,24 @@ public class PerfTest_LinkedBlockingQueue_Block {
} }
} }
public static class Consumer implements Runnable {
private final LinkedBlockingQueue queue; public static
class Consumer implements Runnable {
private final LinkedBlockingQueue<Integer> queue;
Object result; Object result;
volatile long end; volatile long end;
private int repetitions; private int repetitions;
public Consumer(LinkedBlockingQueue queue, int repetitions) { public
Consumer(LinkedBlockingQueue<Integer> queue, int repetitions) {
this.queue = queue; this.queue = queue;
this.repetitions = repetitions; this.repetitions = repetitions;
} }
@Override @Override
public void run() { public
LinkedBlockingQueue consumer = this.queue; void run() {
LinkedBlockingQueue<Integer> consumer = this.queue;
Object result = null; Object result = null;
int i = this.repetitions; int i = this.repetitions;

View File

@ -18,13 +18,13 @@ public class PerfTest_LinkedBlockingQueue_NonBlock {
long average = 0; long average = 0;
final LinkedBlockingQueue queue = new LinkedBlockingQueue(Integer.MAX_VALUE); final LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue(Integer.MAX_VALUE);
average = averageRun(warmupRuns, runs, queue, true, concurrency, REPETITIONS); average = averageRun(warmupRuns, runs, queue, true, concurrency, REPETITIONS);
System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), average); System.out.format("summary,QueuePerfTest,%s %,d\n", queue.getClass().getSimpleName(), average);
} }
public static long averageRun(int warmUpRuns, int sumCount, LinkedBlockingQueue queue, boolean showStats, int concurrency, int repetitions) throws Exception { public static long averageRun(int warmUpRuns, int sumCount, LinkedBlockingQueue<Integer> queue, boolean showStats, int concurrency, int repetitions) throws Exception {
int runs = warmUpRuns + sumCount; int runs = warmUpRuns + sumCount;
final long[] results = new long[runs]; final long[] results = new long[runs];
for (int i = 0; i < runs; i++) { for (int i = 0; i < runs; i++) {
@ -40,7 +40,7 @@ public class PerfTest_LinkedBlockingQueue_NonBlock {
return sum/sumCount; return sum/sumCount;
} }
private static long performanceRun(int runNumber, LinkedBlockingQueue queue, boolean showStats, int concurrency, int repetitions) throws Exception { private static long performanceRun(int runNumber, LinkedBlockingQueue<Integer> queue, boolean showStats, int concurrency, int repetitions) throws Exception {
Producer[] producers = new Producer[concurrency]; Producer[] producers = new Producer[concurrency];
Consumer[] consumers = new Consumer[concurrency]; Consumer[] consumers = new Consumer[concurrency];
@ -91,18 +91,18 @@ public class PerfTest_LinkedBlockingQueue_NonBlock {
} }
public static class Producer implements Runnable { public static class Producer implements Runnable {
private final LinkedBlockingQueue queue; private final LinkedBlockingQueue<Integer> queue;
volatile long start; volatile long start;
private int repetitions; private int repetitions;
public Producer(LinkedBlockingQueue queue, int repetitions) { public Producer(LinkedBlockingQueue<Integer> queue, int repetitions) {
this.queue = queue; this.queue = queue;
this.repetitions = repetitions; this.repetitions = repetitions;
} }
@Override @Override
public void run() { public void run() {
LinkedBlockingQueue producer = this.queue; LinkedBlockingQueue<Integer> producer = this.queue;
int i = this.repetitions; int i = this.repetitions;
this.start = System.nanoTime(); this.start = System.nanoTime();
@ -115,19 +115,19 @@ public class PerfTest_LinkedBlockingQueue_NonBlock {
} }
public static class Consumer implements Runnable { public static class Consumer implements Runnable {
private final LinkedBlockingQueue queue; private final LinkedBlockingQueue<Integer> queue;
Object result; Object result;
volatile long end; volatile long end;
private int repetitions; private int repetitions;
public Consumer(LinkedBlockingQueue queue, int repetitions) { public Consumer(LinkedBlockingQueue<Integer> queue, int repetitions) {
this.queue = queue; this.queue = queue;
this.repetitions = repetitions; this.repetitions = repetitions;
} }
@Override @Override
public void run() { public void run() {
LinkedBlockingQueue consumer = this.queue; LinkedBlockingQueue<Integer> consumer = this.queue;
Object result = null; Object result = null;
int i = this.repetitions; int i = this.repetitions;