Pulled out the aeron context into its own class
This commit is contained in:
parent
0b09073a4c
commit
21a6eceebd
192
src/dorkbox/network/aeron/AeronContext.kt
Normal file
192
src/dorkbox/network/aeron/AeronContext.kt
Normal file
@ -0,0 +1,192 @@
|
||||
package dorkbox.network.aeron
|
||||
|
||||
import dorkbox.network.Configuration
|
||||
import dorkbox.network.exceptions.AeronDriverException
|
||||
import dorkbox.util.NamedThreadFactory
|
||||
import io.aeron.driver.MediaDriver
|
||||
import io.aeron.exceptions.DriverTimeoutException
|
||||
import mu.KLogger
|
||||
import java.io.File
|
||||
import java.util.concurrent.locks.*
|
||||
|
||||
/**
 * Owns the Aeron [MediaDriver.Context] for one endpoint, together with the thread factory that is handed to the
 * media driver for its conductor/receiver/sender/shared threads.
 *
 * Constructing an instance builds the media driver context from [config], concludes the aeron directory and probes
 * whether a media driver is already active in that directory. If the probe times out, the aeron directory is deleted
 * and the probe is repeated once. If `config.aeronDirectoryForceUnique` is set and a driver is already active, sibling
 * directories named `<dir>_<n>` are probed until one without an active driver is found.
 *
 * @param config the network configuration used to build the media driver context
 * @param type the endpoint type; its simple name is used for the name of the thread group
 * @param logger receives the informational message naming the aeron directory that was selected
 * @param aeronErrorHandler receives every media driver error, wrapped in an [AeronDriverException]
 *
 * @throws IllegalArgumentException if a unique aeron directory was requested but every candidate directory is in use
 * @throws DriverTimeoutException if the initial probe timed out and the aeron directory could not be deleted
 */
