Added ability to customize the dispatch for network events

This commit is contained in:
Robinson 2022-03-18 16:34:38 +01:00
parent c836848f91
commit a8dd406b40
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
2 changed files with 10 additions and 9 deletions

View File

@ -28,6 +28,8 @@ import dorkbox.storage.Storage
import io.aeron.driver.Configuration
import io.aeron.driver.ThreadingMode
import io.aeron.exceptions.DriverTimeoutException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import mu.KLogger
import org.agrona.SystemUtil
import org.agrona.concurrent.AgentTerminationException
@ -255,6 +257,11 @@ open class Configuration {
field = value
}
/**
* The dispatch responsible for executing events that arrive via the network. Normally, events should be
* dispatched asynchronously across a thread pool, but in certain circumstances you may want to constrain this to a single thread dispatcher, I/O, or a custom dispatcher.
*/
var dispatch = CoroutineScope(Dispatchers.Default)
/**
* Allows the user to change how endpoint settings and public key information are saved.

View File

@ -43,7 +43,6 @@ import io.aeron.logbuffer.Header
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
@ -87,7 +86,7 @@ internal constructor(val type: Class<*>,
val logger: KLogger = KotlinLogging.logger(type.simpleName)
internal val actionDispatch = CoroutineScope(Dispatchers.Default)
internal val actionDispatch = config.dispatch
internal val listenerManager = ListenerManager<CONNECTION>(logger)
internal val connections = ConnectionManager<CONNECTION>()
@ -458,7 +457,7 @@ internal constructor(val type: Class<*>,
// go in "lock step"
is RmiMessage -> {
// if we are an RMI message/registration, we have very specific, defined behavior.
// We do not use the "normal" listener callback pattern because this require special functionality
// We do not use the "normal" listener callback pattern because this requires special functionality
rmiGlobalSupport.manage(serialization, connection, message, rmiConnectionSupport, responseManager, logger)
}
@ -480,12 +479,7 @@ internal constructor(val type: Class<*>,
}
else -> {
// do nothing, there were problems with the message
if (message != null) {
logger.error("No message callbacks found for ${message::class.java.simpleName}")
} else {
logger.error("Unknown message received!!")
}
logger.error("Unknown message received!!")
}
}
}