From fe5aa1d8a6d93730e35bf55fb50db3bcb595ebce Mon Sep 17 00:00:00 2001 From: nathan Date: Fri, 28 Aug 2020 10:20:15 +0200 Subject: [PATCH] RMI cache now emits errors using the notifyError callback --- .../network/rmi/RmiManagerConnections.kt | 20 ++- src/dorkbox/network/rmi/RmiManagerGlobal.kt | 127 +++--------------- src/dorkbox/network/rmi/RmiObjectCache.kt | 35 ++++- 3 files changed, 59 insertions(+), 123 deletions(-) diff --git a/src/dorkbox/network/rmi/RmiManagerConnections.kt b/src/dorkbox/network/rmi/RmiManagerConnections.kt index 257d64bb..b7f6a312 100644 --- a/src/dorkbox/network/rmi/RmiManagerConnections.kt +++ b/src/dorkbox/network/rmi/RmiManagerConnections.kt @@ -24,9 +24,11 @@ import dorkbox.network.serialization.Serialization import dorkbox.util.collections.LockFreeIntMap import mu.KLogger -internal class RmiManagerConnections(logger: KLogger, - val rmiGlobalSupport: RmiManagerGlobal, - private val serialization: Serialization) : RmiObjectCache(logger) { +internal class RmiManagerConnections( + logger: KLogger, + listenerManager: ListenerManager, + val rmiGlobalSupport: RmiManagerGlobal, + private val serialization: Serialization) : RmiObjectCache(listenerManager, logger) { // It is critical that all of the RMI proxy objects are unique, and are saved/cached PER CONNECTION. These cannot be shared between connections! private val proxyObjects = LockFreeIntMap() @@ -49,7 +51,7 @@ internal class RmiManagerConnections(logger: KLogger, /** * on the connection+client to get a connection-specific remote object (that exists on the server/client) */ - fun getRemoteObject(connection: Connection, objectId: Int, interfaceClass: Class): Iface { + fun getRemoteObject(connection: Connection, kryoId: Int, objectId: Int, interfaceClass: Class): Iface { // so we can just instantly create the proxy object (or get the cached one) var proxyObject = getProxyObject(objectId) if (proxyObject == null) { @@ -58,6 +60,7 @@ internal class RmiManagerConnections(logger: KLogger, serialization, rmiGlobalSupport.rmiResponseManager, objectId, + kryoId, interfaceClass) saveProxyObject(objectId, proxyObject) } @@ -106,14 +109,7 @@ internal class RmiManagerConnections(logger: KLogger, } else { val rmiId = saveImplObject(implObject) - if (rmiId != RemoteObjectStorage.INVALID_RMI) { - // this means we could register this object. - - // next, scan this object to see if there are any RMI fields - RmiManagerGlobal.scanImplForRmiFields(endPoint.logger, implObject) { - saveImplObject(it) - } - } else { + if (rmiId == RemoteObjectStorage.INVALID_RMI) { val exception = NullPointerException("Trying to create an RMI object with the INVALID_RMI id!!") ListenerManager.cleanStackTrace(exception) endPoint.listenerManager.notifyError(connection, exception) diff --git a/src/dorkbox/network/rmi/RmiManagerGlobal.kt b/src/dorkbox/network/rmi/RmiManagerGlobal.kt index 4b7734e2..05871b06 100644 --- a/src/dorkbox/network/rmi/RmiManagerGlobal.kt +++ b/src/dorkbox/network/rmi/RmiManagerGlobal.kt @@ -32,9 +32,12 @@ import mu.KLogger import java.lang.reflect.Proxy import java.util.* -internal class RmiManagerGlobal(logger: KLogger, - actionDispatch: CoroutineScope, - internal val serialization: Serialization) : RmiObjectCache(logger) { +internal class RmiManagerGlobal( + logger: KLogger, + listenerManager: ListenerManager, + actionDispatch: CoroutineScope, + internal val serialization: Serialization) : RmiObjectCache(listenerManager, logger) { + companion object { /** * Returns a proxy object that implements the specified interface, and the methods invoked on the proxy object will be invoked @@ -55,6 +58,7 @@ internal class RmiManagerGlobal(logger: KLogger, connection: Connection, serialization: Serialization, responseManager: RmiResponseManager, + kryoId: Int, rmiId: Int, interfaceClass: Class<*>): RemoteObject { @@ -62,10 +66,9 @@ internal class RmiManagerGlobal(logger: KLogger, // duplicates are fine, as they represent the same object (as specified by the ID) on the remote side. - val kryoClassId = serialization.getKryoIdForRmi(interfaceClass) - val cachedMethods = serialization.getMethods(kryoClassId) + val cachedMethods = serialization.getMethods(kryoId) - val name = "<${connection.endPoint().type.simpleName}-proxy #$rmiId>" + val name = "<${connection.endPoint.type.simpleName}-proxy #$rmiId>" // the ACTUAL proxy is created in the connection impl. Our proxy handler MUST BE suspending because of: // 1) how we send data on the wire @@ -77,59 +80,6 @@ internal class RmiManagerGlobal(logger: KLogger, return Proxy.newProxyInstance(RmiManagerGlobal::class.java.classLoader, interfaces, proxyObject) as RemoteObject } - - /** - * Scans a class (+hierarchy) for @Rmi annotation and executes the 'registerAction' with it - */ - internal fun scanImplForRmiFields(logger: KLogger, implObject: Any, registerAction: (fieldObject: Any) -> Unit) { - val implementationClass = implObject::class.java - - // the @Rmi annotation allows an RMI object to have fields with objects that are ALSO RMI objects - val classesToCheck = LinkedList, Any?>>() - classesToCheck.add(AbstractMap.SimpleEntry(implementationClass, implObject)) - - - var remoteClassObject: Map.Entry, Any?> - while (!classesToCheck.isEmpty()) { - remoteClassObject = classesToCheck.removeFirst() - - // we have to check the IMPLEMENTATION for any additional fields that will have proxy information. - // we use getDeclaredFields() + walking the object hierarchy, so we get ALL the fields possible (public + private). - - for (field in remoteClassObject.key.declaredFields) { - if (field.getAnnotation(Rmi::class.java) != null) { - val type = field.type - if (!type.isInterface) { - // the type must be an interface, otherwise RMI cannot create a proxy object - logger.error("Error checking RMI fields for: {}.{} -- It is not an interface!", - remoteClassObject.key, - field.name) - continue - } -//TODO FIX THIS. MAYBE USE KOTLIN TO DO THIS? - val prev = field.isAccessible - field.isAccessible = true - - val o: Any - try { - o = field[remoteClassObject.value] - registerAction(o) - classesToCheck.add(AbstractMap.SimpleEntry(type, o)) - } catch (e: IllegalAccessException) { - logger.error("Error checking RMI fields for: {}.{}", remoteClassObject.key, field.name, e) - } finally { - field.isAccessible = prev - } - } - } - - // have to check the object hierarchy as well - val superclass = remoteClassObject.key.superclass - if (superclass != null && superclass != Any::class.java) { - classesToCheck.add(AbstractMap.SimpleEntry(superclass, remoteClassObject.value)) - } - } - } } val rmiResponseManager = RmiResponseManager(logger, actionDispatch) @@ -154,53 +104,15 @@ internal class RmiManagerGlobal(logger: KLogger, return if (isGlobal) getImplObject(rmiId) else connection.rmiConnectionSupport.getImplObject(rmiId) } - /** - * @return the newly registered RMI ID for this object. [RemoteObjectStorage.INVALID_RMI] means it was invalid (an error log will be emitted) - */ - fun saveImplObject(logger: KLogger, `object`: Any): Int { - val rmiId = saveImplObject(`object`) - - if (rmiId != RemoteObjectStorage.INVALID_RMI) { - // this means we could register this object. - - // next, scan this object to see if there are any RMI fields - scanImplForRmiFields(logger, `object`) { - saveImplObject(it) - } - } else { - logger.error("Trying to create an RMI object with the INVALID_RMI id!!") - } - - return rmiId - } - - /** - * @return the true if it was a success saving this object. False means it was invalid (an error log will be emitted) - */ - fun saveImplObject(logger: KLogger, `object`: Any, objectId: Int): Boolean { - val rmiSuccess = saveImplObject(`object`, objectId) - - if (rmiSuccess) { - // this means we could register this object. - - // next, scan this object to see if there are any RMI fields - scanImplForRmiFields(logger, `object`) { - saveImplObject(it) - } - } else { - logger.error("Trying to save an RMI object ${`object`.javaClass} with invalid id $objectId") - } - - return rmiSuccess - } - /** * @return the removed object. If null, an error log will be emitted */ - fun removeImplObject(logger: KLogger, objectId: Int): T? { + suspend fun removeImplObject(endPoint: EndPoint, objectId: Int): T? { val success = removeImplObject(objectId) if (success == null) { - logger.error("Error trying to remove an RMI impl object id $objectId.") + val exception = Exception("Error trying to remove RMI impl object id $objectId.") + ListenerManager.cleanStackTrace(exception) + endPoint.listenerManager.notifyError(exception) } @Suppress("UNCHECKED_CAST") @@ -234,7 +146,8 @@ internal class RmiManagerGlobal(logger: KLogger, // create the client-side proxy object, if possible. This MUST be an object that is saved for the connection var proxyObject = connection.rmiConnectionSupport.getProxyObject(rmiId) if (proxyObject == null) { - proxyObject = createProxyObject(isGlobal, connection, serialization, rmiResponseManager, rmiId, interfaceClass) + val kryoId = endPoint.serialization.getKryoIdForRmiClient(interfaceClass) + proxyObject = createProxyObject(isGlobal, connection, serialization, rmiResponseManager, kryoId, rmiId, interfaceClass) connection.rmiConnectionSupport.saveProxyObject(rmiId, proxyObject) } @@ -255,10 +168,12 @@ internal class RmiManagerGlobal(logger: KLogger, fun getGlobalRemoteObject(connection: Connection, objectId: Int, interfaceClass: Class): Iface { // this immediately returns BECAUSE the object must have already been created on the server (this is why we specify the rmiId)! + val kryoId = serialization.getKryoIdForRmiClient(interfaceClass) + // so we can just instantly create the proxy object (or get the cached one). This MUST be an object that is saved for the connection var proxyObject = connection.rmiConnectionSupport.getProxyObject(objectId) if (proxyObject == null) { - proxyObject = createProxyObject(true, connection, serialization, rmiResponseManager, objectId, interfaceClass) + proxyObject = createProxyObject(true, connection, serialization, rmiResponseManager, kryoId, objectId, interfaceClass) connection.rmiConnectionSupport.saveProxyObject(objectId, proxyObject) } @@ -453,7 +368,7 @@ internal class RmiManagerGlobal(logger: KLogger, /** * called on "server" */ - private suspend fun onGlobalObjectCreateRequest(endPoint: EndPoint, connection: Connection, message: GlobalObjectCreateRequest, logger: KLogger) { + private suspend fun onGlobalObjectCreateRequest(endPoint: EndPoint, connection: CONNECTION, message: GlobalObjectCreateRequest, logger: KLogger) { val interfaceClassId = RmiUtils.unpackLeft(message.packedIds) val callbackId = RmiUtils.unpackRight(message.packedIds) val objectParameters = message.objectParameters @@ -464,12 +379,12 @@ internal class RmiManagerGlobal(logger: KLogger, val response = if (implObject is Exception) { // whoops! - logger.error("Unable to create remote object!", implObject) + endPoint.listenerManager.notifyError(connection, implObject) // we send the message ANYWAYS, because the client needs to know it did NOT succeed! GlobalObjectCreateResponse(RmiUtils.packShorts(callbackId, RemoteObjectStorage.INVALID_RMI)) } else { - val rmiId = saveImplObject(logger, implObject) + val rmiId = saveImplObject(implObject) // we send the message ANYWAYS, because the client needs to know it did NOT succeed! GlobalObjectCreateResponse(RmiUtils.packShorts(callbackId, rmiId)) diff --git a/src/dorkbox/network/rmi/RmiObjectCache.kt b/src/dorkbox/network/rmi/RmiObjectCache.kt index 737fc099..cfebba61 100644 --- a/src/dorkbox/network/rmi/RmiObjectCache.kt +++ b/src/dorkbox/network/rmi/RmiObjectCache.kt @@ -15,6 +15,8 @@ */ package dorkbox.network.rmi +import dorkbox.network.connection.Connection +import dorkbox.network.connection.ListenerManager import mu.KLogger /** @@ -23,16 +25,39 @@ import mu.KLogger * The impl/proxy objects CANNOT be stored in the same data structure, because their IDs are not tied to the same ID source (and there * would be conflicts in the data structure) */ -internal open class RmiObjectCache(logger: KLogger) { +internal open class RmiObjectCache(private val listenerManager: ListenerManager, logger: KLogger) { private val implObjects = RemoteObjectStorage(logger) - fun saveImplObject(rmiObject: Any): Int { - return implObjects.register(rmiObject) + + /** + * @return the newly registered RMI ID for this object. [RemoteObjectStorage.INVALID_RMI] means it was invalid (an error log will be emitted) + */ + suspend fun saveImplObject(rmiObject: Any): Int { + val rmiId = implObjects.register(rmiObject) + + if (rmiId == RemoteObjectStorage.INVALID_RMI) { + val exception = Exception("RMI implementation '${rmiObject::class.java}' could not be saved! No more RMI id's could be generated") + ListenerManager.cleanStackTrace(exception) + listenerManager.notifyError(exception) + } + + return rmiId } - fun saveImplObject(rmiObject: Any, objectId: Int): Boolean { - return implObjects.register(rmiObject, objectId) + /** + * @return the true if it was a success saving this object. False means it was invalid (an error log will be emitted) + */ + suspend fun saveImplObject(rmiObject: Any, objectId: Int): Boolean { + val success = implObjects.register(rmiObject, objectId) + + if (!success) { + val exception = Exception("RMI implementation '${rmiObject::class.java}' could not be saved! No more RMI id's could be generated") + ListenerManager.cleanStackTrace(exception) + listenerManager.notifyError(exception) + } + + return success } fun getImplObject(rmiId: Int): T? {