cleaned up source, comments. Extracted ErrorHandler. WIP java version adapter

This commit is contained in:
nathan 2015-06-16 11:36:57 +02:00
parent fa0319cdc3
commit 2c2801e54f
115 changed files with 5968 additions and 6031 deletions

View File

@ -1,7 +1,8 @@
MultiMBassador
==============
MessageBus
==========
The "Multi" MBassador fork is a high-performance, low GC, Disruptor-based, method signature parameters > 1 & var-arg distribution.
The MessageBus is a fork from MBassador, and it is a high-performance, very-low GC, custom distribution that supports method signature
parameters > 1 & varity arguments.
*Many* features from the original MBassador have been removed, specifically the ONLY things to remain for a handler are
- rejectSubtypes

View File

@ -1,23 +0,0 @@
#Tests
Asyncbus.shutdown() -> no test coverage
EnvelopedMessageDispatcher -> not tested at all
#Refactorings
#Improvements
Prio 1: Validation of handlers
ERROR:Handler with mismatching parameter types
ERROR:Interfaces + rejectSubtypes
WARN:@Synchronized only for some handlers of a given listener
Prio 2: Lifecycle Callbacks = Implement in MessagePublication (BeforeStart,AfterCompletion)
#Documentation
Add code examples Javadoc of main classes
Describe 1-Thread FIFO scheme with async dispatch
Explain how MBassador can be extended easily using delegation
Refer to Spring integration component
Creating bus hierarchies
How to make sender part of the message publication
How to add global filtering by means of delegation

View File

@ -1,29 +0,0 @@
Note: Please refer to the terminology wiki page before reading the following explanations..
A message bus offers facilities for publishing messages to registered listeners. Messages can be dispatched
synchronously or asynchronously and the dispatch mechanism can by controlled for each message handler and per message publication.
Each message publication is isolated from all other running publications such that it does not interfere with them.
Hence, the bus expects message handlers to be stateless as they may be invoked concurrently if multiple
messages of the same type get published asynchronously.
Messages are published to all listeners that accept the type or super type of the published message. Additionally
a message handler may define filters to narrow the set of messages that it accepts.
Subscribed listeners are available to all pending message publications that have not yet started processing.
Any message listener may only be subscribed once - subsequent subscriptions of an already subscribed message listener
will be silently ignored.
The basic contract of the bus is that it will deliver a specific message exactly once to each of the subscribed message handlers.
Currently, message handlers will be invoked in inverse sequence of subscription but any
client using this bus should not rely on this assumption.
By default, the bus uses weak references to all listeners such that registered listeners do not need to
be explicitly unregistered to be eligible for garbage collection. Dead (garbage collected) listeners are
removed on-the-fly as messages get published. It is possible to enable the use of strong references on the message handler
level.
Unsubscribing a listener means removing all subscribed message handlers of that listener. This remove operation
immediately effects all running publications processes -> A removed listener will under no circumstances receive any message publications.
A listener is considered removed after the unsubscribe(Object) call returned.Any running message publication that has not yet delivered
the message to the recently removed listener will not see the listener after the remove operation completed.

View File

@ -1,143 +0,0 @@
MBassador understands a variety of message handler configurations that will affect how a message
is delivered to a specific listener. There are properties to control the handling of subclasses
of the specified message (the method parameter), the execution order of handlers for the same message type,
filters, delivery modes etc.
<h2>Message handler properties</h2>
<table>
<tr> <td>Property</td> <td>Description</td> <td>Default</td> </tr>
<tr>
<td>delivery</td>
<td>Message handler invocation can be configured to run
<ul>
<li>Synchronously: One handler at a time within a given message publication. Each invocation occurs from the same thread</li>
<li>Asynchronously: Multiple threads are used within a given message publication. Each handler invocation
runs in a separate thread.Note:The number of parallel threads is configurable per instance using the BusConfiguration</li>
</ul>
Note: Use @Synchronized if your handler does not allow multiple, concurrent message publications, i.e.
handlers that are not thread-safe but are used in a multi-threaded environment where asynchronous message publication
is possible.
</td>
<td>Synchronously</td>
</tr>
<tr>
<td>priority</td>
<td>The priority is used to determine the order in which a message is delivered to
different message handlers that consume the same message type. Higher priority means
higher precedence in message delivery.</td>
<td>0</td>
</tr>
<tr>
<td>rejectSubtypes</td>
<td>The primary message type consumed by a message handler is determined by the type of
its parameter.Polymorphism does allow any sub type of that message type to be delivered
to the handler as well, which is the default behaviour of any message handler.
The handler can be configured to not receiving any sub types by specifying thus using this
property.
</td>
<td>false</td>
</tr>
<tr>
<td>enabled</td>
<td>A handler can be explicitly disabled to not take part in message delivery.
</td>
<td>true</td>
</tr>
<tr>
<td>strongReferencess</td>
<td>Whether the bus should use storng references to the listeners instead of weak references
</td>
<td>false</td>
</tr>
<tr>
<td>invocation</td>
<td>Specify a custom implementation for the handler invocation. By default, a generic implementation
that uses reflection will be used. Note: A custom implementation will not be faster than the generic one
since there are heavy optimizations by the JVM using JIT-Compiler and more.
</td>
<td>false</td>
</tr>
</table>
<h2>Message handler definition</h2>
The standard message handler definition looks like the following.It will
receive all messages of type TestEvent or any subtype sequentially.
// every message of type TestEvent or any subtype will be delivered
// to this handler
@Handler
public void handleTestEvent(TestEvent event) {
// do something
}
This handler will receive all messages of type SubTestEvent or any subtype
// handler invocation will occur in a different thread
@Handler(delivery = Invoke.Asynchronously)
public void handleSubTestEvent(SubTestEvent event) {
// do something more expensive here
}
This handler will receive all messages of type SubTestEvent or any subtype,
given that they pass the specified filters. This handler will be invoked before the formerly
defined one, since it specifies a higher priority
// this handler will receive messages of type SubTestEvent
// or any of its sub types that passe the given filter(s)
@Handler(priority = 10,
dispatch = Invoke.Synchronously,
filters = {@Filter(Filters.SpecialEvent.class)})
public void handleFiltered(SubTestEvent event) {
//do something special here
}
@Handler(dispatch = Mode.Synchronous, rejectSubtypes = true)
@Enveloped(messages = {TestEvent.class, TestEvent2.class})
public void handleVariousEvents(MessageEnvelope envelope) {
// the envelope will contain either an instance of TestEvent or TestEvent2
// if rejectSubtypes were set to 'false' (default) also subtypes of TestEvent or TestEvent2 would be allowed
}
<h2>Enveloped message handlers</h2>
Since one parameter (the message) does not offer a great deal of flexibility if different types
of messages should be consumed, there exists the possibility to wrap a message inside an envelope.
An enveloped message handler specifies the message type it consumes by using the @Enveloped annotation
in addition to the @Handler annotation. All configurations of @Handler apply to each of the specified
message types.
@Handler(dispatch = Mode.Synchronous, rejectSubtypes = true)
@Enveloped(messages = {TestEvent.class, TestEvent2.class})
public void handleVariousEvents(MessageEnvelope envelope) {
// the envelope will contain either an instance of TestEvent or TestEvent2
// if rejectSubtypes were set to 'false' (default) also subtypes of TestEvent or TestEvent2 would be allowed
}
<h2>Inheritance</h2>
Message handler inheritance corresponds to inheritance of methods as defined in the Java language itself.
A subclass of any class that defines message handlers will inherit these handler and their configuration.
It is possible to change (override) the configuration simply by overriding the super class' method and
specifying a different configuration. This way, it is also possible to deactivate a message handler of
a super class by setting the "enabled" property to "false" on the overridden method.
If a class overrides a method that is already configured as a message handler
it is still considered a message handler but of course the implementation of the overriding class
will be used.

View File

@ -1,39 +0,0 @@
<h2>Terminology</h2>
To avoid confusion and increase precision of the available documentation a common vocabulary of the most relevant concepts is necessary.
Specifically, the terms "event" and "message" have their own definition within the context of the message bus system and as such require
some clarification.
<h3>Message</h3>
A message is an object used for communication between multiple other objects.Other libraries established the term "event" which essentially
refers to the same idea (an event occurs at some point in the system and is published to other components such that they might react to it).
MBassador uses the term message instead of event since the object sent through it does not necessarily represent an event. It might merely represent
data to be processed, e.g. stored or transformed.
A message can be any object, no restrictions or assumptions are made. A message can be sent by any object that has access to the bus
and is delivered to all registered listeners that consume the type of message.
<h3>Message handler</h3>
A message handler is a method that defines exactly one parameter (the message) and is marked with @Handler. A handler has a message type
that is implicitly defined in the method signature (the parameter type). A message handler will be invoked for each message that has a compatible
type.
<h3>Message listener</h3>
An object that defines one or more message handlers and that has been subscribed at the message bus is referred to as (message) listener.
<h3>Subscription</h3>
Subscription is the process of adding a listener to the message bus, such that it might receive messages. It is used interchangeably with the
term "registration"
<h3>Message publication|Message dispatch</h3>
The process of delivering a message from the sender to all registered listeners is called message publication.
The initial phase of this process, that lasts until the message is actually delivered to the handlers is called message dispatch.
The distinction is necessarily drawn as all method publications share a common scheme but may vary in the way how the dispatching works.
<h3>Event</h3>
The term "event" refers to events that can occur during message publication. Currently there are two types of events:
+ DeadMessage: Whenever a message is published and no listeners exist that defined matching handlers, a DeadMessage event will be created and published
using the common message publication mechanism. Listeners with handlers for DeadEvent can be subscribed to listen for and react to dead
+ FilteredMessage: Since handlers can define filters to narrow the set of messages it consumes, it is possible that a message is not delivered
to any handler. In such a case the FilteredMessage event is published,

View File

