Streaming data now supports random placement
This commit is contained in:
parent
cbb5038eb6
commit
9e20a20bbb
|
@ -17,5 +17,18 @@
|
|||
package dorkbox.network.connection.streaming
|
||||
|
||||
import dorkbox.network.serialization.AeronOutput
|
||||
import kotlinx.atomicfu.atomic
|
||||
|
||||
class AeronWriter(size: Int): StreamingWriter, AeronOutput(size)
|
||||
class AeronWriter(val size: Int): StreamingWriter, AeronOutput(size) {
|
||||
private val written = atomic(0)
|
||||
|
||||
override fun writeBytes(startPosition: Int, bytes: ByteArray) {
|
||||
position = startPosition
|
||||
writeBytes(bytes)
|
||||
written.getAndAdd(bytes.size)
|
||||
}
|
||||
|
||||
override fun isFinished(): Boolean {
|
||||
return written.value == size
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,11 +16,47 @@
|
|||
|
||||
package dorkbox.network.connection.streaming
|
||||
|
||||
import kotlinx.atomicfu.atomic
|
||||
import java.io.File
|
||||
import java.io.FileOutputStream
|
||||
import java.io.RandomAccessFile
|
||||
|
||||
class FileWriter(val file: File) : StreamingWriter, FileOutputStream(file) {
|
||||
override fun writeBytes(bytes: ByteArray) {
|
||||
class FileWriter(val size: Int, val file: File) : StreamingWriter, RandomAccessFile(file, "rw") {
|
||||
|
||||
private val written = atomic(0)
|
||||
|
||||
init {
|
||||
// reserve space on disk!
|
||||
val saveSize = size.coerceAtMost(4096)
|
||||
var bytes = ByteArray(saveSize)
|
||||
this.write(bytes)
|
||||
|
||||
if (saveSize < size) {
|
||||
var remainingBytes = size - saveSize
|
||||
|
||||
while (remainingBytes > 0) {
|
||||
if (saveSize > remainingBytes) {
|
||||
bytes = ByteArray(remainingBytes)
|
||||
}
|
||||
this.write(bytes)
|
||||
remainingBytes = (remainingBytes - saveSize).coerceAtLeast(0)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun writeBytes(startPosition: Int, bytes: ByteArray) {
|
||||
// the OS will synchronize writes to disk
|
||||
this.seek(startPosition.toLong())
|
||||
write(bytes)
|
||||
written.addAndGet(bytes.size)
|
||||
}
|
||||
|
||||
override fun isFinished(): Boolean {
|
||||
return written.value == size
|
||||
}
|
||||
|
||||
fun finishAndClose() {
|
||||
fd.sync()
|
||||
close()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ import dorkbox.bytes.xxHash32
|
|||
class StreamingData(val streamId: Int) : StreamingMessage {
|
||||
|
||||
var payload: ByteArray? = null
|
||||
var startPosition: Int = 0
|
||||
|
||||
override fun equals(other: Any?): Boolean {
|
||||
if (this === other) return true
|
||||
|
@ -34,16 +35,19 @@ class StreamingData(val streamId: Int) : StreamingMessage {
|
|||
if (!payload.contentEquals(other.payload)) return false
|
||||
} else if (other.payload != null) return false
|
||||
|
||||
if (startPosition != other.startPosition) return false
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
override fun hashCode(): Int {
|
||||
var result = streamId.hashCode()
|
||||
result = 31 * result + (payload?.contentHashCode() ?: 0)
|
||||
result = 31 * result + (startPosition)
|
||||
return result
|
||||
}
|
||||
|
||||
override fun toString(): String {
|
||||
return "StreamingData(streamId=$streamId xxHash=${payload?.xxHash32()})"
|
||||
return "StreamingData(streamId=$streamId position=${startPosition}, xxHash=${payload?.xxHash32()})"
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,7 +21,7 @@ package dorkbox.network.connection.streaming
|
|||
import com.esotericsoftware.kryo.io.Input
|
||||
import dorkbox.bytes.OptimizeUtilsByteArray
|
||||
import dorkbox.bytes.OptimizeUtilsByteBuf
|
||||
import dorkbox.collections.LockFreeHashMap
|
||||
import dorkbox.collections.LockFreeLongMap
|
||||
import dorkbox.network.Configuration
|
||||
import dorkbox.network.connection.Connection
|
||||
import dorkbox.network.connection.CryptoManagement
|
||||
|
@ -90,8 +90,8 @@ internal class StreamingManager<CONNECTION : Connection>(private val logger: KLo
|
|||
}
|
||||
|
||||
|
||||
private val streamingDataTarget = LockFreeHashMap<Long, StreamingControl>()
|
||||
private val streamingDataInMemory = LockFreeHashMap<Long, StreamingWriter>()
|
||||
private val streamingDataTarget = LockFreeLongMap<StreamingControl>()
|
||||
private val streamingDataInMemory = LockFreeLongMap<StreamingWriter>()
|
||||
|
||||
|
||||
/**
|
||||
|
@ -148,7 +148,7 @@ internal class StreamingManager<CONNECTION : Connection>(private val logger: KLo
|
|||
val prettySize = Sys.getSizePretty(message.totalSize)
|
||||
|
||||
endPoint.logger.info { "Saving $prettySize of streaming data [${streamId}] to: $tempFileLocation" }
|
||||
streamingDataInMemory[streamId] = FileWriter(tempFileLocation)
|
||||
streamingDataInMemory[streamId] = FileWriter(message.totalSize.toInt(), tempFileLocation)
|
||||
} else {
|
||||
endPoint.logger.info { "Saving streaming data [${streamId}] in memory" }
|
||||
// .toInt is safe because we know the total size is < than maxStreamSizeInMemoryInBytes
|
||||
|
@ -167,9 +167,8 @@ internal class StreamingManager<CONNECTION : Connection>(private val logger: KLo
|
|||
val output = streamingDataInMemory[streamId]
|
||||
|
||||
if (output is FileWriter) {
|
||||
output.flush()
|
||||
output.close()
|
||||
|
||||
output.finishAndClose()
|
||||
// we don't need to do anything else (no de-serialization into an object) because we are already our target object
|
||||
return
|
||||
} else {
|
||||
// something SUPER wrong!
|
||||
|
@ -186,11 +185,13 @@ internal class StreamingManager<CONNECTION : Connection>(private val logger: KLo
|
|||
|
||||
val input = when (output) {
|
||||
is AeronWriter -> {
|
||||
// the position can be wrong, especially if there are multiple threads setting the data
|
||||
output.setPosition(output.size)
|
||||
AeronInput(output.internalBuffer)
|
||||
}
|
||||
is FileWriter -> {
|
||||
output.flush()
|
||||
output.close()
|
||||
// if we are too large to fit in memory while streaming, we store it on disk.
|
||||
output.finishAndClose()
|
||||
|
||||
val fileInputStream = FileInputStream(output.file)
|
||||
Input(fileInputStream)
|
||||
|
@ -299,11 +300,11 @@ internal class StreamingManager<CONNECTION : Connection>(private val logger: KLo
|
|||
}
|
||||
|
||||
/**
|
||||
* NOTE: MUST BE ON THE AERON THREAD!
|
||||
* NOTE: MUST BE ON THE AERON THREAD BECAUSE THIS MUST BE SINGLE THREADED!!!
|
||||
*
|
||||
* Reassemble/figure out the internal message pieces
|
||||
*
|
||||
* NOTE sending a huge file can prevent other other network traffic from arriving until it's done!
|
||||
* NOTE sending a huge file can cause other network traffic delays!
|
||||
*/
|
||||
fun processDataMessage(message: StreamingData, endPoint: EndPoint<CONNECTION>, connection: CONNECTION) {
|
||||
// the receiving data will ALWAYS come sequentially, but there might be OTHER streaming data received meanwhile.
|
||||
|
@ -312,7 +313,7 @@ internal class StreamingManager<CONNECTION : Connection>(private val logger: KLo
|
|||
|
||||
val dataWriter = streamingDataInMemory[streamId]
|
||||
if (dataWriter != null) {
|
||||
dataWriter.writeBytes(message.payload!!)
|
||||
dataWriter.writeBytes(message.startPosition, message.payload!!)
|
||||
} else {
|
||||
// something SUPER wrong!
|
||||
// more critical error sending the message. we shouldn't retry or anything.
|
||||
|
@ -424,11 +425,12 @@ internal class StreamingManager<CONNECTION : Connection>(private val logger: KLo
|
|||
headerSize = objectBuffer.position()
|
||||
header = ByteArray(headerSize)
|
||||
|
||||
// we have to account for the header + the MAX optimized int size
|
||||
sizeOfBlockData -= (headerSize + 5)
|
||||
// we have to account for the header + the MAX optimized int size (position and data-length)
|
||||
val dataSize = headerSize + 5 + 5
|
||||
sizeOfBlockData -= dataSize
|
||||
|
||||
// this size might be a LITTLE too big, but that's ok, since we only make this specific buffer once.
|
||||
val blockBuffer = AeronOutput(headerSize + sizeOfBlockData)
|
||||
val blockBuffer = AeronOutput(dataSize)
|
||||
|
||||
// copy out our header info
|
||||
objectBuffer.internalBuffer.getBytes(0, header, 0, headerSize)
|
||||
|
@ -436,17 +438,20 @@ internal class StreamingManager<CONNECTION : Connection>(private val logger: KLo
|
|||
// write out our header
|
||||
blockBuffer.writeBytes(header)
|
||||
|
||||
// write out the payload size using optimized data structures.
|
||||
val varIntSize = blockBuffer.writeVarInt(sizeOfBlockData, true)
|
||||
// write out the start-position (of the payload). First start-position is always 0
|
||||
val positionIntSize = blockBuffer.writeVarInt(0, true)
|
||||
|
||||
// write out the payload size
|
||||
val payloadIntSize = blockBuffer.writeVarInt(sizeOfBlockData, true)
|
||||
|
||||
// write out the payload. Our resulting data written out is the ACTUAL MTU of aeron.
|
||||
originalBuffer.getBytes(0, blockBuffer.internalBuffer, headerSize + varIntSize, sizeOfBlockData)
|
||||
originalBuffer.getBytes(0, blockBuffer.internalBuffer, headerSize + positionIntSize + payloadIntSize, sizeOfBlockData)
|
||||
|
||||
remainingPayload -= sizeOfBlockData
|
||||
payloadSent += sizeOfBlockData
|
||||
|
||||
// we reuse/recycle objects, so the payload size is not EXACTLY what is specified
|
||||
val reusedPayloadSize = headerSize + varIntSize + sizeOfBlockData
|
||||
val reusedPayloadSize = headerSize + positionIntSize + payloadIntSize + sizeOfBlockData
|
||||
|
||||
val success = endPoint.aeronDriver.send(
|
||||
publication = publication,
|
||||
|
@ -499,17 +504,21 @@ internal class StreamingManager<CONNECTION : Connection>(private val logger: KLo
|
|||
/// TODO: Compression/encryption??
|
||||
|
||||
try {
|
||||
val varIntSize = OptimizeUtilsByteBuf.intLength(amountToSend, true)
|
||||
val writeIndex = payloadSent - headerSize - varIntSize
|
||||
val positionIntSize = OptimizeUtilsByteBuf.intLength(payloadSent, true)
|
||||
val payloadIntSize = OptimizeUtilsByteBuf.intLength(amountToSend, true)
|
||||
val writeIndex = payloadSent - headerSize - positionIntSize - payloadIntSize
|
||||
|
||||
// write out our header data (this will OVERWRITE previous data!)
|
||||
originalBuffer.putBytes(writeIndex, header)
|
||||
|
||||
// write out the payload size using optimized data structures.
|
||||
writeVarInt(originalBuffer, writeIndex + headerSize, amountToSend, true)
|
||||
// write out the payload start position
|
||||
writeVarInt(originalBuffer, writeIndex + headerSize, payloadSent, true)
|
||||
|
||||
// write out the payload size
|
||||
writeVarInt(originalBuffer, writeIndex + headerSize + positionIntSize, amountToSend, true)
|
||||
|
||||
// we reuse/recycle objects, so the payload size is not EXACTLY what is specified
|
||||
val reusedPayloadSize = headerSize + varIntSize + amountToSend
|
||||
val reusedPayloadSize = headerSize + payloadIntSize + positionIntSize + amountToSend
|
||||
|
||||
// write out the payload
|
||||
val success = endPoint.aeronDriver.send(
|
||||
|
@ -610,7 +619,7 @@ internal class StreamingManager<CONNECTION : Connection>(private val logger: KLo
|
|||
|
||||
|
||||
|
||||
// we do the FIRST block super-weird, because of the way we copy data around (we inject headers,
|
||||
// we do the FIRST block super-weird, because of the way we copy data around (we inject headers),
|
||||
// so the first message is SUPER tiny and is a COPY, the rest are no-copy.
|
||||
|
||||
// payload size is for a PRODUCER, and not SUBSCRIBER, so we have to include this amount every time.
|
||||
|
@ -620,8 +629,8 @@ internal class StreamingManager<CONNECTION : Connection>(private val logger: KLo
|
|||
|
||||
val headerSize: Int
|
||||
|
||||
val buffer = ByteArray(sizeOfBlockData)
|
||||
val blockBuffer = UnsafeBuffer(buffer)
|
||||
val buffer: ByteArray
|
||||
val blockBuffer: UnsafeBuffer
|
||||
|
||||
try {
|
||||
// This is REUSED to prevent garbage collection issues.
|
||||
|
@ -629,17 +638,25 @@ internal class StreamingManager<CONNECTION : Connection>(private val logger: KLo
|
|||
val objectBuffer = kryo.write(connection, blockData)
|
||||
headerSize = objectBuffer.position()
|
||||
|
||||
// we have to account for the header + the MAX optimized int size
|
||||
sizeOfBlockData -= (headerSize + 5)
|
||||
// we have to account for the header + the MAX optimized int size (position and data-length)
|
||||
val dataSize = headerSize + 5 + 5
|
||||
sizeOfBlockData -= dataSize
|
||||
|
||||
// copy out our header info
|
||||
// this size might be a LITTLE too big, but that's ok, since we only make this specific buffer once.
|
||||
buffer = ByteArray(sizeOfBlockData + dataSize)
|
||||
blockBuffer = UnsafeBuffer(buffer)
|
||||
|
||||
// copy out our header info (this skips the header object)
|
||||
objectBuffer.internalBuffer.getBytes(0, buffer, 0, headerSize)
|
||||
|
||||
// write out the payload size using optimized data structures.
|
||||
val varIntSize = OptimizeUtilsByteArray.writeInt(buffer, sizeOfBlockData, true, headerSize)
|
||||
// write out the start-position (of the payload). First start-position is always 0
|
||||
val positionIntSize = OptimizeUtilsByteArray.writeInt(buffer, 0, true, headerSize)
|
||||
|
||||
// write out the payload size
|
||||
val payloadIntSize = OptimizeUtilsByteArray.writeInt(buffer, sizeOfBlockData, true, headerSize + positionIntSize)
|
||||
|
||||
// write out the payload. Our resulting data written out is the ACTUAL MTU of aeron.
|
||||
val readBytes = fileInputStream.read(buffer, headerSize + varIntSize, sizeOfBlockData)
|
||||
val readBytes = fileInputStream.read(buffer, headerSize + positionIntSize + payloadIntSize, sizeOfBlockData)
|
||||
if (readBytes != sizeOfBlockData) {
|
||||
// something SUPER wrong!
|
||||
// more critical error sending the message. we shouldn't retry or anything.
|
||||
|
@ -658,7 +675,7 @@ internal class StreamingManager<CONNECTION : Connection>(private val logger: KLo
|
|||
payloadSent += sizeOfBlockData
|
||||
|
||||
// we reuse/recycle objects, so the payload size is not EXACTLY what is specified
|
||||
val reusedPayloadSize = headerSize + varIntSize + sizeOfBlockData
|
||||
val reusedPayloadSize = headerSize + positionIntSize + payloadIntSize + sizeOfBlockData
|
||||
|
||||
val success = endPoint.aeronDriver.send(
|
||||
publication = publication,
|
||||
|
@ -717,11 +734,13 @@ internal class StreamingManager<CONNECTION : Connection>(private val logger: KLo
|
|||
/// TODO: Compression/encryption??
|
||||
|
||||
try {
|
||||
// write out the payload size using optimized data structures.
|
||||
val varIntSize = OptimizeUtilsByteArray.writeInt(buffer, amountToSend, true, headerSize)
|
||||
// write out the payload start position
|
||||
val positionIntSize = OptimizeUtilsByteArray.writeInt(buffer, payloadSent, true, headerSize)
|
||||
// write out the payload size
|
||||
val payloadIntSize = OptimizeUtilsByteArray.writeInt(buffer, amountToSend, true, headerSize + positionIntSize)
|
||||
|
||||
// write out the payload. Our resulting data written out is the ACTUAL MTU of aeron.
|
||||
val readBytes = fileInputStream.read(buffer, headerSize + varIntSize, amountToSend)
|
||||
val readBytes = fileInputStream.read(buffer, headerSize + positionIntSize + payloadIntSize, amountToSend)
|
||||
if (readBytes != amountToSend) {
|
||||
// something SUPER wrong!
|
||||
// more critical error sending the message. we shouldn't retry or anything.
|
||||
|
@ -737,14 +756,14 @@ internal class StreamingManager<CONNECTION : Connection>(private val logger: KLo
|
|||
}
|
||||
|
||||
// we reuse/recycle objects, so the payload size is not EXACTLY what is specified
|
||||
val reusedPayloadSize = headerSize + varIntSize + amountToSend
|
||||
val reusedPayloadSize = headerSize + positionIntSize + payloadIntSize + amountToSend
|
||||
|
||||
// write out the payload
|
||||
aeronDriver.send(
|
||||
publication = publication,
|
||||
internalBuffer = blockBuffer,
|
||||
bufferClaim = kryo.bufferClaim,
|
||||
offset = 0,
|
||||
offset = 0, // 0 because we are not reading the entire file at once
|
||||
objectSize = reusedPayloadSize,
|
||||
sendIdleStrategy = sendIdleStrategy,
|
||||
connection = connection,
|
||||
|
|
|
@ -31,7 +31,7 @@ class StreamingControlSerializer: Serializer<StreamingControl>() {
|
|||
override fun read(kryo: Kryo, input: Input, type: Class<out StreamingControl>): StreamingControl {
|
||||
val stateOrdinal = input.readByte().toInt()
|
||||
val isFile = input.readBoolean()
|
||||
val state = StreamingState.values().first { it.ordinal == stateOrdinal }
|
||||
val state = StreamingState.entries.first { it.ordinal == stateOrdinal }
|
||||
val streamId = input.readVarInt(true)
|
||||
val totalSize = input.readVarLong(true)
|
||||
|
||||
|
@ -49,8 +49,10 @@ class StreamingDataSerializer: Serializer<StreamingData>() {
|
|||
val streamId = input.readVarInt(true)
|
||||
val streamingData = StreamingData(streamId)
|
||||
|
||||
// we want to read out the payload. It is not written by the serializer, but by the streaming manager
|
||||
// we want to read out the start-position AND payload. It is not written by the serializer, but by the streaming manager
|
||||
val startPosition = input.readVarInt(true)
|
||||
val payloadSize = input.readVarInt(true)
|
||||
streamingData.startPosition = startPosition
|
||||
streamingData.payload = input.readBytes(payloadSize)
|
||||
return streamingData
|
||||
}
|
||||
|
|
|
@ -17,5 +17,6 @@
|
|||
package dorkbox.network.connection.streaming
|
||||
|
||||
interface StreamingWriter {
|
||||
fun writeBytes(bytes: ByteArray)
|
||||
fun writeBytes(startPosition: Int, bytes: ByteArray)
|
||||
fun isFinished(): Boolean
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue