diff --git a/src/dorkbox/network/rmi/CachedMethod.kt b/src/dorkbox/network/rmi/CachedMethod.kt index e52e74dd..a53aafb3 100644 --- a/src/dorkbox/network/rmi/CachedMethod.kt +++ b/src/dorkbox/network/rmi/CachedMethod.kt @@ -55,6 +55,6 @@ open class CachedMethod(val method: Method, val methodIndex: Int, val methodClas } override fun toString(): String { - return "CachedMethod{" + method.name + ", methodClassId=" + methodClassId + ", methodIndex=" + methodIndex + '}' + return "CachedMethod{name:" + method.name + ", methodClassId=" + methodClassId + ", methodIndex=" + methodIndex + '}' } } diff --git a/src/dorkbox/network/rmi/RemoteObject.kt b/src/dorkbox/network/rmi/RemoteObject.kt index 0f31330f..66027dc1 100644 --- a/src/dorkbox/network/rmi/RemoteObject.kt +++ b/src/dorkbox/network/rmi/RemoteObject.kt @@ -47,12 +47,7 @@ interface RemoteObject { var responseTimeout: Int /** - * @return the ID of response for the last method invocation. - */ - val lastResponseId: Int - - /** - * Sets the behavior when invoking a remote method. Default is false. + * Sets the behavior when invoking a remote method. DEFAULT is false. * * If true, the invoking thread will not wait for a response. The method will return immediately and the return value * should be ignored. @@ -60,50 +55,33 @@ interface RemoteObject { * If false, the invoking thread will wait (if called via suspend, then it will use coroutines) for the remote method to return or * timeout. * - * The return value or any thrown exception can later be retrieved with [RemoteObject.waitForLastResponse] or [RemoteObject.waitForResponse]. - * The responses will be stored until retrieved, so each method call should have a matching retrieve. + * If the return value or exception needs to be retrieved, then DO NOT set async, and change the response timeout */ var async: Boolean /** * Permits calls to [Object.toString] to actually return the `toString()` method on the object. * - * @param enableDetailedToString If false, calls to [Object.toString] will return "" (where `id` is the remote object ID) + * @param enabled If false, calls to [Object.toString] will return "" (where `id` is the remote object ID) * instead of invoking the remote `toString()` method on the object. */ - fun enableToString(enableDetailedToString: Boolean) + fun enableToString(enabled: Boolean) /** - * Permits calls to [RemoteObject.waitForLastResponse] and [RemoteObject.waitForResponse] to actually wait for a response. + * Permits calls to [Object.hashCode] to actually return the `hashCode()` method on the object. * - * You must be in ASYNC mode already for this to work. There will be undefined errors if you do not enable waiting - * BEFORE calling the method you want to wait for - * - * @param enableWaiting if true, you want wait for the method results. If false, undefined errors can happen while waiting + * @param enabled If false, calls to [Object.hashCode] will return "id" (where `id` is the remote object ID) + * instead of invoking the remote `hashCode()` method on the object. */ - fun enableWaitingForResponse(enableWaiting: Boolean) + fun enableHashCode(enabled: Boolean) /** - * Waits for the response to the last method invocation to be received or the response timeout to be reached. + * Permits calls to [Object.equals] to actually return the `equals()` method on the object. * - * You must be in ASYNC mode + enabled waiting for this to work. There will be undefined errors if you do not enable waiting BEFORE - * calling the method you want to wait for - * - * @return the response of the last method invocation + * @param enabled If false, calls to [Object.equals] will compare the "id" (where `id` is the remote object ID) + * instead of invoking the remote `equals()` method on the object. */ - suspend fun waitForLastResponse(): Any? - - /** - * Waits for the specified method invocation response to be received or the response timeout to be reached. - * - * You must be in ASYNC mode + enabled waiting for this to work. There will be undefined errors if you do not enable waiting BEFORE - * calling the method you want to wait for - * - * @param responseId usually this is the response ID obtained via [RemoteObject.lastResponseId] - * - * @return the response of the last method invocation - */ - suspend fun waitForResponse(responseId: Int): Any? + fun enableEquals(enabled: Boolean) /** * Causes this RemoteObject to stop listening to the connection for method invocation response messages. diff --git a/src/dorkbox/network/rmi/RemoteObjectStorage.kt b/src/dorkbox/network/rmi/RemoteObjectStorage.kt index 0d0af0e6..821cd0f0 100644 --- a/src/dorkbox/network/rmi/RemoteObjectStorage.kt +++ b/src/dorkbox/network/rmi/RemoteObjectStorage.kt @@ -216,9 +216,10 @@ class RemoteObjectStorage(val logger: KLogger) { * Registers an object to allow a remote connection access to this object via the specified ID * * @param objectId Must not be <= 0 or > 65535 + * * @return true if successful, false if there was an error */ - fun register(objectId: Int, `object`: Any): Boolean { + fun register(`object`: Any, objectId: Int): Boolean { validate(objectId) objectMap.put(objectId, `object`) @@ -231,12 +232,13 @@ class RemoteObjectStorage(val logger: KLogger) { } /** - * Removes an object. The remote connection will no longer be able to access it. + * Removes an object. The remote connection will no longer be able to access it. This object may, or may not exist */ - fun remove(objectId: Int): T { + fun remove(objectId: Int): T? { validate(objectId) - val rmiObject = objectMap.remove(objectId) as T + @Suppress("UNCHECKED_CAST") + val rmiObject = objectMap.remove(objectId) as T? returnId(objectId) logger.trace { @@ -264,9 +266,11 @@ class RemoteObjectStorage(val logger: KLogger) { } /** + * This object may, or may not exist + * * @return the object registered with the specified ID. */ - operator fun get(objectId: Int): Any { + operator fun get(objectId: Int): Any? { validate(objectId) return objectMap[objectId] diff --git a/src/dorkbox/network/rmi/RmiClient.kt b/src/dorkbox/network/rmi/RmiClient.kt index b0db7ab4..00cad84c 100644 --- a/src/dorkbox/network/rmi/RmiClient.kt +++ b/src/dorkbox/network/rmi/RmiClient.kt @@ -16,66 +16,69 @@ package dorkbox.network.rmi import dorkbox.network.connection.Connection -import dorkbox.network.other.SuspendWaiter -import dorkbox.network.other.invokeSuspendFunction +import dorkbox.network.other.SuspendFunctionAccess import dorkbox.network.rmi.messages.MethodRequest import kotlinx.coroutines.runBlocking import java.lang.reflect.InvocationHandler +import java.lang.reflect.InvocationTargetException import java.lang.reflect.Method import java.util.* import kotlin.coroutines.Continuation /** - * Handles network communication when methods are invoked on a proxy. For NON-BLOCKING performance, the interface - * must have the 'suspend' keyword added. If it is not present, then all method invocation will be BLOCKING. + * Handles network communication when methods are invoked on a proxy. * + * For NON-BLOCKING performance, the RMI interface must have the 'suspend' keyword added. If this keyword is not + * present, then all method invocation will be BLOCKING. * - * - * @param connection this is really the network client -- there is ONLY ever 1 connection - * @param rmiSupport is used to provide RMI support + * @param isGlobal true if this is a global object, or false if it is connection specific * @param rmiObjectId this is the remote object ID (assigned by RMI). This is NOT the kryo registration ID + * @param connection this is really the network client -- there is ONLY ever 1 connection + * @param proxyString this is the name assigned to the proxy [toString] method + * @param rmiSupportCache is used to provide RMI support * @param cachedMethods this is the methods available for the specified class */ -class RmiClient(val isGlobal: Boolean, - val rmiObjectId: Int, - private val connection: Connection, - private val proxyString: String, - private val rmiSupportCache: RmiSupportCache, - private val cachedMethods: Array) : InvocationHandler { +internal class RmiClient(val isGlobal: Boolean, + val rmiObjectId: Int, + private val connection: Connection, + private val proxyString: String, + private val rmiSupportCache: RmiSupportCache, + private val cachedMethods: Array) : InvocationHandler { companion object { private val methods = RmiUtils.getMethods(RemoteObject::class.java) private val closeMethod = methods.find { it.name == "close" } + private val toStringMethod = methods.find { it.name == "toString" } + private val hashCodeMethod = methods.find { it.name == "hashCode" } + private val equalsMethod = methods.find { it.name == "equals" } + + private val enableToStringMethod = methods.find { it.name == "enableToString" } + private val enableHashCodeMethod = methods.find { it.name == "enableHashCode" } + private val enableEqualsMethod = methods.find { it.name == "enableEquals" } + + private val setResponseTimeoutMethod = methods.find { it.name == "setResponseTimeout" } private val getResponseTimeoutMethod = methods.find { it.name == "getResponseTimeout" } private val setAsyncMethod = methods.find { it.name == "setAsync" } private val getAsyncMethod = methods.find { it.name == "getAsync" } - private val enableToStringMethod = methods.find { it.name == "enableToString" } - private val enableWaitingForResponseMethod = methods.find { it.name == "enableWaitingForResponse" } - private val waitForLastResponseMethod = methods.find { it.name == "waitForLastResponse" } - private val getLastResponseIdMethod = methods.find { it.name == "getLastResponseId" } - private val waitForResponseMethod = methods.find { it.name == "waitForResponse" } - private val toStringMethod = methods.find { it.name == "toString" } @Suppress("UNCHECKED_CAST") private val EMPTY_ARRAY: Array = Collections.EMPTY_LIST.toTypedArray() as Array } - private val responseWaiter = SuspendWaiter() - private var timeoutMillis: Long = 3000 private var isAsync = false - private var allowWaiting = false + private var enableToString = false + private var enableHashCode = false + private var enableEquals = false - // this is really a a short! - @Volatile - private var previousResponseId: Int = 0 + // if we are ASYNC, then this method immediately returns + private suspend fun sendRequest(method: Method, args: Array): Any? { - private suspend fun invokeSuspend(method: Method, args: Array): Any? { // there is a STRANGE problem, where if we DO NOT respond/reply to method invocation, and immediate invoke multiple methods -- // the "server" side can have out-of-order method invocation. There are 2 ways to solve this // 1) make the "server" side single threaded @@ -87,18 +90,14 @@ class RmiClient(val isGlobal: Boolean, val responseStorage = rmiSupportCache.getResponseStorage() - // If we are async, we ignore the response.... FOR NOW. The response, even if there is NOT one (ie: not void) will always return - // a thing (so we will know when to stop blocking). - val responseId = responseStorage.prep(rmiObjectId, responseWaiter) - - // so we can query for async, if we want to necessary - previousResponseId = responseId - + // If we are async, we ignore the response.... + // The response, even if there is NOT one (ie: not void) will always return a thing (so we will know when to stop blocking). + val rmiWaiter = responseStorage.prep(rmiObjectId) val invokeMethod = MethodRequest() invokeMethod.isGlobal = isGlobal invokeMethod.objectId = rmiObjectId - invokeMethod.responseId = responseId + invokeMethod.responseId = rmiWaiter.id invokeMethod.args = args // which method do we access? We always want to access the IMPLEMENTATION (if available!). we know that this will always succeed @@ -107,21 +106,26 @@ class RmiClient(val isGlobal: Boolean, connection.send(invokeMethod) - // if we are async, then this will immediately return! - return try { - val result = responseStorage.waitForReply(allowWaiting, isAsync, rmiObjectId, responseId, responseWaiter, timeoutMillis) - if (result is Exception) { - throw result - } else { - result + + // if we are async, then this will immediately return + val result = responseStorage.waitForReply(isAsync, rmiObjectId, rmiWaiter, timeoutMillis) + when (result) { + RmiResponseStorage.TIMEOUT_EXCEPTION -> { + throw TimeoutException("Response timed out: ${method.declaringClass.name}.${method.name}") + } + is Exception -> { + throw result + } + else -> { + return result } - } catch (ex: TimeoutException) { - throw TimeoutException("Response timed out: ${method.declaringClass.name}.${method.name}") } } @Suppress("DuplicatedCode") - @Throws(Exception::class) + /** + * @throws Exception + */ override fun invoke(proxy: Any, method: Method, args: Array?): Any? { if (method.declaringClass == RemoteObject::class.java) { // manage all of the RemoteObject proxy methods @@ -130,6 +134,7 @@ class RmiClient(val isGlobal: Boolean, rmiSupportCache.removeProxyObject(rmiObjectId) return null } + setResponseTimeoutMethod -> { timeoutMillis = (args!![0] as Int).toLong() return null @@ -137,6 +142,7 @@ class RmiClient(val isGlobal: Boolean, getResponseTimeoutMethod -> { return timeoutMillis.toInt() } + getAsyncMethod -> { return isAsync } @@ -144,52 +150,39 @@ class RmiClient(val isGlobal: Boolean, isAsync = args!![0] as Boolean return null } + enableToStringMethod -> { enableToString = args!![0] as Boolean return null } - getLastResponseIdMethod -> { - // only ASYNC can wait for responses - check(isAsync) { "This RemoteObject is not currently set to ASYNC mode. Unable to manually get the response ID." } - check(allowWaiting) { "This RemoteObject does not allow waiting for responses. You must enable this BEFORE " + - "calling the method that you want to wait for the respose to" } - - return previousResponseId - } - enableWaitingForResponseMethod -> { - allowWaiting = args!![0] as Boolean + enableHashCodeMethod -> { + enableHashCode = args!![0] as Boolean return null } - waitForLastResponseMethod -> { - // only ASYNC can wait for responses - check(isAsync) { "This RemoteObject is not currently set to ASYNC mode. Unable to manually wait for a response." } - check(allowWaiting) { "This RemoteObject does not allow waiting for responses. You must enable this BEFORE " + - "calling the method that you want to wait for the respose to" } - - val maybeContinuation = args?.lastOrNull() as Continuation<*> - - // this is a suspend method, so we don't need extra checks - return invokeSuspendFunction(maybeContinuation) { - rmiSupportCache.getResponseStorage().waitForReplyManually(rmiObjectId, previousResponseId, responseWaiter) - } + enableEqualsMethod -> { + enableEquals = args!![0] as Boolean + return null } - waitForResponseMethod -> { - // only ASYNC can wait for responses - check(isAsync) { "This RemoteObject is not currently set to ASYNC mode. Unable to manually wait for a response." } - check(allowWaiting) { "This RemoteObject does not allow waiting for responses. You must enable this BEFORE " + - "calling the method that you want to wait for the respose to" } - val maybeContinuation = args?.lastOrNull() as Continuation<*> - - // this is a suspend method, so we don't need extra checks - return invokeSuspendFunction(maybeContinuation) { - rmiSupportCache.getResponseStorage().waitForReplyManually(rmiObjectId, args[0] as Int, responseWaiter) - } - } else -> throw Exception("Invocation handler could not find RemoteObject method for ${method.name}") } - } else if (!enableToString && method == toStringMethod) { - return proxyString + } else { + when (method) { + toStringMethod -> if (!enableToString) return proxyString // otherwise, the RMI round trip logic is done for toString() + hashCodeMethod -> if (!enableHashCode) return rmiObjectId // otherwise, the RMI round trip logic is done for hashCode() + equalsMethod -> { + val other = args!![0] + if (other !is RmiClient) { + return false + } + + if (!enableEquals) { + return rmiObjectId == other.rmiObjectId + } + + // otherwise, the RMI round trip logic is done for equals() + } + } } // if a 'suspend' function is called, then our last argument is a 'Continuation' object @@ -201,11 +194,11 @@ class RmiClient(val isGlobal: Boolean, if (maybeContinuation is Continuation<*>) { val argsWithoutContinuation = args.take(args.size - 1) invokeSuspendFunction(maybeContinuation) { - invokeSuspend(method, argsWithoutContinuation.toTypedArray()) + sendRequest(method, argsWithoutContinuation.toTypedArray()) } } else { runBlocking { - invokeSuspend(method, args ?: EMPTY_ARRAY) + sendRequest(method, args ?: EMPTY_ARRAY) } } @@ -249,11 +242,11 @@ class RmiClient(val isGlobal: Boolean, return if (maybeContinuation is Continuation<*>) { val argsWithoutContinuation = args.take(args.size - 1) invokeSuspendFunction(maybeContinuation) { - invokeSuspend(method, argsWithoutContinuation.toTypedArray()) + sendRequest(method, argsWithoutContinuation.toTypedArray()) } } else { runBlocking { - invokeSuspend(method, args ?: EMPTY_ARRAY) + sendRequest(method, args ?: EMPTY_ARRAY) } } } @@ -283,4 +276,12 @@ class RmiClient(val isGlobal: Boolean, return rmiObjectId == other.rmiObjectId } + + private fun invokeSuspendFunction(continuation: Continuation<*>, suspendFunction: suspend () -> Any?): Any? { + return try { + SuspendFunctionAccess.invokeSuspendFunction(suspendFunction, continuation) + } catch (e: InvocationTargetException) { + throw e.cause!! + } + } } diff --git a/src/dorkbox/network/rmi/RmiResponseStorage.kt b/src/dorkbox/network/rmi/RmiResponseStorage.kt index cadce380..6d3978a8 100644 --- a/src/dorkbox/network/rmi/RmiResponseStorage.kt +++ b/src/dorkbox/network/rmi/RmiResponseStorage.kt @@ -1,10 +1,9 @@ package dorkbox.network.rmi import com.conversantmedia.util.concurrent.MultithreadConcurrentQueue -import dorkbox.network.other.SuspendWaiter import dorkbox.network.rmi.messages.MethodResponse import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Job +import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay import kotlinx.coroutines.launch import org.agrona.collections.Hashing @@ -14,31 +13,90 @@ import java.util.concurrent.locks.ReentrantReadWriteLock import kotlin.concurrent.read import kotlin.concurrent.write +internal data class RmiWaiter(val id: Int) { + // this is bi-directional waiting. The method names to not reflect this, however there is no possibility of race conditions w.r.t. waiting + // https://stackoverflow.com/questions/55421710/how-to-suspend-kotlin-coroutine-until-notified + // https://kotlinlang.org/docs/reference/coroutines/channels.html + + // "receive' suspends until another coroutine invokes "send" + // and + // "send" suspends until another coroutine invokes "receive". + // + // these are wrapped in a try/catch, because cancel will cause exceptions to be thrown (which we DO NOT want) + @Volatile + var channel: Channel = Channel(0) + + /** + * this will replace the waiter if it was cancelled (waiters are not valid if cancelled) + */ + fun prep() { + if (channel.isClosedForReceive && channel.isClosedForSend) { + println("renew waiter") + channel = Channel(0) + } + } + + suspend fun doNotify() { + println("notified waiter") + try { + channel.send(Unit) + } catch (ignored: Exception) { + } + } + + suspend fun doWait() { + println("waiting waiter") + try { + channel.receive() + } catch (ignored: Exception) { + } + } + + fun cancel() { + println("delay is cancelling suspending coroutine") + try { + channel.cancel() + } catch (ignored: Exception) { + } + } +} + + + /** * Manages the "pending response" from method invocation. * * response ID's and the memory they hold will leak if the response never arrives! */ -class RmiResponseStorage(private val actionDispatch: CoroutineScope) { +internal class RmiResponseStorage(private val actionDispatch: CoroutineScope) { + + companion object { + val TIMEOUT_EXCEPTION = Exception() + } + // Response ID's are for ALL in-flight RMI on the network stack. instead of limited to (originally) 64, we are now limited to 65,535 // these are just looped around in a ring buffer. // These are stored here as int, however these are REALLY shorts and are int-packed when transferring data on the wire - private val rmiResponseIds = MultithreadConcurrentQueue(65535) + // 32,000 IN FLIGHT RMI method invocations is PLENTY + private val maxValuesInCache = Short.MAX_VALUE.toInt() + + private val rmiWaiterCache = MultithreadConcurrentQueue(maxValuesInCache) private val pendingLock = ReentrantReadWriteLock() private val pending = Int2NullableObjectHashMap(32, Hashing.DEFAULT_LOAD_FACTOR, true) init { // create a shuffled list of ID's. This operation is ONLY performed ONE TIME per endpoint! - val ids = IntArrayList() - for (id in Short.MIN_VALUE..Short.MAX_VALUE) { + val ids = IntArrayList(maxValuesInCache, Integer.MIN_VALUE) + // ZERO is special, and is never added! + for (id in 1..Short.MAX_VALUE) { ids.addInt(id) } ids.shuffle() - // populate the array of randomly assigned ID's. + // populate the array of randomly assigned ID's + waiters. ids.forEach { - rmiResponseIds.offer(it) + rmiWaiterCache.offer(RmiWaiter(it)) } } @@ -51,95 +109,87 @@ class RmiResponseStorage(private val actionDispatch: CoroutineScope) { val pendingId = RmiUtils.packShorts(objectId, responseId) + println("pending result received") + val previous = pendingLock.write { pending.put(pendingId, result) } // if NULL, since either we don't exist, or it was cancelled - if (previous is SuspendWaiter) { - // this means we were NOT timed out! + if (previous is RmiWaiter) { + // this means we were NOT timed out! If we were cancelled, then this does nothing. previous.doNotify() - } - // always return the responseId! It will (hopefully) be a while before this ID is used again - rmiResponseIds.offer(responseId) + // since this was the FIRST one to trigger, return it to the cache. + rmiWaiterCache.offer(previous) + } } - fun prep(rmiObjectId: Int, responseWaiter: SuspendWaiter): Int { - val responseId = rmiResponseIds.poll() + /** + * gets the RmiWaiter (id + waiter) + */ + internal fun prep(rmiObjectId: Int): RmiWaiter { + val responseRmi = rmiWaiterCache.poll() + + // this will replace the waiter if it was cancelled (waiters are not valid if cancelled) + responseRmi.prep() // we pack them together so we can fully use the range of ints, so we can service ALL rmi requests in a single spot - pendingLock.write { pending[RmiUtils.packShorts(rmiObjectId, responseId)] = responseWaiter } + pendingLock.write { pending[RmiUtils.packShorts(rmiObjectId, responseRmi.id)] = responseRmi } - return responseId + return responseRmi } - suspend fun waitForReply(allowWaiting: Boolean, isAsync: Boolean, rmiObjectId: Int, responseId: Int, - responseWaiter: SuspendWaiter, timeoutMillis: Long): Any? { + /** + * @return the result (can be null) or timeout exception + */ + suspend fun waitForReply(isAsync: Boolean, rmiObjectId: Int, rmiWaiter: RmiWaiter, timeoutMillis: Long): Any? { - val pendingId = RmiUtils.packShorts(rmiObjectId, responseId) + val pendingId = RmiUtils.packShorts(rmiObjectId, rmiWaiter.id) - var delayJobForTimeout: Job? = null + // NOTE: we ALWAYS send a response from the remote end. + // + // 'async' -> DO NOT WAIT + // 'timeout > 0' -> WAIT + // 'timeout == 0' -> same as async (DO NOT WAIT) + val returnImmediately = isAsync || timeoutMillis <= 0L - if (!(isAsync && allowWaiting) && timeoutMillis > 0L) { - // always launch a "cancel" coroutine, unless we want to wait forever - delayJobForTimeout = actionDispatch.launch { - delay(timeoutMillis) + if (returnImmediately) { + return null + } - val previous = pendingLock.write { - val prev = pending.remove(pendingId) - if (prev is SuspendWaiter) { - pending[pendingId] = TimeoutException("Response timed out.") - } + val responseTimeoutJob = actionDispatch.launch { + delay(timeoutMillis) // this will always wait - prev - } - - // if we are NOT SuspendWaiter, then it means we had a result! - // - // If there are tight timing issues, then we err on the side of "you timed out" - - if (!isAsync) { - // we only cancel waiting because when NON-ASYNC - if (previous is SuspendWaiter) { - previous.cancel() - } - } + // check if we have a result or not + val maybeResult = pendingLock.read { pending[pendingId] } + if (maybeResult is RmiWaiter) { + System.err.println("TIMEOUT $pendingId") +// maybeResult.cancel() } } - return if (isAsync) { - null - } else { - waitForReplyManually(pendingId, responseWaiter, delayJobForTimeout) + // wait for the response. + // + // If the response is ALREADY here, the doWait() returns instantly (with result) + // if no response yet, it will suspend and either + // A) get response + // B) timeout + rmiWaiter.doWait() + + // always cancel the timeout + responseTimeoutJob.cancel() + + val resultOrWaiter = pendingLock.write { pending.remove(pendingId) } + if (resultOrWaiter is RmiWaiter) { + // since this was the FIRST one to trigger, return it to the cache. + rmiWaiterCache.offer(resultOrWaiter) + return TIMEOUT_EXCEPTION } - } - // this is called when we MANUALLY want to wait for a reply as part of async! - // A timeout of 0 means we wait forever - suspend fun waitForReplyManually(rmiObjectId: Int, responseId: Int, responseWaiter: SuspendWaiter): Any? { - val pendingId = RmiUtils.packShorts(rmiObjectId, responseId) - return waitForReplyManually(pendingId, responseWaiter, null) - } - - - // we have to be careful when we resume, because SOMEONE ELSE'S METHOD RESPONSE can resume us (but only from the same object)! - private suspend fun waitForReplyManually(pendingId: Int,responseWaiter: SuspendWaiter, delayJobForTimeout: Job?): Any? { - while(true) { - val checkResult = pendingLock.read { pending[pendingId] } - if (checkResult !is SuspendWaiter) { - // this means we have correct data! (or it was an exception) we can safely remove the data from the map - pendingLock.write { pending.remove(pendingId) } - - delayJobForTimeout?.cancel() - return checkResult - } - - // keep waiting, since we don't have a response yet - responseWaiter.doWait() - } + return resultOrWaiter } fun close() { - rmiResponseIds.clear() + rmiWaiterCache.clear() pendingLock.write { pending.clear() } } } diff --git a/src/dorkbox/network/rmi/RmiSupport.kt b/src/dorkbox/network/rmi/RmiSupport.kt index 507a3f0a..913737b8 100644 --- a/src/dorkbox/network/rmi/RmiSupport.kt +++ b/src/dorkbox/network/rmi/RmiSupport.kt @@ -15,8 +15,8 @@ package dorkbox.network.rmi import dorkbox.network.connection.Connection -import dorkbox.network.connection.Connection_ import dorkbox.network.connection.EndPoint +import dorkbox.network.connection.ListenerManager import dorkbox.network.rmi.messages.* import dorkbox.network.serialization.NetworkSerializationManager import dorkbox.util.classes.ClassHelper @@ -26,10 +26,9 @@ import mu.KLogger import java.lang.reflect.Proxy import java.util.* -class RmiSupport(logger: KLogger, - actionDispatch: CoroutineScope, - internal val serialization: NetworkSerializationManager) : RmiSupportCache(logger, actionDispatch) -{ +internal class RmiSupport(logger: KLogger, + actionDispatch: CoroutineScope, + internal val serialization: NetworkSerializationManager) : RmiSupportCache(logger, actionDispatch) { companion object { /** * Returns a proxy object that implements the specified interface, and the methods invoked on the proxy object will be invoked @@ -127,7 +126,7 @@ class RmiSupport(logger: KLogger, /** * called on "client" */ - private fun onGenericObjectResponse(endPoint: EndPoint, connection: Connection_, logger: KLogger, + private fun onGenericObjectResponse(endPoint: EndPoint<*>, connection: Connection, logger: KLogger, isGlobal: Boolean, rmiId: Int, callback: suspend (Any) -> Unit, rmiSupportCache: RmiSupportCache, serialization: NetworkSerializationManager) { @@ -162,30 +161,90 @@ class RmiSupport(logger: KLogger, // this is used for all connection specific ones as well. private val remoteObjectCreationCallbacks = RemoteObjectStorage(logger) + + + internal fun registerCallback(callback: suspend (Iface) -> Unit): Int { return remoteObjectCreationCallbacks.register(callback) } private fun removeCallback(callbackId: Int): suspend (Any) -> Unit { - return remoteObjectCreationCallbacks.remove(callbackId) + // callback's area always correct, because we track them ourselves. + return remoteObjectCreationCallbacks.remove(callbackId)!! } /** - * Get's the implementation object based on if it is global, or not global + * @return the implementation object based on if it is global, or not global */ - fun getImplObject(isGlobal: Boolean, rmiObjectId: Int, connection: Connection_): Any { - return if (isGlobal) getImplObject(rmiObjectId) else connection.rmiSupport().getImplObject(rmiObjectId) + fun getImplObject(isGlobal: Boolean, rmiObjectId: Int, connection: Connection): T? { + return if (isGlobal) getImplObject(rmiObjectId) else connection.rmiConnectionSupport.getImplObject(rmiObjectId) } + /** + * @return the newly registered RMI ID for this object. [RemoteObjectStorage.INVALID_RMI] means it was invalid (an error log will be emitted) + */ + fun saveImplObject(logger: KLogger, `object`: Any): Int { + val rmiId = saveImplObject(`object`) + + if (rmiId != RemoteObjectStorage.INVALID_RMI) { + // this means we could register this object. + + // next, scan this object to see if there are any RMI fields + scanImplForRmiFields(logger, `object`) { + saveImplObject(it) + } + } else { + logger.error("Trying to create an RMI object with the INVALID_RMI id!!") + } + + return rmiId + } + + /** + * @return the true if it was a success saving this object. False means it was invalid (an error log will be emitted) + */ + fun saveImplObject(logger: KLogger, `object`: Any, objectId: Int): Boolean { + val rmiSuccess = saveImplObject(`object`, objectId) + + if (rmiSuccess) { + // this means we could register this object. + + // next, scan this object to see if there are any RMI fields + scanImplForRmiFields(logger, `object`) { + saveImplObject(it) + } + } else { + logger.error("Trying to save an RMI object ${`object`.javaClass} with invalid id $objectId") + } + + return rmiSuccess + } + + /** + * @return the removed object. If null, an error log will be emitted + */ + fun removeImplObject(logger: KLogger, objectId: Int): T? { + val success = removeImplObject(objectId) + if (success == null) { + logger.error("Error trying to remove an RMI impl object id $objectId.") + } + + @Suppress("UNCHECKED_CAST") + return success as T? + } + + + + override fun close() { super.close() remoteObjectCreationCallbacks.close() } /** - * on the "client" to get a global remote object (that exists on the server) + * on the connection+client to get a global remote object (that exists on the server) */ - fun getGlobalRemoteObject(connection: C, endPoint: EndPoint, objectId: Int, interfaceClass: Class): Iface { + fun getGlobalRemoteObject(connection: Connection, endPoint: EndPoint<*>, objectId: Int, interfaceClass: Class): Iface { // this immediately returns BECAUSE the object must have already been created on the server (this is why we specify the rmiId)! // so we can just instantly create the proxy object (or get the cached one) @@ -199,35 +258,17 @@ class RmiSupport(logger: KLogger, return proxyObject as Iface } - /** - * on the "client" to create a global remote object (that exists on the server) - */ - suspend fun createGlobalRemoteObject(connection: Connection, interfaceClassId: Int, callback: suspend (Iface) -> Unit) { - val callbackId = registerCallback(callback) - - // There is no rmiID yet, because we haven't created it! - val message = GlobalObjectCreateRequest(RmiUtils.packShorts(interfaceClassId, callbackId)) - - // We use a callback to notify us when the object is ready. We can't "create this on the fly" because we - // have to wait for the object to be created + ID to be assigned on the remote system BEFORE we can create the proxy instance here. - - // this means we are creating a NEW object on the server - connection.send(message) - } - - - /** * Manages ALL OF THE RMI stuff! */ @Throws(IllegalArgumentException::class) - suspend fun manage(endPoint: EndPoint, connection: Connection_, message: Any, logger: KLogger) { + suspend fun manage(endPoint: EndPoint<*>, connection: Connection, message: Any, logger: KLogger) { when (message) { is ConnectionObjectCreateRequest -> { /** * called on "server" */ - connection.rmiSupport().onConnectionObjectCreateRequest(endPoint, connection, message, logger) + connection.rmiConnectionSupport.onConnectionObjectCreateRequest(endPoint, connection, message, logger) } is ConnectionObjectCreateResponse -> { /** @@ -265,7 +306,19 @@ class RmiSupport(logger: KLogger, val isGlobal: Boolean = message.isGlobal val cachedMethod = message.cachedMethod - val implObject = getImplObject(isGlobal, objectId, connection) + val implObject = getImplObject(isGlobal, objectId, connection) + + if (implObject == null) { + logger.error("Unable to resolve implementation object for [global=$isGlobal, objectID=$objectId, connection=$connection") + + val invokeMethodResult = MethodResponse() + invokeMethodResult.objectId = objectId + invokeMethodResult.responseId = message.responseId + invokeMethodResult.result = NullPointerException("Remote object for proxy [global=$isGlobal, objectID=$objectId] does not exist.") + + connection.send(invokeMethodResult) + return + } logger.trace { var argString = "" @@ -295,8 +348,6 @@ class RmiSupport(logger: KLogger, // args!! is safe to do here (even though it doesn't make sense) result = cachedMethod.invoke(connection, implObject, message.args!!) } catch (ex: Exception) { - logger.error("Error invoking method: ${cachedMethod.method.declaringClass.name}.${cachedMethod.method.name}", ex) - result = ex.cause // added to prevent a stack overflow when references is false, (because 'cause' == "this"). // See: @@ -306,6 +357,13 @@ class RmiSupport(logger: KLogger, } else { result.initCause(null) } + + // only remove stuff if our logger is NOT on trace (so normal logs will not show extra info, trace will show extra info) + if (!logger.isTraceEnabled) { + ListenerManager.cleanStackTrace(result as Throwable) + } + + logger.error("Error invoking method: ${cachedMethod.method.declaringClass.name}.${cachedMethod.method.name}", result) } val invokeMethodResult = MethodResponse() @@ -325,28 +383,13 @@ class RmiSupport(logger: KLogger, /** * called on "server" */ - private suspend fun onGlobalObjectCreateRequest( - endPoint: EndPoint, connection: Connection_, message: GlobalObjectCreateRequest, logger: KLogger) { - + private suspend fun onGlobalObjectCreateRequest(endPoint: EndPoint<*>, connection: Connection, message: GlobalObjectCreateRequest, logger: KLogger) { val interfaceClassId = RmiUtils.unpackLeft(message.packedIds) val callbackId = RmiUtils.unpackRight(message.packedIds) // We have to lookup the iface, since the proxy object requires it val implObject = endPoint.serialization.createRmiObject(interfaceClassId) - val rmiId = registerImplObject(implObject) - - if (rmiId != RemoteObjectStorage.INVALID_RMI) { - // this means we could register this object. - - // next, scan this object to see if there are any RMI fields - scanImplForRmiFields(logger, implObject) { - registerImplObject(implObject) - } - } else { - logger.error { - "Trying to create an RMI object with the INVALID_RMI id!!" - } - } + val rmiId = saveImplObject(logger, implObject) // we send the message ANYWAYS, because the client needs to know it did NOT succeed! connection.send(GlobalObjectCreateResponse(RmiUtils.packShorts(rmiId, callbackId))) diff --git a/src/dorkbox/network/rmi/RmiSupportCache.kt b/src/dorkbox/network/rmi/RmiSupportCache.kt index 0c820f83..689271d7 100644 --- a/src/dorkbox/network/rmi/RmiSupportCache.kt +++ b/src/dorkbox/network/rmi/RmiSupportCache.kt @@ -10,22 +10,27 @@ import mu.KLogger * The impl/proxy objects CANNOT be stored in the same data structure, because their IDs are not tied to the same ID source (and there * would be conflicts in the data structure) */ -open class RmiSupportCache(logger: KLogger, actionDispatch: CoroutineScope) { +internal open class RmiSupportCache(logger: KLogger, actionDispatch: CoroutineScope) { private val responseStorage = RmiResponseStorage(actionDispatch) private val implObjects = RemoteObjectStorage(logger) private val proxyObjects = LockFreeIntMap() - fun registerImplObject(rmiObject: Any): Int { + fun saveImplObject(rmiObject: Any): Int { return implObjects.register(rmiObject) } - fun getImplObject(rmiId: Int): Any { - return implObjects[rmiId] + fun saveImplObject(rmiObject: Any, objectId: Int): Boolean { + return implObjects.register(rmiObject, objectId) } - fun removeImplObject(rmiId: Int) { - implObjects.remove(rmiId) as Any + fun getImplObject(rmiId: Int): T? { + @Suppress("UNCHECKED_CAST") + return implObjects[rmiId] as T? + } + + fun removeImplObject(rmiId: Int): T? { + return implObjects.remove(rmiId) as T? } /** diff --git a/src/dorkbox/network/rmi/RmiSupportConnection.kt b/src/dorkbox/network/rmi/RmiSupportConnection.kt index 3ed134bb..34dbdd3b 100644 --- a/src/dorkbox/network/rmi/RmiSupportConnection.kt +++ b/src/dorkbox/network/rmi/RmiSupportConnection.kt @@ -1,7 +1,6 @@ package dorkbox.network.rmi import dorkbox.network.connection.Connection -import dorkbox.network.connection.Connection_ import dorkbox.network.connection.EndPoint import dorkbox.network.rmi.messages.ConnectionObjectCreateRequest import dorkbox.network.rmi.messages.ConnectionObjectCreateResponse @@ -9,21 +8,21 @@ import dorkbox.network.serialization.NetworkSerializationManager import kotlinx.coroutines.CoroutineScope import mu.KLogger -class RmiSupportConnection(logger: KLogger, - private val rmiGlobalSupport: RmiSupport, - private val serialization: NetworkSerializationManager, - actionDispatch: CoroutineScope) : RmiSupportCache(logger, actionDispatch) { +internal class RmiSupportConnection(logger: KLogger, + val rmiGlobalSupport: RmiSupport, + private val serialization: NetworkSerializationManager, + actionDispatch: CoroutineScope) : RmiSupportCache(logger, actionDispatch) { - /** - * on the "client" to get a connection-specific remote object (that exists on the server) - */ - fun getRemoteObject(connection: Connection, endPoint: EndPoint, objectId: Int, interfaceClass: Class): Iface { - // this immediately returns BECAUSE the object must have already been created on the server (this is why we specify the rmiId)! + private fun createProxyObject(isGlobalObject: Boolean, + connection: Connection, + endPoint: EndPoint<*>, + objectId: Int, + interfaceClass: Class) : Iface { // so we can just instantly create the proxy object (or get the cached one) var proxyObject = getProxyObject(objectId) if (proxyObject == null) { - proxyObject = RmiSupport.createProxyObject(false, connection, serialization, rmiGlobalSupport, endPoint.type.simpleName, objectId, interfaceClass) + proxyObject = RmiSupport.createProxyObject(isGlobalObject, connection, serialization, rmiGlobalSupport, endPoint.type.simpleName, objectId, interfaceClass) saveProxyObject(objectId, proxyObject) } @@ -31,11 +30,19 @@ class RmiSupportConnection(logger: KLogger, return proxyObject as Iface } + /** + * on the connection+client to get a connection-specific remote object (that exists on the server/client) + */ + fun getRemoteObject(connection: Connection, endPoint: EndPoint<*>, objectId: Int, interfaceClass: Class): Iface { + // this immediately returns BECAUSE the object must have already been created on the server (this is why we specify the rmiId)! + return createProxyObject(false, connection, endPoint, objectId, interfaceClass) + } + /** * on the "client" to create a connection-specific remote object (that exists on the server) */ - suspend fun createRemoteObject(connection: C, interfaceClassId: Int, callback: suspend (Iface) -> Unit) { + suspend fun createRemoteObject(connection: Connection, interfaceClassId: Int, callback: suspend (Iface) -> Unit) { val callbackId = rmiGlobalSupport.registerCallback(callback) // There is no rmiID yet, because we haven't created it! @@ -51,22 +58,21 @@ class RmiSupportConnection(logger: KLogger, /** * called on "server" */ - internal suspend fun onConnectionObjectCreateRequest( - endPoint: EndPoint, connection: Connection_, message: ConnectionObjectCreateRequest, logger: KLogger) { + internal suspend fun onConnectionObjectCreateRequest(endPoint: EndPoint<*>, connection: Connection, message: ConnectionObjectCreateRequest, logger: KLogger) { val interfaceClassId = RmiUtils.unpackLeft(message.packedIds) val callbackId = RmiUtils.unpackRight(message.packedIds) // We have to lookup the iface, since the proxy object requires it val implObject = endPoint.serialization.createRmiObject(interfaceClassId) - val rmiId = registerImplObject(implObject) + val rmiId = saveImplObject(implObject) if (rmiId != RemoteObjectStorage.INVALID_RMI) { // this means we could register this object. // next, scan this object to see if there are any RMI fields RmiSupport.scanImplForRmiFields(logger, implObject) { - registerImplObject(implObject) + saveImplObject(it) } } else { logger.error { diff --git a/src/dorkbox/network/rmi/RmiUtils.kt b/src/dorkbox/network/rmi/RmiUtils.kt index a52d21e3..76e08478 100644 --- a/src/dorkbox/network/rmi/RmiUtils.kt +++ b/src/dorkbox/network/rmi/RmiUtils.kt @@ -64,8 +64,7 @@ object RmiUtils { } for (i in argTypes1.indices) { - diff = argTypes1[i].name - .compareTo(argTypes2[i].name) + diff = argTypes1[i].name.compareTo(argTypes2[i].name) if (diff != 0) { return@Comparator diff } @@ -371,7 +370,7 @@ object RmiUtils { try { serializerClass.getConstructor(Class::class.java).newInstance(superClass) } catch (ex3: NoSuchMethodException) { - serializerClass.newInstance() + serializerClass.getDeclaredConstructor().newInstance() } } } diff --git a/src/dorkbox/network/rmi/TimeoutException.java b/src/dorkbox/network/rmi/TimeoutException.kt similarity index 72% rename from src/dorkbox/network/rmi/TimeoutException.java rename to src/dorkbox/network/rmi/TimeoutException.kt index 5c9054f8..90b3eff8 100644 --- a/src/dorkbox/network/rmi/TimeoutException.java +++ b/src/dorkbox/network/rmi/TimeoutException.kt @@ -32,39 +32,25 @@ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ -package dorkbox.network.rmi; +package dorkbox.network.rmi -import java.io.IOException; +import java.io.IOException /** - * Thrown when a method with a return value is invoked on a remote object and the response is not received with the {@link - * RemoteObject#setResponseTimeout(int) response timeout}. + * Thrown when a method with a return value is invoked on a remote object and the response is not received with the [RemoteObject.responseTimeout]. * - * @author Nathan Sweet - * @see dorkbox.network.connection.Connection#getRemoteObject(int, RemoteObjectCallback) - * @see dorkbox.network.connection.Connection#createRemoteObject(Class, RemoteObjectCallback) + * @author Nathan Sweet + * + * @see dorkbox.network.connection.Connection.getObject + * @see dorkbox.network.connection.Connection.createObject */ -public -class TimeoutException extends IOException { - private static final long serialVersionUID = -3526277240277423682L; +class TimeoutException : IOException { + constructor() : super() {} + constructor(message: String?, cause: Throwable?) : super(message, cause) {} + constructor(message: String?) : super(message) {} + constructor(cause: Throwable?) : super(cause) {} - public - TimeoutException() { - super(); - } - - public - TimeoutException(String message, Throwable cause) { - super(message, cause); - } - - public - TimeoutException(String message) { - super(message); - } - - public - TimeoutException(Throwable cause) { - super(cause); + companion object { + private const val serialVersionUID = -3526277240277423682L } } diff --git a/src/dorkbox/network/rmi/messages/GlobalObjectCreateRequest.kt b/src/dorkbox/network/rmi/messages/GlobalObjectCreateRequest.kt index b01b6a50..a1a039ba 100644 --- a/src/dorkbox/network/rmi/messages/GlobalObjectCreateRequest.kt +++ b/src/dorkbox/network/rmi/messages/GlobalObjectCreateRequest.kt @@ -22,3 +22,4 @@ package dorkbox.network.rmi.messages * @param callbackId (RIGHT) to know which callback to use when the object is created */ data class GlobalObjectCreateRequest(val packedIds: Int) : RmiMessage +//a asd diff --git a/src/dorkbox/network/rmi/messages/MethodRequest.kt b/src/dorkbox/network/rmi/messages/MethodRequest.kt index 926cb0a4..1d05cc3f 100644 --- a/src/dorkbox/network/rmi/messages/MethodRequest.kt +++ b/src/dorkbox/network/rmi/messages/MethodRequest.kt @@ -58,4 +58,9 @@ class MethodRequest : RmiMessage { // these are the arguments for executing the method (they are serialized using the info from the cachedMethod field var args: Array? = null + + + override fun toString(): String { + return "MethodRequest(isGlobal=$isGlobal, objectId=$objectId, responseId=$responseId, cachedMethod=$cachedMethod, args=${args?.contentToString()})" + } } diff --git a/src/dorkbox/network/rmi/messages/MethodResponse.kt b/src/dorkbox/network/rmi/messages/MethodResponse.kt index 653fe0a9..c24265c4 100644 --- a/src/dorkbox/network/rmi/messages/MethodResponse.kt +++ b/src/dorkbox/network/rmi/messages/MethodResponse.kt @@ -49,4 +49,9 @@ class MethodResponse : RmiMessage { // this is the result of the invoked method var result: Any? = null + + + override fun toString(): String { + return "MethodResponse(isGlobal=$isGlobal, objectId=$objectId, responseId=$responseId, result=$result)" + } } diff --git a/src/dorkbox/network/rmi/messages/RmiClientRequestSerializer.kt b/src/dorkbox/network/rmi/messages/RmiClientRequestSerializer.kt index c8066eb4..d86179ff 100644 --- a/src/dorkbox/network/rmi/messages/RmiClientRequestSerializer.kt +++ b/src/dorkbox/network/rmi/messages/RmiClientRequestSerializer.kt @@ -40,6 +40,6 @@ class RmiClientRequestSerializer : Serializer() { kryo as KryoExtra val connection = kryo.connection - return connection.endPoint().rmiSupport.getImplObject(isGlobal, objectId, connection) + return connection.endPoint().rmiGlobalSupport.getImplObject(isGlobal, objectId, connection) } } diff --git a/src/dorkbox/network/serialization/Serialization.kt b/src/dorkbox/network/serialization/Serialization.kt index e023aa33..c8951606 100644 --- a/src/dorkbox/network/serialization/Serialization.kt +++ b/src/dorkbox/network/serialization/Serialization.kt @@ -25,6 +25,7 @@ import com.esotericsoftware.kryo.util.DefaultInstantiatorStrategy import com.esotericsoftware.kryo.util.IdentityMap import dorkbox.network.connection.KryoExtra import dorkbox.network.connection.ping.PingMessage +import dorkbox.network.handshake.Message import dorkbox.network.rmi.CachedMethod import dorkbox.network.rmi.RmiUtils import dorkbox.network.rmi.messages.* @@ -93,7 +94,7 @@ class Serialization(references: Boolean, // TODO: fix kryo to work the way we want, so we can register interfaces + serializers with kryo // serialization.register(XECPublicKey::class.java, XECPublicKeySerializer()) // serialization.register(XECPrivateKey::class.java, XECPrivateKeySerializer()) - serialization.register(dorkbox.network.connection.registration.Registration::class.java) // must use full package name! + serialization.register(Message::class.java) // must use full package name! return serialization }