Code cleanup, logging added

This commit is contained in:
nathan 2015-06-28 13:03:36 +02:00
parent 0a7ae1238e
commit 1db46e3554

View File

@ -1,35 +1,6 @@
package dorkbox.network.connection;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import java.lang.annotation.Annotation;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.security.AccessControlException;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.bouncycastle.crypto.AsymmetricCipherKeyPair;
import org.bouncycastle.crypto.params.ECPrivateKeyParameters;
import org.bouncycastle.crypto.params.ECPublicKeyParameters;
import org.bouncycastle.crypto.params.IESParameters;
import org.bouncycastle.crypto.params.IESWithCipherParameters;
import org.slf4j.Logger;
import com.esotericsoftware.kryo.factories.SerializerFactory;
import dorkbox.network.ConnectionOptions;
import dorkbox.network.connection.bridge.ConnectionBridgeBase;
import dorkbox.network.connection.registration.MetaChannel;
@ -54,7 +25,6 @@ import dorkbox.network.util.serializers.FieldAnnotationAwareSerializer;
import dorkbox.network.util.serializers.IgnoreSerialization;
import dorkbox.network.util.store.NullSettingsStore;
import dorkbox.network.util.store.SettingsStore;
import dorkbox.network.util.udt.UdtEndpointProxy;
import dorkbox.util.Sys;
import dorkbox.util.collections.IntMap;
import dorkbox.util.collections.IntMap.Entries;
@ -63,9 +33,33 @@ import dorkbox.util.crypto.serialization.EccPrivateKeySerializer;
import dorkbox.util.crypto.serialization.EccPublicKeySerializer;
import dorkbox.util.crypto.serialization.IesParametersSerializer;
import dorkbox.util.crypto.serialization.IesWithCipherParametersSerializer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import org.bouncycastle.crypto.AsymmetricCipherKeyPair;
import org.bouncycastle.crypto.params.ECPrivateKeyParameters;
import org.bouncycastle.crypto.params.ECPublicKeyParameters;
import org.bouncycastle.crypto.params.IESParameters;
import org.bouncycastle.crypto.params.IESWithCipherParameters;
import org.slf4j.Logger;
/** represents the base of a client/server end point */
public abstract class EndPoint {
import java.lang.annotation.Annotation;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.security.AccessControlException;
import java.security.SecureRandom;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* represents the base of a client/server end point
*/
public abstract
class EndPoint {
// If TCP and UDP both fill the pipe, THERE WILL BE FRAGMENTATION and dropped UDP packets!
// it results in severe UDP packet loss and contention.
//
@ -83,7 +77,7 @@ public abstract class EndPoint {
/**
* this can be changed to a more specialized value, if necessary
* this can be changed to a more specialized value, if necessary
*/
public static int DEFAULT_THREAD_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2;
public static final String LOCAL_CHANNEL = "local_channel";
@ -97,17 +91,17 @@ public abstract class EndPoint {
/**
* The default size for UDP packets is 768 bytes.
*
* <p/>
* You could increase or decrease this value to avoid truncated packets
* or to improve memory footprint respectively.
*
* <p/>
* Please also note that a large UDP packet might be truncated or
* dropped by your router no matter how you configured this option.
* In UDP, a packet is truncated or dropped if it is larger than a
* certain size, depending on router configuration. IPv4 routers
* truncate and IPv6 routers drop a large packet. That's why it is
* safe to send small packets in UDP.
*
* <p/>
* 512 is recommended to prevent fragmentation.
* This can be set higher on an internal lan! (or use UDT to make UDP transfers easy)
*/
@ -120,16 +114,8 @@ public abstract class EndPoint {
// Needed for NIO selectors on Android 2.2, and to force IPv4.
System.setProperty("java.net.preferIPv4Stack", Boolean.TRUE.toString());
System.setProperty("java.net.preferIPv6Addresses", Boolean.FALSE.toString());
// need to make sure UDT uses our loader instead of the default loader
try {
// all of this must be proxied to another class, so THIS class doesn't have unmet dependencies.
// Annoying and abusing the classloader, but it works well.
Class.forName("com.barchart.udt.nio.SelectorProviderUDT");
UdtEndpointProxy.setLibraryLoaderClassName();
} catch (Throwable ignored) {}
} catch (AccessControlException ignored) {}
} catch (AccessControlException ignored) {
}
}
protected final org.slf4j.Logger logger;
@ -158,7 +144,9 @@ public abstract class EndPoint {
protected AtomicBoolean isConnected = new AtomicBoolean(false);
/** in milliseconds. default is disabled! */
/**
* in milliseconds. default is disabled!
*/
private volatile int idleTimeout = 0;
private ConcurrentHashMap<Class<?>, EndpointTool> toolMap = new ConcurrentHashMap<Class<?>, EndpointTool>();
@ -171,7 +159,8 @@ public abstract class EndPoint {
boolean disableRemoteKeyValidation;
public EndPoint(String name, ConnectionOptions options) throws InitializationException, SecurityException {
public
EndPoint(String name, ConnectionOptions options) throws InitializationException, SecurityException {
this.name = name;
this.logger = org.slf4j.LoggerFactory.getLogger(name);
@ -191,7 +180,8 @@ public abstract class EndPoint {
// we have to be able to specify WHAT property store we want to use, since it can change!
if (options.settingsStore == null) {
this.propertyStore = new PropertyStore(name);
} else {
}
else {
this.propertyStore = options.settingsStore;
}
@ -207,21 +197,13 @@ public abstract class EndPoint {
if (privateKey == null || publicKey == null) {
try {
// EntropyProvider p = Entropy.getProvider();
// if (!(entropy instanceof SimpleEntropy)) {
// // have to do the prompt thing?
//
// System.err.println("There are no ECC keys for the " + name + " yet. Please press keyboard keys (numbers/letters/etc) to generate entropy.");
// System.err.flush();
// }
// seed our RNG based off of this and create our ECC keys
byte[] seedBytes = Entropy.get("There are no ECC keys for the " + name + " yet.");
byte[] seedBytes = Entropy.get("There are no ECC keys for the " + name + " yet");
SecureRandom secureRandom = new SecureRandom(seedBytes);
secureRandom.nextBytes(seedBytes);
System.err.println("Now generating ECC (" + Crypto.ECC.p521_curve + ") keys. Please wait!");
AsymmetricCipherKeyPair generateKeyPair = Crypto.ECC.generateKeyPair(Crypto.ECC.p521_curve, new SecureRandom(seedBytes));
this.logger.debug("Now generating ECC (" + Crypto.ECC.p521_curve + ") keys. Please wait!");
AsymmetricCipherKeyPair generateKeyPair = Crypto.ECC.generateKeyPair(Crypto.ECC.p521_curve, secureRandom);
privateKey = (ECPrivateKeyParameters) generateKeyPair.getPrivate();
publicKey = (ECPublicKeyParameters) generateKeyPair.getPublic();
@ -230,7 +212,7 @@ public abstract class EndPoint {
this.propertyStore.savePrivateKey(privateKey);
this.propertyStore.savePublicKey(publicKey);
System.err.println("Done with ECC keys!");
this.logger.debug("Done with ECC keys!");
} catch (Exception e) {
String message = "Unable to initialize/generate ECC keys. FORCED SHUTDOWN.";
this.logger.error(message);
@ -240,7 +222,8 @@ public abstract class EndPoint {
this.privateKey = privateKey;
this.publicKey = publicKey;
} else {
}
else {
this.privateKey = null;
this.publicKey = null;
}
@ -250,7 +233,8 @@ public abstract class EndPoint {
this.shutdownHook = new Thread() {
@Override
public void run() {
public
void run() {
// connectionManager.shutdown accurately reflects the state of the app. Safe to use here
if (EndPoint.this.connectionManager != null && !EndPoint.this.connectionManager.shutdown) {
EndPoint.this.stop();
@ -264,7 +248,8 @@ public abstract class EndPoint {
// serialization stuff
if (options.serializationManager != null) {
this.serializationManager = options.serializationManager;
} else {
}
else {
this.serializationManager = new KryoSerializationManager();
}
@ -305,17 +290,20 @@ public abstract class EndPoint {
*/
if (options.enableRmi) {
this.rmiBridge = new RmiBridge(this.logger, this.serializationManager);
} else {
}
else {
this.rmiBridge = null;
}
}
public void disableRemoteKeyValidation() {
public
void disableRemoteKeyValidation() {
Logger logger2 = this.logger;
if (isConnected()) {
logger2.error("Cannot disable the remote key validation after this endpoint is connected!");
} else {
}
else {
if (logger2.isInfoEnabled()) {
logger2.info("WARNING: Disabling remote key validation is a security risk!!");
}
@ -328,23 +316,28 @@ public abstract class EndPoint {
* a database, etc, or can be a "null" property store, which does nothing
*/
@SuppressWarnings("unchecked")
public <T extends SettingsStore> T getPropertyStore() {
public
<T extends SettingsStore> T getPropertyStore() {
return (T) this.propertyStore;
}
/**
* TODO maybe remove this? method call is used by jetty ssl
*
* @return the ECC public key in use by this endpoint
*/
public ECPrivateKeyParameters getPrivateKey() {
public
ECPrivateKeyParameters getPrivateKey() {
return this.privateKey;
}
/**
* TODO maybe remove this? method call is used by jetty ssl
*
* @return the ECC private key in use by this endpoint
*/
public ECPublicKeyParameters getPublicKey() {
public
ECPublicKeyParameters getPublicKey() {
return this.publicKey;
}
@ -352,41 +345,46 @@ public abstract class EndPoint {
* Internal call by the pipeline to notify the client to continue registering the different session protocols.
* The server does not use this.
*/
protected boolean registerNextProtocol0() {
protected
boolean registerNextProtocol0() {
return true;
}
/**
* The {@link Listener:idle()} will be triggered when neither read nor write
* has happened for the specified period of time (in milli-seconds)
* <br>
* Specify {@code 0} to disable (default).
* The {@link Listener:idle()} will be triggered when neither read nor write
* has happened for the specified period of time (in milli-seconds)
* <br>
* Specify {@code 0} to disable (default).
*/
public void setIdleTimeout(int idleTimeout) {
public
void setIdleTimeout(int idleTimeout) {
this.idleTimeout = idleTimeout;
}
/**
* The amount of milli-seconds that must elapse with no read or write before {@link Listener.idle()}
* will be triggered
* The amount of milli-seconds that must elapse with no read or write before {@link Listener.idle()}
* will be triggered
*/
public int getIdleTimeout() {
public
int getIdleTimeout() {
return this.idleTimeout;
}
/**
* Return the connection status of this endpoint.
* <p>
* <p/>
* Once a server has connected to ANY client, it will always return true until server.close() is called
*/
public final boolean isConnected() {
public final
boolean isConnected() {
return this.isConnected.get();
}
/**
* Add a channel future to be tracked and managed for shutdown.
*/
protected final void manageForShutdown(ChannelFuture future) {
protected final
void manageForShutdown(ChannelFuture future) {
synchronized (this.shutdownChannelList) {
this.shutdownChannelList.add(future);
}
@ -395,7 +393,8 @@ public abstract class EndPoint {
/**
* Add an eventloop group to be tracked & managed for shutdown
*/
protected final void manageForShutdown(EventLoopGroup loopGroup) {
protected final
void manageForShutdown(EventLoopGroup loopGroup) {
synchronized (this.eventLoopGroups) {
this.eventLoopGroups.add(loopGroup);
}
@ -403,20 +402,19 @@ public abstract class EndPoint {
/**
* Returns the serialization wrapper if there is an object type that needs to be added outside of the basics.
*/
public SerializationManager getSerialization() {
public
SerializationManager getSerialization() {
return this.serializationManager;
}
/**
* Gets the remote method invocation (RMI) bridge for this endpoint.
*/
public Rmi rmi() {
public
Rmi rmi() {
if (this.rmiBridge == null) {
throw new NetException("Cannot use a remote object space that has NOT been created first! Configure the ConnectionOptions!");
}
@ -426,13 +424,14 @@ public abstract class EndPoint {
/**
* This method allows the connections used by the client/server to be subclassed (custom implementations).
* <p>
* <p/>
* As this is for the network stack, the new connection type MUST subclass {@link Connection}
*
* @param bridge null when retrieving the subclass type (internal use only). Non-null when creating a new (and real) connection.
* @return a new network connection
*/
public Connection newConnection(String name) {
public
Connection newConnection(String name) {
return new ConnectionImpl(name);
}
@ -443,7 +442,8 @@ public abstract class EndPoint {
*
* @param metaChannel can be NULL (when getting the baseClass)
*/
protected final Connection connection0(MetaChannel metaChannel) {
protected final
Connection connection0(MetaChannel metaChannel) {
Connection connection;
// setup the extras needed by the network connection.
@ -454,10 +454,12 @@ public abstract class EndPoint {
if (metaChannel.localChannel != null) {
wrapper = new ChannelLocalWrapper(metaChannel);
} else {
}
else {
if (this instanceof EndPointServer) {
wrapper = new ChannelNetworkWrapper(metaChannel, this.registrationWrapper);
} else {
}
else {
wrapper = new ChannelNetworkWrapper(metaChannel, null);
}
}
@ -473,7 +475,8 @@ public abstract class EndPoint {
if (this.rmiBridge != null) {
connection.listeners().add(this.rmiBridge.getListener());
}
} else {
}
else {
// getting the baseClass
// have to add the networkAssociate to a map of "connected" computers
@ -486,7 +489,7 @@ public abstract class EndPoint {
/**
* Internal call by the pipeline to notify the "Connection" object that it has "connected", meaning that modifications
* to the pipeline are finished.
*
* <p/>
* Only the CLIENT injects in front of this)
*/
void connectionConnected0(Connection connection) {
@ -501,14 +504,16 @@ public abstract class EndPoint {
/**
* Expose methods to modify the listeners (connect/disconnect/idle/receive events).
*/
public final ListenerBridge listeners() {
public final
ListenerBridge listeners() {
return this.connectionManager;
}
/**
* Returns a non-modifiable list of active connections
*/
public List<Connection> getConnections() {
public
List<Connection> getConnections() {
return this.connectionManager.getConnections();
}
@ -516,14 +521,16 @@ public abstract class EndPoint {
* Returns a non-modifiable list of active connections
*/
@SuppressWarnings("unchecked")
public <C extends Connection> Collection<C> getConnectionsAs() {
public
<C extends Connection> Collection<C> getConnectionsAs() {
return (Collection<C>) this.connectionManager.getConnections();
}
/**
* Expose methods to send objects to a destination.
*/
public abstract ConnectionBridgeBase send();
public abstract
ConnectionBridgeBase send();
/**
* Returns a proxy object that implements the specified interfaces. Methods
@ -532,31 +539,33 @@ public abstract class EndPoint {
* remote end of the connection has not {@link #addConnection(Connection)
* added} the connection to the ObjectSpace, the remote method invocations
* will be ignored.
* <p>
* <p/>
* Methods that return a value will throw {@link TimeoutException} if the
* response is not received with the
* {@link RemoteObject#setResponseTimeout(int) response timeout}.
* <p>
* <p/>
* If {@link RemoteObject#setNonBlocking(boolean) non-blocking} is false
* (the default), then methods that return a value must not be called from
* the update thread for the connection. An exception will be thrown if this
* occurs. Methods with a void return value can be called on the update
* thread.
* <p>
* <p/>
* If a proxy returned from this method is part of an object graph sent over
* the network, the object graph on the receiving side will have the proxy
* object replaced with the registered object.
*
* @see RemoteObject
*/
public RemoteObject getRemoteObject(Connection connection, int objectID, Class<?>[] ifaces) {
public
RemoteObject getRemoteObject(Connection connection, int objectID, Class<?>[] ifaces) {
return this.rmiBridge.getRemoteObject(connection, objectID, ifaces);
}
/**
* Registers a tool with the server, to be used by other services.
*/
public void registerTool(EndpointTool toolClass) {
public
void registerTool(EndpointTool toolClass) {
if (toolClass == null) {
throw new IllegalArgumentException("Tool must not be null! Unable to add tool");
}
@ -569,7 +578,7 @@ public abstract class EndPoint {
Class<?> clazz2;
Class<EndpointTool> cls = EndpointTool.class;
for (int i=0;i<length;i++) {
for (int i = 0; i < length; i++) {
clazz2 = interfaces[i];
if (cls.isAssignableFrom(clazz2)) {
index = i;
@ -580,7 +589,8 @@ public abstract class EndPoint {
if (index == -1) {
throw new IllegalArgumentException("Unable to discover tool interface! WHOOPS!");
}
} else {
}
else {
index = 0;
}
@ -594,7 +604,8 @@ public abstract class EndPoint {
/**
* Only get the tools in the ModuleStart (ie: load) methods. If done in the constructor, the tool might not be available yet
*/
public <T extends EndpointTool> T getTool(Class<?> toolClass) {
public
<T extends EndpointTool> T getTool(Class<?> toolClass) {
if (toolClass == null) {
throw new IllegalArgumentException("Tool must not be null! Unable to add tool");
}
@ -604,16 +615,18 @@ public abstract class EndPoint {
return tool;
}
public String getName() {
public
String getName() {
return this.name;
}
/**
* Closes all connections ONLY (keeps the server/client running).
* <p>
* <p/>
* This is used, for example, when reconnecting to a server. The server should ALWAYS use STOP.
*/
public void close() {
public
void close() {
// give a chance to other threads.
Thread.yield();
@ -623,11 +636,13 @@ public abstract class EndPoint {
this.isConnected.set(false);
}
protected final String stopWithErrorMessage(Logger logger2, String errorMessage, Throwable throwable) {
protected final
String stopWithErrorMessage(Logger logger2, String errorMessage, Throwable throwable) {
if (logger2.isDebugEnabled() && throwable != null) {
// extra info if debug is enabled
logger2.error(errorMessage, throwable.getCause());
} else {
}
else {
logger2.error(errorMessage);
}
@ -637,10 +652,11 @@ public abstract class EndPoint {
/**
* Safely closes all associated resources/threads/connections
* <p>
* <p/>
* Override stopExtraActions() if you want to provide extra behavior to stopping the endpoint
*/
public final void stop() {
public final
void stop() {
// only permit us to "stop" once!
if (!this.stopCalled.compareAndSet(false, true)) {
return;
@ -658,10 +674,12 @@ public abstract class EndPoint {
if (!inEventThread) {
stopInThread();
} else {
}
else {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
public
void run() {
EndPoint.this.stopInThread();
EndPoint.this.blockWhileShutdown.countDown();
}
@ -680,7 +698,8 @@ public abstract class EndPoint {
}
// This actually does the "stopping", since there is some logic to making sure we don't deadlock, this is important
private final void stopInThread() {
private final
void stopInThread() {
// make sure we are not trying to stop during a startup procedure.
// This will wait until we have finished starting up/shutting down.
synchronized (this.shutdownInProgress) {
@ -754,13 +773,15 @@ public abstract class EndPoint {
/**
* Extra EXTERNAL actions to perform when stopping this endpoint.
*/
public void stopExtraActions() {
public
void stopExtraActions() {
}
/**
* Determines if the specified thread (usually the current thread) is a member of a group's threads.
*/
protected static final boolean checkInEventGroup(Thread currentThread, EventLoopGroup group) {
protected static final
boolean checkInEventGroup(Thread currentThread, EventLoopGroup group) {
if (group != null) {
Set<EventExecutor> children = group.children();
for (EventExecutor e : children) {
@ -777,7 +798,8 @@ public abstract class EndPoint {
*
* @param blockUntilTerminate if TRUE, then this endpoint will block until STOP is called, otherwise it will not block
*/
public final void waitForStop(boolean blockUntilTerminate) {
public final
void waitForStop(boolean blockUntilTerminate) {
if (blockUntilTerminate) {
// we now BLOCK until the stop method is called.
try {
@ -789,12 +811,14 @@ public abstract class EndPoint {
}
@Override
public String toString() {
public
String toString() {
return "EndPoint [" + this.name + "]";
}
@Override
public int hashCode() {
public
int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + (this.name == null ? 0 : this.name.hashCode());
@ -804,7 +828,8 @@ public abstract class EndPoint {
}
@Override
public boolean equals(Object obj) {
public
boolean equals(Object obj) {
if (this == obj) {
return true;
}
@ -819,21 +844,24 @@ public abstract class EndPoint {
if (other.name != null) {
return false;
}
} else if (!this.name.equals(other.name)) {
}
else if (!this.name.equals(other.name)) {
return false;
}
if (this.privateKey == null) {
if (other.privateKey != null) {
return false;
}
} else if (!Crypto.ECC.compare(this.privateKey, other.privateKey)) {
}
else if (!Crypto.ECC.compare(this.privateKey, other.privateKey)) {
return false;
}
if (this.publicKey == null) {
if (other.publicKey != null) {
return false;
}
} else if (!Crypto.ECC.compare(this.publicKey, other.publicKey)) {
}
else if (!Crypto.ECC.compare(this.publicKey, other.publicKey)) {
return false;
}
return true;