Cleaned up duplicate code in the state connection managers

This commit is contained in:
nathan 2018-02-09 23:03:33 +01:00
parent 73de927bd5
commit 763718c1c9
6 changed files with 211 additions and 396 deletions

View File

@ -23,6 +23,7 @@ import org.slf4j.Logger;
import com.esotericsoftware.kryo.util.IdentityMap; import com.esotericsoftware.kryo.util.IdentityMap;
import dorkbox.network.connection.Listener.OnConnected;
import dorkbox.network.connection.bridge.ConnectionBridgeServer; import dorkbox.network.connection.bridge.ConnectionBridgeServer;
import dorkbox.network.connection.bridge.ConnectionExceptSpecifiedBridgeServer; import dorkbox.network.connection.bridge.ConnectionExceptSpecifiedBridgeServer;
import dorkbox.network.connection.listenerManagement.OnConnectedManager; import dorkbox.network.connection.listenerManagement.OnConnectedManager;
@ -33,6 +34,8 @@ import dorkbox.util.Property;
import dorkbox.util.collections.ConcurrentEntry; import dorkbox.util.collections.ConcurrentEntry;
import dorkbox.util.generics.ClassHelper; import dorkbox.util.generics.ClassHelper;
import dorkbox.util.generics.TypeResolver; import dorkbox.util.generics.TypeResolver;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise;
// .equals() compares the identity on purpose,this because we cannot create two separate objects that are somehow equal to each other. // .equals() compares the identity on purpose,this because we cannot create two separate objects that are somehow equal to each other.
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -62,10 +65,10 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
private final String loggerName; private final String loggerName;
private final OnConnectedManager onConnectedManager; private final OnConnectedManager<C> onConnectedManager;
private final OnDisconnectedManager onDisconnectedManager; private final OnDisconnectedManager<C> onDisconnectedManager;
private final OnIdleManager onIdleManager; private final OnIdleManager<C> onIdleManager;
private final OnMessageReceivedManager onMessageReceivedManager; private final OnMessageReceivedManager<C> onMessageReceivedManager;
@SuppressWarnings({"FieldCanBeLocal", "unused"}) @SuppressWarnings({"FieldCanBeLocal", "unused"})
private volatile ConcurrentEntry<Connection> connectionsHead = null; // reference to the first element private volatile ConcurrentEntry<Connection> connectionsHead = null; // reference to the first element
@ -90,7 +93,7 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
*/ */
private final Class<?> baseClass; private final Class<?> baseClass;
protected final org.slf4j.Logger logger; protected final org.slf4j.Logger logger;
private final AtomicBoolean hasAddedAtLeastOnce = new AtomicBoolean(false); private final AtomicBoolean hasAtLeastOneListener = new AtomicBoolean(false);
final AtomicBoolean shutdown = new AtomicBoolean(false); final AtomicBoolean shutdown = new AtomicBoolean(false);
@ -99,10 +102,10 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
this.logger = org.slf4j.LoggerFactory.getLogger(loggerName); this.logger = org.slf4j.LoggerFactory.getLogger(loggerName);
this.baseClass = baseClass; this.baseClass = baseClass;
onConnectedManager = new OnConnectedManager(logger); onConnectedManager = new OnConnectedManager<C>(logger);
onDisconnectedManager = new OnDisconnectedManager(logger); onDisconnectedManager = new OnDisconnectedManager<C>(logger);
onIdleManager = new OnIdleManager(logger); onIdleManager = new OnIdleManager<C>(logger);
onMessageReceivedManager = new OnMessageReceivedManager(logger); onMessageReceivedManager = new OnMessageReceivedManager<C>(logger);
} }
/** /**
@ -166,19 +169,19 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
found = true; found = true;
} }
final Logger logger2 = this.logger; if (found) {
if (!found) { hasAtLeastOneListener.set(true);
logger2.error("No matching listener types. Unable to add listener: {}",
listener.getClass() if (logger.isTraceEnabled()) {
.getName()); logger.trace("listener added: {}",
listener.getClass()
.getName());
}
} }
else { else {
hasAddedAtLeastOnce.set(true); logger.error("No matching listener types. Unable to add listener: {}",
if (logger2.isTraceEnabled()) { listener.getClass()
logger2.trace("listener added: {}", .getName());
listener.getClass()
.getName());
}
} }
} }
@ -199,30 +202,53 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
} }
boolean found = false; boolean found = false;
int remainingListeners = 0;
if (listener instanceof Listener.OnConnected) { if (listener instanceof Listener.OnConnected) {
found = onConnectedManager.remove((Listener.OnConnected) listener); int size = onConnectedManager.removeWithSize((OnConnected) listener);
if (size >= 0) {
remainingListeners += size;
found = true;
}
} }
if (listener instanceof Listener.OnDisconnected) { if (listener instanceof Listener.OnDisconnected) {
found |= onDisconnectedManager.remove((Listener.OnDisconnected) listener); int size = onDisconnectedManager.removeWithSize((Listener.OnDisconnected) listener);
if (size >= 0) {
remainingListeners += size;
found |= true;
}
} }
if (listener instanceof Listener.OnIdle) { if (listener instanceof Listener.OnIdle) {
found |= onIdleManager.remove((Listener.OnIdle) listener); int size = onIdleManager.removeWithSize((Listener.OnIdle) listener);
if (size >= 0) {
remainingListeners += size;
found |= true;
}
} }
if (listener instanceof Listener.OnMessageReceived) { if (listener instanceof Listener.OnMessageReceived) {
found |= onMessageReceivedManager.remove((Listener.OnMessageReceived) listener); int size = onMessageReceivedManager.removeWithSize((Listener.OnMessageReceived) listener);
if (size >= 0) {
remainingListeners += size;
found |= true;
}
} }
final Logger logger2 = this.logger; if (found) {
if (!found) { if (remainingListeners == 0) {
logger2.error("No matching listener types. Unable to remove listener: {}", hasAtLeastOneListener.set(false);
listener.getClass() }
.getName());
if (logger.isTraceEnabled()) {
logger.trace("listener removed: {}",
listener.getClass()
.getName());
}
} }
else if (logger2.isTraceEnabled()) { else {
logger2.trace("listener removed: {}", logger.error("No matching listener types. Unable to remove listener: {}",
listener.getClass() listener.getClass()
.getName()); .getName());
} }
return this; return this;
@ -300,7 +326,7 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
message = connection.fixupRmi(message); message = connection.fixupRmi(message);
foundListener |= onMessageReceivedManager.notifyReceived(connection, message, shutdown); foundListener |= onMessageReceivedManager.notifyReceived((C) connection, message, shutdown);
// now have to account for additional connection listener managers (non-global). // now have to account for additional connection listener managers (non-global).
// access a snapshot of the managers (single-writer-principle) // access a snapshot of the managers (single-writer-principle)
@ -315,8 +341,7 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
// only run a flush once // only run a flush once
if (foundListener) { if (foundListener) {
connection.send() connection.flush();
.flush();
} }
else { else {
this.logger.warn("----------- LISTENER NOT REGISTERED FOR TYPE: {}", this.logger.warn("----------- LISTENER NOT REGISTERED FOR TYPE: {}",
@ -331,12 +356,11 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
*/ */
@Override @Override
public final public final
void onIdle(final Connection connection) { void onIdle(final ConnectionImpl connection) {
boolean foundListener = onIdleManager.notifyIdle(connection, shutdown); boolean foundListener = onIdleManager.notifyIdle((C) connection, shutdown);
if (foundListener) { if (foundListener) {
connection.send() connection.flush();
.flush();
} }
// now have to account for additional (local) listener managers. // now have to account for additional (local) listener managers.
@ -353,14 +377,13 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
*/ */
@Override @Override
public public
void onConnected(final Connection connection) { void onConnected(final ConnectionImpl connection) {
addConnection(connection); addConnection(connection);
boolean foundListener = onConnectedManager.notifyConnected(connection, shutdown); boolean foundListener = onConnectedManager.notifyConnected((C) connection, shutdown);
if (foundListener) { if (foundListener) {
connection.send() connection.flush();
.flush();
} }
// now have to account for additional (local) listener managers. // now have to account for additional (local) listener managers.
@ -377,12 +400,11 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
*/ */
@Override @Override
public public
void onDisconnected(final Connection connection) { void onDisconnected(final ConnectionImpl connection) {
boolean foundListener = onDisconnectedManager.notifyDisconnected(connection, shutdown); boolean foundListener = onDisconnectedManager.notifyDisconnected((C) connection, shutdown);
if (foundListener) { if (foundListener) {
connection.send() connection.flush();
.flush();
} }
// now have to account for additional (local) listener managers. // now have to account for additional (local) listener managers.
@ -550,7 +572,7 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
*/ */
final final
boolean hasListeners() { boolean hasListeners() {
return hasAddedAtLeastOnce.get(); return hasAtLeastOneListener.get();
} }
/** /**
@ -613,6 +635,12 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
throw new UnsupportedOperationException("Method not implemented"); throw new UnsupportedOperationException("Method not implemented");
} }
@Override
public
void write(final Object object) {
throw new UnsupportedOperationException("Method not implemented");
}
/** /**
* Exposes methods to send the object to all server connections (except the specified one) over the network. (or via LOCAL when it's a * Exposes methods to send the object to all server connections (except the specified one) over the network. (or via LOCAL when it's a
* local channel). * local channel).
@ -623,25 +651,10 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
return this; return this;
} }
/**
* This will flush the data from EVERY connection on this server.
* <p/>
* THIS WILL BE SLOW!
*
* @see dorkbox.network.connection.ConnectionPoint#flush()
*/
@Override @Override
public public
void flush() { <V> Promise<V> newPromise() {
ConcurrentEntry<Connection> current = connectionsREF.get(this); return ImmediateEventExecutor.INSTANCE.newPromise();
Connection c;
while (current != null) {
c = current.getValue();
current = current.next();
c.send()
.flush();
}
} }
/** /**

View File

@ -0,0 +1,91 @@
/*
* Copyright 2010 dorkbox, llc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.connection.listenerManagement;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.Listener;
import dorkbox.network.connection.Listener.OnError;
import dorkbox.util.collections.ConcurrentEntry;
import dorkbox.util.collections.ConcurrentIterator;
public abstract
class ConcurrentManager<C extends Connection, T extends Listener> extends ConcurrentIterator<T> {
private final Logger logger;
ConcurrentManager(final Logger logger) {
this.logger = logger;
}
@Override
public synchronized
void add(final T listener) {
super.add(listener);
}
/**
* The returned value indicates how many listeners are left in this manager
*
* @return >= 0 if the listener was removed, -1 otherwise
*/
public synchronized
int removeWithSize(final T listener) {
boolean removed = super.remove(listener);
if (removed) {
return super.size();
}
else {
return -1;
}
}
/**
* @return true if a listener was found, false otherwise
*/
@SuppressWarnings("unchecked")
boolean doAction(final C connection, final AtomicBoolean shutdown) {
// access a snapshot (single-writer-principle)
ConcurrentEntry<T> head = headREF.get(this);
ConcurrentEntry<T> current = head;
T listener;
while (current != null && !shutdown.get()) {
listener = current.getValue();
current = current.next();
// Concurrent iteration...
try {
listenerAction(connection, listener);
} catch (Exception e) {
if (listener instanceof OnError) {
((OnError) listener).error(connection, e);
}
else {
logger.error("Unable to notify listener '{}', connection '{}'.", listener, connection, e);
}
}
}
return head != null; // true if we have something, otherwise false
}
abstract void listenerAction(final C connection, final T listener) throws Exception;
}

View File

@ -16,129 +16,38 @@
package dorkbox.network.connection.listenerManagement; package dorkbox.network.connection.listenerManagement;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.slf4j.Logger; import org.slf4j.Logger;
import com.esotericsoftware.kryo.util.IdentityMap;
import dorkbox.network.connection.Connection; import dorkbox.network.connection.Connection;
import dorkbox.network.connection.ConnectionManager;
import dorkbox.network.connection.Listener.OnConnected; import dorkbox.network.connection.Listener.OnConnected;
import dorkbox.network.connection.Listener.OnError;
import dorkbox.util.collections.ConcurrentEntry;
/** /**
* Called when the remote end has been connected. This will be invoked before any objects are received by the network. * Called when the remote end has been connected. This will be invoked before any objects are received by the network.
* This method should not block for long periods as other network activity will not be processed * This method should not block for long periods as other network activity will not be processed
* until it returns. * until it returns.
*/ */
@SuppressWarnings("Duplicates")
public final public final
class OnConnectedManager { class OnConnectedManager<C extends Connection> extends ConcurrentManager<C, OnConnected<C>> {
// Recommended for best performance while adhering to the "single writer principle". Must be static-final
private static final AtomicReferenceFieldUpdater<OnConnectedManager, ConcurrentEntry> REF = AtomicReferenceFieldUpdater.newUpdater(
OnConnectedManager.class,
ConcurrentEntry.class,
"head_");
private final Logger logger;
//
// The iterators for IdentityMap are NOT THREAD SAFE!
//
// This is only touched by a single thread, maintains a map of entries for FAST lookup during listener remove.
private final IdentityMap<OnConnected, ConcurrentEntry> entries = new IdentityMap<OnConnected, ConcurrentEntry>(32,
ConnectionManager.LOAD_FACTOR);
private volatile ConcurrentEntry<OnConnected> head_ = null; // reference to the first element
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time) // use-case 99% of the time)
public public
OnConnectedManager(final Logger logger) { OnConnectedManager(final Logger logger) {
this.logger = logger; super(logger);
}
public synchronized
void add(final OnConnected listener) {
// access a snapshot (single-writer-principle)
ConcurrentEntry head = REF.get(this);
if (!entries.containsKey(listener)) {
head = new ConcurrentEntry<Object>(listener, head);
entries.put(listener, head);
// save this snapshot back to the original (single writer principle)
REF.lazySet(this, head);
}
}
/**
* @return true if the listener was removed, false otherwise
*/
public synchronized
boolean remove(final OnConnected listener) {
// access a snapshot (single-writer-principle)
ConcurrentEntry concurrentEntry = entries.get(listener);
if (concurrentEntry != null) {
ConcurrentEntry head = REF.get(this);
if (concurrentEntry == head) {
// if it was second, now it's first
head = head.next();
//oldHead.clear(); // optimize for GC not possible because of potentially running iterators
}
else {
concurrentEntry.remove();
}
// save this snapshot back to the original (single writer principle)
REF.lazySet(this, head);
entries.remove(listener);
return true;
}
else {
return false;
}
} }
/** /**
* @return true if a listener was found, false otherwise * @return true if a listener was found, false otherwise
*/ */
public public
<C extends Connection> boolean notifyConnected(final C connection, final AtomicBoolean shutdown) { boolean notifyConnected(final C connection, final AtomicBoolean shutdown) {
ConcurrentEntry<OnConnected<C>> head = REF.get(this); return doAction(connection, shutdown);
ConcurrentEntry<OnConnected<C>> current = head;
OnConnected<C> listener;
while (current != null && !shutdown.get()) {
listener = current.getValue();
current = current.next();
try {
listener.connected(connection);
} catch (Exception e) {
if (listener instanceof OnError) {
((OnError<C>) listener).error(connection, e);
}
else {
logger.error("Unable to notify listener on 'connected' for listener '{}', connection '{}'.", listener, connection, e);
}
}
}
return head != null; // true if we have something, otherwise false
} }
/** @Override
* called on shutdown void listenerAction(final C connection, final OnConnected<C> listener) throws Exception {
*/ listener.connected(connection);
public synchronized
void clear() {
this.entries.clear();
this.head_ = null;
} }
} }

View File

@ -16,134 +16,35 @@
package dorkbox.network.connection.listenerManagement; package dorkbox.network.connection.listenerManagement;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.slf4j.Logger; import org.slf4j.Logger;
import com.esotericsoftware.kryo.util.IdentityMap;
import dorkbox.network.connection.Connection; import dorkbox.network.connection.Connection;
import dorkbox.network.connection.ConnectionManager;
import dorkbox.network.connection.Listener.OnDisconnected; import dorkbox.network.connection.Listener.OnDisconnected;
import dorkbox.network.connection.Listener.OnError;
import dorkbox.util.collections.ConcurrentEntry;
/** /**
* Called when the remote end has been connected. This will be invoked before any objects are received by the network. * Called when the remote end has been connected. This will be invoked before any objects are received by the network.
* This method should not block for long periods as other network activity will not be processed * This method should not block for long periods as other network activity will not be processed
* until it returns. * until it returns.
*/ */
@SuppressWarnings("Duplicates")
public final public final
class OnDisconnectedManager { class OnDisconnectedManager<C extends Connection> extends ConcurrentManager<C, OnDisconnected<C>> {
// Recommended for best performance while adhering to the "single writer principle". Must be static-final
private static final AtomicReferenceFieldUpdater<OnDisconnectedManager, ConcurrentEntry> REF = AtomicReferenceFieldUpdater.newUpdater(
OnDisconnectedManager.class,
ConcurrentEntry.class,
"head_");
private final Logger logger;
//
// The iterators for IdentityMap are NOT THREAD SAFE!
//
// This is only touched by a single thread, maintains a map of entries for FAST lookup during listener remove.
private final IdentityMap<OnDisconnected, ConcurrentEntry> entries = new IdentityMap<OnDisconnected, ConcurrentEntry>(32,
ConnectionManager.LOAD_FACTOR);
private volatile ConcurrentEntry<OnDisconnected> head_ = null; // reference to the first element
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
public public
OnDisconnectedManager(final Logger logger) { OnDisconnectedManager(final Logger logger) {
this.logger = logger; super(logger);
} }
public synchronized
void add(final OnDisconnected listener) {
// access a snapshot (single-writer-principle)
ConcurrentEntry head = REF.get(this);
if (!entries.containsKey(listener)) {
head = new ConcurrentEntry<Object>(listener, head);
entries.put(listener, head);
// save this snapshot back to the original (single writer principle)
REF.lazySet(this, head);
}
}
/**
* @return true if the listener was removed, false otherwise
*/
public synchronized
boolean remove(final OnDisconnected<Connection> listener) {
// access a snapshot (single-writer-principle)
ConcurrentEntry concurrentEntry = entries.get(listener);
if (concurrentEntry != null) {
ConcurrentEntry head = REF.get(this);
if (concurrentEntry == head) {
// if it was second, now it's first
head = head.next();
//oldHead.clear(); // optimize for GC not possible because of potentially running iterators
}
else {
concurrentEntry.remove();
}
// save this snapshot back to the original (single writer principle)
REF.lazySet(this, head);
entries.remove(listener);
return true;
}
else {
return false;
}
}
/** /**
* @return true if a listener was found, false otherwise * @return true if a listener was found, false otherwise
*/ */
public public
<C extends Connection> boolean notifyDisconnected(final C connection, final AtomicBoolean shutdown) { boolean notifyDisconnected(final C connection, final AtomicBoolean shutdown) {
ConcurrentEntry<OnDisconnected<C>> head = REF.get(this); return doAction(connection, shutdown);
ConcurrentEntry<OnDisconnected<C>> current = head;
OnDisconnected<C> listener;
while (current != null && !shutdown.get()) {
listener = current.getValue();
current = current.next();
try {
listener.disconnected(connection);
} catch (Exception e) {
if (listener instanceof OnError) {
((OnError<C>) listener).error(connection, e);
}
else {
logger.error("Unable to notify listener on 'disconnected' for listener '{}', connection '{}'.",
listener,
connection,
e);
}
}
}
return head != null; // true if we have something, otherwise false
} }
/** @Override
* called on shutdown void listenerAction(final C connection, final OnDisconnected<C> listener) throws Exception {
*/ listener.disconnected(connection);
public synchronized
void clear() {
this.entries.clear();
this.head_ = null;
} }
} }

View File

@ -16,132 +16,35 @@
package dorkbox.network.connection.listenerManagement; package dorkbox.network.connection.listenerManagement;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.slf4j.Logger; import org.slf4j.Logger;
import com.esotericsoftware.kryo.util.IdentityMap;
import dorkbox.network.connection.Connection; import dorkbox.network.connection.Connection;
import dorkbox.network.connection.ConnectionManager;
import dorkbox.network.connection.Listener.OnError;
import dorkbox.network.connection.Listener.OnIdle; import dorkbox.network.connection.Listener.OnIdle;
import dorkbox.util.collections.ConcurrentEntry;
/** /**
* Called when the remote end has been connected. This will be invoked before any objects are received by the network. * Called when the remote end has been connected. This will be invoked before any objects are received by the network.
* This method should not block for long periods as other network activity will not be processed * This method should not block for long periods as other network activity will not be processed
* until it returns. * until it returns.
*/ */
@SuppressWarnings("Duplicates")
public final public final
class OnIdleManager { class OnIdleManager<C extends Connection> extends ConcurrentManager<C, OnIdle<C>> {
// Recommended for best performance while adhering to the "single writer principle". Must be static-final
private static final AtomicReferenceFieldUpdater<OnIdleManager, ConcurrentEntry> REF = AtomicReferenceFieldUpdater.newUpdater(
OnIdleManager.class,
ConcurrentEntry.class,
"head_");
private final Logger logger;
//
// The iterators for IdentityMap are NOT THREAD SAFE!
//
// This is only touched by a single thread, maintains a map of entries for FAST lookup during listener remove.
private final IdentityMap<OnIdle, ConcurrentEntry<OnIdle>> entries = new IdentityMap<OnIdle, ConcurrentEntry<OnIdle>>(32,
ConnectionManager.LOAD_FACTOR);
private volatile ConcurrentEntry<OnIdle> head_ = null; // reference to the first element
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
public public
OnIdleManager(final Logger logger) { OnIdleManager(final Logger logger) {
this.logger = logger; super(logger);
}
public synchronized
void add(final OnIdle listener) {
// access a snapshot (single-writer-principle)
ConcurrentEntry head = REF.get(this);
if (!entries.containsKey(listener)) {
head = new ConcurrentEntry<Object>(listener, head);
entries.put(listener, head);
// save this snapshot back to the original (single writer principle)
REF.lazySet(this, head);
}
}
/**
* @return true if the listener was removed, false otherwise
*/
public synchronized
boolean remove(final OnIdle listener) {
// access a snapshot (single-writer-principle)
ConcurrentEntry concurrentEntry = entries.get(listener);
if (concurrentEntry != null) {
ConcurrentEntry head = REF.get(this);
if (concurrentEntry == head) {
// if it was second, now it's first
head = head.next();
//oldHead.clear(); // optimize for GC not possible because of potentially running iterators
}
else {
concurrentEntry.remove();
}
// save this snapshot back to the original (single writer principle)
REF.lazySet(this, head);
entries.remove(listener);
return true;
}
else {
return false;
}
} }
/** /**
* @return true if a listener was found, false otherwise * @return true if a listener was found, false otherwise
*/ */
public public
<C extends Connection> boolean notifyIdle(final C connection, final AtomicBoolean shutdown) { boolean notifyIdle(final C connection, final AtomicBoolean shutdown) {
ConcurrentEntry<OnIdle<C>> head = REF.get(this); return doAction(connection, shutdown);
ConcurrentEntry<OnIdle<C>> current = head;
OnIdle<C> listener;
while (current != null && !shutdown.get()) {
listener = current.getValue();
current = current.next();
try {
listener.idle(connection);
} catch (Exception e) {
if (listener instanceof OnError) {
((OnError<C>) listener).error(connection, e);
}
else {
logger.error("Unable to notify listener on 'idle' for listener '{}', connection '{}'.", listener, connection, e);
}
}
}
return head != null; // true if we have something, otherwise false
} }
@Override
/** void listenerAction(final C connection, final OnIdle<C> listener) throws Exception {
* called on shutdown listener.idle(connection);
*/
public synchronized
void clear() {
this.entries.clear();
this.head_ = null;
} }
} }

View File

@ -42,7 +42,7 @@ import dorkbox.util.generics.ClassHelper;
*/ */
@SuppressWarnings("Duplicates") @SuppressWarnings("Duplicates")
public final public final
class OnMessageReceivedManager { class OnMessageReceivedManager<C extends Connection> {
// Recommended for best performance while adhering to the "single writer principle". Must be static-final // Recommended for best performance while adhering to the "single writer principle". Must be static-final
private static final AtomicReferenceFieldUpdater<OnMessageReceivedManager, IdentityMap> REF = AtomicReferenceFieldUpdater.newUpdater( private static final AtomicReferenceFieldUpdater<OnMessageReceivedManager, IdentityMap> REF = AtomicReferenceFieldUpdater.newUpdater(
OnMessageReceivedManager.class, OnMessageReceivedManager.class,
@ -57,6 +57,10 @@ class OnMessageReceivedManager {
*/ */
private static private static
Class<?> identifyType(final OnMessageReceived listener) { Class<?> identifyType(final OnMessageReceived listener) {
if (listener instanceof SelfDefinedType) {
return ((SelfDefinedType) listener).getType();
}
final Class<?> clazz = listener.getClass(); final Class<?> clazz = listener.getClass();
Class<?> objectType = ClassHelper.getGenericParameterAsClassForSuperClass(Listener.OnMessageReceived.class, clazz, 1); Class<?> objectType = ClassHelper.getGenericParameterAsClassForSuperClass(Listener.OnMessageReceived.class, clazz, 1);
@ -87,6 +91,7 @@ class OnMessageReceivedManager {
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time) // use-case 99% of the time)
public public
OnMessageReceivedManager(final Logger logger) { OnMessageReceivedManager(final Logger logger) {
this.logger = logger; this.logger = logger;
@ -94,13 +99,7 @@ class OnMessageReceivedManager {
public public
void add(final OnMessageReceived listener) { void add(final OnMessageReceived listener) {
final Class<?> type; final Class<?> type = identifyType(listener);
if (listener instanceof SelfDefinedType) {
type = ((SelfDefinedType) listener).getType();
}
else {
type = identifyType(listener);
}
synchronized (this) { synchronized (this) {
// access a snapshot of the listeners (single-writer-principle) // access a snapshot of the listeners (single-writer-principle)
@ -120,41 +119,40 @@ class OnMessageReceivedManager {
} }
/** /**
* @return true if the listener was removed, false otherwise * The returned value indicates how many listeners are left in this manager
*
* @return >= 0 if the listener was removed, -1 otherwise
*/ */
public public
boolean remove(final OnMessageReceived listener) { int removeWithSize(final OnMessageReceived listener) {
final Class<?> type; final Class<?> type = identifyType(listener);
if (listener instanceof SelfDefinedType) {
type = ((SelfDefinedType) listener).getType();
}
else {
type = identifyType(listener);
}
boolean found = false; int size = -1; // default is "not found"
synchronized (this) { synchronized (this) {
// access a snapshot of the listeners (single-writer-principle) // access a snapshot of the listeners (single-writer-principle)
final IdentityMap<Type, ConcurrentIterator> listeners = REF.get(this); final IdentityMap<Type, ConcurrentIterator> listeners = REF.get(this);
final ConcurrentIterator concurrentIterator = listeners.get(type); final ConcurrentIterator concurrentIterator = listeners.get(type);
if (concurrentIterator != null) { if (concurrentIterator != null) {
concurrentIterator.remove(listener); boolean removed = concurrentIterator.remove(listener);
found = true; if (removed) {
size = concurrentIterator.size();
}
} }
// save this snapshot back to the original (single writer principle) // save this snapshot back to the original (single writer principle)
REF.lazySet(this, listeners); REF.lazySet(this, listeners);
} }
return found; return size;
} }
/** /**
* @return true if a listener was found, false otherwise * @return true if a listener was found, false otherwise
*/ */
@SuppressWarnings("unchecked")
public public
<C extends Connection> boolean notifyReceived(final C connection, final Object message, final AtomicBoolean shutdown) { boolean notifyReceived(final C connection, final Object message, final AtomicBoolean shutdown) {
boolean found = false; boolean found = false;
Class<?> objectType = message.getClass(); Class<?> objectType = message.getClass();