Fixed race condition when we reassemble streaming/chunked data.

This commit is contained in:
Robinson 2022-06-29 19:38:10 +02:00
parent 683abcbe59
commit 09f367963f
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
2 changed files with 13 additions and 13 deletions

View File

@ -524,14 +524,12 @@ internal constructor(val type: Class<*>,
message.payload = rawInput.readBytes(dataLength)
// NOTE: This MUST be on a new co-routine
actionDispatch.launch {
try {
streamingManager.processDataMessage(message, this@EndPoint)
} catch (e: Exception) {
logger.error("Error processing StreamingMessage", e)
listenerManager.notifyError(connection, e)
}
// NOTE: This MUST NOT be on a new co-routine. It must be on the same thread!
try {
streamingManager.processDataMessage(message, this@EndPoint)
} catch (e: Exception) {
logger.error("Error processing StreamingMessage", e)
listenerManager.notifyError(connection, e)
}
}

View File

@ -110,7 +110,7 @@ internal class StreamingManager<CONNECTION : Connection>(private val logger: KLo
val errorMessage = "Error serializing message from received streaming content, stream $streamId"
// either client or server. No other choices. We create an exception, because it's more useful!
val exception = endPoint.newException(errorMessage)
val exception = endPoint.newException(errorMessage, e)
// +2 because we do not want to see the stack for the abstract `newException`
// +3 more because we do not need to see the "internals" for sending messages. The important part of the stack trace is
@ -174,10 +174,12 @@ internal class StreamingManager<CONNECTION : Connection>(private val logger: KLo
}
/**
* Reassemble/figure out the internal message pieces
*
* NOTE sending a huge file can prevent other other network traffic from arriving until it's done!
*/
* NOTE: MUST BE ON THE AERON THREAD!
*
* Reassemble/figure out the internal message pieces
*
* NOTE sending a huge file can prevent other other network traffic from arriving until it's done!
*/
fun processDataMessage(message: StreamingData, endPoint: EndPoint<CONNECTION>) {
// the receiving data will ALWAYS come sequentially, but there might be OTHER streaming data received meanwhile.
val streamId = message.streamId