Added aborting of the client registration process, if it fails during the registration process. Wrapped logger.debug/trace into checks first. Tweaked caller of RmiBridge

This commit is contained in:
nathan 2014-08-22 13:51:02 +02:00
parent e41e0823a8
commit 28f6b65336
11 changed files with 251 additions and 169 deletions

View File

@ -45,8 +45,6 @@ import dorkbox.network.util.udt.UdtEndpointProxy;
public class Client extends EndPointClient {
private List<BootstrapWrapper> bootstraps = new LinkedList<BootstrapWrapper>();
private volatile boolean registrationInProgress = false;
private volatile int connectionTimeout = 5000; // default
/**
@ -265,6 +263,10 @@ public class Client extends EndPointClient {
// we will only do a local channel when NOT doing TCP/UDP channels. This is EXCLUSIVE. (XOR)
int size = this.bootstraps.size();
for (int i=0;i<size;i++) {
if (!this.registrationInProgress) {
break;
}
this.registrationComplete = i == size-1;
BootstrapWrapper bootstrapWrapper = this.bootstraps.get(i);
ChannelFuture future;
@ -310,7 +312,7 @@ public class Client extends EndPointClient {
// WAIT for the next one to complete.
try {
this.registrationLock.wait();
this.registrationLock.wait(connectionTimeout);
} catch (InterruptedException e) {
}
}

View File

@ -16,6 +16,7 @@ import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.bouncycastle.crypto.params.ParametersWithIV;
import org.slf4j.Logger;
import dorkbox.network.connection.idle.IdleBridge;
import dorkbox.network.connection.idle.IdleObjectSender;
@ -236,7 +237,10 @@ public class ConnectionImpl extends ChannelInboundHandlerAdapter
*/
@Override
public final void self(Object message) {
this.logger.trace("Sending LOCAL {}", message);
Logger logger2 = this.logger;
if (logger2.isTraceEnabled()) {
logger2.trace("Sending LOCAL {}", message);
}
this.sessionManager.notifyOnMessage(this, message);
}
@ -245,13 +249,18 @@ public class ConnectionImpl extends ChannelInboundHandlerAdapter
*/
@Override
public final ConnectionPoint TCP(Object message) {
Logger logger2 = this.logger;
if (!this.closeInProgress.get()) {
this.logger.trace("Sending TCP {}", message);
if (logger2.isTraceEnabled()) {
logger2.trace("Sending TCP {}", message);
}
ConnectionPoint tcp = this.channelWrapper.tcp();
tcp.write(message);
return tcp;
} else {
this.logger.debug("writing TCP while closed: {}", message);
if (logger2.isDebugEnabled()) {
logger2.debug("writing TCP while closed: {}", message);
}
return null;
}
@ -262,13 +271,18 @@ public class ConnectionImpl extends ChannelInboundHandlerAdapter
*/
@Override
public ConnectionPoint UDP(Object message) {
Logger logger2 = this.logger;
if (!this.closeInProgress.get()) {
this.logger.trace("Sending UDP {}", message);
if (logger2.isTraceEnabled()) {
logger2.trace("Sending UDP {}", message);
}
ConnectionPoint udp = this.channelWrapper.udp();
udp.write(message);
return udp;
} else {
this.logger.debug("writing UDP while closed: {}", message);
if (logger2.isDebugEnabled()) {
logger2.debug("writing UDP while closed: {}", message);
}
return null;
}
}
@ -278,13 +292,18 @@ public class ConnectionImpl extends ChannelInboundHandlerAdapter
*/
@Override
public final ConnectionPoint UDT(Object message) {
Logger logger2 = this.logger;
if (!this.closeInProgress.get()) {
this.logger.trace("Sending UDT {}", message);
if (logger2.isTraceEnabled()) {
logger2.trace("Sending UDT {}", message);
}
ConnectionPoint udt = this.channelWrapper.udt();
udt.write(message);
return udt;
} else {
this.logger.debug("writing UDT while closed: {}", message);
if (logger2.isDebugEnabled()) {
logger2.debug("writing UDT while closed: {}", message);
}
return null;
}
}

View File

@ -10,6 +10,8 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import org.slf4j.Logger;
import dorkbox.network.rmi.RmiMessages;
import dorkbox.network.util.ConcurrentHashMapFactory;
import dorkbox.util.ClassHelper;
@ -31,11 +33,11 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
public ConnectionManager(String name, Class<?> baseClass) {
this.name = name;
logger = org.slf4j.LoggerFactory.getLogger(name);
this.logger = org.slf4j.LoggerFactory.getLogger(name);
this.baseClass = baseClass;
listeners = new ConcurrentHashMapFactory<Type, CopyOnWriteArrayList<Listener<Connection, Object>>>() {
this.listeners = new ConcurrentHashMapFactory<Type, CopyOnWriteArrayList<Listener<Connection, Object>>>() {
private static final long serialVersionUID = 8404650379739727012L;
@Override
@ -44,7 +46,7 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
}
};
localManagers = new ConcurrentHashMapFactory<Connection, ConnectionManager>() {
this.localManagers = new ConcurrentHashMapFactory<Connection, ConnectionManager>() {
private static final long serialVersionUID = -1656860453153611896L;
@Override
@ -85,13 +87,13 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
Class<?> genericClass = ClassHelper.getGenericParameterAsClassForSuperClass(clazz, 0);
// if we are null, it means that we have no generics specified for our listener!
if (genericClass == baseClass || genericClass == null) {
if (genericClass == this.baseClass || genericClass == null) {
// we are the base class, so we are fine.
addListener0(listener);
return;
} else if (ClassHelper.hasInterface(Connection.class, genericClass) &&
!ClassHelper.hasParentClass(baseClass, genericClass)) {
!ClassHelper.hasParentClass(this.baseClass, genericClass)) {
// now we must make sure that the PARENT class is NOT the base class. ONLY the base class is allowed!
addListener0(listener);
@ -109,10 +111,13 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
private final void addListener0(Listener listener) {
Class<?> type = listener.getObjectType();
CopyOnWriteArrayList<Listener<Connection, Object>> list = listeners.getOrCreate(type);
CopyOnWriteArrayList<Listener<Connection, Object>> list = this.listeners.getOrCreate(type);
list.addIfAbsent(listener);
logger.trace("listener added: {} <{}>", listener.getClass().getName(), listener.getObjectType());
Logger logger2 = this.logger;
if (logger2.isTraceEnabled()) {
logger2.trace("listener added: {} <{}>", listener.getClass().getName(), listener.getObjectType());
}
}
/**
@ -136,12 +141,15 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
Class<?> type = listener.getObjectType();
CopyOnWriteArrayList<Listener<Connection, Object>> list = listeners.get(type);
CopyOnWriteArrayList<Listener<Connection, Object>> list = this.listeners.get(type);
if (list != null) {
list.remove(listener);
}
logger.trace("listener removed: {} <{}>", listener.getClass().getName(), listener.getObjectType());
Logger logger2 = this.logger;
if (logger2.isTraceEnabled()) {
logger2.trace("listener removed: {} <{}>", listener.getClass().getName(), listener.getObjectType());
}
}
/**
@ -150,9 +158,9 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
*/
@Override
public final void removeAll() {
listeners.clear();
this.listeners.clear();
logger.trace("all listeners removed !!");
this.logger.trace("all listeners removed !!");
}
/**
@ -166,9 +174,9 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
throw new IllegalArgumentException("classType cannot be null.");
}
listeners.remove(classType);
this.listeners.remove(classType);
logger.trace("all listeners removed for type: {}", classType.getClass().getName());
this.logger.trace("all listeners removed for type: {}", classType.getClass().getName());
}
@ -189,10 +197,10 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
Class<?> objectType = message.getClass();
// this is the GLOBAL version (unless it's the call from below, then it's the connection scoped version)
CopyOnWriteArrayList<Listener<Connection, Object>> list = listeners.get(objectType);
CopyOnWriteArrayList<Listener<Connection, Object>> list = this.listeners.get(objectType);
if (list != null) {
for (Listener<Connection, Object> listener : list) {
if (shutdown) {
if (this.shutdown) {
return;
}
@ -214,7 +222,7 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
objectType = objectType.getSuperclass();
while (objectType != null) {
// check to see if we have what we are looking for in our CURRENT class
list = listeners.get(objectType);
list = this.listeners.get(objectType);
if (list != null) {
break;
@ -226,7 +234,7 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
if (list != null) {
for (Listener<Connection, Object> listener : list) {
if (shutdown) {
if (this.shutdown) {
return;
}
@ -234,7 +242,10 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
foundListener = true;
}
} else if (!foundListener) {
logger.debug("----------- LISTENER NOT REGISTERED FOR TYPE: {}", message.getClass().getSimpleName());
Logger logger2 = this.logger;
if (logger2.isDebugEnabled()) {
this.logger.debug("----------- LISTENER NOT REGISTERED FOR TYPE: {}", message.getClass().getSimpleName());
}
}
}
@ -244,7 +255,7 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
}
// now have to account for additional connection listener managers (non-global).
ConnectionManager localManager = localManagers.get(connection);
ConnectionManager localManager = this.localManagers.get(connection);
if (localManager != null) {
// if we found a listener during THIS method call, we need to let the NEXT method call know,
// so it doesn't spit out error for not handling a message (since that message MIGHT have
@ -260,13 +271,13 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
*/
@Override
public final void notifyOnIdle(Connection connection) {
Set<Entry<Type, CopyOnWriteArrayList<Listener<Connection, Object>>>> entrySet = listeners.entrySet();
Set<Entry<Type, CopyOnWriteArrayList<Listener<Connection, Object>>>> entrySet = this.listeners.entrySet();
CopyOnWriteArrayList<Listener<Connection,Object>> list;
for (Entry<Type, CopyOnWriteArrayList<Listener<Connection, Object>>> entry : entrySet) {
list = entry.getValue();
if (list != null) {
for (Listener<Connection,Object> listener : list) {
if (shutdown) {
if (this.shutdown) {
return;
}
@ -277,7 +288,7 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
}
// now have to account for additional (local) listener managers.
ConnectionManager localManager = localManagers.get(connection);
ConnectionManager localManager = this.localManagers.get(connection);
if (localManager != null) {
localManager.notifyOnIdle(connection);
}
@ -292,16 +303,16 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
public void connectionConnected(Connection connection) {
// only TCP channels are passed in.
// create a new connection!
connections.add(connection);
this.connections.add(connection);
try {
Set<Entry<Type, CopyOnWriteArrayList<Listener<Connection, Object>>>> entrySet = listeners.entrySet();
Set<Entry<Type, CopyOnWriteArrayList<Listener<Connection, Object>>>> entrySet = this.listeners.entrySet();
CopyOnWriteArrayList<Listener<Connection,Object>> list;
for (Entry<Type, CopyOnWriteArrayList<Listener<Connection, Object>>> entry : entrySet) {
list = entry.getValue();
if (list != null) {
for (Listener<Connection,Object> listener : list) {
if (shutdown) {
if (this.shutdown) {
return;
}
listener.connected(connection);
@ -311,7 +322,7 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
}
// now have to account for additional (local) listener managers.
ConnectionManager localManager = localManagers.get(connection);
ConnectionManager localManager = this.localManagers.get(connection);
if (localManager != null) {
localManager.connectionConnected(connection);
}
@ -327,13 +338,13 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
*/
@Override
public void connectionDisconnected(Connection connection) {
Set<Entry<Type, CopyOnWriteArrayList<Listener<Connection, Object>>>> entrySet = listeners.entrySet();
Set<Entry<Type, CopyOnWriteArrayList<Listener<Connection, Object>>>> entrySet = this.listeners.entrySet();
CopyOnWriteArrayList<Listener<Connection,Object>> list;
for (Entry<Type, CopyOnWriteArrayList<Listener<Connection, Object>>> entry : entrySet) {
list = entry.getValue();
if (list != null) {
for (Listener<Connection, Object> listener : list) {
if (shutdown) {
if (this.shutdown) {
return;
}
@ -343,15 +354,15 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
}
// now have to account for additional (local) listener managers.
ConnectionManager localManager = localManagers.get(connection);
ConnectionManager localManager = this.localManagers.get(connection);
if (localManager != null) {
localManager.connectionDisconnected(connection);
// remove myself from the "global" listeners so we can have our memory cleaned up.
localManagers.remove(connection);
this.localManagers.remove(connection);
}
connections.remove(connection);
this.connections.remove(connection);
}
@ -362,13 +373,13 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
*/
@Override
public void connectionError(Connection connection, Throwable throwable) {
Set<Entry<Type, CopyOnWriteArrayList<Listener<Connection, Object>>>> entrySet = listeners.entrySet();
Set<Entry<Type, CopyOnWriteArrayList<Listener<Connection, Object>>>> entrySet = this.listeners.entrySet();
CopyOnWriteArrayList<Listener<Connection,Object>> list;
for (Entry<Type, CopyOnWriteArrayList<Listener<Connection, Object>>> entry : entrySet) {
list = entry.getValue();
if (list != null) {
for (Listener<Connection, Object> listener : list) {
if (shutdown) {
if (this.shutdown) {
return;
}
@ -379,7 +390,7 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
}
// now have to account for additional (local) listener managers.
ConnectionManager localManager = localManagers.get(connection);
ConnectionManager localManager = this.localManagers.get(connection);
if (localManager != null) {
localManager.connectionError(connection, throwable);
}
@ -392,7 +403,7 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
*/
@Override
public List<Connection> getConnections() {
return Collections.unmodifiableList(connections);
return Collections.unmodifiableList(this.connections);
}
@ -403,15 +414,18 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
// it is POSSIBLE to add a connection-specfic listener (via connection.addListener), meaning that ONLY
// that listener is notified on that event (ie, admin type listeners)
ConnectionManager lm = localManagers.getOrCreate(connection, connection.toString());
ConnectionManager lm = this.localManagers.getOrCreate(connection, connection.toString());
logger.debug("Connection specific Listener Manager added on connection: {}", connection);
Logger logger2 = this.logger;
if (logger2.isDebugEnabled()) {
this.logger.debug("Connection specific Listener Manager added on connection: {}", connection);
}
return lm;
}
final void removeListenerManager(Connection connection) {
localManagers.remove(connection);
this.localManagers.remove(connection);
}
@ -421,7 +435,7 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
* @return Returns a FAST list of active connections.
*/
public final Collection<Connection> getConnections0() {
return connections;
return this.connections;
}
/**
@ -430,8 +444,8 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
* @return Returns a FAST first connection (for client!).
*/
public final Connection getConnection0() {
if (connections.iterator().hasNext()) {
return connections.iterator().next();
if (this.connections.iterator().hasNext()) {
return this.connections.iterator().next();
} else {
throw new RuntimeException("Not connected to a remote computer. Unable to continue!");
}
@ -443,19 +457,19 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
* @return a boolean indicating if there are any listeners registered with this manager.
*/
final boolean hasListeners() {
return listeners.isEmpty();
return this.listeners.isEmpty();
}
/**
* Closes all associated resources/threads/connections
*/
final void stop() {
shutdown = true;
this.shutdown = true;
// disconnect the sessions
closeConnections();
listeners.clear();
this.listeners.clear();
}
/**
@ -463,7 +477,7 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
*/
final void closeConnections() {
// close the sessions
Iterator<Connection> iterator = connections.iterator();
Iterator<Connection> iterator = this.connections.iterator();
while (iterator.hasNext()) {
Connection connection = iterator.next();
// Close the connection. Make sure the close operation ends because
@ -471,6 +485,6 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
// Necessary otherwise workers won't close.
connection.close();
}
connections.clear();
this.connections.clear();
}
}

View File

@ -140,7 +140,7 @@ public abstract class EndPoint {
this.name = name;
this.logger = org.slf4j.LoggerFactory.getLogger(name);
this.registrationWrapper = new RegistrationWrapper(this);
this.registrationWrapper = new RegistrationWrapper(this, this.logger);
// we have to be able to specify WHAT property store we want to use, since it can change!
if (options.settingsStore == null) {

View File

@ -10,6 +10,8 @@ import dorkbox.network.util.SecurityException;
public class EndPointClient extends EndPointWithSerialization {
protected final Object registrationLock = new Object();
protected volatile boolean registrationInProgress = false;
protected volatile boolean registrationComplete = false;
public EndPointClient(String name, ConnectionOptions options) throws InitializationException, SecurityException {
@ -23,16 +25,16 @@ public class EndPointClient extends EndPointWithSerialization {
@Override
protected boolean continueRegistration0() {
// we need to cache the value, since it can change in a different thread before we have the chance to return the value.
boolean complete = registrationComplete;
boolean complete = this.registrationComplete;
// notify the block, but only if we are not ready.
if (!complete) {
synchronized (registrationLock) {
registrationLock.notifyAll();
synchronized (this.registrationLock) {
this.registrationLock.notifyAll();
}
}
logger.trace("Registered protocol from server.");
this.logger.trace("Registered protocol from server.");
// only let us continue with connections (this starts up the client/server implementations) once ALL of the
// bootstraps have connected
@ -44,13 +46,21 @@ public class EndPointClient extends EndPointWithSerialization {
* will BLOCK until it has successfully registered it's connections.
*/
@Override
protected final void connectionConnected0(Connection connection) {
final void connectionConnected0(Connection connection) {
// invokes the listener.connection() method, and initialize the connection channels with whatever extra info they might need.
super.connectionConnected0(connection);
// notify the block
synchronized (registrationLock) {
registrationLock.notifyAll();
synchronized (this.registrationLock) {
this.registrationLock.notifyAll();
}
}
/**
* Internal call to abort registration if the shutdown command is issued during channel registration.
*/
void abortRegistration() {
this.registrationInProgress = false;
stop();
}
}

View File

@ -45,11 +45,11 @@ public class EndPointWithSerialization extends EndPoint {
}
// we don't care about un-instantiated/constructed members, since the class type is the only interest.
connectionManager = new ConnectionManager(name, connection0(null).getClass());
this.connectionManager = new ConnectionManager(name, connection0(null).getClass());
// setup our TCP kryo encoders
registrationWrapper.setKryoTcpEncoder(new KryoEncoder(serializationManager));
registrationWrapper.setKryoTcpCryptoEncoder(new KryoEncoderCrypto(serializationManager));
this.registrationWrapper.setKryoTcpEncoder(new KryoEncoder(this.serializationManager));
this.registrationWrapper.setKryoTcpCryptoEncoder(new KryoEncoderCrypto(this.serializationManager));
this.serializationManager.setReferences(false);
@ -65,14 +65,14 @@ public class EndPointWithSerialization extends EndPoint {
// add the ping listener (internal use only!)
connectionManager.add(new PingListener(name));
this.connectionManager.add(new PingListener(name));
Runtime.getRuntime().removeShutdownHook(shutdownHook);
shutdownHook = new Thread() {
Runtime.getRuntime().removeShutdownHook(this.shutdownHook);
this.shutdownHook = new Thread() {
@Override
public void run() {
// connectionManager.shutdown accurately reflects the state of the app. Safe to use here
if (connectionManager != null && !connectionManager.shutdown) {
if (EndPointWithSerialization.this.connectionManager != null && !EndPointWithSerialization.this.connectionManager.shutdown) {
EndPointWithSerialization.this.stop();
}
}
@ -84,7 +84,7 @@ public class EndPointWithSerialization extends EndPoint {
* Returns the serialization wrapper if there is an object type that needs to be added outside of the basics.
*/
public SerializationManager getSerialization() {
return serializationManager;
return this.serializationManager;
}
/**
@ -95,16 +95,16 @@ public class EndPointWithSerialization extends EndPoint {
*/
public RmiBridge getRmiBridge() {
synchronized (this) {
if (remoteObjectSpace == null) {
if (this.remoteObjectSpace == null) {
if (isConnected()) {
throw new RuntimeException("Cannot create a remote object space after the remote endpoint has already connected!");
}
remoteObjectSpace = new RmiBridge(name);
this.remoteObjectSpace = new RmiBridge(this.logger, this.name);
}
}
return remoteObjectSpace;
return this.remoteObjectSpace;
}
@ -141,30 +141,30 @@ public class EndPointWithSerialization extends EndPoint {
wrapper = new ChannelLocalWrapper(metaChannel);
} else {
if (this instanceof EndPointServer) {
wrapper = new ChannelNetworkWrapper(metaChannel, registrationWrapper);
wrapper = new ChannelNetworkWrapper(metaChannel, this.registrationWrapper);
} else {
wrapper = new ChannelNetworkWrapper(metaChannel, null);
}
}
connection = newConnection(name);
connection = newConnection(this.name);
// now initialize the connection channels with whatever extra info they might need.
connection.init(this, new Bridge(wrapper, connectionManager));
connection.init(this, new Bridge(wrapper, this.connectionManager));
metaChannel.connection = connection;
// notify our remote object space that it is able to receive method calls.
synchronized (this) {
if (remoteObjectSpace != null) {
remoteObjectSpace.addConnection(connection);
if (this.remoteObjectSpace != null) {
this.remoteObjectSpace.addConnection(connection);
}
}
} else {
// getting the baseClass
// have to add the networkAssociate to a map of "connected" computers
connection = newConnection(name);
connection = newConnection(this.name);
}
return connection;
@ -176,27 +176,27 @@ public class EndPointWithSerialization extends EndPoint {
*
* Only the CLIENT injects in front of this)
*/
protected void connectionConnected0(Connection connection) {
isConnected.set(true);
void connectionConnected0(Connection connection) {
this.isConnected.set(true);
// prep the channel wrapper
connection.prep();
connectionManager.connectionConnected(connection);
this.connectionManager.connectionConnected(connection);
}
/**
* Expose methods to modify the listeners (connect/disconnect/idle/receive events).
*/
public final ListenerBridge listeners() {
return connectionManager;
return this.connectionManager;
}
/**
* Returns a non-modifiable list of active connections
*/
public List<Connection> getConnections() {
return connectionManager.getConnections();
return this.connectionManager.getConnections();
}
/**
@ -204,7 +204,7 @@ public class EndPointWithSerialization extends EndPoint {
*/
@SuppressWarnings("unchecked")
public <C extends Connection> Collection<C> getConnectionsAs() {
return (Collection<C>) connectionManager.getConnections();
return (Collection<C>) this.connectionManager.getConnections();
}
/**
@ -213,7 +213,7 @@ public class EndPointWithSerialization extends EndPoint {
@Override
public void close() {
// stop does the same as this + more
connectionManager.closeConnections();
this.connectionManager.closeConnections();
super.close();
}
@ -223,6 +223,6 @@ public class EndPointWithSerialization extends EndPoint {
*/
@Override
protected void stopExtraActions() {
connectionManager.stop();
this.connectionManager.stop();
}
}

View File

@ -11,6 +11,7 @@ import java.util.concurrent.locks.ReentrantLock;
import org.bouncycastle.crypto.CipherParameters;
import org.bouncycastle.crypto.params.ECPublicKeyParameters;
import org.slf4j.Logger;
import dorkbox.network.connection.registration.MetaChannel;
import dorkbox.network.pipeline.KryoEncoder;
@ -39,9 +40,9 @@ public class RegistrationWrapper implements UdpServer {
private KryoEncoder kryoTcpEncoder;
private KryoEncoderCrypto kryoTcpCryptoEncoder;
public RegistrationWrapper(EndPoint endPoint) {
public RegistrationWrapper(EndPoint endPoint, Logger logger) {
this.endPoint = endPoint;
this.logger = org.slf4j.LoggerFactory.getLogger(endPoint.name);
this.logger = logger;
if (endPoint instanceof EndPointServer) {
this.udpRemoteMap = new ConcurrentHashMap<InetSocketAddress, ConnectionImpl>();
@ -141,8 +142,11 @@ public class RegistrationWrapper implements UdpServer {
byte[] hostAddress = address.getAddress();
ECPublicKeyParameters savedPublicKey = this.endPoint.propertyStore.getRegisteredServerKey(hostAddress);
Logger logger2 = this.logger;
if (savedPublicKey == null) {
this.logger.debug("Adding new remote IP address key for {}", address.getHostAddress());
if (logger2.isDebugEnabled()) {
logger2.debug("Adding new remote IP address key for {}", address.getHostAddress());
}
this.endPoint.propertyStore.addRegisteredServerKey(hostAddress, publicKey);
} else {
// COMPARE!
@ -155,7 +159,7 @@ public class RegistrationWrapper implements UdpServer {
}
//whoa! abort since something messed up!
this.logger.error("Invalid or non-matching public key from remote server. Their public key has changed. To fix, remove entry for: {}", byAddress);
logger2.error("Invalid or non-matching public key from remote server. Their public key has changed. To fix, remove entry for: {}", byAddress);
return false;
}
}
@ -166,7 +170,10 @@ public class RegistrationWrapper implements UdpServer {
public void removeRegisteredServerKey(byte[] hostAddress) throws SecurityException {
ECPublicKeyParameters savedPublicKey = this.endPoint.propertyStore.getRegisteredServerKey(hostAddress);
if (savedPublicKey != null) {
this.logger.debug("Deleteing remote IP address key {}.{}.{}.{}", hostAddress[0], hostAddress[1], hostAddress[2], hostAddress[3]);
Logger logger2 = this.logger;
if (logger2.isDebugEnabled()) {
logger2.debug("Deleteing remote IP address key {}.{}.{}.{}", hostAddress[0], hostAddress[1], hostAddress[2], hostAddress[3]);
}
this.endPoint.propertyStore.removeRegisteredServerKey(hostAddress);
}
}
@ -181,9 +188,12 @@ public class RegistrationWrapper implements UdpServer {
if (metaChannel != null && metaChannel.udpRemoteAddress != null) {
this.udpRemoteMap.put(metaChannel.udpRemoteAddress, metaChannel.connection);
this.logger.debug("Connected to remote UDP connection. [{} <== {}]",
metaChannel.udpChannel.localAddress().toString(),
metaChannel.udpRemoteAddress.toString());
Logger logger2 = this.logger;
if (logger2.isDebugEnabled()) {
logger2.debug("Connected to remote UDP connection. [{} <== {}]",
metaChannel.udpChannel.localAddress().toString(),
metaChannel.udpRemoteAddress.toString());
}
}
}
@ -195,7 +205,10 @@ public class RegistrationWrapper implements UdpServer {
public final void unRegisterServerUDP(InetSocketAddress udpRemoteAddress) {
if (udpRemoteAddress != null) {
this.udpRemoteMap.remove(udpRemoteAddress);
this.logger.info("Closed remote UDP connection: {}", udpRemoteAddress.toString());
Logger logger2 = this.logger;
if (logger2.isInfoEnabled()) {
logger2.info("Closed remote UDP connection: {}", udpRemoteAddress.toString());
}
}
}
@ -210,4 +223,10 @@ public class RegistrationWrapper implements UdpServer {
return null;
}
}
public void abortRegistrationIfClient() {
if (this.endPoint instanceof EndPointClient) {
((EndPointClient)this.endPoint).abortRegistration();
}
}
}

View File

@ -19,7 +19,7 @@ public abstract class RegistrationHandler extends ChannelInboundHandlerAdapter {
public RegistrationHandler(String name, RegistrationWrapper registrationWrapper) {
this.name = name + " Discovery/Registration";
logger = org.slf4j.LoggerFactory.getLogger(this.name);
this.logger = org.slf4j.LoggerFactory.getLogger(this.name);
this.registrationWrapper = registrationWrapper;
}
@ -34,7 +34,7 @@ public abstract class RegistrationHandler extends ChannelInboundHandlerAdapter {
context.fireChannelRegistered();
success = true;
} catch (Throwable t) {
logger.error("Failed to initialize a channel. Closing: {}", context.channel(), t);
this.logger.error("Failed to initialize a channel. Closing: {}", context.channel(), t);
} finally {
if (!success) {
context.close();
@ -44,12 +44,12 @@ public abstract class RegistrationHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext context) throws Exception {
logger.error("ChannelActive NOT IMPLEMENTED!");
this.logger.error("ChannelActive NOT IMPLEMENTED!");
}
@Override
public void channelRead(ChannelHandlerContext context, Object message) throws Exception {
logger.error("MessageReceived NOT IMPLEMENTED!");
this.logger.error("MessageReceived NOT IMPLEMENTED!");
}
@Override
@ -61,7 +61,7 @@ public abstract class RegistrationHandler extends ChannelInboundHandlerAdapter {
public abstract void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception;
public MetaChannel shutdown(RegistrationWrapper registrationWrapper, Channel channel) {
logger.error("SHUTDOWN HANDLER REACHED! SOMETHING MESSED UP! TRYING TO ABORT");
this.logger.error("SHUTDOWN HANDLER REACHED! SOMETHING MESSED UP! TRYING TO ABORT");
// shutdown. Something messed up. Only reach this is something messed up.
// properly shutdown the TCP/UDP channels.
@ -84,6 +84,7 @@ public abstract class RegistrationHandler extends ChannelInboundHandlerAdapter {
}
} finally {
registrationWrapper.abortRegistrationIfClient();
registrationWrapper.releaseChannelMap();
}
}

View File

@ -12,6 +12,8 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.List;
import org.slf4j.Logger;
import dorkbox.network.Broadcast;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.ConnectionImpl;
@ -41,13 +43,13 @@ public class RegistrationRemoteHandlerServerUDP extends MessageToMessageCodec<Da
public RegistrationRemoteHandlerServerUDP(String name, RegistrationWrapper registrationWrapper, SerializationManager serializationManager) {
this.name = name + " Registration-UDP-Server";
logger = org.slf4j.LoggerFactory.getLogger(this.name);
this.logger = org.slf4j.LoggerFactory.getLogger(this.name);
this.registrationWrapper = registrationWrapper;
this.serializationManager = serializationManager;
// absolutely MUST send packet > 0 across, otherwise netty will think it failed to write to the socket, and keep trying. (bug was fixed by netty. Keeping this code)
discoverResponseBuffer = Unpooled.buffer(1);
discoverResponseBuffer.writeByte(Broadcast.broadcastResponseID);
this.discoverResponseBuffer = Unpooled.buffer(1);
this.discoverResponseBuffer.writeByte(Broadcast.broadcastResponseID);
}
/**
@ -61,7 +63,7 @@ public class RegistrationRemoteHandlerServerUDP extends MessageToMessageCodec<Da
@Override
public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
// log UDP errors.
logger.error("Exception caught in UDP stream.", cause);
this.logger.error("Exception caught in UDP stream.", cause);
super.exceptionCaught(context, cause);
}
@ -91,18 +93,23 @@ public class RegistrationRemoteHandlerServerUDP extends MessageToMessageCodec<Da
InetSocketAddress remoteAddress = msg.sender();
// must have a remote address in the packet. (ie, ignore broadcast)
Logger logger2 = this.logger;
if (remoteAddress == null) {
logger.debug("Ignoring packet with null UDP remote address. (Is it broadcast?)");
if (logger2.isDebugEnabled()) {
logger2.debug("Ignoring packet with null UDP remote address. (Is it broadcast?)");
}
return;
}
if (data.readableBytes() == 1) {
if (data.readByte() == Broadcast.broadcastID) {
// CANNOT use channel.getRemoteAddress()
channel.writeAndFlush(new UdpWrapper(discoverResponseBuffer, remoteAddress));
logger.debug("Responded to host discovery from: {}", remoteAddress);
channel.writeAndFlush(new UdpWrapper(this.discoverResponseBuffer, remoteAddress));
if (logger2.isDebugEnabled()) {
logger2.debug("Responded to host discovery from: {}", remoteAddress);
}
} else {
logger.error("Invalid signature for 'Discover Host' from remote address: {}", remoteAddress);
logger2.error("Invalid signature for 'Discover Host' from remote address: {}", remoteAddress);
}
} else {
// we cannot use the REGULAR pipeline, since we can't pass along the remote address for
@ -116,13 +123,13 @@ public class RegistrationRemoteHandlerServerUDP extends MessageToMessageCodec<Da
public final void sendUDP(ChannelHandlerContext context, Object object, ByteBuf buffer, InetSocketAddress udpRemoteAddress) {
Connection networkConnection = registrationWrapper.getServerUDP(udpRemoteAddress);
Connection networkConnection = this.registrationWrapper.getServerUDP(udpRemoteAddress);
if (networkConnection != null) {
// try to write data! (IT SHOULD ALWAYS BE ENCRYPTED HERE!)
serializationManager.writeWithCryptoUdp(networkConnection, buffer, object);
this.serializationManager.writeWithCryptoUdp(networkConnection, buffer, object);
} else {
// this means we are still in the REGISTRATION phase.
serializationManager.write(buffer, object);
this.serializationManager.write(buffer, object);
}
}
@ -130,19 +137,20 @@ public class RegistrationRemoteHandlerServerUDP extends MessageToMessageCodec<Da
// this will be invoked by the UdpRegistrationHandlerServer. Remember, TCP will be established first.
public final void receivedUDP(ChannelHandlerContext context, Channel channel, ByteBuf data, InetSocketAddress udpRemoteAddress) throws Exception {
// registration is the ONLY thing NOT encrypted
if (serializationManager.isEncrypted(data)) {
Logger logger2 = this.logger;
if (this.serializationManager.isEncrypted(data)) {
// we need to FORWARD this message "down the pipeline".
ConnectionImpl connection = registrationWrapper.getServerUDP(udpRemoteAddress);
ConnectionImpl connection = this.registrationWrapper.getServerUDP(udpRemoteAddress);
if (connection != null) {
// try to read data! (IT SHOULD ALWAYS BE ENCRYPTED HERE!)
Object object;
try {
object = serializationManager.readWithCryptoUdp(connection, data, data.writerIndex());
object = this.serializationManager.readWithCryptoUdp(connection, data, data.writerIndex());
} catch (NetException e) {
logger.error("UDP unable to deserialize buffer", e);
shutdown(registrationWrapper, channel);
logger2.error("UDP unable to deserialize buffer", e);
shutdown(this.registrationWrapper, channel);
return;
}
@ -159,10 +167,10 @@ public class RegistrationRemoteHandlerServerUDP extends MessageToMessageCodec<Da
Object object;
try {
object = serializationManager.read(data, data.writerIndex());
object = this.serializationManager.read(data, data.writerIndex());
} catch (NetException e) {
logger.error("UDP unable to deserialize buffer", e);
shutdown(registrationWrapper, channel);
logger2.error("UDP unable to deserialize buffer", e);
shutdown(this.registrationWrapper, channel);
return;
}
@ -173,7 +181,7 @@ public class RegistrationRemoteHandlerServerUDP extends MessageToMessageCodec<Da
try {
// find out and make sure that UDP and TCP are talking to the same server
InetAddress udpRemoteServer = udpRemoteAddress.getAddress();
IntMap<MetaChannel> channelMap = registrationWrapper.getAndLockChannelMap();
IntMap<MetaChannel> channelMap = this.registrationWrapper.getAndLockChannelMap();
Entries<MetaChannel> entries = channelMap.entries();
while (entries.hasNext()) {
@ -188,14 +196,14 @@ public class RegistrationRemoteHandlerServerUDP extends MessageToMessageCodec<Da
matches = true;
break;
} else {
logger.error("Mismatch UDP and TCP client addresses! UDP: {} TCP: {}", udpRemoteServer, tcpRemoteAddress);
shutdown(registrationWrapper, channel);
logger2.error("Mismatch UDP and TCP client addresses! UDP: {} TCP: {}", udpRemoteServer, tcpRemoteAddress);
shutdown(this.registrationWrapper, channel);
return;
}
}
}
} finally {
registrationWrapper.releaseChannelMap();
this.registrationWrapper.releaseChannelMap();
}
@ -217,17 +225,19 @@ public class RegistrationRemoteHandlerServerUDP extends MessageToMessageCodec<Da
channel.writeAndFlush(new UdpWrapper(register, udpRemoteAddress));
logger.trace("Register UDP connection from {}", udpRemoteAddress);
if (logger2.isTraceEnabled()) {
logger2.trace("Register UDP connection from {}", udpRemoteAddress);
}
return;
}
// if we get here, there was a failure!
logger.error("Error trying to register UDP without udp specified! UDP: {}", udpRemoteAddress);
shutdown(registrationWrapper, channel);
logger2.error("Error trying to register UDP without udp specified! UDP: {}", udpRemoteAddress);
shutdown(this.registrationWrapper, channel);
return;
}
else {
logger.error("UDP attempting to spoof client! Unencrypted packet other than registration received.");
logger2.error("UDP attempting to spoof client! Unencrypted packet other than registration received.");
shutdown(null, channel);
return;
}
@ -238,7 +248,7 @@ public class RegistrationRemoteHandlerServerUDP extends MessageToMessageCodec<Da
* Copied from RegistrationHandler. There were issues accessing it as static with generics.
*/
public MetaChannel shutdown(RegistrationWrapper registrationWrapper, Channel channel) {
logger.error("SHUTDOWN HANDLER REACHED! SOMETHING MESSED UP! TRYING TO ABORT");
this.logger.error("SHUTDOWN HANDLER REACHED! SOMETHING MESSED UP! TRYING TO ABORT");
// shutdown. Something messed up. Only reach this is something messed up.
// properly shutdown the TCP/UDP channels.

View File

@ -18,6 +18,8 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
@ -83,7 +85,7 @@ public class RmiBridge {
public void received(final Connection connection, final InvokeMethod invokeMethod) {
boolean found = false;
Iterator<Connection> iterator = connections.iterator();
Iterator<Connection> iterator = RmiBridge.this.connections.iterator();
while (iterator.hasNext()) {
Connection c = iterator.next();
if (c == connection) {
@ -97,14 +99,14 @@ public class RmiBridge {
return;
}
final Object target = idToObject.get(invokeMethod.objectID);
final Object target = RmiBridge.this.idToObject.get(invokeMethod.objectID);
if (target == null) {
logger.warn("Ignoring remote invocation request for unknown object ID: {}",
RmiBridge.this.logger.warn("Ignoring remote invocation request for unknown object ID: {}",
invokeMethod.objectID);
return;
}
if (executor == null) {
if (RmiBridge.this.executor == null) {
defaultExectutor.execute(new Runnable() {
@Override
public void run() {
@ -114,7 +116,7 @@ public class RmiBridge {
}
});
} else {
executor.execute(new Runnable() {
RmiBridge.this.executor.execute(new Runnable() {
@Override
public void run() {
invoke(connection,
@ -138,17 +140,10 @@ public class RmiBridge {
* <p>
* For safety, this should ONLY be called by {@link EndPoint#getRmiBridge() }
*/
public RmiBridge(String name) {
public RmiBridge(Logger logger, String name) {
this.logger = logger;
this.name = "RMI - " + name + " (remote)";
logger = org.slf4j.LoggerFactory.getLogger(this.name);
Class<?> callerClass = sun.reflect.Reflection.getCallerClass(2);
// starts with will allow for anonymous inner classes.
if (callerClass != null && callerClass.getName().startsWith(EndPoint.class.getName())) {
instances.addIfAbsent(this);
} else {
throw new RuntimeException("It is UNSAFE to access this constructor DIRECTLY. Please use Endpoint.getRmiBridge()");
}
instances.addIfAbsent(this);
}
/**
@ -181,10 +176,13 @@ public class RmiBridge {
if (object == null) {
throw new IllegalArgumentException("object cannot be null.");
}
idToObject.put(objectID, object);
objectToID.put(object, objectID);
this.idToObject.put(objectID, object);
this.objectToID.put(object, objectID);
logger.trace("Object registered with ObjectSpace as {}:{}", objectID, object);
Logger logger2 = this.logger;
if (logger2.isTraceEnabled()) {
this.logger.trace("Object registered with ObjectSpace as {}:{}", objectID, object);
}
}
/**
@ -192,14 +190,14 @@ public class RmiBridge {
* invocation messages.
*/
public void close() {
Iterator<Connection> iterator = connections.iterator();
Iterator<Connection> iterator = this.connections.iterator();
while (iterator.hasNext()) {
Connection connection = iterator.next();
connection.listeners().remove(invokeListener);
connection.listeners().remove(this.invokeListener);
}
instances.remove(this);
logger.trace("Closed ObjectSpace.");
this.logger.trace("Closed ObjectSpace.");
}
/**
@ -207,12 +205,15 @@ public class RmiBridge {
* no longer be able to access it.
*/
public void remove(int objectID) {
Object object = idToObject.remove(objectID);
Object object = this.idToObject.remove(objectID);
if (object != null) {
objectToID.remove(object, 0);
this.objectToID.remove(object, 0);
}
logger.trace("Object {} removed from ObjectSpace: {}", objectID, object);
Logger logger2 = this.logger;
if (logger2.isTraceEnabled()) {
logger2.trace("Object {} removed from ObjectSpace: {}", objectID, object);
}
}
/**
@ -220,15 +221,18 @@ public class RmiBridge {
* no longer be able to access it.
*/
public void remove(Object object) {
if (!idToObject.containsValue(object, true)) {
if (!this.idToObject.containsValue(object, true)) {
return;
}
int objectID = idToObject.findKey(object, true, -1);
idToObject.remove(objectID);
objectToID.remove(object, 0);
int objectID = this.idToObject.findKey(object, true, -1);
this.idToObject.remove(objectID);
this.objectToID.remove(object, 0);
logger.trace("Object {} removed from ObjectSpace: {}", objectID, object);
Logger logger2 = this.logger;
if (logger2.isTraceEnabled()) {
logger2.trace("Object {} removed from ObjectSpace: {}", objectID, object);
}
}
/**
@ -240,10 +244,10 @@ public class RmiBridge {
throw new IllegalArgumentException("connection cannot be null.");
}
connections.addIfAbsent(connection);
connection.listeners().add(invokeListener);
this.connections.addIfAbsent(connection);
connection.listeners().add(this.invokeListener);
logger.trace("Added connection to ObjectSpace: {}", connection);
this.logger.trace("Added connection to ObjectSpace: {}", connection);
}
/**
@ -255,10 +259,10 @@ public class RmiBridge {
throw new IllegalArgumentException("connection cannot be null.");
}
connection.listeners().remove(invokeListener);
connections.remove(connection);
connection.listeners().remove(this.invokeListener);
this.connections.remove(connection);
logger.trace("Removed connection from ObjectSpace: {}", connection);
this.logger.trace("Removed connection from ObjectSpace: {}", connection);
}
/**
@ -271,7 +275,7 @@ public class RmiBridge {
* The remote side of this connection requested the invocation.
*/
protected void invoke(Connection connection, Object target, InvokeMethod invokeMethod) {
if (logger.isDebugEnabled()) {
if (this.logger.isDebugEnabled()) {
String argString = "";
if (invokeMethod.args != null) {
argString = Arrays.deepToString(invokeMethod.args);
@ -283,7 +287,7 @@ public class RmiBridge {
stringBuilder.append(":").append(invokeMethod.objectID);
stringBuilder.append("#").append(invokeMethod.method.getName());
stringBuilder.append("(").append(argString).append(")");
logger.debug(stringBuilder.toString());
this.logger.debug(stringBuilder.toString());
}
byte responseID = invokeMethod.responseID;

View File

@ -33,7 +33,10 @@ public class UdtJniLoader implements LibraryLoader {
location = tempFile.getParent();
tempFile.delete();
logger.debug("Adjusted UDT JNI library location: {}", location);
Logger logger2 = logger;
if (logger2.isDebugEnabled()) {
logger2.debug("Adjusted UDT JNI library location: {}", location);
}
LibraryLoaderUDT loader = new LibraryLoaderUDT();
loader.load(location);