From 80ab10df010439876af2cf589f2329a0b3416102 Mon Sep 17 00:00:00 2001 From: nathan Date: Sat, 2 Apr 2016 17:00:40 +0200 Subject: [PATCH] Moved channel registration into registration wrapper --- src/dorkbox/network/connection/EndPoint.java | 22 +- .../connection/RegistrationWrapper.java | 305 +++++++++++++++++- .../connection/registration/Registration.java | 10 +- .../registration/RegistrationHandler.java | 21 +- .../local/RegistrationLocalHandler.java | 31 +- .../local/RegistrationLocalHandlerClient.java | 9 +- .../local/RegistrationLocalHandlerServer.java | 12 +- .../remote/RegistrationRemoteHandler.java | 56 +--- .../RegistrationRemoteHandlerClientTCP.java | 23 +- .../RegistrationRemoteHandlerClientUDP.java | 49 +-- .../RegistrationRemoteHandlerClientUDT.java | 44 +-- .../RegistrationRemoteHandlerServerTCP.java | 32 +- .../RegistrationRemoteHandlerServerUDP.java | 67 +--- .../RegistrationRemoteHandlerServerUDT.java | 45 +-- 14 files changed, 349 insertions(+), 377 deletions(-) diff --git a/src/dorkbox/network/connection/EndPoint.java b/src/dorkbox/network/connection/EndPoint.java index 760d6eea..84a2bc94 100644 --- a/src/dorkbox/network/connection/EndPoint.java +++ b/src/dorkbox/network/connection/EndPoint.java @@ -30,8 +30,6 @@ import dorkbox.network.util.store.NullSettingsStore; import dorkbox.network.util.store.SettingsStore; import dorkbox.util.OS; import dorkbox.util.Property; -import dorkbox.util.collections.IntMap; -import dorkbox.util.collections.IntMap.Entries; import dorkbox.util.crypto.CryptoECC; import dorkbox.util.entropy.Entropy; import dorkbox.util.exceptions.InitializationException; @@ -98,8 +96,7 @@ class EndPoint { public static int DEFAULT_THREAD_POOL_SIZE = Runtime.getRuntime() .availableProcessors() * 2; /** - * The amount of time in milli-seconds to wait for this endpoint to close all - * {@link Channel}s and shutdown gracefully. + * The amount of time in milli-seconds to wait for this endpoint to close all {@link Channel}s and shutdown gracefully. */ @Property public static long maxShutdownWaitTimeInMilliSeconds = 2000L; // in milliseconds @@ -558,22 +555,7 @@ class EndPoint { this.connectionManager.closeConnections(); // Sometimes there might be "lingering" connections (ie, halfway though registration) that need to be closed. - long maxShutdownWaitTimeInMilliSeconds = EndPoint.maxShutdownWaitTimeInMilliSeconds; - RegistrationWrapper registrationWrapper2 = this.registrationWrapper; - try { - IntMap channelMap = registrationWrapper2.getAndLockChannelMap(); - Entries entries = channelMap.entries(); - while (entries.hasNext()) { - MetaChannel metaChannel = entries.next().value; - metaChannel.close(maxShutdownWaitTimeInMilliSeconds); - Thread.yield(); - } - - channelMap.clear(); - - } finally { - registrationWrapper2.releaseChannelMap(); - } + this.registrationWrapper.closeChannels(maxShutdownWaitTimeInMilliSeconds); this.isConnected.set(false); } diff --git a/src/dorkbox/network/connection/RegistrationWrapper.java b/src/dorkbox/network/connection/RegistrationWrapper.java index b3e76aa3..dc69d4d9 100644 --- a/src/dorkbox/network/connection/RegistrationWrapper.java +++ b/src/dorkbox/network/connection/RegistrationWrapper.java @@ -15,12 +15,17 @@ */ package dorkbox.network.connection; +import com.esotericsoftware.kryo.util.ObjectMap; import dorkbox.network.connection.registration.MetaChannel; +import dorkbox.network.connection.registration.remote.RegistrationRemoteHandler; import dorkbox.network.pipeline.KryoEncoder; import dorkbox.network.pipeline.KryoEncoderCrypto; +import dorkbox.util.MathUtil; import dorkbox.util.collections.IntMap; import dorkbox.util.crypto.CryptoECC; import dorkbox.util.exceptions.SecurityException; +import io.netty.channel.Channel; +import io.netty.channel.ChannelPipeline; import org.bouncycastle.crypto.CipherParameters; import org.bouncycastle.crypto.params.ECPublicKeyParameters; import org.slf4j.Logger; @@ -29,14 +34,15 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.security.SecureRandom; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.locks.ReentrantLock; +import static dorkbox.network.connection.registration.remote.RegistrationRemoteHandler.checkEqual; + /** * Just wraps common/needed methods of the client/server endpoint by the registration stage/handshake. *

- * This is in the connection package, so it can access the endpoint methods that it needs to. + * This is in the connection package, so it can access the endpoint methods that it needs to (without having to publicly expose them) */ public class RegistrationWrapper implements UdpServer { @@ -52,8 +58,20 @@ class RegistrationWrapper implements UdpServer { private final IntMap channelMap = new IntMap(); // keeps track of connections (UDP-server) - // this is final, because the REFERENCE to these will never change. They ARE NOT immutable objects (meaning their content can change) - private final ConcurrentMap udpRemoteMap; + @SuppressWarnings({"FieldCanBeLocal", "unused"}) + private volatile ObjectMap udpRemoteMap; + + + // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this + // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our + // use-case 99% of the time) + private final Object singleWriterLock1 = new Object(); + + // Recommended for best performance while adhering to the "single writer principle". Must be static-final + private static final AtomicReferenceFieldUpdater udpRemoteMapREF = + AtomicReferenceFieldUpdater.newUpdater(RegistrationWrapper.class, + ObjectMap.class, + "udpRemoteMap"); public @@ -67,7 +85,7 @@ class RegistrationWrapper implements UdpServer { this.kryoEncoderCrypto = kryoEncoderCrypto; if (endPoint instanceof EndPointServer) { - this.udpRemoteMap = new ConcurrentHashMap(); + this.udpRemoteMap = new ObjectMap(32, ConnectionManager.LOAD_FACTOR); } else { this.udpRemoteMap = null; @@ -97,14 +115,14 @@ class RegistrationWrapper implements UdpServer { *

* Make SURE to use this in a try/finally block with releaseChannelMap in the finally block! */ - public + private IntMap getAndLockChannelMap() { // try to lock access, also guarantees that the contents of this map are visible across threads this.channelMapLock.lock(); return this.channelMap; } - public + private void releaseChannelMap() { // try to unlock access this.channelMapLock.unlock(); @@ -235,7 +253,18 @@ class RegistrationWrapper implements UdpServer { public final void registerServerUDP(final MetaChannel metaChannel) { if (metaChannel != null && metaChannel.udpRemoteAddress != null) { - this.udpRemoteMap.put(metaChannel.udpRemoteAddress, metaChannel.connection); + // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this + // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our + // use-case 99% of the time) + synchronized (singleWriterLock1) { + // access a snapshot of the connections (single-writer-principle) + final ObjectMap udpRemoteMap = udpRemoteMapREF.get(this); + + udpRemoteMap.put(metaChannel.udpRemoteAddress, metaChannel.connection); + + // save this snapshot back to the original (single writer principle) + udpRemoteMapREF.lazySet(this, udpRemoteMap); + } this.logger.info("Connected to remote UDP connection. [{} <== {}]", metaChannel.udpChannel.localAddress(), @@ -250,7 +279,19 @@ class RegistrationWrapper implements UdpServer { public final void unRegisterServerUDP(final InetSocketAddress udpRemoteAddress) { if (udpRemoteAddress != null) { - this.udpRemoteMap.remove(udpRemoteAddress); + // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this + // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our + // use-case 99% of the time) + synchronized (singleWriterLock1) { + // access a snapshot of the connections (single-writer-principle) + final ObjectMap udpRemoteMap = udpRemoteMapREF.get(this); + + udpRemoteMap.remove(udpRemoteAddress); + + // save this snapshot back to the original (single writer principle) + udpRemoteMapREF.lazySet(this, udpRemoteMap); + } + logger.info("Closed remote UDP connection: {}", udpRemoteAddress); } } @@ -262,7 +303,9 @@ class RegistrationWrapper implements UdpServer { public ConnectionImpl getServerUDP(final InetSocketAddress udpRemoteAddress) { if (udpRemoteAddress != null) { - return this.udpRemoteMap.get(udpRemoteAddress); + // access a snapshot of the connections (single-writer-principle) + final ObjectMap udpRemoteMap = udpRemoteMapREF.get(this); + return udpRemoteMap.get(udpRemoteAddress); } else { return null; @@ -275,4 +318,244 @@ class RegistrationWrapper implements UdpServer { ((EndPointClient) this.endPoint).abortRegistration(); } } + + public + void addChannel(final int channelHashCodeOrId, final MetaChannel metaChannel) { + try { + IntMap channelMap = this.getAndLockChannelMap(); + channelMap.put(channelHashCodeOrId, metaChannel); + } finally { + this.releaseChannelMap(); + } + + } + + public + MetaChannel removeChannel(final int channelHashCodeOrId) { + try { + IntMap channelMap = getAndLockChannelMap(); + return channelMap.remove(channelHashCodeOrId); + } finally { + releaseChannelMap(); + } + } + + public + MetaChannel getChannel(final int channelHashCodeOrId) { + try { + IntMap channelMap = getAndLockChannelMap(); + return channelMap.get(channelHashCodeOrId); + } finally { + releaseChannelMap(); + } + } + + /** + * Closes all connections ONLY (keeps the server/client running). + * + * @param maxShutdownWaitTimeInMilliSeconds + * The amount of time in milli-seconds to wait for this endpoint to close all {@link Channel}s and shutdown gracefully. + */ + public + void closeChannels(final long maxShutdownWaitTimeInMilliSeconds) { + try { + IntMap channelMap = getAndLockChannelMap(); + IntMap.Entries entries = channelMap.entries(); + while (entries.hasNext()) { + MetaChannel metaChannel = entries.next().value; + metaChannel.close(maxShutdownWaitTimeInMilliSeconds); + Thread.yield(); + } + + channelMap.clear(); + + } finally { + releaseChannelMap(); + } + } + + /** + * Closes the specified connections ONLY (keeps the server/client running). + * + * @param maxShutdownWaitTimeInMilliSeconds + * The amount of time in milli-seconds to wait for this endpoint to close all {@link Channel}s and shutdown gracefully. + */ + public + MetaChannel closeChannel(final Channel channel, final long maxShutdownWaitTimeInMilliSeconds) { + try { + IntMap channelMap = getAndLockChannelMap(); + IntMap.Entries entries = channelMap.entries(); + while (entries.hasNext()) { + MetaChannel metaChannel = entries.next().value; + + if (metaChannel.localChannel == channel || + metaChannel.tcpChannel == channel || + metaChannel.udpChannel == channel || + metaChannel.udtChannel == channel) { + + entries.remove(); + metaChannel.close(maxShutdownWaitTimeInMilliSeconds); + return metaChannel; + } + } + } finally { + releaseChannelMap(); + } + + return null; + } + + /** + * now that we are CONNECTED, we want to remove ourselves (and channel ID's) from the map. + * they will be ADDED in another map, in the followup handler!! + */ + public + boolean setupChannels(final RegistrationRemoteHandler handler, final MetaChannel metaChannel) { + boolean registerServer = false; + + try { + IntMap channelMap = getAndLockChannelMap(); + + channelMap.remove(metaChannel.tcpChannel.hashCode()); + channelMap.remove(metaChannel.connectionID); + + + ChannelPipeline pipeline = metaChannel.tcpChannel.pipeline(); + // The TCP channel is what calls this method, so we can use "this" for TCP, and the others are handled during the registration process + pipeline.remove(handler); + + if (metaChannel.udpChannel != null) { + // the setup is different between CLIENT / SERVER + if (metaChannel.udpRemoteAddress == null) { + // CLIENT RUNS THIS + // don't want to muck with the SERVER udp pipeline, as it NEVER CHANGES. + // More specifically, the UDP SERVER doesn't use a channelMap, it uses the udpRemoteMap + // to keep track of UDP connections. This is very different than how the client works + // only the client will have the udp remote address + channelMap.remove(metaChannel.udpChannel.hashCode()); + } + else { + // SERVER RUNS THIS + // don't ALWAYS have UDP on SERVER... + registerServer = true; + } + } + } finally { + releaseChannelMap(); + } + + return registerServer; + } + + public + Integer initializeChannel(final MetaChannel metaChannel) { + Integer connectionID = MathUtil.randomInt(); + try { + IntMap channelMap = getAndLockChannelMap(); + while (channelMap.containsKey(connectionID)) { + connectionID = MathUtil.randomInt(); + } + + metaChannel.connectionID = connectionID; + channelMap.put(connectionID, metaChannel); + + } finally { + releaseChannelMap(); + } + + return connectionID; + } + + public + boolean associateChannels(final Channel channel, final InetAddress remoteAddress, final boolean isUdt) { + boolean success = false; + + try { + IntMap channelMap = getAndLockChannelMap(); + IntMap.Entries entries = channelMap.entries(); + while (entries.hasNext()) { + MetaChannel metaChannel = entries.next().value; + + // associate TCP and UDP! + final InetSocketAddress inetSocketAddress = (InetSocketAddress) metaChannel.tcpChannel.remoteAddress(); + InetAddress tcpRemoteServer = inetSocketAddress.getAddress(); + if (checkEqual(tcpRemoteServer, remoteAddress)) { + channelMap.put(channel.hashCode(), metaChannel); + if (isUdt) { + metaChannel.udtChannel = channel; + } + else { + metaChannel.udpChannel = channel; + } + success = true; + // only allow one server per registration! + break; + } + } + } finally { + releaseChannelMap(); + } + + return success; + } + + public + MetaChannel getAssociatedChannel_UDT(final InetAddress remoteAddress) { + try { + MetaChannel metaChannel; + IntMap channelMap = getAndLockChannelMap(); + IntMap.Entries entries = channelMap.entries(); + + while (entries.hasNext()) { + metaChannel = entries.next().value; + + // only look at connections that do not have UDP already setup. + if (metaChannel.udtChannel == null) { + InetSocketAddress tcpRemote = (InetSocketAddress) metaChannel.tcpChannel.remoteAddress(); + InetAddress tcpRemoteAddress = tcpRemote.getAddress(); + + if (RegistrationRemoteHandler.checkEqual(tcpRemoteAddress, remoteAddress)) { + return metaChannel; + } + else { + return null; + } + } + } + } finally { + releaseChannelMap(); + } + + return null; + } + + public + MetaChannel getAssociatedChannel_UDP(final InetAddress remoteAddress) { + try { + MetaChannel metaChannel; + IntMap channelMap = getAndLockChannelMap(); + IntMap.Entries entries = channelMap.entries(); + + while (entries.hasNext()) { + metaChannel = entries.next().value; + + // only look at connections that do not have UDP already setup. + if (metaChannel.udpChannel == null) { + InetSocketAddress tcpRemote = (InetSocketAddress) metaChannel.tcpChannel.remoteAddress(); + InetAddress tcpRemoteAddress = tcpRemote.getAddress(); + + if (RegistrationRemoteHandler.checkEqual(tcpRemoteAddress, remoteAddress)) { + return metaChannel; + } + else { + return null; + } + } + } + } finally { + releaseChannelMap(); + } + + return null; + } } diff --git a/src/dorkbox/network/connection/registration/Registration.java b/src/dorkbox/network/connection/registration/Registration.java index 0cedb9c9..3f22c14e 100644 --- a/src/dorkbox/network/connection/registration/Registration.java +++ b/src/dorkbox/network/connection/registration/Registration.java @@ -18,12 +18,18 @@ package dorkbox.network.connection.registration; import org.bouncycastle.crypto.params.ECPublicKeyParameters; import org.bouncycastle.crypto.params.IESParameters; - /** - * Internal message to handle the TCP/UDP registration process + * Internal message to handle the TCP/UDP/UDT registration process */ public class Registration { + public static final byte notAdroid = (byte) 0; + public static final byte android = (byte) 1; + + + // signals which serialization is possible. If they match, then UNSAFE can be used (except android. it always must use ASM) + public byte connectionType; + public ECPublicKeyParameters publicKey; public IESParameters eccParameters; public byte[] aesKey; diff --git a/src/dorkbox/network/connection/registration/RegistrationHandler.java b/src/dorkbox/network/connection/registration/RegistrationHandler.java index ec43eaf1..29811968 100644 --- a/src/dorkbox/network/connection/registration/RegistrationHandler.java +++ b/src/dorkbox/network/connection/registration/RegistrationHandler.java @@ -16,9 +16,8 @@ package dorkbox.network.connection.registration; import dorkbox.network.connection.Connection; +import dorkbox.network.connection.EndPoint; import dorkbox.network.connection.RegistrationWrapper; -import dorkbox.util.collections.IntMap; -import dorkbox.util.collections.IntMap.Entries; import io.netty.channel.Channel; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; @@ -95,22 +94,10 @@ class RegistrationHandler extends ChannelInboundHandlerAda // also, once we notify, we unregister this. if (registrationWrapper != null) { - try { - IntMap channelMap = registrationWrapper.getAndLockChannelMap(); - Entries entries = channelMap.entries(); - while (entries.hasNext()) { - MetaChannel metaChannel = entries.next().value; - if (metaChannel.localChannel == channel || metaChannel.tcpChannel == channel || metaChannel.udpChannel == channel) { - entries.remove(); - metaChannel.close(); - return metaChannel; - } - } + MetaChannel metaChannel = registrationWrapper.closeChannel(channel, EndPoint.maxShutdownWaitTimeInMilliSeconds); + registrationWrapper.abortRegistrationIfClient(); - } finally { - registrationWrapper.releaseChannelMap(); - registrationWrapper.abortRegistrationIfClient(); - } + return metaChannel; } return null; diff --git a/src/dorkbox/network/connection/registration/local/RegistrationLocalHandler.java b/src/dorkbox/network/connection/registration/local/RegistrationLocalHandler.java index 8d591f5d..43c88b47 100644 --- a/src/dorkbox/network/connection/registration/local/RegistrationLocalHandler.java +++ b/src/dorkbox/network/connection/registration/local/RegistrationLocalHandler.java @@ -16,18 +16,17 @@ package dorkbox.network.connection.registration.local; import dorkbox.network.connection.Connection; -import dorkbox.network.connection.EndPoint; import dorkbox.network.connection.RegistrationWrapper; import dorkbox.network.connection.registration.MetaChannel; import dorkbox.network.connection.registration.RegistrationHandler; import dorkbox.network.pipeline.LocalRmiDecoder; import dorkbox.network.pipeline.LocalRmiEncoder; -import dorkbox.util.collections.IntMap; -import dorkbox.util.collections.IntMap.Entries; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import org.slf4j.Logger; +import static dorkbox.network.connection.EndPoint.maxShutdownWaitTimeInMilliSeconds; + public abstract class RegistrationLocalHandler extends RegistrationHandler { protected static final String LOCAL_RMI_ENCODER = "localRmiEncoder"; @@ -50,12 +49,7 @@ class RegistrationLocalHandler extends RegistrationHandler MetaChannel metaChannel = new MetaChannel(); metaChannel.localChannel = channel; - try { - IntMap channelMap = this.registrationWrapper.getAndLockChannelMap(); - channelMap.put(channel.hashCode(), metaChannel); - } finally { - this.registrationWrapper.releaseChannelMap(); - } + this.registrationWrapper.addChannel(channel.hashCode(), metaChannel); Logger logger2 = this.logger; if (logger2.isTraceEnabled()) { @@ -93,25 +87,8 @@ class RegistrationLocalHandler extends RegistrationHandler this.logger.info("Closed LOCAL connection: {}", channel.remoteAddress()); - long maxShutdownWaitTimeInMilliSeconds = EndPoint.maxShutdownWaitTimeInMilliSeconds; - // also, once we notify, we unregister this. - - try { - IntMap channelMap = this.registrationWrapper.getAndLockChannelMap(); - Entries entries = channelMap.entries(); - while (entries.hasNext()) { - MetaChannel metaChannel = entries.next().value; - - if (metaChannel.localChannel == channel) { - metaChannel.close(maxShutdownWaitTimeInMilliSeconds); - entries.remove(); - break; - } - } - } finally { - this.registrationWrapper.releaseChannelMap(); - } + registrationWrapper.closeChannel(channel, maxShutdownWaitTimeInMilliSeconds); super.channelInactive(context); } diff --git a/src/dorkbox/network/connection/registration/local/RegistrationLocalHandlerClient.java b/src/dorkbox/network/connection/registration/local/RegistrationLocalHandlerClient.java index dbe3cdb0..d641f833 100644 --- a/src/dorkbox/network/connection/registration/local/RegistrationLocalHandlerClient.java +++ b/src/dorkbox/network/connection/registration/local/RegistrationLocalHandlerClient.java @@ -20,7 +20,6 @@ import dorkbox.network.connection.ConnectionImpl; import dorkbox.network.connection.RegistrationWrapper; import dorkbox.network.connection.registration.MetaChannel; import dorkbox.network.connection.registration.Registration; -import dorkbox.util.collections.IntMap; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; @@ -67,13 +66,7 @@ class RegistrationLocalHandlerClient extends RegistrationL Channel channel = context.channel(); - MetaChannel metaChannel = null; - try { - IntMap channelMap = registrationWrapper.getAndLockChannelMap(); - metaChannel = channelMap.remove(channel.hashCode()); - } finally { - registrationWrapper.releaseChannelMap(); - } + MetaChannel metaChannel = this.registrationWrapper.removeChannel(channel.hashCode()); // have to setup new listeners if (metaChannel != null) { diff --git a/src/dorkbox/network/connection/registration/local/RegistrationLocalHandlerServer.java b/src/dorkbox/network/connection/registration/local/RegistrationLocalHandlerServer.java index 344f6b07..31748bd7 100644 --- a/src/dorkbox/network/connection/registration/local/RegistrationLocalHandlerServer.java +++ b/src/dorkbox/network/connection/registration/local/RegistrationLocalHandlerServer.java @@ -19,7 +19,6 @@ import dorkbox.network.connection.Connection; import dorkbox.network.connection.ConnectionImpl; import dorkbox.network.connection.RegistrationWrapper; import dorkbox.network.connection.registration.MetaChannel; -import dorkbox.util.collections.IntMap; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; @@ -77,14 +76,9 @@ class RegistrationLocalHandlerServer extends RegistrationL } ConnectionImpl connection = null; - try { - IntMap channelMap = this.registrationWrapper.getAndLockChannelMap(); - MetaChannel metaChannel = channelMap.remove(channel.hashCode()); - if (metaChannel != null) { - connection = metaChannel.connection; - } - } finally { - this.registrationWrapper.releaseChannelMap(); + MetaChannel metaChannel = this.registrationWrapper.removeChannel(channel.hashCode()); + if (metaChannel != null) { + connection = metaChannel.connection; } if (connection != null) { diff --git a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandler.java b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandler.java index a1e664a4..85de466c 100644 --- a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandler.java +++ b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandler.java @@ -18,7 +18,6 @@ package dorkbox.network.connection.registration.remote; import com.esotericsoftware.kryo.io.Input; import dorkbox.network.connection.Connection; import dorkbox.network.connection.ConnectionImpl; -import dorkbox.network.connection.EndPoint; import dorkbox.network.connection.RegistrationWrapper; import dorkbox.network.connection.registration.MetaChannel; import dorkbox.network.connection.registration.RegistrationHandler; @@ -27,8 +26,6 @@ import dorkbox.network.pipeline.KryoDecoderCrypto; import dorkbox.network.pipeline.udp.KryoDecoderUdpCrypto; import dorkbox.network.pipeline.udp.KryoEncoderUdpCrypto; import dorkbox.network.util.CryptoSerializationManager; -import dorkbox.util.collections.IntMap; -import dorkbox.util.collections.IntMap.Entries; import dorkbox.util.crypto.CryptoECC; import dorkbox.util.serialization.EccPublicKeySerializer; import io.netty.channel.Channel; @@ -52,6 +49,8 @@ import java.net.InetSocketAddress; import java.util.Arrays; import java.util.concurrent.TimeUnit; +import static dorkbox.network.connection.EndPoint.maxShutdownWaitTimeInMilliSeconds; + public abstract class RegistrationRemoteHandler extends RegistrationHandler { protected static final String KRYO_ENCODER = "kryoEncoder"; @@ -365,40 +364,9 @@ class RegistrationRemoteHandler extends RegistrationHandle @SuppressWarnings("AutoUnboxing") protected final void setupConnection(MetaChannel metaChannel) { - boolean registerServer = false; - // now that we are CONNECTED, we want to remove ourselves (and channel ID's) from the map. // they will be ADDED in another map, in the followup handler!! - try { - IntMap channelMap = this.registrationWrapper.getAndLockChannelMap(); - - channelMap.remove(metaChannel.tcpChannel.hashCode()); - channelMap.remove(metaChannel.connectionID); - - - ChannelPipeline pipeline = metaChannel.tcpChannel.pipeline(); - // The TCP channel is what calls this method, so we can use "this" for TCP, and the others are handled during the registration process - pipeline.remove(this); - - if (metaChannel.udpChannel != null) { - // the setup is different between CLIENT / SERVER - if (metaChannel.udpRemoteAddress == null) { - // CLIENT RUNS THIS - // don't want to muck with the SERVER udp pipeline, as it NEVER CHANGES. - // More specifically, the UDP SERVER doesn't use a channelMap, it uses the udpRemoteMap - // to keep track of UDP connections. This is very different than how the client works - // only the client will have the udp remote address - channelMap.remove(metaChannel.udpChannel.hashCode()); - } - else { - // SERVER RUNS THIS - // don't ALWAYS have UDP on SERVER... - registerServer = true; - } - } - } finally { - this.registrationWrapper.releaseChannelMap(); - } + boolean registerServer = this.registrationWrapper.setupChannels(this, metaChannel); if (registerServer) { // Only called if we have a UDP channel @@ -445,27 +413,11 @@ class RegistrationRemoteHandler extends RegistrationHandle this.logger.info("Closed connection: {}", channel.remoteAddress()); - long maxShutdownWaitTimeInMilliSeconds = EndPoint.maxShutdownWaitTimeInMilliSeconds; // also, once we notify, we unregister this. // SEARCH for our channel! // on the server, we only get this for TCP events! - try { - IntMap channelMap = this.registrationWrapper.getAndLockChannelMap(); - Entries entries = channelMap.entries(); - while (entries.hasNext()) { - MetaChannel metaChannel = entries.next().value; - - if (metaChannel.tcpChannel == channel || metaChannel.udpChannel == channel || metaChannel.udtChannel == channel) { - metaChannel.close(maxShutdownWaitTimeInMilliSeconds); - entries.remove(); - break; - } - } - - } finally { - this.registrationWrapper.releaseChannelMap(); - } + this.registrationWrapper.closeChannel(channel, maxShutdownWaitTimeInMilliSeconds); super.channelInactive(context); } diff --git a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientTCP.java b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientTCP.java index ffdce624..a644b164 100644 --- a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientTCP.java +++ b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientTCP.java @@ -23,7 +23,6 @@ import dorkbox.network.connection.registration.MetaChannel; import dorkbox.network.connection.registration.Registration; import dorkbox.network.util.CryptoSerializationManager; import dorkbox.util.bytes.OptimizeUtilsByteArray; -import dorkbox.util.collections.IntMap; import dorkbox.util.crypto.CryptoAES; import dorkbox.util.crypto.CryptoECC; import dorkbox.util.exceptions.SecurityException; @@ -131,12 +130,7 @@ class RegistrationRemoteHandlerClientTCP extends Registrat MetaChannel metaChannel = new MetaChannel(); metaChannel.tcpChannel = channel; - try { - IntMap channelMap = this.registrationWrapper.getAndLockChannelMap(); - channelMap.put(channel.hashCode(), metaChannel); - } finally { - this.registrationWrapper.releaseChannelMap(); - } + this.registrationWrapper.addChannel(channel.hashCode(), metaChannel); Logger logger2 = this.logger; if (logger2.isTraceEnabled()) { @@ -160,13 +154,7 @@ class RegistrationRemoteHandlerClientTCP extends Registrat Logger logger2 = this.logger; if (message instanceof Registration) { // make sure this connection was properly registered in the map. (IT SHOULD BE) - MetaChannel metaChannel = null; - try { - IntMap channelMap = registrationWrapper2.getAndLockChannelMap(); - metaChannel = channelMap.get(channel.hashCode()); - } finally { - registrationWrapper2.releaseChannelMap(); - } + MetaChannel metaChannel = registrationWrapper2.getChannel(channel.hashCode()); //noinspection StatementWithEmptyBody if (metaChannel != null) { @@ -257,12 +245,7 @@ class RegistrationRemoteHandlerClientTCP extends Registrat metaChannel.ecdhKey = CryptoECC.generateKeyPair(eccSpec, new SecureRandom()); // register the channel! - try { - IntMap channelMap = registrationWrapper2.getAndLockChannelMap(); - channelMap.put(metaChannel.connectionID, metaChannel); - } finally { - registrationWrapper2.releaseChannelMap(); - } + registrationWrapper2.addChannel(metaChannel.connectionID, metaChannel); metaChannel.publicKey = registration.publicKey; diff --git a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientUDP.java b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientUDP.java index 62f599ce..fc22b6b5 100644 --- a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientUDP.java +++ b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientUDP.java @@ -23,13 +23,10 @@ import dorkbox.network.pipeline.udp.KryoDecoderUdp; import dorkbox.network.pipeline.udp.KryoEncoderUdp; import dorkbox.network.util.CryptoSerializationManager; import dorkbox.util.bytes.OptimizeUtilsByteArray; -import dorkbox.util.collections.IntMap; -import dorkbox.util.collections.IntMap.Entries; import dorkbox.util.crypto.CryptoAES; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; -import io.netty.util.ReferenceCountUtil; import org.slf4j.Logger; import java.io.IOException; @@ -82,33 +79,12 @@ class RegistrationRemoteHandlerClientUDP extends Registrat // The ORDER has to be TCP (always) -> UDP (optional) -> UDT (optional) // UDP - boolean success = false; + InetSocketAddress udpRemoteAddress = (InetSocketAddress) channel.remoteAddress(); if (udpRemoteAddress != null) { InetAddress udpRemoteServer = udpRemoteAddress.getAddress(); - - RegistrationWrapper registrationWrapper2 = this.registrationWrapper; - try { - IntMap channelMap = registrationWrapper2.getAndLockChannelMap(); - Entries entries = channelMap.entries(); - while (entries.hasNext()) { - MetaChannel metaChannel = entries.next().value; - - // associate TCP and UDP! - InetAddress tcpRemoteServer = ((InetSocketAddress) metaChannel.tcpChannel.remoteAddress()).getAddress(); - if (checkEqual(tcpRemoteServer, udpRemoteServer)) { - channelMap.put(channel.hashCode(), metaChannel); - metaChannel.udpChannel = channel; - success = true; - // only allow one server per registration! - break; - } - } - } finally { - registrationWrapper2.releaseChannelMap(); - } - + boolean success = registrationWrapper.associateChannels(channel, udpRemoteServer, false); if (!success) { throw new IOException("UDP cannot connect to a remote server before TCP is established!"); } @@ -135,14 +111,8 @@ class RegistrationRemoteHandlerClientUDP extends Registrat // if we also have a UDP channel, we will receive the "connected" message on UDP (otherwise it will be on TCP) - MetaChannel metaChannel = null; RegistrationWrapper registrationWrapper2 = this.registrationWrapper; - try { - IntMap channelMap = registrationWrapper2.getAndLockChannelMap(); - metaChannel = channelMap.get(channel.hashCode()); - } finally { - registrationWrapper2.releaseChannelMap(); - } + MetaChannel metaChannel = registrationWrapper2.getChannel(channel.hashCode()); if (metaChannel != null) { if (message instanceof Registration) { @@ -154,20 +124,12 @@ class RegistrationRemoteHandlerClientUDP extends Registrat if (!OptimizeUtilsByteArray.canReadInt(payload)) { this.logger.error("Invalid decryption of connection ID. Aborting."); shutdown(registrationWrapper2, channel); - - ReferenceCountUtil.release(message); return; } Integer connectionID = OptimizeUtilsByteArray.readInt(payload, true); - MetaChannel metaChannel2 = null; - try { - IntMap channelMap = registrationWrapper2.getAndLockChannelMap(); - metaChannel2 = channelMap.get(connectionID); - } finally { - registrationWrapper2.releaseChannelMap(); - } + MetaChannel metaChannel2 = registrationWrapper2.getChannel(connectionID); if (metaChannel2 != null) { // hooray! we are successful @@ -189,7 +151,6 @@ class RegistrationRemoteHandlerClientUDP extends Registrat .remove(this); // if we are NOT done, then we will continue registering other protocols, so do nothing else here. - ReferenceCountUtil.release(message); return; } } @@ -199,7 +160,5 @@ class RegistrationRemoteHandlerClientUDP extends Registrat this.logger.error("Error registering UDP with remote server!"); shutdown(registrationWrapper2, channel); - - ReferenceCountUtil.release(message); } } diff --git a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientUDT.java b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientUDT.java index f7171df2..b289cbc7 100644 --- a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientUDT.java +++ b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClientUDT.java @@ -21,8 +21,6 @@ import dorkbox.network.connection.registration.MetaChannel; import dorkbox.network.connection.registration.Registration; import dorkbox.network.util.CryptoSerializationManager; import dorkbox.util.bytes.OptimizeUtilsByteArray; -import dorkbox.util.collections.IntMap; -import dorkbox.util.collections.IntMap.Entries; import dorkbox.util.crypto.CryptoAES; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; @@ -73,33 +71,11 @@ class RegistrationRemoteHandlerClientUDT extends Registrat // The ORDER has to be TCP (always) -> UDP (optional) -> UDT (optional) // UDT - boolean success = false; InetSocketAddress udtRemoteAddress = (InetSocketAddress) channel.remoteAddress(); if (udtRemoteAddress != null) { InetAddress udtRemoteServer = udtRemoteAddress.getAddress(); - RegistrationWrapper registrationWrapper2 = this.registrationWrapper; - try { - IntMap channelMap = registrationWrapper2.getAndLockChannelMap(); - Entries entries = channelMap.entries(); - while (entries.hasNext()) { - MetaChannel metaChannel = entries.next().value; - - // associate TCP and UDP! - InetAddress tcpRemoteServer = ((InetSocketAddress) metaChannel.tcpChannel.remoteAddress()).getAddress(); - if (checkEqual(tcpRemoteServer, udtRemoteServer)) { - channelMap.put(channel.hashCode(), metaChannel); - metaChannel.udtChannel = channel; - success = true; - // only allow one server per registration! - break; - } - } - - } finally { - registrationWrapper2.releaseChannelMap(); - } - + boolean success = registrationWrapper.associateChannels(channel, udtRemoteServer, true); if (!success) { throw new IOException("UDT cannot connect to a remote server before TCP is established!"); } @@ -126,15 +102,8 @@ class RegistrationRemoteHandlerClientUDT extends Registrat Channel channel = context.channel(); // if we also have a UDP channel, we will receive the "connected" message on UDP (otherwise it will be on TCP) - MetaChannel metaChannel = null; - RegistrationWrapper registrationWrapper2 = this.registrationWrapper; - try { - IntMap channelMap = registrationWrapper2.getAndLockChannelMap(); - metaChannel = channelMap.get(channel.hashCode()); - } finally { - registrationWrapper2.releaseChannelMap(); - } + MetaChannel metaChannel = registrationWrapper2.getChannel(channel.hashCode()); Logger logger2 = this.logger; if (metaChannel != null) { @@ -153,14 +122,7 @@ class RegistrationRemoteHandlerClientUDT extends Registrat } Integer connectionID = OptimizeUtilsByteArray.readInt(payload, true); - - MetaChannel metaChannel2 = null; - try { - IntMap channelMap = registrationWrapper2.getAndLockChannelMap(); - metaChannel2 = channelMap.get(connectionID); - } finally { - registrationWrapper2.releaseChannelMap(); - } + MetaChannel metaChannel2 = registrationWrapper2.getChannel(connectionID); if (metaChannel2 != null) { // hooray! we are successful diff --git a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServerTCP.java b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServerTCP.java index d65baacc..27917295 100644 --- a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServerTCP.java +++ b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServerTCP.java @@ -22,9 +22,7 @@ import dorkbox.network.connection.RegistrationWrapper; import dorkbox.network.connection.registration.MetaChannel; import dorkbox.network.connection.registration.Registration; import dorkbox.network.util.CryptoSerializationManager; -import dorkbox.util.MathUtil; import dorkbox.util.bytes.OptimizeUtilsByteArray; -import dorkbox.util.collections.IntMap; import dorkbox.util.crypto.CryptoAES; import dorkbox.util.crypto.CryptoECC; import dorkbox.util.serialization.EccPublicKeySerializer; @@ -106,12 +104,7 @@ class RegistrationRemoteHandlerServerTCP extends Registrat MetaChannel metaChannel = new MetaChannel(); metaChannel.tcpChannel = channel; - try { - IntMap channelMap = this.registrationWrapper.getAndLockChannelMap(); - channelMap.put(channel.hashCode(), metaChannel); - } finally { - this.registrationWrapper.releaseChannelMap(); - } + this.registrationWrapper.addChannel(channel.hashCode(), metaChannel); Logger logger2 = this.logger; if (logger2.isTraceEnabled()) { @@ -134,13 +127,7 @@ class RegistrationRemoteHandlerServerTCP extends Registrat if (message instanceof Registration) { Registration registration = (Registration) message; - MetaChannel metaChannel = null; - try { - IntMap channelMap = registrationWrapper2.getAndLockChannelMap(); - metaChannel = channelMap.get(channel.hashCode()); - } finally { - registrationWrapper2.releaseChannelMap(); - } + MetaChannel metaChannel = registrationWrapper2.getChannel(channel.hashCode()); // make sure this connection was properly registered in the map. (IT SHOULD BE) Logger logger2 = this.logger; @@ -177,21 +164,10 @@ class RegistrationRemoteHandlerServerTCP extends Registrat } - Integer connectionID = MathUtil.randomInt(); + // if I'm unlucky, keep from confusing connections! - try { - IntMap channelMap = registrationWrapper2.getAndLockChannelMap(); - while (channelMap.containsKey(connectionID)) { - connectionID = MathUtil.randomInt(); - } - - metaChannel.connectionID = connectionID; - channelMap.put(connectionID, metaChannel); - - } finally { - registrationWrapper2.releaseChannelMap(); - } + Integer connectionID = registrationWrapper2.initializeChannel(metaChannel); Registration register = new Registration(); diff --git a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServerUDP.java b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServerUDP.java index f75687f0..49d5b147 100644 --- a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServerUDP.java +++ b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServerUDP.java @@ -18,6 +18,7 @@ package dorkbox.network.connection.registration.remote; import dorkbox.network.Broadcast; import dorkbox.network.connection.Connection; import dorkbox.network.connection.ConnectionImpl; +import dorkbox.network.connection.EndPoint; import dorkbox.network.connection.KryoCryptoSerializationManager; import dorkbox.network.connection.RegistrationWrapper; import dorkbox.network.connection.registration.MetaChannel; @@ -25,8 +26,6 @@ import dorkbox.network.connection.registration.Registration; import dorkbox.network.connection.wrapper.UdpWrapper; import dorkbox.network.util.CryptoSerializationManager; import dorkbox.util.bytes.OptimizeUtilsByteArray; -import dorkbox.util.collections.IntMap; -import dorkbox.util.collections.IntMap.Entries; import dorkbox.util.crypto.CryptoAES; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -165,14 +164,15 @@ class RegistrationRemoteHandlerServerUDP extends MessageTo private void receivedUDP(final ChannelHandlerContext context, final Channel channel, - final ByteBuf data, + final ByteBuf message, final InetSocketAddress udpRemoteAddress) throws Exception { + // registration is the ONLY thing NOT encrypted Logger logger2 = this.logger; RegistrationWrapper registrationWrapper2 = this.registrationWrapper; CryptoSerializationManager serializationManager2 = this.serializationManager; - if (KryoCryptoSerializationManager.isEncrypted(data)) { + if (KryoCryptoSerializationManager.isEncrypted(message)) { // we need to FORWARD this message "down the pipeline". ConnectionImpl connection = registrationWrapper2.getServerUDP(udpRemoteAddress); @@ -182,7 +182,7 @@ class RegistrationRemoteHandlerServerUDP extends MessageTo Object object; try { - object = serializationManager2.readWithCrypto(connection, data, data.writerIndex()); + object = serializationManager2.readWithCrypto(connection, message, message.writerIndex()); } catch (Exception e) { logger2.error("UDP unable to deserialize buffer", e); shutdown(registrationWrapper2, channel); @@ -202,7 +202,7 @@ class RegistrationRemoteHandlerServerUDP extends MessageTo Object object; try { - object = serializationManager2.read(data, data.writerIndex()); + object = serializationManager2.read(message, message.writerIndex()); } catch (Exception e) { logger2.error("UDP unable to deserialize buffer", e); shutdown(registrationWrapper2, channel); @@ -210,40 +210,11 @@ class RegistrationRemoteHandlerServerUDP extends MessageTo } if (object instanceof Registration) { - boolean matches = false; - MetaChannel metaChannel = null; + // find out and make sure that UDP and TCP are talking to the same server + InetAddress udpRemoteServer = udpRemoteAddress.getAddress(); + MetaChannel metaChannel = registrationWrapper2.getAssociatedChannel_UDP(udpRemoteServer); - try { - // find out and make sure that UDP and TCP are talking to the same server - InetAddress udpRemoteServer = udpRemoteAddress.getAddress(); - IntMap channelMap = registrationWrapper2.getAndLockChannelMap(); - Entries entries = channelMap.entries(); - - while (entries.hasNext()) { - metaChannel = entries.next().value; - - // only look at connections that do not have UDP already setup. - if (metaChannel.udpChannel == null) { - InetSocketAddress tcpRemote = (InetSocketAddress) metaChannel.tcpChannel.remoteAddress(); - InetAddress tcpRemoteAddress = tcpRemote.getAddress(); - - if (RegistrationRemoteHandler.checkEqual(tcpRemoteAddress, udpRemoteServer)) { - matches = true; - break; - } - else { - logger2.error("Mismatch UDP and TCP client addresses! UDP: {} TCP: {}", udpRemoteServer, tcpRemoteAddress); - shutdown(registrationWrapper2, channel); - return; - } - } - } - } finally { - registrationWrapper2.releaseChannelMap(); - } - - - if (matches) { + if (metaChannel != null) { // associate TCP and UDP! metaChannel.udpChannel = channel; metaChannel.udpRemoteAddress = udpRemoteAddress; @@ -269,7 +240,7 @@ class RegistrationRemoteHandlerServerUDP extends MessageTo } else { // if we get here, there was a failure! - logger2.error("Error trying to register UDP without udp specified! UDP: {}", udpRemoteAddress); + logger2.error("Error trying to register UDP with incorrect udp specified! UDP: {}", udpRemoteAddress); shutdown(registrationWrapper2, channel); } } @@ -295,21 +266,7 @@ class RegistrationRemoteHandlerServerUDP extends MessageTo // also, once we notify, we unregister this. if (registrationWrapper != null) { - try { - IntMap channelMap = registrationWrapper.getAndLockChannelMap(); - Entries entries = channelMap.entries(); - while (entries.hasNext()) { - MetaChannel metaChannel = entries.next().value; - if (metaChannel.localChannel == channel || metaChannel.tcpChannel == channel || metaChannel.udpChannel == channel) { - entries.remove(); - metaChannel.close(); - return metaChannel; - } - } - - } finally { - registrationWrapper.releaseChannelMap(); - } + return registrationWrapper.closeChannel(channel, EndPoint.maxShutdownWaitTimeInMilliSeconds); } return null; diff --git a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServerUDT.java b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServerUDT.java index 717947eb..9a64bb52 100644 --- a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServerUDT.java +++ b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServerUDT.java @@ -21,12 +21,9 @@ import dorkbox.network.connection.registration.MetaChannel; import dorkbox.network.connection.registration.Registration; import dorkbox.network.util.CryptoSerializationManager; import dorkbox.util.bytes.OptimizeUtilsByteArray; -import dorkbox.util.collections.IntMap; -import dorkbox.util.collections.IntMap.Entries; import dorkbox.util.crypto.CryptoAES; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; -import io.netty.util.ReferenceCountUtil; import org.slf4j.Logger; import java.net.InetAddress; @@ -80,41 +77,8 @@ class RegistrationRemoteHandlerServerUDT extends Registrat // find out and make sure that UDP and TCP are talking to the same server InetAddress udtRemoteAddress = ((InetSocketAddress) channel.remoteAddress()).getAddress(); - boolean matches = false; - MetaChannel metaChannel = null; - try { - IntMap channelMap = registrationWrapper2.getAndLockChannelMap(); - Entries entries = channelMap.entries(); - while (entries.hasNext()) { - metaChannel = entries.next().value; - - // only look at connections that do not have UDT already setup. - if (metaChannel.udtChannel == null) { - InetSocketAddress tcpRemote = (InetSocketAddress) metaChannel.tcpChannel.remoteAddress(); - InetAddress tcpRemoteAddress = tcpRemote.getAddress(); - - if (checkEqual(tcpRemoteAddress, udtRemoteAddress)) { - matches = true; - } - else { - if (logger2.isErrorEnabled()) { - logger2.error(this.name, - "Mismatch UDT and TCP client addresses! UDP: {} TCP: {}", - udtRemoteAddress, - tcpRemoteAddress); - } - shutdown(registrationWrapper2, channel); - ReferenceCountUtil.release(message); - return; - } - } - } - - } finally { - registrationWrapper2.releaseChannelMap(); - } - - if (matches) { + MetaChannel metaChannel = registrationWrapper2.getAssociatedChannel_UDT(udtRemoteAddress); + if (metaChannel != null) { // associate TCP and UDT! metaChannel.udtChannel = channel; @@ -142,15 +106,13 @@ class RegistrationRemoteHandlerServerUDT extends Registrat if (logger2.isTraceEnabled()) { logger2.trace("Register UDT connection from {}", udtRemoteAddress); } - ReferenceCountUtil.release(message); } else { // if we get here, there was a failure! if (logger2.isErrorEnabled()) { - logger2.error("Error trying to register UDT without udt specified! UDT: {}", udtRemoteAddress); + logger2.error("Error trying to register UDT with incorrect udt specified! UDT: {}", udtRemoteAddress); } shutdown(registrationWrapper2, channel); - ReferenceCountUtil.release(message); } } else { @@ -158,7 +120,6 @@ class RegistrationRemoteHandlerServerUDT extends Registrat logger2.error("UDT attempting to spoof client! Unencrypted packet other than registration received."); } shutdown(registrationWrapper2, channel); - ReferenceCountUtil.release(message); } } }