diff --git a/Dorkbox-Network/src/dorkbox/network/Client.java b/Dorkbox-Network/src/dorkbox/network/Client.java index d6977de0..05b18448 100644 --- a/Dorkbox-Network/src/dorkbox/network/Client.java +++ b/Dorkbox-Network/src/dorkbox/network/Client.java @@ -31,7 +31,6 @@ import dorkbox.network.util.udt.UdtEndpointProxy; import dorkbox.util.NamedThreadFactory; import dorkbox.util.OS; import dorkbox.util.exceptions.InitializationException; -import dorkbox.util.exceptions.NetException; import dorkbox.util.exceptions.SecurityException; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.PooledByteBufAllocator; @@ -60,7 +59,7 @@ import java.net.InetSocketAddress; */ @SuppressWarnings("unused") public -class Client extends EndPointClient implements Connection { +class Client extends EndPointClient implements Connection { /** * Starts a LOCAL only client, with the default local channel name and serialization scheme @@ -134,6 +133,10 @@ class Client extends EndPointClient implements Connection { throw new IllegalArgumentException("You must define what host you want to connect to."); } + if (options.tcpPort < 0 && options.udpPort < 0 && options.udtPort < 0) { + throw new IllegalArgumentException("You must define what port you want to connect to."); + } + if (options.tcpPort > 0) { Bootstrap tcpBootstrap = new Bootstrap(); this.bootstraps.add(new BootstrapWrapper("TCP", options.tcpPort, tcpBootstrap)); @@ -251,17 +254,21 @@ class Client extends EndPointClient implements Connection { /** * Allows the client to reconnect to the last connected server + * + * @throws InterruptedException if the client is unable to reconnect in the previously requested connection-timeout */ public - void reconnect() { + void reconnect() throws IOException { reconnect(this.connectionTimeout); } /** * Allows the client to reconnect to the last connected server + * + * @throws InterruptedException if the client is unable to reconnect in the requested time */ public - void reconnect(int connectionTimeout) { + void reconnect(int connectionTimeout) throws IOException { // close out all old connections close(); @@ -271,9 +278,11 @@ class Client extends EndPointClient implements Connection { /** * will attempt to connect to the server, with a 30 second timeout. + * + * @throws IOException if the client is unable to connect in 30 seconds */ public - void connect() { + void connect() throws IOException { connect(30000); } @@ -283,9 +292,10 @@ class Client extends EndPointClient implements Connection { * will BLOCK until completed * * @param connectionTimeout wait for x milliseconds. 0 will wait indefinitely + * @throws IOException if the client is unable to connect in the requested time */ public - void connect(int connectionTimeout) { + void connect(int connectionTimeout) throws IOException { this.connectionTimeout = connectionTimeout; // make sure we are not trying to connect during a close or stop event. @@ -303,22 +313,24 @@ class Client extends EndPointClient implements Connection { try { this.registrationLock.wait(connectionTimeout); } catch (InterruptedException e) { - this.logger.error("Registration thread interrupted!"); + throw new IOException("Unable to complete registration within '" + connectionTimeout + "' milliseconds", e); } } + + connection = this.connectionManager.getConnection0(); } @Override public boolean hasRemoteKeyChanged() { - return this.connectionManager.getConnection0().hasRemoteKeyChanged(); + return this.connection.hasRemoteKeyChanged(); } @Override public String getRemoteHost() { - return this.connectionManager.getConnection0().getRemoteHost(); + return this.connection.getRemoteHost(); } @Override @@ -330,25 +342,25 @@ class Client extends EndPointClient implements Connection { @Override public int id() { - return this.connectionManager.getConnection0().id(); + return this.connection.id(); } @Override public String idAsHex() { - return this.connectionManager.getConnection0().idAsHex(); + return this.connection.idAsHex(); } @Override public boolean hasUDP() { - return this.connectionManager.getConnection0().hasUDP(); + return this.connection.hasUDP(); } @Override public boolean hasUDT() { - return this.connectionManager.getConnection0().hasUDT(); + return this.connection.hasUDT(); } /** @@ -357,8 +369,7 @@ class Client extends EndPointClient implements Connection { @Override public IdleBridge sendOnIdle(IdleSender sender) { - return this.connectionManager.getConnection0() - .sendOnIdle(sender); + return this.connection.sendOnIdle(sender); } /** @@ -367,33 +378,13 @@ class Client extends EndPointClient implements Connection { @Override public IdleBridge sendOnIdle(Object message) { - return this.connectionManager.getConnection0() - .sendOnIdle(message); + return this.connection.sendOnIdle(message); } - /** - * Fetches the connection used by the client. - *

- * Make sure that you only call this after the client connects! - *

- * This is preferred to {@link EndPoint#getConnections()} getConnections()}, as it properly does some error checking - */ - public - Connection getConnection() { - return this.connectionManager.getConnection0(); - } - - /** - * Closes all connections ONLY (keeps the server/client running). - *

- * This is used, for example, when reconnecting to a server. - */ @Override public - void close() { - synchronized (this.registrationLock) { - this.registrationLock.notify(); - } + void closeAsap() { +//TODO } /** @@ -421,11 +412,11 @@ class Client extends EndPointClient implements Connection { */ @Override public - Iface createRemoteObject(final Class remoteImplementationClass) throws NetException { - return this.connectionManager.getConnection0().createRemoteObject(remoteImplementationClass); + Iface createRemoteObject(final Class remoteImplementationClass) throws IOException { + return this.connectionManager.getConnection0() + .createRemoteObject(remoteImplementationClass); } - /** * Returns a new proxy object implements the specified interface. Methods invoked on the proxy object will be * invoked remotely on the object with the specified ID in the ObjectSpace for the current connection. @@ -451,7 +442,33 @@ class Client extends EndPointClient implements Connection { */ @Override public - Iface getRemoteObject(final int objectId) throws NetException { - return this.connectionManager.getConnection0().getRemoteObject(objectId); + Iface getRemoteObject(final int objectId) throws IOException { + return this.connectionManager.getConnection0() + .getRemoteObject(objectId); + } + + /** + * Fetches the connection used by the client. + *

+ * Make sure that you only call this after the client connects! + *

+ * This is preferred to {@link EndPoint#getConnections()} getConnections()}, as it properly does some error checking + */ + public + C getConnection() { + return this.connection; + } + + /** + * Closes all connections ONLY (keeps the server/client running). + *

+ * This is used, for example, when reconnecting to a server. + */ + @Override + public + void close() { + synchronized (this.registrationLock) { + this.registrationLock.notify(); + } } } diff --git a/Dorkbox-Network/src/dorkbox/network/Server.java b/Dorkbox-Network/src/dorkbox/network/Server.java index 0f5a8107..1a3bbebf 100644 --- a/Dorkbox-Network/src/dorkbox/network/Server.java +++ b/Dorkbox-Network/src/dorkbox/network/Server.java @@ -15,6 +15,7 @@ */ package dorkbox.network; +import dorkbox.network.connection.Connection; import dorkbox.network.connection.EndPointServer; import dorkbox.network.connection.registration.local.RegistrationLocalHandlerServer; import dorkbox.network.connection.registration.remote.RegistrationRemoteHandlerServerTCP; @@ -56,7 +57,7 @@ import java.io.IOException; * To put it bluntly, ONLY have the server do work inside of a listener! */ public -class Server extends EndPointServer { +class Server extends EndPointServer { /** * The maximum queue length for incoming connection indications (a request to connect). If a connection indication arrives when @@ -323,6 +324,7 @@ class Server extends EndPointServer { * you want to continue running code after this method invocation, bind should be called in a separate, * non-daemon thread - or with false as the parameter. */ + @SuppressWarnings("AutoBoxing") public void bind(boolean blockUntilTerminate) { // make sure we are not trying to connect during a close or stop event. diff --git a/Dorkbox-Network/src/dorkbox/network/connection/Bridge.java b/Dorkbox-Network/src/dorkbox/network/connection/Bridge.java index c106f34a..97b1d7de 100644 --- a/Dorkbox-Network/src/dorkbox/network/connection/Bridge.java +++ b/Dorkbox-Network/src/dorkbox/network/connection/Bridge.java @@ -18,12 +18,12 @@ package dorkbox.network.connection; import dorkbox.network.connection.wrapper.ChannelWrapper; public -class Bridge { +class Bridge { final ChannelWrapper channelWrapper; - final ISessionManager sessionManager; + final ISessionManager sessionManager; - Bridge(ChannelWrapper channelWrapper, ISessionManager sessionManager) { + Bridge(ChannelWrapper channelWrapper, ISessionManager sessionManager) { this.channelWrapper = channelWrapper; this.sessionManager = sessionManager; } diff --git a/Dorkbox-Network/src/dorkbox/network/connection/Connection.java b/Dorkbox-Network/src/dorkbox/network/connection/Connection.java index 67e49807..c3579343 100644 --- a/Dorkbox-Network/src/dorkbox/network/connection/Connection.java +++ b/Dorkbox-Network/src/dorkbox/network/connection/Connection.java @@ -20,7 +20,8 @@ import dorkbox.network.connection.idle.IdleBridge; import dorkbox.network.connection.idle.IdleSender; import dorkbox.network.rmi.RemoteObject; import dorkbox.network.rmi.TimeoutException; -import dorkbox.util.exceptions.NetException; + +import java.io.IOException; @SuppressWarnings("unused") public @@ -40,7 +41,7 @@ interface Connection { /** * @return the endpoint associated with this connection */ - EndPoint getEndPoint(); + EndPoint getEndPoint(); /** @@ -89,6 +90,12 @@ interface Connection { */ void close(); + /** + * Marks the connection to be closed as soon as possible. This is evaluated when the current + * thread execution returns to the network stack. + */ + void closeAsap(); + /** * Returns a new proxy object implements the specified interface. Methods invoked on the proxy object will be * invoked remotely on the object with the specified ID in the ObjectSpace for the current connection. @@ -112,7 +119,7 @@ interface Connection { * * @see RemoteObject */ - Iface createRemoteObject(final Class remoteImplementationClass) throws NetException; + Iface createRemoteObject(final Class remoteImplementationClass) throws IOException; /** @@ -138,5 +145,7 @@ interface Connection { * * @see RemoteObject */ - Iface getRemoteObject(final int objectId) throws NetException; + Iface getRemoteObject(final int objectId) throws IOException; + + } diff --git a/Dorkbox-Network/src/dorkbox/network/connection/ConnectionImpl.java b/Dorkbox-Network/src/dorkbox/network/connection/ConnectionImpl.java index 4804d86a..b313ec13 100644 --- a/Dorkbox-Network/src/dorkbox/network/connection/ConnectionImpl.java +++ b/Dorkbox-Network/src/dorkbox/network/connection/ConnectionImpl.java @@ -25,11 +25,10 @@ import dorkbox.network.connection.ping.PingTuple; import dorkbox.network.connection.wrapper.ChannelNetworkWrapper; import dorkbox.network.connection.wrapper.ChannelNull; import dorkbox.network.connection.wrapper.ChannelWrapper; +import dorkbox.network.rmi.RMI; import dorkbox.network.rmi.RemoteObject; -import dorkbox.network.rmi.RemoteProxy; import dorkbox.network.rmi.RmiBridge; import dorkbox.network.rmi.RmiRegistration; -import dorkbox.util.exceptions.NetException; import io.netty.channel.Channel; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; @@ -72,19 +71,22 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection, private final Object messageInProgressLock = new Object(); private final AtomicBoolean messageInProgress = new AtomicBoolean(false); - private ISessionManager sessionManager; + private ISessionManager sessionManager; private ChannelWrapper channelWrapper; private volatile PingFuture pingFuture = null; // used to store connection local listeners (instead of global listeners). Only possible on the server. - private volatile ConnectionManager localListenerManager; + private volatile ConnectionManager localListenerManager; // while on the CLIENT, if the SERVER's ecc key has changed, the client will abort and show an error. private boolean remoteKeyChanged; + private final EndPoint endPoint; - private final EndPoint endPoint; + // when true, the connection will be closed (either as RMI or as 'normal' listener execution) when the thread execution returns control + // back to the network stack + private boolean closeAsap = false; private volatile ObjectRegistrationLatch objectRegistrationLatch; private final Object remoteObjectLock = new Object(); @@ -442,6 +444,12 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection, this.messageInProgressLock.notifyAll(); } } + + // in some cases, we want to close the current connection -- and given the way the system is designed, we cannot always close it before + // we return. This will let us close the connection when our business logic is finished. + if (closeAsap) { + close(); + } } @Override @@ -551,6 +559,16 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection, } } + /** + * Marks the connection to be closed as soon as possible. This is evaluated when the current + * thread execution returns to the network stack. + */ + @Override + public final + void closeAsap() { + closeAsap = true; + } + @Override public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception { @@ -611,7 +629,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection, // is empty, we can remove it from this connection. synchronized (this) { if (this.localListenerManager == null) { - this.localListenerManager = ((EndPointServer) this.endPoint).addListenerManager(this); + this.localListenerManager = ((EndPointServer) this.endPoint).addListenerManager(this); } this.localListenerManager.add(listener); } @@ -654,7 +672,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection, this.localListenerManager.remove(listener); if (!this.localListenerManager.hasListeners()) { - ((EndPointServer) this.endPoint).removeListenerManager(this); + ((EndPointServer) this.endPoint).removeListenerManager(this); } } } @@ -687,7 +705,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection, this.localListenerManager.removeAll(); this.localListenerManager = null; - ((EndPointServer) this.endPoint).removeListenerManager(this); + ((EndPointServer) this.endPoint).removeListenerManager(this); } } } @@ -721,7 +739,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection, if (!this.localListenerManager.hasListeners()) { this.localListenerManager = null; - ((EndPointServer) this.endPoint).removeListenerManager(this); + ((EndPointServer) this.endPoint).removeListenerManager(this); } } } @@ -780,7 +798,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection, @SuppressWarnings({"UnnecessaryLocalVariable", "unchecked"}) @Override public final - Iface createRemoteObject(final Class remoteImplementationClass) throws NetException { + Iface createRemoteObject(final Class remoteImplementationClass) throws IOException { // only one register can happen at a time synchronized (remoteObjectLock) { objectRegistrationLatch = new ObjectRegistrationLatch(); @@ -793,12 +811,12 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection, if (!objectRegistrationLatch.latch.await(2, TimeUnit.SECONDS)) { final String errorMessage = "Timed out getting registration ID for: " + remoteImplementationClass; logger.error(errorMessage); - throw new NetException(errorMessage); + throw new IOException(errorMessage); } } catch (InterruptedException e) { final String errorMessage = "Error getting registration ID for: " + remoteImplementationClass; logger.error(errorMessage, e); - throw new NetException(errorMessage, e); + throw new IOException(errorMessage, e); } // local var to prevent double hit on volatile field @@ -806,7 +824,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection, if (latch.hasError) { final String errorMessage = "Error getting registration ID for: " + remoteImplementationClass; logger.error(errorMessage); - throw new NetException(errorMessage); + throw new IOException(errorMessage); } return (Iface) latch.remoteObject; @@ -816,7 +834,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection, @SuppressWarnings({"UnnecessaryLocalVariable", "unchecked"}) @Override public final - Iface getRemoteObject(final int objectId) throws NetException { + Iface getRemoteObject(final int objectId) throws IOException { // only one register can happen at a time synchronized (remoteObjectLock) { objectRegistrationLatch = new ObjectRegistrationLatch(); @@ -829,12 +847,12 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection, if (!objectRegistrationLatch.latch.await(2, TimeUnit.SECONDS)) { final String errorMessage = "Timed out getting registration for ID: " + objectId; logger.error(errorMessage); - throw new NetException(errorMessage); + throw new IOException(errorMessage); } } catch (InterruptedException e) { final String errorMessage = "Error getting registration for ID: " + objectId; logger.error(errorMessage, e); - throw new NetException(errorMessage, e); + throw new IOException(errorMessage, e); } // local var to prevent double hit on volatile field @@ -842,7 +860,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection, if (latch.hasError) { final String errorMessage = "Error getting registration for ID: " + objectId; logger.error(errorMessage); - throw new NetException(errorMessage); + throw new IOException(errorMessage); } return (Iface) latch.remoteObject; @@ -883,7 +901,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection, if (annotations != null) { for (Annotation annotation : annotations) { - if (annotation.annotationType().equals(RemoteProxy.class)) { + if (annotation.annotationType().equals(RMI.class)) { boolean prev = field.isAccessible(); field.setAccessible(true); final Object o = field.get(remoteClassObject.object); diff --git a/Dorkbox-Network/src/dorkbox/network/connection/ConnectionManager.java b/Dorkbox-Network/src/dorkbox/network/connection/ConnectionManager.java index 4c2404af..8bf5d07a 100644 --- a/Dorkbox-Network/src/dorkbox/network/connection/ConnectionManager.java +++ b/Dorkbox-Network/src/dorkbox/network/connection/ConnectionManager.java @@ -17,10 +17,10 @@ package dorkbox.network.connection; import dorkbox.network.rmi.RmiMessages; import dorkbox.network.util.ConcurrentHashMapFactory; -import dorkbox.util.exceptions.NetException; import dorkbox.util.ClassHelper; import org.slf4j.Logger; +import java.io.IOException; import java.lang.reflect.Type; import java.util.*; import java.util.Map.Entry; @@ -29,14 +29,14 @@ import java.util.concurrent.CopyOnWriteArrayList; //note that we specifically DO NOT implement equals/hashCode, because we cannot create two separate // objects that are somehow equal to each other. public -class ConnectionManager implements ListenerBridge, ISessionManager { +class ConnectionManager implements ListenerBridge, ISessionManager { public static Listener unRegisteredType_Listener = null; // these are final, because the REFERENCE to these will never change. They ARE NOT immutable objects (meaning their content can change) private final ConcurrentHashMapFactory>> listeners; - private final ConcurrentHashMapFactory localManagers; - private final CopyOnWriteArrayList connections = new CopyOnWriteArrayList(); + private final ConcurrentHashMapFactory> localManagers; + private final CopyOnWriteArrayList connections = new CopyOnWriteArrayList(); /** * Used by the listener subsystem to determine types. @@ -61,13 +61,13 @@ class ConnectionManager implements ListenerBridge, ISessionManager { } }; - this.localManagers = new ConcurrentHashMapFactory() { + this.localManagers = new ConcurrentHashMapFactory>() { private static final long serialVersionUID = 1L; @Override public - ConnectionManager createNewObject(Object... args) { - return new ConnectionManager(loggerName + "-" + args[0] + " Specific", ConnectionManager.this.baseClass); + ConnectionManager createNewObject(Object... args) { + return new ConnectionManager(loggerName + "-" + args[0] + " Specific", ConnectionManager.this.baseClass); } }; } @@ -283,7 +283,7 @@ class ConnectionManager implements ListenerBridge, ISessionManager { // now have to account for additional connection listener managers (non-global). - ConnectionManager localManager = this.localManagers.get(connection); + ConnectionManager localManager = this.localManagers.get(connection); if (localManager != null) { // if we found a listener during THIS method call, we need to let the NEXT method call know, // so it doesn't spit out error for not handling a message (since that message MIGHT have @@ -328,7 +328,11 @@ class ConnectionManager implements ListenerBridge, ISessionManager { return; } - listener.idle(connection); + try { + listener.idle(connection); + } catch (IOException e) { + logger.error("Unable to notify listener on idle.", e); + } } connection.send() .flush(); @@ -336,7 +340,7 @@ class ConnectionManager implements ListenerBridge, ISessionManager { } // now have to account for additional (local) listener managers. - ConnectionManager localManager = this.localManagers.get(connection); + ConnectionManager localManager = this.localManagers.get(connection); if (localManager != null) { localManager.notifyOnIdle(connection); } @@ -349,7 +353,7 @@ class ConnectionManager implements ListenerBridge, ISessionManager { */ @Override public - void connectionConnected(Connection connection) { + void connectionConnected(C connection) { // create a new connection! this.connections.add(connection); @@ -371,7 +375,7 @@ class ConnectionManager implements ListenerBridge, ISessionManager { } // now have to account for additional (local) listener managers. - ConnectionManager localManager = this.localManagers.get(connection); + ConnectionManager localManager = this.localManagers.get(connection); if (localManager != null) { localManager.connectionConnected(connection); } @@ -387,7 +391,7 @@ class ConnectionManager implements ListenerBridge, ISessionManager { */ @Override public - void connectionDisconnected(Connection connection) { + void connectionDisconnected(C connection) { Set>>> entrySet = this.listeners.entrySet(); CopyOnWriteArrayList> list; for (Entry>> entry : entrySet) { @@ -404,7 +408,7 @@ class ConnectionManager implements ListenerBridge, ISessionManager { } // now have to account for additional (local) listener managers. - ConnectionManager localManager = this.localManagers.get(connection); + ConnectionManager localManager = this.localManagers.get(connection); if (localManager != null) { localManager.connectionDisconnected(connection); @@ -442,7 +446,7 @@ class ConnectionManager implements ListenerBridge, ISessionManager { } // now have to account for additional (local) listener managers. - ConnectionManager localManager = this.localManagers.get(connection); + ConnectionManager localManager = this.localManagers.get(connection); if (localManager != null) { localManager.connectionError(connection, throwable); } @@ -455,20 +459,20 @@ class ConnectionManager implements ListenerBridge, ISessionManager { */ @Override public - List getConnections() { + List getConnections() { return Collections.unmodifiableList(this.connections); } final - ConnectionManager addListenerManager(Connection connection) { + ConnectionManager addListenerManager(Connection connection) { // when we are a server, NORMALLY listeners are added at the GLOBAL level (meaning, I add one listener, and ALL connections // are notified of that listener. // it is POSSIBLE to add a connection-specfic listener (via connection.addListener), meaning that ONLY // that listener is notified on that event (ie, admin type listeners) - ConnectionManager lm = this.localManagers.getOrCreate(connection, connection.toString()); + ConnectionManager lm = this.localManagers.getOrCreate(connection, connection.toString()); Logger logger2 = this.logger; if (logger2.isDebugEnabled()) { @@ -490,7 +494,7 @@ class ConnectionManager implements ListenerBridge, ISessionManager { * @return Returns a FAST list of active connections. */ public final - Collection getConnections0() { + Collection getConnections0() { return this.connections; } @@ -500,14 +504,14 @@ class ConnectionManager implements ListenerBridge, ISessionManager { * @return Returns a FAST first connection (for client!). */ public final - Connection getConnection0() { + C getConnection0() throws IOException { if (this.connections.iterator() .hasNext()) { return this.connections.iterator() .next(); } else { - throw new NetException("Not connected to a remote computer. Unable to continue!"); + throw new IOException("Not connected to a remote computer. Unable to continue!"); } } @@ -540,7 +544,7 @@ class ConnectionManager implements ListenerBridge, ISessionManager { final void closeConnections() { // close the sessions - Iterator iterator = this.connections.iterator(); + Iterator iterator = this.connections.iterator(); //noinspection WhileLoopReplaceableByForEach while (iterator.hasNext()) { Connection connection = iterator.next(); diff --git a/Dorkbox-Network/src/dorkbox/network/connection/EndPoint.java b/Dorkbox-Network/src/dorkbox/network/connection/EndPoint.java index 98e1688b..b130d37a 100644 --- a/Dorkbox-Network/src/dorkbox/network/connection/EndPoint.java +++ b/Dorkbox-Network/src/dorkbox/network/connection/EndPoint.java @@ -62,7 +62,7 @@ import java.util.concurrent.atomic.AtomicBoolean; * represents the base of a client/server end point */ public abstract -class EndPoint { +class EndPoint { // If TCP and UDP both fill the pipe, THERE WILL BE FRAGMENTATION and dropped UDP packets! // it results in severe UDP packet loss and contention. // @@ -122,7 +122,7 @@ class EndPoint { protected final org.slf4j.Logger logger; protected final Class type; - protected final ConnectionManager connectionManager; + protected final ConnectionManager connectionManager; protected final CryptoSerializationManager serializationManager; protected final RegistrationWrapper registrationWrapper; @@ -137,9 +137,6 @@ class EndPoint { private final Executor rmiExecutor; private final boolean rmiEnabled; - // When using RMI, we want to keep track of what our current thread execution connection is. This is because we store state info in the - // connection object. This is the only way to retrieve this info "out-of-band" - private final ThreadLocal currentConnection = new ThreadLocal(); // the eventLoop groups are used to track and manage the event loops for startup/shutdown private final List eventLoopGroups = new ArrayList(8); @@ -193,7 +190,6 @@ class EndPoint { // setup our RMI serialization managers. Can only be called once serializationManager.initRmiSerialization(); } - rmiExecutor = options.rmiExecutor; @@ -278,7 +274,7 @@ class EndPoint { // we don't care about un-instantiated/constructed members, since the class type is the only interest. - this.connectionManager = new ConnectionManager(type.getSimpleName(), connection0(null).getClass()); + this.connectionManager = new ConnectionManager(type.getSimpleName(), connection0(null).getClass()); // add the ping listener (internal use only!) this.connectionManager.add(new PingSystemListener()); @@ -291,6 +287,8 @@ class EndPoint { else { this.globalRmiBridge = null; } + + serializationManager.finishInit(); } public @@ -314,8 +312,8 @@ class EndPoint { */ @SuppressWarnings("unchecked") public - T getPropertyStore() { - return (T) this.propertyStore; + S getPropertyStore() { + return (S) this.propertyStore; } /** @@ -395,7 +393,7 @@ class EndPoint { * @return a new network connection */ protected - ConnectionImpl newConnection(final Logger logger, final EndPoint endPoint, final RmiBridge rmiBridge) { + ConnectionImpl newConnection(final Logger logger, final EndPoint endPoint, final RmiBridge rmiBridge) { return new ConnectionImpl(logger, endPoint, rmiBridge); } @@ -437,7 +435,7 @@ class EndPoint { metaChannel.connection = connection; // now initialize the connection channels with whatever extra info they might need. - connection.init(new Bridge(wrapper, this.connectionManager)); + connection.init(new Bridge(wrapper, this.connectionManager)); if (rmiBridge != null) { // notify our remote object space that it is able to receive method calls. @@ -461,13 +459,14 @@ class EndPoint { *

* Only the CLIENT injects in front of this) */ + @SuppressWarnings("unchecked") void connectionConnected0(ConnectionImpl connection) { this.isConnected.set(true); // prep the channel wrapper connection.prep(); - this.connectionManager.connectionConnected(connection); + this.connectionManager.connectionConnected((C) connection); } /** @@ -482,7 +481,7 @@ class EndPoint { * Returns a non-modifiable list of active connections */ public - List getConnections() { + List getConnections() { return this.connectionManager.getConnections(); } @@ -491,8 +490,8 @@ class EndPoint { */ @SuppressWarnings("unchecked") public - Collection getConnectionsAs() { - return (Collection) this.connectionManager.getConnections(); + Collection getConnectionsAs() { + return this.connectionManager.getConnections(); } /** @@ -505,7 +504,7 @@ class EndPoint { * Registers a tool with the server, to be used by other services. */ public - void registerTool(T toolClass) { + void registerTool(Tool toolClass) { if (toolClass == null) { throw new IllegalArgumentException("Tool must not be null! Unable to add tool"); } @@ -520,13 +519,13 @@ class EndPoint { * Only get the tools in the ModuleStart (ie: load) methods. If done in the constructor, the tool might not be available yet */ public - T getTool(Class toolClass) { + Tool getTool(Class toolClass) { if (toolClass == null) { throw new IllegalArgumentException("Tool must not be null! Unable to add tool"); } @SuppressWarnings("unchecked") - T tool = (T) this.toolMap.get(toolClass); + Tool tool = (Tool) this.toolMap.get(toolClass); return tool; } @@ -734,6 +733,7 @@ class EndPoint { return result; } + @SuppressWarnings("rawtypes") @Override public boolean equals(Object obj) { @@ -788,19 +788,4 @@ class EndPoint { globalRmiBridge.register(globalObjectId, globalObject); return globalObjectId; } - - /** - * When using RMI, we want to keep track of what our current thread execution connection is. - *

- * This is because we store state info in the connection object. This is the only way to retrieve this info "out-of-band" - */ - public - Connection getCurrentConnection() { - return this.currentConnection.get(); - } - - public - void setCurrentConnection(final Connection currentConnection) { - this.currentConnection.set(currentConnection); - } } diff --git a/Dorkbox-Network/src/dorkbox/network/connection/EndPointClient.java b/Dorkbox-Network/src/dorkbox/network/connection/EndPointClient.java index a4ffd850..4946a56d 100644 --- a/Dorkbox-Network/src/dorkbox/network/connection/EndPointClient.java +++ b/Dorkbox-Network/src/dorkbox/network/connection/EndPointClient.java @@ -34,7 +34,9 @@ import java.util.concurrent.atomic.AtomicInteger; * This serves the purpose of making sure that specific methods are not available to the end user. */ public -class EndPointClient extends EndPoint implements Runnable { +class EndPointClient extends EndPoint implements Runnable { + + protected C connection; protected final Object registrationLock = new Object(); protected final AtomicInteger connectingBootstrap = new AtomicInteger(0); @@ -152,8 +154,7 @@ class EndPointClient extends EndPoint implements Runnable { ConnectionBridge send() { ConnectionBridgeFlushAlways connectionBridgeFlushAlways2 = this.connectionBridgeFlushAlways; if (connectionBridgeFlushAlways2 == null) { - ConnectionBridge clientBridge = this.connectionManager.getConnection0() - .send(); + ConnectionBridge clientBridge = this.connection.send(); this.connectionBridgeFlushAlways = new ConnectionBridgeFlushAlways(clientBridge); } diff --git a/Dorkbox-Network/src/dorkbox/network/connection/EndPointServer.java b/Dorkbox-Network/src/dorkbox/network/connection/EndPointServer.java index ba66fc82..9a55a221 100644 --- a/Dorkbox-Network/src/dorkbox/network/connection/EndPointServer.java +++ b/Dorkbox-Network/src/dorkbox/network/connection/EndPointServer.java @@ -27,7 +27,7 @@ import java.io.IOException; * This serves the purpose of making sure that specific methods are not available to the end user. */ public -class EndPointServer extends EndPoint { +class EndPointServer extends EndPoint { private final ServerConnectionBridge serverConnections; @@ -57,7 +57,7 @@ class EndPointServer extends EndPoint { * @return a newly created listener manager for the connection */ final - ConnectionManager addListenerManager(Connection connection) { + ConnectionManager addListenerManager(C connection) { return this.connectionManager.addListenerManager(connection); } @@ -71,7 +71,7 @@ class EndPointServer extends EndPoint { * This removes the listener manager for that specific connection */ final - void removeListenerManager(Connection connection) { + void removeListenerManager(C connection) { this.connectionManager.removeListenerManager(connection); } } diff --git a/Dorkbox-Network/src/dorkbox/network/connection/ISessionManager.java b/Dorkbox-Network/src/dorkbox/network/connection/ISessionManager.java index e175bb42..bf78d689 100644 --- a/Dorkbox-Network/src/dorkbox/network/connection/ISessionManager.java +++ b/Dorkbox-Network/src/dorkbox/network/connection/ISessionManager.java @@ -18,29 +18,29 @@ package dorkbox.network.connection; import java.util.Collection; public -interface ISessionManager { +interface ISessionManager { /** * Called when a message is received */ - void notifyOnMessage(Connection connection, Object message); + void notifyOnMessage(T connection, Object message); /** * Called when the connection has been idle (read & write) for 2 seconds */ - void notifyOnIdle(Connection connection); + void notifyOnIdle(T connection); - void connectionConnected(Connection connection); + void connectionConnected(T connection); - void connectionDisconnected(Connection connection); + void connectionDisconnected(T connection); /** * Called when there is an error of some kind during the up/down stream process */ - void connectionError(Connection connection, Throwable throwable); + void connectionError(T connection, Throwable throwable); /** * Returns a non-modifiable list of active connections */ - Collection getConnections(); + Collection getConnections(); } diff --git a/Dorkbox-Network/src/dorkbox/network/connection/KryoCryptoSerializationManager.java b/Dorkbox-Network/src/dorkbox/network/connection/KryoCryptoSerializationManager.java index 6aae8e96..4f66f5e9 100644 --- a/Dorkbox-Network/src/dorkbox/network/connection/KryoCryptoSerializationManager.java +++ b/Dorkbox-Network/src/dorkbox/network/connection/KryoCryptoSerializationManager.java @@ -27,7 +27,6 @@ import dorkbox.network.connection.ping.PingMessage; import dorkbox.network.rmi.*; import dorkbox.network.util.CryptoSerializationManager; import dorkbox.util.crypto.Crypto; -import dorkbox.util.exceptions.NetException; import dorkbox.util.objectPool.ObjectPool; import dorkbox.util.objectPool.ObjectPoolFactory; import dorkbox.util.objectPool.PoolableObject; @@ -43,6 +42,7 @@ import org.bouncycastle.crypto.params.IESWithCipherParameters; import org.jctools.util.Pow2; import org.slf4j.Logger; +import java.io.IOException; import java.lang.annotation.Annotation; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Proxy; @@ -91,7 +91,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { public static boolean useUnsafeMemory = false; private static final String OBJECT_ID = "objectID"; - private boolean rmiInitialized = false; + private boolean initialized = false; /** * The default serialization manager. This is static, since serialization must be consistent within the JVM. This can be changed. @@ -159,7 +159,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { private static - void decompress(ByteBuf inputBuffer, ByteBuf outputBuffer, Inflater decompress) { + void decompress(ByteBuf inputBuffer, ByteBuf outputBuffer, Inflater decompress) throws IOException { byte[] in = new byte[inputBuffer.readableBytes()]; inputBuffer.readBytes(in); @@ -173,7 +173,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { numBytes = decompress.inflate(out, 0, out.length); } catch (DataFormatException e) { logger.error("Error inflating data.", e); - throw new NetException(e.getCause()); + throw new IOException(e.getCause()); } outputBuffer.writeBytes(out, 0, numBytes); @@ -213,7 +213,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { } private static - void snappyDecompress(ByteBuf inputBuffer, ByteBuf outputBuffer, SnappyAccess snappy) { + void snappyDecompress(ByteBuf inputBuffer, ByteBuf outputBuffer, SnappyAccess snappy) throws IOException { try { int idx = inputBuffer.readerIndex(); final int inSize = inputBuffer.writerIndex() - idx; @@ -280,7 +280,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { break; } } catch (Exception e) { - throw new NetException("Unable to decompress SNAPPY data!! " + e.getMessage()); + throw new IOException("Unable to decompress SNAPPY data!! " + e.getMessage()); } } @@ -429,6 +429,34 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { }, capacity); } + /** + * If the class is not registered and {@link Kryo#setRegistrationRequired(boolean)} is false, it is + * automatically registered using the {@link Kryo#addDefaultSerializer(Class, Class) default serializer}. + * + * @throws IllegalArgumentException if the class is not registered and {@link Kryo#setRegistrationRequired(boolean)} is true. + * @see ClassResolver#getRegistration(Class) + */ + @Override + public synchronized + Registration getRegistration(Class clazz) { + Kryo kryo = null; + Registration r = null; + + try { + kryo = this.pool.take(); + r = kryo.getRegistration(clazz); + } catch (InterruptedException e) { + final String msg = "Interrupted during getRegistration()"; + logger.error(msg); + } finally { + if (kryo != null) { + this.pool.release(kryo); + } + } + + return r; + } + /** * Registers the class using the lowest, next available integer ID and the * {@link Kryo#getDefaultSerializer(Class) default serializer}. If the class @@ -443,6 +471,10 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { @Override public synchronized void register(Class clazz) { + if (initialized) { + throw new RuntimeException("Cannot register classes after initialization."); + } + Kryo kryo; try { for (int i = 0; i < capacity; i++) { @@ -468,6 +500,10 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { @Override public synchronized void register(Class clazz, Serializer serializer) { + if (initialized) { + throw new RuntimeException("Cannot register classes after initialization."); + } + Kryo kryo; try { for (int i = 0; i < capacity; i++) { @@ -495,6 +531,10 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { @Override public synchronized void register(Class clazz, Serializer serializer, int id) { + if (initialized) { + throw new RuntimeException("Cannot register classes after initialization."); + } + Kryo kryo; try { for (int i = 0; i < capacity; i++) { @@ -507,27 +547,75 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { } } + /** + * Objects that we want to use RMI with must be accessed via an interface. This method configures the serialization of an + * implementation to be serialized via the defined interface, as a RemoteObject (ie: proxy object). If the implementation + * class is ALREADY registered, then it's registration will be overwritten by this one + * + * @param ifaceClass The interface used to access the remote object + * @param implClass The implementation class of the interface + */ + @Override + public synchronized + void registerRemote(final Class ifaceClass, final Class implClass) { + register(implClass, new RemoteObjectSerializer()); + + // After all common registrations, register OtherObjectImpl only on the server using the remote object interface ID. + // This causes OtherObjectImpl to be serialized as OtherObject. + int otherObjectID = getRegistration(implClass).getId(); + + // this overrides the 'otherObjectID' with the specified class/serializer, so that when we WRITE this ID, the impl ID is written. + register(ifaceClass, new RemoteObjectSerializer(), otherObjectID); + + // we have to save this info in CachedMethod. + CachedMethod.registerOverridden(ifaceClass, implClass); + } + /** * Necessary to register classes for RMI, only called once when the RMI bridge is created. */ @Override public synchronized void initRmiSerialization() { - if (rmiInitialized) { + if (initialized) { + // already initialized. return; } - rmiInitialized = true; + + InvokeMethodSerializer methodSerializer = new InvokeMethodSerializer(); + Serializer invocationSerializer = new Serializer() { + @Override + public + void write(Kryo kryo, Output output, Object object) { + RemoteInvocationHandler handler = (RemoteInvocationHandler) Proxy.getInvocationHandler(object); + output.writeInt(handler.objectID, true); + } + + @Override + @SuppressWarnings({"unchecked", "AutoBoxing"}) + public + Object read(Kryo kryo, Input input, Class type) { + int objectID = input.readInt(true); + + KryoExtra kryoExtra = (KryoExtra) kryo; + Object object = kryoExtra.connection.getRegisteredObject(objectID); + + if (object == null) { + logger.error("Unknown object ID in RMI ObjectSpace: {}", objectID); + } + return object; + } + }; Kryo kryo; try { for (int i = 0; i < capacity; i++) { kryo = this.pool.take(); - // necessary for the RMI bridge. Only called once, but for all kryo instances kryo.register(Class.class); kryo.register(RmiRegistration.class); + kryo.register(InvokeMethod.class, methodSerializer); kryo.register(Object[].class); - kryo.register(InvokeMethod.class, new InvokeMethodSerializer()); FieldSerializer resultSerializer = new FieldSerializer(kryo, InvokeMethodResult.class) { @Override @@ -547,30 +635,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { }; resultSerializer.removeField(OBJECT_ID); kryo.register(InvokeMethodResult.class, resultSerializer); - - kryo.register(InvocationHandler.class, new Serializer() { - @Override - public - void write(Kryo kryo, Output output, Object object) { - RemoteInvocationHandler handler = (RemoteInvocationHandler) Proxy.getInvocationHandler(object); - output.writeInt(handler.objectID, true); - } - - @Override - @SuppressWarnings({"unchecked"}) - public - Object read(Kryo kryo, Input input, Class type) { - int objectID = input.readInt(true); - - KryoExtra kryoExtra = (KryoExtra) kryo; - Object object = kryoExtra.connection.getRegisteredObject(objectID); - - if (object == null) { - logger.error("Unknown object ID in RMI ObjectSpace: {}", objectID); - } - return object; - } - }); + kryo.register(InvocationHandler.class, invocationSerializer); this.pool.release(kryo); } @@ -579,6 +644,19 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { } } + /** + * Called when initialization is complete. This is to prevent (and recognize) out-of-order class/serializer registration. + */ + public + void finishInit() { + initialized = true; + } + + @Override + public + boolean initialized() { + return initialized; + } /** * Waits until a kryo is available to write, using CAS operations to prevent having to synchronize. @@ -589,7 +667,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { */ @Override public final - void write(ByteBuf buffer, Object message) { + void write(ByteBuf buffer, Object message) throws IOException { write0(null, buffer, message, false); } @@ -602,7 +680,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { */ @Override public final - Object read(ByteBuf buffer, int length) { + Object read(ByteBuf buffer, int length) throws IOException { return read0(null, buffer, length, false); } @@ -611,7 +689,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { */ @Override public - void writeFullClassAndObject(Output output, Object value) { + void writeFullClassAndObject(final Logger logger, Output output, Object value) throws IOException { Kryo kryo = null; boolean prev = false; @@ -623,8 +701,10 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { kryo.writeClassAndObject(output, value); } catch (Exception ex) { final String msg = "Unable to serialize buffer"; - logger.error(msg, ex); - throw new NetException(msg, ex); + if (logger != null) { + logger.error(msg, ex); + } + throw new IOException(msg, ex); } finally { if (kryo != null) { kryo.setRegistrationRequired(prev); @@ -635,7 +715,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { @Override public - Object readFullClassAndObject(final Input input) { + Object readFullClassAndObject(final Logger logger, final Input input) throws IOException { Kryo kryo = null; boolean prev = false; @@ -647,8 +727,10 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { return kryo.readClassAndObject(input); } catch (Exception ex) { final String msg = "Unable to deserialize buffer"; - logger.error(msg, ex); - throw new NetException(msg, ex); + if (logger != null) { + logger.error(msg, ex); + } + throw new IOException(msg, ex); } finally { if (kryo != null) { kryo.setRegistrationRequired(prev); @@ -669,61 +751,10 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { this.pool.release(kryo); } - /** - * If the class is not registered and {@link Kryo#setRegistrationRequired(boolean)} is false, it is - * automatically registered using the {@link Kryo#addDefaultSerializer(Class, Class) default serializer}. - * - * @throws IllegalArgumentException if the class is not registered and {@link Kryo#setRegistrationRequired(boolean)} is true. - * @see ClassResolver#getRegistration(Class) - */ - @Override - public - Registration getRegistration(Class clazz) { - Kryo kryo = null; - Registration r = null; - - try { - kryo = this.pool.take(); - r = kryo.getRegistration(clazz); - } catch (InterruptedException e) { - final String msg = "Interrupted during getRegistration()"; - logger.error(msg); - } finally { - if (kryo != null) { - this.pool.release(kryo); - } - } - - return r; - } - - /** - * Objects that we want to use RMI with must be accessed via an interface. This method configures the serialization of an - * implementation to be serialized via the defined interface, as a RemoteObject (ie: proxy object). If the implementation - * class is ALREADY registered, then it's registration will be overwritten by this one - * - * @param ifaceClass The interface used to access the remote object - * @param implClass The implementation class of the interface - */ - @Override - public - void registerRemote(final Class ifaceClass, final Class implClass) { - - register(implClass, new RemoteObjectSerializer()); - - // After all common registrations, register OtherObjectImpl only on the server using the remote object interface ID. - // This causes OtherObjectImpl to be serialized as OtherObject. - int otherObjectID = getRegistration(implClass).getId(); - - // this overrides the 'otherObjectID' with the specified class/serializer - register(ifaceClass, new RemoteObjectSerializer(), otherObjectID); - } - /** * Determines if this buffer is encrypted or not. */ - @Override - public final + public static boolean isEncrypted(ByteBuf buffer) { // read off the magic byte byte magicByte = buffer.getByte(buffer.readerIndex()); @@ -737,11 +768,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { */ @Override public final - void writeWithCryptoTcp(ConnectionImpl connection, ByteBuf buffer, Object message) { - if (connection == null) { - throw new NetException("Unable to perform crypto when NO network connection!"); - } - + void writeWithCryptoTcp(ConnectionImpl connection, ByteBuf buffer, Object message) throws IOException { write0(connection, buffer, message, true); } @@ -752,11 +779,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { */ @Override public final - void writeWithCryptoUdp(ConnectionImpl connection, ByteBuf buffer, Object message) { - if (connection == null) { - throw new NetException("Unable to perform crypto when NO network connection!"); - } - + void writeWithCryptoUdp(ConnectionImpl connection, ByteBuf buffer, Object message) throws IOException { write0(connection, buffer, message, true); } @@ -770,11 +793,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { */ @Override public final - Object readWithCryptoTcp(ConnectionImpl connection, ByteBuf buffer, int length) { - if (connection == null) { - throw new NetException("Unable to perform crypto when NO network connection!"); - } - + Object readWithCryptoTcp(ConnectionImpl connection, ByteBuf buffer, int length) throws IOException { return read0(connection, buffer, length, true); } @@ -788,11 +807,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { */ @Override public final - Object readWithCryptoUdp(ConnectionImpl connection, ByteBuf buffer, int length) { - if (connection == null) { - throw new NetException("Unable to perform crypto when NO network connection!"); - } - + Object readWithCryptoUdp(ConnectionImpl connection, ByteBuf buffer, int length) throws IOException { return read0(connection, buffer, length, true); } @@ -801,7 +816,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { */ @SuppressWarnings("unchecked") private - void write0(final ConnectionImpl connection, final ByteBuf buffer, final Object message, final boolean doCrypto) { + void write0(final ConnectionImpl connection, final ByteBuf buffer, final Object message, final boolean doCrypto) throws IOException { final KryoExtra kryo = (KryoExtra) this.pool.takeUninterruptibly(); Logger logger2 = logger; @@ -826,7 +841,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { // connection will ALWAYS be of type Connection or NULL. // used by RMI/some serializers to determine which connection wrote this object // NOTE: this is only valid in the context of this thread, which RMI stuff is accessed in -- so this is SAFE for RMI - kryo.connection = (ConnectionImpl) connection; + kryo.connection = connection; kryo.writeClassAndObject(kryo.output, message); @@ -893,7 +908,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { } catch (Exception ex) { final String msg = "Unable to serialize buffer"; logger2.error(msg, ex); - throw new NetException(msg, ex); + throw new IOException(msg, ex); } finally { // release resources kryo.output.setBuffer(NULL_BUFFER); @@ -911,7 +926,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { */ @SuppressWarnings({"unchecked", "UnnecessaryLocalVariable"}) private - Object read0(final ConnectionImpl connection, final ByteBuf buffer, final int length, final boolean doCrypto) { + Object read0(final ConnectionImpl connection, final ByteBuf buffer, final int length, final boolean doCrypto) throws IOException { final KryoExtra kryo = (KryoExtra) this.pool.takeUninterruptibly(); Logger logger2 = logger; @@ -938,7 +953,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { // AES CRYPTO STUFF if (doCrypto) { if ((magicByte & crypto) != crypto) { - throw new NetException("Unable to perform crypto when data does not use crypto!"); + throw new IOException("Unable to perform crypto when data does not use crypto!"); } if (logger2.isTraceEnabled()) { @@ -977,7 +992,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { // connection will ALWAYS be of type IConnection or NULL. // used by RMI/some serializers to determine which connection read this object // NOTE: this is only valid in the context of this thread, which RMI stuff is accessed in -- so this is SAFE for RMI - kryo.connection = (ConnectionImpl) connection; + kryo.connection = connection; Object object = kryo.readClassAndObject(kryo.input); @@ -985,7 +1000,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { } catch (Exception ex) { final String msg = "Unable to deserialize buffer"; logger2.error(msg, ex); - throw new NetException(msg, ex); + throw new IOException(msg, ex); } finally { // make sure the end of the buffer is in the correct spot. // move the reader index to the end of the object (since we are reading encrypted data diff --git a/Dorkbox-Network/src/dorkbox/network/connection/KryoExtra.java b/Dorkbox-Network/src/dorkbox/network/connection/KryoExtra.java index 42e69f2f..6e2b06be 100644 --- a/Dorkbox-Network/src/dorkbox/network/connection/KryoExtra.java +++ b/Dorkbox-Network/src/dorkbox/network/connection/KryoExtra.java @@ -16,26 +16,14 @@ package dorkbox.network.connection; import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.Serializer; -import com.esotericsoftware.kryo.util.Util; -import com.esotericsoftware.reflectasm.MethodAccess; import dorkbox.network.pipeline.ByteBufInput; import dorkbox.network.pipeline.ByteBufOutput; -import dorkbox.network.rmi.AsmCachedMethod; -import dorkbox.network.rmi.CachedMethod; import dorkbox.util.crypto.bouncycastle.GCMBlockCipher_ByteBuf; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.handler.codec.compression.SnappyAccess; import org.bouncycastle.crypto.engines.AESFastEngine; -import java.lang.reflect.Method; -import java.lang.reflect.Modifier; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.zip.Deflater; import java.util.zip.Inflater; @@ -52,9 +40,6 @@ class KryoExtra extends Kryo { final ByteBuf tmpBuffer2; final GCMBlockCipher_ByteBuf aesEngine; - private final Map, CachedMethod[]> methodCache = new ConcurrentHashMap, CachedMethod[]>(); - private final boolean asmEnabled = !KryoCryptoSerializationManager.useUnsafeMemory; - // not thread safe public ConnectionImpl connection; @@ -72,111 +57,4 @@ class KryoExtra extends Kryo { this.tmpBuffer2 = Unpooled.buffer(1024); this.aesEngine = new GCMBlockCipher_ByteBuf(new AESFastEngine()); } - - public - CachedMethod[] getMethods(Class type) { - CachedMethod[] cachedMethods = this.methodCache.get(type); // Maybe should cache per Kryo instance? - if (cachedMethods != null) { - return cachedMethods; - } - - ArrayList allMethods = new ArrayList(); - - Class nextClass = type; - while (nextClass != null) { - Collections.addAll(allMethods, nextClass.getDeclaredMethods()); - nextClass = nextClass.getSuperclass(); - if (nextClass == Object.class) { - break; - } - } - - ArrayList methods = new ArrayList(Math.max(1, allMethods.size())); - for (int i = 0, n = allMethods.size(); i < n; i++) { - Method method = allMethods.get(i); - int modifiers = method.getModifiers(); - if (Modifier.isStatic(modifiers)) { - continue; - } - if (Modifier.isPrivate(modifiers)) { - continue; - } - if (method.isSynthetic()) { - continue; - } - methods.add(method); - } - Collections.sort(methods, new Comparator() { - @Override - public - int compare(Method o1, Method o2) { - // Methods are sorted so they can be represented as an index. - int diff = o1.getName() - .compareTo(o2.getName()); - if (diff != 0) { - return diff; - } - Class[] argTypes1 = o1.getParameterTypes(); - Class[] argTypes2 = o2.getParameterTypes(); - if (argTypes1.length > argTypes2.length) { - return 1; - } - if (argTypes1.length < argTypes2.length) { - return -1; - } - for (int i = 0; i < argTypes1.length; i++) { - diff = argTypes1[i].getName() - .compareTo(argTypes2[i].getName()); - if (diff != 0) { - return diff; - } - } - throw new RuntimeException("Two methods with same signature!"); // Impossible. - } - }); - - Object methodAccess = null; - if (asmEnabled && !Util.isAndroid && Modifier.isPublic(type.getModifiers())) { - methodAccess = MethodAccess.get(type); - } - - - int n = methods.size(); - cachedMethods = new CachedMethod[n]; - for (int i = 0; i < n; i++) { - Method method = methods.get(i); - Class[] parameterTypes = method.getParameterTypes(); - - CachedMethod cachedMethod = null; - if (methodAccess != null) { - try { - AsmCachedMethod asmCachedMethod = new AsmCachedMethod(); - asmCachedMethod.methodAccessIndex = ((MethodAccess) methodAccess).getIndex(method.getName(), parameterTypes); - asmCachedMethod.methodAccess = (MethodAccess) methodAccess; - cachedMethod = asmCachedMethod; - } catch (RuntimeException ignored) { - } - } - - if (cachedMethod == null) { - cachedMethod = new CachedMethod(); - } - cachedMethod.method = method; - cachedMethod.methodClassID = getRegistration(method.getDeclaringClass()).getId(); - cachedMethod.methodIndex = i; - - // Store the serializer for each final parameter. - cachedMethod.serializers = new Serializer[parameterTypes.length]; - for (int ii = 0, nn = parameterTypes.length; ii < nn; ii++) { - if (isFinal(parameterTypes[ii])) { - cachedMethod.serializers[ii] = getSerializer(parameterTypes[ii]); - } - } - - cachedMethods[i] = cachedMethod; - } - - this.methodCache.put(type, cachedMethods); - return cachedMethods; - } } diff --git a/Dorkbox-Network/src/dorkbox/network/connection/ListenerRaw.java b/Dorkbox-Network/src/dorkbox/network/connection/ListenerRaw.java index 78763b57..7959986f 100644 --- a/Dorkbox-Network/src/dorkbox/network/connection/ListenerRaw.java +++ b/Dorkbox-Network/src/dorkbox/network/connection/ListenerRaw.java @@ -17,6 +17,8 @@ package dorkbox.network.connection; import dorkbox.util.ClassHelper; +import java.io.IOException; + public abstract class ListenerRaw { @@ -103,7 +105,7 @@ class ListenerRaw { */ @SuppressWarnings("unused") public - void idle(C connection) { + void idle(C connection) throws IOException { } /** diff --git a/Dorkbox-Network/src/dorkbox/network/connection/PropertyStore.java b/Dorkbox-Network/src/dorkbox/network/connection/PropertyStore.java index 23dd3290..2cfa3ad1 100644 --- a/Dorkbox-Network/src/dorkbox/network/connection/PropertyStore.java +++ b/Dorkbox-Network/src/dorkbox/network/connection/PropertyStore.java @@ -55,9 +55,13 @@ class PropertyStore extends SettingsStore { public void init(Class type, final SerializationManager serializationManager, Storage storage) throws IOException { // make sure our custom types are registered - serializationManager.register(HashMap.class); - serializationManager.register(ByteArrayWrapper.class); - serializationManager.register(DB_Server.class); + // only register if not ALREADY initialized, since we can initialize in the server and in the client. This creates problems if + // running inside the same JVM (we don't permit it) + if (!serializationManager.initialized()) { + serializationManager.register(HashMap.class); + serializationManager.register(ByteArrayWrapper.class); + serializationManager.register(DB_Server.class); + } if (storage == null) { this.storage = Store.Memory() @@ -67,16 +71,16 @@ class PropertyStore extends SettingsStore { this.storage = storage; } - servers = this.storage.load(DatabaseStorage.SERVERS, new HashMap(16)); + servers = this.storage.getAndPut(DatabaseStorage.SERVERS, new HashMap(16)); //use map to keep track of recid, so we can get record info during restarts. - DB_Server localServer = servers.get(DB_Server.IP_0_0_0_0); + DB_Server localServer = servers.get(DB_Server.IP_LOCALHOST); if (localServer == null) { localServer = new DB_Server(); - servers.put(DB_Server.IP_0_0_0_0, localServer); + servers.put(DB_Server.IP_LOCALHOST, localServer); // have to always specify what we are saving - this.storage.commit(DatabaseStorage.SERVERS, servers); + this.storage.putAndSave(DatabaseStorage.SERVERS, servers); } } @@ -88,7 +92,7 @@ class PropertyStore extends SettingsStore { ECPrivateKeyParameters getPrivateKey() throws dorkbox.util.exceptions.SecurityException { checkAccess(EndPoint.class); - return servers.get(DB_Server.IP_0_0_0_0) + return servers.get(DB_Server.IP_LOCALHOST) .getPrivateKey(); } @@ -100,11 +104,11 @@ class PropertyStore extends SettingsStore { void savePrivateKey(ECPrivateKeyParameters serverPrivateKey) throws SecurityException { checkAccess(EndPoint.class); - servers.get(DB_Server.IP_0_0_0_0) + servers.get(DB_Server.IP_LOCALHOST) .setPrivateKey(serverPrivateKey); // have to always specify what we are saving - storage.commit(DatabaseStorage.SERVERS, servers); + storage.putAndSave(DatabaseStorage.SERVERS, servers); } /** @@ -115,7 +119,7 @@ class PropertyStore extends SettingsStore { ECPublicKeyParameters getPublicKey() throws SecurityException { checkAccess(EndPoint.class); - return servers.get(DB_Server.IP_0_0_0_0) + return servers.get(DB_Server.IP_LOCALHOST) .getPublicKey(); } @@ -127,11 +131,11 @@ class PropertyStore extends SettingsStore { void savePublicKey(ECPublicKeyParameters serverPublicKey) throws SecurityException { checkAccess(EndPoint.class); - servers.get(DB_Server.IP_0_0_0_0) + servers.get(DB_Server.IP_LOCALHOST) .setPublicKey(serverPublicKey); // have to always specify what we are saving - storage.commit(DatabaseStorage.SERVERS, servers); + storage.putAndSave(DatabaseStorage.SERVERS, servers); } /** @@ -140,7 +144,7 @@ class PropertyStore extends SettingsStore { @Override public synchronized byte[] getSalt() { - final DB_Server localServer = servers.get(DB_Server.IP_0_0_0_0); + final DB_Server localServer = servers.get(DB_Server.IP_LOCALHOST); byte[] salt = localServer.getSalt(); // we don't care who gets the server salt @@ -156,7 +160,7 @@ class PropertyStore extends SettingsStore { localServer.setSalt(bytes); // have to always specify what we are saving - storage.commit(DatabaseStorage.SERVERS, servers); + storage.putAndSave(DatabaseStorage.SERVERS, servers); } return salt; diff --git a/Dorkbox-Network/src/dorkbox/network/connection/idle/IdleSender.java b/Dorkbox-Network/src/dorkbox/network/connection/idle/IdleSender.java index 5257edfb..19875601 100644 --- a/Dorkbox-Network/src/dorkbox/network/connection/idle/IdleSender.java +++ b/Dorkbox-Network/src/dorkbox/network/connection/idle/IdleSender.java @@ -18,6 +18,8 @@ package dorkbox.network.connection.idle; import dorkbox.network.connection.Connection; import dorkbox.network.connection.ListenerRaw; +import java.io.IOException; + public abstract class IdleSender extends ListenerRaw { final IdleListener idleListener; @@ -30,7 +32,7 @@ class IdleSender extends ListenerRaw { @Override public - void idle(C connection) { + void idle(C connection) throws IOException { if (!this.started) { this.started = true; start(); @@ -58,5 +60,5 @@ class IdleSender extends ListenerRaw { * Returns the next object to send, or null if no more objects will be sent. */ protected abstract - M next(); + M next() throws IOException; } diff --git a/Dorkbox-Network/src/dorkbox/network/connection/idle/InputStreamSender.java b/Dorkbox-Network/src/dorkbox/network/connection/idle/InputStreamSender.java index ab9edc91..21ab979e 100644 --- a/Dorkbox-Network/src/dorkbox/network/connection/idle/InputStreamSender.java +++ b/Dorkbox-Network/src/dorkbox/network/connection/idle/InputStreamSender.java @@ -16,7 +16,6 @@ package dorkbox.network.connection.idle; import dorkbox.network.connection.Connection; -import dorkbox.util.exceptions.NetException; import java.io.IOException; import java.io.InputStream; @@ -36,23 +35,19 @@ class InputStreamSender extends IdleSender { @Override protected final - byte[] next() { - try { - int total = 0; - while (total < this.chunk.length) { - int count = this.input.read(this.chunk, total, this.chunk.length - total); - if (count < 0) { - if (total == 0) { - return null; - } - byte[] partial = new byte[total]; - System.arraycopy(this.chunk, 0, partial, 0, total); - return onNext(partial); + byte[] next() throws IOException { + int total = 0; + while (total < this.chunk.length) { + int count = this.input.read(this.chunk, total, this.chunk.length - total); + if (count < 0) { + if (total == 0) { + return null; } - total += count; + byte[] partial = new byte[total]; + System.arraycopy(this.chunk, 0, partial, 0, total); + return onNext(partial); } - } catch (IOException ex) { - throw new NetException(ex); + total += count; } return onNext(this.chunk); } diff --git a/Dorkbox-Network/src/dorkbox/network/connection/ping/PingCanceledException.java b/Dorkbox-Network/src/dorkbox/network/connection/ping/PingCanceledException.java index 204fcfc2..6541e45f 100644 --- a/Dorkbox-Network/src/dorkbox/network/connection/ping/PingCanceledException.java +++ b/Dorkbox-Network/src/dorkbox/network/connection/ping/PingCanceledException.java @@ -15,10 +15,10 @@ */ package dorkbox.network.connection.ping; -import dorkbox.util.exceptions.NetException; +import java.io.IOException; public -class PingCanceledException extends NetException { +class PingCanceledException extends IOException { private static final long serialVersionUID = 9045461384091038605L; diff --git a/Dorkbox-Network/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientUDP.java b/Dorkbox-Network/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientUDP.java index 1febbb7b..0d1110c7 100644 --- a/Dorkbox-Network/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientUDP.java +++ b/Dorkbox-Network/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientUDP.java @@ -21,7 +21,6 @@ import dorkbox.network.connection.registration.Registration; import dorkbox.network.pipeline.udp.KryoDecoderUdp; import dorkbox.network.pipeline.udp.KryoEncoderUdp; import dorkbox.network.util.CryptoSerializationManager; -import dorkbox.util.exceptions.NetException; import dorkbox.util.bytes.OptimizeUtilsByteArray; import dorkbox.util.collections.IntMap; import dorkbox.util.collections.IntMap.Entries; @@ -32,6 +31,7 @@ import io.netty.channel.ChannelPipeline; import io.netty.util.ReferenceCountUtil; import org.slf4j.Logger; +import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -112,7 +112,7 @@ class RegistrationRemoteHandlerClientUDP extends RegistrationRemoteHandlerClient } if (!success) { - throw new NetException("UDP cannot connect to a remote server before TCP is established!"); + throw new IOException("UDP cannot connect to a remote server before TCP is established!"); } if (logger2.isTraceEnabled()) { @@ -124,7 +124,7 @@ class RegistrationRemoteHandlerClientUDP extends RegistrationRemoteHandlerClient channel.writeAndFlush(registration); } else { - throw new NetException("UDP cannot connect to remote server! No remote address specified!"); + throw new IOException("UDP cannot connect to remote server! No remote address specified!"); } } diff --git a/Dorkbox-Network/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientUDT.java b/Dorkbox-Network/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientUDT.java index e16dbed7..27dd0c4d 100644 --- a/Dorkbox-Network/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientUDT.java +++ b/Dorkbox-Network/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientUDT.java @@ -19,7 +19,6 @@ import dorkbox.network.connection.RegistrationWrapper; import dorkbox.network.connection.registration.MetaChannel; import dorkbox.network.connection.registration.Registration; import dorkbox.network.util.CryptoSerializationManager; -import dorkbox.util.exceptions.NetException; import dorkbox.util.bytes.OptimizeUtilsByteArray; import dorkbox.util.collections.IntMap; import dorkbox.util.collections.IntMap.Entries; @@ -29,6 +28,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.util.ReferenceCountUtil; import org.slf4j.Logger; +import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -103,7 +103,7 @@ class RegistrationRemoteHandlerClientUDT extends RegistrationRemoteHandlerClient } if (!success) { - throw new NetException("UDT cannot connect to a remote server before TCP is established!"); + throw new IOException("UDT cannot connect to a remote server before TCP is established!"); } if (logger2.isTraceEnabled()) { @@ -115,7 +115,7 @@ class RegistrationRemoteHandlerClientUDT extends RegistrationRemoteHandlerClient channel.writeAndFlush(registration); } else { - throw new NetException("UDT cannot connect to remote server! No remote address specified!"); + throw new IOException("UDT cannot connect to remote server! No remote address specified!"); } } diff --git a/Dorkbox-Network/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServerUDP.java b/Dorkbox-Network/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServerUDP.java index c46e379e..a30cb97c 100644 --- a/Dorkbox-Network/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServerUDP.java +++ b/Dorkbox-Network/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServerUDP.java @@ -17,6 +17,7 @@ package dorkbox.network.connection.registration.remote; import dorkbox.network.Broadcast; import dorkbox.network.connection.ConnectionImpl; +import dorkbox.network.connection.KryoCryptoSerializationManager; import dorkbox.network.connection.RegistrationWrapper; import dorkbox.network.connection.registration.MetaChannel; import dorkbox.network.connection.registration.Registration; @@ -26,7 +27,6 @@ import dorkbox.util.bytes.OptimizeUtilsByteArray; import dorkbox.util.collections.IntMap; import dorkbox.util.collections.IntMap.Entries; import dorkbox.util.crypto.Crypto; -import dorkbox.util.exceptions.NetException; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; @@ -36,6 +36,7 @@ import io.netty.channel.socket.DatagramPacket; import io.netty.handler.codec.MessageToMessageCodec; import org.slf4j.Logger; +import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.List; @@ -97,11 +98,25 @@ class RegistrationRemoteHandlerServerUDP extends MessageToMessageCodec { @@ -33,7 +34,7 @@ class KryoEncoder extends MessageToByteEncoder { public - KryoEncoder(CryptoSerializationManager serializationManager) { + KryoEncoder(final CryptoSerializationManager serializationManager) { super(); this.serializationManager = serializationManager; this.optimize = OptimizeUtilsByteBuf.get(); @@ -42,14 +43,22 @@ class KryoEncoder extends MessageToByteEncoder { // the crypto writer will override this @SuppressWarnings("unused") protected - void writeObject(CryptoSerializationManager kryoWrapper, ChannelHandlerContext context, Object msg, ByteBuf buffer) { + void writeObject(final CryptoSerializationManager kryoWrapper, + final ChannelHandlerContext context, + final Object msg, + final ByteBuf buffer) { // no connection here because we haven't created one yet. When we do, we replace this handler with a new one. - kryoWrapper.write(buffer, msg); + try { + kryoWrapper.write(buffer, msg); + } catch (IOException ex) { + context.fireExceptionCaught(new IOException("Unable to serialize object of type: " + msg.getClass() + .getName(), ex)); + } } @Override protected - void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { + void encode(final ChannelHandlerContext context, final Object msg, final ByteBuf out) throws Exception { // we don't necessarily start at 0!! int startIndex = out.writerIndex(); @@ -60,7 +69,7 @@ class KryoEncoder extends MessageToByteEncoder { out.writeInt(0); // put an int in, which is the same size as reservedLengthIndex try { - writeObject(this.serializationManager, ctx, msg, out); + writeObject(this.serializationManager, context, msg, out); // now set the frame (if it's TCP)! int length = out.readableBytes() - startIndex - @@ -80,8 +89,8 @@ class KryoEncoder extends MessageToByteEncoder { optimize.writeInt(out, length, true); out.setIndex(newIndex, oldIndex); } catch (KryoException ex) { - ctx.fireExceptionCaught(new NetException("Unable to serialize object of type: " + msg.getClass() - .getName(), ex)); + context.fireExceptionCaught(new IOException("Unable to serialize object of type: " + msg.getClass() + .getName(), ex)); } } } diff --git a/Dorkbox-Network/src/dorkbox/network/pipeline/KryoEncoderCrypto.java b/Dorkbox-Network/src/dorkbox/network/pipeline/KryoEncoderCrypto.java index eedfb903..7e34aff2 100644 --- a/Dorkbox-Network/src/dorkbox/network/pipeline/KryoEncoderCrypto.java +++ b/Dorkbox-Network/src/dorkbox/network/pipeline/KryoEncoderCrypto.java @@ -22,20 +22,31 @@ import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; +import java.io.IOException; + @Sharable public class KryoEncoderCrypto extends KryoEncoder { public - KryoEncoderCrypto(CryptoSerializationManager serializationManager) { + KryoEncoderCrypto(final CryptoSerializationManager serializationManager) { super(serializationManager); } @Override protected - void writeObject(CryptoSerializationManager serializationManager, ChannelHandlerContext ctx, Object msg, ByteBuf buffer) { - ChannelHandler last = ctx.pipeline() - .last(); - serializationManager.writeWithCryptoTcp((ConnectionImpl) last, buffer, msg); + void writeObject(final CryptoSerializationManager serializationManager, + final ChannelHandlerContext context, + final Object msg, + final ByteBuf buffer) { + + ChannelHandler last = context.pipeline() + .last(); + try { + serializationManager.writeWithCryptoTcp((ConnectionImpl) last, buffer, msg); + } catch (IOException ex) { + context.fireExceptionCaught(new IOException("Unable to serialize object of type: " + msg.getClass() + .getName(), ex)); + } } } diff --git a/Dorkbox-Network/src/dorkbox/network/pipeline/udp/KryoDecoderUdp.java b/Dorkbox-Network/src/dorkbox/network/pipeline/udp/KryoDecoderUdp.java index ffdd73c5..bf78ec11 100644 --- a/Dorkbox-Network/src/dorkbox/network/pipeline/udp/KryoDecoderUdp.java +++ b/Dorkbox-Network/src/dorkbox/network/pipeline/udp/KryoDecoderUdp.java @@ -15,14 +15,16 @@ */ package dorkbox.network.pipeline.udp; +import dorkbox.network.connection.KryoCryptoSerializationManager; import dorkbox.network.util.CryptoSerializationManager; -import dorkbox.util.exceptions.NetException; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.socket.DatagramPacket; import io.netty.handler.codec.MessageToMessageDecoder; +import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.List; @Sharable @@ -46,13 +48,21 @@ class KryoDecoderUdp extends MessageToMessageDecoder { // there is a REMOTE possibility that UDP traffic BEAT the TCP registration traffic, which means that THIS packet // COULD be encrypted! - if (serializationManager.isEncrypted(data)) { - throw new NetException("Encrypted UDP packet received before registration complete. WHOOPS!"); + if (KryoCryptoSerializationManager.isEncrypted(data)) { + String message = "Encrypted UDP packet received before registration complete."; + LoggerFactory.getLogger(this.getClass()).error(message); + throw new IOException(message); + } else { + try { + // no connection here because we haven't created one yet. When we do, we replace this handler with a new one. + Object read = serializationManager.read(data, data.writerIndex()); + out.add(read); + } catch (IOException e) { + String message = "Unable to deserialize object"; + LoggerFactory.getLogger(this.getClass()).error(message, e); + throw new IOException(message, e); + } } - - // no connection here because we haven't created one yet. When we do, we replace this handler with a new one. - Object read = serializationManager.read(data, data.writerIndex()); - out.add(read); } } } diff --git a/Dorkbox-Network/src/dorkbox/network/pipeline/udp/KryoDecoderUdpCrypto.java b/Dorkbox-Network/src/dorkbox/network/pipeline/udp/KryoDecoderUdpCrypto.java index 165d7870..31a79f3f 100644 --- a/Dorkbox-Network/src/dorkbox/network/pipeline/udp/KryoDecoderUdpCrypto.java +++ b/Dorkbox-Network/src/dorkbox/network/pipeline/udp/KryoDecoderUdpCrypto.java @@ -23,7 +23,9 @@ import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.socket.DatagramPacket; import io.netty.handler.codec.MessageToMessageDecoder; +import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.List; @Sharable @@ -43,8 +45,15 @@ class KryoDecoderUdpCrypto extends MessageToMessageDecoder { ChannelHandler last = ctx.pipeline() .last(); - ByteBuf data = in.content(); - Object object = serializationManager.readWithCryptoUdp((ConnectionImpl) last, data, data.readableBytes()); - out.add(object); + + try { + ByteBuf data = in.content(); + Object object = serializationManager.readWithCryptoUdp((ConnectionImpl) last, data, data.readableBytes()); + out.add(object); + } catch (IOException e) { + String message = "Unable to deserialize object"; + LoggerFactory.getLogger(this.getClass()).error(message, e); + throw new IOException(message, e); + } } } diff --git a/Dorkbox-Network/src/dorkbox/network/pipeline/udp/KryoEncoderUdp.java b/Dorkbox-Network/src/dorkbox/network/pipeline/udp/KryoEncoderUdp.java index 9cdcc40c..39adca93 100644 --- a/Dorkbox-Network/src/dorkbox/network/pipeline/udp/KryoEncoderUdp.java +++ b/Dorkbox-Network/src/dorkbox/network/pipeline/udp/KryoEncoderUdp.java @@ -15,17 +15,17 @@ */ package dorkbox.network.pipeline.udp; -import com.esotericsoftware.kryo.KryoException; import dorkbox.network.connection.EndPoint; import dorkbox.network.util.CryptoSerializationManager; -import dorkbox.util.exceptions.NetException; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.socket.DatagramPacket; import io.netty.handler.codec.MessageToMessageEncoder; +import org.slf4j.LoggerFactory; +import java.io.IOException; import java.net.InetSocketAddress; import java.util.List; @@ -46,9 +46,8 @@ class KryoEncoderUdp extends MessageToMessageEncoder { } // the crypto writer will override this - @SuppressWarnings("unused") - protected - void writeObject(CryptoSerializationManager serializationManager, ChannelHandlerContext context, Object msg, ByteBuf buffer) { + void writeObject(CryptoSerializationManager serializationManager, ChannelHandlerContext context, Object msg, ByteBuf buffer) + throws IOException { // no connection here because we haven't created one yet. When we do, we replace this handler with a new one. serializationManager.write(buffer, msg); } @@ -63,21 +62,25 @@ class KryoEncoderUdp extends MessageToMessageEncoder { // no size info, since this is UDP, it is not segmented writeObject(this.serializationManager, ctx, msg, outBuffer); - // have to check to see if we are too big for UDP! if (outBuffer.readableBytes() > EndPoint.udpMaxSize) { System.err.println("Object larger than MAX udp size! " + EndPoint.udpMaxSize + "/" + outBuffer.readableBytes()); - throw new NetException("Object is TOO BIG FOR UDP! " + msg.toString() + " (" + EndPoint.udpMaxSize + "/" + - outBuffer.readableBytes() + ")"); + + String message = "Object is TOO BIG FOR UDP! " + msg.toString() + " (" + EndPoint.udpMaxSize + "/" + + outBuffer.readableBytes() + ")"; + LoggerFactory.getLogger(this.getClass()).error(message); + throw new IOException(message); } DatagramPacket packet = new DatagramPacket(outBuffer, (InetSocketAddress) ctx.channel() .remoteAddress()); out.add(packet); - } catch (KryoException ex) { - throw new NetException("Unable to serialize object of type: " + msg.getClass() - .getName(), ex); + } catch (Exception e) { + String message = "Unable to serialize object of type: " + msg.getClass() + .getName(); + LoggerFactory.getLogger(this.getClass()).error(message, e); + throw new IOException(message, e); } } } diff --git a/Dorkbox-Network/src/dorkbox/network/pipeline/udp/KryoEncoderUdpCrypto.java b/Dorkbox-Network/src/dorkbox/network/pipeline/udp/KryoEncoderUdpCrypto.java index 8241ab8b..566819a0 100644 --- a/Dorkbox-Network/src/dorkbox/network/pipeline/udp/KryoEncoderUdpCrypto.java +++ b/Dorkbox-Network/src/dorkbox/network/pipeline/udp/KryoEncoderUdpCrypto.java @@ -22,6 +22,8 @@ import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; +import java.io.IOException; + @Sharable public class KryoEncoderUdpCrypto extends KryoEncoderUdp { @@ -32,8 +34,8 @@ class KryoEncoderUdpCrypto extends KryoEncoderUdp { } @Override - protected - void writeObject(CryptoSerializationManager serializationManager, ChannelHandlerContext ctx, Object msg, ByteBuf buffer) { + void writeObject(CryptoSerializationManager serializationManager, ChannelHandlerContext ctx, Object msg, ByteBuf buffer) + throws IOException { ChannelHandler last = ctx.pipeline() .last(); diff --git a/Dorkbox-Network/src/dorkbox/network/rmi/AsmCachedMethod.java b/Dorkbox-Network/src/dorkbox/network/rmi/AsmCachedMethod.java index 373df8f9..8ad62c3c 100644 --- a/Dorkbox-Network/src/dorkbox/network/rmi/AsmCachedMethod.java +++ b/Dorkbox-Network/src/dorkbox/network/rmi/AsmCachedMethod.java @@ -35,6 +35,7 @@ package dorkbox.network.rmi; import com.esotericsoftware.reflectasm.MethodAccess; +import dorkbox.network.connection.Connection; import java.lang.reflect.InvocationTargetException; @@ -45,9 +46,18 @@ class AsmCachedMethod extends CachedMethod { @Override public - Object invoke(Object target, Object[] args) throws IllegalAccessException, InvocationTargetException { + Object invoke(final Connection connection, Object target, Object[] args) throws IllegalAccessException, InvocationTargetException { try { - return this.methodAccess.invoke(target, this.methodAccessIndex, args); + if (origMethod == null) { + return this.methodAccess.invoke(target, this.methodAccessIndex, args); + } else { + int length = args.length; + Object[] newArgs = new Object[length + 1]; + newArgs[0] = connection; + System.arraycopy(args, 0, newArgs, 1, length); + + return this.methodAccess.invoke(target, this.methodAccessIndex, newArgs); + } } catch (Exception ex) { throw new InvocationTargetException(ex); } diff --git a/Dorkbox-Network/src/dorkbox/network/rmi/CachedMethod.java b/Dorkbox-Network/src/dorkbox/network/rmi/CachedMethod.java index c9a2e9be..2aa57959 100644 --- a/Dorkbox-Network/src/dorkbox/network/rmi/CachedMethod.java +++ b/Dorkbox-Network/src/dorkbox/network/rmi/CachedMethod.java @@ -34,22 +34,259 @@ */ package dorkbox.network.rmi; +import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.util.Util; +import com.esotericsoftware.reflectasm.MethodAccess; +import dorkbox.network.connection.Connection; +import dorkbox.network.connection.EndPoint; +import dorkbox.util.ClassHelper; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.lang.reflect.Modifier; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; public class CachedMethod { + private static final Map, CachedMethod[]> methodCache = new ConcurrentHashMap, CachedMethod[]>(EndPoint.DEFAULT_THREAD_POOL_SIZE); + private static final Map, Class> overriddenMethods = new HashMap, Class>(); + + // type will be likely be the interface + public static + CachedMethod[] getMethods(final Kryo kryo, final Class type) { + CachedMethod[] cachedMethods = methodCache.get(type); + if (cachedMethods != null) { + return cachedMethods; + } + + // race-conditions are OK, because we just recreate the same thing. + ArrayList methods = getMethods(type); + + // In situations where we want to pass in the Connection (to an RMI method), we have to be able to override method A, with method B. + // This is to support calling RMI methods from an interface (that does pass the connection reference) to + // an implType, that DOES pass the connection reference. The remote side (that initiates the RMI calls), MUST use + // the interface, and the implType may override the method, so that we add the connection as the first in + // the list of parameters. + // + // for example: + // Interface: foo(String x) + // Impl: foo(Connection c, String x) + // + // The implType (if it exists, with the same name, and with the same signature+connection) will be called from the interface. + // This MUST hold valid for both remote and local connection types. + + // To facilitate this functionality, for methods with the same name, the "overriding" method is the one that inherits the Connection + // interface as the first parameter. + Map overriddenMethods = getOverriddenMethods(type, methods); + + + MethodAccess methodAccess = null; + if (kryo.getAsmEnabled() && !Util.isAndroid && Modifier.isPublic(type.getModifiers())) { + methodAccess = MethodAccess.get(type); + } + + int n = methods.size(); + cachedMethods = new CachedMethod[n]; + for (int i = 0; i < n; i++) { + Method origMethod = methods.get(i); + Method method = origMethod; + MethodAccess localMethodAccess = methodAccess; + Class[] parameterTypes = method.getParameterTypes(); + Class[] asmParameterTypes = parameterTypes; + + Method overriddenMethod = overriddenMethods.remove(method); + boolean overridden = overriddenMethod != null; + if (overridden) { + // we can override the details of this method BECAUSE (and only because) our kryo registration override will return + // the correct object for this overridden method to be called on. + method = overriddenMethod; + + Class overrideType = method.getDeclaringClass(); + + if (kryo.getAsmEnabled() && !Util.isAndroid && Modifier.isPublic(overrideType.getModifiers())) { + localMethodAccess = MethodAccess.get(overrideType); + asmParameterTypes = method.getParameterTypes(); + } + } + + CachedMethod cachedMethod = null; + if (localMethodAccess != null) { + try { + AsmCachedMethod asmCachedMethod = new AsmCachedMethod(); + asmCachedMethod.methodAccessIndex = localMethodAccess.getIndex(method.getName(), asmParameterTypes); + asmCachedMethod.methodAccess = localMethodAccess; + cachedMethod = asmCachedMethod; + } catch (RuntimeException ignored) { + ignored.printStackTrace(); + } + } + + if (cachedMethod == null) { + cachedMethod = new CachedMethod(); + } + cachedMethod.method = method; + cachedMethod.origMethod = origMethod; + cachedMethod.methodClassID = kryo.getRegistration(method.getDeclaringClass()).getId(); + cachedMethod.methodIndex = i; + + // Store the serializer for each final parameter. + // ONLY for the ORIGINAL method, not he overridden one. + cachedMethod.serializers = new Serializer[parameterTypes.length]; + for (int ii = 0, nn = parameterTypes.length; ii < nn; ii++) { + if (kryo.isFinal(parameterTypes[ii])) { + cachedMethod.serializers[ii] = kryo.getSerializer(parameterTypes[ii]); + } + } + + cachedMethods[i] = cachedMethod; + } + + methodCache.put(type, cachedMethods); + return cachedMethods; + } + + private static + Map getOverriddenMethods(final Class type, final ArrayList origMethods) { + final Class implType = overriddenMethods.get(type); + + if (implType != null) { + ArrayList implMethods = getMethods(implType); + + HashMap overrideMap = new HashMap(implMethods.size()); + + for (Method origMethod : origMethods) { + String name = origMethod.getName(); + Class[] types = origMethod.getParameterTypes(); + int modLength = types.length + 1; + + METHOD_CHECK: + for (Method implMethod : implMethods) { + String checkName = implMethod.getName(); + Class[] checkTypes = implMethod.getParameterTypes(); + int checkLength = checkTypes.length; + + if (modLength != checkLength || !(name.equals(checkName))) { + continue; + } + + // checkLength > 0 + Class checkType = checkTypes[0]; + if (ClassHelper.hasInterface(dorkbox.network.connection.Connection.class, checkType)) { + // now we check to see if our "check" method is equal to our "cached" method + Connection + for (int k = 1; k < checkLength; k++) { + if (types[k-1] == checkTypes[k]) { + overrideMap.put(origMethod, implMethod); + break METHOD_CHECK; + } + } + } + } + } + + return overrideMap; + } else { + return new HashMap(0); + } + } + + + + private static + ArrayList getMethods(final Class type) { + ArrayList allMethods = new ArrayList(); + + Class nextClass = type; + while (nextClass != null) { + Collections.addAll(allMethods, nextClass.getDeclaredMethods()); + nextClass = nextClass.getSuperclass(); + if (nextClass == Object.class) { + break; + } + } + + ArrayList methods = new ArrayList(Math.max(1, allMethods.size())); + for (int i = 0, n = allMethods.size(); i < n; i++) { + Method method = allMethods.get(i); + int modifiers = method.getModifiers(); + if (Modifier.isStatic(modifiers)) { + continue; + } + if (Modifier.isPrivate(modifiers)) { + continue; + } + if (method.isSynthetic()) { + continue; + } + methods.add(method); + } + + Collections.sort(methods, new Comparator() { + @Override + public + int compare(Method o1, Method o2) { + // Methods are sorted so they can be represented as an index. + int diff = o1.getName() + .compareTo(o2.getName()); + if (diff != 0) { + return diff; + } + Class[] argTypes1 = o1.getParameterTypes(); + Class[] argTypes2 = o2.getParameterTypes(); + if (argTypes1.length > argTypes2.length) { + return 1; + } + if (argTypes1.length < argTypes2.length) { + return -1; + } + for (int i = 0; i < argTypes1.length; i++) { + diff = argTypes1[i].getName() + .compareTo(argTypes2[i].getName()); + if (diff != 0) { + return diff; + } + } + throw new RuntimeException("Two methods with same signature!"); // Impossible. + } + }); + return methods; + } + public Method method; public int methodClassID; public int methodIndex; + /** + * in some cases, we want to override the cached method, with one that supports passing 'Connection' as the first argument. + * This is completely OPTIONAL, however - greatly adds functionality to RMI methods. + */ + public transient Method origMethod; + @SuppressWarnings("rawtypes") public Serializer[] serializers; public - Object invoke(Object target, Object[] args) throws IllegalAccessException, InvocationTargetException { - return this.method.invoke(target, args); + Object invoke(final Connection connection, Object target, Object[] args) throws IllegalAccessException, InvocationTargetException { + // did we override our cached method? + if (origMethod == null) { + return this.method.invoke(target, args); + } else { + int length = args.length; + Object[] newArgs = new Object[length + 1]; + newArgs[0] = connection; + System.arraycopy(args, 0, newArgs, 1, length); + + return this.method.invoke(target, newArgs); + } + } + + /** + * Called by the SerializationManager, so that RMI classes that are overridden for serialization purposes, can check to see if + * certain methods need to be overridden. + */ + public static + void registerOverridden(final Class ifaceClass, final Class implClass) { + overriddenMethods.put(ifaceClass, implClass); } } diff --git a/Dorkbox-Network/src/dorkbox/network/rmi/InvokeMethod.java b/Dorkbox-Network/src/dorkbox/network/rmi/InvokeMethod.java index c6c058a2..01e3d781 100644 --- a/Dorkbox-Network/src/dorkbox/network/rmi/InvokeMethod.java +++ b/Dorkbox-Network/src/dorkbox/network/rmi/InvokeMethod.java @@ -47,7 +47,6 @@ class InvokeMethod implements RmiMessages { // possible duplicate IDs. A response data of 0 means to not respond. public byte responseData; - public InvokeMethod() { } } diff --git a/Dorkbox-Network/src/dorkbox/network/rmi/InvokeMethodPoolable.java b/Dorkbox-Network/src/dorkbox/network/rmi/InvokeMethodPoolable.java deleted file mode 100644 index a645ebdd..00000000 --- a/Dorkbox-Network/src/dorkbox/network/rmi/InvokeMethodPoolable.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright 2010 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dorkbox.network.rmi; - -import dorkbox.util.objectPool.PoolableObject; - -public -class InvokeMethodPoolable implements PoolableObject { - @Override - public - InvokeMethod create() { - return new InvokeMethod(); - } -} diff --git a/Dorkbox-Network/src/dorkbox/network/rmi/InvokeMethodSerializer.java b/Dorkbox-Network/src/dorkbox/network/rmi/InvokeMethodSerializer.java index 667b3677..bad77a0d 100644 --- a/Dorkbox-Network/src/dorkbox/network/rmi/InvokeMethodSerializer.java +++ b/Dorkbox-Network/src/dorkbox/network/rmi/InvokeMethodSerializer.java @@ -39,7 +39,6 @@ import com.esotericsoftware.kryo.KryoException; import com.esotericsoftware.kryo.Serializer; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; -import dorkbox.network.connection.KryoExtra; /** * Internal message to invoke methods remotely. @@ -61,7 +60,9 @@ class InvokeMethodSerializer extends Serializer { Serializer[] serializers = object.cachedMethod.serializers; Object[] args = object.args; - for (int i = 0, n = serializers.length; i < n; i++) { + + int i = 0, n = serializers.length; + for (; i < n; i++) { Serializer serializer = serializers[i]; if (serializer != null) { kryo.writeObjectOrNull(output, args[i], serializer); @@ -87,15 +88,14 @@ class InvokeMethodSerializer extends Serializer { byte methodIndex = input.readByte(); try { - KryoExtra kryoExtra = (KryoExtra) kryo; - - invokeMethod.cachedMethod = kryoExtra.getMethods(methodClass)[methodIndex]; + invokeMethod.cachedMethod = CachedMethod.getMethods(kryo, methodClass)[methodIndex]; } catch (IndexOutOfBoundsException ex) { throw new KryoException("Invalid method index " + methodIndex + " for class: " + methodClass.getName()); } - Serializer[] serializers = invokeMethod.cachedMethod.serializers; - Class[] parameterTypes = invokeMethod.cachedMethod.method.getParameterTypes(); + CachedMethod cachedMethod = invokeMethod.cachedMethod; + Serializer[] serializers = cachedMethod.serializers; + Class[] parameterTypes = cachedMethod.method.getParameterTypes(); Object[] args = new Object[serializers.length]; invokeMethod.args = args; for (int i = 0, n = args.length; i < n; i++) { diff --git a/Dorkbox-Network/src/dorkbox/network/rmi/RemoteProxy.java b/Dorkbox-Network/src/dorkbox/network/rmi/RMI.java similarity index 52% rename from Dorkbox-Network/src/dorkbox/network/rmi/RemoteProxy.java rename to Dorkbox-Network/src/dorkbox/network/rmi/RMI.java index c3796db3..f5dac214 100644 --- a/Dorkbox-Network/src/dorkbox/network/rmi/RemoteProxy.java +++ b/Dorkbox-Network/src/dorkbox/network/rmi/RMI.java @@ -17,8 +17,21 @@ package dorkbox.network.rmi; import java.lang.annotation.*; + + +/** + * This specifies to the serializer, that this field is an RMI object. + *

+ * Additional behavior of RMI methods, is if there is another method (of the same name and signature), with the addition of a + * Connection parameter in the first position, THAT method will be called instead, an will have the current connection object passed + * into the method. + *

+ * It is mandatory for the correct implementation (as per the interface guideline) to exist, and should return null. + *

+ * IE: foo(String something)... -> foo(Connection connection, String something).... + */ @Retention(value = RetentionPolicy.RUNTIME) @Inherited -@Target(value = {ElementType.FIELD, ElementType.ANNOTATION_TYPE}) +@Target(value = {ElementType.FIELD}) public -@interface RemoteProxy {} +@interface RMI {} diff --git a/Dorkbox-Network/src/dorkbox/network/rmi/RemoteInvocationHandler.java b/Dorkbox-Network/src/dorkbox/network/rmi/RemoteInvocationHandler.java index 56396ad4..5063172d 100644 --- a/Dorkbox-Network/src/dorkbox/network/rmi/RemoteInvocationHandler.java +++ b/Dorkbox-Network/src/dorkbox/network/rmi/RemoteInvocationHandler.java @@ -35,16 +35,15 @@ package dorkbox.network.rmi; +import com.esotericsoftware.kryo.Kryo; import dorkbox.network.connection.Connection; import dorkbox.network.connection.EndPoint; -import dorkbox.network.connection.KryoExtra; import dorkbox.network.connection.ListenerRaw; import dorkbox.network.util.CryptoSerializationManager; -import dorkbox.util.exceptions.NetException; -import dorkbox.util.objectPool.ObjectPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.util.Arrays; @@ -82,12 +81,9 @@ class RemoteInvocationHandler implements InvocationHandler { final InvokeMethodResult[] responseTable = new InvokeMethodResult[64]; final boolean[] pendingResponses = new boolean[64]; - private final ObjectPool invokeMethodPool; - public - RemoteInvocationHandler(ObjectPool invokeMethodPool, Connection connection, final int objectID) { + RemoteInvocationHandler(Connection connection, final int objectID) { super(); - this.invokeMethodPool = invokeMethodPool; this.connection = connection; this.objectID = objectID; @@ -129,6 +125,7 @@ class RemoteInvocationHandler implements InvocationHandler { .add(this.responseListener); } + @SuppressWarnings({"AutoUnboxing", "AutoBoxing"}) @Override public Object invoke(Object proxy, Method method, Object[] args) throws Exception { @@ -189,41 +186,62 @@ class RemoteInvocationHandler implements InvocationHandler { return this.connection; } // Should never happen, for debugging purposes only - throw new Exception("Invocation handler could not find RemoteObject method. Check ObjectSpace.java"); + throw new Exception("Invocation handler could not find RemoteObject method."); } else if (!this.remoteToString && declaringClass == Object.class && method.getName() .equals("toString")) { return ""; } + final Logger logger1 = RemoteInvocationHandler.logger; - EndPoint endPoint = this.connection.getEndPoint(); + EndPoint endPoint = this.connection.getEndPoint(); + final CryptoSerializationManager serializationManager = endPoint.getSerialization(); - InvokeMethod invokeMethod = this.invokeMethodPool.take(); + InvokeMethod invokeMethod = new InvokeMethod(); invokeMethod.objectID = this.objectID; invokeMethod.args = args; - final CryptoSerializationManager serializationManager = endPoint.getSerialization(); + // thread safe access. - final KryoExtra kryo = (KryoExtra) serializationManager.take(); + final Kryo kryo = serializationManager.take(); if (kryo == null) { String msg = "Interrupted during kryo pool.take()"; - logger.error(msg); + logger1.error(msg); return msg; } - CachedMethod[] cachedMethods = kryo.getMethods(method.getDeclaringClass()); + // which method do we access? + CachedMethod[] cachedMethods = CachedMethod.getMethods(kryo, method.getDeclaringClass()); serializationManager.release(kryo); for (int i = 0, n = cachedMethods.length; i < n; i++) { CachedMethod cachedMethod = cachedMethods[i]; - if (cachedMethod.method.equals(method)) { + Method checkMethod = cachedMethod.origMethod; + if (checkMethod == null) { + checkMethod = cachedMethod.method; + } + + // In situations where we want to pass in the Connection (to an RMI method), we have to be able to override method A, with method B. + // This is to support calling RMI methods from an interface (that does pass the connection reference) to + // an implementation, that DOES pass the connection reference. The remote side (that initiates the RMI calls), MUST use + // the interface, and the implementation may override the method, so that we add the connection as the first in + // the list of parameters. + // + // for example: + // Interface: foo(String x) + // Impl: foo(Connection c, String x) + // + // The implementation (if it exists, with the same name, and with the same signature+connection) will be called from the interface. + // This MUST hold valid for both remote and local connection types. + + if (checkMethod.equals(method)) { invokeMethod.cachedMethod = cachedMethod; break; } } if (invokeMethod.cachedMethod == null) { String msg = "Method not found: " + method; - logger.error(msg); + logger1.error(msg); return msg; } @@ -270,21 +288,22 @@ class RemoteInvocationHandler implements InvocationHandler { .flush(); } - if (logger.isDebugEnabled()) { + if (logger1.isTraceEnabled()) { String argString = ""; if (args != null) { argString = Arrays.deepToString(args); argString = argString.substring(1, argString.length() - 1); } - logger.debug(this.connection + " sent: " + method.getDeclaringClass() + logger1.trace(this.connection + " sent: " + method.getDeclaringClass() .getSimpleName() + "#" + method.getName() + "(" + argString + ")"); } this.lastResponseID = (byte) (invokeMethod.responseData & RmiBridge.responseIdMask); - this.invokeMethodPool.release(invokeMethod); - if (this.nonBlocking || this.udp) { + + + if (this.nonBlocking || this.udp || this.udt) { Class returnType = method.getReturnType(); if (returnType.isPrimitive()) { if (returnType == int.class) { @@ -334,12 +353,16 @@ class RemoteInvocationHandler implements InvocationHandler { } } + /** + * A timeout of 0 means that we want to disable waiting, otherwise - it waits in milliseconds + */ private - Object waitForResponse(byte responseID) { + Object waitForResponse(byte responseID) throws IOException { long endTime = System.currentTimeMillis() + this.timeoutMillis; long remaining = this.timeoutMillis; - while (remaining > 0) { + if (remaining == 0) { + // just wait however log it takes. InvokeMethodResult invokeMethodResult; synchronized (this) { invokeMethodResult = this.responseTable[responseID]; @@ -352,18 +375,54 @@ class RemoteInvocationHandler implements InvocationHandler { else { this.lock.lock(); try { - this.responseCondition.await(remaining, TimeUnit.MILLISECONDS); + this.responseCondition.await(); } catch (InterruptedException e) { Thread.currentThread() .interrupt(); - throw new NetException(e); + throw new IOException("Response timed out.", e); } finally { this.lock.unlock(); } } - remaining = endTime - System.currentTimeMillis(); + synchronized (this) { + invokeMethodResult = this.responseTable[responseID]; + } + if (invokeMethodResult != null) { + this.lastResponseID = null; + return invokeMethodResult.result; + } } + else { + // wait for the specified time + while (remaining > 0) { + InvokeMethodResult invokeMethodResult; + synchronized (this) { + invokeMethodResult = this.responseTable[responseID]; + } + + if (invokeMethodResult != null) { + this.lastResponseID = null; + return invokeMethodResult.result; + } + else { + this.lock.lock(); + try { + this.responseCondition.await(remaining, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + Thread.currentThread() + .interrupt(); + throw new IOException("Response timed out.", e); + } finally { + this.lock.unlock(); + } + } + + remaining = endTime - System.currentTimeMillis(); + } + } + + // only get here if we timeout throw new TimeoutException("Response timed out."); diff --git a/Dorkbox-Network/src/dorkbox/network/rmi/RemoteObject.java b/Dorkbox-Network/src/dorkbox/network/rmi/RemoteObject.java index 62a7023c..3f76c464 100644 --- a/Dorkbox-Network/src/dorkbox/network/rmi/RemoteObject.java +++ b/Dorkbox-Network/src/dorkbox/network/rmi/RemoteObject.java @@ -44,7 +44,7 @@ import dorkbox.network.connection.Connection; public interface RemoteObject { /** - * Sets the milliseconds to wait for a method to return value. Default is 3000. + * Sets the milliseconds to wait for a method to return value. Default is 3000, 0 disables (ie: waits forever) */ void setResponseTimeout(int timeoutMillis); diff --git a/Dorkbox-Network/src/dorkbox/network/rmi/RemoteObjectSerializer.java b/Dorkbox-Network/src/dorkbox/network/rmi/RemoteObjectSerializer.java index a15f4266..63dea2b1 100644 --- a/Dorkbox-Network/src/dorkbox/network/rmi/RemoteObjectSerializer.java +++ b/Dorkbox-Network/src/dorkbox/network/rmi/RemoteObjectSerializer.java @@ -40,7 +40,6 @@ import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; import dorkbox.network.connection.ConnectionImpl; import dorkbox.network.connection.KryoExtra; -import dorkbox.util.exceptions.NetException; /** * Serializes an object registered with the RmiBridge so the receiving side @@ -62,7 +61,7 @@ class RemoteObjectSerializer extends Serializer { KryoExtra kryoExtra = (KryoExtra) kryo; int id = kryoExtra.connection.getRegisteredId(object); if (id == Integer.MAX_VALUE) { - throw new NetException("Object not found in an ObjectSpace: " + object); + throw new RuntimeException("Object not found in RMI objectSpace: " + object); } output.writeInt(id, true); diff --git a/Dorkbox-Network/src/dorkbox/network/rmi/RmiBridge.java b/Dorkbox-Network/src/dorkbox/network/rmi/RmiBridge.java index 514ea9fc..f2aaf328 100644 --- a/Dorkbox-Network/src/dorkbox/network/rmi/RmiBridge.java +++ b/Dorkbox-Network/src/dorkbox/network/rmi/RmiBridge.java @@ -34,6 +34,15 @@ */ package dorkbox.network.rmi; +import com.esotericsoftware.kryo.util.IntMap; +import dorkbox.network.connection.Connection; +import dorkbox.network.connection.ConnectionImpl; +import dorkbox.network.connection.EndPoint; +import dorkbox.network.connection.ListenerRaw; +import dorkbox.util.collections.ObjectIntMap; +import org.slf4j.Logger; + +import java.io.IOException; import java.lang.reflect.Proxy; import java.util.Arrays; import java.util.concurrent.Executor; @@ -42,29 +51,31 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; -import org.slf4j.Logger; - -import com.esotericsoftware.kryo.util.IntMap; - -import dorkbox.network.connection.Connection; -import dorkbox.network.connection.ConnectionImpl; -import dorkbox.network.connection.EndPoint; -import dorkbox.network.connection.ListenerRaw; -import dorkbox.util.collections.ObjectIntMap; -import dorkbox.util.exceptions.NetException; -import dorkbox.util.objectPool.ObjectPool; -import dorkbox.util.objectPool.ObjectPoolFactory; - /** * Allows methods on objects to be invoked remotely over TCP, UDP, or UDT. Objects are * {@link dorkbox.network.util.RMISerializationManager#registerRemote(Class, Class)}, and endpoint connections - * can then {@link Connection#createRemoteObject(Class, Class)} for the registered objects. + * can then {@link Connection#createRemoteObject(Class)} for the registered objects. *

* It costs at least 2 bytes more to use remote method invocation than just * sending the parameters. If the method has a return value which is not * {@link RemoteObject#setNonBlocking(boolean) ignored}, an extra byte is * written. If the type of a parameter is not final (note that primitives are final) * then an extra byte is written for that parameter. + *

+ *

+ * In situations where we want to pass in the Connection (to an RMI method), we have to be able to override method A, with method B. + *

+ * This is to support calling RMI methods from an interface (that does pass the connection reference) to + * an implementation, that DOES pass the connection reference. The remote side (that initiates the RMI calls), MUST use + * the interface, and the implementation may override the method, so that we add the connection as the first in + * the list of parameters. + *

+ * for example: + * Interface: foo(String x) + * Impl: foo(Connection c, String x) + *

+ * The implementation (if it exists, with the same name, and with the same signature+connection) will be called from the interface. + * This MUST hold valid for both remote and local connection types. * * @author Nathan Sweet , Nathan Robinson */ @@ -101,16 +112,16 @@ class RmiBridge { private final Executor executor; - // 4096 concurrent method invocations max - private static final ObjectPool invokeMethodPool = ObjectPoolFactory.create(new InvokeMethodPoolable(), 4096); - private final ListenerRaw invokeListener = new ListenerRaw() { + @SuppressWarnings("AutoBoxing") @Override public void received(final ConnectionImpl connection, final InvokeMethod invokeMethod) { int objectID = invokeMethod.objectID; // have to make sure to get the correct object (global vs local) + // This is what is overridden when registering interfaces/classes for RMI. + // objectID is the interface ID, and this returns the implementation ID. final Object target = connection.getRegisteredObject(objectID); if (target == null) { @@ -124,20 +135,60 @@ class RmiBridge { Executor executor2 = RmiBridge.this.executor; if (executor2 == null) { - invoke(connection, target, invokeMethod); + try { + invoke(connection, target, invokeMethod); + } catch (IOException e) { + logger.error("Unable to invoke method.", e); + } } else { executor2.execute(new Runnable() { @Override public void run() { - invoke(connection, target, invokeMethod); + try { + invoke(connection, target, invokeMethod); + } catch (IOException e) { + logger.error("Unable to invoke method.", e); + } } }); } } }; + + //for (int i = 0; i < cachedMethods.length; i++) { + // Method cachedMethod = cachedMethods[i].method; + // String name = cachedMethod.getName(); + // Class[] types = cachedMethod.getParameterTypes(); + // int modLength = types.length + 1; + // + // for (int j = i+1; j < cachedMethods.length; j++) { + // Method checkMethod = cachedMethods[j].method; + // String checkName = checkMethod.getName(); + // Class[] checkTypes = cachedMethod.getParameterTypes(); + // int checkLength = checkTypes.length; + // + // if (modLength != checkLength || !(name.equals(checkName))) { + // break; + // } + // + // // checkLength > 0 + // Class checkType = checkTypes[0]; + // if (!checkType.isAssignableFrom(com.sun.jdi.connect.spi.Connection.class)) { + // break; + // } + // + // // now we check to see if our "check" method is equal to our "cached" method + Connection + // + // } + //} + // + + + + /** * Creates an RmiBridge with no connections. Connections must be * {@link RmiBridge#register(int, Object)} added to allow the remote end of @@ -179,7 +230,9 @@ class RmiBridge { * @param connection The remote side of this connection requested the invocation. */ protected - void invoke(Connection connection, Object target, InvokeMethod invokeMethod) { + void invoke(final Connection connection, final Object target, final InvokeMethod invokeMethod) throws IOException { + CachedMethod cachedMethod = invokeMethod.cachedMethod; + Logger logger2 = this.logger; if (logger2.isDebugEnabled()) { String argString = ""; @@ -197,29 +250,25 @@ class RmiBridge { stringBuilder.append(":") .append(invokeMethod.objectID); stringBuilder.append("#") - .append(invokeMethod.cachedMethod.method.getName()); + .append(cachedMethod.method.getName()); stringBuilder.append("(") .append(argString) .append(")"); + + if (cachedMethod.origMethod != null) { + stringBuilder.append(" [Connection param override]"); + } logger2.debug(stringBuilder.toString()); } - byte responseData = invokeMethod.responseData; boolean transmitReturnVal = (responseData & returnValueMask) == returnValueMask; boolean transmitExceptions = (responseData & returnExceptionMask) == returnExceptionMask; int responseID = responseData & responseIdMask; Object result; - CachedMethod cachedMethod = invokeMethod.cachedMethod; - - // we have to provide access to the connection (since the RMI-server is generally going to keep the state in - // the connection object. - connection.getEndPoint() - .setCurrentConnection(connection); - try { - result = cachedMethod.invoke(target, invokeMethod.args); + result = cachedMethod.invoke(connection, target, invokeMethod.args); } catch (Exception ex) { if (transmitExceptions) { Throwable cause = ex.getCause(); @@ -236,8 +285,8 @@ class RmiBridge { result = cause; } else { - throw new NetException("Error invoking method: " + cachedMethod.method.getDeclaringClass() - .getName() + "." + cachedMethod.method.getName(), ex); + throw new IOException("Error invoking method: " + cachedMethod.method.getDeclaringClass() + .getName() + "." + cachedMethod.method.getName(), ex); } } @@ -275,7 +324,7 @@ class RmiBridge { int value = rmiObjectIdCounter.getAndAdd(2); if (value > MAX_RMI_VALUE) { rmiObjectIdCounter.set(MAX_RMI_VALUE); // prevent wrapping by spammy callers - throw new NetException("RMI next value has exceeded maximum limits."); + logger.error("RMI next value has exceeded maximum limits in RmiBridge!"); } return value; } @@ -286,6 +335,7 @@ class RmiBridge { * * @param objectID Must not be Integer.MAX_VALUE. */ + @SuppressWarnings("AutoBoxing") public void register(int objectID, Object object) { if (objectID == Integer.MAX_VALUE) { @@ -307,12 +357,12 @@ class RmiBridge { if (logger2.isTraceEnabled()) { logger2.trace("Object registered with ObjectSpace as {}:{}", objectID, object); } - logger2.info("Object registered with ObjectSpace as {}:{}", objectID, object); } /** * Removes an object. The remote end of the RmiBridge connection will no longer be able to access it. */ + @SuppressWarnings("AutoBoxing") public void remove(int objectID) { WriteLock writeLock = RmiBridge.this.objectLock.writeLock(); @@ -334,6 +384,7 @@ class RmiBridge { /** * Removes an object. The remote end of the RmiBridge connection will no longer be able to access it. */ + @SuppressWarnings("AutoBoxing") public void remove(Object object) { WriteLock writeLock = RmiBridge.this.objectLock.writeLock(); @@ -395,7 +446,7 @@ class RmiBridge { return (RemoteObject) Proxy.newProxyInstance(RmiBridge.class.getClassLoader(), temp, - new RemoteInvocationHandler(invokeMethodPool, connection, objectID)); + new RemoteInvocationHandler(connection, objectID)); } /** diff --git a/Dorkbox-Network/src/dorkbox/network/rmi/TimeoutException.java b/Dorkbox-Network/src/dorkbox/network/rmi/TimeoutException.java index f30fba80..1bfb0843 100644 --- a/Dorkbox-Network/src/dorkbox/network/rmi/TimeoutException.java +++ b/Dorkbox-Network/src/dorkbox/network/rmi/TimeoutException.java @@ -34,13 +34,13 @@ */ package dorkbox.network.rmi; -import dorkbox.util.exceptions.NetException; +import java.io.IOException; /** Thrown when a method with a return value is invoked on a remote object and the response is not received with the * {@link RemoteObject#setResponseTimeout(int) response timeout}. * @see dorkbox.network.connection.Connection#createRemoteObject(Class, Class) * @author Nathan Sweet */ -public class TimeoutException extends NetException { +public class TimeoutException extends IOException { private static final long serialVersionUID = -3526277240277423682L; public TimeoutException () { diff --git a/Dorkbox-Network/src/dorkbox/network/util/CryptoSerializationManager.java b/Dorkbox-Network/src/dorkbox/network/util/CryptoSerializationManager.java index 533a75fe..13d4fb74 100644 --- a/Dorkbox-Network/src/dorkbox/network/util/CryptoSerializationManager.java +++ b/Dorkbox-Network/src/dorkbox/network/util/CryptoSerializationManager.java @@ -19,6 +19,8 @@ import dorkbox.network.connection.ConnectionImpl; import dorkbox.util.SerializationManager; import io.netty.buffer.ByteBuf; +import java.io.IOException; + /** * Threads reading/writing, it messes up a single instance. * it is possible to use a single kryo with the use of synchronize, however - that defeats the point of multi-threaded @@ -27,24 +29,18 @@ public interface CryptoSerializationManager extends SerializationManager, RMISerializationManager { /** - * Determines if this buffer is encrypted or not. + * Waits until a kryo is available to write, using CAS operations to prevent having to synchronize. + *

+ * There is a small speed penalty if there were no kryo's available to use. */ - boolean isEncrypted(ByteBuf buffer); - + void writeWithCryptoTcp(ConnectionImpl connection, ByteBuf buffer, Object message) throws IOException; /** * Waits until a kryo is available to write, using CAS operations to prevent having to synchronize. *

* There is a small speed penalty if there were no kryo's available to use. */ - void writeWithCryptoTcp(ConnectionImpl connection, ByteBuf buffer, Object message); - - /** - * Waits until a kryo is available to write, using CAS operations to prevent having to synchronize. - *

- * There is a small speed penalty if there were no kryo's available to use. - */ - void writeWithCryptoUdp(ConnectionImpl connection, ByteBuf buffer, Object message); + void writeWithCryptoUdp(ConnectionImpl connection, ByteBuf buffer, Object message) throws IOException; /** * Reads an object from the buffer. @@ -54,7 +50,7 @@ interface CryptoSerializationManager extends SerializationManager, RMISerializat * @param connection can be NULL * @param length should ALWAYS be the length of the expected object! */ - Object readWithCryptoTcp(ConnectionImpl connection, ByteBuf buffer, int length); + Object readWithCryptoTcp(ConnectionImpl connection, ByteBuf buffer, int length) throws IOException; /** * Reads an object from the buffer. @@ -64,5 +60,5 @@ interface CryptoSerializationManager extends SerializationManager, RMISerializat * @param connection can be NULL * @param length should ALWAYS be the length of the expected object! */ - Object readWithCryptoUdp(ConnectionImpl connection, ByteBuf buffer, int length); + Object readWithCryptoUdp(ConnectionImpl connection, ByteBuf buffer, int length) throws IOException; } diff --git a/Dorkbox-Network/test/dorkbox/network/ChunkedDataIdleTest.java b/Dorkbox-Network/test/dorkbox/network/ChunkedDataIdleTest.java index 80d0271f..c67cb48d 100644 --- a/Dorkbox-Network/test/dorkbox/network/ChunkedDataIdleTest.java +++ b/Dorkbox-Network/test/dorkbox/network/ChunkedDataIdleTest.java @@ -28,7 +28,7 @@ public class ChunkedDataIdleTest extends BaseTest { // have to test sending objects @Test - public void ObjectSender() throws InitializationException, SecurityException, IOException { + public void ObjectSender() throws InitializationException, SecurityException, IOException, InterruptedException { KryoCryptoSerializationManager.DEFAULT = KryoCryptoSerializationManager.DEFAULT(); register(KryoCryptoSerializationManager.DEFAULT); @@ -62,7 +62,7 @@ public class ChunkedDataIdleTest extends BaseTest { private void sendObject(final Data mainData, Configuration configuration, final ConnectionType type) - throws InitializationException, SecurityException, IOException { + throws InitializationException, SecurityException, IOException, InterruptedException { Server server = new Server(configuration); server.disableRemoteKeyValidation(); diff --git a/Dorkbox-Network/test/dorkbox/network/ClientSendTest.java b/Dorkbox-Network/test/dorkbox/network/ClientSendTest.java index 65dd3266..ef732abc 100644 --- a/Dorkbox-Network/test/dorkbox/network/ClientSendTest.java +++ b/Dorkbox-Network/test/dorkbox/network/ClientSendTest.java @@ -21,7 +21,7 @@ class ClientSendTest extends BaseTest { @Test public - void sendDataFromClientClass() throws InitializationException, SecurityException, IOException { + void sendDataFromClientClass() throws InitializationException, SecurityException, IOException, InterruptedException { KryoCryptoSerializationManager.DEFAULT = KryoCryptoSerializationManager.DEFAULT(); register(KryoCryptoSerializationManager.DEFAULT); diff --git a/Dorkbox-Network/test/dorkbox/network/ConnectionTest.java b/Dorkbox-Network/test/dorkbox/network/ConnectionTest.java index 0629e987..45d93e41 100644 --- a/Dorkbox-Network/test/dorkbox/network/ConnectionTest.java +++ b/Dorkbox-Network/test/dorkbox/network/ConnectionTest.java @@ -20,7 +20,7 @@ class ConnectionTest extends BaseTest { @Test public - void connectLocal() throws InitializationException, SecurityException, IOException { + void connectLocal() throws InitializationException, SecurityException, IOException, InterruptedException { KryoCryptoSerializationManager.DEFAULT = KryoCryptoSerializationManager.DEFAULT(); register(KryoCryptoSerializationManager.DEFAULT); @@ -37,7 +37,7 @@ class ConnectionTest extends BaseTest { @Test public - void connectTcp() throws InitializationException, SecurityException, IOException { + void connectTcp() throws InitializationException, SecurityException, IOException, InterruptedException { KryoCryptoSerializationManager.DEFAULT = KryoCryptoSerializationManager.DEFAULT(); register(KryoCryptoSerializationManager.DEFAULT); @@ -56,7 +56,7 @@ class ConnectionTest extends BaseTest { @Test public - void connectTcpUdp() throws InitializationException, SecurityException, IOException { + void connectTcpUdp() throws InitializationException, SecurityException, IOException, InterruptedException { KryoCryptoSerializationManager.DEFAULT = KryoCryptoSerializationManager.DEFAULT(); register(KryoCryptoSerializationManager.DEFAULT); @@ -76,7 +76,7 @@ class ConnectionTest extends BaseTest { @Test public - void connectTcpUdt() throws InitializationException, SecurityException, IOException { + void connectTcpUdt() throws InitializationException, SecurityException, IOException, InterruptedException { KryoCryptoSerializationManager.DEFAULT = KryoCryptoSerializationManager.DEFAULT(); register(KryoCryptoSerializationManager.DEFAULT); @@ -96,7 +96,7 @@ class ConnectionTest extends BaseTest { @Test public - void connectTcpUdpUdt() throws InitializationException, SecurityException, IOException { + void connectTcpUdpUdt() throws InitializationException, SecurityException, IOException, InterruptedException { KryoCryptoSerializationManager.DEFAULT = KryoCryptoSerializationManager.DEFAULT(); register(KryoCryptoSerializationManager.DEFAULT); @@ -151,7 +151,7 @@ class ConnectionTest extends BaseTest { } private - Client startClient(Configuration configuration) throws InitializationException, SecurityException, IOException { + Client startClient(Configuration configuration) throws InitializationException, SecurityException, IOException, InterruptedException { Client client; if (configuration != null) { client = new Client(configuration); diff --git a/Dorkbox-Network/test/dorkbox/network/DiscoverHostTest.java b/Dorkbox-Network/test/dorkbox/network/DiscoverHostTest.java index 6ec7608f..94956746 100644 --- a/Dorkbox-Network/test/dorkbox/network/DiscoverHostTest.java +++ b/Dorkbox-Network/test/dorkbox/network/DiscoverHostTest.java @@ -17,7 +17,7 @@ class DiscoverHostTest extends BaseTest { @Test public - void broadcast() throws InitializationException, SecurityException, IOException { + void broadcast() throws InitializationException, SecurityException, IOException, InterruptedException { Configuration configuration = new Configuration(); configuration.tcpPort = tcpPort; diff --git a/Dorkbox-Network/test/dorkbox/network/IdleTest.java b/Dorkbox-Network/test/dorkbox/network/IdleTest.java index b915a967..19d7f4e1 100644 --- a/Dorkbox-Network/test/dorkbox/network/IdleTest.java +++ b/Dorkbox-Network/test/dorkbox/network/IdleTest.java @@ -32,7 +32,7 @@ class IdleTest extends BaseTest { @Test public - void InputStreamSender() throws InitializationException, SecurityException, IOException { + void InputStreamSender() throws InitializationException, SecurityException, IOException, InterruptedException { KryoCryptoSerializationManager.DEFAULT = KryoCryptoSerializationManager.DEFAULT(false, false); final int largeDataSize = 12345; @@ -65,7 +65,7 @@ class IdleTest extends BaseTest { // have to test sending objects @Test public - void ObjectSender() throws InitializationException, SecurityException, IOException { + void ObjectSender() throws InitializationException, SecurityException, IOException, InterruptedException { KryoCryptoSerializationManager.DEFAULT = KryoCryptoSerializationManager.DEFAULT(); register(KryoCryptoSerializationManager.DEFAULT); @@ -100,7 +100,7 @@ class IdleTest extends BaseTest { private void sendObject(final Data mainData, Configuration configuration, final ConnectionType type) - throws InitializationException, SecurityException, IOException { + throws InitializationException, SecurityException, IOException, InterruptedException { Server server = new Server(configuration); server.disableRemoteKeyValidation(); @@ -160,7 +160,7 @@ class IdleTest extends BaseTest { private void streamSpecificType(final int largeDataSize, Configuration configuration, final ConnectionType type) - throws InitializationException, SecurityException, IOException { + throws InitializationException, SecurityException, IOException, InterruptedException { Server server = new Server(configuration); server.disableRemoteKeyValidation(); addEndPoint(server); diff --git a/Dorkbox-Network/test/dorkbox/network/LargeBufferTest.java b/Dorkbox-Network/test/dorkbox/network/LargeBufferTest.java index 76f2ba1f..1d289b22 100644 --- a/Dorkbox-Network/test/dorkbox/network/LargeBufferTest.java +++ b/Dorkbox-Network/test/dorkbox/network/LargeBufferTest.java @@ -25,7 +25,7 @@ class LargeBufferTest extends BaseTest { @Test public - void manyLargeMessages() throws InitializationException, SecurityException, IOException { + void manyLargeMessages() throws InitializationException, SecurityException, IOException, InterruptedException { KryoCryptoSerializationManager.DEFAULT = KryoCryptoSerializationManager.DEFAULT(); register(KryoCryptoSerializationManager.DEFAULT); diff --git a/Dorkbox-Network/test/dorkbox/network/ListenerTest.java b/Dorkbox-Network/test/dorkbox/network/ListenerTest.java index 4fa96fa1..e1ded5d3 100644 --- a/Dorkbox-Network/test/dorkbox/network/ListenerTest.java +++ b/Dorkbox-Network/test/dorkbox/network/ListenerTest.java @@ -61,7 +61,7 @@ class ListenerTest extends BaseTest { @SuppressWarnings("rawtypes") @Test public - void listener() throws SecurityException, InitializationException, IOException { + void listener() throws SecurityException, InitializationException, IOException, InterruptedException { Configuration configuration = new Configuration(); configuration.tcpPort = tcpPort; configuration.host = host; diff --git a/Dorkbox-Network/test/dorkbox/network/MultipleServerTest.java b/Dorkbox-Network/test/dorkbox/network/MultipleServerTest.java index ec776840..0c00f3e7 100644 --- a/Dorkbox-Network/test/dorkbox/network/MultipleServerTest.java +++ b/Dorkbox-Network/test/dorkbox/network/MultipleServerTest.java @@ -18,7 +18,7 @@ class MultipleServerTest extends BaseTest { @Test public - void multipleServers() throws InitializationException, SecurityException, IOException { + void multipleServers() throws InitializationException, SecurityException, IOException, InterruptedException { KryoCryptoSerializationManager.DEFAULT = KryoCryptoSerializationManager.DEFAULT(); KryoCryptoSerializationManager.DEFAULT.register(String[].class); diff --git a/Dorkbox-Network/test/dorkbox/network/MultipleThreadTest.java b/Dorkbox-Network/test/dorkbox/network/MultipleThreadTest.java index 920a2059..cce259c8 100644 --- a/Dorkbox-Network/test/dorkbox/network/MultipleThreadTest.java +++ b/Dorkbox-Network/test/dorkbox/network/MultipleThreadTest.java @@ -37,7 +37,7 @@ class MultipleThreadTest extends BaseTest { @Test public - void multipleThreads() throws InitializationException, SecurityException, IOException { + void multipleThreads() throws InitializationException, SecurityException, IOException, InterruptedException { KryoCryptoSerializationManager.DEFAULT = KryoCryptoSerializationManager.DEFAULT(); KryoCryptoSerializationManager.DEFAULT.register(String[].class); KryoCryptoSerializationManager.DEFAULT.register(DataClass.class); diff --git a/Dorkbox-Network/test/dorkbox/network/PingPongLocalTest.java b/Dorkbox-Network/test/dorkbox/network/PingPongLocalTest.java index bfd72167..6a19f8af 100644 --- a/Dorkbox-Network/test/dorkbox/network/PingPongLocalTest.java +++ b/Dorkbox-Network/test/dorkbox/network/PingPongLocalTest.java @@ -2,6 +2,7 @@ package dorkbox.network; import dorkbox.network.connection.Connection; +import dorkbox.network.connection.KryoCryptoSerializationManager; import dorkbox.network.connection.Listener; import dorkbox.network.util.CryptoSerializationManager; import dorkbox.util.exceptions.InitializationException; @@ -19,7 +20,10 @@ public class PingPongLocalTest extends BaseTest { int tries = 10000; @Test - public void pingPongLocal() throws InitializationException, SecurityException, IOException { + public void pingPongLocal() throws InitializationException, SecurityException, IOException, InterruptedException { + KryoCryptoSerializationManager.DEFAULT = KryoCryptoSerializationManager.DEFAULT(); + register(KryoCryptoSerializationManager.DEFAULT); + this.fail = "Data not received."; final Data dataLOCAL = new Data(); @@ -28,7 +32,6 @@ public class PingPongLocalTest extends BaseTest { Server server = new Server(); server.disableRemoteKeyValidation(); addEndPoint(server); - register(server.getSerialization()); server.bind(false); server.listeners().add(new Listener() { @Override @@ -52,7 +55,6 @@ public class PingPongLocalTest extends BaseTest { Client client = new Client(); client.disableRemoteKeyValidation(); addEndPoint(client); - register(client.getSerialization()); client.listeners().add(new Listener() { AtomicInteger check = new AtomicInteger(0); diff --git a/Dorkbox-Network/test/dorkbox/network/PingPongTest.java b/Dorkbox-Network/test/dorkbox/network/PingPongTest.java index 48ba7601..f635e644 100644 --- a/Dorkbox-Network/test/dorkbox/network/PingPongTest.java +++ b/Dorkbox-Network/test/dorkbox/network/PingPongTest.java @@ -30,7 +30,7 @@ class PingPongTest extends BaseTest { @Test public - void pingPong() throws InitializationException, SecurityException, IOException { + void pingPong() throws InitializationException, SecurityException, IOException, InterruptedException { KryoCryptoSerializationManager.DEFAULT = KryoCryptoSerializationManager.DEFAULT(); register(KryoCryptoSerializationManager.DEFAULT); diff --git a/Dorkbox-Network/test/dorkbox/network/PingTest.java b/Dorkbox-Network/test/dorkbox/network/PingTest.java index 170fe398..8e5b6787 100644 --- a/Dorkbox-Network/test/dorkbox/network/PingTest.java +++ b/Dorkbox-Network/test/dorkbox/network/PingTest.java @@ -21,7 +21,7 @@ class PingTest extends BaseTest { // ping prefers the following order: UDP, UDT, TCP @Test public - void pingTCP() throws InitializationException, SecurityException, IOException { + void pingTCP() throws InitializationException, SecurityException, IOException, InterruptedException { this.response = -1; Configuration configuration = new Configuration(); @@ -57,7 +57,7 @@ class PingTest extends BaseTest { @Test public - void pingTCP_testListeners1() throws InitializationException, SecurityException, IOException { + void pingTCP_testListeners1() throws InitializationException, SecurityException, IOException, InterruptedException { this.response = -1; Configuration configuration = new Configuration(); @@ -114,7 +114,7 @@ class PingTest extends BaseTest { @Test public - void pingTCP_testListeners2() throws InitializationException, SecurityException, IOException { + void pingTCP_testListeners2() throws InitializationException, SecurityException, IOException, InterruptedException { this.response = -1; Configuration configuration = new Configuration(); @@ -163,7 +163,7 @@ class PingTest extends BaseTest { // ping prefers the following order: UDP, UDT, TCP @Test public - void pingUDP() throws InitializationException, SecurityException, IOException { + void pingUDP() throws InitializationException, SecurityException, IOException, InterruptedException { this.response = -1; Configuration configuration = new Configuration(); @@ -204,7 +204,7 @@ class PingTest extends BaseTest { // ping prefers the following order: UDP, UDT, TCP @Test public - void pingUDT() throws InitializationException, SecurityException, IOException { + void pingUDT() throws InitializationException, SecurityException, IOException, InterruptedException { this.response = -1; Configuration configuration = new Configuration(); diff --git a/Dorkbox-Network/test/dorkbox/network/ReconnectTest.java b/Dorkbox-Network/test/dorkbox/network/ReconnectTest.java index c3680ca9..81fabd34 100644 --- a/Dorkbox-Network/test/dorkbox/network/ReconnectTest.java +++ b/Dorkbox-Network/test/dorkbox/network/ReconnectTest.java @@ -19,7 +19,7 @@ class ReconnectTest extends BaseTest { @Test public - void reconnect() throws InitializationException, SecurityException, IOException { + void reconnect() throws InitializationException, SecurityException, IOException, InterruptedException { final Timer timer = new Timer(); Configuration configuration = new Configuration(); @@ -67,7 +67,11 @@ class ReconnectTest extends BaseTest { public void run() { System.out.println("Reconnecting: " + reconnectCount.get()); - client.reconnect(); + try { + client.reconnect(); + } catch (IOException e) { + e.printStackTrace(); + } } }.start(); } diff --git a/Dorkbox-Network/test/dorkbox/network/ReuseTest.java b/Dorkbox-Network/test/dorkbox/network/ReuseTest.java index 98cc06f9..5a995f65 100644 --- a/Dorkbox-Network/test/dorkbox/network/ReuseTest.java +++ b/Dorkbox-Network/test/dorkbox/network/ReuseTest.java @@ -19,7 +19,7 @@ class ReuseTest extends BaseTest { @Test public - void socketReuse() throws InitializationException, SecurityException, IOException { + void socketReuse() throws InitializationException, SecurityException, IOException, InterruptedException { this.serverCount = new AtomicInteger(0); this.clientCount = new AtomicInteger(0); @@ -99,7 +99,7 @@ class ReuseTest extends BaseTest { @Test public - void localReuse() throws InitializationException, SecurityException, IOException { + void localReuse() throws InitializationException, SecurityException, IOException, InterruptedException { this.serverCount = new AtomicInteger(0); this.clientCount = new AtomicInteger(0); diff --git a/Dorkbox-Network/test/dorkbox/network/UnregisteredClassTest.java b/Dorkbox-Network/test/dorkbox/network/UnregisteredClassTest.java index c287add2..fce82234 100644 --- a/Dorkbox-Network/test/dorkbox/network/UnregisteredClassTest.java +++ b/Dorkbox-Network/test/dorkbox/network/UnregisteredClassTest.java @@ -26,7 +26,7 @@ class UnregisteredClassTest extends BaseTest { @Test public - void unregisteredClasses() throws InitializationException, SecurityException, IOException { + void unregisteredClasses() throws InitializationException, SecurityException, IOException, InterruptedException { KryoCryptoSerializationManager.DEFAULT = KryoCryptoSerializationManager.DEFAULT(false, false); int origSize = EndPoint.udpMaxSize; diff --git a/Dorkbox-Network/test/dorkbox/network/rmi/RmiGlobalTest.java b/Dorkbox-Network/test/dorkbox/network/rmi/RmiGlobalTest.java index 05975a24..1d8649c7 100644 --- a/Dorkbox-Network/test/dorkbox/network/rmi/RmiGlobalTest.java +++ b/Dorkbox-Network/test/dorkbox/network/rmi/RmiGlobalTest.java @@ -17,8 +17,7 @@ import org.junit.Test; import java.io.IOException; import java.io.Serializable; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; public class RmiGlobalTest extends BaseTest { @@ -26,8 +25,8 @@ class RmiGlobalTest extends BaseTest { private int CLIENT_GLOBAL_OBJECT_ID = 0; private int SERVER_GLOBAL_OBJECT_ID = 0; - private TestObject globalRemoteServerObject = new TestObjectImpl(); - private TestObject globalRemoteClientObject = new TestObjectImpl(); + private final TestObject globalRemoteServerObject = new TestObjectImpl(); + private final TestObject globalRemoteClientObject = new TestObjectImpl(); private static void runTest(final Connection connection, final Object remoteObject, final int remoteObjectID) { @@ -35,107 +34,112 @@ class RmiGlobalTest extends BaseTest { @Override public void run() { - TestObject test = connection.getRemoteObject(remoteObjectID); - - System.err.println("Starting test for: " + remoteObjectID); - - //TestObject test = connection.getRemoteObject(id, TestObject.class); - assertEquals(remoteObject.hashCode(), test.hashCode()); - RemoteObject remoteObject = (RemoteObject) test; - - // Default behavior. RMI is transparent, method calls behave like normal - // (return values and exceptions are returned, call is synchronous) - System.err.println("hashCode: " + test.hashCode()); - System.err.println("toString: " + test); - test.moo(); - test.moo("Cow"); - assertEquals(remoteObjectID, test.id()); - - - // UDP calls that ignore the return value - remoteObject.setUDP(true); - test.moo("Meow"); - assertEquals(0, test.id()); - remoteObject.setUDP(false); - - - // Test that RMI correctly waits for the remotely invoked method to exit - remoteObject.setResponseTimeout(5000); - test.moo("You should see this two seconds before...", 2000); - System.out.println("...This"); - remoteObject.setResponseTimeout(3000); - - // Try exception handling - boolean caught = false; try { + TestObject test = connection.getRemoteObject(remoteObjectID); + + System.err.println("Starting test for: " + remoteObjectID); + + //TestObject test = connection.getRemoteObject(id, TestObject.class); + assertEquals(remoteObject.hashCode(), test.hashCode()); + RemoteObject remoteObject = (RemoteObject) test; + + // Default behavior. RMI is transparent, method calls behave like normal + // (return values and exceptions are returned, call is synchronous) + System.err.println("hashCode: " + test.hashCode()); + System.err.println("toString: " + test); + test.moo(); + test.moo("Cow"); + assertEquals(remoteObjectID, test.id()); + + + // UDP calls that ignore the return value + remoteObject.setUDP(true); + test.moo("Meow"); + assertEquals(0, test.id()); + remoteObject.setUDP(false); + + + // Test that RMI correctly waits for the remotely invoked method to exit + remoteObject.setResponseTimeout(5000); + test.moo("You should see this two seconds before...", 2000); + System.out.println("...This"); + remoteObject.setResponseTimeout(3000); + + // Try exception handling + boolean caught = false; + try { + test.throwException(); + } catch (UnsupportedOperationException ex) { + System.err.println("\tExpected."); + caught = true; + } + assertTrue(caught); + + // Return values are ignored, but exceptions are still dealt with properly + + remoteObject.setTransmitReturnValue(false); + test.moo("Baa"); + test.id(); + caught = false; + try { + test.throwException(); + } catch (UnsupportedOperationException ex) { + caught = true; + } + assertTrue(caught); + + // Non-blocking call that ignores the return value + remoteObject.setNonBlocking(true); + remoteObject.setTransmitReturnValue(false); + test.moo("Meow"); + assertEquals(0, test.id()); + + // Non-blocking call that returns the return value + remoteObject.setTransmitReturnValue(true); + test.moo("Foo"); + + assertEquals(0, test.id()); + // wait for the response to id() + assertEquals(remoteObjectID, remoteObject.waitForLastResponse()); + + assertEquals(0, test.id()); + byte responseID = remoteObject.getLastResponseID(); + // wait for the response to id() + assertEquals(remoteObjectID, remoteObject.waitForResponse(responseID)); + + // Non-blocking call that errors out + remoteObject.setTransmitReturnValue(false); test.throwException(); - } catch (UnsupportedOperationException ex) { - System.err.println("\tExpected."); - caught = true; + assertEquals(remoteObject.waitForLastResponse() + .getClass(), UnsupportedOperationException.class); + + // Call will time out if non-blocking isn't working properly + remoteObject.setTransmitExceptions(false); + test.moo("Mooooooooo", 3000); + + + // should wait for a small time + remoteObject.setTransmitReturnValue(true); + remoteObject.setNonBlocking(false); + remoteObject.setResponseTimeout(6000); + System.out.println("You should see this 2 seconds before"); + float slow = test.slow(); + System.out.println("...This"); + assertEquals(123f, slow, .0001D); + + + // Test sending a reference to a remote object. + MessageWithTestObject m = new MessageWithTestObject(); + m.number = 678; + m.text = "sometext"; + m.testObject = test; + connection.send() + .TCP(m) + .flush(); + } catch (IOException e) { + e.printStackTrace(); + fail(); } - assertTrue(caught); - - // Return values are ignored, but exceptions are still dealt with properly - - remoteObject.setTransmitReturnValue(false); - test.moo("Baa"); - test.id(); - caught = false; - try { - test.throwException(); - } catch (UnsupportedOperationException ex) { - caught = true; - } - assertTrue(caught); - - // Non-blocking call that ignores the return value - remoteObject.setNonBlocking(true); - remoteObject.setTransmitReturnValue(false); - test.moo("Meow"); - assertEquals(0, test.id()); - - // Non-blocking call that returns the return value - remoteObject.setTransmitReturnValue(true); - test.moo("Foo"); - - assertEquals(0, test.id()); - // wait for the response to id() - assertEquals(remoteObjectID, remoteObject.waitForLastResponse()); - - assertEquals(0, test.id()); - byte responseID = remoteObject.getLastResponseID(); - // wait for the response to id() - assertEquals(remoteObjectID, remoteObject.waitForResponse(responseID)); - - // Non-blocking call that errors out - remoteObject.setTransmitReturnValue(false); - test.throwException(); - assertEquals(remoteObject.waitForLastResponse() - .getClass(), UnsupportedOperationException.class); - - // Call will time out if non-blocking isn't working properly - remoteObject.setTransmitExceptions(false); - test.moo("Mooooooooo", 3000); - - - // should wait for a small time - remoteObject.setTransmitReturnValue(true); - remoteObject.setNonBlocking(false); - remoteObject.setResponseTimeout(6000); - System.out.println("You should see this 2 seconds before"); - float slow = test.slow(); - System.out.println("...This"); - assertEquals(123f, slow, .0001D); - - - // Test sending a reference to a remote object. - MessageWithTestObject m = new MessageWithTestObject(); - m.number = 678; - m.text = "sometext"; - m.testObject = test; - connection.send() - .TCP(m) - .flush(); } }.start(); } @@ -154,7 +158,7 @@ class RmiGlobalTest extends BaseTest { @Test public - void rmi() throws InitializationException, SecurityException, IOException { + void rmi() throws InitializationException, SecurityException, IOException, InterruptedException { KryoCryptoSerializationManager.DEFAULT = KryoCryptoSerializationManager.DEFAULT(); register(KryoCryptoSerializationManager.DEFAULT); @@ -169,8 +173,6 @@ class RmiGlobalTest extends BaseTest { server.disableRemoteKeyValidation(); server.setIdleTimeout(0); - register(server.getSerialization()); - // register this object as a global object that the client will get SERVER_GLOBAL_OBJECT_ID = server.createGlobalObject(globalRemoteServerObject); diff --git a/Dorkbox-Network/test/dorkbox/network/rmi/RmiSendObjectTest.java b/Dorkbox-Network/test/dorkbox/network/rmi/RmiSendObjectTest.java index fa98a5ee..87438908 100644 --- a/Dorkbox-Network/test/dorkbox/network/rmi/RmiSendObjectTest.java +++ b/Dorkbox-Network/test/dorkbox/network/rmi/RmiSendObjectTest.java @@ -27,7 +27,7 @@ class RmiSendObjectTest extends BaseTest { */ @Test public - void rmi() throws InitializationException, SecurityException, IOException { + void rmi() throws InitializationException, SecurityException, IOException, InterruptedException { KryoCryptoSerializationManager.DEFAULT = KryoCryptoSerializationManager.DEFAULT(); KryoCryptoSerializationManager.DEFAULT.registerRemote(TestObject.class, TestObjectImpl.class); KryoCryptoSerializationManager.DEFAULT.registerRemote(OtherObject.class, OtherObjectImpl.class); @@ -77,23 +77,32 @@ class RmiSendObjectTest extends BaseTest { @Override public void run() { - TestObject test = connection.createRemoteObject(TestObjectImpl.class); - test.setOther(43.21f); - // Normal remote method call. - assertEquals(43.21f, test.other(), .0001f); - // Make a remote method call that returns another remote proxy object. - OtherObject otherObject = test.getOtherObject(); - // Normal remote method call on the second object. - otherObject.setValue(12.34f); - float value = otherObject.value(); - assertEquals(12.34f, value, .0001f); - // When a remote proxy object is sent, the other side receives its actual remote object. - // we have to manually flush, since we are in a separate thread that does not auto-flush. - connection.send() - .TCP(otherObject) - .flush(); + TestObject test = null; + try { + test = connection.createRemoteObject(TestObjectImpl.class); + + test.setOther(43.21f); + // Normal remote method call. + assertEquals(43.21f, test.other(), .0001f); + + // Make a remote method call that returns another remote proxy object. + OtherObject otherObject = test.getOtherObject(); + // Normal remote method call on the second object. + otherObject.setValue(12.34f); + float value = otherObject.value(); + assertEquals(12.34f, value, .0001f); + + // When a remote proxy object is sent, the other side receives its actual remote object. + // we have to manually flush, since we are in a separate thread that does not auto-flush. + connection.send() + .TCP(otherObject) + .flush(); + } catch (IOException e) { + e.printStackTrace(); + fail(); + } } }).start(); } @@ -129,7 +138,7 @@ class RmiSendObjectTest extends BaseTest { @IgnoreSerialization private final int ID = idCounter.getAndIncrement(); - @RemoteProxy + @RMI private final OtherObject otherObject = new OtherObjectImpl(); private float aFloat; diff --git a/Dorkbox-Network/test/dorkbox/network/rmi/RmiTest.java b/Dorkbox-Network/test/dorkbox/network/rmi/RmiTest.java index d4044cef..8eae4628 100644 --- a/Dorkbox-Network/test/dorkbox/network/rmi/RmiTest.java +++ b/Dorkbox-Network/test/dorkbox/network/rmi/RmiTest.java @@ -17,8 +17,7 @@ import java.io.IOException; import java.io.Serializable; import java.util.concurrent.atomic.AtomicInteger; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; public class RmiTest extends BaseTest { @@ -29,104 +28,111 @@ class RmiTest extends BaseTest { @Override public void run() { - TestObject test = connection.createRemoteObject(TestObjectImpl.class); - - System.err.println("Starting test for: " + remoteObjectID); - - //TestObject test = connection.getRemoteObject(id, TestObject.class); - RemoteObject remoteObject = (RemoteObject) test; - - // Default behavior. RMI is transparent, method calls behave like normal - // (return values and exceptions are returned, call is synchronous) - System.err.println("hashCode: " + test.hashCode()); - System.err.println("toString: " + test); - test.moo(); - test.moo("Cow"); - assertEquals(remoteObjectID, test.id()); - - - // UDP calls that ignore the return value - remoteObject.setUDP(true); - test.moo("Meow"); - assertEquals(0, test.id()); - remoteObject.setUDP(false); - - - // Test that RMI correctly waits for the remotely invoked method to exit - remoteObject.setResponseTimeout(5000); - test.moo("You should see this two seconds before...", 2000); - System.out.println("...This"); - remoteObject.setResponseTimeout(3000); - - // Try exception handling - boolean caught = false; + TestObject test = null; try { + test = connection.createRemoteObject(TestObjectImpl.class); + + System.err.println("Starting test for: " + remoteObjectID); + + //TestObject test = connection.getRemoteObject(id, TestObject.class); + RemoteObject remoteObject = (RemoteObject) test; + + // Default behavior. RMI is transparent, method calls behave like normal + // (return values and exceptions are returned, call is synchronous) + System.err.println("hashCode: " + test.hashCode()); + System.err.println("toString: " + test); + test.moo(); + test.moo("Cow"); + assertEquals(remoteObjectID, test.id()); + + + // UDP calls that ignore the return value + remoteObject.setUDP(true); + test.moo("Meow"); + assertEquals(0, test.id()); + remoteObject.setUDP(false); + + + // Test that RMI correctly waits for the remotely invoked method to exit + remoteObject.setResponseTimeout(5000); + test.moo("You should see this two seconds before...", 2000); + System.out.println("...This"); + remoteObject.setResponseTimeout(3000); + + // Try exception handling + boolean caught = false; + try { + test.throwException(); + } catch (UnsupportedOperationException ex) { + System.err.println("\tExpected."); + caught = true; + } + assertTrue(caught); + + // Return values are ignored, but exceptions are still dealt with properly + + remoteObject.setTransmitReturnValue(false); + test.moo("Baa"); + test.id(); + caught = false; + try { + test.throwException(); + } catch (UnsupportedOperationException ex) { + caught = true; + } + assertTrue(caught); + + // Non-blocking call that ignores the return value + remoteObject.setNonBlocking(true); + remoteObject.setTransmitReturnValue(false); + test.moo("Meow"); + assertEquals(0, test.id()); + + // Non-blocking call that returns the return value + remoteObject.setTransmitReturnValue(true); + test.moo("Foo"); + + assertEquals(0, test.id()); + // wait for the response to id() + assertEquals(remoteObjectID, remoteObject.waitForLastResponse()); + + assertEquals(0, test.id()); + byte responseID = remoteObject.getLastResponseID(); + // wait for the response to id() + assertEquals(remoteObjectID, remoteObject.waitForResponse(responseID)); + + // Non-blocking call that errors out + remoteObject.setTransmitReturnValue(false); test.throwException(); - } catch (UnsupportedOperationException ex) { - System.err.println("\tExpected."); - caught = true; + assertEquals(remoteObject.waitForLastResponse() + .getClass(), UnsupportedOperationException.class); + + // Call will time out if non-blocking isn't working properly + remoteObject.setTransmitExceptions(false); + test.moo("Mooooooooo", 3000); + + // should wait for a small time + remoteObject.setTransmitReturnValue(true); + remoteObject.setNonBlocking(false); + remoteObject.setResponseTimeout(6000); + System.out.println("You should see this 2 seconds before"); + float slow = test.slow(); + System.out.println("...This"); + assertEquals(slow, 123, .0001D); + + // Test sending a reference to a remote object. + MessageWithTestObject m = new MessageWithTestObject(); + m.number = 678; + m.text = "sometext"; + m.testObject = test; + connection.send() + .TCP(m) + .flush(); + + } catch (IOException e) { + e.printStackTrace(); + fail(); } - assertTrue(caught); - - // Return values are ignored, but exceptions are still dealt with properly - - remoteObject.setTransmitReturnValue(false); - test.moo("Baa"); - test.id(); - caught = false; - try { - test.throwException(); - } catch (UnsupportedOperationException ex) { - caught = true; - } - assertTrue(caught); - - // Non-blocking call that ignores the return value - remoteObject.setNonBlocking(true); - remoteObject.setTransmitReturnValue(false); - test.moo("Meow"); - assertEquals(0, test.id()); - - // Non-blocking call that returns the return value - remoteObject.setTransmitReturnValue(true); - test.moo("Foo"); - - assertEquals(0, test.id()); - // wait for the response to id() - assertEquals(remoteObjectID, remoteObject.waitForLastResponse()); - - assertEquals(0, test.id()); - byte responseID = remoteObject.getLastResponseID(); - // wait for the response to id() - assertEquals(remoteObjectID, remoteObject.waitForResponse(responseID)); - - // Non-blocking call that errors out - remoteObject.setTransmitReturnValue(false); - test.throwException(); - assertEquals(remoteObject.waitForLastResponse() - .getClass(), UnsupportedOperationException.class); - - // Call will time out if non-blocking isn't working properly - remoteObject.setTransmitExceptions(false); - test.moo("Mooooooooo", 3000); - - // should wait for a small time - remoteObject.setTransmitReturnValue(true); - remoteObject.setNonBlocking(false); - remoteObject.setResponseTimeout(6000); - System.out.println("You should see this 2 seconds before"); - float slow = test.slow(); - System.out.println("...This"); - assertEquals(slow, 123, .0001D); - - // Test sending a reference to a remote object. - MessageWithTestObject m = new MessageWithTestObject(); - m.number = 678; - m.text = "sometext"; - m.testObject = test; - connection.send() - .TCP(m) - .flush(); } }.start(); } @@ -143,7 +149,7 @@ class RmiTest extends BaseTest { @Test public - void rmi() throws InitializationException, SecurityException, IOException { + void rmi() throws InitializationException, SecurityException, IOException, InterruptedException { KryoCryptoSerializationManager.DEFAULT = KryoCryptoSerializationManager.DEFAULT(); register(KryoCryptoSerializationManager.DEFAULT); @@ -158,9 +164,6 @@ class RmiTest extends BaseTest { server.disableRemoteKeyValidation(); server.setIdleTimeout(0); - register(server.getSerialization()); - - addEndPoint(server); server.bind(false);