diff --git a/src/dorkbox/network/connection/streaming/StreamingData.kt b/src/dorkbox/network/connection/streaming/StreamingData.kt index 3bb8890b..14045f8a 100644 --- a/src/dorkbox/network/connection/streaming/StreamingData.kt +++ b/src/dorkbox/network/connection/streaming/StreamingData.kt @@ -1,6 +1,6 @@ package dorkbox.network.connection.streaming -class StreamingData(var streamId: Long) : StreamingMessage { +class StreamingData(val streamId: Long) : StreamingMessage { // These are set just after we receive the message, and before we process it @Transient var payload: ByteArray? = null diff --git a/src/dorkbox/network/connection/streaming/StreamingManager.kt b/src/dorkbox/network/connection/streaming/StreamingManager.kt index 6e08b919..9d1430f4 100644 --- a/src/dorkbox/network/connection/streaming/StreamingManager.kt +++ b/src/dorkbox/network/connection/streaming/StreamingManager.kt @@ -241,6 +241,7 @@ internal class StreamingManager(private val logger: KLo * We don't write max possible length per message, we write out MTU (payload) length (so aeron doesn't fragment the message). * The max possible length is WAY, WAY more than the max payload length. * + * @param internalBuffer this is the ORIGINAL object data that is to be "chunked" and sent across the wire * @return true if ALL the message chunks were successfully sent by aeron, false otherwise. Exceptions are caught and rethrown! */ suspend fun send( @@ -286,7 +287,7 @@ internal class StreamingManager(private val logger: KLo // MINOR fragmentation by aeron is OK, since that will greatly speed up data transfer rates! // the maxPayloadLength MUST ABSOLUTELY be less that the max size + header! - var maxPayloadLength = publication.maxMessageLength() - 200 + var sizeOfPayload = publication.maxMessageLength() - 200 val header: ByteArray val headerSize: Int @@ -297,10 +298,10 @@ internal class StreamingManager(private val logger: KLo header = ByteArray(headerSize) // we have to account for the header + the MAX optimized int size - maxPayloadLength -= (headerSize + 5) + sizeOfPayload -= (headerSize + 5) // this size might be a LITTLE too big, but that's ok, since we only make this specific buffer once. - val chunkBuffer = AeronOutput(headerSize + maxPayloadLength) + val chunkBuffer = AeronOutput(headerSize + sizeOfPayload) // copy out our header info objectBuffer.internalBuffer.getBytes(0, header, 0, headerSize) @@ -309,15 +310,15 @@ internal class StreamingManager(private val logger: KLo chunkBuffer.writeBytes(header) // write out the payload size using optimized data structures. - val varIntSize = chunkBuffer.writeVarInt(maxPayloadLength, true) + val varIntSize = chunkBuffer.writeVarInt(sizeOfPayload, true) // write out the payload. Our resulting data written out is the ACTUAL MTU of aeron. - internalBuffer.getBytes(0, chunkBuffer.internalBuffer, headerSize + varIntSize, maxPayloadLength) + internalBuffer.getBytes(0, chunkBuffer.internalBuffer, headerSize + varIntSize, sizeOfPayload) - remainingPayload -= maxPayloadLength - payloadSent += maxPayloadLength + remainingPayload -= sizeOfPayload + payloadSent += sizeOfPayload - val success = endPoint.sendData(publication, chunkBuffer.internalBuffer, 0, headerSize + varIntSize + maxPayloadLength, connection) + val success = endPoint.sendData(publication, chunkBuffer.internalBuffer, 0, headerSize + varIntSize + sizeOfPayload, connection) if (!success) { // something SUPER wrong! // more critical error sending the message. we shouldn't retry or anything. @@ -341,10 +342,10 @@ internal class StreamingManager(private val logger: KLo // now send the chunks as fast as possible. Aeron will have us back-off if we send too quickly while (remainingPayload > 0) { - val amountToSend = if (remainingPayload < maxPayloadLength) { + val amountToSend = if (remainingPayload < sizeOfPayload) { remainingPayload } else { - maxPayloadLength + sizeOfPayload } remainingPayload -= amountToSend @@ -358,17 +359,17 @@ internal class StreamingManager(private val logger: KLo // on the receiving end without worry. try { - val varIntSize = OptimizeUtilsByteBuf.intLength(maxPayloadLength, true) + val varIntSize = OptimizeUtilsByteBuf.intLength(sizeOfPayload, true) val writeIndex = payloadSent - headerSize - varIntSize // write out our header data (this will OVERWRITE previous data!) internalBuffer.putBytes(writeIndex, header) // write out the payload size using optimized data structures. - writeVarInt(internalBuffer, writeIndex + headerSize, maxPayloadLength, true) + writeVarInt(internalBuffer, writeIndex + headerSize, sizeOfPayload, true) // write out the payload - endPoint.sendData(publication, internalBuffer, writeIndex, headerSize + amountToSend, connection) + endPoint.sendData(publication, internalBuffer, writeIndex, headerSize + varIntSize + amountToSend, connection) payloadSent += amountToSend } catch (e: Exception) { diff --git a/test/dorkboxTest/network/StreamingTest.kt b/test/dorkboxTest/network/StreamingTest.kt index 0771adfe..6efe8747 100644 --- a/test/dorkboxTest/network/StreamingTest.kt +++ b/test/dorkboxTest/network/StreamingTest.kt @@ -11,10 +11,13 @@ import java.security.SecureRandom class StreamingTest : BaseTest() { - val sizeToTest = ExpandableDirectByteBuffer.MAX_BUFFER_LENGTH / 8 - @Test fun sendStreamingObject() { + val sizeToTest = ExpandableDirectByteBuffer.MAX_BUFFER_LENGTH / 8 + val hugeData = ByteArray(sizeToTest) + SecureRandom().nextBytes(hugeData) + + run { val configuration = serverConfig() @@ -25,6 +28,7 @@ class StreamingTest : BaseTest() { server.onMessage { println("received data, shutting down!") Assert.assertEquals(sizeToTest, it.size) + Assert.assertArrayEquals(hugeData, it) stopEndPoints() } } @@ -42,22 +46,12 @@ class StreamingTest : BaseTest() { client.onConnect { val params = connectionParams ?: throw Exception("We should not have null connectionParams!") val publication = params.mediaDriverConnection.publication - - val hugeData = ByteArray(sizeToTest) - SecureRandom().nextBytes(hugeData) - this.endPoint.send(hugeData, publication, this) } client.connect(LOCALHOST) } - waitForThreads(0) - -// System.err.println("Connection count (after reconnecting) is: " + reconnectCount.value) -// Assert.assertEquals(4, reconnectCount.value) } - - }