From 4c8c50e8a3ba70a5629a81dfd9fede97b99b366a Mon Sep 17 00:00:00 2001 From: nathan Date: Fri, 5 Jun 2020 19:50:16 +0200 Subject: [PATCH] Cleanup. pre-kotlin conversion --- src/dorkbox/network/Server.java | 100 +++++++++--------- .../network/aeron/EchoServerDuologue.java | 1 - .../network/connection/ConnectionImpl.java | 38 +------ .../network/connection/ConnectionManager.java | 73 +------------ .../network/connection/ConnectionPoint.java | 5 - src/dorkbox/network/connection/EndPoint.java | 13 +-- .../network/connection/EndPointClient.java | 5 - .../network/connection/ISessionManager.java | 7 -- .../OnConnectedManager.java | 5 +- .../listenerManagement/OnIdleManager.java | 50 --------- .../wrapper/ChannelLocalWrapper.java | 12 --- .../connection/wrapper/ChannelNetwork.java | 8 -- .../wrapper/ChannelNetworkWrapper.java | 15 --- .../connection/wrapper/ChannelNull.java | 5 - .../connection/wrapper/ChannelWrapper.java | 5 - .../network/rmi/ConnectionRmiImplSupport.java | 14 ++- src/dorkbox/network/rmi/RmiProxyHandler.java | 4 +- 17 files changed, 69 insertions(+), 291 deletions(-) delete mode 100644 src/dorkbox/network/connection/listenerManagement/OnIdleManager.java diff --git a/src/dorkbox/network/Server.java b/src/dorkbox/network/Server.java index 489889f5..cdb956f6 100644 --- a/src/dorkbox/network/Server.java +++ b/src/dorkbox/network/Server.java @@ -132,37 +132,63 @@ class Server extends EndPointServer { isRunning = true; - try (final Publication publication = this.setupAllClientsPublication()) { - try (final Subscription subscription = this.setupAllClientsSubscription()) { + Publication publication = null; + Subscription subscription = null; + FragmentHandler handler = null; + try { + publication = EchoChannels.createPublicationDynamicMDC(this.aeron, + this.config.listenIpAddress, + this.config.controlPort, + UDP_STREAM_ID); - /** - * Note: Reassembly has been shown to be minimal impact to latency. But not totally negligible. If the lowest latency is desired, then limiting message sizes to MTU size is a good practice. - * - * Note: There is a maximum length allowed for messages which is the min of 1/8th a term length or 16MB. Messages larger than this should chunked using an application level chunking protocol. Chunking has better recovery properties from failure and streams with mechanical sympathy. - */ - final FragmentHandler handler = new FragmentAssembler((buffer, offset, length, header)->this.onInitialClientMessage( - publication, - buffer, - offset, - length, - header)); + subscription = EchoChannels.createSubscriptionWithHandlers(this.aeron, + this.config.listenIpAddress, + this.config.port, + UDP_STREAM_ID, + this::onInitialClientConnected, + this::onInitialClientDisconnected); - while (true) { - this.executor.execute(()->{ - subscription.poll(handler, 100); // this checks to see if there are NEW clients - this.clients.poll(); // this manages existing clients - }); - try { - Thread.sleep(100L); - } catch (final InterruptedException e) { - Thread.currentThread() - .interrupt(); - } + + /** + * Note: Reassembly has been shown to be minimal impact to latency. But not totally negligible. If the lowest latency is desired, then limiting message sizes to MTU size is a good practice. + * + * Note: There is a maximum length allowed for messages which is the min of 1/8th a term length or 16MB. Messages larger than this should chunked using an application level chunking protocol. Chunking has better recovery properties from failure and streams with mechanical sympathy. + */ + final Publication finalPublication = publication; + handler = new FragmentAssembler((buffer, offset, length, header)->this.onInitialClientMessage(finalPublication, + buffer, + offset, + length, + header)); + + final FragmentHandler initialConnectionHandler = handler; + final Subscription initialConnectionSubscription = subscription; + + while (true) { + this.executor.execute(()->{ + initialConnectionSubscription.poll(initialConnectionHandler, 100); // this checks to see if there are NEW clients + this.clients.poll(); // this manages existing clients + }); + + try { + Thread.sleep(100L); + } catch (final InterruptedException e) { + Thread.currentThread() + .interrupt(); } } + } finally { + if (publication != null) { + publication.close(); + } + + if (subscription != null) { + subscription.close(); + } } + // we now BLOCK until the stop method is called. // if we want to continue running code in the server, bind should be called in a separate, non-daemon thread. // if (blockUntilTerminate) { @@ -265,32 +291,6 @@ class Server extends EndPointServer { }); } - /** - * Configure the publication for the "all-clients" channel. - */ - - private - Publication setupAllClientsPublication() { - return EchoChannels.createPublicationDynamicMDC(this.aeron, - this.config.listenIpAddress, - this.config.controlPort, - UDP_STREAM_ID); - } - - /** - * Configure the subscription for the "all-clients" channel. - */ - - private - Subscription setupAllClientsSubscription() { - return EchoChannels.createSubscriptionWithHandlers(this.aeron, - this.config.listenIpAddress, - this.config.port, - UDP_STREAM_ID, - this::onInitialClientConnected, - this::onInitialClientDisconnected); - } - private void onInitialClientConnected(final Image image) { this.executor.execute(()->{ diff --git a/src/dorkbox/network/aeron/EchoServerDuologue.java b/src/dorkbox/network/aeron/EchoServerDuologue.java index dc5f214b..57e33ba3 100644 --- a/src/dorkbox/network/aeron/EchoServerDuologue.java +++ b/src/dorkbox/network/aeron/EchoServerDuologue.java @@ -156,7 +156,6 @@ class EchoServerDuologue implements AutoCloseable { /** * Poll the duologue for activity. */ - public void poll() { this.exec.assertIsExecutorThread(); diff --git a/src/dorkbox/network/connection/ConnectionImpl.java b/src/dorkbox/network/connection/ConnectionImpl.java index 3afd959c..a165f191 100644 --- a/src/dorkbox/network/connection/ConnectionImpl.java +++ b/src/dorkbox/network/connection/ConnectionImpl.java @@ -45,8 +45,6 @@ import io.netty.channel.kqueue.KQueueSocketChannel; import io.netty.channel.local.LocalChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.oio.OioSocketChannel; -import io.netty.handler.timeout.IdleState; -import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.Promise; @@ -315,10 +313,10 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection_ public final void ping0(PingMessage ping) { if (this.channelWrapper.udp() != null) { - UDP(ping).flush(); + UDP(ping); } else if (this.channelWrapper.tcp() != null) { - TCP(ping).flush(); + TCP(ping); } else { self(ping); @@ -484,36 +482,6 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection_ } } - /** - * Flushes the contents of the TCP/UDP/etc pipes to the actual transport. - */ - final - void flush() { - this.channelWrapper.flush(); - } - - - /** - * Invoked when a {@link Channel} has been idle for a while. - */ - @Override - public - void userEventTriggered(ChannelHandlerContext context, Object event) throws Exception { - // if (e.getState() == IdleState.READER_IDLE) { - // e.getChannel().close(); - // } else if (e.getState() == IdleState.WRITER_IDLE) { - // e.getChannel().write(new Object()); - // } else - if (event instanceof IdleStateEvent) { - if (((IdleStateEvent) event).state() == IdleState.ALL_IDLE) { - // will auto-flush if necessary - this.sessionManager.onIdle(this); - } - } - - super.userEventTriggered(context, event); - } - /** * @param context can be NULL when running deferred messages from registration process. * @param message the received message @@ -698,8 +666,6 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection_ } } - // flush any pending messages - this.channelWrapper.flush(); // close out the ping future PingFuture pingFuture2 = this.pingFuture; diff --git a/src/dorkbox/network/connection/ConnectionManager.java b/src/dorkbox/network/connection/ConnectionManager.java index 2ffc8184..dd304acc 100644 --- a/src/dorkbox/network/connection/ConnectionManager.java +++ b/src/dorkbox/network/connection/ConnectionManager.java @@ -29,7 +29,6 @@ import dorkbox.network.connection.bridge.ConnectionBridgeServer; import dorkbox.network.connection.bridge.ConnectionExceptSpecifiedBridgeServer; import dorkbox.network.connection.listenerManagement.OnConnectedManager; import dorkbox.network.connection.listenerManagement.OnDisconnectedManager; -import dorkbox.network.connection.listenerManagement.OnIdleManager; import dorkbox.network.connection.listenerManagement.OnMessageReceivedManager; import dorkbox.network.connection.ping.PingMessage; import dorkbox.util.Property; @@ -69,7 +68,6 @@ class ConnectionManager implements Listeners, ISessionMana private final OnConnectedManager onConnectedManager; private final OnDisconnectedManager onDisconnectedManager; - private final OnIdleManager onIdleManager; private final OnMessageReceivedManager onMessageReceivedManager; @SuppressWarnings({"FieldCanBeLocal", "unused"}) @@ -106,7 +104,6 @@ class ConnectionManager implements Listeners, ISessionMana onConnectedManager = new OnConnectedManager(logger); onDisconnectedManager = new OnDisconnectedManager(logger); - onIdleManager = new OnIdleManager(logger); onMessageReceivedManager = new OnMessageReceivedManager(logger); } @@ -161,10 +158,6 @@ class ConnectionManager implements Listeners, ISessionMana onDisconnectedManager.add((Listener.OnDisconnected) listener); found = true; } - if (listener instanceof Listener.OnIdle) { - onIdleManager.add((Listener.OnIdle) listener); - found = true; - } if (listener instanceof Listener.OnMessageReceived) { onMessageReceivedManager.add((Listener.OnMessageReceived) listener); @@ -226,13 +219,6 @@ class ConnectionManager implements Listeners, ISessionMana found |= true; } } - if (listener instanceof Listener.OnIdle) { - int size = onIdleManager.removeWithSize((Listener.OnIdle) listener); - if (size >= 0) { - remainingListeners += size; - found |= true; - } - } if (listener instanceof Listener.OnMessageReceived) { int size = onMessageReceivedManager.removeWithSize((Listener.OnMessageReceived) listener); if (size >= 0) { @@ -265,7 +251,6 @@ class ConnectionManager implements Listeners, ISessionMana Listeners removeAll() { onConnectedManager.clear(); onDisconnectedManager.clear(); - onIdleManager.clear(); onMessageReceivedManager.clear(); logger.trace("ALL listeners removed !!"); @@ -305,8 +290,7 @@ class ConnectionManager implements Listeners, ISessionMana /** * Invoked when a message object was received from a remote peer. *

- * If data is sent in response to this event, the connection data is automatically flushed to the wire. If the data is sent in a separate thread, - * {@link EndPoint#send().flush()} must be called manually. + * If data is sent in response to this event, the connection data is automatically flushed to the wire. *

* {@link ISessionManager} */ @@ -344,9 +328,6 @@ class ConnectionManager implements Listeners, ISessionMana if (connection.manageRmi(message)) { // if we are an RMI message/registration, we have very specific, defined behavior. We do not use the "normal" listener callback pattern // because these methods are rare, and require special functionality - - // make sure we flush the message to the socket! - connection.flush(); return true; } @@ -367,10 +348,7 @@ class ConnectionManager implements Listeners, ISessionMana } // only run a flush once - if (foundListener) { - connection.flush(); - } - else { + if (!foundListener) { this.logger.warn("----------- LISTENER NOT REGISTERED FOR TYPE: {}", message.getClass() .getSimpleName()); @@ -378,28 +356,6 @@ class ConnectionManager implements Listeners, ISessionMana return foundListener; } - /** - * Invoked when a Connection has been idle for a while. - */ - @Override - public final - void onIdle(final ConnectionImpl connection) { - boolean foundListener = onIdleManager.notifyIdle((C) connection, shutdown); - - if (foundListener) { - connection.flush(); - } - - // now have to account for additional (local) listener managers. - // access a snapshot of the managers (single-writer-principle) - final IdentityMap localManagers = localManagersREF.get(this); - ConnectionManager localManager = localManagers.get(connection); - if (localManager != null) { - localManager.onIdle(connection); - } - } - - /** * Invoked when a Channel is open, bound to a local address, and connected to a remote address. */ @@ -410,10 +366,6 @@ class ConnectionManager implements Listeners, ISessionMana boolean foundListener = onConnectedManager.notifyConnected((C) connection, shutdown); - if (foundListener) { - connection.flush(); - } - // now have to account for additional (local) listener managers. // access a snapshot of the managers (single-writer-principle) final IdentityMap localManagers = localManagersREF.get(this); @@ -433,10 +385,6 @@ class ConnectionManager implements Listeners, ISessionMana boolean foundListener = onDisconnectedManager.notifyDisconnected((C) connection); - if (foundListener) { - connection.flush(); - } - // now have to account for additional (local) listener managers. // access a snapshot of the managers (single-writer-principle) @@ -633,7 +581,6 @@ class ConnectionManager implements Listeners, ISessionMana onConnectedManager.clear(); onDisconnectedManager.clear(); - onIdleManager.clear(); onMessageReceivedManager.clear(); } @@ -822,22 +769,6 @@ class ConnectionManager implements Listeners, ISessionMana return this; } - /** - * Flushes the contents of the TCP/UDP/etc pipes to the actual transport socket. - */ - @Override - public - void flush() { - ConcurrentEntry current = connectionsREF.get(this); - ConnectionImpl c; - while (current != null) { - c = current.getValue(); - current = current.next(); - - c.flush(); - } - } - @Override public boolean equals(final Object o) { diff --git a/src/dorkbox/network/connection/ConnectionPoint.java b/src/dorkbox/network/connection/ConnectionPoint.java index c328c7fc..a4d81c3c 100644 --- a/src/dorkbox/network/connection/ConnectionPoint.java +++ b/src/dorkbox/network/connection/ConnectionPoint.java @@ -30,11 +30,6 @@ interface ConnectionPoint { */ void write(Object object) throws Exception; - /** - * Flushes the contents of the TCP/UDP/etc pipes to the actual transport socket. - */ - void flush(); - /** * Creates a new promise associated with this connection type */ diff --git a/src/dorkbox/network/connection/EndPoint.java b/src/dorkbox/network/connection/EndPoint.java index 5668e766..13336481 100644 --- a/src/dorkbox/network/connection/EndPoint.java +++ b/src/dorkbox/network/connection/EndPoint.java @@ -408,7 +408,7 @@ After this command is executed the new disk will be mounted under /Volumes/DevSh } /** - * This method allows the connections used by the client/server to be subclassed (custom implementations). + * This method allows the connections used by the client/server to be subclassed (with custom implementations). *

* As this is for the network stack, the new connection MUST subclass {@link ConnectionImpl} *

@@ -487,17 +487,6 @@ After this command is executed the new disk will be mounted under /Volumes/DevSh return connectionManager.getConnections(); } - /** - * Closes all connections ONLY (keeps the server/client running). To STOP the client/server, use stop(). - *

- * This is used, for example, when reconnecting to a server. - *

- * The server should ALWAYS use STOP. - */ - void closeConnections(boolean shouldKeepListeners) { - - } - /** * Creates a "global" RMI object for use by multiple connections. * diff --git a/src/dorkbox/network/connection/EndPointClient.java b/src/dorkbox/network/connection/EndPointClient.java index faad6eea..a4c7d9b2 100644 --- a/src/dorkbox/network/connection/EndPointClient.java +++ b/src/dorkbox/network/connection/EndPointClient.java @@ -65,7 +65,6 @@ class EndPointClient extends EndPoint { public ConnectionPoint self(Object message) { ConnectionPoint self = connection.self(message); - connection.flush(); return self; } @@ -74,7 +73,6 @@ class EndPointClient extends EndPoint { public ConnectionPoint TCP(Object message) { ConnectionPoint tcp = connection.TCP(message); - connection.flush(); // needed to place back-pressure when writing too much data to the connection. Will create deadlocks if called from // INSIDE the event loop @@ -87,7 +85,6 @@ class EndPointClient extends EndPoint { public ConnectionPoint UDP(Object message) { ConnectionPoint udp = connection.UDP(message); - connection.flush(); // needed to place back-pressure when writing too much data to the connection. Will create deadlocks if called from // INSIDE the event loop @@ -99,7 +96,6 @@ class EndPointClient extends EndPoint { public Ping ping() { Ping ping = connection.ping(); - connection.flush(); return ping; } }; @@ -131,7 +127,6 @@ class EndPointClient extends EndPoint { public ConnectionPoint send(final Object message) { ConnectionPoint send = connection.send(message); - send.flush(); // needed to place back-pressure when writing too much data to the connection. Will create deadlocks if called from // INSIDE the event loop diff --git a/src/dorkbox/network/connection/ISessionManager.java b/src/dorkbox/network/connection/ISessionManager.java index 1a7f5514..e304a67d 100644 --- a/src/dorkbox/network/connection/ISessionManager.java +++ b/src/dorkbox/network/connection/ISessionManager.java @@ -26,13 +26,6 @@ interface ISessionManager { */ void onMessage(ConnectionImpl connection, Object message); - /** - * Called when the connection has been idle (read & write) for 2 seconds. - *

- * Will auto-flush the connection queue if necessary. - */ - void onIdle(ConnectionImpl connection); - /** * Invoked when a Channel is open, bound to a local address, and connected to a remote address. */ diff --git a/src/dorkbox/network/connection/listenerManagement/OnConnectedManager.java b/src/dorkbox/network/connection/listenerManagement/OnConnectedManager.java index 89156af9..015367c6 100644 --- a/src/dorkbox/network/connection/listenerManagement/OnConnectedManager.java +++ b/src/dorkbox/network/connection/listenerManagement/OnConnectedManager.java @@ -23,16 +23,13 @@ import dorkbox.network.connection.Connection; import dorkbox.network.connection.Listener.OnConnected; /** - * Called when the remote end has been connected. This will be invoked before any objects are received by the network. + * Called when the remote computer has been connected. This will be invoked before any objects are received by the network. * This method should not block for long periods as other network activity will not be processed * until it returns. */ public final class OnConnectedManager extends ConcurrentManager> { - // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this - // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our - // use-case 99% of the time) public OnConnectedManager(final Logger logger) { super(logger); diff --git a/src/dorkbox/network/connection/listenerManagement/OnIdleManager.java b/src/dorkbox/network/connection/listenerManagement/OnIdleManager.java deleted file mode 100644 index 3562bd40..00000000 --- a/src/dorkbox/network/connection/listenerManagement/OnIdleManager.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright 2010 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dorkbox.network.connection.listenerManagement; - -import java.util.concurrent.atomic.AtomicBoolean; - -import org.slf4j.Logger; - -import dorkbox.network.connection.Connection; -import dorkbox.network.connection.Listener.OnIdle; - -/** - * Called when the remote end has been connected. This will be invoked before any objects are received by the network. - * This method should not block for long periods as other network activity will not be processed - * until it returns. - */ -public final -class OnIdleManager extends ConcurrentManager> { - - public - OnIdleManager(final Logger logger) { - super(logger); - } - - /** - * @return true if a listener was found, false otherwise - */ - public - boolean notifyIdle(final C connection, final AtomicBoolean shutdown) { - return doAction(connection, shutdown); - } - - @Override - void listenerAction(final C connection, final OnIdle listener) throws Exception { - listener.idle(connection); - } -} diff --git a/src/dorkbox/network/connection/wrapper/ChannelLocalWrapper.java b/src/dorkbox/network/connection/wrapper/ChannelLocalWrapper.java index 0d83e055..f2eeb4c7 100644 --- a/src/dorkbox/network/connection/wrapper/ChannelLocalWrapper.java +++ b/src/dorkbox/network/connection/wrapper/ChannelLocalWrapper.java @@ -76,18 +76,6 @@ class ChannelLocalWrapper implements ChannelWrapper, ConnectionPoint { return this; } - /** - * Flushes the contents of the LOCAL pipes to the actual transport. - */ - @Override - public - void flush() { - if (this.shouldFlush.compareAndSet(true, false)) { - this.channel.flush(); - } - } - - @Override public Promise newPromise() { diff --git a/src/dorkbox/network/connection/wrapper/ChannelNetwork.java b/src/dorkbox/network/connection/wrapper/ChannelNetwork.java index be265628..77315d28 100644 --- a/src/dorkbox/network/connection/wrapper/ChannelNetwork.java +++ b/src/dorkbox/network/connection/wrapper/ChannelNetwork.java @@ -56,14 +56,6 @@ class ChannelNetwork implements ConnectionPoint { return channel.isWritable(); } - @Override - public - void flush() { - if (shouldFlush.compareAndSet(true, false)) { - channel.flush(); - } - } - @Override public Promise newPromise() { diff --git a/src/dorkbox/network/connection/wrapper/ChannelNetworkWrapper.java b/src/dorkbox/network/connection/wrapper/ChannelNetworkWrapper.java index 6f8786d1..5811b22e 100644 --- a/src/dorkbox/network/connection/wrapper/ChannelNetworkWrapper.java +++ b/src/dorkbox/network/connection/wrapper/ChannelNetworkWrapper.java @@ -85,21 +85,6 @@ class ChannelNetworkWrapper implements ChannelWrapper { return this.udp; } - /** - * Flushes the contents of the TCP/UDP/etc pipes to the actual transport. - */ - @Override - public - void flush() { - if (this.tcp != null) { - this.tcp.flush(); - } - - if (this.udp != null) { - this.udp.flush(); - } - } - /** * @return the AES key. */ diff --git a/src/dorkbox/network/connection/wrapper/ChannelNull.java b/src/dorkbox/network/connection/wrapper/ChannelNull.java index 7cc79517..550f7d36 100644 --- a/src/dorkbox/network/connection/wrapper/ChannelNull.java +++ b/src/dorkbox/network/connection/wrapper/ChannelNull.java @@ -38,11 +38,6 @@ class ChannelNull implements ConnectionPoint { void write(Object object) { } - @Override - public - void flush() { - } - /** * @return true if the channel is writable. Useful when sending large amounts of data at once. */ diff --git a/src/dorkbox/network/connection/wrapper/ChannelWrapper.java b/src/dorkbox/network/connection/wrapper/ChannelWrapper.java index a1e5fdf0..20dd1f80 100644 --- a/src/dorkbox/network/connection/wrapper/ChannelWrapper.java +++ b/src/dorkbox/network/connection/wrapper/ChannelWrapper.java @@ -27,11 +27,6 @@ interface ChannelWrapper { ConnectionPoint tcp(); ConnectionPoint udp(); - /** - * Flushes the contents of the TCP/UDP/etc pipes to the actual transport. - */ - void flush(); - /** * @return the AES key. */ diff --git a/src/dorkbox/network/rmi/ConnectionRmiImplSupport.java b/src/dorkbox/network/rmi/ConnectionRmiImplSupport.java index cd835fc9..37b9e4a4 100644 --- a/src/dorkbox/network/rmi/ConnectionRmiImplSupport.java +++ b/src/dorkbox/network/rmi/ConnectionRmiImplSupport.java @@ -79,6 +79,7 @@ class ConnectionRmiImplSupport implements ConnectionRmiSupport { abstract void registration(final ConnectionImpl connection, final RmiRegistration message); + @Override public void close() { // proxy listeners are cleared in the removeAll() call (which happens BEFORE close) @@ -90,11 +91,13 @@ class ConnectionRmiImplSupport implements ConnectionRmiSupport { /** * This will remove the invoke and invoke response listeners for this remote object */ + @Override public void removeAllListeners() { proxyListeners.clear(); } + @Override public void createRemoteObject(final ConnectionImpl connection, final Class interfaceClass, final RemoteObjectCallback callback) { if (!interfaceClass.isInterface()) { @@ -112,9 +115,10 @@ class ConnectionRmiImplSupport implements ConnectionRmiSupport { // have to wait for the object to be created + ID to be assigned on the remote system BEFORE we can create the proxy instance here. // this means we are creating a NEW object on the server, bound access to only this connection - connection.send(message).flush(); + connection.send(message); } + @Override public void getRemoteObject(final ConnectionImpl connection, final int objectId, final RemoteObjectCallback callback) { if (objectId < 0) { @@ -137,12 +141,13 @@ class ConnectionRmiImplSupport implements ConnectionRmiSupport { // have to wait for the object to be created + ID to be assigned on the remote system BEFORE we can create the proxy instance here. // this means we are getting an EXISTING object on the server, bound access to only this connection - connection.send(message).flush(); + connection.send(message); } /** * Manages the RMI stuff for a connection. */ + @Override public boolean manage(final ConnectionImpl connection, final Object message) { if (message instanceof InvokeMethod) { @@ -167,7 +172,7 @@ class ConnectionRmiImplSupport implements ConnectionRmiSupport { InvokeMethodResult result = RmiBridge.invoke(connection, target, invokeMethod, logger); if (result != null) { // System.err.println("Sending: " + invokeMethod.responseID); - connection.send(result).flush(); + connection.send(result); } } catch (IOException e) { @@ -207,6 +212,7 @@ class ConnectionRmiImplSupport implements ConnectionRmiSupport { * * @return the registered ID for a specific object, or RmiBridge.INVALID_RMI if there was no ID. */ + @Override public int getRegisteredId(final T object) { // always check global before checking local, because less contention on the synchronization @@ -225,6 +231,7 @@ class ConnectionRmiImplSupport implements ConnectionRmiSupport { * * @param objectId this is the RMI object ID */ + @Override public Object getImplementationObject(final int objectId) { if (RmiBridge.isGlobal(objectId)) { @@ -363,6 +370,7 @@ class ConnectionRmiImplSupport implements ConnectionRmiSupport { * @param rmiId this is the remote object ID (assigned by RMI). This is NOT the kryo registration ID * @param iFace this is the RMI interface */ + @Override public RemoteObject getProxyObject(final int rmiId, final Class iFace) { if (iFace == null) { diff --git a/src/dorkbox/network/rmi/RmiProxyHandler.java b/src/dorkbox/network/rmi/RmiProxyHandler.java index 98a67ed5..471d9065 100644 --- a/src/dorkbox/network/rmi/RmiProxyHandler.java +++ b/src/dorkbox/network/rmi/RmiProxyHandler.java @@ -297,11 +297,11 @@ class RmiProxyHandler implements InvocationHandler { // Sends our invokeMethod to the remote connection, which the RmiBridge listens for if (this.udp) { // flush is necessary in case this is called outside of a network worker thread - this.connection.UDP(invokeMethod).flush(); + this.connection.UDP(invokeMethod); } else { // flush is necessary in case this is called outside of a network worker thread - this.connection.send(invokeMethod).flush(); + this.connection.send(invokeMethod); } if (logger.isTraceEnabled()) {