ConnectionRmiSupport refactor. Moved internal methods to Connection_

This commit is contained in:
nathan 2019-01-25 16:41:13 +01:00
parent db69e2f9c8
commit 12a9293526
15 changed files with 438 additions and 406 deletions

View File

@ -361,13 +361,6 @@ class Client<C extends Connection> extends EndPointClient implements Connection
return connection.isLoopback();
}
@SuppressWarnings("rawtypes")
@Override
public
EndPoint getEndPoint() {
return this;
}
/**
* @return the connection (TCP or LOCAL) id of this connection.
*/

View File

@ -41,12 +41,6 @@ interface Connection {
*/
boolean isLoopback();
/**
* @return the endpoint associated with this connection
*/
@SuppressWarnings("rawtypes")
EndPoint getEndPoint();
/**
* @return the connection (TCP or LOCAL) id of this connection.
*/

View File

@ -34,8 +34,9 @@ import dorkbox.network.connection.ping.PingTuple;
import dorkbox.network.connection.wrapper.ChannelNetworkWrapper;
import dorkbox.network.connection.wrapper.ChannelNull;
import dorkbox.network.connection.wrapper.ChannelWrapper;
import dorkbox.network.rmi.ConnectionNoOpSupport;
import dorkbox.network.rmi.ConnectionRmiImplSupport;
import dorkbox.network.rmi.ConnectionRmiSupport;
import dorkbox.network.rmi.ConnectionSupport;
import dorkbox.network.rmi.RemoteObjectCallback;
import dorkbox.network.rmi.RmiObjectHandler;
import io.netty.bootstrap.DatagramSessionChannel;
@ -128,7 +129,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection_
// RMI support for this connection
final ConnectionSupport rmiSupport;
final ConnectionRmiSupport rmiSupport;
/**
* All of the parameters can be null, when metaChannel wants to get the base class type
@ -145,7 +146,6 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection_
boolean isNetworkChannel = this.channelWrapper instanceof ChannelNetworkWrapper;
if (endPoint.rmiEnabled) {
RmiObjectHandler handler;
if (isNetworkChannel) {
handler = endPoint.rmiNetworkHandler;
@ -156,13 +156,12 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection_
// because this is PER CONNECTION, there is no need for synchronize(), since there will not be any issues with concurrent access, but
// there WILL be issues with thread visibility because a different worker thread can be called for different connections
this.rmiSupport = new ConnectionRmiSupport(this, endPoint.rmiGlobalBridge, handler);
this.rmiSupport = new ConnectionRmiImplSupport(this, endPoint.rmiGlobalBridge, handler);
} else {
this.rmiSupport = new ConnectionSupport();
this.rmiSupport = new ConnectionNoOpSupport();
}
if (isNetworkChannel) {
this.remoteKeyChanged = ((ChannelNetworkWrapper) channelWrapper).remoteKeyChanged();
@ -189,7 +188,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection_
this.logger = null;
this.sessionManager = null;
this.channelWrapper = null;
this.rmiSupport = new ConnectionSupport();
this.rmiSupport = new ConnectionNoOpSupport();
}
}
@ -1012,7 +1011,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection_
@Override
public
ConnectionSupport rmiSupport() {
ConnectionRmiSupport rmiSupport() {
return rmiSupport;
}

View File

@ -17,10 +17,10 @@ package dorkbox.network.connection;
import org.bouncycastle.crypto.params.ParametersWithIV;
import dorkbox.network.rmi.ConnectionSupport;
import dorkbox.network.rmi.ConnectionRmiSupport;
/**
* Supporting methods for encrypting data to a remote endpoint and RMI
* Supporting methods that are internal to the network stack
*/
public
interface Connection_ extends Connection {
@ -28,7 +28,7 @@ interface Connection_ extends Connection {
/**
* @return the RMI support for this connection
*/
ConnectionSupport rmiSupport();
ConnectionRmiSupport rmiSupport();
/**
* This is the per-message sequence number.
@ -46,4 +46,10 @@ interface Connection_ extends Connection {
* clobber each other
*/
ParametersWithIV getCryptoParameters();
/**
* @return the endpoint associated with this connection
*/
@SuppressWarnings("rawtypes")
EndPoint getEndPoint();
}

View File

@ -28,7 +28,7 @@ import com.esotericsoftware.kryo.util.MapReferenceResolver;
import dorkbox.network.pipeline.ByteBufInput;
import dorkbox.network.pipeline.ByteBufOutput;
import dorkbox.network.rmi.ConnectionSupport;
import dorkbox.network.rmi.ConnectionRmiSupport;
import dorkbox.network.rmi.RmiNopConnection;
import dorkbox.network.serialization.NetworkSerializationManager;
import dorkbox.util.bytes.BigEndian;
@ -59,7 +59,7 @@ class KryoExtra extends Kryo {
private final ByteBufOutput writer = new ByteBufOutput();
// volatile to provide object visibility for entire class. This is unique per connection
public volatile ConnectionSupport rmiSupport;
public volatile ConnectionRmiSupport rmiSupport;
private final GCMBlockCipher aesEngine = new GCMBlockCipher(new AESFastEngine());

View File

@ -25,7 +25,7 @@ import dorkbox.network.connection.Listeners;
import dorkbox.network.connection.bridge.ConnectionBridge;
import dorkbox.network.connection.idle.IdleBridge;
import dorkbox.network.connection.idle.IdleSender;
import dorkbox.network.rmi.ConnectionSupport;
import dorkbox.network.rmi.ConnectionNoOpSupport;
import dorkbox.network.rmi.RemoteObjectCallback;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
@ -169,7 +169,7 @@ class ConnectionRegistrationImpl implements Connection_, ChannelHandler {
@Override
public
ConnectionSupport rmiSupport() {
ConnectionNoOpSupport rmiSupport() {
return null;
}
}

View File

@ -18,11 +18,8 @@ import org.slf4j.Logger;
import dorkbox.network.connection.ConnectionImpl;
/**
*
*/
public
class ConnectionSupport {
class ConnectionNoOpSupport implements ConnectionRmiSupport {
public
void close() {
}

View File

@ -0,0 +1,396 @@
/*
* Copyright 2019 dorkbox, llc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.rmi;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Proxy;
import java.util.AbstractMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CopyOnWriteArrayList;
import org.slf4j.Logger;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.ConnectionImpl;
import dorkbox.network.connection.KryoExtra;
import dorkbox.network.connection.Listener;
import dorkbox.network.connection.Listener.OnMessageReceived;
import dorkbox.network.serialization.NetworkSerializationManager;
import dorkbox.util.collections.LockFreeHashMap;
import dorkbox.util.collections.LockFreeIntMap;
import dorkbox.util.generics.ClassHelper;
public
class ConnectionRmiImplSupport implements ConnectionRmiSupport {
private final RmiBridge rmiGlobalBridge;
private final RmiBridge rmiLocalBridge;
private final RmiObjectHandler rmiHandler;
private final Map<Integer, RemoteObject> proxyIdCache;
private final List<OnMessageReceived<Connection, InvokeMethodResult>> proxyListeners;
final ConnectionImpl connection;
private final LockFreeIntMap<RemoteObjectCallback> rmiRegistrationCallbacks;
private final Logger logger;
private volatile int rmiCallbackId = 0;
public
ConnectionRmiImplSupport(final ConnectionImpl connection, final RmiBridge rmiGlobalBridge, final RmiObjectHandler rmiHandler) {
this.connection = connection;
if (rmiGlobalBridge == null || rmiHandler == null) {
throw new NullPointerException("RMI cannot be null if using RMI support!");
}
this.rmiGlobalBridge = rmiGlobalBridge;
this.rmiHandler = rmiHandler;
logger = rmiGlobalBridge.logger;
// * @param executor
// * Sets the executor used to invoke methods when an invocation is received from a remote endpoint. By default, no
// * executor is set and invocations occur on the network thread, which should not be blocked for long, May be null.
rmiLocalBridge = new RmiBridge(logger, false);
proxyIdCache = new LockFreeHashMap<Integer, RemoteObject>();
proxyListeners = new CopyOnWriteArrayList<OnMessageReceived<Connection, InvokeMethodResult>>();
rmiRegistrationCallbacks = new LockFreeIntMap<RemoteObjectCallback>();
}
public
void close() {
// proxy listeners are cleared in the removeAll() call (which happens BEFORE close)
proxyIdCache.clear();
rmiRegistrationCallbacks.clear();
}
/**
* This will remove the invoke and invoke response listeners for this remote object
*/
public
void removeAllListeners() {
proxyListeners.clear();
}
public
<Iface> void createRemoteObject(final ConnectionImpl connection, final Class<Iface> interfaceClass, final RemoteObjectCallback<Iface> callback) {
if (!interfaceClass.isInterface()) {
throw new IllegalArgumentException("Cannot create a proxy for RMI access. It must be an interface.");
}
// because this is PER CONNECTION, there is no need for synchronize(), since there will not be any issues with concurrent
// access, but there WILL be issues with thread visibility because a different worker thread can be called for different connections
//noinspection NonAtomicOperationOnVolatileField
int nextRmiCallbackId = rmiCallbackId++;
rmiRegistrationCallbacks.put(nextRmiCallbackId, callback);
RmiRegistration message = new RmiRegistration(interfaceClass, RmiBridge.INVALID_RMI, nextRmiCallbackId);
// We use a callback to notify us when the object is ready. We can't "create this on the fly" because we
// have to wait for the object to be created + ID to be assigned on the remote system BEFORE we can create the proxy instance here.
// this means we are creating a NEW object on the server, bound access to only this connection
connection.send(message).flush();
}
public
<Iface> void getRemoteObject(final ConnectionImpl connection, final int objectId, final RemoteObjectCallback<Iface> callback) {
if (objectId < 0) {
throw new IllegalStateException("Object ID cannot be < 0");
}
if (objectId >= RmiBridge.INVALID_RMI) {
throw new IllegalStateException("Object ID cannot be >= " + RmiBridge.INVALID_RMI);
}
Class<?> iFaceClass = ClassHelper.getGenericParameterAsClassForSuperClass(RemoteObjectCallback.class, callback.getClass(), 0);
// because this is PER CONNECTION, there is no need for synchronize(), since there will not be any issues with concurrent
// access, but there WILL be issues with thread visibility because a different worker thread can be called for different connections
//noinspection NonAtomicOperationOnVolatileField
int nextRmiCallbackId = rmiCallbackId++;
rmiRegistrationCallbacks.put(nextRmiCallbackId, callback);
RmiRegistration message = new RmiRegistration(iFaceClass, objectId, nextRmiCallbackId);
// We use a callback to notify us when the object is ready. We can't "create this on the fly" because we
// have to wait for the object to be created + ID to be assigned on the remote system BEFORE we can create the proxy instance here.
// this means we are getting an EXISTING object on the server, bound access to only this connection
connection.send(message).flush();
}
/**
* Manages the RMI stuff for a connection.
*/
public
boolean manage(final ConnectionImpl connection, final Object message) {
if (message instanceof InvokeMethod) {
NetworkSerializationManager serialization = connection.getEndPoint().getSerialization();
InvokeMethod invokeMethod = rmiHandler.getInvokeMethod(serialization, connection, (InvokeMethod) message);
int objectID = invokeMethod.objectID;
// have to make sure to get the correct object (global vs local)
// This is what is overridden when registering interfaces/classes for RMI.
// objectID is the interface ID, and this returns the implementation ID.
final Object target = getImplementationObject(objectID);
if (target == null) {
logger.warn("Ignoring remote invocation request for unknown object ID: {}", objectID);
return true; // maybe false?
}
try {
InvokeMethodResult result = RmiBridge.invoke(connection, target, invokeMethod, logger);
if (result != null) {
// System.err.println("Sending: " + invokeMethod.responseID);
connection.send(result).flush();
}
} catch (IOException e) {
logger.error("Unable to invoke method.", e);
}
return true;
}
else if (message instanceof InvokeMethodResult) {
for (Listener.OnMessageReceived<Connection, InvokeMethodResult> proxyListener : proxyListeners) {
proxyListener.received(connection, (InvokeMethodResult) message);
}
return true;
}
else if (message instanceof RmiRegistration) {
rmiHandler.registration(this, connection, (RmiRegistration) message);
return true;
}
// not the correct type
return false;
}
/**
* For network connections, the interface class kryo ID == implementation class kryo ID, so they switch automatically.
* For local connections, we have to switch it appropriately in the LocalRmiProxy
*/
public
RmiRegistration createNewRmiObject(final NetworkSerializationManager serialization, final Class<?> interfaceClass, final Class<?> implementationClass, final int callbackId, final Logger logger) {
KryoExtra kryo = null;
Object object = null;
int rmiId = 0;
try {
kryo = serialization.takeKryo();
// because the INTERFACE is what is registered with kryo (not the impl) we have to temporarily permit unregistered classes (which have an ID of -1)
// so we can cache the instantiator for this class.
boolean registrationRequired = kryo.isRegistrationRequired();
kryo.setRegistrationRequired(false);
// this is what creates a new instance of the impl class, and stores it as an ID.
object = kryo.newInstance(implementationClass);
if (registrationRequired) {
// only if it's different should we call this again.
kryo.setRegistrationRequired(true);
}
rmiId = rmiLocalBridge.register(object);
if (rmiId == RmiBridge.INVALID_RMI) {
// this means that there are too many RMI ids (either global or connection specific!)
object = null;
}
else {
// if we are invalid, skip going over fields that might also be RMI objects, BECAUSE our object will be NULL!
// the @Rmi annotation allows an RMI object to have fields with objects that are ALSO RMI objects
LinkedList<Entry<Class<?>, Object>> classesToCheck = new LinkedList<Map.Entry<Class<?>, Object>>();
classesToCheck.add(new AbstractMap.SimpleEntry<Class<?>, Object>(implementationClass, object));
Map.Entry<Class<?>, Object> remoteClassObject;
while (!classesToCheck.isEmpty()) {
remoteClassObject = classesToCheck.removeFirst();
// we have to check the IMPLEMENTATION for any additional fields that will have proxy information.
// we use getDeclaredFields() + walking the object hierarchy, so we get ALL the fields possible (public + private).
for (Field field : remoteClassObject.getKey()
.getDeclaredFields()) {
if (field.getAnnotation(Rmi.class) != null) {
final Class<?> type = field.getType();
if (!type.isInterface()) {
// the type must be an interface, otherwise RMI cannot create a proxy object
logger.error("Error checking RMI fields for: {}.{} -- It is not an interface!",
remoteClassObject.getKey(),
field.getName());
continue;
}
boolean prev = field.isAccessible();
field.setAccessible(true);
final Object o;
try {
o = field.get(remoteClassObject.getValue());
rmiLocalBridge.register(o);
classesToCheck.add(new AbstractMap.SimpleEntry<Class<?>, Object>(type, o));
} catch (IllegalAccessException e) {
logger.error("Error checking RMI fields for: {}.{}", remoteClassObject.getKey(), field.getName(), e);
} finally {
field.setAccessible(prev);
}
}
}
// have to check the object hierarchy as well
Class<?> superclass = remoteClassObject.getKey()
.getSuperclass();
if (superclass != null && superclass != Object.class) {
classesToCheck.add(new AbstractMap.SimpleEntry<Class<?>, Object>(superclass, remoteClassObject.getValue()));
}
}
}
} catch (Exception e) {
logger.error("Error registering RMI class " + implementationClass, e);
} finally {
if (kryo != null) {
// we use kryo to create a new instance - so only return it on error or when it's done creating a new instance
serialization.returnKryo(kryo);
}
}
return new RmiRegistration(interfaceClass, rmiId, callbackId, object);
}
public
void runCallback(final Class<?> interfaceClass, final int callbackId, final Object remoteObject, final Logger logger) {
RemoteObjectCallback callback = rmiRegistrationCallbacks.remove(callbackId);
try {
//noinspection unchecked
callback.created(remoteObject);
} catch (Exception e) {
logger.error("Error getting or creating the remote object " + interfaceClass, e);
}
}
/**
* Used by RMI by the LOCAL side when setting up the to fetch an object for the REMOTE side
*
* @return the registered ID for a specific object, or RmiBridge.INVALID_RMI if there was no ID.
*/
public
<T> int getRegisteredId(final T object) {
// always check global before checking local, because less contention on the synchronization
int objectId = rmiGlobalBridge.getRegisteredId(object);
if (objectId != RmiBridge.INVALID_RMI) {
return objectId;
}
else {
// might return RmiBridge.INVALID_RMI;
return rmiLocalBridge.getRegisteredId(object);
}
}
/**
* This is used by RMI for the REMOTE side, to get the implementation
*
* @param objectId this is the RMI object ID
*/
public
Object getImplementationObject(final int objectId) {
if (RmiBridge.isGlobal(objectId)) {
return rmiGlobalBridge.getRegisteredObject(objectId);
} else {
return rmiLocalBridge.getRegisteredObject(objectId);
}
}
/**
* Warning. This is an advanced method. You should probably be using {@link Connection#createRemoteObject(Class, RemoteObjectCallback)}
* <p>
* <p>
* Returns a proxy object that implements the specified interface, and the methods invoked on the proxy object will be invoked
* remotely.
* <p>
* Methods that return a value will throw {@link TimeoutException} if the response is not received with the {@link
* RemoteObject#setResponseTimeout(int) response timeout}.
* <p/>
* If {@link RemoteObject#setAsync(boolean) non-blocking} is false (the default), then methods that return a value must not be
* called from the update thread for the connection. An exception will be thrown if this occurs. Methods with a void return value can be
* called on the update thread.
* <p/>
* If a proxy returned from this method is part of an object graph sent over the network, the object graph on the receiving side will
* have the proxy object replaced with the registered object.
*
* @see RemoteObject
* @param rmiId this is the remote object ID (assigned by RMI). This is NOT the kryo registration ID
* @param iFace this is the RMI interface
*/
public
RemoteObject getProxyObject(final int rmiId, final Class<?> iFace) {
if (iFace == null) {
throw new IllegalArgumentException("iface cannot be null.");
}
if (!iFace.isInterface()) {
throw new IllegalArgumentException("iface must be an interface.");
}
// we want to have a connection specific cache of IDs
// because this is PER CONNECTION, there is no need for synchronize(), since there will not be any issues with concurrent
// access, but there WILL be issues with thread visibility because a different worker thread can be called for different connections
RemoteObject remoteObject = proxyIdCache.get(rmiId);
if (remoteObject == null) {
// duplicates are fine, as they represent the same object (as specified by the ID) on the remote side.
// the ACTUAL proxy is created in the connection impl.
RmiProxyHandler proxyObject = new RmiProxyHandler(this.connection, this, rmiId, iFace);
proxyListeners.add(proxyObject.getListener());
// This is the interface inheritance by the proxy object
Class<?>[] temp = new Class<?>[2];
temp[0] = RemoteObject.class;
temp[1] = iFace;
remoteObject = (RemoteObject) Proxy.newProxyInstance(RmiBridge.class.getClassLoader(), temp, proxyObject);
proxyIdCache.put(rmiId, remoteObject);
}
return remoteObject;
}
public
Object fixupRmi(final ConnectionImpl connection, final Object message) {
// "local RMI" objects have to be modified, this part does that
return rmiHandler.normalMessages(this, message);
}
}

View File

@ -14,382 +14,29 @@
*/
package dorkbox.network.rmi;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Proxy;
import java.util.AbstractMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CopyOnWriteArrayList;
import org.slf4j.Logger;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.ConnectionImpl;
import dorkbox.network.connection.KryoExtra;
import dorkbox.network.connection.Listener;
import dorkbox.network.connection.Listener.OnMessageReceived;
import dorkbox.network.serialization.NetworkSerializationManager;
import dorkbox.util.collections.LockFreeHashMap;
import dorkbox.util.collections.LockFreeIntMap;
import dorkbox.util.generics.ClassHelper;
public
class ConnectionRmiSupport extends ConnectionSupport {
private final RmiBridge rmiGlobalBridge;
private final RmiBridge rmiLocalBridge;
private final RmiObjectHandler rmiHandler;
interface ConnectionRmiSupport {
void close();
private final Map<Integer, RemoteObject> proxyIdCache;
private final List<OnMessageReceived<Connection, InvokeMethodResult>> proxyListeners;
void removeAllListeners();
final ConnectionImpl connection;
<Iface> void createRemoteObject(final ConnectionImpl connection, final Class<Iface> interfaceClass, final RemoteObjectCallback<Iface> callback);
private final LockFreeIntMap<RemoteObjectCallback> rmiRegistrationCallbacks;
private final Logger logger;
private volatile int rmiCallbackId = 0;
<Iface> void getRemoteObject(final ConnectionImpl connection, final int objectId, final RemoteObjectCallback<Iface> callback);
boolean manage(final ConnectionImpl connection, final Object message);
public
ConnectionRmiSupport(final ConnectionImpl connection, final RmiBridge rmiGlobalBridge, final RmiObjectHandler rmiHandler) {
this.connection = connection;
Object fixupRmi(final ConnectionImpl connection, final Object message);
if (rmiGlobalBridge == null || rmiHandler == null) {
throw new NullPointerException("RMI cannot be null if using RMI support!");
}
<T> int getRegisteredId(final T object);
this.rmiGlobalBridge = rmiGlobalBridge;
this.rmiHandler = rmiHandler;
void runCallback(final Class<?> interfaceClass, final int callbackId, final Object remoteObject, final Logger logger);
logger = rmiGlobalBridge.logger;
RemoteObject getProxyObject(final int rmiId, final Class<?> iFace);
// * @param executor
// * Sets the executor used to invoke methods when an invocation is received from a remote endpoint. By default, no
// * executor is set and invocations occur on the network thread, which should not be blocked for long, May be null.
rmiLocalBridge = new RmiBridge(logger, false);
proxyIdCache = new LockFreeHashMap<Integer, RemoteObject>();
proxyListeners = new CopyOnWriteArrayList<OnMessageReceived<Connection, InvokeMethodResult>>();
rmiRegistrationCallbacks = new LockFreeIntMap<RemoteObjectCallback>();
}
public
void close() {
// proxy listeners are cleared in the removeAll() call (which happens BEFORE close)
proxyIdCache.clear();
rmiRegistrationCallbacks.clear();
}
/**
* This will remove the invoke and invoke response listeners for this remote object
*/
public
void removeAllListeners() {
proxyListeners.clear();
}
public
<Iface> void createRemoteObject(final ConnectionImpl connection, final Class<Iface> interfaceClass, final RemoteObjectCallback<Iface> callback) {
if (!interfaceClass.isInterface()) {
throw new IllegalArgumentException("Cannot create a proxy for RMI access. It must be an interface.");
}
// because this is PER CONNECTION, there is no need for synchronize(), since there will not be any issues with concurrent
// access, but there WILL be issues with thread visibility because a different worker thread can be called for different connections
//noinspection NonAtomicOperationOnVolatileField
int nextRmiCallbackId = rmiCallbackId++;
rmiRegistrationCallbacks.put(nextRmiCallbackId, callback);
RmiRegistration message = new RmiRegistration(interfaceClass, RmiBridge.INVALID_RMI, nextRmiCallbackId);
// We use a callback to notify us when the object is ready. We can't "create this on the fly" because we
// have to wait for the object to be created + ID to be assigned on the remote system BEFORE we can create the proxy instance here.
// this means we are creating a NEW object on the server, bound access to only this connection
connection.send(message).flush();
}
public
<Iface> void getRemoteObject(final ConnectionImpl connection, final int objectId, final RemoteObjectCallback<Iface> callback) {
if (objectId < 0) {
throw new IllegalStateException("Object ID cannot be < 0");
}
if (objectId >= RmiBridge.INVALID_RMI) {
throw new IllegalStateException("Object ID cannot be >= " + RmiBridge.INVALID_RMI);
}
Class<?> iFaceClass = ClassHelper.getGenericParameterAsClassForSuperClass(RemoteObjectCallback.class, callback.getClass(), 0);
// because this is PER CONNECTION, there is no need for synchronize(), since there will not be any issues with concurrent
// access, but there WILL be issues with thread visibility because a different worker thread can be called for different connections
//noinspection NonAtomicOperationOnVolatileField
int nextRmiCallbackId = rmiCallbackId++;
rmiRegistrationCallbacks.put(nextRmiCallbackId, callback);
RmiRegistration message = new RmiRegistration(iFaceClass, objectId, nextRmiCallbackId);
// We use a callback to notify us when the object is ready. We can't "create this on the fly" because we
// have to wait for the object to be created + ID to be assigned on the remote system BEFORE we can create the proxy instance here.
// this means we are getting an EXISTING object on the server, bound access to only this connection
connection.send(message).flush();
}
/**
* Manages the RMI stuff for a connection.
*/
public
boolean manage(final ConnectionImpl connection, final Object message) {
if (message instanceof InvokeMethod) {
NetworkSerializationManager serialization = connection.getEndPoint().getSerialization();
InvokeMethod invokeMethod = rmiHandler.getInvokeMethod(serialization, connection, (InvokeMethod) message);
int objectID = invokeMethod.objectID;
// have to make sure to get the correct object (global vs local)
// This is what is overridden when registering interfaces/classes for RMI.
// objectID is the interface ID, and this returns the implementation ID.
final Object target = getImplementationObject(objectID);
if (target == null) {
logger.warn("Ignoring remote invocation request for unknown object ID: {}", objectID);
return true; // maybe false?
}
try {
InvokeMethodResult result = RmiBridge.invoke(connection, target, invokeMethod, logger);
if (result != null) {
// System.err.println("Sending: " + invokeMethod.responseID);
connection.send(result).flush();
}
} catch (IOException e) {
logger.error("Unable to invoke method.", e);
}
return true;
}
else if (message instanceof InvokeMethodResult) {
for (Listener.OnMessageReceived<Connection, InvokeMethodResult> proxyListener : proxyListeners) {
proxyListener.received(connection, (InvokeMethodResult) message);
}
return true;
}
else if (message instanceof RmiRegistration) {
rmiHandler.registration(this, connection, (RmiRegistration) message);
return true;
}
// not the correct type
return false;
}
/**
* For network connections, the interface class kryo ID == implementation class kryo ID, so they switch automatically.
* For local connections, we have to switch it appropriately in the LocalRmiProxy
*/
public
RmiRegistration createNewRmiObject(final NetworkSerializationManager serialization, final Class<?> interfaceClass, final Class<?> implementationClass, final int callbackId, final Logger logger) {
KryoExtra kryo = null;
Object object = null;
int rmiId = 0;
try {
kryo = serialization.takeKryo();
// because the INTERFACE is what is registered with kryo (not the impl) we have to temporarily permit unregistered classes (which have an ID of -1)
// so we can cache the instantiator for this class.
boolean registrationRequired = kryo.isRegistrationRequired();
kryo.setRegistrationRequired(false);
// this is what creates a new instance of the impl class, and stores it as an ID.
object = kryo.newInstance(implementationClass);
if (registrationRequired) {
// only if it's different should we call this again.
kryo.setRegistrationRequired(true);
}
rmiId = rmiLocalBridge.register(object);
if (rmiId == RmiBridge.INVALID_RMI) {
// this means that there are too many RMI ids (either global or connection specific!)
object = null;
}
else {
// if we are invalid, skip going over fields that might also be RMI objects, BECAUSE our object will be NULL!
// the @Rmi annotation allows an RMI object to have fields with objects that are ALSO RMI objects
LinkedList<Entry<Class<?>, Object>> classesToCheck = new LinkedList<Map.Entry<Class<?>, Object>>();
classesToCheck.add(new AbstractMap.SimpleEntry<Class<?>, Object>(implementationClass, object));
Map.Entry<Class<?>, Object> remoteClassObject;
while (!classesToCheck.isEmpty()) {
remoteClassObject = classesToCheck.removeFirst();
// we have to check the IMPLEMENTATION for any additional fields that will have proxy information.
// we use getDeclaredFields() + walking the object hierarchy, so we get ALL the fields possible (public + private).
for (Field field : remoteClassObject.getKey()
.getDeclaredFields()) {
if (field.getAnnotation(Rmi.class) != null) {
final Class<?> type = field.getType();
if (!type.isInterface()) {
// the type must be an interface, otherwise RMI cannot create a proxy object
logger.error("Error checking RMI fields for: {}.{} -- It is not an interface!",
remoteClassObject.getKey(),
field.getName());
continue;
}
boolean prev = field.isAccessible();
field.setAccessible(true);
final Object o;
try {
o = field.get(remoteClassObject.getValue());
rmiLocalBridge.register(o);
classesToCheck.add(new AbstractMap.SimpleEntry<Class<?>, Object>(type, o));
} catch (IllegalAccessException e) {
logger.error("Error checking RMI fields for: {}.{}", remoteClassObject.getKey(), field.getName(), e);
} finally {
field.setAccessible(prev);
}
}
}
// have to check the object hierarchy as well
Class<?> superclass = remoteClassObject.getKey()
.getSuperclass();
if (superclass != null && superclass != Object.class) {
classesToCheck.add(new AbstractMap.SimpleEntry<Class<?>, Object>(superclass, remoteClassObject.getValue()));
}
}
}
} catch (Exception e) {
logger.error("Error registering RMI class " + implementationClass, e);
} finally {
if (kryo != null) {
// we use kryo to create a new instance - so only return it on error or when it's done creating a new instance
serialization.returnKryo(kryo);
}
}
return new RmiRegistration(interfaceClass, rmiId, callbackId, object);
}
public
void runCallback(final Class<?> interfaceClass, final int callbackId, final Object remoteObject, final Logger logger) {
RemoteObjectCallback callback = rmiRegistrationCallbacks.remove(callbackId);
try {
//noinspection unchecked
callback.created(remoteObject);
} catch (Exception e) {
logger.error("Error getting or creating the remote object " + interfaceClass, e);
}
}
/**
* Used by RMI by the LOCAL side when setting up the to fetch an object for the REMOTE side
*
* @return the registered ID for a specific object, or RmiBridge.INVALID_RMI if there was no ID.
*/
public
<T> int getRegisteredId(final T object) {
// always check global before checking local, because less contention on the synchronization
int objectId = rmiGlobalBridge.getRegisteredId(object);
if (objectId != RmiBridge.INVALID_RMI) {
return objectId;
}
else {
// might return RmiBridge.INVALID_RMI;
return rmiLocalBridge.getRegisteredId(object);
}
}
/**
* This is used by RMI for the REMOTE side, to get the implementation
*
* @param objectId this is the RMI object ID
*/
public
Object getImplementationObject(final int objectId) {
if (RmiBridge.isGlobal(objectId)) {
return rmiGlobalBridge.getRegisteredObject(objectId);
} else {
return rmiLocalBridge.getRegisteredObject(objectId);
}
}
/**
* Warning. This is an advanced method. You should probably be using {@link Connection#createRemoteObject(Class, RemoteObjectCallback)}
* <p>
* <p>
* Returns a proxy object that implements the specified interface, and the methods invoked on the proxy object will be invoked
* remotely.
* <p>
* Methods that return a value will throw {@link TimeoutException} if the response is not received with the {@link
* RemoteObject#setResponseTimeout(int) response timeout}.
* <p/>
* If {@link RemoteObject#setAsync(boolean) non-blocking} is false (the default), then methods that return a value must not be
* called from the update thread for the connection. An exception will be thrown if this occurs. Methods with a void return value can be
* called on the update thread.
* <p/>
* If a proxy returned from this method is part of an object graph sent over the network, the object graph on the receiving side will
* have the proxy object replaced with the registered object.
*
* @see RemoteObject
* @param rmiId this is the remote object ID (assigned by RMI). This is NOT the kryo registration ID
* @param iFace this is the RMI interface
*/
public
RemoteObject getProxyObject(final int rmiId, final Class<?> iFace) {
if (iFace == null) {
throw new IllegalArgumentException("iface cannot be null.");
}
if (!iFace.isInterface()) {
throw new IllegalArgumentException("iface must be an interface.");
}
// we want to have a connection specific cache of IDs
// because this is PER CONNECTION, there is no need for synchronize(), since there will not be any issues with concurrent
// access, but there WILL be issues with thread visibility because a different worker thread can be called for different connections
RemoteObject remoteObject = proxyIdCache.get(rmiId);
if (remoteObject == null) {
// duplicates are fine, as they represent the same object (as specified by the ID) on the remote side.
// the ACTUAL proxy is created in the connection impl.
RmiProxyHandler proxyObject = new RmiProxyHandler(this.connection, this, rmiId, iFace);
proxyListeners.add(proxyObject.getListener());
// This is the interface inheritance by the proxy object
Class<?>[] temp = new Class<?>[2];
temp[0] = RemoteObject.class;
temp[1] = iFace;
remoteObject = (RemoteObject) Proxy.newProxyInstance(RmiBridge.class.getClassLoader(), temp, proxyObject);
proxyIdCache.put(rmiId, remoteObject);
}
return remoteObject;
}
public
Object fixupRmi(final ConnectionImpl connection, final Object message) {
// "local RMI" objects have to be modified, this part does that
return rmiHandler.normalMessages(this, message);
}
Object getImplementationObject(final int objectId);
}

View File

@ -127,7 +127,7 @@ class InvokeMethodSerializer extends Serializer<InvokeMethod> {
argStartIndex = 1;
args = new Object[serializers.length + 1];
args[0] = ((ConnectionRmiSupport) ((KryoExtra) kryo).rmiSupport).connection;
args[0] = ((ConnectionRmiImplSupport) ((KryoExtra) kryo).rmiSupport).connection;
}
else {
method = cachedMethod.method;

View File

@ -124,7 +124,7 @@ class RmiNopConnection implements Connection_ {
@Override
public
ConnectionSupport rmiSupport() {
ConnectionNoOpSupport rmiSupport() {
return null;
}

View File

@ -23,7 +23,7 @@ interface RmiObjectHandler {
InvokeMethod getInvokeMethod(final NetworkSerializationManager serialization, final ConnectionImpl connection, final InvokeMethod invokeMethod);
void registration(final ConnectionRmiSupport rmiSupport, final ConnectionImpl connection, final RmiRegistration message);
void registration(final ConnectionRmiImplSupport rmiSupport, final ConnectionImpl connection, final RmiRegistration message);
Object normalMessages(final ConnectionRmiSupport connection, final Object message);
Object normalMessages(final ConnectionRmiImplSupport connection, final Object message);
}

View File

@ -121,7 +121,7 @@ class RmiObjectLocalHandler implements RmiObjectHandler {
@Override
public
void registration(final ConnectionRmiSupport rmiSupport, final ConnectionImpl connection, final RmiRegistration registration) {
void registration(final ConnectionRmiImplSupport rmiSupport, final ConnectionImpl connection, final RmiRegistration registration) {
// manage creating/getting/notifying this RMI object
// these fields are ALWAYS present!
@ -198,7 +198,7 @@ class RmiObjectLocalHandler implements RmiObjectHandler {
@SuppressWarnings("unchecked")
@Override
public
Object normalMessages(final ConnectionRmiSupport rmiSupport, final Object message) {
Object normalMessages(final ConnectionRmiImplSupport rmiSupport, final Object message) {
// else, this was "just a local message"
// because we NORMALLY pass around just the object (there is no serialization going on...) we have to explicitly check to see

View File

@ -38,7 +38,7 @@ class RmiObjectNetworkHandler implements RmiObjectHandler {
@Override
public
void registration(final ConnectionRmiSupport rmiSupport, final ConnectionImpl connection, final RmiRegistration registration) {
void registration(final ConnectionRmiImplSupport rmiSupport, final ConnectionImpl connection, final RmiRegistration registration) {
// manage creating/getting/notifying this RMI object
// these fields are ALWAYS present!
@ -88,7 +88,7 @@ class RmiObjectNetworkHandler implements RmiObjectHandler {
@Override
public
Object normalMessages(final ConnectionRmiSupport connection, final Object message) {
Object normalMessages(final ConnectionRmiImplSupport connection, final Object message) {
return message;
}
}

View File

@ -72,7 +72,7 @@ class RmiProxyHandler implements InvocationHandler {
private final boolean[] pendingResponses = new boolean[64];
private final ConnectionImpl connection;
private final ConnectionRmiSupport rmiSupport;
private final ConnectionRmiImplSupport rmiSupport;
public final int rmiObjectId; // this is the RMI id
public final int classId; // this is the KRYO class id
@ -104,7 +104,7 @@ class RmiProxyHandler implements InvocationHandler {
* @param iFace this is the RMI interface
*/
public
RmiProxyHandler(final ConnectionImpl connection, final ConnectionRmiSupport rmiSupport, final int rmiId, final Class<?> iFace) {
RmiProxyHandler(final ConnectionImpl connection, final ConnectionRmiImplSupport rmiSupport, final int rmiId, final Class<?> iFace) {
super();
this.connection = connection;