diff --git a/src/dorkbox/network/aeron/AeronContext.kt b/src/dorkbox/network/aeron/AeronContext.kt new file mode 100644 index 00000000..178299a8 --- /dev/null +++ b/src/dorkbox/network/aeron/AeronContext.kt @@ -0,0 +1,192 @@ +package dorkbox.network.aeron + +import dorkbox.network.Configuration +import dorkbox.network.exceptions.AeronDriverException +import dorkbox.util.NamedThreadFactory +import io.aeron.driver.MediaDriver +import io.aeron.exceptions.DriverTimeoutException +import mu.KLogger +import java.io.File +import java.util.concurrent.locks.* + +class AeronContext( + val config: Configuration, + val type: Class<*> = AeronDriver::class.java, + val logger: KLogger, + aeronErrorHandler: (error: Throwable) -> Unit +) { + fun close() { + context.close() + + // Destroys this thread group and all of its subgroups. + // This thread group must be empty, indicating that all threads that had been in this thread group have since stopped. + threadFactory.group.destroy() + } + + companion object { + private fun create( + config: Configuration, + threadFactory: NamedThreadFactory, + aeronErrorHandler: (error: Throwable) -> Unit + + ): MediaDriver.Context { + // LOW-LATENCY SETTINGS + // .termBufferSparseFile(false) + // .useWindowsHighResTimer(true) + // .threadingMode(ThreadingMode.DEDICATED) + // .conductorIdleStrategy(BusySpinIdleStrategy.INSTANCE) + // .receiverIdleStrategy(NoOpIdleStrategy.INSTANCE) + // .senderIdleStrategy(NoOpIdleStrategy.INSTANCE); + // setProperty(DISABLE_BOUNDS_CHECKS_PROP_NAME, "true"); + // setProperty("aeron.mtu.length", "16384"); + // setProperty("aeron.socket.so_sndbuf", "2097152"); + // setProperty("aeron.socket.so_rcvbuf", "2097152"); + // setProperty("aeron.rcv.initial.window.length", "2097152"); + + // driver context must happen in the initializer, because we have a Server.isRunning() method that uses the mediaDriverContext (without bind) + val mediaDriverContext = MediaDriver.Context() + .publicationReservedSessionIdLow(AeronDriver.RESERVED_SESSION_ID_LOW) + .publicationReservedSessionIdHigh(AeronDriver.RESERVED_SESSION_ID_HIGH) + .threadingMode(config.threadingMode) + .mtuLength(config.networkMtuSize) + + .initialWindowLength(config.initialWindowLength) + .socketSndbufLength(config.sendBufferSize) + .socketRcvbufLength(config.receiveBufferSize) + + mediaDriverContext + .conductorThreadFactory(threadFactory) + .receiverThreadFactory(threadFactory) + .senderThreadFactory(threadFactory) + .sharedNetworkThreadFactory(threadFactory) + .sharedThreadFactory(threadFactory) + + mediaDriverContext.aeronDirectoryName(config.aeronDirectory!!.absolutePath) + + if (mediaDriverContext.ipcTermBufferLength() != io.aeron.driver.Configuration.ipcTermBufferLength()) { + // default 64 megs each is HUGE + mediaDriverContext.ipcTermBufferLength(8 * 1024 * 1024) + } + + if (mediaDriverContext.publicationTermBufferLength() != io.aeron.driver.Configuration.termBufferLength()) { + // default 16 megs each is HUGE (we run out of space in production w/ lots of clients) + mediaDriverContext.publicationTermBufferLength(2 * 1024 * 1024) + } + + // we DO NOT want to abort the JVM if there are errors. + // this replaces the default handler with one that doesn't abort the JVM + mediaDriverContext.errorHandler { error -> + aeronErrorHandler(AeronDriverException(error)) + } + + return mediaDriverContext + } + } + + // the context is validated before the AeronDriver object is created + internal val threadFactory = NamedThreadFactory("Thread", ThreadGroup("${type.simpleName}-AeronDriver"), true) + + + val context: MediaDriver.Context + + /** + * @return the configured driver timeout + */ + val driverTimeout: Long + get() { + return context.driverTimeoutMs() + } + + /** + * This is only valid **AFTER** [context.concludeAeronDirectory()] has been called. + * + * @return the aeron context directory + */ + val driverDirectory: File + get() { + return context.aeronDirectory() + } + + /** + * Checks to see if an endpoint (using the specified configuration) is running. + * + * @return true if the media driver is active and running + */ + fun isRunning(): Boolean { + // if the media driver is running, it will be a quick connection. Usually 100ms or so + return context.isDriverActive(context.driverTimeoutMs()) { } + } + + + /** + * Creates the Aeron Media Driver context + * + * @throws IllegalStateException if the configuration has already been used to create a context + * @throws IllegalArgumentException if the aeron media driver directory cannot be setup + */ + init { + var context = create(config, threadFactory, aeronErrorHandler) + + // this happens EXACTLY once. Must be BEFORE the "isRunning" check! + context.concludeAeronDirectory() + + // will setup the aeron directory or throw IllegalArgumentException if it cannot be configured + val aeronDir = context.aeronDirectory() + + val driverTimeout = context.driverTimeoutMs() + + // sometimes when starting up, if a PREVIOUS run was corrupted (during startup, for example) + // we ONLY do this during the initial startup check because it will delete the directory, and we don't + // always want to do this. + var isRunning = try { + context.isDriverActive(driverTimeout) { } + } catch (e: DriverTimeoutException) { + // we have to delete the directory, since it was corrupted, and we try again. + if (aeronDir.deleteRecursively()) { + context.isDriverActive(driverTimeout) { } + } else { + // unable to delete the directory + throw e + } + } + + // this is incompatible with IPC, and will not be set if IPC is enabled + if (config.aeronDirectoryForceUnique && isRunning) { + val savedParent = aeronDir.parentFile + var retry = 0 + val retryMax = 100 + + while (config.aeronDirectoryForceUnique && isRunning) { + if (retry++ > retryMax) { + throw IllegalArgumentException("Unable to force unique aeron Directory. Tried $retryMax times and all tries were in use.") + } + + val randomNum = (1..retryMax).shuffled().first() + val newDir = savedParent.resolve("${aeronDir.name}_$randomNum") + + context = create(config, threadFactory, aeronErrorHandler) + context.aeronDirectoryName(newDir.path) + + // this happens EXACTLY once. Must be BEFORE the "isRunning" check! + context.concludeAeronDirectory() + + isRunning = context.isDriverActive(driverTimeout) { } + } + + if (!isRunning) { + // NOTE: We must be *super* careful trying to delete directories, because if we have multiple AERON/MEDIA DRIVERS connected to the + // same directory, deleting the directory will cause any other aeron connection to fail! (which makes sense). + // since we are forcing a unique directory, we should ALSO delete it when we are done! + context.dirDeleteOnShutdown() + } + } + + logger.info { "Aeron directory: '${context.aeronDirectory()}'" } + + this.context = context + } + + override fun toString(): String { + return context.toString() + } +} diff --git a/src/dorkbox/network/aeron/AeronDriver.kt b/src/dorkbox/network/aeron/AeronDriver.kt index f47f519f..e3390cfd 100644 --- a/src/dorkbox/network/aeron/AeronDriver.kt +++ b/src/dorkbox/network/aeron/AeronDriver.kt @@ -3,14 +3,13 @@ package dorkbox.network.aeron import dorkbox.network.Configuration import dorkbox.network.connection.ListenerManager import dorkbox.network.exceptions.AeronDriverException -import dorkbox.util.NamedThreadFactory +import dorkbox.network.exceptions.ClientRetryException import io.aeron.Aeron import io.aeron.ChannelUriStringBuilder import io.aeron.CncFileDescriptor import io.aeron.Publication import io.aeron.Subscription import io.aeron.driver.MediaDriver -import io.aeron.exceptions.DriverTimeoutException import io.aeron.samples.SamplesUtil import mu.KLogger import mu.KotlinLogging @@ -22,8 +21,8 @@ import org.agrona.concurrent.status.CountersReader import org.slf4j.LoggerFactory import java.io.File import java.lang.Thread.sleep -import java.net.BindException import java.util.concurrent.locks.* +import kotlin.concurrent.read import kotlin.concurrent.write /** @@ -135,64 +134,6 @@ class AeronDriver( } } - private fun create( - config: Configuration, - threadFactory: NamedThreadFactory, - aeronErrorHandler: (error: Throwable) -> Unit - - ): MediaDriver.Context { - // LOW-LATENCY SETTINGS - // .termBufferSparseFile(false) - // .useWindowsHighResTimer(true) - // .threadingMode(ThreadingMode.DEDICATED) - // .conductorIdleStrategy(BusySpinIdleStrategy.INSTANCE) - // .receiverIdleStrategy(NoOpIdleStrategy.INSTANCE) - // .senderIdleStrategy(NoOpIdleStrategy.INSTANCE); - // setProperty(DISABLE_BOUNDS_CHECKS_PROP_NAME, "true"); - // setProperty("aeron.mtu.length", "16384"); - // setProperty("aeron.socket.so_sndbuf", "2097152"); - // setProperty("aeron.socket.so_rcvbuf", "2097152"); - // setProperty("aeron.rcv.initial.window.length", "2097152"); - - // driver context must happen in the initializer, because we have a Server.isRunning() method that uses the mediaDriverContext (without bind) - val mediaDriverContext = MediaDriver.Context() - .publicationReservedSessionIdLow(RESERVED_SESSION_ID_LOW) - .publicationReservedSessionIdHigh(RESERVED_SESSION_ID_HIGH) - .threadingMode(config.threadingMode) - .mtuLength(config.networkMtuSize) - - .initialWindowLength(config.initialWindowLength) - .socketSndbufLength(config.sendBufferSize) - .socketRcvbufLength(config.receiveBufferSize) - - mediaDriverContext - .conductorThreadFactory(threadFactory) - .receiverThreadFactory(threadFactory) - .senderThreadFactory(threadFactory) - .sharedNetworkThreadFactory(threadFactory) - .sharedThreadFactory(threadFactory) - - mediaDriverContext.aeronDirectoryName(config.aeronDirectory!!.absolutePath) - - if (mediaDriverContext.ipcTermBufferLength() != io.aeron.driver.Configuration.ipcTermBufferLength()) { - // default 64 megs each is HUGE - mediaDriverContext.ipcTermBufferLength(8 * 1024 * 1024) - } - - if (mediaDriverContext.publicationTermBufferLength() != io.aeron.driver.Configuration.termBufferLength()) { - // default 16 megs each is HUGE (we run out of space in production w/ lots of clients) - mediaDriverContext.publicationTermBufferLength(2 * 1024 * 1024) - } - - // we DO NOT want to abort the JVM if there are errors. - // this replaces the default handler with one that doesn't abort the JVM - mediaDriverContext.errorHandler { error -> - aeronErrorHandler(AeronDriverException(error)) - } - - return mediaDriverContext - } - private fun aeronCounters(aeronLocation: File): CountersReader { val cncByteBuffer = SamplesUtil.mapExistingFileReadOnly(aeronLocation.resolve("cnc.dat")) val cncMetaDataBuffer: DirectBuffer = CncFileDescriptor.createMetaDataBuffer(cncByteBuffer) @@ -252,36 +193,12 @@ class AeronDriver( - @Volatile - private var closeRequested = false - private var aeron: Aeron? = null - private var mediaDriver: MediaDriver? = null - private var context: MediaDriver.Context? = null - - // the context is validated before the AeronDriver object is created - private var threadFactory = NamedThreadFactory("Thread", ThreadGroup("${type.simpleName}-AeronDriver"), true) // did WE start the media driver, or did SOMEONE ELSE start it? - private val mediaDriverWasAlreadyRunning: Boolean - - /** - * @return the configured driver timeout - */ - val driverTimeout: Long - get() { - return context!!.driverTimeoutMs() - } - - /** - * This is only valid **AFTER** [context.concludeAeronDirectory()] has been called. - */ - val driverDirectory: File - get() { - return context!!.aeronDirectory() - } + private var mediaDriverWasAlreadyRunning = false /** @@ -289,6 +206,18 @@ class AeronDriver( */ private val aeronErrorHandler: (error: Throwable) -> Unit + @Volatile + private var context_: AeronContext? = null + private val context: AeronContext + get() { + if (context_ == null) { + context_ = AeronContext(config, type, logger, aeronErrorHandler) + } + + return context_!! + } + + init { config.validate() // this happens more than once! (this is ok) setConfigDefaults(config, logger) @@ -303,119 +232,27 @@ class AeronDriver( aeronErrorLogger(AeronDriverException(error)) } } - - - context = createContext() - mediaDriverWasAlreadyRunning = isRunning() } - /** - * Creates the Aeron Media Driver context - * - * @throws IllegalStateException if the configuration has already been used to create a context - * @throws IllegalArgumentException if the aeron media driver directory cannot be setup - */ - private fun createContext(): MediaDriver.Context { - // Note: A mutex doesn't work so well - return lock.write { - if (threadFactory.group.isDestroyed) { - // on close, we destroy the threadfactory -- and on driver restart, we might want it back - threadFactory = NamedThreadFactory("Thread", ThreadGroup("${type.simpleName}-AeronDriver"), true) - } - - var context = create(config, threadFactory, aeronErrorHandler) - - // this happens EXACTLY once. Must be BEFORE the "isRunning" check! - context.concludeAeronDirectory() - - // will setup the aeron directory or throw IllegalArgumentException if it cannot be configured - val aeronDir = context.aeronDirectory() - - val driverTimeout = context.driverTimeoutMs() - - // sometimes when starting up, if a PREVIOUS run was corrupted (during startup, for example) - // we ONLY do this during the initial startup check because it will delete the directory, and we don't - // always want to do this. - var isRunning = try { - context.isDriverActive(driverTimeout) { } - } catch (e: DriverTimeoutException) { - // we have to delete the directory, since it was corrupted, and we try again. - if (aeronDir.deleteRecursively()) { - context.isDriverActive(driverTimeout) { } - } else { - // unable to delete the directory - throw e - } - } - - // this is incompatible with IPC, and will not be set if IPC is enabled - if (config.aeronDirectoryForceUnique && isRunning) { - val savedParent = aeronDir.parentFile - var retry = 0 - val retryMax = 100 - - while (config.aeronDirectoryForceUnique && isRunning) { - if (retry++ > retryMax) { - throw IllegalArgumentException("Unable to force unique aeron Directory. Tried $retryMax times and all tries were in use.") - } - - val randomNum = (1..retryMax).shuffled().first() - val newDir = savedParent.resolve("${aeronDir.name}_$randomNum") - - context = create(config, threadFactory, aeronErrorHandler) - context.aeronDirectoryName(newDir.path) - - // this happens EXACTLY once. Must be BEFORE the "isRunning" check! - context.concludeAeronDirectory() - - isRunning = context.isDriverActive(driverTimeout) { } - } - - if (!isRunning) { - // NOTE: We must be *super* careful trying to delete directories, because if we have multiple AERON/MEDIA DRIVERS connected to the - // same directory, deleting the directory will cause any other aeron connection to fail! (which makes sense). - // since we are forcing a unique directory, we should ALSO delete it when we are done! - context.dirDeleteOnShutdown() - } - } - - logger.info { "Aeron directory: '${context.aeronDirectory()}'" } - - context + private fun isLoaded(): Boolean { + return lock.read { + val mediaDriverLoaded = mediaDriverWasAlreadyRunning || mediaDriver != null + mediaDriverLoaded && aeron != null && aeron?.isClosed == false } } /** * If the driver is not already running, this will start the driver. This will ALSO connect to the aeron client * - * @throws Exception if there is a problem starting the media driver + * @return true if we are successfully connected to the aeron client */ - fun start() { - if (closeRequested) { - logger.debug("Resetting media driver context") - - // close was requested previously. we have to reset a few things - context = createContext() - - // we want to be able to "restart" aeron, as necessary, after a [close] method call - closeRequested = false + fun start(): Boolean { + if (isLoaded()) { + return true } - maybeRestart() - } - - /** - * if we did NOT close, then will restart the media driver + aeron. If we manually closed aeron, then we won't try to restart - */ - private fun maybeRestart() { - if (closeRequested) { - return - } - - var loadedDriver = false - lock.write { - if (mediaDriver == null) { + if (!mediaDriverWasAlreadyRunning && mediaDriver == null) { // only start if we didn't already start... There will be several checks. var running = isRunning() @@ -423,59 +260,51 @@ class AeronDriver( // wait for a bit, because we are running, but we ALSO issued a START, and expect it to start. // SOMETIMES aeron is in the middle of shutting down, and this prevents us from trying to connect to // that instance - logger.debug("Aeron Media driver already running. Double checking status...") - sleep(driverTimeout/2) + logger.debug { "Aeron Media driver already running. Double checking status..." } + sleep(context.driverTimeout/2) running = isRunning() } if (!running) { - logger.debug("Starting Aeron Media driver.") + logger.debug { "Starting Aeron Media driver." } // try to start. If we start/stop too quickly, it's a problem var count = 10 while (count-- > 0) { try { - mediaDriver = MediaDriver.launch(context) - loadedDriver = true + mediaDriver = MediaDriver.launch(context.context) + logger.debug { "Started the Aeron Media driver." } break } catch (e: Exception) { - logger.warn(e) { "Unable to start the Aeron Media driver at ${driverDirectory}. Retrying $count more times..." } - sleep(driverTimeout) + logger.warn(e) { "Unable to start the Aeron Media driver at ${context.driverDirectory}. Retrying $count more times..." } + sleep(context.driverTimeout) } } } else { - logger.debug("Not starting Aeron Media driver. It was already running.") + mediaDriverWasAlreadyRunning = true + logger.debug { "Not starting Aeron Media driver. It was already running." } + } + + // if we were unable to load the aeron driver, don't continue. + if (!running && mediaDriver == null) { + logger.error { "Not running and unable to start the Aeron Media driver at ${context.driverDirectory}." } + return false } } } - if (loadedDriver || aeron == null) { - if (closeRequested) { - return - } + // the media driver MIGHT already be started in a different process! + // + // We still ALWAYS want to connect to aeron (which connects to the other media driver process), especially if we + // haven't already connected to it (or if there was an error connecting because a different media driver was shutting down) - // the media driver MIGHT already be started in a different process! We still ALWAYS want to connect to - // aeron (which connects to the other media driver process), especially if we haven't already connected to - // it (or if there was an error connecting because a different media driver was shutting down) - - aeron?.close() - - // this might succeed if we can connect to the media driver - aeron = Aeron.connect(setupAeron()) - logger.debug("Connected to Aeron driver.") - } - - return - } - - private fun setupAeron(): Aeron.Context { val aeronDriverContext = Aeron.Context() aeronDriverContext - .aeronDirectoryName(driverDirectory.path) + .aeronDirectoryName(context.driverDirectory.path) .concludeAeronDirectory() aeronDriverContext - .threadFactory(threadFactory) + .threadFactory(context.threadFactory) .idleStrategy(BackoffIdleStrategy()) // we DO NOT want to abort the JVM if there are errors. @@ -487,18 +316,15 @@ class AeronDriver( aeronErrorHandler(error) } - return aeronDriverContext + + + // this might succeed if we can connect to the media driver + aeron = Aeron.connect(aeronDriverContext) + logger.debug { "Connected to Aeron driver." } + + return true } - /** - * @return the aeron media driver log file for a specific publication. This should be removed when a publication is closed (but is not always!) - */ - @Synchronized - fun getMediaDriverPublicationFile(publicationRegId: Long): File { - return driverDirectory.resolve("publications").resolve("${publicationRegId}.logbuffer") - } - - @Synchronized fun addPublication(publicationUri: ChannelUriStringBuilder, streamId: Int): Publication { val uri = publicationUri.build() @@ -534,7 +360,6 @@ class AeronDriver( } } - @Synchronized fun addSubscription(subscriptionUri: ChannelUriStringBuilder, streamId: Int): Subscription { val uri = subscriptionUri.build() @@ -577,10 +402,9 @@ class AeronDriver( * * @return true if the media driver is active and running */ - @Synchronized - fun isRunning(timeout: Long = context!!.driverTimeoutMs()): Boolean { + fun isRunning(): Boolean { // if the media driver is running, it will be a quick connection. Usually 100ms or so - return context!!.isDriverActive(timeout) { } + return context.isRunning() } /** @@ -589,106 +413,119 @@ class AeronDriver( * NOTE: We must be *super* careful trying to delete directories, because if we have multiple AERON/MEDIA DRIVERS connected to the * same directory, deleting the directory will cause any other aeron connection to fail! (which makes sense). */ - @Synchronized fun close() { - closeRequested = true + lock.write { + try { + aeron?.close() + } catch (e: Exception) { + logger.error(e) { "Error stopping aeron." } + } - try { - aeron?.close() - } catch (e: Exception) { - logger.error("Error stopping aeron.", e) - } + aeron = null - aeron = null + if (mediaDriver == null) { + logger.debug { "No driver started for this instance. Not Stopping." } + return + } - val mediaDriver = mediaDriver - this@AeronDriver.mediaDriver = null + if (mediaDriverWasAlreadyRunning) { + logger.debug { "We did not start the media driver, so we are not stopping it." } + return + } - if (mediaDriver == null) { - logger.debug("No driver started for this instance. Not Stopping.") - return - } - if (mediaDriverWasAlreadyRunning) { - logger.debug("We did not start the media driver, so we are not stopping it.") - return - } + logger.debug { "Stopping driver at '${context.driverDirectory}'..." } - logger.debug("Stopping driver at '${driverDirectory}'...") + if (!isRunning()) { + // not running + logger.debug { "Driver is not running at '${context.driverDirectory}' for this context. Not Stopping." } + return + } - if (!isRunning()) { - // not running - logger.debug { "Driver is not running at '${driverDirectory}' for this context. Not Stopping." } - return - } + // if we are the ones that started the media driver, then we must be the ones to close it + try { + mediaDriver!!.close() + } catch (e: Exception) { + logger.error(e) { "Error closing the Aeron media driver" } + } - try { - mediaDriver.close() - - // make sure the context is also closed. - context!!.close() + mediaDriver = null // it can actually close faster, if everything is ideal. if (isRunning()) { // on close, we want to wait for the driver to timeout before considering it "closed". Connections can still LINGER (see below) // on close, the publication CAN linger (in case a client goes away, and then comes back) // AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param) - sleep(driverTimeout + AERON_PUBLICATION_LINGER_TIMEOUT) + sleep(context.driverTimeout + AERON_PUBLICATION_LINGER_TIMEOUT) } // wait for the media driver to actually stop var count = 10 while (--count >= 0 && isRunning()) { - logger.warn { "Aeron Media driver at '${driverDirectory}' is still running. Waiting for it to stop. Trying to close $count more times." } - sleep(driverTimeout) + logger.warn { "Aeron Media driver at '${context.driverDirectory}' is still running. Waiting for it to stop. Trying to close $count more times." } + sleep(context.driverTimeout) } - logger.debug { "Closed the media driver at '${driverDirectory}'" } + logger.debug { "Closed the media driver at '${context.driverDirectory}'" } - val deletedAeron = driverDirectory.deleteRecursively() - if (!deletedAeron) { - logger.error { "Error deleting aeron directory $driverDirectory on shutdown "} + try { + } catch (e: Exception) { + logger.error(e) {"Error closing the media driver at '${context.driverDirectory}'" } + } + + // make sure the context is also closed. + context.close() + context_ = null + + try { + val deletedAeron = context.driverDirectory.deleteRecursively() + if (!deletedAeron) { + logger.error { "Error deleting aeron directory ${context.driverDirectory} on shutdown "} + } + } catch (e: Exception) { + logger.error(e) { "Error deleting Aeron directory at: ${context.driverDirectory}"} } - } catch (e: Exception) { - logger.error("Error closing the media driver at '${driverDirectory}'", e) } + } - // Destroys this thread group and all of its subgroups. - // This thread group must be empty, indicating that all threads that had been in this thread group have since stopped. - threadFactory.group.destroy() + /** + * @return the aeron media driver log file for a specific publication. This should be removed when a publication is closed (but is not always!) + */ + fun getMediaDriverPublicationFile(publicationRegId: Long): File { + return context.driverDirectory.resolve("publications").resolve("${publicationRegId}.logbuffer") } /** * @return the internal counters of the Aeron driver in the current aeron directory */ fun driverCounters(counterFunction: (counterId: Int, counterValue: Long, typeId: Int, keyBuffer: DirectBuffer?, label: String?) -> Unit) { - driverCounters(context!!.aeronDirectory(), counterFunction) + driverCounters(context.driverDirectory, counterFunction) } /** * @return the backlog statistics for the Aeron driver */ fun driverBacklog(): BacklogStat { - return driverBacklog(context!!.aeronDirectory()) + return driverBacklog(context.driverDirectory) } /** * @return the internal heartbeat of the Aeron driver in the current aeron directory */ fun driverHeartbeatMs(): Long { - return driverHeartbeatMs(context!!.aeronDirectory()) + return driverHeartbeatMs(context.driverDirectory) } /** * @return the internal version of the Aeron driver in the current aeron directory */ fun driverVersion(): String { - return driverVersion(context!!.aeronDirectory()) + return driverVersion(context.driverDirectory) } /** * @return the current aeron context info, if any */ - fun contextInfo(): String? { - return context?.toString() + fun contextInfo(): String { + return context.toString() } }