Moved channel registration into registration wrapper

This commit is contained in:
nathan 2016-04-02 17:00:40 +02:00
parent 666b391514
commit 80ab10df01
14 changed files with 349 additions and 377 deletions

View File

@ -30,8 +30,6 @@ import dorkbox.network.util.store.NullSettingsStore;
import dorkbox.network.util.store.SettingsStore;
import dorkbox.util.OS;
import dorkbox.util.Property;
import dorkbox.util.collections.IntMap;
import dorkbox.util.collections.IntMap.Entries;
import dorkbox.util.crypto.CryptoECC;
import dorkbox.util.entropy.Entropy;
import dorkbox.util.exceptions.InitializationException;
@ -98,8 +96,7 @@ class EndPoint<C extends Connection> {
public static int DEFAULT_THREAD_POOL_SIZE = Runtime.getRuntime()
.availableProcessors() * 2;
/**
* The amount of time in milli-seconds to wait for this endpoint to close all
* {@link Channel}s and shutdown gracefully.
* The amount of time in milli-seconds to wait for this endpoint to close all {@link Channel}s and shutdown gracefully.
*/
@Property
public static long maxShutdownWaitTimeInMilliSeconds = 2000L; // in milliseconds
@ -558,22 +555,7 @@ class EndPoint<C extends Connection> {
this.connectionManager.closeConnections();
// Sometimes there might be "lingering" connections (ie, halfway though registration) that need to be closed.
long maxShutdownWaitTimeInMilliSeconds = EndPoint.maxShutdownWaitTimeInMilliSeconds;
RegistrationWrapper<C> registrationWrapper2 = this.registrationWrapper;
try {
IntMap<MetaChannel> channelMap = registrationWrapper2.getAndLockChannelMap();
Entries<MetaChannel> entries = channelMap.entries();
while (entries.hasNext()) {
MetaChannel metaChannel = entries.next().value;
metaChannel.close(maxShutdownWaitTimeInMilliSeconds);
Thread.yield();
}
channelMap.clear();
} finally {
registrationWrapper2.releaseChannelMap();
}
this.registrationWrapper.closeChannels(maxShutdownWaitTimeInMilliSeconds);
this.isConnected.set(false);
}

View File

@ -15,12 +15,17 @@
*/
package dorkbox.network.connection;
import com.esotericsoftware.kryo.util.ObjectMap;
import dorkbox.network.connection.registration.MetaChannel;
import dorkbox.network.connection.registration.remote.RegistrationRemoteHandler;
import dorkbox.network.pipeline.KryoEncoder;
import dorkbox.network.pipeline.KryoEncoderCrypto;
import dorkbox.util.MathUtil;
import dorkbox.util.collections.IntMap;
import dorkbox.util.crypto.CryptoECC;
import dorkbox.util.exceptions.SecurityException;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
import org.bouncycastle.crypto.CipherParameters;
import org.bouncycastle.crypto.params.ECPublicKeyParameters;
import org.slf4j.Logger;
@ -29,14 +34,15 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.security.SecureRandom;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.ReentrantLock;
import static dorkbox.network.connection.registration.remote.RegistrationRemoteHandler.checkEqual;
/**
* Just wraps common/needed methods of the client/server endpoint by the registration stage/handshake.
* <p/>
* This is in the connection package, so it can access the endpoint methods that it needs to.
* This is in the connection package, so it can access the endpoint methods that it needs to (without having to publicly expose them)
*/
public
class RegistrationWrapper<C extends Connection> implements UdpServer {
@ -52,8 +58,20 @@ class RegistrationWrapper<C extends Connection> implements UdpServer {
private final IntMap<MetaChannel> channelMap = new IntMap<MetaChannel>();
// keeps track of connections (UDP-server)
// this is final, because the REFERENCE to these will never change. They ARE NOT immutable objects (meaning their content can change)
private final ConcurrentMap<InetSocketAddress, ConnectionImpl> udpRemoteMap;
@SuppressWarnings({"FieldCanBeLocal", "unused"})
private volatile ObjectMap<InetSocketAddress, ConnectionImpl> udpRemoteMap;
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
private final Object singleWriterLock1 = new Object();
// Recommended for best performance while adhering to the "single writer principle". Must be static-final
private static final AtomicReferenceFieldUpdater<RegistrationWrapper, ObjectMap> udpRemoteMapREF =
AtomicReferenceFieldUpdater.newUpdater(RegistrationWrapper.class,
ObjectMap.class,
"udpRemoteMap");
public
@ -67,7 +85,7 @@ class RegistrationWrapper<C extends Connection> implements UdpServer {
this.kryoEncoderCrypto = kryoEncoderCrypto;
if (endPoint instanceof EndPointServer) {
this.udpRemoteMap = new ConcurrentHashMap<InetSocketAddress, ConnectionImpl>();
this.udpRemoteMap = new ObjectMap<InetSocketAddress, ConnectionImpl>(32, ConnectionManager.LOAD_FACTOR);
}
else {
this.udpRemoteMap = null;
@ -97,14 +115,14 @@ class RegistrationWrapper<C extends Connection> implements UdpServer {
* <p/>
* Make SURE to use this in a try/finally block with releaseChannelMap in the finally block!
*/
public
private
IntMap<MetaChannel> getAndLockChannelMap() {
// try to lock access, also guarantees that the contents of this map are visible across threads
this.channelMapLock.lock();
return this.channelMap;
}
public
private
void releaseChannelMap() {
// try to unlock access
this.channelMapLock.unlock();
@ -235,7 +253,18 @@ class RegistrationWrapper<C extends Connection> implements UdpServer {
public final
void registerServerUDP(final MetaChannel metaChannel) {
if (metaChannel != null && metaChannel.udpRemoteAddress != null) {
this.udpRemoteMap.put(metaChannel.udpRemoteAddress, metaChannel.connection);
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
synchronized (singleWriterLock1) {
// access a snapshot of the connections (single-writer-principle)
final ObjectMap<InetSocketAddress, ConnectionImpl> udpRemoteMap = udpRemoteMapREF.get(this);
udpRemoteMap.put(metaChannel.udpRemoteAddress, metaChannel.connection);
// save this snapshot back to the original (single writer principle)
udpRemoteMapREF.lazySet(this, udpRemoteMap);
}
this.logger.info("Connected to remote UDP connection. [{} <== {}]",
metaChannel.udpChannel.localAddress(),
@ -250,7 +279,19 @@ class RegistrationWrapper<C extends Connection> implements UdpServer {
public final
void unRegisterServerUDP(final InetSocketAddress udpRemoteAddress) {
if (udpRemoteAddress != null) {
this.udpRemoteMap.remove(udpRemoteAddress);
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
synchronized (singleWriterLock1) {
// access a snapshot of the connections (single-writer-principle)
final ObjectMap<InetSocketAddress, ConnectionImpl> udpRemoteMap = udpRemoteMapREF.get(this);
udpRemoteMap.remove(udpRemoteAddress);
// save this snapshot back to the original (single writer principle)
udpRemoteMapREF.lazySet(this, udpRemoteMap);
}
logger.info("Closed remote UDP connection: {}", udpRemoteAddress);
}
}
@ -262,7 +303,9 @@ class RegistrationWrapper<C extends Connection> implements UdpServer {
public
ConnectionImpl getServerUDP(final InetSocketAddress udpRemoteAddress) {
if (udpRemoteAddress != null) {
return this.udpRemoteMap.get(udpRemoteAddress);
// access a snapshot of the connections (single-writer-principle)
final ObjectMap<InetSocketAddress, ConnectionImpl> udpRemoteMap = udpRemoteMapREF.get(this);
return udpRemoteMap.get(udpRemoteAddress);
}
else {
return null;
@ -275,4 +318,244 @@ class RegistrationWrapper<C extends Connection> implements UdpServer {
((EndPointClient<C>) this.endPoint).abortRegistration();
}
}
public
void addChannel(final int channelHashCodeOrId, final MetaChannel metaChannel) {
try {
IntMap<MetaChannel> channelMap = this.getAndLockChannelMap();
channelMap.put(channelHashCodeOrId, metaChannel);
} finally {
this.releaseChannelMap();
}
}
public
MetaChannel removeChannel(final int channelHashCodeOrId) {
try {
IntMap<MetaChannel> channelMap = getAndLockChannelMap();
return channelMap.remove(channelHashCodeOrId);
} finally {
releaseChannelMap();
}
}
public
MetaChannel getChannel(final int channelHashCodeOrId) {
try {
IntMap<MetaChannel> channelMap = getAndLockChannelMap();
return channelMap.get(channelHashCodeOrId);
} finally {
releaseChannelMap();
}
}
/**
* Closes all connections ONLY (keeps the server/client running).
*
* @param maxShutdownWaitTimeInMilliSeconds
* The amount of time in milli-seconds to wait for this endpoint to close all {@link Channel}s and shutdown gracefully.
*/
public
void closeChannels(final long maxShutdownWaitTimeInMilliSeconds) {
try {
IntMap<MetaChannel> channelMap = getAndLockChannelMap();
IntMap.Entries<MetaChannel> entries = channelMap.entries();
while (entries.hasNext()) {
MetaChannel metaChannel = entries.next().value;
metaChannel.close(maxShutdownWaitTimeInMilliSeconds);
Thread.yield();
}
channelMap.clear();
} finally {
releaseChannelMap();
}
}
/**
* Closes the specified connections ONLY (keeps the server/client running).
*
* @param maxShutdownWaitTimeInMilliSeconds
* The amount of time in milli-seconds to wait for this endpoint to close all {@link Channel}s and shutdown gracefully.
*/
public
MetaChannel closeChannel(final Channel channel, final long maxShutdownWaitTimeInMilliSeconds) {
try {
IntMap<MetaChannel> channelMap = getAndLockChannelMap();
IntMap.Entries<MetaChannel> entries = channelMap.entries();
while (entries.hasNext()) {
MetaChannel metaChannel = entries.next().value;
if (metaChannel.localChannel == channel ||
metaChannel.tcpChannel == channel ||
metaChannel.udpChannel == channel ||
metaChannel.udtChannel == channel) {
entries.remove();
metaChannel.close(maxShutdownWaitTimeInMilliSeconds);
return metaChannel;
}
}
} finally {
releaseChannelMap();
}
return null;
}
/**
* now that we are CONNECTED, we want to remove ourselves (and channel ID's) from the map.
* they will be ADDED in another map, in the followup handler!!
*/
public
boolean setupChannels(final RegistrationRemoteHandler<C> handler, final MetaChannel metaChannel) {
boolean registerServer = false;
try {
IntMap<MetaChannel> channelMap = getAndLockChannelMap();
channelMap.remove(metaChannel.tcpChannel.hashCode());
channelMap.remove(metaChannel.connectionID);
ChannelPipeline pipeline = metaChannel.tcpChannel.pipeline();
// The TCP channel is what calls this method, so we can use "this" for TCP, and the others are handled during the registration process
pipeline.remove(handler);
if (metaChannel.udpChannel != null) {
// the setup is different between CLIENT / SERVER
if (metaChannel.udpRemoteAddress == null) {
// CLIENT RUNS THIS
// don't want to muck with the SERVER udp pipeline, as it NEVER CHANGES.
// More specifically, the UDP SERVER doesn't use a channelMap, it uses the udpRemoteMap
// to keep track of UDP connections. This is very different than how the client works
// only the client will have the udp remote address
channelMap.remove(metaChannel.udpChannel.hashCode());
}
else {
// SERVER RUNS THIS
// don't ALWAYS have UDP on SERVER...
registerServer = true;
}
}
} finally {
releaseChannelMap();
}
return registerServer;
}
public
Integer initializeChannel(final MetaChannel metaChannel) {
Integer connectionID = MathUtil.randomInt();
try {
IntMap<MetaChannel> channelMap = getAndLockChannelMap();
while (channelMap.containsKey(connectionID)) {
connectionID = MathUtil.randomInt();
}
metaChannel.connectionID = connectionID;
channelMap.put(connectionID, metaChannel);
} finally {
releaseChannelMap();
}
return connectionID;
}
public
boolean associateChannels(final Channel channel, final InetAddress remoteAddress, final boolean isUdt) {
boolean success = false;
try {
IntMap<MetaChannel> channelMap = getAndLockChannelMap();
IntMap.Entries<MetaChannel> entries = channelMap.entries();
while (entries.hasNext()) {
MetaChannel metaChannel = entries.next().value;
// associate TCP and UDP!
final InetSocketAddress inetSocketAddress = (InetSocketAddress) metaChannel.tcpChannel.remoteAddress();
InetAddress tcpRemoteServer = inetSocketAddress.getAddress();
if (checkEqual(tcpRemoteServer, remoteAddress)) {
channelMap.put(channel.hashCode(), metaChannel);
if (isUdt) {
metaChannel.udtChannel = channel;
}
else {
metaChannel.udpChannel = channel;
}
success = true;
// only allow one server per registration!
break;
}
}
} finally {
releaseChannelMap();
}
return success;
}
public
MetaChannel getAssociatedChannel_UDT(final InetAddress remoteAddress) {
try {
MetaChannel metaChannel;
IntMap<MetaChannel> channelMap = getAndLockChannelMap();
IntMap.Entries<MetaChannel> entries = channelMap.entries();
while (entries.hasNext()) {
metaChannel = entries.next().value;
// only look at connections that do not have UDP already setup.
if (metaChannel.udtChannel == null) {
InetSocketAddress tcpRemote = (InetSocketAddress) metaChannel.tcpChannel.remoteAddress();
InetAddress tcpRemoteAddress = tcpRemote.getAddress();
if (RegistrationRemoteHandler.checkEqual(tcpRemoteAddress, remoteAddress)) {
return metaChannel;
}
else {
return null;
}
}
}
} finally {
releaseChannelMap();
}
return null;
}
public
MetaChannel getAssociatedChannel_UDP(final InetAddress remoteAddress) {
try {
MetaChannel metaChannel;
IntMap<MetaChannel> channelMap = getAndLockChannelMap();
IntMap.Entries<MetaChannel> entries = channelMap.entries();
while (entries.hasNext()) {
metaChannel = entries.next().value;
// only look at connections that do not have UDP already setup.
if (metaChannel.udpChannel == null) {
InetSocketAddress tcpRemote = (InetSocketAddress) metaChannel.tcpChannel.remoteAddress();
InetAddress tcpRemoteAddress = tcpRemote.getAddress();
if (RegistrationRemoteHandler.checkEqual(tcpRemoteAddress, remoteAddress)) {
return metaChannel;
}
else {
return null;
}
}
}
} finally {
releaseChannelMap();
}
return null;
}
}

View File

@ -18,12 +18,18 @@ package dorkbox.network.connection.registration;
import org.bouncycastle.crypto.params.ECPublicKeyParameters;
import org.bouncycastle.crypto.params.IESParameters;
/**
* Internal message to handle the TCP/UDP registration process
* Internal message to handle the TCP/UDP/UDT registration process
*/
public
class Registration {
public static final byte notAdroid = (byte) 0;
public static final byte android = (byte) 1;
// signals which serialization is possible. If they match, then UNSAFE can be used (except android. it always must use ASM)
public byte connectionType;
public ECPublicKeyParameters publicKey;
public IESParameters eccParameters;
public byte[] aesKey;

View File

@ -16,9 +16,8 @@
package dorkbox.network.connection.registration;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.EndPoint;
import dorkbox.network.connection.RegistrationWrapper;
import dorkbox.util.collections.IntMap;
import dorkbox.util.collections.IntMap.Entries;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
@ -95,22 +94,10 @@ class RegistrationHandler<C extends Connection> extends ChannelInboundHandlerAda
// also, once we notify, we unregister this.
if (registrationWrapper != null) {
try {
IntMap<MetaChannel> channelMap = registrationWrapper.getAndLockChannelMap();
Entries<MetaChannel> entries = channelMap.entries();
while (entries.hasNext()) {
MetaChannel metaChannel = entries.next().value;
if (metaChannel.localChannel == channel || metaChannel.tcpChannel == channel || metaChannel.udpChannel == channel) {
entries.remove();
metaChannel.close();
return metaChannel;
}
}
MetaChannel metaChannel = registrationWrapper.closeChannel(channel, EndPoint.maxShutdownWaitTimeInMilliSeconds);
registrationWrapper.abortRegistrationIfClient();
} finally {
registrationWrapper.releaseChannelMap();
registrationWrapper.abortRegistrationIfClient();
}
return metaChannel;
}
return null;

View File

@ -16,18 +16,17 @@
package dorkbox.network.connection.registration.local;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.EndPoint;
import dorkbox.network.connection.RegistrationWrapper;
import dorkbox.network.connection.registration.MetaChannel;
import dorkbox.network.connection.registration.RegistrationHandler;
import dorkbox.network.pipeline.LocalRmiDecoder;
import dorkbox.network.pipeline.LocalRmiEncoder;
import dorkbox.util.collections.IntMap;
import dorkbox.util.collections.IntMap.Entries;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import static dorkbox.network.connection.EndPoint.maxShutdownWaitTimeInMilliSeconds;
public abstract
class RegistrationLocalHandler<C extends Connection> extends RegistrationHandler<C> {
protected static final String LOCAL_RMI_ENCODER = "localRmiEncoder";
@ -50,12 +49,7 @@ class RegistrationLocalHandler<C extends Connection> extends RegistrationHandler
MetaChannel metaChannel = new MetaChannel();
metaChannel.localChannel = channel;
try {
IntMap<MetaChannel> channelMap = this.registrationWrapper.getAndLockChannelMap();
channelMap.put(channel.hashCode(), metaChannel);
} finally {
this.registrationWrapper.releaseChannelMap();
}
this.registrationWrapper.addChannel(channel.hashCode(), metaChannel);
Logger logger2 = this.logger;
if (logger2.isTraceEnabled()) {
@ -93,25 +87,8 @@ class RegistrationLocalHandler<C extends Connection> extends RegistrationHandler
this.logger.info("Closed LOCAL connection: {}", channel.remoteAddress());
long maxShutdownWaitTimeInMilliSeconds = EndPoint.maxShutdownWaitTimeInMilliSeconds;
// also, once we notify, we unregister this.
try {
IntMap<MetaChannel> channelMap = this.registrationWrapper.getAndLockChannelMap();
Entries<MetaChannel> entries = channelMap.entries();
while (entries.hasNext()) {
MetaChannel metaChannel = entries.next().value;
if (metaChannel.localChannel == channel) {
metaChannel.close(maxShutdownWaitTimeInMilliSeconds);
entries.remove();
break;
}
}
} finally {
this.registrationWrapper.releaseChannelMap();
}
registrationWrapper.closeChannel(channel, maxShutdownWaitTimeInMilliSeconds);
super.channelInactive(context);
}

View File

@ -20,7 +20,6 @@ import dorkbox.network.connection.ConnectionImpl;
import dorkbox.network.connection.RegistrationWrapper;
import dorkbox.network.connection.registration.MetaChannel;
import dorkbox.network.connection.registration.Registration;
import dorkbox.util.collections.IntMap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
@ -67,13 +66,7 @@ class RegistrationLocalHandlerClient<C extends Connection> extends RegistrationL
Channel channel = context.channel();
MetaChannel metaChannel = null;
try {
IntMap<MetaChannel> channelMap = registrationWrapper.getAndLockChannelMap();
metaChannel = channelMap.remove(channel.hashCode());
} finally {
registrationWrapper.releaseChannelMap();
}
MetaChannel metaChannel = this.registrationWrapper.removeChannel(channel.hashCode());
// have to setup new listeners
if (metaChannel != null) {

View File

@ -19,7 +19,6 @@ import dorkbox.network.connection.Connection;
import dorkbox.network.connection.ConnectionImpl;
import dorkbox.network.connection.RegistrationWrapper;
import dorkbox.network.connection.registration.MetaChannel;
import dorkbox.util.collections.IntMap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
@ -77,14 +76,9 @@ class RegistrationLocalHandlerServer<C extends Connection> extends RegistrationL
}
ConnectionImpl connection = null;
try {
IntMap<MetaChannel> channelMap = this.registrationWrapper.getAndLockChannelMap();
MetaChannel metaChannel = channelMap.remove(channel.hashCode());
if (metaChannel != null) {
connection = metaChannel.connection;
}
} finally {
this.registrationWrapper.releaseChannelMap();
MetaChannel metaChannel = this.registrationWrapper.removeChannel(channel.hashCode());
if (metaChannel != null) {
connection = metaChannel.connection;
}
if (connection != null) {

View File

@ -18,7 +18,6 @@ package dorkbox.network.connection.registration.remote;
import com.esotericsoftware.kryo.io.Input;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.ConnectionImpl;
import dorkbox.network.connection.EndPoint;
import dorkbox.network.connection.RegistrationWrapper;
import dorkbox.network.connection.registration.MetaChannel;
import dorkbox.network.connection.registration.RegistrationHandler;
@ -27,8 +26,6 @@ import dorkbox.network.pipeline.KryoDecoderCrypto;
import dorkbox.network.pipeline.udp.KryoDecoderUdpCrypto;
import dorkbox.network.pipeline.udp.KryoEncoderUdpCrypto;
import dorkbox.network.util.CryptoSerializationManager;
import dorkbox.util.collections.IntMap;
import dorkbox.util.collections.IntMap.Entries;
import dorkbox.util.crypto.CryptoECC;
import dorkbox.util.serialization.EccPublicKeySerializer;
import io.netty.channel.Channel;
@ -52,6 +49,8 @@ import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import static dorkbox.network.connection.EndPoint.maxShutdownWaitTimeInMilliSeconds;
public abstract
class RegistrationRemoteHandler<C extends Connection> extends RegistrationHandler<C> {
protected static final String KRYO_ENCODER = "kryoEncoder";
@ -365,40 +364,9 @@ class RegistrationRemoteHandler<C extends Connection> extends RegistrationHandle
@SuppressWarnings("AutoUnboxing")
protected final
void setupConnection(MetaChannel metaChannel) {
boolean registerServer = false;
// now that we are CONNECTED, we want to remove ourselves (and channel ID's) from the map.
// they will be ADDED in another map, in the followup handler!!
try {
IntMap<MetaChannel> channelMap = this.registrationWrapper.getAndLockChannelMap();
channelMap.remove(metaChannel.tcpChannel.hashCode());
channelMap.remove(metaChannel.connectionID);
ChannelPipeline pipeline = metaChannel.tcpChannel.pipeline();
// The TCP channel is what calls this method, so we can use "this" for TCP, and the others are handled during the registration process
pipeline.remove(this);
if (metaChannel.udpChannel != null) {
// the setup is different between CLIENT / SERVER
if (metaChannel.udpRemoteAddress == null) {
// CLIENT RUNS THIS
// don't want to muck with the SERVER udp pipeline, as it NEVER CHANGES.
// More specifically, the UDP SERVER doesn't use a channelMap, it uses the udpRemoteMap
// to keep track of UDP connections. This is very different than how the client works
// only the client will have the udp remote address
channelMap.remove(metaChannel.udpChannel.hashCode());
}
else {
// SERVER RUNS THIS
// don't ALWAYS have UDP on SERVER...
registerServer = true;
}
}
} finally {
this.registrationWrapper.releaseChannelMap();
}
boolean registerServer = this.registrationWrapper.setupChannels(this, metaChannel);
if (registerServer) {
// Only called if we have a UDP channel
@ -445,27 +413,11 @@ class RegistrationRemoteHandler<C extends Connection> extends RegistrationHandle
this.logger.info("Closed connection: {}", channel.remoteAddress());
long maxShutdownWaitTimeInMilliSeconds = EndPoint.maxShutdownWaitTimeInMilliSeconds;
// also, once we notify, we unregister this.
// SEARCH for our channel!
// on the server, we only get this for TCP events!
try {
IntMap<MetaChannel> channelMap = this.registrationWrapper.getAndLockChannelMap();
Entries<MetaChannel> entries = channelMap.entries();
while (entries.hasNext()) {
MetaChannel metaChannel = entries.next().value;
if (metaChannel.tcpChannel == channel || metaChannel.udpChannel == channel || metaChannel.udtChannel == channel) {
metaChannel.close(maxShutdownWaitTimeInMilliSeconds);
entries.remove();
break;
}
}
} finally {
this.registrationWrapper.releaseChannelMap();
}
this.registrationWrapper.closeChannel(channel, maxShutdownWaitTimeInMilliSeconds);
super.channelInactive(context);
}

View File

@ -23,7 +23,6 @@ import dorkbox.network.connection.registration.MetaChannel;
import dorkbox.network.connection.registration.Registration;
import dorkbox.network.util.CryptoSerializationManager;
import dorkbox.util.bytes.OptimizeUtilsByteArray;
import dorkbox.util.collections.IntMap;
import dorkbox.util.crypto.CryptoAES;
import dorkbox.util.crypto.CryptoECC;
import dorkbox.util.exceptions.SecurityException;
@ -131,12 +130,7 @@ class RegistrationRemoteHandlerClientTCP<C extends Connection> extends Registrat
MetaChannel metaChannel = new MetaChannel();
metaChannel.tcpChannel = channel;
try {
IntMap<MetaChannel> channelMap = this.registrationWrapper.getAndLockChannelMap();
channelMap.put(channel.hashCode(), metaChannel);
} finally {
this.registrationWrapper.releaseChannelMap();
}
this.registrationWrapper.addChannel(channel.hashCode(), metaChannel);
Logger logger2 = this.logger;
if (logger2.isTraceEnabled()) {
@ -160,13 +154,7 @@ class RegistrationRemoteHandlerClientTCP<C extends Connection> extends Registrat
Logger logger2 = this.logger;
if (message instanceof Registration) {
// make sure this connection was properly registered in the map. (IT SHOULD BE)
MetaChannel metaChannel = null;
try {
IntMap<MetaChannel> channelMap = registrationWrapper2.getAndLockChannelMap();
metaChannel = channelMap.get(channel.hashCode());
} finally {
registrationWrapper2.releaseChannelMap();
}
MetaChannel metaChannel = registrationWrapper2.getChannel(channel.hashCode());
//noinspection StatementWithEmptyBody
if (metaChannel != null) {
@ -257,12 +245,7 @@ class RegistrationRemoteHandlerClientTCP<C extends Connection> extends Registrat
metaChannel.ecdhKey = CryptoECC.generateKeyPair(eccSpec, new SecureRandom());
// register the channel!
try {
IntMap<MetaChannel> channelMap = registrationWrapper2.getAndLockChannelMap();
channelMap.put(metaChannel.connectionID, metaChannel);
} finally {
registrationWrapper2.releaseChannelMap();
}
registrationWrapper2.addChannel(metaChannel.connectionID, metaChannel);
metaChannel.publicKey = registration.publicKey;

View File

@ -23,13 +23,10 @@ import dorkbox.network.pipeline.udp.KryoDecoderUdp;
import dorkbox.network.pipeline.udp.KryoEncoderUdp;
import dorkbox.network.util.CryptoSerializationManager;
import dorkbox.util.bytes.OptimizeUtilsByteArray;
import dorkbox.util.collections.IntMap;
import dorkbox.util.collections.IntMap.Entries;
import dorkbox.util.crypto.CryptoAES;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import java.io.IOException;
@ -82,33 +79,12 @@ class RegistrationRemoteHandlerClientUDP<C extends Connection> extends Registrat
// The ORDER has to be TCP (always) -> UDP (optional) -> UDT (optional)
// UDP
boolean success = false;
InetSocketAddress udpRemoteAddress = (InetSocketAddress) channel.remoteAddress();
if (udpRemoteAddress != null) {
InetAddress udpRemoteServer = udpRemoteAddress.getAddress();
RegistrationWrapper<C> registrationWrapper2 = this.registrationWrapper;
try {
IntMap<MetaChannel> channelMap = registrationWrapper2.getAndLockChannelMap();
Entries<MetaChannel> entries = channelMap.entries();
while (entries.hasNext()) {
MetaChannel metaChannel = entries.next().value;
// associate TCP and UDP!
InetAddress tcpRemoteServer = ((InetSocketAddress) metaChannel.tcpChannel.remoteAddress()).getAddress();
if (checkEqual(tcpRemoteServer, udpRemoteServer)) {
channelMap.put(channel.hashCode(), metaChannel);
metaChannel.udpChannel = channel;
success = true;
// only allow one server per registration!
break;
}
}
} finally {
registrationWrapper2.releaseChannelMap();
}
boolean success = registrationWrapper.associateChannels(channel, udpRemoteServer, false);
if (!success) {
throw new IOException("UDP cannot connect to a remote server before TCP is established!");
}
@ -135,14 +111,8 @@ class RegistrationRemoteHandlerClientUDP<C extends Connection> extends Registrat
// if we also have a UDP channel, we will receive the "connected" message on UDP (otherwise it will be on TCP)
MetaChannel metaChannel = null;
RegistrationWrapper<C> registrationWrapper2 = this.registrationWrapper;
try {
IntMap<MetaChannel> channelMap = registrationWrapper2.getAndLockChannelMap();
metaChannel = channelMap.get(channel.hashCode());
} finally {
registrationWrapper2.releaseChannelMap();
}
MetaChannel metaChannel = registrationWrapper2.getChannel(channel.hashCode());
if (metaChannel != null) {
if (message instanceof Registration) {
@ -154,20 +124,12 @@ class RegistrationRemoteHandlerClientUDP<C extends Connection> extends Registrat
if (!OptimizeUtilsByteArray.canReadInt(payload)) {
this.logger.error("Invalid decryption of connection ID. Aborting.");
shutdown(registrationWrapper2, channel);
ReferenceCountUtil.release(message);
return;
}
Integer connectionID = OptimizeUtilsByteArray.readInt(payload, true);
MetaChannel metaChannel2 = null;
try {
IntMap<MetaChannel> channelMap = registrationWrapper2.getAndLockChannelMap();
metaChannel2 = channelMap.get(connectionID);
} finally {
registrationWrapper2.releaseChannelMap();
}
MetaChannel metaChannel2 = registrationWrapper2.getChannel(connectionID);
if (metaChannel2 != null) {
// hooray! we are successful
@ -189,7 +151,6 @@ class RegistrationRemoteHandlerClientUDP<C extends Connection> extends Registrat
.remove(this);
// if we are NOT done, then we will continue registering other protocols, so do nothing else here.
ReferenceCountUtil.release(message);
return;
}
}
@ -199,7 +160,5 @@ class RegistrationRemoteHandlerClientUDP<C extends Connection> extends Registrat
this.logger.error("Error registering UDP with remote server!");
shutdown(registrationWrapper2, channel);
ReferenceCountUtil.release(message);
}
}

View File

@ -21,8 +21,6 @@ import dorkbox.network.connection.registration.MetaChannel;
import dorkbox.network.connection.registration.Registration;
import dorkbox.network.util.CryptoSerializationManager;
import dorkbox.util.bytes.OptimizeUtilsByteArray;
import dorkbox.util.collections.IntMap;
import dorkbox.util.collections.IntMap.Entries;
import dorkbox.util.crypto.CryptoAES;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
@ -73,33 +71,11 @@ class RegistrationRemoteHandlerClientUDT<C extends Connection> extends Registrat
// The ORDER has to be TCP (always) -> UDP (optional) -> UDT (optional)
// UDT
boolean success = false;
InetSocketAddress udtRemoteAddress = (InetSocketAddress) channel.remoteAddress();
if (udtRemoteAddress != null) {
InetAddress udtRemoteServer = udtRemoteAddress.getAddress();
RegistrationWrapper<C> registrationWrapper2 = this.registrationWrapper;
try {
IntMap<MetaChannel> channelMap = registrationWrapper2.getAndLockChannelMap();
Entries<MetaChannel> entries = channelMap.entries();
while (entries.hasNext()) {
MetaChannel metaChannel = entries.next().value;
// associate TCP and UDP!
InetAddress tcpRemoteServer = ((InetSocketAddress) metaChannel.tcpChannel.remoteAddress()).getAddress();
if (checkEqual(tcpRemoteServer, udtRemoteServer)) {
channelMap.put(channel.hashCode(), metaChannel);
metaChannel.udtChannel = channel;
success = true;
// only allow one server per registration!
break;
}
}
} finally {
registrationWrapper2.releaseChannelMap();
}
boolean success = registrationWrapper.associateChannels(channel, udtRemoteServer, true);
if (!success) {
throw new IOException("UDT cannot connect to a remote server before TCP is established!");
}
@ -126,15 +102,8 @@ class RegistrationRemoteHandlerClientUDT<C extends Connection> extends Registrat
Channel channel = context.channel();
// if we also have a UDP channel, we will receive the "connected" message on UDP (otherwise it will be on TCP)
MetaChannel metaChannel = null;
RegistrationWrapper<C> registrationWrapper2 = this.registrationWrapper;
try {
IntMap<MetaChannel> channelMap = registrationWrapper2.getAndLockChannelMap();
metaChannel = channelMap.get(channel.hashCode());
} finally {
registrationWrapper2.releaseChannelMap();
}
MetaChannel metaChannel = registrationWrapper2.getChannel(channel.hashCode());
Logger logger2 = this.logger;
if (metaChannel != null) {
@ -153,14 +122,7 @@ class RegistrationRemoteHandlerClientUDT<C extends Connection> extends Registrat
}
Integer connectionID = OptimizeUtilsByteArray.readInt(payload, true);
MetaChannel metaChannel2 = null;
try {
IntMap<MetaChannel> channelMap = registrationWrapper2.getAndLockChannelMap();
metaChannel2 = channelMap.get(connectionID);
} finally {
registrationWrapper2.releaseChannelMap();
}
MetaChannel metaChannel2 = registrationWrapper2.getChannel(connectionID);
if (metaChannel2 != null) {
// hooray! we are successful

View File

@ -22,9 +22,7 @@ import dorkbox.network.connection.RegistrationWrapper;
import dorkbox.network.connection.registration.MetaChannel;
import dorkbox.network.connection.registration.Registration;
import dorkbox.network.util.CryptoSerializationManager;
import dorkbox.util.MathUtil;
import dorkbox.util.bytes.OptimizeUtilsByteArray;
import dorkbox.util.collections.IntMap;
import dorkbox.util.crypto.CryptoAES;
import dorkbox.util.crypto.CryptoECC;
import dorkbox.util.serialization.EccPublicKeySerializer;
@ -106,12 +104,7 @@ class RegistrationRemoteHandlerServerTCP<C extends Connection> extends Registrat
MetaChannel metaChannel = new MetaChannel();
metaChannel.tcpChannel = channel;
try {
IntMap<MetaChannel> channelMap = this.registrationWrapper.getAndLockChannelMap();
channelMap.put(channel.hashCode(), metaChannel);
} finally {
this.registrationWrapper.releaseChannelMap();
}
this.registrationWrapper.addChannel(channel.hashCode(), metaChannel);
Logger logger2 = this.logger;
if (logger2.isTraceEnabled()) {
@ -134,13 +127,7 @@ class RegistrationRemoteHandlerServerTCP<C extends Connection> extends Registrat
if (message instanceof Registration) {
Registration registration = (Registration) message;
MetaChannel metaChannel = null;
try {
IntMap<MetaChannel> channelMap = registrationWrapper2.getAndLockChannelMap();
metaChannel = channelMap.get(channel.hashCode());
} finally {
registrationWrapper2.releaseChannelMap();
}
MetaChannel metaChannel = registrationWrapper2.getChannel(channel.hashCode());
// make sure this connection was properly registered in the map. (IT SHOULD BE)
Logger logger2 = this.logger;
@ -177,21 +164,10 @@ class RegistrationRemoteHandlerServerTCP<C extends Connection> extends Registrat
}
Integer connectionID = MathUtil.randomInt();
// if I'm unlucky, keep from confusing connections!
try {
IntMap<MetaChannel> channelMap = registrationWrapper2.getAndLockChannelMap();
while (channelMap.containsKey(connectionID)) {
connectionID = MathUtil.randomInt();
}
metaChannel.connectionID = connectionID;
channelMap.put(connectionID, metaChannel);
} finally {
registrationWrapper2.releaseChannelMap();
}
Integer connectionID = registrationWrapper2.initializeChannel(metaChannel);
Registration register = new Registration();

View File

@ -18,6 +18,7 @@ package dorkbox.network.connection.registration.remote;
import dorkbox.network.Broadcast;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.ConnectionImpl;
import dorkbox.network.connection.EndPoint;
import dorkbox.network.connection.KryoCryptoSerializationManager;
import dorkbox.network.connection.RegistrationWrapper;
import dorkbox.network.connection.registration.MetaChannel;
@ -25,8 +26,6 @@ import dorkbox.network.connection.registration.Registration;
import dorkbox.network.connection.wrapper.UdpWrapper;
import dorkbox.network.util.CryptoSerializationManager;
import dorkbox.util.bytes.OptimizeUtilsByteArray;
import dorkbox.util.collections.IntMap;
import dorkbox.util.collections.IntMap.Entries;
import dorkbox.util.crypto.CryptoAES;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
@ -165,14 +164,15 @@ class RegistrationRemoteHandlerServerUDP<C extends Connection> extends MessageTo
private
void receivedUDP(final ChannelHandlerContext context,
final Channel channel,
final ByteBuf data,
final ByteBuf message,
final InetSocketAddress udpRemoteAddress) throws Exception {
// registration is the ONLY thing NOT encrypted
Logger logger2 = this.logger;
RegistrationWrapper<C> registrationWrapper2 = this.registrationWrapper;
CryptoSerializationManager serializationManager2 = this.serializationManager;
if (KryoCryptoSerializationManager.isEncrypted(data)) {
if (KryoCryptoSerializationManager.isEncrypted(message)) {
// we need to FORWARD this message "down the pipeline".
ConnectionImpl connection = registrationWrapper2.getServerUDP(udpRemoteAddress);
@ -182,7 +182,7 @@ class RegistrationRemoteHandlerServerUDP<C extends Connection> extends MessageTo
Object object;
try {
object = serializationManager2.readWithCrypto(connection, data, data.writerIndex());
object = serializationManager2.readWithCrypto(connection, message, message.writerIndex());
} catch (Exception e) {
logger2.error("UDP unable to deserialize buffer", e);
shutdown(registrationWrapper2, channel);
@ -202,7 +202,7 @@ class RegistrationRemoteHandlerServerUDP<C extends Connection> extends MessageTo
Object object;
try {
object = serializationManager2.read(data, data.writerIndex());
object = serializationManager2.read(message, message.writerIndex());
} catch (Exception e) {
logger2.error("UDP unable to deserialize buffer", e);
shutdown(registrationWrapper2, channel);
@ -210,40 +210,11 @@ class RegistrationRemoteHandlerServerUDP<C extends Connection> extends MessageTo
}
if (object instanceof Registration) {
boolean matches = false;
MetaChannel metaChannel = null;
// find out and make sure that UDP and TCP are talking to the same server
InetAddress udpRemoteServer = udpRemoteAddress.getAddress();
MetaChannel metaChannel = registrationWrapper2.getAssociatedChannel_UDP(udpRemoteServer);
try {
// find out and make sure that UDP and TCP are talking to the same server
InetAddress udpRemoteServer = udpRemoteAddress.getAddress();
IntMap<MetaChannel> channelMap = registrationWrapper2.getAndLockChannelMap();
Entries<MetaChannel> entries = channelMap.entries();
while (entries.hasNext()) {
metaChannel = entries.next().value;
// only look at connections that do not have UDP already setup.
if (metaChannel.udpChannel == null) {
InetSocketAddress tcpRemote = (InetSocketAddress) metaChannel.tcpChannel.remoteAddress();
InetAddress tcpRemoteAddress = tcpRemote.getAddress();
if (RegistrationRemoteHandler.checkEqual(tcpRemoteAddress, udpRemoteServer)) {
matches = true;
break;
}
else {
logger2.error("Mismatch UDP and TCP client addresses! UDP: {} TCP: {}", udpRemoteServer, tcpRemoteAddress);
shutdown(registrationWrapper2, channel);
return;
}
}
}
} finally {
registrationWrapper2.releaseChannelMap();
}
if (matches) {
if (metaChannel != null) {
// associate TCP and UDP!
metaChannel.udpChannel = channel;
metaChannel.udpRemoteAddress = udpRemoteAddress;
@ -269,7 +240,7 @@ class RegistrationRemoteHandlerServerUDP<C extends Connection> extends MessageTo
}
else {
// if we get here, there was a failure!
logger2.error("Error trying to register UDP without udp specified! UDP: {}", udpRemoteAddress);
logger2.error("Error trying to register UDP with incorrect udp specified! UDP: {}", udpRemoteAddress);
shutdown(registrationWrapper2, channel);
}
}
@ -295,21 +266,7 @@ class RegistrationRemoteHandlerServerUDP<C extends Connection> extends MessageTo
// also, once we notify, we unregister this.
if (registrationWrapper != null) {
try {
IntMap<MetaChannel> channelMap = registrationWrapper.getAndLockChannelMap();
Entries<MetaChannel> entries = channelMap.entries();
while (entries.hasNext()) {
MetaChannel metaChannel = entries.next().value;
if (metaChannel.localChannel == channel || metaChannel.tcpChannel == channel || metaChannel.udpChannel == channel) {
entries.remove();
metaChannel.close();
return metaChannel;
}
}
} finally {
registrationWrapper.releaseChannelMap();
}
return registrationWrapper.closeChannel(channel, EndPoint.maxShutdownWaitTimeInMilliSeconds);
}
return null;

View File

@ -21,12 +21,9 @@ import dorkbox.network.connection.registration.MetaChannel;
import dorkbox.network.connection.registration.Registration;
import dorkbox.network.util.CryptoSerializationManager;
import dorkbox.util.bytes.OptimizeUtilsByteArray;
import dorkbox.util.collections.IntMap;
import dorkbox.util.collections.IntMap.Entries;
import dorkbox.util.crypto.CryptoAES;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import java.net.InetAddress;
@ -80,41 +77,8 @@ class RegistrationRemoteHandlerServerUDT<C extends Connection> extends Registrat
// find out and make sure that UDP and TCP are talking to the same server
InetAddress udtRemoteAddress = ((InetSocketAddress) channel.remoteAddress()).getAddress();
boolean matches = false;
MetaChannel metaChannel = null;
try {
IntMap<MetaChannel> channelMap = registrationWrapper2.getAndLockChannelMap();
Entries<MetaChannel> entries = channelMap.entries();
while (entries.hasNext()) {
metaChannel = entries.next().value;
// only look at connections that do not have UDT already setup.
if (metaChannel.udtChannel == null) {
InetSocketAddress tcpRemote = (InetSocketAddress) metaChannel.tcpChannel.remoteAddress();
InetAddress tcpRemoteAddress = tcpRemote.getAddress();
if (checkEqual(tcpRemoteAddress, udtRemoteAddress)) {
matches = true;
}
else {
if (logger2.isErrorEnabled()) {
logger2.error(this.name,
"Mismatch UDT and TCP client addresses! UDP: {} TCP: {}",
udtRemoteAddress,
tcpRemoteAddress);
}
shutdown(registrationWrapper2, channel);
ReferenceCountUtil.release(message);
return;
}
}
}
} finally {
registrationWrapper2.releaseChannelMap();
}
if (matches) {
MetaChannel metaChannel = registrationWrapper2.getAssociatedChannel_UDT(udtRemoteAddress);
if (metaChannel != null) {
// associate TCP and UDT!
metaChannel.udtChannel = channel;
@ -142,15 +106,13 @@ class RegistrationRemoteHandlerServerUDT<C extends Connection> extends Registrat
if (logger2.isTraceEnabled()) {
logger2.trace("Register UDT connection from {}", udtRemoteAddress);
}
ReferenceCountUtil.release(message);
}
else {
// if we get here, there was a failure!
if (logger2.isErrorEnabled()) {
logger2.error("Error trying to register UDT without udt specified! UDT: {}", udtRemoteAddress);
logger2.error("Error trying to register UDT with incorrect udt specified! UDT: {}", udtRemoteAddress);
}
shutdown(registrationWrapper2, channel);
ReferenceCountUtil.release(message);
}
}
else {
@ -158,7 +120,6 @@ class RegistrationRemoteHandlerServerUDT<C extends Connection> extends Registrat
logger2.error("UDT attempting to spoof client! Unencrypted packet other than registration received.");
}
shutdown(registrationWrapper2, channel);
ReferenceCountUtil.release(message);
}
}
}