Direct access to critical error now instead of proxy

This commit is contained in:
Robinson 2023-08-10 20:01:05 -06:00
parent a36947af5b
commit 77d56b8804
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
5 changed files with 77 additions and 27 deletions

View File

@ -505,7 +505,6 @@ open class Client<CONNECTION : Connection>(
connection0 = null
// we are done with initial configuration, now initialize aeron and the general state of this endpoint
// this also makes sure that the dispatchers are still active.
// Calling `client.close()` will shutdown the dispatchers (and a new client instance must be created)
@ -840,12 +839,11 @@ open class Client<CONNECTION : Connection>(
}
},
onShutdown = {
val criticalDriverError = aeronDriver.criticalDriverError
val endpoints: Array<EndPoint<*>> = if (criticalDriverError) {
aeronDriver.internal.endPointUsages.toTypedArray()
val criticalDriverError = aeronDriver.internal.criticalDriverError
val endpoints = if (criticalDriverError) {
aeronDriver.internal.endPointUsages
} else {
@Suppress("UNCHECKED_CAST")
arrayOfNulls<EndPoint<*>?>(0) as Array<EndPoint<*>>
null
}
@ -860,11 +858,31 @@ open class Client<CONNECTION : Connection>(
initiatedByShutdown = false)
}
// we can now call connect again
endpointIsRunning.lazySet(false)
pollerClosedLatch.countDown()
logger.debug { "Closed the Network Event Poller..." }
if (criticalDriverError) {
logger.error { "Critical driver error detected, reconnecting client" }
EventDispatcher.launchSequentially(EventDispatcher.CONNECT) {
waitForEndpointShutdown()
// also wait for everyone else to shutdown!!
endpoints!!.forEach {
it.waitForEndpointShutdown()
}
// if we restart/reconnect too fast, errors from the previous run will still be present!
aeronDriver.delayLingerTimeout()
reconnect()
}
} else {
logger.debug { "Closed the Network Event Poller..." }
}
})
listenerManager.notifyConnect(newConnection)

View File

@ -19,10 +19,7 @@ import dorkbox.bytes.toHexString
import dorkbox.network.aeron.AeronDriver
import dorkbox.network.aeron.AeronPoller
import dorkbox.network.aeron.EventPoller
import dorkbox.network.connection.Connection
import dorkbox.network.connection.ConnectionParams
import dorkbox.network.connection.EndPoint
import dorkbox.network.connection.IpInfo
import dorkbox.network.connection.*
import dorkbox.network.connection.IpInfo.Companion.IpListenType
import dorkbox.network.connection.ListenerManager.Companion.cleanStackTrace
import dorkbox.network.connectionType.ConnectionRule
@ -209,7 +206,7 @@ open class Server<CONNECTION : Connection>(
* can also be configured independently. This is required, and must be different from port1.
*/
@Suppress("DuplicatedCode")
fun bind(port1: Int, port2: Int = port1+1) = runBlocking {
fun bind(port1: Int, port2: Int = port1+1) = runBlocking {
if (config.enableIPv4 || config.enableIPv6) {
require(port1 != port2) { "port1 cannot be the same as port2" }
require(port1 > 0) { "port1 must be > 0" }
@ -307,6 +304,13 @@ open class Server<CONNECTION : Connection>(
}
},
onShutdown = {
val criticalDriverError = aeronDriver.internal.criticalDriverError
val endpoints = if (criticalDriverError) {
aeronDriver.internal.endPointUsages
} else {
null
}
logger.debug { "Server event dispatch closing..." }
ipcPoller.close()
@ -315,6 +319,43 @@ open class Server<CONNECTION : Connection>(
// clear all the handshake info
handshake.clear()
// we only need to run shutdown methods if there was a network outage or D/C
if (!shutdownInProgress.value) {
this@Server.close(
closeEverything = false,
notifyDisconnect = !criticalDriverError, // this is because we restart automatically on driver errors
releaseWaitingThreads = !criticalDriverError // this is because we restart automatically on driver errors
)
}
if (criticalDriverError) {
logger.error { "Critical driver error detected, restarting server." }
EventDispatcher.launchSequentially(EventDispatcher.CONNECT) {
waitForEndpointShutdown()
// also wait for everyone else to shutdown!!
endpoints!!.forEach {
it.waitForEndpointShutdown()
}
// if we restart/reconnect too fast, errors from the previous run will still be present!
aeronDriver.delayLingerTimeout()
val p1 = this@Server.port1
val p2 = this@Server.port2
if (p1 == 0 && p2 == 0) {
bindIpc()
} else {
bind(p1, p2)
}
}
}
// we can now call bind again
endpointIsRunning.lazySet(false)
pollerClosedLatch.countDown()

View File

@ -514,6 +514,7 @@ class AeronDriver constructor(config: Configuration, val logger: KLogger, val en
internal = aeronDriver
}
logger.error { "Aeron Driver [$driverId]: Initialized critical=${internal.criticalDriverError}" }
}
@ -675,17 +676,6 @@ class AeronDriver constructor(config: Configuration, val logger: KLogger, val en
*/
fun closed() = internal.closed()
/**
* Checks to see if there are any critical network errors (for example, a VPN connection getting disconnected while running)
*/
var criticalDriverError: Boolean
get() {
return internal.criticalDriverError
}
set(value) {
internal.criticalDriverError = value
}
suspend fun isInUse(endPoint: EndPoint<*>?): Boolean = internal.isInUse(endPoint, logger)
/**
@ -879,7 +869,8 @@ class AeronDriver constructor(config: Configuration, val logger: KLogger, val en
return true
}
if (criticalDriverError) {
if (internal.criticalDriverError) {
logger.error { "Critical error, not able to send data." }
// there were critical errors. Don't even try anything! we will reconnect automatically (on the client) when it shuts-down (the connection is closed immediately when an error of this type is encountered
// aeron will likely report this is as "BACK PRESSURE"
@ -996,7 +987,7 @@ class AeronDriver constructor(config: Configuration, val logger: KLogger, val en
return true
}
if (criticalDriverError) {
if (internal.criticalDriverError) {
// there were critical errors. Don't even try anything! we will reconnect automatically (on the client) when it shuts-down (the connection is closed immediately when an error of this type is encountered
// aeron will likely report this is as "BACK PRESSURE"

View File

@ -525,7 +525,7 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
}
} catch (e: Throwable) {
// if the driver is closed due to a network disconnect or a remote-client termination, we also must close the connection.
if (aeronDriver.criticalDriverError) {
if (aeronDriver.internal.criticalDriverError) {
// we had a HARD network crash/disconnect, we close the driver and then reconnect automatically
//NOTE: notifyDisconnect IS NOT CALLED!
}

View File

@ -79,7 +79,7 @@ internal class Handshaker<CONNECTION : Connection>(
return aeronDriver.send(publication, buffer, logInfo, listenerManager, handshakeSendIdleStrategy)
} catch (e: Exception) {
// if the driver is closed due to a network disconnect or a remote-client termination, we also must close the connection.
if (aeronDriver.criticalDriverError) {
if (aeronDriver.internal.criticalDriverError) {
// we had a HARD network crash/disconnect, we close the driver and then reconnect automatically
//NOTE: notifyDisconnect IS NOT CALLED!
}