Removed streaming kryo and temp kryo

This commit is contained in:
Robinson 2023-06-28 20:31:08 +02:00
parent ecbbd55ff6
commit 510fc85f2c
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
4 changed files with 20 additions and 22 deletions

View File

@ -196,6 +196,7 @@ open class Client<CONNECTION : Connection>(
var connectionTimeoutSec: Int = 0
private set
private val handshake = ClientHandshake(this, logger)
@Volatile
@ -720,7 +721,6 @@ open class Client<CONNECTION : Connection>(
// every time we connect to a server, we have to reconfigure AND reassign the readKryos.
readKryo = kryoConfiguredFromServer
streamingReadKryo = serialization.initKryo()
///////////////

View File

@ -57,7 +57,6 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
private val sendIdleStrategy: IdleStrategy
private val writeKryo: KryoExtra<Connection>
private val tempWriteKryo: KryoExtra<Connection>
/**
* This is the client UUID. This is useful determine if the same client is connecting multiple times to a server (instead of only using IP address)
@ -140,9 +139,6 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
init {
@Suppress("UNCHECKED_CAST")
writeKryo = endPoint.serialization.initKryo() as KryoExtra<Connection>
@Suppress("UNCHECKED_CAST")
tempWriteKryo = endPoint.serialization.initKryo() as KryoExtra<Connection>
sendIdleStrategy = endPoint.config.sendIdleStrategy.cloneToNormal()
@ -224,7 +220,7 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
try {
// The handshake sessionId IS NOT globally unique
logger.trace { "[$toString0] send: ${message.javaClass.simpleName} : $message" }
val write = endPoint.write(writeKryo, tempWriteKryo, message, publication, sendIdleStrategy, this@Connection, abortEarly)
val write = endPoint.write(writeKryo, message, publication, sendIdleStrategy, this@Connection, abortEarly)
write
} catch (e: Throwable) {
// make sure we atomically create the listener manager, if necessary

View File

@ -149,7 +149,6 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
// The readKryo WILL RE-CONFIGURED during the client handshake! (it is all the same thread, so object visibility is not a problem)
@Volatile
internal var readKryo: KryoExtra<CONNECTION>
internal var streamingReadKryo: KryoExtra<CONNECTION>
internal val handshaker: Handshaker<CONNECTION>
@ -212,11 +211,9 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
// the initial kryo created for serialization is reused as the read kryo
if (type == Server::class.java) {
readKryo = serialization.initKryo(kryo)
streamingReadKryo = serialization.initKryo()
} else {
// these will be reassigned by the client Connect method!
readKryo = kryo
streamingReadKryo = kryo
}
// we have to be able to specify the property store
@ -468,7 +465,6 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
*/
open fun write(
writeKryo: KryoExtra<Connection>,
tempWriteKryo: KryoExtra<Connection>,
message: Any,
publication: Publication,
sendIdleStrategy: IdleStrategy,
@ -496,7 +492,7 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
internalBuffer = internalBuffer,
objectSize = objectSize,
endPoint = this,
tempWriteKryo = tempWriteKryo,
kryo = writeKryo,
sendIdleStrategy = sendIdleStrategy,
connection = connection
)
@ -582,7 +578,6 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
// send arbitrarily large pieces of data (gigs in size, potentially).
// This will recursively call into this method for each of the unwrapped chunks of data.
is StreamingControl -> {
// TODO: this can be the streaming kryo???
streamingManager.processControlMessage(message, readKryo,this@EndPoint, connection)
}
is StreamingData -> {
@ -651,6 +646,7 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
/**
* NOTE: we use exclusive publications, and they are not thread safe/concurrent!
* NOTE: This cannot be on a coroutine, because our kryo instances are NOT threadsafe!
*
* the actual bits that send data on the network.
*

View File

@ -330,10 +330,16 @@ internal class StreamingManager<CONNECTION : Connection>(
internalBuffer: MutableDirectBuffer,
objectSize: Int,
endPoint: EndPoint<CONNECTION>,
tempWriteKryo: KryoExtra<Connection>,
kryo: KryoExtra<Connection>,
sendIdleStrategy: IdleStrategy,
connection: Connection
): Boolean {
// this buffer is the exact size as our internal buffer, so it is unnecessary to have multiple kryo instances
val originalBuffer = ExpandableDirectByteBuffer(objectSize) // this can grow, so it's fine to lock it to this size!
// we have to save out our internal buffer, so we can reuse the kryo instance!
originalBuffer.putBytes(0, internalBuffer, 0, objectSize)
// NOTE: our max object size for IN-MEMORY messages is an INT. For file transfer it's a LONG (so everything here is cast to a long)
var remainingPayload = objectSize
@ -345,7 +351,7 @@ internal class StreamingManager<CONNECTION : Connection>(
// tell the other side how much data we are sending
val startMessage = StreamingControl(StreamingState.START, streamSessionId, objectSize.toLong())
val startSent = endPoint.writeUnsafe(tempWriteKryo, startMessage, publication, sendIdleStrategy, connection)
val startSent = endPoint.writeUnsafe(kryo, startMessage, publication, sendIdleStrategy, connection)
if (!startSent) {
// more critical error sending the message. we shouldn't retry or anything.
val errorMessage = "[${publication.sessionId()}] Error starting streaming content."
@ -377,7 +383,7 @@ internal class StreamingManager<CONNECTION : Connection>(
val headerSize: Int
try {
val objectBuffer = tempWriteKryo.write(connection, chunkData)
val objectBuffer = kryo.write(connection, chunkData)
headerSize = objectBuffer.position()
header = ByteArray(headerSize)
@ -397,7 +403,7 @@ internal class StreamingManager<CONNECTION : Connection>(
val varIntSize = chunkBuffer.writeVarInt(sizeOfPayload, true)
// write out the payload. Our resulting data written out is the ACTUAL MTU of aeron.
internalBuffer.getBytes(0, chunkBuffer.internalBuffer, headerSize + varIntSize, sizeOfPayload)
originalBuffer.getBytes(0, chunkBuffer.internalBuffer, headerSize + varIntSize, sizeOfPayload)
remainingPayload -= sizeOfPayload
payloadSent += sizeOfPayload
@ -426,7 +432,7 @@ internal class StreamingManager<CONNECTION : Connection>(
throw exception
}
} catch (e: Exception) {
sendFailMessageAndThrow(e, streamSessionId, publication, endPoint, tempWriteKryo, sendIdleStrategy, connection)
sendFailMessageAndThrow(e, streamSessionId, publication, endPoint, kryo, sendIdleStrategy, connection)
return false // doesn't actually get here because exceptions are thrown, but this makes the IDE happy.
}
@ -453,15 +459,15 @@ internal class StreamingManager<CONNECTION : Connection>(
val writeIndex = payloadSent - headerSize - varIntSize
// write out our header data (this will OVERWRITE previous data!)
internalBuffer.putBytes(writeIndex, header)
originalBuffer.putBytes(writeIndex, header)
// write out the payload size using optimized data structures.
writeVarInt(internalBuffer, writeIndex + headerSize, sizeOfPayload, true)
writeVarInt(originalBuffer, writeIndex + headerSize, sizeOfPayload, true)
// write out the payload
endPoint.dataSend(
publication,
internalBuffer,
originalBuffer,
writeIndex,
headerSize + varIntSize + amountToSend,
sendIdleStrategy,
@ -473,7 +479,7 @@ internal class StreamingManager<CONNECTION : Connection>(
} catch (e: Exception) {
val failMessage = StreamingControl(StreamingState.FAILED, streamSessionId)
val failSent = endPoint.writeUnsafe(tempWriteKryo, failMessage, publication, sendIdleStrategy, connection)
val failSent = endPoint.writeUnsafe(kryo, failMessage, publication, sendIdleStrategy, connection)
if (!failSent) {
// something SUPER wrong!
// more critical error sending the message. we shouldn't retry or anything.
@ -497,6 +503,6 @@ internal class StreamingManager<CONNECTION : Connection>(
// send the last chunk of data
val finishedMessage = StreamingControl(StreamingState.FINISHED, streamSessionId, payloadSent.toLong())
return endPoint.writeUnsafe(tempWriteKryo, finishedMessage, publication, sendIdleStrategy, connection)
return endPoint.writeUnsafe(kryo, finishedMessage, publication, sendIdleStrategy, connection)
}
}