Added default console logger. memory padding tweakes

This commit is contained in:
nathan 2015-05-14 15:32:25 +02:00
parent d0dfafb5a5
commit 6668c2ff81
8 changed files with 98 additions and 48 deletions

View File

@ -66,6 +66,11 @@ public interface IMessageBus extends PubSubSupport, ErrorHandlingSupport {
*/
boolean hasPendingMessages();
/**
* Starts the bus
*/
void start();
/**
* Shutdown the bus such that it will stop delivering asynchronous messages. Executor service and
* other internally used threads will be shutdown gracefully. After calling shutdown it is not safe

View File

@ -27,6 +27,11 @@ import dorkbox.util.messagebus.subscription.Subscription;
*/
public class MultiMBassador implements IMessageBus {
public static final String ERROR_HANDLER_MSG =
"INFO: No error handler has been configured to handle exceptions during publication.\n" +
"Publication error handlers can be added by bus.addErrorHandler()\n" +
"Falling back to console logger.";
// this handler will receive all errors that occur during message dispatch or message handling
private final Collection<IPublicationErrorHandler> errorHandlers = new ArrayDeque<IPublicationErrorHandler>();
@ -133,7 +138,6 @@ public class MultiMBassador implements IMessageBus {
Thread runner = dispatchThreadFactory.newThread(runnable);
this.threads.add(runner);
runner.start();
}
}
@ -168,6 +172,19 @@ public class MultiMBassador implements IMessageBus {
return this.dispatchQueue.hasPendingMessages();
}
@Override
public void start() {
for (Thread t : this.threads) {
t.start();
}
synchronized (this.errorHandlers) {
if (this.errorHandlers.isEmpty()) {
this.errorHandlers.add(new IPublicationErrorHandler.ConsoleLogger());
System.out.println(ERROR_HANDLER_MSG);
}
}
}
@Override
public void shutdown() {
this.shuttingDown = true;

View File

@ -8,6 +8,11 @@ import java.util.concurrent.locks.Lock;
import com.googlecode.concurentlocks.ReentrantReadWriteUpdateLock;
abstract class pad<T> extends item<T> {
volatile long z0, z1, z2, z4, z5, z6 = 7L;
}
/**
* This data structure is optimized for non-blocking reads even when write operations occur.
* Running read iterators will not be affected by add operations since writes always insert at the head of the
@ -17,14 +22,17 @@ import com.googlecode.concurentlocks.ReentrantReadWriteUpdateLock;
* @author bennidi
* Date: 2/12/12
*/
public abstract class AbstractConcurrentSet<T> implements Set<T> {
public abstract class AbstractConcurrentSet<T> extends pad<T> implements Set<T> {
private static final AtomicLong id = new AtomicLong();
private final transient long ID = id.getAndIncrement();
// Internal state
protected final transient ReentrantReadWriteUpdateLock lock = new ReentrantReadWriteUpdateLock();
private final transient Map<T, ISetEntry<T>> entries; // maintain a map of entries for O(log n) lookup
public transient Entry<T> head; // reference to the first element
volatile long y0, y1, y2, y4, y5, y6 = 7L;
volatile long z0, z1, z2, z4, z5, z6 = 7L;
protected AbstractConcurrentSet(Map<T, ISetEntry<T>> entries) {
this.entries = entries;
@ -196,45 +204,4 @@ public abstract class AbstractConcurrentSet<T> implements Set<T> {
}
public abstract static class Entry<T> implements ISetEntry<T> {
private Entry<T> next;
private Entry<T> predecessor;
protected Entry(Entry<T> next) {
this.next = next;
next.predecessor = this;
}
protected Entry() {
}
// not thread-safe! must be synchronized in enclosing context
@Override
public void remove() {
if (this.predecessor != null) {
this.predecessor.next = this.next;
if (this.next != null) {
this.next.predecessor = this.predecessor;
}
} else if (this.next != null) {
this.next.predecessor = null;
}
// can not nullify references to help GC since running iterators might not see the entire set
// if this element is their current element
//next = null;
//predecessor = null;
}
@Override
public Entry<T> next() {
return this.next;
}
@Override
public void clear() {
this.next = null;
}
}
}

View File

@ -0,0 +1,59 @@
package dorkbox.util.messagebus.common;
abstract class pad0<T> {
volatile long z0, z1, z2, z4, z5, z6 = 7L;
}
abstract class item1<T> extends pad0<T>{
Entry<T> next;
}
abstract class pad1<T> extends item1<T> {
volatile long z0, z1, z2, z4, z5, z6 = 7L;
}
abstract class item2<T> extends pad1<T> {
Entry<T> prev;
}
abstract class pad2<T> extends item2<T> {
volatile long z0, z1, z2, z4, z5, z6 = 7L;
}
public abstract class Entry<T> extends pad2<T> implements ISetEntry<T> {
protected Entry(Entry<T> next) {
this.next = next;
next.prev = this;
}
protected Entry() {
}
// not thread-safe! must be synchronized in enclosing context
@Override
public void remove() {
if (this.prev != null) {
this.prev.next = this.next;
if (this.next != null) {
this.next.prev = this.prev;
}
} else if (this.next != null) {
this.next.prev = null;
}
// can not nullify references to help GC since running iterators might not see the entire set
// if this element is their current element
//next = null;
//predecessor = null;
}
@Override
public Entry<T> next() {
return this.next;
}
@Override
public void clear() {
this.next = null;
}
}

View File

@ -0,0 +1,5 @@
package dorkbox.util.messagebus.common;
public abstract class item<T> {
public volatile Entry<T> head; // reference to the first element
}

View File

@ -24,13 +24,11 @@ public interface IPublicationErrorHandler {
* print the stack trace if available.
*/
static final class ConsoleLogger implements IPublicationErrorHandler {
/**
* {@inheritDoc}
*/
@Override
public void handleError(final PublicationError error) {
// Printout the error itself
System.out.println(error);

View File

@ -114,7 +114,6 @@ public class Subscription {
while (current != null) {
listener = current.getValue();
current = current.next();
//this.count++;
try {
invocation.invoke(listener, handler, handleIndex, message);

View File

@ -7,7 +7,7 @@ public class PerfTest_MBassador {
public static final int REPETITIONS = 50 * 1000 * 100;
public static final Integer TEST_VALUE = Integer.valueOf(777);
private static final int concurrency = 10;
private static final int concurrency = 1;
public static void main(final String[] args) throws Exception {
System.out.println("reps:" + REPETITIONS + " Concurrency " + concurrency);