Reworked RMI so it will not crash anything on the "remote" side. There
will be errors emitted, and the "local" side will receive a null object + invalid RMI ID.
This commit is contained in:
parent
10d8cc6061
commit
0404ff408d
|
@ -41,8 +41,8 @@ import dorkbox.network.connection.wrapper.ChannelNull;
|
|||
import dorkbox.network.connection.wrapper.ChannelWrapper;
|
||||
import dorkbox.network.rmi.*;
|
||||
import dorkbox.network.serialization.CryptoSerializationManager;
|
||||
import dorkbox.util.collections.IntMap;
|
||||
import dorkbox.util.collections.LockFreeHashMap;
|
||||
import dorkbox.util.collections.LockFreeIntMap;
|
||||
import dorkbox.util.generics.ClassHelper;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelHandler.Sharable;
|
||||
|
@ -66,9 +66,6 @@ import io.netty.util.concurrent.Promise;
|
|||
@Sharable
|
||||
public
|
||||
class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConnection, Connection, Listeners, ConnectionBridge {
|
||||
// default false
|
||||
public static boolean ENABLE_PROXY_OBJECTS = true;
|
||||
|
||||
public static
|
||||
boolean isTcp(Class<? extends Channel> channelClass) {
|
||||
return channelClass == NioSocketChannel.class || channelClass == EpollSocketChannel.class;
|
||||
|
@ -128,8 +125,8 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn
|
|||
private final Map<Integer, RemoteObject> proxyIdCache;
|
||||
private final List<Listener.OnMessageReceived<Connection, InvokeMethodResult>> proxyListeners;
|
||||
|
||||
private final IntMap<RemoteObjectCallback> rmiRegistrationCallbacks;
|
||||
private int rmiCallbackId = 0; // protected by synchronized (rmiRegistrationCallbacks)
|
||||
private final LockFreeIntMap<RemoteObjectCallback> rmiRegistrationCallbacks;
|
||||
private volatile int rmiCallbackId = 0;
|
||||
|
||||
/**
|
||||
* All of the parameters can be null, when metaChannel wants to get the base class type
|
||||
|
@ -142,9 +139,11 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn
|
|||
|
||||
if (endPoint != null && endPoint.globalRmiBridge != null) {
|
||||
// rmi is enabled.
|
||||
// because this is PER CONNECTION, there is no need for synchronize(), since there will not be any issues with concurrent
|
||||
// access, but there WILL be issues with thread visibility because a different worker thread can be called for different connections
|
||||
proxyIdCache = new LockFreeHashMap<Integer, RemoteObject>();
|
||||
proxyListeners = new CopyOnWriteArrayList<Listener.OnMessageReceived<Connection, InvokeMethodResult>>();
|
||||
rmiRegistrationCallbacks = new IntMap<RemoteObjectCallback>();
|
||||
rmiRegistrationCallbacks = new LockFreeIntMap<RemoteObjectCallback>();
|
||||
} else {
|
||||
proxyIdCache = null;
|
||||
proxyListeners = null;
|
||||
|
@ -699,18 +698,14 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn
|
|||
|
||||
// proxy listeners are cleared in the removeAll() call
|
||||
if (proxyIdCache != null) {
|
||||
synchronized (proxyIdCache) {
|
||||
proxyIdCache.clear();
|
||||
}
|
||||
}
|
||||
|
||||
if (rmiRegistrationCallbacks != null) {
|
||||
synchronized (rmiRegistrationCallbacks) {
|
||||
rmiRegistrationCallbacks.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
|
@ -971,13 +966,12 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn
|
|||
throw new IllegalArgumentException("Cannot create a proxy for RMI access. It must be an interface.");
|
||||
}
|
||||
|
||||
RmiRegistration message;
|
||||
|
||||
synchronized (rmiRegistrationCallbacks) {
|
||||
// because this is PER CONNECTION, there is no need for synchronize(), since there will not be any issues with concurrent
|
||||
// access, but there WILL be issues with thread visibility because a different worker thread can be called for different connections
|
||||
//noinspection NonAtomicOperationOnVolatileField
|
||||
int nextRmiCallbackId = rmiCallbackId++;
|
||||
rmiRegistrationCallbacks.put(nextRmiCallbackId, callback);
|
||||
message = new RmiRegistration(interfaceClass, RmiBridge.INVALID_RMI, nextRmiCallbackId);
|
||||
}
|
||||
RmiRegistration message = new RmiRegistration(interfaceClass, RmiBridge.INVALID_RMI, nextRmiCallbackId);
|
||||
|
||||
// We use a callback to notify us when the object is ready. We can't "create this on the fly" because we
|
||||
// have to wait for the object to be created + ID to be assigned on the remote system BEFORE we can create the proxy instance here.
|
||||
|
@ -989,15 +983,21 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn
|
|||
@Override
|
||||
public final
|
||||
<Iface> void getRemoteObject(final int objectId, final RemoteObjectCallback<Iface> callback) {
|
||||
RmiRegistration message;
|
||||
if (objectId < 0) {
|
||||
throw new IllegalStateException("Object ID cannot be < 0");
|
||||
}
|
||||
if (objectId >= RmiBridge.INVALID_RMI) {
|
||||
throw new IllegalStateException("Object ID cannot be >= " + RmiBridge.INVALID_RMI);
|
||||
}
|
||||
|
||||
Class<?> iFaceClass = ClassHelper.getGenericParameterAsClassForSuperClass(RemoteObjectCallback.class, callback.getClass(), 0);
|
||||
|
||||
synchronized (rmiRegistrationCallbacks) {
|
||||
// because this is PER CONNECTION, there is no need for synchronize(), since there will not be any issues with concurrent
|
||||
// access, but there WILL be issues with thread visibility because a different worker thread can be called for different connections
|
||||
//noinspection NonAtomicOperationOnVolatileField
|
||||
int nextRmiCallbackId = rmiCallbackId++;
|
||||
rmiRegistrationCallbacks.put(nextRmiCallbackId, callback);
|
||||
message = new RmiRegistration(iFaceClass, objectId, nextRmiCallbackId);
|
||||
}
|
||||
RmiRegistration message = new RmiRegistration(iFaceClass, objectId, nextRmiCallbackId);
|
||||
|
||||
// We use a callback to notify us when the object is ready. We can't "create this on the fly" because we
|
||||
// have to wait for the object to be created + ID to be assigned on the remote system BEFORE we can create the proxy instance here.
|
||||
|
@ -1072,7 +1072,12 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn
|
|||
|
||||
rmiId = rmiBridge.register(object);
|
||||
|
||||
|
||||
if (rmiId == RmiBridge.INVALID_RMI) {
|
||||
// this means that there are too many RMI ids (either global or connection specific!)
|
||||
object = null;
|
||||
}
|
||||
else {
|
||||
// if we are invalid, skip going over fields that might also be RMI objects, BECAUSE our object will be NULL!
|
||||
|
||||
// the @Rmi annotation allows an RMI object to have fields with objects that are ALSO RMI objects
|
||||
LinkedList<Map.Entry<Class<?>, Object>> classesToCheck = new LinkedList<Map.Entry<Class<?>, Object>>();
|
||||
|
@ -1123,6 +1128,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn
|
|||
classesToCheck.add(new AbstractMap.SimpleEntry<Class<?>, Object>(superclass, remoteClassObject.getValue()));
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("Error registering RMI class " + implementationClass, e);
|
||||
} finally {
|
||||
|
@ -1144,10 +1150,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn
|
|||
|
||||
public final
|
||||
void runRmiCallback(final Class<?> interfaceClass, final int callbackId, final Object remoteObject) {
|
||||
RemoteObjectCallback callback;
|
||||
synchronized (rmiRegistrationCallbacks) {
|
||||
callback = rmiRegistrationCallbacks.remove(callbackId);
|
||||
}
|
||||
RemoteObjectCallback callback = rmiRegistrationCallbacks.remove(callbackId);
|
||||
|
||||
try {
|
||||
//noinspection unchecked
|
||||
|
@ -1173,11 +1176,12 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn
|
|||
}
|
||||
|
||||
int objectId = globalRmiBridge.getRegisteredId(object);
|
||||
if (objectId == RmiBridge.INVALID_RMI) {
|
||||
return rmiBridge.getRegisteredId(object);
|
||||
} else {
|
||||
if (objectId != RmiBridge.INVALID_RMI) {
|
||||
return objectId;
|
||||
}
|
||||
else {
|
||||
return rmiBridge.getRegisteredId(object);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1212,9 +1216,9 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn
|
|||
throw new IllegalArgumentException("iface must be an interface.");
|
||||
}
|
||||
|
||||
synchronized (proxyIdCache) {
|
||||
// we want to have a connection specific cache of IDs, using weak references.
|
||||
// because this is PER CONNECTION, this is safe.
|
||||
// we want to have a connection specific cache of IDs
|
||||
// because this is PER CONNECTION, there is no need for synchronize(), since there will not be any issues with concurrent
|
||||
// access, but there WILL be issues with thread visibility because a different worker thread can be called for different connections
|
||||
RemoteObject remoteObject = proxyIdCache.get(objectID);
|
||||
|
||||
if (remoteObject == null) {
|
||||
|
@ -1236,7 +1240,6 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn
|
|||
|
||||
return remoteObject;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This is used by RMI for the REMOTE side, to get the implementation
|
||||
|
@ -1247,9 +1250,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn
|
|||
if (RmiBridge.isGlobal(objectID)) {
|
||||
RmiBridge globalRmiBridge = endPoint.globalRmiBridge;
|
||||
|
||||
if (globalRmiBridge == null) {
|
||||
throw new NullPointerException("Unable to call 'getRegisteredId' when the gloablRmiBridge is null!");
|
||||
}
|
||||
assert globalRmiBridge != null;
|
||||
|
||||
return globalRmiBridge.getRegisteredObject(objectID);
|
||||
} else {
|
||||
|
|
|
@ -221,8 +221,8 @@ class EndPoint extends Shutdownable {
|
|||
|
||||
if (rmiEnabled) {
|
||||
rmiHandler = null;
|
||||
localRmiHandler = new RmiObjectLocalHandler();
|
||||
networkRmiHandler = new RmiObjectNetworkHandler();
|
||||
localRmiHandler = new RmiObjectLocalHandler(logger);
|
||||
networkRmiHandler = new RmiObjectNetworkHandler(logger);
|
||||
globalRmiBridge = new RmiBridge(logger, config.rmiExecutor, true);
|
||||
}
|
||||
else {
|
||||
|
|
|
@ -21,7 +21,7 @@ package dorkbox.network.rmi;
|
|||
public
|
||||
interface RemoteObjectCallback<Iface> {
|
||||
/**
|
||||
* @param remoteObject the remote object (as a proxy object) or null if there was an error
|
||||
* @param remoteObject the remote object (as a proxy object) or null if there was an error creating the RMI object
|
||||
*/
|
||||
void created(Iface remoteObject);
|
||||
}
|
||||
|
|
|
@ -59,10 +59,6 @@ class RemoteObjectSerializer<T> extends Serializer<T> {
|
|||
void write(Kryo kryo, Output output, T object) {
|
||||
KryoExtra kryoExtra = (KryoExtra) kryo;
|
||||
int id = kryoExtra.connection.getRegisteredId(object);
|
||||
if (id == RmiBridge.INVALID_RMI) {
|
||||
throw new IllegalStateException("Object not found in RMI objectSpace: " + object);
|
||||
}
|
||||
|
||||
output.writeInt(id, true);
|
||||
}
|
||||
|
||||
|
|
|
@ -38,20 +38,16 @@ import java.io.IOException;
|
|||
import java.util.Arrays;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
|
||||
import com.esotericsoftware.kryo.util.IntMap;
|
||||
|
||||
import dorkbox.network.connection.Connection;
|
||||
import dorkbox.network.connection.ConnectionImpl;
|
||||
import dorkbox.network.connection.EndPoint;
|
||||
import dorkbox.network.connection.Listener;
|
||||
import dorkbox.network.serialization.RmiSerializationManager;
|
||||
import dorkbox.util.collections.ObjectIntMap;
|
||||
import dorkbox.util.Property;
|
||||
import dorkbox.util.collections.LockFreeIntBiMap;
|
||||
|
||||
/**
|
||||
* Allows methods on objects to be invoked remotely over TCP, UDP, or LOCAL. Local connections ignore TCP/UDP requests, and perform
|
||||
|
@ -84,34 +80,47 @@ import dorkbox.util.collections.ObjectIntMap;
|
|||
*/
|
||||
public final
|
||||
class RmiBridge {
|
||||
public static final int INVALID_RMI = 0;
|
||||
/**
|
||||
* Permits local (in-jvm) connections to bypass creating RMI proxy objects (which is CPU + Memory intensive) in favor of just using the
|
||||
* object directly. While doing so is considerably faster, it comes at the expense that RMI objects lose the {@link RemoteObject}
|
||||
* functions.
|
||||
* <p>
|
||||
* This will also break logic in a way that "network connection" RMI logic *CAN BE* incompatible.
|
||||
* <p>
|
||||
* Default is true
|
||||
*/
|
||||
@Property
|
||||
public static boolean ENABLE_PROXY_OBJECTS = true;
|
||||
|
||||
public static final int INVALID_RMI = Integer.MAX_VALUE;
|
||||
|
||||
static final int returnValueMask = 1 << 7;
|
||||
static final int returnExceptionMask = 1 << 6;
|
||||
static final int responseIdMask = 0xFF & ~returnValueMask & ~returnExceptionMask;
|
||||
|
||||
// global RMI objects -> ODD in range 1-16380 (max 2 bytes) throws error on outside of range
|
||||
// connection local RMI -> EVEN in range 1-16380 (max 2 bytes) throws error on outside of range
|
||||
private static final int MAX_RMI_VALUE = 16380;
|
||||
private static final int INVALID_MAP_ID = -1;
|
||||
|
||||
/**
|
||||
* @return true if the objectId is a "global" id (it's odd) otherwise, false (it's connection local)
|
||||
* @return true if the objectId is global for all connections (even). false if it's connection-only (odd)
|
||||
*/
|
||||
public static
|
||||
boolean isGlobal(final int objectId) {
|
||||
return (objectId & 1) != 0;
|
||||
// global RMI objects -> EVEN in range 0 - (MAX_VALUE-1)
|
||||
// connection local RMI -> ODD in range 1 - (MAX_VALUE-1)
|
||||
return (objectId & 1) == 0;
|
||||
}
|
||||
|
||||
// the name of who created this RmiBridge
|
||||
private final org.slf4j.Logger logger;
|
||||
|
||||
private final Executor executor;
|
||||
|
||||
|
||||
// we start at 1, because 0 (INVALID_RMI) means we access connection only objects
|
||||
private final AtomicInteger rmiObjectIdCounter;
|
||||
|
||||
// can be accessed by DIFFERENT threads.
|
||||
private final ReentrantReadWriteLock objectLock = new ReentrantReadWriteLock();
|
||||
private final IntMap<Object> idToObject = new IntMap<Object>();
|
||||
private final ObjectIntMap<Object> objectToID = new ObjectIntMap<Object>();
|
||||
private final Executor executor;
|
||||
// this is the ID -> Object RMI map. The RMI ID is used (not the kryo ID)
|
||||
private final LockFreeIntBiMap<Object> objectMap = new LockFreeIntBiMap<Object>(INVALID_MAP_ID);
|
||||
|
||||
private final Listener.OnMessageReceived<ConnectionImpl, InvokeMethod> invokeListener = new Listener.OnMessageReceived<ConnectionImpl, InvokeMethod>() {
|
||||
@SuppressWarnings("AutoBoxing")
|
||||
|
@ -175,10 +184,10 @@ class RmiBridge {
|
|||
this.executor = executor;
|
||||
|
||||
if (isGlobal) {
|
||||
rmiObjectIdCounter = new AtomicInteger(1);
|
||||
rmiObjectIdCounter = new AtomicInteger(0);
|
||||
}
|
||||
else {
|
||||
rmiObjectIdCounter = new AtomicInteger(2);
|
||||
rmiObjectIdCounter = new AtomicInteger(1);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -291,46 +300,65 @@ class RmiBridge {
|
|||
// logger.error("{} sent data: {} with id ({})", connection, result, invokeMethod.responseID);
|
||||
}
|
||||
|
||||
public
|
||||
private
|
||||
int nextObjectId() {
|
||||
// always increment by 2
|
||||
// global RMI objects -> ODD in range 1-16380 (max 2 bytes) throws error on outside of range
|
||||
// connection local RMI -> EVEN in range 1-16380 (max 2 bytes) throws error on outside of range
|
||||
int value = rmiObjectIdCounter.getAndAdd(2);
|
||||
if (value > MAX_RMI_VALUE) {
|
||||
rmiObjectIdCounter.set(MAX_RMI_VALUE); // prevent wrapping by spammy callers
|
||||
logger.error("RMI next value has exceeded maximum limits in RmiBridge!");
|
||||
if (value >= INVALID_RMI) {
|
||||
rmiObjectIdCounter.set(INVALID_RMI); // prevent wrapping by spammy callers
|
||||
logger.error("next RMI value '{}' has exceeded maximum limit '{}' in RmiBridge! Not creating RMI object.", value, INVALID_RMI);
|
||||
return INVALID_RMI;
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
/**
|
||||
* Automatically registers an object with the next available ID to allow the remote end of the RmiBridge connections to access it using the returned ID.
|
||||
*
|
||||
* @return the RMI object ID, if the registration failed (null object or TOO MANY objects), it will be {@link RmiBridge#INVALID_RMI} (Integer.MAX_VALUE).
|
||||
*/
|
||||
public
|
||||
int register(Object object) {
|
||||
if (object == null) {
|
||||
return INVALID_RMI;
|
||||
}
|
||||
|
||||
// this will return INVALID_RMI if there are too many in the ObjectSpace
|
||||
int nextObjectId = nextObjectId();
|
||||
if (nextObjectId != INVALID_RMI) {
|
||||
// specifically avoid calling register(int, Object) method to skip non-necessary checks + exceptions
|
||||
objectMap.put(nextObjectId, object);
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Object <proxy #{}> registered with ObjectSpace with .toString() = '{}'", nextObjectId, object);
|
||||
}
|
||||
}
|
||||
return nextObjectId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers an object to allow the remote end of the RmiBridge connections to access it using the specified ID.
|
||||
*
|
||||
* @param objectID
|
||||
* Must not be Integer.MAX_VALUE.
|
||||
* @param objectID Must not be <0 or {@link RmiBridge#INVALID_RMI} (Integer.MAX_VALUE).
|
||||
*/
|
||||
@SuppressWarnings("AutoBoxing")
|
||||
public
|
||||
void register(int objectID, Object object) {
|
||||
if (objectID == Integer.MAX_VALUE) {
|
||||
throw new IllegalArgumentException("objectID cannot be Integer.MAX_VALUE.");
|
||||
if (objectID < 0) {
|
||||
throw new IllegalArgumentException("objectID cannot be " + INVALID_RMI);
|
||||
}
|
||||
if (objectID >= INVALID_RMI) {
|
||||
throw new IllegalArgumentException("objectID cannot be " + INVALID_RMI);
|
||||
}
|
||||
if (object == null) {
|
||||
throw new IllegalArgumentException("object cannot be null.");
|
||||
}
|
||||
|
||||
WriteLock writeLock = RmiBridge.this.objectLock.writeLock();
|
||||
writeLock.lock();
|
||||
objectMap.put(objectID, object);
|
||||
|
||||
this.idToObject.put(objectID, object);
|
||||
this.objectToID.put(object, objectID);
|
||||
|
||||
writeLock.unlock();
|
||||
|
||||
Logger logger2 = this.logger;
|
||||
if (logger2.isTraceEnabled()) {
|
||||
logger2.trace("Object <proxy #{}> registered with ObjectSpace with .toString() = '{}'", objectID, object);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Object <proxy #{}> registered with ObjectSpace with .toString() = '{}'", objectID, object);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -340,19 +368,10 @@ class RmiBridge {
|
|||
@SuppressWarnings("AutoBoxing")
|
||||
public
|
||||
void remove(int objectID) {
|
||||
WriteLock writeLock = RmiBridge.this.objectLock.writeLock();
|
||||
writeLock.lock();
|
||||
Object object = objectMap.remove(objectID);
|
||||
|
||||
Object object = this.idToObject.remove(objectID);
|
||||
if (object != null) {
|
||||
this.objectToID.remove(object, 0);
|
||||
}
|
||||
|
||||
writeLock.unlock();
|
||||
|
||||
Logger logger2 = this.logger;
|
||||
if (logger2.isTraceEnabled()) {
|
||||
logger2.trace("Object <proxy #{}> removed from ObjectSpace with .toString() = '{}'", objectID, object);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Object <proxy #{}> removed from ObjectSpace with .toString() = '{}'", objectID, object);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -362,23 +381,14 @@ class RmiBridge {
|
|||
@SuppressWarnings("AutoBoxing")
|
||||
public
|
||||
void remove(Object object) {
|
||||
WriteLock writeLock = RmiBridge.this.objectLock.writeLock();
|
||||
writeLock.lock();
|
||||
int objectID = objectMap.inverse()
|
||||
.remove(object);
|
||||
|
||||
if (!this.idToObject.containsValue(object, true)) {
|
||||
writeLock.unlock();
|
||||
return;
|
||||
if (objectID == INVALID_MAP_ID) {
|
||||
logger.error("Object {} could not be found in the ObjectSpace.", object);
|
||||
}
|
||||
|
||||
int objectID = this.idToObject.findKey(object, true, -1);
|
||||
this.idToObject.remove(objectID);
|
||||
this.objectToID.remove(object, 0);
|
||||
|
||||
writeLock.unlock();
|
||||
|
||||
Logger logger2 = this.logger;
|
||||
if (logger2.isTraceEnabled()) {
|
||||
logger2.trace("Object {} removed from ObjectSpace: {}", objectID, object);
|
||||
else if (logger.isTraceEnabled()) {
|
||||
logger.trace("Object {} (ID: {}) removed from ObjectSpace.", object, objectID);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -387,28 +397,26 @@ class RmiBridge {
|
|||
*/
|
||||
public
|
||||
Object getRegisteredObject(final int objectID) {
|
||||
ReadLock readLock = this.objectLock.readLock();
|
||||
readLock.lock();
|
||||
if (objectID < 0 || objectID >= INVALID_RMI) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// Find an object with the objectID.
|
||||
Object object = this.idToObject.get(objectID);
|
||||
readLock.unlock();
|
||||
|
||||
return object;
|
||||
return objectMap.get(objectID);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the ID registered for the specified object, or Integer.MAX_VALUE if not found.
|
||||
* Returns the ID registered for the specified object, or INVALID_RMI if not found.
|
||||
*/
|
||||
public
|
||||
<T> int getRegisteredId(final T object) {
|
||||
// Find an ID with the object.
|
||||
ReadLock readLock = this.objectLock.readLock();
|
||||
int i = objectMap.inverse()
|
||||
.get(object);
|
||||
if (i == INVALID_MAP_ID) {
|
||||
return INVALID_RMI;
|
||||
}
|
||||
|
||||
readLock.lock();
|
||||
int id = this.objectToID.get(object, Integer.MAX_VALUE);
|
||||
readLock.unlock();
|
||||
|
||||
return id;
|
||||
return i;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,6 +20,8 @@ import java.lang.reflect.Proxy;
|
|||
import java.util.ArrayList;
|
||||
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
|
||||
import com.esotericsoftware.kryo.KryoException;
|
||||
import com.esotericsoftware.kryo.Serializer;
|
||||
import com.esotericsoftware.kryo.util.IdentityMap;
|
||||
|
@ -39,14 +41,9 @@ import dorkbox.network.serialization.CryptoSerializationManager;
|
|||
*/
|
||||
public
|
||||
class RmiObjectLocalHandler extends RmiObjectHandler {
|
||||
private static final boolean ENABLE_PROXY_OBJECTS = ConnectionImpl.ENABLE_PROXY_OBJECTS;
|
||||
private static final boolean ENABLE_PROXY_OBJECTS = RmiBridge.ENABLE_PROXY_OBJECTS;
|
||||
private static final Field[] NO_REMOTE_FIELDS = new Field[0];
|
||||
|
||||
// private static final AtomicReferenceFieldUpdater<RmiObjectLocalHandler, IdentityMap> rmiFieldsREF = AtomicReferenceFieldUpdater.newUpdater(
|
||||
// RmiObjectLocalHandler.class,
|
||||
// IdentityMap.class,
|
||||
// "fieldCache");
|
||||
|
||||
private static final AtomicReferenceFieldUpdater<RmiObjectLocalHandler, IdentityMap> implToProxyREF = AtomicReferenceFieldUpdater.newUpdater(
|
||||
RmiObjectLocalHandler.class,
|
||||
IdentityMap.class,
|
||||
|
@ -57,13 +54,14 @@ class RmiObjectLocalHandler extends RmiObjectHandler {
|
|||
IdentityMap.class,
|
||||
"objectHasRemoteObjects");
|
||||
|
||||
// private volatile IdentityMap<Class<?>, Field[]> fieldCache = new IdentityMap<Class<?>, Field[]>();
|
||||
private volatile IdentityMap<Object, Object> implToProxy = new IdentityMap<Object, Object>();
|
||||
private volatile IdentityMap<Object, Field[]> objectHasRemoteObjects = new IdentityMap<Object, Field[]>();
|
||||
private final Logger logger;
|
||||
|
||||
|
||||
public
|
||||
RmiObjectLocalHandler() {
|
||||
RmiObjectLocalHandler(final Logger logger) {
|
||||
this.logger = logger;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -176,18 +174,28 @@ class RmiObjectLocalHandler extends RmiObjectHandler {
|
|||
// if we are the response, we want to create a proxy object instead of just passing the ACTUAL object.
|
||||
// On the "network" (RemoteObjectSerializer.java) stack, this process is automatic -- and here we have to mimic this behavior.
|
||||
|
||||
// has to be 'registration.remoteObject' because we use it later on
|
||||
if (ENABLE_PROXY_OBJECTS && registration.remoteObject != null) {
|
||||
|
||||
// if PROXY objects are enabled, we replace the IMPLEMENTATION with a proxy object, so that the network logic == local logic.
|
||||
if (ENABLE_PROXY_OBJECTS) {
|
||||
RemoteObject proxyObject = null;
|
||||
|
||||
if (registration.rmiId == RmiBridge.INVALID_RMI) {
|
||||
logger.error("RMI ID '{}' is invalid. Unable to create RMI object.", registration.rmiId);
|
||||
}
|
||||
else {
|
||||
// override the implementation object with the proxy. This is required because RMI must be the same between "network" and "local"
|
||||
// connections -- even if this "slows down" the speed/performance of what "local" connections offer.
|
||||
RemoteObject proxyObject = connection.getProxyObject(registration.rmiId, interfaceClass);
|
||||
proxyObject = connection.getProxyObject(registration.rmiId, interfaceClass);
|
||||
|
||||
if (proxyObject != null) {
|
||||
// have to save A and B so we can correctly switch as necessary
|
||||
//noinspection SynchronizeOnNonFinalField
|
||||
synchronized (implToProxy) {
|
||||
// i know what I'm doing. This must be synchronized.
|
||||
implToProxy.put(registration.remoteObject, proxyObject);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
connection.runRmiCallback(interfaceClass, callbackId, proxyObject);
|
||||
}
|
||||
|
@ -197,6 +205,7 @@ class RmiObjectLocalHandler extends RmiObjectHandler {
|
|||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public
|
||||
Object normalMessages(final ConnectionImpl connection, final Object message) {
|
||||
|
@ -208,7 +217,6 @@ class RmiObjectLocalHandler extends RmiObjectHandler {
|
|||
|
||||
// maybe this object is supposed to switch to a proxy object?? (note: we cannot send proxy objects over local/network connections)
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
IdentityMap<Object, Object> implToProxy = implToProxyREF.get(this);
|
||||
IdentityMap<Object, Field[]> objectHasRemoteObjects = remoteObjectREF.get(this);
|
||||
|
||||
|
@ -324,78 +332,4 @@ class RmiObjectLocalHandler extends RmiObjectHandler {
|
|||
|
||||
return message;
|
||||
}
|
||||
|
||||
|
||||
// private
|
||||
// LocalRmiClassEncoder replaceFieldObjects(final ConnectionImpl connection, final Object object, final Class<?> implClass) {
|
||||
// Field[] rmiFields = fieldCache.get(implClass);
|
||||
// int length = rmiFields.length;
|
||||
//
|
||||
// Object rmiObject = null;
|
||||
// int[] rmiFieldIds = new int[length];
|
||||
// for (int i = 0; i < length; i++) {
|
||||
// Field field = rmiFields[i];
|
||||
//
|
||||
// // if it's an RMI object we want to write out a proxy object in the field instead of the actual object
|
||||
// try {
|
||||
// rmiObject = field.get(object);
|
||||
// } catch (IllegalAccessException e) {
|
||||
// e.printStackTrace();
|
||||
// }
|
||||
//
|
||||
// if (rmiObject == null) {
|
||||
// rmiFieldIds[i] = 0; // 0 means it was null
|
||||
// }
|
||||
//
|
||||
// final Map<Object, Integer> localWeakCache = objectThreadLocals.get();
|
||||
//
|
||||
// Integer id = localWeakCache.get(rmiObject);
|
||||
// if (id == null) {
|
||||
// // duplicates are fine, as they represent the same object (as specified by the ID) on the remote side.
|
||||
// int registeredId = connection.getRegisteredId(rmiObject);
|
||||
// rmiFieldIds[i] = registeredId;
|
||||
// localWeakCache.put(rmiObject, registeredId);
|
||||
// }
|
||||
// else {
|
||||
// rmiFieldIds[i] = id;
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// LocalRmiClassEncoder localRmiClassEncoder = new LocalRmiClassEncoder();
|
||||
// localRmiClassEncoder.rmiObject = object;
|
||||
// localRmiClassEncoder.rmiFieldIds = rmiFieldIds;
|
||||
//
|
||||
// return localRmiClassEncoder;
|
||||
// }
|
||||
|
||||
// Field[] getFields(final Class<?> clazz) {
|
||||
// // duplicates are OK, because they will contain the same information, so we DO NOT care about single writers
|
||||
//
|
||||
// //noinspection unchecked
|
||||
// final IdentityMap<Class<?>, Field[]> identityMap = rmiFieldsREF.get(this);
|
||||
//
|
||||
//
|
||||
// Field[] rmiFields = identityMap.get(clazz);
|
||||
// if (rmiFields != null) {
|
||||
// return rmiFields;
|
||||
// }
|
||||
//
|
||||
// final ArrayList<Field> fields = new ArrayList<Field>();
|
||||
//
|
||||
// for (Field field : clazz.getDeclaredFields()) {
|
||||
// if (field.getAnnotation(Rmi.class) != null) {
|
||||
// fields.add(field);
|
||||
// }
|
||||
// }
|
||||
//
|
||||
//
|
||||
// rmiFields = new Field[fields.size()];
|
||||
// fields.toArray(rmiFields);
|
||||
//
|
||||
// // save in cache
|
||||
// fieldCache.put(clazz, rmiFields);
|
||||
// rmiFieldsREF.lazySet(this, fieldCache);
|
||||
//
|
||||
// return rmiFields;
|
||||
// }
|
||||
}
|
||||
|
|
|
@ -15,14 +15,19 @@
|
|||
*/
|
||||
package dorkbox.network.rmi;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
|
||||
import dorkbox.network.connection.ConnectionImpl;
|
||||
import dorkbox.network.connection.Listener;
|
||||
|
||||
public
|
||||
class RmiObjectNetworkHandler extends RmiObjectHandler {
|
||||
|
||||
private final Logger logger;
|
||||
|
||||
public
|
||||
RmiObjectNetworkHandler() {
|
||||
RmiObjectNetworkHandler(final Logger logger) {
|
||||
this.logger = logger;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -66,6 +71,10 @@ class RmiObjectNetworkHandler extends RmiObjectHandler {
|
|||
}
|
||||
}
|
||||
else {
|
||||
if (registration.rmiId == RmiBridge.INVALID_RMI) {
|
||||
logger.error("RMI ID '{}' is invalid. Unable to create RMI object.", registration.rmiId);
|
||||
}
|
||||
|
||||
// this is the response.
|
||||
// THIS IS ON THE LOCAL CONNECTION SIDE, which is the side that called 'getRemoteObject()' This can be Server or Client.
|
||||
connection.runRmiCallback(interfaceClass, callbackId, registration.remoteObject);
|
||||
|
|
|
@ -361,7 +361,7 @@ class RmiGlobalTest extends BaseTest {
|
|||
class TestCowImpl extends ConnectionAware implements TestCow {
|
||||
public long value = System.currentTimeMillis();
|
||||
public int moos;
|
||||
private final int id = 1;
|
||||
private final int id = 0; // the RMI id should be == to this for each direction.
|
||||
|
||||
public
|
||||
TestCowImpl() {
|
||||
|
|
Loading…
Reference in New Issue
Block a user