Cleaned up RMI save/get/create methods. Fixed issues waiting for RMI method invocation responses

This commit is contained in:
nathan 2020-08-12 23:28:37 +02:00
parent 47891cc448
commit ccb1267bab
15 changed files with 387 additions and 303 deletions

View File

@ -55,6 +55,6 @@ open class CachedMethod(val method: Method, val methodIndex: Int, val methodClas
}
override fun toString(): String {
return "CachedMethod{" + method.name + ", methodClassId=" + methodClassId + ", methodIndex=" + methodIndex + '}'
return "CachedMethod{name:" + method.name + ", methodClassId=" + methodClassId + ", methodIndex=" + methodIndex + '}'
}
}

View File

@ -47,12 +47,7 @@ interface RemoteObject {
var responseTimeout: Int
/**
* @return the ID of response for the last method invocation.
*/
val lastResponseId: Int
/**
* Sets the behavior when invoking a remote method. Default is false.
* Sets the behavior when invoking a remote method. DEFAULT is false.
*
* If true, the invoking thread will not wait for a response. The method will return immediately and the return value
* should be ignored.
@ -60,50 +55,33 @@ interface RemoteObject {
* If false, the invoking thread will wait (if called via suspend, then it will use coroutines) for the remote method to return or
* timeout.
*
* The return value or any thrown exception can later be retrieved with [RemoteObject.waitForLastResponse] or [RemoteObject.waitForResponse].
* The responses will be stored until retrieved, so each method call should have a matching retrieve.
* If the return value or exception needs to be retrieved, then DO NOT set async, and change the response timeout
*/
var async: Boolean
/**
* Permits calls to [Object.toString] to actually return the `toString()` method on the object.
*
* @param enableDetailedToString If false, calls to [Object.toString] will return "<proxy #id>" (where `id` is the remote object ID)
* @param enabled If false, calls to [Object.toString] will return "<proxy #id>" (where `id` is the remote object ID)
* instead of invoking the remote `toString()` method on the object.
*/
fun enableToString(enableDetailedToString: Boolean)
fun enableToString(enabled: Boolean)
/**
* Permits calls to [RemoteObject.waitForLastResponse] and [RemoteObject.waitForResponse] to actually wait for a response.
* Permits calls to [Object.hashCode] to actually return the `hashCode()` method on the object.
*
* You must be in ASYNC mode already for this to work. There will be undefined errors if you do not enable waiting
* BEFORE calling the method you want to wait for
*
* @param enableWaiting if true, you want wait for the method results. If false, undefined errors can happen while waiting
* @param enabled If false, calls to [Object.hashCode] will return "id" (where `id` is the remote object ID)
* instead of invoking the remote `hashCode()` method on the object.
*/
fun enableWaitingForResponse(enableWaiting: Boolean)
fun enableHashCode(enabled: Boolean)
/**
* Waits for the response to the last method invocation to be received or the response timeout to be reached.
* Permits calls to [Object.equals] to actually return the `equals()` method on the object.
*
* You must be in ASYNC mode + enabled waiting for this to work. There will be undefined errors if you do not enable waiting BEFORE
* calling the method you want to wait for
*
* @return the response of the last method invocation
* @param enabled If false, calls to [Object.equals] will compare the "id" (where `id` is the remote object ID)
* instead of invoking the remote `equals()` method on the object.
*/
suspend fun waitForLastResponse(): Any?
/**
* Waits for the specified method invocation response to be received or the response timeout to be reached.
*
* You must be in ASYNC mode + enabled waiting for this to work. There will be undefined errors if you do not enable waiting BEFORE
* calling the method you want to wait for
*
* @param responseId usually this is the response ID obtained via [RemoteObject.lastResponseId]
*
* @return the response of the last method invocation
*/
suspend fun waitForResponse(responseId: Int): Any?
fun enableEquals(enabled: Boolean)
/**
* Causes this RemoteObject to stop listening to the connection for method invocation response messages.

View File

@ -216,9 +216,10 @@ class RemoteObjectStorage(val logger: KLogger) {
* Registers an object to allow a remote connection access to this object via the specified ID
*
* @param objectId Must not be <= 0 or > 65535
*
* @return true if successful, false if there was an error
*/
fun register(objectId: Int, `object`: Any): Boolean {
fun register(`object`: Any, objectId: Int): Boolean {
validate(objectId)
objectMap.put(objectId, `object`)
@ -231,12 +232,13 @@ class RemoteObjectStorage(val logger: KLogger) {
}
/**
* Removes an object. The remote connection will no longer be able to access it.
* Removes an object. The remote connection will no longer be able to access it. This object may, or may not exist
*/
fun <T> remove(objectId: Int): T {
fun <T> remove(objectId: Int): T? {
validate(objectId)
val rmiObject = objectMap.remove(objectId) as T
@Suppress("UNCHECKED_CAST")
val rmiObject = objectMap.remove(objectId) as T?
returnId(objectId)
logger.trace {
@ -264,9 +266,11 @@ class RemoteObjectStorage(val logger: KLogger) {
}
/**
* This object may, or may not exist
*
* @return the object registered with the specified ID.
*/
operator fun get(objectId: Int): Any {
operator fun get(objectId: Int): Any? {
validate(objectId)
return objectMap[objectId]

View File

@ -16,66 +16,69 @@
package dorkbox.network.rmi
import dorkbox.network.connection.Connection
import dorkbox.network.other.SuspendWaiter
import dorkbox.network.other.invokeSuspendFunction
import dorkbox.network.other.SuspendFunctionAccess
import dorkbox.network.rmi.messages.MethodRequest
import kotlinx.coroutines.runBlocking
import java.lang.reflect.InvocationHandler
import java.lang.reflect.InvocationTargetException
import java.lang.reflect.Method
import java.util.*
import kotlin.coroutines.Continuation
/**
* Handles network communication when methods are invoked on a proxy. For NON-BLOCKING performance, the interface
* must have the 'suspend' keyword added. If it is not present, then all method invocation will be BLOCKING.
* Handles network communication when methods are invoked on a proxy.
*
* For NON-BLOCKING performance, the RMI interface must have the 'suspend' keyword added. If this keyword is not
* present, then all method invocation will be BLOCKING.
*
*
* @param connection this is really the network client -- there is ONLY ever 1 connection
* @param rmiSupport is used to provide RMI support
* @param isGlobal true if this is a global object, or false if it is connection specific
* @param rmiObjectId this is the remote object ID (assigned by RMI). This is NOT the kryo registration ID
* @param connection this is really the network client -- there is ONLY ever 1 connection
* @param proxyString this is the name assigned to the proxy [toString] method
* @param rmiSupportCache is used to provide RMI support
* @param cachedMethods this is the methods available for the specified class
*/
class RmiClient(val isGlobal: Boolean,
val rmiObjectId: Int,
private val connection: Connection,
private val proxyString: String,
private val rmiSupportCache: RmiSupportCache,
private val cachedMethods: Array<CachedMethod>) : InvocationHandler {
internal class RmiClient(val isGlobal: Boolean,
val rmiObjectId: Int,
private val connection: Connection,
private val proxyString: String,
private val rmiSupportCache: RmiSupportCache,
private val cachedMethods: Array<CachedMethod>) : InvocationHandler {
companion object {
private val methods = RmiUtils.getMethods(RemoteObject::class.java)
private val closeMethod = methods.find { it.name == "close" }
private val toStringMethod = methods.find { it.name == "toString" }
private val hashCodeMethod = methods.find { it.name == "hashCode" }
private val equalsMethod = methods.find { it.name == "equals" }
private val enableToStringMethod = methods.find { it.name == "enableToString" }
private val enableHashCodeMethod = methods.find { it.name == "enableHashCode" }
private val enableEqualsMethod = methods.find { it.name == "enableEquals" }
private val setResponseTimeoutMethod = methods.find { it.name == "setResponseTimeout" }
private val getResponseTimeoutMethod = methods.find { it.name == "getResponseTimeout" }
private val setAsyncMethod = methods.find { it.name == "setAsync" }
private val getAsyncMethod = methods.find { it.name == "getAsync" }
private val enableToStringMethod = methods.find { it.name == "enableToString" }
private val enableWaitingForResponseMethod = methods.find { it.name == "enableWaitingForResponse" }
private val waitForLastResponseMethod = methods.find { it.name == "waitForLastResponse" }
private val getLastResponseIdMethod = methods.find { it.name == "getLastResponseId" }
private val waitForResponseMethod = methods.find { it.name == "waitForResponse" }
private val toStringMethod = methods.find { it.name == "toString" }
@Suppress("UNCHECKED_CAST")
private val EMPTY_ARRAY: Array<Any> = Collections.EMPTY_LIST.toTypedArray() as Array<Any>
}
private val responseWaiter = SuspendWaiter()
private var timeoutMillis: Long = 3000
private var isAsync = false
private var allowWaiting = false
private var enableToString = false
private var enableHashCode = false
private var enableEquals = false
// this is really a a short!
@Volatile
private var previousResponseId: Int = 0
// if we are ASYNC, then this method immediately returns
private suspend fun sendRequest(method: Method, args: Array<Any>): Any? {
private suspend fun invokeSuspend(method: Method, args: Array<Any>): Any? {
// there is a STRANGE problem, where if we DO NOT respond/reply to method invocation, and immediate invoke multiple methods --
// the "server" side can have out-of-order method invocation. There are 2 ways to solve this
// 1) make the "server" side single threaded
@ -87,18 +90,14 @@ class RmiClient(val isGlobal: Boolean,
val responseStorage = rmiSupportCache.getResponseStorage()
// If we are async, we ignore the response.... FOR NOW. The response, even if there is NOT one (ie: not void) will always return
// a thing (so we will know when to stop blocking).
val responseId = responseStorage.prep(rmiObjectId, responseWaiter)
// so we can query for async, if we want to necessary
previousResponseId = responseId
// If we are async, we ignore the response....
// The response, even if there is NOT one (ie: not void) will always return a thing (so we will know when to stop blocking).
val rmiWaiter = responseStorage.prep(rmiObjectId)
val invokeMethod = MethodRequest()
invokeMethod.isGlobal = isGlobal
invokeMethod.objectId = rmiObjectId
invokeMethod.responseId = responseId
invokeMethod.responseId = rmiWaiter.id
invokeMethod.args = args
// which method do we access? We always want to access the IMPLEMENTATION (if available!). we know that this will always succeed
@ -107,21 +106,26 @@ class RmiClient(val isGlobal: Boolean,
connection.send(invokeMethod)
// if we are async, then this will immediately return!
return try {
val result = responseStorage.waitForReply(allowWaiting, isAsync, rmiObjectId, responseId, responseWaiter, timeoutMillis)
if (result is Exception) {
throw result
} else {
result
// if we are async, then this will immediately return
val result = responseStorage.waitForReply(isAsync, rmiObjectId, rmiWaiter, timeoutMillis)
when (result) {
RmiResponseStorage.TIMEOUT_EXCEPTION -> {
throw TimeoutException("Response timed out: ${method.declaringClass.name}.${method.name}")
}
is Exception -> {
throw result
}
else -> {
return result
}
} catch (ex: TimeoutException) {
throw TimeoutException("Response timed out: ${method.declaringClass.name}.${method.name}")
}
}
@Suppress("DuplicatedCode")
@Throws(Exception::class)
/**
* @throws Exception
*/
override fun invoke(proxy: Any, method: Method, args: Array<Any>?): Any? {
if (method.declaringClass == RemoteObject::class.java) {
// manage all of the RemoteObject proxy methods
@ -130,6 +134,7 @@ class RmiClient(val isGlobal: Boolean,
rmiSupportCache.removeProxyObject(rmiObjectId)
return null
}
setResponseTimeoutMethod -> {
timeoutMillis = (args!![0] as Int).toLong()
return null
@ -137,6 +142,7 @@ class RmiClient(val isGlobal: Boolean,
getResponseTimeoutMethod -> {
return timeoutMillis.toInt()
}
getAsyncMethod -> {
return isAsync
}
@ -144,52 +150,39 @@ class RmiClient(val isGlobal: Boolean,
isAsync = args!![0] as Boolean
return null
}
enableToStringMethod -> {
enableToString = args!![0] as Boolean
return null
}
getLastResponseIdMethod -> {
// only ASYNC can wait for responses
check(isAsync) { "This RemoteObject is not currently set to ASYNC mode. Unable to manually get the response ID." }
check(allowWaiting) { "This RemoteObject does not allow waiting for responses. You must enable this BEFORE " +
"calling the method that you want to wait for the respose to" }
return previousResponseId
}
enableWaitingForResponseMethod -> {
allowWaiting = args!![0] as Boolean
enableHashCodeMethod -> {
enableHashCode = args!![0] as Boolean
return null
}
waitForLastResponseMethod -> {
// only ASYNC can wait for responses
check(isAsync) { "This RemoteObject is not currently set to ASYNC mode. Unable to manually wait for a response." }
check(allowWaiting) { "This RemoteObject does not allow waiting for responses. You must enable this BEFORE " +
"calling the method that you want to wait for the respose to" }
val maybeContinuation = args?.lastOrNull() as Continuation<*>
// this is a suspend method, so we don't need extra checks
return invokeSuspendFunction(maybeContinuation) {
rmiSupportCache.getResponseStorage().waitForReplyManually(rmiObjectId, previousResponseId, responseWaiter)
}
enableEqualsMethod -> {
enableEquals = args!![0] as Boolean
return null
}
waitForResponseMethod -> {
// only ASYNC can wait for responses
check(isAsync) { "This RemoteObject is not currently set to ASYNC mode. Unable to manually wait for a response." }
check(allowWaiting) { "This RemoteObject does not allow waiting for responses. You must enable this BEFORE " +
"calling the method that you want to wait for the respose to" }
val maybeContinuation = args?.lastOrNull() as Continuation<*>
// this is a suspend method, so we don't need extra checks
return invokeSuspendFunction(maybeContinuation) {
rmiSupportCache.getResponseStorage().waitForReplyManually(rmiObjectId, args[0] as Int, responseWaiter)
}
}
else -> throw Exception("Invocation handler could not find RemoteObject method for ${method.name}")
}
} else if (!enableToString && method == toStringMethod) {
return proxyString
} else {
when (method) {
toStringMethod -> if (!enableToString) return proxyString // otherwise, the RMI round trip logic is done for toString()
hashCodeMethod -> if (!enableHashCode) return rmiObjectId // otherwise, the RMI round trip logic is done for hashCode()
equalsMethod -> {
val other = args!![0]
if (other !is RmiClient) {
return false
}
if (!enableEquals) {
return rmiObjectId == other.rmiObjectId
}
// otherwise, the RMI round trip logic is done for equals()
}
}
}
// if a 'suspend' function is called, then our last argument is a 'Continuation' object
@ -201,11 +194,11 @@ class RmiClient(val isGlobal: Boolean,
if (maybeContinuation is Continuation<*>) {
val argsWithoutContinuation = args.take(args.size - 1)
invokeSuspendFunction(maybeContinuation) {
invokeSuspend(method, argsWithoutContinuation.toTypedArray())
sendRequest(method, argsWithoutContinuation.toTypedArray())
}
} else {
runBlocking {
invokeSuspend(method, args ?: EMPTY_ARRAY)
sendRequest(method, args ?: EMPTY_ARRAY)
}
}
@ -249,11 +242,11 @@ class RmiClient(val isGlobal: Boolean,
return if (maybeContinuation is Continuation<*>) {
val argsWithoutContinuation = args.take(args.size - 1)
invokeSuspendFunction(maybeContinuation) {
invokeSuspend(method, argsWithoutContinuation.toTypedArray())
sendRequest(method, argsWithoutContinuation.toTypedArray())
}
} else {
runBlocking {
invokeSuspend(method, args ?: EMPTY_ARRAY)
sendRequest(method, args ?: EMPTY_ARRAY)
}
}
}
@ -283,4 +276,12 @@ class RmiClient(val isGlobal: Boolean,
return rmiObjectId == other.rmiObjectId
}
private fun invokeSuspendFunction(continuation: Continuation<*>, suspendFunction: suspend () -> Any?): Any? {
return try {
SuspendFunctionAccess.invokeSuspendFunction(suspendFunction, continuation)
} catch (e: InvocationTargetException) {
throw e.cause!!
}
}
}

View File

@ -1,10 +1,9 @@
package dorkbox.network.rmi
import com.conversantmedia.util.concurrent.MultithreadConcurrentQueue
import dorkbox.network.other.SuspendWaiter
import dorkbox.network.rmi.messages.MethodResponse
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import org.agrona.collections.Hashing
@ -14,31 +13,90 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
import kotlin.concurrent.read
import kotlin.concurrent.write
internal data class RmiWaiter(val id: Int) {
// this is bi-directional waiting. The method names to not reflect this, however there is no possibility of race conditions w.r.t. waiting
// https://stackoverflow.com/questions/55421710/how-to-suspend-kotlin-coroutine-until-notified
// https://kotlinlang.org/docs/reference/coroutines/channels.html
// "receive' suspends until another coroutine invokes "send"
// and
// "send" suspends until another coroutine invokes "receive".
//
// these are wrapped in a try/catch, because cancel will cause exceptions to be thrown (which we DO NOT want)
@Volatile
var channel: Channel<Unit> = Channel(0)
/**
* this will replace the waiter if it was cancelled (waiters are not valid if cancelled)
*/
fun prep() {
if (channel.isClosedForReceive && channel.isClosedForSend) {
println("renew waiter")
channel = Channel(0)
}
}
suspend fun doNotify() {
println("notified waiter")
try {
channel.send(Unit)
} catch (ignored: Exception) {
}
}
suspend fun doWait() {
println("waiting waiter")
try {
channel.receive()
} catch (ignored: Exception) {
}
}
fun cancel() {
println("delay is cancelling suspending coroutine")
try {
channel.cancel()
} catch (ignored: Exception) {
}
}
}
/**
* Manages the "pending response" from method invocation.
*
* response ID's and the memory they hold will leak if the response never arrives!
*/
class RmiResponseStorage(private val actionDispatch: CoroutineScope) {
internal class RmiResponseStorage(private val actionDispatch: CoroutineScope) {
companion object {
val TIMEOUT_EXCEPTION = Exception()
}
// Response ID's are for ALL in-flight RMI on the network stack. instead of limited to (originally) 64, we are now limited to 65,535
// these are just looped around in a ring buffer.
// These are stored here as int, however these are REALLY shorts and are int-packed when transferring data on the wire
private val rmiResponseIds = MultithreadConcurrentQueue<Int>(65535)
// 32,000 IN FLIGHT RMI method invocations is PLENTY
private val maxValuesInCache = Short.MAX_VALUE.toInt()
private val rmiWaiterCache = MultithreadConcurrentQueue<RmiWaiter>(maxValuesInCache)
private val pendingLock = ReentrantReadWriteLock()
private val pending = Int2NullableObjectHashMap<Any>(32, Hashing.DEFAULT_LOAD_FACTOR, true)
init {
// create a shuffled list of ID's. This operation is ONLY performed ONE TIME per endpoint!
val ids = IntArrayList()
for (id in Short.MIN_VALUE..Short.MAX_VALUE) {
val ids = IntArrayList(maxValuesInCache, Integer.MIN_VALUE)
// ZERO is special, and is never added!
for (id in 1..Short.MAX_VALUE) {
ids.addInt(id)
}
ids.shuffle()
// populate the array of randomly assigned ID's.
// populate the array of randomly assigned ID's + waiters.
ids.forEach {
rmiResponseIds.offer(it)
rmiWaiterCache.offer(RmiWaiter(it))
}
}
@ -51,95 +109,87 @@ class RmiResponseStorage(private val actionDispatch: CoroutineScope) {
val pendingId = RmiUtils.packShorts(objectId, responseId)
println("pending result received")
val previous = pendingLock.write { pending.put(pendingId, result) }
// if NULL, since either we don't exist, or it was cancelled
if (previous is SuspendWaiter) {
// this means we were NOT timed out!
if (previous is RmiWaiter) {
// this means we were NOT timed out! If we were cancelled, then this does nothing.
previous.doNotify()
}
// always return the responseId! It will (hopefully) be a while before this ID is used again
rmiResponseIds.offer(responseId)
// since this was the FIRST one to trigger, return it to the cache.
rmiWaiterCache.offer(previous)
}
}
fun prep(rmiObjectId: Int, responseWaiter: SuspendWaiter): Int {
val responseId = rmiResponseIds.poll()
/**
* gets the RmiWaiter (id + waiter)
*/
internal fun prep(rmiObjectId: Int): RmiWaiter {
val responseRmi = rmiWaiterCache.poll()
// this will replace the waiter if it was cancelled (waiters are not valid if cancelled)
responseRmi.prep()
// we pack them together so we can fully use the range of ints, so we can service ALL rmi requests in a single spot
pendingLock.write { pending[RmiUtils.packShorts(rmiObjectId, responseId)] = responseWaiter }
pendingLock.write { pending[RmiUtils.packShorts(rmiObjectId, responseRmi.id)] = responseRmi }
return responseId
return responseRmi
}
suspend fun waitForReply(allowWaiting: Boolean, isAsync: Boolean, rmiObjectId: Int, responseId: Int,
responseWaiter: SuspendWaiter, timeoutMillis: Long): Any? {
/**
* @return the result (can be null) or timeout exception
*/
suspend fun waitForReply(isAsync: Boolean, rmiObjectId: Int, rmiWaiter: RmiWaiter, timeoutMillis: Long): Any? {
val pendingId = RmiUtils.packShorts(rmiObjectId, responseId)
val pendingId = RmiUtils.packShorts(rmiObjectId, rmiWaiter.id)
var delayJobForTimeout: Job? = null
// NOTE: we ALWAYS send a response from the remote end.
//
// 'async' -> DO NOT WAIT
// 'timeout > 0' -> WAIT
// 'timeout == 0' -> same as async (DO NOT WAIT)
val returnImmediately = isAsync || timeoutMillis <= 0L
if (!(isAsync && allowWaiting) && timeoutMillis > 0L) {
// always launch a "cancel" coroutine, unless we want to wait forever
delayJobForTimeout = actionDispatch.launch {
delay(timeoutMillis)
if (returnImmediately) {
return null
}
val previous = pendingLock.write {
val prev = pending.remove(pendingId)
if (prev is SuspendWaiter) {
pending[pendingId] = TimeoutException("Response timed out.")
}
val responseTimeoutJob = actionDispatch.launch {
delay(timeoutMillis) // this will always wait
prev
}
// if we are NOT SuspendWaiter, then it means we had a result!
//
// If there are tight timing issues, then we err on the side of "you timed out"
if (!isAsync) {
// we only cancel waiting because when NON-ASYNC
if (previous is SuspendWaiter) {
previous.cancel()
}
}
// check if we have a result or not
val maybeResult = pendingLock.read { pending[pendingId] }
if (maybeResult is RmiWaiter) {
System.err.println("TIMEOUT $pendingId")
// maybeResult.cancel()
}
}
return if (isAsync) {
null
} else {
waitForReplyManually(pendingId, responseWaiter, delayJobForTimeout)
// wait for the response.
//
// If the response is ALREADY here, the doWait() returns instantly (with result)
// if no response yet, it will suspend and either
// A) get response
// B) timeout
rmiWaiter.doWait()
// always cancel the timeout
responseTimeoutJob.cancel()
val resultOrWaiter = pendingLock.write { pending.remove(pendingId) }
if (resultOrWaiter is RmiWaiter) {
// since this was the FIRST one to trigger, return it to the cache.
rmiWaiterCache.offer(resultOrWaiter)
return TIMEOUT_EXCEPTION
}
}
// this is called when we MANUALLY want to wait for a reply as part of async!
// A timeout of 0 means we wait forever
suspend fun waitForReplyManually(rmiObjectId: Int, responseId: Int, responseWaiter: SuspendWaiter): Any? {
val pendingId = RmiUtils.packShorts(rmiObjectId, responseId)
return waitForReplyManually(pendingId, responseWaiter, null)
}
// we have to be careful when we resume, because SOMEONE ELSE'S METHOD RESPONSE can resume us (but only from the same object)!
private suspend fun waitForReplyManually(pendingId: Int,responseWaiter: SuspendWaiter, delayJobForTimeout: Job?): Any? {
while(true) {
val checkResult = pendingLock.read { pending[pendingId] }
if (checkResult !is SuspendWaiter) {
// this means we have correct data! (or it was an exception) we can safely remove the data from the map
pendingLock.write { pending.remove(pendingId) }
delayJobForTimeout?.cancel()
return checkResult
}
// keep waiting, since we don't have a response yet
responseWaiter.doWait()
}
return resultOrWaiter
}
fun close() {
rmiResponseIds.clear()
rmiWaiterCache.clear()
pendingLock.write { pending.clear() }
}
}

View File

@ -15,8 +15,8 @@
package dorkbox.network.rmi
import dorkbox.network.connection.Connection
import dorkbox.network.connection.Connection_
import dorkbox.network.connection.EndPoint
import dorkbox.network.connection.ListenerManager
import dorkbox.network.rmi.messages.*
import dorkbox.network.serialization.NetworkSerializationManager
import dorkbox.util.classes.ClassHelper
@ -26,10 +26,9 @@ import mu.KLogger
import java.lang.reflect.Proxy
import java.util.*
class RmiSupport<C : Connection>(logger: KLogger,
actionDispatch: CoroutineScope,
internal val serialization: NetworkSerializationManager) : RmiSupportCache(logger, actionDispatch)
{
internal class RmiSupport(logger: KLogger,
actionDispatch: CoroutineScope,
internal val serialization: NetworkSerializationManager) : RmiSupportCache(logger, actionDispatch) {
companion object {
/**
* Returns a proxy object that implements the specified interface, and the methods invoked on the proxy object will be invoked
@ -127,7 +126,7 @@ class RmiSupport<C : Connection>(logger: KLogger,
/**
* called on "client"
*/
private fun onGenericObjectResponse(endPoint: EndPoint<Connection_>, connection: Connection_, logger: KLogger,
private fun onGenericObjectResponse(endPoint: EndPoint<*>, connection: Connection, logger: KLogger,
isGlobal: Boolean, rmiId: Int, callback: suspend (Any) -> Unit,
rmiSupportCache: RmiSupportCache, serialization: NetworkSerializationManager) {
@ -162,30 +161,90 @@ class RmiSupport<C : Connection>(logger: KLogger,
// this is used for all connection specific ones as well.
private val remoteObjectCreationCallbacks = RemoteObjectStorage(logger)
internal fun <Iface> registerCallback(callback: suspend (Iface) -> Unit): Int {
return remoteObjectCreationCallbacks.register(callback)
}
private fun removeCallback(callbackId: Int): suspend (Any) -> Unit {
return remoteObjectCreationCallbacks.remove(callbackId)
// callback's area always correct, because we track them ourselves.
return remoteObjectCreationCallbacks.remove(callbackId)!!
}
/**
* Get's the implementation object based on if it is global, or not global
* @return the implementation object based on if it is global, or not global
*/
fun getImplObject(isGlobal: Boolean, rmiObjectId: Int, connection: Connection_): Any {
return if (isGlobal) getImplObject(rmiObjectId) else connection.rmiSupport().getImplObject(rmiObjectId)
fun <T> getImplObject(isGlobal: Boolean, rmiObjectId: Int, connection: Connection): T? {
return if (isGlobal) getImplObject(rmiObjectId) else connection.rmiConnectionSupport.getImplObject(rmiObjectId)
}
/**
* @return the newly registered RMI ID for this object. [RemoteObjectStorage.INVALID_RMI] means it was invalid (an error log will be emitted)
*/
fun saveImplObject(logger: KLogger, `object`: Any): Int {
val rmiId = saveImplObject(`object`)
if (rmiId != RemoteObjectStorage.INVALID_RMI) {
// this means we could register this object.
// next, scan this object to see if there are any RMI fields
scanImplForRmiFields(logger, `object`) {
saveImplObject(it)
}
} else {
logger.error("Trying to create an RMI object with the INVALID_RMI id!!")
}
return rmiId
}
/**
* @return the true if it was a success saving this object. False means it was invalid (an error log will be emitted)
*/
fun saveImplObject(logger: KLogger, `object`: Any, objectId: Int): Boolean {
val rmiSuccess = saveImplObject(`object`, objectId)
if (rmiSuccess) {
// this means we could register this object.
// next, scan this object to see if there are any RMI fields
scanImplForRmiFields(logger, `object`) {
saveImplObject(it)
}
} else {
logger.error("Trying to save an RMI object ${`object`.javaClass} with invalid id $objectId")
}
return rmiSuccess
}
/**
* @return the removed object. If null, an error log will be emitted
*/
fun <T> removeImplObject(logger: KLogger, objectId: Int): T? {
val success = removeImplObject<Any>(objectId)
if (success == null) {
logger.error("Error trying to remove an RMI impl object id $objectId.")
}
@Suppress("UNCHECKED_CAST")
return success as T?
}
override fun close() {
super.close()
remoteObjectCreationCallbacks.close()
}
/**
* on the "client" to get a global remote object (that exists on the server)
* on the connection+client to get a global remote object (that exists on the server)
*/
fun <Iface> getGlobalRemoteObject(connection: C, endPoint: EndPoint<C>, objectId: Int, interfaceClass: Class<Iface>): Iface {
fun <Iface> getGlobalRemoteObject(connection: Connection, endPoint: EndPoint<*>, objectId: Int, interfaceClass: Class<Iface>): Iface {
// this immediately returns BECAUSE the object must have already been created on the server (this is why we specify the rmiId)!
// so we can just instantly create the proxy object (or get the cached one)
@ -199,35 +258,17 @@ class RmiSupport<C : Connection>(logger: KLogger,
return proxyObject as Iface
}
/**
* on the "client" to create a global remote object (that exists on the server)
*/
suspend fun <Iface> createGlobalRemoteObject(connection: Connection, interfaceClassId: Int, callback: suspend (Iface) -> Unit) {
val callbackId = registerCallback(callback)
// There is no rmiID yet, because we haven't created it!
val message = GlobalObjectCreateRequest(RmiUtils.packShorts(interfaceClassId, callbackId))
// We use a callback to notify us when the object is ready. We can't "create this on the fly" because we
// have to wait for the object to be created + ID to be assigned on the remote system BEFORE we can create the proxy instance here.
// this means we are creating a NEW object on the server
connection.send(message)
}
/**
* Manages ALL OF THE RMI stuff!
*/
@Throws(IllegalArgumentException::class)
suspend fun manage(endPoint: EndPoint<Connection_>, connection: Connection_, message: Any, logger: KLogger) {
suspend fun manage(endPoint: EndPoint<*>, connection: Connection, message: Any, logger: KLogger) {
when (message) {
is ConnectionObjectCreateRequest -> {
/**
* called on "server"
*/
connection.rmiSupport().onConnectionObjectCreateRequest(endPoint, connection, message, logger)
connection.rmiConnectionSupport.onConnectionObjectCreateRequest(endPoint, connection, message, logger)
}
is ConnectionObjectCreateResponse -> {
/**
@ -265,7 +306,19 @@ class RmiSupport<C : Connection>(logger: KLogger,
val isGlobal: Boolean = message.isGlobal
val cachedMethod = message.cachedMethod
val implObject = getImplObject(isGlobal, objectId, connection)
val implObject = getImplObject<Any>(isGlobal, objectId, connection)
if (implObject == null) {
logger.error("Unable to resolve implementation object for [global=$isGlobal, objectID=$objectId, connection=$connection")
val invokeMethodResult = MethodResponse()
invokeMethodResult.objectId = objectId
invokeMethodResult.responseId = message.responseId
invokeMethodResult.result = NullPointerException("Remote object for proxy [global=$isGlobal, objectID=$objectId] does not exist.")
connection.send(invokeMethodResult)
return
}
logger.trace {
var argString = ""
@ -295,8 +348,6 @@ class RmiSupport<C : Connection>(logger: KLogger,
// args!! is safe to do here (even though it doesn't make sense)
result = cachedMethod.invoke(connection, implObject, message.args!!)
} catch (ex: Exception) {
logger.error("Error invoking method: ${cachedMethod.method.declaringClass.name}.${cachedMethod.method.name}", ex)
result = ex.cause
// added to prevent a stack overflow when references is false, (because 'cause' == "this").
// See:
@ -306,6 +357,13 @@ class RmiSupport<C : Connection>(logger: KLogger,
} else {
result.initCause(null)
}
// only remove stuff if our logger is NOT on trace (so normal logs will not show extra info, trace will show extra info)
if (!logger.isTraceEnabled) {
ListenerManager.cleanStackTrace(result as Throwable)
}
logger.error("Error invoking method: ${cachedMethod.method.declaringClass.name}.${cachedMethod.method.name}", result)
}
val invokeMethodResult = MethodResponse()
@ -325,28 +383,13 @@ class RmiSupport<C : Connection>(logger: KLogger,
/**
* called on "server"
*/
private suspend fun onGlobalObjectCreateRequest(
endPoint: EndPoint<Connection_>, connection: Connection_, message: GlobalObjectCreateRequest, logger: KLogger) {
private suspend fun onGlobalObjectCreateRequest(endPoint: EndPoint<*>, connection: Connection, message: GlobalObjectCreateRequest, logger: KLogger) {
val interfaceClassId = RmiUtils.unpackLeft(message.packedIds)
val callbackId = RmiUtils.unpackRight(message.packedIds)
// We have to lookup the iface, since the proxy object requires it
val implObject = endPoint.serialization.createRmiObject(interfaceClassId)
val rmiId = registerImplObject(implObject)
if (rmiId != RemoteObjectStorage.INVALID_RMI) {
// this means we could register this object.
// next, scan this object to see if there are any RMI fields
scanImplForRmiFields(logger, implObject) {
registerImplObject(implObject)
}
} else {
logger.error {
"Trying to create an RMI object with the INVALID_RMI id!!"
}
}
val rmiId = saveImplObject(logger, implObject)
// we send the message ANYWAYS, because the client needs to know it did NOT succeed!
connection.send(GlobalObjectCreateResponse(RmiUtils.packShorts(rmiId, callbackId)))

View File

@ -10,22 +10,27 @@ import mu.KLogger
* The impl/proxy objects CANNOT be stored in the same data structure, because their IDs are not tied to the same ID source (and there
* would be conflicts in the data structure)
*/
open class RmiSupportCache(logger: KLogger, actionDispatch: CoroutineScope) {
internal open class RmiSupportCache(logger: KLogger, actionDispatch: CoroutineScope) {
private val responseStorage = RmiResponseStorage(actionDispatch)
private val implObjects = RemoteObjectStorage(logger)
private val proxyObjects = LockFreeIntMap<RemoteObject>()
fun registerImplObject(rmiObject: Any): Int {
fun saveImplObject(rmiObject: Any): Int {
return implObjects.register(rmiObject)
}
fun getImplObject(rmiId: Int): Any {
return implObjects[rmiId]
fun saveImplObject(rmiObject: Any, objectId: Int): Boolean {
return implObjects.register(rmiObject, objectId)
}
fun removeImplObject(rmiId: Int) {
implObjects.remove(rmiId) as Any
fun <T> getImplObject(rmiId: Int): T? {
@Suppress("UNCHECKED_CAST")
return implObjects[rmiId] as T?
}
fun <T> removeImplObject(rmiId: Int): T? {
return implObjects.remove(rmiId) as T?
}
/**

View File

@ -1,7 +1,6 @@
package dorkbox.network.rmi
import dorkbox.network.connection.Connection
import dorkbox.network.connection.Connection_
import dorkbox.network.connection.EndPoint
import dorkbox.network.rmi.messages.ConnectionObjectCreateRequest
import dorkbox.network.rmi.messages.ConnectionObjectCreateResponse
@ -9,21 +8,21 @@ import dorkbox.network.serialization.NetworkSerializationManager
import kotlinx.coroutines.CoroutineScope
import mu.KLogger
class RmiSupportConnection<C: Connection>(logger: KLogger,
private val rmiGlobalSupport: RmiSupport<out Connection>,
private val serialization: NetworkSerializationManager,
actionDispatch: CoroutineScope) : RmiSupportCache(logger, actionDispatch) {
internal class RmiSupportConnection(logger: KLogger,
val rmiGlobalSupport: RmiSupport,
private val serialization: NetworkSerializationManager,
actionDispatch: CoroutineScope) : RmiSupportCache(logger, actionDispatch) {
/**
* on the "client" to get a connection-specific remote object (that exists on the server)
*/
fun <Iface> getRemoteObject(connection: Connection, endPoint: EndPoint<Connection_>, objectId: Int, interfaceClass: Class<Iface>): Iface {
// this immediately returns BECAUSE the object must have already been created on the server (this is why we specify the rmiId)!
private fun <Iface> createProxyObject(isGlobalObject: Boolean,
connection: Connection,
endPoint: EndPoint<*>,
objectId: Int,
interfaceClass: Class<Iface>) : Iface {
// so we can just instantly create the proxy object (or get the cached one)
var proxyObject = getProxyObject(objectId)
if (proxyObject == null) {
proxyObject = RmiSupport.createProxyObject(false, connection, serialization, rmiGlobalSupport, endPoint.type.simpleName, objectId, interfaceClass)
proxyObject = RmiSupport.createProxyObject(isGlobalObject, connection, serialization, rmiGlobalSupport, endPoint.type.simpleName, objectId, interfaceClass)
saveProxyObject(objectId, proxyObject)
}
@ -31,11 +30,19 @@ class RmiSupportConnection<C: Connection>(logger: KLogger,
return proxyObject as Iface
}
/**
* on the connection+client to get a connection-specific remote object (that exists on the server/client)
*/
fun <Iface> getRemoteObject(connection: Connection, endPoint: EndPoint<*>, objectId: Int, interfaceClass: Class<Iface>): Iface {
// this immediately returns BECAUSE the object must have already been created on the server (this is why we specify the rmiId)!
return createProxyObject(false, connection, endPoint, objectId, interfaceClass)
}
/**
* on the "client" to create a connection-specific remote object (that exists on the server)
*/
suspend fun <Iface> createRemoteObject(connection: C, interfaceClassId: Int, callback: suspend (Iface) -> Unit) {
suspend fun <Iface> createRemoteObject(connection: Connection, interfaceClassId: Int, callback: suspend (Iface) -> Unit) {
val callbackId = rmiGlobalSupport.registerCallback(callback)
// There is no rmiID yet, because we haven't created it!
@ -51,22 +58,21 @@ class RmiSupportConnection<C: Connection>(logger: KLogger,
/**
* called on "server"
*/
internal suspend fun onConnectionObjectCreateRequest(
endPoint: EndPoint<Connection_>, connection: Connection_, message: ConnectionObjectCreateRequest, logger: KLogger) {
internal suspend fun onConnectionObjectCreateRequest(endPoint: EndPoint<*>, connection: Connection, message: ConnectionObjectCreateRequest, logger: KLogger) {
val interfaceClassId = RmiUtils.unpackLeft(message.packedIds)
val callbackId = RmiUtils.unpackRight(message.packedIds)
// We have to lookup the iface, since the proxy object requires it
val implObject = endPoint.serialization.createRmiObject(interfaceClassId)
val rmiId = registerImplObject(implObject)
val rmiId = saveImplObject(implObject)
if (rmiId != RemoteObjectStorage.INVALID_RMI) {
// this means we could register this object.
// next, scan this object to see if there are any RMI fields
RmiSupport.scanImplForRmiFields(logger, implObject) {
registerImplObject(implObject)
saveImplObject(it)
}
} else {
logger.error {

View File

@ -64,8 +64,7 @@ object RmiUtils {
}
for (i in argTypes1.indices) {
diff = argTypes1[i].name
.compareTo(argTypes2[i].name)
diff = argTypes1[i].name.compareTo(argTypes2[i].name)
if (diff != 0) {
return@Comparator diff
}
@ -371,7 +370,7 @@ object RmiUtils {
try {
serializerClass.getConstructor(Class::class.java).newInstance(superClass)
} catch (ex3: NoSuchMethodException) {
serializerClass.newInstance()
serializerClass.getDeclaredConstructor().newInstance()
}
}
}

View File

@ -32,39 +32,25 @@
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package dorkbox.network.rmi;
package dorkbox.network.rmi
import java.io.IOException;
import java.io.IOException
/**
* Thrown when a method with a return value is invoked on a remote object and the response is not received with the {@link
* RemoteObject#setResponseTimeout(int) response timeout}.
* Thrown when a method with a return value is invoked on a remote object and the response is not received with the [RemoteObject.responseTimeout].
*
* @author Nathan Sweet <misc@n4te.com>
* @see dorkbox.network.connection.Connection#getRemoteObject(int, RemoteObjectCallback)
* @see dorkbox.network.connection.Connection#createRemoteObject(Class, RemoteObjectCallback)
* @author Nathan Sweet
*
* @see dorkbox.network.connection.Connection.getObject
* @see dorkbox.network.connection.Connection.createObject
*/
public
class TimeoutException extends IOException {
private static final long serialVersionUID = -3526277240277423682L;
class TimeoutException : IOException {
constructor() : super() {}
constructor(message: String?, cause: Throwable?) : super(message, cause) {}
constructor(message: String?) : super(message) {}
constructor(cause: Throwable?) : super(cause) {}
public
TimeoutException() {
super();
}
public
TimeoutException(String message, Throwable cause) {
super(message, cause);
}
public
TimeoutException(String message) {
super(message);
}
public
TimeoutException(Throwable cause) {
super(cause);
companion object {
private const val serialVersionUID = -3526277240277423682L
}
}

View File

@ -22,3 +22,4 @@ package dorkbox.network.rmi.messages
* @param callbackId (RIGHT) to know which callback to use when the object is created
*/
data class GlobalObjectCreateRequest(val packedIds: Int) : RmiMessage
//a asd

View File

@ -58,4 +58,9 @@ class MethodRequest : RmiMessage {
// these are the arguments for executing the method (they are serialized using the info from the cachedMethod field
var args: Array<Any>? = null
override fun toString(): String {
return "MethodRequest(isGlobal=$isGlobal, objectId=$objectId, responseId=$responseId, cachedMethod=$cachedMethod, args=${args?.contentToString()})"
}
}

View File

@ -49,4 +49,9 @@ class MethodResponse : RmiMessage {
// this is the result of the invoked method
var result: Any? = null
override fun toString(): String {
return "MethodResponse(isGlobal=$isGlobal, objectId=$objectId, responseId=$responseId, result=$result)"
}
}

View File

@ -40,6 +40,6 @@ class RmiClientRequestSerializer : Serializer<Any>() {
kryo as KryoExtra
val connection = kryo.connection
return connection.endPoint().rmiSupport.getImplObject(isGlobal, objectId, connection)
return connection.endPoint().rmiGlobalSupport.getImplObject(isGlobal, objectId, connection)
}
}

View File

@ -25,6 +25,7 @@ import com.esotericsoftware.kryo.util.DefaultInstantiatorStrategy
import com.esotericsoftware.kryo.util.IdentityMap
import dorkbox.network.connection.KryoExtra
import dorkbox.network.connection.ping.PingMessage
import dorkbox.network.handshake.Message
import dorkbox.network.rmi.CachedMethod
import dorkbox.network.rmi.RmiUtils
import dorkbox.network.rmi.messages.*
@ -93,7 +94,7 @@ class Serialization(references: Boolean,
// TODO: fix kryo to work the way we want, so we can register interfaces + serializers with kryo
// serialization.register(XECPublicKey::class.java, XECPublicKeySerializer())
// serialization.register(XECPrivateKey::class.java, XECPrivateKeySerializer())
serialization.register(dorkbox.network.connection.registration.Registration::class.java) // must use full package name!
serialization.register(Message::class.java) // must use full package name!
return serialization
}