separated laboratory from data pipeline

master
bennidi 2014-03-20 11:53:55 +01:00
parent b4cb56a01d
commit 3a0af9e2ee
24 changed files with 2129 additions and 0 deletions

99
pom.xml Normal file
View File

@ -0,0 +1,99 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>net.engio</groupId>
    <artifactId>lab</artifactId>
    <version>1.0.0.RC1</version>
    <packaging>jar</packaging>
    <name>Java macro benchmarks</name>
    <description>
    </description>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- compiler source/target level; referenced by the compiler plugin below -->
        <project.build.java.version>1.6</project.build.java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.10</version>
            <scope>test</scope>
        </dependency>
        <!-- chart rendering used by the report generators -->
        <dependency>
            <groupId>org.jfree</groupId>
            <artifactId>jfreechart</artifactId>
            <version>1.0.14</version>
        </dependency>
        <!-- data collection pipeline (net.engio.pips.data.*) that the lab builds on -->
        <dependency>
            <groupId>net.engio</groupId>
            <artifactId>pips</artifactId>
            <version>1.0.0.RC1</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- compile for the java version configured in the properties section -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>${project.build.java.version}</source>
                    <target>${project.build.java.version}</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <configuration>
                    <skipTests>false</skipTests>
                </configuration>
            </plugin>
            <!-- bind the source attaching to package phase -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-source-plugin</artifactId>
                <executions>
                    <execution>
                        <id>attach-sources</id>
                        <goals>
                            <goal>jar</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- attach javadoc jar during packaging as well -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-javadoc-plugin</artifactId>
                <executions>
                    <execution>
                        <id>attach-javadocs</id>
                        <goals>
                            <goal>jar</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

View File

@ -0,0 +1,135 @@
package net.engio.pips.lab;
import net.engio.pips.data.DataCollector;
import net.engio.pips.data.filter.Sampler;
import net.engio.pips.data.utils.ExecutionTimer;
import java.util.*;
/**
 * Each execution unit is provided with an execution context. The context allows
 * access to shared objects such as the result collector. Contexts form a tree:
 * key lookups fall back to the parent context when a key is not bound locally.
 *
 * @author bennidi
 *         Date: 2/11/14
 */
public class ExecutionContext {

    private Experiment experiment;

    // parent context; null for the experiment's global (root) context
    private ExecutionContext parent;

    private Map<String, Object> properties = new HashMap<String, Object>();

    private long started;

    // -1 marks "not finished yet"; with the former default of 0 isFinished()
    // reported true before finished() was ever called
    private long finished = -1;

    public ExecutionContext(Experiment experiment) {
        this.experiment = experiment;
    }

    /** Record the wall-clock time at which execution started. */
    public void started() {
        started = System.currentTimeMillis();
    }

    /** Record the wall-clock time at which execution finished. */
    public void finished() {
        finished = System.currentTimeMillis();
    }

    public boolean isFinished() {
        return finished != -1;
    }

    /** @return the elapsed time in milliseconds, or -1 while still running */
    public long getExecutionTime() {
        return isFinished() ? finished - started : -1;
    }

    /**
     * Create an execution timer whose samples are published to a collector
     * registered under the given id. The sample interval is read from the
     * experiment-wide SampleInterval property.
     */
    public ExecutionTimer createExecutionTimer(String timerId) {
        DataCollector<Long> timings = createLocalCollector(timerId);
        Sampler sampler = Sampler.<Long>timeBased((Integer) get(Experiment.Properties.SampleInterval));
        sampler.pipeInto(timings);
        return new ExecutionTimer(sampler);
    }

    /**
     * Create a data collector bound to this context.
     * NOTE(review): collectors are registered under the ExecutionTimers prefix
     * even when they are not timers - confirm this is intended.
     */
    public <V> DataCollector<V> createLocalCollector(String collectorId) {
        DataCollector<V> collector = new DataCollector(collectorId);
        bind(Experiment.Properties.ExecutionTimers + collectorId, collector);
        return collector;
    }

    /** Bind a value to the given key in this context (shadows any parent binding). */
    public ExecutionContext bind(String key, Object value) {
        properties.put(key, value);
        return this;
    }

    /** Bind a value under its own string representation. */
    public ExecutionContext bind(Object value) {
        return bind(value.toString(), value);
    }

    public ExecutionContext bindAll(Object... values) {
        for (Object value : values)
            bind(value);
        return this;
    }

    public ExecutionContext bindAll(Map<String, Object> values) {
        for (Map.Entry<String, Object> entry : values.entrySet())
            bind(entry.getKey(), entry.getValue());
        return this;
    }

    /** @return a new child context that delegates failed lookups to this one */
    public ExecutionContext getChild() {
        ExecutionContext child = new ExecutionContext(experiment);
        child.parent = this;
        return child;
    }

    /**
     * Look up a value, falling back to the parent context when the key is not
     * bound locally.
     *
     * @return the bound value or null if the key is unknown in the whole chain
     */
    public <T> T get(String key) {
        return properties.containsKey(key)
                ? (T) properties.get(key)
                : parent != null ? (T) parent.get(key) : null;
    }

    public <T> T get(Object key) {
        return get(key.toString());
    }

    public <T> Collection<T> getAll(Object key) {
        return getAll(key.toString());
    }

    /**
     * Collect the values bound to the key in this context and in every ancestor
     * (a context chain may bind the same key multiple times).
     */
    public <T> Collection<T> getAll(String key) {
        LinkedList<T> all = new LinkedList<T>();
        if (properties.containsKey(key))
            all.add((T) properties.get(key));
        if (parent != null)
            all.addAll(parent.<T>getAll(key));
        return all;
    }

    /** Collect all values whose key starts with the given prefix, chain-wide. */
    public <T> Collection<T> getMatching(String key) {
        LinkedList<T> all = new LinkedList<T>();
        for (String globalKey : getGlobalKeySet())
            if (globalKey.startsWith(key)) all.addAll(this.<T>getAll(globalKey));
        return all;
    }

    /** @return the union of the key sets of this context and all its ancestors */
    private Set<String> getGlobalKeySet() {
        Set<String> keys = new HashSet<String>();
        keys.addAll(properties.keySet());
        ExecutionContext current = parent;
        while (current != null) {
            // was keys.addAll(parent.properties.keySet()): that re-added the direct
            // parent's keys on every iteration and never visited the grandparents
            keys.addAll(current.properties.keySet());
            current = current.parent;
        }
        return keys;
    }

    public Map<String, Object> getProperties() {
        return Collections.unmodifiableMap(properties);
    }

    /** @return true if the key is bound in this context or any ancestor */
    public boolean containsKey(String key) {
        // was: properties.containsKey(key) || parent != null ? parent.containsKey(key) : false
        // which, due to ?: binding looser than ||, dereferenced a null parent
        // whenever the key was bound locally in the root context
        return properties.containsKey(key)
                || (parent != null && parent.containsKey(key));
    }
}

View File

@ -0,0 +1,46 @@
package net.engio.pips.lab;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
/**
 * Aggregates the execution contexts of all tasks that ran as part of an
 * experiment and offers bulk lookups across all of them.
 *
 * @author bennidi
 *         Date: 3/13/14
 */
public class Executions {

    private final List<ExecutionContext> contexts = new LinkedList<ExecutionContext>();

    public boolean addAll(Collection<? extends ExecutionContext> executionContexts) {
        return contexts.addAll(executionContexts);
    }

    public boolean add(ExecutionContext executionContext) {
        return contexts.add(executionContext);
    }

    /** Collect every value bound to the given key in any registered context. */
    public <T> Collection<T> getAll(Object key) {
        return getAll(key.toString());
    }

    /** Collect every value bound to the given key in any registered context. */
    public <T> Collection<T> getAll(String key) {
        final LinkedList<T> hits = new LinkedList<T>();
        for (ExecutionContext context : contexts)
            hits.addAll(context.<T>getAll(key));
        return hits;
    }

    /** Collect every value whose key starts with the given prefix, in any context. */
    public <T> Collection<T> getMatching(String key) {
        final LinkedList<T> hits = new LinkedList<T>();
        for (ExecutionContext context : contexts)
            hits.addAll(context.<T>getMatching(key));
        return hits;
    }
}

View File

@ -0,0 +1,202 @@
package net.engio.pips.lab;
import net.engio.pips.data.IDataCollector;
import net.engio.pips.lab.workload.Workload;
import net.engio.pips.reports.IReporter;
import java.io.File;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
/**
 * A benchmark is the container for all information of a formerly executed performance
 * measurement. It provides access to all collected data and means to create reports and
 * export them to persistent storage.
 *
 * @author bennidi
 *         Date: 2/11/14
 */
public class Experiment {

    /** Well-known property keys used by the framework itself. */
    public static final class Properties {

