MediaDriver + aeron states are now reset on close

This commit is contained in:
Robinson 2021-04-29 01:46:18 +02:00
parent 36318690a2
commit 9c7fa4de8d
1 changed files with 52 additions and 7 deletions

View File

@ -9,6 +9,7 @@ import io.aeron.Publication
import io.aeron.Subscription
import io.aeron.driver.MediaDriver
import io.aeron.exceptions.DriverTimeoutException
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import mu.KLogger
@ -244,6 +245,9 @@ class AeronDriver(val config: Configuration,
}
}
private val closeRequested = atomic(false)
@Volatile
private var aeron: Aeron? = null
@Volatile
@ -253,6 +257,9 @@ class AeronDriver(val config: Configuration,
private val threadFactory = NamedThreadFactory("Thread", ThreadGroup("${type.simpleName}-AeronDriver"), true)
private val mediaDriverContext = config.context!!
// did WE start the media driver, or did SOMEONE ELSE start it?
private val mediaDriverAlreadyStarted: Boolean
init {
mediaDriverContext
.conductorThreadFactory(threadFactory)
@ -260,6 +267,8 @@ class AeronDriver(val config: Configuration,
.senderThreadFactory(threadFactory)
.sharedNetworkThreadFactory(threadFactory)
.sharedThreadFactory(threadFactory)
mediaDriverAlreadyStarted = isRunning(mediaDriverContext)
}
private fun setupAeron(): Aeron.Context {
@ -283,8 +292,14 @@ class AeronDriver(val config: Configuration,
}
/**
* @return true if the media driver was started, false if it was not started
*/
private fun startDriver(): Boolean {
if (closeRequested.value) {
return false
}
if (mediaDriver == null) {
// only start if we didn't already start... There will be several checks.
@ -314,6 +329,10 @@ class AeronDriver(val config: Configuration,
}
private fun startAeron(didStartDriver: Boolean) {
if (closeRequested.value) {
return
}
// the media driver MIGHT already be started in a different process! We still ALWAYS want to connect to
// aeron (which connects to the other media driver process), especially if we haven't already connected to
// it (or if there was an error connecting because a different media driver was shutting down)
@ -332,10 +351,24 @@ class AeronDriver(val config: Configuration,
* @throws Exception if there is a problem starting the media driver
*/
fun start() {
// we want to be able to "restart" aeron, as necessary, after a [close] method call
closeRequested.lazySet(false)
startOrRestart()
}
/**
* if we did NOT close, then will start the media driver + aeron,
*/
private fun startOrRestart() {
if (closeRequested.value) {
return
}
val didStartDriver = startDriver()
startAeron(didStartDriver)
}
/**
* @return the aeron media driver log file for a specific publication. This should be removed when a publication is closed (but is not always!)
*/
@ -374,9 +407,9 @@ class AeronDriver(val config: Configuration,
// try to start/restart aeron
try {
start()
startOrRestart()
} catch (e2: Exception) {
e2.printStackTrace()
// ignored
}
}
}
@ -399,7 +432,7 @@ class AeronDriver(val config: Configuration,
} catch (e: Exception) {
// NOTE: this error will be logged in the `aeronDriverContext` logger
exception = e
logger.warn { "Unable to add a publication to Aeron. Retrying $count more times..." }
logger.warn { "Unable to add a sublication to Aeron. Retrying $count more times..." }
if (e is DriverTimeoutException) {
delay(mediaDriverContext.driverTimeoutMs())
@ -416,9 +449,9 @@ class AeronDriver(val config: Configuration,
// try to start/restart aeron
try {
start()
startOrRestart()
} catch (e2: Exception) {
e2.printStackTrace()
// ignored
}
}
}
@ -443,18 +476,28 @@ class AeronDriver(val config: Configuration,
* same directory, deleting the directory will cause any other aeron connection to fail! (which makes sense).
*/
suspend fun close() {
closeRequested.lazySet(true)
try {
aeron?.close()
} catch (e: Exception) {
logger.error("Error stopping aeron.", e)
}
aeron = null
val mediaDriver = mediaDriver
if (mediaDriver == null) {
logger.debug { "No driver started for this instance. Not Stopping." }
logger.debug("No driver started for this instance. Not Stopping.")
return
}
if (!mediaDriverAlreadyStarted) {
logger.debug("We did not start the media driver, so we are not stopping it.")
return
}
logger.debug("Stopping driver at '${mediaDriverContext.aeronDirectory()}'...")
if (!isRunning(mediaDriverContext)) {
@ -470,6 +513,8 @@ class AeronDriver(val config: Configuration,
// AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param)
delay(AERON_PUBLICATION_LINGER_TIMEOUT)
this@AeronDriver.mediaDriver = null
// wait for the media driver to actually stop
var count = 10
while (count-- >= 0 && isRunning(mediaDriverContext)) {