diff --git a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandler.java b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandler.java index a820c2b7..1e9bf69a 100644 --- a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandler.java +++ b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandler.java @@ -16,6 +16,8 @@ package dorkbox.network.connection.registration.remote; import java.net.InetSocketAddress; +import java.util.LinkedList; +import java.util.List; import java.util.concurrent.TimeUnit; import org.bouncycastle.jce.ECNamedCurveTable; @@ -43,12 +45,15 @@ import io.netty.channel.EventLoopGroup; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateHandler; +import io.netty.util.AttributeKey; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.GenericFutureListener; public abstract class RegistrationRemoteHandler extends RegistrationHandler { + static final AttributeKey MESSAGES = AttributeKey.valueOf(RegistrationRemoteHandler.class, "messages"); + static final String DELETE_IP = "eleteIP"; // purposefully missing the "D", since that is a system parameter, which starts with "-D" static final ECParameterSpec eccSpec = ECNamedCurveTable.getParameterSpec(CryptoECC.curve25519); @@ -223,25 +228,13 @@ class RegistrationRemoteHandler extends RegistrationHandler { try { if (metaChannel.tcpChannel == channel) { - // add the new handlers (FORCE encryption and longer IDLE handler) pipeline.replace(FRAME_AND_KRYO_DECODER, FRAME_AND_KRYO_CRYPTO_DECODER, new KryoDecoderCrypto(this.serializationManager)); // cannot be shared because of possible fragmentation. } if (metaChannel.udpChannel == channel) { - if (metaChannel.tcpChannel == null) { - // TODO: UDP (and TCP??) idle timeout (also, to close UDP session when TCP shuts down) - // this means that we are ONLY UDP, and we should have an idle timeout that CLOSES the session after a while. - // Naturally, one would want the "normal" idle to trigger first, but there always be a heartbeat if the idle trigger DOES NOT - // send data on the network, to make sure that the UDP-only session stays alive or disconnects. - - // If the server disconnects, the client has to be made aware of this when it tries to send data again (it must go through - // it's entire reconnect protocol) - } - - // these encoders are shared - pipeline.replace(KRYO_DECODER, KRYO_CRYPTO_DECODER, this.registrationWrapper.kryoUdpDecoderCrypto); + pipeline.replace(KRYO_DECODER, KRYO_CRYPTO_DECODER, this.registrationWrapper.kryoUdpDecoderCrypto); // shared encoder } } catch (Exception e) { logger.error("Error during connection pipeline upgrade", e); @@ -257,15 +250,13 @@ class RegistrationRemoteHandler extends RegistrationHandler { try { if (metaChannel.tcpChannel == channel) { - // add the new handlers (FORCE encryption and longer IDLE handler) pipeline.replace(FRAME_AND_KRYO_ENCODER, FRAME_AND_KRYO_CRYPTO_ENCODER, registrationWrapper.kryoTcpEncoderCrypto); // this is shared } if (metaChannel.udpChannel == channel) { - // these encoders are shared - pipeline.replace(KRYO_ENCODER, KRYO_CRYPTO_ENCODER, registrationWrapper.kryoUdpEncoderCrypto); + pipeline.replace(KRYO_ENCODER, KRYO_CRYPTO_ENCODER, registrationWrapper.kryoUdpEncoderCrypto); // shared encoder } } catch (Exception e) { logger.error("Error during connection pipeline upgrade", e); @@ -277,6 +268,7 @@ class RegistrationRemoteHandler extends RegistrationHandler { */ final void upgradePipeline(final MetaChannel metaChannel, final InetSocketAddress remoteAddress) { + logger.trace("Upgrading pipeline"); try { if (metaChannel.udpChannel != null) { if (metaChannel.tcpChannel == null) { @@ -309,36 +301,19 @@ class RegistrationRemoteHandler extends RegistrationHandler { if (this.logger.isInfoEnabled()) { - String type = ""; - - if (metaChannel.tcpChannel != null) { - type = "TCP"; - - if (metaChannel.udpChannel != null) { - type += "/"; - } - } - - if (metaChannel.udpChannel != null) { - type += "UDP"; - } - - StringBuilder stringBuilder = new StringBuilder(96); - stringBuilder.append("Encrypted "); if (metaChannel.tcpChannel != null) { - stringBuilder.append(type) - .append(" connection ["); + stringBuilder.append("Encrypted TCP connection ["); EndPoint.getHostDetails(stringBuilder, metaChannel.tcpChannel.localAddress()); stringBuilder.append(getConnectionDirection()); EndPoint.getHostDetails(stringBuilder, metaChannel.tcpChannel.remoteAddress()); stringBuilder.append("]"); } - else if (metaChannel.udpChannel != null) { - stringBuilder.append(type) - .append(" connection ["); + + if (metaChannel.udpChannel != null) { + stringBuilder.append("Encrypted UDP connection ["); EndPoint.getHostDetails(stringBuilder, metaChannel.udpChannel.localAddress()); stringBuilder.append(getConnectionDirection()); @@ -354,19 +329,54 @@ class RegistrationRemoteHandler extends RegistrationHandler { } final - void cleanupPipeline(final MetaChannel metaChannel, final long delay) { + void cleanupPipeline(final MetaChannel metaChannel, final Runnable onConnectFinishRunnable) { final int idleTimeout = this.registrationWrapper.getIdleTimeout(); try { // REMOVE our channel wrapper (only used for encryption) with the actual connection - metaChannel.connection = ((ConnectionWrapper) metaChannel.connection).connection; + ChannelHandler handler = metaChannel.connection = ((ConnectionWrapper) metaChannel.connection).connection; + + Channel channel; + if (metaChannel.tcpChannel != null) { + channel = metaChannel.tcpChannel; + } else { + channel = metaChannel.udpChannel; + } + + // channel should NEVER == null! (we will always have TCP or UDP!) + // we also ONLY want to add this to a single cleanup, NOT BOTH, because this must only run once!! + final ChannelPromise channelPromise = channel.newPromise(); + channelPromise.addListener(new FutureListener() { + @Override + public + void operationComplete(final Future future) throws Exception { + EventLoop loop = channelPromise.channel() + .eventLoop(); + loop.execute(new Runnable() { + @Override + public + void run() { + logger.trace("Notify Connection"); + + // safe cast, because it's always this way... + registrationWrapper.connectionConnected0((ConnectionImpl) metaChannel.connection); + + // run things AFTER the onConnect() method is called... + // CLIENT - runs the deferred 'onMessage' events in the connection as needed + // SERVER - send 'onConnect' bounce back message to client + onConnectFinishRunnable.run(); + } + }); + } + }); + if (metaChannel.tcpChannel != null) { - cleanupPipeline0(delay, idleTimeout, metaChannel, metaChannel.connection, metaChannel.tcpChannel, true); + cleanupPipeline0(idleTimeout, handler, metaChannel.tcpChannel, channelPromise, true); } if (metaChannel.udpChannel != null) { - cleanupPipeline0(delay, idleTimeout, metaChannel, metaChannel.connection, metaChannel.udpChannel, false); + cleanupPipeline0(idleTimeout, handler, metaChannel.udpChannel, channelPromise, false); } } catch (Exception e) { logger.error("Error during pipeline replace", e); @@ -374,14 +384,14 @@ class RegistrationRemoteHandler extends RegistrationHandler { } private - void cleanupPipeline0(final long delay, - final int idleTimeout, - final MetaChannel metaChannel, + void cleanupPipeline0(final int idleTimeout, final ChannelHandler connection, final Channel channel, + final ChannelPromise channelPromise, final boolean isTcp) { final ChannelPipeline pipeline = channel.pipeline(); + // have to explicitly remove handlers based on the input type. Cannot use this.getClass().... boolean isClient = registrationWrapper.isClient(); if (isClient) { if (isTcp) { @@ -412,54 +422,22 @@ class RegistrationRemoteHandler extends RegistrationHandler { pipeline.addLast(CONNECTION_HANDLER, connection); // we also DEREGISTER from the HANDSHAKE event-loop and run on the worker event-loop! - // if (isClient) { - ChannelFuture future = channel.deregister(); - future.addListener(new GenericFutureListener>() { - @Override - public - void operationComplete(final Future f) throws Exception { - if (f.isSuccess()) { - // final EventLoop next = workerEventLoop.next(); - - final ChannelPromise channelPromise = channel.newPromise(); - channelPromise.addListener(new FutureListener() { - @Override - public - void operationComplete(final Future future) throws Exception { - EventLoop loop = channel.eventLoop(); - loop.schedule(new Runnable() { - @Override - public - void run() { - logger.trace("Notify Connection"); - doConnect(metaChannel); - } - }, delay, TimeUnit.MILLISECONDS); - } - }); - - // TODO: TCP and UDP have to register on DIFFERENT event loops + ChannelFuture future = channel.deregister(); + future.addListener(new GenericFutureListener>() { + @Override + public + void operationComplete(final Future f) throws Exception { + if (f.isSuccess()) { + // TCP and UDP register on DIFFERENT event loops. The channel promise ONLY runs on 1 of them... + if (channelPromise.channel() == channel) { workerEventLoop.register(channelPromise); } + else { + workerEventLoop.register(channel); + } } - }); - // } - // else { - // channel.eventLoop().schedule(new Runnable() { - // @Override - // public - // void run() { - // logger.trace("Notify Connection"); - // doConnect(metaChannel); - // } - // }, delay, TimeUnit.MILLISECONDS); - // } - } - - final - void doConnect(final MetaChannel metaChannel) { - // safe cast, because it's always this way... - this.registrationWrapper.connectionConnected0((ConnectionImpl) metaChannel.connection); + } + }); } // whoa! Didn't send valid public key info! @@ -493,4 +471,42 @@ class RegistrationRemoteHandler extends RegistrationHandler { return false; } + + /** + * have to have a way for us to store messages in case the remote end calls "onConnect()" and sends messages before we are ready. + */ + void prepChannelForOutOfOrderMessages(final Channel channel) { + // this could POSSIBLY be screwed up when getting created, so we make sure to only create the list ONE time + channel.attr(MESSAGES) + .setIfAbsent(new LinkedList()); + } + + + /** + * have to have a way for us to store messages in case the remote end calls "onConnect()" and sends messages before we are ready. + */ + void saveOutOfOrderMessage(final Channel channel, final Object message) { + // this will ALWAYS have already been created, or IF NULL -- then something really screwed up! + LinkedList list = channel.attr(MESSAGES) + .get(); + + if (list == null) { + logger.error("Completely screwed up message order from server!"); + shutdown(channel, 0); + return; + } + + //noinspection unchecked + list.add(message); + } + + /** + * Gets all of the messages for this channel that were "out of order" (onMessage called before onConnect finished). + */ + List getOutOfOrderMessagesAndReset(final Channel channel) { + LinkedList messages = channel.attr(MESSAGES) + .getAndSet(null); + + return messages; + } } diff --git a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClient.java b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClient.java index 20097073..6b7f0f1d 100644 --- a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClient.java +++ b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClient.java @@ -18,6 +18,8 @@ package dorkbox.network.connection.registration.remote; import java.math.BigInteger; import java.net.InetSocketAddress; import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; import org.bouncycastle.crypto.BasicAgreement; import org.bouncycastle.crypto.agreement.ECDHCBasicAgreement; @@ -28,6 +30,7 @@ import com.esotericsoftware.kryo.KryoException; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; +import dorkbox.network.connection.ConnectionImpl; import dorkbox.network.connection.RegistrationWrapper; import dorkbox.network.connection.registration.MetaChannel; import dorkbox.network.connection.registration.Registration; @@ -190,6 +193,8 @@ class RegistrationRemoteHandlerClient extends RegistrationRemoteHandler { // IN: hasMore=true if we have more registrations to do, false otherwise if (registration.hasMore) { + logger.trace("Starting another protocol registration"); + metaChannel.totalProtocols.incrementAndGet(); registrationWrapper.startNextProtocolRegistration(); return; } @@ -213,7 +218,61 @@ class RegistrationRemoteHandlerClient extends RegistrationRemoteHandler { } - // remove the ConnectionWrapper (that was used to upgrade the connection) - cleanupPipeline(metaChannel, 20); + // IN: upgraded=true this means we are ready to connect, and the server is done with it's onConnect calls + // we defer the messages until after our own onConnect() is called... + + + // we have to wait for ALL messages to be received, this way we can prevent out-of-order oddities... + int protocolsRemaining = metaChannel.totalProtocols.decrementAndGet(); + if (protocolsRemaining > 0) { + logger.trace("{} done. Waiting for {} more protocols registrations to arrive...", type, protocolsRemaining); + return; + } + + // remove the ConnectionWrapper (that was used to upgrade the connection) and cleanup the pipeline + // always wait until AFTER the server calls "onConnect", then we do this + cleanupPipeline(metaChannel, new Runnable() { + @Override + public + void run() { + // this method runs after the "onConnect()" runs and only after all of the channels have be correctly updated + + // get all of the out of order messages that we missed + List messages = new LinkedList(); + + if (metaChannel.tcpChannel != null) { + List list = getOutOfOrderMessagesAndReset(metaChannel.tcpChannel); + if (list != null) { + logger.trace("Getting deferred TCP messages: {}", list.size()); + messages.addAll(list); + } + } + + if (metaChannel.udpChannel != null) { + List list = getOutOfOrderMessagesAndReset(metaChannel.udpChannel); + if (list != null) { + logger.trace("Getting deferred UDP messages: {}", list.size()); + messages.addAll(list); + } + } + + // now call 'onMessage' in the connection object with our messages! + try { + ConnectionImpl connection = (ConnectionImpl) metaChannel.connection; + + for (Object message : messages) { + logger.trace(" deferred onMessage({}, {})", connection.id(), message); + try { + connection.channelRead(null, message); + } catch (Exception e) { + logger.error("Error running deferred messages!", e); + } + } + + } catch (Exception e) { + logger.error("Error initialising deferred messages!", e); + } + } + }); } } diff --git a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientTCP.java b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientTCP.java index bedc4e46..12686381 100644 --- a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientTCP.java +++ b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientTCP.java @@ -15,7 +15,6 @@ */ package dorkbox.network.connection.registration.remote; -import dorkbox.network.connection.ConnectionImpl; import dorkbox.network.connection.RegistrationWrapper; import dorkbox.network.connection.registration.MetaChannel; import dorkbox.network.connection.registration.Registration; @@ -69,25 +68,24 @@ class RegistrationRemoteHandlerClientTCP extends RegistrationRemoteHandlerClient else { metaChannel = registrationWrapper.getSession(sessionId); + // TCP channel registration is ALWAYS first, so this is the correct way to do this. if (metaChannel == null) { metaChannel = registrationWrapper.createSessionClient(sessionId); metaChannel.tcpChannel = channel; + logger.debug("New TCP connection. Saving meta-channel id: {}", metaChannel.sessionId); } + + // have to add a way for us to store messages in case the remote end calls "onConnect()" and sends messages before we are ready. + prepChannelForOutOfOrderMessages(channel); } + logger.trace("TCP read"); readClient(channel, registration, "TCP client", metaChannel); } else { - logger.error("Error registering TCP with remote server!"); - - // this is what happens when the registration happens too quickly... - Object connection = context.pipeline().last(); - if (connection instanceof ConnectionImpl) { - ((ConnectionImpl) connection).channelRead(context, message); - } else { - shutdown(channel, 0); - } + logger.trace("Out of order TCP message from server!"); + saveOutOfOrderMessage(channel, message); } } } diff --git a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientUDP.java b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientUDP.java index 9b096a7e..36d7de44 100644 --- a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientUDP.java +++ b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientUDP.java @@ -18,7 +18,6 @@ package dorkbox.network.connection.registration.remote; import java.io.IOException; import java.net.InetSocketAddress; -import dorkbox.network.connection.ConnectionImpl; import dorkbox.network.connection.RegistrationWrapper; import dorkbox.network.connection.registration.MetaChannel; import dorkbox.network.connection.registration.Registration; @@ -89,26 +88,26 @@ class RegistrationRemoteHandlerClientUDP extends RegistrationRemoteHandlerClient if (metaChannel == null) { metaChannel = registrationWrapper.createSessionClient(sessionId); + logger.debug("New UDP connection. Saving meta-channel id: {}", metaChannel.sessionId); } + else if (metaChannel.udpChannel == null) { + logger.debug("Using TCP connection meta-channel for UDP connection"); + } // in the event that we start with a TCP channel first, we still have to set the UDP channel metaChannel.udpChannel = channel; + + // have to add a way for us to store messages in case the remote end calls "onConnect()" and sends messages before we are ready. + // note: UDP channels are also unique (just like TCP channels) because of the SessionManager we added + prepChannelForOutOfOrderMessages(channel); } readClient(channel, registration, "UDP client", metaChannel); } else { - logger.error("Error registering UDP with remote server!"); - // this is what happens when the registration happens too quickly... - Object connection = context.pipeline() - .last(); - if (connection instanceof ConnectionImpl) { - ((ConnectionImpl) connection).channelRead(context, message); - } - else { - shutdown(channel, 0); - } + logger.trace("Out of order UDP message from server!"); + saveOutOfOrderMessage(channel, message); } } } diff --git a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServer.java b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServer.java index 919f9e44..b002e97a 100644 --- a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServer.java +++ b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServer.java @@ -116,7 +116,6 @@ class RegistrationRemoteHandlerServer extends RegistrationRemoteHandler { outboundRegister.eccParameters = CryptoECC.generateSharedParameters(registrationWrapper.getSecureRandom()); channel.writeAndFlush(outboundRegister); - metaChannel.updateRoundTripOnWrite(); return; } @@ -173,7 +172,6 @@ class RegistrationRemoteHandlerServer extends RegistrationRemoteHandler { outboundRegister.payload = output.toBytes(); channel.writeAndFlush(outboundRegister); - metaChannel.updateRoundTripOnWrite(); return; } @@ -202,7 +200,6 @@ class RegistrationRemoteHandlerServer extends RegistrationRemoteHandler { } channel.flush(); - metaChannel.updateRoundTripOnWrite(); return; } @@ -214,20 +211,29 @@ class RegistrationRemoteHandlerServer extends RegistrationRemoteHandler { // // - // make sure we don't try to upgrade the client again. - registration.upgrade = false; - // have to get the delay before we update the round-trip time. We cannot have a delay that is BIGGER than our idle timeout! - final int idleTimeout = Math.max(2, this.registrationWrapper.getIdleTimeout()-2); // 2 because it is a reasonable amount for the "standard" delay amount - final long delay = Math.max(idleTimeout, TimeUnit.NANOSECONDS.toMillis(metaChannel.getRoundTripTime())); - logger.trace("Notify delay MS: {}", delay); + // remove the ConnectionWrapper (that was used to upgrade the connection) and cleanup the pipeline + cleanupPipeline(metaChannel, new Runnable() { + @Override + public + void run() { + // this method runs after the "onConnect()" runs and only after all of the channels have be correctly updated + // this tells the client we are ready to connect (we just bounce back the original message over ALL protocols) + if (metaChannel.tcpChannel != null) { + logger.trace("Sending TCP upgraded command"); + Registration reg = new Registration(registration.sessionID); + reg.upgraded = true; + metaChannel.tcpChannel.writeAndFlush(reg); + } - // remove the ConnectionWrapper (that was used to upgrade the connection) - // wait for a "round trip" amount of time, then notify the APP! - cleanupPipeline(metaChannel, delay); - - // this tells the client we are ready to connect (we just bounce back the original message) - channel.writeAndFlush(registration); + if (metaChannel.udpChannel != null) { + logger.trace("Sending UDP upgraded command"); + Registration reg = new Registration(registration.sessionID); + reg.upgraded = true; + metaChannel.udpChannel.writeAndFlush(reg); + } + } + }); } }