Fixed edge condition when notifyDisconnect() would get called 2x on a server

This commit is contained in:
Robinson 2022-06-06 17:18:38 +02:00
parent e0570527ae
commit e302b1e429
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
3 changed files with 32 additions and 25 deletions

View File

@ -91,8 +91,7 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
private val isClosed = atomic(false) private val isClosed = atomic(false)
internal var preCloseAction: suspend () -> Unit = {} internal var closeAction: suspend () -> Unit = {}
internal var postCloseAction: suspend () -> Unit = {}
// only accessed on a single thread! // only accessed on a single thread!
private var connectionLastCheckTimeNanos = 0L private var connectionLastCheckTimeNanos = 0L
@ -338,6 +337,13 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
* Closes the connection, and removes all connection specific listeners * Closes the connection, and removes all connection specific listeners
*/ */
suspend fun close() { suspend fun close() {
close(true)
}
/**
* Closes the connection, and removes all connection specific listeners
*/
internal suspend fun close(enableRemove: Boolean) {
// there are 2 ways to call close. // there are 2 ways to call close.
// MANUALLY // MANUALLY
// When a connection is disconnected via a timeout/expire. // When a connection is disconnected via a timeout/expire.
@ -377,13 +383,21 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
logger.error("Connection $id: Unable to delete aeron publication log on close: $logFile") logger.error("Connection $id: Unable to delete aeron publication log on close: $logFile")
} }
if (enableRemove) {
endPoint.removeConnection(this) endPoint.removeConnection(this)
}
// NOTE: notifyDisconnect() is called in postCloseAction()!!
// This is set by the client so if there is a "connect()" call in the the disconnect callback, we can have proper // This is set by the client/server so if there is a "connect()" call in the the disconnect callback, we can have proper
// lock-stop ordering for how disconnect and connect work with each-other // lock-stop ordering for how disconnect and connect work with each-other
preCloseAction() closeAction()
logger.debug {"[$id] connection closed"}
}
}
// called in postCloseAction(), so we don't expose our internal listenerManager
internal suspend fun doNotifyDisconnect() {
val connectionSpecificListenerManager = listenerManager.value val connectionSpecificListenerManager = listenerManager.value
if (connectionSpecificListenerManager != null) { if (connectionSpecificListenerManager != null) {
// this always has to be on event dispatch, otherwise we can have weird logic loops if we reconnect within a disconnect callback // this always has to be on event dispatch, otherwise we can have weird logic loops if we reconnect within a disconnect callback
@ -392,13 +406,8 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
connectionSpecificListenerManager.notifyDisconnect(this@Connection) connectionSpecificListenerManager.notifyDisconnect(this@Connection)
} }
} }
}
// This is set by the client/server so if there is a "connect()" call in the the disconnect callback, we can have proper
// lock-stop ordering for how disconnect and connect work with each-other
postCloseAction()
logger.debug {"[$id] connection closed"}
}
}
// //
// //

View File

@ -718,9 +718,12 @@ internal constructor(val type: Class<*>,
aeronDriver.close() aeronDriver.close()
runBlocking { runBlocking {
// the server has to be able to call server.notifyDisconnect() on a list of connections. If we remove the connections
// inside of connection.close(), then the server does not have a list of connections to call the global notifyDisconnect()
val enableRemove = type == Client::class.java
connections.forEach { connections.forEach {
logger.info { "Closing connection: ${it.id}" } logger.info { "Closing connection: ${it.id}" }
it.close() it.close(enableRemove)
} }
// Connections are closed first, because we want to make sure that no RMI messages can be received // Connections are closed first, because we want to make sure that no RMI messages can be received

View File

@ -109,15 +109,10 @@ internal class ServerHandshake<CONNECTION : Connection>(private val logger: KLog
} else { } else {
logger.trace { "[${message.connectKey}] Connection (${pendingConnection.id}) from $connectionString done with handshake." } logger.trace { "[${message.connectKey}] Connection (${pendingConnection.id}) from $connectionString done with handshake." }
pendingConnection.postCloseAction = { pendingConnection.closeAction = {
// called on connection.close() // called on connection.close()
pendingConnection.doNotifyDisconnect()
// this always has to be on event dispatch, otherwise we can have weird logic loops if we reconnect within a disconnect callback
actionDispatch.launch {
listenerManager.notifyDisconnect(pendingConnection)
} }
}
// this enables the connection to start polling for messages // this enables the connection to start polling for messages
server.addConnection(pendingConnection) server.addConnection(pendingConnection)