From 763718c1c938a975f5b76e19355218e0b2b61fc4 Mon Sep 17 00:00:00 2001 From: nathan Date: Fri, 9 Feb 2018 23:03:33 +0100 Subject: [PATCH] Cleaned up duplicate code in the state connection managers --- .../network/connection/ConnectionManager.java | 145 ++++++++++-------- .../listenerManagement/ConcurrentManager.java | 91 +++++++++++ .../OnConnectedManager.java | 105 +------------ .../OnDisconnectedManager.java | 113 +------------- .../listenerManagement/OnIdleManager.java | 111 +------------- .../OnMessageReceivedManager.java | 42 +++-- 6 files changed, 211 insertions(+), 396 deletions(-) create mode 100644 src/dorkbox/network/connection/listenerManagement/ConcurrentManager.java diff --git a/src/dorkbox/network/connection/ConnectionManager.java b/src/dorkbox/network/connection/ConnectionManager.java index dfd45d70..e7a19cc5 100644 --- a/src/dorkbox/network/connection/ConnectionManager.java +++ b/src/dorkbox/network/connection/ConnectionManager.java @@ -23,6 +23,7 @@ import org.slf4j.Logger; import com.esotericsoftware.kryo.util.IdentityMap; +import dorkbox.network.connection.Listener.OnConnected; import dorkbox.network.connection.bridge.ConnectionBridgeServer; import dorkbox.network.connection.bridge.ConnectionExceptSpecifiedBridgeServer; import dorkbox.network.connection.listenerManagement.OnConnectedManager; @@ -33,6 +34,8 @@ import dorkbox.util.Property; import dorkbox.util.collections.ConcurrentEntry; import dorkbox.util.generics.ClassHelper; import dorkbox.util.generics.TypeResolver; +import io.netty.util.concurrent.ImmediateEventExecutor; +import io.netty.util.concurrent.Promise; // .equals() compares the identity on purpose,this because we cannot create two separate objects that are somehow equal to each other. @SuppressWarnings("unchecked") @@ -62,10 +65,10 @@ class ConnectionManager implements Listeners, ISessionMana private final String loggerName; - private final OnConnectedManager onConnectedManager; - private final OnDisconnectedManager onDisconnectedManager; - private final OnIdleManager onIdleManager; - private final OnMessageReceivedManager onMessageReceivedManager; + private final OnConnectedManager onConnectedManager; + private final OnDisconnectedManager onDisconnectedManager; + private final OnIdleManager onIdleManager; + private final OnMessageReceivedManager onMessageReceivedManager; @SuppressWarnings({"FieldCanBeLocal", "unused"}) private volatile ConcurrentEntry connectionsHead = null; // reference to the first element @@ -90,7 +93,7 @@ class ConnectionManager implements Listeners, ISessionMana */ private final Class baseClass; protected final org.slf4j.Logger logger; - private final AtomicBoolean hasAddedAtLeastOnce = new AtomicBoolean(false); + private final AtomicBoolean hasAtLeastOneListener = new AtomicBoolean(false); final AtomicBoolean shutdown = new AtomicBoolean(false); @@ -99,10 +102,10 @@ class ConnectionManager implements Listeners, ISessionMana this.logger = org.slf4j.LoggerFactory.getLogger(loggerName); this.baseClass = baseClass; - onConnectedManager = new OnConnectedManager(logger); - onDisconnectedManager = new OnDisconnectedManager(logger); - onIdleManager = new OnIdleManager(logger); - onMessageReceivedManager = new OnMessageReceivedManager(logger); + onConnectedManager = new OnConnectedManager(logger); + onDisconnectedManager = new OnDisconnectedManager(logger); + onIdleManager = new OnIdleManager(logger); + onMessageReceivedManager = new OnMessageReceivedManager(logger); } /** @@ -166,19 +169,19 @@ class ConnectionManager implements Listeners, ISessionMana found = true; } - final Logger logger2 = this.logger; - if (!found) { - logger2.error("No matching listener types. Unable to add listener: {}", - listener.getClass() - .getName()); + if (found) { + hasAtLeastOneListener.set(true); + + if (logger.isTraceEnabled()) { + logger.trace("listener added: {}", + listener.getClass() + .getName()); + } } else { - hasAddedAtLeastOnce.set(true); - if (logger2.isTraceEnabled()) { - logger2.trace("listener added: {}", - listener.getClass() - .getName()); - } + logger.error("No matching listener types. Unable to add listener: {}", + listener.getClass() + .getName()); } } @@ -199,30 +202,53 @@ class ConnectionManager implements Listeners, ISessionMana } boolean found = false; + int remainingListeners = 0; + if (listener instanceof Listener.OnConnected) { - found = onConnectedManager.remove((Listener.OnConnected) listener); + int size = onConnectedManager.removeWithSize((OnConnected) listener); + if (size >= 0) { + remainingListeners += size; + found = true; + } } if (listener instanceof Listener.OnDisconnected) { - found |= onDisconnectedManager.remove((Listener.OnDisconnected) listener); + int size = onDisconnectedManager.removeWithSize((Listener.OnDisconnected) listener); + if (size >= 0) { + remainingListeners += size; + found |= true; + } } if (listener instanceof Listener.OnIdle) { - found |= onIdleManager.remove((Listener.OnIdle) listener); + int size = onIdleManager.removeWithSize((Listener.OnIdle) listener); + if (size >= 0) { + remainingListeners += size; + found |= true; + } } if (listener instanceof Listener.OnMessageReceived) { - found |= onMessageReceivedManager.remove((Listener.OnMessageReceived) listener); + int size = onMessageReceivedManager.removeWithSize((Listener.OnMessageReceived) listener); + if (size >= 0) { + remainingListeners += size; + found |= true; + } } - final Logger logger2 = this.logger; - if (!found) { - logger2.error("No matching listener types. Unable to remove listener: {}", - listener.getClass() - .getName()); + if (found) { + if (remainingListeners == 0) { + hasAtLeastOneListener.set(false); + } + if (logger.isTraceEnabled()) { + logger.trace("listener removed: {}", + listener.getClass() + .getName()); + } } - else if (logger2.isTraceEnabled()) { - logger2.trace("listener removed: {}", - listener.getClass() - .getName()); + else { + logger.error("No matching listener types. Unable to remove listener: {}", + listener.getClass() + .getName()); + } return this; @@ -300,7 +326,7 @@ class ConnectionManager implements Listeners, ISessionMana message = connection.fixupRmi(message); - foundListener |= onMessageReceivedManager.notifyReceived(connection, message, shutdown); + foundListener |= onMessageReceivedManager.notifyReceived((C) connection, message, shutdown); // now have to account for additional connection listener managers (non-global). // access a snapshot of the managers (single-writer-principle) @@ -315,8 +341,7 @@ class ConnectionManager implements Listeners, ISessionMana // only run a flush once if (foundListener) { - connection.send() - .flush(); + connection.flush(); } else { this.logger.warn("----------- LISTENER NOT REGISTERED FOR TYPE: {}", @@ -331,12 +356,11 @@ class ConnectionManager implements Listeners, ISessionMana */ @Override public final - void onIdle(final Connection connection) { - boolean foundListener = onIdleManager.notifyIdle(connection, shutdown); + void onIdle(final ConnectionImpl connection) { + boolean foundListener = onIdleManager.notifyIdle((C) connection, shutdown); if (foundListener) { - connection.send() - .flush(); + connection.flush(); } // now have to account for additional (local) listener managers. @@ -353,14 +377,13 @@ class ConnectionManager implements Listeners, ISessionMana */ @Override public - void onConnected(final Connection connection) { + void onConnected(final ConnectionImpl connection) { addConnection(connection); - boolean foundListener = onConnectedManager.notifyConnected(connection, shutdown); + boolean foundListener = onConnectedManager.notifyConnected((C) connection, shutdown); if (foundListener) { - connection.send() - .flush(); + connection.flush(); } // now have to account for additional (local) listener managers. @@ -377,12 +400,11 @@ class ConnectionManager implements Listeners, ISessionMana */ @Override public - void onDisconnected(final Connection connection) { - boolean foundListener = onDisconnectedManager.notifyDisconnected(connection, shutdown); + void onDisconnected(final ConnectionImpl connection) { + boolean foundListener = onDisconnectedManager.notifyDisconnected((C) connection, shutdown); if (foundListener) { - connection.send() - .flush(); + connection.flush(); } // now have to account for additional (local) listener managers. @@ -550,7 +572,7 @@ class ConnectionManager implements Listeners, ISessionMana */ final boolean hasListeners() { - return hasAddedAtLeastOnce.get(); + return hasAtLeastOneListener.get(); } /** @@ -613,6 +635,12 @@ class ConnectionManager implements Listeners, ISessionMana throw new UnsupportedOperationException("Method not implemented"); } + @Override + public + void write(final Object object) { + throw new UnsupportedOperationException("Method not implemented"); + } + /** * Exposes methods to send the object to all server connections (except the specified one) over the network. (or via LOCAL when it's a * local channel). @@ -623,25 +651,10 @@ class ConnectionManager implements Listeners, ISessionMana return this; } - /** - * This will flush the data from EVERY connection on this server. - *

