diff --git a/src/dorkbox/network/connection/EndPoint.kt b/src/dorkbox/network/connection/EndPoint.kt index 39ca724a..1afbead4 100644 --- a/src/dorkbox/network/connection/EndPoint.kt +++ b/src/dorkbox/network/connection/EndPoint.kt @@ -21,6 +21,9 @@ import dorkbox.network.Server import dorkbox.network.ServerConfiguration import dorkbox.network.aeron.AeronDriver import dorkbox.network.aeron.CoroutineIdleStrategy +import dorkbox.network.connection.streaming.StreamingControl +import dorkbox.network.connection.streaming.StreamingData +import dorkbox.network.connection.streaming.StreamingManager import dorkbox.network.coroutines.SuspendWaiter import dorkbox.network.exceptions.ClientException import dorkbox.network.exceptions.ServerException @@ -48,6 +51,7 @@ import kotlinx.coroutines.runBlocking import mu.KLogger import mu.KotlinLogging import org.agrona.DirectBuffer +import org.agrona.MutableDirectBuffer import org.agrona.concurrent.IdleStrategy fun CoroutineScope.eventLoop(block: suspend CoroutineScope.() -> Unit): Job { @@ -152,6 +156,8 @@ internal constructor(val type: Class<*>, internal val rmiGlobalSupport = RmiManagerGlobal(logger) internal val rmiConnectionSupport: RmiManagerConnections + internal val streamingManager = StreamingManager(logger, actionDispatch) + internal val pingManager = PingManager() init { @@ -451,14 +457,86 @@ internal constructor(val type: Class<*>, val message = serialization.readMessage(buffer, offset, length, connection) logger.trace { "[${header.sessionId()}] received: $message" } + // the REPEATED usage of wrapping methods below is because Streaming messages have to intercept date BEFORE it goes to a coroutine - // NOTE: This MUST be on a new co-routine - actionDispatch.launch { - try { - processMessage(message, connection) - } catch (e: Exception) { - logger.error("Error processing message", e) - listenerManager.notifyError(connection, e) + when (message) { + is Ping -> { + // NOTE: This MUST be on a new co-routine + actionDispatch.launch { + try { + pingManager.manage(connection, responseManager, message, logger) + } catch (e: Exception) { + logger.error("Error processing message", e) + listenerManager.notifyError(connection, e) + } + } + } + + // small problem... If we expect IN ORDER messages (ie: setting a value, then later reading the value), multiple threads don't work. + // this is worked around by having RMI always return (unless async), even with a null value, so the CALLING side of RMI will always + // go in "lock step" + is RmiMessage -> { + // if we are an RMI message/registration, we have very specific, defined behavior. + // We do not use the "normal" listener callback pattern because this requires special functionality + // NOTE: This MUST be on a new co-routine + actionDispatch.launch { + try { + rmiGlobalSupport.processMessage(serialization, connection, message, rmiConnectionSupport, responseManager, logger) + } catch (e: Exception) { + logger.error("Error processing message", e) + listenerManager.notifyError(connection, e) + } + } + } + + // streaming/chunked message. This is used when the published data is too large for a single Aeron message. + // TECHNICALLY, we could arbitrarily increase the size of the permitted Aeron message, however this doesn't let us + // send arbitrarily large pieces of data (gigs in size, potentially). + // This will recursively call into this method for each of the unwrapped chunks of data. + is StreamingControl -> { + streamingManager.processControlMessage(message, this@EndPoint, connection) + } + is StreamingData -> { + // NOTE: this will read extra data from the kryo input as necessary (which is why it's not on action dispatch)! + val rawInput = serialization.readRaw() + val dataLength = rawInput.readVarInt(true) + message.payload = rawInput.readBytes(dataLength) + + + // NOTE: This MUST be on a new co-routine + actionDispatch.launch { + try { + streamingManager.processDataMessage(message, this@EndPoint, connection) + } catch (e: Exception) { + logger.error("Error processing StreamingMessage", e) + listenerManager.notifyError(connection, e) + } + } + } + + + is Any -> { + // NOTE: This MUST be on a new co-routine + actionDispatch.launch { + try { + @Suppress("UNCHECKED_CAST") + var hasListeners = listenerManager.notifyOnMessage(connection, message) + + // each connection registers, and is polled INDEPENDENTLY for messages. + hasListeners = hasListeners or connection.notifyOnMessage(message) + + if (!hasListeners) { + logger.error("No message callbacks found for ${message::class.java.name}") + } + } catch (e: Exception) { + logger.error("Error processing message ${message::class.java.name}", e) + listenerManager.notifyError(connection, e) + } + } + } + + else -> { + logger.error("Unknown message received!!") } } } catch (e: Exception) { @@ -468,48 +546,6 @@ internal constructor(val type: Class<*>, } } - /** - * Actually process the message. - */ - private suspend fun processMessage(message: Any?, connection: CONNECTION) { - when (message) { - is Ping -> { - pingManager.manage(connection, responseManager, message, logger) - } - - // small problem... If we expect IN ORDER messages (ie: setting a value, then later reading the value), multiple threads don't work. - // this is worked around by having RMI always return (unless async), even with a null value, so the CALLING side of RMI will always - // go in "lock step" - is RmiMessage -> { - // if we are an RMI message/registration, we have very specific, defined behavior. - // We do not use the "normal" listener callback pattern because this requires special functionality - rmiGlobalSupport.manage(serialization, connection, message, rmiConnectionSupport, responseManager, logger) - } - - is Any -> { - try { - @Suppress("UNCHECKED_CAST") - var hasListeners = listenerManager.notifyOnMessage(connection, message) - - // each connection registers, and is polled INDEPENDENTLY for messages. - hasListeners = hasListeners or connection.notifyOnMessage(message) - - if (!hasListeners) { - logger.error("No message callbacks found for ${message::class.java.simpleName}") - } - } catch (e: Exception) { - logger.error("Error processing message", e) - listenerManager.notifyError(connection, e) - } - } - - else -> { - logger.error("Unknown message received!!") - } - } - } - - /** * NOTE: this **MUST** stay on the same co-routine that calls "send". This cannot be re-dispatched onto a different coroutine! * @@ -531,62 +567,28 @@ internal constructor(val type: Class<*>, val objectSize = buffer.position() val internalBuffer = buffer.internalBuffer - var result: Long - while (true) { - result = publication.offer(internalBuffer, 0, objectSize) - if (result >= 0) { - // success! - return true - } - /** - * The publication is not connected to a subscriber, this can be an intermittent state as subscribers come and go. - * val NOT_CONNECTED: Long = -1 - * - * The offer failed due to back pressure from the subscribers preventing further transmission. - * val BACK_PRESSURED: Long = -2 - * - * The offer failed due to an administration action and should be retried. - * The action is an operation such as log rotation which is likely to have succeeded by the next retry attempt. - * val ADMIN_ACTION: Long = -3 - */ - if (result >= Publication.ADMIN_ACTION) { - // we should retry, BUT we want suspend ANYONE ELSE trying to write at the same time! - sendIdleStrategy.idle() - continue - } - - - if (result == Publication.CLOSED && connection.isClosedViaAeron()) { - // this can happen when we use RMI to close a connection. RMI will (in most cases) ALWAYS send a response when it's - // done executing. If the connection is *closed* first (because an RMI method closed it), then we will not be able to - // send the message. - // NOTE: we already know the connection is closed. we closed it (so it doesn't make sense to emit an error about this) - return false - } - - // more critical error sending the message. we shouldn't retry or anything. - val errorMessage = "[${publication.sessionId()}] Error sending message. $message (${errorCodeName(result)})" - - // either client or server. No other choices. We create an exception, because it's more useful! - val exception = newException(errorMessage) - - // +2 because we do not want to see the stack for the abstract `newException` - // +2 more because we do not need to see the "internals" for sending messages. The important part of the stack trace is - // where we see who is calling "send()" - ListenerManager.cleanStackTrace(exception, 4) - - logger.error("Aeron error!", exception) - listenerManager.notifyError(connection, exception) - return false + // one small problem! What if the message is too big to send all at once? + val maxMessageLength = publication.maxMessageLength() + if (objectSize >= maxMessageLength) { + // we must split up the message! It's too large for Aeron to manage. + // this will split up the message, construct the necessary control message and state, then CALL the sendData + // method directly for each subsequent message. + return streamingManager.send(publication, internalBuffer, + objectSize, this, connection) } + + return sendData(publication, internalBuffer, 0, objectSize, connection) } catch (e: Exception) { if (message is MethodResponse && message.result is Exception) { val result = message.result as Exception - logger.error("[${publication.sessionId()}] Error serializing message $message", result) + logger.error("[${publication.sessionId()}] Error serializing message '$message'", result) listenerManager.notifyError(connection, result) + } else if (message is ClientException || message is ServerException) { + logger.error("[${publication.sessionId()}] Error for message '$message'", e) + listenerManager.notifyError(connection, e) } else { - logger.error("[${publication.sessionId()}] Error serializing message $message", e) + logger.error("[${publication.sessionId()}] Error serializing message '$message'", e) listenerManager.notifyError(connection, e) } } finally { @@ -597,6 +599,56 @@ internal constructor(val type: Class<*>, return false } + // the actual bits that send data on the network. + internal suspend fun sendData(publication: Publication, internalBuffer: MutableDirectBuffer, offset: Int, objectSize: Int, connection: CONNECTION): Boolean { + var result: Long + while (true) { + result = publication.offer(internalBuffer, offset, objectSize) + if (result >= 0) { + // success! + return true + } + + /** + * The publication is not connected to a subscriber, this can be an intermittent state as subscribers come and go. + * val NOT_CONNECTED: Long = -1 + * + * The offer failed due to back pressure from the subscribers preventing further transmission. + * val BACK_PRESSURED: Long = -2 + * + * The offer failed due to an administration action and should be retried. + * The action is an operation such as log rotation which is likely to have succeeded by the next retry attempt. + * val ADMIN_ACTION: Long = -3 + */ + if (result >= Publication.ADMIN_ACTION) { + // we should retry, BUT we want to suspend ANYONE ELSE trying to write at the same time! + sendIdleStrategy.idle() + continue + } + + + if (result == Publication.CLOSED && connection.isClosedViaAeron()) { + // this can happen when we use RMI to close a connection. RMI will (in most cases) ALWAYS send a response when it's + // done executing. If the connection is *closed* first (because an RMI method closed it), then we will not be able to + // send the message. + // NOTE: we already know the connection is closed. we closed it (so it doesn't make sense to emit an error about this) + return false + } + + // more critical error sending the message. we shouldn't retry or anything. + val errorMessage = "[${publication.sessionId()}] Error sending message. (${errorCodeName(result)})" + + // either client or server. No other choices. We create an exception, because it's more useful! + val exception = newException(errorMessage) + + // +2 because we do not want to see the stack for the abstract `newException` + // +3 more because we do not need to see the "internals" for sending messages. The important part of the stack trace is + // where we see who is calling "send()" + ListenerManager.cleanStackTrace(exception, 5) + return false + } + } + override fun toString(): String { return "EndPoint [${type.simpleName}]" } diff --git a/src/dorkbox/network/connection/streaming/StreamingControl.kt b/src/dorkbox/network/connection/streaming/StreamingControl.kt new file mode 100644 index 00000000..1b9d653b --- /dev/null +++ b/src/dorkbox/network/connection/streaming/StreamingControl.kt @@ -0,0 +1,5 @@ +package dorkbox.network.connection.streaming + +data class StreamingControl(val state: StreamingState, val streamId: Long, + val totalSize: Long = 0L, + val isFile: Boolean = false, val fileName: String = ""): StreamingMessage diff --git a/src/dorkbox/network/connection/streaming/StreamingData.kt b/src/dorkbox/network/connection/streaming/StreamingData.kt new file mode 100644 index 00000000..bd0e67f2 --- /dev/null +++ b/src/dorkbox/network/connection/streaming/StreamingData.kt @@ -0,0 +1,32 @@ +package dorkbox.network.connection.streaming + +class StreamingData(var streamId: Long) : StreamingMessage { + + // These are set just after we receive the message, and before we process it + @Transient var payload: ByteArray? = null + + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (javaClass != other?.javaClass) return false + + other as StreamingData + + if (streamId != other.streamId) return false + if (payload != null) { + if (other.payload == null) return false + if (!payload.contentEquals(other.payload)) return false + } else if (other.payload != null) return false + + return true + } + + override fun hashCode(): Int { + var result = streamId.hashCode() + result = 31 * result + (payload?.contentHashCode() ?: 0) + return result + } + + override fun toString(): String { + return "StreamingData(streamId=$streamId, payloadSize=${payload?.size})" + } +} diff --git a/src/dorkbox/network/connection/streaming/StreamingManager.kt b/src/dorkbox/network/connection/streaming/StreamingManager.kt new file mode 100644 index 00000000..101fe757 --- /dev/null +++ b/src/dorkbox/network/connection/streaming/StreamingManager.kt @@ -0,0 +1,399 @@ +@file:Suppress("DuplicatedCode") + +package dorkbox.network.connection.streaming + +import dorkbox.bytes.OptimizeUtilsByteBuf +import dorkbox.collections.LockFreeHashMap +import dorkbox.network.connection.Connection +import dorkbox.network.connection.EndPoint +import dorkbox.network.connection.ListenerManager +import dorkbox.network.serialization.AeronInput +import dorkbox.network.serialization.AeronOutput +import dorkbox.network.serialization.KryoExtra +import io.aeron.Publication +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.launch +import mu.KLogger +import org.agrona.MutableDirectBuffer + +internal class StreamingManager(private val logger: KLogger, private val actionDispatch: CoroutineScope) { + private val streamingDataTarget = LockFreeHashMap() + private val streamingDataInMemory = LockFreeHashMap() + + companion object { + @Suppress("UNUSED_CHANGED_VALUE") + private fun writeVarInt(internalBuffer: MutableDirectBuffer, position: Int, value: Int, optimizePositive: Boolean): Int { + var p = position + var newValue = value + if (!optimizePositive) newValue = newValue shl 1 xor (newValue shr 31) + if (newValue ushr 7 == 0) { + internalBuffer.putByte(p++, newValue.toByte()) + return 1 + } + if (newValue ushr 14 == 0) { + internalBuffer.putByte(p++, (newValue and 0x7F or 0x80).toByte()) + internalBuffer.putByte(p++, (newValue ushr 7).toByte()) + return 2 + } + if (newValue ushr 21 == 0) { + val byteBuf = internalBuffer + byteBuf.putByte(p++, (newValue and 0x7F or 0x80).toByte()) + byteBuf.putByte(p++, (newValue ushr 7 or 0x80).toByte()) + byteBuf.putByte(p++, (newValue ushr 14).toByte()) + return 3 + } + if (newValue ushr 28 == 0) { + val byteBuf = internalBuffer + byteBuf.putByte(p++, (newValue and 0x7F or 0x80).toByte()) + byteBuf.putByte(p++, (newValue ushr 7 or 0x80).toByte()) + byteBuf.putByte(p++, (newValue ushr 14 or 0x80).toByte()) + byteBuf.putByte(p++, (newValue ushr 21).toByte()) + return 4 + } + val byteBuf = internalBuffer + byteBuf.putByte(p++, (newValue and 0x7F or 0x80).toByte()) + byteBuf.putByte(p++, (newValue ushr 7 or 0x80).toByte()) + byteBuf.putByte(p++, (newValue ushr 14 or 0x80).toByte()) + byteBuf.putByte(p++, (newValue ushr 21 or 0x80).toByte()) + byteBuf.putByte(p++, (newValue ushr 28).toByte()) + return 5 + } + } + + + /** + * Reassemble/figure out the internal message pieces + */ + fun processControlMessage(message: StreamingControl, endPoint: EndPoint, connection: CONNECTION) { + val streamId = message.streamId + + when (message.state) { + StreamingState.START -> { + streamingDataTarget[streamId] = message + if (!message.isFile) { + streamingDataInMemory[streamId] = AeronOutput() + } + } + StreamingState.FINISHED -> { + // get the data out and send messages! + if (!message.isFile) { + val output = streamingDataInMemory.remove(streamId) + if (output != null) { + val kryo: KryoExtra = endPoint.serialization.takeKryo() + + try { + val input = AeronInput(output.internalBuffer) + val streamedMessage = kryo.readClassAndObject(input) + + // NOTE: This MUST be on a new co-routine + actionDispatch.launch { + val listenerManager = endPoint.listenerManager + + try { + @Suppress("UNCHECKED_CAST") + var hasListeners = listenerManager.notifyOnMessage(connection, streamedMessage) + + // each connection registers, and is polled INDEPENDENTLY for messages. + hasListeners = hasListeners or connection.notifyOnMessage(streamedMessage) + + if (!hasListeners) { + logger.error("No message callbacks found for ${streamedMessage::class.java.name}") + } + } catch (e: Exception) { + logger.error("Error processing message ${streamedMessage::class.java.name}", e) + listenerManager.notifyError(connection, e) + } + } + } catch (e: Exception) { + // something SUPER wrong! + // more critical error sending the message. we shouldn't retry or anything. + val errorMessage = "Error serializing message from received streaming content, stream $streamId" + + // either client or server. No other choices. We create an exception, because it's more useful! + val exception = endPoint.newException(errorMessage) + + // +2 because we do not want to see the stack for the abstract `newException` + // +3 more because we do not need to see the "internals" for sending messages. The important part of the stack trace is + // where we see who is calling "send()" + ListenerManager.cleanStackTrace(exception, 2) + throw exception + } finally { + endPoint.serialization.returnKryo(kryo) + } + } else { + // something SUPER wrong! + // more critical error sending the message. we shouldn't retry or anything. + val errorMessage = "Error while receiving streaming content, stream $streamId not available." + + // either client or server. No other choices. We create an exception, because it's more useful! + val exception = endPoint.newException(errorMessage) + + // +2 because we do not want to see the stack for the abstract `newException` + // +3 more because we do not need to see the "internals" for sending messages. The important part of the stack trace is + // where we see who is calling "send()" + ListenerManager.cleanStackTrace(exception, 2) + throw exception + } + } else { + // we are a file, so process accordingly + + + } + println(message) + } + StreamingState.FAILED -> { + // clear all state + // something SUPER wrong! + // more critical error sending the message. we shouldn't retry or anything. + val errorMessage = "Failure while receiving streaming content for stream $streamId" + + // either client or server. No other choices. We create an exception, because it's more useful! + val exception = endPoint.newException(errorMessage) + + // +2 because we do not want to see the stack for the abstract `newException` + // +3 more because we do not need to see the "internals" for sending messages. The important part of the stack trace is + // where we see who is calling "send()" + ListenerManager.cleanStackTrace(exception, 2) + throw exception + } + StreamingState.UNKNOWN -> { + // something SUPER wrong! + // more critical error sending the message. we shouldn't retry or anything. + val errorMessage = "Unknown failure while receiving streaming content for stream $streamId" + + // either client or server. No other choices. We create an exception, because it's more useful! + val exception = endPoint.newException(errorMessage) + + // +2 because we do not want to see the stack for the abstract `newException` + // +3 more because we do not need to see the "internals" for sending messages. The important part of the stack trace is + // where we see who is calling "send()" + ListenerManager.cleanStackTrace(exception, 2) + throw exception + } + } + } + + /** + * Reassemble/figure out the internal message pieces + * + * NOTE sending a huge file can prevent other other network traffic from arriving until it's done! + */ + fun processDataMessage(message: StreamingData, endPoint: EndPoint, connection: CONNECTION) { + // the receiving data will ALWAYS come sequentially, but there might be OTHER streaming data received meanwhile. + val streamId = message.streamId + + val controlMessage = streamingDataTarget[streamId] + if (controlMessage != null) { + streamingDataInMemory.getOrPut(streamId) { AeronOutput() }.writeBytes(message.payload!!) + } else { + // something SUPER wrong! + // more critical error sending the message. we shouldn't retry or anything. + val errorMessage = "Abnormal failure while receiving streaming content, stream $streamId not available." + + // either client or server. No other choices. We create an exception, because it's more useful! + val exception = endPoint.newException(errorMessage) + + // +2 because we do not want to see the stack for the abstract `newException` + // +3 more because we do not need to see the "internals" for sending messages. The important part of the stack trace is + // where we see who is calling "send()" + ListenerManager.cleanStackTrace(exception, 5) + throw exception + } + } + + private suspend fun sendFailMessageAndThrow( + e: Exception, + streamSessionId: Long, + publication: Publication, + endPoint: EndPoint, + connection: CONNECTION + ) { + val failMessage = StreamingControl(StreamingState.FAILED, streamSessionId) + val failSent = endPoint.send(failMessage, publication, connection) + if (!failSent) { + // something SUPER wrong! + // more critical error sending the message. we shouldn't retry or anything. + val errorMessage = "[${publication.sessionId()}] Abnormal failure while streaming content." + + // either client or server. No other choices. We create an exception, because it's more useful! + val exception = endPoint.newException(errorMessage) + + // +2 because we do not want to see the stack for the abstract `newException` + // +4 more because we do not need to see the "internals" for sending messages. The important part of the stack trace is + // where we see who is calling "send()" + ListenerManager.cleanStackTrace(exception, 6) + throw exception + } else { + // send it up! + throw e + } + } + + /** + * This is called ONLY when a message is too large to send across the network in a single message (large data messages should + * be split into smaller ones anyways!) + * + * NOTE: this **MUST** stay on the same co-routine that calls "send". This cannot be re-dispatched onto a different coroutine! + * + * We don't write max possible length per message, we write out MTU (payload) length (so aeron doesn't fragment the message). + * The max possible length is WAY, WAY more than the max payload length. + * + * @return true if ALL the message chunks were successfully sent by aeron, false otherwise. Exceptions are caught and rethrown! + */ + suspend fun send( + publication: Publication, + internalBuffer: MutableDirectBuffer, + objectSize: Int, + endPoint: EndPoint, + connection: CONNECTION): Boolean { + + // NOTE: our max object size for IN-MEMORY messages is an INT. For file transfer it's a LONG (so everything here is cast to a long) + var remainingPayload = objectSize + var payloadSent = 0 + + val streamSessionId = endPoint.crypto.secureRandom.nextLong() + + // tell the other side how much data we are sending + val startMessage = StreamingControl(StreamingState.START, streamSessionId, objectSize.toLong()) + val startSent = endPoint.send(startMessage, publication, connection) + if (!startSent) { + // more critical error sending the message. we shouldn't retry or anything. + val errorMessage = "[${publication.sessionId()}] Error starting streaming content." + + // either client or server. No other choices. We create an exception, because it's more useful! + val exception = endPoint.newException(errorMessage) + + // +2 because we do not want to see the stack for the abstract `newException` + // +3 more because we do not need to see the "internals" for sending messages. The important part of the stack trace is + // where we see who is calling "send()" + ListenerManager.cleanStackTrace(exception, 5) + throw exception + } + + + val kryo: KryoExtra = endPoint.serialization.takeKryo() + + // we do the FIRST chunk super-weird, because of the way we copy data around (we inject headers, + // so the first message is SUPER tiny and is a COPY, the rest are no-copy. + + // This is REUSED to prevent garbage collection issues. + val chunkData = StreamingData(streamSessionId) + + // payload size is for a PRODUCER, and not SUBSCRIBER, so we have to include this amount every time. + // MINOR fragmentation by aeron is OK, since that will greatly speed up data transfer rates! + var maxPayloadLength = publication.maxPayloadLength() + if ((maxPayloadLength * 8) < publication.maxMessageLength()) { + maxPayloadLength *= 8 + } + + val header: ByteArray + val headerSize: Int + + try { + val objectBuffer = kryo.write(connection, chunkData) + headerSize = objectBuffer.position() + header = ByteArray(headerSize) + + // we have to account for the header + the MAX optimized int size + maxPayloadLength -= (headerSize + 5) + + // this size might be a LITTLE too big, but that's ok, since we only make this specific buffer once. + val chunkBuffer = AeronOutput(headerSize + maxPayloadLength) + + // copy out our header info + objectBuffer.internalBuffer.getBytes(0, header, 0, headerSize) + + // write out our header + chunkBuffer.writeBytes(header) + + // write out the payload size using optimized data structures. + val varIntSize = chunkBuffer.writeVarInt(maxPayloadLength, true) + + // write out the payload. Our resulting data written out is the ACTUAL MTU of aeron. + internalBuffer.getBytes(0, chunkBuffer.internalBuffer, headerSize + varIntSize, maxPayloadLength) + + remainingPayload -= maxPayloadLength + payloadSent += maxPayloadLength + + val success = endPoint.sendData(publication, chunkBuffer.internalBuffer, 0, headerSize + varIntSize + maxPayloadLength, connection) + if (!success) { + // something SUPER wrong! + // more critical error sending the message. we shouldn't retry or anything. + val errorMessage = "[${publication.sessionId()}] Abnormal failure while streaming content." + + // either client or server. No other choices. We create an exception, because it's more useful! + val exception = endPoint.newException(errorMessage) + + // +2 because we do not want to see the stack for the abstract `newException` + // +3 more because we do not need to see the "internals" for sending messages. The important part of the stack trace is + // where we see who is calling "send()" + ListenerManager.cleanStackTrace(exception, 5) + throw exception + } + } catch (e: Exception) { + sendFailMessageAndThrow(e, streamSessionId, publication, endPoint, connection) + return false // doesn't actually get here because exceptions are thrown, but this makes the IDE happy. + } finally { + endPoint.serialization.returnKryo(kryo) + } + + // now send the chunks as fast as possible. Aeron will have us back-off if we send too quickly + while (remainingPayload > 0) { + val amountToSend = if (remainingPayload < maxPayloadLength) { + remainingPayload + } else { + maxPayloadLength + } + + remainingPayload -= amountToSend + + + // to properly do this, we have to be careful with the underlying protocol, in order to avoid copying the buffer multiple times. + // the data that will be sent is object data + buffer data. We are sending the SAME parent buffer, just at different spots and + // with different headers -- so we don't copy out the data repeatedly + + // fortunately, the way that serialization works, we can safely ADD data to the tail and then appropriately read it off + // on the receiving end without worry. + + try { + val varIntSize = OptimizeUtilsByteBuf.intLength(maxPayloadLength, true) + val writeIndex = payloadSent - headerSize - varIntSize + + // write out our header data (this will OVERWRITE previous data!) + internalBuffer.putBytes(writeIndex, header) + + // write out the payload size using optimized data structures. + writeVarInt(internalBuffer, writeIndex + headerSize, maxPayloadLength, true) + + // write out the payload + endPoint.sendData(publication, internalBuffer, writeIndex, headerSize + amountToSend, connection) + + payloadSent += amountToSend + } catch (e: Exception) { + val failMessage = StreamingControl(StreamingState.FAILED, streamSessionId) + val failSent = endPoint.send(failMessage, publication, connection) + if (!failSent) { + // something SUPER wrong! + // more critical error sending the message. we shouldn't retry or anything. + val errorMessage = "[${publication.sessionId()}] Abnormal failure while streaming content." + + // either client or server. No other choices. We create an exception, because it's more useful! + val exception = endPoint.newException(errorMessage) + + // +2 because we do not want to see the stack for the abstract `newException` + // +3 more because we do not need to see the "internals" for sending messages. The important part of the stack trace is + // where we see who is calling "send()" + ListenerManager.cleanStackTrace(exception, 5) + throw exception + } else { + // send it up! + throw e + } + } + } + + // send the last chunk of data + val finishedMessage = StreamingControl(StreamingState.FINISHED, streamSessionId, payloadSent.toLong()) + return endPoint.send(finishedMessage, publication, connection) + } +} diff --git a/src/dorkbox/network/connection/streaming/StreamingMessage.kt b/src/dorkbox/network/connection/streaming/StreamingMessage.kt new file mode 100644 index 00000000..e0c621c0 --- /dev/null +++ b/src/dorkbox/network/connection/streaming/StreamingMessage.kt @@ -0,0 +1,18 @@ +/* + * Copyright 2020 dorkbox, llc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dorkbox.network.connection.streaming + +interface StreamingMessage diff --git a/src/dorkbox/network/connection/streaming/StreamingSerializer.kt b/src/dorkbox/network/connection/streaming/StreamingSerializer.kt new file mode 100644 index 00000000..5f5dd426 --- /dev/null +++ b/src/dorkbox/network/connection/streaming/StreamingSerializer.kt @@ -0,0 +1,60 @@ +/* + * Copyright 2020 dorkbox, llc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dorkbox.network.connection.streaming + +import com.esotericsoftware.kryo.Kryo +import com.esotericsoftware.kryo.Serializer +import com.esotericsoftware.kryo.io.Input +import com.esotericsoftware.kryo.io.Output + +class StreamingControlSerializer: Serializer() { + override fun write(kryo: Kryo, output: Output, data: StreamingControl) { + output.writeByte(data.state.ordinal) + output.writeVarLong(data.streamId, true) + output.writeVarLong(data.totalSize, true) + output.writeBoolean(data.isFile) + if (data.isFile) { + output.writeString(data.fileName) + } + } + + override fun read(kryo: Kryo, input: Input, type: Class): StreamingControl { + val stateOrdinal = input.readByte().toInt() + val state = StreamingState.values().first { it.ordinal == stateOrdinal } + val streamId = input.readVarLong(true) + val totalSize = input.readVarLong(true) + val isFile = input.readBoolean() + val fileName = if (isFile) { + input.readString() + } else { + "" + } + + return StreamingControl(state, streamId, totalSize, isFile, fileName) + } +} + +class StreamingDataSerializer: Serializer() { + override fun write(kryo: Kryo, output: Output, data: StreamingData) { + output.writeVarLong(data.streamId, true) + } + + override fun read(kryo: Kryo, input: Input, type: Class): StreamingData { + val streamId = input.readVarLong(true) + + return StreamingData(streamId) + } +} diff --git a/src/dorkbox/network/connection/streaming/StreamingState.kt b/src/dorkbox/network/connection/streaming/StreamingState.kt new file mode 100644 index 00000000..b9cfbfc1 --- /dev/null +++ b/src/dorkbox/network/connection/streaming/StreamingState.kt @@ -0,0 +1,5 @@ +package dorkbox.network.connection.streaming + +enum class StreamingState { + UNKNOWN, START, FINISHED, FAILED +} diff --git a/src/dorkbox/network/rmi/RmiManagerGlobal.kt b/src/dorkbox/network/rmi/RmiManagerGlobal.kt index 76537c8a..88e23074 100644 --- a/src/dorkbox/network/rmi/RmiManagerGlobal.kt +++ b/src/dorkbox/network/rmi/RmiManagerGlobal.kt @@ -18,6 +18,7 @@ package dorkbox.network.rmi import dorkbox.network.connection.Connection import dorkbox.network.rmi.messages.* import dorkbox.network.serialization.Serialization +import kotlinx.coroutines.launch import mu.KLogger import java.lang.reflect.Proxy import java.util.* @@ -88,7 +89,7 @@ internal class RmiManagerGlobal(logger: KLogger) : RmiOb * Manages ALL OF THE RMI SCOPES */ @Suppress("DuplicatedCode") - suspend fun manage( + suspend fun processMessage( serialization: Serialization, connection: CONNECTION, message: Any, diff --git a/src/dorkbox/network/serialization/Serialization.kt b/src/dorkbox/network/serialization/Serialization.kt index f81ee98b..4767efb1 100644 --- a/src/dorkbox/network/serialization/Serialization.kt +++ b/src/dorkbox/network/serialization/Serialization.kt @@ -18,10 +18,17 @@ package dorkbox.network.serialization import com.esotericsoftware.kryo.Kryo import com.esotericsoftware.kryo.Serializer import com.esotericsoftware.kryo.SerializerFactory +import com.esotericsoftware.kryo.io.Input import com.esotericsoftware.kryo.util.DefaultInstantiatorStrategy import com.esotericsoftware.minlog.Log import dorkbox.network.Server import dorkbox.network.connection.Connection +import dorkbox.network.connection.streaming.StreamingControl +import dorkbox.network.connection.streaming.StreamingControlSerializer +import dorkbox.network.connection.streaming.StreamingData +import dorkbox.network.connection.streaming.StreamingDataSerializer +import dorkbox.network.connection.streaming.StreamingMessage +import dorkbox.network.connection.streaming.StreamingState import dorkbox.network.handshake.HandshakeMessage import dorkbox.network.ping.Ping import dorkbox.network.ping.PingSerializer @@ -146,6 +153,8 @@ open class Serialization(private val references: Boolean private val rmiClientSerializer = RmiClientSerializer() private val rmiServerSerializer = RmiServerSerializer() + private val streamingControlSerializer = StreamingControlSerializer() + private val streamingDataSerializer = StreamingDataSerializer() private val pingSerializer = PingSerializer() /** @@ -348,6 +357,10 @@ open class Serialization(private val references: Boolean kryo.register(MethodRequest::class.java, methodRequestSerializer) kryo.register(MethodResponse::class.java, methodResponseSerializer) + // Streaming/Chunked Messages! + kryo.register(StreamingControl::class.java, streamingControlSerializer) + kryo.register(StreamingData::class.java, streamingDataSerializer) + kryo.register(Ping::class.java, pingSerializer) @Suppress("UNCHECKED_CAST") @@ -362,43 +375,7 @@ open class Serialization(private val references: Boolean * called as the first thing inside when initializing the classesToRegister */ private fun initKryo(): KryoExtra { - val kryo = KryoExtra() - - kryo.instantiatorStrategy = instantiatorStrategy - kryo.references = references - - if (factory != null) { - kryo.setDefaultSerializer(factory) - } - - // All registration MUST happen in-order of when the register(*) method was called, otherwise there are problems. - SerializationDefaults.register(kryo) - -// serialization.register(PingMessage::class.java) // TODO this is built into aeron!??!?!?! - - // TODO: this is for diffie hellmen handshake stuff! -// serialization.register(IESParameters::class.java, IesParametersSerializer()) -// serialization.register(IESWithCipherParameters::class.java, IesWithCipherParametersSerializer()) - // TODO: fix kryo to work the way we want, so we can register interfaces + serializers with kryo -// serialization.register(XECPublicKey::class.java, XECPublicKeySerializer()) -// serialization.register(XECPrivateKey::class.java, XECPrivateKeySerializer()) -// serialization.register(Message::class.java) // must use full package name! - - // RMI stuff! - kryo.register(ConnectionObjectCreateRequest::class.java) - kryo.register(ConnectionObjectCreateResponse::class.java) - kryo.register(ConnectionObjectDeleteRequest::class.java) - kryo.register(ConnectionObjectDeleteResponse::class.java) - - kryo.register(MethodRequest::class.java, methodRequestSerializer) - kryo.register(MethodResponse::class.java, methodResponseSerializer) - - kryo.register(Ping::class.java, pingSerializer) - - @Suppress("UNCHECKED_CAST") - kryo.register(InvocationHandler::class.java as Class, rmiClientSerializer) - - kryo.register(Continuation::class.java, continuationSerializer) + val kryo = initGlobalKryo() // check to see which interfaces are mapped to RMI (otherwise, the interface requires a serializer) // note, we have to check to make sure a class is not ALREADY registered for RMI before it is registered again @@ -748,6 +725,11 @@ open class Serialization(private val references: Boolean return readKryo.read(buffer, offset, length, connection) } + fun readRaw(): Input { + return readKryo.readerBuffer + } + + // /** // * # BLOCKING // * diff --git a/test/dorkboxTest/network/DisconnectReconnectTest.kt b/test/dorkboxTest/network/DisconnectReconnectTest.kt index 4ec4d6f0..a9854024 100644 --- a/test/dorkboxTest/network/DisconnectReconnectTest.kt +++ b/test/dorkboxTest/network/DisconnectReconnectTest.kt @@ -5,10 +5,16 @@ import dorkbox.network.Server import dorkbox.network.aeron.AeronDriver import dorkbox.network.connection.Connection import kotlinx.atomicfu.atomic +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.async import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking import org.junit.Assert import org.junit.Test import java.io.IOException +import kotlin.time.Duration.Companion.seconds class DisconnectReconnectTest : BaseTest() { private val reconnectCount = atomic(0) @@ -122,6 +128,55 @@ class DisconnectReconnectTest : BaseTest() { Assert.assertEquals(4, reconnectCount.value) } + @Test + fun multiConnectClient() { + // clients first, so they try to connect to the server at (roughly) the same time + val config = clientConfig() + + val client1: Client = Client(config) + val client2: Client = Client(config) + + addEndPoint(client1) + addEndPoint(client2) + client1.onDisconnect { + logger.error("Disconnected 1!") + } + client2.onDisconnect { + logger.error("Disconnected 2!") + } + + GlobalScope.launch { + client1.connect(LOCALHOST) + } + + GlobalScope.launch { + client2.connect(LOCALHOST) + } + + println("Starting server...") + + run { + val configuration = serverConfig() + + val server: Server = Server(configuration) + addEndPoint(server) + server.bind() + + server.onConnect { + logger.error("Disconnecting after 10 seconds.") + delay(10.seconds) + + logger.error("Disconnecting....") + close() + } + } + + waitForThreads() + + System.err.println("Connection count (after reconnecting) is: " + reconnectCount.value) + Assert.assertEquals(4, reconnectCount.value) + } + interface CloseIface { suspend fun close() } @@ -204,7 +259,9 @@ class DisconnectReconnectTest : BaseTest() { fun manualMediaDriverAndReconnectClient() { // NOTE: once a config is assigned to a driver, the config cannot be changed val aeronDriver = AeronDriver(serverConfig()) - aeronDriver.start() + runBlocking { + aeronDriver.start() + } run { val serverConfiguration = serverConfig() diff --git a/test/dorkboxTest/network/StreamingTest.kt b/test/dorkboxTest/network/StreamingTest.kt new file mode 100644 index 00000000..3112fc4f --- /dev/null +++ b/test/dorkboxTest/network/StreamingTest.kt @@ -0,0 +1,55 @@ +package dorkboxTest.network + +import dorkbox.network.Client +import dorkbox.network.Server +import dorkbox.network.connection.Connection +import dorkbox.network.connection.ConnectionParams +import org.junit.Test + +class StreamingTest : BaseTest() { + + @Test + fun sendStreamingObject() { + run { + val configuration = serverConfig() + + val server: Server = Server(configuration) + addEndPoint(server) + server.bind() + + server.onMessage { + println("received data, shutting down!") + stopEndPoints() + } + } + + run { + var connectionParams: ConnectionParams? = null + val config = clientConfig() + + val client: Client = Client(config) { + connectionParams = it + Connection(it) + } + addEndPoint(client) + + client.onConnect { + val params = connectionParams ?: throw Exception("We should not have null connectionParams!") + val publication = params.mediaDriverConnection.publication + val hugeData = ByteArray(publication.maxMessageLength() + 10) + + this.endPoint.send(hugeData, publication, this) + } + + client.connect(LOCALHOST) + } + + + waitForThreads(0) + +// System.err.println("Connection count (after reconnecting) is: " + reconnectCount.value) +// Assert.assertEquals(4, reconnectCount.value) + } + + +}