Fixed "fast" reconnect issues, where sometimes the media driver would be disconnected from the aeron driver

This commit is contained in:
Robinson 2021-04-27 13:57:47 +02:00
parent 9e0bc0da3e
commit f53e705432
14 changed files with 656 additions and 533 deletions

View File

@ -18,7 +18,7 @@ package dorkbox.network
import dorkbox.netUtil.IP
import dorkbox.netUtil.IPv4
import dorkbox.netUtil.IPv6
import dorkbox.network.aeron.AeronConfig
import dorkbox.network.aeron.AeronDriver
import dorkbox.network.aeron.IpcMediaDriverConnection
import dorkbox.network.aeron.UdpMediaDriverClientConnection
import dorkbox.network.connection.*
@ -170,8 +170,8 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
connectionTimeoutMS: Long = 30_000L, reliable: Boolean = true) {
// Default IPC ports are flipped because they are in the perspective of the SERVER
connect(remoteAddress = remoteAddress,
ipcPublicationId = AeronConfig.IPC_HANDSHAKE_STREAM_ID_SUB,
ipcSubscriptionId = AeronConfig.IPC_HANDSHAKE_STREAM_ID_PUB,
ipcPublicationId = AeronDriver.IPC_HANDSHAKE_STREAM_ID_SUB,
ipcSubscriptionId = AeronDriver.IPC_HANDSHAKE_STREAM_ID_PUB,
connectionTimeoutMS = connectionTimeoutMS,
reliable = reliable)
}
@ -188,8 +188,8 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
* @throws ClientRejectedException if the client connection is rejected
*/
@Suppress("DuplicatedCode")
suspend fun connect(ipcPublicationId: Int = AeronConfig.IPC_HANDSHAKE_STREAM_ID_SUB,
ipcSubscriptionId: Int = AeronConfig.IPC_HANDSHAKE_STREAM_ID_PUB,
suspend fun connect(ipcPublicationId: Int = AeronDriver.IPC_HANDSHAKE_STREAM_ID_SUB,
ipcSubscriptionId: Int = AeronDriver.IPC_HANDSHAKE_STREAM_ID_PUB,
connectionTimeoutMS: Long = 30_000L) {
// Default IPC ports are flipped because they are in the perspective of the SERVER
@ -229,8 +229,8 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
@Suppress("DuplicatedCode")
private suspend fun connect(remoteAddress: InetAddress? = null,
// Default IPC ports are flipped because they are in the perspective of the SERVER
ipcPublicationId: Int = AeronConfig.IPC_HANDSHAKE_STREAM_ID_SUB,
ipcSubscriptionId: Int = AeronConfig.IPC_HANDSHAKE_STREAM_ID_PUB,
ipcPublicationId: Int = AeronDriver.IPC_HANDSHAKE_STREAM_ID_SUB,
ipcSubscriptionId: Int = AeronDriver.IPC_HANDSHAKE_STREAM_ID_PUB,
connectionTimeoutMS: Long = 30_000L, reliable: Boolean = true) {
require(connectionTimeoutMS >= 0) { "connectionTimeoutMS '$connectionTimeoutMS' is invalid. It must be >0" }
@ -250,7 +250,7 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
connection0 = null
// we are done with initial configuration, now initialize aeron and the general state of this endpoint
val aeron = initEndpointState()
initEndpointState()
// only try to connect via IPv4 if we have a network interface that supports it!
if (remoteAddress is Inet4Address && !IPv4.isAvailable) {
@ -271,7 +271,7 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
var isUsingIPC = false
val canUseIPC = config.enableIpc && remoteAddress == null
val autoChangeToIpc = canUseIPC && config.enableIpcForLoopback &&
remoteAddress != null && remoteAddress.isLoopbackAddress && isRunning(mediaDriverContext)
remoteAddress != null && remoteAddress.isLoopbackAddress && aeronDriver.isRunning()
if (autoChangeToIpc) {
logger.info {"IPC for loopback enabled and aeron is already running. Auto-changing network connection from ${IP.toString(remoteAddress!!)} -> IPC" }
}
@ -282,11 +282,11 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
// MAYBE the server doesn't have IPC enabled? If no, we need to connect via UDP instead
val ipcConnection = IpcMediaDriverConnection(streamIdSubscription = ipcSubscriptionId,
streamId = ipcPublicationId,
sessionId = AeronConfig.RESERVED_SESSION_ID_INVALID)
sessionId = AeronDriver.RESERVED_SESSION_ID_INVALID)
// throws a ConnectTimedOutException if the client cannot connect for any reason to the server handshake ports
try {
ipcConnection.buildClient(aeron, logger)
ipcConnection.buildClient(aeronDriver, logger)
isUsingIPC = true
} catch (e: Exception) {
// if we specified that we want to use IPC, then we have to throw the timeout exception, because there is no IPC
@ -305,13 +305,13 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
address = this.remoteAddress0!!,
publicationPort = config.subscriptionPort,
subscriptionPort = config.publicationPort,
streamId = AeronConfig.UDP_HANDSHAKE_STREAM_ID,
sessionId = AeronConfig.RESERVED_SESSION_ID_INVALID,
streamId = AeronDriver.UDP_HANDSHAKE_STREAM_ID,
sessionId = AeronDriver.RESERVED_SESSION_ID_INVALID,
connectionTimeoutMS = connectionTimeoutMS,
isReliable = reliable)
// throws a ConnectTimedOutException if the client cannot connect for any reason to the server handshake ports
udpConnection.buildClient(aeron, logger)
udpConnection.buildClient(aeronDriver, logger)
udpConnection
}
}
@ -320,13 +320,13 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
address = this.remoteAddress0!!,
publicationPort = config.subscriptionPort,
subscriptionPort = config.publicationPort,
streamId = AeronConfig.UDP_HANDSHAKE_STREAM_ID,
sessionId = AeronConfig.RESERVED_SESSION_ID_INVALID,
streamId = AeronDriver.UDP_HANDSHAKE_STREAM_ID,
sessionId = AeronDriver.RESERVED_SESSION_ID_INVALID,
connectionTimeoutMS = connectionTimeoutMS,
isReliable = reliable)
// throws a ConnectTimedOutException if the client cannot connect for any reason to the server handshake ports
test.buildClient(aeron, logger)
test.buildClient(aeronDriver, logger)
test
}
@ -381,7 +381,7 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
}
// we have to construct how the connection will communicate!
clientConnection.buildClient(aeron, logger)
clientConnection.buildClient(aeronDriver, logger)
// only the client connects to the server, so here we have to connect. The server (when creating the new "connection" object)
// does not need to do anything
@ -424,6 +424,7 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
if (!permitConnection) {
handshakeConnection.close()
val exception = ClientRejectedException("Connection to ${IP.toString(remoteAddress)} was not permitted!")
ListenerManager.cleanStackTrace(exception)
listenerManager.notifyError(exception)
throw exception
}

View File

