More optimizations

This commit is contained in:
nathan 2015-06-06 15:10:57 +02:00
parent 3acd8f934f
commit 67c6403355
5 changed files with 166 additions and 541 deletions

View File

@ -1,11 +1,8 @@
package dorkbox.util.messagebus.common;
import com.googlecode.concurentlocks.ReentrantReadWriteUpdateLock;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
/**
@ -17,14 +14,13 @@ import java.util.concurrent.locks.Lock;
* Date: 2/2/15
*/
public class HashMapTree<KEY, VALUE> {
private final ReentrantReadWriteUpdateLock lock = new ReentrantReadWriteUpdateLock();
private Map<KEY, HashMapTree<KEY, VALUE>> children;
private volatile VALUE value;
private ConcurrentMap<KEY, HashMapTree<KEY, VALUE>> children; // protected by read/write lock
private volatile VALUE value; // protected by read/write lock
private final int defaultSize;
private final float loadFactor;
public HashMapTree(int defaultSize, float loadFactor) {
public HashMapTree(final int defaultSize, final float loadFactor) {
this.defaultSize = defaultSize;
this.loadFactor = loadFactor;
}
@ -33,30 +29,26 @@ public class HashMapTree<KEY, VALUE> {
/**
* can be overridden to provide a custom backing map
*/
protected ConcurrentMap<KEY, HashMapTree<KEY, VALUE>> createChildren(int defaultSize, float loadFactor) {
protected Map<KEY, HashMapTree<KEY, VALUE>> createChildren(int defaultSize, float loadFactor) {
return new ConcurrentHashMapV8<KEY, HashMapTree<KEY, VALUE>>(defaultSize, loadFactor, 1);
}
public VALUE getValue() {
VALUE returnValue = this.value;
return returnValue;
public final VALUE getValue() {
return this.value;
}
public void putValue(VALUE value) {
public final void putValue(VALUE value) {
this.value = value;
}
public void removeValue() {
public final void removeValue() {
this.value = null;
}
public void clear() {
Lock WRITE = this.lock.writeLock();
WRITE.lock(); // upgrade to the write lock, at this point blocks other readers
public final void clear() {
if (this.children != null) {
Set<Entry<KEY, HashMapTree<KEY, VALUE>>> entrySet = this.children.entrySet();
for (Entry<KEY, HashMapTree<KEY, VALUE>> entry : entrySet) {
@ -66,8 +58,6 @@ public class HashMapTree<KEY, VALUE> {
this.children.clear();
this.value = null;
}
WRITE.unlock();
}
@ -76,7 +66,7 @@ public class HashMapTree<KEY, VALUE> {
* <p>
* Removes a branch from the tree, and cleans up, if necessary
*/
public void remove(KEY key) {
public final void remove(KEY key) {
if (key != null) {
removeLeaf(key);
}
@ -88,35 +78,24 @@ public class HashMapTree<KEY, VALUE> {
* <p>
* Removes a branch from the tree, and cleans up, if necessary
*/
public void remove(KEY key1, KEY key2) {
public final void remove(KEY key1, KEY key2) {
if (key1 == null || key2 == null) {
return;
}
Lock UPDATE = this.lock.updateLock();
UPDATE.lock(); // allows other readers, blocks others from acquiring update or write locks
HashMapTree<KEY, VALUE> leaf = null;
HashMapTree<KEY, VALUE> leaf;
if (this.children != null) {
leaf = this.children.get(key1);
if (leaf != null) {
// promote to writelock and try again - Concurrency in Practice,16.4.2, last sentence on page. Careful for stale state
Lock WRITE = this.lock.writeLock();
WRITE.lock(); // upgrade to the write lock, at this point blocks other readers
leaf.removeLeaf(key2);
this.children.remove(key1);
if (this.children.isEmpty()) {
this.children = null;
}
WRITE.unlock();
}
}
UPDATE.unlock();
}
/**
@ -124,35 +103,24 @@ public class HashMapTree<KEY, VALUE> {
* <p>
* Removes a branch from the tree, and cleans up, if necessary
*/
public void remove(KEY key1, KEY key2, KEY key3) {
public final void remove(KEY key1, KEY key2, KEY key3) {
if (key1 == null || key2 == null) {
return;
}
Lock UPDATE = this.lock.updateLock();
UPDATE.lock(); // allows other readers, blocks others from acquiring update or write locks
HashMapTree<KEY, VALUE> leaf = null;
HashMapTree<KEY, VALUE> leaf;
if (this.children != null) {
leaf = this.children.get(key1);
if (leaf != null) {
// promote to writelock and try again - Concurrency in Practice,16.4.2, last sentence on page. Careful for stale state
Lock WRITE = this.lock.writeLock();
WRITE.lock(); // upgrade to the write lock, at this point blocks other readers
leaf.remove(key2, key3);
this.children.remove(key1);
if (this.children.isEmpty()) {
this.children = null;
}
WRITE.unlock();
}
}
UPDATE.unlock();
}
@ -162,7 +130,7 @@ public class HashMapTree<KEY, VALUE> {
* Removes a branch from the tree, and cleans up, if necessary
*/
@SuppressWarnings("unchecked")
public void remove(KEY... keys) {
public final void remove(KEY... keys) {
if (keys == null) {
return;
}
@ -174,11 +142,8 @@ public class HashMapTree<KEY, VALUE> {
/**
* Removes a branch from the tree, and cleans up, if necessary
*/
private final void removeLeaf(KEY key) {
private void removeLeaf(KEY key) {
if (key != null) {
Lock WRITE = this.lock.writeLock();
WRITE.lock(); // upgrade to the write lock, at this point blocks other readers
if (this.children != null) {
HashMapTree<KEY, VALUE> leaf = this.children.get(key);
@ -195,16 +160,11 @@ public class HashMapTree<KEY, VALUE> {
}
}
}
WRITE.unlock();
}
}
// keys CANNOT be null here!
private final void removeLeaf(int index, KEY[] keys) {
Lock WRITE = this.lock.writeLock();
WRITE.lock(); // upgrade to the write lock, at this point blocks other readers
private void removeLeaf(int index, KEY[] keys) {
if (index == keys.length) {
// we have reached the leaf to remove!
this.value = null;
@ -223,249 +183,89 @@ public class HashMapTree<KEY, VALUE> {
}
}
}
WRITE.unlock();
}
public VALUE put(VALUE value, KEY key) {
public final VALUE put(VALUE value, KEY key) {
if (key == null) {
throw new NullPointerException("keys");
}
Lock WRITE = this.lock.writeLock();
WRITE.lock(); // upgrade to the write lock, at this point blocks other readers
// have to put value into our children
HashMapTree<KEY, VALUE> leaf = createLeaf_NL(key);
HashMapTree<KEY, VALUE> leaf = createLeaf(key);
VALUE prev = leaf.value;
leaf.value = value;
WRITE.unlock();
return prev;
}
public VALUE putIfAbsent(VALUE value, KEY key) {
if (key == null) {
throw new NullPointerException("keys");
}
Lock WRITE = this.lock.writeLock();
WRITE.lock(); // upgrade to the write lock, at this point blocks other readers
// have to put value into our children
HashMapTree<KEY, VALUE> leaf = createLeaf_NL(key);
VALUE prev = leaf.value;
if (prev == null) {
leaf.value = value;
}
WRITE.unlock();
return prev;
}
public VALUE put(VALUE value, KEY key1, KEY key2) {
public final VALUE put(VALUE value, KEY key1, KEY key2) {
if (key1 == null || key2 == null) {
throw new NullPointerException("keys");
}
Lock WRITE = this.lock.writeLock();
WRITE.lock(); // upgrade to the write lock, at this point blocks other readers
// have to put value into our children
HashMapTree<KEY, VALUE> leaf = createLeaf_NL(key1);
Lock WRITE2 = leaf.lock.writeLock();
WRITE2.lock();
leaf = leaf.createLeaf_NL(key2);
WRITE2.unlock();
HashMapTree<KEY, VALUE> leaf = createLeaf(key1);
leaf = leaf.createLeaf(key2);
VALUE prev = leaf.value;
leaf.value = value;
WRITE.unlock();
return prev;
}
public VALUE putIfAbsent(VALUE value, KEY key1, KEY key2) {
if (key1 == null || key2 == null) {
throw new NullPointerException("keys");
}
Lock WRITE = this.lock.writeLock();
WRITE.lock(); // upgrade to the write lock, at this point blocks other readers
// have to put value into our children
HashMapTree<KEY, VALUE> leaf = createLeaf_NL(key1);
Lock WRITE2 = leaf.lock.writeLock();
WRITE2.lock();
leaf = leaf.createLeaf_NL(key2);
WRITE2.unlock();
VALUE prev = leaf.value;
if (prev == null) {
leaf.value = value;
}
WRITE.unlock();
return prev;
}
public VALUE put(VALUE value, KEY key1, KEY key2, KEY key3) {
public final VALUE put(VALUE value, KEY key1, KEY key2, KEY key3) {
if (key1 == null || key2 == null || key3 == null) {
throw new NullPointerException("keys");
}
Lock WRITE = this.lock.writeLock();
WRITE.lock(); // upgrade to the write lock, at this point blocks other readers
// have to put value into our children
HashMapTree<KEY, VALUE> leaf = createLeaf_NL(key1);
Lock WRITE2 = leaf.lock.writeLock();
WRITE2.lock();
leaf = leaf.createLeaf_NL(key2);
Lock WRITE3 = leaf.lock.writeLock();
WRITE3.lock();
leaf = leaf.createLeaf_NL(key3);
WRITE3.unlock();
WRITE2.unlock();
HashMapTree<KEY, VALUE> leaf = createLeaf(key1);
leaf = leaf.createLeaf(key2);
leaf = leaf.createLeaf(key3);
VALUE prev = leaf.value;
leaf.value = value;
WRITE.unlock();
return prev;
}
public VALUE putIfAbsent(VALUE value, KEY key1, KEY key2, KEY key3) {
if (key1 == null || key2 == null || key3 == null) {
throw new NullPointerException("keys");
}
Lock WRITE = this.lock.writeLock();
WRITE.lock(); // upgrade to the write lock, at this point blocks other readers
// have to put value into our children
HashMapTree<KEY, VALUE> leaf = createLeaf_NL(key1);
Lock WRITE2 = leaf.lock.writeLock();
WRITE2.lock();
leaf = leaf.createLeaf_NL(key2);
Lock WRITE3 = leaf.lock.writeLock();
WRITE3.lock();
leaf = leaf.createLeaf_NL(key3);
WRITE3.unlock();
WRITE2.unlock();
VALUE prev = leaf.value;
if (prev == null) {
leaf.value = value;
}
WRITE.unlock();
return prev;
}
@SuppressWarnings("unchecked")
public VALUE put(VALUE value, KEY... keys) {
public final VALUE put(VALUE value, KEY... keys) {
if (keys == null) {
throw new NullPointerException("keys");
}
int length = keys.length;
Lock[] locks = new Lock[length];
Lock WRITE = this.lock.writeLock();
WRITE.lock(); // upgrade to the write lock, at this point blocks other readers
// have to put value into our children
HashMapTree<KEY, VALUE> leaf = createLeaf_NL(keys[0]);
HashMapTree<KEY, VALUE> leaf = createLeaf(keys[0]);
for (int i=1;i<length;i++) {
locks[i] = leaf.lock.writeLock();
locks[i].lock();
leaf = leaf.createLeaf_NL(keys[i]);
}
for (int i=length-1;i>0;i--) {
locks[i].unlock();
leaf = leaf.createLeaf(keys[i]);
}
VALUE prev = leaf.value;
leaf.value = value;
WRITE.unlock();
return prev;
}
@SuppressWarnings("unchecked")
public VALUE putIfAbsent(VALUE value, KEY... keys) {
if (keys == null) {
throw new NullPointerException("keys");
}
int length = keys.length;
Lock[] locks = new Lock[length];
Lock WRITE = this.lock.writeLock();
WRITE.lock(); // upgrade to the write lock, at this point blocks other readers
// have to put value into our children
HashMapTree<KEY, VALUE> leaf = createLeaf_NL(keys[0]);
for (int i=1;i<length;i++) {
locks[i] = leaf.lock.writeLock();
locks[i].lock();
leaf = leaf.createLeaf_NL(keys[i]);
}
for (int i=length-1;i>0;i--) {
locks[i].unlock();
}
VALUE prev = leaf.value;
if (prev == null) {
leaf.value = value;
}
WRITE.unlock();
return prev;
}
@SuppressWarnings("unchecked")
public HashMapTree<KEY, VALUE> createLeaf(KEY... keys) {
public final HashMapTree<KEY, VALUE> createLeaf(KEY... keys) {
if (keys == null) {
return this;
}
int length = keys.length;
Lock[] locks = new Lock[length];
Lock WRITE = this.lock.writeLock();
WRITE.lock(); // upgrade to the write lock, at this point blocks other readers
int length = keys.length;
// have to put value into our children
HashMapTree<KEY, VALUE> leaf = createLeaf_NL(keys[0]);
HashMapTree<KEY, VALUE> leaf = createLeaf(keys[0]);
for (int i=1;i<length;i++) {
locks[i] = leaf.lock.writeLock();
locks[i].lock();
leaf = leaf.createLeaf_NL(keys[i]);
leaf = leaf.createLeaf(keys[i]);
}
for (int i=length-1;i>0;i--) {
locks[i].unlock();
}
WRITE.unlock();
return leaf;
}
private final HashMapTree<KEY, VALUE> createLeaf_NL(KEY key) {
private HashMapTree<KEY, VALUE> createLeaf(KEY key) {
if (key == null) {
return null;
}
@ -497,105 +297,76 @@ public class HashMapTree<KEY, VALUE> {
/////////////////////////////////////////
/////////////////////////////////////////
public VALUE get(KEY key) {
public final VALUE get(KEY key) {
if (key == null) {
return null;
}
Lock READ = this.lock.readLock();
READ.lock(); // allows other readers, blocks others from acquiring update or write locks
HashMapTree<KEY, VALUE> objectTree = null;
HashMapTree<KEY, VALUE> objectTree;
// publish value from our children
objectTree = getLeaf_NL(key); // protected by lock
objectTree = getLeaf(key); // protected by lock
if (objectTree == null) {
READ.unlock();
return null;
}
VALUE returnValue = objectTree.value;
READ.unlock();
return returnValue;
return objectTree.value;
}
public VALUE get(KEY key1, KEY key2) {
Lock READ = this.lock.readLock();
READ.lock(); // allows other readers, blocks others from acquiring update or write locks
HashMapTree<KEY, VALUE> tree = null;
public final VALUE get(KEY key1, KEY key2) {
HashMapTree<KEY, VALUE> tree;
// publish value from our children
tree = getLeaf_NL(key1); // protected by lock
tree = getLeaf(key1); // protected by lock
if (tree != null) {
tree = tree.getLeaf_NL(key2); // protected by lock
tree = tree.getLeaf(key2); // protected by lock
}
if (tree == null) {
READ.unlock();
return null;
}
VALUE returnValue = tree.value;
READ.unlock();
return returnValue;
return tree.value;
}
public VALUE getValue(KEY key1, KEY key2, KEY key3) {
Lock READ = this.lock.readLock();
READ.lock(); // allows other readers, blocks others from acquiring update or write locks
HashMapTree<KEY, VALUE> tree = null;
public final VALUE getValue(KEY key1, KEY key2, KEY key3) {
HashMapTree<KEY, VALUE> tree;
// publish value from our children
tree = getLeaf_NL(key1);
tree = getLeaf(key1);
if (tree != null) {
tree = tree.getLeaf_NL(key2);
tree = tree.getLeaf(key2);
}
if (tree != null) {
tree = tree.getLeaf_NL(key3);
tree = tree.getLeaf(key3);
}
if (tree == null) {
READ.unlock();
return null;
}
VALUE returnValue = tree.value;
READ.unlock();
return returnValue;
return tree.value;
}
@SuppressWarnings("unchecked")
public VALUE get(KEY... keys) {
Lock READ = this.lock.readLock();
READ.lock(); // allows other readers, blocks others from acquiring update or write locks
public final VALUE get(KEY... keys) {
HashMapTree<KEY, VALUE> tree;
HashMapTree<KEY, VALUE> tree = null;
// publish value from our children
tree = getLeaf_NL(keys[0]);
tree = getLeaf(keys[0]);
int size = keys.length;
for (int i=1;i<size;i++) {
if (tree != null) {
tree = tree.getLeaf_NL(keys[i]);
tree = tree.getLeaf(keys[i]);
} else {
READ.unlock();
return null;
}
}
if (tree == null) {
READ.unlock();
return null;
}
VALUE returnValue = tree.value;
READ.unlock();
return returnValue;
return tree.value;
}
public final HashMapTree<KEY, VALUE> getLeaf(KEY key) {
@ -605,54 +376,39 @@ public class HashMapTree<KEY, VALUE> {
HashMapTree<KEY, VALUE> tree;
Lock READ = this.lock.readLock();
READ.lock(); // allows other readers, blocks others from acquiring update or write locks
if (this.children == null) {
tree = null;
} else {
tree = this.children.get(key);
}
READ.unlock();
return tree;
}
public final HashMapTree<KEY, VALUE> getLeaf(KEY key1, KEY key2) {
HashMapTree<KEY, VALUE> tree = null;
Lock READ = this.lock.readLock();
READ.lock(); // allows other readers, blocks others from acquiring update or write locks
HashMapTree<KEY, VALUE> tree;
// publish value from our children
tree = getLeaf_NL(key1);
tree = getLeaf(key1);
if (tree != null) {
tree = tree.getLeaf_NL(key2);
tree = tree.getLeaf(key2);
}
READ.unlock();
return tree;
}
public final HashMapTree<KEY, VALUE> getLeaf(KEY key1, KEY key2, KEY key3) {
HashMapTree<KEY, VALUE> tree = null;
Lock READ = this.lock.readLock();
READ.lock(); // allows other readers, blocks others from acquiring update or write locks
HashMapTree<KEY, VALUE> tree;
// publish value from our children
tree = getLeaf_NL(key1);
tree = getLeaf(key1);
if (tree != null) {
tree = tree.getLeaf_NL(key2);
tree = tree.getLeaf(key2);
}
if (tree != null) {
tree = tree.getLeaf_NL(key3);
tree = tree.getLeaf(key3);
}
READ.unlock();
return tree;
}
@ -664,36 +420,18 @@ public class HashMapTree<KEY, VALUE> {
return null;
}
Lock READ = this.lock.readLock();
READ.lock(); // allows other readers, blocks others from acquiring update or write locks
HashMapTree<KEY, VALUE> tree = null;
HashMapTree<KEY, VALUE> tree;
// publish value from our children
tree = getLeaf_NL(keys[0]);
tree = getLeaf(keys[0]);
for (int i=1;i<size;i++) {
if (tree != null) {
tree = tree.getLeaf_NL(keys[i]);
tree = tree.getLeaf(keys[i]);
} else {
READ.unlock();
return null;
}
}
READ.unlock();
return tree;
}
private final HashMapTree<KEY, VALUE> getLeaf_NL(KEY key) {
HashMapTree<KEY, VALUE> tree;
if (this.children == null) {
tree = null;
} else {
tree = this.children.get(key);
}
return tree;
}
}

View File

@ -61,5 +61,5 @@ public interface IHandlerInvocation {
* type that the handler consumes
* @param handler The handler (method) that will be called via reflection
*/
void invoke(Object listener, MethodAccess handler, int methodIndex, Object... message) throws Throwable;
void invoke(Object listener, MethodAccess handler, int methodIndex, Object... messages) throws Throwable;
}

View File

@ -7,8 +7,6 @@ import dorkbox.util.messagebus.common.StrongConcurrentSetV8;
import dorkbox.util.messagebus.dispatch.IHandlerInvocation;
import dorkbox.util.messagebus.dispatch.ReflectiveHandlerInvocation;
import dorkbox.util.messagebus.dispatch.SynchronizedHandlerInvocation;
import dorkbox.util.messagebus.error.ErrorHandlingSupport;
import org.omg.CORBA.BooleanHolder;
import java.util.ArrayList;
import java.util.Collection;
@ -43,9 +41,9 @@ public class Subscription {
private final IHandlerInvocation invocation;
private final Collection<Object> listeners;
public Subscription(MessageHandler handler) {
public Subscription(final MessageHandler handler, final float loadFactor, final int stripeSize) {
this.handlerMetadata = handler;
this.listeners = new StrongConcurrentSetV8<Object>(16, 0.85F, 15);
this.listeners = new StrongConcurrentSetV8<Object>(16, loadFactor, stripeSize);
// this.listeners = new StrongConcurrentSet<Object>(16, 0.85F);
// this.listeners = new ConcurrentLinkedQueue2<Object>();
// this.listeners = new CopyOnWriteArrayList<Object>();
@ -62,7 +60,7 @@ public class Subscription {
return handlerMetadata;
}
public Class<?>[] getHandledMessageTypes() {
public final Class<?>[] getHandledMessageTypes() {
return this.handlerMetadata.getHandledMessages();
}
@ -90,13 +88,10 @@ public class Subscription {
}
// only used in unit-test
public int size() {
public final int size() {
return this.listeners.size();
}
/**
* @return true if there were listeners for this publication, false if there was nothing
*/
public final void publish(final Object message) throws Throwable {
final MethodAccess handler = this.handlerMetadata.getHandler();
final int handleIndex = this.handlerMetadata.getMethodIndex();
@ -112,126 +107,59 @@ public class Subscription {
}
}
/**
* @return true if there were listeners for this publication, false if there was nothing
*/
public void publishToSubscription(ErrorHandlingSupport errorHandler, BooleanHolder booleanHolder, Object message1, Object message2) {
// StrongConcurrentSet<Object> listeners = this.listeners;
//
// if (!listeners.isEmpty()) {
// MethodAccess handler = this.handlerMetadata.getHandler();
// int handleIndex = this.handlerMetadata.getMethodIndex();
// IHandlerInvocation invocation = this.invocation;
//
//
// ISetEntry<Object> current = listeners.head;
// Object listener;
// while (current != null) {
// listener = current.getValue();
// current = current.next();
//
// try {
// invocation.invoke(listener, handler, handleIndex, message1, message2);
// } catch (IllegalAccessException e) {
// errorHandler.handlePublicationError(new PublicationError()
// .setMessage("Error during invocation of message handler. " +
// "The class or method is not accessible")
// .setCause(e)
// .setMethodName(handler.getMethodNames()[handleIndex])
// .setListener(listener)
// .setPublishedObject(message1, message2));
// } catch (IllegalArgumentException e) {
// errorHandler.handlePublicationError(new PublicationError()
// .setMessage("Error during invocation of message handler. " +
// "Wrong arguments passed to method. Was: " +
// message1.getClass() + ", " +
// message2.getClass()
// + ". Expected: " + handler.getParameterTypes()[0] + ", " +
// handler.getParameterTypes()[1]
// )
// .setCause(e)
// .setMethodName(handler.getMethodNames()[handleIndex])
// .setListener(listener)
// .setPublishedObject(message1, message2));
// } catch (Throwable e) {
// errorHandler.handlePublicationError(new PublicationError()
// .setMessage("Error during invocation of message handler. " +
// "The Message handler code threw an exception")
// .setCause(e)
// .setMethodName(handler.getMethodNames()[handleIndex])
// .setListener(listener)
// .setPublishedObject(message1, message2));
// }
// }
// booleanHolder.bool = true;
// }
public final void publishToSubscription(final Object message1, final Object message2) throws Throwable {
final MethodAccess handler = this.handlerMetadata.getHandler();
final int handleIndex = this.handlerMetadata.getMethodIndex();
final IHandlerInvocation invocation = this.invocation;
Iterator<Object> iterator;
Object listener;
for (iterator = this.listeners.iterator(); iterator.hasNext(); ) {
listener = iterator.next();
invocation.invoke(listener, handler, handleIndex, message1, message2);
}
}
/**
* @return true if there were listeners for this publication, false if there was nothing
*/
public void publishToSubscription(ErrorHandlingSupport errorHandler, BooleanHolder booleanHolder, Object message1, Object message2, Object message3) {
// StrongConcurrentSet<Object> listeners = this.listeners;
//
// if (!listeners.isEmpty()) {
// MethodAccess handler = this.handlerMetadata.getHandler();
// int handleIndex = this.handlerMetadata.getMethodIndex();
// IHandlerInvocation invocation = this.invocation;
//
//
// ISetEntry<Object> current = listeners.head;
// Object listener;
// while (current != null) {
// listener = current.getValue();
// current = current.next();
//
// try {
// invocation.invoke(listener, handler, handleIndex, message1, message2, message3);
// } catch (IllegalAccessException e) {
// errorHandler.handlePublicationError(new PublicationError()
// .setMessage("Error during invocation of message handler. " +
// "The class or method is not accessible")
// .setCause(e)
// .setMethodName(handler.getMethodNames()[handleIndex])
// .setListener(listener)
// .setPublishedObject(message1, message2, message3));
// } catch (IllegalArgumentException e) {
// errorHandler.handlePublicationError(new PublicationError()
// .setMessage("Error during invocation of message handler. " +
// "Wrong arguments passed to method. Was: " +
// message1.getClass() + ", " +
// message2.getClass() + ", " +
// message3.getClass()
// + ". Expected: " + handler.getParameterTypes()[0] + ", " +
// handler.getParameterTypes()[1] + ", " +
// handler.getParameterTypes()[2]
// )
// .setCause(e)
// .setMethodName(handler.getMethodNames()[handleIndex])
// .setListener(listener)
// .setPublishedObject(message1, message2, message3));
// } catch (Throwable e) {
// errorHandler.handlePublicationError(new PublicationError()
// .setMessage("Error during invocation of message handler. " +
// "The Message handler code threw an exception")
// .setCause(e)
// .setMethodName(handler.getMethodNames()[handleIndex])
// .setListener(listener)
// .setPublishedObject(message1, message2, message3));
// }
// }
// booleanHolder.bool = true;
// }
public final void publishToSubscription(final Object message1, final Object message2, final Object message3) throws Throwable {
final MethodAccess handler = this.handlerMetadata.getHandler();
final int handleIndex = this.handlerMetadata.getMethodIndex();
final IHandlerInvocation invocation = this.invocation;
Iterator<Object> iterator;
Object listener;
for (iterator = this.listeners.iterator(); iterator.hasNext(); ) {
listener = iterator.next();
invocation.invoke(listener, handler, handleIndex, message1, message2, message3);
}
}
public final void publishToSubscription(final Object... messages) throws Throwable {
final MethodAccess handler = this.handlerMetadata.getHandler();
final int handleIndex = this.handlerMetadata.getMethodIndex();
final IHandlerInvocation invocation = this.invocation;
Iterator<Object> iterator;
Object listener;
for (iterator = this.listeners.iterator(); iterator.hasNext(); ) {
listener = iterator.next();
invocation.invoke(listener, handler, handleIndex, messages);
}
}
@Override
public int hashCode() {
public final int hashCode() {
return this.ID;
}
@Override
public boolean equals(Object obj) {
public final boolean equals(Object obj) {
if (this == obj) {
return true;
}
@ -246,17 +174,15 @@ public class Subscription {
}
// inside a write lock
// also puts it into the correct map if it's not already there
public Collection<Subscription> createPublicationSubscriptions(final Map<Class<?>, ArrayList<Subscription>> subsPerMessageSingle,
final HashMapTree<Class<?>, ArrayList<Subscription>> subsPerMessageMulti,
AtomicBoolean varArgPossibility, SubscriptionUtils utils) {
// add this subscription to each of the handled types
// to activate this sub for publication
public void registerForPublication(final Map<Class<?>, ArrayList<Subscription>> subsPerMessageSingle,
final HashMapTree<Class<?>, ArrayList<Subscription>> subsPerMessageMulti,
final AtomicBoolean varArgPossibility, final SubscriptionUtils utils) {
final Class<?>[] messageHandlerTypes = handlerMetadata.getHandledMessages();
final int size = messageHandlerTypes.length;
// ConcurrentSet<Subscription> subsPerType;
// SubscriptionUtils utils = this.utils;
Class<?> type0 = messageHandlerTypes[0];
switch (size) {
@ -269,73 +195,45 @@ public class Subscription {
if (isArray) {
varArgPossibility.lazySet(true);
}
utils.cacheSuperClasses(type0);
subsPerMessageSingle.put(type0, subs);
}
return subs;
subs.add(this);
return;
}
case 2: {
// the HashMapTree uses read/write locks, so it is only accessible one thread at a time
// SubscriptionHolder subHolderSingle = this.subHolderSingle;
// subsPerType = subHolderSingle.publish();
//
// Collection<Subscription> putIfAbsent = subsPerMessageMulti.putIfAbsent(subsPerType, type0, types[1]);
// if (putIfAbsent != null) {
// return putIfAbsent;
// } else {
// subHolderSingle.set(subHolderSingle.initialValue());
//
// // cache the super classes
// utils.cacheSuperClasses(type0);
// utils.cacheSuperClasses(types[1]);
//
// return subsPerType;
// }
ArrayList<Subscription> subs = subsPerMessageMulti.get(type0, messageHandlerTypes[1]);
if (subs == null) {
subs = new ArrayList<Subscription>();
subsPerMessageMulti.put(subs, type0, messageHandlerTypes[1]);
}
subs.add(this);
return;
}
case 3: {
// the HashMapTree uses read/write locks, so it is only accessible one thread at a time
// SubscriptionHolder subHolderSingle = this.subHolderSingle;
// subsPerType = subHolderSingle.publish();
//
// Collection<Subscription> putIfAbsent = subsPerMessageMulti.putIfAbsent(subsPerType, type0, types[1], types[2]);
// if (putIfAbsent != null) {
// return putIfAbsent;
// } else {
// subHolderSingle.set(subHolderSingle.initialValue());
//
// // cache the super classes
// utils.cacheSuperClasses(type0);
// utils.cacheSuperClasses(types[1]);
// utils.cacheSuperClasses(types[2]);
//
// return subsPerType;
// }
ArrayList<Subscription> subs = subsPerMessageMulti.get(type0, messageHandlerTypes[1], messageHandlerTypes[2]);
if (subs == null) {
subs = new ArrayList<Subscription>();
subsPerMessageMulti.put(subs, type0, messageHandlerTypes[1], messageHandlerTypes[2]);
}
subs.add(this);
return;
}
default: {
// the HashMapTree uses read/write locks, so it is only accessible one thread at a time
// SubscriptionHolder subHolderSingle = this.subHolderSingle;
// subsPerType = subHolderSingle.publish();
//
// Collection<Subscription> putIfAbsent = subsPerMessageMulti.putIfAbsent(subsPerType, types);
// if (putIfAbsent != null) {
// return putIfAbsent;
// } else {
// subHolderSingle.set(subHolderSingle.initialValue());
//
// Class<?> c;
// int length = types.length;
// for (int i = 0; i < length; i++) {
// c = types[i];
//
// // cache the super classes
// utils.cacheSuperClasses(c);
// }
//
// return subsPerType;
// }
return null;
ArrayList<Subscription> subs = subsPerMessageMulti.get(messageHandlerTypes);
if (subs == null) {
subs = new ArrayList<Subscription>();
subsPerMessageMulti.put(subs, messageHandlerTypes);
}
subs.add(this);
return;
}
}
}

View File

@ -54,28 +54,29 @@ public final class SubscriptionManager {
private final VarArgUtils varArgUtils;
private final StampedLock lock = new StampedLock();
private final int numberOfThreads;
public SubscriptionManager(int numberOfThreads) {
float loadFactor = SubscriptionManager.LOAD_FACTOR;
this.numberOfThreads = numberOfThreads;
// modified ONLY during SUB/UNSUB
{
this.nonListeners = new ConcurrentHashMapV8<Class<?>, Boolean>(4, loadFactor, numberOfThreads);
this.nonListeners = new ConcurrentHashMapV8<Class<?>, Boolean>(4, LOAD_FACTOR, numberOfThreads);
this.subscriptionsPerMessageSingle = new ConcurrentHashMapV8<Class<?>, ArrayList<Subscription>>(32, LOAD_FACTOR, 1);
this.subscriptionsPerMessageMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>(4, loadFactor);
this.subscriptionsPerMessageMulti = new HashMapTree<Class<?>, ArrayList<Subscription>>(4, LOAD_FACTOR);
// only used during SUB/UNSUB
this.subscriptionsPerListener = new ConcurrentHashMapV8<Class<?>, Subscription[]>(32, LOAD_FACTOR, 1);
}
final SuperClassUtils superClass = new SuperClassUtils(loadFactor, 1);
this.utils = new SubscriptionUtils(superClass, this.subscriptionsPerMessageSingle, this.subscriptionsPerMessageMulti, loadFactor,
final SuperClassUtils superClass = new SuperClassUtils(LOAD_FACTOR, 1);
this.utils = new SubscriptionUtils(superClass, this.subscriptionsPerMessageSingle, this.subscriptionsPerMessageMulti, LOAD_FACTOR,
numberOfThreads);
// var arg subscriptions keep track of which subscriptions can handle varArgs. SUB/UNSUB dumps it, so it is recreated dynamically.
// it's a hit on SUB/UNSUB, but improves performance of handlers
this.varArgUtils = new VarArgUtils(this.utils, superClass, this.subscriptionsPerMessageSingle, loadFactor, numberOfThreads);
this.varArgUtils = new VarArgUtils(this.utils, superClass, this.subscriptionsPerMessageSingle, LOAD_FACTOR, numberOfThreads);
}
public final void shutdown() {
@ -126,7 +127,6 @@ public final class SubscriptionManager {
final HashMapTree<Class<?>, ArrayList<Subscription>> subsPerMessageMulti = this.subscriptionsPerMessageMulti;
final Subscription[] subsPerListener = new Subscription[handlersSize];
Collection<Subscription> subsForPublication;
// create the subscription
MessageHandler messageHandler;
@ -136,12 +136,15 @@ public final class SubscriptionManager {
messageHandler = messageHandlers[i];
// create the subscription
subscription = new Subscription(messageHandler);
subscription = new Subscription(messageHandler, LOAD_FACTOR, numberOfThreads);
subscription.subscribe(listener);
subsPerListener[i] = subscription; // activates this sub for sub/unsub
}
final ConcurrentMap<Class<?>, Subscription[]> subsPerListenerMap = this.subscriptionsPerListener;
final AtomicBoolean varArgPossibility = this.varArgPossibility;
final SubscriptionUtils utils = this.utils;
// now write lock for the least expensive part. This is a deferred "double checked lock", but is necessary because
// of the huge number of reads compared to writes.
@ -149,23 +152,16 @@ public final class SubscriptionManager {
final StampedLock lock = this.lock;
final long stamp = lock.writeLock();
final ConcurrentMap<Class<?>, Subscription[]> subsPerListenerMap = this.subscriptionsPerListener;
subscriptions = subsPerListenerMap.get(listenerClass);
// it was still null, so we actually have to create the rest of the subs
if (subscriptions == null) {
final AtomicBoolean varArgPossibility = this.varArgPossibility;
final SubscriptionUtils utils = this.utils;
for (int i = 0; i < handlersSize; i++) {
subscription = subsPerListener[i];
// now add this subscription to each of the handled types
subsForPublication = subscription
.createPublicationSubscriptions(subsPerMessageSingle, subsPerMessageMulti, varArgPossibility, utils);
//noinspection ConstantConditions
subsForPublication.add(subscription); // activates this sub for publication
// to activate this sub for publication
subscription.registerForPublication(subsPerMessageSingle, subsPerMessageMulti, varArgPossibility, utils);
}
subsPerListenerMap.put(listenerClass, subsPerListener);
@ -182,7 +178,6 @@ public final class SubscriptionManager {
// subscriptions already exist and must only be updated
// only publish here if our single-check was OK, or our double-check was OK
Subscription subscription;
for (int i = 0; i < subscriptions.length; i++) {
subscription = subscriptions[i];
subscription.subscribe(listener);

View File

@ -1,16 +1,5 @@
package dorkbox.util.messagebus;
import java.lang.ref.WeakReference;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedTransferQueue;
import dorkbox.util.messagebus.annotations.Handler;
import dorkbox.util.messagebus.common.ConcurrentHashMapV8;
import dorkbox.util.messagebus.common.MessageHandler;
@ -20,6 +9,11 @@ import dorkbox.util.messagebus.common.thread.ConcurrentLinkedQueue2;
import dorkbox.util.messagebus.common.thread.ConcurrentSet;
import dorkbox.util.messagebus.subscription.Subscription;
import java.lang.ref.WeakReference;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedTransferQueue;
public class PerfTest_Collections {
public static final int REPETITIONS = 10 * 1000 * 100;
@ -75,7 +69,7 @@ public class PerfTest_Collections {
for (int i=0;i<size;i++) {
for (MessageHandler x : allHandlers) {
set.add(new Subscription(x));
set.add(new Subscription(x, .85F, 1));
}
}