        public static final String TimeoutInSeconds = "Timeout in seconds";
        public static final String SampleInterval = "Sample interval";
        public static final String BasePath = "Base path";
        public static final String LogStream = "Log stream";
        public static final String Title = "Title";
        public static final String ReportBaseDir = "Report base dir";
        // prefix under which all data collectors are registered in the contexts
        public static final String Collectors = "collectors:";
        public static final String ExecutionTimers = Collectors + "execution-timer:";
    }

    // global (root) context shared by all executions of this experiment
    private ExecutionContext context = new ExecutionContext(this);

    private List<IReporter> reporters = new LinkedList<IReporter>();

    private List<Workload> workloads = new LinkedList<Workload>();

    // set by the Laboratory after measurement has finished
    private Executions executions;

    private String title;

    public Experiment(String title) {
        if (title == null || title.isEmpty())
            throw new IllegalArgumentException("Please provide a title that is a valid identifier for a directory");
        this.title = title;
    }

    public void setExecutions(Executions executions) {
        this.executions = executions;
    }

    public Executions getExecutions() {
        return executions;
    }

    /** @return all data collectors registered in any execution context */
    public Collection<IDataCollector> getCollectors() {
        return executions.getMatching(Properties.Collectors);
    }

    /** @return all data collectors whose id starts with the given id */
    public Collection<IDataCollector> getCollectors(String collectorId) {
        return executions.getMatching(Experiment.Properties.Collectors + collectorId);
    }

    /**
     * Register a global object that can be accessed from the {@link ExecutionContext}
     *
     * @param key - The identifier to be used for subsequent lookups using {@code get(key)}
     * @param value - The value to associate with the key
     * @return this experiment, for call chaining
     */
    public Experiment register(String key, Object value) {
        context.bind(key, value);
        return this;
    }

    public Experiment addWorkload(Workload... workload) {
        for (Workload wl : workload)
            workloads.add(wl);
        return this;
    }

    public Experiment addReport(IReporter reporter) {
        reporters.add(reporter);
        return this;
    }

    /**
     * Run all registered reporters. A fresh report directory is created and
     * published as a property so reporters know where to write their output.
     */
    public void generateReports() throws Exception {
        PrintWriter log = new PrintWriter(getLogStream(), true);
        if (reporters.isEmpty()) {
            log.println("Skipping report generation because no reporters have been registered");
            return;
        }
        setProperty(Properties.ReportBaseDir, prepareDirectory());
        for (IReporter reporter : reporters) {
            log.println("Report: " + reporter);
            reporter.generate(this);
        }
    }

    public String getReportBaseDir() {
        return getProperty(Properties.ReportBaseDir);
    }

    /** Create the directory <base path>/<title>/<timestamp>/ for this run's reports. */
    private String prepareDirectory() {
        File baseDir = new File(getProperty(Properties.BasePath) + File.separator + getTitle() + File.separator + System.currentTimeMillis());
        baseDir.mkdirs();
        return baseDir.getAbsolutePath() + File.separator;
    }

    public <T> T get(String key) {
        return context.get(key);
    }

    public List<Workload> getWorkloads() {
        return workloads;
    }

    public boolean isDefined(String key) {
        return context.containsKey(key);
    }

    public <T> T getProperty(String key) {
        return (T) context.get(key);
    }

    public int getSampleInterval() {
        return getProperty(Properties.SampleInterval);
    }

    public Experiment setBasePath(String basePath) {
        return setProperty(Properties.BasePath, basePath);
    }

    public Experiment setSampleInterval(int sampleInterval) {
        return setProperty(Properties.SampleInterval, sampleInterval);
    }

    public int getTimeoutInSeconds() {
        return getProperty(Properties.TimeoutInSeconds);
    }

    public Experiment setProperty(String key, Object value) {
        context.bind(key, value);
        return this;
    }

    /** @return the configured log stream, defaulting to System.out */
    public OutputStream getLogStream() {
        return isDefined(Properties.LogStream) ? (OutputStream) getProperty(Properties.LogStream) : System.out;
    }

    public Experiment setLogStream(OutputStream out) {
        return setProperty(Properties.LogStream, out);
    }

    @Override
    public String toString() {
        StringBuilder exp = new StringBuilder();
        exp.append("Experiment ");
        exp.append(title);
        // was "with " which produced "Experiment <title>with N workloads"
        exp.append(" with ");
        exp.append(workloads.size() + " workloads");
        exp.append("\n");
        for (Workload load : workloads) {
            exp.append("\t");
            exp.append(load);
        }
        exp.append("\n");
        exp.append("and additional parameters:\n");
        for (Map.Entry entry : context.getProperties().entrySet()) {
            exp.append("\t");
            exp.append(entry.getKey());
            exp.append(":");
            exp.append(entry.getValue());
            exp.append("\n");
        }
        return exp.toString();
    }

    public String getTitle() {
        return title;
    }

    /** @return the global context shared by all executions of this experiment */
    public ExecutionContext getGlobalContext() {
        return context;
    }

    /**
     * @deprecated misspelled - use {@link #getGlobalContext()}; kept so existing
     *             callers keep compiling
     */
    @Deprecated
    public ExecutionContext getClobalContext() {
        return getGlobalContext();
    }
}

View File

@ -0,0 +1,251 @@
package net.engio.pips.lab;
import net.engio.pips.lab.workload.*;
import java.io.PrintWriter;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Runs {@link Experiment}s: wires the start/stop conditions of their workloads,
 * schedules the workloads on executor services, blocks until everything has
 * completed and finally triggers report generation.
 *
 * @Author bennidi
 */
public class Laboratory {

    /**
     * Measure the given experiments one after another and generate the
     * registered reports for each of them.
     */
    public void run(Experiment... experiments) throws Exception {
        for (Experiment experiment : experiments) {
            // TODO: verify workload configuration (at least one timebased/immediate wl)
            measure(experiment);
            PrintWriter log = new PrintWriter(experiment.getLogStream(), true);
            log.println("Generating reports....");
            experiment.generateReports();
        }
    }

    /**
     * Execute all workloads of the experiment. Blocks the calling thread until
     * every workload reported completion, then merges the per-task execution
     * contexts into the experiment.
     */
    public void measure(final Experiment experiment) {
        // each workload will run in its own thread
        final ExecutorService executor = Executors.newFixedThreadPool(experiment.getWorkloads().size());
        // keeping track of workloads and their corresponding executables
        final Map<Workload, WorkloadManager> workloads = new HashMap<Workload, WorkloadManager>(experiment.getWorkloads().size());
        //final Map<Workload, Future<Long>> scheduled = Collections.synchronizedMap(new HashMap<Workload, Future<Long>>(experiment.getWorkloads().size()));
        final AtomicInteger finishedWorkloads = new AtomicInteger(0);
        final PrintWriter log = new PrintWriter(experiment.getLogStream(), true);
        // daemon timer used for delayed starts and time based cancellation
        final Timer timer = new Timer(true);
        Date start = new Date(System.currentTimeMillis());
        log.println("Starting experiment at " + start);
        // prepare workloads
        for (final Workload workload : experiment.getWorkloads()) {
            workloads.put(workload, new WorkloadManager(workload, experiment));
            // keep track of finished workloads
            workload.handle(ExecutionEvent.WorkloadCompletion, new ExecutionHandler() {
                @Override
                public void handle(ExecutionContext context) {
                    finishedWorkloads.incrementAndGet();
                }
            });
            // cancel workloads when duration is exceeded
            if (workload.getDuration().isTimeBased()) {
                workload.handle(ExecutionEvent.WorkloadInitialization, new ExecutionHandler() {
                    @Override
                    public void handle(ExecutionContext context) {
                        Date timeout = new Date(System.currentTimeMillis() + workload.getDuration().inMillisecs());
                        log.println("Scheduling timertask to cancel " + workload + " in " + timeout);
                        timer.schedule(new TimerTask() {
                            @Override
                            public void run() {
                                workloads.get(workload).stop();
                            }
                        }, timeout);
                    }
                });
            }
            // wire up dependent workloads to be started when their predecessor completes
            if (workload.getStartCondition().isDependent()) {
                workload.getStartCondition().getPreceedingWorkload().handle(ExecutionEvent.WorkloadCompletion, new ExecutionHandler() {
                    @Override
                    public void handle(ExecutionContext context) {
                        workloads.get(workload).start(executor);
                    }
                });
            }
            // wire up dependent workloads to be stopped when their predecessor completes
            if (workload.getDuration().isDependent()) {
                workload.getDuration().getDependingOn().handle(ExecutionEvent.WorkloadCompletion, new ExecutionHandler() {
                    @Override
                    public void handle(ExecutionContext context) {
                        // interrupt the task
                        workloads.get(workload).stop();
                    }
                });
            }
        }
        // schedule workloads
        for (final Workload workload : experiment.getWorkloads()) {
            // either now
            if (workload.getStartCondition().isImmediately()) {
                workloads.get(workload).start(executor);
            }
            // or in the future based on specified start condition
            else if (workload.getStartCondition().isTimebased()) {
                timer.schedule(new TimerTask() {
                    @Override
                    public void run() {
                        workloads.get(workload).start(executor);
                    }
                }, new Date(System.currentTimeMillis() + workload.getStartCondition().inMillisecs()));
            }
            // NOTE(review): dependent workloads are started by the completion
            // handlers registered above, so no branch is needed here
        }
        // wait until all tasks have been executed
        // NOTE(review): busy wait with 1s polling granularity
        try {
            while (finishedWorkloads.get() < experiment.getWorkloads().size())
                Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        } finally {
            log.println("Finished experiment");
            // merge contexts
            Executions executions = new Executions();
            for (WorkloadManager workMan : workloads.values())
                executions.addAll(workMan.contexts);
            experiment.setExecutions(executions);
        }
    }

