UDP MediaDriver does not recreate driver connections during handshake. Re-organized how aeron driver connections are established

This commit is contained in:
Robinson 2022-06-24 13:11:39 +02:00
parent cb1cd0cec9
commit 3ede82e43f
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
5 changed files with 225 additions and 182 deletions

View File

@ -6,14 +6,19 @@ import dorkbox.network.exceptions.AeronDriverException
import dorkbox.util.NamedThreadFactory
import io.aeron.Aeron
import io.aeron.ChannelUriStringBuilder
import io.aeron.CncFileDescriptor
import io.aeron.Publication
import io.aeron.Subscription
import io.aeron.driver.MediaDriver
import io.aeron.exceptions.DriverTimeoutException
import kotlinx.atomicfu.atomic
import io.aeron.samples.SamplesUtil
import mu.KLogger
import mu.KotlinLogging
import org.agrona.DirectBuffer
import org.agrona.SemanticVersion
import org.agrona.concurrent.BackoffIdleStrategy
import org.agrona.concurrent.ringbuffer.RingBufferDescriptor
import org.agrona.concurrent.status.CountersReader
import org.slf4j.LoggerFactory
import java.io.File
import java.lang.Thread.sleep
@ -187,9 +192,56 @@ class AeronDriver(
return mediaDriverContext
}
/**
 * Walks every counter published by the Aeron driver in the specified aeron directory and reports each one to
 * [counterFunction].
 *
 * The counters are read from a read-only mapping of the `cnc.dat` file inside [aeronLocation]. Nothing is returned;
 * all results are delivered through the callback.
 *
 * NOTE(review): the mapping is not explicitly released here, and the `keyBuffer` handed to the callback is presumably
 * a view over it -- confirm before adding an unmap.
 *
 * @param aeronLocation the aeron directory that contains `cnc.dat`
 * @param counterFunction invoked once per counter with its id, current value, type id, key buffer and label
 */
fun driverCounters(aeronLocation: File, counterFunction: (counterId: Int, counterValue: Long, typeId: Int, keyBuffer: DirectBuffer?, label: String?) -> Unit) {
    val cncFile = aeronLocation.resolve("cnc.dat")
    val mappedCnc = SamplesUtil.mapExistingFileReadOnly(cncFile)
    val metaData: DirectBuffer = CncFileDescriptor.createMetaDataBuffer(mappedCnc)

    // the reader needs both the counter metadata (type/key/label) and the counter values
    val countersMetaData = CncFileDescriptor.createCountersMetaDataBuffer(mappedCnc, metaData)
    val countersValues = CncFileDescriptor.createCountersValuesBuffer(mappedCnc, metaData)
    val reader = CountersReader(countersMetaData, countersValues)

    reader.forEach { id: Int, typeId: Int, keyBuffer: DirectBuffer?, label: String? ->
        // the value is looked up at the moment the counter is visited
        counterFunction(id, reader.getCounterValue(id), typeId, keyBuffer, label)
    }
}
/**
 * Reads the consumer heartbeat of the Aeron driver from the `cnc.dat` file in the specified aeron directory.
 *
 * The file is mapped read-only for the duration of this call only. The mapping is released before returning, so
 * repeated polling does not pile up mappings that would otherwise stay alive until garbage collection (on some
 * platforms a live mapping also prevents the aeron directory from being deleted).
 *
 * @param aeronLocation the aeron directory that contains `cnc.dat`
 * @return the heartbeat value stored in the trailer of the to-driver ring buffer (presumably a timestamp in
 *         milliseconds, going by the function name -- confirm)
 */
