From 510fc85f2c7ac41babd9865cb38d3d41eddf60ea Mon Sep 17 00:00:00 2001 From: Robinson Date: Wed, 28 Jun 2023 20:31:08 +0200 Subject: [PATCH] Removed streaming kryo and temp kryo --- src/dorkbox/network/Client.kt | 2 +- src/dorkbox/network/connection/Connection.kt | 6 +---- src/dorkbox/network/connection/EndPoint.kt | 8 ++---- .../connection/streaming/StreamingManager.kt | 26 ++++++++++++------- 4 files changed, 20 insertions(+), 22 deletions(-) diff --git a/src/dorkbox/network/Client.kt b/src/dorkbox/network/Client.kt index 2a72501f..c2671286 100644 --- a/src/dorkbox/network/Client.kt +++ b/src/dorkbox/network/Client.kt @@ -196,6 +196,7 @@ open class Client( var connectionTimeoutSec: Int = 0 private set + private val handshake = ClientHandshake(this, logger) @Volatile @@ -720,7 +721,6 @@ open class Client( // every time we connect to a server, we have to reconfigure AND reassign the readKryos. readKryo = kryoConfiguredFromServer - streamingReadKryo = serialization.initKryo() /////////////// diff --git a/src/dorkbox/network/connection/Connection.kt b/src/dorkbox/network/connection/Connection.kt index 59f490cf..f2dd7979 100644 --- a/src/dorkbox/network/connection/Connection.kt +++ b/src/dorkbox/network/connection/Connection.kt @@ -57,7 +57,6 @@ open class Connection(connectionParameters: ConnectionParams<*>) { private val sendIdleStrategy: IdleStrategy private val writeKryo: KryoExtra - private val tempWriteKryo: KryoExtra /** * This is the client UUID. This is useful determine if the same client is connecting multiple times to a server (instead of only using IP address) @@ -140,9 +139,6 @@ open class Connection(connectionParameters: ConnectionParams<*>) { init { @Suppress("UNCHECKED_CAST") writeKryo = endPoint.serialization.initKryo() as KryoExtra - @Suppress("UNCHECKED_CAST") - tempWriteKryo = endPoint.serialization.initKryo() as KryoExtra - sendIdleStrategy = endPoint.config.sendIdleStrategy.cloneToNormal() @@ -224,7 +220,7 @@ open class Connection(connectionParameters: ConnectionParams<*>) { try { // The handshake sessionId IS NOT globally unique logger.trace { "[$toString0] send: ${message.javaClass.simpleName} : $message" } - val write = endPoint.write(writeKryo, tempWriteKryo, message, publication, sendIdleStrategy, this@Connection, abortEarly) + val write = endPoint.write(writeKryo, message, publication, sendIdleStrategy, this@Connection, abortEarly) write } catch (e: Throwable) { // make sure we atomically create the listener manager, if necessary diff --git a/src/dorkbox/network/connection/EndPoint.kt b/src/dorkbox/network/connection/EndPoint.kt index 0de8d1aa..66bd2721 100644 --- a/src/dorkbox/network/connection/EndPoint.kt +++ b/src/dorkbox/network/connection/EndPoint.kt @@ -149,7 +149,6 @@ abstract class EndPoint private constructor(val type: C // The readKryo WILL RE-CONFIGURED during the client handshake! (it is all the same thread, so object visibility is not a problem) @Volatile internal var readKryo: KryoExtra - internal var streamingReadKryo: KryoExtra internal val handshaker: Handshaker @@ -212,11 +211,9 @@ abstract class EndPoint private constructor(val type: C // the initial kryo created for serialization is reused as the read kryo if (type == Server::class.java) { readKryo = serialization.initKryo(kryo) - streamingReadKryo = serialization.initKryo() } else { // these will be reassigned by the client Connect method! readKryo = kryo - streamingReadKryo = kryo } // we have to be able to specify the property store @@ -468,7 +465,6 @@ abstract class EndPoint private constructor(val type: C */ open fun write( writeKryo: KryoExtra, - tempWriteKryo: KryoExtra, message: Any, publication: Publication, sendIdleStrategy: IdleStrategy, @@ -496,7 +492,7 @@ abstract class EndPoint private constructor(val type: C internalBuffer = internalBuffer, objectSize = objectSize, endPoint = this, - tempWriteKryo = tempWriteKryo, + kryo = writeKryo, sendIdleStrategy = sendIdleStrategy, connection = connection ) @@ -582,7 +578,6 @@ abstract class EndPoint private constructor(val type: C // send arbitrarily large pieces of data (gigs in size, potentially). // This will recursively call into this method for each of the unwrapped chunks of data. is StreamingControl -> { - // TODO: this can be the streaming kryo??? streamingManager.processControlMessage(message, readKryo,this@EndPoint, connection) } is StreamingData -> { @@ -651,6 +646,7 @@ abstract class EndPoint private constructor(val type: C /** * NOTE: we use exclusive publications, and they are not thread safe/concurrent! + * NOTE: This cannot be on a coroutine, because our kryo instances are NOT threadsafe! * * the actual bits that send data on the network. * diff --git a/src/dorkbox/network/connection/streaming/StreamingManager.kt b/src/dorkbox/network/connection/streaming/StreamingManager.kt index b3d90a15..1e3351a7 100644 --- a/src/dorkbox/network/connection/streaming/StreamingManager.kt +++ b/src/dorkbox/network/connection/streaming/StreamingManager.kt @@ -330,10 +330,16 @@ internal class StreamingManager( internalBuffer: MutableDirectBuffer, objectSize: Int, endPoint: EndPoint, - tempWriteKryo: KryoExtra, + kryo: KryoExtra, sendIdleStrategy: IdleStrategy, connection: Connection ): Boolean { + // this buffer is the exact size as our internal buffer, so it is unnecessary to have multiple kryo instances + val originalBuffer = ExpandableDirectByteBuffer(objectSize) // this can grow, so it's fine to lock it to this size! + + // we have to save out our internal buffer, so we can reuse the kryo instance! + originalBuffer.putBytes(0, internalBuffer, 0, objectSize) + // NOTE: our max object size for IN-MEMORY messages is an INT. For file transfer it's a LONG (so everything here is cast to a long) var remainingPayload = objectSize @@ -345,7 +351,7 @@ internal class StreamingManager( // tell the other side how much data we are sending val startMessage = StreamingControl(StreamingState.START, streamSessionId, objectSize.toLong()) - val startSent = endPoint.writeUnsafe(tempWriteKryo, startMessage, publication, sendIdleStrategy, connection) + val startSent = endPoint.writeUnsafe(kryo, startMessage, publication, sendIdleStrategy, connection) if (!startSent) { // more critical error sending the message. we shouldn't retry or anything. val errorMessage = "[${publication.sessionId()}] Error starting streaming content." @@ -377,7 +383,7 @@ internal class StreamingManager( val headerSize: Int try { - val objectBuffer = tempWriteKryo.write(connection, chunkData) + val objectBuffer = kryo.write(connection, chunkData) headerSize = objectBuffer.position() header = ByteArray(headerSize) @@ -397,7 +403,7 @@ internal class StreamingManager( val varIntSize = chunkBuffer.writeVarInt(sizeOfPayload, true) // write out the payload. Our resulting data written out is the ACTUAL MTU of aeron. - internalBuffer.getBytes(0, chunkBuffer.internalBuffer, headerSize + varIntSize, sizeOfPayload) + originalBuffer.getBytes(0, chunkBuffer.internalBuffer, headerSize + varIntSize, sizeOfPayload) remainingPayload -= sizeOfPayload payloadSent += sizeOfPayload @@ -426,7 +432,7 @@ internal class StreamingManager( throw exception } } catch (e: Exception) { - sendFailMessageAndThrow(e, streamSessionId, publication, endPoint, tempWriteKryo, sendIdleStrategy, connection) + sendFailMessageAndThrow(e, streamSessionId, publication, endPoint, kryo, sendIdleStrategy, connection) return false // doesn't actually get here because exceptions are thrown, but this makes the IDE happy. } @@ -453,15 +459,15 @@ internal class StreamingManager( val writeIndex = payloadSent - headerSize - varIntSize // write out our header data (this will OVERWRITE previous data!) - internalBuffer.putBytes(writeIndex, header) + originalBuffer.putBytes(writeIndex, header) // write out the payload size using optimized data structures. - writeVarInt(internalBuffer, writeIndex + headerSize, sizeOfPayload, true) + writeVarInt(originalBuffer, writeIndex + headerSize, sizeOfPayload, true) // write out the payload endPoint.dataSend( publication, - internalBuffer, + originalBuffer, writeIndex, headerSize + varIntSize + amountToSend, sendIdleStrategy, @@ -473,7 +479,7 @@ internal class StreamingManager( } catch (e: Exception) { val failMessage = StreamingControl(StreamingState.FAILED, streamSessionId) - val failSent = endPoint.writeUnsafe(tempWriteKryo, failMessage, publication, sendIdleStrategy, connection) + val failSent = endPoint.writeUnsafe(kryo, failMessage, publication, sendIdleStrategy, connection) if (!failSent) { // something SUPER wrong! // more critical error sending the message. we shouldn't retry or anything. @@ -497,6 +503,6 @@ internal class StreamingManager( // send the last chunk of data val finishedMessage = StreamingControl(StreamingState.FINISHED, streamSessionId, payloadSent.toLong()) - return endPoint.writeUnsafe(tempWriteKryo, finishedMessage, publication, sendIdleStrategy, connection) + return endPoint.writeUnsafe(kryo, finishedMessage, publication, sendIdleStrategy, connection) } }