Better lock-step and checks when closing an endpoint

This commit is contained in:
Robinson 2023-07-20 20:39:12 +02:00
parent d787045149
commit 80d77f2f51
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
3 changed files with 26 additions and 22 deletions

View File

@ -330,6 +330,13 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
* Closes the connection, and removes all connection specific listeners
*/
suspend fun close() {
close(true)
}
/**
* Closes the connection, and removes all connection specific listeners
*/
internal suspend fun close(sendDisconnectMessage: Boolean) {
// there are 2 ways to call close.
// MANUALLY
// When a connection is disconnected via a timeout/expire.
@ -338,14 +345,14 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
// make sure that EVERYTHING before "close()" runs before we do
EventDispatcher.launchSequentially(EventDispatcher.CLOSE) {
closeImmediately()
closeImmediately(sendDisconnectMessage)
}
}
// connection.close() -> this
// endpoint.close() -> connection.close() -> this
internal suspend fun closeImmediately() {
internal suspend fun closeImmediately(sendDisconnectMessage: Boolean) {
// the server 'handshake' connection info is cleaned up with the disconnect via timeout/expire.
if (!isClosed.compareAndSet(expect = false, update = true)) {
return
@ -358,7 +365,9 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
// notify the remote endPoint that we are closing
// we send this AFTER we close our subscription (so that no more messages will be received, when the remote end ping-pong's this message back)
if (publication.isConnected) {
if (sendDisconnectMessage && publication.isConnected) {
logger.trace { "Sending disconnect message to remote endpoint" }
// sometimes the remote end has already disconnected, THERE WILL BE ERRORS if this happens (but they are ok)
send(DisconnectMessage.INSTANCE, true)
}

View File

@ -578,7 +578,7 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
is DisconnectMessage -> {
// NOTE: This MUST be on a new co-routine (this is...)
runBlocking {
connection.close()
connection.close(false)
}
}
@ -939,10 +939,13 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
initiatedByClientClose: Boolean,
initiatedByShutdown: Boolean)
{
logger.debug { "Requesting close: closeEverything=$closeEverything, initiatedByClientClose=$initiatedByClientClose, initiatedByShutdown=$initiatedByShutdown" }
// 1) endpoints can call close()
// 2) client can close the endpoint if the connection is D/C from aeron (and the endpoint was not closed manually)
val shutdownPreviouslyStarted = shutdownInProgress.getAndSet(true)
if (closeEverything && shutdownPreviouslyStarted) {
logger.debug { "Shutdown previously started, cleaning up..." }
// this is only called when the client network event poller shuts down
// if we have clientConnectionClosed, then run that logic (because it doesn't run on the client when the connection is closed remotely)
@ -973,7 +976,7 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
// inside of connection.close(), then the server does not have a list of connections to call the global notifyDisconnect()
logger.trace { "Closing ${connections.size()} via the close event" }
connections.forEach {
it.closeImmediately()
it.closeImmediately(true)
}
// don't do these things if we are "closed" from a client connection disconnect
@ -994,9 +997,11 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
responseManager.close()
// don't do these things if we are "closed" from a client connection disconnect
if (closeEverything) {
// if there are any events going on, we want to schedule them to run AFTER all other events for this endpoint are done
EventDispatcher.launchSequentially(EventDispatcher.CLOSE) {
// if there are any events going on, we want to schedule them to run AFTER all other events for this endpoint are done
EventDispatcher.launchSequentially(EventDispatcher.CLOSE) {
if (closeEverything) {
// when the client connection is closed, we don't close the driver/etc.
// Clears out all registered events
listenerManager.close()
@ -1004,15 +1009,9 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
storage.close()
aeronDriver.close()
// the shutdown here must be in the launchSequentially lambda, this way we can guarantee the driver is closed before we move on
shutdown = true
shutdownLatch.countDown()
shutdownInProgress.lazySet(false)
logger.info { "Done shutting down endpoint." }
}
} else {
// when the client connection is closed, we don't close the driver/etc.
// the shutdown here must be in the launchSequentially lambda, this way we can guarantee the driver is closed before we move on
shutdown = true
shutdownLatch.countDown()
shutdownInProgress.lazySet(false)

View File

@ -19,11 +19,7 @@ package dorkbox.network.connection
import dorkbox.network.Configuration
import dorkbox.util.NamedThreadFactory
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.launch
import kotlinx.coroutines.*
import mu.KotlinLogging
import java.util.concurrent.*
@ -165,7 +161,7 @@ enum class EventDispatcher {
launchSequentially(endEvent, function)
}
} else {
function()
endEvent.launch(function)
}
}
}