Renamed to MessageBus. Code cleanup

This commit is contained in:
nathan 2015-06-08 00:29:10 +02:00
parent 2247b5f24c
commit 31e9fe84b3
6 changed files with 37 additions and 31 deletions

View File

@ -1,9 +1,9 @@
package dorkbox.util.messagebus; package dorkbox.util.messagebus;
import dorkbox.util.messagebus.common.NamedThreadFactory;
import dorkbox.util.messagebus.common.simpleq.MessageType; import dorkbox.util.messagebus.common.simpleq.MessageType;
import dorkbox.util.messagebus.common.simpleq.MpmcMultiTransferArrayQueue; import dorkbox.util.messagebus.common.simpleq.MpmcMultiTransferArrayQueue;
import dorkbox.util.messagebus.common.simpleq.MultiNode; import dorkbox.util.messagebus.common.simpleq.MultiNode;
import dorkbox.util.messagebus.common.thread.NamedThreadFactory;
import dorkbox.util.messagebus.error.IPublicationErrorHandler; import dorkbox.util.messagebus.error.IPublicationErrorHandler;
import dorkbox.util.messagebus.error.PublicationError; import dorkbox.util.messagebus.error.PublicationError;
import dorkbox.util.messagebus.subscription.Matcher; import dorkbox.util.messagebus.subscription.Matcher;
@ -19,7 +19,7 @@ import java.util.Collection;
* @author dorkbox, llc * @author dorkbox, llc
* Date: 2/2/15 * Date: 2/2/15
*/ */
public class MultiMBassador implements IMessageBus { public class MessageBus implements IMessageBus {
public static final String ERROR_HANDLER_MSG = "INFO: No error handler has been configured to handle exceptions during publication.\n" + public static final String ERROR_HANDLER_MSG = "INFO: No error handler has been configured to handle exceptions during publication.\n" +
"Publication error handlers can be added by bus.addErrorHandler()\n" + "Publication error handlers can be added by bus.addErrorHandler()\n" +
"Falling back to console logger."; "Falling back to console logger.";
@ -43,22 +43,22 @@ public class MultiMBassador implements IMessageBus {
/** /**
* By default, will permit subTypes and VarArg matching, and will use half of CPUs available for dispatching async messages * By default, will permit subTypes and VarArg matching, and will use half of CPUs available for dispatching async messages
*/ */
public MultiMBassador() { public MessageBus() {
this(Runtime.getRuntime().availableProcessors() / 2); this(Runtime.getRuntime().availableProcessors() / 2);
} }
/** /**
* @param numberOfThreads how many threads to have for dispatching async messages * @param numberOfThreads how many threads to have for dispatching async messages
*/ */
public MultiMBassador(int numberOfThreads) { public MessageBus(int numberOfThreads) {
this(PublishMode.ExactWithSuperTypes, numberOfThreads); this(PublishMode.ExactWithSuperTypes, numberOfThreads);
} }
/** /**
* @param publishMode Specifies which publishMode to operate the publication of messages. * @param publishMode Specifies which publishMode to operate the publication of messages.
* @param numberOfThreads how many threads to have for dispatching async messages * @param numberOfThreads how many threads to have for dispatching async messages
*/ */
public MultiMBassador(final PublishMode publishMode, int numberOfThreads) { public MessageBus(final PublishMode publishMode, int numberOfThreads) {
if (numberOfThreads < 2) { if (numberOfThreads < 2) {
numberOfThreads = 2; // at LEAST 2 threads numberOfThreads = 2; // at LEAST 2 threads
} }
@ -86,10 +86,10 @@ public class MultiMBassador implements IMessageBus {
Runnable runnable = new Runnable() { Runnable runnable = new Runnable() {
@Override @Override
public void run() { public void run() {
MpmcMultiTransferArrayQueue IN_QUEUE = MultiMBassador.this.dispatchQueue; MpmcMultiTransferArrayQueue IN_QUEUE = MessageBus.this.dispatchQueue;
MultiNode node = new MultiNode(); MultiNode node = new MultiNode();
while (!MultiMBassador.this.shuttingDown) { while (!MessageBus.this.shuttingDown) {
try { try {
//noinspection InfiniteLoopStatement //noinspection InfiniteLoopStatement
while (true) { while (true) {
@ -113,31 +113,31 @@ public class MultiMBassador implements IMessageBus {
} }
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
if (!MultiMBassador.this.shuttingDown) { if (!MessageBus.this.shuttingDown) {
switch (node.messageType) { switch (node.messageType) {
case 1: { case 1: {
handlePublicationError( handlePublicationError(
new PublicationError().setMessage("Thread interrupted while processing message") new PublicationError().setMessage("Thread interrupted while processing message")
.setCause(e).setPublishedObject(node.item1)); .setCause(e).setPublishedObject(node.item1));
break; break;
} }
case 2: { case 2: {
handlePublicationError( handlePublicationError(
new PublicationError().setMessage("Thread interrupted while processing message") new PublicationError().setMessage("Thread interrupted while processing message")
.setCause(e).setPublishedObject(node.item1, node.item2)); .setCause(e).setPublishedObject(node.item1, node.item2));
break; break;
} }
case 3: { case 3: {
handlePublicationError( handlePublicationError(
new PublicationError().setMessage("Thread interrupted while processing message") new PublicationError().setMessage("Thread interrupted while processing message")
.setCause(e) .setCause(e)
.setPublishedObject(node.item1, node.item2, node.item3)); .setPublishedObject(node.item1, node.item2, node.item3));
break; break;
} }
default: { default: {
handlePublicationError( handlePublicationError(
new PublicationError().setMessage("Thread interrupted while processing message") new PublicationError().setMessage("Thread interrupted while processing message")
.setCause(e).setPublishedObject(node.item1)); .setCause(e).setPublishedObject(node.item1));
} }
} }
} }
@ -191,12 +191,12 @@ public class MultiMBassador implements IMessageBus {
@Override @Override
public void subscribe(final Object listener) { public void subscribe(final Object listener) {
MultiMBassador.this.subscriptionManager.subscribe(listener); MessageBus.this.subscriptionManager.subscribe(listener);
} }
@Override @Override
public void unsubscribe(final Object listener) { public void unsubscribe(final Object listener) {
MultiMBassador.this.subscriptionManager.unsubscribe(listener); MessageBus.this.subscriptionManager.unsubscribe(listener);
} }
@Override @Override
@ -210,7 +210,7 @@ public class MultiMBassador implements IMessageBus {
subscriptionMatcher.publish(subscriptionManager, message); subscriptionMatcher.publish(subscriptionManager, message);
} catch (Throwable e) { } catch (Throwable e) {
handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.").setCause(e) handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.").setCause(e)
.setPublishedObject(message)); .setPublishedObject(message));
} }
} }
@ -220,7 +220,7 @@ public class MultiMBassador implements IMessageBus {
subscriptionMatcher.publish(subscriptionManager, message1, message2); subscriptionMatcher.publish(subscriptionManager, message1, message2);
} catch (Throwable e) { } catch (Throwable e) {
handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.").setCause(e) handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.").setCause(e)
.setPublishedObject(message1, message2)); .setPublishedObject(message1, message2));
} }
} }
@ -230,7 +230,7 @@ public class MultiMBassador implements IMessageBus {
subscriptionMatcher.publish(subscriptionManager, message1, message2, message3); subscriptionMatcher.publish(subscriptionManager, message1, message2, message3);
} catch (Throwable e) { } catch (Throwable e) {
handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.").setCause(e) handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.").setCause(e)
.setPublishedObject(message1, message2, message3)); .setPublishedObject(message1, message2, message3));
} }
} }
@ -240,7 +240,7 @@ public class MultiMBassador implements IMessageBus {
subscriptionMatcher.publish(subscriptionManager, messages); subscriptionMatcher.publish(subscriptionManager, messages);
} catch (Throwable e) { } catch (Throwable e) {
handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.").setCause(e) handlePublicationError(new PublicationError().setMessage("Error during invocation of message handler.").setCause(e)
.setPublishedObject(messages)); .setPublishedObject(messages));
} }
} }
@ -251,7 +251,7 @@ public class MultiMBassador implements IMessageBus {
this.dispatchQueue.transfer(message, MessageType.ONE); this.dispatchQueue.transfer(message, MessageType.ONE);
} catch (Exception e) { } catch (Exception e) {
handlePublicationError(new PublicationError().setMessage("Error while adding an asynchronous message").setCause(e) handlePublicationError(new PublicationError().setMessage("Error while adding an asynchronous message").setCause(e)
.setPublishedObject(message)); .setPublishedObject(message));
} }
} }
else { else {
@ -266,7 +266,7 @@ public class MultiMBassador implements IMessageBus {
this.dispatchQueue.transfer(message1, message2); this.dispatchQueue.transfer(message1, message2);
} catch (Exception e) { } catch (Exception e) {
handlePublicationError(new PublicationError().setMessage("Error while adding an asynchronous message").setCause(e) handlePublicationError(new PublicationError().setMessage("Error while adding an asynchronous message").setCause(e)
.setPublishedObject(message1, message2)); .setPublishedObject(message1, message2));
} }
} }
else { else {
@ -281,7 +281,7 @@ public class MultiMBassador implements IMessageBus {
this.dispatchQueue.transfer(message1, message2, message3); this.dispatchQueue.transfer(message1, message2, message3);
} catch (Exception e) { } catch (Exception e) {
handlePublicationError(new PublicationError().setMessage("Error while adding an asynchronous message").setCause(e) handlePublicationError(new PublicationError().setMessage("Error while adding an asynchronous message").setCause(e)
.setPublishedObject(message1, message2, message3)); .setPublishedObject(message1, message2, message3));
} }
} }
else { else {
@ -296,7 +296,7 @@ public class MultiMBassador implements IMessageBus {
this.dispatchQueue.transfer(messages, MessageType.ARRAY); this.dispatchQueue.transfer(messages, MessageType.ARRAY);
} catch (Exception e) { } catch (Exception e) {
handlePublicationError(new PublicationError().setMessage("Error while adding an asynchronous message").setCause(e) handlePublicationError(new PublicationError().setMessage("Error while adding an asynchronous message").setCause(e)
.setPublishedObject(messages)); .setPublishedObject(messages));
} }
} }
else { else {

View File

@ -1,4 +1,4 @@
package dorkbox.util.messagebus.common; package dorkbox.util.messagebus.common.thread;
import java.lang.management.RuntimeMXBean; import java.lang.management.RuntimeMXBean;
import java.util.List; import java.util.List;

View File

@ -1,4 +1,6 @@
package dorkbox.util.messagebus.common; package dorkbox.util.messagebus.utils;
import dorkbox.util.messagebus.common.ConcurrentHashMapV8;
import java.lang.reflect.Array; import java.lang.reflect.Array;
import java.util.ArrayList; import java.util.ArrayList;

View File

@ -1,4 +1,4 @@
package dorkbox.util.messagebus.common; package dorkbox.util.messagebus.utils;
import dorkbox.util.messagebus.annotations.Handler; import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.common.thread.ConcurrentSet; import dorkbox.util.messagebus.common.thread.ConcurrentSet;

View File

@ -1,10 +1,11 @@
package dorkbox.util.messagebus.subscription; package dorkbox.util.messagebus.utils;
import dorkbox.util.messagebus.common.ClassUtils;
import dorkbox.util.messagebus.common.ConcurrentHashMapV8; import dorkbox.util.messagebus.common.ConcurrentHashMapV8;
import dorkbox.util.messagebus.common.HashMapTree; import dorkbox.util.messagebus.common.HashMapTree;
import dorkbox.util.messagebus.common.thread.ClassHolder; import dorkbox.util.messagebus.common.thread.ClassHolder;
import dorkbox.util.messagebus.common.thread.SubscriptionHolder; import dorkbox.util.messagebus.common.thread.SubscriptionHolder;
import dorkbox.util.messagebus.subscription.Subscription;
import dorkbox.util.messagebus.utils.ClassUtils;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Map; import java.util.Map;

View File

@ -1,5 +1,8 @@
package dorkbox.util.messagebus.common; package dorkbox.util.messagebus.utils;
import dorkbox.util.messagebus.common.ConcurrentHashMapV8;
import dorkbox.util.messagebus.common.HashMapTree;
import dorkbox.util.messagebus.common.MessageHandler;
import dorkbox.util.messagebus.subscription.Subscription; import dorkbox.util.messagebus.subscription.Subscription;
import java.util.ArrayList; import java.util.ArrayList;