diff --git a/src/dorkbox/network/Client.kt b/src/dorkbox/network/Client.kt index e6d756cd..d28c563f 100644 --- a/src/dorkbox/network/Client.kt +++ b/src/dorkbox/network/Client.kt @@ -18,7 +18,7 @@ package dorkbox.network import dorkbox.netUtil.IP import dorkbox.netUtil.IPv4 import dorkbox.netUtil.IPv6 -import dorkbox.network.aeron.AeronConfig +import dorkbox.network.aeron.AeronDriver import dorkbox.network.aeron.IpcMediaDriverConnection import dorkbox.network.aeron.UdpMediaDriverClientConnection import dorkbox.network.connection.* @@ -170,8 +170,8 @@ open class Client(config: Configuration = Configuration connectionTimeoutMS: Long = 30_000L, reliable: Boolean = true) { // Default IPC ports are flipped because they are in the perspective of the SERVER connect(remoteAddress = remoteAddress, - ipcPublicationId = AeronConfig.IPC_HANDSHAKE_STREAM_ID_SUB, - ipcSubscriptionId = AeronConfig.IPC_HANDSHAKE_STREAM_ID_PUB, + ipcPublicationId = AeronDriver.IPC_HANDSHAKE_STREAM_ID_SUB, + ipcSubscriptionId = AeronDriver.IPC_HANDSHAKE_STREAM_ID_PUB, connectionTimeoutMS = connectionTimeoutMS, reliable = reliable) } @@ -188,8 +188,8 @@ open class Client(config: Configuration = Configuration * @throws ClientRejectedException if the client connection is rejected */ @Suppress("DuplicatedCode") - suspend fun connect(ipcPublicationId: Int = AeronConfig.IPC_HANDSHAKE_STREAM_ID_SUB, - ipcSubscriptionId: Int = AeronConfig.IPC_HANDSHAKE_STREAM_ID_PUB, + suspend fun connect(ipcPublicationId: Int = AeronDriver.IPC_HANDSHAKE_STREAM_ID_SUB, + ipcSubscriptionId: Int = AeronDriver.IPC_HANDSHAKE_STREAM_ID_PUB, connectionTimeoutMS: Long = 30_000L) { // Default IPC ports are flipped because they are in the perspective of the SERVER @@ -229,8 +229,8 @@ open class Client(config: Configuration = Configuration @Suppress("DuplicatedCode") private suspend fun connect(remoteAddress: InetAddress? 
= null, // Default IPC ports are flipped because they are in the perspective of the SERVER - ipcPublicationId: Int = AeronConfig.IPC_HANDSHAKE_STREAM_ID_SUB, - ipcSubscriptionId: Int = AeronConfig.IPC_HANDSHAKE_STREAM_ID_PUB, + ipcPublicationId: Int = AeronDriver.IPC_HANDSHAKE_STREAM_ID_SUB, + ipcSubscriptionId: Int = AeronDriver.IPC_HANDSHAKE_STREAM_ID_PUB, connectionTimeoutMS: Long = 30_000L, reliable: Boolean = true) { require(connectionTimeoutMS >= 0) { "connectionTimeoutMS '$connectionTimeoutMS' is invalid. It must be >0" } @@ -250,7 +250,7 @@ open class Client(config: Configuration = Configuration connection0 = null // we are done with initial configuration, now initialize aeron and the general state of this endpoint - val aeron = initEndpointState() + initEndpointState() // only try to connect via IPv4 if we have a network interface that supports it! if (remoteAddress is Inet4Address && !IPv4.isAvailable) { @@ -271,7 +271,7 @@ open class Client(config: Configuration = Configuration var isUsingIPC = false val canUseIPC = config.enableIpc && remoteAddress == null val autoChangeToIpc = canUseIPC && config.enableIpcForLoopback && - remoteAddress != null && remoteAddress.isLoopbackAddress && isRunning(mediaDriverContext) + remoteAddress != null && remoteAddress.isLoopbackAddress && aeronDriver.isRunning() if (autoChangeToIpc) { logger.info {"IPC for loopback enabled and aeron is already running. Auto-changing network connection from ${IP.toString(remoteAddress!!)} -> IPC" } } @@ -282,11 +282,11 @@ open class Client(config: Configuration = Configuration // MAYBE the server doesn't have IPC enabled? 
If no, we need to connect via UDP instead val ipcConnection = IpcMediaDriverConnection(streamIdSubscription = ipcSubscriptionId, streamId = ipcPublicationId, - sessionId = AeronConfig.RESERVED_SESSION_ID_INVALID) + sessionId = AeronDriver.RESERVED_SESSION_ID_INVALID) // throws a ConnectTimedOutException if the client cannot connect for any reason to the server handshake ports try { - ipcConnection.buildClient(aeron, logger) + ipcConnection.buildClient(aeronDriver, logger) isUsingIPC = true } catch (e: Exception) { // if we specified that we want to use IPC, then we have to throw the timeout exception, because there is no IPC @@ -305,13 +305,13 @@ open class Client(config: Configuration = Configuration address = this.remoteAddress0!!, publicationPort = config.subscriptionPort, subscriptionPort = config.publicationPort, - streamId = AeronConfig.UDP_HANDSHAKE_STREAM_ID, - sessionId = AeronConfig.RESERVED_SESSION_ID_INVALID, + streamId = AeronDriver.UDP_HANDSHAKE_STREAM_ID, + sessionId = AeronDriver.RESERVED_SESSION_ID_INVALID, connectionTimeoutMS = connectionTimeoutMS, isReliable = reliable) // throws a ConnectTimedOutException if the client cannot connect for any reason to the server handshake ports - udpConnection.buildClient(aeron, logger) + udpConnection.buildClient(aeronDriver, logger) udpConnection } } @@ -320,13 +320,13 @@ open class Client(config: Configuration = Configuration address = this.remoteAddress0!!, publicationPort = config.subscriptionPort, subscriptionPort = config.publicationPort, - streamId = AeronConfig.UDP_HANDSHAKE_STREAM_ID, - sessionId = AeronConfig.RESERVED_SESSION_ID_INVALID, + streamId = AeronDriver.UDP_HANDSHAKE_STREAM_ID, + sessionId = AeronDriver.RESERVED_SESSION_ID_INVALID, connectionTimeoutMS = connectionTimeoutMS, isReliable = reliable) // throws a ConnectTimedOutException if the client cannot connect for any reason to the server handshake ports - test.buildClient(aeron, logger) + test.buildClient(aeronDriver, logger) test } @@ 
-381,7 +381,7 @@ open class Client(config: Configuration = Configuration } // we have to construct how the connection will communicate! - clientConnection.buildClient(aeron, logger) + clientConnection.buildClient(aeronDriver, logger) // only the client connects to the server, so here we have to connect. The server (when creating the new "connection" object) // does not need to do anything @@ -424,6 +424,7 @@ open class Client(config: Configuration = Configuration if (!permitConnection) { handshakeConnection.close() val exception = ClientRejectedException("Connection to ${IP.toString(remoteAddress)} was not permitted!") + ListenerManager.cleanStackTrace(exception) listenerManager.notifyError(exception) throw exception } diff --git a/src/dorkbox/network/Configuration.kt b/src/dorkbox/network/Configuration.kt index 061fc613..0d229402 100644 --- a/src/dorkbox/network/Configuration.kt +++ b/src/dorkbox/network/Configuration.kt @@ -17,7 +17,7 @@ package dorkbox.network import dorkbox.netUtil.IPv4 import dorkbox.netUtil.IPv6 -import dorkbox.network.aeron.AeronConfig +import dorkbox.network.aeron.AeronDriver import dorkbox.network.aeron.CoroutineBackoffIdleStrategy import dorkbox.network.aeron.CoroutineIdleStrategy import dorkbox.network.aeron.CoroutineSleepingMillisIdleStrategy @@ -66,7 +66,7 @@ class ServerConfiguration : dorkbox.network.Configuration() { /** * The IPC Publication ID is used to define what ID the server will send data on. The client IPC subscription ID must match this value. */ - var ipcPublicationId = AeronConfig.IPC_HANDSHAKE_STREAM_ID_PUB + var ipcPublicationId = AeronDriver.IPC_HANDSHAKE_STREAM_ID_PUB set(value) { require(context == null) { errorMessage } field = value @@ -75,7 +75,7 @@ class ServerConfiguration : dorkbox.network.Configuration() { /** * The IPC Subscription ID is used to define what ID the server will receive data on. The client IPC publication ID must match this value. 
*/ - var ipcSubscriptionId = AeronConfig.IPC_HANDSHAKE_STREAM_ID_SUB + var ipcSubscriptionId = AeronDriver.IPC_HANDSHAKE_STREAM_ID_SUB set(value) { require(context == null) { errorMessage } field = value diff --git a/src/dorkbox/network/Server.kt b/src/dorkbox/network/Server.kt index 4e9473be..8925fb71 100644 --- a/src/dorkbox/network/Server.kt +++ b/src/dorkbox/network/Server.kt @@ -18,7 +18,7 @@ package dorkbox.network import dorkbox.netUtil.IP import dorkbox.netUtil.IPv4 import dorkbox.netUtil.IPv6 -import dorkbox.network.aeron.AeronConfig +import dorkbox.network.aeron.AeronDriver import dorkbox.network.aeron.AeronPoller import dorkbox.network.aeron.IpcMediaDriverConnection import dorkbox.network.aeron.UdpMediaDriverServerConnection @@ -35,7 +35,6 @@ import dorkbox.network.rmi.RemoteObject import dorkbox.network.rmi.RemoteObjectStorage import dorkbox.network.rmi.RmiManagerConnections import dorkbox.network.rmi.TimeoutException -import io.aeron.Aeron import io.aeron.FragmentAssembler import io.aeron.Image import io.aeron.logbuffer.Header @@ -71,12 +70,12 @@ open class Server(config: ServerConfiguration = ServerC */ fun isRunning(configuration: ServerConfiguration): Boolean { if (configuration.context == null) { - AeronConfig.createContext(configuration) + AeronDriver.createContext(configuration) } require(configuration.context != null) { "Configuration context cannot be properly created. Unable to continue!" } - return AeronConfig.isRunning(configuration.context!!) + return AeronDriver.isRunning(configuration.context!!) 
} init { @@ -160,12 +159,13 @@ open class Server(config: ServerConfiguration = ServerC return super.getRmiConnectionSupport() } - private suspend fun getIpcPoller(aeron: Aeron, config: ServerConfiguration): AeronPoller { + private suspend fun getIpcPoller(aeronDriver: AeronDriver, config: ServerConfiguration): AeronPoller { val poller = if (config.enableIpc) { val driver = IpcMediaDriverConnection(streamIdSubscription = config.ipcSubscriptionId, streamId = config.ipcPublicationId, - sessionId = AeronConfig.RESERVED_SESSION_ID_INVALID) - driver.buildServer(aeron, logger) + sessionId = AeronDriver.RESERVED_SESSION_ID_INVALID) + driver.buildServer(aeronDriver, logger) + val publication = driver.publication val subscription = driver.subscription @@ -193,7 +193,7 @@ open class Server(config: ServerConfiguration = ServerC publication, sessionId, message, - aeron) + aeronDriver) } override fun poll(): Int { return subscription.poll(handler, 1) } @@ -213,17 +213,18 @@ open class Server(config: ServerConfiguration = ServerC } @Suppress("DuplicatedCode") - private suspend fun getIpv4Poller(aeron: Aeron, config: ServerConfiguration): AeronPoller { + private suspend fun getIpv4Poller(aeronDriver: AeronDriver, config: ServerConfiguration): AeronPoller { val poller = if (canUseIPv4) { val driver = UdpMediaDriverServerConnection( listenAddress = listenIPv4Address!!, publicationPort = config.publicationPort, subscriptionPort = config.subscriptionPort, - streamId = AeronConfig.UDP_HANDSHAKE_STREAM_ID, - sessionId = AeronConfig.RESERVED_SESSION_ID_INVALID, + streamId = AeronDriver.UDP_HANDSHAKE_STREAM_ID, + sessionId = AeronDriver.RESERVED_SESSION_ID_INVALID, connectionTimeoutMS = TimeUnit.SECONDS.toMillis(config.connectionCloseTimeoutInSeconds.toLong())) - driver.buildServer(aeron, logger) + driver.buildServer(aeronDriver, logger) + val publication = driver.publication val subscription = driver.subscription @@ -274,7 +275,7 @@ open class Server(config: ServerConfiguration = ServerC 
clientAddressString, clientAddress, message, - aeron, + aeronDriver, false) } @@ -295,17 +296,18 @@ open class Server(config: ServerConfiguration = ServerC } @Suppress("DuplicatedCode") - private suspend fun getIpv6Poller(aeron: Aeron, config: ServerConfiguration): AeronPoller { + private suspend fun getIpv6Poller(aeronDriver: AeronDriver, config: ServerConfiguration): AeronPoller { val poller = if (canUseIPv6) { val driver = UdpMediaDriverServerConnection( listenAddress = listenIPv6Address!!, publicationPort = config.publicationPort, subscriptionPort = config.subscriptionPort, - streamId = AeronConfig.UDP_HANDSHAKE_STREAM_ID, - sessionId = AeronConfig.RESERVED_SESSION_ID_INVALID, + streamId = AeronDriver.UDP_HANDSHAKE_STREAM_ID, + sessionId = AeronDriver.RESERVED_SESSION_ID_INVALID, connectionTimeoutMS = TimeUnit.SECONDS.toMillis(config.connectionCloseTimeoutInSeconds.toLong())) - driver.buildServer(aeron, logger) + driver.buildServer(aeronDriver, logger) + val publication = driver.publication val subscription = driver.subscription @@ -356,7 +358,7 @@ open class Server(config: ServerConfiguration = ServerC clientAddressString, clientAddress, message, - aeron, + aeronDriver, false) } @@ -377,16 +379,17 @@ open class Server(config: ServerConfiguration = ServerC } @Suppress("DuplicatedCode") - private suspend fun getIpv6WildcardPoller(aeron: Aeron, config: ServerConfiguration): AeronPoller { + private suspend fun getIpv6WildcardPoller(aeronDriver: AeronDriver, config: ServerConfiguration): AeronPoller { val driver = UdpMediaDriverServerConnection( listenAddress = listenIPv6Address!!, publicationPort = config.publicationPort, subscriptionPort = config.subscriptionPort, - streamId = AeronConfig.UDP_HANDSHAKE_STREAM_ID, - sessionId = AeronConfig.RESERVED_SESSION_ID_INVALID, + streamId = AeronDriver.UDP_HANDSHAKE_STREAM_ID, + sessionId = AeronDriver.RESERVED_SESSION_ID_INVALID, connectionTimeoutMS = 
TimeUnit.SECONDS.toMillis(config.connectionCloseTimeoutInSeconds.toLong())) - driver.buildServer(aeron, logger) + driver.buildServer(aeronDriver, logger) + val publication = driver.publication val subscription = driver.subscription @@ -438,7 +441,7 @@ open class Server(config: ServerConfiguration = ServerC clientAddressString, clientAddress, message, - aeron, + aeronDriver, true) } @@ -461,17 +464,16 @@ open class Server(config: ServerConfiguration = ServerC return } - val aeron = initEndpointState() + initEndpointState() config as ServerConfiguration - // we are done with initial configuration, now initialize aeron and the general state of this endpoint bindAlreadyCalled = true val waiter = SuspendWaiter() actionDispatch.launch { - val ipcPoller: AeronPoller = getIpcPoller(aeron, config) + val ipcPoller: AeronPoller = getIpcPoller(aeronDriver, config) // if we are binding to WILDCARD, then we have to do something special if BOTH IPv4 and IPv6 are enabled! val isWildcard = listenIPv4Address == IPv4.WILDCARD || listenIPv6Address != IPv6.WILDCARD @@ -486,15 +488,15 @@ open class Server(config: ServerConfiguration = ServerC override fun close() {} override fun serverInfo(): String { return "IPv4 Disabled" } } - ipv6Poller = getIpv6WildcardPoller(aeron, config) + ipv6Poller = getIpv6WildcardPoller(aeronDriver, config) } else { // only 1 will be a real poller - ipv4Poller = getIpv4Poller(aeron, config) - ipv6Poller = getIpv6Poller(aeron, config) + ipv4Poller = getIpv4Poller(aeronDriver, config) + ipv6Poller = getIpv6Poller(aeronDriver, config) } } else { - ipv4Poller = getIpv4Poller(aeron, config) - ipv6Poller = getIpv6Poller(aeron, config) + ipv4Poller = getIpv4Poller(aeronDriver, config) + ipv6Poller = getIpv6Poller(aeronDriver, config) } waiter.doNotify() diff --git a/src/dorkbox/network/aeron/AeronConfig.kt b/src/dorkbox/network/aeron/AeronConfig.kt deleted file mode 100644 index 1557ff33..00000000 --- a/src/dorkbox/network/aeron/AeronConfig.kt +++ /dev/null @@ 
-1,314 +0,0 @@ -package dorkbox.network.aeron - -import dorkbox.network.Configuration -import dorkbox.util.NamedThreadFactory -import io.aeron.driver.MediaDriver -import kotlinx.coroutines.delay -import kotlinx.coroutines.runBlocking -import mu.KLogger -import mu.KotlinLogging -import java.io.File - -/** - * - */ -object AeronConfig { - /** - * Identifier for invalid sessions. This must be < RESERVED_SESSION_ID_LOW - */ - internal const val RESERVED_SESSION_ID_INVALID = 0 - - /** - * The inclusive lower bound of the reserved sessions range. THIS SHOULD NEVER BE <= 0! - */ - internal const val RESERVED_SESSION_ID_LOW = 1 - - /** - * The inclusive upper bound of the reserved sessions range. - */ - internal const val RESERVED_SESSION_ID_HIGH = Integer.MAX_VALUE - - const val UDP_HANDSHAKE_STREAM_ID: Int = 0x1337cafe - const val IPC_HANDSHAKE_STREAM_ID_PUB: Int = 0x1337c0de - const val IPC_HANDSHAKE_STREAM_ID_SUB: Int = 0x1337c0d3 - - - // on close, the publication CAN linger (in case a client goes away, and then comes back) - // AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param) - private const val AERON_PUBLICATION_LINGER_TIMEOUT = 5_000L // in MS - - private fun create(config: Configuration, logger: KLogger): MediaDriver.Context { - /* - * Linux - * Linux normally requires some settings of sysctl values. One is net.core.rmem_max to allow larger SO_RCVBUF and - * net.core.wmem_max to allow larger SO_SNDBUF values to be set. - * - * Windows - * Windows tends to use SO_SNDBUF values that are too small. It is recommended to use values more like 1MB or so. - * - * Mac/Darwin - * - * Mac tends to use SO_SNDBUF values that are too small. It is recommended to use larger values, like 16KB. 
- */ - if (config.receiveBufferSize == 0) { - config.receiveBufferSize = io.aeron.driver.Configuration.SOCKET_RCVBUF_LENGTH_DEFAULT -// when { -// OS.isLinux() -> -// OS.isWindows() -> -// OS.isMacOsX() -> -// } - -// val rmem_max = dorkbox.network.other.NetUtil.sysctlGetInt("net.core.rmem_max") -// val wmem_max = dorkbox.network.other.NetUtil.sysctlGetInt("net.core.wmem_max") - } - - - if (config.sendBufferSize == 0) { - config.receiveBufferSize = io.aeron.driver.Configuration.SOCKET_SNDBUF_LENGTH_DEFAULT -// when { -// OS.isLinux() -> -// OS.isWindows() -> -// OS.isMacOsX() -> -// } - -// val rmem_max = dorkbox.network.other.NetUtil.sysctlGetInt("net.core.rmem_max") -// val wmem_max = dorkbox.network.other.NetUtil.sysctlGetInt("net.core.wmem_max") - } - - - /* - * Note: Since Mac OS does not have a built-in support for /dev/shm it is advised to create a RAM disk for the Aeron directory (aeron.dir). - * - * You can create a RAM disk with the following command: - * - * $ diskutil erasevolume HFS+ "DISK_NAME" `hdiutil attach -nomount ram://$((2048 * SIZE_IN_MB))` - * - * where: - * - * DISK_NAME should be replaced with a name of your choice. - * SIZE_IN_MB is the size in megabytes for the disk (e.g. 4096 for a 4GB disk). - * - * For example, the following command creates a RAM disk named DevShm which is 2GB in size: - * - * $ diskutil erasevolume HFS+ "DevShm" `hdiutil attach -nomount ram://$((2048 * 2048))` - * - * After this command is executed the new disk will be mounted under /Volumes/DevShm. 
- */ - if (config.aeronDirectory == null) { - val baseFileLocation = config.suggestAeronLogLocation(logger) - -// val aeronLogDirectory = File(baseFileLocation, "aeron-" + type.simpleName) - val aeronLogDirectory = File(baseFileLocation, "aeron") - config.aeronDirectory = aeronLogDirectory - } - - // LOW-LATENCY SETTINGS - // .termBufferSparseFile(false) - // .useWindowsHighResTimer(true) - // .threadingMode(ThreadingMode.DEDICATED) - // .conductorIdleStrategy(BusySpinIdleStrategy.INSTANCE) - // .receiverIdleStrategy(NoOpIdleStrategy.INSTANCE) - // .senderIdleStrategy(NoOpIdleStrategy.INSTANCE); - // setProperty(DISABLE_BOUNDS_CHECKS_PROP_NAME, "true"); - // setProperty("aeron.mtu.length", "16384"); - // setProperty("aeron.socket.so_sndbuf", "2097152"); - // setProperty("aeron.socket.so_rcvbuf", "2097152"); - // setProperty("aeron.rcv.initial.window.length", "2097152"); - - // driver context must happen in the initializer, because we have a Server.isRunning() method that uses the mediaDriverContext (without bind) - val context = MediaDriver.Context() - .publicationReservedSessionIdLow(RESERVED_SESSION_ID_LOW) - .publicationReservedSessionIdHigh(RESERVED_SESSION_ID_HIGH) - .threadingMode(config.threadingMode) - .mtuLength(config.networkMtuSize) - .socketSndbufLength(config.sendBufferSize) - .socketRcvbufLength(config.receiveBufferSize) - - context.aeronDirectoryName(config.aeronDirectory!!.absolutePath) - - if (context.ipcTermBufferLength() != io.aeron.driver.Configuration.ipcTermBufferLength()) { - // default 64 megs each is HUGE - context.ipcTermBufferLength(8 * 1024 * 1024) - } - - if (context.publicationTermBufferLength() != io.aeron.driver.Configuration.termBufferLength()) { - // default 16 megs each is HUGE (we run out of space in production w/ lots of clients) - context.publicationTermBufferLength(2 * 1024 * 1024) - } - - // we DO NOT want to abort the JVM if there are errors. 
- context.errorHandler { error -> - logger.error("Error in Aeron", error) - } - - - val aeronDir = File(context.aeronDirectoryName()).absoluteFile - context.aeronDirectoryName(aeronDir.path) - - return context - } - - /** - * Creates the Aeron Media Driver context - * - * @throws IllegalStateException if the configuration has already been used to create a context - * @throws IllegalArgumentException if the aeron media driver directory cannot be setup - */ - fun createContext(config: Configuration, logger: KLogger = KotlinLogging.logger("AeronConfig")) { - if (config.context != null) { - logger.warn { "Unable to recreate a context for a configuration that has already been created" } - return - } - - var context = create(config, logger) - - // this happens EXACTLY once. Must be BEFORE the "isRunning" check! - context.concludeAeronDirectory() - - // will setup the aeron directory or throw IllegalArgumentException if it cannot be configured - val aeronDir = context.aeronDirectory() - - var isRunning = isRunning(context) - - // this is incompatible with IPC, and will not be set if IPC is enabled - if (config.aeronDirectoryForceUnique && isRunning) { - val savedParent = aeronDir.parentFile - var retry = 0 - val retryMax = 100 - - while (config.aeronDirectoryForceUnique && isRunning) { - if (retry++ > retryMax) { - throw IllegalArgumentException("Unable to force unique aeron Directory. Tried $retryMax times and all tries were in use.") - } - - val randomNum = (1..retryMax).shuffled().first() - val newDir = savedParent.resolve("${aeronDir.name}_$randomNum") - - context = create(config, logger) - context.aeronDirectoryName(newDir.path) - - // this happens EXACTLY once. Must be BEFORE the "isRunning" check! 
- context.concludeAeronDirectory() - - isRunning = isRunning(context) - } - - if (!isRunning) { - // NOTE: We must be *super* careful trying to delete directories, because if we have multiple AERON/MEDIA DRIVERS connected to the - // same directory, deleting the directory will cause any other aeron connection to fail! (which makes sense). - // since we are forcing a unique directory, we should ALSO delete it when we are done! - context.dirDeleteOnShutdown() - } - } - - logger.info { "Aeron directory: '${context.aeronDirectory()}'" } - - // once we do this, we cannot change any of the config values! - config.context = context - } - - /** - * Checks to see if an endpoint (using the specified configuration) is running. - * - * @return true if the media driver is active and running - */ - fun isRunning(context: MediaDriver.Context): Boolean { - // if the media driver is running, it will be a quick connection. Usually 100ms or so - return context.isDriverActive(context.driverTimeoutMs()) { } - } - - /** - * If the driver is not already running, this will start the driver - * - * @throws Exception if there is a problem starting the media driver - */ - fun startDriver(config: Configuration, - type: Class<*> = AeronConfig::class.java, - logger: KLogger = KotlinLogging.logger("AeronConfig")): MediaDriver? { - config.validate() - - if (config.context == null) { - createContext(config, logger) - } - - require(config.context != null) { "Configuration context cannot be properly created. Unable to continue!" } - - val context = config.context!! - - if (!isRunning(context)) { - logger.debug("Starting Aeron Media driver in '${context.aeronDirectory()}'") - - var threadFactory: NamedThreadFactory? = null - - // try to start. 
If we start/stop too quickly, it's a problem - var count = 10 - while (count-- > 0) { - try { - if (threadFactory == null) { - threadFactory = NamedThreadFactory("Thread", ThreadGroup("${type.simpleName}-AeronDriver"), true) - context - .conductorThreadFactory(threadFactory) - .receiverThreadFactory(threadFactory) - .senderThreadFactory(threadFactory) - .sharedNetworkThreadFactory(threadFactory) - .sharedThreadFactory(threadFactory) - } - - return MediaDriver.launch(context) - } catch (e: Exception) { - logger.warn(e) { "Unable to start the Aeron Media driver. Retrying $count more times..." } - runBlocking { - delay(context.driverTimeoutMs()) - } - } - } - } else { - logger.debug("Not starting Aeron Media driver. It was already running in '${context.aeronDirectory()}'") - } - - return null - } - - /** - * A safer way to try to close the media driver - * - * NOTE: We must be *super* careful trying to delete directories, because if we have multiple AERON/MEDIA DRIVERS connected to the - * same directory, deleting the directory will cause any other aeron connection to fail! (which makes sense). - */ - internal suspend fun stopDriver(mediaDriver: MediaDriver?, logger: KLogger = KotlinLogging.logger("AeronConfig")) { - if (mediaDriver == null) { - logger.debug { "No driver started for this instance. Not Stopping." } - return - } - - val context = mediaDriver.context() - logger.debug("Stopping driver at '${context.aeronDirectory()}'...") - - if (!isRunning(context)) { - // not running - logger.debug { "Driver is not running at '${context.aeronDirectory()}' for this context. Not Stopping." 
} - return - } - - try { - mediaDriver.close() - - // on close, the publication CAN linger (in case a client goes away, and then comes back) - // AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param) - delay(AERON_PUBLICATION_LINGER_TIMEOUT) - - // wait for the media driver to actually stop - var count = 10 - while (count-- >= 0 && isRunning(context)) { - logger.warn { "Aeron Media driver at '${context.aeronDirectory()}' is still running. Waiting for it to stop. Trying $count more times." } - delay(context.driverTimeoutMs()) - } - } catch (e: Exception) { - logger.error("Error closing the media driver at '${context.aeronDirectory()}'", e) - } - - (context.sharedThreadFactory() as NamedThreadFactory).group.destroy() - - logger.debug { "Closed the media driver at '${context.aeronDirectory()}'" } - } -} diff --git a/src/dorkbox/network/aeron/AeronDriver.kt b/src/dorkbox/network/aeron/AeronDriver.kt new file mode 100644 index 00000000..0a7d885f --- /dev/null +++ b/src/dorkbox/network/aeron/AeronDriver.kt @@ -0,0 +1,489 @@ +package dorkbox.network.aeron + +import dorkbox.network.Configuration +import dorkbox.network.connection.ListenerManager +import dorkbox.util.NamedThreadFactory +import io.aeron.Aeron +import io.aeron.ChannelUriStringBuilder +import io.aeron.Publication +import io.aeron.Subscription +import io.aeron.driver.MediaDriver +import io.aeron.exceptions.DriverTimeoutException +import kotlinx.coroutines.delay +import kotlinx.coroutines.runBlocking +import mu.KLogger +import mu.KotlinLogging +import org.agrona.concurrent.BackoffIdleStrategy +import java.io.File + +/** + * Class for managing the Aeron+Media drivers + */ +class AeronDriver(val config: Configuration, + val type: Class<*> = AeronDriver::class.java, + val logger: KLogger = KotlinLogging.logger("AeronConfig")) { + + companion object { + /** + * Identifier for invalid sessions. 
This must be < RESERVED_SESSION_ID_LOW + */ + internal const val RESERVED_SESSION_ID_INVALID = 0 + + /** + * The inclusive lower bound of the reserved sessions range. THIS SHOULD NEVER BE <= 0! + */ + internal const val RESERVED_SESSION_ID_LOW = 1 + + /** + * The inclusive upper bound of the reserved sessions range. + */ + internal const val RESERVED_SESSION_ID_HIGH = Integer.MAX_VALUE + + const val UDP_HANDSHAKE_STREAM_ID: Int = 0x1337cafe + const val IPC_HANDSHAKE_STREAM_ID_PUB: Int = 0x1337c0de + const val IPC_HANDSHAKE_STREAM_ID_SUB: Int = 0x1337c0d3 + + + // on close, the publication CAN linger (in case a client goes away, and then comes back) + // AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param) + private const val AERON_PUBLICATION_LINGER_TIMEOUT = 5_000L // in MS + + + private fun create(config: Configuration, logger: KLogger): MediaDriver.Context { + /* + * Linux + * Linux normally requires some settings of sysctl values. One is net.core.rmem_max to allow larger SO_RCVBUF and + * net.core.wmem_max to allow larger SO_SNDBUF values to be set. + * + * Windows + * Windows tends to use SO_SNDBUF values that are too small. It is recommended to use values more like 1MB or so. + * + * Mac/Darwin + * + * Mac tends to use SO_SNDBUF values that are too small. It is recommended to use larger values, like 16KB. 
+ */ + if (config.receiveBufferSize == 0) { + config.receiveBufferSize = io.aeron.driver.Configuration.SOCKET_RCVBUF_LENGTH_DEFAULT + // when { + // OS.isLinux() -> + // OS.isWindows() -> + // OS.isMacOsX() -> + // } + + // val rmem_max = dorkbox.network.other.NetUtil.sysctlGetInt("net.core.rmem_max") + // val wmem_max = dorkbox.network.other.NetUtil.sysctlGetInt("net.core.wmem_max") + } + + + if (config.sendBufferSize == 0) { + config.sendBufferSize = io.aeron.driver.Configuration.SOCKET_SNDBUF_LENGTH_DEFAULT + // when { + // OS.isLinux() -> + // OS.isWindows() -> + // OS.isMacOsX() -> + // } + + // val rmem_max = dorkbox.network.other.NetUtil.sysctlGetInt("net.core.rmem_max") + // val wmem_max = dorkbox.network.other.NetUtil.sysctlGetInt("net.core.wmem_max") + } + + + /* + * Note: Since Mac OS does not have a built-in support for /dev/shm it is advised to create a RAM disk for the Aeron directory (aeron.dir). + * + * You can create a RAM disk with the following command: + * + * $ diskutil erasevolume HFS+ "DISK_NAME" `hdiutil attach -nomount ram://$((2048 * SIZE_IN_MB))` + * + * where: + * + * DISK_NAME should be replaced with a name of your choice. + * SIZE_IN_MB is the size in megabytes for the disk (e.g. 4096 for a 4GB disk). + * + * For example, the following command creates a RAM disk named DevShm which is 2GB in size: + * + * $ diskutil erasevolume HFS+ "DevShm" `hdiutil attach -nomount ram://$((2048 * 2048))` + * + * After this command is executed the new disk will be mounted under /Volumes/DevShm.
+ */ + if (config.aeronDirectory == null) { + val baseFileLocation = config.suggestAeronLogLocation(logger) + + // val aeronLogDirectory = File(baseFileLocation, "aeron-" + type.simpleName) + val aeronLogDirectory = File(baseFileLocation, "aeron") + config.aeronDirectory = aeronLogDirectory + } + + // LOW-LATENCY SETTINGS + // .termBufferSparseFile(false) + // .useWindowsHighResTimer(true) + // .threadingMode(ThreadingMode.DEDICATED) + // .conductorIdleStrategy(BusySpinIdleStrategy.INSTANCE) + // .receiverIdleStrategy(NoOpIdleStrategy.INSTANCE) + // .senderIdleStrategy(NoOpIdleStrategy.INSTANCE); + // setProperty(DISABLE_BOUNDS_CHECKS_PROP_NAME, "true"); + // setProperty("aeron.mtu.length", "16384"); + // setProperty("aeron.socket.so_sndbuf", "2097152"); + // setProperty("aeron.socket.so_rcvbuf", "2097152"); + // setProperty("aeron.rcv.initial.window.length", "2097152"); + + // driver context must happen in the initializer, because we have a Server.isRunning() method that uses the mediaDriverContext (without bind) + val context = MediaDriver.Context() + .publicationReservedSessionIdLow(RESERVED_SESSION_ID_LOW) + .publicationReservedSessionIdHigh(RESERVED_SESSION_ID_HIGH) + .threadingMode(config.threadingMode) + .mtuLength(config.networkMtuSize) + .socketSndbufLength(config.sendBufferSize) + .socketRcvbufLength(config.receiveBufferSize) + + context.aeronDirectoryName(config.aeronDirectory!!.absolutePath) + + if (context.ipcTermBufferLength() != io.aeron.driver.Configuration.ipcTermBufferLength()) { + // default 64 megs each is HUGE + context.ipcTermBufferLength(8 * 1024 * 1024) + } + + if (context.publicationTermBufferLength() != io.aeron.driver.Configuration.termBufferLength()) { + // default 16 megs each is HUGE (we run out of space in production w/ lots of clients) + context.publicationTermBufferLength(2 * 1024 * 1024) + } + + // we DO NOT want to abort the JVM if there are errors. 
+ context.errorHandler { error -> + logger.error("Error in Aeron", error) + } + + + val aeronDir = File(context.aeronDirectoryName()).absoluteFile + context.aeronDirectoryName(aeronDir.path) + + return context + } + + + /** + * Creates the Aeron Media Driver context + * + * @throws IllegalStateException if the configuration has already been used to create a context + * @throws IllegalArgumentException if the aeron media driver directory cannot be setup + */ + fun createContext(config: Configuration, logger: KLogger = KotlinLogging.logger("AeronConfig")) { + if (config.context != null) { + logger.warn { "Unable to recreate a context for a configuration that has already been created" } + return + } + + var context = create(config, logger) + + // this happens EXACTLY once. Must be BEFORE the "isRunning" check! + context.concludeAeronDirectory() + + // will setup the aeron directory or throw IllegalArgumentException if it cannot be configured + val aeronDir = context.aeronDirectory() + + var isRunning = isRunning(context) + + // this is incompatible with IPC, and will not be set if IPC is enabled + if (config.aeronDirectoryForceUnique && isRunning) { + val savedParent = aeronDir.parentFile + var retry = 0 + val retryMax = 100 + + while (config.aeronDirectoryForceUnique && isRunning) { + if (retry++ > retryMax) { + throw IllegalArgumentException("Unable to force unique aeron Directory. Tried $retryMax times and all tries were in use.") + } + + val randomNum = (1..retryMax).shuffled().first() + val newDir = savedParent.resolve("${aeronDir.name}_$randomNum") + + context = create(config, logger) + context.aeronDirectoryName(newDir.path) + + // this happens EXACTLY once. Must be BEFORE the "isRunning" check! 
+                    context.concludeAeronDirectory()
+
+                    isRunning = isRunning(context)
+                }
+
+                if (!isRunning) {
+                    // NOTE: We must be *super* careful trying to delete directories, because if we have multiple AERON/MEDIA DRIVERS connected to the
+                    // same directory, deleting the directory will cause any other aeron connection to fail! (which makes sense).
+                    // since we are forcing a unique directory, we should ALSO delete it when we are done!
+                    context.dirDeleteOnShutdown()
+                }
+            }
+
+            logger.info { "Aeron directory: '${context.aeronDirectory()}'" }
+
+            // once we do this, we cannot change any of the config values!
+            config.context = context
+        }
+
+        /**
+         * Checks to see if a media driver (using the specified context) is running.
+         *
+         * @param context the media driver context to check
+         * @param timeout the timeout value handed to the driver-activity check. Defaults to the driver timeout of [context]
+         *
+         * @return true if the media driver is active and running
+         */
+        fun isRunning(context: MediaDriver.Context, timeout: Long = context.driverTimeoutMs()): Boolean {
+            // if the media driver is running, it will be a quick connection. Usually 100ms or so
+            // honor the caller-supplied timeout (it already defaults to the context's driver timeout)
+            return context.isDriverActive(timeout) { }
+        }
+
+        /**
+         * validates and creates the configuration. This can only happen once!
+         */
+        fun validateConfig(config: Configuration, logger: KLogger = KotlinLogging.logger("AeronConfig")) {
+            config.validate()
+
+            if (config.context == null) {
+                createContext(config, logger)
+            }
+
+            require(config.context != null) { "Configuration context cannot be properly created. Unable to continue!" }
+        }
+    }
+
+    private var aeron: Aeron? = null
+
+    @Volatile
+    private var mediaDriver: MediaDriver? = null
+
+    // the context is validated before the AeronDriver object is created
+    private val threadFactory = NamedThreadFactory("Thread", ThreadGroup("${type.simpleName}-AeronDriver"), true)
+    private val mediaDriverContext = config.context!!
+ + init { + mediaDriverContext + .conductorThreadFactory(threadFactory) + .receiverThreadFactory(threadFactory) + .senderThreadFactory(threadFactory) + .sharedNetworkThreadFactory(threadFactory) + .sharedThreadFactory(threadFactory) + } + + private fun setupAeron(): Aeron.Context { + val aeronDriverContext = Aeron.Context() + aeronDriverContext + .aeronDirectoryName(mediaDriverContext.aeronDirectory().path) + .concludeAeronDirectory() + + aeronDriverContext + .threadFactory(threadFactory) + .idleStrategy(BackoffIdleStrategy()) + + // we DO NOT want to abort the JVM if there are errors. + // this replaces the default handler with one that doesn't abort the JVM + aeronDriverContext.errorHandler { error -> + ListenerManager.cleanStackTrace(error) + logger.error("Error in Aeron", error) + } + + return aeronDriverContext + } + + + + private fun startDriver(): Boolean { + if (mediaDriver == null) { + // only start if we didn't already start... There will be several checks. + + if (!isRunning(mediaDriverContext)) { + logger.debug("Starting Aeron Media driver in '${mediaDriverContext.aeronDirectory()}'") + + + // try to start. If we start/stop too quickly, it's a problem + var count = 10 + while (count-- > 0) { + try { + mediaDriver = MediaDriver.launch(mediaDriverContext) + return true + } catch (e: Exception) { + logger.warn(e) { "Unable to start the Aeron Media driver. Retrying $count more times..." } + runBlocking { + delay(mediaDriverContext.driverTimeoutMs()) + } + } + } + } else { + logger.debug("Not starting Aeron Media driver. It was already running in '${mediaDriverContext.aeronDirectory()}'") + } + } + + return false + } + + private fun startAeron(didStartDriver: Boolean) { + // the media driver MIGHT already be started in a different process! 
We still ALWAYS want to connect to + // aeron (which connects to the other media driver process), especially if we haven't already connected to + // it (or if there was an error connecting because a different media driver was shutting down) + + if (didStartDriver || aeron == null) { + aeron?.close() + + // this might succeed if we can connect to the media driver + aeron = Aeron.connect(setupAeron()) + } + } + + /** + * If the driver is not already running, this will start the driver + * + * @throws Exception if there is a problem starting the media driver + */ + fun start() { + val didStartDriver = startDriver() + startAeron(didStartDriver) + } + + /** + * @return the aeron media driver log file for a specific publication. This should be removed when a publication is closed (but is not always!) + */ + fun getMediaDriverPublicationFile(publicationRegId: Long): File { + return mediaDriverContext.aeronDirectory().resolve("publications").resolve("${publicationRegId}.logbuffer") + } + + + suspend fun addPublicationWithRetry(publicationUri: ChannelUriStringBuilder, streamId: Int): Publication { + val uri = publicationUri.build() + + // If we start/stop too quickly, we might have the address already in use! Retry a few times. + var count = 10 + var exception: Exception? = null + while (count-- > 0) { + try { + // if aeron is null, an exception is thrown + return aeron!!.addPublication(uri, streamId) + } catch (e: Exception) { + // NOTE: this error will be logged in the `aeronDriverContext` logger + exception = e + logger.warn { "Unable to add a publication to Aeron. Retrying $count more times..." } + + if (e is DriverTimeoutException) { + delay(mediaDriverContext.driverTimeoutMs()) + } + + // reasons we cannot add a pub/sub to aeron + // 1) the driver was closed + // 2) aeron was unable to connect to the driver + // 3) the address already in use + + // configuring pub/sub to aeron is LINEAR -- and it happens in 2 places. 
+                // 1) starting up the client/server
+                // 2) creating a new client-server connection pair (the media driver won't be "dead" at this point)
+
+                // try to start/restart aeron
+                try {
+                    start()
+                } catch (e2: Exception) {
+                    e2.printStackTrace()
+                }
+            }
+        }
+
+        throw exception!!
+    }
+
+    /**
+     * Adds a subscription to aeron. If that fails, this tries to start/restart the driver + aeron and retries a few times.
+     *
+     * @param subscriptionUri the channel URI builder for the subscription (it is built once, before the first attempt)
+     * @param streamId the stream ID to subscribe to
+     *
+     * @return the subscription that was added
+     *
+     * @throws Exception the failure from the last attempt, if every attempt failed
+     */
+    suspend fun addSubscriptionWithRetry(subscriptionUri: ChannelUriStringBuilder, streamId: Int): Subscription {
+        val uri = subscriptionUri.build()
+
+        // If we start/stop too quickly, we might have the address already in use! Retry a few times.
+        var count = 10
+        var exception: Exception? = null
+        while (count-- > 0) {
+            try {
+                // if aeron is null, an exception is thrown. It has to be treated as a failed attempt (recorded + restart below),
+                // otherwise the loop spins without doing anything and there is no exception to report at the end
+                val aeron = checkNotNull(aeron) { "Aeron is not connected" }
+                return aeron.addSubscription(uri, streamId)
+            } catch (e: Exception) {
+                // NOTE: this error will be logged in the `aeronDriverContext` logger
+                exception = e
+                logger.warn { "Unable to add a subscription to Aeron. Retrying $count more times..." }
+
+                if (e is DriverTimeoutException) {
+                    delay(mediaDriverContext.driverTimeoutMs())
+                }
+
+                // reasons we cannot add a pub/sub to aeron
+                // 1) the driver was closed
+                // 2) aeron was unable to connect to the driver
+                // 3) the address already in use
+
+                // configuring pub/sub to aeron is LINEAR -- and it happens in 2 places.
+                // 1) starting up the client/server
+                // 2) creating a new client-server connection pair (the media driver won't be "dead" at this point)
+
+                // try to start/restart aeron
+                try {
+                    start()
+                } catch (e2: Exception) {
+                    e2.printStackTrace()
+                }
+            }
+        }
+
+        // every attempt either returned or recorded its failure, the fallback only guards against a bare NullPointerException
+        throw exception ?: IllegalStateException("Unable to add a subscription to Aeron")
+    }
+
+    /**
+     * Checks to see if an endpoint (using the specified configuration) is running.
+     *
+     * @return true if the media driver is active and running
+     */
+    fun isRunning(timeout: Long = mediaDriverContext.driverTimeoutMs()): Boolean {
+        // if the media driver is running, it will be a quick connection.
Usually 100ms or so
+        return mediaDriverContext.isDriverActive(timeout) { }
+    }
+
+    /**
+     * A safer way to try to close the media driver
+     *
+     * The aeron connection is closed first. The media driver is only stopped when this instance started one and it is still
+     * running. After stopping it, this waits (a limited number of times) for the driver to no longer be reported as running.
+     *
+     * NOTE: We must be *super* careful trying to delete directories, because if we have multiple AERON/MEDIA DRIVERS connected to the
+     * same directory, deleting the directory will cause any other aeron connection to fail! (which makes sense).
+     */
+    suspend fun close() {
+        try {
+            aeron?.close()
+        } catch (e: Exception) {
+            logger.error("Error stopping aeron.", e)
+        }
+
+        val mediaDriver = mediaDriver
+        if (mediaDriver == null) {
+            logger.debug { "No driver started for this instance. Not Stopping." }
+            return
+        }
+
+        logger.debug("Stopping driver at '${mediaDriverContext.aeronDirectory()}'...")
+
+        if (!isRunning(mediaDriverContext)) {
+            // not running
+            logger.debug { "Driver is not running at '${mediaDriverContext.aeronDirectory()}' for this context. Not Stopping." }
+            return
+        }
+
+        try {
+            mediaDriver.close()
+
+            // on close, the publication CAN linger (in case a client goes away, and then comes back)
+            // AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param)
+            delay(AERON_PUBLICATION_LINGER_TIMEOUT)
+
+            // wait for the media driver to actually stop
+            // `count` is the number of tries that remain, so we check at most 10 times and never report a negative number
+            var count = 10
+            while (count-- > 0 && isRunning(mediaDriverContext)) {
+                logger.warn { "Aeron Media driver at '${mediaDriverContext.aeronDirectory()}' is still running. Waiting for it to stop. Trying $count more times." }
+                delay(mediaDriverContext.driverTimeoutMs())
+            }
+        } catch (e: Exception) {
+            logger.error("Error closing the media driver at '${mediaDriverContext.aeronDirectory()}'", e)
+        }
+
+        // Destroys this thread group and all of its subgroups.
+        // This thread group must be empty, indicating that all threads that had been in this thread group have since stopped.
+        threadFactory.group.destroy()
+
+        logger.debug { "Closed the media driver at '${mediaDriverContext.aeronDirectory()}'" }
+    }
+}
diff --git a/src/dorkbox/network/aeron/IpcMediaDriverConnection.kt b/src/dorkbox/network/aeron/IpcMediaDriverConnection.kt index fc043744..340a9efc 100644 --- a/src/dorkbox/network/aeron/IpcMediaDriverConnection.kt +++ b/src/dorkbox/network/aeron/IpcMediaDriverConnection.kt @@ -17,7 +17,6 @@ package dorkbox.network.aeron import dorkbox.network.exceptions.ClientTimedOutException -import io.aeron.Aeron import io.aeron.ChannelUriStringBuilder import kotlinx.coroutines.delay import mu.KLogger @@ -26,7 +25,7 @@ import mu.KLogger * For a client, the streamId specified here MUST be manually flipped because they are in the perspective of the SERVER * NOTE: IPC connection will ALWAYS have a timeout of 1 second to connect. This is IPC, it should connect fast */ -internal class IpcMediaDriverConnection(streamId: Int, +internal open class IpcMediaDriverConnection(streamId: Int, val streamIdSubscription: Int, sessionId: Int, ) : @@ -36,7 +35,7 @@ internal class IpcMediaDriverConnection(streamId: Int, private fun uri(): ChannelUriStringBuilder { val builder = ChannelUriStringBuilder().media("ipc") - if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) { + if (sessionId != AeronDriver.RESERVED_SESSION_ID_INVALID) { builder.sessionId(sessionId) } @@ -48,7 +47,7 @@ internal class IpcMediaDriverConnection(streamId: Int, * * @throws ClientTimedOutException if we cannot connect to the server in the designated time */ - override suspend fun buildClient(aeron: Aeron, logger: KLogger) { + override suspend fun buildClient(aeronDriver: AeronDriver, logger: KLogger) { // Create a publication at the given address and port, using the given stream ID. // Note: The Aeron.addPublication method will block until the Media Driver acknowledges the request or a timeout occurs. 
val publicationUri = uri() @@ -66,8 +65,8 @@ internal class IpcMediaDriverConnection(streamId: Int, // publication of any state to other threads and not be long running or re-entrant with the client. // If we start/stop too quickly, we might have the aeron connectivity issues! Retry a few times. - val publication = addPublicationWithRetry(aeron, publicationUri.build(), streamId, logger) - val subscription = addSubscriptionWithRetry(aeron, subscriptionUri.build(), streamIdSubscription, logger) + val publication = aeronDriver.addPublicationWithRetry(publicationUri, streamId) + val subscription = aeronDriver.addSubscriptionWithRetry(subscriptionUri, streamIdSubscription) var success = false @@ -117,7 +116,7 @@ internal class IpcMediaDriverConnection(streamId: Int, * * serverAddress is ignored for IPC */ - override suspend fun buildServer(aeron: Aeron, logger: KLogger) { + override suspend fun buildServer(aeronDriver: AeronDriver, logger: KLogger, pairConnection: Boolean) { // Create a publication with a control port (for dynamic MDC) at the given address and port, using the given stream ID. // Note: The Aeron.addPublication method will block until the Media Driver acknowledges the request or a timeout occurs. val publicationUri = uri() @@ -138,12 +137,12 @@ internal class IpcMediaDriverConnection(streamId: Int, // AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param) // If we start/stop too quickly, we might have the aeron connectivity issues! Retry a few times. 
- publication = addPublicationWithRetry(aeron, publicationUri.build(), streamId, logger) - subscription = addSubscriptionWithRetry(aeron, subscriptionUri.build(), streamIdSubscription, logger) + publication = aeronDriver.addPublicationWithRetry(publicationUri, streamId) + subscription = aeronDriver.addSubscriptionWithRetry(subscriptionUri, streamIdSubscription) } override fun clientInfo() : String { - return if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) { + return if (sessionId != AeronDriver.RESERVED_SESSION_ID_INVALID) { "[$sessionId] IPC connection established to [$streamIdSubscription|$streamId]" } else { "Connecting handshake to IPC [$streamIdSubscription|$streamId]" @@ -151,7 +150,7 @@ internal class IpcMediaDriverConnection(streamId: Int, } override fun serverInfo() : String { - return if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) { + return if (sessionId != AeronDriver.RESERVED_SESSION_ID_INVALID) { "[$sessionId] IPC listening on [$streamIdSubscription|$streamId] " } else { "Listening handshake on IPC [$streamIdSubscription|$streamId]" diff --git a/src/dorkbox/network/aeron/MediaDriverConnection.kt b/src/dorkbox/network/aeron/MediaDriverConnection.kt index c301715a..e0af5269 100644 --- a/src/dorkbox/network/aeron/MediaDriverConnection.kt +++ b/src/dorkbox/network/aeron/MediaDriverConnection.kt @@ -18,57 +18,22 @@ package dorkbox.network.aeron import dorkbox.network.exceptions.ClientTimedOutException -import io.aeron.Aeron import io.aeron.Publication import io.aeron.Subscription -import kotlinx.coroutines.delay import mu.KLogger -abstract class MediaDriverConnection(val publicationPort: Int, val subscriptionPort: Int, - val streamId: Int, val sessionId: Int, - val connectionTimeoutMS: Long, val isReliable: Boolean) : AutoCloseable { +abstract class MediaDriverConnection( + val publicationPort: Int, val subscriptionPort: Int, + val streamId: Int, val sessionId: Int, + val connectionTimeoutMS: Long, val isReliable: Boolean) : 
AutoCloseable { lateinit var subscription: Subscription lateinit var publication: Publication - suspend fun addSubscriptionWithRetry(aeron: Aeron, uri: String, streamId: Int, logger: KLogger): Subscription { - // If we start/stop too quickly, we might have the address already in use! Retry a few times. - var count = 10 - var exception: Exception? = null - while (count-- > 0) { - try { - return aeron.addSubscription(uri, streamId) - } catch (e: Exception) { - exception = e - logger.warn { "Unable to add a publication to Aeron. Retrying $count more times..." } - delay(5000) - } - } - - throw exception!! - } - - suspend fun addPublicationWithRetry(aeron: Aeron, uri: String, streamId: Int, logger: KLogger): Publication { - // If we start/stop too quickly, we might have the address already in use! Retry a few times. - var count = 10 - var exception: Exception? = null - while (count-- > 0) { - try { - return aeron.addPublication(uri, streamId) - } catch (e: Exception) { - exception = e - logger.warn { "Unable to add a publication to Aeron. Retrying $count more times..." } - delay(5_000) - } - } - - throw exception!! 
- } - @Throws(ClientTimedOutException::class) - abstract suspend fun buildClient(aeron: Aeron, logger: KLogger) - abstract suspend fun buildServer(aeron: Aeron, logger: KLogger) + abstract suspend fun buildClient(aeronDriver: AeronDriver, logger: KLogger) + abstract suspend fun buildServer(aeronDriver: AeronDriver, logger: KLogger, pairConnection: Boolean = false) abstract fun clientInfo() : String abstract fun serverInfo() : String diff --git a/src/dorkbox/network/aeron/UdpMediaDriverClientConnection.kt b/src/dorkbox/network/aeron/UdpMediaDriverClientConnection.kt index 8951f73e..704d952a 100644 --- a/src/dorkbox/network/aeron/UdpMediaDriverClientConnection.kt +++ b/src/dorkbox/network/aeron/UdpMediaDriverClientConnection.kt @@ -19,7 +19,6 @@ package dorkbox.network.aeron import dorkbox.netUtil.IP import dorkbox.network.exceptions.ClientException import dorkbox.network.exceptions.ClientTimedOutException -import io.aeron.Aeron import io.aeron.ChannelUriStringBuilder import kotlinx.coroutines.delay import mu.KLogger @@ -62,7 +61,7 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress, private fun uri(): ChannelUriStringBuilder { val builder = ChannelUriStringBuilder().reliable(isReliable).media("udp") - if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) { + if (sessionId != AeronDriver.RESERVED_SESSION_ID_INVALID) { builder.sessionId(sessionId) } @@ -70,7 +69,7 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress, } @Suppress("DuplicatedCode") - override suspend fun buildClient(aeron: Aeron, logger: KLogger) { + override suspend fun buildClient(aeronDriver: AeronDriver, logger: KLogger) { val aeronAddressString = aeronConnectionString(address) // Create a publication at the given address and port, using the given stream ID. @@ -99,8 +98,8 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress, // publication of any state to other threads and not be long running or re-entrant with the client. 
// on close, the publication CAN linger (in case a client goes away, and then comes back) // AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param) - val publication = addPublicationWithRetry(aeron, publicationUri.build(), streamId, logger) - val subscription = addSubscriptionWithRetry(aeron, subscriptionUri.build(), streamId, logger) + val publication = aeronDriver.addPublicationWithRetry(publicationUri, streamId) + val subscription = aeronDriver.addSubscriptionWithRetry(subscriptionUri, streamId) var success = false @@ -150,14 +149,14 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress, override fun clientInfo(): String { address - return if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) { + return if (sessionId != AeronDriver.RESERVED_SESSION_ID_INVALID) { "Connecting to ${IP.toString(address)} [$subscriptionPort|$publicationPort] [$streamId|$sessionId] (reliable:$isReliable)" } else { "Connecting handshake to ${IP.toString(address)} [$subscriptionPort|$publicationPort] [$streamId|*] (reliable:$isReliable)" } } - override suspend fun buildServer(aeron: Aeron, logger: KLogger) { + override suspend fun buildServer(aeronDriver: AeronDriver, logger: KLogger, pairConnection: Boolean) { throw ClientException("Server info not implemented in Client MDC") } override fun serverInfo(): String { diff --git a/src/dorkbox/network/aeron/UdpMediaDriverServerConnection.kt b/src/dorkbox/network/aeron/UdpMediaDriverServerConnection.kt index e59e27fb..da140684 100644 --- a/src/dorkbox/network/aeron/UdpMediaDriverServerConnection.kt +++ b/src/dorkbox/network/aeron/UdpMediaDriverServerConnection.kt @@ -20,7 +20,6 @@ import dorkbox.netUtil.IP import dorkbox.netUtil.IPv4 import dorkbox.netUtil.IPv6 import dorkbox.network.exceptions.ServerException -import io.aeron.Aeron import io.aeron.ChannelUriStringBuilder import mu.KLogger import java.net.Inet4Address @@ -41,7 +40,7 @@ internal open class 
UdpMediaDriverServerConnection(val listenAddress: InetAddres var success: Boolean = false - private fun aeronConnectionString(ipAddress: InetAddress): String { + protected fun aeronConnectionString(ipAddress: InetAddress): String { return if (ipAddress is Inet4Address) { ipAddress.hostAddress } else { @@ -55,9 +54,9 @@ internal open class UdpMediaDriverServerConnection(val listenAddress: InetAddres } } - private fun uri(): ChannelUriStringBuilder { + protected fun uri(): ChannelUriStringBuilder { val builder = ChannelUriStringBuilder().reliable(isReliable).media("udp") - if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) { + if (sessionId != AeronDriver.RESERVED_SESSION_ID_INVALID) { builder.sessionId(sessionId) } @@ -65,11 +64,11 @@ internal open class UdpMediaDriverServerConnection(val listenAddress: InetAddres } @Suppress("DuplicatedCode") - override suspend fun buildClient(aeron: Aeron, logger: KLogger) { + override suspend fun buildClient(aeronDriver: AeronDriver, logger: KLogger) { throw ServerException("Client info not implemented in Server MDC") } - override suspend fun buildServer(aeron: Aeron, logger: KLogger) { + override suspend fun buildServer(aeronDriver: AeronDriver, logger: KLogger, pairConnection: Boolean) { val connectionString = aeronConnectionString(listenAddress) // Create a publication with a control port (for dynamic MDC) at the given address and port, using the given stream ID. @@ -100,8 +99,8 @@ internal open class UdpMediaDriverServerConnection(val listenAddress: InetAddres // AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param) // If we start/stop too quickly, we might have the address already in use! Retry a few times. 
- publication = addPublicationWithRetry(aeron, publicationUri.build(), streamId, logger) - subscription = addSubscriptionWithRetry(aeron, subscriptionUri.build(), streamId, logger) + publication = aeronDriver.addPublicationWithRetry(publicationUri, streamId) + subscription = aeronDriver.addSubscriptionWithRetry(subscriptionUri, streamId) } override fun clientInfo(): String { @@ -119,7 +118,7 @@ internal open class UdpMediaDriverServerConnection(val listenAddress: InetAddres IP.toString(listenAddress) } - return if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) { + return if (sessionId != AeronDriver.RESERVED_SESSION_ID_INVALID) { "Listening on $address [$subscriptionPort|$publicationPort] [$streamId|$sessionId] (reliable:$isReliable)" } else { "Listening handshake on $address [$subscriptionPort|$publicationPort] [$streamId|*] (reliable:$isReliable)" diff --git a/src/dorkbox/network/connection/Connection.kt b/src/dorkbox/network/connection/Connection.kt index 684fe3fe..d0db5103 100644 --- a/src/dorkbox/network/connection/Connection.kt +++ b/src/dorkbox/network/connection/Connection.kt @@ -416,7 +416,7 @@ open class Connection(connectionParameters: ConnectionParams<*>) { } // on close, we want to make sure this file is DELETED! 
- val logFile = endPoint.getMediaDriverPublicationFile(publication.registrationId()) + val logFile = endPoint.aeronDriver.getMediaDriverPublicationFile(publication.registrationId()) publication.close() closeTimeoutTime = System.currentTimeMillis() + timeOut diff --git a/src/dorkbox/network/connection/EndPoint.kt b/src/dorkbox/network/connection/EndPoint.kt index 5de189e9..b483eb4a 100644 --- a/src/dorkbox/network/connection/EndPoint.kt +++ b/src/dorkbox/network/connection/EndPoint.kt @@ -19,7 +19,7 @@ import dorkbox.network.Client import dorkbox.network.Configuration import dorkbox.network.Server import dorkbox.network.ServerConfiguration -import dorkbox.network.aeron.AeronConfig +import dorkbox.network.aeron.AeronDriver import dorkbox.network.aeron.CoroutineIdleStrategy import dorkbox.network.coroutines.SuspendWaiter import dorkbox.network.exceptions.MessageNotRegisteredException @@ -33,9 +33,7 @@ import dorkbox.network.rmi.messages.RmiMessage import dorkbox.network.serialization.KryoExtra import dorkbox.network.serialization.Serialization import dorkbox.network.storage.SettingsStore -import dorkbox.util.NamedThreadFactory import dorkbox.util.exceptions.SecurityException -import io.aeron.Aeron import io.aeron.Publication import io.aeron.driver.MediaDriver import io.aeron.logbuffer.Header @@ -47,8 +45,6 @@ import kotlinx.coroutines.runBlocking import mu.KLogger import mu.KotlinLogging import org.agrona.DirectBuffer -import org.agrona.concurrent.BackoffIdleStrategy -import java.io.File // If TCP and UDP both fill the pipe, THERE WILL BE FRAGMENTATION and dropped UDP packets! @@ -77,9 +73,7 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A internal val listenerManager = ListenerManager() internal val connections = ConnectionManager() - internal val aeron: Aeron - private var mediaDriver: MediaDriver? 
= null - internal val mediaDriverContext: MediaDriver.Context + internal val aeronDriver: AeronDriver /** * Returns the serialization wrapper if there is an object type that needs to be added outside of the basic types. @@ -134,42 +128,13 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A // Only starts the media driver if we are NOT already running! try { - mediaDriver = AeronConfig.startDriver(config, type, logger) + AeronDriver.validateConfig(config, logger) + aeronDriver = AeronDriver(config, type, logger) + aeronDriver.start() } catch (e: Exception) { listenerManager.notifyError(e) throw e } - - require(config.context != null) { "Configuration context cannot be properly created. Unable to continue!" } - mediaDriverContext = config.context!! - - val aeronContext = Aeron.Context() - aeronContext - .aeronDirectoryName(mediaDriverContext.aeronDirectory().path) - .concludeAeronDirectory() - - val threadFactory = NamedThreadFactory("Thread", ThreadGroup("${type.simpleName}-AeronClient"),true) - aeronContext - .threadFactory(threadFactory) - .idleStrategy(BackoffIdleStrategy()) - - // we DO NOT want to abort the JVM if there are errors. - aeronContext.errorHandler { error -> - logger.error("Error in Aeron", error) - } - - try { - aeron = Aeron.connect(aeronContext) - } catch (e: Exception) { - try { - mediaDriver?.close() - } catch (secondaryException: Exception) { - e.addSuppressed(secondaryException) - } - - listenerManager.notifyError(e) - throw e - } } /** @@ -180,17 +145,9 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A } - internal fun initEndpointState(): Aeron { + internal fun initEndpointState() { shutdown.getAndSet(false) shutdownWaiter = SuspendWaiter() - return aeron - } - - /** - * @return the aeron media driver log file for a specific publication. This should be removed when a publication is closed (but is not always!) 
- */ - internal fun getMediaDriverPublicationFile(publicationRegId: Long): File { - return mediaDriverContext.aeronDirectory().resolve("publications").resolve("${publicationRegId}.logbuffer") } abstract fun newException(message: String, cause: Throwable? = null): Throwable @@ -637,8 +594,7 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A * @return true if the media driver is active and running */ fun isRunning(): Boolean { - // if the media driver is running, it will be a quick connection. Usually 100ms or so - return mediaDriverContext.isDriverActive(1_000) { } + return aeronDriver.isRunning() } /** @@ -655,14 +611,9 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A if (shutdown.compareAndSet(expect = false, update = true)) { logger.info { "Shutting down..." } - try { - aeron.close() - } catch (e: Exception) { - logger.error("Error stopping aeron.", e) - } - runBlocking { - AeronConfig.stopDriver(mediaDriver, logger) + aeronDriver.close() + connections.forEach { it.close() } @@ -677,8 +628,6 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A close0() - (aeron.context().threadFactory() as NamedThreadFactory).group.destroy() - // if we are waiting for shutdown, cancel the waiting thread (since we have shutdown now) shutdownWaiter.cancel() } diff --git a/src/dorkbox/network/handshake/ServerHandshake.kt b/src/dorkbox/network/handshake/ServerHandshake.kt index 6fc9b8ac..2e40eee7 100644 --- a/src/dorkbox/network/handshake/ServerHandshake.kt +++ b/src/dorkbox/network/handshake/ServerHandshake.kt @@ -21,7 +21,7 @@ import com.github.benmanes.caffeine.cache.RemovalCause import com.github.benmanes.caffeine.cache.RemovalListener import dorkbox.network.Server import dorkbox.network.ServerConfiguration -import dorkbox.network.aeron.AeronConfig +import dorkbox.network.aeron.AeronDriver import dorkbox.network.aeron.IpcMediaDriverConnection import 
dorkbox.network.aeron.UdpMediaDriverPairedConnection import dorkbox.network.connection.Connection @@ -29,7 +29,6 @@ import dorkbox.network.connection.ConnectionParams import dorkbox.network.connection.ListenerManager import dorkbox.network.connection.PublicKeyValidationState import dorkbox.network.exceptions.* -import io.aeron.Aeron import io.aeron.Publication import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.launch @@ -72,7 +71,7 @@ internal class ServerHandshake(private val logger: KLog private val connectionsPerIpCounts = ConnectionCounts() // guarantee that session/stream ID's will ALWAYS be unique! (there can NEVER be a collision!) - private val sessionIdAllocator = RandomIdAllocator(AeronConfig.RESERVED_SESSION_ID_LOW, AeronConfig.RESERVED_SESSION_ID_HIGH) + private val sessionIdAllocator = RandomIdAllocator(AeronDriver.RESERVED_SESSION_ID_LOW, AeronDriver.RESERVED_SESSION_ID_HIGH) private val streamIdAllocator = RandomIdAllocator(1, Integer.MAX_VALUE) @@ -201,7 +200,7 @@ internal class ServerHandshake(private val logger: KLog handshakePublication: Publication, sessionId: Int, message: HandshakeMessage, - aeron: Aeron) { + aeronDriver: AeronDriver) { val connectionString = "IPC" @@ -278,7 +277,7 @@ internal class ServerHandshake(private val logger: KLog // we have to construct how the connection will communicate! runBlocking { - clientConnection.buildServer(aeron, logger) + clientConnection.buildServer(aeronDriver, logger, true) } logger.info { @@ -345,7 +344,7 @@ internal class ServerHandshake(private val logger: KLog clientAddressString: String, clientAddress: InetAddress, message: HandshakeMessage, - aeron: Aeron, + aeronDriver: AeronDriver, isIpv6Wildcard: Boolean) { if (!validateMessageTypeAndDoPending(server, server.actionDispatch, handshakePublication, message, sessionId, clientAddressString)) { @@ -444,7 +443,7 @@ internal class ServerHandshake(private val logger: KLog // we have to construct how the connection will communicate! 
runBlocking { - clientConnection.buildServer(aeron, logger) + clientConnection.buildServer(aeronDriver, logger, true) } logger.info { diff --git a/test/dorkboxTest/network/ConnectionFilterTest.kt b/test/dorkboxTest/network/ConnectionFilterTest.kt index ffb6b7f5..4d69d717 100644 --- a/test/dorkboxTest/network/ConnectionFilterTest.kt +++ b/test/dorkboxTest/network/ConnectionFilterTest.kt @@ -46,7 +46,12 @@ class ConnectionFilterTest : BaseTest() { } runBlocking { - client.connect(LOOPBACK) + try { + client.connect(LOOPBACK) + } catch (e: Exception) { + stopEndPoints() + throw e + } } } @@ -92,7 +97,12 @@ class ConnectionFilterTest : BaseTest() { } runBlocking { - client.connect(LOOPBACK) + try { + client.connect(LOOPBACK) + } catch (e: Exception) { + stopEndPoints() + throw e + } } } @@ -137,7 +147,12 @@ class ConnectionFilterTest : BaseTest() { } runBlocking { - client.connect(LOOPBACK) + try { + client.connect(LOOPBACK) + } catch (e: Exception) { + stopEndPoints() + throw e + } } } @@ -182,7 +197,12 @@ class ConnectionFilterTest : BaseTest() { } runBlocking { - client.connect(LOOPBACK) + try { + client.connect(LOOPBACK) + } catch (e: Exception) { + stopEndPoints() + throw e + } } } @@ -220,7 +240,12 @@ class ConnectionFilterTest : BaseTest() { } runBlocking { - client.connect(LOOPBACK) + try { + client.connect(LOOPBACK) + } catch (e: Exception) { + stopEndPoints() + throw e + } } } @@ -253,20 +278,18 @@ class ConnectionFilterTest : BaseTest() { } runBlocking { - client.connect(LOOPBACK) + try { + client.connect(LOOPBACK) + } catch (e: Exception) { + stopEndPoints() + throw e + } } } waitForThreads() } - - - - - - - @Test fun acceptAllCustomServer() { val serverConnectSuccess = atomic(false) @@ -303,7 +326,12 @@ class ConnectionFilterTest : BaseTest() { } runBlocking { - client.connect(LOOPBACK) + try { + client.connect(LOOPBACK) + } catch (e: Exception) { + stopEndPoints() + throw e + } } } @@ -350,7 +378,12 @@ class ConnectionFilterTest : BaseTest() { } 
runBlocking { - client.connect(LOOPBACK) + try { + client.connect(LOOPBACK) + } catch (e: Exception) { + stopEndPoints() + throw e + } } } @@ -390,11 +423,16 @@ class ConnectionFilterTest : BaseTest() { } runBlocking { - client.connect(LOOPBACK, Long.MAX_VALUE) + try { + client.connect(LOOPBACK) + } catch (e: Exception) { + stopEndPoints() + throw e + } } } - waitForThreads(Long.MAX_VALUE) + waitForThreads() } @Test(expected = ClientException::class) @@ -425,19 +463,15 @@ class ConnectionFilterTest : BaseTest() { } runBlocking { - client.connect(LOOPBACK) + try { + client.connect(LOOPBACK) + } catch (e: Exception) { + stopEndPoints() + throw e + } } } waitForThreads() } - - - - - - - - - } diff --git a/test/dorkboxTest/network/DisconnectReconnectTest.kt b/test/dorkboxTest/network/DisconnectReconnectTest.kt index 5d8f6a7d..82ea7dd9 100644 --- a/test/dorkboxTest/network/DisconnectReconnectTest.kt +++ b/test/dorkboxTest/network/DisconnectReconnectTest.kt @@ -2,7 +2,7 @@ package dorkboxTest.network import dorkbox.network.Client import dorkbox.network.Server -import dorkbox.network.aeron.AeronConfig +import dorkbox.network.aeron.AeronDriver import dorkbox.network.connection.Connection import kotlinx.atomicfu.atomic import kotlinx.coroutines.delay @@ -72,8 +72,9 @@ class DisconnectReconnectTest : BaseTest() { @Test fun manualMediaDriverAndReconnectClient() { val serverConfiguration = serverConfig() - val mediaDriver = runBlocking { - AeronConfig.startDriver(serverConfiguration) + val aeronDriver = runBlocking { + AeronDriver.validateConfig(serverConfiguration) + AeronDriver(serverConfiguration) } run { @@ -124,7 +125,7 @@ class DisconnectReconnectTest : BaseTest() { waitForThreads() runBlocking { - AeronConfig.stopDriver(mediaDriver) + aeronDriver.close() } System.err.println("Connection count (after reconnecting) is: " + reconnectCount.value)