Moved aeron.send() logic to the driver

This commit is contained in:
Robinson 2023-08-09 21:12:12 -06:00
parent 9fcbabd061
commit 3dcd2af495
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
5 changed files with 271 additions and 223 deletions

View File

@ -795,4 +795,248 @@ class AeronDriver private constructor(config: Configuration, val logger: KLogger
close()
}
}
/**
 * NOTE: This cannot be on a coroutine, because our kryo instances are NOT threadsafe!
 *
 * The actual bits that send data on the network: claims `objectSize` bytes on the publication, copies the bytes from
 * `internalBuffer` into the claimed region, and commits the claim.
 *
 * There is a maximum length allowed for messages which is the min of 1/8th a term length or 16MB.
 * Messages larger than this should be chunked using an application level chunking protocol. Chunking has better recovery
 * properties from failure and streams with mechanical sympathy.
 *
 * Back pressure and admin actions are retried (after idling on `sendIdleStrategy`). Every other failure ends the attempt
 * and returns false.
 *
 * @param publication the connection specific publication
 * @param internalBuffer the internal buffer that will be copied to the Aeron network driver
 * @param bufferClaim the (reused) claim object that `tryClaim` populates with the claimed region
 * @param offset the offset in the internal buffer at which to start copying bytes
 * @param objectSize the number of bytes to copy (starting at the offset)
 * @param sendIdleStrategy idled on between retries when the publication asks us to try again
 * @param connection the connection object; closed immediately if the publication turns out to be disconnected
 * @param abortEarly if true, a NOT_CONNECTED result is reported as an error right away
 * @param listenerManager receives the exceptions created for failed sends
 *
 * @return true if the message was committed to the publication, false otherwise. Send failures are reported through
 *  `listenerManager` (or only logged / silently dropped for the disconnected and critical-driver-error cases); they are
 *  NOT thrown. An exception raised while copying into the claimed buffer aborts the claim and is rethrown.
 */
