Fixed state with driver already running during shutdown

This commit is contained in:
Robinson 2021-04-29 13:34:45 +02:00
parent 0da8986258
commit ad06f768f8

View File

@ -16,6 +16,8 @@ import mu.KLogger
import mu.KotlinLogging import mu.KotlinLogging
import org.agrona.concurrent.BackoffIdleStrategy import org.agrona.concurrent.BackoffIdleStrategy
import java.io.File import java.io.File
import java.lang.Thread.sleep
import java.net.BindException
/** /**
* Class for managing the Aeron+Media drivers * Class for managing the Aeron+Media drivers
@ -151,7 +153,17 @@ class AeronDriver(val config: Configuration,
// we DO NOT want to abort the JVM if there are errors. // we DO NOT want to abort the JVM if there are errors.
context.errorHandler { error -> context.errorHandler { error ->
logger.error("Error in Aeron", error) if (error is DriverTimeoutException) {
// we suppress this because it is already handled
return@errorHandler
}
if (error.cause is BindException) {
// we suppress this because it is already handled
return@errorHandler
}
logger.error("Error in Aeron Media Driver", error)
} }
@ -258,7 +270,7 @@ class AeronDriver(val config: Configuration,
private val mediaDriverContext = config.context!! private val mediaDriverContext = config.context!!
// did WE start the media driver, or did SOMEONE ELSE start it? // did WE start the media driver, or did SOMEONE ELSE start it?
private val mediaDriverAlreadyStarted: Boolean private val mediaDriverWasAlreadyRunning: Boolean
init { init {
mediaDriverContext mediaDriverContext
@ -268,7 +280,7 @@ class AeronDriver(val config: Configuration,
.sharedNetworkThreadFactory(threadFactory) .sharedNetworkThreadFactory(threadFactory)
.sharedThreadFactory(threadFactory) .sharedThreadFactory(threadFactory)
mediaDriverAlreadyStarted = isRunning(mediaDriverContext) mediaDriverWasAlreadyRunning = isRunning(mediaDriverContext)
} }
private fun setupAeron(): Aeron.Context { private fun setupAeron(): Aeron.Context {
@ -353,13 +365,13 @@ class AeronDriver(val config: Configuration,
fun start() { fun start() {
// we want to be able to "restart" aeron, as necessary, after a [close] method call // we want to be able to "restart" aeron, as necessary, after a [close] method call
closeRequested.lazySet(false) closeRequested.lazySet(false)
startOrRestart() restart()
} }
/** /**
* if we did NOT close, then will start the media driver + aeron, * if we did NOT close, then will restart the media driver + aeron. If we manually closed aeron, then we won't try to restart
*/ */
private fun startOrRestart() { private fun restart() {
if (closeRequested.value) { if (closeRequested.value) {
return return
} }
@ -396,6 +408,11 @@ class AeronDriver(val config: Configuration,
delay(mediaDriverContext.driverTimeoutMs()) delay(mediaDriverContext.driverTimeoutMs())
} }
if (e.cause is BindException) {
// was starting too fast!
delay(mediaDriverContext.driverTimeoutMs())
}
// reasons we cannot add a pub/sub to aeron // reasons we cannot add a pub/sub to aeron
// 1) the driver was closed // 1) the driver was closed
// 2) aeron was unable to connect to the driver // 2) aeron was unable to connect to the driver
@ -407,7 +424,7 @@ class AeronDriver(val config: Configuration,
// try to start/restart aeron // try to start/restart aeron
try { try {
startOrRestart() restart()
} catch (e2: Exception) { } catch (e2: Exception) {
// ignored // ignored
} }
@ -427,7 +444,9 @@ class AeronDriver(val config: Configuration,
try { try {
val aeron = aeron val aeron = aeron
if (aeron != null) { if (aeron != null) {
return aeron.addSubscription(uri, streamId) val subscription = aeron.addSubscription(uri, streamId)
return subscription
} }
} catch (e: Exception) { } catch (e: Exception) {
// NOTE: this error will be logged in the `aeronDriverContext` logger // NOTE: this error will be logged in the `aeronDriverContext` logger
@ -438,6 +457,11 @@ class AeronDriver(val config: Configuration,
delay(mediaDriverContext.driverTimeoutMs()) delay(mediaDriverContext.driverTimeoutMs())
} }
if (e.cause is BindException) {
// was starting too fast!
delay(mediaDriverContext.driverTimeoutMs())
}
// reasons we cannot add a pub/sub to aeron // reasons we cannot add a pub/sub to aeron
// 1) the driver was closed // 1) the driver was closed
// 2) aeron was unable to connect to the driver // 2) aeron was unable to connect to the driver
@ -449,7 +473,7 @@ class AeronDriver(val config: Configuration,
// try to start/restart aeron // try to start/restart aeron
try { try {
startOrRestart() restart()
} catch (e2: Exception) { } catch (e2: Exception) {
// ignored // ignored
} }
@ -475,7 +499,7 @@ class AeronDriver(val config: Configuration,
* NOTE: We must be *super* careful trying to delete directories, because if we have multiple AERON/MEDIA DRIVERS connected to the * NOTE: We must be *super* careful trying to delete directories, because if we have multiple AERON/MEDIA DRIVERS connected to the
* same directory, deleting the directory will cause any other aeron connection to fail! (which makes sense). * same directory, deleting the directory will cause any other aeron connection to fail! (which makes sense).
*/ */
suspend fun close() { fun close() {
closeRequested.lazySet(true) closeRequested.lazySet(true)
try { try {
@ -492,7 +516,7 @@ class AeronDriver(val config: Configuration,
return return
} }
if (!mediaDriverAlreadyStarted) { if (mediaDriverWasAlreadyRunning) {
logger.debug("We did not start the media driver, so we are not stopping it.") logger.debug("We did not start the media driver, so we are not stopping it.")
return return
} }
@ -511,7 +535,7 @@ class AeronDriver(val config: Configuration,
// on close, the publication CAN linger (in case a client goes away, and then comes back) // on close, the publication CAN linger (in case a client goes away, and then comes back)
// AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param) // AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param)
delay(AERON_PUBLICATION_LINGER_TIMEOUT) sleep(AERON_PUBLICATION_LINGER_TIMEOUT)
this@AeronDriver.mediaDriver = null this@AeronDriver.mediaDriver = null
@ -519,7 +543,7 @@ class AeronDriver(val config: Configuration,
var count = 10 var count = 10
while (count-- >= 0 && isRunning(mediaDriverContext)) { while (count-- >= 0 && isRunning(mediaDriverContext)) {
logger.warn { "Aeron Media driver at '${mediaDriverContext.aeronDirectory()}' is still running. Waiting for it to stop. Trying $count more times." } logger.warn { "Aeron Media driver at '${mediaDriverContext.aeronDirectory()}' is still running. Waiting for it to stop. Trying $count more times." }
delay(mediaDriverContext.driverTimeoutMs()) sleep(mediaDriverContext.driverTimeoutMs())
} }
} catch (e: Exception) { } catch (e: Exception) {
logger.error("Error closing the media driver at '${mediaDriverContext.aeronDirectory()}'", e) logger.error("Error closing the media driver at '${mediaDriverContext.aeronDirectory()}'", e)