Cleaned up RMI, made TCP/UDP/UDT not change behavior (only transport). Adjusted examples to reflect this. Tweaked API names

This commit is contained in:
nathan 2016-03-01 04:12:15 +01:00
parent e56e654ee8
commit 9aef066506
10 changed files with 147 additions and 126 deletions

View File

@ -406,7 +406,7 @@ class Client<C extends Connection> extends EndPointClient<C> implements Connecti
* Methods that return a value will throw {@link TimeoutException} if the response is not received with the {@link
* RemoteObject#setResponseTimeout(int) response timeout}.
* <p/>
* If {@link RemoteObject#setNonBlocking(boolean) non-blocking} is false (the default), then methods that return a value must not be
* If {@link RemoteObject#setAsync(boolean) non-blocking} is false (the default), then methods that return a value must not be
* called from the update thread for the connection. An exception will be thrown if this occurs. Methods with a void return value can be
* called on the update thread.
* <p/>
@ -431,7 +431,7 @@ class Client<C extends Connection> extends EndPointClient<C> implements Connecti
* Methods that return a value will throw {@link TimeoutException} if the response is not received with the {@link
* RemoteObject#setResponseTimeout(int) response timeout}.
* <p/>
* If {@link RemoteObject#setNonBlocking(boolean) non-blocking} is false (the default), then methods that return a value must not be
* If {@link RemoteObject#setAsync(boolean) non-blocking} is false (the default), then methods that return a value must not be
* called from the update thread for the connection. An exception will be thrown if this occurs. Methods with a void return value can be
* called on the update thread.
* <p/>

View File

@ -42,10 +42,10 @@ class Configuration {
public SettingsStore settingsStore = null;
/**
* Enable remote method invocation (RMI) for this connection. This is additional overhead to using RMI.
* Enable remote method invocation (RMI) for this connection. There is additional overhead to using RMI.
* <p/>
* Specifically, It costs at least 2 bytes more to use remote method invocation than just sending the parameters. If the method has a
* return value which is not {@link dorkbox.network.rmi.RemoteObject#setNonBlocking(boolean) ignored}, an extra byte is written. If the
* return value which is not {@link dorkbox.network.rmi.RemoteObject#setAsync(boolean) ignored}, an extra byte is written. If the
* type of a parameter is not final (primitives are final) then an extra byte is written for that parameter.
*/
public boolean rmiEnabled = false;

View File

@ -108,7 +108,7 @@ interface Connection {
* response is not received with the
* {@link RemoteObject#setResponseTimeout(int) response timeout}.
* <p/>
* If {@link RemoteObject#setNonBlocking(boolean) non-blocking} is false
* If {@link RemoteObject#setAsync(boolean) non-blocking} is false
* (the default), then methods that return a value must not be called from
* the update thread for the connection. An exception will be thrown if this
* occurs. Methods with a void return value can be called on the update
@ -134,7 +134,7 @@ interface Connection {
* response is not received with the
* {@link RemoteObject#setResponseTimeout(int) response timeout}.
* <p/>
* If {@link RemoteObject#setNonBlocking(boolean) non-blocking} is false
* If {@link RemoteObject#setAsync(boolean) non-blocking} is false
* (the default), then methods that return a value must not be called from
* the update thread for the connection. An exception will be thrown if this
* occurs. Methods with a void return value can be called on the update

View File

@ -1088,6 +1088,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection,
/**
* Used by the LOCAL side, to get the proxy object as an interface
*
* @param type must be the interface the proxy will bind to
*/
public
@ -1098,7 +1099,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection,
if (remoteObject == null) {
// duplicates are fine, as they represent the same object (as specified by the ID) on the remote side.
remoteObject = RmiBridge.createProxyObject(this, objectID, type);
remoteObject = rmiBridge.createProxyObject(this, objectID, type);
proxyIdCache.put(objectID, remoteObject);
}

View File

@ -21,7 +21,7 @@ class InvocationHandlerSerializer extends Serializer<Object> {
@Override
public
void write(Kryo kryo, Output output, Object object) {
RemoteInvocationHandler handler = (RemoteInvocationHandler) Proxy.getInvocationHandler(object);
RemoteObjectInvocationHandler handler = (RemoteObjectInvocationHandler) Proxy.getInvocationHandler(object);
output.writeInt(handler.objectID, true);
}

View File

@ -34,8 +34,6 @@
*/
package dorkbox.network.rmi;
import dorkbox.network.connection.Connection;
/**
* Provides access to various settings on a remote object.
*
@ -44,30 +42,32 @@ import dorkbox.network.connection.Connection;
public
interface RemoteObject {
/**
* Sets the milliseconds to wait for a method to return value. Default is 3000, 0 disables (ie: waits forever)
* Sets the milliseconds to wait for a method to return a value. Default is 3000, 0 disables (waits forever)
*
* @param timeoutMillis how long to wait for a method to return a value.
*/
void setResponseTimeout(int timeoutMillis);
/**
* Sets the blocking behavior when invoking a remote method. Default is false.
* Sets the blocking behavior when invoking a remote method. Default is false (blocking)
*
* @param nonBlocking
* @param enable
* If false, the invoking thread will wait for the remote method to return or timeout (default). If true, the invoking
* thread will not wait for a response. The method will return immediately and the return value should be ignored. If
* they are being transmitted, the return value or any thrown exception can later be retrieved with {@link
* return values are being transmitted, the return value or any thrown exception can later be retrieved with {@link
* #waitForLastResponse()} or {@link #waitForResponse(byte)}. The responses will be stored until retrieved, so each
* method call should have a matching retrieve.
*/
void setNonBlocking(boolean nonBlocking);
void setAsync(boolean enable);
/**
* Sets whether return values are sent back when invoking a remote method. Default is true.
*
* @param transmit
* If true, then the return value for non-blocking method invocations can be retrieved with {@link
* If true, then the return value for async method invocations can be retrieved with {@link
* #waitForLastResponse()} or {@link #waitForResponse(byte)}. If false, then non-primitive return values for remote
* method invocations are not sent by the remote side of the connection and the response can never be retrieved. This
* can also be used to save bandwidth if you will not check the return value of a blocking remote invocation. Note that
* can also be used to save bandwidth if you will not check the return value of a blocking remote invocations. Note that
* an exception could still be returned by {@link #waitForLastResponse()} or {@link #waitForResponse(byte)} if {@link
* #setTransmitExceptions(boolean)} is true.
*/
@ -78,7 +78,7 @@ interface RemoteObject {
*
* @param transmit
* If false, exceptions will be unhandled and rethrown as RuntimeExceptions inside the invoking thread. This is the
* legacy behavior. If true, behavior is dependent on whether {@link #setNonBlocking(boolean)}. If non-blocking is true,
* legacy behavior. If true, behavior is dependent on whether {@link #setAsync(boolean)}. If non-blocking is true,
* the exception will be serialized and sent back to the call site of the remotely invoked method, where it will be
* re-thrown. If non-blocking is false, an exception will not be thrown in the calling thread but instead can be
* retrieved with {@link #waitForLastResponse()} or {@link #waitForResponse(byte)}, similar to a return value.
@ -86,32 +86,50 @@ interface RemoteObject {
void setTransmitExceptions(boolean transmit);
/**
* If true, UDP will be used to send the remote method invocation. UDP remote method invocations will never return a response and the
* invoking thread will not wait for a response.
* Specifies that remote method invocation will happen over TCP. This is the default.
* <p>
* TCP remote method invocations <b>will</b> return a response and the invoking thread <b>will</b> wait for a response. See {@link
* #setAsync(boolean)} if you do not want to wait for a response, which can be retrieved later with {@link #waitForLastResponse()} or
* {@link #waitForResponse(byte)}.
*/
void setUDP(boolean udp);
void setTCP();
/**
* If true, UDT will be used to send the remote method invocation. UDT remote method invocations <b>will</b> return a response and the
* invoking thread <b>will</b> wait for a response.
* Specifies that remote method invocation will happen over UDP. Default is {@link #setTCP()}
* <p>
* UDP remote method invocations <b>will</b> return a response and the invoking thread <b>will</b> wait for a response. See {@link
* #setAsync(boolean)} if you do not want to wait for a response, which can be retrieved later with {@link #waitForLastResponse()} or
* {@link #waitForResponse(byte)}.
*/
void setUDT(boolean udt);
void setUDP();
/**
* If false, calls to {@link Object#toString()} will return "<proxy #id>" (where `id` is the remote object ID) instead of invoking
* the remote `toString()` method on the object.
* Specifies that remote method invocation will happen over UDT. Default is {@link #setTCP()}
* <p>
* UDT remote method invocations <b>will</b> return a response and the invoking thread <b>will</b> wait for a response. See {@link
* #setAsync(boolean)} if you do not want to wait for a response, which can be retrieved later with {@link #waitForLastResponse()} or
* {@link #waitForResponse(byte)}.
*/
void setUDT();
/**
* Permits calls to {@link Object#toString()} to actually return the `toString()` method on the object.
*
* @param enableDetailedToString
* If false, calls to {@link Object#toString()} will return "<proxy #id>" (where `id` is the remote object ID) instead
* of invoking the remote `toString()` method on the object.
*/
void enableToString(boolean enableDetailedToString);
/**
* Waits for the response to the last method invocation to be received or the response timeout to be reached.
*
* @see RmiBridge#createProxyObject(Connection, int, Class)
* @return the response of the last method invocation
*/
Object waitForLastResponse();
/**
* Gets the ID of response for the last method invocation.
* @return the ID of response for the last method invocation.
*/
byte getLastResponseID();
@ -122,7 +140,9 @@ interface RemoteObject {
* this method should be called to get the result for a non-blocking call before an additional 63 non-blocking calls are made, or risk
* undefined behavior due to identical IDs.
*
* @see RmiBridge#createProxyObject(Connection, int, Class)
* @param responseID this is the response ID obtained via {@link #getLastResponseID()}
*
* @return the response of the last method invocation
*/
Object waitForResponse(byte responseID);
@ -130,9 +150,4 @@ interface RemoteObject {
* Causes this RemoteObject to stop listening to the connection for method invocation response messages.
*/
void close();
/**
* Returns the local connection for this remote object.
*/
Connection getConnection();
}

View File

@ -54,8 +54,8 @@ import java.util.concurrent.locks.ReentrantLock;
* Handles network communication when methods are invoked on a proxy.
*/
public
class RemoteInvocationHandler implements InvocationHandler {
private static final Logger logger = LoggerFactory.getLogger(RemoteInvocationHandler.class);
class RemoteObjectInvocationHandler implements InvocationHandler {
private static final Logger logger = LoggerFactory.getLogger(RemoteObjectInvocationHandler.class);
final ReentrantLock lock = new ReentrantLock();
final Condition responseCondition = this.lock.newCondition();
@ -69,7 +69,7 @@ class RemoteInvocationHandler implements InvocationHandler {
private final ListenerRaw<Connection, InvokeMethodResult> responseListener;
private int timeoutMillis = 3000;
private boolean nonBlocking = false;
private boolean isAsync = false;
private boolean transmitReturnValue = true;
private boolean transmitExceptions = true;
@ -83,7 +83,7 @@ class RemoteInvocationHandler implements InvocationHandler {
private byte nextResponseId = (byte) 1;
public
RemoteInvocationHandler(final Connection connection, final int objectID) {
RemoteObjectInvocationHandler(final Connection connection, final int objectID) {
super();
this.connection = connection;
this.objectID = objectID;
@ -109,16 +109,16 @@ class RemoteInvocationHandler implements InvocationHandler {
// logger.trace("{} received data: {} with id ({})", connection, invokeMethodResult.result, invokeMethodResult.responseID);
synchronized (this) {
if (RemoteInvocationHandler.this.pendingResponses[responseID]) {
RemoteInvocationHandler.this.responseTable[responseID] = invokeMethodResult;
if (RemoteObjectInvocationHandler.this.pendingResponses[responseID]) {
RemoteObjectInvocationHandler.this.responseTable[responseID] = invokeMethodResult;
}
}
RemoteInvocationHandler.this.lock.lock();
RemoteObjectInvocationHandler.this.lock.lock();
try {
RemoteInvocationHandler.this.responseCondition.signalAll();
RemoteObjectInvocationHandler.this.responseCondition.signalAll();
} finally {
RemoteInvocationHandler.this.lock.unlock();
RemoteObjectInvocationHandler.this.lock.unlock();
}
}
};
@ -127,12 +127,14 @@ class RemoteInvocationHandler implements InvocationHandler {
.add(this.responseListener);
}
@SuppressWarnings({"AutoUnboxing", "AutoBoxing", "NumericCastThatLosesPrecision"})
@SuppressWarnings({"AutoUnboxing", "AutoBoxing", "NumericCastThatLosesPrecision", "IfCanBeSwitch"})
@Override
public
Object invoke(final Object proxy, final Method method, final Object[] args) throws Exception {
final Class<?> declaringClass = method.getDeclaringClass();
if (declaringClass == RemoteObject.class) {
// manage all of the RemoteObject proxy methods
String name = method.getName();
if (name.equals("close")) {
close();
@ -142,8 +144,8 @@ class RemoteInvocationHandler implements InvocationHandler {
this.timeoutMillis = (Integer) args[0];
return null;
}
else if (name.equals("setNonBlocking")) {
this.nonBlocking = (Boolean) args[0];
else if (name.equals("setAsync")) {
this.isAsync = (Boolean) args[0];
return null;
}
else if (name.equals("setTransmitReturnValue")) {
@ -154,12 +156,19 @@ class RemoteInvocationHandler implements InvocationHandler {
this.transmitExceptions = (Boolean) args[0];
return null;
}
else if (name.equals("setTCP")) {
this.udp = false;
this.udt = false;
return null;
}
else if (name.equals("setUDP")) {
this.udp = (Boolean) args[0];
this.udp = true;
this.udt = false;
return null;
}
else if (name.equals("setUDT")) {
this.udt = (Boolean) args[0];
this.udp = false;
this.udt = true;
return null;
}
else if (name.equals("enableToString")) {
@ -179,23 +188,21 @@ class RemoteInvocationHandler implements InvocationHandler {
return this.lastResponseID;
}
else if (name.equals("waitForResponse")) {
if (!this.transmitReturnValue && !this.transmitExceptions && this.nonBlocking) {
if (!this.transmitReturnValue && !this.transmitExceptions && this.isAsync) {
throw new IllegalStateException("This RemoteObject is currently set to ignore all responses.");
}
return waitForResponse((Byte) args[0]);
}
else if (name.equals("getConnection")) {
return this.connection;
}
// Should never happen, for debugging purposes only!
throw new Exception("Invocation handler could not find RemoteObject method.");
throw new Exception("Invocation handler could not find RemoteObject method for " + name);
}
else if (!this.enableToString && declaringClass == Object.class && method.getName()
.equals("toString")) {
return proxyString;
}
final Logger logger1 = RemoteInvocationHandler.logger;
final Logger logger1 = RemoteObjectInvocationHandler.logger;
EndPoint endPoint = this.connection.getEndPoint();
final CryptoSerializationManager serializationManager = endPoint.getSerialization();
@ -242,10 +249,13 @@ class RemoteInvocationHandler implements InvocationHandler {
}
// An invocation doesn't need a response is if it's async and no return values or exceptions are wanted back.
boolean needsResponse = !this.udp && (this.transmitReturnValue || this.transmitExceptions || !this.nonBlocking);
byte responseID = (byte) 0;
if (needsResponse) {
// An invocation doesn't need a response is if it's async and no return values or exceptions are wanted back.
boolean ignoreResponse = this.isAsync && !(this.transmitReturnValue || this.transmitExceptions);
if (ignoreResponse) {
invokeMethod.responseData = (byte) 0; // 0 means do not respond.
}
else {
synchronized (this) {
// Increment the response counter and put it into the low bits of the responseID.
responseID = this.nextResponseId++;
@ -264,10 +274,8 @@ class RemoteInvocationHandler implements InvocationHandler {
}
invokeMethod.responseData = responseData;
}
else {
invokeMethod.responseData = (byte) 0; // A response data of 0 means to not respond.
}
// Sends our invokeMethod to the remote connection, which the RmiBridge listens for
if (this.udp) {
this.connection.send()
.UDP(invokeMethod)
@ -298,7 +306,7 @@ class RemoteInvocationHandler implements InvocationHandler {
this.lastResponseID = (byte) (invokeMethod.responseData & RmiBridge.responseIdMask);
if (this.nonBlocking || this.udp || this.udt) {
if (this.isAsync) {
Class<?> returnType = method.getReturnType();
if (returnType.isPrimitive()) {
if (returnType == int.class) {
@ -417,8 +425,6 @@ class RemoteInvocationHandler implements InvocationHandler {
}
}
// only get here if we timeout
throw new TimeoutException("Response timed out.");
}
@ -434,8 +440,6 @@ class RemoteInvocationHandler implements InvocationHandler {
int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + (this.connection == null ? 0 : this.connection.hashCode());
result = prime * result + (this.lastResponseID == null ? 0 : this.lastResponseID.hashCode());
result = prime * result + this.objectID;
return result;
}
@ -452,23 +456,7 @@ class RemoteInvocationHandler implements InvocationHandler {
if (getClass() != obj.getClass()) {
return false;
}
RemoteInvocationHandler other = (RemoteInvocationHandler) obj;
if (this.connection == null) {
if (other.connection != null) {
return false;
}
}
else if (!this.connection.equals(other.connection)) {
return false;
}
if (this.lastResponseID == null) {
if (other.lastResponseID != null) {
return false;
}
}
else if (!this.lastResponseID.equals(other.lastResponseID)) {
return false;
}
RemoteObjectInvocationHandler other = (RemoteObjectInvocationHandler) obj;
return this.objectID == other.objectID;
}
}

View File

@ -60,7 +60,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
* Connection#createProxyObject(Class)} for the registered objects.
* <p/>
* It costs at least 2 bytes more to use remote method invocation than just sending the parameters. If the method has a return value which
* is not {@link RemoteObject#setNonBlocking(boolean) ignored}, an extra byte is written. If the type of a parameter is not final (note that
* is not {@link RemoteObject#setAsync(boolean) ignored}, an extra byte is written. If the type of a parameter is not final (note that
* primitives are final) then an extra byte is written for that parameter.
* <p/>
* <p/>
@ -83,7 +83,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
*
* @author Nathan Sweet <misc@n4te.com>, Nathan Robinson
*/
public
public final
class RmiBridge {
public static final int INVALID_RMI = 0;
static final int returnValueMask = 1 << 7;
@ -102,40 +102,6 @@ class RmiBridge {
return (objectId & 1) != 0;
}
/**
* Returns a proxy object that implements the specified interfaces. Methods invoked on the proxy object will be invoked remotely on the
* object with the specified ID in the ObjectSpace for the specified connection. If the remote end of the connection has not {@link
* RmiBridge#register(int, Object)} added the connection to the ObjectSpace, the remote method invocations will be ignored.
* <p/>
* Methods that return a value will throw {@link TimeoutException} if the response is not received with the {@link
* RemoteObject#setResponseTimeout(int) response timeout}.
* <p/>
* If {@link RemoteObject#setNonBlocking(boolean) non-blocking} is false (the default), then methods that return a value must not be
* called from the update thread for the connection. An exception will be thrown if this occurs. Methods with a void return value can be
* called on the update thread.
* <p/>
* If a proxy returned from this method is part of an object graph sent over the network, the object graph on the receiving side will
* have the proxy object replaced with the registered object.
*
* @see RemoteObject
*/
public static
RemoteObject createProxyObject(Connection connection, int objectID, Class<?> iface) {
if (connection == null) {
throw new IllegalArgumentException("connection cannot be null.");
}
if (iface == null) {
throw new IllegalArgumentException("iface cannot be null.");
}
Class<?>[] temp = new Class<?>[2];
temp[0] = RemoteObject.class;
temp[1] = iface;
return (RemoteObject) Proxy.newProxyInstance(RmiBridge.class.getClassLoader(), temp, new RemoteInvocationHandler(connection,
objectID));
}
// the name of who created this RmiBridge
private final org.slf4j.Logger logger;
@ -441,4 +407,42 @@ class RmiBridge {
return id;
}
/**
* Warning. This is an advanced method. You should probably be using {@link Connection#createProxyObject(Class)}.
* <p>
* <p>
* Returns a proxy object that implements the specified interfaces. Methods invoked on the proxy object will be invoked remotely on the
* object with the specified ID in the ObjectSpace for the specified connection. If the remote end of the connection has not {@link
* RmiBridge#register(int, Object)} added the connection to the ObjectSpace, the remote method invocations will be ignored.
* <p/>
* Methods that return a value will throw {@link TimeoutException} if the response is not received with the {@link
* RemoteObject#setResponseTimeout(int) response timeout}.
* <p/>
* If {@link RemoteObject#setAsync(boolean) non-blocking} is false (the default), then methods that return a value must not be
* called from the update thread for the connection. An exception will be thrown if this occurs. Methods with a void return value can be
* called on the update thread.
* <p/>
* If a proxy returned from this method is part of an object graph sent over the network, the object graph on the receiving side will
* have the proxy object replaced with the registered object.
*
* @see RemoteObject
*/
public
RemoteObject createProxyObject(Connection connection, int objectID, Class<?> iface) {
if (connection == null) {
throw new IllegalArgumentException("connection cannot be null.");
}
if (iface == null) {
throw new IllegalArgumentException("iface cannot be null.");
}
Class<?>[] temp = new Class<?>[2];
temp[0] = RemoteObject.class;
temp[1] = iface;
return (RemoteObject) Proxy.newProxyInstance(RmiBridge.class.getClassLoader(),
temp,
new RemoteObjectInvocationHandler(connection, objectID));
}
}

View File

@ -53,10 +53,16 @@ class RmiGlobalTest extends BaseTest {
// UDP calls that ignore the return value
remoteObject.setUDP(true);
remoteObject.setUDP();
remoteObject.setAsync(true);
remoteObject.setTransmitReturnValue(false);
remoteObject.setTransmitExceptions(false);
test.moo("Meow");
assertEquals(0, test.id());
remoteObject.setUDP(false);
remoteObject.setAsync(false);
remoteObject.setTransmitReturnValue(true);
remoteObject.setTransmitExceptions(true);
remoteObject.setTCP();
// Test that RMI correctly waits for the remotely invoked method to exit
@ -89,7 +95,7 @@ class RmiGlobalTest extends BaseTest {
assertTrue(caught);
// Non-blocking call that ignores the return value
remoteObject.setNonBlocking(true);
remoteObject.setAsync(true);
remoteObject.setTransmitReturnValue(false);
test.moo("Meow");
assertEquals(0, test.id());
@ -120,7 +126,7 @@ class RmiGlobalTest extends BaseTest {
// should wait for a small time
remoteObject.setTransmitReturnValue(true);
remoteObject.setNonBlocking(false);
remoteObject.setAsync(false);
remoteObject.setResponseTimeout(6000);
System.out.println("You should see this 2 seconds before");
float slow = test.slow();

View File

@ -51,10 +51,16 @@ class RmiTest extends BaseTest {
// UDP calls that ignore the return value
remoteObject.setUDP(true);
remoteObject.setUDP();
remoteObject.setAsync(true);
remoteObject.setTransmitReturnValue(false);
remoteObject.setTransmitExceptions(false);
test.moo("Meow");
assertEquals(0, test.id());
remoteObject.setUDP(false);
remoteObject.setAsync(false);
remoteObject.setTransmitReturnValue(true);
remoteObject.setTransmitExceptions(true);
remoteObject.setTCP();
// Test that RMI correctly waits for the remotely invoked method to exit
@ -87,7 +93,7 @@ class RmiTest extends BaseTest {
assertTrue(caught);
// Non-blocking call that ignores the return value
remoteObject.setNonBlocking(true);
remoteObject.setAsync(true);
remoteObject.setTransmitReturnValue(false);
test.moo("Meow");
assertEquals(0, test.id());
@ -117,13 +123,14 @@ class RmiTest extends BaseTest {
// should wait for a small time
remoteObject.setTransmitReturnValue(true);
remoteObject.setNonBlocking(false);
remoteObject.setAsync(false);
remoteObject.setResponseTimeout(6000);
System.out.println("You should see this 2 seconds before");
float slow = test.slow();
System.out.println("...This");
assertEquals(slow, 123, .0001D);
// Test sending a reference to a remote object.
MessageWithTestObject m = new MessageWithTestObject();
m.number = 678;