Added better support for streaming large amounts of data (uses temp files if it's too much data)

This commit is contained in:
Robinson 2023-06-29 12:11:17 +02:00
parent 53939470b9
commit 5ec2abfc9b
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
9 changed files with 225 additions and 124 deletions

View File

@ -451,8 +451,10 @@ abstract class Configuration protected constructor() {
/**
* What is the max stream size that can exist in memory when deciding if data chunks are in memory or on temo-file on disk.
* Data is streamed when it is too large to send in a single aeron message
*
* Must be >= 16 and <= 256
*/
var maxStreamSizeInMemoryMB: Int = 5
var maxStreamSizeInMemoryMB: Int = 16
set(value) {
require(!contextDefined) { errorMessage }
field = value
@ -727,6 +729,9 @@ abstract class Configuration protected constructor() {
}
}
require(maxStreamSizeInMemoryMB >= 16) { "configuration maxStreamSizeInMemoryMB must be >= 16" }
require(maxStreamSizeInMemoryMB <= 256) { "configuration maxStreamSizeInMemoryMB must be <= 256" } // 256 is arbitrary
require(networkMtuSize > 0) { "configuration networkMtuSize must be > 0" }
require(networkMtuSize < 9 * 1024) { "configuration networkMtuSize must be < ${9 * 1024}" }

View File

@ -0,0 +1,21 @@
/*
* Copyright 2023 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.connection.streaming
import dorkbox.network.serialization.AeronOutput
class AeronWriter: StreamingWriter, AeronOutput()

View File

@ -0,0 +1,26 @@
/*
* Copyright 2023 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.connection.streaming
import java.io.File
import java.io.FileOutputStream
class FileWriter(file: File) : StreamingWriter, FileOutputStream(file) {
override fun writeBytes(bytes: ByteArray) {
write(bytes)
}
}

View File

@ -1,5 +1,22 @@
/*
* Copyright 2023 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.connection.streaming
data class StreamingControl(val state: StreamingState, val streamId: Int,
val totalSize: Long = 0L,
val isFile: Boolean = false, val fileName: String = ""): StreamingMessage
data class StreamingControl(val state: StreamingState,
val streamId: Int,
val totalSize: Long = 0L
): StreamingMessage

View File

@ -18,31 +18,39 @@
package dorkbox.network.connection.streaming
import com.esotericsoftware.kryo.KryoException
import com.esotericsoftware.kryo.io.Input
import dorkbox.bytes.OptimizeUtilsByteBuf
import dorkbox.collections.LockFreeHashMap
import dorkbox.network.Configuration
import dorkbox.network.connection.Connection
import dorkbox.network.connection.CryptoManagement
import dorkbox.network.connection.EndPoint
import dorkbox.network.connection.ListenerManager.Companion.cleanStackTrace
import dorkbox.network.exceptions.StreamingException
import dorkbox.network.serialization.AeronInput
import dorkbox.network.serialization.AeronOutput
import dorkbox.network.serialization.KryoExtra
import dorkbox.os.OS
import dorkbox.util.Sys
import io.aeron.Publication
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import mu.KLogger
import org.agrona.ExpandableDirectByteBuffer
import org.agrona.MutableDirectBuffer
import org.agrona.concurrent.IdleStrategy
import java.io.FileInputStream
internal class StreamingManager<CONNECTION : Connection>(
private val logger: KLogger,
private val messageDispatch: CoroutineScope
private val logger: KLogger, private val messageDispatch: CoroutineScope, val config: Configuration
) {
private val streamingDataTarget = LockFreeHashMap<Long, StreamingControl>()
private val streamingDataInMemory = LockFreeHashMap<Long, AeronOutput>()
companion object {
private const val KILOBYTE = 1024
private const val MEGABYTE = 1024 * KILOBYTE
private const val GIGABYTE = 1024 * MEGABYTE
private const val TERABYTE = 1024L * GIGABYTE
@Suppress("UNUSED_CHANGED_VALUE")
private fun writeVarInt(internalBuffer: MutableDirectBuffer, position: Int, value: Int, optimizePositive: Boolean): Int {
var p = position
@ -83,7 +91,19 @@ internal class StreamingManager<CONNECTION : Connection>(
}
private val streamingDataTarget = LockFreeHashMap<Long, StreamingControl>()
private val streamingDataInMemory = LockFreeHashMap<Long, StreamingWriter>()
/**
* What is the max stream size that can exist in memory when deciding if data chunks are in memory or on temo-file on disk
*/
private val maxStreamSizeInMemoryInBytes = config.maxStreamSizeInMemoryMB * MEGABYTE
/**
* NOTE: MUST BE ON THE AERON THREAD!
*
* Reassemble/figure out the internal message pieces. Processed always on the same thread
*/
fun processControlMessage(
@ -98,71 +118,93 @@ internal class StreamingManager<CONNECTION : Connection>(
when (message.state) {
StreamingState.START -> {
streamingDataTarget[streamId] = message
// message.totalSize > maxInMemory, then write to a temp file INSTEAD
if (message.totalSize > maxStreamSizeInMemoryInBytes) {
val fileName = "${config.applicationId}_${streamId}_${connection.id}.tmp"
val tempFileLocation = OS.TEMP_DIR.resolve(fileName)
if (!message.isFile) {
streamingDataInMemory[streamId] = AeronOutput()
val prettySize = Sys.getSizePretty(message.totalSize)
endPoint.logger.info { "Saving $prettySize of streaming data [${streamId}] to: $tempFileLocation" }
streamingDataInMemory[streamId] = FileWriter(tempFileLocation)
} else {
// write the file to disk
endPoint.logger.info { "Saving streaming data [${streamId}] in memory" }
streamingDataInMemory[streamId] = AeronWriter()
}
// this must be last
streamingDataTarget[streamId] = message
}
StreamingState.FINISHED -> {
// NOTE: cannot be on a coroutine before kryo usage!
// get the data out and send messages!
if (!message.isFile) {
val output = streamingDataInMemory.remove(streamId)
if (output != null) {
val streamedMessage: Any?
try {
val input = AeronInput(output.internalBuffer)
streamedMessage = kryo.read(input)
} catch (e: Exception) {
if (e is KryoException) {
// YIKES. this isn't good
// print the list of OUR registered message types, emit them (along with the "error" class index)
// send a message to the remote end, that we had an error for class XYZ, and have it emit ITS message types
// endPoint.serialization.logKryoMessages()
//
//
// val failSent = endPoint.writeUnsafe(tempWriteKryo, failMessage, publication, sendIdleStrategy, connection)
// if (!failSent) {
// // something SUPER wrong!
// // more critical error sending the message. we shouldn't retry or anything.
// val errorMessage = "[${publication.sessionId()}] Abnormal failure while streaming content."
//
// // either client or server. No other choices. We create an exception, because it's more useful!
// val exception = endPoint.newException(errorMessage)
//
// // +2 because we do not want to see the stack for the abstract `newException`
// // +3 more because we do not need to see the "internals" for sending messages. The important part of the stack trace is
// // where we see who is calling "send()"
// ListenerManager.cleanStackTrace(exception, 5)
// throw exception
// } else {
// // send it up!
// throw e
// }
val input = when (output) {
is AeronWriter -> {
AeronInput(output.internalBuffer)
}
is FileWriter -> {
output.flush()
output.close()
val fileName = "${config.applicationId}_${streamId}_${connection.id}.tmp"
val tempFileLocation = OS.TEMP_DIR.resolve(fileName)
val fileInputStream = FileInputStream(tempFileLocation)
Input(fileInputStream)
}
else -> {
null
}
}
val streamedMessage = if (input != null) {
try {
kryo.read(input)
} catch (e: Exception) {
// something SUPER wrong!
// more critical error sending the message. we shouldn't retry or anything.
val errorMessage = "Error serializing message from received streaming content, stream $streamId"
val errorMessage = "Error deserializing message from received streaming content, stream $streamId"
// either client or server. No other choices. We create an exception, because it's more useful!
val exception = endPoint.newException(errorMessage, e)
// +2 because we do not want to see the stack for the abstract `newException`
// +3 more because we do not need to see the "internals" for sending messages. The important part of the stack trace is
exception.cleanStackTrace(2)
throw exception
} finally {
if (output is FileWriter) {
val fileName = "${config.applicationId}_${streamId}_${connection.id}.tmp"
val tempFileLocation = OS.TEMP_DIR.resolve(fileName)
tempFileLocation.delete()
}
}
} else {
null
}
if (streamedMessage == null) {
if (output is FileWriter) {
val fileName = "${config.applicationId}_${streamId}_${connection.id}.tmp"
val tempFileLocation = OS.TEMP_DIR.resolve(fileName)
tempFileLocation.delete()
}
// something SUPER wrong!
// more critical error sending the message. we shouldn't retry or anything.
val errorMessage = "Error while processing streaming content, stream $streamId was null."
// either client or server. No other choices. We create an exception, because it's more useful!
val exception = endPoint.newException(errorMessage)
// +2 because we do not want to see the stack for the abstract `newException`
// where we see who is calling "send()"
exception.cleanStackTrace(2)
throw exception
}
if (streamedMessage != null) {
// NOTE: This MUST be on a new co-routine
messageDispatch.launch {
val listenerManager = endPoint.listenerManager
@ -180,44 +222,16 @@ internal class StreamingManager<CONNECTION : Connection>(
val newException = StreamingException("Error processing message ${streamedMessage::class.java.name}", e)
listenerManager.notifyError(connection, newException)
}
}
} else {
// something SUPER wrong!
// more critical error sending the message. we shouldn't retry or anything.
val errorMessage = "Error while processing streaming content, stream $streamId was null."
// either client or server. No other choices. We create an exception, because it's more useful!
val exception = endPoint.newException(errorMessage)
// +2 because we do not want to see the stack for the abstract `newException`
// +3 more because we do not need to see the "internals" for sending messages. The important part of the stack trace is
// where we see who is calling "send()"
exception.cleanStackTrace(2)
throw exception
}
} else {
// something SUPER wrong!
// more critical error sending the message. we shouldn't retry or anything.
val errorMessage = "Error while receiving streaming content, stream $streamId not available."
// either client or server. No other choices. We create an exception, because it's more useful!
val exception = endPoint.newException(errorMessage)
// +2 because we do not want to see the stack for the abstract `newException`
// +3 more because we do not need to see the "internals" for sending messages. The important part of the stack trace is
// where we see who is calling "send()"
exception.cleanStackTrace(2)
throw exception
}
} else {
// we are a file, so process accordingly
println("processing file")
// we should save it WHERE exactly?
}
}
StreamingState.FAILED -> {
val output = streamingDataInMemory.remove(streamId)
if (output is FileWriter) {
val fileName = "${config.applicationId}_${streamId}_${connection.id}.tmp"
val tempFileLocation = OS.TEMP_DIR.resolve(fileName)
tempFileLocation.delete()
}
// clear all state
// something SUPER wrong!
// more critical error sending the message. we shouldn't retry or anything.
@ -233,6 +247,13 @@ internal class StreamingManager<CONNECTION : Connection>(
throw exception
}
StreamingState.UNKNOWN -> {
val output = streamingDataInMemory.remove(streamId)
if (output is FileWriter) {
val fileName = "${config.applicationId}_${streamId}_${connection.id}.tmp"
val tempFileLocation = OS.TEMP_DIR.resolve(fileName)
tempFileLocation.delete()
}
// something SUPER wrong!
// more critical error sending the message. we shouldn't retry or anything.
val errorMessage = "Unknown failure while receiving streaming content for stream $streamId"
@ -263,9 +284,7 @@ internal class StreamingManager<CONNECTION : Connection>(
val controlMessage = streamingDataTarget[streamId]
if (controlMessage != null) {
synchronized(streamingDataInMemory) {
streamingDataInMemory.getOrPut(streamId) { AeronOutput() }.writeBytes(message.payload!!)
}
streamingDataInMemory[streamId]!!.writeBytes(message.payload!!)
} else {
// something SUPER wrong!
// more critical error sending the message. we shouldn't retry or anything.

View File

@ -1,5 +1,5 @@
/*
* Copyright 2020 dorkbox, llc
* Copyright 2023 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -25,10 +25,6 @@ class StreamingControlSerializer: Serializer<StreamingControl>() {
output.writeByte(data.state.ordinal)
output.writeVarInt(data.streamId, true)
output.writeVarLong(data.totalSize, true)
output.writeBoolean(data.isFile)
if (data.isFile) {
output.writeString(data.fileName)
}
}
override fun read(kryo: Kryo, input: Input, type: Class<out StreamingControl>): StreamingControl {
@ -36,14 +32,8 @@ class StreamingControlSerializer: Serializer<StreamingControl>() {
val state = StreamingState.values().first { it.ordinal == stateOrdinal }
val streamId = input.readVarInt(true)
val totalSize = input.readVarLong(true)
val isFile = input.readBoolean()
val fileName = if (isFile) {
input.readString()
} else {
""
}
return StreamingControl(state, streamId, totalSize, isFile, fileName)
return StreamingControl(state, streamId, totalSize)
}
}

View File

@ -0,0 +1,21 @@
/*
* Copyright 2023 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.connection.streaming
interface StreamingWriter {
fun writeBytes(bytes: ByteArray)
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2020 dorkbox, llc
* Copyright 2023 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -12,7 +12,8 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/*
* Copyright (c) 2008, Nathan Sweet
* All rights reserved.
*
@ -56,7 +57,7 @@ import java.io.InputStream
*
* Modified from KRYO ByteBufferInput to use ByteBuf instead of ByteBuffer.
*/
class AeronInput
open class AeronInput
/** Creates an uninitialized Input, [.setBuffer] must be called before the Input is used. */
() : Input() {

View File

@ -30,7 +30,8 @@ class StreamingTest : BaseTest() {
fun sendStreamingObject() {
// TODO: streaming data is NOT saved to temp files, it is in memory. every 16 megs should be flushed to disk (this is arbitrary and should be a config setting). if this number is too
// high, we will run out of memory
val sizeToTest = ExpandableDirectByteBuffer.MAX_BUFFER_LENGTH / 16
// ExpandableDirectByteBuffer.MAX_BUFFER_LENGTH = 1073741824
val sizeToTest = ExpandableDirectByteBuffer.MAX_BUFFER_LENGTH / 32
val hugeData = ByteArray(sizeToTest)
SecureRandom().nextBytes(hugeData)