Fixed issues with streaming (it MUST be the aeron thread)

This commit is contained in:
Robinson 2023-09-08 02:49:35 +02:00
parent 50f212b834
commit e3a565f291
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
1 changed files with 18 additions and 24 deletions

View File

@ -107,10 +107,8 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
// this is rather silly, BUT if there are more complex errors WITH the coroutine that occur, a regular try/catch WILL NOT catch it.
// ADDITIONALLY, an error handler is ONLY effective at the first, top-level `launch`. IT WILL NOT WORK ANY OTHER WAY.
@OptIn(ExperimentalCoroutinesApi::class)
private val streamingCoroutineContext = Dispatchers.IO.limitedParallelism(1) + handler + SupervisorJob()
private val messageCoroutineScope = CoroutineScope(Dispatchers.IO + handler + SupervisorJob())
private val messageChannel = Channel<Paired<CONNECTION>>(32) // the size is not necessary to be large. Thread handoff should be responsive.
private val messageChannel = Channel<Paired<CONNECTION>>()
private val pairedPool: Pool<Paired<CONNECTION>>
internal val listenerManager = ListenerManager<CONNECTION>(logger)
@ -621,6 +619,23 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
notifyDisconnect = true)
}
// streaming message. This is used when the published data is too large for a single Aeron message.
// TECHNICALLY, we could arbitrarily increase the size of the permitted Aeron message, however this doesn't let us
// send arbitrarily large pieces of data (gigs in size, potentially).
// This will recursively call into this method for each of the unwrapped blocks of data.
is StreamingControl -> {
// NOTE: this CANNOT be on a separate threads, because we must guarantee that this happens first!
streamingManager.processControlMessage(message, this@EndPoint, connection)
}
is StreamingData -> {
// NOTE: this CANNOT be on a separate threads, because we must guarantee that this happens in-order!
try {
streamingManager.processDataMessage(message, this@EndPoint, connection)
} catch (e: Exception) {
listenerManager.notifyError(connection, StreamingException("Error processing StreamingMessage", e))
}
}
is Any -> {
// NOTE: This MUST be on a new threads (otherwise RMI has issues where it will be blocking)
val paired = pairedPool.take()
@ -666,27 +681,6 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
}
}
// streaming message. This is used when the published data is too large for a single Aeron message.
// TECHNICALLY, we could arbitrarily increase the size of the permitted Aeron message, however this doesn't let us
// send arbitrarily large pieces of data (gigs in size, potentially).
// This will recursively call into this method for each of the unwrapped blocks of data.
is StreamingControl -> {
withContext(streamingCoroutineContext) {
// NOTE: this CANNOT be on a separate threads, because we must guarantee that this happens first!
streamingManager.processControlMessage(message, this@EndPoint, connection)
}
}
is StreamingData -> {
withContext(streamingCoroutineContext) {
// NOTE: this CANNOT be on a separate threads, because we must guarantee that this happens in-order!
try {
streamingManager.processDataMessage(message, this@EndPoint, connection)
} catch (e: Exception) {
listenerManager.notifyError(connection, StreamingException("Error processing StreamingMessage", e))
}
}
}
else -> {
try {
var hasListeners = listenerManager.notifyOnMessage(connection, message)