diff --git a/src/dorkbox/network/Client.kt b/src/dorkbox/network/Client.kt index 9fcaf547..c65de650 100644 --- a/src/dorkbox/network/Client.kt +++ b/src/dorkbox/network/Client.kt @@ -718,25 +718,29 @@ open class Client( //// RMI /////////////// - // we set up our kryo information once we connect to a server (using the server's kryo registration details) - val kryoConfiguredFromServer = serialization.finishClientConnect(connectionInfo.kryoRegistrationDetails) - if (kryoConfiguredFromServer == null) { + try { + // only have ot do one + serialization.finishClientConnect(connectionInfo.kryoRegistrationDetails, maxMessageSize) + } catch (e: Exception) { handshakeConnection.close() // because we are getting the class registration details from the SERVER, this should never be the case. // It is still and edge case where the reconstruction of the registration details fails (maybe because of custom serializers) val exception = if (handshakeConnection.pubSub.isIpc) { - ClientRejectedException("[${handshake.connectKey}] Connection to IPC has incorrect class registration details!!") + ClientRejectedException("[${handshake.connectKey}] Connection to IPC has incorrect class registration details!!", e) } else { - ClientRejectedException("[${handshake.connectKey}] Connection to [$addressString] has incorrect class registration details!!") + ClientRejectedException("[${handshake.connectKey}] Connection to [$addressString] has incorrect class registration details!!", e) } + exception.cleanStackTraceInternal() listenerManager.notifyError(exception) throw exception } - // every time we connect to a server, we have to reconfigure AND reassign the readKryos. - readKryo = kryoConfiguredFromServer + // we set up our kryo information once we connect to a server (using the server's kryo registration details) + + // every time we connect to a server, we have to reconfigure AND reassign kryo + readKryo = serialization.newReadKryo(maxMessageSize) /////////////// diff --git a/src/dorkbox/network/connection/Connection.kt b/src/dorkbox/network/connection/Connection.kt index 5eb0bbeb..e932fb51 100644 --- a/src/dorkbox/network/connection/Connection.kt +++ b/src/dorkbox/network/connection/Connection.kt @@ -70,8 +70,6 @@ open class Connection(connectionParameters: ConnectionParams<*>) { */ private val sendIdleStrategy: IdleStrategy - private val writeKryo: KryoExtra - /** * This is the client UUID. This is useful determine if the same client is connecting multiple times to a server (instead of only using IP address) */ @@ -149,9 +147,6 @@ open class Connection(connectionParameters: ConnectionParams<*>) { init { - @Suppress("UNCHECKED_CAST") - writeKryo = endPoint.serialization.initKryo() as KryoExtra - sendIdleStrategy = endPoint.config.sendIdleStrategy.cloneToNormal() @@ -215,48 +210,46 @@ open class Connection(connectionParameters: ConnectionParams<*>) { /** * Safely sends objects to a destination, if `abortEarly` is true, there are no retries if sending the message fails. * - * NOTE: this is dispatched to the IO context!! (since network calls are IO/blocking calls) - * - * @return true if the message was successfully sent, false otherwise. Exceptions are caught and NOT rethrown! + * @return true if the message was successfully sent, false otherwise. Exceptions are caught and NOT rethrown! */ internal suspend fun send(message: Any, abortEarly: Boolean): Boolean { - // we use a mutex because we do NOT want different threads/coroutines to be able to send data over the SAME connections at the SAME time. - // NOTE: additionally we want to propagate back-pressure to the calling coroutines, PER CONNECTION! + var success = false - val success = writeMutex.withLock { - // we reset the sending timeout strategy when a message was successfully sent. - sendIdleStrategy.reset() + // this is dispatched to the IO context!! (since network calls are IO/blocking calls) + withContext(Dispatchers.IO) { + // we use a mutex because we do NOT want different threads/coroutines to be able to send data over the SAME connections at the SAME time. + // exclusive publications are not thread safe/concurrent! + writeMutex.withLock { + // we reset the sending timeout strategy when a message was successfully sent. + sendIdleStrategy.reset() - try { - // The handshake sessionId IS NOT globally unique - logger.trace { "[$toString0] send: ${message.javaClass.simpleName} : $message" } - val write = endPoint.write(writeKryo, message, publication, sendIdleStrategy, this@Connection, abortEarly) - write - } catch (e: Throwable) { - // make sure we atomically create the listener manager, if necessary - listenerManager.getAndUpdate { origManager -> - origManager ?: ListenerManager(logger) + try { + // The handshake sessionId IS NOT globally unique + logger.trace { "[$toString0] send: ${message.javaClass.simpleName} : $message" } + success = endPoint.write(message, publication, sendIdleStrategy, this@Connection, abortEarly) + } catch (e: Throwable) { + // make sure we atomically create the listener manager, if necessary + listenerManager.getAndUpdate { origManager -> + origManager ?: ListenerManager(logger) + } + + val listenerManager = listenerManager.value!! + + if (message is MethodResponse && message.result is Exception) { + val result = message.result as Exception + val newException = SerializationException("Error serializing message ${message.javaClass.simpleName}: '$message'", result) + listenerManager.notifyError(this@Connection, newException) + } else if (message is ClientException || message is ServerException) { + val newException = TransmitException("Error with message ${message.javaClass.simpleName}: '$message'", e) + listenerManager.notifyError(this@Connection, newException) + } else { + val newException = TransmitException("Error sending message ${message.javaClass.simpleName}: '$message'", e) + listenerManager.notifyError(this@Connection, newException) + } } - - val listenerManager = listenerManager.value!! - - if (message is MethodResponse && message.result is Exception) { - val result = message.result as Exception - val newException = SerializationException("Error serializing message ${message.javaClass.simpleName}: '$message'", result) - listenerManager.notifyError(this@Connection, newException) - } else if (message is ClientException || message is ServerException) { - val newException = TransmitException("Error with message ${message.javaClass.simpleName}: '$message'", e) - listenerManager.notifyError(this@Connection, newException) - } else { - val newException = TransmitException("Error sending message ${message.javaClass.simpleName}: '$message'", e) - listenerManager.notifyError(this@Connection, newException) - } - - false } } - return success } diff --git a/src/dorkbox/network/connection/EndPoint.kt b/src/dorkbox/network/connection/EndPoint.kt index faefdae5..9b89313e 100644 --- a/src/dorkbox/network/connection/EndPoint.kt +++ b/src/dorkbox/network/connection/EndPoint.kt @@ -36,10 +36,12 @@ import dorkbox.network.rmi.ResponseManager import dorkbox.network.rmi.RmiManagerConnections import dorkbox.network.rmi.RmiManagerGlobal import dorkbox.network.rmi.messages.RmiMessage -import dorkbox.network.serialization.KryoExtra +import dorkbox.network.serialization.KryoReader +import dorkbox.network.serialization.KryoWriter import dorkbox.network.serialization.Serialization import dorkbox.network.serialization.SettingsStore import io.aeron.Publication +import io.aeron.logbuffer.FrameDescriptor import io.aeron.logbuffer.Header import kotlinx.atomicfu.atomic import kotlinx.coroutines.* @@ -118,10 +120,19 @@ abstract class EndPoint private constructor(val type: C */ val serialization: Serialization - // These are GLOBAL, single threaded only kryo instances. - // The readKryo WILL RE-CONFIGURED during the client handshake! (it is all the same thread, so object visibility is not a problem) + /** + * The largest size a SINGLE message via AERON can be. We chunk messages larger than this, so we can send very large files using aeron. + */ + internal val maxMessageSize = FrameDescriptor.computeMaxMessageLength(config.publicationTermBufferLength) + + /** + * Read and Write can be concurrent (different buffers are used) + * GLOBAL, single threaded only kryo instances. + * + * This WILL RE-CONFIGURED during the client handshake! (it is all the same thread, so object visibility is not a problem) + */ @Volatile - internal var readKryo: KryoExtra + internal lateinit var readKryo: KryoReader internal val handshaker: Handshaker @@ -194,20 +205,16 @@ abstract class EndPoint private constructor(val type: C // serialization stuff @Suppress("UNCHECKED_CAST") serialization = config.serialization as Serialization + serialization.finishInit(type, maxMessageSize) + // we are done with initial configuration, now finish serialization - val kryo = serialization.initGlobalKryo() - serialization.finishInit(type, kryo) - - - // the initial kryo created for serialization is reused as the read kryo - readKryo = if (type == Server::class.java) { - serialization.initKryo(kryo) - } else { - // these will be reassigned by the client Connect method! - kryo + // the CLIENT will reassign these in the `connect0` method (because it registers what the server says to register) + if (type == Server::class.java) { + readKryo = serialization.newReadKryo(maxMessageSize) } + // we have to be able to specify the property store storage = SettingsStore(config.settingsStore, logger) crypto = CryptoManagement(logger, storage, type, config.enableRemoteSignatureValidation) @@ -458,14 +465,9 @@ abstract class EndPoint private constructor(val type: C * * This will split a message if it's too large to send in a single network message. * - * NOTE: we use exclusive publications, and they are not thread safe/concurrent! - * - * THIS IS CALLED FROM WITHIN A MUTEX!! - * * @return true if the message was successfully sent by aeron, false otherwise. Exceptions are caught and NOT rethrown! */ - open fun write( - writeKryo: KryoExtra, + open suspend fun write( message: Any, publication: Publication, sendIdleStrategy: IdleStrategy, @@ -474,31 +476,40 @@ abstract class EndPoint private constructor(val type: C ): Boolean { // NOTE: A kryo instance CANNOT be re-used until after it's buffer is flushed to the network! - // since ANY thread can call 'send', we have to take kryo instances in a safe way - // the maximum size that this buffer can be is: - // ExpandableDirectByteBuffer.MAX_BUFFER_LENGTH = 1073741824 - val buffer = writeKryo.write(connection, message) - val objectSize = buffer.position() - val internalBuffer = buffer.internalBuffer + @Suppress("UNCHECKED_CAST") + connection as CONNECTION + + val kryo = serialization.getWriteKryo(); + + try { + // since ANY thread can call 'send', we have to take kryo instances in a safe way + // the maximum size that this buffer can be is: + // ExpandableDirectByteBuffer.MAX_BUFFER_LENGTH = 1073741824 + val buffer = kryo.write(connection, message) + val objectSize = buffer.position() + val internalBuffer = buffer.internalBuffer - // one small problem! What if the message is too big to send all at once? - val maxMessageLength = publication.maxMessageLength() - return if (objectSize >= maxMessageLength) { - // we must split up the message! It's too large for Aeron to manage. - // this will split up the message, construct the necessary control message and state, then CALL the sendData - // method directly for each subsequent message. - streamingManager.send( - publication = publication, - internalBuffer = internalBuffer, - objectSize = objectSize, - endPoint = this, - kryo = writeKryo, - sendIdleStrategy = sendIdleStrategy, - connection = connection - ) - } else { - dataSend(publication, internalBuffer, 0, objectSize, sendIdleStrategy, connection, abortEarly) + // one small problem! What if the message is too big to send all at once? + return if (objectSize >= maxMessageSize) { + // we must split up the message! It's too large for Aeron to manage. + // this will split up the message, construct the necessary control message and state, then CALL the sendData + // method directly for each subsequent message. + streamingManager.send( + publication = publication, + internalBuffer = internalBuffer, + objectSize = objectSize, + maxMessageSize = maxMessageSize, + endPoint = this, + kryo = kryo, + sendIdleStrategy = sendIdleStrategy, + connection = connection + ) + } else { + dataSend(publication, internalBuffer, 0, objectSize, sendIdleStrategy, connection, abortEarly) + } + } finally { + serialization.returnWriteKryo(kryo) } } @@ -511,13 +522,13 @@ abstract class EndPoint private constructor(val type: C * * @return true if the message was successfully sent by aeron, false otherwise. Exceptions are caught and NOT rethrown! */ - open fun writeUnsafe(writeKryo: KryoExtra, message: Any, publication: Publication, sendIdleStrategy: IdleStrategy, connection: Connection): Boolean { + open fun writeUnsafe(message: Any, publication: Publication, sendIdleStrategy: IdleStrategy, connection: CONNECTION, kryo: KryoWriter): Boolean { // NOTE: A kryo instance CANNOT be re-used until after it's buffer is flushed to the network! // since ANY thread can call 'send', we have to take kryo instances in a safe way // the maximum size that this buffer can be is: // ExpandableDirectByteBuffer.MAX_BUFFER_LENGTH = 1073741824 - val buffer = writeKryo.write(connection, message) + val buffer = kryo.write(connection, message) val objectSize = buffer.position() val internalBuffer = buffer.internalBuffer @@ -533,7 +544,7 @@ abstract class EndPoint private constructor(val type: C * must be EXPLICITLY used by the implementation, and if a custom message processor is to be used (ie: a state machine) you must * guarantee that Ping, RMI, Streaming object, etc. are not used (as it would not function without this custom */ - open fun processMessage(message: Any?, connection: CONNECTION) { + open fun processMessage(message: Any?, connection: CONNECTION, readKryo: KryoReader) { // the REPEATED usage of wrapping methods below is because Streaming messages have to intercept data BEFORE it goes to a coroutine when (message) { // the remote endPoint will send this message if it is closing the connection. @@ -630,7 +641,13 @@ abstract class EndPoint private constructor(val type: C * @param header The aeron header information * @param connection The connection this message happened on */ - internal fun dataReceive(buffer: DirectBuffer, offset: Int, length: Int, header: Header, connection: Connection) { + internal fun dataReceive( + buffer: DirectBuffer, + offset: Int, + length: Int, + header: Header, + connection: Connection + ) { // this is processed on the thread that calls "poll". Subscriptions are NOT multi-thread safe! @Suppress("UNCHECKED_CAST") connection as CONNECTION @@ -639,7 +656,7 @@ abstract class EndPoint private constructor(val type: C // NOTE: This ABSOLUTELY MUST be done on the same thread! This cannot be done on a new one, because the buffer could change! val message = readKryo.read(buffer, offset, length, connection) logger.trace { "[${header.sessionId()}] received: ${message?.javaClass?.simpleName} $message" } - processMessage(message, connection) + processMessage(message, connection, readKryo) } catch (e: Exception) { listenerManager.notifyError(connection, newException("Error de-serializing message", e)) } diff --git a/src/dorkbox/network/connection/streaming/StreamingManager.kt b/src/dorkbox/network/connection/streaming/StreamingManager.kt index 34681ad3..b376568d 100644 --- a/src/dorkbox/network/connection/streaming/StreamingManager.kt +++ b/src/dorkbox/network/connection/streaming/StreamingManager.kt @@ -29,7 +29,8 @@ import dorkbox.network.connection.ListenerManager.Companion.cleanStackTrace import dorkbox.network.exceptions.StreamingException import dorkbox.network.serialization.AeronInput import dorkbox.network.serialization.AeronOutput -import dorkbox.network.serialization.KryoExtra +import dorkbox.network.serialization.KryoReader +import dorkbox.network.serialization.KryoWriter import dorkbox.os.OS import dorkbox.util.Sys import io.aeron.Publication @@ -108,7 +109,7 @@ internal class StreamingManager( */ fun processControlMessage( message: StreamingControl, - kryo: KryoExtra, + kryo: KryoReader, endPoint: EndPoint, connection: CONNECTION ) { @@ -294,13 +295,13 @@ internal class StreamingManager( streamSessionId: Int, publication: Publication, endPoint: EndPoint, - kryoExtra: KryoExtra, sendIdleStrategy: IdleStrategy, - connection: Connection + connection: CONNECTION, + kryo: KryoWriter ) { val failMessage = StreamingControl(StreamingState.FAILED, streamSessionId) - val failSent = endPoint.writeUnsafe(kryoExtra, failMessage, publication, sendIdleStrategy, connection) + val failSent = endPoint.writeUnsafe(failMessage, publication, sendIdleStrategy, connection, kryo) if (!failSent) { // something SUPER wrong! // more critical error sending the message. we shouldn't retry or anything. @@ -334,11 +335,12 @@ internal class StreamingManager( fun send( publication: Publication, internalBuffer: MutableDirectBuffer, + maxMessageSize: Int, objectSize: Int, endPoint: EndPoint, - kryo: KryoExtra, + kryo: KryoWriter, sendIdleStrategy: IdleStrategy, - connection: Connection + connection: CONNECTION ): Boolean { // this buffer is the exact size as our internal buffer, so it is unnecessary to have multiple kryo instances val originalBuffer = ExpandableDirectByteBuffer(objectSize) // this can grow, so it's fine to lock it to this size! @@ -357,7 +359,7 @@ internal class StreamingManager( // tell the other side how much data we are sending val startMessage = StreamingControl(StreamingState.START, streamSessionId, objectSize.toLong()) - val startSent = endPoint.writeUnsafe(kryo, startMessage, publication, sendIdleStrategy, connection) + val startSent = endPoint.writeUnsafe(startMessage, publication, sendIdleStrategy, connection, kryo) if (!startSent) { // more critical error sending the message. we shouldn't retry or anything. val errorMessage = "[${publication.sessionId()}] Error starting streaming content." @@ -381,8 +383,7 @@ internal class StreamingManager( // payload size is for a PRODUCER, and not SUBSCRIBER, so we have to include this amount every time. // MINOR fragmentation by aeron is OK, since that will greatly speed up data transfer rates! - // the maxPayloadLength MUST ABSOLUTELY be less that the max size + header! - var sizeOfPayload = publication.maxMessageLength() - 200 + var sizeOfPayload = maxMessageSize val header: ByteArray val headerSize: Int @@ -436,7 +437,7 @@ internal class StreamingManager( throw exception } } catch (e: Exception) { - sendFailMessageAndThrow(e, streamSessionId, publication, endPoint, kryo, sendIdleStrategy, connection) + sendFailMessageAndThrow(e, streamSessionId, publication, endPoint, sendIdleStrategy, connection, kryo) return false // doesn't actually get here because exceptions are thrown, but this makes the IDE happy. } @@ -483,7 +484,7 @@ internal class StreamingManager( } catch (e: Exception) { val failMessage = StreamingControl(StreamingState.FAILED, streamSessionId) - val failSent = endPoint.writeUnsafe(kryo, failMessage, publication, sendIdleStrategy, connection) + val failSent = endPoint.writeUnsafe(failMessage, publication, sendIdleStrategy, connection, kryo) if (!failSent) { // something SUPER wrong! // more critical error sending the message. we shouldn't retry or anything. @@ -506,6 +507,6 @@ internal class StreamingManager( // send the last chunk of data val finishedMessage = StreamingControl(StreamingState.FINISHED, streamSessionId, payloadSent.toLong()) - return endPoint.writeUnsafe(kryo, finishedMessage, publication, sendIdleStrategy, connection) + return endPoint.writeUnsafe(finishedMessage, publication, sendIdleStrategy, connection, kryo) } } diff --git a/src/dorkbox/network/handshake/Handshaker.kt b/src/dorkbox/network/handshake/Handshaker.kt index 6fadb502..d815ac79 100644 --- a/src/dorkbox/network/handshake/Handshaker.kt +++ b/src/dorkbox/network/handshake/Handshaker.kt @@ -20,15 +20,16 @@ import dorkbox.network.Configuration import dorkbox.network.aeron.AeronDriver import dorkbox.network.aeron.CoroutineIdleStrategy import dorkbox.network.connection.Connection -import dorkbox.network.connection.EndPoint import dorkbox.network.connection.ListenerManager import dorkbox.network.connection.ListenerManager.Companion.cleanStackTrace import dorkbox.network.connection.ListenerManager.Companion.cleanStackTraceInternal import dorkbox.network.exceptions.ClientException import dorkbox.network.exceptions.ServerException -import dorkbox.network.serialization.KryoExtra +import dorkbox.network.serialization.KryoReader +import dorkbox.network.serialization.KryoWriter import dorkbox.network.serialization.Serialization import io.aeron.Publication +import io.aeron.logbuffer.FrameDescriptor import mu.KLogger import org.agrona.DirectBuffer @@ -40,15 +41,23 @@ internal class Handshaker( aeronDriver: AeronDriver, val newException: (String, Throwable?) -> Throwable ) { - private val handshakeReadKryo: KryoExtra - private val handshakeWriteKryo: KryoExtra + private val handshakeReadKryo: KryoReader + private val handshakeWriteKryo: KryoWriter private val handshakeSendIdleStrategy: CoroutineIdleStrategy private val writeTimeoutNS = (aeronDriver.lingerNs() * 1.2).toLong() // close enough. Just needs to be slightly longer init { - handshakeReadKryo = serialization.newHandshakeKryo() - handshakeWriteKryo = serialization.newHandshakeKryo() + val maxMessageSize = FrameDescriptor.computeMaxMessageLength(config.publicationTermBufferLength) + + // All registration MUST happen in-order of when the register(*) method was called, otherwise there are problems. + + handshakeReadKryo = KryoReader(maxMessageSize) + handshakeWriteKryo = KryoWriter(maxMessageSize) + + serialization.newHandshakeKryo(handshakeReadKryo) + serialization.newHandshakeKryo(handshakeWriteKryo) + handshakeSendIdleStrategy = config.sendIdleStrategy.clone() } diff --git a/src/dorkbox/network/rmi/messages/MethodRequestSerializer.kt b/src/dorkbox/network/rmi/messages/MethodRequestSerializer.kt index f3f34c49..29f8ad3d 100644 --- a/src/dorkbox/network/rmi/messages/MethodRequestSerializer.kt +++ b/src/dorkbox/network/rmi/messages/MethodRequestSerializer.kt @@ -1,5 +1,5 @@ /* - * Copyright 2020 dorkbox, llc + * Copyright 2023 dorkbox, llc * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -12,7 +12,8 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * + */ +/* * Copyright (c) 2008, Nathan Sweet * All rights reserved. * @@ -42,7 +43,7 @@ import com.esotericsoftware.kryo.io.Output import dorkbox.network.connection.Connection import dorkbox.network.rmi.CachedMethod import dorkbox.network.rmi.RmiUtils -import dorkbox.network.serialization.KryoExtra +import dorkbox.network.serialization.KryoReader import org.agrona.collections.Int2ObjectHashMap import java.lang.reflect.Method @@ -83,7 +84,7 @@ class MethodRequestSerializer(private val methodCache: I val methodIndex = RmiUtils.unpackRight(methodInfo) val isGlobal = input.readBoolean() - kryo as KryoExtra + kryo as KryoReader val cachedMethod = try { methodCache[methodClassId][methodIndex] diff --git a/src/dorkbox/network/rmi/messages/RmiClientSerializer.kt b/src/dorkbox/network/rmi/messages/RmiClientSerializer.kt index 28f06dcd..c7b2460e 100644 --- a/src/dorkbox/network/rmi/messages/RmiClientSerializer.kt +++ b/src/dorkbox/network/rmi/messages/RmiClientSerializer.kt @@ -1,5 +1,5 @@ /* - * Copyright 2020 dorkbox, llc + * Copyright 2023 dorkbox, llc * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,7 +23,7 @@ import dorkbox.network.connection.Connection import dorkbox.network.connection.EndPoint import dorkbox.network.rmi.RmiClient import dorkbox.network.rmi.RmiSupportConnection -import dorkbox.network.serialization.KryoExtra +import dorkbox.network.serialization.KryoReader import java.lang.reflect.Proxy /** @@ -67,7 +67,7 @@ class RmiClientSerializer: Serializer() { val isGlobal = input.readBoolean() val objectId = input.readInt(true) - kryo as KryoExtra + kryo as KryoReader val endPoint: EndPoint = kryo.connection.endPoint as EndPoint return if (isGlobal) { diff --git a/src/dorkbox/network/rmi/messages/RmiServerSerializer.kt b/src/dorkbox/network/rmi/messages/RmiServerSerializer.kt index d6e00724..e9006a6e 100644 --- a/src/dorkbox/network/rmi/messages/RmiServerSerializer.kt +++ b/src/dorkbox/network/rmi/messages/RmiServerSerializer.kt @@ -1,5 +1,5 @@ /* - * Copyright 2020 dorkbox, llc + * Copyright 2023 dorkbox, llc * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -12,7 +12,8 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * + */ +/* * Copyright (c) 2008, Nathan Sweet * All rights reserved. * @@ -41,7 +42,8 @@ import com.esotericsoftware.kryo.io.Output import dorkbox.network.connection.Connection import dorkbox.network.rmi.RemoteObjectStorage import dorkbox.network.rmi.RmiSupportConnection -import dorkbox.network.serialization.KryoExtra +import dorkbox.network.serialization.KryoReader +import dorkbox.network.serialization.KryoWriter /** * This is to manage serializing RMI objects across the wire... @@ -76,7 +78,7 @@ import dorkbox.network.serialization.KryoExtra class RmiServerSerializer : Serializer(false) { override fun write(kryo: Kryo, output: Output, `object`: Any) { - val kryoExtra = kryo as KryoExtra + val kryoExtra = kryo as KryoWriter val connection = kryoExtra.connection val rmi = connection.rmi // have to write what the rmi ID is ONLY. A remote object sent via a connection IS ONLY a connection-scope object! @@ -96,7 +98,7 @@ class RmiServerSerializer : Serializer(false) { } override fun read(kryo: Kryo, input: Input, interfaceClass: Class<*>): Any? { - val kryoExtra = kryo as KryoExtra + val kryoExtra = kryo as KryoReader val rmiId = input.readInt(true) val connection = kryoExtra.connection diff --git a/src/dorkbox/network/serialization/ClassRegistration.kt b/src/dorkbox/network/serialization/ClassRegistration.kt index a9d4fabc..598d40ee 100644 --- a/src/dorkbox/network/serialization/ClassRegistration.kt +++ b/src/dorkbox/network/serialization/ClassRegistration.kt @@ -1,5 +1,5 @@ /* - * Copyright 2020 dorkbox, llc + * Copyright 2023 dorkbox, llc * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,7 +20,7 @@ import com.esotericsoftware.kryo.Serializer import dorkbox.network.connection.Connection import dorkbox.network.rmi.messages.RmiServerSerializer -internal abstract class ClassRegistration(val clazz: Class<*>, val serializer: Serializer<*>? = null, var id: Int = 0) { +internal abstract class ClassRegistration(val clazz: Class<*>, val serializer: Serializer<*>? = null, var id: Int = 0) { companion object { const val IGNORE_REGISTRATION = -1 } @@ -33,7 +33,7 @@ internal abstract class ClassRegistration(val clazz: Cla * If so, we ignore it - any IFACE or IMPL that already has been assigned to an RMI serializer, *MUST* remain an RMI serializer * If this class registration will EVENTUALLY be for RMI, then [ClassRegistrationForRmi] will reassign the serializer */ - open fun register(kryo: KryoExtra, rmi: RmiHolder) { + open fun register(kryo: Kryo, rmi: RmiHolder) { // ClassRegistrationForRmi overrides this method if (id == IGNORE_REGISTRATION) { // we have previously specified that this registration should be ignored! diff --git a/src/dorkbox/network/serialization/ClassRegistration0.kt b/src/dorkbox/network/serialization/ClassRegistration0.kt index 6224d850..035610c6 100644 --- a/src/dorkbox/network/serialization/ClassRegistration0.kt +++ b/src/dorkbox/network/serialization/ClassRegistration0.kt @@ -1,5 +1,5 @@ /* - * Copyright 2020 dorkbox, llc + * Copyright 2023 dorkbox, llc * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,7 +19,7 @@ import com.esotericsoftware.kryo.Kryo import com.esotericsoftware.kryo.Serializer import dorkbox.network.connection.Connection -internal class ClassRegistration0(clazz: Class<*>, serializer: Serializer<*>) : ClassRegistration(clazz, serializer) { +internal class ClassRegistration0(clazz: Class<*>, serializer: Serializer<*>) : ClassRegistration(clazz, serializer) { override fun register(kryo: Kryo) { id = kryo.register(clazz, serializer).id info = "Registered $id -> ${clazz.name} using ${serializer!!.javaClass.name}" diff --git a/src/dorkbox/network/serialization/ClassRegistration1.kt b/src/dorkbox/network/serialization/ClassRegistration1.kt index 8b9113a7..1c23bab3 100644 --- a/src/dorkbox/network/serialization/ClassRegistration1.kt +++ b/src/dorkbox/network/serialization/ClassRegistration1.kt @@ -1,5 +1,5 @@ /* - * Copyright 2020 dorkbox, llc + * Copyright 2023 dorkbox, llc * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,7 +18,7 @@ package dorkbox.network.serialization import com.esotericsoftware.kryo.Kryo import dorkbox.network.connection.Connection -internal class ClassRegistration1(clazz: Class<*>, id: Int) : ClassRegistration(clazz, null, id) { +internal class ClassRegistration1(clazz: Class<*>, id: Int) : ClassRegistration(clazz, null, id) { override fun register(kryo: Kryo) { kryo.register(clazz, id) info = "Registered $id -> (specified) ${clazz.name}" diff --git a/src/dorkbox/network/serialization/ClassRegistration2.kt b/src/dorkbox/network/serialization/ClassRegistration2.kt index a7500613..3fd89713 100644 --- a/src/dorkbox/network/serialization/ClassRegistration2.kt +++ b/src/dorkbox/network/serialization/ClassRegistration2.kt @@ -1,5 +1,5 @@ /* - * Copyright 2020 dorkbox, llc + * Copyright 2023 dorkbox, llc * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,7 +19,7 @@ import com.esotericsoftware.kryo.Kryo import com.esotericsoftware.kryo.Serializer import dorkbox.network.connection.Connection -internal class ClassRegistration2(clazz: Class<*>, serializer: Serializer<*>, id: Int) : ClassRegistration(clazz, serializer, id) { +internal class ClassRegistration2(clazz: Class<*>, serializer: Serializer<*>, id: Int) : ClassRegistration(clazz, serializer, id) { override fun register(kryo: Kryo) { kryo.register(clazz, serializer, id) diff --git a/src/dorkbox/network/serialization/ClassRegistration3.kt b/src/dorkbox/network/serialization/ClassRegistration3.kt index 02f650cf..300b0af4 100644 --- a/src/dorkbox/network/serialization/ClassRegistration3.kt +++ b/src/dorkbox/network/serialization/ClassRegistration3.kt @@ -1,5 +1,5 @@ /* - * Copyright 2020 dorkbox, llc + * Copyright 2023 dorkbox, llc * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,7 +18,7 @@ package dorkbox.network.serialization import com.esotericsoftware.kryo.Kryo import dorkbox.network.connection.Connection -internal open class ClassRegistration3(clazz: Class<*>) : ClassRegistration(clazz) { +internal open class ClassRegistration3(clazz: Class<*>) : ClassRegistration(clazz) { override fun register(kryo: Kryo) { id = kryo.register(clazz).id diff --git a/src/dorkbox/network/serialization/ClassRegistrationForRmi.kt b/src/dorkbox/network/serialization/ClassRegistrationForRmi.kt index 1125ad1a..deb2e6f0 100644 --- a/src/dorkbox/network/serialization/ClassRegistrationForRmi.kt +++ b/src/dorkbox/network/serialization/ClassRegistrationForRmi.kt @@ -1,5 +1,5 @@ /* - * Copyright 2020 dorkbox, llc + * Copyright 2023 dorkbox, llc * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,6 +15,7 @@ */ package dorkbox.network.serialization +import com.esotericsoftware.kryo.Kryo import dorkbox.network.connection.Connection import dorkbox.network.rmi.messages.RmiServerSerializer @@ -47,7 +48,7 @@ import dorkbox.network.rmi.messages.RmiServerSerializer */ internal class ClassRegistrationForRmi(ifaceClass: Class<*>, var implClass: Class<*>?, - serializer: RmiServerSerializer) : ClassRegistration(ifaceClass, serializer) { + serializer: RmiServerSerializer) : ClassRegistration(ifaceClass, serializer) { /** * In general: * @@ -104,7 +105,7 @@ internal class ClassRegistrationForRmi(ifaceClass: Class * send: register IMPL object class with RmiServerSerializer * lookup IMPL object -> rmiID */ - override fun register(kryo: KryoExtra, rmi: RmiHolder) { + override fun register(kryo: Kryo, rmi: RmiHolder) { // we override this, because we ALWAYS will call our RMI registration! if (id == IGNORE_REGISTRATION) { // we have previously specified that this registration should be ignored! diff --git a/src/dorkbox/network/serialization/KryoExtra.kt b/src/dorkbox/network/serialization/KryoReader.kt similarity index 54% rename from src/dorkbox/network/serialization/KryoExtra.kt rename to src/dorkbox/network/serialization/KryoReader.kt index b237255b..4c7785f0 100644 --- a/src/dorkbox/network/serialization/KryoExtra.kt +++ b/src/dorkbox/network/serialization/KryoReader.kt @@ -17,35 +17,63 @@ package dorkbox.network.serialization import com.esotericsoftware.kryo.Kryo import com.esotericsoftware.kryo.io.Input -import com.esotericsoftware.kryo.io.Output import dorkbox.network.connection.Connection +import net.jpountz.lz4.LZ4Factory import org.agrona.DirectBuffer /** * READ and WRITE are exclusive to each other and can be performed in different threads. + * */ -class KryoExtra() : Kryo() { +class KryoReader(maxMessageSize: Int) : Kryo() { + companion object { + internal const val DEBUG = false + + internal val factory = LZ4Factory.fastestInstance() + } + // for kryo serialization private val readerBuffer = AeronInput() - private val writerBuffer = AeronOutput() // crypto + compression have to work with native byte arrays, so here we go... -// private val reader = Input(ABSOLUTE_MAX_SIZE_OBJECT) -// private val writer = Output(ABSOLUTE_MAX_SIZE_OBJECT) -// private val temp = ByteArray(ABSOLUTE_MAX_SIZE_OBJECT) + private val reader = Input(maxMessageSize) // This is unique per connection. volatile/etc is not necessary because it is set/read in the same thread lateinit var connection: CONNECTION // private val secureRandom = SecureRandom() // private var cipher: Cipher? = null -// private val compressor = factory.fastCompressor() -// private val decompressor = factory.fastDecompressor() + private val decompressor = factory.fastDecompressor() + + + // The IV for AES-GCM must be 12 bytes, since it's 4 (salt) + 4 (external counter) + 4 (GCM counter) + // The 12 bytes IV is created during connection registration, and during the AES-GCM crypto, we override the last 8 with this + // counter, which is also transmitted as an optimized int. (which is why it starts at 0, so the transmitted bytes are small) +// private val aes_gcm_iv = atomic(0) + +// /** +// * This is the per-message sequence number. +// * +// * The IV for AES-GCM must be 12 bytes, since it's 4 (salt) + 4 (external counter) + 4 (GCM counter) +// * The 12 bytes IV is created during connection registration, and during the AES-GCM crypto, we override the last 8 with this +// * counter, which is also transmitted as an optimized int. (which is why it starts at 0, so the transmitted bytes are small) +// */ +// fun nextGcmSequence(): Long { +// return aes_gcm_iv.getAndIncrement() +// } +// +// /** +// * @return the AES key. key=32 byte, iv=12 bytes (AES-GCM implementation). +// */ +// fun cryptoKey(): SecretKey { +//// return channelWrapper.cryptoKey() +// } + // // companion object { -// private const val ABSOLUTE_MAX_SIZE_OBJECT = 500000 // by default, this is about 500k -// private const val DEBUG = false +// +// // // // snappycomp : 7.534 micros/op; 518.5 MB/s (output: 55.1%) // // snappyuncomp : 1.391 micros/op; 2808.1 MB/s @@ -65,37 +93,6 @@ class KryoExtra() : Kryo() { // } // } - /** - * NOTE: THIS CANNOT BE USED FOR ANYTHING RELATED TO RMI! - * - * OUTPUT: - * ++++++++++++++++++++++++++ - * + class and object bytes + - * ++++++++++++++++++++++++++ - */ - @Throws(Exception::class) - fun write(message: Any): AeronOutput { - writerBuffer.reset() - writeClassAndObject(writerBuffer, message) - return writerBuffer - } - - /** - * OUTPUT: - * ++++++++++++++++++++++++++ - * + class and object bytes + - * ++++++++++++++++++++++++++ - */ - @Throws(Exception::class) - fun write(connection: CONNECTION, message: Any): AeronOutput { - // required by RMI and some serializers to determine which connection wrote (or has info about) this object - this.connection = connection - - writerBuffer.reset() - writeClassAndObject(writerBuffer, message) - return writerBuffer - } - /** * NOTE: THIS CANNOT BE USED FOR ANYTHING RELATED TO RMI! * @@ -153,35 +150,6 @@ class KryoExtra() : Kryo() { //////////////// //////////////// - /** - * NOTE: THIS CANNOT BE USED FOR ANYTHING RELATED TO RMI! - * - * OUTPUT: - * ++++++++++++++++++++++++++ - * + class and object bytes + - * ++++++++++++++++++++++++++ - */ - fun write(writer: Output, message: Any) { - // write the object to the NORMAL output buffer! - writer.reset() - writeClassAndObject(writer, message) - } - - /** - * OUTPUT: - * ++++++++++++++++++++++++++ - * + class and object bytes + - * ++++++++++++++++++++++++++ - */ - private fun write(connection: CONNECTION, writer: Output, message: Any) { - // required by RMI and some serializers to determine which connection wrote (or has info about) this object - this.connection = connection - - // write the object to the NORMAL output buffer! - writer.reset() - writeClassAndObject(writer, message) - } - /** * NOTE: THIS CANNOT BE USED FOR ANYTHING RELATED TO RMI! * @@ -218,95 +186,7 @@ class KryoExtra() : Kryo() { val dataLength = readerBuffer.readVarInt(true) return readerBuffer.readBytes(dataLength) } -// -// /** -// * NOTE: THIS CANNOT BE USED FOR ANYTHING RELATED TO RMI! -// * -// * BUFFER: -// * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ -// * + uncompressed length (1-4 bytes) + compressed data + -// * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ -// * -// * COMPRESSED DATA: -// * ++++++++++++++++++++++++++ -// * + class and object bytes + -// * ++++++++++++++++++++++++++ -// */ -// fun writeCompressed(logger: Logger, output: Output, message: Any) { -// // write the object to a TEMP buffer! this will be compressed later -// write(writer, message) -// -// // save off how much data the object took -// val length = writer.position() -// val maxCompressedLength = compressor.maxCompressedLength(length) -// -// ////////// compressing data -// // we ALWAYS compress our data stream -- because of how AES-GCM pads data out, the small input (that would result in a larger -// // output), will be negated by the increase in size by the encryption -// val compressOutput = temp -// -// // LZ4 compress. -// val compressedLength = compressor.compress(writer.buffer, 0, length, compressOutput, 0, maxCompressedLength) -// -// if (DEBUG) { -// val orig = Sys.bytesToHex(writer.buffer, 0, length) use String.toHexBytes() instead -// val compressed = Sys.bytesToHex(compressOutput, 0, compressedLength) -// logger.error(OS.LINE_SEPARATOR + -// "ORIG: (" + length + ")" + OS.LINE_SEPARATOR + orig + -// OS.LINE_SEPARATOR + -// "COMPRESSED: (" + compressedLength + ")" + OS.LINE_SEPARATOR + compressed) -// } -// -// // now write the ORIGINAL (uncompressed) length. This is so we can use the FAST decompress version -// output.writeInt(length, true) -// -// // have to copy over the orig data, because we used the temp buffer. Also have to account for the length of the uncompressed size -// output.writeBytes(compressOutput, 0, compressedLength) -// } -// -// /** -// * BUFFER: -// * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ -// * + uncompressed length (1-4 bytes) + compressed data + -// * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ -// * -// * COMPRESSED DATA: -// * ++++++++++++++++++++++++++ -// * + class and object bytes + -// * ++++++++++++++++++++++++++ -// */ -// fun writeCompressed(logger: Logger, connection: Connection, output: Output, message: Any) { -// // write the object to a TEMP buffer! this will be compressed later -// write(connection, writer, message) -// -// // save off how much data the object took -// val length = writer.position() -// val maxCompressedLength = compressor.maxCompressedLength(length) -// -// ////////// compressing data -// // we ALWAYS compress our data stream -- because of how AES-GCM pads data out, the small input (that would result in a larger -// // output), will be negated by the increase in size by the encryption -// val compressOutput = temp -// -// // LZ4 compress. -// val compressedLength = compressor.compress(writer.buffer, 0, length, compressOutput, 0, maxCompressedLength) -// -// if (DEBUG) { -// val orig = Sys.bytesToHex(writer.buffer, 0, length) -// val compressed = Sys.bytesToHex(compressOutput, 0, compressedLength) -// logger.error(OS.LINE_SEPARATOR + -// "ORIG: (" + length + ")" + OS.LINE_SEPARATOR + orig + -// OS.LINE_SEPARATOR + -// "COMPRESSED: (" + compressedLength + ")" + OS.LINE_SEPARATOR + compressed) -// } -// -// // now write the ORIGINAL (uncompressed) length. This is so we can use the FAST decompress version -// output.writeInt(length, true) -// -// // have to copy over the orig data, because we used the temp buffer. Also have to account for the length of the uncompressed size -// output.writeBytes(compressOutput, 0, compressedLength) -// } -// + // /** // * NOTE: THIS CANNOT BE USED FOR ANYTHING RELATED TO RMI! // * @@ -414,91 +294,7 @@ class KryoExtra() : Kryo() { // return read(connection, reader) // } // -// /** -// * BUFFER: -// * +++++++++++++++++++++++++++++++ -// * + IV (12) + encrypted data + -// * +++++++++++++++++++++++++++++++ -// * -// * ENCRYPTED DATA: -// * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ -// * + uncompressed length (1-4 bytes) + compressed data + -// * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ -// * -// * COMPRESSED DATA: -// * ++++++++++++++++++++++++++ -// * + class and object bytes + -// * ++++++++++++++++++++++++++ -// */ -// fun writeCrypto(logger: Logger, connection: Connection_, buffer: ByteBuf, message: Any) { -// // write the object to a TEMP buffer! this will be compressed later -// write(connection, writer, message) -// -// // save off how much data the object took -// val length = writer.position() -// val maxCompressedLength = compressor.maxCompressedLength(length) -// -// ////////// compressing data -// // we ALWAYS compress our data stream -- because of how AES-GCM pads data out, the small input (that would result in a larger -// // output), will be negated by the increase in size by the encryption -// val compressOutput = temp -// -// // LZ4 compress. Offset by 4 in the dest array so we have room for the length -// val compressedLength = compressor.compress(writer.buffer, 0, length, compressOutput, 4, maxCompressedLength) -// if (DEBUG) { -// val orig = ByteBufUtil.hexDump(writer.buffer, 0, length) -// val compressed = ByteBufUtil.hexDump(compressOutput, 4, compressedLength) -// logger.error(OS.LINE_SEPARATOR + -// "ORIG: (" + length + ")" + OS.LINE_SEPARATOR + orig + -// OS.LINE_SEPARATOR + -// "COMPRESSED: (" + compressedLength + ")" + OS.LINE_SEPARATOR + compressed) -// } -// -// // now write the ORIGINAL (uncompressed) length. This is so we can use the FAST decompress version -// val lengthLength = OptimizeUtilsByteArray.intLength(length, true) -// -// // this is where we start writing the length data, so that the end of this lines up with the compressed data -// val start = 4 - lengthLength -// OptimizeUtilsByteArray.writeInt(compressOutput, length, true, start) -// -// // now compressOutput contains "uncompressed length + data" -// val compressedArrayLength = lengthLength + compressedLength -// -// -// /////// encrypting data. -// val cryptoKey = connection.cryptoKey() -// val iv = ByteArray(IV_LENGTH_BYTE) // NEVER REUSE THIS IV WITH SAME KEY -// secureRandom.nextBytes(iv) -// val parameterSpec = GCMParameterSpec(TAG_LENGTH_BIT, iv) // 128 bit auth tag length -// try { -// cipher!!.init(Cipher.ENCRYPT_MODE, cryptoKey, parameterSpec) -// } catch (e: Exception) { -// throw IOException("Unable to AES encrypt the data", e) -// } -// -// // we REUSE the writer buffer! (since that data is now compressed in a different array) -// val encryptedLength: Int -// encryptedLength = try { -// cipher!!.doFinal(compressOutput, start, compressedArrayLength, writer.buffer, 0) -// } catch (e: Exception) { -// throw IOException("Unable to AES encrypt the data", e) -// } -// -// // write out our IV -// buffer.writeBytes(iv, 0, IV_LENGTH_BYTE) -// Arrays.fill(iv, 0.toByte()) // overwrite the IV with zeros so we can't leak this value -// -// // have to copy over the orig data, because we used the temp buffer -// buffer.writeBytes(writer.buffer, 0, encryptedLength) -// if (DEBUG) { -// val ivString = ByteBufUtil.hexDump(iv, 0, IV_LENGTH_BYTE) -// val crypto = ByteBufUtil.hexDump(writer.buffer, 0, encryptedLength) -// logger.error(OS.LINE_SEPARATOR + -// "IV: (12)" + OS.LINE_SEPARATOR + ivString + -// OS.LINE_SEPARATOR + -// "CRYPTO: (" + encryptedLength + ")" + OS.LINE_SEPARATOR + crypto) -// } -// } + /** * BUFFER: diff --git a/src/dorkbox/network/serialization/KryoWriter.kt b/src/dorkbox/network/serialization/KryoWriter.kt new file mode 100644 index 00000000..5c2bba82 --- /dev/null +++ b/src/dorkbox/network/serialization/KryoWriter.kt @@ -0,0 +1,349 @@ +/* + * Copyright 2023 dorkbox, llc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dorkbox.network.serialization + +import com.esotericsoftware.kryo.Kryo +import com.esotericsoftware.kryo.io.Output +import dorkbox.bytes.toHexString +import dorkbox.network.connection.Connection +import dorkbox.network.connection.CryptoManagement +import dorkbox.os.OS +import mu.KLogger +import net.jpountz.lz4.LZ4Factory +import javax.crypto.Cipher + +/** + * READ and WRITE are exclusive to each other and can be performed in different threads. + * + */ +open class KryoWriter(maxMessageSize: Int) : Kryo() { + companion object { + internal const val DEBUG = false + + internal val factory = LZ4Factory.fastestInstance() + } + + // for kryo serialization + private val writerBuffer = AeronOutput() + + // crypto + compression have to work with native byte arrays, so here we go... + private val writer = Output(maxMessageSize) + private val temp = ByteArray(maxMessageSize) + + // This is unique per connection. volatile/etc is not necessary because it is set/read in the same thread + lateinit var connection: CONNECTION + + + private val cipher = Cipher.getInstance(CryptoManagement.AES_ALGORITHM) + +// private val secureRandom = SecureRandom() +// private var cipher: Cipher? = null + private val compressor = factory.fastCompressor() + + + // The IV for AES-GCM must be 12 bytes, since it's 4 (salt) + 4 (external counter) + 4 (GCM counter) + // The 12 bytes IV is created during connection registration, and during the AES-GCM crypto, we override the last 8 with this + // counter, which is also transmitted as an optimized int. (which is why it starts at 0, so the transmitted bytes are small) +// private val aes_gcm_iv = atomic(0) + +// /** +// * This is the per-message sequence number. +// * +// * The IV for AES-GCM must be 12 bytes, since it's 4 (salt) + 4 (external counter) + 4 (GCM counter) +// * The 12 bytes IV is created during connection registration, and during the AES-GCM crypto, we override the last 8 with this +// * counter, which is also transmitted as an optimized int. (which is why it starts at 0, so the transmitted bytes are small) +// */ +// fun nextGcmSequence(): Long { +// return aes_gcm_iv.getAndIncrement() +// } +// +// /** +// * @return the AES key. key=32 byte, iv=12 bytes (AES-GCM implementation). +// */ +// fun cryptoKey(): SecretKey { +//// return channelWrapper.cryptoKey() +// } + + +// +// companion object { +// +// +// +// // snappycomp : 7.534 micros/op; 518.5 MB/s (output: 55.1%) +// // snappyuncomp : 1.391 micros/op; 2808.1 MB/s +// // lz4comp : 6.210 micros/op; 629.0 MB/s (output: 55.4%) +// // lz4uncomp : 0.641 micros/op; 6097.9 MB/s +// private val factory = LZ4Factory.fastestInstance() +// private const val ALGORITHM = "AES/GCM/NoPadding" +// private const val TAG_LENGTH_BIT = 128 +// private const val IV_LENGTH_BYTE = 12 +// } + +// init { +// cipher = try { +// Cipher.getInstance(ALGORITHM) +// } catch (e: Exception) { +// throw IllegalStateException("could not get cipher instance", e) +// } +// } + + /** + * NOTE: THIS CANNOT BE USED FOR ANYTHING RELATED TO RMI! + * + * OUTPUT: + * ++++++++++++++++++++++++++ + * + class and object bytes + + * ++++++++++++++++++++++++++ + */ + @Throws(Exception::class) + fun write(message: Any): AeronOutput { + writerBuffer.reset() + writeClassAndObject(writerBuffer, message) + return writerBuffer + } + + /** + * OUTPUT: + * ++++++++++++++++++++++++++ + * + class and object bytes + + * ++++++++++++++++++++++++++ + */ + @Throws(Exception::class) + fun write(connection: CONNECTION, message: Any): AeronOutput { + // required by RMI and some serializers to determine which connection wrote (or has info about) this object + this.connection = connection + + writerBuffer.reset() + writeClassAndObject(writerBuffer, message) + return writerBuffer + } + + //////////////// + //////////////// + //////////////// + // for more complicated writes, sadly, we have to deal DIRECTLY with byte arrays + //////////////// + //////////////// + //////////////// + + /** + * NOTE: THIS CANNOT BE USED FOR ANYTHING RELATED TO RMI! + * + * OUTPUT: + * ++++++++++++++++++++++++++ + * + class and object bytes + + * ++++++++++++++++++++++++++ + */ + fun write(writer: Output, message: Any) { + // write the object to the NORMAL output buffer! + writer.reset() + writeClassAndObject(writer, message) + } + + /** + * OUTPUT: + * ++++++++++++++++++++++++++ + * + class and object bytes + + * ++++++++++++++++++++++++++ + */ + private fun write(connection: CONNECTION, writer: Output, message: Any) { + // required by RMI and some serializers to determine which connection wrote (or has info about) this object + this.connection = connection + + // write the object to the NORMAL output buffer! + writer.reset() + writeClassAndObject(writer, message) + } + + /** + * NOTE: THIS CANNOT BE USED FOR ANYTHING RELATED TO RMI! + * + * BUFFER: + * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + * + uncompressed length (1-4 bytes) + compressed data + + * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + * + * COMPRESSED DATA: + * ++++++++++++++++++++++++++ + * + class and object bytes + + * ++++++++++++++++++++++++++ + */ + fun writeCompressed(logger: KLogger, output: Output, message: Any) { + // write the object to a TEMP buffer! this will be compressed later + write(writer, message) + + // save off how much data the object took + val length = writer.position() + val maxCompressedLength = compressor.maxCompressedLength(length) + + ////////// compressing data + // we ALWAYS compress our data stream -- because of how AES-GCM pads data out, the small input (that would result in a larger + // output), will be negated by the increase in size by the encryption + val compressOutput = temp + + + // LZ4 compress. + val compressedLength = compressor.compress(writer.buffer, 0, length, compressOutput, 0, maxCompressedLength) + + if (DEBUG) { + val orig = writer.buffer.toHexString() + val compressed = compressOutput.toHexString() + logger.error( + OS.LINE_SEPARATOR + + "ORIG: (" + length + ")" + OS.LINE_SEPARATOR + orig + + OS.LINE_SEPARATOR + + "COMPRESSED: (" + compressedLength + ")" + OS.LINE_SEPARATOR + compressed) + } + + // now write the ORIGINAL (uncompressed) length. This is so we can use the FAST decompress version + output.writeInt(length, true) + + // have to copy over the orig data, because we used the temp buffer. Also have to account for the length of the uncompressed size + output.writeBytes(compressOutput, 0, compressedLength) + } +// +// /** +// * BUFFER: +// * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ +// * + uncompressed length (1-4 bytes) + compressed data + +// * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ +// * +// * COMPRESSED DATA: +// * ++++++++++++++++++++++++++ +// * + class and object bytes + +// * ++++++++++++++++++++++++++ +// */ +// fun writeCompressed(logger: Logger, connection: Connection, output: Output, message: Any) { +// // write the object to a TEMP buffer! this will be compressed later +// write(connection, writer, message) +// +// // save off how much data the object took +// val length = writer.position() +// val maxCompressedLength = compressor.maxCompressedLength(length) +// +// ////////// compressing data +// // we ALWAYS compress our data stream -- because of how AES-GCM pads data out, the small input (that would result in a larger +// // output), will be negated by the increase in size by the encryption +// val compressOutput = temp +// +// // LZ4 compress. +// val compressedLength = compressor.compress(writer.buffer, 0, length, compressOutput, 0, maxCompressedLength) +// +// if (DEBUG) { +// val orig = Sys.bytesToHex(writer.buffer, 0, length) +// val compressed = Sys.bytesToHex(compressOutput, 0, compressedLength) +// logger.error(OS.LINE_SEPARATOR + +// "ORIG: (" + length + ")" + OS.LINE_SEPARATOR + orig + +// OS.LINE_SEPARATOR + +// "COMPRESSED: (" + compressedLength + ")" + OS.LINE_SEPARATOR + compressed) +// } +// +// // now write the ORIGINAL (uncompressed) length. This is so we can use the FAST decompress version +// output.writeInt(length, true) +// +// // have to copy over the orig data, because we used the temp buffer. Also have to account for the length of the uncompressed size +// output.writeBytes(compressOutput, 0, compressedLength) +// } +// +// +// +// /** +// * BUFFER: +// * +++++++++++++++++++++++++++++++ +// * + IV (12) + encrypted data + +// * +++++++++++++++++++++++++++++++ +// * +// * ENCRYPTED DATA: +// * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ +// * + uncompressed length (1-4 bytes) + compressed data + +// * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ +// * +// * COMPRESSED DATA: +// * ++++++++++++++++++++++++++ +// * + class and object bytes + +// * ++++++++++++++++++++++++++ +// */ +// fun writeCrypto(logger: Logger, connection: Connection_, buffer: ByteBuf, message: Any) { +// // write the object to a TEMP buffer! this will be compressed later +// write(connection, writer, message) +// +// // save off how much data the object took +// val length = writer.position() +// val maxCompressedLength = compressor.maxCompressedLength(length) +// +// ////////// compressing data +// // we ALWAYS compress our data stream -- because of how AES-GCM pads data out, the small input (that would result in a larger +// // output), will be negated by the increase in size by the encryption +// val compressOutput = temp +// +// // LZ4 compress. Offset by 4 in the dest array so we have room for the length +// val compressedLength = compressor.compress(writer.buffer, 0, length, compressOutput, 4, maxCompressedLength) +// if (DEBUG) { +// val orig = ByteBufUtil.hexDump(writer.buffer, 0, length) +// val compressed = ByteBufUtil.hexDump(compressOutput, 4, compressedLength) +// logger.error(OS.LINE_SEPARATOR + +// "ORIG: (" + length + ")" + OS.LINE_SEPARATOR + orig + +// OS.LINE_SEPARATOR + +// "COMPRESSED: (" + compressedLength + ")" + OS.LINE_SEPARATOR + compressed) +// } +// +// // now write the ORIGINAL (uncompressed) length. This is so we can use the FAST decompress version +// val lengthLength = OptimizeUtilsByteArray.intLength(length, true) +// +// // this is where we start writing the length data, so that the end of this lines up with the compressed data +// val start = 4 - lengthLength +// OptimizeUtilsByteArray.writeInt(compressOutput, length, true, start) +// +// // now compressOutput contains "uncompressed length + data" +// val compressedArrayLength = lengthLength + compressedLength +// +// +// /////// encrypting data. +// val cryptoKey = connection.cryptoKey() +// val iv = ByteArray(IV_LENGTH_BYTE) // NEVER REUSE THIS IV WITH SAME KEY +// secureRandom.nextBytes(iv) +// val parameterSpec = GCMParameterSpec(TAG_LENGTH_BIT, iv) // 128 bit auth tag length +// try { +// cipher!!.init(Cipher.ENCRYPT_MODE, cryptoKey, parameterSpec) +// } catch (e: Exception) { +// throw IOException("Unable to AES encrypt the data", e) +// } +// +// // we REUSE the writer buffer! (since that data is now compressed in a different array) +// val encryptedLength: Int +// encryptedLength = try { +// cipher!!.doFinal(compressOutput, start, compressedArrayLength, writer.buffer, 0) +// } catch (e: Exception) { +// throw IOException("Unable to AES encrypt the data", e) +// } +// +// // write out our IV +// buffer.writeBytes(iv, 0, IV_LENGTH_BYTE) +// Arrays.fill(iv, 0.toByte()) // overwrite the IV with zeros so we can't leak this value +// +// // have to copy over the orig data, because we used the temp buffer +// buffer.writeBytes(writer.buffer, 0, encryptedLength) +// if (DEBUG) { +// val ivString = ByteBufUtil.hexDump(iv, 0, IV_LENGTH_BYTE) +// val crypto = ByteBufUtil.hexDump(writer.buffer, 0, encryptedLength) +// logger.error(OS.LINE_SEPARATOR + +// "IV: (12)" + OS.LINE_SEPARATOR + ivString + +// OS.LINE_SEPARATOR + +// "CRYPTO: (" + encryptedLength + ")" + OS.LINE_SEPARATOR + crypto) +// } +// } +} diff --git a/src/dorkbox/network/serialization/Serialization.kt b/src/dorkbox/network/serialization/Serialization.kt index e43bed15..8eea11c5 100644 --- a/src/dorkbox/network/serialization/Serialization.kt +++ b/src/dorkbox/network/serialization/Serialization.kt @@ -35,6 +35,9 @@ import dorkbox.network.ping.PingSerializer import dorkbox.network.rmi.CachedMethod import dorkbox.network.rmi.RmiUtils import dorkbox.network.rmi.messages.* +import dorkbox.objectPool.BoundedPoolObject +import dorkbox.objectPool.ObjectPool +import dorkbox.objectPool.Pool import dorkbox.os.OS import dorkbox.serializers.* import kotlinx.atomicfu.AtomicBoolean @@ -58,12 +61,12 @@ import kotlin.coroutines.Continuation -// Observability issues: make sure that we know WHAT connection is causing serialization errors when they occur! -// ASYC isues: RMI can timeout when OTHER rmi connections happen! EACH RMI NEEDS TO BE SEPARATE IN THE IO DISPATCHER +// Observability issues: make sure that we know WHAT connection is causing serialization errors when they occur! +// ASYC isues: RMI can timeout when OTHER rmi connections happen! EACH RMI NEEDS TO BE SEPARATE IN THE IO DISPATCHER /** * Threads reading/writing at the same time a single instance of kryo. it is possible to use a single kryo with the use of - * synchronize, however - that defeats the point of having multi-threaded serialization. + * synchronize, however - that defeats the point of having multithreaded serialization. * * Additionally, this serialization manager will register the entire class+interface hierarchy for an object. If you want to specify a * serialization scheme for a specific class in an objects hierarchy, you must register that first. @@ -97,7 +100,7 @@ open class Serialization(private val references: Boolean open class RmiSupport internal constructor( private val initialized: AtomicBoolean, - private val classesToRegister: MutableList>, + private val classesToRegister: MutableList, private val rmiServerSerializer: RmiServerSerializer ) { /** @@ -130,14 +133,15 @@ open class Serialization(private val references: Boolean } private lateinit var logger: KLogger + private var maxMessageSize: Int = 500_000 private var initialized = atomic(false) // used by operations performed during kryo initialization, which are by default package access (since it's an anon-inner class) // All registration MUST happen in-order of when the register(*) method was called, otherwise there are problems. // Object checking is performed during actual registration. - private val classesToRegister = mutableListOf>() - private lateinit var finalClassRegistrations: Array> + private val classesToRegister = mutableListOf() + private lateinit var finalClassRegistrations: Array private lateinit var savedRegistrationDetails: ByteArray // the purpose of the method cache, is to accelerate looking up methods for specific class @@ -286,10 +290,8 @@ open class Serialization(private val references: Boolean /** * Kryo specifically for handshakes */ - internal fun newHandshakeKryo(): KryoExtra { + internal fun newHandshakeKryo(kryo: Kryo) { // All registration MUST happen in-order of when the register(*) method was called, otherwise there are problems. - val kryo = KryoExtra() - kryo.instantiatorStrategy = instantiatorStrategy kryo.references = references @@ -299,14 +301,12 @@ open class Serialization(private val references: Boolean kryo.register(ByteArray::class.java) kryo.register(HandshakeMessage::class.java) - - return kryo } /** * called as the first thing inside when initializing the classesToRegister */ - internal fun initGlobalKryo(): KryoExtra { + private fun newGlobalKryo(kryo: Kryo) { // NOTE: classesRegistrations.forEach will be called after serialization init! // NOTE: All registration MUST happen in-order of when the register(*) method was called, otherwise there are problems. @@ -316,9 +316,6 @@ open class Serialization(private val references: Boolean // Note that this is very inefficient and should be avoided if possible. // val javaSerializer = JavaSerializer() - - val kryo = KryoExtra() - kryo.instantiatorStrategy = instantiatorStrategy kryo.references = references @@ -397,14 +394,6 @@ open class Serialization(private val references: Boolean UnmodifiableCollectionsSerializer.registerSerializers(kryo) SynchronizedCollectionsSerializer.registerSerializers(kryo) - // TODO: this is for diffie hellmen handshake stuff! -// serialization.register(IESParameters::class.java, IesParametersSerializer()) -// serialization.register(IESWithCipherParameters::class.java, IesWithCipherParametersSerializer()) - // TODO: fix kryo to work the way we want, so we can register interfaces + serializers with kryo -// serialization.register(XECPublicKey::class.java, XECPublicKeySerializer()) -// serialization.register(XECPrivateKey::class.java, XECPrivateKeySerializer()) -// serialization.register(Message::class.java) // must use full package name! - // RMI stuff! kryo.register(ConnectionObjectCreateRequest::class.java) kryo.register(ConnectionObjectCreateResponse::class.java) @@ -437,8 +426,6 @@ open class Serialization(private val references: Boolean kryo.register(Reserved7::class.java) kryo.register(Reserved8::class.java) kryo.register(Reserved9::class.java) - - return kryo } /** @@ -446,8 +433,9 @@ open class Serialization(private val references: Boolean * * This is to prevent (and recognize) out-of-order class/serializer registration. If an ID is already in use by a different type, an exception is thrown. */ - internal fun finishInit(type: Class<*>, kryo: KryoExtra) { + internal fun finishInit(type: Class<*>, maxMessageSize: Int) { logger = KotlinLogging.logger(type.simpleName) + this.maxMessageSize = maxMessageSize val firstInitialization = initialized.compareAndSet(expect = false, update = true) @@ -456,6 +444,9 @@ open class Serialization(private val references: Boolean throw IllegalArgumentException("Unable to initialize object serialization more than once!") } + val kryo = KryoWriter(maxMessageSize) + newGlobalKryo(kryo) + initializeRegistrations(kryo, classesToRegister) classesToRegister.clear() // don't need to keep a reference, since this can never be reinitialized. @@ -484,30 +475,27 @@ open class Serialization(private val references: Boolean * * @return true if initialization was successful, false otherwise. DOES NOT CATCH EXCEPTIONS EXTERNALLY */ - internal fun finishClientConnect(kryoRegistrationDetailsFromServer: ByteArray): KryoExtra? { + internal fun finishClientConnect(kryoRegistrationDetailsFromServer: ByteArray, maxMessageSize: Int) { + val readKryo = KryoReader(maxMessageSize) + val writeKryo = KryoWriter(maxMessageSize) + newGlobalKryo(readKryo) + newGlobalKryo(writeKryo) + // we self initialize our registrations, THEN we compare them to the server. - val kryo = initGlobalKryo() + val newRegistrations = initializeRegistrationsForClient(kryoRegistrationDetailsFromServer, classesToRegister, readKryo) + ?: throw Exception("Unable to initialize class registration information from the server") - val newRegistrations = initializeRegistrationsForClient(kryoRegistrationDetailsFromServer, classesToRegister) ?: return null + initializeRegistrations(writeKryo, newRegistrations) - try { - initializeRegistrations(kryo, newRegistrations) + // NOTE: we MUST be super careful to never modify `classesToRegister`!! + // NOTE: DO NOT CLEAR THIS WITH CLIENTS, THEY HAVE TO REBUILD EVERY TIME WITH A NEW CONNECTION! + // classesToRegister.clear() // don't need to keep a reference, since this can never be reinitialized. - // NOTE: we MUST be super careful to never modify `classesToRegister`!! - // NOTE: DO NOT CLEAR THIS WITH CLIENTS, THEY HAVE TO REBUILD EVERY TIME WITH A NEW CONNECTION! - // classesToRegister.clear() // don't need to keep a reference, since this can never be reinitialized. - - if (logger.isTraceEnabled) { - // log the in-order output first - finalClassRegistrations.forEach { classRegistration -> - logger.trace(classRegistration.info) - } + if (logger.isTraceEnabled) { + // log the in-order output first + finalClassRegistrations.forEach { classRegistration -> + logger.trace(classRegistration.info) } - - return kryo - } catch (e: Exception) { - logger.error(e) { "Unable to correctly register classes for serialization during client connection!" } - return null } } @@ -515,8 +503,8 @@ open class Serialization(private val references: Boolean /** * @throws IllegalArgumentException if there is are too many RMI methods OR if a problem setting up the registration details */ - private fun initializeRegistrations(kryo: KryoExtra, classesToRegister: List>) { - val mergedRegistrations = mergeClassRegistrations(kryo, classesToRegister) + private fun initializeRegistrations(kryo: KryoWriter, classesToRegister: List) { + val mergedRegistrations = mergeClassRegistrations(classesToRegister, kryo) // make sure our RMI cached methods have been initialized initializeRmiMethodCache(mergedRegistrations, kryo) @@ -531,7 +519,7 @@ open class Serialization(private val references: Boolean /** * Merge all the registrations (since we can have registrations overwrite newer/specific registrations based on ID) */ - private fun mergeClassRegistrations(kryo: KryoExtra, classesToRegister: List>): List> { + private fun mergeClassRegistrations(classesToRegister: List, kryo: Kryo): List { // check to see which interfaces are mapped to RMI (otherwise, the interface requires a serializer) // note, we have to check to make sure a class is not ALREADY registered for RMI before it is registered again @@ -540,7 +528,7 @@ open class Serialization(private val references: Boolean } // in order to get the ID's, these have to be registered with a kryo instance! - val mergedRegistrations = mutableListOf>() + val mergedRegistrations = mutableListOf() classesToRegister.forEach { registration -> val id = registration.id @@ -580,13 +568,13 @@ open class Serialization(private val references: Boolean * * @throws IllegalArgumentException if there are too many RMI methods */ - private fun initializeRmiMethodCache(classesToRegister: List>, kryo: KryoExtra) { + private fun initializeRmiMethodCache(classesToRegister: List, kryo: Kryo) { classesToRegister.forEach { classRegistration -> // we should cache RMI methods! We don't always know if something is RMI or not (from just how things are registered...) // so it is super trivial to map out all possible, relevant types val kryoId = classRegistration.id - if (classRegistration is ClassRegistrationForRmi) { + if (classRegistration is ClassRegistrationForRmi<*>) { // on the "RMI server" (aka, where the object lives) side, there will be an interface + implementation! val implClass = classRegistration.implClass @@ -621,7 +609,7 @@ open class Serialization(private val references: Boolean /** * @throws IllegalArgumentException if there is a problem setting up the registration details */ - private fun createRegistrationDetails(classesToRegister: List>, kryo: KryoExtra): ByteArray { + private fun createRegistrationDetails(classesToRegister: List, kryo: KryoWriter): ByteArray { // now create the registration details, used to validate that the client/server have the EXACT same class registration setup val registrationDetails = arrayListOf>() @@ -651,24 +639,22 @@ open class Serialization(private val references: Boolean */ @Suppress("UNCHECKED_CAST") private fun initializeRegistrationsForClient( - kryoRegistrationDetailsFromServer: ByteArray, - classesToRegister: List> - ): MutableList>? { + kryoRegistrationDetailsFromServer: ByteArray, classesToRegister: List, kryo: KryoReader + ): MutableList? { // we have to allow CUSTOM classes to register (where the order does not matter), so that if the CLIENT is the RMI-SERVER, it can // specify IMPL classes for RMI. classesToRegister.forEach { registration -> - require(registration is ClassRegistrationForRmi) { "Unable to register a *class* by itself. This is only permitted on the CLIENT for RMI. " + + require(registration is ClassRegistrationForRmi<*>) { "Unable to register a *class* by itself. This is only permitted on the CLIENT for RMI. " + "To fix this, remove xx.register(${registration.clazz.name})" } } @Suppress("UNCHECKED_CAST") val classesToRegisterForRmi = listOf(*classesToRegister.toTypedArray()) as List> - val kryo = initGlobalKryo() val input = AeronInput(kryoRegistrationDetailsFromServer) val clientClassRegistrations = kryo.read(input) as Array> - val newRegistrations = mutableListOf>() + val newRegistrations = mutableListOf() val maker = kryo.instantiatorStrategy val rmiSerializer = rmiServerSerializer @@ -754,12 +740,48 @@ open class Serialization(private val references: Boolean return newRegistrations } + private val writeKryos: Pool> = ObjectPool.nonBlockingBounded( + poolObject = object : BoundedPoolObject>() { + override fun newInstance(): KryoWriter { + return newWriteKryo(maxMessageSize) + } + }, + maxSize = OS.optimumNumberOfThreads * 2 + ) + + fun getWriteKryo(): KryoWriter { + return writeKryos.take() + } + + fun returnWriteKryo(kryo: KryoWriter) { + writeKryos.put(kryo) + } + /** * NOTE: A kryo instance CANNOT be re-used until after it's buffer is flushed to the network! * * @return takes a kryo instance from the pool, or creates one if the pool was empty */ - fun initKryo(kryo: KryoExtra = initGlobalKryo()): KryoExtra { + fun newReadKryo(maxMessageSize: Int): KryoReader { + val kryo = KryoReader(maxMessageSize) + newGlobalKryo(kryo) + + // the final list of all registrations in the EndPoint. This cannot change for the serer. + finalClassRegistrations.forEach { registration -> + registration.register(kryo, rmiHolder) + } + + return kryo + } + /** + * NOTE: A kryo instance CANNOT be re-used until after it's buffer is flushed to the network! + * + * @return takes a kryo instance from the pool, or creates one if the pool was empty + */ + fun newWriteKryo(maxMessageSize: Int): KryoWriter { + val kryo = KryoWriter(maxMessageSize) + newGlobalKryo(kryo) + // the final list of all registrations in the EndPoint. This cannot change for the serer. finalClassRegistrations.forEach { registration -> registration.register(kryo, rmiHolder)