Release 1.1.4

This commit is contained in:
benni 2013-04-01 15:13:52 +02:00
parent f52fc0e681
commit 12a3e778e9
114 changed files with 1256 additions and 1242 deletions

View File

@ -8,7 +8,7 @@ data structure to allow high throughput for concurrent access.
Read this documentation to get an overview of its features. You can also check out the <a href="http://codeblock.engio.net/?p=37" target="_blank">performance comparison</a>
which also contains a partial list of the features of the compared implementations.
The current version is 1.1.3 and it is available from the Maven Central Repository. See the release notes for more details.
The current version is 1.1.4 and it is available from the Maven Central Repository. See the release notes for more details.
Table of contents:
+ [Features](#features)
@ -33,10 +33,11 @@ This means that listeners will also receivesubtypes of the message type they are
+ <em><strong>Synchronous and asynchronous message delivery</em></strong>: A handler can be invoked to handle a message either synchronously or
asynchronously. This is configurable for each handler via annotations. Message publication itself supports synchronous (method
blocks until messages are delivered to all handlers) or asynchronous (fire and forget) dispatch
+ <em><strong>Weak references</em></strong>: MBassador uses weak references to all listening objects to relieve the programmer of the burden to explicitly unregister
+ <em><strong>Weak references</em></strong>: By default, MBassador uses weak references to all listening objects to relieve the programmer of the burden to explicitly unregister
listeners that are not used anymore (of course it is also possible to explicitly unregister a listener if needed). This is very comfortable
in certain environments where listeners are managed by frameworks, i.e. spring, guice etc. Just stuff everything into the message bus, it will
ignore objects without message handlers and automatically clean-up orphaned weak references after the garbage collector has done its job.
+ <em><strong>Strong references</em></strong>: Instead of using weak references, the bus can be configured to use strong references.
+ <em><strong>Filtering</em></strong>: MBassador offers static message filtering. Filters are configured using annotations and multiple filters can be attached to
a single message handler
+ <em><strong>Message envelopes</em></strong>: Message handlers can declare to receive an enveloped message. The envelope can wrap different
@ -58,7 +59,7 @@ sending messages to your listeners using one of MBassador's publication methods
<h2>Usage</h2>
Listener definition (in any bean):
Handler definition (in any bean):
// every message of type TestMessage or any subtype will be delivered
// to this handler
@ -67,8 +68,15 @@ Listener definition (in any bean):
// do something
}
// this handler will be invoked concurrently
@Handler(delivery = Mode.Concurrent)
// every message of type TestMessage or any subtype will be delivered
// to this handler
@Handler
public void handleTestMessageStrong(TestMessage message) {
// do something
}
// this handler will be invoked asynchronously (in a different thread)
@Handler(delivery = Invoke.Asynchronously)
public void handleSubTestMessage(SubTestMessage message) {
// do something more expensive here
}
@ -90,6 +98,15 @@ Listener definition (in any bean):
}
// configure a listener to be stored using strong instead of weak references
@Listener(references = References.Strong)
public class MessageListener{
// any handler definitions
}
Creation of message bus and registration of listeners:
// create as many instances as necessary
@ -113,22 +130,8 @@ Message publication:
bus.post(subMessage).now(); // same as above
<h2>Installation</h2>
Beginning with version 1.1.0 MBassador is available from the Maven Central Repository (Hooray!). Older versions are
still available from the included maven repository in this github repo but will be deleted in the future.
The recommended way of using MBassador in your project is to add the dependency as shown in step two. Step one is only necessary
if you want to use an older version that is not available in the central repository.
Beginning with version 1.1.0 MBassador is available from the Maven Central Repository using the following coordinates:
1. Add the repository location to your pom.xml
<pre><code class="xml">
&lt;repositories&gt;
&lt;repository&gt;
&lt;id&gt;mbassador-github-repo&lt;/id&gt;
&lt;url&gt;https://raw.github.com/bennidi/mbassador/master/maven &lt;/url&gt;
&lt;/repository&gt;
&lt;/repositories&gt;
</pre></code>
2. Add the MBassador dependency to your pom.xml. You can check which versions are available by browsing
the git repository online.
<pre><code class="xml">
&lt;dependency&gt;
&lt;groupId&gt;net.engio&lt;/groupId&gt;
@ -136,7 +139,6 @@ if you want to use an older version that is not available in the central reposit
&lt;version&gt;1.1.0&lt;/version&gt;
&lt;/dependency&gt;
</pre></code>
3. Run mvn clean package to have maven download and install the required version into your local repository
Of course you can always clone the repository and build from source.
@ -147,6 +149,18 @@ to avoid confusion and misunderstanding.
<h2>Release Notes</h2>
<h3>1.1.4</h3>
+ Added support for choosing between strong and weak references using the new @Listener annotation. @Listener can be
added to any class that defines message handlers and allows to configure which reference type is used
+ Custom handler invocations: It is possible to provide a custom handler invocation for each message handler, see "invocation"
property of @Handler
+ Changed packaging to "bundle" to support OSGI environments
+ Synchronization of message handlers via @Synchronized: Handlers that are not thread-safe can be synchronized to guarantee
that only one thread at a time can invoke that handler
+ Created a message bus implementation that does not use threading to support use in non-multi-threaded environments like GWT,
see ISyncMessageBus
<h3>1.1.3</h3>
+ Added support for FilteredMessage event
@ -179,7 +193,7 @@ First stable release!
+ Fixed behaviour with capacity bound blocking queue such that there now are two methods to schedule a message
asynchronously. One will block until capacity becomes available, the other will timeout after a specified amount of
time.
+ Added unit tests
+ Additional unit tests
<h3>1.0.5.RC</h3>

View File

@ -1 +0,0 @@
d53edd50a68307a4c209fd21fa0625d7

View File

@ -1 +0,0 @@
f2ed26a15178d83c80212ccce73584e97e7d6fb8

View File

@ -1,58 +0,0 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.mbassy</groupId>
<artifactId>mbassador</artifactId>
<version>1.0.0.RC</version>
<packaging>jar</packaging>
<name>mbassador</name>
<description>Mbassador is a fast and flexible message bus system that follows the publish subscribe pattern
</description>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.build.java.version>1.6</project.build.java.version>
<github.url>file://${project.basedir}/maven</github.url>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
</dependencies>
<distributionManagement>
<repository>
<id>mbassador-github-repo</id>
<url>${github.url}</url>
</repository>
</distributionManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${project.build.java.version}</source>
<target>${project.build.java.version}</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<skipTests>false</skipTests>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -1 +0,0 @@
760688c8fb33fc90918146e68dab2646

View File

@ -1 +0,0 @@
0abf762df11d85841f8f8b7f28361ff19549f019

View File

@ -1 +0,0 @@
933328ff6220f5910770b9b5f13f2eeb

View File

@ -1 +0,0 @@
003aa85106cfbab5ce618ffd1a6fe36cd0e37b71

View File

@ -1,58 +0,0 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.mbassy</groupId>
<artifactId>mbassador</artifactId>
<version>1.0.1.RC</version>
<packaging>jar</packaging>
<name>mbassador</name>
<description>Mbassador is a fast and flexible message bus system that follows the publish subscribe pattern
</description>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.build.java.version>1.6</project.build.java.version>
<github.url>file://${project.basedir}/maven</github.url>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
</dependencies>
<distributionManagement>
<repository>
<id>mbassador-github-repo</id>
<url>${github.url}</url>
</repository>
</distributionManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${project.build.java.version}</source>
<target>${project.build.java.version}</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<skipTests>false</skipTests>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -1 +0,0 @@
5959d236fd411d499f3b46ecedc28a84

View File

@ -1 +0,0 @@
5203632e409974a234ff8ce0ea88c5897a8cbffe

View File

@ -1 +0,0 @@
5ad621adc6addc2292ee1fc7a2bd756d

View File

@ -1 +0,0 @@
258a17785e15390e868efe2a51d5370aeb602129

View File

