RMI cache now emits errors using the notifyError callback

This commit is contained in:
nathan 2020-08-28 10:20:15 +02:00
parent ffa286c913
commit fe5aa1d8a6
3 changed files with 59 additions and 123 deletions

View File

@ -24,9 +24,11 @@ import dorkbox.network.serialization.Serialization
import dorkbox.util.collections.LockFreeIntMap
import mu.KLogger
internal class RmiManagerConnections<CONNECTION: Connection>(logger: KLogger,
val rmiGlobalSupport: RmiManagerGlobal<CONNECTION>,
private val serialization: Serialization) : RmiObjectCache(logger) {
internal class RmiManagerConnections<CONNECTION: Connection>(
logger: KLogger,
listenerManager: ListenerManager<CONNECTION>,
val rmiGlobalSupport: RmiManagerGlobal<CONNECTION>,
private val serialization: Serialization) : RmiObjectCache<CONNECTION>(listenerManager, logger) {
// It is critical that all of the RMI proxy objects are unique, and are saved/cached PER CONNECTION. These cannot be shared between connections!
private val proxyObjects = LockFreeIntMap<RemoteObject>()
@ -49,7 +51,7 @@ internal class RmiManagerConnections<CONNECTION: Connection>(logger: KLogger,
/**
* on the connection+client to get a connection-specific remote object (that exists on the server/client)
*/
fun <Iface> getRemoteObject(connection: Connection, objectId: Int, interfaceClass: Class<Iface>): Iface {
fun <Iface> getRemoteObject(connection: Connection, kryoId: Int, objectId: Int, interfaceClass: Class<Iface>): Iface {
// so we can just instantly create the proxy object (or get the cached one)
var proxyObject = getProxyObject(objectId)
if (proxyObject == null) {
@ -58,6 +60,7 @@ internal class RmiManagerConnections<CONNECTION: Connection>(logger: KLogger,
serialization,
rmiGlobalSupport.rmiResponseManager,
objectId,
kryoId,
interfaceClass)
saveProxyObject(objectId, proxyObject)
}
@ -106,14 +109,7 @@ internal class RmiManagerConnections<CONNECTION: Connection>(logger: KLogger,
} else {
val rmiId = saveImplObject(implObject)
if (rmiId != RemoteObjectStorage.INVALID_RMI) {
// this means we could register this object.
// next, scan this object to see if there are any RMI fields
RmiManagerGlobal.scanImplForRmiFields(endPoint.logger, implObject) {
saveImplObject(it)
}
} else {
if (rmiId == RemoteObjectStorage.INVALID_RMI) {
val exception = NullPointerException("Trying to create an RMI object with the INVALID_RMI id!!")
ListenerManager.cleanStackTrace(exception)
endPoint.listenerManager.notifyError(connection, exception)

View File

@ -32,9 +32,12 @@ import mu.KLogger
import java.lang.reflect.Proxy
import java.util.*
internal class RmiManagerGlobal<CONNECTION : Connection>(logger: KLogger,
actionDispatch: CoroutineScope,
internal val serialization: Serialization) : RmiObjectCache(logger) {
internal class RmiManagerGlobal<CONNECTION : Connection>(
logger: KLogger,
listenerManager: ListenerManager<CONNECTION>,
actionDispatch: CoroutineScope,
internal val serialization: Serialization) : RmiObjectCache<CONNECTION>(listenerManager, logger) {
companion object {
/**
* Returns a proxy object that implements the specified interface, and the methods invoked on the proxy object will be invoked
@ -55,6 +58,7 @@ internal class RmiManagerGlobal<CONNECTION : Connection>(logger: KLogger,
connection: Connection,
serialization: Serialization,
responseManager: RmiResponseManager,
kryoId: Int,
rmiId: Int,
interfaceClass: Class<*>): RemoteObject {
@ -62,10 +66,9 @@ internal class RmiManagerGlobal<CONNECTION : Connection>(logger: KLogger,
// duplicates are fine, as they represent the same object (as specified by the ID) on the remote side.
val kryoClassId = serialization.getKryoIdForRmi(interfaceClass)
val cachedMethods = serialization.getMethods(kryoClassId)
val cachedMethods = serialization.getMethods(kryoId)
val name = "<${connection.endPoint().type.simpleName}-proxy #$rmiId>"
val name = "<${connection.endPoint.type.simpleName}-proxy #$rmiId>"
// the ACTUAL proxy is created in the connection impl. Our proxy handler MUST BE suspending because of:
// 1) how we send data on the wire
@ -77,59 +80,6 @@ internal class RmiManagerGlobal<CONNECTION : Connection>(logger: KLogger,
return Proxy.newProxyInstance(RmiManagerGlobal::class.java.classLoader, interfaces, proxyObject) as RemoteObject
}
/**
* Scans a class (+hierarchy) for @Rmi annotation and executes the 'registerAction' with it
*/
internal fun scanImplForRmiFields(logger: KLogger, implObject: Any, registerAction: (fieldObject: Any) -> Unit) {
val implementationClass = implObject::class.java
// the @Rmi annotation allows an RMI object to have fields with objects that are ALSO RMI objects
val classesToCheck = LinkedList<Map.Entry<Class<*>, Any?>>()
classesToCheck.add(AbstractMap.SimpleEntry(implementationClass, implObject))
var remoteClassObject: Map.Entry<Class<*>, Any?>
while (!classesToCheck.isEmpty()) {
remoteClassObject = classesToCheck.removeFirst()
// we have to check the IMPLEMENTATION for any additional fields that will have proxy information.
// we use getDeclaredFields() + walking the object hierarchy, so we get ALL the fields possible (public + private).
for (field in remoteClassObject.key.declaredFields) {
if (field.getAnnotation(Rmi::class.java) != null) {
val type = field.type
if (!type.isInterface) {
// the type must be an interface, otherwise RMI cannot create a proxy object
logger.error("Error checking RMI fields for: {}.{} -- It is not an interface!",
remoteClassObject.key,
field.name)
continue
}
//TODO FIX THIS. MAYBE USE KOTLIN TO DO THIS?
val prev = field.isAccessible
field.isAccessible = true
val o: Any
try {
o = field[remoteClassObject.value]
registerAction(o)
classesToCheck.add(AbstractMap.SimpleEntry(type, o))
} catch (e: IllegalAccessException) {
logger.error("Error checking RMI fields for: {}.{}", remoteClassObject.key, field.name, e)
} finally {
field.isAccessible = prev
}
}
}
// have to check the object hierarchy as well
val superclass = remoteClassObject.key.superclass
if (superclass != null && superclass != Any::class.java) {
classesToCheck.add(AbstractMap.SimpleEntry(superclass, remoteClassObject.value))
}
}
}
}
val rmiResponseManager = RmiResponseManager(logger, actionDispatch)
@ -154,53 +104,15 @@ internal class RmiManagerGlobal<CONNECTION : Connection>(logger: KLogger,
return if (isGlobal) getImplObject(rmiId) else connection.rmiConnectionSupport.getImplObject(rmiId)
}
/**
* @return the newly registered RMI ID for this object. [RemoteObjectStorage.INVALID_RMI] means it was invalid (an error log will be emitted)
*/
fun saveImplObject(logger: KLogger, `object`: Any): Int {
val rmiId = saveImplObject(`object`)
if (rmiId != RemoteObjectStorage.INVALID_RMI) {
// this means we could register this object.
// next, scan this object to see if there are any RMI fields
scanImplForRmiFields(logger, `object`) {
saveImplObject(it)
}
} else {
logger.error("Trying to create an RMI object with the INVALID_RMI id!!")
}
return rmiId
}
/**
* @return the true if it was a success saving this object. False means it was invalid (an error log will be emitted)
*/
fun saveImplObject(logger: KLogger, `object`: Any, objectId: Int): Boolean {
val rmiSuccess = saveImplObject(`object`, objectId)
if (rmiSuccess) {
// this means we could register this object.
// next, scan this object to see if there are any RMI fields
scanImplForRmiFields(logger, `object`) {
saveImplObject(it)
}
} else {
logger.error("Trying to save an RMI object ${`object`.javaClass} with invalid id $objectId")
}
return rmiSuccess
}
/**
* @return the removed object. If null, an error log will be emitted
*/
fun <T> removeImplObject(logger: KLogger, objectId: Int): T? {
suspend fun <T> removeImplObject(endPoint: EndPoint<CONNECTION>, objectId: Int): T? {
val success = removeImplObject<Any>(objectId)
if (success == null) {
logger.error("Error trying to remove an RMI impl object id $objectId.")
val exception = Exception("Error trying to remove RMI impl object id $objectId.")
ListenerManager.cleanStackTrace(exception)
endPoint.listenerManager.notifyError(exception)
}
@Suppress("UNCHECKED_CAST")
@ -234,7 +146,8 @@ internal class RmiManagerGlobal<CONNECTION : Connection>(logger: KLogger,
// create the client-side proxy object, if possible. This MUST be an object that is saved for the connection
var proxyObject = connection.rmiConnectionSupport.getProxyObject(rmiId)
if (proxyObject == null) {
proxyObject = createProxyObject(isGlobal, connection, serialization, rmiResponseManager, rmiId, interfaceClass)
val kryoId = endPoint.serialization.getKryoIdForRmiClient(interfaceClass)
proxyObject = createProxyObject(isGlobal, connection, serialization, rmiResponseManager, kryoId, rmiId, interfaceClass)
connection.rmiConnectionSupport.saveProxyObject(rmiId, proxyObject)
}
@ -255,10 +168,12 @@ internal class RmiManagerGlobal<CONNECTION : Connection>(logger: KLogger,
fun <Iface> getGlobalRemoteObject(connection: Connection, objectId: Int, interfaceClass: Class<Iface>): Iface {
// this immediately returns BECAUSE the object must have already been created on the server (this is why we specify the rmiId)!
val kryoId = serialization.getKryoIdForRmiClient(interfaceClass)
// so we can just instantly create the proxy object (or get the cached one). This MUST be an object that is saved for the connection
var proxyObject = connection.rmiConnectionSupport.getProxyObject(objectId)
if (proxyObject == null) {
proxyObject = createProxyObject(true, connection, serialization, rmiResponseManager, objectId, interfaceClass)
proxyObject = createProxyObject(true, connection, serialization, rmiResponseManager, kryoId, objectId, interfaceClass)
connection.rmiConnectionSupport.saveProxyObject(objectId, proxyObject)
}
@ -453,7 +368,7 @@ internal class RmiManagerGlobal<CONNECTION : Connection>(logger: KLogger,
/**
* called on "server"
*/
private suspend fun onGlobalObjectCreateRequest(endPoint: EndPoint<CONNECTION>, connection: Connection, message: GlobalObjectCreateRequest, logger: KLogger) {
private suspend fun onGlobalObjectCreateRequest(endPoint: EndPoint<CONNECTION>, connection: CONNECTION, message: GlobalObjectCreateRequest, logger: KLogger) {
val interfaceClassId = RmiUtils.unpackLeft(message.packedIds)
val callbackId = RmiUtils.unpackRight(message.packedIds)
val objectParameters = message.objectParameters
@ -464,12 +379,12 @@ internal class RmiManagerGlobal<CONNECTION : Connection>(logger: KLogger,
val response = if (implObject is Exception) {
// whoops!
logger.error("Unable to create remote object!", implObject)
endPoint.listenerManager.notifyError(connection, implObject)
// we send the message ANYWAYS, because the client needs to know it did NOT succeed!
GlobalObjectCreateResponse(RmiUtils.packShorts(callbackId, RemoteObjectStorage.INVALID_RMI))
} else {
val rmiId = saveImplObject(logger, implObject)
val rmiId = saveImplObject(implObject)
// we send the message ANYWAYS, because the client needs to know it did NOT succeed!
GlobalObjectCreateResponse(RmiUtils.packShorts(callbackId, rmiId))

View File

@ -15,6 +15,8 @@
*/
package dorkbox.network.rmi
import dorkbox.network.connection.Connection
import dorkbox.network.connection.ListenerManager
import mu.KLogger
/**
@ -23,16 +25,39 @@ import mu.KLogger
* The impl/proxy objects CANNOT be stored in the same data structure, because their IDs are not tied to the same ID source (and there
* would be conflicts in the data structure)
*/
internal open class RmiObjectCache(logger: KLogger) {
internal open class RmiObjectCache<CONNECTION: Connection>(private val listenerManager: ListenerManager<CONNECTION>, logger: KLogger) {
private val implObjects = RemoteObjectStorage(logger)
fun saveImplObject(rmiObject: Any): Int {
return implObjects.register(rmiObject)
/**
* @return the newly registered RMI ID for this object. [RemoteObjectStorage.INVALID_RMI] means it was invalid (an error log will be emitted)
*/
suspend fun saveImplObject(rmiObject: Any): Int {
val rmiId = implObjects.register(rmiObject)
if (rmiId == RemoteObjectStorage.INVALID_RMI) {
val exception = Exception("RMI implementation '${rmiObject::class.java}' could not be saved! No more RMI id's could be generated")
ListenerManager.cleanStackTrace(exception)
listenerManager.notifyError(exception)
}
return rmiId
}
fun saveImplObject(rmiObject: Any, objectId: Int): Boolean {
return implObjects.register(rmiObject, objectId)
/**
* @return the true if it was a success saving this object. False means it was invalid (an error log will be emitted)
*/
suspend fun saveImplObject(rmiObject: Any, objectId: Int): Boolean {
val success = implObjects.register(rmiObject, objectId)
if (!success) {
val exception = Exception("RMI implementation '${rmiObject::class.java}' could not be saved! No more RMI id's could be generated")
ListenerManager.cleanStackTrace(exception)
listenerManager.notifyError(exception)
}
return success
}
fun <T> getImplObject(rmiId: Int): T? {