Added another type for equality/hash. Fixed issues with event/message dispatch

This commit is contained in:
Robinson 2023-03-01 00:24:44 +01:00
parent 8cdf5781e2
commit f1e6df094d
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C

View File

@ -1,5 +1,5 @@
/*
* Copyright 2020 dorkbox, llc
* Copyright 2023 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -32,6 +32,7 @@ import io.aeron.driver.exceptions.InvalidChannelException
import io.aeron.exceptions.DriverTimeoutException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.asCoroutineDispatcher
import mu.KLogger
import org.agrona.SystemUtil
import org.agrona.concurrent.AgentTerminationException
@ -140,11 +141,12 @@ class ServerConfiguration : dorkbox.network.Configuration() {
class ClientConfiguration : dorkbox.network.Configuration() {
/**
* Validates the current configuration
* Validates the current configuration. Throws an exception if there are problems.
*/
@Suppress("DuplicatedCode")
override fun validate() {
super.validate()
// have to do some basic validation of our configuration
}
@ -154,6 +156,10 @@ class ClientConfiguration : dorkbox.network.Configuration() {
if (!super.equals(other)) return false
return true
}
override fun hashCode(): Int {
return super.hashCode()
}
}
abstract class Configuration {
@ -162,18 +168,33 @@ abstract class Configuration {
@Volatile
private var alreadyShownTempFsTips = false
}
/**
* Specifies the Java thread that will poll the underlying network for incoming messages
*/
var networkInterfaceEventDispatcher: ExecutorService = Executors.newSingleThreadExecutor(
NamedThreadFactory( "Network Event Dispatcher", Thread.currentThread().threadGroup, Thread.NORM_PRIORITY, true)
)
set(value) {
require(!contextDefined) { errorMessage }
field = value
internal val networkThreadGroup = ThreadGroup("Network")
internal val aeronThreadFactory = NamedThreadFactory( "Aeron", networkThreadGroup, Thread.NORM_PRIORITY, true)
private val defaultNetworkEventPoll = Executors.newSingleThreadExecutor(
NamedThreadFactory( "Poll Dispatcher", networkThreadGroup, Thread.NORM_PRIORITY, true)
)
private val defaultActionEventDispatcher = Executors.newSingleThreadExecutor(
NamedThreadFactory( "Event Dispatcher", networkThreadGroup, Thread.NORM_PRIORITY, true)
).asCoroutineDispatcher()
private val defaultEventCoroutineScope = CoroutineScope(defaultActionEventDispatcher)
private val defaultMessageCoroutineScope = CoroutineScope(Dispatchers.Default)
private val defaultAeronFilter: (error: Throwable) -> Boolean = { error ->
// we suppress these because they are already handled
when {
error is InvalidChannelException || error.cause is InvalidChannelException -> { false }
error is ClosedByInterruptException || error.cause is ClosedByInterruptException -> { false }
error is DriverTimeoutException || error.cause is DriverTimeoutException -> { false }
error is AgentTerminationException || error.cause is AgentTerminationException-> { false }
error is BindException || error.cause is BindException -> { false }
else -> { true }
}
}
}
/**
* Enables the ability to use the IPv4 network stack.
@ -289,20 +310,41 @@ abstract class Configuration {
/**
* The dispatch responsible for executing events that arrive via the network.
* Specifies the Java thread that will poll the underlying network for incoming messages
*/
var networkEventPoll: ExecutorService = defaultNetworkEventPoll
set(value) {
require(!contextDefined) { errorMessage }
field = value
}
/**
* Responsible for executing connection/misc events
*
* This is very specifically NOT 'CoroutineScope(Dispatchers.Default)', because it is very easy (and tricky) to make sure
* that there is no thread starvation going on, which can, and WILL happen.
* NOTE: This is very specifically NOT 'CoroutineScope(Dispatchers.Default)', because it is very easy (and tricky) to make sure
* that there is no thread starvation going on, which can, and WILL happen.
*/
var eventDispatch = defaultEventCoroutineScope
set(value) {
require(!contextDefined) { errorMessage }
field = value
}
/**
* Responsible for publishing messages that arrive via the network.
*
* Normally, events should be dispatched asynchronously across a thread pool, but in certain circumstances you may want to constrain this to a single thread dispatcher or other, custom dispatcher.
*/
var dispatch = CoroutineScope(Dispatchers.Default)
var messageDispatch = defaultMessageCoroutineScope
set(value) {
require(!contextDefined) { errorMessage }
field = value
}
/**
* Allows the user to change how endpoint settings and public key information are saved.
*
* Note: This field is overridden for server configurations, so that the file used is different for client/server
*/
/**
* Allows the user to change how endpoint settings and public key information are saved.
@ -547,39 +589,23 @@ abstract class Configuration {
*
* @return true if the error message should be logged, false to suppress the error
*/
var aeronErrorFilter: (error: Throwable) -> Boolean = { error ->
// we suppress these because they are already handled
when {
error is InvalidChannelException || error.cause is InvalidChannelException -> { false }
error is ClosedByInterruptException || error.cause is ClosedByInterruptException -> { false }
error is DriverTimeoutException || error.cause is DriverTimeoutException -> { false }
error is AgentTerminationException || error.cause is AgentTerminationException-> { false }
error is BindException || error.cause is BindException -> { false }
else -> { true }
}
}
var aeronErrorFilter: (error: Throwable) -> Boolean = defaultAeronFilter
set(value) {
require(!contextDefined) { errorMessage }
field = value
}
/**
* Internal property that tells us if this configuration has already been configured and used to create and start the Media Driver
*/
@Volatile
internal var contextDefined: Boolean = false
/**
* Internal property that tells us if this configuration has already been used in an endpoint
*/
@Volatile
internal var previouslyUsed = false
/**
* Depending on the OS, different base locations for the Aeron log directory are preferred.
*/
fun suggestAeronLogLocation(logger: KLogger): File {
private fun suggestAeronLogLocation(logger: KLogger): File {
return when {
OS.isMacOsX -> {
// does the recommended location exist??
@ -610,7 +636,7 @@ abstract class Configuration {
}
/**
* Validates the current configuration
* Validates the current configuration. Throws an exception if there are problems.
*/
@Suppress("DuplicatedCode")
open fun validate() {
@ -636,13 +662,169 @@ abstract class Configuration {
require(networkMtuSize > 0) { "configuration networkMtuSize must be > 0" }
require(networkMtuSize < 9 * 1024) { "configuration networkMtuSize must be < ${9 * 1024}" }
require(sendBufferSize > 0) { "configuration socket send buffer must be > 0"}
require(receiveBufferSize > 0) { "configuration socket receive buffer must be > 0"}
require(ipcTermBufferLength > 65535) { "configuration IPC term buffer must be > 65535"}
require(ipcTermBufferLength < 1_073_741_824) { "configuration IPC term buffer must be < 1,073,741,824"}
require(publicationTermBufferLength > 65535) { "configuration publication term buffer must be > 65535"}
require(publicationTermBufferLength < 1_073_741_824) { "configuration publication term buffer must be < 1,073,741,824"}
require(eventDispatch.coroutineContext != Dispatchers.Default) { "configuration of the eventDispatch.context must be it's own ThreadExecutor. It CANNOT be the default dispatch because there will be thread starvation"}
}
internal fun setDefaults(logger: KLogger) {
// explicitly don't set defaults if we already have the context defined!
if (contextDefined) {
return
}
/*
* Linux
* Linux normally requires some settings of sysctl values. One is net.core.rmem_max to allow larger SO_RCVBUF and
* net.core.wmem_max to allow larger SO_SNDBUF values to be set.
*
* Windows
* Windows tends to use SO_SNDBUF values that are too small. It is recommended to use values more like 1MB or so.
*
* Mac/Darwin
*
* Mac tends to use SO_SNDBUF values that are too small. It is recommended to use larger values, like 16KB.
*/
if (receiveBufferSize == 0) {
receiveBufferSize = io.aeron.driver.Configuration.SOCKET_RCVBUF_LENGTH_DEFAULT
// when {
// OS.isLinux() ->
// OS.isWindows() ->
// OS.isMacOsX() ->
// }
// val rmem_max = dorkbox.network.other.NetUtil.sysctlGetInt("net.core.rmem_max")
// val wmem_max = dorkbox.network.other.NetUtil.sysctlGetInt("net.core.wmem_max")
}
if (sendBufferSize == 0) {
sendBufferSize = io.aeron.driver.Configuration.SOCKET_SNDBUF_LENGTH_DEFAULT
// when {
// OS.isLinux() ->
// OS.isWindows() ->
// OS.isMacOsX() ->
// }
// val rmem_max = dorkbox.network.other.NetUtil.sysctlGetInt("net.core.rmem_max")
// val wmem_max = dorkbox.network.other.NetUtil.sysctlGetInt("net.core.wmem_max")
}
/*
* Note: Since Mac OS does not have a built-in support for /dev/shm it is advised to create a RAM disk for the Aeron directory (aeron.dir).
*
* You can create a RAM disk with the following command:
*
* $ diskutil erasevolume HFS+ "DISK_NAME" `hdiutil attach -nomount ram://$((2048 * SIZE_IN_MB))`
*
* where:
*
* DISK_NAME should be replaced with a name of your choice.
* SIZE_IN_MB is the size in megabytes for the disk (e.g. 4096 for a 4GB disk).
*
* For example, the following command creates a RAM disk named DevShm which is 2GB in size:
*
* $ diskutil erasevolume HFS+ "DevShm" `hdiutil attach -nomount ram://$((2048 * 2048))`
*
* After this command is executed the new disk will be mounted under /Volumes/DevShm.
*/
if (aeronDirectory == null) {
val baseFileLocation = suggestAeronLogLocation(logger)
val aeronLogDirectory = File(baseFileLocation, "aeron")
aeronDirectory = aeronLogDirectory
}
aeronDirectory = aeronDirectory!!.absoluteFile
}
// internal class for making sure that the AeronDriver is not duplicated for the same configuration (as that is entirely unnecessary)
internal class MediaDriverConfig : dorkbox.network.Configuration() {
/**
* Validates the current configuration. Throws an exception if there are problems.
*/
@Suppress("DuplicatedCode")
override fun validate() {
// have to do some basic validation of our configuration
require(sendBufferSize > 0) { "configuration socket send buffer must be > 0"}
require(receiveBufferSize > 0) { "configuration socket receive buffer must be > 0"}
require(ipcTermBufferLength > 65535) { "configuration IPC term buffer must be > 65535"}
require(ipcTermBufferLength < 1_073_741_824) { "configuration IPC term buffer must be < 1,073,741,824"}
require(publicationTermBufferLength > 65535) { "configuration publication term buffer must be > 65535"}
require(publicationTermBufferLength < 1_073_741_824) { "configuration publication term buffer must be < 1,073,741,824"}
require(networkMtuSize > 0) { "configuration networkMtuSize must be > 0" }
require(networkMtuSize < 9 * 1024) { "configuration networkMtuSize must be < ${9 * 1024}" }
}
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (other !is MediaDriverConfig) return false
return mediaDriverEquals(this)
}
override fun hashCode(): Int {
return mediaDriverHash()
}
}
internal fun asMediaDriverConfig(): MediaDriverConfig {
val newConfig = MediaDriverConfig()
threadingMode = newConfig.threadingMode
networkMtuSize = newConfig.networkMtuSize
initialWindowLength = newConfig.initialWindowLength
sendBufferSize = newConfig.sendBufferSize
receiveBufferSize = newConfig.receiveBufferSize
aeronDirectory = newConfig.aeronDirectory
ipcTermBufferLength = newConfig.ipcTermBufferLength
publicationTermBufferLength = newConfig.publicationTermBufferLength
aeronErrorFilter = newConfig.aeronErrorFilter
return newConfig
}
fun mediaDriverEquals(other: dorkbox.network.Configuration): Boolean {
if (threadingMode != other.threadingMode) return false
if (networkMtuSize != other.networkMtuSize) return false
if (initialWindowLength != other.initialWindowLength) return false
if (sendBufferSize != other.sendBufferSize) return false
if (receiveBufferSize != other.receiveBufferSize) return false
if (aeronDirectory != other.aeronDirectory) return false
if (ipcTermBufferLength != other.ipcTermBufferLength) return false
if (publicationTermBufferLength != other.publicationTermBufferLength) return false
if (aeronErrorFilter != other.aeronErrorFilter) return false
return true
}
fun mediaDriverHash(): Int {
var result = threadingMode.hashCode()
result = 31 * result + networkMtuSize
result = 31 * result + initialWindowLength
result = 31 * result + sendBufferSize
result = 31 * result + receiveBufferSize
result = 31 * result + ipcTermBufferLength
result = 31 * result + publicationTermBufferLength
result = 31 * result + aeronErrorFilter.hashCode() // lambda
result = 31 * result + (aeronDirectory?.hashCode() ?: 0)
return result
}
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (other !is dorkbox.network.Configuration) return false
if (networkInterfaceEventDispatcher != other.networkInterfaceEventDispatcher) return false
if (!mediaDriverEquals(other)) return false
if (networkEventPoll != other.networkEventPoll) return false
if (enableIPv4 != other.enableIPv4) return false
if (enableIPv6 != other.enableIPv6) return false
if (enableIpc != other.enableIpc) return false
@ -653,29 +835,22 @@ abstract class Configuration {
if (connectionCheckIntervalNanos != other.connectionCheckIntervalNanos) return false
if (connectionExpirationTimoutNanos != other.connectionExpirationTimoutNanos) return false
if (isReliable != other.isReliable) return false
if (dispatch != other.dispatch) return false
if (eventDispatch != other.eventDispatch) return false
if (settingsStore != other.settingsStore) return false
if (serialization != other.serialization) return false
if (pollIdleStrategy != other.pollIdleStrategy) return false
if (sendIdleStrategy != other.sendIdleStrategy) return false
if (threadingMode != other.threadingMode) return false
if (aeronDirectory != other.aeronDirectory) return false
if (uniqueAeronDirectory != other.uniqueAeronDirectory) return false
if (networkMtuSize != other.networkMtuSize) return false
if (initialWindowLength != other.initialWindowLength) return false
if (sendBufferSize != other.sendBufferSize) return false
if (receiveBufferSize != other.receiveBufferSize) return false
if (ipcTermBufferLength != other.ipcTermBufferLength) return false
if (publicationTermBufferLength != other.publicationTermBufferLength) return false
if (aeronErrorFilter != other.aeronErrorFilter) return false
if (contextDefined != other.contextDefined) return false
if (previouslyUsed != other.previouslyUsed) return false
return true
}
override fun hashCode(): Int {
var result = networkInterfaceEventDispatcher.hashCode()
var result = mediaDriverHash()
result = 31 * result + networkEventPoll.hashCode()
result = 31 * result + enableIPv4.hashCode()
result = 31 * result + enableIPv6.hashCode()
result = 31 * result + enableIpc.hashCode()
@ -686,23 +861,13 @@ abstract class Configuration {
result = 31 * result + connectionCheckIntervalNanos.hashCode()
result = 31 * result + connectionExpirationTimoutNanos.hashCode()
result = 31 * result + isReliable.hashCode()
result = 31 * result + dispatch.hashCode()
result = 31 * result + eventDispatch.hashCode()
result = 31 * result + settingsStore.hashCode()
result = 31 * result + serialization.hashCode()
result = 31 * result + pollIdleStrategy.hashCode()
result = 31 * result + sendIdleStrategy.hashCode()
result = 31 * result + threadingMode.hashCode()
result = 31 * result + (aeronDirectory?.hashCode() ?: 0)
result = 31 * result + uniqueAeronDirectory.hashCode()
result = 31 * result + networkMtuSize
result = 31 * result + initialWindowLength
result = 31 * result + sendBufferSize
result = 31 * result + receiveBufferSize
result = 31 * result + ipcTermBufferLength
result = 31 * result + publicationTermBufferLength
result = 31 * result + aeronErrorFilter.hashCode() // lambda
result = 31 * result + contextDefined.hashCode()
result = 31 * result + previouslyUsed.hashCode()
return result
}
}