@ -61,8 +61,7 @@ import dorkbox.util.messagebus.error.ErrorHandlingSupport;
* @author dorkbox, llc
* Date: 2/2/15
*/
public interface IMessageBus extends PubSubSupport, ErrorHandlingSupport {
public interface IMessageBus extends PubSubSupport {
enum PublishMode {
/**
@ -89,6 +88,12 @@ public interface IMessageBus extends PubSubSupport, ErrorHandlingSupport {
*/
boolean hasPendingMessages();
/**
* @return the error handler responsible for handling publication and subscription errors
*/
ErrorHandlingSupport getErrorHandler();
/**
* Starts the bus
*/

View File

@ -4,9 +4,13 @@ import dorkbox.util.messagebus.common.simpleq.MessageType;
import dorkbox.util.messagebus.common.simpleq.MpmcMultiTransferArrayQueue;
import dorkbox.util.messagebus.common.simpleq.MultiNode;
import dorkbox.util.messagebus.common.thread.NamedThreadFactory;
import dorkbox.util.messagebus.error.IPublicationErrorHandler;
import dorkbox.util.messagebus.error.DefaultErrorHandler;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.error.PublicationError;
import dorkbox.util.messagebus.subscription.Matcher;
import dorkbox.util.messagebus.publication.PublisherAll;
import dorkbox.util.messagebus.publication.PublisherExact;
import dorkbox.util.messagebus.publication.PublisherExactWithSuperTypes;
import dorkbox.util.messagebus.subscription.Publisher;
import dorkbox.util.messagebus.subscription.SubscriptionManager;
import org.jctools.util.Pow2;
@ -20,20 +24,11 @@ import java.util.Collection;
* Date: 2/2/15
*/
public class MessageBus implements IMessageBus {
public static final String ERROR_HANDLER_MSG = "INFO: No error handler has been configured to handle exceptions during publication.\n" +
"Publication error handlers can be added by bus.addErrorHandler()\n" +
"Falling back to console logger.";
// this handler will receive all errors that occur during message dispatch or message handling
private final Collection<IPublicationErrorHandler> errorHandlers = new ArrayDeque<IPublicationErrorHandler>();
private final ErrorHandlingSupport errorHandler;
private final MpmcMultiTransferArrayQueue dispatchQueue;
private final SubscriptionManager subscriptionManager;
private final Collection<Thread> threads;
private final Matcher subscriptionMatcher;
private final Publisher subscriptionPublisher;
/**
* Notifies the consumers during shutdown, that it's on purpose.
@ -59,23 +54,22 @@ public class MessageBus implements IMessageBus {
* @param numberOfThreads how many threads to have for dispatching async messages
*/
public MessageBus(final PublishMode publishMode, int numberOfThreads) {
if (numberOfThreads < 2) {
numberOfThreads = 2; // at LEAST 2 threads
}
numberOfThreads = Pow2.roundToPowerOfTwo(numberOfThreads);
numberOfThreads = Pow2.roundToPowerOfTwo(getMinNumberOfThreads(numberOfThreads));
this.errorHandler = new DefaultErrorHandler();
this.dispatchQueue = new MpmcMultiTransferArrayQueue(numberOfThreads);
this.subscriptionManager = new SubscriptionManager(numberOfThreads);
this.subscriptionManager = new SubscriptionManager(numberOfThreads, errorHandler, true);
switch (publishMode) {
case Exact:
subscriptionMatcher = new MatcherExact();
subscriptionPublisher = new PublisherExact(errorHandler);
break;
case ExactWithSuperTypes:
subscriptionMatcher = new MatcherExactWithSuperTypes();
subscriptionPublisher = new PublisherExactWithSuperTypes(errorHandler);
break;
case ExactWithSuperTypesAndVarArgs:
default:
subscriptionMatcher = new MatcherAll();
subscriptionPublisher = new PublisherAll(errorHandler);
}
this.threads = new ArrayDeque<Thread>(numberOfThreads);
@ -116,28 +110,27 @@ public class MessageBus implements IMessageBus {
if (!MessageBus.this.shuttingDown) {
switch (node.messageType) {
case 1: {
handlePublicationError(
new PublicationError().setMessage("Thread interrupted while processing message")
.setCause(e).setPublishedObject(node.item1));
errorHandler.handlePublicationError(new PublicationError().setMessage(
"Thread interrupted while processing message").setCause(e).setPublishedObject(
node.item1));
break;
}
case 2: {
handlePublicationError(
new PublicationError().setMessage("Thread interrupted while processing message")
.setCause(e).setPublishedObject(node.item1, node.item2));
errorHandler.handlePublicationError(new PublicationError().setMessage(
"Thread interrupted while processing message").setCause(e).setPublishedObject(
node.item1, node.item2));
break;
}
case 3: {
handlePublicationError(
new PublicationError().setMessage("Thread interrupted while processing message")
.setCause(e)
.setPublishedObject(node.item1, node.item2, node.item3));
errorHandler.handlePublicationError(new PublicationError().setMessage(
"Thread interrupted while processing message").setCause(e).setPublishedObject(
node.item1, node.item2, node.item3));
break;
}
default: {
handlePublicationError(
new PublicationError().setMessage("Thread interrupted while processing message")
.setCause(e).setPublishedObject(node.item1));
errorHandler.handlePublicationError(new PublicationError().setMessage(
"Thread interrupted while processing message").setCause(e).setPublishedObject(
node.item1));
}
}
}
@ -151,47 +144,20 @@ public class MessageBus implements IMessageBus {
}
}
@Override
public final void addErrorHandler(IPublicationErrorHandler handler) {
synchronized (this.errorHandlers) {
this.errorHandlers.add(handler);
/**
* Always return at least 2 threads
*/
private static int getMinNumberOfThreads(final int numberOfThreads) {
if (numberOfThreads < 2) {
return 2;
}
}
@Override
public final void handlePublicationError(PublicationError error) {
synchronized (this.errorHandlers) {
for (IPublicationErrorHandler errorHandler : this.errorHandlers) {
errorHandler.handleError(error);
}
}
}
@Override
public void start() {
for (Thread t : this.threads) {
t.start();
}
synchronized (this.errorHandlers) {
if (this.errorHandlers.isEmpty()) {
this.errorHandlers.add(new IPublicationErrorHandler.ConsoleLogger());
System.out.println(ERROR_HANDLER_MSG);
}
}
}
@Override
public void shutdown() {
this.shuttingDown = true;
for (Thread t : this.threads) {
t.interrupt();
}
this.subscriptionManager.shutdown();
return numberOfThreads;
}
@Override
public void subscribe(final Object listener) {
MessageBus.this.subscriptionManager.subscribe(listener);
MessageBus.this.subscriptionManager.subscribe(listener);
}
@Override
@ -199,49 +165,24 @@ public class MessageBus implements IMessageBus {
MessageBus.this.subscriptionManager.unsubscribe(listener);
}
@Override
public final boolean hasPendingMessages() {
return this.dispatchQueue.hasPendingMessages();
}
@Override
public void publish(final Object message) {
try {
subscriptionMatcher.publish(subscriptionManager, message);
} catch (Throwable e) {
handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.").setCause(e)
.setPublishedObject(message));
}
subscriptionPublisher.publish(subscriptionManager, message);
}
@Override
public void publish(final Object message1, final Object message2) {
try {
subscriptionMatcher.publish(subscriptionManager, message1, message2);
} catch (Throwable e) {
handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.").setCause(e)
.setPublishedObject(message1, message2));
}
subscriptionPublisher.publish(subscriptionManager, message1, message2);
}
@Override
public void publish(final Object message1, final Object message2, final Object message3) {
try {
subscriptionMatcher.publish(subscriptionManager, message1, message2, message3);
} catch (Throwable e) {
handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.").setCause(e)
.setPublishedObject(message1, message2, message3));
}
subscriptionPublisher.publish(subscriptionManager, message1, message2, message3);
}
@Override
public void publish(final Object[] messages) {
try {
subscriptionMatcher.publish(subscriptionManager, messages);
} catch (Throwable e) {
handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.").setCause(e)
.setPublishedObject(messages));
}
subscriptionPublisher.publish(subscriptionManager, messages);
}
@Override
@ -250,8 +191,8 @@ public class MessageBus implements IMessageBus {
try {
this.dispatchQueue.transfer(message, MessageType.ONE);
} catch (Exception e) {
handlePublicationError(new PublicationError().setMessage("Error while adding an asynchronous message").setCause(e)
.setPublishedObject(message));
errorHandler.handlePublicationError(new PublicationError().setMessage("Error while adding an asynchronous message")
.setCause(e).setPublishedObject(message));
}
}
else {
@ -265,8 +206,8 @@ public class MessageBus implements IMessageBus {
try {
this.dispatchQueue.transfer(message1, message2);
} catch (Exception e) {
handlePublicationError(new PublicationError().setMessage("Error while adding an asynchronous message").setCause(e)
.setPublishedObject(message1, message2));
errorHandler.handlePublicationError(new PublicationError().setMessage("Error while adding an asynchronous message")
.setCause(e).setPublishedObject(message1, message2));
}
}
else {
@ -280,8 +221,8 @@ public class MessageBus implements IMessageBus {
try {
this.dispatchQueue.transfer(message1, message2, message3);
} catch (Exception e) {
handlePublicationError(new PublicationError().setMessage("Error while adding an asynchronous message").setCause(e)
.setPublishedObject(message1, message2, message3));
errorHandler.handlePublicationError(new PublicationError().setMessage("Error while adding an asynchronous message")
.setCause(e).setPublishedObject(message1, message2, message3));
}
}
else {
@ -295,8 +236,8 @@ public class MessageBus implements IMessageBus {
try {
this.dispatchQueue.transfer(messages, MessageType.ARRAY);
} catch (Exception e) {
handlePublicationError(new PublicationError().setMessage("Error while adding an asynchronous message").setCause(e)
.setPublishedObject(messages));
errorHandler.handlePublicationError(new PublicationError().setMessage("Error while adding an asynchronous message")
.setCause(e).setPublishedObject(messages));
}
}
else {
@ -304,4 +245,31 @@ public class MessageBus implements IMessageBus {
}
}
@Override
public final boolean hasPendingMessages() {
return this.dispatchQueue.hasPendingMessages();
}
@Override
public final ErrorHandlingSupport getErrorHandler() {
return errorHandler;
}
@Override
public void start() {
for (Thread t : this.threads) {
t.start();
}
errorHandler.start();
}
@Override
public void shutdown() {
this.shuttingDown = true;
for (Thread t : this.threads) {
t.interrupt();
}
this.subscriptionManager.shutdown();
}
}

View File

@ -1,43 +1,43 @@
package dorkbox.util.messagebus.annotations;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Mark any method of any class(=listener) as a message handler and configure the handler
* using different properties.
*
* @author bennidi
* Date: 2/8/12
* @author dorkbox
* Date: 2/2/15
*/
@Retention(value = RetentionPolicy.RUNTIME)
@Inherited
@Target(value = {ElementType.METHOD,ElementType.ANNOTATION_TYPE})
public @interface Handler {
/**
* Define whether or not the handler accepts variable arguments it declares in its signature.
* VarArg "acceptance" means that the handler, handle(String... s), will accept a publication
* of ("s"), ("s", "s"), or (String[3]{"s", "s", "s"}). By default, handle(String... s) will
* only handle publications that are exactly an array (String[3]{"s"})
*/
boolean acceptVarargs() default false;
/**
* Define whether or not the handler accepts sub types of the message type it declares in its
* signature.
*/
boolean acceptSubtypes() default true;
/**
* Enable or disable the handler. Disabled handlers do not receive any messages.
* This property is useful for quick changes in configuration and necessary to disable
* handlers that have been declared by a superclass but do not apply to the subclass
*/
boolean enabled() default true;
}
package dorkbox.util.messagebus.annotations;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Mark any method of any class(=listener) as a message handler and configure the handler
* using different properties.
*
* @author bennidi
* Date: 2/8/12
* @author dorkbox
* Date: 2/2/15
*/
@Retention(value = RetentionPolicy.RUNTIME)
@Inherited
@Target(value = {ElementType.METHOD,ElementType.ANNOTATION_TYPE})
public @interface Handler {
/**
* Define whether or not the handler accepts variable arguments it declares in its signature.
* VarArg "acceptance" means that the handler, handle(String... s), will accept a publication
* of ("s"), ("s", "s"), or (String[3]{"s", "s", "s"}). By default, handle(String... s) will
* only handle publications that are exactly an array (String[3]{"s"})
*/
boolean acceptVarargs() default false;
/**
* Define whether or not the handler accepts sub types of the message type it declares in its
* signature.
*/
boolean acceptSubtypes() default true;
/**
* Enable or disable the handler. Disabled handlers do not receive any messages.
* This property is useful for quick changes in configuration and necessary to disable
* handlers that have been declared by a superclass but do not apply to the subclass
*/
boolean enabled() default true;
}

View File

@ -1,10 +1,11 @@
package dorkbox.util.messagebus.common;
import dorkbox.util.messagebus.common.adapter.StampedLock;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.StampedLock;
/**
* This data structure is optimized for non-blocking reads even when write operations occur.

View File

@ -11,7 +11,7 @@ package dorkbox.util.messagebus.common;
*/
public final class DeadMessage {
private Object[] relatedMessages;
private final Object[] relatedMessages;
public DeadMessage(Object message) {

View File

@ -1,5 +1,7 @@
package dorkbox.util.messagebus.common;
import dorkbox.util.messagebus.common.adapter.JavaVersionAdapter;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
@ -8,14 +10,14 @@ import java.util.Set;
/**
* Simple tree structure that is a map that contains a chain of keys to publish to a value.
* <p>
* THREAD SAFE, each level in the tree has it's own write lock, and there a tree-global read lock, to prevent writes
* NOT THREAD SAFE, each call must be protected by a read/write lock of some sort
*
* @author dorkbox, llc
* Date: 2/2/15
*/
public class HashMapTree<KEY, VALUE> {
private Map<KEY, HashMapTree<KEY, VALUE>> children;
private volatile VALUE value;
private VALUE value;
private final int defaultSize;
private final float loadFactor;
@ -30,7 +32,7 @@ public class HashMapTree<KEY, VALUE> {
* can be overridden to provide a custom backing map
*/
protected Map<KEY, HashMapTree<KEY, VALUE>> createChildren(int defaultSize, float loadFactor) {
return new ConcurrentHashMapV8<KEY, HashMapTree<KEY, VALUE>>(defaultSize, loadFactor, 1);
return JavaVersionAdapter.get.concurrentMap(defaultSize, loadFactor, 1);
}
public final VALUE getValue() {
@ -281,11 +283,7 @@ public class HashMapTree<KEY, VALUE> {
// make sure we have a tree for the specified node
if (objectTree == null) {
objectTree = new HashMapTree<KEY, VALUE>(this.defaultSize, this.loadFactor);
HashMapTree<KEY, VALUE> putIfAbsent = this.children.putIfAbsent(key, objectTree);
if (putIfAbsent != null) {
// some other thread beat us.
objectTree = putIfAbsent;
}
this.children.put(key, objectTree);
}
return objectTree;

View File

@ -1,175 +1,175 @@
package dorkbox.util.messagebus.common;
import com.esotericsoftware.reflectasm.MethodAccess;
import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.annotations.Synchronized;
import dorkbox.util.messagebus.utils.ReflectionUtils;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
/**
* Any method in any class annotated with the @Handler annotation represents a message handler. The class that contains
* the handler is called a message listener and more generally, any class containing a message handler in its class hierarchy
* defines such a message listener.
* <p>
* <p>
* Note: When sending messages to a handler that is of type ARRAY (either an object of type array, or a vararg), the JVM cannot
* tell the difference (the message that is being sent), if it is a vararg or array.
* <p>
* <p>
* BECAUSE OF THIS, we always treat the two the same
* <p>
* <p>
*
* @author bennidi
* Date: 11/14/12
* @author dorkbox, llc
* Date: 2/2/15
*/
public class MessageHandler {
// publish all listeners defined by the given class (includes
// listeners defined in super classes)
public static final MessageHandler[] get(final Class<?> target) {
// publish all handlers (this will include all (inherited) methods directly annotated using @Handler)
final Method[] allMethods = ReflectionUtils.getMethods(target);
final int length = allMethods.length;
final ArrayList<MessageHandler> finalMethods = new ArrayList<MessageHandler>(length);
Method method;
for (int i=0;i<length;i++) {
method = allMethods[i];
// retain only those that are at the bottom of their respective class hierarchy (deepest overriding method)
if (!ReflectionUtils.containsOverridingMethod(allMethods, method)) {
// for each handler there will be no overriding method that specifies @Handler annotation
// but an overriding method does inherit the listener configuration of the overwritten method
final Handler handler = ReflectionUtils.getAnnotation(method, Handler.class);
if (handler == null || !handler.enabled()) {
// disabled or invalid listeners are ignored
continue;
}
Method overriddenHandler = ReflectionUtils.getOverridingMethod(method, target);
if (overriddenHandler == null) {
overriddenHandler = method;
}
// if a handler is overwritten it inherits the configuration of its parent method
finalMethods.add(new MessageHandler(overriddenHandler, handler));
}
}
final MessageHandler[] messageHandlers = new MessageHandler[finalMethods.size()];
finalMethods.toArray(messageHandlers);
return messageHandlers;
}
private final MethodAccess handler;
private final int methodIndex;
private final Class<?>[] handledMessages;
private final boolean acceptsSubtypes;
private final Class<?> varArgClass;
private final boolean isSynchronized;
public MessageHandler(Method handler, Handler handlerConfig) {
super();
if (handler == null) {
throw new IllegalArgumentException("The message handler configuration may not be null");
}
Class<?>[] handledMessages = handler.getParameterTypes();
this.handler = MethodAccess.get(handler.getDeclaringClass());
this.methodIndex = this.handler.getIndex(handler.getName(), handledMessages);
this.acceptsSubtypes = handlerConfig.acceptSubtypes();
this.isSynchronized = ReflectionUtils.getAnnotation(handler, Synchronized.class) != null;
this.handledMessages = handledMessages;
if (handledMessages.length == 1 && handledMessages[0].isArray() && handlerConfig.acceptVarargs()) {
this.varArgClass = handledMessages[0].getComponentType();
}
else {
this.varArgClass = null;
}
}
public final boolean isSynchronized() {
return this.isSynchronized;
}
public final MethodAccess getHandler() {
return this.handler;
}
public final int getMethodIndex() {
return this.methodIndex;
}
public final Class<?>[] getHandledMessages() {
return this.handledMessages;
}
public final Class<?> getVarArgClass() {
return this.varArgClass;
}
public final boolean acceptsSubtypes() {
return this.acceptsSubtypes;
}
public final boolean acceptsVarArgs() {
return this.varArgClass != null;
}
@Override
public final int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + (this.acceptsSubtypes ? 1231 : 1237);
result = prime * result + Arrays.hashCode(this.handledMessages);
result = prime * result + (this.handler == null ? 0 : this.handler.hashCode());
result = prime * result + (this.isSynchronized ? 1231 : 1237);
return result;
}
@Override
public final boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
MessageHandler other = (MessageHandler) obj;
if (this.acceptsSubtypes != other.acceptsSubtypes) {
return false;
}
if (!Arrays.equals(this.handledMessages, other.handledMessages)) {
return false;
}
if (this.handler == null) {
if (other.handler != null) {
return false;
}
} else if (!this.handler.equals(other.handler)) {
return false;
}
if (this.isSynchronized != other.isSynchronized) {
return false;
}
return true;
}
}
package dorkbox.util.messagebus.common;
import com.esotericsoftware.reflectasm.MethodAccess;
import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.annotations.Synchronized;
import dorkbox.util.messagebus.utils.ReflectionUtils;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
/**
* Any method in any class annotated with the @Handler annotation represents a message handler. The class that contains
* the handler is called a message listener and more generally, any class containing a message handler in its class hierarchy
* defines such a message listener.
* <p>
* <p>
* Note: When sending messages to a handler that is of type ARRAY (either an object of type array, or a vararg), the JVM cannot
* tell the difference (the message that is being sent), if it is a vararg or array.
* <p>
* <p>
* BECAUSE OF THIS, we always treat the two the same
* <p>
* <p>
*
* @author bennidi
* Date: 11/14/12
* @author dorkbox, llc
* Date: 2/2/15
*/
public class MessageHandler {
// publish all listeners defined by the given class (includes
// listeners defined in super classes)
public static MessageHandler[] get(final Class<?> target) {
// publish all handlers (this will include all (inherited) methods directly annotated using @Handler)
final Method[] allMethods = ReflectionUtils.getMethods(target);
final int length = allMethods.length;
final ArrayList<MessageHandler> finalMethods = new ArrayList<MessageHandler>(length);
Method method;
for (int i=0;i<length;i++) {
method = allMethods[i];
// retain only those that are at the bottom of their respective class hierarchy (deepest overriding method)
if (!ReflectionUtils.containsOverridingMethod(allMethods, method)) {
// for each handler there will be no overriding method that specifies @Handler annotation
// but an overriding method does inherit the listener configuration of the overwritten method
final Handler handler = ReflectionUtils.getAnnotation(method, Handler.class);
if (handler == null || !handler.enabled()) {
// disabled or invalid listeners are ignored
continue;
}
Method overriddenHandler = ReflectionUtils.getOverridingMethod(method, target);
if (overriddenHandler == null) {
overriddenHandler = method;
}
// if a handler is overwritten it inherits the configuration of its parent method
finalMethods.add(new MessageHandler(overriddenHandler, handler));
}
}
final MessageHandler[] messageHandlers = new MessageHandler[finalMethods.size()];
finalMethods.toArray(messageHandlers);
return messageHandlers;
}
private final MethodAccess handler;
private final int methodIndex;
private final Class<?>[] handledMessages;
private final boolean acceptsSubtypes;
private final Class<?> varArgClass;
private final boolean isSynchronized;
public MessageHandler(Method handler, Handler handlerConfig) {
super();
if (handler == null) {
throw new IllegalArgumentException("The message handler configuration may not be null");
}
Class<?>[] handledMessages = handler.getParameterTypes();
this.handler = MethodAccess.get(handler.getDeclaringClass());
this.methodIndex = this.handler.getIndex(handler.getName(), handledMessages);
this.acceptsSubtypes = handlerConfig.acceptSubtypes();
this.isSynchronized = ReflectionUtils.getAnnotation(handler, Synchronized.class) != null;
this.handledMessages = handledMessages;
if (handledMessages.length == 1 && handledMessages[0].isArray() && handlerConfig.acceptVarargs()) {
this.varArgClass = handledMessages[0].getComponentType();
}
else {
this.varArgClass = null;
}
}
public final boolean isSynchronized() {
return this.isSynchronized;
}
public final MethodAccess getHandler() {
return this.handler;
}
public final int getMethodIndex() {
return this.methodIndex;
}
public final Class<?>[] getHandledMessages() {
return this.handledMessages;
}
public final Class<?> getVarArgClass() {
return this.varArgClass;
}
public final boolean acceptsSubtypes() {
return this.acceptsSubtypes;
}
public final boolean acceptsVarArgs() {
return this.varArgClass != null;
}
@Override
public final int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + (this.acceptsSubtypes ? 1231 : 1237);
result = prime * result + Arrays.hashCode(this.handledMessages);
result = prime * result + (this.handler == null ? 0 : this.handler.hashCode());
result = prime * result + (this.isSynchronized ? 1231 : 1237);
return result;
}
@Override
public final boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
MessageHandler other = (MessageHandler) obj;
if (this.acceptsSubtypes != other.acceptsSubtypes) {
return false;
}
if (!Arrays.equals(this.handledMessages, other.handledMessages)) {
return false;
}
if (this.handler == null) {
if (other.handler != null) {
return false;
}
} else if (!this.handler.equals(other.handler)) {
return false;
}
if (this.isSynchronized != other.isSynchronized) {
return false;
}
return true;
}
}

View File

@ -1,5 +1,7 @@
package dorkbox.util.messagebus.common;
import dorkbox.util.messagebus.common.adapter.JavaVersionAdapter;
/**
* This implementation uses strong references to the elements, uses an IdentityHashMap
* <p/>
@ -11,11 +13,11 @@ public class StrongConcurrentSetV8<T> extends StrongConcurrentSet<T> {
public StrongConcurrentSetV8(int size, float loadFactor) {
// 1 for the stripe size, because that is the max concurrency with our concurrent set (since it uses R/W locks)
super(new ConcurrentHashMapV8<T, ISetEntry<T>>(size, loadFactor, 16));
super(JavaVersionAdapter.get.<T, ISetEntry<T>>concurrentMap(size, loadFactor, 16));
}
public StrongConcurrentSetV8(int size, float loadFactor, int stripeSize) {
// 1 for the stripe size, because that is the max concurrency with our concurrent set (since it uses R/W locks)
super(new ConcurrentHashMapV8<T, ISetEntry<T>>(size, loadFactor, stripeSize));
super(JavaVersionAdapter.get.<T, ISetEntry<T>>concurrentMap(size, loadFactor, stripeSize));
}
}

View File

@ -1,9 +1,10 @@
package dorkbox.util.messagebus.common;
import dorkbox.util.messagebus.common.adapter.StampedLock;
import java.lang.ref.WeakReference;
import java.util.Iterator;
import java.util.WeakHashMap;
import java.util.concurrent.locks.StampedLock;
/**
* This implementation uses weak references to the elements. Iterators automatically perform cleanups of

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,14 @@
package dorkbox.util.messagebus.common.adapter;
import java.util.concurrent.ConcurrentMap;
/**
*
*/
public class Java7Adapter extends JavaVersionAdapter {
@Override
public final <K, V> ConcurrentMap<K, V> concurrentMap(final int size, final float loadFactor, final int stripeSize) {
return new ConcurrentHashMapV8<K, V>(size, loadFactor, stripeSize);
}
}

View File

@ -0,0 +1,11 @@
package dorkbox.util.messagebus.common.adapter;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class Java8Adapter extends JavaVersionAdapter {
@Override
public final <K, V> ConcurrentMap<K, V> concurrentMap(final int size, final float loadFactor, final int stripeSize) {
return new ConcurrentHashMap<K, V>(size, loadFactor, stripeSize);
}
}

View File

@ -0,0 +1,29 @@
package dorkbox.util.messagebus.common.adapter;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public abstract class JavaVersionAdapter {
static {
// get = new Java7Adapter();
get = new Java8Adapter();
}
public static JavaVersionAdapter get;
public abstract <K, V> ConcurrentMap<K, V> concurrentMap(final int size, final float loadFactor, final int stripeSize);
public <K, V> Map<K, V> hashMap(final int size, final float loadFactor) {
return new ConcurrentHashMap<K, V>(size, loadFactor, 1);
}
}

View File

@ -4,7 +4,7 @@
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package dorkbox.util.messagebus.common.thread;
package dorkbox.util.messagebus.common.adapter;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
@ -1523,4 +1523,4 @@ public class StampedLock implements java.io.Serializable {
e.getCause());
}
}
}
}

View File

@ -1,8 +1,11 @@
package dorkbox.util.messagebus.common.simpleq;
public class MessageType {
public final class MessageType {
public static final int ONE = 1;
public static final int TWO = 2;
public static final int THREE = 3;
public static final int ARRAY = 4;
private MessageType() {
}
}

View File

@ -6,15 +6,10 @@
package dorkbox.util.messagebus.common.thread;
import java.util.AbstractQueue;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
import org.jctools.util.UnsafeAccess;
import java.util.*;
/**
* An unbounded thread-safe {@linkplain Queue queue} based on linked nodes.
* This queue orders elements FIFO (first-in-first-out).
@ -876,4 +871,4 @@ public class ConcurrentLinkedQueue2<E> extends CLQItem1<E>
throw new Error(e);
}
}
}
}

View File

@ -1,9 +1,10 @@
package dorkbox.util.messagebus.common.thread;
import dorkbox.util.messagebus.common.ConcurrentHashMapV8;
import dorkbox.util.messagebus.common.adapter.JavaVersionAdapter;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
@ -30,7 +31,7 @@ public class ConcurrentSet<T> extends ConcurrentLinkedQueue2<T> {
public ConcurrentSet(int size, float loadFactor, int stripeSize) {
super();
this.entries = new ConcurrentHashMapV8<T, Node<T>>(size, loadFactor, 32);
this.entries = JavaVersionAdapter.get.concurrentMap(size, loadFactor, 32);
}
@Override
@ -204,7 +205,7 @@ public class ConcurrentSet<T> extends ConcurrentLinkedQueue2<T> {
T value = l.item;
if (value != null) {
ConcurrentMap<T, Node<T>> entries2 = ConcurrentSet.this.entries;
Map<T, Node<T>> entries2 = ConcurrentSet.this.entries;
while (entries2.get(value) == ConcurrentSet.this.IN_PROGRESS_MARKER) {
; // data race
}

View File

@ -35,7 +35,7 @@ public interface IHandlerInvocation {
*
* @param listener The listener that will receive the message. This can be a reference to a method object
* from the java reflection api or any other wrapper that can be used to invoke the handler
* @param message The message to be delivered to the handler. This can be any object compatible with the object
* @param message1 The message to be delivered to the handler. This can be any object compatible with the object
* type that the handler consumes
* @param handler The handler (method) that will be called via reflection
*/
@ -46,7 +46,7 @@ public interface IHandlerInvocation {
*
* @param listener The listener that will receive the message. This can be a reference to a method object
* from the java reflection api or any other wrapper that can be used to invoke the handler
* @param message The message to be delivered to the handler. This can be any object compatible with the object
* @param message1 The message to be delivered to the handler. This can be any object compatible with the object
* type that the handler consumes
* @param handler The handler (method) that will be called via reflection
*/
@ -57,7 +57,7 @@ public interface IHandlerInvocation {
*
* @param listener The listener that will receive the message. This can be a reference to a method object
* from the java reflection api or any other wrapper that can be used to invoke the handler
* @param message The message to be delivered to the handler. This can be any object compatible with the object
* @param messages The message to be delivered to the handler. This can be any object compatible with the object
* type that the handler consumes
* @param handler The handler (method) that will be called via reflection
*/

View File

@ -0,0 +1,56 @@
package dorkbox.util.messagebus.error;
import java.util.ArrayDeque;
import java.util.Collection;
/**
*
*/
public class DefaultErrorHandler implements ErrorHandlingSupport {
private static final String ERROR_HANDLER_MSG = "INFO: No error handler has been configured to handle exceptions during publication.\n" +
"Publication error handlers can be added by bus.getErrorHandler().addErrorHandler()\n" +
"Falling back to console logger.";
// this handler will receive all errors that occur during message dispatch or message handling
private final Collection<IPublicationErrorHandler> errorHandlers = new ArrayDeque<IPublicationErrorHandler>();
public DefaultErrorHandler() {
}
@Override
public final void addErrorHandler(IPublicationErrorHandler handler) {
synchronized (this.errorHandlers) {
this.errorHandlers.add(handler);
}
}
@Override
public final void handlePublicationError(PublicationError error) {
synchronized (this.errorHandlers) {
for (IPublicationErrorHandler errorHandler : this.errorHandlers) {
errorHandler.handleError(error);
}
}
}
@Override
public void handleError(final String error, final Class<?> listenerClass) {
synchronized (this.errorHandlers) {
for (IPublicationErrorHandler errorHandler : this.errorHandlers) {
errorHandler.handleError(error, listenerClass);
}
}
}
@Override
public void start() {
synchronized (this.errorHandlers) {
if (this.errorHandlers.isEmpty()) {
this.errorHandlers.add(new IPublicationErrorHandler.ConsoleLogger());
System.out.println(ERROR_HANDLER_MSG);
}
}
}
}

View File

@ -16,4 +16,8 @@ public interface ErrorHandlingSupport {
void addErrorHandler(IPublicationErrorHandler errorHandler);
void handlePublicationError(PublicationError error);
void handleError(String error, Class<?> listenerClass);
void start();
}

View File

@ -5,7 +5,7 @@ package dorkbox.util.messagebus.error;
* error occurs during message publication.
* A handler might fail with an exception, not be accessible because of the presence
* of a security manager or other reasons might lead to failures during the message publication process.
* <p/>
* <p>
*
* @author bennidi
* Date: 2/22/12
@ -19,11 +19,20 @@ public interface IPublicationErrorHandler {
*/
void handleError(PublicationError error);
/**
* Handle the given publication error.
*
* @param error The PublicationError to handle.
* @param listenerClass
*/
void handleError(String error, final Class<?> listenerClass);
/**
* The default error handler will simply log to standard out and
* print the stack trace if available.
*/
static final class ConsoleLogger implements IPublicationErrorHandler {
final class ConsoleLogger implements IPublicationErrorHandler {
/**
* {@inheritDoc}
*/
@ -37,5 +46,14 @@ public interface IPublicationErrorHandler {
error.getCause().printStackTrace();
}
}
/**
* {@inheritDoc}
*/
@Override
public void handleError(final String error, final Class<?> listenerClass) {
// Printout the error itself
System.out.println(new StringBuilder().append(error).append(": ").append(listenerClass.getSimpleName()).toString());
}
}
}

View File

@ -0,0 +1,317 @@
package dorkbox.util.messagebus.publication;
import dorkbox.util.messagebus.common.DeadMessage;
import dorkbox.util.messagebus.common.adapter.StampedLock;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.error.PublicationError;
import dorkbox.util.messagebus.subscription.Publisher;
import dorkbox.util.messagebus.subscription.Subscription;
import dorkbox.util.messagebus.subscription.SubscriptionManager;
import dorkbox.util.messagebus.utils.VarArgUtils;
import java.lang.reflect.Array;
public class PublisherAll implements Publisher {
private final ErrorHandlingSupport errorHandler;
public PublisherAll(final ErrorHandlingSupport errorHandler) {
this.errorHandler = errorHandler;
}
@Override
public void publish(final SubscriptionManager subscriptionManager, final Object message1) {
try {
final Class<?> messageClass = message1.getClass();
final boolean isArray = messageClass.isArray();
final StampedLock lock = subscriptionManager.getLock();
final Subscription[] subscriptions = subscriptionManager.getSubscriptionsExactAndSuper_NoLock(messageClass); // can return null
boolean hasSubs = false;
// Run subscriptions
if (subscriptions != null) {
hasSubs = true;
Subscription sub;
for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i];
sub.publish(message1);
}
}
// publish to var arg, only if not already an array (because that would be unnecessary)
if (subscriptionManager.canPublishVarArg() && !isArray) {
final VarArgUtils varArgUtils = subscriptionManager.getVarArgUtils();
long stamp = lock.readLock();
final Subscription[] varArgSubs = varArgUtils.getVarArgSubscriptions(messageClass); // CAN NOT RETURN NULL
lock.unlockRead(stamp);
Subscription sub;
int length = varArgSubs.length;
Object[] asArray = null;
if (length > 1) {
hasSubs = true;
asArray = (Object[]) Array.newInstance(messageClass, 1);
asArray[0] = message1;
for (int i = 0; i < length; i++) {
sub = varArgSubs[i];
sub.publish(asArray);
}
}
// now publish array based superClasses (but only if those ALSO accept vararg)
stamp = lock.readLock();
final Subscription[] varArgSuperSubs = varArgUtils.getVarArgSuperSubscriptions(messageClass); // CAN NOT RETURN NULL
lock.unlockRead(stamp);
length = varArgSuperSubs.length;
if (length > 1) {
hasSubs = true;
if (asArray == null) {
asArray = (Object[]) Array.newInstance(messageClass, 1);
asArray[0] = message1;
}
for (int i = 0; i < length; i++) {
sub = varArgSuperSubs[i];
sub.publish(asArray);
}
}
}
// only get here if there were no other subscriptions
// Dead Event must EXACTLY MATCH (no subclasses)
if (!hasSubs) {
final Subscription[] deadSubscriptions = subscriptionManager.getSubscriptionsExact(DeadMessage.class);
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1);
Subscription sub;
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(deadMessage);
}
}
}
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.").setCause(e)
.setPublishedObject(message1));
}
}
@Override
public void publish(final SubscriptionManager subscriptionManager, final Object message1, final Object message2) {
try {
final Class<?> messageClass1 = message1.getClass();
final Class<?> messageClass2 = message2.getClass();
final StampedLock lock = subscriptionManager.getLock();
long stamp = lock.readLock();
final Subscription[] subscriptions = subscriptionManager.getSubscriptionsExactAndSuper_NoLock(messageClass1,
messageClass2); // can return null
lock.unlockRead(stamp);
boolean hasSubs = false;
// Run subscriptions
if (subscriptions != null) {
hasSubs = true;
Subscription sub;
for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i];
sub.publish(message1, message2);
}
}
// publish to var arg, only if not already an array AND we are all of the same type
if (subscriptionManager.canPublishVarArg() && !messageClass1.isArray() && !messageClass2.isArray()) {
final VarArgUtils varArgUtils = subscriptionManager.getVarArgUtils();
// vararg can ONLY work if all types are the same
if (messageClass1 == messageClass2) {
stamp = lock.readLock();
final Subscription[] varArgSubs = varArgUtils.getVarArgSubscriptions(messageClass1); // can NOT return null
lock.unlockRead(stamp);
final int length = varArgSubs.length;
if (length > 0) {
hasSubs = true;
Object[] asArray = (Object[]) Array.newInstance(messageClass1, 2);
asArray[0] = message1;
asArray[1] = message2;
Subscription sub;
for (int i = 0; i < length; i++) {
sub = varArgSubs[i];
sub.publish(asArray);
}
}
}
// now publish array based superClasses (but only if those ALSO accept vararg)
stamp = lock.readLock();
final Subscription[] varArgSuperSubs = varArgUtils.getVarArgSuperSubscriptions(messageClass1,
messageClass2); // CAN NOT RETURN NULL
lock.unlockRead(stamp);
final int length = varArgSuperSubs.length;
if (length > 0) {
hasSubs = true;
Class<?> arrayType;
Object[] asArray;
Subscription sub;
for (int i = 0; i < length; i++) {
sub = varArgSuperSubs[i];
arrayType = sub.getHandler().getVarArgClass();
asArray = (Object[]) Array.newInstance(arrayType, 2);
asArray[0] = message1;
asArray[1] = message2;
sub.publish(asArray);
}
}
}
if (!hasSubs) {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subscriptionManager.getSubscriptionsExact(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2);
Subscription sub;
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(deadMessage);
}
}
}
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.").setCause(e)
.setPublishedObject(message1, message2));
}
}
@Override
public void publish(final SubscriptionManager subscriptionManager, final Object message1, final Object message2,
final Object message3) {
try {
final Class<?> messageClass1 = message1.getClass();
final Class<?> messageClass2 = message2.getClass();
final Class<?> messageClass3 = message3.getClass();
final StampedLock lock = subscriptionManager.getLock();
long stamp = lock.readLock();
final Subscription[] subs = subscriptionManager.getSubscriptionsExactAndSuper_NoLock(messageClass1, messageClass2,
messageClass3); // can return null
lock.unlockRead(stamp);
boolean hasSubs = false;
// Run subscriptions
if (subs != null) {
hasSubs = true;
Subscription sub;
for (int i = 0; i < subs.length; i++) {
sub = subs[i];
sub.publish(message1, message2, message3);
}
}
// publish to var arg, only if not already an array AND we are all of the same type
if (subscriptionManager.canPublishVarArg() && !messageClass1.isArray() && !messageClass2.isArray() &&
!messageClass3.isArray()) {
final VarArgUtils varArgUtils = subscriptionManager.getVarArgUtils();
// vararg can ONLY work if all types are the same
if (messageClass1 == messageClass2 && messageClass1 == messageClass3) {
stamp = lock.readLock();
final Subscription[] varArgSubs = varArgUtils.getVarArgSubscriptions(messageClass1); // can NOT return null
lock.unlockRead(stamp);
final int length = varArgSubs.length;
if (length > 0) {
hasSubs = true;
Object[] asArray = (Object[]) Array.newInstance(messageClass1, 3);
asArray[0] = message1;
asArray[1] = message2;
asArray[2] = message3;
Subscription sub;
for (int i = 0; i < length; i++) {
sub = varArgSubs[i];
sub.publish(asArray);
}
}
}
// now publish array based superClasses (but only if those ALSO accept vararg)
stamp = lock.readLock();
final Subscription[] varArgSuperSubs = varArgUtils.getVarArgSuperSubscriptions(messageClass1, messageClass2,
messageClass3); // CAN NOT RETURN NULL
lock.unlockRead(stamp);
final int length = varArgSuperSubs.length;
if (length > 0) {
hasSubs = true;
Class<?> arrayType;
Object[] asArray;
Subscription sub;
for (int i = 0; i < length; i++) {
sub = varArgSuperSubs[i];
arrayType = sub.getHandler().getVarArgClass();
asArray = (Object[]) Array.newInstance(arrayType, 3);
asArray[0] = message1;
asArray[1] = message2;
asArray[2] = message3;
sub.publish(asArray);
}
}
}
if (!hasSubs) {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subscriptionManager.getSubscriptionsExact(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2, message3);
Subscription sub;
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(deadMessage);
}
}
}
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.").setCause(e)
.setPublishedObject(message1, message2, message3));
}
}
@Override
public void publish(final SubscriptionManager subscriptionManager, final Object[] messages) {
publish(subscriptionManager, (Object) messages);
}
}

