Moved error handling, code polish

This commit is contained in:
nathan 2015-02-04 01:44:40 +01:00
parent 2338c0ab18
commit 226a5f80a2
21 changed files with 52 additions and 105 deletions

View File

@ -1,8 +1,7 @@
package net.engio.mbassy._misc;
package net.engio.mbassy;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.bus.SyncMessageBus;
import net.engio.mbassy.bus.common.IMessageBus;
import net.engio.mbassy.bus.config.BusConfiguration;
import net.engio.mbassy.bus.config.Feature;

View File

@ -1,7 +1,9 @@
package net.engio.mbassy.bus.common;
package net.engio.mbassy;
import java.util.concurrent.Executor;
import net.engio.mbassy.bus.error.ErrorHandlingSupport;
/**
* A message bus offers facilities for publishing messages to the message handlers of registered listeners.
* A message publication starts when an object is send to the bus using one of the its publication methods.

View File

@ -1,4 +1,4 @@
package net.engio.mbassy.bus.common;
package net.engio.mbassy;
import java.util.concurrent.TimeUnit;
@ -43,7 +43,7 @@ public interface PubSubSupport<T> {
/**
* Execute the message publication asynchronously. The behaviour of this method depends on the
* Execute the message publication asynchronously. The behavior of this method depends on the
* configured queuing strategy:
* <p/>
* If an unbound queuing strategy is used the call returns immediately.
@ -52,7 +52,7 @@ public interface PubSubSupport<T> {
void publishAsync(T message);
/**
* Execute the message publication asynchronously. The behaviour of this method depends on the
* Execute the message publication asynchronously. The behavior of this method depends on the
* configured queuing strategy:
* <p/>
* If an unbound queuing strategy is used the call returns immediately.

View File

@ -5,9 +5,10 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import net.engio.mbassy.bus.common.PubSubSupport;
import net.engio.mbassy.PubSubSupport;
import net.engio.mbassy.bus.config.Feature;
import net.engio.mbassy.bus.config.IBusConfiguration;
import net.engio.mbassy.bus.error.ErrorHandlingSupport;
import net.engio.mbassy.bus.error.IPublicationErrorHandler;
import net.engio.mbassy.bus.error.PublicationError;
import net.engio.mbassy.subscription.Subscription;
@ -18,9 +19,9 @@ import net.engio.mbassy.subscription.SubscriptionManager;
*
* @param <T>
*/
public abstract class AbstractPubSubSupport<T> implements PubSubSupport<T> {
public abstract class AbstractPubSubSupport<T> implements PubSubSupport<T>, ErrorHandlingSupport {
// error handling is first-class functionality
// this handler will receive all errors that occur during message dispatch or message handling
private final List<IPublicationErrorHandler> errorHandlers = new ArrayList<IPublicationErrorHandler>();
@ -30,9 +31,10 @@ public abstract class AbstractPubSubSupport<T> implements PubSubSupport<T> {
public AbstractPubSubSupport(IBusConfiguration configuration) {
// configure the pub sub feature
Feature.SyncPubSub pubSubFeature = configuration.getFeature(Feature.SyncPubSub.class);
this.subscriptionManager = new SubscriptionManager(pubSubFeature.getMetadataReader(), getRegisteredErrorHandlers());
this.subscriptionManager = new SubscriptionManager(pubSubFeature.getMetadataReader());
}
@Override
public Collection<IPublicationErrorHandler> getRegisteredErrorHandlers() {
return Collections.unmodifiableCollection(this.errorHandlers);
}
@ -49,8 +51,9 @@ public abstract class AbstractPubSubSupport<T> implements PubSubSupport<T> {
}
@Override
public final void addErrorHandler(IPublicationErrorHandler handler) {
synchronized (this){
synchronized (this.errorHandlers) {
this.errorHandlers.add(handler);
}
}
@ -65,13 +68,13 @@ public abstract class AbstractPubSubSupport<T> implements PubSubSupport<T> {
DeadMessage deadMessage = new DeadMessage(message);
for (Subscription sub : subscriptions) {
sub.publishToSubscription(deadMessage);
sub.publishToSubscription(this, deadMessage);
}
} else {
boolean delivered = false;
boolean success = false;
for (Subscription sub : subscriptions) {
delivered = sub.publishToSubscription(message);
delivered = sub.publishToSubscription(this, message);
if (delivered) {
success = true;
}
@ -85,7 +88,7 @@ public abstract class AbstractPubSubSupport<T> implements PubSubSupport<T> {
DeadMessage deadMessage = new DeadMessage(message);
for (Subscription sub : subscriptions) {
sub.publishToSubscription(deadMessage);
sub.publishToSubscription(this, deadMessage);
}
}
}
@ -104,10 +107,10 @@ public abstract class AbstractPubSubSupport<T> implements PubSubSupport<T> {
}
public void handlePublicationError(PublicationError error) {
@Override
public final void handlePublicationError(PublicationError error) {
for (IPublicationErrorHandler errorHandler : this.errorHandlers) {
errorHandler.handleError(error);
}
}
}

View File

@ -8,7 +8,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import net.engio.mbassy.bus.common.IMessageBus;
import net.engio.mbassy.IMessageBus;
import net.engio.mbassy.bus.config.Feature;
import net.engio.mbassy.bus.config.IBusConfiguration;
import net.engio.mbassy.bus.error.PublicationError;

View File

@ -1,6 +1,5 @@
package net.engio.mbassy.bus;
import net.engio.mbassy.bus.common.PublicationEvent;
/**
* The dead message event is published whenever no message
@ -9,9 +8,16 @@ import net.engio.mbassy.bus.common.PublicationEvent;
* @author bennidi
* Date: 1/18/13
*/
public final class DeadMessage extends PublicationEvent {
public final class DeadMessage {
private Object relatedMessage;
DeadMessage(Object message) {
super(message);
this.relatedMessage = message;
}
public Object getMessage() {
return this.relatedMessage;
}
}

View File

@ -2,7 +2,7 @@ package net.engio.mbassy.bus;
import java.util.concurrent.TimeUnit;
import net.engio.mbassy.bus.common.IMessageBus;
import net.engio.mbassy.IMessageBus;
import net.engio.mbassy.bus.config.BusConfiguration;
import net.engio.mbassy.bus.config.Feature;
import net.engio.mbassy.bus.config.IBusConfiguration;

View File

@ -3,7 +3,7 @@ package net.engio.mbassy.bus;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import net.engio.mbassy.bus.common.IMessageBus;
import net.engio.mbassy.IMessageBus;
import net.engio.mbassy.bus.config.IBusConfiguration;
import net.engio.mbassy.bus.error.PublicationError;

View File

@ -1,21 +0,0 @@
package net.engio.mbassy.bus.common;
/**
* A wrapped event is created when various conditions are matched (these depend on the concrete
* (sub)type of wrapped event).
*
* @author bennidi
* Date: 3/1/13
*/
public abstract class PublicationEvent {
private Object relatedMessage;
public PublicationEvent(Object message) {
this.relatedMessage = message;
}
public Object getMessage() {
return relatedMessage;
}
}

View File

@ -1,6 +1,4 @@
package net.engio.mbassy.bus.common;
import net.engio.mbassy.bus.error.IPublicationErrorHandler;
package net.engio.mbassy.bus.error;
import java.util.Collection;
@ -17,6 +15,9 @@ public interface ErrorHandlingSupport {
*/
void addErrorHandler(IPublicationErrorHandler errorHandler);
void handlePublicationError(PublicationError error);
/**
* Returns an immutable collection containing all the registered error handlers
*

View File

@ -6,7 +6,7 @@ package net.engio.mbassy.bus.error;
* @author bennidi
* Date: 3/29/13
*/
public class MessageBusException extends Exception{
public class MessageBusException extends Exception {
public MessageBusException() {
}

View File

@ -1,14 +0,0 @@
package net.engio.mbassy.bus.error;
/**
* This exception is thrown when a property value that is unavailable at runtime is accessed.
* It indicates that some parts of the runtime configuration are actually incompatible,
* i.e. one component is trying to access a feature of another component that does not
* provide the feature (e.g. asynchronous functionality within a synchronous bus)
*/
public class MissingPropertyException extends RuntimeException {
public MissingPropertyException(String message) {
super(message);
}
}

View File

@ -12,7 +12,7 @@ import java.lang.reflect.Method;
* Date: 2/22/12
* Time: 4:59 PM
*/
public class PublicationError{
public class PublicationError {
// Internal state
private Throwable cause;
@ -37,11 +37,10 @@ public class PublicationError{
final Object listener,
final Object publishedObject) {
this.cause = cause;
this.message = message;
this(cause, message, publishedObject);
this.handler = handler;
this.listener = listener;
this.publishedObject = publishedObject;
}
public PublicationError(final Throwable cause,

View File

@ -2,9 +2,8 @@ package net.engio.mbassy.subscription;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collection;
import net.engio.mbassy.bus.error.IPublicationErrorHandler;
import net.engio.mbassy.bus.error.ErrorHandlingSupport;
import net.engio.mbassy.bus.error.PublicationError;
import net.engio.mbassy.common.IConcurrentSet;
import net.engio.mbassy.dispatch.IHandlerInvocation;
@ -27,18 +26,11 @@ public class Subscription {
// the handler's metadata -> for each handler in a listener, a unique subscription context is created
private final MessageHandler handlerMetadata;
// error handling is first-class functionality
private final Collection<IPublicationErrorHandler> errorHandlers;
private final IHandlerInvocation invocation;
protected final IConcurrentSet<Object> listeners;
Subscription(MessageHandler handler, Collection<IPublicationErrorHandler> errorHandlers,
IHandlerInvocation invocation, IConcurrentSet<Object> listeners) {
Subscription(MessageHandler handler, IHandlerInvocation invocation, IConcurrentSet<Object> listeners) {
this.handlerMetadata = handler;
this.errorHandlers = errorHandlers;
this.invocation = invocation;
this.listeners = listeners;
}
@ -78,7 +70,7 @@ public class Subscription {
/**
* @return TRUE if there were listeners/handlers available to publish to
*/
public boolean publishToSubscription(Object message){
public boolean publishToSubscription(ErrorHandlingSupport errorHandler, Object message){
if (this.listeners.size() > 0) {
/**
@ -96,20 +88,20 @@ public class Subscription {
try {
this.invocation.invoke(listener, message, handler);
} catch (IllegalAccessException e) {
handlePublicationError(new PublicationError(e, "Error during invocation of message handler. " +
errorHandler.handlePublicationError(new PublicationError(e, "Error during invocation of message handler. " +
"The class or method is not accessible",
handler, listener, message));
} catch (IllegalArgumentException e) {
handlePublicationError(new PublicationError(e, "Error during invocation of message handler. " +
errorHandler.handlePublicationError(new PublicationError(e, "Error during invocation of message handler. " +
"Wrong arguments passed to method. Was: " + message.getClass()
+ "Expected: " + handler.getParameterTypes()[0],
handler, listener, message));
} catch (InvocationTargetException e) {
handlePublicationError( new PublicationError(e, "Error during invocation of message handler. " +
errorHandler.handlePublicationError( new PublicationError(e, "Error during invocation of message handler. " +
"Message handler threw exception",
handler, listener, message));
} catch (Throwable e) {
handlePublicationError( new PublicationError(e, "Error during invocation of message handler. " +
errorHandler.handlePublicationError( new PublicationError(e, "Error during invocation of message handler. " +
"The handler code threw an exception",
handler, listener, message));
}
@ -121,13 +113,6 @@ public class Subscription {
return false;
}
private final void handlePublicationError(PublicationError error) {
for (IPublicationErrorHandler handler : this.errorHandlers) {
handler.handleError(error);
}
}
public void subscribe(Object o) {
this.listeners.add(o);
}

View File

@ -9,7 +9,6 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import net.engio.mbassy.bus.error.IPublicationErrorHandler;
import net.engio.mbassy.bus.error.MessageBusException;
import net.engio.mbassy.common.ReflectionUtils;
import net.engio.mbassy.common.WeakConcurrentSet;
@ -52,12 +51,9 @@ public class SubscriptionManager {
// synchronize read/write acces to the subscription maps
private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
// error handling is first-class functionality
private final Collection<IPublicationErrorHandler> errorHandlers;
public SubscriptionManager(MetadataReader metadataReader, Collection<IPublicationErrorHandler> errorHandlers) {
public SubscriptionManager(MetadataReader metadataReader) {
this.metadataReader = metadataReader;
this.errorHandlers = errorHandlers;
}
@ -117,7 +113,7 @@ public class SubscriptionManager {
invocation = new SynchronizedHandlerInvocation(invocation);
}
Subscription subscription = new Subscription(messageHandler, this.errorHandlers, invocation, new WeakConcurrentSet<Object>());
Subscription subscription = new Subscription(messageHandler, invocation, new WeakConcurrentSet<Object>());
subscriptionsByListener.add(subscription);
} catch (Exception e) {
throw new MessageBusException(e);

View File

@ -3,9 +3,7 @@ package net.engio.mbassy;
import java.util.LinkedList;
import java.util.List;
import net.engio.mbassy._misc.BusFactory;
import net.engio.mbassy.annotations.Handler;
import net.engio.mbassy.bus.common.IMessageBus;
import net.engio.mbassy.common.MessageBusTest;
import org.junit.Test;

View File

@ -1,7 +1,6 @@
package net.engio.mbassy;
import net.engio.mbassy.annotations.Handler;
import net.engio.mbassy.bus.common.IMessageBus;
import net.engio.mbassy.common.MessageBusTest;
import org.junit.Test;

View File

@ -1,8 +1,5 @@
package net.engio.mbassy;
import java.util.Collections;
import net.engio.mbassy.bus.error.IPublicationErrorHandler;
import net.engio.mbassy.common.AssertSupport;
import net.engio.mbassy.common.ConcurrentExecutor;
import net.engio.mbassy.common.ListenerFactory;
@ -166,7 +163,7 @@ public class SubscriptionManagerTest extends AssertSupport {
Overloading.ListenerBase.class,
Overloading.ListenerSub.class);
SubscriptionManager subscriptionManager = new SubscriptionManager(new MetadataReader(), Collections.<IPublicationErrorHandler>emptyList());
SubscriptionManager subscriptionManager = new SubscriptionManager(new MetadataReader());
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), ConcurrentUnits);
SubscriptionValidator expectedSubscriptions = new SubscriptionValidator(listeners)
@ -185,7 +182,7 @@ public class SubscriptionManagerTest extends AssertSupport {
}
private void runTestWith(final ListenerFactory listeners, final SubscriptionValidator validator){
final SubscriptionManager subscriptionManager = new SubscriptionManager(new MetadataReader(), Collections.<IPublicationErrorHandler>emptyList());
final SubscriptionManager subscriptionManager = new SubscriptionManager(new MetadataReader());
ConcurrentExecutor.runConcurrent(TestUtil.subscriber(subscriptionManager, listeners), ConcurrentUnits);

View File

@ -2,9 +2,7 @@ package net.engio.mbassy;
import java.util.concurrent.atomic.AtomicInteger;
import net.engio.mbassy._misc.BusFactory;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.bus.common.IMessageBus;
import net.engio.mbassy.bus.error.IPublicationErrorHandler;
import net.engio.mbassy.bus.error.PublicationError;
import net.engio.mbassy.common.ConcurrentExecutor;

View File

@ -6,7 +6,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import net.engio.mbassy.annotations.Handler;
import net.engio.mbassy.annotations.Synchronized;
import net.engio.mbassy.bus.common.IMessageBus;
import net.engio.mbassy.bus.config.Feature;
import net.engio.mbassy.bus.config.IBusConfiguration;
import net.engio.mbassy.common.MessageBusTest;

View File

@ -1,7 +1,7 @@
package net.engio.mbassy.common;
import net.engio.mbassy.PubSubSupport;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.bus.common.PubSubSupport;
import net.engio.mbassy.subscription.SubscriptionManager;
import java.util.Iterator;