Streaming data now goes onto its own context instead of on the aeron polling thread
This commit is contained in:
parent
c2c45b9ffe
commit
d772088eed
|
@ -41,14 +41,12 @@ import dorkbox.network.serialization.Serialization
|
|||
import dorkbox.network.serialization.SettingsStore
|
||||
import dorkbox.objectPool.*
|
||||
import dorkbox.os.OS
|
||||
import dorkbox.util.NamedThreadFactory
|
||||
import io.aeron.Publication
|
||||
import io.aeron.driver.ThreadingMode
|
||||
import io.aeron.logbuffer.Header
|
||||
import kotlinx.atomicfu.atomic
|
||||
import kotlinx.coroutines.*
|
||||
import kotlinx.coroutines.channels.Channel
|
||||
import kotlinx.coroutines.channels.onFailure
|
||||
import kotlinx.coroutines.channels.trySendBlocking
|
||||
import mu.KLogger
|
||||
import mu.KotlinLogging
|
||||
|
@ -58,7 +56,6 @@ import java.util.concurrent.*
|
|||
|
||||
|
||||
|
||||
|
||||
// If TCP and UDP both fill the pipe, THERE WILL BE FRAGMENTATION and dropped UDP packets!
|
||||
// it results in severe UDP packet loss and contention.
|
||||
//
|
||||
|
@ -104,9 +101,15 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
|
|||
|
||||
val logger: KLogger = KotlinLogging.logger(loggerName)
|
||||
|
||||
private val handler = CoroutineExceptionHandler { _, exception ->
|
||||
logger.error { "Uncaught Coroutine Error: ${exception.stackTraceToString()}" }
|
||||
}
|
||||
|
||||
// this is rather silly, BUT if there are more complex errors WITH the coroutine that occur, a regular try/catch WILL NOT catch it.
|
||||
// ADDITIONALLY, an error handler is ONLY effective at the first, top-level `launch`. IT WILL NOT WORK ANY OTHER WAY.
|
||||
private var messageCoroutineScope: CoroutineScope? = null
|
||||
@OptIn(ExperimentalCoroutinesApi::class)
|
||||
private val streamingCoroutineContext = Dispatchers.IO.limitedParallelism(1) + handler + SupervisorJob()
|
||||
private val messageCoroutineScope = CoroutineScope(Dispatchers.IO + handler + SupervisorJob())
|
||||
private val messageChannel = Channel<Paired<CONNECTION>>(32) // the size is not necessary to be large. Thread handoff should be responsive.
|
||||
private val pairedPool: Pool<Paired<CONNECTION>>
|
||||
|
||||
|
@ -349,9 +352,6 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
|
|||
val messageProcessThreads = (OS.optimumNumberOfThreads - aeronThreads).coerceAtLeast(1).coerceAtMost(4)
|
||||
|
||||
// create a new one when the endpoint starts up, because we close it when the endpoint shuts down or when the client retries
|
||||
val messageCoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob())
|
||||
this.messageCoroutineScope = messageCoroutineScope
|
||||
|
||||
repeat(messageProcessThreads) {
|
||||
messageCoroutineScope.launch {
|
||||
// this is only true while the endpoint is running.
|
||||
|
@ -621,23 +621,6 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
|
|||
notifyDisconnect = true)
|
||||
}
|
||||
|
||||
// streaming message. This is used when the published data is too large for a single Aeron message.
|
||||
// TECHNICALLY, we could arbitrarily increase the size of the permitted Aeron message, however this doesn't let us
|
||||
// send arbitrarily large pieces of data (gigs in size, potentially).
|
||||
// This will recursively call into this method for each of the unwrapped blocks of data.
|
||||
is StreamingControl -> {
|
||||
// NOTE: this CANNOT be on a separate thread, because we must guarantee that this happens first!
|
||||
streamingManager.processControlMessage(message, this@EndPoint, connection)
|
||||
}
|
||||
is StreamingData -> {
|
||||
// NOTE: this CANNOT be on a separate thread, because we must guarantee that this happens in-order!
|
||||
try {
|
||||
streamingManager.processDataMessage(message, this@EndPoint, connection)
|
||||
} catch (e: Exception) {
|
||||
listenerManager.notifyError(connection, StreamingException("Error processing StreamingMessage", e))
|
||||
}
|
||||
}
|
||||
|
||||
is Any -> {
|
||||
// NOTE: This MUST be on a new threads (otherwise RMI has issues where it will be blocking)
|
||||
val paired = pairedPool.take()
|
||||
|
@ -659,7 +642,7 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
|
|||
*
|
||||
* THIS IS PROCESSED ON MULTIPLE THREADS!
|
||||
*/
|
||||
private fun processMessageFromChannel(connection: CONNECTION, message: Any) {
|
||||
private suspend inline fun processMessageFromChannel(connection: CONNECTION, message: Any) {
|
||||
when (message) {
|
||||
is Ping -> {
|
||||
// PING will also measure APP latency, not just NETWORK PIPE latency
|
||||
|
@ -682,6 +665,28 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
|
|||
listenerManager.notifyError(connection, RMIException("Error while processing RMI message", e))
|
||||
}
|
||||
}
|
||||
|
||||
// streaming message. This is used when the published data is too large for a single Aeron message.
|
||||
// TECHNICALLY, we could arbitrarily increase the size of the permitted Aeron message, however this doesn't let us
|
||||
// send arbitrarily large pieces of data (gigs in size, potentially).
|
||||
// This will recursively call into this method for each of the unwrapped blocks of data.
|
||||
is StreamingControl -> {
|
||||
withContext(streamingCoroutineContext) {
|
||||
// NOTE: this CANNOT be on a separate threads, because we must guarantee that this happens first!
|
||||
streamingManager.processControlMessage(message, this@EndPoint, connection)
|
||||
}
|
||||
}
|
||||
is StreamingData -> {
|
||||
withContext(streamingCoroutineContext) {
|
||||
// NOTE: this CANNOT be on a separate threads, because we must guarantee that this happens in-order!
|
||||
try {
|
||||
streamingManager.processDataMessage(message, this@EndPoint, connection)
|
||||
} catch (e: Exception) {
|
||||
listenerManager.notifyError(connection, StreamingException("Error processing StreamingMessage", e))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
else -> {
|
||||
try {
|
||||
var hasListeners = listenerManager.notifyOnMessage(connection, message)
|
||||
|
@ -970,8 +975,6 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
|
|||
shutdownLatch.countDown()
|
||||
shutdownInProgress.lazySet(false)
|
||||
|
||||
messageCoroutineScope?.cancel("Message dispatch has been closed")
|
||||
|
||||
if (releaseWaitingThreads) {
|
||||
logger.trace { "Counting down the close latch..." }
|
||||
closeLatch.countDown()
|
||||
|
@ -999,8 +1002,6 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
|
|||
endpointIsRunning.lazySet(false)
|
||||
shutdown = false
|
||||
shutdownEventPoller = false
|
||||
|
||||
messageCoroutineScope?.cancel("Reset the message dispatch")
|
||||
}
|
||||
|
||||
override fun hashCode(): Int {
|
||||
|
|
Loading…
Reference in New Issue