View File

@ -0,0 +1,128 @@
package dorkbox.util.messagebus.publication;
import dorkbox.util.messagebus.common.DeadMessage;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.error.PublicationError;
import dorkbox.util.messagebus.subscription.Publisher;
import dorkbox.util.messagebus.subscription.Subscription;
import dorkbox.util.messagebus.subscription.SubscriptionManager;
public class PublisherExact implements Publisher {
private final ErrorHandlingSupport errorHandler;
public PublisherExact(final ErrorHandlingSupport errorHandler) {
this.errorHandler = errorHandler;
}
@Override
public void publish(final SubscriptionManager subscriptionManager, final Object message1) {
try {
final Class<?> messageClass = message1.getClass();
final Subscription[] subscriptions = subscriptionManager.getSubscriptionsExact(messageClass); // can return null
// Run subscriptions
if (subscriptions != null) {
Subscription sub;
for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i];
sub.publish(message1);
}
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subscriptionManager.getSubscriptionsExact(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1);
Subscription sub;
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(deadMessage);
}
}
}
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.").setCause(e)
.setPublishedObject(message1));
}
}
@Override
public void publish(final SubscriptionManager subscriptionManager, final Object message1, final Object message2) {
try {
final Class<?> messageClass1 = message1.getClass();
final Class<?> messageClass2 = message2.getClass();
final Subscription[] subscriptions = subscriptionManager.getSubscriptionsExact(messageClass1, messageClass2); // can return null
// Run subscriptions
if (subscriptions != null) {
Subscription sub;
for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i];
sub.publish(message1, message2);
}
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subscriptionManager.getSubscriptionsExact(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2);
Subscription sub;
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(deadMessage);
}
}
}
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.").setCause(e)
.setPublishedObject(message1, message2));
}
}
@Override
public void publish(final SubscriptionManager subscriptionManager, final Object message1, final Object message2,
final Object message3) {
try {
final Class<?> messageClass1 = message1.getClass();
final Class<?> messageClass2 = message2.getClass();
final Class<?> messageClass3 = message3.getClass();
final Subscription[] subscriptions = subscriptionManager.getSubscriptionsExact(messageClass1, messageClass2,
messageClass3); // can return null
// Run subscriptions
if (subscriptions != null) {
Subscription sub;
for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i];
sub.publish(message1, message2, message3);
}
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subscriptionManager.getSubscriptionsExact(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2, message3);
Subscription sub;
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(deadMessage);
}
}
}
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.").setCause(e)
.setPublishedObject(message1, message2, message3));
}
}
@Override
public void publish(final SubscriptionManager subscriptionManager, final Object[] messages) {
publish(subscriptionManager, (Object) messages);
}
}

