diff --git a/src/dorkbox/network/Client.kt b/src/dorkbox/network/Client.kt index 84475659..72a99a5b 100644 --- a/src/dorkbox/network/Client.kt +++ b/src/dorkbox/network/Client.kt @@ -15,8 +15,10 @@ */ package dorkbox.network +import dorkbox.netUtil.IP import dorkbox.netUtil.IPv4 import dorkbox.netUtil.IPv6 +import dorkbox.network.aeron.AeronConfig import dorkbox.network.aeron.IpcMediaDriverConnection import dorkbox.network.aeron.UdpMediaDriverConnection import dorkbox.network.connection.Connection @@ -85,6 +87,11 @@ open class Client(config: Configuration = Configuration if (config.networkMtuSize <= 0) { throw ClientException("configuration networkMtuSize must be > 0") } if (config.networkMtuSize >= 9 * 1024) { throw ClientException("configuration networkMtuSize must be < ${9 * 1024}") } + + + if (config.enableIpc && config.aeronDirectoryForceUnique) { + require(false) { "IPC and forcing a unique Aeron directory are incompatible!" } + } } override fun newException(message: String, cause: Throwable?): Throwable { @@ -168,8 +175,8 @@ open class Client(config: Configuration = Configuration connectionTimeoutMS: Long = 30_000L, reliable: Boolean = true) { // Default IPC ports are flipped because they are in the perspective of the SERVER connect(remoteAddress = remoteAddress, - ipcPublicationId = IPC_HANDSHAKE_STREAM_ID_SUB, - ipcSubscriptionId = IPC_HANDSHAKE_STREAM_ID_PUB, + ipcPublicationId = AeronConfig.IPC_HANDSHAKE_STREAM_ID_SUB, + ipcSubscriptionId = AeronConfig.IPC_HANDSHAKE_STREAM_ID_PUB, connectionTimeoutMS = connectionTimeoutMS, reliable = reliable) } @@ -186,8 +193,8 @@ open class Client(config: Configuration = Configuration * @throws ClientRejectedException if the client connection is rejected */ @Suppress("DuplicatedCode") - suspend fun connect(ipcPublicationId: Int = IPC_HANDSHAKE_STREAM_ID_SUB, - ipcSubscriptionId: Int = IPC_HANDSHAKE_STREAM_ID_PUB, + suspend fun connect(ipcPublicationId: Int = AeronConfig.IPC_HANDSHAKE_STREAM_ID_SUB, + ipcSubscriptionId: Int = AeronConfig.IPC_HANDSHAKE_STREAM_ID_PUB, connectionTimeoutMS: Long = 30_000L) { // Default IPC ports are flipped because they are in the perspective of the SERVER @@ -227,8 +234,8 @@ open class Client(config: Configuration = Configuration @Suppress("DuplicatedCode") private suspend fun connect(remoteAddress: InetAddress? = null, // Default IPC ports are flipped because they are in the perspective of the SERVER - ipcPublicationId: Int = IPC_HANDSHAKE_STREAM_ID_SUB, - ipcSubscriptionId: Int = IPC_HANDSHAKE_STREAM_ID_PUB, + ipcPublicationId: Int = AeronConfig.IPC_HANDSHAKE_STREAM_ID_SUB, + ipcSubscriptionId: Int = AeronConfig.IPC_HANDSHAKE_STREAM_ID_PUB, connectionTimeoutMS: Long = 30_000L, reliable: Boolean = true) { // this will exist ONLY if we are reconnecting via a "disconnect" callback lockStepForReconnect.value?.doWait() @@ -239,65 +246,96 @@ open class Client(config: Configuration = Configuration } lockStepForReconnect.lazySet(null) + + // localhost/loopback IP might not always be 127.0.0.1 or ::1 + this.remoteAddress0 = remoteAddress connection0 = null // we are done with initial configuration, now initialize aeron and the general state of this endpoint val aeron = initEndpointState() - // only change LOCALHOST -> IPC if the media driver is ALREADY running LOCALLY! - val canAutoChangeToIpc = config.enableIpcForLoopback && isRunning() - if (canAutoChangeToIpc) { - logger.info("Media driver is already running. Support for auto-switch LOCALHOST -> IPC is enabled") + if (config.enableIpc && config.aeronDirectoryForceUnique) { + require(false) { "IPC enabled and forcing a unique Aeron directory are incompatible (IPC requires shared Aeron directories)!" } } // only try to connect via IPv4 if we have a network interface that supports it! if (remoteAddress is Inet4Address && !IPv4.isAvailable) { - require(false) { "Unable to connect to the IPv4 address $remoteAddress, there are no IPv4 interfaces available!"} + require(false) { "Unable to connect to the IPv4 address ${IPv4.toString(remoteAddress)}, there are no IPv4 interfaces available!" } } // only try to connect via IPv6 if we have a network interface that supports it! if (remoteAddress is Inet6Address && !IPv6.isAvailable) { - require(false) { "Unable to connect to the IPv6 address $remoteAddress, there are no IPv6 interfaces available!"} + require(false) { "Unable to connect to the IPv6 address ${IPv6.toString(remoteAddress)}, there are no IPv6 interfaces available!" } + } + + if (remoteAddress != null && remoteAddress.isAnyLocalAddress) { + require(false) { "Cannot connect to ${IP.toString(remoteAddress)} It is an invalid address!" } + } + + // only change LOCALHOST -> IPC if the media driver is ALREADY running LOCALLY! + var usingIPC = config.enableIpc && remoteAddress == null + val autoChangeToIpc = !usingIPC && config.enableIpcForLoopback && + remoteAddress != null && remoteAddress.isLoopbackAddress && isRunning(mediaDriverContext) + if (autoChangeToIpc) { + logger.info {"IPC for loopback enabled and aeron is already running. Auto-changing network connection from ${IP.toString(remoteAddress!!)} -> IPC" } } - // NETWORK OR IPC ADDRESS - // if we connect to "loopback", then MAYBE we substitute if for IPC (with log message) + val handshake = ClientHandshake(config, crypto, this) + val handshakeConnection = if (autoChangeToIpc || usingIPC) { + // MAYBE the server doesn't have IPC enabled? If no, we need to connect via UDP instead + val test = IpcMediaDriverConnection(streamIdSubscription = ipcSubscriptionId, + streamId = ipcPublicationId, + sessionId = AeronConfig.RESERVED_SESSION_ID_INVALID, + // "fast" connection timeout, since this is IPC + connectionTimeoutMS = 1000) - // localhost/loopback IP might not always be 127.0.0.1 or ::1 - if (remoteAddress == null) { - this.remoteAddress0 = null - } else if (remoteAddress.isAnyLocalAddress) { - throw IllegalArgumentException("Cannot connect to $remoteAddress It is an invalid address!") - } else if (canAutoChangeToIpc && remoteAddress.isLoopbackAddress) { - logger.info { "Auto-changing network connection from $remoteAddress -> IPC" } - this.remoteAddress0 = null - } else { - this.remoteAddress0 = remoteAddress - } + var success = false + // throws a ConnectTimedOutException if the client cannot connect for any reason to the server handshake ports + try { + test.buildClient(aeron, logger) + success = true + } catch (e: Exception) { + // if we specified that we want to use IPC, then we have to throw the timeout exception, because there is no IPC + if (usingIPC) { + throw e + } + } - val handshake = ClientHandshake(logger, config, crypto, this) + if (success) { + usingIPC = true + test + } else { + logger.info { "IPC for loopback enabled, but unable to connect. Retrying with address ${IP.toString(remoteAddress!!)}" } - - val handshakeConnection = if (this.remoteAddress0 == null) { - IpcMediaDriverConnection(streamIdSubscription = ipcSubscriptionId, - streamId = ipcPublicationId, - sessionId = RESERVED_SESSION_ID_INVALID) + // try a UDP connection instead + val test = UdpMediaDriverConnection(address = this.remoteAddress0!!, + publicationPort = config.subscriptionPort, + subscriptionPort = config.publicationPort, + streamId = AeronConfig.UDP_HANDSHAKE_STREAM_ID, + sessionId = AeronConfig.RESERVED_SESSION_ID_INVALID, + connectionTimeoutMS = connectionTimeoutMS, + isReliable = reliable) + // throws a ConnectTimedOutException if the client cannot connect for any reason to the server handshake ports + test.buildClient(aeron, logger) + test + } } else { - UdpMediaDriverConnection(address = this.remoteAddress0!!, - publicationPort = config.subscriptionPort, - subscriptionPort = config.publicationPort, - streamId = UDP_HANDSHAKE_STREAM_ID, - sessionId = RESERVED_SESSION_ID_INVALID, - connectionTimeoutMS = connectionTimeoutMS, - isReliable = reliable) + val test = UdpMediaDriverConnection(address = this.remoteAddress0!!, + publicationPort = config.subscriptionPort, + subscriptionPort = config.publicationPort, + streamId = AeronConfig.UDP_HANDSHAKE_STREAM_ID, + sessionId = AeronConfig.RESERVED_SESSION_ID_INVALID, + connectionTimeoutMS = connectionTimeoutMS, + isReliable = reliable) + // throws a ConnectTimedOutException if the client cannot connect for any reason to the server handshake ports + test.buildClient(aeron, logger) + test } - // throws a ConnectTimedOutException if the client cannot connect for any reason to the server handshake ports - handshakeConnection.buildClient(aeron, logger) logger.info(handshakeConnection.clientInfo()) @@ -308,7 +346,7 @@ open class Client(config: Configuration = Configuration // VALIDATE:: check to see if the remote connection's public key has changed! - val validateRemoteAddress = if (this.remoteAddress0 == null) { + val validateRemoteAddress = if (usingIPC) { PublicKeyValidationState.VALID } else { crypto.validateRemoteAddress(this.remoteAddress0!!, connectionInfo.publicKey) @@ -316,7 +354,7 @@ open class Client(config: Configuration = Configuration if (validateRemoteAddress == PublicKeyValidationState.INVALID) { handshakeConnection.close() - val exception = ClientRejectedException("Connection to $remoteAddress not allowed! Public key mismatch.") + val exception = ClientRejectedException("Connection to ${IP.toString(remoteAddress!!)} not allowed! Public key mismatch.") listenerManager.notifyError(exception) throw exception } @@ -328,7 +366,7 @@ open class Client(config: Configuration = Configuration // we are now connected, so we can connect to the NEW client-specific ports - val reliableClientConnection = if (this.remoteAddress0 == null) { + val reliableClientConnection = if (usingIPC) { IpcMediaDriverConnection(sessionId = connectionInfo.sessionId, // NOTE: pub/sub must be switched! streamIdSubscription = connectionInfo.publicationPort, @@ -366,13 +404,13 @@ open class Client(config: Configuration = Configuration // because we are getting the class registration details from the SERVER, this should never be the case. // It is still and edge case where the reconstruction of the registration details fails (maybe because of custom serializers) - val exception = ClientRejectedException("Connection to $remoteAddress has incorrect class registration details!!") + val exception = ClientRejectedException("Connection to ${IP.toString(remoteAddress!!)} has incorrect class registration details!!") listenerManager.notifyError(exception) throw exception } - val newConnection = if (this.remoteAddress0 == null) { + val newConnection = if (usingIPC) { newConnection(ConnectionParams(this, reliableClientConnection, PublicKeyValidationState.VALID)) } else { newConnection(ConnectionParams(this, reliableClientConnection, validateRemoteAddress)) @@ -383,7 +421,7 @@ open class Client(config: Configuration = Configuration val permitConnection = listenerManager.notifyFilter(newConnection) if (!permitConnection) { handshakeConnection.close() - val exception = ClientRejectedException("Connection to $remoteAddress was not permitted!") + val exception = ClientRejectedException("Connection to ${IP.toString(remoteAddress!!)} was not permitted!") listenerManager.notifyError(exception) throw exception } @@ -414,11 +452,10 @@ open class Client(config: Configuration = Configuration } connection0 = newConnection - connections.add(newConnection) + addConnection(newConnection) // tell the server our connection handshake is done, and the connection can now listen for data. val canFinishConnecting = handshake.handshakeDone(handshakeConnection, connectionTimeoutMS) - if (canFinishConnecting) { isConnected = true diff --git a/src/dorkbox/network/Configuration.kt b/src/dorkbox/network/Configuration.kt index bc9a6e24..176ed906 100644 --- a/src/dorkbox/network/Configuration.kt +++ b/src/dorkbox/network/Configuration.kt @@ -15,10 +15,10 @@ */ package dorkbox.network +import dorkbox.network.aeron.AeronConfig import dorkbox.network.aeron.CoroutineBackoffIdleStrategy import dorkbox.network.aeron.CoroutineIdleStrategy import dorkbox.network.aeron.CoroutineSleepingMillisIdleStrategy -import dorkbox.network.connection.EndPoint import dorkbox.network.serialization.Serialization import dorkbox.network.storage.PropertyStore import dorkbox.network.storage.SettingsStore @@ -31,21 +31,6 @@ import mu.KLogger import java.io.File class ServerConfiguration : dorkbox.network.Configuration() { - /** - * Enables the ability to use the IPv4 network stack. - */ - var enableIPv4 = true - - /** - * Enables the ability to use the IPv6 network stack. - */ - var enableIPv6 = true - - /** - * Enables the ability use IPC (Inter Process Communication) - */ - var enableIPC = true - /** * The address for the server to listen on. "*" will accept connections from all interfaces, otherwise specify * the hostname (or IP) to bind to. @@ -65,15 +50,39 @@ class ServerConfiguration : dorkbox.network.Configuration() { /** * The IPC Publication ID is used to define what ID the server will send data on. The client IPC subscription ID must match this value. */ - var ipcPublicationId = EndPoint.IPC_HANDSHAKE_STREAM_ID_PUB + var ipcPublicationId = AeronConfig.IPC_HANDSHAKE_STREAM_ID_PUB /** * The IPC Subscription ID is used to define what ID the server will receive data on. The client IPC publication ID must match this value. */ - var ipcSubscriptionId = EndPoint.IPC_HANDSHAKE_STREAM_ID_SUB + var ipcSubscriptionId = AeronConfig.IPC_HANDSHAKE_STREAM_ID_SUB } open class Configuration { + /** + * Enables the ability to use the IPv4 network stack. + */ + var enableIPv4 = true + + /** + * Enables the ability to use the IPv6 network stack. + */ + var enableIPv6 = true + + /** + * Enables the ability use IPC (Inter Process Communication). + * + * Aeron must be running in the same location for the client/server in order for this to work + */ + var enableIpc = true + + /** + * Permit loopback connections to use IPC instead of UDP for communicating, if possible. IPC is about 4x faster than UDP in loopback situations. + * + * This configuration only affects the client + */ + var enableIpcForLoopback: Boolean = true + /** * When connecting to a remote client/server, should connections be allowed if the remote machine signature has changed? * @@ -101,13 +110,6 @@ open class Configuration { */ var subscriptionPort: Int = 0 - /** - * Permit loopback connections to use IPC instead of UDP for communicating. IPC is about 4x faster than UDP in loopback situations. - * - * This configuration only affects the client - */ - var enableIpcForLoopback: Boolean = true - /** * How long a connection must be disconnected before we cleanup the memory associated with it */ @@ -181,9 +183,14 @@ open class Configuration { var threadingMode = ThreadingMode.SHARED /** - * Log Buffer Locations for the Media Driver. The default location is a TEMP dir. This must be unique PER application and instance! + * Aeron location for the Media Driver. The default location is a TEMP dir. */ - var aeronLogDirectory: File? = null + var aeronDirectory: File? = null + + /** + * Should we force the Aeron location to be unique for every instance? This is mutually exclusive with IPC. + */ + var aeronDirectoryForceUnique = false /** * The Aeron MTU value impacts a lot of things. diff --git a/src/dorkbox/network/Server.kt b/src/dorkbox/network/Server.kt index 63265c4c..990e8874 100644 --- a/src/dorkbox/network/Server.kt +++ b/src/dorkbox/network/Server.kt @@ -18,6 +18,7 @@ package dorkbox.network import dorkbox.netUtil.IP import dorkbox.netUtil.IPv4 import dorkbox.netUtil.IPv6 +import dorkbox.network.aeron.AeronConfig import dorkbox.network.aeron.AeronPoller import dorkbox.network.aeron.IpcMediaDriverConnection import dorkbox.network.aeron.UdpMediaDriverConnection @@ -64,12 +65,8 @@ open class Server(config: ServerConfiguration = ServerC * @return true if the configuration matches and can connect (but not verify) to the TCP control socket. */ fun isRunning(configuration: ServerConfiguration): Boolean { - val server = Server(configuration) - - val running = server.isRunning() - server.close() - - return running + val context = AeronConfig.createContext(configuration) + return AeronConfig.isRunning(context) } } @@ -112,20 +109,24 @@ open class Server(config: ServerConfiguration = ServerC require(config.listenIpAddress.isNotBlank()) { "Blank listen IP address, cannot continue"} // can't disable everything! - if (!config.enableIPC && !config.enableIPv4 && !config.enableIPv6) { + if (!config.enableIpc && !config.enableIPv4 && !config.enableIPv6) { require(false) { "At least one of IPC/IPv4/IPv6 must be enabled!" } } // have to verify if it's the only thing specified, is IPv4 available... - if (!config.enableIPC && !config.enableIPv6 && config.enableIPv4 && !IPv4.isAvailable) { + if (!config.enableIpc && !config.enableIPv6 && config.enableIPv4 && !IPv4.isAvailable) { require(false) { "IPC/IPv6 are disabled and IPv4 is enabled, but there is no IPv4 interface available!" } } // have to verify if it's the only thing specified, is IPv4 available... - if (!config.enableIPC && !config.enableIPv4 && config.enableIPv6 && !IPv6.isAvailable) { + if (!config.enableIpc && !config.enableIPv4 && config.enableIPv6 && !IPv6.isAvailable) { require(false) { "IPC/IPv4 are disabled and IPv6 is enabled, but there is no IPv6 interface available!" } } + if (config.enableIpc && config.aeronDirectoryForceUnique) { + require(false) { "IPC enabled and forcing a unique Aeron directory are incompatible (IPC requires shared Aeron directories)!" } + } + // localhost/loopback IP might not always be 127.0.0.1 or ::1 // We want to listen on BOTH IPv4 and IPv6 (config option lets us configure this) @@ -178,10 +179,10 @@ open class Server(config: ServerConfiguration = ServerC private fun getIpcPoller(aeron: Aeron, config: ServerConfiguration): AeronPoller { - val poller = if (config.enableIPC) { + val poller = if (config.enableIpc) { val driver = IpcMediaDriverConnection(streamIdSubscription = config.ipcSubscriptionId, streamId = config.ipcPublicationId, - sessionId = RESERVED_SESSION_ID_INVALID) + sessionId = AeronConfig.RESERVED_SESSION_ID_INVALID) driver.buildServer(aeron, logger) val publication = driver.publication val subscription = driver.subscription @@ -223,8 +224,8 @@ open class Server(config: ServerConfiguration = ServerC val driver = UdpMediaDriverConnection(address = listenIPv4Address!!, publicationPort = config.publicationPort, subscriptionPort = config.subscriptionPort, - streamId = UDP_HANDSHAKE_STREAM_ID, - sessionId = RESERVED_SESSION_ID_INVALID) + streamId = AeronConfig.UDP_HANDSHAKE_STREAM_ID, + sessionId = AeronConfig.RESERVED_SESSION_ID_INVALID) driver.buildServer(aeron, logger) val publication = driver.publication @@ -291,8 +292,8 @@ open class Server(config: ServerConfiguration = ServerC val driver = UdpMediaDriverConnection(address = listenIPv6Address!!, publicationPort = config.publicationPort, subscriptionPort = config.subscriptionPort, - streamId = UDP_HANDSHAKE_STREAM_ID, - sessionId = RESERVED_SESSION_ID_INVALID) + streamId = AeronConfig.UDP_HANDSHAKE_STREAM_ID, + sessionId = AeronConfig.RESERVED_SESSION_ID_INVALID) driver.buildServer(aeron, logger) val publication = driver.publication @@ -358,8 +359,8 @@ open class Server(config: ServerConfiguration = ServerC val driver = UdpMediaDriverConnection(address = listenIPv6Address!!, publicationPort = config.publicationPort, subscriptionPort = config.subscriptionPort, - streamId = UDP_HANDSHAKE_STREAM_ID, - sessionId = RESERVED_SESSION_ID_INVALID) + streamId = AeronConfig.UDP_HANDSHAKE_STREAM_ID, + sessionId = AeronConfig.RESERVED_SESSION_ID_INVALID) driver.buildServer(aeron, logger) val publication = driver.publication @@ -605,34 +606,6 @@ open class Server(config: ServerConfiguration = ServerC } } - /** - * TODO: when adding a "custom" connection, it's super important to not have to worry about the sessionID (which is what we key off of) - * Adds a custom connection to the server. - * - * This should only be used in situations where there can be DIFFERENT types of connections (such as a 'web-based' connection) and - * you want *this* server instance to manage listeners + message dispatch - * - * @param connection the connection to add - */ - fun addConnection(connection: CONNECTION) { - connections.add(connection) - } - - /** - * TODO: when adding a "custom" connection, it's super important to not have to worry about the sessionID (which is what we key off of) - * Removes a custom connection to the server. - * - * - * This should only be used in situations where there can be DIFFERENT types of connections (such as a 'web-based' connection) and - * you want *this* server instance to manage listeners + message dispatch - * - * @param connection the connection to remove - */ - fun removeConnection(connection: CONNECTION) { - connections.remove(connection) - } - - /** * Closes the server and all it's connections. After a close, you may call 'bind' again. */ diff --git a/src/dorkbox/network/aeron/AeronConfig.kt b/src/dorkbox/network/aeron/AeronConfig.kt new file mode 100644 index 00000000..36c692f1 --- /dev/null +++ b/src/dorkbox/network/aeron/AeronConfig.kt @@ -0,0 +1,232 @@ +package dorkbox.network.aeron + +import dorkbox.network.Configuration +import dorkbox.util.NamedThreadFactory +import io.aeron.Publication +import io.aeron.driver.MediaDriver +import mu.KLogger +import mu.KotlinLogging +import java.io.File + +/** + * + */ +internal object AeronConfig { + /** + * Identifier for invalid sessions. This must be < RESERVED_SESSION_ID_LOW + */ + const val RESERVED_SESSION_ID_INVALID = 0 + + /** + * The inclusive lower bound of the reserved sessions range. THIS SHOULD NEVER BE <= 0! + */ + const val RESERVED_SESSION_ID_LOW = 1 + + /** + * The inclusive upper bound of the reserved sessions range. + */ + const val RESERVED_SESSION_ID_HIGH = Integer.MAX_VALUE + + const val UDP_HANDSHAKE_STREAM_ID: Int = 0x1337cafe + const val IPC_HANDSHAKE_STREAM_ID_PUB: Int = 0x1337c0de + const val IPC_HANDSHAKE_STREAM_ID_SUB: Int = 0x1337c0d3 + + + fun errorCodeName(result: Long): String { + return when (result) { + // The publication is not connected to a subscriber, this can be an intermittent state as subscribers come and go. + Publication.NOT_CONNECTED -> "Not connected" + + // The offer failed due to back pressure from the subscribers preventing further transmission. + Publication.BACK_PRESSURED -> "Back pressured" + + // The action is an operation such as log rotation which is likely to have succeeded by the next retry attempt. + Publication.ADMIN_ACTION -> "Administrative action" + + // The Publication has been closed and should no longer be used. + Publication.CLOSED -> "Publication is closed" + + // If this happens then the publication should be closed and a new one added. To make it less likely to happen then increase the term buffer length. + Publication.MAX_POSITION_EXCEEDED -> "Maximum term position exceeded" + + else -> throw IllegalStateException("Unknown error code: $result") + } + } + + private fun create(config: Configuration, logger: KLogger = KotlinLogging.logger("AeronConfig")): MediaDriver.Context { + /* + * Linux + * Linux normally requires some settings of sysctl values. One is net.core.rmem_max to allow larger SO_RCVBUF and + * net.core.wmem_max to allow larger SO_SNDBUF values to be set. + * + * Windows + * Windows tends to use SO_SNDBUF values that are too small. It is recommended to use values more like 1MB or so. + * + * Mac/Darwin + * + * Mac tends to use SO_SNDBUF values that are too small. It is recommended to use larger values, like 16KB. + */ + if (config.receiveBufferSize == 0) { + config.receiveBufferSize = io.aeron.driver.Configuration.SOCKET_RCVBUF_LENGTH_DEFAULT +// when { +// OS.isLinux() -> +// OS.isWindows() -> +// OS.isMacOsX() -> +// } + +// val rmem_max = dorkbox.network.other.NetUtil.sysctlGetInt("net.core.rmem_max") +// val wmem_max = dorkbox.network.other.NetUtil.sysctlGetInt("net.core.wmem_max") + } + + + if (config.sendBufferSize == 0) { + config.receiveBufferSize = io.aeron.driver.Configuration.SOCKET_SNDBUF_LENGTH_DEFAULT +// when { +// OS.isLinux() -> +// OS.isWindows() -> +// OS.isMacOsX() -> +// } + +// val rmem_max = dorkbox.network.other.NetUtil.sysctlGetInt("net.core.rmem_max") +// val wmem_max = dorkbox.network.other.NetUtil.sysctlGetInt("net.core.wmem_max") + } + + + /* + * Note: Since Mac OS does not have a built-in support for /dev/shm it is advised to create a RAM disk for the Aeron directory (aeron.dir). + * + * You can create a RAM disk with the following command: + * + * $ diskutil erasevolume HFS+ "DISK_NAME" `hdiutil attach -nomount ram://$((2048 * SIZE_IN_MB))` + * + * where: + * + * DISK_NAME should be replaced with a name of your choice. + * SIZE_IN_MB is the size in megabytes for the disk (e.g. 4096 for a 4GB disk). + * + * For example, the following command creates a RAM disk named DevShm which is 2GB in size: + * + * $ diskutil erasevolume HFS+ "DevShm" `hdiutil attach -nomount ram://$((2048 * 2048))` + * + * After this command is executed the new disk will be mounted under /Volumes/DevShm. + */ + if (config.aeronDirectory == null) { + val baseFileLocation = config.suggestAeronLogLocation(logger) + +// val aeronLogDirectory = File(baseFileLocation, "aeron-" + type.simpleName) + val aeronLogDirectory = File(baseFileLocation, "aeron") + config.aeronDirectory = aeronLogDirectory + } + + val threadFactory = NamedThreadFactory("Aeron", false) + + // LOW-LATENCY SETTINGS + // .termBufferSparseFile(false) + // .useWindowsHighResTimer(true) + // .threadingMode(ThreadingMode.DEDICATED) + // .conductorIdleStrategy(BusySpinIdleStrategy.INSTANCE) + // .receiverIdleStrategy(NoOpIdleStrategy.INSTANCE) + // .senderIdleStrategy(NoOpIdleStrategy.INSTANCE); + // setProperty(DISABLE_BOUNDS_CHECKS_PROP_NAME, "true"); + // setProperty("aeron.mtu.length", "16384"); + // setProperty("aeron.socket.so_sndbuf", "2097152"); + // setProperty("aeron.socket.so_rcvbuf", "2097152"); + // setProperty("aeron.rcv.initial.window.length", "2097152"); + + // driver context must happen in the initializer, because we have a Server.isRunning() method that uses the mediaDriverContext (without bind) + val context = MediaDriver.Context() + .publicationReservedSessionIdLow(RESERVED_SESSION_ID_LOW) + .publicationReservedSessionIdHigh(RESERVED_SESSION_ID_HIGH) + .conductorThreadFactory(threadFactory) + .receiverThreadFactory(threadFactory) + .senderThreadFactory(threadFactory) + .sharedNetworkThreadFactory(threadFactory) + .sharedThreadFactory(threadFactory) + .threadingMode(config.threadingMode) + .mtuLength(config.networkMtuSize) + .socketSndbufLength(config.sendBufferSize) + .socketRcvbufLength(config.receiveBufferSize) + + context + .aeronDirectoryName(config.aeronDirectory!!.absolutePath) + + if (context.ipcTermBufferLength() != io.aeron.driver.Configuration.ipcTermBufferLength()) { + // default 64 megs each is HUGE + context.ipcTermBufferLength(8 * 1024 * 1024) + } + + if (context.publicationTermBufferLength() != io.aeron.driver.Configuration.termBufferLength()) { + // default 16 megs each is HUGE (we run out of space in production w/ lots of clients) + context.publicationTermBufferLength(2 * 1024 * 1024) + } + + val aeronDir = File(context.aeronDirectoryName()).absoluteFile + context.aeronDirectoryName(aeronDir.path) + + return context + } + + /** + * Creates the Aeron Media Driver context + * + * @throws IllegalArgumentException if the aeron media driver directory cannot be setup + */ + fun createContext(config: Configuration, logger: KLogger = KotlinLogging.logger("AeronConfig")): MediaDriver.Context { + var context = create(config, logger) + + // will setup the aeron directory or throw IllegalArgumentException if it cannot be configured + var aeronDir = context.aeronDirectory() + + // this happens EXACTLY once. Must be BEFORE the "isRunning" check! + context.concludeAeronDirectory() + + var isRunning = isRunning(context) + + // this is incompatible with IPC, and will not be set if IPC is enabled + if (config.aeronDirectoryForceUnique && isRunning) { + val savedParent = aeronDir.parentFile + var retry = 0 + val retryMax = 100 + + while (config.aeronDirectoryForceUnique && isRunning) { + if (retry++ > retryMax) { + throw IllegalArgumentException("Unable to force unique aeron Directory. Tried $retryMax times and all tries were in use.") + } + + val randomNum = (1..retryMax).shuffled().first() + val newDir = savedParent.resolve("${aeronDir.name}_$randomNum") + + context = create(config, logger) + context.aeronDirectoryName(newDir.path) + + // this happens EXACTLY once. Must be BEFORE the "isRunning" check! + context.concludeAeronDirectory() + + isRunning = isRunning(context) + } + } + + aeronDir = context.aeronDirectory() + + + // make sure we start over! + if (!isRunning && aeronDir.exists()) { + // try to delete the dir + if (!aeronDir.deleteRecursively()) { + logger.warn { "Unable to delete the aeron directory $aeronDir. Aeron was not running when this was attempted." } + } + } + + return context + } + + /** + * Checks to see if an endpoint (using the specified configuration) is running. + * + * @return true if the media driver is active and running + */ + fun isRunning(context: MediaDriver.Context): Boolean { + // if the media driver is running, it will be a quick connection. Usually 100ms or so + return context.isDriverActive(1_000) { } + } +} diff --git a/src/dorkbox/network/aeron/MediaDriverConnection.kt b/src/dorkbox/network/aeron/MediaDriverConnection.kt index f6b732f6..925da743 100644 --- a/src/dorkbox/network/aeron/MediaDriverConnection.kt +++ b/src/dorkbox/network/aeron/MediaDriverConnection.kt @@ -20,7 +20,6 @@ package dorkbox.network.aeron import dorkbox.netUtil.IP import dorkbox.netUtil.IPv4 import dorkbox.netUtil.IPv6 -import dorkbox.network.connection.EndPoint import dorkbox.network.exceptions.ClientTimedOutException import io.aeron.Aeron import io.aeron.ChannelUriStringBuilder @@ -84,7 +83,7 @@ class UdpMediaDriverConnection(override val address: InetAddress, private fun uri(): ChannelUriStringBuilder { val builder = ChannelUriStringBuilder().reliable(isReliable).media("udp") - if (sessionId != EndPoint.RESERVED_SESSION_ID_INVALID) { + if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) { builder.sessionId(sessionId) } @@ -195,7 +194,7 @@ class UdpMediaDriverConnection(override val address: InetAddress, override fun clientInfo(): String { - return if (sessionId != EndPoint.RESERVED_SESSION_ID_INVALID) { + return if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) { "Connecting to ${IP.toString(address)} [$subscriptionPort|$publicationPort] [$streamId|$sessionId] (reliable:$isReliable)" } else { "Connecting handshake to ${IP.toString(address)} [$subscriptionPort|$publicationPort] [$streamId|*] (reliable:$isReliable)" @@ -213,7 +212,7 @@ class UdpMediaDriverConnection(override val address: InetAddress, IP.toString(address) } - return if (sessionId != EndPoint.RESERVED_SESSION_ID_INVALID) { + return if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) { "Listening on $address [$subscriptionPort|$publicationPort] [$streamId|$sessionId] (reliable:$isReliable)" } else { "Listening handshake on $address [$subscriptionPort|$publicationPort] [$streamId|*] (reliable:$isReliable)" @@ -253,7 +252,7 @@ class IpcMediaDriverConnection(override val streamId: Int, private fun uri(): ChannelUriStringBuilder { val builder = ChannelUriStringBuilder().media("ipc") - if (sessionId != EndPoint.RESERVED_SESSION_ID_INVALID) { + if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) { builder.sessionId(sessionId) } @@ -345,7 +344,7 @@ class IpcMediaDriverConnection(override val streamId: Int, } override fun clientInfo() : String { - return if (sessionId != EndPoint.RESERVED_SESSION_ID_INVALID) { + return if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) { "[$sessionId] aeron connection established to [$streamIdSubscription|$streamId]" } else { "Connecting handshake to IPC [$streamIdSubscription|$streamId]" @@ -353,7 +352,7 @@ class IpcMediaDriverConnection(override val streamId: Int, } override fun serverInfo() : String { - return if (sessionId != EndPoint.RESERVED_SESSION_ID_INVALID) { + return if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) { "[$sessionId] IPC listening on [$streamIdSubscription|$streamId] " } else { "Listening handshake on IPC [$streamIdSubscription|$streamId]" diff --git a/src/dorkbox/network/connection/Connection.kt b/src/dorkbox/network/connection/Connection.kt index 67297ad4..a5968613 100644 --- a/src/dorkbox/network/connection/Connection.kt +++ b/src/dorkbox/network/connection/Connection.kt @@ -120,7 +120,7 @@ open class Connection(connectionParameters: ConnectionParams<*>) { // The IV for AES-GCM must be 12 bytes, since it's 4 (salt) + 8 (external counter) + 4 (GCM counter) // The 12 bytes IV is created during connection registration, and during the AES-GCM crypto, we override the last 8 with this // counter, which is also transmitted as an optimized int. (which is why it starts at 0, so the transmitted bytes are small) - private val aes_gcm_iv = atomic(0) +// private val aes_gcm_iv = atomic(0) // RMI support for this connection internal val rmiConnectionSupport = endPoint.getRmiConnectionSupport() @@ -338,16 +338,15 @@ open class Connection(connectionParameters: ConnectionParams<*>) { } // on close, we want to make sure this file is DELETED! - val logFile = endPoint.mediaDriverContext.aeronDirectory().resolve("publications").resolve("${publication.registrationId()}.logbuffer") - + val logFile = endPoint.getMediaDriverPublicationFile(publication.registrationId()) publication.close() - closeTimeoutTime = System.currentTimeMillis() + timeOut while (logFile.exists() && System.currentTimeMillis() < closeTimeoutTime) { if (logFile.delete()) { break } + delay(100) } if (logFile.exists()) { @@ -356,6 +355,9 @@ open class Connection(connectionParameters: ConnectionParams<*>) { rmiConnectionSupport.clearProxyObjects() + endPoint.removeConnection(this) + + // This is set by the client so if there is a "connect()" call in the the disconnect callback, we can have proper // lock-stop ordering for how disconnect and connect work with each-other preCloseAction() diff --git a/src/dorkbox/network/connection/EndPoint.kt b/src/dorkbox/network/connection/EndPoint.kt index 9fb758b8..8b5300cd 100644 --- a/src/dorkbox/network/connection/EndPoint.kt +++ b/src/dorkbox/network/connection/EndPoint.kt @@ -19,6 +19,7 @@ import dorkbox.network.Client import dorkbox.network.Configuration import dorkbox.network.Server import dorkbox.network.ServerConfiguration +import dorkbox.network.aeron.AeronConfig import dorkbox.network.aeron.CoroutineIdleStrategy import dorkbox.network.exceptions.MessageNotRegisteredException import dorkbox.network.handshake.HandshakeMessage @@ -31,7 +32,6 @@ import dorkbox.network.rmi.messages.RmiMessage import dorkbox.network.serialization.KryoExtra import dorkbox.network.serialization.Serialization import dorkbox.network.storage.SettingsStore -import dorkbox.util.NamedThreadFactory import dorkbox.util.exceptions.SecurityException import io.aeron.Aeron import io.aeron.Publication @@ -67,51 +67,6 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A protected constructor(config: Configuration) : this(Client::class.java, config) protected constructor(config: ServerConfiguration) : this(Server::class.java, config) - - companion object { - /** - * Identifier for invalid sessions. This must be < RESERVED_SESSION_ID_LOW - */ - const val RESERVED_SESSION_ID_INVALID = 0 - - /** - * The inclusive lower bound of the reserved sessions range. THIS SHOULD NEVER BE <= 0! - */ - const val RESERVED_SESSION_ID_LOW = 1 - - /** - * The inclusive upper bound of the reserved sessions range. - */ - const val RESERVED_SESSION_ID_HIGH = Integer.MAX_VALUE - - const val UDP_HANDSHAKE_STREAM_ID: Int = 0x1337cafe - const val IPC_HANDSHAKE_STREAM_ID_PUB: Int = 0x1337c0de - const val IPC_HANDSHAKE_STREAM_ID_SUB: Int = 0x1337c0d3 - - - private fun errorCodeName(result: Long): String { - return when (result) { - // The publication is not connected to a subscriber, this can be an intermittent state as subscribers come and go. - Publication.NOT_CONNECTED -> "Not connected" - - // The offer failed due to back pressure from the subscribers preventing further transmission. - Publication.BACK_PRESSURED -> "Back pressured" - - // The action is an operation such as log rotation which is likely to have succeeded by the next retry attempt. - Publication.ADMIN_ACTION -> "Administrative action" - - // The Publication has been closed and should no longer be used. - Publication.CLOSED -> "Publication is closed" - - // If this happens then the publication should be closed and a new one added. To make it less likely to happen then increase the term buffer length. - Publication.MAX_POSITION_EXCEEDED -> "Maximum term position exceeded" - - else -> throw IllegalStateException("Unknown error code: $result") - } - } - } - - val logger: KLogger = KotlinLogging.logger(type.simpleName) internal val actionDispatch = CoroutineScope(Dispatchers.Default) @@ -119,9 +74,9 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A internal val listenerManager = ListenerManager() internal val connections = ConnectionManager() - internal val mediaDriverContext: MediaDriver.Context + internal lateinit var mediaDriverContext: MediaDriver.Context private var mediaDriver: MediaDriver? = null - private var aeron: Aeron? = null + internal var aeron: Aeron? = null /** * Returns the serialization wrapper if there is an object type that needs to be added outside of the basic types. @@ -160,125 +115,6 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A } } - // Aeron configuration - - /* - * Linux - * Linux normally requires some settings of sysctl values. One is net.core.rmem_max to allow larger SO_RCVBUF and - * net.core.wmem_max to allow larger SO_SNDBUF values to be set. - * - * Windows - * Windows tends to use SO_SNDBUF values that are too small. It is recommended to use values more like 1MB or so. - * - * Mac/Darwin - * - * Mac tends to use SO_SNDBUF values that are too small. It is recommended to use larger values, like 16KB. - */ - if (config.receiveBufferSize == 0) { - config.receiveBufferSize = io.aeron.driver.Configuration.SOCKET_RCVBUF_LENGTH_DEFAULT -// when { -// OS.isLinux() -> -// OS.isWindows() -> -// OS.isMacOsX() -> -// } - -// val rmem_max = dorkbox.network.other.NetUtil.sysctlGetInt("net.core.rmem_max") -// val wmem_max = dorkbox.network.other.NetUtil.sysctlGetInt("net.core.wmem_max") - } - - - if (config.sendBufferSize == 0) { - config.receiveBufferSize = io.aeron.driver.Configuration.SOCKET_SNDBUF_LENGTH_DEFAULT -// when { -// OS.isLinux() -> -// OS.isWindows() -> -// OS.isMacOsX() -> -// } - -// val rmem_max = dorkbox.network.other.NetUtil.sysctlGetInt("net.core.rmem_max") -// val wmem_max = dorkbox.network.other.NetUtil.sysctlGetInt("net.core.wmem_max") - } - - - /* - * Note: Since Mac OS does not have a built-in support for /dev/shm it is advised to create a RAM disk for the Aeron directory (aeron.dir). - * - * You can create a RAM disk with the following command: - * - * $ diskutil erasevolume HFS+ "DISK_NAME" `hdiutil attach -nomount ram://$((2048 * SIZE_IN_MB))` - * - * where: - * - * DISK_NAME should be replaced with a name of your choice. - * SIZE_IN_MB is the size in megabytes for the disk (e.g. 4096 for a 4GB disk). - * - * For example, the following command creates a RAM disk named DevShm which is 2GB in size: - * - * $ diskutil erasevolume HFS+ "DevShm" `hdiutil attach -nomount ram://$((2048 * 2048))` - * - * After this command is executed the new disk will be mounted under /Volumes/DevShm. - */ - var aeronDirAlreadyExists = false - if (config.aeronLogDirectory == null) { - val baseFileLocation = config.suggestAeronLogLocation(logger) - -// val aeronLogDirectory = File(baseFileLocation, "aeron-" + type.simpleName) - val aeronLogDirectory = File(baseFileLocation, "aeron") - aeronDirAlreadyExists = aeronLogDirectory.exists() - config.aeronLogDirectory = aeronLogDirectory - } - - logger.info("Aeron log directory: ${config.aeronLogDirectory}") - if (aeronDirAlreadyExists) { - logger.warn("Aeron log directory already exists! This might not be what you want!") - } - - val threadFactory = NamedThreadFactory("Aeron", false) - - // LOW-LATENCY SETTINGS - // .termBufferSparseFile(false) - // .useWindowsHighResTimer(true) - // .threadingMode(ThreadingMode.DEDICATED) - // .conductorIdleStrategy(BusySpinIdleStrategy.INSTANCE) - // .receiverIdleStrategy(NoOpIdleStrategy.INSTANCE) - // .senderIdleStrategy(NoOpIdleStrategy.INSTANCE); - // setProperty(DISABLE_BOUNDS_CHECKS_PROP_NAME, "true"); - // setProperty("aeron.mtu.length", "16384"); - // setProperty("aeron.socket.so_sndbuf", "2097152"); - // setProperty("aeron.socket.so_rcvbuf", "2097152"); - // setProperty("aeron.rcv.initial.window.length", "2097152"); - - // driver context must happen in the initializer, because we have a Server.isRunning() method that uses the mediaDriverContext (without bind) - val mDrivercontext = MediaDriver.Context() - .publicationReservedSessionIdLow(RESERVED_SESSION_ID_LOW) - .publicationReservedSessionIdHigh(RESERVED_SESSION_ID_HIGH) - .conductorThreadFactory(threadFactory) - .receiverThreadFactory(threadFactory) - .senderThreadFactory(threadFactory) - .sharedNetworkThreadFactory(threadFactory) - .sharedThreadFactory(threadFactory) - .threadingMode(config.threadingMode) - .mtuLength(config.networkMtuSize) - .socketSndbufLength(config.sendBufferSize) - .socketRcvbufLength(config.receiveBufferSize) - - mDrivercontext - .aeronDirectoryName(config.aeronLogDirectory!!.absolutePath) - .concludeAeronDirectory() - - if (mDrivercontext.ipcTermBufferLength() != io.aeron.driver.Configuration.ipcTermBufferLength()) { - // default 64 megs each is HUGE - mDrivercontext.ipcTermBufferLength(8 * 1024 * 1024) - } - - if (mDrivercontext.publicationTermBufferLength() != io.aeron.driver.Configuration.termBufferLength()) { - // default 16 megs each is HUGE (we run out of space in production w/ lots of clients) - mDrivercontext.publicationTermBufferLength(2 * 1024 * 1024) - } - - mediaDriverContext = mDrivercontext - - // serialization stuff serialization = config.serialization sendIdleStrategy = config.sendIdleStrategy @@ -292,27 +128,33 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A } internal fun initEndpointState(): Aeron { - val aeronDirectory = config.aeronLogDirectory!!.absolutePath + // Aeron configuration + val context: MediaDriver.Context = AeronConfig.createContext(config, logger) - if (!isRunning()) { - logger.debug("Starting Aeron Media driver...") + logger.info { "Aeron log directory: ${context.aeronDirectory()}" } - mediaDriverContext + + if (!AeronConfig.isRunning(context)) { + logger.debug("Starting Aeron Media driver in ${context.aeronDirectory()}") + + context .dirDeleteOnStart(true) .dirDeleteOnShutdown(true) // the server always creates a the media driver. try { - mediaDriver = MediaDriver.launch(mediaDriverContext) + mediaDriver = MediaDriver.launch(context) } catch (e: Exception) { listenerManager.notifyError(e) throw e } } + + val aeronContext = Aeron.Context() aeronContext - .aeronDirectoryName(aeronDirectory) + .aeronDirectoryName(context.aeronDirectory().path) .concludeAeronDirectory() try { @@ -328,6 +170,8 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A throw e } + mediaDriverContext = context + shutdown.getAndSet(false) shutdownLatch = SuspendWaiter() @@ -335,9 +179,46 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A return aeron!! } + /** + * @return the aeron media driver log file for a specific publication. This should be removed when a publication is closed (but is not always!) + */ + internal fun getMediaDriverPublicationFile(publicationRegId: Long): File { + return mediaDriverContext.aeronDirectory().resolve("publications").resolve("${publicationRegId}.logbuffer") + } abstract fun newException(message: String, cause: Throwable? = null): Throwable + // used internally to remove a connection + internal fun removeConnection(connection: Connection) { + @Suppress("UNCHECKED_CAST") + removeConnection(connection as CONNECTION) + } + + + /** + * Adds a custom connection to the server. + * + * This should only be used in situations where there can be DIFFERENT types of connections (such as a 'web-based' connection) and + * you want *this* endpoint to manage listeners + message dispatch + * + * @param connection the connection to add + */ + fun addConnection(connection: CONNECTION) { + connections.add(connection) + } + + /** + * Removes a custom connection to the server. + * + * This should only be used in situations where there can be DIFFERENT types of connections (such as a 'web-based' connection) and + * you want *this* endpoint to manage listeners + message dispatch + * + * @param connection the connection to remove + */ + fun removeConnection(connection: CONNECTION) { + connections.remove(connection) + } + /** * Returns the property store used by this endpoint. The property store can store via properties, * a database, etc, or can be a "null" property store, which does nothing @@ -508,7 +389,7 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A } // more critical error sending the message. we shouldn't retry or anything. - val exception = newException("[${publication.sessionId()}] Error sending handshake message. $message (${errorCodeName(result)})") + val exception = newException("[${publication.sessionId()}] Error sending handshake message. $message (${AeronConfig.errorCodeName(result)})") ListenerManager.cleanStackTraceInternal(exception) listenerManager.notifyError(exception) return @@ -681,7 +562,7 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A } // more critical error sending the message. we shouldn't retry or anything. - logger.error("[${publication.sessionId()}] Error sending message. $message (${errorCodeName(result)})") + logger.error("[${publication.sessionId()}] Error sending message. $message (${AeronConfig.errorCodeName(result)})") return } @@ -734,14 +615,24 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A shutdownLatch.doWait() } +// /** +// * Checks to see if an endpoint (using the current configuration) is running. +// * +// * @return true if the media driver is active and running +// */ +// fun isRunning(): Boolean { +// // if the media driver is running, it will be a quick connection. Usually 100ms or so +// return configureMediaDriverContext().isDriverActive(1_000) { } +// } + /** * Checks to see if an endpoint (using the specified configuration) is running. * - * @return true if the client/server is active and running + * @return true if the media driver is active and running */ - fun isRunning(): Boolean { + fun isRunning(context: MediaDriver.Context): Boolean { // if the media driver is running, it will be a quick connection. Usually 100ms or so - return mediaDriverContext.isDriverActive(1_000) { } + return context.isDriverActive(1_000) { } } final override fun close() { diff --git a/src/dorkbox/network/handshake/ServerHandshake.kt b/src/dorkbox/network/handshake/ServerHandshake.kt index 57e57d3f..dc5c3ee2 100644 --- a/src/dorkbox/network/handshake/ServerHandshake.kt +++ b/src/dorkbox/network/handshake/ServerHandshake.kt @@ -22,11 +22,11 @@ import com.github.benmanes.caffeine.cache.RemovalListener import dorkbox.netUtil.IP import dorkbox.network.Server import dorkbox.network.ServerConfiguration +import dorkbox.network.aeron.AeronConfig import dorkbox.network.aeron.IpcMediaDriverConnection import dorkbox.network.aeron.UdpMediaDriverConnection import dorkbox.network.connection.Connection import dorkbox.network.connection.ConnectionParams -import dorkbox.network.connection.EndPoint import dorkbox.network.connection.ListenerManager import dorkbox.network.connection.PublicKeyValidationState import dorkbox.network.exceptions.AllocationException @@ -71,7 +71,7 @@ internal class ServerHandshake(private val logger: KLog private val connectionsPerIpCounts = ConnectionCounts() // guarantee that session/stream ID's will ALWAYS be unique! (there can NEVER be a collision!) - private val sessionIdAllocator = RandomIdAllocator(EndPoint.RESERVED_SESSION_ID_LOW, EndPoint.RESERVED_SESSION_ID_HIGH) + private val sessionIdAllocator = RandomIdAllocator(AeronConfig.RESERVED_SESSION_ID_LOW, AeronConfig.RESERVED_SESSION_ID_HIGH) private val streamIdAllocator = RandomIdAllocator(1, Integer.MAX_VALUE) @@ -109,7 +109,7 @@ internal class ServerHandshake(private val logger: KLog logger.trace { "[${pendingConnection.id}] Connection from client $connectionString done with handshake." } // this enables the connection to start polling for messages - server.connections.add(pendingConnection) + server.addConnection(pendingConnection) // now tell the client we are done runBlocking { diff --git a/src/dorkbox/network/serialization/Serialization.kt b/src/dorkbox/network/serialization/Serialization.kt index b7202cf3..3c2fa230 100644 --- a/src/dorkbox/network/serialization/Serialization.kt +++ b/src/dorkbox/network/serialization/Serialization.kt @@ -765,8 +765,9 @@ open class Serialization(private val references: Boolean = true, private val fac * # BLOCKING * * Waits until a kryo is available to write, using CAS operations to prevent having to synchronize. + * + * @throws IOException */ - @Throws(IOException::class) override fun write(buffer: DirectBuffer, message: Any) { runBlocking { val kryo = takeKryo()