diff --git a/src/dorkbox/network/Client.java b/src/dorkbox/network/Client.java index f7f07fce..083ef40e 100644 --- a/src/dorkbox/network/Client.java +++ b/src/dorkbox/network/Client.java @@ -32,7 +32,6 @@ import dorkbox.network.rmi.RemoteObjectCallback; import dorkbox.network.rmi.TimeoutException; import dorkbox.util.NamedThreadFactory; import dorkbox.util.OS; -import dorkbox.util.exceptions.InitializationException; import dorkbox.util.exceptions.SecurityException; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.PooledByteBufAllocator; @@ -61,7 +60,7 @@ import io.netty.util.internal.PlatformDependent; */ @SuppressWarnings({"unused", "WeakerAccess"}) public -class Client extends EndPointClient implements Connection { +class Client extends EndPointClient implements Connection { /** * Gets the version number. */ @@ -78,7 +77,7 @@ class Client extends EndPointClient implements Connecti * Starts a LOCAL only client, with the default local channel name and serialization scheme */ public - Client() throws InitializationException, SecurityException, IOException { + Client() throws SecurityException { this(Configuration.localOnly()); } @@ -86,8 +85,7 @@ class Client extends EndPointClient implements Connecti * Starts a TCP & UDP client (or a LOCAL client), with the specified serialization scheme */ public - Client(String host, int tcpPort, int udpPort, String localChannelName) - throws InitializationException, SecurityException, IOException { + Client(String host, int tcpPort, int udpPort, String localChannelName) throws SecurityException { this(new Configuration(host, tcpPort, udpPort, localChannelName)); } @@ -96,7 +94,7 @@ class Client extends EndPointClient implements Connecti */ @SuppressWarnings("AutoBoxing") public - Client(final Configuration config) throws InitializationException, SecurityException, IOException { + Client(final Configuration config) throws SecurityException { super(config); String threadName = Client.class.getSimpleName(); @@ -144,7 +142,7 @@ class Client extends EndPointClient implements Connecti localBootstrap.group(localBoss) .channel(LocalChannel.class) .remoteAddress(new LocalAddress(config.localChannelName)) - .handler(new RegistrationLocalHandlerClient(threadName, registrationWrapper)); + .handler(new RegistrationLocalHandlerClient(threadName, registrationWrapper)); manageForShutdown(localBoss); } @@ -181,9 +179,9 @@ class Client extends EndPointClient implements Connecti .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(WRITE_BUFF_LOW, WRITE_BUFF_HIGH)) .remoteAddress(config.host, config.tcpPort) - .handler(new RegistrationRemoteHandlerClientTCP(threadName, - registrationWrapper, - serializationManager)); + .handler(new RegistrationRemoteHandlerClientTCP(threadName, + registrationWrapper, + serializationManager)); // android screws up on this!! tcpBootstrap.option(ChannelOption.TCP_NODELAY, !isAndroid) @@ -217,9 +215,9 @@ class Client extends EndPointClient implements Connecti .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(WRITE_BUFF_LOW, WRITE_BUFF_HIGH)) .localAddress(new InetSocketAddress(0)) // bind to wildcard .remoteAddress(new InetSocketAddress(config.host, config.udpPort)) - .handler(new RegistrationRemoteHandlerClientUDP(threadName, - registrationWrapper, - serializationManager)); + .handler(new RegistrationRemoteHandlerClientUDP(threadName, + registrationWrapper, + serializationManager)); // Enable to READ and WRITE MULTICAST data (ie, 192.168.1.0) // in order to WRITE: write as normal, just make sure it ends in .255 @@ -419,9 +417,8 @@ class Client extends EndPointClient implements Connecti public void createRemoteObject(final Class interfaceClass, final RemoteObjectCallback callback) { try { - C connection0 = connectionManager.getConnection0(); - connection0.createRemoteObject(interfaceClass, callback); - } catch (IOException e) { + connection.createRemoteObject(interfaceClass, callback); + } catch (NullPointerException e) { logger.error("Error creating remote object!", e); } } @@ -452,9 +449,8 @@ class Client extends EndPointClient implements Connecti public void getRemoteObject(final int objectId, final RemoteObjectCallback callback) { try { - C connection0 = connectionManager.getConnection0(); - connection0.getRemoteObject(objectId, callback); - } catch (IOException e) { + connection.getRemoteObject(objectId, callback); + } catch (NullPointerException e) { logger.error("Error getting remote object!", e); } } @@ -464,11 +460,12 @@ class Client extends EndPointClient implements Connecti *

* Make sure that you only call this after the client connects! *

- * This is preferred to {@link EndPointBase#getConnections()} getConnections()}, as it properly does some error checking + * This is preferred to {@link EndPointBase#getConnections()}, as it properly does some error checking */ + @SuppressWarnings("unchecked") public C getConnection() { - return connection; + return (C) connection; } /** diff --git a/src/dorkbox/network/Server.java b/src/dorkbox/network/Server.java index c21e63cd..4565500f 100644 --- a/src/dorkbox/network/Server.java +++ b/src/dorkbox/network/Server.java @@ -26,7 +26,6 @@ import dorkbox.network.connection.registration.remote.RegistrationRemoteHandlerS import dorkbox.util.NamedThreadFactory; import dorkbox.util.OS; import dorkbox.util.Property; -import dorkbox.util.exceptions.InitializationException; import dorkbox.util.exceptions.SecurityException; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; @@ -58,7 +57,7 @@ import io.netty.channel.unix.UnixChannelOption; * To put it bluntly, ONLY have the server do work inside of a listener! */ public -class Server extends EndPointServer { +class Server extends EndPointServer { /** @@ -93,7 +92,7 @@ class Server extends EndPointServer { * Starts a LOCAL only server, with the default serialization scheme. */ public - Server() throws InitializationException, SecurityException, IOException { + Server() throws SecurityException { this(Configuration.localOnly()); } @@ -102,7 +101,7 @@ class Server extends EndPointServer { */ @SuppressWarnings("AutoBoxing") public - Server(Configuration config) throws InitializationException, SecurityException, IOException { + Server(Configuration config) throws SecurityException { // watch-out for serialization... it can be NULL incoming. The EndPoint (superclass) sets it, if null, so // you have to make sure to use this.serialization super(config); @@ -186,7 +185,7 @@ class Server extends EndPointServer { .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(WRITE_BUFF_LOW, WRITE_BUFF_HIGH)) .localAddress(new LocalAddress(localChannelName)) - .childHandler(new RegistrationLocalHandlerServer(threadName, registrationWrapper)); + .childHandler(new RegistrationLocalHandlerServer(threadName, registrationWrapper)); manageForShutdown(localBoss); manageForShutdown(localWorker); @@ -219,9 +218,9 @@ class Server extends EndPointServer { .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childOption(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(WRITE_BUFF_LOW, WRITE_BUFF_HIGH)) - .childHandler(new RegistrationRemoteHandlerServerTCP(threadName, - registrationWrapper, - serializationManager)); + .childHandler(new RegistrationRemoteHandlerServerTCP(threadName, + registrationWrapper, + serializationManager)); // have to check options.host for null. we don't bind to 0.0.0.0, we bind to "null" to get the "any" address! if (hostName.equals("0.0.0.0")) { @@ -265,9 +264,9 @@ class Server extends EndPointServer { // not binding to specific address, since it's driven by TCP, and that can be bound to a specific address .localAddress(udpPort) // if you bind to a specific interface, Linux will be unable to receive broadcast packets! - .handler(new RegistrationRemoteHandlerServerUDP(threadName, - registrationWrapper, - serializationManager)); + .handler(new RegistrationRemoteHandlerServerUDP(threadName, + registrationWrapper, + serializationManager)); // Enable to READ from MULTICAST data (ie, 192.168.1.0) diff --git a/src/dorkbox/network/connection/Connection.java b/src/dorkbox/network/connection/Connection.java index 5f59eb67..f05e66dc 100644 --- a/src/dorkbox/network/connection/Connection.java +++ b/src/dorkbox/network/connection/Connection.java @@ -47,7 +47,6 @@ interface Connection { @SuppressWarnings("rawtypes") EndPointBase getEndPoint(); - /** * @return the connection (TCP or LOCAL) id of this connection. */ diff --git a/src/dorkbox/network/connection/ConnectionImpl.java b/src/dorkbox/network/connection/ConnectionImpl.java index ef5e910c..0c59b2a7 100644 --- a/src/dorkbox/network/connection/ConnectionImpl.java +++ b/src/dorkbox/network/connection/ConnectionImpl.java @@ -17,11 +17,12 @@ package dorkbox.network.connection; import java.io.IOException; import java.lang.reflect.Field; +import java.lang.reflect.Proxy; import java.util.AbstractMap; import java.util.LinkedList; +import java.util.List; import java.util.Map; -import java.util.WeakHashMap; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -38,13 +39,20 @@ import dorkbox.network.connection.ping.PingTuple; import dorkbox.network.connection.wrapper.ChannelNetworkWrapper; import dorkbox.network.connection.wrapper.ChannelNull; import dorkbox.network.connection.wrapper.ChannelWrapper; +import dorkbox.network.rmi.InvokeMethod; +import dorkbox.network.rmi.InvokeMethodResult; import dorkbox.network.rmi.RemoteObject; import dorkbox.network.rmi.RemoteObjectCallback; import dorkbox.network.rmi.Rmi; import dorkbox.network.rmi.RmiBridge; +import dorkbox.network.rmi.RmiMessage; +import dorkbox.network.rmi.RmiObjectHandler; +import dorkbox.network.rmi.RmiProxyHandler; import dorkbox.network.rmi.RmiRegistration; +import dorkbox.network.rmi.TimeoutException; import dorkbox.network.serialization.CryptoSerializationManager; import dorkbox.util.collections.IntMap; +import dorkbox.util.collections.LockFreeHashMap; import dorkbox.util.generics.ClassHelper; import io.netty.channel.Channel; import io.netty.channel.ChannelHandler.Sharable; @@ -68,6 +76,24 @@ import io.netty.util.concurrent.Promise; @Sharable public class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConnection, Connection, Listeners, ConnectionBridge { + // default false + public static boolean ENABLE_PROXY_OBJECTS = true; + + public static + boolean isTcp(Class channelClass) { + return channelClass == NioSocketChannel.class || channelClass == EpollSocketChannel.class; + } + + public static + boolean isUdp(Class channelClass) { + return channelClass == NioDatagramChannel.class || channelClass == EpollDatagramChannel.class; + } + + public static + boolean isLocal(Class channelClass) { + return channelClass == LocalChannel.class; + } + private final org.slf4j.Logger logger; @@ -82,14 +108,14 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn private final Object messageInProgressLock = new Object(); private final AtomicBoolean messageInProgress = new AtomicBoolean(false); - private ISessionManager sessionManager; - private ChannelWrapper channelWrapper; + private ISessionManager sessionManager; + private ChannelWrapper channelWrapper; private boolean isLoopback; private volatile PingFuture pingFuture = null; // used to store connection local listeners (instead of global listeners). Only possible on the server. - private volatile ConnectionManager localListenerManager; + private volatile ConnectionManager localListenerManager; // while on the CLIENT, if the SERVER's ecc key has changed, the client will abort and show an error. private boolean remoteKeyChanged; @@ -108,21 +134,32 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn // // RMI fields // - protected CountDownLatch rmi; private final RmiBridge rmiBridge; - private final Map proxyIdCache = new WeakHashMap(8); - private final IntMap rmiRegistrationCallbacks = new IntMap(); + private final Map proxyIdCache; + private final List> proxyListeners; + + private final IntMap rmiRegistrationCallbacks; private int rmiCallbackId = 0; // protected by synchronized (rmiRegistrationCallbacks) /** * All of the parameters can be null, when metaChannel wants to get the base class type */ - @SuppressWarnings({"rawtypes", "unchecked"}) public ConnectionImpl(final Logger logger, final EndPointBase endPoint, final RmiBridge rmiBridge) { this.logger = logger; this.endPoint = endPoint; this.rmiBridge = rmiBridge; + + if (endPoint != null && endPoint.globalRmiBridge != null) { + // rmi is enabled. + proxyIdCache = new LockFreeHashMap(); + proxyListeners = new CopyOnWriteArrayList>(); + rmiRegistrationCallbacks = new IntMap(); + } else { + proxyIdCache = null; + proxyListeners = null; + rmiRegistrationCallbacks = null; + } } /** @@ -130,8 +167,8 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn *

* This happens BEFORE prep. */ - @SuppressWarnings("unchecked") - void init(final ChannelWrapper channelWrapper, final ConnectionManager sessionManager) { + final + void init(final ChannelWrapper channelWrapper, final ISessionManager sessionManager) { this.sessionManager = sessionManager; this.channelWrapper = channelWrapper; @@ -151,6 +188,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn *

* This happens AFTER init. */ + final void prep() { if (this.channelWrapper != null) { this.channelWrapper.init(); @@ -201,7 +239,6 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn return this.channelWrapper.getRemoteHost(); } - /** * @return true if this connection is established on the loopback interface */ @@ -216,7 +253,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn */ @Override public - EndPointBase getEndPoint() { + EndPointBase getEndPoint() { return this.endPoint; } @@ -529,7 +566,6 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn public void channelRead(Object object) throws Exception { - // prevent close from occurring SMACK in the middle of a message in progress. // delay close until it's finished. this.messageInProgress.set(true); @@ -568,7 +604,8 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn Channel channel = context.channel(); Class channelClass = channel.getClass(); - boolean isTCP = channelClass == NioSocketChannel.class || channelClass == EpollSocketChannel.class; + boolean isTCP = isTcp(channelClass); + boolean isLocal = isLocal(channelClass); if (this.logger.isInfoEnabled()) { String type; @@ -576,10 +613,10 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn if (isTCP) { type = "TCP"; } - else if (channelClass == NioDatagramChannel.class || channelClass == EpollDatagramChannel.class) { + else if (isUdp(channelClass)) { type = "UDP"; } - else if (channelClass == LocalChannel.class) { + else if (isLocal) { type = "LOCAL"; } else { @@ -597,7 +634,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn } // our master channels are TCP/LOCAL (which are mutually exclusive). Only key disconnect events based on the status of them. - if (isTCP || channelClass == LocalChannel.class) { + if (isTCP || isLocal) { // this is because channelInactive can ONLY happen when netty shuts down the channel. // and connection.close() can be called by the user. this.sessionManager.onDisconnected(this); @@ -618,6 +655,12 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn @Override public final void close() { + close(false); + } + + + final + void close(final boolean keepListeners) { // only close if we aren't already in the middle of closing. if (this.closeInProgress.compareAndSet(false, true)) { int idleTimeoutMs = this.endPoint.getIdleTimeout(); @@ -657,9 +700,29 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn } } } + + // remove all listeners, but ONLY if we are the server. If we remove all listeners and we are the client, then we remove + // ALL logic from the client! The server is OK because the server listeners per connection are dynamically added + if (!keepListeners) { + removeAll(); + } + + // proxy listeners are cleared in the removeAll() call + if (proxyIdCache != null) { + synchronized (proxyIdCache) { + proxyIdCache.clear(); + } + } + + if (rmiRegistrationCallbacks != null) { + synchronized (rmiRegistrationCallbacks) { + rmiRegistrationCallbacks.clear(); + } + } } } + /** * Marks the connection to be closed as soon as possible. This is evaluated when the current * thread execution returns to the network stack. @@ -716,7 +779,6 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn * (via connection.addListener), meaning that ONLY that listener attached to * the connection is notified on that event (ie, admin type listeners) */ - @SuppressWarnings("rawtypes") @Override public final Listeners add(Listener listener) { @@ -758,7 +820,6 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn * connection.removeListener), meaning that ONLY that listener attached to * the connection is removed */ - @SuppressWarnings("rawtypes") @Override public final Listeners remove(Listener listener) { @@ -777,7 +838,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn this.localListenerManager.remove(listener); if (!this.localListenerManager.hasListeners()) { - ((EndPointServer) this.endPoint).removeListenerManager(this); + ((EndPointServer) this.endPoint).removeListenerManager(this); } } } @@ -793,10 +854,16 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn /** * Removes all registered listeners from this connection/endpoint to NO * LONGER be notified of connect/disconnect/idle/receive(object) events. + * + * This includes all proxy listeners */ @Override public final Listeners removeAll() { + if (proxyListeners != null) { + proxyListeners.clear(); + } + if (this.endPoint instanceof EndPointServer) { // when we are a server, NORMALLY listeners are added at the GLOBAL level // meaning -- @@ -812,7 +879,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn this.localListenerManager.removeAll(); this.localListenerManager = null; - ((EndPointServer) this.endPoint).removeListenerManager(this); + ((EndPointServer) this.endPoint).removeListenerManager(this); } } } @@ -825,8 +892,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn } /** - * Removes all registered listeners (of the object type) from this - * connection/endpoint to NO LONGER be notified of + * Removes all registered listeners (of the object type) from this connection/endpoint to NO LONGER be notified of * connect/disconnect/idle/receive(object) events. */ @Override @@ -848,7 +914,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn if (!this.localListenerManager.hasListeners()) { this.localListenerManager = null; - ((EndPointServer) this.endPoint).removeListenerManager(this); + ((EndPointServer) this.endPoint).removeListenerManager(this); } } } @@ -908,7 +974,6 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn - @SuppressWarnings({"UnnecessaryLocalVariable", "unchecked", "Duplicates"}) @Override public final void createRemoteObject(final Class interfaceClass, final RemoteObjectCallback callback) { @@ -931,7 +996,6 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn TCP(message).flush(); } - @SuppressWarnings({"UnnecessaryLocalVariable", "unchecked", "Duplicates"}) @Override public final void getRemoteObject(final int objectId, final RemoteObjectCallback callback) { @@ -952,21 +1016,59 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn TCP(message).flush(); } - // default false - public static boolean ENABLE_PROXY_OBJECTS = true; + + /** + * Manages the RMI stuff for a connection. + * + * @return true if there was RMI stuff done, false if the message was "normal" and nothing was done + */ + boolean manageRmi(final Object message) { + if (message instanceof RmiMessage) { + RmiObjectHandler rmiObjectHandler = channelWrapper.manageRmi(); + + if (message instanceof InvokeMethod) { + rmiObjectHandler.invoke(this, (InvokeMethod) message, rmiBridge.getListener()); + } + else if (message instanceof InvokeMethodResult) { + for (Listener.OnMessageReceived proxyListener : proxyListeners) { + proxyListener.received(this, (InvokeMethodResult) message); + } + } + else if (message instanceof RmiRegistration) { + rmiObjectHandler.registration(this, (RmiRegistration) message); + } + + return true; + } + + return false; + } + + /** + * Objects that are on the "local" in-jvm connection have fixup their objects. For "network" connections, this is automatically done. + */ + Object fixupRmi(final Object message) { + // "local RMI" objects have to be modified, this part does that + RmiObjectHandler rmiObjectHandler = channelWrapper.manageRmi(); + return rmiObjectHandler.normalMessages(this, message); + } + + /** + * This will remove the invoke and invoke response listeners for this the remote object + */ + public + void removeRmiListeners(final int objectID, final Listener listener) { + + } + /** * For network connections, the interface class kryo ID == implementation class kryo ID, so they switch automatically. * For local connections, we have to switch it appropriately in the LocalRmiProxy - * - * @param implementationClass - * @param callbackId - * @return */ public final RmiRegistration createNewRmiObject(final Class interfaceClass, final Class implementationClass, final int callbackId) { - CryptoSerializationManager manager = getEndPoint().serializationManager; KryoExtra kryo = null; @@ -1090,14 +1192,37 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn } /** - * Used by RMI for the LOCAL side, to get the proxy object as an interface + * Warning. This is an advanced method. You should probably be using {@link Connection#createRemoteObject(Class, RemoteObjectCallback)} + *

+ *

+ * Returns a proxy object that implements the specified interface, and the methods invoked on the proxy object will be invoked + * remotely. + *

+ * Methods that return a value will throw {@link TimeoutException} if the response is not received with the {@link + * RemoteObject#setResponseTimeout(int) response timeout}. + *

+ * If {@link RemoteObject#setAsync(boolean) non-blocking} is false (the default), then methods that return a value must not be + * called from the update thread for the connection. An exception will be thrown if this occurs. Methods with a void return value can be + * called on the update thread. + *

+ * If a proxy returned from this method is part of an object graph sent over the network, the object graph on the receiving side will + * have the proxy object replaced with the registered object. * - * @param objectID is the RMI object ID - * @param iFace must be the interface the proxy will bind to + * @see RemoteObject + * + * @param objectID this is the remote object ID (assigned by RMI). This is NOT the kryo registration ID + * @param iFace this is the RMI interface */ @Override public RemoteObject getProxyObject(final int objectID, final Class iFace) { + if (iFace == null) { + throw new IllegalArgumentException("iface cannot be null."); + } + if (!iFace.isInterface()) { + throw new IllegalArgumentException("iface must be an interface."); + } + synchronized (proxyIdCache) { // we want to have a connection specific cache of IDs, using weak references. // because this is PER CONNECTION, this is safe. @@ -1105,7 +1230,18 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn if (remoteObject == null) { // duplicates are fine, as they represent the same object (as specified by the ID) on the remote side. - remoteObject = rmiBridge.createProxyObject(this, objectID, iFace); + // remoteObject = rmiBridge.createProxyObject(this, objectID, iFace); + + // the ACTUAL proxy is created in the connection impl. + RmiProxyHandler proxyObject = new RmiProxyHandler(this, objectID, iFace); + proxyListeners.add(proxyObject.getListener()); + + Class[] temp = new Class[2]; + temp[0] = RemoteObject.class; + temp[1] = iFace; + + remoteObject = (RemoteObject) Proxy.newProxyInstance(RmiBridge.class.getClassLoader(), temp, proxyObject); + proxyIdCache.put(objectID, remoteObject); } diff --git a/src/dorkbox/network/connection/ConnectionManager.java b/src/dorkbox/network/connection/ConnectionManager.java index 2db10103..0bf2a5ed 100644 --- a/src/dorkbox/network/connection/ConnectionManager.java +++ b/src/dorkbox/network/connection/ConnectionManager.java @@ -15,7 +15,6 @@ */ package dorkbox.network.connection; -import java.io.IOException; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; @@ -38,54 +37,52 @@ import dorkbox.util.generics.TypeResolver; // .equals() compares the identity on purpose,this because we cannot create two separate objects that are somehow equal to each other. @SuppressWarnings("unchecked") public -class ConnectionManager implements Listeners, ISessionManager, ConnectionPoint, ConnectionBridgeServer, - ConnectionExceptSpecifiedBridgeServer { +class ConnectionManager implements Listeners, ISessionManager, ConnectionPoint, ConnectionBridgeServer, + ConnectionExceptSpecifiedBridgeServer { /** * Specifies the load-factor for the IdentityMap used to manage keeping track of the number of connections + listeners */ @Property public static final float LOAD_FACTOR = 0.8F; + // Recommended for best performance while adhering to the "single writer principle". Must be static-final + private static final AtomicReferenceFieldUpdater localManagersREF = AtomicReferenceFieldUpdater.newUpdater( + ConnectionManager.class, + IdentityMap.class, + "localManagers"); + + + + // Recommended for best performance while adhering to the "single writer principle". Must be static-final + private static final AtomicReferenceFieldUpdater connectionsREF = AtomicReferenceFieldUpdater.newUpdater( + ConnectionManager.class, + ConcurrentEntry.class, + "connectionsHead"); + + private final String loggerName; - private final OnConnectedManager onConnectedManager; - private final OnDisconnectedManager onDisconnectedManager; - private final OnIdleManager onIdleManager; - private final OnMessageReceivedManager onMessageReceivedManager; - - + private final OnConnectedManager onConnectedManager; + private final OnDisconnectedManager onDisconnectedManager; + private final OnIdleManager onIdleManager; + private final OnMessageReceivedManager onMessageReceivedManager; @SuppressWarnings({"FieldCanBeLocal", "unused"}) - private volatile ConcurrentEntry connectionsHead = null; // reference to the first element + private volatile ConcurrentEntry connectionsHead = null; // reference to the first element // This is ONLY touched by a single thread, maintains a map of entries for FAST lookup during connection remove. - private final IdentityMap connectionEntries = new IdentityMap(32, ConnectionManager.LOAD_FACTOR); + private final IdentityMap connectionEntries = new IdentityMap(32, ConnectionManager.LOAD_FACTOR); @SuppressWarnings("unused") - private volatile IdentityMap> localManagers = new IdentityMap>(8, ConnectionManager.LOAD_FACTOR); + private volatile IdentityMap localManagers = new IdentityMap(8, ConnectionManager.LOAD_FACTOR); // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our // use-case 99% of the time) - private final Object singleWriterLock2 = new Object(); - private final Object singleWriterLock3 = new Object(); - - - // Recommended for best performance while adhering to the "single writer principle". Must be static-final - private static final AtomicReferenceFieldUpdater localManagersREF = - AtomicReferenceFieldUpdater.newUpdater(ConnectionManager.class, - IdentityMap.class, - "localManagers"); - - - - // Recommended for best performance while adhering to the "single writer principle". Must be static-final - private static final AtomicReferenceFieldUpdater connectionsREF = - AtomicReferenceFieldUpdater.newUpdater(ConnectionManager.class, - ConcurrentEntry.class, - "connectionsHead"); + private final Object singleWriterConnectionsLock = new Object(); + private final Object singleWriterLocalManagerLock = new Object(); /** @@ -102,10 +99,10 @@ class ConnectionManager implements Listeners, ISessionMana this.logger = org.slf4j.LoggerFactory.getLogger(loggerName); this.baseClass = baseClass; - onConnectedManager = new OnConnectedManager(logger); - onDisconnectedManager = new OnDisconnectedManager(logger); - onIdleManager = new OnIdleManager(logger); - onMessageReceivedManager = new OnMessageReceivedManager(logger); + onConnectedManager = new OnConnectedManager(logger); + onDisconnectedManager = new OnDisconnectedManager(logger); + onIdleManager = new OnIdleManager(logger); + onMessageReceivedManager = new OnMessageReceivedManager(logger); } /** @@ -117,7 +114,6 @@ class ConnectionManager implements Listeners, ISessionMana * It is POSSIBLE to add a server connection ONLY (ie, not global) listener (via connection.addListener), meaning that ONLY that * listener attached to the connection is notified on that event (ie, admin type listeners) */ - @SuppressWarnings("rawtypes") @Override public final Listeners add(final Listener listener) { @@ -149,25 +145,24 @@ class ConnectionManager implements Listeners, ISessionMana /** * INTERNAL USE ONLY */ - @SuppressWarnings({"unchecked", "rawtypes"}) private void addListener0(final Listener listener) { boolean found = false; if (listener instanceof Listener.OnConnected) { - onConnectedManager.add((Listener.OnConnected) listener); + onConnectedManager.add((Listener.OnConnected) listener); found = true; } if (listener instanceof Listener.OnDisconnected) { - onDisconnectedManager.add((Listener.OnDisconnected) listener); + onDisconnectedManager.add((Listener.OnDisconnected) listener); found = true; } if (listener instanceof Listener.OnIdle) { - onIdleManager.add((Listener.OnIdle) listener); + onIdleManager.add((Listener.OnIdle) listener); found = true; } if (listener instanceof Listener.OnMessageReceived) { - onMessageReceivedManager.add((Listener.OnMessageReceived) listener); + onMessageReceivedManager.add((Listener.OnMessageReceived) listener); found = true; } @@ -196,7 +191,6 @@ class ConnectionManager implements Listeners, ISessionMana * It is POSSIBLE to remove a server-connection 'non-global' listener (via connection.removeListener), meaning that ONLY that listener * attached to the connection is removed */ - @SuppressWarnings("rawtypes") @Override public final Listeners remove(final Listener listener) { @@ -206,16 +200,16 @@ class ConnectionManager implements Listeners, ISessionMana boolean found = false; if (listener instanceof Listener.OnConnected) { - found = onConnectedManager.remove((Listener.OnConnected) listener); + found = onConnectedManager.remove((Listener.OnConnected) listener); } if (listener instanceof Listener.OnDisconnected) { - found |= onDisconnectedManager.remove((Listener.OnDisconnected) listener); + found |= onDisconnectedManager.remove((Listener.OnDisconnected) listener); } if (listener instanceof Listener.OnIdle) { - found |= onIdleManager.remove((Listener.OnIdle) listener); + found |= onIdleManager.remove((Listener.OnIdle) listener); } if (listener instanceof Listener.OnMessageReceived) { - found |= onMessageReceivedManager.remove((Listener.OnMessageReceived) listener); + found |= onMessageReceivedManager.remove((Listener.OnMessageReceived) listener); } final Logger logger2 = this.logger; @@ -290,13 +284,22 @@ class ConnectionManager implements Listeners, ISessionMana */ @Override public final - void onMessage(final C connection, final Object message) { + void onMessage(final ConnectionImpl connection, final Object message) { notifyOnMessage0(connection, message, false); } @SuppressWarnings("Duplicates") private - boolean notifyOnMessage0(final C connection, final Object message, boolean foundListener) { + boolean notifyOnMessage0(final ConnectionImpl connection, Object message, boolean foundListener) { + if (connection.manageRmi(message)) { + // if we are an RMI message/registration, we have very specific, defined behavior. We do not use the "normal" listener callback pattern + // because these methods are rare, and require special functionality + return true; + } + + message = connection.fixupRmi(message); + + foundListener |= onMessageReceivedManager.notifyReceived(connection, message, shutdown); // now have to account for additional connection listener managers (non-global). @@ -316,23 +319,19 @@ class ConnectionManager implements Listeners, ISessionMana .flush(); } else { - if (this.logger.isErrorEnabled()) { - this.logger.warn("----------- LISTENER NOT REGISTERED FOR TYPE: {}", - message.getClass() - .getSimpleName()); - } + this.logger.warn("----------- LISTENER NOT REGISTERED FOR TYPE: {}", + message.getClass() + .getSimpleName()); } return foundListener; } /** * Invoked when a Connection has been idle for a while. - *

- * {@link ISessionManager} */ @Override public final - void onIdle(final C connection) { + void onIdle(final Connection connection) { boolean foundListener = onIdleManager.notifyIdle(connection, shutdown); if (foundListener) { @@ -342,8 +341,8 @@ class ConnectionManager implements Listeners, ISessionMana // now have to account for additional (local) listener managers. // access a snapshot of the managers (single-writer-principle) - final IdentityMap> localManagers = localManagersREF.get(this); - ConnectionManager localManager = localManagers.get(connection); + final IdentityMap localManagers = localManagersREF.get(this); + ConnectionManager localManager = localManagers.get(connection); if (localManager != null) { localManager.onIdle(connection); } @@ -351,13 +350,10 @@ class ConnectionManager implements Listeners, ISessionMana /** * Invoked when a Channel is open, bound to a local address, and connected to a remote address. - *

- * {@link ISessionManager} */ - @SuppressWarnings("Duplicates") @Override public - void onConnected(final C connection) { + void onConnected(final Connection connection) { addConnection(connection); boolean foundListener = onConnectedManager.notifyConnected(connection, shutdown); @@ -369,8 +365,8 @@ class ConnectionManager implements Listeners, ISessionMana // now have to account for additional (local) listener managers. // access a snapshot of the managers (single-writer-principle) - final IdentityMap> localManagers = localManagersREF.get(this); - ConnectionManager localManager = localManagers.get(connection); + final IdentityMap localManagers = localManagersREF.get(this); + ConnectionManager localManager = localManagers.get(connection); if (localManager != null) { localManager.onConnected(connection); } @@ -378,13 +374,10 @@ class ConnectionManager implements Listeners, ISessionMana /** * Invoked when a Channel was disconnected from its remote peer. - *

- * {@link ISessionManager} */ - @SuppressWarnings("Duplicates") @Override public - void onDisconnected(final C connection) { + void onDisconnected(final Connection connection) { boolean foundListener = onDisconnectedManager.notifyDisconnected(connection, shutdown); if (foundListener) { @@ -395,8 +388,8 @@ class ConnectionManager implements Listeners, ISessionMana // now have to account for additional (local) listener managers. // access a snapshot of the managers (single-writer-principle) - final IdentityMap> localManagers = localManagersREF.get(this); - ConnectionManager localManager = localManagers.get(connection); + final IdentityMap localManagers = localManagersREF.get(this); + ConnectionManager localManager = localManagers.get(connection); if (localManager != null) { localManager.onDisconnected(connection); @@ -415,11 +408,11 @@ class ConnectionManager implements Listeners, ISessionMana * * @param connection the connection to add */ - void addConnection(final C connection) { + void addConnection(final Connection connection) { // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our // use-case 99% of the time) - synchronized (singleWriterLock2) { + synchronized (singleWriterConnectionsLock) { // access a snapshot of the connections (single-writer-principle) ConcurrentEntry head = connectionsREF.get(this); @@ -442,12 +435,12 @@ class ConnectionManager implements Listeners, ISessionMana * * @param connection the connection to remove */ - private - void removeConnection(C connection) { + public + void removeConnection(Connection connection) { // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our // use-case 99% of the time) - synchronized (singleWriterLock2) { + synchronized (singleWriterConnectionsLock) { // access a snapshot of the connections (single-writer-principle) ConcurrentEntry concurrentEntry = connectionEntries.get(connection); @@ -477,36 +470,34 @@ class ConnectionManager implements Listeners, ISessionMana @Override public List getConnections() { - synchronized (singleWriterLock2) { - final IdentityMap.Keys keys = this.connectionEntries.keys(); - return keys.toArray(); + synchronized (singleWriterConnectionsLock) { + final IdentityMap.Keys keys = this.connectionEntries.keys(); + return (List) keys.toArray(); } } - final - ConnectionManager addListenerManager(final Connection connection) { + ConnectionManager addListenerManager(final Connection connection) { // when we are a server, NORMALLY listeners are added at the GLOBAL level (meaning, I add one listener, and ALL connections // are notified of that listener. // it is POSSIBLE to add a connection-specific listener (via connection.addListener), meaning that ONLY // that listener is notified on that event (ie, admin type listeners) - ConnectionManager manager; + ConnectionManager manager; boolean created = false; // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our // use-case 99% of the time) - synchronized (singleWriterLock3) { + synchronized (singleWriterLocalManagerLock) { // access a snapshot of the managers (single-writer-principle) - final IdentityMap> localManagers = localManagersREF.get(this); + final IdentityMap localManagers = localManagersREF.get(this); manager = localManagers.get(connection); if (manager == null) { created = true; - manager = new ConnectionManager(loggerName + "-" + connection.toString() + " Specific", - ConnectionManager.this.baseClass); + manager = new ConnectionManager(loggerName + "-" + connection.toString() + " Specific", ConnectionManager.this.baseClass); localManagers.put(connection, manager); // save this snapshot back to the original (single writer principle) @@ -531,11 +522,11 @@ class ConnectionManager implements Listeners, ISessionMana // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our // use-case 99% of the time) - synchronized (singleWriterLock3) { + synchronized (singleWriterLocalManagerLock) { // access a snapshot of the managers (single-writer-principle) - final IdentityMap> localManagers = localManagersREF.get(this); + final IdentityMap localManagers = localManagersREF.get(this); - final ConnectionManager removed = localManagers.remove(connection); + final ConnectionManager removed = localManagers.remove(connection); if (removed != null) { wasRemoved = true; @@ -552,23 +543,6 @@ class ConnectionManager implements Listeners, ISessionMana } } - - /** - * BE CAREFUL! Only for internal use! - * - * @return Returns a FAST first connection (for client!). - */ - public final - C getConnection0() throws IOException { - ConcurrentEntry head1 = connectionsREF.get(this); - if (head1 != null) { - return head1.getValue(); - } - else { - throw new IOException("Not connected to a remote computer. Unable to continue!"); - } - } - /** * BE CAREFUL! Only for internal use! * @@ -587,7 +561,7 @@ class ConnectionManager implements Listeners, ISessionMana this.shutdown.set(true); // disconnect the sessions - closeConnections(); + closeConnections(false); onConnectedManager.clear(); onDisconnectedManager.clear(); @@ -596,22 +570,31 @@ class ConnectionManager implements Listeners, ISessionMana } /** - * Close all connections ONLY + * Close all connections ONLY. + * + * Only keep the listeners for connections IF we are the client. If we remove listeners as a client, ALL of the client logic will + * be lost. The server is reactive, so listeners are added to connections as needed (instead of before startup, which is what the client does). */ final - void closeConnections() { - + void closeConnections(boolean keepListeners) { // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our // use-case 99% of the time) - synchronized (singleWriterLock2) { + synchronized (singleWriterConnectionsLock) { // don't need anything fast or fancy here, because this method will only be called once - final IdentityMap.Keys keys = connectionEntries.keys(); - for (C connection : keys) { + final IdentityMap.Keys keys = connectionEntries.keys(); + for (Connection connection : keys) { // Close the connection. Make sure the close operation ends because // all I/O operations are asynchronous in Netty. // Also necessary otherwise workers won't close. - connection.close(); + + + if (keepListeners && connection instanceof ConnectionImpl) { + ((ConnectionImpl) connection).close(true); + } + else { + connection.close(); + } } this.connectionEntries.clear(); @@ -636,7 +619,7 @@ class ConnectionManager implements Listeners, ISessionMana */ @Override public - ConnectionExceptSpecifiedBridgeServer except() { + ConnectionExceptSpecifiedBridgeServer except() { return this; } @@ -650,8 +633,8 @@ class ConnectionManager implements Listeners, ISessionMana @Override public void flush() { - ConcurrentEntry current = connectionsREF.get(this); - C c; + ConcurrentEntry current = connectionsREF.get(this); + Connection c; while (current != null) { c = current.getValue(); current = current.next(); @@ -667,9 +650,9 @@ class ConnectionManager implements Listeners, ISessionMana */ @Override public - ConnectionPoint TCP(final C connection, final Object message) { - ConcurrentEntry current = connectionsREF.get(this); - C c; + ConnectionPoint TCP(final Connection connection, final Object message) { + ConcurrentEntry current = connectionsREF.get(this); + Connection c; while (current != null) { c = current.getValue(); current = current.next(); @@ -688,9 +671,9 @@ class ConnectionManager implements Listeners, ISessionMana */ @Override public - ConnectionPoint UDP(final C connection, final Object message) { - ConcurrentEntry current = connectionsREF.get(this); - C c; + ConnectionPoint UDP(final Connection connection, final Object message) { + ConcurrentEntry current = connectionsREF.get(this); + Connection c; while (current != null) { c = current.getValue(); current = current.next(); @@ -709,8 +692,8 @@ class ConnectionManager implements Listeners, ISessionMana @Override public void self(final Object message) { - ConcurrentEntry current = connectionsREF.get(this); - C c; + ConcurrentEntry current = connectionsREF.get(this); + ConnectionImpl c; while (current != null) { c = current.getValue(); current = current.next(); @@ -725,8 +708,8 @@ class ConnectionManager implements Listeners, ISessionMana @Override public ConnectionPoint TCP(final Object message) { - ConcurrentEntry current = connectionsREF.get(this); - C c; + ConcurrentEntry current = connectionsREF.get(this); + Connection c; while (current != null) { c = current.getValue(); current = current.next(); @@ -743,8 +726,8 @@ class ConnectionManager implements Listeners, ISessionMana @Override public ConnectionPoint UDP(final Object message) { - ConcurrentEntry current = connectionsREF.get(this); - C c; + ConcurrentEntry current = connectionsREF.get(this); + Connection c; while (current != null) { c = current.getValue(); current = current.next(); diff --git a/src/dorkbox/network/connection/EndPointBase.java b/src/dorkbox/network/connection/EndPointBase.java index cfa7616d..43442529 100644 --- a/src/dorkbox/network/connection/EndPointBase.java +++ b/src/dorkbox/network/connection/EndPointBase.java @@ -15,7 +15,6 @@ */ package dorkbox.network.connection; -import java.io.IOException; import java.security.SecureRandom; import java.util.List; import java.util.concurrent.Executor; @@ -36,13 +35,15 @@ import dorkbox.network.connection.wrapper.ChannelWrapper; import dorkbox.network.pipeline.KryoEncoder; import dorkbox.network.pipeline.KryoEncoderCrypto; import dorkbox.network.rmi.RmiBridge; +import dorkbox.network.rmi.RmiObjectHandler; +import dorkbox.network.rmi.RmiObjectLocalHandler; +import dorkbox.network.rmi.RmiObjectNetworkHandler; import dorkbox.network.serialization.Serialization; import dorkbox.network.store.NullSettingsStore; import dorkbox.network.store.SettingsStore; import dorkbox.util.Property; import dorkbox.util.crypto.CryptoECC; import dorkbox.util.entropy.Entropy; -import dorkbox.util.exceptions.InitializationException; import dorkbox.util.exceptions.SecurityException; import io.netty.util.NetUtil; @@ -50,7 +51,7 @@ import io.netty.util.NetUtil; * represents the base of a client/server end point */ public abstract -class EndPointBase extends EndPoint { +class EndPointBase extends EndPoint { // If TCP and UDP both fill the pipe, THERE WILL BE FRAGMENTATION and dropped UDP packets! // it results in severe UDP packet loss and contention. // @@ -89,14 +90,19 @@ class EndPointBase extends EndPoint { @Property public static int udpMaxSize = 508; - protected final ConnectionManager connectionManager; + protected final ConnectionManager connectionManager; protected final dorkbox.network.serialization.CryptoSerializationManager serializationManager; - protected final RegistrationWrapper registrationWrapper; + protected final RegistrationWrapper registrationWrapper; final ECPrivateKeyParameters privateKey; final ECPublicKeyParameters publicKey; final SecureRandom secureRandom; + + // we only want one instance of these created. These will be called appropriately + private final RmiObjectHandler rmiHandler; + private final RmiObjectLocalHandler localRmiHandler; + private final RmiObjectNetworkHandler networkRmiHandler; final RmiBridge globalRmiBridge; private final Executor rmiExecutor; @@ -117,12 +123,10 @@ class EndPointBase extends EndPoint { * @param type this is either "Client" or "Server", depending on who is creating this endpoint. * @param config these are the specific connection options * - * @throws InitializationException - * @throws SecurityException + * @throws SecurityException if unable to initialize/generate ECC keys */ - @SuppressWarnings({"unchecked", "rawtypes"}) public - EndPointBase(Class type, final Configuration config) throws InitializationException, SecurityException, IOException { + EndPointBase(Class type, final Configuration config) throws SecurityException { super(type); // make sure that 'localhost' is ALWAYS our specific loopback IP address @@ -193,7 +197,7 @@ class EndPointBase extends EndPoint { } catch (Exception e) { String message = "Unable to initialize/generate ECC keys. FORCED SHUTDOWN."; logger.error(message); - throw new InitializationException(message); + throw new SecurityException(message); } } @@ -209,17 +213,22 @@ class EndPointBase extends EndPoint { secureRandom = new SecureRandom(propertyStore.getSalt()); // we don't care about un-instantiated/constructed members, since the class type is the only interest. - connectionManager = new ConnectionManager(type.getSimpleName(), connection0(null).getClass()); + //noinspection unchecked + connectionManager = new ConnectionManager(type.getSimpleName(), connection0(null).getClass()); // add the ping listener (internal use only!) connectionManager.add(new PingSystemListener()); if (rmiEnabled) { - // these register the listener for registering a class implementation for RMI (internal use only) - connectionManager.add(new RegisterRmiNetworkHandler()); + rmiHandler = null; + localRmiHandler = new RmiObjectLocalHandler(); + networkRmiHandler = new RmiObjectNetworkHandler(); globalRmiBridge = new RmiBridge(logger, config.rmiExecutor, true); } else { + rmiHandler = new RmiObjectHandler(); + localRmiHandler = null; + networkRmiHandler = null; globalRmiBridge = null; } @@ -318,7 +327,6 @@ class EndPointBase extends EndPoint { * * @param metaChannel can be NULL (when getting the baseClass) */ - @SuppressWarnings("unchecked") protected final Connection connection0(MetaChannel metaChannel) { ConnectionImpl connection; @@ -332,31 +340,35 @@ class EndPointBase extends EndPoint { // These properties are ASSIGNED in the same thread that CREATED the object. Only the AES info needs to be // volatile since it is the only thing that changes. if (metaChannel != null) { - ChannelWrapper wrapper; + ChannelWrapper wrapper; connection = newConnection(logger, this, rmiBridge); metaChannel.connection = connection; if (metaChannel.localChannel != null) { - wrapper = new ChannelLocalWrapper(metaChannel); - } - else { - if (this instanceof EndPointServer) { - wrapper = new ChannelNetworkWrapper(metaChannel, registrationWrapper); + if (rmiEnabled) { + wrapper = new ChannelLocalWrapper(metaChannel, localRmiHandler); } else { - wrapper = new ChannelNetworkWrapper(metaChannel, null); + wrapper = new ChannelLocalWrapper(metaChannel, rmiHandler); + } + } + else { + RmiObjectHandler rmiObjectHandler = rmiHandler; + if (rmiEnabled) { + rmiObjectHandler = networkRmiHandler; + } + + if (this instanceof EndPointServer) { + wrapper = new ChannelNetworkWrapper(metaChannel, registrationWrapper, rmiObjectHandler); + } + else { + wrapper = new ChannelNetworkWrapper(metaChannel, null, rmiObjectHandler); } } // now initialize the connection channels with whatever extra info they might need. - connection.init(wrapper, (ConnectionManager) connectionManager); - - if (rmiBridge != null) { - // notify our remote object space that it is able to receive method calls. - connection.listeners() - .add(rmiBridge.getListener()); - } + connection.init(wrapper, connectionManager); } else { // getting the connection baseClass @@ -374,14 +386,13 @@ class EndPointBase extends EndPoint { *

* Only the CLIENT injects in front of this) */ - @SuppressWarnings("unchecked") void connectionConnected0(ConnectionImpl connection) { isConnected.set(true); // prep the channel wrapper connection.prep(); - connectionManager.onConnected((C) connection); + connectionManager.onConnected(connection); } /** @@ -396,7 +407,7 @@ class EndPointBase extends EndPoint { * Returns a non-modifiable list of active connections */ public - List getConnections() { + List getConnections() { return connectionManager.getConnections(); } @@ -413,13 +424,13 @@ class EndPointBase extends EndPoint { *

* The server should ALWAYS use STOP. */ - public - void closeConnections() { + void closeConnections(boolean shouldKeepListeners) { // give a chance to other threads. Thread.yield(); - // stop does the same as this + more - connectionManager.closeConnections(); + // stop does the same as this + more. Only keep the listeners for connections IF we are the client. If we remove listeners as a client, + // ALL of the client logic will be lost. The server is reactive, so listeners are added to connections as needed (instead of before startup) + connectionManager.closeConnections(shouldKeepListeners); // Sometimes there might be "lingering" connections (ie, halfway though registration) that need to be closed. registrationWrapper.closeChannels(maxShutdownWaitTimeInMilliSeconds); @@ -442,7 +453,7 @@ class EndPointBase extends EndPoint { @Override protected void shutdownChannelsPre() { - closeConnections(); + closeConnections(false); // this does a closeConnections + clear_listeners connectionManager.stop(); @@ -465,7 +476,6 @@ class EndPointBase extends EndPoint { return result; } - @SuppressWarnings("rawtypes") @Override public boolean equals(Object obj) { diff --git a/src/dorkbox/network/connection/EndPointClient.java b/src/dorkbox/network/connection/EndPointClient.java index 787f726a..efefab1d 100644 --- a/src/dorkbox/network/connection/EndPointClient.java +++ b/src/dorkbox/network/connection/EndPointClient.java @@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit; import dorkbox.network.Client; import dorkbox.network.Configuration; import dorkbox.network.connection.bridge.ConnectionBridge; -import dorkbox.util.exceptions.InitializationException; import dorkbox.util.exceptions.SecurityException; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; @@ -34,9 +33,10 @@ import io.netty.channel.ChannelOption; * This serves the purpose of making sure that specific methods are not available to the end user. */ public -class EndPointClient extends EndPointBase { +class EndPointClient extends EndPointBase { - protected C connection; + // is valid when there is a connection to the server, otherwise it is null + protected volatile Connection connection; private CountDownLatch registration; @@ -51,7 +51,7 @@ class EndPointClient extends EndPointBase { public - EndPointClient(Configuration config) throws InitializationException, SecurityException, IOException { + EndPointClient(Configuration config) throws SecurityException { super(Client.class, config); } @@ -211,7 +211,7 @@ class EndPointClient extends EndPointBase { }; //noinspection unchecked - this.connection = (C) connection; + this.connection = connection; synchronized (bootstrapLock) { // we're done with registration, so no need to keep this around @@ -251,16 +251,19 @@ class EndPointClient extends EndPointBase { *

* This is used, for example, when reconnecting to a server. */ - @Override public void closeConnections() { - super.closeConnections(); + // Only keep the listeners for connections IF we are the client. If we remove listeners as a client, + // ALL of the client logic will be lost. The server is reactive, so listeners are added to connections as needed (instead of before startup) + closeConnections(true); // make sure we're not waiting on registration registrationCompleted(); // for the CLIENT only, we clear these connections! (the server only clears them on shutdown) shutdownChannels(); + + connection = null; } /** diff --git a/src/dorkbox/network/connection/EndPointServer.java b/src/dorkbox/network/connection/EndPointServer.java index f3599a63..91c88540 100644 --- a/src/dorkbox/network/connection/EndPointServer.java +++ b/src/dorkbox/network/connection/EndPointServer.java @@ -15,22 +15,19 @@ */ package dorkbox.network.connection; -import java.io.IOException; - import dorkbox.network.Configuration; import dorkbox.network.Server; import dorkbox.network.connection.bridge.ConnectionBridgeServer; -import dorkbox.util.exceptions.InitializationException; import dorkbox.util.exceptions.SecurityException; /** * This serves the purpose of making sure that specific methods are not available to the end user. */ public -class EndPointServer extends EndPointBase { +class EndPointServer extends EndPointBase { public - EndPointServer(final Configuration config) throws InitializationException, SecurityException, IOException { + EndPointServer(final Configuration config) throws SecurityException { super(Server.class, config); } @@ -39,7 +36,7 @@ class EndPointServer extends EndPointBase { */ @Override public - ConnectionBridgeServer send() { + ConnectionBridgeServer send() { return this.connectionManager; } @@ -53,7 +50,7 @@ class EndPointServer extends EndPointBase { * @return a newly created listener manager for the connection */ final - ConnectionManager addListenerManager(final C connection) { + ConnectionManager addListenerManager(final Connection connection) { return this.connectionManager.addListenerManager(connection); } @@ -67,7 +64,7 @@ class EndPointServer extends EndPointBase { * This removes the listener manager for that specific connection */ final - void removeListenerManager(final C connection) { + void removeListenerManager(final Connection connection) { this.connectionManager.removeListenerManager(connection); } @@ -80,7 +77,7 @@ class EndPointServer extends EndPointBase { * @param connection the connection to add */ public - void add(C connection) { + void add(Connection connection) { connectionManager.addConnection(connection); } @@ -93,7 +90,7 @@ class EndPointServer extends EndPointBase { * @param connection the connection to remove */ public - void remove(C connection) { - connectionManager.addConnection(connection); + void remove(Connection connection) { + connectionManager.removeConnection(connection); } } diff --git a/src/dorkbox/network/connection/ISessionManager.java b/src/dorkbox/network/connection/ISessionManager.java index 3f5f6869..cc8d14df 100644 --- a/src/dorkbox/network/connection/ISessionManager.java +++ b/src/dorkbox/network/connection/ISessionManager.java @@ -18,24 +18,29 @@ package dorkbox.network.connection; import java.util.Collection; public -interface ISessionManager { +interface ISessionManager { /** * Called when a message is received */ - void onMessage(C connection, Object message); + void onMessage(ConnectionImpl connection, Object message); /** * Called when the connection has been idle (read & write) for 2 seconds */ - void onIdle(C connection); + void onIdle(Connection connection); + /** + * Invoked when a Channel is open, bound to a local address, and connected to a remote address. + */ + void onConnected(Connection connection); - void onConnected(C connection); - - void onDisconnected(C connection); + /** + * Invoked when a Channel was disconnected from its remote peer. + */ + void onDisconnected(Connection connection); /** * Returns a non-modifiable list of active connections. This is extremely slow, and not recommended! */ - Collection getConnections(); + Collection getConnections(); } diff --git a/src/dorkbox/network/connection/RegistrationWrapper.java b/src/dorkbox/network/connection/RegistrationWrapper.java index e7d9247e..04229f80 100644 --- a/src/dorkbox/network/connection/RegistrationWrapper.java +++ b/src/dorkbox/network/connection/RegistrationWrapper.java @@ -47,13 +47,13 @@ import io.netty.channel.ChannelPipeline; * This is in the connection package, so it can access the endpoint methods that it needs to (without having to publicly expose them) */ public -class RegistrationWrapper implements UdpServer { +class RegistrationWrapper implements UdpServer { private final org.slf4j.Logger logger; private final KryoEncoder kryoEncoder; private final KryoEncoderCrypto kryoEncoderCrypto; - private final EndPointBase endPointBaseConnection; + private final EndPointBase endPointBaseConnection; // keeps track of connections (TCP/UDP-client) private final ReentrantLock channelMapLock = new ReentrantLock(); @@ -77,7 +77,7 @@ class RegistrationWrapper implements UdpServer { public - RegistrationWrapper(final EndPointBase endPointBaseConnection, + RegistrationWrapper(final EndPointBase endPointBaseConnection, final Logger logger, final KryoEncoder kryoEncoder, final KryoEncoderCrypto kryoEncoderCrypto) { @@ -94,14 +94,6 @@ class RegistrationWrapper implements UdpServer { } } - /** - * @return true if RMI is enabled - */ - public - boolean rmiEnabled() { - return endPointBaseConnection.globalRmiBridge != null; - } - public KryoEncoder getKryoEncoder() { return this.kryoEncoder; @@ -161,8 +153,7 @@ class RegistrationWrapper implements UdpServer { /** * Internal call by the pipeline when: - creating a new network connection - when determining the baseClass for generics * - * @param metaChannel - * can be NULL (when getting the baseClass) + * @param metaChannel can be NULL (when getting the baseClass) */ public Connection connection0(MetaChannel metaChannel) { @@ -317,7 +308,7 @@ class RegistrationWrapper implements UdpServer { public void abortRegistrationIfClient() { if (this.endPointBaseConnection instanceof EndPointClient) { - ((EndPointClient) this.endPointBaseConnection).abortRegistration(); + ((EndPointClient) this.endPointBaseConnection).abortRegistration(); } } @@ -411,7 +402,7 @@ class RegistrationWrapper implements UdpServer { * they will be ADDED in another map, in the followup handler!! */ public - boolean setupChannels(final RegistrationRemoteHandler handler, final MetaChannel metaChannel) { + boolean setupChannels(final RegistrationRemoteHandler handler, final MetaChannel metaChannel) { boolean registerServer = false; try { diff --git a/src/dorkbox/network/connection/bridge/ConnectionBridgeServer.java b/src/dorkbox/network/connection/bridge/ConnectionBridgeServer.java index 03a1d4c2..8fc5d8d4 100644 --- a/src/dorkbox/network/connection/bridge/ConnectionBridgeServer.java +++ b/src/dorkbox/network/connection/bridge/ConnectionBridgeServer.java @@ -15,14 +15,12 @@ */ package dorkbox.network.connection.bridge; -import dorkbox.network.connection.Connection; - public -interface ConnectionBridgeServer extends ConnectionBridgeBase { +interface ConnectionBridgeServer extends ConnectionBridgeBase { /** * Exposes methods to send the object to all server connections (except the specified one) over the network. (or via LOCAL when it's a * local channel). */ - ConnectionExceptSpecifiedBridgeServer except(); + ConnectionExceptSpecifiedBridgeServer except(); } diff --git a/src/dorkbox/network/connection/bridge/ConnectionExceptSpecifiedBridgeServer.java b/src/dorkbox/network/connection/bridge/ConnectionExceptSpecifiedBridgeServer.java index 482df1c9..3b168990 100644 --- a/src/dorkbox/network/connection/bridge/ConnectionExceptSpecifiedBridgeServer.java +++ b/src/dorkbox/network/connection/bridge/ConnectionExceptSpecifiedBridgeServer.java @@ -19,17 +19,17 @@ import dorkbox.network.connection.Connection; import dorkbox.network.connection.ConnectionPoint; public -interface ConnectionExceptSpecifiedBridgeServer { +interface ConnectionExceptSpecifiedBridgeServer { /** * Sends the object to all server connections (except the specified one) over the network using TCP. (or via LOCAL when it's a local * channel). */ - ConnectionPoint TCP(C connection, Object message); + ConnectionPoint TCP(Connection connection, Object message); /** * Sends the object to all server connections (except the specified one) over the network using UDP (or via LOCAL when it's a local * channel). */ - ConnectionPoint UDP(C connection, Object message); + ConnectionPoint UDP(Connection connection, Object message); } diff --git a/src/dorkbox/network/connection/listenerManagement/OnConnectedManager.java b/src/dorkbox/network/connection/listenerManagement/OnConnectedManager.java index dc8514ee..58173098 100644 --- a/src/dorkbox/network/connection/listenerManagement/OnConnectedManager.java +++ b/src/dorkbox/network/connection/listenerManagement/OnConnectedManager.java @@ -15,16 +15,18 @@ */ package dorkbox.network.connection.listenerManagement; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +import org.slf4j.Logger; + import com.esotericsoftware.kryo.util.IdentityMap; + import dorkbox.network.connection.Connection; import dorkbox.network.connection.ConnectionManager; import dorkbox.network.connection.Listener.OnConnected; import dorkbox.network.connection.Listener.OnError; import dorkbox.util.collections.ConcurrentEntry; -import org.slf4j.Logger; - -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; /** * Called when the remote end has been connected. This will be invoked before any objects are received by the network. @@ -33,93 +35,84 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; */ @SuppressWarnings("Duplicates") public final -class OnConnectedManager { +class OnConnectedManager { + // Recommended for best performance while adhering to the "single writer principle". Must be static-final + private static final AtomicReferenceFieldUpdater REF = AtomicReferenceFieldUpdater.newUpdater( + OnConnectedManager.class, + ConcurrentEntry.class, + "head_"); + private final Logger logger; // // The iterators for IdentityMap are NOT THREAD SAFE! // // This is only touched by a single thread, maintains a map of entries for FAST lookup during listener remove. - private final IdentityMap, ConcurrentEntry> entries = new IdentityMap, ConcurrentEntry>(32, ConnectionManager.LOAD_FACTOR); - private volatile ConcurrentEntry> head = null; // reference to the first element + private final IdentityMap entries = new IdentityMap(32, + ConnectionManager.LOAD_FACTOR); + private volatile ConcurrentEntry head_ = null; // reference to the first element // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our // use-case 99% of the time) - private final Object lock = new Object(); - - // Recommended for best performance while adhering to the "single writer principle". Must be static-final - private static final AtomicReferenceFieldUpdater REF = - AtomicReferenceFieldUpdater.newUpdater(OnConnectedManager.class, - ConcurrentEntry.class, - "head"); - public OnConnectedManager(final Logger logger) { this.logger = logger; } - public void add(final OnConnected listener) { - // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this - // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our - // use-case 99% of the time) - synchronized (lock) { - // access a snapshot (single-writer-principle) - ConcurrentEntry head = REF.get(this); + public synchronized + void add(final OnConnected listener) { + // access a snapshot (single-writer-principle) + ConcurrentEntry head = REF.get(this); - if (!entries.containsKey(listener)) { - head = new ConcurrentEntry(listener, head); + if (!entries.containsKey(listener)) { + head = new ConcurrentEntry(listener, head); - entries.put(listener, head); + entries.put(listener, head); - // save this snapshot back to the original (single writer principle) - REF.lazySet(this, head); - } + // save this snapshot back to the original (single writer principle) + REF.lazySet(this, head); } } /** * @return true if the listener was removed, false otherwise */ - public - boolean remove(final OnConnected listener) { - // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this - // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our - // use-case 99% of the time) - synchronized (lock) { - // access a snapshot (single-writer-principle) - ConcurrentEntry concurrentEntry = entries.get(listener); + public synchronized + boolean remove(final OnConnected listener) { + // access a snapshot (single-writer-principle) + ConcurrentEntry concurrentEntry = entries.get(listener); - if (concurrentEntry != null) { - ConcurrentEntry head1 = REF.get(this); + if (concurrentEntry != null) { + ConcurrentEntry head = REF.get(this); - if (concurrentEntry == head1) { - // if it was second, now it's first - head1 = head1.next(); - //oldHead.clear(); // optimize for GC not possible because of potentially running iterators - } - else { - concurrentEntry.remove(); - } - - // save this snapshot back to the original (single writer principle) - REF.lazySet(this, head1); - entries.remove(listener); - return true; - } else { - return false; + if (concurrentEntry == head) { + // if it was second, now it's first + head = head.next(); + //oldHead.clear(); // optimize for GC not possible because of potentially running iterators } + else { + concurrentEntry.remove(); + } + + // save this snapshot back to the original (single writer principle) + REF.lazySet(this, head); + entries.remove(listener); + return true; + } + else { + return false; } } /** * @return true if a listener was found, false otherwise */ - @SuppressWarnings("unchecked") public - boolean notifyConnected(final C connection, final AtomicBoolean shutdown) { + boolean notifyConnected(final C connection, final AtomicBoolean shutdown) { ConcurrentEntry> head = REF.get(this); ConcurrentEntry> current = head; + OnConnected listener; while (current != null && !shutdown.get()) { listener = current.getValue(); @@ -143,14 +136,9 @@ class OnConnectedManager { /** * called on shutdown */ - public + public synchronized void clear() { - // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this - // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our - // use-case 99% of the time) - synchronized (lock) { - this.entries.clear(); - this.head = null; - } + this.entries.clear(); + this.head_ = null; } } diff --git a/src/dorkbox/network/connection/listenerManagement/OnDisconnectedManager.java b/src/dorkbox/network/connection/listenerManagement/OnDisconnectedManager.java index 9aa3022a..f359491e 100644 --- a/src/dorkbox/network/connection/listenerManagement/OnDisconnectedManager.java +++ b/src/dorkbox/network/connection/listenerManagement/OnDisconnectedManager.java @@ -15,16 +15,18 @@ */ package dorkbox.network.connection.listenerManagement; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +import org.slf4j.Logger; + import com.esotericsoftware.kryo.util.IdentityMap; + import dorkbox.network.connection.Connection; import dorkbox.network.connection.ConnectionManager; import dorkbox.network.connection.Listener.OnDisconnected; import dorkbox.network.connection.Listener.OnError; import dorkbox.util.collections.ConcurrentEntry; -import org.slf4j.Logger; - -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; /** * Called when the remote end has been connected. This will be invoked before any objects are received by the network. @@ -33,82 +35,74 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; */ @SuppressWarnings("Duplicates") public final -class OnDisconnectedManager { +class OnDisconnectedManager { + // Recommended for best performance while adhering to the "single writer principle". Must be static-final + private static final AtomicReferenceFieldUpdater REF = AtomicReferenceFieldUpdater.newUpdater( + OnDisconnectedManager.class, + ConcurrentEntry.class, + "head_"); + private final Logger logger; // // The iterators for IdentityMap are NOT THREAD SAFE! // // This is only touched by a single thread, maintains a map of entries for FAST lookup during listener remove. - private final IdentityMap, ConcurrentEntry> entries = new IdentityMap, ConcurrentEntry>(32, ConnectionManager.LOAD_FACTOR); - private volatile ConcurrentEntry> head = null; // reference to the first element + private final IdentityMap entries = new IdentityMap(32, + ConnectionManager.LOAD_FACTOR); + + private volatile ConcurrentEntry head_ = null; // reference to the first element // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our // use-case 99% of the time) - private final Object lock = new Object(); - - // Recommended for best performance while adhering to the "single writer principle". Must be static-final - private static final AtomicReferenceFieldUpdater REF = - AtomicReferenceFieldUpdater.newUpdater(OnDisconnectedManager.class, - ConcurrentEntry.class, - "head"); - public OnDisconnectedManager(final Logger logger) { this.logger = logger; } - public void add(final OnDisconnected listener) { - // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this - // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our - // use-case 99% of the time) - synchronized (lock) { - // access a snapshot (single-writer-principle) - ConcurrentEntry head = REF.get(this); + public synchronized + void add(final OnDisconnected listener) { + // access a snapshot (single-writer-principle) + ConcurrentEntry head = REF.get(this); - if (!entries.containsKey(listener)) { - head = new ConcurrentEntry(listener, head); + if (!entries.containsKey(listener)) { + head = new ConcurrentEntry(listener, head); - entries.put(listener, head); + entries.put(listener, head); - // save this snapshot back to the original (single writer principle) - REF.lazySet(this, head); - } + // save this snapshot back to the original (single writer principle) + REF.lazySet(this, head); } } /** * @return true if the listener was removed, false otherwise */ - public - boolean remove(final OnDisconnected listener) { - // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this - // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our - // use-case 99% of the time) - synchronized (lock) { - // access a snapshot (single-writer-principle) - ConcurrentEntry concurrentEntry = entries.get(listener); + public synchronized + boolean remove(final OnDisconnected listener) { + // access a snapshot (single-writer-principle) + ConcurrentEntry concurrentEntry = entries.get(listener); - if (concurrentEntry != null) { - ConcurrentEntry head1 = REF.get(this); + if (concurrentEntry != null) { + ConcurrentEntry head = REF.get(this); - if (concurrentEntry == head1) { - // if it was second, now it's first - head1 = head1.next(); - //oldHead.clear(); // optimize for GC not possible because of potentially running iterators - } - else { - concurrentEntry.remove(); - } - - // save this snapshot back to the original (single writer principle) - REF.lazySet(this, head1); - entries.remove(listener); - return true; - } else { - return false; + if (concurrentEntry == head) { + // if it was second, now it's first + head = head.next(); + //oldHead.clear(); // optimize for GC not possible because of potentially running iterators } + else { + concurrentEntry.remove(); + } + + // save this snapshot back to the original (single writer principle) + REF.lazySet(this, head); + entries.remove(listener); + return true; + } + else { + return false; } } @@ -116,11 +110,11 @@ class OnDisconnectedManager { /** * @return true if a listener was found, false otherwise */ - @SuppressWarnings("unchecked") public - boolean notifyDisconnected(final C connection, final AtomicBoolean shutdown) { + boolean notifyDisconnected(final C connection, final AtomicBoolean shutdown) { ConcurrentEntry> head = REF.get(this); ConcurrentEntry> current = head; + OnDisconnected listener; while (current != null && !shutdown.get()) { listener = current.getValue(); @@ -133,7 +127,10 @@ class OnDisconnectedManager { ((OnError) listener).error(connection, e); } else { - logger.error("Unable to notify listener on 'disconnected' for listener '{}', connection '{}'.", listener, connection, e); + logger.error("Unable to notify listener on 'disconnected' for listener '{}', connection '{}'.", + listener, + connection, + e); } } } @@ -144,14 +141,9 @@ class OnDisconnectedManager { /** * called on shutdown */ - public + public synchronized void clear() { - // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this - // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our - // use-case 99% of the time) - synchronized (lock) { - this.entries.clear(); - this.head = null; - } + this.entries.clear(); + this.head_ = null; } } diff --git a/src/dorkbox/network/connection/listenerManagement/OnIdleManager.java b/src/dorkbox/network/connection/listenerManagement/OnIdleManager.java index ccdcaa3c..fd97c858 100644 --- a/src/dorkbox/network/connection/listenerManagement/OnIdleManager.java +++ b/src/dorkbox/network/connection/listenerManagement/OnIdleManager.java @@ -15,16 +15,18 @@ */ package dorkbox.network.connection.listenerManagement; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +import org.slf4j.Logger; + import com.esotericsoftware.kryo.util.IdentityMap; + import dorkbox.network.connection.Connection; import dorkbox.network.connection.ConnectionManager; import dorkbox.network.connection.Listener.OnError; import dorkbox.network.connection.Listener.OnIdle; import dorkbox.util.collections.ConcurrentEntry; -import org.slf4j.Logger; - -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; /** * Called when the remote end has been connected. This will be invoked before any objects are received by the network. @@ -33,94 +35,85 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; */ @SuppressWarnings("Duplicates") public final -class OnIdleManager { +class OnIdleManager { + // Recommended for best performance while adhering to the "single writer principle". Must be static-final + private static final AtomicReferenceFieldUpdater REF = AtomicReferenceFieldUpdater.newUpdater( + OnIdleManager.class, + ConcurrentEntry.class, + "head_"); + private final Logger logger; // // The iterators for IdentityMap are NOT THREAD SAFE! // // This is only touched by a single thread, maintains a map of entries for FAST lookup during listener remove. - private final IdentityMap, ConcurrentEntry> entries = new IdentityMap, ConcurrentEntry>(32, ConnectionManager.LOAD_FACTOR); - private volatile ConcurrentEntry> head = null; // reference to the first element + private final IdentityMap> entries = new IdentityMap>(32, + ConnectionManager.LOAD_FACTOR); + + private volatile ConcurrentEntry head_ = null; // reference to the first element // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our // use-case 99% of the time) - private final Object lock = new Object(); - - // Recommended for best performance while adhering to the "single writer principle". Must be static-final - private static final AtomicReferenceFieldUpdater REF = - AtomicReferenceFieldUpdater.newUpdater(OnIdleManager.class, - ConcurrentEntry.class, - "head"); - public OnIdleManager(final Logger logger) { this.logger = logger; } - public void add(final OnIdle listener) { - // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this - // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our - // use-case 99% of the time) - synchronized (lock) { - // access a snapshot (single-writer-principle) - ConcurrentEntry head = REF.get(this); + public synchronized + void add(final OnIdle listener) { + // access a snapshot (single-writer-principle) + ConcurrentEntry head = REF.get(this); - if (!entries.containsKey(listener)) { - head = new ConcurrentEntry(listener, head); + if (!entries.containsKey(listener)) { + head = new ConcurrentEntry(listener, head); - entries.put(listener, head); + entries.put(listener, head); - // save this snapshot back to the original (single writer principle) - REF.lazySet(this, head); - } + // save this snapshot back to the original (single writer principle) + REF.lazySet(this, head); } } /** * @return true if the listener was removed, false otherwise */ - public - boolean remove(final OnIdle listener) { - // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this - // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our - // use-case 99% of the time) - synchronized (lock) { - // access a snapshot (single-writer-principle) - ConcurrentEntry concurrentEntry = entries.get(listener); + public synchronized + boolean remove(final OnIdle listener) { + // access a snapshot (single-writer-principle) + ConcurrentEntry concurrentEntry = entries.get(listener); - if (concurrentEntry != null) { - ConcurrentEntry head1 = REF.get(this); + if (concurrentEntry != null) { + ConcurrentEntry head = REF.get(this); - if (concurrentEntry == head1) { - // if it was second, now it's first - head1 = head1.next(); - //oldHead.clear(); // optimize for GC not possible because of potentially running iterators - } - else { - concurrentEntry.remove(); - } - - // save this snapshot back to the original (single writer principle) - REF.lazySet(this, head1); - entries.remove(listener); - return true; + if (concurrentEntry == head) { + // if it was second, now it's first + head = head.next(); + //oldHead.clear(); // optimize for GC not possible because of potentially running iterators } else { - return false; + concurrentEntry.remove(); } + + // save this snapshot back to the original (single writer principle) + REF.lazySet(this, head); + entries.remove(listener); + return true; + } + else { + return false; } } /** * @return true if a listener was found, false otherwise */ - @SuppressWarnings("unchecked") public - boolean notifyIdle(final C connection, final AtomicBoolean shutdown) { + boolean notifyIdle(final C connection, final AtomicBoolean shutdown) { ConcurrentEntry> head = REF.get(this); ConcurrentEntry> current = head; + OnIdle listener; while (current != null && !shutdown.get()) { listener = current.getValue(); @@ -146,14 +139,9 @@ class OnIdleManager { /** * called on shutdown */ - public + public synchronized void clear() { - // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this - // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our - // use-case 99% of the time) - synchronized (lock) { - this.entries.clear(); - this.head = null; - } + this.entries.clear(); + this.head_ = null; } } diff --git a/src/dorkbox/network/connection/listenerManagement/OnMessageReceivedManager.java b/src/dorkbox/network/connection/listenerManagement/OnMessageReceivedManager.java index 9b7b8110..4c578995 100644 --- a/src/dorkbox/network/connection/listenerManagement/OnMessageReceivedManager.java +++ b/src/dorkbox/network/connection/listenerManagement/OnMessageReceivedManager.java @@ -31,7 +31,6 @@ import dorkbox.network.connection.Listener; import dorkbox.network.connection.Listener.OnError; import dorkbox.network.connection.Listener.OnMessageReceived; import dorkbox.network.connection.Listener.SelfDefinedType; -import dorkbox.network.rmi.RmiMessages; import dorkbox.util.collections.ConcurrentEntry; import dorkbox.util.collections.ConcurrentIterator; import dorkbox.util.generics.ClassHelper; @@ -43,256 +42,12 @@ import dorkbox.util.generics.ClassHelper; */ @SuppressWarnings("Duplicates") public final -class OnMessageReceivedManager { - private final Logger logger; - - // - // The iterators for IdentityMap are NOT THREAD SAFE! - // - @SuppressWarnings("unused") - private volatile IdentityMap listeners = new IdentityMap(32, ConnectionManager.LOAD_FACTOR); - +class OnMessageReceivedManager { // Recommended for best performance while adhering to the "single writer principle". Must be static-final - private static final AtomicReferenceFieldUpdater REF = - AtomicReferenceFieldUpdater.newUpdater(OnMessageReceivedManager.class, - IdentityMap.class, - "listeners"); - - - // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this - // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our - // use-case 99% of the time) - private final Object lock = new Object(); - - - public - OnMessageReceivedManager(final Logger logger) { - this.logger = logger; - } - - public void add(final OnMessageReceived listener) { - final Class type; - if (listener instanceof SelfDefinedType) { - type = ((SelfDefinedType) listener).getType(); - } - else { - type = identifyType(listener); - } - - // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this - // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our - // use-case 99% of the time) - synchronized (lock) { - // access a snapshot of the listeners (single-writer-principle) - @SuppressWarnings("unchecked") - final IdentityMap listeners = REF.get(this); - - ConcurrentIterator subscribedListeners = listeners.get(type); - if (subscribedListeners == null) { - subscribedListeners = new ConcurrentIterator(); - listeners.put(type, subscribedListeners); - } - - subscribedListeners.add(listener); - - // save this snapshot back to the original (single writer principle) - REF.lazySet(this, listeners); - } - } - - /** - * @return true if the listener was removed, false otherwise - */ - public - boolean remove(final OnMessageReceived listener) { - final Class type; - if (listener instanceof SelfDefinedType) { - type = ((SelfDefinedType) listener).getType(); - } - else { - type = identifyType(listener); - } - - boolean found = false; - // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this - // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our - // use-case 99% of the time) - synchronized (lock) { - // access a snapshot of the listeners (single-writer-principle) - @SuppressWarnings("unchecked") - final IdentityMap listeners = REF.get(this); - final ConcurrentIterator concurrentIterator = listeners.get(type); - if (concurrentIterator != null) { - concurrentIterator.remove(listener); - found = true; - } - - // save this snapshot back to the original (single writer principle) - REF.lazySet(this, listeners); - } - - return found; - } - - /** - * @return true if a listener was found, false otherwise - */ - @SuppressWarnings("unchecked") - public - boolean notifyReceived(final C connection, final Object message, final AtomicBoolean shutdown) { - boolean found = false; - Class objectType = message.getClass(); - - - // this is the GLOBAL version (unless it's the call from below, then it's the connection scoped version) - final IdentityMap listeners = REF.get(this); - ConcurrentIterator concurrentIterator = listeners.get(objectType); - - if (concurrentIterator != null) { - ConcurrentEntry> head = headREF.get(concurrentIterator); - ConcurrentEntry> current = head; - OnMessageReceived listener; - while (current != null && !shutdown.get()) { - listener = current.getValue(); - current = current.next(); - - try { - listener.received(connection, message); - } catch (Exception e) { - if (listener instanceof OnError) { - ((OnError) listener).error(connection, e); - } - else { - logger.error("Unable to notify on message '{}' for listener '{}', connection '{}'.", - objectType, - listener, - connection, - e); - } - } - } - - found = head != null; // true if we have something to publish to, otherwise false - } - - if (!(message instanceof RmiMessages)) { - // we march through all super types of the object, and find the FIRST set - // of listeners that are registered and cast it as that, and notify the method. - // NOTICE: we do NOT call ALL TYPE -- meaning, if we have Object->Foo->Bar - // and have listeners for Object and Foo - // we will call Bar (from the above code) - // we will call Foo (from this code) - // we will NOT call Object (since we called Foo). If Foo was not registered, THEN we would call object! - - objectType = objectType.getSuperclass(); - while (objectType != null) { - // check to see if we have what we are looking for in our CURRENT class - concurrentIterator = listeners.get(objectType); - if (concurrentIterator != null) { - ConcurrentEntry> head = headREF.get(concurrentIterator); - ConcurrentEntry> current = head; - OnMessageReceived listener; - while (current != null && !shutdown.get()) { - listener = current.getValue(); - current = current.next(); - - try { - listener.received(connection, message); - } catch (Exception e) { - if (listener instanceof OnError) { - ((OnError) listener).error(connection, e); - } - else { - logger.error("Unable to notify on message '{}' for listener '{}', connection '{}'.", - objectType, - listener, - connection, - e); - } - } - } - - found = head != null; // true if we have something to publish to, otherwise false - break; - } - - // NO MATCH, so walk up. - objectType = objectType.getSuperclass(); - } - } - - return found; - } - - public - void removeAll() { - // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this - // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our - // use-case 99% of the time) - synchronized (lock) { - // access a snapshot of the listeners (single-writer-principle) - @SuppressWarnings("unchecked") - final IdentityMap listeners = REF.get(this); - - listeners.clear(); - - // save this snapshot back to the original (single writer principle) - REF.lazySet(this, listeners); - } - } - - /** - * @return true if the listener was removed, false otherwise - */ - public - boolean removeAll(final Class classType) { - boolean found; - - // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this - // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our - // use-case 99% of the time) - synchronized (lock) { - // access a snapshot of the listeners (single-writer-principle) - @SuppressWarnings("unchecked") - final IdentityMap listeners = REF.get(this); - - found = listeners.remove(classType) != null; - - // save this snapshot back to the original (single writer principle) - REF.lazySet(this, listeners); - } - - return found; - } - - /** - * called on shutdown - */ - public - void clear() { - // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this - // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our - // use-case 99% of the time) - synchronized (lock) { - @SuppressWarnings("unchecked") - final IdentityMap listeners = REF.get(this); - - // The iterators for this map are NOT THREAD SAFE! - // using .entries() is what we are supposed to use! - final IdentityMap.Entries entries = listeners.entries(); - for (final IdentityMap.Entry next : entries) { - if (next.value != null) { - next.value.clear(); - } - } - - listeners.clear(); - - // save this snapshot back to the original (single writer principle) - REF.lazySet(this, listeners); - } - } - + private static final AtomicReferenceFieldUpdater REF = AtomicReferenceFieldUpdater.newUpdater( + OnMessageReceivedManager.class, + IdentityMap.class, + "listeners"); /** * Gets the referenced object type for a specific listener, but ONLY necessary for listeners that receive messages @@ -321,4 +76,211 @@ class OnMessageReceivedManager { return Object.class; } } + + private final Logger logger; + + // + // The iterators for IdentityMap are NOT THREAD SAFE! + // + private volatile IdentityMap listeners = new IdentityMap(32, ConnectionManager.LOAD_FACTOR); + + // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this + // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our + // use-case 99% of the time) + public + OnMessageReceivedManager(final Logger logger) { + this.logger = logger; + } + + public + void add(final OnMessageReceived listener) { + final Class type; + if (listener instanceof SelfDefinedType) { + type = ((SelfDefinedType) listener).getType(); + } + else { + type = identifyType(listener); + } + + synchronized (this) { + // access a snapshot of the listeners (single-writer-principle) + final IdentityMap listeners = REF.get(this); + + ConcurrentIterator subscribedListeners = listeners.get(type); + if (subscribedListeners == null) { + subscribedListeners = new ConcurrentIterator(); + listeners.put(type, subscribedListeners); + } + + subscribedListeners.add(listener); + + // save this snapshot back to the original (single writer principle) + REF.lazySet(this, listeners); + } + } + + /** + * @return true if the listener was removed, false otherwise + */ + public + boolean remove(final OnMessageReceived listener) { + final Class type; + if (listener instanceof SelfDefinedType) { + type = ((SelfDefinedType) listener).getType(); + } + else { + type = identifyType(listener); + } + + boolean found = false; + synchronized (this) { + // access a snapshot of the listeners (single-writer-principle) + final IdentityMap listeners = REF.get(this); + + final ConcurrentIterator concurrentIterator = listeners.get(type); + if (concurrentIterator != null) { + concurrentIterator.remove(listener); + found = true; + } + + // save this snapshot back to the original (single writer principle) + REF.lazySet(this, listeners); + } + + return found; + } + + /** + * @return true if a listener was found, false otherwise + */ + public + boolean notifyReceived(final C connection, final Object message, final AtomicBoolean shutdown) { + boolean found = false; + Class objectType = message.getClass(); + + + // this is the GLOBAL version (unless it's the call from below, then it's the connection scoped version) + final IdentityMap listeners = REF.get(this); + ConcurrentIterator concurrentIterator = listeners.get(objectType); + + if (concurrentIterator != null) { + ConcurrentEntry> head = headREF.get(concurrentIterator); + ConcurrentEntry> current = head; + + OnMessageReceived listener; + while (current != null && !shutdown.get()) { + listener = current.getValue(); + current = current.next(); + + try { + listener.received(connection, message); + } catch (Exception e) { + if (listener instanceof OnError) { + ((OnError) listener).error(connection, e); + } + else { + logger.error("Unable to notify on message '{}' for listener '{}', connection '{}'.", + objectType, + listener, + connection, + e); + } + } + } + + found = head != null; // true if we have something to publish to, otherwise false + } + + // we march through all super types of the object, and find the FIRST set + // of listeners that are registered and cast it as that, and notify the method. + // NOTICE: we do NOT call ALL TYPES -- meaning, if we have Object->Foo->Bar + // and have listeners for Object and Foo + // we will call Bar (from the above code) + // we will call Foo (from this code) + // we will NOT call Object (since we called Foo). If Foo was not registered, THEN we would call object! + + objectType = objectType.getSuperclass(); + while (objectType != null) { + // check to see if we have what we are looking for in our CURRENT class + concurrentIterator = listeners.get(objectType); + if (concurrentIterator != null) { + ConcurrentEntry> head = headREF.get(concurrentIterator); + ConcurrentEntry> current = head; + + OnMessageReceived listener; + while (current != null && !shutdown.get()) { + listener = current.getValue(); + current = current.next(); + + try { + listener.received(connection, message); + } catch (Exception e) { + if (listener instanceof OnError) { + ((OnError) listener).error(connection, e); + } + else { + logger.error("Unable to notify on message '{}' for listener '{}', connection '{}'.", + objectType, + listener, + connection, + e); + } + } + } + + found = head != null; // true if we have something to publish to, otherwise false + break; + } + + // NO MATCH, so walk up. + objectType = objectType.getSuperclass(); + } + + return found; + } + + public synchronized + void removeAll() { + listeners.clear(); + } + + /** + * @return true if the listener was removed, false otherwise + */ + public synchronized + boolean removeAll(final Class classType) { + boolean found; + + // access a snapshot of the listeners (single-writer-principle) + final IdentityMap listeners = REF.get(this); + + found = listeners.remove(classType) != null; + + // save this snapshot back to the original (single writer principle) + REF.lazySet(this, listeners); + + return found; + } + + /** + * called on shutdown + */ + public + void clear() { + final IdentityMap listeners = REF.get(this); + + // The iterators for this map are NOT THREAD SAFE! + // using .entries() is what we are supposed to use! + final IdentityMap.Entries entries = listeners.entries(); + for (final IdentityMap.Entry next : entries) { + if (next.value != null) { + next.value.clear(); + } + } + + listeners.clear(); + + // save this snapshot back to the original (single writer principle) + REF.lazySet(this, listeners); + } } diff --git a/src/dorkbox/network/connection/registration/RegistrationHandler.java b/src/dorkbox/network/connection/registration/RegistrationHandler.java index e303c4fc..a245ad59 100644 --- a/src/dorkbox/network/connection/registration/RegistrationHandler.java +++ b/src/dorkbox/network/connection/registration/RegistrationHandler.java @@ -15,7 +15,6 @@ */ package dorkbox.network.connection.registration; -import dorkbox.network.connection.Connection; import dorkbox.network.connection.EndPointBase; import dorkbox.network.connection.RegistrationWrapper; import io.netty.channel.Channel; @@ -25,16 +24,16 @@ import io.netty.channel.ChannelInboundHandlerAdapter; @Sharable public abstract -class RegistrationHandler extends ChannelInboundHandlerAdapter { +class RegistrationHandler extends ChannelInboundHandlerAdapter { protected static final String CONNECTION_HANDLER = "connectionHandler"; - protected final RegistrationWrapper registrationWrapper; + protected final RegistrationWrapper registrationWrapper; protected final org.slf4j.Logger logger; protected final String name; public - RegistrationHandler(final String name, RegistrationWrapper registrationWrapper) { + RegistrationHandler(final String name, RegistrationWrapper registrationWrapper) { this.name = name; this.logger = org.slf4j.LoggerFactory.getLogger(this.name); this.registrationWrapper = registrationWrapper; @@ -85,7 +84,7 @@ class RegistrationHandler extends ChannelInboundHandlerAda void exceptionCaught(final ChannelHandlerContext context, final Throwable cause) throws Exception; public - MetaChannel shutdown(final RegistrationWrapper registrationWrapper, final Channel channel) { + MetaChannel shutdown(final RegistrationWrapper registrationWrapper, final Channel channel) { // shutdown. Something messed up or was incorrect // properly shutdown the TCP/UDP channels. if (channel.isOpen()) { diff --git a/src/dorkbox/network/connection/registration/local/RegistrationLocalHandler.java b/src/dorkbox/network/connection/registration/local/RegistrationLocalHandler.java index b8f67e2d..a4d46bd2 100644 --- a/src/dorkbox/network/connection/registration/local/RegistrationLocalHandler.java +++ b/src/dorkbox/network/connection/registration/local/RegistrationLocalHandler.java @@ -17,8 +17,6 @@ package dorkbox.network.connection.registration.local; import static dorkbox.network.connection.EndPointBase.maxShutdownWaitTimeInMilliSeconds; -import dorkbox.network.connection.Connection; -import dorkbox.network.connection.RegisterRmiLocalHandler; import dorkbox.network.connection.RegistrationWrapper; import dorkbox.network.connection.registration.MetaChannel; import dorkbox.network.connection.registration.RegistrationHandler; @@ -26,11 +24,9 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; public abstract -class RegistrationLocalHandler extends RegistrationHandler { - static final String LOCAL_RMI_HANDLER = "localRmiHandler"; - final RegisterRmiLocalHandler rmiLocalHandler = new RegisterRmiLocalHandler(); +class RegistrationLocalHandler extends RegistrationHandler { - RegistrationLocalHandler(String name, RegistrationWrapper registrationWrapper) { + RegistrationLocalHandler(String name, RegistrationWrapper registrationWrapper) { super(name, registrationWrapper); } diff --git a/src/dorkbox/network/connection/registration/local/RegistrationLocalHandlerClient.java b/src/dorkbox/network/connection/registration/local/RegistrationLocalHandlerClient.java index 21b975d7..e7bf64a7 100644 --- a/src/dorkbox/network/connection/registration/local/RegistrationLocalHandlerClient.java +++ b/src/dorkbox/network/connection/registration/local/RegistrationLocalHandlerClient.java @@ -15,7 +15,6 @@ */ package dorkbox.network.connection.registration.local; -import dorkbox.network.connection.Connection; import dorkbox.network.connection.ConnectionImpl; import dorkbox.network.connection.RegistrationWrapper; import dorkbox.network.connection.registration.MetaChannel; @@ -26,10 +25,10 @@ import io.netty.channel.ChannelPipeline; import io.netty.util.ReferenceCountUtil; public -class RegistrationLocalHandlerClient extends RegistrationLocalHandler { +class RegistrationLocalHandlerClient extends RegistrationLocalHandler { public - RegistrationLocalHandlerClient(String name, RegistrationWrapper registrationWrapper) { + RegistrationLocalHandlerClient(String name, RegistrationWrapper registrationWrapper) { super(name, registrationWrapper); } @@ -78,15 +77,6 @@ class RegistrationLocalHandlerClient extends RegistrationL ConnectionImpl connection = metaChannel.connection; - - // add our RMI handlers - if (registrationWrapper.rmiEnabled()) { - /////////////////////// - // DECODE (or upstream) - /////////////////////// - pipeline.addFirst(LOCAL_RMI_HANDLER, rmiLocalHandler); - } - // have to setup connection handler pipeline.addLast(CONNECTION_HANDLER, connection); diff --git a/src/dorkbox/network/connection/registration/local/RegistrationLocalHandlerServer.java b/src/dorkbox/network/connection/registration/local/RegistrationLocalHandlerServer.java index 5c93a0b2..9bbea74e 100644 --- a/src/dorkbox/network/connection/registration/local/RegistrationLocalHandlerServer.java +++ b/src/dorkbox/network/connection/registration/local/RegistrationLocalHandlerServer.java @@ -15,7 +15,6 @@ */ package dorkbox.network.connection.registration.local; -import dorkbox.network.connection.Connection; import dorkbox.network.connection.ConnectionImpl; import dorkbox.network.connection.RegistrationWrapper; import dorkbox.network.connection.registration.MetaChannel; @@ -25,10 +24,10 @@ import io.netty.channel.ChannelPipeline; import io.netty.util.ReferenceCountUtil; public -class RegistrationLocalHandlerServer extends RegistrationLocalHandler { +class RegistrationLocalHandlerServer extends RegistrationLocalHandler { public - RegistrationLocalHandlerServer(String name, RegistrationWrapper registrationWrapper) { + RegistrationLocalHandlerServer(String name, RegistrationWrapper registrationWrapper) { super(name, registrationWrapper); } @@ -78,14 +77,6 @@ class RegistrationLocalHandlerServer extends RegistrationL } if (connection != null) { - // add our RMI handlers - if (registrationWrapper.rmiEnabled()) { - /////////////////////// - // DECODE (or upstream) - /////////////////////// - pipeline.addFirst(LOCAL_RMI_HANDLER, rmiLocalHandler); - } - // have to setup connection handler pipeline.addLast(CONNECTION_HANDLER, connection); diff --git a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandler.java b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandler.java index b17d265b..c5554e53 100644 --- a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandler.java +++ b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandler.java @@ -27,7 +27,6 @@ import org.bouncycastle.crypto.engines.IESEngine; import org.bouncycastle.crypto.modes.GCMBlockCipher; import org.slf4j.Logger; -import dorkbox.network.connection.Connection; import dorkbox.network.connection.ConnectionImpl; import dorkbox.network.connection.RegistrationWrapper; import dorkbox.network.connection.registration.MetaChannel; @@ -42,15 +41,11 @@ import dorkbox.util.crypto.CryptoECC; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; -import io.netty.channel.epoll.EpollDatagramChannel; -import io.netty.channel.epoll.EpollSocketChannel; -import io.netty.channel.socket.nio.NioDatagramChannel; -import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.ReferenceCountUtil; public abstract -class RegistrationRemoteHandler extends RegistrationHandler { +class RegistrationRemoteHandler extends RegistrationHandler { static final String KRYO_ENCODER = "kryoEncoder"; static final String KRYO_DECODER = "kryoDecoder"; @@ -100,7 +95,7 @@ class RegistrationRemoteHandler extends RegistrationHandle protected final CryptoSerializationManager serializationManager; RegistrationRemoteHandler(final String name, - final RegistrationWrapper registrationWrapper, + final RegistrationWrapper registrationWrapper, final CryptoSerializationManager serializationManager) { super(name, registrationWrapper); @@ -143,46 +138,50 @@ class RegistrationRemoteHandler extends RegistrationHandle // add the channel so we can access it later. // do NOT want to add UDP channels, since they are tracked differently. + if (this.logger.isInfoEnabled()) { + Channel channel = context.channel(); + Class channelClass = channel.getClass(); + boolean isUdp = ConnectionImpl.isUdp(channelClass); - // this whole bit is inside a if (logger.isDebugEnabled()) section. - Channel channel = context.channel(); - Class channelClass = channel.getClass(); + StringBuilder stringBuilder = new StringBuilder(96); - - StringBuilder stringBuilder = new StringBuilder(96); - - stringBuilder.append("Connected to remote "); - if (channelClass == NioSocketChannel.class || channelClass == EpollSocketChannel.class) { - stringBuilder.append("TCP"); - } - else if (channelClass == NioDatagramChannel.class || channelClass == EpollDatagramChannel.class) { - stringBuilder.append("UDP"); - } - else { - stringBuilder.append("UNKNOWN"); - } - stringBuilder.append(" connection. ["); - stringBuilder.append(channel.localAddress()); - - boolean isSessionless = channel instanceof NioDatagramChannel; - if (isSessionless) { - if (channel.remoteAddress() != null) { - stringBuilder.append(" ==> "); - stringBuilder.append(channel.remoteAddress()); + stringBuilder.append("Connected to remote "); + if (ConnectionImpl.isTcp(channelClass)) { + stringBuilder.append("TCP"); + } + else if (isUdp) { + stringBuilder.append("UDP"); + } + else if (ConnectionImpl.isLocal(channelClass)) { + stringBuilder.append("LOCAL"); } else { - // this means we are LISTENING. - stringBuilder.append(" <== "); - stringBuilder.append("?????"); + stringBuilder.append("UNKNOWN"); } - } - else { - stringBuilder.append(getConnectionDirection()); - stringBuilder.append(channel.remoteAddress()); - } - stringBuilder.append("]"); - this.logger.info(stringBuilder.toString()); + stringBuilder.append(" connection. ["); + stringBuilder.append(channel.localAddress()); + + // this means we are "Sessionless" + if (isUdp) { + if (channel.remoteAddress() != null) { + stringBuilder.append(" ==> "); + stringBuilder.append(channel.remoteAddress()); + } + else { + // this means we are LISTENING. + stringBuilder.append(" <== "); + stringBuilder.append("?????"); + } + } + else { + stringBuilder.append(getConnectionDirection()); + stringBuilder.append(channel.remoteAddress()); + } + stringBuilder.append("]"); + + this.logger.info(stringBuilder.toString()); + } } @Override @@ -276,7 +275,7 @@ class RegistrationRemoteHandler extends RegistrationHandle final boolean verifyAesInfo(final Object message, final Channel channel, - final RegistrationWrapper registrationWrapper, + final RegistrationWrapper registrationWrapper, final MetaChannel metaChannel, final Logger logger) { diff --git a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClient.java b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClient.java index 9b9e3c29..8a2195b9 100644 --- a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClient.java +++ b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClient.java @@ -15,16 +15,14 @@ */ package dorkbox.network.connection.registration.remote; -import dorkbox.network.connection.Connection; import dorkbox.network.connection.RegistrationWrapper; import dorkbox.network.serialization.CryptoSerializationManager; public -class RegistrationRemoteHandlerClient extends RegistrationRemoteHandler { +class RegistrationRemoteHandlerClient extends RegistrationRemoteHandler { - public RegistrationRemoteHandlerClient(final String name, - final RegistrationWrapper registrationWrapper, + final RegistrationWrapper registrationWrapper, final CryptoSerializationManager serializationManager) { super(name, registrationWrapper, serializationManager); } diff --git a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientTCP.java b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientTCP.java index 40f37650..fac0b8a0 100644 --- a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientTCP.java +++ b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientTCP.java @@ -31,10 +31,10 @@ import org.bouncycastle.jce.ECNamedCurveTable; import org.bouncycastle.jce.spec.ECParameterSpec; import org.slf4j.Logger; +import com.esotericsoftware.kryo.KryoException; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; -import dorkbox.network.connection.Connection; import dorkbox.network.connection.RegistrationWrapper; import dorkbox.network.connection.registration.MetaChannel; import dorkbox.network.connection.registration.Registration; @@ -49,14 +49,14 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.util.ReferenceCountUtil; public -class RegistrationRemoteHandlerClientTCP extends RegistrationRemoteHandlerClient { +class RegistrationRemoteHandlerClientTCP extends RegistrationRemoteHandlerClient { private static final String DELETE_IP = "eleteIP"; // purposefully missing the "D", since that is a system parameter, which starts with "-D" private static final ECParameterSpec eccSpec = ECNamedCurveTable.getParameterSpec(CryptoECC.curve25519); public RegistrationRemoteHandlerClientTCP(final String name, - final RegistrationWrapper registrationWrapper, + final RegistrationWrapper registrationWrapper, final CryptoSerializationManager serializationManager) { super(name, registrationWrapper, serializationManager); @@ -151,7 +151,7 @@ class RegistrationRemoteHandlerClientTCP extends Registrat void channelRead(final ChannelHandlerContext context, final Object message) throws Exception { Channel channel = context.channel(); - RegistrationWrapper registrationWrapper2 = this.registrationWrapper; + RegistrationWrapper registrationWrapper2 = this.registrationWrapper; Logger logger2 = this.logger; if (message instanceof Registration) { // make sure this connection was properly registered in the map. (IT SHOULD BE) @@ -232,8 +232,10 @@ class RegistrationRemoteHandlerClientTCP extends Registrat * see http://en.wikipedia.org/wiki/Diffie%E2%80%93Hellman_key_exchange */ byte[] ecdhPubKeyBytes = Arrays.copyOfRange(payload, intLength, payload.length); - ECPublicKeyParameters ecdhPubKey = EccPublicKeySerializer.read(new Input(ecdhPubKeyBytes)); - if (ecdhPubKey == null) { + ECPublicKeyParameters ecdhPubKey; + try { + ecdhPubKey = EccPublicKeySerializer.read(new Input(ecdhPubKeyBytes)); + } catch (KryoException e) { logger2.error("Invalid decode of ecdh public key. Aborting."); shutdown(registrationWrapper2, channel); diff --git a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientUDP.java b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientUDP.java index b1e05a08..4912a1fb 100644 --- a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientUDP.java +++ b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientUDP.java @@ -21,7 +21,6 @@ import java.net.InetSocketAddress; import org.slf4j.Logger; -import dorkbox.network.connection.Connection; import dorkbox.network.connection.EndPointBase; import dorkbox.network.connection.RegistrationWrapper; import dorkbox.network.connection.registration.MetaChannel; @@ -37,11 +36,11 @@ import io.netty.channel.ChannelPipeline; import io.netty.channel.FixedRecvByteBufAllocator; public -class RegistrationRemoteHandlerClientUDP extends RegistrationRemoteHandlerClient { +class RegistrationRemoteHandlerClientUDP extends RegistrationRemoteHandlerClient { public RegistrationRemoteHandlerClientUDP(final String name, - final RegistrationWrapper registrationWrapper, + final RegistrationWrapper registrationWrapper, final CryptoSerializationManager serializationManager) { super(name, registrationWrapper, serializationManager); } @@ -118,7 +117,7 @@ class RegistrationRemoteHandlerClientUDP extends Registrat // if we also have a UDP channel, we will receive the "connected" message on UDP (otherwise it will be on TCP) - RegistrationWrapper registrationWrapper2 = this.registrationWrapper; + RegistrationWrapper registrationWrapper2 = this.registrationWrapper; MetaChannel metaChannel = registrationWrapper2.getChannel(channel.hashCode()); if (metaChannel != null) { diff --git a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServer.java b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServer.java index b7b5aa41..3bf76438 100644 --- a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServer.java +++ b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServer.java @@ -15,17 +15,15 @@ */ package dorkbox.network.connection.registration.remote; -import dorkbox.network.connection.Connection; import dorkbox.network.connection.RegistrationWrapper; import dorkbox.network.connection.registration.MetaChannel; import dorkbox.network.serialization.CryptoSerializationManager; public -class RegistrationRemoteHandlerServer extends RegistrationRemoteHandler { +class RegistrationRemoteHandlerServer extends RegistrationRemoteHandler { - public RegistrationRemoteHandlerServer(final String name, - final RegistrationWrapper registrationWrapper, + final RegistrationWrapper registrationWrapper, final CryptoSerializationManager serializationManager) { super(name, registrationWrapper, serializationManager); } diff --git a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServerTCP.java b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServerTCP.java index 5552fae4..ebd863cd 100644 --- a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServerTCP.java +++ b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServerTCP.java @@ -32,10 +32,10 @@ import org.bouncycastle.jce.spec.ECParameterSpec; import org.bouncycastle.util.Arrays; import org.slf4j.Logger; +import com.esotericsoftware.kryo.KryoException; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; -import dorkbox.network.connection.Connection; import dorkbox.network.connection.RegistrationWrapper; import dorkbox.network.connection.registration.MetaChannel; import dorkbox.network.connection.registration.Registration; @@ -49,7 +49,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.util.ReferenceCountUtil; public -class RegistrationRemoteHandlerServerTCP extends RegistrationRemoteHandlerServer { +class RegistrationRemoteHandlerServerTCP extends RegistrationRemoteHandlerServer { private static final long ECDH_TIMEOUT = 10L * 60L * 60L * 1000L * 1000L * 1000L; // 10 minutes in nanoseconds @@ -61,7 +61,7 @@ class RegistrationRemoteHandlerServerTCP extends Registrat public RegistrationRemoteHandlerServerTCP(final String name, - final RegistrationWrapper registrationWrapper, + final RegistrationWrapper registrationWrapper, final CryptoSerializationManager serializationManager) { super(name, registrationWrapper, serializationManager); } @@ -264,8 +264,10 @@ class RegistrationRemoteHandlerServerTCP extends Registrat * Diffie-Hellman-Merkle key exchange for the AES key * see http://en.wikipedia.org/wiki/Diffie%E2%80%93Hellman_key_exchange */ - ECPublicKeyParameters ecdhPubKey = EccPublicKeySerializer.read(new Input(payload)); - if (ecdhPubKey == null) { + ECPublicKeyParameters ecdhPubKey; + try { + ecdhPubKey = EccPublicKeySerializer.read(new Input(payload)); + } catch (KryoException e) { logger2.error("Invalid decode of ecdh public key. Aborting."); shutdown(registrationWrapper2, channel); diff --git a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServerUDP.java b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServerUDP.java index 66beb83a..fc1892cc 100644 --- a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServerUDP.java +++ b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServerUDP.java @@ -23,7 +23,6 @@ import java.util.List; import org.slf4j.Logger; import dorkbox.network.Broadcast; -import dorkbox.network.connection.Connection; import dorkbox.network.connection.ConnectionImpl; import dorkbox.network.connection.EndPointBase; import dorkbox.network.connection.KryoExtra; @@ -45,19 +44,18 @@ import io.netty.handler.codec.MessageToMessageCodec; @Sharable public -class RegistrationRemoteHandlerServerUDP extends MessageToMessageCodec { +class RegistrationRemoteHandlerServerUDP extends MessageToMessageCodec { // this is for the SERVER only. UDP channel is ALWAYS the SAME channel (it's the server's listening channel). private final org.slf4j.Logger logger; private final ByteBuf discoverResponseBuffer; - private final RegistrationWrapper registrationWrapper; + private final RegistrationWrapper registrationWrapper; private final CryptoSerializationManager serializationManager; - public RegistrationRemoteHandlerServerUDP(final String name, - final RegistrationWrapper registrationWrapper, + final RegistrationWrapper registrationWrapper, final CryptoSerializationManager serializationManager) { final String name1 = name + " Registration-UDP-Server"; this.logger = org.slf4j.LoggerFactory.getLogger(name1); @@ -176,7 +174,7 @@ class RegistrationRemoteHandlerServerUDP extends MessageTo // registration is the ONLY thing NOT encrypted Logger logger2 = this.logger; - RegistrationWrapper registrationWrapper2 = this.registrationWrapper; + RegistrationWrapper registrationWrapper2 = this.registrationWrapper; CryptoSerializationManager serializationManager2 = this.serializationManager; if (KryoExtra.isEncrypted(message)) { @@ -262,7 +260,7 @@ class RegistrationRemoteHandlerServerUDP extends MessageTo * Copied from RegistrationHandler. There were issues accessing it as static with generics. */ public - MetaChannel shutdown(final RegistrationWrapper registrationWrapper, final Channel channel) { + MetaChannel shutdown(final RegistrationWrapper registrationWrapper, final Channel channel) { this.logger.error("SHUTDOWN HANDLER REACHED! SOMETHING MESSED UP! TRYING TO ABORT"); // shutdown. Something messed up. Only reach this is something messed up. diff --git a/src/dorkbox/network/connection/wrapper/ChannelLocalWrapper.java b/src/dorkbox/network/connection/wrapper/ChannelLocalWrapper.java index 2744f2f1..dbb790ac 100644 --- a/src/dorkbox/network/connection/wrapper/ChannelLocalWrapper.java +++ b/src/dorkbox/network/connection/wrapper/ChannelLocalWrapper.java @@ -19,28 +19,31 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.bouncycastle.crypto.params.ParametersWithIV; -import dorkbox.network.connection.Connection; +import dorkbox.network.connection.ConnectionImpl; import dorkbox.network.connection.ConnectionPointWriter; import dorkbox.network.connection.EndPointBase; import dorkbox.network.connection.ISessionManager; import dorkbox.network.connection.registration.MetaChannel; +import dorkbox.network.rmi.RmiObjectHandler; import io.netty.channel.Channel; import io.netty.channel.EventLoop; import io.netty.channel.local.LocalAddress; public -class ChannelLocalWrapper implements ChannelWrapper, ConnectionPointWriter { +class ChannelLocalWrapper implements ChannelWrapper, ConnectionPointWriter { private final Channel channel; + private final RmiObjectHandler rmiObjectHandler; + private final AtomicBoolean shouldFlush = new AtomicBoolean(false); private String remoteAddress; public - ChannelLocalWrapper(MetaChannel metaChannel) { + ChannelLocalWrapper(MetaChannel metaChannel, final RmiObjectHandler rmiObjectHandler) { this.channel = metaChannel.localChannel; + this.rmiObjectHandler = rmiObjectHandler; } - /** * Write an object to the underlying channel */ @@ -111,6 +114,12 @@ class ChannelLocalWrapper implements ChannelWrapper, Co return true; } + @Override + public + RmiObjectHandler manageRmi() { + return rmiObjectHandler; + } + @Override public final String getRemoteHost() { @@ -119,7 +128,7 @@ class ChannelLocalWrapper implements ChannelWrapper, Co @Override public - void close(Connection connection, ISessionManager sessionManager) { + void close(ConnectionImpl connection, ISessionManager sessionManager) { long maxShutdownWaitTimeInMilliSeconds = EndPointBase.maxShutdownWaitTimeInMilliSeconds; this.shouldFlush.set(false); @@ -153,7 +162,7 @@ class ChannelLocalWrapper implements ChannelWrapper, Co if (getClass() != obj.getClass()) { return false; } - ChannelLocalWrapper other = (ChannelLocalWrapper) obj; + ChannelLocalWrapper other = (ChannelLocalWrapper) obj; if (this.remoteAddress == null) { if (other.remoteAddress != null) { return false; diff --git a/src/dorkbox/network/connection/wrapper/ChannelNetworkWrapper.java b/src/dorkbox/network/connection/wrapper/ChannelNetworkWrapper.java index 7efed5b8..19bd4f02 100644 --- a/src/dorkbox/network/connection/wrapper/ChannelNetworkWrapper.java +++ b/src/dorkbox/network/connection/wrapper/ChannelNetworkWrapper.java @@ -20,19 +20,20 @@ import java.net.InetSocketAddress; import org.bouncycastle.crypto.params.KeyParameter; import org.bouncycastle.crypto.params.ParametersWithIV; -import dorkbox.network.connection.Connection; +import dorkbox.network.connection.ConnectionImpl; import dorkbox.network.connection.ConnectionPointWriter; import dorkbox.network.connection.EndPointBase; import dorkbox.network.connection.ISessionManager; import dorkbox.network.connection.UdpServer; import dorkbox.network.connection.registration.MetaChannel; +import dorkbox.network.rmi.RmiObjectHandler; import dorkbox.util.FastThreadLocal; import io.netty.channel.Channel; import io.netty.channel.EventLoop; import io.netty.util.NetUtil; public -class ChannelNetworkWrapper implements ChannelWrapper { +class ChannelNetworkWrapper implements ChannelWrapper { private final ChannelNetwork tcp; private final ChannelNetwork udp; @@ -49,12 +50,16 @@ class ChannelNetworkWrapper implements ChannelWrapper { private final byte[] aesIV; // AES-GCM requires 12 bytes private final FastThreadLocal cryptoParameters; + private final RmiObjectHandler rmiObjectHandler; /** * @param udpServer is null when created by the client, non-null when created by the server + * @param rmiObjectHandler is a no-op handler if RMI is disabled, otherwise handles RMI object registration */ public - ChannelNetworkWrapper(MetaChannel metaChannel, UdpServer udpServer) { + ChannelNetworkWrapper(MetaChannel metaChannel, UdpServer udpServer, final RmiObjectHandler rmiObjectHandler) { + + this.rmiObjectHandler = rmiObjectHandler; Channel tcpChannel = metaChannel.tcpChannel; this.eventLoop = tcpChannel.eventLoop(); @@ -138,7 +143,6 @@ class ChannelNetworkWrapper implements ChannelWrapper { return this.eventLoop; } - /** * @return a threadlocal AES key + IV. key=32 byte, iv=12 bytes (AES-GCM implementation). This is a threadlocal * because multiple protocols can be performing crypto AT THE SAME TIME, and so we have to make sure that operations don't @@ -156,16 +160,21 @@ class ChannelNetworkWrapper implements ChannelWrapper { return isLoopback; } + @Override + public + RmiObjectHandler manageRmi() { + return rmiObjectHandler; + } + @Override public String getRemoteHost() { return this.remoteAddress; } - @Override public - void close(final Connection connection, final ISessionManager sessionManager) { + void close(final ConnectionImpl connection, final ISessionManager sessionManager) { long maxShutdownWaitTimeInMilliSeconds = EndPointBase.maxShutdownWaitTimeInMilliSeconds; this.tcp.close(maxShutdownWaitTimeInMilliSeconds); @@ -203,7 +212,6 @@ class ChannelNetworkWrapper implements ChannelWrapper { return false; } - @SuppressWarnings("rawtypes") ChannelNetworkWrapper other = (ChannelNetworkWrapper) obj; if (this.remoteAddress == null) { if (other.remoteAddress != null) { diff --git a/src/dorkbox/network/connection/wrapper/ChannelWrapper.java b/src/dorkbox/network/connection/wrapper/ChannelWrapper.java index 17d05cc3..f3135f5f 100644 --- a/src/dorkbox/network/connection/wrapper/ChannelWrapper.java +++ b/src/dorkbox/network/connection/wrapper/ChannelWrapper.java @@ -17,16 +17,16 @@ package dorkbox.network.connection.wrapper; import org.bouncycastle.crypto.params.ParametersWithIV; -import dorkbox.network.connection.Connection; +import dorkbox.network.connection.ConnectionImpl; import dorkbox.network.connection.ConnectionPointWriter; import dorkbox.network.connection.ISessionManager; +import dorkbox.network.rmi.RmiObjectHandler; import io.netty.channel.EventLoop; public -interface ChannelWrapper { +interface ChannelWrapper { ConnectionPointWriter tcp(); - ConnectionPointWriter udp(); /** @@ -54,12 +54,14 @@ interface ChannelWrapper { */ boolean isLoopback(); + RmiObjectHandler manageRmi(); + /** * @return the remote host (can be local, tcp, udp) */ String getRemoteHost(); - void close(final Connection connection, final ISessionManager sessionManager); + void close(ConnectionImpl connection, ISessionManager sessionManager); int id(); diff --git a/src/dorkbox/network/rmi/InvokeMethod.java b/src/dorkbox/network/rmi/InvokeMethod.java index 5c28c206..b69bd8d4 100644 --- a/src/dorkbox/network/rmi/InvokeMethod.java +++ b/src/dorkbox/network/rmi/InvokeMethod.java @@ -39,8 +39,9 @@ package dorkbox.network.rmi; * Internal message to invoke methods remotely. */ public -class InvokeMethod implements RmiMessages { +class InvokeMethod implements RmiMessage { public int objectID; // the registered kryo ID for the object + public CachedMethod cachedMethod; public Object[] args; @@ -49,7 +50,6 @@ class InvokeMethod implements RmiMessages { // possible duplicate IDs. A response data of 0 means to not respond. public byte responseData; - public InvokeMethod() { } } diff --git a/src/dorkbox/network/rmi/InvokeMethodResult.java b/src/dorkbox/network/rmi/InvokeMethodResult.java index 000b90a0..24808b5f 100644 --- a/src/dorkbox/network/rmi/InvokeMethodResult.java +++ b/src/dorkbox/network/rmi/InvokeMethodResult.java @@ -38,8 +38,9 @@ package dorkbox.network.rmi; * Internal message to return the result of a remotely invoked method. */ public -class InvokeMethodResult implements RmiMessages { +class InvokeMethodResult implements RmiMessage { public int objectID; + public byte responseID; public Object result; } diff --git a/src/dorkbox/network/rmi/RemoteInvocationResponse.java b/src/dorkbox/network/rmi/RemoteInvocationResponse.java deleted file mode 100644 index c9dbb56b..00000000 --- a/src/dorkbox/network/rmi/RemoteInvocationResponse.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright 2010 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dorkbox.network.rmi; - -import dorkbox.network.connection.Connection; -import dorkbox.network.connection.Listener; - -abstract -class RemoteInvocationResponse implements Listener.OnDisconnected, - Listener.OnMessageReceived { -} diff --git a/src/dorkbox/network/rmi/RmiBridge.java b/src/dorkbox/network/rmi/RmiBridge.java index 7adbbefd..96677ded 100644 --- a/src/dorkbox/network/rmi/RmiBridge.java +++ b/src/dorkbox/network/rmi/RmiBridge.java @@ -35,7 +35,6 @@ package dorkbox.network.rmi; import java.io.IOException; -import java.lang.reflect.Proxy; import java.util.Arrays; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; @@ -188,7 +187,7 @@ class RmiBridge { */ @SuppressWarnings("rawtypes") public - Listener getListener() { + Listener.OnMessageReceived getListener() { return this.invokeListener; } @@ -412,49 +411,4 @@ class RmiBridge { return id; } - - /** - * Warning. This is an advanced method. You should probably be using {@link Connection#createRemoteObject(Class, RemoteObjectCallback)} - *

- *

- * Returns a proxy object that implements the specified interfaces. Methods invoked on the proxy object will be invoked remotely on the - * object with the specified ID in the ObjectSpace for the specified connection. If the remote end of the connection has not {@link - * RmiBridge#register(int, Object)} added the connection to the ObjectSpace, the remote method invocations will be ignored. - *

- * Methods that return a value will throw {@link TimeoutException} if the response is not received with the {@link - * RemoteObject#setResponseTimeout(int) response timeout}. - *

- * If {@link RemoteObject#setAsync(boolean) non-blocking} is false (the default), then methods that return a value must not be - * called from the update thread for the connection. An exception will be thrown if this occurs. Methods with a void return value can be - * called on the update thread. - *

- * If a proxy returned from this method is part of an object graph sent over the network, the object graph on the receiving side will - * have the proxy object replaced with the registered object. - * - * @see RemoteObject - * - * @param connection this is really the network client -- there is ONLY ever 1 connection - * @param objectID this is the remote object ID (assigned by RMI). This is NOT the kryo registration ID - * @param iFace this is the RMI interface - */ - public - RemoteObject createProxyObject(Connection connection, int objectID, Class iFace) { - if (connection == null) { - throw new IllegalArgumentException("connection cannot be null."); - } - if (iFace == null) { - throw new IllegalArgumentException("iface cannot be null."); - } - if (!iFace.isInterface()) { - throw new IllegalArgumentException("iface must be an interface."); - } - - Class[] temp = new Class[2]; - temp[0] = RemoteObject.class; - temp[1] = iFace; - - return (RemoteObject) Proxy.newProxyInstance(RmiBridge.class.getClassLoader(), - temp, - new RmiProxyHandler(connection, objectID, iFace)); - } } diff --git a/src/dorkbox/network/rmi/RmiMessage.java b/src/dorkbox/network/rmi/RmiMessage.java new file mode 100644 index 00000000..42ef76c0 --- /dev/null +++ b/src/dorkbox/network/rmi/RmiMessage.java @@ -0,0 +1,22 @@ +/* + * Copyright 2018 dorkbox, llc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dorkbox.network.rmi; + +/** + * + */ +public +interface RmiMessage {} diff --git a/src/dorkbox/network/rmi/RmiMessages.java b/src/dorkbox/network/rmi/RmiMessages.java deleted file mode 100644 index d4735151..00000000 --- a/src/dorkbox/network/rmi/RmiMessages.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright 2010 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dorkbox.network.rmi; - -// used by RMI. This is to keep track and not throw errors when connections are notified of it's arrival -public -interface RmiMessages {} diff --git a/src/dorkbox/network/rmi/RmiObjectHandler.java b/src/dorkbox/network/rmi/RmiObjectHandler.java new file mode 100644 index 00000000..f030c996 --- /dev/null +++ b/src/dorkbox/network/rmi/RmiObjectHandler.java @@ -0,0 +1,40 @@ +/* + * Copyright 2018 dorkbox, llc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dorkbox.network.rmi; + +import dorkbox.network.connection.ConnectionImpl; +import dorkbox.network.connection.Listener; + +public +class RmiObjectHandler { + + public + RmiObjectHandler() { + } + + public + void invoke(final ConnectionImpl connection, final InvokeMethod message, final Listener.OnMessageReceived rmiInvokeListener) { + } + + public + void registration(final ConnectionImpl connection, final RmiRegistration message) { + } + + public + Object normalMessages(final ConnectionImpl connection, final Object message) { + return message; + } +} diff --git a/src/dorkbox/network/connection/RegisterRmiLocalHandler.java b/src/dorkbox/network/rmi/RmiObjectLocalHandler.java similarity index 71% rename from src/dorkbox/network/connection/RegisterRmiLocalHandler.java rename to src/dorkbox/network/rmi/RmiObjectLocalHandler.java index f6c9e9ae..62c94eb7 100644 --- a/src/dorkbox/network/connection/RegisterRmiLocalHandler.java +++ b/src/dorkbox/network/rmi/RmiObjectLocalHandler.java @@ -13,28 +13,22 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package dorkbox.network.connection; +package dorkbox.network.rmi; import java.lang.reflect.Field; import java.lang.reflect.Proxy; import java.util.ArrayList; -import java.util.List; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import com.esotericsoftware.kryo.KryoException; import com.esotericsoftware.kryo.Serializer; import com.esotericsoftware.kryo.util.IdentityMap; -import dorkbox.network.rmi.CachedMethod; -import dorkbox.network.rmi.InvokeMethod; -import dorkbox.network.rmi.RemoteObject; -import dorkbox.network.rmi.RmiBridge; -import dorkbox.network.rmi.RmiMessages; -import dorkbox.network.rmi.RmiProxyHandler; -import dorkbox.network.rmi.RmiRegistration; +import dorkbox.network.connection.ConnectionImpl; +import dorkbox.network.connection.EndPointBase; +import dorkbox.network.connection.KryoExtra; +import dorkbox.network.connection.Listener; import dorkbox.network.serialization.CryptoSerializationManager; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToMessageDecoder; /** * This is for a local-connection (same-JVM) RMI method invocation @@ -44,22 +38,22 @@ import io.netty.handler.codec.MessageToMessageDecoder; * This is for a LOCAL connection (same-JVM) */ public -class RegisterRmiLocalHandler extends MessageToMessageDecoder { +class RmiObjectLocalHandler extends RmiObjectHandler { private static final boolean ENABLE_PROXY_OBJECTS = ConnectionImpl.ENABLE_PROXY_OBJECTS; private static final Field[] NO_REMOTE_FIELDS = new Field[0]; - // private static final AtomicReferenceFieldUpdater rmiFieldsREF = AtomicReferenceFieldUpdater.newUpdater( - // RegisterRmiLocalHandler.class, + // private static final AtomicReferenceFieldUpdater rmiFieldsREF = AtomicReferenceFieldUpdater.newUpdater( + // RmiObjectLocalHandler.class, // IdentityMap.class, // "fieldCache"); - private static final AtomicReferenceFieldUpdater implToProxyREF = AtomicReferenceFieldUpdater.newUpdater( - RegisterRmiLocalHandler.class, + private static final AtomicReferenceFieldUpdater implToProxyREF = AtomicReferenceFieldUpdater.newUpdater( + RmiObjectLocalHandler.class, IdentityMap.class, "implToProxy"); - private static final AtomicReferenceFieldUpdater remoteObjectREF = AtomicReferenceFieldUpdater.newUpdater( - RegisterRmiLocalHandler.class, + private static final AtomicReferenceFieldUpdater remoteObjectREF = AtomicReferenceFieldUpdater.newUpdater( + RmiObjectLocalHandler.class, IdentityMap.class, "objectHasRemoteObjects"); @@ -69,215 +63,74 @@ class RegisterRmiLocalHandler extends MessageToMessageDecoder { public - RegisterRmiLocalHandler() { + RmiObjectLocalHandler() { } @Override - protected - void decode(final ChannelHandlerContext context, final Object msg, final List out) throws Exception { - ConnectionImpl connection = (ConnectionImpl) context.pipeline() - .last(); + public + void invoke(final ConnectionImpl connection, final InvokeMethod invokeMethod, final Listener.OnMessageReceived rmiInvokeListener) { + int methodClassID = invokeMethod.cachedMethod.methodClassID; + int methodIndex = invokeMethod.cachedMethod.methodIndex; + // have to replace the cached methods with the correct (remote) version, otherwise the wrong methods CAN BE invoked. - if (msg instanceof RmiRegistration) { - receivedRegistration(connection, (RmiRegistration) msg); + CryptoSerializationManager serialization = connection.getEndPoint() + .getSerialization(); + + + CachedMethod cachedMethod; + try { + cachedMethod = serialization.getMethods(methodClassID)[methodIndex]; + } catch (Exception ex) { + String errorMessage; + KryoExtra kryo = null; + try { + kryo = serialization.takeKryo(); + + Class methodClass = kryo.getRegistration(methodClassID) + .getType(); + + errorMessage = "Invalid method index " + methodIndex + " for class: " + methodClass.getName(); + } finally { + serialization.returnKryo(kryo); + } + + throw new KryoException(errorMessage); + } + + + Object[] args; + Serializer[] serializers = cachedMethod.serializers; + + int argStartIndex; + + if (cachedMethod.overriddenMethod) { + // did we override our cached method? This is not common. + // this is specifically when we override an interface method, with an implementation method + Connection parameter (@ index 0) + argStartIndex = 1; + + args = new Object[serializers.length + 1]; + args[0] = connection; } else { - if (msg instanceof InvokeMethod) { - InvokeMethod invokeMethod = (InvokeMethod) msg; - int methodClassID = invokeMethod.cachedMethod.methodClassID; - int methodIndex = invokeMethod.cachedMethod.methodIndex; - // have to replace the cached methods with the correct (remote) version, otherwise the wrong methods CAN BE invoked. - - CryptoSerializationManager serialization = connection.getEndPoint() - .getSerialization(); - - - CachedMethod cachedMethod; - try { - cachedMethod = serialization.getMethods(methodClassID)[methodIndex]; - } catch (Exception ex) { - String errorMessage; - KryoExtra kryo = null; - try { - kryo = serialization.takeKryo(); - - Class methodClass = kryo.getRegistration(methodClassID) - .getType(); - - errorMessage = "Invalid method index " + methodIndex + " for class: " + methodClass.getName(); - } finally { - serialization.returnKryo(kryo); - } - - throw new KryoException(errorMessage); - } - - - Object[] args; - Serializer[] serializers = cachedMethod.serializers; - - int argStartIndex; - - if (cachedMethod.overriddenMethod) { - // did we override our cached method? This is not common. - // this is specifically when we override an interface method, with an implementation method + Connection parameter (@ index 0) - argStartIndex = 1; - - args = new Object[serializers.length + 1]; - args[0] = connection; - } - else { - argStartIndex = 0; - args = new Object[serializers.length]; - } - - for (int i = 0, n = serializers.length, j = argStartIndex; i < n; i++, j++) { - args[j] = invokeMethod.args[i]; - } - - // overwrite the invoke method fields with UPDATED versions that have the correct (remote side) implementation/args - invokeMethod.cachedMethod = cachedMethod; - invokeMethod.args = args; - } - - receivedNormal(connection, msg, out); + argStartIndex = 0; + args = new Object[serializers.length]; } + + for (int i = 0, n = serializers.length, j = argStartIndex; i < n; i++, j++) { + args[j] = invokeMethod.args[i]; + } + + // overwrite the invoke method fields with UPDATED versions that have the correct (remote side) implementation/args + invokeMethod.cachedMethod = cachedMethod; + invokeMethod.args = args; + + // default action, now that we have swapped out things + rmiInvokeListener.received(connection, invokeMethod); } - private - void receivedNormal(final ConnectionImpl connection, final Object msg, final List out) { - // else, this was "just a local message" - - if (msg instanceof RmiMessages) { - // don't even process these message types - out.add(msg); - return; - } - - // because we NORMALLY pass around just the object (there is no serialization going on...) we have to explicitly check to see - // if this object, or any of it's fields MIGHT HAVE BEEN an RMI Proxy (or should be on), and switcheroo it here. - // NORMALLY this is automatic since the kryo IDs on each side point to the "correct object" for serialization, but here we don't do that. - - // maybe this object is supposed to switch to a proxy object?? (note: we cannot send proxy objects over local/network connections) - - @SuppressWarnings("unchecked") - IdentityMap implToProxy = implToProxyREF.get(this); - IdentityMap objectHasRemoteObjects = remoteObjectREF.get(this); - - - Object proxy = implToProxy.get(msg); - if (proxy != null) { - // we have a proxy object. nothing left to do. - out.add(proxy); - return; - } - - - Class messageClass = msg.getClass(); - - // are there any fields of this message class that COULD contain remote object fields? (NOTE: not RMI fields yet...) - final Field[] remoteObjectFields = objectHasRemoteObjects.get(messageClass); - if (remoteObjectFields == null) { - // maybe one of it's fields is a proxy object? - - // we cache the fields that have to be replaced, so subsequent invocations are significantly more preformat - final ArrayList fields = new ArrayList(); - - // we have to walk the hierarchy of this object to check ALL fields, public and private, using getDeclaredFields() - while (messageClass != Object.class) { - // this will get ALL fields that are - for (Field field : messageClass.getDeclaredFields()) { - final Class type = field.getType(); - - if (type.isInterface()) { - boolean prev = field.isAccessible(); - final Object o; - try { - field.setAccessible(true); - o = field.get(msg); - - if (o instanceof RemoteObject) { - RmiProxyHandler handler = (RmiProxyHandler) Proxy.getInvocationHandler(o); - - int id = handler.objectID; - field.set(msg, connection.getImplementationObject(id)); - fields.add(field); - } - else { - // is a field supposed to be a proxy? - proxy = implToProxy.get(o); - if (proxy != null) { - field.set(msg, proxy); - fields.add(field); - } - } - - } catch (IllegalAccessException e) { - e.printStackTrace(); - // logger.error("Error checking RMI fields for: {}.{}", remoteClassObject.getKey(), field.getName(), e); - } finally { - field.setAccessible(prev); - } - } - } - - messageClass = messageClass.getSuperclass(); - } - - Field[] array; - if (fields.isEmpty()) { - // no need to ever process this class again. - array = NO_REMOTE_FIELDS; - } - else { - array = fields.toArray(new Field[fields.size()]); - } - - //noinspection SynchronizeOnNonFinalField - synchronized (objectHasRemoteObjects) { - // i know what I'm doing. This must be synchronized. - objectHasRemoteObjects.put(messageClass, array); - } - } - else if (remoteObjectFields != NO_REMOTE_FIELDS) { - // quickly replace objects as necessary - - for (Field field : remoteObjectFields) { - boolean prev = field.isAccessible(); - final Object o; - try { - field.setAccessible(true); - o = field.get(msg); - - if (o instanceof RemoteObject) { - RmiProxyHandler handler = (RmiProxyHandler) Proxy.getInvocationHandler(o); - - int id = handler.objectID; - field.set(msg, connection.getImplementationObject(id)); - } - else { - // is a field supposed to be a proxy? - proxy = implToProxy.get(o); - if (proxy != null) { - field.set(msg, proxy); - } - } - - } catch (IllegalAccessException e) { - e.printStackTrace(); - // logger.error("Error checking RMI fields for: {}.{}", remoteClassObject.getKey(), field.getName(), e); - } finally { - field.setAccessible(prev); - } - } - } - - out.add(msg); - } - - - private - void receivedRegistration(final ConnectionImpl connection, final RmiRegistration registration) { + @Override + public + void registration(final ConnectionImpl connection, final RmiRegistration registration) { // manage creating/getting/notifying this RMI object // these fields are ALWAYS present! @@ -292,7 +145,7 @@ class RegisterRmiLocalHandler extends MessageToMessageDecoder { // have to convert the iFace -> Impl - EndPointBase endPoint = connection.getEndPoint(); + EndPointBase endPoint = connection.getEndPoint(); CryptoSerializationManager serialization = endPoint.getSerialization(); Class rmiImpl = serialization.getRmiImpl(registration.interfaceClass); @@ -344,6 +197,135 @@ class RegisterRmiLocalHandler extends MessageToMessageDecoder { } } + @Override + public + Object normalMessages(final ConnectionImpl connection, final Object message) { + // else, this was "just a local message" + + // because we NORMALLY pass around just the object (there is no serialization going on...) we have to explicitly check to see + // if this object, or any of it's fields MIGHT HAVE BEEN an RMI Proxy (or should be on), and switcheroo it here. + // NORMALLY this is automatic since the kryo IDs on each side point to the "correct object" for serialization, but here we don't do that. + + // maybe this object is supposed to switch to a proxy object?? (note: we cannot send proxy objects over local/network connections) + + @SuppressWarnings("unchecked") + IdentityMap implToProxy = implToProxyREF.get(this); + IdentityMap objectHasRemoteObjects = remoteObjectREF.get(this); + + + Object proxy = implToProxy.get(message); + if (proxy != null) { + // we have a proxy object. nothing left to do. + return proxy; + } + + + // otherwise we MIGHT have to modify the fields in the object... + + + Class messageClass = message.getClass(); + + // are there any fields of this message class that COULD contain remote object fields? (NOTE: not RMI fields yet...) + final Field[] remoteObjectFields = objectHasRemoteObjects.get(messageClass); + if (remoteObjectFields == null) { + // maybe one of it's fields is a proxy object? + + // we cache the fields that have to be replaced, so subsequent invocations are significantly more preformat + final ArrayList fields = new ArrayList(); + + // we have to walk the hierarchy of this object to check ALL fields, public and private, using getDeclaredFields() + while (messageClass != Object.class) { + // this will get ALL fields that are + for (Field field : messageClass.getDeclaredFields()) { + final Class type = field.getType(); + + if (type.isInterface()) { + boolean prev = field.isAccessible(); + final Object o; + try { + field.setAccessible(true); + o = field.get(message); + + if (o instanceof RemoteObject) { + RmiProxyHandler handler = (RmiProxyHandler) Proxy.getInvocationHandler(o); + + int id = handler.objectID; + field.set(message, connection.getImplementationObject(id)); + fields.add(field); + } + else { + // is a field supposed to be a proxy? + proxy = implToProxy.get(o); + if (proxy != null) { + field.set(message, proxy); + fields.add(field); + } + } + + } catch (IllegalAccessException e) { + e.printStackTrace(); + // logger.error("Error checking RMI fields for: {}.{}", remoteClassObject.getKey(), field.getName(), e); + } finally { + field.setAccessible(prev); + } + } + } + + messageClass = messageClass.getSuperclass(); + } + + Field[] array; + if (fields.isEmpty()) { + // no need to ever process this class again. + array = NO_REMOTE_FIELDS; + } + else { + array = fields.toArray(new Field[fields.size()]); + } + + //noinspection SynchronizeOnNonFinalField + synchronized (this.objectHasRemoteObjects) { + // i know what I'm doing. This must be synchronized. + this.objectHasRemoteObjects.put(messageClass, array); + } + } + else if (remoteObjectFields != NO_REMOTE_FIELDS) { + // quickly replace objects as necessary + + for (Field field : remoteObjectFields) { + boolean prev = field.isAccessible(); + final Object o; + try { + field.setAccessible(true); + o = field.get(message); + + if (o instanceof RemoteObject) { + RmiProxyHandler handler = (RmiProxyHandler) Proxy.getInvocationHandler(o); + + int id = handler.objectID; + field.set(message, connection.getImplementationObject(id)); + } + else { + // is a field supposed to be a proxy? + proxy = implToProxy.get(o); + if (proxy != null) { + field.set(message, proxy); + } + } + + } catch (IllegalAccessException e) { + e.printStackTrace(); + // logger.error("Error checking RMI fields for: {}.{}", remoteClassObject.getKey(), field.getName(), e); + } finally { + field.setAccessible(prev); + } + } + } + + return message; + } + + // private // LocalRmiClassEncoder replaceFieldObjects(final ConnectionImpl connection, final Object object, final Class implClass) { // Field[] rmiFields = fieldCache.get(implClass); diff --git a/src/dorkbox/network/connection/RegisterRmiNetworkHandler.java b/src/dorkbox/network/rmi/RmiObjectNetworkHandler.java similarity index 57% rename from src/dorkbox/network/connection/RegisterRmiNetworkHandler.java rename to src/dorkbox/network/rmi/RmiObjectNetworkHandler.java index b87d0f76..6d303416 100644 --- a/src/dorkbox/network/connection/RegisterRmiNetworkHandler.java +++ b/src/dorkbox/network/rmi/RmiObjectNetworkHandler.java @@ -1,31 +1,40 @@ /* - * Copyright 2010 dorkbox, llc + * Copyright 2018 dorkbox, llc. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ -package dorkbox.network.connection; +package dorkbox.network.rmi; -import dorkbox.network.rmi.RmiBridge; -import dorkbox.network.rmi.RmiRegistration; +import dorkbox.network.connection.ConnectionImpl; +import dorkbox.network.connection.Listener; -class RegisterRmiNetworkHandler implements Listener.OnMessageReceived { +public +class RmiObjectNetworkHandler extends RmiObjectHandler { - RegisterRmiNetworkHandler() { + public + RmiObjectNetworkHandler() { } @Override public - void received(final ConnectionImpl connection, final RmiRegistration registration) { + void invoke(final ConnectionImpl connection, final InvokeMethod message, final Listener.OnMessageReceived rmiInvokeListener) { + // default, nothing fancy + rmiInvokeListener.received(connection, message); + } + + @Override + public + void registration(final ConnectionImpl connection, final RmiRegistration registration) { // manage creating/getting/notifying this RMI object // these fields are ALWAYS present! @@ -42,7 +51,8 @@ class RegisterRmiNetworkHandler implements Listener.OnMessageReceived responseListener; + private final + Listener.OnMessageReceived responseListener; private int timeoutMillis = 3000; private boolean isAsync = false; @@ -92,7 +95,8 @@ class RmiProxyHandler implements InvocationHandler { * @param objectID this is the remote object ID (assigned by RMI). This is NOT the kryo registration ID * @param iFace this is the RMI interface */ - RmiProxyHandler(final Connection connection, final int objectID, final Class iFace) { + public + RmiProxyHandler(final ConnectionImpl connection, final int objectID, final Class iFace) { super(); this.connection = connection; @@ -115,14 +119,7 @@ class RmiProxyHandler implements InvocationHandler { this.logger = LoggerFactory.getLogger(connection.getEndPoint().getName() + ":" + this.getClass().getSimpleName()); - - this.responseListener = new RemoteInvocationResponse() { - @Override - public - void disconnected(Connection connection) { - close(); - } - + this.responseListener = new Listener.OnMessageReceived() { @Override public void received(Connection connection, InvokeMethodResult invokeMethodResult) { @@ -146,12 +143,12 @@ class RmiProxyHandler implements InvocationHandler { } } }; - - connection.listeners() - .add(this.responseListener); } - + public + Listener.OnMessageReceived getListener() { + return responseListener; + } @SuppressWarnings({"AutoUnboxing", "AutoBoxing", "NumericCastThatLosesPrecision", "IfCanBeSwitch"}) @Override @@ -163,7 +160,7 @@ class RmiProxyHandler implements InvocationHandler { String name = method.getName(); if (name.equals("close")) { - close(); + connection.removeRmiListeners(objectID, getListener()); return null; } else if (name.equals("setResponseTimeout")) { @@ -403,12 +400,6 @@ class RmiProxyHandler implements InvocationHandler { throw new TimeoutException("Response timed out."); } - private - void close() { - this.connection.listeners() - .remove(this.responseListener); - } - @Override public int hashCode() { diff --git a/src/dorkbox/network/rmi/RmiRegistration.java b/src/dorkbox/network/rmi/RmiRegistration.java index 079fe3f7..f64e4608 100644 --- a/src/dorkbox/network/rmi/RmiRegistration.java +++ b/src/dorkbox/network/rmi/RmiRegistration.java @@ -19,7 +19,7 @@ package dorkbox.network.rmi; * Message specifically to register a class implementation for RMI */ public -class RmiRegistration { +class RmiRegistration implements RmiMessage { public boolean isRequest; /** diff --git a/src/dorkbox/network/serialization/Serialization.java b/src/dorkbox/network/serialization/Serialization.java index 8440d73e..07b82f78 100644 --- a/src/dorkbox/network/serialization/Serialization.java +++ b/src/dorkbox/network/serialization/Serialization.java @@ -308,7 +308,9 @@ class Serialization implements CryptoSerializationManager, RmiSerializationManag final boolean registrationRequired, final boolean implementationRequired, final SerializerFactory factory) { + this.forbidInterfaceRegistration = implementationRequired; + this.kryoPool = ObjectPool.NonBlockingSoftReference(new PoolableObject() { @Override public diff --git a/test/dorkbox/network/ReuseTest.java b/test/dorkbox/network/ReuseTest.java index 3f6ca1d6..feac2ce1 100644 --- a/test/dorkbox/network/ReuseTest.java +++ b/test/dorkbox/network/ReuseTest.java @@ -29,7 +29,6 @@ import org.junit.Test; import dorkbox.network.connection.Connection; import dorkbox.network.connection.Listener; import dorkbox.network.connection.Listeners; -import dorkbox.util.exceptions.InitializationException; import dorkbox.util.exceptions.SecurityException; public @@ -39,7 +38,7 @@ class ReuseTest extends BaseTest { @Test public - void socketReuse() throws InitializationException, SecurityException, IOException, InterruptedException { + void socketReuse() throws SecurityException, IOException { this.serverCount = new AtomicInteger(0); this.clientCount = new AtomicInteger(0); @@ -104,7 +103,7 @@ class ReuseTest extends BaseTest { System.err.println("Waiting..."); try { Thread.sleep(100); - } catch (InterruptedException ex) { + } catch (InterruptedException ignored) { } } @@ -119,7 +118,7 @@ class ReuseTest extends BaseTest { @Test public - void localReuse() throws InitializationException, SecurityException, IOException, InterruptedException { + void localReuse() throws SecurityException, IOException { this.serverCount = new AtomicInteger(0); this.clientCount = new AtomicInteger(0); diff --git a/test/dorkbox/network/rmi/multiJVM/TestServer.java b/test/dorkbox/network/rmi/multiJVM/TestServer.java index 471e7539..1d89fbfd 100644 --- a/test/dorkbox/network/rmi/multiJVM/TestServer.java +++ b/test/dorkbox/network/rmi/multiJVM/TestServer.java @@ -1,13 +1,10 @@ package dorkbox.network.rmi.multiJVM; -import java.io.IOException; - import dorkbox.network.Server; import dorkbox.network.rmi.RmiTest; import dorkbox.network.rmi.TestCow; import dorkbox.network.rmi.TestCowImpl; import dorkbox.network.serialization.Serialization; -import dorkbox.util.exceptions.InitializationException; import dorkbox.util.exceptions.SecurityException; /** @@ -31,12 +28,8 @@ class TestServer Server server = null; try { server = new Server(configuration); - } catch (InitializationException e) { - e.printStackTrace(); } catch (SecurityException e) { e.printStackTrace(); - } catch (IOException e) { - e.printStackTrace(); } // server.setIdleTimeout(0);