Added support for optional encryption. Cleaned up connection registration process.

This commit is contained in:
nathan 2019-06-13 22:08:50 +02:00
parent 6ca4da2e23
commit 89d70f9255
11 changed files with 387 additions and 393 deletions

View File

@ -15,9 +15,12 @@
*/
package dorkbox.network.connection;
import java.net.InetSocketAddress;
import dorkbox.network.Configuration;
import dorkbox.network.Server;
import dorkbox.network.connection.bridge.ConnectionBridgeServer;
import dorkbox.network.connection.registration.UpgradeType;
import dorkbox.util.exceptions.SecurityException;
/**
@ -104,4 +107,9 @@ class EndPointServer extends EndPoint {
void remove(Connection connection) {
connectionManager.removeConnection(connection);
}
byte getConnectionUpgradeType(final InetSocketAddress remoteAddress) {
// TODO, crypto/compression based on ip/range of remote address
return UpgradeType.ENCRYPT;
}
}

View File

@ -26,12 +26,19 @@ import org.slf4j.Logger;
import dorkbox.network.connection.registration.MetaChannel;
import dorkbox.network.connection.registration.Registration;
import dorkbox.network.connection.registration.UpgradeType;
import dorkbox.network.pipeline.tcp.KryoEncoderTcp;
import dorkbox.network.pipeline.tcp.KryoEncoderTcpCompression;
import dorkbox.network.pipeline.tcp.KryoEncoderTcpCrypto;
import dorkbox.network.pipeline.tcp.KryoEncoderTcpNone;
import dorkbox.network.pipeline.udp.KryoDecoderUdp;
import dorkbox.network.pipeline.udp.KryoDecoderUdpCompression;
import dorkbox.network.pipeline.udp.KryoDecoderUdpCrypto;
import dorkbox.network.pipeline.udp.KryoDecoderUdpNone;
import dorkbox.network.pipeline.udp.KryoEncoderUdp;
import dorkbox.network.pipeline.udp.KryoEncoderUdpCompression;
import dorkbox.network.pipeline.udp.KryoEncoderUdpCrypto;
import dorkbox.network.pipeline.udp.KryoEncoderUdpNone;
import dorkbox.network.serialization.NetworkSerializationManager;
import dorkbox.network.serialization.Serialization;
import dorkbox.util.RandomUtil;
@ -40,6 +47,7 @@ import dorkbox.util.collections.LockFreeIntMap;
import dorkbox.util.crypto.CryptoECC;
import dorkbox.util.exceptions.SecurityException;
import io.netty.channel.Channel;
import io.netty.util.NetUtil;
/**
* Just wraps common/needed methods of the client/server endpoint by the registration stage/handshake.
@ -55,11 +63,18 @@ class RegistrationWrapper {
private final org.slf4j.Logger logger;
public final KryoEncoderTcp kryoTcpEncoder;
public final KryoEncoderTcpNone kryoTcpEncoderNone;
public final KryoEncoderTcpCompression kryoTcpEncoderCompression;
public final KryoEncoderTcpCrypto kryoTcpEncoderCrypto;
public final KryoEncoderUdp kryoUdpEncoder;
public final KryoEncoderUdpNone kryoUdpEncoderNone;
public final KryoEncoderUdpCompression kryoUdpEncoderCompression;
public final KryoEncoderUdpCrypto kryoUdpEncoderCrypto;
public final KryoDecoderUdp kryoUdpDecoder;
public final KryoDecoderUdpNone kryoUdpDecoderNone;
public final KryoDecoderUdpCompression kryoUdpDecoderCompression;
public final KryoDecoderUdpCrypto kryoUdpDecoderCrypto;
private final EndPoint endPoint;
@ -74,13 +89,20 @@ class RegistrationWrapper {
this.logger = logger;
this.kryoTcpEncoder = new KryoEncoderTcp(endPoint.serializationManager);
this.kryoTcpEncoderNone = new KryoEncoderTcpNone(endPoint.serializationManager);
this.kryoTcpEncoderCompression = new KryoEncoderTcpCompression(endPoint.serializationManager);
this.kryoTcpEncoderCrypto = new KryoEncoderTcpCrypto(endPoint.serializationManager);
this.kryoUdpEncoder = new KryoEncoderUdp(endPoint.serializationManager);
this.kryoUdpEncoderNone = new KryoEncoderUdpNone(endPoint.serializationManager);
this.kryoUdpEncoderCompression = new KryoEncoderUdpCompression(endPoint.serializationManager);
this.kryoUdpEncoderCrypto = new KryoEncoderUdpCrypto(endPoint.serializationManager);
this.kryoUdpDecoder = new KryoDecoderUdp(endPoint.serializationManager);
this.kryoUdpDecoderNone = new KryoDecoderUdpNone(endPoint.serializationManager);
this.kryoUdpDecoderCompression = new KryoDecoderUdpCompression(endPoint.serializationManager);
this.kryoUdpDecoderCrypto = new KryoDecoderUdpCrypto(endPoint.serializationManager);
}
@ -144,6 +166,27 @@ class RegistrationWrapper {
return this.endPoint.publicKey;
}
/**
* Only called by the server!
*
* If we are loopback or the client is a specific IP/CIDR address, then we do things differently. The LOOPBACK address will never encrypt or compress the traffic.
*/
public
byte getConnectionUpgradeType(final InetSocketAddress remoteAddress) {
if (isClient()) {
throw new IllegalArgumentException("This should never be called by the client!");
}
if (remoteAddress.getAddress().equals(NetUtil.LOCALHOST)) {
return UpgradeType.COMPRESS;
}
return ((EndPointServer) this.endPoint).getConnectionUpgradeType(remoteAddress);
}
/**
* If the key does not match AND we have disabled remote key validation, then metachannel.changedRemoteKey = true. OTHERWISE, key validation is REQUIRED!
*
@ -353,7 +396,7 @@ class RegistrationWrapper {
fragmentedRegistration.payload = fragment;
// tell the server we are fragmented
fragmentedRegistration.upgrade = true;
fragmentedRegistration.upgradeType = UpgradeType.FRAGMENTED;
// tell the server we are upgraded (it will bounce back telling us to connect)
fragmentedRegistration.upgraded = true;
@ -365,7 +408,7 @@ class RegistrationWrapper {
fragmentedRegistration.payload = fragments[allButLast];
// tell the server we are fragmented
fragmentedRegistration.upgrade = true;
fragmentedRegistration.upgradeType = UpgradeType.FRAGMENTED;
// tell the server we are upgraded (it will bounce back telling us to connect)
fragmentedRegistration.upgraded = true;
@ -383,7 +426,7 @@ class RegistrationWrapper {
public
STATE verifyClassRegistration(final MetaChannel metaChannel, final Registration registration) {
if (registration.upgrade) {
if (registration.upgradeType == UpgradeType.FRAGMENTED) {
byte[] fragment = registration.payload;
// this means that the registrations are FRAGMENTED!

View File

@ -1,175 +0,0 @@
/*
* Copyright 2018 dorkbox, llc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.connection.registration;
import javax.crypto.SecretKey;
import dorkbox.network.connection.ConnectionImpl;
import dorkbox.network.connection.ConnectionPoint;
import dorkbox.network.connection.Connection_;
import dorkbox.network.connection.EndPoint;
import dorkbox.network.connection.Listeners;
import dorkbox.network.connection.bridge.ConnectionBridge;
import dorkbox.network.connection.idle.IdleBridge;
import dorkbox.network.connection.idle.IdleSender;
import dorkbox.network.rmi.ConnectionNoOpSupport;
import dorkbox.network.rmi.RemoteObjectCallback;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
/**
* A wrapper for the period of time between registration and connect for a "connection" session.
*
* This is to prevent race conditions where onMessage() can happen BEFORE a "connection" is "connected"
*/
public
class ConnectionRegistrationImpl implements Connection_, ChannelHandler {
public final ConnectionImpl connection;
public
ConnectionRegistrationImpl(final ConnectionImpl connection) {
this.connection = connection;
}
@Override
public
void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
}
@Override
public
void handlerRemoved(final ChannelHandlerContext ctx) throws Exception {
}
@Override
public
void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception {
}
@Override
public
long nextGcmSequence() {
return connection.nextGcmSequence();
}
@Override
public
SecretKey cryptoKey() {
return connection.cryptoKey();
}
@Override
public
boolean hasRemoteKeyChanged() {
throw new IllegalArgumentException("not implemented");
}
@Override
public
String getRemoteHost() {
throw new IllegalArgumentException("not implemented");
}
@Override
public
boolean isLoopback() {
return connection.isLoopback();
}
@Override
public
EndPoint getEndPoint() {
throw new IllegalArgumentException("not implemented");
}
@Override
public
int id() {
throw new IllegalArgumentException("not implemented");
}
@Override
public
String idAsHex() {
throw new IllegalArgumentException("not implemented");
}
@Override
public
boolean hasUDP() {
throw new IllegalArgumentException("not implemented");
}
@Override
public
ConnectionBridge send() {
throw new IllegalArgumentException("not implemented");
}
@Override
public
ConnectionPoint send(final Object message) {
throw new IllegalArgumentException("not implemented");
}
@Override
public
IdleBridge sendOnIdle(final IdleSender<?, ?> sender) {
throw new IllegalArgumentException("not implemented");
}
@Override
public
IdleBridge sendOnIdle(final Object message) {
throw new IllegalArgumentException("not implemented");
}
@Override
public
Listeners listeners() {
throw new IllegalArgumentException("not implemented");
}
@Override
public
void close() {
throw new IllegalArgumentException("not implemented");
}
@Override
public
void closeAsap() {
throw new IllegalArgumentException("not implemented");
}
@Override
public
<Iface> void createRemoteObject(final Class<Iface> interfaceClass, final RemoteObjectCallback<Iface> callback) {
throw new IllegalArgumentException("not implemented");
}
@Override
public
<Iface> void getRemoteObject(final int objectId, final RemoteObjectCallback<Iface> callback) {
throw new IllegalArgumentException("not implemented");
}
@Override
public
ConnectionNoOpSupport rmiSupport() {
return null;
}
}

