diff --git a/src/dorkbox/network/connection/ConnectionImpl.java b/src/dorkbox/network/connection/ConnectionImpl.java index edbc5f73..4902d0b1 100644 --- a/src/dorkbox/network/connection/ConnectionImpl.java +++ b/src/dorkbox/network/connection/ConnectionImpl.java @@ -732,7 +732,6 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn // safe to ignore, since it's thrown when we try to interact with a closed socket. Race conditions cause this, and // it is still safe to ignore. this.logger.error("Unexpected exception while receiving data from {}", channel.remoteAddress(), cause); - this.sessionManager.connectionError(this, cause); // the ONLY sockets that can call this are: // CLIENT TCP or UDP diff --git a/src/dorkbox/network/connection/ConnectionManager.java b/src/dorkbox/network/connection/ConnectionManager.java index a015494e..4dfb4d59 100644 --- a/src/dorkbox/network/connection/ConnectionManager.java +++ b/src/dorkbox/network/connection/ConnectionManager.java @@ -15,28 +15,79 @@ */ package dorkbox.network.connection; +import com.esotericsoftware.kryo.util.IdentityMap; +import dorkbox.network.connection.bridge.ConnectionBridgeServer; +import dorkbox.network.connection.bridge.ConnectionExceptSpecifiedBridgeServer; import dorkbox.network.rmi.RmiMessages; -import dorkbox.network.util.ConcurrentHashMapFactory; import dorkbox.util.ClassHelper; +import dorkbox.util.Property; +import dorkbox.util.collections.ConcurrentEntry; +import dorkbox.util.collections.ConcurrentIterator; import org.slf4j.Logger; import java.io.IOException; import java.lang.reflect.Type; -import java.util.*; -import java.util.Map.Entry; -import java.util.concurrent.CopyOnWriteArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -//note that we specifically DO NOT implement equals/hashCode, because we cannot create two separate -// objects that are somehow equal to each other. +import static dorkbox.util.collections.ConcurrentIterator.headREF; + +// .equals() compares the identity on purpose,this because we cannot create two separate objects that are somehow equal to each other. +@SuppressWarnings("unchecked") public -class ConnectionManager implements ListenerBridge, ISessionManager { +class ConnectionManager implements ListenerBridge, ISessionManager, ConnectionPoint, ConnectionBridgeServer, + ConnectionExceptSpecifiedBridgeServer { - public static Listener unRegisteredType_Listener = null; + /** + * Specifies the load-factor for the IdentityMap used + */ + @Property + public static final float LOAD_FACTOR = 0.8F; + + private static Listener unRegisteredType_Listener = null; + private final String loggerName; + + @SuppressWarnings("unused") + private volatile IdentityMap> localManagers = new IdentityMap>(8, ConnectionManager.LOAD_FACTOR); + private volatile IdentityMap listeners = new IdentityMap(32, LOAD_FACTOR); + + @SuppressWarnings({"FieldCanBeLocal", "unused"}) + private volatile ConcurrentEntry connectionsHead = null; // reference to the first element + + // This is only touched by a single thread, maintains a map of entries for FAST lookup during connection remove. + private final IdentityMap connectionEntries = new IdentityMap(32, ConnectionManager.LOAD_FACTOR); + + + + + // In order to force the "single writer principle" for subscribe & unsubscribe, they are within SYNCHRONIZED. + // + // These methods **COULD** be dispatched via another thread (so it's only one thread ever touching them), however we do NOT want them + // asynchronous - as publish() should ALWAYS succeed if a correct subscribe() is called before. 'Synchronized' is good enough here. + private final Object singleWriterLock1 = new Object(); + private final Object singleWriterLock2 = new Object(); + private final Object singleWriterLock3 = new Object(); + + + // Recommended for best performance while adhering to the "single writer principle". Must be static-final + private static final AtomicReferenceFieldUpdater localManagersREF = + AtomicReferenceFieldUpdater.newUpdater(ConnectionManager.class, + IdentityMap.class, + "localManagers"); + + // Recommended for best performance while adhering to the "single writer principle". Must be static-final + private static final AtomicReferenceFieldUpdater listenersREF = + AtomicReferenceFieldUpdater.newUpdater(ConnectionManager.class, + IdentityMap.class, + "listeners"); + + // Recommended for best performance while adhering to the "single writer principle". Must be static-final + private static final AtomicReferenceFieldUpdater connectionsREF = + AtomicReferenceFieldUpdater.newUpdater(ConnectionManager.class, + ConcurrentEntry.class, + "connectionsHead"); - // these are final, because the REFERENCE to these will never change. They ARE NOT immutable objects (meaning their content can change) - private final ConcurrentHashMapFactory>> listeners; - private final ConcurrentHashMapFactory> localManagers; - private final CopyOnWriteArrayList connections = new CopyOnWriteArrayList(); /** * Used by the listener subsystem to determine types. @@ -47,44 +98,20 @@ class ConnectionManager implements ListenerBridge, ISessio public ConnectionManager(final String loggerName, final Class baseClass) { + this.loggerName = loggerName; this.logger = org.slf4j.LoggerFactory.getLogger(loggerName); this.baseClass = baseClass; - - this.listeners = new ConcurrentHashMapFactory>>() { - private static final long serialVersionUID = 1L; - - @Override - public - CopyOnWriteArrayList> createNewObject(Object... args) { - return new CopyOnWriteArrayList>(); - } - }; - - this.localManagers = new ConcurrentHashMapFactory>() { - private static final long serialVersionUID = 1L; - - @Override - public - ConnectionManager createNewObject(Object... args) { - return new ConnectionManager(loggerName + "-" + args[0] + " Specific", ConnectionManager.this.baseClass); - } - }; } /** - * Adds a listener to this connection/endpoint to be notified of - * connect/disconnect/idle/receive(object) events. + * Adds a listener to this connection/endpoint to be notified of connect/disconnect/idle/receive(object) events. *

