From 632603c8c7308fbc4325a5c78fec761f6066690a Mon Sep 17 00:00:00 2001 From: Robinson Date: Wed, 7 Jul 2021 16:19:00 +0200 Subject: [PATCH] WIP RMI and message performance/ping testing --- src/dorkbox/network/Client.kt | 2 +- src/dorkbox/network/connection/Connection.kt | 13 +- src/dorkbox/network/connection/EndPoint.kt | 44 ++-- .../network/connection/ListenerManager.kt | 31 --- src/dorkbox/network/ping/Ping.kt | 83 +++++-- .../network/ping/PingCanceledException.kt | 22 -- src/dorkbox/network/ping/PingFuture.kt | 118 ---------- src/dorkbox/network/ping/PingListener.kt | 37 --- src/dorkbox/network/ping/PingManager.kt | 100 ++++----- src/dorkbox/network/ping/PingMessage.kt | 24 -- src/dorkbox/network/ping/PingTuple.kt | 20 -- src/dorkbox/network/rmi/ResponseManager.kt | 151 +++++++------ src/dorkbox/network/rmi/ResponseWaiter.kt | 2 +- src/dorkbox/network/rmi/RmiClient.kt | 27 +-- src/dorkbox/network/rmi/RmiManagerGlobal.kt | 5 +- .../network/serialization/Serialization.kt | 8 +- test/dorkboxTest/network/PingPongTest.kt | 2 +- test/dorkboxTest/network/PingTest.kt | 211 +++++++++++++----- 18 files changed, 384 insertions(+), 516 deletions(-) delete mode 100644 src/dorkbox/network/ping/PingCanceledException.kt delete mode 100644 src/dorkbox/network/ping/PingFuture.kt delete mode 100644 src/dorkbox/network/ping/PingListener.kt delete mode 100644 src/dorkbox/network/ping/PingMessage.kt delete mode 100644 src/dorkbox/network/ping/PingTuple.kt diff --git a/src/dorkbox/network/Client.kt b/src/dorkbox/network/Client.kt index e03ffdee..01c43a31 100644 --- a/src/dorkbox/network/Client.kt +++ b/src/dorkbox/network/Client.kt @@ -633,7 +633,7 @@ open class Client(config: Configuration = Configuration val c = connection0 if (c != null) { - return pingManager.ping(c, function) + return pingManager.ping(c, actionDispatch, responseManager, function) } else { logger.error("No connection!", ClientException("Cannot send a ping when there is no connection!")) } diff --git a/src/dorkbox/network/connection/Connection.kt b/src/dorkbox/network/connection/Connection.kt index 4da8af21..4f8a8880 100644 --- a/src/dorkbox/network/connection/Connection.kt +++ b/src/dorkbox/network/connection/Connection.kt @@ -22,7 +22,6 @@ import dorkbox.network.aeron.UdpMediaDriverPairedConnection import dorkbox.network.handshake.ConnectionCounts import dorkbox.network.handshake.RandomIdAllocator import dorkbox.network.ping.Ping -import dorkbox.network.ping.PingMessage import dorkbox.network.rmi.RmiSupportConnection import io.aeron.FragmentAssembler import io.aeron.Publication @@ -103,14 +102,6 @@ open class Connection(connectionParameters: ConnectionParams<*>) { private val connectionCheckIntervalInMS = connectionParameters.endPoint.config.connectionCheckIntervalInMS private val connectionExpirationTimoutInMS = connectionParameters.endPoint.config.connectionExpirationTimoutInMS - /** - // * Returns the last calculated TCP return trip time, or -1 if or the [PingMessage] response has not yet been received. - // */ -// val lastRoundTripTime: Int -// get() { -// val pingFuture2 = pingFuture -// return pingFuture2?.response ?: -1 -// } // while on the CLIENT, if the SERVER's ecc key has changed, the client will abort and show an error. private val remoteKeyChanged = connectionParameters.publicKeyValidation == PublicKeyValidationState.TAMPERED @@ -250,12 +241,10 @@ open class Connection(connectionParameters: ConnectionParams<*>) { /** * Sends a "ping" packet to measure **ROUND TRIP** time to the remote connection. * - * Only 1 in-flight ping can be performed at a time. Calling ping() again, before the previous ping returns will do nothing. - * * @return true if the message was successfully sent by aeron */ suspend fun ping(function: suspend Ping.() -> Unit): Boolean { - return endPoint.pingManager.ping(this, function) + return endPoint.pingManager.ping(this, endPoint.actionDispatch, endPoint.responseManager, function) } /** diff --git a/src/dorkbox/network/connection/EndPoint.kt b/src/dorkbox/network/connection/EndPoint.kt index 9dd5fc2e..03bfbcd8 100644 --- a/src/dorkbox/network/connection/EndPoint.kt +++ b/src/dorkbox/network/connection/EndPoint.kt @@ -28,7 +28,7 @@ import dorkbox.network.handshake.HandshakeMessage import dorkbox.network.ipFilter.IpFilterRule import dorkbox.network.ping.Ping import dorkbox.network.ping.PingManager -import dorkbox.network.ping.PingMessage +import dorkbox.network.rmi.ResponseManager import dorkbox.network.rmi.RmiManagerConnections import dorkbox.network.rmi.RmiManagerGlobal import dorkbox.network.rmi.messages.MethodResponse @@ -80,8 +80,6 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A internal val listenerManager = ListenerManager(logger) internal val connections = ConnectionManager() - internal val pingManager = PingManager(logger, actionDispatch) - internal val aeronDriver: AeronDriver /** @@ -114,9 +112,12 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A */ val storage: SettingsStore - internal val rmiGlobalSupport = RmiManagerGlobal(logger, actionDispatch) + internal val responseManager = ResponseManager(logger, actionDispatch) + internal val rmiGlobalSupport = RmiManagerGlobal(logger, listenerManager) internal val rmiConnectionSupport: RmiManagerConnections + internal val pingManager = PingManager() + init { require(!config.previouslyUsed) { "${type.simpleName} configuration cannot be reused!" } config.validate() @@ -148,13 +149,13 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A if (type.javaClass == Server::class.java) { // server cannot "get" global RMI objects, only the client can @Suppress("UNCHECKED_CAST") - rmiConnectionSupport = RmiManagerConnections(logger, rmiGlobalSupport, config.serialization as Serialization) + rmiConnectionSupport = RmiManagerConnections(logger, responseManager, listenerManager, config.serialization as Serialization) { _, _, _ -> throw IllegalAccessException("Global RMI access is only possible from a Client connection!") } } else { @Suppress("UNCHECKED_CAST") - rmiConnectionSupport = RmiManagerConnections(logger, rmiGlobalSupport, config.serialization as Serialization) + rmiConnectionSupport = RmiManagerConnections(logger, responseManager, listenerManager, config.serialization as Serialization) { connection, objectId, interfaceClass -> return@RmiManagerConnections rmiGlobalSupport.getGlobalRemoteObject(connection, objectId, interfaceClass) } @@ -309,17 +310,6 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A } } - /** - * Adds a function that will be called when a ping message is received - * - * @param function called when the ping returns (ie: update time/latency counters/metrics/etc) - */ - fun onPing(function: suspend CONNECTION.(Ping) -> Unit) { - actionDispatch.launch { - listenerManager.onPing(function) - } - } - /** * NOTE: this **MUST** stay on the same co-routine that calls "send". This cannot be re-dispatched onto a different coroutine! * CANNOT be called in action dispatch. ALWAYS ON SAME THREAD @@ -441,11 +431,10 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A when (message) { - is PingMessage -> { - // the ping listener + is Ping -> { actionDispatch.launch { try { - pingManager.manage(this@EndPoint, connection, message, logger) + pingManager.manage(connection, responseManager, message) } catch (e: Exception) { logger.error("Error processing PING message", e) listenerManager.notifyError(connection, e) @@ -457,16 +446,9 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A // this is worked around by having RMI always return (unless async), even with a null value, so the CALLING side of RMI will always // go in "lock step" is RmiMessage -> { - actionDispatch.launch { - // if we are an RMI message/registration, we have very specific, defined behavior. - // We do not use the "normal" listener callback pattern because this require special functionality - try { - rmiGlobalSupport.manage(this@EndPoint, connection, message, logger) - } catch (e: Exception) { - logger.error("Error processing RMI message", e) - listenerManager.notifyError(connection, e) - } - } + // if we are an RMI message/registration, we have very specific, defined behavior. + // We do not use the "normal" listener callback pattern because this require special functionality + rmiGlobalSupport.manage(this@EndPoint, serialization, connection, message, rmiConnectionSupport, actionDispatch) } is Any -> { actionDispatch.launch { @@ -680,7 +662,7 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A // Connections are closed first, because we want to make sure that no RMI messages can be received // when we close the RMI support objects (in which case, weird - but harmless - errors show up) // this will wait for RMI timeouts if there are RMI in-progress. (this happens if we close via and RMI method) - rmiGlobalSupport.close() + responseManager.close() } // the storage is closed via this as well. diff --git a/src/dorkbox/network/connection/ListenerManager.kt b/src/dorkbox/network/connection/ListenerManager.kt index 313d13e3..ed5aab4b 100644 --- a/src/dorkbox/network/connection/ListenerManager.kt +++ b/src/dorkbox/network/connection/ListenerManager.kt @@ -16,7 +16,6 @@ package dorkbox.network.connection import dorkbox.network.ipFilter.IpFilterRule -import dorkbox.network.ping.Ping import dorkbox.os.OS import dorkbox.util.classes.ClassHelper import dorkbox.util.classes.ClassHierarchy @@ -155,9 +154,6 @@ internal class ListenerManager(private val logger: KLogg private val onMessageMap = atomic(IdentityMap, Array Unit>>(32, LOAD_FACTOR)) private val onMessageMutex = Mutex() - private val onPingList = atomic(Array Unit)>(0) { { } }) - private val onPingMutex = Mutex() - // used to keep a cache of class hierarchy for distributing messages private val classHierarchyCache = ClassHierarchy(LOAD_FACTOR) @@ -307,18 +303,6 @@ internal class ListenerManager(private val logger: KLogg } } - /** - * Adds a function that will be called when a client/server "connects" with each other - * - * For a server, this function will be called for ALL clients. - */ - suspend fun onPing(function: suspend CONNECTION.(Ping) -> Unit) { - onPingMutex.withLock { - // we have to follow the single-writer principle! - onPingList.lazySet(add(function, onPingList.value)) - } - } - /** * Invoked just after a connection is created, but before it is connected. * @@ -448,19 +432,4 @@ internal class ListenerManager(private val logger: KLogg return hasListeners } - - /** - * Invoked when a connection is 'pinged' by the remote endpoint - */ - suspend fun notifyPing(ping: Ping, connection: CONNECTION) { - onPingList.value.forEach { - try { - it(connection, ping) - } catch (t: Throwable) { - // NOTE: when we remove stuff, we ONLY want to remove the "tail" of the stacktrace, not ALL parts of the stacktrace - cleanStackTrace(t) - logger.error("Connection ${connection.id} error", t) - } - } - } } diff --git a/src/dorkbox/network/ping/Ping.kt b/src/dorkbox/network/ping/Ping.kt index b398c230..bf0850cf 100644 --- a/src/dorkbox/network/ping/Ping.kt +++ b/src/dorkbox/network/ping/Ping.kt @@ -15,26 +15,67 @@ */ package dorkbox.network.ping +import dorkbox.network.rmi.RmiUtils + class Ping { -// /** -// * Wait for the ping to return, and returns the ping response time in MS or -1 if it failed. -// */ - val time: Int = 0 -// -// /** -// * Adds a ping listener to this future. The listener is notified when this future is done. If this future is already completed, -// * then the listener is notified immediately. -// */ -// fun add(listener: PingListener) -// -// /** -// * Removes a ping listener from this future. The listener is no longer notified when this future is done. If the listener -// * was not previously associated with this future, this method does nothing and returns silently. -// */ -// fun remove(listener: PingListener) -// -// /** -// * Cancel this Ping. -// */ -// fun cancel() + var packedId = 0 + + // ping/pong times are the LOWER 8 bytes of a long, which gives us 65 seconds. This is the same as the max value timeout (a short) so this is acceptible + + var pingTime = 0L + var pongTime = 0L + + @Transient + var finishedTime = 0L + + /** + * The time it took for the remote connection to return the ping to us. This is only accurate if the clocks are synchronized + */ + val inbound: Long + get() { + return finishedTime - pongTime + } + + /** + * The time it took for us to ping the remote connection. This is only accurate if the clocks are synchronized. + */ + val outbound: Long + get() { + return pongTime - pingTime + } + + + /** + * The round-trip time it took to ping the remote connection + */ + val roundtrip: Long + get() { + return finishedTime - pingTime + } + + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (javaClass != other?.javaClass) return false + + other as Ping + + if (packedId != other.packedId) return false + if (pingTime != other.pingTime) return false + if (pongTime != other.pongTime) return false + if (finishedTime != other.finishedTime) return false + + return true + } + + override fun hashCode(): Int { + var result = packedId + result = 31 * result + pingTime.hashCode() + result = 31 * result + pongTime.hashCode() + result = 31 * result + finishedTime.hashCode() + return result + } + + override fun toString(): String { + return "Ping ${RmiUtils.unpackUnsignedRight(packedId)} (pingTime=$pingTime, pongTime=$pongTime, finishedTime=$finishedTime)" + } } diff --git a/src/dorkbox/network/ping/PingCanceledException.kt b/src/dorkbox/network/ping/PingCanceledException.kt deleted file mode 100644 index 3ad424a9..00000000 --- a/src/dorkbox/network/ping/PingCanceledException.kt +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright 2020 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dorkbox.network.ping - -import java.io.IOException - -object PingCanceledException : IOException() { - private const val serialVersionUID = 9045461384091038605L -} diff --git a/src/dorkbox/network/ping/PingFuture.kt b/src/dorkbox/network/ping/PingFuture.kt deleted file mode 100644 index 58a85228..00000000 --- a/src/dorkbox/network/ping/PingFuture.kt +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Copyright 2020 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -@file:Suppress("UNUSED_PARAMETER") - -package dorkbox.network.ping - -import dorkbox.network.connection.Connection -import java.util.concurrent.atomic.AtomicInteger - -class PingFuture internal constructor() { - /** - * @return the ID of this ping future - */ - // private final Promise> promise; - val id: Int - private val sentTime: Long - // public - // PingFuture(Promise> promise) { - // this.promise = promise; - // this.id = pingCounter.getAndIncrement(); - // this.sentTime = System.currentTimeMillis(); - // - // if (this.id == Integer.MAX_VALUE) { - // pingCounter.set(0); - // } - // }// try { - // PingTuple entry = this.promise.syncUninterruptibly() - // .get(); - // if (entry != null) { - // return entry.responseTime; - // } - // } catch (InterruptedException e) { - // } catch (ExecutionException e) { - // } - /** - * Wait for the ping to return, and returns the ping response time in MS or -1 if it failed. - */ - val response: Int - get() =// try { - // PingTuple entry = this.promise.syncUninterruptibly() - // .get(); - // if (entry != null) { - // return entry.responseTime; - // } - // } catch (InterruptedException e) { - // } catch (ExecutionException e) { - // } - -1 - - /** - * Adds the specified listener to this future. The specified listener is notified when this future is done. If this future is already - * completed, the specified listener is notified immediately. - */ - fun add(listener: PingListener) { - // this.promise.addListener((GenericFutureListener) listener); - } - - /** - * Removes the specified listener from this future. The specified listener is no longer notified when this future is done. If the - * specified listener is not associated with this future, this method does nothing and returns silently. - */ - fun remove(listener: PingListener) { - // this.promise.removeListener((GenericFutureListener) listener); - } - - /** - * Cancel this Ping. - */ - fun cancel() { - // this.promise.tryFailure(new PingCanceledException()); - } - - /** - * This is when the endpoint that ORIGINALLY sent the ping, finally receives a response. - */ - fun setSuccess(connection: C, ping: PingMessage?) { - // if (ping.id == this.id) { - // long longTime = System.currentTimeMillis() - this.sentTime; - // if (longTime < Integer.MAX_VALUE) { - // this.promise.setSuccess(new PingTuple(connection, (int) longTime)); - // } - // else { - // this.promise.setSuccess(new PingTuple(connection, Integer.MAX_VALUE)); - // } - // } - } - - // return this.promise.isSuccess(); - val isSuccess: Boolean - get() =// return this.promise.isSuccess(); - false - - companion object { - private val pingCounter = AtomicInteger(0) - } - - /** - * Protected constructor for when we are completely overriding this class. (Used by the "local" connection for instant pings) - */ - init { - // this(null); - id = -1 - sentTime = -2 - } -} diff --git a/src/dorkbox/network/ping/PingListener.kt b/src/dorkbox/network/ping/PingListener.kt deleted file mode 100644 index 86233619..00000000 --- a/src/dorkbox/network/ping/PingListener.kt +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright 2020 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dorkbox.network.ping - -import dorkbox.network.connection.Connection - -// note that we specifically DO NOT implement equals/hashCode, because we cannot create two separate -// listeners that are somehow equal to each other. -abstract class PingListener // implements GenericFutureListener>> -{ - // @Override - // public - // void operationComplete(Future> future) throws Exception { - // PingTuple pingTuple = future.get(); - // response(pingTuple.connection, pingTuple.responseTime); - // } - /** - * Called when the ping response has been received. - */ - abstract fun response(connection: C, pingResponseTime: Int) - override fun toString(): String { - return "PingListener" - } -} diff --git a/src/dorkbox/network/ping/PingManager.kt b/src/dorkbox/network/ping/PingManager.kt index 9a7d4b66..7dc36882 100644 --- a/src/dorkbox/network/ping/PingManager.kt +++ b/src/dorkbox/network/ping/PingManager.kt @@ -20,73 +20,55 @@ package dorkbox.network.ping import dorkbox.network.connection.Connection -import dorkbox.network.connection.EndPoint -import dorkbox.network.handshake.RandomIdAllocator import dorkbox.network.rmi.ResponseManager +import dorkbox.network.rmi.RmiUtils import kotlinx.coroutines.CoroutineScope -import mu.KLogger /** - * + * How to handle ping messages */ -class PingManager(logger: KLogger, actionDispatch: CoroutineScope) { - /** - * allocates ID's for use when pinging a remote endpoint - */ - internal val pingIdAllocator = RandomIdAllocator(Integer.MIN_VALUE, Integer.MAX_VALUE) +internal class PingManager { + @Suppress("UNCHECKED_CAST") + suspend fun manage(connection: CONNECTION, responseManager: ResponseManager, message: Ping) { + if (message.pongTime == 0L) { + message.pongTime = System.currentTimeMillis() + connection.send(message) + } else { + message.finishedTime = System.currentTimeMillis() - internal val responseManager = ResponseManager(logger, actionDispatch) + val rmiId = RmiUtils.unpackUnsignedRight(message.packedId) + + // process the ping message so that our ping callback does something + + // this will be null if the ping took longer than 30 seconds and was cancelled + val result = responseManager.getWaiterCallback(rmiId) as (suspend Ping.() -> Unit)? + if (result != null) { + result(message) + } + } + } /** - * Updates the ping times for this connection (called when this connection gets a REPLY ping message). + * Sends a "ping" packet to measure **ROUND TRIP** time to the remote connection. + * + * @return true if the message was successfully sent by aeron */ - fun updatePingResponse(ping: PingMessage) { -// -// @Volatile -// private var pingFuture: PingFuture? = null -// -// pingFuture?.setSuccess(this, ping) + internal suspend fun ping(connection: Connection, actionDispatch: CoroutineScope, responseManager: ResponseManager, function: suspend Ping.() -> Unit): Boolean { + val id = responseManager.prepWithCallback(function) + + val ping = Ping() + ping.packedId = RmiUtils.unpackUnsignedRight(id) + ping.pingTime = System.currentTimeMillis() + + // NOTE: the timout MUST NOT be more than the max SHORT value! + + // ALWAYS cancel the ping after 30 seconds + responseManager.cancelRequest(actionDispatch, 30_000L, id) { + // kill the callback, since we are now "cancelled". If there is a race here (and the response comes at the exact same time) + // we don't care since either it will be null or it won't (if it's not null, it will run the callback) + result = null + } + + return connection.send(ping) } - - - suspend fun manage(endPoint: EndPoint, connection: CONNECTION, message: PingMessage, logger: KLogger) { -// if (message.isReply) { -// connection.updatePingResponse(message) -// } else { -// // return the ping from whence it came -// message.isReply = true -// connection.send(message) -// } - } - - suspend fun ping(function1: Connection, function: suspend Ping.() -> Unit): Boolean { -// val ping = PingMessage() -// ping.id = pingIdAllocator.allocate() -// -// -// pingFuture = PingFuture() -// -//// function: suspend (CONNECTION) -> Unit -// // TODO: USE AERON FOR THIS -//// val pingFuture2 = pingFuture -//// if (pingFuture2 != null && !pingFuture2.isSuccess) { -//// pingFuture2.cancel() -//// } -//// val newPromise: Promise> -//// newPromise = if (channelWrapper.udp() != null) { -//// channelWrapper.udp() -//// .newPromise() -//// } else { -//// channelWrapper.tcp() -//// .newPromise() -//// } -// pingFuture = PingFuture() -//// val ping = PingMessage() -//// ping.id = pingFuture!!.id -//// ping0(ping) -//// return pingFuture!! -// TODO() - return false - } - } diff --git a/src/dorkbox/network/ping/PingMessage.kt b/src/dorkbox/network/ping/PingMessage.kt deleted file mode 100644 index af77e5b6..00000000 --- a/src/dorkbox/network/ping/PingMessage.kt +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright 2020 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dorkbox.network.ping - -/** - * Internal message to determine round trip time. - */ -class PingMessage { - var id: Int = 0 - var isReply: Boolean = false -} diff --git a/src/dorkbox/network/ping/PingTuple.kt b/src/dorkbox/network/ping/PingTuple.kt deleted file mode 100644 index 74fadcec..00000000 --- a/src/dorkbox/network/ping/PingTuple.kt +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright 2020 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dorkbox.network.ping - -import dorkbox.network.connection.Connection - -class PingTuple(var connection: C, var responseTime: Int) diff --git a/src/dorkbox/network/rmi/ResponseManager.kt b/src/dorkbox/network/rmi/ResponseManager.kt index 9d40f692..d3686949 100644 --- a/src/dorkbox/network/rmi/ResponseManager.kt +++ b/src/dorkbox/network/rmi/ResponseManager.kt @@ -15,7 +15,6 @@ */ package dorkbox.network.rmi -import dorkbox.network.rmi.messages.MethodResponse import kotlinx.atomicfu.atomic import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.channels.Channel @@ -32,7 +31,7 @@ import kotlin.concurrent.write * * response ID's and the memory they hold will leak if the response never arrives! */ -class ResponseManager(private val logger: KLogger, private val actionDispatch: CoroutineScope) { +internal class ResponseManager(private val logger: KLogger, actionDispatch: CoroutineScope) { companion object { val TIMEOUT_EXCEPTION = Exception() } @@ -77,20 +76,19 @@ class ResponseManager(private val logger: KLogger, private val actionDispatch: C } } catch (e: ClosedSendChannelException) { // this can happen if we are starting/stopping an endpoint (and thus a response-manager) VERY quickly, and can be ignored - logger.trace("Error during RMI preparation. Usually this is caused by fast a start/stop") + logger.trace("Error during RMI preparation. Usually this is caused by fast a start-then-stop") } } } /** + * Called when we receive the answer for our initial request. If no response data, then the pending rmi data entry is deleted + * * resume any pending remote object method invocations (if they are not async, or not manually waiting) * NOTE: async RMI will never call this (because async doesn't return a response) */ - suspend fun onRmiMessage(message: MethodResponse) { - val rmiId = RmiUtils.unpackUnsignedRight(message.packedId) - val result = message.result - + suspend fun notifyWaiter(rmiId: Int, result: Any?) { logger.trace { "RMI return message: $rmiId" } val previous = pendingLock.write { @@ -108,12 +106,40 @@ class ResponseManager(private val logger: KLogger, private val actionDispatch: C } } + /** + * Called when we receive the answer for our initial request. If no response data, then the pending rmi data entry is deleted + * + * This is ONLY called when we want to get the data out of the stored entry, because we are operating ASYNC. (pure RMI async is different) + */ + suspend fun getWaiterCallback(rmiId: Int): T? { + logger.trace { "RMI return message: $rmiId" } + + val previous = pendingLock.write { + val previous = pending[rmiId] + pending[rmiId] = null + previous + } + + @Suppress("UNCHECKED_CAST") + if (previous is ResponseWaiter) { + val result = previous.result + + // always return this to the cache! + waiterCache.send(previous) + rmiWaitersInUse.getAndDecrement() + + return result as T + } + + return null + } + /** * gets the ResponseWaiter (id + waiter) and prepares the pending response map * * We ONLY care about the ID to get the correct response info. If there is no response, the ID can be ignored. */ - internal suspend fun prep(): ResponseWaiter { + suspend fun prep(): ResponseWaiter { val responseRmi = waiterCache.receive() rmiWaitersInUse.getAndIncrement() logger.trace { "RMI count: ${rmiWaitersInUse.value}" } @@ -129,6 +155,51 @@ class ResponseManager(private val logger: KLogger, private val actionDispatch: C return responseRmi } + /** + * gets the ResponseWaiter (id + waiter) and prepares the pending response map + * + * We ONLY care about the ID to get the correct response info. If there is no response, the ID can be ignored. + */ + suspend fun prepWithCallback(function: Any): Int { + val responseRmi = waiterCache.receive() + rmiWaitersInUse.getAndIncrement() + logger.trace { "RMI count: ${rmiWaitersInUse.value}" } + + // this will replace the waiter if it was cancelled (waiters are not valid if cancelled) + responseRmi.prep() + + // assign the callback that will be notified when the return message is received + responseRmi.result = function + + pendingLock.write { + // this just does a .toUShort().toInt() conversion. This is cleaner than doing it manually + pending[RmiUtils.unpackUnsignedRight(responseRmi.id)] = responseRmi + } + + return responseRmi.id + } + + + /** + * Cancels the RMI request in the given timeout, the callback is executed inside the read lock + */ + fun cancelRequest(actionDispatch: CoroutineScope, timeoutMillis: Long, rmiId: Int, onCancelled: ResponseWaiter.() -> Unit) { + actionDispatch.launch { + delay(timeoutMillis) // this will always wait. if this job is cancelled, this will immediately stop waiting + + // check if we have a result or not + pendingLock.read { + val maybeResult = pending[rmiId] + if (maybeResult is ResponseWaiter) { + logger.trace { "RMI timeout ($timeoutMillis) with callback cancel: $rmiId" } + + maybeResult.cancel() + onCancelled(maybeResult) + } + } + } + } + /** * We only wait for a reply if we are SYNC. * @@ -136,7 +207,7 @@ class ResponseManager(private val logger: KLogger, private val actionDispatch: C * * @return the result (can be null) or timeout exception */ - suspend fun waitForReply(responseWaiter: ResponseWaiter, timeoutMillis: Long): Any? { + suspend fun waitForReply(actionDispatch: CoroutineScope, responseWaiter: ResponseWaiter, timeoutMillis: Long): Any? { val rmiId = RmiUtils.unpackUnsignedRight(responseWaiter.id) // this just does a .toUShort().toInt() conversion. This is cleaner than doing it manually logger.trace { @@ -181,6 +252,7 @@ class ResponseManager(private val logger: KLogger, private val actionDispatch: C } + // deletes the entry in the map val resultOrWaiter = pendingLock.write { val previous = pending[rmiId] pending[rmiId] = null @@ -200,67 +272,6 @@ class ResponseManager(private val logger: KLogger, private val actionDispatch: C return resultOrWaiter } - - - /////// - /////// - //// Callback waiting/execution. This is part of this class only because we want to use the SAME RMI SYSTEM for other things, - //// namely, being able to have an easier time setting up message responses - /////// - /////// - - /** - * on response, runs the waiting callback - * NOTE: This uses the RMI-ID mechanism to know what the response ID is (the name is left alone) - */ - suspend fun onCallbackMessage(message: MethodResponse) { - val rmiId = RmiUtils.unpackUnsignedRight(message.packedId) - val result = message.result - - logger.trace { "RMI return callback: $rmiId" } - - val previous = pendingLock.write { - val previous = pending[rmiId] - pending[rmiId] = result - previous - } - - // if NULL, since either we don't exist (because we were async), or it was cancelled - if (previous is ResponseWaiter) { - logger.trace { "RMI valid-cancel onCallback: $rmiId" } - - // this means we were NOT timed out! (we cannot be timed out here) - previous.doNotify() - } - } - - /** - * gets the ResponseWaiter (id + waiter) and prepares the pending response map - * - * We ONLY care about the ID to get the correct response info. - * NOTE: This uses the RMI-ID mechanism to know what the response ID is (the name is left alone) - */ - internal suspend fun prepForCallback(callback: (Any) -> Unit): ResponseWaiter { - val responseRmi = waiterCache.receive() - rmiWaitersInUse.getAndIncrement() - logger.trace { "RMI2 count: ${rmiWaitersInUse.value}" } - - - // this will replace the waiter if it was cancelled (waiters are not valid if cancelled) - responseRmi.prep() - - // assign the callback that will be notified when the return message is received - responseRmi.result = callback - - pendingLock.write { - // this just does a .toUShort().toInt() conversion. This is cleaner than doing it manually - pending[RmiUtils.unpackUnsignedRight(responseRmi.id)] = responseRmi - } - - return responseRmi - } - - suspend fun close() { // wait for responses, or wait for timeouts! while (rmiWaitersInUse.value > 0) { diff --git a/src/dorkbox/network/rmi/ResponseWaiter.kt b/src/dorkbox/network/rmi/ResponseWaiter.kt index 8ab8d4af..3674bdb3 100644 --- a/src/dorkbox/network/rmi/ResponseWaiter.kt +++ b/src/dorkbox/network/rmi/ResponseWaiter.kt @@ -30,7 +30,7 @@ data class ResponseWaiter(val id: Int) { var channel: Channel = Channel(Channel.RENDEZVOUS) var isCancelled = false - // holds the RMI result or callback. This is ALWAYS accessed from within a lock! + // holds the RMI result or callback. This is ALWAYS accessed from within a lock (so no synchronize/volatile/etc necessary)! var result: Any? = null /** diff --git a/src/dorkbox/network/rmi/RmiClient.kt b/src/dorkbox/network/rmi/RmiClient.kt index d90be2d3..37f46bf7 100644 --- a/src/dorkbox/network/rmi/RmiClient.kt +++ b/src/dorkbox/network/rmi/RmiClient.kt @@ -17,17 +17,15 @@ package dorkbox.network.rmi import dorkbox.network.connection.Connection import dorkbox.network.rmi.messages.MethodRequest -import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.runBlocking import java.lang.reflect.InvocationHandler import java.lang.reflect.Method import java.util.* -import kotlin.coroutines.* - -object MyUnconfined : CoroutineDispatcher() { - override fun isDispatchNeeded(context: CoroutineContext): Boolean = false - override fun dispatch(context: CoroutineContext, block: Runnable) = block.run() // !!! -} +import kotlin.coroutines.Continuation +import kotlin.coroutines.EmptyCoroutineContext +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException /** * Handles network communication when methods are invoked on a proxy. @@ -78,7 +76,7 @@ internal class RmiClient(val isGlobal: Boolean, private var enableEquals = false // if we are ASYNC, then this method immediately returns - private suspend fun sendRequest(invokeMethod: MethodRequest): Any? { + private suspend fun sendRequest(actionDispatch: CoroutineScope, invokeMethod: MethodRequest): Any? { // there is a STRANGE problem, where if we DO NOT respond/reply to method invocation, and immediate invoke multiple methods -- // the "server" side can have out-of-order method invocation. There are 2 ways to solve this // 1) make the "server" side single threaded @@ -96,13 +94,12 @@ internal class RmiClient(val isGlobal: Boolean, invokeMethod.isGlobal = isGlobal - if (isAsync) { - // If we are async, we ignore the response.... - + return if (isAsync) { + // If we are async, we ignore the response (don't invoke the response manager at all).... invokeMethod.packedId = RmiUtils.packShorts(rmiObjectId, RemoteObjectStorage.ASYNC_RMI) connection.send(invokeMethod) - return null + null } else { // The response, even if there is NOT one (ie: not void) will always return a thing (so our code execution is in lockstep val rmiWaiter = responseManager.prep() @@ -110,7 +107,7 @@ internal class RmiClient(val isGlobal: Boolean, connection.send(invokeMethod) - return responseManager.waitForReply(rmiWaiter, timeoutMillis) + responseManager.waitForReply(actionDispatch, rmiWaiter, timeoutMillis) } } @@ -220,7 +217,7 @@ internal class RmiClient(val isGlobal: Boolean, val continuation = suspendCoroutineArg as Continuation val suspendFunction: suspend () -> Any? = { - sendRequest(invokeMethod) + sendRequest(connection.endPoint.actionDispatch, invokeMethod) } // function suspension works differently !! @@ -248,7 +245,7 @@ internal class RmiClient(val isGlobal: Boolean, }) } else { val any = runBlocking { - sendRequest(invokeMethod) + sendRequest(connection.endPoint.actionDispatch, invokeMethod) } when (any) { ResponseManager.TIMEOUT_EXCEPTION -> { diff --git a/src/dorkbox/network/rmi/RmiManagerGlobal.kt b/src/dorkbox/network/rmi/RmiManagerGlobal.kt index 5deab637..ac77f3b4 100644 --- a/src/dorkbox/network/rmi/RmiManagerGlobal.kt +++ b/src/dorkbox/network/rmi/RmiManagerGlobal.kt @@ -270,7 +270,10 @@ internal class RmiManagerGlobal(private val logger: KLog is MethodResponse -> { // notify the pending proxy requests that we have a response! actionDispatch.launch { - endPoint.responseManager.onRmiMessage(message) + val rmiId = RmiUtils.unpackUnsignedRight(message.packedId) + val result = message.result + + endPoint.responseManager.notifyWaiter(rmiId, result) } } } diff --git a/src/dorkbox/network/serialization/Serialization.kt b/src/dorkbox/network/serialization/Serialization.kt index 99d75786..4bfa8d3c 100644 --- a/src/dorkbox/network/serialization/Serialization.kt +++ b/src/dorkbox/network/serialization/Serialization.kt @@ -23,6 +23,7 @@ import com.esotericsoftware.minlog.Log import dorkbox.network.Server import dorkbox.network.connection.Connection import dorkbox.network.handshake.HandshakeMessage +import dorkbox.network.ping.Ping import dorkbox.network.rmi.CachedMethod import dorkbox.network.rmi.RmiUtils import dorkbox.network.rmi.messages.* @@ -334,6 +335,8 @@ open class Serialization(private val references: Boolean kryo.register(MethodRequest::class.java, methodRequestSerializer) kryo.register(MethodResponse::class.java, methodResponseSerializer) + kryo.register(Ping::class.java) + @Suppress("UNCHECKED_CAST") kryo.register(InvocationHandler::class.java as Class, rmiClientSerializer) @@ -377,6 +380,8 @@ open class Serialization(private val references: Boolean kryo.register(MethodRequest::class.java, methodRequestSerializer) kryo.register(MethodResponse::class.java, methodResponseSerializer) + kryo.register(Ping::class.java) + @Suppress("UNCHECKED_CAST") kryo.register(InvocationHandler::class.java as Class, rmiClientSerializer) @@ -420,7 +425,8 @@ open class Serialization(private val references: Boolean // we have to allow CUSTOM classes to register (where the order does not matter), so that if the CLIENT is the RMI-SERVER, it can // specify IMPL classes for RMI. classesToRegister.forEach { registration -> - require(registration is ClassRegistrationForRmi) { "Unable to register a *class* by itself. This is only permitted for RMI. To fix this, remove xx.register(${registration.clazz.name})" } + require(registration is ClassRegistrationForRmi) { "Unable to register a *class* by itself. This is only permitted on the CLIENT for RMI. " + + "To fix this, remove xx.register(${registration.clazz.name})" } } @Suppress("UNCHECKED_CAST") diff --git a/test/dorkboxTest/network/PingPongTest.kt b/test/dorkboxTest/network/PingPongTest.kt index 7f533286..0e2e37ec 100644 --- a/test/dorkboxTest/network/PingPongTest.kt +++ b/test/dorkboxTest/network/PingPongTest.kt @@ -98,7 +98,7 @@ class PingPongTest : BaseTest() { } val counter = AtomicInteger(0) - client.onMessage { message -> + client.onMessage { _ -> if (counter.getAndIncrement() <= tries) { send(data) } else { diff --git a/test/dorkboxTest/network/PingTest.kt b/test/dorkboxTest/network/PingTest.kt index 7cf0486c..eb5c1e64 100644 --- a/test/dorkboxTest/network/PingTest.kt +++ b/test/dorkboxTest/network/PingTest.kt @@ -1,59 +1,168 @@ package dorkboxTest.network +import dorkbox.network.Client +import dorkbox.network.Server +import dorkbox.network.connection.Connection +import dorkbox.network.ping.Ping +import dorkbox.network.rmi.RmiUtils +import kotlinx.atomicfu.atomic +import org.junit.Assert import org.junit.Test class PingTest : BaseTest() { @Test - fun onServerPing() { -// val serverSuccess = atomic(false) -// val clientSuccess = atomic(false) -// -// run { -// val configuration = serverConfig() -// -// val server: Server = Server(configuration) -// addEndPoint(server) -// server.bind() -// -// server.onPing { ping -> -// serverSuccess.value = true -// println("Ping info ${ping.time}") -// close() -// } -// } -// -// run { -// val config = clientConfig() -// -// val client: Client = Client(config) -// addEndPoint(client) -// -// client.onConnect { -// clientSuccess.value = true -// -// it.ping { -// println("received ping back! Val: $time") -// } -// } -// -// client.onDisconnect { -// stopEndPoints() -// } -// -// runBlocking { -// try { -// client.connect(LOOPBACK) -// } catch (e: Exception) { -// stopEndPoints() -// throw e -// } -// } -// } -// -// -// waitForThreads() -// -// Assert.assertTrue(serverSuccess.value) -// Assert.assertTrue(clientSuccess.value) + fun RmiPing() { + val clientSuccess = atomic(false) + + run { + val configuration = serverConfig() + + val server: Server = Server(configuration) + addEndPoint(server) + server.bind() + } + + run { + val config = clientConfig() + + val client: Client = Client(config) + addEndPoint(client) + + client.onConnect { + ping { + clientSuccess.value = true + + logger.error("out-bound: $outbound") + logger.error("in-bound: $inbound") + logger.error("round-trip: $roundtrip") + + stopEndPoints() + } + } + + client.connect(LOOPBACK) + } + + waitForThreads() + + Assert.assertTrue(clientSuccess.value) + } + + @Test + fun MessagePing() { + val serverSuccess = atomic(false) + val clientSuccess = atomic(false) + + run { + val configuration = serverConfig() + configuration.serialization.register(PingMessage::class.java) + + val server: Server = Server(configuration) + addEndPoint(server) + server.bind() + + server.onMessage { ping -> + serverSuccess.value = true + + ping.pongTime = System.currentTimeMillis() + send(ping) + } + } + + run { + val config = clientConfig() + + val client: Client = Client(config) + addEndPoint(client) + + client.onMessage { ping -> + clientSuccess.value = true + + ping.finishedTime = System.currentTimeMillis() + + logger.error("out-bound: ${ping.outbound}") + logger.error("in-bound: ${ping.inbound}") + logger.error("round-trip: ${ping.roundtrip}") + + stopEndPoints() + } + + client.onConnect { + logger.error("Connecting...") + + val ping = PingMessage() + ping.packedId = 1 + ping.pingTime = System.currentTimeMillis() + + send(ping) + } + + client.connect(LOOPBACK) + } + + waitForThreads() + + Assert.assertTrue(serverSuccess.value) + Assert.assertTrue(clientSuccess.value) + } + + class PingMessage { + var packedId = 0 + var pingTime = 0L + var pongTime = 0L + + @Transient + var finishedTime = 0L + + /** + * The time it took for the remote connection to return the ping to us. This is only accurate if the clocks are synchronized + */ + val inbound: Long + get() { + return finishedTime - pongTime + } + + /** + * The time it took for us to ping the remote connection. This is only accurate if the clocks are synchronized. + */ + val outbound: Long + get() { + return pongTime - pingTime + } + + + /** + * The round-trip time it took to ping the remote connection + */ + val roundtrip: Long + get() { + return finishedTime - pingTime + } + + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (javaClass != other?.javaClass) return false + + other as Ping + + if (packedId != other.packedId) return false + if (pingTime != other.pingTime) return false + if (pongTime != other.pongTime) return false + if (finishedTime != other.finishedTime) return false + + return true + } + + override fun hashCode(): Int { + var result = packedId + result = 31 * result + pingTime.hashCode() + result = 31 * result + pongTime.hashCode() + result = 31 * result + finishedTime.hashCode() + return result + } + + override fun toString(): String { + return "PingMessage ${RmiUtils.unpackUnsignedRight(packedId)} (pingTime=$pingTime, pongTime=$pongTime, finishedTime=$finishedTime)" + } } }