diff --git a/README.md b/README.md index 4ca5ea1..73f3b31 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ data structure to allow high throughput for concurrent access. Read this documentation to get an overview of its features. You can also check out the performance comparison which also contains a partial list of the features of the compared implementations. -The current version is 1.1.3 and it is available from the Maven Central Repository. See the release notes for more details. +The current version is 1.1.4 and it is available from the Maven Central Repository. See the release notes for more details. Table of contents: + [Features](#features) @@ -33,10 +33,11 @@ This means that listeners will also receivesubtypes of the message type they are + Synchronous and asynchronous message delivery: A handler can be invoked to handle a message either synchronously or asynchronously. This is configurable for each handler via annotations. Message publication itself supports synchronous (method blocks until messages are delivered to all handlers) or asynchronous (fire and forget) dispatch -+ Weak references: MBassador uses weak references to all listening objects to relieve the programmer of the burden to explicitly unregister ++ Weak references: By default, MBassador uses weak references to all listening objects to relieve the programmer of the burden to explicitly unregister listeners that are not used anymore (of course it is also possible to explicitly unregister a listener if needed). This is very comfortable in certain environments where listeners are managed by frameworks, i.e. spring, guice etc. Just stuff everything into the message bus, it will ignore objects without message handlers and automatically clean-up orphaned weak references after the garbage collector has done its job. ++ Strong references: Instead of using weak references, the bus can be configured to use strong references. + Filtering: MBassador offers static message filtering. Filters are configured using annotations and multiple filters can be attached to a single message handler + Message envelopes: Message handlers can declare to receive an enveloped message. The envelope can wrap different @@ -58,7 +59,7 @@ sending messages to your listeners using one of MBassador's publication methods

Usage

-Listener definition (in any bean): +Handler definition (in any bean): // every message of type TestMessage or any subtype will be delivered // to this handler @@ -67,8 +68,15 @@ Listener definition (in any bean): // do something } - // this handler will be invoked concurrently - @Handler(delivery = Mode.Concurrent) + // every message of type TestMessage or any subtype will be delivered + // to this handler + @Handler + public void handleTestMessageStrong(TestMessage message) { + // do something + } + + // this handler will be invoked asynchronously (in a different thread) + @Handler(delivery = Invoke.Asynchronously) public void handleSubTestMessage(SubTestMessage message) { // do something more expensive here } @@ -90,6 +98,15 @@ Listener definition (in any bean): } + // configure a listener to be stored using strong instead of weak references + @Listener(references = References.Strong) + public class MessageListener{ + + // any handler definitions + + } + + Creation of message bus and registration of listeners: // create as many instances as necessary @@ -113,22 +130,8 @@ Message publication: bus.post(subMessage).now(); // same as above

Installation

-Beginning with version 1.1.0 MBassador is available from the Maven Central Repository (Hooray!). Older versions are -still available from the included maven repository in this github repo but will be deleted in the future. -The recommended way of using MBassador in your project is to add the dependency as shown in step two. Step one is only necessary -if you want to use an older version that is not available in the central repository. +Beginning with version 1.1.0 MBassador is available from the Maven Central Repository using the following coordinates: - 1. Add the repository location to your pom.xml -

-    <repositories>
-        <repository>
-            <id>mbassador-github-repo</id>
-            <url>https://raw.github.com/bennidi/mbassador/master/maven </url>
-        </repository>
-    </repositories>
-    
- 2. Add the MBassador dependency to your pom.xml. You can check which versions are available by browsing - the git repository online.

         <dependency>
             <groupId>net.engio</groupId>
@@ -136,7 +139,6 @@ if you want to use an older version that is not available in the central reposit
             <version>1.1.0</version>
         </dependency>
     
- 3. Run mvn clean package to have maven download and install the required version into your local repository Of course you can always clone the repository and build from source. @@ -147,6 +149,18 @@ to avoid confusion and misunderstanding.

Release Notes

+

1.1.4

+ + + Added support for choosing between strong and weak references using the new @Listener annotation. @Listener can be + added to any class that defines message handlers and allows to configure which reference type is used + + Custom handler invocations: It is possible to provide a custom handler invocation for each message handler, see "invocation" + property of @Handler + + Changed packaging to "bundle" to support OSGI environments + + Synchronization of message handlers via @Synchronized: Handlers that are not thread-safe can be synchronized to guarantee + that only one thread at a time can invoke that handler + + Created a message bus implementation that does not use threading to support use in non-multi-threaded environments like GWT, + see ISyncMessageBus +

1.1.3

+ Added support for FilteredMessage event @@ -179,7 +193,7 @@ First stable release! + Fixed behaviour with capacity bound blocking queue such that there now are two methods to schedule a message asynchronously. One will block until capacity becomes available, the other will timeout after a specified amount of time. - + Added unit tests + + Additional unit tests

1.0.5.RC

diff --git a/maven/org/mbassy/mbassador/1.0.0.RC/mbassador-1.0.0.RC.jar b/maven/org/mbassy/mbassador/1.0.0.RC/mbassador-1.0.0.RC.jar deleted file mode 100644 index e9fe134..0000000 Binary files a/maven/org/mbassy/mbassador/1.0.0.RC/mbassador-1.0.0.RC.jar and /dev/null differ diff --git a/maven/org/mbassy/mbassador/1.0.0.RC/mbassador-1.0.0.RC.jar.md5 b/maven/org/mbassy/mbassador/1.0.0.RC/mbassador-1.0.0.RC.jar.md5 deleted file mode 100644 index 4bb0b1e..0000000 --- a/maven/org/mbassy/mbassador/1.0.0.RC/mbassador-1.0.0.RC.jar.md5 +++ /dev/null @@ -1 +0,0 @@ -d53edd50a68307a4c209fd21fa0625d7 \ No newline at end of file diff --git a/maven/org/mbassy/mbassador/1.0.0.RC/mbassador-1.0.0.RC.jar.sha1 b/maven/org/mbassy/mbassador/1.0.0.RC/mbassador-1.0.0.RC.jar.sha1 deleted file mode 100644 index dc96805..0000000 --- a/maven/org/mbassy/mbassador/1.0.0.RC/mbassador-1.0.0.RC.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -f2ed26a15178d83c80212ccce73584e97e7d6fb8 \ No newline at end of file diff --git a/maven/org/mbassy/mbassador/1.0.0.RC/mbassador-1.0.0.RC.pom b/maven/org/mbassy/mbassador/1.0.0.RC/mbassador-1.0.0.RC.pom deleted file mode 100644 index 5826b60..0000000 --- a/maven/org/mbassy/mbassador/1.0.0.RC/mbassador-1.0.0.RC.pom +++ /dev/null @@ -1,58 +0,0 @@ - - - 4.0.0 - org.mbassy - mbassador - 1.0.0.RC - jar - mbassador - Mbassador is a fast and flexible message bus system that follows the publish subscribe pattern - - - - UTF-8 - 1.6 - file://${project.basedir}/maven - - - - - - junit - junit - 4.10 - test - - - - - - - mbassador-github-repo - ${github.url} - - - - - - - org.apache.maven.plugins - maven-compiler-plugin - - ${project.build.java.version} - ${project.build.java.version} - - - - - org.apache.maven.plugins - maven-surefire-plugin - - false - - - - - - diff --git a/maven/org/mbassy/mbassador/1.0.0.RC/mbassador-1.0.0.RC.pom.md5 b/maven/org/mbassy/mbassador/1.0.0.RC/mbassador-1.0.0.RC.pom.md5 deleted file mode 100644 index ce93c9f..0000000 --- a/maven/org/mbassy/mbassador/1.0.0.RC/mbassador-1.0.0.RC.pom.md5 +++ /dev/null @@ -1 +0,0 @@ -760688c8fb33fc90918146e68dab2646 \ No newline at end of file diff --git a/maven/org/mbassy/mbassador/1.0.0.RC/mbassador-1.0.0.RC.pom.sha1 b/maven/org/mbassy/mbassador/1.0.0.RC/mbassador-1.0.0.RC.pom.sha1 deleted file mode 100644 index 0c831da..0000000 --- a/maven/org/mbassy/mbassador/1.0.0.RC/mbassador-1.0.0.RC.pom.sha1 +++ /dev/null @@ -1 +0,0 @@ -0abf762df11d85841f8f8b7f28361ff19549f019 \ No newline at end of file diff --git a/maven/org/mbassy/mbassador/1.0.1.RC/mbassador-1.0.1.RC.jar b/maven/org/mbassy/mbassador/1.0.1.RC/mbassador-1.0.1.RC.jar deleted file mode 100644 index a84d113..0000000 Binary files a/maven/org/mbassy/mbassador/1.0.1.RC/mbassador-1.0.1.RC.jar and /dev/null differ diff --git a/maven/org/mbassy/mbassador/1.0.1.RC/mbassador-1.0.1.RC.jar.md5 b/maven/org/mbassy/mbassador/1.0.1.RC/mbassador-1.0.1.RC.jar.md5 deleted file mode 100644 index c66d198..0000000 --- a/maven/org/mbassy/mbassador/1.0.1.RC/mbassador-1.0.1.RC.jar.md5 +++ /dev/null @@ -1 +0,0 @@ -933328ff6220f5910770b9b5f13f2eeb \ No newline at end of file diff --git a/maven/org/mbassy/mbassador/1.0.1.RC/mbassador-1.0.1.RC.jar.sha1 b/maven/org/mbassy/mbassador/1.0.1.RC/mbassador-1.0.1.RC.jar.sha1 deleted file mode 100644 index ece776c..0000000 --- a/maven/org/mbassy/mbassador/1.0.1.RC/mbassador-1.0.1.RC.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -003aa85106cfbab5ce618ffd1a6fe36cd0e37b71 \ No newline at end of file diff --git a/maven/org/mbassy/mbassador/1.0.1.RC/mbassador-1.0.1.RC.pom b/maven/org/mbassy/mbassador/1.0.1.RC/mbassador-1.0.1.RC.pom deleted file mode 100644 index 90464ef..0000000 --- a/maven/org/mbassy/mbassador/1.0.1.RC/mbassador-1.0.1.RC.pom +++ /dev/null @@ -1,58 +0,0 @@ - - - 4.0.0 - org.mbassy - mbassador - 1.0.1.RC - jar - mbassador - Mbassador is a fast and flexible message bus system that follows the publish subscribe pattern - - - - UTF-8 - 1.6 - file://${project.basedir}/maven - - - - - - junit - junit - 4.10 - test - - - - - - - mbassador-github-repo - ${github.url} - - - - - - - org.apache.maven.plugins - maven-compiler-plugin - - ${project.build.java.version} - ${project.build.java.version} - - - - - org.apache.maven.plugins - maven-surefire-plugin - - false - - - - - - diff --git a/maven/org/mbassy/mbassador/1.0.1.RC/mbassador-1.0.1.RC.pom.md5 b/maven/org/mbassy/mbassador/1.0.1.RC/mbassador-1.0.1.RC.pom.md5 deleted file mode 100644 index d88057e..0000000 --- a/maven/org/mbassy/mbassador/1.0.1.RC/mbassador-1.0.1.RC.pom.md5 +++ /dev/null @@ -1 +0,0 @@ -5959d236fd411d499f3b46ecedc28a84 \ No newline at end of file diff --git a/maven/org/mbassy/mbassador/1.0.1.RC/mbassador-1.0.1.RC.pom.sha1 b/maven/org/mbassy/mbassador/1.0.1.RC/mbassador-1.0.1.RC.pom.sha1 deleted file mode 100644 index 9727a29..0000000 --- a/maven/org/mbassy/mbassador/1.0.1.RC/mbassador-1.0.1.RC.pom.sha1 +++ /dev/null @@ -1 +0,0 @@ -5203632e409974a234ff8ce0ea88c5897a8cbffe \ No newline at end of file diff --git a/maven/org/mbassy/mbassador/1.0.2.RC/mbassador-1.0.2.RC.jar b/maven/org/mbassy/mbassador/1.0.2.RC/mbassador-1.0.2.RC.jar deleted file mode 100644 index e4b8220..0000000 Binary files a/maven/org/mbassy/mbassador/1.0.2.RC/mbassador-1.0.2.RC.jar and /dev/null differ diff --git a/maven/org/mbassy/mbassador/1.0.2.RC/mbassador-1.0.2.RC.jar.md5 b/maven/org/mbassy/mbassador/1.0.2.RC/mbassador-1.0.2.RC.jar.md5 deleted file mode 100644 index 1f9b48d..0000000 --- a/maven/org/mbassy/mbassador/1.0.2.RC/mbassador-1.0.2.RC.jar.md5 +++ /dev/null @@ -1 +0,0 @@ -5ad621adc6addc2292ee1fc7a2bd756d \ No newline at end of file diff --git a/maven/org/mbassy/mbassador/1.0.2.RC/mbassador-1.0.2.RC.jar.sha1 b/maven/org/mbassy/mbassador/1.0.2.RC/mbassador-1.0.2.RC.jar.sha1 deleted file mode 100644 index 201e316..0000000 --- a/maven/org/mbassy/mbassador/1.0.2.RC/mbassador-1.0.2.RC.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -258a17785e15390e868efe2a51d5370aeb602129 \ No newline at end of file diff --git a/maven/org/mbassy/mbassador/1.0.2.RC/mbassador-1.0.2.RC.pom b/maven/org/mbassy/mbassador/1.0.2.RC/mbassador-1.0.2.RC.pom deleted file mode 100644 index 5595a95..0000000 --- a/maven/org/mbassy/mbassador/1.0.2.RC/mbassador-1.0.2.RC.pom +++ /dev/null @@ -1,58 +0,0 @@ - - - 4.0.0 - org.mbassy - mbassador - 1.0.2.RC - jar - mbassador - Mbassador is a fast and flexible message bus system that follows the publish subscribe pattern - - - - UTF-8 - 1.6 - file://${project.basedir}/maven - - - - - - junit - junit - 4.10 - test - - - - - - - mbassador-github-repo - ${github.url} - - - - - - - org.apache.maven.plugins - maven-compiler-plugin - - ${project.build.java.version} - ${project.build.java.version} - - - - - org.apache.maven.plugins - maven-surefire-plugin - - false - - - - - - diff --git a/maven/org/mbassy/mbassador/1.0.2.RC/mbassador-1.0.2.RC.pom.md5 b/maven/org/mbassy/mbassador/1.0.2.RC/mbassador-1.0.2.RC.pom.md5 deleted file mode 100644 index 66edd8c..0000000 --- a/maven/org/mbassy/mbassador/1.0.2.RC/mbassador-1.0.2.RC.pom.md5 +++ /dev/null @@ -1 +0,0 @@ -f7972cec0f8ebde2a2a17a678ef36eb6 \ No newline at end of file diff --git a/maven/org/mbassy/mbassador/1.0.2.RC/mbassador-1.0.2.RC.pom.sha1 b/maven/org/mbassy/mbassador/1.0.2.RC/mbassador-1.0.2.RC.pom.sha1 deleted file mode 100644 index 607a740..0000000 --- a/maven/org/mbassy/mbassador/1.0.2.RC/mbassador-1.0.2.RC.pom.sha1 +++ /dev/null @@ -1 +0,0 @@ -f8c24fe42f192dbdd1f5d2b29bff0220d40039dd \ No newline at end of file diff --git a/maven/org/mbassy/mbassador/1.0.3.RC/mbassador-1.0.3.RC.jar b/maven/org/mbassy/mbassador/1.0.3.RC/mbassador-1.0.3.RC.jar deleted file mode 100644 index 1603cba..0000000 Binary files a/maven/org/mbassy/mbassador/1.0.3.RC/mbassador-1.0.3.RC.jar and /dev/null differ diff --git a/maven/org/mbassy/mbassador/1.0.3.RC/mbassador-1.0.3.RC.jar.md5 b/maven/org/mbassy/mbassador/1.0.3.RC/mbassador-1.0.3.RC.jar.md5 deleted file mode 100644 index 74e0f26..0000000 --- a/maven/org/mbassy/mbassador/1.0.3.RC/mbassador-1.0.3.RC.jar.md5 +++ /dev/null @@ -1 +0,0 @@ -2bcfd233f83cb6fc4241c91a99f74030 \ No newline at end of file diff --git a/maven/org/mbassy/mbassador/1.0.3.RC/mbassador-1.0.3.RC.jar.sha1 b/maven/org/mbassy/mbassador/1.0.3.RC/mbassador-1.0.3.RC.jar.sha1 deleted file mode 100644 index 37363fa..0000000 --- a/maven/org/mbassy/mbassador/1.0.3.RC/mbassador-1.0.3.RC.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -20df67988104be77a9c3e2edbb8ba615a664671b \ No newline at end of file diff --git a/maven/org/mbassy/mbassador/1.0.3.RC/mbassador-1.0.3.RC.pom b/maven/org/mbassy/mbassador/1.0.3.RC/mbassador-1.0.3.RC.pom deleted file mode 100644 index 672013e..0000000 --- a/maven/org/mbassy/mbassador/1.0.3.RC/mbassador-1.0.3.RC.pom +++ /dev/null @@ -1,58 +0,0 @@ - - - 4.0.0 - org.mbassy - mbassador - 1.0.3.RC - jar - mbassador - Mbassador is a fast and flexible message bus system that follows the publish subscribe pattern - - - - UTF-8 - 1.6 - file://${project.basedir}/maven - - - - - - junit - junit - 4.10 - test - - - - - - - mbassador-github-repo - ${github.url} - - - - - - - org.apache.maven.plugins - maven-compiler-plugin - - ${project.build.java.version} - ${project.build.java.version} - - - - - org.apache.maven.plugins - maven-surefire-plugin - - false - - - - - - diff --git a/maven/org/mbassy/mbassador/1.0.3.RC/mbassador-1.0.3.RC.pom.md5 b/maven/org/mbassy/mbassador/1.0.3.RC/mbassador-1.0.3.RC.pom.md5 deleted file mode 100644 index eb940fa..0000000 --- a/maven/org/mbassy/mbassador/1.0.3.RC/mbassador-1.0.3.RC.pom.md5 +++ /dev/null @@ -1 +0,0 @@ -d766622aea3238ee52a60920313274d1 \ No newline at end of file diff --git a/maven/org/mbassy/mbassador/1.0.3.RC/mbassador-1.0.3.RC.pom.sha1 b/maven/org/mbassy/mbassador/1.0.3.RC/mbassador-1.0.3.RC.pom.sha1 deleted file mode 100644 index 2eb3bea..0000000 --- a/maven/org/mbassy/mbassador/1.0.3.RC/mbassador-1.0.3.RC.pom.sha1 +++ /dev/null @@ -1 +0,0 @@ -55c27ec10ee4067843f05fb735a2af95bc2c06cf \ No newline at end of file diff --git a/maven/org/mbassy/mbassador/1.0.4.RC/mbassador-1.0.4.RC.jar b/maven/org/mbassy/mbassador/1.0.4.RC/mbassador-1.0.4.RC.jar deleted file mode 100644 index 15d5a54..0000000 Binary files a/maven/org/mbassy/mbassador/1.0.4.RC/mbassador-1.0.4.RC.jar and /dev/null differ diff --git a/maven/org/mbassy/mbassador/1.0.4.RC/mbassador-1.0.4.RC.jar.md5 b/maven/org/mbassy/mbassador/1.0.4.RC/mbassador-1.0.4.RC.jar.md5 deleted file mode 100644 index 7e766e5..0000000 --- a/maven/org/mbassy/mbassador/1.0.4.RC/mbassador-1.0.4.RC.jar.md5 +++ /dev/null @@ -1 +0,0 @@ -2dcb34daf560ac4b9a87a95924aa7be6 \ No newline at end of file diff --git a/maven/org/mbassy/mbassador/1.0.4.RC/mbassador-1.0.4.RC.jar.sha1 b/maven/org/mbassy/mbassador/1.0.4.RC/mbassador-1.0.4.RC.jar.sha1 deleted file mode 100644 index 0149eeb..0000000 --- a/maven/org/mbassy/mbassador/1.0.4.RC/mbassador-1.0.4.RC.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -275d892046554f6a86226d813202c5ab8a22cf92 \ No newline at end of file diff --git a/maven/org/mbassy/mbassador/1.0.4.RC/mbassador-1.0.4.RC.pom b/maven/org/mbassy/mbassador/1.0.4.RC/mbassador-1.0.4.RC.pom deleted file mode 100644 index f53981a..0000000 --- a/maven/org/mbassy/mbassador/1.0.4.RC/mbassador-1.0.4.RC.pom +++ /dev/null @@ -1,58 +0,0 @@ - - - 4.0.0 - org.mbassy - mbassador - 1.0.4.RC - jar - mbassador - Mbassador is a fast and flexible message bus system that follows the publish subscribe pattern - - - - UTF-8 - 1.6 - file://${project.basedir}/maven - - - - - - junit - junit - 4.10 - test - - - - - - - mbassador-github-repo - ${github.url} - - - - - - - org.apache.maven.plugins - maven-compiler-plugin - - ${project.build.java.version} - ${project.build.java.version} - - - - - org.apache.maven.plugins - maven-surefire-plugin - - false - - - - - - diff --git a/maven/org/mbassy/mbassador/1.0.4.RC/mbassador-1.0.4.RC.pom.md5 b/maven/org/mbassy/mbassador/1.0.4.RC/mbassador-1.0.4.RC.pom.md5 deleted file mode 100644 index 7bbab14..0000000 --- a/maven/org/mbassy/mbassador/1.0.4.RC/mbassador-1.0.4.RC.pom.md5 +++ /dev/null @@ -1 +0,0 @@ -75dda79a70a43a84877c3d62fdef401d \ No newline at end of file diff --git a/maven/org/mbassy/mbassador/1.0.4.RC/mbassador-1.0.4.RC.pom.sha1 b/maven/org/mbassy/mbassador/1.0.4.RC/mbassador-1.0.4.RC.pom.sha1 deleted file mode 100644 index ae3f6d3..0000000 --- a/maven/org/mbassy/mbassador/1.0.4.RC/mbassador-1.0.4.RC.pom.sha1 +++ /dev/null @@ -1 +0,0 @@ -ed21bd9587e9ee76e63b31ede3cbb6eedb04fa70 \ No newline at end of file diff --git a/maven/org/mbassy/mbassador/1.0.5.RC/mbassador-1.0.5.RC.jar b/maven/org/mbassy/mbassador/1.0.5.RC/mbassador-1.0.5.RC.jar deleted file mode 100644 index d583821..0000000 Binary files a/maven/org/mbassy/mbassador/1.0.5.RC/mbassador-1.0.5.RC.jar and /dev/null differ diff --git a/maven/org/mbassy/mbassador/1.0.5.RC/mbassador-1.0.5.RC.jar.md5 b/maven/org/mbassy/mbassador/1.0.5.RC/mbassador-1.0.5.RC.jar.md5 deleted file mode 100644 index 73b5650..0000000 --- a/maven/org/mbassy/mbassador/1.0.5.RC/mbassador-1.0.5.RC.jar.md5 +++ /dev/null @@ -1 +0,0 @@ -3ad14134e9752e3a073c75ab296427ef \ No newline at end of file diff --git a/maven/org/mbassy/mbassador/1.0.5.RC/mbassador-1.0.5.RC.jar.sha1 b/maven/org/mbassy/mbassador/1.0.5.RC/mbassador-1.0.5.RC.jar.sha1 deleted file mode 100644 index ca57baf..0000000 --- a/maven/org/mbassy/mbassador/1.0.5.RC/mbassador-1.0.5.RC.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -bedef44bb92cbfafcba48624c91e185d692ea39d \ No newline at end of file diff --git a/maven/org/mbassy/mbassador/1.0.5.RC/mbassador-1.0.5.RC.pom b/maven/org/mbassy/mbassador/1.0.5.RC/mbassador-1.0.5.RC.pom deleted file mode 100644 index 12d88d5..0000000 --- a/maven/org/mbassy/mbassador/1.0.5.RC/mbassador-1.0.5.RC.pom +++ /dev/null @@ -1,58 +0,0 @@ - - - 4.0.0 - org.mbassy - mbassador - 1.0.5.RC - jar - mbassador - Mbassador is a fast and flexible message bus system that follows the publish subscribe pattern - - - - UTF-8 - 1.6 - file://${project.basedir}/maven - - - - - - junit - junit - 4.10 - test - - - - - - - mbassador-github-repo - ${github.url} - - - - - - - org.apache.maven.plugins - maven-compiler-plugin - - ${project.build.java.version} - ${project.build.java.version} - - - - - org.apache.maven.plugins - maven-surefire-plugin - - false - - - - - - diff --git a/maven/org/mbassy/mbassador/1.0.5.RC/mbassador-1.0.5.RC.pom.md5 b/maven/org/mbassy/mbassador/1.0.5.RC/mbassador-1.0.5.RC.pom.md5 deleted file mode 100644 index aabc0b6..0000000 --- a/maven/org/mbassy/mbassador/1.0.5.RC/mbassador-1.0.5.RC.pom.md5 +++ /dev/null @@ -1 +0,0 @@ -0a781a5e9f22e5dafeb7f48fff65e46d \ No newline at end of file diff --git a/maven/org/mbassy/mbassador/1.0.5.RC/mbassador-1.0.5.RC.pom.sha1 b/maven/org/mbassy/mbassador/1.0.5.RC/mbassador-1.0.5.RC.pom.sha1 deleted file mode 100644 index 121ebfa..0000000 --- a/maven/org/mbassy/mbassador/1.0.5.RC/mbassador-1.0.5.RC.pom.sha1 +++ /dev/null @@ -1 +0,0 @@ -4378b82fa04f4c31f21321424d0c4c328905d3ad \ No newline at end of file diff --git a/maven/org/mbassy/mbassador/1.0.6.RC/mbassador-1.0.6.RC.jar b/maven/org/mbassy/mbassador/1.0.6.RC/mbassador-1.0.6.RC.jar deleted file mode 100644 index 8554bc9..0000000 Binary files a/maven/org/mbassy/mbassador/1.0.6.RC/mbassador-1.0.6.RC.jar and /dev/null differ diff --git a/maven/org/mbassy/mbassador/1.0.6.RC/mbassador-1.0.6.RC.jar.md5 b/maven/org/mbassy/mbassador/1.0.6.RC/mbassador-1.0.6.RC.jar.md5 deleted file mode 100644 index 1487a66..0000000 --- a/maven/org/mbassy/mbassador/1.0.6.RC/mbassador-1.0.6.RC.jar.md5 +++ /dev/null @@ -1 +0,0 @@ -dac16b8c129ee38d08e63d4cd487bdc9 \ No newline at end of file diff --git a/maven/org/mbassy/mbassador/1.0.6.RC/mbassador-1.0.6.RC.jar.sha1 b/maven/org/mbassy/mbassador/1.0.6.RC/mbassador-1.0.6.RC.jar.sha1 deleted file mode 100644 index de87fa1..0000000 --- a/maven/org/mbassy/mbassador/1.0.6.RC/mbassador-1.0.6.RC.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -110fd15551d0a40fafd46cc4e66c590e4d1b2fc7 \ No newline at end of file diff --git a/maven/org/mbassy/mbassador/1.0.6.RC/mbassador-1.0.6.RC.pom b/maven/org/mbassy/mbassador/1.0.6.RC/mbassador-1.0.6.RC.pom deleted file mode 100644 index 946afa5..0000000 --- a/maven/org/mbassy/mbassador/1.0.6.RC/mbassador-1.0.6.RC.pom +++ /dev/null @@ -1,58 +0,0 @@ - - - 4.0.0 - org.mbassy - mbassador - 1.0.6.RC - jar - mbassador - Mbassador is a fast and flexible message bus system that follows the publish subscribe pattern - - - - UTF-8 - 1.6 - file://${project.basedir}/maven - - - - - - junit - junit - 4.10 - test - - - - - - - mbassador-github-repo - ${github.url} - - - - - - - org.apache.maven.plugins - maven-compiler-plugin - - ${project.build.java.version} - ${project.build.java.version} - - - - - org.apache.maven.plugins - maven-surefire-plugin - - false - - - - - - diff --git a/maven/org/mbassy/mbassador/1.0.6.RC/mbassador-1.0.6.RC.pom.md5 b/maven/org/mbassy/mbassador/1.0.6.RC/mbassador-1.0.6.RC.pom.md5 deleted file mode 100644 index ab78db3..0000000 --- a/maven/org/mbassy/mbassador/1.0.6.RC/mbassador-1.0.6.RC.pom.md5 +++ /dev/null @@ -1 +0,0 @@ -ebf2e22bfe53d858092befaa865d0bf4 \ No newline at end of file diff --git a/maven/org/mbassy/mbassador/1.0.6.RC/mbassador-1.0.6.RC.pom.sha1 b/maven/org/mbassy/mbassador/1.0.6.RC/mbassador-1.0.6.RC.pom.sha1 deleted file mode 100644 index 2d480c2..0000000 --- a/maven/org/mbassy/mbassador/1.0.6.RC/mbassador-1.0.6.RC.pom.sha1 +++ /dev/null @@ -1 +0,0 @@ -950ca7e831a9060060f943ca02d313f7c7f5ccbf \ No newline at end of file diff --git a/maven/org/mbassy/mbassador/1.0.7.RC-SNAPSHOT/maven-metadata.xml b/maven/org/mbassy/mbassador/1.0.7.RC-SNAPSHOT/maven-metadata.xml deleted file mode 100644 index 0ac50ee..0000000 --- a/maven/org/mbassy/mbassador/1.0.7.RC-SNAPSHOT/maven-metadata.xml +++ /dev/null @@ -1,13 +0,0 @@ - - - org.mbassy - mbassador - 1.0.7.RC-SNAPSHOT - - - 20121229.133808 - 1 - - 20121229133808 - - diff --git a/maven/org/mbassy/mbassador/1.0.7.RC-SNAPSHOT/maven-metadata.xml.md5 b/maven/org/mbassy/mbassador/1.0.7.RC-SNAPSHOT/maven-metadata.xml.md5 deleted file mode 100644 index 9ff2df9..0000000 --- a/maven/org/mbassy/mbassador/1.0.7.RC-SNAPSHOT/maven-metadata.xml.md5 +++ /dev/null @@ -1 +0,0 @@ -732608242394f6b316a7cf631208b635 \ No newline at end of file diff --git a/maven/org/mbassy/mbassador/1.0.7.RC-SNAPSHOT/maven-metadata.xml.sha1 b/maven/org/mbassy/mbassador/1.0.7.RC-SNAPSHOT/maven-metadata.xml.sha1 deleted file mode 100644 index e8163e1..0000000 --- a/maven/org/mbassy/mbassador/1.0.7.RC-SNAPSHOT/maven-metadata.xml.sha1 +++ /dev/null @@ -1 +0,0 @@ -2cae8933661f8cf18afc2c923436c2457e0b3809 \ No newline at end of file diff --git a/maven/org/mbassy/mbassador/1.0.7.RC-SNAPSHOT/mbassador-1.0.7.RC-20121229.133808-1-javadoc.jar b/maven/org/mbassy/mbassador/1.0.7.RC-SNAPSHOT/mbassador-1.0.7.RC-20121229.133808-1-javadoc.jar deleted file mode 100644 index 5bdb1dd..0000000 Binary files a/maven/org/mbassy/mbassador/1.0.7.RC-SNAPSHOT/mbassador-1.0.7.RC-20121229.133808-1-javadoc.jar and /dev/null differ diff --git a/maven/org/mbassy/mbassador/1.0.7.RC-SNAPSHOT/mbassador-1.0.7.RC-20121229.133808-1-javadoc.jar.md5 b/maven/org/mbassy/mbassador/1.0.7.RC-SNAPSHOT/mbassador-1.0.7.RC-20121229.133808-1-javadoc.jar.md5 deleted file mode 100644 index 04d9196..0000000 --- a/maven/org/mbassy/mbassador/1.0.7.RC-SNAPSHOT/mbassador-1.0.7.RC-20121229.133808-1-javadoc.jar.md5 +++ /dev/null @@ -1 +0,0 @@ -d20f34dc532b790786a74566598e5e68 \ No newline at end of file diff --git a/maven/org/mbassy/mbassador/1.0.7.RC-SNAPSHOT/mbassador-1.0.7.RC-20121229.133808-1-javadoc.jar.sha1 b/maven/org/mbassy/mbassador/1.0.7.RC-SNAPSHOT/mbassador-1.0.7.RC-20121229.133808-1-javadoc.jar.sha1 deleted file mode 100644 index d32ad4b..0000000 --- a/maven/org/mbassy/mbassador/1.0.7.RC-SNAPSHOT/mbassador-1.0.7.RC-20121229.133808-1-javadoc.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -13de5221308cf546ff55aa6b5a35ec1f78ecf691 \ No newline at end of file diff --git a/maven/org/mbassy/mbassador/1.0.7.RC-SNAPSHOT/mbassador-1.0.7.RC-20121229.133808-1-sources.jar b/maven/org/mbassy/mbassador/1.0.7.RC-SNAPSHOT/mbassador-1.0.7.RC-20121229.133808-1-sources.jar deleted file mode 100644 index 9562a81..0000000 Binary files a/maven/org/mbassy/mbassador/1.0.7.RC-SNAPSHOT/mbassador-1.0.7.RC-20121229.133808-1-sources.jar and /dev/null differ diff --git a/maven/org/mbassy/mbassador/1.0.7.RC-SNAPSHOT/mbassador-1.0.7.RC-20121229.133808-1-sources.jar.md5 b/maven/org/mbassy/mbassador/1.0.7.RC-SNAPSHOT/mbassador-1.0.7.RC-20121229.133808-1-sources.jar.md5 deleted file mode 100644 index 0ae4ace..0000000 --- a/maven/org/mbassy/mbassador/1.0.7.RC-SNAPSHOT/mbassador-1.0.7.RC-20121229.133808-1-sources.jar.md5 +++ /dev/null @@ -1 +0,0 @@ -ec63e6aa1e908e4036cca782675742df \ No newline at end of file diff --git a/maven/org/mbassy/mbassador/1.0.7.RC-SNAPSHOT/mbassador-1.0.7.RC-20121229.133808-1-sources.jar.sha1 b/maven/org/mbassy/mbassador/1.0.7.RC-SNAPSHOT/mbassador-1.0.7.RC-20121229.133808-1-sources.jar.sha1 deleted file mode 100644 index 449d649..0000000 --- a/maven/org/mbassy/mbassador/1.0.7.RC-SNAPSHOT/mbassador-1.0.7.RC-20121229.133808-1-sources.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -79eb0716f839f2b63cf5d3d9796800967d54fdb5 \ No newline at end of file diff --git a/maven/org/mbassy/mbassador/1.0.7.RC-SNAPSHOT/mbassador-1.0.7.RC-20121229.133808-1.jar b/maven/org/mbassy/mbassador/1.0.7.RC-SNAPSHOT/mbassador-1.0.7.RC-20121229.133808-1.jar deleted file mode 100644 index c8ce605..0000000 Binary files a/maven/org/mbassy/mbassador/1.0.7.RC-SNAPSHOT/mbassador-1.0.7.RC-20121229.133808-1.jar and /dev/null differ diff --git a/maven/org/mbassy/mbassador/1.0.7.RC-SNAPSHOT/mbassador-1.0.7.RC-20121229.133808-1.jar.md5 b/maven/org/mbassy/mbassador/1.0.7.RC-SNAPSHOT/mbassador-1.0.7.RC-20121229.133808-1.jar.md5 deleted file mode 100644 index e0d2a74..0000000 --- a/maven/org/mbassy/mbassador/1.0.7.RC-SNAPSHOT/mbassador-1.0.7.RC-20121229.133808-1.jar.md5 +++ /dev/null @@ -1 +0,0 @@ -21ac5231ee676ba2461e31392e210b54 \ No newline at end of file diff --git a/maven/org/mbassy/mbassador/1.0.7.RC-SNAPSHOT/mbassador-1.0.7.RC-20121229.133808-1.jar.sha1 b/maven/org/mbassy/mbassador/1.0.7.RC-SNAPSHOT/mbassador-1.0.7.RC-20121229.133808-1.jar.sha1 deleted file mode 100644 index f9d3314..0000000 --- a/maven/org/mbassy/mbassador/1.0.7.RC-SNAPSHOT/mbassador-1.0.7.RC-20121229.133808-1.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -9cfcc0fe9534d2a530b48445331d451b97ca3342 \ No newline at end of file diff --git a/maven/org/mbassy/mbassador/1.0.7.RC-SNAPSHOT/mbassador-1.0.7.RC-20121229.133808-1.pom b/maven/org/mbassy/mbassador/1.0.7.RC-SNAPSHOT/mbassador-1.0.7.RC-20121229.133808-1.pom deleted file mode 100644 index 5761d5e..0000000 --- a/maven/org/mbassy/mbassador/1.0.7.RC-SNAPSHOT/mbassador-1.0.7.RC-20121229.133808-1.pom +++ /dev/null @@ -1,152 +0,0 @@ - - - 4.0.0 - org.mbassy - mbassador - 1.0.7.RC-SNAPSHOT - jar - mbassador - - Mbassador is a fast and flexible message bus system following the publish subscribe pattern. - It is designed for ease of use and aims to be feature rich and extensible - while preserving resource efficiency and performance. - - It features: - declarative listener definition via annotations, - sync and/or async message delivery, - weak-references, - message filtering, - ordering of message handlers etc. - - - - https://github.com/bennidi/mbassador - - - MIT license - http://www.opensource.org/licenses/mit-license.php - - - - scm:git:git@github.com:bennidi/mbassador.git - scm:git:git@github.com:bennidi/mbassador.git - - - - - bennidi - Benjamin Diedrichsen - +1 - - - - - UTF-8 - 1.6 - file://${project.basedir}/maven - - - - - - junit - junit - 4.10 - test - - - - - - - mbassador-github-repo - ${github.url} - - - - - - - org.apache.maven.plugins - maven-compiler-plugin - - ${project.build.java.version} - ${project.build.java.version} - - - - - org.apache.maven.plugins - maven-release-plugin - 2.4 - - - - org.apache.maven.plugins - maven-surefire-plugin - - false - - - - - - org.apache.maven.plugins - maven-source-plugin - - - attach-sources - - jar - - - - - - - org.apache.maven.plugins - maven-javadoc-plugin - - - attach-javadocs - - jar - - - - - - - - - - - release-sign-artifacts - - - performRelease - true - - - - - - org.apache.maven.plugins - maven-gpg-plugin - 1.4 - - - sign-artifacts - verify - - sign - - - - - - - - - diff --git a/maven/org/mbassy/mbassador/1.0.7.RC-SNAPSHOT/mbassador-1.0.7.RC-20121229.133808-1.pom.md5 b/maven/org/mbassy/mbassador/1.0.7.RC-SNAPSHOT/mbassador-1.0.7.RC-20121229.133808-1.pom.md5 deleted file mode 100644 index d4d779a..0000000 --- a/maven/org/mbassy/mbassador/1.0.7.RC-SNAPSHOT/mbassador-1.0.7.RC-20121229.133808-1.pom.md5 +++ /dev/null @@ -1 +0,0 @@ -05e16f4d46db6bc39529ebe889ff552d \ No newline at end of file diff --git a/maven/org/mbassy/mbassador/1.0.7.RC-SNAPSHOT/mbassador-1.0.7.RC-20121229.133808-1.pom.sha1 b/maven/org/mbassy/mbassador/1.0.7.RC-SNAPSHOT/mbassador-1.0.7.RC-20121229.133808-1.pom.sha1 deleted file mode 100644 index 7393001..0000000 --- a/maven/org/mbassy/mbassador/1.0.7.RC-SNAPSHOT/mbassador-1.0.7.RC-20121229.133808-1.pom.sha1 +++ /dev/null @@ -1 +0,0 @@ -5e078c7f168816392ca34e41cbc8f58c81b8c5d9 \ No newline at end of file diff --git a/maven/org/mbassy/mbassador/maven-metadata.xml b/maven/org/mbassy/mbassador/maven-metadata.xml deleted file mode 100644 index 08b6b73..0000000 --- a/maven/org/mbassy/mbassador/maven-metadata.xml +++ /dev/null @@ -1,19 +0,0 @@ - - - org.mbassy - mbassador - 1.0.0.RC - - - 1.0.0.RC - 1.0.1.RC - 1.0.2.RC - 1.0.3.RC - 1.0.4.RC - 1.0.5.RC - 1.0.6.RC - 1.0.7.RC-SNAPSHOT - - 20121229133808 - - diff --git a/maven/org/mbassy/mbassador/maven-metadata.xml.md5 b/maven/org/mbassy/mbassador/maven-metadata.xml.md5 deleted file mode 100644 index 8158d92..0000000 --- a/maven/org/mbassy/mbassador/maven-metadata.xml.md5 +++ /dev/null @@ -1 +0,0 @@ -46231d9bfb55280deeea69a9e18ed52b \ No newline at end of file diff --git a/maven/org/mbassy/mbassador/maven-metadata.xml.sha1 b/maven/org/mbassy/mbassador/maven-metadata.xml.sha1 deleted file mode 100644 index e152e77..0000000 --- a/maven/org/mbassy/mbassador/maven-metadata.xml.sha1 +++ /dev/null @@ -1 +0,0 @@ -3608eef0e3183c20460c8b1839c6a3d5740dd63e \ No newline at end of file diff --git a/src/docs/wiki-bus-overview.md b/src/docs/wiki-bus-overview.md index 6b34ee8..81b0eed 100644 --- a/src/docs/wiki-bus-overview.md +++ b/src/docs/wiki-bus-overview.md @@ -18,9 +18,10 @@ The basic contract of the bus is that it will deliver a specific message exactly Currently, message handlers will be invoked in inverse sequence of subscription but any client using this bus should not rely on this assumption. -The bus uses weak references to all listeners such that registered listeners do not need to +By default, the bus uses weak references to all listeners such that registered listeners do not need to be explicitly unregistered to be eligible for garbage collection. Dead (garbage collected) listeners are -removed on-the-fly as messages get published. +removed on-the-fly as messages get published. It is possible to enable the use of strong references on the message handler +level. Unsubscribing a listener means removing all subscribed message handlers of that listener. This remove operation immediately effects all running publications processes -> A removed listener will under no circumstances receive any message publications. diff --git a/src/docs/wiki-listener-def.md b/src/docs/wiki-listener-def.md index 1bde8d0..b047af0 100644 --- a/src/docs/wiki-listener-def.md +++ b/src/docs/wiki-listener-def.md @@ -10,10 +10,17 @@ filters, delivery modes etc. delivery - Message delivery can either run sequentially(i.e. one listener at a time) or concurrently - (i.e. multiple threads are used to deliver the same message to different listeners). - Note:The number of parallel threads is configurable per instance using the BusConfiguration - Sequential + Message handler invocation can be configured to run + + Note: Use @Synchronized if your handler does not allow multiple, concurrent message publications, i.e. + handlers that are not thread-safe but are used in a multi-threaded environment where asynchronous message publication + is possible. + + Synchronously @@ -42,6 +49,21 @@ filters, delivery modes etc. true + + strongReferencess + Whether the bus should use storng references to the listeners instead of weak references + + false + + + invocation + Specify a custom implementation for the handler invocation. By default, a generic implementation + that uses reflection will be used. Note: A custom implementation will not be faster than the generic one + since there are heavy optimizations by the JVM using JIT-Compiler and more. + + false + + @@ -59,22 +81,22 @@ receive all messages of type TestEvent or any subtype sequentially. -This handler will receive all messages of type SubTestEvent or any subtype concurrently +This handler will receive all messages of type SubTestEvent or any subtype - // this handler will be invoked concurrently - @Handler(delivery = Mode.Concurrent) + // handler invocation will occur in a different thread + @Handler(delivery = Invoke.Asynchronously) public void handleSubTestEvent(SubTestEvent event) { // do something more expensive here } -This handler will receive all messages of type SubTestEvent or any subtype sequentially, +This handler will receive all messages of type SubTestEvent or any subtype, given that they pass the specified filters. This handler will be invoked before the formerly defined one, since it specifies a higher priority // this handler will receive messages of type SubTestEvent // or any of its sub types that passe the given filter(s) @Handler(priority = 10, - dispatch = Mode.Synchronous, + dispatch = Invoke.Synchronously, filters = {@Filter(Filters.SpecialEvent.class)}) public void handleFiltered(SubTestEvent event) { //do something special here @@ -112,8 +134,8 @@ Message handler inheritance corresponds to inheritance of methods as defined in A subclass of any class that defines message handlers will inherit these handler and their configuration. It is possible to change (override) the configuration simply by overriding the super class' method and specifying a different configuration. This way, it is also possible to deactivate a message handler of -a super class by using the "enabled" property on the overridden method. -If a class overrides a method that is configured as a message handler in one of its super classes +a super class by setting the "enabled" property to "false" on the overridden method. +If a class overrides a method that is already configured as a message handler it is still considered a message handler but of course the implementation of the overriding class will be used. diff --git a/src/main/java/net/engio/mbassy/MessageBusException.java b/src/main/java/net/engio/mbassy/MessageBusException.java new file mode 100644 index 0000000..7fcde70 --- /dev/null +++ b/src/main/java/net/engio/mbassy/MessageBusException.java @@ -0,0 +1,29 @@ +package net.engio.mbassy; + +/** + * Todo: Add javadoc + * + * @author bennidi + * Date: 3/29/13 + */ +public class MessageBusException extends Exception{ + + public MessageBusException() { + } + + public MessageBusException(String message) { + super(message); + } + + public MessageBusException(String message, Throwable cause) { + super(message, cause); + } + + public MessageBusException(Throwable cause) { + super(cause); + } + + public MessageBusException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/src/main/java/net/engio/mbassy/bus/AbstractSyncAsyncMessageBus.java b/src/main/java/net/engio/mbassy/bus/AbstractSyncAsyncMessageBus.java new file mode 100644 index 0000000..bacfddc --- /dev/null +++ b/src/main/java/net/engio/mbassy/bus/AbstractSyncAsyncMessageBus.java @@ -0,0 +1,100 @@ +package net.engio.mbassy.bus; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.*; + +/** + * The base class for all message bus implementations. + * + * @param + * @param

+ */ +public abstract class AbstractSyncAsyncMessageBus extends AbstractSyncMessageBus implements IMessageBus { + + // executor for asynchronous message handlers + private final ExecutorService executor; + + // all threads that are available for asynchronous message dispatching + private final List dispatchers; + + // all pending messages scheduled for asynchronous dispatch are queued here + private final BlockingQueue pendingMessages; + + public AbstractSyncAsyncMessageBus(BusConfiguration configuration) { + super(configuration); + this.executor = configuration.getExecutor(); + pendingMessages = new LinkedBlockingQueue(configuration.getMaximumNumberOfPendingMessages()); + dispatchers = new ArrayList(configuration.getNumberOfMessageDispatchers()); + initDispatcherThreads(configuration.getNumberOfMessageDispatchers()); + } + + + // initialize the dispatch workers + private void initDispatcherThreads(int numberOfThreads) { + for (int i = 0; i < numberOfThreads; i++) { + // each thread will run forever and process incoming + //dispatch requests + Thread dispatcher = new Thread(new Runnable() { + public void run() { + while (true) { + try { + pendingMessages.take().execute(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + } + } + }); + dispatcher.setDaemon(true); // do not prevent the JVM from exiting + dispatchers.add(dispatcher); + dispatcher.start(); + } + } + + + // this method enqueues a message delivery request + protected MessagePublication addAsynchronousDeliveryRequest(MessagePublication request) { + try { + pendingMessages.put(request); + return request.markScheduled(); + } catch (InterruptedException e) { + return request.setError(); + } + } + + // this method enqueues a message delivery request + protected MessagePublication addAsynchronousDeliveryRequest(MessagePublication request, long timeout, TimeUnit unit) { + try { + return pendingMessages.offer(request, timeout, unit) + ? request.markScheduled() + : request.setError(); + } catch (InterruptedException e) { + return request.setError(); + } + } + + @Override + protected void finalize() throws Throwable { + shutdown(); + super.finalize(); + } + + public void shutdown() { + for (Thread dispatcher : dispatchers) { + dispatcher.interrupt(); + } + executor.shutdown(); + } + + public boolean hasPendingMessages() { + return pendingMessages.size() > 0; + } + + @Override + public Executor getExecutor() { + return executor; + } + +} diff --git a/src/main/java/net/engio/mbassy/bus/AbstractMessageBus.java b/src/main/java/net/engio/mbassy/bus/AbstractSyncMessageBus.java similarity index 68% rename from src/main/java/net/engio/mbassy/bus/AbstractMessageBus.java rename to src/main/java/net/engio/mbassy/bus/AbstractSyncMessageBus.java index 7a0e439..f5a2cf1 100644 --- a/src/main/java/net/engio/mbassy/bus/AbstractMessageBus.java +++ b/src/main/java/net/engio/mbassy/bus/AbstractSyncMessageBus.java @@ -9,22 +9,8 @@ import net.engio.mbassy.subscription.Subscription; import net.engio.mbassy.subscription.SubscriptionContext; import net.engio.mbassy.subscription.SubscriptionFactory; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeSet; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; +import java.util.*; +import java.util.concurrent.*; /** * The base class for all message bus implementations. @@ -32,10 +18,8 @@ import java.util.concurrent.TimeUnit; * @param * @param

*/ -public abstract class AbstractMessageBus implements IMessageBus { +public abstract class AbstractSyncMessageBus implements ISyncMessageBus { - // executor for asynchronous listeners using unbound queuing strategy to ensure that no events get lost - private final ExecutorService executor; // the metadata reader that is used to parse objects passed to the subscribe method private final MetadataReader metadataReader; @@ -58,12 +42,6 @@ public abstract class AbstractMessageBus // this handler will receive all errors that occur during message dispatch or message handling private final List errorHandlers = new CopyOnWriteArrayList(); - // all threads that are available for asynchronous message dispatching - private final List dispatchers = new CopyOnWriteArrayList(); - - // all pending messages scheduled for asynchronous dispatch are queued here - private final BlockingQueue pendingMessages; - // this factory is used to create specialized subscriptions based on the given message handler configuration // it can be customized by implementing the getSubscriptionFactory() method private final SubscriptionFactory subscriptionFactory; @@ -71,40 +49,13 @@ public abstract class AbstractMessageBus private final MessagePublication.Factory publicationFactory; - public AbstractMessageBus(BusConfiguration configuration) { - this.executor = configuration.getExecutor(); + public AbstractSyncMessageBus(SyncBusConfiguration configuration) { subscriptionFactory = configuration.getSubscriptionFactory(); this.metadataReader = configuration.getMetadataReader(); this.publicationFactory = configuration.getMessagePublicationFactory(); - pendingMessages = new LinkedBlockingQueue(configuration.getMaximumNumberOfPendingMessages()); - initDispatcherThreads(configuration.getNumberOfMessageDispatchers()); addErrorHandler(new IPublicationErrorHandler.ConsoleLogger()); } - - // initialize the dispatch workers - private void initDispatcherThreads(int numberOfThreads) { - for (int i = 0; i < numberOfThreads; i++) { - // each thread will run forever and process incoming - //dispatch requests - Thread dispatcher = new Thread(new Runnable() { - public void run() { - while (true) { - try { - pendingMessages.take().execute(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return; - } - } - } - }); - dispatcher.setDaemon(true); // do not prevent the JVM from exiting - dispatchers.add(dispatcher); - dispatcher.start(); - } - } - protected MessagePublication.Factory getPublicationFactory() { return publicationFactory; } @@ -179,26 +130,7 @@ public abstract class AbstractMessageBus errorHandlers.add(handler); } - // this method enqueues a message delivery request - protected MessagePublication addAsynchronousDeliveryRequest(MessagePublication request) { - try { - pendingMessages.put(request); - return request.markScheduled(); - } catch (InterruptedException e) { - return request.setError(); - } - } - // this method enqueues a message delivery request - protected MessagePublication addAsynchronousDeliveryRequest(MessagePublication request, long timeout, TimeUnit unit) { - try { - return pendingMessages.offer(request, timeout, unit) - ? request.markScheduled() - : request.setError(); - } catch (InterruptedException e) { - return request.setError(); - } - } // obtain the set of subscriptions for the given message type // Note: never returns null! @@ -241,26 +173,4 @@ public abstract class AbstractMessageBus } } - @Override - protected void finalize() throws Throwable { - shutdown(); - super.finalize(); - } - - private void shutdown() { - for (Thread dispatcher : dispatchers) { - dispatcher.interrupt(); - } - executor.shutdown(); - } - - public boolean hasPendingMessages() { - return pendingMessages.size() > 0; - } - - @Override - public Executor getExecutor() { - return executor; - } - } diff --git a/src/main/java/net/engio/mbassy/bus/BusConfiguration.java b/src/main/java/net/engio/mbassy/bus/BusConfiguration.java index 03b1572..b13ee45 100644 --- a/src/main/java/net/engio/mbassy/bus/BusConfiguration.java +++ b/src/main/java/net/engio/mbassy/bus/BusConfiguration.java @@ -1,14 +1,6 @@ package net.engio.mbassy.bus; -import net.engio.mbassy.listener.MetadataReader; -import net.engio.mbassy.subscription.SubscriptionFactory; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; /** * The bus configuration holds various parameters that can be used to customize the bus' runtime behaviour. @@ -16,7 +8,7 @@ import java.util.concurrent.TimeUnit; * @author bennidi * Date: 12/8/12 */ -public class BusConfiguration { +public class BusConfiguration extends SyncBusConfiguration { private static final ThreadFactory DaemonThreadFactory = new ThreadFactory() { @Override @@ -37,36 +29,11 @@ public class BusConfiguration { private int maximumNumberOfPendingMessages; - private SubscriptionFactory subscriptionFactory; - - private MetadataReader metadataReader; - - private MessagePublication.Factory messagePublicationFactory; - public BusConfiguration() { + super(); this.numberOfMessageDispatchers = 2; this.maximumNumberOfPendingMessages = Integer.MAX_VALUE; - this.subscriptionFactory = new SubscriptionFactory(); this.executor = new ThreadPoolExecutor(10, 10, 1, TimeUnit.MINUTES, new LinkedBlockingQueue(), DaemonThreadFactory); - this.metadataReader = new MetadataReader(); - this.messagePublicationFactory = new MessagePublication.Factory(); - } - - public MessagePublication.Factory getMessagePublicationFactory() { - return messagePublicationFactory; - } - - public void setMessagePublicationFactory(MessagePublication.Factory messagePublicationFactory) { - this.messagePublicationFactory = messagePublicationFactory; - } - - public MetadataReader getMetadataReader() { - return metadataReader; - } - - public BusConfiguration setMetadataReader(MetadataReader metadataReader) { - this.metadataReader = metadataReader; - return this; } public int getNumberOfMessageDispatchers() { @@ -78,6 +45,10 @@ public class BusConfiguration { return this; } + /** + * By default an unbound queuing strategy is used to ensure that no events get lost + * @return + */ public ExecutorService getExecutor() { return executor; } @@ -98,12 +69,4 @@ public class BusConfiguration { return this; } - public SubscriptionFactory getSubscriptionFactory() { - return subscriptionFactory; - } - - public BusConfiguration setSubscriptionFactory(SubscriptionFactory subscriptionFactory) { - this.subscriptionFactory = subscriptionFactory; - return this; - } } diff --git a/src/main/java/net/engio/mbassy/bus/IMessageBus.java b/src/main/java/net/engio/mbassy/bus/IMessageBus.java index b4764c8..f80dbda 100644 --- a/src/main/java/net/engio/mbassy/bus/IMessageBus.java +++ b/src/main/java/net/engio/mbassy/bus/IMessageBus.java @@ -1,8 +1,5 @@ package net.engio.mbassy.bus; -import net.engio.mbassy.IPublicationErrorHandler; - -import java.util.Collection; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; @@ -47,56 +44,10 @@ import java.util.concurrent.TimeUnit; * @Author bennidi * Date: 2/8/12 */ -public interface IMessageBus { +public interface IMessageBus extends ISyncMessageBus { /** - * Subscribe all listeners of the given message to receive message publications. - * Any message may only be subscribed once (subsequent subscriptions of an already subscribed - * message will be silently ignored) - * - * @param listener - */ - void subscribe(Object listener); - - /** - * Immediately remove all registered message handlers (if any) of the given listener. When this call returns all handlers - * have effectively been removed and will not receive any message publications (including asynchronously scheduled - * publications that have been published when the message listener was still subscribed). - *

- * A call to this method passing null, an already unsubscribed listener or any object that does not define any message - * handlers will not have any effect and is silently ignored. - * - * @param listener - * @return true, if the listener was found and successfully removed - * false otherwise - */ - boolean unsubscribe(Object listener); - - /** - * @param message - * @return - */ - P post(T message); - - /** - * Publication errors may occur at various points of time during message delivery. A handler may throw an exception, - * may not be accessible due to security constraints or is not annotated properly. - * In any of all possible cases a publication error is created and passed to each of the registered error handlers. - * A call to this method will add the given error handler to the chain - * - * @param errorHandler - */ - void addErrorHandler(IPublicationErrorHandler errorHandler); - - /** - * Returns an immutable collection containing all the registered error handlers - * - * @return - */ - Collection getRegisteredErrorHandlers(); - - /** - * Get the executor service that is used to asynchronous message publication. + * Get the executor service that is used for asynchronous message publications. * The executor is passed to the message bus at creation time. * * @return @@ -111,17 +62,20 @@ public interface IMessageBus { boolean hasPendingMessages(); /** - * A post command is used as an intermediate object created by a call to the message bus' post method. - * It encapsulates the functionality provided by the message bus that created the command. - * Subclasses may extend this interface and add functionality, e.g. different dispatch schemes. + * Shutdown the bus such that it will stop delivering asynchronous messages. Executor service and + * other internally used threads will be shutdown gracefully. After calling shutdown it is not safe + * to further use the message bus. */ - interface IPostCommand { + void shutdown(); - /** - * Execute the message publication immediately. This call blocks until every matching message handler - * has been invoked. - */ - void now(); + /** + * @param message + * @return + */ + P post(T message); + + + interface IPostCommand extends ISyncPostCommand { /** * Execute the message publication asynchronously. The behaviour of this method depends on the @@ -146,4 +100,5 @@ public interface IMessageBus { */ MessagePublication asynchronously(long timeout, TimeUnit unit); } + } diff --git a/src/main/java/net/engio/mbassy/bus/ISyncMessageBus.java b/src/main/java/net/engio/mbassy/bus/ISyncMessageBus.java new file mode 100644 index 0000000..3c187d5 --- /dev/null +++ b/src/main/java/net/engio/mbassy/bus/ISyncMessageBus.java @@ -0,0 +1,77 @@ +package net.engio.mbassy.bus; + +import net.engio.mbassy.IPublicationErrorHandler; + +import java.util.Collection; + +/** + * + * + * @author bennidi + * Date: 3/29/13 + */ +public interface ISyncMessageBus { + + + /** + * Subscribe all listeners of the given message to receive message publications. + * Any message may only be subscribed once (subsequent subscriptions of an already subscribed + * message will be silently ignored) + * + * @param listener + */ + void subscribe(Object listener); + + /** + * Immediately remove all registered message handlers (if any) of the given listener. When this call returns all handlers + * have effectively been removed and will not receive any message publications (including asynchronously scheduled + * publications that have been published when the message listener was still subscribed). + *

+ * A call to this method passing null, an already unsubscribed listener or any object that does not define any message + * handlers will not have any effect and is silently ignored. + * + * @param listener + * @return true, if the listener was found and successfully removed + * false otherwise + */ + boolean unsubscribe(Object listener); + + /** + * @param message + * @return + */ + P post(T message); + + /** + * Publication errors may occur at various points of time during message delivery. A handler may throw an exception, + * may not be accessible due to security constraints or is not annotated properly. + * In any of all possible cases a publication error is created and passed to each of the registered error handlers. + * A call to this method will add the given error handler to the chain + * + * @param errorHandler + */ + void addErrorHandler(IPublicationErrorHandler errorHandler); + + /** + * Returns an immutable collection containing all the registered error handlers + * + * @return + */ + Collection getRegisteredErrorHandlers(); + + + + /** + * A post command is used as an intermediate object created by a call to the message bus' post method. + * It encapsulates the functionality provided by the message bus that created the command. + * Subclasses may extend this interface and add functionality, e.g. different dispatch schemes. + */ + interface ISyncPostCommand { + + /** + * Execute the message publication immediately. This call blocks until every matching message handler + * has been invoked. + */ + void now(); + } +} diff --git a/src/main/java/net/engio/mbassy/bus/MBassador.java b/src/main/java/net/engio/mbassy/bus/MBassador.java index fffa567..6bf1941 100644 --- a/src/main/java/net/engio/mbassy/bus/MBassador.java +++ b/src/main/java/net/engio/mbassy/bus/MBassador.java @@ -8,7 +8,7 @@ import java.util.Collection; import java.util.concurrent.TimeUnit; -public class MBassador extends AbstractMessageBus> { +public class MBassador extends AbstractSyncAsyncMessageBus> { public MBassador(BusConfiguration configuration) { super(configuration); diff --git a/src/main/java/net/engio/mbassy/bus/SyncBusConfiguration.java b/src/main/java/net/engio/mbassy/bus/SyncBusConfiguration.java new file mode 100644 index 0000000..07bebe9 --- /dev/null +++ b/src/main/java/net/engio/mbassy/bus/SyncBusConfiguration.java @@ -0,0 +1,49 @@ +package net.engio.mbassy.bus; + +import net.engio.mbassy.listener.MetadataReader; +import net.engio.mbassy.subscription.SubscriptionFactory; + +/** + * Todo: Add javadoc + * + * @author bennidi + * Date: 3/29/13 + */ +public class SyncBusConfiguration> { + + protected SubscriptionFactory subscriptionFactory; + protected MetadataReader metadataReader; + protected MessagePublication.Factory messagePublicationFactory; + + public SyncBusConfiguration() { + this.metadataReader = new MetadataReader(); + this.subscriptionFactory = new SubscriptionFactory(); + this.messagePublicationFactory = new MessagePublication.Factory(); + } + + public MessagePublication.Factory getMessagePublicationFactory() { + return messagePublicationFactory; + } + + public void setMessagePublicationFactory(MessagePublication.Factory messagePublicationFactory) { + this.messagePublicationFactory = messagePublicationFactory; + } + + public MetadataReader getMetadataReader() { + return metadataReader; + } + + public Config setMetadataReader(MetadataReader metadataReader) { + this.metadataReader = metadataReader; + return (Config)this; + } + + public SubscriptionFactory getSubscriptionFactory() { + return subscriptionFactory; + } + + public Config setSubscriptionFactory(SubscriptionFactory subscriptionFactory) { + this.subscriptionFactory = subscriptionFactory; + return (Config)this; + } +} diff --git a/src/main/java/net/engio/mbassy/common/AbstractConcurrentSet.java b/src/main/java/net/engio/mbassy/common/AbstractConcurrentSet.java new file mode 100644 index 0000000..3af5ea2 --- /dev/null +++ b/src/main/java/net/engio/mbassy/common/AbstractConcurrentSet.java @@ -0,0 +1,134 @@ +package net.engio.mbassy.common; + + +import java.util.Map; + +/** + * This data structure is optimized for non-blocking reads even when write operations occur. + * Running read iterators will not be affected by add operations since writes always insert at the head of the + * structure. Remove operations can affect any running iterator such that a removed element that has not yet + * been reached by the iterator will not appear in that iterator anymore. + * + * @author bennidi + * Date: 2/12/12 + */ +public abstract class AbstractConcurrentSet implements IConcurrentSet { + + // Internal state + private final Object lock = new Object(); + private final Map> entries; // maintain a map of entries for O(log n) lookup + protected Entry head; // reference to the first element + + protected AbstractConcurrentSet(Map> entries) { + this.entries = entries; + } + + protected abstract Entry createEntry(T value, Entry next); + + @Override + public IConcurrentSet add(T element) { + if (element == null || entries.containsKey(element)) { + return this; + } + synchronized (lock) { + insert(element); + } + return this; + } + + @Override + public boolean contains(T element) { + ISetEntry entry = entries.get(element); + return entry != null && entry.getValue() != null; + } + + private void insert(T element) { + if (entries.containsKey(element)) { + return; + } + head = createEntry(element, head); + entries.put(element, head); + } + + @Override + public int size() { + return entries.size(); + } + + @Override + public IConcurrentSet addAll(Iterable elements) { + synchronized (lock) { + for (T element : elements) { + if (element == null || entries.containsKey(element)) { + return this; + } + + insert(element); + } + } + return this; + } + + @Override + public boolean remove(T element) { + if (!entries.containsKey(element)) { + return false; + } + synchronized (lock) { + ISetEntry listelement = entries.get(element); + if (listelement == null) { + return false; //removed by other thread + } + if (listelement != head) { + listelement.remove(); + } else { + ISetEntry oldHead = head; + head = head.next(); + oldHead.clear(); // optimize for GC + } + entries.remove(element); + } + return true; + } + + + public abstract static class Entry implements ISetEntry { + + private Entry next; + + private Entry predecessor; + + protected Entry(Entry next) { + this.next = next; + next.predecessor = this; + } + + protected Entry() { + } + + // not thread-safe! must be synchronized in enclosing context + @Override + public void remove() { + if (predecessor != null) { + predecessor.next = next; + if (next != null) { + next.predecessor = predecessor; + } + } else if (next != null) { + next.predecessor = null; + } + next = null; + predecessor = null; + } + + @Override + public Entry next() { + return next; + } + + @Override + public void clear() { + next = null; + } + } +} diff --git a/src/main/java/net/engio/mbassy/common/ConcurrentSet.java b/src/main/java/net/engio/mbassy/common/ConcurrentSet.java deleted file mode 100644 index 6c5b9fd..0000000 --- a/src/main/java/net/engio/mbassy/common/ConcurrentSet.java +++ /dev/null @@ -1,183 +0,0 @@ -package net.engio.mbassy.common; - - -import java.lang.ref.WeakReference; -import java.util.Iterator; -import java.util.WeakHashMap; - -/** - * This data structure is optimized for non-blocking reads even when write operations occur. - * Running read iterators will not be affected by add operations since writes always insert at the head of the - * structure. Remove operations can affect any running iterator such that a removed element that has not yet - * been reached by the iterator will not appear in that iterator anymore. - *

- * The structure uses weak references to the elements. Iterators automatically perform cleanups of - * garbage collected objects during iteration -> no dedicated maintenance operations need to be called or run in background. - *

- *

- *

- * - * @author bennidi - * Date: 2/12/12 - */ -public class ConcurrentSet implements Iterable { - - // Internal state - private final Object lock = new Object(); - private WeakHashMap> entries = new WeakHashMap>(); // maintain a map of entries for O(log n) lookup - private Entry head; // reference to the first element - - public ConcurrentSet add(T element) { - if (element == null || entries.containsKey(element)) { - return this; - } - synchronized (lock) { - insert(element); - } - return this; - } - - public boolean contains(T element) { - Entry entry = entries.get(element); - return entry != null && entry.getValue() != null; - } - - private void insert(T element) { - if (entries.containsKey(element)) { - return; - } - if (head == null) { - head = new Entry(element); - } else { - head = new Entry(element, head); - } - entries.put(element, head); - } - - public int size() { - return entries.size(); - } - - public ConcurrentSet addAll(Iterable elements) { - synchronized (lock) { - for (T element : elements) { - if (element == null || entries.containsKey(element)) { - return this; - } - - insert(element); - } - } - return this; - } - - public boolean remove(T element) { - if (!entries.containsKey(element)) { - return false; - } - synchronized (lock) { - Entry listelement = entries.get(element); - if (listelement == null) { - return false; //removed by other thread - } - if (listelement != head) { - listelement.remove(); - } else { - Entry oldHead = head; - head = head.next(); - oldHead.next = null; // optimize for GC - } - entries.remove(element); - } - return true; - } - - public Iterator iterator() { - return new Iterator() { - - private Entry current = head; - - public boolean hasNext() { - if (current == null) return false; - if (current.getValue() == null) { // auto-removal of orphan references - do { - remove(); - } while(current != null && current.getValue() == null); - return hasNext(); - } else { - return true; - } - } - - public T next() { - if (current == null) { - return null; - } - T value = current.getValue(); - if (value == null) { // auto-removal of orphan references - do { - remove(); - } while(current != null && current.getValue() == null); - return next(); - } else { - current = current.next(); - return value; - } - } - - public void remove() { - if (current == null) { - return; - } - Entry newCurrent = current.next(); - ConcurrentSet.this.remove(current.getValue()); - current = newCurrent; - } - }; - } - - - public class Entry { - - private WeakReference value; - - private Entry next; - - private Entry predecessor; - - - private Entry(T value) { - this.value = new WeakReference(value); - } - - private Entry(T value, Entry next) { - this(value); - this.next = next; - next.predecessor = this; - } - - public T getValue() { - return value.get(); - } - - // not thread-safe! must be synchronized in enclosing context - public void remove() { - if (predecessor != null) { - predecessor.next = next; - if (next != null) { - next.predecessor = predecessor; - } - } else if (next != null) { - next.predecessor = null; - } - next = null; - predecessor = null; - } - - public Entry next() { - return next; - } - - - } -} diff --git a/src/main/java/net/engio/mbassy/common/IConcurrentSet.java b/src/main/java/net/engio/mbassy/common/IConcurrentSet.java new file mode 100644 index 0000000..432e4ed --- /dev/null +++ b/src/main/java/net/engio/mbassy/common/IConcurrentSet.java @@ -0,0 +1,20 @@ +package net.engio.mbassy.common; + +/** + * Todo: Add javadoc + * + * @author bennidi + * Date: 3/29/13 + */ +public interface IConcurrentSet extends Iterable { + + IConcurrentSet add(T element); + + boolean contains(T element); + + int size(); + + IConcurrentSet addAll(Iterable elements); + + boolean remove(T element); +} diff --git a/src/main/java/net/engio/mbassy/common/ISetEntry.java b/src/main/java/net/engio/mbassy/common/ISetEntry.java new file mode 100644 index 0000000..aabb826 --- /dev/null +++ b/src/main/java/net/engio/mbassy/common/ISetEntry.java @@ -0,0 +1,19 @@ +package net.engio.mbassy.common; + +/** + * Todo: Add javadoc + * + * @author bennidi + * Date: 3/29/13 + */ +public interface ISetEntry { + + T getValue(); + + // not thread-safe! must be synchronized in enclosing context + void remove(); + + ISetEntry next(); + + void clear(); +} diff --git a/src/main/java/net/engio/mbassy/common/StrongConcurrentSet.java b/src/main/java/net/engio/mbassy/common/StrongConcurrentSet.java new file mode 100644 index 0000000..ad42994 --- /dev/null +++ b/src/main/java/net/engio/mbassy/common/StrongConcurrentSet.java @@ -0,0 +1,84 @@ +package net.engio.mbassy.common; + + +import java.util.HashMap; +import java.util.Iterator; + +/** + * This implementation uses weak references to the elements. Iterators automatically perform cleanups of + * garbage collected objects during iteration -> no dedicated maintenance operations need to be called or run in background. + *

+ *

+ *

+ * + * @author bennidi + * Date: 2/12/12 + */ +public class StrongConcurrentSet extends AbstractConcurrentSet{ + + + public StrongConcurrentSet() { + super(new HashMap>()); + } + + public Iterator iterator() { + return new Iterator() { + + private ISetEntry current = head; + + public boolean hasNext() { + return current != null; + } + + public T next() { + if (current == null) { + return null; + } + else { + T value = current.getValue(); + current = current.next(); + return value; + } + } + + public void remove() { + if (current == null) { + return; + } + ISetEntry newCurrent = current.next(); + StrongConcurrentSet.this.remove(current.getValue()); + current = newCurrent; + } + }; + } + + @Override + protected Entry createEntry(T value, Entry next) { + return next != null ? new StrongEntry(value, next) : new StrongEntry(value); + } + + + public static class StrongEntry extends Entry { + + private T value; + + private StrongEntry(T value, Entry next) { + super(next); + this.value = value; + } + + private StrongEntry(T value) { + super(); + this.value = value; + } + + @Override + public T getValue() { + return value; + } + + + + + } +} diff --git a/src/main/java/net/engio/mbassy/common/WeakConcurrentSet.java b/src/main/java/net/engio/mbassy/common/WeakConcurrentSet.java new file mode 100644 index 0000000..d5caf9a --- /dev/null +++ b/src/main/java/net/engio/mbassy/common/WeakConcurrentSet.java @@ -0,0 +1,98 @@ +package net.engio.mbassy.common; + + +import java.lang.ref.WeakReference; +import java.util.Iterator; +import java.util.WeakHashMap; + +/** + * This implementation uses weak references to the elements. Iterators automatically perform cleanups of + * garbage collected objects during iteration -> no dedicated maintenance operations need to be called or run in background. + *

+ *

+ *

+ * + * @author bennidi + * Date: 2/12/12 + */ +public class WeakConcurrentSet extends AbstractConcurrentSet{ + + + public WeakConcurrentSet() { + super(new WeakHashMap>()); + } + + public Iterator iterator() { + return new Iterator() { + + private ISetEntry current = head; + + public boolean hasNext() { + if (current == null) return false; + if (current.getValue() == null) { // auto-removal of orphan references + do { + remove(); + } while(current != null && current.getValue() == null); + return hasNext(); + } else { + return true; + } + } + + public T next() { + if (current == null) { + return null; + } + T value = current.getValue(); + if (value == null) { // auto-removal of orphan references + do { + remove(); + } while(current != null && current.getValue() == null); + return next(); + } else { + current = current.next(); + return value; + } + } + + public void remove() { + if (current == null) { + return; + } + ISetEntry newCurrent = current.next(); + WeakConcurrentSet.this.remove(current.getValue()); + current = newCurrent; + } + }; + } + + @Override + protected Entry createEntry(T value, Entry next) { + return next != null ? new WeakEntry(value, next) : new WeakEntry(value); + } + + + public static class WeakEntry extends Entry { + + private WeakReference value; + + private WeakEntry(T value, Entry next) { + super(next); + this.value = new WeakReference(value); + } + + private WeakEntry(T value) { + super(); + this.value = new WeakReference(value); + } + + @Override + public T getValue() { + return value.get(); + } + + + + + } +} diff --git a/src/main/java/net/engio/mbassy/dispatch/AsynchronousHandlerInvocation.java b/src/main/java/net/engio/mbassy/dispatch/AsynchronousHandlerInvocation.java index 5d7ffbb..24e1994 100644 --- a/src/main/java/net/engio/mbassy/dispatch/AsynchronousHandlerInvocation.java +++ b/src/main/java/net/engio/mbassy/dispatch/AsynchronousHandlerInvocation.java @@ -1,5 +1,6 @@ package net.engio.mbassy.dispatch; +import net.engio.mbassy.bus.IMessageBus; import net.engio.mbassy.subscription.AbstractSubscriptionContextAware; /** @@ -8,7 +9,7 @@ import net.engio.mbassy.subscription.AbstractSubscriptionContextAware; * @author bennidi * Date: 11/23/12 */ -public class AsynchronousHandlerInvocation extends AbstractSubscriptionContextAware implements IHandlerInvocation { +public class AsynchronousHandlerInvocation extends AbstractSubscriptionContextAware implements IHandlerInvocation { private IHandlerInvocation delegate; diff --git a/src/main/java/net/engio/mbassy/dispatch/DelegatingMessageDispatcher.java b/src/main/java/net/engio/mbassy/dispatch/DelegatingMessageDispatcher.java index 2b76f24..c05e865 100644 --- a/src/main/java/net/engio/mbassy/dispatch/DelegatingMessageDispatcher.java +++ b/src/main/java/net/engio/mbassy/dispatch/DelegatingMessageDispatcher.java @@ -11,7 +11,7 @@ import net.engio.mbassy.subscription.AbstractSubscriptionContextAware; */ public abstract class DelegatingMessageDispatcher extends AbstractSubscriptionContextAware implements IMessageDispatcher { - private IMessageDispatcher delegate; + private final IMessageDispatcher delegate; public DelegatingMessageDispatcher(IMessageDispatcher delegate) { diff --git a/src/main/java/net/engio/mbassy/dispatch/EnvelopedMessageDispatcher.java b/src/main/java/net/engio/mbassy/dispatch/EnvelopedMessageDispatcher.java index 642d306..5173678 100644 --- a/src/main/java/net/engio/mbassy/dispatch/EnvelopedMessageDispatcher.java +++ b/src/main/java/net/engio/mbassy/dispatch/EnvelopedMessageDispatcher.java @@ -1,7 +1,7 @@ package net.engio.mbassy.dispatch; import net.engio.mbassy.bus.MessagePublication; -import net.engio.mbassy.common.ConcurrentSet; +import net.engio.mbassy.common.IConcurrentSet; import net.engio.mbassy.subscription.MessageEnvelope; /** @@ -21,7 +21,7 @@ public class EnvelopedMessageDispatcher extends DelegatingMessageDispatcher { } @Override - public void dispatch(MessagePublication publication, Object message, ConcurrentSet listeners) { + public void dispatch(MessagePublication publication, Object message, IConcurrentSet listeners) { getDelegate().dispatch(publication, new MessageEnvelope(message), listeners); } } diff --git a/src/main/java/net/engio/mbassy/dispatch/FilteredMessageDispatcher.java b/src/main/java/net/engio/mbassy/dispatch/FilteredMessageDispatcher.java index 40fe10a..48e9b68 100644 --- a/src/main/java/net/engio/mbassy/dispatch/FilteredMessageDispatcher.java +++ b/src/main/java/net/engio/mbassy/dispatch/FilteredMessageDispatcher.java @@ -1,7 +1,7 @@ package net.engio.mbassy.dispatch; import net.engio.mbassy.bus.MessagePublication; -import net.engio.mbassy.common.ConcurrentSet; +import net.engio.mbassy.common.IConcurrentSet; import net.engio.mbassy.listener.IMessageFilter; /** @@ -37,7 +37,7 @@ public class FilteredMessageDispatcher extends DelegatingMessageDispatcher { @Override - public void dispatch(MessagePublication publication, Object message, ConcurrentSet listeners) { + public void dispatch(MessagePublication publication, Object message, IConcurrentSet listeners) { if (passesFilter(message)) { getDelegate().dispatch(publication, message, listeners); } diff --git a/src/main/java/net/engio/mbassy/dispatch/HandlerInvocation.java b/src/main/java/net/engio/mbassy/dispatch/HandlerInvocation.java new file mode 100644 index 0000000..c49c16a --- /dev/null +++ b/src/main/java/net/engio/mbassy/dispatch/HandlerInvocation.java @@ -0,0 +1,19 @@ +package net.engio.mbassy.dispatch; + +import net.engio.mbassy.bus.ISyncMessageBus; +import net.engio.mbassy.subscription.AbstractSubscriptionContextAware; +import net.engio.mbassy.subscription.SubscriptionContext; + +/** + * Todo: Add javadoc + * + * @author bennidi + * Date: 3/29/13 + */ +public abstract class HandlerInvocation extends AbstractSubscriptionContextAware implements IHandlerInvocation{ + + + public HandlerInvocation(SubscriptionContext context) { + super(context); + } +} diff --git a/src/main/java/net/engio/mbassy/dispatch/IHandlerInvocation.java b/src/main/java/net/engio/mbassy/dispatch/IHandlerInvocation.java index 6751640..9f14b84 100644 --- a/src/main/java/net/engio/mbassy/dispatch/IHandlerInvocation.java +++ b/src/main/java/net/engio/mbassy/dispatch/IHandlerInvocation.java @@ -1,5 +1,7 @@ package net.engio.mbassy.dispatch; +import net.engio.mbassy.bus.ISyncMessageBus; + /** * A handler invocation encapsulates the logic that is used to invoke a single * message handler to process a given message. @@ -9,7 +11,7 @@ package net.engio.mbassy.dispatch; * @author bennidi * Date: 11/23/12 */ -public interface IHandlerInvocation extends ISubscriptionContextAware { +public interface IHandlerInvocation extends ISubscriptionContextAware { /** * Invoke the message delivery logic of this handler @@ -17,5 +19,5 @@ public interface IHandlerInvocation extends ISubscriptionContextAware { * @param listener The listener that will receive the message * @param message The message to be delivered to the listener */ - void invoke(Object listener, Object message); + void invoke(Listener listener, Message message); } diff --git a/src/main/java/net/engio/mbassy/dispatch/IMessageBusAware.java b/src/main/java/net/engio/mbassy/dispatch/IMessageBusAware.java index 240b2ce..8984122 100644 --- a/src/main/java/net/engio/mbassy/dispatch/IMessageBusAware.java +++ b/src/main/java/net/engio/mbassy/dispatch/IMessageBusAware.java @@ -1,6 +1,6 @@ package net.engio.mbassy.dispatch; -import net.engio.mbassy.bus.IMessageBus; +import net.engio.mbassy.bus.ISyncMessageBus; /** * This interface marks components that have access to the message bus that they belong to. @@ -8,7 +8,7 @@ import net.engio.mbassy.bus.IMessageBus; * @author bennidi * Date: 3/1/13 */ -public interface IMessageBusAware { +public interface IMessageBusAware { - IMessageBus getBus(); + Bus getBus(); } diff --git a/src/main/java/net/engio/mbassy/dispatch/IMessageDispatcher.java b/src/main/java/net/engio/mbassy/dispatch/IMessageDispatcher.java index 89d8fbc..056467c 100644 --- a/src/main/java/net/engio/mbassy/dispatch/IMessageDispatcher.java +++ b/src/main/java/net/engio/mbassy/dispatch/IMessageDispatcher.java @@ -1,7 +1,7 @@ package net.engio.mbassy.dispatch; import net.engio.mbassy.bus.MessagePublication; -import net.engio.mbassy.common.ConcurrentSet; +import net.engio.mbassy.common.IConcurrentSet; /** * A message dispatcher provides the functionality to deliver a single message @@ -29,7 +29,7 @@ public interface IMessageDispatcher extends ISubscriptionContextAware { * @param message The message that should be delivered to the listeners * @param listeners The listeners that should receive the message */ - void dispatch(MessagePublication publication, Object message, ConcurrentSet listeners); + void dispatch(MessagePublication publication, Object message, IConcurrentSet listeners); /** * Get the handler invocation that will be used to deliver the diff --git a/src/main/java/net/engio/mbassy/dispatch/ISubscriptionContextAware.java b/src/main/java/net/engio/mbassy/dispatch/ISubscriptionContextAware.java index ec0a3ab..b24ff71 100644 --- a/src/main/java/net/engio/mbassy/dispatch/ISubscriptionContextAware.java +++ b/src/main/java/net/engio/mbassy/dispatch/ISubscriptionContextAware.java @@ -1,5 +1,6 @@ package net.engio.mbassy.dispatch; +import net.engio.mbassy.bus.ISyncMessageBus; import net.engio.mbassy.subscription.SubscriptionContext; /** @@ -8,7 +9,7 @@ import net.engio.mbassy.subscription.SubscriptionContext; * @author bennidi * Date: 3/1/13 */ -public interface ISubscriptionContextAware extends IMessageBusAware { +public interface ISubscriptionContextAware extends IMessageBusAware { /** * Get the subscription context associated with this object diff --git a/src/main/java/net/engio/mbassy/dispatch/MessageDispatcher.java b/src/main/java/net/engio/mbassy/dispatch/MessageDispatcher.java index 6ffb69a..d92cb4b 100644 --- a/src/main/java/net/engio/mbassy/dispatch/MessageDispatcher.java +++ b/src/main/java/net/engio/mbassy/dispatch/MessageDispatcher.java @@ -1,7 +1,7 @@ package net.engio.mbassy.dispatch; import net.engio.mbassy.bus.MessagePublication; -import net.engio.mbassy.common.ConcurrentSet; +import net.engio.mbassy.common.IConcurrentSet; import net.engio.mbassy.subscription.AbstractSubscriptionContextAware; import net.engio.mbassy.subscription.SubscriptionContext; @@ -17,7 +17,7 @@ import net.engio.mbassy.subscription.SubscriptionContext; */ public class MessageDispatcher extends AbstractSubscriptionContextAware implements IMessageDispatcher { - private IHandlerInvocation invocation; + private final IHandlerInvocation invocation; public MessageDispatcher(SubscriptionContext context, IHandlerInvocation invocation) { super(context); @@ -25,7 +25,7 @@ public class MessageDispatcher extends AbstractSubscriptionContextAware implemen } @Override - public void dispatch(MessagePublication publication, Object message, ConcurrentSet listeners) { + public void dispatch(final MessagePublication publication, final Object message, final IConcurrentSet listeners) { publication.markDelivered(); for (Object listener : listeners) { getInvocation().invoke(listener, message); diff --git a/src/main/java/net/engio/mbassy/dispatch/ReflectiveHandlerInvocation.java b/src/main/java/net/engio/mbassy/dispatch/ReflectiveHandlerInvocation.java index 9894352..df582ff 100644 --- a/src/main/java/net/engio/mbassy/dispatch/ReflectiveHandlerInvocation.java +++ b/src/main/java/net/engio/mbassy/dispatch/ReflectiveHandlerInvocation.java @@ -2,7 +2,6 @@ package net.engio.mbassy.dispatch; import net.engio.mbassy.IPublicationErrorHandler; import net.engio.mbassy.PublicationError; -import net.engio.mbassy.subscription.AbstractSubscriptionContextAware; import net.engio.mbassy.subscription.SubscriptionContext; import java.lang.reflect.InvocationTargetException; @@ -15,7 +14,7 @@ import java.util.Collection; * @author bennidi * Date: 11/23/12 */ -public class ReflectiveHandlerInvocation extends AbstractSubscriptionContextAware implements IHandlerInvocation { +public class ReflectiveHandlerInvocation extends HandlerInvocation{ public ReflectiveHandlerInvocation(SubscriptionContext context) { super(context); diff --git a/src/main/java/net/engio/mbassy/dispatch/SynchronizedHandlerInvocation.java b/src/main/java/net/engio/mbassy/dispatch/SynchronizedHandlerInvocation.java new file mode 100644 index 0000000..462c005 --- /dev/null +++ b/src/main/java/net/engio/mbassy/dispatch/SynchronizedHandlerInvocation.java @@ -0,0 +1,31 @@ +package net.engio.mbassy.dispatch; + +import net.engio.mbassy.bus.IMessageBus; +import net.engio.mbassy.subscription.AbstractSubscriptionContextAware; + +/** + * Synchronizes message handler invocations for all handlers that specify @Synchronized + * + * @author bennidi + * Date: 3/31/13 + */ +public class SynchronizedHandlerInvocation extends AbstractSubscriptionContextAware implements IHandlerInvocation { + + private IHandlerInvocation delegate; + + public SynchronizedHandlerInvocation(IHandlerInvocation delegate) { + super(delegate.getContext()); + this.delegate = delegate; + } + + /** + * {@inheritDoc} + */ + @Override + public void invoke(final Object listener, final Object message) { + synchronized (listener){ + delegate.invoke(listener, message); + } + } + +} diff --git a/src/main/java/net/engio/mbassy/listener/Handler.java b/src/main/java/net/engio/mbassy/listener/Handler.java index 283927a..2aeb653 100644 --- a/src/main/java/net/engio/mbassy/listener/Handler.java +++ b/src/main/java/net/engio/mbassy/listener/Handler.java @@ -1,10 +1,9 @@ package net.engio.mbassy.listener; -import java.lang.annotation.ElementType; -import java.lang.annotation.Inherited; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; +import net.engio.mbassy.dispatch.HandlerInvocation; +import net.engio.mbassy.dispatch.ReflectiveHandlerInvocation; + +import java.lang.annotation.*; /** * Mark any method of any object(=listener) as a message handler and configure the handler @@ -28,7 +27,7 @@ public @interface Handler { * Define the mode in which a message is delivered to each listener. Listeners can be notified * sequentially or concurrently. */ - Mode delivery() default Mode.Sequential; + Invoke delivery() default Invoke.Synchronously; /** * Handlers are ordered by priority and handlers with higher priority are processed before @@ -51,4 +50,17 @@ public @interface Handler { */ boolean enabled() default true; + + /** + * Each handler call is implemented as an invocation object that implements the invocation mechanism. + * The basic implementation uses reflection and is the default. It is possible though to provide a custom + * invocation to add additional logic. + * + * Note: Providing a custom invocation will most likely reduce performance, since the JIT-Compiler + * can not do some of its sophisticated byte code optimizations. + * + */ + Class invocation() default ReflectiveHandlerInvocation.class; + + } diff --git a/src/main/java/net/engio/mbassy/listener/Invoke.java b/src/main/java/net/engio/mbassy/listener/Invoke.java new file mode 100644 index 0000000..c0f88a7 --- /dev/null +++ b/src/main/java/net/engio/mbassy/listener/Invoke.java @@ -0,0 +1,11 @@ +package net.engio.mbassy.listener; + +/** + * Created with IntelliJ IDEA. + * + * @author bennidi + * Date: 11/16/12 + */ +public enum Invoke { + Synchronously, Asynchronously +} diff --git a/src/main/java/net/engio/mbassy/listener/Listener.java b/src/main/java/net/engio/mbassy/listener/Listener.java new file mode 100644 index 0000000..eee9a1e --- /dev/null +++ b/src/main/java/net/engio/mbassy/listener/Listener.java @@ -0,0 +1,24 @@ +package net.engio.mbassy.listener; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * @author bennidi + * Date: 3/29/13 + */ +@Retention(value = RetentionPolicy.RUNTIME) +@Target(value = {ElementType.TYPE}) +public @interface Listener { + + /** + * By default, references to message listeners are weak to eliminate risks of memory leaks. + * It is possible to use strong references instead. + * + * @return + */ + References references() default References.Weak; + +} diff --git a/src/main/java/net/engio/mbassy/listener/MessageHandlerMetadata.java b/src/main/java/net/engio/mbassy/listener/MessageHandlerMetadata.java index 2614b03..365df73 100644 --- a/src/main/java/net/engio/mbassy/listener/MessageHandlerMetadata.java +++ b/src/main/java/net/engio/mbassy/listener/MessageHandlerMetadata.java @@ -1,7 +1,8 @@ package net.engio.mbassy.listener; +import net.engio.mbassy.dispatch.HandlerInvocation; + import java.lang.reflect.Method; -import java.util.Collections; import java.util.LinkedList; import java.util.List; @@ -11,36 +12,54 @@ import java.util.List; */ public class MessageHandlerMetadata { - private Method handler; + private final Method handler; - private IMessageFilter[] filter; + private final IMessageFilter[] filter; - private Handler handlerConfig; + private final Handler handlerConfig; - private boolean isAsynchronous = false; + private final boolean isAsynchronous; - private Enveloped envelope = null; + private final Enveloped envelope; - private List> handledMessages = new LinkedList>(); + private final List> handledMessages = new LinkedList>(); - private boolean acceptsSubtypes = true; + private final boolean acceptsSubtypes; + + private final Listener listenerConfig; + + private final boolean isSynchronized; - public MessageHandlerMetadata(Method handler, IMessageFilter[] filter, Handler handlerConfig) { + public MessageHandlerMetadata(Method handler, IMessageFilter[] filter, Handler handlerConfig, Listener listenerConfig) { + if(handler == null || handlerConfig == null){ + throw new IllegalArgumentException("The message handler configuration may not be null"); + } this.handler = handler; this.filter = filter; this.handlerConfig = handlerConfig; - this.isAsynchronous = handlerConfig.delivery().equals(Mode.Concurrent); + this.isAsynchronous = handlerConfig.delivery().equals(Invoke.Asynchronously); this.envelope = handler.getAnnotation(Enveloped.class); this.acceptsSubtypes = !handlerConfig.rejectSubtypes(); + this.listenerConfig = listenerConfig; + this.isSynchronized = handler.getAnnotation(Synchronized.class) != null; if (this.envelope != null) { - Collections.addAll(handledMessages, envelope.messages()); + for(Class messageType : envelope.messages()){ + handledMessages.add(messageType); + } } else { handledMessages.add(handler.getParameterTypes()[0]); } this.handler.setAccessible(true); } + public boolean isSynchronized(){ + return isSynchronized; + } + + public boolean useStrongReferences(){ + return listenerConfig != null && listenerConfig.references().equals(References.Strong); + } public boolean isAsynchronous() { return isAsynchronous; @@ -70,6 +89,10 @@ public class MessageHandlerMetadata { return envelope != null; } + public Class getHandlerInvocation(){ + return handlerConfig.invocation(); + } + public boolean handlesMessage(Class messageType) { for (Class handledMessage : handledMessages) { if (handledMessage.equals(messageType)) { diff --git a/src/main/java/net/engio/mbassy/listener/MetadataReader.java b/src/main/java/net/engio/mbassy/listener/MetadataReader.java index b2e6775..a5bdcb7 100644 --- a/src/main/java/net/engio/mbassy/listener/MetadataReader.java +++ b/src/main/java/net/engio/mbassy/listener/MetadataReader.java @@ -54,14 +54,12 @@ public class MetadataReader { } - public MessageHandlerMetadata getHandlerMetadata(Method messageHandler) { - Handler config = messageHandler.getAnnotation(Handler.class); - return new MessageHandlerMetadata(messageHandler, getFilter(config), config); - } + // get all listeners defined by the given class (includes // listeners defined in super classes) public List getMessageHandlers(Class target) { + Listener listenerConfig = target.getAnnotation(Listener.class); // get all handlers (this will include all (inherited) methods directly annotated using @Handler) List allHandlers = ReflectionUtils.getMethods(AllMessageHandlers, target); // retain only those that are at the bottom of their respective class hierarchy (deepest overriding method) @@ -72,19 +70,18 @@ public class MetadataReader { } } - List filteredHandlers = new LinkedList(); // for each handler there will be no overriding method that specifies @Handler annotation // but an overriding method does inherit the listener configuration of the overwritten method for (Method handler : bottomMostHandlers) { - Handler handle = handler.getAnnotation(Handler.class); - if (!handle.enabled() || !isValidMessageHandler(handler)) { + Handler handlerConfig = handler.getAnnotation(Handler.class); + if (!handlerConfig.enabled() || !isValidMessageHandler(handler)) { continue; // disabled or invalid listeners are ignored } Method overriddenHandler = ReflectionUtils.getOverridingMethod(handler, target); // if a handler is overwritten it inherits the configuration of its parent method MessageHandlerMetadata handlerMetadata = new MessageHandlerMetadata(overriddenHandler == null ? handler : overriddenHandler, - getFilter(handle), handle); + getFilter(handlerConfig), handlerConfig, listenerConfig); filteredHandlers.add(handlerMetadata); } diff --git a/src/main/java/net/engio/mbassy/listener/Mode.java b/src/main/java/net/engio/mbassy/listener/Mode.java deleted file mode 100644 index 05aac70..0000000 --- a/src/main/java/net/engio/mbassy/listener/Mode.java +++ /dev/null @@ -1,13 +0,0 @@ -package net.engio.mbassy.listener; - -/** - * Created with IntelliJ IDEA. - * - * @author bennidi - * Date: 11/16/12 - * Time: 10:01 AM - * To change this template use File | Settings | File Templates. - */ -public enum Mode { - Sequential, Concurrent -} diff --git a/src/main/java/net/engio/mbassy/listener/References.java b/src/main/java/net/engio/mbassy/listener/References.java new file mode 100644 index 0000000..915a815 --- /dev/null +++ b/src/main/java/net/engio/mbassy/listener/References.java @@ -0,0 +1,10 @@ +package net.engio.mbassy.listener; + +/** +* +* @author bennidi +* Date: 3/29/13 +*/ +public enum References { + Strong,Weak +} diff --git a/src/main/java/net/engio/mbassy/listener/Synchronized.java b/src/main/java/net/engio/mbassy/listener/Synchronized.java new file mode 100644 index 0000000..679eb40 --- /dev/null +++ b/src/main/java/net/engio/mbassy/listener/Synchronized.java @@ -0,0 +1,16 @@ +package net.engio.mbassy.listener; + +import java.lang.annotation.*; + +/** + * A handler marked with this annotation is guaranteed to be invoked in a thread-safe manner, that is, no + * other running message publication will be able to invoke this handler as long as it has not done its work. + * + * @author bennidi + * Date: 3/31/13 + */ +@Retention(value = RetentionPolicy.RUNTIME) +@Inherited +@Target(value = {ElementType.METHOD}) +public @interface Synchronized { +} diff --git a/src/main/java/net/engio/mbassy/subscription/AbstractSubscriptionContextAware.java b/src/main/java/net/engio/mbassy/subscription/AbstractSubscriptionContextAware.java index 946387f..d4f7f9f 100644 --- a/src/main/java/net/engio/mbassy/subscription/AbstractSubscriptionContextAware.java +++ b/src/main/java/net/engio/mbassy/subscription/AbstractSubscriptionContextAware.java @@ -1,6 +1,6 @@ package net.engio.mbassy.subscription; -import net.engio.mbassy.bus.IMessageBus; +import net.engio.mbassy.bus.ISyncMessageBus; import net.engio.mbassy.dispatch.ISubscriptionContextAware; /** @@ -9,20 +9,20 @@ import net.engio.mbassy.dispatch.ISubscriptionContextAware; * @author bennidi * Date: 3/1/13 */ -public class AbstractSubscriptionContextAware implements ISubscriptionContextAware { +public class AbstractSubscriptionContextAware implements ISubscriptionContextAware { - private SubscriptionContext context; + private final SubscriptionContext context; - public AbstractSubscriptionContextAware(SubscriptionContext context) { + public AbstractSubscriptionContextAware(SubscriptionContext context) { this.context = context; } - public SubscriptionContext getContext() { + public SubscriptionContext getContext() { return context; } @Override - public IMessageBus getBus() { + public Bus getBus() { return context.getOwningBus(); } } diff --git a/src/main/java/net/engio/mbassy/subscription/Subscription.java b/src/main/java/net/engio/mbassy/subscription/Subscription.java index df84560..9a48911 100644 --- a/src/main/java/net/engio/mbassy/subscription/Subscription.java +++ b/src/main/java/net/engio/mbassy/subscription/Subscription.java @@ -1,7 +1,7 @@ package net.engio.mbassy.subscription; import net.engio.mbassy.bus.MessagePublication; -import net.engio.mbassy.common.ConcurrentSet; +import net.engio.mbassy.common.IConcurrentSet; import net.engio.mbassy.dispatch.IMessageDispatcher; import java.util.Comparator; @@ -12,17 +12,18 @@ import java.util.UUID; */ public class Subscription { - private UUID id = UUID.randomUUID(); + private final UUID id = UUID.randomUUID(); - protected ConcurrentSet listeners = new ConcurrentSet(); + protected final IConcurrentSet listeners; - private IMessageDispatcher dispatcher; + private final IMessageDispatcher dispatcher; - private SubscriptionContext context; + private final SubscriptionContext context; - public Subscription(SubscriptionContext context, IMessageDispatcher dispatcher) { + Subscription(SubscriptionContext context, IMessageDispatcher dispatcher, IConcurrentSet listeners) { this.context = context; this.dispatcher = dispatcher; + this.listeners = listeners; } diff --git a/src/main/java/net/engio/mbassy/subscription/SubscriptionContext.java b/src/main/java/net/engio/mbassy/subscription/SubscriptionContext.java index 170d934..122a5fe 100644 --- a/src/main/java/net/engio/mbassy/subscription/SubscriptionContext.java +++ b/src/main/java/net/engio/mbassy/subscription/SubscriptionContext.java @@ -1,6 +1,6 @@ package net.engio.mbassy.subscription; -import net.engio.mbassy.bus.IMessageBus; +import net.engio.mbassy.bus.ISyncMessageBus; import net.engio.mbassy.listener.MessageHandlerMetadata; /** @@ -12,13 +12,13 @@ import net.engio.mbassy.listener.MessageHandlerMetadata; * @author bennidi * Date: 11/23/12 */ -public class SubscriptionContext { +public class SubscriptionContext { - private IMessageBus owningBus; + private Bus owningBus; private MessageHandlerMetadata handlerMetadata; - public SubscriptionContext(IMessageBus owningBus, MessageHandlerMetadata handlerMetadata) { + public SubscriptionContext(Bus owningBus, MessageHandlerMetadata handlerMetadata) { this.owningBus = owningBus; this.handlerMetadata = handlerMetadata; } @@ -28,7 +28,7 @@ public class SubscriptionContext { * * @return */ - public IMessageBus getOwningBus() { + public Bus getOwningBus() { return owningBus; } diff --git a/src/main/java/net/engio/mbassy/subscription/SubscriptionFactory.java b/src/main/java/net/engio/mbassy/subscription/SubscriptionFactory.java index e59fca2..0f1ea79 100644 --- a/src/main/java/net/engio/mbassy/subscription/SubscriptionFactory.java +++ b/src/main/java/net/engio/mbassy/subscription/SubscriptionFactory.java @@ -1,12 +1,12 @@ package net.engio.mbassy.subscription; -import net.engio.mbassy.dispatch.AsynchronousHandlerInvocation; -import net.engio.mbassy.dispatch.EnvelopedMessageDispatcher; -import net.engio.mbassy.dispatch.FilteredMessageDispatcher; -import net.engio.mbassy.dispatch.IHandlerInvocation; -import net.engio.mbassy.dispatch.IMessageDispatcher; -import net.engio.mbassy.dispatch.MessageDispatcher; -import net.engio.mbassy.dispatch.ReflectiveHandlerInvocation; +import net.engio.mbassy.MessageBusException; +import net.engio.mbassy.common.StrongConcurrentSet; +import net.engio.mbassy.common.WeakConcurrentSet; +import net.engio.mbassy.dispatch.*; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Modifier; /** * Created with IntelliJ IDEA. @@ -18,14 +18,23 @@ import net.engio.mbassy.dispatch.ReflectiveHandlerInvocation; */ public class SubscriptionFactory { - public Subscription createSubscription(SubscriptionContext context) { - IHandlerInvocation invocation = buildInvocationForHandler(context); - IMessageDispatcher dispatcher = buildDispatcher(context, invocation); - return new Subscription(context, dispatcher); + public Subscription createSubscription(SubscriptionContext context) throws MessageBusException{ + try { + IHandlerInvocation invocation = buildInvocationForHandler(context); + IMessageDispatcher dispatcher = buildDispatcher(context, invocation); + return new Subscription(context, dispatcher, context.getHandlerMetadata().useStrongReferences() + ? new StrongConcurrentSet() + : new WeakConcurrentSet()); + } catch (Exception e) { + throw new MessageBusException(e); + } } - protected IHandlerInvocation buildInvocationForHandler(SubscriptionContext context) { - IHandlerInvocation invocation = new ReflectiveHandlerInvocation(context); + protected IHandlerInvocation buildInvocationForHandler(SubscriptionContext context) throws Exception { + IHandlerInvocation invocation = createBaseHandlerInvocation(context); + if(context.getHandlerMetadata().isSynchronized()){ + invocation = new SynchronizedHandlerInvocation(invocation); + } if (context.getHandlerMetadata().isAsynchronous()) { invocation = new AsynchronousHandlerInvocation(invocation); } @@ -42,4 +51,13 @@ public class SubscriptionFactory { } return dispatcher; } + + protected IHandlerInvocation createBaseHandlerInvocation(SubscriptionContext context) throws Exception { + Class invocation = context.getHandlerMetadata().getHandlerInvocation(); + if(invocation.isMemberClass() && !Modifier.isStatic(invocation.getModifiers())){ + throw new MessageBusException("The handler invocation must be top level class or nested STATIC inner class"); + } + Constructor constructor = invocation.getConstructor(SubscriptionContext.class); + return constructor.newInstance(context); + } } diff --git a/src/test/java/net/engio/mbassy/AllTests.java b/src/test/java/net/engio/mbassy/AllTests.java index 58bc64c..756e141 100644 --- a/src/test/java/net/engio/mbassy/AllTests.java +++ b/src/test/java/net/engio/mbassy/AllTests.java @@ -12,13 +12,15 @@ import org.junit.runners.Suite; */ @RunWith(Suite.class) @Suite.SuiteClasses({ - ConcurrentSetTest.class, + StrongConcurrentSetTest.class, + WeakConcurrentSetTest.class, MessagePublicationTest.class, FilterTest.class, MetadataReaderTest.class, ListenerSubscriptionTest.class, MethodDispatchTest.class, - DeadEventTest.class + DeadEventTest.class, + SynchronizedHandlerTest.class }) public class AllTests { } diff --git a/src/test/java/net/engio/mbassy/ConcurrentSetTest.java b/src/test/java/net/engio/mbassy/ConcurrentSetTest.java index ae1cb97..ea9ee0f 100644 --- a/src/test/java/net/engio/mbassy/ConcurrentSetTest.java +++ b/src/test/java/net/engio/mbassy/ConcurrentSetTest.java @@ -1,11 +1,10 @@ package net.engio.mbassy; import junit.framework.Assert; -import org.junit.Ignore; -import org.junit.Test; import net.engio.mbassy.common.ConcurrentExecutor; -import net.engio.mbassy.common.ConcurrentSet; +import net.engio.mbassy.common.IConcurrentSet; import net.engio.mbassy.common.UnitTest; +import org.junit.Test; import java.util.HashSet; import java.util.Iterator; @@ -23,50 +22,14 @@ import java.util.Random; * @author bennidi * Date: 11/12/12 */ -public class ConcurrentSetTest extends UnitTest { +public abstract class ConcurrentSetTest extends UnitTest { // Shared state - private int numberOfElements = 100000; - private int numberOfThreads = 50; - - @Ignore("Currently fails when building as a suite with JDK 1.7.0_15 and Maven 3.0.5 on a Mac") - @Test - public void testIteratorCleanup() { - - // Assemble - final HashSet persistingCandidates = new HashSet(); - final ConcurrentSet testSet = new ConcurrentSet(); - final Random rand = new Random(); - - for (int i = 0; i < numberOfElements; i++) { - Object candidate = new Object(); - - if (rand.nextInt() % 3 == 0) { - persistingCandidates.add(candidate); - } - testSet.add(candidate); - } - - // Remove/Garbage collect all objects that have not - // been inserted into the set of persisting candidates. - runGC(); - - ConcurrentExecutor.runConcurrent(new Runnable() { - @Override - public void run() { - for (Object testObject : testSet) { - // do nothing - // just iterate to trigger automatic clean up - System.currentTimeMillis(); - } - } - }, numberOfThreads); - - assertEquals(persistingCandidates.size(), testSet.size()); - for (Object test : testSet) { - assertTrue(persistingCandidates.contains(test)); - } - } + protected final int numberOfElements = 100000; + protected final int numberOfThreads = 50; + + + protected abstract IConcurrentSet createSet(); @Test @@ -74,7 +37,7 @@ public class ConcurrentSetTest extends UnitTest { final LinkedList duplicates = new LinkedList(); final HashSet distinct = new HashSet(); - final ConcurrentSet testSet = new ConcurrentSet(); + final IConcurrentSet testSetWeak = createSet(); Random rand = new Random(); // build set of distinct objects and list of duplicates @@ -92,15 +55,15 @@ public class ConcurrentSetTest extends UnitTest { @Override public void run() { for (Object src : duplicates) { - testSet.add(src); + testSetWeak.add(src); } } }, numberOfThreads); // check that the control set and the test set contain the exact same elements - assertEquals(distinct.size(), testSet.size()); + assertEquals(distinct.size(), testSetWeak.size()); for (Object uniqueObject : distinct) { - assertTrue(testSet.contains(uniqueObject)); + assertTrue(testSetWeak.contains(uniqueObject)); } } @@ -110,7 +73,7 @@ public class ConcurrentSetTest extends UnitTest { final HashSet hashSet = new HashSet(); - final ConcurrentSet concurrentSet = new ConcurrentSet(); + final IConcurrentSet weakConcurrentSet = createSet(); for (int i = 0; i < 1000000; i++) { source.add(new Object()); @@ -126,7 +89,7 @@ public class ConcurrentSetTest extends UnitTest { start = System.currentTimeMillis(); for (Object o : source) { - concurrentSet.add(o); + weakConcurrentSet.add(o); } duration = System.currentTimeMillis() - start; System.out.println("Performance of ConcurrentSet for 1.000.000 object insertions " + duration); @@ -138,7 +101,7 @@ public class ConcurrentSetTest extends UnitTest { final HashSet source = new HashSet(); final HashSet toRemove = new HashSet(); - final ConcurrentSet testSet = new ConcurrentSet(); + final IConcurrentSet testSetWeak = createSet(); // build set of distinct objects and mark a subset of those for removal for (int i = 0; i < numberOfElements; i++) { Object candidate = new Object(); @@ -153,7 +116,7 @@ public class ConcurrentSetTest extends UnitTest { @Override public void run() { for (Object src : source) { - testSet.add(src); + testSetWeak.add(src); } } }, numberOfThreads); @@ -163,20 +126,20 @@ public class ConcurrentSetTest extends UnitTest { @Override public void run() { for (Object src : toRemove) { - testSet.remove(src); + testSetWeak.remove(src); } } }, numberOfThreads); // ensure that the test set does not contain any of the elements that have been removed from it - for (Object tar : testSet) { + for (Object tar : testSetWeak) { Assert.assertTrue(!toRemove.contains(tar)); } // ensure that the test set still contains all objects from the source set that have not been marked // for removal - assertEquals(source.size() - toRemove.size(), testSet.size()); + assertEquals(source.size() - toRemove.size(), testSetWeak.size()); for (Object src : source) { - if (!toRemove.contains(src)) assertTrue(testSet.contains(src)); + if (!toRemove.contains(src)) assertTrue(testSetWeak.contains(src)); } } @@ -185,7 +148,7 @@ public class ConcurrentSetTest extends UnitTest { final HashSet source = new HashSet(); final HashSet toRemove = new HashSet(); - final ConcurrentSet testSet = new ConcurrentSet(); + final IConcurrentSet testSetWeak = createSet(); // build set of candidates and mark subset for removal for (int i = 0; i < numberOfElements; i++) { Object candidate = new Object(); @@ -201,35 +164,35 @@ public class ConcurrentSetTest extends UnitTest { @Override public void run() { for (Object src : source) { - testSet.add(src); + testSetWeak.add(src); if (toRemove.contains(src)) - testSet.remove(src); + testSetWeak.remove(src); } } }, numberOfThreads); // ensure that the test set does not contain any of the elements that have been removed from it - for (Object tar : testSet) { + for (Object tar : testSetWeak) { Assert.assertTrue(!toRemove.contains(tar)); } // ensure that the test set still contains all objects from the source set that have not been marked // for removal - assertEquals(source.size() - toRemove.size(), testSet.size()); + assertEquals(source.size() - toRemove.size(), testSetWeak.size()); for (Object src : source) { - if (!toRemove.contains(src)) assertTrue(testSet.contains(src)); + if (!toRemove.contains(src)) assertTrue(testSetWeak.contains(src)); } } @Test public void testCompleteRemoval() { final HashSet source = new HashSet(); - final ConcurrentSet testSet = new ConcurrentSet(); + final IConcurrentSet testSetWeak = createSet(); // build set of candidates and mark subset for removal for (int i = 0; i < numberOfElements; i++) { Object candidate = new Object(); source.add(candidate); - testSet.add(candidate); + testSetWeak.add(candidate); } // build test set by adding the candidates @@ -238,7 +201,7 @@ public class ConcurrentSetTest extends UnitTest { @Override public void run() { for (Object src : source) { - testSet.remove(src); + testSetWeak.remove(src); } } }, numberOfThreads); @@ -246,22 +209,22 @@ public class ConcurrentSetTest extends UnitTest { // ensure that the test set still contains all objects from the source set that have not been marked // for removal - assertEquals(0, testSet.size()); + assertEquals(0, testSetWeak.size()); for(Object src : source){ - assertFalse(testSet.contains(src)); + assertFalse(testSetWeak.contains(src)); } } @Test public void testRemovalViaIterator() { final HashSet source = new HashSet(); - final ConcurrentSet testSet = new ConcurrentSet(); + final IConcurrentSet testSetWeak = createSet(); // build set of candidates and mark subset for removal for (int i = 0; i < numberOfElements; i++) { Object candidate = new Object(); source.add(candidate); - testSet.add(candidate); + testSetWeak.add(candidate); } // build test set by adding the candidates @@ -269,7 +232,7 @@ public class ConcurrentSetTest extends UnitTest { ConcurrentExecutor.runConcurrent(new Runnable() { @Override public void run() { - Iterator iterator = testSet.iterator(); + Iterator iterator = testSetWeak.iterator(); while(iterator.hasNext()){ iterator.remove(); } @@ -279,9 +242,9 @@ public class ConcurrentSetTest extends UnitTest { // ensure that the test set still contains all objects from the source set that have not been marked // for removal - assertEquals(0, testSet.size()); + assertEquals(0, testSetWeak.size()); for(Object src : source){ - assertFalse(testSet.contains(src)); + assertFalse(testSetWeak.contains(src)); } } diff --git a/src/test/java/net/engio/mbassy/DeadEventTest.java b/src/test/java/net/engio/mbassy/DeadEventTest.java index f1b3ddb..5f84127 100644 --- a/src/test/java/net/engio/mbassy/DeadEventTest.java +++ b/src/test/java/net/engio/mbassy/DeadEventTest.java @@ -2,7 +2,7 @@ package net.engio.mbassy; import net.engio.mbassy.bus.BusConfiguration; import net.engio.mbassy.bus.MBassador; -import net.engio.mbassy.common.ConcurrentSet; +import net.engio.mbassy.common.WeakConcurrentSet; import net.engio.mbassy.common.DeadMessage; import net.engio.mbassy.common.MessageBusTest; import net.engio.mbassy.listener.Handler; @@ -33,7 +33,7 @@ public class DeadEventTest extends MessageBusTest{ public class DeadEventHandler{ - private ConcurrentSet deadEvents = new ConcurrentSet(); + private WeakConcurrentSet deadEvents = new WeakConcurrentSet(); @Handler public void handle(DeadMessage message){ diff --git a/src/test/java/net/engio/mbassy/FilterTest.java b/src/test/java/net/engio/mbassy/FilterTest.java index e20d3b0..cda9d8a 100644 --- a/src/test/java/net/engio/mbassy/FilterTest.java +++ b/src/test/java/net/engio/mbassy/FilterTest.java @@ -1,19 +1,19 @@ package net.engio.mbassy; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; - import net.engio.mbassy.bus.BusConfiguration; import net.engio.mbassy.bus.MBassador; import net.engio.mbassy.common.DeadMessage; import net.engio.mbassy.common.FilteredMessage; import net.engio.mbassy.common.MessageBusTest; -import net.engio.mbassy.events.SubTestMessage; -import net.engio.mbassy.listener.*; -import org.junit.Test; import net.engio.mbassy.common.TestUtil; +import net.engio.mbassy.events.SubTestMessage; import net.engio.mbassy.events.TestMessage; +import net.engio.mbassy.listener.*; import net.engio.mbassy.listeners.ListenerFactory; +import org.junit.Test; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; /** * Testing of filter functionality @@ -28,6 +28,8 @@ public class FilterTest extends MessageBusTest { @Test public void testSubclassFilter() throws Exception { + FilteredEventCounter.set(0); + DeadEventCounter.set(0); MBassador bus = getBus(new BusConfiguration()); ListenerFactory listenerFactory = new ListenerFactory() diff --git a/src/test/java/net/engio/mbassy/MessagePublicationTest.java b/src/test/java/net/engio/mbassy/MessagePublicationTest.java index d3dd9a6..aa5b373 100644 --- a/src/test/java/net/engio/mbassy/MessagePublicationTest.java +++ b/src/test/java/net/engio/mbassy/MessagePublicationTest.java @@ -1,23 +1,18 @@ package net.engio.mbassy; -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; - import net.engio.mbassy.bus.BusConfiguration; import net.engio.mbassy.bus.MBassador; -import net.engio.mbassy.common.MessageBusTest; -import net.engio.mbassy.events.SubTestMessage; -import org.junit.Test; import net.engio.mbassy.common.ConcurrentExecutor; +import net.engio.mbassy.common.MessageBusTest; import net.engio.mbassy.common.TestUtil; +import net.engio.mbassy.events.SubTestMessage; import net.engio.mbassy.events.TestMessage; import net.engio.mbassy.events.TestMessage2; -import net.engio.mbassy.listeners.EventingTestBean; -import net.engio.mbassy.listeners.EventingTestBean2; -import net.engio.mbassy.listeners.EventingTestBean3; -import net.engio.mbassy.listeners.ListenerFactory; -import net.engio.mbassy.listeners.MultiEventHandler; -import net.engio.mbassy.listeners.NonListeningBean; +import net.engio.mbassy.listeners.*; +import org.junit.Test; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; /** * Test synchronous and asynchronous dispatch in single and multi-threaded scenario. @@ -95,6 +90,31 @@ public class MessagePublicationTest extends MessageBusTest { } + @Test + public void testStrongListenerSubscription() throws Exception { + + MBassador bus = getBus(new BusConfiguration()); + + + for(int i = 0; i< 10000; i++){ + bus.subscribe(new EventingTestBean2()); + } + + runGC(); + + TestMessage message = new TestMessage(); + TestMessage subMessage = new SubTestMessage(); + + bus.publish(message); + bus.publish(subMessage); + + pause(processingTimeInMS); + + assertEquals(10000, message.counter.get()); + assertEquals(20000, subMessage.counter.get()); + + } + @Test public void testConcurrentMixedMessagePublication() throws Exception { final CopyOnWriteArrayList testMessages = new CopyOnWriteArrayList(); diff --git a/src/test/java/net/engio/mbassy/StrongConcurrentSetTest.java b/src/test/java/net/engio/mbassy/StrongConcurrentSetTest.java new file mode 100644 index 0000000..1f061ed --- /dev/null +++ b/src/test/java/net/engio/mbassy/StrongConcurrentSetTest.java @@ -0,0 +1,18 @@ +package net.engio.mbassy; + +import net.engio.mbassy.common.IConcurrentSet; +import net.engio.mbassy.common.StrongConcurrentSet; + +/** + * Todo: Add javadoc + * + * @author bennidi + * Date: 3/29/13 + */ +public class StrongConcurrentSetTest extends ConcurrentSetTest{ + + @Override + protected IConcurrentSet createSet() { + return new StrongConcurrentSet(); + } +} diff --git a/src/test/java/net/engio/mbassy/SynchronizedHandlerTest.java b/src/test/java/net/engio/mbassy/SynchronizedHandlerTest.java new file mode 100644 index 0000000..d015b88 --- /dev/null +++ b/src/test/java/net/engio/mbassy/SynchronizedHandlerTest.java @@ -0,0 +1,103 @@ +package net.engio.mbassy; + +import net.engio.mbassy.bus.BusConfiguration; +import net.engio.mbassy.bus.IMessageBus; +import net.engio.mbassy.bus.MessagePublication; +import net.engio.mbassy.common.MessageBusTest; +import net.engio.mbassy.listener.Handler; +import net.engio.mbassy.listener.Invoke; +import net.engio.mbassy.listener.Synchronized; +import org.junit.Test; + +import java.util.LinkedList; +import java.util.List; + +/** + * Todo: Add javadoc + * + * @author bennidi + * Date: 3/31/13 + */ +public class SynchronizedHandlerTest extends MessageBusTest { + + + private static int incrementsPerHandler = 10000; + private static int numberOfMessages = 1000; + private static int numberOfHandlers = 1000; + + @Test + public void testSynchronizedWithSynchronousInvocation(){ + List handlers = new LinkedList(); + IMessageBus bus = getBus(BusConfiguration.Default() + .setNumberOfMessageDispatchers(6)); + for(int i = 0; i < numberOfHandlers; i++){ + SynchronizedMessageHandlerSync handler = new SynchronizedMessageHandlerSync(); + handlers.add(handler); + bus.subscribe(handler); + } + + MessagePublication publication = null; + for(int i = 0; i < numberOfMessages; i++){ + publication = bus.post(new Object()).asynchronously(); + } + while (!publication.isFinished()){ + pause(2000); + } + + for(SynchronizedMessageHandlerSync handler : handlers){ + assertEquals(incrementsPerHandler * numberOfMessages, handler.Counter); + } + + } + + @Test + public void testSynchronizedWithAsSynchronousInvocation(){ + List handlers = new LinkedList(); + IMessageBus bus = getBus(BusConfiguration.Default() + .setNumberOfMessageDispatchers(6)); + for(int i = 0; i < numberOfHandlers; i++){ + SynchronizedMessageHandlerAsyn handler = new SynchronizedMessageHandlerAsyn(); + handlers.add(handler); + bus.subscribe(handler); + } + + for(int i = 0; i < numberOfMessages; i++){ + bus.post(new Object()).asynchronously(); + } + + pause(10000); + + for(SynchronizedMessageHandlerAsyn handler : handlers){ + assertEquals(incrementsPerHandler * numberOfMessages, handler.Counter); + } + + } + + public static class SynchronizedMessageHandlerSync{ + + private int Counter = 0; + + @Handler + @Synchronized + public void handleMessage(Object o){ + for(int i = 0; i < incrementsPerHandler; i++){ + Counter++; + } + } + + } + + public static class SynchronizedMessageHandlerAsyn{ + + private int Counter = 0; + + @Handler(delivery = Invoke.Asynchronously) + @Synchronized + public void handleMessage(Object o){ + for(int i = 0; i < incrementsPerHandler; i++){ + Counter++; + } + } + + } +} diff --git a/src/test/java/net/engio/mbassy/WeakConcurrentSetTest.java b/src/test/java/net/engio/mbassy/WeakConcurrentSetTest.java new file mode 100644 index 0000000..797c924 --- /dev/null +++ b/src/test/java/net/engio/mbassy/WeakConcurrentSetTest.java @@ -0,0 +1,64 @@ +package net.engio.mbassy; + +import net.engio.mbassy.common.ConcurrentExecutor; +import net.engio.mbassy.common.IConcurrentSet; +import net.engio.mbassy.common.WeakConcurrentSet; +import org.junit.Test; + +import java.util.HashSet; +import java.util.Random; + +/** + * Todo: Add javadoc + * + * @author bennidi + * Date: 3/29/13 + */ +public class WeakConcurrentSetTest extends ConcurrentSetTest{ + + @Override + protected IConcurrentSet createSet() { + return new WeakConcurrentSet(); + } + + //@Ignore("Currently fails when building as a suite with JDK 1.7.0_15 and Maven 3.0.5 on a Mac") + @Test + public void testIteratorCleanup() { + + // Assemble + final HashSet persistingCandidates = new HashSet(); + final IConcurrentSet testSetWeak = createSet(); + final Random rand = new Random(); + + for (int i = 0; i < numberOfElements; i++) { + Object candidate = new Object(); + + if (rand.nextInt() % 3 == 0) { + persistingCandidates.add(candidate); + } + testSetWeak.add(candidate); + } + + // Remove/Garbage collect all objects that have not + // been inserted into the set of persisting candidates. + runGC(); + + ConcurrentExecutor.runConcurrent(new Runnable() { + @Override + public void run() { + for (Object testObject : testSetWeak) { + // do nothing + // just iterate to trigger automatic clean up + System.currentTimeMillis(); + } + } + }, numberOfThreads); + + assertEquals(persistingCandidates.size(), testSetWeak.size()); + for (Object test : testSetWeak) { + assertTrue(persistingCandidates.contains(test)); + } + } + + +} diff --git a/src/test/java/net/engio/mbassy/listeners/EventingTestBean.java b/src/test/java/net/engio/mbassy/listeners/EventingTestBean.java index f0ff5b1..35ef3d1 100644 --- a/src/test/java/net/engio/mbassy/listeners/EventingTestBean.java +++ b/src/test/java/net/engio/mbassy/listeners/EventingTestBean.java @@ -1,8 +1,10 @@ package net.engio.mbassy.listeners; +import net.engio.mbassy.dispatch.HandlerInvocation; import net.engio.mbassy.events.SubTestMessage; import net.engio.mbassy.events.TestMessage; import net.engio.mbassy.listener.*; +import net.engio.mbassy.subscription.SubscriptionContext; /** * Basic bean that defines some event handlers to be used for different unit testting scenarios @@ -20,7 +22,7 @@ public class EventingTestBean { } // this handler will be invoked asynchronously - @Handler(priority = 0, delivery = Mode.Concurrent) + @Handler(priority = 0, delivery = Invoke.Asynchronously, invocation = HandleSubTestEventInvocation.class) public void handleSubTestEvent(SubTestMessage message) { message.counter.incrementAndGet(); } @@ -29,11 +31,21 @@ public class EventingTestBean { // or any subtabe and that passes the given filter @Handler( priority = 10, - delivery = Mode.Sequential, + delivery = Invoke.Synchronously, filters = {@Filter(Filters.RejectAll.class), @Filter(Filters.AllowAll.class)}) public void handleFiltered(SubTestMessage message) { message.counter.incrementAndGet(); } + public static class HandleSubTestEventInvocation extends HandlerInvocation { + public HandleSubTestEventInvocation(SubscriptionContext context) { + super(context); + } + + @Override + public void invoke(EventingTestBean listener, SubTestMessage message) { + listener.handleSubTestEvent(message); + } + } } diff --git a/src/test/java/net/engio/mbassy/listeners/EventingTestBean2.java b/src/test/java/net/engio/mbassy/listeners/EventingTestBean2.java index a21abbf..3ac9395 100644 --- a/src/test/java/net/engio/mbassy/listeners/EventingTestBean2.java +++ b/src/test/java/net/engio/mbassy/listeners/EventingTestBean2.java @@ -2,17 +2,21 @@ package net.engio.mbassy.listeners; import net.engio.mbassy.events.SubTestMessage; import net.engio.mbassy.listener.Handler; -import net.engio.mbassy.listener.Mode; +import net.engio.mbassy.listener.Invoke; +import net.engio.mbassy.listener.Listener; +import net.engio.mbassy.listener.References; /** * @author bennidi * Date: 11/22/12 */ +@Listener(references = References.Strong) public class EventingTestBean2 extends EventingTestBean{ // redefine the configuration for this handler - @Handler(delivery = Mode.Sequential) + @Handler(delivery = Invoke.Synchronously) public void handleSubTestEvent(SubTestMessage message) { super.handleSubTestEvent(message); } + } diff --git a/src/test/java/net/engio/mbassy/listeners/EventingTestBean3.java b/src/test/java/net/engio/mbassy/listeners/EventingTestBean3.java index 3df3c5f..aa50901 100644 --- a/src/test/java/net/engio/mbassy/listeners/EventingTestBean3.java +++ b/src/test/java/net/engio/mbassy/listeners/EventingTestBean3.java @@ -2,17 +2,20 @@ package net.engio.mbassy.listeners; import net.engio.mbassy.events.SubTestMessage; import net.engio.mbassy.listener.Handler; -import net.engio.mbassy.listener.Mode; +import net.engio.mbassy.listener.Invoke; +import net.engio.mbassy.listener.Listener; +import net.engio.mbassy.listener.References; /** * @author bennidi * Date: 11/22/12 */ +@Listener(references = References.Strong) public class EventingTestBean3 extends EventingTestBean2{ // this handler will be invoked asynchronously - @Handler(priority = 0, delivery = Mode.Sequential) + @Handler(priority = 0, delivery = Invoke.Synchronously) public void handleSubTestEventAgain(SubTestMessage message) { message.counter.incrementAndGet(); } diff --git a/src/test/java/net/engio/mbassy/listeners/MultiEventHandler.java b/src/test/java/net/engio/mbassy/listeners/MultiEventHandler.java index b6d7478..446577e 100644 --- a/src/test/java/net/engio/mbassy/listeners/MultiEventHandler.java +++ b/src/test/java/net/engio/mbassy/listeners/MultiEventHandler.java @@ -2,11 +2,8 @@ package net.engio.mbassy.listeners; import net.engio.mbassy.events.TestMessage; import net.engio.mbassy.events.TestMessage2; -import net.engio.mbassy.listener.Enveloped; -import net.engio.mbassy.listener.Filter; -import net.engio.mbassy.listener.Filters; -import net.engio.mbassy.listener.Handler; -import net.engio.mbassy.listener.Mode; +import net.engio.mbassy.listener.*; +import net.engio.mbassy.listener.Invoke; import net.engio.mbassy.subscription.MessageEnvelope; /** @@ -18,7 +15,7 @@ import net.engio.mbassy.subscription.MessageEnvelope; public class MultiEventHandler { - @Handler(delivery = Mode.Sequential) + @Handler(delivery = Invoke.Synchronously) @Enveloped(messages = {TestMessage.class, TestMessage2.class}) public void handleEvents(MessageEnvelope envelope) { if(TestMessage.class.isAssignableFrom(envelope.getMessage().getClass())){ @@ -31,7 +28,7 @@ public class MultiEventHandler { } } - @Handler(delivery = Mode.Sequential, filters = @Filter(Filters.RejectSubtypes.class)) + @Handler(delivery = Invoke.Synchronously, filters = @Filter(Filters.RejectSubtypes.class)) @Enveloped(messages = {TestMessage.class, TestMessage2.class}) public void handleSuperTypeEvents(MessageEnvelope envelope) { if(TestMessage.class.isAssignableFrom(envelope.getMessage().getClass())){ diff --git a/testNTimes.sh b/testNTimes.sh new file mode 100755 index 0000000..35c8b12 --- /dev/null +++ b/testNTimes.sh @@ -0,0 +1,12 @@ +#!/bin/bash +for (( i = 1; i < $1 ; i++ )) +do + echo "Attempt $i" + mvn test -o -Dtest=$2 + exitcode=$? + if [ $exitcode -ne 0 ] + then + echo "Error at attempt $i" + exit + fi +done