- * If the listener already exists, it is not added again. + * When called by a server, NORMALLY listeners are added at the GLOBAL level (meaning, I add one listener, and ALL connections are + * notified of that listener. *

- * When called by a server, NORMALLY listeners are added at the GLOBAL level - * (meaning, I add one listener, and ALL connections are notified of that - * listener. - *

- * It is POSSIBLE to add a server connection ONLY (ie, not global) listener - * (via connection.addListener), meaning that ONLY that listener attached to - * the connection is notified on that event (ie, admin type listeners) + * It is POSSIBLE to add a server connection ONLY (ie, not global) listener (via connection.addListener), meaning that ONLY that + * listener attached to the connection is notified on that event (ie, admin type listeners) */ @SuppressWarnings("rawtypes") @Override @@ -129,8 +156,24 @@ class ConnectionManager implements ListenerBridge, ISessio void addListener0(final ListenerRaw listener) { Class type = listener.getObjectType(); - CopyOnWriteArrayList> list = this.listeners.getOrCreate(type); - list.addIfAbsent(listener); + // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this + // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our + // use-case 99% of the time) + synchronized (singleWriterLock1) { + // access a snapshot of the subscriptions (single-writer-principle) + final IdentityMap listeners = listenersREF.get(this); + + ConcurrentIterator subscribedListeners = listeners.get(type); + if (subscribedListeners == null) { + subscribedListeners = new ConcurrentIterator(); + listeners.put(type, subscribedListeners); + } + + subscribedListeners.add(listener); + + // save this snapshot back to the original (single writer principle) + listenersREF.lazySet(this, listeners); + } Logger logger2 = this.logger; if (logger2.isTraceEnabled()) { @@ -142,16 +185,13 @@ class ConnectionManager implements ListenerBridge, ISessio } /** - * Removes a listener from this connection/endpoint to NO LONGER be notified - * of connect/disconnect/idle/receive(object) events. + * Removes a listener from this connection/endpoint to NO LONGER be notified of connect/disconnect/idle/receive(object) events. *

- * When called by a server, NORMALLY listeners are added at the GLOBAL level - * (meaning, I add one listener, and ALL connections are notified of that - * listener. + * When called by a server, NORMALLY listeners are added at the GLOBAL level (meaning, I add one listener, and ALL connections are + * notified of that listener. *

- * It is POSSIBLE to remove a server-connection 'non-global' listener (via - * connection.removeListener), meaning that ONLY that listener attached to - * the connection is removed + * It is POSSIBLE to remove a server-connection 'non-global' listener (via connection.removeListener), meaning that ONLY that listener + * attached to the connection is removed */ @SuppressWarnings("rawtypes") @Override @@ -163,11 +203,22 @@ class ConnectionManager implements ListenerBridge, ISessio Class type = listener.getObjectType(); - CopyOnWriteArrayList> list = this.listeners.get(type); - if (list != null) { - list.remove(listener); + // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this + // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our + // use-case 99% of the time) + synchronized (singleWriterLock1) { + // access a snapshot of the subscriptions (single-writer-principle) + final IdentityMap listeners = listenersREF.get(this); + final ConcurrentIterator concurrentIterator = listeners.get(type); + if (concurrentIterator != null) { + concurrentIterator.remove(listener); + } + + // save this snapshot back to the original (single writer principle) + listenersREF.lazySet(this, listeners); } + Logger logger2 = this.logger; if (logger2.isTraceEnabled()) { logger2.trace("listener removed: {} <{}>", @@ -178,13 +229,24 @@ class ConnectionManager implements ListenerBridge, ISessio } /** - * Removes all registered listeners from this connection/endpoint to NO - * LONGER be notified of connect/disconnect/idle/receive(object) events. + * Removes all registered listeners from this connection/endpoint to NO LONGER be notified of connect/disconnect/idle/receive(object) + * events. */ @Override public final void removeAll() { - this.listeners.clear(); + // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this + // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our + // use-case 99% of the time) + synchronized (singleWriterLock1) { + // access a snapshot of the subscriptions (single-writer-principle) + final IdentityMap listeners = listenersREF.get(this); + + listeners.clear(); + + // save this snapshot back to the original (single writer principle) + listenersREF.lazySet(this, listeners); + } Logger logger2 = this.logger; if (logger2.isTraceEnabled()) { @@ -204,7 +266,18 @@ class ConnectionManager implements ListenerBridge, ISessio throw new IllegalArgumentException("classType cannot be null."); } - this.listeners.remove(classType); + // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this + // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our + // use-case 99% of the time) + synchronized (singleWriterLock1) { + // access a snapshot of the subscriptions (single-writer-principle) + final IdentityMap listeners = listenersREF.get(this); + + listeners.remove(classType); + + // save this snapshot back to the original (single writer principle) + listenersREF.lazySet(this, listeners); + } Logger logger2 = this.logger; if (logger2.isTraceEnabled()) { @@ -229,21 +302,40 @@ class ConnectionManager implements ListenerBridge, ISessio notifyOnMessage0(connection, message, false); } + @SuppressWarnings("Duplicates") private boolean notifyOnMessage0(final C connection, final Object message, boolean foundListener) { Class objectType = message.getClass(); // this is the GLOBAL version (unless it's the call from below, then it's the connection scoped version) - CopyOnWriteArrayList> list = this.listeners.get(objectType); - if (list != null) { - for (ListenerRaw listener : list) { + final IdentityMap listeners = listenersREF.get(this); + ConcurrentIterator concurrentIterator = listeners.get(objectType); + + if (concurrentIterator != null) { + ConcurrentEntry> head = headREF.get(concurrentIterator); + ConcurrentEntry> current = head; + ListenerRaw listener; + while (current != null) { if (this.shutdown) { return true; } - listener.received(connection, message); + listener = current.getValue(); + current = current.next(); + + try { + listener.received(connection, message); + } catch (Exception e) { + logger.error("Unable to notify on message '{}' for listener '{}', connection '{}'.", + objectType, + listener, + connection, + e); + listener.error(connection, e); + } } - foundListener = true; + + foundListener = head != null; // true if we have something to publish to, otherwise false } if (!(message instanceof RmiMessages)) { @@ -255,35 +347,48 @@ class ConnectionManager implements ListenerBridge, ISessio // we will call Foo (from this code) // we will NOT call Object (since we called Foo). If Foo was not registered, THEN we would call object! - list = null; objectType = objectType.getSuperclass(); while (objectType != null) { // check to see if we have what we are looking for in our CURRENT class - list = this.listeners.get(objectType); + concurrentIterator = listeners.get(objectType); + if (concurrentIterator != null) { + ConcurrentEntry> head = headREF.get(concurrentIterator); + ConcurrentEntry> current = head; + ListenerRaw listener; + while (current != null) { + if (this.shutdown) { + return true; + } - if (list != null) { + listener = current.getValue(); + current = current.next(); + + try { + listener.received(connection, message); + } catch (Exception e) { + logger.error("Unable to notify on message '{}' for listener '{}', connection '{}'.", + objectType, + listener, + connection, + e); + listener.error(connection, e); + } + } + + foundListener = head != null; // true if we have something to publish to, otherwise false break; } // NO MATCH, so walk up. objectType = objectType.getSuperclass(); } - - if (list != null) { - for (ListenerRaw listener : list) { - if (this.shutdown) { - return true; - } - - listener.received(connection, message); - foundListener = true; - } - } } // now have to account for additional connection listener managers (non-global). - ConnectionManager localManager = this.localManagers.get(connection); + // access a snapshot of the subscriptions (single-writer-principle) + final IdentityMap> localManagers = localManagersREF.get(this); + ConnectionManager localManager = localManagers.get(connection); if (localManager != null) { // if we found a listener during THIS method call, we need to let the NEXT method call know, // so it doesn't spit out error for not handling a message (since that message MIGHT have @@ -302,7 +407,7 @@ class ConnectionManager implements ListenerBridge, ISessio else { Logger logger2 = this.logger; if (logger2.isErrorEnabled()) { - this.logger.error("----------- LISTENER NOT REGISTERED FOR TYPE: {}", + this.logger.warn("----------- LISTENER NOT REGISTERED FOR TYPE: {}", message.getClass() .getSimpleName()); } @@ -318,29 +423,45 @@ class ConnectionManager implements ListenerBridge, ISessio @Override public final void notifyOnIdle(final C connection) { - Set>>> entrySet = this.listeners.entrySet(); - CopyOnWriteArrayList> list; - for (Entry>> entry : entrySet) { - list = entry.getValue(); - if (list != null) { - for (ListenerRaw listener : list) { + // this is the GLOBAL version (unless it's the call from below, then it's the connection scoped version) + final IdentityMap listeners = listenersREF.get(this); + + boolean foundListener = false; + final IdentityMap.Values values = listeners.values(); + for (ConcurrentIterator concurrentIterator : values) { + if (concurrentIterator != null) { + ConcurrentEntry> head = headREF.get(concurrentIterator); + ConcurrentEntry> current = head; + ListenerRaw listener; + while (current != null) { if (this.shutdown) { return; } + listener = current.getValue(); + current = current.next(); + try { listener.idle(connection); - } catch (IOException e) { - logger.error("Unable to notify listener on idle.", e); + } catch (Exception e) { + logger.error("Unable to notify listener on 'idle' for listener '{}', connection '{}'.", listener, connection, e); + listener.error(connection, e); } } - connection.send() - .flush(); + + foundListener |= head != null; // true if we have something to publish to, otherwise false } } + if (foundListener) { + connection.send() + .flush(); + } + // now have to account for additional (local) listener managers. - ConnectionManager localManager = this.localManagers.get(connection); + // access a snapshot of the subscriptions (single-writer-principle) + final IdentityMap> localManagers = localManagersREF.get(this); + ConnectionManager localManager = localManagers.get(connection); if (localManager != null) { localManager.notifyOnIdle(connection); } @@ -351,36 +472,68 @@ class ConnectionManager implements ListenerBridge, ISessio *

* {@link ISessionManager} */ + @SuppressWarnings("Duplicates") @Override public void connectionConnected(final C connection) { - // create a new connection! - this.connections.add(connection); + // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this + // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our + // use-case 99% of the time) + synchronized (singleWriterLock2) { + // access a snapshot of the subscriptions (single-writer-principle) + ConcurrentEntry head = connectionsREF.get(this); - try { - Set>>> entrySet = this.listeners.entrySet(); - CopyOnWriteArrayList> list; - for (Entry>> entry : entrySet) { - list = entry.getValue(); - if (list != null) { - for (ListenerRaw listener : list) { - if (this.shutdown) { - return; - } - listener.connected(connection); + if (!connectionEntries.containsKey(connection)) { + head = new ConcurrentEntry(connection, head); + + connectionEntries.put(connection, head); + + // save this snapshot back to the original (single writer principle) + connectionsREF.lazySet(this, head); + } + } + + + final IdentityMap listeners = listenersREF.get(this); + + boolean foundListener = false; + final IdentityMap.Values values = listeners.values(); + for (ConcurrentIterator concurrentIterator : values) { + if (concurrentIterator != null) { + ConcurrentEntry> head = headREF.get(concurrentIterator); + ConcurrentEntry> current = head; + ListenerRaw listener; + while (current != null) { + if (this.shutdown) { + return; } - connection.send() - .flush(); - } - } - // now have to account for additional (local) listener managers. - ConnectionManager localManager = this.localManagers.get(connection); - if (localManager != null) { - localManager.connectionConnected(connection); + listener = current.getValue(); + current = current.next(); + + try { + listener.connected(connection); + } catch (Exception e) { + logger.error("Unable to notify listener on 'connected' for listener '{}', connection '{}'.", listener, connection, e); + listener.error(connection, e); + } + } + + foundListener |= head != null; // true if we have something to publish to, otherwise false } - } catch (Throwable t) { - connectionError(connection, t); + } + + if (foundListener) { + connection.send() + .flush(); + } + + // now have to account for additional (local) listener managers. + // access a snapshot of the subscriptions (single-writer-principle) + final IdentityMap> localManagers = localManagersREF.get(this); + ConnectionManager localManager = localManagers.get(connection); + if (localManager != null) { + localManager.connectionConnected(connection); } } @@ -389,82 +542,96 @@ class ConnectionManager implements ListenerBridge, ISessio *

* {@link ISessionManager} */ + @SuppressWarnings("Duplicates") @Override public void connectionDisconnected(final C connection) { - Set>>> entrySet = this.listeners.entrySet(); - CopyOnWriteArrayList> list; - for (Entry>> entry : entrySet) { - list = entry.getValue(); - if (list != null) { - for (ListenerRaw listener : list) { + final IdentityMap listeners = listenersREF.get(this); + + boolean foundListener = false; + final IdentityMap.Values values = listeners.values(); + for (ConcurrentIterator concurrentIterator : values) { + if (concurrentIterator != null) { + ConcurrentEntry> head = headREF.get(concurrentIterator); + ConcurrentEntry> current = head; + ListenerRaw listener; + while (current != null) { if (this.shutdown) { return; } - listener.disconnected(connection); + listener = current.getValue(); + current = current.next(); + + try { + listener.disconnected(connection); + } catch (Exception e) { + logger.error("Unable to notify listener on 'disconnected' for listener '{}', connection '{}'.", listener, connection, e); + listener.error(connection, e); + } } + + foundListener |= head != null; // true if we have something to publish to, otherwise false } } + if (foundListener) { + connection.send() + .flush(); + } + + // now have to account for additional (local) listener managers. - ConnectionManager localManager = this.localManagers.get(connection); + + // access a snapshot of the subscriptions (single-writer-principle) + final IdentityMap> localManagers = localManagersREF.get(this); + ConnectionManager localManager = localManagers.get(connection); if (localManager != null) { localManager.connectionDisconnected(connection); // remove myself from the "global" listeners so we can have our memory cleaned up. - this.localManagers.remove(connection); + removeListenerManager(connection); } - this.connections.remove(connection); - } + // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this + // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our + // use-case 99% of the time) + synchronized (singleWriterLock2) { + // access a snapshot of the subscriptions (single-writer-principle) + ConcurrentEntry concurrentEntry = connectionEntries.get(connection); + if (concurrentEntry != null) { + ConcurrentEntry head1 = connectionsREF.get(this); - /** - * Invoked when there is an error of some kind during the up/down stream process - *

- * {@link ISessionManager} - */ - @Override - public - void connectionError(final C connection, final Throwable throwable) { - Set>>> entrySet = this.listeners.entrySet(); - CopyOnWriteArrayList> list; - for (Entry>> entry : entrySet) { - list = entry.getValue(); - if (list != null) { - for (ListenerRaw listener : list) { - if (this.shutdown) { - return; - } - - listener.error(connection, throwable); + if (concurrentEntry == head1) { + // if it was second, now it's first + head1 = head1.next(); + //oldHead.clear(); // optimize for GC not possible because of potentially running iterators } - connection.send() - .flush(); + else { + concurrentEntry.remove(); + } + + // save this snapshot back to the original (single writer principle) + connectionsREF.lazySet(this, head1); + this.connectionEntries.remove(connection); } } - - // now have to account for additional (local) listener managers. - ConnectionManager localManager = this.localManagers.get(connection); - if (localManager != null) { - localManager.connectionError(connection, throwable); - } } /** - * Returns a non-modifiable list of active connections - *

- * {@link ISessionManager} + * Returns a non-modifiable list of active connections. This is extremely slow, and not recommended! */ @Override public List getConnections() { - return Collections.unmodifiableList(this.connections); + synchronized (singleWriterLock2) { + final IdentityMap.Keys keys = this.connectionEntries.keys(); + return keys.toArray(); + } } - final ConnectionManager addListenerManager(final Connection connection) { // when we are a server, NORMALLY listeners are added at the GLOBAL level (meaning, I add one listener, and ALL connections @@ -472,32 +639,67 @@ class ConnectionManager implements ListenerBridge, ISessio // it is POSSIBLE to add a connection-specific listener (via connection.addListener), meaning that ONLY // that listener is notified on that event (ie, admin type listeners) - ConnectionManager lm = this.localManagers.getOrCreate(connection, connection.toString()); + ConnectionManager manager; + boolean created = false; - Logger logger2 = this.logger; - if (logger2.isTraceEnabled()) { - this.logger.trace("Connection specific Listener Manager added on connection: {}", connection); + + // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this + // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our + // use-case 99% of the time) + synchronized (singleWriterLock3) { + // access a snapshot of the subscriptions (single-writer-principle) + final IdentityMap> localManagers = localManagersREF.get(this); + + manager = localManagers.get(connection); + if (manager == null) { + created = true; + manager = new ConnectionManager(loggerName + "-" + connection.toString() + " Specific", ConnectionManager.this.baseClass); + localManagers.put(connection, manager); + + // save this snapshot back to the original (single writer principle) + localManagersREF.lazySet(this, localManagers); + } } - return lm; + if (created) { + Logger logger2 = this.logger; + if (logger2.isTraceEnabled()) { + logger2.trace("Connection specific Listener Manager added for connection: {}", connection); + } + } + + return manager; } final void removeListenerManager(final Connection connection) { - this.localManagers.remove(connection); + boolean wasRemoved = false; + + // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this + // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our + // use-case 99% of the time) + synchronized (singleWriterLock3) { + // access a snapshot of the subscriptions (single-writer-principle) + final IdentityMap> localManagers = localManagersREF.get(this); + + final ConnectionManager removed = localManagers.remove(connection); + if (removed != null) { + wasRemoved = true; + + // save this snapshot back to the original (single writer principle) + localManagersREF.lazySet(this, localManagers); + } + } + + if (wasRemoved) { + Logger logger2 = this.logger; + if (logger2.isTraceEnabled()) { + logger2.trace("Connection specific Listener Manager removed for connection: {}", connection); + } + } } - /** - * BE CAREFUL! Only for internal use! - * - * @return Returns a FAST list of active connections. - */ - public final - Collection getConnections0() { - return this.connections; - } - /** * BE CAREFUL! Only for internal use! * @@ -505,10 +707,9 @@ class ConnectionManager implements ListenerBridge, ISessio */ public final C getConnection0() throws IOException { - if (this.connections.iterator() - .hasNext()) { - return this.connections.iterator() - .next(); + ConcurrentEntry head1 = connectionsREF.get(this); + if (head1 != null) { + return head1.getValue(); } else { throw new IOException("Not connected to a remote computer. Unable to continue!"); @@ -522,7 +723,7 @@ class ConnectionManager implements ListenerBridge, ISessio */ final boolean hasListeners() { - return this.listeners.isEmpty(); + return listenersREF.get(this).size == 0; } /** @@ -535,7 +736,19 @@ class ConnectionManager implements ListenerBridge, ISessio // disconnect the sessions closeConnections(); - this.listeners.clear(); + synchronized (singleWriterLock1) { + final IdentityMap listeners = listenersREF.get(this); + final Iterator iterator = listeners.values() + .iterator(); + while (iterator.hasNext()) { + final ConcurrentIterator next = iterator.next(); + next.clear(); + iterator.remove(); + } + + // save this snapshot back to the original (single writer principle) + listenersREF.lazySet(this, listeners); + } } /** @@ -543,23 +756,197 @@ class ConnectionManager implements ListenerBridge, ISessio */ final void closeConnections() { - // close the sessions - Iterator iterator = this.connections.iterator(); - //noinspection WhileLoopReplaceableByForEach - while (iterator.hasNext()) { - Connection connection = iterator.next(); - // Close the connection. Make sure the close operation ends because - // all I/O operations are asynchronous in Netty. - // Necessary otherwise workers won't close. - connection.close(); + + // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this + // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our + // use-case 99% of the time) + synchronized (singleWriterLock2) { + // don't need anything fast or fancy here, because this method will only be called once + final IdentityMap.Keys keys = connectionEntries.keys(); + for (C connection : keys) { + // Close the connection. Make sure the close operation ends because + // all I/O operations are asynchronous in Netty. + // Also necessary otherwise workers won't close. + connection.close(); + } + + this.connectionEntries.clear(); + this.connectionsHead = null; } - this.connections.clear(); + } + + /** + * Not implemented, since this would cause horrendous problems. + * + * @see dorkbox.network.connection.ConnectionPoint#isWritable() + */ + @Override + public + boolean isWritable() { + throw new UnsupportedOperationException("Method not implemented"); + } + + /** + * Exposes methods to send the object to all server connections (except the specified one) over the network. (or via LOCAL when it's a + * local channel). + */ + @Override + public + ConnectionExceptSpecifiedBridgeServer except() { + return this; + } + + /** + * This will flush the data from EVERY connection on this server. + *

+ * THIS WILL BE SLOW! + * + * @see dorkbox.network.connection.ConnectionPoint#flush() + */ + public + void flush() { + ConcurrentEntry current = connectionsREF.get(this); + C c; + while (current != null) { + c = current.getValue(); + current = current.next(); + + c.send() + .flush(); + } + } + + /** + * Sends the object to all server connections (except the specified one) over the network using TCP. (or via LOCAL when it's a local + * channel). + */ + public + ConnectionPoint TCP(final C connection, final Object message) { + ConcurrentEntry current = connectionsREF.get(this); + C c; + while (current != null) { + c = current.getValue(); + current = current.next(); + + if (c != connection) { + c.send() + .TCP(message); + } + } + return this; + } + + /** + * Sends the object to all server connections (except the specified one) over the network using UDP (or via LOCAL when it's a local + * channel). + */ + public + ConnectionPoint UDP(final C connection, final Object message) { + ConcurrentEntry current = connectionsREF.get(this); + C c; + while (current != null) { + c = current.getValue(); + current = current.next(); + + if (c != connection) { + c.send() + .UDP(message); + } + } + return this; + } + + /** + * Sends the object to all server connections (except the specified one) over the network using UDT. (or via LOCAL when it's a local + * channel). + */ + public + ConnectionPoint UDT(final C connection, final Object message) { + ConcurrentEntry current = connectionsREF.get(this); + C c; + while (current != null) { + c = current.getValue(); + current = current.next(); + + if (c != connection) { + c.send() + .UDT(message); + } + } + return this; + } + + /** + * Sends the message to other listeners INSIDE this endpoint for EVERY connection. It does not send it to a remote address. + */ + public + void self(final Object message) { + ConcurrentEntry current = connectionsREF.get(this); + C c; + while (current != null) { + c = current.getValue(); + current = current.next(); + + notifyOnMessage(c, message); + } + } + + /** + * Sends the object all server connections over the network using TCP. (or via LOCAL when it's a local channel). + */ + public + ConnectionPoint TCP(final Object message) { + ConcurrentEntry current = connectionsREF.get(this); + C c; + while (current != null) { + c = current.getValue(); + current = current.next(); + + c.send() + .TCP(message); + } + return this; + } + + /** + * Sends the object all server connections over the network using UDP. (or via LOCAL when it's a local channel). + */ + public + ConnectionPoint UDP(final Object message) { + ConcurrentEntry current = connectionsREF.get(this); + C c; + while (current != null) { + c = current.getValue(); + current = current.next(); + + c.send() + .UDP(message); + } + return this; + } + + + /** + * Sends the object all server connections over the network using UDT. (or via LOCAL when it's a local channel). + */ + public + ConnectionPoint UDT(final Object message) { + ConcurrentEntry current = connectionsREF.get(this); + C c; + while (current != null) { + c = current.getValue(); + current = current.next(); + + c.send() + .UDT(message); + } + return this; } @Override public boolean equals(final Object o) { - return false; + return this == o; } diff --git a/src/dorkbox/network/connection/EndPointServer.java b/src/dorkbox/network/connection/EndPointServer.java index a7b3ddc1..f5e85c54 100644 --- a/src/dorkbox/network/connection/EndPointServer.java +++ b/src/dorkbox/network/connection/EndPointServer.java @@ -29,13 +29,9 @@ import java.io.IOException; public class EndPointServer extends EndPoint { - private final ServerConnectionBridge serverConnections; - public EndPointServer(final Configuration options) throws InitializationException, SecurityException, IOException { super(Server.class, options); - - this.serverConnections = new ServerConnectionBridge(this.connectionManager); } /** @@ -44,7 +40,7 @@ class EndPointServer extends EndPoint { @Override public ConnectionBridgeServer send() { - return this.serverConnections; + return this.connectionManager; } /** diff --git a/src/dorkbox/network/connection/ISessionManager.java b/src/dorkbox/network/connection/ISessionManager.java index bf25fe76..8e0f358e 100644 --- a/src/dorkbox/network/connection/ISessionManager.java +++ b/src/dorkbox/network/connection/ISessionManager.java @@ -35,12 +35,7 @@ interface ISessionManager { void connectionDisconnected(C connection); /** - * Called when there is an error of some kind during the up/down stream process - */ - void connectionError(C connection, Throwable throwable); - - /** - * Returns a non-modifiable list of active connections + * Returns a non-modifiable list of active connections. This is extremely slow, and not recommended! */ Collection getConnections(); } diff --git a/src/dorkbox/network/connection/ServerConnectionBridge.java b/src/dorkbox/network/connection/ServerConnectionBridge.java deleted file mode 100644 index 56df89c7..00000000 --- a/src/dorkbox/network/connection/ServerConnectionBridge.java +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Copyright 2010 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dorkbox.network.connection; - -import dorkbox.network.connection.bridge.ConnectionBridgeServer; -import dorkbox.network.connection.bridge.ConnectionExceptSpecifiedBridgeServer; - -import java.util.Collection; - -public -class ServerConnectionBridge - implements ConnectionPoint, ConnectionBridgeServer, ConnectionExceptSpecifiedBridgeServer { - - private final ConnectionManager connectionManager; - - public - ServerConnectionBridge(final ConnectionManager connectionManager) { - this.connectionManager = connectionManager; - } - - /** - * Not implemented, since this would cause horrendous problems. - * - * @see dorkbox.network.connection.ConnectionPoint#isWritable() - */ - @Override - public - boolean isWritable() { - throw new UnsupportedOperationException("Method not implemented"); - } - - /** - * This will flush the data from EVERY connection on this server. - *

- * THIS WILL BE SLOW! - * - * @see dorkbox.network.connection.ConnectionPoint#flush() - */ - @Override - public - void flush() { - Collection connections0 = this.connectionManager.getConnections0(); - for (C c : connections0) { - c.send() - .flush(); - } - } - - /** - * Exposes methods to send the object to all server connections (except the specified one) over the network. (or via LOCAL when it's a - * local channel). - */ - @Override - public - ConnectionExceptSpecifiedBridgeServer except() { - return this; - } - - /** - * Sends the object to all server connections (except the specified one) over the network using TCP. (or via LOCAL when it's a local - * channel). - */ - @Override - public - void TCP(final C connection, final Object message) { - Collection connections0 = this.connectionManager.getConnections0(); - for (C c : connections0) { - if (c != connection) { - c.send() - .TCP(message); - } - } - } - - /** - * Sends the object to all server connections (except the specified one) over the network using UDP (or via LOCAL when it's a local - * channel). - */ - @Override - public - void UDP(final C connection, final Object message) { - Collection connections0 = this.connectionManager.getConnections0(); - for (C c : connections0) { - if (c != connection) { - c.send() - .UDP(message); - } - } - } - - /** - * Sends the object to all server connections (except the specified one) over the network using UDT. (or via LOCAL when it's a local - * channel). - */ - @Override - public - void UDT(final C connection, final Object message) { - Collection connections0 = this.connectionManager.getConnections0(); - for (C c : connections0) { - if (c != connection) { - c.send() - .UDT(message); - } - } - } - - /** - * Sends the message to other listeners INSIDE this endpoint for EVERY connection. It does not send it to a remote address. - */ - @Override - public - void self(final Object message) { - Collection connections0 = this.connectionManager.getConnections0(); - for (C c : connections0) { - this.connectionManager.notifyOnMessage(c, message); - } - } - - /** - * Sends the object all server connections over the network using TCP. (or via LOCAL when it's a local channel). - */ - @Override - public - ConnectionPoint TCP(final Object message) { - Collection connections0 = this.connectionManager.getConnections0(); - for (C c : connections0) { - c.send() - .TCP(message); - } - - return this; - } - - /** - * Sends the object all server connections over the network using UDP. (or via LOCAL when it's a local channel). - */ - @Override - public - ConnectionPoint UDP(final Object message) { - Collection connections0 = this.connectionManager.getConnections0(); - for (C c : connections0) { - c.send() - .UDP(message); - } - - return this; - } - - /** - * Sends the object all server connections over the network using UDT. (or via LOCAL when it's a local channel). - */ - @Override - public - ConnectionPoint UDT(final Object message) { - Collection connections0 = this.connectionManager.getConnections0(); - for (C c : connections0) { - c.send() - .UDT(message); - } - - return this; - } -} diff --git a/src/dorkbox/network/connection/bridge/ConnectionExceptSpecifiedBridgeServer.java b/src/dorkbox/network/connection/bridge/ConnectionExceptSpecifiedBridgeServer.java index 547f4cec..74a8c23c 100644 --- a/src/dorkbox/network/connection/bridge/ConnectionExceptSpecifiedBridgeServer.java +++ b/src/dorkbox/network/connection/bridge/ConnectionExceptSpecifiedBridgeServer.java @@ -16,6 +16,7 @@ package dorkbox.network.connection.bridge; import dorkbox.network.connection.Connection; +import dorkbox.network.connection.ConnectionPoint; public interface ConnectionExceptSpecifiedBridgeServer { @@ -24,17 +25,17 @@ interface ConnectionExceptSpecifiedBridgeServer { * Sends the object to all server connections (except the specified one) over the network using TCP. (or via LOCAL when it's a local * channel). */ - void TCP(C connection, Object message); + ConnectionPoint TCP(C connection, Object message); /** * Sends the object to all server connections (except the specified one) over the network using UDP (or via LOCAL when it's a local * channel). */ - void UDP(C connection, Object message); + ConnectionPoint UDP(C connection, Object message); /** * Sends the object to all server connections (except the specified one) over the network using UDT. (or via LOCAL when it's a local * channel). */ - void UDT(C connection, Object message); + ConnectionPoint UDT(C connection, Object message); } diff --git a/src/dorkbox/network/util/ConcurrentHashMapFactory.java b/src/dorkbox/network/util/ConcurrentHashMapFactory.java deleted file mode 100644 index 9c3ac029..00000000 --- a/src/dorkbox/network/util/ConcurrentHashMapFactory.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright 2010 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dorkbox.network.util; - -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -public abstract -class ConcurrentHashMapFactory extends ConcurrentHashMap implements ConcurrentMap { - - private static final long serialVersionUID = -1L; - - public - ConcurrentHashMapFactory() { - } - - public abstract - V createNewObject(Object... args); - - - /** - * Thread safe method to get the value in the map. If the value doesn't exist, it will create a new one (and put the new one in the - * map) - */ - public final - V getOrCreate(K key, Object... args) { - V orig = get(key); - - if (orig == null) { - // It's OK to construct a new object that ends up not being used - orig = createNewObject(args); - V putByOtherThreadJustNow = putIfAbsent(key, orig); - if (putByOtherThreadJustNow != null) { - // Some other thread "won" - orig = putByOtherThreadJustNow; - } - else { - // This thread was the winner - } - } - return orig; - } -}