internal suspend fun <CONNECTION: Connection> send(
    publication: Publication,
    internalBuffer: MutableDirectBuffer,
    bufferClaim: BufferClaim,
    offset: Int,
    objectSize: Int,
    sendIdleStrategy: CoroutineIdleStrategy,
    connection: Connection,
    abortEarly: Boolean,
    listenerManager: ListenerManager<CONNECTION>
): Boolean {
    while (true) {
        // The maximum claimable length is given by the maxPayloadLength() function, which is the MTU length less header (with defaults this is 1,376 bytes).
        val result = publication.tryClaim(objectSize, bufferClaim)
        if (result >= 0) {
            // success!
            try {
                // both .offer and .putBytes add bytes to the underlying termBuffer -- HOWEVER, putBytes is faster as there are no
                // extra checks performed BECAUSE we have to do our own data fragmentation management.
                // It doesn't make sense to use `.offer`, which ALSO has its own fragmentation handling (which is extra overhead for us)
                bufferClaim.buffer().putBytes(DataHeaderFlyweight.HEADER_LENGTH, internalBuffer, offset, objectSize)
            } catch (e: Throwable) {
                // The copy failed, so the claimed region does not hold a valid message. Abort (instead of commit) so the
                // subscriber never sees the incomplete frame, then let the caller deal with the failure.
                bufferClaim.abort()
                throw e
            }

            // must commit() or abort() before the unblock timeout (default 15 seconds) occurs.
            bufferClaim.commit()
            return true
        }

        if (criticalDriverError) {
            // there were critical errors. Don't even try anything! we will reconnect automatically (on the client) when it shuts-down (the connection is closed immediately when an error of this type is encountered
            // aeron will likely report this is as "BACK PRESSURE"
            return false
        }

        /**
         * Since the publication is not connected, we weren't able to send data to the remote endpoint.
         */
        if (result == Publication.NOT_CONNECTED) {
            if (abortEarly) {
                val exception = endPoint!!.newException(
                    "[${publication.sessionId()}] Unable to send message. (Connection in non-connected state, aborted attempt! ${AeronDriver.errorCodeName(result)})"
                )
                listenerManager.notifyError(exception)
                return false
            }
            else if (publication.isConnected) {
                // more critical error sending the message. we shouldn't retry or anything.
                val errorMessage = "[${publication.sessionId()}] Error sending message. (Connection in non-connected state longer than linger timeout. ${AeronDriver.errorCodeName(result)})"

                // either client or server. No other choices. We create an exception, because it's more useful!
                val exception = endPoint!!.newException(errorMessage)

                // +3 more because we do not need to see the "internals" for sending messages. The important part of the stack trace is
                // where we see who is calling "send()"
                exception.cleanStackTrace(3)
                listenerManager.notifyError(exception)
                return false
            }
            else {
                logger.info { "[${publication.sessionId()}] Connection disconnected while sending data, closing connection." }

                // publication was actually closed or the server was closed, so no bother throwing an error
                connection.closeImmediately(sendDisconnectMessage = false,
                                            notifyDisconnect = true)
                return false
            }
        }

        /**
         * The publication is not connected to a subscriber, this can be an intermittent state as subscribers come and go.
         *  val NOT_CONNECTED: Long = -1
         *
         * The offer failed due to back pressure from the subscribers preventing further transmission.
         *  val BACK_PRESSURED: Long = -2
         *
         * The offer failed due to an administration action and should be retried.
         * The action is an operation such as log rotation which is likely to have succeeded by the next retry attempt.
         *  val ADMIN_ACTION: Long = -3
         *
         * NOT_CONNECTED always returned above, so only BACK_PRESSURED and ADMIN_ACTION reach the retry here.
         */
        if (result >= Publication.ADMIN_ACTION) {
            // we should retry, BUT we want to suspend ANYONE ELSE trying to write at the same time!
            sendIdleStrategy.idle()
            continue
        }

        if (result == Publication.CLOSED && connection.isClosed()) {
            // this can happen when we use RMI to close a connection. RMI will (in most cases) ALWAYS send a response when it's
            // done executing. If the connection is *closed* first (because an RMI method closed it), then we will not be able to
            // send the message.
            // NOTE: the connection is already closed at this point; the failed send is still reported to the listeners.
            val exception = endPoint!!.newException(
                "[${publication.sessionId()}] Unable to send message. (Connection in closed, aborted attempt! ${AeronDriver.errorCodeName(result)})"
            )
            listenerManager.notifyError(exception)
            return false
        }

        // more critical error sending the message. we shouldn't retry or anything.
        val errorMessage = "[${publication.sessionId()}] Error sending message. (${AeronDriver.errorCodeName(result)})"

        // either client or server. No other choices. We create an exception, because it's more useful!
        val exception = endPoint!!.newException(errorMessage)

        // +3 more because we do not need to see the "internals" for sending messages. The important part of the stack trace is
        // where we see who is calling "send()"
        exception.cleanStackTrace(3)
        listenerManager.notifyError(exception)
        return false
    }
}
/**
 * Offers an already serialized (handshake) message to the publication, retrying while Aeron reports back pressure or an
 * admin action.
 *
 * NOTE: this **MUST** stay on the same co-routine that calls "send". This cannot be re-dispatched onto a different coroutine!
 *
 * CANNOT be called in action dispatch. ALWAYS ON SAME THREAD
 *  Server -> will be network polling thread
 *  Client -> will be thread that calls `connect()`
 *
 * @param publication the publication to offer the message to
 * @param buffer the serialized message; bytes `0` until `buffer.position()` of its internal buffer are sent
 * @param logInfo the caller supplied prefix that identifies this sender in every error message
 * @param listenerManager receives the exceptions created for failed sends
 * @param handshakeSendIdleStrategy idled on between retries
 *
 * @return true if the message was successfully sent by aeron, false if it could not be sent
 * @throws Throwable the exception created by the endpoint, when aeron reports NOT_CONNECTED while the publication still
 *  claims to be connected (the same exception is also passed to `listenerManager`)
 */
