Split kryo TYPES into read/write types, so usage is very clear. Now use a kryo pool for concurrent serialization

This commit is contained in:
Robinson 2023-07-12 14:08:12 +02:00
parent d3c3bf50d6
commit 6bf870bd7b
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
17 changed files with 637 additions and 442 deletions

View File

@ -718,25 +718,29 @@ open class Client<CONNECTION : Connection>(
//// RMI //// RMI
/////////////// ///////////////
// we set up our kryo information once we connect to a server (using the server's kryo registration details) try {
val kryoConfiguredFromServer = serialization.finishClientConnect(connectionInfo.kryoRegistrationDetails) // only have ot do one
if (kryoConfiguredFromServer == null) { serialization.finishClientConnect(connectionInfo.kryoRegistrationDetails, maxMessageSize)
} catch (e: Exception) {
handshakeConnection.close() handshakeConnection.close()
// because we are getting the class registration details from the SERVER, this should never be the case. // because we are getting the class registration details from the SERVER, this should never be the case.
// It is still and edge case where the reconstruction of the registration details fails (maybe because of custom serializers) // It is still and edge case where the reconstruction of the registration details fails (maybe because of custom serializers)
val exception = if (handshakeConnection.pubSub.isIpc) { val exception = if (handshakeConnection.pubSub.isIpc) {
ClientRejectedException("[${handshake.connectKey}] Connection to IPC has incorrect class registration details!!") ClientRejectedException("[${handshake.connectKey}] Connection to IPC has incorrect class registration details!!", e)
} else { } else {
ClientRejectedException("[${handshake.connectKey}] Connection to [$addressString] has incorrect class registration details!!") ClientRejectedException("[${handshake.connectKey}] Connection to [$addressString] has incorrect class registration details!!", e)
} }
exception.cleanStackTraceInternal() exception.cleanStackTraceInternal()
listenerManager.notifyError(exception) listenerManager.notifyError(exception)
throw exception throw exception
} }
// every time we connect to a server, we have to reconfigure AND reassign the readKryos. // we set up our kryo information once we connect to a server (using the server's kryo registration details)
readKryo = kryoConfiguredFromServer
// every time we connect to a server, we have to reconfigure AND reassign kryo
readKryo = serialization.newReadKryo(maxMessageSize)
/////////////// ///////////////

View File

@ -70,8 +70,6 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
*/ */
private val sendIdleStrategy: IdleStrategy private val sendIdleStrategy: IdleStrategy
private val writeKryo: KryoExtra<Connection>
/** /**
* This is the client UUID. This is useful determine if the same client is connecting multiple times to a server (instead of only using IP address) * This is the client UUID. This is useful determine if the same client is connecting multiple times to a server (instead of only using IP address)
*/ */
@ -149,9 +147,6 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
init { init {
@Suppress("UNCHECKED_CAST")
writeKryo = endPoint.serialization.initKryo() as KryoExtra<Connection>
sendIdleStrategy = endPoint.config.sendIdleStrategy.cloneToNormal() sendIdleStrategy = endPoint.config.sendIdleStrategy.cloneToNormal()
@ -215,48 +210,46 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
/** /**
* Safely sends objects to a destination, if `abortEarly` is true, there are no retries if sending the message fails. * Safely sends objects to a destination, if `abortEarly` is true, there are no retries if sending the message fails.
* *
* NOTE: this is dispatched to the IO context!! (since network calls are IO/blocking calls) * @return true if the message was successfully sent, false otherwise. Exceptions are caught and NOT rethrown!
*
* @return true if the message was successfully sent, false otherwise. Exceptions are caught and NOT rethrown!
*/ */
internal suspend fun send(message: Any, abortEarly: Boolean): Boolean { internal suspend fun send(message: Any, abortEarly: Boolean): Boolean {
// we use a mutex because we do NOT want different threads/coroutines to be able to send data over the SAME connections at the SAME time. var success = false
// NOTE: additionally we want to propagate back-pressure to the calling coroutines, PER CONNECTION!
val success = writeMutex.withLock { // this is dispatched to the IO context!! (since network calls are IO/blocking calls)
// we reset the sending timeout strategy when a message was successfully sent. withContext(Dispatchers.IO) {
sendIdleStrategy.reset() // we use a mutex because we do NOT want different threads/coroutines to be able to send data over the SAME connections at the SAME time.
// exclusive publications are not thread safe/concurrent!
writeMutex.withLock {
// we reset the sending timeout strategy when a message was successfully sent.
sendIdleStrategy.reset()
try { try {
// The handshake sessionId IS NOT globally unique // The handshake sessionId IS NOT globally unique
logger.trace { "[$toString0] send: ${message.javaClass.simpleName} : $message" } logger.trace { "[$toString0] send: ${message.javaClass.simpleName} : $message" }
val write = endPoint.write(writeKryo, message, publication, sendIdleStrategy, this@Connection, abortEarly) success = endPoint.write(message, publication, sendIdleStrategy, this@Connection, abortEarly)
write } catch (e: Throwable) {
} catch (e: Throwable) { // make sure we atomically create the listener manager, if necessary
// make sure we atomically create the listener manager, if necessary listenerManager.getAndUpdate { origManager ->
listenerManager.getAndUpdate { origManager -> origManager ?: ListenerManager(logger)
origManager ?: ListenerManager(logger) }
val listenerManager = listenerManager.value!!
if (message is MethodResponse && message.result is Exception) {
val result = message.result as Exception
val newException = SerializationException("Error serializing message ${message.javaClass.simpleName}: '$message'", result)
listenerManager.notifyError(this@Connection, newException)
} else if (message is ClientException || message is ServerException) {
val newException = TransmitException("Error with message ${message.javaClass.simpleName}: '$message'", e)
listenerManager.notifyError(this@Connection, newException)
} else {
val newException = TransmitException("Error sending message ${message.javaClass.simpleName}: '$message'", e)
listenerManager.notifyError(this@Connection, newException)
}
} }
val listenerManager = listenerManager.value!!
if (message is MethodResponse && message.result is Exception) {
val result = message.result as Exception
val newException = SerializationException("Error serializing message ${message.javaClass.simpleName}: '$message'", result)
listenerManager.notifyError(this@Connection, newException)
} else if (message is ClientException || message is ServerException) {
val newException = TransmitException("Error with message ${message.javaClass.simpleName}: '$message'", e)
listenerManager.notifyError(this@Connection, newException)
} else {
val newException = TransmitException("Error sending message ${message.javaClass.simpleName}: '$message'", e)
listenerManager.notifyError(this@Connection, newException)
}
false
} }
} }
return success return success
} }

View File

