Better support for polling and sending dc message

This commit is contained in:
Robinson 2023-10-26 21:12:46 +02:00
parent 7f2ad97aa7
commit f531f61a53
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
2 changed files with 37 additions and 11 deletions

View File

@ -289,13 +289,27 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
return isClosed.value
}
/**
* Is this a "dirty" disconnect, meaning that it has timed out, but not been explicitly closed
*/
internal fun isDirtyClose(): Boolean {
return !closeRequested && !isClosed() && isClosedWithTimeout()
}
/**
* Is this connection considered still safe for polling (or rather, has it been closed in an unusual way?)
*/
internal fun canPoll(): Boolean {
return !closeRequested && !isClosed() && !isClosedWithTimeout()
}
/**
* We must account for network blips. The blips will be recovered by aeron, but we want to make sure that we are actually
* disconnected for a set period of time before we start the close process for a connection
*
* @return `true` if this connection has been closed via aeron
*/
fun isClosedWithTimeout(): Boolean {
internal fun isClosedWithTimeout(): Boolean {
// we ONLY want to actually, legit check, 1 time every XXX ms.
val now = System.nanoTime()
@ -336,6 +350,7 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
// the compareAndSet is used to make sure that if we call close() MANUALLY, (and later) when the auto-cleanup/disconnect is called -- it doesn't
// try to do it again.
closeRequested = true
// make sure that EVERYTHING before "close()" runs before we do.
// If there are multiple clients/servers sharing the same NetworkPoller -- then they will wait on each other!
@ -378,19 +393,30 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
endPoint.listenerManager.notifyError(e)
}
// notify the remote endPoint that we are closing
// we send this AFTER we close our subscription (so that no more messages will be received, when the remote end ping-pong's this message back)
if (sendDisconnectMessage && publication.isConnected) {
if (logger.isTraceEnabled) {
logger.trace("Sending disconnect message to ${endPoint.otherTypeName}")
}
if (sendDisconnectMessage) {
if (publication.isConnected) {
if (logger.isDebugEnabled) {
logger.debug("Sending disconnect message to ${endPoint.otherTypeName}")
}
// sometimes the remote end has already disconnected, THERE WILL BE ERRORS if this happens (but they are ok)
if (closeEverything) {
send(DisconnectMessage.CLOSE_EVERYTHING, true)
// sometimes the remote end has already disconnected, THERE WILL BE ERRORS if this happens (but they are ok)
if (closeEverything) {
send(DisconnectMessage.CLOSE_EVERYTHING, true)
} else {
send(DisconnectMessage.CLOSE_SIMPLE, true)
}
// wait for .5 seconds to (help) make sure that the messages are sent before shutdown! This is not guaranteed!
if (logger.isDebugEnabled) {
logger.debug("Waiting for disconnect message to send")
}
Thread.sleep(500L)
} else {
send(DisconnectMessage.CLOSE_FOR_SESSION, true)
if (logger.isDebugEnabled) {
logger.debug("Publication is not connected with ${endPoint.otherTypeName}, not sending disconnect message.")
}
}
}

View File

@ -18,7 +18,7 @@ package dorkbox.network.connection
class DisconnectMessage(val closeEverything: Boolean) {
companion object {
val CLOSE_FOR_SESSION = DisconnectMessage(false)
val CLOSE_SIMPLE = DisconnectMessage(false)
val CLOSE_EVERYTHING = DisconnectMessage(true)
}
}