    /**
     * Bundles a workload with the executor its tasks run on, the futures of the
     * scheduled tasks and the execution contexts created for each parallel task.
     */
    private static class WorkloadManager {

        private Workload workload;

        // single callable that fans the workload's tasks out to workloadExecutor
        private Callable<Long> scheduler;

        private ExecutorService workloadExecutor;

        private List<Future> scheduledTasks = new LinkedList<Future>();

        private Future scheduledWorkload;

        // one child context per parallel task; merged into the experiment afterwards
        private List<ExecutionContext> contexts = new LinkedList<ExecutionContext>();

        // polled by the task loops; set to true to request termination
        private volatile boolean stopped = false;

        private WorkloadManager(Workload workload, Experiment experiment) {
            this.workload = workload;
            createScheduler(experiment, experiment.getClobalContext().getChild());
        }

        /** Request termination: flag the task loops and cancel/interrupt the futures. */
        private void stop() {
            stopped = true;
            for (Future task : scheduledTasks)
                task.cancel(true); // this doesn't seem to have any effect
            System.out.println("Canceling workload" + workload);
            scheduledWorkload.cancel(true);
            workloadExecutor.shutdown();
        }

        private Future start(ExecutorService executor) {
            return scheduledWorkload = executor.submit(scheduler);
        }

        // create a single executable unit which will run the tasks from the given workload
        // in its own thread pool
        private Callable<Long> createScheduler(final Experiment experiment, final ExecutionContext workloadContext) {
            workloadExecutor = Executors.newFixedThreadPool(workload.getParallelUnits());
            scheduler = new Callable<Long>() {
                @Override
                public Long call() {
                    final AtomicInteger scheduled = new AtomicInteger(0);// number of scheduled tasks
                    final AtomicInteger finished = new AtomicInteger(0); // number of finished tasks
                    //final ResultCollector collector = experiment.getResults();
                    final ITaskFactory tasks = workload.getITaskFactory();
                    final PrintWriter log = new PrintWriter(experiment.getLogStream(), true);
                    log.println("Starting workload " + workload.getName());
                    // call initialization handlers before scheduling the actual tasks
                    workload.started();
                    workload.getHandler(ExecutionEvent.WorkloadInitialization).handle(workloadContext);
                    // create the tasks and schedule for execution
                    for (int i = 0; i < workload.getParallelUnits(); i++) {
                        log.println("Scheduling unit " + scheduled.incrementAndGet());
                        final int taskNumber = i + 1;
                        final ExecutionContext taskContext = workloadContext.getChild();
                        contexts.add(taskContext);
                        // simply submit a runnable as return values are not important
                        scheduledTasks.add(workloadExecutor.submit(new Runnable() {
                            @Override
                            public void run() {
                                try {
                                    ITask task = tasks.create(taskContext);
                                    log.println("Executing task " + taskNumber);
                                    if (workload.getDuration().isRepetitive()) {
                                        // run a fixed number of repetitions
                                        for (int i = 0; i < workload.getDuration().getRepetitions(); i++)
                                            task.run(taskContext);
                                    } else {
                                        // run until stop() flags termination (time based or dependent duration)
                                        while (!stopped) {
                                            task.run(taskContext);
                                        }
                                    }
                                } catch (Exception e) {
                                    log.println("Task" + taskNumber + " threw an exception will orderly execution: " + e.toString());
                                    e.printStackTrace();
                                    throw new RuntimeException(e);
                                } finally {
                                    finished.incrementAndGet();
                                    log.println("Finished task: " + taskNumber);
                                    log.println("Tasks left:" + (scheduled.get() - finished.get()));
                                }
                            }
                        }));
                    }
                    // wait until all tasks have been executed
                    try {
                        while (scheduled.get() > finished.get())
                            Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // interruption is the expected way for time based/dependent
                        // workloads to end; the cases below indicate a real problem
                        if (workload.getDuration().isDependent() && !workload.getDuration().getDependingOn().isFinished()) {
                            log.println(workload + " interrupted although dependent workload not finished");
                            e.printStackTrace(); // something was wrong here
                        }
                        if (!workload.getDuration().isTimeBased()
                                && !workload.getDuration().isDependent()) {
                            log.println(workload + " interrupted although no time based duration specified");
                            e.printStackTrace(); // something was wrong here
                        }
                        if (workload.getDuration().isTimeBased()
                                // interrupted before duration ends
                                && System.currentTimeMillis() < workload.getDuration().inMillisecs() + workload.getStarted()) {
                            log.println(workload + " interrupted before timer finished");
                            e.printStackTrace(); // something was wrong here
                        }
                    } finally {
                        // signal end
                        workload.finished();
                        log.println("Finished workload: " + workload);
                        workload.getHandler(ExecutionEvent.WorkloadCompletion).handle(workloadContext);
                    }
                    return 1L;
                }
            };
            return scheduler;
        }
    }
}

View File

@ -0,0 +1,88 @@
package net.engio.pips.lab;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * An immutable set of values of type T. Values are pre-generated by a corresponding
 * {@link ValueGenerator} and can be accessed randomly or sequentially.
 *
 * @author bennidi
 *         Date: 2/13/14
 */
public class Range<T> {

    private T[] elements;

    // per-instance Random; use clone() to give each thread its own instance
    private Random random;

    // cursor for sequential access via getNext()
    private AtomicInteger index = new AtomicInteger(0);

    /**
     * Pre-generate {@code size} values from the given generator.
     *
     * @throws IllegalArgumentException if size is not positive
     */
    public Range(ValueGenerator<T> generator, int size) {
        super();
        if (size < 1)
            throw new IllegalArgumentException("Range must contain at least one element:" + size);
        elements = (T[]) new Object[size];
        for (int i = 0; i < size; i++) {
            elements[i] = generator.next();
        }
        random = new Random();
    }

    private Range(T[] elements) {
        this.elements = elements;
        random = new Random();
    }

    /**
     * @return a new Range containing the same values in shuffled order;
     *         this instance is left untouched
     */
    public Range<T> shuffle() {
        Range<T> clone = new Range<T>(Arrays.copyOf(elements, elements.length));
        shuffle(clone.elements);
        return clone;
    }

    // Fisher-Yates shuffle, in place
    private void shuffle(T[] elements) {
        Random rnd = new Random();
        for (int i = elements.length - 1; i > 0; i--) {
            int index = rnd.nextInt(i + 1);
            // Simple swap
            T a = elements[index];
            elements[index] = elements[i];
            elements[i] = a;
        }
    }

    /**
     * Get a range containing the same values but with a distinct Random
     * to avoid contention on the Random object in multi-threaded use.
     * @return a shallow copy sharing the (immutable) element array
     */
    public Range<T> clone() {
        return new Range<T>(elements);
    }

    /** @return a uniformly chosen random element */
    public T getRandomElement() {
        // Random.nextInt(bound) is uniform and, unlike Math.abs(nextInt() % n),
        // cannot yield a negative index when nextInt() returns Integer.MIN_VALUE
        return elements[random.nextInt(elements.length)];
    }

    /**
     * @return the next element in sequence, wrapping around to the beginning
     *         when all elements have been consumed
     */
    public T getNext() {
        int next = index.getAndIncrement();
        int length = elements.length;
        // normalize so that even a counter overflow (negative value) still
        // maps to a valid array position
        return elements[((next % length) + length) % length];
    }

    public T getElement(int index) {
        return index < elements.length ? elements[index] : null;
    }

    /** @return true while getNext() has not yet returned every element once */
    public boolean hasNext() {
        // was "< elements.length - 1", which reported false before the last
        // element had been handed out
        return index.get() < elements.length;
    }

    /** Start sequential access at a uniformly chosen position. */
    public Range<T> startWithRandomIndex() {
        index.set(random.nextInt(elements.length));
        return this;
    }
}

View File

@ -0,0 +1,11 @@
package net.engio.pips.lab;
/**
 * Produces a stream of values of type T. Used by {@link Range} to pre-generate
 * its backing element set.
 *
 * @author bennidi
 *         Date: 2/13/14
 */
public interface ValueGenerator<T> {

    // returns the next generated value; implementations decide whether values
    // are random, sequential or constant
    public T next();
}

View File

