Fixed issues with UDP loopback. Reworked RMI. Added UDP/UDT options for rmi return values. Added using ASM to RMI.

This commit is contained in:
nathan 2014-11-24 00:52:58 +01:00
parent e4805cd900
commit d956badcd0
32 changed files with 633 additions and 489 deletions

View File

@ -98,9 +98,6 @@ public class Broadcast {
List<InetAddress> servers = new ArrayList<InetAddress>(); List<InetAddress> servers = new ArrayList<InetAddress>();
Logger logger2 = logger; Logger logger2 = logger;
if (logger2.isInfoEnabled()) {
logger2.info("Searching for host on port: {}", udpPort);
}
Enumeration<NetworkInterface> networkInterfaces; Enumeration<NetworkInterface> networkInterfaces;
try { try {
@ -127,6 +124,10 @@ public class Broadcast {
try { try {
if (logger2.isInfoEnabled()) {
logger2.info("Searching for host on {} : {}", address, udpPort);
}
NioEventLoopGroup group = new NioEventLoopGroup(); NioEventLoopGroup group = new NioEventLoopGroup();
Bootstrap udpBootstrap = new Bootstrap() Bootstrap udpBootstrap = new Bootstrap()
.group(group) .group(group)

View File

@ -2,7 +2,6 @@ package dorkbox.network;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultEventLoopGroup; import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
@ -19,11 +18,10 @@ import io.netty.channel.socket.oio.OioSocketChannel;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.LinkedList;
import java.util.List;
import org.slf4j.Logger; import org.slf4j.Logger;
import dorkbox.network.connection.BootstrapWrapper;
import dorkbox.network.connection.Connection; import dorkbox.network.connection.Connection;
import dorkbox.network.connection.EndPointClient; import dorkbox.network.connection.EndPointClient;
import dorkbox.network.connection.idle.IdleBridge; import dorkbox.network.connection.idle.IdleBridge;
@ -40,15 +38,9 @@ import dorkbox.util.NamedThreadFactory;
import dorkbox.util.OS; import dorkbox.util.OS;
/** /**
* The client is both SYNC and ASYNC, meaning that once the client is connected to the server, you can access it however you want. * The client is both SYNC and ASYNC. It starts off SYNC (blocks thread until it's done), then once it's connected to the server, it's ASYNC.
* <p>
* Another way to put this: The client (like the server) can respond to EVENTS (ie, listeners), but you can also use it DIRECTLY, for
* example, send data to the server on keyboard input. This is because the client will BLOCK the calling thread until it's ready.
*/ */
public class Client extends EndPointClient { public class Client extends EndPointClient {
private List<BootstrapWrapper> bootstraps = new LinkedList<BootstrapWrapper>();
private volatile int connectionTimeout = 5000; // default
/** /**
* Starts a LOCAL <b>only</b> client, with the default local channel name and serialization scheme * Starts a LOCAL <b>only</b> client, with the default local channel name and serialization scheme
@ -90,7 +82,6 @@ public class Client extends EndPointClient {
options.udtPort = -1; options.udtPort = -1;
} }
// tcpBootstrap.setOption(SO_SNDBUF, 1048576); // tcpBootstrap.setOption(SO_SNDBUF, 1048576);
// tcpBootstrap.setOption(SO_RCVBUF, 1048576); // tcpBootstrap.setOption(SO_RCVBUF, 1048576);
@ -130,11 +121,9 @@ public class Client extends EndPointClient {
if (OS.isLinux()) { if (OS.isLinux()) {
// JNI network stack is MUCH faster (but only on linux) // JNI network stack is MUCH faster (but only on linux)
boss = new EpollEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(this.name + "-TCP", nettyGroup)); boss = new EpollEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(this.name + "-TCP", nettyGroup));
tcpBootstrap.channel(EpollSocketChannel.class); tcpBootstrap.channel(EpollSocketChannel.class);
} else { } else {
boss = new NioEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(this.name + "-TCP", nettyGroup)); boss = new NioEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(this.name + "-TCP", nettyGroup));
tcpBootstrap.channel(NioSocketChannel.class); tcpBootstrap.channel(NioSocketChannel.class);
} }
} }
@ -168,7 +157,6 @@ public class Client extends EndPointClient {
} else { } else {
// CANNOT USE EpollDatagramChannel on the client! // CANNOT USE EpollDatagramChannel on the client!
boss = new NioEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(this.name + "-UDP", nettyGroup)); boss = new NioEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(this.name + "-UDP", nettyGroup));
udpBootstrap.channel(NioDatagramChannel.class); udpBootstrap.channel(NioDatagramChannel.class);
} }
@ -258,6 +246,8 @@ public class Client extends EndPointClient {
/** /**
* will attempt to connect to the server, and will the specified timeout. * will attempt to connect to the server, and will the specified timeout.
* <p>
* will BLOCK until completed
* *
* @param connectionTimeout wait for x milliseconds. 0 will wait indefinitely * @param connectionTimeout wait for x milliseconds. 0 will wait indefinitely
*/ */
@ -269,58 +259,18 @@ public class Client extends EndPointClient {
synchronized (this.shutdownInProgress) { synchronized (this.shutdownInProgress) {
} }
// have to BLOCK here, because we don't want sendTCP() called before registration is complete // have to start the registration process
this.connectingBootstrap.set(0);
registerNextProtocol();
// have to BLOCK
// don't want the client to run before registration is complete
synchronized (this.registrationLock) { synchronized (this.registrationLock) {
this.registrationInProgress = true; try {
this.registrationLock.wait(connectionTimeout);
// we will only do a local channel when NOT doing TCP/UDP channels. This is EXCLUSIVE. (XOR) } catch (InterruptedException e) {
int size = this.bootstraps.size(); this.logger.error("Registration thread interrupted!");
for (int i=0;i<size;i++) {
if (!this.registrationInProgress) {
break;
}
this.registrationComplete = i == size-1;
BootstrapWrapper bootstrapWrapper = this.bootstraps.get(i);
ChannelFuture future;
if (connectionTimeout != 0) {
// must be before connect
bootstrapWrapper.bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectionTimeout);
}
Logger logger2 = this.logger;
try {
// UDP : When this is CONNECT on a udp socket will ONLY accept UDP traffic from the remote address (ip/port combo).
// If the reply isn't from the correct port, then the other end will receive a "Port Unreachable" exception.
future = bootstrapWrapper.bootstrap.connect();
future.await();
} catch (Exception e) {
String errorMessage = stopWithErrorMessage(logger2, "Could not connect to the " + bootstrapWrapper.type + " server on port: " + bootstrapWrapper.port, e);
this.registrationInProgress = false;
throw new IllegalArgumentException(errorMessage);
}
if (!future.isSuccess()) {
String errorMessage = stopWithErrorMessage(logger2, "Could not connect to the " + bootstrapWrapper.type + " server on port: " + bootstrapWrapper.port, future.cause());
this.registrationInProgress = false;
throw new IllegalArgumentException(errorMessage);
}
if (logger2.isTraceEnabled()) {
logger2.trace("Waiting for registration from server.");
}
manageForShutdown(future);
// WAIT for the next one to complete.
try {
this.registrationLock.wait(connectionTimeout);
} catch (InterruptedException e) {
}
} }
this.registrationInProgress = false;
} }
} }
@ -356,18 +306,8 @@ public class Client extends EndPointClient {
*/ */
@Override @Override
public void close() { public void close() {
// in case a different thread is blocked waiting for registration.
synchronized (this.registrationLock) { synchronized (this.registrationLock) {
if (this.registrationInProgress) { this.registrationLock.notify();
try {
this.registrationLock.wait();
} catch (InterruptedException e) {
}
}
// inside the sync block, because we DON'T want to allow a connect WHILE close is happening! Since connect is also
// in the sync bloc, we prevent it from happening.
super.close();
} }
} }
} }

View File

@ -1,5 +1,6 @@
package dorkbox.network; package dorkbox.network;
import dorkbox.network.rmi.RemoteObject;
import dorkbox.network.util.SerializationManager; import dorkbox.network.util.SerializationManager;
import dorkbox.network.util.store.SettingsStore; import dorkbox.network.util.store.SettingsStore;
@ -18,6 +19,17 @@ public class ConnectionOptions {
public SerializationManager serializationManager = null; public SerializationManager serializationManager = null;
public SettingsStore settingsStore = null; public SettingsStore settingsStore = null;
/**
* Enable remote method invocation (RMI) for this connection. This is additional overhead to using RMI.
* <p>
* Specifically, It costs at least 2 bytes more to use remote method invocation than just
* sending the parameters. If the method has a return value which is not
* {@link RemoteObject#setNonBlocking(boolean) ignored}, an extra byte is
* written. If the type of a parameter is not final (note primitives are final)
* then an extra byte is written for that parameter.
*/
public boolean enableRmi = false;
public ConnectionOptions() { public ConnectionOptions() {
} }

View File

@ -7,8 +7,6 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultEventLoopGroup; import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollChannelOption;
import io.netty.channel.epoll.EpollDatagramChannel;
import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel; import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.local.LocalAddress; import io.netty.channel.local.LocalAddress;
@ -217,26 +215,26 @@ public class Server extends EndPointServer {
worker = new OioEventLoopGroup(0, new NamedThreadFactory(this.name + "-worker-UDP", nettyGroup)); worker = new OioEventLoopGroup(0, new NamedThreadFactory(this.name + "-worker-UDP", nettyGroup));
this.udpBootstrap.channel(OioDatagramChannel.class); this.udpBootstrap.channel(OioDatagramChannel.class);
} else { } else {
if (OS.isLinux()) { // if (OS.isLinux()) {
// JNI network stack is MUCH faster (but only on linux) // // JNI network stack is MUCH faster (but only on linux)
worker = new EpollEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(this.name + "-worker-UDP", nettyGroup)); // worker = new EpollEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(this.name + "-worker-UDP", nettyGroup));
//
this.udpBootstrap.channel(EpollDatagramChannel.class) // this.udpBootstrap.channel(EpollDatagramChannel.class)
.option(EpollChannelOption.SO_REUSEPORT, true); // .option(EpollChannelOption.SO_REUSEPORT, true);
} else { // } else {
worker = new NioEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(this.name + "-worker-UDP", nettyGroup)); worker = new NioEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(this.name + "-worker-UDP", nettyGroup));
this.udpBootstrap.channel(NioDatagramChannel.class); this.udpBootstrap.channel(NioDatagramChannel.class);
} // }
} }
manageForShutdown(worker); manageForShutdown(worker);
this.udpBootstrap.group(worker) this.udpBootstrap.group(worker)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
// not binding to specific address, since it's driven by TCP, and that can be bound to a specific address // not binding to specific address, since it's driven by TCP, and that can be bound to a specific address
.localAddress(this.udpPort) // if you bind to a specific interface, Linux will be unable to receive broadcast packets! .localAddress(this.udpPort) // if you bind to a specific interface, Linux will be unable to receive broadcast packets!
.handler(new RegistrationRemoteHandlerServerUDP(this.name, this.registrationWrapper, this.serializationManager)); .handler(new RegistrationRemoteHandlerServerUDP(this.name, this.registrationWrapper, this.serializationManager));
// Enable to READ from MULTICAST data (ie, 192.168.1.0) // Enable to READ from MULTICAST data (ie, 192.168.1.0)

View File

@ -1,13 +1,13 @@
package dorkbox.network; package dorkbox.network.connection;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
class BootstrapWrapper { public class BootstrapWrapper {
final String type; public final String type;
final Bootstrap bootstrap; public final Bootstrap bootstrap;
final int port; public final int port;
BootstrapWrapper(String type, int port, Bootstrap bootstrap) { public BootstrapWrapper(String type, int port, Bootstrap bootstrap) {
this.type = type; this.type = type;
this.port = port; this.port = port;
this.bootstrap = bootstrap; this.bootstrap = bootstrap;

View File

@ -6,9 +6,10 @@ import org.bouncycastle.crypto.params.ParametersWithIV;
import dorkbox.network.connection.bridge.ConnectionBridge; import dorkbox.network.connection.bridge.ConnectionBridge;
import dorkbox.network.connection.idle.IdleBridge; import dorkbox.network.connection.idle.IdleBridge;
import dorkbox.network.connection.idle.IdleSender; import dorkbox.network.connection.idle.IdleSender;
import dorkbox.network.rmi.RemoteObject;
import dorkbox.network.rmi.TimeoutException;
public interface Connection { public interface Connection {
public static final String connection = "connection"; public static final String connection = "connection";
/** /**
@ -97,4 +98,37 @@ public interface Connection {
* Closes the connection * Closes the connection
*/ */
public void close(); public void close();
/**
* Identical to {@link #getRemoteObject(C, int, Class...)} except returns
* the object cast to the specified interface type. The returned object
* still implements {@link RemoteObject}.
*/
public <T> T getRemoteObject(int objectID, Class<T> iface);
/**
* Returns a proxy object that implements the specified interfaces. Methods
* invoked on the proxy object will be invoked remotely on the object with
* the specified ID in the ObjectSpace for the specified connection. If the
* remote end of the connection has not {@link #addConnection(Connection)
* added} the connection to the ObjectSpace, the remote method invocations
* will be ignored.
* <p>
* Methods that return a value will throw {@link TimeoutException} if the
* response is not received with the
* {@link RemoteObject#setResponseTimeout(int) response timeout}.
* <p>
* If {@link RemoteObject#setNonBlocking(boolean) non-blocking} is false
* (the default), then methods that return a value must not be called from
* the update thread for the connection. An exception will be thrown if this
* occurs. Methods with a void return value can be called on the update
* thread.
* <p>
* If a proxy returned from this method is part of an object graph sent over
* the network, the object graph on the receiving side will have the proxy
* object replaced with the registered object.
*
* @see RemoteObject
*/
public RemoteObject getRemoteObject(int objectID, Class<?>... ifaces);
} }

View File

@ -28,6 +28,8 @@ import dorkbox.network.connection.idle.IdleSender;
import dorkbox.network.connection.wrapper.ChannelNetworkWrapper; import dorkbox.network.connection.wrapper.ChannelNetworkWrapper;
import dorkbox.network.connection.wrapper.ChannelNull; import dorkbox.network.connection.wrapper.ChannelNull;
import dorkbox.network.connection.wrapper.ChannelWrapper; import dorkbox.network.connection.wrapper.ChannelWrapper;
import dorkbox.network.rmi.RemoteObject;
import dorkbox.network.rmi.TimeoutException;
/** /**
@ -343,6 +345,47 @@ public class ConnectionImpl extends ChannelInboundHandlerAdapter
return sender; return sender;
} }
/**
* Identical to {@link #getRemoteObject(C, int, Class...)} except returns
* the object cast to the specified interface type. The returned object
* still implements {@link RemoteObject}.
*/
@Override
public <T> T getRemoteObject(int objectID, Class<T> iface) {
@SuppressWarnings({"unchecked"})
T remoteObject = (T) this.endPoint.getRemoteObject(this, objectID, new Class<?>[] {iface});
return remoteObject;
}
/**
* Returns a proxy object that implements the specified interfaces. Methods
* invoked on the proxy object will be invoked remotely on the object with
* the specified ID in the ObjectSpace for the specified connection. If the
* remote end of the connection has not {@link #addConnection(Connection)
* added} the connection to the ObjectSpace, the remote method invocations
* will be ignored.
* <p>
* Methods that return a value will throw {@link TimeoutException} if the
* response is not received with the
* {@link RemoteObject#setResponseTimeout(int) response timeout}.
* <p>
* If {@link RemoteObject#setNonBlocking(boolean) non-blocking} is false
* (the default), then methods that return a value must not be called from
* the update thread for the connection. An exception will be thrown if this
* occurs. Methods with a void return value can be called on the update
* thread.
* <p>
* If a proxy returned from this method is part of an object graph sent over
* the network, the object graph on the receiving side will have the proxy
* object replaced with the registered object.
*
* @see RemoteObject
*/
@Override
public RemoteObject getRemoteObject(int objectID, Class<?>... ifaces) {
return this.endPoint.getRemoteObject(this, objectID, ifaces);
}
/** /**
* Invoked when a {@link Channel} has been idle for a while. * Invoked when a {@link Channel} has been idle for a while.
*/ */

View File

@ -7,6 +7,8 @@ import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import java.lang.annotation.Annotation; import java.lang.annotation.Annotation;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.security.AccessControlException; import java.security.AccessControlException;
import java.security.SecureRandom; import java.security.SecureRandom;
import java.util.ArrayList; import java.util.ArrayList;
@ -37,7 +39,10 @@ import dorkbox.network.connection.wrapper.ChannelNetworkWrapper;
import dorkbox.network.connection.wrapper.ChannelWrapper; import dorkbox.network.connection.wrapper.ChannelWrapper;
import dorkbox.network.pipeline.KryoEncoder; import dorkbox.network.pipeline.KryoEncoder;
import dorkbox.network.pipeline.KryoEncoderCrypto; import dorkbox.network.pipeline.KryoEncoderCrypto;
import dorkbox.network.rmi.RemoteObject;
import dorkbox.network.rmi.Rmi;
import dorkbox.network.rmi.RmiBridge; import dorkbox.network.rmi.RmiBridge;
import dorkbox.network.rmi.TimeoutException;
import dorkbox.network.util.EndpointTool; import dorkbox.network.util.EndpointTool;
import dorkbox.network.util.KryoSerializationManager; import dorkbox.network.util.KryoSerializationManager;
import dorkbox.network.util.SerializationManager; import dorkbox.network.util.SerializationManager;
@ -51,6 +56,7 @@ import dorkbox.network.util.serializers.IgnoreSerialization;
import dorkbox.network.util.store.NullSettingsStore; import dorkbox.network.util.store.NullSettingsStore;
import dorkbox.network.util.store.SettingsStore; import dorkbox.network.util.store.SettingsStore;
import dorkbox.network.util.udt.UdtEndpointProxy; import dorkbox.network.util.udt.UdtEndpointProxy;
import dorkbox.util.Sys;
import dorkbox.util.collections.IntMap; import dorkbox.util.collections.IntMap;
import dorkbox.util.collections.IntMap.Entries; import dorkbox.util.collections.IntMap.Entries;
import dorkbox.util.crypto.Crypto; import dorkbox.util.crypto.Crypto;
@ -139,7 +145,7 @@ public abstract class EndPoint {
protected final RegistrationWrapper registrationWrapper; protected final RegistrationWrapper registrationWrapper;
// The remote object space is used by RMI. // The remote object space is used by RMI.
protected RmiBridge remoteObjectSpace = null; private final RmiBridge rmiBridge;
// the eventLoop groups are used to track and manage the event loops for startup/shutdown // the eventLoop groups are used to track and manage the event loops for startup/shutdown
private List<EventLoopGroup> eventLoopGroups = new ArrayList<EventLoopGroup>(8); private List<EventLoopGroup> eventLoopGroups = new ArrayList<EventLoopGroup>(8);
@ -172,6 +178,17 @@ public abstract class EndPoint {
this.registrationWrapper = new RegistrationWrapper(this, this.logger); this.registrationWrapper = new RegistrationWrapper(this, this.logger);
// make sure that 'localhost' is REALLY our specific IP address
if (options.host != null && (options.host.equals("localhost") || options.host.startsWith("127."))) {
try {
InetAddress localHostLanAddress = Sys.getLocalHostLanAddress();
options.host = localHostLanAddress.getHostAddress();
this.logger.info("Network localhost request, using real IP instead: {}", options.host);
} catch (UnknownHostException e) {
this.logger.error("Unable to get the actual 'localhost' IP address", e);
}
}
// we have to be able to specify WHAT property store we want to use, since it can change! // we have to be able to specify WHAT property store we want to use, since it can change!
if (options.settingsStore == null) { if (options.settingsStore == null) {
this.propertyStore = new PropertyStore(name); this.propertyStore = new PropertyStore(name);
@ -278,7 +295,18 @@ public abstract class EndPoint {
// add the ping listener (internal use only!) // add the ping listener (internal use only!)
this.connectionManager.add(new PingSystemListener(name)); this.connectionManager.add(new PingSystemListener());
/*
* Creates the remote method invocation (RMI) bridge for this endpoint.
* <p>
* there is some housekeeping that is necessary BEFORE a connection is actually connected..
*/
if (options.enableRmi) {
this.rmiBridge = new RmiBridge(this.logger, this.serializationManager);
} else {
this.rmiBridge = null;
}
} }
public void disableRemoteKeyValidation() { public void disableRemoteKeyValidation() {
@ -323,7 +351,7 @@ public abstract class EndPoint {
* Internal call by the pipeline to notify the client to continue registering the different session protocols. * Internal call by the pipeline to notify the client to continue registering the different session protocols.
* The server does not use this. * The server does not use this.
*/ */
protected boolean continueRegistration0() { protected boolean registerNextProtocol0() {
return true; return true;
} }
@ -385,26 +413,16 @@ public abstract class EndPoint {
} }
/** /**
* Creates the remote (RMI) object space for this endpoint. * Gets the remote method invocation (RMI) bridge for this endpoint.
* <p>
* This method is safe, and is recommended. Make sure to call it BEFORE a connection is established, as
* there is some housekeeping that is necessary BEFORE a connection is actually connected..
*/ */
public RmiBridge getRmiBridge() { public Rmi rmi() {
synchronized (this) { if (this.rmiBridge == null) {
if (this.remoteObjectSpace == null) { throw new NetException("Cannot use a remote object space that has NOT been created first! Configure the ConnectionOptions!");
if (isConnected()) {
throw new NetException("Cannot create a remote object space after the remote endpoint has already connected!");
}
this.remoteObjectSpace = new RmiBridge(this.logger);
}
} }
return this.remoteObjectSpace; return this.rmiBridge;
} }
/** /**
* This method allows the connections used by the client/server to be subclassed (custom implementations). * This method allows the connections used by the client/server to be subclassed (custom implementations).
* <p> * <p>
@ -417,7 +435,6 @@ public abstract class EndPoint {
return new ConnectionImpl(name); return new ConnectionImpl(name);
} }
/** /**
* Internal call by the pipeline when: * Internal call by the pipeline when:
* - creating a new network connection * - creating a new network connection
@ -452,10 +469,8 @@ public abstract class EndPoint {
metaChannel.connection = connection; metaChannel.connection = connection;
// notify our remote object space that it is able to receive method calls. // notify our remote object space that it is able to receive method calls.
synchronized (this) { if (this.rmiBridge != null) {
if (this.remoteObjectSpace != null) { connection.listeners().add(this.rmiBridge.getListener());
this.remoteObjectSpace.addConnection(connection);
}
} }
} else { } else {
// getting the baseClass // getting the baseClass
@ -509,6 +524,34 @@ public abstract class EndPoint {
*/ */
public abstract ConnectionBridgeBase send(); public abstract ConnectionBridgeBase send();
/**
* Returns a proxy object that implements the specified interfaces. Methods
* invoked on the proxy object will be invoked remotely on the object with
* the specified ID in the ObjectSpace for the specified connection. If the
* remote end of the connection has not {@link #addConnection(Connection)
* added} the connection to the ObjectSpace, the remote method invocations
* will be ignored.
* <p>
* Methods that return a value will throw {@link TimeoutException} if the
* response is not received with the
* {@link RemoteObject#setResponseTimeout(int) response timeout}.
* <p>
* If {@link RemoteObject#setNonBlocking(boolean) non-blocking} is false
* (the default), then methods that return a value must not be called from
* the update thread for the connection. An exception will be thrown if this
* occurs. Methods with a void return value can be called on the update
* thread.
* <p>
* If a proxy returned from this method is part of an object graph sent over
* the network, the object graph on the receiving side will have the proxy
* object replaced with the registered object.
*
* @see RemoteObject
*/
public RemoteObject getRemoteObject(Connection connection, int objectID, Class<?>[] ifaces) {
return this.rmiBridge.getRemoteObject(connection, objectID, ifaces);
}
/** /**
* Registers a tool with the server, to be used by other services. * Registers a tool with the server, to be used by other services.
*/ */

View File

@ -1,5 +1,12 @@
package dorkbox.network.connection; package dorkbox.network.connection;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger; import org.slf4j.Logger;
import dorkbox.network.ConnectionOptions; import dorkbox.network.ConnectionOptions;
@ -11,11 +18,13 @@ import dorkbox.network.util.exceptions.SecurityException;
/** /**
* This serves the purpose of making sure that specific methods are not available to the end user. * This serves the purpose of making sure that specific methods are not available to the end user.
*/ */
public class EndPointClient extends EndPoint { public class EndPointClient extends EndPoint implements Runnable {
protected List<BootstrapWrapper> bootstraps = new LinkedList<BootstrapWrapper>();
protected AtomicInteger connectingBootstrap = new AtomicInteger(0);
protected final Object registrationLock = new Object(); protected final Object registrationLock = new Object();
protected volatile boolean registrationInProgress = false;
protected volatile int connectionTimeout = 5000; // default
protected volatile boolean registrationComplete = false; protected volatile boolean registrationComplete = false;
private volatile ConnectionBridgeFlushAlways connectionBridgeFlushAlways; private volatile ConnectionBridgeFlushAlways connectionBridgeFlushAlways;
@ -25,19 +34,58 @@ public class EndPointClient extends EndPoint {
super(name, options); super(name, options);
} }
protected void registerNextProtocol() {
new Thread(this, "Bootstrap registration").start();
}
@Override
public void run() {
synchronized(this.connectingBootstrap) {
int bootstrapToRegister = this.connectingBootstrap.getAndIncrement();
BootstrapWrapper bootstrapWrapper = this.bootstraps.get(bootstrapToRegister);
ChannelFuture future;
if (this.connectionTimeout != 0) {
// must be before connect
bootstrapWrapper.bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, this.connectionTimeout);
}
Logger logger2 = this.logger;
try {
// UDP : When this is CONNECT, a udp socket will ONLY accept UDP traffic from the remote address (ip/port combo).
// If the reply isn't from the correct port, then the other end will receive a "Port Unreachable" exception.
future = bootstrapWrapper.bootstrap.connect();
future.await();
} catch (Exception e) {
String errorMessage = stopWithErrorMessage(logger2, "Could not connect to the " + bootstrapWrapper.type + " server on port: " + bootstrapWrapper.port, e);
throw new IllegalArgumentException(errorMessage);
}
if (!future.isSuccess()) {
String errorMessage = stopWithErrorMessage(logger2, "Could not connect to the " + bootstrapWrapper.type + " server on port: " + bootstrapWrapper.port, future.cause());
throw new IllegalArgumentException(errorMessage);
}
if (logger2.isTraceEnabled()) {
logger2.trace("Waiting for registration from server.");
}
manageForShutdown(future);
}
}
/** /**
* Internal call by the pipeline to notify the client to continue registering the different session protocols. * Internal call by the pipeline to notify the client to continue registering the different session protocols.
* @return true if we are done registering bootstraps * @return true if we are done registering bootstraps
*/ */
@Override @Override
protected boolean continueRegistration0() { protected boolean registerNextProtocol0() {
// we need to cache the value, since it can change in a different thread before we have the chance to return the value. synchronized(this.connectingBootstrap) {
boolean complete = this.registrationComplete; this.registrationComplete = this.connectingBootstrap.get() == this.bootstraps.size();
if (!this.registrationComplete) {
// notify the block, but only if we are not ready. registerNextProtocol();
if (!complete) {
synchronized (this.registrationLock) {
this.registrationLock.notifyAll();
} }
} }
@ -48,11 +96,11 @@ public class EndPointClient extends EndPoint {
// only let us continue with connections (this starts up the client/server implementations) once ALL of the // only let us continue with connections (this starts up the client/server implementations) once ALL of the
// bootstraps have connected // bootstraps have connected
return complete; return this.registrationComplete;
} }
/** /**
* Internal (to the networking stack) to notify the client that registration has completed. This is necessary because the client * Internal (to the networking stack) to notify the client that registration has COMPLETED. This is necessary because the client
* will BLOCK until it has successfully registered it's connections. * will BLOCK until it has successfully registered it's connections.
*/ */
@Override @Override
@ -60,9 +108,9 @@ public class EndPointClient extends EndPoint {
// invokes the listener.connection() method, and initialize the connection channels with whatever extra info they might need. // invokes the listener.connection() method, and initialize the connection channels with whatever extra info they might need.
super.connectionConnected0(connection); super.connectionConnected0(connection);
// notify the block // notify the registration we are done!
synchronized (this.registrationLock) { synchronized (this.registrationLock) {
this.registrationLock.notifyAll(); this.registrationLock.notify();
} }
} }
@ -70,7 +118,9 @@ public class EndPointClient extends EndPoint {
* Internal call to abort registration if the shutdown command is issued during channel registration. * Internal call to abort registration if the shutdown command is issued during channel registration.
*/ */
void abortRegistration() { void abortRegistration() {
this.registrationInProgress = false; synchronized (this.registrationLock) {
this.registrationLock.notify();
}
stop(); stop();
} }

View File

@ -53,6 +53,7 @@ public abstract class ListenerRaw<C extends Connection, M extends Object> {
* This method should not block for long periods as other network activity will not be processed * This method should not block for long periods as other network activity will not be processed
* until it returns. * until it returns.
*/ */
@SuppressWarnings("unused")
public void connected(C connection) { public void connected(C connection) {
} }
@ -61,6 +62,7 @@ public abstract class ListenerRaw<C extends Connection, M extends Object> {
* <p> * <p>
* Do not write data in this method! The channel can be closed, resulting in an error if you attempt to do so. * Do not write data in this method! The channel can be closed, resulting in an error if you attempt to do so.
*/ */
@SuppressWarnings("unused")
public void disconnected(C connection) { public void disconnected(C connection) {
} }
@ -68,18 +70,21 @@ public abstract class ListenerRaw<C extends Connection, M extends Object> {
* Called when an object has been received from the remote end of the connection. * Called when an object has been received from the remote end of the connection.
* This method should not block for long periods as other network activity will not be processed until it returns. * This method should not block for long periods as other network activity will not be processed until it returns.
*/ */
@SuppressWarnings("unused")
public void received(C connection, M message) { public void received(C connection, M message) {
} }
/** /**
* Called when the connection is idle for longer than the {@link EndPoint#setIdleTimeout(idle) idle threshold}. * Called when the connection is idle for longer than the {@link EndPoint#setIdleTimeout(idle) idle threshold}.
*/ */
@SuppressWarnings("unused")
public void idle(C connection) { public void idle(C connection) {
} }
/** /**
* Called when there is an error of some kind during the up/down stream process (to/from the socket or otherwise) * Called when there is an error of some kind during the up/down stream process (to/from the socket or otherwise)
*/ */
@SuppressWarnings("unused")
public void error(C connection, Throwable throwable) { public void error(C connection, Throwable throwable) {
throwable.printStackTrace(); throwable.printStackTrace();
} }

View File

@ -3,7 +3,7 @@ package dorkbox.network.connection;
class PingSystemListener extends ListenerRaw<ConnectionImpl, PingMessage> { class PingSystemListener extends ListenerRaw<ConnectionImpl, PingMessage> {
PingSystemListener(String name) { PingSystemListener() {
} }
@Override @Override

View File

@ -100,8 +100,8 @@ public class RegistrationWrapper implements UdpServer {
* The server does not use this. * The server does not use this.
* @return true if we are done registering bootstraps * @return true if we are done registering bootstraps
*/ */
public boolean continueRegistration0() { public boolean registerNextProtocol0() {
return this.endPoint.continueRegistration0(); return this.endPoint.registerNextProtocol0();
} }
/** /**

View File

@ -23,6 +23,7 @@ public abstract class RegistrationHandler extends ChannelInboundHandlerAdapter {
this.registrationWrapper = registrationWrapper; this.registrationWrapper = registrationWrapper;
} }
@SuppressWarnings("unused")
protected void initChannel(Channel channel) { protected void initChannel(Channel channel) {
} }

View File

@ -63,7 +63,7 @@ public class RegistrationLocalHandlerClient extends RegistrationLocalHandler {
channel.pipeline().remove(this); channel.pipeline().remove(this);
// Event though a local channel is XOR with everything else, we still have to make the client clean up it's state. // Event though a local channel is XOR with everything else, we still have to make the client clean up it's state.
registrationWrapper.continueRegistration0(); registrationWrapper.registerNextProtocol0();
Connection connection = metaChannel.connection; Connection connection = metaChannel.connection;
registrationWrapper.connectionConnected0(connection); registrationWrapper.connectionConnected0(connection);

View File

@ -316,6 +316,7 @@ public abstract class RegistrationRemoteHandler extends RegistrationHandler {
* The server will override this. * The server will override this.
* Only called if we have a UDP channel when we finalize the setup of the TCP connection * Only called if we have a UDP channel when we finalize the setup of the TCP connection
*/ */
@SuppressWarnings("unused")
protected void setupServerUdpConnection(MetaChannel metaChannel) { protected void setupServerUdpConnection(MetaChannel metaChannel) {
} }

View File

@ -296,7 +296,7 @@ public class RegistrationRemoteHandlerClientTCP extends RegistrationRemoteHandle
metaChannel.ecdhKey = null; metaChannel.ecdhKey = null;
// notify the client that we are ready to continue registering other session protocols (bootstraps) // notify the client that we are ready to continue registering other session protocols (bootstraps)
boolean isDoneWithRegistration = registrationWrapper2.continueRegistration0(); boolean isDoneWithRegistration = registrationWrapper2.registerNextProtocol0();
// tell the server we are done, and to setup crypto on it's side // tell the server we are done, and to setup crypto on it's side
if (isDoneWithRegistration) { if (isDoneWithRegistration) {

View File

@ -152,7 +152,7 @@ public class RegistrationRemoteHandlerClientUDP extends RegistrationRemoteHandle
// hooray! we are successful // hooray! we are successful
// notify the client that we are ready to continue registering other session protocols (bootstraps) // notify the client that we are ready to continue registering other session protocols (bootstraps)
boolean isDoneWithRegistration = registrationWrapper2.continueRegistration0(); boolean isDoneWithRegistration = registrationWrapper2.registerNextProtocol0();
// tell the server we are done, and to setup crypto on it's side // tell the server we are done, and to setup crypto on it's side
if (isDoneWithRegistration) { if (isDoneWithRegistration) {

View File

@ -145,7 +145,7 @@ public class RegistrationRemoteHandlerClientUDT extends RegistrationRemoteHandle
// hooray! we are successful // hooray! we are successful
// notify the client that we are ready to continue registering other session protocols (bootstraps) // notify the client that we are ready to continue registering other session protocols (bootstraps)
boolean isDoneWithRegistration = registrationWrapper2.continueRegistration0(); boolean isDoneWithRegistration = registrationWrapper2.registerNextProtocol0();
// tell the server we are done, and to setup crypto on it's side // tell the server we are done, and to setup crypto on it's side
if (isDoneWithRegistration) { if (isDoneWithRegistration) {

View File

@ -17,7 +17,6 @@ import org.slf4j.Logger;
import dorkbox.network.Broadcast; import dorkbox.network.Broadcast;
import dorkbox.network.connection.Connection; import dorkbox.network.connection.Connection;
import dorkbox.network.connection.ConnectionImpl; import dorkbox.network.connection.ConnectionImpl;
import dorkbox.network.connection.EndPoint;
import dorkbox.network.connection.RegistrationWrapper; import dorkbox.network.connection.RegistrationWrapper;
import dorkbox.network.connection.registration.MetaChannel; import dorkbox.network.connection.registration.MetaChannel;
import dorkbox.network.connection.registration.Registration; import dorkbox.network.connection.registration.Registration;
@ -76,8 +75,9 @@ public class RegistrationRemoteHandlerServerUDP extends MessageToMessageCodec<Da
// this is the response from a discoverHost query // this is the response from a discoverHost query
out.add(new DatagramPacket((ByteBuf) object, remoteAddress)); out.add(new DatagramPacket((ByteBuf) object, remoteAddress));
} else { } else {
ByteBuf buffer = Unpooled.buffer(EndPoint.udpMaxSize); // this is regular registration stuff
ByteBuf buffer = context.alloc().buffer();
// writes data into buffer
sendUDP(context, object, buffer, remoteAddress); sendUDP(context, object, buffer, remoteAddress);
if (buffer != null) { if (buffer != null) {
@ -121,6 +121,7 @@ public class RegistrationRemoteHandlerServerUDP extends MessageToMessageCodec<Da
} }
@SuppressWarnings("unused")
public final void sendUDP(ChannelHandlerContext context, Object object, ByteBuf buffer, InetSocketAddress udpRemoteAddress) { public final void sendUDP(ChannelHandlerContext context, Object object, ByteBuf buffer, InetSocketAddress udpRemoteAddress) {
Connection networkConnection = this.registrationWrapper.getServerUDP(udpRemoteAddress); Connection networkConnection = this.registrationWrapper.getServerUDP(udpRemoteAddress);
@ -135,7 +136,8 @@ public class RegistrationRemoteHandlerServerUDP extends MessageToMessageCodec<Da
// this will be invoked by the UdpRegistrationHandlerServer. Remember, TCP will be established first. // this will be invoked by the UdpRegistrationHandlerServer. Remember, TCP will be established first.
public final void receivedUDP(ChannelHandlerContext context, Channel channel, ByteBuf data, InetSocketAddress udpRemoteAddress) throws Exception { @SuppressWarnings("unused")
private final void receivedUDP(ChannelHandlerContext context, Channel channel, ByteBuf data, InetSocketAddress udpRemoteAddress) throws Exception {
// registration is the ONLY thing NOT encrypted // registration is the ONLY thing NOT encrypted
Logger logger2 = this.logger; Logger logger2 = this.logger;
RegistrationWrapper registrationWrapper2 = this.registrationWrapper; RegistrationWrapper registrationWrapper2 = this.registrationWrapper;
@ -171,7 +173,7 @@ public class RegistrationRemoteHandlerServerUDP extends MessageToMessageCodec<Da
try { try {
object = serializationManager2.read(data, data.writerIndex()); object = serializationManager2.read(data, data.writerIndex());
} catch (NetException e) { } catch (Exception e) {
logger2.error("UDP unable to deserialize buffer", e); logger2.error("UDP unable to deserialize buffer", e);
shutdown(registrationWrapper2, channel); shutdown(registrationWrapper2, channel);
return; return;
@ -227,7 +229,6 @@ public class RegistrationRemoteHandlerServerUDP extends MessageToMessageCodec<Da
register.payload = Crypto.AES.encrypt(RegistrationRemoteHandler.getAesEngine(), metaChannel.aesKey, metaChannel.aesIV, idAsBytes); register.payload = Crypto.AES.encrypt(RegistrationRemoteHandler.getAesEngine(), metaChannel.aesKey, metaChannel.aesIV, idAsBytes);
channel.writeAndFlush(new UdpWrapper(register, udpRemoteAddress)); channel.writeAndFlush(new UdpWrapper(register, udpRemoteAddress));
if (logger2.isTraceEnabled()) { if (logger2.isTraceEnabled()) {
logger2.trace("Register UDP connection from {}", udpRemoteAddress); logger2.trace("Register UDP connection from {}", udpRemoteAddress);
} }

View File

@ -16,10 +16,11 @@ public class KryoDecoder extends ByteToMessageDecoder {
public KryoDecoder(SerializationManager kryoWrapper) { public KryoDecoder(SerializationManager kryoWrapper) {
super(); super();
this.kryoWrapper = kryoWrapper; this.kryoWrapper = kryoWrapper;
optimize = OptimizeUtilsByteBuf.get(); this.optimize = OptimizeUtilsByteBuf.get();
} }
protected Object readObject(SerializationManager kryoWrapper, ChannelHandlerContext ctx, ByteBuf in, int length) { @SuppressWarnings("unused")
protected Object readObject(SerializationManager kryoWrapper, ChannelHandlerContext context, ByteBuf in, int length) {
// no connection here because we haven't created one yet. When we do, we replace this handler with a new one. // no connection here because we haven't created one yet. When we do, we replace this handler with a new one.
return kryoWrapper.read(in, length); return kryoWrapper.read(in, length);
} }
@ -133,12 +134,12 @@ public class KryoDecoder extends ByteToMessageDecoder {
length = optimize.readInt(in, true); // object LENGTH length = optimize.readInt(in, true); // object LENGTH
// however many we need to // however many we need to
out.add(readObject(kryoWrapper, ctx, in, length)); out.add(readObject(this.kryoWrapper, ctx, in, length));
} }
// the buffer reader index will be at the correct location, since the read object method advances it. // the buffer reader index will be at the correct location, since the read object method advances it.
} else { } else {
// exactly one! // exactly one!
out.add(readObject(kryoWrapper, ctx, in, length)); out.add(readObject(this.kryoWrapper, ctx, in, length));
} }
} }
} }

View File

@ -21,11 +21,12 @@ public class KryoEncoder extends MessageToByteEncoder<Object> {
public KryoEncoder(SerializationManager kryoWrapper) { public KryoEncoder(SerializationManager kryoWrapper) {
super(); super();
this.kryoWrapper = kryoWrapper; this.kryoWrapper = kryoWrapper;
optimize = OptimizeUtilsByteBuf.get(); this.optimize = OptimizeUtilsByteBuf.get();
} }
// the crypto writer will override this // the crypto writer will override this
protected void writeObject(SerializationManager kryoWrapper, ChannelHandlerContext ctx, Object msg, ByteBuf buffer) { @SuppressWarnings("unused")
protected void writeObject(SerializationManager kryoWrapper, ChannelHandlerContext context, Object msg, ByteBuf buffer) {
// no connection here because we haven't created one yet. When we do, we replace this handler with a new one. // no connection here because we haven't created one yet. When we do, we replace this handler with a new one.
kryoWrapper.write(buffer, msg); kryoWrapper.write(buffer, msg);
} }
@ -42,7 +43,7 @@ public class KryoEncoder extends MessageToByteEncoder<Object> {
out.writeInt(0); // put an int in, which is the same size as reservedLengthIndex out.writeInt(0); // put an int in, which is the same size as reservedLengthIndex
try { try {
writeObject(kryoWrapper, ctx, msg, out); writeObject(this.kryoWrapper, ctx, msg, out);
// now set the frame (if it's TCP)! // now set the frame (if it's TCP)!
int length = out.readableBytes() - startIndex - reservedLengthIndex; // (reservedLengthLength) 4 is the reserved space for the integer. int length = out.readableBytes() - startIndex - reservedLengthIndex; // (reservedLengthLength) 4 is the reserved space for the integer.

View File

@ -31,7 +31,8 @@ public class KryoEncoderUdp extends MessageToMessageEncoder<Object> {
} }
// the crypto writer will override this // the crypto writer will override this
protected void writeObject(SerializationManager kryoWrapper, ChannelHandlerContext ctx, Object msg, ByteBuf buffer) { @SuppressWarnings("unused")
protected void writeObject(SerializationManager kryoWrapper, ChannelHandlerContext context, Object msg, ByteBuf buffer) {
// no connection here because we haven't created one yet. When we do, we replace this handler with a new one. // no connection here because we haven't created one yet. When we do, we replace this handler with a new one.
kryoWrapper.write(buffer, msg); kryoWrapper.write(buffer, msg);
} }
@ -43,7 +44,7 @@ public class KryoEncoderUdp extends MessageToMessageEncoder<Object> {
ByteBuf outBuffer = Unpooled.buffer(maxSize); ByteBuf outBuffer = Unpooled.buffer(maxSize);
// no size info, since this is UDP, it is not segmented // no size info, since this is UDP, it is not segmented
writeObject(kryoWrapper, ctx, msg, outBuffer); writeObject(this.kryoWrapper, ctx, msg, outBuffer);
// have to check to see if we are too big for UDP! // have to check to see if we are too big for UDP!

View File

@ -10,6 +10,10 @@ class AsmCachedMethod extends CachedMethod {
@Override @Override
public Object invoke(Object target, Object[] args) throws IllegalAccessException, InvocationTargetException { public Object invoke(Object target, Object[] args) throws IllegalAccessException, InvocationTargetException {
return this.methodAccess.invoke(target, this.methodAccessIndex, args); try {
return this.methodAccess.invoke(target, this.methodAccessIndex, args);
} catch (Exception ex) {
throw new InvocationTargetException(ex);
}
} }
} }

View File

@ -1,14 +1,8 @@
package dorkbox.network.rmi; package dorkbox.network.rmi;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.KryoSerializable;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
/** Internal message to invoke methods remotely. */ /** Internal message to invoke methods remotely. */
class InvokeMethod implements KryoSerializable, RmiMessages { class InvokeMethod implements RmiMessages {
public int objectID; public int objectID;
public CachedMethod cachedMethod; public CachedMethod cachedMethod;
public Object[] args; public Object[] args;
@ -18,54 +12,7 @@ class InvokeMethod implements KryoSerializable, RmiMessages {
// possible duplicate IDs. A response data of 0 means to not respond. // possible duplicate IDs. A response data of 0 means to not respond.
public byte responseData; public byte responseData;
@Override
@SuppressWarnings("rawtypes")
public void write(Kryo kryo, Output output) {
output.writeInt(this.objectID, true);
output.writeInt(this.cachedMethod.methodClassID, true);
output.writeByte(this.cachedMethod.methodIndex);
Serializer[] serializers = this.cachedMethod.serializers; public InvokeMethod() {
Object[] args = this.args;
for (int i = 0, n = serializers.length; i < n; i++) {
Serializer serializer = serializers[i];
if (serializer != null) {
kryo.writeObjectOrNull(output, args[i], serializer);
} else {
kryo.writeClassAndObject(output, args[i]);
}
}
output.writeByte(this.responseData);
}
@Override
public void read(Kryo kryo, Input input) {
this.objectID = input.readInt(true);
int methodClassID = input.readInt(true);
Class<?> methodClass = kryo.getRegistration(methodClassID).getType();
byte methodIndex = input.readByte();
try {
this.cachedMethod = RmiBridge.getMethods(kryo, methodClass)[methodIndex];
} catch (IndexOutOfBoundsException ex) {
throw new KryoException("Invalid method index " + methodIndex + " for class: " + methodClass.getName());
}
Serializer<?>[] serializers = this.cachedMethod.serializers;
Class<?>[] parameterTypes = this.cachedMethod.method.getParameterTypes();
Object[] args = new Object[serializers.length];
this.args = args;
for (int i = 0, n = args.length; i < n; i++) {
Serializer<?> serializer = serializers[i];
if (serializer != null) {
args[i] = kryo.readObjectOrNull(input, parameterTypes[i], serializer);
} else {
args[i] = kryo.readClassAndObject(input);
}
}
this.responseData = input.readByte();
} }
} }

View File

@ -0,0 +1,10 @@
package dorkbox.network.rmi;
import dorkbox.util.objectPool.PoolableObject;
public class InvokeMethodPoolable implements PoolableObject<InvokeMethod> {
@Override
public InvokeMethod create() {
return new InvokeMethod();
}
}

View File

@ -0,0 +1,71 @@
package dorkbox.network.rmi;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
/** Internal message to invoke methods remotely. */
class InvokeMethodSerializer extends Serializer<InvokeMethod> {
private RmiBridge rmi;
public InvokeMethodSerializer(RmiBridge rmi) {
this.rmi = rmi;
}
@Override
@SuppressWarnings("rawtypes")
public void write(Kryo kryo, Output output, InvokeMethod object) {
output.writeInt(object.objectID, true);
output.writeInt(object.cachedMethod.methodClassID, true);
output.writeByte(object.cachedMethod.methodIndex);
Serializer[] serializers = object.cachedMethod.serializers;
Object[] args = object.args;
for (int i = 0, n = serializers.length; i < n; i++) {
Serializer serializer = serializers[i];
if (serializer != null) {
kryo.writeObjectOrNull(output, args[i], serializer);
} else {
kryo.writeClassAndObject(output, args[i]);
}
}
output.writeByte(object.responseData);
}
@Override
public InvokeMethod read(Kryo kryo, Input input, Class<InvokeMethod> type) {
InvokeMethod invokeMethod = new InvokeMethod();
invokeMethod.objectID = input.readInt(true);
int methodClassID = input.readInt(true);
Class<?> methodClass = kryo.getRegistration(methodClassID).getType();
byte methodIndex = input.readByte();
try {
invokeMethod.cachedMethod = this.rmi.getMethods(kryo, methodClass)[methodIndex];
} catch (IndexOutOfBoundsException ex) {
throw new KryoException("Invalid method index " + methodIndex + " for class: " + methodClass.getName());
}
Serializer<?>[] serializers = invokeMethod.cachedMethod.serializers;
Class<?>[] parameterTypes = invokeMethod.cachedMethod.method.getParameterTypes();
Object[] args = new Object[serializers.length];
invokeMethod.args = args;
for (int i = 0, n = args.length; i < n; i++) {
Serializer<?> serializer = serializers[i];
if (serializer != null) {
args[i] = kryo.readObjectOrNull(input, parameterTypes[i], serializer);
} else {
args[i] = kryo.readClassAndObject(input);
}
}
invokeMethod.responseData = input.readByte();
return invokeMethod;
}
}

View File

@ -12,6 +12,8 @@ import dorkbox.network.connection.Connection;
import dorkbox.network.connection.EndPoint; import dorkbox.network.connection.EndPoint;
import dorkbox.network.connection.ListenerRaw; import dorkbox.network.connection.ListenerRaw;
import dorkbox.network.util.exceptions.NetException; import dorkbox.network.util.exceptions.NetException;
import dorkbox.util.objectPool.ObjectPool;
import dorkbox.util.objectPool.ObjectPoolHolder;
/** Handles network communication when methods are invoked on a proxy. */ /** Handles network communication when methods are invoked on a proxy. */
class RemoteInvocationHandler implements InvocationHandler { class RemoteInvocationHandler implements InvocationHandler {
@ -26,6 +28,9 @@ class RemoteInvocationHandler implements InvocationHandler {
private boolean transmitExceptions = true; private boolean transmitExceptions = true;
private boolean remoteToString; private boolean remoteToString;
private boolean udp;
private boolean udt;
private Byte lastResponseID; private Byte lastResponseID;
private byte nextResponseId = 1; private byte nextResponseId = 1;
@ -37,8 +42,11 @@ class RemoteInvocationHandler implements InvocationHandler {
final InvokeMethodResult[] responseTable = new InvokeMethodResult[64]; final InvokeMethodResult[] responseTable = new InvokeMethodResult[64];
final boolean[] pendingResponses = new boolean[64]; final boolean[] pendingResponses = new boolean[64];
public RemoteInvocationHandler(Connection connection, final int objectID) { private final ObjectPool<InvokeMethod> invokeMethodPool;
public RemoteInvocationHandler(ObjectPool<InvokeMethod> invokeMethodPool, Connection connection, final int objectID) {
super(); super();
this.invokeMethodPool = invokeMethodPool;
this.connection = connection; this.connection = connection;
this.objectID = objectID; this.objectID = objectID;
@ -97,6 +105,12 @@ class RemoteInvocationHandler implements InvocationHandler {
} else if (name.equals("setTransmitExceptions")) { } else if (name.equals("setTransmitExceptions")) {
this.transmitExceptions = (Boolean) args[0]; this.transmitExceptions = (Boolean) args[0];
return null; return null;
} else if (name.equals("setUDP")) {
this.udp = (Boolean)args[0];
return null;
} else if (name.equals("setUDT")) {
this.udt = (Boolean)args[0];
return null;
} else if (name.equals("setRemoteToString")) { } else if (name.equals("setRemoteToString")) {
this.remoteToString = (Boolean) args[0]; this.remoteToString = (Boolean) args[0];
return null; return null;
@ -124,12 +138,15 @@ class RemoteInvocationHandler implements InvocationHandler {
return "<proxy>"; return "<proxy>";
} }
InvokeMethod invokeMethod = new InvokeMethod(); EndPoint endPoint = this.connection.getEndPoint();
RmiBridge rmi = (RmiBridge) endPoint.rmi();
ObjectPoolHolder<InvokeMethod> invokeMethodHolder = this.invokeMethodPool.take();
InvokeMethod invokeMethod = invokeMethodHolder.getValue();
invokeMethod.objectID = this.objectID; invokeMethod.objectID = this.objectID;
invokeMethod.args = args; invokeMethod.args = args;
EndPoint endPoint = this.connection.getEndPoint(); CachedMethod[] cachedMethods = rmi.getMethods(endPoint.getSerialization().getSingleInstanceUnsafe(), method.getDeclaringClass());
CachedMethod[] cachedMethods = RmiBridge.getMethods(endPoint.getSerialization().getSingleInstanceUnsafe(), method.getDeclaringClass());
for (int i = 0, n = cachedMethods.length; i < n; i++) { for (int i = 0, n = cachedMethods.length; i < n; i++) {
CachedMethod cachedMethod = cachedMethods[i]; CachedMethod cachedMethod = cachedMethods[i];
if (cachedMethod.method.equals(method)) { if (cachedMethod.method.equals(method)) {
@ -145,7 +162,7 @@ class RemoteInvocationHandler implements InvocationHandler {
// An invocation doesn't need a response is if it's async and no return values or exceptions are wanted back. // An invocation doesn't need a response is if it's async and no return values or exceptions are wanted back.
boolean needsResponse = this.transmitReturnValue || this.transmitExceptions || !this.nonBlocking; boolean needsResponse = !this.udp && (this.transmitReturnValue || this.transmitExceptions || !this.nonBlocking);
byte responseID = 0; byte responseID = 0;
if (needsResponse) { if (needsResponse) {
synchronized (this) { synchronized (this) {
@ -159,17 +176,23 @@ class RemoteInvocationHandler implements InvocationHandler {
// Pack other data into the high bits. // Pack other data into the high bits.
byte responseData = responseID; byte responseData = responseID;
if (this.transmitReturnValue) { if (this.transmitReturnValue) {
responseData |= RmiBridge.returnValMask; responseData |= RmiBridge.returnValueMask;
} }
if (this.transmitExceptions) { if (this.transmitExceptions) {
responseData |= RmiBridge.returnExMask; responseData |= RmiBridge.returnExceptionMask;
} }
invokeMethod.responseData = responseData; invokeMethod.responseData = responseData;
} else { } else {
invokeMethod.responseData = 0; // A response data of 0 means to not respond. invokeMethod.responseData = 0; // A response data of 0 means to not respond.
} }
this.connection.send().TCP(invokeMethod).flush(); if (this.udp) {
this.connection.send().UDP(invokeMethod).flush();
} else if (this.udt) {
this.connection.send().UDT(invokeMethod).flush();
} else {
this.connection.send().TCP(invokeMethod).flush();
}
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
String argString = ""; String argString = "";
@ -182,8 +205,9 @@ class RemoteInvocationHandler implements InvocationHandler {
} }
this.lastResponseID = (byte)(invokeMethod.responseData & RmiBridge.responseIdMask); this.lastResponseID = (byte)(invokeMethod.responseData & RmiBridge.responseIdMask);
this.invokeMethodPool.release(invokeMethodHolder);
if (this.nonBlocking) { if (this.nonBlocking || this.udp) {
Class<?> returnType = method.getReturnType(); Class<?> returnType = method.getReturnType();
if (returnType.isPrimitive()) { if (returnType.isPrimitive()) {
if (returnType == int.class) { if (returnType == int.class) {
@ -232,7 +256,6 @@ class RemoteInvocationHandler implements InvocationHandler {
} }
private Object waitForResponse(byte responseID) { private Object waitForResponse(byte responseID) {
long endTime = System.currentTimeMillis() + this.timeoutMillis; long endTime = System.currentTimeMillis() + this.timeoutMillis;
long remaining = this.timeoutMillis; long remaining = this.timeoutMillis;
@ -311,4 +334,4 @@ class RemoteInvocationHandler implements InvocationHandler {
} }
return true; return true;
} }
} }

View File

@ -9,74 +9,84 @@ import dorkbox.network.connection.Connection;
* @author Nathan Sweet <misc@n4te.com> * @author Nathan Sweet <misc@n4te.com>
*/ */
public interface RemoteObject { public interface RemoteObject {
/** Sets the milliseconds to wait for a method to return value. Default is 3000. */ /** Sets the milliseconds to wait for a method to return value. Default is 3000. */
public void setResponseTimeout(int timeoutMillis); public void setResponseTimeout(int timeoutMillis);
/** /**
* Sets the blocking behavior when invoking a remote method. Default is false. * Sets the blocking behavior when invoking a remote method. Default is false.
* *
* @param nonBlocking If false, the invoking thread will wait for the remote method to return or timeout (default). If * @param nonBlocking If false, the invoking thread will wait for the remote method to return or timeout (default). If true, the
* true, the invoking thread will not wait for a response. The method will return immediately and the return * invoking thread will not wait for a response. The method will return immediately and the return value should be ignored. If
* value should be ignored. If they are being transmitted, the return value or any thrown exception can later * they are being transmitted, the return value or any thrown exception can later be retrieved with
* be retrieved with {@link #waitForLastResponse()} or {@link #waitForResponse(byte)}. The responses will be * {@link #waitForLastResponse()} or {@link #waitForResponse(byte)}. The responses will be stored until retrieved, so each method
* stored until retrieved, so each method call should have a matching retrieve. * call should have a matching retrieve.
*/ */
public void setNonBlocking(boolean nonBlocking); public void setNonBlocking(boolean nonBlocking);
/** /**
* Sets whether return values are sent back when invoking a remote method. Default is true. * Sets whether return values are sent back when invoking a remote method. Default is true.
* *
* @param transmit If true, then the return value for non-blocking method invocations can be retrieved with * @param transmit If true, then the return value for non-blocking method invocations can be retrieved with
* {@link #waitForLastResponse()} or {@link #waitForResponse(byte)}. If false, then non-primitive return values * {@link #waitForLastResponse()} or {@link #waitForResponse(byte)}. If false, then non-primitive return values for remote method
* for remote method invocations are not sent by the remote side of the connection and the response can never * invocations are not sent by the remote side of the connection and the response can never be retrieved. This can also be used
* be retrieved. This can also be used to save bandwidth if you will not check the return value of a blocking * to save bandwidth if you will not check the return value of a blocking remote invocation. Note that an exception could still
* remote invocation. Note that an exception could still be returned by {@link #waitForLastResponse()} or * be returned by {@link #waitForLastResponse()} or {@link #waitForResponse(byte)} if {@link #setTransmitExceptions(boolean)} is
* {@link #waitForResponse(byte)} if {@link #setTransmitExceptions(boolean)} is true. * true.
*/ */
public void setTransmitReturnValue(boolean transmit); public void setTransmitReturnValue(boolean transmit);
/** /**
* Sets whether exceptions are sent back when invoking a remote method. Default is true. * Sets whether exceptions are sent back when invoking a remote method. Default is true.
* *
* @param transmit If false, exceptions will be unhandled and rethrown as RuntimeExceptions inside the invoking * @param transmit If false, exceptions will be unhandled and rethrown as RuntimeExceptions inside the invoking thread. This is the
* thread. This is the legacy behavior. If true, behavior is dependent on whether * legacy behavior. If true, behavior is dependent on whether {@link #setNonBlocking(boolean)}. If non-blocking is true, the
* {@link #setNonBlocking(boolean)}. If non-blocking is true, the exception will be serialized and sent back to * exception will be serialized and sent back to the call site of the remotely invoked method, where it will be re-thrown. If
* the call site of the remotely invoked method, where it will be re-thrown. If non-blocking is false, an * non-blocking is false, an exception will not be thrown in the calling thread but instead can be retrieved with
* exception will not be thrown in the calling thread but instead can be retrieved with * {@link #waitForLastResponse()} or {@link #waitForResponse(byte)}, similar to a return value.
* {@link #waitForLastResponse()} or {@link #waitForResponse(byte)}, similar to a return value. */
*/ public void setTransmitExceptions(boolean transmit);
public void setTransmitExceptions(boolean transmit);
/** /**
* If false, calls to {@link Object#toString()} will return "<proxy>" instead of being invoking the remote method. * If true, UDP will be used to send the remote method invocation. UDP remote method invocations will never return a response and the
* Default is false. * invoking thread will not wait for a response.
*/ */
public void setRemoteToString(boolean remoteToString); public void setUDP(boolean udp);
/** /**
* Waits for the response to the last method invocation to be received or the response timeout to be reached. Must not * If true, UDT will be used to send the remote method invocation. UDT remote method invocations <b>will</b> return a response and the
* be called from the connection's update thread. * invoking thread <b>will</b> wait for a response.
* */
* @see RmiBridge#getRemoteObject(dorkbox.Connection.connection.interfaces.IConnection.Connection, int, Class...) public void setUDT(boolean udt);
*/
public Object waitForLastResponse();
/** Gets the ID of response for the last method invocation. */ /**
public byte getLastResponseID(); * If false, calls to {@link Object#toString()} will return "<proxy>" instead of being invoking the remote method. Default is false.
*/
public void setRemoteToString(boolean remoteToString);
/** /**
* Waits for the specified method invocation response to be received or the response timeout to be reached. Must not * Waits for the response to the last method invocation to be received or the response timeout to be reached. Must not be called from
* be called from the connection's update thread. Response IDs use a six bit identifier, with one identifier reserved * the connection's update thread.
* for "no response". This means that this method should be called to get the result for a non-blocking call before an *
* additional 63 non-blocking calls are made, or risk undefined behavior due to identical IDs. * @see RmiBridge#getRemoteObject(dorkbox.Connection.connection.interfaces.IConnection.Connection, int, Class...)
* */
* @see RmiBridge#getRemoteObject(dorkbox.Connection.connection.interfaces.IConnection.Connection, int, Class...) public Object waitForLastResponse();
*/
public Object waitForResponse(byte responseID);
/** Causes this RemoteObject to stop listening to the connection for method invocation response messages. */ /** Gets the ID of response for the last method invocation. */
public void close(); public byte getLastResponseID();
/** Returns the local connection for this remote object. */ /**
public Connection getConnection(); * Waits for the specified method invocation response to be received or the response timeout to be reached. Must not be called from the
* connection's update thread. Response IDs use a six bit identifier, with one identifier reserved for "no response". This means that
* this method should be called to get the result for a non-blocking call before an additional 63 non-blocking calls are made, or risk
* undefined behavior due to identical IDs.
*
* @see RmiBridge#getRemoteObject(dorkbox.Connection.connection.interfaces.IConnection.Connection, int, Class...)
*/
public Object waitForResponse(byte responseID);
/** Causes this RemoteObject to stop listening to the connection for method invocation response messages. */
public void close();
/** Returns the local connection for this remote object. */
public Connection getConnection();
} }

View File

@ -7,6 +7,7 @@ import com.esotericsoftware.kryo.io.Output;
import com.sun.xml.internal.ws.encoding.soap.SerializationException; import com.sun.xml.internal.ws.encoding.soap.SerializationException;
import dorkbox.network.connection.Connection; import dorkbox.network.connection.Connection;
import dorkbox.network.connection.EndPoint;
/** /**
* Serializes an object registered with the RmiBridge so the receiving side * Serializes an object registered with the RmiBridge so the receiving side
@ -16,11 +17,16 @@ import dorkbox.network.connection.Connection;
* @author Nathan Sweet <misc@n4te.com> * @author Nathan Sweet <misc@n4te.com>
*/ */
public class RemoteObjectSerializer<T> extends Serializer<T> { public class RemoteObjectSerializer<T> extends Serializer<T> {
private final RmiBridge rmi;
public RemoteObjectSerializer(EndPoint endpoint) {
this.rmi = (RmiBridge) endpoint.rmi();
}
@Override @Override
public void write(Kryo kryo, Output output, T object) { public void write(Kryo kryo, Output output, T object) {
@SuppressWarnings("unchecked") int id = this.rmi.getRegisteredId(object);
Connection connection = (Connection) kryo.getContext().get(Connection.connection);
int id = RmiBridge.getRegisteredId(connection, object);
if (id == Integer.MAX_VALUE) { if (id == Integer.MAX_VALUE) {
throw new SerializationException("Object not found in an ObjectSpace: " + object); throw new SerializationException("Object not found in an ObjectSpace: " + object);
} }
@ -33,6 +39,6 @@ public class RemoteObjectSerializer<T> extends Serializer<T> {
public T read(Kryo kryo, Input input, Class type) { public T read(Kryo kryo, Input input, Class type) {
int objectID = input.readInt(true); int objectID = input.readInt(true);
Connection connection = (Connection) kryo.getContext().get(Connection.connection); Connection connection = (Connection) kryo.getContext().get(Connection.connection);
return (T) RmiBridge.getRemoteObject(connection, objectID, type); return (T) this.rmi.getRemoteObject(connection, objectID, type);
} }
} }

View File

@ -0,0 +1,22 @@
package dorkbox.network.rmi;
public interface Rmi {
/**
* Registers an object to allow the remote end of the RmiBridge connections to access it using the specified ID.
*
* @param objectID Must not be Integer.MAX_VALUE.
*/
public void register(int objectID, Object object);
/**
* Removes an object. The remote end of the RmiBridge connection will no longer be able to access it.
*/
public void remove(int objectID);
/**
* Removes an object. The remote end of the RmiBridge connection will no longer be able to access it.
*/
public void remove(Object object);
}

View File

@ -9,9 +9,10 @@ import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -30,9 +31,11 @@ import dorkbox.network.connection.ListenerRaw;
import dorkbox.network.util.SerializationManager; import dorkbox.network.util.SerializationManager;
import dorkbox.network.util.exceptions.NetException; import dorkbox.network.util.exceptions.NetException;
import dorkbox.util.collections.ObjectIntMap; import dorkbox.util.collections.ObjectIntMap;
import dorkbox.util.objectPool.ObjectPool;
import dorkbox.util.objectPool.ObjectPoolFactory;
/** /**
* Allows methods on objects to be invoked remotely over TCP. Objects are * Allows methods on objects to be invoked remotely over TCP, UDP, or UDT. Objects are
* {@link #register(int, Object) registered} with an ID. The remote end of * {@link #register(int, Object) registered} with an ID. The remote end of
* connections that have been {@link #addConnection(Connection) added} are * connections that have been {@link #addConnection(Connection) added} are
* allowed to {@link #getRemoteObject(Connection, int, Class) access} registered * allowed to {@link #getRemoteObject(Connection, int, Class) access} registered
@ -46,89 +49,79 @@ import dorkbox.util.collections.ObjectIntMap;
* *
* @author Nathan Sweet <misc@n4te.com>, Nathan Robinson * @author Nathan Sweet <misc@n4te.com>, Nathan Robinson
*/ */
public class RmiBridge { public class RmiBridge implements Rmi {
private static final String OBJECT_ID = "objectID"; private static final String OBJECT_ID = "objectID";
static CopyOnWriteArrayList<RmiBridge> instances = new CopyOnWriteArrayList<RmiBridge>(); static final int returnValueMask = 1 << 7;
static final int returnExceptionMask = 1 << 6;
private static final HashMap<Class<?>, CachedMethod[]> methodCache = new HashMap<Class<?>, CachedMethod[]>(); static final int responseIdMask = 0xFF & ~returnValueMask & ~returnExceptionMask;
static final int returnValMask = 1 << 7; // the name of who created this RmiBridge
static final int returnExMask = 1 << 6; private final org.slf4j.Logger logger;
static final int responseIdMask = 0xff & ~returnValMask & ~returnExMask;
private static boolean asm = true; private HashMap<Class<?>, CachedMethod[]> methodCache = new HashMap<Class<?>, CachedMethod[]>();
// can be access by DIFFERENT threads. private final boolean asm;
volatile IntMap<Object> idToObject = new IntMap<Object>();
volatile ObjectIntMap<Object> objectToID = new ObjectIntMap<Object>();
private CopyOnWriteArrayList<Connection> connections = new CopyOnWriteArrayList<Connection>(); // can be accessed by DIFFERENT threads.
private ReentrantReadWriteLock objectLock = new ReentrantReadWriteLock();
private IntMap<Object> idToObject = new IntMap<Object>();
private ObjectIntMap<Object> objectToID = new ObjectIntMap<Object>();
private Executor executor; private Executor executor;
// the name of who created this object space. // 4096 concurrent method invocations max
private final org.slf4j.Logger logger; private final ObjectPool<InvokeMethod> invokeMethodPool = ObjectPoolFactory.create(new InvokeMethodPoolable(), 4096);
private final ListenerRaw<Connection, InvokeMethod> invokeListener = new ListenerRaw<Connection, InvokeMethod>() { private final ListenerRaw<Connection, InvokeMethod> invokeListener = new ListenerRaw<Connection, InvokeMethod>() {
@Override @Override
public void received(final Connection connection, final InvokeMethod invokeMethod) { public void received(final Connection connection, final InvokeMethod invokeMethod) {
boolean found = false; ReadLock readLock = RmiBridge.this.objectLock.readLock();
readLock.lock();
Iterator<Connection> iterator = RmiBridge.this.connections.iterator(); final Object target = RmiBridge.this.idToObject.get(invokeMethod.objectID);
while (iterator.hasNext()) {
Connection c = iterator.next(); readLock.unlock();
if (c == connection) {
found = true;
break; if (target == null) {
Logger logger2 = RmiBridge.this.logger;
if (logger2.isWarnEnabled()) {
logger2.warn("Ignoring remote invocation request for unknown object ID: {}", invokeMethod.objectID);
}
return;
}
Executor executor2 = RmiBridge.this.executor;
if (executor2 == null) {
invoke(connection,
target,
invokeMethod);
} else {
executor2.execute(new Runnable() {
@Override
public void run() {
invoke(connection,
target,
invokeMethod);
} }
} });
// The InvokeMethod message is not for a connection in this ObjectSpace.
if (!found) {
return;
}
final Object target = RmiBridge.this.idToObject.get(invokeMethod.objectID);
if (target == null) {
RmiBridge.this.logger.warn("Ignoring remote invocation request for unknown object ID: {}", invokeMethod.objectID);
return;
}
Executor executor2 = RmiBridge.this.executor;
if (executor2 == null) {
invoke(connection,
target,
invokeMethod);
} else {
executor2.execute(new Runnable() {
@Override
public void run() {
invoke(connection,
target,
invokeMethod);
}
});
}
} }
}
@Override };
public void disconnected(Connection connection) {
removeConnection(connection);
}
};
/** /**
* Creates an ObjectSpace with no connections. Connections must be * Creates an RmiBridge with no connections. Connections must be
* {@link #connectionConnected(Connection) added} to allow the remote end of * {@link #connectionConnected(Connection) added} to allow the remote end of
* the connections to access objects in this ObjectSpace. * the connections to access objects in this ObjectSpace.
* <p>
* For safety, this should ONLY be called by {@link EndPoint#getRmiBridge() }
*/ */
public RmiBridge(Logger logger) { public RmiBridge(Logger logger, SerializationManager serializationManager) {
this.logger = logger; this.logger = logger;
instances.addIfAbsent(this); this.asm = serializationManager.getSingleInstanceUnsafe().getAsmEnabled();
registerClasses(serializationManager);
} }
/** /**
@ -143,22 +136,12 @@ public class RmiBridge {
this.executor = executor; this.executor = executor;
} }
/** If true, an attempt will be made to use ReflectASM for invoking methods. Default is true. */
static public void setAsm(boolean asm) {
RmiBridge.asm = asm;
}
/** /**
* Registers an object to allow the remote end of the ObjectSpace's * Registers an object to allow the remote end of the RmiBridge connections to access it using the specified ID.
* connections to access it using the specified ID.
* <p>
* If a connection is added to multiple ObjectSpaces, the same object ID
* should not be registered in more than one of those ObjectSpaces.
* *
* @param objectID * @param objectID Must not be Integer.MAX_VALUE.
* Must not be Integer.MAX_VALUE.
* @see #getRemoteObject(Connection, int, Class...)
*/ */
@Override
public void register(int objectID, Object object) { public void register(int objectID, Object object) {
if (objectID == Integer.MAX_VALUE) { if (objectID == Integer.MAX_VALUE) {
throw new IllegalArgumentException("objectID cannot be Integer.MAX_VALUE."); throw new IllegalArgumentException("objectID cannot be Integer.MAX_VALUE.");
@ -166,9 +149,15 @@ public class RmiBridge {
if (object == null) { if (object == null) {
throw new IllegalArgumentException("object cannot be null."); throw new IllegalArgumentException("object cannot be null.");
} }
WriteLock writeLock = RmiBridge.this.objectLock.writeLock();
writeLock.lock();
this.idToObject.put(objectID, object); this.idToObject.put(objectID, object);
this.objectToID.put(object, objectID); this.objectToID.put(object, objectID);
writeLock.unlock();
Logger logger2 = this.logger; Logger logger2 = this.logger;
if (logger2.isTraceEnabled()) { if (logger2.isTraceEnabled()) {
logger2.trace("Object registered with ObjectSpace as {}:{}", objectID, object); logger2.trace("Object registered with ObjectSpace as {}:{}", objectID, object);
@ -176,33 +165,20 @@ public class RmiBridge {
} }
/** /**
* Causes this ObjectSpace to stop listening to the connections for method * Removes an object. The remote end of the RmiBridge connection will no longer be able to access it.
* invocation messages.
*/
public void close() {
Iterator<Connection> iterator = this.connections.iterator();
while (iterator.hasNext()) {
Connection connection = iterator.next();
connection.listeners().remove(this.invokeListener);
}
instances.remove(this);
Logger logger2 = this.logger;
if (logger2.isTraceEnabled()) {
logger2.trace("Closed ObjectSpace.");
}
}
/**
* Removes an object. The remote end of the ObjectSpace's connections will
* no longer be able to access it.
*/ */
@Override
public void remove(int objectID) { public void remove(int objectID) {
WriteLock writeLock = RmiBridge.this.objectLock.writeLock();
writeLock.lock();
Object object = this.idToObject.remove(objectID); Object object = this.idToObject.remove(objectID);
if (object != null) { if (object != null) {
this.objectToID.remove(object, 0); this.objectToID.remove(object, 0);
} }
writeLock.unlock();
Logger logger2 = this.logger; Logger logger2 = this.logger;
if (logger2.isTraceEnabled()) { if (logger2.isTraceEnabled()) {
logger2.trace("Object {} removed from ObjectSpace: {}", objectID, object); logger2.trace("Object {} removed from ObjectSpace: {}", objectID, object);
@ -210,11 +186,15 @@ public class RmiBridge {
} }
/** /**
* Removes an object. The remote end of the ObjectSpace's connections will * Removes an object. The remote end of the RmiBridge connection will no longer be able to access it.
* no longer be able to access it.
*/ */
@Override
public void remove(Object object) { public void remove(Object object) {
WriteLock writeLock = RmiBridge.this.objectLock.writeLock();
writeLock.lock();
if (!this.idToObject.containsValue(object, true)) { if (!this.idToObject.containsValue(object, true)) {
writeLock.unlock();
return; return;
} }
@ -222,52 +202,27 @@ public class RmiBridge {
this.idToObject.remove(objectID); this.idToObject.remove(objectID);
this.objectToID.remove(object, 0); this.objectToID.remove(object, 0);
writeLock.unlock();
Logger logger2 = this.logger; Logger logger2 = this.logger;
if (logger2.isTraceEnabled()) { if (logger2.isTraceEnabled()) {
logger2.trace("Object {} removed from ObjectSpace: {}", objectID, object); logger2.trace("Object {} removed from ObjectSpace: {}", objectID, object);
} }
} }
/**
* Allows the remote end of the specified connection to access objects
* registered in this ObjectSpace.
*/
public void addConnection(Connection connection) {
if (connection == null) {
throw new IllegalArgumentException("connection cannot be null.");
}
this.connections.addIfAbsent(connection);
connection.listeners().add(this.invokeListener);
Logger logger2 = this.logger;
if (logger2.isTraceEnabled()) {
logger2.trace("Added connection to ObjectSpace: {}", connection);
}
}
/** /**
* Removes the specified connection, it will no longer be able to access * @return the invocation listener
* objects registered in this ObjectSpace.
*/ */
public void removeConnection(Connection connection) { @SuppressWarnings("rawtypes")
if (connection == null) { public ListenerRaw getListener() {
throw new IllegalArgumentException("connection cannot be null."); return this.invokeListener;
}
connection.listeners().remove(this.invokeListener);
this.connections.remove(connection);
Logger logger2 = this.logger;
if (logger2.isTraceEnabled()) {
logger2.trace("Removed connection from ObjectSpace: {}", connection);
}
} }
/** /**
* Invokes the method on the object and, if necessary, sends the result back * Invokes the method on the object and, if necessary, sends the result back
* to the connection that made the invocation request. This method is * to the connection that made the invocation request. This method is
* invoked on the update thread of the {@link EndPoint} for this ObjectSpace * invoked on the update thread of the {@link EndPoint} for this RmiBridge
* and unless an {@link #setExecutor(Executor) executor} has been set. * and unless an {@link #setExecutor(Executor) executor} has been set.
* *
* @param connection * @param connection
@ -292,8 +247,8 @@ public class RmiBridge {
byte responseData = invokeMethod.responseData; byte responseData = invokeMethod.responseData;
boolean transmitReturnVal = (responseData & returnValMask) == returnValMask; boolean transmitReturnVal = (responseData & returnValueMask) == returnValueMask;
boolean transmitExceptions = (responseData & returnExMask) == returnExMask; boolean transmitExceptions = (responseData & returnExceptionMask) == returnExceptionMask;
int responseID = responseData & responseIdMask; int responseID = responseData & responseIdMask;
Object result = null; Object result = null;
@ -342,17 +297,6 @@ public class RmiBridge {
// logger.error("{} sent data: {} with id ({})", connection, result, invokeMethod.responseID); // logger.error("{} sent data: {} with id ({})", connection, result, invokeMethod.responseID);
} }
/**
* Identical to {@link #getRemoteObject(C, int, Class...)} except returns
* the object cast to the specified interface type. The returned object
* still implements {@link RemoteObject}.
*/
static public <T, C extends Connection> T getRemoteObject(final C connection, int objectID, Class<T> iface) {
@SuppressWarnings({"unchecked"})
T remoteObject = (T) getRemoteObject(connection, objectID, new Class<?>[] {iface});
return remoteObject;
}
/** /**
* Returns a proxy object that implements the specified interfaces. Methods * Returns a proxy object that implements the specified interfaces. Methods
* invoked on the proxy object will be invoked remotely on the object with * invoked on the proxy object will be invoked remotely on the object with
@ -377,7 +321,7 @@ public class RmiBridge {
* *
* @see RemoteObject * @see RemoteObject
*/ */
public static RemoteObject getRemoteObject(Connection connection, int objectID, Class<?>... ifaces) { public RemoteObject getRemoteObject(Connection connection, int objectID, Class<?>... ifaces) {
if (connection == null) { if (connection == null) {
throw new IllegalArgumentException("connection cannot be null."); throw new IllegalArgumentException("connection cannot be null.");
} }
@ -391,11 +335,11 @@ public class RmiBridge {
return (RemoteObject) Proxy.newProxyInstance(RmiBridge.class.getClassLoader(), return (RemoteObject) Proxy.newProxyInstance(RmiBridge.class.getClassLoader(),
temp, temp,
new RemoteInvocationHandler(connection, objectID)); new RemoteInvocationHandler(this.invokeMethodPool, connection, objectID));
} }
static CachedMethod[] getMethods(Kryo kryo, Class<?> type) { public CachedMethod[] getMethods(Kryo kryo, Class<?> type) {
CachedMethod[] cachedMethods = methodCache.get(type); // Maybe should cache per Kryo instance? CachedMethod[] cachedMethods = this.methodCache.get(type); // Maybe should cache per Kryo instance?
if (cachedMethods != null) { if (cachedMethods != null) {
return cachedMethods; return cachedMethods;
} }
@ -453,7 +397,7 @@ public class RmiBridge {
}); });
Object methodAccess = null; Object methodAccess = null;
if (asm && !Util.isAndroid && Modifier.isPublic(type.getModifiers())) { if (this.asm && !Util.isAndroid && Modifier.isPublic(type.getModifiers())) {
methodAccess = MethodAccess.get(type); methodAccess = MethodAccess.get(type);
} }
@ -493,73 +437,47 @@ public class RmiBridge {
cachedMethods[i] = cachedMethod; cachedMethods[i] = cachedMethod;
} }
methodCache.put(type, cachedMethods); this.methodCache.put(type, cachedMethods);
return cachedMethods; return cachedMethods;
} }
/** /**
* Returns the first object registered with the specified ID in any of the * Returns the object registered with the specified ID.
* ObjectSpaces the specified connection belongs to.
*/ */
static Object getRegisteredObject(Connection connection, int objectID) { Object getRegisteredObject(int objectID) {
CopyOnWriteArrayList<RmiBridge> instances = RmiBridge.instances; ReadLock readLock = this.objectLock.readLock();
for (RmiBridge objectSpace : instances) { readLock.lock();
// Check if the connection is in this ObjectSpace.
Iterator<Connection> iterator = objectSpace.connections.iterator();
while (iterator.hasNext()) {
Connection c = iterator.next();
if (c != connection) {
continue;
}
// Find an object with the objectID. // Find an object with the objectID.
Object object = objectSpace.idToObject.get(objectID); Object object = this.idToObject.get(objectID);
if (object != null) { readLock.unlock();
return object;
}
}
}
return null; return object;
} }
/** /**
* Returns the first ID registered for the specified object with any of the * Returns the ID registered for the specified object, or Integer.MAX_VALUE if not found.
* ObjectSpaces the specified connection belongs to, or Integer.MAX_VALUE
* if not found.
*/ */
public static int getRegisteredId(Connection connection, Object object) { public int getRegisteredId(Object object) {
CopyOnWriteArrayList<RmiBridge> instances = RmiBridge.instances; // Find an ID with the object.
for (RmiBridge objectSpace : instances) { ReadLock readLock = this.objectLock.readLock();
// Check if the connection is in this ObjectSpace.
Iterator<Connection> iterator = objectSpace.connections.iterator();
while (iterator.hasNext()) {
Connection c = iterator.next();
if (c != connection) {
continue;
}
// Find an ID with the object. readLock.lock();
int id = objectSpace.objectToID.get(object, Integer.MAX_VALUE); int id = this.objectToID.get(object, Integer.MAX_VALUE);
if (id != Integer.MAX_VALUE) { readLock.unlock();
return id;
}
}
}
return Integer.MAX_VALUE; return id;
} }
/** /**
* Registers the classes needed to use ObjectSpaces. This should be called * Registers the classes needed to use RMI. This should be called before any connections are opened.
* before any connections are opened.
*/ */
public static void registerClasses(final SerializationManager smanager) { private void registerClasses(final SerializationManager manager) {
smanager.registerForRmiClasses(new RmiRegisterClassesCallback() { manager.registerForRmiClasses(new RmiRegisterClassesCallback() {
@Override @Override
public void registerForClasses(Kryo kryo) { public void registerForClasses(Kryo kryo) {
kryo.register(Object[].class); kryo.register(Object[].class);
kryo.register(InvokeMethod.class); kryo.register(InvokeMethod.class, new InvokeMethodSerializer(RmiBridge.this));
FieldSerializer<InvokeMethodResult> resultSerializer = new FieldSerializer<InvokeMethodResult>(kryo, InvokeMethodResult.class) { FieldSerializer<InvokeMethodResult> resultSerializer = new FieldSerializer<InvokeMethodResult>(kryo, InvokeMethodResult.class) {
@Override @Override
@ -590,7 +508,7 @@ public class RmiBridge {
public Object read(Kryo kryo, Input input, Class<Object> type) { public Object read(Kryo kryo, Input input, Class<Object> type) {
int objectID = input.readInt(true); int objectID = input.readInt(true);
Connection connection = (Connection) kryo.getContext().get(Connection.connection); Connection connection = (Connection) kryo.getContext().get(Connection.connection);
Object object = getRegisteredObject(connection, objectID); Object object = getRegisteredObject(objectID);
if (object == null) { if (object == null) {
final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RmiBridge.class); final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RmiBridge.class);
logger.warn("Unknown object ID {} for connection: {}", objectID, connection); logger.warn("Unknown object ID {} for connection: {}", objectID, connection);

View File

@ -659,6 +659,7 @@ public class KryoSerializationManager implements SerializationManager {
} }
@SuppressWarnings("unused")
private static void compress(ByteBuf inputBuffer, ByteBuf outputBuffer, int length, Deflater compress) { private static void compress(ByteBuf inputBuffer, ByteBuf outputBuffer, int length, Deflater compress) {
byte[] in = new byte[inputBuffer.readableBytes()]; byte[] in = new byte[inputBuffer.readableBytes()];