diff --git a/src/dorkbox/network/aeron/AeronDriver.kt b/src/dorkbox/network/aeron/AeronDriver.kt index 7fed99c8..b6202259 100644 --- a/src/dorkbox/network/aeron/AeronDriver.kt +++ b/src/dorkbox/network/aeron/AeronDriver.kt @@ -6,14 +6,19 @@ import dorkbox.network.exceptions.AeronDriverException import dorkbox.util.NamedThreadFactory import io.aeron.Aeron import io.aeron.ChannelUriStringBuilder +import io.aeron.CncFileDescriptor import io.aeron.Publication import io.aeron.Subscription import io.aeron.driver.MediaDriver import io.aeron.exceptions.DriverTimeoutException -import kotlinx.atomicfu.atomic +import io.aeron.samples.SamplesUtil import mu.KLogger import mu.KotlinLogging +import org.agrona.DirectBuffer +import org.agrona.SemanticVersion import org.agrona.concurrent.BackoffIdleStrategy +import org.agrona.concurrent.ringbuffer.RingBufferDescriptor +import org.agrona.concurrent.status.CountersReader import org.slf4j.LoggerFactory import java.io.File import java.lang.Thread.sleep @@ -187,9 +192,56 @@ class AeronDriver( return mediaDriverContext } + + + /** + * @return the internal counters of the Aeron driver in the specified aeron directory + */ + fun driverCounters(aeronLocation: File, counterFunction: (counterId: Int, counterValue: Long, typeId: Int, keyBuffer: DirectBuffer?, label: String?) -> Unit) { + val cncByteBuffer = SamplesUtil.mapExistingFileReadOnly(aeronLocation.resolve("cnc.dat")) + val cncMetaDataBuffer: DirectBuffer = CncFileDescriptor.createMetaDataBuffer(cncByteBuffer) + + val countersReader = CountersReader( + CncFileDescriptor.createCountersMetaDataBuffer(cncByteBuffer, cncMetaDataBuffer), + CncFileDescriptor.createCountersValuesBuffer(cncByteBuffer, cncMetaDataBuffer) + ) + countersReader.forEach { counterId: Int, typeId: Int, keyBuffer: DirectBuffer?, label: String? -> + val counterValue = countersReader.getCounterValue(counterId) + counterFunction(counterId, counterValue, typeId, keyBuffer, label) + } + } + + /** + * @return the internal heartbeat of the Aeron driver in the specified aeron directory + */ + fun driverHeartbeatMs(aeronLocation: File): Long { + val cncByteBuffer = SamplesUtil.mapExistingFileReadOnly(aeronLocation.resolve("cnc.dat")) + val cncMetaDataBuffer: DirectBuffer = CncFileDescriptor.createMetaDataBuffer(cncByteBuffer) + + val toDriverBuffer = CncFileDescriptor.createToDriverBuffer(cncByteBuffer, cncMetaDataBuffer); + val timestampOffset = toDriverBuffer.capacity() - RingBufferDescriptor.TRAILER_LENGTH + RingBufferDescriptor.CONSUMER_HEARTBEAT_OFFSET + + return toDriverBuffer.getLongVolatile(timestampOffset) + } + + /** + * @return the internal version of the Aeron driver in the specified aeron directory + */ + fun driverVersion(aeronLocation: File): String { + val cncByteBuffer = SamplesUtil.mapExistingFileReadOnly(aeronLocation.resolve("cnc.dat")) + val cncMetaDataBuffer: DirectBuffer = CncFileDescriptor.createMetaDataBuffer(cncByteBuffer) + + val cncVersion = cncMetaDataBuffer.getInt(CncFileDescriptor.cncVersionOffset(0)) + val cncSemanticVersion = SemanticVersion.toString(cncVersion); + + return cncSemanticVersion + } } - private val closeRequested = atomic(false) + + + @Volatile + private var closeRequested = false private var aeron: Aeron? = null @@ -240,6 +292,7 @@ class AeronDriver( } } + context = createContext() mediaDriverWasAlreadyRunning = isRunning() } @@ -251,25 +304,27 @@ class AeronDriver( * @throws IllegalArgumentException if the aeron media driver directory cannot be setup */ private fun createContext(): MediaDriver.Context { - if (threadFactory.group.isDestroyed) { - // on close, we destroy the threadfactory -- and on driver restart, we might want it back - threadFactory = NamedThreadFactory("Thread", ThreadGroup("${type.simpleName}-AeronDriver"), true) - } + // Note: A mutex doesn't work so well + return lock.write { + if (threadFactory.group.isDestroyed) { + // on close, we destroy the threadfactory -- and on driver restart, we might want it back + threadFactory = NamedThreadFactory("Thread", ThreadGroup("${type.simpleName}-AeronDriver"), true) + } - var context = create(config, threadFactory, aeronErrorHandler) + var context = create(config, threadFactory, aeronErrorHandler) - // this happens EXACTLY once. Must be BEFORE the "isRunning" check! - context.concludeAeronDirectory() + // this happens EXACTLY once. Must be BEFORE the "isRunning" check! + context.concludeAeronDirectory() - // will setup the aeron directory or throw IllegalArgumentException if it cannot be configured - val aeronDir = context.aeronDirectory() + // will setup the aeron directory or throw IllegalArgumentException if it cannot be configured + val aeronDir = context.aeronDirectory() - val driverTimeout = context.driverTimeoutMs() + val driverTimeout = context.driverTimeoutMs() - // sometimes when starting up, if a PREVIOUS run was corrupted (during startup, for example) - // we ONLY do this during the initial startup check because it will delete the directory, and we don't - // always want to do this. - var isRunning = try { + // sometimes when starting up, if a PREVIOUS run was corrupted (during startup, for example) + // we ONLY do this during the initial startup check because it will delete the directory, and we don't + // always want to do this. + var isRunning = try { context.isDriverActive(driverTimeout) { } } catch (e: DriverTimeoutException) { // we have to delete the directory, since it was corrupted, and we try again. @@ -281,134 +336,124 @@ class AeronDriver( } } - // this is incompatible with IPC, and will not be set if IPC is enabled - if (config.aeronDirectoryForceUnique && isRunning) { - val savedParent = aeronDir.parentFile - var retry = 0 - val retryMax = 100 + // this is incompatible with IPC, and will not be set if IPC is enabled + if (config.aeronDirectoryForceUnique && isRunning) { + val savedParent = aeronDir.parentFile + var retry = 0 + val retryMax = 100 - while (config.aeronDirectoryForceUnique && isRunning) { - if (retry++ > retryMax) { - throw IllegalArgumentException("Unable to force unique aeron Directory. Tried $retryMax times and all tries were in use.") + while (config.aeronDirectoryForceUnique && isRunning) { + if (retry++ > retryMax) { + throw IllegalArgumentException("Unable to force unique aeron Directory. Tried $retryMax times and all tries were in use.") + } + + val randomNum = (1..retryMax).shuffled().first() + val newDir = savedParent.resolve("${aeronDir.name}_$randomNum") + + context = create(config, threadFactory, aeronErrorHandler) + context.aeronDirectoryName(newDir.path) + + // this happens EXACTLY once. Must be BEFORE the "isRunning" check! + context.concludeAeronDirectory() + + isRunning = context.isDriverActive(driverTimeout) { } } - val randomNum = (1..retryMax).shuffled().first() - val newDir = savedParent.resolve("${aeronDir.name}_$randomNum") - - context = create(config, threadFactory, aeronErrorHandler) - context.aeronDirectoryName(newDir.path) - - // this happens EXACTLY once. Must be BEFORE the "isRunning" check! - context.concludeAeronDirectory() - - isRunning = context.isDriverActive(driverTimeout) { } + if (!isRunning) { + // NOTE: We must be *super* careful trying to delete directories, because if we have multiple AERON/MEDIA DRIVERS connected to the + // same directory, deleting the directory will cause any other aeron connection to fail! (which makes sense). + // since we are forcing a unique directory, we should ALSO delete it when we are done! + context.dirDeleteOnShutdown() + } } - if (!isRunning) { - // NOTE: We must be *super* careful trying to delete directories, because if we have multiple AERON/MEDIA DRIVERS connected to the - // same directory, deleting the directory will cause any other aeron connection to fail! (which makes sense). - // since we are forcing a unique directory, we should ALSO delete it when we are done! - context.dirDeleteOnShutdown() - } + logger.info { "Aeron directory: '${context.aeronDirectory()}'" } + + context } - - logger.info { "Aeron directory: '${context.aeronDirectory()}'" } - - return context } /** - * If the driver is not already running, this will start the driver + * If the driver is not already running, this will start the driver. This will ALSO connect to the aeron client * * @throws Exception if there is a problem starting the media driver */ fun start() { - // Note: A mutex doesn't work so well. - lock.write { - if (closeRequested.value) { - logger.debug("Resetting media driver context") + if (closeRequested) { + logger.debug("Resetting media driver context") - // close was requested previously. we have to reset a few things - context = createContext() + // close was requested previously. we have to reset a few things + context = createContext() - // we want to be able to "restart" aeron, as necessary, after a [close] method call - closeRequested.value = false - } - - restart() + // we want to be able to "restart" aeron, as necessary, after a [close] method call + closeRequested = false } + + maybeRestart() } /** * if we did NOT close, then will restart the media driver + aeron. If we manually closed aeron, then we won't try to restart */ - private fun restart() { - if (closeRequested.value) { + private fun maybeRestart() { + if (closeRequested) { return } - val didStartDriver = startDriver() - startAeron(didStartDriver) - } + var loadedDriver = false - /** - * @return true if the media driver was started, false if it was not started - */ - private fun startDriver(): Boolean { - if (closeRequested.value) { - return false - } + lock.write { + if (mediaDriver == null) { + // only start if we didn't already start... There will be several checks. - if (mediaDriver == null) { - // only start if we didn't already start... There will be several checks. - - var running = isRunning() - if (running) { - // wait for a bit, because we are running, but we ALSO issued a START, and expect it to start. - // SOMETIMES aeron is in the middle of shutting down, and this prevents us from trying to connect to - // that instance - logger.debug("Aeron Media driver already running. Double checking status...") - sleep(driverTimeout/2) - running = isRunning() - } - - if (!running) { - logger.debug("Starting Aeron Media driver.") - - // try to start. If we start/stop too quickly, it's a problem - var count = 10 - while (count-- > 0) { - try { - mediaDriver = MediaDriver.launch(context) - return true - } catch (e: Exception) { - logger.warn(e) { "Unable to start the Aeron Media driver at ${driverDirectory}. Retrying $count more times..." } - sleep(driverTimeout) - } + var running = isRunning() + if (running) { + // wait for a bit, because we are running, but we ALSO issued a START, and expect it to start. + // SOMETIMES aeron is in the middle of shutting down, and this prevents us from trying to connect to + // that instance + logger.debug("Aeron Media driver already running. Double checking status...") + sleep(driverTimeout/2) + running = isRunning() + } + + if (!running) { + logger.debug("Starting Aeron Media driver.") + + // try to start. If we start/stop too quickly, it's a problem + var count = 10 + while (count-- > 0) { + try { + mediaDriver = MediaDriver.launch(context) + loadedDriver = true + break + } catch (e: Exception) { + logger.warn(e) { "Unable to start the Aeron Media driver at ${driverDirectory}. Retrying $count more times..." } + sleep(driverTimeout) + } + } + } else { + logger.debug("Not starting Aeron Media driver. It was already running.") } - } else { - logger.debug("Not starting Aeron Media driver. It was already running.") } } - return false - } + if (loadedDriver || aeron == null) { + if (closeRequested) { + return + } - private fun startAeron(didStartDriver: Boolean) { - if (closeRequested.value) { - return - } + // the media driver MIGHT already be started in a different process! We still ALWAYS want to connect to + // aeron (which connects to the other media driver process), especially if we haven't already connected to + // it (or if there was an error connecting because a different media driver was shutting down) - // the media driver MIGHT already be started in a different process! We still ALWAYS want to connect to - // aeron (which connects to the other media driver process), especially if we haven't already connected to - // it (or if there was an error connecting because a different media driver was shutting down) - - if (didStartDriver || aeron == null) { aeron?.close() // this might succeed if we can connect to the media driver aeron = Aeron.connect(setupAeron()) + logger.debug("Connected to Aeron driver.") } + + return } private fun setupAeron(): Aeron.Context { @@ -474,14 +519,7 @@ class AeronDriver( // configuring pub/sub to aeron is LINEAR -- and it happens in 2 places. // 1) starting up the client/server - // 2) creating a new client-server connection pair (the media driver won't be "dead" at this poitn) - - // try to start/restart aeron - try { - restart() - } catch (e2: Exception) { - // ignored - } + // 2) creating a new client-server connection pair (the media driver won't be "dead" at this point) } } @@ -523,14 +561,7 @@ class AeronDriver( // configuring pub/sub to aeron is LINEAR -- and it happens in 2 places. // 1) starting up the client/server - // 2) creating a new client-server connection pair (the media driver won't be "dead" at this poitn) - - // try to start/restart aeron - try { - restart() - } catch (e2: Exception) { - // ignored - } + // 2) creating a new client-server connection pair (the media driver won't be "dead" at this point) } } @@ -556,7 +587,7 @@ class AeronDriver( */ @Synchronized fun close() { - closeRequested.value = true + closeRequested = true try { aeron?.close() @@ -621,4 +652,25 @@ class AeronDriver( // This thread group must be empty, indicating that all threads that had been in this thread group have since stopped. threadFactory.group.destroy() } + + /** + * @return the internal counters of the Aeron driver in the current aeron directory + */ + fun driverCounters(counterFunction: (counterId: Int, counterValue: Long, typeId: Int, keyBuffer: DirectBuffer?, label: String?) -> Unit) { + driverCounters(context!!.aeronDirectory(), counterFunction) + } + + /** + * @return the internal heartbeat of the Aeron driver in the current aeron directory + */ + fun driverHeartbeatMs(): Long { + return driverHeartbeatMs(context!!.aeronDirectory()) + } + + /** + * @return the internal version of the Aeron driver in the current aeron directory + */ + fun driverVersion(): String { + return driverVersion(context!!.aeronDirectory()) + } } diff --git a/src/dorkbox/network/aeron/UdpMediaDriverClientConnection.kt b/src/dorkbox/network/aeron/UdpMediaDriverClientConnection.kt index b80e8481..2f7aebe7 100644 --- a/src/dorkbox/network/aeron/UdpMediaDriverClientConnection.kt +++ b/src/dorkbox/network/aeron/UdpMediaDriverClientConnection.kt @@ -21,6 +21,8 @@ import dorkbox.network.connection.ListenerManager import dorkbox.network.exceptions.ClientException import dorkbox.network.exceptions.ClientTimedOutException import io.aeron.ChannelUriStringBuilder +import io.aeron.Publication +import io.aeron.Subscription import kotlinx.coroutines.delay import mu.KLogger import java.net.Inet4Address @@ -38,7 +40,13 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress, sessionId: Int, connectionTimeoutSec: Int = 0, isReliable: Boolean = true) : - UdpMediaDriverConnection(publicationPort, subscriptionPort, streamId, sessionId, connectionTimeoutSec, isReliable) { + MediaDriverConnection(publicationPort, subscriptionPort, streamId, sessionId, connectionTimeoutSec, isReliable) { + + @Volatile + private var tempSubscription: Subscription? = null + + @Volatile + private var tempPublication: Publication? = null val addressString: String by lazy { IP.toString(address) @@ -62,6 +70,7 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress, } + @Suppress("DuplicatedCode") override suspend fun buildClient(aeronDriver: AeronDriver, logger: KLogger) { val aeronAddressString = if (address is Inet4Address) { @@ -76,32 +85,37 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress, } } - - // Create a publication at the given address and port, using the given stream ID. - // Note: The Aeron.addPublication method will block until the Media Driver acknowledges the request or a timeout occurs. - val publicationUri = uri() - .endpoint("$aeronAddressString:$publicationPort") - - // Create a subscription with a control port (for dynamic MDC) at the given address and port, using the given stream ID. - val subscriptionUri = uri() - .controlEndpoint("$aeronAddressString:$subscriptionPort") - .controlMode("dynamic") - - - - if (logger.isTraceEnabled) { - logger.trace("client pub URI: $ip ${publicationUri.build()}") - logger.trace("client sub URI: $ip ${subscriptionUri.build()}") - } - var success = false // NOTE: Handlers are called on the client conductor thread. The client conductor thread expects handlers to do safe // publication of any state to other threads and not be long running or re-entrant with the client. // on close, the publication CAN linger (in case a client goes away, and then comes back) // AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param) - val publication = aeronDriver.addPublicationWithRetry(publicationUri, streamId) - val subscription = aeronDriver.addSubscriptionWithRetry(subscriptionUri, streamId) + + if (tempPublication == null) { + // Create a publication at the given address and port, using the given stream ID. + // Note: The Aeron.addPublication method will block until the Media Driver acknowledges the request or a timeout occurs. + val publicationUri = uri() + .endpoint("$aeronAddressString:$publicationPort") + + logger.trace("client pub URI: $ip ${publicationUri.build()}") + + tempPublication = aeronDriver.addPublicationWithRetry(publicationUri, streamId) + } + + if (tempSubscription == null) { + // Create a subscription with a control port (for dynamic MDC) at the given address and port, using the given stream ID. + val subscriptionUri = uri() + .controlEndpoint("$aeronAddressString:$subscriptionPort") + .controlMode("dynamic") + + logger.trace("client sub URI: $ip ${subscriptionUri.build()}") + + tempSubscription = aeronDriver.addSubscriptionWithRetry(subscriptionUri, streamId) + } + + val publication = tempPublication!! + val subscription = tempSubscription!! // this will wait for the server to acknowledge the connection (all via aeron) @@ -119,8 +133,8 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress, if (!success) { subscription.close() - val ex = ClientTimedOutException("Cannot create subscription: $ip ${subscriptionUri.build()} in ${timoutInNanos}ms") - ListenerManager.cleanStackTraceInternal(ex) + val ex = ClientTimedOutException("Cannot create subscription to $ip in $connectionTimeoutSec seconds") + ListenerManager.cleanAllStackTrace(ex) throw ex } @@ -142,14 +156,17 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress, subscription.close() publication.close() - val ex = ClientTimedOutException("Cannot create publication: $ip ${publicationUri.build()} in ${timoutInNanos}ms") - ListenerManager.cleanStackTrace(ex) + val ex = ClientTimedOutException("Cannot create publication to $ip in $connectionTimeoutSec seconds") + ListenerManager.cleanAllStackTrace(ex) throw ex } this.success = true - this.subscription = subscription this.publication = publication + this.subscription = subscription + + this.tempPublication = null + this.tempSubscription = null } override val clientInfo: String by lazy { @@ -161,11 +178,11 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress, } override fun buildServer(aeronDriver: AeronDriver, logger: KLogger, pairConnection: Boolean) { - throw ClientException("Server info not implemented in Client MDC") + throw ClientException("Server info not implemented in Client MediaDriver Connection") } override val serverInfo: String get() { - throw ClientException("Server info not implemented in Client MDC") + throw ClientException("Server info not implemented in Client MediaDriver Connection") } override fun toString(): String { diff --git a/src/dorkbox/network/aeron/UdpMediaDriverConnection.kt b/src/dorkbox/network/aeron/UdpMediaDriverConnection.kt deleted file mode 100644 index f9ae8df0..00000000 --- a/src/dorkbox/network/aeron/UdpMediaDriverConnection.kt +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright 2021 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package dorkbox.network.aeron - -abstract class UdpMediaDriverConnection(publicationPort: Int, subscriptionPort: Int, - streamId: Int, sessionId: Int, - connectionTimeoutSec: Int, isReliable: Boolean) : - MediaDriverConnection(publicationPort, subscriptionPort, - streamId, sessionId, - connectionTimeoutSec, isReliable) { -} diff --git a/src/dorkbox/network/aeron/UdpMediaDriverServerConnection.kt b/src/dorkbox/network/aeron/UdpMediaDriverServerConnection.kt index 7d112dec..3f266b85 100644 --- a/src/dorkbox/network/aeron/UdpMediaDriverServerConnection.kt +++ b/src/dorkbox/network/aeron/UdpMediaDriverServerConnection.kt @@ -36,7 +36,7 @@ internal open class UdpMediaDriverServerConnection(val listenAddress: InetAddres sessionId: Int, connectionTimeoutSec: Int, isReliable: Boolean = true) : - UdpMediaDriverConnection(publicationPort, subscriptionPort, streamId, sessionId, connectionTimeoutSec, isReliable) { + MediaDriverConnection(publicationPort, subscriptionPort, streamId, sessionId, connectionTimeoutSec, isReliable) { private fun aeronConnectionString(ipAddress: InetAddress): String { return if (ipAddress is Inet4Address) { @@ -63,7 +63,7 @@ internal open class UdpMediaDriverServerConnection(val listenAddress: InetAddres @Suppress("DuplicatedCode") override suspend fun buildClient(aeronDriver: AeronDriver, logger: KLogger) { - throw ServerException("Client info not implemented in Server MDC") + throw ServerException("Client info not implemented in Server MediaDriver Connection") } override fun buildServer(aeronDriver: AeronDriver, logger: KLogger, pairConnection: Boolean) { @@ -104,7 +104,7 @@ internal open class UdpMediaDriverServerConnection(val listenAddress: InetAddres override val clientInfo: String get() { - throw ServerException("Client info not implemented in Server MDC") + throw ServerException("Client info not implemented in Server MediaDriver Connection") } override val serverInfo: String by lazy { diff --git a/src/dorkbox/network/connection/Connection.kt b/src/dorkbox/network/connection/Connection.kt index 3af3fc2b..b343f781 100644 --- a/src/dorkbox/network/connection/Connection.kt +++ b/src/dorkbox/network/connection/Connection.kt @@ -17,7 +17,6 @@ package dorkbox.network.connection import dorkbox.network.aeron.IpcMediaDriverConnection import dorkbox.network.aeron.UdpMediaDriverClientConnection -import dorkbox.network.aeron.UdpMediaDriverConnection import dorkbox.network.aeron.UdpMediaDriverPairedConnection import dorkbox.network.handshake.ConnectionCounts import dorkbox.network.handshake.RandomIdAllocator @@ -78,7 +77,7 @@ open class Connection(connectionParameters: ConnectionParams<*>) { /** * @return true if this connection is a network connection */ - val isNetwork = connectionParameters.mediaDriverConnection is UdpMediaDriverConnection + val isNetwork = !isIpc /** * the endpoint associated with this connection