@ -17,7 +17,7 @@ package dorkbox.network
import dorkbox.netUtil.IPv4
import dorkbox.netUtil.IPv6
import dorkbox.network.aeron.AeronConfig
import dorkbox.network.aeron.AeronDriver
import dorkbox.network.aeron.CoroutineBackoffIdleStrategy
import dorkbox.network.aeron.CoroutineIdleStrategy
import dorkbox.network.aeron.CoroutineSleepingMillisIdleStrategy
@ -66,7 +66,7 @@ class ServerConfiguration : dorkbox.network.Configuration() {
/**
* The IPC Publication ID is used to define what ID the server will send data on. The client IPC subscription ID must match this value.
*/
var ipcPublicationId = AeronConfig.IPC_HANDSHAKE_STREAM_ID_PUB
var ipcPublicationId = AeronDriver.IPC_HANDSHAKE_STREAM_ID_PUB
set(value) {
require(context == null) { errorMessage }
field = value
@ -75,7 +75,7 @@ class ServerConfiguration : dorkbox.network.Configuration() {
/**
* The IPC Subscription ID is used to define what ID the server will receive data on. The client IPC publication ID must match this value.
*/
var ipcSubscriptionId = AeronConfig.IPC_HANDSHAKE_STREAM_ID_SUB
var ipcSubscriptionId = AeronDriver.IPC_HANDSHAKE_STREAM_ID_SUB
set(value) {
require(context == null) { errorMessage }
field = value

View File

@ -18,7 +18,7 @@ package dorkbox.network
import dorkbox.netUtil.IP
import dorkbox.netUtil.IPv4
import dorkbox.netUtil.IPv6
import dorkbox.network.aeron.AeronConfig
import dorkbox.network.aeron.AeronDriver
import dorkbox.network.aeron.AeronPoller
import dorkbox.network.aeron.IpcMediaDriverConnection
import dorkbox.network.aeron.UdpMediaDriverServerConnection
@ -35,7 +35,6 @@ import dorkbox.network.rmi.RemoteObject
import dorkbox.network.rmi.RemoteObjectStorage
import dorkbox.network.rmi.RmiManagerConnections
import dorkbox.network.rmi.TimeoutException
import io.aeron.Aeron
import io.aeron.FragmentAssembler
import io.aeron.Image
import io.aeron.logbuffer.Header
@ -71,12 +70,12 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
*/
fun isRunning(configuration: ServerConfiguration): Boolean {
if (configuration.context == null) {
AeronConfig.createContext(configuration)
AeronDriver.createContext(configuration)
}
require(configuration.context != null) { "Configuration context cannot be properly created. Unable to continue!" }
return AeronConfig.isRunning(configuration.context!!)
return AeronDriver.isRunning(configuration.context!!)
}
init {
@ -160,12 +159,13 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
return super.getRmiConnectionSupport()
}
private suspend fun getIpcPoller(aeron: Aeron, config: ServerConfiguration): AeronPoller {
private suspend fun getIpcPoller(aeronDriver: AeronDriver, config: ServerConfiguration): AeronPoller {
val poller = if (config.enableIpc) {
val driver = IpcMediaDriverConnection(streamIdSubscription = config.ipcSubscriptionId,
streamId = config.ipcPublicationId,
sessionId = AeronConfig.RESERVED_SESSION_ID_INVALID)
driver.buildServer(aeron, logger)
sessionId = AeronDriver.RESERVED_SESSION_ID_INVALID)
driver.buildServer(aeronDriver, logger)
val publication = driver.publication
val subscription = driver.subscription
@ -193,7 +193,7 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
publication,
sessionId,
message,
aeron)
aeronDriver)
}
override fun poll(): Int { return subscription.poll(handler, 1) }
@ -213,17 +213,18 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
}
@Suppress("DuplicatedCode")
private suspend fun getIpv4Poller(aeron: Aeron, config: ServerConfiguration): AeronPoller {
private suspend fun getIpv4Poller(aeronDriver: AeronDriver, config: ServerConfiguration): AeronPoller {
val poller = if (canUseIPv4) {
val driver = UdpMediaDriverServerConnection(
listenAddress = listenIPv4Address!!,
publicationPort = config.publicationPort,
subscriptionPort = config.subscriptionPort,
streamId = AeronConfig.UDP_HANDSHAKE_STREAM_ID,
sessionId = AeronConfig.RESERVED_SESSION_ID_INVALID,
streamId = AeronDriver.UDP_HANDSHAKE_STREAM_ID,
sessionId = AeronDriver.RESERVED_SESSION_ID_INVALID,
connectionTimeoutMS = TimeUnit.SECONDS.toMillis(config.connectionCloseTimeoutInSeconds.toLong()))
driver.buildServer(aeron, logger)
driver.buildServer(aeronDriver, logger)
val publication = driver.publication
val subscription = driver.subscription
@ -274,7 +275,7 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
clientAddressString,
clientAddress,
message,
aeron,
aeronDriver,
false)
}
@ -295,17 +296,18 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
}
@Suppress("DuplicatedCode")
private suspend fun getIpv6Poller(aeron: Aeron, config: ServerConfiguration): AeronPoller {
private suspend fun getIpv6Poller(aeronDriver: AeronDriver, config: ServerConfiguration): AeronPoller {
val poller = if (canUseIPv6) {
val driver = UdpMediaDriverServerConnection(
listenAddress = listenIPv6Address!!,
publicationPort = config.publicationPort,
subscriptionPort = config.subscriptionPort,
streamId = AeronConfig.UDP_HANDSHAKE_STREAM_ID,
sessionId = AeronConfig.RESERVED_SESSION_ID_INVALID,
streamId = AeronDriver.UDP_HANDSHAKE_STREAM_ID,
sessionId = AeronDriver.RESERVED_SESSION_ID_INVALID,
connectionTimeoutMS = TimeUnit.SECONDS.toMillis(config.connectionCloseTimeoutInSeconds.toLong()))
driver.buildServer(aeron, logger)
driver.buildServer(aeronDriver, logger)
val publication = driver.publication
val subscription = driver.subscription
@ -356,7 +358,7 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
clientAddressString,
clientAddress,
message,
aeron,
aeronDriver,
false)
}
@ -377,16 +379,17 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
}
@Suppress("DuplicatedCode")
private suspend fun getIpv6WildcardPoller(aeron: Aeron, config: ServerConfiguration): AeronPoller {
private suspend fun getIpv6WildcardPoller(aeronDriver: AeronDriver, config: ServerConfiguration): AeronPoller {
val driver = UdpMediaDriverServerConnection(
listenAddress = listenIPv6Address!!,
publicationPort = config.publicationPort,
subscriptionPort = config.subscriptionPort,
streamId = AeronConfig.UDP_HANDSHAKE_STREAM_ID,
sessionId = AeronConfig.RESERVED_SESSION_ID_INVALID,
streamId = AeronDriver.UDP_HANDSHAKE_STREAM_ID,
sessionId = AeronDriver.RESERVED_SESSION_ID_INVALID,
connectionTimeoutMS = TimeUnit.SECONDS.toMillis(config.connectionCloseTimeoutInSeconds.toLong()))
driver.buildServer(aeron, logger)
driver.buildServer(aeronDriver, logger)
val publication = driver.publication
val subscription = driver.subscription
@ -438,7 +441,7 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
clientAddressString,
clientAddress,
message,
aeron,
aeronDriver,
true)
}
@ -461,17 +464,16 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
return
}
val aeron = initEndpointState()
initEndpointState()
config as ServerConfiguration
// we are done with initial configuration, now initialize aeron and the general state of this endpoint
bindAlreadyCalled = true
val waiter = SuspendWaiter()
actionDispatch.launch {
val ipcPoller: AeronPoller = getIpcPoller(aeron, config)
val ipcPoller: AeronPoller = getIpcPoller(aeronDriver, config)
// if we are binding to WILDCARD, then we have to do something special if BOTH IPv4 and IPv6 are enabled!
val isWildcard = listenIPv4Address == IPv4.WILDCARD || listenIPv6Address != IPv6.WILDCARD
@ -486,15 +488,15 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
override fun close() {}
override fun serverInfo(): String { return "IPv4 Disabled" }
}
ipv6Poller = getIpv6WildcardPoller(aeron, config)
ipv6Poller = getIpv6WildcardPoller(aeronDriver, config)
} else {
// only 1 will be a real poller
ipv4Poller = getIpv4Poller(aeron, config)
ipv6Poller = getIpv6Poller(aeron, config)
ipv4Poller = getIpv4Poller(aeronDriver, config)
ipv6Poller = getIpv6Poller(aeronDriver, config)
}
} else {
ipv4Poller = getIpv4Poller(aeron, config)
ipv6Poller = getIpv6Poller(aeron, config)
ipv4Poller = getIpv4Poller(aeronDriver, config)
ipv6Poller = getIpv6Poller(aeronDriver, config)
}
waiter.doNotify()

View File

