Only check if a connection is closed now. We now wait for pub+sub to be "connected" before continuing to build the connection object (so it will always be in the connected state)

This commit is contained in:
Robinson 2023-07-23 13:39:27 +02:00
parent ad3fdfc64d
commit 06b5f30948
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
5 changed files with 4 additions and 58 deletions

View File

@ -811,7 +811,7 @@ open class Client<CONNECTION : Connection>(
action = {
// if we initiate a disconnect manually, then there is no need to wait for aeron to verify it's closed
// we only want to wait for aeron to verify it's closed if we are SUPPOSED to be connected, but there's a network blip
if (!(shutdownEventPoller || newConnection.isClosedViaAeron())) {
if (!(shutdownEventPoller || newConnection.isClosed())) {
newConnection.poll()
} else {
// If the connection has either been closed, or has expired, it needs to be cleaned-up/deleted.

View File

@ -281,7 +281,7 @@ open class Server<CONNECTION : Connection>(
// this manages existing clients (for cleanup + connection polling). This has a concurrent iterator,
// so we can modify this as we go
connections.forEach { connection ->
if (!connection.isClosedViaAeron()) {
if (!connection.isClosed()) {
// Otherwise, poll the connection for messages
pollCount += connection.poll()
} else {

View File

@ -107,14 +107,6 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
private val isClosed = atomic(false)
// only accessed on a single thread!
private var connectionLastCheckTimeNanos = 0L
private var connectionTimeoutTimeNanos = 0L
// always offset by the linger amount, since we cannot act faster than the linger timeout for adding/removing publications
private val connectionCheckIntervalNanos = endPoint.config.connectionCheckIntervalNanos + endPoint.aeronDriver.lingerNs()
private val connectionExpirationTimoutNanos = endPoint.config.connectionExpirationTimoutNanos + endPoint.aeronDriver.lingerNs()
// while on the CLIENT, if the SERVER's ecc key has changed, the client will abort and show an error.
internal val remoteKeyChanged = connectionParameters.publicKeyValidation == PublicKeyValidationState.TAMPERED
@ -266,52 +258,6 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
return listenerManager.value?.notifyOnMessage(this, message) ?: false
}
/**
* We must account for network blips. The blips will be recovered by aeron, but we want to make sure that we are actually
* disconnected for a set period of time before we start the close process for a connection
*
* @return `true` if this connection has been closed via aeron
*/
fun isClosedViaAeron(): Boolean {
if (isClosed.value) {
// if we are manually closed, then don't check aeron timeouts!
return true
}
// we ONLY want to actually, legit check, 1 time every XXX ms.
val now = System.nanoTime()
if (now - connectionLastCheckTimeNanos < connectionCheckIntervalNanos) {
// we haven't waited long enough for another check. always return false (true means we are closed)
return false
}
connectionLastCheckTimeNanos = now
// as long as we are connected, we reset the state, so that if there is a network blip, we want to make sure that it is
// a network blip for a while, instead of just once or twice. (which can happen)
if (subscription.isConnected && publication.isConnected) {
// reset connection timeout
connectionTimeoutTimeNanos = 0L
// we are still connected (true means we are closed)
return false
}
//
// aeron is not connected
//
if (connectionTimeoutTimeNanos == 0L) {
connectionTimeoutTimeNanos = now
}
// make sure that our "isConnected" state lasts LONGER than the expiry timeout!
// 1) connections take a little bit of time from polling -> connecting (because of how we poll connections before 'connecting' them).
// 2) network blips happen. Aeron will recover, and we want to make sure that WE don't instantly DC
return now - connectionTimeoutTimeNanos >= connectionExpirationTimoutNanos
}
/**
* @return true if this connection has had close() called
*/

View File

@ -795,7 +795,7 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
}
if (result == Publication.CLOSED && connection.isClosedViaAeron()) {
if (result == Publication.CLOSED && connection.isClosed()) {
// this can happen when we use RMI to close a connection. RMI will (in most cases) ALWAYS send a response when it's
// done executing. If the connection is *closed* first (because an RMI method closed it), then we will not be able to
// send the message.

View File

@ -259,7 +259,7 @@ internal class ResponseManager(maxValuesInCache: Int = 65534, minimumValue: Int
if (resultOrWaiter is ResponseWaiter) {
logger.trace { "[RM] timeout cancel ($timeoutMillis): $id" }
return if (connection.isClosed() || connection.isClosedViaAeron()) {
return if (connection.isClosed() || connection.isClosed()) {
null
} else {
TIMEOUT_EXCEPTION