View File

@ -0,0 +1,129 @@
package dorkbox.util.messagebus.publication;
import dorkbox.util.messagebus.common.DeadMessage;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.error.PublicationError;
import dorkbox.util.messagebus.subscription.Publisher;
import dorkbox.util.messagebus.subscription.Subscription;
import dorkbox.util.messagebus.subscription.SubscriptionManager;
public class PublisherExactWithSuperTypes implements Publisher {
private final ErrorHandlingSupport errorHandler;
public PublisherExactWithSuperTypes(final ErrorHandlingSupport errorHandler) {
this.errorHandler = errorHandler;
}
@Override
public void publish(final SubscriptionManager subscriptionManager, final Object message1) {
try {
final Class<?> messageClass = message1.getClass();
final Subscription[] subscriptions = subscriptionManager.getSubscriptionsExactAndSuper(messageClass); // can return null
// Run subscriptions
if (subscriptions != null) {
Subscription sub;
for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i];
sub.publish(message1);
}
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subscriptionManager.getSubscriptionsExact(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1);
Subscription sub;
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(deadMessage);
}
}
}
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.").setCause(e)
.setPublishedObject(message1));
}
}
@Override
public void publish(final SubscriptionManager subscriptionManager, final Object message1, final Object message2) {
try {
final Class<?> messageClass1 = message1.getClass();
final Class<?> messageClass2 = message2.getClass();
final Subscription[] subscriptions = subscriptionManager.getSubscriptionsExactAndSuper(messageClass1,
messageClass2); // can return null
// Run subscriptions
if (subscriptions != null) {
Subscription sub;
for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i];
sub.publish(message1, message2);
}
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subscriptionManager.getSubscriptionsExact(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2);
Subscription sub;
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(deadMessage);
}
}
}
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.").setCause(e)
.setPublishedObject(message1, message2));
}
}
@Override
public void publish(final SubscriptionManager subscriptionManager, final Object message1, final Object message2,
final Object message3) {
try {
final Class<?> messageClass1 = message1.getClass();
final Class<?> messageClass2 = message2.getClass();
final Class<?> messageClass3 = message3.getClass();
final Subscription[] subscriptions = subscriptionManager.getSubscriptionsExactAndSuper(messageClass1, messageClass2,
messageClass3); // can return null
// Run subscriptions
if (subscriptions != null) {
Subscription sub;
for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i];
sub.publish(message1, message2, message3);
}
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = subscriptionManager.getSubscriptionsExact(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2, message3);
Subscription sub;
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(deadMessage);
}
}
}
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.").setCause(e)
.setPublishedObject(message1, message2, message3));
}
}
@Override
public void publish(final SubscriptionManager subscriptionManager, final Object[] messages) {
publish(subscriptionManager, (Object) messages);
}
}

View File