@ -1,314 +0,0 @@
package dorkbox.network.aeron
import dorkbox.network.Configuration
import dorkbox.util.NamedThreadFactory
import io.aeron.driver.MediaDriver
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import mu.KLogger
import mu.KotlinLogging
import java.io.File
/**
*
*/
object AeronConfig {
/**
* Identifier for invalid sessions. This must be < RESERVED_SESSION_ID_LOW
*/
internal const val RESERVED_SESSION_ID_INVALID = 0
/**
* The inclusive lower bound of the reserved sessions range. THIS SHOULD NEVER BE <= 0!
*/
internal const val RESERVED_SESSION_ID_LOW = 1
/**
* The inclusive upper bound of the reserved sessions range.
*/
internal const val RESERVED_SESSION_ID_HIGH = Integer.MAX_VALUE
/**
* Stream ID used by UDP handshake connections.
*/
const val UDP_HANDSHAKE_STREAM_ID: Int = 0x1337cafe
/**
* Default IPC handshake stream ID that the SERVER publishes on. A client uses this value as its default IPC
* subscription ID (the IDs are named from the perspective of the server).
*/
const val IPC_HANDSHAKE_STREAM_ID_PUB: Int = 0x1337c0de
/**
* Default IPC handshake stream ID that the SERVER subscribes to. A client uses this value as its default IPC
* publication ID (the IDs are named from the perspective of the server).
*/
const val IPC_HANDSHAKE_STREAM_ID_SUB: Int = 0x1337c0d3
// on close, the publication CAN linger (in case a client goes away, and then comes back)
// AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param)
private const val AERON_PUBLICATION_LINGER_TIMEOUT = 5_000L // in MS
/**
 * Builds a new [MediaDriver.Context] from [config]. `concludeAeronDirectory()` is NOT called here; that is the caller's job.
 *
 * Side effects on [config]:
 *  - a `receiveBufferSize` / `sendBufferSize` of 0 is replaced with the matching Aeron socket-buffer default
 *  - a `null` `aeronDirectory` is replaced with an `aeron` directory inside `config.suggestAeronLogLocation(logger)`
 *
 * @param config the endpoint configuration to read from (and to back-fill defaults into)
 * @param logger used when suggesting the aeron directory, and by the error handler installed on the context
 *
 * @return the newly created media driver context
 */
private fun create(config: Configuration, logger: KLogger): MediaDriver.Context {
    /*
     * Linux
     * Linux normally requires some settings of sysctl values. One is net.core.rmem_max to allow larger SO_RCVBUF and
     * net.core.wmem_max to allow larger SO_SNDBUF values to be set.
     *
     * Windows
     * Windows tends to use SO_SNDBUF values that are too small. It is recommended to use values more like 1MB or so.
     *
     * Mac/Darwin
     *
     * Mac tends to use SO_SNDBUF values that are too small. It is recommended to use larger values, like 16KB.
     */
    if (config.receiveBufferSize == 0) {
        config.receiveBufferSize = io.aeron.driver.Configuration.SOCKET_RCVBUF_LENGTH_DEFAULT
        // when {
        //     OS.isLinux() ->
        //     OS.isWindows() ->
        //     OS.isMacOsX() ->
        // }
        // val rmem_max = dorkbox.network.other.NetUtil.sysctlGetInt("net.core.rmem_max")
        // val wmem_max = dorkbox.network.other.NetUtil.sysctlGetInt("net.core.wmem_max")
    }

    if (config.sendBufferSize == 0) {
        // an unset SEND buffer must be back-filled with the SEND default. Writing it to receiveBufferSize (as was done
        // before) overwrote the receive default assigned just above and left sendBufferSize untouched.
        config.sendBufferSize = io.aeron.driver.Configuration.SOCKET_SNDBUF_LENGTH_DEFAULT
        // when {
        //     OS.isLinux() ->
        //     OS.isWindows() ->
        //     OS.isMacOsX() ->
        // }
        // val rmem_max = dorkbox.network.other.NetUtil.sysctlGetInt("net.core.rmem_max")
        // val wmem_max = dorkbox.network.other.NetUtil.sysctlGetInt("net.core.wmem_max")
    }

    /*
     * Note: Since Mac OS does not have a built-in support for /dev/shm it is advised to create a RAM disk for the Aeron directory (aeron.dir).
     *
     * You can create a RAM disk with the following command:
     *
     * $ diskutil erasevolume HFS+ "DISK_NAME" `hdiutil attach -nomount ram://$((2048 * SIZE_IN_MB))`
     *
     * where:
     *
     * DISK_NAME should be replaced with a name of your choice.
     * SIZE_IN_MB is the size in megabytes for the disk (e.g. 4096 for a 4GB disk).
     *
     * For example, the following command creates a RAM disk named DevShm which is 2GB in size:
     *
     * $ diskutil erasevolume HFS+ "DevShm" `hdiutil attach -nomount ram://$((2048 * 2048))`
     *
     * After this command is executed the new disk will be mounted under /Volumes/DevShm.
     */
    if (config.aeronDirectory == null) {
        val baseFileLocation = config.suggestAeronLogLocation(logger)
        // val aeronLogDirectory = File(baseFileLocation, "aeron-" + type.simpleName)
        val aeronLogDirectory = File(baseFileLocation, "aeron")
        config.aeronDirectory = aeronLogDirectory
    }

    // LOW-LATENCY SETTINGS
    // .termBufferSparseFile(false)
    // .useWindowsHighResTimer(true)
    // .threadingMode(ThreadingMode.DEDICATED)
    // .conductorIdleStrategy(BusySpinIdleStrategy.INSTANCE)
    // .receiverIdleStrategy(NoOpIdleStrategy.INSTANCE)
    // .senderIdleStrategy(NoOpIdleStrategy.INSTANCE);
    // setProperty(DISABLE_BOUNDS_CHECKS_PROP_NAME, "true");
    // setProperty("aeron.mtu.length", "16384");
    // setProperty("aeron.socket.so_sndbuf", "2097152");
    // setProperty("aeron.socket.so_rcvbuf", "2097152");
    // setProperty("aeron.rcv.initial.window.length", "2097152");

    // driver context must happen in the initializer, because we have a Server.isRunning() method that uses the mediaDriverContext (without bind)
    val context = MediaDriver.Context()
        .publicationReservedSessionIdLow(RESERVED_SESSION_ID_LOW)
        .publicationReservedSessionIdHigh(RESERVED_SESSION_ID_HIGH)
        .threadingMode(config.threadingMode)
        .mtuLength(config.networkMtuSize)
        .socketSndbufLength(config.sendBufferSize)
        .socketRcvbufLength(config.receiveBufferSize)

    context.aeronDirectoryName(config.aeronDirectory!!.absolutePath)

    // NOTE(review): the two checks below only shrink the term buffers when the context value DIFFERS from the Aeron
    // default, yet the comments talk about the defaults being too large. Presumably `==` was intended - confirm before changing.
    if (context.ipcTermBufferLength() != io.aeron.driver.Configuration.ipcTermBufferLength()) {
        // default 64 megs each is HUGE
        context.ipcTermBufferLength(8 * 1024 * 1024)
    }

    if (context.publicationTermBufferLength() != io.aeron.driver.Configuration.termBufferLength()) {
        // default 16 megs each is HUGE (we run out of space in production w/ lots of clients)
        context.publicationTermBufferLength(2 * 1024 * 1024)
    }

    // we DO NOT want to abort the JVM if there are errors.
    context.errorHandler { error ->
        logger.error("Error in Aeron", error)
    }

    // normalize the directory name stored in the context to an absolute path
    val aeronDir = File(context.aeronDirectoryName()).absoluteFile
    context.aeronDirectoryName(aeronDir.path)

    return context
}
/**
 * Creates the Aeron Media Driver context and stores it in `config.context`.
 *
 * If the configuration already has a context, a warning is logged and nothing else happens.
 *
 * When `config.aeronDirectoryForceUnique` is set and a media driver is already active in the configured directory, sibling
 * directories named `<directory>_<random number>` are probed until one without an active driver is found. That directory
 * is then flagged to be deleted when the driver shuts down.
 *
 * @throws IllegalArgumentException if the aeron media driver directory cannot be setup, or if no unused unique directory
 *                                  was found within the retry limit
 */
fun createContext(config: Configuration, logger: KLogger = KotlinLogging.logger("AeronConfig")) {
    if (config.context != null) {
        logger.warn { "Unable to recreate a context for a configuration that has already been created" }
        return
    }

    var context = create(config, logger)

    // this happens EXACTLY once. Must be BEFORE the "isRunning" check!
    context.concludeAeronDirectory()

    // will setup the aeron directory or throw IllegalArgumentException if it cannot be configured
    val aeronDir = context.aeronDirectory()

    var isRunning = isRunning(context)

    // this is incompatible with IPC, and will not be set if IPC is enabled
    if (config.aeronDirectoryForceUnique && isRunning) {
        val savedParent = aeronDir.parentFile
        var retry = 0
        val retryMax = 100

        while (isRunning) {
            // `>=` so that exactly retryMax directories are probed, which is what the message reports
            if (retry++ >= retryMax) {
                throw IllegalArgumentException("Unable to force unique aeron Directory. Tried $retryMax times and all tries were in use.")
            }

            val randomNum = (1..retryMax).random()
            val newDir = savedParent.resolve("${aeronDir.name}_$randomNum")

            context = create(config, logger)
            context.aeronDirectoryName(newDir.path)

            // this happens EXACTLY once. Must be BEFORE the "isRunning" check!
            context.concludeAeronDirectory()

            isRunning = isRunning(context)
        }

        // The loop only finishes normally once a directory WITHOUT an active driver was found.
        //
        // NOTE: We must be *super* careful trying to delete directories, because if we have multiple AERON/MEDIA DRIVERS connected to the
        // same directory, deleting the directory will cause any other aeron connection to fail! (which makes sense).
        // since we are forcing a unique directory, we should ALSO delete it when we are done!
        //
        // The no-argument dirDeleteOnShutdown() only READS the flag; the boolean overload is required to actually set it.
        context.dirDeleteOnShutdown(true)
    }

    logger.info { "Aeron directory: '${context.aeronDirectory()}'" }

    // once we do this, we cannot change any of the config values!
    config.context = context
}
/**
 * Asks the given media driver context whether a media driver is currently active for it, waiting at most the
 * context's own driver timeout.
 *
 * @param context the (directory-concluded) media driver context to probe
 *
 * @return true if the media driver is active and running
 */
fun isRunning(context: MediaDriver.Context): Boolean {
    // if the media driver is running, it will be a quick connection. Usually 100ms or so
    val timeoutMs = context.driverTimeoutMs()

    // the callback is deliberately empty
    return context.isDriverActive(timeoutMs) { }
}
/**
 * Launches the Aeron media driver for [config], unless one is already active in the configured aeron directory.
 *
 * The configuration is validated first and, if it has no context yet, one is created. A failed launch is logged and retried
 * (up to 10 attempts in total), blocking the calling thread for the driver timeout between attempts.
 *
 * @param config the endpoint configuration to start the driver for
 * @param type used to name the thread group of the media driver threads
 * @param logger where progress and launch failures are reported
 *
 * @return the launched media driver, or `null` if a driver was already running or every launch attempt failed
 */
fun startDriver(config: Configuration,
                type: Class<*> = AeronConfig::class.java,
                logger: KLogger = KotlinLogging.logger("AeronConfig")): MediaDriver? {
    config.validate()

    if (config.context == null) {
        createContext(config, logger)
    }
    require(config.context != null) { "Configuration context cannot be properly created. Unable to continue!" }

    val driverContext = config.context!!

    // nothing to do when something else already started a driver for this directory
    if (isRunning(driverContext)) {
        logger.debug("Not starting Aeron Media driver. It was already running in '${driverContext.aeronDirectory()}'")
        return null
    }

    logger.debug("Starting Aeron Media driver in '${driverContext.aeronDirectory()}'")

    // created lazily (inside the try) and only once, then shared by every driver thread
    var factory: NamedThreadFactory? = null

    // try to start. If we start/stop too quickly, it's a problem
    var attemptsLeft = 10
    while (attemptsLeft > 0) {
        attemptsLeft--

        try {
            if (factory == null) {
                factory = NamedThreadFactory("Thread", ThreadGroup("${type.simpleName}-AeronDriver"), true)

                driverContext
                    .conductorThreadFactory(factory)
                    .receiverThreadFactory(factory)
                    .senderThreadFactory(factory)
                    .sharedNetworkThreadFactory(factory)
                    .sharedThreadFactory(factory)
            }

            return MediaDriver.launch(driverContext)
        } catch (e: Exception) {
            logger.warn(e) { "Unable to start the Aeron Media driver. Retrying $attemptsLeft more times..." }

            runBlocking {
                delay(driverContext.driverTimeoutMs())
            }
        }
    }

    return null
}
/**
 * A safer way to try to close the media driver
 *
 * NOTE: We must be *super* careful trying to delete directories, because if we have multiple AERON/MEDIA DRIVERS connected to the
 * same directory, deleting the directory will cause any other aeron connection to fail! (which makes sense).
 *
 * Closes the driver, waits for the publication linger timeout, then polls (at most 10 times, one driver timeout apart)
 * until the driver reports that it is no longer running. Errors while closing are logged, not rethrown.
 *
 * @param mediaDriver the driver to stop, or `null` if this instance never started one (in which case nothing happens)
 * @param logger where progress and errors are reported
 */
internal suspend fun stopDriver(mediaDriver: MediaDriver?, logger: KLogger = KotlinLogging.logger("AeronConfig")) {
    if (mediaDriver == null) {
        logger.debug { "No driver started for this instance. Not Stopping." }
        return
    }

    val context = mediaDriver.context()
    logger.debug("Stopping driver at '${context.aeronDirectory()}'...")

    if (!isRunning(context)) {
        // not running
        logger.debug { "Driver is not running at '${context.aeronDirectory()}' for this context. Not Stopping." }
        return
    }

    try {
        mediaDriver.close()

        // on close, the publication CAN linger (in case a client goes away, and then comes back)
        // AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param)
        delay(AERON_PUBLICATION_LINGER_TIMEOUT)

        // wait for the media driver to actually stop.
        // `> 0` (not `>= 0`): exactly 10 waits, and the logged remaining count never goes negative
        var count = 10
        while (count-- > 0 && isRunning(context)) {
            logger.warn { "Aeron Media driver at '${context.aeronDirectory()}' is still running. Waiting for it to stop. Trying $count more times." }
            delay(context.driverTimeoutMs())
        }
    } catch (e: Exception) {
        logger.error("Error closing the media driver at '${context.aeronDirectory()}'", e)
    }

    // only a NamedThreadFactory exposes the thread group to clean up. A safe cast keeps an already-closed driver from
    // failing here with a ClassCastException when a different factory type was configured on the context
    (context.sharedThreadFactory() as? NamedThreadFactory)?.group?.destroy()

    logger.debug { "Closed the media driver at '${context.aeronDirectory()}'" }
}
}

