WIP, cleaning code. Getting multi-arg versions working

This commit is contained in:
nathan 2015-02-17 22:43:48 +01:00
parent 40d8bb70bd
commit 8b69d53263
23 changed files with 968 additions and 2276 deletions

View File

@ -1,140 +0,0 @@
package net.engio.mbassy.multi;
import java.lang.reflect.Array;
import java.util.Collection;
import java.util.concurrent.locks.LockSupport;
import net.engio.mbassy.multi.common.DeadMessage;
import net.engio.mbassy.multi.common.TransferQueue;
import net.engio.mbassy.multi.error.ErrorHandlingSupport;
import net.engio.mbassy.multi.error.PublicationError;
import net.engio.mbassy.multi.subscription.Subscription;
import net.engio.mbassy.multi.subscription.SubscriptionManager;
/**
* @author dorkbox, llc
* Date: 2/2/15
*/
public class DispatchRunnable implements Runnable {
private ErrorHandlingSupport errorHandler;
private TransferQueue<Object> dispatchQueue;
private TransferQueue<SubRunnable> invokeQueue;
private SubscriptionManager manager;
public DispatchRunnable(ErrorHandlingSupport errorHandler, SubscriptionManager subscriptionManager,
TransferQueue<Object> dispatchQueue, TransferQueue<SubRunnable> invokeQueue) {
this.errorHandler = errorHandler;
this.manager = subscriptionManager;
this.dispatchQueue = dispatchQueue;
this.invokeQueue = invokeQueue;
}
@Override
public void run() {
final SubscriptionManager manager = this.manager;
final ErrorHandlingSupport errorHandler = this.errorHandler;
final TransferQueue<Object> IN_queue = this.dispatchQueue;
final TransferQueue<SubRunnable> OUT_queue = this.invokeQueue;
final Runnable dummyRunnable = new Runnable() {
@Override
public void run() {
}
};
Object message = null;
int counter;
while (true) {
try {
counter = 200;
while ((message = IN_queue.poll()) == null) {
if (counter > 0) {
--counter;
LockSupport.parkNanos(1L);
} else {
message = IN_queue.take();
break;
}
}
@SuppressWarnings("null")
Class<?> messageClass = message.getClass();
manager.readLock();
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass);
boolean empty = subscriptions.isEmpty();
Collection<Subscription> deadSubscriptions = null;
if (empty) {
// Dead Event. must EXACTLY MATCH (no subclasses or varargs)
deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
}
Collection<Class<?>> superClasses = manager.getSuperClasses(messageClass);
Collection<Subscription> varArgs = manager.getVarArgs(messageClass);
manager.readUnLock();
if (!empty) {
for (Subscription sub : subscriptions) {
sub.publishToSubscriptionSingle(OUT_queue, errorHandler, message);
}
// OUT_queue.put(new InvokeRunnable(errorHandler, subscriptions, message));
} else if (deadSubscriptions != null) {
if (!deadSubscriptions.isEmpty()) {
DeadMessage deadMessage = new DeadMessage(message);
for (Subscription sub : deadSubscriptions) {
sub.publishToSubscriptionSingle(OUT_queue, errorHandler, deadMessage);
}
// OUT_queue.put(new InvokeRunnable(errorHandler, deadSubscriptions, deadMessage));
}
}
// now get superClasses
for (Class<?> superClass : superClasses) {
subscriptions = manager.getSubscriptionsByMessageType(superClass);
if (!subscriptions.isEmpty()) {
for (Subscription sub : subscriptions) {
sub.publishToSubscriptionSingle(OUT_queue, errorHandler, message);
}
// OUT_queue.put(new InvokeRunnable(errorHandler, subscriptions, message));
}
}
// now get varargs
if (!varArgs.isEmpty()) {
// messy, but the ONLY way to do it.
Object[] vararg = (Object[]) Array.newInstance(message.getClass(), 1);
vararg[0] = message;
Object[] newInstance = new Object[1];
newInstance[0] = vararg;
vararg = newInstance;
for (Subscription sub : varArgs) {
sub.publishToSubscription(OUT_queue, errorHandler, vararg);
}
// OUT_queue.put(new InvokeRunnable(errorHandler, varArgs, vararg));
}
// make sure it's synced at this point
// OUT_queue.transfer(dummyRunnable);
} catch (InterruptedException e) {
return;
} catch (Throwable e) {
errorHandler.handlePublicationError(new PublicationError().setMessage("Error during publication of message").setCause(e).setPublishedObject(message));
}
}
}
}

View File

@ -1,54 +0,0 @@
package net.engio.mbassy.multi;
import java.lang.reflect.Array;
import java.util.Collection;
import net.engio.mbassy.multi.error.ErrorHandlingSupport;
import net.engio.mbassy.multi.subscription.Subscription;
/**
* @author dorkbox, llc Date: 2/2/15
*/
public class InvokeRunnable implements Runnable {
private ErrorHandlingSupport errorHandler;
private Collection<Subscription> subscriptions;
private Object message;
public InvokeRunnable(ErrorHandlingSupport errorHandler, Collection<Subscription> subscriptions, Object message) {
this.errorHandler = errorHandler;
this.subscriptions = subscriptions;
this.message = message;
}
@Override
public void run() {
ErrorHandlingSupport errorHandler = this.errorHandler;
Collection<Subscription> subs = this.subscriptions;
Object message = this.message;
Object[] vararg = null;
for (Subscription sub : subs) {
boolean handled = false;
if (sub.isVarArg()) {
// messageClass will NEVER be an array to begin with, since that will call the multi-arg method
if (vararg == null) {
// messy, but the ONLY way to do it.
vararg = (Object[]) Array.newInstance(message.getClass(), 1);
vararg[0] = message;
Object[] newInstance = new Object[1];
newInstance[0] = vararg;
vararg = newInstance;
}
handled = true;
sub.publishToSubscription(errorHandler, vararg);
}
if (!handled) {
sub.publishToSubscription(errorHandler, message);
}
}
}
}

View File