fun driverHeartbeatMs(aeronLocation: File): Long {
    val cncByteBuffer = SamplesUtil.mapExistingFileReadOnly(aeronLocation.resolve("cnc.dat"))
    try {
        val cncMetaDataBuffer: DirectBuffer = CncFileDescriptor.createMetaDataBuffer(cncByteBuffer)
        val toDriverBuffer = CncFileDescriptor.createToDriverBuffer(cncByteBuffer, cncMetaDataBuffer)

        // the heartbeat lives in the ring buffer trailer, which occupies the last TRAILER_LENGTH bytes of the buffer
        val timestampOffset = toDriverBuffer.capacity() - RingBufferDescriptor.TRAILER_LENGTH + RingBufferDescriptor.CONSUMER_HEARTBEAT_OFFSET

        return toDriverBuffer.getLongVolatile(timestampOffset)
    } finally {
        // fully qualified so that no additional import is required
        org.agrona.IoUtil.unmap(cncByteBuffer)
    }
}
/**
 * Reads the version recorded in the `cnc.dat` file of the Aeron driver in the specified aeron directory.
 *
 * The value comes from the version field of the CnC metadata and is rendered as a semantic version string. The file
 * is mapped read-only for the duration of this call only, and the mapping is released before returning.
 *
 * @param aeronLocation the aeron directory that contains `cnc.dat`
 * @return the CnC version field formatted by [SemanticVersion.toString]
 */
