release of 1.0.5.RC

This commit is contained in:
benni 2012-12-12 14:56:28 +01:00
parent 95c5d8e535
commit 687fbb72ed
26 changed files with 362 additions and 83 deletions

View File

@ -10,7 +10,7 @@ Read this documentation to get an overview of its features and how cool this mes
You can also check out the <a href="http://codeblock.engio.net/?p=37" target="_blank">performance comparison</a>
which also contains a partial list of the features of the compared implementations.
The current version is 1.0.4.RC
The current version is 1.0.5.RC
Table of contents:
+ [Features](#features)
@ -39,6 +39,8 @@ in certain environments where objects are created by frameworks, i.e. spring, gu
ignore objects without message handlers and automatically clean-up orphaned weak references after the garbage collector has done its job.
+ <em><strong>Filtering</em></strong>: Mbassador offers static message filtering. Filters are configured using annotations and multiple filters can be attached to
a single message handler
+ <em><strong>Message envelopes</em></strong>: Message handlers can declare to receive an enveloped message. The envelope can wrap around different
types of messages. This allows for a single handler to handle multiple message types
+ <em><strong>Handler priorities</em></strong>: A listener can be associated with a priority to influence the order of the message delivery
+ <em><strong>Error handling</em></strong>: Errors during message delivery are sent to an error handler of which a custom implementation can easily be plugged-in.
+ <em><strong>Ease of Use</em></strong>: Using Mbassador in your project is very easy. Create as many instances of Mbassador as you like (usually a singleton will do),
@ -73,6 +75,20 @@ Listener definition (in any bean):
//do something special here
}
@Listener(dispatch = Mode.Synchronous, filters = @Filter(Filters.RejectSubtypes.class))
@Enveloped(messages = {TestEvent.class, TestEvent2.class})
public void handleSuperTypeEvents(MessageEnvelope envelope) {
if(TestEvent.class.isAssignableFrom(envelope.getMessage().getClass())){
TestEvent event = envelope.getMessage();
event.counter.incrementAndGet();
}
if(envelope.getMessage().getClass().equals(TestEvent2.class)){
TestEvent2 event = envelope.getMessage();
event.counter.incrementAndGet();
}
}
Creation of message bus and registration of listeners:
// create as many instances as necessary

View File

@ -0,0 +1 @@
3ad14134e9752e3a073c75ab296427ef

View File

@ -0,0 +1 @@
bedef44bb92cbfafcba48624c91e185d692ea39d

View File

@ -0,0 +1,58 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.mbassy</groupId>
<artifactId>mbassador</artifactId>
<version>1.0.5.RC</version>
<packaging>jar</packaging>
<name>mbassador</name>
<description>Mbassador is a fast and flexible message bus system that follows the publish subscribe pattern
</description>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.build.java.version>1.6</project.build.java.version>
<github.url>file://${project.basedir}/maven</github.url>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
</dependencies>
<distributionManagement>
<repository>
<id>mbassador-github-repo</id>
<url>${github.url}</url>
</repository>
</distributionManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${project.build.java.version}</source>
<target>${project.build.java.version}</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<skipTests>false</skipTests>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1 @@
0a781a5e9f22e5dafeb7f48fff65e46d

View File

@ -0,0 +1 @@
4378b82fa04f4c31f21321424d0c4c328905d3ad

View File

@ -10,7 +10,8 @@
<version>1.0.2.RC</version>
<version>1.0.3.RC</version>
<version>1.0.4.RC</version>
<version>1.0.5.RC</version>
</versions>
<lastUpdated>20121211111454</lastUpdated>
<lastUpdated>20121212135342</lastUpdated>
</versioning>
</metadata>

View File

@ -1 +1 @@
ea8a55fc0698b7a0ab7811c938529e9c
e37de32d920e2182f2549b0500dc20d8

View File

@ -1 +1 @@
e9eedffd82184cc31168a051bbd9453e426d7e71
6b5568c98223ec0a53f7b7ede78e965ba7f0080f

View File

@ -4,7 +4,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.mbassy</groupId>
<artifactId>mbassador</artifactId>
<version>1.0.4.RC</version>
<version>1.0.5.RC</version>
<packaging>jar</packaging>
<name>mbassador</name>
<description>Mbassador is a fast and flexible message bus system that follows the publish subscribe pattern

View File

