diff --git a/src/dorkbox/network/Client.kt b/src/dorkbox/network/Client.kt index 286ef36f..5df62a15 100644 --- a/src/dorkbox/network/Client.kt +++ b/src/dorkbox/network/Client.kt @@ -433,12 +433,22 @@ open class Client(config: Configuration = Configuration isConnected = false super.close() - // in the client, "notifyDisconnect" will NEVER be called, because it's only called on a connection! + // in the client, "client-notifyDisconnect" will NEVER be called, because it's only called on a connection! + // (meaning, 'connection-notifiyDisconnect' is what is called) + // manually call it. if (con != null) { - runBlocking { + // this always has to be on a new dispatch, otherwise we can have weird logic loops if we reconnect within a disconnect callback + val job = actionDispatch.launch { listenerManager.notifyDisconnect(con) } + + // when we close a client or a server, we want to make sure that ALL notifications are finished. + // when it's just a connection getting closed, we don't care about this. We only care when it's "global" shutdown + // NOTE: this must be the LAST thing happening! + runBlocking { + job.join() + } } } diff --git a/src/dorkbox/network/Server.kt b/src/dorkbox/network/Server.kt index 12a0a6bc..8a60bd09 100644 --- a/src/dorkbox/network/Server.kt +++ b/src/dorkbox/network/Server.kt @@ -30,7 +30,9 @@ import dorkbox.network.rmi.TimeoutException import io.aeron.FragmentAssembler import io.aeron.Image import io.aeron.logbuffer.Header +import kotlinx.coroutines.Job import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking import org.agrona.DirectBuffer import java.net.InetSocketAddress import java.util.concurrent.CopyOnWriteArrayList @@ -265,6 +267,8 @@ open class Server(config: ServerConfiguration = ServerC false } }, { connectionToClean -> + logger.info {"[${connectionToClean.sessionId}] cleaned-up connection"} + // have to free up resources! handshake.cleanup(connectionToClean) @@ -274,6 +278,11 @@ open class Server(config: ServerConfiguration = ServerC // the compareAndSet is used to make sure that if we call close() MANUALLY, when the auto-cleanup/disconnect is called -- it doesn't // try to do it again. connectionToClean.close() + + // have to manually notify the server-listenerManager that this connection was closed + // if the connection was MANUALLY closed (via calling connection.close()), then the connection-listenermanager is + // instantly notified and on cleanup, the server-listenermanager is called + listenerManager.notifyDisconnect(connectionToClean) }) @@ -296,7 +305,6 @@ open class Server(config: ServerConfiguration = ServerC } } - /** * Adds an IP+subnet rule that defines what type of connection this IP+subnet should have. * - NOTHING : Nothing happens to the in/out bytes @@ -316,68 +324,14 @@ open class Server(config: ServerConfiguration = ServerC } /** - * Safely sends objects to a destination + * Safely sends objects to a destination. */ suspend fun send(message: Any) { - connections.send(message) + connections.forEach { + it.send(message) + } } - /** - * When called by a server, NORMALLY listeners are added at the GLOBAL level (meaning, I add one listener, - * and ALL connections are notified of that listener. - *

- * It is POSSIBLE to add a server-connection 'local' listener (via connection.addListener), meaning that ONLY - * that listener attached to the connection is notified on that event (ie, admin type listeners) - * - * @return a newly created listener manager for the connection - */ -// fun addListenerManager(connection: C): ConnectionManager { -// return connectionManager.addListenerManager(connection) -// } - - /** - * When called by a server, NORMALLY listeners are added at the GLOBAL level (meaning, I add one listener, - * and ALL connections are notified of that listener. - *

- * It is POSSIBLE to remove a server-connection 'local' listener (via connection.removeListener), meaning that ONLY - * that listener attached to the connection is removed - * - * - * This removes the listener manager for that specific connection - */ -// fun removeListenerManager(connection: C) { -// connectionManager.removeListenerManager(connection) -// } - - - -// -// -// /** -// * Creates a "global" remote object for use by multiple connections. -// * -// * @return the ID assigned to this RMI object -// */ -// fun create(objectId: Short, globalObject: T) { -// return rmiGlobalObjects.register(globalObject) -// } -// -// /** -// * Creates a "global" remote object for use by multiple connections. -// * -// * @return the ID assigned to this RMI object -// */ -// fun create(`object`: T): Short { -// return rmiGlobalObjects.register(`object`) ?: 0 -// } -// -// -// - - - - - /** * TODO: when adding a "custom" connection, it's super important to not have to worry about the sessionID (which is what we key off of) * Adds a custom connection to the server. @@ -405,9 +359,50 @@ open class Server(config: ServerConfiguration = ServerC connections.remove(connection) } + + /** + * Closes the server and all it's connections. After a close, you may call 'bind' again. + */ override fun close() { super.close() bindAlreadyCalled = false + + // when we call close, it will shutdown the polling mechanism, so we have to manually cleanup the connections and call server-notifyDisconnect + // on them + + runBlocking { + val jobs = mutableListOf() + + // we want to clear all the connections FIRST (since we are shutting down) + val cons = mutableListOf() + connections.forEach { cons.add(it) } + connections.clear() + + cons.forEach { connection -> + logger.error("${connection.id} cleanup") + // have to free up resources! + handshake.cleanup(connection) + + // make sure the connection is closed (close can only happen once, so a duplicate call does nothing!) + connection.close() + + // have to manually notify the server-listenerManager that this connection was closed + // if the connection was MANUALLY closed (via calling connection.close()), then the connection-listenermanager is + // instantly notified and on cleanup, the server-listenermanager is called + // NOTE: this must be the LAST thing happening! + + // this always has to be on a new dispatch, otherwise we can have weird logic loops if we reconnect within a disconnect callback + val job = actionDispatch.launch { + listenerManager.notifyDisconnect(connection) + } + jobs.add(job) + } + + + // when we close a client or a server, we want to make sure that ALL notifications are finished. + // when it's just a connection getting closed, we don't care about this. We only care when it's "global" shutdown + jobs.forEach { it.join() } + } } diff --git a/src/dorkbox/network/connection/ConnectionManager.kt b/src/dorkbox/network/connection/ConnectionManager.kt index 2c63943d..95c827e3 100644 --- a/src/dorkbox/network/connection/ConnectionManager.kt +++ b/src/dorkbox/network/connection/ConnectionManager.kt @@ -93,12 +93,9 @@ internal open class ConnectionManager() { } /** - * Safely sends objects to a destination (such as a custom object or a standard ping). This will automatically choose which protocol - * is available to use. + * Removes all connections. Does not call close or anything else on them */ - suspend inline fun send(message: Any) { - forEach { - it.send(message) - } + fun clear() { + connections.clear() } } diff --git a/src/dorkbox/network/connection/EndPoint.kt b/src/dorkbox/network/connection/EndPoint.kt index 284f7360..a24e04ae 100644 --- a/src/dorkbox/network/connection/EndPoint.kt +++ b/src/dorkbox/network/connection/EndPoint.kt @@ -122,7 +122,7 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A private var aeron: Aeron? = null /** - * Returns the serialization wrapper if there is an object type that needs to be added outside of the basics. + * Returns the serialization wrapper if there is an object type that needs to be added outside of the basic types. */ val serialization: Serialization @@ -695,7 +695,6 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A rmiGlobalSupport.close() runBlocking { - // don't need anything fast or fancy here, because this method will only be called once connections.forEach { it.close() }