View File

@ -0,0 +1,489 @@
package dorkbox.network.aeron
import dorkbox.network.Configuration
import dorkbox.network.connection.ListenerManager
import dorkbox.util.NamedThreadFactory
import io.aeron.Aeron
import io.aeron.ChannelUriStringBuilder
import io.aeron.Publication
import io.aeron.Subscription
import io.aeron.driver.MediaDriver
import io.aeron.exceptions.DriverTimeoutException
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import mu.KLogger
import mu.KotlinLogging
import org.agrona.concurrent.BackoffIdleStrategy
import java.io.File
/**
* Class for managing the Aeron+Media drivers
*/
class AeronDriver(val config: Configuration,
val type: Class<*> = AeronDriver::class.java,
val logger: KLogger = KotlinLogging.logger("AeronConfig")) {
companion object {
/**
* Identifier for invalid sessions. This must be < RESERVED_SESSION_ID_LOW
*/
internal const val RESERVED_SESSION_ID_INVALID = 0
/**
* The inclusive lower bound of the reserved sessions range. THIS SHOULD NEVER BE <= 0!
*/
internal const val RESERVED_SESSION_ID_LOW = 1
/**
* The inclusive upper bound of the reserved sessions range.
*/
internal const val RESERVED_SESSION_ID_HIGH = Integer.MAX_VALUE
/**
* Stream ID used by UDP handshake connections.
*/
const val UDP_HANDSHAKE_STREAM_ID: Int = 0x1337cafe
/**
* Default IPC handshake stream ID that the SERVER publishes on. A client uses this value as its default IPC
* subscription ID (the IDs are named from the perspective of the server).
*/
const val IPC_HANDSHAKE_STREAM_ID_PUB: Int = 0x1337c0de
/**
* Default IPC handshake stream ID that the SERVER subscribes to. A client uses this value as its default IPC
* publication ID (the IDs are named from the perspective of the server).
*/
const val IPC_HANDSHAKE_STREAM_ID_SUB: Int = 0x1337c0d3
// on close, the publication CAN linger (in case a client goes away, and then comes back)
// AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param)
private const val AERON_PUBLICATION_LINGER_TIMEOUT = 5_000L // in MS
/**
 * Builds a new [MediaDriver.Context] from [config]. `concludeAeronDirectory()` is NOT called here; that is the caller's job.
 *
 * Side effects on [config]:
 *  - a `receiveBufferSize` / `sendBufferSize` of 0 is replaced with the matching Aeron socket-buffer default
 *  - a `null` `aeronDirectory` is replaced with an `aeron` directory inside `config.suggestAeronLogLocation(logger)`
 *
 * @param config the endpoint configuration to read from (and to back-fill defaults into)
 * @param logger used when suggesting the aeron directory, and by the error handler installed on the context
 *
 * @return the newly created media driver context
 */
private fun create(config: Configuration, logger: KLogger): MediaDriver.Context {
    /*
     * Linux
     * Linux normally requires some settings of sysctl values. One is net.core.rmem_max to allow larger SO_RCVBUF and
     * net.core.wmem_max to allow larger SO_SNDBUF values to be set.
     *
     * Windows
     * Windows tends to use SO_SNDBUF values that are too small. It is recommended to use values more like 1MB or so.
     *
     * Mac/Darwin
     *
     * Mac tends to use SO_SNDBUF values that are too small. It is recommended to use larger values, like 16KB.
     */
    if (config.receiveBufferSize == 0) {
        config.receiveBufferSize = io.aeron.driver.Configuration.SOCKET_RCVBUF_LENGTH_DEFAULT
        // when {
        //     OS.isLinux() ->
        //     OS.isWindows() ->
        //     OS.isMacOsX() ->
        // }
        // val rmem_max = dorkbox.network.other.NetUtil.sysctlGetInt("net.core.rmem_max")
        // val wmem_max = dorkbox.network.other.NetUtil.sysctlGetInt("net.core.wmem_max")
    }

    if (config.sendBufferSize == 0) {
        // an unset SEND buffer must be back-filled with the SEND default. Writing it to receiveBufferSize (as was done
        // before) overwrote the receive default assigned just above and left sendBufferSize untouched.
        config.sendBufferSize = io.aeron.driver.Configuration.SOCKET_SNDBUF_LENGTH_DEFAULT
        // when {
        //     OS.isLinux() ->
        //     OS.isWindows() ->
        //     OS.isMacOsX() ->
        // }
        // val rmem_max = dorkbox.network.other.NetUtil.sysctlGetInt("net.core.rmem_max")
        // val wmem_max = dorkbox.network.other.NetUtil.sysctlGetInt("net.core.wmem_max")
    }

    /*
     * Note: Since Mac OS does not have a built-in support for /dev/shm it is advised to create a RAM disk for the Aeron directory (aeron.dir).
     *
     * You can create a RAM disk with the following command:
     *
     * $ diskutil erasevolume HFS+ "DISK_NAME" `hdiutil attach -nomount ram://$((2048 * SIZE_IN_MB))`
     *
     * where:
     *
     * DISK_NAME should be replaced with a name of your choice.
     * SIZE_IN_MB is the size in megabytes for the disk (e.g. 4096 for a 4GB disk).
     *
     * For example, the following command creates a RAM disk named DevShm which is 2GB in size:
     *
     * $ diskutil erasevolume HFS+ "DevShm" `hdiutil attach -nomount ram://$((2048 * 2048))`
     *
     * After this command is executed the new disk will be mounted under /Volumes/DevShm.
     */
    if (config.aeronDirectory == null) {
        val baseFileLocation = config.suggestAeronLogLocation(logger)
        // val aeronLogDirectory = File(baseFileLocation, "aeron-" + type.simpleName)
        val aeronLogDirectory = File(baseFileLocation, "aeron")
        config.aeronDirectory = aeronLogDirectory
    }

    // LOW-LATENCY SETTINGS
    // .termBufferSparseFile(false)
    // .useWindowsHighResTimer(true)
    // .threadingMode(ThreadingMode.DEDICATED)
    // .conductorIdleStrategy(BusySpinIdleStrategy.INSTANCE)
    // .receiverIdleStrategy(NoOpIdleStrategy.INSTANCE)
    // .senderIdleStrategy(NoOpIdleStrategy.INSTANCE);
    // setProperty(DISABLE_BOUNDS_CHECKS_PROP_NAME, "true");
    // setProperty("aeron.mtu.length", "16384");
    // setProperty("aeron.socket.so_sndbuf", "2097152");
    // setProperty("aeron.socket.so_rcvbuf", "2097152");
    // setProperty("aeron.rcv.initial.window.length", "2097152");

    // driver context must happen in the initializer, because we have a Server.isRunning() method that uses the mediaDriverContext (without bind)
    val context = MediaDriver.Context()
        .publicationReservedSessionIdLow(RESERVED_SESSION_ID_LOW)
        .publicationReservedSessionIdHigh(RESERVED_SESSION_ID_HIGH)
        .threadingMode(config.threadingMode)
        .mtuLength(config.networkMtuSize)
        .socketSndbufLength(config.sendBufferSize)
        .socketRcvbufLength(config.receiveBufferSize)

    context.aeronDirectoryName(config.aeronDirectory!!.absolutePath)

    // NOTE(review): the two checks below only shrink the term buffers when the context value DIFFERS from the Aeron
    // default, yet the comments talk about the defaults being too large. Presumably `==` was intended - confirm before changing.
    if (context.ipcTermBufferLength() != io.aeron.driver.Configuration.ipcTermBufferLength()) {
        // default 64 megs each is HUGE
        context.ipcTermBufferLength(8 * 1024 * 1024)
    }

    if (context.publicationTermBufferLength() != io.aeron.driver.Configuration.termBufferLength()) {
        // default 16 megs each is HUGE (we run out of space in production w/ lots of clients)
        context.publicationTermBufferLength(2 * 1024 * 1024)
    }

    // we DO NOT want to abort the JVM if there are errors.
    context.errorHandler { error ->
        logger.error("Error in Aeron", error)
    }

    // normalize the directory name stored in the context to an absolute path
    val aeronDir = File(context.aeronDirectoryName()).absoluteFile
    context.aeronDirectoryName(aeronDir.path)

    return context
}
/**
 * Creates the Aeron Media Driver context and stores it in `config.context`.
 *
 * If the configuration already has a context, a warning is logged and nothing else happens.
 *
 * When `config.aeronDirectoryForceUnique` is set and a media driver is already active in the configured directory, sibling
 * directories named `<directory>_<random number>` are probed until one without an active driver is found. That directory
 * is then flagged to be deleted when the driver shuts down.
 *
 * @throws IllegalArgumentException if the aeron media driver directory cannot be setup, or if no unused unique directory
 *                                  was found within the retry limit
 */
fun createContext(config: Configuration, logger: KLogger = KotlinLogging.logger("AeronConfig")) {
    if (config.context != null) {
        logger.warn { "Unable to recreate a context for a configuration that has already been created" }
        return
    }

    var context = create(config, logger)

    // this happens EXACTLY once. Must be BEFORE the "isRunning" check!
    context.concludeAeronDirectory()

    // will setup the aeron directory or throw IllegalArgumentException if it cannot be configured
    val aeronDir = context.aeronDirectory()

    var isRunning = isRunning(context)

    // this is incompatible with IPC, and will not be set if IPC is enabled
    if (config.aeronDirectoryForceUnique && isRunning) {
        val savedParent = aeronDir.parentFile
        var retry = 0
        val retryMax = 100

        while (isRunning) {
            // `>=` so that exactly retryMax directories are probed, which is what the message reports
            if (retry++ >= retryMax) {
                throw IllegalArgumentException("Unable to force unique aeron Directory. Tried $retryMax times and all tries were in use.")
            }

            val randomNum = (1..retryMax).random()
            val newDir = savedParent.resolve("${aeronDir.name}_$randomNum")

            context = create(config, logger)
            context.aeronDirectoryName(newDir.path)

            // this happens EXACTLY once. Must be BEFORE the "isRunning" check!
            context.concludeAeronDirectory()

            isRunning = isRunning(context)
        }

        // The loop only finishes normally once a directory WITHOUT an active driver was found.
        //
        // NOTE: We must be *super* careful trying to delete directories, because if we have multiple AERON/MEDIA DRIVERS connected to the
        // same directory, deleting the directory will cause any other aeron connection to fail! (which makes sense).
        // since we are forcing a unique directory, we should ALSO delete it when we are done!
        //
        // The no-argument dirDeleteOnShutdown() only READS the flag; the boolean overload is required to actually set it.
        context.dirDeleteOnShutdown(true)
    }

    logger.info { "Aeron directory: '${context.aeronDirectory()}'" }

    // once we do this, we cannot change any of the config values!
    config.context = context
}
/**
 * Checks to see if a media driver is active for the aeron directory of the specified context.
 *
 * @param context the media driver context to check
 * @param timeout the driver timeout (in milliseconds) handed to `isDriverActive`. Defaults to the driver timeout of the context
 *
 * @return true if the media driver is active and running
 */
fun isRunning(context: MediaDriver.Context, timeout: Long = context.driverTimeoutMs()): Boolean {
    // if the media driver is running, it will be a quick connection. Usually 100ms or so
    // NOTE: the caller supplied timeout must be honored here, it used to be silently ignored
    return context.isDriverActive(timeout) { }
}
/**
 * Validates the configuration and makes sure that it has a media driver context, creating one when it is missing.
 * This can only happen once!
 *
 * @param config the configuration to validate
 * @param logger the logger used if a context must be created
 *
 * @throws IllegalArgumentException if the configuration still has no context afterwards
 */
fun validateConfig(config: Configuration, logger: KLogger = KotlinLogging.logger("AeronConfig")) {
    config.validate()

    val contextMissing = config.context == null
    if (contextMissing) {
        createContext(config, logger)
    }

    requireNotNull(config.context) { "Configuration context cannot be properly created. Unable to continue!" }
}
}
// the aeron client. It is null until startAeron() has connected
private var aeron: Aeron? = null

// only set when THIS instance was the one that launched the media driver
@Volatile
private var mediaDriver: MediaDriver? = null

private val threadFactory = NamedThreadFactory("Thread", ThreadGroup("${type.simpleName}-AeronDriver"), true)

// the context is validated before the AeronDriver object is created
private val mediaDriverContext = config.context!!

init {
    // every thread of the media driver is created by the same thread factory
    val driverContext = mediaDriverContext
    driverContext.conductorThreadFactory(threadFactory)
    driverContext.receiverThreadFactory(threadFactory)
    driverContext.senderThreadFactory(threadFactory)
    driverContext.sharedNetworkThreadFactory(threadFactory)
    driverContext.sharedThreadFactory(threadFactory)
}
/**
 * Builds a new aeron client context that uses the same aeron directory as the media driver context.
 *
 * @return a freshly configured context, to be used for a single `Aeron.connect()` call
 */
private fun setupAeron(): Aeron.Context {
    val clientContext = Aeron.Context()

    // the client MUST use the same directory as the media driver
    clientContext.aeronDirectoryName(mediaDriverContext.aeronDirectory().path)
    clientContext.concludeAeronDirectory()

    clientContext.threadFactory(threadFactory)
    clientContext.idleStrategy(BackoffIdleStrategy())

    // we DO NOT want to abort the JVM if there are errors.
    // this replaces the default handler with one that doesn't abort the JVM
    clientContext.errorHandler { throwable ->
        ListenerManager.cleanStackTrace(throwable)
        logger.error("Error in Aeron", throwable)
    }

    return clientContext
}
/**
 * Launches the media driver, unless this instance already launched one or a media driver is already active for the aeron directory.
 *
 * Launching is attempted up to 10 times, waiting for the driver timeout after every failed attempt.
 *
 * @return true only if this call launched the media driver
 */
private fun startDriver(): Boolean {
    // only start if we didn't already start... There will be several checks.
    if (mediaDriver != null) {
        return false
    }

    if (isRunning(mediaDriverContext)) {
        logger.debug("Not starting Aeron Media driver. It was already running in '${mediaDriverContext.aeronDirectory()}'")
        return false
    }

    logger.debug("Starting Aeron Media driver in '${mediaDriverContext.aeronDirectory()}'")

    // try to start. If we start/stop too quickly, it's a problem
    var remaining = 10
    while (remaining > 0) {
        remaining--

        try {
            mediaDriver = MediaDriver.launch(mediaDriverContext)
            return true
        } catch (e: Exception) {
            logger.warn(e) { "Unable to start the Aeron Media driver. Retrying $remaining more times..." }

            // this is not a suspending function, so the wait has to block the calling thread
            runBlocking {
                delay(mediaDriverContext.driverTimeoutMs())
            }
        }
    }

    return false
}
/**
 * Connects the aeron client to the media driver, when it is necessary.
 *
 * A (re)connect happens when the media driver was just launched by this instance, or when there is no aeron client yet.
 *
 * @param didStartDriver true if the media driver was launched by the preceding `startDriver()` call
 */
private fun startAeron(didStartDriver: Boolean) {
    // the media driver MIGHT already be started in a different process! We still ALWAYS want to connect to
    // aeron (which connects to the other media driver process), especially if we haven't already connected to
    // it (or if there was an error connecting because a different media driver was shutting down)
    if (didStartDriver || aeron == null) {
        aeron?.close()

        // never keep a reference to a CLOSED client. If the connect below throws, the next call to start() must see
        // "no client" and try to connect again, instead of believing that a usable client still exists.
        aeron = null

        // this might succeed if we can connect to the media driver
        aeron = Aeron.connect(setupAeron())
    }
}
/**
 * Launches the media driver (only if one is not already running), and then connects the aeron client to it when necessary.
 *
 * @throws Exception if there is a problem starting the media driver
 */
fun start() {
    // the aeron client has to know if the media driver was freshly launched, because then it must reconnect
    startAeron(startDriver())
}
/**
 * Resolves `publications/ID.logbuffer` inside of the aeron directory, where ID is the publication registration id.
 *
 * @param publicationRegId the registration id of the publication
 *
 * @return the aeron media driver log file for a specific publication. This should be removed when a publication is closed (but is not always!)
 */
fun getMediaDriverPublicationFile(publicationRegId: Long): File {
    val publicationsDir = mediaDriverContext.aeronDirectory().resolve("publications")
    val logBufferName = "${publicationRegId}.logbuffer"

    return publicationsDir.resolve(logBufferName)
}
/**
 * Adds a publication to aeron, trying up to 10 times.
 *
 * After every failed attempt, aeron (and the media driver, if necessary) is started/restarted via `start()`. If the failure was
 * a driver timeout, this additionally waits for the driver timeout before the restart.
 *
 * @param publicationUri the builder for the channel URI of the publication
 * @param streamId the stream id of the publication
 *
 * @return the publication that was added
 *
 * @throws Exception the failure of the last attempt, when all attempts failed
 */
suspend fun addPublicationWithRetry(publicationUri: ChannelUriStringBuilder, streamId: Int): Publication {
    val uri = publicationUri.build()

    // If we start/stop too quickly, we might have the address already in use! Retry a few times.
    var count = 10
    var exception: Exception? = null

    while (count-- > 0) {
        try {
            // a missing client is handled like every other failure: it is recorded, and then aeron is (re)started below
            val aeron = aeron ?: throw IllegalStateException("Aeron is not connected to a media driver.")
            return aeron.addPublication(uri, streamId)
        } catch (e: Exception) {
            // NOTE: this error will be logged in the `aeronDriverContext` logger
            exception = e
            logger.warn { "Unable to add a publication to Aeron. Retrying $count more times..." }

            if (e is DriverTimeoutException) {
                delay(mediaDriverContext.driverTimeoutMs())
            }

            // reasons we cannot add a pub/sub to aeron
            // 1) the driver was closed
            // 2) aeron was unable to connect to the driver
            // 3) the address already in use

            // configuring pub/sub to aeron is LINEAR -- and it happens in 2 places.
            // 1) starting up the client/server
            // 2) creating a new client-server connection pair (the media driver won't be "dead" at this point)

            // try to start/restart aeron
            try {
                start()
            } catch (e2: Exception) {
                // keep going, the next attempt will try again. This goes to the logger instead of stderr
                logger.error("Unable to restart Aeron.", e2)
            }
        }
    }

    // the loop always runs at least once, and every attempt that does not return records its failure
    throw exception ?: IllegalStateException("Unable to add a publication to Aeron.")
}
/**
 * Adds a subscription to aeron, trying up to 10 times.
 *
 * After every failed attempt, aeron (and the media driver, if necessary) is started/restarted via `start()`. If the failure was
 * a driver timeout, this additionally waits for the driver timeout before the restart.
 *
 * @param subscriptionUri the builder for the channel URI of the subscription
 * @param streamId the stream id of the subscription
 *
 * @return the subscription that was added
 *
 * @throws Exception the failure of the last attempt, when all attempts failed
 */
suspend fun addSubscriptionWithRetry(subscriptionUri: ChannelUriStringBuilder, streamId: Int): Subscription {
    val uri = subscriptionUri.build()

    // If we start/stop too quickly, we might have the address already in use! Retry a few times.
    var count = 10
    var exception: Exception? = null

    while (count-- > 0) {
        try {
            // a missing client MUST count as a failed attempt. Otherwise nothing is recorded, nothing is restarted, and
            // all of the attempts are burned immediately (ending in a NullPointerException without any information)
            val aeron = aeron ?: throw IllegalStateException("Aeron is not connected to a media driver.")
            return aeron.addSubscription(uri, streamId)
        } catch (e: Exception) {
            // NOTE: this error will be logged in the `aeronDriverContext` logger
            exception = e
            logger.warn { "Unable to add a subscription to Aeron. Retrying $count more times..." }

            if (e is DriverTimeoutException) {
                delay(mediaDriverContext.driverTimeoutMs())
            }

            // reasons we cannot add a pub/sub to aeron
            // 1) the driver was closed
            // 2) aeron was unable to connect to the driver
            // 3) the address already in use

            // configuring pub/sub to aeron is LINEAR -- and it happens in 2 places.
            // 1) starting up the client/server
            // 2) creating a new client-server connection pair (the media driver won't be "dead" at this point)

            // try to start/restart aeron
            try {
                start()
            } catch (e2: Exception) {
                // keep going, the next attempt will try again. This goes to the logger instead of stderr
                logger.error("Unable to restart Aeron.", e2)
            }
        }
    }

    // the loop always runs at least once, and every attempt that does not return records its failure
    throw exception ?: IllegalStateException("Unable to add a subscription to Aeron.")
}
/**
 * Checks to see if a media driver is active for the aeron directory used by this instance.
 *
 * @param timeout the driver timeout (in milliseconds) handed to `isDriverActive`. Defaults to the driver timeout of the context
 *
 * @return true if the media driver is active and running
 */
fun isRunning(timeout: Long = mediaDriverContext.driverTimeoutMs()): Boolean =
    // if the media driver is running, it will be a quick connection. Usually 100ms or so
    mediaDriverContext.isDriverActive(timeout) { }
/**
 * A safer way to try to close the media driver
 *
 * The aeron client is always closed first. The media driver is only closed if it was launched by this instance AND it is
 * still active. After closing it, this waits for the publication linger timeout, and then checks (up to 10 times) if the
 * media driver has actually stopped.
 *
 * NOTE: We must be *super* careful trying to delete directories, because if we have multiple AERON/MEDIA DRIVERS connected to the
 * same directory, deleting the directory will cause any other aeron connection to fail! (which makes sense).
 */
suspend fun close() {
    try {
        aeron?.close()
    } catch (e: Exception) {
        logger.error("Error stopping aeron.", e)
    }

    val mediaDriver = mediaDriver
    if (mediaDriver == null) {
        logger.debug { "No driver started for this instance. Not Stopping." }
        return
    }

    logger.debug("Stopping driver at '${mediaDriverContext.aeronDirectory()}'...")

    if (!isRunning(mediaDriverContext)) {
        // not running
        logger.debug { "Driver is not running at '${mediaDriverContext.aeronDirectory()}' for this context. Not Stopping." }
        return
    }

    try {
        mediaDriver.close()

        // on close, the publication CAN linger (in case a client goes away, and then comes back)
        // AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param)
        delay(AERON_PUBLICATION_LINGER_TIMEOUT)

        // wait for the media driver to actually stop
        // (exactly 10 checks. `count-- >= 0` checked 11 times, and the last message claimed "-1 more times")
        var count = 10
        while (count-- > 0 && isRunning(mediaDriverContext)) {
            logger.warn { "Aeron Media driver at '${mediaDriverContext.aeronDirectory()}' is still running. Waiting for it to stop. Trying $count more times." }
            delay(mediaDriverContext.driverTimeoutMs())
        }
    } catch (e: Exception) {
        logger.error("Error closing the media driver at '${mediaDriverContext.aeronDirectory()}'", e)
    }

    // Destroys this thread group and all of its subgroups.
    // This thread group must be empty, indicating that all threads that had been in this thread group have since stopped.
    try {
        threadFactory.group.destroy()
    } catch (e: IllegalThreadStateException) {
        // threads are still alive (or the group was already destroyed). This must not abort the rest of the shutdown
        logger.warn(e) { "Unable to destroy the thread group for the media driver at '${mediaDriverContext.aeronDirectory()}'" }
    }

    logger.debug { "Closed the media driver at '${mediaDriverContext.aeronDirectory()}'" }
}
}

