From db850f89bdb1975efeef92a32f0f30d515832731 Mon Sep 17 00:00:00 2001 From: nathan Date: Sun, 24 Sep 2017 23:30:17 +0200 Subject: [PATCH] Fixed RMI across multiple clients/servers. Code cleanup --- src/dorkbox/network/Client.java | 57 ++- src/dorkbox/network/Configuration.java | 9 - .../network/connection/Connection.java | 75 ++- .../network/connection/ConnectionImpl.java | 327 +++++++------ ...r.java => CryptoSerializationManager.java} | 244 +++++++--- src/dorkbox/network/connection/EndPoint.java | 41 +- .../network/connection/EndPointClient.java | 45 +- .../connection/ObjectRegistrationLatch.java | 24 - .../connection/RegisterRmiSystemListener.java | 2 +- .../connection/RegistrationWrapper.java | 2 +- .../connection/registration/Registration.java | 7 - .../RegistrationRemoteHandlerServerUDP.java | 24 +- .../network/pipeline/udp/KryoDecoderUdp.java | 18 +- src/dorkbox/network/rmi/CachedMethod.java | 34 +- .../rmi/InvocationHandlerSerializer.java | 10 +- .../network/rmi/OverriddenMethods.java | 18 +- .../network/rmi/RemoteObjectCallback.java | 12 + .../network/rmi/RemoteObjectSerializer.java | 3 +- .../{RmiBridge.java => RmiImplHandler.java} | 61 +-- ...ationHandler.java => RmiProxyHandler.java} | 73 +-- src/dorkbox/network/rmi/RmiRegistration.java | 46 +- src/dorkbox/network/rmi/TimeoutException.java | 2 +- .../util/CryptoSerializationManager.java | 10 +- .../network/util/RMISerializationManager.java | 49 -- .../network/util/RmiSerializationManager.java | 126 +++++ test/dorkbox/network/ChunkedDataIdleTest.java | 12 +- test/dorkbox/network/ClientSendTest.java | 8 +- test/dorkbox/network/ConnectionTest.java | 16 +- test/dorkbox/network/IdleTest.java | 18 +- .../network/LargeResizeBufferTest.java | 8 +- test/dorkbox/network/ListenerTest.java | 39 +- test/dorkbox/network/MultipleServerTest.java | 6 +- test/dorkbox/network/MultipleThreadTest.java | 4 +- test/dorkbox/network/PingPongLocalTest.java | 8 +- test/dorkbox/network/PingPongTest.java | 8 +- .../network/UnregisteredClassTest.java | 4 +- .../network/rmi/MessageWithTestObject.java | 11 + test/dorkbox/network/rmi/RmiGlobalTest.java | 258 +++++----- .../rmi/RmiSendObjectOverrideMethodTest.java | 51 +- .../network/rmi/RmiSendObjectTest.java | 46 +- test/dorkbox/network/rmi/RmiTest.java | 441 ++++++++---------- test/dorkbox/network/rmi/TestObject.java | 19 + test/dorkbox/network/rmi/TestObjectImpl.java | 98 ++++ .../network/rmi/multiJVM/TestClient.java | 142 ++++++ .../network/rmi/multiJVM/TestServer.java | 81 ++++ 45 files changed, 1604 insertions(+), 993 deletions(-) rename src/dorkbox/network/connection/{KryoCryptoSerializationManager.java => CryptoSerializationManager.java} (66%) delete mode 100644 src/dorkbox/network/connection/ObjectRegistrationLatch.java create mode 100644 src/dorkbox/network/rmi/RemoteObjectCallback.java rename src/dorkbox/network/rmi/{RmiBridge.java => RmiImplHandler.java} (90%) rename src/dorkbox/network/rmi/{RemoteObjectInvocationHandler.java => RmiProxyHandler.java} (89%) delete mode 100644 src/dorkbox/network/util/RMISerializationManager.java create mode 100644 src/dorkbox/network/util/RmiSerializationManager.java create mode 100644 test/dorkbox/network/rmi/MessageWithTestObject.java create mode 100644 test/dorkbox/network/rmi/TestObject.java create mode 100644 test/dorkbox/network/rmi/TestObjectImpl.java create mode 100644 test/dorkbox/network/rmi/multiJVM/TestClient.java create mode 100644 test/dorkbox/network/rmi/multiJVM/TestServer.java diff --git a/src/dorkbox/network/Client.java b/src/dorkbox/network/Client.java index 95d82519..3cbb04e5 100644 --- a/src/dorkbox/network/Client.java +++ b/src/dorkbox/network/Client.java @@ -31,6 +31,7 @@ import dorkbox.network.connection.registration.remote.RegistrationRemoteHandlerC import dorkbox.network.connection.registration.remote.RegistrationRemoteHandlerClientUDP; import dorkbox.network.connection.registration.remote.RegistrationRemoteHandlerClientUDT; import dorkbox.network.rmi.RemoteObject; +import dorkbox.network.rmi.RemoteObjectCallback; import dorkbox.network.rmi.TimeoutException; import dorkbox.network.util.udt.UdtEndpointProxy; import dorkbox.util.NamedThreadFactory; @@ -329,10 +330,10 @@ class Client extends EndPointClient implements Connecti } } - connection = this.connectionManager.getConnection0(); + // RMI methods are usually created during the connection phase. We should wait until they are finished + waitForRmi(connectionTimeout); } - @Override public boolean hasRemoteKeyChanged() { @@ -423,11 +424,12 @@ class Client extends EndPointClient implements Connecti } /** - * Returns a new proxy object that implements the specified interface. Methods invoked on the proxy object will be - * invoked remotely on the object with the specified ID in the ObjectSpace for the current connection. - *

- * This will request a registration ID from the remote endpoint, and will block until the object has been returned. - *

+ * Tells the remote connection to create a new proxy object that implements the specified interface. The methods on this object "map" + * to an object that is created remotely. + *

+ * The callback will be notified when the remote object has been created. + *

+ *