@ -2,12 +2,12 @@ package org.mbassy;
import org.mbassy.common.ReflectionUtils;
import org.mbassy.dispatch.MessagingContext;
import org.mbassy.listener.MessageHandlerMetadata;
import org.mbassy.listener.MetadataReader;
import org.mbassy.subscription.Subscription;
import org.mbassy.subscription.SubscriptionDeliveryRequest;
import org.mbassy.subscription.SubscriptionFactory;
import java.lang.reflect.Method;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
@ -118,21 +118,24 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
synchronized (this) { // new subscriptions must be processed sequentially
subscriptionsByListener = subscriptionsPerListener.get(listeningClass);
if (subscriptionsByListener == null) { // double check (a bit ugly but works here)
List<Method> messageHandlers = metadataReader.getMessageHandlers(listeningClass); // get all methods with subscriptions
List<MessageHandlerMetadata> messageHandlers = metadataReader.getMessageHandlers(listeningClass);
if (messageHandlers.isEmpty()) { // remember the class as non listening class if no handlers are found
nonListeners.add(listeningClass);
return;
}
subscriptionsByListener = new ArrayList<Subscription>(messageHandlers.size()); // it's safe to use non-concurrent collection here (read only)
// create subscriptions for all detected listeners
for (Method messageHandler : messageHandlers) {
if (!isValidMessageHandler(messageHandler)) continue; // ignore invalid listeners
Class eventType = getMessageType(messageHandler);
for (MessageHandlerMetadata messageHandler : messageHandlers) {
// create the subscription
Subscription subscription = subscriptionFactory
.createSubscription(new MessagingContext(this, metadataReader.getHandlerMetadata(messageHandler)));
.createSubscription(new MessagingContext(this, messageHandler));
subscription.subscribe(listener);
addMessageTypeSubscription(eventType, subscription);
subscriptionsByListener.add(subscription);
subscriptionsByListener.add(subscription);// add it for the listener type (for future subscriptions)
List<Class<?>> messageTypes = messageHandler.getHandledMessages();
for(Class<?> messageType : messageTypes){
addMessageTypeSubscription(messageType, subscription);
}
//updateMessageTypeHierarchy(eventType);
}
subscriptionsPerListener.put(listeningClass, subscriptionsByListener);
@ -187,19 +190,7 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
}
private boolean isValidMessageHandler(Method handler) {
if (handler.getParameterTypes().length != 1) {
// a messageHandler only defines one parameter (the message)
System.out.println("Found no or more than one parameter in messageHandler [" + handler.getName()
+ "]. A messageHandler must define exactly one parameter");
return false;
}
return true;
}
private static Class getMessageType(Method listener) {
return listener.getParameterTypes()[0];
}
public void handlePublicationError(PublicationError error) {
for (IPublicationErrorHandler errorHandler : errorHandlers){

View File

@ -6,14 +6,22 @@ import org.mbassy.subscription.SubscriptionFactory;
import java.util.concurrent.*;
/**
*
*
* The bus configuration holds various parameters that can be used to customize the bus' runtime behaviour. *
*
* @author bennidi
* Date: 12/8/12
*/
public class BusConfiguration {
private static final ThreadFactory DaemonThreadFactory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = Executors.defaultThreadFactory().newThread(r);
thread.setDaemon(true);
return thread;
}
};
public static final BusConfiguration Default(){
return new BusConfiguration();
}
@ -32,7 +40,7 @@ public class BusConfiguration {
this.numberOfMessageDispatchers = 2;
this.maximumNumberOfPendingMessages = Integer.MAX_VALUE;
this.subscriptionFactory = new SubscriptionFactory();
this.executor = new ThreadPoolExecutor(5, 20, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>());
this.executor = new ThreadPoolExecutor(10, 10, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), DaemonThreadFactory);
this.metadataReader = new MetadataReader();
}

View File

@ -0,0 +1,34 @@
package org.mbassy.dispatch;
import org.mbassy.common.ConcurrentSet;
import org.mbassy.subscription.MessageEnvelope;
/**
* Todo: Add javadoc
*
* @author bennidi
* Date: 12/12/12
*/
public class EnvelopedMessageDispatcher implements IMessageDispatcher {
private IMessageDispatcher del;
public EnvelopedMessageDispatcher(IMessageDispatcher dispatcher) {
this.del = dispatcher;
}
@Override
public void dispatch(Object message, ConcurrentSet listeners) {
del.dispatch(new MessageEnvelope(message), listeners);
}
@Override
public MessagingContext getContext() {
return del.getContext();
}
@Override
public IHandlerInvocation getInvocation() {
return del.getInvocation();
}
}

View File

@ -0,0 +1,23 @@
package org.mbassy.listener;
import java.lang.annotation.*;
/**
* Configure a handler to receive an enveloped message as a wrapper around the source
* message. An enveloped message can be
*
* @author bennidi
* Date: 2/8/12
*/
@Retention(value = RetentionPolicy.RUNTIME)
@Inherited
@Target(value = {ElementType.METHOD})
public @interface Enveloped {
/**
* The set of messages that should be dispatched to the message handler
*/
Class[] messages();
}

View File

@ -0,0 +1,38 @@
package org.mbassy.listener;
/**
* Todo: Add javadoc
*
* @author bennidi
* Date: 12/12/12
*/
public class Filters {
public static final class AllowAll implements IMessageFilter {
@Override
public boolean accepts(Object event, MessageHandlerMetadata metadata) {
return true;
}
}
public static final class RejectAll implements IMessageFilter {
@Override
public boolean accepts(Object event, MessageHandlerMetadata metadata) {
return false;
}
}
public static final class RejectSubtypes implements IMessageFilter {
@Override
public boolean accepts(Object event, MessageHandlerMetadata metadata) {
for(Class handledMessage : metadata.getHandledMessages()){
if(handledMessage.equals(event.getClass()))return true;
}
return false;
}
}
}

View File

@ -1,8 +1,5 @@
package org.mbassy.listener;
import java.util.HashMap;
import java.util.Map;
/**
* Message filters can be used to prevent certain messages to be delivered to a specific listener.
* If a filter is used the message will only be delivered if it passes the filter(s)
@ -23,32 +20,4 @@ public interface IMessageFilter {
*/
public boolean accepts(Object message, MessageHandlerMetadata metadata);
public static final class All implements IMessageFilter {
@Override
public boolean accepts(Object event, MessageHandlerMetadata metadata) {
return true;
}
}
public static final class None implements IMessageFilter {
@Override
public boolean accepts(Object event, MessageHandlerMetadata metadata) {
return false;
}
}
public static final class DontAllowSubtypes implements IMessageFilter {
@Override
public boolean accepts(Object event, MessageHandlerMetadata metadata) {
return event.getClass().equals(metadata.getDeclaredMessageType());
}
}
}

View File

@ -1,6 +1,8 @@
package org.mbassy.listener;
import java.lang.reflect.Method;
import java.util.LinkedList;
import java.util.List;
/**
*
@ -18,12 +20,24 @@ public class MessageHandlerMetadata {
private boolean isAsynchronous = false;
private Enveloped envelope = null;
private List<Class<?>> handledMessages = new LinkedList<Class<?>>();
public MessageHandlerMetadata(Method handler, IMessageFilter[] filter, Listener listenerConfig) {
this.handler = handler;
this.filter = filter;
this.listenerConfig = listenerConfig;
this.isAsynchronous = listenerConfig.dispatch().equals(Mode.Asynchronous);
this.envelope = handler.getAnnotation(Enveloped.class);
if(this.envelope != null){
for(Class messageType : envelope.messages())
handledMessages.add(messageType);
}
else{
handledMessages.add(handler.getParameterTypes()[0]);
}
this.handler.setAccessible(true);
}
@ -48,7 +62,11 @@ public class MessageHandlerMetadata {
return filter;
}
public Class getDeclaredMessageType(){
return handler.getParameterTypes()[0];
public List<Class<?>> getHandledMessages(){
return handledMessages;
}
public boolean isEnveloped() {
return envelope != null;
}
}

View File

@ -1,21 +1,20 @@
package org.mbassy.listener;
import com.sun.xml.internal.messaging.saaj.soap.Envelope;
import org.mbassy.common.IPredicate;
import org.mbassy.common.ReflectionUtils;
import org.mbassy.subscription.MessageEnvelope;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.*;
/**
* Created with IntelliJ IDEA.
*
* The meta data reader is responsible for parsing and validating message handler configurations.
*
* @author bennidi
* Date: 11/16/12
* Time: 10:22 AM
* To change this template use File | Settings | File Templates.
*/
public class MetadataReader {
@ -31,15 +30,20 @@ public class MetadataReader {
private final Map<Class<? extends IMessageFilter>, IMessageFilter> filterCache = new HashMap<Class<? extends IMessageFilter>, IMessageFilter>();
// retrieve all instances of filters associated with the given subscription
private IMessageFilter[] getFilter(Listener subscription) throws Exception{
private IMessageFilter[] getFilter(Listener subscription){
if (subscription.filters().length == 0) return null;
IMessageFilter[] filters = new IMessageFilter[subscription.filters().length];
int i = 0;
for (Filter filterDef : subscription.filters()) {
IMessageFilter filter = filterCache.get(filterDef.value());
if (filter == null) {
try{
filter = filterDef.value().newInstance();
filterCache.put(filterDef.value(), filter);
}
catch (Exception e){
throw new RuntimeException(e);// propagate as runtime exception
}
}
filters[i] = filter;
@ -49,15 +53,14 @@ public class MetadataReader {
}
public MessageHandlerMetadata getHandlerMetadata(Method messageHandler) throws Exception{
public MessageHandlerMetadata getHandlerMetadata(Method messageHandler){
Listener config = messageHandler.getAnnotation(Listener.class);
IMessageFilter[] filter = getFilter(config);
return new MessageHandlerMetadata(messageHandler, filter, config);
return new MessageHandlerMetadata(messageHandler, getFilter(config), config);
}
// get all listeners defined by the given class (includes
// listeners defined in super classes)
public List<Method> getMessageHandlers(Class<?> target) {
public List<MessageHandlerMetadata> getMessageHandlers(Class<?> target) {
List<Method> allMethods = ReflectionUtils.getMethods(AllMessageHandlers, target);
List<Method> handlers = new LinkedList<Method>();
for(Method listener : allMethods){
@ -70,7 +73,13 @@ public class MetadataReader {
handlers.add(listener);
}
}
return ReflectionUtils.withoutOverridenSuperclassMethods(handlers);
handlers = ReflectionUtils.withoutOverridenSuperclassMethods(handlers);
List<MessageHandlerMetadata> messageHandlers = new ArrayList<MessageHandlerMetadata>(handlers.size());
for(Method handler : handlers){
if(isValidMessageHandler(handler))
messageHandlers.add(getHandlerMetadata(handler));
}
return messageHandlers;
}
private static boolean isHandler(Method m){
@ -79,7 +88,25 @@ public class MetadataReader {
if(annotation.equals(Listener.class))return true;
}
return false;
}
private boolean isValidMessageHandler(Method handler) {
if (handler.getParameterTypes().length != 1) {
// a messageHandler only defines one parameter (the message)
System.out.println("Found no or more than one parameter in messageHandler [" + handler.getName()
+ "]. A messageHandler must define exactly one parameter");
return false;
}
Enveloped envelope = handler.getAnnotation(Enveloped.class);
if(envelope != null && !MessageEnvelope.class.isAssignableFrom(handler.getParameterTypes()[0])){
System.out.println("Message envelope configured but no subclass of MessageEnvelope found as parameter");
return false;
}
if(envelope != null && envelope.messages().length == 0){
System.out.println("Message envelope configured but message types defined for handler");
return false;
}
return true;
}
}

View File

@ -0,0 +1,27 @@
package org.mbassy.subscription;
import java.sql.Timestamp;
/**
* A message envelope is used to wrap messages of arbitrary type such that a handler
* my receive messages of different types.
*
* @author bennidi
* Date: 12/12/12
*/
public class MessageEnvelope {
private Timestamp tsCreated = new Timestamp(System.currentTimeMillis());
private Object message;
public MessageEnvelope(Object message) {
this.message = message;
}
public <T> T getMessage(){
return (T)message;
}
}

View File

@ -32,6 +32,9 @@ public class SubscriptionFactory {
protected IMessageDispatcher buildDispatcher(MessagingContext context, IHandlerInvocation invocation){
IMessageDispatcher dispatcher = new MessageDispatcher(context, invocation);
if(context.getHandlerMetadata().isEnveloped()){
dispatcher = new EnvelopedMessageDispatcher(dispatcher);
}
if(context.getHandlerMetadata().isFiltered()){
dispatcher = new FilteredMessageDispatcher(dispatcher);
}

View File

@ -4,6 +4,7 @@ import org.junit.Test;
import org.mbassy.events.SubTestEvent;
import org.mbassy.events.TestEvent;
import org.mbassy.listener.Filter;
import org.mbassy.listener.Filters;
import org.mbassy.listener.IMessageFilter;
import org.mbassy.listener.Listener;
import org.mbassy.listeners.*;
@ -46,7 +47,7 @@ public class FilterTest extends UnitTest{
public static class FilteredMessageListener{
@Listener(filters = {@Filter(IMessageFilter.DontAllowSubtypes.class)})
@Listener(filters = {@Filter(Filters.RejectSubtypes.class)})
public void handleTestEvent(TestEvent event){
event.counter.incrementAndGet();
}

View File

@ -4,6 +4,7 @@ import org.junit.Before;
import org.junit.Test;
import org.mbassy.events.SubTestEvent;
import org.mbassy.events.TestEvent;
import org.mbassy.events.TestEvent2;
import org.mbassy.listeners.*;
import org.mbassy.subscription.Subscription;
@ -113,7 +114,8 @@ public class MBassadorTest extends UnitTest {
.create(100, EventingTestBean2.class)
.create(100, EventingTestBean3.class)
.create(100, Object.class)
.create(100, NonListeningBean.class);
.create(100, NonListeningBean.class)
.create(100, MultiEventHandler.class);
List<Object> listeners = listenerFactory.build();
@ -122,14 +124,17 @@ public class MBassadorTest extends UnitTest {
TestEvent event = new TestEvent();
TestEvent subEvent = new SubTestEvent();
TestEvent2 event2 = new TestEvent2();
bus.publishAsync(event);
bus.publishAsync(subEvent);
bus.publishAsync(event2);
pause(2000);
assertEquals(300, event.counter.get());
assertEquals(700, subEvent.counter.get());
assertEquals(500, event.counter.get());
assertEquals(800, subEvent.counter.get());
assertEquals(200, event2.counter.get());
}

View File

@ -0,0 +1,15 @@
package org.mbassy.events;
import java.util.concurrent.atomic.AtomicInteger;
/**
*
*
* @author bennidi
* Date: 11/22/12
*/
public class TestEvent2 {
public AtomicInteger counter = new AtomicInteger();
}

View File

@ -2,10 +2,7 @@ package org.mbassy.listeners;
import org.mbassy.events.SubTestEvent;
import org.mbassy.events.TestEvent;
import org.mbassy.listener.Filter;
import org.mbassy.listener.IMessageFilter;
import org.mbassy.listener.Listener;
import org.mbassy.listener.Mode;
import org.mbassy.listener.*;
/**
* Basic bean that defines some event handlers to be used for different unit testting scenarios
@ -33,7 +30,7 @@ public class EventingTestBean {
@Listener(
priority = 10,
dispatch = Mode.Synchronous,
filters = {@Filter(IMessageFilter.None.class), @Filter(IMessageFilter.All.class)})
filters = {@Filter(Filters.RejectAll.class), @Filter(Filters.AllowAll.class)})
public void handleFiltered(SubTestEvent event) {
event.counter.incrementAndGet();
}

View File

@ -0,0 +1,44 @@
package org.mbassy.listeners;
import org.mbassy.events.SubTestEvent;
import org.mbassy.events.TestEvent;
import org.mbassy.events.TestEvent2;
import org.mbassy.listener.*;
import org.mbassy.subscription.MessageEnvelope;
/**
* Todo: Add javadoc
*
* @author bennidi
* Date: 12/12/12
*/
public class MultiEventHandler {
@Listener(dispatch = Mode.Synchronous)
@Enveloped(messages = {TestEvent.class, TestEvent2.class})
public void handleEvents(MessageEnvelope envelope) {
if(TestEvent.class.isAssignableFrom(envelope.getMessage().getClass())){
TestEvent event = envelope.getMessage();
event.counter.incrementAndGet();
}
if(envelope.getMessage().getClass().equals(TestEvent2.class)){
TestEvent2 event = envelope.getMessage();
event.counter.incrementAndGet();
}
}
@Listener(dispatch = Mode.Synchronous, filters = @Filter(Filters.RejectSubtypes.class))
@Enveloped(messages = {TestEvent.class, TestEvent2.class})
public void handleSuperTypeEvents(MessageEnvelope envelope) {
if(TestEvent.class.isAssignableFrom(envelope.getMessage().getClass())){
TestEvent event = envelope.getMessage();
event.counter.incrementAndGet();
}
if(envelope.getMessage().getClass().equals(TestEvent2.class)){
TestEvent2 event = envelope.getMessage();
event.counter.incrementAndGet();
}
}
}