class AeronContext(
    val config: Configuration,
    val type: Class<*> = AeronDriver::class.java,
    val logger: KLogger,
    aeronErrorHandler: (error: Throwable) -> Unit
) {
    // the context is validated before the AeronDriver object is created
    internal val threadFactory = NamedThreadFactory("Thread", ThreadGroup("${type.simpleName}-AeronDriver"), true)

    /**
     * The media driver context selected during construction. Its aeron directory has already been concluded.
     */
    val context: MediaDriver.Context

    /**
     * @return the configured driver timeout
     */
    val driverTimeout: Long
        get() = context.driverTimeoutMs()

    /**
     * This is only valid **AFTER** `concludeAeronDirectory()` has been called on the [context], which the initializer
     * of this class does before [context] is assigned.
     *
     * @return the aeron context directory
     */
    val driverDirectory: File
        get() = context.aeronDirectory()

    // Creates the Aeron Media Driver context and decides which aeron directory it will use.
    init {
        var driverContext = create(config, threadFactory, aeronErrorHandler)

        // this happens EXACTLY once. Must be BEFORE the "isRunning" check!
        driverContext.concludeAeronDirectory()

        // will setup the aeron directory or throw IllegalArgumentException if it cannot be configured
        val aeronDir = driverContext.aeronDirectory()
        val timeoutMs = driverContext.driverTimeoutMs()

        // sometimes when starting up, a PREVIOUS run was corrupted (during startup, for example).
        // we ONLY do this during the initial startup check because it will delete the directory, and we don't
        // always want to do this.
        var isRunning = try {
            driverContext.isDriverActive(timeoutMs) { }
        } catch (e: DriverTimeoutException) {
            // we have to delete the directory, since it was corrupted, and we try again.
            if (!aeronDir.deleteRecursively()) {
                // unable to delete the directory
                throw e
            }

            driverContext.isDriverActive(timeoutMs) { }
        }

        // this is incompatible with IPC, and will not be set if IPC is enabled
        if (config.aeronDirectoryForceUnique && isRunning) {
            val savedParent = aeronDir.parentFile
            val retryMax = 100

            // Visit every candidate suffix at most once, in random order. Picking a fresh random number on each
            // attempt could probe the same (in use) directory several times and then give up while unused
            // candidates still existed.
            for (suffix in (1..retryMax).shuffled()) {
                val newDir = savedParent.resolve("${aeronDir.name}_$suffix")

                driverContext = create(config, threadFactory, aeronErrorHandler)
                driverContext.aeronDirectoryName(newDir.path)

                // this happens EXACTLY once. Must be BEFORE the "isRunning" check!
                driverContext.concludeAeronDirectory()

                isRunning = driverContext.isDriverActive(timeoutMs) { }
                if (!isRunning) {
                    break
                }
            }

            if (isRunning) {
                throw IllegalArgumentException("Unable to force unique aeron Directory. Tried $retryMax times and all tries were in use.")
            }

            // NOTE: We must be *super* careful trying to delete directories, because if we have multiple AERON/MEDIA DRIVERS connected to the
            // same directory, deleting the directory will cause any other aeron connection to fail! (which makes sense).
            // since we are forcing a unique directory, we should ALSO delete it when we are done!
            driverContext.dirDeleteOnShutdown()
        }

        logger.info { "Aeron directory: '${driverContext.aeronDirectory()}'" }

        context = driverContext
    }

    /**
     * Checks to see if an endpoint (using the specified configuration) is running.
     *
     * @return true if the media driver is active and running
     */
    fun isRunning(): Boolean {
        // if the media driver is running, it will be a quick connection. Usually 100ms or so
        return context.isDriverActive(context.driverTimeoutMs()) { }
    }

    /**
     * Closes the media driver context and destroys the thread group used by [threadFactory].
     *
     * The thread group is only destroyed if it has not been destroyed already, so that calling this method a second
     * time does not fail on the thread group.
     */
    fun close() {
        context.close()

        // Destroys this thread group and all of its subgroups.
        // This thread group must be empty, indicating that all threads that had been in this thread group have since stopped.
        val group = threadFactory.group
        if (!group.isDestroyed) {
            group.destroy()
        }
    }

    override fun toString(): String {
        return context.toString()
    }

    companion object {
        /**
         * Builds a new, not yet concluded, media driver context from [config].
         *
         * @param config supplies the threading mode, MTU, window/buffer sizes and the aeron directory
         * @param threadFactory used for every thread factory of the media driver
         * @param aeronErrorHandler receives each media driver error wrapped in an [AeronDriverException]
         *
         * @return the configured media driver context
         */
        private fun create(
            config: Configuration,
            threadFactory: NamedThreadFactory,
            aeronErrorHandler: (error: Throwable) -> Unit
        ): MediaDriver.Context {
            // LOW-LATENCY SETTINGS
            // .termBufferSparseFile(false)
            // .useWindowsHighResTimer(true)
            // .threadingMode(ThreadingMode.DEDICATED)
            // .conductorIdleStrategy(BusySpinIdleStrategy.INSTANCE)
            // .receiverIdleStrategy(NoOpIdleStrategy.INSTANCE)
            // .senderIdleStrategy(NoOpIdleStrategy.INSTANCE);
            // setProperty(DISABLE_BOUNDS_CHECKS_PROP_NAME, "true");
            // setProperty("aeron.mtu.length", "16384");
            // setProperty("aeron.socket.so_sndbuf", "2097152");
            // setProperty("aeron.socket.so_rcvbuf", "2097152");
            // setProperty("aeron.rcv.initial.window.length", "2097152");

            // driver context must happen in the initializer, because we have a Server.isRunning() method that uses the mediaDriverContext (without bind)
            val mediaDriverContext = MediaDriver.Context()
                .publicationReservedSessionIdLow(AeronDriver.RESERVED_SESSION_ID_LOW)
                .publicationReservedSessionIdHigh(AeronDriver.RESERVED_SESSION_ID_HIGH)
                .threadingMode(config.threadingMode)
                .mtuLength(config.networkMtuSize)

                .initialWindowLength(config.initialWindowLength)
                .socketSndbufLength(config.sendBufferSize)
                .socketRcvbufLength(config.receiveBufferSize)

            mediaDriverContext
                .conductorThreadFactory(threadFactory)
                .receiverThreadFactory(threadFactory)
                .senderThreadFactory(threadFactory)
                .sharedNetworkThreadFactory(threadFactory)
                .sharedThreadFactory(threadFactory)

            mediaDriverContext.aeronDirectoryName(config.aeronDirectory!!.absolutePath)

            // NOTE(review): the two conditions below look inverted relative to their comments. The comments describe
            // shrinking the DEFAULT sizes, but the branches only run when the context value differs from the configured
            // value. On a freshly created context the two are presumably equal, so these branches likely never
            // execute -- confirm the intent before changing, since it alters the buffer sizes used in production.
            if (mediaDriverContext.ipcTermBufferLength() != io.aeron.driver.Configuration.ipcTermBufferLength()) {
                // default 64 megs each is HUGE
                mediaDriverContext.ipcTermBufferLength(8 * 1024 * 1024)
            }

            if (mediaDriverContext.publicationTermBufferLength() != io.aeron.driver.Configuration.termBufferLength()) {
                // default 16 megs each is HUGE (we run out of space in production w/ lots of clients)
                mediaDriverContext.publicationTermBufferLength(2 * 1024 * 1024)
            }

            // we DO NOT want to abort the JVM if there are errors.
            // this replaces the default handler with one that doesn't abort the JVM
            mediaDriverContext.errorHandler { error ->
                aeronErrorHandler(AeronDriverException(error))
            }

            return mediaDriverContext
        }
    }
}
|
@ -3,14 +3,13 @@ package dorkbox.network.aeron
|
||||
import dorkbox.network.Configuration
|
||||
import dorkbox.network.connection.ListenerManager
|
||||
import dorkbox.network.exceptions.AeronDriverException
|
||||
import dorkbox.util.NamedThreadFactory
|
||||
import dorkbox.network.exceptions.ClientRetryException
|
||||
import io.aeron.Aeron
|
||||
import io.aeron.ChannelUriStringBuilder
|
||||
import io.aeron.CncFileDescriptor
|
||||
import io.aeron.Publication
|
||||
import io.aeron.Subscription
|
||||
import io.aeron.driver.MediaDriver
|
||||
import io.aeron.exceptions.DriverTimeoutException
|
||||
import io.aeron.samples.SamplesUtil
|
||||
import mu.KLogger
|
||||
import mu.KotlinLogging
|
||||
@ -22,8 +21,8 @@ import org.agrona.concurrent.status.CountersReader
|
||||
import org.slf4j.LoggerFactory
|
||||
import java.io.File
|
||||
import java.lang.Thread.sleep
|
||||
import java.net.BindException
|
||||
import java.util.concurrent.locks.*
|
||||
import kotlin.concurrent.read
|
||||
import kotlin.concurrent.write
|
||||
|
||||
/**
|
||||
@ -135,64 +134,6 @@ class AeronDriver(
|
||||
}
|
||||
}
|
||||
|
||||
private fun create(
|
||||
config: Configuration,
|
||||
threadFactory: NamedThreadFactory,
|
||||
aeronErrorHandler: (error: Throwable) -> Unit
|
||||
|
||||
): MediaDriver.Context {
|
||||
// LOW-LATENCY SETTINGS
|
||||
// .termBufferSparseFile(false)
|
||||
// .useWindowsHighResTimer(true)
|
||||
// .threadingMode(ThreadingMode.DEDICATED)
|
||||
// .conductorIdleStrategy(BusySpinIdleStrategy.INSTANCE)
|
||||
// .receiverIdleStrategy(NoOpIdleStrategy.INSTANCE)
|
||||
// .senderIdleStrategy(NoOpIdleStrategy.INSTANCE);
|
||||
// setProperty(DISABLE_BOUNDS_CHECKS_PROP_NAME, "true");
|
||||
// setProperty("aeron.mtu.length", "16384");
|
||||
// setProperty("aeron.socket.so_sndbuf", "2097152");
|
||||
// setProperty("aeron.socket.so_rcvbuf", "2097152");
|
||||
// setProperty("aeron.rcv.initial.window.length", "2097152");
|
||||
|
||||
// driver context must happen in the initializer, because we have a Server.isRunning() method that uses the mediaDriverContext (without bind)
|
||||
val mediaDriverContext = MediaDriver.Context()
|
||||
.publicationReservedSessionIdLow(RESERVED_SESSION_ID_LOW)
|
||||
.publicationReservedSessionIdHigh(RESERVED_SESSION_ID_HIGH)
|
||||
.threadingMode(config.threadingMode)
|
||||
.mtuLength(config.networkMtuSize)
|
||||
|
||||
.initialWindowLength(config.initialWindowLength)
|
||||
.socketSndbufLength(config.sendBufferSize)
|
||||
.socketRcvbufLength(config.receiveBufferSize)
|
||||
|
||||
mediaDriverContext
|
||||
.conductorThreadFactory(threadFactory)
|
||||
.receiverThreadFactory(threadFactory)
|
||||
.senderThreadFactory(threadFactory)
|
||||
.sharedNetworkThreadFactory(threadFactory)
|
||||
.sharedThreadFactory(threadFactory)
|
||||
|
||||
mediaDriverContext.aeronDirectoryName(config.aeronDirectory!!.absolutePath)
|
||||
|
||||
if (mediaDriverContext.ipcTermBufferLength() != io.aeron.driver.Configuration.ipcTermBufferLength()) {
|
||||
// default 64 megs each is HUGE
|
||||
mediaDriverContext.ipcTermBufferLength(8 * 1024 * 1024)
|
||||
}
|
||||
|
||||
if (mediaDriverContext.publicationTermBufferLength() != io.aeron.driver.Configuration.termBufferLength()) {
|
||||
// default 16 megs each is HUGE (we run out of space in production w/ lots of clients)
|
||||
mediaDriverContext.publicationTermBufferLength(2 * 1024 * 1024)
|
||||
}
|
||||
|
||||
// we DO NOT want to abort the JVM if there are errors.
|
||||
// this replaces the default handler with one that doesn't abort the JVM
|
||||
mediaDriverContext.errorHandler { error ->
|
||||
aeronErrorHandler(AeronDriverException(error))
|
||||
}
|
||||
|
||||
return mediaDriverContext
|
||||
}
|
||||
|
||||
private fun aeronCounters(aeronLocation: File): CountersReader {
|
||||
val cncByteBuffer = SamplesUtil.mapExistingFileReadOnly(aeronLocation.resolve("cnc.dat"))
|
||||
val cncMetaDataBuffer: DirectBuffer = CncFileDescriptor.createMetaDataBuffer(cncByteBuffer)
|
||||
@ -252,36 +193,12 @@ class AeronDriver(
|
||||
|
||||
|
||||
|
||||
@Volatile
|
||||
private var closeRequested = false
|
||||
|
||||
private var aeron: Aeron? = null
|
||||
|
||||
private var mediaDriver: MediaDriver? = null
|
||||
|
||||
private var context: MediaDriver.Context? = null
|
||||
|
||||
// the context is validated before the AeronDriver object is created
|
||||
private var threadFactory = NamedThreadFactory("Thread", ThreadGroup("${type.simpleName}-AeronDriver"), true)
|
||||
|
||||
// did WE start the media driver, or did SOMEONE ELSE start it?
|
||||
private val mediaDriverWasAlreadyRunning: Boolean
|
||||
|
||||
/**
|
||||
* @return the configured driver timeout
|
||||
*/
|
||||
val driverTimeout: Long
|
||||
get() {
|
||||
return context!!.driverTimeoutMs()
|
||||
}
|
||||
|
||||
/**
|
||||
* This is only valid **AFTER** [context.concludeAeronDirectory()] has been called.
|
||||
*/
|
||||
val driverDirectory: File
|
||||
get() {
|
||||
return context!!.aeronDirectory()
|
||||
}
|
||||
private var mediaDriverWasAlreadyRunning = false
|
||||
|
||||
|
||||
/**
|
||||
@ -289,6 +206,18 @@ class AeronDriver(
|
||||
*/
|
||||
private val aeronErrorHandler: (error: Throwable) -> Unit
|
||||
|
||||
@Volatile
|
||||
private var context_: AeronContext? = null
|
||||
private val context: AeronContext
|
||||
get() {
|
||||
if (context_ == null) {
|
||||
context_ = AeronContext(config, type, logger, aeronErrorHandler)
|
||||
}
|
||||
|
||||
return context_!!
|
||||
}
|
||||
|
||||
|
||||
init {
|
||||
config.validate() // this happens more than once! (this is ok)
|
||||
setConfigDefaults(config, logger)
|
||||
@ -303,119 +232,27 @@ class AeronDriver(
|
||||
aeronErrorLogger(AeronDriverException(error))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
context = createContext()
|
||||
mediaDriverWasAlreadyRunning = isRunning()
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates the Aeron Media Driver context
|
||||
*
|
||||
* @throws IllegalStateException if the configuration has already been used to create a context
|
||||
* @throws IllegalArgumentException if the aeron media driver directory cannot be setup
|
||||
*/
|
||||
private fun createContext(): MediaDriver.Context {
|
||||
// Note: A mutex doesn't work so well
|
||||
return lock.write {
|
||||
if (threadFactory.group.isDestroyed) {
|
||||
// on close, we destroy the threadfactory -- and on driver restart, we might want it back
|
||||
threadFactory = NamedThreadFactory("Thread", ThreadGroup("${type.simpleName}-AeronDriver"), true)
|
||||
}
|
||||
|
||||
var context = create(config, threadFactory, aeronErrorHandler)
|
||||
|
||||
// this happens EXACTLY once. Must be BEFORE the "isRunning" check!
|
||||
context.concludeAeronDirectory()
|
||||
|
||||
// will setup the aeron directory or throw IllegalArgumentException if it cannot be configured
|
||||
val aeronDir = context.aeronDirectory()
|
||||
|
||||
val driverTimeout = context.driverTimeoutMs()
|
||||
|
||||
// sometimes when starting up, if a PREVIOUS run was corrupted (during startup, for example)
|
||||
// we ONLY do this during the initial startup check because it will delete the directory, and we don't
|
||||
// always want to do this.
|
||||
var isRunning = try {
|
||||
context.isDriverActive(driverTimeout) { }
|
||||
} catch (e: DriverTimeoutException) {
|
||||
// we have to delete the directory, since it was corrupted, and we try again.
|
||||
if (aeronDir.deleteRecursively()) {
|
||||
context.isDriverActive(driverTimeout) { }
|
||||
} else {
|
||||
// unable to delete the directory
|
||||
throw e
|
||||
}
|
||||
}
|
||||
|
||||
// this is incompatible with IPC, and will not be set if IPC is enabled
|
||||
if (config.aeronDirectoryForceUnique && isRunning) {
|
||||
val savedParent = aeronDir.parentFile
|
||||
var retry = 0
|
||||
val retryMax = 100
|
||||
|
||||
while (config.aeronDirectoryForceUnique && isRunning) {
|
||||
if (retry++ > retryMax) {
|
||||
throw IllegalArgumentException("Unable to force unique aeron Directory. Tried $retryMax times and all tries were in use.")
|
||||
}
|
||||
|
||||
val randomNum = (1..retryMax).shuffled().first()
|
||||
val newDir = savedParent.resolve("${aeronDir.name}_$randomNum")
|
||||
|
||||
context = create(config, threadFactory, aeronErrorHandler)
|
||||
context.aeronDirectoryName(newDir.path)
|
||||
|
||||
// this happens EXACTLY once. Must be BEFORE the "isRunning" check!
|
||||
context.concludeAeronDirectory()
|
||||
|
||||
isRunning = context.isDriverActive(driverTimeout) { }
|
||||
}
|
||||
|
||||
if (!isRunning) {
|
||||
// NOTE: We must be *super* careful trying to delete directories, because if we have multiple AERON/MEDIA DRIVERS connected to the
|
||||
// same directory, deleting the directory will cause any other aeron connection to fail! (which makes sense).
|
||||
// since we are forcing a unique directory, we should ALSO delete it when we are done!
|
||||
context.dirDeleteOnShutdown()
|
||||
}
|
||||
}
|
||||
|
||||
logger.info { "Aeron directory: '${context.aeronDirectory()}'" }
|
||||
|
||||
context
|
||||
private fun isLoaded(): Boolean {
|
||||
return lock.read {
|
||||
val mediaDriverLoaded = mediaDriverWasAlreadyRunning || mediaDriver != null
|
||||
mediaDriverLoaded && aeron != null && aeron?.isClosed == false
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* If the driver is not already running, this will start the driver. This will ALSO connect to the aeron client
|
||||
*
|
||||
* @throws Exception if there is a problem starting the media driver
|
||||
* @return true if we are successfully connected to the aeron client
|
||||
*/
|
||||
fun start() {
|
||||
if (closeRequested) {
|
||||
logger.debug("Resetting media driver context")
|
||||
|
||||
// close was requested previously. we have to reset a few things
|
||||
context = createContext()
|
||||
|
||||
// we want to be able to "restart" aeron, as necessary, after a [close] method call
|
||||
closeRequested = false
|
||||
fun start(): Boolean {
|
||||
if (isLoaded()) {
|
||||
return true
|
||||
}
|
||||
|
||||
maybeRestart()
|
||||
}
|
||||
|
||||
/**
|
||||
* if we did NOT close, then will restart the media driver + aeron. If we manually closed aeron, then we won't try to restart
|
||||
*/
|
||||
private fun maybeRestart() {
|
||||
if (closeRequested) {
|
||||
return
|
||||
}
|
||||
|
||||
var loadedDriver = false
|
||||
|
||||
lock.write {
|
||||
if (mediaDriver == null) {
|
||||
if (!mediaDriverWasAlreadyRunning && mediaDriver == null) {
|
||||
// only start if we didn't already start... There will be several checks.
|
||||
|
||||
var running = isRunning()
|
||||
@ -423,59 +260,51 @@ class AeronDriver(
|
||||
// wait for a bit, because we are running, but we ALSO issued a START, and expect it to start.
|
||||
// SOMETIMES aeron is in the middle of shutting down, and this prevents us from trying to connect to
|
||||
// that instance
|
||||
logger.debug("Aeron Media driver already running. Double checking status...")
|
||||
sleep(driverTimeout/2)
|
||||
logger.debug { "Aeron Media driver already running. Double checking status..." }
|
||||
sleep(context.driverTimeout/2)
|
||||
running = isRunning()
|
||||
}
|
||||
|
||||
if (!running) {
|
||||
logger.debug("Starting Aeron Media driver.")
|
||||
logger.debug { "Starting Aeron Media driver." }
|
||||
|
||||
// try to start. If we start/stop too quickly, it's a problem
|
||||
var count = 10
|
||||
while (count-- > 0) {
|
||||
try {
|
||||
mediaDriver = MediaDriver.launch(context)
|
||||
loadedDriver = true
|
||||
mediaDriver = MediaDriver.launch(context.context)
|
||||
logger.debug { "Started the Aeron Media driver." }
|
||||
break
|
||||
} catch (e: Exception) {
|
||||
logger.warn(e) { "Unable to start the Aeron Media driver at ${driverDirectory}. Retrying $count more times..." }
|
||||
sleep(driverTimeout)
|
||||
logger.warn(e) { "Unable to start the Aeron Media driver at ${context.driverDirectory}. Retrying $count more times..." }
|
||||
sleep(context.driverTimeout)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
logger.debug("Not starting Aeron Media driver. It was already running.")
|
||||
mediaDriverWasAlreadyRunning = true
|
||||
logger.debug { "Not starting Aeron Media driver. It was already running." }
|
||||
}
|
||||
|
||||
// if we were unable to load the aeron driver, don't continue.
|
||||
if (!running && mediaDriver == null) {
|
||||
logger.error { "Not running and unable to start the Aeron Media driver at ${context.driverDirectory}." }
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (loadedDriver || aeron == null) {
|
||||
if (closeRequested) {
|
||||
return
|
||||
}
|
||||
// the media driver MIGHT already be started in a different process!
|
||||
//
|
||||
// We still ALWAYS want to connect to aeron (which connects to the other media driver process), especially if we
|
||||
// haven't already connected to it (or if there was an error connecting because a different media driver was shutting down)
|
||||
|
||||
// the media driver MIGHT already be started in a different process! We still ALWAYS want to connect to
|
||||
// aeron (which connects to the other media driver process), especially if we haven't already connected to
|
||||
// it (or if there was an error connecting because a different media driver was shutting down)
|
||||
|
||||
aeron?.close()
|
||||
|
||||
// this might succeed if we can connect to the media driver
|
||||
aeron = Aeron.connect(setupAeron())
|
||||
logger.debug("Connected to Aeron driver.")
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
private fun setupAeron(): Aeron.Context {
|
||||
val aeronDriverContext = Aeron.Context()
|
||||
aeronDriverContext
|
||||
.aeronDirectoryName(driverDirectory.path)
|
||||
.aeronDirectoryName(context.driverDirectory.path)
|
||||
.concludeAeronDirectory()
|
||||
|
||||
aeronDriverContext
|
||||
.threadFactory(threadFactory)
|
||||
.threadFactory(context.threadFactory)
|
||||
.idleStrategy(BackoffIdleStrategy())
|
||||
|
||||
// we DO NOT want to abort the JVM if there are errors.
|
||||
@ -487,18 +316,15 @@ class AeronDriver(
|
||||
aeronErrorHandler(error)
|
||||
}
|
||||
|
||||
return aeronDriverContext
|
||||
|
||||
|
||||
// this might succeed if we can connect to the media driver
|
||||
aeron = Aeron.connect(aeronDriverContext)
|
||||
logger.debug { "Connected to Aeron driver." }
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the aeron media driver log file for a specific publication. This should be removed when a publication is closed (but is not always!)
|
||||
*/
|
||||
@Synchronized
|
||||
fun getMediaDriverPublicationFile(publicationRegId: Long): File {
|
||||
return driverDirectory.resolve("publications").resolve("${publicationRegId}.logbuffer")
|
||||
}
|
||||
|
||||
@Synchronized
|
||||
fun addPublication(publicationUri: ChannelUriStringBuilder, streamId: Int): Publication {
|
||||
val uri = publicationUri.build()
|
||||
|
||||
@ -534,7 +360,6 @@ class AeronDriver(
|
||||
}
|
||||
}
|
||||
|
||||
@Synchronized
|
||||
fun addSubscription(subscriptionUri: ChannelUriStringBuilder, streamId: Int): Subscription {
|
||||
val uri = subscriptionUri.build()
|
||||
|
||||
@ -577,10 +402,9 @@ class AeronDriver(
|
||||
*
|
||||
* @return true if the media driver is active and running
|
||||
*/
|
||||
@Synchronized
|
||||
fun isRunning(timeout: Long = context!!.driverTimeoutMs()): Boolean {
|
||||
fun isRunning(): Boolean {
|
||||
// if the media driver is running, it will be a quick connection. Usually 100ms or so
|
||||
return context!!.isDriverActive(timeout) { }
|
||||
return context.isRunning()
|
||||
}
|
||||
|
||||
/**
|
||||
@ -589,106 +413,119 @@ class AeronDriver(
|
||||
* NOTE: We must be *super* careful trying to delete directories, because if we have multiple AERON/MEDIA DRIVERS connected to the
|
||||
* same directory, deleting the directory will cause any other aeron connection to fail! (which makes sense).
|
||||
*/
|
||||
@Synchronized
|
||||
fun close() {
|
||||
closeRequested = true
|
||||
|
||||
lock.write {
|
||||
try {
|
||||
aeron?.close()
|
||||
} catch (e: Exception) {
|
||||
logger.error("Error stopping aeron.", e)
|
||||
logger.error(e) { "Error stopping aeron." }
|
||||
}
|
||||
|
||||
aeron = null
|
||||
|
||||
val mediaDriver = mediaDriver
|
||||
this@AeronDriver.mediaDriver = null
|
||||
|
||||
if (mediaDriver == null) {
|
||||
logger.debug("No driver started for this instance. Not Stopping.")
|
||||
logger.debug { "No driver started for this instance. Not Stopping." }
|
||||
return
|
||||
}
|
||||
|
||||
if (mediaDriverWasAlreadyRunning) {
|
||||
logger.debug("We did not start the media driver, so we are not stopping it.")
|
||||
logger.debug { "We did not start the media driver, so we are not stopping it." }
|
||||
return
|
||||
}
|
||||
|
||||
logger.debug("Stopping driver at '${driverDirectory}'...")
|
||||
|
||||
logger.debug { "Stopping driver at '${context.driverDirectory}'..." }
|
||||
|
||||
if (!isRunning()) {
|
||||
// not running
|
||||
logger.debug { "Driver is not running at '${driverDirectory}' for this context. Not Stopping." }
|
||||
logger.debug { "Driver is not running at '${context.driverDirectory}' for this context. Not Stopping." }
|
||||
return
|
||||
}
|
||||
|
||||
// if we are the ones that started the media driver, then we must be the ones to close it
|
||||
try {
|
||||
mediaDriver.close()
|
||||
mediaDriver!!.close()
|
||||
} catch (e: Exception) {
|
||||
logger.error(e) { "Error closing the Aeron media driver" }
|
||||
}
|
||||
|
||||
// make sure the context is also closed.
|
||||
context!!.close()
|
||||
mediaDriver = null
|
||||
|
||||
// it can actually close faster, if everything is ideal.
|
||||
if (isRunning()) {
|
||||
// on close, we want to wait for the driver to timeout before considering it "closed". Connections can still LINGER (see below)
|
||||
// on close, the publication CAN linger (in case a client goes away, and then comes back)
|
||||
// AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param)
|
||||
sleep(driverTimeout + AERON_PUBLICATION_LINGER_TIMEOUT)
|
||||
sleep(context.driverTimeout + AERON_PUBLICATION_LINGER_TIMEOUT)
|
||||
}
|
||||
|
||||
// wait for the media driver to actually stop
|
||||
var count = 10
|
||||
while (--count >= 0 && isRunning()) {
|
||||
logger.warn { "Aeron Media driver at '${driverDirectory}' is still running. Waiting for it to stop. Trying to close $count more times." }
|
||||
sleep(driverTimeout)
|
||||
logger.warn { "Aeron Media driver at '${context.driverDirectory}' is still running. Waiting for it to stop. Trying to close $count more times." }
|
||||
sleep(context.driverTimeout)
|
||||
}
|
||||
logger.debug { "Closed the media driver at '${driverDirectory}'" }
|
||||
logger.debug { "Closed the media driver at '${context.driverDirectory}'" }
|
||||
|
||||
val deletedAeron = driverDirectory.deleteRecursively()
|
||||
try {
|
||||
} catch (e: Exception) {
|
||||
logger.error(e) {"Error closing the media driver at '${context.driverDirectory}'" }
|
||||
}
|
||||
|
||||
// make sure the context is also closed.
|
||||
context.close()
|
||||
context_ = null
|
||||
|
||||
try {
|
||||
val deletedAeron = context.driverDirectory.deleteRecursively()
|
||||
if (!deletedAeron) {
|
||||
logger.error { "Error deleting aeron directory $driverDirectory on shutdown "}
|
||||
logger.error { "Error deleting aeron directory ${context.driverDirectory} on shutdown "}
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
logger.error("Error closing the media driver at '${driverDirectory}'", e)
|
||||
logger.error(e) { "Error deleting Aeron directory at: ${context.driverDirectory}"}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Destroys this thread group and all of its subgroups.
|
||||
// This thread group must be empty, indicating that all threads that had been in this thread group have since stopped.
|
||||
threadFactory.group.destroy()
|
||||
/**
|
||||
* @return the aeron media driver log file for a specific publication. This should be removed when a publication is closed (but is not always!)
|
||||
*/
|
||||
fun getMediaDriverPublicationFile(publicationRegId: Long): File {
|
||||
return context.driverDirectory.resolve("publications").resolve("${publicationRegId}.logbuffer")
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the internal counters of the Aeron driver in the current aeron directory
|
||||
*/
|
||||
fun driverCounters(counterFunction: (counterId: Int, counterValue: Long, typeId: Int, keyBuffer: DirectBuffer?, label: String?) -> Unit) {
|
||||
driverCounters(context!!.aeronDirectory(), counterFunction)
|
||||
driverCounters(context.driverDirectory, counterFunction)
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the backlog statistics for the Aeron driver
|
||||
*/
|
||||
fun driverBacklog(): BacklogStat {
|
||||
return driverBacklog(context!!.aeronDirectory())
|
||||
return driverBacklog(context.driverDirectory)
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the internal heartbeat of the Aeron driver in the current aeron directory
|
||||
*/
|
||||
fun driverHeartbeatMs(): Long {
|
||||
return driverHeartbeatMs(context!!.aeronDirectory())
|
||||
return driverHeartbeatMs(context.driverDirectory)
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the internal version of the Aeron driver in the current aeron directory
|
||||
*/
|
||||
fun driverVersion(): String {
|
||||
return driverVersion(context!!.aeronDirectory())
|
||||
return driverVersion(context.driverDirectory)
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the current aeron context info, if any
|
||||
*/
|
||||
fun contextInfo(): String? {
|
||||
return context?.toString()
|
||||
fun contextInfo(): String {
|
||||
return context.toString()
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user