release of 1.0.4.RC, fixed #8

This commit is contained in:
benni 2012-12-11 12:16:22 +01:00
parent ffb5d01b70
commit 95c5d8e535
16 changed files with 100 additions and 15 deletions

View File

@ -10,7 +10,7 @@ Read this documentation to get an overview of its features and how cool this mes
You can also check out the <a href="http://codeblock.engio.net/?p=37" target="_blank">performance comparison</a>
which also contains a partial list of the features of the compared implementations.
The current version is 1.0.3.RC
The current version is 1.0.4.RC
Table of contents:
+ [Features](#features)

View File

@ -1,7 +1,7 @@
usePlugin('java')
group="org.mbassy"
version="1.0.2.RC"
version="1.0.4.RC"
dependencies {
addMavenRepo()

View File

@ -0,0 +1 @@
2dcb34daf560ac4b9a87a95924aa7be6

View File

@ -0,0 +1 @@
275d892046554f6a86226d813202c5ab8a22cf92

View File

@ -0,0 +1,58 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.mbassy</groupId>
<artifactId>mbassador</artifactId>
<version>1.0.4.RC</version>
<packaging>jar</packaging>
<name>mbassador</name>
<description>Mbassador is a fast and flexible message bus system that follows the publish subscribe pattern
</description>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.build.java.version>1.6</project.build.java.version>
<github.url>file://${project.basedir}/maven</github.url>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
</dependencies>
<distributionManagement>
<repository>
<id>mbassador-github-repo</id>
<url>${github.url}</url>
</repository>
</distributionManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${project.build.java.version}</source>
<target>${project.build.java.version}</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<skipTests>false</skipTests>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1 @@
75dda79a70a43a84877c3d62fdef401d

View File

@ -0,0 +1 @@
ed21bd9587e9ee76e63b31ede3cbb6eedb04fa70

View File

@ -9,7 +9,8 @@
<version>1.0.1.RC</version>
<version>1.0.2.RC</version>
<version>1.0.3.RC</version>
<version>1.0.4.RC</version>
</versions>
<lastUpdated>20121208081117</lastUpdated>
<lastUpdated>20121211111454</lastUpdated>
</versioning>
</metadata>

View File

@ -1 +1 @@
7ef914e14f2998dd2ce1665e2cfb30ba
ea8a55fc0698b7a0ab7811c938529e9c

View File

@ -1 +1 @@
90c7ea303b6a468b004369a2f14691239ae9e8ea
e9eedffd82184cc31168a051bbd9453e426d7e71

View File

@ -4,7 +4,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.mbassy</groupId>
<artifactId>mbassador</artifactId>
<version>1.0.3.RC</version>
<version>1.0.4.RC</version>
<packaging>jar</packaging>
<name>mbassador</name>
<description>Mbassador is a fast and flexible message bus system that follows the publish subscribe pattern

View File

@ -1,9 +1,7 @@
package org.mbassy;
import org.mbassy.common.IPredicate;
import org.mbassy.common.ReflectionUtils;
import org.mbassy.dispatch.MessagingContext;
import org.mbassy.listener.Listener;
import org.mbassy.listener.MetadataReader;
import org.mbassy.subscription.Subscription;
import org.mbassy.subscription.SubscriptionDeliveryRequest;
@ -12,6 +10,7 @@ import org.mbassy.subscription.SubscriptionFactory;
import java.lang.reflect.Method;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* The base class for all message bus implementations.
@ -53,6 +52,10 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
// it can be customized by implementing the getSubscriptionFactory() method
private final SubscriptionFactory subscriptionFactory;
// indicates whether the shutdown method has been invoked
// -> if true, then dispatchers will have been shutdown
private final AtomicBoolean isShutDown = new AtomicBoolean(false);
public AbstractMessageBus(BusConfiguration configuration) {
@ -76,12 +79,13 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
try {
pendingMessages.take().execute();
} catch (InterruptedException e) {
handlePublicationError(new PublicationError(e, "Asynchronous publication interrupted", null, null, null));
Thread.currentThread().interrupt();
return;
}
}
}
});
dispatcher.setDaemon(true); // do not prevent the JVM from exiting
dispatchers.add(dispatcher);
dispatcher.start();
}
@ -114,8 +118,8 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
synchronized (this) { // new subscriptions must be processed sequentially
subscriptionsByListener = subscriptionsPerListener.get(listeningClass);
if (subscriptionsByListener == null) { // double check (a bit ugly but works here)
List<Method> messageHandlers = metadataReader.getListeners(listeningClass); // get all methods with subscriptions
if (messageHandlers.isEmpty()) { // remember the class as non listening class
List<Method> messageHandlers = metadataReader.getMessageHandlers(listeningClass); // get all methods with subscriptions
if (messageHandlers.isEmpty()) { // remember the class as non listening class if no handlers are found
nonListeners.add(listeningClass);
return;
}
@ -160,6 +164,7 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
if (subscriptionsPerMessage.get(messageType) != null) {
subscriptions.addAll(subscriptionsPerMessage.get(messageType));
}
// TODO: get superclasses is eligible for caching
for (Class eventSuperType : ReflectionUtils.getSuperclasses(messageType)) {
if (subscriptionsPerMessage.get(eventSuperType) != null) {
subscriptions.addAll(subscriptionsPerMessage.get(eventSuperType));
@ -171,10 +176,11 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
// associate a suscription with a message type
// NOTE: Not thread-safe! must be synchronized in outer scope
private void addMessageTypeSubscription(Class messageType, Subscription subscription) {
Collection<Subscription> subscriptions = subscriptionsPerMessage.get(messageType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArraySet<Subscription>();
subscriptions = new LinkedList<Subscription>();
subscriptionsPerMessage.put(messageType, subscriptions);
}
subscriptions.add(subscription);
@ -195,8 +201,6 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
return listener.getParameterTypes()[0];
}
public void handlePublicationError(PublicationError error) {
for (IPublicationErrorHandler errorHandler : errorHandlers){
errorHandler.handleError(error);
@ -205,10 +209,20 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
@Override
protected void finalize() throws Throwable {
shutdown();
super.finalize();
}
private void shutdown(){
for (Thread dispatcher : dispatchers) {
dispatcher.interrupt();
}
executor.shutdown();
isShutDown.set(true);
}
public boolean hasPendingMessages(){
return pendingMessages.size() > 0;
}
@Override

View File

@ -104,6 +104,13 @@ public interface IMessageBus<T, P extends IMessageBus.IPostCommand> {
*/
public Executor getExecutor();
/**
* Check whether any asynchronous message publications are pending for being processed
*
* @return
*/
public boolean hasPendingMessages();
/**
* A post command is used as an intermediate object created by a call to the message bus' post method.

View File

@ -57,7 +57,7 @@ public class MetadataReader {
// get all listeners defined by the given class (includes
// listeners defined in super classes)
public List<Method> getListeners(Class<?> target) {
public List<Method> getMessageHandlers(Class<?> target) {
List<Method> allMethods = ReflectionUtils.getMethods(AllMessageHandlers, target);
List<Method> handlers = new LinkedList<Method>();
for(Method listener : allMethods){

View File

@ -10,6 +10,7 @@ import java.util.List;
*/
public class TestUtil {
public static void setup(final IMessageBus bus, final List<Object> listeners, int numberOfThreads) {
Runnable[] setupUnits = new Runnable[numberOfThreads];
int partitionSize;