Back to suspending

This commit is contained in:
Robinson 2022-08-18 22:01:42 +02:00
parent ae7c043240
commit 4af38ef212
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
6 changed files with 51 additions and 14 deletions

View File

@ -891,7 +891,7 @@ open class Client<CONNECTION : Connection>(
*
* @return true if the message was sent successfully, false if the connection has been closed
*/
fun send(message: Any): Boolean {
suspend fun send(message: Any): Boolean {
val c = connection0
return if (c != null) {
@ -903,6 +903,25 @@ open class Client<CONNECTION : Connection>(
}
}
/**
* Sends a message to the server, if the connection is closed for any reason, this returns false.
*
* @return true if the message was sent successfully, false if the connection has been closed
*/
fun sendBlocking(message: Any): Boolean {
val c = connection0
return if (c != null) {
runBlocking {
c.send(message)
}
} else {
val exception = ClientException("Cannot send a message when there is no connection!")
logger.error(exception) { "No connection!" }
false
}
}
/**
* Sends a "ping" packet to measure **ROUND TRIP** time to the remote connection.
*

View File

@ -27,6 +27,7 @@ import io.aeron.logbuffer.Header
import kotlinx.atomicfu.atomic
import kotlinx.atomicfu.getAndUpdate
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import org.agrona.DirectBuffer
import java.lang.Thread.sleep
import java.net.InetAddress
@ -46,6 +47,11 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
private val subscriptionPort: Int
private val publicationPort: Int
/**
* When publishing data, we cannot have concurrent publications for a single connection (per Aeron publication)
*/
internal val publicationMutex = Mutex()
/**
* the stream id of this connection. Can be 0 for IPC connections
*/
@ -193,7 +199,7 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
*
* @return true if the message was successfully sent, false otherwise. Exceptions are caught and NOT rethrown!
*/
fun send(message: Any): Boolean {
suspend fun send(message: Any): Boolean {
messagesInProgress.getAndIncrement()
val success = endPoint.send(message, publication, this)
messagesInProgress.getAndDecrement()
@ -201,6 +207,17 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
return success
}
/**
* Safely sends objects to a destination.
*
* @return true if the message was successfully sent, false otherwise. Exceptions are caught and NOT rethrown!
*/
fun sendBlocking(message: Any): Boolean {
return runBlocking {
send(message)
}
}
/**
* Sends a "ping" packet to measure **ROUND TRIP** time to the remote connection.
*
@ -343,12 +360,13 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
// send out a "close" message. MAYBE it gets to the remote endpoint, maybe not. If it DOES, then the remote endpoint starts
// the close process faster.
try {
endPoint.send(CloseMessage(), publication, this)
runBlocking {
endPoint.send(CloseMessage(), publication, this@Connection)
}
} catch (ignored: Exception) {
}
val timoutInNanos = TimeUnit.SECONDS.toNanos(endPoint.config.connectionCloseTimeoutInSeconds.toLong())
var closeTimeoutTime = System.nanoTime()

View File

@ -678,13 +678,13 @@ internal constructor(val type: Class<*>,
* @return true if the message was successfully sent by aeron, false otherwise. Exceptions are caught and NOT rethrown!
*/
@Suppress("DuplicatedCode", "UNCHECKED_CAST")
internal fun send(message: Any, publication: Publication, connection: Connection): Boolean {
internal suspend fun send(message: Any, publication: Publication, connection: Connection): Boolean {
// The handshake sessionId IS NOT globally unique
logger.trace { "[${publication.sessionId()}] send: ${message.javaClass.simpleName} : $message" }
connection as CONNECTION
synchronized(connection) {
connection.publicationMutex.withLock {
// since ANY thread can call 'send', we have to take kryo instances in a safe way
val kryo: KryoExtra<CONNECTION> = serialization.takeKryo()
try {

View File

@ -213,7 +213,7 @@ internal class StreamingManager<CONNECTION : Connection>(
}
}
private fun sendFailMessageAndThrow(
private suspend fun sendFailMessageAndThrow(
e: Exception,
streamSessionId: Long,
publication: Publication,
@ -253,7 +253,7 @@ internal class StreamingManager<CONNECTION : Connection>(
* @param internalBuffer this is the ORIGINAL object data that is to be "chunked" and sent across the wire
* @return true if ALL the message chunks were successfully sent by aeron, false otherwise. Exceptions are caught and rethrown!
*/
fun send(
suspend fun send(
publication: Publication,
internalBuffer: MutableDirectBuffer,
objectSize: Int,

View File

@ -36,7 +36,7 @@ class RmiManagerConnections<CONNECTION: Connection> internal constructor(
/**
* called on "server"
*/
fun onConnectionObjectCreateRequest(serialization: Serialization<CONNECTION>, connection: CONNECTION, message: ConnectionObjectCreateRequest) {
suspend fun onConnectionObjectCreateRequest(serialization: Serialization<CONNECTION>, connection: CONNECTION, message: ConnectionObjectCreateRequest) {
val callbackId = RmiUtils.unpackLeft(message.packedIds)
val kryoId = RmiUtils.unpackRight(message.packedIds)
val objectParameters = message.objectParameters
@ -105,7 +105,7 @@ class RmiManagerConnections<CONNECTION: Connection> internal constructor(
/**
* called on "client" or "server"
*/
fun onConnectionObjectDeleteRequest(connection: CONNECTION, message: ConnectionObjectDeleteRequest) {
suspend fun onConnectionObjectDeleteRequest(connection: CONNECTION, message: ConnectionObjectDeleteRequest) {
val rmiId = message.rmiId
// we only delete the impl object if the RMI id is valid!

View File

@ -146,7 +146,7 @@ class RmiSupportConnection<CONNECTION: Connection> internal constructor(
*
* @see RemoteObject
*/
fun <Iface> create(vararg objectParameters: Any?, callback: suspend Iface.() -> Unit) {
suspend fun <Iface> create(vararg objectParameters: Any?, callback: suspend Iface.() -> Unit) {
val iFaceClass = ClassHelper.getGenericParameterAsClassForSuperClass(Function1::class.java, callback.javaClass, 0)
val kryoId = serialization.getKryoIdForRmiClient(iFaceClass)
@ -171,7 +171,7 @@ class RmiSupportConnection<CONNECTION: Connection> internal constructor(
*
* @see RemoteObject
*/
fun <Iface> create(callback: suspend Iface.() -> Unit) {
suspend fun <Iface> create(callback: suspend Iface.() -> Unit) {
val iFaceClass = ClassHelper.getGenericParameterAsClassForSuperClass(Function1::class.java, callback.javaClass, 0)
val kryoId = serialization.getKryoIdForRmiClient(iFaceClass)
@ -185,7 +185,7 @@ class RmiSupportConnection<CONNECTION: Connection> internal constructor(
*
* Future '.get' requests will succeed, as they do not check the existence of the implementation object (methods called on it will fail)
*/
fun delete(rmiObjectId: Int) {
suspend fun delete(rmiObjectId: Int) {
// we only create the proxy + execute the callback if the RMI id is valid!
if (rmiObjectId == RemoteObjectStorage.INVALID_RMI) {
val exception = Exception("RMI ID '${rmiObjectId}' is invalid. Unable to delete RMI object!")
@ -293,7 +293,7 @@ class RmiSupportConnection<CONNECTION: Connection> internal constructor(
/**
* on the "client" to create a connection-specific remote object (that exists on the server)
*/
private fun <Iface> createRemoteObject(connection: CONNECTION, kryoId: Int, objectParameters: Array<Any?>?, callback: suspend Iface.() -> Unit) {
private suspend fun <Iface> createRemoteObject(connection: CONNECTION, kryoId: Int, objectParameters: Array<Any?>?, callback: suspend Iface.() -> Unit) {
val callbackId = registerCallback(callback)
// There is no rmiID yet, because we haven't created it!