@ -0,0 +1,68 @@
package net.engio.pips.lab.workload;
import java.util.concurrent.TimeUnit;
/**
 * A duration defines when to stop a workload from being executed. It is either
 * time based (run for N units), repetition based (run N times) or dependent
 * (run until another workload completes).
 *
 * @author bennidi
 *         Date: 3/9/14
 */
public class Duration {

    // -1 marks "no time based duration"; the field previously defaulted to 0,
    // leaving the "timeout != -1" check in isTimeBased() ineffective
    private int timeout = -1;

    private TimeUnit unit;

    private Workload dependingOn;

    private int repetitions = -1;

    /**
     * Run for a fixed amount of time.
     *
     * @throws IllegalArgumentException for non-positive timeouts or a null unit
     */
    public Duration(int timeout, TimeUnit unit) {
        if (timeout < 1 || unit == null)
            throw new IllegalArgumentException("Illegal timeout condition:" + timeout + unit);
        this.timeout = timeout;
        this.unit = unit;
    }

    /** Run until the given workload has completed. */
    public Duration(Workload dependingOn) {
        this.dependingOn = dependingOn;
    }

    /**
     * Run for a fixed number of repetitions.
     * NOTE(review): values < 1 silently produce a duration that is neither
     * repetitive nor time based - consider validating here.
     */
    public Duration(int repetitions) {
        this.repetitions = repetitions;
    }

    /** @return the timeout in milliseconds; only meaningful when isTimeBased() */
    public long inMillisecs() {
        return TimeUnit.MILLISECONDS.convert(timeout, unit);
    }

    public Workload getDependingOn() {
        return dependingOn;
    }

    public boolean isTimeBased() {
        return timeout != -1 && unit != null;
    }

    public boolean isDependent() {
        return dependingOn != null;
    }

    public int getRepetitions() {
        return repetitions;
    }

    public boolean isRepetitive() {
        return repetitions > 0;
    }

    @Override
    public String toString() {
        // separator added: "run for 5SECONDS" -> "run for 5 SECONDS"
        if (isTimeBased()) return "run for " + timeout + " " + unit;
        if (isRepetitive()) return "run " + repetitions + " times";
        if (isDependent()) return "run until " + dependingOn.getName() + " ends";
        return "Unknown duration";
    }
}

View File

@ -0,0 +1,12 @@
package net.engio.pips.lab.workload;
/**
* Todo: Add javadoc
*
* @author bennidi
* Date: 3/9/14
*/
public enum ExecutionEvent {
WorkloadInitialization,WorkloadCompletion,TaskInitialization,TaskCompletion
}

View File

@ -0,0 +1,15 @@
package net.engio.pips.lab.workload;
import net.engio.pips.lab.ExecutionContext;
/**
 * Execution handlers are used to react to execution events
 * such as workload initialization or completion (see {@link ExecutionEvent}).
 * They are registered per event via {@code Workload.handle(...)}.
 *
 * @author bennidi
 *         Date: 3/9/14
 */
public interface ExecutionHandler {

    // invoked with the execution context of the workload/task that triggered the event
    void handle(ExecutionContext context);
}

View File

@ -0,0 +1,14 @@
package net.engio.pips.lab.workload;
import net.engio.pips.lab.ExecutionContext;
/**
 * A single unit of work executed (possibly repeatedly) as part of a
 * {@link Workload}.
 *
 * @author bennidi
 *         Date: 2/11/14
 */
public interface ITask {

    /**
     * Execute one unit of work.
     *
     * @param context - the context of the executing task; provides access to
     *                  shared properties and data collectors
     * @throws Exception - any exception aborts the surrounding task loop
     */
    public void run(final ExecutionContext context) throws Exception;
}

View File

@ -0,0 +1,24 @@
package net.engio.pips.lab.workload;
import net.engio.pips.lab.ExecutionContext;
/**
 * A factory that provides an iterator like interface to construct
 * execution units that can be run by the executor.
 *
 * @author bennidi
 *         Date: 2/11/14
 */
public interface ITaskFactory {

    /**
     * Create a new task to be scheduled as part of an associated workload. This method
     * might return a new or the same task instance for each call.
     *
     * @param context - The execution context provided by the {@link net.engio.pips.lab.Laboratory}
     * @return - A task instance (shared instances must be safe for concurrent use)
     */
    ITask create(ExecutionContext context);
}

View File

@ -0,0 +1,61 @@
package net.engio.pips.lab.workload;
import java.util.concurrent.TimeUnit;
/**
 * Defines when a workload starts: immediately, after a fixed delay or after
 * another workload has completed.
 *
 * @author bennidi
 *         Date: 3/9/14
 */
public class StartCondition {

    private boolean immediately = false;

    private int timeout;

    private TimeUnit unit;

    private Workload after;

    /**
     * Start after a fixed delay.
     *
     * @throws IllegalArgumentException for non-positive delays or a null unit
     */
    public StartCondition(int timeout, TimeUnit unit) {
        // argument verification mirrors Duration (resolves the former TODO)
        if (timeout < 1 || unit == null)
            throw new IllegalArgumentException("Illegal timeout condition:" + timeout + unit);
        this.timeout = timeout;
        this.unit = unit;
    }

    /** @return the configured delay in milliseconds; only meaningful when isTimebased() */
    public long inMillisecs() {
        // was unit.convert(timeout, TimeUnit.MILLISECONDS): that converted
        // "timeout milliseconds" into this unit (e.g. 5 SECONDS -> 0) instead
        // of converting the configured delay into milliseconds
        return TimeUnit.MILLISECONDS.convert(timeout, unit);
    }

    /** Start immediately. */
    public StartCondition() {
        immediately = true;
    }

    /** Start as soon as the given workload has completed. */
    public StartCondition(Workload after) {
        this.after = after;
    }

    public boolean isTimebased() {
        return unit != null;
    }

    public boolean isImmediately() {
        return immediately;
    }

    public boolean isDependent() {
        return after != null;
    }

    public Workload getPreceedingWorkload() {
        return after;
    }

    @Override
    public String toString() {
        // separator added: "start after 5SECONDS" -> "start after 5 SECONDS"
        if (isTimebased()) return "start after " + timeout + " " + unit;
        if (isDependent()) return "start after workload " + after.getName();
        if (isImmediately()) return "start immediately";
        return "Unknown startcondition";
    }
}

View File

@ -0,0 +1,195 @@
package net.engio.pips.lab.workload;
import net.engio.pips.lab.ExecutionContext;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
 * A single workload defines a set of {@link ITask} to be executed as part of an {@link net.engio.pips.lab.Experiment}.
 * Tasks are potentially run in parallel depending on the configuration of {@code setParallelTasks}.
 * Tasks are created using the corresponding {@link net.engio.pips.lab.workload.ITaskFactory}.
 *
 * @author bennidi
 *         Date: 3/6/14
 */
public class Workload {

    private int parallelUnits = 1;

    // renamed from "ITaskFactory" (field shadowed its own type name)
    private ITaskFactory taskFactory;

    private Duration duration;

    private StartCondition starting = new StartCondition();

    private String name;

    private volatile long started;

    // -1 marks "not finished yet"; with the former default of 0 isFinished()
    // reported true before the workload had even run, which silenced the
    // dependent-workload checks in the Laboratory
    private volatile long finished = -1;

    private Map<ExecutionEvent, ExecutionHandlerWrapper> handlers = new HashMap<ExecutionEvent, ExecutionHandlerWrapper>();

    public Workload(String name) {
        this.name = name;
    }

    /** Record the wall-clock time at which the workload started. */
    public void started() {
        started = System.currentTimeMillis();
    }

    /** Record the wall-clock time at which the workload finished. */
    public void finished() {
        finished = System.currentTimeMillis();
    }

    public boolean isFinished() {
        return finished != -1;
    }

    /** @return the elapsed execution time in milliseconds, or -1 while still running */
    public long getExecutionTime() {
        return isFinished() ? finished - started : -1;
    }

    public int getParallelUnits() {
        return parallelUnits;
    }

    /**
     * Set the number of tasks that will run in parallel.
     *
     * @throws IllegalArgumentException if less than one
     */
    public Workload setParallelTasks(int parallelUnits) {
        if (parallelUnits < 1) throw new IllegalArgumentException("At least one task must run");
        this.parallelUnits = parallelUnits;
        return this;
    }

    public String getName() {
        return name;
    }

    public ITaskFactory getITaskFactory() {
        return taskFactory;
    }

    public Workload setITaskFactory(ITaskFactory taskFactory) {
        this.taskFactory = taskFactory;
        return this;
    }

    /**
     * Attach a handler for the given event. Multiple handlers per event are
     * supported; they are invoked in registration order.
     */
    public Workload handle(ExecutionEvent event, ExecutionHandler handler) {
        if (handlers.containsKey(event)) {
            handlers.get(event).delegate.add(handler);
        } else {
            handlers.put(event, new ExecutionHandlerWrapper(handler));
        }
        return this;
    }

