From ad06f768f83fe499da71c64884e6f5ea0fcbf487 Mon Sep 17 00:00:00 2001 From: Robinson Date: Thu, 29 Apr 2021 13:34:45 +0200 Subject: [PATCH] Fixed state with driver already running during shutdown --- src/dorkbox/network/aeron/AeronDriver.kt | 50 ++++++++++++++++++------ 1 file changed, 37 insertions(+), 13 deletions(-) diff --git a/src/dorkbox/network/aeron/AeronDriver.kt b/src/dorkbox/network/aeron/AeronDriver.kt index 711de4cc..75464a0d 100644 --- a/src/dorkbox/network/aeron/AeronDriver.kt +++ b/src/dorkbox/network/aeron/AeronDriver.kt @@ -16,6 +16,8 @@ import mu.KLogger import mu.KotlinLogging import org.agrona.concurrent.BackoffIdleStrategy import java.io.File +import java.lang.Thread.sleep +import java.net.BindException /** * Class for managing the Aeron+Media drivers @@ -151,7 +153,17 @@ class AeronDriver(val config: Configuration, // we DO NOT want to abort the JVM if there are errors. context.errorHandler { error -> - logger.error("Error in Aeron", error) + if (error is DriverTimeoutException) { + // we suppress this because it is already handled + return@errorHandler + } + + if (error.cause is BindException) { + // we suppress this because it is already handled + return@errorHandler + } + + logger.error("Error in Aeron Media Driver", error) } @@ -258,7 +270,7 @@ class AeronDriver(val config: Configuration, private val mediaDriverContext = config.context!! // did WE start the media driver, or did SOMEONE ELSE start it? - private val mediaDriverAlreadyStarted: Boolean + private val mediaDriverWasAlreadyRunning: Boolean init { mediaDriverContext @@ -268,7 +280,7 @@ class AeronDriver(val config: Configuration, .sharedNetworkThreadFactory(threadFactory) .sharedThreadFactory(threadFactory) - mediaDriverAlreadyStarted = isRunning(mediaDriverContext) + mediaDriverWasAlreadyRunning = isRunning(mediaDriverContext) } private fun setupAeron(): Aeron.Context { @@ -353,13 +365,13 @@ class AeronDriver(val config: Configuration, fun start() { // we want to be able to "restart" aeron, as necessary, after a [close] method call closeRequested.lazySet(false) - startOrRestart() + restart() } /** - * if we did NOT close, then will start the media driver + aeron, + * if we did NOT close, then will restart the media driver + aeron. If we manually closed aeron, then we won't try to restart */ - private fun startOrRestart() { + private fun restart() { if (closeRequested.value) { return } @@ -396,6 +408,11 @@ class AeronDriver(val config: Configuration, delay(mediaDriverContext.driverTimeoutMs()) } + if (e.cause is BindException) { + // was starting too fast! + delay(mediaDriverContext.driverTimeoutMs()) + } + // reasons we cannot add a pub/sub to aeron // 1) the driver was closed // 2) aeron was unable to connect to the driver @@ -407,7 +424,7 @@ class AeronDriver(val config: Configuration, // try to start/restart aeron try { - startOrRestart() + restart() } catch (e2: Exception) { // ignored } @@ -427,7 +444,9 @@ class AeronDriver(val config: Configuration, try { val aeron = aeron if (aeron != null) { - return aeron.addSubscription(uri, streamId) + val subscription = aeron.addSubscription(uri, streamId) + + return subscription } } catch (e: Exception) { // NOTE: this error will be logged in the `aeronDriverContext` logger @@ -438,6 +457,11 @@ class AeronDriver(val config: Configuration, delay(mediaDriverContext.driverTimeoutMs()) } + if (e.cause is BindException) { + // was starting too fast! + delay(mediaDriverContext.driverTimeoutMs()) + } + // reasons we cannot add a pub/sub to aeron // 1) the driver was closed // 2) aeron was unable to connect to the driver @@ -449,7 +473,7 @@ class AeronDriver(val config: Configuration, // try to start/restart aeron try { - startOrRestart() + restart() } catch (e2: Exception) { // ignored } @@ -475,7 +499,7 @@ class AeronDriver(val config: Configuration, * NOTE: We must be *super* careful trying to delete directories, because if we have multiple AERON/MEDIA DRIVERS connected to the * same directory, deleting the directory will cause any other aeron connection to fail! (which makes sense). */ - suspend fun close() { + fun close() { closeRequested.lazySet(true) try { @@ -492,7 +516,7 @@ class AeronDriver(val config: Configuration, return } - if (!mediaDriverAlreadyStarted) { + if (mediaDriverWasAlreadyRunning) { logger.debug("We did not start the media driver, so we are not stopping it.") return } @@ -511,7 +535,7 @@ class AeronDriver(val config: Configuration, // on close, the publication CAN linger (in case a client goes away, and then comes back) // AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param) - delay(AERON_PUBLICATION_LINGER_TIMEOUT) + sleep(AERON_PUBLICATION_LINGER_TIMEOUT) this@AeronDriver.mediaDriver = null @@ -519,7 +543,7 @@ class AeronDriver(val config: Configuration, var count = 10 while (count-- >= 0 && isRunning(mediaDriverContext)) { logger.warn { "Aeron Media driver at '${mediaDriverContext.aeronDirectory()}' is still running. Waiting for it to stop. Trying $count more times." } - delay(mediaDriverContext.driverTimeoutMs()) + sleep(mediaDriverContext.driverTimeoutMs()) } } catch (e: Exception) { logger.error("Error closing the media driver at '${mediaDriverContext.aeronDirectory()}'", e)