@ -8,7 +8,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import net.engio.mbassy.multi.common.DeadMessage;
import net.engio.mbassy.multi.common.DisruptorThreadFactory;
import net.engio.mbassy.multi.common.NamedThreadFactory;
import net.engio.mbassy.multi.common.LinkedTransferQueue;
import net.engio.mbassy.multi.common.TransferQueue;
import net.engio.mbassy.multi.error.IPublicationErrorHandler;
@ -29,9 +29,8 @@ public class MultiMBassador implements IMessageBus {
// this handler will receive all errors that occur during message dispatch or message handling
private final List<IPublicationErrorHandler> errorHandlers = new ArrayList<IPublicationErrorHandler>();
private final SubscriptionManager subscriptionManager;
private final TransferQueue<Runnable> dispatchQueue;
private final SubscriptionManager subscriptionManager = new SubscriptionManager();
private final TransferQueue<Runnable> dispatchQueue = new LinkedTransferQueue<Runnable>();
// all threads that are available for asynchronous message dispatching
@ -44,22 +43,16 @@ public class MultiMBassador implements IMessageBus {
public MultiMBassador(int numberOfThreads) {
if (numberOfThreads < 1) {
numberOfThreads = 1; // at LEAST 1 threads
numberOfThreads = 1; // at LEAST 1 thread
}
this.threads = new ArrayList<Thread>(numberOfThreads);
this.subscriptionManager = new SubscriptionManager();
this.dispatchQueue = new LinkedTransferQueue<Runnable>();
int dispatchSize = numberOfThreads;
this.threads = new ArrayList<Thread>();
DisruptorThreadFactory dispatchThreadFactory = new DisruptorThreadFactory("MB_Dispatch");
for (int i = 0; i < dispatchSize; i++) {
NamedThreadFactory dispatchThreadFactory = new NamedThreadFactory("MessageBus");
for (int i = 0; i < numberOfThreads; i++) {
// each thread will run forever and process incoming message publication requests
Runnable runnable = new Runnable() {
@SuppressWarnings("null")
@Override
public void run() {
TransferQueue<Runnable> IN_QUEUE= MultiMBassador.this.dispatchQueue;
@ -131,29 +124,29 @@ public class MultiMBassador implements IMessageBus {
}
@SuppressWarnings("null")
@Override
public void publish(Object message) {
Class<?> messageClass = message.getClass();
SubscriptionManager manager = this.subscriptionManager;
// Collection<Subscription> subscriptions = subscriptionManager.getSubscriptionsByMessageType(messageClass);
Class<?> messageClass = message.getClass();
manager.readLock();
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass);
boolean validSubs = subscriptions != null && !subscriptions.isEmpty();
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass);
boolean empty = subscriptions.isEmpty();
Collection<Subscription> deadSubscriptions = null;
if (empty) {
// Dead Event. must EXACTLY MATCH (no subclasses or varargs)
deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
}
Collection<Class<?>> superClasses = manager.getSuperClasses(messageClass);
Collection<Subscription> varArgs = manager.getVarArgs(messageClass);
Collection<Subscription> deadSubscriptions = null;
if (!validSubs) {
// Dead Event. must EXACTLY MATCH (no subclasses or varargs)
deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
}
Collection<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass);
Collection<Subscription> varArgs = manager.getVarArgs(messageClass);
manager.readUnLock();
if (!empty) {
// Run subscriptions
if (validSubs) {
for (Subscription sub : subscriptions) {
// this catches all exception types
sub.publishToSubscription(this, message);
@ -165,260 +158,268 @@ public class MultiMBassador implements IMessageBus {
// this catches all exception types
sub.publishToSubscription(this, deadMessage);
}
// Dead Event. only matches EXACT handlers (no vararg, no subclasses)
return;
}
// now get superClasses
if (superSubscriptions != null) {
for (Subscription sub : superSubscriptions) {
// this catches all exception types
sub.publishToSubscription(this, message);
}
}
// now get varargs
if (varArgs != null && !varArgs.isEmpty()) {
// messy, but the ONLY way to do it.
Object[] vararg = null;
for (Subscription sub : varArgs) {
if (sub.isVarArg()) {
if (vararg == null) {
vararg = (Object[]) Array.newInstance(messageClass, 1);
vararg[0] = message;
// if (subscriptions.isEmpty()) {
// // Dead Event. only matches EXACT handlers (no vararg, no subclasses)
// subscriptions = this.subscriptionManager.getSubscriptionsByMessageType(DeadMessage.class);
//
// DeadMessage deadMessage = new DeadMessage(message);
// if (!subscriptions.isEmpty()) {
// for (Subscription sub : subscriptions) {
// // this catches all exception types
// sub.publishToSubscription(this, deadMessage);
// }
// }
// }
// else {
//// Object[] vararg = null;
// for (Subscription sub : subscriptions) {
// // this catches all exception types
// sub.publishToSubscription(this, message);
//
//// if (sub.isVarArg()) {
//// // messageClass will NEVER be an array to begin with, since that will call the multi-arg method
//// if (vararg == null) {
//// // messy, but the ONLY way to do it.
//// vararg = (Object[]) Array.newInstance(message.getClass(), 1);
//// vararg[0] = message;
////
//// Object[] newInstance = new Object[1];
//// newInstance[0] = vararg;
//// vararg = newInstance;
//// }
////
//// // this catches all exception types
//// sub.publishToSubscription(this, vararg);
//// continue;
//// }
// }
// }
Object[] newInstance = new Object[1];
newInstance[0] = vararg;
vararg = newInstance;
}
sub.publishToSubscription(this, vararg);
}
}
}
}
@SuppressWarnings("null")
@Override
public void publish(Object message1, Object message2) {
try {
Class<?> messageClass1 = message1.getClass();
Class<?> messageClass2 = message2.getClass();
SubscriptionManager manager = this.subscriptionManager;
SubscriptionManager manager = this.subscriptionManager;
Class<?> messageClass1 = message1.getClass();
Class<?> messageClass2 = message2.getClass();
manager.readLock();
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2);
boolean validSubs = subscriptions != null && !subscriptions.isEmpty();
if (subscriptions == null || subscriptions.isEmpty()) {
subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
DeadMessage deadMessage = new DeadMessage(message1, message2);
for (Subscription sub : subscriptions) {
sub.publishToSubscription(this, deadMessage);
}
Collection<Subscription> deadSubscriptions = null;
if (!validSubs) {
// Dead Event. must EXACTLY MATCH (no subclasses or varargs)
deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
}
else {
Object[] vararg = null;
for (Subscription sub : subscriptions) {
if (sub.isVarArg()) {
Class<?> class1 = message1.getClass();
Class<?> class2 = message2.getClass();
if (!class1.isArray() && class1 == class2) {
if (vararg == null) {
// messy, but the ONLY way to do it.
vararg = (Object[]) Array.newInstance(class1, 2);
vararg[0] = message1;
vararg[1] = message2;
Collection<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass1, messageClass2);
Collection<Subscription> varArgs = manager.getVarArgs(messageClass1, messageClass2);
manager.readUnLock();
Object[] newInstance = (Object[]) Array.newInstance(vararg.getClass(), 1);
newInstance[0] = vararg;
vararg = newInstance;
}
sub.publishToSubscription(this, vararg);
continue;
}
// Run subscriptions
if (validSubs) {
for (Subscription sub : subscriptions) {
// this catches all exception types
sub.publishToSubscription(this, message1, message2);
}
} else if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) {
DeadMessage deadMessage = new DeadMessage(message1, message2);
for (Subscription sub : deadSubscriptions) {
// this catches all exception types
sub.publishToSubscription(this, deadMessage);
}
// Dead Event. only matches EXACT handlers (no vararg, no subclasses)
return;
}
// now get superClasses
if (superSubscriptions != null) {
for (Subscription sub : superSubscriptions) {
// this catches all exception types
sub.publishToSubscription(this, message1, message2);
}
}
// now get varargs
if (varArgs != null && !varArgs.isEmpty()) {
// messy, but the ONLY way to do it.
Object[] vararg = null;
for (Subscription sub : varArgs) {
if (sub.isVarArg()) {
if (vararg == null) {
vararg = (Object[]) Array.newInstance(messageClass1, 2);
vararg[0] = message1;
vararg[1] = message2;
Object[] newInstance = new Object[1];
newInstance[0] = vararg;
vararg = newInstance;
}
sub.publishToSubscription(this, message1, message2);
sub.publishToSubscription(this, vararg);
}
}
} catch (Throwable e) {
handlePublicationError(new PublicationError()
.setMessage("Error during publication of message")
.setCause(e)
.setPublishedObject(message1, message2));
}
}
@SuppressWarnings("null")
@Override
public void publish(Object message1, Object message2, Object message3) {
// try {
// Class<?> messageClass1 = message1.getClass();
// Class<?> messageClass2 = message2.getClass();
// Class<?> messageClass3 = message3.getClass();
//
// SubscriptionManager manager = this.subscriptionManager;
// Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2, messageClass3);
//
// if (subscriptions == null || subscriptions.isEmpty()) {
// // Dead Event
// subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
// DeadMessage deadMessage = new DeadMessage(message1, message2, message3);
//
// for (Subscription sub : subscriptions) {
// sub.publishToSubscription(this, deadMessage);
// }
// } else {
// Object[] vararg = null;
//
// for (Subscription sub : subscriptions) {
// boolean handled = false;
// if (sub.isVarArg()) {
// Class<?> class1 = message1.getClass();
// Class<?> class2 = message2.getClass();
// Class<?> class3 = message3.getClass();
// if (!class1.isArray() && class1 == class2 && class2 == class3) {
// // messy, but the ONLY way to do it.
// if (vararg == null) {
// vararg = (Object[]) Array.newInstance(class1, 3);
// vararg[0] = message1;
// vararg[1] = message2;
// vararg[2] = message3;
//
// Object[] newInstance = (Object[]) Array.newInstance(vararg.getClass(), 1);
// newInstance[0] = vararg;
// vararg = newInstance;
// }
//
// handled = true;
// sub.publishToSubscription(this, vararg);
// }
// }
//
// if (!handled) {
// sub.publishToSubscription(this, message1, message2, message3);
// }
// }
//
// // if the message did not have any listener/handler accept it
// if (subscriptions.isEmpty()) {
// // cannot have DeadMessage published to this, so no extra check necessary
// // Dead Event
// subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
// DeadMessage deadMessage = new DeadMessage(message1, message2, message3);
//
// for (Subscription sub : subscriptions) {
// sub.publishToSubscription(this, deadMessage);
// }
//
// // cleanup
// deadMessage = null;
// }
// }
// } catch (Throwable e) {
// handlePublicationError(new PublicationError()
// .setMessage("Error during publication of message")
// .setCause(e)
// .setPublishedObject(message1, message2, message3));
// }
SubscriptionManager manager = this.subscriptionManager;
Class<?> messageClass1 = message1.getClass();
Class<?> messageClass2 = message2.getClass();
Class<?> messageClass3 = message3.getClass();
manager.readLock();
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2, messageClass3);
boolean validSubs = subscriptions != null && !subscriptions.isEmpty();
Collection<Subscription> deadSubscriptions = null;
if (!validSubs) {
// Dead Event. must EXACTLY MATCH (no subclasses or varargs)
deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
}
Collection<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass1, messageClass2, messageClass3);
Collection<Subscription> varArgs = manager.getVarArgs(messageClass1, messageClass2, messageClass3);
manager.readUnLock();
// Run subscriptions
if (validSubs) {
for (Subscription sub : subscriptions) {
// this catches all exception types
sub.publishToSubscription(this, message1, message2, message3);
}
} else if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) {
DeadMessage deadMessage = new DeadMessage(message1, message2, message3);
for (Subscription sub : deadSubscriptions) {
// this catches all exception types
sub.publishToSubscription(this, deadMessage);
}
// Dead Event. only matches EXACT handlers (no vararg, no subclasses)
return;
}
// now get superClasses
if (superSubscriptions != null) {
for (Subscription sub : superSubscriptions) {
// this catches all exception types
sub.publishToSubscription(this, message1, message2, message3);
}
}
// now get varargs
if (varArgs != null && !varArgs.isEmpty()) {
// messy, but the ONLY way to do it.
Object[] vararg = null;
for (Subscription sub : varArgs) {
if (sub.isVarArg()) {
if (vararg == null) {
vararg = (Object[]) Array.newInstance(messageClass1, 3);
vararg[0] = message1;
vararg[0] = message2;
vararg[0] = message3;
Object[] newInstance = new Object[1];
newInstance[0] = vararg;
vararg = newInstance;
}
sub.publishToSubscription(this, vararg);
}
}
}
}
@SuppressWarnings("null")
@Override
public void publish(Object... messages) {
// try {
// // cannot have DeadMessage published to this!
// int size = messages.length;
// boolean allSameType = true;
//
// Class<?>[] messageClasses = new Class[size];
// Class<?> first = null;
// if (size > 0) {
// first = messageClasses[0] = messages[0].getClass();
SubscriptionManager manager = this.subscriptionManager;
int size = messages.length;
boolean allSameType = true;
Class<?>[] messageClasses = new Class[size];
Class<?> first = null;
if (size > 0) {
first = messageClasses[0] = messages[0].getClass();
}
for (int i=1;i<size;i++) {
messageClasses[i] = messages[i].getClass();
if (first != messageClasses[i]) {
allSameType = false;
}
}
manager.readLock();
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClasses);
boolean validSubs = subscriptions != null && !subscriptions.isEmpty();
Collection<Subscription> deadSubscriptions = null;
if (!validSubs) {
// Dead Event. must EXACTLY MATCH (no subclasses or varargs)
deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
}
// we don't support super subscriptions for var-args
// Collection<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClasses);
Collection<Subscription> varArgs = null;
if (allSameType) {
varArgs = manager.getVarArgs(messageClasses);
}
manager.readUnLock();
// Run subscriptions
if (validSubs) {
for (Subscription sub : subscriptions) {
// this catches all exception types
sub.publishToSubscription(this, messages);
}
} else if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) {
DeadMessage deadMessage = new DeadMessage(messages);
for (Subscription sub : deadSubscriptions) {
// this catches all exception types
sub.publishToSubscription(this, deadMessage);
}
// Dead Event. only matches EXACT handlers (no vararg, no subclasses)
return;
}
// now get superClasses (not supported)
// if (superSubscriptions != null) {
// for (Subscription sub : superSubscriptions) {
// // this catches all exception types
// sub.publishToSubscription(this, message);
// }
//
// for (int i=1;i<size;i++) {
// messageClasses[i] = messages[i].getClass();
// if (first != messageClasses[i]) {
// allSameType = false;
// }
// }
//
// SubscriptionManager manager = this.subscriptionManager;
// Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClasses);
//
// if (subscriptions == null || subscriptions.isEmpty()) {
// // Dead Event
// subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
// DeadMessage deadMessage = new DeadMessage(messages);
//
// for (Subscription sub : subscriptions) {
// sub.publishToSubscription(this, deadMessage);
// }
// } else {
// Object[] vararg = null;
//
// for (Subscription sub : subscriptions) {
// boolean handled = false;
// if (first != null && allSameType && sub.isVarArg()) {
// if (vararg == null) {
// // messy, but the ONLY way to do it.
// vararg = (Object[]) Array.newInstance(first, size);
//
// for (int i=0;i<size;i++) {
// vararg[i] = messages[i];
// }
//
// Object[] newInstance = (Object[]) Array.newInstance(vararg.getClass(), 1);
// newInstance[0] = vararg;
// vararg = newInstance;
// }
//
// handled = true;
// sub.publishToSubscription(this, vararg);
// }
//
// if (!handled) {
// sub.publishToSubscription(this, messages);
// }
// }
//
// // if the message did not have any listener/handler accept it
// if (subscriptions.isEmpty()) {
// // cannot have DeadMessage published to this, so no extra check necessary
// // Dead Event
//
// subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
// DeadMessage deadMessage = new DeadMessage(messages);
//
// for (Subscription sub : subscriptions) {
// sub.publishToSubscription(this, deadMessage);
// }
// }
// }
// } catch (Throwable e) {
// handlePublicationError(new PublicationError()
// .setMessage("Error during publication of message")
// .setCause(e)
// .setPublishedObject(messages));
// }
// now get varargs
if (varArgs != null && !varArgs.isEmpty()) {
// messy, but the ONLY way to do it.
Object[] vararg = null;
for (Subscription sub : varArgs) {
if (sub.isVarArg()) {
if (vararg == null) {
vararg = (Object[]) Array.newInstance(first, size);
for (int i=0;i<size;i++) {
vararg[i] = messages[i];
}
Object[] newInstance = new Object[1];
newInstance[0] = vararg;
vararg = newInstance;
}
sub.publishToSubscription(this, vararg);
}
}
}
}
@Override
@ -431,247 +432,186 @@ public class MultiMBassador implements IMessageBus {
}
};
try {
this.dispatchQueue.transfer(runnable);
} catch (InterruptedException e) {
e.printStackTrace();
// log.error(e);
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
.setCause(e)
.setPublishedObject(message));
}
}
}
// faster if we can skip locking
// int counter = 200;
// while (!this.dispatchQueue.offer(message)) {
//// if (counter > 100) {
//// --counter;
//// Thread.yield();
//// } else
// if (counter > 0) {
// --counter;
// LockSupport.parkNanos(1L);
// } else {
try {
this.dispatchQueue.transfer(runnable);
return;
} catch (InterruptedException e) {
e.printStackTrace();
// log.error(e);
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
.setCause(e)
.setPublishedObject(message));
}
@Override
public void publishAsync(final Object message1, final Object message2) {
if (message1 != null && message2 != null) {
Runnable runnable = new Runnable() {
@Override
public void run() {
MultiMBassador.this.publish(message1, message2);
}
// }
// }
};
try {
this.dispatchQueue.transfer(runnable);
} catch (InterruptedException e) {
e.printStackTrace();
// log.error(e);
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
.setCause(e)
.setPublishedObject(message1, message2));
}
}
}
@Override
public void publishAsync(Object message1, Object message2) {
// // put this on the disruptor ring buffer
// final RingBuffer<MessageHolder> ringBuffer = this.ringBuffer;
//
// // setup the job
// final long seq = ringBuffer.next();
// try {
// MessageHolder eventJob = ringBuffer.get(seq);
// eventJob.messageType = MessageType.TWO;
// eventJob.message1 = message1;
// eventJob.message2 = message2;
// } catch (Exception e) {
// handlePublicationError(new PublicationError()
// .setMessage("Error while adding an asynchronous message")
// .setCause(e)
// .setPublishedObject(message1, message2));
// } finally {
// // always publish the job
// ringBuffer.publish(seq);
// }
public void publishAsync(final Object message1, final Object message2, final Object message3) {
if (message1 != null || message2 != null | message3 != null) {
Runnable runnable = new Runnable() {
@Override
public void run() {
MultiMBassador.this.publish(message1, message2, message3);
}
};
try {
this.dispatchQueue.transfer(runnable);
} catch (InterruptedException e) {
e.printStackTrace();
// log.error(e);
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
.setCause(e)
.setPublishedObject(message1, message2, message3));
}
}
}
@Override
public void publishAsync(Object message1, Object message2, Object message3) {
// // put this on the disruptor ring buffer
// final RingBuffer<MessageHolder> ringBuffer = this.ringBuffer;
//
// // setup the job
// final long seq = ringBuffer.next();
// try {
// MessageHolder eventJob = ringBuffer.get(seq);
// eventJob.messageType = MessageType.THREE;
// eventJob.message1 = message1;
// eventJob.message2 = message2;
// eventJob.message3 = message3;
// } catch (Exception e) {
// handlePublicationError(new PublicationError()
// .setMessage("Error while adding an asynchronous message")
// .setCause(e)
// .setPublishedObject(new Object[] {message1, message2, message3}));
// } finally {
// // always publish the job
// ringBuffer.publish(seq);
// }
public void publishAsync(final Object... messages) {
if (messages != null) {
Runnable runnable = new Runnable() {
@Override
public void run() {
MultiMBassador.this.publish(messages);
}
};
try {
this.dispatchQueue.transfer(runnable);
} catch (InterruptedException e) {
e.printStackTrace();
// log.error(e);
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
.setCause(e)
.setPublishedObject(messages));
}
}
}
@Override
public void publishAsync(Object... messages) {
// // put this on the disruptor ring buffer
// final RingBuffer<MessageHolder> ringBuffer = this.ringBuffer;
//
// // setup the job
// final long seq = ringBuffer.next();
// try {
// MessageHolder eventJob = ringBuffer.get(seq);
// eventJob.messageType = MessageType.ARRAY;
// eventJob.messages = messages;
// } catch (Exception e) {
// handlePublicationError(new PublicationError()
// .setMessage("Error while adding an asynchronous message")
// .setCause(e)
// .setPublishedObject(messages));
// } finally {
// // always publish the job
// ringBuffer.publish(seq);
// }
public void publishAsync(long timeout, TimeUnit unit, final Object message) {
if (message != null) {
Runnable runnable = new Runnable() {
@Override
public void run() {
MultiMBassador.this.publish(message);
}
};
try {
this.dispatchQueue.tryTransfer(runnable, timeout, unit);
} catch (InterruptedException e) {
e.printStackTrace();
// log.error(e);
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
.setCause(e)
.setPublishedObject(message));
}
}
}
@Override
public void publishAsync(long timeout, TimeUnit unit, final Object message1, final Object message2) {
if (message1 != null && message2 != null) {
Runnable runnable = new Runnable() {
@Override
public void run() {
MultiMBassador.this.publish(message1, message2);
}
};
try {
this.dispatchQueue.tryTransfer(runnable, timeout, unit);
} catch (InterruptedException e) {
e.printStackTrace();
// log.error(e);
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
.setCause(e)
.setPublishedObject(message1, message2));
}
}
}
@Override
public void publishAsync(long timeout, TimeUnit unit, final Object message1, final Object message2, final Object message3) {
if (message1 != null && message2 != null && message3 != null) {
Runnable runnable = new Runnable() {
@Override
public void run() {
MultiMBassador.this.publish(message1, message2, message3);
}
};
try {
this.dispatchQueue.tryTransfer(runnable, timeout, unit);
} catch (InterruptedException e) {
e.printStackTrace();
// log.error(e);
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
.setCause(e)
.setPublishedObject(message1, message2, message3));
}
}
}
@Override
public void publishAsync(long timeout, TimeUnit unit, Object message) {
// // put this on the disruptor ring buffer
// final RingBuffer<MessageHolder> ringBuffer = this.ringBuffer;
// final long expireTimestamp = TimeUnit.MILLISECONDS.convert(timeout, unit) + System.currentTimeMillis();
//
// // Inserts the specified element into this buffer, waiting up to the specified wait time if necessary for space
// // to become available.
// while (!ringBuffer.hasAvailableCapacity(1)) {
// LockSupport.parkNanos(10L);
// if (expireTimestamp <= System.currentTimeMillis()) {
// handlePublicationError(new PublicationError()
// .setMessage("Error while adding an asynchronous message")
// .setCause(new Exception("Timeout"))
// .setPublishedObject(message));
// return;
// }
// }
//
// // setup the job
// final long seq = ringBuffer.next();
// try {
// MessageHolder eventJob = ringBuffer.get(seq);
// eventJob.messageType = MessageType.ONE;
// eventJob.message1 = message;
// } catch (Exception e) {
// handlePublicationError(new PublicationError()
// .setMessage("Error while adding an asynchronous message")
// .setCause(e)
// .setPublishedObject(message));
// } finally {
// // always publish the job
// ringBuffer.publish(seq);
// }
}
@Override
public void publishAsync(long timeout, TimeUnit unit, Object message1, Object message2) {
// // put this on the disruptor ring buffer
// final RingBuffer<MessageHolder> ringBuffer = this.ringBuffer;
// final long expireTimestamp = TimeUnit.MILLISECONDS.convert(timeout, unit) + System.currentTimeMillis();
//
// // Inserts the specified element into this buffer, waiting up to the specified wait time if necessary for space
// // to become available.
// while (!ringBuffer.hasAvailableCapacity(1)) {
// LockSupport.parkNanos(10L);
// if (expireTimestamp <= System.currentTimeMillis()) {
// handlePublicationError(new PublicationError()
// .setMessage("Error while adding an asynchronous message")
// .setCause(new Exception("Timeout"))
// .setPublishedObject(message1, message2));
// return;
// }
// }
//
// // setup the job
// final long seq = ringBuffer.next();
// try {
// MessageHolder eventJob = ringBuffer.get(seq);
// eventJob.messageType = MessageType.TWO;
// eventJob.message1 = message1;
// eventJob.message2 = message2;
// } catch (Exception e) {
// handlePublicationError(new PublicationError()
// .setMessage("Error while adding an asynchronous message")
// .setCause(e)
// .setPublishedObject(message1, message2));
// } finally {
// // always publish the job
// ringBuffer.publish(seq);
// }
}
@Override
public void publishAsync(long timeout, TimeUnit unit, Object message1, Object message2, Object message3) {
// // put this on the disruptor ring buffer
// final RingBuffer<MessageHolder> ringBuffer = this.ringBuffer;
// final long expireTimestamp = TimeUnit.MILLISECONDS.convert(timeout, unit) + System.currentTimeMillis();
//
// // Inserts the specified element into this buffer, waiting up to the specified wait time if necessary for space
// // to become available.
// while (!ringBuffer.hasAvailableCapacity(1)) {
// LockSupport.parkNanos(10L);
// if (expireTimestamp <= System.currentTimeMillis()) {
// handlePublicationError(new PublicationError()
// .setMessage("Error while adding an asynchronous message")
// .setCause(new Exception("Timeout"))
// .setPublishedObject(message1, message2, message3));
// return;
// }
// }
//
// // setup the job
// final long seq = ringBuffer.next();
// try {
// MessageHolder eventJob = ringBuffer.get(seq);
// eventJob.messageType = MessageType.THREE;
// eventJob.message1 = message1;
// eventJob.message2 = message2;
// eventJob.message3 = message3;
// } catch (Exception e) {
// handlePublicationError(new PublicationError()
// .setMessage("Error while adding an asynchronous message")
// .setCause(e)
// .setPublishedObject(message1, message2, message3));
// } finally {
// // always publish the job
// ringBuffer.publish(seq);
// }
}
public void publishAsync(long timeout, TimeUnit unit, final Object... messages) {
if (messages != null) {
Runnable runnable = new Runnable() {
@Override
public void run() {
MultiMBassador.this.publish(messages);
}
};
@Override
public void publishAsync(long timeout, TimeUnit unit, Object... messages) {
// // put this on the disruptor ring buffer
// final RingBuffer<MessageHolder> ringBuffer = this.ringBuffer;
// final long expireTimestamp = TimeUnit.MILLISECONDS.convert(timeout, unit) + System.currentTimeMillis();
//
// // Inserts the specified element into this buffer, waiting up to the specified wait time if necessary for space
// // to become available.
// while (!ringBuffer.hasAvailableCapacity(1)) {
// LockSupport.parkNanos(10L);
// if (expireTimestamp <= System.currentTimeMillis()) {
// handlePublicationError(new PublicationError()
// .setMessage("Error while adding an asynchronous message")
// .setCause(new Exception("Timeout"))
// .setPublishedObject(messages));
// return;
// }
// }
//
// // setup the job
// final long seq = ringBuffer.next();
// try {
// MessageHolder eventJob = ringBuffer.get(seq);
// eventJob.messageType = MessageType.ARRAY;
// eventJob.messages = messages;
// } catch (Exception e) {
// handlePublicationError(new PublicationError()
// .setMessage("Error while adding an asynchronous message")
// .setCause(e)
// .setPublishedObject(messages));
// } finally {
// // always publish the job
// ringBuffer.publish(seq);
// }
try {
this.dispatchQueue.tryTransfer(runnable, timeout, unit);
} catch (InterruptedException e) {
e.printStackTrace();
// log.error(e);
handlePublicationError(new PublicationError()
.setMessage("Error while adding an asynchronous message")
.setCause(e)
.setPublishedObject(messages));
}
}
}
}