@ -36,10 +36,12 @@ import dorkbox.network.rmi.ResponseManager
import dorkbox.network.rmi.RmiManagerConnections import dorkbox.network.rmi.RmiManagerConnections
import dorkbox.network.rmi.RmiManagerGlobal import dorkbox.network.rmi.RmiManagerGlobal
import dorkbox.network.rmi.messages.RmiMessage import dorkbox.network.rmi.messages.RmiMessage
import dorkbox.network.serialization.KryoExtra import dorkbox.network.serialization.KryoReader
import dorkbox.network.serialization.KryoWriter
import dorkbox.network.serialization.Serialization import dorkbox.network.serialization.Serialization
import dorkbox.network.serialization.SettingsStore import dorkbox.network.serialization.SettingsStore
import io.aeron.Publication import io.aeron.Publication
import io.aeron.logbuffer.FrameDescriptor
import io.aeron.logbuffer.Header import io.aeron.logbuffer.Header
import kotlinx.atomicfu.atomic import kotlinx.atomicfu.atomic
import kotlinx.coroutines.* import kotlinx.coroutines.*
@ -118,10 +120,19 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
*/ */
val serialization: Serialization<CONNECTION> val serialization: Serialization<CONNECTION>
// These are GLOBAL, single threaded only kryo instances. /**
// The readKryo WILL RE-CONFIGURED during the client handshake! (it is all the same thread, so object visibility is not a problem) * The largest size a SINGLE message via AERON can be. We chunk messages larger than this, so we can send very large files using aeron.
*/
internal val maxMessageSize = FrameDescriptor.computeMaxMessageLength(config.publicationTermBufferLength)
/**
* Read and Write can be concurrent (different buffers are used)
* GLOBAL, single threaded only kryo instances.
*
* This WILL RE-CONFIGURED during the client handshake! (it is all the same thread, so object visibility is not a problem)
*/
@Volatile @Volatile
internal var readKryo: KryoExtra<CONNECTION> internal lateinit var readKryo: KryoReader<CONNECTION>
internal val handshaker: Handshaker<CONNECTION> internal val handshaker: Handshaker<CONNECTION>
@ -194,20 +205,16 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
// serialization stuff // serialization stuff
@Suppress("UNCHECKED_CAST") @Suppress("UNCHECKED_CAST")
serialization = config.serialization as Serialization<CONNECTION> serialization = config.serialization as Serialization<CONNECTION>
serialization.finishInit(type, maxMessageSize)
// we are done with initial configuration, now finish serialization // we are done with initial configuration, now finish serialization
val kryo = serialization.initGlobalKryo() // the CLIENT will reassign these in the `connect0` method (because it registers what the server says to register)
serialization.finishInit(type, kryo) if (type == Server::class.java) {
readKryo = serialization.newReadKryo(maxMessageSize)
// the initial kryo created for serialization is reused as the read kryo
readKryo = if (type == Server::class.java) {
serialization.initKryo(kryo)
} else {
// these will be reassigned by the client Connect method!
kryo
} }
// we have to be able to specify the property store // we have to be able to specify the property store
storage = SettingsStore(config.settingsStore, logger) storage = SettingsStore(config.settingsStore, logger)
crypto = CryptoManagement(logger, storage, type, config.enableRemoteSignatureValidation) crypto = CryptoManagement(logger, storage, type, config.enableRemoteSignatureValidation)
@ -458,14 +465,9 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
* *
* This will split a message if it's too large to send in a single network message. * This will split a message if it's too large to send in a single network message.
* *
* NOTE: we use exclusive publications, and they are not thread safe/concurrent!
*
* THIS IS CALLED FROM WITHIN A MUTEX!!
*
* @return true if the message was successfully sent by aeron, false otherwise. Exceptions are caught and NOT rethrown! * @return true if the message was successfully sent by aeron, false otherwise. Exceptions are caught and NOT rethrown!
*/ */
open fun write( open suspend fun write(
writeKryo: KryoExtra<Connection>,
message: Any, message: Any,
publication: Publication, publication: Publication,
sendIdleStrategy: IdleStrategy, sendIdleStrategy: IdleStrategy,
@ -474,31 +476,40 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
): Boolean { ): Boolean {
// NOTE: A kryo instance CANNOT be re-used until after it's buffer is flushed to the network! // NOTE: A kryo instance CANNOT be re-used until after it's buffer is flushed to the network!
// since ANY thread can call 'send', we have to take kryo instances in a safe way @Suppress("UNCHECKED_CAST")
// the maximum size that this buffer can be is: connection as CONNECTION
// ExpandableDirectByteBuffer.MAX_BUFFER_LENGTH = 1073741824
val buffer = writeKryo.write(connection, message) val kryo = serialization.getWriteKryo();
val objectSize = buffer.position()
val internalBuffer = buffer.internalBuffer try {
// since ANY thread can call 'send', we have to take kryo instances in a safe way
// the maximum size that this buffer can be is:
// ExpandableDirectByteBuffer.MAX_BUFFER_LENGTH = 1073741824
val buffer = kryo.write(connection, message)
val objectSize = buffer.position()
val internalBuffer = buffer.internalBuffer
// one small problem! What if the message is too big to send all at once? // one small problem! What if the message is too big to send all at once?
val maxMessageLength = publication.maxMessageLength() return if (objectSize >= maxMessageSize) {
return if (objectSize >= maxMessageLength) { // we must split up the message! It's too large for Aeron to manage.
// we must split up the message! It's too large for Aeron to manage. // this will split up the message, construct the necessary control message and state, then CALL the sendData
// this will split up the message, construct the necessary control message and state, then CALL the sendData // method directly for each subsequent message.
// method directly for each subsequent message. streamingManager.send(
streamingManager.send( publication = publication,
publication = publication, internalBuffer = internalBuffer,
internalBuffer = internalBuffer, objectSize = objectSize,
objectSize = objectSize, maxMessageSize = maxMessageSize,
endPoint = this, endPoint = this,
kryo = writeKryo, kryo = kryo,
sendIdleStrategy = sendIdleStrategy, sendIdleStrategy = sendIdleStrategy,
connection = connection connection = connection
) )
} else { } else {
dataSend(publication, internalBuffer, 0, objectSize, sendIdleStrategy, connection, abortEarly) dataSend(publication, internalBuffer, 0, objectSize, sendIdleStrategy, connection, abortEarly)
}
} finally {
serialization.returnWriteKryo(kryo)
} }
} }
@ -511,13 +522,13 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
* *
* @return true if the message was successfully sent by aeron, false otherwise. Exceptions are caught and NOT rethrown! * @return true if the message was successfully sent by aeron, false otherwise. Exceptions are caught and NOT rethrown!
*/ */
open fun writeUnsafe(writeKryo: KryoExtra<Connection>, message: Any, publication: Publication, sendIdleStrategy: IdleStrategy, connection: Connection): Boolean { open fun writeUnsafe(message: Any, publication: Publication, sendIdleStrategy: IdleStrategy, connection: CONNECTION, kryo: KryoWriter<CONNECTION>): Boolean {
// NOTE: A kryo instance CANNOT be re-used until after it's buffer is flushed to the network! // NOTE: A kryo instance CANNOT be re-used until after it's buffer is flushed to the network!
// since ANY thread can call 'send', we have to take kryo instances in a safe way // since ANY thread can call 'send', we have to take kryo instances in a safe way
// the maximum size that this buffer can be is: // the maximum size that this buffer can be is:
// ExpandableDirectByteBuffer.MAX_BUFFER_LENGTH = 1073741824 // ExpandableDirectByteBuffer.MAX_BUFFER_LENGTH = 1073741824
val buffer = writeKryo.write(connection, message) val buffer = kryo.write(connection, message)
val objectSize = buffer.position() val objectSize = buffer.position()
val internalBuffer = buffer.internalBuffer val internalBuffer = buffer.internalBuffer
@ -533,7 +544,7 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
* must be EXPLICITLY used by the implementation, and if a custom message processor is to be used (ie: a state machine) you must * must be EXPLICITLY used by the implementation, and if a custom message processor is to be used (ie: a state machine) you must
* guarantee that Ping, RMI, Streaming object, etc. are not used (as it would not function without this custom * guarantee that Ping, RMI, Streaming object, etc. are not used (as it would not function without this custom
*/ */
open fun processMessage(message: Any?, connection: CONNECTION) { open fun processMessage(message: Any?, connection: CONNECTION, readKryo: KryoReader<CONNECTION>) {
// the REPEATED usage of wrapping methods below is because Streaming messages have to intercept data BEFORE it goes to a coroutine // the REPEATED usage of wrapping methods below is because Streaming messages have to intercept data BEFORE it goes to a coroutine
when (message) { when (message) {
// the remote endPoint will send this message if it is closing the connection. // the remote endPoint will send this message if it is closing the connection.
@ -630,7 +641,13 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
* @param header The aeron header information * @param header The aeron header information
* @param connection The connection this message happened on * @param connection The connection this message happened on
*/ */
internal fun dataReceive(buffer: DirectBuffer, offset: Int, length: Int, header: Header, connection: Connection) { internal fun dataReceive(
buffer: DirectBuffer,
offset: Int,
length: Int,
header: Header,
connection: Connection
) {
// this is processed on the thread that calls "poll". Subscriptions are NOT multi-thread safe! // this is processed on the thread that calls "poll". Subscriptions are NOT multi-thread safe!
@Suppress("UNCHECKED_CAST") @Suppress("UNCHECKED_CAST")
connection as CONNECTION connection as CONNECTION
@ -639,7 +656,7 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
// NOTE: This ABSOLUTELY MUST be done on the same thread! This cannot be done on a new one, because the buffer could change! // NOTE: This ABSOLUTELY MUST be done on the same thread! This cannot be done on a new one, because the buffer could change!
val message = readKryo.read(buffer, offset, length, connection) val message = readKryo.read(buffer, offset, length, connection)
logger.trace { "[${header.sessionId()}] received: ${message?.javaClass?.simpleName} $message" } logger.trace { "[${header.sessionId()}] received: ${message?.javaClass?.simpleName} $message" }
processMessage(message, connection) processMessage(message, connection, readKryo)
} catch (e: Exception) { } catch (e: Exception) {
listenerManager.notifyError(connection, newException("Error de-serializing message", e)) listenerManager.notifyError(connection, newException("Error de-serializing message", e))
} }

View File

@ -29,7 +29,8 @@ import dorkbox.network.connection.ListenerManager.Companion.cleanStackTrace
import dorkbox.network.exceptions.StreamingException import dorkbox.network.exceptions.StreamingException
import dorkbox.network.serialization.AeronInput import dorkbox.network.serialization.AeronInput
import dorkbox.network.serialization.AeronOutput import dorkbox.network.serialization.AeronOutput
import dorkbox.network.serialization.KryoExtra import dorkbox.network.serialization.KryoReader
import dorkbox.network.serialization.KryoWriter
import dorkbox.os.OS import dorkbox.os.OS
import dorkbox.util.Sys import dorkbox.util.Sys
import io.aeron.Publication import io.aeron.Publication
@ -108,7 +109,7 @@ internal class StreamingManager<CONNECTION : Connection>(
*/ */
fun processControlMessage( fun processControlMessage(
message: StreamingControl, message: StreamingControl,
kryo: KryoExtra<CONNECTION>, kryo: KryoReader<CONNECTION>,
endPoint: EndPoint<CONNECTION>, endPoint: EndPoint<CONNECTION>,
connection: CONNECTION connection: CONNECTION
) { ) {
@ -294,13 +295,13 @@ internal class StreamingManager<CONNECTION : Connection>(
streamSessionId: Int, streamSessionId: Int,
publication: Publication, publication: Publication,
endPoint: EndPoint<CONNECTION>, endPoint: EndPoint<CONNECTION>,
kryoExtra: KryoExtra<Connection>,
sendIdleStrategy: IdleStrategy, sendIdleStrategy: IdleStrategy,
connection: Connection connection: CONNECTION,
kryo: KryoWriter<CONNECTION>
) { ) {
val failMessage = StreamingControl(StreamingState.FAILED, streamSessionId) val failMessage = StreamingControl(StreamingState.FAILED, streamSessionId)
val failSent = endPoint.writeUnsafe(kryoExtra, failMessage, publication, sendIdleStrategy, connection) val failSent = endPoint.writeUnsafe(failMessage, publication, sendIdleStrategy, connection, kryo)
if (!failSent) { if (!failSent) {
// something SUPER wrong! // something SUPER wrong!
// more critical error sending the message. we shouldn't retry or anything. // more critical error sending the message. we shouldn't retry or anything.
@ -334,11 +335,12 @@ internal class StreamingManager<CONNECTION : Connection>(
fun send( fun send(
publication: Publication, publication: Publication,
internalBuffer: MutableDirectBuffer, internalBuffer: MutableDirectBuffer,
maxMessageSize: Int,
objectSize: Int, objectSize: Int,
endPoint: EndPoint<CONNECTION>, endPoint: EndPoint<CONNECTION>,
kryo: KryoExtra<Connection>, kryo: KryoWriter<CONNECTION>,
sendIdleStrategy: IdleStrategy, sendIdleStrategy: IdleStrategy,
connection: Connection connection: CONNECTION
): Boolean { ): Boolean {
// this buffer is the exact size as our internal buffer, so it is unnecessary to have multiple kryo instances // this buffer is the exact size as our internal buffer, so it is unnecessary to have multiple kryo instances
val originalBuffer = ExpandableDirectByteBuffer(objectSize) // this can grow, so it's fine to lock it to this size! val originalBuffer = ExpandableDirectByteBuffer(objectSize) // this can grow, so it's fine to lock it to this size!
@ -357,7 +359,7 @@ internal class StreamingManager<CONNECTION : Connection>(
// tell the other side how much data we are sending // tell the other side how much data we are sending
val startMessage = StreamingControl(StreamingState.START, streamSessionId, objectSize.toLong()) val startMessage = StreamingControl(StreamingState.START, streamSessionId, objectSize.toLong())
val startSent = endPoint.writeUnsafe(kryo, startMessage, publication, sendIdleStrategy, connection) val startSent = endPoint.writeUnsafe(startMessage, publication, sendIdleStrategy, connection, kryo)
if (!startSent) { if (!startSent) {
// more critical error sending the message. we shouldn't retry or anything. // more critical error sending the message. we shouldn't retry or anything.
val errorMessage = "[${publication.sessionId()}] Error starting streaming content." val errorMessage = "[${publication.sessionId()}] Error starting streaming content."
@ -381,8 +383,7 @@ internal class StreamingManager<CONNECTION : Connection>(
// payload size is for a PRODUCER, and not SUBSCRIBER, so we have to include this amount every time. // payload size is for a PRODUCER, and not SUBSCRIBER, so we have to include this amount every time.
// MINOR fragmentation by aeron is OK, since that will greatly speed up data transfer rates! // MINOR fragmentation by aeron is OK, since that will greatly speed up data transfer rates!
// the maxPayloadLength MUST ABSOLUTELY be less that the max size + header! var sizeOfPayload = maxMessageSize
var sizeOfPayload = publication.maxMessageLength() - 200
val header: ByteArray val header: ByteArray
val headerSize: Int val headerSize: Int
@ -436,7 +437,7 @@ internal class StreamingManager<CONNECTION : Connection>(
throw exception throw exception
} }
} catch (e: Exception) { } catch (e: Exception) {
sendFailMessageAndThrow(e, streamSessionId, publication, endPoint, kryo, sendIdleStrategy, connection) sendFailMessageAndThrow(e, streamSessionId, publication, endPoint, sendIdleStrategy, connection, kryo)
return false // doesn't actually get here because exceptions are thrown, but this makes the IDE happy. return false // doesn't actually get here because exceptions are thrown, but this makes the IDE happy.
} }
@ -483,7 +484,7 @@ internal class StreamingManager<CONNECTION : Connection>(
} catch (e: Exception) { } catch (e: Exception) {
val failMessage = StreamingControl(StreamingState.FAILED, streamSessionId) val failMessage = StreamingControl(StreamingState.FAILED, streamSessionId)
val failSent = endPoint.writeUnsafe(kryo, failMessage, publication, sendIdleStrategy, connection) val failSent = endPoint.writeUnsafe(failMessage, publication, sendIdleStrategy, connection, kryo)
if (!failSent) { if (!failSent) {
// something SUPER wrong! // something SUPER wrong!
// more critical error sending the message. we shouldn't retry or anything. // more critical error sending the message. we shouldn't retry or anything.
@ -506,6 +507,6 @@ internal class StreamingManager<CONNECTION : Connection>(
// send the last chunk of data // send the last chunk of data
val finishedMessage = StreamingControl(StreamingState.FINISHED, streamSessionId, payloadSent.toLong()) val finishedMessage = StreamingControl(StreamingState.FINISHED, streamSessionId, payloadSent.toLong())
return endPoint.writeUnsafe(kryo, finishedMessage, publication, sendIdleStrategy, connection) return endPoint.writeUnsafe(finishedMessage, publication, sendIdleStrategy, connection, kryo)
} }
} }

View File

@ -20,15 +20,16 @@ import dorkbox.network.Configuration
import dorkbox.network.aeron.AeronDriver import dorkbox.network.aeron.AeronDriver
import dorkbox.network.aeron.CoroutineIdleStrategy import dorkbox.network.aeron.CoroutineIdleStrategy
import dorkbox.network.connection.Connection import dorkbox.network.connection.Connection
import dorkbox.network.connection.EndPoint
import dorkbox.network.connection.ListenerManager import dorkbox.network.connection.ListenerManager
import dorkbox.network.connection.ListenerManager.Companion.cleanStackTrace import dorkbox.network.connection.ListenerManager.Companion.cleanStackTrace
import dorkbox.network.connection.ListenerManager.Companion.cleanStackTraceInternal import dorkbox.network.connection.ListenerManager.Companion.cleanStackTraceInternal
import dorkbox.network.exceptions.ClientException import dorkbox.network.exceptions.ClientException
import dorkbox.network.exceptions.ServerException import dorkbox.network.exceptions.ServerException
import dorkbox.network.serialization.KryoExtra import dorkbox.network.serialization.KryoReader
import dorkbox.network.serialization.KryoWriter
import dorkbox.network.serialization.Serialization import dorkbox.network.serialization.Serialization
import io.aeron.Publication import io.aeron.Publication
import io.aeron.logbuffer.FrameDescriptor
import mu.KLogger import mu.KLogger
import org.agrona.DirectBuffer import org.agrona.DirectBuffer
@ -40,15 +41,23 @@ internal class Handshaker<CONNECTION : Connection>(
aeronDriver: AeronDriver, aeronDriver: AeronDriver,
val newException: (String, Throwable?) -> Throwable val newException: (String, Throwable?) -> Throwable
) { ) {
private val handshakeReadKryo: KryoExtra<CONNECTION> private val handshakeReadKryo: KryoReader<CONNECTION>
private val handshakeWriteKryo: KryoExtra<CONNECTION> private val handshakeWriteKryo: KryoWriter<CONNECTION>
private val handshakeSendIdleStrategy: CoroutineIdleStrategy private val handshakeSendIdleStrategy: CoroutineIdleStrategy
private val writeTimeoutNS = (aeronDriver.lingerNs() * 1.2).toLong() // close enough. Just needs to be slightly longer private val writeTimeoutNS = (aeronDriver.lingerNs() * 1.2).toLong() // close enough. Just needs to be slightly longer
init { init {
handshakeReadKryo = serialization.newHandshakeKryo() val maxMessageSize = FrameDescriptor.computeMaxMessageLength(config.publicationTermBufferLength)
handshakeWriteKryo = serialization.newHandshakeKryo()
// All registration MUST happen in-order of when the register(*) method was called, otherwise there are problems.
handshakeReadKryo = KryoReader(maxMessageSize)
handshakeWriteKryo = KryoWriter(maxMessageSize)
serialization.newHandshakeKryo(handshakeReadKryo)
serialization.newHandshakeKryo(handshakeWriteKryo)
handshakeSendIdleStrategy = config.sendIdleStrategy.clone() handshakeSendIdleStrategy = config.sendIdleStrategy.clone()
} }

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2020 dorkbox, llc * Copyright 2023 dorkbox, llc
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -12,7 +12,8 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
* */
/*
* Copyright (c) 2008, Nathan Sweet * Copyright (c) 2008, Nathan Sweet
* All rights reserved. * All rights reserved.
* *
@ -42,7 +43,7 @@ import com.esotericsoftware.kryo.io.Output
import dorkbox.network.connection.Connection import dorkbox.network.connection.Connection
import dorkbox.network.rmi.CachedMethod import dorkbox.network.rmi.CachedMethod
import dorkbox.network.rmi.RmiUtils import dorkbox.network.rmi.RmiUtils
import dorkbox.network.serialization.KryoExtra import dorkbox.network.serialization.KryoReader
import org.agrona.collections.Int2ObjectHashMap import org.agrona.collections.Int2ObjectHashMap
import java.lang.reflect.Method import java.lang.reflect.Method
@ -83,7 +84,7 @@ class MethodRequestSerializer<CONNECTION: Connection>(private val methodCache: I
val methodIndex = RmiUtils.unpackRight(methodInfo) val methodIndex = RmiUtils.unpackRight(methodInfo)
val isGlobal = input.readBoolean() val isGlobal = input.readBoolean()
kryo as KryoExtra<CONNECTION> kryo as KryoReader<CONNECTION>
val cachedMethod = try { val cachedMethod = try {
methodCache[methodClassId][methodIndex] methodCache[methodClassId][methodIndex]

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2020 dorkbox, llc * Copyright 2023 dorkbox, llc
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -23,7 +23,7 @@ import dorkbox.network.connection.Connection
import dorkbox.network.connection.EndPoint import dorkbox.network.connection.EndPoint
import dorkbox.network.rmi.RmiClient import dorkbox.network.rmi.RmiClient
import dorkbox.network.rmi.RmiSupportConnection import dorkbox.network.rmi.RmiSupportConnection
import dorkbox.network.serialization.KryoExtra import dorkbox.network.serialization.KryoReader
import java.lang.reflect.Proxy import java.lang.reflect.Proxy
/** /**
@ -67,7 +67,7 @@ class RmiClientSerializer<CONNECTION: Connection>: Serializer<Any>() {
val isGlobal = input.readBoolean() val isGlobal = input.readBoolean()
val objectId = input.readInt(true) val objectId = input.readInt(true)
kryo as KryoExtra<CONNECTION> kryo as KryoReader<CONNECTION>
val endPoint: EndPoint<CONNECTION> = kryo.connection.endPoint as EndPoint<CONNECTION> val endPoint: EndPoint<CONNECTION> = kryo.connection.endPoint as EndPoint<CONNECTION>
return if (isGlobal) { return if (isGlobal) {

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2020 dorkbox, llc * Copyright 2023 dorkbox, llc
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -12,7 +12,8 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
* */
/*
* Copyright (c) 2008, Nathan Sweet * Copyright (c) 2008, Nathan Sweet
* All rights reserved. * All rights reserved.
* *
@ -41,7 +42,8 @@ import com.esotericsoftware.kryo.io.Output
import dorkbox.network.connection.Connection import dorkbox.network.connection.Connection
import dorkbox.network.rmi.RemoteObjectStorage import dorkbox.network.rmi.RemoteObjectStorage
import dorkbox.network.rmi.RmiSupportConnection import dorkbox.network.rmi.RmiSupportConnection
import dorkbox.network.serialization.KryoExtra import dorkbox.network.serialization.KryoReader
import dorkbox.network.serialization.KryoWriter
/** /**
* This is to manage serializing RMI objects across the wire... * This is to manage serializing RMI objects across the wire...
@ -76,7 +78,7 @@ import dorkbox.network.serialization.KryoExtra
class RmiServerSerializer<CONNECTION: Connection> : Serializer<Any>(false) { class RmiServerSerializer<CONNECTION: Connection> : Serializer<Any>(false) {
override fun write(kryo: Kryo, output: Output, `object`: Any) { override fun write(kryo: Kryo, output: Output, `object`: Any) {
val kryoExtra = kryo as KryoExtra<CONNECTION> val kryoExtra = kryo as KryoWriter<CONNECTION>
val connection = kryoExtra.connection val connection = kryoExtra.connection
val rmi = connection.rmi val rmi = connection.rmi
// have to write what the rmi ID is ONLY. A remote object sent via a connection IS ONLY a connection-scope object! // have to write what the rmi ID is ONLY. A remote object sent via a connection IS ONLY a connection-scope object!
@ -96,7 +98,7 @@ class RmiServerSerializer<CONNECTION: Connection> : Serializer<Any>(false) {
} }
override fun read(kryo: Kryo, input: Input, interfaceClass: Class<*>): Any? { override fun read(kryo: Kryo, input: Input, interfaceClass: Class<*>): Any? {
val kryoExtra = kryo as KryoExtra<CONNECTION> val kryoExtra = kryo as KryoReader<CONNECTION>
val rmiId = input.readInt(true) val rmiId = input.readInt(true)
val connection = kryoExtra.connection val connection = kryoExtra.connection

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2020 dorkbox, llc * Copyright 2023 dorkbox, llc
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -20,7 +20,7 @@ import com.esotericsoftware.kryo.Serializer
import dorkbox.network.connection.Connection import dorkbox.network.connection.Connection
import dorkbox.network.rmi.messages.RmiServerSerializer import dorkbox.network.rmi.messages.RmiServerSerializer
internal abstract class ClassRegistration<CONNECTION: Connection>(val clazz: Class<*>, val serializer: Serializer<*>? = null, var id: Int = 0) { internal abstract class ClassRegistration(val clazz: Class<*>, val serializer: Serializer<*>? = null, var id: Int = 0) {
companion object { companion object {
const val IGNORE_REGISTRATION = -1 const val IGNORE_REGISTRATION = -1
} }
@ -33,7 +33,7 @@ internal abstract class ClassRegistration<CONNECTION: Connection>(val clazz: Cla
* If so, we ignore it - any IFACE or IMPL that already has been assigned to an RMI serializer, *MUST* remain an RMI serializer * If so, we ignore it - any IFACE or IMPL that already has been assigned to an RMI serializer, *MUST* remain an RMI serializer
* If this class registration will EVENTUALLY be for RMI, then [ClassRegistrationForRmi] will reassign the serializer * If this class registration will EVENTUALLY be for RMI, then [ClassRegistrationForRmi] will reassign the serializer
*/ */
open fun register(kryo: KryoExtra<CONNECTION>, rmi: RmiHolder) { open fun register(kryo: Kryo, rmi: RmiHolder) {
// ClassRegistrationForRmi overrides this method // ClassRegistrationForRmi overrides this method
if (id == IGNORE_REGISTRATION) { if (id == IGNORE_REGISTRATION) {
// we have previously specified that this registration should be ignored! // we have previously specified that this registration should be ignored!

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2020 dorkbox, llc * Copyright 2023 dorkbox, llc
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -19,7 +19,7 @@ import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.Serializer import com.esotericsoftware.kryo.Serializer
import dorkbox.network.connection.Connection import dorkbox.network.connection.Connection
internal class ClassRegistration0<CONNECTION: Connection>(clazz: Class<*>, serializer: Serializer<*>) : ClassRegistration<CONNECTION>(clazz, serializer) { internal class ClassRegistration0(clazz: Class<*>, serializer: Serializer<*>) : ClassRegistration(clazz, serializer) {
override fun register(kryo: Kryo) { override fun register(kryo: Kryo) {
id = kryo.register(clazz, serializer).id id = kryo.register(clazz, serializer).id
info = "Registered $id -> ${clazz.name} using ${serializer!!.javaClass.name}" info = "Registered $id -> ${clazz.name} using ${serializer!!.javaClass.name}"

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2020 dorkbox, llc * Copyright 2023 dorkbox, llc
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -18,7 +18,7 @@ package dorkbox.network.serialization
import com.esotericsoftware.kryo.Kryo import com.esotericsoftware.kryo.Kryo
import dorkbox.network.connection.Connection import dorkbox.network.connection.Connection
internal class ClassRegistration1<CONNECTION: Connection>(clazz: Class<*>, id: Int) : ClassRegistration<CONNECTION>(clazz, null, id) { internal class ClassRegistration1(clazz: Class<*>, id: Int) : ClassRegistration(clazz, null, id) {
override fun register(kryo: Kryo) { override fun register(kryo: Kryo) {
kryo.register(clazz, id) kryo.register(clazz, id)
info = "Registered $id -> (specified) ${clazz.name}" info = "Registered $id -> (specified) ${clazz.name}"

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2020 dorkbox, llc * Copyright 2023 dorkbox, llc
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -19,7 +19,7 @@ import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.Serializer import com.esotericsoftware.kryo.Serializer
import dorkbox.network.connection.Connection import dorkbox.network.connection.Connection
internal class ClassRegistration2<CONNECTION: Connection>(clazz: Class<*>, serializer: Serializer<*>, id: Int) : ClassRegistration<CONNECTION>(clazz, serializer, id) { internal class ClassRegistration2(clazz: Class<*>, serializer: Serializer<*>, id: Int) : ClassRegistration(clazz, serializer, id) {
override fun register(kryo: Kryo) { override fun register(kryo: Kryo) {
kryo.register(clazz, serializer, id) kryo.register(clazz, serializer, id)

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2020 dorkbox, llc * Copyright 2023 dorkbox, llc
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -18,7 +18,7 @@ package dorkbox.network.serialization
import com.esotericsoftware.kryo.Kryo import com.esotericsoftware.kryo.Kryo
import dorkbox.network.connection.Connection import dorkbox.network.connection.Connection
internal open class ClassRegistration3<CONNECTION: Connection>(clazz: Class<*>) : ClassRegistration<CONNECTION>(clazz) { internal open class ClassRegistration3(clazz: Class<*>) : ClassRegistration(clazz) {
override fun register(kryo: Kryo) { override fun register(kryo: Kryo) {
id = kryo.register(clazz).id id = kryo.register(clazz).id

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2020 dorkbox, llc * Copyright 2023 dorkbox, llc
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -15,6 +15,7 @@
*/ */
package dorkbox.network.serialization package dorkbox.network.serialization
import com.esotericsoftware.kryo.Kryo
import dorkbox.network.connection.Connection import dorkbox.network.connection.Connection
import dorkbox.network.rmi.messages.RmiServerSerializer import dorkbox.network.rmi.messages.RmiServerSerializer
@ -47,7 +48,7 @@ import dorkbox.network.rmi.messages.RmiServerSerializer
*/ */
internal class ClassRegistrationForRmi<CONNECTION: Connection>(ifaceClass: Class<*>, internal class ClassRegistrationForRmi<CONNECTION: Connection>(ifaceClass: Class<*>,
var implClass: Class<*>?, var implClass: Class<*>?,
serializer: RmiServerSerializer<CONNECTION>) : ClassRegistration<CONNECTION>(ifaceClass, serializer) { serializer: RmiServerSerializer<CONNECTION>) : ClassRegistration(ifaceClass, serializer) {
/** /**
* In general: * In general:
* *
@ -104,7 +105,7 @@ internal class ClassRegistrationForRmi<CONNECTION: Connection>(ifaceClass: Class
* send: register IMPL object class with RmiServerSerializer * send: register IMPL object class with RmiServerSerializer
* lookup IMPL object -> rmiID * lookup IMPL object -> rmiID
*/ */
override fun register(kryo: KryoExtra<CONNECTION>, rmi: RmiHolder) { override fun register(kryo: Kryo, rmi: RmiHolder) {
// we override this, because we ALWAYS will call our RMI registration! // we override this, because we ALWAYS will call our RMI registration!
if (id == IGNORE_REGISTRATION) { if (id == IGNORE_REGISTRATION) {
// we have previously specified that this registration should be ignored! // we have previously specified that this registration should be ignored!

View File

@ -17,35 +17,63 @@ package dorkbox.network.serialization
import com.esotericsoftware.kryo.Kryo import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.Input import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import dorkbox.network.connection.Connection import dorkbox.network.connection.Connection
import net.jpountz.lz4.LZ4Factory
import org.agrona.DirectBuffer import org.agrona.DirectBuffer
/** /**
* READ and WRITE are exclusive to each other and can be performed in different threads. * READ and WRITE are exclusive to each other and can be performed in different threads.
*
*/ */
class KryoExtra<CONNECTION: Connection>() : Kryo() { class KryoReader<CONNECTION: Connection>(maxMessageSize: Int) : Kryo() {
companion object {
internal const val DEBUG = false
internal val factory = LZ4Factory.fastestInstance()
}
// for kryo serialization // for kryo serialization
private val readerBuffer = AeronInput() private val readerBuffer = AeronInput()
private val writerBuffer = AeronOutput()
// crypto + compression have to work with native byte arrays, so here we go... // crypto + compression have to work with native byte arrays, so here we go...
// private val reader = Input(ABSOLUTE_MAX_SIZE_OBJECT) private val reader = Input(maxMessageSize)
// private val writer = Output(ABSOLUTE_MAX_SIZE_OBJECT)
// private val temp = ByteArray(ABSOLUTE_MAX_SIZE_OBJECT)
// This is unique per connection. volatile/etc is not necessary because it is set/read in the same thread // This is unique per connection. volatile/etc is not necessary because it is set/read in the same thread
lateinit var connection: CONNECTION lateinit var connection: CONNECTION
// private val secureRandom = SecureRandom() // private val secureRandom = SecureRandom()
// private var cipher: Cipher? = null // private var cipher: Cipher? = null
// private val compressor = factory.fastCompressor() private val decompressor = factory.fastDecompressor()
// private val decompressor = factory.fastDecompressor()
// The IV for AES-GCM must be 12 bytes, since it's 4 (salt) + 4 (external counter) + 4 (GCM counter)
// The 12 bytes IV is created during connection registration, and during the AES-GCM crypto, we override the last 8 with this
// counter, which is also transmitted as an optimized int. (which is why it starts at 0, so the transmitted bytes are small)
// private val aes_gcm_iv = atomic(0)
// /**
// * This is the per-message sequence number.
// *
// * The IV for AES-GCM must be 12 bytes, since it's 4 (salt) + 4 (external counter) + 4 (GCM counter)
// * The 12 bytes IV is created during connection registration, and during the AES-GCM crypto, we override the last 8 with this
// * counter, which is also transmitted as an optimized int. (which is why it starts at 0, so the transmitted bytes are small)
// */
// fun nextGcmSequence(): Long {
// return aes_gcm_iv.getAndIncrement()
// }
//
// /**
// * @return the AES key. key=32 byte, iv=12 bytes (AES-GCM implementation).
// */
// fun cryptoKey(): SecretKey {
//// return channelWrapper.cryptoKey()
// }
// //
// companion object { // companion object {
// private const val ABSOLUTE_MAX_SIZE_OBJECT = 500000 // by default, this is about 500k //
// private const val DEBUG = false //
// //
// // snappycomp : 7.534 micros/op; 518.5 MB/s (output: 55.1%) // // snappycomp : 7.534 micros/op; 518.5 MB/s (output: 55.1%)
// // snappyuncomp : 1.391 micros/op; 2808.1 MB/s // // snappyuncomp : 1.391 micros/op; 2808.1 MB/s
@ -65,37 +93,6 @@ class KryoExtra<CONNECTION: Connection>() : Kryo() {
// } // }
// } // }
/**
* NOTE: THIS CANNOT BE USED FOR ANYTHING RELATED TO RMI!
*
* OUTPUT:
* ++++++++++++++++++++++++++
* + class and object bytes +
* ++++++++++++++++++++++++++
*/
@Throws(Exception::class)
fun write(message: Any): AeronOutput {
writerBuffer.reset()
writeClassAndObject(writerBuffer, message)
return writerBuffer
}
/**
* OUTPUT:
* ++++++++++++++++++++++++++
* + class and object bytes +
* ++++++++++++++++++++++++++
*/
@Throws(Exception::class)
fun write(connection: CONNECTION, message: Any): AeronOutput {
// required by RMI and some serializers to determine which connection wrote (or has info about) this object
this.connection = connection
writerBuffer.reset()
writeClassAndObject(writerBuffer, message)
return writerBuffer
}
/** /**
* NOTE: THIS CANNOT BE USED FOR ANYTHING RELATED TO RMI! * NOTE: THIS CANNOT BE USED FOR ANYTHING RELATED TO RMI!
* *
@ -153,35 +150,6 @@ class KryoExtra<CONNECTION: Connection>() : Kryo() {
//////////////// ////////////////
//////////////// ////////////////
/**
* NOTE: THIS CANNOT BE USED FOR ANYTHING RELATED TO RMI!
*
* OUTPUT:
* ++++++++++++++++++++++++++
* + class and object bytes +
* ++++++++++++++++++++++++++
*/
fun write(writer: Output, message: Any) {
// write the object to the NORMAL output buffer!
writer.reset()
writeClassAndObject(writer, message)
}
/**
* OUTPUT:
* ++++++++++++++++++++++++++
* + class and object bytes +
* ++++++++++++++++++++++++++
*/
private fun write(connection: CONNECTION, writer: Output, message: Any) {
// required by RMI and some serializers to determine which connection wrote (or has info about) this object
this.connection = connection
// write the object to the NORMAL output buffer!
writer.reset()
writeClassAndObject(writer, message)
}
/** /**
* NOTE: THIS CANNOT BE USED FOR ANYTHING RELATED TO RMI! * NOTE: THIS CANNOT BE USED FOR ANYTHING RELATED TO RMI!
* *
@ -218,95 +186,7 @@ class KryoExtra<CONNECTION: Connection>() : Kryo() {
val dataLength = readerBuffer.readVarInt(true) val dataLength = readerBuffer.readVarInt(true)
return readerBuffer.readBytes(dataLength) return readerBuffer.readBytes(dataLength)
} }
//
// /**
// * NOTE: THIS CANNOT BE USED FOR ANYTHING RELATED TO RMI!
// *
// * BUFFER:
// * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
// * + uncompressed length (1-4 bytes) + compressed data +
// * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
// *
// * COMPRESSED DATA:
// * ++++++++++++++++++++++++++
// * + class and object bytes +
// * ++++++++++++++++++++++++++
// */
// fun writeCompressed(logger: Logger, output: Output, message: Any) {
// // write the object to a TEMP buffer! this will be compressed later
// write(writer, message)
//
// // save off how much data the object took
// val length = writer.position()
// val maxCompressedLength = compressor.maxCompressedLength(length)
//
// ////////// compressing data
// // we ALWAYS compress our data stream -- because of how AES-GCM pads data out, the small input (that would result in a larger
// // output), will be negated by the increase in size by the encryption
// val compressOutput = temp
//
// // LZ4 compress.
// val compressedLength = compressor.compress(writer.buffer, 0, length, compressOutput, 0, maxCompressedLength)
//
// if (DEBUG) {
// val orig = Sys.bytesToHex(writer.buffer, 0, length) use String.toHexBytes() instead
// val compressed = Sys.bytesToHex(compressOutput, 0, compressedLength)
// logger.error(OS.LINE_SEPARATOR +
// "ORIG: (" + length + ")" + OS.LINE_SEPARATOR + orig +
// OS.LINE_SEPARATOR +
// "COMPRESSED: (" + compressedLength + ")" + OS.LINE_SEPARATOR + compressed)
// }
//
// // now write the ORIGINAL (uncompressed) length. This is so we can use the FAST decompress version
// output.writeInt(length, true)
//
// // have to copy over the orig data, because we used the temp buffer. Also have to account for the length of the uncompressed size
// output.writeBytes(compressOutput, 0, compressedLength)
// }
//
// /**
// * BUFFER:
// * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
// * + uncompressed length (1-4 bytes) + compressed data +
// * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
// *
// * COMPRESSED DATA:
// * ++++++++++++++++++++++++++
// * + class and object bytes +
// * ++++++++++++++++++++++++++
// */
// fun writeCompressed(logger: Logger, connection: Connection, output: Output, message: Any) {
// // write the object to a TEMP buffer! this will be compressed later
// write(connection, writer, message)
//
// // save off how much data the object took
// val length = writer.position()
// val maxCompressedLength = compressor.maxCompressedLength(length)
//
// ////////// compressing data
// // we ALWAYS compress our data stream -- because of how AES-GCM pads data out, the small input (that would result in a larger
// // output), will be negated by the increase in size by the encryption
// val compressOutput = temp
//
// // LZ4 compress.
// val compressedLength = compressor.compress(writer.buffer, 0, length, compressOutput, 0, maxCompressedLength)
//
// if (DEBUG) {
// val orig = Sys.bytesToHex(writer.buffer, 0, length)
// val compressed = Sys.bytesToHex(compressOutput, 0, compressedLength)
// logger.error(OS.LINE_SEPARATOR +
// "ORIG: (" + length + ")" + OS.LINE_SEPARATOR + orig +
// OS.LINE_SEPARATOR +
// "COMPRESSED: (" + compressedLength + ")" + OS.LINE_SEPARATOR + compressed)
// }
//
// // now write the ORIGINAL (uncompressed) length. This is so we can use the FAST decompress version
// output.writeInt(length, true)
//
// // have to copy over the orig data, because we used the temp buffer. Also have to account for the length of the uncompressed size
// output.writeBytes(compressOutput, 0, compressedLength)
// }
//
// /** // /**
// * NOTE: THIS CANNOT BE USED FOR ANYTHING RELATED TO RMI! // * NOTE: THIS CANNOT BE USED FOR ANYTHING RELATED TO RMI!
// * // *
@ -414,91 +294,7 @@ class KryoExtra<CONNECTION: Connection>() : Kryo() {
// return read(connection, reader) // return read(connection, reader)
// } // }
// //
// /**
// * BUFFER:
// * +++++++++++++++++++++++++++++++
// * + IV (12) + encrypted data +
// * +++++++++++++++++++++++++++++++
// *
// * ENCRYPTED DATA:
// * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
// * + uncompressed length (1-4 bytes) + compressed data +
// * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
// *
// * COMPRESSED DATA:
// * ++++++++++++++++++++++++++
// * + class and object bytes +
// * ++++++++++++++++++++++++++
// */
// fun writeCrypto(logger: Logger, connection: Connection_, buffer: ByteBuf, message: Any) {
// // write the object to a TEMP buffer! this will be compressed later
// write(connection, writer, message)
//
// // save off how much data the object took
// val length = writer.position()
// val maxCompressedLength = compressor.maxCompressedLength(length)
//
// ////////// compressing data
// // we ALWAYS compress our data stream -- because of how AES-GCM pads data out, the small input (that would result in a larger
// // output), will be negated by the increase in size by the encryption
// val compressOutput = temp
//
// // LZ4 compress. Offset by 4 in the dest array so we have room for the length
// val compressedLength = compressor.compress(writer.buffer, 0, length, compressOutput, 4, maxCompressedLength)
// if (DEBUG) {
// val orig = ByteBufUtil.hexDump(writer.buffer, 0, length)
// val compressed = ByteBufUtil.hexDump(compressOutput, 4, compressedLength)
// logger.error(OS.LINE_SEPARATOR +
// "ORIG: (" + length + ")" + OS.LINE_SEPARATOR + orig +
// OS.LINE_SEPARATOR +
// "COMPRESSED: (" + compressedLength + ")" + OS.LINE_SEPARATOR + compressed)
// }
//
// // now write the ORIGINAL (uncompressed) length. This is so we can use the FAST decompress version
// val lengthLength = OptimizeUtilsByteArray.intLength(length, true)
//
// // this is where we start writing the length data, so that the end of this lines up with the compressed data
// val start = 4 - lengthLength
// OptimizeUtilsByteArray.writeInt(compressOutput, length, true, start)
//
// // now compressOutput contains "uncompressed length + data"
// val compressedArrayLength = lengthLength + compressedLength
//
//
// /////// encrypting data.
// val cryptoKey = connection.cryptoKey()
// val iv = ByteArray(IV_LENGTH_BYTE) // NEVER REUSE THIS IV WITH SAME KEY
// secureRandom.nextBytes(iv)
// val parameterSpec = GCMParameterSpec(TAG_LENGTH_BIT, iv) // 128 bit auth tag length
// try {
// cipher!!.init(Cipher.ENCRYPT_MODE, cryptoKey, parameterSpec)
// } catch (e: Exception) {
// throw IOException("Unable to AES encrypt the data", e)
// }
//
// // we REUSE the writer buffer! (since that data is now compressed in a different array)
// val encryptedLength: Int
// encryptedLength = try {
// cipher!!.doFinal(compressOutput, start, compressedArrayLength, writer.buffer, 0)
// } catch (e: Exception) {
// throw IOException("Unable to AES encrypt the data", e)
// }
//
// // write out our IV
// buffer.writeBytes(iv, 0, IV_LENGTH_BYTE)
// Arrays.fill(iv, 0.toByte()) // overwrite the IV with zeros so we can't leak this value
//
// // have to copy over the orig data, because we used the temp buffer
// buffer.writeBytes(writer.buffer, 0, encryptedLength)
// if (DEBUG) {
// val ivString = ByteBufUtil.hexDump(iv, 0, IV_LENGTH_BYTE)
// val crypto = ByteBufUtil.hexDump(writer.buffer, 0, encryptedLength)
// logger.error(OS.LINE_SEPARATOR +
// "IV: (12)" + OS.LINE_SEPARATOR + ivString +
// OS.LINE_SEPARATOR +
// "CRYPTO: (" + encryptedLength + ")" + OS.LINE_SEPARATOR + crypto)
// }
// }
/** /**
* BUFFER: * BUFFER:

View File

@ -0,0 +1,349 @@
/*
* Copyright 2023 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.serialization
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.Output
import dorkbox.bytes.toHexString
import dorkbox.network.connection.Connection
import dorkbox.network.connection.CryptoManagement
import dorkbox.os.OS
import mu.KLogger
import net.jpountz.lz4.LZ4Factory
import javax.crypto.Cipher
/**
* READ and WRITE are exclusive to each other and can be performed in different threads.
*
*/
open class KryoWriter<CONNECTION: Connection>(maxMessageSize: Int) : Kryo() {
companion object {
internal const val DEBUG = false
internal val factory = LZ4Factory.fastestInstance()
}
// for kryo serialization
private val writerBuffer = AeronOutput()
// crypto + compression have to work with native byte arrays, so here we go...
private val writer = Output(maxMessageSize)
private val temp = ByteArray(maxMessageSize)
// This is unique per connection. volatile/etc is not necessary because it is set/read in the same thread
lateinit var connection: CONNECTION
private val cipher = Cipher.getInstance(CryptoManagement.AES_ALGORITHM)
// private val secureRandom = SecureRandom()
// private var cipher: Cipher? = null
private val compressor = factory.fastCompressor()
// The IV for AES-GCM must be 12 bytes, since it's 4 (salt) + 4 (external counter) + 4 (GCM counter)
// The 12 bytes IV is created during connection registration, and during the AES-GCM crypto, we override the last 8 with this
// counter, which is also transmitted as an optimized int. (which is why it starts at 0, so the transmitted bytes are small)
// private val aes_gcm_iv = atomic(0)
// /**
// * This is the per-message sequence number.
// *
// * The IV for AES-GCM must be 12 bytes, since it's 4 (salt) + 4 (external counter) + 4 (GCM counter)
// * The 12 bytes IV is created during connection registration, and during the AES-GCM crypto, we override the last 8 with this
// * counter, which is also transmitted as an optimized int. (which is why it starts at 0, so the transmitted bytes are small)
// */
// fun nextGcmSequence(): Long {
// return aes_gcm_iv.getAndIncrement()
// }
//
// /**
// * @return the AES key. key=32 byte, iv=12 bytes (AES-GCM implementation).
// */
// fun cryptoKey(): SecretKey {
//// return channelWrapper.cryptoKey()
// }
//
// companion object {
//
//
//
// // snappycomp : 7.534 micros/op; 518.5 MB/s (output: 55.1%)
// // snappyuncomp : 1.391 micros/op; 2808.1 MB/s
// // lz4comp : 6.210 micros/op; 629.0 MB/s (output: 55.4%)
// // lz4uncomp : 0.641 micros/op; 6097.9 MB/s
// private val factory = LZ4Factory.fastestInstance()
// private const val ALGORITHM = "AES/GCM/NoPadding"
// private const val TAG_LENGTH_BIT = 128
// private const val IV_LENGTH_BYTE = 12
// }
// init {
// cipher = try {
// Cipher.getInstance(ALGORITHM)
// } catch (e: Exception) {
// throw IllegalStateException("could not get cipher instance", e)
// }
// }
/**
* NOTE: THIS CANNOT BE USED FOR ANYTHING RELATED TO RMI!
*
* OUTPUT:
* ++++++++++++++++++++++++++
* + class and object bytes +
* ++++++++++++++++++++++++++
*/
@Throws(Exception::class)
fun write(message: Any): AeronOutput {
writerBuffer.reset()
writeClassAndObject(writerBuffer, message)
return writerBuffer
}
/**
* OUTPUT:
* ++++++++++++++++++++++++++
* + class and object bytes +
* ++++++++++++++++++++++++++
*/
@Throws(Exception::class)
fun write(connection: CONNECTION, message: Any): AeronOutput {
// required by RMI and some serializers to determine which connection wrote (or has info about) this object
this.connection = connection
writerBuffer.reset()
writeClassAndObject(writerBuffer, message)
return writerBuffer
}
////////////////
////////////////
////////////////
// for more complicated writes, sadly, we have to deal DIRECTLY with byte arrays
////////////////
////////////////
////////////////
/**
* NOTE: THIS CANNOT BE USED FOR ANYTHING RELATED TO RMI!
*
* OUTPUT:
* ++++++++++++++++++++++++++
* + class and object bytes +
* ++++++++++++++++++++++++++
*/
fun write(writer: Output, message: Any) {
// write the object to the NORMAL output buffer!
writer.reset()
writeClassAndObject(writer, message)
}
/**
* OUTPUT:
* ++++++++++++++++++++++++++
* + class and object bytes +
* ++++++++++++++++++++++++++
*/
private fun write(connection: CONNECTION, writer: Output, message: Any) {
// required by RMI and some serializers to determine which connection wrote (or has info about) this object
this.connection = connection
// write the object to the NORMAL output buffer!
writer.reset()
writeClassAndObject(writer, message)
}
/**
* NOTE: THIS CANNOT BE USED FOR ANYTHING RELATED TO RMI!
*
* BUFFER:
* ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
* + uncompressed length (1-4 bytes) + compressed data +
* ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
*
* COMPRESSED DATA:
* ++++++++++++++++++++++++++
* + class and object bytes +
* ++++++++++++++++++++++++++
*/
fun writeCompressed(logger: KLogger, output: Output, message: Any) {
// write the object to a TEMP buffer! this will be compressed later
write(writer, message)
// save off how much data the object took
val length = writer.position()
val maxCompressedLength = compressor.maxCompressedLength(length)
////////// compressing data
// we ALWAYS compress our data stream -- because of how AES-GCM pads data out, the small input (that would result in a larger
// output), will be negated by the increase in size by the encryption
val compressOutput = temp
// LZ4 compress.
val compressedLength = compressor.compress(writer.buffer, 0, length, compressOutput, 0, maxCompressedLength)
if (DEBUG) {
val orig = writer.buffer.toHexString()
val compressed = compressOutput.toHexString()
logger.error(
OS.LINE_SEPARATOR +
"ORIG: (" + length + ")" + OS.LINE_SEPARATOR + orig +
OS.LINE_SEPARATOR +
"COMPRESSED: (" + compressedLength + ")" + OS.LINE_SEPARATOR + compressed)
}
// now write the ORIGINAL (uncompressed) length. This is so we can use the FAST decompress version
output.writeInt(length, true)
// have to copy over the orig data, because we used the temp buffer. Also have to account for the length of the uncompressed size
output.writeBytes(compressOutput, 0, compressedLength)
}
//
// /**
// * BUFFER:
// * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
// * + uncompressed length (1-4 bytes) + compressed data +
// * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
// *
// * COMPRESSED DATA:
// * ++++++++++++++++++++++++++
// * + class and object bytes +
// * ++++++++++++++++++++++++++
// */
// fun writeCompressed(logger: Logger, connection: Connection, output: Output, message: Any) {
// // write the object to a TEMP buffer! this will be compressed later
// write(connection, writer, message)
//
// // save off how much data the object took
// val length = writer.position()
// val maxCompressedLength = compressor.maxCompressedLength(length)
//
// ////////// compressing data
// // we ALWAYS compress our data stream -- because of how AES-GCM pads data out, the small input (that would result in a larger
// // output), will be negated by the increase in size by the encryption
// val compressOutput = temp
//
// // LZ4 compress.
// val compressedLength = compressor.compress(writer.buffer, 0, length, compressOutput, 0, maxCompressedLength)
//
// if (DEBUG) {
// val orig = Sys.bytesToHex(writer.buffer, 0, length)
// val compressed = Sys.bytesToHex(compressOutput, 0, compressedLength)
// logger.error(OS.LINE_SEPARATOR +
// "ORIG: (" + length + ")" + OS.LINE_SEPARATOR + orig +
// OS.LINE_SEPARATOR +
// "COMPRESSED: (" + compressedLength + ")" + OS.LINE_SEPARATOR + compressed)
// }
//
// // now write the ORIGINAL (uncompressed) length. This is so we can use the FAST decompress version
// output.writeInt(length, true)
//
// // have to copy over the orig data, because we used the temp buffer. Also have to account for the length of the uncompressed size
// output.writeBytes(compressOutput, 0, compressedLength)
// }
//
//
//
// /**
// * BUFFER:
// * +++++++++++++++++++++++++++++++
// * + IV (12) + encrypted data +
// * +++++++++++++++++++++++++++++++
// *
// * ENCRYPTED DATA:
// * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
// * + uncompressed length (1-4 bytes) + compressed data +
// * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
// *
// * COMPRESSED DATA:
// * ++++++++++++++++++++++++++
// * + class and object bytes +
// * ++++++++++++++++++++++++++
// */
// fun writeCrypto(logger: Logger, connection: Connection_, buffer: ByteBuf, message: Any) {
// // write the object to a TEMP buffer! this will be compressed later
// write(connection, writer, message)
//
// // save off how much data the object took
// val length = writer.position()
// val maxCompressedLength = compressor.maxCompressedLength(length)
//
// ////////// compressing data
// // we ALWAYS compress our data stream -- because of how AES-GCM pads data out, the small input (that would result in a larger
// // output), will be negated by the increase in size by the encryption
// val compressOutput = temp
//
// // LZ4 compress. Offset by 4 in the dest array so we have room for the length
// val compressedLength = compressor.compress(writer.buffer, 0, length, compressOutput, 4, maxCompressedLength)
// if (DEBUG) {
// val orig = ByteBufUtil.hexDump(writer.buffer, 0, length)
// val compressed = ByteBufUtil.hexDump(compressOutput, 4, compressedLength)
// logger.error(OS.LINE_SEPARATOR +
// "ORIG: (" + length + ")" + OS.LINE_SEPARATOR + orig +
// OS.LINE_SEPARATOR +
// "COMPRESSED: (" + compressedLength + ")" + OS.LINE_SEPARATOR + compressed)
// }
//
// // now write the ORIGINAL (uncompressed) length. This is so we can use the FAST decompress version
// val lengthLength = OptimizeUtilsByteArray.intLength(length, true)
//
// // this is where we start writing the length data, so that the end of this lines up with the compressed data
// val start = 4 - lengthLength
// OptimizeUtilsByteArray.writeInt(compressOutput, length, true, start)
//
// // now compressOutput contains "uncompressed length + data"
// val compressedArrayLength = lengthLength + compressedLength
//
//
// /////// encrypting data.
// val cryptoKey = connection.cryptoKey()
// val iv = ByteArray(IV_LENGTH_BYTE) // NEVER REUSE THIS IV WITH SAME KEY
// secureRandom.nextBytes(iv)
// val parameterSpec = GCMParameterSpec(TAG_LENGTH_BIT, iv) // 128 bit auth tag length
// try {
// cipher!!.init(Cipher.ENCRYPT_MODE, cryptoKey, parameterSpec)
// } catch (e: Exception) {
// throw IOException("Unable to AES encrypt the data", e)
// }
//
// // we REUSE the writer buffer! (since that data is now compressed in a different array)
// val encryptedLength: Int
// encryptedLength = try {
// cipher!!.doFinal(compressOutput, start, compressedArrayLength, writer.buffer, 0)
// } catch (e: Exception) {
// throw IOException("Unable to AES encrypt the data", e)
// }
//
// // write out our IV
// buffer.writeBytes(iv, 0, IV_LENGTH_BYTE)
// Arrays.fill(iv, 0.toByte()) // overwrite the IV with zeros so we can't leak this value
//
// // have to copy over the orig data, because we used the temp buffer
// buffer.writeBytes(writer.buffer, 0, encryptedLength)
// if (DEBUG) {
// val ivString = ByteBufUtil.hexDump(iv, 0, IV_LENGTH_BYTE)
// val crypto = ByteBufUtil.hexDump(writer.buffer, 0, encryptedLength)
// logger.error(OS.LINE_SEPARATOR +
// "IV: (12)" + OS.LINE_SEPARATOR + ivString +
// OS.LINE_SEPARATOR +
// "CRYPTO: (" + encryptedLength + ")" + OS.LINE_SEPARATOR + crypto)
// }
// }
}

View File

@ -35,6 +35,9 @@ import dorkbox.network.ping.PingSerializer
import dorkbox.network.rmi.CachedMethod import dorkbox.network.rmi.CachedMethod
import dorkbox.network.rmi.RmiUtils import dorkbox.network.rmi.RmiUtils
import dorkbox.network.rmi.messages.* import dorkbox.network.rmi.messages.*
import dorkbox.objectPool.BoundedPoolObject
import dorkbox.objectPool.ObjectPool
import dorkbox.objectPool.Pool
import dorkbox.os.OS import dorkbox.os.OS
import dorkbox.serializers.* import dorkbox.serializers.*
import kotlinx.atomicfu.AtomicBoolean import kotlinx.atomicfu.AtomicBoolean
@ -58,12 +61,12 @@ import kotlin.coroutines.Continuation
// Observability issues: make sure that we know WHAT connection is causing serialization errors when they occur! // Observability issues: make sure that we know WHAT connection is causing serialization errors when they occur!
// ASYC isues: RMI can timeout when OTHER rmi connections happen! EACH RMI NEEDS TO BE SEPARATE IN THE IO DISPATCHER // ASYC isues: RMI can timeout when OTHER rmi connections happen! EACH RMI NEEDS TO BE SEPARATE IN THE IO DISPATCHER
/** /**
* Threads reading/writing at the same time a single instance of kryo. it is possible to use a single kryo with the use of * Threads reading/writing at the same time a single instance of kryo. it is possible to use a single kryo with the use of
* synchronize, however - that defeats the point of having multi-threaded serialization. * synchronize, however - that defeats the point of having multithreaded serialization.
* *
* Additionally, this serialization manager will register the entire class+interface hierarchy for an object. If you want to specify a * Additionally, this serialization manager will register the entire class+interface hierarchy for an object. If you want to specify a
* serialization scheme for a specific class in an objects hierarchy, you must register that first. * serialization scheme for a specific class in an objects hierarchy, you must register that first.
@ -97,7 +100,7 @@ open class Serialization<CONNECTION: Connection>(private val references: Boolean
open class RmiSupport<CONNECTION: Connection> internal constructor( open class RmiSupport<CONNECTION: Connection> internal constructor(
private val initialized: AtomicBoolean, private val initialized: AtomicBoolean,
private val classesToRegister: MutableList<ClassRegistration<CONNECTION>>, private val classesToRegister: MutableList<ClassRegistration>,
private val rmiServerSerializer: RmiServerSerializer<CONNECTION> private val rmiServerSerializer: RmiServerSerializer<CONNECTION>
) { ) {
/** /**
@ -130,14 +133,15 @@ open class Serialization<CONNECTION: Connection>(private val references: Boolean
} }
private lateinit var logger: KLogger private lateinit var logger: KLogger
private var maxMessageSize: Int = 500_000
private var initialized = atomic(false) private var initialized = atomic(false)
// used by operations performed during kryo initialization, which are by default package access (since it's an anon-inner class) // used by operations performed during kryo initialization, which are by default package access (since it's an anon-inner class)
// All registration MUST happen in-order of when the register(*) method was called, otherwise there are problems. // All registration MUST happen in-order of when the register(*) method was called, otherwise there are problems.
// Object checking is performed during actual registration. // Object checking is performed during actual registration.
private val classesToRegister = mutableListOf<ClassRegistration<CONNECTION>>() private val classesToRegister = mutableListOf<ClassRegistration>()
private lateinit var finalClassRegistrations: Array<ClassRegistration<CONNECTION>> private lateinit var finalClassRegistrations: Array<ClassRegistration>
private lateinit var savedRegistrationDetails: ByteArray private lateinit var savedRegistrationDetails: ByteArray
// the purpose of the method cache, is to accelerate looking up methods for specific class // the purpose of the method cache, is to accelerate looking up methods for specific class
@ -286,10 +290,8 @@ open class Serialization<CONNECTION: Connection>(private val references: Boolean
/** /**
* Kryo specifically for handshakes * Kryo specifically for handshakes
*/ */
internal fun newHandshakeKryo(): KryoExtra<CONNECTION> { internal fun newHandshakeKryo(kryo: Kryo) {
// All registration MUST happen in-order of when the register(*) method was called, otherwise there are problems. // All registration MUST happen in-order of when the register(*) method was called, otherwise there are problems.
val kryo = KryoExtra<CONNECTION>()
kryo.instantiatorStrategy = instantiatorStrategy kryo.instantiatorStrategy = instantiatorStrategy
kryo.references = references kryo.references = references
@ -299,14 +301,12 @@ open class Serialization<CONNECTION: Connection>(private val references: Boolean
kryo.register(ByteArray::class.java) kryo.register(ByteArray::class.java)
kryo.register(HandshakeMessage::class.java) kryo.register(HandshakeMessage::class.java)
return kryo
} }
/** /**
* called as the first thing inside when initializing the classesToRegister * called as the first thing inside when initializing the classesToRegister
*/ */
internal fun initGlobalKryo(): KryoExtra<CONNECTION> { private fun newGlobalKryo(kryo: Kryo) {
// NOTE: classesRegistrations.forEach will be called after serialization init! // NOTE: classesRegistrations.forEach will be called after serialization init!
// NOTE: All registration MUST happen in-order of when the register(*) method was called, otherwise there are problems. // NOTE: All registration MUST happen in-order of when the register(*) method was called, otherwise there are problems.
@ -316,9 +316,6 @@ open class Serialization<CONNECTION: Connection>(private val references: Boolean
// Note that this is very inefficient and should be avoided if possible. // Note that this is very inefficient and should be avoided if possible.
// val javaSerializer = JavaSerializer() // val javaSerializer = JavaSerializer()
val kryo = KryoExtra<CONNECTION>()
kryo.instantiatorStrategy = instantiatorStrategy kryo.instantiatorStrategy = instantiatorStrategy
kryo.references = references kryo.references = references
@ -397,14 +394,6 @@ open class Serialization<CONNECTION: Connection>(private val references: Boolean
UnmodifiableCollectionsSerializer.registerSerializers(kryo) UnmodifiableCollectionsSerializer.registerSerializers(kryo)
SynchronizedCollectionsSerializer.registerSerializers(kryo) SynchronizedCollectionsSerializer.registerSerializers(kryo)
// TODO: this is for diffie hellmen handshake stuff!
// serialization.register(IESParameters::class.java, IesParametersSerializer())
// serialization.register(IESWithCipherParameters::class.java, IesWithCipherParametersSerializer())
// TODO: fix kryo to work the way we want, so we can register interfaces + serializers with kryo
// serialization.register(XECPublicKey::class.java, XECPublicKeySerializer())
// serialization.register(XECPrivateKey::class.java, XECPrivateKeySerializer())
// serialization.register(Message::class.java) // must use full package name!
// RMI stuff! // RMI stuff!
kryo.register(ConnectionObjectCreateRequest::class.java) kryo.register(ConnectionObjectCreateRequest::class.java)
kryo.register(ConnectionObjectCreateResponse::class.java) kryo.register(ConnectionObjectCreateResponse::class.java)
@ -437,8 +426,6 @@ open class Serialization<CONNECTION: Connection>(private val references: Boolean
kryo.register(Reserved7::class.java) kryo.register(Reserved7::class.java)
kryo.register(Reserved8::class.java) kryo.register(Reserved8::class.java)
kryo.register(Reserved9::class.java) kryo.register(Reserved9::class.java)
return kryo
} }
/** /**
@ -446,8 +433,9 @@ open class Serialization<CONNECTION: Connection>(private val references: Boolean
* *
* This is to prevent (and recognize) out-of-order class/serializer registration. If an ID is already in use by a different type, an exception is thrown. * This is to prevent (and recognize) out-of-order class/serializer registration. If an ID is already in use by a different type, an exception is thrown.
*/ */
internal fun finishInit(type: Class<*>, kryo: KryoExtra<CONNECTION>) { internal fun finishInit(type: Class<*>, maxMessageSize: Int) {
logger = KotlinLogging.logger(type.simpleName) logger = KotlinLogging.logger(type.simpleName)
this.maxMessageSize = maxMessageSize
val firstInitialization = initialized.compareAndSet(expect = false, update = true) val firstInitialization = initialized.compareAndSet(expect = false, update = true)
@ -456,6 +444,9 @@ open class Serialization<CONNECTION: Connection>(private val references: Boolean
throw IllegalArgumentException("Unable to initialize object serialization more than once!") throw IllegalArgumentException("Unable to initialize object serialization more than once!")
} }
val kryo = KryoWriter<CONNECTION>(maxMessageSize)
newGlobalKryo(kryo)
initializeRegistrations(kryo, classesToRegister) initializeRegistrations(kryo, classesToRegister)
classesToRegister.clear() // don't need to keep a reference, since this can never be reinitialized. classesToRegister.clear() // don't need to keep a reference, since this can never be reinitialized.
@ -484,30 +475,27 @@ open class Serialization<CONNECTION: Connection>(private val references: Boolean
* *
* @return true if initialization was successful, false otherwise. DOES NOT CATCH EXCEPTIONS EXTERNALLY * @return true if initialization was successful, false otherwise. DOES NOT CATCH EXCEPTIONS EXTERNALLY
*/ */
internal fun finishClientConnect(kryoRegistrationDetailsFromServer: ByteArray): KryoExtra<CONNECTION>? { internal fun finishClientConnect(kryoRegistrationDetailsFromServer: ByteArray, maxMessageSize: Int) {
val readKryo = KryoReader<CONNECTION>(maxMessageSize)
val writeKryo = KryoWriter<CONNECTION>(maxMessageSize)
newGlobalKryo(readKryo)
newGlobalKryo(writeKryo)
// we self initialize our registrations, THEN we compare them to the server. // we self initialize our registrations, THEN we compare them to the server.
val kryo = initGlobalKryo() val newRegistrations = initializeRegistrationsForClient(kryoRegistrationDetailsFromServer, classesToRegister, readKryo)
?: throw Exception("Unable to initialize class registration information from the server")
val newRegistrations = initializeRegistrationsForClient(kryoRegistrationDetailsFromServer, classesToRegister) ?: return null initializeRegistrations(writeKryo, newRegistrations)
try { // NOTE: we MUST be super careful to never modify `classesToRegister`!!
initializeRegistrations(kryo, newRegistrations) // NOTE: DO NOT CLEAR THIS WITH CLIENTS, THEY HAVE TO REBUILD EVERY TIME WITH A NEW CONNECTION!
// classesToRegister.clear() // don't need to keep a reference, since this can never be reinitialized.
// NOTE: we MUST be super careful to never modify `classesToRegister`!! if (logger.isTraceEnabled) {
// NOTE: DO NOT CLEAR THIS WITH CLIENTS, THEY HAVE TO REBUILD EVERY TIME WITH A NEW CONNECTION! // log the in-order output first
// classesToRegister.clear() // don't need to keep a reference, since this can never be reinitialized. finalClassRegistrations.forEach { classRegistration ->
logger.trace(classRegistration.info)
if (logger.isTraceEnabled) {
// log the in-order output first
finalClassRegistrations.forEach { classRegistration ->
logger.trace(classRegistration.info)
}
} }
return kryo
} catch (e: Exception) {
logger.error(e) { "Unable to correctly register classes for serialization during client connection!" }
return null
} }
} }
@ -515,8 +503,8 @@ open class Serialization<CONNECTION: Connection>(private val references: Boolean
/** /**
* @throws IllegalArgumentException if there is are too many RMI methods OR if a problem setting up the registration details * @throws IllegalArgumentException if there is are too many RMI methods OR if a problem setting up the registration details
*/ */
private fun initializeRegistrations(kryo: KryoExtra<CONNECTION>, classesToRegister: List<ClassRegistration<CONNECTION>>) { private fun initializeRegistrations(kryo: KryoWriter<CONNECTION>, classesToRegister: List<ClassRegistration>) {
val mergedRegistrations = mergeClassRegistrations(kryo, classesToRegister) val mergedRegistrations = mergeClassRegistrations(classesToRegister, kryo)
// make sure our RMI cached methods have been initialized // make sure our RMI cached methods have been initialized
initializeRmiMethodCache(mergedRegistrations, kryo) initializeRmiMethodCache(mergedRegistrations, kryo)
@ -531,7 +519,7 @@ open class Serialization<CONNECTION: Connection>(private val references: Boolean
/** /**
* Merge all the registrations (since we can have registrations overwrite newer/specific registrations based on ID) * Merge all the registrations (since we can have registrations overwrite newer/specific registrations based on ID)
*/ */
private fun mergeClassRegistrations(kryo: KryoExtra<CONNECTION>, classesToRegister: List<ClassRegistration<CONNECTION>>): List<ClassRegistration<CONNECTION>> { private fun mergeClassRegistrations(classesToRegister: List<ClassRegistration>, kryo: Kryo): List<ClassRegistration> {
// check to see which interfaces are mapped to RMI (otherwise, the interface requires a serializer) // check to see which interfaces are mapped to RMI (otherwise, the interface requires a serializer)
// note, we have to check to make sure a class is not ALREADY registered for RMI before it is registered again // note, we have to check to make sure a class is not ALREADY registered for RMI before it is registered again
@ -540,7 +528,7 @@ open class Serialization<CONNECTION: Connection>(private val references: Boolean
} }
// in order to get the ID's, these have to be registered with a kryo instance! // in order to get the ID's, these have to be registered with a kryo instance!
val mergedRegistrations = mutableListOf<ClassRegistration<CONNECTION>>() val mergedRegistrations = mutableListOf<ClassRegistration>()
classesToRegister.forEach { registration -> classesToRegister.forEach { registration ->
val id = registration.id val id = registration.id
@ -580,13 +568,13 @@ open class Serialization<CONNECTION: Connection>(private val references: Boolean
* *
* @throws IllegalArgumentException if there are too many RMI methods * @throws IllegalArgumentException if there are too many RMI methods
*/ */
private fun initializeRmiMethodCache(classesToRegister: List<ClassRegistration<CONNECTION>>, kryo: KryoExtra<CONNECTION>) { private fun initializeRmiMethodCache(classesToRegister: List<ClassRegistration>, kryo: Kryo) {
classesToRegister.forEach { classRegistration -> classesToRegister.forEach { classRegistration ->
// we should cache RMI methods! We don't always know if something is RMI or not (from just how things are registered...) // we should cache RMI methods! We don't always know if something is RMI or not (from just how things are registered...)
// so it is super trivial to map out all possible, relevant types // so it is super trivial to map out all possible, relevant types
val kryoId = classRegistration.id val kryoId = classRegistration.id
if (classRegistration is ClassRegistrationForRmi) { if (classRegistration is ClassRegistrationForRmi<*>) {
// on the "RMI server" (aka, where the object lives) side, there will be an interface + implementation! // on the "RMI server" (aka, where the object lives) side, there will be an interface + implementation!
val implClass = classRegistration.implClass val implClass = classRegistration.implClass
@ -621,7 +609,7 @@ open class Serialization<CONNECTION: Connection>(private val references: Boolean
/** /**
* @throws IllegalArgumentException if there is a problem setting up the registration details * @throws IllegalArgumentException if there is a problem setting up the registration details
*/ */
private fun createRegistrationDetails(classesToRegister: List<ClassRegistration<CONNECTION>>, kryo: KryoExtra<CONNECTION>): ByteArray { private fun createRegistrationDetails(classesToRegister: List<ClassRegistration>, kryo: KryoWriter<CONNECTION>): ByteArray {
// now create the registration details, used to validate that the client/server have the EXACT same class registration setup // now create the registration details, used to validate that the client/server have the EXACT same class registration setup
val registrationDetails = arrayListOf<Array<Any>>() val registrationDetails = arrayListOf<Array<Any>>()
@ -651,24 +639,22 @@ open class Serialization<CONNECTION: Connection>(private val references: Boolean
*/ */
@Suppress("UNCHECKED_CAST") @Suppress("UNCHECKED_CAST")
private fun initializeRegistrationsForClient( private fun initializeRegistrationsForClient(
kryoRegistrationDetailsFromServer: ByteArray, kryoRegistrationDetailsFromServer: ByteArray, classesToRegister: List<ClassRegistration>, kryo: KryoReader<CONNECTION>
classesToRegister: List<ClassRegistration<CONNECTION>> ): MutableList<ClassRegistration>? {
): MutableList<ClassRegistration<CONNECTION>>? {
// we have to allow CUSTOM classes to register (where the order does not matter), so that if the CLIENT is the RMI-SERVER, it can // we have to allow CUSTOM classes to register (where the order does not matter), so that if the CLIENT is the RMI-SERVER, it can
// specify IMPL classes for RMI. // specify IMPL classes for RMI.
classesToRegister.forEach { registration -> classesToRegister.forEach { registration ->
require(registration is ClassRegistrationForRmi) { "Unable to register a *class* by itself. This is only permitted on the CLIENT for RMI. " + require(registration is ClassRegistrationForRmi<*>) { "Unable to register a *class* by itself. This is only permitted on the CLIENT for RMI. " +
"To fix this, remove xx.register(${registration.clazz.name})" } "To fix this, remove xx.register(${registration.clazz.name})" }
} }
@Suppress("UNCHECKED_CAST") @Suppress("UNCHECKED_CAST")
val classesToRegisterForRmi = listOf(*classesToRegister.toTypedArray()) as List<ClassRegistrationForRmi<CONNECTION>> val classesToRegisterForRmi = listOf(*classesToRegister.toTypedArray()) as List<ClassRegistrationForRmi<CONNECTION>>
val kryo = initGlobalKryo()
val input = AeronInput(kryoRegistrationDetailsFromServer) val input = AeronInput(kryoRegistrationDetailsFromServer)
val clientClassRegistrations = kryo.read(input) as Array<Array<Any>> val clientClassRegistrations = kryo.read(input) as Array<Array<Any>>
val newRegistrations = mutableListOf<ClassRegistration<CONNECTION>>() val newRegistrations = mutableListOf<ClassRegistration>()
val maker = kryo.instantiatorStrategy val maker = kryo.instantiatorStrategy
val rmiSerializer = rmiServerSerializer val rmiSerializer = rmiServerSerializer
@ -754,12 +740,48 @@ open class Serialization<CONNECTION: Connection>(private val references: Boolean
return newRegistrations return newRegistrations
} }
private val writeKryos: Pool<KryoWriter<CONNECTION>> = ObjectPool.nonBlockingBounded(
poolObject = object : BoundedPoolObject<KryoWriter<CONNECTION>>() {
override fun newInstance(): KryoWriter<CONNECTION> {
return newWriteKryo(maxMessageSize)
}
},
maxSize = OS.optimumNumberOfThreads * 2
)
fun getWriteKryo(): KryoWriter<CONNECTION> {
return writeKryos.take()
}
fun returnWriteKryo(kryo: KryoWriter<CONNECTION>) {
writeKryos.put(kryo)
}
/** /**
* NOTE: A kryo instance CANNOT be re-used until after it's buffer is flushed to the network! * NOTE: A kryo instance CANNOT be re-used until after it's buffer is flushed to the network!
* *
* @return takes a kryo instance from the pool, or creates one if the pool was empty * @return takes a kryo instance from the pool, or creates one if the pool was empty
*/ */
fun initKryo(kryo: KryoExtra<CONNECTION> = initGlobalKryo()): KryoExtra<CONNECTION> { fun newReadKryo(maxMessageSize: Int): KryoReader<CONNECTION> {
val kryo = KryoReader<CONNECTION>(maxMessageSize)
newGlobalKryo(kryo)
// the final list of all registrations in the EndPoint. This cannot change for the serer.
finalClassRegistrations.forEach { registration ->
registration.register(kryo, rmiHolder)
}
return kryo
}
/**
* NOTE: A kryo instance CANNOT be re-used until after it's buffer is flushed to the network!
*
* @return takes a kryo instance from the pool, or creates one if the pool was empty
*/
fun newWriteKryo(maxMessageSize: Int): KryoWriter<CONNECTION> {
val kryo = KryoWriter<CONNECTION>(maxMessageSize)
newGlobalKryo(kryo)
// the final list of all registrations in the EndPoint. This cannot change for the serer. // the final list of all registrations in the EndPoint. This cannot change for the serer.
finalClassRegistrations.forEach { registration -> finalClassRegistrations.forEach { registration ->
registration.register(kryo, rmiHolder) registration.register(kryo, rmiHolder)