Added better out-of-order message handling during connection handshake

This commit is contained in:
nathan 2018-04-04 15:01:08 +02:00
parent b9c6a1422d
commit ccbe893efa
5 changed files with 205 additions and 127 deletions

View File

@ -16,6 +16,8 @@
package dorkbox.network.connection.registration.remote;
import java.net.InetSocketAddress;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.bouncycastle.jce.ECNamedCurveTable;
@ -43,12 +45,15 @@ import io.netty.channel.EventLoopGroup;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.GenericFutureListener;
public abstract
class RegistrationRemoteHandler extends RegistrationHandler {
static final AttributeKey<LinkedList> MESSAGES = AttributeKey.valueOf(RegistrationRemoteHandler.class, "messages");
static final String DELETE_IP = "eleteIP"; // purposefully missing the "D", since that is a system parameter, which starts with "-D"
static final ECParameterSpec eccSpec = ECNamedCurveTable.getParameterSpec(CryptoECC.curve25519);
@ -223,25 +228,13 @@ class RegistrationRemoteHandler extends RegistrationHandler {
try {
if (metaChannel.tcpChannel == channel) {
// add the new handlers (FORCE encryption and longer IDLE handler)
pipeline.replace(FRAME_AND_KRYO_DECODER,
FRAME_AND_KRYO_CRYPTO_DECODER,
new KryoDecoderCrypto(this.serializationManager)); // cannot be shared because of possible fragmentation.
}
if (metaChannel.udpChannel == channel) {
if (metaChannel.tcpChannel == null) {
// TODO: UDP (and TCP??) idle timeout (also, to close UDP session when TCP shuts down)
// this means that we are ONLY UDP, and we should have an idle timeout that CLOSES the session after a while.
// Naturally, one would want the "normal" idle to trigger first, but there always be a heartbeat if the idle trigger DOES NOT
// send data on the network, to make sure that the UDP-only session stays alive or disconnects.
// If the server disconnects, the client has to be made aware of this when it tries to send data again (it must go through
// it's entire reconnect protocol)
}
// these encoders are shared
pipeline.replace(KRYO_DECODER, KRYO_CRYPTO_DECODER, this.registrationWrapper.kryoUdpDecoderCrypto);
pipeline.replace(KRYO_DECODER, KRYO_CRYPTO_DECODER, this.registrationWrapper.kryoUdpDecoderCrypto); // shared encoder
}
} catch (Exception e) {
logger.error("Error during connection pipeline upgrade", e);
@ -257,15 +250,13 @@ class RegistrationRemoteHandler extends RegistrationHandler {
try {
if (metaChannel.tcpChannel == channel) {
// add the new handlers (FORCE encryption and longer IDLE handler)
pipeline.replace(FRAME_AND_KRYO_ENCODER,
FRAME_AND_KRYO_CRYPTO_ENCODER,
registrationWrapper.kryoTcpEncoderCrypto); // this is shared
}
if (metaChannel.udpChannel == channel) {
// these encoders are shared
pipeline.replace(KRYO_ENCODER, KRYO_CRYPTO_ENCODER, registrationWrapper.kryoUdpEncoderCrypto);
pipeline.replace(KRYO_ENCODER, KRYO_CRYPTO_ENCODER, registrationWrapper.kryoUdpEncoderCrypto); // shared encoder
}
} catch (Exception e) {
logger.error("Error during connection pipeline upgrade", e);
@ -277,6 +268,7 @@ class RegistrationRemoteHandler extends RegistrationHandler {
*/
final
void upgradePipeline(final MetaChannel metaChannel, final InetSocketAddress remoteAddress) {
logger.trace("Upgrading pipeline");
try {
if (metaChannel.udpChannel != null) {
if (metaChannel.tcpChannel == null) {
@ -309,36 +301,19 @@ class RegistrationRemoteHandler extends RegistrationHandler {
if (this.logger.isInfoEnabled()) {
String type = "";
if (metaChannel.tcpChannel != null) {
type = "TCP";
if (metaChannel.udpChannel != null) {
type += "/";
}
}
if (metaChannel.udpChannel != null) {
type += "UDP";
}
StringBuilder stringBuilder = new StringBuilder(96);
stringBuilder.append("Encrypted ");
if (metaChannel.tcpChannel != null) {
stringBuilder.append(type)
.append(" connection [");
stringBuilder.append("Encrypted TCP connection [");
EndPoint.getHostDetails(stringBuilder, metaChannel.tcpChannel.localAddress());
stringBuilder.append(getConnectionDirection());
EndPoint.getHostDetails(stringBuilder, metaChannel.tcpChannel.remoteAddress());
stringBuilder.append("]");
}
else if (metaChannel.udpChannel != null) {
stringBuilder.append(type)
.append(" connection [");
if (metaChannel.udpChannel != null) {
stringBuilder.append("Encrypted UDP connection [");
EndPoint.getHostDetails(stringBuilder, metaChannel.udpChannel.localAddress());
stringBuilder.append(getConnectionDirection());
@ -354,19 +329,54 @@ class RegistrationRemoteHandler extends RegistrationHandler {
}
final
void cleanupPipeline(final MetaChannel metaChannel, final long delay) {
void cleanupPipeline(final MetaChannel metaChannel, final Runnable onConnectFinishRunnable) {
final int idleTimeout = this.registrationWrapper.getIdleTimeout();
try {
// REMOVE our channel wrapper (only used for encryption) with the actual connection
metaChannel.connection = ((ConnectionWrapper) metaChannel.connection).connection;
ChannelHandler handler = metaChannel.connection = ((ConnectionWrapper) metaChannel.connection).connection;
Channel channel;
if (metaChannel.tcpChannel != null) {
channel = metaChannel.tcpChannel;
} else {
channel = metaChannel.udpChannel;
}
// channel should NEVER == null! (we will always have TCP or UDP!)
// we also ONLY want to add this to a single cleanup, NOT BOTH, because this must only run once!!
final ChannelPromise channelPromise = channel.newPromise();
channelPromise.addListener(new FutureListener<Void>() {
@Override
public
void operationComplete(final Future<Void> future) throws Exception {
EventLoop loop = channelPromise.channel()
.eventLoop();
loop.execute(new Runnable() {
@Override
public
void run() {
logger.trace("Notify Connection");
// safe cast, because it's always this way...
registrationWrapper.connectionConnected0((ConnectionImpl) metaChannel.connection);
// run things AFTER the onConnect() method is called...
// CLIENT - runs the deferred 'onMessage' events in the connection as needed
// SERVER - send 'onConnect' bounce back message to client
onConnectFinishRunnable.run();
}
});
}
});
if (metaChannel.tcpChannel != null) {
cleanupPipeline0(delay, idleTimeout, metaChannel, metaChannel.connection, metaChannel.tcpChannel, true);
cleanupPipeline0(idleTimeout, handler, metaChannel.tcpChannel, channelPromise, true);
}
if (metaChannel.udpChannel != null) {
cleanupPipeline0(delay, idleTimeout, metaChannel, metaChannel.connection, metaChannel.udpChannel, false);
cleanupPipeline0(idleTimeout, handler, metaChannel.udpChannel, channelPromise, false);
}
} catch (Exception e) {
logger.error("Error during pipeline replace", e);
@ -374,14 +384,14 @@ class RegistrationRemoteHandler extends RegistrationHandler {
}
private
void cleanupPipeline0(final long delay,
final int idleTimeout,
final MetaChannel metaChannel,
void cleanupPipeline0(final int idleTimeout,
final ChannelHandler connection,
final Channel channel,
final ChannelPromise channelPromise,
final boolean isTcp) {
final ChannelPipeline pipeline = channel.pipeline();
// have to explicitly remove handlers based on the input type. Cannot use this.getClass()....
boolean isClient = registrationWrapper.isClient();
if (isClient) {
if (isTcp) {
@ -412,54 +422,22 @@ class RegistrationRemoteHandler extends RegistrationHandler {
pipeline.addLast(CONNECTION_HANDLER, connection);
// we also DEREGISTER from the HANDSHAKE event-loop and run on the worker event-loop!
// if (isClient) {
ChannelFuture future = channel.deregister();
future.addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public
void operationComplete(final Future<? super Void> f) throws Exception {
if (f.isSuccess()) {
// final EventLoop next = workerEventLoop.next();
final ChannelPromise channelPromise = channel.newPromise();
channelPromise.addListener(new FutureListener<Void>() {
@Override
public
void operationComplete(final Future<Void> future) throws Exception {
EventLoop loop = channel.eventLoop();
loop.schedule(new Runnable() {
@Override
public
void run() {
logger.trace("Notify Connection");
doConnect(metaChannel);
}
}, delay, TimeUnit.MILLISECONDS);
}
});
// TODO: TCP and UDP have to register on DIFFERENT event loops
ChannelFuture future = channel.deregister();
future.addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public
void operationComplete(final Future<? super Void> f) throws Exception {
if (f.isSuccess()) {
// TCP and UDP register on DIFFERENT event loops. The channel promise ONLY runs on 1 of them...
if (channelPromise.channel() == channel) {
workerEventLoop.register(channelPromise);
}
else {
workerEventLoop.register(channel);
}
}
});
// }
// else {
// channel.eventLoop().schedule(new Runnable() {
// @Override
// public
// void run() {
// logger.trace("Notify Connection");
// doConnect(metaChannel);
// }
// }, delay, TimeUnit.MILLISECONDS);
// }
}
final
void doConnect(final MetaChannel metaChannel) {
// safe cast, because it's always this way...
this.registrationWrapper.connectionConnected0((ConnectionImpl) metaChannel.connection);
}
});
}
// whoa! Didn't send valid public key info!
@ -493,4 +471,42 @@ class RegistrationRemoteHandler extends RegistrationHandler {
return false;
}
/**
* have to have a way for us to store messages in case the remote end calls "onConnect()" and sends messages before we are ready.
*/
void prepChannelForOutOfOrderMessages(final Channel channel) {
// this could POSSIBLY be screwed up when getting created, so we make sure to only create the list ONE time
channel.attr(MESSAGES)
.setIfAbsent(new LinkedList<Object>());
}
/**
* have to have a way for us to store messages in case the remote end calls "onConnect()" and sends messages before we are ready.
*/
void saveOutOfOrderMessage(final Channel channel, final Object message) {
// this will ALWAYS have already been created, or IF NULL -- then something really screwed up!
LinkedList list = channel.attr(MESSAGES)
.get();
if (list == null) {
logger.error("Completely screwed up message order from server!");
shutdown(channel, 0);
return;
}
//noinspection unchecked
list.add(message);
}
/**
* Gets all of the messages for this channel that were "out of order" (onMessage called before onConnect finished).
*/
List<Object> getOutOfOrderMessagesAndReset(final Channel channel) {
LinkedList messages = channel.attr(MESSAGES)
.getAndSet(null);
return messages;
}
}

View File

@ -18,6 +18,8 @@ package dorkbox.network.connection.registration.remote;
import java.math.BigInteger;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import org.bouncycastle.crypto.BasicAgreement;
import org.bouncycastle.crypto.agreement.ECDHCBasicAgreement;
@ -28,6 +30,7 @@ import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import dorkbox.network.connection.ConnectionImpl;
import dorkbox.network.connection.RegistrationWrapper;
import dorkbox.network.connection.registration.MetaChannel;
import dorkbox.network.connection.registration.Registration;
@ -190,6 +193,8 @@ class RegistrationRemoteHandlerClient extends RegistrationRemoteHandler {
// IN: hasMore=true if we have more registrations to do, false otherwise
if (registration.hasMore) {
logger.trace("Starting another protocol registration");
metaChannel.totalProtocols.incrementAndGet();
registrationWrapper.startNextProtocolRegistration();
return;
}
@ -213,7 +218,61 @@ class RegistrationRemoteHandlerClient extends RegistrationRemoteHandler {
}
// remove the ConnectionWrapper (that was used to upgrade the connection)
cleanupPipeline(metaChannel, 20);
// IN: upgraded=true this means we are ready to connect, and the server is done with it's onConnect calls
// we defer the messages until after our own onConnect() is called...
// we have to wait for ALL messages to be received, this way we can prevent out-of-order oddities...
int protocolsRemaining = metaChannel.totalProtocols.decrementAndGet();
if (protocolsRemaining > 0) {
logger.trace("{} done. Waiting for {} more protocols registrations to arrive...", type, protocolsRemaining);
return;
}
// remove the ConnectionWrapper (that was used to upgrade the connection) and cleanup the pipeline
// always wait until AFTER the server calls "onConnect", then we do this
cleanupPipeline(metaChannel, new Runnable() {
@Override
public
void run() {
// this method runs after the "onConnect()" runs and only after all of the channels have be correctly updated
// get all of the out of order messages that we missed
List<Object> messages = new LinkedList<Object>();
if (metaChannel.tcpChannel != null) {
List<Object> list = getOutOfOrderMessagesAndReset(metaChannel.tcpChannel);
if (list != null) {
logger.trace("Getting deferred TCP messages: {}", list.size());
messages.addAll(list);
}
}
if (metaChannel.udpChannel != null) {
List<Object> list = getOutOfOrderMessagesAndReset(metaChannel.udpChannel);
if (list != null) {
logger.trace("Getting deferred UDP messages: {}", list.size());
messages.addAll(list);
}
}
// now call 'onMessage' in the connection object with our messages!
try {
ConnectionImpl connection = (ConnectionImpl) metaChannel.connection;
for (Object message : messages) {
logger.trace(" deferred onMessage({}, {})", connection.id(), message);
try {
connection.channelRead(null, message);
} catch (Exception e) {
logger.error("Error running deferred messages!", e);
}
}
} catch (Exception e) {
logger.error("Error initialising deferred messages!", e);
}
}
});
}
}

View File

@ -15,7 +15,6 @@
*/
package dorkbox.network.connection.registration.remote;
import dorkbox.network.connection.ConnectionImpl;
import dorkbox.network.connection.RegistrationWrapper;
import dorkbox.network.connection.registration.MetaChannel;
import dorkbox.network.connection.registration.Registration;
@ -69,25 +68,24 @@ class RegistrationRemoteHandlerClientTCP extends RegistrationRemoteHandlerClient
else {
metaChannel = registrationWrapper.getSession(sessionId);
// TCP channel registration is ALWAYS first, so this is the correct way to do this.
if (metaChannel == null) {
metaChannel = registrationWrapper.createSessionClient(sessionId);
metaChannel.tcpChannel = channel;
logger.debug("New TCP connection. Saving meta-channel id: {}", metaChannel.sessionId);
}
// have to add a way for us to store messages in case the remote end calls "onConnect()" and sends messages before we are ready.
prepChannelForOutOfOrderMessages(channel);
}
logger.trace("TCP read");
readClient(channel, registration, "TCP client", metaChannel);
}
else {
logger.error("Error registering TCP with remote server!");
// this is what happens when the registration happens too quickly...
Object connection = context.pipeline().last();
if (connection instanceof ConnectionImpl) {
((ConnectionImpl) connection).channelRead(context, message);
} else {
shutdown(channel, 0);
}
logger.trace("Out of order TCP message from server!");
saveOutOfOrderMessage(channel, message);
}
}
}

View File

@ -18,7 +18,6 @@ package dorkbox.network.connection.registration.remote;
import java.io.IOException;
import java.net.InetSocketAddress;
import dorkbox.network.connection.ConnectionImpl;
import dorkbox.network.connection.RegistrationWrapper;
import dorkbox.network.connection.registration.MetaChannel;
import dorkbox.network.connection.registration.Registration;
@ -89,26 +88,26 @@ class RegistrationRemoteHandlerClientUDP extends RegistrationRemoteHandlerClient
if (metaChannel == null) {
metaChannel = registrationWrapper.createSessionClient(sessionId);
logger.debug("New UDP connection. Saving meta-channel id: {}", metaChannel.sessionId);
}
else if (metaChannel.udpChannel == null) {
logger.debug("Using TCP connection meta-channel for UDP connection");
}
// in the event that we start with a TCP channel first, we still have to set the UDP channel
metaChannel.udpChannel = channel;
// have to add a way for us to store messages in case the remote end calls "onConnect()" and sends messages before we are ready.
// note: UDP channels are also unique (just like TCP channels) because of the SessionManager we added
prepChannelForOutOfOrderMessages(channel);
}
readClient(channel, registration, "UDP client", metaChannel);
}
else {
logger.error("Error registering UDP with remote server!");
// this is what happens when the registration happens too quickly...
Object connection = context.pipeline()
.last();
if (connection instanceof ConnectionImpl) {
((ConnectionImpl) connection).channelRead(context, message);
}
else {
shutdown(channel, 0);
}
logger.trace("Out of order UDP message from server!");
saveOutOfOrderMessage(channel, message);
}
}
}

View File

@ -116,7 +116,6 @@ class RegistrationRemoteHandlerServer extends RegistrationRemoteHandler {
outboundRegister.eccParameters = CryptoECC.generateSharedParameters(registrationWrapper.getSecureRandom());
channel.writeAndFlush(outboundRegister);
metaChannel.updateRoundTripOnWrite();
return;
}
@ -173,7 +172,6 @@ class RegistrationRemoteHandlerServer extends RegistrationRemoteHandler {
outboundRegister.payload = output.toBytes();
channel.writeAndFlush(outboundRegister);
metaChannel.updateRoundTripOnWrite();
return;
}
@ -202,7 +200,6 @@ class RegistrationRemoteHandlerServer extends RegistrationRemoteHandler {
}
channel.flush();
metaChannel.updateRoundTripOnWrite();
return;
}
@ -214,20 +211,29 @@ class RegistrationRemoteHandlerServer extends RegistrationRemoteHandler {
//
//
// make sure we don't try to upgrade the client again.
registration.upgrade = false;
// have to get the delay before we update the round-trip time. We cannot have a delay that is BIGGER than our idle timeout!
final int idleTimeout = Math.max(2, this.registrationWrapper.getIdleTimeout()-2); // 2 because it is a reasonable amount for the "standard" delay amount
final long delay = Math.max(idleTimeout, TimeUnit.NANOSECONDS.toMillis(metaChannel.getRoundTripTime()));
logger.trace("Notify delay MS: {}", delay);
// remove the ConnectionWrapper (that was used to upgrade the connection) and cleanup the pipeline
cleanupPipeline(metaChannel, new Runnable() {
@Override
public
void run() {
// this method runs after the "onConnect()" runs and only after all of the channels have be correctly updated
// this tells the client we are ready to connect (we just bounce back the original message over ALL protocols)
if (metaChannel.tcpChannel != null) {
logger.trace("Sending TCP upgraded command");
Registration reg = new Registration(registration.sessionID);
reg.upgraded = true;
metaChannel.tcpChannel.writeAndFlush(reg);
}
// remove the ConnectionWrapper (that was used to upgrade the connection)
// wait for a "round trip" amount of time, then notify the APP!
cleanupPipeline(metaChannel, delay);
// this tells the client we are ready to connect (we just bounce back the original message)
channel.writeAndFlush(registration);
if (metaChannel.udpChannel != null) {
logger.trace("Sending UDP upgraded command");
Registration reg = new Registration(registration.sessionID);
reg.upgraded = true;
metaChannel.udpChannel.writeAndFlush(reg);
}
}
});
}
}