From c7258067278c1b814c38ae2ade829a5a1839e908 Mon Sep 17 00:00:00 2001 From: Robinson Date: Thu, 8 Jul 2021 14:33:24 +0200 Subject: [PATCH] Cleaned up heap vs stack access, fixed ping support, fixed coroutine errors during serialization --- src/dorkbox/network/Client.kt | 9 +- src/dorkbox/network/Server.kt | 12 +- src/dorkbox/network/connection/Connection.kt | 9 +- src/dorkbox/network/connection/EndPoint.kt | 73 +++++++----- .../network/handshake/ServerHandshake.kt | 62 ++++++---- src/dorkbox/network/ping/PingManager.kt | 46 +++++--- src/dorkbox/network/ping/PingSerializer.kt | 37 ++++++ src/dorkbox/network/rmi/ResponseManager.kt | 42 ++++--- src/dorkbox/network/rmi/RmiClient.kt | 11 +- .../network/rmi/RmiManagerConnections.kt | 47 ++------ src/dorkbox/network/rmi/RmiManagerGlobal.kt | 111 ++++++++---------- src/dorkbox/network/rmi/RmiUtils.kt | 5 + .../network/serialization/Serialization.kt | 7 +- test/dorkboxTest/network/PingTest.kt | 21 +++- 14 files changed, 285 insertions(+), 207 deletions(-) create mode 100644 src/dorkbox/network/ping/PingSerializer.kt diff --git a/src/dorkbox/network/Client.kt b/src/dorkbox/network/Client.kt index 01c43a31..bee2a092 100644 --- a/src/dorkbox/network/Client.kt +++ b/src/dorkbox/network/Client.kt @@ -26,6 +26,7 @@ import dorkbox.network.exceptions.ClientRejectedException import dorkbox.network.exceptions.ClientTimedOutException import dorkbox.network.handshake.ClientHandshake import dorkbox.network.ping.Ping +import dorkbox.network.ping.PingManager import dorkbox.util.Sys import kotlinx.atomicfu.atomic import kotlinx.coroutines.launch @@ -629,11 +630,11 @@ open class Client(config: Configuration = Configuration * * @return true if the ping was successfully sent to the client */ - suspend fun ping(function: suspend Ping.() -> Unit): Boolean { + suspend fun ping(pingTimeoutSeconds: Int = PingManager.DEFAULT_TIMEOUT_SECONDS, function: suspend Ping.() -> Unit): Boolean { val c = connection0 if (c != null) { - return pingManager.ping(c, actionDispatch, responseManager, function) + return pingManager.ping(c, pingTimeoutSeconds, actionDispatch, responseManager, logger, function) } else { logger.error("No connection!", ClientException("Cannot send a ping when there is no connection!")) } @@ -646,9 +647,9 @@ open class Client(config: Configuration = Configuration * * @param function called when the ping returns (ie: update time/latency counters/metrics/etc) */ - fun pingBlocking(function: suspend Ping.() -> Unit): Boolean { + fun pingBlocking(pingTimeoutSeconds: Int = PingManager.DEFAULT_TIMEOUT_SECONDS, function: suspend Ping.() -> Unit): Boolean { return runBlocking { - ping(function) + ping(pingTimeoutSeconds, function) } } diff --git a/src/dorkbox/network/Server.kt b/src/dorkbox/network/Server.kt index 9abb6c57..7c3b4f15 100644 --- a/src/dorkbox/network/Server.kt +++ b/src/dorkbox/network/Server.kt @@ -183,7 +183,8 @@ open class Server(config: ServerConfiguration = ServerC publication, sessionId, message, - aeronDriver) + aeronDriver, + logger) } override fun poll(): Int { return subscription.poll(handler, 1) } @@ -269,7 +270,8 @@ open class Server(config: ServerConfiguration = ServerC clientAddress, message, aeronDriver, - false) + false, + logger) } override fun poll(): Int { return subscription.poll(handler, 1) } @@ -355,7 +357,8 @@ open class Server(config: ServerConfiguration = ServerC clientAddress, message, aeronDriver, - false) + false, + logger) } override fun poll(): Int { return subscription.poll(handler, 1) } @@ -441,7 +444,8 @@ open class 
Server(config: ServerConfiguration = ServerC clientAddress, message, aeronDriver, - true) + true, + logger) } override fun poll(): Int { return subscription.poll(handler, 1) } diff --git a/src/dorkbox/network/connection/Connection.kt b/src/dorkbox/network/connection/Connection.kt index 4f8a8880..a8bb2a7c 100644 --- a/src/dorkbox/network/connection/Connection.kt +++ b/src/dorkbox/network/connection/Connection.kt @@ -22,6 +22,7 @@ import dorkbox.network.aeron.UdpMediaDriverPairedConnection import dorkbox.network.handshake.ConnectionCounts import dorkbox.network.handshake.RandomIdAllocator import dorkbox.network.ping.Ping +import dorkbox.network.ping.PingManager import dorkbox.network.rmi.RmiSupportConnection import io.aeron.FragmentAssembler import io.aeron.Publication @@ -88,6 +89,7 @@ open class Connection(connectionParameters: ConnectionParams<*>) { private val listenerManager = atomic?>(null) val logger = endPoint.logger + private val isClosed = atomic(false) internal var preCloseAction: suspend () -> Unit = {} internal var postCloseAction: suspend () -> Unit = {} @@ -96,9 +98,6 @@ open class Connection(connectionParameters: ConnectionParams<*>) { private var connectionLastCheckTime = 0L private var connectionTimeoutTime = 0L - private val isClosed = atomic(false) - - private val connectionCheckIntervalInMS = connectionParameters.endPoint.config.connectionCheckIntervalInMS private val connectionExpirationTimoutInMS = connectionParameters.endPoint.config.connectionExpirationTimoutInMS @@ -243,8 +242,8 @@ open class Connection(connectionParameters: ConnectionParams<*>) { * * @return true if the message was successfully sent by aeron */ - suspend fun ping(function: suspend Ping.() -> Unit): Boolean { - return endPoint.pingManager.ping(this, endPoint.actionDispatch, endPoint.responseManager, function) + suspend fun ping(pingTimeoutSeconds: Int = PingManager.DEFAULT_TIMEOUT_SECONDS, function: suspend Ping.() -> Unit): Boolean { + return endPoint.ping(this, pingTimeoutSeconds, function) } /** diff --git a/src/dorkbox/network/connection/EndPoint.kt b/src/dorkbox/network/connection/EndPoint.kt index 03bfbcd8..f5e1a4e8 100644 --- a/src/dorkbox/network/connection/EndPoint.kt +++ b/src/dorkbox/network/connection/EndPoint.kt @@ -113,7 +113,7 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A val storage: SettingsStore internal val responseManager = ResponseManager(logger, actionDispatch) - internal val rmiGlobalSupport = RmiManagerGlobal(logger, listenerManager) + internal val rmiGlobalSupport = RmiManagerGlobal(logger) internal val rmiConnectionSupport: RmiManagerConnections internal val pingManager = PingManager() @@ -310,6 +310,15 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A } } + /** + * Sends a "ping" packet to measure **ROUND TRIP** time to the remote connection. + * + * @return true if the message was successfully sent by aeron + */ + internal suspend fun ping(connection: Connection, pingTimeoutMs: Int, function: suspend Ping.() -> Unit): Boolean { + return pingManager.ping(connection, pingTimeoutMs, actionDispatch, responseManager, logger, function) + } + /** * NOTE: this **MUST** stay on the same co-routine that calls "send". This cannot be re-dispatched onto a different coroutine! * CANNOT be called in action dispatch. ALWAYS ON SAME THREAD @@ -386,6 +395,7 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A // note: CANNOT be called in action dispatch. 
ALWAYS ON SAME THREAD internal fun readHandshakeMessage(buffer: DirectBuffer, offset: Int, length: Int, header: Header): Any? { return try { + // NOTE: This ABSOLUTELY MUST be done on the same thread! This cannot be done on a new one, because the buffer could change! val message = handshakeKryo.read(buffer, offset, length) logger.trace { @@ -415,31 +425,35 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A @Suppress("UNCHECKED_CAST") connection as CONNECTION - val message: Any? try { - message = serialization.readMessage(buffer, offset, length, connection) - logger.trace { - "[${header.sessionId()}] received: $message" + // NOTE: This ABSOLUTELY MUST be done on the same thread! This cannot be done on a new one, because the buffer could change! + val message = serialization.readMessage(buffer, offset, length, connection) + logger.trace { "[${header.sessionId()}] received: $message" } + + + // NOTE: This MUST be on a new co-routine + actionDispatch.launch { + try { + processMessage(message, connection) + } catch (e: Exception) { + logger.error("Error processing message", e) + listenerManager.notifyError(connection, e) + } } } catch (e: Exception) { // The handshake sessionId IS NOT globally unique logger.error("[${header.sessionId()}] Error de-serializing message", e) listenerManager.notifyError(connection, e) - - return // don't do anything! } + } - + /** + * Actually process the message. + */ + private suspend fun processMessage(message: Any?, connection: CONNECTION) { when (message) { is Ping -> { - actionDispatch.launch { - try { - pingManager.manage(connection, responseManager, message) - } catch (e: Exception) { - logger.error("Error processing PING message", e) - listenerManager.notifyError(connection, e) - } - } + pingManager.manage(connection, responseManager, message, logger) } // small problem... If we expect IN ORDER messages (ie: setting a value, then later reading the value), multiple threads don't work. @@ -448,26 +462,26 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A is RmiMessage -> { // if we are an RMI message/registration, we have very specific, defined behavior. // We do not use the "normal" listener callback pattern because this require special functionality - rmiGlobalSupport.manage(this@EndPoint, serialization, connection, message, rmiConnectionSupport, actionDispatch) + rmiGlobalSupport.manage(serialization, connection, message, rmiConnectionSupport, responseManager, logger) } + is Any -> { - actionDispatch.launch { - try { - @Suppress("UNCHECKED_CAST") - var hasListeners = listenerManager.notifyOnMessage(connection, message) + try { + @Suppress("UNCHECKED_CAST") + var hasListeners = listenerManager.notifyOnMessage(connection, message) - // each connection registers, and is polled INDEPENDENTLY for messages. - hasListeners = hasListeners or connection.notifyOnMessage(message) + // each connection registers, and is polled INDEPENDENTLY for messages. 
+ hasListeners = hasListeners or connection.notifyOnMessage(message) - if (!hasListeners) { - logger.error("No message callbacks found for ${message::class.java.simpleName}") - } - } catch (e: Exception) { - logger.error("Error processing message", e) - listenerManager.notifyError(connection, e) + if (!hasListeners) { + logger.error("No message callbacks found for ${message::class.java.simpleName}") } + } catch (e: Exception) { + logger.error("Error processing message", e) + listenerManager.notifyError(connection, e) } } + else -> { // do nothing, there were problems with the message if (message != null) { @@ -479,6 +493,7 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A } } + /** * NOTE: this **MUST** stay on the same co-routine that calls "send". This cannot be re-dispatched onto a different coroutine! * diff --git a/src/dorkbox/network/handshake/ServerHandshake.kt b/src/dorkbox/network/handshake/ServerHandshake.kt index 2a1758a2..297a91d9 100644 --- a/src/dorkbox/network/handshake/ServerHandshake.kt +++ b/src/dorkbox/network/handshake/ServerHandshake.kt @@ -38,7 +38,7 @@ import java.util.concurrent.TimeUnit * 'notifyConnect' must be THE ONLY THING in this class to use the action dispatch! */ @Suppress("DuplicatedCode") -internal class ServerHandshake(private val logger: KLogger, +internal class ServerHandshake(logger: KLogger, private val config: ServerConfiguration, private val listenerManager: ListenerManager) { @@ -68,12 +68,15 @@ internal class ServerHandshake(private val logger: KLog * @return true if we should continue parsing the incoming message, false if we should abort */ // note: CANNOT be called in action dispatch. ALWAYS ON SAME THREAD. ONLY RESPONSES ARE ON ACTION DISPATCH! - private fun validateMessageTypeAndDoPending(server: Server, - actionDispatch: CoroutineScope, - handshakePublication: Publication, - message: HandshakeMessage, - sessionId: Int, - connectionString: String): Boolean { + private fun validateMessageTypeAndDoPending( + server: Server, + actionDispatch: CoroutineScope, + handshakePublication: Publication, + message: HandshakeMessage, + sessionId: Int, + connectionString: String, + logger: KLogger + ): Boolean { // check to see if this sessionId is ALREADY in use by another connection! // this can happen if there are multiple connections from the SAME ip address (ie: localhost) @@ -131,11 +134,14 @@ internal class ServerHandshake(private val logger: KLog * @return true if we should continue parsing the incoming message, false if we should abort */ // note: CANNOT be called in action dispatch. ALWAYS ON SAME THREAD - private fun validateUdpConnectionInfo(server: Server, - handshakePublication: Publication, - config: ServerConfiguration, - clientAddressString: String, - clientAddress: InetAddress): Boolean { + private fun validateUdpConnectionInfo( + server: Server, + handshakePublication: Publication, + config: ServerConfiguration, + clientAddressString: String, + clientAddress: InetAddress, + logger: KLogger + ): Boolean { try { // VALIDATE:: Check to see if there are already too many clients connected. @@ -182,16 +188,19 @@ internal class ServerHandshake(private val logger: KLog // note: CANNOT be called in action dispatch. 
ALWAYS ON SAME THREAD - fun processIpcHandshakeMessageServer(server: Server, - rmiConnectionSupport: RmiManagerConnections, - handshakePublication: Publication, - sessionId: Int, - message: HandshakeMessage, - aeronDriver: AeronDriver) { + fun processIpcHandshakeMessageServer( + server: Server, + rmiConnectionSupport: RmiManagerConnections, + handshakePublication: Publication, + sessionId: Int, + message: HandshakeMessage, + aeronDriver: AeronDriver, + logger: KLogger + ) { val connectionString = "IPC" - if (!validateMessageTypeAndDoPending(server, server.actionDispatch, handshakePublication, message, sessionId, connectionString)) { + if (!validateMessageTypeAndDoPending(server, server.actionDispatch, handshakePublication, message, sessionId, connectionString, logger)) { return } @@ -325,9 +334,18 @@ internal class ServerHandshake(private val logger: KLog clientAddress: InetAddress, message: HandshakeMessage, aeronDriver: AeronDriver, - isIpv6Wildcard: Boolean) { + isIpv6Wildcard: Boolean, + logger: KLogger) { - if (!validateMessageTypeAndDoPending(server, server.actionDispatch, handshakePublication, message, sessionId, clientAddressString)) { + if (!validateMessageTypeAndDoPending( + server, + server.actionDispatch, + handshakePublication, + message, + sessionId, + clientAddressString, + logger + )) { return } @@ -343,7 +361,7 @@ internal class ServerHandshake(private val logger: KLog } if (!clientAddress.isLoopbackAddress && - !validateUdpConnectionInfo(server, handshakePublication, config, clientAddressString, clientAddress)) { + !validateUdpConnectionInfo(server, handshakePublication, config, clientAddressString, clientAddress, logger)) { // we do not want to limit loopback addresses! return } diff --git a/src/dorkbox/network/ping/PingManager.kt b/src/dorkbox/network/ping/PingManager.kt index 7dc36882..58d9d0b9 100644 --- a/src/dorkbox/network/ping/PingManager.kt +++ b/src/dorkbox/network/ping/PingManager.kt @@ -21,29 +21,33 @@ package dorkbox.network.ping import dorkbox.network.connection.Connection import dorkbox.network.rmi.ResponseManager -import dorkbox.network.rmi.RmiUtils import kotlinx.coroutines.CoroutineScope +import mu.KLogger /** * How to handle ping messages */ internal class PingManager { - @Suppress("UNCHECKED_CAST") - suspend fun manage(connection: CONNECTION, responseManager: ResponseManager, message: Ping) { - if (message.pongTime == 0L) { - message.pongTime = System.currentTimeMillis() - connection.send(message) - } else { - message.finishedTime = System.currentTimeMillis() + companion object { + val DEFAULT_TIMEOUT_SECONDS = 30 + } - val rmiId = RmiUtils.unpackUnsignedRight(message.packedId) + @Suppress("UNCHECKED_CAST") + suspend fun manage(connection: CONNECTION, responseManager: ResponseManager, ping: Ping, logger: KLogger) { + if (ping.pongTime == 0L) { + ping.pongTime = System.currentTimeMillis() + connection.send(ping) + } else { + ping.finishedTime = System.currentTimeMillis() + + val rmiId = ping.packedId // process the ping message so that our ping callback does something // this will be null if the ping took longer than 30 seconds and was cancelled - val result = responseManager.getWaiterCallback(rmiId) as (suspend Ping.() -> Unit)? + val result = responseManager.getWaiterCallback(rmiId, logger) as (suspend Ping.() -> Unit)? 
if (result != null) { - result(message) + result(ping) } } } @@ -53,17 +57,29 @@ internal class PingManager { * * @return true if the message was successfully sent by aeron */ - internal suspend fun ping(connection: Connection, actionDispatch: CoroutineScope, responseManager: ResponseManager, function: suspend Ping.() -> Unit): Boolean { - val id = responseManager.prepWithCallback(function) + internal suspend fun ping( + connection: Connection, + pingTimeoutSeconds: Int, + actionDispatch: CoroutineScope, + responseManager: ResponseManager, + logger: KLogger, + function: suspend Ping.() -> Unit + ): Boolean { + val id = responseManager.prepWithCallback(function, logger) val ping = Ping() - ping.packedId = RmiUtils.unpackUnsignedRight(id) + ping.packedId = id ping.pingTime = System.currentTimeMillis() // NOTE: the timout MUST NOT be more than the max SHORT value! + if (pingTimeoutSeconds > 60) { + // just over 1 minute timeout. + throw IllegalArgumentException("Ping timeout parameter `pingTimeoutSeconds` cannot exceed 60 seconds") + } + // ALWAYS cancel the ping after 30 seconds - responseManager.cancelRequest(actionDispatch, 30_000L, id) { + responseManager.cancelRequest(actionDispatch, pingTimeoutSeconds.toLong(), id, logger) { // kill the callback, since we are now "cancelled". If there is a race here (and the response comes at the exact same time) // we don't care since either it will be null or it won't (if it's not null, it will run the callback) result = null diff --git a/src/dorkbox/network/ping/PingSerializer.kt b/src/dorkbox/network/ping/PingSerializer.kt new file mode 100644 index 00000000..2f765c14 --- /dev/null +++ b/src/dorkbox/network/ping/PingSerializer.kt @@ -0,0 +1,37 @@ +/* + * Copyright 2020 dorkbox, llc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dorkbox.network.ping + +import com.esotericsoftware.kryo.Kryo +import com.esotericsoftware.kryo.Serializer +import com.esotericsoftware.kryo.io.Input +import com.esotericsoftware.kryo.io.Output + +class PingSerializer: Serializer() { + override fun write(kryo: Kryo, output: Output, ping: Ping) { + output.writeInt(ping.packedId) + output.writeLong(ping.pingTime) + output.writeLong(ping.pongTime) + } + + override fun read(kryo: Kryo, input: Input, type: Class): Ping { + val ping = Ping() + ping.packedId = input.readInt() + ping.pingTime = input.readLong() + ping.pongTime = input.readLong() + return ping + } +} diff --git a/src/dorkbox/network/rmi/ResponseManager.kt b/src/dorkbox/network/rmi/ResponseManager.kt index d3686949..a6fb89d9 100644 --- a/src/dorkbox/network/rmi/ResponseManager.kt +++ b/src/dorkbox/network/rmi/ResponseManager.kt @@ -31,7 +31,7 @@ import kotlin.concurrent.write * * response ID's and the memory they hold will leak if the response never arrives! 
*/ -internal class ResponseManager(private val logger: KLogger, actionDispatch: CoroutineScope) { +internal class ResponseManager(logger: KLogger, actionDispatch: CoroutineScope) { companion object { val TIMEOUT_EXCEPTION = Exception() } @@ -53,22 +53,22 @@ internal class ResponseManager(private val logger: KLogger, actionDispatch: Coro init { // create a shuffled list of ID's. This operation is ONLY performed ONE TIME per endpoint! val ids = mutableListOf() + + // MIN (32768) -> -1 (65535) + // ZERO is special, and is never added! + // ONE is special, and is used for ASYNC (the response will never be sent back) + // 2 (2) -> MAX (32767) + + for (id in Short.MIN_VALUE..-1) { ids.add(id) } - - // MIN (32768) -> -1 (65535) - // 2 (2) -> MAX (32767) - - // ZERO is special, and is never added! - // ONE is special, and is used for ASYNC (the response will never be sent back) - for (id in 2..Short.MAX_VALUE) { ids.add(id) } ids.shuffle() - // populate the array of randomly assigned ID's + waiters. This can happen in a new thread + // populate the array of randomly assigned ID's + waiters. It is OK for this to happen in a new thread actionDispatch.launch { try { for (it in ids) { @@ -81,14 +81,13 @@ internal class ResponseManager(private val logger: KLogger, actionDispatch: Coro } } - /** * Called when we receive the answer for our initial request. If no response data, then the pending rmi data entry is deleted * * resume any pending remote object method invocations (if they are not async, or not manually waiting) * NOTE: async RMI will never call this (because async doesn't return a response) */ - suspend fun notifyWaiter(rmiId: Int, result: Any?) { + suspend fun notifyWaiter(rmiId: Int, result: Any?, logger: KLogger) { logger.trace { "RMI return message: $rmiId" } val previous = pendingLock.write { @@ -111,7 +110,7 @@ internal class ResponseManager(private val logger: KLogger, actionDispatch: Coro * * This is ONLY called when we want to get the data out of the stored entry, because we are operating ASYNC. (pure RMI async is different) */ - suspend fun getWaiterCallback(rmiId: Int): T? { + suspend fun getWaiterCallback(rmiId: Int, logger: KLogger): T? { logger.trace { "RMI return message: $rmiId" } val previous = pendingLock.write { @@ -139,7 +138,7 @@ internal class ResponseManager(private val logger: KLogger, actionDispatch: Coro * * We ONLY care about the ID to get the correct response info. If there is no response, the ID can be ignored. */ - suspend fun prep(): ResponseWaiter { + suspend fun prep(logger: KLogger): ResponseWaiter { val responseRmi = waiterCache.receive() rmiWaitersInUse.getAndIncrement() logger.trace { "RMI count: ${rmiWaitersInUse.value}" } @@ -147,9 +146,11 @@ internal class ResponseManager(private val logger: KLogger, actionDispatch: Coro // this will replace the waiter if it was cancelled (waiters are not valid if cancelled) responseRmi.prep() + val rmiId = RmiUtils.unpackUnsignedRight(responseRmi.id) + pendingLock.write { // this just does a .toUShort().toInt() conversion. This is cleaner than doing it manually - pending[RmiUtils.unpackUnsignedRight(responseRmi.id)] = responseRmi + pending[rmiId] = responseRmi } return responseRmi @@ -160,7 +161,7 @@ internal class ResponseManager(private val logger: KLogger, actionDispatch: Coro * * We ONLY care about the ID to get the correct response info. If there is no response, the ID can be ignored. 
*/ - suspend fun prepWithCallback(function: Any): Int { + suspend fun prepWithCallback(function: Any, logger: KLogger): Int { val responseRmi = waiterCache.receive() rmiWaitersInUse.getAndIncrement() logger.trace { "RMI count: ${rmiWaitersInUse.value}" } @@ -171,19 +172,22 @@ internal class ResponseManager(private val logger: KLogger, actionDispatch: Coro // assign the callback that will be notified when the return message is received responseRmi.result = function + // this just does a .toUShort().toInt() conversion. This is cleaner than doing it manually + val rmiId = RmiUtils.unpackUnsignedRight(responseRmi.id) + pendingLock.write { // this just does a .toUShort().toInt() conversion. This is cleaner than doing it manually - pending[RmiUtils.unpackUnsignedRight(responseRmi.id)] = responseRmi + pending[rmiId] = responseRmi } - return responseRmi.id + return rmiId } /** * Cancels the RMI request in the given timeout, the callback is executed inside the read lock */ - fun cancelRequest(actionDispatch: CoroutineScope, timeoutMillis: Long, rmiId: Int, onCancelled: ResponseWaiter.() -> Unit) { + fun cancelRequest(actionDispatch: CoroutineScope, timeoutMillis: Long, rmiId: Int, logger: KLogger, onCancelled: ResponseWaiter.() -> Unit) { actionDispatch.launch { delay(timeoutMillis) // this will always wait. if this job is cancelled, this will immediately stop waiting @@ -207,7 +211,7 @@ internal class ResponseManager(private val logger: KLogger, actionDispatch: Coro * * @return the result (can be null) or timeout exception */ - suspend fun waitForReply(actionDispatch: CoroutineScope, responseWaiter: ResponseWaiter, timeoutMillis: Long): Any? { + suspend fun waitForReply(actionDispatch: CoroutineScope, responseWaiter: ResponseWaiter, timeoutMillis: Long, logger: KLogger): Any? { val rmiId = RmiUtils.unpackUnsignedRight(responseWaiter.id) // this just does a .toUShort().toInt() conversion. This is cleaner than doing it manually logger.trace { diff --git a/src/dorkbox/network/rmi/RmiClient.kt b/src/dorkbox/network/rmi/RmiClient.kt index 37f46bf7..096352fc 100644 --- a/src/dorkbox/network/rmi/RmiClient.kt +++ b/src/dorkbox/network/rmi/RmiClient.kt @@ -19,6 +19,7 @@ import dorkbox.network.connection.Connection import dorkbox.network.rmi.messages.MethodRequest import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.runBlocking +import mu.KLogger import java.lang.reflect.InvocationHandler import java.lang.reflect.Method import java.util.* @@ -76,7 +77,7 @@ internal class RmiClient(val isGlobal: Boolean, private var enableEquals = false // if we are ASYNC, then this method immediately returns - private suspend fun sendRequest(actionDispatch: CoroutineScope, invokeMethod: MethodRequest): Any? { + private suspend fun sendRequest(actionDispatch: CoroutineScope, invokeMethod: MethodRequest, logger: KLogger): Any? { // there is a STRANGE problem, where if we DO NOT respond/reply to method invocation, and immediate invoke multiple methods -- // the "server" side can have out-of-order method invocation. 
There are 2 ways to solve this // 1) make the "server" side single threaded @@ -102,12 +103,12 @@ internal class RmiClient(val isGlobal: Boolean, null } else { // The response, even if there is NOT one (ie: not void) will always return a thing (so our code execution is in lockstep - val rmiWaiter = responseManager.prep() + val rmiWaiter = responseManager.prep(logger) invokeMethod.packedId = RmiUtils.packShorts(rmiObjectId, rmiWaiter.id) connection.send(invokeMethod) - responseManager.waitForReply(actionDispatch, rmiWaiter, timeoutMillis) + responseManager.waitForReply(actionDispatch, rmiWaiter, timeoutMillis, logger) } } @@ -217,7 +218,7 @@ internal class RmiClient(val isGlobal: Boolean, val continuation = suspendCoroutineArg as Continuation val suspendFunction: suspend () -> Any? = { - sendRequest(connection.endPoint.actionDispatch, invokeMethod) + sendRequest(connection.endPoint.actionDispatch, invokeMethod, connection.logger) } // function suspension works differently !! @@ -245,7 +246,7 @@ internal class RmiClient(val isGlobal: Boolean, }) } else { val any = runBlocking { - sendRequest(connection.endPoint.actionDispatch, invokeMethod) + sendRequest(connection.endPoint.actionDispatch, invokeMethod, connection.logger) } when (any) { ResponseManager.TIMEOUT_EXCEPTION -> { diff --git a/src/dorkbox/network/rmi/RmiManagerConnections.kt b/src/dorkbox/network/rmi/RmiManagerConnections.kt index 496d9f29..25dc8d19 100644 --- a/src/dorkbox/network/rmi/RmiManagerConnections.kt +++ b/src/dorkbox/network/rmi/RmiManagerConnections.kt @@ -24,8 +24,6 @@ import dorkbox.network.rmi.messages.ConnectionObjectDeleteResponse import dorkbox.network.serialization.Serialization import dorkbox.util.classes.ClassHelper import dorkbox.util.collections.LockFreeIntMap -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.launch import mu.KLogger class RmiManagerConnections internal constructor( @@ -140,12 +138,7 @@ class RmiManagerConnections internal constructor( /** * called on "server" */ - fun onConnectionObjectCreateRequest( - serialization: Serialization, - connection: CONNECTION, - message: ConnectionObjectCreateRequest, - actionDispatch: CoroutineScope - ) { + suspend fun onConnectionObjectCreateRequest(serialization: Serialization, connection: CONNECTION, message: ConnectionObjectCreateRequest) { val callbackId = RmiUtils.unpackLeft(message.packedIds) val kryoId = RmiUtils.unpackRight(message.packedIds) val objectParameters = message.objectParameters @@ -171,20 +164,14 @@ class RmiManagerConnections internal constructor( ConnectionObjectCreateResponse(RmiUtils.packShorts(callbackId, rmiId)) } - actionDispatch.launch { - // we send the message ALWAYS, because the client needs to know it worked or not - connection.send(response) - } + // we send the message ALWAYS, because the client needs to know it worked or not + connection.send(response) } /** * called on "client" */ - fun onConnectionObjectCreateResponse( - connection: CONNECTION, - message: ConnectionObjectCreateResponse, - actionDispatch: CoroutineScope - ) { + suspend fun onConnectionObjectCreateResponse(connection: CONNECTION, message: ConnectionObjectCreateResponse) { val callbackId = RmiUtils.unpackLeft(message.packedIds) val rmiId = RmiUtils.unpackRight(message.packedIds) @@ -204,25 +191,19 @@ class RmiManagerConnections internal constructor( val proxyObject = getProxyObject(false, connection, rmiId, interfaceClass) // this should be executed on a NEW coroutine! 
- actionDispatch.launch { - try { - callback(proxyObject) - } catch (e: Exception) { - ListenerManager.cleanStackTrace(e) - logger.error("RMI error connection ${connection.id}", e) - listenerManager.notifyError(connection, e) - } + try { + callback(proxyObject) + } catch (e: Exception) { + ListenerManager.cleanStackTrace(e) + logger.error("RMI error connection ${connection.id}", e) + listenerManager.notifyError(connection, e) } } /** * called on "client" or "server" */ - fun onConnectionObjectDeleteRequest( - connection: CONNECTION, - message: ConnectionObjectDeleteRequest, - actionDispatch: CoroutineScope - ) { + suspend fun onConnectionObjectDeleteRequest(connection: CONNECTION, message: ConnectionObjectDeleteRequest) { val rmiId = message.rmiId // we only delete the impl object if the RMI id is valid! @@ -238,10 +219,8 @@ class RmiManagerConnections internal constructor( removeProxyObject(rmiId) removeImplObject(rmiId) - actionDispatch.launch { - // tell the "other side" to delete the proxy/impl object - connection.send(ConnectionObjectDeleteResponse(rmiId)) - } + // tell the "other side" to delete the proxy/impl object + connection.send(ConnectionObjectDeleteResponse(rmiId)) } diff --git a/src/dorkbox/network/rmi/RmiManagerGlobal.kt b/src/dorkbox/network/rmi/RmiManagerGlobal.kt index ac77f3b4..76537c8a 100644 --- a/src/dorkbox/network/rmi/RmiManagerGlobal.kt +++ b/src/dorkbox/network/rmi/RmiManagerGlobal.kt @@ -16,18 +16,13 @@ package dorkbox.network.rmi import dorkbox.network.connection.Connection -import dorkbox.network.connection.EndPoint -import dorkbox.network.connection.ListenerManager import dorkbox.network.rmi.messages.* import dorkbox.network.serialization.Serialization -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.launch import mu.KLogger import java.lang.reflect.Proxy import java.util.* -internal class RmiManagerGlobal(private val logger: KLogger, - private val listenerManager: ListenerManager) : RmiObjectCache(logger) { +internal class RmiManagerGlobal(logger: KLogger) : RmiObjectCache(logger) { companion object { /** @@ -93,32 +88,32 @@ internal class RmiManagerGlobal(private val logger: KLog * Manages ALL OF THE RMI SCOPES */ @Suppress("DuplicatedCode") - fun manage( - endPoint: EndPoint, + suspend fun manage( serialization: Serialization, connection: CONNECTION, message: Any, rmiConnectionSupport: RmiManagerConnections, - actionDispatch: CoroutineScope + responseManager: ResponseManager, + logger: KLogger ) { when (message) { is ConnectionObjectCreateRequest -> { /** * called on "server" */ - rmiConnectionSupport.onConnectionObjectCreateRequest(serialization, connection, message, actionDispatch) + rmiConnectionSupport.onConnectionObjectCreateRequest(serialization, connection, message) } is ConnectionObjectCreateResponse -> { /** * called on "client" */ - rmiConnectionSupport.onConnectionObjectCreateResponse(connection, message, actionDispatch) + rmiConnectionSupport.onConnectionObjectCreateResponse(connection, message) } is ConnectionObjectDeleteRequest -> { /** * called on "client" or "server" */ - rmiConnectionSupport.onConnectionObjectDeleteRequest(connection, message, actionDispatch) + rmiConnectionSupport.onConnectionObjectDeleteRequest(connection, message) } is ConnectionObjectDeleteResponse -> { /** @@ -159,9 +154,7 @@ internal class RmiManagerGlobal(private val logger: KLog logger ) - actionDispatch.launch { - connection.send(rmiMessage) - } + connection.send(rmiMessage) } return } @@ -191,49 +184,47 @@ internal class 
RmiManagerGlobal(private val logger: KLog if (isCoroutine) { // https://stackoverflow.com/questions/47654537/how-to-run-suspend-method-via-reflection // https://discuss.kotlinlang.org/t/calling-coroutines-suspend-functions-via-reflection/4672 - actionDispatch.launch { - var suspendResult = kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn { cont -> - // if we are a coroutine, we have to replace the LAST arg with the coroutine object - // we KNOW this is OK, because a continuation arg will always be there! - args!![args.size - 1] = cont + var suspendResult = kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn { cont -> + // if we are a coroutine, we have to replace the LAST arg with the coroutine object + // we KNOW this is OK, because a continuation arg will always be there! + args!![args.size - 1] = cont - var insideResult: Any? - try { - // args!! is safe to do here (even though it doesn't make sense) - insideResult = cachedMethod.invoke(connection, implObject, args) - } catch (ex: Exception) { - insideResult = ex.cause - // added to prevent a stack overflow when references is false, (because 'cause' == "this"). - // See: - // https://groups.google.com/forum/?fromgroups=#!topic/kryo-users/6PDs71M1e9Y - if (insideResult == null) { - insideResult = ex - } - else { - insideResult.initCause(null) - } + var insideResult: Any? + try { + // args!! is safe to do here (even though it doesn't make sense) + insideResult = cachedMethod.invoke(connection, implObject, args) + } catch (ex: Exception) { + insideResult = ex.cause + // added to prevent a stack overflow when references is false, (because 'cause' == "this"). + // See: + // https://groups.google.com/forum/?fromgroups=#!topic/kryo-users/6PDs71M1e9Y + if (insideResult == null) { + insideResult = ex } - insideResult + else { + insideResult.initCause(null) + } + } + insideResult + } + + + if (suspendResult === kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED) { + // we were suspending, and the stack will resume when possible, then it will call the response below + } + else { + if (suspendResult === Unit) { + // kotlin suspend returns, that DO NOT have a return value, REALLY return kotlin.Unit. This means there is no + // return value! + suspendResult = null + } else if (suspendResult is Exception) { + RmiUtils.cleanStackTraceForImpl(suspendResult, true) + logger.error("Connection ${connection.id}", suspendResult) } - - if (suspendResult === kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED) { - // we were suspending, and the stack will resume when possible, then it will call the response below - } - else { - if (suspendResult === Unit) { - // kotlin suspend returns, that DO NOT have a return value, REALLY return kotlin.Unit. This means there is no - // return value! 
- suspendResult = null - } else if (suspendResult is Exception) { - RmiUtils.cleanStackTraceForImpl(suspendResult, true) - logger.error("Connection ${connection.id}", suspendResult) - } - - if (sendResponse) { - val rmiMessage = returnRmiMessage(message, suspendResult, logger) - connection.send(rmiMessage) - } + if (sendResponse) { + val rmiMessage = returnRmiMessage(message, suspendResult, logger) + connection.send(rmiMessage) } } } @@ -261,20 +252,16 @@ internal class RmiManagerGlobal(private val logger: KLog if (sendResponse) { val rmiMessage = returnRmiMessage(message, result, logger) - actionDispatch.launch { - connection.send(rmiMessage) - } + connection.send(rmiMessage) } } } is MethodResponse -> { // notify the pending proxy requests that we have a response! - actionDispatch.launch { - val rmiId = RmiUtils.unpackUnsignedRight(message.packedId) - val result = message.result + val rmiId = RmiUtils.unpackUnsignedRight(message.packedId) + val result = message.result - endPoint.responseManager.notifyWaiter(rmiId, result) - } + responseManager.notifyWaiter(rmiId, result, logger) } } } diff --git a/src/dorkbox/network/rmi/RmiUtils.kt b/src/dorkbox/network/rmi/RmiUtils.kt index d50ec6e8..3c1c62a4 100644 --- a/src/dorkbox/network/rmi/RmiUtils.kt +++ b/src/dorkbox/network/rmi/RmiUtils.kt @@ -452,6 +452,11 @@ object RmiUtils { return packedInt.toUShort().toInt() } + @Suppress("EXPERIMENTAL_API_USAGE") + fun unpackUnsignedRight(packedLong: Long): Int { + return packedLong.toUShort().toInt() + } + fun makeFancyMethodName(method: CachedMethod): String { return makeFancyMethodName(method.method) } diff --git a/src/dorkbox/network/serialization/Serialization.kt b/src/dorkbox/network/serialization/Serialization.kt index 4bfa8d3c..c55ce733 100644 --- a/src/dorkbox/network/serialization/Serialization.kt +++ b/src/dorkbox/network/serialization/Serialization.kt @@ -24,6 +24,7 @@ import dorkbox.network.Server import dorkbox.network.connection.Connection import dorkbox.network.handshake.HandshakeMessage import dorkbox.network.ping.Ping +import dorkbox.network.ping.PingSerializer import dorkbox.network.rmi.CachedMethod import dorkbox.network.rmi.RmiUtils import dorkbox.network.rmi.messages.* @@ -135,6 +136,8 @@ open class Serialization(private val references: Boolean private val rmiClientSerializer = RmiClientSerializer() private val rmiServerSerializer = RmiServerSerializer() + private val pingSerializer = PingSerializer() + /** * There is additional overhead to using RMI. 
* @@ -335,7 +338,7 @@ open class Serialization(private val references: Boolean kryo.register(MethodRequest::class.java, methodRequestSerializer) kryo.register(MethodResponse::class.java, methodResponseSerializer) - kryo.register(Ping::class.java) + kryo.register(Ping::class.java, pingSerializer) @Suppress("UNCHECKED_CAST") kryo.register(InvocationHandler::class.java as Class, rmiClientSerializer) @@ -380,7 +383,7 @@ open class Serialization(private val references: Boolean kryo.register(MethodRequest::class.java, methodRequestSerializer) kryo.register(MethodResponse::class.java, methodResponseSerializer) - kryo.register(Ping::class.java) + kryo.register(Ping::class.java, pingSerializer) @Suppress("UNCHECKED_CAST") kryo.register(InvocationHandler::class.java as Class, rmiClientSerializer) diff --git a/test/dorkboxTest/network/PingTest.kt b/test/dorkboxTest/network/PingTest.kt index eb5c1e64..196f9761 100644 --- a/test/dorkboxTest/network/PingTest.kt +++ b/test/dorkboxTest/network/PingTest.kt @@ -1,5 +1,6 @@ package dorkboxTest.network +import ch.qos.logback.classic.Level import dorkbox.network.Client import dorkbox.network.Server import dorkbox.network.connection.Connection @@ -12,6 +13,8 @@ import org.junit.Test class PingTest : BaseTest() { @Test fun RmiPing() { + setLogLevel(Level.TRACE) + val clientSuccess = atomic(false) run { @@ -29,14 +32,20 @@ class PingTest : BaseTest() { addEndPoint(client) client.onConnect { - ping { - clientSuccess.value = true + repeat(100) { + ping { + println(it) - logger.error("out-bound: $outbound") - logger.error("in-bound: $inbound") - logger.error("round-trip: $roundtrip") + if (it == 99) { + clientSuccess.value = true - stopEndPoints() + logger.error("out-bound: $outbound") + logger.error("in-bound: $inbound") + logger.error("round-trip: $roundtrip") + + stopEndPoints() + } + } } }
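
A minimal usage sketch of the reworked ping API, assuming the Connection.ping(pingTimeoutSeconds, function) signature added in this patch; outbound, inbound and roundtrip are the Ping properties that PingTest reads inside the callback. Note that PingManager rejects timeouts above 60 seconds.

import dorkbox.network.connection.Connection

// Measure round-trip latency on an established connection (client or server side).
// The callback receives the completed Ping message as its receiver.
suspend fun measureLatency(connection: Connection): Boolean {
    return connection.ping(pingTimeoutSeconds = 10) {
        println("out-bound: $outbound  in-bound: $inbound  round-trip: $roundtrip")
    }
}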
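
The three timestamps Ping carries (pingTime stamped by the sender, pongTime stamped by the remote side before echoing, finishedTime stamped when the echo returns) drive those metrics. A sketch of plausible formulas, assuming outbound/inbound/roundtrip are derived this way inside the Ping class; outbound and inbound compare clocks on two different machines, so only roundtrip is immune to clock skew.

// Hypothetical derivation of the ping metrics from the three timestamps.
data class PingTimes(val pingTime: Long, val pongTime: Long, val finishedTime: Long) {
    val outbound: Long get() = pongTime - pingTime       // send -> remote stamp
    val inbound: Long get() = finishedTime - pongTime    // remote stamp -> reply received
    val roundtrip: Long get() = finishedTime - pingTime  // full round trip
}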
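
A self-contained round trip through the new PingSerializer, assuming Ping exposes the mutable packedId/pingTime/pongTime fields the serializer reads and writes (finishedTime is stamped on receipt and is deliberately not part of the wire format).

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import dorkbox.network.ping.Ping
import dorkbox.network.ping.PingSerializer

fun main() {
    val kryo = Kryo()
    kryo.register(Ping::class.java, PingSerializer())

    val ping = Ping()
    ping.packedId = 42
    ping.pingTime = System.currentTimeMillis()

    // write: 4-byte id + two 8-byte timestamps
    val output = Output(64)
    kryo.writeObject(output, ping)

    // read it back and verify the id survived the round trip
    val copy = kryo.readObject(Input(output.toBytes()), Ping::class.java)
    check(copy.packedId == 42)
}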
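
The new RmiUtils.unpackUnsignedRight(Long) overload and the switch to storing the already-unpacked id in Ping.packedId both rely on the same conversion: response ids are Shorts drawn from the full signed range, but pending-response lookups key on the unsigned value. A small sketch mirroring the toUShort().toInt() conversion used in RmiUtils.

// Map a signed 16-bit id onto 0..65535, as RmiUtils.unpackUnsignedRight does.
fun unpackUnsignedRight(packedInt: Int): Int = packedInt.toUShort().toInt()

fun main() {
    println(unpackUnsignedRight(2))                        // 2
    println(unpackUnsignedRight(-1))                       // 65535
    println(unpackUnsignedRight(Short.MIN_VALUE.toInt()))  // 32768
}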
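
The ResponseManager comments describe the id pool this conversion feeds: every 16-bit value except 0 (never used) and 1 (reserved for async calls, which never send a response back) is pre-generated and shuffled once per endpoint. A sketch of that initialisation, assuming the pool elements are Short ids as held by the response waiters.

// Build the shuffled pool of response ids: -32768..-1 and 2..32767 (0 and 1 are reserved).
fun buildResponseIdPool(): MutableList<Short> {
    val ids = mutableListOf<Short>()
    for (id in Short.MIN_VALUE.toInt()..-1) {
        ids.add(id.toShort())
    }
    for (id in 2..Short.MAX_VALUE.toInt()) {
        ids.add(id.toShort())
    }
    ids.shuffle()
    return ids  // 65534 ids, handed out and recycled by the waiter cache
}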
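
The EndPoint change pins deserialization to the Aeron polling thread (the fragment buffer is only valid for the duration of the callback) and moves everything after that onto a launched coroutine. A generic sketch of that split, with decode and process as hypothetical placeholders rather than library functions.

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch

// Decode on the calling (poll) thread, then hand only the decoded object to a coroutine.
class MessagePump<T : Any>(
    private val scope: CoroutineScope,
    private val decode: (ByteArray) -> T,       // must run before the buffer is reused
    private val process: suspend (T) -> Unit    // may suspend; runs on the dispatch scope
) {
    fun onFragment(buffer: ByteArray) {
        val message = try {
            decode(buffer)
        } catch (e: Exception) {
            println("Error de-serializing message: $e")
            return                              // nothing to dispatch
        }
        scope.launch {
            try {
                process(message)
            } catch (e: Exception) {
                println("Error processing message: $e")
            }
        }
    }
}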