More careful checks when closing endpoints during restart

code polish
This commit is contained in:
Robinson 2023-08-10 20:05:49 -06:00
parent ad9771263c
commit 96f5406ae6
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
4 changed files with 25 additions and 13 deletions

View File

@ -854,8 +854,9 @@ open class Client<CONNECTION : Connection>(
if (!shutdownInProgress.value) {
this@Client.close(
closeEverything = false,
initiatedByClientClose = true,
initiatedByShutdown = false)
notifyDisconnect = !criticalDriverError, // this is because we restart automatically on driver errors
releaseWaitingThreads = !criticalDriverError // this is because we restart automatically on driver errors
)
}
@ -982,7 +983,9 @@ open class Client<CONNECTION : Connection>(
fun close(closeEverything: Boolean = true) {
runBlocking {
close(
closeEverything = closeEverything, releaseWaitingThreads = true
closeEverything = closeEverything,
notifyDisconnect = true,
releaseWaitingThreads = true
)
}
}

View File

@ -450,7 +450,9 @@ open class Server<CONNECTION : Connection>(
fun close(closeEverything: Boolean = true) {
runBlocking {
close(
closeEverything = closeEverything, releaseWaitingThreads = true
closeEverything = closeEverything,
notifyDisconnect = true,
releaseWaitingThreads = true
)
}
}

View File

@ -905,6 +905,7 @@ class AeronDriver constructor(config: Configuration, val logger: KLogger, val en
}
else {
logger.info { "[${publication.sessionId()}] Connection disconnected while sending data, closing connection." }
internal.criticalDriverError = true
// publication was actually closed or the server was closed, so no bother throwing an error
connection.closeImmediately(sendDisconnectMessage = false,
@ -938,7 +939,7 @@ class AeronDriver constructor(config: Configuration, val logger: KLogger, val en
// NOTE: we already know the connection is closed. we closed it (so it doesn't make sense to emit an error about this)
val exception = endPoint!!.newException(
"[${publication.sessionId()}] Unable to send message. (Connection in closed, aborted attempt! ${
"[${publication.sessionId()}] Unable to send message. (Connection is closed, aborted attempt! ${
AeronDriver.errorCodeName(result)
})"
)
@ -1042,7 +1043,7 @@ class AeronDriver constructor(config: Configuration, val logger: KLogger, val en
// NOTE: we already know the connection is closed. we closed it (so it doesn't make sense to emit an error about this)
val exception = endPoint!!.newException(
"[${publication.sessionId()}] Unable to send message. (Connection in closed, aborted attempt! ${
"[${publication.sessionId()}] Unable to send message. (Connection is closed, aborted attempt! ${
AeronDriver.errorCodeName(result)
})"
)

View File

@ -256,7 +256,9 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
hook = Thread {
runBlocking {
close(
closeEverything = true, releaseWaitingThreads = true
closeEverything = true,
notifyDisconnect = true,
releaseWaitingThreads = true
)
}
}
@ -827,6 +829,7 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
*/
internal suspend fun close(
closeEverything: Boolean,
notifyDisconnect: Boolean,
releaseWaitingThreads: Boolean)
{
logger.debug { "Requesting close: closeEverything=$closeEverything, releaseWaitingThreads=$releaseWaitingThreads" }
@ -845,13 +848,16 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
// Remove from memory the data from the back-end storage
storage.close()
aeronDriver.close()
// don't do anything more, since we've already shutdown!
return
}
if (!shutdownPreviouslyStarted && Thread.currentThread() != hook) {
if (shutdownPreviouslyStarted) {
logger.debug { "Shutdown previously started, ignoring..." }
return
}
if (Thread.currentThread() != hook) {
try {
Runtime.getRuntime().removeShutdownHook(hook)
} catch (ignored: Exception) {
@ -867,7 +873,7 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
// inside of connection.close(), then the server does not have a list of connections to call the global notifyDisconnect()
connections.forEach {
it.closeImmediately(sendDisconnectMessage = true,
notifyDisconnect = true)
notifyDisconnect = notifyDisconnect)
}
@ -905,9 +911,9 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
storage.close()
}
aeronDriver.close()
aeronDriver.close()
shutdown = true
shutdown = true
// the shutdown here must be in the launchSequentially lambda, this way we can guarantee the driver is closed before we move on
shutdownLatch.countDown()