fun driverVersion(aeronLocation: File): String {
    val cncByteBuffer = SamplesUtil.mapExistingFileReadOnly(aeronLocation.resolve("cnc.dat"))
    try {
        val cncMetaDataBuffer: DirectBuffer = CncFileDescriptor.createMetaDataBuffer(cncByteBuffer)
        val cncVersion = cncMetaDataBuffer.getInt(CncFileDescriptor.cncVersionOffset(0))

        return SemanticVersion.toString(cncVersion)
    } finally {
        // fully qualified so that no additional import is required
        org.agrona.IoUtil.unmap(cncByteBuffer)
    }
}
}
private val closeRequested = atomic(false)
@Volatile
private var closeRequested = false
private var aeron: Aeron? = null
@ -240,6 +292,7 @@ class AeronDriver(
}
}
context = createContext()
mediaDriverWasAlreadyRunning = isRunning()
}
@ -251,25 +304,27 @@ class AeronDriver(
* @throws IllegalArgumentException if the aeron media driver directory cannot be setup
*/
private fun createContext(): MediaDriver.Context {
if (threadFactory.group.isDestroyed) {
// on close, we destroy the threadfactory -- and on driver restart, we might want it back
threadFactory = NamedThreadFactory("Thread", ThreadGroup("${type.simpleName}-AeronDriver"), true)
}
// Note: A mutex doesn't work so well
return lock.write {
if (threadFactory.group.isDestroyed) {
// on close, we destroy the threadfactory -- and on driver restart, we might want it back
threadFactory = NamedThreadFactory("Thread", ThreadGroup("${type.simpleName}-AeronDriver"), true)
}
var context = create(config, threadFactory, aeronErrorHandler)
var context = create(config, threadFactory, aeronErrorHandler)
// this happens EXACTLY once. Must be BEFORE the "isRunning" check!
context.concludeAeronDirectory()
// this happens EXACTLY once. Must be BEFORE the "isRunning" check!
context.concludeAeronDirectory()
// will setup the aeron directory or throw IllegalArgumentException if it cannot be configured
val aeronDir = context.aeronDirectory()
// will setup the aeron directory or throw IllegalArgumentException if it cannot be configured
val aeronDir = context.aeronDirectory()
val driverTimeout = context.driverTimeoutMs()
val driverTimeout = context.driverTimeoutMs()
// sometimes when starting up, if a PREVIOUS run was corrupted (during startup, for example)
// we ONLY do this during the initial startup check because it will delete the directory, and we don't
// always want to do this.
var isRunning = try {
// sometimes when starting up, if a PREVIOUS run was corrupted (during startup, for example)
// we ONLY do this during the initial startup check because it will delete the directory, and we don't
// always want to do this.
var isRunning = try {
context.isDriverActive(driverTimeout) { }
} catch (e: DriverTimeoutException) {
// we have to delete the directory, since it was corrupted, and we try again.
@ -281,134 +336,124 @@ class AeronDriver(
}
}
// this is incompatible with IPC, and will not be set if IPC is enabled
if (config.aeronDirectoryForceUnique && isRunning) {
val savedParent = aeronDir.parentFile
var retry = 0
val retryMax = 100
// this is incompatible with IPC, and will not be set if IPC is enabled
if (config.aeronDirectoryForceUnique && isRunning) {
val savedParent = aeronDir.parentFile
var retry = 0
val retryMax = 100
while (config.aeronDirectoryForceUnique && isRunning) {
if (retry++ > retryMax) {
throw IllegalArgumentException("Unable to force unique aeron Directory. Tried $retryMax times and all tries were in use.")
while (config.aeronDirectoryForceUnique && isRunning) {
if (retry++ > retryMax) {
throw IllegalArgumentException("Unable to force unique aeron Directory. Tried $retryMax times and all tries were in use.")
}
val randomNum = (1..retryMax).shuffled().first()
val newDir = savedParent.resolve("${aeronDir.name}_$randomNum")
context = create(config, threadFactory, aeronErrorHandler)
context.aeronDirectoryName(newDir.path)
// this happens EXACTLY once. Must be BEFORE the "isRunning" check!
context.concludeAeronDirectory()
isRunning = context.isDriverActive(driverTimeout) { }
}
val randomNum = (1..retryMax).shuffled().first()
val newDir = savedParent.resolve("${aeronDir.name}_$randomNum")
context = create(config, threadFactory, aeronErrorHandler)
context.aeronDirectoryName(newDir.path)
// this happens EXACTLY once. Must be BEFORE the "isRunning" check!
context.concludeAeronDirectory()
isRunning = context.isDriverActive(driverTimeout) { }
if (!isRunning) {
// NOTE: We must be *super* careful trying to delete directories, because if we have multiple AERON/MEDIA DRIVERS connected to the
// same directory, deleting the directory will cause any other aeron connection to fail! (which makes sense).
// since we are forcing a unique directory, we should ALSO delete it when we are done!
context.dirDeleteOnShutdown()
}
}
if (!isRunning) {
// NOTE: We must be *super* careful trying to delete directories, because if we have multiple AERON/MEDIA DRIVERS connected to the
// same directory, deleting the directory will cause any other aeron connection to fail! (which makes sense).
// since we are forcing a unique directory, we should ALSO delete it when we are done!
context.dirDeleteOnShutdown()
}
logger.info { "Aeron directory: '${context.aeronDirectory()}'" }
context
}
logger.info { "Aeron directory: '${context.aeronDirectory()}'" }
return context
}
/**
* If the driver is not already running, this will start the driver
* If the driver is not already running, this will start the driver. This will ALSO connect to the aeron client
*
* @throws Exception if there is a problem starting the media driver
*/
fun start() {
// Note: A mutex doesn't work so well.
lock.write {
if (closeRequested.value) {
logger.debug("Resetting media driver context")
if (closeRequested) {
logger.debug("Resetting media driver context")
// close was requested previously. we have to reset a few things
context = createContext()
// close was requested previously. we have to reset a few things
context = createContext()
// we want to be able to "restart" aeron, as necessary, after a [close] method call
closeRequested.value = false
}
restart()
// we want to be able to "restart" aeron, as necessary, after a [close] method call
closeRequested = false
}
maybeRestart()
}
/**
* if we did NOT close, then will restart the media driver + aeron. If we manually closed aeron, then we won't try to restart
*/
private fun restart() {
if (closeRequested.value) {
private fun maybeRestart() {
if (closeRequested) {
return
}
val didStartDriver = startDriver()
startAeron(didStartDriver)
}
var loadedDriver = false
/**
* @return true if the media driver was started, false if it was not started
*/
private fun startDriver(): Boolean {
if (closeRequested.value) {
return false
}
lock.write {
if (mediaDriver == null) {
// only start if we didn't already start... There will be several checks.
if (mediaDriver == null) {
// only start if we didn't already start... There will be several checks.
var running = isRunning()
if (running) {
// wait for a bit, because we are running, but we ALSO issued a START, and expect it to start.
// SOMETIMES aeron is in the middle of shutting down, and this prevents us from trying to connect to
// that instance
logger.debug("Aeron Media driver already running. Double checking status...")
sleep(driverTimeout/2)
running = isRunning()
}
if (!running) {
logger.debug("Starting Aeron Media driver.")
// try to start. If we start/stop too quickly, it's a problem
var count = 10
while (count-- > 0) {
try {
mediaDriver = MediaDriver.launch(context)
return true
} catch (e: Exception) {
logger.warn(e) { "Unable to start the Aeron Media driver at ${driverDirectory}. Retrying $count more times..." }
sleep(driverTimeout)
}
var running = isRunning()
if (running) {
// wait for a bit, because we are running, but we ALSO issued a START, and expect it to start.
// SOMETIMES aeron is in the middle of shutting down, and this prevents us from trying to connect to
// that instance
logger.debug("Aeron Media driver already running. Double checking status...")
sleep(driverTimeout/2)
running = isRunning()
}
if (!running) {
logger.debug("Starting Aeron Media driver.")
// try to start. If we start/stop too quickly, it's a problem
var count = 10
while (count-- > 0) {
try {
mediaDriver = MediaDriver.launch(context)
loadedDriver = true
break
} catch (e: Exception) {
logger.warn(e) { "Unable to start the Aeron Media driver at ${driverDirectory}. Retrying $count more times..." }
sleep(driverTimeout)
}
}
} else {
logger.debug("Not starting Aeron Media driver. It was already running.")
}
} else {
logger.debug("Not starting Aeron Media driver. It was already running.")
}
}
return false
}
if (loadedDriver || aeron == null) {
if (closeRequested) {
return
}
private fun startAeron(didStartDriver: Boolean) {
if (closeRequested.value) {
return
}
// the media driver MIGHT already be started in a different process! We still ALWAYS want to connect to
// aeron (which connects to the other media driver process), especially if we haven't already connected to
// it (or if there was an error connecting because a different media driver was shutting down)
// the media driver MIGHT already be started in a different process! We still ALWAYS want to connect to
// aeron (which connects to the other media driver process), especially if we haven't already connected to
// it (or if there was an error connecting because a different media driver was shutting down)
if (didStartDriver || aeron == null) {
aeron?.close()
// this might succeed if we can connect to the media driver
aeron = Aeron.connect(setupAeron())
logger.debug("Connected to Aeron driver.")
}
return
}
private fun setupAeron(): Aeron.Context {
@ -474,14 +519,7 @@ class AeronDriver(
// configuring pub/sub to aeron is LINEAR -- and it happens in 2 places.
// 1) starting up the client/server
// 2) creating a new client-server connection pair (the media driver won't be "dead" at this poitn)
// try to start/restart aeron
try {
restart()
} catch (e2: Exception) {
// ignored
}
// 2) creating a new client-server connection pair (the media driver won't be "dead" at this point)
}
}
@ -523,14 +561,7 @@ class AeronDriver(
// configuring pub/sub to aeron is LINEAR -- and it happens in 2 places.
// 1) starting up the client/server
// 2) creating a new client-server connection pair (the media driver won't be "dead" at this poitn)
// try to start/restart aeron
try {
restart()
} catch (e2: Exception) {
// ignored
}
// 2) creating a new client-server connection pair (the media driver won't be "dead" at this point)
}
}
@ -556,7 +587,7 @@ class AeronDriver(
*/
@Synchronized
fun close() {
closeRequested.value = true
closeRequested = true
try {
aeron?.close()
@ -621,4 +652,25 @@ class AeronDriver(
// This thread group must be empty, indicating that all threads that had been in this thread group have since stopped.
threadFactory.group.destroy()
}
/**
 * Reports every counter of the Aeron driver in the current aeron directory to [counterFunction].
 *
 * Convenience overload: resolves the aeron directory from the current driver context and delegates to the variant
 * that takes an explicit directory.
 *
 * @param counterFunction invoked once per counter with its id, current value, type id, key buffer and label
 * @throws NullPointerException if there is no driver context
 */
fun driverCounters(counterFunction: (counterId: Int, counterValue: Long, typeId: Int, keyBuffer: DirectBuffer?, label: String?) -> Unit) {
    val currentAeronDirectory = context!!.aeronDirectory()
    driverCounters(currentAeronDirectory, counterFunction)
}
/**
 * Convenience overload that reads the driver heartbeat for the aeron directory of the current driver context.
 *
 * @return the internal heartbeat of the Aeron driver in the current aeron directory
 * @throws NullPointerException if there is no driver context
 */
fun driverHeartbeatMs(): Long = driverHeartbeatMs(context!!.aeronDirectory())
/**
 * Convenience overload that reads the driver version for the aeron directory of the current driver context.
 *
 * @return the internal version of the Aeron driver in the current aeron directory
 * @throws NullPointerException if there is no driver context
 */
fun driverVersion(): String = driverVersion(context!!.aeronDirectory())
}

View File

@ -21,6 +21,8 @@ import dorkbox.network.connection.ListenerManager
import dorkbox.network.exceptions.ClientException
import dorkbox.network.exceptions.ClientTimedOutException
import io.aeron.ChannelUriStringBuilder
import io.aeron.Publication
import io.aeron.Subscription
import kotlinx.coroutines.delay
import mu.KLogger
import java.net.Inet4Address
@ -38,7 +40,13 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress,
sessionId: Int,
connectionTimeoutSec: Int = 0,
isReliable: Boolean = true) :
UdpMediaDriverConnection(publicationPort, subscriptionPort, streamId, sessionId, connectionTimeoutSec, isReliable) {
MediaDriverConnection(publicationPort, subscriptionPort, streamId, sessionId, connectionTimeoutSec, isReliable) {
@Volatile
private var tempSubscription: Subscription? = null
@Volatile
private var tempPublication: Publication? = null
val addressString: String by lazy {
IP.toString(address)
@ -62,6 +70,7 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress,
}
@Suppress("DuplicatedCode")
override suspend fun buildClient(aeronDriver: AeronDriver, logger: KLogger) {
val aeronAddressString = if (address is Inet4Address) {
@ -76,32 +85,37 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress,
}
}
// Create a publication at the given address and port, using the given stream ID.
// Note: The Aeron.addPublication method will block until the Media Driver acknowledges the request or a timeout occurs.
val publicationUri = uri()
.endpoint("$aeronAddressString:$publicationPort")
// Create a subscription with a control port (for dynamic MDC) at the given address and port, using the given stream ID.
val subscriptionUri = uri()
.controlEndpoint("$aeronAddressString:$subscriptionPort")
.controlMode("dynamic")
if (logger.isTraceEnabled) {
logger.trace("client pub URI: $ip ${publicationUri.build()}")
logger.trace("client sub URI: $ip ${subscriptionUri.build()}")
}
var success = false
// NOTE: Handlers are called on the client conductor thread. The client conductor thread expects handlers to do safe
// publication of any state to other threads and not be long running or re-entrant with the client.
// on close, the publication CAN linger (in case a client goes away, and then comes back)
// AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param)
val publication = aeronDriver.addPublicationWithRetry(publicationUri, streamId)
val subscription = aeronDriver.addSubscriptionWithRetry(subscriptionUri, streamId)
if (tempPublication == null) {
// Create a publication at the given address and port, using the given stream ID.
// Note: The Aeron.addPublication method will block until the Media Driver acknowledges the request or a timeout occurs.
val publicationUri = uri()
.endpoint("$aeronAddressString:$publicationPort")
logger.trace("client pub URI: $ip ${publicationUri.build()}")
tempPublication = aeronDriver.addPublicationWithRetry(publicationUri, streamId)
}
if (tempSubscription == null) {
// Create a subscription with a control port (for dynamic MDC) at the given address and port, using the given stream ID.
val subscriptionUri = uri()
.controlEndpoint("$aeronAddressString:$subscriptionPort")
.controlMode("dynamic")
logger.trace("client sub URI: $ip ${subscriptionUri.build()}")
tempSubscription = aeronDriver.addSubscriptionWithRetry(subscriptionUri, streamId)
}
val publication = tempPublication!!
val subscription = tempSubscription!!
// this will wait for the server to acknowledge the connection (all via aeron)
@ -119,8 +133,8 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress,
if (!success) {
subscription.close()
val ex = ClientTimedOutException("Cannot create subscription: $ip ${subscriptionUri.build()} in ${timoutInNanos}ms")
ListenerManager.cleanStackTraceInternal(ex)
val ex = ClientTimedOutException("Cannot create subscription to $ip in $connectionTimeoutSec seconds")
ListenerManager.cleanAllStackTrace(ex)
throw ex
}
@ -142,14 +156,17 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress,
subscription.close()
publication.close()
val ex = ClientTimedOutException("Cannot create publication: $ip ${publicationUri.build()} in ${timoutInNanos}ms")
ListenerManager.cleanStackTrace(ex)
val ex = ClientTimedOutException("Cannot create publication to $ip in $connectionTimeoutSec seconds")
ListenerManager.cleanAllStackTrace(ex)
throw ex
}
this.success = true
this.subscription = subscription
this.publication = publication
this.subscription = subscription
this.tempPublication = null
this.tempSubscription = null
}
override val clientInfo: String by lazy {
@ -161,11 +178,11 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress,
}
override fun buildServer(aeronDriver: AeronDriver, logger: KLogger, pairConnection: Boolean) {
throw ClientException("Server info not implemented in Client MDC")
throw ClientException("Server info not implemented in Client MediaDriver Connection")
}
override val serverInfo: String
get() {
throw ClientException("Server info not implemented in Client MDC")
throw ClientException("Server info not implemented in Client MediaDriver Connection")
}
override fun toString(): String {

View File

@ -1,25 +0,0 @@
/*
* Copyright 2021 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.aeron
/**
 * Intermediate abstract type between [MediaDriverConnection] and the UDP client/server connection classes.
 *
 * It forwards every constructor argument to [MediaDriverConnection] unchanged and declares no members of its own.
 */
abstract class UdpMediaDriverConnection(
    publicationPort: Int,
    subscriptionPort: Int,
    streamId: Int,
    sessionId: Int,
    connectionTimeoutSec: Int,
    isReliable: Boolean
) : MediaDriverConnection(publicationPort, subscriptionPort, streamId, sessionId, connectionTimeoutSec, isReliable)

View File

@ -36,7 +36,7 @@ internal open class UdpMediaDriverServerConnection(val listenAddress: InetAddres
sessionId: Int,
connectionTimeoutSec: Int,
isReliable: Boolean = true) :
UdpMediaDriverConnection(publicationPort, subscriptionPort, streamId, sessionId, connectionTimeoutSec, isReliable) {
MediaDriverConnection(publicationPort, subscriptionPort, streamId, sessionId, connectionTimeoutSec, isReliable) {
private fun aeronConnectionString(ipAddress: InetAddress): String {
return if (ipAddress is Inet4Address) {
@ -63,7 +63,7 @@ internal open class UdpMediaDriverServerConnection(val listenAddress: InetAddres
@Suppress("DuplicatedCode")
override suspend fun buildClient(aeronDriver: AeronDriver, logger: KLogger) {
throw ServerException("Client info not implemented in Server MDC")
throw ServerException("Client info not implemented in Server MediaDriver Connection")
}
override fun buildServer(aeronDriver: AeronDriver, logger: KLogger, pairConnection: Boolean) {
@ -104,7 +104,7 @@ internal open class UdpMediaDriverServerConnection(val listenAddress: InetAddres
override val clientInfo: String
get() {
throw ServerException("Client info not implemented in Server MDC")
throw ServerException("Client info not implemented in Server MediaDriver Connection")
}
override val serverInfo: String by lazy {

View File

@ -17,7 +17,6 @@ package dorkbox.network.connection
import dorkbox.network.aeron.IpcMediaDriverConnection
import dorkbox.network.aeron.UdpMediaDriverClientConnection
import dorkbox.network.aeron.UdpMediaDriverConnection
import dorkbox.network.aeron.UdpMediaDriverPairedConnection
import dorkbox.network.handshake.ConnectionCounts
import dorkbox.network.handshake.RandomIdAllocator
@ -78,7 +77,7 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
/**
* @return true if this connection is a network connection
*/
val isNetwork = connectionParameters.mediaDriverConnection is UdpMediaDriverConnection
val isNetwork = !isIpc
/**
* the endpoint associated with this connection