- * THIS WILL BE SLOW! - * - * @see dorkbox.network.connection.ConnectionPoint#flush() - */ @Override public - void flush() { - ConcurrentEntry current = connectionsREF.get(this); - Connection c; - while (current != null) { - c = current.getValue(); - current = current.next(); - - c.send() - .flush(); - } + Promise newPromise() { + return ImmediateEventExecutor.INSTANCE.newPromise(); } /** diff --git a/src/dorkbox/network/connection/listenerManagement/ConcurrentManager.java b/src/dorkbox/network/connection/listenerManagement/ConcurrentManager.java new file mode 100644 index 00000000..8064c644 --- /dev/null +++ b/src/dorkbox/network/connection/listenerManagement/ConcurrentManager.java @@ -0,0 +1,91 @@ +/* + * Copyright 2010 dorkbox, llc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dorkbox.network.connection.listenerManagement; + +import java.util.concurrent.atomic.AtomicBoolean; + +import org.slf4j.Logger; + +import dorkbox.network.connection.Connection; +import dorkbox.network.connection.Listener; +import dorkbox.network.connection.Listener.OnError; +import dorkbox.util.collections.ConcurrentEntry; +import dorkbox.util.collections.ConcurrentIterator; + +public abstract +class ConcurrentManager extends ConcurrentIterator { + + private final Logger logger; + + ConcurrentManager(final Logger logger) { + this.logger = logger; + } + + @Override + public synchronized + void add(final T listener) { + super.add(listener); + } + + /** + * The returned value indicates how many listeners are left in this manager + * + * @return >= 0 if the listener was removed, -1 otherwise + */ + public synchronized + int removeWithSize(final T listener) { + boolean removed = super.remove(listener); + + if (removed) { + return super.size(); + } + else { + return -1; + } + } + + /** + * @return true if a listener was found, false otherwise + */ + @SuppressWarnings("unchecked") + boolean doAction(final C connection, final AtomicBoolean shutdown) { + // access a snapshot (single-writer-principle) + ConcurrentEntry head = headREF.get(this); + ConcurrentEntry current = head; + + T listener; + while (current != null && !shutdown.get()) { + listener = current.getValue(); + current = current.next(); + + // Concurrent iteration... + try { + listenerAction(connection, listener); + } catch (Exception e) { + if (listener instanceof OnError) { + ((OnError) listener).error(connection, e); + } + else { + logger.error("Unable to notify listener '{}', connection '{}'.", listener, connection, e); + } + } + } + + return head != null; // true if we have something, otherwise false + } + + abstract void listenerAction(final C connection, final T listener) throws Exception; +} diff --git a/src/dorkbox/network/connection/listenerManagement/OnConnectedManager.java b/src/dorkbox/network/connection/listenerManagement/OnConnectedManager.java index 58173098..89156af9 100644 --- a/src/dorkbox/network/connection/listenerManagement/OnConnectedManager.java +++ b/src/dorkbox/network/connection/listenerManagement/OnConnectedManager.java @@ -16,129 +16,38 @@ package dorkbox.network.connection.listenerManagement; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.slf4j.Logger; -import com.esotericsoftware.kryo.util.IdentityMap; - import dorkbox.network.connection.Connection; -import dorkbox.network.connection.ConnectionManager; import dorkbox.network.connection.Listener.OnConnected; -import dorkbox.network.connection.Listener.OnError; -import dorkbox.util.collections.ConcurrentEntry; /** * Called when the remote end has been connected. This will be invoked before any objects are received by the network. * This method should not block for long periods as other network activity will not be processed * until it returns. */ -@SuppressWarnings("Duplicates") public final -class OnConnectedManager { - // Recommended for best performance while adhering to the "single writer principle". Must be static-final - private static final AtomicReferenceFieldUpdater REF = AtomicReferenceFieldUpdater.newUpdater( - OnConnectedManager.class, - ConcurrentEntry.class, - "head_"); - - private final Logger logger; - - // - // The iterators for IdentityMap are NOT THREAD SAFE! - // - // This is only touched by a single thread, maintains a map of entries for FAST lookup during listener remove. - private final IdentityMap entries = new IdentityMap(32, - ConnectionManager.LOAD_FACTOR); - private volatile ConcurrentEntry head_ = null; // reference to the first element +class OnConnectedManager extends ConcurrentManager> { // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our // use-case 99% of the time) public OnConnectedManager(final Logger logger) { - this.logger = logger; - } - - public synchronized - void add(final OnConnected listener) { - // access a snapshot (single-writer-principle) - ConcurrentEntry head = REF.get(this); - - if (!entries.containsKey(listener)) { - head = new ConcurrentEntry(listener, head); - - entries.put(listener, head); - - // save this snapshot back to the original (single writer principle) - REF.lazySet(this, head); - } - } - - /** - * @return true if the listener was removed, false otherwise - */ - public synchronized - boolean remove(final OnConnected listener) { - // access a snapshot (single-writer-principle) - ConcurrentEntry concurrentEntry = entries.get(listener); - - if (concurrentEntry != null) { - ConcurrentEntry head = REF.get(this); - - if (concurrentEntry == head) { - // if it was second, now it's first - head = head.next(); - //oldHead.clear(); // optimize for GC not possible because of potentially running iterators - } - else { - concurrentEntry.remove(); - } - - // save this snapshot back to the original (single writer principle) - REF.lazySet(this, head); - entries.remove(listener); - return true; - } - else { - return false; - } + super(logger); } /** * @return true if a listener was found, false otherwise */ public - boolean notifyConnected(final C connection, final AtomicBoolean shutdown) { - ConcurrentEntry> head = REF.get(this); - ConcurrentEntry> current = head; - - OnConnected listener; - while (current != null && !shutdown.get()) { - listener = current.getValue(); - current = current.next(); - - try { - listener.connected(connection); - } catch (Exception e) { - if (listener instanceof OnError) { - ((OnError) listener).error(connection, e); - } - else { - logger.error("Unable to notify listener on 'connected' for listener '{}', connection '{}'.", listener, connection, e); - } - } - } - - return head != null; // true if we have something, otherwise false + boolean notifyConnected(final C connection, final AtomicBoolean shutdown) { + return doAction(connection, shutdown); } - /** - * called on shutdown - */ - public synchronized - void clear() { - this.entries.clear(); - this.head_ = null; + @Override + void listenerAction(final C connection, final OnConnected listener) throws Exception { + listener.connected(connection); } } diff --git a/src/dorkbox/network/connection/listenerManagement/OnDisconnectedManager.java b/src/dorkbox/network/connection/listenerManagement/OnDisconnectedManager.java index f359491e..9d43c1e3 100644 --- a/src/dorkbox/network/connection/listenerManagement/OnDisconnectedManager.java +++ b/src/dorkbox/network/connection/listenerManagement/OnDisconnectedManager.java @@ -16,134 +16,35 @@ package dorkbox.network.connection.listenerManagement; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.slf4j.Logger; -import com.esotericsoftware.kryo.util.IdentityMap; - import dorkbox.network.connection.Connection; -import dorkbox.network.connection.ConnectionManager; import dorkbox.network.connection.Listener.OnDisconnected; -import dorkbox.network.connection.Listener.OnError; -import dorkbox.util.collections.ConcurrentEntry; /** * Called when the remote end has been connected. This will be invoked before any objects are received by the network. * This method should not block for long periods as other network activity will not be processed * until it returns. */ -@SuppressWarnings("Duplicates") public final -class OnDisconnectedManager { - // Recommended for best performance while adhering to the "single writer principle". Must be static-final - private static final AtomicReferenceFieldUpdater REF = AtomicReferenceFieldUpdater.newUpdater( - OnDisconnectedManager.class, - ConcurrentEntry.class, - "head_"); +class OnDisconnectedManager extends ConcurrentManager> { - private final Logger logger; - - // - // The iterators for IdentityMap are NOT THREAD SAFE! - // - // This is only touched by a single thread, maintains a map of entries for FAST lookup during listener remove. - private final IdentityMap entries = new IdentityMap(32, - ConnectionManager.LOAD_FACTOR); - - private volatile ConcurrentEntry head_ = null; // reference to the first element - - // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this - // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our - // use-case 99% of the time) public OnDisconnectedManager(final Logger logger) { - this.logger = logger; + super(logger); } - public synchronized - void add(final OnDisconnected listener) { - // access a snapshot (single-writer-principle) - ConcurrentEntry head = REF.get(this); - - if (!entries.containsKey(listener)) { - head = new ConcurrentEntry(listener, head); - - entries.put(listener, head); - - // save this snapshot back to the original (single writer principle) - REF.lazySet(this, head); - } - } - - /** - * @return true if the listener was removed, false otherwise - */ - public synchronized - boolean remove(final OnDisconnected listener) { - // access a snapshot (single-writer-principle) - ConcurrentEntry concurrentEntry = entries.get(listener); - - if (concurrentEntry != null) { - ConcurrentEntry head = REF.get(this); - - if (concurrentEntry == head) { - // if it was second, now it's first - head = head.next(); - //oldHead.clear(); // optimize for GC not possible because of potentially running iterators - } - else { - concurrentEntry.remove(); - } - - // save this snapshot back to the original (single writer principle) - REF.lazySet(this, head); - entries.remove(listener); - return true; - } - else { - return false; - } - } - - /** * @return true if a listener was found, false otherwise */ public - boolean notifyDisconnected(final C connection, final AtomicBoolean shutdown) { - ConcurrentEntry> head = REF.get(this); - ConcurrentEntry> current = head; - - OnDisconnected listener; - while (current != null && !shutdown.get()) { - listener = current.getValue(); - current = current.next(); - - try { - listener.disconnected(connection); - } catch (Exception e) { - if (listener instanceof OnError) { - ((OnError) listener).error(connection, e); - } - else { - logger.error("Unable to notify listener on 'disconnected' for listener '{}', connection '{}'.", - listener, - connection, - e); - } - } - } - - return head != null; // true if we have something, otherwise false + boolean notifyDisconnected(final C connection, final AtomicBoolean shutdown) { + return doAction(connection, shutdown); } - /** - * called on shutdown - */ - public synchronized - void clear() { - this.entries.clear(); - this.head_ = null; + @Override + void listenerAction(final C connection, final OnDisconnected listener) throws Exception { + listener.disconnected(connection); } } diff --git a/src/dorkbox/network/connection/listenerManagement/OnIdleManager.java b/src/dorkbox/network/connection/listenerManagement/OnIdleManager.java index fd97c858..3562bd40 100644 --- a/src/dorkbox/network/connection/listenerManagement/OnIdleManager.java +++ b/src/dorkbox/network/connection/listenerManagement/OnIdleManager.java @@ -16,132 +16,35 @@ package dorkbox.network.connection.listenerManagement; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.slf4j.Logger; -import com.esotericsoftware.kryo.util.IdentityMap; - import dorkbox.network.connection.Connection; -import dorkbox.network.connection.ConnectionManager; -import dorkbox.network.connection.Listener.OnError; import dorkbox.network.connection.Listener.OnIdle; -import dorkbox.util.collections.ConcurrentEntry; /** * Called when the remote end has been connected. This will be invoked before any objects are received by the network. * This method should not block for long periods as other network activity will not be processed * until it returns. */ -@SuppressWarnings("Duplicates") public final -class OnIdleManager { - // Recommended for best performance while adhering to the "single writer principle". Must be static-final - private static final AtomicReferenceFieldUpdater REF = AtomicReferenceFieldUpdater.newUpdater( - OnIdleManager.class, - ConcurrentEntry.class, - "head_"); +class OnIdleManager extends ConcurrentManager> { - private final Logger logger; - - // - // The iterators for IdentityMap are NOT THREAD SAFE! - // - // This is only touched by a single thread, maintains a map of entries for FAST lookup during listener remove. - private final IdentityMap> entries = new IdentityMap>(32, - ConnectionManager.LOAD_FACTOR); - - private volatile ConcurrentEntry head_ = null; // reference to the first element - - // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this - // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our - // use-case 99% of the time) public OnIdleManager(final Logger logger) { - this.logger = logger; - } - - public synchronized - void add(final OnIdle listener) { - // access a snapshot (single-writer-principle) - ConcurrentEntry head = REF.get(this); - - if (!entries.containsKey(listener)) { - head = new ConcurrentEntry(listener, head); - - entries.put(listener, head); - - // save this snapshot back to the original (single writer principle) - REF.lazySet(this, head); - } - } - - /** - * @return true if the listener was removed, false otherwise - */ - public synchronized - boolean remove(final OnIdle listener) { - // access a snapshot (single-writer-principle) - ConcurrentEntry concurrentEntry = entries.get(listener); - - if (concurrentEntry != null) { - ConcurrentEntry head = REF.get(this); - - if (concurrentEntry == head) { - // if it was second, now it's first - head = head.next(); - //oldHead.clear(); // optimize for GC not possible because of potentially running iterators - } - else { - concurrentEntry.remove(); - } - - // save this snapshot back to the original (single writer principle) - REF.lazySet(this, head); - entries.remove(listener); - return true; - } - else { - return false; - } + super(logger); } /** * @return true if a listener was found, false otherwise */ public - boolean notifyIdle(final C connection, final AtomicBoolean shutdown) { - ConcurrentEntry> head = REF.get(this); - ConcurrentEntry> current = head; - - OnIdle listener; - while (current != null && !shutdown.get()) { - listener = current.getValue(); - current = current.next(); - - try { - listener.idle(connection); - } catch (Exception e) { - if (listener instanceof OnError) { - ((OnError) listener).error(connection, e); - } - else { - logger.error("Unable to notify listener on 'idle' for listener '{}', connection '{}'.", listener, connection, e); - } - - } - } - - return head != null; // true if we have something, otherwise false + boolean notifyIdle(final C connection, final AtomicBoolean shutdown) { + return doAction(connection, shutdown); } - - /** - * called on shutdown - */ - public synchronized - void clear() { - this.entries.clear(); - this.head_ = null; + @Override + void listenerAction(final C connection, final OnIdle listener) throws Exception { + listener.idle(connection); } } diff --git a/src/dorkbox/network/connection/listenerManagement/OnMessageReceivedManager.java b/src/dorkbox/network/connection/listenerManagement/OnMessageReceivedManager.java index 4c578995..645e107a 100644 --- a/src/dorkbox/network/connection/listenerManagement/OnMessageReceivedManager.java +++ b/src/dorkbox/network/connection/listenerManagement/OnMessageReceivedManager.java @@ -42,7 +42,7 @@ import dorkbox.util.generics.ClassHelper; */ @SuppressWarnings("Duplicates") public final -class OnMessageReceivedManager { +class OnMessageReceivedManager { // Recommended for best performance while adhering to the "single writer principle". Must be static-final private static final AtomicReferenceFieldUpdater REF = AtomicReferenceFieldUpdater.newUpdater( OnMessageReceivedManager.class, @@ -57,6 +57,10 @@ class OnMessageReceivedManager { */ private static Class identifyType(final OnMessageReceived listener) { + if (listener instanceof SelfDefinedType) { + return ((SelfDefinedType) listener).getType(); + } + final Class clazz = listener.getClass(); Class objectType = ClassHelper.getGenericParameterAsClassForSuperClass(Listener.OnMessageReceived.class, clazz, 1); @@ -87,6 +91,7 @@ class OnMessageReceivedManager { // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our // use-case 99% of the time) + public OnMessageReceivedManager(final Logger logger) { this.logger = logger; @@ -94,13 +99,7 @@ class OnMessageReceivedManager { public void add(final OnMessageReceived listener) { - final Class type; - if (listener instanceof SelfDefinedType) { - type = ((SelfDefinedType) listener).getType(); - } - else { - type = identifyType(listener); - } + final Class type = identifyType(listener); synchronized (this) { // access a snapshot of the listeners (single-writer-principle) @@ -120,41 +119,40 @@ class OnMessageReceivedManager { } /** - * @return true if the listener was removed, false otherwise + * The returned value indicates how many listeners are left in this manager + * + * @return >= 0 if the listener was removed, -1 otherwise */ public - boolean remove(final OnMessageReceived listener) { - final Class type; - if (listener instanceof SelfDefinedType) { - type = ((SelfDefinedType) listener).getType(); - } - else { - type = identifyType(listener); - } + int removeWithSize(final OnMessageReceived listener) { + final Class type = identifyType(listener); - boolean found = false; + int size = -1; // default is "not found" synchronized (this) { // access a snapshot of the listeners (single-writer-principle) final IdentityMap listeners = REF.get(this); final ConcurrentIterator concurrentIterator = listeners.get(type); if (concurrentIterator != null) { - concurrentIterator.remove(listener); - found = true; + boolean removed = concurrentIterator.remove(listener); + if (removed) { + size = concurrentIterator.size(); + } } // save this snapshot back to the original (single writer principle) REF.lazySet(this, listeners); } - return found; + return size; } /** * @return true if a listener was found, false otherwise */ + @SuppressWarnings("unchecked") public - boolean notifyReceived(final C connection, final Object message, final AtomicBoolean shutdown) { + boolean notifyReceived(final C connection, final Object message, final AtomicBoolean shutdown) { boolean found = false; Class objectType = message.getClass();