Cleaned up stopping/starting the aeron driver

This commit is contained in:
Robinson 2023-02-15 01:15:51 +01:00
parent 73efd8d370
commit 76a96a89ef
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C

View File

@ -268,15 +268,17 @@ class AeronDriver(
*/
private val aeronErrorHandler: (error: Throwable) -> Unit
@Volatile
private var context_: AeronContext? = null
private var contextHolder: AeronContext? = null
private val context: AeronContext
get() {
if (context_ == null) {
context_ = AeronContext(config, type, logger, aeronErrorHandler)
}
synchronized(this) {
if (contextHolder == null) {
contextHolder = AeronContext(config, type, logger, aeronErrorHandler)
}
return context_!!
return contextHolder!!
}
}
@ -549,6 +551,10 @@ class AeronDriver(
*/
fun close() {
synchronized(lock) {
// we have to assign context BEFORE we close, because the `getter` for context will create it if necessary
val aeronContext = context
val driverDirectory = aeronContext.driverDirectory
try {
aeron?.close()
aeronClientUsageCount.getAndDecrement()
@ -569,11 +575,11 @@ class AeronDriver(
}
logger.debug { "Stopping driver at '${context.driverDirectory}'..." }
logger.debug { "Stopping driver at '${driverDirectory}'..." }
if (!isRunning()) {
// not running
logger.debug { "Driver is not running at '${context.driverDirectory}' for this context. Not Stopping." }
logger.debug { "Driver is not running at '${driverDirectory}' for this context. Not Stopping." }
return
}
@ -587,39 +593,44 @@ class AeronDriver(
mediaDriver = null
// it can actually close faster, if everything is ideal.
val timeout = aeronContext.driverTimeout + AERON_PUBLICATION_LINGER_TIMEOUT
// it can actually close faster, if everything is ideal.
if (isRunning()) {
// on close, we want to wait for the driver to timeout before considering it "closed". Connections can still LINGER (see below)
// on close, the publication CAN linger (in case a client goes away, and then comes back)
// AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param)
sleep(context.driverTimeout + AERON_PUBLICATION_LINGER_TIMEOUT)
sleep(timeout)
}
// wait for the media driver to actually stop
var count = 10
while (--count >= 0 && isRunning()) {
logger.warn { "Aeron Media driver at '${context.driverDirectory}' is still running. Waiting for it to stop. Trying to close $count more times." }
sleep(context.driverTimeout)
logger.warn { "Aeron Media driver at '${driverDirectory}' is still running. Waiting for it to stop. Trying to close $count more times." }
sleep(timeout)
}
logger.debug { "Closed the media driver at '${context.driverDirectory}'" }
logger.debug { "Closed the media driver at '${driverDirectory}'" }
try {
} catch (e: Exception) {
logger.error(e) {"Error closing the media driver at '${context.driverDirectory}'" }
logger.error(e) {"Error closing the media driver at '${driverDirectory}'" }
}
// make sure the context is also closed.
context.close()
aeronContext.close()
try {
val deletedAeron = context.driverDirectory.deleteRecursively()
val deletedAeron = driverDirectory.deleteRecursively()
if (!deletedAeron) {
logger.error { "Error deleting aeron directory ${context.driverDirectory} on shutdown "}
logger.error { "Error deleting aeron directory $driverDirectory on shutdown "}
}
} catch (e: Exception) {
logger.error(e) { "Error deleting Aeron directory at: ${context.driverDirectory}"}
logger.error(e) { "Error deleting Aeron directory at: $driverDirectory"}
}
context_ = null
contextHolder = null
}
}