diff --git a/src/main/java/dorkbox/util/messagebus/IMessageBus.java b/src/main/java/dorkbox/util/messagebus/IMessageBus.java index 53becbc..fe6e675 100644 --- a/src/main/java/dorkbox/util/messagebus/IMessageBus.java +++ b/src/main/java/dorkbox/util/messagebus/IMessageBus.java @@ -66,6 +66,11 @@ public interface IMessageBus extends PubSubSupport, ErrorHandlingSupport { */ boolean hasPendingMessages(); + /** + * Starts the bus + */ + void start(); + /** * Shutdown the bus such that it will stop delivering asynchronous messages. Executor service and * other internally used threads will be shutdown gracefully. After calling shutdown it is not safe diff --git a/src/main/java/dorkbox/util/messagebus/MultiMBassador.java b/src/main/java/dorkbox/util/messagebus/MultiMBassador.java index 7d0fd8d..8a6f529 100644 --- a/src/main/java/dorkbox/util/messagebus/MultiMBassador.java +++ b/src/main/java/dorkbox/util/messagebus/MultiMBassador.java @@ -27,6 +27,11 @@ import dorkbox.util.messagebus.subscription.Subscription; */ public class MultiMBassador implements IMessageBus { + public static final String ERROR_HANDLER_MSG = + "INFO: No error handler has been configured to handle exceptions during publication.\n" + + "Publication error handlers can be added by bus.addErrorHandler()\n" + + "Falling back to console logger."; + // this handler will receive all errors that occur during message dispatch or message handling private final Collection errorHandlers = new ArrayDeque(); @@ -133,7 +138,6 @@ public class MultiMBassador implements IMessageBus { Thread runner = dispatchThreadFactory.newThread(runnable); this.threads.add(runner); - runner.start(); } } @@ -168,6 +172,19 @@ public class MultiMBassador implements IMessageBus { return this.dispatchQueue.hasPendingMessages(); } + @Override + public void start() { + for (Thread t : this.threads) { + t.start(); + } + synchronized (this.errorHandlers) { + if (this.errorHandlers.isEmpty()) { + this.errorHandlers.add(new IPublicationErrorHandler.ConsoleLogger()); + System.out.println(ERROR_HANDLER_MSG); + } + } + } + @Override public void shutdown() { this.shuttingDown = true; diff --git a/src/main/java/dorkbox/util/messagebus/common/AbstractConcurrentSet.java b/src/main/java/dorkbox/util/messagebus/common/AbstractConcurrentSet.java index 54b0f9b..0125083 100644 --- a/src/main/java/dorkbox/util/messagebus/common/AbstractConcurrentSet.java +++ b/src/main/java/dorkbox/util/messagebus/common/AbstractConcurrentSet.java @@ -8,6 +8,11 @@ import java.util.concurrent.locks.Lock; import com.googlecode.concurentlocks.ReentrantReadWriteUpdateLock; + +abstract class pad extends item { + volatile long z0, z1, z2, z4, z5, z6 = 7L; +} + /** * This data structure is optimized for non-blocking reads even when write operations occur. * Running read iterators will not be affected by add operations since writes always insert at the head of the @@ -17,14 +22,17 @@ import com.googlecode.concurentlocks.ReentrantReadWriteUpdateLock; * @author bennidi * Date: 2/12/12 */ -public abstract class AbstractConcurrentSet implements Set { +public abstract class AbstractConcurrentSet extends pad implements Set { private static final AtomicLong id = new AtomicLong(); private final transient long ID = id.getAndIncrement(); // Internal state protected final transient ReentrantReadWriteUpdateLock lock = new ReentrantReadWriteUpdateLock(); private final transient Map> entries; // maintain a map of entries for O(log n) lookup - public transient Entry head; // reference to the first element + + volatile long y0, y1, y2, y4, y5, y6 = 7L; + volatile long z0, z1, z2, z4, z5, z6 = 7L; + protected AbstractConcurrentSet(Map> entries) { this.entries = entries; @@ -196,45 +204,4 @@ public abstract class AbstractConcurrentSet implements Set { } - public abstract static class Entry implements ISetEntry { - - private Entry next; - - private Entry predecessor; - - protected Entry(Entry next) { - this.next = next; - next.predecessor = this; - } - - protected Entry() { - } - - // not thread-safe! must be synchronized in enclosing context - @Override - public void remove() { - if (this.predecessor != null) { - this.predecessor.next = this.next; - if (this.next != null) { - this.next.predecessor = this.predecessor; - } - } else if (this.next != null) { - this.next.predecessor = null; - } - // can not nullify references to help GC since running iterators might not see the entire set - // if this element is their current element - //next = null; - //predecessor = null; - } - - @Override - public Entry next() { - return this.next; - } - - @Override - public void clear() { - this.next = null; - } - } } diff --git a/src/main/java/dorkbox/util/messagebus/common/Entry.java b/src/main/java/dorkbox/util/messagebus/common/Entry.java new file mode 100644 index 0000000..87359eb --- /dev/null +++ b/src/main/java/dorkbox/util/messagebus/common/Entry.java @@ -0,0 +1,59 @@ +package dorkbox.util.messagebus.common; + +abstract class pad0 { + volatile long z0, z1, z2, z4, z5, z6 = 7L; +} + +abstract class item1 extends pad0{ + Entry next; +} + +abstract class pad1 extends item1 { + volatile long z0, z1, z2, z4, z5, z6 = 7L; +} + +abstract class item2 extends pad1 { + Entry prev; +} + +abstract class pad2 extends item2 { + volatile long z0, z1, z2, z4, z5, z6 = 7L; +} + + +public abstract class Entry extends pad2 implements ISetEntry { + protected Entry(Entry next) { + this.next = next; + next.prev = this; + } + + protected Entry() { + } + + // not thread-safe! must be synchronized in enclosing context + @Override + public void remove() { + if (this.prev != null) { + this.prev.next = this.next; + if (this.next != null) { + this.next.prev = this.prev; + } + } else if (this.next != null) { + this.next.prev = null; + } + // can not nullify references to help GC since running iterators might not see the entire set + // if this element is their current element + //next = null; + //predecessor = null; + } + + @Override + public Entry next() { + return this.next; + } + + @Override + public void clear() { + this.next = null; + } +} \ No newline at end of file diff --git a/src/main/java/dorkbox/util/messagebus/common/item.java b/src/main/java/dorkbox/util/messagebus/common/item.java new file mode 100644 index 0000000..55919b6 --- /dev/null +++ b/src/main/java/dorkbox/util/messagebus/common/item.java @@ -0,0 +1,5 @@ +package dorkbox.util.messagebus.common; + +public abstract class item { + public volatile Entry head; // reference to the first element +} diff --git a/src/main/java/dorkbox/util/messagebus/error/IPublicationErrorHandler.java b/src/main/java/dorkbox/util/messagebus/error/IPublicationErrorHandler.java index 6ddbe39..cc2c912 100644 --- a/src/main/java/dorkbox/util/messagebus/error/IPublicationErrorHandler.java +++ b/src/main/java/dorkbox/util/messagebus/error/IPublicationErrorHandler.java @@ -24,13 +24,11 @@ public interface IPublicationErrorHandler { * print the stack trace if available. */ static final class ConsoleLogger implements IPublicationErrorHandler { - /** * {@inheritDoc} */ @Override public void handleError(final PublicationError error) { - // Printout the error itself System.out.println(error); diff --git a/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java b/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java index 0bb3298..9a7b461 100644 --- a/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java +++ b/src/main/java/dorkbox/util/messagebus/subscription/Subscription.java @@ -114,7 +114,6 @@ public class Subscription { while (current != null) { listener = current.getValue(); current = current.next(); -//this.count++; try { invocation.invoke(listener, handler, handleIndex, message); diff --git a/src/test/java/dorkbox/util/messagebus/PerfTest_MBassador.java b/src/test/java/dorkbox/util/messagebus/PerfTest_MBassador.java index dbdf65e..5ab20df 100644 --- a/src/test/java/dorkbox/util/messagebus/PerfTest_MBassador.java +++ b/src/test/java/dorkbox/util/messagebus/PerfTest_MBassador.java @@ -7,7 +7,7 @@ public class PerfTest_MBassador { public static final int REPETITIONS = 50 * 1000 * 100; public static final Integer TEST_VALUE = Integer.valueOf(777); - private static final int concurrency = 10; + private static final int concurrency = 1; public static void main(final String[] args) throws Exception { System.out.println("reps:" + REPETITIONS + " Concurrency " + concurrency);