Converted the connection manager (listeners + connection dispatch) to

use the singler-writer-principle
This commit is contained in:
nathan 2016-03-22 15:50:41 +01:00
parent 24761bb84b
commit f6522f8b6d
7 changed files with 583 additions and 437 deletions

View File

@ -732,7 +732,6 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn
// safe to ignore, since it's thrown when we try to interact with a closed socket. Race conditions cause this, and // safe to ignore, since it's thrown when we try to interact with a closed socket. Race conditions cause this, and
// it is still safe to ignore. // it is still safe to ignore.
this.logger.error("Unexpected exception while receiving data from {}", channel.remoteAddress(), cause); this.logger.error("Unexpected exception while receiving data from {}", channel.remoteAddress(), cause);
this.sessionManager.connectionError(this, cause);
// the ONLY sockets that can call this are: // the ONLY sockets that can call this are:
// CLIENT TCP or UDP // CLIENT TCP or UDP

View File

@ -15,28 +15,79 @@
*/ */
package dorkbox.network.connection; package dorkbox.network.connection;
import com.esotericsoftware.kryo.util.IdentityMap;
import dorkbox.network.connection.bridge.ConnectionBridgeServer;
import dorkbox.network.connection.bridge.ConnectionExceptSpecifiedBridgeServer;
import dorkbox.network.rmi.RmiMessages; import dorkbox.network.rmi.RmiMessages;
import dorkbox.network.util.ConcurrentHashMapFactory;
import dorkbox.util.ClassHelper; import dorkbox.util.ClassHelper;
import dorkbox.util.Property;
import dorkbox.util.collections.ConcurrentEntry;
import dorkbox.util.collections.ConcurrentIterator;
import org.slf4j.Logger; import org.slf4j.Logger;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Type; import java.lang.reflect.Type;
import java.util.*; import java.util.Iterator;
import java.util.Map.Entry; import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
//note that we specifically DO NOT implement equals/hashCode, because we cannot create two separate import static dorkbox.util.collections.ConcurrentIterator.headREF;
// objects that are somehow equal to each other.
// .equals() compares the identity on purpose,this because we cannot create two separate objects that are somehow equal to each other.
@SuppressWarnings("unchecked")
public public
class ConnectionManager<C extends Connection> implements ListenerBridge, ISessionManager<C> { class ConnectionManager<C extends Connection> implements ListenerBridge, ISessionManager<C>, ConnectionPoint, ConnectionBridgeServer<C>,
ConnectionExceptSpecifiedBridgeServer<C> {
public static Listener<?> unRegisteredType_Listener = null; /**
* Specifies the load-factor for the IdentityMap used
*/
@Property
public static final float LOAD_FACTOR = 0.8F;
private static Listener<?> unRegisteredType_Listener = null;
private final String loggerName;
@SuppressWarnings("unused")
private volatile IdentityMap<Connection, ConnectionManager<C>> localManagers = new IdentityMap<Connection, ConnectionManager<C>>(8, ConnectionManager.LOAD_FACTOR);
private volatile IdentityMap<Type, ConcurrentIterator> listeners = new IdentityMap<Type, ConcurrentIterator>(32, LOAD_FACTOR);
@SuppressWarnings({"FieldCanBeLocal", "unused"})
private volatile ConcurrentEntry<C> connectionsHead = null; // reference to the first element
// This is only touched by a single thread, maintains a map of entries for FAST lookup during connection remove.
private final IdentityMap<C, ConcurrentEntry> connectionEntries = new IdentityMap<C, ConcurrentEntry>(32, ConnectionManager.LOAD_FACTOR);
// In order to force the "single writer principle" for subscribe & unsubscribe, they are within SYNCHRONIZED.
//
// These methods **COULD** be dispatched via another thread (so it's only one thread ever touching them), however we do NOT want them
// asynchronous - as publish() should ALWAYS succeed if a correct subscribe() is called before. 'Synchronized' is good enough here.
private final Object singleWriterLock1 = new Object();
private final Object singleWriterLock2 = new Object();
private final Object singleWriterLock3 = new Object();
// Recommended for best performance while adhering to the "single writer principle". Must be static-final
private static final AtomicReferenceFieldUpdater<ConnectionManager, IdentityMap> localManagersREF =
AtomicReferenceFieldUpdater.newUpdater(ConnectionManager.class,
IdentityMap.class,
"localManagers");
// Recommended for best performance while adhering to the "single writer principle". Must be static-final
private static final AtomicReferenceFieldUpdater<ConnectionManager, IdentityMap> listenersREF =
AtomicReferenceFieldUpdater.newUpdater(ConnectionManager.class,
IdentityMap.class,
"listeners");
// Recommended for best performance while adhering to the "single writer principle". Must be static-final
private static final AtomicReferenceFieldUpdater<ConnectionManager, ConcurrentEntry> connectionsREF =
AtomicReferenceFieldUpdater.newUpdater(ConnectionManager.class,
ConcurrentEntry.class,
"connectionsHead");
// these are final, because the REFERENCE to these will never change. They ARE NOT immutable objects (meaning their content can change)
private final ConcurrentHashMapFactory<Type, CopyOnWriteArrayList<ListenerRaw<C, Object>>> listeners;
private final ConcurrentHashMapFactory<Connection, ConnectionManager<C>> localManagers;
private final CopyOnWriteArrayList<C> connections = new CopyOnWriteArrayList<C>();
/** /**
* Used by the listener subsystem to determine types. * Used by the listener subsystem to determine types.
@ -47,44 +98,20 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
public public
ConnectionManager(final String loggerName, final Class<?> baseClass) { ConnectionManager(final String loggerName, final Class<?> baseClass) {
this.loggerName = loggerName;
this.logger = org.slf4j.LoggerFactory.getLogger(loggerName); this.logger = org.slf4j.LoggerFactory.getLogger(loggerName);
this.baseClass = baseClass; this.baseClass = baseClass;
this.listeners = new ConcurrentHashMapFactory<Type, CopyOnWriteArrayList<ListenerRaw<C, Object>>>() {
private static final long serialVersionUID = 1L;
@Override
public
CopyOnWriteArrayList<ListenerRaw<C, Object>> createNewObject(Object... args) {
return new CopyOnWriteArrayList<ListenerRaw<C, Object>>();
}
};
this.localManagers = new ConcurrentHashMapFactory<Connection, ConnectionManager<C>>() {
private static final long serialVersionUID = 1L;
@Override
public
ConnectionManager<C> createNewObject(Object... args) {
return new ConnectionManager<C>(loggerName + "-" + args[0] + " Specific", ConnectionManager.this.baseClass);
}
};
} }
/** /**
* Adds a listener to this connection/endpoint to be notified of * Adds a listener to this connection/endpoint to be notified of connect/disconnect/idle/receive(object) events.
* connect/disconnect/idle/receive(object) events.
* <p/> * <p/>
* If the listener already exists, it is not added again. * When called by a server, NORMALLY listeners are added at the GLOBAL level (meaning, I add one listener, and ALL connections are
* notified of that listener.
* <p/> * <p/>
* When called by a server, NORMALLY listeners are added at the GLOBAL level * It is POSSIBLE to add a server connection ONLY (ie, not global) listener (via connection.addListener), meaning that ONLY that
* (meaning, I add one listener, and ALL connections are notified of that * listener attached to the connection is notified on that event (ie, admin type listeners)
* listener.
* <p/>
* It is POSSIBLE to add a server connection ONLY (ie, not global) listener
* (via connection.addListener), meaning that ONLY that listener attached to
* the connection is notified on that event (ie, admin type listeners)
*/ */
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
@Override @Override
@ -129,8 +156,24 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
void addListener0(final ListenerRaw listener) { void addListener0(final ListenerRaw listener) {
Class<?> type = listener.getObjectType(); Class<?> type = listener.getObjectType();
CopyOnWriteArrayList<ListenerRaw<C, Object>> list = this.listeners.getOrCreate(type); // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
list.addIfAbsent(listener); // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
synchronized (singleWriterLock1) {
// access a snapshot of the subscriptions (single-writer-principle)
final IdentityMap<Type, ConcurrentIterator> listeners = listenersREF.get(this);
ConcurrentIterator subscribedListeners = listeners.get(type);
if (subscribedListeners == null) {
subscribedListeners = new ConcurrentIterator();
listeners.put(type, subscribedListeners);
}
subscribedListeners.add(listener);
// save this snapshot back to the original (single writer principle)
listenersREF.lazySet(this, listeners);
}
Logger logger2 = this.logger; Logger logger2 = this.logger;
if (logger2.isTraceEnabled()) { if (logger2.isTraceEnabled()) {
@ -142,16 +185,13 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
} }
/** /**
* Removes a listener from this connection/endpoint to NO LONGER be notified * Removes a listener from this connection/endpoint to NO LONGER be notified of connect/disconnect/idle/receive(object) events.
* of connect/disconnect/idle/receive(object) events.
* <p/> * <p/>
* When called by a server, NORMALLY listeners are added at the GLOBAL level * When called by a server, NORMALLY listeners are added at the GLOBAL level (meaning, I add one listener, and ALL connections are
* (meaning, I add one listener, and ALL connections are notified of that * notified of that listener.
* listener.
* <p/> * <p/>
* It is POSSIBLE to remove a server-connection 'non-global' listener (via * It is POSSIBLE to remove a server-connection 'non-global' listener (via connection.removeListener), meaning that ONLY that listener
* connection.removeListener), meaning that ONLY that listener attached to * attached to the connection is removed
* the connection is removed
*/ */
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
@Override @Override
@ -163,11 +203,22 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
Class<?> type = listener.getObjectType(); Class<?> type = listener.getObjectType();
CopyOnWriteArrayList<ListenerRaw<C, Object>> list = this.listeners.get(type); // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
if (list != null) { // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
list.remove(listener); // use-case 99% of the time)
synchronized (singleWriterLock1) {
// access a snapshot of the subscriptions (single-writer-principle)
final IdentityMap<Type, ConcurrentIterator> listeners = listenersREF.get(this);
final ConcurrentIterator concurrentIterator = listeners.get(type);
if (concurrentIterator != null) {
concurrentIterator.remove(listener);
}
// save this snapshot back to the original (single writer principle)
listenersREF.lazySet(this, listeners);
} }
Logger logger2 = this.logger; Logger logger2 = this.logger;
if (logger2.isTraceEnabled()) { if (logger2.isTraceEnabled()) {
logger2.trace("listener removed: {} <{}>", logger2.trace("listener removed: {} <{}>",
@ -178,13 +229,24 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
} }
/** /**
* Removes all registered listeners from this connection/endpoint to NO * Removes all registered listeners from this connection/endpoint to NO LONGER be notified of connect/disconnect/idle/receive(object)
* LONGER be notified of connect/disconnect/idle/receive(object) events. * events.
*/ */
@Override @Override
public final public final
void removeAll() { void removeAll() {
this.listeners.clear(); // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
synchronized (singleWriterLock1) {
// access a snapshot of the subscriptions (single-writer-principle)
final IdentityMap<Type, ConcurrentIterator> listeners = listenersREF.get(this);
listeners.clear();
// save this snapshot back to the original (single writer principle)
listenersREF.lazySet(this, listeners);
}
Logger logger2 = this.logger; Logger logger2 = this.logger;
if (logger2.isTraceEnabled()) { if (logger2.isTraceEnabled()) {
@ -204,7 +266,18 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
throw new IllegalArgumentException("classType cannot be null."); throw new IllegalArgumentException("classType cannot be null.");
} }
this.listeners.remove(classType); // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
synchronized (singleWriterLock1) {
// access a snapshot of the subscriptions (single-writer-principle)
final IdentityMap<Type, ConcurrentIterator> listeners = listenersREF.get(this);
listeners.remove(classType);
// save this snapshot back to the original (single writer principle)
listenersREF.lazySet(this, listeners);
}
Logger logger2 = this.logger; Logger logger2 = this.logger;
if (logger2.isTraceEnabled()) { if (logger2.isTraceEnabled()) {
@ -229,21 +302,40 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
notifyOnMessage0(connection, message, false); notifyOnMessage0(connection, message, false);
} }
@SuppressWarnings("Duplicates")
private private
boolean notifyOnMessage0(final C connection, final Object message, boolean foundListener) { boolean notifyOnMessage0(final C connection, final Object message, boolean foundListener) {
Class<?> objectType = message.getClass(); Class<?> objectType = message.getClass();
// this is the GLOBAL version (unless it's the call from below, then it's the connection scoped version) // this is the GLOBAL version (unless it's the call from below, then it's the connection scoped version)
CopyOnWriteArrayList<ListenerRaw<C, Object>> list = this.listeners.get(objectType); final IdentityMap<Type, ConcurrentIterator> listeners = listenersREF.get(this);
if (list != null) { ConcurrentIterator concurrentIterator = listeners.get(objectType);
for (ListenerRaw<C, Object> listener : list) {
if (concurrentIterator != null) {
ConcurrentEntry<ListenerRaw<C, Object>> head = headREF.get(concurrentIterator);
ConcurrentEntry<ListenerRaw<C, Object>> current = head;
ListenerRaw<C, Object> listener;
while (current != null) {
if (this.shutdown) { if (this.shutdown) {
return true; return true;
} }
listener.received(connection, message); listener = current.getValue();
current = current.next();
try {
listener.received(connection, message);
} catch (Exception e) {
logger.error("Unable to notify on message '{}' for listener '{}', connection '{}'.",
objectType,
listener,
connection,
e);
listener.error(connection, e);
}
} }
foundListener = true;
foundListener = head != null; // true if we have something to publish to, otherwise false
} }
if (!(message instanceof RmiMessages)) { if (!(message instanceof RmiMessages)) {
@ -255,35 +347,48 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
// we will call Foo (from this code) // we will call Foo (from this code)
// we will NOT call Object (since we called Foo). If Foo was not registered, THEN we would call object! // we will NOT call Object (since we called Foo). If Foo was not registered, THEN we would call object!
list = null;
objectType = objectType.getSuperclass(); objectType = objectType.getSuperclass();
while (objectType != null) { while (objectType != null) {
// check to see if we have what we are looking for in our CURRENT class // check to see if we have what we are looking for in our CURRENT class
list = this.listeners.get(objectType); concurrentIterator = listeners.get(objectType);
if (concurrentIterator != null) {
ConcurrentEntry<ListenerRaw<C, Object>> head = headREF.get(concurrentIterator);
ConcurrentEntry<ListenerRaw<C, Object>> current = head;
ListenerRaw<C, Object> listener;
while (current != null) {
if (this.shutdown) {
return true;
}
if (list != null) { listener = current.getValue();
current = current.next();
try {
listener.received(connection, message);
} catch (Exception e) {
logger.error("Unable to notify on message '{}' for listener '{}', connection '{}'.",
objectType,
listener,
connection,
e);
listener.error(connection, e);
}
}
foundListener = head != null; // true if we have something to publish to, otherwise false
break; break;
} }
// NO MATCH, so walk up. // NO MATCH, so walk up.
objectType = objectType.getSuperclass(); objectType = objectType.getSuperclass();
} }
if (list != null) {
for (ListenerRaw<C, Object> listener : list) {
if (this.shutdown) {
return true;
}
listener.received(connection, message);
foundListener = true;
}
}
} }
// now have to account for additional connection listener managers (non-global). // now have to account for additional connection listener managers (non-global).
ConnectionManager<C> localManager = this.localManagers.get(connection); // access a snapshot of the subscriptions (single-writer-principle)
final IdentityMap<Connection, ConnectionManager<C>> localManagers = localManagersREF.get(this);
ConnectionManager<C> localManager = localManagers.get(connection);
if (localManager != null) { if (localManager != null) {
// if we found a listener during THIS method call, we need to let the NEXT method call know, // if we found a listener during THIS method call, we need to let the NEXT method call know,
// so it doesn't spit out error for not handling a message (since that message MIGHT have // so it doesn't spit out error for not handling a message (since that message MIGHT have
@ -302,7 +407,7 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
else { else {
Logger logger2 = this.logger; Logger logger2 = this.logger;
if (logger2.isErrorEnabled()) { if (logger2.isErrorEnabled()) {
this.logger.error("----------- LISTENER NOT REGISTERED FOR TYPE: {}", this.logger.warn("----------- LISTENER NOT REGISTERED FOR TYPE: {}",
message.getClass() message.getClass()
.getSimpleName()); .getSimpleName());
} }
@ -318,29 +423,45 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
@Override @Override
public final public final
void notifyOnIdle(final C connection) { void notifyOnIdle(final C connection) {
Set<Entry<Type, CopyOnWriteArrayList<ListenerRaw<C, Object>>>> entrySet = this.listeners.entrySet(); // this is the GLOBAL version (unless it's the call from below, then it's the connection scoped version)
CopyOnWriteArrayList<ListenerRaw<C, Object>> list; final IdentityMap<Type, ConcurrentIterator> listeners = listenersREF.get(this);
for (Entry<Type, CopyOnWriteArrayList<ListenerRaw<C, Object>>> entry : entrySet) {
list = entry.getValue(); boolean foundListener = false;
if (list != null) { final IdentityMap.Values<ConcurrentIterator> values = listeners.values();
for (ListenerRaw<C, Object> listener : list) { for (ConcurrentIterator concurrentIterator : values) {
if (concurrentIterator != null) {
ConcurrentEntry<ListenerRaw<C, Object>> head = headREF.get(concurrentIterator);
ConcurrentEntry<ListenerRaw<C, Object>> current = head;
ListenerRaw<C, Object> listener;
while (current != null) {
if (this.shutdown) { if (this.shutdown) {
return; return;
} }
listener = current.getValue();
current = current.next();
try { try {
listener.idle(connection); listener.idle(connection);
} catch (IOException e) { } catch (Exception e) {
logger.error("Unable to notify listener on idle.", e); logger.error("Unable to notify listener on 'idle' for listener '{}', connection '{}'.", listener, connection, e);
listener.error(connection, e);
} }
} }
connection.send()
.flush(); foundListener |= head != null; // true if we have something to publish to, otherwise false
} }
} }
if (foundListener) {
connection.send()
.flush();
}
// now have to account for additional (local) listener managers. // now have to account for additional (local) listener managers.
ConnectionManager<C> localManager = this.localManagers.get(connection); // access a snapshot of the subscriptions (single-writer-principle)
final IdentityMap<Connection, ConnectionManager<C>> localManagers = localManagersREF.get(this);
ConnectionManager<C> localManager = localManagers.get(connection);
if (localManager != null) { if (localManager != null) {
localManager.notifyOnIdle(connection); localManager.notifyOnIdle(connection);
} }
@ -351,36 +472,68 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
* <p/> * <p/>
* {@link ISessionManager} * {@link ISessionManager}
*/ */
@SuppressWarnings("Duplicates")
@Override @Override
public public
void connectionConnected(final C connection) { void connectionConnected(final C connection) {
// create a new connection! // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
this.connections.add(connection); // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
synchronized (singleWriterLock2) {
// access a snapshot of the subscriptions (single-writer-principle)
ConcurrentEntry head = connectionsREF.get(this);
try { if (!connectionEntries.containsKey(connection)) {
Set<Entry<Type, CopyOnWriteArrayList<ListenerRaw<C, Object>>>> entrySet = this.listeners.entrySet(); head = new ConcurrentEntry<Object>(connection, head);
CopyOnWriteArrayList<ListenerRaw<C, Object>> list;
for (Entry<Type, CopyOnWriteArrayList<ListenerRaw<C, Object>>> entry : entrySet) { connectionEntries.put(connection, head);
list = entry.getValue();
if (list != null) { // save this snapshot back to the original (single writer principle)
for (ListenerRaw<C, Object> listener : list) { connectionsREF.lazySet(this, head);
if (this.shutdown) { }
return; }
}
listener.connected(connection);
final IdentityMap<Type, ConcurrentIterator> listeners = listenersREF.get(this);
boolean foundListener = false;
final IdentityMap.Values<ConcurrentIterator> values = listeners.values();
for (ConcurrentIterator concurrentIterator : values) {
if (concurrentIterator != null) {
ConcurrentEntry<ListenerRaw<C, Object>> head = headREF.get(concurrentIterator);
ConcurrentEntry<ListenerRaw<C, Object>> current = head;
ListenerRaw<C, Object> listener;
while (current != null) {
if (this.shutdown) {
return;
} }
connection.send()
.flush();
}
}
// now have to account for additional (local) listener managers. listener = current.getValue();
ConnectionManager<C> localManager = this.localManagers.get(connection); current = current.next();
if (localManager != null) {
localManager.connectionConnected(connection); try {
listener.connected(connection);
} catch (Exception e) {
logger.error("Unable to notify listener on 'connected' for listener '{}', connection '{}'.", listener, connection, e);
listener.error(connection, e);
}
}
foundListener |= head != null; // true if we have something to publish to, otherwise false
} }
} catch (Throwable t) { }
connectionError(connection, t);
if (foundListener) {
connection.send()
.flush();
}
// now have to account for additional (local) listener managers.
// access a snapshot of the subscriptions (single-writer-principle)
final IdentityMap<Connection, ConnectionManager<C>> localManagers = localManagersREF.get(this);
ConnectionManager<C> localManager = localManagers.get(connection);
if (localManager != null) {
localManager.connectionConnected(connection);
} }
} }
@ -389,82 +542,96 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
* <p/> * <p/>
* {@link ISessionManager} * {@link ISessionManager}
*/ */
@SuppressWarnings("Duplicates")
@Override @Override
public public
void connectionDisconnected(final C connection) { void connectionDisconnected(final C connection) {
Set<Entry<Type, CopyOnWriteArrayList<ListenerRaw<C, Object>>>> entrySet = this.listeners.entrySet(); final IdentityMap<Type, ConcurrentIterator> listeners = listenersREF.get(this);
CopyOnWriteArrayList<ListenerRaw<C, Object>> list;
for (Entry<Type, CopyOnWriteArrayList<ListenerRaw<C, Object>>> entry : entrySet) { boolean foundListener = false;
list = entry.getValue(); final IdentityMap.Values<ConcurrentIterator> values = listeners.values();
if (list != null) { for (ConcurrentIterator concurrentIterator : values) {
for (ListenerRaw<C, Object> listener : list) { if (concurrentIterator != null) {
ConcurrentEntry<ListenerRaw<C, Object>> head = headREF.get(concurrentIterator);
ConcurrentEntry<ListenerRaw<C, Object>> current = head;
ListenerRaw<C, Object> listener;
while (current != null) {
if (this.shutdown) { if (this.shutdown) {
return; return;
} }
listener.disconnected(connection); listener = current.getValue();
current = current.next();
try {
listener.disconnected(connection);
} catch (Exception e) {
logger.error("Unable to notify listener on 'disconnected' for listener '{}', connection '{}'.", listener, connection, e);
listener.error(connection, e);
}
} }
foundListener |= head != null; // true if we have something to publish to, otherwise false
} }
} }
if (foundListener) {
connection.send()
.flush();
}
// now have to account for additional (local) listener managers. // now have to account for additional (local) listener managers.
ConnectionManager<C> localManager = this.localManagers.get(connection);
// access a snapshot of the subscriptions (single-writer-principle)
final IdentityMap<Connection, ConnectionManager<C>> localManagers = localManagersREF.get(this);
ConnectionManager<C> localManager = localManagers.get(connection);
if (localManager != null) { if (localManager != null) {
localManager.connectionDisconnected(connection); localManager.connectionDisconnected(connection);
// remove myself from the "global" listeners so we can have our memory cleaned up. // remove myself from the "global" listeners so we can have our memory cleaned up.
this.localManagers.remove(connection); removeListenerManager(connection);
} }
this.connections.remove(connection); // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
} // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
synchronized (singleWriterLock2) {
// access a snapshot of the subscriptions (single-writer-principle)
ConcurrentEntry concurrentEntry = connectionEntries.get(connection);
if (concurrentEntry != null) {
ConcurrentEntry head1 = connectionsREF.get(this);
/** if (concurrentEntry == head1) {
* Invoked when there is an error of some kind during the up/down stream process // if it was second, now it's first
* <p/> head1 = head1.next();
* {@link ISessionManager} //oldHead.clear(); // optimize for GC not possible because of potentially running iterators
*/
@Override
public
void connectionError(final C connection, final Throwable throwable) {
Set<Entry<Type, CopyOnWriteArrayList<ListenerRaw<C, Object>>>> entrySet = this.listeners.entrySet();
CopyOnWriteArrayList<ListenerRaw<C, Object>> list;
for (Entry<Type, CopyOnWriteArrayList<ListenerRaw<C, Object>>> entry : entrySet) {
list = entry.getValue();
if (list != null) {
for (ListenerRaw<C, Object> listener : list) {
if (this.shutdown) {
return;
}
listener.error(connection, throwable);
} }
connection.send() else {
.flush(); concurrentEntry.remove();
}
// save this snapshot back to the original (single writer principle)
connectionsREF.lazySet(this, head1);
this.connectionEntries.remove(connection);
} }
} }
// now have to account for additional (local) listener managers.
ConnectionManager<C> localManager = this.localManagers.get(connection);
if (localManager != null) {
localManager.connectionError(connection, throwable);
}
} }
/** /**
* Returns a non-modifiable list of active connections * Returns a non-modifiable list of active connections. This is extremely slow, and not recommended!
* <p/>
* {@link ISessionManager}
*/ */
@Override @Override
public public
List<C> getConnections() { List<C> getConnections() {
return Collections.unmodifiableList(this.connections); synchronized (singleWriterLock2) {
final IdentityMap.Keys<C> keys = this.connectionEntries.keys();
return keys.toArray();
}
} }
final final
ConnectionManager<C> addListenerManager(final Connection connection) { ConnectionManager<C> addListenerManager(final Connection connection) {
// when we are a server, NORMALLY listeners are added at the GLOBAL level (meaning, I add one listener, and ALL connections // when we are a server, NORMALLY listeners are added at the GLOBAL level (meaning, I add one listener, and ALL connections
@ -472,32 +639,67 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
// it is POSSIBLE to add a connection-specific listener (via connection.addListener), meaning that ONLY // it is POSSIBLE to add a connection-specific listener (via connection.addListener), meaning that ONLY
// that listener is notified on that event (ie, admin type listeners) // that listener is notified on that event (ie, admin type listeners)
ConnectionManager<C> lm = this.localManagers.getOrCreate(connection, connection.toString()); ConnectionManager<C> manager;
boolean created = false;
Logger logger2 = this.logger;
if (logger2.isTraceEnabled()) { // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
this.logger.trace("Connection specific Listener Manager added on connection: {}", connection); // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
synchronized (singleWriterLock3) {
// access a snapshot of the subscriptions (single-writer-principle)
final IdentityMap<Connection, ConnectionManager<C>> localManagers = localManagersREF.get(this);
manager = localManagers.get(connection);
if (manager == null) {
created = true;
manager = new ConnectionManager<C>(loggerName + "-" + connection.toString() + " Specific", ConnectionManager.this.baseClass);
localManagers.put(connection, manager);
// save this snapshot back to the original (single writer principle)
localManagersREF.lazySet(this, localManagers);
}
} }
return lm; if (created) {
Logger logger2 = this.logger;
if (logger2.isTraceEnabled()) {
logger2.trace("Connection specific Listener Manager added for connection: {}", connection);
}
}
return manager;
} }
final final
void removeListenerManager(final Connection connection) { void removeListenerManager(final Connection connection) {
this.localManagers.remove(connection); boolean wasRemoved = false;
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
synchronized (singleWriterLock3) {
// access a snapshot of the subscriptions (single-writer-principle)
final IdentityMap<Connection, ConnectionManager<C>> localManagers = localManagersREF.get(this);
final ConnectionManager<C> removed = localManagers.remove(connection);
if (removed != null) {
wasRemoved = true;
// save this snapshot back to the original (single writer principle)
localManagersREF.lazySet(this, localManagers);
}
}
if (wasRemoved) {
Logger logger2 = this.logger;
if (logger2.isTraceEnabled()) {
logger2.trace("Connection specific Listener Manager removed for connection: {}", connection);
}
}
} }
/**
* BE CAREFUL! Only for internal use!
*
* @return Returns a FAST list of active connections.
*/
public final
Collection<C> getConnections0() {
return this.connections;
}
/** /**
* BE CAREFUL! Only for internal use! * BE CAREFUL! Only for internal use!
* *
@ -505,10 +707,9 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
*/ */
public final public final
C getConnection0() throws IOException { C getConnection0() throws IOException {
if (this.connections.iterator() ConcurrentEntry<C> head1 = connectionsREF.get(this);
.hasNext()) { if (head1 != null) {
return this.connections.iterator() return head1.getValue();
.next();
} }
else { else {
throw new IOException("Not connected to a remote computer. Unable to continue!"); throw new IOException("Not connected to a remote computer. Unable to continue!");
@ -522,7 +723,7 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
*/ */
final final
boolean hasListeners() { boolean hasListeners() {
return this.listeners.isEmpty(); return listenersREF.get(this).size == 0;
} }
/** /**
@ -535,7 +736,19 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
// disconnect the sessions // disconnect the sessions
closeConnections(); closeConnections();
this.listeners.clear(); synchronized (singleWriterLock1) {
final IdentityMap<Type, ConcurrentIterator> listeners = listenersREF.get(this);
final Iterator<ConcurrentIterator> iterator = listeners.values()
.iterator();
while (iterator.hasNext()) {
final ConcurrentIterator next = iterator.next();
next.clear();
iterator.remove();
}
// save this snapshot back to the original (single writer principle)
listenersREF.lazySet(this, listeners);
}
} }
/** /**
@ -543,23 +756,197 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
*/ */
final final
void closeConnections() { void closeConnections() {
// close the sessions
Iterator<C> iterator = this.connections.iterator(); // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
//noinspection WhileLoopReplaceableByForEach // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
while (iterator.hasNext()) { // use-case 99% of the time)
Connection connection = iterator.next(); synchronized (singleWriterLock2) {
// Close the connection. Make sure the close operation ends because // don't need anything fast or fancy here, because this method will only be called once
// all I/O operations are asynchronous in Netty. final IdentityMap.Keys<C> keys = connectionEntries.keys();
// Necessary otherwise workers won't close. for (C connection : keys) {
connection.close(); // Close the connection. Make sure the close operation ends because
// all I/O operations are asynchronous in Netty.
// Also necessary otherwise workers won't close.
connection.close();
}
this.connectionEntries.clear();
this.connectionsHead = null;
} }
this.connections.clear(); }
/**
* Not implemented, since this would cause horrendous problems.
*
* @see dorkbox.network.connection.ConnectionPoint#isWritable()
*/
@Override
public
boolean isWritable() {
throw new UnsupportedOperationException("Method not implemented");
}
/**
* Exposes methods to send the object to all server connections (except the specified one) over the network. (or via LOCAL when it's a
* local channel).
*/
@Override
public
ConnectionExceptSpecifiedBridgeServer<C> except() {
return this;
}
/**
* This will flush the data from EVERY connection on this server.
* <p/>
* THIS WILL BE SLOW!
*
* @see dorkbox.network.connection.ConnectionPoint#flush()
*/
public
void flush() {
ConcurrentEntry<C> current = connectionsREF.get(this);
C c;
while (current != null) {
c = current.getValue();
current = current.next();
c.send()
.flush();
}
}
/**
* Sends the object to all server connections (except the specified one) over the network using TCP. (or via LOCAL when it's a local
* channel).
*/
public
ConnectionPoint TCP(final C connection, final Object message) {
ConcurrentEntry<C> current = connectionsREF.get(this);
C c;
while (current != null) {
c = current.getValue();
current = current.next();
if (c != connection) {
c.send()
.TCP(message);
}
}
return this;
}
/**
* Sends the object to all server connections (except the specified one) over the network using UDP (or via LOCAL when it's a local
* channel).
*/
public
ConnectionPoint UDP(final C connection, final Object message) {
ConcurrentEntry<C> current = connectionsREF.get(this);
C c;
while (current != null) {
c = current.getValue();
current = current.next();
if (c != connection) {
c.send()
.UDP(message);
}
}
return this;
}
/**
* Sends the object to all server connections (except the specified one) over the network using UDT. (or via LOCAL when it's a local
* channel).
*/
public
ConnectionPoint UDT(final C connection, final Object message) {
ConcurrentEntry<C> current = connectionsREF.get(this);
C c;
while (current != null) {
c = current.getValue();
current = current.next();
if (c != connection) {
c.send()
.UDT(message);
}
}
return this;
}
/**
* Sends the message to other listeners INSIDE this endpoint for EVERY connection. It does not send it to a remote address.
*/
public
void self(final Object message) {
ConcurrentEntry<C> current = connectionsREF.get(this);
C c;
while (current != null) {
c = current.getValue();
current = current.next();
notifyOnMessage(c, message);
}
}
/**
* Sends the object all server connections over the network using TCP. (or via LOCAL when it's a local channel).
*/
public
ConnectionPoint TCP(final Object message) {
ConcurrentEntry<C> current = connectionsREF.get(this);
C c;
while (current != null) {
c = current.getValue();
current = current.next();
c.send()
.TCP(message);
}
return this;
}
/**
* Sends the object all server connections over the network using UDP. (or via LOCAL when it's a local channel).
*/
public
ConnectionPoint UDP(final Object message) {
ConcurrentEntry<C> current = connectionsREF.get(this);
C c;
while (current != null) {
c = current.getValue();
current = current.next();
c.send()
.UDP(message);
}
return this;
}
/**
* Sends the object all server connections over the network using UDT. (or via LOCAL when it's a local channel).
*/
public
ConnectionPoint UDT(final Object message) {
ConcurrentEntry<C> current = connectionsREF.get(this);
C c;
while (current != null) {
c = current.getValue();
current = current.next();
c.send()
.UDT(message);
}
return this;
} }
@Override @Override
public public
boolean equals(final Object o) { boolean equals(final Object o) {
return false; return this == o;
} }

View File

@ -29,13 +29,9 @@ import java.io.IOException;
public public
class EndPointServer<C extends Connection> extends EndPoint<C> { class EndPointServer<C extends Connection> extends EndPoint<C> {
private final ServerConnectionBridge<C> serverConnections;
public public
EndPointServer(final Configuration options) throws InitializationException, SecurityException, IOException { EndPointServer(final Configuration options) throws InitializationException, SecurityException, IOException {
super(Server.class, options); super(Server.class, options);
this.serverConnections = new ServerConnectionBridge<C>(this.connectionManager);
} }
/** /**
@ -44,7 +40,7 @@ class EndPointServer<C extends Connection> extends EndPoint<C> {
@Override @Override
public public
ConnectionBridgeServer<C> send() { ConnectionBridgeServer<C> send() {
return this.serverConnections; return this.connectionManager;
} }
/** /**

View File

@ -35,12 +35,7 @@ interface ISessionManager<C extends Connection> {
void connectionDisconnected(C connection); void connectionDisconnected(C connection);
/** /**
* Called when there is an error of some kind during the up/down stream process * Returns a non-modifiable list of active connections. This is extremely slow, and not recommended!
*/
void connectionError(C connection, Throwable throwable);
/**
* Returns a non-modifiable list of active connections
*/ */
Collection<C> getConnections(); Collection<C> getConnections();
} }

View File

@ -1,176 +0,0 @@
/*
* Copyright 2010 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.connection;
import dorkbox.network.connection.bridge.ConnectionBridgeServer;
import dorkbox.network.connection.bridge.ConnectionExceptSpecifiedBridgeServer;
import java.util.Collection;
public
class ServerConnectionBridge<C extends Connection>
implements ConnectionPoint, ConnectionBridgeServer<C>, ConnectionExceptSpecifiedBridgeServer<C> {
private final ConnectionManager<C> connectionManager;
public
ServerConnectionBridge(final ConnectionManager<C> connectionManager) {
this.connectionManager = connectionManager;
}
/**
* Not implemented, since this would cause horrendous problems.
*
* @see dorkbox.network.connection.ConnectionPoint#isWritable()
*/
@Override
public
boolean isWritable() {
throw new UnsupportedOperationException("Method not implemented");
}
/**
* This will flush the data from EVERY connection on this server.
* <p/>
* THIS WILL BE SLOW!
*
* @see dorkbox.network.connection.ConnectionPoint#flush()
*/
@Override
public
void flush() {
Collection<C> connections0 = this.connectionManager.getConnections0();
for (C c : connections0) {
c.send()
.flush();
}
}
/**
* Exposes methods to send the object to all server connections (except the specified one) over the network. (or via LOCAL when it's a
* local channel).
*/
@Override
public
ConnectionExceptSpecifiedBridgeServer<C> except() {
return this;
}
/**
* Sends the object to all server connections (except the specified one) over the network using TCP. (or via LOCAL when it's a local
* channel).
*/
@Override
public
void TCP(final C connection, final Object message) {
Collection<C> connections0 = this.connectionManager.getConnections0();
for (C c : connections0) {
if (c != connection) {
c.send()
.TCP(message);
}
}
}
/**
* Sends the object to all server connections (except the specified one) over the network using UDP (or via LOCAL when it's a local
* channel).
*/
@Override
public
void UDP(final C connection, final Object message) {
Collection<C> connections0 = this.connectionManager.getConnections0();
for (C c : connections0) {
if (c != connection) {
c.send()
.UDP(message);
}
}
}
/**
* Sends the object to all server connections (except the specified one) over the network using UDT. (or via LOCAL when it's a local
* channel).
*/
@Override
public
void UDT(final C connection, final Object message) {
Collection<C> connections0 = this.connectionManager.getConnections0();
for (C c : connections0) {
if (c != connection) {
c.send()
.UDT(message);
}
}
}
/**
* Sends the message to other listeners INSIDE this endpoint for EVERY connection. It does not send it to a remote address.
*/
@Override
public
void self(final Object message) {
Collection<C> connections0 = this.connectionManager.getConnections0();
for (C c : connections0) {
this.connectionManager.notifyOnMessage(c, message);
}
}
/**
* Sends the object all server connections over the network using TCP. (or via LOCAL when it's a local channel).
*/
@Override
public
ConnectionPoint TCP(final Object message) {
Collection<C> connections0 = this.connectionManager.getConnections0();
for (C c : connections0) {
c.send()
.TCP(message);
}
return this;
}
/**
* Sends the object all server connections over the network using UDP. (or via LOCAL when it's a local channel).
*/
@Override
public
ConnectionPoint UDP(final Object message) {
Collection<C> connections0 = this.connectionManager.getConnections0();
for (C c : connections0) {
c.send()
.UDP(message);
}
return this;
}
/**
* Sends the object all server connections over the network using UDT. (or via LOCAL when it's a local channel).
*/
@Override
public
ConnectionPoint UDT(final Object message) {
Collection<C> connections0 = this.connectionManager.getConnections0();
for (C c : connections0) {
c.send()
.UDT(message);
}
return this;
}
}

View File

@ -16,6 +16,7 @@
package dorkbox.network.connection.bridge; package dorkbox.network.connection.bridge;
import dorkbox.network.connection.Connection; import dorkbox.network.connection.Connection;
import dorkbox.network.connection.ConnectionPoint;
public public
interface ConnectionExceptSpecifiedBridgeServer<C extends Connection> { interface ConnectionExceptSpecifiedBridgeServer<C extends Connection> {
@ -24,17 +25,17 @@ interface ConnectionExceptSpecifiedBridgeServer<C extends Connection> {
* Sends the object to all server connections (except the specified one) over the network using TCP. (or via LOCAL when it's a local * Sends the object to all server connections (except the specified one) over the network using TCP. (or via LOCAL when it's a local
* channel). * channel).
*/ */
void TCP(C connection, Object message); ConnectionPoint TCP(C connection, Object message);
/** /**
* Sends the object to all server connections (except the specified one) over the network using UDP (or via LOCAL when it's a local * Sends the object to all server connections (except the specified one) over the network using UDP (or via LOCAL when it's a local
* channel). * channel).
*/ */
void UDP(C connection, Object message); ConnectionPoint UDP(C connection, Object message);
/** /**
* Sends the object to all server connections (except the specified one) over the network using UDT. (or via LOCAL when it's a local * Sends the object to all server connections (except the specified one) over the network using UDT. (or via LOCAL when it's a local
* channel). * channel).
*/ */
void UDT(C connection, Object message); ConnectionPoint UDT(C connection, Object message);
} }

View File

@ -1,56 +0,0 @@
/*
* Copyright 2010 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.util;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public abstract
class ConcurrentHashMapFactory<K, V> extends ConcurrentHashMap<K, V> implements ConcurrentMap<K, V> {
private static final long serialVersionUID = -1L;
public
ConcurrentHashMapFactory() {
}
public abstract
V createNewObject(Object... args);
/**
* Thread safe method to get the value in the map. If the value doesn't exist, it will create a new one (and put the new one in the
* map)
*/
public final
V getOrCreate(K key, Object... args) {
V orig = get(key);
if (orig == null) {
// It's OK to construct a new object that ends up not being used
orig = createNewObject(args);
V putByOtherThreadJustNow = putIfAbsent(key, orig);
if (putByOtherThreadJustNow != null) {
// Some other thread "won"
orig = putByOtherThreadJustNow;
}
else {
// This thread was the winner
}
}
return orig;
}
}