AeronDriverInternal will now restart the network if there is an `unexpected close of heartbeat timestamp counter` error

This commit is contained in:
Robinson 2023-10-03 22:01:54 +02:00
parent c69a33f1a9
commit 1287eb8c6e
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
1 changed files with 32 additions and 9 deletions

View File

@ -141,15 +141,18 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, config: Configuration
// configure the aeron error handler
val filter = config.aeronErrorFilter
aeronErrorHandler = { error ->
// if the network interface is removed (for example, a VPN connection).
if (error is io.aeron.exceptions.ChannelEndpointException ||
error.cause is BindException ||
error.cause is SocketException ||
error.cause is IOException) {
// this is bad! We must close this connection. THIS WILL BE CALLED AS FAST AS THE CPU CAN RUN (because of how aeron works).
if (!mustRestartDriverOnError) {
// this is bad! We must close this connection. THIS WILL BE CALLED AS FAST AS THE CPU CAN RUN (because of how aeron works).
if (!mustRestartDriverOnError) {
mustRestartDriverOnError = true
var restartNetwork = false
// if the network interface is removed (for example, a VPN connection).
if (error is io.aeron.exceptions.ChannelEndpointException ||
error.cause is BindException ||
error.cause is SocketException ||
error.cause is IOException) {
restartNetwork = true
if (error.message?.startsWith("ERROR - channel error - Network is unreachable") == true) {
val exception = AeronDriverException("Aeron Driver [$driverId]: Network is disconnected or unreachable.")
@ -169,6 +172,23 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, config: Configuration
// send this out to the listener-manager so we can be notified of global errors
notifyError(AeronDriverException("Aeron Driver [$driverId]: Unexpected error!", error.cause!!))
}
}
if (error is io.aeron.exceptions.AeronException) {
if (error.message?.startsWith("ERROR - unexpected close of heartbeat timestamp counter:") == true) {
restartNetwork = true
val exception = AeronDriverException("Aeron Driver [$driverId]: HEARTBEAT error, can't continue.")
exception.cleanAllStackTrace()
notifyError(exception)
}
}
if (restartNetwork) {
// this must be set before anything else happens
mustRestartDriverOnError = true
// we must close all the connections on a DIFFERENT thread!
EventDispatcher.CLOSE.launch {
@ -180,7 +200,10 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, config: Configuration
}
}
}
else if (filter(error)) {
// if we are restarting the network, ignore all future messages
if (!mustRestartDriverOnError && filter(error)) {
error.cleanStackTrace()
// send this out to the listener-manager so we can be notified of global errors
notifyError(AeronDriverException(error))