Small amount of code polish. 186/93 ns/op

This commit is contained in:
nathan 2015-02-22 23:10:15 +01:00
parent 0171a5558c
commit b42eb86531
6 changed files with 12 additions and 3691 deletions

View File

@ -1,5 +1,6 @@
package net.engio.mbassy.multi;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
@ -27,7 +28,7 @@ public class MultiMBassador implements IMessageBus {
// error handling is first-class functionality
// this handler will receive all errors that occur during message dispatch or message handling
private final List<IPublicationErrorHandler> errorHandlers = new ArrayList<IPublicationErrorHandler>();
private final Collection<IPublicationErrorHandler> errorHandlers = new ArrayDeque<IPublicationErrorHandler>();
private final SubscriptionManager subscriptionManager;
private final TransferQueue<Runnable> dispatchQueue = new LinkedTransferQueue<Runnable>();
@ -62,9 +63,11 @@ public class MultiMBassador implements IMessageBus {
while (true) {
try {
counter = 200;
counter = 500;
while ((event = IN_QUEUE.poll()) == null) {
if (counter > 0) {
if (counter > 200) {
--counter;
} else if (counter > 0) {
--counter;
LockSupport.parkNanos(1L);
} else {
@ -149,15 +152,12 @@ public class MultiMBassador implements IMessageBus {
}
}
// FastEntrySet<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass);
Collection<Subscription> superSubscriptions = manager.getSuperSubscriptions(messageClass);
// now get superClasses
if (superSubscriptions != null && !superSubscriptions.isEmpty()) {
// ObjectIterator<Entry<Subscription>> fastIterator = superSubscriptions.fastIterator();
Iterator<Subscription> fastIterator = superSubscriptions.iterator();
while (fastIterator.hasNext()) {
// Subscription sub = fastIterator.next().getKey();
Subscription sub = fastIterator.next();
// this catches all exception types

View File

@ -1,181 +0,0 @@
/*
* Copyright 2006-2008 Makoto YUI
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Contributors:
* Hanson Char - implemented and released to the public domain.
* Makoto YUI - imported and fixed bug in take().
*/
package net.engio.mbassy.multi.common;
import java.util.AbstractQueue;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public final class BoundedTransferQueue<E> extends AbstractQueue<E> implements TransferQueue<E> {
private final int _maxCapacity;
private final AtomicInteger _remainingCapacity;
private final TransferQueue<E> _queue;
public BoundedTransferQueue(int capacity) {
if(capacity < 1) {
throw new IllegalArgumentException();
}
this._maxCapacity = capacity;
this._remainingCapacity = new AtomicInteger(capacity);
this._queue = new LinkedTransferQueue<E>();
}
@Override
public boolean offer(E e) {
if(tryDecrementCapacity()) {
return this._queue.offer(e);
}
return false;
}
@Override
public E poll() {
final E e = this._queue.poll();
if(e != null) {
this._remainingCapacity.incrementAndGet();
}
return e;
}
@Override
public void put(E e) throws InterruptedException {
if (tryDecrementCapacity()) {
this._queue.put(e);
} else {
this._queue.transfer(e);
this._remainingCapacity.decrementAndGet();
}
}
@Override
public E take() throws InterruptedException {
E e = this._queue.take();
this._remainingCapacity.incrementAndGet();
return e;
}
@Override
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
if (tryDecrementCapacity()) {
return this._queue.offer(e, timeout, unit);
} else {
final boolean succeed = this._queue.tryTransfer(e, timeout, unit);
if (succeed) {
this._remainingCapacity.decrementAndGet();
}
return succeed;
}
}
@Override
public E poll(final long timeout, final TimeUnit unit) throws InterruptedException {
final E e = this._queue.poll(timeout, unit);
if (e != null) {
this._remainingCapacity.incrementAndGet();
}
return e;
}
private boolean tryDecrementCapacity() {
int capacity;
do {
capacity = this._remainingCapacity.get();
if (capacity == 0) {
return false;
}
} while(!this._remainingCapacity.compareAndSet(capacity, capacity - 1));
return true;
}
// -------------------------------------------------------
// delegates everything
@Override
public int remainingCapacity() {
return this._remainingCapacity.get();
}
@Override
public int drainTo(Collection<? super E> c, int maxElements) {
return this._queue.drainTo(c, maxElements);
}
@Override
public int drainTo(Collection<? super E> c) {
return this._queue.drainTo(c);
}
@Override
public Iterator<E> iterator() {
return this._queue.iterator();
}
@Override
public E peek() {
return this._queue.peek();
}
@Override
public int size() {
return this._queue.size();
}
@Override
public void clear() {
this._queue.clear();
this._remainingCapacity.set(this._maxCapacity);
}
@Override
public boolean tryTransfer(E e) {
boolean tryTransfer = this._queue.tryTransfer(e);
if (tryTransfer) {
this._remainingCapacity.decrementAndGet();
}
return tryTransfer;
}
@Override
public void transfer(E e) throws InterruptedException {
this._queue.transfer(e);
this._remainingCapacity.decrementAndGet();
}
@Override
public boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException {
boolean tryTransfer = this._queue.tryTransfer(e, timeout, unit);
return tryTransfer;
}
@Override
public boolean hasWaitingConsumer() {
return this._queue.hasWaitingConsumer();
}
@Override
public int getWaitingConsumerCount() {
return this._queue.getWaitingConsumerCount();
}
}

View File

@ -78,9 +78,5 @@ public class StrongConcurrentSet<T> extends AbstractConcurrentSet<T> {
public T getValue() {
return this.value;
}
}
}

View File

@ -93,55 +93,8 @@ public class SubscriptionManager {
for (Subscription subscription : subscriptions) {
subscription.unsubscribe(listener);
// boolean isEmpty = subscription.isEmpty();
//
// if (isEmpty) {
// // single or multi?
// Class<?>[] handledMessageTypes = subscription.getHandledMessageTypes();
// int size = handledMessageTypes.length;
// if (size == 1) {
// // single
// Class<?> clazz = handledMessageTypes[0];
//
// // NOTE: Order is important for safe publication
// Collection<Subscription> subs = this.subscriptionsPerMessageSingle.get(clazz);
// if (subs != null) {
// subs.remove(subscription);
//
// if (subs.isEmpty()) {
// // remove element
// this.subscriptionsPerMessageSingle.remove(clazz);
//
// resetSuperClassSubs();
// }
// }
// } else {
// // NOTE: Not thread-safe! must be synchronized in outer scope
// IdentityObjectTree<Class<?>, Collection<Subscription>> tree;
//
// switch (size) {
// case 2: tree = this.subscriptionsPerMessageMulti.getLeaf(handledMessageTypes[0], handledMessageTypes[1]); break;
// case 3: tree = this.subscriptionsPerMessageMulti.getLeaf(handledMessageTypes[1], handledMessageTypes[1], handledMessageTypes[2]); break;
// default: tree = this.subscriptionsPerMessageMulti.getLeaf(handledMessageTypes); break;
// }
//
// if (tree != null) {
// Collection<Subscription> subs = tree.getValue();
// if (subs != null) {
// subs.remove(subscription);
//
// if (subs.isEmpty()) {
// // remove tree element
// switch (size) {
// case 2: this.subscriptionsPerMessageMulti.remove(handledMessageTypes[0], handledMessageTypes[1]); break;
// case 3: this.subscriptionsPerMessageMulti.remove(handledMessageTypes[1], handledMessageTypes[1], handledMessageTypes[2]); break;
// default: this.subscriptionsPerMessageMulti.remove(handledMessageTypes); break;
// }
// }
// }
// }
// }
// }
// purposefully DO NOT do anything else. We keep references to Class<?>/subscription, because
// it acts as a "cache" of sorts, so that future add operations are quicker.
}
}
@ -340,27 +293,23 @@ public class SubscriptionManager {
// }
}
// must be protected by read lock
// CAN RETURN NULL - not thread safe.
// CAN RETURN NULL
public final Collection<Subscription> getSubscriptionsByMessageType(Class<?> messageType) {
return this.subscriptionsPerMessageSingle.get(messageType);
}
// must be protected by read lock
// CAN RETURN NULL - not thread safe.
// CAN RETURN NULL
public final Collection<Subscription> getSubscriptionsByMessageType(Class<?> messageType1, Class<?> messageType2) {
return this.subscriptionsPerMessageMulti.getValue(messageType1, messageType2);
}
// must be protected by read lock
// CAN RETURN NULL - not thread safe.
// CAN RETURN NULL
public final Collection<Subscription> getSubscriptionsByMessageType(Class<?> messageType1, Class<?> messageType2, Class<?> messageType3) {
return this.subscriptionsPerMessageMulti.getValue(messageType1, messageType2, messageType3);
}
// must be protected by read lock
// CAN RETURN NULL - not thread safe.
// CAN RETURN NULL
public final Collection<Subscription> getSubscriptionsByMessageType(Class<?>... messageTypes) {
return this.subscriptionsPerMessageMulti.getValue(messageTypes);
}