    public Duration getDuration() {
        return duration;
    }

    /** @return the registered handler(s) for the event, or a shared no-op handler */
    public ExecutionHandler getHandler(ExecutionEvent event) {
        return handlers.containsKey(event) ? handlers.get(event) : Empty;
    }

    public StartCondition getStartCondition() {
        return starting;
    }

    /** Entry point of the fluent start condition configuration. */
    public StartSpecification starts() {
        return new StartSpecification();
    }

    /** Entry point of the fluent duration configuration. */
    public DurationSpecification duration() {
        return new DurationSpecification();
    }

    // shared no-op handler returned when no handler is registered for an event
    private static final ExecutionHandler Empty = new ExecutionHandler() {
        @Override
        public void handle(ExecutionContext context) {
            // do nothing;
        }
    };

    @Override
    public String toString() {
        StringBuilder wl = new StringBuilder();
        wl.append(name);
        wl.append("(" + getExecutionTime() + "ms)");
        wl.append("->");
        wl.append("Parallel tasks:" + getParallelUnits());
        wl.append(",");
        wl.append(getStartCondition());
        wl.append(",");
        wl.append(getDuration());
        wl.append("\n");
        return wl.toString();
    }

    public long getStarted() {
        return started;
    }

    /** Fluent builder for the workload's start condition. */
    public class StartSpecification {

        public Workload after(int timeout, TimeUnit unit) {
            starting = new StartCondition(timeout, unit);
            return Workload.this;
        }

        public Workload after(Workload preceeding) {
            starting = new StartCondition(preceeding);
            return Workload.this;
        }

        public Workload immediately() {
            starting = new StartCondition();
            return Workload.this;
        }
    }

    /** Fluent builder for the workload's duration. */
    public class DurationSpecification {

        public Workload lasts(int timeout, TimeUnit unit) {
            duration = new Duration(timeout, unit);
            return Workload.this;
        }

        public Workload depends(Workload preceeding) {
            duration = new Duration(preceeding);
            return Workload.this;
        }

        public Workload repetitions(int repetitions) {
            duration = new Duration(repetitions);
            return Workload.this;
        }
    }

    public static class ExecutionHandlerWrapper implements ExecutionHandler {

        private List<ExecutionHandler> delegate = new LinkedList<ExecutionHandler>();

        public ExecutionHandlerWrapper(ExecutionHandler handler) {
            delegate.add(handler);
        }

