Addec copy(), changed the defaultMessageCoroutineScope to be 'Default' instead of IO. The defaultNetworkEventPoll is now in the event poller

This commit is contained in:
Robinson 2023-06-25 17:25:05 +02:00
parent 078cf36e22
commit 3897becef1
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
1 changed files with 76 additions and 24 deletions

View File

@ -33,10 +33,8 @@ import io.aeron.driver.Configuration
import io.aeron.driver.ThreadingMode
import io.aeron.driver.exceptions.InvalidChannelException
import io.aeron.exceptions.DriverTimeoutException
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.asCoroutineDispatcher
import mu.KLogger
import mu.KotlinLogging
import org.agrona.SystemUtil
@ -120,6 +118,24 @@ class ServerConfiguration : dorkbox.network.Configuration() {
require(listenIpAddress.isNotBlank()) { "Blank listen IP address, cannot continue." }
}
override fun initialize(logger: KLogger): dorkbox.network.ServerConfiguration {
return super.initialize(logger) as dorkbox.network.ServerConfiguration
}
override fun copy(): dorkbox.network.ServerConfiguration {
val config = ServerConfiguration()
config.listenIpAddress = listenIpAddress
config.maxClientCount = maxClientCount
config.maxConnectionsPerIpAddress = maxConnectionsPerIpAddress
config.ipcId = ipcId
config.settingsStore = settingsStore
super.copy(config)
return config
}
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (other !is ServerConfiguration) return false
@ -157,6 +173,16 @@ class ClientConfiguration : dorkbox.network.Configuration() {
// have to do some basic validation of our configuration
}
override fun initialize(logger: KLogger): dorkbox.network.ClientConfiguration {
return super.initialize(logger) as dorkbox.network.ClientConfiguration
}
override fun copy(): dorkbox.network.ClientConfiguration {
val config = ClientConfiguration()
super.copy(config)
return config
}
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (other !is ClientConfiguration) return false
@ -182,11 +208,7 @@ abstract class Configuration {
internal val networkThreadGroup = ThreadGroup("Network")
internal val aeronThreadFactory = NamedThreadFactory( "Aeron", networkThreadGroup, true)
private val defaultNetworkEventPoll = Executors.newSingleThreadExecutor(
NamedThreadFactory( "Poll Dispatcher", networkThreadGroup, true)
).asCoroutineDispatcher()
private val defaultMessageCoroutineScope = Dispatchers.IO.limitedParallelism(4)
private val defaultMessageCoroutineScope = Dispatchers.Default
private val defaultAeronFilter: (error: Throwable) -> Boolean = { error ->
// we suppress these because they are already handled
@ -352,16 +374,6 @@ abstract class Configuration {
field = value
}
/**
* Specifies the Java thread that will poll the underlying network for incoming messages
*/
var networkEventPoll: CoroutineDispatcher = defaultNetworkEventPoll
set(value) {
require(!contextDefined) { errorMessage }
field = value
}
/**
* Responsible for publishing messages that arrive via the network.
*
@ -669,10 +681,10 @@ abstract class Configuration {
require(publicationTermBufferLength < 1_073_741_824) { "configuration publication term buffer must be < 1,073,741,824"}
}
internal fun initialize(logger: KLogger) {
internal open fun initialize(logger: KLogger): dorkbox.network.Configuration {
// explicitly don't set defaults if we already have the context defined!
if (contextDefined) {
return
return this
}
// we are starting a new context, make sure the aeron directory is unique (if specified)
@ -746,7 +758,7 @@ abstract class Configuration {
val baseFileLocation = defaultAeronLogLocation(logger)
val aeronLogDirectory = if (uniqueAeronDirectory) {
// this is incompatible with IPC, and will not be set if IPC is enabled (error will be thrown on validate)
File(baseFileLocation, "aeron_${mediaDriverId()}")
File(baseFileLocation, "aeron_${mediaDriverIdNoDir()}")
} else {
File(baseFileLocation, "aeron")
}
@ -757,6 +769,8 @@ abstract class Configuration {
// cannot make any more changes to the configuration!
contextDefined = true
return this
}
@ -862,7 +876,45 @@ abstract class Configuration {
return true
}
abstract fun copy(): dorkbox.network.Configuration
protected fun copy(config: dorkbox.network.Configuration) {
config.enableIPv4 = enableIPv4
config.enableIPv6 = enableIPv6
config.enableIpc = enableIpc
config.enableRemoteSignatureValidation = enableRemoteSignatureValidation
config.port = port
config.connectionCloseTimeoutInSeconds = connectionCloseTimeoutInSeconds
config.connectionCheckIntervalNanos = connectionCheckIntervalNanos
config.connectionExpirationTimoutNanos = connectionExpirationTimoutNanos
config.isReliable = isReliable
config.pingTimeoutSeconds = pingTimeoutSeconds
config.messageDispatch = messageDispatch
config.settingsStore = settingsStore
config.serialization = serialization
config.pollIdleStrategy = pollIdleStrategy.clone()
config.sendIdleStrategy = sendIdleStrategy.clone()
config.threadingMode = threadingMode
config.aeronDirectory = aeronDirectory
config.uniqueAeronDirectoryID = uniqueAeronDirectoryID
config.uniqueAeronDirectory = uniqueAeronDirectory
config.networkMtuSize = networkMtuSize
config.initialWindowLength = initialWindowLength
config.sendBufferSize = sendBufferSize
config.receiveBufferSize = receiveBufferSize
config.ipcTermBufferLength = ipcTermBufferLength
config.publicationTermBufferLength = publicationTermBufferLength
config.aeronErrorFilter = aeronErrorFilter
// config.contextDefined = contextDefined // we want to be able to reuse this config if it's a copy of an already defined config
}
fun mediaDriverId(): Int {
var result = mediaDriverIdNoDir()
result = 31 * result + aeronDirectory.hashCode()
return result
}
private fun mediaDriverIdNoDir(): Int {
var result = threadingMode.hashCode()
result = 31 * result + networkMtuSize
result = 31 * result + initialWindowLength
@ -870,6 +922,7 @@ abstract class Configuration {
result = 31 * result + receiveBufferSize
result = 31 * result + uniqueAeronDirectoryID
result = 31 * result + uniqueAeronDirectory.hashCode()
result = 31 * result + ipcTermBufferLength
result = 31 * result + publicationTermBufferLength
@ -883,7 +936,6 @@ abstract class Configuration {
if (!mediaDriverEquals(other)) return false
if (networkEventPoll != other.networkEventPoll) return false
if (enableIPv4 != other.enableIPv4) return false
if (enableIPv6 != other.enableIPv6) return false
if (enableIpc != other.enableIpc) return false
@ -909,7 +961,7 @@ abstract class Configuration {
override fun hashCode(): Int {
var result = mediaDriverId()
result = 31 * result + networkEventPoll.hashCode()
result = 31 * result + enableIPv4.hashCode()
result = 31 * result + enableIPv6.hashCode()
result = 31 * result + enableIpc.hashCode()
@ -920,12 +972,12 @@ abstract class Configuration {
result = 31 * result + connectionCheckIntervalNanos.hashCode()
result = 31 * result + connectionExpirationTimoutNanos.hashCode()
result = 31 * result + isReliable.hashCode()
result = 31 * result + messageDispatch.hashCode()
result = 31 * result + settingsStore.hashCode()
result = 31 * result + serialization.hashCode()
result = 31 * result + pollIdleStrategy.hashCode()
result = 31 * result + sendIdleStrategy.hashCode()
result = 31 * result + uniqueAeronDirectory.hashCode()
result = 31 * result + uniqueAeronDirectoryID
// aeronErrorFilter // cannot get the predictable hash code of a lambda
result = 31 * result + contextDefined.hashCode()
return result
}