code cleanup

This commit is contained in:
Robinson 2023-08-10 20:06:59 -06:00
parent 96f5406ae6
commit 8e7c47abcc
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
6 changed files with 28 additions and 39 deletions

View File

@ -839,23 +839,19 @@ open class Client<CONNECTION : Connection>(
}
},
onShutdown = {
val criticalDriverError = aeronDriver.internal.criticalDriverError
val endpoints = if (criticalDriverError) {
aeronDriver.internal.endPointUsages
} else {
null
}
val mustRestartDriverOnError = aeronDriver.internal.mustRestartDriverOnError
// this can be closed when the connection is remotely closed in ADDITION to manually closing
logger.debug { "Client event dispatch closing..." }
// we only need to run shutdown methods if there was a network outage or D/C
if (!shutdownInProgress.value) {
// this is because we restart automatically on driver errors
val standardClose = !mustRestartDriverOnError
this@Client.close(
closeEverything = false,
notifyDisconnect = !criticalDriverError, // this is because we restart automatically on driver errors
releaseWaitingThreads = !criticalDriverError // this is because we restart automatically on driver errors
notifyDisconnect = standardClose,
releaseWaitingThreads = standardClose
)
}
@ -865,14 +861,14 @@ open class Client<CONNECTION : Connection>(
pollerClosedLatch.countDown()
if (criticalDriverError) {
if (mustRestartDriverOnError) {
logger.error { "Critical driver error detected, reconnecting client" }
EventDispatcher.launchSequentially(EventDispatcher.CONNECT) {
waitForEndpointShutdown()
// also wait for everyone else to shutdown!!
endpoints!!.forEach {
aeronDriver.internal.endPointUsages.forEach {
it.waitForEndpointShutdown()
}

View File

@ -304,13 +304,7 @@ open class Server<CONNECTION : Connection>(
}
},
onShutdown = {
val criticalDriverError = aeronDriver.internal.criticalDriverError
val endpoints = if (criticalDriverError) {
aeronDriver.internal.endPointUsages
} else {
null
}
val mustRestartDriverOnError = aeronDriver.internal.mustRestartDriverOnError
logger.debug { "Server event dispatch closing..." }
ipcPoller.close()
@ -322,22 +316,24 @@ open class Server<CONNECTION : Connection>(
// we only need to run shutdown methods if there was a network outage or D/C
if (!shutdownInProgress.value) {
// this is because we restart automatically on driver errors
val standardClose = !mustRestartDriverOnError
this@Server.close(
closeEverything = false,
notifyDisconnect = !criticalDriverError, // this is because we restart automatically on driver errors
releaseWaitingThreads = !criticalDriverError // this is because we restart automatically on driver errors
notifyDisconnect = standardClose,
releaseWaitingThreads = standardClose
)
}
if (criticalDriverError) {
if (mustRestartDriverOnError) {
logger.error { "Critical driver error detected, restarting server." }
EventDispatcher.launchSequentially(EventDispatcher.CONNECT) {
waitForEndpointShutdown()
// also wait for everyone else to shutdown!!
endpoints!!.forEach {
aeronDriver.internal.endPointUsages.forEach {
it.waitForEndpointShutdown()
}

View File

@ -487,7 +487,7 @@ class AeronDriver constructor(config: Configuration, val logger: KLogger, val en
// however - the code that actually does stuff is a "singleton" in regard to an aeron configuration
val driverId = mediaDriverConfig.mediaDriverId()
logger.error { "Aeron Driver [$driverId]: Initializing..." }
logger.info { "Aeron Driver [$driverId]: Initializing..." }
val aeronDriver = driverConfigurations.get(driverId)
if (aeronDriver == null) {
val driver = AeronDriverInternal(endPoint, mediaDriverConfig, logger)
@ -514,7 +514,6 @@ class AeronDriver constructor(config: Configuration, val logger: KLogger, val en
internal = aeronDriver
}
logger.error { "Aeron Driver [$driverId]: Initialized critical=${internal.criticalDriverError}" }
}
@ -769,7 +768,7 @@ class AeronDriver constructor(config: Configuration, val logger: KLogger, val en
/**
* Make sure that we DO NOT approach the Aeron linger timeout!
*/
suspend fun delayLingerTimeout(multiplier: Number) = internal.delayLingerTimeout(multiplier.toDouble())
suspend fun delayLingerTimeout(multiplier: Number = 1) = internal.delayLingerTimeout(multiplier.toDouble())
/**
* A safer way to try to close the media driver if in the ENTIRE JVM, our process is the only one using aeron with it's specific configuration
@ -869,7 +868,7 @@ class AeronDriver constructor(config: Configuration, val logger: KLogger, val en
return true
}
if (internal.criticalDriverError) {
if (internal.mustRestartDriverOnError) {
logger.error { "Critical error, not able to send data." }
// there were critical errors. Don't even try anything! we will reconnect automatically (on the client) when it shuts-down (the connection is closed immediately when an error of this type is encountered
@ -905,7 +904,7 @@ class AeronDriver constructor(config: Configuration, val logger: KLogger, val en
}
else {
logger.info { "[${publication.sessionId()}] Connection disconnected while sending data, closing connection." }
internal.criticalDriverError = true
internal.mustRestartDriverOnError = true
// publication was actually closed or the server was closed, so no bother throwing an error
connection.closeImmediately(sendDisconnectMessage = false,
@ -988,7 +987,7 @@ class AeronDriver constructor(config: Configuration, val logger: KLogger, val en
return true
}
if (internal.criticalDriverError) {
if (internal.mustRestartDriverOnError) {
// there were critical errors. Don't even try anything! we will reconnect automatically (on the client) when it shuts-down (the connection is closed immediately when an error of this type is encountered
// aeron will likely report this is as "BACK PRESSURE"

View File

@ -124,7 +124,7 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, config: Configuration
* Checks to see if there are any critical network errors (for example, a VPN connection getting disconnected while running)
*/
@Volatile
internal var criticalDriverError = false
internal var mustRestartDriverOnError = false
@Volatile
private var closed = false
@ -148,8 +148,8 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, config: Configuration
error.cause is SocketException ||
error.cause is IOException) {
// this is bad! We must close this connection. THIS WILL BE CALLED AS FAST AS THE CPU CAN RUN (because of how aeron works).
if (!criticalDriverError) {
criticalDriverError = true
if (!mustRestartDriverOnError) {
mustRestartDriverOnError = true
logger.error { "Aeron Driver [$driverId]: Critical driver error!" }
@ -161,8 +161,6 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, config: Configuration
}
}
if (error.message?.startsWith("ERROR - channel error - Network is unreachable") == true) {
val exception = AeronDriverException("Network is disconnected or unreachable.")
exception.cleanAllStackTrace()
@ -218,7 +216,7 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, config: Configuration
}
private suspend fun removeErrors() = onErrorLocalMutex.withLock {
criticalDriverError = false
mustRestartDriverOnError = false
onErrorLocalList.forEach {
removeOnError(it)
}
@ -752,7 +750,7 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, config: Configuration
// ignore the extra driver checks, because in SOME situations, when trying to reconnect upon an error, the
// driver gets into a bad state. When this happens, we cannot rely on the driver stat info!
if (criticalDriverError) {
if (mustRestartDriverOnError) {
return false
}
@ -807,7 +805,7 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, config: Configuration
// ignore the extra driver checks, because in SOME situations, when trying to reconnect upon an error, the
if (isInUse(endPoint, logger)) {
if (criticalDriverError) {
if (mustRestartDriverOnError) {
// driver gets into a bad state. When this happens, we have to ignore "are we already in use" checks, BECAUSE the driver is now corrupted and unusable!
}
else {
@ -890,7 +888,7 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, config: Configuration
}
}
catch (e: Exception) {
if (!criticalDriverError) {
if (!mustRestartDriverOnError) {
logger.error(e) { "Error while checking isRunning() state." }
}
}

View File

@ -519,7 +519,7 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
}
} catch (e: Throwable) {
// if the driver is closed due to a network disconnect or a remote-client termination, we also must close the connection.
if (aeronDriver.internal.criticalDriverError) {
if (aeronDriver.internal.mustRestartDriverOnError) {
// we had a HARD network crash/disconnect, we close the driver and then reconnect automatically
//NOTE: notifyDisconnect IS NOT CALLED!
}

View File

@ -79,7 +79,7 @@ internal class Handshaker<CONNECTION : Connection>(
return aeronDriver.send(publication, buffer, logInfo, listenerManager, handshakeSendIdleStrategy)
} catch (e: Exception) {
// if the driver is closed due to a network disconnect or a remote-client termination, we also must close the connection.
if (aeronDriver.internal.criticalDriverError) {
if (aeronDriver.internal.mustRestartDriverOnError) {
// we had a HARD network crash/disconnect, we close the driver and then reconnect automatically
//NOTE: notifyDisconnect IS NOT CALLED!
}