        /**
         * Invoke all registered handlers. Exceptions of individual handlers are
         * logged and swallowed deliberately so one failing handler cannot
         * prevent the remaining ones from running.
         */
        @Override
        public void handle(ExecutionContext context) {
            for (ExecutionHandler handler : delegate) {
                try {
                    handler.handle(context);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

View File

@ -0,0 +1,38 @@
package net.engio.pips.reports;
import net.engio.pips.data.IDataCollector;
import net.engio.pips.lab.Experiment;
import java.io.File;
import java.io.PrintWriter;
/**
 * Writes a plain-text report (experiment summary plus the contents of every
 * data collector) into "report.txt" in the experiment's report directory.
 *
 * @author bennidi
 *         Date: 2/27/14
 */
public class CSVFileExporter implements IReporter {

    public void generate(Experiment experiment) throws Exception {
        final File target = new File(experiment.getReportBaseDir() + "report.txt");
        final PrintWriter out = new PrintWriter(target);
        try {
            // write report header
            out.println("###### EXPERIMENT ##########");
            out.println(experiment);
            // write data of collectors
            out.println();
            out.println("##### COLLECTORS ########");
            for (IDataCollector collector : experiment.getCollectors())
                out.println(collector);
        } finally {
            out.close();
        }
    }
}

View File

@ -0,0 +1,135 @@
package net.engio.pips.reports;
import net.engio.pips.lab.Experiment;
import org.jfree.chart.ChartFactory;
import org.jfree.chart.ChartUtilities;
import org.jfree.chart.JFreeChart;
import org.jfree.chart.axis.NumberAxis;
import org.jfree.chart.plot.XYPlot;
import org.jfree.data.time.TimeSeriesCollection;
import java.awt.*;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Renders the data collected during an experiment as a time series chart
 * (JPEG) using JFreeChart. Series are organized in {@link SeriesGroup}s; each
 * group gets its own range axis unless two groups share the same label.
 *
 * @author bennidi
 *         Date: 3/3/14
 */
public class ChartGenerator implements IReporter {

    // configured series groups; the first one becomes the chart's primary dataset
    private List<SeriesGroup> groups = new ArrayList<SeriesGroup>();

    // horizontal resolution: pixels spent per data point
    private int pixelPerDatapoint = 5;

    private String title = "Title";

    private String xLabel = "X-Axis";

    private String filename = "chart.jpg";

    public ChartGenerator setPixelPerDatapoint(int pxPerDP) {
        this.pixelPerDatapoint = pxPerDP;
        return this;
    }

    public ChartGenerator setTitle(String title) {
        this.title = title;
        return this;
    }

    public ChartGenerator setXAxisLabel(String xLabel) {
        this.xLabel = xLabel;
        return this;
    }

    public ChartGenerator setFileName(String filename) {
        this.filename = filename;
        return this;
    }

    /**
     * Configure a new group that will be treated as a single dataset in the chart.
     * Each dataset has its own range axis.
     * @param collectorId : The id used to retrieve associated data collectors from the result collector
     * @param groupLabel : The label used for the range axis
     * @return the newly created group
     */
    public SeriesGroup addGroup(String groupLabel, String collectorId) {
        SeriesGroup g = new SeriesGroup(groupLabel, collectorId);
        groups.add(g);
        return g;
    }

    public ChartGenerator add(SeriesGroup seriesGroup) {
        groups.add(seriesGroup);
        return this;
    }

    /**
     * Build the chart from the experiment's collected data and write it as a
     * JPEG into the experiment's report base directory.
     * NOTE(review): assumes at least one group has been configured - an empty
     * group list makes groups.get(0) throw IndexOutOfBoundsException.
     */
    public void generate(Experiment experiment) {
        // remembers which axis index a group label was mapped to
        Map<String, Integer> groupAxis = new HashMap<String, Integer>();
        // process default group
        SeriesGroup defaultGroup = this.groups.get(0);
        TimeSeriesCollection collection = defaultGroup.createDataSet(experiment);
        int maxNumberOfDatapoints = defaultGroup.getSize();
        groupAxis.put(defaultGroup.getLabel(), 0);
        // create initial chart
        JFreeChart chart = ChartFactory.createTimeSeriesChart(
                title, // title
                xLabel, // x-axis label
                defaultGroup.getLabel(), // y-axis label
                collection, // data
                true, // create legend?
                true, // generate tooltips?
                false // generate URLs?
        );
        // preconfigure plot layout
        final XYPlot plot = chart.getXYPlot();
        plot.setRangeGridlinesVisible(true);
        plot.setRangeGridlinePaint(Color.BLACK);
        plot.setDomainGridlinesVisible(true);
        plot.setDomainGridlinePaint(Color.BLACK);
        plot.setBackgroundPaint(Color.DARK_GRAY);
        // add other groups
        List<SeriesGroup> groups = this.groups.subList(1, this.groups.size());
        int axisIndex = 1,
            dataSetIndex = 1;
        for (SeriesGroup group : groups) {
            plot.setDataset(dataSetIndex, group.createDataSet(experiment));
            // if the group does not share a range axis with an already mapped group
            if (!groupAxis.containsKey(group.getLabel())) {
                final NumberAxis axis2 = new NumberAxis(group.getLabel());
                // prevent the axis to be scaled from zero if the dataset begins with higher values
                axis2.setAutoRangeIncludesZero(false);
                plot.setRangeAxis(axisIndex, axis2);
                plot.mapDatasetToRangeAxis(dataSetIndex, axisIndex);
                groupAxis.put(group.getLabel(), axisIndex); // remember this axis for subsequent groups
                axisIndex++;
            }
            else {
                plot.mapDatasetToRangeAxis(dataSetIndex, groupAxis.get(group.getLabel()));
            }
            dataSetIndex++;
            if (maxNumberOfDatapoints < group.getSize()) maxNumberOfDatapoints = group.getSize();
        }
        // calculate width of graph based on number of total data points
        // Note: assumes that the data of all groups spans (roughly) the same domain range
        int width = maxNumberOfDatapoints * pixelPerDatapoint;
        try {
            String path = experiment.getReportBaseDir() + filename;
            // enforce a minimum canvas width of 1024 pixels
            ChartUtilities.saveChartAsJPEG(new File(path), chart, width <= 1024 ? 1024 : width, 1024);
        } catch (IOException e) {
            System.err.println("Problem occurred creating chart.");
        }
    }
}

View File

@ -0,0 +1,15 @@
package net.engio.pips.reports;
import net.engio.pips.lab.Experiment;
/**
 * Take the benchmark and create some output. This may be everything from logging
 * to console, writing to disk, creating charts...
 *
 * @author bennidi
 *         Date: 3/4/14
 */
public interface IReporter {

    /**
     * Produce this reporter's output for the given experiment.
     *
     * @param experiment the experiment whose collected data should be reported
     * @throws Exception if report generation fails (I/O errors etc.)
     */
    void generate(Experiment experiment) throws Exception;
}

View File

@ -0,0 +1,119 @@
package net.engio.pips.reports;
import net.engio.pips.data.DataCollector;
import net.engio.pips.data.IDataCollector;
import net.engio.pips.data.IDataProcessor;
import net.engio.pips.data.aggregator.Average;
import net.engio.pips.data.utils.ItemCounter;
import net.engio.pips.data.utils.TimeBasedAggregator;
import net.engio.pips.lab.Experiment;
import org.jfree.data.time.TimeSeries;
import org.jfree.data.time.TimeSeriesCollection;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
/**
 * A series group is used to configure a set of data collectors for being included in a time series
 * chart. Apart from the collectors themselves, aggregates can be configured to generate series
 * like a moving average from the set of collectors.
 *
 * @author bennidi
 *         Date: 3/4/14
 */
public class SeriesGroup {

    // id used to look up the associated data collectors in the experiment
    private String collectorId;

    // label shown on the range axis of the chart
    private String label;

    // named aggregate processors (e.g. moving average) applied to the combined data
    private Map<String, IDataProcessor> aggregators = new HashMap<String, IDataProcessor>();

    // collectors added explicitly, in addition to those looked up by collectorId
    private Collection<IDataCollector> collectors = new ArrayList<IDataCollector>();

    // number of data points produced by the aggregate series of the last createDataSet() call
    private int size;

    private String yAxis = "";

    // only every n-th collector is turned into its own series (1 = all)
    private int collectorSampleSize = 1;

    public SeriesGroup(String collectorId, String label) {
        this.collectorId = collectorId;
        this.label = label;
    }

    public SeriesGroup addCollector(IDataCollector collector) {
        collectors.add(collector);
        return this;
    }

    public String getLabel() {
        return label;
    }

    public int getSize() {
        return size;
    }

    public String getCollectorId() {
        return collectorId;
    }

    public SeriesGroup aggregate(String name, IDataProcessor aggregator) {
        aggregators.put(name, aggregator);
        return this;
    }

    public String getyAxis() {
        return yAxis;
    }

    public SeriesGroup setyAxis(String yAxis) {
        this.yAxis = yAxis;
        return this;
    }

    public SeriesGroup setDrawEveryNthGraph(int factor) {
        // BUGFIX: a factor < 1 would cause an ArithmeticException (modulo by
        // zero) or skip every collector during createDataSet()
        if (factor < 1)
            throw new IllegalArgumentException("Sample size must be >= 1 but was " + factor);
        this.collectorSampleSize = factor;
        return this;
    }

    /**
     * Build the dataset for this group: one series per (sampled) collector plus
     * one series per configured aggregate.
     *
     * @param experiment the experiment providing the collectors registered under {@link #getCollectorId()}
     * @return the collection of time series to be rendered as one dataset
     */
    public TimeSeriesCollection createDataSet(Experiment experiment) {
        // BUGFIX: copy into a fresh list instead of calling addAll on the
        // collection returned by the experiment, which mutated the
        // experiment's internal collector registry in place
        Collection<IDataCollector> collectors =
                new ArrayList<IDataCollector>(experiment.getCollectors(collectorId));
        collectors.addAll(this.collectors);
        TimeSeriesCollection collection = new TimeSeriesCollection();
        TimeBasedAggregator aggregator = new TimeBasedAggregator();
        // create a series from each data collector
        int numberOfCollectors = 1;
        for (IDataCollector collector : collectors) {
            // ignore empty data collectors as well as according to sample size
            if (collector == null || collector.size() == 0) continue;
            if (numberOfCollectors % collectorSampleSize != 0) {
                numberOfCollectors++;
                continue;
            }
            TimeSeriesConsumer wrapper = new TimeSeriesConsumer(collector.getId());
            collector.feed(wrapper);
            TimeSeries series = wrapper.getSeries();
            collection.addSeries(series);
            // prepare the time based aggregator
            if (!aggregators.isEmpty())
                aggregator.consume(collector);
            numberOfCollectors++;
        }
        // fold the aggregated collectors into a single average series and pipe
        // it through each configured aggregate processor
        DataCollector average = aggregator.fold(new Average());
        ItemCounter numberOfDatapoints = new ItemCounter();
        for (Map.Entry<String, IDataProcessor> aggregation : aggregators.entrySet()) {
            TimeSeriesConsumer series = new TimeSeriesConsumer(aggregation.getKey());
            IDataProcessor aggregate = aggregation.getValue();
            aggregate.pipeInto(numberOfDatapoints).pipeInto(series);
            average.feed(aggregate);
            collection.addSeries(series.getSeries());
        }
        size = numberOfDatapoints.getItemCount();
        return collection;
    }
}

View File

@ -0,0 +1,105 @@
package net.engio.pips.reports;
import org.jfree.chart.ChartFactory;
import org.jfree.chart.ChartUtilities;
import org.jfree.chart.JFreeChart;
import org.jfree.chart.axis.NumberAxis;
import org.jfree.chart.plot.XYPlot;
import org.jfree.chart.renderer.xy.XYLineAndShapeRenderer;
import org.jfree.data.time.TimeSeriesCollection;
import java.awt.*;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Registry for {@link TimeSeriesConsumer}s organized in named groups. Each group
 * is rendered as one dataset with its own range axis when a chart is generated.
 *
 * @author bennidi
 *         Date: 2/25/14
 */
public class TimeSeriesCollector {

    private String id;

    // series keyed by group name; each group becomes one dataset/range axis
    private Map<String, List<TimeSeriesConsumer>> series = new HashMap<String, List<TimeSeriesConsumer>>();

    public TimeSeriesCollector(String id) {
        this.id = id;
    }

    /**
     * Create a new series with the given label and register it under the given
     * group. Synchronized so series may be created from concurrent tasks.
     */
    public synchronized TimeSeriesConsumer makeSeries(String group, String label) {
        TimeSeriesConsumer mySeries = new TimeSeriesConsumer(label);
        // get or create the group (BUGFIX: raw List/ArrayList replaced with generics)
        List<TimeSeriesConsumer> groupSeries = series.get(group);
        if (groupSeries == null) {
            groupSeries = new ArrayList<TimeSeriesConsumer>();
            series.put(group, groupSeries);
        }
        groupSeries.add(mySeries);
        return mySeries;
    }

    /**
     * Render all registered (non-empty) series into a time series chart and
     * store it as JPEG under the given path.
     *
     * @param path   target file path of the JPEG
     * @param title  chart title
     * @param xLabel label of the domain (time) axis
     * @param width  image width in pixels (height is fixed at 1024)
     * @throws IllegalStateException if no series have been registered
     */
    public void generateChart(String path, String title, String xLabel, int width) {
        String defaultGroup = null;
        Map<String, TimeSeriesCollection> groups = new HashMap<String, TimeSeriesCollection>();
        // each list in the series map forms a collection
        // the original map is transformed to a new map with the same amount of keys
        // the default group is the first (arbitrary)
        for (Map.Entry<String, List<TimeSeriesConsumer>> entry : series.entrySet()) {
            TimeSeriesCollection collection = new TimeSeriesCollection();
            if (defaultGroup == null) defaultGroup = entry.getKey();
            for (TimeSeriesConsumer consumer : entry.getValue()) {
                if (!consumer.getSeries().isEmpty()) {
                    // add the series to the collection that represents its group
                    collection.addSeries(consumer.getSeries());
                }
            }
            groups.put(entry.getKey(), collection);
        }
        // BUGFIX: without this guard createTimeSeriesChart would be called with
        // a null label and dataset when no series were ever registered
        if (defaultGroup == null)
            throw new IllegalStateException("No series have been registered - can not generate chart");
        JFreeChart chart = ChartFactory.createTimeSeriesChart(
                title,                    // title
                xLabel,                   // x-axis label
                defaultGroup,             // y-axis label
                groups.get(defaultGroup), // data
                true,                     // create legend?
                true,                     // generate tooltips?
                false                     // generate URLs?
        );
        // configure plot
        final XYPlot plot = chart.getXYPlot();
        plot.setRangeGridlinesVisible(true);
        plot.setRangeGridlinePaint(Color.BLACK);
        plot.setDomainGridlinesVisible(true);
        plot.setDomainGridlinePaint(Color.BLACK);
        plot.setBackgroundPaint(Color.DARK_GRAY);
        // add other groups, each with its own range axis
        int axisIndex = 1;
        for (Map.Entry<String, TimeSeriesCollection> entry : groups.entrySet()) {
            if (entry.getKey().equals(defaultGroup)) continue; // skip the default group, because its already part of the chart
            final NumberAxis axis2 = new NumberAxis(entry.getKey());
            axis2.setAutoRangeIncludesZero(false); // prevent the axis to be scaled from zero if the dataset begins with higher values
            plot.setRangeAxis(axisIndex, axis2);
            plot.setDataset(axisIndex, entry.getValue());
            plot.mapDatasetToRangeAxis(axisIndex, axisIndex);
            axisIndex++;
        }
        // draw shapes at every data point of every series
        XYLineAndShapeRenderer renderer = (XYLineAndShapeRenderer) plot.getRenderer();
        for (int seriesIdx = 0; seriesIdx < plot.getSeriesCount(); seriesIdx++) {
            renderer.setSeriesShapesVisible(seriesIdx, true);
        }
        try {
            ChartUtilities.saveChartAsJPEG(new File(path), chart, width, 1024);
        } catch (IOException e) {
            // BUGFIX: include the path and cause instead of swallowing the detail
            System.err.println("Problem occurred creating chart '" + path + "': " + e);
        }
    }
}

View File

@ -0,0 +1,38 @@
package net.engio.pips.reports;
import net.engio.pips.data.DataPoint;
import net.engio.pips.data.DataProcessor;
import org.jfree.data.time.FixedMillisecond;
import org.jfree.data.time.TimeSeries;
/**
 * Data processor that mirrors every received data point into a JFreeChart
 * {@link TimeSeries} (keyed by the point's creation timestamp) and passes the
 * point on unchanged to downstream processors.
 *
 * @author bennidi
 *         Date: 2/25/14
 */
public class TimeSeriesConsumer<N extends Number> extends DataProcessor<N, N> {

    private final String label;

    private final TimeSeries series;

    public TimeSeriesConsumer(String label) {
        this.label = label;
        this.series = new TimeSeries(label);
    }

    /**
     * Record the data point in the time series - replacing any existing value
     * at the same millisecond - and emit it unchanged.
     */
    @Override
    public void receive(DataPoint<N> datapoint) {
        FixedMillisecond timestamp = new FixedMillisecond(datapoint.getTsCreated());
        series.addOrUpdate(timestamp, datapoint.getValue());
        emit(datapoint);
    }

    public TimeSeries getSeries() {
        return series;
    }

    public String getLabel() {
        return label;
    }
}

View File

@ -0,0 +1,11 @@
package net.engio.lab;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
// JUnit suite bundling all tests of the lab module for one-shot execution
@RunWith(Suite.class)
@Suite.SuiteClasses(value = {
        LaboratoryTest.class,
        ExecutionContextTest.class})
public class AllTests {
}

View File

@ -0,0 +1,150 @@
package net.engio.lab;
import net.engio.pips.lab.ExecutionContext;
import net.engio.pips.lab.Executions;
import net.engio.pips.lab.Experiment;
import org.junit.Test;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
/**
 * Tests for {@code ExecutionContext}: binding/lookup semantics, parent-child
 * visibility and prefix based retrieval of bound values.
 *
 * @author bennidi
 *         Date: 3/13/14
 */
public class ExecutionContextTest extends UnitTest {

    // a fixed set of key -> value bindings used by most tests
    private Map<String, Object> bindings() {
        Map<String, Object> bindings = new HashMap<String, Object>();
        bindings.put(ExecutionContext.class.toString(), ExecutionContext.class);
        bindings.put(UnitTest.class.toString(), UnitTest.class);
        bindings.put(Experiment.class.toString(), Experiment.class);
        return bindings;
    }

    // create a root context preloaded (and verified) with the given bindings
    private ExecutionContext getInitialContext(Map<String, Object> bindings) {
        ExecutionContext ctx = new ExecutionContext(new Experiment("test"));
        ctx.bindAll(bindings);
        assertBindingsExist(bindings, ctx);
        return ctx;
    }

    private void assertBindingsExist(Map<String, Object> bindings, ExecutionContext ctx) {
        for (String key : bindings.keySet())
            assertEquals(bindings.get(key), ctx.get(key));
    }

    // BUGFIX: renamed from the misspelled "assertBindindsExist"; the helper is
    // private, so the rename is invisible outside this class
    private void assertBindingsExist(Object[] bindings, ExecutionContext ctx) {
        for (Object value : bindings)
            assertEquals(value, ctx.get(value.toString()));
    }

    private void assertBindingsAbsent(Map<String, Object> bindings, ExecutionContext ctx) {
        for (String key : bindings.keySet())
            assertNull(ctx.get(key));
    }

    private void assertBindingsAbsent(Object[] bindings, ExecutionContext ctx) {
        for (Object value : bindings)
            assertNull(ctx.get(value.toString()));
    }

    /**
     * Values bound to the root context are retrievable both by their
     * string key and by the value itself.
     */
    @Test
    public void testRootBindGet() {
        ExecutionContext ctx = getInitialContext(bindings());
        Object[] bindings = new Object[]{"Object1", 2, new Object()};
        // bind values
        for (Object value : bindings)
            ctx.bind(value);
        // retrieve using the two available methods
        for (Object value : bindings) {
            assertEquals(value, ctx.get(value.toString()));
            assertEquals(value, ctx.get(value));
        }
    }

    /**
     * A child context sees its parent's bindings, but bindings added to the
     * child are not visible from the parent.
     */
    @Test
    public void testChildAccess() {
        ExecutionContext ctx = getInitialContext(bindings());
        ExecutionContext child = ctx.getChild();
        assertBindingsExist(bindings(), child);
        Object[] bindings = new Object[]{"Object1", 2, new Object()};
        child.bind(bindings);
        assertBindingsExist(bindings, child);
        assertBindingsAbsent(bindings, ctx);
    }

    /**
     * getAll collects values for a key from the context itself and all of
     * its ancestors.
     */
    @Test
    public void testGetAll() {
        ExecutionContext ctx = getInitialContext(bindings());
        ExecutionContext child = ctx.getChild();
        child.bindAll(bindings());
        assertBindingsExist(bindings(), child);
        for (String key : bindings().keySet()) {
            // child sees its own binding plus the parent's; the parent sees only its own
            assertEquals(2, child.getAll(key).size());
            assertEquals(1, ctx.getAll(key).size());
        }
    }

    /**
     * getMatching returns all values whose keys start with the given prefix,
     * including matches inherited from parent contexts.
     */
    @Test
    public void testGetMatching() {
        ExecutionContext ctx = new ExecutionContext(new Experiment("test"));
        Object[] bindings = new Object[]{"root", "root:lvl1", "root:lvl1:lvl2", "none"};
        ctx.bind(bindings);
        assertEquals(3, ctx.getMatching("root").size());
        assertEquals(2, ctx.getMatching("root:lvl1").size());
        assertEquals(1, ctx.getMatching("root:lvl1:lvl").size());
        assertEquals(1, ctx.getMatching("root:lvl1:lvl2").size());
        assertEquals(3, ctx.getMatching("ro").size());
        assertEquals(0, ctx.getMatching("null").size());
        assertEquals(1, ctx.getMatching("no").size());
        // a child holding the same bindings doubles every match count
        ExecutionContext child = ctx.getChild();
        child.bind(bindings);
        assertEquals(6, child.getMatching("root").size());
        assertEquals(4, child.getMatching("root:lvl1").size());
        assertEquals(2, child.getMatching("root:lvl1:lvl").size());
        assertEquals(2, child.getMatching("root:lvl1:lvl2").size());
        assertEquals(6, child.getMatching("ro").size());
        assertEquals(0, child.getMatching("null").size());
        assertEquals(2, child.getMatching("no").size());
    }

    /**
     * Executions.getMatching aggregates the matches of all added contexts
     * (inherited bindings are counted per context).
     */
    @Test
    public void testExecutions() {
        ExecutionContext ctx = new ExecutionContext(new Experiment("test"));
        Object[] bindings = new Object[]{"root", "root:lvl1", "root:lvl1:lvl2", "none"};
        ctx.bind(bindings);
        ExecutionContext child = ctx.getChild();
        child.bind(bindings);
        ExecutionContext child2 = ctx.getChild();
        Executions executions = new Executions();
        executions.add(child);
        executions.add(child2);
        // child contributes 6 matches (own + inherited), child2 contributes 3 inherited
        Collection matching = executions.getMatching("root");
        assertEquals(9, matching.size());
    }
}

View File

@ -0,0 +1,212 @@
package net.engio.lab;
import net.engio.pips.lab.ExecutionContext;
import net.engio.pips.lab.Experiment;
import net.engio.pips.lab.Laboratory;
import net.engio.pips.lab.workload.*;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Integration tests for the laboratory's workload scheduling: start-after
 * dependencies, dependent durations and completion handlers.
 * NOTE(review): these tests are timing-sensitive (sleeps, wall-clock
 * comparisons) and may be flaky on heavily loaded machines.
 *
 * @author bennidi
 *         Date: 2/25/14
 */
public class LaboratoryTest extends UnitTest {

    // task factory producing tasks that do nothing - used where only the
    // scheduling behaviour (not the work itself) is under test
    public static ITaskFactory NoOperation = new ITaskFactory() {
        @Override
        public ITask create(ExecutionContext context) {
            return new ITask() {
                @Override
                public void run(ExecutionContext context) throws Exception {
                    // do nothing
                }
            };
        }
    };

    /**
     * A workload scheduled with starts().after(first) must only begin once the
     * first workload has completed: its initialization handler observes the
     * full count produced by the first workload's 15 tasks.
     */
    @Test
    public void testWorkloadSchedulingStartingAfter() throws Exception {
        final AtomicInteger counter = new AtomicInteger(0);
        // first workload: 15 parallel tasks, each incrementing the counter once
        Workload first = new Workload("First workload")
                .setParallelTasks(15)
                .setITaskFactory(new ITaskFactory() {
                    @Override
                    public ITask create(ExecutionContext context) {
                        return new ITask() {
                            @Override
                            public void run(ExecutionContext context) throws Exception {
                                counter.incrementAndGet();
                            }
                        };
                    }
                })
                .duration().repetitions(1)
                .starts().immediately();
        // second workload: decrements the counter back down, starts after the first
        Workload second = new Workload("Second Workload")
                .setParallelTasks(15)
                .setITaskFactory(new ITaskFactory() {
                    @Override
                    public ITask create(ExecutionContext context) {
                        return new ITask() {
                            @Override
                            public void run(ExecutionContext context) throws Exception {
                                counter.decrementAndGet();
                            }
                        };
                    }
                })
                .handle(ExecutionEvent.WorkloadInitialization, new ExecutionHandler() {
                    @Override
                    public void handle(ExecutionContext context) {
                        // the first workload
                        assertEquals(15, counter.get());
                    }
                })
                .duration().repetitions(1)
                .starts().after(first);
        Laboratory lab = new Laboratory();
        lab.run(new Experiment("test").addWorkload(first, second));
        Thread.sleep(100);
        // both have run in the end
        assertEquals(0, counter.get());
    }

    /**
     * When a workload's duration depends on another workload, its tasks must be
     * cancelled once the other workload finishes: the counter stops changing.
     */
    @Test
    public void testWorkloadShutdownCancelsTasks() throws Exception {
        final AtomicInteger counter = new AtomicInteger(1);
        // runs for a fixed 5 seconds, incrementing continuously
        Workload countUp = new Workload("First workload")
                .setParallelTasks(15)
                .setITaskFactory(new ITaskFactory() {
                    @Override
                    public ITask create(ExecutionContext context) {
                        return new ITask() {
                            @Override
                            public void run(ExecutionContext context) throws Exception {
                                counter.incrementAndGet();
                            }
                        };
                    }
                })
                .duration().lasts(5, TimeUnit.SECONDS)
                .starts().immediately();
        // runs only as long as countUp runs, decrementing continuously
        Workload countDown = new Workload("Second Workload")
                .setParallelTasks(15)
                .setITaskFactory(new ITaskFactory() {
                    @Override
                    public ITask create(ExecutionContext context) {
                        return new ITask() {
                            @Override
                            public void run(ExecutionContext context) throws Exception {
                                counter.decrementAndGet();
                            }
                        };
                    }
                })
                .duration().depends(countUp)
                .starts().immediately();
        Laboratory lab = new Laboratory();
        lab.run(new Experiment("test").addWorkload(countUp, countDown));
        int count = counter.get();
        // no threads that change the counter are running anymore
        Thread.sleep(1000);
        assertEquals(count, counter.get());
        // both have run in the end
        assertTrue(counter.get() < 16);
    }

    /**
     * A dependent workload must finish (almost) at the same time as the
     * workload its duration depends on.
     */
    @Test
    public void testWorkloadSchedulingDurationDepends() throws Exception {
        final AtomicLong start = new AtomicLong(0);
        final AtomicLong finish = new AtomicLong(0);
        final AtomicLong finishSecond = new AtomicLong(0);
        int duration = 3;
        // first workload records its own start and completion timestamps
        Workload first = new Workload("First workload")
                .setParallelTasks(5)
                .duration().lasts(duration, TimeUnit.SECONDS)
                .starts().immediately()
                .setITaskFactory(NoOperation)
                .handle(ExecutionEvent.WorkloadInitialization, new ExecutionHandler() {
                    @Override
                    public void handle(ExecutionContext context) {
                        start.set(System.currentTimeMillis());
                    }
                })
                .handle(ExecutionEvent.WorkloadCompletion, new ExecutionHandler() {
                    @Override
                    public void handle(ExecutionContext context) {
                        finish.set(System.currentTimeMillis());
                    }
                });
        // second workload lasts exactly as long as the first one
        Workload second = new Workload("Second Workload")
                .setParallelTasks(5)
                .setITaskFactory(NoOperation)
                .handle(ExecutionEvent.WorkloadCompletion, new ExecutionHandler() {
                    @Override
                    public void handle(ExecutionContext context) {
                        // the first workload
                        finishSecond.set(System.currentTimeMillis());
                    }
                })
                .duration().depends(first)
                .starts().immediately();
        Laboratory lab = new Laboratory();
        lab.run(new Experiment("test").addWorkload(first, second));
        // first workload has run at least duration seconds
        assertTrue(finish.get() - start.get() >= duration * 1000);
        // second workload has not run significantly longer then first (~1ms per thread)
        assertTrue(finishSecond.get() - finish.get() < 5);
    }

    /**
     * Several workloads may depend on the same workload; all of them complete
     * when the dependency completes.
     */
    @Test
    public void testMultipleDependencies() throws Exception {
        final AtomicBoolean finished = new AtomicBoolean(false);
        Workload first = new Workload("First workload")
                .setParallelTasks(5)
                .duration().lasts(10, TimeUnit.SECONDS)
                .starts().immediately()
                .setITaskFactory(NoOperation);
        Workload second = new Workload("Second Workload")
                .setParallelTasks(5)
                .setITaskFactory(NoOperation)
                .duration().depends(first)
                .starts().immediately();
        // third workload flags its completion so the test can assert it ran through
        Workload third = new Workload("Third Workload")
                .setParallelTasks(5)
                .setITaskFactory(NoOperation)
                .duration().depends(first)
                .starts().immediately()
                .handle(ExecutionEvent.WorkloadCompletion, new ExecutionHandler() {
                    @Override
                    public void handle(ExecutionContext context) {
                        finished.set(true);
                    }
                });
        Laboratory lab = new Laboratory();
        lab.run(new Experiment("test").addWorkload(first, second, third));
        assertTrue(finished.get());
    }
}

View File

@ -0,0 +1,85 @@
package net.engio.lab;
import org.junit.After;
import org.junit.Before;
import java.util.Collection;
import java.util.Random;
/**
 * Base class for unit tests: delegates to JUnit's assertions as instance
 * methods and offers small helpers for randomness and pausing.
 */
public abstract class UnitTest {

    private Random random = new Random();

    @Before
    public void setUp() {
    }

    @After
    public void tearDown() {
    }

    public void fail(String message) {
        org.junit.Assert.fail(message);
    }

    public void fail() {
        org.junit.Assert.fail();
    }

    public void assertTrue(Boolean condition) {
        org.junit.Assert.assertTrue(condition);
    }

    public void assertTrue(String message, Boolean condition) {
        org.junit.Assert.assertTrue(message, condition);
    }

    public void assertFalse(Boolean condition) {
        org.junit.Assert.assertFalse(condition);
    }

    public void assertNull(Object object) {
        org.junit.Assert.assertNull(object);
    }

    public void assertNotNull(Object object) {
        org.junit.Assert.assertNotNull(object);
    }

    public void assertFalse(String message, Boolean condition) {
        org.junit.Assert.assertFalse(message, condition);
    }

    public void assertEquals(Object expected, Object actual) {
        org.junit.Assert.assertEquals(expected, actual);
    }

    public <T extends Collection> void assertNotEmpty(T collection) {
        org.junit.Assert.assertTrue(collection != null && !collection.isEmpty());
    }

    /**
     * Pick a uniformly distributed random element from the given array.
     */
    protected <T> T getRandomElement(T[] values) {
        // BUGFIX: Math.abs(random.nextInt()) % length could yield a negative
        // index because Math.abs(Integer.MIN_VALUE) is still negative;
        // nextInt(bound) is always within [0, length) and uniform
        return values[random.nextInt(values.length)];
    }

    /**
     * Sleep for the given number of milliseconds without losing an interrupt.
     */
    protected void pause(long timeInMs) {
        try {
            // BUGFIX: was Thread.currentThread().sleep(...) - sleep is static
            Thread.sleep(timeInMs);
        } catch (InterruptedException e) {
            // restore the interrupt flag so callers can still observe it
            Thread.currentThread().interrupt();
        }
    }

    protected void pause() {
        pause(10);
    }

    protected Random getRandom() {
        return random;
    }
}