introduced specialized subscriptions for better performance and customization options

This commit is contained in:
benni 2012-11-17 12:13:40 +01:00
parent 08f6e45c7c
commit 014f22df9e
6 changed files with 279 additions and 0 deletions

View File

@ -0,0 +1,104 @@
package org.mbassy.subscription;
import org.mbassy.IMessageBus;
import org.mbassy.IPublicationErrorHandler;
import org.mbassy.common.ConcurrentSet;
import org.mbassy.PublicationError;
import org.mbassy.listener.MessageHandlerMetadata;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.Comparator;
/**
* Subscription is a thread safe container for objects that contain message handlers
*/
public abstract class Subscription {
private final Method handler;
protected ConcurrentSet<Object> listeners = new ConcurrentSet<Object>();
private int priority = 0;
private IMessageBus owningBus ;
public Subscription(IMessageBus owningBus, MessageHandlerMetadata messageHandler) {
this.owningBus = owningBus;
this.priority = messageHandler.getPriority();
this.handler = messageHandler.getHandler();
this.handler.setAccessible(true);
}
public abstract void publish(Object message);
protected abstract void dispatch(final Object message, final Object listener);
protected IMessageBus getMessageBus(){
return owningBus;
}
public int getPriority(){
return priority;
}
public void subscribe(Object o) {
listeners.add(o);
}
protected void handlePublicationError(PublicationError error){
Collection<IPublicationErrorHandler> handlers = owningBus.getRegisteredErrorHandlers();
for(IPublicationErrorHandler handler : handlers){
handler.handleError(error);
}
}
protected void invokeHandler(final Object message, final Object listener){
try {
handler.invoke(listener, message);
}catch(IllegalAccessException e){
handlePublicationError(
new PublicationError(e, "Error during messageHandler notification. " +
"The class or method is not accessible",
handler, listener, message));
}
catch(IllegalArgumentException e){
handlePublicationError(
new PublicationError(e, "Error during messageHandler notification. " +
"Wrong arguments passed to method. Was: " + message.getClass()
+ "Expected: " + handler.getParameterTypes()[0],
handler, listener, message));
}
catch (InvocationTargetException e) {
handlePublicationError(
new PublicationError(e, "Error during messageHandler notification. " +
"Message handler threw exception",
handler, listener, message));
}
catch (Throwable e) {
handlePublicationError(
new PublicationError(e, "Error during messageHandler notification. " +
"Unexpected exception",
handler, listener, message));
}
}
public void unsubscribe(Object existingListener) {
listeners.remove(existingListener);
}
public static final Comparator<Subscription> SubscriptionByPriorityDesc = new Comparator<Subscription>() {
@Override
public int compare(Subscription o1, Subscription o2) {
int result = o1.getPriority() - o2.getPriority();
return result == 0 ? o1.handler.hashCode() - o2.handler.hashCode() : result;
}
};
}

View File

@ -0,0 +1,38 @@
package org.mbassy.subscription;
import java.util.Collection;
import java.util.LinkedList;
/**
* User: benni
* Date: 11/16/12
*/
public class SubscriptionDeliveryRequest<T> {
private Collection<Subscription> subscriptions;
private T message;
public SubscriptionDeliveryRequest(Collection<Subscription> subscriptions, T message) {
this.subscriptions = subscriptions;
this.message = message;
}
public SubscriptionDeliveryRequest(T message){
this.message = message;
subscriptions = new LinkedList<Subscription>();
}
public boolean addAll(Collection<? extends Subscription> c) {
return subscriptions.addAll(c);
}
public boolean add(Subscription subscription) {
return subscriptions.add(subscription);
}
public void execute(){
for(Subscription sub : subscriptions)
sub.publish(message);
}
}

View File

@ -0,0 +1,43 @@
package org.mbassy.subscription;
import org.mbassy.IMessageBus;
import org.mbassy.IPublicationErrorHandler;
import org.mbassy.listener.MessageHandlerMetadata;
import java.util.Collection;
/**
* Created with IntelliJ IDEA.
* User: benni
* Date: 11/16/12
* Time: 10:39 AM
* To change this template use File | Settings | File Templates.
*/
public class SubscriptionFactory {
private IMessageBus owner;
public SubscriptionFactory(IMessageBus owner) {
this.owner = owner;
}
public Subscription createSubscription(MessageHandlerMetadata messageHandlerMetadata){
if(messageHandlerMetadata.isFiltered()){
if(messageHandlerMetadata.isAsynchronous()){
return new UnfilteredAsynchronousSubscription(owner, messageHandlerMetadata);
}
else{
return new UnfilteredSynchronousSubscription(owner, messageHandlerMetadata);
}
}
else{
if(messageHandlerMetadata.isAsynchronous()){
return new FilteredAsynchronousSubscription(owner, messageHandlerMetadata);
}
else{
return new FilteredSynchronousSubscription(owner, messageHandlerMetadata);
}
}
}
}

View File

@ -0,0 +1,33 @@
package org.mbassy.subscription;
import org.mbassy.IMessageBus;
import org.mbassy.IPublicationErrorHandler;
import org.mbassy.MBassador;
import org.mbassy.listener.MessageHandlerMetadata;
import java.lang.reflect.Method;
import java.util.Collection;
/**
* Created with IntelliJ IDEA.
* User: benni
* Date: 11/14/12
* Time: 3:48 PM
* To change this template use File | Settings | File Templates.
*/
public class UnfilteredAsynchronousSubscription extends UnfilteredSubscription {
public UnfilteredAsynchronousSubscription(IMessageBus mBassador, MessageHandlerMetadata messageHandler) {
super(mBassador, messageHandler);
}
protected void dispatch(final Object message, final Object listener){
getMessageBus().getExecutor().execute(new Runnable() {
@Override
public void run() {
invokeHandler(message, listener);
}
});
}
}

View File

@ -0,0 +1,34 @@
package org.mbassy.subscription;
import org.mbassy.IMessageBus;
import org.mbassy.IPublicationErrorHandler;
import org.mbassy.MBassador;
import org.mbassy.listener.MessageHandlerMetadata;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.Iterator;
/**
* Created with IntelliJ IDEA.
* User: benni
* Date: 11/14/12
* Time: 3:45 PM
* To change this template use File | Settings | File Templates.
*/
public abstract class UnfilteredSubscription extends Subscription{
public UnfilteredSubscription(IMessageBus mBassador, MessageHandlerMetadata messageHandler) {
super(mBassador, messageHandler);
}
public void publish(Object message) {
Iterator<Object> iterator = listeners.iterator();
Object listener = null;
while ((listener = iterator.next()) != null) {
dispatch(message, listener);
}
}
}

View File

@ -0,0 +1,27 @@
package org.mbassy.subscription;
import org.mbassy.IMessageBus;
import org.mbassy.IPublicationErrorHandler;
import org.mbassy.MBassador;
import org.mbassy.listener.MessageHandlerMetadata;
import java.lang.reflect.Method;
import java.util.Collection;
/**
* Created with IntelliJ IDEA.
* User: benni
* Date: 11/14/12
* Time: 3:49 PM
* To change this template use File | Settings | File Templates.
*/
public class UnfilteredSynchronousSubscription extends UnfilteredSubscription{
public UnfilteredSynchronousSubscription(IMessageBus mBassador, MessageHandlerMetadata messageHandler) {
super(mBassador, messageHandler);
}
protected void dispatch(final Object message, final Object listener){
invokeHandler(message, listener);
}
}