|
|
|
@ -88,6 +88,8 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
val driverId = config.id
|
|
|
|
|
|
|
|
|
|
private val endPointUsages = mutableListOf<EndPoint<*>>()
|
|
|
|
|
|
|
|
|
|
@Volatile
|
|
|
|
@ -166,13 +168,11 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
|
|
|
|
|
* @return true if we are successfully connected to the aeron client
|
|
|
|
|
*/
|
|
|
|
|
suspend fun start(logger: KLogger): Boolean = stateMutex.withLock {
|
|
|
|
|
require(!closed) { "Cannot start a driver that was closed. A new driver + context must be created" }
|
|
|
|
|
|
|
|
|
|
val driverId = config.id
|
|
|
|
|
require(!closed) { "Aeron Driver [$driverId]: Cannot start a driver that was closed. A new driver + context must be created" }
|
|
|
|
|
|
|
|
|
|
val isLoaded = mediaDriver != null && aeron != null && aeron?.isClosed == false
|
|
|
|
|
if (isLoaded) {
|
|
|
|
|
logger.debug { "Aeron Media driver [$driverId] already running... Not starting again." }
|
|
|
|
|
logger.debug { "Aeron Driver [$driverId]: Already running... Not starting again." }
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -184,7 +184,7 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
|
|
|
|
|
// wait for a bit, because we are running, but we ALSO issued a START, and expect it to start.
|
|
|
|
|
// SOMETIMES aeron is in the middle of shutting down, and this prevents us from trying to connect to
|
|
|
|
|
// that instance
|
|
|
|
|
logger.debug { "Aeron Media driver [$driverId] already running. Double checking status..." }
|
|
|
|
|
logger.debug { "Aeron Driver [$driverId]: Already running. Double checking status..." }
|
|
|
|
|
delay(context.driverTimeout / 2)
|
|
|
|
|
running = isRunning()
|
|
|
|
|
}
|
|
|
|
@ -195,20 +195,20 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
|
|
|
|
|
while (count-- > 0) {
|
|
|
|
|
try {
|
|
|
|
|
mediaDriver = MediaDriver.launch(context.context)
|
|
|
|
|
logger.debug { "Successfully started the Aeron Media driver [$driverId]" }
|
|
|
|
|
logger.debug { "Aeron Driver [$driverId]: Successfully started" }
|
|
|
|
|
break
|
|
|
|
|
} catch (e: Exception) {
|
|
|
|
|
logger.warn(e) { "Unable to start the Aeron Media driver [$driverId] at ${context.directory}. Retrying $count more times..." }
|
|
|
|
|
logger.warn(e) { "Aeron Driver [$driverId]: Unable to start at ${context.directory}. Retrying $count more times..." }
|
|
|
|
|
delay(context.driverTimeout)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
logger.debug { "Not starting Aeron Media driver [$driverId]. It was already running." }
|
|
|
|
|
logger.debug { "Aeron Driver [$driverId]: Not starting. It was already running." }
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// if we were unable to load the aeron driver, don't continue.
|
|
|
|
|
if (!running && mediaDriver == null) {
|
|
|
|
|
logger.error { "Not running and unable to start the Aeron Media driver [$driverId] at ${context.directory}." }
|
|
|
|
|
logger.error { "Aeron Driver [$driverId]: Not running and unable to start at ${context.directory}." }
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -234,7 +234,7 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
|
|
|
|
|
|
|
|
|
|
// this might succeed if we can connect to the media driver
|
|
|
|
|
aeron = Aeron.connect(aeronDriverContext)
|
|
|
|
|
logger.debug { "Connected with the Aeron driver [$driverId] at '${context.directory}'" }
|
|
|
|
|
logger.debug { "Aeron Driver [$driverId]: Connected to '${context.directory}'" }
|
|
|
|
|
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
@ -242,7 +242,6 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
|
|
|
|
|
|
|
|
|
|
fun addPublication(logger: KLogger, publicationUri: ChannelUriStringBuilder, aeronLogInfo: String, streamId: Int): Publication {
|
|
|
|
|
val uri = publicationUri.build()
|
|
|
|
|
logger.trace { "$aeronLogInfo pub URI: $uri,stream-id=$streamId" }
|
|
|
|
|
|
|
|
|
|
// reasons we cannot add a pub/sub to aeron
|
|
|
|
|
// 1) the driver was closed
|
|
|
|
@ -258,7 +257,7 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
|
|
|
|
|
val aeron1 = aeron
|
|
|
|
|
if (aeron1 == null || aeron1.isClosed) {
|
|
|
|
|
// there was an error connecting to the aeron client or media driver.
|
|
|
|
|
val ex = ClientRetryException("Error adding a publication to aeron")
|
|
|
|
|
val ex = ClientRetryException("Aeron Driver [$driverId]: Error adding a publication to aeron")
|
|
|
|
|
ex.cleanAllStackTrace()
|
|
|
|
|
throw ex
|
|
|
|
|
}
|
|
|
|
@ -268,14 +267,14 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
|
|
|
|
|
} catch (e: Exception) {
|
|
|
|
|
// this happens if the aeron media driver cannot actually establish connection... OR IF IT IS TOO FAST BETWEEN ADD AND REMOVE FOR THE SAME SESSION/STREAM ID!
|
|
|
|
|
e.cleanAllStackTrace()
|
|
|
|
|
val ex = ClientRetryException("Error adding a publication", e)
|
|
|
|
|
val ex = ClientRetryException("Aeron Driver [$driverId]: Error adding a publication", e)
|
|
|
|
|
ex.cleanAllStackTrace()
|
|
|
|
|
throw ex
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (publication == null) {
|
|
|
|
|
// there was an error connecting to the aeron client or media driver.
|
|
|
|
|
val ex = ClientRetryException("Error adding a publication")
|
|
|
|
|
val ex = ClientRetryException("Aeron Driver [$driverId]: Error adding a publication")
|
|
|
|
|
ex.cleanAllStackTrace()
|
|
|
|
|
throw ex
|
|
|
|
|
}
|
|
|
|
@ -306,7 +305,7 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
|
|
|
|
|
val aeron1 = aeron
|
|
|
|
|
if (aeron1 == null || aeron1.isClosed) {
|
|
|
|
|
// there was an error connecting to the aeron client or media driver.
|
|
|
|
|
val ex = ClientRetryException("Error adding a publication to aeron")
|
|
|
|
|
val ex = ClientRetryException("Aeron Driver [$driverId]: Error adding an ex-publication to aeron")
|
|
|
|
|
ex.cleanAllStackTrace()
|
|
|
|
|
throw ex
|
|
|
|
|
}
|
|
|
|
@ -334,7 +333,7 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
|
|
|
|
|
|
|
|
|
|
if (publication == null) {
|
|
|
|
|
// there was an error connecting to the aeron client or media driver.
|
|
|
|
|
val ex = ClientRetryException("Error adding a publication")
|
|
|
|
|
val ex = ClientRetryException("Aeron Driver [$driverId]: Error adding an ex-publication")
|
|
|
|
|
ex.cleanAllStackTrace()
|
|
|
|
|
throw ex
|
|
|
|
|
}
|
|
|
|
@ -366,7 +365,7 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
|
|
|
|
|
val aeron1 = aeron
|
|
|
|
|
if (aeron1 == null || aeron1.isClosed) {
|
|
|
|
|
// there was an error connecting to the aeron client or media driver.
|
|
|
|
|
val ex = ClientRetryException("Error adding a subscription to aeron")
|
|
|
|
|
val ex = ClientRetryException("Aeron Driver [$driverId]: Error adding a subscription to aeron")
|
|
|
|
|
ex.cleanAllStackTrace()
|
|
|
|
|
throw ex
|
|
|
|
|
}
|
|
|
|
@ -375,14 +374,14 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
|
|
|
|
|
aeron1.addSubscription(uri, streamId)
|
|
|
|
|
} catch (e: Exception) {
|
|
|
|
|
e.cleanAllStackTrace()
|
|
|
|
|
val ex = ClientRetryException("Error adding a subscription", e)
|
|
|
|
|
val ex = ClientRetryException("Aeron Driver [$driverId]: Error adding a subscription", e)
|
|
|
|
|
ex.cleanAllStackTrace()
|
|
|
|
|
throw ex
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (subscription == null) {
|
|
|
|
|
// there was an error connecting to the aeron client or media driver.
|
|
|
|
|
val ex = ClientRetryException("Error adding a subscription")
|
|
|
|
|
val ex = ClientRetryException("Aeron Driver [$driverId]: Error adding a subscription")
|
|
|
|
|
ex.cleanAllStackTrace()
|
|
|
|
|
throw ex
|
|
|
|
|
}
|
|
|
|
@ -403,7 +402,7 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
|
|
|
|
|
// This can throw exceptions!
|
|
|
|
|
publication.close()
|
|
|
|
|
} catch (e: Exception) {
|
|
|
|
|
logger.error(e) { "Unable to close [$aeronLogInfo] publication $publication" }
|
|
|
|
|
logger.error(e) { "Aeron Driver [$driverId]: Unable to close [$aeronLogInfo] publication $publication" }
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ensureLogfileDeleted("publication", getMediaDriverFile(publication), aeronLogInfo, logger)
|
|
|
|
@ -413,7 +412,7 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
|
|
|
|
|
* Guarantee that the publication is closed AND the backing file is removed
|
|
|
|
|
*/
|
|
|
|
|
suspend fun closeAndDeleteSubscription(subscription: Subscription, aeronLogInfo: String, logger: KLogger) {
|
|
|
|
|
logger.trace { "Closing subscription [$aeronLogInfo] ::regId=${subscription.registrationId()}, sessionId=${subscription.images().firstOrNull()?.sessionId()}, streamId=${subscription.streamId()}" }
|
|
|
|
|
logger.trace { "Aeron Driver [$driverId]: Closing subscription [$aeronLogInfo] ::regId=${subscription.registrationId()}, sessionId=${subscription.images().firstOrNull()?.sessionId()}, streamId=${subscription.streamId()}" }
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
// This can throw exceptions!
|
|
|
|
@ -443,7 +442,7 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (logFile.exists()) {
|
|
|
|
|
logger.error("[$aeronLogInfo] Unable to delete aeron $type log on close: $logFile")
|
|
|
|
|
logger.error("Aeron Driver [$driverId]: [$aeronLogInfo] Unable to delete aeron $type log on close: $logFile")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pubSubs.decrementAndGet()
|
|
|
|
@ -467,7 +466,7 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
|
|
|
|
|
// only emit the log info once. It's rather spammy otherwise!
|
|
|
|
|
if (!didLog) {
|
|
|
|
|
didLog = true
|
|
|
|
|
logger.debug("Aeron was still running. Waiting for it to stop...")
|
|
|
|
|
logger.debug("Aeron Driver [$driverId]: Still running. Waiting for it to stop...")
|
|
|
|
|
}
|
|
|
|
|
delay(intervalTimeoutMS)
|
|
|
|
|
}
|
|
|
|
@ -488,7 +487,7 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
|
|
|
|
|
suspend fun isInUse(logger: KLogger): Boolean {
|
|
|
|
|
// as many "sort-cuts" as we can for checking if the current Aeron Driver/client is still in use
|
|
|
|
|
if (!isRunning()) {
|
|
|
|
|
logger.trace { "is not running" }
|
|
|
|
|
logger.trace { "Aeron Driver [$driverId]: not running" }
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -499,7 +498,7 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (endPointUsages.isNotEmpty()) {
|
|
|
|
|
logger.debug { "Aeron driver [$driverId] still referenced by ${endPointUsages.size} endpoints" }
|
|
|
|
|
logger.debug { "Aeron Driver [$driverId]: Still referenced by ${endPointUsages.size} endpoints" }
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -511,7 +510,7 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
|
|
|
|
|
var count = 3
|
|
|
|
|
|
|
|
|
|
while (count > 0 && currentUsage > 0) {
|
|
|
|
|
logger.debug { "Aeron driver is still in use, double checking status" }
|
|
|
|
|
logger.debug { "Aeron Driver [$driverId]: in use, double checking status" }
|
|
|
|
|
delayTimeout(.5)
|
|
|
|
|
currentUsage = driverBacklog()?.snapshot()?.size ?: 0
|
|
|
|
|
count--
|
|
|
|
@ -519,7 +518,7 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
|
|
|
|
|
|
|
|
|
|
count = 3
|
|
|
|
|
while (count > 0 && currentUsage > 0) {
|
|
|
|
|
logger.debug { "Aeron driver is still in use, double checking status (long)" }
|
|
|
|
|
logger.debug { "Aeron Driver [$driverId]: in use, double checking status (long)" }
|
|
|
|
|
delayLingerTimeout()
|
|
|
|
|
currentUsage = driverBacklog()?.snapshot()?.size ?: 0
|
|
|
|
|
count--
|
|
|
|
@ -527,7 +526,7 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
|
|
|
|
|
|
|
|
|
|
val isInUse = currentUsage > 0
|
|
|
|
|
if (isInUse) {
|
|
|
|
|
logger.debug { "Aeron driver location usage is: $currentUsage" }
|
|
|
|
|
logger.debug { "Aeron Driver [$driverId]: usage is: $currentUsage" }
|
|
|
|
|
}
|
|
|
|
|
return isInUse
|
|
|
|
|
}
|
|
|
|
@ -541,24 +540,24 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
|
|
|
|
|
suspend fun close(endPoint: EndPoint<*>?, logger: KLogger) = stateMutex.withLock {
|
|
|
|
|
val driverId = config.id
|
|
|
|
|
|
|
|
|
|
logger.trace { "Aeron driver [$driverId] requested close... (${endPointUsages.size} endpoints in use)" }
|
|
|
|
|
logger.trace { "Aeron Driver [$driverId]: Requested close... (${endPointUsages.size} endpoints in use)" }
|
|
|
|
|
|
|
|
|
|
if (endPoint != null) {
|
|
|
|
|
endPointUsages.remove(endPoint)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (isInUse(logger)) {
|
|
|
|
|
logger.debug { "Aeron driver [$driverId] still in use, not shutting down this instance." }
|
|
|
|
|
logger.debug { "Aeron Driver [$driverId]: in use, not shutting down this instance." }
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
val removed = AeronDriver.driverConfigurations.remove(driverId)
|
|
|
|
|
if (removed == null) {
|
|
|
|
|
logger.debug { "Aeron driver [$driverId] was already closed. Ignoring close request." }
|
|
|
|
|
logger.debug { "Aeron Driver [$driverId]: already closed. Ignoring close request." }
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
logger.debug { "Aeron driver [$driverId] closing..." }
|
|
|
|
|
logger.debug { "Aeron Driver [$driverId]: Closing..." }
|
|
|
|
|
|
|
|
|
|
// we have to assign context BEFORE we close, because the `getter` for context will create it if necessary
|
|
|
|
|
val aeronContext = context
|
|
|
|
@ -567,22 +566,22 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
|
|
|
|
|
try {
|
|
|
|
|
aeron?.close()
|
|
|
|
|
} catch (e: Exception) {
|
|
|
|
|
logger.error(e) { "Error stopping Aeron driver [$driverId]." }
|
|
|
|
|
logger.error(e) { "Aeron Driver [$driverId]: Error stopping!" }
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
aeron = null
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (mediaDriver == null) {
|
|
|
|
|
logger.debug { "No driver started for Aeron driver [$driverId]. Not Stopping." }
|
|
|
|
|
logger.debug { "Aeron Driver [$driverId]: No driver started, not Stopping." }
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
logger.debug { "Stopping driver [$driverId] at '${driverDirectory}'..." }
|
|
|
|
|
logger.debug { "Aeron Driver [$driverId]: Stopping driver at '${driverDirectory}'..." }
|
|
|
|
|
|
|
|
|
|
if (!isRunning()) {
|
|
|
|
|
// not running
|
|
|
|
|
logger.debug { "Driver [$driverId] is not running at '${driverDirectory}' for this context. Not Stopping." }
|
|
|
|
|
logger.debug { "Aeron Driver [$driverId]: is not running at '${driverDirectory}' for this context. Not Stopping." }
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -590,7 +589,7 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
|
|
|
|
|
try {
|
|
|
|
|
mediaDriver!!.close()
|
|
|
|
|
} catch (e: Exception) {
|
|
|
|
|
logger.error(e) { "Error closing the Aeron media driver [$driverId]" }
|
|
|
|
|
logger.error(e) { "Aeron Driver [$driverId]: Error closing" }
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
mediaDriver = null
|
|
|
|
@ -610,7 +609,7 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
|
|
|
|
|
// wait for the media driver to actually stop
|
|
|
|
|
var count = 10
|
|
|
|
|
while (--count >= 0 && isRunning()) {
|
|
|
|
|
logger.warn { "Aeron Media driver [$driverId] at '${driverDirectory}' is still running. Waiting for it to stop. Trying to close $count more times." }
|
|
|
|
|
logger.warn { "Aeron Driver [$driverId]: still running at '${driverDirectory}'. Waiting for it to stop. Trying to close $count more times." }
|
|
|
|
|
delay(timeout)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -623,15 +622,15 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, private val config: C
|
|
|
|
|
try {
|
|
|
|
|
val deletedAeron = driverDirectory.deleteRecursively()
|
|
|
|
|
if (!deletedAeron) {
|
|
|
|
|
logger.error { "Error deleting aeron directory $driverDirectory on shutdown "}
|
|
|
|
|
logger.error { "Aeron Driver [$driverId]: Error deleting aeron directory $driverDirectory on shutdown "}
|
|
|
|
|
}
|
|
|
|
|
} catch (e: Exception) {
|
|
|
|
|
logger.error(e) { "Error deleting Aeron directory at: $driverDirectory"}
|
|
|
|
|
logger.error(e) { "Aeron Driver [$driverId]: Error deleting Aeron directory at: $driverDirectory"}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (driverDirectory.isDirectory) {
|
|
|
|
|
logger.error { "Error deleting Aeron directory at: $driverDirectory"}
|
|
|
|
|
logger.error { "Aeron Driver [$driverId]: Error deleting Aeron directory at: $driverDirectory"}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
logger.debug { "Closed the media driver [$driverId] at '${driverDirectory}'" }
|
|
|
|
|