Changed superClassSubs to use fast iterator
This commit is contained in:
parent
13cee46f6c
commit
758a1c837a
@ -1,5 +1,9 @@
|
|||||||
package net.engio.mbassy.multi;
|
package net.engio.mbassy.multi;
|
||||||
|
|
||||||
|
import it.unimi.dsi.fastutil.objects.ObjectIterator;
|
||||||
|
import it.unimi.dsi.fastutil.objects.Reference2BooleanMap.Entry;
|
||||||
|
import it.unimi.dsi.fastutil.objects.Reference2BooleanMap.FastEntrySet;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -36,7 +40,7 @@ public class MultiMBassador implements IMessageBus {
|
|||||||
private List<Thread> threads;
|
private List<Thread> threads;
|
||||||
|
|
||||||
public MultiMBassador() {
|
public MultiMBassador() {
|
||||||
this(Runtime.getRuntime().availableProcessors());
|
this(Runtime.getRuntime().availableProcessors()*2);
|
||||||
// this(2);
|
// this(2);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -130,9 +134,8 @@ public class MultiMBassador implements IMessageBus {
|
|||||||
SubscriptionManager manager = this.subscriptionManager;
|
SubscriptionManager manager = this.subscriptionManager;
|
||||||
|
|
||||||
Class<?> messageClass = message.getClass();
|
Class<?> messageClass = message.getClass();
|
||||||
|
|
||||||
|
|
||||||
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass);
|
Collection<Subscription> subscriptions = manager.getSubscriptionsByMessageType(messageClass);
|
||||||
|
|
||||||
// Run subscriptions
|
// Run subscriptions
|
||||||
if (subscriptions != null && !subscriptions.isEmpty()) {
|
if (subscriptions != null && !subscriptions.isEmpty()) {
|
||||||
for (Subscription sub : subscriptions) {
|
for (Subscription sub : subscriptions) {
|
||||||
@ -152,10 +155,13 @@ public class MultiMBassador implements IMessageBus {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Collection<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass);
|
FastEntrySet<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass);
|
||||||
// now get superClasses
|
// now get superClasses
|
||||||
if (superSubscriptions != null) {
|
if (superSubscriptions != null && !superSubscriptions.isEmpty()) {
|
||||||
for (Subscription sub : superSubscriptions) {
|
ObjectIterator<Entry<Subscription>> fastIterator = superSubscriptions.fastIterator();
|
||||||
|
|
||||||
|
while (fastIterator.hasNext()) {
|
||||||
|
Subscription sub = fastIterator.next().getKey();
|
||||||
// this catches all exception types
|
// this catches all exception types
|
||||||
sub.publishToSubscription(this, message);
|
sub.publishToSubscription(this, message);
|
||||||
}
|
}
|
||||||
|
@ -87,7 +87,7 @@ public class Subscription {
|
|||||||
public void publishToSubscription(ErrorHandlingSupport errorHandler, Object message) {
|
public void publishToSubscription(ErrorHandlingSupport errorHandler, Object message) {
|
||||||
Collection<Object> listeners = this.listeners;
|
Collection<Object> listeners = this.listeners;
|
||||||
|
|
||||||
if (listeners.size() > 0) {
|
if (!listeners.isEmpty()) {
|
||||||
Method handler = this.handlerMetadata.getHandler();
|
Method handler = this.handlerMetadata.getHandler();
|
||||||
IHandlerInvocation invocation = this.invocation;
|
IHandlerInvocation invocation = this.invocation;
|
||||||
|
|
||||||
|
@ -1,5 +1,8 @@
|
|||||||
package net.engio.mbassy.multi.subscription;
|
package net.engio.mbassy.multi.subscription;
|
||||||
|
|
||||||
|
import it.unimi.dsi.fastutil.objects.Reference2BooleanArrayMap;
|
||||||
|
import it.unimi.dsi.fastutil.objects.Reference2BooleanMap.FastEntrySet;
|
||||||
|
|
||||||
import java.util.ArrayDeque;
|
import java.util.ArrayDeque;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
@ -7,6 +10,7 @@ import java.util.Map;
|
|||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.locks.Lock;
|
import java.util.concurrent.locks.Lock;
|
||||||
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
import net.engio.mbassy.multi.common.IdentityObjectTree;
|
import net.engio.mbassy.multi.common.IdentityObjectTree;
|
||||||
import net.engio.mbassy.multi.common.ReflectionUtils;
|
import net.engio.mbassy.multi.common.ReflectionUtils;
|
||||||
@ -15,8 +19,6 @@ import net.engio.mbassy.multi.listener.MessageHandler;
|
|||||||
import net.engio.mbassy.multi.listener.MetadataReader;
|
import net.engio.mbassy.multi.listener.MetadataReader;
|
||||||
import sun.reflect.generics.reflectiveObjects.NotImplementedException;
|
import sun.reflect.generics.reflectiveObjects.NotImplementedException;
|
||||||
|
|
||||||
import com.googlecode.concurentlocks.ReentrantReadWriteUpdateLock;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The subscription managers responsibility is to consistently handle and synchronize the message listener subscription process.
|
* The subscription managers responsibility is to consistently handle and synchronize the message listener subscription process.
|
||||||
* It provides fast lookup of existing subscriptions when another instance of an already known
|
* It provides fast lookup of existing subscriptions when another instance of an already known
|
||||||
@ -56,7 +58,7 @@ public class SubscriptionManager {
|
|||||||
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
|
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
|
||||||
// it's a hit on SUB/UNSUB, but REALLY improves performance on handlers
|
// it's a hit on SUB/UNSUB, but REALLY improves performance on handlers
|
||||||
// it's faster to create a new one for SUB/UNSUB than it is to clear() on the original one
|
// it's faster to create a new one for SUB/UNSUB than it is to clear() on the original one
|
||||||
private volatile Map<Class<?>, Collection<Subscription>> superClassSubscriptions;
|
private volatile Map<Class<?>, FastEntrySet<Subscription>> superClassSubscriptions;
|
||||||
// private final IdentityObjectTree<Class<?>, Collection<Subscription>> superClassSubscriptionsMulti = new IdentityObjectTree<Class<?>, Collection<Subscription>>();
|
// private final IdentityObjectTree<Class<?>, Collection<Subscription>> superClassSubscriptionsMulti = new IdentityObjectTree<Class<?>, Collection<Subscription>>();
|
||||||
|
|
||||||
|
|
||||||
@ -64,7 +66,7 @@ public class SubscriptionManager {
|
|||||||
private final Map<Class<?>, Object> nonListeners;
|
private final Map<Class<?>, Object> nonListeners;
|
||||||
|
|
||||||
// synchronize read/write acces to the subscription maps
|
// synchronize read/write acces to the subscription maps
|
||||||
private final ReentrantReadWriteUpdateLock LOCK = new ReentrantReadWriteUpdateLock();
|
private final ReentrantReadWriteLock LOCK = new ReentrantReadWriteLock();
|
||||||
|
|
||||||
public SubscriptionManager(int numberOfThreads) {
|
public SubscriptionManager(int numberOfThreads) {
|
||||||
this.MAP_STRIPING = numberOfThreads;
|
this.MAP_STRIPING = numberOfThreads;
|
||||||
@ -84,7 +86,15 @@ public class SubscriptionManager {
|
|||||||
private final void resetSuperClassSubs() {
|
private final void resetSuperClassSubs() {
|
||||||
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
|
// superClassSubscriptions keeps track of all subscriptions of super classes. SUB/UNSUB dumps it, so it is recreated dynamically.
|
||||||
// it's a hit on SUB/UNSUB, but improves performance on handlers
|
// it's a hit on SUB/UNSUB, but improves performance on handlers
|
||||||
this.superClassSubscriptions = new ConcurrentHashMap<Class<?>, Collection<Subscription>>(8, this.LOAD_FACTOR, this.MAP_STRIPING);
|
this.superClassSubscriptions = new ConcurrentHashMap<Class<?>, FastEntrySet<Subscription>>(8, this.LOAD_FACTOR, this.MAP_STRIPING);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void readLock() {
|
||||||
|
this.LOCK.readLock().lock();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void readUnLock() {
|
||||||
|
this.LOCK.readLock().unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void unsubscribe(Object listener) {
|
public void unsubscribe(Object listener) {
|
||||||
@ -332,15 +342,16 @@ public class SubscriptionManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// ALSO checks to see if the superClass accepts subtypes.
|
// ALSO checks to see if the superClass accepts subtypes.
|
||||||
public Collection<Subscription> getSuperSubscriptions(Class<?> superType) {
|
public FastEntrySet<Subscription> getSuperSubscriptions(Class<?> superType) {
|
||||||
Map<Class<?>, Collection<Subscription>> superClassSubs = this.superClassSubscriptions;
|
Map<Class<?>, FastEntrySet<Subscription>> superClassSubs = this.superClassSubscriptions;
|
||||||
if (superClassSubs == null) {
|
if (superClassSubs == null) {
|
||||||
// we haven't created it yet (via subscribe)
|
// we haven't created it yet (via subscribe)
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
Collection<Subscription> subsPerType = superClassSubs.get(superType);
|
FastEntrySet<Subscription> subsPerType = superClassSubs.get(superType);
|
||||||
|
|
||||||
if (subsPerType == null) {
|
if (subsPerType == null) {
|
||||||
Collection<Class<?>> types = this.superClassesCache.get(superType);
|
Collection<Class<?>> types = this.superClassesCache.get(superType);
|
||||||
@ -348,20 +359,20 @@ public class SubscriptionManager {
|
|||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
// subsPerType = new StrongConcurrentSet<Subscription>(types.size(), this.LOAD_FACTOR);
|
Reference2BooleanArrayMap<Subscription> map = new Reference2BooleanArrayMap<Subscription>(types.size() + 1);
|
||||||
subsPerType = new ArrayDeque<Subscription>(types.size() + 1);
|
|
||||||
|
|
||||||
for (Class<?> superClass : types) {
|
for (Class<?> superClass : types) {
|
||||||
Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(superClass);
|
Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(superClass);
|
||||||
if (subs != null) {
|
if (subs != null && !subs.isEmpty()) {
|
||||||
for (Subscription sub : subs) {
|
for (Subscription sub : subs) {
|
||||||
if (sub.acceptsSubtypes()) {
|
if (sub.acceptsSubtypes()) {
|
||||||
subsPerType.add(sub);
|
map.put(sub, Boolean.TRUE);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
subsPerType = map.reference2BooleanEntrySet();
|
||||||
superClassSubs.put(superType, subsPerType);
|
superClassSubs.put(superType, subsPerType);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
97
src/test/java/net/engio/mbassy/multi/PerformanceTest.java
Normal file
97
src/test/java/net/engio/mbassy/multi/PerformanceTest.java
Normal file
@ -0,0 +1,97 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2015 dorkbox, llc
|
||||||
|
*/
|
||||||
|
package net.engio.mbassy.multi;
|
||||||
|
|
||||||
|
import junit.framework.Assert;
|
||||||
|
import net.engio.mbassy.multi.annotations.Handler;
|
||||||
|
import net.engio.mbassy.multi.error.IPublicationErrorHandler;
|
||||||
|
import net.engio.mbassy.multi.error.PublicationError;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author dorkbox, llc Date: 2/2/15
|
||||||
|
*/
|
||||||
|
public class PerformanceTest {
|
||||||
|
|
||||||
|
private static long count = 0;
|
||||||
|
|
||||||
|
protected static final IPublicationErrorHandler TestFailingHandler = new IPublicationErrorHandler() {
|
||||||
|
@Override
|
||||||
|
public void handleError(PublicationError error) {
|
||||||
|
error.getCause().printStackTrace();
|
||||||
|
Assert.fail();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
public static void main(String[] args) {
|
||||||
|
PerformanceTest multiMessageTest = new PerformanceTest();
|
||||||
|
multiMessageTest.testMultiMessageSending();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public PerformanceTest() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testMultiMessageSending() {
|
||||||
|
MultiMBassador bus = new MultiMBassador();
|
||||||
|
bus.addErrorHandler(TestFailingHandler);
|
||||||
|
|
||||||
|
|
||||||
|
Listener listener1 = new Listener();
|
||||||
|
bus.subscribe(listener1);
|
||||||
|
|
||||||
|
long num = Long.MAX_VALUE;
|
||||||
|
while (num-- > 0) {
|
||||||
|
bus.publish("s");
|
||||||
|
}
|
||||||
|
|
||||||
|
// bus.publish("s", "s");
|
||||||
|
// bus.publish("s", "s", "s");
|
||||||
|
// bus.publish("s", "s", "s", "s");
|
||||||
|
// bus.publish(1, 2, "s");
|
||||||
|
// bus.publish(1, 2, 3, 4, 5, 6);
|
||||||
|
// bus.publish(new Integer[] {1, 2, 3, 4, 5, 6});
|
||||||
|
|
||||||
|
bus.shutdown();
|
||||||
|
System.err.println("Count: " + count);
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unused")
|
||||||
|
public static class Listener {
|
||||||
|
@Handler
|
||||||
|
public void handleSync(String o1) {
|
||||||
|
count++;
|
||||||
|
// System.err.println("match String");
|
||||||
|
}
|
||||||
|
|
||||||
|
// @Handler
|
||||||
|
// public void handleSync(String o1, String o2) {
|
||||||
|
// count.getAndIncrement();
|
||||||
|
// System.err.println("match String, String");
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// @Handler
|
||||||
|
// public void handleSync(String o1, String o2, String o3) {
|
||||||
|
// count.getAndIncrement();
|
||||||
|
// System.err.println("match String, String, String");
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// @Handler
|
||||||
|
// public void handleSync(Integer o1, Integer o2, String o3) {
|
||||||
|
// count.getAndIncrement();
|
||||||
|
// System.err.println("match Integer, Integer, String");
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// @Handler
|
||||||
|
// public void handleSync(String... o) {
|
||||||
|
// count.getAndIncrement();
|
||||||
|
// System.err.println("match String[]");
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// @Handler
|
||||||
|
// public void handleSync(Integer... o) {
|
||||||
|
// count.getAndIncrement();
|
||||||
|
// System.err.println("match Integer[]");
|
||||||
|
// }
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user