Added a streaming protocol (currently in-memory only) for sending data that large, or generally exceeds aeron's max transmit payload size

This commit is contained in:
Robinson 2022-04-04 14:46:22 +02:00
parent c2ea674989
commit 22e7acab46
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
11 changed files with 803 additions and 137 deletions

View File

@ -21,6 +21,9 @@ import
@ -48,6 +51,7 @@ import kotlinx.coroutines.runBlocking
import mu.KLogger
import mu.KotlinLogging
import org.agrona.DirectBuffer
import org.agrona.MutableDirectBuffer
import org.agrona.concurrent.IdleStrategy
fun CoroutineScope.eventLoop(block: suspend CoroutineScope.() -> Unit): Job {
@ -152,6 +156,8 @@ internal constructor(val type: Class<*>,
internal val rmiGlobalSupport = RmiManagerGlobal<CONNECTION>(logger)
internal val rmiConnectionSupport: RmiManagerConnections<CONNECTION>
internal val streamingManager = StreamingManager<CONNECTION>(logger, actionDispatch)
internal val pingManager = PingManager<CONNECTION>()
init {
@ -451,14 +457,86 @@ internal constructor(val type: Class<*>,
val message = serialization.readMessage(buffer, offset, length, connection)
logger.trace { "[${header.sessionId()}] received: $message" }
// the REPEATED usage of wrapping methods below is because Streaming messages have to intercept date BEFORE it goes to a coroutine
// NOTE: This MUST be on a new co-routine
actionDispatch.launch {
try {
processMessage(message, connection)
} catch (e: Exception) {
logger.error("Error processing message", e)
listenerManager.notifyError(connection, e)
when (message) {
is Ping -> {
// NOTE: This MUST be on a new co-routine
actionDispatch.launch {
try {
pingManager.manage(connection, responseManager, message, logger)
} catch (e: Exception) {
logger.error("Error processing message", e)
listenerManager.notifyError(connection, e)
// small problem... If we expect IN ORDER messages (ie: setting a value, then later reading the value), multiple threads don't work.
// this is worked around by having RMI always return (unless async), even with a null value, so the CALLING side of RMI will always
// go in "lock step"
is RmiMessage -> {
// if we are an RMI message/registration, we have very specific, defined behavior.
// We do not use the "normal" listener callback pattern because this requires special functionality
// NOTE: This MUST be on a new co-routine
actionDispatch.launch {
try {
rmiGlobalSupport.processMessage(serialization, connection, message, rmiConnectionSupport, responseManager, logger)
} catch (e: Exception) {
logger.error("Error processing message", e)
listenerManager.notifyError(connection, e)
// streaming/chunked message. This is used when the published data is too large for a single Aeron message.
// TECHNICALLY, we could arbitrarily increase the size of the permitted Aeron message, however this doesn't let us
// send arbitrarily large pieces of data (gigs in size, potentially).
// This will recursively call into this method for each of the unwrapped chunks of data.
is StreamingControl -> {
streamingManager.processControlMessage(message, this@EndPoint, connection)
is StreamingData -> {
// NOTE: this will read extra data from the kryo input as necessary (which is why it's not on action dispatch)!
val rawInput = serialization.readRaw()
val dataLength = rawInput.readVarInt(true)
message.payload = rawInput.readBytes(dataLength)
// NOTE: This MUST be on a new co-routine
actionDispatch.launch {
try {
streamingManager.processDataMessage(message, this@EndPoint, connection)
} catch (e: Exception) {
logger.error("Error processing StreamingMessage", e)
listenerManager.notifyError(connection, e)
is Any -> {
// NOTE: This MUST be on a new co-routine
actionDispatch.launch {
try {
var hasListeners = listenerManager.notifyOnMessage(connection, message)
// each connection registers, and is polled INDEPENDENTLY for messages.
hasListeners = hasListeners or connection.notifyOnMessage(message)
if (!hasListeners) {
logger.error("No message callbacks found for ${}")
} catch (e: Exception) {
logger.error("Error processing message ${}", e)
listenerManager.notifyError(connection, e)
else -> {
logger.error("Unknown message received!!")
} catch (e: Exception) {
@ -468,48 +546,6 @@ internal constructor(val type: Class<*>,
* Actually process the message.
private suspend fun processMessage(message: Any?, connection: CONNECTION) {
when (message) {
is Ping -> {
pingManager.manage(connection, responseManager, message, logger)
// small problem... If we expect IN ORDER messages (ie: setting a value, then later reading the value), multiple threads don't work.
// this is worked around by having RMI always return (unless async), even with a null value, so the CALLING side of RMI will always
// go in "lock step"
is RmiMessage -> {
// if we are an RMI message/registration, we have very specific, defined behavior.
// We do not use the "normal" listener callback pattern because this requires special functionality
rmiGlobalSupport.manage(serialization, connection, message, rmiConnectionSupport, responseManager, logger)
is Any -> {
try {
var hasListeners = listenerManager.notifyOnMessage(connection, message)
// each connection registers, and is polled INDEPENDENTLY for messages.
hasListeners = hasListeners or connection.notifyOnMessage(message)
if (!hasListeners) {
logger.error("No message callbacks found for ${}")
} catch (e: Exception) {
logger.error("Error processing message", e)
listenerManager.notifyError(connection, e)
else -> {
logger.error("Unknown message received!!")
* NOTE: this **MUST** stay on the same co-routine that calls "send". This cannot be re-dispatched onto a different coroutine!
@ -531,62 +567,28 @@ internal constructor(val type: Class<*>,
val objectSize = buffer.position()
val internalBuffer = buffer.internalBuffer
var result: Long
while (true) {
result = publication.offer(internalBuffer, 0, objectSize)
if (result >= 0) {
// success!
return true
* The publication is not connected to a subscriber, this can be an intermittent state as subscribers come and go.
* val NOT_CONNECTED: Long = -1
* The offer failed due to back pressure from the subscribers preventing further transmission.
* val BACK_PRESSURED: Long = -2
* The offer failed due to an administration action and should be retried.
* The action is an operation such as log rotation which is likely to have succeeded by the next retry attempt.
* val ADMIN_ACTION: Long = -3
if (result >= Publication.ADMIN_ACTION) {
// we should retry, BUT we want suspend ANYONE ELSE trying to write at the same time!
if (result == Publication.CLOSED && connection.isClosedViaAeron()) {
// this can happen when we use RMI to close a connection. RMI will (in most cases) ALWAYS send a response when it's
// done executing. If the connection is *closed* first (because an RMI method closed it), then we will not be able to
// send the message.
// NOTE: we already know the connection is closed. we closed it (so it doesn't make sense to emit an error about this)
return false
// more critical error sending the message. we shouldn't retry or anything.
val errorMessage = "[${publication.sessionId()}] Error sending message. $message (${errorCodeName(result)})"
// either client or server. No other choices. We create an exception, because it's more useful!
val exception = newException(errorMessage)
// +2 because we do not want to see the stack for the abstract `newException`
// +2 more because we do not need to see the "internals" for sending messages. The important part of the stack trace is
// where we see who is calling "send()"
ListenerManager.cleanStackTrace(exception, 4)
logger.error("Aeron error!", exception)
listenerManager.notifyError(connection, exception)
return false
// one small problem! What if the message is too big to send all at once?
val maxMessageLength = publication.maxMessageLength()
if (objectSize >= maxMessageLength) {
// we must split up the message! It's too large for Aeron to manage.
// this will split up the message, construct the necessary control message and state, then CALL the sendData
// method directly for each subsequent message.
return streamingManager.send(publication, internalBuffer,
objectSize, this, connection)
return sendData(publication, internalBuffer, 0, objectSize, connection)
} catch (e: Exception) {
if (message is MethodResponse && message.result is Exception) {
val result = message.result as Exception
logger.error("[${publication.sessionId()}] Error serializing message $message", result)
logger.error("[${publication.sessionId()}] Error serializing message '$message'", result)
listenerManager.notifyError(connection, result)
} else if (message is ClientException || message is ServerException) {
logger.error("[${publication.sessionId()}] Error for message '$message'", e)
listenerManager.notifyError(connection, e)
} else {
logger.error("[${publication.sessionId()}] Error serializing message $message", e)
logger.error("[${publication.sessionId()}] Error serializing message '$message'", e)
listenerManager.notifyError(connection, e)
} finally {
@ -597,6 +599,56 @@ internal constructor(val type: Class<*>,
return false
// the actual bits that send data on the network.
internal suspend fun sendData(publication: Publication, internalBuffer: MutableDirectBuffer, offset: Int, objectSize: Int, connection: CONNECTION): Boolean {
var result: Long
while (true) {
result = publication.offer(internalBuffer, offset, objectSize)
if (result >= 0) {
// success!
return true
* The publication is not connected to a subscriber, this can be an intermittent state as subscribers come and go.
* val NOT_CONNECTED: Long = -1
* The offer failed due to back pressure from the subscribers preventing further transmission.
* val BACK_PRESSURED: Long = -2
* The offer failed due to an administration action and should be retried.
* The action is an operation such as log rotation which is likely to have succeeded by the next retry attempt.
* val ADMIN_ACTION: Long = -3
if (result >= Publication.ADMIN_ACTION) {
// we should retry, BUT we want to suspend ANYONE ELSE trying to write at the same time!
if (result == Publication.CLOSED && connection.isClosedViaAeron()) {
// this can happen when we use RMI to close a connection. RMI will (in most cases) ALWAYS send a response when it's
// done executing. If the connection is *closed* first (because an RMI method closed it), then we will not be able to
// send the message.
// NOTE: we already know the connection is closed. we closed it (so it doesn't make sense to emit an error about this)
return false
// more critical error sending the message. we shouldn't retry or anything.
val errorMessage = "[${publication.sessionId()}] Error sending message. (${errorCodeName(result)})"
// either client or server. No other choices. We create an exception, because it's more useful!
val exception = newException(errorMessage)
// +2 because we do not want to see the stack for the abstract `newException`
// +3 more because we do not need to see the "internals" for sending messages. The important part of the stack trace is
// where we see who is calling "send()"
ListenerManager.cleanStackTrace(exception, 5)
return false
override fun toString(): String {
return "EndPoint [${type.simpleName}]"

View File

@ -0,0 +1,5 @@
data class StreamingControl(val state: StreamingState, val streamId: Long,
val totalSize: Long = 0L,
val isFile: Boolean = false, val fileName: String = ""): StreamingMessage

View File

@ -0,0 +1,32 @@
class StreamingData(var streamId: Long) : StreamingMessage {
// These are set just after we receive the message, and before we process it
@Transient var payload: ByteArray? = null
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (javaClass != other?.javaClass) return false
other as StreamingData
if (streamId != other.streamId) return false
if (payload != null) {
if (other.payload == null) return false
if (!payload.contentEquals(other.payload)) return false
} else if (other.payload != null) return false
return true
override fun hashCode(): Int {
var result = streamId.hashCode()
result = 31 * result + (payload?.contentHashCode() ?: 0)
return result
override fun toString(): String {
return "StreamingData(streamId=$streamId, payloadSize=${payload?.size})"

View File

@ -0,0 +1,399 @@
import dorkbox.bytes.OptimizeUtilsByteBuf
import dorkbox.collections.LockFreeHashMap
import io.aeron.Publication
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import mu.KLogger
import org.agrona.MutableDirectBuffer
internal class StreamingManager<CONNECTION : Connection>(private val logger: KLogger, private val actionDispatch: CoroutineScope) {
private val streamingDataTarget = LockFreeHashMap<Long, StreamingControl>()
private val streamingDataInMemory = LockFreeHashMap<Long, AeronOutput>()
companion object {
private fun writeVarInt(internalBuffer: MutableDirectBuffer, position: Int, value: Int, optimizePositive: Boolean): Int {
var p = position
var newValue = value
if (!optimizePositive) newValue = newValue shl 1 xor (newValue shr 31)
if (newValue ushr 7 == 0) {
internalBuffer.putByte(p++, newValue.toByte())
return 1
if (newValue ushr 14 == 0) {
internalBuffer.putByte(p++, (newValue and 0x7F or 0x80).toByte())
internalBuffer.putByte(p++, (newValue ushr 7).toByte())
return 2
if (newValue ushr 21 == 0) {
val byteBuf = internalBuffer
byteBuf.putByte(p++, (newValue and 0x7F or 0x80).toByte())
byteBuf.putByte(p++, (newValue ushr 7 or 0x80).toByte())
byteBuf.putByte(p++, (newValue ushr 14).toByte())
return 3
if (newValue ushr 28 == 0) {
val byteBuf = internalBuffer
byteBuf.putByte(p++, (newValue and 0x7F or 0x80).toByte())
byteBuf.putByte(p++, (newValue ushr 7 or 0x80).toByte())
byteBuf.putByte(p++, (newValue ushr 14 or 0x80).toByte())
byteBuf.putByte(p++, (newValue ushr 21).toByte())
return 4
val byteBuf = internalBuffer
byteBuf.putByte(p++, (newValue and 0x7F or 0x80).toByte())
byteBuf.putByte(p++, (newValue ushr 7 or 0x80).toByte())
byteBuf.putByte(p++, (newValue ushr 14 or 0x80).toByte())
byteBuf.putByte(p++, (newValue ushr 21 or 0x80).toByte())
byteBuf.putByte(p++, (newValue ushr 28).toByte())
return 5
* Reassemble/figure out the internal message pieces
fun processControlMessage(message: StreamingControl, endPoint: EndPoint<CONNECTION>, connection: CONNECTION) {
val streamId = message.streamId
when (message.state) {
StreamingState.START -> {
streamingDataTarget[streamId] = message
if (!message.isFile) {
streamingDataInMemory[streamId] = AeronOutput()
StreamingState.FINISHED -> {
// get the data out and send messages!
if (!message.isFile) {
val output = streamingDataInMemory.remove(streamId)
if (output != null) {
val kryo: KryoExtra<CONNECTION> = endPoint.serialization.takeKryo()
try {
val input = AeronInput(output.internalBuffer)
val streamedMessage = kryo.readClassAndObject(input)
// NOTE: This MUST be on a new co-routine
actionDispatch.launch {
val listenerManager = endPoint.listenerManager
try {
var hasListeners = listenerManager.notifyOnMessage(connection, streamedMessage)
// each connection registers, and is polled INDEPENDENTLY for messages.
hasListeners = hasListeners or connection.notifyOnMessage(streamedMessage)
if (!hasListeners) {
logger.error("No message callbacks found for ${}")
} catch (e: Exception) {
logger.error("Error processing message ${}", e)
listenerManager.notifyError(connection, e)
} catch (e: Exception) {
// something SUPER wrong!
// more critical error sending the message. we shouldn't retry or anything.
val errorMessage = "Error serializing message from received streaming content, stream $streamId"
// either client or server. No other choices. We create an exception, because it's more useful!
val exception = endPoint.newException(errorMessage)
// +2 because we do not want to see the stack for the abstract `newException`
// +3 more because we do not need to see the "internals" for sending messages. The important part of the stack trace is
// where we see who is calling "send()"
ListenerManager.cleanStackTrace(exception, 2)
throw exception
} finally {
} else {
// something SUPER wrong!
// more critical error sending the message. we shouldn't retry or anything.
val errorMessage = "Error while receiving streaming content, stream $streamId not available."
// either client or server. No other choices. We create an exception, because it's more useful!
val exception = endPoint.newException(errorMessage)
// +2 because we do not want to see the stack for the abstract `newException`
// +3 more because we do not need to see the "internals" for sending messages. The important part of the stack trace is
// where we see who is calling "send()"
ListenerManager.cleanStackTrace(exception, 2)
throw exception
} else {
// we are a file, so process accordingly
StreamingState.FAILED -> {
// clear all state
// something SUPER wrong!
// more critical error sending the message. we shouldn't retry or anything.
val errorMessage = "Failure while receiving streaming content for stream $streamId"
// either client or server. No other choices. We create an exception, because it's more useful!
val exception = endPoint.newException(errorMessage)
// +2 because we do not want to see the stack for the abstract `newException`
// +3 more because we do not need to see the "internals" for sending messages. The important part of the stack trace is
// where we see who is calling "send()"
ListenerManager.cleanStackTrace(exception, 2)
throw exception
StreamingState.UNKNOWN -> {
// something SUPER wrong!
// more critical error sending the message. we shouldn't retry or anything.
val errorMessage = "Unknown failure while receiving streaming content for stream $streamId"
// either client or server. No other choices. We create an exception, because it's more useful!
val exception = endPoint.newException(errorMessage)
// +2 because we do not want to see the stack for the abstract `newException`
// +3 more because we do not need to see the "internals" for sending messages. The important part of the stack trace is
// where we see who is calling "send()"
ListenerManager.cleanStackTrace(exception, 2)
throw exception
* Reassemble/figure out the internal message pieces
* NOTE sending a huge file can prevent other other network traffic from arriving until it's done!
fun processDataMessage(message: StreamingData, endPoint: EndPoint<CONNECTION>, connection: CONNECTION) {
// the receiving data will ALWAYS come sequentially, but there might be OTHER streaming data received meanwhile.
val streamId = message.streamId
val controlMessage = streamingDataTarget[streamId]
if (controlMessage != null) {
streamingDataInMemory.getOrPut(streamId) { AeronOutput() }.writeBytes(message.payload!!)
} else {
// something SUPER wrong!
// more critical error sending the message. we shouldn't retry or anything.
val errorMessage = "Abnormal failure while receiving streaming content, stream $streamId not available."
// either client or server. No other choices. We create an exception, because it's more useful!
val exception = endPoint.newException(errorMessage)
// +2 because we do not want to see the stack for the abstract `newException`
// +3 more because we do not need to see the "internals" for sending messages. The important part of the stack trace is
// where we see who is calling "send()"
ListenerManager.cleanStackTrace(exception, 5)
throw exception
private suspend fun sendFailMessageAndThrow(
e: Exception,
streamSessionId: Long,
publication: Publication,
endPoint: EndPoint<CONNECTION>,
connection: CONNECTION
) {
val failMessage = StreamingControl(StreamingState.FAILED, streamSessionId)
val failSent = endPoint.send(failMessage, publication, connection)
if (!failSent) {
// something SUPER wrong!
// more critical error sending the message. we shouldn't retry or anything.
val errorMessage = "[${publication.sessionId()}] Abnormal failure while streaming content."
// either client or server. No other choices. We create an exception, because it's more useful!
val exception = endPoint.newException(errorMessage)
// +2 because we do not want to see the stack for the abstract `newException`
// +4 more because we do not need to see the "internals" for sending messages. The important part of the stack trace is
// where we see who is calling "send()"
ListenerManager.cleanStackTrace(exception, 6)
throw exception
} else {
// send it up!
throw e
* This is called ONLY when a message is too large to send across the network in a single message (large data messages should
* be split into smaller ones anyways!)
* NOTE: this **MUST** stay on the same co-routine that calls "send". This cannot be re-dispatched onto a different coroutine!
* We don't write max possible length per message, we write out MTU (payload) length (so aeron doesn't fragment the message).
* The max possible length is WAY, WAY more than the max payload length.
* @return true if ALL the message chunks were successfully sent by aeron, false otherwise. Exceptions are caught and rethrown!
suspend fun send(
publication: Publication,
internalBuffer: MutableDirectBuffer,
objectSize: Int,
endPoint: EndPoint<CONNECTION>,
connection: CONNECTION): Boolean {
// NOTE: our max object size for IN-MEMORY messages is an INT. For file transfer it's a LONG (so everything here is cast to a long)
var remainingPayload = objectSize
var payloadSent = 0
val streamSessionId = endPoint.crypto.secureRandom.nextLong()
// tell the other side how much data we are sending
val startMessage = StreamingControl(StreamingState.START, streamSessionId, objectSize.toLong())
val startSent = endPoint.send(startMessage, publication, connection)
if (!startSent) {
// more critical error sending the message. we shouldn't retry or anything.
val errorMessage = "[${publication.sessionId()}] Error starting streaming content."
// either client or server. No other choices. We create an exception, because it's more useful!
val exception = endPoint.newException(errorMessage)
// +2 because we do not want to see the stack for the abstract `newException`
// +3 more because we do not need to see the "internals" for sending messages. The important part of the stack trace is
// where we see who is calling "send()"
ListenerManager.cleanStackTrace(exception, 5)
throw exception
val kryo: KryoExtra<CONNECTION> = endPoint.serialization.takeKryo()
// we do the FIRST chunk super-weird, because of the way we copy data around (we inject headers,
// so the first message is SUPER tiny and is a COPY, the rest are no-copy.
// This is REUSED to prevent garbage collection issues.
val chunkData = StreamingData(streamSessionId)
// payload size is for a PRODUCER, and not SUBSCRIBER, so we have to include this amount every time.
// MINOR fragmentation by aeron is OK, since that will greatly speed up data transfer rates!
var maxPayloadLength = publication.maxPayloadLength()
if ((maxPayloadLength * 8) < publication.maxMessageLength()) {
maxPayloadLength *= 8
val header: ByteArray
val headerSize: Int
try {
val objectBuffer = kryo.write(connection, chunkData)
headerSize = objectBuffer.position()
header = ByteArray(headerSize)
// we have to account for the header + the MAX optimized int size
maxPayloadLength -= (headerSize + 5)
// this size might be a LITTLE too big, but that's ok, since we only make this specific buffer once.
val chunkBuffer = AeronOutput(headerSize + maxPayloadLength)
// copy out our header info
objectBuffer.internalBuffer.getBytes(0, header, 0, headerSize)
// write out our header
// write out the payload size using optimized data structures.
val varIntSize = chunkBuffer.writeVarInt(maxPayloadLength, true)
// write out the payload. Our resulting data written out is the ACTUAL MTU of aeron.
internalBuffer.getBytes(0, chunkBuffer.internalBuffer, headerSize + varIntSize, maxPayloadLength)
remainingPayload -= maxPayloadLength
payloadSent += maxPayloadLength
val success = endPoint.sendData(publication, chunkBuffer.internalBuffer, 0, headerSize + varIntSize + maxPayloadLength, connection)
if (!success) {
// something SUPER wrong!
// more critical error sending the message. we shouldn't retry or anything.
val errorMessage = "[${publication.sessionId()}] Abnormal failure while streaming content."
// either client or server. No other choices. We create an exception, because it's more useful!
val exception = endPoint.newException(errorMessage)
// +2 because we do not want to see the stack for the abstract `newException`
// +3 more because we do not need to see the "internals" for sending messages. The important part of the stack trace is
// where we see who is calling "send()"
ListenerManager.cleanStackTrace(exception, 5)
throw exception
} catch (e: Exception) {
sendFailMessageAndThrow(e, streamSessionId, publication, endPoint, connection)
return false // doesn't actually get here because exceptions are thrown, but this makes the IDE happy.
} finally {
// now send the chunks as fast as possible. Aeron will have us back-off if we send too quickly
while (remainingPayload > 0) {
val amountToSend = if (remainingPayload < maxPayloadLength) {
} else {
remainingPayload -= amountToSend
// to properly do this, we have to be careful with the underlying protocol, in order to avoid copying the buffer multiple times.
// the data that will be sent is object data + buffer data. We are sending the SAME parent buffer, just at different spots and
// with different headers -- so we don't copy out the data repeatedly
// fortunately, the way that serialization works, we can safely ADD data to the tail and then appropriately read it off
// on the receiving end without worry.
try {
val varIntSize = OptimizeUtilsByteBuf.intLength(maxPayloadLength, true)
val writeIndex = payloadSent - headerSize - varIntSize
// write out our header data (this will OVERWRITE previous data!)
internalBuffer.putBytes(writeIndex, header)
// write out the payload size using optimized data structures.
writeVarInt(internalBuffer, writeIndex + headerSize, maxPayloadLength, true)
// write out the payload
endPoint.sendData(publication, internalBuffer, writeIndex, headerSize + amountToSend, connection)
payloadSent += amountToSend
} catch (e: Exception) {
val failMessage = StreamingControl(StreamingState.FAILED, streamSessionId)
val failSent = endPoint.send(failMessage, publication, connection)
if (!failSent) {
// something SUPER wrong!
// more critical error sending the message. we shouldn't retry or anything.
val errorMessage = "[${publication.sessionId()}] Abnormal failure while streaming content."
// either client or server. No other choices. We create an exception, because it's more useful!
val exception = endPoint.newException(errorMessage)
// +2 because we do not want to see the stack for the abstract `newException`
// +3 more because we do not need to see the "internals" for sending messages. The important part of the stack trace is
// where we see who is calling "send()"
ListenerManager.cleanStackTrace(exception, 5)
throw exception
} else {
// send it up!
throw e
// send the last chunk of data
val finishedMessage = StreamingControl(StreamingState.FINISHED, streamSessionId, payloadSent.toLong())
return endPoint.send(finishedMessage, publication, connection)

View File

@ -0,0 +1,18 @@
* Copyright 2020 dorkbox, llc
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
interface StreamingMessage

View File

@ -0,0 +1,60 @@
* Copyright 2020 dorkbox, llc
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.Serializer
class StreamingControlSerializer: Serializer<StreamingControl>() {
override fun write(kryo: Kryo, output: Output, data: StreamingControl) {
output.writeVarLong(data.streamId, true)
output.writeVarLong(data.totalSize, true)
if (data.isFile) {
override fun read(kryo: Kryo, input: Input, type: Class<out StreamingControl>): StreamingControl {
val stateOrdinal = input.readByte().toInt()
val state = StreamingState.values().first { it.ordinal == stateOrdinal }
val streamId = input.readVarLong(true)
val totalSize = input.readVarLong(true)
val isFile = input.readBoolean()
val fileName = if (isFile) {
} else {
return StreamingControl(state, streamId, totalSize, isFile, fileName)
class StreamingDataSerializer: Serializer<StreamingData>() {
override fun write(kryo: Kryo, output: Output, data: StreamingData) {
output.writeVarLong(data.streamId, true)
override fun read(kryo: Kryo, input: Input, type: Class<out StreamingData>): StreamingData {
val streamId = input.readVarLong(true)
return StreamingData(streamId)

View File

@ -0,0 +1,5 @@
enum class StreamingState {

View File

@ -18,6 +18,7 @@ package
import kotlinx.coroutines.launch
import mu.KLogger
import java.lang.reflect.Proxy
import java.util.*
@ -88,7 +89,7 @@ internal class RmiManagerGlobal<CONNECTION: Connection>(logger: KLogger) : RmiOb
suspend fun manage(
suspend fun processMessage(
serialization: Serialization<CONNECTION>,
connection: CONNECTION,
message: Any,

View File

@ -18,10 +18,17 @@ package
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.SerializerFactory
import com.esotericsoftware.kryo.util.DefaultInstantiatorStrategy
import com.esotericsoftware.minlog.Log
@ -146,6 +153,8 @@ open class Serialization<CONNECTION: Connection>(private val references: Boolean
private val rmiClientSerializer = RmiClientSerializer<CONNECTION>()
private val rmiServerSerializer = RmiServerSerializer<CONNECTION>()
private val streamingControlSerializer = StreamingControlSerializer()
private val streamingDataSerializer = StreamingDataSerializer()
private val pingSerializer = PingSerializer()
@ -348,6 +357,10 @@ open class Serialization<CONNECTION: Connection>(private val references: Boolean
kryo.register(, methodRequestSerializer)
kryo.register(, methodResponseSerializer)
// Streaming/Chunked Messages!
kryo.register(, streamingControlSerializer)
kryo.register(, streamingDataSerializer)
kryo.register(, pingSerializer)
@ -362,43 +375,7 @@ open class Serialization<CONNECTION: Connection>(private val references: Boolean
* called as the first thing inside when initializing the classesToRegister
private fun initKryo(): KryoExtra<CONNECTION> {
val kryo = KryoExtra<CONNECTION>()
kryo.instantiatorStrategy = instantiatorStrategy
kryo.references = references
if (factory != null) {
// All registration MUST happen in-order of when the register(*) method was called, otherwise there are problems.
// serialization.register( // TODO this is built into aeron!??!?!?!
// TODO: this is for diffie hellmen handshake stuff!
// serialization.register(, IesParametersSerializer())
// serialization.register(, IesWithCipherParametersSerializer())
// TODO: fix kryo to work the way we want, so we can register interfaces + serializers with kryo
// serialization.register(, XECPublicKeySerializer())
// serialization.register(, XECPrivateKeySerializer())
// serialization.register( // must use full package name!
// RMI stuff!
kryo.register(, methodRequestSerializer)
kryo.register(, methodResponseSerializer)
kryo.register(, pingSerializer)
kryo.register( as Class<Any>, rmiClientSerializer)
kryo.register(, continuationSerializer)
val kryo = initGlobalKryo()
// check to see which interfaces are mapped to RMI (otherwise, the interface requires a serializer)
// note, we have to check to make sure a class is not ALREADY registered for RMI before it is registered again
@ -748,6 +725,11 @@ open class Serialization<CONNECTION: Connection>(private val references: Boolean
return, offset, length, connection)
fun readRaw(): Input {
return readKryo.readerBuffer
// /**
// *

View File

@ -5,10 +5,16 @@ import
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.Assert
import org.junit.Test
import kotlin.time.Duration.Companion.seconds
class DisconnectReconnectTest : BaseTest() {
private val reconnectCount = atomic(0)
@ -122,6 +128,55 @@ class DisconnectReconnectTest : BaseTest() {
Assert.assertEquals(4, reconnectCount.value)
fun multiConnectClient() {
// clients first, so they try to connect to the server at (roughly) the same time
val config = clientConfig()
val client1: Client<Connection> = Client(config)
val client2: Client<Connection> = Client(config)
client1.onDisconnect {
logger.error("Disconnected 1!")
client2.onDisconnect {
logger.error("Disconnected 2!")
GlobalScope.launch {
GlobalScope.launch {
println("Starting server...")
run {
val configuration = serverConfig()
val server: Server<Connection> = Server(configuration)
server.onConnect {
logger.error("Disconnecting after 10 seconds.")
System.err.println("Connection count (after reconnecting) is: " + reconnectCount.value)
Assert.assertEquals(4, reconnectCount.value)
interface CloseIface {
suspend fun close()
@ -204,7 +259,9 @@ class DisconnectReconnectTest : BaseTest() {
fun manualMediaDriverAndReconnectClient() {
// NOTE: once a config is assigned to a driver, the config cannot be changed
val aeronDriver = AeronDriver(serverConfig())
runBlocking {
run {
val serverConfiguration = serverConfig()

View File

@ -0,0 +1,55 @@
import org.junit.Test
class StreamingTest : BaseTest() {
fun sendStreamingObject() {
run {
val configuration = serverConfig()
val server: Server<Connection> = Server(configuration)
server.onMessage<ByteArray> {
println("received data, shutting down!")
run {
var connectionParams: ConnectionParams<Connection>? = null
val config = clientConfig()
val client: Client<Connection> = Client(config) {
connectionParams = it
client.onConnect {
val params = connectionParams ?: throw Exception("We should not have null connectionParams!")
val publication = params.mediaDriverConnection.publication
val hugeData = ByteArray(publication.maxMessageLength() + 10)
this.endPoint.send(hugeData, publication, this)
// System.err.println("Connection count (after reconnecting) is: " + reconnectCount.value)
// Assert.assertEquals(4, reconnectCount.value)