@ -1,58 +0,0 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.mbassy</groupId>
<artifactId>mbassador</artifactId>
<version>1.0.2.RC</version>
<packaging>jar</packaging>
<name>mbassador</name>
<description>Mbassador is a fast and flexible message bus system that follows the publish subscribe pattern
</description>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.build.java.version>1.6</project.build.java.version>
<github.url>file://${project.basedir}/maven</github.url>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
</dependencies>
<distributionManagement>
<repository>
<id>mbassador-github-repo</id>
<url>${github.url}</url>
</repository>
</distributionManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${project.build.java.version}</source>
<target>${project.build.java.version}</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<skipTests>false</skipTests>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -1 +0,0 @@
f7972cec0f8ebde2a2a17a678ef36eb6

View File

@ -1 +0,0 @@
f8c24fe42f192dbdd1f5d2b29bff0220d40039dd

View File

@ -1 +0,0 @@
2bcfd233f83cb6fc4241c91a99f74030

View File

@ -1 +0,0 @@
20df67988104be77a9c3e2edbb8ba615a664671b

View File

@ -1,58 +0,0 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.mbassy</groupId>
<artifactId>mbassador</artifactId>
<version>1.0.3.RC</version>
<packaging>jar</packaging>
<name>mbassador</name>
<description>Mbassador is a fast and flexible message bus system that follows the publish subscribe pattern
</description>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.build.java.version>1.6</project.build.java.version>
<github.url>file://${project.basedir}/maven</github.url>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
</dependencies>
<distributionManagement>
<repository>
<id>mbassador-github-repo</id>
<url>${github.url}</url>
</repository>
</distributionManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${project.build.java.version}</source>
<target>${project.build.java.version}</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<skipTests>false</skipTests>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -1 +0,0 @@
d766622aea3238ee52a60920313274d1

View File

@ -1 +0,0 @@
55c27ec10ee4067843f05fb735a2af95bc2c06cf

View File

@ -1 +0,0 @@
2dcb34daf560ac4b9a87a95924aa7be6

View File

@ -1 +0,0 @@
275d892046554f6a86226d813202c5ab8a22cf92

View File

@ -1,58 +0,0 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.mbassy</groupId>
<artifactId>mbassador</artifactId>
<version>1.0.4.RC</version>
<packaging>jar</packaging>
<name>mbassador</name>
<description>Mbassador is a fast and flexible message bus system that follows the publish subscribe pattern
</description>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.build.java.version>1.6</project.build.java.version>
<github.url>file://${project.basedir}/maven</github.url>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
</dependencies>
<distributionManagement>
<repository>
<id>mbassador-github-repo</id>
<url>${github.url}</url>
</repository>
</distributionManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${project.build.java.version}</source>
<target>${project.build.java.version}</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<skipTests>false</skipTests>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -1 +0,0 @@
75dda79a70a43a84877c3d62fdef401d

View File

@ -1 +0,0 @@
ed21bd9587e9ee76e63b31ede3cbb6eedb04fa70

View File

@ -1 +0,0 @@
3ad14134e9752e3a073c75ab296427ef

View File

@ -1 +0,0 @@
bedef44bb92cbfafcba48624c91e185d692ea39d

View File

@ -1,58 +0,0 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.mbassy</groupId>
<artifactId>mbassador</artifactId>
<version>1.0.5.RC</version>
<packaging>jar</packaging>
<name>mbassador</name>
<description>Mbassador is a fast and flexible message bus system that follows the publish subscribe pattern
</description>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.build.java.version>1.6</project.build.java.version>
<github.url>file://${project.basedir}/maven</github.url>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
</dependencies>
<distributionManagement>
<repository>
<id>mbassador-github-repo</id>
<url>${github.url}</url>
</repository>
</distributionManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${project.build.java.version}</source>
<target>${project.build.java.version}</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<skipTests>false</skipTests>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -1 +0,0 @@
0a781a5e9f22e5dafeb7f48fff65e46d

View File

@ -1 +0,0 @@
4378b82fa04f4c31f21321424d0c4c328905d3ad

View File

@ -1 +0,0 @@
dac16b8c129ee38d08e63d4cd487bdc9

View File

@ -1 +0,0 @@
110fd15551d0a40fafd46cc4e66c590e4d1b2fc7

View File

@ -1,58 +0,0 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.mbassy</groupId>
<artifactId>mbassador</artifactId>
<version>1.0.6.RC</version>
<packaging>jar</packaging>
<name>mbassador</name>
<description>Mbassador is a fast and flexible message bus system that follows the publish subscribe pattern
</description>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.build.java.version>1.6</project.build.java.version>
<github.url>file://${project.basedir}/maven</github.url>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
</dependencies>
<distributionManagement>
<repository>
<id>mbassador-github-repo</id>
<url>${github.url}</url>
</repository>
</distributionManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${project.build.java.version}</source>
<target>${project.build.java.version}</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<skipTests>false</skipTests>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -1 +0,0 @@
ebf2e22bfe53d858092befaa865d0bf4

View File

@ -1 +0,0 @@
950ca7e831a9060060f943ca02d313f7c7f5ccbf

View File

@ -1,13 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<metadata>
<groupId>org.mbassy</groupId>
<artifactId>mbassador</artifactId>
<version>1.0.7.RC-SNAPSHOT</version>
<versioning>
<snapshot>
<timestamp>20121229.133808</timestamp>
<buildNumber>1</buildNumber>
</snapshot>
<lastUpdated>20121229133808</lastUpdated>
</versioning>
</metadata>

View File

@ -1 +0,0 @@
732608242394f6b316a7cf631208b635

View File

@ -1 +0,0 @@
2cae8933661f8cf18afc2c923436c2457e0b3809

View File

@ -1 +0,0 @@
13de5221308cf546ff55aa6b5a35ec1f78ecf691

View File

@ -1 +0,0 @@
79eb0716f839f2b63cf5d3d9796800967d54fdb5

View File

@ -1 +0,0 @@
21ac5231ee676ba2461e31392e210b54

View File

@ -1 +0,0 @@
9cfcc0fe9534d2a530b48445331d451b97ca3342

View File

@ -1,152 +0,0 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.mbassy</groupId>
<artifactId>mbassador</artifactId>
<version>1.0.7.RC-SNAPSHOT</version>
<packaging>jar</packaging>
<name>mbassador</name>
<description>
Mbassador is a fast and flexible message bus system following the publish subscribe pattern.
It is designed for ease of use and aims to be feature rich and extensible
while preserving resource efficiency and performance.
It features:
declarative listener definition via annotations,
sync and/or async message delivery,
weak-references,
message filtering,
ordering of message handlers etc.
</description>
<url>https://github.com/bennidi/mbassador</url>
<licenses>
<license>
<name>MIT license</name>
<url>http://www.opensource.org/licenses/mit-license.php</url>
</license>
</licenses>
<scm>
<url>scm:git:git@github.com:bennidi/mbassador.git</url>
<connection>scm:git:git@github.com:bennidi/mbassador.git</connection>
</scm>
<developers>
<developer>
<id>bennidi</id>
<name>Benjamin Diedrichsen</name>
<timezone>+1</timezone>
</developer>
</developers>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.build.java.version>1.6</project.build.java.version>
<github.url>file://${project.basedir}/maven</github.url>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
</dependencies>
<distributionManagement>
<repository>
<id>mbassador-github-repo</id>
<url>${github.url}</url>
</repository>
</distributionManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${project.build.java.version}</source>
<target>${project.build.java.version}</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-release-plugin</artifactId>
<version>2.4</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<skipTests>false</skipTests>
</configuration>
</plugin>
<!-- bind the source attaching to package phase -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<executions>
<execution>
<id>attach-javadocs</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>release-sign-artifacts</id>
<activation>
<property>
<name>performRelease</name>
<value>true</value>
</property>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-gpg-plugin</artifactId>
<version>1.4</version>
<executions>
<execution>
<id>sign-artifacts</id>
<phase>verify</phase>
<goals>
<goal>sign</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

View File

@ -1 +0,0 @@
05e16f4d46db6bc39529ebe889ff552d

View File

@ -1 +0,0 @@
5e078c7f168816392ca34e41cbc8f58c81b8c5d9

View File

@ -1,19 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<metadata>
<groupId>org.mbassy</groupId>
<artifactId>mbassador</artifactId>
<version>1.0.0.RC</version>
<versioning>
<versions>
<version>1.0.0.RC</version>
<version>1.0.1.RC</version>
<version>1.0.2.RC</version>
<version>1.0.3.RC</version>
<version>1.0.4.RC</version>
<version>1.0.5.RC</version>
<version>1.0.6.RC</version>
<version>1.0.7.RC-SNAPSHOT</version>
</versions>
<lastUpdated>20121229133808</lastUpdated>
</versioning>
</metadata>

