Code polish. Added comments. Cleaned up ping

This commit is contained in:
nathan 2014-09-09 13:04:07 +02:00
parent 53b6d8cb2d
commit f4845a6b1f
11 changed files with 134 additions and 140 deletions

View File

@ -29,7 +29,7 @@ public interface ConnectionBridge {
public ConnectionPoint UDT(Object message);
/**
* Sends a "ping" packet, trying UDP, then UDT, then TCP (in that order) to measure round trip time to the remote connection.
* Sends a "ping" packet, trying UDP, then UDT, then TCP (in that order) to measure <b>ROUND TRIP</b> time to the remote connection.
*
* @return Ping can have a listener attached, which will get called when the ping returns.
*/

View File

@ -24,7 +24,6 @@ import dorkbox.network.connection.idle.IdleSender;
import dorkbox.network.connection.ping.Ping;
import dorkbox.network.connection.ping.PingFuture;
import dorkbox.network.connection.ping.PingMessage;
import dorkbox.network.connection.ping.PingUtil;
import dorkbox.network.connection.wrapper.ChannelNetworkWrapper;
import dorkbox.network.connection.wrapper.ChannelNull;
import dorkbox.network.connection.wrapper.ChannelWrapper;
@ -42,15 +41,14 @@ public class ConnectionImpl extends ChannelInboundHandlerAdapter
private AtomicBoolean closeInProgress = new AtomicBoolean(false);
private AtomicBoolean alreadyClosed = new AtomicBoolean(false);
private AtomicBoolean messageInProgress = new AtomicBoolean(false);
private final Object closeInProgressLock = new Object();
private final Object messageInProgressLock = new Object();
private AtomicBoolean messageInProgress = new AtomicBoolean(false);
private ISessionManager sessionManager;
private ChannelWrapper channelWrapper;
private EndPointWithSerialization endPoint;
private final PingUtil pingUtil;
private volatile PingFuture pingFuture = null;
@ -63,8 +61,6 @@ public class ConnectionImpl extends ChannelInboundHandlerAdapter
public ConnectionImpl(String name) {
this.name = name;
this.pingUtil = new PingUtil();
this.logger = org.slf4j.LoggerFactory.getLogger(name);
}
@ -149,34 +145,31 @@ public class ConnectionImpl extends ChannelInboundHandlerAdapter
}
/**
* Updates the ping times for this connection.
* Updates the ping times for this connection (called when this connection gets a REPLY ping message).
*/
public final void updatePingResponse(PingMessage ping) {
synchronized (this.pingUtil) {
this.pingUtil.updatePing(ping);
if (this.pingFuture != null) {
this.pingFuture.setSuccess(this.pingUtil);
}
if (this.pingFuture != null) {
this.pingFuture.setSuccess(ping);
}
}
/**
* Sends a "ping" packet, trying UDP, then UDT, then TCP (in that order) to measure round trip time to the remote connection.
* Sends a "ping" packet, trying UDP, then UDT, then TCP (in that order) to measure <b>ROUND TRIP</b> time to the remote connection.
*
* @return Ping can have a listener attached, which will get called when the ping returns.
*/
@Override
public final Ping ping() {
synchronized (this.pingUtil) {
if (this.pingFuture != null) {
this.pingFuture.cancel();
}
this.pingFuture = this.channelWrapper.pingFuture();
PingFuture pingFuture2 = this.pingFuture;
if (pingFuture2 != null) {
pingFuture2.cancel();
}
ping0(this.pingUtil.pingMessage());
this.pingFuture = this.channelWrapper.pingFuture();
PingMessage ping = new PingMessage();
ping.id = this.pingFuture.getId();
ping0(ping);
return this.pingFuture;
}
@ -196,14 +189,12 @@ public class ConnectionImpl extends ChannelInboundHandlerAdapter
}
/**
* Returns the last calculated TCP return trip time, or -1 if {@link #updateReturnTripTime()} has never been called or the
* {@link PingMessage} response has not yet been received.
* Returns the last calculated TCP return trip time, or -1 if or the {@link PingMessage} response has not yet been received.
*/
public final int getLastRoundTripTime() {
if (this.pingFuture != null) {
synchronized (this.pingUtil) {
return this.pingFuture.getResponse();
}
PingFuture pingFuture2 = this.pingFuture;
if (pingFuture2 != null) {
return pingFuture2.getResponse();
} else {
return -1;
}
@ -460,6 +451,13 @@ public class ConnectionImpl extends ChannelInboundHandlerAdapter
this.channelWrapper.close(this, this.sessionManager);
// close out the ping future
PingFuture pingFuture2 = this.pingFuture;
if (pingFuture2 != null) {
pingFuture2.cancel();
}
this.pingFuture = null;
// want to wait for the "channelInactive" method to FINISH before allowing our current thread to continue!
synchronized (this.closeInProgressLock) {
if (!this.alreadyClosed.get()) {
@ -518,12 +516,14 @@ public class ConnectionImpl extends ChannelInboundHandlerAdapter
@Override
public final void add(Listener listener) {
if (this.endPoint instanceof EndPointServer) {
// when we are a server, NORMALLY listeners are added at the GLOBAL level (meaning, I add one listener, and ALL connections
// are notified of that listener.
// it is POSSIBLE to add a local listener (via connection.addListener), meaning that ONLY
// when we are a server, NORMALLY listeners are added at the GLOBAL level
// meaning --
// I add one listener, and ALL connections are notified of that listener.
//
// HOWEVER, it is also POSSIBLE to add a local listener (via connection.addListener), meaning that ONLY
// that listener is notified on that event (ie, admin type listeners)
// synchronized because this should be uncommon, and we want to make sure that when the manager
// synchronized because this should be VERY uncommon, and we want to make sure that when the manager
// is empty, we can remove it from this connection.
synchronized (this) {
if (this.localListenerManager == null) {
@ -553,9 +553,11 @@ public class ConnectionImpl extends ChannelInboundHandlerAdapter
@Override
public final void remove(Listener listener) {
if (this.endPoint instanceof EndPointServer) {
// when we are a server, NORMALLY listeners are added at the GLOBAL level (meaning, I add one listener, and ALL connections
// are notified of that listener.
// it is POSSIBLE to add a local listener (via connection.addListener), meaning that ONLY
// when we are a server, NORMALLY listeners are added at the GLOBAL level
// meaning --
// I add one listener, and ALL connections are notified of that listener.
//
// HOWEVER, it is also POSSIBLE to add a local listener (via connection.addListener), meaning that ONLY
// that listener is notified on that event (ie, admin type listeners)
// synchronized because this should be uncommon, and we want to make sure that when the manager
@ -581,9 +583,11 @@ public class ConnectionImpl extends ChannelInboundHandlerAdapter
@Override
public final void removeAll() {
if (this.endPoint instanceof EndPointServer) {
// when we are a server, NORMALLY listeners are added at the GLOBAL level (meaning, I add one listener, and ALL connections
// are notified of that listener.
// it is POSSIBLE to add a local listener (via connection.addListener), meaning that ONLY
// when we are a server, NORMALLY listeners are added at the GLOBAL level
// meaning --
// I add one listener, and ALL connections are notified of that listener.
//
// HOWEVER, it is also POSSIBLE to add a local listener (via connection.addListener), meaning that ONLY
// that listener is notified on that event (ie, admin type listeners)
// synchronized because this should be uncommon, and we want to make sure that when the manager
@ -609,9 +613,11 @@ public class ConnectionImpl extends ChannelInboundHandlerAdapter
@Override
public final void removeAll(Class<?> classType) {
if (this.endPoint instanceof EndPointServer) {
// when we are a server, NORMALLY listeners are added at the GLOBAL level (meaning, I add one listener, and ALL connections
// are notified of that listener.
// it is POSSIBLE to add a local listener (via connection.addListener), meaning that ONLY
// when we are a server, NORMALLY listeners are added at the GLOBAL level
// meaning --
// I add one listener, and ALL connections are notified of that listener.
//
// HOWEVER, it is also POSSIBLE to add a local listener (via connection.addListener), meaning that ONLY
// that listener is notified on that event (ie, admin type listeners)
// synchronized because this should be uncommon, and we want to make sure that when the manager

View File

@ -20,10 +20,10 @@ import dorkbox.util.ClassHelper;
// objects that are somehow equal to each other.
public class ConnectionManager implements ListenerBridge, ISessionManager {
private volatile ConcurrentHashMapFactory<Type, CopyOnWriteArrayList<Listener<Connection, Object>>> listeners;
private volatile ConcurrentHashMapFactory<Connection, ConnectionManager> localManagers;
private volatile CopyOnWriteArrayList<Connection> connections = new CopyOnWriteArrayList<Connection>();
// these are final, because the REFERENCE to these will never change. They ARE NOT immutable objects (meaning their content can change)
private final ConcurrentHashMapFactory<Type, CopyOnWriteArrayList<Listener<Connection, Object>>> listeners;
private final ConcurrentHashMapFactory<Connection, ConnectionManager> localManagers;
private final CopyOnWriteArrayList<Connection> connections = new CopyOnWriteArrayList<Connection>();
/** Used by the listener subsystem to determine types. */
private final Class<?> baseClass;
@ -38,7 +38,7 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
this.baseClass = baseClass;
this.listeners = new ConcurrentHashMapFactory<Type, CopyOnWriteArrayList<Listener<Connection, Object>>>() {
private static final long serialVersionUID = 8404650379739727012L;
private static final long serialVersionUID = 1L;
@Override
public CopyOnWriteArrayList<Listener<Connection, Object>> createNewOject(Object... args) {
@ -47,7 +47,7 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
};
this.localManagers = new ConcurrentHashMapFactory<Connection, ConnectionManager>() {
private static final long serialVersionUID = -1656860453153611896L;
private static final long serialVersionUID = 1L;
@Override
public ConnectionManager createNewOject(Object... args) {
@ -307,7 +307,6 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
*/
@Override
public void connectionConnected(Connection connection) {
// only TCP channels are passed in.
// create a new connection!
this.connections.add(connection);

View File

@ -31,11 +31,12 @@ public class RegistrationWrapper implements UdpServer {
private final EndPoint endPoint;
// keeps track of connections (TCP/UDT/UDP-client)
private ReentrantLock channelMapLock = new ReentrantLock();
private final ReentrantLock channelMapLock = new ReentrantLock();
private IntMap<MetaChannel> channelMap = new IntMap<MetaChannel>();
// keeps track of connections (UDP-server)
private volatile ConcurrentMap<InetSocketAddress, ConnectionImpl> udpRemoteMap;
// this is final, because the REFERENCE to these will never change. They ARE NOT immutable objects (meaning their content can change)
private final ConcurrentMap<InetSocketAddress, ConnectionImpl> udpRemoteMap;
private KryoEncoder kryoTcpEncoder;
private KryoEncoderCrypto kryoTcpCryptoEncoder;
@ -75,6 +76,9 @@ public class RegistrationWrapper implements UdpServer {
public IntMap<MetaChannel> getAndLockChannelMap() {
// try to lock access
this.channelMapLock.lock();
// guarantee that the contents of this map are visible across threads
synchronized (this.channelMap) {}
return this.channelMap;
}

View File

@ -5,20 +5,28 @@ import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
public class PingFuture implements Ping {
private static AtomicInteger pingCounter = new AtomicInteger(0);
private final Promise<Integer> promise;
private final int id;
private final long sentTime;
/**
* Protected constructor for when we are completely overriding this class. (Used by the "local" connection for instant pings)
*/
protected PingFuture() {
promise = null;
this(null);
}
public PingFuture(Promise<Integer> promise) {
this.promise = promise;
this.id = pingCounter.getAndIncrement();
this.sentTime = System.currentTimeMillis();
}
/**
@ -27,7 +35,7 @@ public class PingFuture implements Ping {
@Override
public int getResponse() {
try {
return promise.syncUninterruptibly().get();
return this.promise.syncUninterruptibly().get();
} catch (InterruptedException e) {
} catch (ExecutionException e) {
}
@ -36,10 +44,17 @@ public class PingFuture implements Ping {
/**
* Tells this ping future, that it was successful
* This is when the endpoint that ORIGINALLY sent the ping, finally receives a response.
*/
public void setSuccess(PingUtil pingUtil) {
promise.setSuccess(pingUtil.getReturnTripTime());
public void setSuccess(PingMessage ping) {
if (ping.id == this.id) {
long longTime = System.currentTimeMillis() - this.sentTime;
if (longTime < Integer.MAX_VALUE) {
this.promise.setSuccess((int)longTime);
} else {
this.promise.setSuccess(Integer.MAX_VALUE);
}
}
}
/**
@ -49,7 +64,7 @@ public class PingFuture implements Ping {
*/
@Override
public void addListener(GenericFutureListener<? extends Future<? super Object>> listener) {
promise.addListener(listener);
this.promise.addListener(listener);
}
/**
@ -60,7 +75,7 @@ public class PingFuture implements Ping {
*/
@Override
public void removeListener(GenericFutureListener<? extends Future<? super Object>> listener) {
promise.removeListener(listener);
this.promise.removeListener(listener);
}
/**
@ -68,6 +83,13 @@ public class PingFuture implements Ping {
*/
@Override
public void cancel() {
promise.tryFailure(new PingCanceledException());
this.promise.tryFailure(new PingCanceledException());
}
/**
* @return the ID of this ping future
*/
public int getId() {
return this.id;
}
}

View File

@ -6,6 +6,7 @@ import io.netty.util.concurrent.GenericFutureListener;
public class PingFutureLocal extends PingFuture {
public PingFutureLocal() {
super();
}
/**
@ -20,7 +21,7 @@ public class PingFutureLocal extends PingFuture {
* Tells this ping future, that it was successful
*/
@Override
public void setSuccess(PingUtil pingUtil) {
public void setSuccess(PingMessage ping) {
}
/**

View File

@ -21,8 +21,9 @@ public class PingListener extends Listener<ConnectionImpl, PingMessage> {
}
connection.updatePingResponse(ping);
} else {
// return the ping from whence it came
if (logger2.isTraceEnabled()) {
logger2.trace( "Received a ping from {}", connection);
logger2.trace( "Received a ping request from {}. Sending a reply.", connection);
}
ping.isReply = true;

View File

@ -6,7 +6,4 @@ package dorkbox.network.connection.ping;
public class PingMessage {
public int id;
public boolean isReply;
/** The ping round-trip time in milliseconds */
public transient int time;
}

View File

@ -1,37 +0,0 @@
package dorkbox.network.connection.ping;
public class PingUtil {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PingUtil.class);
//all methods are protected by this!
private int lastPingID = 0;
private long lastPingSendTime;
// if something takes longer than 2billion seconds (signed int) and the connection doesn't time out. We have problems.
/** The ping round-trip time in nanoseconds */
private int returnTripTime;
public PingUtil() {
}
public final synchronized PingMessage pingMessage() {
PingMessage ping = new PingMessage();
ping.id = lastPingID++;
lastPingSendTime = System.currentTimeMillis();
return ping;
}
public final synchronized int getReturnTripTime() {
return returnTripTime;
}
public final synchronized void updatePing(PingMessage ping) {
if (ping.id == lastPingID - 1) {
ping.time = returnTripTime = (int)(System.currentTimeMillis() - lastPingSendTime);
logger.trace("Return trip time: {}", returnTripTime);
}
}
}

View File

@ -13,6 +13,9 @@ public abstract class ConcurrentHashMapFactory<K, V> extends ConcurrentHashMap<K
public abstract V createNewOject(Object... args);
/** Thread safe method to get the value in the map. If the value doesn't exist,
* it will create a new one (and put the new one in the map)
*/
public final V getOrCreate(K key, Object... args) {
V orig = get(key);

View File

@ -22,7 +22,7 @@ public class PingTest extends BaseTest {
// ping prefers the following order: UDP, UDT, TCP
@Test
public void pingTCP() throws IOException, InitializationException, SecurityException {
response = -1;
this.response = -1;
ConnectionOptions connectionOptions = new ConnectionOptions();
connectionOptions.tcpPort = tcpPort;
@ -44,23 +44,33 @@ public class PingTest extends BaseTest {
@Override
public void connected(Connection connection) {
System.err.println("Testing TCP ping");
}
@Override
public void received(Connection connection, PingMessage ping) {
response = ping.time;
System.err.println("Ping return time: " + response);
if (count++ < 10) {
connection.send().ping();
} else {
stopEndPoints();
for (int i=0;i<10;i++) {
int response2 = connection.send().ping().getResponse();
System.err.println("Ping B roundtime: " + response2);
}
}
// @Override
// public void received(Connection connection, PingMessage ping) {
// response = ping.time;
// System.err.println("Ping return time: " + response);
//
// if (count++ < 10) {
// connection.send().ping();
// } else {
// stopEndPoints();
// }
// }
});
client.connect(5000);
client.ping();
for (int i=0;i<10;i++) {
int response2 = client.ping().getResponse();
System.err.println("Ping A roundtime: " + response2);
}
// alternate way to register for the receipt of a one-off ping response
// PingFuture ping = connection.ping();
// ping.addListener(new ChannelFutureListener() {
@ -80,7 +90,7 @@ public class PingTest extends BaseTest {
waitForThreads();
if (response == -1) {
if (this.response == -1) {
fail();
}
}
@ -88,7 +98,7 @@ public class PingTest extends BaseTest {
// ping prefers the following order: UDP, UDT, TCP
@Test
public void pingUDP() throws IOException, InitializationException, SecurityException {
response = -1;
this.response = -1;
ConnectionOptions connectionOptions = new ConnectionOptions();
connectionOptions.tcpPort = tcpPort;
@ -113,18 +123,6 @@ public class PingTest extends BaseTest {
public void connected(Connection connection) {
System.err.println("Testing UDP ping");
}
@Override
public void received(Connection connection, PingMessage ping) {
response = ping.time;
System.err.println("Ping return time: " + response);
if (count++ < 10) {
connection.send().ping();
} else {
stopEndPoints();
}
}
});
client.connect(5000);
@ -148,7 +146,7 @@ public class PingTest extends BaseTest {
waitForThreads();
if (response == -1) {
if (this.response == -1) {
fail();
}
}
@ -157,7 +155,7 @@ public class PingTest extends BaseTest {
// ping prefers the following order: UDP, UDT, TCP
@Test
public void pingUDT() throws IOException, InitializationException, SecurityException {
response = -1;
this.response = -1;
ConnectionOptions connectionOptions = new ConnectionOptions();
connectionOptions.tcpPort = tcpPort;
@ -182,17 +180,17 @@ public class PingTest extends BaseTest {
System.err.println("Testing UDT ping");
}
@Override
public void received(Connection connection, PingMessage ping) {
response = ping.time;
System.err.println("Ping return time: " + response);
if (count++ < 10) {
connection.send().ping();
} else {
stopEndPoints();
}
}
// @Override
// public void received(Connection connection, PingMessage ping) {
// PingTest.this.response = ping.time;
// System.err.println("Ping return time: " + PingTest.this.response);
//
// if (this.count++ < 10) {
// connection.send().ping();
// } else {
// stopEndPoints();
// }
// }
});
client.connect(5000);
@ -216,7 +214,7 @@ public class PingTest extends BaseTest {
waitForThreads();
if (response == -1) {
if (this.response == -1) {
fail();
}
}