@ -1,11 +1,11 @@
package dorkbox.util.messagebus.subscription;
public interface Matcher {
void publish(SubscriptionManager subscriptionManager, Object message1) throws Throwable;
public interface Publisher {
void publish(SubscriptionManager subscriptionManager, Object message1);
void publish(SubscriptionManager subscriptionManager, Object message1, Object message2) throws Throwable;
void publish(SubscriptionManager subscriptionManager, Object message1, Object message2);
void publish(SubscriptionManager subscriptionManager, Object message1, Object message2, Object message3) throws Throwable;
void publish(SubscriptionManager subscriptionManager, Object message1, Object message2, Object message3);
void publish(SubscriptionManager subscriptionManager, Object[] messages) throws Throwable;
void publish(SubscriptionManager subscriptionManager, Object[] messages);
}

View File

@ -7,6 +7,7 @@ import dorkbox.util.messagebus.common.StrongConcurrentSetV8;
import dorkbox.util.messagebus.dispatch.IHandlerInvocation;
import dorkbox.util.messagebus.dispatch.ReflectiveHandlerInvocation;
import dorkbox.util.messagebus.dispatch.SynchronizedHandlerInvocation;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import java.util.ArrayList;
import java.util.Collection;
@ -19,10 +20,10 @@ import java.util.concurrent.atomic.AtomicInteger;
* A subscription is a thread-safe container that manages exactly one message handler of all registered
* message listeners of the same class, i.e. all subscribed instances (excluding subclasses) of a SingleMessageHandler.class
* will be referenced in the subscription created for SingleMessageHandler.class.
*
* <p>
* There will be as many unique subscription objects per message listener class as there are message handlers
* defined in the message listeners class hierarchy.
*
* <p>
* The subscription provides functionality for message publication by means of delegation to the respective
* message dispatcher.
*
@ -88,7 +89,7 @@ public final class Subscription {
Iterator<Object> iterator;
Object listener;
for (iterator = this.listeners.iterator(); iterator.hasNext();) {
for (iterator = this.listeners.iterator(); iterator.hasNext(); ) {
listener = iterator.next();
invocation.invoke(listener, handler, handleIndex, message);
@ -164,9 +165,9 @@ public final class Subscription {
// inside a write lock
// add this subscription to each of the handled types
// to activate this sub for publication
public void registerForPublication(final Map<Class<?>, ArrayList<Subscription>> subsPerMessageSingle,
final HashMapTree<Class<?>, ArrayList<Subscription>> subsPerMessageMulti,
final AtomicBoolean varArgPossibility) {
public void registerMulti(final ErrorHandlingSupport errorHandler, final Class<?> listenerClass, final Map<Class<?>, ArrayList<Subscription>> subsPerMessageSingle,
final HashMapTree<Class<?>, ArrayList<Subscription>> subsPerMessageMulti,
final AtomicBoolean varArgPossibility) {
final Class<?>[] messageHandlerTypes = handlerMetadata.getHandledMessages();
final int size = messageHandlerTypes.length;
@ -174,6 +175,10 @@ public final class Subscription {
Class<?> type0 = messageHandlerTypes[0];
switch (size) {
case 0: {
errorHandler.handleError("Error while trying to subscribe class", listenerClass);
return;
}
case 1: {
ArrayList<Subscription> subs = subsPerMessageSingle.get(type0);
if (subs == null) {
@ -224,4 +229,39 @@ public final class Subscription {
}
}
}
// inside a write lock
// add this subscription to each of the handled types
// to activate this sub for publication
public void registerFirst(final ErrorHandlingSupport errorHandler, final Class<?> listenerClass, final Map<Class<?>, ArrayList<Subscription>> subsPerMessageSingle,
final AtomicBoolean varArgPossibility) {
final Class<?>[] messageHandlerTypes = handlerMetadata.getHandledMessages();
final int size = messageHandlerTypes.length;
Class<?> type0 = messageHandlerTypes[0];
switch (size) {
case 0: {
errorHandler.handleError("Error while trying to subscribe class", listenerClass);
return;
}
default: {
ArrayList<Subscription> subs = subsPerMessageSingle.get(type0);
if (subs == null) {
subs = new ArrayList<Subscription>();
// is this handler able to accept var args?
if (handlerMetadata.getVarArgClass() != null) {
varArgPossibility.lazySet(true);
}
subsPerMessageSingle.put(type0, subs);
}
subs.add(this);
return;
}
}
}
}

View File

@ -0,0 +1,475 @@
package dorkbox.util.messagebus.subscription;
import dorkbox.util.messagebus.common.HashMapTree;
import dorkbox.util.messagebus.common.MessageHandler;
import dorkbox.util.messagebus.common.adapter.JavaVersionAdapter;
import dorkbox.util.messagebus.common.adapter.StampedLock;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import dorkbox.util.messagebus.utils.ClassUtils;
import dorkbox.util.messagebus.utils.SubscriptionUtils;
import dorkbox.util.messagebus.utils.VarArgUtils;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* The subscription managers responsibility is to consistently handle and synchronize the message listener subscription process.
* It provides fast lookup of existing subscriptions when another instance of an already known
* listener is subscribed and takes care of creating new set of subscriptions for any unknown class that defines
* message handlers.
* <p>
* <p>
* Subscribe/Unsubscribe, while it is possible for them to be 100% concurrent (in relation to listeners per subscription),
* getting an accurate reflection of the number of subscriptions, or guaranteeing a "HAPPENS-BEFORE" relationship really
* complicates this, so it has been modified for subscribe/unsubscibe to be mutually exclusive.
* <p>
* Given these restrictions and complexity, it is much easier to create a MPSC blocking queue, and have a single thread
* manage sub/unsub.
*
* @author dorkbox, llc
* Date: 2/2/15
*/
public final class SubscriptionManager {
private static final float LOAD_FACTOR = 0.8F;
// remember already processed classes that do not contain any message handlers
private final Map<Class<?>, Boolean> nonListeners;
// shortcut publication if we know there is no possibility of varArg (ie: a method that has an array as arguments)
private final AtomicBoolean varArgPossibility = new AtomicBoolean(false);
// all subscriptions per message type. We perpetually KEEP the types, as this lowers the amount of locking required
// this is the primary list for dispatching a specific message
// write access is synchronized and happens only when a listener of a specific class is registered the first time
private final Map<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageSingle;
private final HashMapTree<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageMulti;
// all subscriptions per messageHandler type
// this map provides fast access for subscribing and unsubscribing
// write access is synchronized and happens very infrequently
// once a collection of subscriptions is stored it does not change
private final Map<Class<?>, Subscription[]> subscriptionsPerListener;
private final ClassUtils classUtils;
private final SubscriptionUtils subUtils;
private final VarArgUtils varArgUtils;
private final StampedLock lock = new StampedLock();
private final int numberOfThreads;
private final Subscriber subscriber;
public SubscriptionManager(int numberOfThreads, final ErrorHandlingSupport errorHandler, boolean isMultiMode) {
this.numberOfThreads = numberOfThreads;
// modified ONLY during SUB/UNSUB
{
this.nonListeners = JavaVersionAdapter.get.concurrentMap(4, LOAD_FACTOR, numberOfThreads);
this.subscriptionsPerMessageSingle = JavaVersionAdapter.get.concurrentMap(32, LOAD_FACTOR, 1);
this.subscriptionsPerMessageMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>(4, LOAD_FACTOR);
// only used during SUB/UNSUB
this.subscriptionsPerListener = JavaVersionAdapter.get.concurrentMap(32, LOAD_FACTOR, 1);
}
classUtils = new ClassUtils(LOAD_FACTOR);
this.subUtils = new SubscriptionUtils(classUtils, this.subscriptionsPerMessageSingle, this.subscriptionsPerMessageMulti,
LOAD_FACTOR, numberOfThreads);
// var arg subscriptions keep track of which subscriptions can handle varArgs. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but improves performance of handlers
this.varArgUtils = new VarArgUtils(classUtils, this.subscriptionsPerMessageSingle, LOAD_FACTOR, numberOfThreads);
if (isMultiMode) {
subscriber = new Subscriber() {
@Override
public void register(final Class<?> listenerClass, final Subscription subscription,
final Map<Class<?>, ArrayList<Subscription>> subsPerMessageSingle,
final HashMapTree<Class<?>, ArrayList<Subscription>> subsPerMessageMulti,
final AtomicBoolean varArgPossibility) {
// now add this subscription to each of the handled types
subscription.registerMulti(errorHandler, listenerClass, subsPerMessageSingle, subsPerMessageMulti, varArgPossibility);
}
};
}
else {
subscriber = new Subscriber() {
@Override
public void register(final Class<?> listenerClass, final Subscription subscription,
final Map<Class<?>, ArrayList<Subscription>> subsPerMessageSingle,
final HashMapTree<Class<?>, ArrayList<Subscription>> subsPerMessageMulti,
final AtomicBoolean varArgPossibility) {
// only register based on the FIRST parameter
subscription.registerFirst(errorHandler, listenerClass, subsPerMessageSingle, varArgPossibility);
}
};
}
}
public void shutdown() {
this.nonListeners.clear();
this.subscriptionsPerMessageSingle.clear();
this.subscriptionsPerMessageMulti.clear();
this.subscriptionsPerListener.clear();
clearConcurrentCollections();
this.classUtils.clear();
}
public void subscribe(final Object listener) {
if (listener == null) {
return;
}
final Class<?> listenerClass = listener.getClass();
if (this.nonListeners.containsKey(listenerClass)) {
// early reject of known classes that do not define message handlers
return;
}
// these are concurrent collections
clearConcurrentCollections();
Subscription[] subscriptions = getListenerSubs(listenerClass);
// the subscriptions from the map were null, so create them
if (subscriptions == null) {
// it is important to note that this section CAN be repeated, however the write lock is gained before
// anything 'permanent' is saved. This is so the time spent inside the writelock is minimized.
final MessageHandler[] messageHandlers = MessageHandler.get(listenerClass);
final int handlersSize = messageHandlers.length;
// remember the class as non listening class if no handlers are found
if (handlersSize == 0) {
this.nonListeners.put(listenerClass, Boolean.TRUE);
return;
}
final Map<Class<?>, ArrayList<Subscription>> subsPerMessageSingle = this.subscriptionsPerMessageSingle;
final HashMapTree<Class<?>, ArrayList<Subscription>> subsPerMessageMulti = this.subscriptionsPerMessageMulti;
final Subscription[] subsPerListener = new Subscription[handlersSize];
// create the subscription
MessageHandler messageHandler;
Subscription subscription;
for (int i = 0; i < handlersSize; i++) {
messageHandler = messageHandlers[i];
// create the subscription
subscription = new Subscription(messageHandler, LOAD_FACTOR, numberOfThreads);
subscription.subscribe(listener);
subsPerListener[i] = subscription; // activates this sub for sub/unsub
}
final Map<Class<?>, Subscription[]> subsPerListenerMap = this.subscriptionsPerListener;
final AtomicBoolean varArgPossibility = this.varArgPossibility;
// now write lock for the least expensive part. This is a deferred "double checked lock", but is necessary because
// of the huge number of reads compared to writes.
final StampedLock lock = this.lock;
final long stamp = lock.writeLock();
subscriptions = subsPerListenerMap.get(listenerClass);
// it was still null, so we actually have to create the rest of the subs
if (subscriptions == null) {
for (int i = 0; i < handlersSize; i++) {
subscription = subsPerListener[i];
// activate this subscription for publication
subscriber.register(listenerClass, subscription, subsPerMessageSingle, subsPerMessageMulti, varArgPossibility);
}
subsPerListenerMap.put(listenerClass, subsPerListener);
lock.unlockWrite(stamp);
return;
}
else {
// continue to subscription
lock.unlockWrite(stamp);
}
}
// subscriptions already exist and must only be updated
// only publish here if our single-check was OK, or our double-check was OK
Subscription subscription;
for (int i = 0; i < subscriptions.length; i++) {
subscription = subscriptions[i];
subscription.subscribe(listener);
}
}
public void unsubscribe(final Object listener) {
if (listener == null) {
return;
}
final Class<?> listenerClass = listener.getClass();
if (this.nonListeners.containsKey(listenerClass)) {
// early reject of known classes that do not define message handlers
return;
}
// these are concurrent collections
clearConcurrentCollections();
final Subscription[] subscriptions = getListenerSubs(listenerClass);
if (subscriptions != null) {
Subscription subscription;
for (int i = 0; i < subscriptions.length; i++) {
subscription = subscriptions[i];
subscription.unsubscribe(listener);
}
}
}
private void clearConcurrentCollections() {
this.subUtils.clear();
this.varArgUtils.clear();
}
private Subscription[] getListenerSubs(final Class<?> listenerClass) {
final StampedLock lock = this.lock;
final long stamp = lock.readLock();
final Subscription[] subscriptions = this.subscriptionsPerListener.get(listenerClass);
lock.unlockRead(stamp);
return subscriptions;
}
// can return null
public Subscription[] getSubscriptionsExact(final Class<?> messageClass) {
final StampedLock lock = this.lock;
final long stamp = lock.readLock();
final ArrayList<Subscription> collection = this.subscriptionsPerMessageSingle.get(messageClass);
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
lock.unlockRead(stamp);
return subscriptions;
}
lock.unlockRead(stamp);
return null;
}
// can return null
public Subscription[] getSubscriptionsExact(final Class<?> messageClass1, final Class<?> messageClass2) {
final StampedLock lock = this.lock;
final long stamp = lock.readLock();
final ArrayList<Subscription> collection = this.subscriptionsPerMessageMulti.get(messageClass1, messageClass2);
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
lock.unlockRead(stamp);
return subscriptions;
}
lock.unlockRead(stamp);
return null;
}
// can return null
public Subscription[] getSubscriptionsExact(final Class<?> messageClass1, final Class<?> messageClass2, final Class<?> messageClass3) {
final StampedLock lock = this.lock;
final long stamp = lock.readLock();
final ArrayList<Subscription> collection = this.subscriptionsPerMessageMulti.get(messageClass1, messageClass2, messageClass3);
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
lock.unlockRead(stamp);
return subscriptions;
}
lock.unlockRead(stamp);
return null;
}
// can return null
// public because it is also used by unit tests
public Subscription[] getSubscriptionsExactAndSuper(final Class<?> messageClass) {
final StampedLock lock = this.lock;
final long stamp = lock.readLock();
final Subscription[] subscriptions = getSubscriptionsExactAndSuper_NoLock(messageClass);
lock.unlockRead(stamp);
return subscriptions;
}
// can return null
public Subscription[] getSubscriptionsExactAndSuper(final Class<?> messageClass1, final Class<?> messageClass2) {
final StampedLock lock = this.lock;
final long stamp = lock.readLock();
final Subscription[] subscriptions = getSubscriptionsExactAndSuper_NoLock(messageClass1, messageClass2);
lock.unlockRead(stamp);
return subscriptions;
}
// can return null
public Subscription[] getSubscriptionsExactAndSuper(final Class<?> messageClass1, final Class<?> messageClass2,
final Class<?> messageClass3) {
final StampedLock lock = this.lock;
final long stamp = lock.readLock();
final Subscription[] subscriptions = getSubscriptionsExactAndSuper_NoLock(messageClass1, messageClass2, messageClass3);
lock.unlockRead(stamp);
return subscriptions;
}
// can return null
public Subscription[] getSubscriptionsExactAndSuper_NoLock(final Class<?> messageClass) {
ArrayList<Subscription> collection = this.subscriptionsPerMessageSingle.get(messageClass); // can return null
// now publish superClasses
final ArrayList<Subscription> superSubscriptions = this.subUtils.getSuperSubscriptions(messageClass); // NOT return null
if (collection != null) {
collection = new ArrayList<Subscription>(collection);
if (!superSubscriptions.isEmpty()) {
collection.addAll(superSubscriptions);
}
}
else if (!superSubscriptions.isEmpty()) {
collection = superSubscriptions;
}
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
return subscriptions;
}
else {
return null;
}
}
// can return null
public Subscription[] getSubscriptionsExactAndSuper_NoLock(final Class<?> messageClass1, final Class<?> messageClass2) {
ArrayList<Subscription> collection = this.subscriptionsPerMessageMulti.get(messageClass1, messageClass2); // can return null
// now publish superClasses
final ArrayList<Subscription> superSubs = this.subUtils.getSuperSubscriptions(messageClass1, messageClass2); // NOT return null
if (collection != null) {
collection = new ArrayList<Subscription>(collection);
if (!superSubs.isEmpty()) {
collection.addAll(superSubs);
}
}
else if (!superSubs.isEmpty()) {
collection = superSubs;
}
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
return subscriptions;
}
else {
return null;
}
}
// can return null
public Subscription[] getSubscriptionsExactAndSuper_NoLock(final Class<?> messageClass1, final Class<?> messageClass2,
final Class<?> messageClass3) {
ArrayList<Subscription> collection = this.subscriptionsPerMessageMulti.get(messageClass1, messageClass2,
messageClass3); // can return null
// now publish superClasses
final ArrayList<Subscription> superSubs = this.subUtils.getSuperSubscriptions(messageClass1, messageClass2,
messageClass3); // NOT return null
if (collection != null) {
collection = new ArrayList<Subscription>(collection);
if (!superSubs.isEmpty()) {
collection.addAll(superSubs);
}
}
else if (!superSubs.isEmpty()) {
collection = superSubs;
}
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
return subscriptions;
}
else {
return null;
}
}
public boolean canPublishVarArg() {
return varArgPossibility.get();
}
public VarArgUtils getVarArgUtils() {
return varArgUtils;
}
public StampedLock getLock() {
return lock;
}
interface Subscriber {
void register(final Class<?> listenerClass, Subscription subscription,
Map<Class<?>, ArrayList<Subscription>> subsPerMessageSingle,
HashMapTree<Class<?>, ArrayList<Subscription>> subsPerMessageMulti, AtomicBoolean varArgPossibility);
}
// public static final Comparator<Subscription> SubscriptionByPriorityDesc = new Comparator<Subscription>() {
// @Override
// public int compare(Subscription o1, Subscription o2) {
//// int byPriority = ((Integer)o2.getPriority()).compareTo(o1.getPriority());
//// return byPriority == 0 ? o2.id.compareTo(o1.id) : byPriority;
// if (o2.ID > o1.ID) {
// return 1;
// } else if (o2.ID < o1.ID) {
// return -1;
// } else {
// return 0;
// }
// }
// };
}

View File

@ -1,6 +1,6 @@
package dorkbox.util.messagebus.utils;
import dorkbox.util.messagebus.common.ConcurrentHashMapV8;
import dorkbox.util.messagebus.common.adapter.JavaVersionAdapter;
import java.lang.reflect.Array;
import java.util.ArrayList;
@ -13,8 +13,8 @@ public final class ClassUtils {
private final Map<Class<?>, Class<?>[]> superClassesCache;
public ClassUtils(final float loadFactor) {
this.arrayCache = new ConcurrentHashMapV8<Class<?>, Class<?>>(32, loadFactor, 1);
this.superClassesCache = new ConcurrentHashMapV8<Class<?>, Class<?>[]>(32, loadFactor, 1);
this.arrayCache = JavaVersionAdapter.get.concurrentMap(32, loadFactor, 1);
this.superClassesCache = JavaVersionAdapter.get.concurrentMap(32, loadFactor, 1);
}
/**

View File

@ -1,13 +1,13 @@
package dorkbox.util.messagebus.utils;
import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.common.thread.ConcurrentSet;
import java.lang.annotation.Annotation;
import java.lang.reflect.AnnotatedElement;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
/**
* @author bennidi
@ -16,7 +16,10 @@ import java.util.Collection;
* @author dorkbox
* Date: 2/2/15
*/
public class ReflectionUtils {
public final class ReflectionUtils {
private ReflectionUtils() {
}
public static Method[] getMethods(Class<?> target) {
ArrayList<Method> methods = new ArrayList<Method>();
@ -47,10 +50,6 @@ public class ReflectionUtils {
* Traverses the class hierarchy upwards, starting at the given subclass, looking
* for an override of the given methods -> finds the bottom most override of the given
* method if any exists
*
* @param overridingMethod
* @param subclass
* @return
*/
public static Method getOverridingMethod(final Method overridingMethod, final Class<?> subclass) {
Class<?> current = subclass;
@ -94,7 +93,7 @@ public class ReflectionUtils {
}
}
public static final boolean containsOverridingMethod(final Method[] allMethods, final Method methodToCheck) {
public static boolean containsOverridingMethod(final Method[] allMethods, final Method methodToCheck) {
final int length = allMethods.length;
Method method;
@ -138,7 +137,7 @@ public class ReflectionUtils {
}
public static <A extends Annotation> A getAnnotation(AnnotatedElement from, Class<A> annotationType) {
A annotation = getAnnotation(from, annotationType, new ConcurrentSet<AnnotatedElement>(16, .8F, 1));
A annotation = getAnnotation(from, annotationType, new HashSet<AnnotatedElement>(16));
return annotation;
}

View File

@ -1,9 +1,7 @@
package dorkbox.util.messagebus.utils;
import dorkbox.util.messagebus.common.ConcurrentHashMapV8;
import dorkbox.util.messagebus.common.HashMapTree;
import dorkbox.util.messagebus.common.thread.ClassHolder;
import dorkbox.util.messagebus.common.thread.SubscriptionHolder;
import dorkbox.util.messagebus.common.adapter.ConcurrentHashMapV8;
import dorkbox.util.messagebus.subscription.Subscription;
import java.util.ArrayList;
@ -12,8 +10,6 @@ import java.util.Map;
public final class SubscriptionUtils {
private final ClassUtils superClass;
private final ClassHolder classHolderSingle;
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but REALLY improves performance on handlers
// it's faster to create a new one for SUB/UNSUB than it is to shutdown() on the original one
@ -23,8 +19,6 @@ public final class SubscriptionUtils {
private final Map<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageSingle;
private final SubscriptionHolder subHolderSingle;
private final SubscriptionHolder subHolderConcurrent;
private final HashMapTree<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageMulti;
@ -37,15 +31,10 @@ public final class SubscriptionUtils {
this.subscriptionsPerMessageMulti = subscriptionsPerMessageMulti;
this.classHolderSingle = new ClassHolder(loadFactor, stripeSize);
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but improves performance of handlers
this.superClassSubscriptions = new ConcurrentHashMapV8<Class<?>, ArrayList<Subscription>>();
this.superClassSubscriptionsMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>(4, loadFactor);
this.subHolderSingle = new SubscriptionHolder();
this.subHolderConcurrent = new SubscriptionHolder();
}
public void clear() {

View File

@ -1,6 +1,6 @@
package dorkbox.util.messagebus.utils;
import dorkbox.util.messagebus.common.ConcurrentHashMapV8;
import dorkbox.util.messagebus.common.adapter.JavaVersionAdapter;
import dorkbox.util.messagebus.common.HashMapTree;
import dorkbox.util.messagebus.common.MessageHandler;
import dorkbox.util.messagebus.subscription.Subscription;
@ -25,10 +25,10 @@ public final class VarArgUtils {
this.superClassUtils = superClassUtils;
this.subscriptionsPerMessageSingle = subscriptionsPerMessageSingle;
this.varArgSubscriptionsSingle = new ConcurrentHashMapV8<Class<?>, ArrayList<Subscription>>(16, loadFactor, stripeSize);
this.varArgSubscriptionsSingle = JavaVersionAdapter.get.concurrentMap(16, loadFactor, stripeSize);
this.varArgSubscriptionsMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>(4, loadFactor);
this.varArgSuperSubscriptionsSingle = new ConcurrentHashMapV8<Class<?>, ArrayList<Subscription>>(16, loadFactor, stripeSize);
this.varArgSuperSubscriptionsSingle = JavaVersionAdapter.get.concurrentMap(16, loadFactor, stripeSize);
this.varArgSuperSubscriptionsMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>(4, loadFactor);
}
@ -70,6 +70,8 @@ public final class VarArgUtils {
}
local.put(messageClass, varArgSubs);
} else {
varArgSubs = new ArrayList<Subscription>(0);
}
}

View File

@ -1,27 +0,0 @@
package dorkbox.util.messagebus;
import dorkbox.util.messagebus.subscription.Matcher;
import dorkbox.util.messagebus.subscription.SubscriptionManager;
public class MatcherAll implements Matcher {
@Override
public void publish(final SubscriptionManager subscriptionManager, final Object message1) throws Throwable {
subscriptionManager.publishAll(message1);
}
@Override
public void publish(final SubscriptionManager subscriptionManager, final Object message1, final Object message2) throws Throwable {
subscriptionManager.publishAll(message1, message2);
}
@Override
public void publish(final SubscriptionManager subscriptionManager, final Object message1, final Object message2, final Object message3)
throws Throwable {
subscriptionManager.publishAll(message1, message2, message3);
}
@Override
public void publish(final SubscriptionManager subscriptionManager, final Object[] messages) throws Throwable {
subscriptionManager.publishAll(messages);
}
}

View File

@ -1,27 +0,0 @@
package dorkbox.util.messagebus;
import dorkbox.util.messagebus.subscription.Matcher;
import dorkbox.util.messagebus.subscription.SubscriptionManager;
public class MatcherExact implements Matcher {
@Override
public void publish(final SubscriptionManager subscriptionManager, final Object message1) throws Throwable {
subscriptionManager.publishExact(message1);
}
@Override
public void publish(final SubscriptionManager subscriptionManager, final Object message1, final Object message2) throws Throwable {
subscriptionManager.publishExact(message1, message2);
}
@Override
public void publish(final SubscriptionManager subscriptionManager, final Object message1, final Object message2, final Object message3)
throws Throwable {
subscriptionManager.publishExact(message1, message2, message3);
}
@Override
public void publish(final SubscriptionManager subscriptionManager, final Object[] messages) throws Throwable {
subscriptionManager.publishExact(messages);
}
}

View File

@ -1,27 +0,0 @@
package dorkbox.util.messagebus;
import dorkbox.util.messagebus.subscription.Matcher;
import dorkbox.util.messagebus.subscription.SubscriptionManager;
public class MatcherExactWithSuperTypes implements Matcher {
@Override
public void publish(final SubscriptionManager subscriptionManager, final Object message1) throws Throwable {
subscriptionManager.publishExactAndSuper(message1);
}
@Override
public void publish(final SubscriptionManager subscriptionManager, final Object message1, final Object message2) throws Throwable {
subscriptionManager.publishExactAndSuper(message1, message2);
}
@Override
public void publish(final SubscriptionManager subscriptionManager, final Object message1, final Object message2, final Object message3)
throws Throwable {
subscriptionManager.publishExactAndSuper(message1, message2, message3);
}
@Override
public void publish(final SubscriptionManager subscriptionManager, final Object[] messages) throws Throwable {
subscriptionManager.publishExactAndSuper(messages);
}
}

View File

@ -1,22 +0,0 @@
package dorkbox.util.messagebus.common.thread;
public class ClassHolder extends ThreadLocal<ConcurrentSet<Class<?>>> {
private final float loadFactor;
private final int stripeSize;
public ClassHolder(float loadFactor, int stripeSize) {
super();
this.loadFactor = loadFactor;
this.stripeSize = stripeSize;
}
@Override
public ConcurrentSet<Class<?>> initialValue() {
return new ConcurrentSet<Class<?>>(16, this.loadFactor, this.stripeSize);
}
}

View File

@ -1,18 +0,0 @@
package dorkbox.util.messagebus.common.thread;
import java.util.ArrayList;
import dorkbox.util.messagebus.subscription.Subscription;
public class SubscriptionHolder extends ThreadLocal<ArrayList<Subscription>> {
public SubscriptionHolder() {
super();
}
@Override
public ArrayList<Subscription> initialValue() {
return new ArrayList<Subscription>();
}
}

View File

@ -1,879 +0,0 @@
package dorkbox.util.messagebus.subscription;
import dorkbox.util.messagebus.common.ConcurrentHashMapV8;
import dorkbox.util.messagebus.common.DeadMessage;
import dorkbox.util.messagebus.common.HashMapTree;
import dorkbox.util.messagebus.common.MessageHandler;
import dorkbox.util.messagebus.utils.ClassUtils;
import dorkbox.util.messagebus.utils.SubscriptionUtils;
import dorkbox.util.messagebus.utils.VarArgUtils;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.StampedLock;
/**
* The subscription managers responsibility is to consistently handle and synchronize the message listener subscription process.
* It provides fast lookup of existing subscriptions when another instance of an already known
* listener is subscribed and takes care of creating new set of subscriptions for any unknown class that defines
* message handlers.
* <p>
* <p>
* Subscribe/Unsubscribe, while it is possible for them to be 100% concurrent (in relation to listeners per subscription),
* getting an accurate reflection of the number of subscriptions, or guaranteeing a "HAPPENS-BEFORE" relationship really
* complicates this, so it has been modified for subscribe/unsubscibe to be mutually exclusive.
* <p>
* Given these restrictions and complexity, it is much easier to create a MPSC blocking queue, and have a single thread
* manage sub/unsub.
*
* @author dorkbox, llc
* Date: 2/2/15
*/
public final class SubscriptionManager {
private static final float LOAD_FACTOR = 0.8F;
// remember already processed classes that do not contain any message handlers
private final Map<Class<?>, Boolean> nonListeners;
// shortcut publication if we know there is no possibility of varArg (ie: a method that has an array as arguments)
private final AtomicBoolean varArgPossibility = new AtomicBoolean(false);
// all subscriptions per message type. We perpetually KEEP the types, as this lowers the amount of locking required
// this is the primary list for dispatching a specific message
// write access is synchronized and happens only when a listener of a specific class is registered the first time
private final Map<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageSingle;
private final HashMapTree<Class<?>, ArrayList<Subscription>> subscriptionsPerMessageMulti;
// all subscriptions per messageHandler type
// this map provides fast access for subscribing and unsubscribing
// write access is synchronized and happens very infrequently
// once a collection of subscriptions is stored it does not change
private final ConcurrentMap<Class<?>, Subscription[]> subscriptionsPerListener;
private final ClassUtils classUtils;
private final SubscriptionUtils subUtils;
private final VarArgUtils varArgUtils;
private final StampedLock lock = new StampedLock();
private final int numberOfThreads;
public SubscriptionManager(int numberOfThreads) {
this.numberOfThreads = numberOfThreads;
// modified ONLY during SUB/UNSUB
{
this.nonListeners = new ConcurrentHashMapV8<Class<?>, Boolean>(4, LOAD_FACTOR, numberOfThreads);
this.subscriptionsPerMessageSingle = new ConcurrentHashMapV8<Class<?>, ArrayList<Subscription>>(32, LOAD_FACTOR, 1);
this.subscriptionsPerMessageMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>(4, LOAD_FACTOR);
// only used during SUB/UNSUB
this.subscriptionsPerListener = new ConcurrentHashMapV8<Class<?>, Subscription[]>(32, LOAD_FACTOR, 1);
}
classUtils = new ClassUtils(LOAD_FACTOR);
this.subUtils = new SubscriptionUtils(classUtils, this.subscriptionsPerMessageSingle, this.subscriptionsPerMessageMulti,
LOAD_FACTOR, numberOfThreads);
// var arg subscriptions keep track of which subscriptions can handle varArgs. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but improves performance of handlers
this.varArgUtils = new VarArgUtils(classUtils, this.subscriptionsPerMessageSingle, LOAD_FACTOR, numberOfThreads);
}
public void shutdown() {
this.nonListeners.clear();
this.subscriptionsPerMessageSingle.clear();
this.subscriptionsPerMessageMulti.clear();
this.subscriptionsPerListener.clear();
clearConcurrentCollections();
this.classUtils.clear();
}
public void subscribe(final Object listener) {
if (listener == null) {
return;
}
final Class<?> listenerClass = listener.getClass();
if (this.nonListeners.containsKey(listenerClass)) {
// early reject of known classes that do not define message handlers
return;
}
// these are concurrent collections
clearConcurrentCollections();
Subscription[] subscriptions = getListenerSubs(listenerClass);
// the subscriptions from the map were null, so create them
if (subscriptions == null) {
// it is important to note that this section CAN be repeated, however the write lock is gained before
// anything 'permanent' is saved. This is so the time spent inside the writelock is minimized.
final MessageHandler[] messageHandlers = MessageHandler.get(listenerClass);
final int handlersSize = messageHandlers.length;
// remember the class as non listening class if no handlers are found
if (handlersSize == 0) {
this.nonListeners.put(listenerClass, Boolean.TRUE);
return;
}
final Map<Class<?>, ArrayList<Subscription>> subsPerMessageSingle = this.subscriptionsPerMessageSingle;
final HashMapTree<Class<?>, ArrayList<Subscription>> subsPerMessageMulti = this.subscriptionsPerMessageMulti;
final Subscription[] subsPerListener = new Subscription[handlersSize];
// create the subscription
MessageHandler messageHandler;
Subscription subscription;
for (int i = 0; i < handlersSize; i++) {
messageHandler = messageHandlers[i];
// create the subscription
subscription = new Subscription(messageHandler, LOAD_FACTOR, numberOfThreads);
subscription.subscribe(listener);
subsPerListener[i] = subscription; // activates this sub for sub/unsub
}
final ConcurrentMap<Class<?>, Subscription[]> subsPerListenerMap = this.subscriptionsPerListener;
final AtomicBoolean varArgPossibility = this.varArgPossibility;
// now write lock for the least expensive part. This is a deferred "double checked lock", but is necessary because
// of the huge number of reads compared to writes.
final StampedLock lock = this.lock;
final long stamp = lock.writeLock();
subscriptions = subsPerListenerMap.get(listenerClass);
// it was still null, so we actually have to create the rest of the subs
if (subscriptions == null) {
for (int i = 0; i < handlersSize; i++) {
subscription = subsPerListener[i];
// now add this subscription to each of the handled types
// to activate this sub for publication
subscription.registerForPublication(subsPerMessageSingle, subsPerMessageMulti, varArgPossibility);
}
subsPerListenerMap.put(listenerClass, subsPerListener);
lock.unlockWrite(stamp);
return;
}
else {
// continue to subscription
lock.unlockWrite(stamp);
}
}
// subscriptions already exist and must only be updated
// only publish here if our single-check was OK, or our double-check was OK
Subscription subscription;
for (int i = 0; i < subscriptions.length; i++) {
subscription = subscriptions[i];
subscription.subscribe(listener);
}
}
public void unsubscribe(final Object listener) {
if (listener == null) {
return;
}
final Class<?> listenerClass = listener.getClass();
if (this.nonListeners.containsKey(listenerClass)) {
// early reject of known classes that do not define message handlers
return;
}
// these are concurrent collections
clearConcurrentCollections();
final Subscription[] subscriptions = getListenerSubs(listenerClass);
if (subscriptions != null) {
Subscription subscription;
for (int i = 0; i < subscriptions.length; i++) {
subscription = subscriptions[i];
subscription.unsubscribe(listener);
}
}
}
private void clearConcurrentCollections() {
this.subUtils.clear();
this.varArgUtils.clear();
}
private Subscription[] getListenerSubs(final Class<?> listenerClass) {
final StampedLock lock = this.lock;
final long stamp = lock.readLock();
final Subscription[] subscriptions = this.subscriptionsPerListener.get(listenerClass);
lock.unlockRead(stamp);
return subscriptions;
}
// can return null
public Subscription[] getSubscriptionsExact(final Class<?> messageClass) {
final StampedLock lock = this.lock;
final long stamp = lock.readLock();
final ArrayList<Subscription> collection = this.subscriptionsPerMessageSingle.get(messageClass);
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
lock.unlockRead(stamp);
return subscriptions;
}
lock.unlockRead(stamp);
return null;
}
// can return null
public Subscription[] getSubscriptionsExact(final Class<?> messageClass1, final Class<?> messageClass2) {
final StampedLock lock = this.lock;
final long stamp = lock.readLock();
final ArrayList<Subscription> collection = this.subscriptionsPerMessageMulti.get(messageClass1, messageClass2);
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
lock.unlockRead(stamp);
return subscriptions;
}
lock.unlockRead(stamp);
return null;
}
// can return null
public Subscription[] getSubscriptionsExact(final Class<?> messageClass1, final Class<?> messageClass2, final Class<?> messageClass3) {
final StampedLock lock = this.lock;
final long stamp = lock.readLock();
final ArrayList<Subscription> collection = this.subscriptionsPerMessageMulti.get(messageClass1, messageClass2, messageClass3);
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
lock.unlockRead(stamp);
return subscriptions;
}
lock.unlockRead(stamp);
return null;
}
// can return null
// public because it is also used by unit tests
public Subscription[] getSubscriptionsExactAndSuper(final Class<?> messageClass) {
final StampedLock lock = this.lock;
final long stamp = lock.readLock();
final Subscription[] subscriptions = getSubscriptionsExactAndSuper_NoLock(messageClass);
lock.unlockRead(stamp);
return subscriptions;
}
// can return null
private Subscription[] getSubscriptionsExactAndSuper(final Class<?> messageClass1, final Class<?> messageClass2) {
final StampedLock lock = this.lock;
final long stamp = lock.readLock();
final Subscription[] subscriptions = getSubscriptionsExactAndSuper_NoLock(messageClass1, messageClass2);
lock.unlockRead(stamp);
return subscriptions;
}
// can return null
private Subscription[] getSubscriptionsExactAndSuper(final Class<?> messageClass1, final Class<?> messageClass2,
final Class<?> messageClass3) {
final StampedLock lock = this.lock;
final long stamp = lock.readLock();
final Subscription[] subscriptions = getSubscriptionsExactAndSuper_NoLock(messageClass1, messageClass2, messageClass3);
lock.unlockRead(stamp);
return subscriptions;
}
// can return null
private Subscription[] getSubscriptionsExactAndSuper_NoLock(final Class<?> messageClass) {
ArrayList<Subscription> collection = this.subscriptionsPerMessageSingle.get(messageClass); // can return null
// now publish superClasses
final ArrayList<Subscription> superSubscriptions = this.subUtils.getSuperSubscriptions(messageClass); // NOT return null
if (collection != null) {
collection = new ArrayList<Subscription>(collection);
if (!superSubscriptions.isEmpty()) {
collection.addAll(superSubscriptions);
}
}
else if (!superSubscriptions.isEmpty()) {
collection = superSubscriptions;
}
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
return subscriptions;
}
else {
return null;
}
}
// can return null
private Subscription[] getSubscriptionsExactAndSuper_NoLock(final Class<?> messageClass1, final Class<?> messageClass2) {
ArrayList<Subscription> collection = this.subscriptionsPerMessageMulti.get(messageClass1, messageClass2); // can return null
// now publish superClasses
final ArrayList<Subscription> superSubs = this.subUtils.getSuperSubscriptions(messageClass1, messageClass2); // NOT return null
if (collection != null) {
collection = new ArrayList<Subscription>(collection);
if (!superSubs.isEmpty()) {
collection.addAll(superSubs);
}
}
else if (!superSubs.isEmpty()) {
collection = superSubs;
}
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
return subscriptions;
}
else {
return null;
}
}
// can return null
private Subscription[] getSubscriptionsExactAndSuper_NoLock(final Class<?> messageClass1, final Class<?> messageClass2,
final Class<?> messageClass3) {
ArrayList<Subscription> collection = this.subscriptionsPerMessageMulti
.get(messageClass1, messageClass2, messageClass3); // can return null
// now publish superClasses
final ArrayList<Subscription> superSubs = this.subUtils
.getSuperSubscriptions(messageClass1, messageClass2, messageClass3); // NOT return null
if (collection != null) {
collection = new ArrayList<Subscription>(collection);
if (!superSubs.isEmpty()) {
collection.addAll(superSubs);
}
}
else if (!superSubs.isEmpty()) {
collection = superSubs;
}
if (collection != null) {
final Subscription[] subscriptions = new Subscription[collection.size()];
collection.toArray(subscriptions);
return subscriptions;
}
else {
return null;
}
}
public void publishExact(final Object message) throws Throwable {
final Class<?> messageClass = message.getClass();
final Subscription[] subscriptions = getSubscriptionsExact(messageClass); // can return null
// Run subscriptions
if (subscriptions != null) {
Subscription sub;
for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i];
sub.publish(message);
}
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = getSubscriptionsExact(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message);
Subscription sub;
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(deadMessage);
}
}
}
}
public void publishExact(final Object message1, final Object message2) throws Throwable {
final Class<?> messageClass1 = message1.getClass();
final Class<?> messageClass2 = message2.getClass();
final Subscription[] subscriptions = getSubscriptionsExact(messageClass1, messageClass2); // can return null
// Run subscriptions
if (subscriptions != null) {
Subscription sub;
for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i];
sub.publish(message1, message2);
}
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = getSubscriptionsExact(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2);
Subscription sub;
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(deadMessage);
}
}
}
}
public void publishExact(final Object message1, final Object message2, final Object message3) throws Throwable {
final Class<?> messageClass1 = message1.getClass();
final Class<?> messageClass2 = message2.getClass();
final Class<?> messageClass3 = message3.getClass();
final Subscription[] subscriptions = getSubscriptionsExact(messageClass1, messageClass2, messageClass3); // can return null
// Run subscriptions
if (subscriptions != null) {
Subscription sub;
for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i];
sub.publish(message1, message2, message3);
}
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = getSubscriptionsExact(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2, message3);
Subscription sub;
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(deadMessage);
}
}
}
}
public void publishExactAndSuper(final Object message) throws Throwable {
final Class<?> messageClass = message.getClass();
final Subscription[] subscriptions = getSubscriptionsExactAndSuper(messageClass); // can return null
// Run subscriptions
if (subscriptions != null) {
Subscription sub;
for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i];
sub.publish(message);
}
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = getSubscriptionsExact(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message);
Subscription sub;
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(deadMessage);
}
}
}
}
public void publishExactAndSuper(final Object message1, final Object message2) throws Throwable {
final Class<?> messageClass1 = message1.getClass();
final Class<?> messageClass2 = message2.getClass();
final Subscription[] subscriptions = getSubscriptionsExactAndSuper(messageClass1, messageClass2); // can return null
// Run subscriptions
if (subscriptions != null) {
Subscription sub;
for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i];
sub.publish(message1, message2);
}
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = getSubscriptionsExact(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2);
Subscription sub;
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(deadMessage);
}
}
}
}
public void publishExactAndSuper(final Object message1, final Object message2, final Object message3) throws Throwable {
final Class<?> messageClass1 = message1.getClass();
final Class<?> messageClass2 = message2.getClass();
final Class<?> messageClass3 = message3.getClass();
final Subscription[] subscriptions = getSubscriptionsExactAndSuper(messageClass1, messageClass2, messageClass3); // can return null
// Run subscriptions
if (subscriptions != null) {
Subscription sub;
for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i];
sub.publish(message1, message2, message3);
}
}
else {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = getSubscriptionsExact(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2, message3);
Subscription sub;
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(deadMessage);
}
}
}
}
public void publishAll(final Object message) throws Throwable {
final Class<?> messageClass = message.getClass();
final boolean isArray = messageClass.isArray();
final StampedLock lock = this.lock;
long stamp = lock.readLock();
final Subscription[] subscriptions = getSubscriptionsExactAndSuper_NoLock(messageClass); // can return null
lock.unlockRead(stamp);
boolean hasSubs = false;
// Run subscriptions
if (subscriptions != null) {
hasSubs = true;
Subscription sub;
for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i];
sub.publish(message);
}
}
// publish to var arg, only if not already an array (because that would be unnecessary)
if (varArgPossibility.get() && !isArray) {
stamp = lock.readLock();
final Subscription[] varArgSubs = varArgUtils.getVarArgSubscriptions(messageClass); // CAN NOT RETURN NULL
lock.unlockRead(stamp);
Subscription sub;
int length = varArgSubs.length;
Object[] asArray = null;
if (length > 1) {
hasSubs = true;
asArray = (Object[]) Array.newInstance(messageClass, 1);
asArray[0] = message;
for (int i = 0; i < length; i++) {
sub = varArgSubs[i];
sub.publish(asArray);
}
}
// now publish array based superClasses (but only if those ALSO accept vararg)
stamp = lock.readLock();
final Subscription[] varArgSuperSubs = this.varArgUtils.getVarArgSuperSubscriptions(messageClass);// CAN NOT RETURN NULL
lock.unlockRead(stamp);
length = varArgSuperSubs.length;
if (length > 1) {
hasSubs = true;
if (asArray == null) {
asArray = (Object[]) Array.newInstance(messageClass, 1);
asArray[0] = message;
}
for (int i = 0; i < length; i++) {
sub = varArgSuperSubs[i];
sub.publish(asArray);
}
}
}
// only get here if there were no other subscriptions
// Dead Event must EXACTLY MATCH (no subclasses)
if (!hasSubs) {
final Subscription[] deadSubscriptions = getSubscriptionsExact(DeadMessage.class);
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message);
Subscription sub;
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(deadMessage);
}
}
}
}
public void publishAll(final Object message1, final Object message2) throws Throwable {
final Class<?> messageClass1 = message1.getClass();
final Class<?> messageClass2 = message2.getClass();
final StampedLock lock = this.lock;
long stamp = lock.readLock();
final Subscription[] subscriptions = getSubscriptionsExactAndSuper_NoLock(messageClass1, messageClass2); // can return null
lock.unlockRead(stamp);
boolean hasSubs = false;
// Run subscriptions
if (subscriptions != null) {
hasSubs = true;
Subscription sub;
for (int i = 0; i < subscriptions.length; i++) {
sub = subscriptions[i];
sub.publish(message1, message2);
}
}
// publish to var arg, only if not already an array AND we are all of the same type
if (varArgPossibility.get() && !messageClass1.isArray() && !messageClass2.isArray()) {
// vararg can ONLY work if all types are the same
if (messageClass1 == messageClass2) {
stamp = lock.readLock();
final Subscription[] varArgSubs = varArgUtils.getVarArgSubscriptions(messageClass1); // can NOT return null
lock.unlockRead(stamp);
final int length = varArgSubs.length;
if (length > 0) {
hasSubs = true;
Object[] asArray = (Object[]) Array.newInstance(messageClass1, 2);
asArray[0] = message1;
asArray[1] = message2;
Subscription sub;
for (int i = 0; i < length; i++) {
sub = varArgSubs[i];
sub.publish(asArray);
}
}
}
// now publish array based superClasses (but only if those ALSO accept vararg)
stamp = lock.readLock();
final Subscription[] varArgSuperSubs = this.varArgUtils
.getVarArgSuperSubscriptions(messageClass1, messageClass2); // CAN NOT RETURN NULL
lock.unlockRead(stamp);
final int length = varArgSuperSubs.length;
if (length > 0) {
hasSubs = true;
Class<?> arrayType;
Object[] asArray;
Subscription sub;
for (int i = 0; i < length; i++) {
sub = varArgSuperSubs[i];
arrayType = sub.getHandler().getVarArgClass();
asArray = (Object[]) Array.newInstance(arrayType, 2);
asArray[0] = message1;
asArray[1] = message2;
sub.publish(asArray);
}
}
}
if (!hasSubs) {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = getSubscriptionsExact(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2);
Subscription sub;
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(deadMessage);
}
}
}
}
public void publishAll(final Object message1, final Object message2, final Object message3) throws Throwable {
final Class<?> messageClass1 = message1.getClass();
final Class<?> messageClass2 = message2.getClass();
final Class<?> messageClass3 = message3.getClass();
final StampedLock lock = this.lock;
long stamp = lock.readLock();
final Subscription[] subs = getSubscriptionsExactAndSuper_NoLock(messageClass1, messageClass2, messageClass3); // can return null
lock.unlockRead(stamp);
boolean hasSubs = false;
// Run subscriptions
if (subs != null) {
hasSubs = true;
Subscription sub;
for (int i = 0; i < subs.length; i++) {
sub = subs[i];
sub.publish(message1, message2, message3);
}
}
// publish to var arg, only if not already an array AND we are all of the same type
if (varArgPossibility.get() && !messageClass1.isArray() && !messageClass2.isArray() && !messageClass3.isArray()) {
// vararg can ONLY work if all types are the same
if (messageClass1 == messageClass2 && messageClass1 == messageClass3) {
stamp = lock.readLock();
final Subscription[] varArgSubs = varArgUtils.getVarArgSubscriptions(messageClass1); // can NOT return null
lock.unlockRead(stamp);
final int length = varArgSubs.length;
if (length > 0) {
hasSubs = true;
Object[] asArray = (Object[]) Array.newInstance(messageClass1, 3);
asArray[0] = message1;
asArray[1] = message2;
asArray[2] = message3;
Subscription sub;
for (int i = 0; i < length; i++) {
sub = varArgSubs[i];
sub.publish(asArray);
}
}
}
// now publish array based superClasses (but only if those ALSO accept vararg)
stamp = lock.readLock();
final Subscription[] varArgSuperSubs = this.varArgUtils
.getVarArgSuperSubscriptions(messageClass1, messageClass2, messageClass3); // CAN NOT RETURN NULL
lock.unlockRead(stamp);
final int length = varArgSuperSubs.length;
if (length > 0) {
hasSubs = true;
Class<?> arrayType;
Object[] asArray;
Subscription sub;
for (int i = 0; i < length; i++) {
sub = varArgSuperSubs[i];
arrayType = sub.getHandler().getVarArgClass();
asArray = (Object[]) Array.newInstance(arrayType, 3);
asArray[0] = message1;
asArray[1] = message2;
asArray[2] = message3;
sub.publish(asArray);
}
}
}
if (!hasSubs) {
// Dead Event must EXACTLY MATCH (no subclasses)
final Subscription[] deadSubscriptions = getSubscriptionsExact(DeadMessage.class); // can return null
if (deadSubscriptions != null) {
final DeadMessage deadMessage = new DeadMessage(message1, message2, message3);
Subscription sub;
for (int i = 0; i < deadSubscriptions.length; i++) {
sub = deadSubscriptions[i];
sub.publish(deadMessage);
}
}
}
}
// public static final Comparator<Subscription> SubscriptionByPriorityDesc = new Comparator<Subscription>() {
// @Override
// public int compare(Subscription o1, Subscription o2) {
//// int byPriority = ((Integer)o2.getPriority()).compareTo(o1.getPriority());
//// return byPriority == 0 ? o2.id.compareTo(o1.id) : byPriority;
// if (o2.ID > o1.ID) {
// return 1;
// } else if (o2.ID < o1.ID) {
// return -1;
// } else {
// return 0;
// }
// }
// };
}