View File

@ -1 +0,0 @@
46231d9bfb55280deeea69a9e18ed52b

View File

@ -1 +0,0 @@
3608eef0e3183c20460c8b1839c6a3d5740dd63e

View File

@ -18,9 +18,10 @@ The basic contract of the bus is that it will deliver a specific message exactly
Currently, message handlers will be invoked in inverse sequence of subscription but any
client using this bus should not rely on this assumption.
The bus uses weak references to all listeners such that registered listeners do not need to
By default, the bus uses weak references to all listeners such that registered listeners do not need to
be explicitly unregistered to be eligible for garbage collection. Dead (garbage collected) listeners are
removed on-the-fly as messages get published.
removed on-the-fly as messages get published. It is possible to enable the use of strong references on the message handler
level.
Unsubscribing a listener means removing all subscribed message handlers of that listener. This remove operation
immediately effects all running publications processes -> A removed listener will under no circumstances receive any message publications.

View File

@ -10,10 +10,17 @@ filters, delivery modes etc.
<tr>
<td>delivery</td>
<td>Message delivery can either run sequentially(i.e. one listener at a time) or concurrently
(i.e. multiple threads are used to deliver the same message to different listeners).
Note:The number of parallel threads is configurable per instance using the BusConfiguration</td>
<td>Sequential</td>
<td>Message handler invocation can be configured to run
<ul>
<li>Synchronously: One handler at a time within a given message publication. Each invocation occurs from the same thread</li>
<li>Asynchronously: Multiple threads are used within a given message publication. Each handler invocation
runs in a separate thread.Note:The number of parallel threads is configurable per instance using the BusConfiguration</li>
</ul>
Note: Use @Synchronized if your handler does not allow multiple, concurrent message publications, i.e.
handlers that are not thread-safe but are used in a multi-threaded environment where asynchronous message publication
is possible.
</td>
<td>Synchronously</td>
</tr>
<tr>
@ -42,6 +49,21 @@ filters, delivery modes etc.
<td>true</td>
</tr>
<tr>
<td>strongReferencess</td>
<td>Whether the bus should use storng references to the listeners instead of weak references
</td>
<td>false</td>
</tr>
<tr>
<td>invocation</td>
<td>Specify a custom implementation for the handler invocation. By default, a generic implementation
that uses reflection will be used. Note: A custom implementation will not be faster than the generic one
since there are heavy optimizations by the JVM using JIT-Compiler and more.
</td>
<td>false</td>
</tr>
</table>
@ -59,22 +81,22 @@ receive all messages of type TestEvent or any subtype sequentially.
This handler will receive all messages of type SubTestEvent or any subtype concurrently
This handler will receive all messages of type SubTestEvent or any subtype
// this handler will be invoked concurrently
@Handler(delivery = Mode.Concurrent)
// handler invocation will occur in a different thread
@Handler(delivery = Invoke.Asynchronously)
public void handleSubTestEvent(SubTestEvent event) {
// do something more expensive here
}
This handler will receive all messages of type SubTestEvent or any subtype sequentially,
This handler will receive all messages of type SubTestEvent or any subtype,
given that they pass the specified filters. This handler will be invoked before the formerly
defined one, since it specifies a higher priority
// this handler will receive messages of type SubTestEvent
// or any of its sub types that passe the given filter(s)
@Handler(priority = 10,
dispatch = Mode.Synchronous,
dispatch = Invoke.Synchronously,
filters = {@Filter(Filters.SpecialEvent.class)})
public void handleFiltered(SubTestEvent event) {
//do something special here
@ -112,8 +134,8 @@ Message handler inheritance corresponds to inheritance of methods as defined in
A subclass of any class that defines message handlers will inherit these handler and their configuration.
It is possible to change (override) the configuration simply by overriding the super class' method and
specifying a different configuration. This way, it is also possible to deactivate a message handler of
a super class by using the "enabled" property on the overridden method.
If a class overrides a method that is configured as a message handler in one of its super classes
a super class by setting the "enabled" property to "false" on the overridden method.
If a class overrides a method that is already configured as a message handler
it is still considered a message handler but of course the implementation of the overriding class
will be used.

View File

@ -0,0 +1,29 @@
package net.engio.mbassy;
/**
* Todo: Add javadoc
*
* @author bennidi
* Date: 3/29/13
*/
public class MessageBusException extends Exception{
public MessageBusException() {
}
public MessageBusException(String message) {
super(message);
}
public MessageBusException(String message, Throwable cause) {
super(message, cause);
}
public MessageBusException(Throwable cause) {
super(cause);
}
public MessageBusException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

View File

@ -0,0 +1,100 @@
package net.engio.mbassy.bus;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
/**
* The base class for all message bus implementations.
*
* @param <T>
* @param <P>
*/
public abstract class AbstractSyncAsyncMessageBus<T, P extends IMessageBus.IPostCommand> extends AbstractSyncMessageBus<T, P> implements IMessageBus<T, P> {
// executor for asynchronous message handlers
private final ExecutorService executor;
// all threads that are available for asynchronous message dispatching
private final List<Thread> dispatchers;
// all pending messages scheduled for asynchronous dispatch are queued here
private final BlockingQueue<MessagePublication> pendingMessages;
public AbstractSyncAsyncMessageBus(BusConfiguration configuration) {
super(configuration);
this.executor = configuration.getExecutor();
pendingMessages = new LinkedBlockingQueue<MessagePublication>(configuration.getMaximumNumberOfPendingMessages());
dispatchers = new ArrayList<Thread>(configuration.getNumberOfMessageDispatchers());
initDispatcherThreads(configuration.getNumberOfMessageDispatchers());
}
// initialize the dispatch workers
private void initDispatcherThreads(int numberOfThreads) {
for (int i = 0; i < numberOfThreads; i++) {
// each thread will run forever and process incoming
//dispatch requests
Thread dispatcher = new Thread(new Runnable() {
public void run() {
while (true) {
try {
pendingMessages.take().execute();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
}
});
dispatcher.setDaemon(true); // do not prevent the JVM from exiting
dispatchers.add(dispatcher);
dispatcher.start();
}
}
// this method enqueues a message delivery request
protected MessagePublication addAsynchronousDeliveryRequest(MessagePublication request) {
try {
pendingMessages.put(request);
return request.markScheduled();
} catch (InterruptedException e) {
return request.setError();
}
}
// this method enqueues a message delivery request
protected MessagePublication addAsynchronousDeliveryRequest(MessagePublication request, long timeout, TimeUnit unit) {
try {
return pendingMessages.offer(request, timeout, unit)
? request.markScheduled()
: request.setError();
} catch (InterruptedException e) {
return request.setError();
}
}
@Override
protected void finalize() throws Throwable {
shutdown();
super.finalize();
}
public void shutdown() {
for (Thread dispatcher : dispatchers) {
dispatcher.interrupt();
}
executor.shutdown();
}
public boolean hasPendingMessages() {
return pendingMessages.size() > 0;
}
@Override
public Executor getExecutor() {
return executor;
}
}

View File

@ -9,22 +9,8 @@ import net.engio.mbassy.subscription.Subscription;
import net.engio.mbassy.subscription.SubscriptionContext;
import net.engio.mbassy.subscription.SubscriptionFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.*;
import java.util.concurrent.*;
/**
* The base class for all message bus implementations.
@ -32,10 +18,8 @@ import java.util.concurrent.TimeUnit;
* @param <T>
* @param <P>
*/
public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand> implements IMessageBus<T, P> {
public abstract class AbstractSyncMessageBus<T, P extends ISyncMessageBus.ISyncPostCommand> implements ISyncMessageBus<T, P> {
// executor for asynchronous listeners using unbound queuing strategy to ensure that no events get lost
private final ExecutorService executor;
// the metadata reader that is used to parse objects passed to the subscribe method
private final MetadataReader metadataReader;
@ -58,12 +42,6 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
// this handler will receive all errors that occur during message dispatch or message handling
private final List<IPublicationErrorHandler> errorHandlers = new CopyOnWriteArrayList<IPublicationErrorHandler>();
// all threads that are available for asynchronous message dispatching
private final List<Thread> dispatchers = new CopyOnWriteArrayList<Thread>();
// all pending messages scheduled for asynchronous dispatch are queued here
private final BlockingQueue<MessagePublication> pendingMessages;
// this factory is used to create specialized subscriptions based on the given message handler configuration
// it can be customized by implementing the getSubscriptionFactory() method
private final SubscriptionFactory subscriptionFactory;
@ -71,40 +49,13 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
private final MessagePublication.Factory publicationFactory;
public AbstractMessageBus(BusConfiguration configuration) {
this.executor = configuration.getExecutor();
public AbstractSyncMessageBus(SyncBusConfiguration configuration) {
subscriptionFactory = configuration.getSubscriptionFactory();
this.metadataReader = configuration.getMetadataReader();
this.publicationFactory = configuration.getMessagePublicationFactory();
pendingMessages = new LinkedBlockingQueue<MessagePublication>(configuration.getMaximumNumberOfPendingMessages());
initDispatcherThreads(configuration.getNumberOfMessageDispatchers());
addErrorHandler(new IPublicationErrorHandler.ConsoleLogger());
}
// initialize the dispatch workers
private void initDispatcherThreads(int numberOfThreads) {
for (int i = 0; i < numberOfThreads; i++) {
// each thread will run forever and process incoming
//dispatch requests
Thread dispatcher = new Thread(new Runnable() {
public void run() {
while (true) {
try {
pendingMessages.take().execute();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
}
});
dispatcher.setDaemon(true); // do not prevent the JVM from exiting
dispatchers.add(dispatcher);
dispatcher.start();
}
}
protected MessagePublication.Factory getPublicationFactory() {
return publicationFactory;
}
@ -179,26 +130,7 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
errorHandlers.add(handler);
}
// this method enqueues a message delivery request
protected MessagePublication addAsynchronousDeliveryRequest(MessagePublication request) {
try {
pendingMessages.put(request);
return request.markScheduled();
} catch (InterruptedException e) {
return request.setError();
}
}
// this method enqueues a message delivery request
protected MessagePublication addAsynchronousDeliveryRequest(MessagePublication request, long timeout, TimeUnit unit) {
try {
return pendingMessages.offer(request, timeout, unit)
? request.markScheduled()
: request.setError();
} catch (InterruptedException e) {
return request.setError();
}
}
// obtain the set of subscriptions for the given message type
// Note: never returns null!
@ -241,26 +173,4 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
}
}
@Override
protected void finalize() throws Throwable {
shutdown();
super.finalize();
}
private void shutdown() {
for (Thread dispatcher : dispatchers) {
dispatcher.interrupt();
}
executor.shutdown();
}
public boolean hasPendingMessages() {
return pendingMessages.size() > 0;
}
@Override
public Executor getExecutor() {
return executor;
}
}

View File

@ -1,14 +1,6 @@
package net.engio.mbassy.bus;
import net.engio.mbassy.listener.MetadataReader;
import net.engio.mbassy.subscription.SubscriptionFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
/**
* The bus configuration holds various parameters that can be used to customize the bus' runtime behaviour.
@ -16,7 +8,7 @@ import java.util.concurrent.TimeUnit;
* @author bennidi
* Date: 12/8/12
*/
public class BusConfiguration {
public class BusConfiguration extends SyncBusConfiguration<BusConfiguration> {
private static final ThreadFactory DaemonThreadFactory = new ThreadFactory() {
@Override
@ -37,36 +29,11 @@ public class BusConfiguration {
private int maximumNumberOfPendingMessages;
private SubscriptionFactory subscriptionFactory;
private MetadataReader metadataReader;
private MessagePublication.Factory messagePublicationFactory;
public BusConfiguration() {
super();
this.numberOfMessageDispatchers = 2;
this.maximumNumberOfPendingMessages = Integer.MAX_VALUE;
this.subscriptionFactory = new SubscriptionFactory();
this.executor = new ThreadPoolExecutor(10, 10, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), DaemonThreadFactory);
this.metadataReader = new MetadataReader();
this.messagePublicationFactory = new MessagePublication.Factory();
}
public MessagePublication.Factory getMessagePublicationFactory() {
return messagePublicationFactory;
}
public void setMessagePublicationFactory(MessagePublication.Factory messagePublicationFactory) {
this.messagePublicationFactory = messagePublicationFactory;
}
public MetadataReader getMetadataReader() {
return metadataReader;
}
public BusConfiguration setMetadataReader(MetadataReader metadataReader) {
this.metadataReader = metadataReader;
return this;
}
public int getNumberOfMessageDispatchers() {
@ -78,6 +45,10 @@ public class BusConfiguration {
return this;
}
/**
* By default an unbound queuing strategy is used to ensure that no events get lost
* @return
*/
public ExecutorService getExecutor() {
return executor;
}
@ -98,12 +69,4 @@ public class BusConfiguration {
return this;
}
public SubscriptionFactory getSubscriptionFactory() {
return subscriptionFactory;
}
public BusConfiguration setSubscriptionFactory(SubscriptionFactory subscriptionFactory) {
this.subscriptionFactory = subscriptionFactory;
return this;
}
}

View File

@ -1,8 +1,5 @@
package net.engio.mbassy.bus;
import net.engio.mbassy.IPublicationErrorHandler;
import java.util.Collection;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
@ -47,56 +44,10 @@ import java.util.concurrent.TimeUnit;
* @Author bennidi
* Date: 2/8/12
*/
public interface IMessageBus<T, P extends IMessageBus.IPostCommand> {
public interface IMessageBus<T, P extends IMessageBus.IPostCommand> extends ISyncMessageBus<T,P> {
/**
* Subscribe all listeners of the given message to receive message publications.
* Any message may only be subscribed once (subsequent subscriptions of an already subscribed
* message will be silently ignored)
*
* @param listener
*/
void subscribe(Object listener);
/**
* Immediately remove all registered message handlers (if any) of the given listener. When this call returns all handlers
* have effectively been removed and will not receive any message publications (including asynchronously scheduled
* publications that have been published when the message listener was still subscribed).
* <p/>
* A call to this method passing null, an already unsubscribed listener or any object that does not define any message
* handlers will not have any effect and is silently ignored.
*
* @param listener
* @return true, if the listener was found and successfully removed
* false otherwise
*/
boolean unsubscribe(Object listener);
/**
* @param message
* @return
*/
P post(T message);
/**
* Publication errors may occur at various points of time during message delivery. A handler may throw an exception,
* may not be accessible due to security constraints or is not annotated properly.
* In any of all possible cases a publication error is created and passed to each of the registered error handlers.
* A call to this method will add the given error handler to the chain
*
* @param errorHandler
*/
void addErrorHandler(IPublicationErrorHandler errorHandler);
/**
* Returns an immutable collection containing all the registered error handlers
*
* @return
*/
Collection<IPublicationErrorHandler> getRegisteredErrorHandlers();
/**
* Get the executor service that is used to asynchronous message publication.
* Get the executor service that is used for asynchronous message publications.
* The executor is passed to the message bus at creation time.
*
* @return
@ -111,17 +62,20 @@ public interface IMessageBus<T, P extends IMessageBus.IPostCommand> {
boolean hasPendingMessages();
/**
* A post command is used as an intermediate object created by a call to the message bus' post method.
* It encapsulates the functionality provided by the message bus that created the command.
* Subclasses may extend this interface and add functionality, e.g. different dispatch schemes.
* Shutdown the bus such that it will stop delivering asynchronous messages. Executor service and
* other internally used threads will be shutdown gracefully. After calling shutdown it is not safe
* to further use the message bus.
*/
interface IPostCommand<T> {
void shutdown();
/**
* Execute the message publication immediately. This call blocks until every matching message handler
* has been invoked.
*/
void now();
/**
* @param message
* @return
*/
P post(T message);
interface IPostCommand extends ISyncPostCommand {
/**
* Execute the message publication asynchronously. The behaviour of this method depends on the
@ -146,4 +100,5 @@ public interface IMessageBus<T, P extends IMessageBus.IPostCommand> {
*/
MessagePublication asynchronously(long timeout, TimeUnit unit);
}
}

View File

@ -0,0 +1,77 @@
package net.engio.mbassy.bus;
import net.engio.mbassy.IPublicationErrorHandler;
import java.util.Collection;
/**
*
*
* @author bennidi
* Date: 3/29/13
*/
public interface ISyncMessageBus<T, P extends IMessageBus.ISyncPostCommand> {
/**
* Subscribe all listeners of the given message to receive message publications.
* Any message may only be subscribed once (subsequent subscriptions of an already subscribed
* message will be silently ignored)
*
* @param listener
*/
void subscribe(Object listener);
/**
* Immediately remove all registered message handlers (if any) of the given listener. When this call returns all handlers
* have effectively been removed and will not receive any message publications (including asynchronously scheduled
* publications that have been published when the message listener was still subscribed).
* <p/>
* A call to this method passing null, an already unsubscribed listener or any object that does not define any message
* handlers will not have any effect and is silently ignored.
*
* @param listener
* @return true, if the listener was found and successfully removed
* false otherwise
*/
boolean unsubscribe(Object listener);
/**
* @param message
* @return
*/
P post(T message);
/**
* Publication errors may occur at various points of time during message delivery. A handler may throw an exception,
* may not be accessible due to security constraints or is not annotated properly.
* In any of all possible cases a publication error is created and passed to each of the registered error handlers.
* A call to this method will add the given error handler to the chain
*
* @param errorHandler
*/
void addErrorHandler(IPublicationErrorHandler errorHandler);
/**
* Returns an immutable collection containing all the registered error handlers
*
* @return
*/
Collection<IPublicationErrorHandler> getRegisteredErrorHandlers();
/**
* A post command is used as an intermediate object created by a call to the message bus' post method.
* It encapsulates the functionality provided by the message bus that created the command.
* Subclasses may extend this interface and add functionality, e.g. different dispatch schemes.
*/
interface ISyncPostCommand {
/**
* Execute the message publication immediately. This call blocks until every matching message handler
* has been invoked.
*/
void now();
}
}

View File

@ -8,7 +8,7 @@ import java.util.Collection;
import java.util.concurrent.TimeUnit;
public class MBassador<T> extends AbstractMessageBus<T, SyncAsyncPostCommand<T>> {
public class MBassador<T> extends AbstractSyncAsyncMessageBus<T, SyncAsyncPostCommand<T>> {
public MBassador(BusConfiguration configuration) {
super(configuration);

View File

@ -0,0 +1,49 @@
package net.engio.mbassy.bus;
import net.engio.mbassy.listener.MetadataReader;
import net.engio.mbassy.subscription.SubscriptionFactory;
/**
* Todo: Add javadoc
*
* @author bennidi
* Date: 3/29/13
*/
public class SyncBusConfiguration<Config extends SyncBusConfiguration<Config>> {
protected SubscriptionFactory subscriptionFactory;
protected MetadataReader metadataReader;
protected MessagePublication.Factory messagePublicationFactory;
public SyncBusConfiguration() {
this.metadataReader = new MetadataReader();
this.subscriptionFactory = new SubscriptionFactory();
this.messagePublicationFactory = new MessagePublication.Factory();
}
public MessagePublication.Factory getMessagePublicationFactory() {
return messagePublicationFactory;
}
public void setMessagePublicationFactory(MessagePublication.Factory messagePublicationFactory) {
this.messagePublicationFactory = messagePublicationFactory;
}
public MetadataReader getMetadataReader() {
return metadataReader;
}
public Config setMetadataReader(MetadataReader metadataReader) {
this.metadataReader = metadataReader;
return (Config)this;
}
public SubscriptionFactory getSubscriptionFactory() {
return subscriptionFactory;
}
public Config setSubscriptionFactory(SubscriptionFactory subscriptionFactory) {
this.subscriptionFactory = subscriptionFactory;
return (Config)this;
}
}

View File

@ -0,0 +1,134 @@
package net.engio.mbassy.common;
import java.util.Map;
/**
* This data structure is optimized for non-blocking reads even when write operations occur.
* Running read iterators will not be affected by add operations since writes always insert at the head of the
* structure. Remove operations can affect any running iterator such that a removed element that has not yet
* been reached by the iterator will not appear in that iterator anymore.
*
* @author bennidi
* Date: 2/12/12
*/
public abstract class AbstractConcurrentSet<T> implements IConcurrentSet<T> {
// Internal state
private final Object lock = new Object();
private final Map<T, ISetEntry<T>> entries; // maintain a map of entries for O(log n) lookup
protected Entry<T> head; // reference to the first element
protected AbstractConcurrentSet(Map<T, ISetEntry<T>> entries) {
this.entries = entries;
}
protected abstract Entry<T> createEntry(T value, Entry<T> next);
@Override
public IConcurrentSet<T> add(T element) {
if (element == null || entries.containsKey(element)) {
return this;
}
synchronized (lock) {
insert(element);
}
return this;
}
@Override
public boolean contains(T element) {
ISetEntry<T> entry = entries.get(element);
return entry != null && entry.getValue() != null;
}
private void insert(T element) {
if (entries.containsKey(element)) {
return;
}
head = createEntry(element, head);
entries.put(element, head);
}
@Override
public int size() {
return entries.size();
}
@Override
public IConcurrentSet<T> addAll(Iterable<T> elements) {
synchronized (lock) {
for (T element : elements) {
if (element == null || entries.containsKey(element)) {
return this;
}
insert(element);
}
}
return this;
}
@Override
public boolean remove(T element) {
if (!entries.containsKey(element)) {
return false;
}
synchronized (lock) {
ISetEntry<T> listelement = entries.get(element);
if (listelement == null) {
return false; //removed by other thread
}
if (listelement != head) {
listelement.remove();
} else {
ISetEntry<T> oldHead = head;
head = head.next();
oldHead.clear(); // optimize for GC
}
entries.remove(element);
}
return true;
}
public abstract static class Entry<T> implements ISetEntry<T> {
private Entry<T> next;
private Entry<T> predecessor;
protected Entry(Entry<T> next) {
this.next = next;
next.predecessor = this;
}
protected Entry() {
}
// not thread-safe! must be synchronized in enclosing context
@Override
public void remove() {
if (predecessor != null) {
predecessor.next = next;
if (next != null) {
next.predecessor = predecessor;
}
} else if (next != null) {
next.predecessor = null;
}
next = null;
predecessor = null;
}
@Override
public Entry<T> next() {
return next;
}
@Override
public void clear() {
next = null;
}
}
}

View File

@ -1,183 +0,0 @@
package net.engio.mbassy.common;
import java.lang.ref.WeakReference;
import java.util.Iterator;
import java.util.WeakHashMap;
/**
* This data structure is optimized for non-blocking reads even when write operations occur.
* Running read iterators will not be affected by add operations since writes always insert at the head of the
* structure. Remove operations can affect any running iterator such that a removed element that has not yet
* been reached by the iterator will not appear in that iterator anymore.
* <p/>
* The structure uses weak references to the elements. Iterators automatically perform cleanups of
* garbage collected objects during iteration -> no dedicated maintenance operations need to be called or run in background.
* <p/>
* <p/>
* <p/>
*
* @author bennidi
* Date: 2/12/12
*/
public class ConcurrentSet<T> implements Iterable<T> {
// Internal state
private final Object lock = new Object();
private WeakHashMap<T, Entry<T>> entries = new WeakHashMap<T, Entry<T>>(); // maintain a map of entries for O(log n) lookup
private Entry<T> head; // reference to the first element
public ConcurrentSet<T> add(T element) {
if (element == null || entries.containsKey(element)) {
return this;
}
synchronized (lock) {
insert(element);
}
return this;
}
public boolean contains(T element) {
Entry<T> entry = entries.get(element);
return entry != null && entry.getValue() != null;
}
private void insert(T element) {
if (entries.containsKey(element)) {
return;
}
if (head == null) {
head = new Entry<T>(element);
} else {
head = new Entry<T>(element, head);
}
entries.put(element, head);
}
public int size() {
return entries.size();
}
public ConcurrentSet<T> addAll(Iterable<T> elements) {
synchronized (lock) {
for (T element : elements) {
if (element == null || entries.containsKey(element)) {
return this;
}
insert(element);
}
}
return this;
}
public boolean remove(T element) {
if (!entries.containsKey(element)) {
return false;
}
synchronized (lock) {
Entry<T> listelement = entries.get(element);
if (listelement == null) {
return false; //removed by other thread
}
if (listelement != head) {
listelement.remove();
} else {
Entry<T> oldHead = head;
head = head.next();
oldHead.next = null; // optimize for GC
}
entries.remove(element);
}
return true;
}
public Iterator<T> iterator() {
return new Iterator<T>() {
private Entry<T> current = head;
public boolean hasNext() {
if (current == null) return false;
if (current.getValue() == null) { // auto-removal of orphan references
do {
remove();
} while(current != null && current.getValue() == null);
return hasNext();
} else {
return true;
}
}
public T next() {
if (current == null) {
return null;
}
T value = current.getValue();
if (value == null) { // auto-removal of orphan references
do {
remove();
} while(current != null && current.getValue() == null);
return next();
} else {
current = current.next();
return value;
}
}
public void remove() {
if (current == null) {
return;
}
Entry<T> newCurrent = current.next();
ConcurrentSet.this.remove(current.getValue());
current = newCurrent;
}
};
}
public class Entry<T> {
private WeakReference<T> value;
private Entry<T> next;
private Entry<T> predecessor;
private Entry(T value) {
this.value = new WeakReference<T>(value);
}
private Entry(T value, Entry<T> next) {
this(value);
this.next = next;
next.predecessor = this;
}
public T getValue() {
return value.get();
}
// not thread-safe! must be synchronized in enclosing context
public void remove() {
if (predecessor != null) {
predecessor.next = next;
if (next != null) {
next.predecessor = predecessor;
}
} else if (next != null) {
next.predecessor = null;
}
next = null;
predecessor = null;
}
public Entry<T> next() {
return next;
}
}
}

View File

@ -0,0 +1,20 @@
package net.engio.mbassy.common;
/**
* Todo: Add javadoc
*
* @author bennidi
* Date: 3/29/13
*/
public interface IConcurrentSet<T> extends Iterable<T> {
IConcurrentSet<T> add(T element);
boolean contains(T element);
int size();
IConcurrentSet<T> addAll(Iterable<T> elements);
boolean remove(T element);
}

View File

@ -0,0 +1,19 @@
package net.engio.mbassy.common;
/**
* Todo: Add javadoc
*
* @author bennidi
* Date: 3/29/13
*/
public interface ISetEntry<T> {
T getValue();
// not thread-safe! must be synchronized in enclosing context
void remove();
ISetEntry<T> next();
void clear();
}

View File

@ -0,0 +1,84 @@
package net.engio.mbassy.common;
import java.util.HashMap;
import java.util.Iterator;
/**
* This implementation uses weak references to the elements. Iterators automatically perform cleanups of
* garbage collected objects during iteration -> no dedicated maintenance operations need to be called or run in background.
* <p/>
* <p/>
* <p/>
*
* @author bennidi
* Date: 2/12/12
*/
public class StrongConcurrentSet<T> extends AbstractConcurrentSet<T>{
public StrongConcurrentSet() {
super(new HashMap<T, ISetEntry<T>>());
}
public Iterator<T> iterator() {
return new Iterator<T>() {
private ISetEntry<T> current = head;
public boolean hasNext() {
return current != null;
}
public T next() {
if (current == null) {
return null;
}
else {
T value = current.getValue();
current = current.next();
return value;
}
}
public void remove() {
if (current == null) {
return;
}
ISetEntry<T> newCurrent = current.next();
StrongConcurrentSet.this.remove(current.getValue());
current = newCurrent;
}
};
}
@Override
protected Entry<T> createEntry(T value, Entry<T> next) {
return next != null ? new StrongEntry<T>(value, next) : new StrongEntry<T>(value);
}
public static class StrongEntry<T> extends Entry<T> {
private T value;
private StrongEntry(T value, Entry<T> next) {
super(next);
this.value = value;
}
private StrongEntry(T value) {
super();
this.value = value;
}
@Override
public T getValue() {
return value;
}
}
}

View File

@ -0,0 +1,98 @@
package net.engio.mbassy.common;
import java.lang.ref.WeakReference;
import java.util.Iterator;
import java.util.WeakHashMap;
/**
* This implementation uses weak references to the elements. Iterators automatically perform cleanups of
* garbage collected objects during iteration -> no dedicated maintenance operations need to be called or run in background.
* <p/>
* <p/>
* <p/>
*
* @author bennidi
* Date: 2/12/12
*/
public class WeakConcurrentSet<T> extends AbstractConcurrentSet<T>{
public WeakConcurrentSet() {
super(new WeakHashMap<T, ISetEntry<T>>());
}
public Iterator<T> iterator() {
return new Iterator<T>() {
private ISetEntry<T> current = head;
public boolean hasNext() {
if (current == null) return false;
if (current.getValue() == null) { // auto-removal of orphan references
do {
remove();
} while(current != null && current.getValue() == null);
return hasNext();
} else {
return true;
}
}
public T next() {
if (current == null) {
return null;
}
T value = current.getValue();
if (value == null) { // auto-removal of orphan references
do {
remove();
} while(current != null && current.getValue() == null);
return next();
} else {
current = current.next();
return value;
}
}
public void remove() {
if (current == null) {
return;
}
ISetEntry<T> newCurrent = current.next();
WeakConcurrentSet.this.remove(current.getValue());
current = newCurrent;
}
};
}
@Override
protected Entry<T> createEntry(T value, Entry<T> next) {
return next != null ? new WeakEntry<T>(value, next) : new WeakEntry<T>(value);
}
public static class WeakEntry<T> extends Entry<T> {
private WeakReference<T> value;
private WeakEntry(T value, Entry<T> next) {
super(next);
this.value = new WeakReference<T>(value);
}
private WeakEntry(T value) {
super();
this.value = new WeakReference<T>(value);
}
@Override
public T getValue() {
return value.get();
}
}
}

View File

@ -1,5 +1,6 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.bus.IMessageBus;
import net.engio.mbassy.subscription.AbstractSubscriptionContextAware;
/**
@ -8,7 +9,7 @@ import net.engio.mbassy.subscription.AbstractSubscriptionContextAware;
* @author bennidi
* Date: 11/23/12
*/
public class AsynchronousHandlerInvocation extends AbstractSubscriptionContextAware implements IHandlerInvocation {
public class AsynchronousHandlerInvocation extends AbstractSubscriptionContextAware<IMessageBus> implements IHandlerInvocation<Object,Object,IMessageBus> {
private IHandlerInvocation delegate;

View File

@ -11,7 +11,7 @@ import net.engio.mbassy.subscription.AbstractSubscriptionContextAware;
*/
public abstract class DelegatingMessageDispatcher extends AbstractSubscriptionContextAware implements IMessageDispatcher {
private IMessageDispatcher delegate;
private final IMessageDispatcher delegate;
public DelegatingMessageDispatcher(IMessageDispatcher delegate) {

View File

@ -1,7 +1,7 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.bus.MessagePublication;
import net.engio.mbassy.common.ConcurrentSet;
import net.engio.mbassy.common.IConcurrentSet;
import net.engio.mbassy.subscription.MessageEnvelope;
/**
@ -21,7 +21,7 @@ public class EnvelopedMessageDispatcher extends DelegatingMessageDispatcher {
}
@Override
public void dispatch(MessagePublication publication, Object message, ConcurrentSet listeners) {
public void dispatch(MessagePublication publication, Object message, IConcurrentSet listeners) {
getDelegate().dispatch(publication, new MessageEnvelope(message), listeners);
}
}

View File

@ -1,7 +1,7 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.bus.MessagePublication;
import net.engio.mbassy.common.ConcurrentSet;
import net.engio.mbassy.common.IConcurrentSet;
import net.engio.mbassy.listener.IMessageFilter;
/**
@ -37,7 +37,7 @@ public class FilteredMessageDispatcher extends DelegatingMessageDispatcher {
@Override
public void dispatch(MessagePublication publication, Object message, ConcurrentSet listeners) {
public void dispatch(MessagePublication publication, Object message, IConcurrentSet listeners) {
if (passesFilter(message)) {
getDelegate().dispatch(publication, message, listeners);
}

View File

@ -0,0 +1,19 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.bus.ISyncMessageBus;
import net.engio.mbassy.subscription.AbstractSubscriptionContextAware;
import net.engio.mbassy.subscription.SubscriptionContext;
/**
* Todo: Add javadoc
*
* @author bennidi
* Date: 3/29/13
*/
public abstract class HandlerInvocation<Listener, Message> extends AbstractSubscriptionContextAware<ISyncMessageBus> implements IHandlerInvocation<Listener, Message,ISyncMessageBus>{
public HandlerInvocation(SubscriptionContext context) {
super(context);
}
}

View File

@ -1,5 +1,7 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.bus.ISyncMessageBus;
/**
* A handler invocation encapsulates the logic that is used to invoke a single
* message handler to process a given message.
@ -9,7 +11,7 @@ package net.engio.mbassy.dispatch;
* @author bennidi
* Date: 11/23/12
*/
public interface IHandlerInvocation extends ISubscriptionContextAware {
public interface IHandlerInvocation<Listener, Message, Bus extends ISyncMessageBus> extends ISubscriptionContextAware<Bus> {
/**
* Invoke the message delivery logic of this handler
@ -17,5 +19,5 @@ public interface IHandlerInvocation extends ISubscriptionContextAware {
* @param listener The listener that will receive the message
* @param message The message to be delivered to the listener
*/
void invoke(Object listener, Object message);
void invoke(Listener listener, Message message);
}

View File

@ -1,6 +1,6 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.bus.IMessageBus;
import net.engio.mbassy.bus.ISyncMessageBus;
/**
* This interface marks components that have access to the message bus that they belong to.
@ -8,7 +8,7 @@ import net.engio.mbassy.bus.IMessageBus;
* @author bennidi
* Date: 3/1/13
*/
public interface IMessageBusAware {
public interface IMessageBusAware<Bus extends ISyncMessageBus> {
IMessageBus getBus();
Bus getBus();
}

View File

@ -1,7 +1,7 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.bus.MessagePublication;
import net.engio.mbassy.common.ConcurrentSet;
import net.engio.mbassy.common.IConcurrentSet;
/**
* A message dispatcher provides the functionality to deliver a single message
@ -29,7 +29,7 @@ public interface IMessageDispatcher extends ISubscriptionContextAware {
* @param message The message that should be delivered to the listeners
* @param listeners The listeners that should receive the message
*/
void dispatch(MessagePublication publication, Object message, ConcurrentSet listeners);
void dispatch(MessagePublication publication, Object message, IConcurrentSet listeners);
/**
* Get the handler invocation that will be used to deliver the

View File

@ -1,5 +1,6 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.bus.ISyncMessageBus;
import net.engio.mbassy.subscription.SubscriptionContext;
/**
@ -8,7 +9,7 @@ import net.engio.mbassy.subscription.SubscriptionContext;
* @author bennidi
* Date: 3/1/13
*/
public interface ISubscriptionContextAware extends IMessageBusAware {
public interface ISubscriptionContextAware<Bus extends ISyncMessageBus> extends IMessageBusAware<Bus> {
/**
* Get the subscription context associated with this object

View File

@ -1,7 +1,7 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.bus.MessagePublication;
import net.engio.mbassy.common.ConcurrentSet;
import net.engio.mbassy.common.IConcurrentSet;
import net.engio.mbassy.subscription.AbstractSubscriptionContextAware;
import net.engio.mbassy.subscription.SubscriptionContext;
@ -17,7 +17,7 @@ import net.engio.mbassy.subscription.SubscriptionContext;
*/
public class MessageDispatcher extends AbstractSubscriptionContextAware implements IMessageDispatcher {
private IHandlerInvocation invocation;
private final IHandlerInvocation invocation;
public MessageDispatcher(SubscriptionContext context, IHandlerInvocation invocation) {
super(context);
@ -25,7 +25,7 @@ public class MessageDispatcher extends AbstractSubscriptionContextAware implemen
}
@Override
public void dispatch(MessagePublication publication, Object message, ConcurrentSet listeners) {
public void dispatch(final MessagePublication publication, final Object message, final IConcurrentSet listeners) {
publication.markDelivered();
for (Object listener : listeners) {
getInvocation().invoke(listener, message);

View File

@ -2,7 +2,6 @@ package net.engio.mbassy.dispatch;
import net.engio.mbassy.IPublicationErrorHandler;
import net.engio.mbassy.PublicationError;
import net.engio.mbassy.subscription.AbstractSubscriptionContextAware;
import net.engio.mbassy.subscription.SubscriptionContext;
import java.lang.reflect.InvocationTargetException;
@ -15,7 +14,7 @@ import java.util.Collection;
* @author bennidi
* Date: 11/23/12
*/
public class ReflectiveHandlerInvocation extends AbstractSubscriptionContextAware implements IHandlerInvocation {
public class ReflectiveHandlerInvocation extends HandlerInvocation{
public ReflectiveHandlerInvocation(SubscriptionContext context) {
super(context);

View File

@ -0,0 +1,31 @@
package net.engio.mbassy.dispatch;
import net.engio.mbassy.bus.IMessageBus;
import net.engio.mbassy.subscription.AbstractSubscriptionContextAware;
/**
* Synchronizes message handler invocations for all handlers that specify @Synchronized
*
* @author bennidi
* Date: 3/31/13
*/
public class SynchronizedHandlerInvocation extends AbstractSubscriptionContextAware<IMessageBus> implements IHandlerInvocation<Object,Object,IMessageBus> {
private IHandlerInvocation delegate;
public SynchronizedHandlerInvocation(IHandlerInvocation delegate) {
super(delegate.getContext());
this.delegate = delegate;
}
/**
* {@inheritDoc}
*/
@Override
public void invoke(final Object listener, final Object message) {
synchronized (listener){
delegate.invoke(listener, message);
}
}
}

View File

@ -1,10 +1,9 @@
package net.engio.mbassy.listener;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import net.engio.mbassy.dispatch.HandlerInvocation;
import net.engio.mbassy.dispatch.ReflectiveHandlerInvocation;
import java.lang.annotation.*;
/**
* Mark any method of any object(=listener) as a message handler and configure the handler
@ -28,7 +27,7 @@ public @interface Handler {
* Define the mode in which a message is delivered to each listener. Listeners can be notified
* sequentially or concurrently.
*/
Mode delivery() default Mode.Sequential;
Invoke delivery() default Invoke.Synchronously;
/**
* Handlers are ordered by priority and handlers with higher priority are processed before
@ -51,4 +50,17 @@ public @interface Handler {
*/
boolean enabled() default true;
/**
* Each handler call is implemented as an invocation object that implements the invocation mechanism.
* The basic implementation uses reflection and is the default. It is possible though to provide a custom
* invocation to add additional logic.
*
* Note: Providing a custom invocation will most likely reduce performance, since the JIT-Compiler
* can not do some of its sophisticated byte code optimizations.
*
*/
Class<? extends HandlerInvocation> invocation() default ReflectiveHandlerInvocation.class;
}

View File

@ -0,0 +1,11 @@
package net.engio.mbassy.listener;
/**
* Created with IntelliJ IDEA.
*
* @author bennidi
* Date: 11/16/12
*/
public enum Invoke {
Synchronously, Asynchronously
}

View File

@ -0,0 +1,24 @@
package net.engio.mbassy.listener;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* @author bennidi
* Date: 3/29/13
*/
@Retention(value = RetentionPolicy.RUNTIME)
@Target(value = {ElementType.TYPE})
public @interface Listener {
/**
* By default, references to message listeners are weak to eliminate risks of memory leaks.
* It is possible to use strong references instead.
*
* @return
*/
References references() default References.Weak;
}

View File

@ -1,7 +1,8 @@
package net.engio.mbassy.listener;
import net.engio.mbassy.dispatch.HandlerInvocation;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
@ -11,36 +12,54 @@ import java.util.List;
*/
public class MessageHandlerMetadata {
private Method handler;
private final Method handler;
private IMessageFilter[] filter;
private final IMessageFilter[] filter;
private Handler handlerConfig;
private final Handler handlerConfig;
private boolean isAsynchronous = false;
private final boolean isAsynchronous;
private Enveloped envelope = null;
private final Enveloped envelope;
private List<Class<?>> handledMessages = new LinkedList<Class<?>>();
private final List<Class<?>> handledMessages = new LinkedList<Class<?>>();
private boolean acceptsSubtypes = true;
private final boolean acceptsSubtypes;
private final Listener listenerConfig;
private final boolean isSynchronized;
public MessageHandlerMetadata(Method handler, IMessageFilter[] filter, Handler handlerConfig) {
public MessageHandlerMetadata(Method handler, IMessageFilter[] filter, Handler handlerConfig, Listener listenerConfig) {
if(handler == null || handlerConfig == null){
throw new IllegalArgumentException("The message handler configuration may not be null");
}
this.handler = handler;
this.filter = filter;
this.handlerConfig = handlerConfig;
this.isAsynchronous = handlerConfig.delivery().equals(Mode.Concurrent);
this.isAsynchronous = handlerConfig.delivery().equals(Invoke.Asynchronously);
this.envelope = handler.getAnnotation(Enveloped.class);
this.acceptsSubtypes = !handlerConfig.rejectSubtypes();
this.listenerConfig = listenerConfig;
this.isSynchronized = handler.getAnnotation(Synchronized.class) != null;
if (this.envelope != null) {
Collections.addAll(handledMessages, envelope.messages());
for(Class messageType : envelope.messages()){
handledMessages.add(messageType);
}
} else {
handledMessages.add(handler.getParameterTypes()[0]);
}
this.handler.setAccessible(true);
}
public boolean isSynchronized(){
return isSynchronized;
}
public boolean useStrongReferences(){
return listenerConfig != null && listenerConfig.references().equals(References.Strong);
}
public boolean isAsynchronous() {
return isAsynchronous;
@ -70,6 +89,10 @@ public class MessageHandlerMetadata {
return envelope != null;
}
public Class<? extends HandlerInvocation> getHandlerInvocation(){
return handlerConfig.invocation();
}
public boolean handlesMessage(Class<?> messageType) {
for (Class<?> handledMessage : handledMessages) {
if (handledMessage.equals(messageType)) {

View File

@ -54,14 +54,12 @@ public class MetadataReader {
}
public MessageHandlerMetadata getHandlerMetadata(Method messageHandler) {
Handler config = messageHandler.getAnnotation(Handler.class);
return new MessageHandlerMetadata(messageHandler, getFilter(config), config);
}
// get all listeners defined by the given class (includes
// listeners defined in super classes)
public List<MessageHandlerMetadata> getMessageHandlers(Class<?> target) {
Listener listenerConfig = target.getAnnotation(Listener.class);
// get all handlers (this will include all (inherited) methods directly annotated using @Handler)
List<Method> allHandlers = ReflectionUtils.getMethods(AllMessageHandlers, target);
// retain only those that are at the bottom of their respective class hierarchy (deepest overriding method)
@ -72,19 +70,18 @@ public class MetadataReader {
}
}
List<MessageHandlerMetadata> filteredHandlers = new LinkedList<MessageHandlerMetadata>();
// for each handler there will be no overriding method that specifies @Handler annotation
// but an overriding method does inherit the listener configuration of the overwritten method
for (Method handler : bottomMostHandlers) {
Handler handle = handler.getAnnotation(Handler.class);
if (!handle.enabled() || !isValidMessageHandler(handler)) {
Handler handlerConfig = handler.getAnnotation(Handler.class);
if (!handlerConfig.enabled() || !isValidMessageHandler(handler)) {
continue; // disabled or invalid listeners are ignored
}
Method overriddenHandler = ReflectionUtils.getOverridingMethod(handler, target);
// if a handler is overwritten it inherits the configuration of its parent method
MessageHandlerMetadata handlerMetadata = new MessageHandlerMetadata(overriddenHandler == null ? handler : overriddenHandler,
getFilter(handle), handle);
getFilter(handlerConfig), handlerConfig, listenerConfig);
filteredHandlers.add(handlerMetadata);
}

View File

@ -1,13 +0,0 @@
package net.engio.mbassy.listener;
/**
* Created with IntelliJ IDEA.
*
* @author bennidi
* Date: 11/16/12
* Time: 10:01 AM
* To change this template use File | Settings | File Templates.
*/
public enum Mode {
Sequential, Concurrent
}

View File

@ -0,0 +1,10 @@
package net.engio.mbassy.listener;
/**
*
* @author bennidi
* Date: 3/29/13
*/
public enum References {
Strong,Weak
}

View File

@ -0,0 +1,16 @@
package net.engio.mbassy.listener;
import java.lang.annotation.*;
/**
* A handler marked with this annotation is guaranteed to be invoked in a thread-safe manner, that is, no
* other running message publication will be able to invoke this handler as long as it has not done its work.
*
* @author bennidi
* Date: 3/31/13
*/
@Retention(value = RetentionPolicy.RUNTIME)
@Inherited
@Target(value = {ElementType.METHOD})
public @interface Synchronized {
}

View File

@ -1,6 +1,6 @@
package net.engio.mbassy.subscription;
import net.engio.mbassy.bus.IMessageBus;
import net.engio.mbassy.bus.ISyncMessageBus;
import net.engio.mbassy.dispatch.ISubscriptionContextAware;
/**
@ -9,20 +9,20 @@ import net.engio.mbassy.dispatch.ISubscriptionContextAware;
* @author bennidi
* Date: 3/1/13
*/
public class AbstractSubscriptionContextAware implements ISubscriptionContextAware {
public class AbstractSubscriptionContextAware<Bus extends ISyncMessageBus> implements ISubscriptionContextAware<Bus> {
private SubscriptionContext context;
private final SubscriptionContext<Bus> context;
public AbstractSubscriptionContextAware(SubscriptionContext context) {
public AbstractSubscriptionContextAware(SubscriptionContext<Bus> context) {
this.context = context;
}
public SubscriptionContext getContext() {
public SubscriptionContext<Bus> getContext() {
return context;
}
@Override
public IMessageBus getBus() {
public Bus getBus() {
return context.getOwningBus();
}
}

View File

@ -1,7 +1,7 @@
package net.engio.mbassy.subscription;
import net.engio.mbassy.bus.MessagePublication;
import net.engio.mbassy.common.ConcurrentSet;
import net.engio.mbassy.common.IConcurrentSet;
import net.engio.mbassy.dispatch.IMessageDispatcher;
import java.util.Comparator;
@ -12,17 +12,18 @@ import java.util.UUID;
*/
public class Subscription {
private UUID id = UUID.randomUUID();
private final UUID id = UUID.randomUUID();
protected ConcurrentSet<Object> listeners = new ConcurrentSet<Object>();
protected final IConcurrentSet<Object> listeners;
private IMessageDispatcher dispatcher;
private final IMessageDispatcher dispatcher;
private SubscriptionContext context;
private final SubscriptionContext context;
public Subscription(SubscriptionContext context, IMessageDispatcher dispatcher) {
Subscription(SubscriptionContext context, IMessageDispatcher dispatcher, IConcurrentSet<Object> listeners) {
this.context = context;
this.dispatcher = dispatcher;
this.listeners = listeners;
}

View File

@ -1,6 +1,6 @@
package net.engio.mbassy.subscription;
import net.engio.mbassy.bus.IMessageBus;
import net.engio.mbassy.bus.ISyncMessageBus;
import net.engio.mbassy.listener.MessageHandlerMetadata;
/**
@ -12,13 +12,13 @@ import net.engio.mbassy.listener.MessageHandlerMetadata;
* @author bennidi
* Date: 11/23/12
*/
public class SubscriptionContext {
public class SubscriptionContext<Bus extends ISyncMessageBus> {
private IMessageBus owningBus;
private Bus owningBus;
private MessageHandlerMetadata handlerMetadata;
public SubscriptionContext(IMessageBus owningBus, MessageHandlerMetadata handlerMetadata) {
public SubscriptionContext(Bus owningBus, MessageHandlerMetadata handlerMetadata) {
this.owningBus = owningBus;
this.handlerMetadata = handlerMetadata;
}
@ -28,7 +28,7 @@ public class SubscriptionContext {
*
* @return
*/
public IMessageBus getOwningBus() {
public Bus getOwningBus() {
return owningBus;
}

Some files were not shown because too many files have changed in this diff Show More