* Methods that return a value will throw {@link TimeoutException} if the response is not received with the * {@link RemoteObject#setResponseTimeout(int) response timeout}. *

@@ -437,39 +439,44 @@ class Client extends EndPointClient implements Connecti *

* If a proxy returned from this method is part of an object graph sent over the network, the object graph on the receiving side * will have the proxy object replaced with the registered (non-proxy) object. + *

+ * If one wishes to change the default behavior, cast the object to access the different methods. + * ie: `RemoteObject remoteObject = (RemoteObject) test;` * * @see RemoteObject */ @Override public - Iface getRemoteObject(final Class interfaceClass) throws IOException { - return this.connectionManager.getConnection0() - .getRemoteObject(interfaceClass); + void getRemoteObject(final Class interfaceClass, final RemoteObjectCallback callback) throws IOException { + this.connectionManager.getConnection0().getRemoteObject(interfaceClass, callback); } /** - * Returns a new proxy object implements the specified interface. Methods invoked on the proxy object will be invoked remotely on the - * object with the specified ID in the ObjectSpace for the current connection. + * Tells the remote connection to create a new proxy object that implements the specified interface. The methods on this object "map" + * to an object that is created remotely. + *

+ * The callback will be notified when the remote object has been created. + *

+ *

+ * Methods that return a value will throw {@link TimeoutException} if the response is not received with the + * {@link RemoteObject#setResponseTimeout(int) response timeout}. *

- * This will REUSE a registration ID from the remote endpoint, and will block until the object has been returned. + * If {@link RemoteObject#setAsync(boolean) non-blocking} is false (the default), then methods that return a value must + * not be called from the update thread for the connection. An exception will be thrown if this occurs. Methods with a + * void return value can be called on the update thread. *

- * Methods that return a value will throw {@link TimeoutException} if the response is not received with the {@link - * RemoteObject#setResponseTimeout(int) response timeout}. - *

- * If {@link RemoteObject#setAsync(boolean) non-blocking} is false (the default), then methods that return a value must not be - * called from the update thread for the connection. An exception will be thrown if this occurs. Methods with a void return value can be - * called on the update thread. - *

- * If a proxy returned from this method is part of an object graph sent over the network, the object graph on the receiving side will - * have the proxy object replaced with the registered (non-proxy) object. + * If a proxy returned from this method is part of an object graph sent over the network, the object graph on the receiving side + * will have the proxy object replaced with the registered (non-proxy) object. + *

+ * If one wishes to change the default behavior, cast the object to access the different methods. + * ie: `RemoteObject remoteObject = (RemoteObject) test;` * * @see RemoteObject */ @Override public - Iface getRemoteObject(final int objectId) throws IOException { - return this.connectionManager.getConnection0() - .getRemoteObject(objectId); + void getRemoteObject(final int objectId, final RemoteObjectCallback callback) throws IOException { + this.connectionManager.getConnection0().getRemoteObject(objectId, callback); } /** diff --git a/src/dorkbox/network/Configuration.java b/src/dorkbox/network/Configuration.java index 3c70ce94..6174fbc3 100644 --- a/src/dorkbox/network/Configuration.java +++ b/src/dorkbox/network/Configuration.java @@ -64,15 +64,6 @@ class Configuration { */ public CryptoSerializationManager serialization = null; - /** - * Enable remote method invocation (RMI) for this connection. There is additional overhead to using RMI. - *

- * Specifically, It costs at least 2 bytes more to use remote method invocation than just sending the parameters. If the method has a - * return value which is not {@link dorkbox.network.rmi.RemoteObject#setAsync(boolean) ignored}, an extra byte is written. If the - * type of a parameter is not final (primitives are final) then an extra byte is written for that parameter. - */ - public boolean rmiEnabled = false; - /** * Sets the executor used to invoke methods when an invocation is received from a remote endpoint. By default, no executor is set and * invocations occur on the network thread, which should not be blocked for long. diff --git a/src/dorkbox/network/connection/Connection.java b/src/dorkbox/network/connection/Connection.java index 9c752d41..0e6efedf 100644 --- a/src/dorkbox/network/connection/Connection.java +++ b/src/dorkbox/network/connection/Connection.java @@ -15,14 +15,15 @@ */ package dorkbox.network.connection; +import java.io.IOException; + import dorkbox.network.connection.bridge.ConnectionBridge; import dorkbox.network.connection.idle.IdleBridge; import dorkbox.network.connection.idle.IdleSender; import dorkbox.network.rmi.RemoteObject; +import dorkbox.network.rmi.RemoteObjectCallback; import dorkbox.network.rmi.TimeoutException; -import java.io.IOException; - @SuppressWarnings("unused") public interface Connection { @@ -95,59 +96,55 @@ interface Connection { void close(); /** - * Marks the connection to be closed as soon as possible. This is evaluated when the current - * thread execution returns to the network stack. + * Marks the connection to be closed as soon as possible. This is evaluated when the current thread execution returns to the network stack. */ void closeAsap(); /** - * Returns a new proxy object implements the specified interface. Methods invoked on the proxy object will be - * invoked remotely on the object with the specified ID in the ObjectSpace for the current connection. - *

- * This will request a registration ID from the remote endpoint, and will block until the object - * has been returned. - *

- * Methods that return a value will throw {@link TimeoutException} if the - * response is not received with the + * Tells the remote connection to create a new proxy object that implements the specified interface. The methods on this object "map" + * to an object that is created remotely. + *

+ * The callback will be notified when the remote object has been created. + *

+ *

+ * Methods that return a value will throw {@link TimeoutException} if the response is not received with the * {@link RemoteObject#setResponseTimeout(int) response timeout}. *

- * If {@link RemoteObject#setAsync(boolean) non-blocking} is false - * (the default), then methods that return a value must not be called from - * the update thread for the connection. An exception will be thrown if this - * occurs. Methods with a void return value can be called on the update - * thread. + * If {@link RemoteObject#setAsync(boolean) non-blocking} is false (the default), then methods that return a value must + * not be called from the update thread for the connection. An exception will be thrown if this occurs. Methods with a + * void return value can be called on the update thread. *

- * If a proxy returned from this method is part of an object graph sent over - * the network, the object graph on the receiving side will have the proxy - * object replaced with the registered (non-proxy) object. + * If a proxy returned from this method is part of an object graph sent over the network, the object graph on the receiving side + * will have the proxy object replaced with the registered (non-proxy) object. + * + * If one wishes to change the default behavior, cast the object to access the different methods. + * ie: `RemoteObject remoteObject = (RemoteObject) test;` * * @see RemoteObject */ - Iface createProxyObject(final Class remoteImplementationClass) throws IOException; - + void getRemoteObject(final Class interfaceClass, final RemoteObjectCallback callback) throws IOException; /** - * Returns a new proxy object implements the specified interface. Methods invoked on the proxy object will be - * invoked remotely on the object with the specified ID in the ObjectSpace for the current connection. - *

- * This will REUSE a registration ID from the remote endpoint, and will block until the object - * has been returned. - *

- * Methods that return a value will throw {@link TimeoutException} if the - * response is not received with the + * Tells the remote connection to create a new proxy object that implements the specified interface. The methods on this object "map" + * to an object that is created remotely. + *

+ * The callback will be notified when the remote object has been created. + *

+ *

+ * Methods that return a value will throw {@link TimeoutException} if the response is not received with the * {@link RemoteObject#setResponseTimeout(int) response timeout}. *

- * If {@link RemoteObject#setAsync(boolean) non-blocking} is false - * (the default), then methods that return a value must not be called from - * the update thread for the connection. An exception will be thrown if this - * occurs. Methods with a void return value can be called on the update - * thread. + * If {@link RemoteObject#setAsync(boolean) non-blocking} is false (the default), then methods that return a value must + * not be called from the update thread for the connection. An exception will be thrown if this occurs. Methods with a + * void return value can be called on the update thread. *

- * If a proxy returned from this method is part of an object graph sent over - * the network, the object graph on the receiving side will have the proxy - * object replaced with the registered (non-proxy) object. + * If a proxy returned from this method is part of an object graph sent over the network, the object graph on the receiving side + * will have the proxy object replaced with the registered (non-proxy) object. + *

+ * If one wishes to change the default behavior, cast the object to access the different methods. + * ie: `RemoteObject remoteObject = (RemoteObject) test;` * * @see RemoteObject */ - Iface getProxyObject(final int objectId) throws IOException; + void getRemoteObject(final int objectId, final RemoteObjectCallback callback) throws IOException; } diff --git a/src/dorkbox/network/connection/ConnectionImpl.java b/src/dorkbox/network/connection/ConnectionImpl.java index cb9ddf24..37ce1296 100644 --- a/src/dorkbox/network/connection/ConnectionImpl.java +++ b/src/dorkbox/network/connection/ConnectionImpl.java @@ -15,6 +15,19 @@ */ package dorkbox.network.connection; +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.LinkedList; +import java.util.Map; +import java.util.WeakHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import org.bouncycastle.crypto.params.ParametersWithIV; +import org.slf4j.Logger; + +import com.esotericsoftware.kryo.Registration; + import dorkbox.network.connection.bridge.ConnectionBridge; import dorkbox.network.connection.idle.IdleBridge; import dorkbox.network.connection.idle.IdleSender; @@ -27,8 +40,11 @@ import dorkbox.network.connection.wrapper.ChannelNull; import dorkbox.network.connection.wrapper.ChannelWrapper; import dorkbox.network.rmi.RMI; import dorkbox.network.rmi.RemoteObject; -import dorkbox.network.rmi.RmiBridge; +import dorkbox.network.rmi.RemoteObjectCallback; +import dorkbox.network.rmi.RmiImplHandler; import dorkbox.network.rmi.RmiRegistration; +import dorkbox.network.util.CryptoSerializationManager; +import dorkbox.util.collections.IntMap; import io.netty.channel.Channel; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; @@ -43,18 +59,6 @@ import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.Promise; -import org.bouncycastle.crypto.params.ParametersWithIV; -import org.slf4j.Logger; - -import java.io.IOException; -import java.lang.reflect.Field; -import java.util.Collections; -import java.util.LinkedList; -import java.util.Map; -import java.util.WeakHashMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; /** @@ -79,7 +83,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn private final AtomicBoolean messageInProgress = new AtomicBoolean(false); private ISessionManager sessionManager; - private ChannelWrapper channelWrapper; + private ChannelWrapper channelWrapper; private boolean isLoopback; private volatile PingFuture pingFuture = null; @@ -96,26 +100,28 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn // back to the network stack private boolean closeAsap = false; - private volatile ObjectRegistrationLatch objectRegistrationLatch; - private final Object remoteObjectLock = new Object(); - private final RmiBridge rmiBridge; - - private final Map proxyIdCache = Collections.synchronizedMap(new WeakHashMap(8)); - // The IV for AES-GCM must be 12 bytes, since it's 4 (salt) + 8 (external counter) + 4 (GCM counter) // The 12 bytes IV is created during connection registration, and during the AES-GCM crypto, we override the last 8 with this // counter, which is also transmitted as an optimized int. (which is why it starts at 0, so the transmitted bytes are small) private final AtomicLong aes_gcm_iv = new AtomicLong(0); + // + // RMI fields + // + private final RmiImplHandler rmiImplHandler; + private final Map proxyIdCache = new WeakHashMap(8); + private final IntMap rmiRegistrationCallbacks = new IntMap<>(); + private int rmiRegistrationID = 0; // protected by synchronized (rmiRegistrationCallbacks) + /** * All of the parameters can be null, when metaChannel wants to get the base class type */ - @SuppressWarnings("rawtypes") + @SuppressWarnings({"rawtypes", "unchecked"}) public - ConnectionImpl(final Logger logger, final EndPoint endPoint, final RmiBridge rmiBridge) { + ConnectionImpl(final Logger logger, final EndPoint endPoint, final RmiImplHandler rmiImplHandler) { this.logger = logger; this.endPoint = endPoint; - this.rmiBridge = rmiBridge; + this.rmiImplHandler = rmiImplHandler; } /** @@ -123,6 +129,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn *

* This happens BEFORE prep. */ + @SuppressWarnings("unchecked") void init(final ChannelWrapper channelWrapper, final ConnectionManager sessionManager) { this.sessionManager = sessionManager; this.channelWrapper = channelWrapper; @@ -155,6 +162,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn * because multiple protocols can be performing crypto AT THE SAME TIME, and so we have to make sure that operations don't * clobber each other */ + @Override public final ParametersWithIV getCryptoParameters() { return this.channelWrapper.cryptoParameters(); @@ -167,6 +175,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn * The 12 bytes IV is created during connection registration, and during the AES-GCM crypto, we override the last 8 with this * counter, which is also transmitted as an optimized int. (which is why it starts at 0, so the transmitted bytes are small) */ + @Override public final long getNextGcmSequence() { return aes_gcm_iv.getAndIncrement(); @@ -950,103 +959,145 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn // RMI methods // - - @SuppressWarnings({"UnnecessaryLocalVariable", "unchecked"}) - @Override - public final - Iface createProxyObject(final Class remoteImplementationClass) throws IOException { - // only one register can happen at a time - synchronized (remoteObjectLock) { - objectRegistrationLatch = new ObjectRegistrationLatch(); - - // since this synchronous, we want to wait for the response before we continue - // this means we are creating a NEW object on the server, bound access to only this connection - TCP(new RmiRegistration(remoteImplementationClass.getName())).flush(); - - //noinspection Duplicates + /** + * Internal call CLIENT ONLY. + *

+ * RMI methods are usually created during the connection phase. We should wait until they are finished + */ + void waitForRmi(final int connectionTimeout) { + synchronized (rmiRegistrationCallbacks) { try { - if (!objectRegistrationLatch.latch.await(2, TimeUnit.SECONDS)) { - final String errorMessage = "Timed out getting registration ID for: " + remoteImplementationClass; - logger.error(errorMessage); - throw new IOException(errorMessage); - } + rmiRegistrationCallbacks.wait(connectionTimeout); } catch (InterruptedException e) { - final String errorMessage = "Error getting registration ID for: " + remoteImplementationClass; - logger.error(errorMessage, e); - throw new IOException(errorMessage, e); + logger.error("Interrupted waiting for RMI to finish.", e); } - - // local var to prevent double hit on volatile field - final ObjectRegistrationLatch latch = objectRegistrationLatch; - if (latch.hasError) { - final String errorMessage = "Error getting registration ID for: " + remoteImplementationClass; - logger.error(errorMessage); - throw new IOException(errorMessage); - } - - return (Iface) latch.remoteObject; } } - @SuppressWarnings({"UnnecessaryLocalVariable", "unchecked"}) - @Override - public final - Iface getProxyObject(final int objectId) throws IOException { - // only one register can happen at a time - synchronized (remoteObjectLock) { - objectRegistrationLatch = new ObjectRegistrationLatch(); - - // since this synchronous, we want to wait for the response before we continue - // this means that we are ACCESSING a remote object on the server, the server checks GLOBAL, then LOCAL for this object - TCP(new RmiRegistration(objectId)).flush(); - - //noinspection Duplicates - try { - if (!objectRegistrationLatch.latch.await(2, TimeUnit.SECONDS)) { - final String errorMessage = "Timed out getting registration for ID: " + objectId; - logger.error(errorMessage); - throw new IOException(errorMessage); - } - } catch (InterruptedException e) { - final String errorMessage = "Error getting registration for ID: " + objectId; - logger.error(errorMessage, e); - throw new IOException(errorMessage, e); - } - - // local var to prevent double hit on volatile field - final ObjectRegistrationLatch latch = objectRegistrationLatch; - if (latch.hasError) { - final String errorMessage = "Error getting registration for ID: " + objectId; - logger.error(errorMessage); - throw new IOException(errorMessage); - } - - return (Iface) latch.remoteObject; + /** + * Internal call CLIENT ONLY. + *

+ * RMI methods are usually created during the connection phase. If there are none, we should unblock the waiting client.connect(). + */ + boolean rmiCallbacksIsEmpty() { + synchronized (rmiRegistrationCallbacks) { + return rmiRegistrationCallbacks.size == 0; } } + /** + * Internal call CLIENT ONLY. + *

+ * RMI methods are usually created during the connection phase. If there are none, we should unblock the waiting client.connect(). + */ + void rmiCallbacksNotify() { + synchronized (rmiRegistrationCallbacks) { + rmiRegistrationCallbacks.notify(); + } + } + + /** + * Internal call CLIENT ONLY. + *

+ * RMI methods are usually created during the connection phase. If there are none, we should unblock the waiting client.connect(). + */ + private + void rmiCallbacksNotifyIfEmpty() { + synchronized (rmiRegistrationCallbacks) { + if (rmiRegistrationCallbacks.size == 0) { + rmiRegistrationCallbacks.notify(); + } + } + } + + + @SuppressWarnings({"UnnecessaryLocalVariable", "unchecked", "Duplicates"}) + @Override + public final + void getRemoteObject(final Class interfaceClass, final RemoteObjectCallback callback) throws IOException { + if (!interfaceClass.isInterface()) { + throw new IllegalArgumentException("Cannot create a proxy for RMI access. It must be an interface."); + } + + RmiRegistration message; + + synchronized (rmiRegistrationCallbacks) { + int nextRmiID = rmiRegistrationID++; + rmiRegistrationCallbacks.put(nextRmiID, callback); + message = new RmiRegistration(interfaceClass, nextRmiID); + } + + // We use a callback to notify us when the object is ready. We can't "create this on the fly" because we + // have to wait for the object to be created + ID to be assigned on the remote system BEFORE we can create the proxy instance here. + + // this means we are creating a NEW object on the server, bound access to only this connection + TCP(message).flush(); + } + + @SuppressWarnings({"UnnecessaryLocalVariable", "unchecked", "Duplicates"}) + @Override + public final + void getRemoteObject(final int objectId, final RemoteObjectCallback callback) throws IOException { + RmiRegistration message; + + synchronized (rmiRegistrationCallbacks) { + int nextRmiID = rmiRegistrationID++; + rmiRegistrationCallbacks.put(nextRmiID, callback); + message = new RmiRegistration(objectId, nextRmiID); + } + + // We use a callback to notify us when the object is ready. We can't "create this on the fly" because we + // have to wait for the object to be created + ID to be assigned on the remote system BEFORE we can create the proxy instance here. + + // this means we are creating a NEW object on the server, bound access to only this connection + TCP(message).flush(); + } + + final void registerInternal(final ConnectionImpl connection, final RmiRegistration remoteRegistration) { - final String implementationClassName = remoteRegistration.remoteImplementationClass; + final Class interfaceClass = remoteRegistration.interfaceClass; + final int rmiID = remoteRegistration.rmiID; - - if (implementationClassName != null) { + if (interfaceClass != null) { // THIS IS ON THE REMOTE CONNECTION (where the object will really exist) // // CREATE a new ID, and register the ID and new object (must create a new one) in the object maps - Class implementationClass; - try { - implementationClass = Class.forName(implementationClassName); - } catch (Exception e) { - logger.error("Error registering RMI class " + implementationClassName, e); - connection.TCP(new RmiRegistration()).flush(); + // have to find the implementation from the specified interface + CryptoSerializationManager manager = getEndPoint().serializationManager; + KryoExtra kryo = manager.takeKryo(); + Registration registration = kryo.getRegistration(interfaceClass); + + + if (registration == null) { + // we use kryo to create a new instance - so only return it on error or when it's done creating a new instance + manager.returnKryo(kryo); + + logger.error("Error getting RMI class interface for " + interfaceClass); + connection.TCP(new RmiRegistration(rmiID)).flush(); + return; + } + + + implementationClass = manager.getRmiImpl(registration.getId()); + if (implementationClass == null) { + // we use kryo to create a new instance - so only return it on error or when it's done creating a new instance + manager.returnKryo(kryo); + + logger.error("Error getting RMI class implementation for " + interfaceClass); + connection.TCP(new RmiRegistration(rmiID)).flush(); return; } try { - final Object remotePrimaryObject = implementationClass.newInstance(); - rmiBridge.register(rmiBridge.nextObjectId(), remotePrimaryObject); + // this is what creates a new instance of the impl class, and stores it as an ID. + final Object remotePrimaryObject = kryo.newInstance(implementationClass); + + // we use kryo to create a new instance - so only return it on error or when it's done creating a new instance + manager.returnKryo(kryo); + + rmiImplHandler.register(rmiImplHandler.nextObjectId(), remotePrimaryObject); LinkedList remoteClasses = new LinkedList(); remoteClasses.add(new ClassObject(implementationClass, remotePrimaryObject)); @@ -1063,43 +1114,46 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn final Class type = field.getType(); - rmiBridge.register(rmiBridge.nextObjectId(), o); + rmiImplHandler.register(rmiImplHandler.nextObjectId(), o); remoteClasses.offerLast(new ClassObject(type, o)); } } } - connection.TCP(new RmiRegistration(remotePrimaryObject)).flush(); + connection.TCP(new RmiRegistration(remotePrimaryObject, rmiID)).flush(); } catch (Exception e) { - logger.error("Error registering RMI class " + implementationClassName, e); - connection.TCP(new RmiRegistration()).flush(); + logger.error("Error registering RMI class " + implementationClass, e); + connection.TCP(new RmiRegistration(rmiID)).flush(); } } - else if (remoteRegistration.remoteObjectId > RmiBridge.INVALID_RMI) { + else if (remoteRegistration.remoteObjectId > RmiImplHandler.INVALID_RMI) { // THIS IS ON THE REMOTE CONNECTION (where the object will really exist) // // GET a LOCAL rmi object, if none get a specific, GLOBAL rmi object (objects that are not bound to a single connection). Object object = getImplementationObject(remoteRegistration.remoteObjectId); if (object != null) { - connection.TCP(new RmiRegistration(object)).flush(); + connection.TCP(new RmiRegistration(object, rmiID)).flush(); } else { - connection.TCP(new RmiRegistration()).flush(); + connection.TCP(new RmiRegistration(rmiID)).flush(); } } else { - // THIS IS ON THE LOCAL CONNECTION (that sent the 'create proxy object') SIDE + // THIS IS ON THE LOCAL CONNECTION SIDE, which is the side that called 'getRemoteObject()' - // the next two use a local var, so that there isn't a double hit for volatile access - final ObjectRegistrationLatch latch = this.objectRegistrationLatch; - latch.hasError = remoteRegistration.hasError; + // this will be null if there was an error + Object remoteObject = remoteRegistration.remoteObject; - if (!remoteRegistration.hasError) { - latch.remoteObject = remoteRegistration.remoteObject; + RemoteObjectCallback callback; + synchronized (rmiRegistrationCallbacks) { + callback = rmiRegistrationCallbacks.remove(remoteRegistration.rmiID); } - // notify the original register that it may continue. We access the volatile field directly, so that it's members are updated - objectRegistrationLatch.latch.countDown(); + //noinspection unchecked + callback.created(remoteObject); + + // tell the client that we are finished with all RMI callbacks + rmiCallbacksNotifyIfEmpty(); } } @@ -1109,18 +1163,19 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn * @return the registered ID for a specific object. This is used by the "local" side when setting up the to fetch an object for the * "remote" side for RMI */ + @Override public int getRegisteredId(final T object) { // always check local before checking global, because less contention on the synchronization - RmiBridge globalRmiBridge = endPoint.globalRmiBridge; + RmiImplHandler globalRmiImplHandler = endPoint.globalRmiImplHandler; - if (globalRmiBridge == null) { - throw new NullPointerException("Unable to call 'getRegisteredId' when the globalRmiBridge is null!"); + if (globalRmiImplHandler == null) { + throw new NullPointerException("Unable to call 'getRegisteredId' when the globalRmiImplHandler is null!"); } - int object1 = globalRmiBridge.getRegisteredId(object); + int object1 = globalRmiImplHandler.getRegisteredId(object); if (object1 == Integer.MAX_VALUE) { - return rmiBridge.getRegisteredId(object); + return rmiImplHandler.getRegisteredId(object); } else { return object1; } @@ -1131,36 +1186,40 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn * * @param type must be the interface the proxy will bind to */ + @Override public RemoteObject getProxyObject(final int objectID, final Class type) { - // we want to have a connection specific cache of IDs, using weak references. - // because this is PER CONNECTION, this is safe. - RemoteObject remoteObject = proxyIdCache.get(objectID); + synchronized (proxyIdCache) { + // we want to have a connection specific cache of IDs, using weak references. + // because this is PER CONNECTION, this is safe. + RemoteObject remoteObject = proxyIdCache.get(objectID); - if (remoteObject == null) { - // duplicates are fine, as they represent the same object (as specified by the ID) on the remote side. - remoteObject = rmiBridge.createProxyObject(this, objectID, type); - proxyIdCache.put(objectID, remoteObject); + if (remoteObject == null) { + // duplicates are fine, as they represent the same object (as specified by the ID) on the remote side. + remoteObject = rmiImplHandler.createProxyObject(this, objectID, type); + proxyIdCache.put(objectID, remoteObject); + } + + return remoteObject; } - - return remoteObject; } /** * This is used by RMI for the REMOTE side, to get the implementation */ + @Override public Object getImplementationObject(final int objectID) { - if (RmiBridge.isGlobal(objectID)) { - RmiBridge globalRmiBridge = endPoint.globalRmiBridge; + if (RmiImplHandler.isGlobal(objectID)) { + RmiImplHandler globalRmiImplHandler = endPoint.globalRmiImplHandler; - if (globalRmiBridge == null) { + if (globalRmiImplHandler == null) { throw new NullPointerException("Unable to call 'getRegisteredId' when the gloablRmiBridge is null!"); } - return globalRmiBridge.getRegisteredObject(objectID); + return globalRmiImplHandler.getRegisteredObject(objectID); } else { - return rmiBridge.getRegisteredObject(objectID); + return rmiImplHandler.getRegisteredObject(objectID); } } } diff --git a/src/dorkbox/network/connection/KryoCryptoSerializationManager.java b/src/dorkbox/network/connection/CryptoSerializationManager.java similarity index 66% rename from src/dorkbox/network/connection/KryoCryptoSerializationManager.java rename to src/dorkbox/network/connection/CryptoSerializationManager.java index ca938064..8d874450 100644 --- a/src/dorkbox/network/connection/KryoCryptoSerializationManager.java +++ b/src/dorkbox/network/connection/CryptoSerializationManager.java @@ -38,6 +38,7 @@ import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; import com.esotericsoftware.kryo.serializers.CollectionSerializer; import com.esotericsoftware.kryo.serializers.FieldSerializer; +import com.esotericsoftware.kryo.util.IntMap; import com.esotericsoftware.kryo.util.MapReferenceResolver; import dorkbox.network.connection.ping.PingMessage; @@ -49,7 +50,7 @@ import dorkbox.network.rmi.InvokeMethodResult; import dorkbox.network.rmi.InvokeMethodSerializer; import dorkbox.network.rmi.RemoteObjectSerializer; import dorkbox.network.rmi.RmiRegistration; -import dorkbox.network.util.CryptoSerializationManager; +import dorkbox.network.util.RmiSerializationManager; import dorkbox.objectPool.ObjectPool; import dorkbox.objectPool.PoolableObject; import dorkbox.util.Property; @@ -69,9 +70,9 @@ import io.netty.buffer.ByteBuf; */ @SuppressWarnings({"unused", "StaticNonFinalField"}) public -class KryoCryptoSerializationManager implements CryptoSerializationManager { +class CryptoSerializationManager implements dorkbox.network.util.CryptoSerializationManager, RmiSerializationManager { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KryoCryptoSerializationManager.class); + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CryptoSerializationManager.class); /** * Specify if we want KRYO to use unsafe memory for serialization, or to use the ASM backend. Unsafe memory use is WAY faster, but is @@ -83,20 +84,20 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { private static final String OBJECT_ID = "objectID"; public static - KryoCryptoSerializationManager DEFAULT() { + CryptoSerializationManager DEFAULT() { return DEFAULT(true, true); } public static - KryoCryptoSerializationManager DEFAULT(final boolean references, final boolean registrationRequired) { + CryptoSerializationManager DEFAULT(final boolean references, final boolean registrationRequired) { // ignore fields that have the "@IgnoreSerialization" annotation. Collection> marks = new ArrayList>(); marks.add(IgnoreSerialization.class); SerializerFactory disregardingFactory = new FieldAnnotationAwareSerializer.Factory(marks, true); - final KryoCryptoSerializationManager serializationManager = new KryoCryptoSerializationManager(references, - registrationRequired, - disregardingFactory); + final CryptoSerializationManager serializationManager = new CryptoSerializationManager(references, + registrationRequired, + disregardingFactory); serializationManager.register(PingMessage.class); serializationManager.register(byte[].class); @@ -105,7 +106,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { serializationManager.register(IESWithCipherParameters.class, new IesWithCipherParametersSerializer()); serializationManager.register(ECPublicKeyParameters.class, new EccPublicKeySerializer()); serializationManager.register(ECPrivateKeyParameters.class, new EccPrivateKeySerializer()); - serializationManager.register(dorkbox.network.connection.registration.Registration.class); + serializationManager.register(dorkbox.network.connection.registration.Registration.class); // must use full package name! // necessary for the transport of exceptions. serializationManager.register(ArrayList.class, new CollectionSerializer()); @@ -143,12 +144,18 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { this.id = id; } } - private static class RemoteClass { - private final Class ifaceClass; - private final Class implClass; + private static class RemoteIfaceClass { + private final Class ifaceClass; - RemoteClass(final Class ifaceClass, final Class implClass) { + RemoteIfaceClass(final Class ifaceClass) { this.ifaceClass = ifaceClass; + } + } + + private static class RemoteImplClass { + private final Class implClass; + + RemoteImplClass(final Class implClass) { this.implClass = implClass; } } @@ -156,18 +163,18 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { private boolean initialized = false; private final ObjectPool kryoPool; - // used by operations performed during kryo initialization, which are by default package access (since it's an anon-inner class - private final List> classesToRegister = new ArrayList>(); - private final List classSerializerToRegister = new ArrayList(); - private final List classSerializer2ToRegister = new ArrayList(); - private final List remoteClassToRegister = new ArrayList(); + // used by operations performed during kryo initialization, which are by default package access (since it's an anon-inner class) + // All registration MUST happen in-order of when the register(*) method was called, otherwise there are problems. + // Object checking is performed during actual registration. + private final List classesToRegister = new ArrayList(); - private boolean shouldInitRMI = false; + private boolean usesRmi = false; private InvokeMethodSerializer methodSerializer = null; private Serializer invocationSerializer = null; private RemoteObjectSerializer remoteObjectSerializer; - + // used to track which interface -> implementation, for use by RMI + private final IntMap> rmiInterfaceToImpl = new IntMap>(); /** * By default, the serialization manager will compress+encrypt data to connections with remote IPs, and only compress on the loopback IP @@ -194,13 +201,12 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { *

*/ public - KryoCryptoSerializationManager(final boolean references, final boolean registrationRequired, final SerializerFactory factory) { - + CryptoSerializationManager(final boolean references, final boolean registrationRequired, final SerializerFactory factory) { kryoPool = ObjectPool.NonBlockingSoftReference(new PoolableObject() { @Override public KryoExtra create() { - synchronized (KryoCryptoSerializationManager.this) { + synchronized (CryptoSerializationManager.this) { KryoExtra kryo = new KryoExtra(); // we HAVE to pre-allocate the KRYOs @@ -211,44 +217,58 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { kryo.setReferences(references); - for (Class clazz : classesToRegister) { - kryo.register(clazz); - } - - for (ClassSerializer classSerializer : classSerializerToRegister) { - kryo.register(classSerializer.clazz, classSerializer.serializer); - } - - for (ClassSerializer2 classSerializer : classSerializer2ToRegister) { - kryo.register(classSerializer.clazz, classSerializer.serializer, classSerializer.id); - } - - if (shouldInitRMI) { + if (usesRmi) { kryo.register(Class.class); kryo.register(RmiRegistration.class); kryo.register(InvokeMethod.class, methodSerializer); kryo.register(Object[].class); - // This has to be PER KRYO, + // This has to be for each kryo instance! InvocationResultSerializer resultSerializer = new InvocationResultSerializer(kryo); resultSerializer.removeField(OBJECT_ID); kryo.register(InvokeMethodResult.class, resultSerializer); kryo.register(InvocationHandler.class, invocationSerializer); + } - for (RemoteClass remoteClass : remoteClassToRegister) { - kryo.register(remoteClass.implClass, remoteObjectSerializer); - // After all common registrations, register OtherObjectImpl only on the server using the remote object interface ID. - // This causes OtherObjectImpl to be serialized as OtherObject. - int otherObjectID = kryo.getRegistration(remoteClass.implClass) - .getId(); + // All registration MUST happen in-order of when the register(*) method was called, otherwise there are problems. + for (Object clazz : classesToRegister) { + if (clazz instanceof Class) { + kryo.register((Class)clazz); + } + else if (clazz instanceof ClassSerializer) { + ClassSerializer classSerializer = (ClassSerializer) clazz; + kryo.register(classSerializer.clazz, classSerializer.serializer); + } + else if (clazz instanceof ClassSerializer2) { + ClassSerializer2 classSerializer = (ClassSerializer2) clazz; + kryo.register(classSerializer.clazz, classSerializer.serializer, classSerializer.id); + } + else if (clazz instanceof RemoteIfaceClass) { + // THIS IS DONE ON THE CLIENT + // "server" means the side of the connection that has the implementation details for the RMI object + // "client" means the side of the connection that accesses the "server" side object via a proxy object + // the client will NEVER send this object to the server. + // the server will ONLY send this object to the client + RemoteIfaceClass remoteIfaceClass = (RemoteIfaceClass) clazz; - // this overrides the 'otherObjectID' with the specified class/serializer, so that when we WRITE this ID, the impl is READ - kryo.register(remoteClass.ifaceClass, remoteObjectSerializer, otherObjectID); + // registers the interface, so that when it is read, it becomes a "magic" proxy object + kryo.register(remoteIfaceClass.ifaceClass, remoteObjectSerializer); + } + else if (clazz instanceof RemoteImplClass) { + // THIS IS DONE ON THE SERVER + // "server" means the side of the connection that has the implementation details for the RMI object + // "client" means the side of the connection that accesses the "server" side object via a proxy object + // the client will NEVER send this object to the server. + // the server will ONLY send this object to the client + RemoteImplClass remoteImplClass = (RemoteImplClass) clazz; - // we have to save the fact that we might have overridden methods - CachedMethod.registerOverridden(remoteClass.ifaceClass, remoteClass.implClass); + // registers the implementation, so that when it is written, it becomes a "magic" proxy object + int id = kryo.register(remoteImplClass.implClass, remoteObjectSerializer).getId(); + + // sets up the RMI, so when we receive the iface class from the client, we know what impl to use + rmiInterfaceToImpl.put(id, remoteImplClass.implClass); } } @@ -272,12 +292,17 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { */ @Override public synchronized - void register(Class clazz) { + RmiSerializationManager register(Class clazz) { if (initialized) { logger.warn("Serialization manager already initialized. Ignoring duplicate register(Class) call."); } + else if (clazz.isInterface()) { + throw new IllegalArgumentException("Cannot register an interface for serialization. It must be an implementation."); + } else { + classesToRegister.add(clazz); + } - classesToRegister.add(clazz); + return this; } /** @@ -289,13 +314,18 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { */ @Override public synchronized - void register(Class clazz, Serializer serializer) { + RmiSerializationManager register(Class clazz, Serializer serializer) { if (initialized) { logger.warn("Serialization manager already initialized. Ignoring duplicate register(Class, Serializer) call."); - return; + } + else if (clazz.isInterface()) { + throw new IllegalArgumentException("Cannot register an interface for serialization. It must be an implementation."); + } + else { + classesToRegister.add(new ClassSerializer(clazz, serializer)); } - classSerializerToRegister.add(new ClassSerializer(clazz, serializer)); + return this; } /** @@ -310,50 +340,119 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { */ @Override public synchronized - void register(Class clazz, Serializer serializer, int id) { + RmiSerializationManager register(Class clazz, Serializer serializer, int id) { if (initialized) { logger.warn("Serialization manager already initialized. Ignoring duplicate register(Class, Serializer, int) call."); - return; + } + else if (clazz.isInterface()) { + throw new IllegalArgumentException("Cannot register an interface for serialization. It must be an implementation."); + } + else { + classesToRegister.add(new ClassSerializer2(clazz, serializer, id)); } - classSerializer2ToRegister.add(new ClassSerializer2(clazz, serializer, id)); + return this; } /** - * Objects that are accessed over RMI, must be accessed via an interface. This method configures the serialization of an implementation - * to be serialized via the defined interface, as a RemoteObject (ie: proxy object). If the implementation class is ALREADY registered, - * then it's registration will be overwritten by this one + * @return the previously registered class + */ + private + Class getLastAddedClass() { + // get the previously registered class + Object obj = classesToRegister.get(classesToRegister.size() - 1); + if (obj instanceof Class) { + return (Class) obj; + } + else if (obj instanceof ClassSerializer) { + ClassSerializer classSerializer = (ClassSerializer) obj; + return classSerializer.clazz; + } + else if (obj instanceof ClassSerializer2) { + ClassSerializer2 classSerializer = (ClassSerializer2) obj; + return classSerializer.clazz; + } + + return null; + } + + private static + boolean testInterface(Class from, Class iface) { + for (Class intface : from.getInterfaces()) { + if (iface.equals(intface) || testInterface(intface, iface)) { + return true; + } + } + + return false; + } + + @Override + public synchronized + RmiSerializationManager registerRmiInterface(Class ifaceClass) { + if (initialized) { + logger.warn("Serialization manager already initialized. Ignoring duplicate registerRemote(Class, Class) call."); + return this; + } + + else if (!ifaceClass.isInterface()) { + throw new IllegalArgumentException("Cannot register an implementation for RMI access. It must be an interface."); + } + + usesRmi = true; + classesToRegister.add(new RemoteIfaceClass(ifaceClass)); + return this; + } + + /** + * This method overrides the interface -> implementation. This is so incoming proxy objects will get auto-changed into their correct + * implementation type, so this side of the connection knows what to do with the proxy object. + *

+ * NOTE: You must have ALREADY registered the implementation class. This method just enables the proxy magic. * - * @param ifaceClass The interface used to access the remote object - * @param implClass The implementation class of the interface + * @throws IllegalArgumentException if the iface/impl have previously been overridden */ @Override public synchronized - void registerRemote(final Class ifaceClass, final Class implClass) { + RmiSerializationManager registerRmiImplementation(Class ifaceClass, Class implClass) { if (initialized) { logger.warn("Serialization manager already initialized. Ignoring duplicate registerRemote(Class, Class) call."); - return; + return this; } - remoteClassToRegister.add(new RemoteClass(ifaceClass, implClass)); + usesRmi = true; + + // THIS IS DONE ON THE SERVER ONLY + // we have to save the fact that we might have overridden methods. + // will throw IllegalArgumentException if the iface/impl have previously been overridden + CachedMethod.registerOverridden(ifaceClass, implClass); + + classesToRegister.add(new RemoteImplClass(implClass)); + return this; } /** * Necessary to register classes for RMI, only called once if/when the RMI bridge is created. + * + * @return true if there are classes that have been registered for RMI */ @Override public synchronized - void initRmiSerialization() { + boolean initRmiSerialization() { + if (!usesRmi) { + return false; + } + if (initialized) { logger.warn("Serialization manager already initialized. Ignoring duplicate initRmiSerialization() call."); - return; + return true; } methodSerializer = new InvokeMethodSerializer(); invocationSerializer = new InvocationHandlerSerializer(logger); remoteObjectSerializer = new RemoteObjectSerializer(); - shouldInitRMI = true; + return true; } /** @@ -363,6 +462,8 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { public synchronized void finishInit() { initialized = true; + // initialize the kryo pool with at least 1 kryo instance. This ALSO makes sure that all of our class registration is done correctly + kryoPool.put(takeKryo()); } @Override @@ -371,6 +472,17 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { return initialized; } + /** + * Gets the RMI implementation based on the specified ID (which is the ID for the registered interface) + * + * @param objectId ID of the registered interface, which will map to the corresponding implementation. + * @return the implementation for the interface, or null + */ + @Override + public Class getRmiImpl(int objectId) { + return rmiInterfaceToImpl.get(objectId); + } + /** * @return takes a kryo instance from the pool. */ diff --git a/src/dorkbox/network/connection/EndPoint.java b/src/dorkbox/network/connection/EndPoint.java index f899015e..58238635 100644 --- a/src/dorkbox/network/connection/EndPoint.java +++ b/src/dorkbox/network/connection/EndPoint.java @@ -42,12 +42,12 @@ import dorkbox.network.connection.wrapper.ChannelNetworkWrapper; import dorkbox.network.connection.wrapper.ChannelWrapper; import dorkbox.network.pipeline.KryoEncoder; import dorkbox.network.pipeline.KryoEncoderCrypto; -import dorkbox.network.rmi.RmiBridge; +import dorkbox.network.rmi.RmiImplHandler; import dorkbox.network.store.NullSettingsStore; import dorkbox.network.store.SettingsStore; -import dorkbox.network.util.CryptoSerializationManager; import dorkbox.util.OS; import dorkbox.util.Property; +import dorkbox.util.SerializationManager; import dorkbox.util.crypto.CryptoECC; import dorkbox.util.entropy.Entropy; import dorkbox.util.exceptions.InitializationException; @@ -156,7 +156,7 @@ class EndPoint { protected final Class> type; protected final ConnectionManager connectionManager; - protected final CryptoSerializationManager serializationManager; + protected final dorkbox.network.util.CryptoSerializationManager serializationManager; protected final RegistrationWrapper registrationWrapper; protected final Object shutdownInProgress = new Object(); @@ -164,7 +164,7 @@ class EndPoint { final ECPublicKeyParameters publicKey; final SecureRandom secureRandom; - final RmiBridge globalRmiBridge; + final RmiImplHandler globalRmiImplHandler; private final CountDownLatch blockUntilDone = new CountDownLatch(1); @@ -221,14 +221,11 @@ class EndPoint { if (options.serialization != null) { this.serializationManager = options.serialization; } else { - this.serializationManager = KryoCryptoSerializationManager.DEFAULT(); + this.serializationManager = CryptoSerializationManager.DEFAULT(); } - rmiEnabled = options.rmiEnabled; - if (rmiEnabled) { - // setup our RMI serialization managers. Can only be called once - serializationManager.initRmiSerialization(); - } + // setup our RMI serialization managers. Can only be called once + rmiEnabled = serializationManager.initRmiSerialization(); rmiExecutor = options.rmiExecutor; @@ -325,10 +322,10 @@ class EndPoint { if (this.rmiEnabled) { // these register the listener for registering a class implementation for RMI (internal use only) this.connectionManager.add(new RegisterRmiSystemListener()); - this.globalRmiBridge = new RmiBridge(logger, options.rmiExecutor, true); + this.globalRmiImplHandler = new RmiImplHandler(logger, options.rmiExecutor, true); } else { - this.globalRmiBridge = null; + this.globalRmiImplHandler = null; } serializationManager.finishInit(); @@ -423,7 +420,7 @@ class EndPoint { * Returns the serialization wrapper if there is an object type that needs to be added outside of the basics. */ public - CryptoSerializationManager getSerialization() { + SerializationManager getSerialization() { return this.serializationManager; } @@ -437,8 +434,8 @@ class EndPoint { * @return a new network connection */ protected - ConnectionImpl newConnection(final Logger logger, final EndPoint endPoint, final RmiBridge rmiBridge) { - return new ConnectionImpl(logger, endPoint, rmiBridge); + ConnectionImpl newConnection(final Logger logger, final EndPoint endPoint, final RmiImplHandler rmiImplHandler) { + return new ConnectionImpl(logger, endPoint, rmiImplHandler); } /** @@ -453,9 +450,9 @@ class EndPoint { Connection connection0(MetaChannel metaChannel) { ConnectionImpl connection; - RmiBridge rmiBridge = null; + RmiImplHandler rmiImplHandler = null; if (metaChannel != null && rmiEnabled) { - rmiBridge = new RmiBridge(logger, rmiExecutor, false); + rmiImplHandler = new RmiImplHandler(logger, rmiExecutor, false); } // setup the extras needed by the network connection. @@ -464,7 +461,7 @@ class EndPoint { if (metaChannel != null) { ChannelWrapper wrapper; - connection = newConnection(logger, this, rmiBridge); + connection = newConnection(logger, this, rmiImplHandler); metaChannel.connection = connection; if (metaChannel.localChannel != null) { @@ -482,10 +479,10 @@ class EndPoint { // now initialize the connection channels with whatever extra info they might need. connection.init(wrapper, (ConnectionManager) this.connectionManager); - if (rmiBridge != null) { + if (rmiImplHandler != null) { // notify our remote object space that it is able to receive method calls. connection.listeners() - .add(rmiBridge.getListener()); + .add(rmiImplHandler.getListener()); } } else { @@ -803,8 +800,8 @@ class EndPoint { */ public int createGlobalObject(final T globalObject) { - int globalObjectId = globalRmiBridge.nextObjectId(); - globalRmiBridge.register(globalObjectId, globalObject); + int globalObjectId = globalRmiImplHandler.nextObjectId(); + globalRmiImplHandler.register(globalObjectId, globalObject); return globalObjectId; } } diff --git a/src/dorkbox/network/connection/EndPointClient.java b/src/dorkbox/network/connection/EndPointClient.java index e0d7fe4d..835d6b0d 100644 --- a/src/dorkbox/network/connection/EndPointClient.java +++ b/src/dorkbox/network/connection/EndPointClient.java @@ -15,6 +15,13 @@ */ package dorkbox.network.connection; +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import org.slf4j.Logger; + import dorkbox.network.Client; import dorkbox.network.Configuration; import dorkbox.network.connection.bridge.ConnectionBridge; @@ -22,12 +29,6 @@ import dorkbox.util.exceptions.InitializationException; import dorkbox.util.exceptions.SecurityException; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; -import org.slf4j.Logger; - -import java.io.IOException; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; /** * This serves the purpose of making sure that specific methods are not available to the end user. @@ -38,10 +39,12 @@ class EndPointClient extends EndPoint implements Runnab protected C connection; protected final Object registrationLock = new Object(); + protected final AtomicInteger connectingBootstrap = new AtomicInteger(0); protected List bootstraps = new LinkedList(); protected volatile int connectionTimeout = 5000; // default protected volatile boolean registrationComplete = false; + private volatile boolean rmiInitializationComplete = false; private volatile ConnectionBridge connectionBridgeFlushAlways; @@ -186,6 +189,11 @@ class EndPointClient extends EndPoint implements Runnab } }; + //noinspection unchecked + this.connection = (C) connection; + + // check if there were any RMI callbacks during the connect phase. + rmiInitializationComplete = connection.rmiCallbacksIsEmpty(); // notify the registration we are done! synchronized (this.registrationLock) { @@ -193,6 +201,21 @@ class EndPointClient extends EndPoint implements Runnab } } + /** + * Internal call. + *

+ * RMI methods are usually created during the connection phase. We should wait until they are finished, but ONLY if there is + * something we need to wait for. + * + * This is called AFTER registration is finished. + */ + protected + void waitForRmi(final int connectionTimeout) { + if (!rmiInitializationComplete && connection instanceof ConnectionImpl) { + ((ConnectionImpl) connection).waitForRmi(connectionTimeout); + } + } + /** * Expose methods to send objects to a destination. *

@@ -224,6 +247,11 @@ class EndPointClient extends EndPoint implements Runnab this.registrationLock.notify(); } registrationComplete = false; + + // Always unblock the waiting client.connect(). + if (connection instanceof ConnectionImpl) { + ((ConnectionImpl) connection).rmiCallbacksNotify(); + } } /** @@ -233,6 +261,11 @@ class EndPointClient extends EndPoint implements Runnab synchronized (this.registrationLock) { this.registrationLock.notify(); } + + // Always unblock the waiting client.connect(). + if (connection instanceof ConnectionImpl) { + ((ConnectionImpl) connection).rmiCallbacksNotify(); + } stop(); } } diff --git a/src/dorkbox/network/connection/ObjectRegistrationLatch.java b/src/dorkbox/network/connection/ObjectRegistrationLatch.java deleted file mode 100644 index 278b4ed4..00000000 --- a/src/dorkbox/network/connection/ObjectRegistrationLatch.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright 2010 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dorkbox.network.connection; - -import java.util.concurrent.CountDownLatch; - -class ObjectRegistrationLatch { - final CountDownLatch latch = new CountDownLatch(1); - Object remoteObject; - boolean hasError = false; -} diff --git a/src/dorkbox/network/connection/RegisterRmiSystemListener.java b/src/dorkbox/network/connection/RegisterRmiSystemListener.java index a293d1a1..151bd75f 100644 --- a/src/dorkbox/network/connection/RegisterRmiSystemListener.java +++ b/src/dorkbox/network/connection/RegisterRmiSystemListener.java @@ -25,7 +25,7 @@ class RegisterRmiSystemListener implements Listener.OnMessageReceived implements UdpServer { */ public boolean rmiEnabled() { - return endPoint.globalRmiBridge != null; + return endPoint.globalRmiImplHandler != null; } public diff --git a/src/dorkbox/network/connection/registration/Registration.java b/src/dorkbox/network/connection/registration/Registration.java index 3f22c14e..5a04392e 100644 --- a/src/dorkbox/network/connection/registration/Registration.java +++ b/src/dorkbox/network/connection/registration/Registration.java @@ -23,13 +23,6 @@ import org.bouncycastle.crypto.params.IESParameters; */ public class Registration { - public static final byte notAdroid = (byte) 0; - public static final byte android = (byte) 1; - - - // signals which serialization is possible. If they match, then UNSAFE can be used (except android. it always must use ASM) - public byte connectionType; - public ECPublicKeyParameters publicKey; public IESParameters eccParameters; public byte[] aesKey; diff --git a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServerUDP.java b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServerUDP.java index 49d5b147..d48d4071 100644 --- a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServerUDP.java +++ b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServerUDP.java @@ -15,16 +15,22 @@ */ package dorkbox.network.connection.registration.remote; +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.List; + +import org.slf4j.Logger; + import dorkbox.network.Broadcast; import dorkbox.network.connection.Connection; import dorkbox.network.connection.ConnectionImpl; +import dorkbox.network.connection.CryptoSerializationManager; import dorkbox.network.connection.EndPoint; -import dorkbox.network.connection.KryoCryptoSerializationManager; import dorkbox.network.connection.RegistrationWrapper; import dorkbox.network.connection.registration.MetaChannel; import dorkbox.network.connection.registration.Registration; import dorkbox.network.connection.wrapper.UdpWrapper; -import dorkbox.network.util.CryptoSerializationManager; import dorkbox.util.bytes.OptimizeUtilsByteArray; import dorkbox.util.crypto.CryptoAES; import io.netty.buffer.ByteBuf; @@ -34,12 +40,6 @@ import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.socket.DatagramPacket; import io.netty.handler.codec.MessageToMessageCodec; -import org.slf4j.Logger; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.util.List; @Sharable public @@ -50,13 +50,13 @@ class RegistrationRemoteHandlerServerUDP extends MessageTo private final org.slf4j.Logger logger; private final ByteBuf discoverResponseBuffer; private final RegistrationWrapper registrationWrapper; - private final CryptoSerializationManager serializationManager; + private final dorkbox.network.util.CryptoSerializationManager serializationManager; public RegistrationRemoteHandlerServerUDP(final String name, final RegistrationWrapper registrationWrapper, - final CryptoSerializationManager serializationManager) { + final dorkbox.network.util.CryptoSerializationManager serializationManager) { final String name1 = name + " Registration-UDP-Server"; this.logger = org.slf4j.LoggerFactory.getLogger(name1); this.registrationWrapper = registrationWrapper; @@ -170,9 +170,9 @@ class RegistrationRemoteHandlerServerUDP extends MessageTo // registration is the ONLY thing NOT encrypted Logger logger2 = this.logger; RegistrationWrapper registrationWrapper2 = this.registrationWrapper; - CryptoSerializationManager serializationManager2 = this.serializationManager; + dorkbox.network.util.CryptoSerializationManager serializationManager2 = this.serializationManager; - if (KryoCryptoSerializationManager.isEncrypted(message)) { + if (CryptoSerializationManager.isEncrypted(message)) { // we need to FORWARD this message "down the pipeline". ConnectionImpl connection = registrationWrapper2.getServerUDP(udpRemoteAddress); diff --git a/src/dorkbox/network/pipeline/udp/KryoDecoderUdp.java b/src/dorkbox/network/pipeline/udp/KryoDecoderUdp.java index bf78ec11..ec91cb6c 100644 --- a/src/dorkbox/network/pipeline/udp/KryoDecoderUdp.java +++ b/src/dorkbox/network/pipeline/udp/KryoDecoderUdp.java @@ -15,26 +15,26 @@ */ package dorkbox.network.pipeline.udp; -import dorkbox.network.connection.KryoCryptoSerializationManager; -import dorkbox.network.util.CryptoSerializationManager; +import java.io.IOException; +import java.util.List; + +import org.slf4j.LoggerFactory; + +import dorkbox.network.connection.CryptoSerializationManager; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.socket.DatagramPacket; import io.netty.handler.codec.MessageToMessageDecoder; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.List; @Sharable public class KryoDecoderUdp extends MessageToMessageDecoder { - private final CryptoSerializationManager serializationManager; + private final dorkbox.network.util.CryptoSerializationManager serializationManager; public - KryoDecoderUdp(CryptoSerializationManager serializationManager) { + KryoDecoderUdp(dorkbox.network.util.CryptoSerializationManager serializationManager) { this.serializationManager = serializationManager; } @@ -48,7 +48,7 @@ class KryoDecoderUdp extends MessageToMessageDecoder { // there is a REMOTE possibility that UDP traffic BEAT the TCP registration traffic, which means that THIS packet // COULD be encrypted! - if (KryoCryptoSerializationManager.isEncrypted(data)) { + if (CryptoSerializationManager.isEncrypted(data)) { String message = "Encrypted UDP packet received before registration complete."; LoggerFactory.getLogger(this.getClass()).error(message); throw new IOException(message); diff --git a/src/dorkbox/network/rmi/CachedMethod.java b/src/dorkbox/network/rmi/CachedMethod.java index d3e9b886..d550d7fe 100644 --- a/src/dorkbox/network/rmi/CachedMethod.java +++ b/src/dorkbox/network/rmi/CachedMethod.java @@ -34,19 +34,6 @@ */ package dorkbox.network.rmi; -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.Serializer; -import com.esotericsoftware.kryo.util.IdentityMap; -import com.esotericsoftware.kryo.util.Util; -import com.esotericsoftware.reflectasm.MethodAccess; -import dorkbox.network.connection.Connection; -import dorkbox.network.connection.EndPoint; -import dorkbox.network.connection.KryoExtra; -import dorkbox.network.util.RMISerializationManager; -import dorkbox.util.ClassHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.Modifier; @@ -56,6 +43,21 @@ import java.util.Comparator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.util.IdentityMap; +import com.esotericsoftware.kryo.util.Util; +import com.esotericsoftware.reflectasm.MethodAccess; + +import dorkbox.network.connection.Connection; +import dorkbox.network.connection.EndPoint; +import dorkbox.network.connection.KryoExtra; +import dorkbox.network.util.RmiSerializationManager; +import dorkbox.util.ClassHelper; + public class CachedMethod { private static final Logger logger = LoggerFactory.getLogger(CachedMethod.class); @@ -114,7 +116,7 @@ class CachedMethod { // type will be likely be the interface public static - CachedMethod[] getMethods(final RMISerializationManager serializationManager, final Class type) { + CachedMethod[] getMethods(final RmiSerializationManager serializationManager, final Class type) { CachedMethod[] cachedMethods = methodCache.get(type); if (cachedMethods != null) { return cachedMethods; @@ -315,6 +317,10 @@ class CachedMethod { /** * Called by the SerializationManager, so that RMI classes that are overridden for serialization purposes, can check to see if certain * methods need to be overridden. + *

+ * NOTE: It is CRITICAL that this is unique per JVM, otherwise unexpected things can happen. + * + * @throws IllegalArgumentException if the iface/impl have previously been overridden */ public static void registerOverridden(final Class ifaceClass, final Class implClass) { diff --git a/src/dorkbox/network/rmi/InvocationHandlerSerializer.java b/src/dorkbox/network/rmi/InvocationHandlerSerializer.java index 117fa6f6..3108381a 100644 --- a/src/dorkbox/network/rmi/InvocationHandlerSerializer.java +++ b/src/dorkbox/network/rmi/InvocationHandlerSerializer.java @@ -15,14 +15,16 @@ */ package dorkbox.network.rmi; +import java.lang.reflect.Proxy; + +import org.slf4j.Logger; + import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.Serializer; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; -import dorkbox.network.connection.KryoExtra; -import org.slf4j.Logger; -import java.lang.reflect.Proxy; +import dorkbox.network.connection.KryoExtra; public class InvocationHandlerSerializer extends Serializer { @@ -36,7 +38,7 @@ class InvocationHandlerSerializer extends Serializer { @Override public void write(Kryo kryo, Output output, Object object) { - RemoteObjectInvocationHandler handler = (RemoteObjectInvocationHandler) Proxy.getInvocationHandler(object); + RmiProxyHandler handler = (RmiProxyHandler) Proxy.getInvocationHandler(object); output.writeInt(handler.objectID, true); } diff --git a/src/dorkbox/network/rmi/OverriddenMethods.java b/src/dorkbox/network/rmi/OverriddenMethods.java index d7d35bb6..aab55cca 100644 --- a/src/dorkbox/network/rmi/OverriddenMethods.java +++ b/src/dorkbox/network/rmi/OverriddenMethods.java @@ -15,10 +15,10 @@ */ package dorkbox.network.rmi; -import com.esotericsoftware.kryo.util.IdentityMap; - import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import com.esotericsoftware.kryo.util.IdentityMap; + /** * Uses the "single writer principle" for fast access */ @@ -67,10 +67,20 @@ class OverriddenMethods { return identityMap.get(type); } + /** + * @throws IllegalArgumentException if the iface/impl have previously been overridden + */ // synchronized to make sure only one writer at a time public synchronized void set(final Class ifaceClass, final Class implClass) { - this.overriddenMethods.put(ifaceClass, implClass); - this.overriddenReverseMethods.put(implClass, ifaceClass); + Class a = this.overriddenMethods.put(ifaceClass, implClass); + Class b = this.overriddenReverseMethods.put(implClass, ifaceClass); + + // this MUST BE UNIQUE per JVM, otherwise unexpected things can happen. + if (a != null || b != null) { + throw new IllegalArgumentException("Unable to override interface (" + ifaceClass + ") and implementation (" + implClass + ") " + + "because they have already been overridden by something else. It is critical that they are" + + " both unique per JVM"); + } } } diff --git a/src/dorkbox/network/rmi/RemoteObjectCallback.java b/src/dorkbox/network/rmi/RemoteObjectCallback.java new file mode 100644 index 00000000..65164cdb --- /dev/null +++ b/src/dorkbox/network/rmi/RemoteObjectCallback.java @@ -0,0 +1,12 @@ +package dorkbox.network.rmi; + +/** + * + */ +public +interface RemoteObjectCallback { + /** + * @param remoteObject the remote object (as a proxy object) or null if there was an error + */ + void created(Iface remoteObject); +} diff --git a/src/dorkbox/network/rmi/RemoteObjectSerializer.java b/src/dorkbox/network/rmi/RemoteObjectSerializer.java index 5f902302..14e118db 100644 --- a/src/dorkbox/network/rmi/RemoteObjectSerializer.java +++ b/src/dorkbox/network/rmi/RemoteObjectSerializer.java @@ -38,10 +38,11 @@ import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.Serializer; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; + import dorkbox.network.connection.KryoExtra; /** - * Serializes an object registered with the RmiBridge so the receiving side gets a {@link RemoteObject} proxy rather than the bytes for the + * Serializes an object registered with the RmiImplHandler so the receiving side gets a {@link RemoteObject} proxy rather than the bytes for the * serialized object. * * @author Nathan Sweet diff --git a/src/dorkbox/network/rmi/RmiBridge.java b/src/dorkbox/network/rmi/RmiImplHandler.java similarity index 90% rename from src/dorkbox/network/rmi/RmiBridge.java rename to src/dorkbox/network/rmi/RmiImplHandler.java index 65a0980a..218a8514 100644 --- a/src/dorkbox/network/rmi/RmiBridge.java +++ b/src/dorkbox/network/rmi/RmiImplHandler.java @@ -34,14 +34,6 @@ */ package dorkbox.network.rmi; -import com.esotericsoftware.kryo.util.IntMap; -import dorkbox.network.connection.Connection; -import dorkbox.network.connection.ConnectionImpl; -import dorkbox.network.connection.EndPoint; -import dorkbox.network.connection.Listener; -import dorkbox.util.collections.ObjectIntMap; -import org.slf4j.Logger; - import java.io.IOException; import java.lang.reflect.Proxy; import java.util.Arrays; @@ -51,13 +43,24 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; +import org.slf4j.Logger; + +import com.esotericsoftware.kryo.util.IntMap; + +import dorkbox.network.connection.Connection; +import dorkbox.network.connection.ConnectionImpl; +import dorkbox.network.connection.EndPoint; +import dorkbox.network.connection.Listener; +import dorkbox.network.util.RmiSerializationManager; +import dorkbox.util.collections.ObjectIntMap; + /** * Allows methods on objects to be invoked remotely over TCP, UDP, UDT, or LOCAL. Local connections ignore TCP/UDP/UDT requests, and perform * object transformation (because there is no serialization occurring) using a series of weak hashmaps. *

*

- * Objects are {@link dorkbox.network.util.RMISerializationManager#registerRemote(Class, Class)}, and endpoint connections can then {@link - * Connection#createProxyObject(Class)} for the registered objects. + * Objects are {@link RmiSerializationManager#registerRmiInterface(Class)}, and endpoint connections can then {@link + * Connection#getRemoteObject(Class)} for the registered objects. *

* It costs at least 2 bytes more to use remote method invocation than just sending the parameters. If the method has a return value which * is not {@link RemoteObject#setAsync(boolean) ignored}, an extra byte is written. If the type of a parameter is not final (note that @@ -84,7 +87,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; * @author Nathan Sweet , Nathan Robinson */ public final -class RmiBridge { +class RmiImplHandler { public static final int INVALID_RMI = 0; static final int returnValueMask = 1 << 7; static final int returnExceptionMask = 1 << 6; @@ -102,7 +105,7 @@ class RmiBridge { return (objectId & 1) != 0; } - // the name of who created this RmiBridge + // the name of who created this RmiImplHandler private final org.slf4j.Logger logger; // we start at 1, because 0 (INVALID_RMI) means we access connection only objects @@ -127,7 +130,7 @@ class RmiBridge { final Object target = connection.getImplementationObject(objectID); if (target == null) { - Logger logger2 = RmiBridge.this.logger; + Logger logger2 = RmiImplHandler.this.logger; if (logger2.isWarnEnabled()) { logger2.warn("Ignoring remote invocation request for unknown object ID: {}", objectID); } @@ -135,7 +138,7 @@ class RmiBridge { return; } - Executor executor2 = RmiBridge.this.executor; + Executor executor2 = RmiImplHandler.this.executor; if (executor2 == null) { try { invoke(connection, target, invokeMethod); @@ -160,18 +163,18 @@ class RmiBridge { }; /** - * Creates an RmiBridge with no connections. Connections must be {@link RmiBridge#register(int, Object)} added to allow the remote end + * Creates an RmiImplHandler with no connections. Connections must be {@link RmiImplHandler#register(int, Object)} added to allow the remote end * of the connections to access objects in this ObjectSpace. * * @param executor * Sets the executor used to invoke methods when an invocation is received from a remote endpoint. By default, no * executor is set and invocations occur on the network thread, which should not be blocked for long, May be null. * @param isGlobal - * specify if this RmiBridge is a "global" bridge, meaning connections will prefer objects from this bridge instead of + * specify if this RmiImplHandler is a "global" bridge, meaning connections will prefer objects from this bridge instead of * the connection-local bridge. */ public - RmiBridge(final org.slf4j.Logger logger, final Executor executor, final boolean isGlobal) { + RmiImplHandler(final org.slf4j.Logger logger, final Executor executor, final boolean isGlobal) { this.logger = logger; this.executor = executor; @@ -194,7 +197,7 @@ class RmiBridge { /** * Invokes the method on the object and, if necessary, sends the result back to the connection that made the invocation request. This - * method is invoked on the update thread of the {@link EndPoint} for this RmiBridge and unless an executor has been set. + * method is invoked on the update thread of the {@link EndPoint} for this RmiImplHandler and unless an executor has been set. * * @param connection * The remote side of this connection requested the invocation. @@ -295,13 +298,13 @@ class RmiBridge { int value = rmiObjectIdCounter.getAndAdd(2); if (value > MAX_RMI_VALUE) { rmiObjectIdCounter.set(MAX_RMI_VALUE); // prevent wrapping by spammy callers - logger.error("RMI next value has exceeded maximum limits in RmiBridge!"); + logger.error("RMI next value has exceeded maximum limits in RmiImplHandler!"); } return value; } /** - * Registers an object to allow the remote end of the RmiBridge connections to access it using the specified ID. + * Registers an object to allow the remote end of the RmiImplHandler connections to access it using the specified ID. * * @param objectID * Must not be Integer.MAX_VALUE. @@ -316,7 +319,7 @@ class RmiBridge { throw new IllegalArgumentException("object cannot be null."); } - WriteLock writeLock = RmiBridge.this.objectLock.writeLock(); + WriteLock writeLock = RmiImplHandler.this.objectLock.writeLock(); writeLock.lock(); this.idToObject.put(objectID, object); @@ -331,12 +334,12 @@ class RmiBridge { } /** - * Removes an object. The remote end of the RmiBridge connection will no longer be able to access it. + * Removes an object. The remote end of the RmiImplHandler connection will no longer be able to access it. */ @SuppressWarnings("AutoBoxing") public void remove(int objectID) { - WriteLock writeLock = RmiBridge.this.objectLock.writeLock(); + WriteLock writeLock = RmiImplHandler.this.objectLock.writeLock(); writeLock.lock(); Object object = this.idToObject.remove(objectID); @@ -353,12 +356,12 @@ class RmiBridge { } /** - * Removes an object. The remote end of the RmiBridge connection will no longer be able to access it. + * Removes an object. The remote end of the RmiImplHandler connection will no longer be able to access it. */ @SuppressWarnings("AutoBoxing") public void remove(Object object) { - WriteLock writeLock = RmiBridge.this.objectLock.writeLock(); + WriteLock writeLock = RmiImplHandler.this.objectLock.writeLock(); writeLock.lock(); if (!this.idToObject.containsValue(object, true)) { @@ -409,12 +412,12 @@ class RmiBridge { } /** - * Warning. This is an advanced method. You should probably be using {@link Connection#createProxyObject(Class)}. + * Warning. This is an advanced method. You should probably be using {@link Connection#getRemoteObject(Class, RemoteObjectCallback)} *

*

* Returns a proxy object that implements the specified interfaces. Methods invoked on the proxy object will be invoked remotely on the * object with the specified ID in the ObjectSpace for the specified connection. If the remote end of the connection has not {@link - * RmiBridge#register(int, Object)} added the connection to the ObjectSpace, the remote method invocations will be ignored. + * RmiImplHandler#register(int, Object)} added the connection to the ObjectSpace, the remote method invocations will be ignored. *

* Methods that return a value will throw {@link TimeoutException} if the response is not received with the {@link * RemoteObject#setResponseTimeout(int) response timeout}. @@ -441,8 +444,8 @@ class RmiBridge { temp[0] = RemoteObject.class; temp[1] = iface; - return (RemoteObject) Proxy.newProxyInstance(RmiBridge.class.getClassLoader(), + return (RemoteObject) Proxy.newProxyInstance(RmiImplHandler.class.getClassLoader(), temp, - new RemoteObjectInvocationHandler(connection, objectID)); + new RmiProxyHandler(connection, objectID)); } } diff --git a/src/dorkbox/network/rmi/RemoteObjectInvocationHandler.java b/src/dorkbox/network/rmi/RmiProxyHandler.java similarity index 89% rename from src/dorkbox/network/rmi/RemoteObjectInvocationHandler.java rename to src/dorkbox/network/rmi/RmiProxyHandler.java index 2f341190..8b6a42e4 100644 --- a/src/dorkbox/network/rmi/RemoteObjectInvocationHandler.java +++ b/src/dorkbox/network/rmi/RmiProxyHandler.java @@ -35,12 +35,6 @@ package dorkbox.network.rmi; -import dorkbox.network.connection.Connection; -import dorkbox.network.connection.EndPoint; -import dorkbox.network.util.RMISerializationManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.IOException; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; @@ -49,11 +43,18 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import dorkbox.network.connection.Connection; +import dorkbox.network.connection.EndPoint; +import dorkbox.network.util.RmiSerializationManager; + /** * Handles network communication when methods are invoked on a proxy. */ -class RemoteObjectInvocationHandler implements InvocationHandler { - private static final Logger logger = LoggerFactory.getLogger(RemoteObjectInvocationHandler.class); +class RmiProxyHandler implements InvocationHandler { + private final Logger logger; private final ReentrantLock lock = new ReentrantLock(); private final Condition responseCondition = this.lock.newCondition(); @@ -80,12 +81,16 @@ class RemoteObjectInvocationHandler implements InvocationHandler { private Byte lastResponseID; private byte nextResponseId = (byte) 1; - RemoteObjectInvocationHandler(final Connection connection, final int objectID) { + RmiProxyHandler(final Connection connection, final int objectID) { super(); + this.connection = connection; this.objectID = objectID; this.proxyString = ""; + logger = LoggerFactory.getLogger(connection.getEndPoint().getName() + ":" + this.getClass().getSimpleName()); + + this.responseListener = new RemoteInvocationResponse() { @Override public @@ -106,16 +111,16 @@ class RemoteObjectInvocationHandler implements InvocationHandler { // logger.trace("{} received data: {} with id ({})", connection, invokeMethodResult.result, invokeMethodResult.responseID); synchronized (this) { - if (RemoteObjectInvocationHandler.this.pendingResponses[responseID]) { - RemoteObjectInvocationHandler.this.responseTable[responseID] = invokeMethodResult; + if (RmiProxyHandler.this.pendingResponses[responseID]) { + RmiProxyHandler.this.responseTable[responseID] = invokeMethodResult; } } - RemoteObjectInvocationHandler.this.lock.lock(); + RmiProxyHandler.this.lock.lock(); try { - RemoteObjectInvocationHandler.this.responseCondition.signalAll(); + RmiProxyHandler.this.responseCondition.signalAll(); } finally { - RemoteObjectInvocationHandler.this.lock.unlock(); + RmiProxyHandler.this.lock.unlock(); } } }; @@ -201,10 +206,8 @@ class RemoteObjectInvocationHandler implements InvocationHandler { return proxyString; } - final Logger logger1 = RemoteObjectInvocationHandler.logger; - EndPoint endPoint = this.connection.getEndPoint(); - final RMISerializationManager serializationManager = endPoint.getSerialization(); + final RmiSerializationManager serializationManager = (RmiSerializationManager) endPoint.getSerialization(); InvokeMethod invokeMethod = new InvokeMethod(); invokeMethod.objectID = this.objectID; @@ -245,14 +248,17 @@ class RemoteObjectInvocationHandler implements InvocationHandler { if (invokeMethod.cachedMethod == null) { String msg = "Method not found: " + method; - logger1.error(msg); + logger.error(msg); return msg; } byte responseID = (byte) 0; - // An invocation doesn't need a response is if it's async and no return values or exceptions are wanted back. - boolean ignoreResponse = this.isAsync && !(this.transmitReturnValue || this.transmitExceptions); + // An invocation doesn't need a response is if it's + // VOID return type + // ASYNC and no return values or exceptions are wanted back + Class returnType = method.getReturnType(); + boolean ignoreResponse = returnType == void.class || this.isAsync && !(this.transmitReturnValue || this.transmitExceptions); if (ignoreResponse) { invokeMethod.responseData = (byte) 0; // 0 means do not respond. } @@ -260,7 +266,7 @@ class RemoteObjectInvocationHandler implements InvocationHandler { synchronized (this) { // Increment the response counter and put it into the low bits of the responseID. responseID = this.nextResponseId++; - if (this.nextResponseId > RmiBridge.responseIdMask) { + if (this.nextResponseId > RmiImplHandler.responseIdMask) { this.nextResponseId = (byte) 1; } this.pendingResponses[responseID] = true; @@ -268,15 +274,15 @@ class RemoteObjectInvocationHandler implements InvocationHandler { // Pack other data into the high bits. byte responseData = responseID; if (this.transmitReturnValue) { - responseData |= (byte) RmiBridge.returnValueMask; + responseData |= (byte) RmiImplHandler.returnValueMask; } if (this.transmitExceptions) { - responseData |= (byte) RmiBridge.returnExceptionMask; + responseData |= (byte) RmiImplHandler.returnExceptionMask; } invokeMethod.responseData = responseData; } - // Sends our invokeMethod to the remote connection, which the RmiBridge listens for + // Sends our invokeMethod to the remote connection, which the RmiImplHandler listens for if (this.udp) { this.connection.send() .UDP(invokeMethod) @@ -293,22 +299,23 @@ class RemoteObjectInvocationHandler implements InvocationHandler { .flush(); } - if (logger1.isTraceEnabled()) { + if (logger.isTraceEnabled()) { String argString = ""; if (args != null) { argString = Arrays.deepToString(args); argString = argString.substring(1, argString.length() - 1); } - logger1.trace(this.connection + " sent: " + method.getDeclaringClass() + logger.trace(this.connection + " sent: " + method.getDeclaringClass() .getSimpleName() + "#" + method.getName() + "(" + argString + ")"); } - this.lastResponseID = (byte) (invokeMethod.responseData & RmiBridge.responseIdMask); - + this.lastResponseID = (byte) (invokeMethod.responseData & RmiImplHandler.responseIdMask); + // 0 means respond immediately because it's + // VOID return type + // ASYNC and no return values or exceptions are wanted back if (this.isAsync) { - Class returnType = method.getReturnType(); if (returnType.isPrimitive()) { if (returnType == int.class) { return 0; @@ -334,9 +341,15 @@ class RemoteObjectInvocationHandler implements InvocationHandler { if (returnType == double.class) { return 0.0d; } + if (returnType == void.class) { + return 0.0d; + } } return null; } + else if (returnType == void.class) { + return null; + } try { Object result = waitForResponse(this.lastResponseID); @@ -457,7 +470,7 @@ class RemoteObjectInvocationHandler implements InvocationHandler { if (getClass() != obj.getClass()) { return false; } - RemoteObjectInvocationHandler other = (RemoteObjectInvocationHandler) obj; + RmiProxyHandler other = (RmiProxyHandler) obj; return this.objectID == other.objectID; } } diff --git a/src/dorkbox/network/rmi/RmiRegistration.java b/src/dorkbox/network/rmi/RmiRegistration.java index 8b0c7aec..31df50f3 100644 --- a/src/dorkbox/network/rmi/RmiRegistration.java +++ b/src/dorkbox/network/rmi/RmiRegistration.java @@ -21,32 +21,48 @@ package dorkbox.network.rmi; public class RmiRegistration { public Object remoteObject; - public String remoteImplementationClass; - public boolean hasError; + public Class interfaceClass; // this is used to get specific, GLOBAL rmi objects (objects that are not bound to a single connection) public int remoteObjectId; - public + public int rmiID; + + @SuppressWarnings("unused") + private RmiRegistration() { - hasError = true; + // for serialization } + // When requesting a new remote object to be created. + // SENT FROM "client" -> "server" public - RmiRegistration(final String remoteImplementationClass) { - this.remoteImplementationClass = remoteImplementationClass; - hasError = false; + RmiRegistration(final Class interfaceClass, final int rmiID) { + this.interfaceClass = interfaceClass; + this.rmiID = rmiID; } + // When requesting a new remote object to be created. + // SENT FROM "client" -> "server" public - RmiRegistration(final Object remoteObject) { - this.remoteObject = remoteObject; - hasError = false; - } - - public - RmiRegistration(final int remoteObjectId) { + RmiRegistration(final int remoteObjectId, final int rmiID) { this.remoteObjectId = remoteObjectId; - hasError = false; + this.rmiID = rmiID; + } + + + // When there was an error creating the remote object. + // SENT FROM "server" -> "client" + public + RmiRegistration(final int rmiID) { + this.rmiID = rmiID; + } + + // This is when we successfully created a new object + // SENT FROM "server" -> "client" + public + RmiRegistration(final Object remoteObject, final int rmiID) { + this.remoteObject = remoteObject; + this.rmiID = rmiID; } } diff --git a/src/dorkbox/network/rmi/TimeoutException.java b/src/dorkbox/network/rmi/TimeoutException.java index 883527f5..75e942cc 100644 --- a/src/dorkbox/network/rmi/TimeoutException.java +++ b/src/dorkbox/network/rmi/TimeoutException.java @@ -41,7 +41,7 @@ import java.io.IOException; * RemoteObject#setResponseTimeout(int) response timeout}. * * @author Nathan Sweet - * @see dorkbox.network.connection.Connection#createProxyObject(Class) + * @see dorkbox.network.connection.Connection#createRemoteObject(Class) */ public class TimeoutException extends IOException { diff --git a/src/dorkbox/network/util/CryptoSerializationManager.java b/src/dorkbox/network/util/CryptoSerializationManager.java index 69035b16..411352a2 100644 --- a/src/dorkbox/network/util/CryptoSerializationManager.java +++ b/src/dorkbox/network/util/CryptoSerializationManager.java @@ -15,18 +15,17 @@ */ package dorkbox.network.util; -import dorkbox.network.connection.ConnectionImpl; -import dorkbox.util.SerializationManager; -import io.netty.buffer.ByteBuf; - import java.io.IOException; +import dorkbox.network.connection.ConnectionImpl; +import io.netty.buffer.ByteBuf; + /** * Threads reading/writing, it messes up a single instance. it is possible to use a single kryo with the use of synchronize, however - that * defeats the point of multi-threaded */ public -interface CryptoSerializationManager extends SerializationManager, RMISerializationManager { +interface CryptoSerializationManager extends RmiSerializationManager { /** * Waits until a kryo is available to write, using CAS operations to prevent having to synchronize. @@ -46,5 +45,4 @@ interface CryptoSerializationManager extends SerializationManager, RMISerializat * should ALWAYS be the length of the expected object! */ Object readWithCrypto(ConnectionImpl connection, ByteBuf buffer, int length) throws IOException; - } diff --git a/src/dorkbox/network/util/RMISerializationManager.java b/src/dorkbox/network/util/RMISerializationManager.java deleted file mode 100644 index 35edef0b..00000000 --- a/src/dorkbox/network/util/RMISerializationManager.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright 2010 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dorkbox.network.util; - -import dorkbox.network.connection.KryoExtra; - -public -interface RMISerializationManager { - - /** - * Necessary to register classes for RMI, only called once when the RMI bridge is created. - */ - void initRmiSerialization(); - - /** - * @return takes a kryo instance from the pool. - */ - KryoExtra takeKryo(); - - /** - * Returns a kryo instance to the pool. - */ - void returnKryo(KryoExtra object); - - /** - * Objects that we want to use RMI with, must be accessed via an interface. This method configures the serialization of an - * implementation to be serialized via the defined interface, as a RemoteObject (ie: proxy object). If the implementation class is - * ALREADY registered, then it's registration will be overwritten by this one - * - * @param ifaceClass - * The interface used to access the remote object - * @param implClass - * The implementation class of the interface - */ - void registerRemote(Class ifaceClass, Class implClass); -} diff --git a/src/dorkbox/network/util/RmiSerializationManager.java b/src/dorkbox/network/util/RmiSerializationManager.java new file mode 100644 index 00000000..c33adb4c --- /dev/null +++ b/src/dorkbox/network/util/RmiSerializationManager.java @@ -0,0 +1,126 @@ +/* + * Copyright 2010 dorkbox, llc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dorkbox.network.util; + +import com.esotericsoftware.kryo.Serializer; + +import dorkbox.network.connection.KryoExtra; +import dorkbox.util.SerializationManager; + +public +interface RmiSerializationManager extends SerializationManager { + + /** + * Registers the class using the lowest, next available integer ID and the + * {@link Kryo#getDefaultSerializer(Class) default serializer}. If the class + * is already registered, the existing entry is updated with the new + * serializer. Registering a primitive also affects the corresponding + * primitive wrapper. + *

+ * Because the ID assigned is affected by the IDs registered before it, the + * order classes are registered is important when using this method. The + * order must be the same at deserialization as it was for serialization. + */ + @Override + RmiSerializationManager register(Class clazz); + + /** + * Registers the class using the lowest, next available integer ID and the + * specified serializer. If the class is already registered, the existing + * entry is updated with the new serializer. Registering a primitive also + * affects the corresponding primitive wrapper. + *

+ * Because the ID assigned is affected by the IDs registered before it, the + * order classes are registered is important when using this method. The + * order must be the same at deserialization as it was for serialization. + */ + @Override + RmiSerializationManager register(Class clazz, Serializer serializer); + + /** + * Registers the class using the specified ID and serializer. If the ID is + * already in use by the same type, the old entry is overwritten. If the ID + * is already in use by a different type, a {@link KryoException} is thrown. + * Registering a primitive also affects the corresponding primitive wrapper. + *

+ * IDs must be the same at deserialization as they were for serialization. + * + * @param id Must be >= 0. Smaller IDs are serialized more efficiently. IDs + * 0-8 are used by default for primitive types and String, but + * these IDs can be repurposed. + */ + @Override + RmiSerializationManager register(Class clazz, Serializer serializer, int id); + + + /** + * Necessary to register classes for RMI, only called once when the RMI bridge is created. + * @return true if there are classes that have been registered for RMI + */ + boolean initRmiSerialization(); + + /** + * @return takes a kryo instance from the pool. + */ + KryoExtra takeKryo(); + + /** + * Returns a kryo instance to the pool. + */ + void returnKryo(KryoExtra kryo); + + /** + * Gets the RMI implementation based on the specified ID (which is the ID for the registered interface) + * + * @param objectId ID of the registered interface, which will map to the corresponding implementation. + * + * @return the implementation for the interface, or null + */ + Class getRmiImpl(int objectId); + + /** + * Enable remote method invocation (RMI) for this connection. There is additional overhead to using RMI. + *

+ * Specifically, It costs at least 2 bytes more to use remote method invocation than just sending the parameters. If the method has a + * return value which is not {@link dorkbox.network.rmi.RemoteObject#setAsync(boolean) ignored}, an extra byte is written. If the + * type of a parameter is not final (primitives are final) then an extra byte is written for that parameter. + */ + RmiSerializationManager registerRmiInterface(Class ifaceClass); + + + + /** + * Objects that we want to use RMI with, must be accessed via an interface. This method configures the serialization of an + * implementation to be serialized via the defined interface, as a RemoteObject (ie: proxy object). If the implementation class is + * ALREADY registered, then it's registration will be overwritten by this one + * + * @param ifaceClass The interface used to access the remote object + * @param implClass The implementation class of the interface + */ + RmiSerializationManager registerRmiImplementation(Class ifaceClass, Class implClass); + + /** + * This method overrides the interface -> implementation. This is so incoming proxy objects will get auto-changed into their correct + * implementation type, so this side of the connection knows what to do with the proxy object. + *

+ * NOTE: You must have ALREADY registered the implementation class. This method just enables the proxy magic. + * This is for the "server" side, where "server" means the connection side where the implementation is used. + * + * @param ifaceClass The interface used to access the remote object + * @param implClass The implementation class of the interface + */ + // RmiSerializationManager rmiImplementation(); +} diff --git a/test/dorkbox/network/ChunkedDataIdleTest.java b/test/dorkbox/network/ChunkedDataIdleTest.java index b6af61a5..a24e03d7 100644 --- a/test/dorkbox/network/ChunkedDataIdleTest.java +++ b/test/dorkbox/network/ChunkedDataIdleTest.java @@ -29,10 +29,10 @@ import org.junit.Test; import dorkbox.network.PingPongTest.TYPE; import dorkbox.network.connection.Connection; -import dorkbox.network.connection.KryoCryptoSerializationManager; +import dorkbox.network.connection.CryptoSerializationManager; import dorkbox.network.connection.Listener; import dorkbox.network.connection.idle.IdleBridge; -import dorkbox.network.util.CryptoSerializationManager; +import dorkbox.util.SerializationManager; import dorkbox.util.exceptions.InitializationException; import dorkbox.util.exceptions.SecurityException; @@ -56,7 +56,7 @@ public class ChunkedDataIdleTest extends BaseTest { Configuration configuration = new Configuration(); configuration.tcpPort = tcpPort; configuration.host = host; - configuration.serialization = KryoCryptoSerializationManager.DEFAULT(); + configuration.serialization = CryptoSerializationManager.DEFAULT(); register(configuration.serialization); sendObject(mainData, configuration, ConnectionType.TCP); @@ -67,7 +67,7 @@ public class ChunkedDataIdleTest extends BaseTest { configuration.tcpPort = tcpPort; configuration.udpPort = udpPort; configuration.host = host; - configuration.serialization = KryoCryptoSerializationManager.DEFAULT(); + configuration.serialization = CryptoSerializationManager.DEFAULT(); register(configuration.serialization); sendObject(mainData, configuration, ConnectionType.UDP); @@ -78,7 +78,7 @@ public class ChunkedDataIdleTest extends BaseTest { configuration.tcpPort = tcpPort; configuration.udtPort = udtPort; configuration.host = host; - configuration.serialization = KryoCryptoSerializationManager.DEFAULT(); + configuration.serialization = CryptoSerializationManager.DEFAULT(); register(configuration.serialization); sendObject(mainData, configuration, ConnectionType.UDT); @@ -164,7 +164,7 @@ public class ChunkedDataIdleTest extends BaseTest { data.Booleans = new Boolean[] {true, false}; } - private void register (CryptoSerializationManager manager) { + private void register (SerializationManager manager) { manager.register(int[].class); manager.register(short[].class); manager.register(float[].class); diff --git a/test/dorkbox/network/ClientSendTest.java b/test/dorkbox/network/ClientSendTest.java index dbbc949d..95b4673f 100644 --- a/test/dorkbox/network/ClientSendTest.java +++ b/test/dorkbox/network/ClientSendTest.java @@ -27,9 +27,9 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Test; import dorkbox.network.connection.Connection; -import dorkbox.network.connection.KryoCryptoSerializationManager; +import dorkbox.network.connection.CryptoSerializationManager; import dorkbox.network.connection.Listener; -import dorkbox.network.util.CryptoSerializationManager; +import dorkbox.util.SerializationManager; import dorkbox.util.exceptions.InitializationException; import dorkbox.util.exceptions.SecurityException; @@ -44,7 +44,7 @@ class ClientSendTest extends BaseTest { Configuration configuration = new Configuration(); configuration.tcpPort = tcpPort; configuration.host = host; - configuration.serialization = KryoCryptoSerializationManager.DEFAULT(); + configuration.serialization = CryptoSerializationManager.DEFAULT(); register(configuration.serialization); Server server = new Server(configuration); @@ -87,7 +87,7 @@ class ClientSendTest extends BaseTest { } private static - void register(CryptoSerializationManager manager) { + void register(SerializationManager manager) { manager.register(AMessage.class); } diff --git a/test/dorkbox/network/ConnectionTest.java b/test/dorkbox/network/ConnectionTest.java index 6d55ce8e..4d77800f 100644 --- a/test/dorkbox/network/ConnectionTest.java +++ b/test/dorkbox/network/ConnectionTest.java @@ -26,10 +26,10 @@ import java.util.TimerTask; import org.junit.Test; import dorkbox.network.connection.Connection; +import dorkbox.network.connection.CryptoSerializationManager; import dorkbox.network.connection.EndPoint; -import dorkbox.network.connection.KryoCryptoSerializationManager; import dorkbox.network.connection.Listener; -import dorkbox.network.util.CryptoSerializationManager; +import dorkbox.util.SerializationManager; import dorkbox.util.exceptions.InitializationException; import dorkbox.util.exceptions.SecurityException; @@ -43,7 +43,7 @@ class ConnectionTest extends BaseTest { Configuration configuration = new Configuration(); configuration.localChannelName = EndPoint.LOCAL_CHANNEL; - configuration.serialization = KryoCryptoSerializationManager.DEFAULT(); + configuration.serialization = CryptoSerializationManager.DEFAULT(); register(configuration.serialization); startServer(configuration); @@ -59,7 +59,7 @@ class ConnectionTest extends BaseTest { Configuration configuration = new Configuration(); configuration.tcpPort = tcpPort; - configuration.serialization = KryoCryptoSerializationManager.DEFAULT(); + configuration.serialization = CryptoSerializationManager.DEFAULT(); register(configuration.serialization); startServer(configuration); @@ -78,7 +78,7 @@ class ConnectionTest extends BaseTest { Configuration configuration = new Configuration(); configuration.tcpPort = tcpPort; configuration.udpPort = udpPort; - configuration.serialization = KryoCryptoSerializationManager.DEFAULT(); + configuration.serialization = CryptoSerializationManager.DEFAULT(); register(configuration.serialization); startServer(configuration); @@ -97,7 +97,7 @@ class ConnectionTest extends BaseTest { Configuration configuration = new Configuration(); configuration.tcpPort = tcpPort; configuration.udtPort = udtPort; - configuration.serialization = KryoCryptoSerializationManager.DEFAULT(); + configuration.serialization = CryptoSerializationManager.DEFAULT(); register(configuration.serialization); startServer(configuration); @@ -117,7 +117,7 @@ class ConnectionTest extends BaseTest { configuration.tcpPort = tcpPort; configuration.udpPort = udpPort; configuration.udtPort = udtPort; - configuration.serialization = KryoCryptoSerializationManager.DEFAULT(); + configuration.serialization = CryptoSerializationManager.DEFAULT(); register(configuration.serialization); startServer(configuration); @@ -194,7 +194,7 @@ class ConnectionTest extends BaseTest { } private - void register(CryptoSerializationManager manager) { + void register(SerializationManager manager) { manager.register(BMessage.class); } diff --git a/test/dorkbox/network/IdleTest.java b/test/dorkbox/network/IdleTest.java index 8b38014d..ce00029d 100644 --- a/test/dorkbox/network/IdleTest.java +++ b/test/dorkbox/network/IdleTest.java @@ -30,7 +30,7 @@ import org.junit.Test; import dorkbox.network.PingPongTest.TYPE; import dorkbox.network.connection.Connection; -import dorkbox.network.connection.KryoCryptoSerializationManager; +import dorkbox.network.connection.CryptoSerializationManager; import dorkbox.network.connection.Listener; import dorkbox.network.connection.idle.IdleBridge; import dorkbox.network.connection.idle.IdleListener; @@ -38,7 +38,7 @@ import dorkbox.network.connection.idle.IdleListenerTCP; import dorkbox.network.connection.idle.IdleListenerUDP; import dorkbox.network.connection.idle.IdleListenerUDT; import dorkbox.network.connection.idle.InputStreamSender; -import dorkbox.network.util.CryptoSerializationManager; +import dorkbox.util.SerializationManager; import dorkbox.util.exceptions.InitializationException; import dorkbox.util.exceptions.SecurityException; @@ -63,7 +63,7 @@ class IdleTest extends BaseTest { Configuration configuration = new Configuration(); configuration.tcpPort = tcpPort; configuration.host = host; - configuration.serialization = KryoCryptoSerializationManager.DEFAULT(false, false); + configuration.serialization = CryptoSerializationManager.DEFAULT(false, false); streamSpecificType(largeDataSize, configuration, ConnectionType.TCP); @@ -73,7 +73,7 @@ class IdleTest extends BaseTest { configuration.tcpPort = tcpPort; configuration.udpPort = udpPort; configuration.host = host; - configuration.serialization = KryoCryptoSerializationManager.DEFAULT(false, false); + configuration.serialization = CryptoSerializationManager.DEFAULT(false, false); streamSpecificType(largeDataSize, configuration, ConnectionType.UDP); @@ -83,7 +83,7 @@ class IdleTest extends BaseTest { configuration.tcpPort = tcpPort; configuration.udtPort = udtPort; configuration.host = host; - configuration.serialization = KryoCryptoSerializationManager.DEFAULT(false, false); + configuration.serialization = CryptoSerializationManager.DEFAULT(false, false); streamSpecificType(largeDataSize, configuration, ConnectionType.UDT); } @@ -102,7 +102,7 @@ class IdleTest extends BaseTest { Configuration configuration = new Configuration(); configuration.tcpPort = tcpPort; configuration.host = host; - configuration.serialization = KryoCryptoSerializationManager.DEFAULT(); + configuration.serialization = CryptoSerializationManager.DEFAULT(); register(configuration.serialization); sendObject(mainData, configuration, ConnectionType.TCP); @@ -113,7 +113,7 @@ class IdleTest extends BaseTest { configuration.tcpPort = tcpPort; configuration.udpPort = udpPort; configuration.host = host; - configuration.serialization = KryoCryptoSerializationManager.DEFAULT(); + configuration.serialization = CryptoSerializationManager.DEFAULT(); register(configuration.serialization); sendObject(mainData, configuration, ConnectionType.TCP); @@ -124,7 +124,7 @@ class IdleTest extends BaseTest { configuration.tcpPort = tcpPort; configuration.udtPort = udtPort; configuration.host = host; - configuration.serialization = KryoCryptoSerializationManager.DEFAULT(); + configuration.serialization = CryptoSerializationManager.DEFAULT(); register(configuration.serialization); sendObject(mainData, configuration, ConnectionType.TCP); @@ -316,7 +316,7 @@ class IdleTest extends BaseTest { } private static - void register(CryptoSerializationManager manager) { + void register(SerializationManager manager) { manager.register(int[].class); manager.register(short[].class); manager.register(float[].class); diff --git a/test/dorkbox/network/LargeResizeBufferTest.java b/test/dorkbox/network/LargeResizeBufferTest.java index f0db4b31..ae15f2c7 100644 --- a/test/dorkbox/network/LargeResizeBufferTest.java +++ b/test/dorkbox/network/LargeResizeBufferTest.java @@ -28,9 +28,9 @@ import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; import dorkbox.network.connection.Connection; -import dorkbox.network.connection.KryoCryptoSerializationManager; +import dorkbox.network.connection.CryptoSerializationManager; import dorkbox.network.connection.Listener; -import dorkbox.network.util.CryptoSerializationManager; +import dorkbox.util.SerializationManager; import dorkbox.util.exceptions.InitializationException; import dorkbox.util.exceptions.SecurityException; @@ -51,7 +51,7 @@ class LargeResizeBufferTest extends BaseTest { configuration.tcpPort = tcpPort; configuration.udpPort = udpPort; configuration.host = host; - configuration.serialization = KryoCryptoSerializationManager.DEFAULT(); + configuration.serialization = CryptoSerializationManager.DEFAULT(); register(configuration.serialization); Server server = new Server(configuration); @@ -141,7 +141,7 @@ class LargeResizeBufferTest extends BaseTest { } private - void register(CryptoSerializationManager manager) { + void register(SerializationManager manager) { manager.register(byte[].class); manager.register(LargeMessage.class); } diff --git a/test/dorkbox/network/ListenerTest.java b/test/dorkbox/network/ListenerTest.java index a400ca71..893b7dc6 100644 --- a/test/dorkbox/network/ListenerTest.java +++ b/test/dorkbox/network/ListenerTest.java @@ -19,25 +19,26 @@ */ package dorkbox.network; -import dorkbox.network.connection.Connection; -import dorkbox.network.connection.ConnectionImpl; -import dorkbox.network.connection.EndPoint; -import dorkbox.network.connection.Listener; -import dorkbox.network.connection.ListenerBridge; -import dorkbox.network.rmi.RmiBridge; -import dorkbox.util.exceptions.InitializationException; -import dorkbox.util.exceptions.SecurityException; -import org.junit.Test; -import org.slf4j.Logger; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import org.junit.Test; +import org.slf4j.Logger; + +import dorkbox.network.connection.Connection; +import dorkbox.network.connection.ConnectionImpl; +import dorkbox.network.connection.EndPoint; +import dorkbox.network.connection.Listener; +import dorkbox.network.connection.ListenerBridge; +import dorkbox.network.rmi.RmiImplHandler; +import dorkbox.util.exceptions.InitializationException; +import dorkbox.util.exceptions.SecurityException; public class ListenerTest extends BaseTest { @@ -63,8 +64,8 @@ class ListenerTest extends BaseTest { // quick and dirty test to also test connection sub-classing class TestConnectionA extends ConnectionImpl { public - TestConnectionA(final Logger logger, final EndPoint endPoint, final RmiBridge rmiBridge) { - super(logger, endPoint, rmiBridge); + TestConnectionA(final Logger logger, final EndPoint endPoint, final RmiImplHandler rmiImplHandler) { + super(logger, endPoint, rmiImplHandler); } public @@ -76,8 +77,8 @@ class ListenerTest extends BaseTest { class TestConnectionB extends TestConnectionA { public - TestConnectionB(final Logger logger, final EndPoint endPoint, final RmiBridge rmiBridge) { - super(logger, endPoint, rmiBridge); + TestConnectionB(final Logger logger, final EndPoint endPoint, final RmiImplHandler rmiImplHandler) { + super(logger, endPoint, rmiImplHandler); } @Override @@ -108,7 +109,7 @@ class ListenerTest extends BaseTest { Server server = new Server(configuration) { @Override public - TestConnectionA newConnection(final Logger logger, final EndPoint endPoint, final RmiBridge rmiBridge) { + TestConnectionA newConnection(final Logger logger, final EndPoint endPoint, final RmiImplHandler rmiBridge) { return new TestConnectionA(logger, endPoint, rmiBridge); } }; diff --git a/test/dorkbox/network/MultipleServerTest.java b/test/dorkbox/network/MultipleServerTest.java index 63fb0cbc..8f602f21 100644 --- a/test/dorkbox/network/MultipleServerTest.java +++ b/test/dorkbox/network/MultipleServerTest.java @@ -27,7 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; import dorkbox.network.connection.Connection; -import dorkbox.network.connection.KryoCryptoSerializationManager; +import dorkbox.network.connection.CryptoSerializationManager; import dorkbox.network.connection.Listener; import dorkbox.util.exceptions.InitializationException; import dorkbox.util.exceptions.SecurityException; @@ -43,7 +43,7 @@ class MultipleServerTest extends BaseTest { configuration1.tcpPort = tcpPort; configuration1.udpPort = udpPort; configuration1.localChannelName = "chan1"; - configuration1.serialization = KryoCryptoSerializationManager.DEFAULT(); + configuration1.serialization = CryptoSerializationManager.DEFAULT(); configuration1.serialization.register(String[].class); Server server1 = new Server(configuration1); @@ -68,7 +68,7 @@ class MultipleServerTest extends BaseTest { configuration2.tcpPort = tcpPort + 1; configuration2.udpPort = udpPort + 1; configuration2.localChannelName = "chan2"; - configuration2.serialization = KryoCryptoSerializationManager.DEFAULT(); + configuration2.serialization = CryptoSerializationManager.DEFAULT(); configuration2.serialization.register(String[].class); Server server2 = new Server(configuration2); diff --git a/test/dorkbox/network/MultipleThreadTest.java b/test/dorkbox/network/MultipleThreadTest.java index 70dc4c9e..ead4eeb2 100644 --- a/test/dorkbox/network/MultipleThreadTest.java +++ b/test/dorkbox/network/MultipleThreadTest.java @@ -31,7 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; import dorkbox.network.connection.Connection; -import dorkbox.network.connection.KryoCryptoSerializationManager; +import dorkbox.network.connection.CryptoSerializationManager; import dorkbox.network.connection.Listener; import dorkbox.network.connection.ListenerBridge; import dorkbox.util.exceptions.InitializationException; @@ -61,7 +61,7 @@ class MultipleThreadTest extends BaseTest { Configuration configuration = new Configuration(); configuration.tcpPort = tcpPort; configuration.host = host; - configuration.serialization = KryoCryptoSerializationManager.DEFAULT(); + configuration.serialization = CryptoSerializationManager.DEFAULT(); configuration.serialization.register(String[].class); configuration.serialization.register(DataClass.class); diff --git a/test/dorkbox/network/PingPongLocalTest.java b/test/dorkbox/network/PingPongLocalTest.java index 7dc8d04b..c87fb6f4 100644 --- a/test/dorkbox/network/PingPongLocalTest.java +++ b/test/dorkbox/network/PingPongLocalTest.java @@ -28,10 +28,10 @@ import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; import dorkbox.network.connection.Connection; -import dorkbox.network.connection.KryoCryptoSerializationManager; +import dorkbox.network.connection.CryptoSerializationManager; import dorkbox.network.connection.Listener; import dorkbox.network.connection.ListenerBridge; -import dorkbox.network.util.CryptoSerializationManager; +import dorkbox.util.SerializationManager; import dorkbox.util.exceptions.InitializationException; import dorkbox.util.exceptions.SecurityException; @@ -48,7 +48,7 @@ class PingPongLocalTest extends BaseTest { populateData(dataLOCAL); Configuration configuration = Configuration.localOnly(); - configuration.serialization = KryoCryptoSerializationManager.DEFAULT(); + configuration.serialization = CryptoSerializationManager.DEFAULT(); register(configuration.serialization); @@ -169,7 +169,7 @@ class PingPongLocalTest extends BaseTest { data.Booleans = new Boolean[] {true,false}; } - private void register(CryptoSerializationManager manager) { + private void register(SerializationManager manager) { manager.register(int[].class); manager.register(short[].class); manager.register(float[].class); diff --git a/test/dorkbox/network/PingPongTest.java b/test/dorkbox/network/PingPongTest.java index b215cfd6..109268bb 100644 --- a/test/dorkbox/network/PingPongTest.java +++ b/test/dorkbox/network/PingPongTest.java @@ -29,11 +29,11 @@ import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; import dorkbox.network.connection.Connection; +import dorkbox.network.connection.CryptoSerializationManager; import dorkbox.network.connection.EndPoint; -import dorkbox.network.connection.KryoCryptoSerializationManager; import dorkbox.network.connection.Listener; import dorkbox.network.connection.ListenerBridge; -import dorkbox.network.util.CryptoSerializationManager; +import dorkbox.util.SerializationManager; import dorkbox.util.exceptions.InitializationException; import dorkbox.util.exceptions.SecurityException; @@ -62,7 +62,7 @@ class PingPongTest extends BaseTest { configuration.udpPort = udpPort; configuration.udtPort = udtPort; configuration.host = host; - configuration.serialization = KryoCryptoSerializationManager.DEFAULT(); + configuration.serialization = CryptoSerializationManager.DEFAULT(); register(configuration.serialization); @@ -290,7 +290,7 @@ class PingPongTest extends BaseTest { } private - void register(CryptoSerializationManager manager) { + void register(SerializationManager manager) { manager.register(int[].class); manager.register(short[].class); manager.register(float[].class); diff --git a/test/dorkbox/network/UnregisteredClassTest.java b/test/dorkbox/network/UnregisteredClassTest.java index 4030b27f..8bfc6ddb 100644 --- a/test/dorkbox/network/UnregisteredClassTest.java +++ b/test/dorkbox/network/UnregisteredClassTest.java @@ -29,8 +29,8 @@ import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; import dorkbox.network.connection.Connection; +import dorkbox.network.connection.CryptoSerializationManager; import dorkbox.network.connection.EndPoint; -import dorkbox.network.connection.KryoCryptoSerializationManager; import dorkbox.network.connection.Listener; import dorkbox.network.connection.ListenerBridge; import dorkbox.util.exceptions.InitializationException; @@ -54,7 +54,7 @@ class UnregisteredClassTest extends BaseTest { configuration.tcpPort = tcpPort; configuration.udpPort = udpPort; configuration.host = host; - configuration.serialization = KryoCryptoSerializationManager.DEFAULT(false, false); + configuration.serialization = CryptoSerializationManager.DEFAULT(false, false); System.err.println("Running test " + this.tries + " times, please wait for it to finish."); diff --git a/test/dorkbox/network/rmi/MessageWithTestObject.java b/test/dorkbox/network/rmi/MessageWithTestObject.java new file mode 100644 index 00000000..f96a3743 --- /dev/null +++ b/test/dorkbox/network/rmi/MessageWithTestObject.java @@ -0,0 +1,11 @@ +package dorkbox.network.rmi; + +/** + * + */ +public +class MessageWithTestObject implements RmiMessages { + public int number; + public String text; + public TestObject testObject; +} diff --git a/test/dorkbox/network/rmi/RmiGlobalTest.java b/test/dorkbox/network/rmi/RmiGlobalTest.java index f3609487..749667a3 100644 --- a/test/dorkbox/network/rmi/RmiGlobalTest.java +++ b/test/dorkbox/network/rmi/RmiGlobalTest.java @@ -50,9 +50,8 @@ import dorkbox.network.Configuration; import dorkbox.network.Server; import dorkbox.network.connection.Connection; import dorkbox.network.connection.ConnectionImpl; -import dorkbox.network.connection.KryoCryptoSerializationManager; +import dorkbox.network.connection.CryptoSerializationManager; import dorkbox.network.connection.Listener; -import dorkbox.network.util.CryptoSerializationManager; import dorkbox.util.exceptions.InitializationException; import dorkbox.util.exceptions.SecurityException; @@ -66,142 +65,127 @@ class RmiGlobalTest extends BaseTest { private final TestObject globalRemoteClientObject = new TestObjectImpl(); private static - void runTest(final Connection connection, final Object remoteObject, final int remoteObjectID) { - new Thread() { - @Override - public - void run() { - try { - TestObject test = connection.getProxyObject(remoteObjectID); + void runTest(final Connection connection, final TestObject rObject, final TestObject test, final int remoteObjectID) { + System.err.println("Starting test for: " + remoteObjectID); - System.err.println("Starting test for: " + remoteObjectID); + assertEquals(rObject.hashCode(), test.hashCode()); + RemoteObject remoteObject = (RemoteObject) test; + + // Default behavior. RMI is transparent, method calls behave like normal + // (return values and exceptions are returned, call is synchronous) + System.err.println("hashCode: " + test.hashCode()); + System.err.println("toString: " + test); + + // see what the "remote" toString() method is + final String s = remoteObject.toString(); + remoteObject.enableToString(true); + assertFalse(s.equals(remoteObject.toString())); - //TestObject test = connection.getRemoteObject(id, TestObject.class); - assertEquals(remoteObject.hashCode(), test.hashCode()); - RemoteObject remoteObject = (RemoteObject) test; - - // Default behavior. RMI is transparent, method calls behave like normal - // (return values and exceptions are returned, call is synchronous) - System.err.println("hashCode: " + test.hashCode()); - System.err.println("toString: " + test); - - // see what the "remote" toString() method is - final String s = remoteObject.toString(); - remoteObject.enableToString(true); - assertFalse(s.equals(remoteObject.toString())); + test.moo(); + test.moo("Cow"); + assertEquals(remoteObjectID, test.id()); - test.moo(); - test.moo("Cow"); - assertEquals(remoteObjectID, test.id()); + // UDP calls that ignore the return value + remoteObject.setUDP(); + remoteObject.setAsync(true); + remoteObject.setTransmitReturnValue(false); + remoteObject.setTransmitExceptions(false); + test.moo("Meow"); + assertEquals(0, test.id()); + remoteObject.setAsync(false); + remoteObject.setTransmitReturnValue(true); + remoteObject.setTransmitExceptions(true); + remoteObject.setTCP(); - // UDP calls that ignore the return value - remoteObject.setUDP(); - remoteObject.setAsync(true); - remoteObject.setTransmitReturnValue(false); - remoteObject.setTransmitExceptions(false); - test.moo("Meow"); - assertEquals(0, test.id()); - remoteObject.setAsync(false); - remoteObject.setTransmitReturnValue(true); - remoteObject.setTransmitExceptions(true); - remoteObject.setTCP(); + // Test that RMI correctly waits for the remotely invoked method to exit + remoteObject.setResponseTimeout(5000); + test.moo("You should see this two seconds before...", 2000); + System.out.println("...This"); + remoteObject.setResponseTimeout(3000); + + // Try exception handling + boolean caught = false; + try { + test.throwException(); + } catch (UnsupportedOperationException ex) { + System.err.println("\tExpected."); + caught = true; + } + assertTrue(caught); - // Test that RMI correctly waits for the remotely invoked method to exit - remoteObject.setResponseTimeout(5000); - test.moo("You should see this two seconds before...", 2000); - System.out.println("...This"); - remoteObject.setResponseTimeout(3000); + // Return values are ignored, but exceptions are still dealt with properly + remoteObject.setTransmitReturnValue(false); + test.moo("Baa"); + test.id(); + caught = false; + try { + test.throwException(); + } catch (UnsupportedOperationException ex) { + caught = true; + } + assertTrue(caught); - // Try exception handling - boolean caught = false; - try { - test.throwException(); - } catch (UnsupportedOperationException ex) { - System.err.println("\tExpected."); - caught = true; - } - assertTrue(caught); + // Non-blocking call that ignores the return value + remoteObject.setAsync(true); + remoteObject.setTransmitReturnValue(false); + test.moo("Meow"); + assertEquals(0, test.id()); + + // Non-blocking call that returns the return value + remoteObject.setTransmitReturnValue(true); + test.moo("Foo"); + + assertEquals(0, test.id()); + // wait for the response to id() + assertEquals(remoteObjectID, remoteObject.waitForLastResponse()); + + assertEquals(0, test.id()); + byte responseID = remoteObject.getLastResponseID(); + // wait for the response to id() + assertEquals(remoteObjectID, remoteObject.waitForResponse(responseID)); + + // Non-blocking call that errors out + remoteObject.setTransmitReturnValue(false); + test.throwException(); + assertEquals(remoteObject.waitForLastResponse() + .getClass(), UnsupportedOperationException.class); + + // Call will time out if non-blocking isn't working properly + remoteObject.setTransmitExceptions(false); + test.moo("Mooooooooo", 3000); - // Return values are ignored, but exceptions are still dealt with properly - remoteObject.setTransmitReturnValue(false); - test.moo("Baa"); - test.id(); - caught = false; - try { - test.throwException(); - } catch (UnsupportedOperationException ex) { - caught = true; - } - assertTrue(caught); - - // Non-blocking call that ignores the return value - remoteObject.setAsync(true); - remoteObject.setTransmitReturnValue(false); - test.moo("Meow"); - assertEquals(0, test.id()); - - // Non-blocking call that returns the return value - remoteObject.setTransmitReturnValue(true); - test.moo("Foo"); - - assertEquals(0, test.id()); - // wait for the response to id() - assertEquals(remoteObjectID, remoteObject.waitForLastResponse()); - - assertEquals(0, test.id()); - byte responseID = remoteObject.getLastResponseID(); - // wait for the response to id() - assertEquals(remoteObjectID, remoteObject.waitForResponse(responseID)); - - // Non-blocking call that errors out - remoteObject.setTransmitReturnValue(false); - test.throwException(); - assertEquals(remoteObject.waitForLastResponse() - .getClass(), UnsupportedOperationException.class); - - // Call will time out if non-blocking isn't working properly - remoteObject.setTransmitExceptions(false); - test.moo("Mooooooooo", 3000); + // should wait for a small time + remoteObject.setTransmitReturnValue(true); + remoteObject.setAsync(false); + remoteObject.setResponseTimeout(6000); + System.out.println("You should see this 2 seconds before"); + float slow = test.slow(); + System.out.println("...This"); + assertEquals(123.0F, slow, 0.0001D); - // should wait for a small time - remoteObject.setTransmitReturnValue(true); - remoteObject.setAsync(false); - remoteObject.setResponseTimeout(6000); - System.out.println("You should see this 2 seconds before"); - float slow = test.slow(); - System.out.println("...This"); - assertEquals(123.0F, slow, 0.0001D); - - - // Test sending a reference to a remote object. - MessageWithTestObject m = new MessageWithTestObject(); - m.number = 678; - m.text = "sometext"; - m.testObject = test; - connection.send() - .TCP(m) - .flush(); - } catch (IOException e) { - e.printStackTrace(); - fail(); - } - } - }.start(); + // Test sending a reference to a remote object. + MessageWithTestObject m = new MessageWithTestObject(); + m.number = 678; + m.text = "sometext"; + m.testObject = test; + connection.send() + .TCP(m) + .flush(); } public static - void register(CryptoSerializationManager manager) { + void register(dorkbox.network.util.CryptoSerializationManager manager) { manager.register(Object.class); // Needed for Object#toString, hashCode, etc. - manager.registerRemote(TestObject.class, TestObjectImpl.class); + // manager.rmi().register(TestObject.class).override(TestObject.class, TestObjectImpl.class); manager.register(MessageWithTestObject.class); manager.register(UnsupportedOperationException.class); @@ -215,8 +199,7 @@ class RmiGlobalTest extends BaseTest { configuration.udpPort = udpPort; configuration.host = host; - configuration.rmiEnabled = true; - configuration.serialization = KryoCryptoSerializationManager.DEFAULT(); + configuration.serialization = CryptoSerializationManager.DEFAULT(); register(configuration.serialization); @@ -234,7 +217,18 @@ class RmiGlobalTest extends BaseTest { @Override public void connected(final Connection connection) { - RmiGlobalTest.runTest(connection, globalRemoteClientObject, CLIENT_GLOBAL_OBJECT_ID); + try { + connection.getRemoteObject(CLIENT_GLOBAL_OBJECT_ID, new RemoteObjectCallback() { + @Override + public + void created(final TestObject remoteObject) { + runTest(connection, globalRemoteClientObject, remoteObject, CLIENT_GLOBAL_OBJECT_ID); + } + }); + } catch (IOException e) { + e.printStackTrace(); + fail(); + } } }); @@ -274,7 +268,18 @@ class RmiGlobalTest extends BaseTest { System.err.println("Server/Client Finished!"); // normally this is in the 'connected', but we do it here, so that it's more linear and easier to debug - runTest(connection, globalRemoteServerObject, SERVER_GLOBAL_OBJECT_ID); + try { + connection.getRemoteObject(SERVER_GLOBAL_OBJECT_ID, new RemoteObjectCallback() { + @Override + public + void created(final TestObject remoteObject) { + runTest(connection, globalRemoteServerObject, remoteObject, SERVER_GLOBAL_OBJECT_ID); + } + }); + } catch (IOException e) { + e.printStackTrace(); + fail(); + } } }); @@ -282,7 +287,7 @@ class RmiGlobalTest extends BaseTest { waitForThreads(); } - public + private interface TestObject extends Serializable { void throwException(); @@ -297,8 +302,8 @@ class RmiGlobalTest extends BaseTest { float slow(); } - - public static class ConnectionAware { + private static + class ConnectionAware { private ConnectionImpl connection; @@ -313,7 +318,8 @@ class RmiGlobalTest extends BaseTest { } } - public static + + private static class TestObjectImpl extends ConnectionAware implements TestObject { public long value = System.currentTimeMillis(); public int moos; @@ -375,7 +381,7 @@ class RmiGlobalTest extends BaseTest { } - public static + private static class MessageWithTestObject implements RmiMessages { public int number; public String text; diff --git a/test/dorkbox/network/rmi/RmiSendObjectOverrideMethodTest.java b/test/dorkbox/network/rmi/RmiSendObjectOverrideMethodTest.java index 86399ec4..a9e77d14 100644 --- a/test/dorkbox/network/rmi/RmiSendObjectOverrideMethodTest.java +++ b/test/dorkbox/network/rmi/RmiSendObjectOverrideMethodTest.java @@ -28,7 +28,7 @@ import dorkbox.network.Client; import dorkbox.network.Configuration; import dorkbox.network.Server; import dorkbox.network.connection.Connection; -import dorkbox.network.connection.KryoCryptoSerializationManager; +import dorkbox.network.connection.CryptoSerializationManager; import dorkbox.network.connection.Listener; import dorkbox.util.exceptions.InitializationException; import dorkbox.util.exceptions.SecurityException; @@ -71,11 +71,9 @@ class RmiSendObjectOverrideMethodTest extends BaseTest { configuration.tcpPort = tcpPort; configuration.host = host; - configuration.rmiEnabled = true; - configuration.serialization = KryoCryptoSerializationManager.DEFAULT(); - - configuration.serialization.registerRemote(TestObject.class, TestObjectImpl.class); - configuration.serialization.registerRemote(OtherObject.class, OtherObjectImpl.class); + configuration.serialization = CryptoSerializationManager.DEFAULT(); + // configuration.serialization.rmi().register(TestObjectImpl.class).override(TestObject.class, TestObjectImpl.class); + // configuration.serialization.rmi().register(OtherObject.class).override(OtherObject.class, OtherObjectImpl.class); @@ -114,24 +112,20 @@ class RmiSendObjectOverrideMethodTest extends BaseTest { @Override public void connected(final Connection connection) { - new Thread(new Runnable() { - @Override - public - void run() { - - - TestObject test = null; - try { - test = connection.createProxyObject(TestObjectImpl.class); - - test.setOther(43.21f); + try { + // if this is called in the dispatch thread, it will block network comms while waiting for a response and it won't work... + connection.getRemoteObject(TestObjectImpl.class, new RemoteObjectCallback() { + @Override + public + void created(final TestObjectImpl remoteObject) { + remoteObject.setOther(43.21f); // Normal remote method call. - assertEquals(43.21f, test.other(), .0001f); + assertEquals(43.21f, remoteObject.other(), .0001f); // Make a remote method call that returns another remote proxy object. // the "test" object exists in the REMOTE side, as does the "OtherObject" that is created. // here we have a proxy to both of them. - OtherObject otherObject = test.getOtherObject(); + OtherObject otherObject = remoteObject.getOtherObject(); // Normal remote method call on the second object. otherObject.setValue(12.34f); @@ -145,12 +139,13 @@ class RmiSendObjectOverrideMethodTest extends BaseTest { connection.send() .TCP(otherObject) .flush(); - } catch (IOException e) { - e.printStackTrace(); - fail(); } - } - }).start(); + }); + + } catch (IOException e) { + e.printStackTrace(); + fail(); + } } }); @@ -159,7 +154,7 @@ class RmiSendObjectOverrideMethodTest extends BaseTest { waitForThreads(); } - public + private interface TestObject { void setOther(float aFloat); @@ -169,7 +164,7 @@ class RmiSendObjectOverrideMethodTest extends BaseTest { } - public + private interface OtherObject { void setValue(float aFloat); float value(); @@ -179,7 +174,7 @@ class RmiSendObjectOverrideMethodTest extends BaseTest { private static final AtomicInteger idCounter = new AtomicInteger(); - public static + private static class TestObjectImpl implements TestObject { @IgnoreSerialization private final int ID = idCounter.getAndIncrement(); @@ -230,7 +225,7 @@ class RmiSendObjectOverrideMethodTest extends BaseTest { } - public static + private static class OtherObjectImpl implements OtherObject { @IgnoreSerialization private final int ID = idCounter.getAndIncrement(); diff --git a/test/dorkbox/network/rmi/RmiSendObjectTest.java b/test/dorkbox/network/rmi/RmiSendObjectTest.java index b95b4813..2ab3a05f 100644 --- a/test/dorkbox/network/rmi/RmiSendObjectTest.java +++ b/test/dorkbox/network/rmi/RmiSendObjectTest.java @@ -47,7 +47,7 @@ import dorkbox.network.Client; import dorkbox.network.Configuration; import dorkbox.network.Server; import dorkbox.network.connection.Connection; -import dorkbox.network.connection.KryoCryptoSerializationManager; +import dorkbox.network.connection.CryptoSerializationManager; import dorkbox.network.connection.Listener; import dorkbox.util.exceptions.InitializationException; import dorkbox.util.exceptions.SecurityException; @@ -68,12 +68,11 @@ class RmiSendObjectTest extends BaseTest { configuration.tcpPort = tcpPort; configuration.host = host; - configuration.rmiEnabled = true; - configuration.serialization = KryoCryptoSerializationManager.DEFAULT(); + configuration.serialization = CryptoSerializationManager.DEFAULT(); configuration.serialization.register(TestObject.class); - configuration.serialization.registerRemote(TestObject.class, TestObjectImpl.class); - configuration.serialization.registerRemote(OtherObject.class, OtherObjectImpl.class); + // configuration.serialization.rmi().register(TestObject.class).override(TestObject.class, TestObjectImpl.class); + // configuration.serialization.rmi().register(OtherObject.class).override(OtherObject.class, OtherObjectImpl.class); Server server = new Server(configuration); @@ -109,20 +108,17 @@ class RmiSendObjectTest extends BaseTest { @Override public void connected(final Connection connection) { - new Thread(new Runnable() { - @Override - public - void run() { - TestObject test = null; - try { - test = connection.createProxyObject(TestObjectImpl.class); - - test.setOther(43.21f); + try { + connection.getRemoteObject(TestObjectImpl.class, new RemoteObjectCallback() { + @Override + public + void created(final TestObjectImpl remoteObject) { + remoteObject.setOther(43.21f); // Normal remote method call. - assertEquals(43.21f, test.other(), 0.0001F); + assertEquals(43.21f, remoteObject.other(), 0.0001F); // Make a remote method call that returns another remote proxy object. - OtherObject otherObject = test.getOtherObject(); + OtherObject otherObject = remoteObject.getOtherObject(); // Normal remote method call on the second object. otherObject.setValue(12.34f); float value = otherObject.value(); @@ -133,12 +129,12 @@ class RmiSendObjectTest extends BaseTest { connection.send() .TCP(otherObject) .flush(); - } catch (IOException e) { - e.printStackTrace(); - fail(); } - } - }).start(); + }); + } catch (IOException e) { + e.printStackTrace(); + fail(); + } } }); @@ -147,7 +143,7 @@ class RmiSendObjectTest extends BaseTest { waitForThreads(); } - public + private interface TestObject { void setOther(float aFloat); float other(); @@ -155,7 +151,7 @@ class RmiSendObjectTest extends BaseTest { } - public + private interface OtherObject { void setValue(float aFloat); float value(); @@ -165,7 +161,7 @@ class RmiSendObjectTest extends BaseTest { private static final AtomicInteger idCounter = new AtomicInteger(); - public static + private static class TestObjectImpl implements TestObject { @IgnoreSerialization private final int ID = idCounter.getAndIncrement(); @@ -201,7 +197,7 @@ class RmiSendObjectTest extends BaseTest { } - public static + private static class OtherObjectImpl implements OtherObject { @IgnoreSerialization private final int ID = idCounter.getAndIncrement(); diff --git a/test/dorkbox/network/rmi/RmiTest.java b/test/dorkbox/network/rmi/RmiTest.java index 9610629e..711f3c7a 100644 --- a/test/dorkbox/network/rmi/RmiTest.java +++ b/test/dorkbox/network/rmi/RmiTest.java @@ -40,171 +40,209 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; -import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; +import org.slf4j.LoggerFactory; +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.Logger; +import ch.qos.logback.classic.LoggerContext; +import ch.qos.logback.classic.encoder.PatternLayoutEncoder; +import ch.qos.logback.classic.joran.JoranConfigurator; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.ConsoleAppender; import dorkbox.network.BaseTest; import dorkbox.network.Client; import dorkbox.network.Configuration; import dorkbox.network.Server; import dorkbox.network.connection.Connection; -import dorkbox.network.connection.KryoCryptoSerializationManager; +import dorkbox.network.connection.CryptoSerializationManager; import dorkbox.network.connection.Listener; import dorkbox.network.connection.ListenerBridge; -import dorkbox.network.util.CryptoSerializationManager; import dorkbox.util.exceptions.InitializationException; import dorkbox.util.exceptions.SecurityException; +import io.netty.util.ResourceLeakDetector; public class RmiTest extends BaseTest { - private static - void runTest(final Connection connection, final int remoteObjectID) { - new Thread() { - @Override - public - void run() { - try { - TestObject test = connection.createProxyObject(TestObjectImpl.class); + public static + void runTests(final Connection connection, final TestObject test, final int remoteObjectID) { + RemoteObject remoteObject = (RemoteObject) test; - //TestObject test = connection.getRemoteObject(id, TestObject.class); - RemoteObject remoteObject = (RemoteObject) test; + // Default behavior. RMI is transparent, method calls behave like normal + // (return values and exceptions are returned, call is synchronous) + System.err.println("hashCode: " + test.hashCode()); + System.err.println("toString: " + test.toString()); - // Default behavior. RMI is transparent, method calls behave like normal - // (return values and exceptions are returned, call is synchronous) - System.err.println("hashCode: " + test.hashCode()); - System.err.println("toString: " + test); + // see what the "remote" toString() method is + final String s = remoteObject.toString(); + remoteObject.enableToString(true); + assertFalse(s.equals(remoteObject.toString())); - // see what the "remote" toString() method is - final String s = remoteObject.toString(); - remoteObject.enableToString(true); - assertFalse(s.equals(remoteObject.toString())); - - test.moo(); - test.moo("Cow"); - assertEquals(remoteObjectID, test.id()); + test.moo(); + test.moo("Cow"); + assertEquals(remoteObjectID, test.id()); - // UDP calls that ignore the return value - remoteObject.setUDP(); - remoteObject.setAsync(true); - remoteObject.setTransmitReturnValue(false); - remoteObject.setTransmitExceptions(false); - test.moo("Meow"); - assertEquals(0, test.id()); - remoteObject.setAsync(false); - remoteObject.setTransmitReturnValue(true); - remoteObject.setTransmitExceptions(true); - remoteObject.setTCP(); + // UDP calls that ignore the return value + remoteObject.setUDP(); + remoteObject.setAsync(true); + remoteObject.setTransmitReturnValue(false); + remoteObject.setTransmitExceptions(false); + test.moo("Meow"); + assertEquals(0, test.id()); + + remoteObject.setAsync(false); + remoteObject.setTransmitReturnValue(true); + remoteObject.setTransmitExceptions(true); + remoteObject.setTCP(); - // Test that RMI correctly waits for the remotely invoked method to exit - remoteObject.setResponseTimeout(5000); - test.moo("You should see this two seconds before...", 2000); - System.out.println("...This"); - remoteObject.setResponseTimeout(3000); + // Test that RMI correctly waits for the remotely invoked method to exit + remoteObject.setResponseTimeout(5000); + test.moo("You should see this two seconds before...", 2000); + System.out.println("...This"); + remoteObject.setResponseTimeout(3000); - // Try exception handling - boolean caught = false; - try { - test.throwException(); - } catch (UnsupportedOperationException ex) { - System.err.println("\tExpected."); - caught = true; - } - assertTrue(caught); + // Try exception handling + boolean caught = false; + try { + test.throwException(); + } catch (UnsupportedOperationException ex) { + System.err.println("\tExpected."); + caught = true; + } + assertTrue(caught); - // Return values are ignored, but exceptions are still dealt with properly - remoteObject.setTransmitReturnValue(false); - test.moo("Baa"); - test.id(); - caught = false; - try { - test.throwException(); - } catch (UnsupportedOperationException ex) { - caught = true; - } - assertTrue(caught); + // Return values are ignored, but exceptions are still dealt with properly + remoteObject.setTransmitReturnValue(false); + test.moo("Baa"); + test.id(); + caught = false; + try { + test.throwException(); + } catch (UnsupportedOperationException ex) { + caught = true; + } + assertTrue(caught); - // Non-blocking call that ignores the return value - remoteObject.setAsync(true); - remoteObject.setTransmitReturnValue(false); - test.moo("Meow"); - assertEquals(0, test.id()); + // Non-blocking call that ignores the return value + remoteObject.setAsync(true); + remoteObject.setTransmitReturnValue(false); + test.moo("Meow"); + assertEquals(0, test.id()); - // Non-blocking call that returns the return value - remoteObject.setTransmitReturnValue(true); - test.moo("Foo"); + // Non-blocking call that returns the return value + remoteObject.setTransmitReturnValue(true); + test.moo("Foo"); - assertEquals(0, test.id()); - // wait for the response to id() - assertEquals(remoteObjectID, remoteObject.waitForLastResponse()); + assertEquals(0, test.id()); + // wait for the response to id() + assertEquals(remoteObjectID, remoteObject.waitForLastResponse()); - assertEquals(0, test.id()); - byte responseID = remoteObject.getLastResponseID(); - // wait for the response to id() - assertEquals(remoteObjectID, remoteObject.waitForResponse(responseID)); + assertEquals(0, test.id()); + byte responseID = remoteObject.getLastResponseID(); + // wait for the response to id() + assertEquals(remoteObjectID, remoteObject.waitForResponse(responseID)); - // Non-blocking call that errors out - remoteObject.setTransmitReturnValue(false); - test.throwException(); - assertEquals(remoteObject.waitForLastResponse() - .getClass(), UnsupportedOperationException.class); + // Non-blocking call that errors out + remoteObject.setTransmitReturnValue(false); + test.throwException(); + assertEquals(remoteObject.waitForLastResponse() + .getClass(), UnsupportedOperationException.class); - // Call will time out if non-blocking isn't working properly - remoteObject.setTransmitExceptions(false); - test.moo("Mooooooooo", 3000); + // Call will time out if non-blocking isn't working properly + remoteObject.setTransmitExceptions(false); + test.moo("Mooooooooo", 3000); - // should wait for a small time - remoteObject.setTransmitReturnValue(true); - remoteObject.setAsync(false); - remoteObject.setResponseTimeout(6000); - System.out.println("You should see this 2 seconds before"); - float slow = test.slow(); - System.out.println("...This"); - assertEquals(slow, 123, 0.0001D); + // should wait for a small time + remoteObject.setTransmitReturnValue(true); + remoteObject.setAsync(false); + remoteObject.setResponseTimeout(6000); + System.out.println("You should see this 2 seconds before"); + float slow = test.slow(); + System.out.println("...This"); + assertEquals(slow, 123, 0.0001D); - // Test sending a reference to a remote object. - MessageWithTestObject m = new MessageWithTestObject(); - m.number = 678; - m.text = "sometext"; - m.testObject = test; - connection.send() - .TCP(m) - .flush(); + // Test sending a reference to a remote object. + MessageWithTestObject m = new MessageWithTestObject(); + m.number = 678; + m.text = "sometext"; + m.testObject = test; + connection.send() + .TCP(m) + .flush(); - } catch (IOException e) { - e.printStackTrace(); - fail(); - } - } - }.start(); } public static - void register(CryptoSerializationManager manager) { + void register(dorkbox.network.util.CryptoSerializationManager manager) { manager.register(Object.class); // Needed for Object#toString, hashCode, etc. - - manager.registerRemote(TestObject.class, TestObjectImpl.class); manager.register(MessageWithTestObject.class); - manager.register(UnsupportedOperationException.class); } @Test public void rmi() throws InitializationException, SecurityException, IOException, InterruptedException { + ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID); + + // assume SLF4J is bound to logback in the current environment + Logger rootLogger = (Logger) LoggerFactory.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME); + LoggerContext context = rootLogger.getLoggerContext(); + + JoranConfigurator jc = new JoranConfigurator(); + jc.setContext(context); + context.reset(); // override default configuration + +// rootLogger.setLevel(Level.OFF); + + // rootLogger.setLevel(Level.DEBUG); + rootLogger.setLevel(Level.TRACE); +// rootLogger.setLevel(Level.ALL); + + + // we only want error messages + Logger nettyLogger = (Logger) LoggerFactory.getLogger("io.netty"); + nettyLogger.setLevel(Level.ERROR); + + // we only want error messages + Logger kryoLogger = (Logger) LoggerFactory.getLogger("com.esotericsoftware"); + // kryoLogger.setLevel(Level.TRACE); + kryoLogger.setLevel(Level.ERROR); + + // we only want error messages + Logger barchartLogger = (Logger) LoggerFactory.getLogger("com.barchart"); + barchartLogger.setLevel(Level.ERROR); + + PatternLayoutEncoder encoder = new PatternLayoutEncoder(); + encoder.setContext(context); + encoder.setPattern("%date{HH:mm:ss.SSS} %-5level [%logger{35}] %msg%n"); + encoder.start(); + + ConsoleAppender consoleAppender = new ch.qos.logback.core.ConsoleAppender(); + + consoleAppender.setContext(context); + consoleAppender.setEncoder(encoder); + consoleAppender.start(); + + rootLogger.addAppender(consoleAppender); + + + + + Configuration configuration = new Configuration(); configuration.tcpPort = tcpPort; configuration.udpPort = udpPort; configuration.host = host; - configuration.rmiEnabled = true; - configuration.serialization = KryoCryptoSerializationManager.DEFAULT(); + configuration.serialization = CryptoSerializationManager.DEFAULT(); register(configuration.serialization); + configuration.serialization.registerRmiImplementation(TestObject.class, TestObjectImpl.class); final Server server = new Server(configuration); @@ -214,16 +252,7 @@ class RmiTest extends BaseTest { server.bind(false); final ListenerBridge listeners = server.listeners(); - listeners.add(new Listener.OnConnected() { - @Override - public - void connected(final Connection connection) { - System.err.println("Starting test for: Server -> Client"); - RmiTest.runTest(connection, 1); - } - }); listeners.add(new Listener.OnMessageReceived() { - @Override public void received(Connection connection, MessageWithTestObject m) { @@ -234,17 +263,55 @@ class RmiTest extends BaseTest { stopEndPoints(2000); } - }); // ---- + configuration = new Configuration(); + configuration.tcpPort = tcpPort; + configuration.udpPort = udpPort; + configuration.host = host; + + configuration.serialization = CryptoSerializationManager.DEFAULT(); + register(configuration.serialization); + configuration.serialization.registerRmiInterface(TestObject.class); + final Client client = new Client(configuration); client.setIdleTimeout(0); addEndPoint(client); + + client.listeners().add(new Listener.OnConnected() { + @Override + public + void connected(final Connection connection) { + System.err.println("Starting test for: Client -> Server"); + + try { + // if this is called in the dispatch thread, it will block network comms while waiting for a response and it won't work... + connection.getRemoteObject(TestObject.class, new RemoteObjectCallback() { + @Override + public + void created(final TestObject remoteObject) { + new Thread() { + @Override + public + void run() { + // MUST run on a separate thread because remote object method invocations are blocking + runTests(connection, remoteObject, 1); + } + }.start(); + } + }); + } catch (IOException e) { + e.printStackTrace(); + fail(); + } + } + }); + client.listeners() .add(new Listener.OnMessageReceived() { @Override @@ -255,9 +322,17 @@ class RmiTest extends BaseTest { assertEquals(1, id); System.err.println("Server -> Client Finished!"); - System.err.println("Starting test for: Client -> Server"); - // normally this is in the 'connected', but we do it here, so that it's more linear and easier to debug - runTest(connection, 2); + // System.err.println("Starting test for: Client -> Server"); + // System.err.println("Starting test for: Server -> Client"); + // + // // normally this is in the 'connected', but we do it here, so that it's more linear and easier to debug + // try { + // // if this is called in the dispatch thread, it will block network comms while waiting for a response and it won't work... + // connection.getRemoteObject(TestObjectImpl.class, remoteObject->runTest(connection, remoteObject, 2)); + // } catch (IOException e) { + // e.printStackTrace(); + // fail(); + // } } }); @@ -265,120 +340,4 @@ class RmiTest extends BaseTest { waitForThreads(); } - - public - interface TestObject { - void throwException(); - - void moo(); - - void moo(String value); - - void moo(String value, long delay); - - int id(); - - float slow(); - } - - - public static - class TestObjectImpl implements TestObject { - // has to start at 1, because UDP/UDT method invocations ignore return values - static final AtomicInteger ID_COUNTER = new AtomicInteger(1); - - public long value = System.currentTimeMillis(); - public int moos; - private final int id = ID_COUNTER.getAndIncrement(); - - public - TestObjectImpl() { - } - - @Override - public - void throwException() { - throw new UnsupportedOperationException("Why would I do that?"); - } - - @Override - public - void moo() { - this.moos++; - System.out.println("Moo!"); - } - - @Override - public - void moo(String value) { - this.moos += 2; - System.out.println("Moo: " + value); - } - - @Override - public - void moo(String value, long delay) { - this.moos += 4; - System.out.println("Moo: " + value + " (" + delay + ")"); - try { - Thread.sleep(delay); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - - @Override - public - int id() { - return id; - } - - @Override - public - float slow() { - System.out.println("Slowdown!!"); - try { - Thread.sleep(2000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - return 123.0F; - } - - @Override - public - boolean equals(final Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - final TestObjectImpl that = (TestObjectImpl) o; - - return id == that.id; - - } - - @Override - public - int hashCode() { - return id; - } - - @Override - public - String toString() { - return "Tada! This is a remote object!"; - } - } - - - public static - class MessageWithTestObject implements RmiMessages { - public int number; - public String text; - public TestObject testObject; - } } diff --git a/test/dorkbox/network/rmi/TestObject.java b/test/dorkbox/network/rmi/TestObject.java new file mode 100644 index 00000000..47be3cab --- /dev/null +++ b/test/dorkbox/network/rmi/TestObject.java @@ -0,0 +1,19 @@ +package dorkbox.network.rmi; + +/** + * + */ +public +interface TestObject { + void throwException(); + + void moo(); + + void moo(String value); + + void moo(String value, long delay); + + int id(); + + float slow(); +} diff --git a/test/dorkbox/network/rmi/TestObjectImpl.java b/test/dorkbox/network/rmi/TestObjectImpl.java new file mode 100644 index 00000000..ff0878e3 --- /dev/null +++ b/test/dorkbox/network/rmi/TestObjectImpl.java @@ -0,0 +1,98 @@ +package dorkbox.network.rmi; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * + */ +public +class TestObjectImpl implements TestObject { + // has to start at 1, because UDP/UDT method invocations ignore return values + static final AtomicInteger ID_COUNTER = new AtomicInteger(1); + + public long value = System.currentTimeMillis(); + public int moos; + private final int id = ID_COUNTER.getAndIncrement(); + + public + TestObjectImpl() { + } + + @Override + public + void throwException() { + throw new UnsupportedOperationException("Why would I do that?"); + } + + @Override + public + void moo() { + this.moos++; + System.out.println("Moo!"); + } + + @Override + public + void moo(String value) { + this.moos += 2; + System.out.println("Moo: " + value); + } + + @Override + public + void moo(String value, long delay) { + this.moos += 4; + System.out.println("Moo: " + value + " (" + delay + ")"); + try { + Thread.sleep(delay); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + @Override + public + int id() { + return id; + } + + @Override + public + float slow() { + System.out.println("Slowdown!!"); + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return 123.0F; + } + + @Override + public + boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + final TestObjectImpl that = (TestObjectImpl) o; + + return id == that.id; + + } + + @Override + public + int hashCode() { + return id; + } + + @Override + public + String toString() { + return "Tada! This is a remote object!"; + } +} diff --git a/test/dorkbox/network/rmi/multiJVM/TestClient.java b/test/dorkbox/network/rmi/multiJVM/TestClient.java new file mode 100644 index 00000000..4e97c3c2 --- /dev/null +++ b/test/dorkbox/network/rmi/multiJVM/TestClient.java @@ -0,0 +1,142 @@ +package dorkbox.network.rmi.multiJVM; + +import static org.junit.Assert.fail; + +import java.io.IOException; + +import org.slf4j.LoggerFactory; + +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.Logger; +import ch.qos.logback.classic.LoggerContext; +import ch.qos.logback.classic.encoder.PatternLayoutEncoder; +import ch.qos.logback.classic.joran.JoranConfigurator; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.ConsoleAppender; +import dorkbox.network.Client; +import dorkbox.network.Configuration; +import dorkbox.network.connection.Connection; +import dorkbox.network.connection.CryptoSerializationManager; +import dorkbox.network.rmi.RemoteObjectCallback; +import dorkbox.network.rmi.RmiTest; +import dorkbox.network.rmi.TestObject; +import io.netty.util.ResourceLeakDetector; + +/** + * + */ +public +class TestClient +{ + public static void setup() { + ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID); + + // assume SLF4J is bound to logback in the current environment + Logger rootLogger = (Logger) LoggerFactory.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME); + LoggerContext context = rootLogger.getLoggerContext(); + + JoranConfigurator jc = new JoranConfigurator(); + jc.setContext(context); + context.reset(); // override default configuration + +// rootLogger.setLevel(Level.OFF); + + // rootLogger.setLevel(Level.DEBUG); + rootLogger.setLevel(Level.TRACE); +// rootLogger.setLevel(Level.ALL); + + + // we only want error messages + Logger nettyLogger = (Logger) LoggerFactory.getLogger("io.netty"); + nettyLogger.setLevel(Level.ERROR); + + // we only want error messages + Logger kryoLogger = (Logger) LoggerFactory.getLogger("com.esotericsoftware"); + kryoLogger.setLevel(Level.ERROR); + + // we only want error messages + Logger barchartLogger = (Logger) LoggerFactory.getLogger("com.barchart"); + barchartLogger.setLevel(Level.ERROR); + + PatternLayoutEncoder encoder = new PatternLayoutEncoder(); + encoder.setContext(context); + encoder.setPattern("%date{HH:mm:ss.SSS} %-5level [%logger{35}] %msg%n"); + encoder.start(); + + ConsoleAppender consoleAppender = new ch.qos.logback.core.ConsoleAppender(); + + consoleAppender.setContext(context); + consoleAppender.setEncoder(encoder); + consoleAppender.start(); + + rootLogger.addAppender(consoleAppender); + } + + + public static + void main(String[] args) { + setup(); + + Configuration configuration = new Configuration(); + configuration.tcpPort = 2000; + configuration.host = "localhost"; + + configuration.serialization = CryptoSerializationManager.DEFAULT(); + RmiTest.register(configuration.serialization); + configuration.serialization.registerRmiInterface(TestObject.class); + + + try { + Client client = new Client(configuration); + client.setIdleTimeout(0); + + client.listeners() + .add(new dorkbox.network.connection.Listener.OnConnected() { + @Override + public + void connected(final Connection connection) { + System.err.println("Starting test for: Client -> Server"); + + try { + // if this is called in the dispatch thread, it will block network comms while waiting for a response and it won't work... + connection.getRemoteObject(TestObject.class, new RemoteObjectCallback() { + @Override + public + void created(final TestObject remoteObject) { + new Thread() { + @Override + public + void run() { + // MUST run on a separate thread because remote object method invocations are blocking + RmiTest.runTests(connection, remoteObject, 1); + } + }.start(); + } + }); + } catch (IOException e) { + e.printStackTrace(); + fail(); + } + + // try { + // connection.getRemoteObject(TestObject.class, new RemoteObjectCallback() { + // @Override + // public + // void created(final TestObject remoteObject) { + // remoteObject.test(); + // } + // }); + // } catch (IOException e) { + // e.printStackTrace(); + // } + } + }); + + client.connect(0); + + Thread.sleep(999999999); + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/test/dorkbox/network/rmi/multiJVM/TestServer.java b/test/dorkbox/network/rmi/multiJVM/TestServer.java new file mode 100644 index 00000000..4e56d383 --- /dev/null +++ b/test/dorkbox/network/rmi/multiJVM/TestServer.java @@ -0,0 +1,81 @@ +package dorkbox.network.rmi.multiJVM; + +import java.io.IOException; + +import dorkbox.network.Server; +import dorkbox.network.connection.CryptoSerializationManager; +import dorkbox.network.rmi.RmiTest; +import dorkbox.network.rmi.TestObject; +import dorkbox.network.rmi.TestObjectImpl; +import dorkbox.util.exceptions.InitializationException; +import dorkbox.util.exceptions.SecurityException; + +/** + * + */ +public +class TestServer +{ + public static + void main(String[] args) { + TestClient.setup(); + + dorkbox.network.Configuration configuration = new dorkbox.network.Configuration(); + configuration.tcpPort = 2000; + + configuration.serialization = CryptoSerializationManager.DEFAULT(); + RmiTest.register(configuration.serialization); + configuration.serialization.registerRmiImplementation(TestObject.class, TestObjectImpl.class); + + Server server = null; + try { + server = new Server(configuration); + } catch (InitializationException e) { + e.printStackTrace(); + } catch (SecurityException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + + server.setIdleTimeout(0); + server.bind(true); + + + // configuration.host = "localhost"; + // configuration.serialization.register(TestObjImpl.class); + // + // Client client = null; + // try { + // client = new Client(configuration); + // } catch (InitializationException e) { + // e.printStackTrace(); + // } catch (SecurityException e) { + // e.printStackTrace(); + // } catch (IOException e) { + // e.printStackTrace(); + // } + // client.setIdleTimeout(0); + // + // client.listeners() + // .add(new dorkbox.network.connection.Listener.OnConnected() { + // @Override + // public + // void connected(final Connection connection) { + // System.err.println("CONNECTED!"); + // + // try { + // TestObject object = connection.createProxyObject(TestObject.class); + // object.test(); + // } catch (IOException e) { + // e.printStackTrace(); + // } + // } + // }); + // + // try { + // client.connect(5000); + // } catch (IOException e) { + // e.printStackTrace(); + // } + }}