View File

@ -1,32 +0,0 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
<appender name="console" class="org.apache.log4j.ConsoleAppender">
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d{HH:mm:ss,SSS} %-5p - %m%n"/>
</layout>
</appender>
<category name="org.springframework" additivity="false">
<priority value="info"/>
<appender-ref ref="console"/>
</category>
<category name="de.safetynet" additivity="false">
<priority value="info"/>
<appender-ref ref="console"/>
</category>
<category name="de.safetynet.tracking" additivity="false">
<priority value="debug"/>
<appender-ref ref="console"/>
</category>
<root>
<priority value ="info" />
<appender-ref ref="console"/>
</root>
</log4j:configuration>

View File

@ -98,10 +98,16 @@ public class MBassadorTest extends MessageBusTest {
public void handleError(PublicationError error) {
exceptionCount.incrementAndGet();
}
@Override
public void handleError(final String error, final Class<?> listenerClass) {
// Printout the error itself
System.out.println(new StringBuilder().append(error).append(": ").append(listenerClass.getSimpleName()).toString());
}
};
final MessageBus bus = new MessageBus();
bus.addErrorHandler(ExceptionCounter);
bus.getErrorHandler().addErrorHandler(ExceptionCounter);
bus.start();
ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, ExceptionThrowingListener.class);

