added support for DeadEvent

This commit is contained in:
benni 2013-01-18 12:47:42 +01:00
parent d08470c339
commit fca9c6017c
9 changed files with 125 additions and 26 deletions

View File

@ -137,6 +137,7 @@ Of course you can always clone the repository and build from source
<h3>1.1.1</h3>
+ Added support for DeadEvent
+ Introduced new property to @Listener annotation that allows to activate/deactivate any message handler
+ Full support of proxies created by cglib
+ Message handler inheritance changed! See wiki page about handler definition for more details.

View File

@ -44,7 +44,7 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
private final List<Thread> dispatchers = new CopyOnWriteArrayList<Thread>();
// all pending messages scheduled for asynchronous dispatch are queued here
private final BlockingQueue<MessagePublication<T>> pendingMessages;
private final BlockingQueue<MessagePublication> pendingMessages;
// this factory is used to create specialized subscriptions based on the given message handler configuration
// it can be customized by implementing the getSubscriptionFactory() method
@ -56,7 +56,7 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
this.executor = configuration.getExecutor();
subscriptionFactory = configuration.getSubscriptionFactory();
this.metadataReader = configuration.getMetadataReader();
pendingMessages = new LinkedBlockingQueue<MessagePublication<T>>(configuration.getMaximumNumberOfPendingMessages());
pendingMessages = new LinkedBlockingQueue<MessagePublication>(configuration.getMaximumNumberOfPendingMessages());
initDispatcherThreads(configuration.getNumberOfMessageDispatchers());
addErrorHandler(new IPublicationErrorHandler.ConsoleLogger());
}
@ -151,7 +151,7 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
}
// this method enqueues a message delivery request
protected MessagePublication<T> addAsynchronousDeliveryRequest(MessagePublication<T> request){
protected MessagePublication addAsynchronousDeliveryRequest(MessagePublication request){
try {
pendingMessages.put(request);
return request.markScheduled();
@ -161,7 +161,7 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
}
// this method enqueues a message delivery request
protected MessagePublication<T> addAsynchronousDeliveryRequest(MessagePublication<T> request, long timeout, TimeUnit unit){
protected MessagePublication addAsynchronousDeliveryRequest(MessagePublication request, long timeout, TimeUnit unit){
try {
return pendingMessages.offer(request, timeout, unit)
? request.markScheduled()
@ -172,6 +172,7 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
}
// obtain the set of subscriptions for the given message type
// Note: never returns null!
protected Collection<Subscription> getSubscriptionsByMessageType(Class messageType) {
Set<Subscription> subscriptions = new TreeSet<Subscription>(Subscription.SubscriptionByPriorityDesc);

View File

@ -136,7 +136,7 @@ public interface IMessageBus<T, P extends IMessageBus.IPostCommand> {
*
* @return A message publication that can be used to access information about the state of
*/
public MessagePublication<T> asynchronously();
public MessagePublication asynchronously();
/**
@ -149,7 +149,7 @@ public interface IMessageBus<T, P extends IMessageBus.IPostCommand> {
*
* @return A message publication that wraps up the publication request
*/
public MessagePublication<T> asynchronously(long timeout, TimeUnit unit);
public MessagePublication asynchronously(long timeout, TimeUnit unit);
}

View File

@ -1,5 +1,6 @@
package net.engio.mbassy;
import net.engio.mbassy.common.DeadEvent;
import net.engio.mbassy.subscription.Subscription;
import java.util.Collection;
@ -13,16 +14,25 @@ public class MBassador<T> extends AbstractMessageBus<T, SyncAsyncPostCommand<T>>
}
public MessagePublication<T> publishAsync(T message) {
return addAsynchronousDeliveryRequest(MessagePublication.Create(
getSubscriptionsByMessageType(message.getClass()), message));
public MessagePublication publishAsync(T message) {
return addAsynchronousDeliveryRequest(createMessagePublication(message));
}
public MessagePublication<T> publishAsync(T message, long timeout, TimeUnit unit) {
return addAsynchronousDeliveryRequest(MessagePublication.Create(
getSubscriptionsByMessageType(message.getClass()), message), timeout, unit);
public MessagePublication publishAsync(T message, long timeout, TimeUnit unit) {
return addAsynchronousDeliveryRequest(createMessagePublication(message), timeout, unit);
}
private MessagePublication createMessagePublication(T message) {
Collection<Subscription> subscriptions = getSubscriptionsByMessageType(message.getClass());
if (subscriptions == null || subscriptions.isEmpty()) {
// Dead Event
subscriptions = getSubscriptionsByMessageType(DeadEvent.class);
return MessagePublication.Create(subscriptions, new DeadEvent(message));
}
else return MessagePublication.Create(subscriptions, message);
}
/**
* Synchronously publish a message to all registered listeners (this includes listeners defined for super types)
@ -32,13 +42,25 @@ public class MBassador<T> extends AbstractMessageBus<T, SyncAsyncPostCommand<T>>
*/
public void publish(T message) {
try {
MessagePublication publication = createMessagePublication(message);
publication.execute();
/*
final Collection<Subscription> subscriptions = getSubscriptionsByMessageType(message.getClass());
if (subscriptions == null) {
return; // TODO: Dead Event?
}
for (Subscription subscription : subscriptions) {
subscription.publish(message);
if (subscriptions == null || subscriptions.isEmpty()) {
// publish a DeadEvent since no subscriptions could be found
final Collection<Subscription> deadEventSubscriptions = getSubscriptionsByMessageType(DeadEvent.class);
if (deadEventSubscriptions != null && !deadEventSubscriptions.isEmpty()) {
for (Subscription subscription : deadEventSubscriptions) {
subscription.publish(new DeadEvent(message));
}
}
}
else{
for (Subscription subscription : subscriptions) {
subscription.publish(message);
}
}*/
} catch (Throwable e) {
handlePublicationError(new PublicationError()
.setMessage("Error during publication of message")

View File

@ -1,5 +1,6 @@
package net.engio.mbassy;
import net.engio.mbassy.common.DeadEvent;
import net.engio.mbassy.subscription.Subscription;
import java.util.Collection;
@ -12,19 +13,19 @@ import java.util.Collection;
* @author bennidi
* Date: 11/16/12
*/
public class MessagePublication<T> {
public class MessagePublication {
public static <T> MessagePublication<T> Create(Collection<Subscription> subscriptions, T message){
return new MessagePublication<T>(subscriptions, message, State.Initial);
public static MessagePublication Create(Collection<Subscription> subscriptions, Object message){
return new MessagePublication(subscriptions, message, State.Initial);
}
private Collection<Subscription> subscriptions;
private T message;
private Object message;
private State state = State.Scheduled;
private MessagePublication(Collection<Subscription> subscriptions, T message, State initialState) {
private MessagePublication(Collection<Subscription> subscriptions, Object message, State initialState) {
this.subscriptions = subscriptions;
this.message = message;
this.state = initialState;
@ -54,18 +55,22 @@ public class MessagePublication<T> {
return state.equals(State.Scheduled);
}
public MessagePublication<T> markScheduled(){
public MessagePublication markScheduled(){
if(!state.equals(State.Initial))
return this;
state = State.Scheduled;
return this;
}
public MessagePublication<T> setError(){
public MessagePublication setError(){
state = State.Error;
return this;
}
public boolean isDeadEvent(){
return DeadEvent.class.isAssignableFrom(message.getClass());
}
private enum State{
Initial,Scheduled,Running,Finished,Error;
}

View File

@ -24,7 +24,7 @@ public class SyncAsyncPostCommand<T> implements IMessageBus.IPostCommand {
}
@Override
public MessagePublication<T> asynchronously() {
public MessagePublication asynchronously() {
return mBassador.publishAsync(message);
}

View File

@ -0,0 +1,21 @@
package net.engio.mbassy.common;
/**
* The DeadEvent is delivered to all subscribed handlers (if any) whenever no message
* handlers could be found for a given message publication.
*
* @author bennidi
* Date: 1/18/13
*/
public class DeadEvent {
private Object event;
public DeadEvent(Object event) {
this.event = event;
}
public Object getEvent() {
return event;
}
}

View File

@ -16,7 +16,8 @@ import org.junit.runners.Suite;
FilterTest.class,
MetadataReaderTest.class,
ListenerSubscriptionTest.class,
MethodDispatchTest.class
MethodDispatchTest.class,
DeadEventTest.class
})
public class AllTests {
}

View File

@ -0,0 +1,48 @@
package net.engio.mbassy;
import net.engio.mbassy.common.ConcurrentSet;
import net.engio.mbassy.common.DeadEvent;
import net.engio.mbassy.common.UnitTest;
import net.engio.mbassy.listener.Listener;
import org.junit.Test;
/**
* Verify correct behaviour in case of empty message publications
*
* @author bennidi
* Date: 1/18/13
*/
public class DeadEventTest extends UnitTest{
@Test
public void testDeadEvent(){
MBassador bus = new MBassador(BusConfiguration.Default());
DeadEventHandler deadEventHandler = new DeadEventHandler();
bus.subscribe(deadEventHandler);
assertEquals(0, deadEventHandler.getDeadEventCount());
bus.post(new Object()).now();
assertEquals(1, deadEventHandler.getDeadEventCount());
bus.post(323).now();
assertEquals(2, deadEventHandler.getDeadEventCount());
bus.publish("fkdfdk");
assertEquals(3, deadEventHandler.getDeadEventCount());
}
public class DeadEventHandler{
private ConcurrentSet deadEvents = new ConcurrentSet();
@Listener
public void handle(DeadEvent event){
deadEvents.add(event);
}
public int getDeadEventCount(){
return deadEvents.size();
}
}
}