Changed wording of chunk -> block
This commit is contained in:
parent
290c5bd768
commit
215ed20056
@ -97,7 +97,7 @@ internal class StreamingManager<CONNECTION : Connection>(
|
|||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* What is the max stream size that can exist in memory when deciding if data chunks are in memory or on temo-file on disk
|
* What is the max stream size that can exist in memory when deciding if data blocks are in memory or temp-file on disk
|
||||||
*/
|
*/
|
||||||
private val maxStreamSizeInMemoryInBytes = config.maxStreamSizeInMemoryMB * MEGABYTE
|
private val maxStreamSizeInMemoryInBytes = config.maxStreamSizeInMemoryMB * MEGABYTE
|
||||||
|
|
||||||
@ -329,8 +329,8 @@ internal class StreamingManager<CONNECTION : Connection>(
|
|||||||
* We don't write max possible length per message, we write out MTU (payload) length (so aeron doesn't fragment the message).
|
* We don't write max possible length per message, we write out MTU (payload) length (so aeron doesn't fragment the message).
|
||||||
* The max possible length is WAY, WAY more than the max payload length.
|
* The max possible length is WAY, WAY more than the max payload length.
|
||||||
*
|
*
|
||||||
* @param internalBuffer this is the ORIGINAL object data that is to be "chunked" and sent across the wire
|
* @param internalBuffer this is the ORIGINAL object data that is to be blocks sent across the wire
|
||||||
* @return true if ALL the message chunks were successfully sent by aeron, false otherwise. Exceptions are caught and rethrown!
|
* @return true if ALL the message blocks were successfully sent by aeron, false otherwise. Exceptions are caught and rethrown!
|
||||||
*/
|
*/
|
||||||
fun send(
|
fun send(
|
||||||
publication: Publication,
|
publication: Publication,
|
||||||
@ -374,11 +374,11 @@ internal class StreamingManager<CONNECTION : Connection>(
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// we do the FIRST chunk super-weird, because of the way we copy data around (we inject headers,
|
// we do the FIRST block super-weird, because of the way we copy data around (we inject headers,
|
||||||
// so the first message is SUPER tiny and is a COPY, the rest are no-copy.
|
// so the first message is SUPER tiny and is a COPY, the rest are no-copy.
|
||||||
|
|
||||||
// This is REUSED to prevent garbage collection issues.
|
// This is REUSED to prevent garbage collection issues.
|
||||||
val chunkData = StreamingData(streamSessionId)
|
val blockData = StreamingData(streamSessionId)
|
||||||
|
|
||||||
// payload size is for a PRODUCER, and not SUBSCRIBER, so we have to include this amount every time.
|
// payload size is for a PRODUCER, and not SUBSCRIBER, so we have to include this amount every time.
|
||||||
// MINOR fragmentation by aeron is OK, since that will greatly speed up data transfer rates!
|
// MINOR fragmentation by aeron is OK, since that will greatly speed up data transfer rates!
|
||||||
@ -389,7 +389,7 @@ internal class StreamingManager<CONNECTION : Connection>(
|
|||||||
val headerSize: Int
|
val headerSize: Int
|
||||||
|
|
||||||
try {
|
try {
|
||||||
val objectBuffer = kryo.write(connection, chunkData)
|
val objectBuffer = kryo.write(connection, blockData)
|
||||||
headerSize = objectBuffer.position()
|
headerSize = objectBuffer.position()
|
||||||
header = ByteArray(headerSize)
|
header = ByteArray(headerSize)
|
||||||
|
|
||||||
@ -397,26 +397,26 @@ internal class StreamingManager<CONNECTION : Connection>(
|
|||||||
sizeOfPayload -= (headerSize + 5)
|
sizeOfPayload -= (headerSize + 5)
|
||||||
|
|
||||||
// this size might be a LITTLE too big, but that's ok, since we only make this specific buffer once.
|
// this size might be a LITTLE too big, but that's ok, since we only make this specific buffer once.
|
||||||
val chunkBuffer = AeronOutput(headerSize + sizeOfPayload)
|
val blockBuffer = AeronOutput(headerSize + sizeOfPayload)
|
||||||
|
|
||||||
// copy out our header info
|
// copy out our header info
|
||||||
objectBuffer.internalBuffer.getBytes(0, header, 0, headerSize)
|
objectBuffer.internalBuffer.getBytes(0, header, 0, headerSize)
|
||||||
|
|
||||||
// write out our header
|
// write out our header
|
||||||
chunkBuffer.writeBytes(header)
|
blockBuffer.writeBytes(header)
|
||||||
|
|
||||||
// write out the payload size using optimized data structures.
|
// write out the payload size using optimized data structures.
|
||||||
val varIntSize = chunkBuffer.writeVarInt(sizeOfPayload, true)
|
val varIntSize = blockBuffer.writeVarInt(sizeOfPayload, true)
|
||||||
|
|
||||||
// write out the payload. Our resulting data written out is the ACTUAL MTU of aeron.
|
// write out the payload. Our resulting data written out is the ACTUAL MTU of aeron.
|
||||||
originalBuffer.getBytes(0, chunkBuffer.internalBuffer, headerSize + varIntSize, sizeOfPayload)
|
originalBuffer.getBytes(0, blockBuffer.internalBuffer, headerSize + varIntSize, sizeOfPayload)
|
||||||
|
|
||||||
remainingPayload -= sizeOfPayload
|
remainingPayload -= sizeOfPayload
|
||||||
payloadSent += sizeOfPayload
|
payloadSent += sizeOfPayload
|
||||||
|
|
||||||
val success = endPoint.dataSend(
|
val success = endPoint.dataSend(
|
||||||
publication,
|
publication,
|
||||||
chunkBuffer.internalBuffer,
|
blockBuffer.internalBuffer,
|
||||||
0,
|
0,
|
||||||
headerSize + varIntSize + sizeOfPayload,
|
headerSize + varIntSize + sizeOfPayload,
|
||||||
sendIdleStrategy,
|
sendIdleStrategy,
|
||||||
@ -441,7 +441,7 @@ internal class StreamingManager<CONNECTION : Connection>(
|
|||||||
return false // doesn't actually get here because exceptions are thrown, but this makes the IDE happy.
|
return false // doesn't actually get here because exceptions are thrown, but this makes the IDE happy.
|
||||||
}
|
}
|
||||||
|
|
||||||
// now send the chunks as fast as possible. Aeron will have us back-off if we send too quickly
|
// now send the block as fast as possible. Aeron will have us back-off if we send too quickly
|
||||||
while (remainingPayload > 0) {
|
while (remainingPayload > 0) {
|
||||||
val amountToSend = if (remainingPayload < sizeOfPayload) {
|
val amountToSend = if (remainingPayload < sizeOfPayload) {
|
||||||
remainingPayload
|
remainingPayload
|
||||||
@ -504,7 +504,7 @@ internal class StreamingManager<CONNECTION : Connection>(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// send the last chunk of data
|
// send the last block of data
|
||||||
val finishedMessage = StreamingControl(StreamingState.FINISHED, streamSessionId, payloadSent.toLong())
|
val finishedMessage = StreamingControl(StreamingState.FINISHED, streamSessionId, payloadSent.toLong())
|
||||||
|
|
||||||
return endPoint.writeUnsafe(finishedMessage, publication, sendIdleStrategy, connection, kryo)
|
return endPoint.writeUnsafe(finishedMessage, publication, sendIdleStrategy, connection, kryo)
|
||||||
|
Loading…
Reference in New Issue
Block a user