View File

@ -2,7 +2,6 @@ package dorkbox.util.messagebus;
import org.junit.Test;
import dorkbox.util.messagebus.IMessageBus;
import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.common.MessageBusTest;

View File

@ -1,7 +1,6 @@
package dorkbox.util.messagebus;
import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.common.ConcurrentHashMapV8;
import dorkbox.util.messagebus.common.MessageHandler;
import dorkbox.util.messagebus.common.StrongConcurrentSet;
import dorkbox.util.messagebus.common.StrongConcurrentSetV8;
@ -11,6 +10,7 @@ import dorkbox.util.messagebus.subscription.Subscription;
import java.lang.ref.WeakReference;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedTransferQueue;
@ -55,7 +55,7 @@ public class PerfTest_Collections {
bench(size, new LinkedList<Subscription>());
bench(size, new StrongConcurrentSetV8<Subscription>(size*2, LOAD_FACTOR));
bench(size, new StrongConcurrentSet<Subscription>(size*2, LOAD_FACTOR));
bench(size, Collections.<Subscription>newSetFromMap(new ConcurrentHashMapV8<Subscription, Boolean>(size*2, LOAD_FACTOR, 1)));
bench(size, Collections.newSetFromMap(new ConcurrentHashMap<Subscription, Boolean>(size*2, LOAD_FACTOR, 1)));
bench(size, new HashSet<Subscription>());
// bench(size, new ConcurrentSkipListSet<Subscription>()); // needs comparable
}

