From 89d70f9255619ab808581e86a41ab8bc7ba36945 Mon Sep 17 00:00:00 2001 From: nathan Date: Thu, 13 Jun 2019 22:08:50 +0200 Subject: [PATCH] Added support for optional encryption. Cleaned up connection registration process. --- .../network/connection/EndPointServer.java | 8 + .../connection/RegistrationWrapper.java | 49 ++- .../ConnectionRegistrationImpl.java | 175 ---------- .../connection/registration/MetaChannel.java | 12 +- .../connection/registration/Registration.java | 10 +- .../connection/registration/UpgradeType.java | 11 + .../remote/RegistrationRemoteHandler.java | 330 ++++++++++-------- .../RegistrationRemoteHandlerClient.java | 88 +++-- .../RegistrationRemoteHandlerClientTCP.java | 2 +- .../RegistrationRemoteHandlerClientUDP.java | 2 +- .../RegistrationRemoteHandlerServer.java | 93 +++-- 11 files changed, 387 insertions(+), 393 deletions(-) delete mode 100644 src/dorkbox/network/connection/registration/ConnectionRegistrationImpl.java create mode 100644 src/dorkbox/network/connection/registration/UpgradeType.java diff --git a/src/dorkbox/network/connection/EndPointServer.java b/src/dorkbox/network/connection/EndPointServer.java index 369514fe..3e8943a5 100644 --- a/src/dorkbox/network/connection/EndPointServer.java +++ b/src/dorkbox/network/connection/EndPointServer.java @@ -15,9 +15,12 @@ */ package dorkbox.network.connection; +import java.net.InetSocketAddress; + import dorkbox.network.Configuration; import dorkbox.network.Server; import dorkbox.network.connection.bridge.ConnectionBridgeServer; +import dorkbox.network.connection.registration.UpgradeType; import dorkbox.util.exceptions.SecurityException; /** @@ -104,4 +107,9 @@ class EndPointServer extends EndPoint { void remove(Connection connection) { connectionManager.removeConnection(connection); } + + byte getConnectionUpgradeType(final InetSocketAddress remoteAddress) { + // TODO, crypto/compression based on ip/range of remote address + return UpgradeType.ENCRYPT; + } } diff --git a/src/dorkbox/network/connection/RegistrationWrapper.java b/src/dorkbox/network/connection/RegistrationWrapper.java index af6c1ce0..eaf6145f 100644 --- a/src/dorkbox/network/connection/RegistrationWrapper.java +++ b/src/dorkbox/network/connection/RegistrationWrapper.java @@ -26,12 +26,19 @@ import org.slf4j.Logger; import dorkbox.network.connection.registration.MetaChannel; import dorkbox.network.connection.registration.Registration; +import dorkbox.network.connection.registration.UpgradeType; import dorkbox.network.pipeline.tcp.KryoEncoderTcp; +import dorkbox.network.pipeline.tcp.KryoEncoderTcpCompression; import dorkbox.network.pipeline.tcp.KryoEncoderTcpCrypto; +import dorkbox.network.pipeline.tcp.KryoEncoderTcpNone; import dorkbox.network.pipeline.udp.KryoDecoderUdp; +import dorkbox.network.pipeline.udp.KryoDecoderUdpCompression; import dorkbox.network.pipeline.udp.KryoDecoderUdpCrypto; +import dorkbox.network.pipeline.udp.KryoDecoderUdpNone; import dorkbox.network.pipeline.udp.KryoEncoderUdp; +import dorkbox.network.pipeline.udp.KryoEncoderUdpCompression; import dorkbox.network.pipeline.udp.KryoEncoderUdpCrypto; +import dorkbox.network.pipeline.udp.KryoEncoderUdpNone; import dorkbox.network.serialization.NetworkSerializationManager; import dorkbox.network.serialization.Serialization; import dorkbox.util.RandomUtil; @@ -40,6 +47,7 @@ import dorkbox.util.collections.LockFreeIntMap; import dorkbox.util.crypto.CryptoECC; import dorkbox.util.exceptions.SecurityException; import io.netty.channel.Channel; +import io.netty.util.NetUtil; /** * Just wraps common/needed methods of the client/server endpoint by the registration stage/handshake. @@ -55,11 +63,18 @@ class RegistrationWrapper { private final org.slf4j.Logger logger; public final KryoEncoderTcp kryoTcpEncoder; + public final KryoEncoderTcpNone kryoTcpEncoderNone; + public final KryoEncoderTcpCompression kryoTcpEncoderCompression; public final KryoEncoderTcpCrypto kryoTcpEncoderCrypto; public final KryoEncoderUdp kryoUdpEncoder; + public final KryoEncoderUdpNone kryoUdpEncoderNone; + public final KryoEncoderUdpCompression kryoUdpEncoderCompression; public final KryoEncoderUdpCrypto kryoUdpEncoderCrypto; + public final KryoDecoderUdp kryoUdpDecoder; + public final KryoDecoderUdpNone kryoUdpDecoderNone; + public final KryoDecoderUdpCompression kryoUdpDecoderCompression; public final KryoDecoderUdpCrypto kryoUdpDecoderCrypto; private final EndPoint endPoint; @@ -74,13 +89,20 @@ class RegistrationWrapper { this.logger = logger; this.kryoTcpEncoder = new KryoEncoderTcp(endPoint.serializationManager); + this.kryoTcpEncoderNone = new KryoEncoderTcpNone(endPoint.serializationManager); + this.kryoTcpEncoderCompression = new KryoEncoderTcpCompression(endPoint.serializationManager); this.kryoTcpEncoderCrypto = new KryoEncoderTcpCrypto(endPoint.serializationManager); this.kryoUdpEncoder = new KryoEncoderUdp(endPoint.serializationManager); + this.kryoUdpEncoderNone = new KryoEncoderUdpNone(endPoint.serializationManager); + this.kryoUdpEncoderCompression = new KryoEncoderUdpCompression(endPoint.serializationManager); this.kryoUdpEncoderCrypto = new KryoEncoderUdpCrypto(endPoint.serializationManager); + this.kryoUdpDecoder = new KryoDecoderUdp(endPoint.serializationManager); + this.kryoUdpDecoderNone = new KryoDecoderUdpNone(endPoint.serializationManager); + this.kryoUdpDecoderCompression = new KryoDecoderUdpCompression(endPoint.serializationManager); this.kryoUdpDecoderCrypto = new KryoDecoderUdpCrypto(endPoint.serializationManager); } @@ -144,6 +166,27 @@ class RegistrationWrapper { return this.endPoint.publicKey; } + + + + /** + * Only called by the server! + * + * If we are loopback or the client is a specific IP/CIDR address, then we do things differently. The LOOPBACK address will never encrypt or compress the traffic. + */ + public + byte getConnectionUpgradeType(final InetSocketAddress remoteAddress) { + if (isClient()) { + throw new IllegalArgumentException("This should never be called by the client!"); + } + + if (remoteAddress.getAddress().equals(NetUtil.LOCALHOST)) { + return UpgradeType.COMPRESS; + } + + return ((EndPointServer) this.endPoint).getConnectionUpgradeType(remoteAddress); + } + /** * If the key does not match AND we have disabled remote key validation, then metachannel.changedRemoteKey = true. OTHERWISE, key validation is REQUIRED! * @@ -353,7 +396,7 @@ class RegistrationWrapper { fragmentedRegistration.payload = fragment; // tell the server we are fragmented - fragmentedRegistration.upgrade = true; + fragmentedRegistration.upgradeType = UpgradeType.FRAGMENTED; // tell the server we are upgraded (it will bounce back telling us to connect) fragmentedRegistration.upgraded = true; @@ -365,7 +408,7 @@ class RegistrationWrapper { fragmentedRegistration.payload = fragments[allButLast]; // tell the server we are fragmented - fragmentedRegistration.upgrade = true; + fragmentedRegistration.upgradeType = UpgradeType.FRAGMENTED; // tell the server we are upgraded (it will bounce back telling us to connect) fragmentedRegistration.upgraded = true; @@ -383,7 +426,7 @@ class RegistrationWrapper { public STATE verifyClassRegistration(final MetaChannel metaChannel, final Registration registration) { - if (registration.upgrade) { + if (registration.upgradeType == UpgradeType.FRAGMENTED) { byte[] fragment = registration.payload; // this means that the registrations are FRAGMENTED! diff --git a/src/dorkbox/network/connection/registration/ConnectionRegistrationImpl.java b/src/dorkbox/network/connection/registration/ConnectionRegistrationImpl.java deleted file mode 100644 index 20462de9..00000000 --- a/src/dorkbox/network/connection/registration/ConnectionRegistrationImpl.java +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Copyright 2018 dorkbox, llc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dorkbox.network.connection.registration; - -import javax.crypto.SecretKey; - -import dorkbox.network.connection.ConnectionImpl; -import dorkbox.network.connection.ConnectionPoint; -import dorkbox.network.connection.Connection_; -import dorkbox.network.connection.EndPoint; -import dorkbox.network.connection.Listeners; -import dorkbox.network.connection.bridge.ConnectionBridge; -import dorkbox.network.connection.idle.IdleBridge; -import dorkbox.network.connection.idle.IdleSender; -import dorkbox.network.rmi.ConnectionNoOpSupport; -import dorkbox.network.rmi.RemoteObjectCallback; -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; - -/** - * A wrapper for the period of time between registration and connect for a "connection" session. - * - * This is to prevent race conditions where onMessage() can happen BEFORE a "connection" is "connected" - */ -public -class ConnectionRegistrationImpl implements Connection_, ChannelHandler { - public final ConnectionImpl connection; - - public - ConnectionRegistrationImpl(final ConnectionImpl connection) { - this.connection = connection; - } - - @Override - public - void handlerAdded(final ChannelHandlerContext ctx) throws Exception { - } - - @Override - public - void handlerRemoved(final ChannelHandlerContext ctx) throws Exception { - } - - @Override - public - void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception { - } - - @Override - public - long nextGcmSequence() { - return connection.nextGcmSequence(); - } - - @Override - public - SecretKey cryptoKey() { - return connection.cryptoKey(); - } - - @Override - public - boolean hasRemoteKeyChanged() { - throw new IllegalArgumentException("not implemented"); - } - - @Override - public - String getRemoteHost() { - throw new IllegalArgumentException("not implemented"); - } - - @Override - public - boolean isLoopback() { - return connection.isLoopback(); - } - - @Override - public - EndPoint getEndPoint() { - throw new IllegalArgumentException("not implemented"); - } - - @Override - public - int id() { - throw new IllegalArgumentException("not implemented"); - } - - @Override - public - String idAsHex() { - throw new IllegalArgumentException("not implemented"); - } - - @Override - public - boolean hasUDP() { - throw new IllegalArgumentException("not implemented"); - } - - @Override - public - ConnectionBridge send() { - throw new IllegalArgumentException("not implemented"); - } - - @Override - public - ConnectionPoint send(final Object message) { - throw new IllegalArgumentException("not implemented"); - } - - @Override - public - IdleBridge sendOnIdle(final IdleSender sender) { - throw new IllegalArgumentException("not implemented"); - } - - @Override - public - IdleBridge sendOnIdle(final Object message) { - throw new IllegalArgumentException("not implemented"); - } - - @Override - public - Listeners listeners() { - throw new IllegalArgumentException("not implemented"); - } - - @Override - public - void close() { - throw new IllegalArgumentException("not implemented"); - } - - @Override - public - void closeAsap() { - throw new IllegalArgumentException("not implemented"); - } - - @Override - public - void createRemoteObject(final Class interfaceClass, final RemoteObjectCallback callback) { - throw new IllegalArgumentException("not implemented"); - } - - @Override - public - void getRemoteObject(final int objectId, final RemoteObjectCallback callback) { - throw new IllegalArgumentException("not implemented"); - } - - @Override - public - ConnectionNoOpSupport rmiSupport() { - return null; - } -} diff --git a/src/dorkbox/network/connection/registration/MetaChannel.java b/src/dorkbox/network/connection/registration/MetaChannel.java index fb5b8b2a..b0d95799 100644 --- a/src/dorkbox/network/connection/registration/MetaChannel.java +++ b/src/dorkbox/network/connection/registration/MetaChannel.java @@ -15,6 +15,7 @@ */ package dorkbox.network.connection.registration; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import javax.crypto.SecretKey; @@ -22,8 +23,8 @@ import javax.crypto.SecretKey; import org.bouncycastle.crypto.AsymmetricCipherKeyPair; import org.bouncycastle.crypto.params.ECPublicKeyParameters; +import dorkbox.network.connection.ConnectionImpl; import io.netty.channel.Channel; -import io.netty.channel.ChannelHandler; public class MetaChannel { @@ -38,10 +39,13 @@ class MetaChannel { // keep track of how many protocols to register, so that way when we are ready to connect the SERVER sends a message to the client over // all registered protocols and the last protocol to receive the message does the registration - // we ALWAYS start off with at least 1 protocol - public AtomicInteger totalProtocols = new AtomicInteger(1); + public AtomicInteger totalProtocols = new AtomicInteger(0); - public volatile ChannelHandler connection; // only needed until the connection has been notified. + + // only permits the FIST protocol for running the pipeline upgrade. + public AtomicBoolean canUpgradePipeline = new AtomicBoolean(true); + + public volatile ConnectionImpl connection; // only needed until the connection has been notified. public volatile ECPublicKeyParameters publicKey; // used for ECC crypto + handshake on NETWORK (remote) connections. This is the remote public key. public volatile AsymmetricCipherKeyPair ecdhKey; // used for ECC Diffie-Hellman-Merkle key exchanges: see http://en.wikipedia.org/wiki/Diffie%E2%80%93Hellman_key_exchange diff --git a/src/dorkbox/network/connection/registration/Registration.java b/src/dorkbox/network/connection/registration/Registration.java index 306fb9ca..a35ecee6 100644 --- a/src/dorkbox/network/connection/registration/Registration.java +++ b/src/dorkbox/network/connection/registration/Registration.java @@ -25,7 +25,7 @@ public class Registration { // used to keep track and associate TCP/UDP/etc sessions. This is always defined by the server // a sessionId if '0', means we are still figuring it out. - public int sessionID; + public int sessionID = 0; public ECPublicKeyParameters publicKey; public IESParameters eccParameters; @@ -33,14 +33,14 @@ class Registration { public byte[] payload; // true if we have more registrations to process, false if we are done - public boolean hasMore; + public boolean hasMore = false; - // true when we are ready to setup the connection (hasMore will always be false if this is true). False when we are ready to connect + // > 0 when we are ready to setup the connection (hasMore will always be false if this is >0). 0 when we are ready to connect // ALSO used if there are fragmented frames for registration data (since we have to split it up to fit inside a single UDP packet without fragmentation) - public boolean upgrade; + public byte upgradeType = (byte) 0; // true when we are fully upgraded - public boolean upgraded; + public boolean upgraded = false; private Registration() { diff --git a/src/dorkbox/network/connection/registration/UpgradeType.java b/src/dorkbox/network/connection/registration/UpgradeType.java new file mode 100644 index 00000000..2dfa773f --- /dev/null +++ b/src/dorkbox/network/connection/registration/UpgradeType.java @@ -0,0 +1,11 @@ +package dorkbox.network.connection.registration; + +public +class UpgradeType { + // The check is > 0, so these MUST be all > 0 + + public static final byte NONE = (byte) 1; + public static final byte ENCRYPT = (byte) 2; + public static final byte COMPRESS = (byte) 3; + public static final byte FRAGMENTED = (byte) 4; +} diff --git a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandler.java b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandler.java index 21b1fb92..9207bdbb 100644 --- a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandler.java +++ b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandler.java @@ -26,21 +26,21 @@ import org.bouncycastle.jce.spec.ECParameterSpec; import dorkbox.network.connection.ConnectionImpl; import dorkbox.network.connection.EndPoint; import dorkbox.network.connection.RegistrationWrapper; -import dorkbox.network.connection.registration.ConnectionRegistrationImpl; import dorkbox.network.connection.registration.MetaChannel; import dorkbox.network.connection.registration.Registration; import dorkbox.network.connection.registration.RegistrationHandler; +import dorkbox.network.connection.registration.UpgradeType; import dorkbox.network.pipeline.tcp.KryoDecoderTcp; +import dorkbox.network.pipeline.tcp.KryoDecoderTcpCompression; import dorkbox.network.pipeline.tcp.KryoDecoderTcpCrypto; +import dorkbox.network.pipeline.tcp.KryoDecoderTcpNone; import dorkbox.network.serialization.NetworkSerializationManager; import dorkbox.util.crypto.CryptoECC; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; -import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; @@ -57,20 +57,34 @@ class RegistrationRemoteHandler extends RegistrationHandler { static final String DELETE_IP = "eleteIP"; // purposefully missing the "D", since that is a system parameter, which starts with "-D" static final ECParameterSpec eccSpec = ECNamedCurveTable.getParameterSpec(CryptoECC.curve25519); - static final String KRYO_ENCODER = "kryoEncoder"; - static final String KRYO_DECODER = "kryoDecoder"; + private static final String UDP_DECODE = "ud1"; + private static final String UDP_DECODE_NONE = "ud2"; + private static final String UDP_DECODE_COMPRESS = "ud3"; + private static final String UDP_DECODE_CRYPTO = "ud4"; + + private static final String TCP_DECODE = "td1"; + private static final String TCP_DECODE_NONE = "td2"; + private static final String TCP_DECODE_COMPRESS = "td3"; + private static final String TCP_DECODE_CRYPTO = "td4"; + + + private static final String IDLE_HANDLER = "idle"; + private static final String IDLE_HANDLER_FULL = "idleFull"; + + private static final String UDP_ENCODE = "ue"; + private static final String UDP_ENCODE_NONE = "ue2"; + private static final String UDP_ENCODE_COMPRESS = "ue3"; + private static final String UDP_ENCODE_CRYPTO = "ue4"; + + private static final String TCP_ENCODE = "te1"; + private static final String TCP_ENCODE_NONE = "te2"; + private static final String TCP_ENCODE_COMPRESS = "te3"; + private static final String TCP_ENCODE_CRYPTO = "te4"; + - private static final String IDLE_HANDLER_FULL = "idleHandlerFull"; - private static final String FRAME_AND_KRYO_ENCODER = "frameAndKryoEncoder"; - private static final String FRAME_AND_KRYO_DECODER = "frameAndKryoDecoder"; - private static final String FRAME_AND_KRYO_CRYPTO_ENCODER = "frameAndKryoCryptoEncoder"; - private static final String FRAME_AND_KRYO_CRYPTO_DECODER = "frameAndKryoCryptoDecoder"; - private static final String KRYO_CRYPTO_ENCODER = "kryoCryptoEncoder"; - private static final String KRYO_CRYPTO_DECODER = "kryoCryptoDecoder"; - private static final String IDLE_HANDLER = "idleHandler"; protected final NetworkSerializationManager serializationManager; @@ -98,12 +112,11 @@ class RegistrationRemoteHandler extends RegistrationHandler { /////////////////////// // DECODE (or upstream) /////////////////////// - pipeline.addFirst(FRAME_AND_KRYO_DECODER, - new KryoDecoderTcp(this.serializationManager)); // cannot be shared because of possible fragmentation. + pipeline.addFirst(TCP_DECODE, new KryoDecoderTcp(this.serializationManager)); // cannot be shared because of possible fragmentation. } else if (isUdpChannel) { // can be shared because there cannot be fragmentation for our UDP packets. If there is, we throw an error and continue... - pipeline.addFirst(KRYO_DECODER, this.registrationWrapper.kryoUdpDecoder); + pipeline.addFirst(UDP_DECODE, this.registrationWrapper.kryoUdpDecoder); } // this makes the proper event get raised in the registrationHandler to kill NEW idle connections. Once "connected" they last a lot longer. @@ -115,10 +128,10 @@ class RegistrationRemoteHandler extends RegistrationHandler { ///////////////////////// // ENCODE (or downstream) ///////////////////////// - pipeline.addFirst(FRAME_AND_KRYO_ENCODER, this.registrationWrapper.kryoTcpEncoder); // this is shared + pipeline.addFirst(TCP_ENCODE, this.registrationWrapper.kryoTcpEncoder); // this is shared } else if (isUdpChannel) { - pipeline.addFirst(KRYO_ENCODER, this.registrationWrapper.kryoUdpEncoder); + pipeline.addFirst(UDP_ENCODE, this.registrationWrapper.kryoUdpEncoder); } } @@ -204,18 +217,45 @@ class RegistrationRemoteHandler extends RegistrationHandler { * upgrades a channel ONE channel at a time */ final - void upgradeDecoders(final Channel channel, final MetaChannel metaChannel) { + void upgradeDecoders(final int upgradeType, final Channel channel, final MetaChannel metaChannel) { ChannelPipeline pipeline = channel.pipeline(); try { if (metaChannel.tcpChannel == channel) { - pipeline.replace(FRAME_AND_KRYO_DECODER, - FRAME_AND_KRYO_CRYPTO_DECODER, - new KryoDecoderTcpCrypto(this.serializationManager)); // cannot be shared because of possible fragmentation. - } + // cannot be shared because of possible fragmentation. + switch (upgradeType) { + case (UpgradeType.NONE) : + pipeline.replace(TCP_DECODE, TCP_DECODE_NONE, new KryoDecoderTcpNone(this.serializationManager)); + break; - if (metaChannel.udpChannel == channel) { - pipeline.replace(KRYO_DECODER, KRYO_CRYPTO_DECODER, this.registrationWrapper.kryoUdpDecoderCrypto); // shared encoder + case (UpgradeType.COMPRESS) : + pipeline.replace(TCP_DECODE, TCP_DECODE_COMPRESS, new KryoDecoderTcpCompression(this.serializationManager)); + break; + + case (UpgradeType.ENCRYPT) : + pipeline.replace(TCP_DECODE, TCP_DECODE_CRYPTO, new KryoDecoderTcpCrypto(this.serializationManager)); + break; + default: + throw new IllegalArgumentException("Unable to upgrade TCP connection pipeline for type: " + upgradeType); + } + } + else if (metaChannel.udpChannel == channel) { + // shared decoder + switch (upgradeType) { + case (UpgradeType.NONE) : + pipeline.replace(UDP_DECODE, UDP_DECODE_NONE, this.registrationWrapper.kryoUdpDecoderNone); + break; + + case (UpgradeType.COMPRESS) : + pipeline.replace(UDP_DECODE, UDP_DECODE_COMPRESS, this.registrationWrapper.kryoUdpDecoderCompression); + break; + + case (UpgradeType.ENCRYPT) : + pipeline.replace(UDP_DECODE, UDP_DECODE_CRYPTO, this.registrationWrapper.kryoUdpDecoderCrypto); + break; + default: + throw new IllegalArgumentException("Unable to upgrade UDP connection pipeline for type: " + upgradeType); + } } } catch (Exception e) { logger.error("Error during connection pipeline upgrade", e); @@ -226,104 +266,117 @@ class RegistrationRemoteHandler extends RegistrationHandler { * upgrades a channel ONE channel at a time */ final - void upgradeEncoders(final Channel channel, final MetaChannel metaChannel, final InetSocketAddress remoteAddress) { + void upgradeEncoders(final int upgradeType, final Channel channel, final MetaChannel metaChannel) { ChannelPipeline pipeline = channel.pipeline(); - try { + if (metaChannel.tcpChannel == channel) { + // shared encoder + switch (upgradeType) { + case (UpgradeType.NONE) : + pipeline.replace(TCP_ENCODE, TCP_ENCODE_NONE, this.registrationWrapper.kryoTcpEncoderNone); + break; + + case (UpgradeType.COMPRESS) : + pipeline.replace(TCP_ENCODE, TCP_ENCODE_COMPRESS, this.registrationWrapper.kryoTcpEncoderCompression); + break; + + case (UpgradeType.ENCRYPT) : + pipeline.replace(TCP_ENCODE, TCP_ENCODE_CRYPTO, this.registrationWrapper.kryoTcpEncoderCrypto); + break; + default: + throw new IllegalArgumentException("Unable to upgrade TCP connection pipeline for type: " + upgradeType); + } + } + else if (metaChannel.udpChannel == channel) { + // shared encoder + switch (upgradeType) { + case (UpgradeType.NONE) : + pipeline.replace(UDP_ENCODE, UDP_ENCODE_NONE, this.registrationWrapper.kryoUdpEncoderNone); + break; + + case (UpgradeType.COMPRESS) : + pipeline.replace(UDP_ENCODE, UDP_ENCODE_COMPRESS, this.registrationWrapper.kryoUdpEncoderCompression); + break; + + case (UpgradeType.ENCRYPT) : + pipeline.replace(UDP_ENCODE, UDP_ENCODE_CRYPTO, this.registrationWrapper.kryoUdpEncoderCrypto); + break; + default: + throw new IllegalArgumentException("Unable to upgrade UDP connection pipeline for type: " + upgradeType); + } + } + } + + final + void logChannelUpgrade(final int upgradeType, final Channel channel, final MetaChannel metaChannel) { + if (this.logger.isInfoEnabled()) { + final String channelType; + if (metaChannel.tcpChannel == channel) { - pipeline.replace(FRAME_AND_KRYO_ENCODER, - FRAME_AND_KRYO_CRYPTO_ENCODER, - registrationWrapper.kryoTcpEncoderCrypto); // this is shared - } + // cannot be shared because of possible fragmentation. + switch (upgradeType) { + case (UpgradeType.NONE) : + channelType = "TCP simple"; + break; - if (metaChannel.udpChannel == channel) { - pipeline.replace(KRYO_ENCODER, KRYO_CRYPTO_ENCODER, registrationWrapper.kryoUdpEncoderCrypto); // shared encoder - } - } catch (Exception e) { - logger.error("Error during connection pipeline upgrade", e); - } - } + case (UpgradeType.COMPRESS) : + channelType = "TCP compression"; + break; - /** - * upgrades a channel ONE channel at a time - */ - final - void upgradePipeline(final MetaChannel metaChannel, final InetSocketAddress remoteAddress) { - logger.trace("Upgrading pipeline"); - try { - if (metaChannel.udpChannel != null) { - if (metaChannel.tcpChannel == null) { - // TODO: UDP (and TCP??) idle timeout (also, to close UDP session when TCP shuts down) - // this means that we are ONLY UDP, and we should have an idle timeout that CLOSES the session after a while. - // Naturally, one would want the "normal" idle to trigger first, but there always be a heartbeat if the idle trigger DOES NOT - // send data on the network, to make sure that the UDP-only session stays alive or disconnects. - - // If the server disconnects, the client has to be made aware of this when it tries to send data again (it must go through - // it's entire reconnect protocol) + case (UpgradeType.ENCRYPT) : + channelType = "TCP encrypted"; + break; + default: + this.logger.error("Unable to upgrade TCP connection pipeline for type: " + upgradeType); + return; } } + else if (metaChannel.udpChannel == channel) { + // shared decoder + switch (upgradeType) { + case (UpgradeType.NONE) : + channelType = "UDP simple"; + break; - // add the "connected"/"normal" handler now that we have established a "new" connection. - // This will have state, etc. for this connection. THIS MUST BE 100% TCP/UDP created, otherwise it will break connections! - ConnectionImpl connection = this.registrationWrapper.connection0(metaChannel, remoteAddress); - metaChannel.connection = new ConnectionRegistrationImpl(connection); + case (UpgradeType.COMPRESS) : + channelType = "UDP compression"; + break; - // Now setup our meta-channel to migrate to the correct connection handler for all regular data. - - if (metaChannel.tcpChannel != null) { - final ChannelPipeline pipeline = metaChannel.tcpChannel.pipeline(); - pipeline.addLast(CONNECTION_HANDLER, metaChannel.connection); - } - - if (metaChannel.udpChannel != null) { - final ChannelPipeline pipeline = metaChannel.udpChannel.pipeline(); - pipeline.addLast(CONNECTION_HANDLER, metaChannel.connection); - } - - - if (this.logger.isInfoEnabled()) { - StringBuilder stringBuilder = new StringBuilder(96); - - if (metaChannel.tcpChannel != null) { - stringBuilder.append("Encrypted TCP connection ["); - EndPoint.getHostDetails(stringBuilder, metaChannel.tcpChannel.localAddress()); - - stringBuilder.append(getConnectionDirection()); - EndPoint.getHostDetails(stringBuilder, metaChannel.tcpChannel.remoteAddress()); - stringBuilder.append("]"); + case (UpgradeType.ENCRYPT) : + channelType = "UDP encrypted"; + break; + default: + this.logger.error("Unable to upgrade UDP connection pipeline for type: " + upgradeType); + return; } - - if (metaChannel.udpChannel != null) { - stringBuilder.append("Encrypted UDP connection ["); - EndPoint.getHostDetails(stringBuilder, metaChannel.udpChannel.localAddress()); - - stringBuilder.append(getConnectionDirection()); - EndPoint.getHostDetails(stringBuilder, metaChannel.udpChannel.remoteAddress()); - stringBuilder.append("]"); - } - - this.logger.info(stringBuilder.toString()); } - } catch (Exception e) { - logger.error("Error during connection pipeline upgrade", e); + else { + this.logger.error("Unable to upgrade unknown connection pipeline for type: " + upgradeType); + return; + } + + + StringBuilder stringBuilder = new StringBuilder(96); + + stringBuilder.append(channelType); + stringBuilder.append(" connection ["); + + EndPoint.getHostDetails(stringBuilder, channel.localAddress()); + + stringBuilder.append(getConnectionDirection()); + EndPoint.getHostDetails(stringBuilder, channel.remoteAddress()); + + stringBuilder.append("]"); + + this.logger.info(stringBuilder.toString()); } } final - void cleanupPipeline(final MetaChannel metaChannel, final Runnable onConnectFinishRunnable) { + void cleanupPipeline(final Channel channel, final MetaChannel metaChannel, final Runnable preConnectRunnable, final Runnable postConnectRunnable) { final int idleTimeout = this.registrationWrapper.getIdleTimeout(); try { - // REMOVE our channel wrapper (only used for encryption) with the actual connection - ChannelHandler handler = metaChannel.connection = ((ConnectionRegistrationImpl) metaChannel.connection).connection; - - Channel channel; - if (metaChannel.tcpChannel != null) { - channel = metaChannel.tcpChannel; - } else { - channel = metaChannel.udpChannel; - } - // channel should NEVER == null! (we will always have TCP or UDP!) // we also ONLY want to add this to a single cleanup, NOT BOTH, because this must only run once!! final ChannelPromise channelPromise = channel.newPromise(); @@ -331,33 +384,29 @@ class RegistrationRemoteHandler extends RegistrationHandler { @Override public void operationComplete(final Future future) throws Exception { - EventLoop loop = channelPromise.channel() - .eventLoop(); - loop.execute(new Runnable() { - @Override - public - void run() { - logger.trace("Notify Connection"); + // this runs on the channel's event loop + logger.trace("Connection connected"); - // safe cast, because it's always this way... - registrationWrapper.connectionConnected0((ConnectionImpl) metaChannel.connection); + if (preConnectRunnable != null) { + preConnectRunnable.run(); + } - // run things AFTER the onConnect() method is called... - // CLIENT - runs the deferred 'onMessage' events in the connection as needed - // SERVER - send 'onConnect' bounce back message to client - onConnectFinishRunnable.run(); - } - }); + // safe cast, because it's always this way... + registrationWrapper.connectionConnected0(metaChannel.connection); + + if (postConnectRunnable != null) { + postConnectRunnable.run(); + } } }); if (metaChannel.tcpChannel != null) { - cleanupPipeline0(idleTimeout, handler, metaChannel.tcpChannel, channelPromise, true); + cleanupPipeline0(idleTimeout, metaChannel.tcpChannel, channelPromise); } if (metaChannel.udpChannel != null) { - cleanupPipeline0(idleTimeout, handler, metaChannel.udpChannel, channelPromise, false); + cleanupPipeline0(idleTimeout, metaChannel.udpChannel, channelPromise); } } catch (Exception e) { logger.error("Error during pipeline replace", e); @@ -365,34 +414,9 @@ class RegistrationRemoteHandler extends RegistrationHandler { } private - void cleanupPipeline0(final int idleTimeout, - final ChannelHandler connection, - final Channel channel, - final ChannelPromise channelPromise, - final boolean isTcp) { + void cleanupPipeline0(final int idleTimeout, final Channel channel, final ChannelPromise channelPromise) { final ChannelPipeline pipeline = channel.pipeline(); - // have to explicitly remove handlers based on the input type. Cannot use this.getClass().... - boolean isClient = registrationWrapper.isClient(); - if (isClient) { - if (isTcp) { - pipeline.remove(RegistrationRemoteHandlerClientTCP.class); - } - else { - pipeline.remove(RegistrationRemoteHandlerClientUDP.class); - } - } - else { - if (isTcp) { - pipeline.remove(RegistrationRemoteHandlerServerTCP.class); - } - else { - pipeline.remove(RegistrationRemoteHandlerServerUDP.class); - } - } - - pipeline.remove(ConnectionRegistrationImpl.class); - if (idleTimeout > 0) { pipeline.replace(IDLE_HANDLER, IDLE_HANDLER_FULL, new IdleStateHandler(0, 0, idleTimeout, TimeUnit.MILLISECONDS)); } @@ -400,8 +424,6 @@ class RegistrationRemoteHandler extends RegistrationHandler { pipeline.remove(IDLE_HANDLER); } - pipeline.addLast(CONNECTION_HANDLER, connection); - // we also DEREGISTER from the HANDSHAKE event-loop and run on the worker event-loop! ChannelFuture future = channel.deregister(); future.addListener(new GenericFutureListener>() { @@ -409,13 +431,13 @@ class RegistrationRemoteHandler extends RegistrationHandler { public void operationComplete(final Future f) throws Exception { if (f.isSuccess()) { - // TCP and UDP register on DIFFERENT event loops. The channel promise ONLY runs on 1 of them... + workerEventLoop.register(channel); + + // TCP and UDP register on DIFFERENT event loops. This channel promise ONLY runs on the same worker as the + // channel that called `cleanupPipeline` if (channelPromise.channel() == channel) { workerEventLoop.register(channelPromise); } - else { - workerEventLoop.register(channel); - } } } }); diff --git a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClient.java b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClient.java index c0263e27..27f1a5d0 100644 --- a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClient.java +++ b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClient.java @@ -40,6 +40,7 @@ import dorkbox.util.crypto.CryptoECC; import dorkbox.util.exceptions.SecurityException; import dorkbox.util.serialization.EccPublicKeySerializer; import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; import io.netty.channel.EventLoopGroup; public @@ -96,7 +97,7 @@ class RegistrationRemoteHandlerClient extends RegistrationRemoteHandler { @SuppressWarnings("Duplicates") - void readClient(final Channel channel, final Registration registration, final String type, final MetaChannel metaChannel) { + void readClient(final ChannelHandlerContext context, final Channel channel, final Registration registration, final String type, final MetaChannel metaChannel) { final InetSocketAddress remoteAddress = (InetSocketAddress) channel.remoteAddress(); // IN: session ID + public key + ecc parameters (which are a nonce. the SERVER defines what these are) @@ -170,6 +171,10 @@ class RegistrationRemoteHandlerClient extends RegistrationRemoteHandler { // do we have any more registrations? outboundRegister.hasMore = registrationWrapper.hasMoreRegistrations(); + if (outboundRegister.hasMore) { + metaChannel.totalProtocols.incrementAndGet(); + } + channel.writeAndFlush(outboundRegister); // wait for ack from the server before registering the next protocol @@ -177,18 +182,19 @@ class RegistrationRemoteHandlerClient extends RegistrationRemoteHandler { } - // IN: upgrade=true if we must upgrade this connection - if (registration.upgrade) { - // this pipeline can now be marked to be upgraded + // IN: upgrade>0 if we must upgrade this connection + // can this pipeline can now be upgraded + int upgradeType = registration.upgradeType; + if (upgradeType > 0) { + // upgrade the connection to an none/compression/encrypted connection + upgradeEncoders(upgradeType, channel, metaChannel); + upgradeDecoders(upgradeType, channel, metaChannel); - // upgrade the connection to an encrypted connection - // this pipeline encoder/decoder can now be upgraded, and the "connection" added - upgradeEncoders(channel, metaChannel, remoteAddress); - upgradeDecoders(channel, metaChannel); + logChannelUpgrade(upgradeType, channel, metaChannel); } // IN: hasMore=true if we have more registrations to do, false otherwise - if (registration.hasMore) { + if (registrationWrapper.hasMoreRegistrations()) { logger.trace("Starting another protocol registration"); metaChannel.totalProtocols.incrementAndGet(); registrationWrapper.startNextProtocolRegistration(); @@ -198,45 +204,75 @@ class RegistrationRemoteHandlerClient extends RegistrationRemoteHandler { // // - // we only get this when we are 100% done with the registration of all connection types. + // we only get this when we are 100% done with the registration and upgrade of all connection types. // + // THIS is the last channel registered! // + + // we don't verify anything on the CLIENT. We only verify on the server. + // we don't support registering NEW classes after the client starts. + + // this will perform channel WRITE on whatever channel is the last channel registered! if (!registration.upgraded) { - // setup the pipeline with the real connection - upgradePipeline(metaChannel, remoteAddress); + // this only get's called once. Ever other time the server talks to the client now, "upgraded" will be true. - // we don't verify anything on the CLIENT. We only verify on the server. - // we don't support registering NEW classes after the client starts. - if (!registrationWrapper.initClassRegistration(channel, registration)) { + // this can ONLY be created when all protocols are registered! + metaChannel.connection = this.registrationWrapper.connection0(metaChannel, remoteAddress); + + if (metaChannel.tcpChannel != null) { + metaChannel.tcpChannel.pipeline().addLast(CONNECTION_HANDLER, metaChannel.connection); + } + if (metaChannel.udpChannel != null) { + metaChannel.udpChannel.pipeline().addLast(CONNECTION_HANDLER, metaChannel.connection); + } + + + // this tells the server we are now "upgraded" and can continue + boolean hasErrors = !registrationWrapper.initClassRegistration(channel, registration); + if (hasErrors) { // abort if something messed up! shutdown(channel, registration.sessionID); } + // the server will ping back when it is ready to connect return; } - // IN: upgraded=true this means we are ready to connect, and the server is done with it's onConnect calls - // we defer the messages until after our own onConnect() is called... + // NOTE: The server will ALWAYS call "onConnect" before we do! + // it does this via sending the client the "upgraded" signal JUST before it calls "onConnect" - // we have to wait for ALL messages to be received, this way we can prevent out-of-order oddities... - int protocolsRemaining = metaChannel.totalProtocols.decrementAndGet(); - if (protocolsRemaining > 0) { - logger.trace("{} done. Waiting for {} more protocols registrations to arrive...", type, protocolsRemaining); + // It will upgrade the different connections INDIVIDUALLY, and whichever one arrives first will start the process + // and the "late" message will be ignored + if (!metaChannel.canUpgradePipeline.compareAndSet(true, false)) { return; } + // remove ourselves from handling any more messages, because we are done. + + // since only the FIRST registration gets here, setup other ones as well (since they are no longer needed) + if (metaChannel.tcpChannel != null) { + // the "other" channel is the TCP channel that we have to cleanup + metaChannel.tcpChannel.pipeline().remove(RegistrationRemoteHandlerClientTCP.class); + } + + if (metaChannel.udpChannel != null) { + // the "other" channel is the UDP channel that we have to cleanup + metaChannel.udpChannel.pipeline().remove(RegistrationRemoteHandlerClientUDP.class); + } + + // remove the ConnectionWrapper (that was used to upgrade the connection) and cleanup the pipeline // always wait until AFTER the server calls "onConnect", then we do this - cleanupPipeline(metaChannel, new Runnable() { + Runnable postConnectRunnable = new Runnable() { @Override public void run() { - // this method runs after the "onConnect()" runs and only after all of the channels have be correctly updated + // this method runs after all of the channels have be correctly updated // get all of the out of order messages that we missed List messages = new LinkedList(); @@ -259,7 +295,7 @@ class RegistrationRemoteHandlerClient extends RegistrationRemoteHandler { // now call 'onMessage' in the connection object with our messages! try { - ConnectionImpl connection = (ConnectionImpl) metaChannel.connection; + ConnectionImpl connection = metaChannel.connection; for (Object message : messages) { logger.trace(" deferred onMessage({}, {})", connection.id(), message); @@ -274,6 +310,8 @@ class RegistrationRemoteHandlerClient extends RegistrationRemoteHandler { logger.error("Error initialising deferred messages!", e); } } - }); + }; + + cleanupPipeline(channel, metaChannel, null, postConnectRunnable); } } diff --git a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientTCP.java b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientTCP.java index ed631e81..2d5c4f5f 100644 --- a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientTCP.java +++ b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientTCP.java @@ -81,7 +81,7 @@ class RegistrationRemoteHandlerClientTCP extends RegistrationRemoteHandlerClient } logger.trace("TCP read"); - readClient(channel, registration, "TCP client", metaChannel); + readClient(context, channel, registration, "TCP client", metaChannel); } else { logger.trace("Out of order TCP message from server!"); diff --git a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientUDP.java b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientUDP.java index fa67196a..294210b8 100644 --- a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientUDP.java +++ b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientUDP.java @@ -103,7 +103,7 @@ class RegistrationRemoteHandlerClientUDP extends RegistrationRemoteHandlerClient metaChannel.udpChannel = channel; } - readClient(channel, registration, "UDP client", metaChannel); + readClient(context, channel, registration, "UDP client", metaChannel); } else { logger.trace("Out of order UDP message from server!"); diff --git a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServer.java b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServer.java index e1eeb172..c462cf1e 100644 --- a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServer.java +++ b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServer.java @@ -42,6 +42,7 @@ import dorkbox.network.connection.registration.Registration; import dorkbox.util.crypto.CryptoECC; import dorkbox.util.serialization.EccPublicKeySerializer; import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.EventLoopGroup; @@ -175,32 +176,51 @@ class RegistrationRemoteHandlerServer extends RegistrationRemoteHandler { // NOTE: if we have more registrations, we will "bounce back" that status so the client knows what to do. // IN: hasMore=true if we have more registrations to do, false otherwise - // ALWAYS upgrade the connection at this point. + // Some cases we want to SKIP encryption (ie, loopback or specific IP/CIDR addresses) + // OTHERWISE ALWAYS upgrade the connection at this point. // IN: upgraded=false if we haven't upgraded to encryption yet (this will always be the case right after encryption is setup) if (!registration.upgraded) { - // upgrade the connection to an encrypted connection - registration.upgrade = true; - upgradeDecoders(channel, metaChannel); + // this is the last protocol registered + if (!registration.hasMore) { + // this can ONLY be created when all protocols are registered! + // this must happen before we verify class registrations. + metaChannel.connection = this.registrationWrapper.connection0(metaChannel, remoteAddress); + + if (metaChannel.tcpChannel != null) { + metaChannel.tcpChannel.pipeline().addLast(CONNECTION_HANDLER, metaChannel.connection); + } + if (metaChannel.udpChannel != null) { + metaChannel.udpChannel.pipeline().addLast(CONNECTION_HANDLER, metaChannel.connection); + } + } + + // If we are loopback or the client is a specific IP/CIDR address, then we do things differently. The LOOPBACK address will never encrypt or compress the traffic. + byte upgradeType = registrationWrapper.getConnectionUpgradeType(remoteAddress); + registration.upgradeType = upgradeType; + + // upgrade the connection to a none/compression/encrypted connection + upgradeDecoders(upgradeType, channel, metaChannel); // bounce back to the client so it knows we received it channel.write(registration); - // this pipeline encoder/decoder can now be upgraded, and the "connection" added - upgradeEncoders(channel, metaChannel, remoteAddress); + upgradeEncoders(upgradeType, channel, metaChannel); - if (!registration.hasMore) { - // we only get this when we are 100% done with the registration of all connection types. - - // setup the pipeline with the real connection - upgradePipeline(metaChannel, remoteAddress); - } + logChannelUpgrade(upgradeType, channel, metaChannel); channel.flush(); return; } - // the client will send their class registration data. VERIFY IT IS CORRECT! + // + // + // we only get this when we are 100% done with encrypting/etc the connections + // + // + + + // upgraded=true when the client will send their class registration data. VERIFY IT IS CORRECT! STATE state = registrationWrapper.verifyClassRegistration(metaChannel, registration); if (state == STATE.ERROR) { // abort! There was an error @@ -216,34 +236,57 @@ class RegistrationRemoteHandlerServer extends RegistrationRemoteHandler { // // - // we only get this when we are 100% done with the registration of all connection types. - // the context is the LAST protocol to be registered + // we only get this when we are 100% done with validation of class registrations. The last protocol to register gets us here. // // + + // remove ourselves from handling any more messages, because we are done. + ChannelHandler handler = context.handler(); + channel.pipeline().remove(handler); + + + // since only the LAST registration gets here, setup other ones as well (since they are no longer needed) + if (channel == metaChannel.tcpChannel && metaChannel.udpChannel != null) { + // the "other" channel is the UDP channel that we have to cleanup + metaChannel.udpChannel.pipeline().remove(RegistrationRemoteHandlerServerUDP.class); + } + else if (channel == metaChannel.udpChannel && metaChannel.tcpChannel != null) { + // the "other" channel is the TCP channel that we have to cleanup + metaChannel.tcpChannel.pipeline().remove(RegistrationRemoteHandlerServerTCP.class); + } + + + // remove the ConnectionWrapper (that was used to upgrade the connection) and cleanup the pipeline - cleanupPipeline(metaChannel, new Runnable() { + Runnable preConnectRunnable = new Runnable() { @Override public void run() { - // this method runs after the "onConnect()" runs and only after all of the channels have be correctly updated + // this method BEFORE the "onConnect()" runs and only after all of the channels have be correctly updated - // this tells the client we are ready to connect (we just bounce back the original message over ALL protocols) + // this tells the client we are ready to connect (we just bounce back "upgraded" over TCP, preferably). + // only the FIRST one to arrive at the client will actually setup the pipeline + Registration reg = new Registration(registration.sessionID); + reg.upgraded = true; + + // there is a risk of UDP losing the packet, so if we can send via TCP, then we do so. if (metaChannel.tcpChannel != null) { logger.trace("Sending TCP upgraded command"); - Registration reg = new Registration(registration.sessionID); - reg.upgraded = true; metaChannel.tcpChannel.writeAndFlush(reg); } - - if (metaChannel.udpChannel != null) { + else if (metaChannel.udpChannel != null) { logger.trace("Sending UDP upgraded command"); - Registration reg = new Registration(registration.sessionID); - reg.upgraded = true; metaChannel.udpChannel.writeAndFlush(reg); } + else { + logger.error("This shouldn't happen!"); + + } } - }); + }; + + cleanupPipeline(channel, metaChannel, preConnectRunnable, null); } }