added sendDisconnectMessage to API when closing

This commit is contained in:
Robinson 2023-10-18 19:51:28 +02:00
parent 2245f0bfc5
commit 6fb5dbb833
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
5 changed files with 22 additions and 9 deletions

View File

@ -853,6 +853,15 @@ open class Client<CONNECTION : Connection>(config: ClientConfiguration = ClientC
onClose = object : EventCloseOperator {
override fun invoke() {
val mustRestartDriverOnError = aeronDriver.internal.mustRestartDriverOnError
val uncleanDisconnect = !shutdownEventPoller && !newConnection.isClosed() && newConnection.isClosedWithTimeout()
val autoReconnect = mustRestartDriverOnError || uncleanDisconnect
if (mustRestartDriverOnError) {
logger.error("Critical driver error detected, reconnecting client")
} else if (uncleanDisconnect) {
logger.error("Unclean disconnect detected, reconnecting client")
}
// this can be closed when the connection is remotely closed in ADDITION to manually closing
if (logger.isDebugEnabled) {
@ -861,10 +870,11 @@ open class Client<CONNECTION : Connection>(config: ClientConfiguration = ClientC
// we only need to run shutdown methods if there was a network outage or D/C
if (!shutdownInProgress.value) {
// this is because we restart automatically on driver errors
val standardClose = !mustRestartDriverOnError
// this is because we restart automatically on driver errors/weird timeouts
val standardClose = !autoReconnect
this@Client.close(
closeEverything = false,
sendDisconnectMessage = standardClose,
notifyDisconnect = standardClose,
releaseWaitingThreads = standardClose
)
@ -875,10 +885,7 @@ open class Client<CONNECTION : Connection>(config: ClientConfiguration = ClientC
endpointIsRunning.lazySet(false)
pollerClosedLatch.countDown()
if (mustRestartDriverOnError) {
logger.error("Critical driver error detected, reconnecting client")
if (autoReconnect) {
EventDispatcher.launchSequentially(EventDispatcher.CONNECT) {
waitForEndpointShutdown()
@ -1002,6 +1009,7 @@ open class Client<CONNECTION : Connection>(config: ClientConfiguration = ClientC
stopConnectOnShutdown = true
close(
closeEverything = closeEverything,
sendDisconnectMessage = true,
notifyDisconnect = true,
releaseWaitingThreads = true
)

View File

@ -272,6 +272,7 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
val standardClose = !mustRestartDriverOnError
this@Server.close(
closeEverything = false,
sendDisconnectMessage = standardClose,
notifyDisconnect = standardClose,
releaseWaitingThreads = standardClose
)
@ -395,6 +396,7 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
fun close(closeEverything: Boolean = true) {
close(
closeEverything = closeEverything,
sendDisconnectMessage = true,
notifyDisconnect = true,
releaseWaitingThreads = true
)

View File

@ -195,6 +195,7 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, config: Configuration
EventDispatcher.CLOSE.launch {
endPointUsages.forEach {
it.close(closeEverything = false,
sendDisconnectMessage = false,
notifyDisconnect = false,
releaseWaitingThreads = false)
}

View File

@ -351,7 +351,7 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
}
if (logger.isDebugEnabled) {
logger.debug("[$toString0] connection closing")
logger.debug("[$toString0] connection closing. sendDisconnectMessage=$sendDisconnectMessage, notifyDisconnect=$notifyDisconnect, closeEverything=$closeEverything")
}
// make sure to save off the RMI objects for session management

View File

@ -267,6 +267,7 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
hook = Thread {
close(
closeEverything = true,
sendDisconnectMessage = true,
notifyDisconnect = true,
releaseWaitingThreads = true
)
@ -902,11 +903,12 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
*/
internal fun close(
closeEverything: Boolean,
sendDisconnectMessage: Boolean,
notifyDisconnect: Boolean,
releaseWaitingThreads: Boolean)
{
if (logger.isDebugEnabled) {
logger.debug("Requesting close: closeEverything=$closeEverything, releaseWaitingThreads=$releaseWaitingThreads")
logger.debug("Requesting close: closeEverything=$closeEverything, sendDisconnectMessage=$sendDisconnectMessage, notifyDisconnect=$notifyDisconnect, releaseWaitingThreads=$releaseWaitingThreads")
}
// 1) endpoints can call close()
@ -953,7 +955,7 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
// the server has to be able to call server.notifyDisconnect() on a list of connections. If we remove the connections
// inside of connection.close(), then the server does not have a list of connections to call the global notifyDisconnect()
connections.forEach {
it.closeImmediately(sendDisconnectMessage = true,
it.closeImmediately(sendDisconnectMessage = sendDisconnectMessage,
notifyDisconnect = notifyDisconnect,
closeEverything = closeEverything)
}