diff --git a/src/dorkbox/network/connection/Connection.kt b/src/dorkbox/network/connection/Connection.kt index 49c156cd..7baa972d 100644 --- a/src/dorkbox/network/connection/Connection.kt +++ b/src/dorkbox/network/connection/Connection.kt @@ -330,6 +330,13 @@ open class Connection(connectionParameters: ConnectionParams<*>) { * Closes the connection, and removes all connection specific listeners */ suspend fun close() { + close(true) + } + + /** + * Closes the connection, and removes all connection specific listeners + */ + internal suspend fun close(sendDisconnectMessage: Boolean) { // there are 2 ways to call close. // MANUALLY // When a connection is disconnected via a timeout/expire. @@ -338,14 +345,14 @@ open class Connection(connectionParameters: ConnectionParams<*>) { // make sure that EVERYTHING before "close()" runs before we do EventDispatcher.launchSequentially(EventDispatcher.CLOSE) { - closeImmediately() + closeImmediately(sendDisconnectMessage) } } // connection.close() -> this // endpoint.close() -> connection.close() -> this - internal suspend fun closeImmediately() { + internal suspend fun closeImmediately(sendDisconnectMessage: Boolean) { // the server 'handshake' connection info is cleaned up with the disconnect via timeout/expire. if (!isClosed.compareAndSet(expect = false, update = true)) { return @@ -358,7 +365,9 @@ open class Connection(connectionParameters: ConnectionParams<*>) { // notify the remote endPoint that we are closing // we send this AFTER we close our subscription (so that no more messages will be received, when the remote end ping-pong's this message back) - if (publication.isConnected) { + if (sendDisconnectMessage && publication.isConnected) { + logger.trace { "Sending disconnect message to remote endpoint" } + // sometimes the remote end has already disconnected, THERE WILL BE ERRORS if this happens (but they are ok) send(DisconnectMessage.INSTANCE, true) } diff --git a/src/dorkbox/network/connection/EndPoint.kt b/src/dorkbox/network/connection/EndPoint.kt index e95ed1b0..b7c19f44 100644 --- a/src/dorkbox/network/connection/EndPoint.kt +++ b/src/dorkbox/network/connection/EndPoint.kt @@ -578,7 +578,7 @@ abstract class EndPoint private constructor(val type: C is DisconnectMessage -> { // NOTE: This MUST be on a new co-routine (this is...) runBlocking { - connection.close() + connection.close(false) } } @@ -939,10 +939,13 @@ abstract class EndPoint private constructor(val type: C initiatedByClientClose: Boolean, initiatedByShutdown: Boolean) { + logger.debug { "Requesting close: closeEverything=$closeEverything, initiatedByClientClose=$initiatedByClientClose, initiatedByShutdown=$initiatedByShutdown" } + // 1) endpoints can call close() // 2) client can close the endpoint if the connection is D/C from aeron (and the endpoint was not closed manually) val shutdownPreviouslyStarted = shutdownInProgress.getAndSet(true) if (closeEverything && shutdownPreviouslyStarted) { + logger.debug { "Shutdown previously started, cleaning up..." } // this is only called when the client network event poller shuts down // if we have clientConnectionClosed, then run that logic (because it doesn't run on the client when the connection is closed remotely) @@ -973,7 +976,7 @@ abstract class EndPoint private constructor(val type: C // inside of connection.close(), then the server does not have a list of connections to call the global notifyDisconnect() logger.trace { "Closing ${connections.size()} via the close event" } connections.forEach { - it.closeImmediately() + it.closeImmediately(true) } // don't do these things if we are "closed" from a client connection disconnect @@ -994,9 +997,11 @@ abstract class EndPoint private constructor(val type: C responseManager.close() // don't do these things if we are "closed" from a client connection disconnect - if (closeEverything) { - // if there are any events going on, we want to schedule them to run AFTER all other events for this endpoint are done - EventDispatcher.launchSequentially(EventDispatcher.CLOSE) { + // if there are any events going on, we want to schedule them to run AFTER all other events for this endpoint are done + EventDispatcher.launchSequentially(EventDispatcher.CLOSE) { + if (closeEverything) { + // when the client connection is closed, we don't close the driver/etc. + // Clears out all registered events listenerManager.close() @@ -1004,15 +1009,9 @@ abstract class EndPoint private constructor(val type: C storage.close() aeronDriver.close() - - // the shutdown here must be in the launchSequentially lambda, this way we can guarantee the driver is closed before we move on - shutdown = true - shutdownLatch.countDown() - shutdownInProgress.lazySet(false) - logger.info { "Done shutting down endpoint." } } - } else { - // when the client connection is closed, we don't close the driver/etc. + + // the shutdown here must be in the launchSequentially lambda, this way we can guarantee the driver is closed before we move on shutdown = true shutdownLatch.countDown() shutdownInProgress.lazySet(false) diff --git a/src/dorkbox/network/connection/EventDispatcher.kt b/src/dorkbox/network/connection/EventDispatcher.kt index 973a9700..6f7e4978 100644 --- a/src/dorkbox/network/connection/EventDispatcher.kt +++ b/src/dorkbox/network/connection/EventDispatcher.kt @@ -19,11 +19,7 @@ package dorkbox.network.connection import dorkbox.network.Configuration import dorkbox.util.NamedThreadFactory import kotlinx.atomicfu.atomic -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Job -import kotlinx.coroutines.SupervisorJob -import kotlinx.coroutines.asCoroutineDispatcher -import kotlinx.coroutines.launch +import kotlinx.coroutines.* import mu.KotlinLogging import java.util.concurrent.* @@ -165,7 +161,7 @@ enum class EventDispatcher { launchSequentially(endEvent, function) } } else { - function() + endEvent.launch(function) } } }