View File

@ -26,11 +26,17 @@ public class PerformanceTest {
error.getCause().printStackTrace();
Assert.fail();
}
@Override
public void handleError(final String error, final Class<?> listenerClass) {
// Printout the error itself
System.out.println(new StringBuilder().append(error).append(": ").append(listenerClass.getSimpleName()).toString());
}
};
public static void main(String[] args) throws Exception {
final MessageBus bus = new MessageBus(CONCURRENCY_LEVEL);
bus.addErrorHandler(TestFailingHandler);
bus.getErrorHandler().addErrorHandler(TestFailingHandler);
Listener listener1 = new Listener();

View File

@ -1,6 +1,7 @@
package dorkbox.util.messagebus;
import dorkbox.util.messagebus.common.*;
import dorkbox.util.messagebus.error.DefaultErrorHandler;
import dorkbox.util.messagebus.listeners.*;
import dorkbox.util.messagebus.messages.*;
import dorkbox.util.messagebus.subscription.SubscriptionManager;
@ -117,7 +118,7 @@ public class SubscriptionManagerTest extends AssertSupport {
@Test public void testOverloadedMessageHandlers() {
ListenerFactory listeners = listeners(Overloading.ListenerBase.class, Overloading.ListenerSub.class);
SubscriptionManager subscriptionManager = new SubscriptionManager(1);
SubscriptionManager subscriptionManager = new SubscriptionManager(1, new DefaultErrorHandler(), true);
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), 1);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners).listener(Overloading.ListenerBase.class)
@ -136,7 +137,7 @@ public class SubscriptionManagerTest extends AssertSupport {
}
private void runTestWith(final ListenerFactory listeners, final SubscriptionValidator validator) {
final SubscriptionManager subscriptionManager = new SubscriptionManager(1);
final SubscriptionManager subscriptionManager = new SubscriptionManager(1, new DefaultErrorHandler(), true);
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), 1);

View File

@ -80,10 +80,16 @@ public class SyncBusTest extends MessageBusTest {
public void handleError(PublicationError error) {
exceptionCount.incrementAndGet();
}
@Override
public void handleError(final String error, final Class<?> listenerClass) {
// Printout the error itself
System.out.println(new StringBuilder().append(error).append(": ").append(listenerClass.getSimpleName()).toString());
}
};
final IMessageBus bus = new MessageBus();
bus.addErrorHandler(ExceptionCounter);
bus.getErrorHandler().addErrorHandler(ExceptionCounter);
bus.start();
ListenerFactory listeners = new ListenerFactory()
.create(InstancesPerListener, ExceptionThrowingListener.class);

View File

@ -1,7 +1,5 @@
package dorkbox.util.messagebus.common;
import java.lang.ref.WeakReference;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -10,6 +8,8 @@ import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.ref.WeakReference;
/**
* @author bennidi
*/

View File

@ -29,6 +29,11 @@ public abstract class MessageBusTest extends AssertSupport {
error.getCause().printStackTrace();
Assert.fail();
}
@Override
public void handleError(final String error, final Class<?> listenerClass) {
System.err.println(error + " " + listenerClass);
}
};
@Before
@ -41,14 +46,14 @@ public abstract class MessageBusTest extends AssertSupport {
public MessageBus createBus() {
MessageBus bus = new MessageBus();
bus.addErrorHandler(TestFailingHandler);
bus.getErrorHandler().addErrorHandler(TestFailingHandler);
bus.start();
return bus;
}
public MessageBus createBus(ListenerFactory listeners) {
MessageBus bus = new MessageBus();
bus.addErrorHandler(TestFailingHandler);
bus.getErrorHandler().addErrorHandler(TestFailingHandler);
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(bus, listeners), ConcurrentUnits);
bus.start();
return bus;

Some files were not shown because too many files have changed in this diff Show More