MOved event dispatch to own class (as it is not user configurable)

This commit is contained in:
Robinson 2023-03-02 19:44:54 +01:00
parent 062b8a76ae
commit 8f81243c25
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C

View File

@ -30,7 +30,7 @@ import io.aeron.driver.Configuration
import io.aeron.driver.ThreadingMode
import io.aeron.driver.exceptions.InvalidChannelException
import io.aeron.exceptions.DriverTimeoutException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.asCoroutineDispatcher
import mu.KLogger
@ -175,14 +175,9 @@ abstract class Configuration {
private val defaultNetworkEventPoll = Executors.newSingleThreadExecutor(
NamedThreadFactory( "Poll Dispatcher", networkThreadGroup, Thread.NORM_PRIORITY, true)
)
private val defaultActionEventDispatcher = Executors.newSingleThreadExecutor(
NamedThreadFactory( "Event Dispatcher", networkThreadGroup, Thread.NORM_PRIORITY, true)
).asCoroutineDispatcher()
private val defaultEventCoroutineScope = CoroutineScope(defaultActionEventDispatcher)
private val defaultMessageCoroutineScope = CoroutineScope(Dispatchers.Default)
private val defaultMessageCoroutineScope = Dispatchers.Default
private val defaultAeronFilter: (error: Throwable) -> Boolean = { error ->
// we suppress these because they are already handled
@ -313,20 +308,7 @@ abstract class Configuration {
/**
* Specifies the Java thread that will poll the underlying network for incoming messages
*/
var networkEventPoll: ExecutorService = defaultNetworkEventPoll
set(value) {
require(!contextDefined) { errorMessage }
field = value
}
/**
* Responsible for executing connection/misc events
*
* NOTE: This is very specifically NOT 'CoroutineScope(Dispatchers.Default)', because it is very easy (and tricky) to make sure
* that there is no thread starvation going on, which can, and WILL happen.
*/
var eventDispatch = defaultEventCoroutineScope
var networkEventPoll: CoroutineDispatcher = defaultNetworkEventPoll
set(value) {
require(!contextDefined) { errorMessage }
field = value
@ -345,8 +327,6 @@ abstract class Configuration {
}
/**
* Allows the user to change how endpoint settings and public key information are saved.
*
@ -673,8 +653,6 @@ abstract class Configuration {
require(ipcTermBufferLength < 1_073_741_824) { "configuration IPC term buffer must be < 1,073,741,824"}
require(publicationTermBufferLength > 65535) { "configuration publication term buffer must be > 65535"}
require(publicationTermBufferLength < 1_073_741_824) { "configuration publication term buffer must be < 1,073,741,824"}
require(eventDispatch.coroutineContext != Dispatchers.Default) { "configuration of the eventDispatch.context must be it's own ThreadExecutor. It CANNOT be the default dispatch because there will be thread starvation"}
}
internal fun setDefaults(logger: KLogger) {
@ -839,7 +817,6 @@ abstract class Configuration {
if (connectionCheckIntervalNanos != other.connectionCheckIntervalNanos) return false
if (connectionExpirationTimoutNanos != other.connectionExpirationTimoutNanos) return false
if (isReliable != other.isReliable) return false
if (eventDispatch != other.eventDispatch) return false
if (settingsStore != other.settingsStore) return false
if (serialization != other.serialization) return false
if (pollIdleStrategy != other.pollIdleStrategy) return false
@ -865,7 +842,6 @@ abstract class Configuration {
result = 31 * result + connectionCheckIntervalNanos.hashCode()
result = 31 * result + connectionExpirationTimoutNanos.hashCode()
result = 31 * result + isReliable.hashCode()
result = 31 * result + eventDispatch.hashCode()
result = 31 * result + settingsStore.hashCode()
result = 31 * result + serialization.hashCode()
result = 31 * result + pollIdleStrategy.hashCode()