diff --git a/src/dorkbox/network/aeron/AeronDriver.kt b/src/dorkbox/network/aeron/AeronDriver.kt index e4a93f96..711de4cc 100644 --- a/src/dorkbox/network/aeron/AeronDriver.kt +++ b/src/dorkbox/network/aeron/AeronDriver.kt @@ -9,6 +9,7 @@ import io.aeron.Publication import io.aeron.Subscription import io.aeron.driver.MediaDriver import io.aeron.exceptions.DriverTimeoutException +import kotlinx.atomicfu.atomic import kotlinx.coroutines.delay import kotlinx.coroutines.runBlocking import mu.KLogger @@ -244,6 +245,9 @@ class AeronDriver(val config: Configuration, } } + private val closeRequested = atomic(false) + + @Volatile private var aeron: Aeron? = null @Volatile @@ -253,6 +257,9 @@ class AeronDriver(val config: Configuration, private val threadFactory = NamedThreadFactory("Thread", ThreadGroup("${type.simpleName}-AeronDriver"), true) private val mediaDriverContext = config.context!! + // did WE start the media driver, or did SOMEONE ELSE start it? + private val mediaDriverAlreadyStarted: Boolean + init { mediaDriverContext .conductorThreadFactory(threadFactory) @@ -260,6 +267,8 @@ class AeronDriver(val config: Configuration, .senderThreadFactory(threadFactory) .sharedNetworkThreadFactory(threadFactory) .sharedThreadFactory(threadFactory) + + mediaDriverAlreadyStarted = isRunning(mediaDriverContext) } private fun setupAeron(): Aeron.Context { @@ -283,8 +292,14 @@ class AeronDriver(val config: Configuration, } - + /** + * @return true if the media driver was started, false if it was not started + */ private fun startDriver(): Boolean { + if (closeRequested.value) { + return false + } + if (mediaDriver == null) { // only start if we didn't already start... There will be several checks. @@ -314,6 +329,10 @@ class AeronDriver(val config: Configuration, } private fun startAeron(didStartDriver: Boolean) { + if (closeRequested.value) { + return + } + // the media driver MIGHT already be started in a different process! We still ALWAYS want to connect to // aeron (which connects to the other media driver process), especially if we haven't already connected to // it (or if there was an error connecting because a different media driver was shutting down) @@ -332,10 +351,24 @@ class AeronDriver(val config: Configuration, * @throws Exception if there is a problem starting the media driver */ fun start() { + // we want to be able to "restart" aeron, as necessary, after a [close] method call + closeRequested.lazySet(false) + startOrRestart() + } + + /** + * if we did NOT close, then will start the media driver + aeron, + */ + private fun startOrRestart() { + if (closeRequested.value) { + return + } + val didStartDriver = startDriver() startAeron(didStartDriver) } + /** * @return the aeron media driver log file for a specific publication. This should be removed when a publication is closed (but is not always!) */ @@ -374,9 +407,9 @@ class AeronDriver(val config: Configuration, // try to start/restart aeron try { - start() + startOrRestart() } catch (e2: Exception) { - e2.printStackTrace() + // ignored } } } @@ -399,7 +432,7 @@ class AeronDriver(val config: Configuration, } catch (e: Exception) { // NOTE: this error will be logged in the `aeronDriverContext` logger exception = e - logger.warn { "Unable to add a publication to Aeron. Retrying $count more times..." } + logger.warn { "Unable to add a sublication to Aeron. Retrying $count more times..." } if (e is DriverTimeoutException) { delay(mediaDriverContext.driverTimeoutMs()) @@ -416,9 +449,9 @@ class AeronDriver(val config: Configuration, // try to start/restart aeron try { - start() + startOrRestart() } catch (e2: Exception) { - e2.printStackTrace() + // ignored } } } @@ -443,18 +476,28 @@ class AeronDriver(val config: Configuration, * same directory, deleting the directory will cause any other aeron connection to fail! (which makes sense). */ suspend fun close() { + closeRequested.lazySet(true) + try { aeron?.close() } catch (e: Exception) { logger.error("Error stopping aeron.", e) } + aeron = null + val mediaDriver = mediaDriver if (mediaDriver == null) { - logger.debug { "No driver started for this instance. Not Stopping." } + logger.debug("No driver started for this instance. Not Stopping.") return } + if (!mediaDriverAlreadyStarted) { + logger.debug("We did not start the media driver, so we are not stopping it.") + return + } + + logger.debug("Stopping driver at '${mediaDriverContext.aeronDirectory()}'...") if (!isRunning(mediaDriverContext)) { @@ -470,6 +513,8 @@ class AeronDriver(val config: Configuration, // AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param) delay(AERON_PUBLICATION_LINGER_TIMEOUT) + this@AeronDriver.mediaDriver = null + // wait for the media driver to actually stop var count = 10 while (count-- >= 0 && isRunning(mediaDriverContext)) {