internal suspend fun <CONNECTION: Connection> send(
    publication: Publication,
    buffer: AeronOutput,
    logInfo: String,
    listenerManager: ListenerManager<CONNECTION>,
    handshakeSendIdleStrategy: CoroutineIdleStrategy
): Boolean {
    val objectSize = buffer.position()
    val internalBuffer = buffer.internalBuffer

    while (true) {
        val result = publication.offer(internalBuffer, 0, objectSize)
        if (result >= 0) {
            // success!
            return true
        }

        if (criticalDriverError) {
            // there were critical errors. Don't even try anything! we will reconnect automatically (on the client) when it shuts-down (the connection is closed immediately when an error of this type is encountered
            // aeron will likely report this is as "BACK PRESSURE"
            return false
        }

        /**
         * Since the publication is not connected, we weren't able to send data to the remote endpoint.
         *
         * According to Aeron Docs, Pubs and Subs can "come and go", whatever that means. We just want to make sure that we
         * don't "loop forever" if a publication is ACTUALLY closed, like on purpose.
         */
        if (result == Publication.NOT_CONNECTED) {
            if (publication.isConnected) {
                // more critical error sending the message. we shouldn't retry or anything.
                // this exception will be a ClientException or a ServerException
                val exception = endPoint!!.newException(
                    "[$logInfo] Error sending message. (Connection in non-connected state longer than linger timeout. ${AeronDriver.errorCodeName(result)})",
                    null
                )
                exception.cleanStackTraceInternal()
                listenerManager.notifyError(exception)
                throw exception
            }
            else {
                // publication was actually closed, so no bother throwing an error
                return false
            }
        }

        /**
         * The publication is not connected to a subscriber, this can be an intermittent state as subscribers come and go.
         *  val NOT_CONNECTED: Long = -1
         *
         * The offer failed due to back pressure from the subscribers preventing further transmission.
         *  val BACK_PRESSURED: Long = -2
         *
         * The offer failed due to an administration action and should be retried.
         * The action is an operation such as log rotation which is likely to have succeeded by the next retry attempt.
         *  val ADMIN_ACTION: Long = -3
         *
         * NOT_CONNECTED always returned/threw above, so only BACK_PRESSURED and ADMIN_ACTION reach the retry here.
         */
        if (result >= Publication.ADMIN_ACTION) {
            // we should retry.
            handshakeSendIdleStrategy.idle()
            continue
        }

        if (result == Publication.CLOSED) {
            // the publication is closed, so retrying is pointless. The failed send is reported to the listeners (not thrown).
            // The prefix is the caller's logInfo, the same prefix the NOT_CONNECTED error above uses.
            val exception = endPoint!!.newException(
                "[$logInfo] Unable to send message. (Connection in closed, aborted attempt! ${AeronDriver.errorCodeName(result)})"
            )
            listenerManager.notifyError(exception)
            return false
        }

        // more critical error sending the message. we shouldn't retry or anything.
        val errorMessage = "[$logInfo] Error sending message. (${AeronDriver.errorCodeName(result)})"

        // either client or server. No other choices. We create an exception, because it's more useful!
        val exception = endPoint!!.newException(errorMessage)

        // +3 more because we do not need to see the "internals" for sending messages. The important part of the stack trace is
        // where we see who is calling "send()"
        exception.cleanStackTrace(3)
        listenerManager.notifyError(exception)
        return false
    }
}
}

View File

@ -502,7 +502,7 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
)
}
} else {
dataSend(publication, internalBuffer, bufferClaim, 0, objectSize, sendIdleStrategy, connection, abortEarly)
aeronDriver.send(publication, internalBuffer, bufferClaim, 0, objectSize, sendIdleStrategy, connection, abortEarly, listenerManager)
}
}
} catch (e: Throwable) {
@ -541,7 +541,7 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
val internalBuffer = buffer.internalBuffer
val bufferClaim = kryo.bufferClaim
return dataSend(publication, internalBuffer, bufferClaim, 0, objectSize, sendIdleStrategy, connection, false)
return aeronDriver.send(publication, internalBuffer, bufferClaim, 0, objectSize, sendIdleStrategy, connection, false, listenerManager)
}
/**
@ -669,135 +669,6 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
}
}
/**
 * NOTE: This cannot be on a coroutine, because our kryo instances are NOT threadsafe!
 *
 * The actual bits that send data on the network: claims `objectSize` bytes on the publication, copies the bytes from
 * `internalBuffer` into the claimed region, and commits the claim.
 *
 * There is a maximum length allowed for messages which is the min of 1/8th a term length or 16MB.
 * Messages larger than this should be chunked using an application level chunking protocol. Chunking has better recovery
 * properties from failure and streams with mechanical sympathy.
 *
 * This can be overridden if you want to customize exactly how data is sent on the network
 *
 * @param publication the connection specific publication
 * @param internalBuffer the internal buffer that will be copied to the Aeron network driver
 * @param bufferClaim the (reused) claim object that `tryClaim` populates with the claimed region
 * @param offset the offset in the internal buffer at which to start copying bytes
 * @param objectSize the number of bytes to copy (starting at the offset)
 * @param sendIdleStrategy idled on between retries
 * @param connection the connection object
 * @param abortEarly if true, a NOT_CONNECTED result is reported as an error right away instead of being retried
 *
 * @return true if the message was committed to the publication, false otherwise. Send failures are reported through the
 *  listener manager and are NOT thrown. An exception raised while copying into the claimed buffer aborts the claim and
 *  is rethrown.
 */
