Code cleanup

This commit is contained in:
nathan 2015-07-18 00:31:44 +02:00
parent b075b34e74
commit 066a307c5f
7 changed files with 211 additions and 158 deletions

View File

@ -1,114 +1,129 @@
package dorkbox.network.connection.wrapper; package dorkbox.network.connection.wrapper;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.channel.local.LocalAddress;
import java.util.concurrent.atomic.AtomicBoolean;
import org.bouncycastle.crypto.params.ParametersWithIV;
import dorkbox.network.connection.ConnectionPointWriter;
import dorkbox.network.connection.Connection; import dorkbox.network.connection.Connection;
import dorkbox.network.connection.ConnectionPointWriter;
import dorkbox.network.connection.EndPoint; import dorkbox.network.connection.EndPoint;
import dorkbox.network.connection.ISessionManager; import dorkbox.network.connection.ISessionManager;
import dorkbox.network.connection.registration.MetaChannel; import dorkbox.network.connection.registration.MetaChannel;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.channel.local.LocalAddress;
import org.bouncycastle.crypto.params.ParametersWithIV;
public class ChannelLocalWrapper implements ChannelWrapper, ConnectionPointWriter { import java.util.concurrent.atomic.AtomicBoolean;
public
class ChannelLocalWrapper implements ChannelWrapper, ConnectionPointWriter {
private final Channel channel; private final Channel channel;
private String remoteAddress; private String remoteAddress;
private AtomicBoolean shouldFlush = new AtomicBoolean(false); private final AtomicBoolean shouldFlush = new AtomicBoolean(false);
public ChannelLocalWrapper(MetaChannel metaChannel) { public
ChannelLocalWrapper(MetaChannel metaChannel) {
this.channel = metaChannel.localChannel; this.channel = metaChannel.localChannel;
} }
/**
* Initialize the connection with any extra info that is needed but was unavailable at the channel construction.
*/
@Override
public final void init() {
this.remoteAddress = ((LocalAddress) this.channel.remoteAddress()).id();
}
/** /**
* Write an object to the underlying channel * Write an object to the underlying channel
*/ */
@Override @Override
public void write(Object object) { public
void write(Object object) {
this.channel.write(object); this.channel.write(object);
this.shouldFlush.set(true); this.shouldFlush.set(true);
} }
@Override @Override
public void waitForWriteToComplete() { public
void waitForWriteToComplete() {
// it's immediate, since it's in the same JVM. // it's immediate, since it's in the same JVM.
} }
@Override
public
ConnectionPointWriter tcp() {
return this;
}
@Override
public
ConnectionPointWriter udp() {
return this;
}
@Override
public
ConnectionPointWriter udt() {
return this;
}
/**
* Initialize the connection with any extra info that is needed but was unavailable at the channel construction.
*/
@Override
public final
void init() {
this.remoteAddress = ((LocalAddress) this.channel.remoteAddress()).id();
}
/** /**
* Flushes the contents of the LOCAL pipes to the actual transport. * Flushes the contents of the LOCAL pipes to the actual transport.
*/ */
@Override @Override
public void flush() { public
void flush() {
if (this.shouldFlush.compareAndSet(true, false)) { if (this.shouldFlush.compareAndSet(true, false)) {
this.channel.flush(); this.channel.flush();
} }
} }
@Override @Override
public void close(Connection connection, ISessionManager sessionManager) { public
EventLoop getEventLoop() {
return this.channel.eventLoop();
}
@Override
public
ParametersWithIV cryptoParameters() {
return null;
}
@Override
public final
String getRemoteHost() {
return this.remoteAddress;
}
@Override
public
void close(Connection connection, ISessionManager sessionManager) {
long maxShutdownWaitTimeInMilliSeconds = EndPoint.maxShutdownWaitTimeInMilliSeconds; long maxShutdownWaitTimeInMilliSeconds = EndPoint.maxShutdownWaitTimeInMilliSeconds;
this.shouldFlush.set(false); this.shouldFlush.set(false);
// Wait until the connection is closed or the connection attempt fails. // Wait until the connection is closed or the connection attempt fails.
this.channel.close().awaitUninterruptibly(maxShutdownWaitTimeInMilliSeconds); this.channel.close()
.awaitUninterruptibly(maxShutdownWaitTimeInMilliSeconds);
} }
@Override @Override
public ConnectionPointWriter tcp() { public
return this; int id() {
}
@Override
public ConnectionPointWriter udp() {
return this;
}
@Override
public ConnectionPointWriter udt() {
return this;
}
@Override
public EventLoop getEventLoop() {
return this.channel.eventLoop();
}
@Override
public ParametersWithIV cryptoParameters() {
return null;
}
@Override
public final String getRemoteHost() {
return this.remoteAddress;
}
@Override
public int id() {
return this.channel.hashCode(); return this.channel.hashCode();
} }
@Override @Override
public int hashCode() { public
int hashCode() {
return this.channel.hashCode(); return this.channel.hashCode();
} }
@Override @Override
public boolean equals(Object obj) { public
boolean equals(Object obj) {
if (this == obj) { if (this == obj) {
return true; return true;
} }
@ -123,7 +138,8 @@ public class ChannelLocalWrapper implements ChannelWrapper, ConnectionPointWrite
if (other.remoteAddress != null) { if (other.remoteAddress != null) {
return false; return false;
} }
} else if (!this.remoteAddress.equals(other.remoteAddress)) { }
else if (!this.remoteAddress.equals(other.remoteAddress)) {
return false; return false;
} }
return true; return true;

View File

@ -1,21 +1,22 @@
package dorkbox.network.connection.wrapper; package dorkbox.network.connection.wrapper;
import dorkbox.network.connection.ConnectionPointWriter;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import dorkbox.network.connection.ConnectionPointWriter; public
class ChannelNetwork implements ConnectionPointWriter {
public class ChannelNetwork implements ConnectionPointWriter { private final Channel channel;
private final AtomicBoolean shouldFlush = new AtomicBoolean(false);
private volatile ChannelFuture lastWriteFuture; private volatile ChannelFuture lastWriteFuture;
private final Channel channel;
private AtomicBoolean shouldFlush = new AtomicBoolean(false);
public ChannelNetwork(Channel channel) { public
ChannelNetwork(Channel channel) {
this.channel = channel; this.channel = channel;
} }
@ -23,7 +24,8 @@ public class ChannelNetwork implements ConnectionPointWriter {
* Write an object to the underlying channel * Write an object to the underlying channel
*/ */
@Override @Override
public void write(Object object) { public
void write(Object object) {
this.lastWriteFuture = this.channel.write(object); this.lastWriteFuture = this.channel.write(object);
this.shouldFlush.set(true); this.shouldFlush.set(true);
} }
@ -33,20 +35,23 @@ public class ChannelNetwork implements ConnectionPointWriter {
* <b>DO NOT use this in the same thread as receiving messages! It will deadlock.</b> * <b>DO NOT use this in the same thread as receiving messages! It will deadlock.</b>
*/ */
@Override @Override
public void waitForWriteToComplete() { public
void waitForWriteToComplete() {
if (this.lastWriteFuture != null) { if (this.lastWriteFuture != null) {
this.lastWriteFuture.awaitUninterruptibly(); this.lastWriteFuture.awaitUninterruptibly();
} }
} }
@Override @Override
public void flush() { public
void flush() {
if (this.shouldFlush.compareAndSet(true, false)) { if (this.shouldFlush.compareAndSet(true, false)) {
this.channel.flush(); this.channel.flush();
} }
} }
public void close(long maxShutdownWaitTimeInMilliSeconds) { public
void close(long maxShutdownWaitTimeInMilliSeconds) {
// Wait until all messages are flushed before closing the channel. // Wait until all messages are flushed before closing the channel.
if (this.lastWriteFuture != null) { if (this.lastWriteFuture != null) {
this.lastWriteFuture.awaitUninterruptibly(maxShutdownWaitTimeInMilliSeconds); this.lastWriteFuture.awaitUninterruptibly(maxShutdownWaitTimeInMilliSeconds);
@ -54,10 +59,12 @@ public class ChannelNetwork implements ConnectionPointWriter {
} }
this.shouldFlush.set(false); this.shouldFlush.set(false);
this.channel.close().awaitUninterruptibly(maxShutdownWaitTimeInMilliSeconds); this.channel.close()
.awaitUninterruptibly(maxShutdownWaitTimeInMilliSeconds);
} }
public int id() { public
int id() {
return this.channel.hashCode(); return this.channel.hashCode();
} }
} }

View File

@ -1,18 +1,19 @@
package dorkbox.network.connection.wrapper; package dorkbox.network.connection.wrapper;
import dorkbox.network.connection.UdpServer;
import dorkbox.network.util.exceptions.NetException;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import dorkbox.network.connection.UdpServer; public
import dorkbox.network.util.exceptions.NetException; class ChannelNetworkUdp extends ChannelNetwork {
public class ChannelNetworkUdp extends ChannelNetwork {
private final InetSocketAddress udpRemoteAddress; private final InetSocketAddress udpRemoteAddress;
private final UdpServer udpServer; private final UdpServer udpServer;
public ChannelNetworkUdp(Channel channel, InetSocketAddress udpRemoteAddress, UdpServer udpServer) { public
ChannelNetworkUdp(Channel channel, InetSocketAddress udpRemoteAddress, UdpServer udpServer) {
super(channel); super(channel);
if (udpRemoteAddress == null) { if (udpRemoteAddress == null) {
@ -24,13 +25,15 @@ public class ChannelNetworkUdp extends ChannelNetwork {
} }
@Override @Override
public void write(Object object) { public
void write(Object object) {
// this shoots out the SERVER pipeline, which is SLIGHTLY different! // this shoots out the SERVER pipeline, which is SLIGHTLY different!
super.write(new UdpWrapper(object, udpRemoteAddress)); super.write(new UdpWrapper(object, udpRemoteAddress));
} }
@Override @Override
public void close(long maxShutdownWaitTimeInMilliSeconds) { public
void close(long maxShutdownWaitTimeInMilliSeconds) {
// we ONLY want to close the UDP channel when we are STOPPING the server, otherwise we close the UDP channel // we ONLY want to close the UDP channel when we are STOPPING the server, otherwise we close the UDP channel
// that listens for new connections! SEE Server.close(). // that listens for new connections! SEE Server.close().
// super.close(maxShutdownWaitTimeInMilliSeconds); // super.close(maxShutdownWaitTimeInMilliSeconds);

View File

@ -1,21 +1,16 @@
package dorkbox.network.connection.wrapper; package dorkbox.network.connection.wrapper;
import dorkbox.network.connection.*;
import dorkbox.network.connection.registration.MetaChannel;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
import java.net.InetSocketAddress;
import org.bouncycastle.crypto.params.KeyParameter; import org.bouncycastle.crypto.params.KeyParameter;
import org.bouncycastle.crypto.params.ParametersWithIV; import org.bouncycastle.crypto.params.ParametersWithIV;
import dorkbox.network.connection.ConnectionPointWriter; import java.net.InetSocketAddress;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.EndPoint;
import dorkbox.network.connection.ISessionManager;
import dorkbox.network.connection.UdpServer;
import dorkbox.network.connection.registration.MetaChannel;
public class ChannelNetworkWrapper implements ChannelWrapper { public
class ChannelNetworkWrapper implements ChannelWrapper {
private final ChannelNetwork tcp; private final ChannelNetwork tcp;
private final ChannelNetwork udp; private final ChannelNetwork udp;
@ -33,7 +28,8 @@ public class ChannelNetworkWrapper implements ChannelWrapper {
/** /**
* @param udpServer is null when created by the client, non-null when created by the server * @param udpServer is null when created by the client, non-null when created by the server
*/ */
public ChannelNetworkWrapper(MetaChannel metaChannel, UdpServer udpServer) { public
ChannelNetworkWrapper(MetaChannel metaChannel, UdpServer udpServer) {
Channel tcpChannel = metaChannel.tcpChannel; Channel tcpChannel = metaChannel.tcpChannel;
this.eventLoop = tcpChannel.eventLoop(); this.eventLoop = tcpChannel.eventLoop();
@ -43,21 +39,25 @@ public class ChannelNetworkWrapper implements ChannelWrapper {
if (metaChannel.udpChannel != null) { if (metaChannel.udpChannel != null) {
if (metaChannel.udpRemoteAddress != null) { if (metaChannel.udpRemoteAddress != null) {
this.udp = new ChannelNetworkUdp(metaChannel.udpChannel, metaChannel.udpRemoteAddress, udpServer); this.udp = new ChannelNetworkUdp(metaChannel.udpChannel, metaChannel.udpRemoteAddress, udpServer);
} else { }
else {
this.udp = new ChannelNetwork(metaChannel.udpChannel); this.udp = new ChannelNetwork(metaChannel.udpChannel);
} }
} else { }
else {
this.udp = null; this.udp = null;
} }
if (metaChannel.udtChannel != null) { if (metaChannel.udtChannel != null) {
this.udt = new ChannelNetwork(metaChannel.udtChannel); this.udt = new ChannelNetwork(metaChannel.udtChannel);
} else { }
else {
this.udt = null; this.udt = null;
} }
this.remoteAddress = ((InetSocketAddress)tcpChannel.remoteAddress()).getAddress().getHostAddress(); this.remoteAddress = ((InetSocketAddress) tcpChannel.remoteAddress()).getAddress()
.getHostAddress();
this.remotePublicKeyChanged = metaChannel.changedRemoteKey; this.remotePublicKeyChanged = metaChannel.changedRemoteKey;
// AES key & IV (only for networked connections) // AES key & IV (only for networked connections)
@ -65,23 +65,44 @@ public class ChannelNetworkWrapper implements ChannelWrapper {
// TODO: have to be able to get a NEW IV, so we can rotate keys! // TODO: have to be able to get a NEW IV, so we can rotate keys!
} }
public final
boolean remoteKeyChanged() {
return this.remotePublicKeyChanged;
}
@Override
public
ConnectionPointWriter tcp() {
return this.tcp;
}
@Override
public
ConnectionPointWriter udp() {
return this.udp;
}
@Override
public
ConnectionPointWriter udt() {
return this.udt;
}
/** /**
* Initialize the connection with any extra info that is needed but was unavailable at the channel construction. * Initialize the connection with any extra info that is needed but was unavailable at the channel construction.
*/ */
@Override @Override
public final void init() { public final
void init() {
// nothing to do. // nothing to do.
} }
public final boolean remoteKeyChanged() {
return this.remotePublicKeyChanged;
}
/** /**
* Flushes the contents of the TCP/UDP/UDT/etc pipes to the actual transport. * Flushes the contents of the TCP/UDP/UDT/etc pipes to the actual transport.
*/ */
@Override @Override
public void flush() { public
void flush() {
this.tcp.flush(); this.tcp.flush();
if (this.udp != null) { if (this.udp != null) {
@ -94,38 +115,27 @@ public class ChannelNetworkWrapper implements ChannelWrapper {
} }
@Override @Override
public EventLoop getEventLoop() { public
EventLoop getEventLoop() {
return this.eventLoop; return this.eventLoop;
} }
@Override @Override
public ParametersWithIV cryptoParameters() { public
ParametersWithIV cryptoParameters() {
return this.cryptoAesKeyAndIV; return this.cryptoAesKeyAndIV;
} }
@Override @Override
public ConnectionPointWriter tcp() { public
return this.tcp; String getRemoteHost() {
}
@Override
public ConnectionPointWriter udp() {
return this.udp;
}
@Override
public ConnectionPointWriter udt() {
return this.udt;
}
@Override
public String getRemoteHost() {
return this.remoteAddress; return this.remoteAddress;
} }
@Override @Override
public void close(final Connection connection, final ISessionManager sessionManager) { public
void close(final Connection connection, final ISessionManager sessionManager) {
long maxShutdownWaitTimeInMilliSeconds = EndPoint.maxShutdownWaitTimeInMilliSeconds; long maxShutdownWaitTimeInMilliSeconds = EndPoint.maxShutdownWaitTimeInMilliSeconds;
this.tcp.close(maxShutdownWaitTimeInMilliSeconds); this.tcp.close(maxShutdownWaitTimeInMilliSeconds);
@ -143,17 +153,20 @@ public class ChannelNetworkWrapper implements ChannelWrapper {
} }
@Override @Override
public String toString() { public
return "NetworkConnection [" + getRemoteHost() + "]"; int id() {
}
@Override
public int id() {
return this.tcp.id(); return this.tcp.id();
} }
@Override @Override
public boolean equals(Object obj) { public
int hashCode() {
return this.remoteAddress.hashCode();
}
@Override
public
boolean equals(Object obj) {
if (this == obj) { if (this == obj) {
return true; return true;
} }
@ -169,14 +182,16 @@ public class ChannelNetworkWrapper implements ChannelWrapper {
if (other.remoteAddress != null) { if (other.remoteAddress != null) {
return false; return false;
} }
} else if (!this.remoteAddress.equals(other.remoteAddress)) { }
else if (!this.remoteAddress.equals(other.remoteAddress)) {
return false; return false;
} }
return true; return true;
} }
@Override @Override
public int hashCode() { public
return this.remoteAddress.hashCode(); String toString() {
return "NetworkConnection [" + getRemoteHost() + "]";
} }
} }

View File

@ -3,21 +3,26 @@ package dorkbox.network.connection.wrapper;
import dorkbox.network.connection.ConnectionPoint; import dorkbox.network.connection.ConnectionPoint;
import dorkbox.network.connection.ConnectionPointWriter; import dorkbox.network.connection.ConnectionPointWriter;
public class ChannelNull implements ConnectionPointWriter { public
class ChannelNull implements ConnectionPointWriter {
private static final ConnectionPoint INSTANCE = new ChannelNull(); private static final ConnectionPoint INSTANCE = new ChannelNull();
public static ConnectionPoint get() {
public static
ConnectionPoint get() {
return INSTANCE; return INSTANCE;
} }
private ChannelNull() { private
ChannelNull() {
} }
/** /**
* Write an object to the underlying channel * Write an object to the underlying channel
*/ */
@Override @Override
public void write(Object object) { public
void write(Object object) {
} }
/** /**
@ -25,10 +30,12 @@ public class ChannelNull implements ConnectionPointWriter {
* <b>DO NOT use this in the same thread as receiving messages! It will deadlock.</b> * <b>DO NOT use this in the same thread as receiving messages! It will deadlock.</b>
*/ */
@Override @Override
public void waitForWriteToComplete() { public
void waitForWriteToComplete() {
} }
@Override @Override
public void flush() { public
void flush() {
} }
} }

View File

@ -1,46 +1,47 @@
package dorkbox.network.connection.wrapper; package dorkbox.network.connection.wrapper;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.ConnectionPointWriter;
import dorkbox.network.connection.ISessionManager;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
import org.bouncycastle.crypto.params.ParametersWithIV; import org.bouncycastle.crypto.params.ParametersWithIV;
import dorkbox.network.connection.ConnectionPointWriter; public
import dorkbox.network.connection.Connection; interface ChannelWrapper {
import dorkbox.network.connection.ISessionManager;
public interface ChannelWrapper { ConnectionPointWriter tcp();
public ConnectionPointWriter tcp(); ConnectionPointWriter udp();
public ConnectionPointWriter udp();
public ConnectionPointWriter udt(); ConnectionPointWriter udt();
/** /**
* Initialize the connection with any extra info that is needed but was unavailable at the channel construction. * Initialize the connection with any extra info that is needed but was unavailable at the channel construction.
*/ */
public void init(); void init();
/** /**
* Flushes the contents of the TCP/UDP/UDT/etc pipes to the actual transport. * Flushes the contents of the TCP/UDP/UDT/etc pipes to the actual transport.
*/ */
public void flush(); void flush();
public EventLoop getEventLoop(); EventLoop getEventLoop();
public ParametersWithIV cryptoParameters(); ParametersWithIV cryptoParameters();
/** /**
* @return the remote host (can be local, tcp, udp, udt) * @return the remote host (can be local, tcp, udp, udt)
*/ */
public String getRemoteHost(); String getRemoteHost();
public void close(final Connection connection, final ISessionManager sessionManager); void close(final Connection connection, final ISessionManager sessionManager);
int id();
@Override @Override
public String toString(); boolean equals(Object obj);
public int id();
@Override @Override
public boolean equals(Object obj); String toString();
} }

View File

@ -2,21 +2,25 @@ package dorkbox.network.connection.wrapper;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
public class UdpWrapper { public
class UdpWrapper {
private final Object object; private final Object object;
private final InetSocketAddress remoteAddress; private final InetSocketAddress remoteAddress;
public UdpWrapper(Object object, InetSocketAddress remoteAddress2) { public
UdpWrapper(Object object, InetSocketAddress remoteAddress2) {
this.object = object; this.object = object;
this.remoteAddress = remoteAddress2; this.remoteAddress = remoteAddress2;
} }
public Object object() { public
Object object() {
return this.object; return this.object;
} }
public InetSocketAddress remoteAddress() { public
InetSocketAddress remoteAddress() {
return this.remoteAddress; return this.remoteAddress;
} }
} }