Code cleanup and fixed issues when sending non-perfect multiples of our data limit.

This commit is contained in:
Robinson 2023-07-15 13:06:53 +02:00
parent 85d716e572
commit 93a7c9008d
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
3 changed files with 36 additions and 25 deletions

View File

@ -582,17 +582,14 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
}
// streaming/chunked message. This is used when the published data is too large for a single Aeron message.
// streaming message. This is used when the published data is too large for a single Aeron message.
// TECHNICALLY, we could arbitrarily increase the size of the permitted Aeron message, however this doesn't let us
// send arbitrarily large pieces of data (gigs in size, potentially).
// This will recursively call into this method for each of the unwrapped chunks of data.
// This will recursively call into this method for each of the unwrapped blocks of data.
is StreamingControl -> {
streamingManager.processControlMessage(message, readKryo,this@EndPoint, connection)
}
is StreamingData -> {
// NOTE: this will read extra data from the kryo input as necessary, which is why it's not on action dispatch!
message.payload = readKryo.readBytes()
// NOTE: This MUST NOT be on a new co-routine. It must be on the same thread!
try {
streamingManager.processDataMessage(message, this@EndPoint, connection)

View File

@ -344,10 +344,10 @@ internal class StreamingManager<CONNECTION : Connection>(
connection: CONNECTION
): Boolean {
// this buffer is the exact size as our internal buffer, so it is unnecessary to have multiple kryo instances
val originalBuffer = ExpandableDirectByteBuffer(objectSize) // this can grow, so it's fine to lock it to this size!
val originalObjectBuffer = ExpandableDirectByteBuffer(objectSize) // this can grow, so it's fine to lock it to this size!
// we have to save out our internal buffer, so we can reuse the kryo instance!
originalBuffer.putBytes(0, internalBuffer, 0, objectSize)
// we have to save out our internal buffer, so we can reuse the kryo instance later!
originalObjectBuffer.putBytes(0, internalBuffer, 0, objectSize)
// NOTE: our max object size for IN-MEMORY messages is an INT. For file transfer it's a LONG (so everything here is cast to a long)
@ -378,9 +378,6 @@ internal class StreamingManager<CONNECTION : Connection>(
// we do the FIRST block super-weird, because of the way we copy data around (we inject headers,
// so the first message is SUPER tiny and is a COPY, the rest are no-copy.
// This is REUSED to prevent garbage collection issues.
val blockData = StreamingData(streamSessionId)
// payload size is for a PRODUCER, and not SUBSCRIBER, so we have to include this amount every time.
// MINOR fragmentation by aeron is OK, since that will greatly speed up data transfer rates!
@ -390,6 +387,8 @@ internal class StreamingManager<CONNECTION : Connection>(
val headerSize: Int
try {
// This is REUSED to prevent garbage collection issues.
val blockData = StreamingData(streamSessionId)
val objectBuffer = kryo.write(connection, blockData)
headerSize = objectBuffer.position()
header = ByteArray(headerSize)
@ -410,20 +409,25 @@ internal class StreamingManager<CONNECTION : Connection>(
val varIntSize = blockBuffer.writeVarInt(sizeOfPayload, true)
// write out the payload. Our resulting data written out is the ACTUAL MTU of aeron.
originalBuffer.getBytes(0, blockBuffer.internalBuffer, headerSize + varIntSize, sizeOfPayload)
originalObjectBuffer.getBytes(0, blockBuffer.internalBuffer, headerSize + varIntSize, sizeOfPayload)
remainingPayload -= sizeOfPayload
payloadSent += sizeOfPayload
// we reuse/recycle objects, so the payload size is not EXACTLY what is specified
val reusedPayloadSize = headerSize + varIntSize + sizeOfPayload
val success = endPoint.dataSend(
publication,
blockBuffer.internalBuffer,
0,
headerSize + varIntSize + sizeOfPayload,
sendIdleStrategy,
connection,
false
publication = publication,
internalBuffer = blockBuffer.internalBuffer,
bufferClaim = kryo.bufferClaim,
offset = 0,
objectSize = reusedPayloadSize,
sendIdleStrategy = sendIdleStrategy,
connection = connection,
abortEarly = false
)
if (!success) {
// something SUPER wrong!
// more critical error sending the message. we shouldn't retry or anything.
@ -461,21 +465,25 @@ internal class StreamingManager<CONNECTION : Connection>(
// on the receiving end without worry.
try {
val varIntSize = OptimizeUtilsByteBuf.intLength(sizeOfPayload, true)
val varIntSize = OptimizeUtilsByteBuf.intLength(amountToSend, true)
val writeIndex = payloadSent - headerSize - varIntSize
// write out our header data (this will OVERWRITE previous data!)
originalBuffer.putBytes(writeIndex, header)
originalObjectBuffer.putBytes(writeIndex, header)
// write out the payload size using optimized data structures.
writeVarInt(originalBuffer, writeIndex + headerSize, sizeOfPayload, true)
writeVarInt(originalObjectBuffer, writeIndex + headerSize, amountToSend, true)
// we reuse/recycle objects, so the payload size is not EXACTLY what is specified
val reusedPayloadSize = headerSize + varIntSize + amountToSend
// write out the payload
endPoint.dataSend(
publication,
originalBuffer,
originalObjectBuffer,
kryo.bufferClaim,
writeIndex,
headerSize + varIntSize + amountToSend,
reusedPayloadSize,
sendIdleStrategy,
connection,
false

View File

@ -40,10 +40,16 @@ class StreamingControlSerializer: Serializer<StreamingControl>() {
class StreamingDataSerializer: Serializer<StreamingData>() {
override fun write(kryo: Kryo, output: Output, data: StreamingData) {
output.writeVarInt(data.streamId, true)
// we re-use this data when streaming data to the remote endpoint, so we don't write out the payload here, we do it in another place
}
override fun read(kryo: Kryo, input: Input, type: Class<out StreamingData>): StreamingData {
val streamId = input.readVarInt(true)
return StreamingData(streamId)
val streamingData = StreamingData(streamId)
// we want to read out the payload. It is not written by the serializer, but by the streaming manager
val payloadSize = input.readVarInt(true)
streamingData.payload = input.readBytes(payloadSize)
return streamingData
}
}