diff --git a/Dorkbox-Network/src/dorkbox/network/Broadcast.java b/Dorkbox-Network/src/dorkbox/network/Broadcast.java index 67e6bd91..4182f369 100644 --- a/Dorkbox-Network/src/dorkbox/network/Broadcast.java +++ b/Dorkbox-Network/src/dorkbox/network/Broadcast.java @@ -98,9 +98,6 @@ public class Broadcast { List servers = new ArrayList(); Logger logger2 = logger; - if (logger2.isInfoEnabled()) { - logger2.info("Searching for host on port: {}", udpPort); - } Enumeration networkInterfaces; try { @@ -127,6 +124,10 @@ public class Broadcast { try { + if (logger2.isInfoEnabled()) { + logger2.info("Searching for host on {} : {}", address, udpPort); + } + NioEventLoopGroup group = new NioEventLoopGroup(); Bootstrap udpBootstrap = new Bootstrap() .group(group) diff --git a/Dorkbox-Network/src/dorkbox/network/Client.java b/Dorkbox-Network/src/dorkbox/network/Client.java index 8b747fd9..66b898fb 100644 --- a/Dorkbox-Network/src/dorkbox/network/Client.java +++ b/Dorkbox-Network/src/dorkbox/network/Client.java @@ -2,7 +2,6 @@ package dorkbox.network; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.PooledByteBufAllocator; -import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.DefaultEventLoopGroup; import io.netty.channel.EventLoopGroup; @@ -19,11 +18,10 @@ import io.netty.channel.socket.oio.OioSocketChannel; import io.netty.util.internal.PlatformDependent; import java.net.InetSocketAddress; -import java.util.LinkedList; -import java.util.List; import org.slf4j.Logger; +import dorkbox.network.connection.BootstrapWrapper; import dorkbox.network.connection.Connection; import dorkbox.network.connection.EndPointClient; import dorkbox.network.connection.idle.IdleBridge; @@ -40,15 +38,9 @@ import dorkbox.util.NamedThreadFactory; import dorkbox.util.OS; /** - * The client is both SYNC and ASYNC, meaning that once the client is connected to the server, you can access it however you want. - *

- * Another way to put this: The client (like the server) can respond to EVENTS (ie, listeners), but you can also use it DIRECTLY, for - * example, send data to the server on keyboard input. This is because the client will BLOCK the calling thread until it's ready. + * The client is both SYNC and ASYNC. It starts off SYNC (blocks thread until it's done), then once it's connected to the server, it's ASYNC. */ public class Client extends EndPointClient { - private List bootstraps = new LinkedList(); - - private volatile int connectionTimeout = 5000; // default /** * Starts a LOCAL only client, with the default local channel name and serialization scheme @@ -90,7 +82,6 @@ public class Client extends EndPointClient { options.udtPort = -1; } - // tcpBootstrap.setOption(SO_SNDBUF, 1048576); // tcpBootstrap.setOption(SO_RCVBUF, 1048576); @@ -130,11 +121,9 @@ public class Client extends EndPointClient { if (OS.isLinux()) { // JNI network stack is MUCH faster (but only on linux) boss = new EpollEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(this.name + "-TCP", nettyGroup)); - tcpBootstrap.channel(EpollSocketChannel.class); } else { boss = new NioEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(this.name + "-TCP", nettyGroup)); - tcpBootstrap.channel(NioSocketChannel.class); } } @@ -168,7 +157,6 @@ public class Client extends EndPointClient { } else { // CANNOT USE EpollDatagramChannel on the client! boss = new NioEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(this.name + "-UDP", nettyGroup)); - udpBootstrap.channel(NioDatagramChannel.class); } @@ -258,6 +246,8 @@ public class Client extends EndPointClient { /** * will attempt to connect to the server, and will the specified timeout. + *

+ * will BLOCK until completed * * @param connectionTimeout wait for x milliseconds. 0 will wait indefinitely */ @@ -269,58 +259,18 @@ public class Client extends EndPointClient { synchronized (this.shutdownInProgress) { } - // have to BLOCK here, because we don't want sendTCP() called before registration is complete + // have to start the registration process + this.connectingBootstrap.set(0); + registerNextProtocol(); + + // have to BLOCK + // don't want the client to run before registration is complete synchronized (this.registrationLock) { - this.registrationInProgress = true; - - // we will only do a local channel when NOT doing TCP/UDP channels. This is EXCLUSIVE. (XOR) - int size = this.bootstraps.size(); - for (int i=0;i + * Specifically, It costs at least 2 bytes more to use remote method invocation than just + * sending the parameters. If the method has a return value which is not + * {@link RemoteObject#setNonBlocking(boolean) ignored}, an extra byte is + * written. If the type of a parameter is not final (note primitives are final) + * then an extra byte is written for that parameter. + */ + public boolean enableRmi = false; + public ConnectionOptions() { } diff --git a/Dorkbox-Network/src/dorkbox/network/Server.java b/Dorkbox-Network/src/dorkbox/network/Server.java index 86f40ab0..776d2c7c 100644 --- a/Dorkbox-Network/src/dorkbox/network/Server.java +++ b/Dorkbox-Network/src/dorkbox/network/Server.java @@ -7,8 +7,6 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.DefaultEventLoopGroup; import io.netty.channel.EventLoopGroup; -import io.netty.channel.epoll.EpollChannelOption; -import io.netty.channel.epoll.EpollDatagramChannel; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.epoll.EpollServerSocketChannel; import io.netty.channel.local.LocalAddress; @@ -217,26 +215,26 @@ public class Server extends EndPointServer { worker = new OioEventLoopGroup(0, new NamedThreadFactory(this.name + "-worker-UDP", nettyGroup)); this.udpBootstrap.channel(OioDatagramChannel.class); } else { - if (OS.isLinux()) { - // JNI network stack is MUCH faster (but only on linux) - worker = new EpollEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(this.name + "-worker-UDP", nettyGroup)); - - this.udpBootstrap.channel(EpollDatagramChannel.class) - .option(EpollChannelOption.SO_REUSEPORT, true); - } else { +// if (OS.isLinux()) { +// // JNI network stack is MUCH faster (but only on linux) +// worker = new EpollEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(this.name + "-worker-UDP", nettyGroup)); +// +// this.udpBootstrap.channel(EpollDatagramChannel.class) +// .option(EpollChannelOption.SO_REUSEPORT, true); +// } else { worker = new NioEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(this.name + "-worker-UDP", nettyGroup)); this.udpBootstrap.channel(NioDatagramChannel.class); - } +// } } manageForShutdown(worker); this.udpBootstrap.group(worker) - .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) - // not binding to specific address, since it's driven by TCP, and that can be bound to a specific address - .localAddress(this.udpPort) // if you bind to a specific interface, Linux will be unable to receive broadcast packets! - .handler(new RegistrationRemoteHandlerServerUDP(this.name, this.registrationWrapper, this.serializationManager)); + .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + // not binding to specific address, since it's driven by TCP, and that can be bound to a specific address + .localAddress(this.udpPort) // if you bind to a specific interface, Linux will be unable to receive broadcast packets! + .handler(new RegistrationRemoteHandlerServerUDP(this.name, this.registrationWrapper, this.serializationManager)); // Enable to READ from MULTICAST data (ie, 192.168.1.0) diff --git a/Dorkbox-Network/src/dorkbox/network/BootstrapWrapper.java b/Dorkbox-Network/src/dorkbox/network/connection/BootstrapWrapper.java similarity index 53% rename from Dorkbox-Network/src/dorkbox/network/BootstrapWrapper.java rename to Dorkbox-Network/src/dorkbox/network/connection/BootstrapWrapper.java index ea80f462..bed8fe1d 100644 --- a/Dorkbox-Network/src/dorkbox/network/BootstrapWrapper.java +++ b/Dorkbox-Network/src/dorkbox/network/connection/BootstrapWrapper.java @@ -1,13 +1,13 @@ -package dorkbox.network; +package dorkbox.network.connection; import io.netty.bootstrap.Bootstrap; -class BootstrapWrapper { - final String type; - final Bootstrap bootstrap; - final int port; +public class BootstrapWrapper { + public final String type; + public final Bootstrap bootstrap; + public final int port; - BootstrapWrapper(String type, int port, Bootstrap bootstrap) { + public BootstrapWrapper(String type, int port, Bootstrap bootstrap) { this.type = type; this.port = port; this.bootstrap = bootstrap; diff --git a/Dorkbox-Network/src/dorkbox/network/connection/Connection.java b/Dorkbox-Network/src/dorkbox/network/connection/Connection.java index 4aee9364..a549154c 100644 --- a/Dorkbox-Network/src/dorkbox/network/connection/Connection.java +++ b/Dorkbox-Network/src/dorkbox/network/connection/Connection.java @@ -6,9 +6,10 @@ import org.bouncycastle.crypto.params.ParametersWithIV; import dorkbox.network.connection.bridge.ConnectionBridge; import dorkbox.network.connection.idle.IdleBridge; import dorkbox.network.connection.idle.IdleSender; +import dorkbox.network.rmi.RemoteObject; +import dorkbox.network.rmi.TimeoutException; public interface Connection { - public static final String connection = "connection"; /** @@ -97,4 +98,37 @@ public interface Connection { * Closes the connection */ public void close(); + + /** + * Identical to {@link #getRemoteObject(C, int, Class...)} except returns + * the object cast to the specified interface type. The returned object + * still implements {@link RemoteObject}. + */ + public T getRemoteObject(int objectID, Class iface); + + /** + * Returns a proxy object that implements the specified interfaces. Methods + * invoked on the proxy object will be invoked remotely on the object with + * the specified ID in the ObjectSpace for the specified connection. If the + * remote end of the connection has not {@link #addConnection(Connection) + * added} the connection to the ObjectSpace, the remote method invocations + * will be ignored. + *

+ * Methods that return a value will throw {@link TimeoutException} if the + * response is not received with the + * {@link RemoteObject#setResponseTimeout(int) response timeout}. + *

+ * If {@link RemoteObject#setNonBlocking(boolean) non-blocking} is false + * (the default), then methods that return a value must not be called from + * the update thread for the connection. An exception will be thrown if this + * occurs. Methods with a void return value can be called on the update + * thread. + *

+ * If a proxy returned from this method is part of an object graph sent over + * the network, the object graph on the receiving side will have the proxy + * object replaced with the registered object. + * + * @see RemoteObject + */ + public RemoteObject getRemoteObject(int objectID, Class... ifaces); } diff --git a/Dorkbox-Network/src/dorkbox/network/connection/ConnectionImpl.java b/Dorkbox-Network/src/dorkbox/network/connection/ConnectionImpl.java index bd044fe6..b89616f8 100644 --- a/Dorkbox-Network/src/dorkbox/network/connection/ConnectionImpl.java +++ b/Dorkbox-Network/src/dorkbox/network/connection/ConnectionImpl.java @@ -28,6 +28,8 @@ import dorkbox.network.connection.idle.IdleSender; import dorkbox.network.connection.wrapper.ChannelNetworkWrapper; import dorkbox.network.connection.wrapper.ChannelNull; import dorkbox.network.connection.wrapper.ChannelWrapper; +import dorkbox.network.rmi.RemoteObject; +import dorkbox.network.rmi.TimeoutException; /** @@ -343,6 +345,47 @@ public class ConnectionImpl extends ChannelInboundHandlerAdapter return sender; } + /** + * Identical to {@link #getRemoteObject(C, int, Class...)} except returns + * the object cast to the specified interface type. The returned object + * still implements {@link RemoteObject}. + */ + @Override + public T getRemoteObject(int objectID, Class iface) { + @SuppressWarnings({"unchecked"}) + T remoteObject = (T) this.endPoint.getRemoteObject(this, objectID, new Class[] {iface}); + return remoteObject; + } + + /** + * Returns a proxy object that implements the specified interfaces. Methods + * invoked on the proxy object will be invoked remotely on the object with + * the specified ID in the ObjectSpace for the specified connection. If the + * remote end of the connection has not {@link #addConnection(Connection) + * added} the connection to the ObjectSpace, the remote method invocations + * will be ignored. + *

+ * Methods that return a value will throw {@link TimeoutException} if the + * response is not received with the + * {@link RemoteObject#setResponseTimeout(int) response timeout}. + *

+ * If {@link RemoteObject#setNonBlocking(boolean) non-blocking} is false + * (the default), then methods that return a value must not be called from + * the update thread for the connection. An exception will be thrown if this + * occurs. Methods with a void return value can be called on the update + * thread. + *

+ * If a proxy returned from this method is part of an object graph sent over + * the network, the object graph on the receiving side will have the proxy + * object replaced with the registered object. + * + * @see RemoteObject + */ + @Override + public RemoteObject getRemoteObject(int objectID, Class... ifaces) { + return this.endPoint.getRemoteObject(this, objectID, ifaces); + } + /** * Invoked when a {@link Channel} has been idle for a while. */ diff --git a/Dorkbox-Network/src/dorkbox/network/connection/EndPoint.java b/Dorkbox-Network/src/dorkbox/network/connection/EndPoint.java index 6f6c5996..7d84a7d6 100644 --- a/Dorkbox-Network/src/dorkbox/network/connection/EndPoint.java +++ b/Dorkbox-Network/src/dorkbox/network/connection/EndPoint.java @@ -7,6 +7,8 @@ import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; import java.lang.annotation.Annotation; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.security.AccessControlException; import java.security.SecureRandom; import java.util.ArrayList; @@ -37,7 +39,10 @@ import dorkbox.network.connection.wrapper.ChannelNetworkWrapper; import dorkbox.network.connection.wrapper.ChannelWrapper; import dorkbox.network.pipeline.KryoEncoder; import dorkbox.network.pipeline.KryoEncoderCrypto; +import dorkbox.network.rmi.RemoteObject; +import dorkbox.network.rmi.Rmi; import dorkbox.network.rmi.RmiBridge; +import dorkbox.network.rmi.TimeoutException; import dorkbox.network.util.EndpointTool; import dorkbox.network.util.KryoSerializationManager; import dorkbox.network.util.SerializationManager; @@ -51,6 +56,7 @@ import dorkbox.network.util.serializers.IgnoreSerialization; import dorkbox.network.util.store.NullSettingsStore; import dorkbox.network.util.store.SettingsStore; import dorkbox.network.util.udt.UdtEndpointProxy; +import dorkbox.util.Sys; import dorkbox.util.collections.IntMap; import dorkbox.util.collections.IntMap.Entries; import dorkbox.util.crypto.Crypto; @@ -139,7 +145,7 @@ public abstract class EndPoint { protected final RegistrationWrapper registrationWrapper; // The remote object space is used by RMI. - protected RmiBridge remoteObjectSpace = null; + private final RmiBridge rmiBridge; // the eventLoop groups are used to track and manage the event loops for startup/shutdown private List eventLoopGroups = new ArrayList(8); @@ -172,6 +178,17 @@ public abstract class EndPoint { this.registrationWrapper = new RegistrationWrapper(this, this.logger); + // make sure that 'localhost' is REALLY our specific IP address + if (options.host != null && (options.host.equals("localhost") || options.host.startsWith("127."))) { + try { + InetAddress localHostLanAddress = Sys.getLocalHostLanAddress(); + options.host = localHostLanAddress.getHostAddress(); + this.logger.info("Network localhost request, using real IP instead: {}", options.host); + } catch (UnknownHostException e) { + this.logger.error("Unable to get the actual 'localhost' IP address", e); + } + } + // we have to be able to specify WHAT property store we want to use, since it can change! if (options.settingsStore == null) { this.propertyStore = new PropertyStore(name); @@ -278,7 +295,18 @@ public abstract class EndPoint { // add the ping listener (internal use only!) - this.connectionManager.add(new PingSystemListener(name)); + this.connectionManager.add(new PingSystemListener()); + + /* + * Creates the remote method invocation (RMI) bridge for this endpoint. + *

+ * there is some housekeeping that is necessary BEFORE a connection is actually connected.. + */ + if (options.enableRmi) { + this.rmiBridge = new RmiBridge(this.logger, this.serializationManager); + } else { + this.rmiBridge = null; + } } public void disableRemoteKeyValidation() { @@ -323,7 +351,7 @@ public abstract class EndPoint { * Internal call by the pipeline to notify the client to continue registering the different session protocols. * The server does not use this. */ - protected boolean continueRegistration0() { + protected boolean registerNextProtocol0() { return true; } @@ -385,26 +413,16 @@ public abstract class EndPoint { } /** - * Creates the remote (RMI) object space for this endpoint. - *

- * This method is safe, and is recommended. Make sure to call it BEFORE a connection is established, as - * there is some housekeeping that is necessary BEFORE a connection is actually connected.. + * Gets the remote method invocation (RMI) bridge for this endpoint. */ - public RmiBridge getRmiBridge() { - synchronized (this) { - if (this.remoteObjectSpace == null) { - if (isConnected()) { - throw new NetException("Cannot create a remote object space after the remote endpoint has already connected!"); - } - - this.remoteObjectSpace = new RmiBridge(this.logger); - } + public Rmi rmi() { + if (this.rmiBridge == null) { + throw new NetException("Cannot use a remote object space that has NOT been created first! Configure the ConnectionOptions!"); } - return this.remoteObjectSpace; + return this.rmiBridge; } - /** * This method allows the connections used by the client/server to be subclassed (custom implementations). *

@@ -417,7 +435,6 @@ public abstract class EndPoint { return new ConnectionImpl(name); } - /** * Internal call by the pipeline when: * - creating a new network connection @@ -452,10 +469,8 @@ public abstract class EndPoint { metaChannel.connection = connection; // notify our remote object space that it is able to receive method calls. - synchronized (this) { - if (this.remoteObjectSpace != null) { - this.remoteObjectSpace.addConnection(connection); - } + if (this.rmiBridge != null) { + connection.listeners().add(this.rmiBridge.getListener()); } } else { // getting the baseClass @@ -509,6 +524,34 @@ public abstract class EndPoint { */ public abstract ConnectionBridgeBase send(); + /** + * Returns a proxy object that implements the specified interfaces. Methods + * invoked on the proxy object will be invoked remotely on the object with + * the specified ID in the ObjectSpace for the specified connection. If the + * remote end of the connection has not {@link #addConnection(Connection) + * added} the connection to the ObjectSpace, the remote method invocations + * will be ignored. + *

+ * Methods that return a value will throw {@link TimeoutException} if the + * response is not received with the + * {@link RemoteObject#setResponseTimeout(int) response timeout}. + *

+ * If {@link RemoteObject#setNonBlocking(boolean) non-blocking} is false + * (the default), then methods that return a value must not be called from + * the update thread for the connection. An exception will be thrown if this + * occurs. Methods with a void return value can be called on the update + * thread. + *

+ * If a proxy returned from this method is part of an object graph sent over + * the network, the object graph on the receiving side will have the proxy + * object replaced with the registered object. + * + * @see RemoteObject + */ + public RemoteObject getRemoteObject(Connection connection, int objectID, Class[] ifaces) { + return this.rmiBridge.getRemoteObject(connection, objectID, ifaces); + } + /** * Registers a tool with the server, to be used by other services. */ diff --git a/Dorkbox-Network/src/dorkbox/network/connection/EndPointClient.java b/Dorkbox-Network/src/dorkbox/network/connection/EndPointClient.java index c2db1702..2ee83f4a 100644 --- a/Dorkbox-Network/src/dorkbox/network/connection/EndPointClient.java +++ b/Dorkbox-Network/src/dorkbox/network/connection/EndPointClient.java @@ -1,5 +1,12 @@ package dorkbox.network.connection; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelOption; + +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + import org.slf4j.Logger; import dorkbox.network.ConnectionOptions; @@ -11,11 +18,13 @@ import dorkbox.network.util.exceptions.SecurityException; /** * This serves the purpose of making sure that specific methods are not available to the end user. */ -public class EndPointClient extends EndPoint { +public class EndPointClient extends EndPoint implements Runnable { + protected List bootstraps = new LinkedList(); + protected AtomicInteger connectingBootstrap = new AtomicInteger(0); protected final Object registrationLock = new Object(); - protected volatile boolean registrationInProgress = false; + protected volatile int connectionTimeout = 5000; // default protected volatile boolean registrationComplete = false; private volatile ConnectionBridgeFlushAlways connectionBridgeFlushAlways; @@ -25,19 +34,58 @@ public class EndPointClient extends EndPoint { super(name, options); } + protected void registerNextProtocol() { + new Thread(this, "Bootstrap registration").start(); + } + + @Override + public void run() { + synchronized(this.connectingBootstrap) { + int bootstrapToRegister = this.connectingBootstrap.getAndIncrement(); + + BootstrapWrapper bootstrapWrapper = this.bootstraps.get(bootstrapToRegister); + + ChannelFuture future; + + if (this.connectionTimeout != 0) { + // must be before connect + bootstrapWrapper.bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, this.connectionTimeout); + } + + Logger logger2 = this.logger; + try { + // UDP : When this is CONNECT, a udp socket will ONLY accept UDP traffic from the remote address (ip/port combo). + // If the reply isn't from the correct port, then the other end will receive a "Port Unreachable" exception. + + future = bootstrapWrapper.bootstrap.connect(); + future.await(); + } catch (Exception e) { + String errorMessage = stopWithErrorMessage(logger2, "Could not connect to the " + bootstrapWrapper.type + " server on port: " + bootstrapWrapper.port, e); + throw new IllegalArgumentException(errorMessage); + } + + if (!future.isSuccess()) { + String errorMessage = stopWithErrorMessage(logger2, "Could not connect to the " + bootstrapWrapper.type + " server on port: " + bootstrapWrapper.port, future.cause()); + throw new IllegalArgumentException(errorMessage); + } + + if (logger2.isTraceEnabled()) { + logger2.trace("Waiting for registration from server."); + } + manageForShutdown(future); + } + } + /** * Internal call by the pipeline to notify the client to continue registering the different session protocols. * @return true if we are done registering bootstraps */ @Override - protected boolean continueRegistration0() { - // we need to cache the value, since it can change in a different thread before we have the chance to return the value. - boolean complete = this.registrationComplete; - - // notify the block, but only if we are not ready. - if (!complete) { - synchronized (this.registrationLock) { - this.registrationLock.notifyAll(); + protected boolean registerNextProtocol0() { + synchronized(this.connectingBootstrap) { + this.registrationComplete = this.connectingBootstrap.get() == this.bootstraps.size(); + if (!this.registrationComplete) { + registerNextProtocol(); } } @@ -48,11 +96,11 @@ public class EndPointClient extends EndPoint { // only let us continue with connections (this starts up the client/server implementations) once ALL of the // bootstraps have connected - return complete; + return this.registrationComplete; } /** - * Internal (to the networking stack) to notify the client that registration has completed. This is necessary because the client + * Internal (to the networking stack) to notify the client that registration has COMPLETED. This is necessary because the client * will BLOCK until it has successfully registered it's connections. */ @Override @@ -60,9 +108,9 @@ public class EndPointClient extends EndPoint { // invokes the listener.connection() method, and initialize the connection channels with whatever extra info they might need. super.connectionConnected0(connection); - // notify the block + // notify the registration we are done! synchronized (this.registrationLock) { - this.registrationLock.notifyAll(); + this.registrationLock.notify(); } } @@ -70,7 +118,9 @@ public class EndPointClient extends EndPoint { * Internal call to abort registration if the shutdown command is issued during channel registration. */ void abortRegistration() { - this.registrationInProgress = false; + synchronized (this.registrationLock) { + this.registrationLock.notify(); + } stop(); } diff --git a/Dorkbox-Network/src/dorkbox/network/connection/ListenerRaw.java b/Dorkbox-Network/src/dorkbox/network/connection/ListenerRaw.java index 264055d5..090563ae 100644 --- a/Dorkbox-Network/src/dorkbox/network/connection/ListenerRaw.java +++ b/Dorkbox-Network/src/dorkbox/network/connection/ListenerRaw.java @@ -53,6 +53,7 @@ public abstract class ListenerRaw { * This method should not block for long periods as other network activity will not be processed * until it returns. */ + @SuppressWarnings("unused") public void connected(C connection) { } @@ -61,6 +62,7 @@ public abstract class ListenerRaw { *

* Do not write data in this method! The channel can be closed, resulting in an error if you attempt to do so. */ + @SuppressWarnings("unused") public void disconnected(C connection) { } @@ -68,18 +70,21 @@ public abstract class ListenerRaw { * Called when an object has been received from the remote end of the connection. * This method should not block for long periods as other network activity will not be processed until it returns. */ + @SuppressWarnings("unused") public void received(C connection, M message) { } /** * Called when the connection is idle for longer than the {@link EndPoint#setIdleTimeout(idle) idle threshold}. */ + @SuppressWarnings("unused") public void idle(C connection) { } /** * Called when there is an error of some kind during the up/down stream process (to/from the socket or otherwise) */ + @SuppressWarnings("unused") public void error(C connection, Throwable throwable) { throwable.printStackTrace(); } diff --git a/Dorkbox-Network/src/dorkbox/network/connection/PingSystemListener.java b/Dorkbox-Network/src/dorkbox/network/connection/PingSystemListener.java index f892a8d1..93683f2a 100644 --- a/Dorkbox-Network/src/dorkbox/network/connection/PingSystemListener.java +++ b/Dorkbox-Network/src/dorkbox/network/connection/PingSystemListener.java @@ -3,7 +3,7 @@ package dorkbox.network.connection; class PingSystemListener extends ListenerRaw { - PingSystemListener(String name) { + PingSystemListener() { } @Override diff --git a/Dorkbox-Network/src/dorkbox/network/connection/RegistrationWrapper.java b/Dorkbox-Network/src/dorkbox/network/connection/RegistrationWrapper.java index a430fb79..699ebfa4 100644 --- a/Dorkbox-Network/src/dorkbox/network/connection/RegistrationWrapper.java +++ b/Dorkbox-Network/src/dorkbox/network/connection/RegistrationWrapper.java @@ -100,8 +100,8 @@ public class RegistrationWrapper implements UdpServer { * The server does not use this. * @return true if we are done registering bootstraps */ - public boolean continueRegistration0() { - return this.endPoint.continueRegistration0(); + public boolean registerNextProtocol0() { + return this.endPoint.registerNextProtocol0(); } /** diff --git a/Dorkbox-Network/src/dorkbox/network/connection/registration/RegistrationHandler.java b/Dorkbox-Network/src/dorkbox/network/connection/registration/RegistrationHandler.java index aa6a720c..23606758 100644 --- a/Dorkbox-Network/src/dorkbox/network/connection/registration/RegistrationHandler.java +++ b/Dorkbox-Network/src/dorkbox/network/connection/registration/RegistrationHandler.java @@ -23,6 +23,7 @@ public abstract class RegistrationHandler extends ChannelInboundHandlerAdapter { this.registrationWrapper = registrationWrapper; } + @SuppressWarnings("unused") protected void initChannel(Channel channel) { } diff --git a/Dorkbox-Network/src/dorkbox/network/connection/registration/local/RegistrationLocalHandlerClient.java b/Dorkbox-Network/src/dorkbox/network/connection/registration/local/RegistrationLocalHandlerClient.java index b15b4d57..04011c1d 100644 --- a/Dorkbox-Network/src/dorkbox/network/connection/registration/local/RegistrationLocalHandlerClient.java +++ b/Dorkbox-Network/src/dorkbox/network/connection/registration/local/RegistrationLocalHandlerClient.java @@ -63,7 +63,7 @@ public class RegistrationLocalHandlerClient extends RegistrationLocalHandler { channel.pipeline().remove(this); // Event though a local channel is XOR with everything else, we still have to make the client clean up it's state. - registrationWrapper.continueRegistration0(); + registrationWrapper.registerNextProtocol0(); Connection connection = metaChannel.connection; registrationWrapper.connectionConnected0(connection); diff --git a/Dorkbox-Network/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandler.java b/Dorkbox-Network/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandler.java index 64798086..9053318c 100644 --- a/Dorkbox-Network/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandler.java +++ b/Dorkbox-Network/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandler.java @@ -316,6 +316,7 @@ public abstract class RegistrationRemoteHandler extends RegistrationHandler { * The server will override this. * Only called if we have a UDP channel when we finalize the setup of the TCP connection */ + @SuppressWarnings("unused") protected void setupServerUdpConnection(MetaChannel metaChannel) { } diff --git a/Dorkbox-Network/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientTCP.java b/Dorkbox-Network/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientTCP.java index 49fb7afc..3a8617c8 100644 --- a/Dorkbox-Network/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientTCP.java +++ b/Dorkbox-Network/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientTCP.java @@ -296,7 +296,7 @@ public class RegistrationRemoteHandlerClientTCP extends RegistrationRemoteHandle metaChannel.ecdhKey = null; // notify the client that we are ready to continue registering other session protocols (bootstraps) - boolean isDoneWithRegistration = registrationWrapper2.continueRegistration0(); + boolean isDoneWithRegistration = registrationWrapper2.registerNextProtocol0(); // tell the server we are done, and to setup crypto on it's side if (isDoneWithRegistration) { diff --git a/Dorkbox-Network/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientUDP.java b/Dorkbox-Network/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientUDP.java index 0f32846d..33f09d0f 100644 --- a/Dorkbox-Network/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientUDP.java +++ b/Dorkbox-Network/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientUDP.java @@ -152,7 +152,7 @@ public class RegistrationRemoteHandlerClientUDP extends RegistrationRemoteHandle // hooray! we are successful // notify the client that we are ready to continue registering other session protocols (bootstraps) - boolean isDoneWithRegistration = registrationWrapper2.continueRegistration0(); + boolean isDoneWithRegistration = registrationWrapper2.registerNextProtocol0(); // tell the server we are done, and to setup crypto on it's side if (isDoneWithRegistration) { diff --git a/Dorkbox-Network/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientUDT.java b/Dorkbox-Network/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientUDT.java index 2de277e1..e6322f2c 100644 --- a/Dorkbox-Network/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientUDT.java +++ b/Dorkbox-Network/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientUDT.java @@ -145,7 +145,7 @@ public class RegistrationRemoteHandlerClientUDT extends RegistrationRemoteHandle // hooray! we are successful // notify the client that we are ready to continue registering other session protocols (bootstraps) - boolean isDoneWithRegistration = registrationWrapper2.continueRegistration0(); + boolean isDoneWithRegistration = registrationWrapper2.registerNextProtocol0(); // tell the server we are done, and to setup crypto on it's side if (isDoneWithRegistration) { diff --git a/Dorkbox-Network/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServerUDP.java b/Dorkbox-Network/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServerUDP.java index 2c5b8dde..dba52e8c 100644 --- a/Dorkbox-Network/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServerUDP.java +++ b/Dorkbox-Network/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServerUDP.java @@ -17,7 +17,6 @@ import org.slf4j.Logger; import dorkbox.network.Broadcast; import dorkbox.network.connection.Connection; import dorkbox.network.connection.ConnectionImpl; -import dorkbox.network.connection.EndPoint; import dorkbox.network.connection.RegistrationWrapper; import dorkbox.network.connection.registration.MetaChannel; import dorkbox.network.connection.registration.Registration; @@ -76,8 +75,9 @@ public class RegistrationRemoteHandlerServerUDP extends MessageToMessageCodec { public KryoEncoder(SerializationManager kryoWrapper) { super(); this.kryoWrapper = kryoWrapper; - optimize = OptimizeUtilsByteBuf.get(); + this.optimize = OptimizeUtilsByteBuf.get(); } // the crypto writer will override this - protected void writeObject(SerializationManager kryoWrapper, ChannelHandlerContext ctx, Object msg, ByteBuf buffer) { + @SuppressWarnings("unused") + protected void writeObject(SerializationManager kryoWrapper, ChannelHandlerContext context, Object msg, ByteBuf buffer) { // no connection here because we haven't created one yet. When we do, we replace this handler with a new one. kryoWrapper.write(buffer, msg); } @@ -42,7 +43,7 @@ public class KryoEncoder extends MessageToByteEncoder { out.writeInt(0); // put an int in, which is the same size as reservedLengthIndex try { - writeObject(kryoWrapper, ctx, msg, out); + writeObject(this.kryoWrapper, ctx, msg, out); // now set the frame (if it's TCP)! int length = out.readableBytes() - startIndex - reservedLengthIndex; // (reservedLengthLength) 4 is the reserved space for the integer. diff --git a/Dorkbox-Network/src/dorkbox/network/pipeline/udp/KryoEncoderUdp.java b/Dorkbox-Network/src/dorkbox/network/pipeline/udp/KryoEncoderUdp.java index 1bc09006..bc739734 100644 --- a/Dorkbox-Network/src/dorkbox/network/pipeline/udp/KryoEncoderUdp.java +++ b/Dorkbox-Network/src/dorkbox/network/pipeline/udp/KryoEncoderUdp.java @@ -31,7 +31,8 @@ public class KryoEncoderUdp extends MessageToMessageEncoder { } // the crypto writer will override this - protected void writeObject(SerializationManager kryoWrapper, ChannelHandlerContext ctx, Object msg, ByteBuf buffer) { + @SuppressWarnings("unused") + protected void writeObject(SerializationManager kryoWrapper, ChannelHandlerContext context, Object msg, ByteBuf buffer) { // no connection here because we haven't created one yet. When we do, we replace this handler with a new one. kryoWrapper.write(buffer, msg); } @@ -43,7 +44,7 @@ public class KryoEncoderUdp extends MessageToMessageEncoder { ByteBuf outBuffer = Unpooled.buffer(maxSize); // no size info, since this is UDP, it is not segmented - writeObject(kryoWrapper, ctx, msg, outBuffer); + writeObject(this.kryoWrapper, ctx, msg, outBuffer); // have to check to see if we are too big for UDP! diff --git a/Dorkbox-Network/src/dorkbox/network/rmi/AsmCachedMethod.java b/Dorkbox-Network/src/dorkbox/network/rmi/AsmCachedMethod.java index 25e03f37..5a5ada0d 100644 --- a/Dorkbox-Network/src/dorkbox/network/rmi/AsmCachedMethod.java +++ b/Dorkbox-Network/src/dorkbox/network/rmi/AsmCachedMethod.java @@ -10,6 +10,10 @@ class AsmCachedMethod extends CachedMethod { @Override public Object invoke(Object target, Object[] args) throws IllegalAccessException, InvocationTargetException { - return this.methodAccess.invoke(target, this.methodAccessIndex, args); + try { + return this.methodAccess.invoke(target, this.methodAccessIndex, args); + } catch (Exception ex) { + throw new InvocationTargetException(ex); + } } } diff --git a/Dorkbox-Network/src/dorkbox/network/rmi/InvokeMethod.java b/Dorkbox-Network/src/dorkbox/network/rmi/InvokeMethod.java index aba3c9e7..06e9d05e 100644 --- a/Dorkbox-Network/src/dorkbox/network/rmi/InvokeMethod.java +++ b/Dorkbox-Network/src/dorkbox/network/rmi/InvokeMethod.java @@ -1,14 +1,8 @@ package dorkbox.network.rmi; -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.KryoException; -import com.esotericsoftware.kryo.KryoSerializable; -import com.esotericsoftware.kryo.Serializer; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; /** Internal message to invoke methods remotely. */ -class InvokeMethod implements KryoSerializable, RmiMessages { +class InvokeMethod implements RmiMessages { public int objectID; public CachedMethod cachedMethod; public Object[] args; @@ -18,54 +12,7 @@ class InvokeMethod implements KryoSerializable, RmiMessages { // possible duplicate IDs. A response data of 0 means to not respond. public byte responseData; - @Override - @SuppressWarnings("rawtypes") - public void write(Kryo kryo, Output output) { - output.writeInt(this.objectID, true); - output.writeInt(this.cachedMethod.methodClassID, true); - output.writeByte(this.cachedMethod.methodIndex); - Serializer[] serializers = this.cachedMethod.serializers; - Object[] args = this.args; - for (int i = 0, n = serializers.length; i < n; i++) { - Serializer serializer = serializers[i]; - if (serializer != null) { - kryo.writeObjectOrNull(output, args[i], serializer); - } else { - kryo.writeClassAndObject(output, args[i]); - } - } - - output.writeByte(this.responseData); - } - - @Override - public void read(Kryo kryo, Input input) { - this.objectID = input.readInt(true); - - int methodClassID = input.readInt(true); - Class methodClass = kryo.getRegistration(methodClassID).getType(); - - byte methodIndex = input.readByte(); - try { - this.cachedMethod = RmiBridge.getMethods(kryo, methodClass)[methodIndex]; - } catch (IndexOutOfBoundsException ex) { - throw new KryoException("Invalid method index " + methodIndex + " for class: " + methodClass.getName()); - } - - Serializer[] serializers = this.cachedMethod.serializers; - Class[] parameterTypes = this.cachedMethod.method.getParameterTypes(); - Object[] args = new Object[serializers.length]; - this.args = args; - for (int i = 0, n = args.length; i < n; i++) { - Serializer serializer = serializers[i]; - if (serializer != null) { - args[i] = kryo.readObjectOrNull(input, parameterTypes[i], serializer); - } else { - args[i] = kryo.readClassAndObject(input); - } - } - - this.responseData = input.readByte(); + public InvokeMethod() { } } diff --git a/Dorkbox-Network/src/dorkbox/network/rmi/InvokeMethodPoolable.java b/Dorkbox-Network/src/dorkbox/network/rmi/InvokeMethodPoolable.java new file mode 100644 index 00000000..d83bcb1a --- /dev/null +++ b/Dorkbox-Network/src/dorkbox/network/rmi/InvokeMethodPoolable.java @@ -0,0 +1,10 @@ +package dorkbox.network.rmi; + +import dorkbox.util.objectPool.PoolableObject; + +public class InvokeMethodPoolable implements PoolableObject { + @Override + public InvokeMethod create() { + return new InvokeMethod(); + } +} \ No newline at end of file diff --git a/Dorkbox-Network/src/dorkbox/network/rmi/InvokeMethodSerializer.java b/Dorkbox-Network/src/dorkbox/network/rmi/InvokeMethodSerializer.java new file mode 100644 index 00000000..39fd6c9c --- /dev/null +++ b/Dorkbox-Network/src/dorkbox/network/rmi/InvokeMethodSerializer.java @@ -0,0 +1,71 @@ +package dorkbox.network.rmi; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.KryoException; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; + +/** Internal message to invoke methods remotely. */ +class InvokeMethodSerializer extends Serializer { + private RmiBridge rmi; + + public InvokeMethodSerializer(RmiBridge rmi) { + this.rmi = rmi; + } + + @Override + @SuppressWarnings("rawtypes") + public void write(Kryo kryo, Output output, InvokeMethod object) { + output.writeInt(object.objectID, true); + output.writeInt(object.cachedMethod.methodClassID, true); + output.writeByte(object.cachedMethod.methodIndex); + + Serializer[] serializers = object.cachedMethod.serializers; + Object[] args = object.args; + for (int i = 0, n = serializers.length; i < n; i++) { + Serializer serializer = serializers[i]; + if (serializer != null) { + kryo.writeObjectOrNull(output, args[i], serializer); + } else { + kryo.writeClassAndObject(output, args[i]); + } + } + + output.writeByte(object.responseData); + } + + @Override + public InvokeMethod read(Kryo kryo, Input input, Class type) { + InvokeMethod invokeMethod = new InvokeMethod(); + + invokeMethod.objectID = input.readInt(true); + + int methodClassID = input.readInt(true); + Class methodClass = kryo.getRegistration(methodClassID).getType(); + + byte methodIndex = input.readByte(); + try { + invokeMethod.cachedMethod = this.rmi.getMethods(kryo, methodClass)[methodIndex]; + } catch (IndexOutOfBoundsException ex) { + throw new KryoException("Invalid method index " + methodIndex + " for class: " + methodClass.getName()); + } + + Serializer[] serializers = invokeMethod.cachedMethod.serializers; + Class[] parameterTypes = invokeMethod.cachedMethod.method.getParameterTypes(); + Object[] args = new Object[serializers.length]; + invokeMethod.args = args; + for (int i = 0, n = args.length; i < n; i++) { + Serializer serializer = serializers[i]; + if (serializer != null) { + args[i] = kryo.readObjectOrNull(input, parameterTypes[i], serializer); + } else { + args[i] = kryo.readClassAndObject(input); + } + } + + invokeMethod.responseData = input.readByte(); + + return invokeMethod; + } +} diff --git a/Dorkbox-Network/src/dorkbox/network/rmi/RemoteInvocationHandler.java b/Dorkbox-Network/src/dorkbox/network/rmi/RemoteInvocationHandler.java index 010daef1..7d943c97 100644 --- a/Dorkbox-Network/src/dorkbox/network/rmi/RemoteInvocationHandler.java +++ b/Dorkbox-Network/src/dorkbox/network/rmi/RemoteInvocationHandler.java @@ -12,6 +12,8 @@ import dorkbox.network.connection.Connection; import dorkbox.network.connection.EndPoint; import dorkbox.network.connection.ListenerRaw; import dorkbox.network.util.exceptions.NetException; +import dorkbox.util.objectPool.ObjectPool; +import dorkbox.util.objectPool.ObjectPoolHolder; /** Handles network communication when methods are invoked on a proxy. */ class RemoteInvocationHandler implements InvocationHandler { @@ -26,6 +28,9 @@ class RemoteInvocationHandler implements InvocationHandler { private boolean transmitExceptions = true; private boolean remoteToString; + private boolean udp; + private boolean udt; + private Byte lastResponseID; private byte nextResponseId = 1; @@ -37,8 +42,11 @@ class RemoteInvocationHandler implements InvocationHandler { final InvokeMethodResult[] responseTable = new InvokeMethodResult[64]; final boolean[] pendingResponses = new boolean[64]; - public RemoteInvocationHandler(Connection connection, final int objectID) { + private final ObjectPool invokeMethodPool; + + public RemoteInvocationHandler(ObjectPool invokeMethodPool, Connection connection, final int objectID) { super(); + this.invokeMethodPool = invokeMethodPool; this.connection = connection; this.objectID = objectID; @@ -97,6 +105,12 @@ class RemoteInvocationHandler implements InvocationHandler { } else if (name.equals("setTransmitExceptions")) { this.transmitExceptions = (Boolean) args[0]; return null; + } else if (name.equals("setUDP")) { + this.udp = (Boolean)args[0]; + return null; + } else if (name.equals("setUDT")) { + this.udt = (Boolean)args[0]; + return null; } else if (name.equals("setRemoteToString")) { this.remoteToString = (Boolean) args[0]; return null; @@ -124,12 +138,15 @@ class RemoteInvocationHandler implements InvocationHandler { return ""; } - InvokeMethod invokeMethod = new InvokeMethod(); + EndPoint endPoint = this.connection.getEndPoint(); + RmiBridge rmi = (RmiBridge) endPoint.rmi(); + + ObjectPoolHolder invokeMethodHolder = this.invokeMethodPool.take(); + InvokeMethod invokeMethod = invokeMethodHolder.getValue(); invokeMethod.objectID = this.objectID; invokeMethod.args = args; - EndPoint endPoint = this.connection.getEndPoint(); - CachedMethod[] cachedMethods = RmiBridge.getMethods(endPoint.getSerialization().getSingleInstanceUnsafe(), method.getDeclaringClass()); + CachedMethod[] cachedMethods = rmi.getMethods(endPoint.getSerialization().getSingleInstanceUnsafe(), method.getDeclaringClass()); for (int i = 0, n = cachedMethods.length; i < n; i++) { CachedMethod cachedMethod = cachedMethods[i]; if (cachedMethod.method.equals(method)) { @@ -145,7 +162,7 @@ class RemoteInvocationHandler implements InvocationHandler { // An invocation doesn't need a response is if it's async and no return values or exceptions are wanted back. - boolean needsResponse = this.transmitReturnValue || this.transmitExceptions || !this.nonBlocking; + boolean needsResponse = !this.udp && (this.transmitReturnValue || this.transmitExceptions || !this.nonBlocking); byte responseID = 0; if (needsResponse) { synchronized (this) { @@ -159,17 +176,23 @@ class RemoteInvocationHandler implements InvocationHandler { // Pack other data into the high bits. byte responseData = responseID; if (this.transmitReturnValue) { - responseData |= RmiBridge.returnValMask; + responseData |= RmiBridge.returnValueMask; } if (this.transmitExceptions) { - responseData |= RmiBridge.returnExMask; + responseData |= RmiBridge.returnExceptionMask; } invokeMethod.responseData = responseData; } else { invokeMethod.responseData = 0; // A response data of 0 means to not respond. } - this.connection.send().TCP(invokeMethod).flush(); + if (this.udp) { + this.connection.send().UDP(invokeMethod).flush(); + } else if (this.udt) { + this.connection.send().UDT(invokeMethod).flush(); + } else { + this.connection.send().TCP(invokeMethod).flush(); + } if (logger.isDebugEnabled()) { String argString = ""; @@ -182,8 +205,9 @@ class RemoteInvocationHandler implements InvocationHandler { } this.lastResponseID = (byte)(invokeMethod.responseData & RmiBridge.responseIdMask); + this.invokeMethodPool.release(invokeMethodHolder); - if (this.nonBlocking) { + if (this.nonBlocking || this.udp) { Class returnType = method.getReturnType(); if (returnType.isPrimitive()) { if (returnType == int.class) { @@ -232,7 +256,6 @@ class RemoteInvocationHandler implements InvocationHandler { } private Object waitForResponse(byte responseID) { - long endTime = System.currentTimeMillis() + this.timeoutMillis; long remaining = this.timeoutMillis; @@ -311,4 +334,4 @@ class RemoteInvocationHandler implements InvocationHandler { } return true; } -} \ No newline at end of file +} diff --git a/Dorkbox-Network/src/dorkbox/network/rmi/RemoteObject.java b/Dorkbox-Network/src/dorkbox/network/rmi/RemoteObject.java index deb15714..5f8c1a57 100644 --- a/Dorkbox-Network/src/dorkbox/network/rmi/RemoteObject.java +++ b/Dorkbox-Network/src/dorkbox/network/rmi/RemoteObject.java @@ -9,74 +9,84 @@ import dorkbox.network.connection.Connection; * @author Nathan Sweet */ public interface RemoteObject { - /** Sets the milliseconds to wait for a method to return value. Default is 3000. */ - public void setResponseTimeout(int timeoutMillis); + /** Sets the milliseconds to wait for a method to return value. Default is 3000. */ + public void setResponseTimeout(int timeoutMillis); - /** - * Sets the blocking behavior when invoking a remote method. Default is false. - * - * @param nonBlocking If false, the invoking thread will wait for the remote method to return or timeout (default). If - * true, the invoking thread will not wait for a response. The method will return immediately and the return - * value should be ignored. If they are being transmitted, the return value or any thrown exception can later - * be retrieved with {@link #waitForLastResponse()} or {@link #waitForResponse(byte)}. The responses will be - * stored until retrieved, so each method call should have a matching retrieve. - */ - public void setNonBlocking(boolean nonBlocking); + /** + * Sets the blocking behavior when invoking a remote method. Default is false. + * + * @param nonBlocking If false, the invoking thread will wait for the remote method to return or timeout (default). If true, the + * invoking thread will not wait for a response. The method will return immediately and the return value should be ignored. If + * they are being transmitted, the return value or any thrown exception can later be retrieved with + * {@link #waitForLastResponse()} or {@link #waitForResponse(byte)}. The responses will be stored until retrieved, so each method + * call should have a matching retrieve. + */ + public void setNonBlocking(boolean nonBlocking); - /** - * Sets whether return values are sent back when invoking a remote method. Default is true. - * - * @param transmit If true, then the return value for non-blocking method invocations can be retrieved with - * {@link #waitForLastResponse()} or {@link #waitForResponse(byte)}. If false, then non-primitive return values - * for remote method invocations are not sent by the remote side of the connection and the response can never - * be retrieved. This can also be used to save bandwidth if you will not check the return value of a blocking - * remote invocation. Note that an exception could still be returned by {@link #waitForLastResponse()} or - * {@link #waitForResponse(byte)} if {@link #setTransmitExceptions(boolean)} is true. - */ - public void setTransmitReturnValue(boolean transmit); + /** + * Sets whether return values are sent back when invoking a remote method. Default is true. + * + * @param transmit If true, then the return value for non-blocking method invocations can be retrieved with + * {@link #waitForLastResponse()} or {@link #waitForResponse(byte)}. If false, then non-primitive return values for remote method + * invocations are not sent by the remote side of the connection and the response can never be retrieved. This can also be used + * to save bandwidth if you will not check the return value of a blocking remote invocation. Note that an exception could still + * be returned by {@link #waitForLastResponse()} or {@link #waitForResponse(byte)} if {@link #setTransmitExceptions(boolean)} is + * true. + */ + public void setTransmitReturnValue(boolean transmit); - /** - * Sets whether exceptions are sent back when invoking a remote method. Default is true. - * - * @param transmit If false, exceptions will be unhandled and rethrown as RuntimeExceptions inside the invoking - * thread. This is the legacy behavior. If true, behavior is dependent on whether - * {@link #setNonBlocking(boolean)}. If non-blocking is true, the exception will be serialized and sent back to - * the call site of the remotely invoked method, where it will be re-thrown. If non-blocking is false, an - * exception will not be thrown in the calling thread but instead can be retrieved with - * {@link #waitForLastResponse()} or {@link #waitForResponse(byte)}, similar to a return value. - */ - public void setTransmitExceptions(boolean transmit); + /** + * Sets whether exceptions are sent back when invoking a remote method. Default is true. + * + * @param transmit If false, exceptions will be unhandled and rethrown as RuntimeExceptions inside the invoking thread. This is the + * legacy behavior. If true, behavior is dependent on whether {@link #setNonBlocking(boolean)}. If non-blocking is true, the + * exception will be serialized and sent back to the call site of the remotely invoked method, where it will be re-thrown. If + * non-blocking is false, an exception will not be thrown in the calling thread but instead can be retrieved with + * {@link #waitForLastResponse()} or {@link #waitForResponse(byte)}, similar to a return value. + */ + public void setTransmitExceptions(boolean transmit); - /** - * If false, calls to {@link Object#toString()} will return "" instead of being invoking the remote method. - * Default is false. - */ - public void setRemoteToString(boolean remoteToString); + /** + * If true, UDP will be used to send the remote method invocation. UDP remote method invocations will never return a response and the + * invoking thread will not wait for a response. + */ + public void setUDP(boolean udp); - /** - * Waits for the response to the last method invocation to be received or the response timeout to be reached. Must not - * be called from the connection's update thread. - * - * @see RmiBridge#getRemoteObject(dorkbox.Connection.connection.interfaces.IConnection.Connection, int, Class...) - */ - public Object waitForLastResponse(); + /** + * If true, UDT will be used to send the remote method invocation. UDT remote method invocations will return a response and the + * invoking thread will wait for a response. + */ + public void setUDT(boolean udt); - /** Gets the ID of response for the last method invocation. */ - public byte getLastResponseID(); + /** + * If false, calls to {@link Object#toString()} will return "" instead of being invoking the remote method. Default is false. + */ + public void setRemoteToString(boolean remoteToString); - /** - * Waits for the specified method invocation response to be received or the response timeout to be reached. Must not - * be called from the connection's update thread. Response IDs use a six bit identifier, with one identifier reserved - * for "no response". This means that this method should be called to get the result for a non-blocking call before an - * additional 63 non-blocking calls are made, or risk undefined behavior due to identical IDs. - * - * @see RmiBridge#getRemoteObject(dorkbox.Connection.connection.interfaces.IConnection.Connection, int, Class...) - */ - public Object waitForResponse(byte responseID); + /** + * Waits for the response to the last method invocation to be received or the response timeout to be reached. Must not be called from + * the connection's update thread. + * + * @see RmiBridge#getRemoteObject(dorkbox.Connection.connection.interfaces.IConnection.Connection, int, Class...) + */ + public Object waitForLastResponse(); - /** Causes this RemoteObject to stop listening to the connection for method invocation response messages. */ - public void close(); + /** Gets the ID of response for the last method invocation. */ + public byte getLastResponseID(); - /** Returns the local connection for this remote object. */ - public Connection getConnection(); + /** + * Waits for the specified method invocation response to be received or the response timeout to be reached. Must not be called from the + * connection's update thread. Response IDs use a six bit identifier, with one identifier reserved for "no response". This means that + * this method should be called to get the result for a non-blocking call before an additional 63 non-blocking calls are made, or risk + * undefined behavior due to identical IDs. + * + * @see RmiBridge#getRemoteObject(dorkbox.Connection.connection.interfaces.IConnection.Connection, int, Class...) + */ + public Object waitForResponse(byte responseID); + + /** Causes this RemoteObject to stop listening to the connection for method invocation response messages. */ + public void close(); + + /** Returns the local connection for this remote object. */ + public Connection getConnection(); } diff --git a/Dorkbox-Network/src/dorkbox/network/rmi/RemoteObjectSerializer.java b/Dorkbox-Network/src/dorkbox/network/rmi/RemoteObjectSerializer.java index 9c2b37ca..1950264b 100644 --- a/Dorkbox-Network/src/dorkbox/network/rmi/RemoteObjectSerializer.java +++ b/Dorkbox-Network/src/dorkbox/network/rmi/RemoteObjectSerializer.java @@ -7,6 +7,7 @@ import com.esotericsoftware.kryo.io.Output; import com.sun.xml.internal.ws.encoding.soap.SerializationException; import dorkbox.network.connection.Connection; +import dorkbox.network.connection.EndPoint; /** * Serializes an object registered with the RmiBridge so the receiving side @@ -16,11 +17,16 @@ import dorkbox.network.connection.Connection; * @author Nathan Sweet */ public class RemoteObjectSerializer extends Serializer { + + private final RmiBridge rmi; + + public RemoteObjectSerializer(EndPoint endpoint) { + this.rmi = (RmiBridge) endpoint.rmi(); + } + @Override public void write(Kryo kryo, Output output, T object) { - @SuppressWarnings("unchecked") - Connection connection = (Connection) kryo.getContext().get(Connection.connection); - int id = RmiBridge.getRegisteredId(connection, object); + int id = this.rmi.getRegisteredId(object); if (id == Integer.MAX_VALUE) { throw new SerializationException("Object not found in an ObjectSpace: " + object); } @@ -33,6 +39,6 @@ public class RemoteObjectSerializer extends Serializer { public T read(Kryo kryo, Input input, Class type) { int objectID = input.readInt(true); Connection connection = (Connection) kryo.getContext().get(Connection.connection); - return (T) RmiBridge.getRemoteObject(connection, objectID, type); + return (T) this.rmi.getRemoteObject(connection, objectID, type); } } \ No newline at end of file diff --git a/Dorkbox-Network/src/dorkbox/network/rmi/Rmi.java b/Dorkbox-Network/src/dorkbox/network/rmi/Rmi.java new file mode 100644 index 00000000..05f59069 --- /dev/null +++ b/Dorkbox-Network/src/dorkbox/network/rmi/Rmi.java @@ -0,0 +1,22 @@ +package dorkbox.network.rmi; + + + +public interface Rmi { + /** + * Registers an object to allow the remote end of the RmiBridge connections to access it using the specified ID. + * + * @param objectID Must not be Integer.MAX_VALUE. + */ + public void register(int objectID, Object object); + + /** + * Removes an object. The remote end of the RmiBridge connection will no longer be able to access it. + */ + public void remove(int objectID); + + /** + * Removes an object. The remote end of the RmiBridge connection will no longer be able to access it. + */ + public void remove(Object object); +} diff --git a/Dorkbox-Network/src/dorkbox/network/rmi/RmiBridge.java b/Dorkbox-Network/src/dorkbox/network/rmi/RmiBridge.java index 507eb45a..511d2363 100644 --- a/Dorkbox-Network/src/dorkbox/network/rmi/RmiBridge.java +++ b/Dorkbox-Network/src/dorkbox/network/rmi/RmiBridge.java @@ -9,9 +9,10 @@ import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; -import java.util.Iterator; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import org.slf4j.Logger; @@ -30,9 +31,11 @@ import dorkbox.network.connection.ListenerRaw; import dorkbox.network.util.SerializationManager; import dorkbox.network.util.exceptions.NetException; import dorkbox.util.collections.ObjectIntMap; +import dorkbox.util.objectPool.ObjectPool; +import dorkbox.util.objectPool.ObjectPoolFactory; /** - * Allows methods on objects to be invoked remotely over TCP. Objects are + * Allows methods on objects to be invoked remotely over TCP, UDP, or UDT. Objects are * {@link #register(int, Object) registered} with an ID. The remote end of * connections that have been {@link #addConnection(Connection) added} are * allowed to {@link #getRemoteObject(Connection, int, Class) access} registered @@ -46,89 +49,79 @@ import dorkbox.util.collections.ObjectIntMap; * * @author Nathan Sweet , Nathan Robinson */ -public class RmiBridge { +public class RmiBridge implements Rmi { private static final String OBJECT_ID = "objectID"; - static CopyOnWriteArrayList instances = new CopyOnWriteArrayList(); - - private static final HashMap, CachedMethod[]> methodCache = new HashMap, CachedMethod[]>(); + static final int returnValueMask = 1 << 7; + static final int returnExceptionMask = 1 << 6; + static final int responseIdMask = 0xFF & ~returnValueMask & ~returnExceptionMask; - static final int returnValMask = 1 << 7; - static final int returnExMask = 1 << 6; - static final int responseIdMask = 0xff & ~returnValMask & ~returnExMask; + // the name of who created this RmiBridge + private final org.slf4j.Logger logger; - private static boolean asm = true; + private HashMap, CachedMethod[]> methodCache = new HashMap, CachedMethod[]>(); - // can be access by DIFFERENT threads. - volatile IntMap idToObject = new IntMap(); - volatile ObjectIntMap objectToID = new ObjectIntMap(); + private final boolean asm; - private CopyOnWriteArrayList connections = new CopyOnWriteArrayList(); + // can be accessed by DIFFERENT threads. + private ReentrantReadWriteLock objectLock = new ReentrantReadWriteLock(); + private IntMap idToObject = new IntMap(); + private ObjectIntMap objectToID = new ObjectIntMap(); private Executor executor; - // the name of who created this object space. - private final org.slf4j.Logger logger; + // 4096 concurrent method invocations max + private final ObjectPool invokeMethodPool = ObjectPoolFactory.create(new InvokeMethodPoolable(), 4096); private final ListenerRaw invokeListener = new ListenerRaw() { - @Override - public void received(final Connection connection, final InvokeMethod invokeMethod) { - boolean found = false; + @Override + public void received(final Connection connection, final InvokeMethod invokeMethod) { + ReadLock readLock = RmiBridge.this.objectLock.readLock(); + readLock.lock(); - Iterator iterator = RmiBridge.this.connections.iterator(); - while (iterator.hasNext()) { - Connection c = iterator.next(); - if (c == connection) { - found = true; - break; + final Object target = RmiBridge.this.idToObject.get(invokeMethod.objectID); + + readLock.unlock(); + + + if (target == null) { + Logger logger2 = RmiBridge.this.logger; + if (logger2.isWarnEnabled()) { + logger2.warn("Ignoring remote invocation request for unknown object ID: {}", invokeMethod.objectID); + } + + return; + } + + Executor executor2 = RmiBridge.this.executor; + if (executor2 == null) { + invoke(connection, + target, + invokeMethod); + } else { + executor2.execute(new Runnable() { + @Override + public void run() { + invoke(connection, + target, + invokeMethod); } - } - - // The InvokeMethod message is not for a connection in this ObjectSpace. - if (!found) { - return; - } - - final Object target = RmiBridge.this.idToObject.get(invokeMethod.objectID); - if (target == null) { - RmiBridge.this.logger.warn("Ignoring remote invocation request for unknown object ID: {}", invokeMethod.objectID); - return; - } - - Executor executor2 = RmiBridge.this.executor; - if (executor2 == null) { - invoke(connection, - target, - invokeMethod); - } else { - executor2.execute(new Runnable() { - @Override - public void run() { - invoke(connection, - target, - invokeMethod); - } - }); - } + }); } - - @Override - public void disconnected(Connection connection) { - removeConnection(connection); - } - }; + } + }; /** - * Creates an ObjectSpace with no connections. Connections must be + * Creates an RmiBridge with no connections. Connections must be * {@link #connectionConnected(Connection) added} to allow the remote end of * the connections to access objects in this ObjectSpace. - *

- * For safety, this should ONLY be called by {@link EndPoint#getRmiBridge() } */ - public RmiBridge(Logger logger) { + public RmiBridge(Logger logger, SerializationManager serializationManager) { this.logger = logger; - instances.addIfAbsent(this); + this.asm = serializationManager.getSingleInstanceUnsafe().getAsmEnabled(); + + registerClasses(serializationManager); } /** @@ -143,22 +136,12 @@ public class RmiBridge { this.executor = executor; } - /** If true, an attempt will be made to use ReflectASM for invoking methods. Default is true. */ - static public void setAsm(boolean asm) { - RmiBridge.asm = asm; - } - /** - * Registers an object to allow the remote end of the ObjectSpace's - * connections to access it using the specified ID. - *

- * If a connection is added to multiple ObjectSpaces, the same object ID - * should not be registered in more than one of those ObjectSpaces. + * Registers an object to allow the remote end of the RmiBridge connections to access it using the specified ID. * - * @param objectID - * Must not be Integer.MAX_VALUE. - * @see #getRemoteObject(Connection, int, Class...) + * @param objectID Must not be Integer.MAX_VALUE. */ + @Override public void register(int objectID, Object object) { if (objectID == Integer.MAX_VALUE) { throw new IllegalArgumentException("objectID cannot be Integer.MAX_VALUE."); @@ -166,9 +149,15 @@ public class RmiBridge { if (object == null) { throw new IllegalArgumentException("object cannot be null."); } + + WriteLock writeLock = RmiBridge.this.objectLock.writeLock(); + writeLock.lock(); + this.idToObject.put(objectID, object); this.objectToID.put(object, objectID); + writeLock.unlock(); + Logger logger2 = this.logger; if (logger2.isTraceEnabled()) { logger2.trace("Object registered with ObjectSpace as {}:{}", objectID, object); @@ -176,33 +165,20 @@ public class RmiBridge { } /** - * Causes this ObjectSpace to stop listening to the connections for method - * invocation messages. - */ - public void close() { - Iterator iterator = this.connections.iterator(); - while (iterator.hasNext()) { - Connection connection = iterator.next(); - connection.listeners().remove(this.invokeListener); - } - - instances.remove(this); - Logger logger2 = this.logger; - if (logger2.isTraceEnabled()) { - logger2.trace("Closed ObjectSpace."); - } - } - - /** - * Removes an object. The remote end of the ObjectSpace's connections will - * no longer be able to access it. + * Removes an object. The remote end of the RmiBridge connection will no longer be able to access it. */ + @Override public void remove(int objectID) { + WriteLock writeLock = RmiBridge.this.objectLock.writeLock(); + writeLock.lock(); + Object object = this.idToObject.remove(objectID); if (object != null) { this.objectToID.remove(object, 0); } + writeLock.unlock(); + Logger logger2 = this.logger; if (logger2.isTraceEnabled()) { logger2.trace("Object {} removed from ObjectSpace: {}", objectID, object); @@ -210,11 +186,15 @@ public class RmiBridge { } /** - * Removes an object. The remote end of the ObjectSpace's connections will - * no longer be able to access it. + * Removes an object. The remote end of the RmiBridge connection will no longer be able to access it. */ + @Override public void remove(Object object) { + WriteLock writeLock = RmiBridge.this.objectLock.writeLock(); + writeLock.lock(); + if (!this.idToObject.containsValue(object, true)) { + writeLock.unlock(); return; } @@ -222,52 +202,27 @@ public class RmiBridge { this.idToObject.remove(objectID); this.objectToID.remove(object, 0); + writeLock.unlock(); + Logger logger2 = this.logger; if (logger2.isTraceEnabled()) { logger2.trace("Object {} removed from ObjectSpace: {}", objectID, object); } } - /** - * Allows the remote end of the specified connection to access objects - * registered in this ObjectSpace. - */ - public void addConnection(Connection connection) { - if (connection == null) { - throw new IllegalArgumentException("connection cannot be null."); - } - - this.connections.addIfAbsent(connection); - connection.listeners().add(this.invokeListener); - - Logger logger2 = this.logger; - if (logger2.isTraceEnabled()) { - logger2.trace("Added connection to ObjectSpace: {}", connection); - } - } /** - * Removes the specified connection, it will no longer be able to access - * objects registered in this ObjectSpace. + * @return the invocation listener */ - public void removeConnection(Connection connection) { - if (connection == null) { - throw new IllegalArgumentException("connection cannot be null."); - } - - connection.listeners().remove(this.invokeListener); - this.connections.remove(connection); - - Logger logger2 = this.logger; - if (logger2.isTraceEnabled()) { - logger2.trace("Removed connection from ObjectSpace: {}", connection); - } + @SuppressWarnings("rawtypes") + public ListenerRaw getListener() { + return this.invokeListener; } /** * Invokes the method on the object and, if necessary, sends the result back * to the connection that made the invocation request. This method is - * invoked on the update thread of the {@link EndPoint} for this ObjectSpace + * invoked on the update thread of the {@link EndPoint} for this RmiBridge * and unless an {@link #setExecutor(Executor) executor} has been set. * * @param connection @@ -292,8 +247,8 @@ public class RmiBridge { byte responseData = invokeMethod.responseData; - boolean transmitReturnVal = (responseData & returnValMask) == returnValMask; - boolean transmitExceptions = (responseData & returnExMask) == returnExMask; + boolean transmitReturnVal = (responseData & returnValueMask) == returnValueMask; + boolean transmitExceptions = (responseData & returnExceptionMask) == returnExceptionMask; int responseID = responseData & responseIdMask; Object result = null; @@ -342,17 +297,6 @@ public class RmiBridge { // logger.error("{} sent data: {} with id ({})", connection, result, invokeMethod.responseID); } - /** - * Identical to {@link #getRemoteObject(C, int, Class...)} except returns - * the object cast to the specified interface type. The returned object - * still implements {@link RemoteObject}. - */ - static public T getRemoteObject(final C connection, int objectID, Class iface) { - @SuppressWarnings({"unchecked"}) - T remoteObject = (T) getRemoteObject(connection, objectID, new Class[] {iface}); - return remoteObject; - } - /** * Returns a proxy object that implements the specified interfaces. Methods * invoked on the proxy object will be invoked remotely on the object with @@ -377,7 +321,7 @@ public class RmiBridge { * * @see RemoteObject */ - public static RemoteObject getRemoteObject(Connection connection, int objectID, Class... ifaces) { + public RemoteObject getRemoteObject(Connection connection, int objectID, Class... ifaces) { if (connection == null) { throw new IllegalArgumentException("connection cannot be null."); } @@ -391,11 +335,11 @@ public class RmiBridge { return (RemoteObject) Proxy.newProxyInstance(RmiBridge.class.getClassLoader(), temp, - new RemoteInvocationHandler(connection, objectID)); + new RemoteInvocationHandler(this.invokeMethodPool, connection, objectID)); } - static CachedMethod[] getMethods(Kryo kryo, Class type) { - CachedMethod[] cachedMethods = methodCache.get(type); // Maybe should cache per Kryo instance? + public CachedMethod[] getMethods(Kryo kryo, Class type) { + CachedMethod[] cachedMethods = this.methodCache.get(type); // Maybe should cache per Kryo instance? if (cachedMethods != null) { return cachedMethods; } @@ -453,7 +397,7 @@ public class RmiBridge { }); Object methodAccess = null; - if (asm && !Util.isAndroid && Modifier.isPublic(type.getModifiers())) { + if (this.asm && !Util.isAndroid && Modifier.isPublic(type.getModifiers())) { methodAccess = MethodAccess.get(type); } @@ -493,73 +437,47 @@ public class RmiBridge { cachedMethods[i] = cachedMethod; } - methodCache.put(type, cachedMethods); + this.methodCache.put(type, cachedMethods); return cachedMethods; } /** - * Returns the first object registered with the specified ID in any of the - * ObjectSpaces the specified connection belongs to. + * Returns the object registered with the specified ID. */ - static Object getRegisteredObject(Connection connection, int objectID) { - CopyOnWriteArrayList instances = RmiBridge.instances; - for (RmiBridge objectSpace : instances) { - // Check if the connection is in this ObjectSpace. - Iterator iterator = objectSpace.connections.iterator(); - while (iterator.hasNext()) { - Connection c = iterator.next(); - if (c != connection) { - continue; - } + Object getRegisteredObject(int objectID) { + ReadLock readLock = this.objectLock.readLock(); + readLock.lock(); - // Find an object with the objectID. - Object object = objectSpace.idToObject.get(objectID); - if (object != null) { - return object; - } - } - } + // Find an object with the objectID. + Object object = this.idToObject.get(objectID); + readLock.unlock(); - return null; + return object; } /** - * Returns the first ID registered for the specified object with any of the - * ObjectSpaces the specified connection belongs to, or Integer.MAX_VALUE - * if not found. + * Returns the ID registered for the specified object, or Integer.MAX_VALUE if not found. */ - public static int getRegisteredId(Connection connection, Object object) { - CopyOnWriteArrayList instances = RmiBridge.instances; - for (RmiBridge objectSpace : instances) { - // Check if the connection is in this ObjectSpace. - Iterator iterator = objectSpace.connections.iterator(); - while (iterator.hasNext()) { - Connection c = iterator.next(); - if (c != connection) { - continue; - } + public int getRegisteredId(Object object) { + // Find an ID with the object. + ReadLock readLock = this.objectLock.readLock(); - // Find an ID with the object. - int id = objectSpace.objectToID.get(object, Integer.MAX_VALUE); - if (id != Integer.MAX_VALUE) { - return id; - } - } - } + readLock.lock(); + int id = this.objectToID.get(object, Integer.MAX_VALUE); + readLock.unlock(); - return Integer.MAX_VALUE; + return id; } /** - * Registers the classes needed to use ObjectSpaces. This should be called - * before any connections are opened. + * Registers the classes needed to use RMI. This should be called before any connections are opened. */ - public static void registerClasses(final SerializationManager smanager) { - smanager.registerForRmiClasses(new RmiRegisterClassesCallback() { + private void registerClasses(final SerializationManager manager) { + manager.registerForRmiClasses(new RmiRegisterClassesCallback() { @Override public void registerForClasses(Kryo kryo) { kryo.register(Object[].class); - kryo.register(InvokeMethod.class); + kryo.register(InvokeMethod.class, new InvokeMethodSerializer(RmiBridge.this)); FieldSerializer resultSerializer = new FieldSerializer(kryo, InvokeMethodResult.class) { @Override @@ -590,7 +508,7 @@ public class RmiBridge { public Object read(Kryo kryo, Input input, Class type) { int objectID = input.readInt(true); Connection connection = (Connection) kryo.getContext().get(Connection.connection); - Object object = getRegisteredObject(connection, objectID); + Object object = getRegisteredObject(objectID); if (object == null) { final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RmiBridge.class); logger.warn("Unknown object ID {} for connection: {}", objectID, connection); diff --git a/Dorkbox-Network/src/dorkbox/network/util/KryoSerializationManager.java b/Dorkbox-Network/src/dorkbox/network/util/KryoSerializationManager.java index e44e9b36..6b52f5e0 100644 --- a/Dorkbox-Network/src/dorkbox/network/util/KryoSerializationManager.java +++ b/Dorkbox-Network/src/dorkbox/network/util/KryoSerializationManager.java @@ -659,6 +659,7 @@ public class KryoSerializationManager implements SerializationManager { } + @SuppressWarnings("unused") private static void compress(ByteBuf inputBuffer, ByteBuf outputBuffer, int length, Deflater compress) { byte[] in = new byte[inputBuffer.readableBytes()];