View File

@ -1,20 +0,0 @@
package net.engio.mbassy.multi;
import java.lang.reflect.Method;
/**
* @author dorkbox, llc Date: 2/2/15
*/
public class SubRunnable {
public Method handler;
public Object listener;
public Object message;
public SubRunnable(Method handler, Object listener, Object message) {
this.handler = handler;
this.listener = listener;
this.message = message;
}
}

View File

@ -1,184 +0,0 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.engio.mbassy.multi.common;
import static net.engio.mbassy.multi.common.UnsafeAccess.UNSAFE;
import java.util.AbstractQueue;
import java.util.Iterator;
abstract class ConcurrentCircularArrayQueueL0Pad<E> extends AbstractQueue<E> implements MessagePassingQueue<E> {
long p00, p01, p02, p03, p04, p05, p06, p07;
long p30, p31, p32, p33, p34, p35, p36, p37;
}
/**
* A concurrent access enabling class used by circular array based queues this class exposes an offset computation
* method along with differently memory fenced load/store methods into the underlying array. The class is pre-padded and
* the array is padded on either side to help with False sharing prvention. It is expected theat subclasses handle post
* padding.
* <p>
* Offset calculation is separate from access to enable the reuse of a give compute offset.
* <p>
* Load/Store methods using a <i>buffer</i> parameter are provided to allow the prevention of final field reload after a
* LoadLoad barrier.
* <p>
*
* @author nitsanw
*
* @param <E>
*/
public abstract class ConcurrentCircularArrayQueue<E> extends ConcurrentCircularArrayQueueL0Pad<E> {
protected static final int SPARSE_SHIFT = Integer.getInteger("sparse.shift", 0);
protected static final int BUFFER_PAD = 32;
private static final long REF_ARRAY_BASE;
private static final int REF_ELEMENT_SHIFT;
static {
final int scale = UnsafeAccess.UNSAFE.arrayIndexScale(Object[].class);
if (4 == scale) {
REF_ELEMENT_SHIFT = 2 + SPARSE_SHIFT;
} else if (8 == scale) {
REF_ELEMENT_SHIFT = 3 + SPARSE_SHIFT;
} else {
throw new IllegalStateException("Unknown pointer size");
}
// Including the buffer pad in the array base offset
REF_ARRAY_BASE = UnsafeAccess.UNSAFE.arrayBaseOffset(Object[].class)
+ (BUFFER_PAD << REF_ELEMENT_SHIFT - SPARSE_SHIFT);
}
protected final long mask;
// @Stable :(
protected final E[] buffer;
@SuppressWarnings("unchecked")
public ConcurrentCircularArrayQueue(int capacity) {
int actualCapacity = Pow2.roundToPowerOfTwo(capacity);
this.mask = actualCapacity - 1;
// pad data on either end with some empty slots.
this.buffer = (E[]) new Object[(actualCapacity << SPARSE_SHIFT) + BUFFER_PAD * 2];
}
/**
* @param index desirable element index
* @return the offset in bytes within the array for a given index.
*/
protected final long calcElementOffset(long index) {
return calcElementOffset(index, this.mask);
}
/**
* @param index desirable element index
* @param mask
* @return the offset in bytes within the array for a given index.
*/
protected final long calcElementOffset(long index, long mask) {
return REF_ARRAY_BASE + ((index & mask) << REF_ELEMENT_SHIFT);
}
/**
* A plain store (no ordering/fences) of an element to a given offset
*
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @param e a kitty
*/
protected final void spElement(long offset, E e) {
spElement(this.buffer, offset, e);
}
/**
* A plain store (no ordering/fences) of an element to a given offset
*
* @param buffer this.buffer
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @param e an orderly kitty
*/
protected final void spElement(E[] buffer, long offset, E e) {
UNSAFE.putObject(buffer, offset, e);
}
/**
* An ordered store(store + StoreStore barrier) of an element to a given offset
*
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @param e an orderly kitty
*/
protected final void soElement(long offset, E e) {
soElement(this.buffer, offset, e);
}
/**
* An ordered store(store + StoreStore barrier) of an element to a given offset
*
* @param buffer this.buffer
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @param e an orderly kitty
*/
protected final void soElement(E[] buffer, long offset, E e) {
UNSAFE.putOrderedObject(buffer, offset, e);
}
/**
* A plain load (no ordering/fences) of an element from a given offset.
*
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @return the element at the offset
*/
protected final E lpElement(long offset) {
return lpElement(this.buffer, offset);
}
/**
* A plain load (no ordering/fences) of an element from a given offset.
*
* @param buffer this.buffer
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @return the element at the offset
*/
@SuppressWarnings("unchecked")
protected final E lpElement(E[] buffer, long offset) {
return (E) UNSAFE.getObject(buffer, offset);
}
/**
* A volatile load (load + LoadLoad barrier) of an element from a given offset.
*
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @return the element at the offset
*/
protected final E lvElement(long offset) {
return lvElement(this.buffer, offset);
}
/**
* A volatile load (load + LoadLoad barrier) of an element from a given offset.
*
* @param buffer this.buffer
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @return the element at the offset
*/
@SuppressWarnings("unchecked")
protected final E lvElement(E[] buffer, long offset) {
return (E) UNSAFE.getObjectVolatile(buffer, offset);
}
@Override
public Iterator<E> iterator() {
throw new UnsupportedOperationException();
}
@Override
public void clear() {
// we have to test isEmpty because of the weaker poll() guarantee
while (poll() != null || !isEmpty()) {
;
}
}
}

View File

@ -1,54 +0,0 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.engio.mbassy.multi.common;
public abstract class ConcurrentSequencedCircularArrayQueue<E> extends ConcurrentCircularArrayQueue<E> {
private static final long ARRAY_BASE;
private static final int ELEMENT_SHIFT;
static {
final int scale = UnsafeAccess.UNSAFE.arrayIndexScale(long[].class);
if (8 == scale) {
ELEMENT_SHIFT = 3 + SPARSE_SHIFT;
} else {
throw new IllegalStateException("Unexpected long[] element size");
}
// Including the buffer pad in the array base offset
ARRAY_BASE = UnsafeAccess.UNSAFE.arrayBaseOffset(long[].class) + (BUFFER_PAD << ELEMENT_SHIFT - SPARSE_SHIFT);
}
protected final long[] sequenceBuffer;
public ConcurrentSequencedCircularArrayQueue(int capacity) {
super(capacity);
int actualCapacity = (int) (this.mask + 1);
// pad data on either end with some empty slots.
this.sequenceBuffer = new long[(actualCapacity << SPARSE_SHIFT) + BUFFER_PAD * 2];
for (long i = 0; i < actualCapacity; i++) {
soSequence(this.sequenceBuffer, calcSequenceOffset(i), i);
}
}
protected final long calcSequenceOffset(long index) {
return ARRAY_BASE + ((index & this.mask) << ELEMENT_SHIFT);
}
protected final void soSequence(long[] buffer, long offset, long e) {
UnsafeAccess.UNSAFE.putOrderedLong(buffer, offset, e);
}
protected final long lvSequence(long[] buffer, long offset) {
return UnsafeAccess.UNSAFE.getLongVolatile(buffer, offset);
}
}

