Removed a decent amount of unnecessary generics. Fixed a few bugs wrt

removing connection managers/listeners. Moved RMI message logic to
clean up how it worked.
This commit is contained in:
nathan 2018-01-25 15:48:15 +01:00
parent 1d2008beef
commit 4cb9d8ad59
44 changed files with 1198 additions and 1208 deletions

View File

@ -32,7 +32,6 @@ import dorkbox.network.rmi.RemoteObjectCallback;
import dorkbox.network.rmi.TimeoutException;
import dorkbox.util.NamedThreadFactory;
import dorkbox.util.OS;
import dorkbox.util.exceptions.InitializationException;
import dorkbox.util.exceptions.SecurityException;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
@ -61,7 +60,7 @@ import io.netty.util.internal.PlatformDependent;
*/
@SuppressWarnings({"unused", "WeakerAccess"})
public
class Client<C extends Connection> extends EndPointClient<C> implements Connection {
class Client<C extends Connection> extends EndPointClient implements Connection {
/**
* Gets the version number.
*/
@ -78,7 +77,7 @@ class Client<C extends Connection> extends EndPointClient<C> implements Connecti
* Starts a LOCAL <b>only</b> client, with the default local channel name and serialization scheme
*/
public
Client() throws InitializationException, SecurityException, IOException {
Client() throws SecurityException {
this(Configuration.localOnly());
}
@ -86,8 +85,7 @@ class Client<C extends Connection> extends EndPointClient<C> implements Connecti
* Starts a TCP & UDP client (or a LOCAL client), with the specified serialization scheme
*/
public
Client(String host, int tcpPort, int udpPort, String localChannelName)
throws InitializationException, SecurityException, IOException {
Client(String host, int tcpPort, int udpPort, String localChannelName) throws SecurityException {
this(new Configuration(host, tcpPort, udpPort, localChannelName));
}
@ -96,7 +94,7 @@ class Client<C extends Connection> extends EndPointClient<C> implements Connecti
*/
@SuppressWarnings("AutoBoxing")
public
Client(final Configuration config) throws InitializationException, SecurityException, IOException {
Client(final Configuration config) throws SecurityException {
super(config);
String threadName = Client.class.getSimpleName();
@ -144,7 +142,7 @@ class Client<C extends Connection> extends EndPointClient<C> implements Connecti
localBootstrap.group(localBoss)
.channel(LocalChannel.class)
.remoteAddress(new LocalAddress(config.localChannelName))
.handler(new RegistrationLocalHandlerClient<C>(threadName, registrationWrapper));
.handler(new RegistrationLocalHandlerClient(threadName, registrationWrapper));
manageForShutdown(localBoss);
}
@ -181,9 +179,9 @@ class Client<C extends Connection> extends EndPointClient<C> implements Connecti
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(WRITE_BUFF_LOW, WRITE_BUFF_HIGH))
.remoteAddress(config.host, config.tcpPort)
.handler(new RegistrationRemoteHandlerClientTCP<C>(threadName,
registrationWrapper,
serializationManager));
.handler(new RegistrationRemoteHandlerClientTCP(threadName,
registrationWrapper,
serializationManager));
// android screws up on this!!
tcpBootstrap.option(ChannelOption.TCP_NODELAY, !isAndroid)
@ -217,9 +215,9 @@ class Client<C extends Connection> extends EndPointClient<C> implements Connecti
.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(WRITE_BUFF_LOW, WRITE_BUFF_HIGH))
.localAddress(new InetSocketAddress(0)) // bind to wildcard
.remoteAddress(new InetSocketAddress(config.host, config.udpPort))
.handler(new RegistrationRemoteHandlerClientUDP<C>(threadName,
registrationWrapper,
serializationManager));
.handler(new RegistrationRemoteHandlerClientUDP(threadName,
registrationWrapper,
serializationManager));
// Enable to READ and WRITE MULTICAST data (ie, 192.168.1.0)
// in order to WRITE: write as normal, just make sure it ends in .255
@ -419,9 +417,8 @@ class Client<C extends Connection> extends EndPointClient<C> implements Connecti
public
<Iface> void createRemoteObject(final Class<Iface> interfaceClass, final RemoteObjectCallback<Iface> callback) {
try {
C connection0 = connectionManager.getConnection0();
connection0.createRemoteObject(interfaceClass, callback);
} catch (IOException e) {
connection.createRemoteObject(interfaceClass, callback);
} catch (NullPointerException e) {
logger.error("Error creating remote object!", e);
}
}
@ -452,9 +449,8 @@ class Client<C extends Connection> extends EndPointClient<C> implements Connecti
public
<Iface> void getRemoteObject(final int objectId, final RemoteObjectCallback<Iface> callback) {
try {
C connection0 = connectionManager.getConnection0();
connection0.getRemoteObject(objectId, callback);
} catch (IOException e) {
connection.getRemoteObject(objectId, callback);
} catch (NullPointerException e) {
logger.error("Error getting remote object!", e);
}
}
@ -464,11 +460,12 @@ class Client<C extends Connection> extends EndPointClient<C> implements Connecti
* <p/>
* Make <b>sure</b> that you only call this <b>after</b> the client connects!
* <p/>
* This is preferred to {@link EndPointBase#getConnections()} getConnections()}, as it properly does some error checking
* This is preferred to {@link EndPointBase#getConnections()}, as it properly does some error checking
*/
@SuppressWarnings("unchecked")
public
C getConnection() {
return connection;
return (C) connection;
}
/**

View File

@ -26,7 +26,6 @@ import dorkbox.network.connection.registration.remote.RegistrationRemoteHandlerS
import dorkbox.util.NamedThreadFactory;
import dorkbox.util.OS;
import dorkbox.util.Property;
import dorkbox.util.exceptions.InitializationException;
import dorkbox.util.exceptions.SecurityException;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
@ -58,7 +57,7 @@ import io.netty.channel.unix.UnixChannelOption;
* To put it bluntly, ONLY have the server do work inside of a listener!
*/
public
class Server<C extends Connection> extends EndPointServer<C> {
class Server<C extends Connection> extends EndPointServer {
/**
@ -93,7 +92,7 @@ class Server<C extends Connection> extends EndPointServer<C> {
* Starts a LOCAL <b>only</b> server, with the default serialization scheme.
*/
public
Server() throws InitializationException, SecurityException, IOException {
Server() throws SecurityException {
this(Configuration.localOnly());
}
@ -102,7 +101,7 @@ class Server<C extends Connection> extends EndPointServer<C> {
*/
@SuppressWarnings("AutoBoxing")
public
Server(Configuration config) throws InitializationException, SecurityException, IOException {
Server(Configuration config) throws SecurityException {
// watch-out for serialization... it can be NULL incoming. The EndPoint (superclass) sets it, if null, so
// you have to make sure to use this.serialization
super(config);
@ -186,7 +185,7 @@ class Server<C extends Connection> extends EndPointServer<C> {
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(WRITE_BUFF_LOW, WRITE_BUFF_HIGH))
.localAddress(new LocalAddress(localChannelName))
.childHandler(new RegistrationLocalHandlerServer<C>(threadName, registrationWrapper));
.childHandler(new RegistrationLocalHandlerServer(threadName, registrationWrapper));
manageForShutdown(localBoss);
manageForShutdown(localWorker);
@ -219,9 +218,9 @@ class Server<C extends Connection> extends EndPointServer<C> {
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(WRITE_BUFF_LOW, WRITE_BUFF_HIGH))
.childHandler(new RegistrationRemoteHandlerServerTCP<C>(threadName,
registrationWrapper,
serializationManager));
.childHandler(new RegistrationRemoteHandlerServerTCP(threadName,
registrationWrapper,
serializationManager));
// have to check options.host for null. we don't bind to 0.0.0.0, we bind to "null" to get the "any" address!
if (hostName.equals("0.0.0.0")) {
@ -265,9 +264,9 @@ class Server<C extends Connection> extends EndPointServer<C> {
// not binding to specific address, since it's driven by TCP, and that can be bound to a specific address
.localAddress(udpPort) // if you bind to a specific interface, Linux will be unable to receive broadcast packets!
.handler(new RegistrationRemoteHandlerServerUDP<C>(threadName,
registrationWrapper,
serializationManager));
.handler(new RegistrationRemoteHandlerServerUDP(threadName,
registrationWrapper,
serializationManager));
// Enable to READ from MULTICAST data (ie, 192.168.1.0)

View File

@ -47,7 +47,6 @@ interface Connection {
@SuppressWarnings("rawtypes")
EndPointBase getEndPoint();
/**
* @return the connection (TCP or LOCAL) id of this connection.
*/

View File

@ -17,11 +17,12 @@ package dorkbox.network.connection;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Proxy;
import java.util.AbstractMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@ -38,13 +39,20 @@ import dorkbox.network.connection.ping.PingTuple;
import dorkbox.network.connection.wrapper.ChannelNetworkWrapper;
import dorkbox.network.connection.wrapper.ChannelNull;
import dorkbox.network.connection.wrapper.ChannelWrapper;
import dorkbox.network.rmi.InvokeMethod;
import dorkbox.network.rmi.InvokeMethodResult;
import dorkbox.network.rmi.RemoteObject;
import dorkbox.network.rmi.RemoteObjectCallback;
import dorkbox.network.rmi.Rmi;
import dorkbox.network.rmi.RmiBridge;
import dorkbox.network.rmi.RmiMessage;
import dorkbox.network.rmi.RmiObjectHandler;
import dorkbox.network.rmi.RmiProxyHandler;
import dorkbox.network.rmi.RmiRegistration;
import dorkbox.network.rmi.TimeoutException;
import dorkbox.network.serialization.CryptoSerializationManager;
import dorkbox.util.collections.IntMap;
import dorkbox.util.collections.LockFreeHashMap;
import dorkbox.util.generics.ClassHelper;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler.Sharable;
@ -68,6 +76,24 @@ import io.netty.util.concurrent.Promise;
@Sharable
public
class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConnection, Connection, Listeners, ConnectionBridge {
// default false
public static boolean ENABLE_PROXY_OBJECTS = true;
public static
boolean isTcp(Class<? extends Channel> channelClass) {
return channelClass == NioSocketChannel.class || channelClass == EpollSocketChannel.class;
}
public static
boolean isUdp(Class<? extends Channel> channelClass) {
return channelClass == NioDatagramChannel.class || channelClass == EpollDatagramChannel.class;
}
public static
boolean isLocal(Class<? extends Channel> channelClass) {
return channelClass == LocalChannel.class;
}
private final org.slf4j.Logger logger;
@ -82,14 +108,14 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn
private final Object messageInProgressLock = new Object();
private final AtomicBoolean messageInProgress = new AtomicBoolean(false);
private ISessionManager<Connection> sessionManager;
private ChannelWrapper<Connection> channelWrapper;
private ISessionManager sessionManager;
private ChannelWrapper channelWrapper;
private boolean isLoopback;
private volatile PingFuture pingFuture = null;
// used to store connection local listeners (instead of global listeners). Only possible on the server.
private volatile ConnectionManager<Connection> localListenerManager;
private volatile ConnectionManager localListenerManager;
// while on the CLIENT, if the SERVER's ecc key has changed, the client will abort and show an error.
private boolean remoteKeyChanged;
@ -108,21 +134,32 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn
//
// RMI fields
//
protected CountDownLatch rmi;
private final RmiBridge rmiBridge;
private final Map<Integer, RemoteObject> proxyIdCache = new WeakHashMap<Integer, RemoteObject>(8);
private final IntMap<RemoteObjectCallback> rmiRegistrationCallbacks = new IntMap<RemoteObjectCallback>();
private final Map<Integer, RemoteObject> proxyIdCache;
private final List<Listener.OnMessageReceived<Connection, InvokeMethodResult>> proxyListeners;
private final IntMap<RemoteObjectCallback> rmiRegistrationCallbacks;
private int rmiCallbackId = 0; // protected by synchronized (rmiRegistrationCallbacks)
/**
* All of the parameters can be null, when metaChannel wants to get the base class type
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public
ConnectionImpl(final Logger logger, final EndPointBase endPoint, final RmiBridge rmiBridge) {
this.logger = logger;
this.endPoint = endPoint;
this.rmiBridge = rmiBridge;
if (endPoint != null && endPoint.globalRmiBridge != null) {
// rmi is enabled.
proxyIdCache = new LockFreeHashMap<Integer, RemoteObject>();
proxyListeners = new CopyOnWriteArrayList<Listener.OnMessageReceived<Connection, InvokeMethodResult>>();
rmiRegistrationCallbacks = new IntMap<RemoteObjectCallback>();
} else {
proxyIdCache = null;
proxyListeners = null;
rmiRegistrationCallbacks = null;
}
}
/**
@ -130,8 +167,8 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn
* <p/>
* This happens BEFORE prep.
*/
@SuppressWarnings("unchecked")
void init(final ChannelWrapper channelWrapper, final ConnectionManager<Connection> sessionManager) {
final
void init(final ChannelWrapper channelWrapper, final ISessionManager sessionManager) {
this.sessionManager = sessionManager;
this.channelWrapper = channelWrapper;
@ -151,6 +188,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn
* <p/>
* This happens AFTER init.
*/
final
void prep() {
if (this.channelWrapper != null) {
this.channelWrapper.init();
@ -201,7 +239,6 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn
return this.channelWrapper.getRemoteHost();
}
/**
* @return true if this connection is established on the loopback interface
*/
@ -216,7 +253,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn
*/
@Override
public
EndPointBase<Connection> getEndPoint() {
EndPointBase getEndPoint() {
return this.endPoint;
}
@ -529,7 +566,6 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn
public
void channelRead(Object object) throws Exception {
// prevent close from occurring SMACK in the middle of a message in progress.
// delay close until it's finished.
this.messageInProgress.set(true);
@ -568,7 +604,8 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn
Channel channel = context.channel();
Class<? extends Channel> channelClass = channel.getClass();
boolean isTCP = channelClass == NioSocketChannel.class || channelClass == EpollSocketChannel.class;
boolean isTCP = isTcp(channelClass);
boolean isLocal = isLocal(channelClass);
if (this.logger.isInfoEnabled()) {
String type;
@ -576,10 +613,10 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn
if (isTCP) {
type = "TCP";
}
else if (channelClass == NioDatagramChannel.class || channelClass == EpollDatagramChannel.class) {
else if (isUdp(channelClass)) {
type = "UDP";
}
else if (channelClass == LocalChannel.class) {
else if (isLocal) {
type = "LOCAL";
}
else {
@ -597,7 +634,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn
}
// our master channels are TCP/LOCAL (which are mutually exclusive). Only key disconnect events based on the status of them.
if (isTCP || channelClass == LocalChannel.class) {
if (isTCP || isLocal) {
// this is because channelInactive can ONLY happen when netty shuts down the channel.
// and connection.close() can be called by the user.
this.sessionManager.onDisconnected(this);
@ -618,6 +655,12 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn
@Override
public final
void close() {
close(false);
}
final
void close(final boolean keepListeners) {
// only close if we aren't already in the middle of closing.
if (this.closeInProgress.compareAndSet(false, true)) {
int idleTimeoutMs = this.endPoint.getIdleTimeout();
@ -657,9 +700,29 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn
}
}
}
// remove all listeners, but ONLY if we are the server. If we remove all listeners and we are the client, then we remove
// ALL logic from the client! The server is OK because the server listeners per connection are dynamically added
if (!keepListeners) {
removeAll();
}
// proxy listeners are cleared in the removeAll() call
if (proxyIdCache != null) {
synchronized (proxyIdCache) {
proxyIdCache.clear();
}
}
if (rmiRegistrationCallbacks != null) {
synchronized (rmiRegistrationCallbacks) {
rmiRegistrationCallbacks.clear();
}
}
}
}
/**
* Marks the connection to be closed as soon as possible. This is evaluated when the current
* thread execution returns to the network stack.
@ -716,7 +779,6 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn
* (via connection.addListener), meaning that ONLY that listener attached to
* the connection is notified on that event (ie, admin type listeners)
*/
@SuppressWarnings("rawtypes")
@Override
public final
Listeners add(Listener listener) {
@ -758,7 +820,6 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn
* connection.removeListener), meaning that ONLY that listener attached to
* the connection is removed
*/
@SuppressWarnings("rawtypes")
@Override
public final
Listeners remove(Listener listener) {
@ -777,7 +838,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn
this.localListenerManager.remove(listener);
if (!this.localListenerManager.hasListeners()) {
((EndPointServer<Connection>) this.endPoint).removeListenerManager(this);
((EndPointServer) this.endPoint).removeListenerManager(this);
}
}
}
@ -793,10 +854,16 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn
/**
* Removes all registered listeners from this connection/endpoint to NO
* LONGER be notified of connect/disconnect/idle/receive(object) events.
*
* This includes all proxy listeners
*/
@Override
public final
Listeners removeAll() {
if (proxyListeners != null) {
proxyListeners.clear();
}
if (this.endPoint instanceof EndPointServer) {
// when we are a server, NORMALLY listeners are added at the GLOBAL level
// meaning --
@ -812,7 +879,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn
this.localListenerManager.removeAll();
this.localListenerManager = null;
((EndPointServer<Connection>) this.endPoint).removeListenerManager(this);
((EndPointServer) this.endPoint).removeListenerManager(this);
}
}
}
@ -825,8 +892,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn
}
/**
* Removes all registered listeners (of the object type) from this
* connection/endpoint to NO LONGER be notified of
* Removes all registered listeners (of the object type) from this connection/endpoint to NO LONGER be notified of
* connect/disconnect/idle/receive(object) events.
*/
@Override
@ -848,7 +914,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn
if (!this.localListenerManager.hasListeners()) {
this.localListenerManager = null;
((EndPointServer<Connection>) this.endPoint).removeListenerManager(this);
((EndPointServer) this.endPoint).removeListenerManager(this);
}
}
}
@ -908,7 +974,6 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn
@SuppressWarnings({"UnnecessaryLocalVariable", "unchecked", "Duplicates"})
@Override
public final
<Iface> void createRemoteObject(final Class<Iface> interfaceClass, final RemoteObjectCallback<Iface> callback) {
@ -931,7 +996,6 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn
TCP(message).flush();
}
@SuppressWarnings({"UnnecessaryLocalVariable", "unchecked", "Duplicates"})
@Override
public final
<Iface> void getRemoteObject(final int objectId, final RemoteObjectCallback<Iface> callback) {
@ -952,21 +1016,59 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn
TCP(message).flush();
}
// default false
public static boolean ENABLE_PROXY_OBJECTS = true;
/**
* Manages the RMI stuff for a connection.
*
* @return true if there was RMI stuff done, false if the message was "normal" and nothing was done
*/
boolean manageRmi(final Object message) {
if (message instanceof RmiMessage) {
RmiObjectHandler rmiObjectHandler = channelWrapper.manageRmi();
if (message instanceof InvokeMethod) {
rmiObjectHandler.invoke(this, (InvokeMethod) message, rmiBridge.getListener());
}
else if (message instanceof InvokeMethodResult) {
for (Listener.OnMessageReceived<Connection, InvokeMethodResult> proxyListener : proxyListeners) {
proxyListener.received(this, (InvokeMethodResult) message);
}
}
else if (message instanceof RmiRegistration) {
rmiObjectHandler.registration(this, (RmiRegistration) message);
}
return true;
}
return false;
}
/**
* Objects that are on the "local" in-jvm connection have fixup their objects. For "network" connections, this is automatically done.
*/
Object fixupRmi(final Object message) {
// "local RMI" objects have to be modified, this part does that
RmiObjectHandler rmiObjectHandler = channelWrapper.manageRmi();
return rmiObjectHandler.normalMessages(this, message);
}
/**
* This will remove the invoke and invoke response listeners for this the remote object
*/
public
void removeRmiListeners(final int objectID, final Listener listener) {
}
/**
* For network connections, the interface class kryo ID == implementation class kryo ID, so they switch automatically.
* For local connections, we have to switch it appropriately in the LocalRmiProxy
*
* @param implementationClass
* @param callbackId
* @return
*/
public final
RmiRegistration createNewRmiObject(final Class<?> interfaceClass, final Class<?> implementationClass, final int callbackId) {
CryptoSerializationManager manager = getEndPoint().serializationManager;
KryoExtra kryo = null;
@ -1090,14 +1192,37 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn
}
/**
* Used by RMI for the LOCAL side, to get the proxy object as an interface
* Warning. This is an advanced method. You should probably be using {@link Connection#createRemoteObject(Class, RemoteObjectCallback)}
* <p>
* <p>
* Returns a proxy object that implements the specified interface, and the methods invoked on the proxy object will be invoked
* remotely.
* <p>
* Methods that return a value will throw {@link TimeoutException} if the response is not received with the {@link
* RemoteObject#setResponseTimeout(int) response timeout}.
* <p/>
* If {@link RemoteObject#setAsync(boolean) non-blocking} is false (the default), then methods that return a value must not be
* called from the update thread for the connection. An exception will be thrown if this occurs. Methods with a void return value can be
* called on the update thread.
* <p/>
* If a proxy returned from this method is part of an object graph sent over the network, the object graph on the receiving side will
* have the proxy object replaced with the registered object.
*
* @param objectID is the RMI object ID
* @param iFace must be the interface the proxy will bind to
* @see RemoteObject
*
* @param objectID this is the remote object ID (assigned by RMI). This is NOT the kryo registration ID
* @param iFace this is the RMI interface
*/
@Override
public
RemoteObject getProxyObject(final int objectID, final Class<?> iFace) {
if (iFace == null) {
throw new IllegalArgumentException("iface cannot be null.");
}
if (!iFace.isInterface()) {
throw new IllegalArgumentException("iface must be an interface.");
}
synchronized (proxyIdCache) {
// we want to have a connection specific cache of IDs, using weak references.
// because this is PER CONNECTION, this is safe.
@ -1105,7 +1230,18 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn
if (remoteObject == null) {
// duplicates are fine, as they represent the same object (as specified by the ID) on the remote side.
remoteObject = rmiBridge.createProxyObject(this, objectID, iFace);
// remoteObject = rmiBridge.createProxyObject(this, objectID, iFace);
// the ACTUAL proxy is created in the connection impl.
RmiProxyHandler proxyObject = new RmiProxyHandler(this, objectID, iFace);
proxyListeners.add(proxyObject.getListener());
Class<?>[] temp = new Class<?>[2];
temp[0] = RemoteObject.class;
temp[1] = iFace;
remoteObject = (RemoteObject) Proxy.newProxyInstance(RmiBridge.class.getClassLoader(), temp, proxyObject);
proxyIdCache.put(objectID, remoteObject);
}

View File

@ -15,7 +15,6 @@
*/
package dorkbox.network.connection;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@ -38,54 +37,52 @@ import dorkbox.util.generics.TypeResolver;
// .equals() compares the identity on purpose,this because we cannot create two separate objects that are somehow equal to each other.
@SuppressWarnings("unchecked")
public
class ConnectionManager<C extends Connection> implements Listeners, ISessionManager<C>, ConnectionPoint, ConnectionBridgeServer<C>,
ConnectionExceptSpecifiedBridgeServer<C> {
class ConnectionManager<C extends Connection> implements Listeners, ISessionManager, ConnectionPoint, ConnectionBridgeServer,
ConnectionExceptSpecifiedBridgeServer {
/**
* Specifies the load-factor for the IdentityMap used to manage keeping track of the number of connections + listeners
*/
@Property
public static final float LOAD_FACTOR = 0.8F;
// Recommended for best performance while adhering to the "single writer principle". Must be static-final
private static final AtomicReferenceFieldUpdater<ConnectionManager, IdentityMap> localManagersREF = AtomicReferenceFieldUpdater.newUpdater(
ConnectionManager.class,
IdentityMap.class,
"localManagers");
// Recommended for best performance while adhering to the "single writer principle". Must be static-final
private static final AtomicReferenceFieldUpdater<ConnectionManager, ConcurrentEntry> connectionsREF = AtomicReferenceFieldUpdater.newUpdater(
ConnectionManager.class,
ConcurrentEntry.class,
"connectionsHead");
private final String loggerName;
private final OnConnectedManager<C> onConnectedManager;
private final OnDisconnectedManager<C> onDisconnectedManager;
private final OnIdleManager<C> onIdleManager;
private final OnMessageReceivedManager<C> onMessageReceivedManager;
private final OnConnectedManager onConnectedManager;
private final OnDisconnectedManager onDisconnectedManager;
private final OnIdleManager onIdleManager;
private final OnMessageReceivedManager onMessageReceivedManager;
@SuppressWarnings({"FieldCanBeLocal", "unused"})
private volatile ConcurrentEntry<C> connectionsHead = null; // reference to the first element
private volatile ConcurrentEntry<Connection> connectionsHead = null; // reference to the first element
// This is ONLY touched by a single thread, maintains a map of entries for FAST lookup during connection remove.
private final IdentityMap<C, ConcurrentEntry> connectionEntries = new IdentityMap<C, ConcurrentEntry>(32, ConnectionManager.LOAD_FACTOR);
private final IdentityMap<Connection, ConcurrentEntry> connectionEntries = new IdentityMap<Connection, ConcurrentEntry>(32, ConnectionManager.LOAD_FACTOR);
@SuppressWarnings("unused")
private volatile IdentityMap<Connection, ConnectionManager<C>> localManagers = new IdentityMap<Connection, ConnectionManager<C>>(8, ConnectionManager.LOAD_FACTOR);
private volatile IdentityMap<Connection, ConnectionManager> localManagers = new IdentityMap<Connection, ConnectionManager>(8, ConnectionManager.LOAD_FACTOR);
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
private final Object singleWriterLock2 = new Object();
private final Object singleWriterLock3 = new Object();
// Recommended for best performance while adhering to the "single writer principle". Must be static-final
private static final AtomicReferenceFieldUpdater<ConnectionManager, IdentityMap> localManagersREF =
AtomicReferenceFieldUpdater.newUpdater(ConnectionManager.class,
IdentityMap.class,
"localManagers");
// Recommended for best performance while adhering to the "single writer principle". Must be static-final
private static final AtomicReferenceFieldUpdater<ConnectionManager, ConcurrentEntry> connectionsREF =
AtomicReferenceFieldUpdater.newUpdater(ConnectionManager.class,
ConcurrentEntry.class,
"connectionsHead");
private final Object singleWriterConnectionsLock = new Object();
private final Object singleWriterLocalManagerLock = new Object();
/**
@ -102,10 +99,10 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
this.logger = org.slf4j.LoggerFactory.getLogger(loggerName);
this.baseClass = baseClass;
onConnectedManager = new OnConnectedManager<C>(logger);
onDisconnectedManager = new OnDisconnectedManager<C>(logger);
onIdleManager = new OnIdleManager<C>(logger);
onMessageReceivedManager = new OnMessageReceivedManager<C>(logger);
onConnectedManager = new OnConnectedManager(logger);
onDisconnectedManager = new OnDisconnectedManager(logger);
onIdleManager = new OnIdleManager(logger);
onMessageReceivedManager = new OnMessageReceivedManager(logger);
}
/**
@ -117,7 +114,6 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
* It is POSSIBLE to add a server connection ONLY (ie, not global) listener (via connection.addListener), meaning that ONLY that
* listener attached to the connection is notified on that event (ie, admin type listeners)
*/
@SuppressWarnings("rawtypes")
@Override
public final
Listeners add(final Listener listener) {
@ -149,25 +145,24 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
/**
* INTERNAL USE ONLY
*/
@SuppressWarnings({"unchecked", "rawtypes"})
private
void addListener0(final Listener listener) {
boolean found = false;
if (listener instanceof Listener.OnConnected) {
onConnectedManager.add((Listener.OnConnected<C>) listener);
onConnectedManager.add((Listener.OnConnected) listener);
found = true;
}
if (listener instanceof Listener.OnDisconnected) {
onDisconnectedManager.add((Listener.OnDisconnected<C>) listener);
onDisconnectedManager.add((Listener.OnDisconnected) listener);
found = true;
}
if (listener instanceof Listener.OnIdle) {
onIdleManager.add((Listener.OnIdle<C>) listener);
onIdleManager.add((Listener.OnIdle) listener);
found = true;
}
if (listener instanceof Listener.OnMessageReceived) {
onMessageReceivedManager.add((Listener.OnMessageReceived<C, Object>) listener);
onMessageReceivedManager.add((Listener.OnMessageReceived) listener);
found = true;
}
@ -196,7 +191,6 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
* It is POSSIBLE to remove a server-connection 'non-global' listener (via connection.removeListener), meaning that ONLY that listener
* attached to the connection is removed
*/
@SuppressWarnings("rawtypes")
@Override
public final
Listeners remove(final Listener listener) {
@ -206,16 +200,16 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
boolean found = false;
if (listener instanceof Listener.OnConnected) {
found = onConnectedManager.remove((Listener.OnConnected<C>) listener);
found = onConnectedManager.remove((Listener.OnConnected) listener);
}
if (listener instanceof Listener.OnDisconnected) {
found |= onDisconnectedManager.remove((Listener.OnDisconnected<C>) listener);
found |= onDisconnectedManager.remove((Listener.OnDisconnected) listener);
}
if (listener instanceof Listener.OnIdle) {
found |= onIdleManager.remove((Listener.OnIdle<C>) listener);
found |= onIdleManager.remove((Listener.OnIdle) listener);
}
if (listener instanceof Listener.OnMessageReceived) {
found |= onMessageReceivedManager.remove((Listener.OnMessageReceived<C, Object>) listener);
found |= onMessageReceivedManager.remove((Listener.OnMessageReceived) listener);
}
final Logger logger2 = this.logger;
@ -290,13 +284,22 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
*/
@Override
public final
void onMessage(final C connection, final Object message) {
void onMessage(final ConnectionImpl connection, final Object message) {
notifyOnMessage0(connection, message, false);
}
@SuppressWarnings("Duplicates")
private
boolean notifyOnMessage0(final C connection, final Object message, boolean foundListener) {
boolean notifyOnMessage0(final ConnectionImpl connection, Object message, boolean foundListener) {
if (connection.manageRmi(message)) {
// if we are an RMI message/registration, we have very specific, defined behavior. We do not use the "normal" listener callback pattern
// because these methods are rare, and require special functionality
return true;
}
message = connection.fixupRmi(message);
foundListener |= onMessageReceivedManager.notifyReceived(connection, message, shutdown);
// now have to account for additional connection listener managers (non-global).
@ -316,23 +319,19 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
.flush();
}
else {
if (this.logger.isErrorEnabled()) {
this.logger.warn("----------- LISTENER NOT REGISTERED FOR TYPE: {}",
message.getClass()
.getSimpleName());
}
this.logger.warn("----------- LISTENER NOT REGISTERED FOR TYPE: {}",
message.getClass()
.getSimpleName());
}
return foundListener;
}
/**
* Invoked when a Connection has been idle for a while.
* <p/>
* {@link ISessionManager}
*/
@Override
public final
void onIdle(final C connection) {
void onIdle(final Connection connection) {
boolean foundListener = onIdleManager.notifyIdle(connection, shutdown);
if (foundListener) {
@ -342,8 +341,8 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
// now have to account for additional (local) listener managers.
// access a snapshot of the managers (single-writer-principle)
final IdentityMap<Connection, ConnectionManager<C>> localManagers = localManagersREF.get(this);
ConnectionManager<C> localManager = localManagers.get(connection);
final IdentityMap<Connection, ConnectionManager> localManagers = localManagersREF.get(this);
ConnectionManager localManager = localManagers.get(connection);
if (localManager != null) {
localManager.onIdle(connection);
}
@ -351,13 +350,10 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
/**
* Invoked when a Channel is open, bound to a local address, and connected to a remote address.
* <p/>
* {@link ISessionManager}
*/
@SuppressWarnings("Duplicates")
@Override
public
void onConnected(final C connection) {
void onConnected(final Connection connection) {
addConnection(connection);
boolean foundListener = onConnectedManager.notifyConnected(connection, shutdown);
@ -369,8 +365,8 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
// now have to account for additional (local) listener managers.
// access a snapshot of the managers (single-writer-principle)
final IdentityMap<Connection, ConnectionManager<C>> localManagers = localManagersREF.get(this);
ConnectionManager<C> localManager = localManagers.get(connection);
final IdentityMap<Connection, ConnectionManager> localManagers = localManagersREF.get(this);
ConnectionManager localManager = localManagers.get(connection);
if (localManager != null) {
localManager.onConnected(connection);
}
@ -378,13 +374,10 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
/**
* Invoked when a Channel was disconnected from its remote peer.
* <p/>
* {@link ISessionManager}
*/
@SuppressWarnings("Duplicates")
@Override
public
void onDisconnected(final C connection) {
void onDisconnected(final Connection connection) {
boolean foundListener = onDisconnectedManager.notifyDisconnected(connection, shutdown);
if (foundListener) {
@ -395,8 +388,8 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
// now have to account for additional (local) listener managers.
// access a snapshot of the managers (single-writer-principle)
final IdentityMap<Connection, ConnectionManager<C>> localManagers = localManagersREF.get(this);
ConnectionManager<C> localManager = localManagers.get(connection);
final IdentityMap<Connection, ConnectionManager> localManagers = localManagersREF.get(this);
ConnectionManager localManager = localManagers.get(connection);
if (localManager != null) {
localManager.onDisconnected(connection);
@ -415,11 +408,11 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
*
* @param connection the connection to add
*/
void addConnection(final C connection) {
void addConnection(final Connection connection) {
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
synchronized (singleWriterLock2) {
synchronized (singleWriterConnectionsLock) {
// access a snapshot of the connections (single-writer-principle)
ConcurrentEntry head = connectionsREF.get(this);
@ -442,12 +435,12 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
*
* @param connection the connection to remove
*/
private
void removeConnection(C connection) {
public
void removeConnection(Connection connection) {
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
synchronized (singleWriterLock2) {
synchronized (singleWriterConnectionsLock) {
// access a snapshot of the connections (single-writer-principle)
ConcurrentEntry concurrentEntry = connectionEntries.get(connection);
@ -477,36 +470,34 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
@Override
public
List<C> getConnections() {
synchronized (singleWriterLock2) {
final IdentityMap.Keys<C> keys = this.connectionEntries.keys();
return keys.toArray();
synchronized (singleWriterConnectionsLock) {
final IdentityMap.Keys<Connection> keys = this.connectionEntries.keys();
return (List<C>) keys.toArray();
}
}
final
ConnectionManager<C> addListenerManager(final Connection connection) {
ConnectionManager addListenerManager(final Connection connection) {
// when we are a server, NORMALLY listeners are added at the GLOBAL level (meaning, I add one listener, and ALL connections
// are notified of that listener.
// it is POSSIBLE to add a connection-specific listener (via connection.addListener), meaning that ONLY
// that listener is notified on that event (ie, admin type listeners)
ConnectionManager<C> manager;
ConnectionManager manager;
boolean created = false;
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
synchronized (singleWriterLock3) {
synchronized (singleWriterLocalManagerLock) {
// access a snapshot of the managers (single-writer-principle)
final IdentityMap<Connection, ConnectionManager<C>> localManagers = localManagersREF.get(this);
final IdentityMap<Connection, ConnectionManager> localManagers = localManagersREF.get(this);
manager = localManagers.get(connection);
if (manager == null) {
created = true;
manager = new ConnectionManager<C>(loggerName + "-" + connection.toString() + " Specific",
ConnectionManager.this.baseClass);
manager = new ConnectionManager(loggerName + "-" + connection.toString() + " Specific", ConnectionManager.this.baseClass);
localManagers.put(connection, manager);
// save this snapshot back to the original (single writer principle)
@ -531,11 +522,11 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
synchronized (singleWriterLock3) {
synchronized (singleWriterLocalManagerLock) {
// access a snapshot of the managers (single-writer-principle)
final IdentityMap<Connection, ConnectionManager<C>> localManagers = localManagersREF.get(this);
final IdentityMap<Connection, ConnectionManager> localManagers = localManagersREF.get(this);
final ConnectionManager<C> removed = localManagers.remove(connection);
final ConnectionManager removed = localManagers.remove(connection);
if (removed != null) {
wasRemoved = true;
@ -552,23 +543,6 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
}
}
/**
* BE CAREFUL! Only for internal use!
*
* @return Returns a FAST first connection (for client!).
*/
public final
C getConnection0() throws IOException {
ConcurrentEntry<C> head1 = connectionsREF.get(this);
if (head1 != null) {
return head1.getValue();
}
else {
throw new IOException("Not connected to a remote computer. Unable to continue!");
}
}
/**
* BE CAREFUL! Only for internal use!
*
@ -587,7 +561,7 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
this.shutdown.set(true);
// disconnect the sessions
closeConnections();
closeConnections(false);
onConnectedManager.clear();
onDisconnectedManager.clear();
@ -596,22 +570,31 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
}
/**
* Close all connections ONLY
* Close all connections ONLY.
*
* Only keep the listeners for connections IF we are the client. If we remove listeners as a client, ALL of the client logic will
* be lost. The server is reactive, so listeners are added to connections as needed (instead of before startup, which is what the client does).
*/
final
void closeConnections() {
void closeConnections(boolean keepListeners) {
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
synchronized (singleWriterLock2) {
synchronized (singleWriterConnectionsLock) {
// don't need anything fast or fancy here, because this method will only be called once
final IdentityMap.Keys<C> keys = connectionEntries.keys();
for (C connection : keys) {
final IdentityMap.Keys<Connection> keys = connectionEntries.keys();
for (Connection connection : keys) {
// Close the connection. Make sure the close operation ends because
// all I/O operations are asynchronous in Netty.
// Also necessary otherwise workers won't close.
connection.close();
if (keepListeners && connection instanceof ConnectionImpl) {
((ConnectionImpl) connection).close(true);
}
else {
connection.close();
}
}
this.connectionEntries.clear();
@ -636,7 +619,7 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
*/
@Override
public
ConnectionExceptSpecifiedBridgeServer<C> except() {
ConnectionExceptSpecifiedBridgeServer except() {
return this;
}
@ -650,8 +633,8 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
@Override
public
void flush() {
ConcurrentEntry<C> current = connectionsREF.get(this);
C c;
ConcurrentEntry<Connection> current = connectionsREF.get(this);
Connection c;
while (current != null) {
c = current.getValue();
current = current.next();
@ -667,9 +650,9 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
*/
@Override
public
ConnectionPoint TCP(final C connection, final Object message) {
ConcurrentEntry<C> current = connectionsREF.get(this);
C c;
ConnectionPoint TCP(final Connection connection, final Object message) {
ConcurrentEntry<Connection> current = connectionsREF.get(this);
Connection c;
while (current != null) {
c = current.getValue();
current = current.next();
@ -688,9 +671,9 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
*/
@Override
public
ConnectionPoint UDP(final C connection, final Object message) {
ConcurrentEntry<C> current = connectionsREF.get(this);
C c;
ConnectionPoint UDP(final Connection connection, final Object message) {
ConcurrentEntry<Connection> current = connectionsREF.get(this);
Connection c;
while (current != null) {
c = current.getValue();
current = current.next();
@ -709,8 +692,8 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
@Override
public
void self(final Object message) {
ConcurrentEntry<C> current = connectionsREF.get(this);
C c;
ConcurrentEntry<ConnectionImpl> current = connectionsREF.get(this);
ConnectionImpl c;
while (current != null) {
c = current.getValue();
current = current.next();
@ -725,8 +708,8 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
@Override
public
ConnectionPoint TCP(final Object message) {
ConcurrentEntry<C> current = connectionsREF.get(this);
C c;
ConcurrentEntry<Connection> current = connectionsREF.get(this);
Connection c;
while (current != null) {
c = current.getValue();
current = current.next();
@ -743,8 +726,8 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
@Override
public
ConnectionPoint UDP(final Object message) {
ConcurrentEntry<C> current = connectionsREF.get(this);
C c;
ConcurrentEntry<Connection> current = connectionsREF.get(this);
Connection c;
while (current != null) {
c = current.getValue();
current = current.next();

View File

@ -15,7 +15,6 @@
*/
package dorkbox.network.connection;
import java.io.IOException;
import java.security.SecureRandom;
import java.util.List;
import java.util.concurrent.Executor;
@ -36,13 +35,15 @@ import dorkbox.network.connection.wrapper.ChannelWrapper;
import dorkbox.network.pipeline.KryoEncoder;
import dorkbox.network.pipeline.KryoEncoderCrypto;
import dorkbox.network.rmi.RmiBridge;
import dorkbox.network.rmi.RmiObjectHandler;
import dorkbox.network.rmi.RmiObjectLocalHandler;
import dorkbox.network.rmi.RmiObjectNetworkHandler;
import dorkbox.network.serialization.Serialization;
import dorkbox.network.store.NullSettingsStore;
import dorkbox.network.store.SettingsStore;
import dorkbox.util.Property;
import dorkbox.util.crypto.CryptoECC;
import dorkbox.util.entropy.Entropy;
import dorkbox.util.exceptions.InitializationException;
import dorkbox.util.exceptions.SecurityException;
import io.netty.util.NetUtil;
@ -50,7 +51,7 @@ import io.netty.util.NetUtil;
* represents the base of a client/server end point
*/
public abstract
class EndPointBase<C extends Connection> extends EndPoint {
class EndPointBase extends EndPoint {
// If TCP and UDP both fill the pipe, THERE WILL BE FRAGMENTATION and dropped UDP packets!
// it results in severe UDP packet loss and contention.
//
@ -89,14 +90,19 @@ class EndPointBase<C extends Connection> extends EndPoint {
@Property
public static int udpMaxSize = 508;
protected final ConnectionManager<C> connectionManager;
protected final ConnectionManager connectionManager;
protected final dorkbox.network.serialization.CryptoSerializationManager serializationManager;
protected final RegistrationWrapper<C> registrationWrapper;
protected final RegistrationWrapper registrationWrapper;
final ECPrivateKeyParameters privateKey;
final ECPublicKeyParameters publicKey;
final SecureRandom secureRandom;
// we only want one instance of these created. These will be called appropriately
private final RmiObjectHandler rmiHandler;
private final RmiObjectLocalHandler localRmiHandler;
private final RmiObjectNetworkHandler networkRmiHandler;
final RmiBridge globalRmiBridge;
private final Executor rmiExecutor;
@ -117,12 +123,10 @@ class EndPointBase<C extends Connection> extends EndPoint {
* @param type this is either "Client" or "Server", depending on who is creating this endpoint.
* @param config these are the specific connection options
*
* @throws InitializationException
* @throws SecurityException
* @throws SecurityException if unable to initialize/generate ECC keys
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public
EndPointBase(Class<? extends EndPointBase> type, final Configuration config) throws InitializationException, SecurityException, IOException {
EndPointBase(Class<? extends EndPointBase> type, final Configuration config) throws SecurityException {
super(type);
// make sure that 'localhost' is ALWAYS our specific loopback IP address
@ -193,7 +197,7 @@ class EndPointBase<C extends Connection> extends EndPoint {
} catch (Exception e) {
String message = "Unable to initialize/generate ECC keys. FORCED SHUTDOWN.";
logger.error(message);
throw new InitializationException(message);
throw new SecurityException(message);
}
}
@ -209,17 +213,22 @@ class EndPointBase<C extends Connection> extends EndPoint {
secureRandom = new SecureRandom(propertyStore.getSalt());
// we don't care about un-instantiated/constructed members, since the class type is the only interest.
connectionManager = new ConnectionManager<C>(type.getSimpleName(), connection0(null).getClass());
//noinspection unchecked
connectionManager = new ConnectionManager(type.getSimpleName(), connection0(null).getClass());
// add the ping listener (internal use only!)
connectionManager.add(new PingSystemListener());
if (rmiEnabled) {
// these register the listener for registering a class implementation for RMI (internal use only)
connectionManager.add(new RegisterRmiNetworkHandler());
rmiHandler = null;
localRmiHandler = new RmiObjectLocalHandler();
networkRmiHandler = new RmiObjectNetworkHandler();
globalRmiBridge = new RmiBridge(logger, config.rmiExecutor, true);
}
else {
rmiHandler = new RmiObjectHandler();
localRmiHandler = null;
networkRmiHandler = null;
globalRmiBridge = null;
}
@ -318,7 +327,6 @@ class EndPointBase<C extends Connection> extends EndPoint {
*
* @param metaChannel can be NULL (when getting the baseClass)
*/
@SuppressWarnings("unchecked")
protected final
Connection connection0(MetaChannel metaChannel) {
ConnectionImpl connection;
@ -332,31 +340,35 @@ class EndPointBase<C extends Connection> extends EndPoint {
// These properties are ASSIGNED in the same thread that CREATED the object. Only the AES info needs to be
// volatile since it is the only thing that changes.
if (metaChannel != null) {
ChannelWrapper<C> wrapper;
ChannelWrapper wrapper;
connection = newConnection(logger, this, rmiBridge);
metaChannel.connection = connection;
if (metaChannel.localChannel != null) {
wrapper = new ChannelLocalWrapper(metaChannel);
}
else {
if (this instanceof EndPointServer) {
wrapper = new ChannelNetworkWrapper(metaChannel, registrationWrapper);
if (rmiEnabled) {
wrapper = new ChannelLocalWrapper(metaChannel, localRmiHandler);
}
else {
wrapper = new ChannelNetworkWrapper(metaChannel, null);
wrapper = new ChannelLocalWrapper(metaChannel, rmiHandler);
}
}
else {
RmiObjectHandler rmiObjectHandler = rmiHandler;
if (rmiEnabled) {
rmiObjectHandler = networkRmiHandler;
}
if (this instanceof EndPointServer) {
wrapper = new ChannelNetworkWrapper(metaChannel, registrationWrapper, rmiObjectHandler);
}
else {
wrapper = new ChannelNetworkWrapper(metaChannel, null, rmiObjectHandler);
}
}
// now initialize the connection channels with whatever extra info they might need.
connection.init(wrapper, (ConnectionManager<Connection>) connectionManager);
if (rmiBridge != null) {
// notify our remote object space that it is able to receive method calls.
connection.listeners()
.add(rmiBridge.getListener());
}
connection.init(wrapper, connectionManager);
}
else {
// getting the connection baseClass
@ -374,14 +386,13 @@ class EndPointBase<C extends Connection> extends EndPoint {
* <p/>
* Only the CLIENT injects in front of this)
*/
@SuppressWarnings("unchecked")
void connectionConnected0(ConnectionImpl connection) {
isConnected.set(true);
// prep the channel wrapper
connection.prep();
connectionManager.onConnected((C) connection);
connectionManager.onConnected(connection);
}
/**
@ -396,7 +407,7 @@ class EndPointBase<C extends Connection> extends EndPoint {
* Returns a non-modifiable list of active connections
*/
public
List<C> getConnections() {
<C extends Connection> List<C> getConnections() {
return connectionManager.getConnections();
}
@ -413,13 +424,13 @@ class EndPointBase<C extends Connection> extends EndPoint {
* <p/>
* The server should ALWAYS use STOP.
*/
public
void closeConnections() {
void closeConnections(boolean shouldKeepListeners) {
// give a chance to other threads.
Thread.yield();
// stop does the same as this + more
connectionManager.closeConnections();
// stop does the same as this + more. Only keep the listeners for connections IF we are the client. If we remove listeners as a client,
// ALL of the client logic will be lost. The server is reactive, so listeners are added to connections as needed (instead of before startup)
connectionManager.closeConnections(shouldKeepListeners);
// Sometimes there might be "lingering" connections (ie, halfway though registration) that need to be closed.
registrationWrapper.closeChannels(maxShutdownWaitTimeInMilliSeconds);
@ -442,7 +453,7 @@ class EndPointBase<C extends Connection> extends EndPoint {
@Override
protected
void shutdownChannelsPre() {
closeConnections();
closeConnections(false);
// this does a closeConnections + clear_listeners
connectionManager.stop();
@ -465,7 +476,6 @@ class EndPointBase<C extends Connection> extends EndPoint {
return result;
}
@SuppressWarnings("rawtypes")
@Override
public
boolean equals(Object obj) {

View File

@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit;
import dorkbox.network.Client;
import dorkbox.network.Configuration;
import dorkbox.network.connection.bridge.ConnectionBridge;
import dorkbox.util.exceptions.InitializationException;
import dorkbox.util.exceptions.SecurityException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
@ -34,9 +33,10 @@ import io.netty.channel.ChannelOption;
* This serves the purpose of making sure that specific methods are not available to the end user.
*/
public
class EndPointClient<C extends Connection> extends EndPointBase<C> {
class EndPointClient extends EndPointBase {
protected C connection;
// is valid when there is a connection to the server, otherwise it is null
protected volatile Connection connection;
private CountDownLatch registration;
@ -51,7 +51,7 @@ class EndPointClient<C extends Connection> extends EndPointBase<C> {
public
EndPointClient(Configuration config) throws InitializationException, SecurityException, IOException {
EndPointClient(Configuration config) throws SecurityException {
super(Client.class, config);
}
@ -211,7 +211,7 @@ class EndPointClient<C extends Connection> extends EndPointBase<C> {
};
//noinspection unchecked
this.connection = (C) connection;
this.connection = connection;
synchronized (bootstrapLock) {
// we're done with registration, so no need to keep this around
@ -251,16 +251,19 @@ class EndPointClient<C extends Connection> extends EndPointBase<C> {
* <p/>
* This is used, for example, when reconnecting to a server.
*/
@Override
public
void closeConnections() {
super.closeConnections();
// Only keep the listeners for connections IF we are the client. If we remove listeners as a client,
// ALL of the client logic will be lost. The server is reactive, so listeners are added to connections as needed (instead of before startup)
closeConnections(true);
// make sure we're not waiting on registration
registrationCompleted();
// for the CLIENT only, we clear these connections! (the server only clears them on shutdown)
shutdownChannels();
connection = null;
}
/**

View File

@ -15,22 +15,19 @@
*/
package dorkbox.network.connection;
import java.io.IOException;
import dorkbox.network.Configuration;
import dorkbox.network.Server;
import dorkbox.network.connection.bridge.ConnectionBridgeServer;
import dorkbox.util.exceptions.InitializationException;
import dorkbox.util.exceptions.SecurityException;
/**
* This serves the purpose of making sure that specific methods are not available to the end user.
*/
public
class EndPointServer<C extends Connection> extends EndPointBase<C> {
class EndPointServer extends EndPointBase {
public
EndPointServer(final Configuration config) throws InitializationException, SecurityException, IOException {
EndPointServer(final Configuration config) throws SecurityException {
super(Server.class, config);
}
@ -39,7 +36,7 @@ class EndPointServer<C extends Connection> extends EndPointBase<C> {
*/
@Override
public
ConnectionBridgeServer<C> send() {
ConnectionBridgeServer send() {
return this.connectionManager;
}
@ -53,7 +50,7 @@ class EndPointServer<C extends Connection> extends EndPointBase<C> {
* @return a newly created listener manager for the connection
*/
final
ConnectionManager<C> addListenerManager(final C connection) {
ConnectionManager addListenerManager(final Connection connection) {
return this.connectionManager.addListenerManager(connection);
}
@ -67,7 +64,7 @@ class EndPointServer<C extends Connection> extends EndPointBase<C> {
* This removes the listener manager for that specific connection
*/
final
void removeListenerManager(final C connection) {
void removeListenerManager(final Connection connection) {
this.connectionManager.removeListenerManager(connection);
}
@ -80,7 +77,7 @@ class EndPointServer<C extends Connection> extends EndPointBase<C> {
* @param connection the connection to add
*/
public
void add(C connection) {
void add(Connection connection) {
connectionManager.addConnection(connection);
}
@ -93,7 +90,7 @@ class EndPointServer<C extends Connection> extends EndPointBase<C> {
* @param connection the connection to remove
*/
public
void remove(C connection) {
connectionManager.addConnection(connection);
void remove(Connection connection) {
connectionManager.removeConnection(connection);
}
}

View File

@ -18,24 +18,29 @@ package dorkbox.network.connection;
import java.util.Collection;
public
interface ISessionManager<C extends Connection> {
interface ISessionManager {
/**
* Called when a message is received
*/
void onMessage(C connection, Object message);
void onMessage(ConnectionImpl connection, Object message);
/**
* Called when the connection has been idle (read & write) for 2 seconds
*/
void onIdle(C connection);
void onIdle(Connection connection);
/**
* Invoked when a Channel is open, bound to a local address, and connected to a remote address.
*/
void onConnected(Connection connection);
void onConnected(C connection);
void onDisconnected(C connection);
/**
* Invoked when a Channel was disconnected from its remote peer.
*/
void onDisconnected(Connection connection);
/**
* Returns a non-modifiable list of active connections. This is extremely slow, and not recommended!
*/
Collection<C> getConnections();
<C extends Connection> Collection<C> getConnections();
}

View File

@ -47,13 +47,13 @@ import io.netty.channel.ChannelPipeline;
* This is in the connection package, so it can access the endpoint methods that it needs to (without having to publicly expose them)
*/
public
class RegistrationWrapper<C extends Connection> implements UdpServer {
class RegistrationWrapper implements UdpServer {
private final org.slf4j.Logger logger;
private final KryoEncoder kryoEncoder;
private final KryoEncoderCrypto kryoEncoderCrypto;
private final EndPointBase<C> endPointBaseConnection;
private final EndPointBase endPointBaseConnection;
// keeps track of connections (TCP/UDP-client)
private final ReentrantLock channelMapLock = new ReentrantLock();
@ -77,7 +77,7 @@ class RegistrationWrapper<C extends Connection> implements UdpServer {
public
RegistrationWrapper(final EndPointBase<C> endPointBaseConnection,
RegistrationWrapper(final EndPointBase endPointBaseConnection,
final Logger logger,
final KryoEncoder kryoEncoder,
final KryoEncoderCrypto kryoEncoderCrypto) {
@ -94,14 +94,6 @@ class RegistrationWrapper<C extends Connection> implements UdpServer {
}
}
/**
* @return true if RMI is enabled
*/
public
boolean rmiEnabled() {
return endPointBaseConnection.globalRmiBridge != null;
}
public
KryoEncoder getKryoEncoder() {
return this.kryoEncoder;
@ -161,8 +153,7 @@ class RegistrationWrapper<C extends Connection> implements UdpServer {
/**
* Internal call by the pipeline when: - creating a new network connection - when determining the baseClass for generics
*
* @param metaChannel
* can be NULL (when getting the baseClass)
* @param metaChannel can be NULL (when getting the baseClass)
*/
public
Connection connection0(MetaChannel metaChannel) {
@ -317,7 +308,7 @@ class RegistrationWrapper<C extends Connection> implements UdpServer {
public
void abortRegistrationIfClient() {
if (this.endPointBaseConnection instanceof EndPointClient) {
((EndPointClient<C>) this.endPointBaseConnection).abortRegistration();
((EndPointClient) this.endPointBaseConnection).abortRegistration();
}
}
@ -411,7 +402,7 @@ class RegistrationWrapper<C extends Connection> implements UdpServer {
* they will be ADDED in another map, in the followup handler!!
*/
public
boolean setupChannels(final RegistrationRemoteHandler<C> handler, final MetaChannel metaChannel) {
boolean setupChannels(final RegistrationRemoteHandler handler, final MetaChannel metaChannel) {
boolean registerServer = false;
try {

View File

@ -15,14 +15,12 @@
*/
package dorkbox.network.connection.bridge;
import dorkbox.network.connection.Connection;
public
interface ConnectionBridgeServer<C extends Connection> extends ConnectionBridgeBase {
interface ConnectionBridgeServer extends ConnectionBridgeBase {
/**
* Exposes methods to send the object to all server connections (except the specified one) over the network. (or via LOCAL when it's a
* local channel).
*/
ConnectionExceptSpecifiedBridgeServer<C> except();
ConnectionExceptSpecifiedBridgeServer except();
}

View File

@ -19,17 +19,17 @@ import dorkbox.network.connection.Connection;
import dorkbox.network.connection.ConnectionPoint;
public
interface ConnectionExceptSpecifiedBridgeServer<C extends Connection> {
interface ConnectionExceptSpecifiedBridgeServer {
/**
* Sends the object to all server connections (except the specified one) over the network using TCP. (or via LOCAL when it's a local
* channel).
*/
ConnectionPoint TCP(C connection, Object message);
ConnectionPoint TCP(Connection connection, Object message);
/**
* Sends the object to all server connections (except the specified one) over the network using UDP (or via LOCAL when it's a local
* channel).
*/
ConnectionPoint UDP(C connection, Object message);
ConnectionPoint UDP(Connection connection, Object message);
}

View File

@ -15,16 +15,18 @@
*/
package dorkbox.network.connection.listenerManagement;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.slf4j.Logger;
import com.esotericsoftware.kryo.util.IdentityMap;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.ConnectionManager;
import dorkbox.network.connection.Listener.OnConnected;
import dorkbox.network.connection.Listener.OnError;
import dorkbox.util.collections.ConcurrentEntry;
import org.slf4j.Logger;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
/**
* Called when the remote end has been connected. This will be invoked before any objects are received by the network.
@ -33,93 +35,84 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
*/
@SuppressWarnings("Duplicates")
public final
class OnConnectedManager<C extends Connection> {
class OnConnectedManager {
// Recommended for best performance while adhering to the "single writer principle". Must be static-final
private static final AtomicReferenceFieldUpdater<OnConnectedManager, ConcurrentEntry> REF = AtomicReferenceFieldUpdater.newUpdater(
OnConnectedManager.class,
ConcurrentEntry.class,
"head_");
private final Logger logger;
//
// The iterators for IdentityMap are NOT THREAD SAFE!
//
// This is only touched by a single thread, maintains a map of entries for FAST lookup during listener remove.
private final IdentityMap<OnConnected<C>, ConcurrentEntry> entries = new IdentityMap<OnConnected<C>, ConcurrentEntry>(32, ConnectionManager.LOAD_FACTOR);
private volatile ConcurrentEntry<OnConnected<C>> head = null; // reference to the first element
private final IdentityMap<OnConnected, ConcurrentEntry> entries = new IdentityMap<OnConnected, ConcurrentEntry>(32,
ConnectionManager.LOAD_FACTOR);
private volatile ConcurrentEntry<OnConnected> head_ = null; // reference to the first element
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
private final Object lock = new Object();
// Recommended for best performance while adhering to the "single writer principle". Must be static-final
private static final AtomicReferenceFieldUpdater<OnConnectedManager, ConcurrentEntry> REF =
AtomicReferenceFieldUpdater.newUpdater(OnConnectedManager.class,
ConcurrentEntry.class,
"head");
public
OnConnectedManager(final Logger logger) {
this.logger = logger;
}
public void add(final OnConnected<C> listener) {
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
synchronized (lock) {
// access a snapshot (single-writer-principle)
ConcurrentEntry head = REF.get(this);
public synchronized
void add(final OnConnected listener) {
// access a snapshot (single-writer-principle)
ConcurrentEntry head = REF.get(this);
if (!entries.containsKey(listener)) {
head = new ConcurrentEntry<Object>(listener, head);
if (!entries.containsKey(listener)) {
head = new ConcurrentEntry<Object>(listener, head);
entries.put(listener, head);
entries.put(listener, head);
// save this snapshot back to the original (single writer principle)
REF.lazySet(this, head);
}
// save this snapshot back to the original (single writer principle)
REF.lazySet(this, head);
}
}
/**
* @return true if the listener was removed, false otherwise
*/
public
boolean remove(final OnConnected<C> listener) {
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
synchronized (lock) {
// access a snapshot (single-writer-principle)
ConcurrentEntry concurrentEntry = entries.get(listener);
public synchronized
boolean remove(final OnConnected listener) {
// access a snapshot (single-writer-principle)
ConcurrentEntry concurrentEntry = entries.get(listener);
if (concurrentEntry != null) {
ConcurrentEntry head1 = REF.get(this);
if (concurrentEntry != null) {
ConcurrentEntry head = REF.get(this);
if (concurrentEntry == head1) {
// if it was second, now it's first
head1 = head1.next();
//oldHead.clear(); // optimize for GC not possible because of potentially running iterators
}
else {
concurrentEntry.remove();
}
// save this snapshot back to the original (single writer principle)
REF.lazySet(this, head1);
entries.remove(listener);
return true;
} else {
return false;
if (concurrentEntry == head) {
// if it was second, now it's first
head = head.next();
//oldHead.clear(); // optimize for GC not possible because of potentially running iterators
}
else {
concurrentEntry.remove();
}
// save this snapshot back to the original (single writer principle)
REF.lazySet(this, head);
entries.remove(listener);
return true;
}
else {
return false;
}
}
/**
* @return true if a listener was found, false otherwise
*/
@SuppressWarnings("unchecked")
public
boolean notifyConnected(final C connection, final AtomicBoolean shutdown) {
<C extends Connection> boolean notifyConnected(final C connection, final AtomicBoolean shutdown) {
ConcurrentEntry<OnConnected<C>> head = REF.get(this);
ConcurrentEntry<OnConnected<C>> current = head;
OnConnected<C> listener;
while (current != null && !shutdown.get()) {
listener = current.getValue();
@ -143,14 +136,9 @@ class OnConnectedManager<C extends Connection> {
/**
* called on shutdown
*/
public
public synchronized
void clear() {
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
synchronized (lock) {
this.entries.clear();
this.head = null;
}
this.entries.clear();
this.head_ = null;
}
}

View File

@ -15,16 +15,18 @@
*/
package dorkbox.network.connection.listenerManagement;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.slf4j.Logger;
import com.esotericsoftware.kryo.util.IdentityMap;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.ConnectionManager;
import dorkbox.network.connection.Listener.OnDisconnected;
import dorkbox.network.connection.Listener.OnError;
import dorkbox.util.collections.ConcurrentEntry;
import org.slf4j.Logger;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
/**
* Called when the remote end has been connected. This will be invoked before any objects are received by the network.
@ -33,82 +35,74 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
*/
@SuppressWarnings("Duplicates")
public final
class OnDisconnectedManager<C extends Connection> {
class OnDisconnectedManager {
// Recommended for best performance while adhering to the "single writer principle". Must be static-final
private static final AtomicReferenceFieldUpdater<OnDisconnectedManager, ConcurrentEntry> REF = AtomicReferenceFieldUpdater.newUpdater(
OnDisconnectedManager.class,
ConcurrentEntry.class,
"head_");
private final Logger logger;
//
// The iterators for IdentityMap are NOT THREAD SAFE!
//
// This is only touched by a single thread, maintains a map of entries for FAST lookup during listener remove.
private final IdentityMap<OnDisconnected<C>, ConcurrentEntry> entries = new IdentityMap<OnDisconnected<C>, ConcurrentEntry>(32, ConnectionManager.LOAD_FACTOR);
private volatile ConcurrentEntry<OnDisconnected<C>> head = null; // reference to the first element
private final IdentityMap<OnDisconnected, ConcurrentEntry> entries = new IdentityMap<OnDisconnected, ConcurrentEntry>(32,
ConnectionManager.LOAD_FACTOR);
private volatile ConcurrentEntry<OnDisconnected> head_ = null; // reference to the first element
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
private final Object lock = new Object();
// Recommended for best performance while adhering to the "single writer principle". Must be static-final
private static final AtomicReferenceFieldUpdater<OnDisconnectedManager, ConcurrentEntry> REF =
AtomicReferenceFieldUpdater.newUpdater(OnDisconnectedManager.class,
ConcurrentEntry.class,
"head");
public
OnDisconnectedManager(final Logger logger) {
this.logger = logger;
}
public void add(final OnDisconnected<C> listener) {
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
synchronized (lock) {
// access a snapshot (single-writer-principle)
ConcurrentEntry head = REF.get(this);
public synchronized
void add(final OnDisconnected listener) {
// access a snapshot (single-writer-principle)
ConcurrentEntry head = REF.get(this);
if (!entries.containsKey(listener)) {
head = new ConcurrentEntry<Object>(listener, head);
if (!entries.containsKey(listener)) {
head = new ConcurrentEntry<Object>(listener, head);
entries.put(listener, head);
entries.put(listener, head);
// save this snapshot back to the original (single writer principle)
REF.lazySet(this, head);
}
// save this snapshot back to the original (single writer principle)
REF.lazySet(this, head);
}
}
/**
* @return true if the listener was removed, false otherwise
*/
public
boolean remove(final OnDisconnected<C> listener) {
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
synchronized (lock) {
// access a snapshot (single-writer-principle)
ConcurrentEntry concurrentEntry = entries.get(listener);
public synchronized
boolean remove(final OnDisconnected<Connection> listener) {
// access a snapshot (single-writer-principle)
ConcurrentEntry concurrentEntry = entries.get(listener);
if (concurrentEntry != null) {
ConcurrentEntry head1 = REF.get(this);
if (concurrentEntry != null) {
ConcurrentEntry head = REF.get(this);
if (concurrentEntry == head1) {
// if it was second, now it's first
head1 = head1.next();
//oldHead.clear(); // optimize for GC not possible because of potentially running iterators
}
else {
concurrentEntry.remove();
}
// save this snapshot back to the original (single writer principle)
REF.lazySet(this, head1);
entries.remove(listener);
return true;
} else {
return false;
if (concurrentEntry == head) {
// if it was second, now it's first
head = head.next();
//oldHead.clear(); // optimize for GC not possible because of potentially running iterators
}
else {
concurrentEntry.remove();
}
// save this snapshot back to the original (single writer principle)
REF.lazySet(this, head);
entries.remove(listener);
return true;
}
else {
return false;
}
}
@ -116,11 +110,11 @@ class OnDisconnectedManager<C extends Connection> {
/**
* @return true if a listener was found, false otherwise
*/
@SuppressWarnings("unchecked")
public
boolean notifyDisconnected(final C connection, final AtomicBoolean shutdown) {
<C extends Connection> boolean notifyDisconnected(final C connection, final AtomicBoolean shutdown) {
ConcurrentEntry<OnDisconnected<C>> head = REF.get(this);
ConcurrentEntry<OnDisconnected<C>> current = head;
OnDisconnected<C> listener;
while (current != null && !shutdown.get()) {
listener = current.getValue();
@ -133,7 +127,10 @@ class OnDisconnectedManager<C extends Connection> {
((OnError<C>) listener).error(connection, e);
}
else {
logger.error("Unable to notify listener on 'disconnected' for listener '{}', connection '{}'.", listener, connection, e);
logger.error("Unable to notify listener on 'disconnected' for listener '{}', connection '{}'.",
listener,
connection,
e);
}
}
}
@ -144,14 +141,9 @@ class OnDisconnectedManager<C extends Connection> {
/**
* called on shutdown
*/
public
public synchronized
void clear() {
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
synchronized (lock) {
this.entries.clear();
this.head = null;
}
this.entries.clear();
this.head_ = null;
}
}

View File

@ -15,16 +15,18 @@
*/
package dorkbox.network.connection.listenerManagement;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.slf4j.Logger;
import com.esotericsoftware.kryo.util.IdentityMap;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.ConnectionManager;
import dorkbox.network.connection.Listener.OnError;
import dorkbox.network.connection.Listener.OnIdle;
import dorkbox.util.collections.ConcurrentEntry;
import org.slf4j.Logger;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
/**
* Called when the remote end has been connected. This will be invoked before any objects are received by the network.
@ -33,94 +35,85 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
*/
@SuppressWarnings("Duplicates")
public final
class OnIdleManager<C extends Connection> {
class OnIdleManager {
// Recommended for best performance while adhering to the "single writer principle". Must be static-final
private static final AtomicReferenceFieldUpdater<OnIdleManager, ConcurrentEntry> REF = AtomicReferenceFieldUpdater.newUpdater(
OnIdleManager.class,
ConcurrentEntry.class,
"head_");
private final Logger logger;
//
// The iterators for IdentityMap are NOT THREAD SAFE!
//
// This is only touched by a single thread, maintains a map of entries for FAST lookup during listener remove.
private final IdentityMap<OnIdle<C>, ConcurrentEntry> entries = new IdentityMap<OnIdle<C>, ConcurrentEntry>(32, ConnectionManager.LOAD_FACTOR);
private volatile ConcurrentEntry<OnIdle<C>> head = null; // reference to the first element
private final IdentityMap<OnIdle, ConcurrentEntry<OnIdle>> entries = new IdentityMap<OnIdle, ConcurrentEntry<OnIdle>>(32,
ConnectionManager.LOAD_FACTOR);
private volatile ConcurrentEntry<OnIdle> head_ = null; // reference to the first element
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
private final Object lock = new Object();
// Recommended for best performance while adhering to the "single writer principle". Must be static-final
private static final AtomicReferenceFieldUpdater<OnIdleManager, ConcurrentEntry> REF =
AtomicReferenceFieldUpdater.newUpdater(OnIdleManager.class,
ConcurrentEntry.class,
"head");
public
OnIdleManager(final Logger logger) {
this.logger = logger;
}
public void add(final OnIdle<C> listener) {
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
synchronized (lock) {
// access a snapshot (single-writer-principle)
ConcurrentEntry head = REF.get(this);
public synchronized
void add(final OnIdle listener) {
// access a snapshot (single-writer-principle)
ConcurrentEntry head = REF.get(this);
if (!entries.containsKey(listener)) {
head = new ConcurrentEntry<Object>(listener, head);
if (!entries.containsKey(listener)) {
head = new ConcurrentEntry<Object>(listener, head);
entries.put(listener, head);
entries.put(listener, head);
// save this snapshot back to the original (single writer principle)
REF.lazySet(this, head);
}
// save this snapshot back to the original (single writer principle)
REF.lazySet(this, head);
}
}
/**
* @return true if the listener was removed, false otherwise
*/
public
boolean remove(final OnIdle<C> listener) {
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
synchronized (lock) {
// access a snapshot (single-writer-principle)
ConcurrentEntry concurrentEntry = entries.get(listener);
public synchronized
boolean remove(final OnIdle listener) {
// access a snapshot (single-writer-principle)
ConcurrentEntry concurrentEntry = entries.get(listener);
if (concurrentEntry != null) {
ConcurrentEntry head1 = REF.get(this);
if (concurrentEntry != null) {
ConcurrentEntry head = REF.get(this);
if (concurrentEntry == head1) {
// if it was second, now it's first
head1 = head1.next();
//oldHead.clear(); // optimize for GC not possible because of potentially running iterators
}
else {
concurrentEntry.remove();
}
// save this snapshot back to the original (single writer principle)
REF.lazySet(this, head1);
entries.remove(listener);
return true;
if (concurrentEntry == head) {
// if it was second, now it's first
head = head.next();
//oldHead.clear(); // optimize for GC not possible because of potentially running iterators
}
else {
return false;
concurrentEntry.remove();
}
// save this snapshot back to the original (single writer principle)
REF.lazySet(this, head);
entries.remove(listener);
return true;
}
else {
return false;
}
}
/**
* @return true if a listener was found, false otherwise
*/
@SuppressWarnings("unchecked")
public
boolean notifyIdle(final C connection, final AtomicBoolean shutdown) {
<C extends Connection> boolean notifyIdle(final C connection, final AtomicBoolean shutdown) {
ConcurrentEntry<OnIdle<C>> head = REF.get(this);
ConcurrentEntry<OnIdle<C>> current = head;
OnIdle<C> listener;
while (current != null && !shutdown.get()) {
listener = current.getValue();
@ -146,14 +139,9 @@ class OnIdleManager<C extends Connection> {
/**
* called on shutdown
*/
public
public synchronized
void clear() {
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
synchronized (lock) {
this.entries.clear();
this.head = null;
}
this.entries.clear();
this.head_ = null;
}
}

View File

@ -31,7 +31,6 @@ import dorkbox.network.connection.Listener;
import dorkbox.network.connection.Listener.OnError;
import dorkbox.network.connection.Listener.OnMessageReceived;
import dorkbox.network.connection.Listener.SelfDefinedType;
import dorkbox.network.rmi.RmiMessages;
import dorkbox.util.collections.ConcurrentEntry;
import dorkbox.util.collections.ConcurrentIterator;
import dorkbox.util.generics.ClassHelper;
@ -43,256 +42,12 @@ import dorkbox.util.generics.ClassHelper;
*/
@SuppressWarnings("Duplicates")
public final
class OnMessageReceivedManager<C extends Connection> {
private final Logger logger;
//
// The iterators for IdentityMap are NOT THREAD SAFE!
//
@SuppressWarnings("unused")
private volatile IdentityMap<Type, ConcurrentIterator> listeners = new IdentityMap<Type, ConcurrentIterator>(32, ConnectionManager.LOAD_FACTOR);
class OnMessageReceivedManager {
// Recommended for best performance while adhering to the "single writer principle". Must be static-final
private static final AtomicReferenceFieldUpdater<OnMessageReceivedManager, IdentityMap> REF =
AtomicReferenceFieldUpdater.newUpdater(OnMessageReceivedManager.class,
IdentityMap.class,
"listeners");
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
private final Object lock = new Object();
public
OnMessageReceivedManager(final Logger logger) {
this.logger = logger;
}
public void add(final OnMessageReceived<C, Object> listener) {
final Class<?> type;
if (listener instanceof SelfDefinedType) {
type = ((SelfDefinedType) listener).getType();
}
else {
type = identifyType(listener);
}
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
synchronized (lock) {
// access a snapshot of the listeners (single-writer-principle)
@SuppressWarnings("unchecked")
final IdentityMap<Type, ConcurrentIterator> listeners = REF.get(this);
ConcurrentIterator subscribedListeners = listeners.get(type);
if (subscribedListeners == null) {
subscribedListeners = new ConcurrentIterator();
listeners.put(type, subscribedListeners);
}
subscribedListeners.add(listener);
// save this snapshot back to the original (single writer principle)
REF.lazySet(this, listeners);
}
}
/**
* @return true if the listener was removed, false otherwise
*/
public
boolean remove(final OnMessageReceived<C, Object> listener) {
final Class<?> type;
if (listener instanceof SelfDefinedType) {
type = ((SelfDefinedType) listener).getType();
}
else {
type = identifyType(listener);
}
boolean found = false;
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
synchronized (lock) {
// access a snapshot of the listeners (single-writer-principle)
@SuppressWarnings("unchecked")
final IdentityMap<Type, ConcurrentIterator> listeners = REF.get(this);
final ConcurrentIterator concurrentIterator = listeners.get(type);
if (concurrentIterator != null) {
concurrentIterator.remove(listener);
found = true;
}
// save this snapshot back to the original (single writer principle)
REF.lazySet(this, listeners);
}
return found;
}
/**
* @return true if a listener was found, false otherwise
*/
@SuppressWarnings("unchecked")
public
boolean notifyReceived(final C connection, final Object message, final AtomicBoolean shutdown) {
boolean found = false;
Class<?> objectType = message.getClass();
// this is the GLOBAL version (unless it's the call from below, then it's the connection scoped version)
final IdentityMap<Type, ConcurrentIterator> listeners = REF.get(this);
ConcurrentIterator concurrentIterator = listeners.get(objectType);
if (concurrentIterator != null) {
ConcurrentEntry<OnMessageReceived<C, Object>> head = headREF.get(concurrentIterator);
ConcurrentEntry<OnMessageReceived<C, Object>> current = head;
OnMessageReceived<C, Object> listener;
while (current != null && !shutdown.get()) {
listener = current.getValue();
current = current.next();
try {
listener.received(connection, message);
} catch (Exception e) {
if (listener instanceof OnError) {
((OnError<C>) listener).error(connection, e);
}
else {
logger.error("Unable to notify on message '{}' for listener '{}', connection '{}'.",
objectType,
listener,
connection,
e);
}
}
}
found = head != null; // true if we have something to publish to, otherwise false
}
if (!(message instanceof RmiMessages)) {
// we march through all super types of the object, and find the FIRST set
// of listeners that are registered and cast it as that, and notify the method.
// NOTICE: we do NOT call ALL TYPE -- meaning, if we have Object->Foo->Bar
// and have listeners for Object and Foo
// we will call Bar (from the above code)
// we will call Foo (from this code)
// we will NOT call Object (since we called Foo). If Foo was not registered, THEN we would call object!
objectType = objectType.getSuperclass();
while (objectType != null) {
// check to see if we have what we are looking for in our CURRENT class
concurrentIterator = listeners.get(objectType);
if (concurrentIterator != null) {
ConcurrentEntry<OnMessageReceived<C, Object>> head = headREF.get(concurrentIterator);
ConcurrentEntry<OnMessageReceived<C, Object>> current = head;
OnMessageReceived<C, Object> listener;
while (current != null && !shutdown.get()) {
listener = current.getValue();
current = current.next();
try {
listener.received(connection, message);
} catch (Exception e) {
if (listener instanceof OnError) {
((OnError<C>) listener).error(connection, e);
}
else {
logger.error("Unable to notify on message '{}' for listener '{}', connection '{}'.",
objectType,
listener,
connection,
e);
}
}
}
found = head != null; // true if we have something to publish to, otherwise false
break;
}
// NO MATCH, so walk up.
objectType = objectType.getSuperclass();
}
}
return found;
}
public
void removeAll() {
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
synchronized (lock) {
// access a snapshot of the listeners (single-writer-principle)
@SuppressWarnings("unchecked")
final IdentityMap<Type, ConcurrentIterator> listeners = REF.get(this);
listeners.clear();
// save this snapshot back to the original (single writer principle)
REF.lazySet(this, listeners);
}
}
/**
* @return true if the listener was removed, false otherwise
*/
public
boolean removeAll(final Class<?> classType) {
boolean found;
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
synchronized (lock) {
// access a snapshot of the listeners (single-writer-principle)
@SuppressWarnings("unchecked")
final IdentityMap<Type, ConcurrentIterator> listeners = REF.get(this);
found = listeners.remove(classType) != null;
// save this snapshot back to the original (single writer principle)
REF.lazySet(this, listeners);
}
return found;
}
/**
* called on shutdown
*/
public
void clear() {
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
synchronized (lock) {
@SuppressWarnings("unchecked")
final IdentityMap<Type, ConcurrentIterator> listeners = REF.get(this);
// The iterators for this map are NOT THREAD SAFE!
// using .entries() is what we are supposed to use!
final IdentityMap.Entries<Type, ConcurrentIterator> entries = listeners.entries();
for (final IdentityMap.Entry<Type, ConcurrentIterator> next : entries) {
if (next.value != null) {
next.value.clear();
}
}
listeners.clear();
// save this snapshot back to the original (single writer principle)
REF.lazySet(this, listeners);
}
}
private static final AtomicReferenceFieldUpdater<OnMessageReceivedManager, IdentityMap> REF = AtomicReferenceFieldUpdater.newUpdater(
OnMessageReceivedManager.class,
IdentityMap.class,
"listeners");
/**
* Gets the referenced object type for a specific listener, but ONLY necessary for listeners that receive messages
@ -321,4 +76,211 @@ class OnMessageReceivedManager<C extends Connection> {
return Object.class;
}
}
private final Logger logger;
//
// The iterators for IdentityMap are NOT THREAD SAFE!
//
private volatile IdentityMap<Type, ConcurrentIterator> listeners = new IdentityMap<Type, ConcurrentIterator>(32, ConnectionManager.LOAD_FACTOR);
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
public
OnMessageReceivedManager(final Logger logger) {
this.logger = logger;
}
public
void add(final OnMessageReceived listener) {
final Class<?> type;
if (listener instanceof SelfDefinedType) {
type = ((SelfDefinedType) listener).getType();
}
else {
type = identifyType(listener);
}
synchronized (this) {
// access a snapshot of the listeners (single-writer-principle)
final IdentityMap<Type, ConcurrentIterator> listeners = REF.get(this);
ConcurrentIterator subscribedListeners = listeners.get(type);
if (subscribedListeners == null) {
subscribedListeners = new ConcurrentIterator();
listeners.put(type, subscribedListeners);
}
subscribedListeners.add(listener);
// save this snapshot back to the original (single writer principle)
REF.lazySet(this, listeners);
}
}
/**
* @return true if the listener was removed, false otherwise
*/
public
boolean remove(final OnMessageReceived listener) {
final Class<?> type;
if (listener instanceof SelfDefinedType) {
type = ((SelfDefinedType) listener).getType();
}
else {
type = identifyType(listener);
}
boolean found = false;
synchronized (this) {
// access a snapshot of the listeners (single-writer-principle)
final IdentityMap<Type, ConcurrentIterator> listeners = REF.get(this);
final ConcurrentIterator concurrentIterator = listeners.get(type);
if (concurrentIterator != null) {
concurrentIterator.remove(listener);
found = true;
}
// save this snapshot back to the original (single writer principle)
REF.lazySet(this, listeners);
}
return found;
}
/**
* @return true if a listener was found, false otherwise
*/
public
<C extends Connection> boolean notifyReceived(final C connection, final Object message, final AtomicBoolean shutdown) {
boolean found = false;
Class<?> objectType = message.getClass();
// this is the GLOBAL version (unless it's the call from below, then it's the connection scoped version)
final IdentityMap<Type, ConcurrentIterator> listeners = REF.get(this);
ConcurrentIterator concurrentIterator = listeners.get(objectType);
if (concurrentIterator != null) {
ConcurrentEntry<OnMessageReceived<C, Object>> head = headREF.get(concurrentIterator);
ConcurrentEntry<OnMessageReceived<C, Object>> current = head;
OnMessageReceived<C, Object> listener;
while (current != null && !shutdown.get()) {
listener = current.getValue();
current = current.next();
try {
listener.received(connection, message);
} catch (Exception e) {
if (listener instanceof OnError) {
((OnError<C>) listener).error(connection, e);
}
else {
logger.error("Unable to notify on message '{}' for listener '{}', connection '{}'.",
objectType,
listener,
connection,
e);
}
}
}
found = head != null; // true if we have something to publish to, otherwise false
}
// we march through all super types of the object, and find the FIRST set
// of listeners that are registered and cast it as that, and notify the method.
// NOTICE: we do NOT call ALL TYPES -- meaning, if we have Object->Foo->Bar
// and have listeners for Object and Foo
// we will call Bar (from the above code)
// we will call Foo (from this code)
// we will NOT call Object (since we called Foo). If Foo was not registered, THEN we would call object!
objectType = objectType.getSuperclass();
while (objectType != null) {
// check to see if we have what we are looking for in our CURRENT class
concurrentIterator = listeners.get(objectType);
if (concurrentIterator != null) {
ConcurrentEntry<OnMessageReceived<C, Object>> head = headREF.get(concurrentIterator);
ConcurrentEntry<OnMessageReceived<C, Object>> current = head;
OnMessageReceived<C, Object> listener;
while (current != null && !shutdown.get()) {
listener = current.getValue();
current = current.next();
try {
listener.received(connection, message);
} catch (Exception e) {
if (listener instanceof OnError) {
((OnError<C>) listener).error(connection, e);
}
else {
logger.error("Unable to notify on message '{}' for listener '{}', connection '{}'.",
objectType,
listener,
connection,
e);
}
}
}
found = head != null; // true if we have something to publish to, otherwise false
break;
}
// NO MATCH, so walk up.
objectType = objectType.getSuperclass();
}
return found;
}
public synchronized
void removeAll() {
listeners.clear();
}
/**
* @return true if the listener was removed, false otherwise
*/
public synchronized
boolean removeAll(final Class<?> classType) {
boolean found;
// access a snapshot of the listeners (single-writer-principle)
final IdentityMap<Type, ConcurrentIterator> listeners = REF.get(this);
found = listeners.remove(classType) != null;
// save this snapshot back to the original (single writer principle)
REF.lazySet(this, listeners);
return found;
}
/**
* called on shutdown
*/
public
void clear() {
final IdentityMap<Type, ConcurrentIterator> listeners = REF.get(this);
// The iterators for this map are NOT THREAD SAFE!
// using .entries() is what we are supposed to use!
final IdentityMap.Entries<Type, ConcurrentIterator> entries = listeners.entries();
for (final IdentityMap.Entry<Type, ConcurrentIterator> next : entries) {
if (next.value != null) {
next.value.clear();
}
}
listeners.clear();
// save this snapshot back to the original (single writer principle)
REF.lazySet(this, listeners);
}
}

View File

@ -15,7 +15,6 @@
*/
package dorkbox.network.connection.registration;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.EndPointBase;
import dorkbox.network.connection.RegistrationWrapper;
import io.netty.channel.Channel;
@ -25,16 +24,16 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
@Sharable
public abstract
class RegistrationHandler<C extends Connection> extends ChannelInboundHandlerAdapter {
class RegistrationHandler extends ChannelInboundHandlerAdapter {
protected static final String CONNECTION_HANDLER = "connectionHandler";
protected final RegistrationWrapper<C> registrationWrapper;
protected final RegistrationWrapper registrationWrapper;
protected final org.slf4j.Logger logger;
protected final String name;
public
RegistrationHandler(final String name, RegistrationWrapper<C> registrationWrapper) {
RegistrationHandler(final String name, RegistrationWrapper registrationWrapper) {
this.name = name;
this.logger = org.slf4j.LoggerFactory.getLogger(this.name);
this.registrationWrapper = registrationWrapper;
@ -85,7 +84,7 @@ class RegistrationHandler<C extends Connection> extends ChannelInboundHandlerAda
void exceptionCaught(final ChannelHandlerContext context, final Throwable cause) throws Exception;
public
MetaChannel shutdown(final RegistrationWrapper<C> registrationWrapper, final Channel channel) {
MetaChannel shutdown(final RegistrationWrapper registrationWrapper, final Channel channel) {
// shutdown. Something messed up or was incorrect
// properly shutdown the TCP/UDP channels.
if (channel.isOpen()) {

View File

@ -17,8 +17,6 @@ package dorkbox.network.connection.registration.local;
import static dorkbox.network.connection.EndPointBase.maxShutdownWaitTimeInMilliSeconds;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.RegisterRmiLocalHandler;
import dorkbox.network.connection.RegistrationWrapper;
import dorkbox.network.connection.registration.MetaChannel;
import dorkbox.network.connection.registration.RegistrationHandler;
@ -26,11 +24,9 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
public abstract
class RegistrationLocalHandler<C extends Connection> extends RegistrationHandler<C> {
static final String LOCAL_RMI_HANDLER = "localRmiHandler";
final RegisterRmiLocalHandler rmiLocalHandler = new RegisterRmiLocalHandler();
class RegistrationLocalHandler extends RegistrationHandler {
RegistrationLocalHandler(String name, RegistrationWrapper<C> registrationWrapper) {
RegistrationLocalHandler(String name, RegistrationWrapper registrationWrapper) {
super(name, registrationWrapper);
}

View File

@ -15,7 +15,6 @@
*/
package dorkbox.network.connection.registration.local;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.ConnectionImpl;
import dorkbox.network.connection.RegistrationWrapper;
import dorkbox.network.connection.registration.MetaChannel;
@ -26,10 +25,10 @@ import io.netty.channel.ChannelPipeline;
import io.netty.util.ReferenceCountUtil;
public
class RegistrationLocalHandlerClient<C extends Connection> extends RegistrationLocalHandler<C> {
class RegistrationLocalHandlerClient extends RegistrationLocalHandler {
public
RegistrationLocalHandlerClient(String name, RegistrationWrapper<C> registrationWrapper) {
RegistrationLocalHandlerClient(String name, RegistrationWrapper registrationWrapper) {
super(name, registrationWrapper);
}
@ -78,15 +77,6 @@ class RegistrationLocalHandlerClient<C extends Connection> extends RegistrationL
ConnectionImpl connection = metaChannel.connection;
// add our RMI handlers
if (registrationWrapper.rmiEnabled()) {
///////////////////////
// DECODE (or upstream)
///////////////////////
pipeline.addFirst(LOCAL_RMI_HANDLER, rmiLocalHandler);
}
// have to setup connection handler
pipeline.addLast(CONNECTION_HANDLER, connection);

View File

@ -15,7 +15,6 @@
*/
package dorkbox.network.connection.registration.local;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.ConnectionImpl;
import dorkbox.network.connection.RegistrationWrapper;
import dorkbox.network.connection.registration.MetaChannel;
@ -25,10 +24,10 @@ import io.netty.channel.ChannelPipeline;
import io.netty.util.ReferenceCountUtil;
public
class RegistrationLocalHandlerServer<C extends Connection> extends RegistrationLocalHandler<C> {
class RegistrationLocalHandlerServer extends RegistrationLocalHandler {
public
RegistrationLocalHandlerServer(String name, RegistrationWrapper<C> registrationWrapper) {
RegistrationLocalHandlerServer(String name, RegistrationWrapper registrationWrapper) {
super(name, registrationWrapper);
}
@ -78,14 +77,6 @@ class RegistrationLocalHandlerServer<C extends Connection> extends RegistrationL
}
if (connection != null) {
// add our RMI handlers
if (registrationWrapper.rmiEnabled()) {
///////////////////////
// DECODE (or upstream)
///////////////////////
pipeline.addFirst(LOCAL_RMI_HANDLER, rmiLocalHandler);
}
// have to setup connection handler
pipeline.addLast(CONNECTION_HANDLER, connection);

View File

@ -27,7 +27,6 @@ import org.bouncycastle.crypto.engines.IESEngine;
import org.bouncycastle.crypto.modes.GCMBlockCipher;
import org.slf4j.Logger;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.ConnectionImpl;
import dorkbox.network.connection.RegistrationWrapper;
import dorkbox.network.connection.registration.MetaChannel;
@ -42,15 +41,11 @@ import dorkbox.util.crypto.CryptoECC;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.epoll.EpollDatagramChannel;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.ReferenceCountUtil;
public abstract
class RegistrationRemoteHandler<C extends Connection> extends RegistrationHandler<C> {
class RegistrationRemoteHandler extends RegistrationHandler {
static final String KRYO_ENCODER = "kryoEncoder";
static final String KRYO_DECODER = "kryoDecoder";
@ -100,7 +95,7 @@ class RegistrationRemoteHandler<C extends Connection> extends RegistrationHandle
protected final CryptoSerializationManager serializationManager;
RegistrationRemoteHandler(final String name,
final RegistrationWrapper<C> registrationWrapper,
final RegistrationWrapper registrationWrapper,
final CryptoSerializationManager serializationManager) {
super(name, registrationWrapper);
@ -143,46 +138,50 @@ class RegistrationRemoteHandler<C extends Connection> extends RegistrationHandle
// add the channel so we can access it later.
// do NOT want to add UDP channels, since they are tracked differently.
if (this.logger.isInfoEnabled()) {
Channel channel = context.channel();
Class<? extends Channel> channelClass = channel.getClass();
boolean isUdp = ConnectionImpl.isUdp(channelClass);
// this whole bit is inside a if (logger.isDebugEnabled()) section.
Channel channel = context.channel();
Class<? extends Channel> channelClass = channel.getClass();
StringBuilder stringBuilder = new StringBuilder(96);
StringBuilder stringBuilder = new StringBuilder(96);
stringBuilder.append("Connected to remote ");
if (channelClass == NioSocketChannel.class || channelClass == EpollSocketChannel.class) {
stringBuilder.append("TCP");
}
else if (channelClass == NioDatagramChannel.class || channelClass == EpollDatagramChannel.class) {
stringBuilder.append("UDP");
}
else {
stringBuilder.append("UNKNOWN");
}
stringBuilder.append(" connection. [");
stringBuilder.append(channel.localAddress());
boolean isSessionless = channel instanceof NioDatagramChannel;
if (isSessionless) {
if (channel.remoteAddress() != null) {
stringBuilder.append(" ==> ");
stringBuilder.append(channel.remoteAddress());
stringBuilder.append("Connected to remote ");
if (ConnectionImpl.isTcp(channelClass)) {
stringBuilder.append("TCP");
}
else if (isUdp) {
stringBuilder.append("UDP");
}
else if (ConnectionImpl.isLocal(channelClass)) {
stringBuilder.append("LOCAL");
}
else {
// this means we are LISTENING.
stringBuilder.append(" <== ");
stringBuilder.append("?????");
stringBuilder.append("UNKNOWN");
}
}
else {
stringBuilder.append(getConnectionDirection());
stringBuilder.append(channel.remoteAddress());
}
stringBuilder.append("]");
this.logger.info(stringBuilder.toString());
stringBuilder.append(" connection. [");
stringBuilder.append(channel.localAddress());
// this means we are "Sessionless"
if (isUdp) {
if (channel.remoteAddress() != null) {
stringBuilder.append(" ==> ");
stringBuilder.append(channel.remoteAddress());
}
else {
// this means we are LISTENING.
stringBuilder.append(" <== ");
stringBuilder.append("?????");
}
}
else {
stringBuilder.append(getConnectionDirection());
stringBuilder.append(channel.remoteAddress());
}
stringBuilder.append("]");
this.logger.info(stringBuilder.toString());
}
}
@Override
@ -276,7 +275,7 @@ class RegistrationRemoteHandler<C extends Connection> extends RegistrationHandle
final
boolean verifyAesInfo(final Object message,
final Channel channel,
final RegistrationWrapper<C> registrationWrapper,
final RegistrationWrapper registrationWrapper,
final MetaChannel metaChannel,
final Logger logger) {

View File

@ -15,16 +15,14 @@
*/
package dorkbox.network.connection.registration.remote;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.RegistrationWrapper;
import dorkbox.network.serialization.CryptoSerializationManager;
public
class RegistrationRemoteHandlerClient<C extends Connection> extends RegistrationRemoteHandler<C> {
class RegistrationRemoteHandlerClient extends RegistrationRemoteHandler {
public
RegistrationRemoteHandlerClient(final String name,
final RegistrationWrapper<C> registrationWrapper,
final RegistrationWrapper registrationWrapper,
final CryptoSerializationManager serializationManager) {
super(name, registrationWrapper, serializationManager);
}

View File

@ -31,10 +31,10 @@ import org.bouncycastle.jce.ECNamedCurveTable;
import org.bouncycastle.jce.spec.ECParameterSpec;
import org.slf4j.Logger;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.RegistrationWrapper;
import dorkbox.network.connection.registration.MetaChannel;
import dorkbox.network.connection.registration.Registration;
@ -49,14 +49,14 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;
public
class RegistrationRemoteHandlerClientTCP<C extends Connection> extends RegistrationRemoteHandlerClient<C> {
class RegistrationRemoteHandlerClientTCP extends RegistrationRemoteHandlerClient {
private static final String DELETE_IP = "eleteIP"; // purposefully missing the "D", since that is a system parameter, which starts with "-D"
private static final ECParameterSpec eccSpec = ECNamedCurveTable.getParameterSpec(CryptoECC.curve25519);
public
RegistrationRemoteHandlerClientTCP(final String name,
final RegistrationWrapper<C> registrationWrapper,
final RegistrationWrapper registrationWrapper,
final CryptoSerializationManager serializationManager) {
super(name, registrationWrapper, serializationManager);
@ -151,7 +151,7 @@ class RegistrationRemoteHandlerClientTCP<C extends Connection> extends Registrat
void channelRead(final ChannelHandlerContext context, final Object message) throws Exception {
Channel channel = context.channel();
RegistrationWrapper<C> registrationWrapper2 = this.registrationWrapper;
RegistrationWrapper registrationWrapper2 = this.registrationWrapper;
Logger logger2 = this.logger;
if (message instanceof Registration) {
// make sure this connection was properly registered in the map. (IT SHOULD BE)
@ -232,8 +232,10 @@ class RegistrationRemoteHandlerClientTCP<C extends Connection> extends Registrat
* see http://en.wikipedia.org/wiki/Diffie%E2%80%93Hellman_key_exchange
*/
byte[] ecdhPubKeyBytes = Arrays.copyOfRange(payload, intLength, payload.length);
ECPublicKeyParameters ecdhPubKey = EccPublicKeySerializer.read(new Input(ecdhPubKeyBytes));
if (ecdhPubKey == null) {
ECPublicKeyParameters ecdhPubKey;
try {
ecdhPubKey = EccPublicKeySerializer.read(new Input(ecdhPubKeyBytes));
} catch (KryoException e) {
logger2.error("Invalid decode of ecdh public key. Aborting.");
shutdown(registrationWrapper2, channel);

View File

@ -21,7 +21,6 @@ import java.net.InetSocketAddress;
import org.slf4j.Logger;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.EndPointBase;
import dorkbox.network.connection.RegistrationWrapper;
import dorkbox.network.connection.registration.MetaChannel;
@ -37,11 +36,11 @@ import io.netty.channel.ChannelPipeline;
import io.netty.channel.FixedRecvByteBufAllocator;
public
class RegistrationRemoteHandlerClientUDP<C extends Connection> extends RegistrationRemoteHandlerClient<C> {
class RegistrationRemoteHandlerClientUDP extends RegistrationRemoteHandlerClient {
public
RegistrationRemoteHandlerClientUDP(final String name,
final RegistrationWrapper<C> registrationWrapper,
final RegistrationWrapper registrationWrapper,
final CryptoSerializationManager serializationManager) {
super(name, registrationWrapper, serializationManager);
}
@ -118,7 +117,7 @@ class RegistrationRemoteHandlerClientUDP<C extends Connection> extends Registrat
// if we also have a UDP channel, we will receive the "connected" message on UDP (otherwise it will be on TCP)
RegistrationWrapper<C> registrationWrapper2 = this.registrationWrapper;
RegistrationWrapper registrationWrapper2 = this.registrationWrapper;
MetaChannel metaChannel = registrationWrapper2.getChannel(channel.hashCode());
if (metaChannel != null) {

View File

@ -15,17 +15,15 @@
*/
package dorkbox.network.connection.registration.remote;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.RegistrationWrapper;
import dorkbox.network.connection.registration.MetaChannel;
import dorkbox.network.serialization.CryptoSerializationManager;
public
class RegistrationRemoteHandlerServer<C extends Connection> extends RegistrationRemoteHandler<C> {
class RegistrationRemoteHandlerServer extends RegistrationRemoteHandler {
public
RegistrationRemoteHandlerServer(final String name,
final RegistrationWrapper<C> registrationWrapper,
final RegistrationWrapper registrationWrapper,
final CryptoSerializationManager serializationManager) {
super(name, registrationWrapper, serializationManager);
}

View File

@ -32,10 +32,10 @@ import org.bouncycastle.jce.spec.ECParameterSpec;
import org.bouncycastle.util.Arrays;
import org.slf4j.Logger;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.RegistrationWrapper;
import dorkbox.network.connection.registration.MetaChannel;
import dorkbox.network.connection.registration.Registration;
@ -49,7 +49,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;
public
class RegistrationRemoteHandlerServerTCP<C extends Connection> extends RegistrationRemoteHandlerServer<C> {
class RegistrationRemoteHandlerServerTCP extends RegistrationRemoteHandlerServer {
private static final long ECDH_TIMEOUT = 10L * 60L * 60L * 1000L * 1000L * 1000L; // 10 minutes in nanoseconds
@ -61,7 +61,7 @@ class RegistrationRemoteHandlerServerTCP<C extends Connection> extends Registrat
public
RegistrationRemoteHandlerServerTCP(final String name,
final RegistrationWrapper<C> registrationWrapper,
final RegistrationWrapper registrationWrapper,
final CryptoSerializationManager serializationManager) {
super(name, registrationWrapper, serializationManager);
}
@ -264,8 +264,10 @@ class RegistrationRemoteHandlerServerTCP<C extends Connection> extends Registrat
* Diffie-Hellman-Merkle key exchange for the AES key
* see http://en.wikipedia.org/wiki/Diffie%E2%80%93Hellman_key_exchange
*/
ECPublicKeyParameters ecdhPubKey = EccPublicKeySerializer.read(new Input(payload));
if (ecdhPubKey == null) {
ECPublicKeyParameters ecdhPubKey;
try {
ecdhPubKey = EccPublicKeySerializer.read(new Input(payload));
} catch (KryoException e) {
logger2.error("Invalid decode of ecdh public key. Aborting.");
shutdown(registrationWrapper2, channel);

View File

@ -23,7 +23,6 @@ import java.util.List;
import org.slf4j.Logger;
import dorkbox.network.Broadcast;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.ConnectionImpl;
import dorkbox.network.connection.EndPointBase;
import dorkbox.network.connection.KryoExtra;
@ -45,19 +44,18 @@ import io.netty.handler.codec.MessageToMessageCodec;
@Sharable
public
class RegistrationRemoteHandlerServerUDP<C extends Connection> extends MessageToMessageCodec<DatagramPacket, UdpWrapper> {
class RegistrationRemoteHandlerServerUDP extends MessageToMessageCodec<DatagramPacket, UdpWrapper> {
// this is for the SERVER only. UDP channel is ALWAYS the SAME channel (it's the server's listening channel).
private final org.slf4j.Logger logger;
private final ByteBuf discoverResponseBuffer;
private final RegistrationWrapper<C> registrationWrapper;
private final RegistrationWrapper registrationWrapper;
private final CryptoSerializationManager serializationManager;
public
RegistrationRemoteHandlerServerUDP(final String name,
final RegistrationWrapper<C> registrationWrapper,
final RegistrationWrapper registrationWrapper,
final CryptoSerializationManager serializationManager) {
final String name1 = name + " Registration-UDP-Server";
this.logger = org.slf4j.LoggerFactory.getLogger(name1);
@ -176,7 +174,7 @@ class RegistrationRemoteHandlerServerUDP<C extends Connection> extends MessageTo
// registration is the ONLY thing NOT encrypted
Logger logger2 = this.logger;
RegistrationWrapper<C> registrationWrapper2 = this.registrationWrapper;
RegistrationWrapper registrationWrapper2 = this.registrationWrapper;
CryptoSerializationManager serializationManager2 = this.serializationManager;
if (KryoExtra.isEncrypted(message)) {
@ -262,7 +260,7 @@ class RegistrationRemoteHandlerServerUDP<C extends Connection> extends MessageTo
* Copied from RegistrationHandler. There were issues accessing it as static with generics.
*/
public
MetaChannel shutdown(final RegistrationWrapper<C> registrationWrapper, final Channel channel) {
MetaChannel shutdown(final RegistrationWrapper registrationWrapper, final Channel channel) {
this.logger.error("SHUTDOWN HANDLER REACHED! SOMETHING MESSED UP! TRYING TO ABORT");
// shutdown. Something messed up. Only reach this is something messed up.

View File

@ -19,28 +19,31 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.bouncycastle.crypto.params.ParametersWithIV;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.ConnectionImpl;
import dorkbox.network.connection.ConnectionPointWriter;
import dorkbox.network.connection.EndPointBase;
import dorkbox.network.connection.ISessionManager;
import dorkbox.network.connection.registration.MetaChannel;
import dorkbox.network.rmi.RmiObjectHandler;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.channel.local.LocalAddress;
public
class ChannelLocalWrapper<C extends Connection> implements ChannelWrapper<C>, ConnectionPointWriter {
class ChannelLocalWrapper implements ChannelWrapper, ConnectionPointWriter {
private final Channel channel;
private final RmiObjectHandler rmiObjectHandler;
private final AtomicBoolean shouldFlush = new AtomicBoolean(false);
private String remoteAddress;
public
ChannelLocalWrapper(MetaChannel metaChannel) {
ChannelLocalWrapper(MetaChannel metaChannel, final RmiObjectHandler rmiObjectHandler) {
this.channel = metaChannel.localChannel;
this.rmiObjectHandler = rmiObjectHandler;
}
/**
* Write an object to the underlying channel
*/
@ -111,6 +114,12 @@ class ChannelLocalWrapper<C extends Connection> implements ChannelWrapper<C>, Co
return true;
}
@Override
public
RmiObjectHandler manageRmi() {
return rmiObjectHandler;
}
@Override
public final
String getRemoteHost() {
@ -119,7 +128,7 @@ class ChannelLocalWrapper<C extends Connection> implements ChannelWrapper<C>, Co
@Override
public
void close(Connection connection, ISessionManager<C> sessionManager) {
void close(ConnectionImpl connection, ISessionManager sessionManager) {
long maxShutdownWaitTimeInMilliSeconds = EndPointBase.maxShutdownWaitTimeInMilliSeconds;
this.shouldFlush.set(false);
@ -153,7 +162,7 @@ class ChannelLocalWrapper<C extends Connection> implements ChannelWrapper<C>, Co
if (getClass() != obj.getClass()) {
return false;
}
ChannelLocalWrapper<C> other = (ChannelLocalWrapper<C>) obj;
ChannelLocalWrapper other = (ChannelLocalWrapper) obj;
if (this.remoteAddress == null) {
if (other.remoteAddress != null) {
return false;

View File

@ -20,19 +20,20 @@ import java.net.InetSocketAddress;
import org.bouncycastle.crypto.params.KeyParameter;
import org.bouncycastle.crypto.params.ParametersWithIV;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.ConnectionImpl;
import dorkbox.network.connection.ConnectionPointWriter;
import dorkbox.network.connection.EndPointBase;
import dorkbox.network.connection.ISessionManager;
import dorkbox.network.connection.UdpServer;
import dorkbox.network.connection.registration.MetaChannel;
import dorkbox.network.rmi.RmiObjectHandler;
import dorkbox.util.FastThreadLocal;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.util.NetUtil;
public
class ChannelNetworkWrapper<C extends Connection> implements ChannelWrapper<C> {
class ChannelNetworkWrapper implements ChannelWrapper {
private final ChannelNetwork tcp;
private final ChannelNetwork udp;
@ -49,12 +50,16 @@ class ChannelNetworkWrapper<C extends Connection> implements ChannelWrapper<C> {
private final byte[] aesIV; // AES-GCM requires 12 bytes
private final FastThreadLocal<ParametersWithIV> cryptoParameters;
private final RmiObjectHandler rmiObjectHandler;
/**
* @param udpServer is null when created by the client, non-null when created by the server
* @param rmiObjectHandler is a no-op handler if RMI is disabled, otherwise handles RMI object registration
*/
public
ChannelNetworkWrapper(MetaChannel metaChannel, UdpServer udpServer) {
ChannelNetworkWrapper(MetaChannel metaChannel, UdpServer udpServer, final RmiObjectHandler rmiObjectHandler) {
this.rmiObjectHandler = rmiObjectHandler;
Channel tcpChannel = metaChannel.tcpChannel;
this.eventLoop = tcpChannel.eventLoop();
@ -138,7 +143,6 @@ class ChannelNetworkWrapper<C extends Connection> implements ChannelWrapper<C> {
return this.eventLoop;
}
/**
* @return a threadlocal AES key + IV. key=32 byte, iv=12 bytes (AES-GCM implementation). This is a threadlocal
* because multiple protocols can be performing crypto AT THE SAME TIME, and so we have to make sure that operations don't
@ -156,16 +160,21 @@ class ChannelNetworkWrapper<C extends Connection> implements ChannelWrapper<C> {
return isLoopback;
}
@Override
public
RmiObjectHandler manageRmi() {
return rmiObjectHandler;
}
@Override
public
String getRemoteHost() {
return this.remoteAddress;
}
@Override
public
void close(final Connection connection, final ISessionManager<C> sessionManager) {
void close(final ConnectionImpl connection, final ISessionManager sessionManager) {
long maxShutdownWaitTimeInMilliSeconds = EndPointBase.maxShutdownWaitTimeInMilliSeconds;
this.tcp.close(maxShutdownWaitTimeInMilliSeconds);
@ -203,7 +212,6 @@ class ChannelNetworkWrapper<C extends Connection> implements ChannelWrapper<C> {
return false;
}
@SuppressWarnings("rawtypes")
ChannelNetworkWrapper other = (ChannelNetworkWrapper) obj;
if (this.remoteAddress == null) {
if (other.remoteAddress != null) {

View File

@ -17,16 +17,16 @@ package dorkbox.network.connection.wrapper;
import org.bouncycastle.crypto.params.ParametersWithIV;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.ConnectionImpl;
import dorkbox.network.connection.ConnectionPointWriter;
import dorkbox.network.connection.ISessionManager;
import dorkbox.network.rmi.RmiObjectHandler;
import io.netty.channel.EventLoop;
public
interface ChannelWrapper<C extends Connection> {
interface ChannelWrapper {
ConnectionPointWriter tcp();
ConnectionPointWriter udp();
/**
@ -54,12 +54,14 @@ interface ChannelWrapper<C extends Connection> {
*/
boolean isLoopback();
RmiObjectHandler manageRmi();
/**
* @return the remote host (can be local, tcp, udp)
*/
String getRemoteHost();
void close(final Connection connection, final ISessionManager<C> sessionManager);
void close(ConnectionImpl connection, ISessionManager sessionManager);
int id();

View File

@ -39,8 +39,9 @@ package dorkbox.network.rmi;
* Internal message to invoke methods remotely.
*/
public
class InvokeMethod implements RmiMessages {
class InvokeMethod implements RmiMessage {
public int objectID; // the registered kryo ID for the object
public CachedMethod cachedMethod;
public Object[] args;
@ -49,7 +50,6 @@ class InvokeMethod implements RmiMessages {
// possible duplicate IDs. A response data of 0 means to not respond.
public byte responseData;
public
InvokeMethod() {
}
}

View File

@ -38,8 +38,9 @@ package dorkbox.network.rmi;
* Internal message to return the result of a remotely invoked method.
*/
public
class InvokeMethodResult implements RmiMessages {
class InvokeMethodResult implements RmiMessage {
public int objectID;
public byte responseID;
public Object result;
}

View File

@ -1,24 +0,0 @@
/*
* Copyright 2010 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.rmi;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.Listener;
abstract
class RemoteInvocationResponse<C extends Connection> implements Listener.OnDisconnected<C>,
Listener.OnMessageReceived<C, InvokeMethodResult> {
}

View File

@ -35,7 +35,6 @@
package dorkbox.network.rmi;
import java.io.IOException;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
@ -188,7 +187,7 @@ class RmiBridge {
*/
@SuppressWarnings("rawtypes")
public
Listener getListener() {
Listener.OnMessageReceived<ConnectionImpl, InvokeMethod> getListener() {
return this.invokeListener;
}
@ -412,49 +411,4 @@ class RmiBridge {
return id;
}
/**
* Warning. This is an advanced method. You should probably be using {@link Connection#createRemoteObject(Class, RemoteObjectCallback)}
* <p>
* <p>
* Returns a proxy object that implements the specified interfaces. Methods invoked on the proxy object will be invoked remotely on the
* object with the specified ID in the ObjectSpace for the specified connection. If the remote end of the connection has not {@link
* RmiBridge#register(int, Object)} added the connection to the ObjectSpace, the remote method invocations will be ignored.
* <p/>
* Methods that return a value will throw {@link TimeoutException} if the response is not received with the {@link
* RemoteObject#setResponseTimeout(int) response timeout}.
* <p/>
* If {@link RemoteObject#setAsync(boolean) non-blocking} is false (the default), then methods that return a value must not be
* called from the update thread for the connection. An exception will be thrown if this occurs. Methods with a void return value can be
* called on the update thread.
* <p/>
* If a proxy returned from this method is part of an object graph sent over the network, the object graph on the receiving side will
* have the proxy object replaced with the registered object.
*
* @see RemoteObject
*
* @param connection this is really the network client -- there is ONLY ever 1 connection
* @param objectID this is the remote object ID (assigned by RMI). This is NOT the kryo registration ID
* @param iFace this is the RMI interface
*/
public
RemoteObject createProxyObject(Connection connection, int objectID, Class<?> iFace) {
if (connection == null) {
throw new IllegalArgumentException("connection cannot be null.");
}
if (iFace == null) {
throw new IllegalArgumentException("iface cannot be null.");
}
if (!iFace.isInterface()) {
throw new IllegalArgumentException("iface must be an interface.");
}
Class<?>[] temp = new Class<?>[2];
temp[0] = RemoteObject.class;
temp[1] = iFace;
return (RemoteObject) Proxy.newProxyInstance(RmiBridge.class.getClassLoader(),
temp,
new RmiProxyHandler(connection, objectID, iFace));
}
}

View File

@ -0,0 +1,22 @@
/*
* Copyright 2018 dorkbox, llc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.rmi;
/**
*
*/
public
interface RmiMessage {}

View File

@ -1,20 +0,0 @@
/*
* Copyright 2010 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.rmi;
// used by RMI. This is to keep track and not throw errors when connections are notified of it's arrival
public
interface RmiMessages {}

View File

@ -0,0 +1,40 @@
/*
* Copyright 2018 dorkbox, llc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.rmi;
import dorkbox.network.connection.ConnectionImpl;
import dorkbox.network.connection.Listener;
public
class RmiObjectHandler {
public
RmiObjectHandler() {
}
public
void invoke(final ConnectionImpl connection, final InvokeMethod message, final Listener.OnMessageReceived<ConnectionImpl, InvokeMethod> rmiInvokeListener) {
}
public
void registration(final ConnectionImpl connection, final RmiRegistration message) {
}
public
Object normalMessages(final ConnectionImpl connection, final Object message) {
return message;
}
}

View File

@ -13,28 +13,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.connection;
package dorkbox.network.rmi;
import java.lang.reflect.Field;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.util.IdentityMap;
import dorkbox.network.rmi.CachedMethod;
import dorkbox.network.rmi.InvokeMethod;
import dorkbox.network.rmi.RemoteObject;
import dorkbox.network.rmi.RmiBridge;
import dorkbox.network.rmi.RmiMessages;
import dorkbox.network.rmi.RmiProxyHandler;
import dorkbox.network.rmi.RmiRegistration;
import dorkbox.network.connection.ConnectionImpl;
import dorkbox.network.connection.EndPointBase;
import dorkbox.network.connection.KryoExtra;
import dorkbox.network.connection.Listener;
import dorkbox.network.serialization.CryptoSerializationManager;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
/**
* This is for a local-connection (same-JVM) RMI method invocation
@ -44,22 +38,22 @@ import io.netty.handler.codec.MessageToMessageDecoder;
* This is for a LOCAL connection (same-JVM)
*/
public
class RegisterRmiLocalHandler extends MessageToMessageDecoder<Object> {
class RmiObjectLocalHandler extends RmiObjectHandler {
private static final boolean ENABLE_PROXY_OBJECTS = ConnectionImpl.ENABLE_PROXY_OBJECTS;
private static final Field[] NO_REMOTE_FIELDS = new Field[0];
// private static final AtomicReferenceFieldUpdater<RegisterRmiLocalHandler, IdentityMap> rmiFieldsREF = AtomicReferenceFieldUpdater.newUpdater(
// RegisterRmiLocalHandler.class,
// private static final AtomicReferenceFieldUpdater<RmiObjectLocalHandler, IdentityMap> rmiFieldsREF = AtomicReferenceFieldUpdater.newUpdater(
// RmiObjectLocalHandler.class,
// IdentityMap.class,
// "fieldCache");
private static final AtomicReferenceFieldUpdater<RegisterRmiLocalHandler, IdentityMap> implToProxyREF = AtomicReferenceFieldUpdater.newUpdater(
RegisterRmiLocalHandler.class,
private static final AtomicReferenceFieldUpdater<RmiObjectLocalHandler, IdentityMap> implToProxyREF = AtomicReferenceFieldUpdater.newUpdater(
RmiObjectLocalHandler.class,
IdentityMap.class,
"implToProxy");
private static final AtomicReferenceFieldUpdater<RegisterRmiLocalHandler, IdentityMap> remoteObjectREF = AtomicReferenceFieldUpdater.newUpdater(
RegisterRmiLocalHandler.class,
private static final AtomicReferenceFieldUpdater<RmiObjectLocalHandler, IdentityMap> remoteObjectREF = AtomicReferenceFieldUpdater.newUpdater(
RmiObjectLocalHandler.class,
IdentityMap.class,
"objectHasRemoteObjects");
@ -69,215 +63,74 @@ class RegisterRmiLocalHandler extends MessageToMessageDecoder<Object> {
public
RegisterRmiLocalHandler() {
RmiObjectLocalHandler() {
}
@Override
protected
void decode(final ChannelHandlerContext context, final Object msg, final List<Object> out) throws Exception {
ConnectionImpl connection = (ConnectionImpl) context.pipeline()
.last();
public
void invoke(final ConnectionImpl connection, final InvokeMethod invokeMethod, final Listener.OnMessageReceived<ConnectionImpl, InvokeMethod> rmiInvokeListener) {
int methodClassID = invokeMethod.cachedMethod.methodClassID;
int methodIndex = invokeMethod.cachedMethod.methodIndex;
// have to replace the cached methods with the correct (remote) version, otherwise the wrong methods CAN BE invoked.
if (msg instanceof RmiRegistration) {
receivedRegistration(connection, (RmiRegistration) msg);
CryptoSerializationManager serialization = connection.getEndPoint()
.getSerialization();
CachedMethod cachedMethod;
try {
cachedMethod = serialization.getMethods(methodClassID)[methodIndex];
} catch (Exception ex) {
String errorMessage;
KryoExtra kryo = null;
try {
kryo = serialization.takeKryo();
Class<?> methodClass = kryo.getRegistration(methodClassID)
.getType();
errorMessage = "Invalid method index " + methodIndex + " for class: " + methodClass.getName();
} finally {
serialization.returnKryo(kryo);
}
throw new KryoException(errorMessage);
}
Object[] args;
Serializer<?>[] serializers = cachedMethod.serializers;
int argStartIndex;
if (cachedMethod.overriddenMethod) {
// did we override our cached method? This is not common.
// this is specifically when we override an interface method, with an implementation method + Connection parameter (@ index 0)
argStartIndex = 1;
args = new Object[serializers.length + 1];
args[0] = connection;
}
else {
if (msg instanceof InvokeMethod) {
InvokeMethod invokeMethod = (InvokeMethod) msg;
int methodClassID = invokeMethod.cachedMethod.methodClassID;
int methodIndex = invokeMethod.cachedMethod.methodIndex;
// have to replace the cached methods with the correct (remote) version, otherwise the wrong methods CAN BE invoked.
CryptoSerializationManager serialization = connection.getEndPoint()
.getSerialization();
CachedMethod cachedMethod;
try {
cachedMethod = serialization.getMethods(methodClassID)[methodIndex];
} catch (Exception ex) {
String errorMessage;
KryoExtra kryo = null;
try {
kryo = serialization.takeKryo();
Class<?> methodClass = kryo.getRegistration(methodClassID)
.getType();
errorMessage = "Invalid method index " + methodIndex + " for class: " + methodClass.getName();
} finally {
serialization.returnKryo(kryo);
}
throw new KryoException(errorMessage);
}
Object[] args;
Serializer<?>[] serializers = cachedMethod.serializers;
int argStartIndex;
if (cachedMethod.overriddenMethod) {
// did we override our cached method? This is not common.
// this is specifically when we override an interface method, with an implementation method + Connection parameter (@ index 0)
argStartIndex = 1;
args = new Object[serializers.length + 1];
args[0] = connection;
}
else {
argStartIndex = 0;
args = new Object[serializers.length];
}
for (int i = 0, n = serializers.length, j = argStartIndex; i < n; i++, j++) {
args[j] = invokeMethod.args[i];
}
// overwrite the invoke method fields with UPDATED versions that have the correct (remote side) implementation/args
invokeMethod.cachedMethod = cachedMethod;
invokeMethod.args = args;
}
receivedNormal(connection, msg, out);
argStartIndex = 0;
args = new Object[serializers.length];
}
for (int i = 0, n = serializers.length, j = argStartIndex; i < n; i++, j++) {
args[j] = invokeMethod.args[i];
}
// overwrite the invoke method fields with UPDATED versions that have the correct (remote side) implementation/args
invokeMethod.cachedMethod = cachedMethod;
invokeMethod.args = args;
// default action, now that we have swapped out things
rmiInvokeListener.received(connection, invokeMethod);
}
private
void receivedNormal(final ConnectionImpl connection, final Object msg, final List<Object> out) {
// else, this was "just a local message"
if (msg instanceof RmiMessages) {
// don't even process these message types
out.add(msg);
return;
}
// because we NORMALLY pass around just the object (there is no serialization going on...) we have to explicitly check to see
// if this object, or any of it's fields MIGHT HAVE BEEN an RMI Proxy (or should be on), and switcheroo it here.
// NORMALLY this is automatic since the kryo IDs on each side point to the "correct object" for serialization, but here we don't do that.
// maybe this object is supposed to switch to a proxy object?? (note: we cannot send proxy objects over local/network connections)
@SuppressWarnings("unchecked")
IdentityMap<Object, Object> implToProxy = implToProxyREF.get(this);
IdentityMap<Object, Field[]> objectHasRemoteObjects = remoteObjectREF.get(this);
Object proxy = implToProxy.get(msg);
if (proxy != null) {
// we have a proxy object. nothing left to do.
out.add(proxy);
return;
}
Class<?> messageClass = msg.getClass();
// are there any fields of this message class that COULD contain remote object fields? (NOTE: not RMI fields yet...)
final Field[] remoteObjectFields = objectHasRemoteObjects.get(messageClass);
if (remoteObjectFields == null) {
// maybe one of it's fields is a proxy object?
// we cache the fields that have to be replaced, so subsequent invocations are significantly more preformat
final ArrayList<Field> fields = new ArrayList<Field>();
// we have to walk the hierarchy of this object to check ALL fields, public and private, using getDeclaredFields()
while (messageClass != Object.class) {
// this will get ALL fields that are
for (Field field : messageClass.getDeclaredFields()) {
final Class<?> type = field.getType();
if (type.isInterface()) {
boolean prev = field.isAccessible();
final Object o;
try {
field.setAccessible(true);
o = field.get(msg);
if (o instanceof RemoteObject) {
RmiProxyHandler handler = (RmiProxyHandler) Proxy.getInvocationHandler(o);
int id = handler.objectID;
field.set(msg, connection.getImplementationObject(id));
fields.add(field);
}
else {
// is a field supposed to be a proxy?
proxy = implToProxy.get(o);
if (proxy != null) {
field.set(msg, proxy);
fields.add(field);
}
}
} catch (IllegalAccessException e) {
e.printStackTrace();
// logger.error("Error checking RMI fields for: {}.{}", remoteClassObject.getKey(), field.getName(), e);
} finally {
field.setAccessible(prev);
}
}
}
messageClass = messageClass.getSuperclass();
}
Field[] array;
if (fields.isEmpty()) {
// no need to ever process this class again.
array = NO_REMOTE_FIELDS;
}
else {
array = fields.toArray(new Field[fields.size()]);
}
//noinspection SynchronizeOnNonFinalField
synchronized (objectHasRemoteObjects) {
// i know what I'm doing. This must be synchronized.
objectHasRemoteObjects.put(messageClass, array);
}
}
else if (remoteObjectFields != NO_REMOTE_FIELDS) {
// quickly replace objects as necessary
for (Field field : remoteObjectFields) {
boolean prev = field.isAccessible();
final Object o;
try {
field.setAccessible(true);
o = field.get(msg);
if (o instanceof RemoteObject) {
RmiProxyHandler handler = (RmiProxyHandler) Proxy.getInvocationHandler(o);
int id = handler.objectID;
field.set(msg, connection.getImplementationObject(id));
}
else {
// is a field supposed to be a proxy?
proxy = implToProxy.get(o);
if (proxy != null) {
field.set(msg, proxy);
}
}
} catch (IllegalAccessException e) {
e.printStackTrace();
// logger.error("Error checking RMI fields for: {}.{}", remoteClassObject.getKey(), field.getName(), e);
} finally {
field.setAccessible(prev);
}
}
}
out.add(msg);
}
private
void receivedRegistration(final ConnectionImpl connection, final RmiRegistration registration) {
@Override
public
void registration(final ConnectionImpl connection, final RmiRegistration registration) {
// manage creating/getting/notifying this RMI object
// these fields are ALWAYS present!
@ -292,7 +145,7 @@ class RegisterRmiLocalHandler extends MessageToMessageDecoder<Object> {
// have to convert the iFace -> Impl
EndPointBase<Connection> endPoint = connection.getEndPoint();
EndPointBase endPoint = connection.getEndPoint();
CryptoSerializationManager serialization = endPoint.getSerialization();
Class<?> rmiImpl = serialization.getRmiImpl(registration.interfaceClass);
@ -344,6 +197,135 @@ class RegisterRmiLocalHandler extends MessageToMessageDecoder<Object> {
}
}
@Override
public
Object normalMessages(final ConnectionImpl connection, final Object message) {
// else, this was "just a local message"
// because we NORMALLY pass around just the object (there is no serialization going on...) we have to explicitly check to see
// if this object, or any of it's fields MIGHT HAVE BEEN an RMI Proxy (or should be on), and switcheroo it here.
// NORMALLY this is automatic since the kryo IDs on each side point to the "correct object" for serialization, but here we don't do that.
// maybe this object is supposed to switch to a proxy object?? (note: we cannot send proxy objects over local/network connections)
@SuppressWarnings("unchecked")
IdentityMap<Object, Object> implToProxy = implToProxyREF.get(this);
IdentityMap<Object, Field[]> objectHasRemoteObjects = remoteObjectREF.get(this);
Object proxy = implToProxy.get(message);
if (proxy != null) {
// we have a proxy object. nothing left to do.
return proxy;
}
// otherwise we MIGHT have to modify the fields in the object...
Class<?> messageClass = message.getClass();
// are there any fields of this message class that COULD contain remote object fields? (NOTE: not RMI fields yet...)
final Field[] remoteObjectFields = objectHasRemoteObjects.get(messageClass);
if (remoteObjectFields == null) {
// maybe one of it's fields is a proxy object?
// we cache the fields that have to be replaced, so subsequent invocations are significantly more preformat
final ArrayList<Field> fields = new ArrayList<Field>();
// we have to walk the hierarchy of this object to check ALL fields, public and private, using getDeclaredFields()
while (messageClass != Object.class) {
// this will get ALL fields that are
for (Field field : messageClass.getDeclaredFields()) {
final Class<?> type = field.getType();
if (type.isInterface()) {
boolean prev = field.isAccessible();
final Object o;
try {
field.setAccessible(true);
o = field.get(message);
if (o instanceof RemoteObject) {
RmiProxyHandler handler = (RmiProxyHandler) Proxy.getInvocationHandler(o);
int id = handler.objectID;
field.set(message, connection.getImplementationObject(id));
fields.add(field);
}
else {
// is a field supposed to be a proxy?
proxy = implToProxy.get(o);
if (proxy != null) {
field.set(message, proxy);
fields.add(field);
}
}
} catch (IllegalAccessException e) {
e.printStackTrace();
// logger.error("Error checking RMI fields for: {}.{}", remoteClassObject.getKey(), field.getName(), e);
} finally {
field.setAccessible(prev);
}
}
}
messageClass = messageClass.getSuperclass();
}
Field[] array;
if (fields.isEmpty()) {
// no need to ever process this class again.
array = NO_REMOTE_FIELDS;
}
else {
array = fields.toArray(new Field[fields.size()]);
}
//noinspection SynchronizeOnNonFinalField
synchronized (this.objectHasRemoteObjects) {
// i know what I'm doing. This must be synchronized.
this.objectHasRemoteObjects.put(messageClass, array);
}
}
else if (remoteObjectFields != NO_REMOTE_FIELDS) {
// quickly replace objects as necessary
for (Field field : remoteObjectFields) {
boolean prev = field.isAccessible();
final Object o;
try {
field.setAccessible(true);
o = field.get(message);
if (o instanceof RemoteObject) {
RmiProxyHandler handler = (RmiProxyHandler) Proxy.getInvocationHandler(o);
int id = handler.objectID;
field.set(message, connection.getImplementationObject(id));
}
else {
// is a field supposed to be a proxy?
proxy = implToProxy.get(o);
if (proxy != null) {
field.set(message, proxy);
}
}
} catch (IllegalAccessException e) {
e.printStackTrace();
// logger.error("Error checking RMI fields for: {}.{}", remoteClassObject.getKey(), field.getName(), e);
} finally {
field.setAccessible(prev);
}
}
}
return message;
}
// private
// LocalRmiClassEncoder replaceFieldObjects(final ConnectionImpl connection, final Object object, final Class<?> implClass) {
// Field[] rmiFields = fieldCache.get(implClass);

View File

@ -1,31 +1,40 @@
/*
* Copyright 2010 dorkbox, llc
* Copyright 2018 dorkbox, llc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.connection;
package dorkbox.network.rmi;
import dorkbox.network.rmi.RmiBridge;
import dorkbox.network.rmi.RmiRegistration;
import dorkbox.network.connection.ConnectionImpl;
import dorkbox.network.connection.Listener;
class RegisterRmiNetworkHandler implements Listener.OnMessageReceived<ConnectionImpl, RmiRegistration> {
public
class RmiObjectNetworkHandler extends RmiObjectHandler {
RegisterRmiNetworkHandler() {
public
RmiObjectNetworkHandler() {
}
@Override
public
void received(final ConnectionImpl connection, final RmiRegistration registration) {
void invoke(final ConnectionImpl connection, final InvokeMethod message, final Listener.OnMessageReceived<ConnectionImpl, InvokeMethod> rmiInvokeListener) {
// default, nothing fancy
rmiInvokeListener.received(connection, message);
}
@Override
public
void registration(final ConnectionImpl connection, final RmiRegistration registration) {
// manage creating/getting/notifying this RMI object
// these fields are ALWAYS present!
@ -42,7 +51,8 @@ class RegisterRmiNetworkHandler implements Listener.OnMessageReceived<Connection
// For network connections, the interface class kryo ID == implementation class kryo ID, so they switch automatically.
RmiRegistration registrationResult = connection.createNewRmiObject(interfaceClass, interfaceClass, callbackId);
connection.TCP(registrationResult).flush();
connection.TCP(registrationResult)
.flush();
}
// Check if we are getting an already existing REMOTE object. This check is always AFTER the check to create a new object
@ -51,7 +61,8 @@ class RegisterRmiNetworkHandler implements Listener.OnMessageReceived<Connection
//
// GET a LOCAL rmi object, if none get a specific, GLOBAL rmi object (objects that are not bound to a single connection).
RmiRegistration registrationResult = connection.getExistingRmiObject(interfaceClass, registration.rmiId, callbackId);
connection.TCP(registrationResult).flush();
connection.TCP(registrationResult)
.flush();
}
}
else {

View File

@ -47,8 +47,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.ConnectionImpl;
import dorkbox.network.connection.EndPointBase;
import dorkbox.network.connection.KryoExtra;
import dorkbox.network.connection.Listener;
import dorkbox.network.serialization.RmiSerializationManager;
/**
@ -66,13 +68,14 @@ class RmiProxyHandler implements InvocationHandler {
private final InvokeMethodResult[] responseTable = new InvokeMethodResult[64];
private final boolean[] pendingResponses = new boolean[64];
private final Connection connection;
private final ConnectionImpl connection;
public final int objectID; // this is the RMI id
public final int ID; // this is the KRYO id
private final String proxyString;
private final RemoteInvocationResponse<Connection> responseListener;
private final
Listener.OnMessageReceived<Connection, InvokeMethodResult> responseListener;
private int timeoutMillis = 3000;
private boolean isAsync = false;
@ -92,7 +95,8 @@ class RmiProxyHandler implements InvocationHandler {
* @param objectID this is the remote object ID (assigned by RMI). This is NOT the kryo registration ID
* @param iFace this is the RMI interface
*/
RmiProxyHandler(final Connection connection, final int objectID, final Class<?> iFace) {
public
RmiProxyHandler(final ConnectionImpl connection, final int objectID, final Class<?> iFace) {
super();
this.connection = connection;
@ -115,14 +119,7 @@ class RmiProxyHandler implements InvocationHandler {
this.logger = LoggerFactory.getLogger(connection.getEndPoint().getName() + ":" + this.getClass().getSimpleName());
this.responseListener = new RemoteInvocationResponse<Connection>() {
@Override
public
void disconnected(Connection connection) {
close();
}
this.responseListener = new Listener.OnMessageReceived<Connection, InvokeMethodResult>() {
@Override
public
void received(Connection connection, InvokeMethodResult invokeMethodResult) {
@ -146,12 +143,12 @@ class RmiProxyHandler implements InvocationHandler {
}
}
};
connection.listeners()
.add(this.responseListener);
}
public
Listener.OnMessageReceived<Connection, InvokeMethodResult> getListener() {
return responseListener;
}
@SuppressWarnings({"AutoUnboxing", "AutoBoxing", "NumericCastThatLosesPrecision", "IfCanBeSwitch"})
@Override
@ -163,7 +160,7 @@ class RmiProxyHandler implements InvocationHandler {
String name = method.getName();
if (name.equals("close")) {
close();
connection.removeRmiListeners(objectID, getListener());
return null;
}
else if (name.equals("setResponseTimeout")) {
@ -403,12 +400,6 @@ class RmiProxyHandler implements InvocationHandler {
throw new TimeoutException("Response timed out.");
}
private
void close() {
this.connection.listeners()
.remove(this.responseListener);
}
@Override
public
int hashCode() {

View File

@ -19,7 +19,7 @@ package dorkbox.network.rmi;
* Message specifically to register a class implementation for RMI
*/
public
class RmiRegistration {
class RmiRegistration implements RmiMessage {
public boolean isRequest;
/**

View File

@ -308,7 +308,9 @@ class Serialization implements CryptoSerializationManager, RmiSerializationManag
final boolean registrationRequired,
final boolean implementationRequired,
final SerializerFactory factory) {
this.forbidInterfaceRegistration = implementationRequired;
this.kryoPool = ObjectPool.NonBlockingSoftReference(new PoolableObject<KryoExtra>() {
@Override
public

View File

@ -29,7 +29,6 @@ import org.junit.Test;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.Listener;
import dorkbox.network.connection.Listeners;
import dorkbox.util.exceptions.InitializationException;
import dorkbox.util.exceptions.SecurityException;
public
@ -39,7 +38,7 @@ class ReuseTest extends BaseTest {
@Test
public
void socketReuse() throws InitializationException, SecurityException, IOException, InterruptedException {
void socketReuse() throws SecurityException, IOException {
this.serverCount = new AtomicInteger(0);
this.clientCount = new AtomicInteger(0);
@ -104,7 +103,7 @@ class ReuseTest extends BaseTest {
System.err.println("Waiting...");
try {
Thread.sleep(100);
} catch (InterruptedException ex) {
} catch (InterruptedException ignored) {
}
}
@ -119,7 +118,7 @@ class ReuseTest extends BaseTest {
@Test
public
void localReuse() throws InitializationException, SecurityException, IOException, InterruptedException {
void localReuse() throws SecurityException, IOException {
this.serverCount = new AtomicInteger(0);
this.clientCount = new AtomicInteger(0);

View File

@ -1,13 +1,10 @@
package dorkbox.network.rmi.multiJVM;
import java.io.IOException;
import dorkbox.network.Server;
import dorkbox.network.rmi.RmiTest;
import dorkbox.network.rmi.TestCow;
import dorkbox.network.rmi.TestCowImpl;
import dorkbox.network.serialization.Serialization;
import dorkbox.util.exceptions.InitializationException;
import dorkbox.util.exceptions.SecurityException;
/**
@ -31,12 +28,8 @@ class TestServer
Server server = null;
try {
server = new Server(configuration);
} catch (InitializationException e) {
e.printStackTrace();
} catch (SecurityException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
// server.setIdleTimeout(0);