Cleaned up how kryo's are used

Changed idleStrategy
StreamingManager no longer copies bytes (it just uses a pooled kryo instance)
This commit is contained in:
Robinson 2023-07-21 00:19:31 +02:00
parent 7ed474111a
commit 2620a06409
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
6 changed files with 103 additions and 105 deletions

View File

@ -25,9 +25,7 @@ import io.aeron.logbuffer.FragmentHandler
import io.aeron.logbuffer.Header
import kotlinx.atomicfu.atomic
import kotlinx.atomicfu.getAndUpdate
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.withContext
import org.agrona.DirectBuffer
import javax.crypto.SecretKey
@ -58,7 +56,7 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
* There can be concurrent writes to the network stack, at most 1 per connection. Each connection has its own logic on the remote endpoint,
* and can have its own back-pressure.
*/
internal val sendIdleStrategy = endPoint.config.sendIdleStrategy.cloneToNormal()
internal val sendIdleStrategy = endPoint.config.sendIdleStrategy.clone()
/**
* This is the client UUID. This is useful determine if the same client is connecting multiple times to a server (instead of only using IP address)
@ -184,22 +182,17 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
return image.poll(messageHandler, 1)
}
/**
* Safely sends objects to a destination, if `abortEarly` is true, there are no retries if sending the message fails.
*
* @return true if the message was successfully sent, false otherwise. Exceptions are caught and NOT rethrown!
*/
internal suspend fun send(message: Any, abortEarly: Boolean): Boolean {
var success = false
// this is dispatched to the IO context!! (since network calls are IO/blocking calls)
withContext(Dispatchers.IO) {
// The handshake sessionId IS NOT globally unique
logger.trace { "[$toString0] send: ${message.javaClass.simpleName} : $message" }
success = endPoint.write(message, publication, sendIdleStrategy, this@Connection, abortEarly)
}
return success
// The handshake sessionId IS NOT globally unique
logger.trace { "[$toString0] send: ${message.javaClass.simpleName} : $message" }
return endPoint.write(message, publication, sendIdleStrategy, this@Connection, abortEarly)
}
/**

View File

@ -23,6 +23,7 @@ import dorkbox.network.Server
import dorkbox.network.ServerConfiguration
import dorkbox.network.aeron.AeronDriver
import dorkbox.network.aeron.BacklogStat
import dorkbox.network.aeron.CoroutineIdleStrategy
import dorkbox.network.aeron.EventPoller
import dorkbox.network.connection.ListenerManager.Companion.cleanStackTrace
import dorkbox.network.connection.streaming.StreamingControl
@ -51,7 +52,6 @@ import mu.KLogger
import mu.KotlinLogging
import org.agrona.DirectBuffer
import org.agrona.MutableDirectBuffer
import org.agrona.concurrent.IdleStrategy
import java.util.concurrent.*
// If TCP and UDP both fill the pipe, THERE WILL BE FRAGMENTATION and dropped UDP packets!
@ -98,9 +98,13 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
// the first byte manage: byte/message/stream/etc, no-crypt, crypt, crypt+compress
const val RAWBYTES = (1 shl 1).toByte()
const val ENCRYPTD = (1 shl 2).toByte()
const val COMPRESS = (1 shl 3).toByte()
const val kryo = 0.toByte()
const val byteArray = 1.toByte()
const val file = 2.toByte()
const val stream = 3.toByte()
const val ENCRYPTD = (1 shl 6).toByte()
const val COMPRESS = (1 shl 7).toByte()
}
val logger: KLogger = KotlinLogging.logger(loggerName)
@ -132,6 +136,7 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
* is (in reality) only limited by available ram.
*/
internal val maxMessageSize = config.networkMtuSize - DataHeaderFlyweight.HEADER_LENGTH
// internal val maxMessageSize = FrameDescriptor.computeMaxMessageLength(config.publicationTermBufferLength);
/**
* Read and Write can be concurrent (different buffers are used)
@ -477,48 +482,49 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
*
* @return true if the message was successfully sent by aeron, false otherwise. Exceptions are caught and NOT rethrown!
*/
open fun write(
open suspend fun write(
message: Any,
publication: Publication,
sendIdleStrategy: IdleStrategy,
sendIdleStrategy: CoroutineIdleStrategy,
connection: Connection,
abortEarly: Boolean
): Boolean {
// NOTE: A kryo instance CANNOT be re-used until after it's buffer is flushed to the network!
@Suppress("UNCHECKED_CAST")
connection as CONNECTION
// we reset the sending timeout strategy when a message was successfully sent.
// prep for idle states
sendIdleStrategy.reset()
val kryo = serialization.getWriteKryo()
// A kryo instance CANNOT be re-used until after it's buffer is flushed to the network!
return try {
serialization.withKryo {
// since ANY thread can call 'send', we have to take kryo instances in a safe way
// the maximum size that this buffer can be is:
// ExpandableDirectByteBuffer.MAX_BUFFER_LENGTH = 1073741824
val buffer = this.write(connection, message)
val objectSize = buffer.position()
val internalBuffer = buffer.internalBuffer
try {
// since ANY thread can call 'send', we have to take kryo instances in a safe way
// the maximum size that this buffer can be is:
// ExpandableDirectByteBuffer.MAX_BUFFER_LENGTH = 1073741824
val buffer = kryo.write(connection, message)
val objectSize = buffer.position()
val internalBuffer = buffer.internalBuffer
val bufferClaim = kryo.bufferClaim
// one small problem! What if the message is too big to send all at once?
// The maximum size we can send in a "single fragment" is the maxPayloadLength() function, which is the MTU length less header (with defaults this is 1,376 bytes).
return if (objectSize >= maxMessageSize) {
// we must split up the message! It's too large for Aeron to manage.
streamingManager.send(
publication = publication,
internalBuffer = internalBuffer,
objectSize = objectSize,
maxMessageSize = maxMessageSize,
endPoint = this,
kryo = kryo,
sendIdleStrategy = sendIdleStrategy,
connection = connection
)
} else {
dataSend(publication, internalBuffer, bufferClaim, 0, objectSize, sendIdleStrategy, connection, abortEarly)
// one small problem! What if the message is too big to send all at once?
// The maximum size we can send in a "single fragment" is the maxPayloadLength() function, which is the MTU length less header (with defaults this is 1,376 bytes).
if (objectSize >= maxMessageSize) {
serialization.withKryo {
// we must split up the message! It's too large for Aeron to manage.
streamingManager.send(
publication = publication,
originalBuffer = internalBuffer,
objectSize = objectSize,
maxMessageSize = maxMessageSize,
endPoint = this@EndPoint,
kryo = this, // this is safe, because we save out the bytes from the original object!
sendIdleStrategy = sendIdleStrategy,
connection = connection
)
}
} else {
dataSend(publication, internalBuffer, bufferClaim, 0, objectSize, sendIdleStrategy, connection, abortEarly)
}
}
} catch (e: Throwable) {
// make sure we atomically create the listener manager, if necessary
@ -534,9 +540,7 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
listenerManager.notifyError(connection, newException)
}
return false
} finally {
serialization.returnWriteKryo(kryo)
false
}
}
@ -547,7 +551,7 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
*
* @return true if the message was successfully sent by aeron, false otherwise. Exceptions are caught and NOT rethrown!
*/
open fun writeUnsafe(message: Any, publication: Publication, sendIdleStrategy: IdleStrategy, connection: CONNECTION, kryo: KryoWriter<CONNECTION>): Boolean {
open suspend fun writeUnsafe(message: Any, publication: Publication, sendIdleStrategy: CoroutineIdleStrategy, connection: CONNECTION, kryo: KryoWriter<CONNECTION>): Boolean {
// NOTE: A kryo instance CANNOT be re-used until after it's buffer is flushed to the network!
// since ANY thread can call 'send', we have to take kryo instances in a safe way
@ -703,13 +707,13 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
* @param connection the connection object
* @return true if the message was successfully sent by aeron, false otherwise. Exceptions are caught and NOT rethrown!
*/
internal fun dataSend(
internal suspend fun dataSend(
publication: Publication,
internalBuffer: MutableDirectBuffer,
bufferClaim: BufferClaim,
offset: Int,
objectSize: Int,
sendIdleStrategy: IdleStrategy,
sendIdleStrategy: CoroutineIdleStrategy,
connection: Connection,
abortEarly: Boolean
): Boolean {

View File

@ -23,6 +23,7 @@ import dorkbox.bytes.OptimizeUtilsByteArray
import dorkbox.bytes.OptimizeUtilsByteBuf
import dorkbox.collections.LockFreeHashMap
import dorkbox.network.Configuration
import dorkbox.network.aeron.CoroutineIdleStrategy
import dorkbox.network.connection.Connection
import dorkbox.network.connection.CryptoManagement
import dorkbox.network.connection.EndPoint
@ -38,9 +39,7 @@ import io.aeron.Publication
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import mu.KLogger
import org.agrona.ExpandableDirectByteBuffer
import org.agrona.MutableDirectBuffer
import org.agrona.concurrent.IdleStrategy
import org.agrona.concurrent.UnsafeBuffer
import java.io.File
import java.io.FileInputStream
@ -336,12 +335,12 @@ internal class StreamingManager<CONNECTION : Connection>(
}
}
private fun sendFailMessageAndThrow(
private suspend fun sendFailMessageAndThrow(
e: Exception,
streamSessionId: Int,
publication: Publication,
endPoint: EndPoint<CONNECTION>,
sendIdleStrategy: IdleStrategy,
sendIdleStrategy: CoroutineIdleStrategy,
connection: CONNECTION,
kryo: KryoWriter<CONNECTION>
) {
@ -375,27 +374,20 @@ internal class StreamingManager<CONNECTION : Connection>(
* We don't write max possible length per message, we write out MTU (payload) length (so aeron doesn't fragment the message).
* The max possible length is WAY, WAY more than the max payload length.
*
* @param internalBuffer this is the ORIGINAL object data that is to be blocks sent across the wire
* @param originalBuffer this is the ORIGINAL object data that is to be blocks sent across the wire
*
* @return true if ALL the message blocks were successfully sent by aeron, false otherwise. Exceptions are caught and rethrown!
*/
fun send(
suspend fun send(
publication: Publication,
internalBuffer: MutableDirectBuffer,
originalBuffer: MutableDirectBuffer,
maxMessageSize: Int,
objectSize: Int,
endPoint: EndPoint<CONNECTION>,
kryo: KryoWriter<CONNECTION>,
sendIdleStrategy: IdleStrategy,
sendIdleStrategy: CoroutineIdleStrategy,
connection: CONNECTION
): Boolean {
// this buffer is the exact size as our internal buffer, so it is unnecessary to have multiple kryo instances
val originalObjectBuffer = ExpandableDirectByteBuffer(objectSize) // this can grow, so it's fine to lock it to this size!
// we have to save out our internal buffer, so we can reuse the kryo instance later!
originalObjectBuffer.putBytes(0, internalBuffer, 0, objectSize)
// NOTE: our max object size for IN-MEMORY messages is an INT. For file transfer it's a LONG (so everything here is cast to a long)
var remainingPayload = objectSize
var payloadSent = 0
@ -455,7 +447,7 @@ internal class StreamingManager<CONNECTION : Connection>(
val varIntSize = blockBuffer.writeVarInt(sizeOfBlockData, true)
// write out the payload. Our resulting data written out is the ACTUAL MTU of aeron.
originalObjectBuffer.getBytes(0, blockBuffer.internalBuffer, headerSize + varIntSize, sizeOfBlockData)
originalBuffer.getBytes(0, blockBuffer.internalBuffer, headerSize + varIntSize, sizeOfBlockData)
remainingPayload -= sizeOfBlockData
payloadSent += sizeOfBlockData
@ -495,7 +487,7 @@ internal class StreamingManager<CONNECTION : Connection>(
// now send the block as fast as possible. Aeron will have us back-off if we send too quickly
while (remainingPayload > 0) {
val amountToSend = if (remainingPayload < sizeOfBlockData) {
remainingPayload.toInt()
remainingPayload
} else {
sizeOfBlockData
}
@ -517,10 +509,10 @@ internal class StreamingManager<CONNECTION : Connection>(
val writeIndex = payloadSent - headerSize - varIntSize
// write out our header data (this will OVERWRITE previous data!)
originalObjectBuffer.putBytes(writeIndex, header)
originalBuffer.putBytes(writeIndex, header)
// write out the payload size using optimized data structures.
writeVarInt(originalObjectBuffer, writeIndex + headerSize, amountToSend, true)
writeVarInt(originalBuffer, writeIndex + headerSize, amountToSend, true)
// we reuse/recycle objects, so the payload size is not EXACTLY what is specified
val reusedPayloadSize = headerSize + varIntSize + amountToSend
@ -528,7 +520,7 @@ internal class StreamingManager<CONNECTION : Connection>(
// write out the payload
endPoint.dataSend(
publication = publication,
internalBuffer = originalObjectBuffer,
internalBuffer = originalBuffer,
bufferClaim = kryo.bufferClaim,
offset = writeIndex,
objectSize = reusedPayloadSize,
@ -580,12 +572,12 @@ internal class StreamingManager<CONNECTION : Connection>(
*
* @return true if ALL the message blocks were successfully sent by aeron, false otherwise. Exceptions are caught and rethrown!
*/
fun sendFile(
suspend fun sendFile(
file: File,
publication: Publication,
endPoint: EndPoint<CONNECTION>,
kryo: KryoWriter<CONNECTION>,
sendIdleStrategy: IdleStrategy,
sendIdleStrategy: CoroutineIdleStrategy,
connection: CONNECTION,
streamSessionId: Int
): Boolean {

View File

@ -130,7 +130,7 @@ internal class ClientConnectionDriver(val connectionInfo: PubSub) {
// can throw an exception! We catch it in the calling class
val publication = aeronDriver.addExclusivePublication(publicationUri, streamIdPub, logInfo, true)
val publication = aeronDriver.addPublication(publicationUri, streamIdPub, logInfo, true)
// can throw an exception! We catch it in the calling class
// we actually have to wait for it to connect before we continue
@ -179,7 +179,7 @@ internal class ClientConnectionDriver(val connectionInfo: PubSub) {
// publication of any state to other threads and not be long running or re-entrant with the client.
// can throw an exception! We catch it in the calling class
val publication = aeronDriver.addExclusivePublication(publicationUri, streamIdPub, logInfo, false)
val publication = aeronDriver.addPublication(publicationUri, streamIdPub, logInfo, false)
// can throw an exception! We catch it in the calling class
// we actually have to wait for it to connect before we continue

View File

@ -24,6 +24,7 @@ import dorkbox.network.connection.Connection
import dorkbox.network.connection.CryptoManagement
import dorkbox.network.connection.EndPoint
import dorkbox.network.connection.streaming.StreamingManager
import kotlinx.coroutines.runBlocking
import java.io.File
internal class FileContentsSerializer<CONNECTION : Connection> : Serializer<File>() {
@ -46,21 +47,21 @@ internal class FileContentsSerializer<CONNECTION : Connection> : Serializer<File
val streamSessionId = CryptoManagement.secureRandom.nextInt()
// use the streaming manager to send the file in blocks to the remove endpoint
val streamingKryo = endPoint.serialization.getWriteKryo()
try {
streamingManager.sendFile(
file = file,
publication = publication,
endPoint = endPoint,
kryo = streamingKryo,
sendIdleStrategy = sendIdleStrategy,
connection = connection,
streamSessionId = streamSessionId
)
} finally {
endPoint.serialization.returnWriteKryo(streamingKryo)
runBlocking {
endPoint.serialization.withKryo {
streamingManager.sendFile(
file = file,
publication = publication,
endPoint = endPoint,
kryo = this,
sendIdleStrategy = sendIdleStrategy,
connection = connection,
streamSessionId = streamSessionId
)
}
}
output.writeString(file.path)
output.writeInt(streamSessionId, true)
}

View File

@ -133,8 +133,21 @@ open class Serialization<CONNECTION: Connection>(private val references: Boolean
}
private lateinit var logger: KLogger
@Volatile
private var maxMessageSize: Int = 500_000
private val writeKryos: Pool<KryoWriter<CONNECTION>> = ObjectPool.nonBlockingBounded(
poolObject = object : BoundedPoolObject<KryoWriter<CONNECTION>>() {
override fun newInstance(): KryoWriter<CONNECTION> {
logger.debug { "Creating new Kryo($maxMessageSize)" }
return newWriteKryo(maxMessageSize)
}
},
maxSize = OS.optimumNumberOfThreads * 2
)
private var initialized = atomic(false)
// used by operations performed during kryo initialization, which are by default package access (since it's an anon-inner class)
@ -165,6 +178,9 @@ open class Serialization<CONNECTION: Connection>(private val references: Boolean
internal val fileContentsSerializer = FileContentsSerializer<CONNECTION>()
/**
* There is additional overhead to using RMI.
*
@ -742,21 +758,13 @@ open class Serialization<CONNECTION: Connection>(private val references: Boolean
return newRegistrations
}
private val writeKryos: Pool<KryoWriter<CONNECTION>> = ObjectPool.nonBlockingBounded(
poolObject = object : BoundedPoolObject<KryoWriter<CONNECTION>>() {
override fun newInstance(): KryoWriter<CONNECTION> {
return newWriteKryo(maxMessageSize)
}
},
maxSize = OS.optimumNumberOfThreads * 2
)
fun getWriteKryo(): KryoWriter<CONNECTION> {
return writeKryos.take()
}
fun returnWriteKryo(kryo: KryoWriter<CONNECTION>) {
writeKryos.put(kryo)
internal inline fun <T> withKryo(kryoAccess: KryoWriter<CONNECTION>.() -> T): T {
val kryo = writeKryos.take()
try {
return kryoAccess(kryo)
} finally {
writeKryos.put(kryo)
}
}
/**