View File

@ -1,31 +0,0 @@
/*
* Copyright 2014 David Karnok
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package net.engio.mbassy.multi.common;
import java.io.Serializable;
/**
* Padding up to 128 bytes at the front.
* Based on netty's implementation
*/
abstract class FrontPadding implements Serializable {
/** */
private static final long serialVersionUID = -596356687591714352L;
/** Padding. */
public transient long p1, p2, p3, p4, p5, p6; // 48 bytes (header is 16 bytes)
/** Padding. */
public transient long p8, p9, p10, p11, p12, p13, p14, p15; // 64 bytes
}

View File

@ -1,30 +0,0 @@
package net.engio.mbassy.multi.common;
import java.util.Collection;
/**
* Todo: Add javadoc
*
* @author bennidi
* Date: 3/29/13
*/
public interface IConcurrentSet<T> extends Collection<T> {
@Override
boolean add(T element);
boolean contains(Object element);
@Override
int size();
@Override
boolean isEmpty();
void addAll(Iterable<T> elements);
/**
* @return TRUE if the element was removed
*/
boolean remove(Object element);
}

View File

@ -1,13 +0,0 @@
package net.engio.mbassy.multi.common;
public abstract class InterruptRunnable implements Runnable {
protected volatile boolean running = true;
public InterruptRunnable() {
super();
}
public void stop() {
this.running = false;
}
}

View File

@ -1,71 +0,0 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.engio.mbassy.multi.common;
import java.util.Queue;
/**
* This is a tagging interface for the queues in this library which implement a subset of the {@link Queue} interface
* sufficient for concurrent message passing.<br>
* Message passing queues offer happens before semantics to messages passed through, namely that writes made by the
* producer before offering the message are visible to the consuming thread after the message has been polled out of the
* queue.
*
* @author nitsanw
*
* @param <M> the event/message type
*/
interface MessagePassingQueue<M> {
/**
* Called from a producer thread subject to the restrictions appropriate to the implementation and according to the
* {@link Queue#offer(Object)} interface.
*
* @param message
* @return true if element was inserted into the queue, false iff full
*/
boolean offer(M message);
/**
* Called from the consumer thread subject to the restrictions appropriate to the implementation and according to
* the {@link Queue#poll()} interface.
*
* @return a message from the queue if one is available, null iff empty
*/
M poll();
/**
* Called from the consumer thread subject to the restrictions appropriate to the implementation and according to
* the {@link Queue#peek()} interface.
*
* @return a message from the queue if one is available, null iff empty
*/
M peek();
/**
* This method's accuracy is subject to concurrent modifications happening as the size is estimated and as such is a
* best effort rather than absolute value. For some implementations this method may be O(n) rather than O(1).
*
* @return number of messages in the queue, between 0 and queue capacity or {@link Integer#MAX_VALUE} if not bounded
*/
int size();
/**
* This method's accuracy is subject to concurrent modifications happening as the observation is carried out.
*
* @return true if empty, false otherwise
*/
boolean isEmpty();
}

View File

@ -1,250 +0,0 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.engio.mbassy.multi.common;
abstract class MpmcArrayQueueL1Pad<E> extends ConcurrentSequencedCircularArrayQueue<E> {
long p10, p11, p12, p13, p14, p15, p16;
long p30, p31, p32, p33, p34, p35, p36, p37;
public MpmcArrayQueueL1Pad(int capacity) {
super(capacity);
}
}
abstract class MpmcArrayQueueProducerField<E> extends MpmcArrayQueueL1Pad<E> {
private final static long P_INDEX_OFFSET;
static {
try {
P_INDEX_OFFSET = UnsafeAccess.UNSAFE.objectFieldOffset(MpmcArrayQueueProducerField.class
.getDeclaredField("producerIndex"));
} catch (NoSuchFieldException e) {
throw new RuntimeException(e);
}
}
private volatile long producerIndex;
public MpmcArrayQueueProducerField(int capacity) {
super(capacity);
}
protected final long lvProducerIndex() {
return this.producerIndex;
}
protected final boolean casProducerIndex(long expect, long newValue) {
return UnsafeAccess.UNSAFE.compareAndSwapLong(this, P_INDEX_OFFSET, expect, newValue);
}
}
abstract class MpmcArrayQueueL2Pad<E> extends MpmcArrayQueueProducerField<E> {
long p20, p21, p22, p23, p24, p25, p26;
long p30, p31, p32, p33, p34, p35, p36, p37;
public MpmcArrayQueueL2Pad(int capacity) {
super(capacity);
}
}
abstract class MpmcArrayQueueConsumerField<E> extends MpmcArrayQueueL2Pad<E> {
private final static long C_INDEX_OFFSET;
static {
try {
C_INDEX_OFFSET = UnsafeAccess.UNSAFE.objectFieldOffset(MpmcArrayQueueConsumerField.class
.getDeclaredField("consumerIndex"));
} catch (NoSuchFieldException e) {
throw new RuntimeException(e);
}
}
private volatile long consumerIndex;
public MpmcArrayQueueConsumerField(int capacity) {
super(capacity);
}
protected final long lvConsumerIndex() {
return this.consumerIndex;
}
protected final boolean casConsumerIndex(long expect, long newValue) {
return UnsafeAccess.UNSAFE.compareAndSwapLong(this, C_INDEX_OFFSET, expect, newValue);
}
}
/**
* A Multi-Producer-Multi-Consumer queue based on a {@link ConcurrentCircularArrayQueue}. This implies that
* any and all threads may call the offer/poll/peek methods and correctness is maintained. <br>
* This implementation follows patterns documented on the package level for False Sharing protection.<br>
* The algorithm for offer/poll is an adaptation of the one put forward by D. Vyukov (See <a
* href="http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue">here</a>). The original
* algorithm uses an array of structs which should offer nice locality properties but is sadly not possible in
* Java (waiting on Value Types or similar). The alternative explored here utilizes 2 arrays, one for each
* field of the struct. There is a further alternative in the experimental project which uses iteration phase
* markers to achieve the same algo and is closer structurally to the original, but sadly does not perform as
* well as this implementation.<br>
* Tradeoffs to keep in mind:
* <ol>
* <li>Padding for false sharing: counter fields and queue fields are all padded as well as either side of
* both arrays. We are trading memory to avoid false sharing(active and passive).
* <li>2 arrays instead of one: The algorithm requires an extra array of longs matching the size of the
* elements array. This is doubling/tripling the memory allocated for the buffer.
* <li>Power of 2 capacity: Actual elements buffer (and sequence buffer) is the closest power of 2 larger or
* equal to the requested capacity.
* </ol>
*
* @param <E>
* type of the element stored in the {@link java.util.Queue}
*/
public class MpmcArrayQueue<E> extends MpmcArrayQueueConsumerField<E> {
long p40, p41, p42, p43, p44, p45, p46;
long p30, p31, p32, p33, p34, p35, p36, p37;
public MpmcArrayQueue(final int capacity) {
super(Math.max(2, capacity));
}
@Override
public boolean offer(final E e) {
if (null == e) {
throw new NullPointerException("Null is not a valid element");
}
// local load of field to avoid repeated loads after volatile reads
final long capacity = this.mask + 1;
final long[] lSequenceBuffer = this.sequenceBuffer;
long currentProducerIndex;
long seqOffset;
long cIndex = Long.MAX_VALUE;// start with bogus value, hope we don't need it
while (true) {
currentProducerIndex = lvProducerIndex(); // LoadLoad
seqOffset = calcSequenceOffset(currentProducerIndex);
final long seq = lvSequence(lSequenceBuffer, seqOffset); // LoadLoad
final long delta = seq - currentProducerIndex;
if (delta == 0) {
// this is expected if we see this first time around
if (casProducerIndex(currentProducerIndex, currentProducerIndex + 1)) {
// Successful CAS: full barrier
break;
}
// failed cas, retry 1
} else if (delta < 0 && // poll has not moved this value forward
currentProducerIndex - capacity <= cIndex && // test against cached cIndex
currentProducerIndex - capacity <= (cIndex = lvConsumerIndex())) { // test against latest cIndex
// Extra check required to ensure [Queue.offer == false iff queue is full]
return false;
}
// another producer has moved the sequence by one, retry 2
}
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long elementOffset = calcElementOffset(currentProducerIndex);
spElement(elementOffset, e);
// increment sequence by 1, the value expected by consumer
// (seeing this value from a producer will lead to retry 2)
soSequence(lSequenceBuffer, seqOffset, currentProducerIndex + 1); // StoreStore
return true;
}
/**
* {@inheritDoc}
* <p>
* Because return null indicates queue is empty we cannot simply rely on next element visibility for poll
* and must test producer index when next element is not visible.
*/
@Override
public E poll() {
// local load of field to avoid repeated loads after volatile reads
final long[] lSequenceBuffer = this.sequenceBuffer;
long currentConsumerIndex;
long seqOffset;
long pIndex = -1; // start with bogus value, hope we don't need it
while (true) {
currentConsumerIndex = lvConsumerIndex();// LoadLoad
seqOffset = calcSequenceOffset(currentConsumerIndex);
final long seq = lvSequence(lSequenceBuffer, seqOffset);// LoadLoad
final long delta = seq - (currentConsumerIndex + 1);
if (delta == 0) {
if (casConsumerIndex(currentConsumerIndex, currentConsumerIndex + 1)) {
// Successful CAS: full barrier
break;
}
// failed cas, retry 1
} else if (delta < 0 && // slot has not been moved by producer
currentConsumerIndex >= pIndex && // test against cached pIndex
currentConsumerIndex == (pIndex = lvProducerIndex())) { // update pIndex if we must
// strict empty check, this ensures [Queue.poll() == null iff isEmpty()]
return null;
}
// another consumer beat us and moved sequence ahead, retry 2
}
// on 64bit(no compressed oops) JVM this is the same as seqOffset
final long offset = calcElementOffset(currentConsumerIndex);
final E e = lpElement(offset);
spElement(offset, null);
// Move sequence ahead by capacity, preparing it for next offer
// (seeing this value from a consumer will lead to retry 2)
soSequence(lSequenceBuffer, seqOffset, currentConsumerIndex + this.mask + 1);// StoreStore
return e;
}
@Override
public E peek() {
long currConsumerIndex;
E e;
do {
currConsumerIndex = lvConsumerIndex();
// other consumers may have grabbed the element, or queue might be empty
e = lpElement(calcElementOffset(currConsumerIndex));
// only return null if queue is empty
} while (e == null && currConsumerIndex != lvProducerIndex());
return e;
}
@Override
public int size() {
/*
* It is possible for a thread to be interrupted or reschedule between the read of the producer and
* consumer indices, therefore protection is required to ensure size is within valid range. In the
* event of concurrent polls/offers to this method the size is OVER estimated as we read consumer
* index BEFORE the producer index.
*/
long after = lvConsumerIndex();
while (true) {
final long before = after;
final long currentProducerIndex = lvProducerIndex();
after = lvConsumerIndex();
if (before == after) {
return (int) (currentProducerIndex - after);
}
}
}
@Override
public boolean isEmpty() {
// Order matters!
// Loading consumer before producer allows for producer increments after consumer index is read.
// This ensures this method is conservative in it's estimate. Note that as this is an MPMC there is
// nothing we can do to make this an exact method.
return lvConsumerIndex() == lvProducerIndex();
}
}

View File

