From 12a929352663f1abf2dcb128d8d11180cd7d0b8a Mon Sep 17 00:00:00 2001 From: nathan Date: Fri, 25 Jan 2019 16:41:13 +0100 Subject: [PATCH] ConnectionRmiSupport refactor. Moved internal methods to Connection_ --- src/dorkbox/network/Client.java | 7 - .../network/connection/Connection.java | 6 - .../network/connection/ConnectionImpl.java | 15 +- .../network/connection/Connection_.java | 12 +- src/dorkbox/network/connection/KryoExtra.java | 4 +- .../ConnectionRegistrationImpl.java | 4 +- ...upport.java => ConnectionNoOpSupport.java} | 5 +- .../network/rmi/ConnectionRmiImplSupport.java | 396 ++++++++++++++++++ .../network/rmi/ConnectionRmiSupport.java | 375 +---------------- .../network/rmi/InvokeMethodSerializer.java | 2 +- src/dorkbox/network/rmi/RmiNopConnection.java | 2 +- src/dorkbox/network/rmi/RmiObjectHandler.java | 4 +- .../network/rmi/RmiObjectLocalHandler.java | 4 +- .../network/rmi/RmiObjectNetworkHandler.java | 4 +- src/dorkbox/network/rmi/RmiProxyHandler.java | 4 +- 15 files changed, 438 insertions(+), 406 deletions(-) rename src/dorkbox/network/rmi/{ConnectionSupport.java => ConnectionNoOpSupport.java} (96%) create mode 100644 src/dorkbox/network/rmi/ConnectionRmiImplSupport.java diff --git a/src/dorkbox/network/Client.java b/src/dorkbox/network/Client.java index 488084be..889d494f 100644 --- a/src/dorkbox/network/Client.java +++ b/src/dorkbox/network/Client.java @@ -361,13 +361,6 @@ class Client extends EndPointClient implements Connection return connection.isLoopback(); } - @SuppressWarnings("rawtypes") - @Override - public - EndPoint getEndPoint() { - return this; - } - /** * @return the connection (TCP or LOCAL) id of this connection. */ diff --git a/src/dorkbox/network/connection/Connection.java b/src/dorkbox/network/connection/Connection.java index 938d6be3..7b9f4fc9 100644 --- a/src/dorkbox/network/connection/Connection.java +++ b/src/dorkbox/network/connection/Connection.java @@ -41,12 +41,6 @@ interface Connection { */ boolean isLoopback(); - /** - * @return the endpoint associated with this connection - */ - @SuppressWarnings("rawtypes") - EndPoint getEndPoint(); - /** * @return the connection (TCP or LOCAL) id of this connection. */ diff --git a/src/dorkbox/network/connection/ConnectionImpl.java b/src/dorkbox/network/connection/ConnectionImpl.java index a6c6dbbf..88cc26da 100644 --- a/src/dorkbox/network/connection/ConnectionImpl.java +++ b/src/dorkbox/network/connection/ConnectionImpl.java @@ -34,8 +34,9 @@ import dorkbox.network.connection.ping.PingTuple; import dorkbox.network.connection.wrapper.ChannelNetworkWrapper; import dorkbox.network.connection.wrapper.ChannelNull; import dorkbox.network.connection.wrapper.ChannelWrapper; +import dorkbox.network.rmi.ConnectionNoOpSupport; +import dorkbox.network.rmi.ConnectionRmiImplSupport; import dorkbox.network.rmi.ConnectionRmiSupport; -import dorkbox.network.rmi.ConnectionSupport; import dorkbox.network.rmi.RemoteObjectCallback; import dorkbox.network.rmi.RmiObjectHandler; import io.netty.bootstrap.DatagramSessionChannel; @@ -128,7 +129,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection_ // RMI support for this connection - final ConnectionSupport rmiSupport; + final ConnectionRmiSupport rmiSupport; /** * All of the parameters can be null, when metaChannel wants to get the base class type @@ -145,7 +146,6 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection_ boolean isNetworkChannel = this.channelWrapper instanceof ChannelNetworkWrapper; if (endPoint.rmiEnabled) { - RmiObjectHandler handler; if (isNetworkChannel) { handler = endPoint.rmiNetworkHandler; @@ -156,13 +156,12 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection_ // because this is PER CONNECTION, there is no need for synchronize(), since there will not be any issues with concurrent access, but // there WILL be issues with thread visibility because a different worker thread can be called for different connections - this.rmiSupport = new ConnectionRmiSupport(this, endPoint.rmiGlobalBridge, handler); + this.rmiSupport = new ConnectionRmiImplSupport(this, endPoint.rmiGlobalBridge, handler); } else { - this.rmiSupport = new ConnectionSupport(); + this.rmiSupport = new ConnectionNoOpSupport(); } - if (isNetworkChannel) { this.remoteKeyChanged = ((ChannelNetworkWrapper) channelWrapper).remoteKeyChanged(); @@ -189,7 +188,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection_ this.logger = null; this.sessionManager = null; this.channelWrapper = null; - this.rmiSupport = new ConnectionSupport(); + this.rmiSupport = new ConnectionNoOpSupport(); } } @@ -1012,7 +1011,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection_ @Override public - ConnectionSupport rmiSupport() { + ConnectionRmiSupport rmiSupport() { return rmiSupport; } diff --git a/src/dorkbox/network/connection/Connection_.java b/src/dorkbox/network/connection/Connection_.java index 9e2155af..d643475e 100644 --- a/src/dorkbox/network/connection/Connection_.java +++ b/src/dorkbox/network/connection/Connection_.java @@ -17,10 +17,10 @@ package dorkbox.network.connection; import org.bouncycastle.crypto.params.ParametersWithIV; -import dorkbox.network.rmi.ConnectionSupport; +import dorkbox.network.rmi.ConnectionRmiSupport; /** - * Supporting methods for encrypting data to a remote endpoint and RMI + * Supporting methods that are internal to the network stack */ public interface Connection_ extends Connection { @@ -28,7 +28,7 @@ interface Connection_ extends Connection { /** * @return the RMI support for this connection */ - ConnectionSupport rmiSupport(); + ConnectionRmiSupport rmiSupport(); /** * This is the per-message sequence number. @@ -46,4 +46,10 @@ interface Connection_ extends Connection { * clobber each other */ ParametersWithIV getCryptoParameters(); + + /** + * @return the endpoint associated with this connection + */ + @SuppressWarnings("rawtypes") + EndPoint getEndPoint(); } diff --git a/src/dorkbox/network/connection/KryoExtra.java b/src/dorkbox/network/connection/KryoExtra.java index 6990ef1f..f7b7427d 100644 --- a/src/dorkbox/network/connection/KryoExtra.java +++ b/src/dorkbox/network/connection/KryoExtra.java @@ -28,7 +28,7 @@ import com.esotericsoftware.kryo.util.MapReferenceResolver; import dorkbox.network.pipeline.ByteBufInput; import dorkbox.network.pipeline.ByteBufOutput; -import dorkbox.network.rmi.ConnectionSupport; +import dorkbox.network.rmi.ConnectionRmiSupport; import dorkbox.network.rmi.RmiNopConnection; import dorkbox.network.serialization.NetworkSerializationManager; import dorkbox.util.bytes.BigEndian; @@ -59,7 +59,7 @@ class KryoExtra extends Kryo { private final ByteBufOutput writer = new ByteBufOutput(); // volatile to provide object visibility for entire class. This is unique per connection - public volatile ConnectionSupport rmiSupport; + public volatile ConnectionRmiSupport rmiSupport; private final GCMBlockCipher aesEngine = new GCMBlockCipher(new AESFastEngine()); diff --git a/src/dorkbox/network/connection/registration/ConnectionRegistrationImpl.java b/src/dorkbox/network/connection/registration/ConnectionRegistrationImpl.java index a66df550..d3d5ee83 100644 --- a/src/dorkbox/network/connection/registration/ConnectionRegistrationImpl.java +++ b/src/dorkbox/network/connection/registration/ConnectionRegistrationImpl.java @@ -25,7 +25,7 @@ import dorkbox.network.connection.Listeners; import dorkbox.network.connection.bridge.ConnectionBridge; import dorkbox.network.connection.idle.IdleBridge; import dorkbox.network.connection.idle.IdleSender; -import dorkbox.network.rmi.ConnectionSupport; +import dorkbox.network.rmi.ConnectionNoOpSupport; import dorkbox.network.rmi.RemoteObjectCallback; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; @@ -169,7 +169,7 @@ class ConnectionRegistrationImpl implements Connection_, ChannelHandler { @Override public - ConnectionSupport rmiSupport() { + ConnectionNoOpSupport rmiSupport() { return null; } } diff --git a/src/dorkbox/network/rmi/ConnectionSupport.java b/src/dorkbox/network/rmi/ConnectionNoOpSupport.java similarity index 96% rename from src/dorkbox/network/rmi/ConnectionSupport.java rename to src/dorkbox/network/rmi/ConnectionNoOpSupport.java index 9b5a0d2a..475d38d6 100644 --- a/src/dorkbox/network/rmi/ConnectionSupport.java +++ b/src/dorkbox/network/rmi/ConnectionNoOpSupport.java @@ -18,11 +18,8 @@ import org.slf4j.Logger; import dorkbox.network.connection.ConnectionImpl; -/** - * - */ public -class ConnectionSupport { +class ConnectionNoOpSupport implements ConnectionRmiSupport { public void close() { } diff --git a/src/dorkbox/network/rmi/ConnectionRmiImplSupport.java b/src/dorkbox/network/rmi/ConnectionRmiImplSupport.java new file mode 100644 index 00000000..96f8e876 --- /dev/null +++ b/src/dorkbox/network/rmi/ConnectionRmiImplSupport.java @@ -0,0 +1,396 @@ +/* + * Copyright 2019 dorkbox, llc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dorkbox.network.rmi; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.lang.reflect.Proxy; +import java.util.AbstractMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.CopyOnWriteArrayList; + +import org.slf4j.Logger; + +import dorkbox.network.connection.Connection; +import dorkbox.network.connection.ConnectionImpl; +import dorkbox.network.connection.KryoExtra; +import dorkbox.network.connection.Listener; +import dorkbox.network.connection.Listener.OnMessageReceived; +import dorkbox.network.serialization.NetworkSerializationManager; +import dorkbox.util.collections.LockFreeHashMap; +import dorkbox.util.collections.LockFreeIntMap; +import dorkbox.util.generics.ClassHelper; + +public +class ConnectionRmiImplSupport implements ConnectionRmiSupport { + private final RmiBridge rmiGlobalBridge; + private final RmiBridge rmiLocalBridge; + private final RmiObjectHandler rmiHandler; + + private final Map proxyIdCache; + private final List> proxyListeners; + + final ConnectionImpl connection; + + private final LockFreeIntMap rmiRegistrationCallbacks; + private final Logger logger; + private volatile int rmiCallbackId = 0; + + + public + ConnectionRmiImplSupport(final ConnectionImpl connection, final RmiBridge rmiGlobalBridge, final RmiObjectHandler rmiHandler) { + this.connection = connection; + + if (rmiGlobalBridge == null || rmiHandler == null) { + throw new NullPointerException("RMI cannot be null if using RMI support!"); + } + + this.rmiGlobalBridge = rmiGlobalBridge; + this.rmiHandler = rmiHandler; + + logger = rmiGlobalBridge.logger; + + // * @param executor + // * Sets the executor used to invoke methods when an invocation is received from a remote endpoint. By default, no + // * executor is set and invocations occur on the network thread, which should not be blocked for long, May be null. + + + rmiLocalBridge = new RmiBridge(logger, false); + + + proxyIdCache = new LockFreeHashMap(); + proxyListeners = new CopyOnWriteArrayList>(); + rmiRegistrationCallbacks = new LockFreeIntMap(); + } + + public + void close() { + // proxy listeners are cleared in the removeAll() call (which happens BEFORE close) + proxyIdCache.clear(); + + rmiRegistrationCallbacks.clear(); + } + + /** + * This will remove the invoke and invoke response listeners for this remote object + */ + public + void removeAllListeners() { + proxyListeners.clear(); + } + + public + void createRemoteObject(final ConnectionImpl connection, final Class interfaceClass, final RemoteObjectCallback callback) { + if (!interfaceClass.isInterface()) { + throw new IllegalArgumentException("Cannot create a proxy for RMI access. It must be an interface."); + } + + // because this is PER CONNECTION, there is no need for synchronize(), since there will not be any issues with concurrent + // access, but there WILL be issues with thread visibility because a different worker thread can be called for different connections + //noinspection NonAtomicOperationOnVolatileField + int nextRmiCallbackId = rmiCallbackId++; + rmiRegistrationCallbacks.put(nextRmiCallbackId, callback); + RmiRegistration message = new RmiRegistration(interfaceClass, RmiBridge.INVALID_RMI, nextRmiCallbackId); + + // We use a callback to notify us when the object is ready. We can't "create this on the fly" because we + // have to wait for the object to be created + ID to be assigned on the remote system BEFORE we can create the proxy instance here. + + // this means we are creating a NEW object on the server, bound access to only this connection + connection.send(message).flush(); + } + + public + void getRemoteObject(final ConnectionImpl connection, final int objectId, final RemoteObjectCallback callback) { + if (objectId < 0) { + throw new IllegalStateException("Object ID cannot be < 0"); + } + if (objectId >= RmiBridge.INVALID_RMI) { + throw new IllegalStateException("Object ID cannot be >= " + RmiBridge.INVALID_RMI); + } + + Class iFaceClass = ClassHelper.getGenericParameterAsClassForSuperClass(RemoteObjectCallback.class, callback.getClass(), 0); + + // because this is PER CONNECTION, there is no need for synchronize(), since there will not be any issues with concurrent + // access, but there WILL be issues with thread visibility because a different worker thread can be called for different connections + //noinspection NonAtomicOperationOnVolatileField + int nextRmiCallbackId = rmiCallbackId++; + rmiRegistrationCallbacks.put(nextRmiCallbackId, callback); + RmiRegistration message = new RmiRegistration(iFaceClass, objectId, nextRmiCallbackId); + + // We use a callback to notify us when the object is ready. We can't "create this on the fly" because we + // have to wait for the object to be created + ID to be assigned on the remote system BEFORE we can create the proxy instance here. + + // this means we are getting an EXISTING object on the server, bound access to only this connection + connection.send(message).flush(); + } + + /** + * Manages the RMI stuff for a connection. + */ + public + boolean manage(final ConnectionImpl connection, final Object message) { + if (message instanceof InvokeMethod) { + NetworkSerializationManager serialization = connection.getEndPoint().getSerialization(); + + InvokeMethod invokeMethod = rmiHandler.getInvokeMethod(serialization, connection, (InvokeMethod) message); + + int objectID = invokeMethod.objectID; + + // have to make sure to get the correct object (global vs local) + // This is what is overridden when registering interfaces/classes for RMI. + // objectID is the interface ID, and this returns the implementation ID. + final Object target = getImplementationObject(objectID); + + if (target == null) { + logger.warn("Ignoring remote invocation request for unknown object ID: {}", objectID); + + return true; // maybe false? + } + + try { + InvokeMethodResult result = RmiBridge.invoke(connection, target, invokeMethod, logger); + if (result != null) { + // System.err.println("Sending: " + invokeMethod.responseID); + connection.send(result).flush(); + } + + } catch (IOException e) { + logger.error("Unable to invoke method.", e); + } + + return true; + } + else if (message instanceof InvokeMethodResult) { + for (Listener.OnMessageReceived proxyListener : proxyListeners) { + proxyListener.received(connection, (InvokeMethodResult) message); + } + return true; + } + else if (message instanceof RmiRegistration) { + rmiHandler.registration(this, connection, (RmiRegistration) message); + return true; + } + + // not the correct type + return false; + } + + /** + * For network connections, the interface class kryo ID == implementation class kryo ID, so they switch automatically. + * For local connections, we have to switch it appropriately in the LocalRmiProxy + */ + public + RmiRegistration createNewRmiObject(final NetworkSerializationManager serialization, final Class interfaceClass, final Class implementationClass, final int callbackId, final Logger logger) { + KryoExtra kryo = null; + Object object = null; + int rmiId = 0; + + try { + kryo = serialization.takeKryo(); + + // because the INTERFACE is what is registered with kryo (not the impl) we have to temporarily permit unregistered classes (which have an ID of -1) + // so we can cache the instantiator for this class. + boolean registrationRequired = kryo.isRegistrationRequired(); + + kryo.setRegistrationRequired(false); + + // this is what creates a new instance of the impl class, and stores it as an ID. + object = kryo.newInstance(implementationClass); + + if (registrationRequired) { + // only if it's different should we call this again. + kryo.setRegistrationRequired(true); + } + + + rmiId = rmiLocalBridge.register(object); + + if (rmiId == RmiBridge.INVALID_RMI) { + // this means that there are too many RMI ids (either global or connection specific!) + object = null; + } + else { + // if we are invalid, skip going over fields that might also be RMI objects, BECAUSE our object will be NULL! + + // the @Rmi annotation allows an RMI object to have fields with objects that are ALSO RMI objects + LinkedList, Object>> classesToCheck = new LinkedList, Object>>(); + classesToCheck.add(new AbstractMap.SimpleEntry, Object>(implementationClass, object)); + + + Map.Entry, Object> remoteClassObject; + while (!classesToCheck.isEmpty()) { + remoteClassObject = classesToCheck.removeFirst(); + + // we have to check the IMPLEMENTATION for any additional fields that will have proxy information. + // we use getDeclaredFields() + walking the object hierarchy, so we get ALL the fields possible (public + private). + for (Field field : remoteClassObject.getKey() + .getDeclaredFields()) { + if (field.getAnnotation(Rmi.class) != null) { + final Class type = field.getType(); + + if (!type.isInterface()) { + // the type must be an interface, otherwise RMI cannot create a proxy object + logger.error("Error checking RMI fields for: {}.{} -- It is not an interface!", + remoteClassObject.getKey(), + field.getName()); + continue; + } + + + boolean prev = field.isAccessible(); + field.setAccessible(true); + final Object o; + try { + o = field.get(remoteClassObject.getValue()); + + rmiLocalBridge.register(o); + classesToCheck.add(new AbstractMap.SimpleEntry, Object>(type, o)); + } catch (IllegalAccessException e) { + logger.error("Error checking RMI fields for: {}.{}", remoteClassObject.getKey(), field.getName(), e); + } finally { + field.setAccessible(prev); + } + } + } + + + // have to check the object hierarchy as well + Class superclass = remoteClassObject.getKey() + .getSuperclass(); + if (superclass != null && superclass != Object.class) { + classesToCheck.add(new AbstractMap.SimpleEntry, Object>(superclass, remoteClassObject.getValue())); + } + } + } + } catch (Exception e) { + logger.error("Error registering RMI class " + implementationClass, e); + } finally { + if (kryo != null) { + // we use kryo to create a new instance - so only return it on error or when it's done creating a new instance + serialization.returnKryo(kryo); + } + } + + return new RmiRegistration(interfaceClass, rmiId, callbackId, object); + } + + public + void runCallback(final Class interfaceClass, final int callbackId, final Object remoteObject, final Logger logger) { + RemoteObjectCallback callback = rmiRegistrationCallbacks.remove(callbackId); + + try { + //noinspection unchecked + callback.created(remoteObject); + } catch (Exception e) { + logger.error("Error getting or creating the remote object " + interfaceClass, e); + } + } + + /** + * Used by RMI by the LOCAL side when setting up the to fetch an object for the REMOTE side + * + * @return the registered ID for a specific object, or RmiBridge.INVALID_RMI if there was no ID. + */ + public + int getRegisteredId(final T object) { + // always check global before checking local, because less contention on the synchronization + int objectId = rmiGlobalBridge.getRegisteredId(object); + if (objectId != RmiBridge.INVALID_RMI) { + return objectId; + } + else { + // might return RmiBridge.INVALID_RMI; + return rmiLocalBridge.getRegisteredId(object); + } + } + + /** + * This is used by RMI for the REMOTE side, to get the implementation + * + * @param objectId this is the RMI object ID + */ + public + Object getImplementationObject(final int objectId) { + if (RmiBridge.isGlobal(objectId)) { + return rmiGlobalBridge.getRegisteredObject(objectId); + } else { + return rmiLocalBridge.getRegisteredObject(objectId); + } + } + + /** + * Warning. This is an advanced method. You should probably be using {@link Connection#createRemoteObject(Class, RemoteObjectCallback)} + *

+ *

+ * Returns a proxy object that implements the specified interface, and the methods invoked on the proxy object will be invoked + * remotely. + *

+ * Methods that return a value will throw {@link TimeoutException} if the response is not received with the {@link + * RemoteObject#setResponseTimeout(int) response timeout}. + *

+ * If {@link RemoteObject#setAsync(boolean) non-blocking} is false (the default), then methods that return a value must not be + * called from the update thread for the connection. An exception will be thrown if this occurs. Methods with a void return value can be + * called on the update thread. + *

+ * If a proxy returned from this method is part of an object graph sent over the network, the object graph on the receiving side will + * have the proxy object replaced with the registered object. + * + * @see RemoteObject + * @param rmiId this is the remote object ID (assigned by RMI). This is NOT the kryo registration ID + * @param iFace this is the RMI interface + */ + public + RemoteObject getProxyObject(final int rmiId, final Class iFace) { + if (iFace == null) { + throw new IllegalArgumentException("iface cannot be null."); + } + if (!iFace.isInterface()) { + throw new IllegalArgumentException("iface must be an interface."); + } + + // we want to have a connection specific cache of IDs + // because this is PER CONNECTION, there is no need for synchronize(), since there will not be any issues with concurrent + // access, but there WILL be issues with thread visibility because a different worker thread can be called for different connections + RemoteObject remoteObject = proxyIdCache.get(rmiId); + + if (remoteObject == null) { + // duplicates are fine, as they represent the same object (as specified by the ID) on the remote side. + + // the ACTUAL proxy is created in the connection impl. + RmiProxyHandler proxyObject = new RmiProxyHandler(this.connection, this, rmiId, iFace); + proxyListeners.add(proxyObject.getListener()); + + // This is the interface inheritance by the proxy object + Class[] temp = new Class[2]; + temp[0] = RemoteObject.class; + temp[1] = iFace; + + remoteObject = (RemoteObject) Proxy.newProxyInstance(RmiBridge.class.getClassLoader(), temp, proxyObject); + + proxyIdCache.put(rmiId, remoteObject); + } + + return remoteObject; + } + + public + Object fixupRmi(final ConnectionImpl connection, final Object message) { + // "local RMI" objects have to be modified, this part does that + return rmiHandler.normalMessages(this, message); + } +} diff --git a/src/dorkbox/network/rmi/ConnectionRmiSupport.java b/src/dorkbox/network/rmi/ConnectionRmiSupport.java index a5e088bc..903e116a 100644 --- a/src/dorkbox/network/rmi/ConnectionRmiSupport.java +++ b/src/dorkbox/network/rmi/ConnectionRmiSupport.java @@ -14,382 +14,29 @@ */ package dorkbox.network.rmi; -import java.io.IOException; -import java.lang.reflect.Field; -import java.lang.reflect.Proxy; -import java.util.AbstractMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.CopyOnWriteArrayList; - import org.slf4j.Logger; -import dorkbox.network.connection.Connection; import dorkbox.network.connection.ConnectionImpl; -import dorkbox.network.connection.KryoExtra; -import dorkbox.network.connection.Listener; -import dorkbox.network.connection.Listener.OnMessageReceived; -import dorkbox.network.serialization.NetworkSerializationManager; -import dorkbox.util.collections.LockFreeHashMap; -import dorkbox.util.collections.LockFreeIntMap; -import dorkbox.util.generics.ClassHelper; public -class ConnectionRmiSupport extends ConnectionSupport { - private final RmiBridge rmiGlobalBridge; - private final RmiBridge rmiLocalBridge; - private final RmiObjectHandler rmiHandler; +interface ConnectionRmiSupport { + void close(); - private final Map proxyIdCache; - private final List> proxyListeners; + void removeAllListeners(); - final ConnectionImpl connection; + void createRemoteObject(final ConnectionImpl connection, final Class interfaceClass, final RemoteObjectCallback callback); - private final LockFreeIntMap rmiRegistrationCallbacks; - private final Logger logger; - private volatile int rmiCallbackId = 0; + void getRemoteObject(final ConnectionImpl connection, final int objectId, final RemoteObjectCallback callback); + boolean manage(final ConnectionImpl connection, final Object message); - public - ConnectionRmiSupport(final ConnectionImpl connection, final RmiBridge rmiGlobalBridge, final RmiObjectHandler rmiHandler) { - this.connection = connection; + Object fixupRmi(final ConnectionImpl connection, final Object message); - if (rmiGlobalBridge == null || rmiHandler == null) { - throw new NullPointerException("RMI cannot be null if using RMI support!"); - } + int getRegisteredId(final T object); - this.rmiGlobalBridge = rmiGlobalBridge; - this.rmiHandler = rmiHandler; + void runCallback(final Class interfaceClass, final int callbackId, final Object remoteObject, final Logger logger); - logger = rmiGlobalBridge.logger; + RemoteObject getProxyObject(final int rmiId, final Class iFace); - // * @param executor - // * Sets the executor used to invoke methods when an invocation is received from a remote endpoint. By default, no - // * executor is set and invocations occur on the network thread, which should not be blocked for long, May be null. - - rmiLocalBridge = new RmiBridge(logger, false); - - - proxyIdCache = new LockFreeHashMap(); - proxyListeners = new CopyOnWriteArrayList>(); - rmiRegistrationCallbacks = new LockFreeIntMap(); - } - - public - void close() { - // proxy listeners are cleared in the removeAll() call (which happens BEFORE close) - proxyIdCache.clear(); - - rmiRegistrationCallbacks.clear(); - } - - /** - * This will remove the invoke and invoke response listeners for this remote object - */ - public - void removeAllListeners() { - proxyListeners.clear(); - } - - public - void createRemoteObject(final ConnectionImpl connection, final Class interfaceClass, final RemoteObjectCallback callback) { - if (!interfaceClass.isInterface()) { - throw new IllegalArgumentException("Cannot create a proxy for RMI access. It must be an interface."); - } - - // because this is PER CONNECTION, there is no need for synchronize(), since there will not be any issues with concurrent - // access, but there WILL be issues with thread visibility because a different worker thread can be called for different connections - //noinspection NonAtomicOperationOnVolatileField - int nextRmiCallbackId = rmiCallbackId++; - rmiRegistrationCallbacks.put(nextRmiCallbackId, callback); - RmiRegistration message = new RmiRegistration(interfaceClass, RmiBridge.INVALID_RMI, nextRmiCallbackId); - - // We use a callback to notify us when the object is ready. We can't "create this on the fly" because we - // have to wait for the object to be created + ID to be assigned on the remote system BEFORE we can create the proxy instance here. - - // this means we are creating a NEW object on the server, bound access to only this connection - connection.send(message).flush(); - } - - public - void getRemoteObject(final ConnectionImpl connection, final int objectId, final RemoteObjectCallback callback) { - if (objectId < 0) { - throw new IllegalStateException("Object ID cannot be < 0"); - } - if (objectId >= RmiBridge.INVALID_RMI) { - throw new IllegalStateException("Object ID cannot be >= " + RmiBridge.INVALID_RMI); - } - - Class iFaceClass = ClassHelper.getGenericParameterAsClassForSuperClass(RemoteObjectCallback.class, callback.getClass(), 0); - - // because this is PER CONNECTION, there is no need for synchronize(), since there will not be any issues with concurrent - // access, but there WILL be issues with thread visibility because a different worker thread can be called for different connections - //noinspection NonAtomicOperationOnVolatileField - int nextRmiCallbackId = rmiCallbackId++; - rmiRegistrationCallbacks.put(nextRmiCallbackId, callback); - RmiRegistration message = new RmiRegistration(iFaceClass, objectId, nextRmiCallbackId); - - // We use a callback to notify us when the object is ready. We can't "create this on the fly" because we - // have to wait for the object to be created + ID to be assigned on the remote system BEFORE we can create the proxy instance here. - - // this means we are getting an EXISTING object on the server, bound access to only this connection - connection.send(message).flush(); - } - - /** - * Manages the RMI stuff for a connection. - */ - public - boolean manage(final ConnectionImpl connection, final Object message) { - if (message instanceof InvokeMethod) { - NetworkSerializationManager serialization = connection.getEndPoint().getSerialization(); - - InvokeMethod invokeMethod = rmiHandler.getInvokeMethod(serialization, connection, (InvokeMethod) message); - - int objectID = invokeMethod.objectID; - - // have to make sure to get the correct object (global vs local) - // This is what is overridden when registering interfaces/classes for RMI. - // objectID is the interface ID, and this returns the implementation ID. - final Object target = getImplementationObject(objectID); - - if (target == null) { - logger.warn("Ignoring remote invocation request for unknown object ID: {}", objectID); - - return true; // maybe false? - } - - try { - InvokeMethodResult result = RmiBridge.invoke(connection, target, invokeMethod, logger); - if (result != null) { - // System.err.println("Sending: " + invokeMethod.responseID); - connection.send(result).flush(); - } - - } catch (IOException e) { - logger.error("Unable to invoke method.", e); - } - - return true; - } - else if (message instanceof InvokeMethodResult) { - for (Listener.OnMessageReceived proxyListener : proxyListeners) { - proxyListener.received(connection, (InvokeMethodResult) message); - } - return true; - } - else if (message instanceof RmiRegistration) { - rmiHandler.registration(this, connection, (RmiRegistration) message); - return true; - } - - // not the correct type - return false; - } - - /** - * For network connections, the interface class kryo ID == implementation class kryo ID, so they switch automatically. - * For local connections, we have to switch it appropriately in the LocalRmiProxy - */ - public - RmiRegistration createNewRmiObject(final NetworkSerializationManager serialization, final Class interfaceClass, final Class implementationClass, final int callbackId, final Logger logger) { - KryoExtra kryo = null; - Object object = null; - int rmiId = 0; - - try { - kryo = serialization.takeKryo(); - - // because the INTERFACE is what is registered with kryo (not the impl) we have to temporarily permit unregistered classes (which have an ID of -1) - // so we can cache the instantiator for this class. - boolean registrationRequired = kryo.isRegistrationRequired(); - - kryo.setRegistrationRequired(false); - - // this is what creates a new instance of the impl class, and stores it as an ID. - object = kryo.newInstance(implementationClass); - - if (registrationRequired) { - // only if it's different should we call this again. - kryo.setRegistrationRequired(true); - } - - - rmiId = rmiLocalBridge.register(object); - - if (rmiId == RmiBridge.INVALID_RMI) { - // this means that there are too many RMI ids (either global or connection specific!) - object = null; - } - else { - // if we are invalid, skip going over fields that might also be RMI objects, BECAUSE our object will be NULL! - - // the @Rmi annotation allows an RMI object to have fields with objects that are ALSO RMI objects - LinkedList, Object>> classesToCheck = new LinkedList, Object>>(); - classesToCheck.add(new AbstractMap.SimpleEntry, Object>(implementationClass, object)); - - - Map.Entry, Object> remoteClassObject; - while (!classesToCheck.isEmpty()) { - remoteClassObject = classesToCheck.removeFirst(); - - // we have to check the IMPLEMENTATION for any additional fields that will have proxy information. - // we use getDeclaredFields() + walking the object hierarchy, so we get ALL the fields possible (public + private). - for (Field field : remoteClassObject.getKey() - .getDeclaredFields()) { - if (field.getAnnotation(Rmi.class) != null) { - final Class type = field.getType(); - - if (!type.isInterface()) { - // the type must be an interface, otherwise RMI cannot create a proxy object - logger.error("Error checking RMI fields for: {}.{} -- It is not an interface!", - remoteClassObject.getKey(), - field.getName()); - continue; - } - - - boolean prev = field.isAccessible(); - field.setAccessible(true); - final Object o; - try { - o = field.get(remoteClassObject.getValue()); - - rmiLocalBridge.register(o); - classesToCheck.add(new AbstractMap.SimpleEntry, Object>(type, o)); - } catch (IllegalAccessException e) { - logger.error("Error checking RMI fields for: {}.{}", remoteClassObject.getKey(), field.getName(), e); - } finally { - field.setAccessible(prev); - } - } - } - - - // have to check the object hierarchy as well - Class superclass = remoteClassObject.getKey() - .getSuperclass(); - if (superclass != null && superclass != Object.class) { - classesToCheck.add(new AbstractMap.SimpleEntry, Object>(superclass, remoteClassObject.getValue())); - } - } - } - } catch (Exception e) { - logger.error("Error registering RMI class " + implementationClass, e); - } finally { - if (kryo != null) { - // we use kryo to create a new instance - so only return it on error or when it's done creating a new instance - serialization.returnKryo(kryo); - } - } - - return new RmiRegistration(interfaceClass, rmiId, callbackId, object); - } - - public - void runCallback(final Class interfaceClass, final int callbackId, final Object remoteObject, final Logger logger) { - RemoteObjectCallback callback = rmiRegistrationCallbacks.remove(callbackId); - - try { - //noinspection unchecked - callback.created(remoteObject); - } catch (Exception e) { - logger.error("Error getting or creating the remote object " + interfaceClass, e); - } - } - - /** - * Used by RMI by the LOCAL side when setting up the to fetch an object for the REMOTE side - * - * @return the registered ID for a specific object, or RmiBridge.INVALID_RMI if there was no ID. - */ - public - int getRegisteredId(final T object) { - // always check global before checking local, because less contention on the synchronization - int objectId = rmiGlobalBridge.getRegisteredId(object); - if (objectId != RmiBridge.INVALID_RMI) { - return objectId; - } - else { - // might return RmiBridge.INVALID_RMI; - return rmiLocalBridge.getRegisteredId(object); - } - } - - /** - * This is used by RMI for the REMOTE side, to get the implementation - * - * @param objectId this is the RMI object ID - */ - public - Object getImplementationObject(final int objectId) { - if (RmiBridge.isGlobal(objectId)) { - return rmiGlobalBridge.getRegisteredObject(objectId); - } else { - return rmiLocalBridge.getRegisteredObject(objectId); - } - } - - /** - * Warning. This is an advanced method. You should probably be using {@link Connection#createRemoteObject(Class, RemoteObjectCallback)} - *

- *

- * Returns a proxy object that implements the specified interface, and the methods invoked on the proxy object will be invoked - * remotely. - *

- * Methods that return a value will throw {@link TimeoutException} if the response is not received with the {@link - * RemoteObject#setResponseTimeout(int) response timeout}. - *

- * If {@link RemoteObject#setAsync(boolean) non-blocking} is false (the default), then methods that return a value must not be - * called from the update thread for the connection. An exception will be thrown if this occurs. Methods with a void return value can be - * called on the update thread. - *

- * If a proxy returned from this method is part of an object graph sent over the network, the object graph on the receiving side will - * have the proxy object replaced with the registered object. - * - * @see RemoteObject - * @param rmiId this is the remote object ID (assigned by RMI). This is NOT the kryo registration ID - * @param iFace this is the RMI interface - */ - public - RemoteObject getProxyObject(final int rmiId, final Class iFace) { - if (iFace == null) { - throw new IllegalArgumentException("iface cannot be null."); - } - if (!iFace.isInterface()) { - throw new IllegalArgumentException("iface must be an interface."); - } - - // we want to have a connection specific cache of IDs - // because this is PER CONNECTION, there is no need for synchronize(), since there will not be any issues with concurrent - // access, but there WILL be issues with thread visibility because a different worker thread can be called for different connections - RemoteObject remoteObject = proxyIdCache.get(rmiId); - - if (remoteObject == null) { - // duplicates are fine, as they represent the same object (as specified by the ID) on the remote side. - - // the ACTUAL proxy is created in the connection impl. - RmiProxyHandler proxyObject = new RmiProxyHandler(this.connection, this, rmiId, iFace); - proxyListeners.add(proxyObject.getListener()); - - // This is the interface inheritance by the proxy object - Class[] temp = new Class[2]; - temp[0] = RemoteObject.class; - temp[1] = iFace; - - remoteObject = (RemoteObject) Proxy.newProxyInstance(RmiBridge.class.getClassLoader(), temp, proxyObject); - - proxyIdCache.put(rmiId, remoteObject); - } - - return remoteObject; - } - - public - Object fixupRmi(final ConnectionImpl connection, final Object message) { - // "local RMI" objects have to be modified, this part does that - return rmiHandler.normalMessages(this, message); - } + Object getImplementationObject(final int objectId); } diff --git a/src/dorkbox/network/rmi/InvokeMethodSerializer.java b/src/dorkbox/network/rmi/InvokeMethodSerializer.java index f72b8b16..4ee44b2a 100644 --- a/src/dorkbox/network/rmi/InvokeMethodSerializer.java +++ b/src/dorkbox/network/rmi/InvokeMethodSerializer.java @@ -127,7 +127,7 @@ class InvokeMethodSerializer extends Serializer { argStartIndex = 1; args = new Object[serializers.length + 1]; - args[0] = ((ConnectionRmiSupport) ((KryoExtra) kryo).rmiSupport).connection; + args[0] = ((ConnectionRmiImplSupport) ((KryoExtra) kryo).rmiSupport).connection; } else { method = cachedMethod.method; diff --git a/src/dorkbox/network/rmi/RmiNopConnection.java b/src/dorkbox/network/rmi/RmiNopConnection.java index ffff71aa..e7c01ad9 100644 --- a/src/dorkbox/network/rmi/RmiNopConnection.java +++ b/src/dorkbox/network/rmi/RmiNopConnection.java @@ -124,7 +124,7 @@ class RmiNopConnection implements Connection_ { @Override public - ConnectionSupport rmiSupport() { + ConnectionNoOpSupport rmiSupport() { return null; } diff --git a/src/dorkbox/network/rmi/RmiObjectHandler.java b/src/dorkbox/network/rmi/RmiObjectHandler.java index ab818094..a5a36420 100644 --- a/src/dorkbox/network/rmi/RmiObjectHandler.java +++ b/src/dorkbox/network/rmi/RmiObjectHandler.java @@ -23,7 +23,7 @@ interface RmiObjectHandler { InvokeMethod getInvokeMethod(final NetworkSerializationManager serialization, final ConnectionImpl connection, final InvokeMethod invokeMethod); - void registration(final ConnectionRmiSupport rmiSupport, final ConnectionImpl connection, final RmiRegistration message); + void registration(final ConnectionRmiImplSupport rmiSupport, final ConnectionImpl connection, final RmiRegistration message); - Object normalMessages(final ConnectionRmiSupport connection, final Object message); + Object normalMessages(final ConnectionRmiImplSupport connection, final Object message); } diff --git a/src/dorkbox/network/rmi/RmiObjectLocalHandler.java b/src/dorkbox/network/rmi/RmiObjectLocalHandler.java index a63785dd..9ea3c7fa 100644 --- a/src/dorkbox/network/rmi/RmiObjectLocalHandler.java +++ b/src/dorkbox/network/rmi/RmiObjectLocalHandler.java @@ -121,7 +121,7 @@ class RmiObjectLocalHandler implements RmiObjectHandler { @Override public - void registration(final ConnectionRmiSupport rmiSupport, final ConnectionImpl connection, final RmiRegistration registration) { + void registration(final ConnectionRmiImplSupport rmiSupport, final ConnectionImpl connection, final RmiRegistration registration) { // manage creating/getting/notifying this RMI object // these fields are ALWAYS present! @@ -198,7 +198,7 @@ class RmiObjectLocalHandler implements RmiObjectHandler { @SuppressWarnings("unchecked") @Override public - Object normalMessages(final ConnectionRmiSupport rmiSupport, final Object message) { + Object normalMessages(final ConnectionRmiImplSupport rmiSupport, final Object message) { // else, this was "just a local message" // because we NORMALLY pass around just the object (there is no serialization going on...) we have to explicitly check to see diff --git a/src/dorkbox/network/rmi/RmiObjectNetworkHandler.java b/src/dorkbox/network/rmi/RmiObjectNetworkHandler.java index 43ca9abb..8fb887ed 100644 --- a/src/dorkbox/network/rmi/RmiObjectNetworkHandler.java +++ b/src/dorkbox/network/rmi/RmiObjectNetworkHandler.java @@ -38,7 +38,7 @@ class RmiObjectNetworkHandler implements RmiObjectHandler { @Override public - void registration(final ConnectionRmiSupport rmiSupport, final ConnectionImpl connection, final RmiRegistration registration) { + void registration(final ConnectionRmiImplSupport rmiSupport, final ConnectionImpl connection, final RmiRegistration registration) { // manage creating/getting/notifying this RMI object // these fields are ALWAYS present! @@ -88,7 +88,7 @@ class RmiObjectNetworkHandler implements RmiObjectHandler { @Override public - Object normalMessages(final ConnectionRmiSupport connection, final Object message) { + Object normalMessages(final ConnectionRmiImplSupport connection, final Object message) { return message; } } diff --git a/src/dorkbox/network/rmi/RmiProxyHandler.java b/src/dorkbox/network/rmi/RmiProxyHandler.java index 1542b986..fcd577b8 100644 --- a/src/dorkbox/network/rmi/RmiProxyHandler.java +++ b/src/dorkbox/network/rmi/RmiProxyHandler.java @@ -72,7 +72,7 @@ class RmiProxyHandler implements InvocationHandler { private final boolean[] pendingResponses = new boolean[64]; private final ConnectionImpl connection; - private final ConnectionRmiSupport rmiSupport; + private final ConnectionRmiImplSupport rmiSupport; public final int rmiObjectId; // this is the RMI id public final int classId; // this is the KRYO class id @@ -104,7 +104,7 @@ class RmiProxyHandler implements InvocationHandler { * @param iFace this is the RMI interface */ public - RmiProxyHandler(final ConnectionImpl connection, final ConnectionRmiSupport rmiSupport, final int rmiId, final Class iFace) { + RmiProxyHandler(final ConnectionImpl connection, final ConnectionRmiImplSupport rmiSupport, final int rmiId, final Class iFace) { super(); this.connection = connection;