NOT MPMC, LTQ - fastest so far via using non-copy interation for subscriptions
This commit is contained in:
parent
67c68dbc14
commit
40d8bb70bd
|
@ -19,14 +19,16 @@ public class DispatchRunnable implements Runnable {
|
|||
|
||||
private ErrorHandlingSupport errorHandler;
|
||||
private TransferQueue<Object> dispatchQueue;
|
||||
private TransferQueue<SubRunnable> invokeQueue;
|
||||
private SubscriptionManager manager;
|
||||
|
||||
public DispatchRunnable(ErrorHandlingSupport errorHandler, SubscriptionManager subscriptionManager,
|
||||
TransferQueue<Object> dispatchQueue) {
|
||||
TransferQueue<Object> dispatchQueue, TransferQueue<SubRunnable> invokeQueue) {
|
||||
|
||||
this.errorHandler = errorHandler;
|
||||
this.manager = subscriptionManager;
|
||||
this.dispatchQueue = dispatchQueue;
|
||||
this.invokeQueue = invokeQueue;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -34,18 +36,21 @@ public class DispatchRunnable implements Runnable {
|
|||
final SubscriptionManager manager = this.manager;
|
||||
final ErrorHandlingSupport errorHandler = this.errorHandler;
|
||||
final TransferQueue<Object> IN_queue = this.dispatchQueue;
|
||||
final TransferQueue<SubRunnable> OUT_queue = this.invokeQueue;
|
||||
|
||||
final Runnable dummyRunnable = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
}
|
||||
};
|
||||
|
||||
Object message = null;
|
||||
int counter;
|
||||
|
||||
while (true) {
|
||||
try {
|
||||
counter = MultiMBassador.WORK_RUN_BLITZ;
|
||||
counter = 200;
|
||||
while ((message = IN_queue.poll()) == null) {
|
||||
// if (counter > 100) {
|
||||
// --counter;
|
||||
// Thread.yield();
|
||||
// } else
|
||||
if (counter > 0) {
|
||||
--counter;
|
||||
LockSupport.parkNanos(1L);
|
||||
|
@ -55,43 +60,76 @@ public class DispatchRunnable implements Runnable {
|
|||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("null")
|
||||
Class<?> messageClass = message.getClass();
|
||||
|
||||
manager.readLock();
|
||||
|
||||
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass);
|
||||
|
||||
boolean empty = subscriptions.isEmpty();
|
||||
if (empty) {
|
||||
// Dead Event
|
||||
subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
|
||||
|
||||
DeadMessage deadMessage = new DeadMessage(message);
|
||||
message = deadMessage;
|
||||
empty = subscriptions.isEmpty();
|
||||
Collection<Subscription> deadSubscriptions = null;
|
||||
if (empty) {
|
||||
// Dead Event. must EXACTLY MATCH (no subclasses or varargs)
|
||||
deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
|
||||
}
|
||||
Collection<Class<?>> superClasses = manager.getSuperClasses(messageClass);
|
||||
Collection<Subscription> varArgs = manager.getVarArgs(messageClass);
|
||||
|
||||
manager.readUnLock();
|
||||
|
||||
|
||||
if (!empty) {
|
||||
Object[] vararg = null;
|
||||
for (Subscription sub : subscriptions) {
|
||||
boolean handled = false;
|
||||
if (sub.isVarArg()) {
|
||||
// messageClass will NEVER be an array to begin with, since that will call the multi-arg method
|
||||
if (vararg == null) {
|
||||
sub.publishToSubscriptionSingle(OUT_queue, errorHandler, message);
|
||||
}
|
||||
|
||||
// OUT_queue.put(new InvokeRunnable(errorHandler, subscriptions, message));
|
||||
} else if (deadSubscriptions != null) {
|
||||
if (!deadSubscriptions.isEmpty()) {
|
||||
DeadMessage deadMessage = new DeadMessage(message);
|
||||
|
||||
for (Subscription sub : deadSubscriptions) {
|
||||
sub.publishToSubscriptionSingle(OUT_queue, errorHandler, deadMessage);
|
||||
}
|
||||
|
||||
// OUT_queue.put(new InvokeRunnable(errorHandler, deadSubscriptions, deadMessage));
|
||||
}
|
||||
}
|
||||
|
||||
// now get superClasses
|
||||
for (Class<?> superClass : superClasses) {
|
||||
subscriptions = manager.getSubscriptionsByMessageType(superClass);
|
||||
|
||||
if (!subscriptions.isEmpty()) {
|
||||
for (Subscription sub : subscriptions) {
|
||||
sub.publishToSubscriptionSingle(OUT_queue, errorHandler, message);
|
||||
}
|
||||
|
||||
// OUT_queue.put(new InvokeRunnable(errorHandler, subscriptions, message));
|
||||
}
|
||||
}
|
||||
|
||||
// now get varargs
|
||||
if (!varArgs.isEmpty()) {
|
||||
// messy, but the ONLY way to do it.
|
||||
vararg = (Object[]) Array.newInstance(message.getClass(), 1);
|
||||
Object[] vararg = (Object[]) Array.newInstance(message.getClass(), 1);
|
||||
vararg[0] = message;
|
||||
|
||||
Object[] newInstance = new Object[1];
|
||||
newInstance[0] = vararg;
|
||||
vararg = newInstance;
|
||||
}
|
||||
handled = true;
|
||||
sub.publishToSubscription(errorHandler, vararg);
|
||||
|
||||
for (Subscription sub : varArgs) {
|
||||
sub.publishToSubscription(OUT_queue, errorHandler, vararg);
|
||||
}
|
||||
|
||||
if (!handled) {
|
||||
sub.publishToSubscription(errorHandler, message);
|
||||
}
|
||||
}
|
||||
// OUT_queue.put(new InvokeRunnable(errorHandler, varArgs, vararg));
|
||||
}
|
||||
|
||||
// make sure it's synced at this point
|
||||
// OUT_queue.transfer(dummyRunnable);
|
||||
|
||||
} catch (InterruptedException e) {
|
||||
return;
|
||||
} catch (Throwable e) {
|
||||
|
|
|
@ -1,14 +1,19 @@
|
|||
package net.engio.mbassy.multi;
|
||||
|
||||
import java.lang.reflect.Array;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.LockSupport;
|
||||
|
||||
import net.engio.mbassy.multi.common.DeadMessage;
|
||||
import net.engio.mbassy.multi.common.DisruptorThreadFactory;
|
||||
import net.engio.mbassy.multi.common.LinkedTransferQueue;
|
||||
import net.engio.mbassy.multi.common.TransferQueue;
|
||||
import net.engio.mbassy.multi.error.IPublicationErrorHandler;
|
||||
import net.engio.mbassy.multi.error.PublicationError;
|
||||
import net.engio.mbassy.multi.subscription.Subscription;
|
||||
import net.engio.mbassy.multi.subscription.SubscriptionManager;
|
||||
|
||||
/**
|
||||
|
@ -26,7 +31,7 @@ public class MultiMBassador implements IMessageBus {
|
|||
|
||||
private final SubscriptionManager subscriptionManager;
|
||||
|
||||
private final TransferQueue<Object> dispatchQueue;
|
||||
private final TransferQueue<Runnable> dispatchQueue;
|
||||
|
||||
|
||||
// all threads that are available for asynchronous message dispatching
|
||||
|
@ -37,9 +42,6 @@ public class MultiMBassador implements IMessageBus {
|
|||
}
|
||||
|
||||
|
||||
public static final int WORK_RUN_BLITZ = 50;
|
||||
public static final int WORK_RUN_BLITZ_DIV2 = WORK_RUN_BLITZ/2;
|
||||
|
||||
public MultiMBassador(int numberOfThreads) {
|
||||
if (numberOfThreads < 1) {
|
||||
numberOfThreads = 1; // at LEAST 1 threads
|
||||
|
@ -47,17 +49,43 @@ public class MultiMBassador implements IMessageBus {
|
|||
|
||||
|
||||
this.subscriptionManager = new SubscriptionManager();
|
||||
this.dispatchQueue = new LinkedTransferQueue<>();
|
||||
this.dispatchQueue = new LinkedTransferQueue<Runnable>();
|
||||
|
||||
|
||||
int dispatchSize = 8;
|
||||
int dispatchSize = numberOfThreads;
|
||||
this.threads = new ArrayList<Thread>();
|
||||
|
||||
|
||||
DisruptorThreadFactory dispatchThreadFactory = new DisruptorThreadFactory("MB_Dispatch");
|
||||
for (int i = 0; i < dispatchSize; i++) {
|
||||
// each thread will run forever and process incoming message publication requests
|
||||
Runnable runnable = new DispatchRunnable(this, this.subscriptionManager, this.dispatchQueue);
|
||||
Runnable runnable = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
TransferQueue<Runnable> IN_QUEUE= MultiMBassador.this.dispatchQueue;
|
||||
Runnable event = null;
|
||||
int counter;
|
||||
|
||||
while (true) {
|
||||
try {
|
||||
counter = 200;
|
||||
while ((event = IN_QUEUE.poll()) == null) {
|
||||
if (counter > 0) {
|
||||
--counter;
|
||||
LockSupport.parkNanos(1L);
|
||||
} else {
|
||||
event = IN_QUEUE.take();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
event.run();
|
||||
} catch (InterruptedException e) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Thread runner = dispatchThreadFactory.newThread(runnable);
|
||||
this.threads.add(runner);
|
||||
|
@ -92,7 +120,6 @@ public class MultiMBassador implements IMessageBus {
|
|||
|
||||
@Override
|
||||
public boolean hasPendingMessages() {
|
||||
// return this.dispatch_RingBuffer.remainingCapacity() < this.dispatch_RingBufferSize;
|
||||
return !this.dispatchQueue.isEmpty();
|
||||
}
|
||||
|
||||
|
@ -101,136 +128,144 @@ public class MultiMBassador implements IMessageBus {
|
|||
for (Thread t : this.threads) {
|
||||
t.interrupt();
|
||||
}
|
||||
|
||||
// System.err.println(this.counter);
|
||||
|
||||
// for (InterruptRunnable runnable : this.invokeRunners) {
|
||||
// runnable.stop();
|
||||
// }
|
||||
|
||||
// this.dispatch_Disruptor.shutdown();
|
||||
// this.dispatch_Executor.shutdown();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void publish(Object message) {
|
||||
// Class<?> messageClass = message.getClass();
|
||||
//
|
||||
// SubscriptionManager manager = this.subscriptionManager;
|
||||
// Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass);
|
||||
//
|
||||
// try {
|
||||
Class<?> messageClass = message.getClass();
|
||||
|
||||
SubscriptionManager manager = this.subscriptionManager;
|
||||
// Collection<Subscription> subscriptions = subscriptionManager.getSubscriptionsByMessageType(messageClass);
|
||||
|
||||
manager.readLock();
|
||||
|
||||
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass);
|
||||
boolean empty = subscriptions.isEmpty();
|
||||
|
||||
Collection<Subscription> deadSubscriptions = null;
|
||||
if (empty) {
|
||||
// Dead Event. must EXACTLY MATCH (no subclasses or varargs)
|
||||
deadSubscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
|
||||
}
|
||||
Collection<Class<?>> superClasses = manager.getSuperClasses(messageClass);
|
||||
Collection<Subscription> varArgs = manager.getVarArgs(messageClass);
|
||||
|
||||
manager.readUnLock();
|
||||
|
||||
if (!empty) {
|
||||
for (Subscription sub : subscriptions) {
|
||||
// this catches all exception types
|
||||
sub.publishToSubscription(this, message);
|
||||
}
|
||||
} else if (deadSubscriptions != null && !deadSubscriptions.isEmpty()) {
|
||||
DeadMessage deadMessage = new DeadMessage(message);
|
||||
|
||||
for (Subscription sub : deadSubscriptions) {
|
||||
// this catches all exception types
|
||||
sub.publishToSubscription(this, deadMessage);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
// if (subscriptions.isEmpty()) {
|
||||
// // Dead Event
|
||||
// subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
|
||||
// // Dead Event. only matches EXACT handlers (no vararg, no subclasses)
|
||||
// subscriptions = this.subscriptionManager.getSubscriptionsByMessageType(DeadMessage.class);
|
||||
//
|
||||
// DeadMessage deadMessage = new DeadMessage(message);
|
||||
//
|
||||
// if (!subscriptions.isEmpty()) {
|
||||
// for (Subscription sub : subscriptions) {
|
||||
// // this catches all exception types
|
||||
// sub.publishToSubscription(this, deadMessage);
|
||||
// }
|
||||
// } else {
|
||||
// Object[] vararg = null;
|
||||
//
|
||||
// }
|
||||
// }
|
||||
// else {
|
||||
//// Object[] vararg = null;
|
||||
// for (Subscription sub : subscriptions) {
|
||||
// boolean handled = false;
|
||||
// if (sub.isVarArg()) {
|
||||
// // messageClass will NEVER be an array to begin with, since that will call the multi-arg method
|
||||
// if (vararg == null) {
|
||||
// // messy, but the ONLY way to do it.
|
||||
// vararg = (Object[]) Array.newInstance(messageClass, 1);
|
||||
// vararg[0] = message;
|
||||
//
|
||||
// Object[] newInstance = new Object[1];
|
||||
// newInstance[0] = vararg;
|
||||
// vararg = newInstance;
|
||||
// }
|
||||
//
|
||||
// handled = true;
|
||||
// sub.publishToSubscription(this, vararg);
|
||||
// }
|
||||
//
|
||||
// if (!handled) {
|
||||
// // this catches all exception types
|
||||
// sub.publishToSubscription(this, message);
|
||||
//
|
||||
//// if (sub.isVarArg()) {
|
||||
//// // messageClass will NEVER be an array to begin with, since that will call the multi-arg method
|
||||
//// if (vararg == null) {
|
||||
//// // messy, but the ONLY way to do it.
|
||||
//// vararg = (Object[]) Array.newInstance(message.getClass(), 1);
|
||||
//// vararg[0] = message;
|
||||
////
|
||||
//// Object[] newInstance = new Object[1];
|
||||
//// newInstance[0] = vararg;
|
||||
//// vararg = newInstance;
|
||||
//// }
|
||||
////
|
||||
//// // this catches all exception types
|
||||
//// sub.publishToSubscription(this, vararg);
|
||||
//// continue;
|
||||
//// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// } catch (Throwable e) {
|
||||
// handlePublicationError(new PublicationError()
|
||||
// .setMessage("Error during publication of message")
|
||||
// .setCause(e)
|
||||
// .setPublishedObject(message));
|
||||
// }
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void publish(Object message1, Object message2) {
|
||||
// try {
|
||||
// Class<?> messageClass1 = message1.getClass();
|
||||
// Class<?> messageClass2 = message2.getClass();
|
||||
//
|
||||
// SubscriptionManager manager = this.subscriptionManager;
|
||||
// Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2);
|
||||
//
|
||||
// if (subscriptions == null || subscriptions.isEmpty()) {
|
||||
// // Dead Event
|
||||
// subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
|
||||
//
|
||||
// DeadMessage deadMessage = new DeadMessage(message1, message2);
|
||||
//
|
||||
// for (Subscription sub : subscriptions) {
|
||||
// sub.publishToSubscription(this, deadMessage);
|
||||
// }
|
||||
// } else {
|
||||
// Object[] vararg = null;
|
||||
//
|
||||
// for (Subscription sub : subscriptions) {
|
||||
// boolean handled = false;
|
||||
// if (sub.isVarArg()) {
|
||||
// Class<?> class1 = message1.getClass();
|
||||
// Class<?> class2 = message2.getClass();
|
||||
// if (!class1.isArray() && class1 == class2) {
|
||||
// if (vararg == null) {
|
||||
// // messy, but the ONLY way to do it.
|
||||
// vararg = (Object[]) Array.newInstance(class1, 2);
|
||||
// vararg[0] = message1;
|
||||
// vararg[1] = message2;
|
||||
//
|
||||
// Object[] newInstance = (Object[]) Array.newInstance(vararg.getClass(), 1);
|
||||
// newInstance[0] = vararg;
|
||||
// vararg = newInstance;
|
||||
// }
|
||||
//
|
||||
// handled = true;
|
||||
// sub.publishToSubscription(this, vararg);
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// if (!handled) {
|
||||
// sub.publishToSubscription(this, message1, message2);
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// // if the message did not have any listener/handler accept it
|
||||
// if (subscriptions.isEmpty()) {
|
||||
// // cannot have DeadMessage published to this, so no extra check necessary
|
||||
// // Dead Event
|
||||
// subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
|
||||
// DeadMessage deadMessage = new DeadMessage(message1, message2);
|
||||
//
|
||||
// for (Subscription sub : subscriptions) {
|
||||
// sub.publishToSubscription(this, deadMessage);
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// } catch (Throwable e) {
|
||||
// handlePublicationError(new PublicationError()
|
||||
// .setMessage("Error during publication of message")
|
||||
// .setCause(e)
|
||||
// .setPublishedObject(message1, message2));
|
||||
// }
|
||||
try {
|
||||
Class<?> messageClass1 = message1.getClass();
|
||||
Class<?> messageClass2 = message2.getClass();
|
||||
|
||||
SubscriptionManager manager = this.subscriptionManager;
|
||||
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass1, messageClass2);
|
||||
|
||||
if (subscriptions == null || subscriptions.isEmpty()) {
|
||||
subscriptions = manager.getSubscriptionsByMessageType(DeadMessage.class);
|
||||
|
||||
DeadMessage deadMessage = new DeadMessage(message1, message2);
|
||||
|
||||
for (Subscription sub : subscriptions) {
|
||||
sub.publishToSubscription(this, deadMessage);
|
||||
}
|
||||
}
|
||||
else {
|
||||
Object[] vararg = null;
|
||||
|
||||
for (Subscription sub : subscriptions) {
|
||||
if (sub.isVarArg()) {
|
||||
Class<?> class1 = message1.getClass();
|
||||
Class<?> class2 = message2.getClass();
|
||||
if (!class1.isArray() && class1 == class2) {
|
||||
if (vararg == null) {
|
||||
// messy, but the ONLY way to do it.
|
||||
vararg = (Object[]) Array.newInstance(class1, 2);
|
||||
vararg[0] = message1;
|
||||
vararg[1] = message2;
|
||||
|
||||
Object[] newInstance = (Object[]) Array.newInstance(vararg.getClass(), 1);
|
||||
newInstance[0] = vararg;
|
||||
vararg = newInstance;
|
||||
}
|
||||
|
||||
sub.publishToSubscription(this, vararg);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
sub.publishToSubscription(this, message1, message2);
|
||||
}
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
handlePublicationError(new PublicationError()
|
||||
.setMessage("Error during publication of message")
|
||||
.setCause(e)
|
||||
.setPublishedObject(message1, message2));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -387,51 +422,30 @@ public class MultiMBassador implements IMessageBus {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void publishAsync(Object message) {
|
||||
public void publishAsync(final Object message) {
|
||||
if (message != null) {
|
||||
// // put this on the disruptor ring buffer
|
||||
// final RingBuffer<DispatchHolder> ringBuffer = this.dispatch_RingBuffer;
|
||||
//
|
||||
// // setup the job
|
||||
// final long seq = ringBuffer.next();
|
||||
// try {
|
||||
// DispatchHolder eventJob = ringBuffer.get(seq);
|
||||
// eventJob.messageType = MessageType.ONE;
|
||||
// eventJob.message1 = message;
|
||||
// } catch (Throwable e) {
|
||||
// handlePublicationError(new PublicationError()
|
||||
// .setMessage("Error while adding an asynchronous message")
|
||||
// .setCause(e)
|
||||
// .setPublishedObject(message));
|
||||
// } finally {
|
||||
// // always publish the job
|
||||
// ringBuffer.publish(seq);
|
||||
// }
|
||||
|
||||
// MessageHolder messageHolder = new MessageHolder();
|
||||
// messageHolder.messageType = MessageType.ONE;
|
||||
// messageHolder.message1 = message;
|
||||
Runnable runnable = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
MultiMBassador.this.publish(message);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
// new Runnable() {
|
||||
// @Override
|
||||
// public void run() {
|
||||
//
|
||||
// }
|
||||
// };
|
||||
|
||||
// faster if we can skip locking
|
||||
// int counter = 200;
|
||||
// while (!this.dispatchQueue.offer(message)) {
|
||||
// if (counter > 100) {
|
||||
// --counter;
|
||||
// Thread.yield();
|
||||
// } else if (counter > 0) {
|
||||
//// if (counter > 100) {
|
||||
//// --counter;
|
||||
//// Thread.yield();
|
||||
//// } else
|
||||
// if (counter > 0) {
|
||||
// --counter;
|
||||
// LockSupport.parkNanos(1L);
|
||||
// } else {
|
||||
try {
|
||||
this.dispatchQueue.transfer(message);
|
||||
this.dispatchQueue.transfer(runnable);
|
||||
return;
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
|
@ -442,9 +456,9 @@ public class MultiMBassador implements IMessageBus {
|
|||
.setCause(e)
|
||||
.setPublishedObject(message));
|
||||
}
|
||||
// }
|
||||
// }
|
||||
}
|
||||
// }
|
||||
// }
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -1,8 +1,11 @@
|
|||
package net.engio.mbassy.multi.common;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
|
||||
import sun.reflect.generics.reflectiveObjects.NotImplementedException;
|
||||
|
||||
import com.googlecode.concurentlocks.ReentrantReadWriteUpdateLock;
|
||||
|
||||
/**
|
||||
|
@ -14,7 +17,7 @@ import com.googlecode.concurentlocks.ReentrantReadWriteUpdateLock;
|
|||
* @author bennidi
|
||||
* Date: 2/12/12
|
||||
*/
|
||||
public abstract class AbstractConcurrentSet<T> implements IConcurrentSet<T> {
|
||||
public abstract class AbstractConcurrentSet<T> implements Collection<T> {
|
||||
|
||||
// Internal state
|
||||
protected final ReentrantReadWriteUpdateLock lock = new ReentrantReadWriteUpdateLock();
|
||||
|
@ -28,21 +31,25 @@ public abstract class AbstractConcurrentSet<T> implements IConcurrentSet<T> {
|
|||
protected abstract Entry<T> createEntry(T value, Entry<T> next);
|
||||
|
||||
@Override
|
||||
public void add(T element) {
|
||||
public boolean add(T element) {
|
||||
if (element == null) {
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
boolean changed = false;
|
||||
|
||||
Lock writeLock = this.lock.writeLock();
|
||||
writeLock.lock();
|
||||
if (this.entries.containsKey(element)) {
|
||||
} else {
|
||||
if (!this.entries.containsKey(element)) {
|
||||
insert(element);
|
||||
changed = true;
|
||||
}
|
||||
writeLock.unlock();
|
||||
|
||||
return changed;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean contains(T element) {
|
||||
public boolean contains(Object element) {
|
||||
Lock readLock = this.lock.readLock();
|
||||
ISetEntry<T> entry;
|
||||
try {
|
||||
|
@ -73,42 +80,40 @@ public abstract class AbstractConcurrentSet<T> implements IConcurrentSet<T> {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void addAll(Iterable<T> elements) {
|
||||
public boolean addAll(Collection<? extends T> elements) {
|
||||
boolean changed = false;
|
||||
Lock writeLock = this.lock.writeLock();
|
||||
try {
|
||||
writeLock.lock();
|
||||
for (T element : elements) {
|
||||
if (element != null) {
|
||||
insert(element);
|
||||
changed = true;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
return changed;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return TRUE if the element was successfully removed
|
||||
*/
|
||||
@Override
|
||||
public boolean remove(T element) {
|
||||
public boolean remove(Object element) {
|
||||
|
||||
Lock updateLock = this.lock.updateLock();
|
||||
boolean isNull;
|
||||
try {
|
||||
updateLock.lock();
|
||||
ISetEntry<T> entry = this.entries.get(element);
|
||||
|
||||
isNull = entry == null || entry.getValue() == null;
|
||||
if (isNull) {
|
||||
if (entry != null && entry.getValue() != null) {
|
||||
Lock writeLock = this.lock.writeLock();
|
||||
try {
|
||||
writeLock.lock();
|
||||
ISetEntry<T> listelement = this.entries.get(element);
|
||||
if (listelement == null) {
|
||||
return false; //removed by other thread in the meantime
|
||||
} else if (listelement != this.head) {
|
||||
listelement.remove();
|
||||
if (entry != this.head) {
|
||||
entry.remove();
|
||||
} else {
|
||||
// if it was second, now it's first
|
||||
this.head = this.head.next();
|
||||
|
@ -127,6 +132,37 @@ public abstract class AbstractConcurrentSet<T> implements IConcurrentSet<T> {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object[] toArray() {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@SuppressWarnings("hiding")
|
||||
@Override
|
||||
public <T> T[] toArray(T[] a) {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean containsAll(Collection<?> c) {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean removeAll(Collection<?> c) {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean retainAll(Collection<?> c) {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clear() {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
|
||||
public abstract static class Entry<T> implements ISetEntry<T> {
|
||||
|
||||
|
|
|
@ -1,19 +1,24 @@
|
|||
package net.engio.mbassy.multi.common;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
/**
|
||||
* Todo: Add javadoc
|
||||
*
|
||||
* @author bennidi
|
||||
* Date: 3/29/13
|
||||
*/
|
||||
public interface IConcurrentSet<T> extends Iterable<T> {
|
||||
public interface IConcurrentSet<T> extends Collection<T> {
|
||||
|
||||
void add(T element);
|
||||
@Override
|
||||
boolean add(T element);
|
||||
|
||||
boolean contains(T element);
|
||||
boolean contains(Object element);
|
||||
|
||||
@Override
|
||||
int size();
|
||||
|
||||
@Override
|
||||
boolean isEmpty();
|
||||
|
||||
void addAll(Iterable<T> elements);
|
||||
|
@ -21,5 +26,5 @@ public interface IConcurrentSet<T> extends Iterable<T> {
|
|||
/**
|
||||
* @return TRUE if the element was removed
|
||||
*/
|
||||
boolean remove(T element);
|
||||
boolean remove(Object element);
|
||||
}
|
||||
|
|
|
@ -13,7 +13,11 @@ public class StrongConcurrentSet<T> extends AbstractConcurrentSet<T> {
|
|||
|
||||
|
||||
public StrongConcurrentSet() {
|
||||
super(new IdentityHashMap<T, ISetEntry<T>>());
|
||||
this(16);
|
||||
}
|
||||
|
||||
public StrongConcurrentSet(int size) {
|
||||
super(new IdentityHashMap<T, ISetEntry<T>>(size));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -3,8 +3,8 @@ package net.engio.mbassy.multi.subscription;
|
|||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
|
||||
import net.engio.mbassy.multi.common.IConcurrentSet;
|
||||
import net.engio.mbassy.multi.common.StrongConcurrentSet;
|
||||
import net.engio.mbassy.multi.dispatch.IHandlerInvocation;
|
||||
import net.engio.mbassy.multi.dispatch.ReflectiveHandlerInvocation;
|
||||
|
@ -34,22 +34,15 @@ public class Subscription {
|
|||
private final MessageHandler handlerMetadata;
|
||||
|
||||
private final IHandlerInvocation invocation;
|
||||
// protected final Collection<Object> listeners;
|
||||
// protected final Map<WeakReference<Object>, Boolean> listeners;
|
||||
// protected final Map<Object, Boolean> listeners;
|
||||
protected final IConcurrentSet<Object> listeners;
|
||||
private final Collection<Object> listeners;
|
||||
|
||||
Subscription(MessageHandler handler) {
|
||||
// this.listeners = new WeakConcurrentSet<Object>();
|
||||
this.listeners = new StrongConcurrentSet<Object>();
|
||||
// this.listeners = new ConcurrentHashMap<Object, Boolean>();
|
||||
// this.listeners = new CopyOnWriteArrayList<Object>();
|
||||
// this.listeners = new ConcurrentSkipListSet<Object>();
|
||||
// this.listeners = new ConcurrentWeakHashMap<WeakReference<Object>, Boolean>();
|
||||
this.handlerMetadata = handler;
|
||||
|
||||
IHandlerInvocation invocation = new ReflectiveHandlerInvocation();
|
||||
if (handler.isSynchronized()){
|
||||
if (handler.isSynchronized()) {
|
||||
invocation = new SynchronizedHandlerInvocation(invocation);
|
||||
}
|
||||
|
||||
|
@ -114,11 +107,6 @@ public class Subscription {
|
|||
* @return TRUE if the element was removed
|
||||
*/
|
||||
public boolean unsubscribe(Object existingListener) {
|
||||
// Boolean remove = this.listeners.remove(existingListener);
|
||||
// if (remove != null) {
|
||||
// return true;
|
||||
// }
|
||||
// return false;
|
||||
return this.listeners.remove(existingListener);
|
||||
}
|
||||
|
||||
|
@ -133,9 +121,7 @@ public class Subscription {
|
|||
|
||||
// private AtomicLong counter = new AtomicLong();
|
||||
public void publishToSubscription(ErrorHandlingSupport errorHandler, Object message) {
|
||||
// Collection<Object> listeners = this.listeners.keySet();
|
||||
// Collection<Object> listeners = this.listeners;
|
||||
IConcurrentSet<Object> listeners = this.listeners;
|
||||
Collection<Object> listeners = this.listeners;
|
||||
|
||||
if (listeners.size() > 0) {
|
||||
Method handler = this.handlerMetadata.getHandler();
|
||||
|
@ -185,8 +171,8 @@ public class Subscription {
|
|||
|
||||
public void publishToSubscription(ErrorHandlingSupport errorHandler, Object message1, Object message2) {
|
||||
// Collection<Object> listeners = this.listeners.keySet();
|
||||
// Collection<Object> listeners = this.listeners;
|
||||
IConcurrentSet<Object> listeners = this.listeners;
|
||||
Collection<Object> listeners = this.listeners;
|
||||
// IConcurrentSet<Object> listeners = this.listeners;
|
||||
|
||||
if (listeners.size() > 0) {
|
||||
Method handler = this.handlerMetadata.getHandler();
|
||||
|
@ -238,8 +224,8 @@ public class Subscription {
|
|||
|
||||
public void publishToSubscription(ErrorHandlingSupport errorHandler, Object message1, Object message2, Object message3) {
|
||||
// Collection<Object> listeners = this.listeners.keySet();
|
||||
// Collection<Object> listeners = this.listeners;
|
||||
IConcurrentSet<Object> listeners = this.listeners;
|
||||
Collection<Object> listeners = this.listeners;
|
||||
// IConcurrentSet<Object> listeners = this.listeners;
|
||||
|
||||
if (listeners.size() > 0) {
|
||||
Method handler = this.handlerMetadata.getHandler();
|
||||
|
@ -293,8 +279,8 @@ public class Subscription {
|
|||
|
||||
public void publishToSubscription(ErrorHandlingSupport errorHandler, Object... messages) {
|
||||
// Collection<Object> listeners = this.listeners.keySet();
|
||||
// Collection<Object> listeners = this.listeners;
|
||||
IConcurrentSet<Object> listeners = this.listeners;
|
||||
Collection<Object> listeners = this.listeners;
|
||||
// IConcurrentSet<Object> listeners = this.listeners;
|
||||
|
||||
if (listeners.size() > 0) {
|
||||
Method handler = this.handlerMetadata.getHandler();
|
||||
|
@ -327,6 +313,9 @@ public class Subscription {
|
|||
.setMethodName(handler.getName())
|
||||
.setListener(listener)
|
||||
.setPublishedObject(messages));
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
return;
|
||||
} catch (Throwable e) {
|
||||
errorHandler.handlePublicationError(new PublicationError()
|
||||
.setMessage("Error during invocation of message handler. " +
|
||||
|
|
|
@ -4,6 +4,7 @@ import java.lang.reflect.Array;
|
|||
import java.util.ArrayDeque;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.IdentityHashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
@ -13,6 +14,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||
|
||||
import net.engio.mbassy.multi.common.IdentityObjectTree;
|
||||
import net.engio.mbassy.multi.common.ReflectionUtils;
|
||||
import net.engio.mbassy.multi.common.StrongConcurrentSet;
|
||||
import net.engio.mbassy.multi.listener.MessageHandler;
|
||||
import net.engio.mbassy.multi.listener.MetadataReader;
|
||||
|
||||
|
@ -31,18 +33,13 @@ import com.googlecode.concurentlocks.ReentrantReadWriteUpdateLock;
|
|||
*/
|
||||
public class SubscriptionManager {
|
||||
|
||||
public static class SubHolder {
|
||||
public int count = 0;
|
||||
public Collection<Subscription> subs = new ArrayDeque<Subscription>(0);
|
||||
}
|
||||
|
||||
// the metadata reader that is used to inspect objects passed to the subscribe method
|
||||
private final MetadataReader metadataReader = new MetadataReader();
|
||||
|
||||
// all subscriptions per message type
|
||||
// this is the primary list for dispatching a specific message
|
||||
// write access is synchronized and happens only when a listener of a specific class is registered the first time
|
||||
private final Map<Class<?>, SubHolder> subscriptionsPerMessageSingle = new IdentityHashMap<Class<?>, SubHolder>(50);
|
||||
private final Map<Class<?>, Collection<Subscription>> subscriptionsPerMessageSingle = new IdentityHashMap<Class<?>, Collection<Subscription>>(50);
|
||||
private final IdentityObjectTree<Class<?>, Collection<Subscription>> subscriptionsPerMessageMulti = new IdentityObjectTree<Class<?>, Collection<Subscription>>();
|
||||
|
||||
// all subscriptions per messageHandler type
|
||||
|
@ -94,10 +91,8 @@ public class SubscriptionManager {
|
|||
// single
|
||||
Class<?> clazz = handledMessageTypes[0];
|
||||
|
||||
// NOTE: Not thread-safe! must be synchronized in outer scope
|
||||
SubHolder subHolder = this.subscriptionsPerMessageSingle.get(clazz);
|
||||
if (subHolder != null) {
|
||||
Collection<Subscription> subs = subHolder.subs;
|
||||
// NOTE: Order is important for safe publication
|
||||
Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(clazz);
|
||||
if (subs != null) {
|
||||
subs.remove(subscription);
|
||||
|
||||
|
@ -106,7 +101,6 @@ public class SubscriptionManager {
|
|||
this.subscriptionsPerMessageSingle.remove(clazz);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// NOTE: Not thread-safe! must be synchronized in outer scope
|
||||
IdentityObjectTree<Class<?>, Collection<Subscription>> tree;
|
||||
|
@ -188,7 +182,7 @@ public class SubscriptionManager {
|
|||
}
|
||||
|
||||
// it's SAFE to use non-concurrent collection here (read only). Same thread LOCKS on this with a write lock
|
||||
subscriptions = new ArrayDeque<Subscription>(messageHandlers.size());
|
||||
subscriptions = new StrongConcurrentSet<Subscription>(messageHandlers.size());
|
||||
|
||||
// create subscriptions for all detected message handlers
|
||||
for (MessageHandler messageHandler : messageHandlers) {
|
||||
|
@ -203,15 +197,15 @@ public class SubscriptionManager {
|
|||
// single
|
||||
Class<?> clazz = handledMessageTypes[0];
|
||||
|
||||
// NOTE: Not thread-safe! must be synchronized in outer scope
|
||||
SubHolder subHolder = this.subscriptionsPerMessageSingle.get(clazz);
|
||||
if (subHolder == null) {
|
||||
subHolder = new SubHolder();
|
||||
this.subscriptionsPerMessageSingle.put(clazz, subHolder);
|
||||
}
|
||||
Collection<Subscription> subs = subHolder.subs;
|
||||
// NOTE: Order is important for safe publication
|
||||
Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(clazz);
|
||||
if (subs == null) {
|
||||
subs = new StrongConcurrentSet<Subscription>(2);
|
||||
subs.add(subscription);
|
||||
subHolder.count++;
|
||||
this.subscriptionsPerMessageSingle.put(clazz, subs);
|
||||
} else {
|
||||
subs.add(subscription);
|
||||
}
|
||||
|
||||
// have to save our the VarArg class types, because creating var-arg arrays for objects is expensive
|
||||
if (subscription.isVarArg()) {
|
||||
|
@ -221,7 +215,7 @@ public class SubscriptionManager {
|
|||
// since it's vararg, this means that it's an ARRAY, so we ALSO
|
||||
// have to add the component classes of the array
|
||||
if (subscription.acceptsSubtypes()) {
|
||||
ArrayList<Class<?>> setupSuperClassCache2 = superClassCache(componentType);
|
||||
ArrayList<Class<?>> setupSuperClassCache2 = setupSuperClassCache(componentType);
|
||||
// have to setup each vararg chain
|
||||
for (int i = 0; i < setupSuperClassCache2.size(); i++) {
|
||||
Class<?> superClass = setupSuperClassCache2.get(i);
|
||||
|
@ -234,7 +228,7 @@ public class SubscriptionManager {
|
|||
}
|
||||
}
|
||||
} else if (subscription.acceptsSubtypes()) {
|
||||
superClassCache(clazz);
|
||||
setupSuperClassCache(clazz);
|
||||
}
|
||||
}
|
||||
else {
|
||||
|
@ -245,17 +239,17 @@ public class SubscriptionManager {
|
|||
case 2: {
|
||||
tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes[0], handledMessageTypes[1]);
|
||||
if (subscription.acceptsSubtypes()) {
|
||||
superClassCache(handledMessageTypes[0]);
|
||||
superClassCache(handledMessageTypes[1]);
|
||||
setupSuperClassCache(handledMessageTypes[0]);
|
||||
setupSuperClassCache(handledMessageTypes[1]);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case 3: {
|
||||
tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes[0], handledMessageTypes[1], handledMessageTypes[2]);
|
||||
if (subscription.acceptsSubtypes()) {
|
||||
superClassCache(handledMessageTypes[0]);
|
||||
superClassCache(handledMessageTypes[1]);
|
||||
superClassCache(handledMessageTypes[2]);
|
||||
setupSuperClassCache(handledMessageTypes[0]);
|
||||
setupSuperClassCache(handledMessageTypes[1]);
|
||||
setupSuperClassCache(handledMessageTypes[2]);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
@ -263,7 +257,7 @@ public class SubscriptionManager {
|
|||
tree = this.subscriptionsPerMessageMulti.createLeaf(handledMessageTypes);
|
||||
if (subscription.acceptsSubtypes()) {
|
||||
for (Class<?> c : handledMessageTypes) {
|
||||
superClassCache(c);
|
||||
setupSuperClassCache(c);
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
@ -291,10 +285,24 @@ public class SubscriptionManager {
|
|||
}
|
||||
}
|
||||
|
||||
private final Collection<Subscription> EMPTY_LIST = Collections.emptyList();
|
||||
|
||||
|
||||
// cannot return null, not thread safe.
|
||||
public Collection<Subscription> getSubscriptionsByMessageType(Class<?> messageType) {
|
||||
Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(messageType);
|
||||
if (subs != null) {
|
||||
|
||||
return subs;
|
||||
} else {
|
||||
return this.EMPTY_LIST;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// obtain the set of subscriptions for the given message type
|
||||
// Note: never returns null!
|
||||
public Collection<Subscription> getSubscriptionsByMessageType(Class<?> messageType) {
|
||||
public Collection<Subscription> DEPRECATED_getSubscriptionsByMessageType(Class<?> messageType) {
|
||||
// thread safe publication
|
||||
Collection<Subscription> subscriptions;
|
||||
|
||||
|
@ -302,32 +310,22 @@ public class SubscriptionManager {
|
|||
this.LOCK.readLock().lock();
|
||||
|
||||
int count = 0;
|
||||
Collection<Subscription> subs;
|
||||
SubHolder primaryHolder = this.subscriptionsPerMessageSingle.get(messageType);
|
||||
if (primaryHolder != null) {
|
||||
subscriptions = new ArrayDeque<Subscription>(count);
|
||||
subs = primaryHolder.subs;
|
||||
count = primaryHolder.count;
|
||||
Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(messageType);
|
||||
if (subs != null) {
|
||||
subscriptions = new ArrayDeque<Subscription>(count);
|
||||
subscriptions.addAll(subs);
|
||||
}
|
||||
} else {
|
||||
subscriptions = new ArrayDeque<Subscription>(16);
|
||||
}
|
||||
|
||||
// also add all subscriptions that match super types
|
||||
SubHolder subHolder;
|
||||
ArrayList<Class<?>> types1 = superClassCache(messageType);
|
||||
ArrayList<Class<?>> types1 = setupSuperClassCache(messageType);
|
||||
if (types1 != null) {
|
||||
Class<?> eventSuperType;
|
||||
int i;
|
||||
for (i = 0; i < types1.size(); i++) {
|
||||
eventSuperType = types1.get(i);
|
||||
subHolder = this.subscriptionsPerMessageSingle.get(eventSuperType);
|
||||
if (subHolder != null) {
|
||||
subs = subHolder.subs;
|
||||
count += subHolder.count;
|
||||
|
||||
subs = this.subscriptionsPerMessageSingle.get(eventSuperType);
|
||||
if (subs != null) {
|
||||
for (Subscription sub : subs) {
|
||||
if (sub.handlesMessageType(messageType)) {
|
||||
|
@ -335,17 +333,12 @@ public class SubscriptionManager {
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
count += addVarArgClass(subscriptions, eventSuperType);
|
||||
addVarArgClass(subscriptions, eventSuperType);
|
||||
}
|
||||
}
|
||||
|
||||
count += addVarArgClass(subscriptions, messageType);
|
||||
addVarArgClass(subscriptions, messageType);
|
||||
|
||||
if (primaryHolder != null) {
|
||||
// save off our count, so our collection creation size is optimal.
|
||||
primaryHolder.count = count;
|
||||
}
|
||||
} finally {
|
||||
this.LOCK.readLock().unlock();
|
||||
}
|
||||
|
@ -364,8 +357,8 @@ public class SubscriptionManager {
|
|||
this.LOCK.readLock().lock();
|
||||
|
||||
// also add all subscriptions that match super types
|
||||
ArrayList<Class<?>> types1 = superClassCache(messageType1);
|
||||
ArrayList<Class<?>> types2 = superClassCache(messageType2);
|
||||
ArrayList<Class<?>> types1 = setupSuperClassCache(messageType1);
|
||||
ArrayList<Class<?>> types2 = setupSuperClassCache(messageType2);
|
||||
|
||||
Collection<Subscription> subs;
|
||||
Class<?> eventSuperType1 = messageType1;
|
||||
|
@ -427,9 +420,9 @@ public class SubscriptionManager {
|
|||
this.LOCK.readLock().lock();
|
||||
|
||||
// also add all subscriptions that match super types
|
||||
ArrayList<Class<?>> types1 = superClassCache(messageType1);
|
||||
ArrayList<Class<?>> types2 = superClassCache(messageType2);
|
||||
ArrayList<Class<?>> types3 = superClassCache(messageType3);
|
||||
ArrayList<Class<?>> types1 = setupSuperClassCache(messageType1);
|
||||
ArrayList<Class<?>> types2 = setupSuperClassCache(messageType2);
|
||||
ArrayList<Class<?>> types3 = setupSuperClassCache(messageType3);
|
||||
|
||||
Class<?> eventSuperType1 = messageType1;
|
||||
IdentityObjectTree<Class<?>, Collection<Subscription>> leaf1;
|
||||
|
@ -513,7 +506,6 @@ public class SubscriptionManager {
|
|||
}
|
||||
}
|
||||
|
||||
SubHolder subHolder;
|
||||
int size = messageTypes.length;
|
||||
if (size > 0) {
|
||||
boolean allSameType = true;
|
||||
|
@ -537,7 +529,7 @@ public class SubscriptionManager {
|
|||
if (allSameType) {
|
||||
// do we have a var-arg (it shows as an array) subscribed?
|
||||
|
||||
ArrayList<Class<?>> superClasses = superClassCache(firstType);
|
||||
ArrayList<Class<?>> superClasses = setupSuperClassCache(firstType);
|
||||
|
||||
Class<?> eventSuperType = firstType;
|
||||
int j;
|
||||
|
@ -552,10 +544,7 @@ public class SubscriptionManager {
|
|||
eventSuperType = Array.newInstance(eventSuperType, 1).getClass();
|
||||
|
||||
// also add all subscriptions that match super types
|
||||
subHolder = this.subscriptionsPerMessageSingle.get(eventSuperType);
|
||||
if (subHolder != null) {
|
||||
subs = subHolder.subs;
|
||||
count += subHolder.count;
|
||||
subs = this.subscriptionsPerMessageSingle.get(eventSuperType);
|
||||
if (subs != null) {
|
||||
for (Subscription sub : subs) {
|
||||
subscriptions.add(sub);
|
||||
|
@ -564,7 +553,6 @@ public class SubscriptionManager {
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
@ -576,12 +564,30 @@ public class SubscriptionManager {
|
|||
return subscriptions;
|
||||
}
|
||||
|
||||
private ArrayList<Class<?>> superClassCache(Class<?> clazz) {
|
||||
|
||||
private final Collection<Class<?>> EMPTY_LIST_CLASSES = Collections.emptyList();
|
||||
// must be protected by read lock
|
||||
public Collection<Class<?>> getSuperClasses(Class<?> clazz) {
|
||||
// not thread safe. DO NOT MODIFY
|
||||
ArrayList<Class<?>> types = this.superClassesCache.get(clazz);
|
||||
|
||||
if (types != null) {
|
||||
return types;
|
||||
}
|
||||
|
||||
return this.EMPTY_LIST_CLASSES;
|
||||
}
|
||||
|
||||
|
||||
// not a thread safe collection. must be locked by caller
|
||||
private ArrayList<Class<?>> setupSuperClassCache(Class<?> clazz) {
|
||||
ArrayList<Class<?>> types = this.superClassesCache.get(clazz);
|
||||
|
||||
if (types == null) {
|
||||
// it doesn't matter if concurrent access stomps on values, since they are always the same.
|
||||
Set<Class<?>> superTypes = ReflectionUtils.getSuperTypes(clazz);
|
||||
types = new ArrayList<Class<?>>(superTypes);
|
||||
// NOTE: no need to write lock, since race conditions will result in duplicate answers
|
||||
this.superClassesCache.put(clazz, types);
|
||||
}
|
||||
|
||||
|
@ -593,19 +599,14 @@ public class SubscriptionManager {
|
|||
///////////////
|
||||
// a var-arg handler might match
|
||||
///////////////
|
||||
private int addVarArgClass(Collection<Subscription> subscriptions, Class<?> messageType) {
|
||||
private void addVarArgClass(Collection<Subscription> subscriptions, Class<?> messageType) {
|
||||
// tricky part. We have to check the ARRAY version
|
||||
SubHolder subHolder;
|
||||
Collection<Subscription> subs;
|
||||
int count = 0;
|
||||
|
||||
Class<?> varArgClass = this.varArgClasses.get(messageType);
|
||||
if (varArgClass != null) {
|
||||
// also add all subscriptions that match super types
|
||||
subHolder = this.subscriptionsPerMessageSingle.get(varArgClass);
|
||||
if (subHolder != null) {
|
||||
subs = subHolder.subs;
|
||||
count += subHolder.count;
|
||||
subs = this.subscriptionsPerMessageSingle.get(varArgClass);
|
||||
if (subs != null) {
|
||||
for (Subscription sub : subs) {
|
||||
subscriptions.add(sub);
|
||||
|
@ -613,47 +614,43 @@ public class SubscriptionManager {
|
|||
}
|
||||
}
|
||||
}
|
||||
return count;
|
||||
|
||||
// must be protected by read lock
|
||||
public Collection<Subscription> getVarArgs(Class<?> clazz) {
|
||||
Class<?> varArgClass = this.varArgClasses.get(clazz);
|
||||
if (varArgClass != null) {
|
||||
Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(varArgClass);
|
||||
if (subs != null) {
|
||||
return subs;
|
||||
}
|
||||
}
|
||||
|
||||
public Class<?> getVarArg(Class<?> clazz) {
|
||||
return this.varArgClasses.get(clazz);
|
||||
return this.EMPTY_LIST;
|
||||
}
|
||||
|
||||
///////////////
|
||||
// a var-arg handler might match
|
||||
// tricky part. We have to check the ARRAY version
|
||||
///////////////
|
||||
private int addVarArgClasses(Collection<Subscription> subscriptions, Class<?> messageType, ArrayList<Class<?>> types1) {
|
||||
private void addVarArgClasses(Collection<Subscription> subscriptions, Class<?> messageType, ArrayList<Class<?>> types1) {
|
||||
Collection<Subscription> subs;
|
||||
SubHolder subHolder;
|
||||
int count = 0;
|
||||
|
||||
Class<?> varArgClass = this.varArgClasses.get(messageType);
|
||||
if (varArgClass != null) {
|
||||
// also add all subscriptions that match super types
|
||||
subHolder = this.subscriptionsPerMessageSingle.get(varArgClass);
|
||||
if (subHolder != null) {
|
||||
subs = subHolder.subs;
|
||||
count += subHolder.count;
|
||||
|
||||
subs = this.subscriptionsPerMessageSingle.get(varArgClass);
|
||||
if (subs != null) {
|
||||
for (Subscription sub : subs) {
|
||||
subscriptions.add(sub);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (Class<?> eventSuperType : types1) {
|
||||
varArgClass = this.varArgClasses.get(eventSuperType);
|
||||
if (varArgClass != null) {
|
||||
// also add all subscriptions that match super types
|
||||
subHolder = this.subscriptionsPerMessageSingle.get(varArgClass);
|
||||
if (subHolder != null) {
|
||||
subs = subHolder.subs;
|
||||
count += subHolder.count;
|
||||
|
||||
subs = this.subscriptionsPerMessageSingle.get(varArgClass);
|
||||
if (subs != null) {
|
||||
for (Subscription sub : subs) {
|
||||
subscriptions.add(sub);
|
||||
|
@ -663,15 +660,12 @@ public class SubscriptionManager {
|
|||
}
|
||||
}
|
||||
|
||||
return count;
|
||||
}
|
||||
|
||||
private void getSubsVarArg(Collection<Subscription> subscriptions, int length, int index,
|
||||
IdentityObjectTree<Class<?>, Collection<Subscription>> tree, Class<?>[] messageTypes) {
|
||||
|
||||
Class<?> classType = messageTypes[index];
|
||||
// get all the super types, if there are any.
|
||||
ArrayList<Class<?>> superClasses = superClassCache(classType);
|
||||
ArrayList<Class<?>> superClasses = setupSuperClassCache(classType);
|
||||
|
||||
IdentityObjectTree<Class<?>, Collection<Subscription>> leaf;
|
||||
Collection<Subscription> subs;
|
||||
|
@ -702,4 +696,12 @@ public class SubscriptionManager {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void readLock() {
|
||||
this.LOCK.readLock().lock();
|
||||
}
|
||||
|
||||
public void readUnLock() {
|
||||
this.LOCK.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue
Block a user