@ -10,7 +10,7 @@ import java.util.concurrent.atomic.AtomicInteger;
* @author dorkbox, llc
* Date: 2/2/15
*/
public class DisruptorThreadFactory implements ThreadFactory {
public class NamedThreadFactory implements ThreadFactory {
/**
* The stack size is arbitrary based on JVM implementation. Default is 0
* 8k is the size of the android stack. Depending on the version of android, this can either change, or will always be 8k
@ -64,7 +64,7 @@ public class DisruptorThreadFactory implements ThreadFactory {
private final ThreadGroup group;
private final String groupName;
public DisruptorThreadFactory(String groupName) {
public NamedThreadFactory(String groupName) {
this.groupName = groupName;
this.group = new ThreadGroup(groupName);
}
@ -80,7 +80,7 @@ public class DisruptorThreadFactory implements ThreadFactory {
// 8k is the size of the android stack. Depending on the version of android, this can either change, or will always be 8k
// To be honest, 8k is pretty reasonable for an asynchronous/event based system (32bit) or 16k (64bit)
// Setting the size MAY or MAY NOT have any effect!!!
Thread t = new Thread(this.group, r, stringBuilder.toString(), DisruptorThreadFactory.stackSizeForThreads);
Thread t = new Thread(this.group, r, stringBuilder.toString(), NamedThreadFactory.stackSizeForThreads);
t.setDaemon(true);// FORCE these threads to finish before allowing the JVM to exit
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);

View File

@ -1,158 +0,0 @@
/*
* Copyright 2014 David Karnok
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package net.engio.mbassy.multi.common;
import java.util.concurrent.atomic.AtomicLongArray;
import java.util.concurrent.atomic.AtomicReferenceArray;
/**
* A bounded MPMC queue implementation based on
* https://blogs.oracle.com/dave/entry/ptlqueue_a_scalable_bounded_capacity
* <p>
* Does not support null values.
*/
public final class PTLQueue<E> {
private final int mask;
private final int length;
private final PaddedAtomicLong offerCursor;
private final PaddedAtomicLong pollCursor;
private final AtomicLongArray turns;
private final AtomicReferenceArray<E> values;
public PTLQueue(int capacity) {
capacity = Pow2.roundToPowerOfTwo(capacity);
this.mask = capacity - 1;
this.length = capacity;
this.offerCursor = new PaddedAtomicLong();
this.pollCursor = new PaddedAtomicLong();
this.turns = new AtomicLongArray(capacity);
this.values = new AtomicReferenceArray<>(capacity);
for (int i = 0; i < this.length - 1; i++) {
this.turns.lazySet(i, i);
}
this.turns.set(this.length - 1, this.length - 1);
}
public void put(E value, Runnable ifWait) {
nullCheck(value);
long ticket = this.offerCursor.getAndIncrement();
int slot = (int)ticket & this.mask;
while (this.turns.get(slot) != ticket) {
ifWait.run();
}
this.values.set(slot, value);
}
public boolean offer(E value) {
nullCheck(value);
for (;;) {
long ticket = this.offerCursor.get();
int slot = (int)ticket & this.mask;
if (this.turns.get(slot) != ticket) {
return false;
}
if (this.offerCursor.compareAndSet(ticket, ticket + 1)) {
this.values.set(slot, value);
return true;
}
}
}
private void nullCheck(E value) {
if (value == null) {
throw new NullPointerException("Null values not allowed here!");
}
}
public E take(Runnable ifWait) {
long ticket = this.pollCursor.getAndIncrement();
int slot = (int)ticket & this.mask;
while (this.turns.get(slot) != ticket) {
ifWait.run();
}
for (;;) {
E v = this.values.get(slot);
if (v != null) {
this.values.lazySet(slot, null);
this.turns.set(slot, ticket /* + mask + 1*/);
return v;
}
ifWait.run();
}
}
public boolean isEmpty() {
for (;;) {
long ticket = this.pollCursor.get();
int slot = (int)ticket & this.mask;
if (this.turns.get(slot) != ticket) {
return true;
}
E v = this.values.get(slot);
if (v == null) {
return true;
}
return false;
}
}
public E poll() {
for (;;) {
long ticket = this.pollCursor.get();
int slot = (int)ticket & this.mask;
if (this.turns.get(slot) != ticket) {
return null;
}
E v = this.values.get(slot);
if (v == null) {
return null;
}
if (this.pollCursor.compareAndSet(ticket, ticket + 1)) {
this.values.lazySet(slot, null);
this.turns.set(slot, ticket + this.length);
return v;
}
}
}
public E pollStrong() {
for (;;) {
long ticket = this.pollCursor.get();
int slot = (int)ticket & this.mask;
if (this.turns.get(slot) != ticket) {
if (this.pollCursor.get() != ticket) {
continue;
}
return null;
}
E v = this.values.get(slot);
if (v == null) {
if ((this.pollCursor.get() ^ ticket | this.turns.get(slot) ^ ticket) != 0) {
continue;
}
return null;
}
if (this.pollCursor.compareAndSet(ticket, ticket + 1)) {
this.values.lazySet(slot, null);
this.turns.set(slot, ticket + this.length);
return v;
}
}
}
}

View File

@ -1,31 +0,0 @@
/*
* Copyright 2014 David Karnok
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package net.engio.mbassy.multi.common;
/**
* A padded atomic integer to fill in 4 cache lines to avoid any false sharing or
* adjacent prefetch.
* Based on Netty's implementation.
*/
public final class PaddedAtomicLong extends PaddedAtomicLongBase {
/** */
private static final long serialVersionUID = 8781891581317286855L;
/** Padding. */
public transient long p16, p17, p18, p19, p20, p21, p22; // 56 bytes (the remaining 8 is in the base)
/** Padding. */
public transient long p24, p25, p26, p27, p28, p29, p30, p31; // 64 bytes
}

View File

@ -1,84 +0,0 @@
/*
* Copyright 2014 David Karnok
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package net.engio.mbassy.multi.common;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
/**
* The atomic reference base padded at the front.
* Based on Netty's implementation.
*/
abstract class PaddedAtomicLongBase extends FrontPadding {
private static final long serialVersionUID = 6513142711280243198L;
private static final AtomicLongFieldUpdater<PaddedAtomicLongBase> updater;
static {
updater = AtomicLongFieldUpdater.newUpdater(PaddedAtomicLongBase.class, "value");
}
private volatile long value; // 8-byte object field (or 4-byte + padding)
public final long get() {
return this.value;
}
public final void set(long referent) {
this.value = referent;
}
public final void lazySet(long referent) {
updater.lazySet(this, referent);
}
public final boolean compareAndSet(long expect, long update) {
return updater.compareAndSet(this, expect, update);
}
public final boolean weakCompareAndSet(long expect, long update) {
return updater.weakCompareAndSet(this, expect, update);
}
public final long getAndSet(long newValue) {
return updater.getAndSet(this, this.value);
}
public final long getAndAdd(long delta) {
return updater.getAndAdd(this, delta);
}
public final long incrementAndGet() {
return updater.incrementAndGet(this);
}
public final long decrementAndGet() {
return updater.decrementAndGet(this);
}
public final long getAndIncrement() {
return updater.getAndIncrement(this);
}
public final long getAndDecrement() {
return updater.getAndDecrement(this);
}
public final long addAndGet(long delta) {
return updater.addAndGet(this, delta);
}
@Override
public String toString() {
return String.valueOf(get());
}
}

View File

@ -1,41 +0,0 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.engio.mbassy.multi.common;
/**
* Power of 2 utility functions.
*/
public class Pow2 {
/**
* Find the next larger positive power of two value up from the given value. If value is a power of two then
* this value will be returned.
*
* @param value from which next positive power of two will be found.
* @return the next positive power of 2 or this value if it is a power of 2.
*/
public static int roundToPowerOfTwo(final int value) {
return 1 << 32 - Integer.numberOfLeadingZeros(value - 1);
}
/**
* Is this value a power of two.
*
* @param value to be tested to see if it is a power of two.
* @return true if the value is a power of 2 otherwise false.
*/
public static boolean isPowerOfTwo(final int value) {
return (value & value - 1) == 0;
}
}

View File

@ -1,56 +0,0 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.engio.mbassy.multi.common;
import java.lang.reflect.Field;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import sun.misc.Unsafe;
/**
* Why should we resort to using Unsafe?<br>
* <ol>
* <li>To construct class fields which allow volatile/ordered/plain access: This requirement is covered by
* {@link AtomicReferenceFieldUpdater} and similar but their performance is arguably worse than the DIY approach
* (depending on JVM version) while Unsafe intrinsification is a far lesser challenge for JIT compilers.
* <li>To construct flavors of {@link AtomicReferenceArray}.
* <li>Other use cases exist but are not present in this library yet.
* </ol>
*
* @author nitsanw
*
*/
public class UnsafeAccess {
public static final boolean SUPPORTS_GET_AND_SET;
public static final Unsafe UNSAFE;
static {
try {
final Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
UNSAFE = (Unsafe) field.get(null);
} catch (Exception e) {
SUPPORTS_GET_AND_SET = false;
throw new RuntimeException(e);
}
boolean getAndSetSupport = false;
try {
Unsafe.class.getMethod("getAndSetObject", Object.class, Long.TYPE,Object.class);
getAndSetSupport = true;
} catch (Exception e) {
}
SUPPORTS_GET_AND_SET = getAndSetSupport;
}
}

View File

@ -88,202 +88,6 @@ public class MessageHandler {
return this.acceptsSubtypes;
}
/**
* @return true if the message types are handled
*/
public boolean handlesMessage(Class<?> messageType) {
Class<?>[] handledMessages = this.handledMessages;
int handledLength = handledMessages.length;
if (handledLength != 1) {
return false;
}
if (this.acceptsSubtypes) {
if (!handledMessages[0].isAssignableFrom(messageType)) {
return false;
}
} else {
if (handledMessages[0] != messageType) {
return false;
}
}
return true;
}
/**
* @return true if the message types are handled
*/
public boolean handlesMessage(Class<?> messageType1, Class<?> messageType2) {
Class<?>[] handledMessages = this.handledMessages;
int handledLength = handledMessages.length;
if (handledLength != 2) {
return false;
}
if (this.acceptsSubtypes) {
if (!handledMessages[0].isAssignableFrom(messageType1)) {
return false;
}
if (!handledMessages[1].isAssignableFrom(messageType2)) {
return false;
}
} else {
if (handledMessages[0] != messageType1) {
return false;
}
if (handledMessages[1] != messageType2) {
return false;
}
}
return true;
}
/**
* @return true if the message types are handled
*/
public boolean handlesMessage(Class<?> messageType1, Class<?> messageType2, Class<?> messageType3) {
Class<?>[] handledMessages = this.handledMessages;
int handledLength = handledMessages.length;
if (handledLength != 3) {
return false;
}
if (this.acceptsSubtypes) {
if (!handledMessages[0].isAssignableFrom(messageType1)) {
return false;
}
if (!handledMessages[1].isAssignableFrom(messageType2)) {
return false;
}
if (!handledMessages[2].isAssignableFrom(messageType3)) {
return false;
}
} else {
if (handledMessages[0] != messageType1) {
return false;
}
if (handledMessages[1] != messageType2) {
return false;
}
if (handledMessages[2] != messageType3) {
return false;
}
}
return true;
}
/**
* @return true if the message types are handled
*/
public boolean handlesMessage(Class<?>... messageTypes) {
Class<?>[] handledMessages = this.handledMessages;
int handledLength = handledMessages.length;
int handledLengthMinusVarArg = handledLength-1;
int messagesLength = messageTypes.length;
// do we even have enough to even CHECK the var-arg?
if (messagesLength < handledLengthMinusVarArg) {
// totally wrong number of args
return false;
}
// check BEFORE var-arg in handler (var-arg can ONLY be last element in array)
if (handledLengthMinusVarArg <= messagesLength) {
if (this.acceptsSubtypes) {
for (int i = 0; i < handledLengthMinusVarArg; i++) {
Class<?> handledMessage = handledMessages[i];
Class<?> messageType = messageTypes[i];
if (!handledMessage.isAssignableFrom(messageType)) {
return false;
}
}
} else {
for (int i = 0; i < handledLengthMinusVarArg; i++) {
Class<?> handledMessage = handledMessages[i];
Class<?> messageType = messageTypes[i];
if (handledMessage != messageType) {
return false;
}
}
}
}
// do we even HAVE var-arg?
if (!handledMessages[handledLengthMinusVarArg].isArray()) {
// DO NOT HAVE VAR_ARG PRESENT IN HANDLERS
// fast exit
if (handledLength != messagesLength) {
return false;
}
// compare remaining arg
Class<?> handledMessage = handledMessages[handledLengthMinusVarArg];
Class<?> messageType = messageTypes[handledLengthMinusVarArg];
if (this.acceptsSubtypes) {
if (!handledMessage.isAssignableFrom(messageType)) {
return false;
}
} else {
if (handledMessage != messageType) {
return false;
}
}
// all args are dandy
return true;
}
// WE HAVE VAR_ARG PRESENT IN HANDLER
// do we have enough args to NEED to check the var-arg?
if (handledLengthMinusVarArg == messagesLength) {
// var-arg doesn't need checking
return true;
}
// then check var-arg in handler
// all the args to check for the var-arg MUST be the same! (ONLY ONE ARRAY THOUGH CAN BE PRESENT)
int messagesLengthMinusVarArg = messagesLength-1;
Class<?> typeCheck = messageTypes[handledLengthMinusVarArg];
for (int i = handledLengthMinusVarArg; i < messagesLength; i++) {
Class<?> t1 = messageTypes[i];
if (t1 != typeCheck) {
return false;
}
}
// if we got this far, then the args are the same type. IF we have more than one, AND they are arrays, NOPE!
if (messagesLength - handledLengthMinusVarArg > 1 && messageTypes[messagesLengthMinusVarArg].isArray()) {
return false;
}
// are we comparing array -> array or string -> array
Class<?> componentType;
if (messageTypes[messagesLengthMinusVarArg].isArray()) {
componentType = handledMessages[handledLengthMinusVarArg];
} else {
componentType = handledMessages[handledLengthMinusVarArg].getComponentType();
}
if (this.acceptsSubtypes) {
return componentType.isAssignableFrom(typeCheck);
} else {
return typeCheck == componentType;
}
}
@Override
public int hashCode() {
final int prime = 31;

View File

@ -66,40 +66,11 @@ public class Subscription {
return this.handlerMetadata.acceptsSubtypes();
}
/**
* Check whether this subscription manages a message handler
*/
public boolean handlesMessageType(Class<?> messageType) {
return this.handlerMetadata.handlesMessage(messageType);
}
/**
* Check whether this subscription manages a message handler
*/
public boolean handlesMessageType(Class<?> messageType1, Class<?> messageType2) {
return this.handlerMetadata.handlesMessage(messageType1, messageType2);
}
/**
* Check whether this subscription manages a message handler
*/
public boolean handlesMessageType(Class<?> messageType1, Class<?> messageType2, Class<?> messageType3) {
return this.handlerMetadata.handlesMessage(messageType1, messageType2, messageType3);
}
/**
* Check whether this subscription manages a message handler
*/
public boolean handlesMessageType(Class<?>... messageTypes) {
return this.handlerMetadata.handlesMessage(messageTypes);
}
public Class<?>[] getHandledMessageTypes() {
return this.handlerMetadata.getHandledMessages();
}
public void subscribe(Object listener) {
// this.listeners.put(listener, Boolean.TRUE);
this.listeners.add(listener);
}
@ -119,17 +90,16 @@ public class Subscription {
return this.listeners.size();
}
// private AtomicLong counter = new AtomicLong();
public void publishToSubscription(ErrorHandlingSupport errorHandler, Object message) {
Collection<Object> listeners = this.listeners;
if (listeners.size() > 0) {
Method handler = this.handlerMetadata.getHandler();
// int count = 0;
IHandlerInvocation invocation = this.invocation;
for (Object listener : listeners) {
// count++;
try {
this.invocation.invoke(listener, handler, message);
invocation.invoke(listener, handler, message);
} catch (IllegalAccessException e) {
errorHandler.handlePublicationError(new PublicationError()
.setMessage("Error during invocation of message handler. " +
@ -165,7 +135,6 @@ public class Subscription {
.setPublishedObject(message));
}
}
// this.counter.getAndAdd(count);
}
}

View File

@ -2,10 +2,9 @@ package net.engio.mbassy.multi.subscription;
import java.lang.reflect.Array;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@ -17,6 +16,7 @@ import net.engio.mbassy.multi.common.ReflectionUtils;
import net.engio.mbassy.multi.common.StrongConcurrentSet;
import net.engio.mbassy.multi.listener.MessageHandler;
import net.engio.mbassy.multi.listener.MetadataReader;
import sun.reflect.generics.reflectiveObjects.NotImplementedException;
import com.googlecode.concurentlocks.ReentrantReadWriteUpdateLock;
@ -32,6 +32,8 @@ import com.googlecode.concurentlocks.ReentrantReadWriteUpdateLock;
* Date: 2/2/15
*/
public class SubscriptionManager {
private static final int DEFAULT_SUPER_CLASS_TREE_SIZE = 4;
// the metadata reader that is used to inspect objects passed to the subscribe method
private final MetadataReader metadataReader = new MetadataReader();
@ -48,13 +50,15 @@ public class SubscriptionManager {
// once a collection of subscriptions is stored it does not change
private final Map<Class<?>, Collection<Subscription>> subscriptionsPerListener = new IdentityHashMap<Class<?>, Collection<Subscription>>(50);
private final Object holder = new Object[0];
// remember classes that can have VarArg casting performed
private final ConcurrentHashMap<Class<?>, Class<?>> varArgClasses = new ConcurrentHashMap<Class<?>, Class<?>>();
private final Map<Class<?>, ArrayList<Class<?>>> superClassesCache = new IdentityHashMap<Class<?>, ArrayList<Class<?>>>();
private final Map<Class<?>, ArrayDeque<Class<?>>> superClassesCache = new IdentityHashMap<Class<?>, ArrayDeque<Class<?>>>();
// private final Map<Class<?>, Collection<Subscription>> superClassSubscriptionsPerMessageSingle = new IdentityHashMap<Class<?>, Collection<Subscription>>(50);
// remember already processed classes that do not contain any message handlers
private final ConcurrentHashMap<Class<?>, Object> nonListeners = new ConcurrentHashMap<Class<?>, Object>();
@ -101,6 +105,15 @@ public class SubscriptionManager {
this.subscriptionsPerMessageSingle.remove(clazz);
}
}
// Collection<Subscription> superSubs = this.superClassSubscriptionsPerMessageSingle.get(clazz);
// if (superSubs != null) {
// superSubs.remove(subscription);
//
// if (superSubs.isEmpty()) {
// // remove element
// this.superClassSubscriptionsPerMessageSingle.remove(clazz);
// }
// }
} else {
// NOTE: Not thread-safe! must be synchronized in outer scope
IdentityObjectTree<Class<?>, Collection<Subscription>> tree;
@ -197,14 +210,25 @@ public class SubscriptionManager {
// single
Class<?> clazz = handledMessageTypes[0];
// NOTE: Order is important for safe publication
Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(clazz);
// Collection<Subscription> superSubs = this.superClassSubscriptionsPerMessageSingle.get(clazz);
if (subs == null) {
// NOTE: Order is important for safe publication
subs = new StrongConcurrentSet<Subscription>(2);
subs.add(subscription);
this.subscriptionsPerMessageSingle.put(clazz, subs);
// if (subscription.acceptsSubtypes()) {
// superSubs = new StrongConcurrentSet<Subscription>(2);
// superSubs.add(subscription);
// this.superClassSubscriptionsPerMessageSingle.put(clazz, superSubs);
// }
} else {
subs.add(subscription);
// if (subscription.acceptsSubtypes()) {
// superSubs.add(subscription);
// }
}
// have to save our the VarArg class types, because creating var-arg arrays for objects is expensive
@ -215,11 +239,10 @@ public class SubscriptionManager {
// since it's vararg, this means that it's an ARRAY, so we ALSO
// have to add the component classes of the array
if (subscription.acceptsSubtypes()) {
ArrayList<Class<?>> setupSuperClassCache2 = setupSuperClassCache(componentType);
// have to setup each vararg chain
for (int i = 0; i < setupSuperClassCache2.size(); i++) {
Class<?> superClass = setupSuperClassCache2.get(i);
ArrayDeque<Class<?>> superClasses = setupSuperClassCache(componentType);
// have to setup each vararg chain
for (Class<?> superClass : superClasses) {
if (!this.varArgClasses.containsKey(superClass)) {
// this is expensive, so we check the cache first
Class<?> c2 = Array.newInstance(superClass, 1).getClass();
@ -285,309 +308,182 @@ public class SubscriptionManager {
}
}
private final Collection<Subscription> EMPTY_LIST = Collections.emptyList();
// cannot return null, not thread safe.
// must be protected by read lock
// CAN RETURN NULL - not thread safe.
public Collection<Subscription> getSubscriptionsByMessageType(Class<?> messageType) {
Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(messageType);
if (subs != null) {
return subs;
} else {
return this.EMPTY_LIST;
}
return this.subscriptionsPerMessageSingle.get(messageType);
}
// obtain the set of subscriptions for the given message type
// Note: never returns null!
public Collection<Subscription> DEPRECATED_getSubscriptionsByMessageType(Class<?> messageType) {
// thread safe publication
Collection<Subscription> subscriptions;
try {
this.LOCK.readLock().lock();
int count = 0;
Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(messageType);
if (subs != null) {
subscriptions = new ArrayDeque<Subscription>(count);
subscriptions.addAll(subs);
} else {
subscriptions = new ArrayDeque<Subscription>(16);
}
// also add all subscriptions that match super types
ArrayList<Class<?>> types1 = setupSuperClassCache(messageType);
if (types1 != null) {
Class<?> eventSuperType;
int i;
for (i = 0; i < types1.size(); i++) {
eventSuperType = types1.get(i);
subs = this.subscriptionsPerMessageSingle.get(eventSuperType);
if (subs != null) {
for (Subscription sub : subs) {
if (sub.handlesMessageType(messageType)) {
subscriptions.add(sub);
}
}
}
addVarArgClass(subscriptions, eventSuperType);
}
}
addVarArgClass(subscriptions, messageType);
} finally {
this.LOCK.readLock().unlock();
}
return subscriptions;
}
// obtain the set of subscriptions for the given message types
// Note: never returns null!
// must be protected by read lock
// CAN RETURN NULL - not thread safe.
public Collection<Subscription> getSubscriptionsByMessageType(Class<?> messageType1, Class<?> messageType2) {
// thread safe publication
Collection<Subscription> subscriptions = new ArrayDeque<Subscription>();
try {
this.LOCK.readLock().lock();
// also add all subscriptions that match super types
ArrayList<Class<?>> types1 = setupSuperClassCache(messageType1);
ArrayList<Class<?>> types2 = setupSuperClassCache(messageType2);
Collection<Subscription> subs;
Class<?> eventSuperType1 = messageType1;
IdentityObjectTree<Class<?>, Collection<Subscription>> leaf1;
Class<?> eventSuperType2 = messageType1;
IdentityObjectTree<Class<?>, Collection<Subscription>> leaf2;
int i;
int j;
for (i = -1; i < types1.size(); i++) {
if (i > -1) {
eventSuperType1 = types1.get(i);
}
leaf1 = this.subscriptionsPerMessageMulti.getLeaf(eventSuperType1);
if (leaf1 != null) {
for (j = -1; j < types2.size(); j++) {
if (j > -1) {
eventSuperType2 = types2.get(j);
}
leaf2 = leaf1.getLeaf(eventSuperType2);
if (leaf2 != null) {
subs = leaf2.getValue();
if (subs != null) {
for (Subscription sub : subs) {
if (sub.handlesMessageType(messageType1, messageType2)) {
subscriptions.add(sub);
}
}
}
}
}
}
}
///////////////
// if they are ALL the same type, a var-arg handler might match
///////////////
if (messageType1 == messageType2) {
addVarArgClasses(subscriptions, messageType1, types1);
}
} finally {
this.LOCK.readLock().unlock();
}
return subscriptions;
return this.subscriptionsPerMessageMulti.getValue(messageType1, messageType2);
}
// obtain the set of subscriptions for the given message types
// Note: never returns null!
// must be protected by read lock
// CAN RETURN NULL - not thread safe.
public Collection<Subscription> getSubscriptionsByMessageType(Class<?> messageType1, Class<?> messageType2, Class<?> messageType3) {
// thread safe publication
Collection<Subscription> subscriptions = new ArrayDeque<Subscription>();
try {
this.LOCK.readLock().lock();
// also add all subscriptions that match super types
ArrayList<Class<?>> types1 = setupSuperClassCache(messageType1);
ArrayList<Class<?>> types2 = setupSuperClassCache(messageType2);
ArrayList<Class<?>> types3 = setupSuperClassCache(messageType3);
Class<?> eventSuperType1 = messageType1;
IdentityObjectTree<Class<?>, Collection<Subscription>> leaf1;
Class<?> eventSuperType2 = messageType2;
IdentityObjectTree<Class<?>, Collection<Subscription>> leaf2;
Class<?> eventSuperType3 = messageType3;
IdentityObjectTree<Class<?>, Collection<Subscription>> leaf3;
Collection<Subscription> subs;
int i;
int j;
int k;
for (i = -1; i < types1.size(); i++) {
if (i > -1) {
eventSuperType1 = types1.get(i);
}
leaf1 = this.subscriptionsPerMessageMulti.getLeaf(eventSuperType1);
if (leaf1 != null) {
for (j = -1; j < types2.size(); j++) {
if (j > -1) {
eventSuperType2 = types2.get(j);
}
leaf2 = leaf1.getLeaf(eventSuperType2);
if (leaf2 != null) {
for (k = -1; k < types3.size(); k++) {
if (k > -1) {
eventSuperType3 = types3.get(k);
}
leaf3 = leaf2.getLeaf(eventSuperType3);
if (leaf3 != null) {
subs = leaf3.getValue();
if (subs != null) {
for (Subscription sub : subs) {
if (sub.handlesMessageType(messageType1, messageType2, messageType3)) {
subscriptions.add(sub);
}
}
}
}
}
}
}
}
}
///////////////
// if they are ALL the same type, a var-arg handler might match
///////////////
if (messageType1 == messageType2 && messageType2 == messageType3) {
addVarArgClasses(subscriptions, messageType1, types1);
}
} finally {
this.LOCK.readLock().unlock();
}
return subscriptions;
return this.subscriptionsPerMessageMulti.getValue(messageType1, messageType2, messageType3);
}
// obtain the set of subscriptions for the given message types
// Note: never returns null!
// must be protected by read lock
// CAN RETURN NULL - not thread safe.
public Collection<Subscription> getSubscriptionsByMessageType(Class<?>... messageTypes) {
// thread safe publication
Collection<Subscription> subscriptions = new ArrayDeque<Subscription>();
return this.subscriptionsPerMessageMulti.getValue(messageTypes);
}
try {
this.LOCK.readLock().lock();
int count = 16;
// must be protected by read lock
// ALSO checks to see if the superClass accepts subtypes.
public Collection<Subscription> getSuperSubscriptions(Class<?> superType) {
Collection<Class<?>> types = this.superClassesCache.get(superType);
if (types == null || types.isEmpty()) {
return null;
}
// NOTE: Not thread-safe! must be synchronized in outer scope
Collection<Subscription> subs = this.subscriptionsPerMessageMulti.getValue(messageTypes);
Collection<Subscription> subsPerType = new ArrayDeque<Subscription>(DEFAULT_SUPER_CLASS_TREE_SIZE);
for (Class<?> superClass : types) {
Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(superClass);
if (subs != null) {
for (Subscription sub : subs) {
if (sub.handlesMessageType(messageTypes)) {
subscriptions.add(sub);
if (sub.acceptsSubtypes()) {
subsPerType.add(sub);
}
}
}
}
int size = messageTypes.length;
if (size > 0) {
boolean allSameType = true;
Class<?> firstType = messageTypes[0];
return subsPerType;
}
int i;
for (i = 0;i<size;i++) {
if (messageTypes[i] != firstType) {
allSameType = false;
// must be protected by read lock
// ALSO checks to see if the superClass accepts subtypes.
public Collection<Subscription> getSuperSubscriptions(Class<?> superType1, Class<?> superType2) {
// not thread safe. DO NOT MODIFY
Collection<Class<?>> types1 = this.superClassesCache.get(superType1);
Collection<Class<?>> types2 = this.superClassesCache.get(superType2);
Collection<Subscription> subsPerType = new ArrayDeque<Subscription>(DEFAULT_SUPER_CLASS_TREE_SIZE);
Collection<Subscription> subs;
IdentityObjectTree<Class<?>, Collection<Subscription>> leaf1;
IdentityObjectTree<Class<?>, Collection<Subscription>> leaf2;
Iterator<Class<?>> iterator1 = new SuperClassIterator(superType1, types1);
Iterator<Class<?>> iterator2;
Class<?> eventSuperType1;
Class<?> eventSuperType2;
while (iterator1.hasNext()) {
eventSuperType1 = iterator1.next();
boolean type1Matches = eventSuperType1 == superType1;
leaf1 = this.subscriptionsPerMessageMulti.getLeaf(eventSuperType1);
if (leaf1 != null) {
iterator2 = new SuperClassIterator(superType2, types2);
while (iterator2.hasNext()) {
eventSuperType2 = iterator2.next();
if (type1Matches && eventSuperType2 == superType2) {
continue;
}
}
leaf2 = leaf1.getLeaf(eventSuperType2);
// add all subscriptions that match super types combinations
// have to use recursion for this. BLEH
getSubsVarArg(subscriptions, size-1, 0, this.subscriptionsPerMessageMulti, messageTypes);
///////////////
// if they are ALL the same type, a var-arg handler might match
///////////////
if (allSameType) {
// do we have a var-arg (it shows as an array) subscribed?
ArrayList<Class<?>> superClasses = setupSuperClassCache(firstType);
Class<?> eventSuperType = firstType;
int j;
for (j = -1; j < superClasses.size(); j++) {
if (j > -1) {
eventSuperType = superClasses.get(j);
}
if (this.varArgClasses.containsKey(eventSuperType)) {
// messy, but the ONLY way to do it.
// NOTE: this will NEVER be an array to begin with, since that will call a DIFFERENT method
eventSuperType = Array.newInstance(eventSuperType, 1).getClass();
// also add all subscriptions that match super types
subs = this.subscriptionsPerMessageSingle.get(eventSuperType);
if (subs != null) {
for (Subscription sub : subs) {
subscriptions.add(sub);
if (leaf2 != null) {
subs = leaf2.getValue();
if (subs != null) {
for (Subscription sub : subs) {
if (sub.acceptsSubtypes()) {
subsPerType.add(sub);
}
}
}
}
}
}
} finally {
this.LOCK.readLock().unlock();
}
return subscriptions;
return subsPerType;
}
private final Collection<Class<?>> EMPTY_LIST_CLASSES = Collections.emptyList();
// must be protected by read lock
public Collection<Class<?>> getSuperClasses(Class<?> clazz) {
// ALSO checks to see if the superClass accepts subtypes.
public Collection<Subscription> getSuperSubscriptions(Class<?> superType1, Class<?> superType2, Class<?> superType3) {
// not thread safe. DO NOT MODIFY
ArrayList<Class<?>> types = this.superClassesCache.get(clazz);
Collection<Class<?>> types1 = this.superClassesCache.get(superType1);
Collection<Class<?>> types2 = this.superClassesCache.get(superType2);
Collection<Class<?>> types3 = this.superClassesCache.get(superType3);
if (types != null) {
return types;
Collection<Subscription> subsPerType = new ArrayDeque<Subscription>(DEFAULT_SUPER_CLASS_TREE_SIZE);
Collection<Subscription> subs;
IdentityObjectTree<Class<?>, Collection<Subscription>> leaf1;
IdentityObjectTree<Class<?>, Collection<Subscription>> leaf2;
IdentityObjectTree<Class<?>, Collection<Subscription>> leaf3;
Iterator<Class<?>> iterator1 = new SuperClassIterator(superType1, types1);
Iterator<Class<?>> iterator2;
Iterator<Class<?>> iterator3;
Class<?> eventSuperType1;
Class<?> eventSuperType2;
Class<?> eventSuperType3;
while (iterator1.hasNext()) {
eventSuperType1 = iterator1.next();
boolean type1Matches = eventSuperType1 == superType1;
leaf1 = this.subscriptionsPerMessageMulti.getLeaf(eventSuperType1);
if (leaf1 != null) {
iterator2 = new SuperClassIterator(superType2, types2);
while (iterator2.hasNext()) {
eventSuperType2 = iterator2.next();
boolean type12Matches = type1Matches && eventSuperType2 == superType2;
leaf2 = leaf1.getLeaf(eventSuperType2);
if (leaf2 != null) {
iterator3 = new SuperClassIterator(superType3, types3);
while (iterator3.hasNext()) {
eventSuperType3 = iterator3.next();
if (type12Matches && eventSuperType3 == superType3) {
continue;
}
leaf3 = leaf2.getLeaf(eventSuperType3);
subs = leaf3.getValue();
if (subs != null) {
for (Subscription sub : subs) {
if (sub.acceptsSubtypes()) {
subsPerType.add(sub);
}
}
}
}
}
}
}
}
return this.EMPTY_LIST_CLASSES;
return subsPerType;
}
// not a thread safe collection. must be locked by caller
private ArrayList<Class<?>> setupSuperClassCache(Class<?> clazz) {
ArrayList<Class<?>> types = this.superClassesCache.get(clazz);
// not a thread safe collection, but it doesn't matter
private ArrayDeque<Class<?>> setupSuperClassCache(Class<?> clazz) {
ArrayDeque<Class<?>> types = this.superClassesCache.get(clazz);
if (types == null) {
// it doesn't matter if concurrent access stomps on values, since they are always the same.
Set<Class<?>> superTypes = ReflectionUtils.getSuperTypes(clazz);
types = new ArrayList<Class<?>>(superTypes);
// NOTE: no need to write lock, since race conditions will result in duplicate answers
types = new ArrayDeque<Class<?>>(superTypes);
// NOTE: no need to write lock, since race conditions will result in duplicate answers (which we don't care about)
this.superClassesCache.put(clazz, types);
}
@ -619,20 +515,56 @@ public class SubscriptionManager {
public Collection<Subscription> getVarArgs(Class<?> clazz) {
Class<?> varArgClass = this.varArgClasses.get(clazz);
if (varArgClass != null) {
Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(varArgClass);
if (subs != null) {
return subs;
return this.subscriptionsPerMessageSingle.get(varArgClass);
}
return null;
}
// must be protected by read lock
public Collection<Subscription> getVarArgs(Class<?> clazz1, Class<?> clazz2) {
if (clazz1 == clazz2) {
Class<?> varArgClass = this.varArgClasses.get(clazz1);
if (varArgClass != null) {
return this.subscriptionsPerMessageSingle.get(varArgClass);
}
}
return this.EMPTY_LIST;
return null;
}
// must be protected by read lock
public Collection<Subscription> getVarArgs(Class<?> clazz1, Class<?> clazz2, Class<?> clazz3) {
if (clazz1 == clazz2 && clazz2 == clazz3) {
Class<?> varArgClass = this.varArgClasses.get(clazz1);
if (varArgClass != null) {
return this.subscriptionsPerMessageSingle.get(varArgClass);
}
}
return null;
}
// must be protected by read lock
public Collection<Subscription> getVarArgs(Class<?>... classes) {
// classes IS ALREADY ALL SAME TYPE!
Class<?> firstClass = classes[0];
Class<?> varArgClass = this.varArgClasses.get(firstClass);
if (varArgClass != null) {
return this.subscriptionsPerMessageSingle.get(varArgClass);
}
return null;
}
///////////////
// a var-arg handler might match
// tricky part. We have to check the ARRAY version
///////////////
private void addVarArgClasses(Collection<Subscription> subscriptions, Class<?> messageType, ArrayList<Class<?>> types1) {
private void addVarArgClasses(Collection<Subscription> subscriptions, Class<?> messageType, ArrayDeque<Class<?>> types1) {
Collection<Subscription> subs;
Class<?> varArgClass = this.varArgClasses.get(messageType);
@ -665,7 +597,7 @@ public class SubscriptionManager {
Class<?> classType = messageTypes[index];
// get all the super types, if there are any.
ArrayList<Class<?>> superClasses = setupSuperClassCache(classType);
ArrayDeque<Class<?>> superClasses = setupSuperClassCache(classType);
IdentityObjectTree<Class<?>, Collection<Subscription>> leaf;
Collection<Subscription> subs;
@ -674,27 +606,27 @@ public class SubscriptionManager {
int i;
int newIndex;
for (i = -1; i < superClasses.size(); i++) {
if (i > -1) {
superClass = superClasses.get(i);
}
leaf = tree.getLeaf(superClass);
if (leaf != null) {
newIndex = index+1;
if (index == length) {
subs = leaf.getValue();
if (subs != null) {
for (Subscription sub : subs) {
if (sub.handlesMessageType(messageTypes)) {
subscriptions.add(sub);
}
}
}
} else {
getSubsVarArg(subscriptions, length, newIndex, leaf, messageTypes);
}
}
}
// for (i = -1; i < superClasses.size(); i++) {
// if (i > -1) {
// superClass = superClasses.get(i);
// }
// leaf = tree.getLeaf(superClass);
// if (leaf != null) {
// newIndex = index+1;
// if (index == length) {
// subs = leaf.getValue();
// if (subs != null) {
// for (Subscription sub : subs) {
// if (sub.handlesMessageType(messageTypes)) {
// subscriptions.add(sub);
// }
// }
// }
// } else {
// getSubsVarArg(subscriptions, length, newIndex, leaf, messageTypes);
// }
// }
// }
}
public void readLock() {
@ -704,4 +636,52 @@ public class SubscriptionManager {
public void readUnLock() {
this.LOCK.readLock().unlock();
}
public static class SuperClassIterator implements Iterator<Class<?>> {
private final Iterator<Class<?>> iterator;
private Class<?> clazz;
public SuperClassIterator(Class<?> clazz, Collection<Class<?>> types) {
this.clazz = clazz;
if (types != null) {
this.iterator = types.iterator();
} else {
this.iterator = null;
}
}
@Override
public boolean hasNext() {
if (this.clazz != null) {
return true;
}
if (this.iterator != null) {
return this.iterator.hasNext();
}
return false;
}
@Override
public Class<?> next() {
if (this.clazz != null) {
Class<?> clazz2 = this.clazz;
this.clazz = null;
return clazz2;
}
if (this.iterator != null) {
return this.iterator.next();
}
return null;
}
@Override
public void remove() {
throw new NotImplementedException();
}
}
}

View File

@ -1,17 +1,23 @@
package net.engio.mbassy.multi;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
import net.engio.mbassy.multi.common.AssertSupport;
import net.engio.mbassy.multi.common.ConcurrentExecutor;
import net.engio.mbassy.multi.common.IConcurrentSet;
import org.junit.Before;
import org.junit.Test;
import java.util.*;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
/**
* This test ensures the correct behaviour of the set implementation that is the building
* block of the subscription implementations used by the Mbassador message bus.
@ -31,13 +37,14 @@ public abstract class ConcurrentSetTest extends AssertSupport {
protected Set gcProtector = new HashSet();
@Override
@Before
public void beforeTest(){
super.beforeTest();
gcProtector = new HashSet();
this.gcProtector = new HashSet();
}
protected abstract IConcurrentSet createSet();
protected abstract Collection createSet();
@Test
@ -45,12 +52,12 @@ public abstract class ConcurrentSetTest extends AssertSupport {
final LinkedList<Object> duplicates = new LinkedList<Object>();
final HashSet<Object> distinct = new HashSet<Object>();
final IConcurrentSet testSet = createSet();
final Collection<Object> testSet = createSet();
Random rand = new Random();
// getAll set of distinct objects and list of duplicates
Object candidate = new Object();
for (int i = 0; i < numberOfElements; i++) {
for (int i = 0; i < this.numberOfElements; i++) {
if (rand.nextInt() % 3 == 0) {
candidate = new Object();
}
@ -66,7 +73,7 @@ public abstract class ConcurrentSetTest extends AssertSupport {
testSet.add(src);
}
}
}, numberOfThreads);
}, this.numberOfThreads);
// check that the control set and the test set contain the exact same elements
assertEquals(distinct.size(), testSet.size());
@ -77,21 +84,22 @@ public abstract class ConcurrentSetTest extends AssertSupport {
@Test()
public void testIterationWithConcurrentRemoval() {
final IConcurrentSet<AtomicInteger> testSet = createSet();
final Collection<AtomicInteger> testSet = createSet();
final Random rand = new Random();
for (int i = 0; i < numberOfElements; i++) {
for (int i = 0; i < this.numberOfElements; i++) {
AtomicInteger element = new AtomicInteger();
testSet.add(element);
gcProtector.add(element);
this.gcProtector.add(element);
}
Runnable incrementer = new Runnable() {
@Override
public void run() {
while(testSet.size() > 100){
for(AtomicInteger element : testSet)
for(AtomicInteger element : testSet) {
element.incrementAndGet();
}
}
}
@ -101,9 +109,11 @@ public abstract class ConcurrentSetTest extends AssertSupport {
@Override
public void run() {
while(testSet.size() > 100){
for(AtomicInteger element : testSet)
if(rand.nextInt() % 3 == 0 && testSet.size() > 100)
for(AtomicInteger element : testSet) {
if(rand.nextInt() % 3 == 0 && testSet.size() > 100) {
testSet.remove(element);
}
}
}
}
};
@ -128,9 +138,9 @@ public abstract class ConcurrentSetTest extends AssertSupport {
final HashSet<Object> source = new HashSet<Object>();
final HashSet<Object> toRemove = new HashSet<Object>();
final IConcurrentSet testSet = createSet();
final Collection testSet = createSet();
// getAll set of distinct objects and mark a subset of those for removal
for (int i = 0; i < numberOfElements; i++) {
for (int i = 0; i < this.numberOfElements; i++) {
Object candidate = new Object();
source.add(candidate);
if (i % 3 == 0) {
@ -146,7 +156,7 @@ public abstract class ConcurrentSetTest extends AssertSupport {
testSet.add(src);
}
}
}, numberOfThreads);
}, this.numberOfThreads);
// remove all candidates that have previously been marked for removal from the test set
ConcurrentExecutor.runConcurrent(new Runnable() {
@ -156,7 +166,7 @@ public abstract class ConcurrentSetTest extends AssertSupport {
testSet.remove(src);
}
}
}, numberOfThreads);
}, this.numberOfThreads);
// ensure that the test set does not contain any of the elements that have been removed from it
for (Object tar : testSet) {
@ -166,7 +176,9 @@ public abstract class ConcurrentSetTest extends AssertSupport {
// for removal
assertEquals(source.size() - toRemove.size(), testSet.size());
for (Object src : source) {
if (!toRemove.contains(src)) assertTrue(testSet.contains(src));
if (!toRemove.contains(src)) {
assertTrue(testSet.contains(src));
}
}
}
@ -175,9 +187,9 @@ public abstract class ConcurrentSetTest extends AssertSupport {
final HashSet<Object> source = new HashSet<Object>();
final HashSet<Object> toRemove = new HashSet<Object>();
final IConcurrentSet testSet = createSet();
final Collection testSet = createSet();
// getAll set of candidates and mark subset for removal
for (int i = 0; i < numberOfElements; i++) {
for (int i = 0; i < this.numberOfElements; i++) {
Object candidate = new Object();
source.add(candidate);
if (i % 3 == 0) {
@ -192,11 +204,12 @@ public abstract class ConcurrentSetTest extends AssertSupport {
public void run() {
for (Object src : source) {
testSet.add(src);
if (toRemove.contains(src))
if (toRemove.contains(src)) {
testSet.remove(src);
}
}
}
}, numberOfThreads);
}, this.numberOfThreads);
// ensure that the test set does not contain any of the elements that have been removed from it
for (Object tar : testSet) {
@ -206,17 +219,19 @@ public abstract class ConcurrentSetTest extends AssertSupport {
// for removal
assertEquals(source.size() - toRemove.size(), testSet.size());
for (Object src : source) {
if (!toRemove.contains(src)) assertTrue(testSet.contains(src));
if (!toRemove.contains(src)) {
assertTrue(testSet.contains(src));
}
}
}
@Test
public void testCompleteRemoval() {
final HashSet<Object> source = new HashSet<Object>();
final IConcurrentSet testSet = createSet();
final Collection testSet = createSet();
// getAll set of candidates and mark subset for removal
for (int i = 0; i < numberOfElements; i++) {
for (int i = 0; i < this.numberOfElements; i++) {
Object candidate = new Object();
source.add(candidate);
testSet.add(candidate);
@ -231,7 +246,7 @@ public abstract class ConcurrentSetTest extends AssertSupport {
testSet.remove(src);
}
}
}, numberOfThreads);
}, this.numberOfThreads);
// ensure that the test set still contains all objects from the source set that have not been marked
@ -245,10 +260,10 @@ public abstract class ConcurrentSetTest extends AssertSupport {
@Test
public void testRemovalViaIterator() {
final HashSet<Object> source = new HashSet<Object>();
final IConcurrentSet setUnderTest = createSet();
final Collection setUnderTest = createSet();
// getAll set of candidates and mark subset for removal
for (int i = 0; i < numberOfElements; i++) {
for (int i = 0; i < this.numberOfElements; i++) {
Object candidate = new Object();
source.add(candidate);
setUnderTest.add(candidate);
@ -264,7 +279,7 @@ public abstract class ConcurrentSetTest extends AssertSupport {
iterator.remove();
}
}
}, numberOfThreads);
}, this.numberOfThreads);
// ensure that the test set still contains all objects from the source set that have not been marked
@ -288,7 +303,7 @@ public abstract class ConcurrentSetTest extends AssertSupport {
*/
@Test
public void testConcurrentAddRemove() {
final IConcurrentSet testSet = createSet();
final Collection testSet = createSet();
// a set of unique integers that will stay permanently in the test set
final List permanentObjects = new ArrayList();
// a set of objects that will be added and removed at random to the test set to force rehashing
@ -306,6 +321,7 @@ public abstract class ConcurrentSetTest extends AssertSupport {
// Adds and removes items
// thus forcing constant rehashing of the backing hashtable
Runnable rehasher = new Runnable() {
@Override
public void run() {
Random rand = new Random();
for(int times = 0; times < 1000 ; times++){
@ -330,8 +346,9 @@ public abstract class ConcurrentSetTest extends AssertSupport {
for (Object permanent : permanentObjects) {
// permanent items are never touched,
// --> set.contains(j) should always return true
if(!testSet.contains(permanent))
if(!testSet.contains(permanent)) {
missing.add(permanent);
}
}
}
}
@ -344,19 +361,23 @@ public abstract class ConcurrentSetTest extends AssertSupport {
public Set createWithRandomIntegers(int size, List<Integer> excluding){
if(excluding == null) excluding = new ArrayList<Integer>();
if(excluding == null) {
excluding = new ArrayList<Integer>();
}
Set<Integer> result = new HashSet<Integer>(size);
Random rand = new Random();
while(result.size() < size){
result.add(rand.nextInt());
}
for(Integer excluded : excluding)
for(Integer excluded : excluding) {
result.remove(excluded);
}
return result;
}
protected void protectFromGarbageCollector(Set elements){
for(Object element : elements)
gcProtector.add(element);
for(Object element : elements) {
this.gcProtector.add(element);
}
}
}

View File

@ -138,7 +138,7 @@ public class MetadataReaderTest extends AssertSupport {
public List<MessageHandler> getHandlers(MessageListener listener, Class<?>... messageTypes) {
List<MessageHandler> matching = new LinkedList<MessageHandler>();
for (MessageHandler handler : listener.getHandlers()) {
if (handler.handlesMessage(messageTypes)) {
if (handlesMessage(handler, messageTypes)) {
matching.add(handler);
}
}
@ -244,4 +244,201 @@ public class MetadataReaderTest extends AssertSupport {
@Handler public void handleStringXplus1plusN(String[] s, Object o, Object... o1) {}
}
/**
* @return true if the message types are handled
*/
private static boolean handlesMessage(MessageHandler handler, Class<?> messageType) {
Class<?>[] handledMessages = handler.getHandledMessages();
int handledLength = handledMessages.length;
if (handledLength != 1) {
return false;
}
if (handler.acceptsSubtypes()) {
if (!handledMessages[0].isAssignableFrom(messageType)) {
return false;
}
} else {
if (handledMessages[0] != messageType) {
return false;
}
}
return true;
}
/**
* @return true if the message types are handled
*/
private static boolean handlesMessage(MessageHandler handler, Class<?> messageType1, Class<?> messageType2) {
Class<?>[] handledMessages = handler.getHandledMessages();
int handledLength = handledMessages.length;
if (handledLength != 2) {
return false;
}
if (handler.acceptsSubtypes()) {
if (!handledMessages[0].isAssignableFrom(messageType1)) {
return false;
}
if (!handledMessages[1].isAssignableFrom(messageType2)) {
return false;
}
} else {
if (handledMessages[0] != messageType1) {
return false;
}
if (handledMessages[1] != messageType2) {
return false;
}
}
return true;
}
/**
* @return true if the message types are handled
*/
private static boolean handlesMessage(MessageHandler handler, Class<?> messageType1, Class<?> messageType2, Class<?> messageType3) {
Class<?>[] handledMessages = handler.getHandledMessages();
int handledLength = handledMessages.length;
if (handledLength != 3) {
return false;
}
if (handler.acceptsSubtypes()) {
if (!handledMessages[0].isAssignableFrom(messageType1)) {
return false;
}
if (!handledMessages[1].isAssignableFrom(messageType2)) {
return false;
}
if (!handledMessages[2].isAssignableFrom(messageType3)) {
return false;
}
} else {
if (handledMessages[0] != messageType1) {
return false;
}
if (handledMessages[1] != messageType2) {
return false;
}
if (handledMessages[2] != messageType3) {
return false;
}
}
return true;
}
/**
* @return true if the message types are handled
*/
private static boolean handlesMessage(MessageHandler handler, Class<?>... messageTypes) {
Class<?>[] handledMessages = handler.getHandledMessages();
int handledLength = handledMessages.length;
int handledLengthMinusVarArg = handledLength-1;
int messagesLength = messageTypes.length;
// do we even have enough to even CHECK the var-arg?
if (messagesLength < handledLengthMinusVarArg) {
// totally wrong number of args
return false;
}
// check BEFORE var-arg in handler (var-arg can ONLY be last element in array)
if (handledLengthMinusVarArg <= messagesLength) {
if (handler.acceptsSubtypes()) {
for (int i = 0; i < handledLengthMinusVarArg; i++) {
Class<?> handledMessage = handledMessages[i];
Class<?> messageType = messageTypes[i];
if (!handledMessage.isAssignableFrom(messageType)) {
return false;
}
}
} else {
for (int i = 0; i < handledLengthMinusVarArg; i++) {
Class<?> handledMessage = handledMessages[i];
Class<?> messageType = messageTypes[i];
if (handledMessage != messageType) {
return false;
}
}
}
}
// do we even HAVE var-arg?
if (!handledMessages[handledLengthMinusVarArg].isArray()) {
// DO NOT HAVE VAR_ARG PRESENT IN HANDLERS
// fast exit
if (handledLength != messagesLength) {
return false;
}
// compare remaining arg
Class<?> handledMessage = handledMessages[handledLengthMinusVarArg];
Class<?> messageType = messageTypes[handledLengthMinusVarArg];
if (handler.acceptsSubtypes()) {
if (!handledMessage.isAssignableFrom(messageType)) {
return false;
}
} else {
if (handledMessage != messageType) {
return false;
}
}
// all args are dandy
return true;
}
// WE HAVE VAR_ARG PRESENT IN HANDLER
// do we have enough args to NEED to check the var-arg?
if (handledLengthMinusVarArg == messagesLength) {
// var-arg doesn't need checking
return true;
}
// then check var-arg in handler
// all the args to check for the var-arg MUST be the same! (ONLY ONE ARRAY THOUGH CAN BE PRESENT)
int messagesLengthMinusVarArg = messagesLength-1;
Class<?> typeCheck = messageTypes[handledLengthMinusVarArg];
for (int i = handledLengthMinusVarArg; i < messagesLength; i++) {
Class<?> t1 = messageTypes[i];
if (t1 != typeCheck) {
return false;
}
}
// if we got this far, then the args are the same type. IF we have more than one, AND they are arrays, NOPE!
if (messagesLength - handledLengthMinusVarArg > 1 && messageTypes[messagesLengthMinusVarArg].isArray()) {
return false;
}
// are we comparing array -> array or string -> array
Class<?> componentType;
if (messageTypes[messagesLengthMinusVarArg].isArray()) {
componentType = handledMessages[handledLengthMinusVarArg];
} else {
componentType = handledMessages[handledLengthMinusVarArg].getComponentType();
}
if (handler.acceptsSubtypes()) {
return componentType.isAssignableFrom(typeCheck);
} else {
return typeCheck == componentType;
}
}
}

View File

@ -1,15 +1,13 @@
package net.engio.mbassy.multi;
import net.engio.mbassy.multi.common.ConcurrentExecutor;
import net.engio.mbassy.multi.common.IConcurrentSet;
import net.engio.mbassy.multi.common.WeakConcurrentSet;
import org.junit.Before;
import org.junit.Test;
import java.util.Collection;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import net.engio.mbassy.multi.common.ConcurrentExecutor;
import net.engio.mbassy.multi.common.WeakConcurrentSet;
import org.junit.Test;
/**
*
@ -24,7 +22,7 @@ public class WeakConcurrentSetTest extends ConcurrentSetTest{
@Override
protected IConcurrentSet createSet() {
protected Collection createSet() {
return new WeakConcurrentSet();
}
@ -33,10 +31,10 @@ public class WeakConcurrentSetTest extends ConcurrentSetTest{
// Assemble
final HashSet<Object> permanentElements = new HashSet<Object>();
final IConcurrentSet testSetWeak = createSet();
final Collection testSetWeak = createSet();
final Random rand = new Random();
for (int i = 0; i < numberOfElements; i++) {
for (int i = 0; i < this.numberOfElements; i++) {
Object candidate = new Object();
if (rand.nextInt() % 3 == 0) {
@ -58,13 +56,13 @@ public class WeakConcurrentSetTest extends ConcurrentSetTest{
System.currentTimeMillis();
}
}
}, numberOfThreads);
}, this.numberOfThreads);
// the set should have cleaned up the garbage collected elements
// it must still contain all of the permanent objects
// since different GC mechanisms can be used (not necessarily full, stop-the-world) not all dead objects
// must have been collected
assertTrue(permanentElements.size() <= testSetWeak.size() && testSetWeak.size() < numberOfElements);
assertTrue(permanentElements.size() <= testSetWeak.size() && testSetWeak.size() < this.numberOfElements);
for (Object test : testSetWeak) {
assertTrue(permanentElements.contains(test));
}