View File

@ -15,6 +15,7 @@
*/
package dorkbox.network.connection.registration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.crypto.SecretKey;
@ -22,8 +23,8 @@ import javax.crypto.SecretKey;
import org.bouncycastle.crypto.AsymmetricCipherKeyPair;
import org.bouncycastle.crypto.params.ECPublicKeyParameters;
import dorkbox.network.connection.ConnectionImpl;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
public
class MetaChannel {
@ -38,10 +39,13 @@ class MetaChannel {
// keep track of how many protocols to register, so that way when we are ready to connect the SERVER sends a message to the client over
// all registered protocols and the last protocol to receive the message does the registration
// we ALWAYS start off with at least 1 protocol
public AtomicInteger totalProtocols = new AtomicInteger(1);
public AtomicInteger totalProtocols = new AtomicInteger(0);
public volatile ChannelHandler connection; // only needed until the connection has been notified.
// only permits the FIST protocol for running the pipeline upgrade.
public AtomicBoolean canUpgradePipeline = new AtomicBoolean(true);
public volatile ConnectionImpl connection; // only needed until the connection has been notified.
public volatile ECPublicKeyParameters publicKey; // used for ECC crypto + handshake on NETWORK (remote) connections. This is the remote public key.
public volatile AsymmetricCipherKeyPair ecdhKey; // used for ECC Diffie-Hellman-Merkle key exchanges: see http://en.wikipedia.org/wiki/Diffie%E2%80%93Hellman_key_exchange

View File

@ -25,7 +25,7 @@ public
class Registration {
// used to keep track and associate TCP/UDP/etc sessions. This is always defined by the server
// a sessionId if '0', means we are still figuring it out.
public int sessionID;
public int sessionID = 0;
public ECPublicKeyParameters publicKey;
public IESParameters eccParameters;
@ -33,14 +33,14 @@ class Registration {
public byte[] payload;
// true if we have more registrations to process, false if we are done
public boolean hasMore;
public boolean hasMore = false;
// true when we are ready to setup the connection (hasMore will always be false if this is true). False when we are ready to connect
// > 0 when we are ready to setup the connection (hasMore will always be false if this is >0). 0 when we are ready to connect
// ALSO used if there are fragmented frames for registration data (since we have to split it up to fit inside a single UDP packet without fragmentation)
public boolean upgrade;
public byte upgradeType = (byte) 0;
// true when we are fully upgraded
public boolean upgraded;
public boolean upgraded = false;
private
Registration() {

View File

@ -0,0 +1,11 @@
package dorkbox.network.connection.registration;
public
class UpgradeType {
// The check is > 0, so these MUST be all > 0
public static final byte NONE = (byte) 1;
public static final byte ENCRYPT = (byte) 2;
public static final byte COMPRESS = (byte) 3;
public static final byte FRAGMENTED = (byte) 4;
}

View File

@ -26,21 +26,21 @@ import org.bouncycastle.jce.spec.ECParameterSpec;
import dorkbox.network.connection.ConnectionImpl;
import dorkbox.network.connection.EndPoint;
import dorkbox.network.connection.RegistrationWrapper;
import dorkbox.network.connection.registration.ConnectionRegistrationImpl;
import dorkbox.network.connection.registration.MetaChannel;
import dorkbox.network.connection.registration.Registration;
import dorkbox.network.connection.registration.RegistrationHandler;
import dorkbox.network.connection.registration.UpgradeType;
import dorkbox.network.pipeline.tcp.KryoDecoderTcp;
import dorkbox.network.pipeline.tcp.KryoDecoderTcpCompression;
import dorkbox.network.pipeline.tcp.KryoDecoderTcpCrypto;
import dorkbox.network.pipeline.tcp.KryoDecoderTcpNone;
import dorkbox.network.serialization.NetworkSerializationManager;
import dorkbox.util.crypto.CryptoECC;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
@ -57,20 +57,34 @@ class RegistrationRemoteHandler extends RegistrationHandler {
static final String DELETE_IP = "eleteIP"; // purposefully missing the "D", since that is a system parameter, which starts with "-D"
static final ECParameterSpec eccSpec = ECNamedCurveTable.getParameterSpec(CryptoECC.curve25519);
static final String KRYO_ENCODER = "kryoEncoder";
static final String KRYO_DECODER = "kryoDecoder";
private static final String UDP_DECODE = "ud1";
private static final String UDP_DECODE_NONE = "ud2";
private static final String UDP_DECODE_COMPRESS = "ud3";
private static final String UDP_DECODE_CRYPTO = "ud4";
private static final String TCP_DECODE = "td1";
private static final String TCP_DECODE_NONE = "td2";
private static final String TCP_DECODE_COMPRESS = "td3";
private static final String TCP_DECODE_CRYPTO = "td4";
private static final String IDLE_HANDLER = "idle";
private static final String IDLE_HANDLER_FULL = "idleFull";
private static final String UDP_ENCODE = "ue";
private static final String UDP_ENCODE_NONE = "ue2";
private static final String UDP_ENCODE_COMPRESS = "ue3";
private static final String UDP_ENCODE_CRYPTO = "ue4";
private static final String TCP_ENCODE = "te1";
private static final String TCP_ENCODE_NONE = "te2";
private static final String TCP_ENCODE_COMPRESS = "te3";
private static final String TCP_ENCODE_CRYPTO = "te4";
private static final String IDLE_HANDLER_FULL = "idleHandlerFull";
private static final String FRAME_AND_KRYO_ENCODER = "frameAndKryoEncoder";
private static final String FRAME_AND_KRYO_DECODER = "frameAndKryoDecoder";
private static final String FRAME_AND_KRYO_CRYPTO_ENCODER = "frameAndKryoCryptoEncoder";
private static final String FRAME_AND_KRYO_CRYPTO_DECODER = "frameAndKryoCryptoDecoder";
private static final String KRYO_CRYPTO_ENCODER = "kryoCryptoEncoder";
private static final String KRYO_CRYPTO_DECODER = "kryoCryptoDecoder";
private static final String IDLE_HANDLER = "idleHandler";
protected final NetworkSerializationManager serializationManager;
@ -98,12 +112,11 @@ class RegistrationRemoteHandler extends RegistrationHandler {
///////////////////////
// DECODE (or upstream)
///////////////////////
pipeline.addFirst(FRAME_AND_KRYO_DECODER,
new KryoDecoderTcp(this.serializationManager)); // cannot be shared because of possible fragmentation.
pipeline.addFirst(TCP_DECODE, new KryoDecoderTcp(this.serializationManager)); // cannot be shared because of possible fragmentation.
}
else if (isUdpChannel) {
// can be shared because there cannot be fragmentation for our UDP packets. If there is, we throw an error and continue...
pipeline.addFirst(KRYO_DECODER, this.registrationWrapper.kryoUdpDecoder);
pipeline.addFirst(UDP_DECODE, this.registrationWrapper.kryoUdpDecoder);
}
// this makes the proper event get raised in the registrationHandler to kill NEW idle connections. Once "connected" they last a lot longer.
@ -115,10 +128,10 @@ class RegistrationRemoteHandler extends RegistrationHandler {
/////////////////////////
// ENCODE (or downstream)
/////////////////////////
pipeline.addFirst(FRAME_AND_KRYO_ENCODER, this.registrationWrapper.kryoTcpEncoder); // this is shared
pipeline.addFirst(TCP_ENCODE, this.registrationWrapper.kryoTcpEncoder); // this is shared
}
else if (isUdpChannel) {
pipeline.addFirst(KRYO_ENCODER, this.registrationWrapper.kryoUdpEncoder);
pipeline.addFirst(UDP_ENCODE, this.registrationWrapper.kryoUdpEncoder);
}
}
@ -204,18 +217,45 @@ class RegistrationRemoteHandler extends RegistrationHandler {
* upgrades a channel ONE channel at a time
*/
final
void upgradeDecoders(final Channel channel, final MetaChannel metaChannel) {
void upgradeDecoders(final int upgradeType, final Channel channel, final MetaChannel metaChannel) {
ChannelPipeline pipeline = channel.pipeline();
try {
if (metaChannel.tcpChannel == channel) {
pipeline.replace(FRAME_AND_KRYO_DECODER,
FRAME_AND_KRYO_CRYPTO_DECODER,
new KryoDecoderTcpCrypto(this.serializationManager)); // cannot be shared because of possible fragmentation.
}
// cannot be shared because of possible fragmentation.
switch (upgradeType) {
case (UpgradeType.NONE) :
pipeline.replace(TCP_DECODE, TCP_DECODE_NONE, new KryoDecoderTcpNone(this.serializationManager));
break;
if (metaChannel.udpChannel == channel) {
pipeline.replace(KRYO_DECODER, KRYO_CRYPTO_DECODER, this.registrationWrapper.kryoUdpDecoderCrypto); // shared encoder
case (UpgradeType.COMPRESS) :
pipeline.replace(TCP_DECODE, TCP_DECODE_COMPRESS, new KryoDecoderTcpCompression(this.serializationManager));
break;
case (UpgradeType.ENCRYPT) :
pipeline.replace(TCP_DECODE, TCP_DECODE_CRYPTO, new KryoDecoderTcpCrypto(this.serializationManager));
break;
default:
throw new IllegalArgumentException("Unable to upgrade TCP connection pipeline for type: " + upgradeType);
}
}
else if (metaChannel.udpChannel == channel) {
// shared decoder
switch (upgradeType) {
case (UpgradeType.NONE) :
pipeline.replace(UDP_DECODE, UDP_DECODE_NONE, this.registrationWrapper.kryoUdpDecoderNone);
break;
case (UpgradeType.COMPRESS) :
pipeline.replace(UDP_DECODE, UDP_DECODE_COMPRESS, this.registrationWrapper.kryoUdpDecoderCompression);
break;
case (UpgradeType.ENCRYPT) :
pipeline.replace(UDP_DECODE, UDP_DECODE_CRYPTO, this.registrationWrapper.kryoUdpDecoderCrypto);
break;
default:
throw new IllegalArgumentException("Unable to upgrade UDP connection pipeline for type: " + upgradeType);
}
}
} catch (Exception e) {
logger.error("Error during connection pipeline upgrade", e);
@ -226,104 +266,117 @@ class RegistrationRemoteHandler extends RegistrationHandler {
* upgrades a channel ONE channel at a time
*/
final
void upgradeEncoders(final Channel channel, final MetaChannel metaChannel, final InetSocketAddress remoteAddress) {
void upgradeEncoders(final int upgradeType, final Channel channel, final MetaChannel metaChannel) {
ChannelPipeline pipeline = channel.pipeline();
try {
if (metaChannel.tcpChannel == channel) {
// shared encoder
switch (upgradeType) {
case (UpgradeType.NONE) :
pipeline.replace(TCP_ENCODE, TCP_ENCODE_NONE, this.registrationWrapper.kryoTcpEncoderNone);
break;
case (UpgradeType.COMPRESS) :
pipeline.replace(TCP_ENCODE, TCP_ENCODE_COMPRESS, this.registrationWrapper.kryoTcpEncoderCompression);
break;
case (UpgradeType.ENCRYPT) :
pipeline.replace(TCP_ENCODE, TCP_ENCODE_CRYPTO, this.registrationWrapper.kryoTcpEncoderCrypto);
break;
default:
throw new IllegalArgumentException("Unable to upgrade TCP connection pipeline for type: " + upgradeType);
}
}
else if (metaChannel.udpChannel == channel) {
// shared encoder
switch (upgradeType) {
case (UpgradeType.NONE) :
pipeline.replace(UDP_ENCODE, UDP_ENCODE_NONE, this.registrationWrapper.kryoUdpEncoderNone);
break;
case (UpgradeType.COMPRESS) :
pipeline.replace(UDP_ENCODE, UDP_ENCODE_COMPRESS, this.registrationWrapper.kryoUdpEncoderCompression);
break;
case (UpgradeType.ENCRYPT) :
pipeline.replace(UDP_ENCODE, UDP_ENCODE_CRYPTO, this.registrationWrapper.kryoUdpEncoderCrypto);
break;
default:
throw new IllegalArgumentException("Unable to upgrade UDP connection pipeline for type: " + upgradeType);
}
}
}
final
void logChannelUpgrade(final int upgradeType, final Channel channel, final MetaChannel metaChannel) {
if (this.logger.isInfoEnabled()) {
final String channelType;
if (metaChannel.tcpChannel == channel) {
pipeline.replace(FRAME_AND_KRYO_ENCODER,
FRAME_AND_KRYO_CRYPTO_ENCODER,
registrationWrapper.kryoTcpEncoderCrypto); // this is shared
}
// cannot be shared because of possible fragmentation.
switch (upgradeType) {
case (UpgradeType.NONE) :
channelType = "TCP simple";
break;
if (metaChannel.udpChannel == channel) {
pipeline.replace(KRYO_ENCODER, KRYO_CRYPTO_ENCODER, registrationWrapper.kryoUdpEncoderCrypto); // shared encoder
}
} catch (Exception e) {
logger.error("Error during connection pipeline upgrade", e);
}
}
case (UpgradeType.COMPRESS) :
channelType = "TCP compression";
break;
/**
* upgrades a channel ONE channel at a time
*/
final
void upgradePipeline(final MetaChannel metaChannel, final InetSocketAddress remoteAddress) {
logger.trace("Upgrading pipeline");
try {
if (metaChannel.udpChannel != null) {
if (metaChannel.tcpChannel == null) {
// TODO: UDP (and TCP??) idle timeout (also, to close UDP session when TCP shuts down)
// this means that we are ONLY UDP, and we should have an idle timeout that CLOSES the session after a while.
// Naturally, one would want the "normal" idle to trigger first, but there always be a heartbeat if the idle trigger DOES NOT
// send data on the network, to make sure that the UDP-only session stays alive or disconnects.
// If the server disconnects, the client has to be made aware of this when it tries to send data again (it must go through
// it's entire reconnect protocol)
case (UpgradeType.ENCRYPT) :
channelType = "TCP encrypted";
break;
default:
this.logger.error("Unable to upgrade TCP connection pipeline for type: " + upgradeType);
return;
}
}
else if (metaChannel.udpChannel == channel) {
// shared decoder
switch (upgradeType) {
case (UpgradeType.NONE) :
channelType = "UDP simple";
break;
// add the "connected"/"normal" handler now that we have established a "new" connection.
// This will have state, etc. for this connection. THIS MUST BE 100% TCP/UDP created, otherwise it will break connections!
ConnectionImpl connection = this.registrationWrapper.connection0(metaChannel, remoteAddress);
metaChannel.connection = new ConnectionRegistrationImpl(connection);
case (UpgradeType.COMPRESS) :
channelType = "UDP compression";
break;
// Now setup our meta-channel to migrate to the correct connection handler for all regular data.
if (metaChannel.tcpChannel != null) {
final ChannelPipeline pipeline = metaChannel.tcpChannel.pipeline();
pipeline.addLast(CONNECTION_HANDLER, metaChannel.connection);
}
if (metaChannel.udpChannel != null) {
final ChannelPipeline pipeline = metaChannel.udpChannel.pipeline();
pipeline.addLast(CONNECTION_HANDLER, metaChannel.connection);
}
if (this.logger.isInfoEnabled()) {
StringBuilder stringBuilder = new StringBuilder(96);
if (metaChannel.tcpChannel != null) {
stringBuilder.append("Encrypted TCP connection [");
EndPoint.getHostDetails(stringBuilder, metaChannel.tcpChannel.localAddress());
stringBuilder.append(getConnectionDirection());
EndPoint.getHostDetails(stringBuilder, metaChannel.tcpChannel.remoteAddress());
stringBuilder.append("]");
case (UpgradeType.ENCRYPT) :
channelType = "UDP encrypted";
break;
default:
this.logger.error("Unable to upgrade UDP connection pipeline for type: " + upgradeType);
return;
}
if (metaChannel.udpChannel != null) {
stringBuilder.append("Encrypted UDP connection [");
EndPoint.getHostDetails(stringBuilder, metaChannel.udpChannel.localAddress());
stringBuilder.append(getConnectionDirection());
EndPoint.getHostDetails(stringBuilder, metaChannel.udpChannel.remoteAddress());
stringBuilder.append("]");
}
this.logger.info(stringBuilder.toString());
}
} catch (Exception e) {
logger.error("Error during connection pipeline upgrade", e);
else {
this.logger.error("Unable to upgrade unknown connection pipeline for type: " + upgradeType);
return;
}
StringBuilder stringBuilder = new StringBuilder(96);
stringBuilder.append(channelType);
stringBuilder.append(" connection [");
EndPoint.getHostDetails(stringBuilder, channel.localAddress());
stringBuilder.append(getConnectionDirection());
EndPoint.getHostDetails(stringBuilder, channel.remoteAddress());
stringBuilder.append("]");
this.logger.info(stringBuilder.toString());
}
}
final
void cleanupPipeline(final MetaChannel metaChannel, final Runnable onConnectFinishRunnable) {
void cleanupPipeline(final Channel channel, final MetaChannel metaChannel, final Runnable preConnectRunnable, final Runnable postConnectRunnable) {
final int idleTimeout = this.registrationWrapper.getIdleTimeout();
try {
// REMOVE our channel wrapper (only used for encryption) with the actual connection
ChannelHandler handler = metaChannel.connection = ((ConnectionRegistrationImpl) metaChannel.connection).connection;
Channel channel;
if (metaChannel.tcpChannel != null) {
channel = metaChannel.tcpChannel;
} else {
channel = metaChannel.udpChannel;
}
// channel should NEVER == null! (we will always have TCP or UDP!)
// we also ONLY want to add this to a single cleanup, NOT BOTH, because this must only run once!!
final ChannelPromise channelPromise = channel.newPromise();
@ -331,33 +384,29 @@ class RegistrationRemoteHandler extends RegistrationHandler {
@Override
public
void operationComplete(final Future<Void> future) throws Exception {
EventLoop loop = channelPromise.channel()
.eventLoop();
loop.execute(new Runnable() {
@Override
public
void run() {
logger.trace("Notify Connection");
// this runs on the channel's event loop
logger.trace("Connection connected");
// safe cast, because it's always this way...
registrationWrapper.connectionConnected0((ConnectionImpl) metaChannel.connection);
if (preConnectRunnable != null) {
preConnectRunnable.run();
}
// run things AFTER the onConnect() method is called...
// CLIENT - runs the deferred 'onMessage' events in the connection as needed
// SERVER - send 'onConnect' bounce back message to client
onConnectFinishRunnable.run();
}
});
// safe cast, because it's always this way...
registrationWrapper.connectionConnected0(metaChannel.connection);
if (postConnectRunnable != null) {
postConnectRunnable.run();
}
}
});
if (metaChannel.tcpChannel != null) {
cleanupPipeline0(idleTimeout, handler, metaChannel.tcpChannel, channelPromise, true);
cleanupPipeline0(idleTimeout, metaChannel.tcpChannel, channelPromise);
}
if (metaChannel.udpChannel != null) {
cleanupPipeline0(idleTimeout, handler, metaChannel.udpChannel, channelPromise, false);
cleanupPipeline0(idleTimeout, metaChannel.udpChannel, channelPromise);
}
} catch (Exception e) {
logger.error("Error during pipeline replace", e);
@ -365,34 +414,9 @@ class RegistrationRemoteHandler extends RegistrationHandler {
}
private
void cleanupPipeline0(final int idleTimeout,
final ChannelHandler connection,
final Channel channel,
final ChannelPromise channelPromise,
final boolean isTcp) {
void cleanupPipeline0(final int idleTimeout, final Channel channel, final ChannelPromise channelPromise) {
final ChannelPipeline pipeline = channel.pipeline();
// have to explicitly remove handlers based on the input type. Cannot use this.getClass()....
boolean isClient = registrationWrapper.isClient();
if (isClient) {
if (isTcp) {
pipeline.remove(RegistrationRemoteHandlerClientTCP.class);
}
else {
pipeline.remove(RegistrationRemoteHandlerClientUDP.class);
}
}
else {
if (isTcp) {
pipeline.remove(RegistrationRemoteHandlerServerTCP.class);
}
else {
pipeline.remove(RegistrationRemoteHandlerServerUDP.class);
}
}
pipeline.remove(ConnectionRegistrationImpl.class);
if (idleTimeout > 0) {
pipeline.replace(IDLE_HANDLER, IDLE_HANDLER_FULL, new IdleStateHandler(0, 0, idleTimeout, TimeUnit.MILLISECONDS));
}
@ -400,8 +424,6 @@ class RegistrationRemoteHandler extends RegistrationHandler {
pipeline.remove(IDLE_HANDLER);
}
pipeline.addLast(CONNECTION_HANDLER, connection);
// we also DEREGISTER from the HANDSHAKE event-loop and run on the worker event-loop!
ChannelFuture future = channel.deregister();
future.addListener(new GenericFutureListener<Future<? super Void>>() {
@ -409,13 +431,13 @@ class RegistrationRemoteHandler extends RegistrationHandler {
public
void operationComplete(final Future<? super Void> f) throws Exception {
if (f.isSuccess()) {
// TCP and UDP register on DIFFERENT event loops. The channel promise ONLY runs on 1 of them...
workerEventLoop.register(channel);
// TCP and UDP register on DIFFERENT event loops. This channel promise ONLY runs on the same worker as the
// channel that called `cleanupPipeline`
if (channelPromise.channel() == channel) {
workerEventLoop.register(channelPromise);
}
else {
workerEventLoop.register(channel);
}
}
}
});

View File

@ -40,6 +40,7 @@ import dorkbox.util.crypto.CryptoECC;
import dorkbox.util.exceptions.SecurityException;
import dorkbox.util.serialization.EccPublicKeySerializer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
public
@ -96,7 +97,7 @@ class RegistrationRemoteHandlerClient extends RegistrationRemoteHandler {
@SuppressWarnings("Duplicates")
void readClient(final Channel channel, final Registration registration, final String type, final MetaChannel metaChannel) {
void readClient(final ChannelHandlerContext context, final Channel channel, final Registration registration, final String type, final MetaChannel metaChannel) {
final InetSocketAddress remoteAddress = (InetSocketAddress) channel.remoteAddress();
// IN: session ID + public key + ecc parameters (which are a nonce. the SERVER defines what these are)
@ -170,6 +171,10 @@ class RegistrationRemoteHandlerClient extends RegistrationRemoteHandler {
// do we have any more registrations?
outboundRegister.hasMore = registrationWrapper.hasMoreRegistrations();
if (outboundRegister.hasMore) {
metaChannel.totalProtocols.incrementAndGet();
}
channel.writeAndFlush(outboundRegister);
// wait for ack from the server before registering the next protocol
@ -177,18 +182,19 @@ class RegistrationRemoteHandlerClient extends RegistrationRemoteHandler {
}
// IN: upgrade=true if we must upgrade this connection
if (registration.upgrade) {
// this pipeline can now be marked to be upgraded
// IN: upgrade>0 if we must upgrade this connection
// can this pipeline can now be upgraded
int upgradeType = registration.upgradeType;
if (upgradeType > 0) {
// upgrade the connection to an none/compression/encrypted connection
upgradeEncoders(upgradeType, channel, metaChannel);
upgradeDecoders(upgradeType, channel, metaChannel);
// upgrade the connection to an encrypted connection
// this pipeline encoder/decoder can now be upgraded, and the "connection" added
upgradeEncoders(channel, metaChannel, remoteAddress);
upgradeDecoders(channel, metaChannel);
logChannelUpgrade(upgradeType, channel, metaChannel);
}
// IN: hasMore=true if we have more registrations to do, false otherwise
if (registration.hasMore) {
if (registrationWrapper.hasMoreRegistrations()) {
logger.trace("Starting another protocol registration");
metaChannel.totalProtocols.incrementAndGet();
registrationWrapper.startNextProtocolRegistration();
@ -198,45 +204,75 @@ class RegistrationRemoteHandlerClient extends RegistrationRemoteHandler {
//
//
// we only get this when we are 100% done with the registration of all connection types.
// we only get this when we are 100% done with the registration and upgrade of all connection types.
//
// THIS is the last channel registered!
//
// we don't verify anything on the CLIENT. We only verify on the server.
// we don't support registering NEW classes after the client starts.
// this will perform channel WRITE on whatever channel is the last channel registered!
if (!registration.upgraded) {
// setup the pipeline with the real connection
upgradePipeline(metaChannel, remoteAddress);
// this only get's called once. Ever other time the server talks to the client now, "upgraded" will be true.
// we don't verify anything on the CLIENT. We only verify on the server.
// we don't support registering NEW classes after the client starts.
if (!registrationWrapper.initClassRegistration(channel, registration)) {
// this can ONLY be created when all protocols are registered!
metaChannel.connection = this.registrationWrapper.connection0(metaChannel, remoteAddress);
if (metaChannel.tcpChannel != null) {
metaChannel.tcpChannel.pipeline().addLast(CONNECTION_HANDLER, metaChannel.connection);
}
if (metaChannel.udpChannel != null) {
metaChannel.udpChannel.pipeline().addLast(CONNECTION_HANDLER, metaChannel.connection);
}
// this tells the server we are now "upgraded" and can continue
boolean hasErrors = !registrationWrapper.initClassRegistration(channel, registration);
if (hasErrors) {
// abort if something messed up!
shutdown(channel, registration.sessionID);
}
// the server will ping back when it is ready to connect
return;
}
// IN: upgraded=true this means we are ready to connect, and the server is done with it's onConnect calls
// we defer the messages until after our own onConnect() is called...
// NOTE: The server will ALWAYS call "onConnect" before we do!
// it does this via sending the client the "upgraded" signal JUST before it calls "onConnect"
// we have to wait for ALL messages to be received, this way we can prevent out-of-order oddities...
int protocolsRemaining = metaChannel.totalProtocols.decrementAndGet();
if (protocolsRemaining > 0) {
logger.trace("{} done. Waiting for {} more protocols registrations to arrive...", type, protocolsRemaining);
// It will upgrade the different connections INDIVIDUALLY, and whichever one arrives first will start the process
// and the "late" message will be ignored
if (!metaChannel.canUpgradePipeline.compareAndSet(true, false)) {
return;
}
// remove ourselves from handling any more messages, because we are done.
// since only the FIRST registration gets here, setup other ones as well (since they are no longer needed)
if (metaChannel.tcpChannel != null) {
// the "other" channel is the TCP channel that we have to cleanup
metaChannel.tcpChannel.pipeline().remove(RegistrationRemoteHandlerClientTCP.class);
}
if (metaChannel.udpChannel != null) {
// the "other" channel is the UDP channel that we have to cleanup
metaChannel.udpChannel.pipeline().remove(RegistrationRemoteHandlerClientUDP.class);
}
// remove the ConnectionWrapper (that was used to upgrade the connection) and cleanup the pipeline
// always wait until AFTER the server calls "onConnect", then we do this
cleanupPipeline(metaChannel, new Runnable() {
Runnable postConnectRunnable = new Runnable() {
@Override
public
void run() {
// this method runs after the "onConnect()" runs and only after all of the channels have be correctly updated
// this method runs after all of the channels have be correctly updated
// get all of the out of order messages that we missed
List<Object> messages = new LinkedList<Object>();
@ -259,7 +295,7 @@ class RegistrationRemoteHandlerClient extends RegistrationRemoteHandler {
// now call 'onMessage' in the connection object with our messages!
try {
ConnectionImpl connection = (ConnectionImpl) metaChannel.connection;
ConnectionImpl connection = metaChannel.connection;
for (Object message : messages) {
logger.trace(" deferred onMessage({}, {})", connection.id(), message);
@ -274,6 +310,8 @@ class RegistrationRemoteHandlerClient extends RegistrationRemoteHandler {
logger.error("Error initialising deferred messages!", e);
}
}
});
};
cleanupPipeline(channel, metaChannel, null, postConnectRunnable);
}
}

View File

@ -81,7 +81,7 @@ class RegistrationRemoteHandlerClientTCP extends RegistrationRemoteHandlerClient
}
logger.trace("TCP read");
readClient(channel, registration, "TCP client", metaChannel);
readClient(context, channel, registration, "TCP client", metaChannel);
}
else {
logger.trace("Out of order TCP message from server!");

View File

@ -103,7 +103,7 @@ class RegistrationRemoteHandlerClientUDP extends RegistrationRemoteHandlerClient
metaChannel.udpChannel = channel;
}
readClient(channel, registration, "UDP client", metaChannel);
readClient(context, channel, registration, "UDP client", metaChannel);
}
else {
logger.trace("Out of order UDP message from server!");

View File

@ -42,6 +42,7 @@ import dorkbox.network.connection.registration.Registration;
import dorkbox.util.crypto.CryptoECC;
import dorkbox.util.serialization.EccPublicKeySerializer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
@ -175,32 +176,51 @@ class RegistrationRemoteHandlerServer extends RegistrationRemoteHandler {
// NOTE: if we have more registrations, we will "bounce back" that status so the client knows what to do.
// IN: hasMore=true if we have more registrations to do, false otherwise
// ALWAYS upgrade the connection at this point.
// Some cases we want to SKIP encryption (ie, loopback or specific IP/CIDR addresses)
// OTHERWISE ALWAYS upgrade the connection at this point.
// IN: upgraded=false if we haven't upgraded to encryption yet (this will always be the case right after encryption is setup)
if (!registration.upgraded) {
// upgrade the connection to an encrypted connection
registration.upgrade = true;
upgradeDecoders(channel, metaChannel);
// this is the last protocol registered
if (!registration.hasMore) {
// this can ONLY be created when all protocols are registered!
// this must happen before we verify class registrations.
metaChannel.connection = this.registrationWrapper.connection0(metaChannel, remoteAddress);
if (metaChannel.tcpChannel != null) {
metaChannel.tcpChannel.pipeline().addLast(CONNECTION_HANDLER, metaChannel.connection);
}
if (metaChannel.udpChannel != null) {
metaChannel.udpChannel.pipeline().addLast(CONNECTION_HANDLER, metaChannel.connection);
}
}
// If we are loopback or the client is a specific IP/CIDR address, then we do things differently. The LOOPBACK address will never encrypt or compress the traffic.
byte upgradeType = registrationWrapper.getConnectionUpgradeType(remoteAddress);
registration.upgradeType = upgradeType;
// upgrade the connection to a none/compression/encrypted connection
upgradeDecoders(upgradeType, channel, metaChannel);
// bounce back to the client so it knows we received it
channel.write(registration);
// this pipeline encoder/decoder can now be upgraded, and the "connection" added
upgradeEncoders(channel, metaChannel, remoteAddress);
upgradeEncoders(upgradeType, channel, metaChannel);
if (!registration.hasMore) {
// we only get this when we are 100% done with the registration of all connection types.
// setup the pipeline with the real connection
upgradePipeline(metaChannel, remoteAddress);
}
logChannelUpgrade(upgradeType, channel, metaChannel);
channel.flush();
return;
}
// the client will send their class registration data. VERIFY IT IS CORRECT!
//
//
// we only get this when we are 100% done with encrypting/etc the connections
//
//
// upgraded=true when the client will send their class registration data. VERIFY IT IS CORRECT!
STATE state = registrationWrapper.verifyClassRegistration(metaChannel, registration);
if (state == STATE.ERROR) {
// abort! There was an error
@ -216,34 +236,57 @@ class RegistrationRemoteHandlerServer extends RegistrationRemoteHandler {
//
//
// we only get this when we are 100% done with the registration of all connection types.
// the context is the LAST protocol to be registered
// we only get this when we are 100% done with validation of class registrations. The last protocol to register gets us here.
//
//
// remove ourselves from handling any more messages, because we are done.
ChannelHandler handler = context.handler();
channel.pipeline().remove(handler);
// since only the LAST registration gets here, setup other ones as well (since they are no longer needed)
if (channel == metaChannel.tcpChannel && metaChannel.udpChannel != null) {
// the "other" channel is the UDP channel that we have to cleanup
metaChannel.udpChannel.pipeline().remove(RegistrationRemoteHandlerServerUDP.class);
}
else if (channel == metaChannel.udpChannel && metaChannel.tcpChannel != null) {
// the "other" channel is the TCP channel that we have to cleanup
metaChannel.tcpChannel.pipeline().remove(RegistrationRemoteHandlerServerTCP.class);
}
// remove the ConnectionWrapper (that was used to upgrade the connection) and cleanup the pipeline
cleanupPipeline(metaChannel, new Runnable() {
Runnable preConnectRunnable = new Runnable() {
@Override
public
void run() {
// this method runs after the "onConnect()" runs and only after all of the channels have be correctly updated
// this method BEFORE the "onConnect()" runs and only after all of the channels have be correctly updated
// this tells the client we are ready to connect (we just bounce back the original message over ALL protocols)
// this tells the client we are ready to connect (we just bounce back "upgraded" over TCP, preferably).
// only the FIRST one to arrive at the client will actually setup the pipeline
Registration reg = new Registration(registration.sessionID);
reg.upgraded = true;
// there is a risk of UDP losing the packet, so if we can send via TCP, then we do so.
if (metaChannel.tcpChannel != null) {
logger.trace("Sending TCP upgraded command");
Registration reg = new Registration(registration.sessionID);
reg.upgraded = true;
metaChannel.tcpChannel.writeAndFlush(reg);
}
if (metaChannel.udpChannel != null) {
else if (metaChannel.udpChannel != null) {
logger.trace("Sending UDP upgraded command");
Registration reg = new Registration(registration.sessionID);
reg.upgraded = true;
metaChannel.udpChannel.writeAndFlush(reg);
}
else {
logger.error("This shouldn't happen!");
}
}
});
};
cleanupPipeline(channel, metaChannel, preConnectRunnable, null);
}
}