View File

@ -17,7 +17,6 @@
package dorkbox.network.aeron
import dorkbox.network.exceptions.ClientTimedOutException
import io.aeron.Aeron
import io.aeron.ChannelUriStringBuilder
import kotlinx.coroutines.delay
import mu.KLogger
@ -26,7 +25,7 @@ import mu.KLogger
* For a client, the streamId specified here MUST be manually flipped because they are in the perspective of the SERVER
* NOTE: IPC connection will ALWAYS have a timeout of 1 second to connect. This is IPC, it should connect fast
*/
internal class IpcMediaDriverConnection(streamId: Int,
internal open class IpcMediaDriverConnection(streamId: Int,
val streamIdSubscription: Int,
sessionId: Int,
) :
@ -36,7 +35,7 @@ internal class IpcMediaDriverConnection(streamId: Int,
private fun uri(): ChannelUriStringBuilder {
val builder = ChannelUriStringBuilder().media("ipc")
if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) {
if (sessionId != AeronDriver.RESERVED_SESSION_ID_INVALID) {
builder.sessionId(sessionId)
}
@ -48,7 +47,7 @@ internal class IpcMediaDriverConnection(streamId: Int,
*
* @throws ClientTimedOutException if we cannot connect to the server in the designated time
*/
override suspend fun buildClient(aeron: Aeron, logger: KLogger) {
override suspend fun buildClient(aeronDriver: AeronDriver, logger: KLogger) {
// Create a publication at the given address and port, using the given stream ID.
// Note: The Aeron.addPublication method will block until the Media Driver acknowledges the request or a timeout occurs.
val publicationUri = uri()
@ -66,8 +65,8 @@ internal class IpcMediaDriverConnection(streamId: Int,
// publication of any state to other threads and not be long running or re-entrant with the client.
// If we start/stop too quickly, we might have the aeron connectivity issues! Retry a few times.
val publication = addPublicationWithRetry(aeron, publicationUri.build(), streamId, logger)
val subscription = addSubscriptionWithRetry(aeron, subscriptionUri.build(), streamIdSubscription, logger)
val publication = aeronDriver.addPublicationWithRetry(publicationUri, streamId)
val subscription = aeronDriver.addSubscriptionWithRetry(subscriptionUri, streamIdSubscription)
var success = false
@ -117,7 +116,7 @@ internal class IpcMediaDriverConnection(streamId: Int,
*
* serverAddress is ignored for IPC
*/
override suspend fun buildServer(aeron: Aeron, logger: KLogger) {
override suspend fun buildServer(aeronDriver: AeronDriver, logger: KLogger, pairConnection: Boolean) {
// Create a publication with a control port (for dynamic MDC) at the given address and port, using the given stream ID.
// Note: The Aeron.addPublication method will block until the Media Driver acknowledges the request or a timeout occurs.
val publicationUri = uri()
@ -138,12 +137,12 @@ internal class IpcMediaDriverConnection(streamId: Int,
// AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param)
// If we start/stop too quickly, we might have the aeron connectivity issues! Retry a few times.
publication = addPublicationWithRetry(aeron, publicationUri.build(), streamId, logger)
subscription = addSubscriptionWithRetry(aeron, subscriptionUri.build(), streamIdSubscription, logger)
publication = aeronDriver.addPublicationWithRetry(publicationUri, streamId)
subscription = aeronDriver.addSubscriptionWithRetry(subscriptionUri, streamIdSubscription)
}
override fun clientInfo() : String {
return if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) {
return if (sessionId != AeronDriver.RESERVED_SESSION_ID_INVALID) {
"[$sessionId] IPC connection established to [$streamIdSubscription|$streamId]"
} else {
"Connecting handshake to IPC [$streamIdSubscription|$streamId]"
@ -151,7 +150,7 @@ internal class IpcMediaDriverConnection(streamId: Int,
}
override fun serverInfo() : String {
return if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) {
return if (sessionId != AeronDriver.RESERVED_SESSION_ID_INVALID) {
"[$sessionId] IPC listening on [$streamIdSubscription|$streamId] "
} else {
"Listening handshake on IPC [$streamIdSubscription|$streamId]"

View File

@ -18,57 +18,22 @@
package dorkbox.network.aeron
import dorkbox.network.exceptions.ClientTimedOutException
import io.aeron.Aeron
import io.aeron.Publication
import io.aeron.Subscription
import kotlinx.coroutines.delay
import mu.KLogger
abstract class MediaDriverConnection(val publicationPort: Int, val subscriptionPort: Int,
val streamId: Int, val sessionId: Int,
val connectionTimeoutMS: Long, val isReliable: Boolean) : AutoCloseable {
abstract class MediaDriverConnection(
val publicationPort: Int, val subscriptionPort: Int,
val streamId: Int, val sessionId: Int,
val connectionTimeoutMS: Long, val isReliable: Boolean) : AutoCloseable {
lateinit var subscription: Subscription
lateinit var publication: Publication
suspend fun addSubscriptionWithRetry(aeron: Aeron, uri: String, streamId: Int, logger: KLogger): Subscription {
// If we start/stop too quickly, we might have the address already in use! Retry a few times.
var count = 10
var exception: Exception? = null
while (count-- > 0) {
try {
return aeron.addSubscription(uri, streamId)
} catch (e: Exception) {
exception = e
logger.warn { "Unable to add a publication to Aeron. Retrying $count more times..." }
delay(5000)
}
}
throw exception!!
}
suspend fun addPublicationWithRetry(aeron: Aeron, uri: String, streamId: Int, logger: KLogger): Publication {
// If we start/stop too quickly, we might have the address already in use! Retry a few times.
var count = 10
var exception: Exception? = null
while (count-- > 0) {
try {
return aeron.addPublication(uri, streamId)
} catch (e: Exception) {
exception = e
logger.warn { "Unable to add a publication to Aeron. Retrying $count more times..." }
delay(5_000)
}
}
throw exception!!
}
@Throws(ClientTimedOutException::class)
abstract suspend fun buildClient(aeron: Aeron, logger: KLogger)
abstract suspend fun buildServer(aeron: Aeron, logger: KLogger)
abstract suspend fun buildClient(aeronDriver: AeronDriver, logger: KLogger)
abstract suspend fun buildServer(aeronDriver: AeronDriver, logger: KLogger, pairConnection: Boolean = false)
abstract fun clientInfo() : String
abstract fun serverInfo() : String

View File

@ -19,7 +19,6 @@ package dorkbox.network.aeron
import dorkbox.netUtil.IP
import dorkbox.network.exceptions.ClientException
import dorkbox.network.exceptions.ClientTimedOutException
import io.aeron.Aeron
import io.aeron.ChannelUriStringBuilder
import kotlinx.coroutines.delay
import mu.KLogger
@ -62,7 +61,7 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress,
private fun uri(): ChannelUriStringBuilder {
val builder = ChannelUriStringBuilder().reliable(isReliable).media("udp")
if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) {
if (sessionId != AeronDriver.RESERVED_SESSION_ID_INVALID) {
builder.sessionId(sessionId)
}
@ -70,7 +69,7 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress,
}
@Suppress("DuplicatedCode")
override suspend fun buildClient(aeron: Aeron, logger: KLogger) {
override suspend fun buildClient(aeronDriver: AeronDriver, logger: KLogger) {
val aeronAddressString = aeronConnectionString(address)
// Create a publication at the given address and port, using the given stream ID.
@ -99,8 +98,8 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress,
// publication of any state to other threads and not be long running or re-entrant with the client.
// on close, the publication CAN linger (in case a client goes away, and then comes back)
// AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param)
val publication = addPublicationWithRetry(aeron, publicationUri.build(), streamId, logger)
val subscription = addSubscriptionWithRetry(aeron, subscriptionUri.build(), streamId, logger)
val publication = aeronDriver.addPublicationWithRetry(publicationUri, streamId)
val subscription = aeronDriver.addSubscriptionWithRetry(subscriptionUri, streamId)
var success = false
@ -150,14 +149,14 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress,
override fun clientInfo(): String {
address
return if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) {
return if (sessionId != AeronDriver.RESERVED_SESSION_ID_INVALID) {
"Connecting to ${IP.toString(address)} [$subscriptionPort|$publicationPort] [$streamId|$sessionId] (reliable:$isReliable)"
} else {
"Connecting handshake to ${IP.toString(address)} [$subscriptionPort|$publicationPort] [$streamId|*] (reliable:$isReliable)"
}
}
override suspend fun buildServer(aeron: Aeron, logger: KLogger) {
override suspend fun buildServer(aeronDriver: AeronDriver, logger: KLogger, pairConnection: Boolean) {
throw ClientException("Server info not implemented in Client MDC")
}
override fun serverInfo(): String {

View File

@ -20,7 +20,6 @@ import dorkbox.netUtil.IP
import dorkbox.netUtil.IPv4
import dorkbox.netUtil.IPv6
import dorkbox.network.exceptions.ServerException
import io.aeron.Aeron
import io.aeron.ChannelUriStringBuilder
import mu.KLogger
import java.net.Inet4Address
@ -41,7 +40,7 @@ internal open class UdpMediaDriverServerConnection(val listenAddress: InetAddres
var success: Boolean = false
private fun aeronConnectionString(ipAddress: InetAddress): String {
protected fun aeronConnectionString(ipAddress: InetAddress): String {
return if (ipAddress is Inet4Address) {
ipAddress.hostAddress
} else {
@ -55,9 +54,9 @@ internal open class UdpMediaDriverServerConnection(val listenAddress: InetAddres
}
}
private fun uri(): ChannelUriStringBuilder {
protected fun uri(): ChannelUriStringBuilder {
val builder = ChannelUriStringBuilder().reliable(isReliable).media("udp")
if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) {
if (sessionId != AeronDriver.RESERVED_SESSION_ID_INVALID) {
builder.sessionId(sessionId)
}
@ -65,11 +64,11 @@ internal open class UdpMediaDriverServerConnection(val listenAddress: InetAddres
}
@Suppress("DuplicatedCode")
override suspend fun buildClient(aeron: Aeron, logger: KLogger) {
override suspend fun buildClient(aeronDriver: AeronDriver, logger: KLogger) {
throw ServerException("Client info not implemented in Server MDC")
}
override suspend fun buildServer(aeron: Aeron, logger: KLogger) {
override suspend fun buildServer(aeronDriver: AeronDriver, logger: KLogger, pairConnection: Boolean) {
val connectionString = aeronConnectionString(listenAddress)
// Create a publication with a control port (for dynamic MDC) at the given address and port, using the given stream ID.
@ -100,8 +99,8 @@ internal open class UdpMediaDriverServerConnection(val listenAddress: InetAddres
// AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param)
// If we start/stop too quickly, we might have the address already in use! Retry a few times.
publication = addPublicationWithRetry(aeron, publicationUri.build(), streamId, logger)
subscription = addSubscriptionWithRetry(aeron, subscriptionUri.build(), streamId, logger)
publication = aeronDriver.addPublicationWithRetry(publicationUri, streamId)
subscription = aeronDriver.addSubscriptionWithRetry(subscriptionUri, streamId)
}
override fun clientInfo(): String {
@ -119,7 +118,7 @@ internal open class UdpMediaDriverServerConnection(val listenAddress: InetAddres
IP.toString(listenAddress)
}
return if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) {
return if (sessionId != AeronDriver.RESERVED_SESSION_ID_INVALID) {
"Listening on $address [$subscriptionPort|$publicationPort] [$streamId|$sessionId] (reliable:$isReliable)"
} else {
"Listening handshake on $address [$subscriptionPort|$publicationPort] [$streamId|*] (reliable:$isReliable)"

View File

@ -416,7 +416,7 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
}
// on close, we want to make sure this file is DELETED!
val logFile = endPoint.getMediaDriverPublicationFile(publication.registrationId())
val logFile = endPoint.aeronDriver.getMediaDriverPublicationFile(publication.registrationId())
publication.close()
closeTimeoutTime = System.currentTimeMillis() + timeOut

View File

@ -19,7 +19,7 @@ import dorkbox.network.Client
import dorkbox.network.Configuration
import dorkbox.network.Server
import dorkbox.network.ServerConfiguration
import dorkbox.network.aeron.AeronConfig
import dorkbox.network.aeron.AeronDriver
import dorkbox.network.aeron.CoroutineIdleStrategy
import dorkbox.network.coroutines.SuspendWaiter
import dorkbox.network.exceptions.MessageNotRegisteredException
@ -33,9 +33,7 @@ import dorkbox.network.rmi.messages.RmiMessage
import dorkbox.network.serialization.KryoExtra
import dorkbox.network.serialization.Serialization
import dorkbox.network.storage.SettingsStore
import dorkbox.util.NamedThreadFactory
import dorkbox.util.exceptions.SecurityException
import io.aeron.Aeron
import io.aeron.Publication
import io.aeron.driver.MediaDriver
import io.aeron.logbuffer.Header
@ -47,8 +45,6 @@ import kotlinx.coroutines.runBlocking
import mu.KLogger
import mu.KotlinLogging
import org.agrona.DirectBuffer
import org.agrona.concurrent.BackoffIdleStrategy
import java.io.File
// If TCP and UDP both fill the pipe, THERE WILL BE FRAGMENTATION and dropped UDP packets!
@ -77,9 +73,7 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
internal val listenerManager = ListenerManager<CONNECTION>()
internal val connections = ConnectionManager<CONNECTION>()
internal val aeron: Aeron
private var mediaDriver: MediaDriver? = null
internal val mediaDriverContext: MediaDriver.Context
internal val aeronDriver: AeronDriver
/**
* Returns the serialization wrapper if there is an object type that needs to be added outside of the basic types.
@ -134,42 +128,13 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
// Only starts the media driver if we are NOT already running!
try {
mediaDriver = AeronConfig.startDriver(config, type, logger)
AeronDriver.validateConfig(config, logger)
aeronDriver = AeronDriver(config, type, logger)
aeronDriver.start()
} catch (e: Exception) {
listenerManager.notifyError(e)
throw e
}
require(config.context != null) { "Configuration context cannot be properly created. Unable to continue!" }
mediaDriverContext = config.context!!
val aeronContext = Aeron.Context()
aeronContext
.aeronDirectoryName(mediaDriverContext.aeronDirectory().path)
.concludeAeronDirectory()
val threadFactory = NamedThreadFactory("Thread", ThreadGroup("${type.simpleName}-AeronClient"),true)
aeronContext
.threadFactory(threadFactory)
.idleStrategy(BackoffIdleStrategy())
// we DO NOT want to abort the JVM if there are errors.
aeronContext.errorHandler { error ->
logger.error("Error in Aeron", error)
}
try {
aeron = Aeron.connect(aeronContext)
} catch (e: Exception) {
try {
mediaDriver?.close()
} catch (secondaryException: Exception) {
e.addSuppressed(secondaryException)
}
listenerManager.notifyError(e)
throw e
}
}
/**
@ -180,17 +145,9 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
}
internal fun initEndpointState(): Aeron {
internal fun initEndpointState() {
shutdown.getAndSet(false)
shutdownWaiter = SuspendWaiter()
return aeron
}
/**
* @return the aeron media driver log file for a specific publication. This should be removed when a publication is closed (but is not always!)
*/
internal fun getMediaDriverPublicationFile(publicationRegId: Long): File {
return mediaDriverContext.aeronDirectory().resolve("publications").resolve("${publicationRegId}.logbuffer")
}
abstract fun newException(message: String, cause: Throwable? = null): Throwable
@ -637,8 +594,7 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
* @return true if the media driver is active and running
*/
fun isRunning(): Boolean {
// if the media driver is running, it will be a quick connection. Usually 100ms or so
return mediaDriverContext.isDriverActive(1_000) { }
return aeronDriver.isRunning()
}
/**
@ -655,14 +611,9 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
if (shutdown.compareAndSet(expect = false, update = true)) {
logger.info { "Shutting down..." }
try {
aeron.close()
} catch (e: Exception) {
logger.error("Error stopping aeron.", e)
}
runBlocking {
AeronConfig.stopDriver(mediaDriver, logger)
aeronDriver.close()
connections.forEach {
it.close()
}
@ -677,8 +628,6 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
close0()
(aeron.context().threadFactory() as NamedThreadFactory).group.destroy()
// if we are waiting for shutdown, cancel the waiting thread (since we have shutdown now)
shutdownWaiter.cancel()
}

View File

@ -21,7 +21,7 @@ import com.github.benmanes.caffeine.cache.RemovalCause
import com.github.benmanes.caffeine.cache.RemovalListener
import dorkbox.network.Server
import dorkbox.network.ServerConfiguration
import dorkbox.network.aeron.AeronConfig
import dorkbox.network.aeron.AeronDriver
import dorkbox.network.aeron.IpcMediaDriverConnection
import dorkbox.network.aeron.UdpMediaDriverPairedConnection
import dorkbox.network.connection.Connection
@ -29,7 +29,6 @@ import dorkbox.network.connection.ConnectionParams
import dorkbox.network.connection.ListenerManager
import dorkbox.network.connection.PublicKeyValidationState
import dorkbox.network.exceptions.*
import io.aeron.Aeron
import io.aeron.Publication
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
@ -72,7 +71,7 @@ internal class ServerHandshake<CONNECTION : Connection>(private val logger: KLog
private val connectionsPerIpCounts = ConnectionCounts()
// guarantee that session/stream ID's will ALWAYS be unique! (there can NEVER be a collision!)
private val sessionIdAllocator = RandomIdAllocator(AeronConfig.RESERVED_SESSION_ID_LOW, AeronConfig.RESERVED_SESSION_ID_HIGH)
private val sessionIdAllocator = RandomIdAllocator(AeronDriver.RESERVED_SESSION_ID_LOW, AeronDriver.RESERVED_SESSION_ID_HIGH)
private val streamIdAllocator = RandomIdAllocator(1, Integer.MAX_VALUE)
@ -201,7 +200,7 @@ internal class ServerHandshake<CONNECTION : Connection>(private val logger: KLog
handshakePublication: Publication,
sessionId: Int,
message: HandshakeMessage,
aeron: Aeron) {
aeronDriver: AeronDriver) {
val connectionString = "IPC"
@ -278,7 +277,7 @@ internal class ServerHandshake<CONNECTION : Connection>(private val logger: KLog
// we have to construct how the connection will communicate!
runBlocking {
clientConnection.buildServer(aeron, logger)
clientConnection.buildServer(aeronDriver, logger, true)
}
logger.info {
@ -345,7 +344,7 @@ internal class ServerHandshake<CONNECTION : Connection>(private val logger: KLog
clientAddressString: String,
clientAddress: InetAddress,
message: HandshakeMessage,
aeron: Aeron,
aeronDriver: AeronDriver,
isIpv6Wildcard: Boolean) {
if (!validateMessageTypeAndDoPending(server, server.actionDispatch, handshakePublication, message, sessionId, clientAddressString)) {
@ -444,7 +443,7 @@ internal class ServerHandshake<CONNECTION : Connection>(private val logger: KLog
// we have to construct how the connection will communicate!
runBlocking {
clientConnection.buildServer(aeron, logger)
clientConnection.buildServer(aeronDriver, logger, true)
}
logger.info {

View File

@ -46,7 +46,12 @@ class ConnectionFilterTest : BaseTest() {
}
runBlocking {
client.connect(LOOPBACK)
try {
client.connect(LOOPBACK)
} catch (e: Exception) {
stopEndPoints()
throw e
}
}
}
@ -92,7 +97,12 @@ class ConnectionFilterTest : BaseTest() {
}
runBlocking {
client.connect(LOOPBACK)
try {
client.connect(LOOPBACK)
} catch (e: Exception) {
stopEndPoints()
throw e
}
}
}
@ -137,7 +147,12 @@ class ConnectionFilterTest : BaseTest() {
}
runBlocking {
client.connect(LOOPBACK)
try {
client.connect(LOOPBACK)
} catch (e: Exception) {
stopEndPoints()
throw e
}
}
}
@ -182,7 +197,12 @@ class ConnectionFilterTest : BaseTest() {
}
runBlocking {
client.connect(LOOPBACK)
try {
client.connect(LOOPBACK)
} catch (e: Exception) {
stopEndPoints()
throw e
}
}
}
@ -220,7 +240,12 @@ class ConnectionFilterTest : BaseTest() {
}
runBlocking {
client.connect(LOOPBACK)
try {
client.connect(LOOPBACK)
} catch (e: Exception) {
stopEndPoints()
throw e
}
}
}
@ -253,20 +278,18 @@ class ConnectionFilterTest : BaseTest() {
}
runBlocking {
client.connect(LOOPBACK)
try {
client.connect(LOOPBACK)
} catch (e: Exception) {
stopEndPoints()
throw e
}
}
}
waitForThreads()
}
@Test
fun acceptAllCustomServer() {
val serverConnectSuccess = atomic(false)
@ -303,7 +326,12 @@ class ConnectionFilterTest : BaseTest() {
}
runBlocking {
client.connect(LOOPBACK)
try {
client.connect(LOOPBACK)
} catch (e: Exception) {
stopEndPoints()
throw e
}
}
}
@ -350,7 +378,12 @@ class ConnectionFilterTest : BaseTest() {
}
runBlocking {
client.connect(LOOPBACK)
try {
client.connect(LOOPBACK)
} catch (e: Exception) {
stopEndPoints()
throw e
}
}
}
@ -390,11 +423,16 @@ class ConnectionFilterTest : BaseTest() {
}
runBlocking {
client.connect(LOOPBACK, Long.MAX_VALUE)
try {
client.connect(LOOPBACK)
} catch (e: Exception) {
stopEndPoints()
throw e
}
}
}
waitForThreads(Long.MAX_VALUE)
waitForThreads()
}
@Test(expected = ClientException::class)
@ -425,19 +463,15 @@ class ConnectionFilterTest : BaseTest() {
}
runBlocking {
client.connect(LOOPBACK)
try {
client.connect(LOOPBACK)
} catch (e: Exception) {
stopEndPoints()
throw e
}
}
}
waitForThreads()
}
}

View File

@ -2,7 +2,7 @@ package dorkboxTest.network
import dorkbox.network.Client
import dorkbox.network.Server
import dorkbox.network.aeron.AeronConfig
import dorkbox.network.aeron.AeronDriver
import dorkbox.network.connection.Connection
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.delay
@ -72,8 +72,9 @@ class DisconnectReconnectTest : BaseTest() {
@Test
fun manualMediaDriverAndReconnectClient() {
val serverConfiguration = serverConfig()
val mediaDriver = runBlocking {
AeronConfig.startDriver(serverConfiguration)
val aeronDriver = runBlocking {
AeronDriver.validateConfig(serverConfiguration)
AeronDriver(serverConfiguration)
}
run {
@ -124,7 +125,7 @@ class DisconnectReconnectTest : BaseTest() {
waitForThreads()
runBlocking {
AeronConfig.stopDriver(mediaDriver)
aeronDriver.close()
}
System.err.println("Connection count (after reconnecting) is: " + reconnectCount.value)