Code polish. Changed ByteArrayWrapper to static access, so the construction is very clear

This commit is contained in:
nathan 2014-08-26 14:24:31 +02:00
parent 93c7d78ab6
commit 1be37a6c47
18 changed files with 502 additions and 176 deletions

View File

@ -58,12 +58,12 @@ public interface Connection {
/**
* @return true if this connection is also configured to use UDP
*/
public boolean hasUdp();
public boolean hasUDP();
/**
* @return true if this connection is also configured to use UDT
*/
public boolean hasUdt();
public boolean hasUDT();
/**
* Expose methods to send objects to a destination (such as a custom object or a standard ping)

View File

@ -186,11 +186,11 @@ public class ConnectionImpl extends ChannelInboundHandlerAdapter
*/
public final void ping0(PingMessage ping) {
if (this.channelWrapper.udp() != null) {
send().UDP(ping).flush();
UDP(ping).flush();
} else if (this.channelWrapper.udt() != null) {
send().UDT(ping).flush();
UDT(ping).flush();
} else {
send().TCP(ping).flush();
TCP(ping).flush();
}
}
@ -212,7 +212,7 @@ public class ConnectionImpl extends ChannelInboundHandlerAdapter
* @return true if this connection is also configured to use UDP
*/
@Override
public final boolean hasUdp() {
public final boolean hasUDP() {
return this.channelWrapper.udp() != null;
}
@ -220,7 +220,7 @@ public class ConnectionImpl extends ChannelInboundHandlerAdapter
* @return true if this connection is also configured to use UDT
*/
@Override
public final boolean hasUdt() {
public final boolean hasUDT() {
return this.channelWrapper.udt() != null;
}

View File

@ -3,10 +3,15 @@ package dorkbox.network.connection;
public interface ConnectionPoint {
/**
* Writes data to the pipe. <b>DOES NOT FLUSH</b> the pipes to the wire!
* Writes data to the pipe. <b>DOES NOT FLUSH</b> the pipe to the wire!
*/
public void write(Object object);
/**
* Waits for the last write to complete. Useful when sending large amounts of data at once.
*/
public void waitForWriteToComplete();
/**
* Flushes the contents of the TCP/UDP/UDT/etc pipes to the wire.
*/

View File

@ -130,7 +130,7 @@ public class EndPointWithSerialization extends EndPoint {
* @param bridge null when retrieving the subclass type (internal use only). Non-null when creating a new (and real) connection.
* @return a new network connection
*/
public ConnectionImpl newConnection(String name) {
public Connection newConnection(String name) {
return new ConnectionImpl(name);
}
@ -142,8 +142,8 @@ public class EndPointWithSerialization extends EndPoint {
*
* @param metaChannel can be NULL (when getting the baseClass)
*/
protected final ConnectionImpl connection0(MetaChannel metaChannel) {
ConnectionImpl connection;
protected final Connection connection0(MetaChannel metaChannel) {
Connection connection;
// setup the extras needed by the network connection.
// These properties are ASSGINED in the same thread that CREATED the object. Only the AES info needs to be

View File

@ -125,7 +125,7 @@ class PropertyStore extends SettingsStore {
public synchronized ECPublicKeyParameters getRegisteredServerKey(byte[] hostAddress) throws dorkbox.network.util.exceptions.SecurityException {
checkAccess(RegistrationWrapper.class);
return this.props.registeredServer.get(new ByteArrayWrapper(hostAddress));
return this.props.registeredServer.get(ByteArrayWrapper.noCopy(hostAddress));
}
/**
@ -135,7 +135,7 @@ class PropertyStore extends SettingsStore {
public synchronized void addRegisteredServerKey(byte[] hostAddress, ECPublicKeyParameters publicKey) throws dorkbox.network.util.exceptions.SecurityException {
checkAccess(RegistrationWrapper.class);
this.props.registeredServer.put(new ByteArrayWrapper(hostAddress), publicKey);
this.props.registeredServer.put(ByteArrayWrapper.noCopy(hostAddress), publicKey);
this.storage.save();
}
@ -146,7 +146,7 @@ class PropertyStore extends SettingsStore {
public synchronized boolean removeRegisteredServerKey(byte[] hostAddress) throws dorkbox.network.util.exceptions.SecurityException {
checkAccess(RegistrationWrapper.class);
ECPublicKeyParameters remove = this.props.registeredServer.remove(new ByteArrayWrapper(hostAddress));
ECPublicKeyParameters remove = this.props.registeredServer.remove(ByteArrayWrapper.noCopy(hostAddress));
this.storage.save();
return remove != null;

View File

@ -117,7 +117,7 @@ public class RegistrationWrapper implements UdpServer {
*
* @param metaChannel can be NULL (when getting the baseClass)
*/
public ConnectionImpl connection0(MetaChannel metaChannel) {
public Connection connection0(MetaChannel metaChannel) {
if (this.endPoint instanceof EndPointWithSerialization) {
return ((EndPointWithSerialization)this.endPoint).connection0(metaChannel);
}
@ -186,7 +186,7 @@ public class RegistrationWrapper implements UdpServer {
@Override
public final void registerServerUDP(MetaChannel metaChannel) {
if (metaChannel != null && metaChannel.udpRemoteAddress != null) {
this.udpRemoteMap.put(metaChannel.udpRemoteAddress, metaChannel.connection);
this.udpRemoteMap.put(metaChannel.udpRemoteAddress, (ConnectionImpl) metaChannel.connection);
Logger logger2 = this.logger;
if (logger2.isDebugEnabled()) {

View File

@ -6,50 +6,50 @@ import dorkbox.network.connection.Listener;
abstract public class IdleSender<C extends Connection, M> extends Listener<C, M> implements IdleBridge {
boolean started;
volatile boolean started;
IdleListener<C, M> idleListener;
@Override
public void idle(C connection) {
if (!started) {
started = true;
start();
}
if (!this.started) {
this.started = true;
start();
}
M message = next();
if (message == null) {
connection.listeners().remove(this);
} else {
if (idleListener != null) {
idleListener.send(connection, message);
} else {
throw new RuntimeException("Invalid idle listener. Please specify .TCP(), .UDP(), or .UDT()");
}
}
}
@Override
public void TCP() {
idleListener = new IdleListenerTCP<C, M>();
M message = next();
if (message == null) {
connection.listeners().remove(this);
} else {
if (this.idleListener != null) {
this.idleListener.send(connection, message);
} else {
throw new RuntimeException("Invalid idle listener. Please specify .TCP(), .UDP(), or .UDT()");
}
}
}
@Override
@Override
public void TCP() {
this.idleListener = new IdleListenerTCP<C, M>();
}
@Override
public void UDP() {
idleListener = new IdleListenerUDP<C, M>();
}
this.idleListener = new IdleListenerUDP<C, M>();
}
@Override
@Override
public void UDT() {
idleListener = new IdleListenerUDT<C, M>();
}
this.idleListener = new IdleListenerUDT<C, M>();
}
/** Called once, before the first send. Subclasses can override this method to send something so the receiving side expects
* subsequent objects. */
protected void start () {
}
/** Called once, before the first send. Subclasses can override this method to send something so the receiving side expects
* subsequent objects. */
protected void start () {
}
/** Returns the next object to send, or null if no more objects will be sent. */
abstract protected M next ();
/** Returns the next object to send, or null if no more objects will be sent. */
abstract protected M next ();
}

View File

@ -7,7 +7,7 @@ import java.net.InetSocketAddress;
import org.bouncycastle.crypto.AsymmetricCipherKeyPair;
import org.bouncycastle.crypto.params.ECPublicKeyParameters;
import dorkbox.network.connection.ConnectionImpl;
import dorkbox.network.connection.Connection;
public class MetaChannel {
@ -28,7 +28,7 @@ public class MetaChannel {
public Channel udtChannel = null;
public ConnectionImpl connection; // only needed until the connection has been notified.
public Connection connection; // only needed until the connection has been notified.
public ECPublicKeyParameters publicKey; // used for ECC crypto + handshake on NETWORK (remote) connections. This is the remote public key.
@ -43,40 +43,40 @@ public class MetaChannel {
public boolean changedRemoteKey = false;
public void close() {
if (localChannel != null) {
localChannel.close();
if (this.localChannel != null) {
this.localChannel.close();
}
if (tcpChannel != null) {
tcpChannel.close();
if (this.tcpChannel != null) {
this.tcpChannel.close();
}
if (udtChannel != null) {
udtChannel.close();
if (this.udtChannel != null) {
this.udtChannel.close();
}
// only the CLIENT will have this.
if (udpChannel != null && udpRemoteAddress == null) {
udpChannel.close();
if (this.udpChannel != null && this.udpRemoteAddress == null) {
this.udpChannel.close();
}
}
public void close(long maxShutdownWaitTimeInMilliSeconds) {
if (localChannel != null) {
localChannel.close();
if (this.localChannel != null) {
this.localChannel.close();
}
if (tcpChannel != null) {
tcpChannel.close().awaitUninterruptibly(maxShutdownWaitTimeInMilliSeconds);
if (this.tcpChannel != null) {
this.tcpChannel.close().awaitUninterruptibly(maxShutdownWaitTimeInMilliSeconds);
}
if (udtChannel != null) {
udtChannel.close().awaitUninterruptibly(maxShutdownWaitTimeInMilliSeconds);
if (this.udtChannel != null) {
this.udtChannel.close().awaitUninterruptibly(maxShutdownWaitTimeInMilliSeconds);
}
// only the CLIENT will have this.
if (udpChannel != null && udpRemoteAddress == null) {
udpChannel.close().awaitUninterruptibly(maxShutdownWaitTimeInMilliSeconds);
if (this.udpChannel != null && this.udpRemoteAddress == null) {
this.udpChannel.close().awaitUninterruptibly(maxShutdownWaitTimeInMilliSeconds);
}
}
@ -84,10 +84,10 @@ public class MetaChannel {
* Update the TCP round trip time. Make sure to REFRESH this every time you SEND TCP data!!
*/
public void updateTcpRoundTripTime() {
nanoSecBetweenTCP = System.nanoTime() - nanoSecBetweenTCP;
this.nanoSecBetweenTCP = System.nanoTime() - this.nanoSecBetweenTCP;
}
public long getNanoSecBetweenTCP() {
return nanoSecBetweenTCP;
return this.nanoSecBetweenTCP;
}
}

View File

@ -1,10 +1,12 @@
package dorkbox.network.connection.registration.local;
import org.slf4j.Logger;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import org.slf4j.Logger;
import dorkbox.network.connection.ConnectionImpl;
import dorkbox.network.connection.EndPoint;
import dorkbox.network.connection.RegistrationWrapper;
import dorkbox.network.connection.registration.MetaChannel;
@ -42,7 +44,7 @@ public abstract class RegistrationLocalHandler extends RegistrationHandler {
// have to setup connection handler
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(CONNECTION_HANDLER, metaChannel.connection);
pipeline.addLast(CONNECTION_HANDLER, (ConnectionImpl) metaChannel.connection);
}
/**

View File

@ -75,7 +75,7 @@ public abstract class RegistrationRemoteHandler extends RegistrationHandler {
///////////////////////
// DECODE (or upstream)
///////////////////////
pipeline.addFirst(FRAME_AND_KRYO_DECODER, new KryoDecoder(serializationManager)); // cannot be shared because of possible fragmentation.
pipeline.addFirst(FRAME_AND_KRYO_DECODER, new KryoDecoder(this.serializationManager)); // cannot be shared because of possible fragmentation.
// this makes the proper event get raised in the registrationHandler to kill NEW idle connections. Once "connected" they last a lot longer.
// we ALWAYS have this initial IDLE handler, so we don't have to worry about a slow-loris attack against the server.
@ -85,7 +85,7 @@ public abstract class RegistrationRemoteHandler extends RegistrationHandler {
/////////////////////////
// ENCODE (or downstream)
/////////////////////////
pipeline.addFirst(FRAME_AND_KRYO_ENCODER, registrationWrapper.getKryoTcpEncoder()); // this is shared
pipeline.addFirst(FRAME_AND_KRYO_ENCODER, this.registrationWrapper.getKryoTcpEncoder()); // this is shared
}
/**
@ -130,7 +130,7 @@ public abstract class RegistrationRemoteHandler extends RegistrationHandler {
}
stringBuilder.append("]");
logger.debug(stringBuilder.toString());
this.logger.debug(stringBuilder.toString());
}
/**
@ -154,7 +154,7 @@ public abstract class RegistrationRemoteHandler extends RegistrationHandler {
// have to setup AFTER establish connection, data, as we don't want to enable AES until we're ready.
protected final void setupConnectionCrypto(MetaChannel metaChannel) {
if (logger.isDebugEnabled()) {
if (this.logger.isDebugEnabled()) {
String type = "TCP";
if (metaChannel.udpChannel != null) {
type += "/UDP";
@ -165,38 +165,38 @@ public abstract class RegistrationRemoteHandler extends RegistrationHandler {
InetSocketAddress address = (InetSocketAddress)metaChannel.tcpChannel.remoteAddress();
logger.debug("Encrypting {} session with {}", type, address.getAddress());
this.logger.debug("Encrypting {} session with {}", type, address.getAddress());
}
ChannelPipeline pipeline = metaChannel.tcpChannel.pipeline();
int idleTimeout = registrationWrapper.getIdleTimeout();
int idleTimeout = this.registrationWrapper.getIdleTimeout();
// add the new handlers (FORCE encryption and longer IDLE handler)
pipeline.replace(FRAME_AND_KRYO_DECODER, FRAME_AND_KRYO_CRYPTO_DECODER, new KryoDecoderCrypto(serializationManager)); // cannot be shared because of possible fragmentation.
pipeline.replace(FRAME_AND_KRYO_DECODER, FRAME_AND_KRYO_CRYPTO_DECODER, new KryoDecoderCrypto(this.serializationManager)); // cannot be shared because of possible fragmentation.
if (idleTimeout > 0) {
pipeline.replace(IDLE_HANDLER, IDLE_HANDLER_FULL, new IdleStateHandler(0, 0, registrationWrapper.getIdleTimeout(), TimeUnit.MILLISECONDS));
pipeline.replace(IDLE_HANDLER, IDLE_HANDLER_FULL, new IdleStateHandler(0, 0, this.registrationWrapper.getIdleTimeout(), TimeUnit.MILLISECONDS));
} else {
pipeline.remove(IDLE_HANDLER);
}
pipeline.replace(FRAME_AND_KRYO_ENCODER, FRAME_AND_KRYO_CRYPTO_ENCODER, registrationWrapper.getKryoTcpCryptoEncoder()); // this is shared
pipeline.replace(FRAME_AND_KRYO_ENCODER, FRAME_AND_KRYO_CRYPTO_ENCODER, this.registrationWrapper.getKryoTcpCryptoEncoder()); // this is shared
if (metaChannel.udpChannel != null && metaChannel.udpRemoteAddress == null) {
// CLIENT ONLY. The server handles this very differently.
pipeline = metaChannel.udpChannel.pipeline();
pipeline.replace(KRYO_DECODER, KRYO_CRYPTO_DECODER, new KryoDecoderUdpCrypto(serializationManager));
pipeline.replace(KRYO_ENCODER, KRYO_CRYPTO_ENCODER, new KryoEncoderUdpCrypto(serializationManager));
pipeline.replace(KRYO_DECODER, KRYO_CRYPTO_DECODER, new KryoDecoderUdpCrypto(this.serializationManager));
pipeline.replace(KRYO_ENCODER, KRYO_CRYPTO_ENCODER, new KryoEncoderUdpCrypto(this.serializationManager));
}
if (metaChannel.udtChannel != null) {
pipeline = metaChannel.udtChannel.pipeline();
pipeline.replace(FRAME_AND_KRYO_DECODER, FRAME_AND_KRYO_CRYPTO_DECODER, new KryoDecoderCrypto(serializationManager)); // cannot be shared because of possible fragmentation.
pipeline.replace(FRAME_AND_KRYO_DECODER, FRAME_AND_KRYO_CRYPTO_DECODER, new KryoDecoderCrypto(this.serializationManager)); // cannot be shared because of possible fragmentation.
if (idleTimeout > 0) {
pipeline.replace(IDLE_HANDLER, IDLE_HANDLER_FULL, new IdleStateHandler(0, 0, idleTimeout, TimeUnit.MILLISECONDS));
} else {
pipeline.remove(IDLE_HANDLER);
}
pipeline.replace(FRAME_AND_KRYO_ENCODER, FRAME_AND_KRYO_CRYPTO_ENCODER, registrationWrapper.getKryoTcpCryptoEncoder());
pipeline.replace(FRAME_AND_KRYO_ENCODER, FRAME_AND_KRYO_CRYPTO_ENCODER, this.registrationWrapper.getKryoTcpCryptoEncoder());
}
}
@ -225,7 +225,7 @@ public abstract class RegistrationRemoteHandler extends RegistrationHandler {
// add the "connected"/"normal" handler now that we have established a "new" connection.
// This will have state, etc. for this connection.
ConnectionImpl connection = registrationWrapper.connection0(metaChannel);
ConnectionImpl connection = (ConnectionImpl) this.registrationWrapper.connection0(metaChannel);
tcpPipe.addLast(CONNECTION_HANDLER, connection);
if (udpPipe != null) {
@ -246,7 +246,7 @@ public abstract class RegistrationRemoteHandler extends RegistrationHandler {
// now that we are CONNECTED, we want to remove ourselves (and channel ID's) from the map.
// they will be ADDED in another map, in the followup handler!!
try {
IntMap<MetaChannel> channelMap = registrationWrapper.getAndLockChannelMap();
IntMap<MetaChannel> channelMap = this.registrationWrapper.getAndLockChannelMap();
channelMap.remove(metaChannel.tcpChannel.hashCode());
channelMap.remove(metaChannel.connectionID);
@ -272,7 +272,7 @@ public abstract class RegistrationRemoteHandler extends RegistrationHandler {
}
}
} finally {
registrationWrapper.releaseChannelMap();
this.registrationWrapper.releaseChannelMap();
}
if (registerServer) {
@ -280,7 +280,7 @@ public abstract class RegistrationRemoteHandler extends RegistrationHandler {
setupServerUdpConnection(metaChannel);
}
if (logger.isInfoEnabled()) {
if (this.logger.isInfoEnabled()) {
String type = "TCP";
if (metaChannel.udpChannel != null) {
type += "/UDP";
@ -290,7 +290,7 @@ public abstract class RegistrationRemoteHandler extends RegistrationHandler {
}
InetSocketAddress address = (InetSocketAddress)metaChannel.tcpChannel.remoteAddress();
logger.info("Created a {} connection with {}", type, address.getAddress());
this.logger.info("Created a {} connection with {}", type, address.getAddress());
}
}
@ -308,14 +308,14 @@ public abstract class RegistrationRemoteHandler extends RegistrationHandler {
* to the pipeline are finished.
*/
protected final void notifyConnection(MetaChannel metaChannel) {
registrationWrapper.connectionConnected0(metaChannel.connection);
this.registrationWrapper.connectionConnected0(metaChannel.connection);
}
@Override
public final void channelInactive(ChannelHandlerContext context) throws Exception {
Channel channel = context.channel();
logger.info("Closed connection: {}", channel.remoteAddress());
this.logger.info("Closed connection: {}", channel.remoteAddress());
long maxShutdownWaitTimeInMilliSeconds = EndPoint.maxShutdownWaitTimeInMilliSeconds;
// also, once we notify, we unregister this.
@ -323,7 +323,7 @@ public abstract class RegistrationRemoteHandler extends RegistrationHandler {
// on the server, we only get this for TCP events!
try {
IntMap<MetaChannel> channelMap = registrationWrapper.getAndLockChannelMap();
IntMap<MetaChannel> channelMap = this.registrationWrapper.getAndLockChannelMap();
Entries<MetaChannel> entries = channelMap.entries();
while (entries.hasNext()) {
MetaChannel metaChannel = entries.next().value;
@ -336,7 +336,7 @@ public abstract class RegistrationRemoteHandler extends RegistrationHandler {
}
} finally {
registrationWrapper.releaseChannelMap();
this.registrationWrapper.releaseChannelMap();
}
super.channelInactive(context);
@ -346,7 +346,7 @@ public abstract class RegistrationRemoteHandler extends RegistrationHandler {
public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
Channel channel = context.channel();
logger.error("Unexpected exception while trying to send/receive data on Client remote (network) channel. ({})" + System.getProperty("line.separator"), channel.remoteAddress(), cause);
this.logger.error("Unexpected exception while trying to send/receive data on Client remote (network) channel. ({})" + System.getProperty("line.separator"), channel.remoteAddress(), cause);
if (channel.isOpen()) {
channel.close();
}

View File

@ -31,13 +31,21 @@ public class ChannelLocalWrapper implements ChannelWrapper, ConnectionPoint {
*/
@Override
public final void init() {
remoteAddress = ((LocalAddress) this.channel.remoteAddress()).id();
this.remoteAddress = ((LocalAddress) this.channel.remoteAddress()).id();
}
/**
* Write an object to the underlying channel
*/
@Override
public void write(Object object) {
this.channel.write(object);
this.shouldFlush.set(true);
}
@Override
public void write(Object object) {
channel.write(object);
shouldFlush.set(true);
public void waitForWriteToComplete() {
// it's immediate, since it's in the same JVM.
}
/**
@ -45,8 +53,8 @@ public class ChannelLocalWrapper implements ChannelWrapper, ConnectionPoint {
*/
@Override
public void flush() {
if (shouldFlush.compareAndSet(true, false)) {
channel.flush();
if (this.shouldFlush.compareAndSet(true, false)) {
this.channel.flush();
}
}
@ -54,8 +62,10 @@ public class ChannelLocalWrapper implements ChannelWrapper, ConnectionPoint {
public void close(Connection connection, ISessionManager sessionManager) {
long maxShutdownWaitTimeInMilliSeconds = EndPoint.maxShutdownWaitTimeInMilliSeconds;
this.shouldFlush.set(false);
// Wait until the connection is closed or the connection attempt fails.
channel.close().awaitUninterruptibly(maxShutdownWaitTimeInMilliSeconds);
this.channel.close().awaitUninterruptibly(maxShutdownWaitTimeInMilliSeconds);
}
@Override
@ -85,17 +95,17 @@ public class ChannelLocalWrapper implements ChannelWrapper, ConnectionPoint {
@Override
public final String getRemoteHost() {
return remoteAddress;
return this.remoteAddress;
}
@Override
public int id() {
return channel.hashCode();
return this.channel.hashCode();
}
@Override
public int hashCode() {
return channel.hashCode();
return this.channel.hashCode();
}
@Override

View File

@ -2,6 +2,9 @@ package dorkbox.network.connection.wrapper;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import dorkbox.network.connection.ConnectionPoint;
public class ChannelNetwork implements ConnectionPoint {
@ -9,39 +12,52 @@ public class ChannelNetwork implements ConnectionPoint {
private volatile ChannelFuture lastWriteFuture;
private final Channel channel;
private volatile boolean shouldFlush = false;
private AtomicBoolean shouldFlush = new AtomicBoolean(false);
public ChannelNetwork(Channel channel) {
this.channel = channel;
}
/**
* Write an object to the underlying channel
*/
@Override
public void write(Object object) {
shouldFlush = true;
lastWriteFuture = channel.write(object);
this.lastWriteFuture = this.channel.write(object);
this.shouldFlush.set(true);
}
/**
* Waits for the last write to complete. Useful when sending large amounts of data at once.
* <b>DO NOT use this in the same thread as receiving messages! It will deadlock.</b>
*/
@Override
public void waitForWriteToComplete() {
if (this.lastWriteFuture != null) {
this.lastWriteFuture.awaitUninterruptibly();
}
}
@Override
public void flush() {
if (shouldFlush) {
shouldFlush = false;
channel.flush();
if (this.shouldFlush.compareAndSet(true, false)) {
this.channel.flush();
}
}
public void close(long maxShutdownWaitTimeInMilliSeconds) {
// Wait until all messages are flushed before closing the channel.
if (lastWriteFuture != null) {
lastWriteFuture.awaitUninterruptibly(maxShutdownWaitTimeInMilliSeconds);
lastWriteFuture = null;
if (this.lastWriteFuture != null) {
this.lastWriteFuture.awaitUninterruptibly(maxShutdownWaitTimeInMilliSeconds);
this.lastWriteFuture = null;
}
shouldFlush = false;
channel.close().awaitUninterruptibly(maxShutdownWaitTimeInMilliSeconds);
this.shouldFlush.set(false);
this.channel.close().awaitUninterruptibly(maxShutdownWaitTimeInMilliSeconds);
}
public int id() {
return channel.hashCode();
return this.channel.hashCode();
}
}

View File

@ -99,25 +99,39 @@ public abstract class BaseTest {
}
public void stopEndPoints(int stopAfterMillis) {
if (this.timer == null) {
this.timer = new Timer("UnitTest timeout timer");
}
if (stopAfterMillis > 0) {
if (this.timer == null) {
this.timer = new Timer("UnitTest timeout timer");
}
// don't automatically timeout when we are testing.
this.timer.schedule(new TimerTask() {
@Override
public void run () {
synchronized (BaseTest.this.endPoints) {
for (EndPoint endPoint : BaseTest.this.endPoints) {
endPoint.stop();
// don't automatically timeout when we are testing.
this.timer.schedule(new TimerTask() {
@Override
public void run () {
synchronized (BaseTest.this.endPoints) {
for (EndPoint endPoint : BaseTest.this.endPoints) {
endPoint.stop();
}
BaseTest.this.endPoints.clear();
}
BaseTest.this.endPoints.clear();
BaseTest.this.timer.cancel();
BaseTest.this.timer.purge();
BaseTest.this.timer = null;
}
}, stopAfterMillis);
} else {
synchronized (BaseTest.this.endPoints) {
for (EndPoint endPoint : BaseTest.this.endPoints) {
endPoint.stop();
}
BaseTest.this.endPoints.clear();
}
}, stopAfterMillis);
if (BaseTest.this.timer != null) {
BaseTest.this.timer.cancel();
BaseTest.this.timer.purge();
BaseTest.this.timer = null;
}
}
}
public void waitForThreads(int stopAfterSecondsOrMillis) {
@ -138,14 +152,15 @@ public abstract class BaseTest {
TimerTask failTask = null;
if (stopAfterMillis > 0L) {
stopEndPoints(stopAfterMillis);
failTask = new TimerTask() {
@Override
public void run () {
stopEndPoints();
BaseTest.this.fail_check = true;
}
};
this.timer.schedule(failTask, stopAfterMillis+3000L);
this.timer.schedule(failTask, stopAfterMillis+10000L);
}
while (true) {

View File

@ -0,0 +1,281 @@
package dorkbox.network;
import static org.junit.Assert.fail;
import java.util.Arrays;
import dorkbox.network.PingPongTest.TYPE;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.Listener;
import dorkbox.network.connection.idle.IdleBridge;
import dorkbox.network.util.SerializationManager;
import dorkbox.network.util.exceptions.InitializationException;
import dorkbox.network.util.exceptions.SecurityException;
@SuppressWarnings({"rawtypes"})
public class ChunkedDataTest extends BaseTest {
private volatile boolean success = false;
enum ConnectionType {
TCP,
UDP,
UDT
}
// have to test sending objects
public void ObjectSender() throws InitializationException, SecurityException {
final Data mainData = new Data();
populateData(mainData);
System.err.println("-- TCP");
ConnectionOptions connectionOptions = new ConnectionOptions();
connectionOptions.tcpPort = tcpPort;
connectionOptions.host = host;
sendObject(mainData, connectionOptions, ConnectionType.TCP);
System.err.println("-- UDP");
connectionOptions = new ConnectionOptions();
connectionOptions.tcpPort = tcpPort;
connectionOptions.udpPort = udpPort;
connectionOptions.host = host;
sendObject(mainData, connectionOptions, ConnectionType.TCP);
System.err.println("-- UDT");
connectionOptions = new ConnectionOptions();
connectionOptions.tcpPort = tcpPort;
connectionOptions.udtPort = udtPort;
connectionOptions.host = host;
sendObject(mainData, connectionOptions, ConnectionType.TCP);
}
private void sendObject(final Data mainData, ConnectionOptions connectionOptions, final ConnectionType type) throws InitializationException, SecurityException {
Server server = new Server(connectionOptions);
register(server.getSerialization());
addEndPoint(server);
server.setIdleTimeout(100);
server.bind(false);
server.listeners().add(new Listener<Connection, Data>() {
@Override
public void connected (Connection connection) {
Data data = new Data();
populateData(data);
IdleBridge sendOnIdle = connection.sendOnIdle(data);
switch (type) {
case TCP: sendOnIdle.TCP(); break;
case UDP: sendOnIdle.UDP(); break;
case UDT: sendOnIdle.UDT(); break;
}
}
});
// ----
Client client = new Client(connectionOptions);
register(client.getSerialization());
addEndPoint(client);
client.listeners().add(new Listener<Connection, Data>() {
@Override
public void received(Connection connection, Data object) {
if (mainData.equals(object)) {
ChunkedDataTest.this.success = true;
}
System.err.println("finished!");
stopEndPoints();
}
});
client.connect(5000);
waitForThreads();
if (!this.success) {
fail();
}
}
private void populateData(Data data) {
StringBuilder buffer = new StringBuilder();
for (int i = 0; i < 3000; i++) {
buffer.append('a');
}
data.string = buffer.toString();
data.strings = new String[] {"abcdefghijklmnopqrstuvwxyz0123456789", "", null, "!@#$", "<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>"};
data.ints = new int[] {-1234567, 1234567, -1, 0, 1, Integer.MAX_VALUE, Integer.MIN_VALUE};
data.shorts = new short[] {-12345, 12345, -1, 0, 1, Short.MAX_VALUE, Short.MIN_VALUE};
data.floats = new float[] {0, -0, 1, -1, 123456, -123456, 0.1f, 0.2f, -0.3f, (float)Math.PI, Float.MAX_VALUE, Float.MIN_VALUE};
data.doubles = new double[] {0, -0, 1, -1, 123456, -123456, 0.1d, 0.2d, -0.3d, Math.PI, Double.MAX_VALUE, Double.MIN_VALUE};
data.longs = new long[] {0, -0, 1, -1, 123456, -123456, 99999999999l, -99999999999l, Long.MAX_VALUE, Long.MIN_VALUE};
data.bytes = new byte[] {-123, 123, -1, 0, 1, Byte.MAX_VALUE, Byte.MIN_VALUE};
data.chars = new char[] {32345, 12345, 0, 1, 63, Character.MAX_VALUE, Character.MIN_VALUE};
data.booleans = new boolean[] {true, false};
data.Ints = new Integer[] {-1234567, 1234567, -1, 0, 1, Integer.MAX_VALUE, Integer.MIN_VALUE};
data.Shorts = new Short[] {-12345, 12345, -1, 0, 1, Short.MAX_VALUE, Short.MIN_VALUE};
data.Floats = new Float[] {0f, -0f, 1f, -1f, 123456f, -123456f, 0.1f, 0.2f, -0.3f, (float)Math.PI, Float.MAX_VALUE, Float.MIN_VALUE};
data.Doubles = new Double[] {0d, -0d, 1d, -1d, 123456d, -123456d, 0.1d, 0.2d, -0.3d, Math.PI, Double.MAX_VALUE, Double.MIN_VALUE};
data.Longs = new Long[] {0l, -0l, 1l, -1l, 123456l, -123456l, 99999999999l, -99999999999l, Long.MAX_VALUE, Long.MIN_VALUE};
data.Bytes = new Byte[] {-123, 123, -1, 0, 1, Byte.MAX_VALUE, Byte.MIN_VALUE};
data.Chars = new Character[] {32345, 12345, 0, 1, 63, Character.MAX_VALUE, Character.MIN_VALUE};
data.Booleans = new Boolean[] {true, false};
}
private void register (SerializationManager kryoMT) {
kryoMT.register(int[].class);
kryoMT.register(short[].class);
kryoMT.register(float[].class);
kryoMT.register(double[].class);
kryoMT.register(long[].class);
kryoMT.register(byte[].class);
kryoMT.register(char[].class);
kryoMT.register(boolean[].class);
kryoMT.register(String[].class);
kryoMT.register(Integer[].class);
kryoMT.register(Short[].class);
kryoMT.register(Float[].class);
kryoMT.register(Double[].class);
kryoMT.register(Long[].class);
kryoMT.register(Byte[].class);
kryoMT.register(Character[].class);
kryoMT.register(Boolean[].class);
kryoMT.register(Data.class);
kryoMT.register(TYPE.class);
}
static public class Data {
public String string;
public String[] strings;
public int[] ints;
public short[] shorts;
public float[] floats;
public double[] doubles;
public long[] longs;
public byte[] bytes;
public char[] chars;
public boolean[] booleans;
public Integer[] Ints;
public Short[] Shorts;
public Float[] Floats;
public Double[] Doubles;
public Long[] Longs;
public Byte[] Bytes;
public Character[] Chars;
public Boolean[] Booleans;
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + Arrays.hashCode(this.Booleans);
result = prime * result + Arrays.hashCode(this.Bytes);
result = prime * result + Arrays.hashCode(this.Chars);
result = prime * result + Arrays.hashCode(this.Doubles);
result = prime * result + Arrays.hashCode(this.Floats);
result = prime * result + Arrays.hashCode(this.Ints);
result = prime * result + Arrays.hashCode(this.Longs);
result = prime * result + Arrays.hashCode(this.Shorts);
result = prime * result + Arrays.hashCode(this.booleans);
result = prime * result + Arrays.hashCode(this.bytes);
result = prime * result + Arrays.hashCode(this.chars);
result = prime * result + Arrays.hashCode(this.doubles);
result = prime * result + Arrays.hashCode(this.floats);
result = prime * result + Arrays.hashCode(this.ints);
result = prime * result + Arrays.hashCode(this.longs);
result = prime * result + Arrays.hashCode(this.shorts);
result = prime * result + (this.string == null ? 0 : this.string.hashCode());
result = prime * result + Arrays.hashCode(this.strings);
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Data other = (Data) obj;
if (!Arrays.equals(this.Booleans, other.Booleans)) {
return false;
}
if (!Arrays.equals(this.Bytes, other.Bytes)) {
return false;
}
if (!Arrays.equals(this.Chars, other.Chars)) {
return false;
}
if (!Arrays.equals(this.Doubles, other.Doubles)) {
return false;
}
if (!Arrays.equals(this.Floats, other.Floats)) {
return false;
}
if (!Arrays.equals(this.Ints, other.Ints)) {
return false;
}
if (!Arrays.equals(this.Longs, other.Longs)) {
return false;
}
if (!Arrays.equals(this.Shorts, other.Shorts)) {
return false;
}
if (!Arrays.equals(this.booleans, other.booleans)) {
return false;
}
if (!Arrays.equals(this.bytes, other.bytes)) {
return false;
}
if (!Arrays.equals(this.chars, other.chars)) {
return false;
}
if (!Arrays.equals(this.doubles, other.doubles)) {
return false;
}
if (!Arrays.equals(this.floats, other.floats)) {
return false;
}
if (!Arrays.equals(this.ints, other.ints)) {
return false;
}
if (!Arrays.equals(this.longs, other.longs)) {
return false;
}
if (!Arrays.equals(this.shorts, other.shorts)) {
return false;
}
if (this.string == null) {
if (other.string != null) {
return false;
}
} else if (!this.string.equals(other.string)) {
return false;
}
if (!Arrays.equals(this.strings, other.strings)) {
return false;
}
return true;
}
@Override
public String toString () {
return "Data";
}
}
}

View File

@ -46,7 +46,7 @@ public class ClientSendTest extends BaseTest {
client.listeners().add(new Listener<Connection, AMessage>() {
@Override
public void received (Connection connection, AMessage object) {
checkPassed.set(true);
ClientSendTest.this.checkPassed.set(true);
stopEndPoints();
}
});
@ -55,7 +55,7 @@ public class ClientSendTest extends BaseTest {
waitForThreads();
if (!checkPassed.get()) {
if (!this.checkPassed.get()) {
fail("Client and server failed to send messages!");
}
}

View File

@ -101,10 +101,7 @@ public class IdleTest extends BaseTest {
@Override
public void connected (Connection connection) {
Data data = new Data();
populateData(data);
IdleBridge sendOnIdle = connection.sendOnIdle(data);
IdleBridge sendOnIdle = connection.sendOnIdle(mainData);
switch (type) {
case TCP: sendOnIdle.TCP(); break;

View File

@ -16,7 +16,7 @@ import dorkbox.network.util.SerializationManager;
import dorkbox.network.util.exceptions.InitializationException;
import dorkbox.network.util.exceptions.SecurityException;
public class BufferTest extends BaseTest {
public class LargeBufferTest extends BaseTest {
private static final int OBJ_SIZE = 1024 * 10;
private volatile int finalCheckAmount = 0;
@ -43,15 +43,15 @@ public class BufferTest extends BaseTest {
@Override
public void received (Connection connection, LargeMessage object) {
System.err.println("Server ack message: " + received.get());
// System.err.println("Server ack message: " + received.get());
connection.send().TCP(object);
receivedBytes.addAndGet(object.bytes.length);
this.receivedBytes.addAndGet(object.bytes.length);
if (received.incrementAndGet() == messageCount) {
if (this.received.incrementAndGet() == messageCount) {
System.out.println("Server received all " + messageCount + " messages!");
System.out.println("Server received and sent " + receivedBytes.get() + " bytes.");
serverCheck = finalCheckAmount - receivedBytes.get();
System.out.println("Server missed " + serverCheck + " bytes.");
System.out.println("Server received and sent " + this.receivedBytes.get() + " bytes.");
LargeBufferTest.this.serverCheck = LargeBufferTest.this.finalCheckAmount - this.receivedBytes.get();
System.out.println("Server missed " + LargeBufferTest.this.serverCheck + " bytes.");
stopEndPoints();
}
}
@ -68,16 +68,16 @@ public class BufferTest extends BaseTest {
@Override
public void received (Connection connection, LargeMessage object) {
receivedBytes.addAndGet(object.bytes.length);
this.receivedBytes.addAndGet(object.bytes.length);
int count = received.incrementAndGet();
int count = this.received.incrementAndGet();
//System.out.println("Client received " + count + " messages.");
if (count == messageCount) {
System.out.println("Client received all " + messageCount + " messages!");
System.out.println("Client received and sent " + receivedBytes.get() + " bytes.");
clientCheck = finalCheckAmount - receivedBytes.get();
System.out.println("Client missed " + clientCheck + " bytes.");
System.out.println("Client received and sent " + this.receivedBytes.get() + " bytes.");
LargeBufferTest.this.clientCheck = LargeBufferTest.this.finalCheckAmount - this.receivedBytes.get();
System.out.println("Client missed " + LargeBufferTest.this.clientCheck + " bytes.");
}
}
});
@ -87,7 +87,7 @@ public class BufferTest extends BaseTest {
random.nextBytes(b);
for (int i = 0; i < messageCount; i++) {
finalCheckAmount += OBJ_SIZE;
this.finalCheckAmount += OBJ_SIZE;
System.err.println(" Client sending number: " + i);
client.send().TCP(new LargeMessage(b));
}
@ -95,12 +95,12 @@ public class BufferTest extends BaseTest {
waitForThreads();
if (clientCheck > 0) {
fail("Client missed " + clientCheck + " bytes.");
if (this.clientCheck > 0) {
fail("Client missed " + this.clientCheck + " bytes.");
}
if (serverCheck > 0) {
fail("Server missed " + serverCheck + " bytes.");
if (this.serverCheck > 0) {
fail("Server missed " + this.serverCheck + " bytes.");
}
}

View File

@ -30,7 +30,7 @@ public class MultipleThreadTest extends BaseTest {
private final int threadCount = 15;
private final int clientCount = 13;
private List<Client> clients = new ArrayList<Client>(clientCount);
private List<Client> clients = new ArrayList<Client>(this.clientCount);
@Test
public void multipleThreads () throws IOException, InitializationException, SecurityException {
@ -51,15 +51,15 @@ public class MultipleThreadTest extends BaseTest {
System.err.println("Client connected to server.");
// kickoff however many threads we need, and send data to the client.
for (int i = 1; i <= threadCount; i++) {
for (int i = 1; i <= MultipleThreadTest.this.threadCount; i++) {
final int index = i;
new Thread() {
@Override
public void run () {
for (int i = 1; i <= messageCount; i++) {
int incrementAndGet = sent.getAndIncrement();
for (int i = 1; i <= MultipleThreadTest.this.messageCount; i++) {
int incrementAndGet = MultipleThreadTest.this.sent.getAndIncrement();
DataClass dataClass = new DataClass("Server -> client. Thread #" + index + " message# " + incrementAndGet, incrementAndGet);
sentStringsToClientDebug.put(incrementAndGet, dataClass);
MultipleThreadTest.this.sentStringsToClientDebug.put(incrementAndGet, dataClass);
connection.send().TCP(dataClass).flush();
}
}
@ -70,15 +70,15 @@ public class MultipleThreadTest extends BaseTest {
@Override
public void received (Connection connection, DataClass object) {
int incrementAndGet = receivedServer.getAndIncrement();
int incrementAndGet = MultipleThreadTest.this.receivedServer.getAndIncrement();
if (incrementAndGet == messageCount * clientCount) {
if (incrementAndGet == MultipleThreadTest.this.messageCount * MultipleThreadTest.this.clientCount) {
System.err.println("Server DONE " + incrementAndGet);
// note. this is getting called BEFORE it's ready?
stopEndPoints();
synchronized (lock) {
lock.notifyAll();
synchronized (MultipleThreadTest.this.lock) {
MultipleThreadTest.this.lock.notifyAll();
}
}
}
@ -86,11 +86,11 @@ public class MultipleThreadTest extends BaseTest {
// ----
for (int i = 1; i <= clientCount; i++) {
for (int i = 1; i <= this.clientCount; i++) {
final int index = i;
Client client = new Client(connectionOptions);
clients.add(client);
this.clients.add(client);
client.getSerialization().register(String[].class);
client.getSerialization().register(DataClass.class);
addEndPoint(client);
@ -104,14 +104,14 @@ public class MultipleThreadTest extends BaseTest {
@Override
public void received (Connection connection, DataClass object) {
int clientLocalCounter = received.getAndIncrement();
sentStringsToClientDebug.remove(object.index);
int clientLocalCounter = this.received.getAndIncrement();
MultipleThreadTest.this.sentStringsToClientDebug.remove(object.index);
// we finished!!
if (clientLocalCounter == messageCount * threadCount) {
System.err.println("Client #" + index + " received " + clientLocalCounter + " (" + totalClientCounter.getAndIncrement() + ") Sending back " + messageCount + " messages.");
if (clientLocalCounter == MultipleThreadTest.this.messageCount * MultipleThreadTest.this.threadCount) {
System.err.println("Client #" + index + " received " + clientLocalCounter + " (" + MultipleThreadTest.this.totalClientCounter.getAndIncrement() + ") Sending back " + MultipleThreadTest.this.messageCount + " messages.");
// now spam back messages!
for (int i = 0; i < messageCount; i++) {
for (int i = 0; i < MultipleThreadTest.this.messageCount; i++) {
connection.send().TCP(new DataClass("Client #" + index + " -> Server message " + i, index));
}
}
@ -124,25 +124,25 @@ public class MultipleThreadTest extends BaseTest {
// the ONLY way to safely work in the server is with LISTENERS. Everything else can FAIL, because of it's async. nature.
// our clients should receive messageCount * threadCount * clientCount TOTAL messages
System.err.println("SEND COUNTSS: " + threadCount * clientCount * messageCount + " and then " + messageCount * clientCount + " total messages");
System.err.println("SEND COUNTS: " + this.threadCount * this.clientCount * this.messageCount + " and then " + this.messageCount * this.clientCount + " total messages");
synchronized (lock) {
synchronized (this.lock) {
try {
lock.wait(150 * 1000); // 15 secs
this.lock.wait(150 * 1000); // 15 secs
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (!sentStringsToClientDebug.isEmpty()) {
System.err.println("MISSED DATA: " + sentStringsToClientDebug.size());
for (Entry<Integer, DataClass> i : sentStringsToClientDebug.entrySet()) {
if (!this.sentStringsToClientDebug.isEmpty()) {
System.err.println("MISSED DATA: " + this.sentStringsToClientDebug.size());
for (Entry<Integer, DataClass> i : this.sentStringsToClientDebug.entrySet()) {
System.err.println(i.getKey() + " : " + i.getValue().data);
}
}
stopEndPoints();
assertEquals(messageCount * clientCount, receivedServer.get()-1); // offset by 1 since we start at 1.
assertEquals(this.messageCount * this.clientCount, this.receivedServer.get()-1); // offset by 1 since we start at 1.
}
public static class DataClass {