internal suspend fun dataSend(
    publication: Publication,
    internalBuffer: MutableDirectBuffer,
    bufferClaim: BufferClaim,
    offset: Int,
    objectSize: Int,
    sendIdleStrategy: CoroutineIdleStrategy,
    connection: Connection,
    abortEarly: Boolean
): Boolean {
    // both are lazily initialized the first time NOT_CONNECTED is seen (0 means "not started yet")
    var timeoutInNanos = 0L
    var startTime = 0L

    while (true) {
        // The maximum claimable length is given by the maxPayloadLength() function, which is the MTU length less header (with defaults this is 1,376 bytes).
        val result = publication.tryClaim(objectSize, bufferClaim)
        if (result >= 0) {
            // success!
            try {
                // both .offer and .putBytes add bytes to the underlying termBuffer -- HOWEVER, putBytes is faster as there are no
                // extra checks performed BECAUSE we have to do our own data fragmentation management.
                // It doesn't make sense to use `.offer`, which ALSO has its own fragmentation handling (which is extra overhead for us)
                bufferClaim.buffer().putBytes(DataHeaderFlyweight.HEADER_LENGTH, internalBuffer, offset, objectSize)
            } catch (e: Throwable) {
                // The copy failed, so the claimed region does not hold a valid message. Abort (instead of commit) so the
                // subscriber never sees the incomplete frame, then let the caller deal with the failure.
                bufferClaim.abort()
                throw e
            }

            // must commit() or abort() before the unblock timeout (default 15 seconds) occurs.
            bufferClaim.commit()
            return true
        }

        /**
         * Since the publication is not connected, we weren't able to send data to the remote endpoint.
         *
         * According to Aeron Docs, Pubs and Subs can "come and go", whatever that means. We just want to make sure that we
         * don't "loop forever" if a publication is ACTUALLY closed, like on purpose.
         */
        if (result == Publication.NOT_CONNECTED) {
            if (abortEarly) {
                listenerManager.notifyError(newException("[${publication.sessionId()}] Unable to send message. (Connection in non-connected state, aborted attempt! ${AeronDriver.errorCodeName(result)})"))
                return false
            }

            if (timeoutInNanos == 0L) {
                timeoutInNanos = (aeronDriver.lingerNs() * 1.2).toLong() // close enough. Just needs to be slightly longer
                startTime = System.nanoTime()
            }

            if (System.nanoTime() - startTime < timeoutInNanos) {
                // we should retry.
                sendIdleStrategy.idle()
                continue
            } else if (publication.isConnected) {
                // more critical error sending the message. we shouldn't retry or anything.
                val errorMessage = "[${publication.sessionId()}] Error sending message. (Connection in non-connected state longer than linger timeout. ${AeronDriver.errorCodeName(result)})"

                // either client or server. No other choices. We create an exception, because it's more useful!
                val exception = newException(errorMessage)

                // +3 more because we do not need to see the "internals" for sending messages. The important part of the stack trace is
                // where we see who is calling "send()"
                exception.cleanStackTrace(3)

                // report it; creating the exception without handing it to anyone would silently drop the failure
                listenerManager.notifyError(exception)
                return false
            } else {
                // publication was actually closed, so no bother throwing an error
                return false
            }
        }

        /**
         * The publication is not connected to a subscriber, this can be an intermittent state as subscribers come and go.
         *  val NOT_CONNECTED: Long = -1
         *
         * The offer failed due to back pressure from the subscribers preventing further transmission.
         *  val BACK_PRESSURED: Long = -2
         *
         * The offer failed due to an administration action and should be retried.
         * The action is an operation such as log rotation which is likely to have succeeded by the next retry attempt.
         *  val ADMIN_ACTION: Long = -3
         */
        if (result >= Publication.ADMIN_ACTION) {
            // we should retry, BUT we want to suspend ANYONE ELSE trying to write at the same time!
            sendIdleStrategy.idle()
            continue
        }

        if (result == Publication.CLOSED && connection.isClosed()) {
            // this can happen when we use RMI to close a connection. RMI will (in most cases) ALWAYS send a response when it's
            // done executing. If the connection is *closed* first (because an RMI method closed it), then we will not be able to
            // send the message.
            // NOTE: the connection is already closed at this point; the failed send is still reported to the listeners.
            listenerManager.notifyError(newException("[${publication.sessionId()}] Unable to send message. (Connection in closed, aborted attempt! ${AeronDriver.errorCodeName(result)})"))
            return false
        }

        // more critical error sending the message. we shouldn't retry or anything.
        val errorMessage = "[${publication.sessionId()}] Error sending message. (${AeronDriver.errorCodeName(result)})"

        // either client or server. No other choices. We create an exception, because it's more useful!
        val exception = newException(errorMessage)

        // +3 more because we do not need to see the "internals" for sending messages. The important part of the stack trace is
        // where we see who is calling "send()"
        exception.cleanStackTrace(3)

        // report it; creating the exception without handing it to anyone would silently drop the failure
        listenerManager.notifyError(exception)
        return false
    }
}
/**
* Ensures that an endpoint (using the specified configuration) is NO LONGER running.
*

View File

@ -454,7 +454,7 @@ internal class StreamingManager<CONNECTION : Connection>(private val logger: KLo
// we reuse/recycle objects, so the payload size is not EXACTLY what is specified
val reusedPayloadSize = headerSize + varIntSize + sizeOfBlockData
val success = endPoint.dataSend(
val success = endPoint.aeronDriver.send(
publication = publication,
internalBuffer = blockBuffer.internalBuffer,
bufferClaim = kryo.bufferClaim,
@ -462,7 +462,8 @@ internal class StreamingManager<CONNECTION : Connection>(private val logger: KLo
objectSize = reusedPayloadSize,
sendIdleStrategy = sendIdleStrategy,
connection = connection,
abortEarly = false
abortEarly = false,
listenerManager = endPoint.listenerManager
)
if (!success) {
@ -517,7 +518,7 @@ internal class StreamingManager<CONNECTION : Connection>(private val logger: KLo
val reusedPayloadSize = headerSize + varIntSize + amountToSend
// write out the payload
val success = endPoint.dataSend(
val success = endPoint.aeronDriver.send(
publication = publication,
internalBuffer = originalBuffer,
bufferClaim = kryo.bufferClaim,
@ -525,7 +526,8 @@ internal class StreamingManager<CONNECTION : Connection>(private val logger: KLo
objectSize = reusedPayloadSize,
sendIdleStrategy = sendIdleStrategy,
connection = connection,
abortEarly = false
abortEarly = false,
listenerManager = endPoint.listenerManager
)
if (!success) {
@ -664,7 +666,7 @@ internal class StreamingManager<CONNECTION : Connection>(private val logger: KLo
// we reuse/recycle objects, so the payload size is not EXACTLY what is specified
val reusedPayloadSize = headerSize + varIntSize + sizeOfBlockData
val success = endPoint.dataSend(
val success = endPoint.aeronDriver.send(
publication = publication,
internalBuffer = blockBuffer,
bufferClaim = kryo.bufferClaim,
@ -672,7 +674,8 @@ internal class StreamingManager<CONNECTION : Connection>(private val logger: KLo
objectSize = reusedPayloadSize,
sendIdleStrategy = sendIdleStrategy,
connection = connection,
abortEarly = false
abortEarly = false,
listenerManager = endPoint.listenerManager
)
if (!success) {
@ -696,7 +699,8 @@ internal class StreamingManager<CONNECTION : Connection>(private val logger: KLo
}
val aeronDriver = endPoint.aeronDriver
val listenerManager = endPoint.listenerManager
// now send the block as fast as possible. Aeron will have us back-off if we send too quickly
while (remainingPayload > 0) {
@ -742,15 +746,16 @@ internal class StreamingManager<CONNECTION : Connection>(private val logger: KLo
val reusedPayloadSize = headerSize + varIntSize + amountToSend
// write out the payload
endPoint.dataSend(
publication,
blockBuffer,
kryo.bufferClaim,
0,
reusedPayloadSize,
sendIdleStrategy,
connection,
false
aeronDriver.send(
publication = publication,
internalBuffer = blockBuffer,
bufferClaim = kryo.bufferClaim,
offset = 0,
objectSize = reusedPayloadSize,
sendIdleStrategy = sendIdleStrategy,
connection = connection,
abortEarly = false,
listenerManager = listenerManager
)
payloadSent += amountToSend

View File

@ -48,7 +48,7 @@ import java.util.*
* @throws ClientTimedOutException if we cannot connect to the server in the designated time
*/
internal class ClientHandshakeDriver(
private val aeronDriver: AeronDriver,
val aeronDriver: AeronDriver,
val pubSub: PubSub,
private val logInfo: String,
val details: String

View File

@ -38,7 +38,7 @@ internal class Handshaker<CONNECTION : Connection>(
config: Configuration,
serialization: Serialization<CONNECTION>,
private val listenerManager: ListenerManager<CONNECTION>,
aeronDriver: AeronDriver,
val aeronDriver: AeronDriver,
val newException: (String, Throwable?) -> Throwable
) {
private val handshakeReadKryo: KryoReader<CONNECTION>
@ -70,86 +70,14 @@ internal class Handshaker<CONNECTION : Connection>(
* @return true if the message was successfully sent by aeron
*/
@Suppress("DuplicatedCode")
internal suspend inline fun writeMessage(publication: Publication, logInfo: String, message: HandshakeMessage) {
internal suspend inline fun writeMessage(publication: Publication, logInfo: String, message: HandshakeMessage): Boolean {
// The handshake sessionId IS NOT globally unique
logger.trace { "[$logInfo] (${message.connectKey}) send HS: $message" }
try {
val buffer = handshakeWriteKryo.write(message)
val objectSize = buffer.position()
val internalBuffer = buffer.internalBuffer
var timeoutInNanos = 0L
var startTime = 0L
var result: Long
while (true) {
result = publication.offer(internalBuffer, 0, objectSize)
if (result >= 0) {
// success!
return
}
/**
* Since the publication is not connected, we weren't able to send data to the remote endpoint.
*
* According to Aeron Docs, Pubs and Subs can "come and go", whatever that means. We just want to make sure that we
* don't "loop forever" if a publication is ACTUALLY closed, like on purpose.
*/
if (result == Publication.NOT_CONNECTED) {
if (timeoutInNanos == 0L) {
timeoutInNanos = writeTimeoutNS
startTime = System.nanoTime()
}
if (System.nanoTime() - startTime < timeoutInNanos) {
// we should retry.
handshakeSendIdleStrategy.idle()
continue
} else if (publication.isConnected) {
// more critical error sending the message. we shouldn't retry or anything.
// this exception will be a ClientException or a ServerException
val exception = newException(
"[$logInfo] Error sending message. (Connection in non-connected state longer than linger timeout. ${
AeronDriver.errorCodeName(result)
})",
null
)
exception.cleanStackTraceInternal()
listenerManager.notifyError(exception)
throw exception
}
else {
// publication was actually closed, so no bother throwing an error
return
}
}
/**
* The publication is not connected to a subscriber, this can be an intermittent state as subscribers come and go.
* val NOT_CONNECTED: Long = -1
*
* The offer failed due to back pressure from the subscribers preventing further transmission.
* val BACK_PRESSURED: Long = -2
*
* The offer failed due to an administration action and should be retried.
* The action is an operation such as log rotation which is likely to have succeeded by the next retry attempt.
* val ADMIN_ACTION: Long = -3
*/
if (result >= Publication.ADMIN_ACTION) {
// we should retry.
handshakeSendIdleStrategy.idle()
continue
}
// more critical error sending the message. we shouldn't retry or anything.
// this exception will be a ClientException or a ServerException
val exception = newException("[$logInfo] Error sending handshake message. $message (${AeronDriver.errorCodeName(result)})", null)
exception.cleanStackTraceInternal()
listenerManager.notifyError(exception)
throw exception
}
return aeronDriver.send(publication, buffer, logInfo, listenerManager, handshakeSendIdleStrategy)
} catch (e: Exception) {
if (e is ClientException || e is ServerException) {
throw e