diff --git a/src/dorkbox/network/Configuration.kt b/src/dorkbox/network/Configuration.kt index 291c71e4..7d6e3e6d 100644 --- a/src/dorkbox/network/Configuration.kt +++ b/src/dorkbox/network/Configuration.kt @@ -1,5 +1,5 @@ /* - * Copyright 2020 dorkbox, llc + * Copyright 2023 dorkbox, llc * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,6 +32,7 @@ import io.aeron.driver.exceptions.InvalidChannelException import io.aeron.exceptions.DriverTimeoutException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.asCoroutineDispatcher import mu.KLogger import org.agrona.SystemUtil import org.agrona.concurrent.AgentTerminationException @@ -140,11 +141,12 @@ class ServerConfiguration : dorkbox.network.Configuration() { class ClientConfiguration : dorkbox.network.Configuration() { /** - * Validates the current configuration + * Validates the current configuration. Throws an exception if there are problems. */ @Suppress("DuplicatedCode") override fun validate() { super.validate() + // have to do some basic validation of our configuration } @@ -154,6 +156,10 @@ class ClientConfiguration : dorkbox.network.Configuration() { if (!super.equals(other)) return false return true } + + override fun hashCode(): Int { + return super.hashCode() + } } abstract class Configuration { @@ -162,18 +168,33 @@ abstract class Configuration { @Volatile private var alreadyShownTempFsTips = false - } - /** - * Specifies the Java thread that will poll the underlying network for incoming messages - */ - var networkInterfaceEventDispatcher: ExecutorService = Executors.newSingleThreadExecutor( - NamedThreadFactory( "Network Event Dispatcher", Thread.currentThread().threadGroup, Thread.NORM_PRIORITY, true) - ) - set(value) { - require(!contextDefined) { errorMessage } - field = value + internal val networkThreadGroup = ThreadGroup("Network") + internal val aeronThreadFactory = NamedThreadFactory( "Aeron", networkThreadGroup, Thread.NORM_PRIORITY, true) + + private val defaultNetworkEventPoll = Executors.newSingleThreadExecutor( + NamedThreadFactory( "Poll Dispatcher", networkThreadGroup, Thread.NORM_PRIORITY, true) + ) + + private val defaultActionEventDispatcher = Executors.newSingleThreadExecutor( + NamedThreadFactory( "Event Dispatcher", networkThreadGroup, Thread.NORM_PRIORITY, true) + ).asCoroutineDispatcher() + + private val defaultEventCoroutineScope = CoroutineScope(defaultActionEventDispatcher) + private val defaultMessageCoroutineScope = CoroutineScope(Dispatchers.Default) + + private val defaultAeronFilter: (error: Throwable) -> Boolean = { error -> + // we suppress these because they are already handled + when { + error is InvalidChannelException || error.cause is InvalidChannelException -> { false } + error is ClosedByInterruptException || error.cause is ClosedByInterruptException -> { false } + error is DriverTimeoutException || error.cause is DriverTimeoutException -> { false } + error is AgentTerminationException || error.cause is AgentTerminationException-> { false } + error is BindException || error.cause is BindException -> { false } + else -> { true } + } } + } /** * Enables the ability to use the IPv4 network stack. @@ -289,20 +310,41 @@ abstract class Configuration { /** - * The dispatch responsible for executing events that arrive via the network. + * Specifies the Java thread that will poll the underlying network for incoming messages + */ + var networkEventPoll: ExecutorService = defaultNetworkEventPoll + set(value) { + require(!contextDefined) { errorMessage } + field = value + } + + + /** + * Responsible for executing connection/misc events * - * This is very specifically NOT 'CoroutineScope(Dispatchers.Default)', because it is very easy (and tricky) to make sure - * that there is no thread starvation going on, which can, and WILL happen. + * NOTE: This is very specifically NOT 'CoroutineScope(Dispatchers.Default)', because it is very easy (and tricky) to make sure + * that there is no thread starvation going on, which can, and WILL happen. + */ + var eventDispatch = defaultEventCoroutineScope + set(value) { + require(!contextDefined) { errorMessage } + field = value + } + + + /** + * Responsible for publishing messages that arrive via the network. * * Normally, events should be dispatched asynchronously across a thread pool, but in certain circumstances you may want to constrain this to a single thread dispatcher or other, custom dispatcher. */ - var dispatch = CoroutineScope(Dispatchers.Default) + var messageDispatch = defaultMessageCoroutineScope + set(value) { + require(!contextDefined) { errorMessage } + field = value + } + + - /** - * Allows the user to change how endpoint settings and public key information are saved. - * - * Note: This field is overridden for server configurations, so that the file used is different for client/server - */ /** * Allows the user to change how endpoint settings and public key information are saved. @@ -547,39 +589,23 @@ abstract class Configuration { * * @return true if the error message should be logged, false to suppress the error */ - var aeronErrorFilter: (error: Throwable) -> Boolean = { error -> - // we suppress these because they are already handled - when { - error is InvalidChannelException || error.cause is InvalidChannelException -> { false } - error is ClosedByInterruptException || error.cause is ClosedByInterruptException -> { false } - error is DriverTimeoutException || error.cause is DriverTimeoutException -> { false } - error is AgentTerminationException || error.cause is AgentTerminationException-> { false } - error is BindException || error.cause is BindException -> { false } - else -> { true } - } - } + var aeronErrorFilter: (error: Throwable) -> Boolean = defaultAeronFilter set(value) { require(!contextDefined) { errorMessage } field = value } - /** * Internal property that tells us if this configuration has already been configured and used to create and start the Media Driver */ @Volatile internal var contextDefined: Boolean = false - /** - * Internal property that tells us if this configuration has already been used in an endpoint - */ - @Volatile - internal var previouslyUsed = false /** * Depending on the OS, different base locations for the Aeron log directory are preferred. */ - fun suggestAeronLogLocation(logger: KLogger): File { + private fun suggestAeronLogLocation(logger: KLogger): File { return when { OS.isMacOsX -> { // does the recommended location exist?? @@ -610,7 +636,7 @@ abstract class Configuration { } /** - * Validates the current configuration + * Validates the current configuration. Throws an exception if there are problems. */ @Suppress("DuplicatedCode") open fun validate() { @@ -636,13 +662,169 @@ abstract class Configuration { require(networkMtuSize > 0) { "configuration networkMtuSize must be > 0" } require(networkMtuSize < 9 * 1024) { "configuration networkMtuSize must be < ${9 * 1024}" } + + require(sendBufferSize > 0) { "configuration socket send buffer must be > 0"} + require(receiveBufferSize > 0) { "configuration socket receive buffer must be > 0"} + require(ipcTermBufferLength > 65535) { "configuration IPC term buffer must be > 65535"} + require(ipcTermBufferLength < 1_073_741_824) { "configuration IPC term buffer must be < 1,073,741,824"} + require(publicationTermBufferLength > 65535) { "configuration publication term buffer must be > 65535"} + require(publicationTermBufferLength < 1_073_741_824) { "configuration publication term buffer must be < 1,073,741,824"} + + require(eventDispatch.coroutineContext != Dispatchers.Default) { "configuration of the eventDispatch.context must be it's own ThreadExecutor. It CANNOT be the default dispatch because there will be thread starvation"} + } + + internal fun setDefaults(logger: KLogger) { + // explicitly don't set defaults if we already have the context defined! + if (contextDefined) { + return + } + + /* + * Linux + * Linux normally requires some settings of sysctl values. One is net.core.rmem_max to allow larger SO_RCVBUF and + * net.core.wmem_max to allow larger SO_SNDBUF values to be set. + * + * Windows + * Windows tends to use SO_SNDBUF values that are too small. It is recommended to use values more like 1MB or so. + * + * Mac/Darwin + * + * Mac tends to use SO_SNDBUF values that are too small. It is recommended to use larger values, like 16KB. + */ + if (receiveBufferSize == 0) { + receiveBufferSize = io.aeron.driver.Configuration.SOCKET_RCVBUF_LENGTH_DEFAULT + // when { + // OS.isLinux() -> + // OS.isWindows() -> + // OS.isMacOsX() -> + // } + + // val rmem_max = dorkbox.network.other.NetUtil.sysctlGetInt("net.core.rmem_max") + // val wmem_max = dorkbox.network.other.NetUtil.sysctlGetInt("net.core.wmem_max") + } + + + if (sendBufferSize == 0) { + sendBufferSize = io.aeron.driver.Configuration.SOCKET_SNDBUF_LENGTH_DEFAULT + // when { + // OS.isLinux() -> + // OS.isWindows() -> + // OS.isMacOsX() -> + // } + + // val rmem_max = dorkbox.network.other.NetUtil.sysctlGetInt("net.core.rmem_max") + // val wmem_max = dorkbox.network.other.NetUtil.sysctlGetInt("net.core.wmem_max") + } + + + /* + * Note: Since Mac OS does not have a built-in support for /dev/shm it is advised to create a RAM disk for the Aeron directory (aeron.dir). + * + * You can create a RAM disk with the following command: + * + * $ diskutil erasevolume HFS+ "DISK_NAME" `hdiutil attach -nomount ram://$((2048 * SIZE_IN_MB))` + * + * where: + * + * DISK_NAME should be replaced with a name of your choice. + * SIZE_IN_MB is the size in megabytes for the disk (e.g. 4096 for a 4GB disk). + * + * For example, the following command creates a RAM disk named DevShm which is 2GB in size: + * + * $ diskutil erasevolume HFS+ "DevShm" `hdiutil attach -nomount ram://$((2048 * 2048))` + * + * After this command is executed the new disk will be mounted under /Volumes/DevShm. + */ + if (aeronDirectory == null) { + val baseFileLocation = suggestAeronLogLocation(logger) + val aeronLogDirectory = File(baseFileLocation, "aeron") + aeronDirectory = aeronLogDirectory + } + + aeronDirectory = aeronDirectory!!.absoluteFile + } + + + // internal class for making sure that the AeronDriver is not duplicated for the same configuration (as that is entirely unnecessary) + internal class MediaDriverConfig : dorkbox.network.Configuration() { + /** + * Validates the current configuration. Throws an exception if there are problems. + */ + @Suppress("DuplicatedCode") + override fun validate() { + // have to do some basic validation of our configuration + require(sendBufferSize > 0) { "configuration socket send buffer must be > 0"} + require(receiveBufferSize > 0) { "configuration socket receive buffer must be > 0"} + require(ipcTermBufferLength > 65535) { "configuration IPC term buffer must be > 65535"} + require(ipcTermBufferLength < 1_073_741_824) { "configuration IPC term buffer must be < 1,073,741,824"} + require(publicationTermBufferLength > 65535) { "configuration publication term buffer must be > 65535"} + require(publicationTermBufferLength < 1_073_741_824) { "configuration publication term buffer must be < 1,073,741,824"} + + require(networkMtuSize > 0) { "configuration networkMtuSize must be > 0" } + require(networkMtuSize < 9 * 1024) { "configuration networkMtuSize must be < ${9 * 1024}" } + } + + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (other !is MediaDriverConfig) return false + + return mediaDriverEquals(this) + } + + override fun hashCode(): Int { + return mediaDriverHash() + } + } + internal fun asMediaDriverConfig(): MediaDriverConfig { + val newConfig = MediaDriverConfig() + + threadingMode = newConfig.threadingMode + networkMtuSize = newConfig.networkMtuSize + initialWindowLength = newConfig.initialWindowLength + sendBufferSize = newConfig.sendBufferSize + receiveBufferSize = newConfig.receiveBufferSize + aeronDirectory = newConfig.aeronDirectory + ipcTermBufferLength = newConfig.ipcTermBufferLength + publicationTermBufferLength = newConfig.publicationTermBufferLength + aeronErrorFilter = newConfig.aeronErrorFilter + + return newConfig + } + fun mediaDriverEquals(other: dorkbox.network.Configuration): Boolean { + if (threadingMode != other.threadingMode) return false + if (networkMtuSize != other.networkMtuSize) return false + if (initialWindowLength != other.initialWindowLength) return false + if (sendBufferSize != other.sendBufferSize) return false + if (receiveBufferSize != other.receiveBufferSize) return false + if (aeronDirectory != other.aeronDirectory) return false + if (ipcTermBufferLength != other.ipcTermBufferLength) return false + if (publicationTermBufferLength != other.publicationTermBufferLength) return false + if (aeronErrorFilter != other.aeronErrorFilter) return false + + return true + } + + fun mediaDriverHash(): Int { + var result = threadingMode.hashCode() + result = 31 * result + networkMtuSize + result = 31 * result + initialWindowLength + result = 31 * result + sendBufferSize + result = 31 * result + receiveBufferSize + result = 31 * result + ipcTermBufferLength + result = 31 * result + publicationTermBufferLength + result = 31 * result + aeronErrorFilter.hashCode() // lambda + result = 31 * result + (aeronDirectory?.hashCode() ?: 0) + + return result } override fun equals(other: Any?): Boolean { if (this === other) return true if (other !is dorkbox.network.Configuration) return false - if (networkInterfaceEventDispatcher != other.networkInterfaceEventDispatcher) return false + if (!mediaDriverEquals(other)) return false + + if (networkEventPoll != other.networkEventPoll) return false if (enableIPv4 != other.enableIPv4) return false if (enableIPv6 != other.enableIPv6) return false if (enableIpc != other.enableIpc) return false @@ -653,29 +835,22 @@ abstract class Configuration { if (connectionCheckIntervalNanos != other.connectionCheckIntervalNanos) return false if (connectionExpirationTimoutNanos != other.connectionExpirationTimoutNanos) return false if (isReliable != other.isReliable) return false - if (dispatch != other.dispatch) return false + if (eventDispatch != other.eventDispatch) return false if (settingsStore != other.settingsStore) return false if (serialization != other.serialization) return false if (pollIdleStrategy != other.pollIdleStrategy) return false if (sendIdleStrategy != other.sendIdleStrategy) return false - if (threadingMode != other.threadingMode) return false - if (aeronDirectory != other.aeronDirectory) return false + if (uniqueAeronDirectory != other.uniqueAeronDirectory) return false - if (networkMtuSize != other.networkMtuSize) return false - if (initialWindowLength != other.initialWindowLength) return false - if (sendBufferSize != other.sendBufferSize) return false - if (receiveBufferSize != other.receiveBufferSize) return false if (ipcTermBufferLength != other.ipcTermBufferLength) return false - if (publicationTermBufferLength != other.publicationTermBufferLength) return false - if (aeronErrorFilter != other.aeronErrorFilter) return false if (contextDefined != other.contextDefined) return false - if (previouslyUsed != other.previouslyUsed) return false return true } override fun hashCode(): Int { - var result = networkInterfaceEventDispatcher.hashCode() + var result = mediaDriverHash() + result = 31 * result + networkEventPoll.hashCode() result = 31 * result + enableIPv4.hashCode() result = 31 * result + enableIPv6.hashCode() result = 31 * result + enableIpc.hashCode() @@ -686,23 +861,13 @@ abstract class Configuration { result = 31 * result + connectionCheckIntervalNanos.hashCode() result = 31 * result + connectionExpirationTimoutNanos.hashCode() result = 31 * result + isReliable.hashCode() - result = 31 * result + dispatch.hashCode() + result = 31 * result + eventDispatch.hashCode() result = 31 * result + settingsStore.hashCode() result = 31 * result + serialization.hashCode() result = 31 * result + pollIdleStrategy.hashCode() result = 31 * result + sendIdleStrategy.hashCode() - result = 31 * result + threadingMode.hashCode() - result = 31 * result + (aeronDirectory?.hashCode() ?: 0) result = 31 * result + uniqueAeronDirectory.hashCode() - result = 31 * result + networkMtuSize - result = 31 * result + initialWindowLength - result = 31 * result + sendBufferSize - result = 31 * result + receiveBufferSize - result = 31 * result + ipcTermBufferLength - result = 31 * result + publicationTermBufferLength - result = 31 * result + aeronErrorFilter.hashCode() // lambda result = 31 * result + contextDefined.hashCode() - result = 31 * result + previouslyUsed.hashCode() return result } }