Fixed streamed data chunk mis-alignment.

This commit is contained in:
Robinson 2022-06-29 21:56:12 +02:00
parent 1a6ac7048b
commit 55e777860b
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
3 changed files with 21 additions and 26 deletions

View File

@ -1,6 +1,6 @@
package dorkbox.network.connection.streaming
class StreamingData(var streamId: Long) : StreamingMessage {
class StreamingData(val streamId: Long) : StreamingMessage {
// These are set just after we receive the message, and before we process it
@Transient var payload: ByteArray? = null

View File

@ -241,6 +241,7 @@ internal class StreamingManager<CONNECTION : Connection>(private val logger: KLo
* We don't write max possible length per message, we write out MTU (payload) length (so aeron doesn't fragment the message).
* The max possible length is WAY, WAY more than the max payload length.
*
* @param internalBuffer this is the ORIGINAL object data that is to be "chunked" and sent across the wire
* @return true if ALL the message chunks were successfully sent by aeron, false otherwise. Exceptions are caught and rethrown!
*/
suspend fun send(
@ -286,7 +287,7 @@ internal class StreamingManager<CONNECTION : Connection>(private val logger: KLo
// MINOR fragmentation by aeron is OK, since that will greatly speed up data transfer rates!
// the maxPayloadLength MUST ABSOLUTELY be less that the max size + header!
var maxPayloadLength = publication.maxMessageLength() - 200
var sizeOfPayload = publication.maxMessageLength() - 200
val header: ByteArray
val headerSize: Int
@ -297,10 +298,10 @@ internal class StreamingManager<CONNECTION : Connection>(private val logger: KLo
header = ByteArray(headerSize)
// we have to account for the header + the MAX optimized int size
maxPayloadLength -= (headerSize + 5)
sizeOfPayload -= (headerSize + 5)
// this size might be a LITTLE too big, but that's ok, since we only make this specific buffer once.
val chunkBuffer = AeronOutput(headerSize + maxPayloadLength)
val chunkBuffer = AeronOutput(headerSize + sizeOfPayload)
// copy out our header info
objectBuffer.internalBuffer.getBytes(0, header, 0, headerSize)
@ -309,15 +310,15 @@ internal class StreamingManager<CONNECTION : Connection>(private val logger: KLo
chunkBuffer.writeBytes(header)
// write out the payload size using optimized data structures.
val varIntSize = chunkBuffer.writeVarInt(maxPayloadLength, true)
val varIntSize = chunkBuffer.writeVarInt(sizeOfPayload, true)
// write out the payload. Our resulting data written out is the ACTUAL MTU of aeron.
internalBuffer.getBytes(0, chunkBuffer.internalBuffer, headerSize + varIntSize, maxPayloadLength)
internalBuffer.getBytes(0, chunkBuffer.internalBuffer, headerSize + varIntSize, sizeOfPayload)
remainingPayload -= maxPayloadLength
payloadSent += maxPayloadLength
remainingPayload -= sizeOfPayload
payloadSent += sizeOfPayload
val success = endPoint.sendData(publication, chunkBuffer.internalBuffer, 0, headerSize + varIntSize + maxPayloadLength, connection)
val success = endPoint.sendData(publication, chunkBuffer.internalBuffer, 0, headerSize + varIntSize + sizeOfPayload, connection)
if (!success) {
// something SUPER wrong!
// more critical error sending the message. we shouldn't retry or anything.
@ -341,10 +342,10 @@ internal class StreamingManager<CONNECTION : Connection>(private val logger: KLo
// now send the chunks as fast as possible. Aeron will have us back-off if we send too quickly
while (remainingPayload > 0) {
val amountToSend = if (remainingPayload < maxPayloadLength) {
val amountToSend = if (remainingPayload < sizeOfPayload) {
remainingPayload
} else {
maxPayloadLength
sizeOfPayload
}
remainingPayload -= amountToSend
@ -358,17 +359,17 @@ internal class StreamingManager<CONNECTION : Connection>(private val logger: KLo
// on the receiving end without worry.
try {
val varIntSize = OptimizeUtilsByteBuf.intLength(maxPayloadLength, true)
val varIntSize = OptimizeUtilsByteBuf.intLength(sizeOfPayload, true)
val writeIndex = payloadSent - headerSize - varIntSize
// write out our header data (this will OVERWRITE previous data!)
internalBuffer.putBytes(writeIndex, header)
// write out the payload size using optimized data structures.
writeVarInt(internalBuffer, writeIndex + headerSize, maxPayloadLength, true)
writeVarInt(internalBuffer, writeIndex + headerSize, sizeOfPayload, true)
// write out the payload
endPoint.sendData(publication, internalBuffer, writeIndex, headerSize + amountToSend, connection)
endPoint.sendData(publication, internalBuffer, writeIndex, headerSize + varIntSize + amountToSend, connection)
payloadSent += amountToSend
} catch (e: Exception) {

View File

@ -11,10 +11,13 @@ import java.security.SecureRandom
class StreamingTest : BaseTest() {
val sizeToTest = ExpandableDirectByteBuffer.MAX_BUFFER_LENGTH / 8
@Test
fun sendStreamingObject() {
val sizeToTest = ExpandableDirectByteBuffer.MAX_BUFFER_LENGTH / 8
val hugeData = ByteArray(sizeToTest)
SecureRandom().nextBytes(hugeData)
run {
val configuration = serverConfig()
@ -25,6 +28,7 @@ class StreamingTest : BaseTest() {
server.onMessage<ByteArray> {
println("received data, shutting down!")
Assert.assertEquals(sizeToTest, it.size)
Assert.assertArrayEquals(hugeData, it)
stopEndPoints()
}
}
@ -42,22 +46,12 @@ class StreamingTest : BaseTest() {
client.onConnect {
val params = connectionParams ?: throw Exception("We should not have null connectionParams!")
val publication = params.mediaDriverConnection.publication
val hugeData = ByteArray(sizeToTest)
SecureRandom().nextBytes(hugeData)
this.endPoint.send(hugeData, publication, this)
}
client.connect(LOCALHOST)
}
waitForThreads(0)
// System.err.println("Connection count (after reconnecting) is: " + reconnectCount.value)
// Assert.assertEquals(4, reconnectCount.value)
}
}