RMI method override implemented. lots of misc fixes

This commit is contained in:
nathan 2015-07-28 02:23:01 +02:00
parent c92974af61
commit 3fd056f87d
60 changed files with 1214 additions and 868 deletions

View File

@ -31,7 +31,6 @@ import dorkbox.network.util.udt.UdtEndpointProxy;
import dorkbox.util.NamedThreadFactory;
import dorkbox.util.OS;
import dorkbox.util.exceptions.InitializationException;
import dorkbox.util.exceptions.NetException;
import dorkbox.util.exceptions.SecurityException;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
@ -60,7 +59,7 @@ import java.net.InetSocketAddress;
*/
@SuppressWarnings("unused")
public
class Client extends EndPointClient implements Connection {
class Client<C extends Connection> extends EndPointClient<C> implements Connection {
/**
* Starts a LOCAL <b>only</b> client, with the default local channel name and serialization scheme
@ -134,6 +133,10 @@ class Client extends EndPointClient implements Connection {
throw new IllegalArgumentException("You must define what host you want to connect to.");
}
if (options.tcpPort < 0 && options.udpPort < 0 && options.udtPort < 0) {
throw new IllegalArgumentException("You must define what port you want to connect to.");
}
if (options.tcpPort > 0) {
Bootstrap tcpBootstrap = new Bootstrap();
this.bootstraps.add(new BootstrapWrapper("TCP", options.tcpPort, tcpBootstrap));
@ -251,17 +254,21 @@ class Client extends EndPointClient implements Connection {
/**
* Allows the client to reconnect to the last connected server
*
* @throws InterruptedException if the client is unable to reconnect in the previously requested connection-timeout
*/
public
void reconnect() {
void reconnect() throws IOException {
reconnect(this.connectionTimeout);
}
/**
* Allows the client to reconnect to the last connected server
*
* @throws InterruptedException if the client is unable to reconnect in the requested time
*/
public
void reconnect(int connectionTimeout) {
void reconnect(int connectionTimeout) throws IOException {
// close out all old connections
close();
@ -271,9 +278,11 @@ class Client extends EndPointClient implements Connection {
/**
* will attempt to connect to the server, with a 30 second timeout.
*
* @throws IOException if the client is unable to connect in 30 seconds
*/
public
void connect() {
void connect() throws IOException {
connect(30000);
}
@ -283,9 +292,10 @@ class Client extends EndPointClient implements Connection {
* will BLOCK until completed
*
* @param connectionTimeout wait for x milliseconds. 0 will wait indefinitely
* @throws IOException if the client is unable to connect in the requested time
*/
public
void connect(int connectionTimeout) {
void connect(int connectionTimeout) throws IOException {
this.connectionTimeout = connectionTimeout;
// make sure we are not trying to connect during a close or stop event.
@ -303,22 +313,24 @@ class Client extends EndPointClient implements Connection {
try {
this.registrationLock.wait(connectionTimeout);
} catch (InterruptedException e) {
this.logger.error("Registration thread interrupted!");
throw new IOException("Unable to complete registration within '" + connectionTimeout + "' milliseconds", e);
}
}
connection = this.connectionManager.getConnection0();
}
@Override
public
boolean hasRemoteKeyChanged() {
return this.connectionManager.getConnection0().hasRemoteKeyChanged();
return this.connection.hasRemoteKeyChanged();
}
@Override
public
String getRemoteHost() {
return this.connectionManager.getConnection0().getRemoteHost();
return this.connection.getRemoteHost();
}
@Override
@ -330,25 +342,25 @@ class Client extends EndPointClient implements Connection {
@Override
public
int id() {
return this.connectionManager.getConnection0().id();
return this.connection.id();
}
@Override
public
String idAsHex() {
return this.connectionManager.getConnection0().idAsHex();
return this.connection.idAsHex();
}
@Override
public
boolean hasUDP() {
return this.connectionManager.getConnection0().hasUDP();
return this.connection.hasUDP();
}
@Override
public
boolean hasUDT() {
return this.connectionManager.getConnection0().hasUDT();
return this.connection.hasUDT();
}
/**
@ -357,8 +369,7 @@ class Client extends EndPointClient implements Connection {
@Override
public
IdleBridge sendOnIdle(IdleSender<?, ?> sender) {
return this.connectionManager.getConnection0()
.sendOnIdle(sender);
return this.connection.sendOnIdle(sender);
}
/**
@ -367,33 +378,13 @@ class Client extends EndPointClient implements Connection {
@Override
public
IdleBridge sendOnIdle(Object message) {
return this.connectionManager.getConnection0()
.sendOnIdle(message);
return this.connection.sendOnIdle(message);
}
/**
* Fetches the connection used by the client.
* <p/>
* Make <b>sure</b> that you only call this <b>after</b> the client connects!
* <p/>
* This is preferred to {@link EndPoint#getConnections()} getConnections()}, as it properly does some error checking
*/
public
Connection getConnection() {
return this.connectionManager.getConnection0();
}
/**
* Closes all connections ONLY (keeps the server/client running).
* <p/>
* This is used, for example, when reconnecting to a server.
*/
@Override
public
void close() {
synchronized (this.registrationLock) {
this.registrationLock.notify();
}
void closeAsap() {
//TODO
}
/**
@ -421,11 +412,11 @@ class Client extends EndPointClient implements Connection {
*/
@Override
public
<Iface, Impl extends Iface> Iface createRemoteObject(final Class<Impl> remoteImplementationClass) throws NetException {
return this.connectionManager.getConnection0().createRemoteObject(remoteImplementationClass);
<Iface, Impl extends Iface> Iface createRemoteObject(final Class<Impl> remoteImplementationClass) throws IOException {
return this.connectionManager.getConnection0()
.createRemoteObject(remoteImplementationClass);
}
/**
* Returns a new proxy object implements the specified interface. Methods invoked on the proxy object will be
* invoked remotely on the object with the specified ID in the ObjectSpace for the current connection.
@ -451,7 +442,33 @@ class Client extends EndPointClient implements Connection {
*/
@Override
public
<Iface, Impl extends Iface> Iface getRemoteObject(final int objectId) throws NetException {
return this.connectionManager.getConnection0().getRemoteObject(objectId);
<Iface, Impl extends Iface> Iface getRemoteObject(final int objectId) throws IOException {
return this.connectionManager.getConnection0()
.getRemoteObject(objectId);
}
/**
* Fetches the connection used by the client.
* <p/>
* Make <b>sure</b> that you only call this <b>after</b> the client connects!
* <p/>
* This is preferred to {@link EndPoint#getConnections()} getConnections()}, as it properly does some error checking
*/
public
C getConnection() {
return this.connection;
}
/**
* Closes all connections ONLY (keeps the server/client running).
* <p/>
* This is used, for example, when reconnecting to a server.
*/
@Override
public
void close() {
synchronized (this.registrationLock) {
this.registrationLock.notify();
}
}
}

View File

@ -15,6 +15,7 @@
*/
package dorkbox.network;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.EndPointServer;
import dorkbox.network.connection.registration.local.RegistrationLocalHandlerServer;
import dorkbox.network.connection.registration.remote.RegistrationRemoteHandlerServerTCP;
@ -56,7 +57,7 @@ import java.io.IOException;
* To put it bluntly, ONLY have the server do work inside of a listener!
*/
public
class Server extends EndPointServer {
class Server<C extends Connection> extends EndPointServer<C> {
/**
* The maximum queue length for incoming connection indications (a request to connect). If a connection indication arrives when
@ -323,6 +324,7 @@ class Server extends EndPointServer {
* you want to continue running code after this method invocation, bind should be called in a separate,
* non-daemon thread - or with false as the parameter.
*/
@SuppressWarnings("AutoBoxing")
public
void bind(boolean blockUntilTerminate) {
// make sure we are not trying to connect during a close or stop event.

View File

@ -18,12 +18,12 @@ package dorkbox.network.connection;
import dorkbox.network.connection.wrapper.ChannelWrapper;
public
class Bridge {
class Bridge <C extends Connection> {
final ChannelWrapper channelWrapper;
final ISessionManager sessionManager;
final ISessionManager<C> sessionManager;
Bridge(ChannelWrapper channelWrapper, ISessionManager sessionManager) {
Bridge(ChannelWrapper channelWrapper, ISessionManager<C> sessionManager) {
this.channelWrapper = channelWrapper;
this.sessionManager = sessionManager;
}

View File

@ -20,7 +20,8 @@ import dorkbox.network.connection.idle.IdleBridge;
import dorkbox.network.connection.idle.IdleSender;
import dorkbox.network.rmi.RemoteObject;
import dorkbox.network.rmi.TimeoutException;
import dorkbox.util.exceptions.NetException;
import java.io.IOException;
@SuppressWarnings("unused")
public
@ -40,7 +41,7 @@ interface Connection {
/**
* @return the endpoint associated with this connection
*/
EndPoint getEndPoint();
EndPoint<Connection> getEndPoint();
/**
@ -89,6 +90,12 @@ interface Connection {
*/
void close();
/**
* Marks the connection to be closed as soon as possible. This is evaluated when the current
* thread execution returns to the network stack.
*/
void closeAsap();
/**
* Returns a new proxy object implements the specified interface. Methods invoked on the proxy object will be
* invoked remotely on the object with the specified ID in the ObjectSpace for the current connection.
@ -112,7 +119,7 @@ interface Connection {
*
* @see RemoteObject
*/
<Iface, Impl extends Iface> Iface createRemoteObject(final Class<Impl> remoteImplementationClass) throws NetException;
<Iface, Impl extends Iface> Iface createRemoteObject(final Class<Impl> remoteImplementationClass) throws IOException;
/**
@ -138,5 +145,7 @@ interface Connection {
*
* @see RemoteObject
*/
<Iface, Impl extends Iface> Iface getRemoteObject(final int objectId) throws NetException;
<Iface, Impl extends Iface> Iface getRemoteObject(final int objectId) throws IOException;
}

View File

@ -25,11 +25,10 @@ import dorkbox.network.connection.ping.PingTuple;
import dorkbox.network.connection.wrapper.ChannelNetworkWrapper;
import dorkbox.network.connection.wrapper.ChannelNull;
import dorkbox.network.connection.wrapper.ChannelWrapper;
import dorkbox.network.rmi.RMI;
import dorkbox.network.rmi.RemoteObject;
import dorkbox.network.rmi.RemoteProxy;
import dorkbox.network.rmi.RmiBridge;
import dorkbox.network.rmi.RmiRegistration;
import dorkbox.util.exceptions.NetException;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
@ -72,19 +71,22 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection,
private final Object messageInProgressLock = new Object();
private final AtomicBoolean messageInProgress = new AtomicBoolean(false);
private ISessionManager sessionManager;
private ISessionManager<ConnectionImpl> sessionManager;
private ChannelWrapper channelWrapper;
private volatile PingFuture pingFuture = null;
// used to store connection local listeners (instead of global listeners). Only possible on the server.
private volatile ConnectionManager localListenerManager;
private volatile ConnectionManager<ConnectionImpl> localListenerManager;
// while on the CLIENT, if the SERVER's ecc key has changed, the client will abort and show an error.
private boolean remoteKeyChanged;
private final EndPoint<ConnectionImpl> endPoint;
private final EndPoint endPoint;
// when true, the connection will be closed (either as RMI or as 'normal' listener execution) when the thread execution returns control
// back to the network stack
private boolean closeAsap = false;
private volatile ObjectRegistrationLatch objectRegistrationLatch;
private final Object remoteObjectLock = new Object();
@ -442,6 +444,12 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection,
this.messageInProgressLock.notifyAll();
}
}
// in some cases, we want to close the current connection -- and given the way the system is designed, we cannot always close it before
// we return. This will let us close the connection when our business logic is finished.
if (closeAsap) {
close();
}
}
@Override
@ -551,6 +559,16 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection,
}
}
/**
* Marks the connection to be closed as soon as possible. This is evaluated when the current
* thread execution returns to the network stack.
*/
@Override
public final
void closeAsap() {
closeAsap = true;
}
@Override
public
void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
@ -611,7 +629,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection,
// is empty, we can remove it from this connection.
synchronized (this) {
if (this.localListenerManager == null) {
this.localListenerManager = ((EndPointServer) this.endPoint).addListenerManager(this);
this.localListenerManager = ((EndPointServer<ConnectionImpl>) this.endPoint).addListenerManager(this);
}
this.localListenerManager.add(listener);
}
@ -654,7 +672,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection,
this.localListenerManager.remove(listener);
if (!this.localListenerManager.hasListeners()) {
((EndPointServer) this.endPoint).removeListenerManager(this);
((EndPointServer<ConnectionImpl>) this.endPoint).removeListenerManager(this);
}
}
}
@ -687,7 +705,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection,
this.localListenerManager.removeAll();
this.localListenerManager = null;
((EndPointServer) this.endPoint).removeListenerManager(this);
((EndPointServer<ConnectionImpl>) this.endPoint).removeListenerManager(this);
}
}
}
@ -721,7 +739,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection,
if (!this.localListenerManager.hasListeners()) {
this.localListenerManager = null;
((EndPointServer) this.endPoint).removeListenerManager(this);
((EndPointServer<ConnectionImpl>) this.endPoint).removeListenerManager(this);
}
}
}
@ -780,7 +798,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection,
@SuppressWarnings({"UnnecessaryLocalVariable", "unchecked"})
@Override
public final
<Iface, Impl extends Iface> Iface createRemoteObject(final Class<Impl> remoteImplementationClass) throws NetException {
<Iface, Impl extends Iface> Iface createRemoteObject(final Class<Impl> remoteImplementationClass) throws IOException {
// only one register can happen at a time
synchronized (remoteObjectLock) {
objectRegistrationLatch = new ObjectRegistrationLatch();
@ -793,12 +811,12 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection,
if (!objectRegistrationLatch.latch.await(2, TimeUnit.SECONDS)) {
final String errorMessage = "Timed out getting registration ID for: " + remoteImplementationClass;
logger.error(errorMessage);
throw new NetException(errorMessage);
throw new IOException(errorMessage);
}
} catch (InterruptedException e) {
final String errorMessage = "Error getting registration ID for: " + remoteImplementationClass;
logger.error(errorMessage, e);
throw new NetException(errorMessage, e);
throw new IOException(errorMessage, e);
}
// local var to prevent double hit on volatile field
@ -806,7 +824,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection,
if (latch.hasError) {
final String errorMessage = "Error getting registration ID for: " + remoteImplementationClass;
logger.error(errorMessage);
throw new NetException(errorMessage);
throw new IOException(errorMessage);
}
return (Iface) latch.remoteObject;
@ -816,7 +834,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection,
@SuppressWarnings({"UnnecessaryLocalVariable", "unchecked"})
@Override
public final
<Iface, Impl extends Iface> Iface getRemoteObject(final int objectId) throws NetException {
<Iface, Impl extends Iface> Iface getRemoteObject(final int objectId) throws IOException {
// only one register can happen at a time
synchronized (remoteObjectLock) {
objectRegistrationLatch = new ObjectRegistrationLatch();
@ -829,12 +847,12 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection,
if (!objectRegistrationLatch.latch.await(2, TimeUnit.SECONDS)) {
final String errorMessage = "Timed out getting registration for ID: " + objectId;
logger.error(errorMessage);
throw new NetException(errorMessage);
throw new IOException(errorMessage);
}
} catch (InterruptedException e) {
final String errorMessage = "Error getting registration for ID: " + objectId;
logger.error(errorMessage, e);
throw new NetException(errorMessage, e);
throw new IOException(errorMessage, e);
}
// local var to prevent double hit on volatile field
@ -842,7 +860,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection,
if (latch.hasError) {
final String errorMessage = "Error getting registration for ID: " + objectId;
logger.error(errorMessage);
throw new NetException(errorMessage);
throw new IOException(errorMessage);
}
return (Iface) latch.remoteObject;
@ -883,7 +901,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection,
if (annotations != null) {
for (Annotation annotation : annotations) {
if (annotation.annotationType().equals(RemoteProxy.class)) {
if (annotation.annotationType().equals(RMI.class)) {
boolean prev = field.isAccessible();
field.setAccessible(true);
final Object o = field.get(remoteClassObject.object);

View File

@ -17,10 +17,10 @@ package dorkbox.network.connection;
import dorkbox.network.rmi.RmiMessages;
import dorkbox.network.util.ConcurrentHashMapFactory;
import dorkbox.util.exceptions.NetException;
import dorkbox.util.ClassHelper;
import org.slf4j.Logger;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.*;
import java.util.Map.Entry;
@ -29,14 +29,14 @@ import java.util.concurrent.CopyOnWriteArrayList;
//note that we specifically DO NOT implement equals/hashCode, because we cannot create two separate
// objects that are somehow equal to each other.
public
class ConnectionManager implements ListenerBridge, ISessionManager {
class ConnectionManager<C extends Connection> implements ListenerBridge, ISessionManager<C> {
public static Listener<?> unRegisteredType_Listener = null;
// these are final, because the REFERENCE to these will never change. They ARE NOT immutable objects (meaning their content can change)
private final ConcurrentHashMapFactory<Type, CopyOnWriteArrayList<ListenerRaw<Connection, Object>>> listeners;
private final ConcurrentHashMapFactory<Connection, ConnectionManager> localManagers;
private final CopyOnWriteArrayList<Connection> connections = new CopyOnWriteArrayList<Connection>();
private final ConcurrentHashMapFactory<Connection, ConnectionManager<C>> localManagers;
private final CopyOnWriteArrayList<C> connections = new CopyOnWriteArrayList<C>();
/**
* Used by the listener subsystem to determine types.
@ -61,13 +61,13 @@ class ConnectionManager implements ListenerBridge, ISessionManager {
}
};
this.localManagers = new ConcurrentHashMapFactory<Connection, ConnectionManager>() {
this.localManagers = new ConcurrentHashMapFactory<Connection, ConnectionManager<C>>() {
private static final long serialVersionUID = 1L;
@Override
public
ConnectionManager createNewObject(Object... args) {
return new ConnectionManager(loggerName + "-" + args[0] + " Specific", ConnectionManager.this.baseClass);
ConnectionManager<C> createNewObject(Object... args) {
return new ConnectionManager<C>(loggerName + "-" + args[0] + " Specific", ConnectionManager.this.baseClass);
}
};
}
@ -283,7 +283,7 @@ class ConnectionManager implements ListenerBridge, ISessionManager {
// now have to account for additional connection listener managers (non-global).
ConnectionManager localManager = this.localManagers.get(connection);
ConnectionManager<C> localManager = this.localManagers.get(connection);
if (localManager != null) {
// if we found a listener during THIS method call, we need to let the NEXT method call know,
// so it doesn't spit out error for not handling a message (since that message MIGHT have
@ -328,7 +328,11 @@ class ConnectionManager implements ListenerBridge, ISessionManager {
return;
}
listener.idle(connection);
try {
listener.idle(connection);
} catch (IOException e) {
logger.error("Unable to notify listener on idle.", e);
}
}
connection.send()
.flush();
@ -336,7 +340,7 @@ class ConnectionManager implements ListenerBridge, ISessionManager {
}
// now have to account for additional (local) listener managers.
ConnectionManager localManager = this.localManagers.get(connection);
ConnectionManager<C> localManager = this.localManagers.get(connection);
if (localManager != null) {
localManager.notifyOnIdle(connection);
}
@ -349,7 +353,7 @@ class ConnectionManager implements ListenerBridge, ISessionManager {
*/
@Override
public
void connectionConnected(Connection connection) {
void connectionConnected(C connection) {
// create a new connection!
this.connections.add(connection);
@ -371,7 +375,7 @@ class ConnectionManager implements ListenerBridge, ISessionManager {
}
// now have to account for additional (local) listener managers.
ConnectionManager localManager = this.localManagers.get(connection);
ConnectionManager<C> localManager = this.localManagers.get(connection);
if (localManager != null) {
localManager.connectionConnected(connection);
}
@ -387,7 +391,7 @@ class ConnectionManager implements ListenerBridge, ISessionManager {
*/
@Override
public
void connectionDisconnected(Connection connection) {
void connectionDisconnected(C connection) {
Set<Entry<Type, CopyOnWriteArrayList<ListenerRaw<Connection, Object>>>> entrySet = this.listeners.entrySet();
CopyOnWriteArrayList<ListenerRaw<Connection, Object>> list;
for (Entry<Type, CopyOnWriteArrayList<ListenerRaw<Connection, Object>>> entry : entrySet) {
@ -404,7 +408,7 @@ class ConnectionManager implements ListenerBridge, ISessionManager {
}
// now have to account for additional (local) listener managers.
ConnectionManager localManager = this.localManagers.get(connection);
ConnectionManager<C> localManager = this.localManagers.get(connection);
if (localManager != null) {
localManager.connectionDisconnected(connection);
@ -442,7 +446,7 @@ class ConnectionManager implements ListenerBridge, ISessionManager {
}
// now have to account for additional (local) listener managers.
ConnectionManager localManager = this.localManagers.get(connection);
ConnectionManager<C> localManager = this.localManagers.get(connection);
if (localManager != null) {
localManager.connectionError(connection, throwable);
}
@ -455,20 +459,20 @@ class ConnectionManager implements ListenerBridge, ISessionManager {
*/
@Override
public
List<Connection> getConnections() {
List<C> getConnections() {
return Collections.unmodifiableList(this.connections);
}
final
ConnectionManager addListenerManager(Connection connection) {
ConnectionManager<C> addListenerManager(Connection connection) {
// when we are a server, NORMALLY listeners are added at the GLOBAL level (meaning, I add one listener, and ALL connections
// are notified of that listener.
// it is POSSIBLE to add a connection-specfic listener (via connection.addListener), meaning that ONLY
// that listener is notified on that event (ie, admin type listeners)
ConnectionManager lm = this.localManagers.getOrCreate(connection, connection.toString());
ConnectionManager<C> lm = this.localManagers.getOrCreate(connection, connection.toString());
Logger logger2 = this.logger;
if (logger2.isDebugEnabled()) {
@ -490,7 +494,7 @@ class ConnectionManager implements ListenerBridge, ISessionManager {
* @return Returns a FAST list of active connections.
*/
public final
Collection<Connection> getConnections0() {
Collection<C> getConnections0() {
return this.connections;
}
@ -500,14 +504,14 @@ class ConnectionManager implements ListenerBridge, ISessionManager {
* @return Returns a FAST first connection (for client!).
*/
public final
Connection getConnection0() {
C getConnection0() throws IOException {
if (this.connections.iterator()
.hasNext()) {
return this.connections.iterator()
.next();
}
else {
throw new NetException("Not connected to a remote computer. Unable to continue!");
throw new IOException("Not connected to a remote computer. Unable to continue!");
}
}
@ -540,7 +544,7 @@ class ConnectionManager implements ListenerBridge, ISessionManager {
final
void closeConnections() {
// close the sessions
Iterator<Connection> iterator = this.connections.iterator();
Iterator<C> iterator = this.connections.iterator();
//noinspection WhileLoopReplaceableByForEach
while (iterator.hasNext()) {
Connection connection = iterator.next();

View File

@ -62,7 +62,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
* represents the base of a client/server end point
*/
public abstract
class EndPoint {
class EndPoint<C extends Connection> {
// If TCP and UDP both fill the pipe, THERE WILL BE FRAGMENTATION and dropped UDP packets!
// it results in severe UDP packet loss and contention.
//
@ -122,7 +122,7 @@ class EndPoint {
protected final org.slf4j.Logger logger;
protected final Class<? extends EndPoint> type;
protected final ConnectionManager connectionManager;
protected final ConnectionManager<C> connectionManager;
protected final CryptoSerializationManager serializationManager;
protected final RegistrationWrapper registrationWrapper;
@ -137,9 +137,6 @@ class EndPoint {
private final Executor rmiExecutor;
private final boolean rmiEnabled;
// When using RMI, we want to keep track of what our current thread execution connection is. This is because we store state info in the
// connection object. This is the only way to retrieve this info "out-of-band"
private final ThreadLocal<Connection> currentConnection = new ThreadLocal<Connection>();
// the eventLoop groups are used to track and manage the event loops for startup/shutdown
private final List<EventLoopGroup> eventLoopGroups = new ArrayList<EventLoopGroup>(8);
@ -193,7 +190,6 @@ class EndPoint {
// setup our RMI serialization managers. Can only be called once
serializationManager.initRmiSerialization();
}
rmiExecutor = options.rmiExecutor;
@ -278,7 +274,7 @@ class EndPoint {
// we don't care about un-instantiated/constructed members, since the class type is the only interest.
this.connectionManager = new ConnectionManager(type.getSimpleName(), connection0(null).getClass());
this.connectionManager = new ConnectionManager<C>(type.getSimpleName(), connection0(null).getClass());
// add the ping listener (internal use only!)
this.connectionManager.add(new PingSystemListener());
@ -291,6 +287,8 @@ class EndPoint {
else {
this.globalRmiBridge = null;
}
serializationManager.finishInit();
}
public
@ -314,8 +312,8 @@ class EndPoint {
*/
@SuppressWarnings("unchecked")
public
<T extends SettingsStore> T getPropertyStore() {
return (T) this.propertyStore;
<S extends SettingsStore> S getPropertyStore() {
return (S) this.propertyStore;
}
/**
@ -395,7 +393,7 @@ class EndPoint {
* @return a new network connection
*/
protected
ConnectionImpl newConnection(final Logger logger, final EndPoint endPoint, final RmiBridge rmiBridge) {
ConnectionImpl newConnection(final Logger logger, final EndPoint<C> endPoint, final RmiBridge rmiBridge) {
return new ConnectionImpl(logger, endPoint, rmiBridge);
}
@ -437,7 +435,7 @@ class EndPoint {
metaChannel.connection = connection;
// now initialize the connection channels with whatever extra info they might need.
connection.init(new Bridge(wrapper, this.connectionManager));
connection.init(new Bridge<C>(wrapper, this.connectionManager));
if (rmiBridge != null) {
// notify our remote object space that it is able to receive method calls.
@ -461,13 +459,14 @@ class EndPoint {
* <p/>
* Only the CLIENT injects in front of this)
*/
@SuppressWarnings("unchecked")
void connectionConnected0(ConnectionImpl connection) {
this.isConnected.set(true);
// prep the channel wrapper
connection.prep();
this.connectionManager.connectionConnected(connection);
this.connectionManager.connectionConnected((C) connection);
}
/**
@ -482,7 +481,7 @@ class EndPoint {
* Returns a non-modifiable list of active connections
*/
public
List<Connection> getConnections() {
List<C> getConnections() {
return this.connectionManager.getConnections();
}
@ -491,8 +490,8 @@ class EndPoint {
*/
@SuppressWarnings("unchecked")
public
<C extends Connection> Collection<C> getConnectionsAs() {
return (Collection<C>) this.connectionManager.getConnections();
Collection<C> getConnectionsAs() {
return this.connectionManager.getConnections();
}
/**
@ -505,7 +504,7 @@ class EndPoint {
* Registers a tool with the server, to be used by other services.
*/
public
<T extends EndPointTool> void registerTool(T toolClass) {
<Tool extends EndPointTool> void registerTool(Tool toolClass) {
if (toolClass == null) {
throw new IllegalArgumentException("Tool must not be null! Unable to add tool");
}
@ -520,13 +519,13 @@ class EndPoint {
* Only get the tools in the ModuleStart (ie: load) methods. If done in the constructor, the tool might not be available yet
*/
public
<T extends EndPointTool> T getTool(Class<T> toolClass) {
<Tool extends EndPointTool> Tool getTool(Class<Tool> toolClass) {
if (toolClass == null) {
throw new IllegalArgumentException("Tool must not be null! Unable to add tool");
}
@SuppressWarnings("unchecked")
T tool = (T) this.toolMap.get(toolClass);
Tool tool = (Tool) this.toolMap.get(toolClass);
return tool;
}
@ -734,6 +733,7 @@ class EndPoint {
return result;
}
@SuppressWarnings("rawtypes")
@Override
public
boolean equals(Object obj) {
@ -788,19 +788,4 @@ class EndPoint {
globalRmiBridge.register(globalObjectId, globalObject);
return globalObjectId;
}
/**
* When using RMI, we want to keep track of what our current thread execution connection is.
* <p/>
* This is because we store state info in the connection object. This is the only way to retrieve this info "out-of-band"
*/
public
Connection getCurrentConnection() {
return this.currentConnection.get();
}
public
void setCurrentConnection(final Connection currentConnection) {
this.currentConnection.set(currentConnection);
}
}

View File

@ -34,7 +34,9 @@ import java.util.concurrent.atomic.AtomicInteger;
* This serves the purpose of making sure that specific methods are not available to the end user.
*/
public
class EndPointClient extends EndPoint implements Runnable {
class EndPointClient<C extends Connection> extends EndPoint<C> implements Runnable {
protected C connection;
protected final Object registrationLock = new Object();
protected final AtomicInteger connectingBootstrap = new AtomicInteger(0);
@ -152,8 +154,7 @@ class EndPointClient extends EndPoint implements Runnable {
ConnectionBridge send() {
ConnectionBridgeFlushAlways connectionBridgeFlushAlways2 = this.connectionBridgeFlushAlways;
if (connectionBridgeFlushAlways2 == null) {
ConnectionBridge clientBridge = this.connectionManager.getConnection0()
.send();
ConnectionBridge clientBridge = this.connection.send();
this.connectionBridgeFlushAlways = new ConnectionBridgeFlushAlways(clientBridge);
}

View File

@ -27,7 +27,7 @@ import java.io.IOException;
* This serves the purpose of making sure that specific methods are not available to the end user.
*/
public
class EndPointServer extends EndPoint {
class EndPointServer<C extends Connection> extends EndPoint<C> {
private final ServerConnectionBridge serverConnections;
@ -57,7 +57,7 @@ class EndPointServer extends EndPoint {
* @return a newly created listener manager for the connection
*/
final
ConnectionManager addListenerManager(Connection connection) {
ConnectionManager<C> addListenerManager(C connection) {
return this.connectionManager.addListenerManager(connection);
}
@ -71,7 +71,7 @@ class EndPointServer extends EndPoint {
* This removes the listener manager for that specific connection
*/
final
void removeListenerManager(Connection connection) {
void removeListenerManager(C connection) {
this.connectionManager.removeListenerManager(connection);
}
}

View File

@ -18,29 +18,29 @@ package dorkbox.network.connection;
import java.util.Collection;
public
interface ISessionManager {
interface ISessionManager<T extends Connection> {
/**
* Called when a message is received
*/
void notifyOnMessage(Connection connection, Object message);
void notifyOnMessage(T connection, Object message);
/**
* Called when the connection has been idle (read & write) for 2 seconds
*/
void notifyOnIdle(Connection connection);
void notifyOnIdle(T connection);
void connectionConnected(Connection connection);
void connectionConnected(T connection);
void connectionDisconnected(Connection connection);
void connectionDisconnected(T connection);
/**
* Called when there is an error of some kind during the up/down stream process
*/
void connectionError(Connection connection, Throwable throwable);
void connectionError(T connection, Throwable throwable);
/**
* Returns a non-modifiable list of active connections
*/
Collection<Connection> getConnections();
Collection<T> getConnections();
}

View File

@ -27,7 +27,6 @@ import dorkbox.network.connection.ping.PingMessage;
import dorkbox.network.rmi.*;
import dorkbox.network.util.CryptoSerializationManager;
import dorkbox.util.crypto.Crypto;
import dorkbox.util.exceptions.NetException;
import dorkbox.util.objectPool.ObjectPool;
import dorkbox.util.objectPool.ObjectPoolFactory;
import dorkbox.util.objectPool.PoolableObject;
@ -43,6 +42,7 @@ import org.bouncycastle.crypto.params.IESWithCipherParameters;
import org.jctools.util.Pow2;
import org.slf4j.Logger;
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
@ -91,7 +91,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager {
public static boolean useUnsafeMemory = false;
private static final String OBJECT_ID = "objectID";
private boolean rmiInitialized = false;
private boolean initialized = false;
/**
* The default serialization manager. This is static, since serialization must be consistent within the JVM. This can be changed.
@ -159,7 +159,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager {
private static
void decompress(ByteBuf inputBuffer, ByteBuf outputBuffer, Inflater decompress) {
void decompress(ByteBuf inputBuffer, ByteBuf outputBuffer, Inflater decompress) throws IOException {
byte[] in = new byte[inputBuffer.readableBytes()];
inputBuffer.readBytes(in);
@ -173,7 +173,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager {
numBytes = decompress.inflate(out, 0, out.length);
} catch (DataFormatException e) {
logger.error("Error inflating data.", e);
throw new NetException(e.getCause());
throw new IOException(e.getCause());
}
outputBuffer.writeBytes(out, 0, numBytes);
@ -213,7 +213,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager {
}
private static
void snappyDecompress(ByteBuf inputBuffer, ByteBuf outputBuffer, SnappyAccess snappy) {
void snappyDecompress(ByteBuf inputBuffer, ByteBuf outputBuffer, SnappyAccess snappy) throws IOException {
try {
int idx = inputBuffer.readerIndex();
final int inSize = inputBuffer.writerIndex() - idx;
@ -280,7 +280,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager {
break;
}
} catch (Exception e) {
throw new NetException("Unable to decompress SNAPPY data!! " + e.getMessage());
throw new IOException("Unable to decompress SNAPPY data!! " + e.getMessage());
}
}
@ -429,6 +429,34 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager {
}, capacity);
}
/**
* If the class is not registered and {@link Kryo#setRegistrationRequired(boolean)} is false, it is
* automatically registered using the {@link Kryo#addDefaultSerializer(Class, Class) default serializer}.
*
* @throws IllegalArgumentException if the class is not registered and {@link Kryo#setRegistrationRequired(boolean)} is true.
* @see ClassResolver#getRegistration(Class)
*/
@Override
public synchronized
Registration getRegistration(Class<?> clazz) {
Kryo kryo = null;
Registration r = null;
try {
kryo = this.pool.take();
r = kryo.getRegistration(clazz);
} catch (InterruptedException e) {
final String msg = "Interrupted during getRegistration()";
logger.error(msg);
} finally {
if (kryo != null) {
this.pool.release(kryo);
}
}
return r;
}
/**
* Registers the class using the lowest, next available integer ID and the
* {@link Kryo#getDefaultSerializer(Class) default serializer}. If the class
@ -443,6 +471,10 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager {
@Override
public synchronized
void register(Class<?> clazz) {
if (initialized) {
throw new RuntimeException("Cannot register classes after initialization.");
}
Kryo kryo;
try {
for (int i = 0; i < capacity; i++) {
@ -468,6 +500,10 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager {
@Override
public synchronized
void register(Class<?> clazz, Serializer<?> serializer) {
if (initialized) {
throw new RuntimeException("Cannot register classes after initialization.");
}
Kryo kryo;
try {
for (int i = 0; i < capacity; i++) {
@ -495,6 +531,10 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager {
@Override
public synchronized
void register(Class<?> clazz, Serializer<?> serializer, int id) {
if (initialized) {
throw new RuntimeException("Cannot register classes after initialization.");
}
Kryo kryo;
try {
for (int i = 0; i < capacity; i++) {
@ -507,27 +547,75 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager {
}
}
/**
* Objects that we want to use RMI with must be accessed via an interface. This method configures the serialization of an
* implementation to be serialized via the defined interface, as a RemoteObject (ie: proxy object). If the implementation
* class is ALREADY registered, then it's registration will be overwritten by this one
*
* @param ifaceClass The interface used to access the remote object
* @param implClass The implementation class of the interface
*/
@Override
public synchronized
<Iface, Impl extends Iface> void registerRemote(final Class<Iface> ifaceClass, final Class<Impl> implClass) {
register(implClass, new RemoteObjectSerializer<Impl>());
// After all common registrations, register OtherObjectImpl only on the server using the remote object interface ID.
// This causes OtherObjectImpl to be serialized as OtherObject.
int otherObjectID = getRegistration(implClass).getId();
// this overrides the 'otherObjectID' with the specified class/serializer, so that when we WRITE this ID, the impl ID is written.
register(ifaceClass, new RemoteObjectSerializer<Impl>(), otherObjectID);
// we have to save this info in CachedMethod.
CachedMethod.registerOverridden(ifaceClass, implClass);
}
/**
* Necessary to register classes for RMI, only called once when the RMI bridge is created.
*/
@Override
public synchronized
void initRmiSerialization() {
if (rmiInitialized) {
if (initialized) {
// already initialized.
return;
}
rmiInitialized = true;
InvokeMethodSerializer methodSerializer = new InvokeMethodSerializer();
Serializer<Object> invocationSerializer = new Serializer<Object>() {
@Override
public
void write(Kryo kryo, Output output, Object object) {
RemoteInvocationHandler handler = (RemoteInvocationHandler) Proxy.getInvocationHandler(object);
output.writeInt(handler.objectID, true);
}
@Override
@SuppressWarnings({"unchecked", "AutoBoxing"})
public
Object read(Kryo kryo, Input input, Class<Object> type) {
int objectID = input.readInt(true);
KryoExtra kryoExtra = (KryoExtra) kryo;
Object object = kryoExtra.connection.getRegisteredObject(objectID);
if (object == null) {
logger.error("Unknown object ID in RMI ObjectSpace: {}", objectID);
}
return object;
}
};
Kryo kryo;
try {
for (int i = 0; i < capacity; i++) {
kryo = this.pool.take();
// necessary for the RMI bridge. Only called once, but for all kryo instances
kryo.register(Class.class);
kryo.register(RmiRegistration.class);
kryo.register(InvokeMethod.class, methodSerializer);
kryo.register(Object[].class);
kryo.register(InvokeMethod.class, new InvokeMethodSerializer());
FieldSerializer<InvokeMethodResult> resultSerializer = new FieldSerializer<InvokeMethodResult>(kryo, InvokeMethodResult.class) {
@Override
@ -547,30 +635,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager {
};
resultSerializer.removeField(OBJECT_ID);
kryo.register(InvokeMethodResult.class, resultSerializer);
kryo.register(InvocationHandler.class, new Serializer<Object>() {
@Override
public
void write(Kryo kryo, Output output, Object object) {
RemoteInvocationHandler handler = (RemoteInvocationHandler) Proxy.getInvocationHandler(object);
output.writeInt(handler.objectID, true);
}
@Override
@SuppressWarnings({"unchecked"})
public
Object read(Kryo kryo, Input input, Class<Object> type) {
int objectID = input.readInt(true);
KryoExtra kryoExtra = (KryoExtra) kryo;
Object object = kryoExtra.connection.getRegisteredObject(objectID);
if (object == null) {
logger.error("Unknown object ID in RMI ObjectSpace: {}", objectID);
}
return object;
}
});
kryo.register(InvocationHandler.class, invocationSerializer);
this.pool.release(kryo);
}
@ -579,6 +644,19 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager {
}
}
/**
* Called when initialization is complete. This is to prevent (and recognize) out-of-order class/serializer registration.
*/
public
void finishInit() {
initialized = true;
}
@Override
public
boolean initialized() {
return initialized;
}
/**
* Waits until a kryo is available to write, using CAS operations to prevent having to synchronize.
@ -589,7 +667,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager {
*/
@Override
public final
void write(ByteBuf buffer, Object message) {
void write(ByteBuf buffer, Object message) throws IOException {
write0(null, buffer, message, false);
}
@ -602,7 +680,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager {
*/
@Override
public final
Object read(ByteBuf buffer, int length) {
Object read(ByteBuf buffer, int length) throws IOException {
return read0(null, buffer, length, false);
}
@ -611,7 +689,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager {
*/
@Override
public
void writeFullClassAndObject(Output output, Object value) {
void writeFullClassAndObject(final Logger logger, Output output, Object value) throws IOException {
Kryo kryo = null;
boolean prev = false;
@ -623,8 +701,10 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager {
kryo.writeClassAndObject(output, value);
} catch (Exception ex) {
final String msg = "Unable to serialize buffer";
logger.error(msg, ex);
throw new NetException(msg, ex);
if (logger != null) {
logger.error(msg, ex);
}
throw new IOException(msg, ex);
} finally {
if (kryo != null) {
kryo.setRegistrationRequired(prev);
@ -635,7 +715,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager {
@Override
public
Object readFullClassAndObject(final Input input) {
Object readFullClassAndObject(final Logger logger, final Input input) throws IOException {
Kryo kryo = null;
boolean prev = false;
@ -647,8 +727,10 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager {
return kryo.readClassAndObject(input);
} catch (Exception ex) {
final String msg = "Unable to deserialize buffer";
logger.error(msg, ex);
throw new NetException(msg, ex);
if (logger != null) {
logger.error(msg, ex);
}
throw new IOException(msg, ex);
} finally {
if (kryo != null) {
kryo.setRegistrationRequired(prev);
@ -669,61 +751,10 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager {
this.pool.release(kryo);
}
/**
* If the class is not registered and {@link Kryo#setRegistrationRequired(boolean)} is false, it is
* automatically registered using the {@link Kryo#addDefaultSerializer(Class, Class) default serializer}.
*
* @throws IllegalArgumentException if the class is not registered and {@link Kryo#setRegistrationRequired(boolean)} is true.
* @see ClassResolver#getRegistration(Class)
*/
@Override
public
Registration getRegistration(Class<?> clazz) {
Kryo kryo = null;
Registration r = null;
try {
kryo = this.pool.take();
r = kryo.getRegistration(clazz);
} catch (InterruptedException e) {
final String msg = "Interrupted during getRegistration()";
logger.error(msg);
} finally {
if (kryo != null) {
this.pool.release(kryo);
}
}
return r;
}
/**
* Objects that we want to use RMI with must be accessed via an interface. This method configures the serialization of an
* implementation to be serialized via the defined interface, as a RemoteObject (ie: proxy object). If the implementation
* class is ALREADY registered, then it's registration will be overwritten by this one
*
* @param ifaceClass The interface used to access the remote object
* @param implClass The implementation class of the interface
*/
@Override
public
<Iface, Impl extends Iface> void registerRemote(final Class<Iface> ifaceClass, final Class<Impl> implClass) {
register(implClass, new RemoteObjectSerializer<Impl>());
// After all common registrations, register OtherObjectImpl only on the server using the remote object interface ID.
// This causes OtherObjectImpl to be serialized as OtherObject.
int otherObjectID = getRegistration(implClass).getId();
// this overrides the 'otherObjectID' with the specified class/serializer
register(ifaceClass, new RemoteObjectSerializer<Impl>(), otherObjectID);
}
/**
* Determines if this buffer is encrypted or not.
*/
@Override
public final
public static
boolean isEncrypted(ByteBuf buffer) {
// read off the magic byte
byte magicByte = buffer.getByte(buffer.readerIndex());
@ -737,11 +768,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager {
*/
@Override
public final
void writeWithCryptoTcp(ConnectionImpl connection, ByteBuf buffer, Object message) {
if (connection == null) {
throw new NetException("Unable to perform crypto when NO network connection!");
}
void writeWithCryptoTcp(ConnectionImpl connection, ByteBuf buffer, Object message) throws IOException {
write0(connection, buffer, message, true);
}
@ -752,11 +779,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager {
*/
@Override
public final
void writeWithCryptoUdp(ConnectionImpl connection, ByteBuf buffer, Object message) {
if (connection == null) {
throw new NetException("Unable to perform crypto when NO network connection!");
}
void writeWithCryptoUdp(ConnectionImpl connection, ByteBuf buffer, Object message) throws IOException {
write0(connection, buffer, message, true);
}
@ -770,11 +793,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager {
*/
@Override
public final
Object readWithCryptoTcp(ConnectionImpl connection, ByteBuf buffer, int length) {
if (connection == null) {
throw new NetException("Unable to perform crypto when NO network connection!");
}
Object readWithCryptoTcp(ConnectionImpl connection, ByteBuf buffer, int length) throws IOException {
return read0(connection, buffer, length, true);
}
@ -788,11 +807,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager {
*/
@Override
public final
Object readWithCryptoUdp(ConnectionImpl connection, ByteBuf buffer, int length) {
if (connection == null) {
throw new NetException("Unable to perform crypto when NO network connection!");
}
Object readWithCryptoUdp(ConnectionImpl connection, ByteBuf buffer, int length) throws IOException {
return read0(connection, buffer, length, true);
}
@ -801,7 +816,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager {
*/
@SuppressWarnings("unchecked")
private
void write0(final ConnectionImpl connection, final ByteBuf buffer, final Object message, final boolean doCrypto) {
void write0(final ConnectionImpl connection, final ByteBuf buffer, final Object message, final boolean doCrypto) throws IOException {
final KryoExtra kryo = (KryoExtra) this.pool.takeUninterruptibly();
Logger logger2 = logger;
@ -826,7 +841,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager {
// connection will ALWAYS be of type Connection or NULL.
// used by RMI/some serializers to determine which connection wrote this object
// NOTE: this is only valid in the context of this thread, which RMI stuff is accessed in -- so this is SAFE for RMI
kryo.connection = (ConnectionImpl) connection;
kryo.connection = connection;
kryo.writeClassAndObject(kryo.output, message);
@ -893,7 +908,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager {
} catch (Exception ex) {
final String msg = "Unable to serialize buffer";
logger2.error(msg, ex);
throw new NetException(msg, ex);
throw new IOException(msg, ex);
} finally {
// release resources
kryo.output.setBuffer(NULL_BUFFER);
@ -911,7 +926,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager {
*/
@SuppressWarnings({"unchecked", "UnnecessaryLocalVariable"})
private
Object read0(final ConnectionImpl connection, final ByteBuf buffer, final int length, final boolean doCrypto) {
Object read0(final ConnectionImpl connection, final ByteBuf buffer, final int length, final boolean doCrypto) throws IOException {
final KryoExtra kryo = (KryoExtra) this.pool.takeUninterruptibly();
Logger logger2 = logger;
@ -938,7 +953,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager {
// AES CRYPTO STUFF
if (doCrypto) {
if ((magicByte & crypto) != crypto) {
throw new NetException("Unable to perform crypto when data does not use crypto!");
throw new IOException("Unable to perform crypto when data does not use crypto!");
}
if (logger2.isTraceEnabled()) {
@ -977,7 +992,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager {
// connection will ALWAYS be of type IConnection or NULL.
// used by RMI/some serializers to determine which connection read this object
// NOTE: this is only valid in the context of this thread, which RMI stuff is accessed in -- so this is SAFE for RMI
kryo.connection = (ConnectionImpl) connection;
kryo.connection = connection;
Object object = kryo.readClassAndObject(kryo.input);
@ -985,7 +1000,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager {
} catch (Exception ex) {
final String msg = "Unable to deserialize buffer";
logger2.error(msg, ex);
throw new NetException(msg, ex);
throw new IOException(msg, ex);
} finally {
// make sure the end of the buffer is in the correct spot.
// move the reader index to the end of the object (since we are reading encrypted data

View File

@ -16,26 +16,14 @@
package dorkbox.network.connection;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.util.Util;
import com.esotericsoftware.reflectasm.MethodAccess;
import dorkbox.network.pipeline.ByteBufInput;
import dorkbox.network.pipeline.ByteBufOutput;
import dorkbox.network.rmi.AsmCachedMethod;
import dorkbox.network.rmi.CachedMethod;
import dorkbox.util.crypto.bouncycastle.GCMBlockCipher_ByteBuf;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.compression.SnappyAccess;
import org.bouncycastle.crypto.engines.AESFastEngine;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.zip.Deflater;
import java.util.zip.Inflater;
@ -52,9 +40,6 @@ class KryoExtra extends Kryo {
final ByteBuf tmpBuffer2;
final GCMBlockCipher_ByteBuf aesEngine;
private final Map<Class<?>, CachedMethod[]> methodCache = new ConcurrentHashMap<Class<?>, CachedMethod[]>();
private final boolean asmEnabled = !KryoCryptoSerializationManager.useUnsafeMemory;
// not thread safe
public ConnectionImpl connection;
@ -72,111 +57,4 @@ class KryoExtra extends Kryo {
this.tmpBuffer2 = Unpooled.buffer(1024);
this.aesEngine = new GCMBlockCipher_ByteBuf(new AESFastEngine());
}
public
CachedMethod[] getMethods(Class<?> type) {
CachedMethod[] cachedMethods = this.methodCache.get(type); // Maybe should cache per Kryo instance?
if (cachedMethods != null) {
return cachedMethods;
}
ArrayList<Method> allMethods = new ArrayList<Method>();
Class<?> nextClass = type;
while (nextClass != null) {
Collections.addAll(allMethods, nextClass.getDeclaredMethods());
nextClass = nextClass.getSuperclass();
if (nextClass == Object.class) {
break;
}
}
ArrayList<Method> methods = new ArrayList<Method>(Math.max(1, allMethods.size()));
for (int i = 0, n = allMethods.size(); i < n; i++) {
Method method = allMethods.get(i);
int modifiers = method.getModifiers();
if (Modifier.isStatic(modifiers)) {
continue;
}
if (Modifier.isPrivate(modifiers)) {
continue;
}
if (method.isSynthetic()) {
continue;
}
methods.add(method);
}
Collections.sort(methods, new Comparator<Method>() {
@Override
public
int compare(Method o1, Method o2) {
// Methods are sorted so they can be represented as an index.
int diff = o1.getName()
.compareTo(o2.getName());
if (diff != 0) {
return diff;
}
Class<?>[] argTypes1 = o1.getParameterTypes();
Class<?>[] argTypes2 = o2.getParameterTypes();
if (argTypes1.length > argTypes2.length) {
return 1;
}
if (argTypes1.length < argTypes2.length) {
return -1;
}
for (int i = 0; i < argTypes1.length; i++) {
diff = argTypes1[i].getName()
.compareTo(argTypes2[i].getName());
if (diff != 0) {
return diff;
}
}
throw new RuntimeException("Two methods with same signature!"); // Impossible.
}
});
Object methodAccess = null;
if (asmEnabled && !Util.isAndroid && Modifier.isPublic(type.getModifiers())) {
methodAccess = MethodAccess.get(type);
}
int n = methods.size();
cachedMethods = new CachedMethod[n];
for (int i = 0; i < n; i++) {
Method method = methods.get(i);
Class<?>[] parameterTypes = method.getParameterTypes();
CachedMethod cachedMethod = null;
if (methodAccess != null) {
try {
AsmCachedMethod asmCachedMethod = new AsmCachedMethod();
asmCachedMethod.methodAccessIndex = ((MethodAccess) methodAccess).getIndex(method.getName(), parameterTypes);
asmCachedMethod.methodAccess = (MethodAccess) methodAccess;
cachedMethod = asmCachedMethod;
} catch (RuntimeException ignored) {
}
}
if (cachedMethod == null) {
cachedMethod = new CachedMethod();
}
cachedMethod.method = method;
cachedMethod.methodClassID = getRegistration(method.getDeclaringClass()).getId();
cachedMethod.methodIndex = i;
// Store the serializer for each final parameter.
cachedMethod.serializers = new Serializer<?>[parameterTypes.length];
for (int ii = 0, nn = parameterTypes.length; ii < nn; ii++) {
if (isFinal(parameterTypes[ii])) {
cachedMethod.serializers[ii] = getSerializer(parameterTypes[ii]);
}
}
cachedMethods[i] = cachedMethod;
}
this.methodCache.put(type, cachedMethods);
return cachedMethods;
}
}

View File

@ -17,6 +17,8 @@ package dorkbox.network.connection;
import dorkbox.util.ClassHelper;
import java.io.IOException;
public abstract
class ListenerRaw<C extends Connection, M extends Object> {
@ -103,7 +105,7 @@ class ListenerRaw<C extends Connection, M extends Object> {
*/
@SuppressWarnings("unused")
public
void idle(C connection) {
void idle(C connection) throws IOException {
}
/**

View File

@ -55,9 +55,13 @@ class PropertyStore extends SettingsStore {
public
void init(Class<? extends EndPoint> type, final SerializationManager serializationManager, Storage storage) throws IOException {
// make sure our custom types are registered
serializationManager.register(HashMap.class);
serializationManager.register(ByteArrayWrapper.class);
serializationManager.register(DB_Server.class);
// only register if not ALREADY initialized, since we can initialize in the server and in the client. This creates problems if
// running inside the same JVM (we don't permit it)
if (!serializationManager.initialized()) {
serializationManager.register(HashMap.class);
serializationManager.register(ByteArrayWrapper.class);
serializationManager.register(DB_Server.class);
}
if (storage == null) {
this.storage = Store.Memory()
@ -67,16 +71,16 @@ class PropertyStore extends SettingsStore {
this.storage = storage;
}
servers = this.storage.load(DatabaseStorage.SERVERS, new HashMap<ByteArrayWrapper, DB_Server>(16));
servers = this.storage.getAndPut(DatabaseStorage.SERVERS, new HashMap<ByteArrayWrapper, DB_Server>(16));
//use map to keep track of recid, so we can get record info during restarts.
DB_Server localServer = servers.get(DB_Server.IP_0_0_0_0);
DB_Server localServer = servers.get(DB_Server.IP_LOCALHOST);
if (localServer == null) {
localServer = new DB_Server();
servers.put(DB_Server.IP_0_0_0_0, localServer);
servers.put(DB_Server.IP_LOCALHOST, localServer);
// have to always specify what we are saving
this.storage.commit(DatabaseStorage.SERVERS, servers);
this.storage.putAndSave(DatabaseStorage.SERVERS, servers);
}
}
@ -88,7 +92,7 @@ class PropertyStore extends SettingsStore {
ECPrivateKeyParameters getPrivateKey() throws dorkbox.util.exceptions.SecurityException {
checkAccess(EndPoint.class);
return servers.get(DB_Server.IP_0_0_0_0)
return servers.get(DB_Server.IP_LOCALHOST)
.getPrivateKey();
}
@ -100,11 +104,11 @@ class PropertyStore extends SettingsStore {
void savePrivateKey(ECPrivateKeyParameters serverPrivateKey) throws SecurityException {
checkAccess(EndPoint.class);
servers.get(DB_Server.IP_0_0_0_0)
servers.get(DB_Server.IP_LOCALHOST)
.setPrivateKey(serverPrivateKey);
// have to always specify what we are saving
storage.commit(DatabaseStorage.SERVERS, servers);
storage.putAndSave(DatabaseStorage.SERVERS, servers);
}
/**
@ -115,7 +119,7 @@ class PropertyStore extends SettingsStore {
ECPublicKeyParameters getPublicKey() throws SecurityException {
checkAccess(EndPoint.class);
return servers.get(DB_Server.IP_0_0_0_0)
return servers.get(DB_Server.IP_LOCALHOST)
.getPublicKey();
}
@ -127,11 +131,11 @@ class PropertyStore extends SettingsStore {
void savePublicKey(ECPublicKeyParameters serverPublicKey) throws SecurityException {
checkAccess(EndPoint.class);
servers.get(DB_Server.IP_0_0_0_0)
servers.get(DB_Server.IP_LOCALHOST)
.setPublicKey(serverPublicKey);
// have to always specify what we are saving
storage.commit(DatabaseStorage.SERVERS, servers);
storage.putAndSave(DatabaseStorage.SERVERS, servers);
}
/**
@ -140,7 +144,7 @@ class PropertyStore extends SettingsStore {
@Override
public synchronized
byte[] getSalt() {
final DB_Server localServer = servers.get(DB_Server.IP_0_0_0_0);
final DB_Server localServer = servers.get(DB_Server.IP_LOCALHOST);
byte[] salt = localServer.getSalt();
// we don't care who gets the server salt
@ -156,7 +160,7 @@ class PropertyStore extends SettingsStore {
localServer.setSalt(bytes);
// have to always specify what we are saving
storage.commit(DatabaseStorage.SERVERS, servers);
storage.putAndSave(DatabaseStorage.SERVERS, servers);
}
return salt;

View File

@ -18,6 +18,8 @@ package dorkbox.network.connection.idle;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.ListenerRaw;
import java.io.IOException;
public abstract
class IdleSender<C extends Connection, M> extends ListenerRaw<C, M> {
final IdleListener<C, M> idleListener;
@ -30,7 +32,7 @@ class IdleSender<C extends Connection, M> extends ListenerRaw<C, M> {
@Override
public
void idle(C connection) {
void idle(C connection) throws IOException {
if (!this.started) {
this.started = true;
start();
@ -58,5 +60,5 @@ class IdleSender<C extends Connection, M> extends ListenerRaw<C, M> {
* Returns the next object to send, or null if no more objects will be sent.
*/
protected abstract
M next();
M next() throws IOException;
}

View File

@ -16,7 +16,6 @@
package dorkbox.network.connection.idle;
import dorkbox.network.connection.Connection;
import dorkbox.util.exceptions.NetException;
import java.io.IOException;
import java.io.InputStream;
@ -36,23 +35,19 @@ class InputStreamSender<C extends Connection> extends IdleSender<C, byte[]> {
@Override
protected final
byte[] next() {
try {
int total = 0;
while (total < this.chunk.length) {
int count = this.input.read(this.chunk, total, this.chunk.length - total);
if (count < 0) {
if (total == 0) {
return null;
}
byte[] partial = new byte[total];
System.arraycopy(this.chunk, 0, partial, 0, total);
return onNext(partial);
byte[] next() throws IOException {
int total = 0;
while (total < this.chunk.length) {
int count = this.input.read(this.chunk, total, this.chunk.length - total);
if (count < 0) {
if (total == 0) {
return null;
}
total += count;
byte[] partial = new byte[total];
System.arraycopy(this.chunk, 0, partial, 0, total);
return onNext(partial);
}
} catch (IOException ex) {
throw new NetException(ex);
total += count;
}
return onNext(this.chunk);
}

View File

@ -15,10 +15,10 @@
*/
package dorkbox.network.connection.ping;
import dorkbox.util.exceptions.NetException;
import java.io.IOException;
public
class PingCanceledException extends NetException {
class PingCanceledException extends IOException {
private static final long serialVersionUID = 9045461384091038605L;

View File

@ -21,7 +21,6 @@ import dorkbox.network.connection.registration.Registration;
import dorkbox.network.pipeline.udp.KryoDecoderUdp;
import dorkbox.network.pipeline.udp.KryoEncoderUdp;
import dorkbox.network.util.CryptoSerializationManager;
import dorkbox.util.exceptions.NetException;
import dorkbox.util.bytes.OptimizeUtilsByteArray;
import dorkbox.util.collections.IntMap;
import dorkbox.util.collections.IntMap.Entries;
@ -32,6 +31,7 @@ import io.netty.channel.ChannelPipeline;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@ -112,7 +112,7 @@ class RegistrationRemoteHandlerClientUDP extends RegistrationRemoteHandlerClient
}
if (!success) {
throw new NetException("UDP cannot connect to a remote server before TCP is established!");
throw new IOException("UDP cannot connect to a remote server before TCP is established!");
}
if (logger2.isTraceEnabled()) {
@ -124,7 +124,7 @@ class RegistrationRemoteHandlerClientUDP extends RegistrationRemoteHandlerClient
channel.writeAndFlush(registration);
}
else {
throw new NetException("UDP cannot connect to remote server! No remote address specified!");
throw new IOException("UDP cannot connect to remote server! No remote address specified!");
}
}

View File

@ -19,7 +19,6 @@ import dorkbox.network.connection.RegistrationWrapper;
import dorkbox.network.connection.registration.MetaChannel;
import dorkbox.network.connection.registration.Registration;
import dorkbox.network.util.CryptoSerializationManager;
import dorkbox.util.exceptions.NetException;
import dorkbox.util.bytes.OptimizeUtilsByteArray;
import dorkbox.util.collections.IntMap;
import dorkbox.util.collections.IntMap.Entries;
@ -29,6 +28,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@ -103,7 +103,7 @@ class RegistrationRemoteHandlerClientUDT extends RegistrationRemoteHandlerClient
}
if (!success) {
throw new NetException("UDT cannot connect to a remote server before TCP is established!");
throw new IOException("UDT cannot connect to a remote server before TCP is established!");
}
if (logger2.isTraceEnabled()) {
@ -115,7 +115,7 @@ class RegistrationRemoteHandlerClientUDT extends RegistrationRemoteHandlerClient
channel.writeAndFlush(registration);
}
else {
throw new NetException("UDT cannot connect to remote server! No remote address specified!");
throw new IOException("UDT cannot connect to remote server! No remote address specified!");
}
}

View File

@ -17,6 +17,7 @@ package dorkbox.network.connection.registration.remote;
import dorkbox.network.Broadcast;
import dorkbox.network.connection.ConnectionImpl;
import dorkbox.network.connection.KryoCryptoSerializationManager;
import dorkbox.network.connection.RegistrationWrapper;
import dorkbox.network.connection.registration.MetaChannel;
import dorkbox.network.connection.registration.Registration;
@ -26,7 +27,6 @@ import dorkbox.util.bytes.OptimizeUtilsByteArray;
import dorkbox.util.collections.IntMap;
import dorkbox.util.collections.IntMap.Entries;
import dorkbox.util.crypto.Crypto;
import dorkbox.util.exceptions.NetException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
@ -36,6 +36,7 @@ import io.netty.channel.socket.DatagramPacket;
import io.netty.handler.codec.MessageToMessageCodec;
import org.slf4j.Logger;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.List;
@ -97,11 +98,25 @@ class RegistrationRemoteHandlerServerUDP extends MessageToMessageCodec<DatagramP
// this is regular registration stuff
ByteBuf buffer = context.alloc()
.buffer();
// writes data into buffer
sendUDP(context, object, buffer, remoteAddress);
if (buffer != null) {
out.add(new DatagramPacket(buffer, remoteAddress));
// writes data into buffer
try {
ConnectionImpl networkConnection = this.registrationWrapper.getServerUDP(remoteAddress);
if (networkConnection != null) {
// try to write data! (IT SHOULD ALWAYS BE ENCRYPTED HERE!)
this.serializationManager.writeWithCryptoUdp(networkConnection, buffer, object);
}
else {
// this means we are still in the REGISTRATION phase.
this.serializationManager.write(buffer, object);
}
if (buffer != null) {
out.add(new DatagramPacket(buffer, remoteAddress));
}
} catch (IOException e) {
logger.error("Unable to write data to the socket.", e);
throw e;
}
}
}
@ -144,22 +159,6 @@ class RegistrationRemoteHandlerServerUDP extends MessageToMessageCodec<DatagramP
}
@SuppressWarnings("unused")
public final
void sendUDP(ChannelHandlerContext context, Object object, ByteBuf buffer, InetSocketAddress udpRemoteAddress) {
ConnectionImpl networkConnection = this.registrationWrapper.getServerUDP(udpRemoteAddress);
if (networkConnection != null) {
// try to write data! (IT SHOULD ALWAYS BE ENCRYPTED HERE!)
this.serializationManager.writeWithCryptoUdp(networkConnection, buffer, object);
}
else {
// this means we are still in the REGISTRATION phase.
this.serializationManager.write(buffer, object);
}
}
// this will be invoked by the UdpRegistrationHandlerServer. Remember, TCP will be established first.
@SuppressWarnings("unused")
private
@ -169,7 +168,7 @@ class RegistrationRemoteHandlerServerUDP extends MessageToMessageCodec<DatagramP
RegistrationWrapper registrationWrapper2 = this.registrationWrapper;
CryptoSerializationManager serializationManager2 = this.serializationManager;
if (serializationManager2.isEncrypted(data)) {
if (KryoCryptoSerializationManager.isEncrypted(data)) {
// we need to FORWARD this message "down the pipeline".
ConnectionImpl connection = registrationWrapper2.getServerUDP(udpRemoteAddress);
@ -180,10 +179,10 @@ class RegistrationRemoteHandlerServerUDP extends MessageToMessageCodec<DatagramP
try {
object = serializationManager2.readWithCryptoUdp(connection, data, data.writerIndex());
} catch (NetException e) {
} catch (Exception e) {
logger2.error("UDP unable to deserialize buffer", e);
shutdown(registrationWrapper2, channel);
return;
throw e;
}
connection.channelRead(object);

View File

@ -46,6 +46,8 @@ class ChannelLocalWrapper implements ChannelWrapper, ConnectionPointWriter {
@Override
public
void write(Object object) {
// we should check to see if this class is registered as having RMI methods present.
this.channel.write(object);
this.shouldFlush.set(true);
}

View File

@ -16,7 +16,6 @@
package dorkbox.network.connection.wrapper;
import dorkbox.network.connection.UdpServer;
import dorkbox.util.exceptions.NetException;
import io.netty.channel.Channel;
import java.net.InetSocketAddress;
@ -31,10 +30,6 @@ class ChannelNetworkUdp extends ChannelNetwork {
ChannelNetworkUdp(Channel channel, InetSocketAddress udpRemoteAddress, UdpServer udpServer) {
super(channel);
if (udpRemoteAddress == null) {
throw new NetException("Cannot create a server UDP channel wihtout a remote udp address!");
}
this.udpRemoteAddress = udpRemoteAddress;
this.udpServer = udpServer; // ONLY valid in the server!
}

View File

@ -21,6 +21,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.io.IOException;
import java.util.List;
public
@ -39,7 +40,12 @@ class KryoDecoder extends ByteToMessageDecoder {
protected
Object readObject(CryptoSerializationManager serializationManager, ChannelHandlerContext context, ByteBuf in, int length) {
// no connection here because we haven't created one yet. When we do, we replace this handler with a new one.
return serializationManager.read(in, length);
try {
return serializationManager.read(in, length);
} catch (IOException e) {
context.fireExceptionCaught(e);
return null;
}
}
@Override

View File

@ -21,22 +21,33 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
// on client this is MessageToMessage *because of the UdpDecoder in the pipeline!)
import java.io.IOException;
// on client this is MessageToMessage (because of the UdpDecoder in the pipeline!)
public
class KryoDecoderCrypto extends KryoDecoder {
public
KryoDecoderCrypto(CryptoSerializationManager serializationManager) {
KryoDecoderCrypto(final CryptoSerializationManager serializationManager) {
super(serializationManager);
}
@Override
protected
Object readObject(CryptoSerializationManager serializationManager, ChannelHandlerContext ctx, ByteBuf in, int length) {
ChannelHandler last = ctx.pipeline()
.last();
return serializationManager.readWithCryptoTcp((ConnectionImpl) last, in, length);
Object readObject(final CryptoSerializationManager serializationManager,
final ChannelHandlerContext context,
final ByteBuf in,
final int length) {
ChannelHandler last = context.pipeline()
.last();
try {
return serializationManager.readWithCryptoTcp((ConnectionImpl) last, in, length);
} catch (IOException e) {
context.fireExceptionCaught(e);
return null;
}
}
}

View File

@ -18,12 +18,13 @@ package dorkbox.network.pipeline;
import com.esotericsoftware.kryo.KryoException;
import dorkbox.network.util.CryptoSerializationManager;
import dorkbox.util.bytes.OptimizeUtilsByteBuf;
import dorkbox.util.exceptions.NetException;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import java.io.IOException;
@Sharable
public
class KryoEncoder extends MessageToByteEncoder<Object> {
@ -33,7 +34,7 @@ class KryoEncoder extends MessageToByteEncoder<Object> {
public
KryoEncoder(CryptoSerializationManager serializationManager) {
KryoEncoder(final CryptoSerializationManager serializationManager) {
super();
this.serializationManager = serializationManager;
this.optimize = OptimizeUtilsByteBuf.get();
@ -42,14 +43,22 @@ class KryoEncoder extends MessageToByteEncoder<Object> {
// the crypto writer will override this
@SuppressWarnings("unused")
protected
void writeObject(CryptoSerializationManager kryoWrapper, ChannelHandlerContext context, Object msg, ByteBuf buffer) {
void writeObject(final CryptoSerializationManager kryoWrapper,
final ChannelHandlerContext context,
final Object msg,
final ByteBuf buffer) {
// no connection here because we haven't created one yet. When we do, we replace this handler with a new one.
kryoWrapper.write(buffer, msg);
try {
kryoWrapper.write(buffer, msg);
} catch (IOException ex) {
context.fireExceptionCaught(new IOException("Unable to serialize object of type: " + msg.getClass()
.getName(), ex));
}
}
@Override
protected
void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
void encode(final ChannelHandlerContext context, final Object msg, final ByteBuf out) throws Exception {
// we don't necessarily start at 0!!
int startIndex = out.writerIndex();
@ -60,7 +69,7 @@ class KryoEncoder extends MessageToByteEncoder<Object> {
out.writeInt(0); // put an int in, which is the same size as reservedLengthIndex
try {
writeObject(this.serializationManager, ctx, msg, out);
writeObject(this.serializationManager, context, msg, out);
// now set the frame (if it's TCP)!
int length = out.readableBytes() - startIndex -
@ -80,8 +89,8 @@ class KryoEncoder extends MessageToByteEncoder<Object> {
optimize.writeInt(out, length, true);
out.setIndex(newIndex, oldIndex);
} catch (KryoException ex) {
ctx.fireExceptionCaught(new NetException("Unable to serialize object of type: " + msg.getClass()
.getName(), ex));
context.fireExceptionCaught(new IOException("Unable to serialize object of type: " + msg.getClass()
.getName(), ex));
}
}
}

View File

@ -22,20 +22,31 @@ import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import java.io.IOException;
@Sharable
public
class KryoEncoderCrypto extends KryoEncoder {
public
KryoEncoderCrypto(CryptoSerializationManager serializationManager) {
KryoEncoderCrypto(final CryptoSerializationManager serializationManager) {
super(serializationManager);
}
@Override
protected
void writeObject(CryptoSerializationManager serializationManager, ChannelHandlerContext ctx, Object msg, ByteBuf buffer) {
ChannelHandler last = ctx.pipeline()
.last();
serializationManager.writeWithCryptoTcp((ConnectionImpl) last, buffer, msg);
void writeObject(final CryptoSerializationManager serializationManager,
final ChannelHandlerContext context,
final Object msg,
final ByteBuf buffer) {
ChannelHandler last = context.pipeline()
.last();
try {
serializationManager.writeWithCryptoTcp((ConnectionImpl) last, buffer, msg);
} catch (IOException ex) {
context.fireExceptionCaught(new IOException("Unable to serialize object of type: " + msg.getClass()
.getName(), ex));
}
}
}

View File

@ -15,14 +15,16 @@
*/
package dorkbox.network.pipeline.udp;
import dorkbox.network.connection.KryoCryptoSerializationManager;
import dorkbox.network.util.CryptoSerializationManager;
import dorkbox.util.exceptions.NetException;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramPacket;
import io.netty.handler.codec.MessageToMessageDecoder;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
@Sharable
@ -46,13 +48,21 @@ class KryoDecoderUdp extends MessageToMessageDecoder<DatagramPacket> {
// there is a REMOTE possibility that UDP traffic BEAT the TCP registration traffic, which means that THIS packet
// COULD be encrypted!
if (serializationManager.isEncrypted(data)) {
throw new NetException("Encrypted UDP packet received before registration complete. WHOOPS!");
if (KryoCryptoSerializationManager.isEncrypted(data)) {
String message = "Encrypted UDP packet received before registration complete.";
LoggerFactory.getLogger(this.getClass()).error(message);
throw new IOException(message);
} else {
try {
// no connection here because we haven't created one yet. When we do, we replace this handler with a new one.
Object read = serializationManager.read(data, data.writerIndex());
out.add(read);
} catch (IOException e) {
String message = "Unable to deserialize object";
LoggerFactory.getLogger(this.getClass()).error(message, e);
throw new IOException(message, e);
}
}
// no connection here because we haven't created one yet. When we do, we replace this handler with a new one.
Object read = serializationManager.read(data, data.writerIndex());
out.add(read);
}
}
}

View File

@ -23,7 +23,9 @@ import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramPacket;
import io.netty.handler.codec.MessageToMessageDecoder;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
@Sharable
@ -43,8 +45,15 @@ class KryoDecoderUdpCrypto extends MessageToMessageDecoder<DatagramPacket> {
ChannelHandler last = ctx.pipeline()
.last();
ByteBuf data = in.content();
Object object = serializationManager.readWithCryptoUdp((ConnectionImpl) last, data, data.readableBytes());
out.add(object);
try {
ByteBuf data = in.content();
Object object = serializationManager.readWithCryptoUdp((ConnectionImpl) last, data, data.readableBytes());
out.add(object);
} catch (IOException e) {
String message = "Unable to deserialize object";
LoggerFactory.getLogger(this.getClass()).error(message, e);
throw new IOException(message, e);
}
}
}

View File

@ -15,17 +15,17 @@
*/
package dorkbox.network.pipeline.udp;
import com.esotericsoftware.kryo.KryoException;
import dorkbox.network.connection.EndPoint;
import dorkbox.network.util.CryptoSerializationManager;
import dorkbox.util.exceptions.NetException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramPacket;
import io.netty.handler.codec.MessageToMessageEncoder;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
@ -46,9 +46,8 @@ class KryoEncoderUdp extends MessageToMessageEncoder<Object> {
}
// the crypto writer will override this
@SuppressWarnings("unused")
protected
void writeObject(CryptoSerializationManager serializationManager, ChannelHandlerContext context, Object msg, ByteBuf buffer) {
void writeObject(CryptoSerializationManager serializationManager, ChannelHandlerContext context, Object msg, ByteBuf buffer)
throws IOException {
// no connection here because we haven't created one yet. When we do, we replace this handler with a new one.
serializationManager.write(buffer, msg);
}
@ -63,21 +62,25 @@ class KryoEncoderUdp extends MessageToMessageEncoder<Object> {
// no size info, since this is UDP, it is not segmented
writeObject(this.serializationManager, ctx, msg, outBuffer);
// have to check to see if we are too big for UDP!
if (outBuffer.readableBytes() > EndPoint.udpMaxSize) {
System.err.println("Object larger than MAX udp size! " + EndPoint.udpMaxSize + "/" + outBuffer.readableBytes());
throw new NetException("Object is TOO BIG FOR UDP! " + msg.toString() + " (" + EndPoint.udpMaxSize + "/" +
outBuffer.readableBytes() + ")");
String message = "Object is TOO BIG FOR UDP! " + msg.toString() + " (" + EndPoint.udpMaxSize + "/" +
outBuffer.readableBytes() + ")";
LoggerFactory.getLogger(this.getClass()).error(message);
throw new IOException(message);
}
DatagramPacket packet = new DatagramPacket(outBuffer,
(InetSocketAddress) ctx.channel()
.remoteAddress());
out.add(packet);
} catch (KryoException ex) {
throw new NetException("Unable to serialize object of type: " + msg.getClass()
.getName(), ex);
} catch (Exception e) {
String message = "Unable to serialize object of type: " + msg.getClass()
.getName();
LoggerFactory.getLogger(this.getClass()).error(message, e);
throw new IOException(message, e);
}
}
}

View File

@ -22,6 +22,8 @@ import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import java.io.IOException;
@Sharable
public
class KryoEncoderUdpCrypto extends KryoEncoderUdp {
@ -32,8 +34,8 @@ class KryoEncoderUdpCrypto extends KryoEncoderUdp {
}
@Override
protected
void writeObject(CryptoSerializationManager serializationManager, ChannelHandlerContext ctx, Object msg, ByteBuf buffer) {
void writeObject(CryptoSerializationManager serializationManager, ChannelHandlerContext ctx, Object msg, ByteBuf buffer)
throws IOException {
ChannelHandler last = ctx.pipeline()
.last();

View File

@ -35,6 +35,7 @@
package dorkbox.network.rmi;
import com.esotericsoftware.reflectasm.MethodAccess;
import dorkbox.network.connection.Connection;
import java.lang.reflect.InvocationTargetException;
@ -45,9 +46,18 @@ class AsmCachedMethod extends CachedMethod {
@Override
public
Object invoke(Object target, Object[] args) throws IllegalAccessException, InvocationTargetException {
Object invoke(final Connection connection, Object target, Object[] args) throws IllegalAccessException, InvocationTargetException {
try {
return this.methodAccess.invoke(target, this.methodAccessIndex, args);
if (origMethod == null) {
return this.methodAccess.invoke(target, this.methodAccessIndex, args);
} else {
int length = args.length;
Object[] newArgs = new Object[length + 1];
newArgs[0] = connection;
System.arraycopy(args, 0, newArgs, 1, length);
return this.methodAccess.invoke(target, this.methodAccessIndex, newArgs);
}
} catch (Exception ex) {
throw new InvocationTargetException(ex);
}

View File

@ -34,22 +34,259 @@
*/
package dorkbox.network.rmi;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.util.Util;
import com.esotericsoftware.reflectasm.MethodAccess;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.EndPoint;
import dorkbox.util.ClassHelper;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
public
class CachedMethod {
private static final Map<Class<?>, CachedMethod[]> methodCache = new ConcurrentHashMap<Class<?>, CachedMethod[]>(EndPoint.DEFAULT_THREAD_POOL_SIZE);
private static final Map<Class<?>, Class<?>> overriddenMethods = new HashMap<Class<?>, Class<?>>();
// type will be likely be the interface
public static
CachedMethod[] getMethods(final Kryo kryo, final Class<?> type) {
CachedMethod[] cachedMethods = methodCache.get(type);
if (cachedMethods != null) {
return cachedMethods;
}
// race-conditions are OK, because we just recreate the same thing.
ArrayList<Method> methods = getMethods(type);
// In situations where we want to pass in the Connection (to an RMI method), we have to be able to override method A, with method B.
// This is to support calling RMI methods from an interface (that does pass the connection reference) to
// an implType, that DOES pass the connection reference. The remote side (that initiates the RMI calls), MUST use
// the interface, and the implType may override the method, so that we add the connection as the first in
// the list of parameters.
//
// for example:
// Interface: foo(String x)
// Impl: foo(Connection c, String x)
//
// The implType (if it exists, with the same name, and with the same signature+connection) will be called from the interface.
// This MUST hold valid for both remote and local connection types.
// To facilitate this functionality, for methods with the same name, the "overriding" method is the one that inherits the Connection
// interface as the first parameter.
Map<Method, Method> overriddenMethods = getOverriddenMethods(type, methods);
MethodAccess methodAccess = null;
if (kryo.getAsmEnabled() && !Util.isAndroid && Modifier.isPublic(type.getModifiers())) {
methodAccess = MethodAccess.get(type);
}
int n = methods.size();
cachedMethods = new CachedMethod[n];
for (int i = 0; i < n; i++) {
Method origMethod = methods.get(i);
Method method = origMethod;
MethodAccess localMethodAccess = methodAccess;
Class<?>[] parameterTypes = method.getParameterTypes();
Class<?>[] asmParameterTypes = parameterTypes;
Method overriddenMethod = overriddenMethods.remove(method);
boolean overridden = overriddenMethod != null;
if (overridden) {
// we can override the details of this method BECAUSE (and only because) our kryo registration override will return
// the correct object for this overridden method to be called on.
method = overriddenMethod;
Class<?> overrideType = method.getDeclaringClass();
if (kryo.getAsmEnabled() && !Util.isAndroid && Modifier.isPublic(overrideType.getModifiers())) {
localMethodAccess = MethodAccess.get(overrideType);
asmParameterTypes = method.getParameterTypes();
}
}
CachedMethod cachedMethod = null;
if (localMethodAccess != null) {
try {
AsmCachedMethod asmCachedMethod = new AsmCachedMethod();
asmCachedMethod.methodAccessIndex = localMethodAccess.getIndex(method.getName(), asmParameterTypes);
asmCachedMethod.methodAccess = localMethodAccess;
cachedMethod = asmCachedMethod;
} catch (RuntimeException ignored) {
ignored.printStackTrace();
}
}
if (cachedMethod == null) {
cachedMethod = new CachedMethod();
}
cachedMethod.method = method;
cachedMethod.origMethod = origMethod;
cachedMethod.methodClassID = kryo.getRegistration(method.getDeclaringClass()).getId();
cachedMethod.methodIndex = i;
// Store the serializer for each final parameter.
// ONLY for the ORIGINAL method, not he overridden one.
cachedMethod.serializers = new Serializer<?>[parameterTypes.length];
for (int ii = 0, nn = parameterTypes.length; ii < nn; ii++) {
if (kryo.isFinal(parameterTypes[ii])) {
cachedMethod.serializers[ii] = kryo.getSerializer(parameterTypes[ii]);
}
}
cachedMethods[i] = cachedMethod;
}
methodCache.put(type, cachedMethods);
return cachedMethods;
}
private static
Map<Method, Method> getOverriddenMethods(final Class<?> type, final ArrayList<Method> origMethods) {
final Class<?> implType = overriddenMethods.get(type);
if (implType != null) {
ArrayList<Method> implMethods = getMethods(implType);
HashMap<Method, Method> overrideMap = new HashMap<Method, Method>(implMethods.size());
for (Method origMethod : origMethods) {
String name = origMethod.getName();
Class<?>[] types = origMethod.getParameterTypes();
int modLength = types.length + 1;
METHOD_CHECK:
for (Method implMethod : implMethods) {
String checkName = implMethod.getName();
Class<?>[] checkTypes = implMethod.getParameterTypes();
int checkLength = checkTypes.length;
if (modLength != checkLength || !(name.equals(checkName))) {
continue;
}
// checkLength > 0
Class<?> checkType = checkTypes[0];
if (ClassHelper.hasInterface(dorkbox.network.connection.Connection.class, checkType)) {
// now we check to see if our "check" method is equal to our "cached" method + Connection
for (int k = 1; k < checkLength; k++) {
if (types[k-1] == checkTypes[k]) {
overrideMap.put(origMethod, implMethod);
break METHOD_CHECK;
}
}
}
}
}
return overrideMap;
} else {
return new HashMap<Method, Method>(0);
}
}
private static
ArrayList<Method> getMethods(final Class<?> type) {
ArrayList<Method> allMethods = new ArrayList<Method>();
Class<?> nextClass = type;
while (nextClass != null) {
Collections.addAll(allMethods, nextClass.getDeclaredMethods());
nextClass = nextClass.getSuperclass();
if (nextClass == Object.class) {
break;
}
}
ArrayList<Method> methods = new ArrayList<Method>(Math.max(1, allMethods.size()));
for (int i = 0, n = allMethods.size(); i < n; i++) {
Method method = allMethods.get(i);
int modifiers = method.getModifiers();
if (Modifier.isStatic(modifiers)) {
continue;
}
if (Modifier.isPrivate(modifiers)) {
continue;
}
if (method.isSynthetic()) {
continue;
}
methods.add(method);
}
Collections.sort(methods, new Comparator<Method>() {
@Override
public
int compare(Method o1, Method o2) {
// Methods are sorted so they can be represented as an index.
int diff = o1.getName()
.compareTo(o2.getName());
if (diff != 0) {
return diff;
}
Class<?>[] argTypes1 = o1.getParameterTypes();
Class<?>[] argTypes2 = o2.getParameterTypes();
if (argTypes1.length > argTypes2.length) {
return 1;
}
if (argTypes1.length < argTypes2.length) {
return -1;
}
for (int i = 0; i < argTypes1.length; i++) {
diff = argTypes1[i].getName()
.compareTo(argTypes2[i].getName());
if (diff != 0) {
return diff;
}
}
throw new RuntimeException("Two methods with same signature!"); // Impossible.
}
});
return methods;
}
public Method method;
public int methodClassID;
public int methodIndex;
/**
* in some cases, we want to override the cached method, with one that supports passing 'Connection' as the first argument.
* This is completely OPTIONAL, however - greatly adds functionality to RMI methods.
*/
public transient Method origMethod;
@SuppressWarnings("rawtypes")
public Serializer[] serializers;
public
Object invoke(Object target, Object[] args) throws IllegalAccessException, InvocationTargetException {
return this.method.invoke(target, args);
Object invoke(final Connection connection, Object target, Object[] args) throws IllegalAccessException, InvocationTargetException {
// did we override our cached method?
if (origMethod == null) {
return this.method.invoke(target, args);
} else {
int length = args.length;
Object[] newArgs = new Object[length + 1];
newArgs[0] = connection;
System.arraycopy(args, 0, newArgs, 1, length);
return this.method.invoke(target, newArgs);
}
}
/**
* Called by the SerializationManager, so that RMI classes that are overridden for serialization purposes, can check to see if
* certain methods need to be overridden.
*/
public static
void registerOverridden(final Class<?> ifaceClass, final Class<?> implClass) {
overriddenMethods.put(ifaceClass, implClass);
}
}

View File

@ -47,7 +47,6 @@ class InvokeMethod implements RmiMessages {
// possible duplicate IDs. A response data of 0 means to not respond.
public byte responseData;
public InvokeMethod() {
}
}

View File

@ -1,27 +0,0 @@
/*
* Copyright 2010 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.rmi;
import dorkbox.util.objectPool.PoolableObject;
public
class InvokeMethodPoolable implements PoolableObject<InvokeMethod> {
@Override
public
InvokeMethod create() {
return new InvokeMethod();
}
}

View File

@ -39,7 +39,6 @@ import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import dorkbox.network.connection.KryoExtra;
/**
* Internal message to invoke methods remotely.
@ -61,7 +60,9 @@ class InvokeMethodSerializer extends Serializer<InvokeMethod> {
Serializer[] serializers = object.cachedMethod.serializers;
Object[] args = object.args;
for (int i = 0, n = serializers.length; i < n; i++) {
int i = 0, n = serializers.length;
for (; i < n; i++) {
Serializer serializer = serializers[i];
if (serializer != null) {
kryo.writeObjectOrNull(output, args[i], serializer);
@ -87,15 +88,14 @@ class InvokeMethodSerializer extends Serializer<InvokeMethod> {
byte methodIndex = input.readByte();
try {
KryoExtra kryoExtra = (KryoExtra) kryo;
invokeMethod.cachedMethod = kryoExtra.getMethods(methodClass)[methodIndex];
invokeMethod.cachedMethod = CachedMethod.getMethods(kryo, methodClass)[methodIndex];
} catch (IndexOutOfBoundsException ex) {
throw new KryoException("Invalid method index " + methodIndex + " for class: " + methodClass.getName());
}
Serializer<?>[] serializers = invokeMethod.cachedMethod.serializers;
Class<?>[] parameterTypes = invokeMethod.cachedMethod.method.getParameterTypes();
CachedMethod cachedMethod = invokeMethod.cachedMethod;
Serializer<?>[] serializers = cachedMethod.serializers;
Class<?>[] parameterTypes = cachedMethod.method.getParameterTypes();
Object[] args = new Object[serializers.length];
invokeMethod.args = args;
for (int i = 0, n = args.length; i < n; i++) {

View File

@ -17,8 +17,21 @@ package dorkbox.network.rmi;
import java.lang.annotation.*;
/**
* This specifies to the serializer, that this field is an RMI object.
* <p/>
* Additional behavior of RMI methods, is if there is another method (of the same name and signature), with the addition of a
* Connection parameter in the first position, THAT method will be called instead, an will have the current connection object passed
* into the method.
* <p/>
* It is mandatory for the correct implementation (as per the interface guideline) to exist, and should return null.
* <p/>
* IE: foo(String something)... -> foo(Connection connection, String something)....
*/
@Retention(value = RetentionPolicy.RUNTIME)
@Inherited
@Target(value = {ElementType.FIELD, ElementType.ANNOTATION_TYPE})
@Target(value = {ElementType.FIELD})
public
@interface RemoteProxy {}
@interface RMI {}

View File

@ -35,16 +35,15 @@
package dorkbox.network.rmi;
import com.esotericsoftware.kryo.Kryo;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.EndPoint;
import dorkbox.network.connection.KryoExtra;
import dorkbox.network.connection.ListenerRaw;
import dorkbox.network.util.CryptoSerializationManager;
import dorkbox.util.exceptions.NetException;
import dorkbox.util.objectPool.ObjectPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.Arrays;
@ -82,12 +81,9 @@ class RemoteInvocationHandler implements InvocationHandler {
final InvokeMethodResult[] responseTable = new InvokeMethodResult[64];
final boolean[] pendingResponses = new boolean[64];
private final ObjectPool<InvokeMethod> invokeMethodPool;
public
RemoteInvocationHandler(ObjectPool<InvokeMethod> invokeMethodPool, Connection connection, final int objectID) {
RemoteInvocationHandler(Connection connection, final int objectID) {
super();
this.invokeMethodPool = invokeMethodPool;
this.connection = connection;
this.objectID = objectID;
@ -129,6 +125,7 @@ class RemoteInvocationHandler implements InvocationHandler {
.add(this.responseListener);
}
@SuppressWarnings({"AutoUnboxing", "AutoBoxing"})
@Override
public
Object invoke(Object proxy, Method method, Object[] args) throws Exception {
@ -189,41 +186,62 @@ class RemoteInvocationHandler implements InvocationHandler {
return this.connection;
}
// Should never happen, for debugging purposes only
throw new Exception("Invocation handler could not find RemoteObject method. Check ObjectSpace.java");
throw new Exception("Invocation handler could not find RemoteObject method.");
}
else if (!this.remoteToString && declaringClass == Object.class && method.getName()
.equals("toString")) {
return "<proxy>";
}
final Logger logger1 = RemoteInvocationHandler.logger;
EndPoint endPoint = this.connection.getEndPoint();
EndPoint<Connection> endPoint = this.connection.getEndPoint();
final CryptoSerializationManager serializationManager = endPoint.getSerialization();
InvokeMethod invokeMethod = this.invokeMethodPool.take();
InvokeMethod invokeMethod = new InvokeMethod();
invokeMethod.objectID = this.objectID;
invokeMethod.args = args;
final CryptoSerializationManager serializationManager = endPoint.getSerialization();
// thread safe access.
final KryoExtra kryo = (KryoExtra) serializationManager.take();
final Kryo kryo = serializationManager.take();
if (kryo == null) {
String msg = "Interrupted during kryo pool.take()";
logger.error(msg);
logger1.error(msg);
return msg;
}
CachedMethod[] cachedMethods = kryo.getMethods(method.getDeclaringClass());
// which method do we access?
CachedMethod[] cachedMethods = CachedMethod.getMethods(kryo, method.getDeclaringClass());
serializationManager.release(kryo);
for (int i = 0, n = cachedMethods.length; i < n; i++) {
CachedMethod cachedMethod = cachedMethods[i];
if (cachedMethod.method.equals(method)) {
Method checkMethod = cachedMethod.origMethod;
if (checkMethod == null) {
checkMethod = cachedMethod.method;
}
// In situations where we want to pass in the Connection (to an RMI method), we have to be able to override method A, with method B.
// This is to support calling RMI methods from an interface (that does pass the connection reference) to
// an implementation, that DOES pass the connection reference. The remote side (that initiates the RMI calls), MUST use
// the interface, and the implementation may override the method, so that we add the connection as the first in
// the list of parameters.
//
// for example:
// Interface: foo(String x)
// Impl: foo(Connection c, String x)
//
// The implementation (if it exists, with the same name, and with the same signature+connection) will be called from the interface.
// This MUST hold valid for both remote and local connection types.
if (checkMethod.equals(method)) {
invokeMethod.cachedMethod = cachedMethod;
break;
}
}
if (invokeMethod.cachedMethod == null) {
String msg = "Method not found: " + method;
logger.error(msg);
logger1.error(msg);
return msg;
}
@ -270,21 +288,22 @@ class RemoteInvocationHandler implements InvocationHandler {
.flush();
}
if (logger.isDebugEnabled()) {
if (logger1.isTraceEnabled()) {
String argString = "";
if (args != null) {
argString = Arrays.deepToString(args);
argString = argString.substring(1, argString.length() - 1);
}
logger.debug(this.connection + " sent: " + method.getDeclaringClass()
logger1.trace(this.connection + " sent: " + method.getDeclaringClass()
.getSimpleName() +
"#" + method.getName() + "(" + argString + ")");
}
this.lastResponseID = (byte) (invokeMethod.responseData & RmiBridge.responseIdMask);
this.invokeMethodPool.release(invokeMethod);
if (this.nonBlocking || this.udp) {
if (this.nonBlocking || this.udp || this.udt) {
Class<?> returnType = method.getReturnType();
if (returnType.isPrimitive()) {
if (returnType == int.class) {
@ -334,12 +353,16 @@ class RemoteInvocationHandler implements InvocationHandler {
}
}
/**
* A timeout of 0 means that we want to disable waiting, otherwise - it waits in milliseconds
*/
private
Object waitForResponse(byte responseID) {
Object waitForResponse(byte responseID) throws IOException {
long endTime = System.currentTimeMillis() + this.timeoutMillis;
long remaining = this.timeoutMillis;
while (remaining > 0) {
if (remaining == 0) {
// just wait however log it takes.
InvokeMethodResult invokeMethodResult;
synchronized (this) {
invokeMethodResult = this.responseTable[responseID];
@ -352,18 +375,54 @@ class RemoteInvocationHandler implements InvocationHandler {
else {
this.lock.lock();
try {
this.responseCondition.await(remaining, TimeUnit.MILLISECONDS);
this.responseCondition.await();
} catch (InterruptedException e) {
Thread.currentThread()
.interrupt();
throw new NetException(e);
throw new IOException("Response timed out.", e);
} finally {
this.lock.unlock();
}
}
remaining = endTime - System.currentTimeMillis();
synchronized (this) {
invokeMethodResult = this.responseTable[responseID];
}
if (invokeMethodResult != null) {
this.lastResponseID = null;
return invokeMethodResult.result;
}
}
else {
// wait for the specified time
while (remaining > 0) {
InvokeMethodResult invokeMethodResult;
synchronized (this) {
invokeMethodResult = this.responseTable[responseID];
}
if (invokeMethodResult != null) {
this.lastResponseID = null;
return invokeMethodResult.result;
}
else {
this.lock.lock();
try {
this.responseCondition.await(remaining, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread()
.interrupt();
throw new IOException("Response timed out.", e);
} finally {
this.lock.unlock();
}
}
remaining = endTime - System.currentTimeMillis();
}
}
// only get here if we timeout
throw new TimeoutException("Response timed out.");

View File

@ -44,7 +44,7 @@ import dorkbox.network.connection.Connection;
public
interface RemoteObject {
/**
* Sets the milliseconds to wait for a method to return value. Default is 3000.
* Sets the milliseconds to wait for a method to return value. Default is 3000, 0 disables (ie: waits forever)
*/
void setResponseTimeout(int timeoutMillis);

View File

@ -40,7 +40,6 @@ import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import dorkbox.network.connection.ConnectionImpl;
import dorkbox.network.connection.KryoExtra;
import dorkbox.util.exceptions.NetException;
/**
* Serializes an object registered with the RmiBridge so the receiving side
@ -62,7 +61,7 @@ class RemoteObjectSerializer<T> extends Serializer<T> {
KryoExtra kryoExtra = (KryoExtra) kryo;
int id = kryoExtra.connection.getRegisteredId(object);
if (id == Integer.MAX_VALUE) {
throw new NetException("Object not found in an ObjectSpace: " + object);
throw new RuntimeException("Object not found in RMI objectSpace: " + object);
}
output.writeInt(id, true);

View File

@ -34,6 +34,15 @@
*/
package dorkbox.network.rmi;
import com.esotericsoftware.kryo.util.IntMap;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.ConnectionImpl;
import dorkbox.network.connection.EndPoint;
import dorkbox.network.connection.ListenerRaw;
import dorkbox.util.collections.ObjectIntMap;
import org.slf4j.Logger;
import java.io.IOException;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.concurrent.Executor;
@ -42,29 +51,31 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.slf4j.Logger;
import com.esotericsoftware.kryo.util.IntMap;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.ConnectionImpl;
import dorkbox.network.connection.EndPoint;
import dorkbox.network.connection.ListenerRaw;
import dorkbox.util.collections.ObjectIntMap;
import dorkbox.util.exceptions.NetException;
import dorkbox.util.objectPool.ObjectPool;
import dorkbox.util.objectPool.ObjectPoolFactory;
/**
* Allows methods on objects to be invoked remotely over TCP, UDP, or UDT. Objects are
* {@link dorkbox.network.util.RMISerializationManager#registerRemote(Class, Class)}, and endpoint connections
* can then {@link Connection#createRemoteObject(Class, Class)} for the registered objects.
* can then {@link Connection#createRemoteObject(Class)} for the registered objects.
* <p/>
* It costs at least 2 bytes more to use remote method invocation than just
* sending the parameters. If the method has a return value which is not
* {@link RemoteObject#setNonBlocking(boolean) ignored}, an extra byte is
* written. If the type of a parameter is not final (note that primitives are final)
* then an extra byte is written for that parameter.
* <p/>
* <p/>
* In situations where we want to pass in the Connection (to an RMI method), we have to be able to override method A, with method B.
* <p/>
* This is to support calling RMI methods from an interface (that does pass the connection reference) to
* an implementation, that DOES pass the connection reference. The remote side (that initiates the RMI calls), MUST use
* the interface, and the implementation may override the method, so that we add the connection as the first in
* the list of parameters.
* <p/>
* for example:
* Interface: foo(String x)
* Impl: foo(Connection c, String x)
* <p/>
* The implementation (if it exists, with the same name, and with the same signature+connection) will be called from the interface.
* This MUST hold valid for both remote and local connection types.
*
* @author Nathan Sweet <misc@n4te.com>, Nathan Robinson
*/
@ -101,16 +112,16 @@ class RmiBridge {
private final Executor executor;
// 4096 concurrent method invocations max
private static final ObjectPool<InvokeMethod> invokeMethodPool = ObjectPoolFactory.create(new InvokeMethodPoolable(), 4096);
private final ListenerRaw<ConnectionImpl, InvokeMethod> invokeListener = new ListenerRaw<ConnectionImpl, InvokeMethod>() {
@SuppressWarnings("AutoBoxing")
@Override
public
void received(final ConnectionImpl connection, final InvokeMethod invokeMethod) {
int objectID = invokeMethod.objectID;
// have to make sure to get the correct object (global vs local)
// This is what is overridden when registering interfaces/classes for RMI.
// objectID is the interface ID, and this returns the implementation ID.
final Object target = connection.getRegisteredObject(objectID);
if (target == null) {
@ -124,20 +135,60 @@ class RmiBridge {
Executor executor2 = RmiBridge.this.executor;
if (executor2 == null) {
invoke(connection, target, invokeMethod);
try {
invoke(connection, target, invokeMethod);
} catch (IOException e) {
logger.error("Unable to invoke method.", e);
}
}
else {
executor2.execute(new Runnable() {
@Override
public
void run() {
invoke(connection, target, invokeMethod);
try {
invoke(connection, target, invokeMethod);
} catch (IOException e) {
logger.error("Unable to invoke method.", e);
}
}
});
}
}
};
//for (int i = 0; i < cachedMethods.length; i++) {
// Method cachedMethod = cachedMethods[i].method;
// String name = cachedMethod.getName();
// Class<?>[] types = cachedMethod.getParameterTypes();
// int modLength = types.length + 1;
//
// for (int j = i+1; j < cachedMethods.length; j++) {
// Method checkMethod = cachedMethods[j].method;
// String checkName = checkMethod.getName();
// Class<?>[] checkTypes = cachedMethod.getParameterTypes();
// int checkLength = checkTypes.length;
//
// if (modLength != checkLength || !(name.equals(checkName))) {
// break;
// }
//
// // checkLength > 0
// Class<?> checkType = checkTypes[0];
// if (!checkType.isAssignableFrom(com.sun.jdi.connect.spi.Connection.class)) {
// break;
// }
//
// // now we check to see if our "check" method is equal to our "cached" method + Connection
//
// }
//}
//
/**
* Creates an RmiBridge with no connections. Connections must be
* {@link RmiBridge#register(int, Object)} added to allow the remote end of
@ -179,7 +230,9 @@ class RmiBridge {
* @param connection The remote side of this connection requested the invocation.
*/
protected
void invoke(Connection connection, Object target, InvokeMethod invokeMethod) {
void invoke(final Connection connection, final Object target, final InvokeMethod invokeMethod) throws IOException {
CachedMethod cachedMethod = invokeMethod.cachedMethod;
Logger logger2 = this.logger;
if (logger2.isDebugEnabled()) {
String argString = "";
@ -197,29 +250,25 @@ class RmiBridge {
stringBuilder.append(":")
.append(invokeMethod.objectID);
stringBuilder.append("#")
.append(invokeMethod.cachedMethod.method.getName());
.append(cachedMethod.method.getName());
stringBuilder.append("(")
.append(argString)
.append(")");
if (cachedMethod.origMethod != null) {
stringBuilder.append(" [Connection param override]");
}
logger2.debug(stringBuilder.toString());
}
byte responseData = invokeMethod.responseData;
boolean transmitReturnVal = (responseData & returnValueMask) == returnValueMask;
boolean transmitExceptions = (responseData & returnExceptionMask) == returnExceptionMask;
int responseID = responseData & responseIdMask;
Object result;
CachedMethod cachedMethod = invokeMethod.cachedMethod;
// we have to provide access to the connection (since the RMI-server is generally going to keep the state in
// the connection object.
connection.getEndPoint()
.setCurrentConnection(connection);
try {
result = cachedMethod.invoke(target, invokeMethod.args);
result = cachedMethod.invoke(connection, target, invokeMethod.args);
} catch (Exception ex) {
if (transmitExceptions) {
Throwable cause = ex.getCause();
@ -236,8 +285,8 @@ class RmiBridge {
result = cause;
}
else {
throw new NetException("Error invoking method: " + cachedMethod.method.getDeclaringClass()
.getName() + "." + cachedMethod.method.getName(), ex);
throw new IOException("Error invoking method: " + cachedMethod.method.getDeclaringClass()
.getName() + "." + cachedMethod.method.getName(), ex);
}
}
@ -275,7 +324,7 @@ class RmiBridge {
int value = rmiObjectIdCounter.getAndAdd(2);
if (value > MAX_RMI_VALUE) {
rmiObjectIdCounter.set(MAX_RMI_VALUE); // prevent wrapping by spammy callers
throw new NetException("RMI next value has exceeded maximum limits.");
logger.error("RMI next value has exceeded maximum limits in RmiBridge!");
}
return value;
}
@ -286,6 +335,7 @@ class RmiBridge {
*
* @param objectID Must not be Integer.MAX_VALUE.
*/
@SuppressWarnings("AutoBoxing")
public
void register(int objectID, Object object) {
if (objectID == Integer.MAX_VALUE) {
@ -307,12 +357,12 @@ class RmiBridge {
if (logger2.isTraceEnabled()) {
logger2.trace("Object registered with ObjectSpace as {}:{}", objectID, object);
}
logger2.info("Object registered with ObjectSpace as {}:{}", objectID, object);
}
/**
* Removes an object. The remote end of the RmiBridge connection will no longer be able to access it.
*/
@SuppressWarnings("AutoBoxing")
public
void remove(int objectID) {
WriteLock writeLock = RmiBridge.this.objectLock.writeLock();
@ -334,6 +384,7 @@ class RmiBridge {
/**
* Removes an object. The remote end of the RmiBridge connection will no longer be able to access it.
*/
@SuppressWarnings("AutoBoxing")
public
void remove(Object object) {
WriteLock writeLock = RmiBridge.this.objectLock.writeLock();
@ -395,7 +446,7 @@ class RmiBridge {
return (RemoteObject) Proxy.newProxyInstance(RmiBridge.class.getClassLoader(),
temp,
new RemoteInvocationHandler(invokeMethodPool, connection, objectID));
new RemoteInvocationHandler(connection, objectID));
}
/**

View File

@ -34,13 +34,13 @@
*/
package dorkbox.network.rmi;
import dorkbox.util.exceptions.NetException;
import java.io.IOException;
/** Thrown when a method with a return value is invoked on a remote object and the response is not received with the
* {@link RemoteObject#setResponseTimeout(int) response timeout}.
* @see dorkbox.network.connection.Connection#createRemoteObject(Class, Class)
* @author Nathan Sweet <misc@n4te.com> */
public class TimeoutException extends NetException {
public class TimeoutException extends IOException {
private static final long serialVersionUID = -3526277240277423682L;
public TimeoutException () {

View File

@ -19,6 +19,8 @@ import dorkbox.network.connection.ConnectionImpl;
import dorkbox.util.SerializationManager;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
/**
* Threads reading/writing, it messes up a single instance.
* it is possible to use a single kryo with the use of synchronize, however - that defeats the point of multi-threaded
@ -27,24 +29,18 @@ public
interface CryptoSerializationManager extends SerializationManager, RMISerializationManager {
/**
* Determines if this buffer is encrypted or not.
* Waits until a kryo is available to write, using CAS operations to prevent having to synchronize.
* <p/>
* There is a small speed penalty if there were no kryo's available to use.
*/
boolean isEncrypted(ByteBuf buffer);
void writeWithCryptoTcp(ConnectionImpl connection, ByteBuf buffer, Object message) throws IOException;
/**
* Waits until a kryo is available to write, using CAS operations to prevent having to synchronize.
* <p/>
* There is a small speed penalty if there were no kryo's available to use.
*/
void writeWithCryptoTcp(ConnectionImpl connection, ByteBuf buffer, Object message);
/**
* Waits until a kryo is available to write, using CAS operations to prevent having to synchronize.
* <p/>
* There is a small speed penalty if there were no kryo's available to use.
*/
void writeWithCryptoUdp(ConnectionImpl connection, ByteBuf buffer, Object message);
void writeWithCryptoUdp(ConnectionImpl connection, ByteBuf buffer, Object message) throws IOException;
/**
* Reads an object from the buffer.
@ -54,7 +50,7 @@ interface CryptoSerializationManager extends SerializationManager, RMISerializat
* @param connection can be NULL
* @param length should ALWAYS be the length of the expected object!
*/
Object readWithCryptoTcp(ConnectionImpl connection, ByteBuf buffer, int length);
Object readWithCryptoTcp(ConnectionImpl connection, ByteBuf buffer, int length) throws IOException;
/**
* Reads an object from the buffer.
@ -64,5 +60,5 @@ interface CryptoSerializationManager extends SerializationManager, RMISerializat
* @param connection can be NULL
* @param length should ALWAYS be the length of the expected object!
*/
Object readWithCryptoUdp(ConnectionImpl connection, ByteBuf buffer, int length);
Object readWithCryptoUdp(ConnectionImpl connection, ByteBuf buffer, int length) throws IOException;
}

View File

@ -28,7 +28,7 @@ public class ChunkedDataIdleTest extends BaseTest {
// have to test sending objects
@Test
public void ObjectSender() throws InitializationException, SecurityException, IOException {
public void ObjectSender() throws InitializationException, SecurityException, IOException, InterruptedException {
KryoCryptoSerializationManager.DEFAULT = KryoCryptoSerializationManager.DEFAULT();
register(KryoCryptoSerializationManager.DEFAULT);
@ -62,7 +62,7 @@ public class ChunkedDataIdleTest extends BaseTest {
private void sendObject(final Data mainData, Configuration configuration, final ConnectionType type)
throws InitializationException, SecurityException, IOException {
throws InitializationException, SecurityException, IOException, InterruptedException {
Server server = new Server(configuration);
server.disableRemoteKeyValidation();

View File

@ -21,7 +21,7 @@ class ClientSendTest extends BaseTest {
@Test
public
void sendDataFromClientClass() throws InitializationException, SecurityException, IOException {
void sendDataFromClientClass() throws InitializationException, SecurityException, IOException, InterruptedException {
KryoCryptoSerializationManager.DEFAULT = KryoCryptoSerializationManager.DEFAULT();
register(KryoCryptoSerializationManager.DEFAULT);

View File

@ -20,7 +20,7 @@ class ConnectionTest extends BaseTest {
@Test
public
void connectLocal() throws InitializationException, SecurityException, IOException {
void connectLocal() throws InitializationException, SecurityException, IOException, InterruptedException {
KryoCryptoSerializationManager.DEFAULT = KryoCryptoSerializationManager.DEFAULT();
register(KryoCryptoSerializationManager.DEFAULT);
@ -37,7 +37,7 @@ class ConnectionTest extends BaseTest {
@Test
public
void connectTcp() throws InitializationException, SecurityException, IOException {
void connectTcp() throws InitializationException, SecurityException, IOException, InterruptedException {
KryoCryptoSerializationManager.DEFAULT = KryoCryptoSerializationManager.DEFAULT();
register(KryoCryptoSerializationManager.DEFAULT);
@ -56,7 +56,7 @@ class ConnectionTest extends BaseTest {
@Test
public
void connectTcpUdp() throws InitializationException, SecurityException, IOException {
void connectTcpUdp() throws InitializationException, SecurityException, IOException, InterruptedException {
KryoCryptoSerializationManager.DEFAULT = KryoCryptoSerializationManager.DEFAULT();
register(KryoCryptoSerializationManager.DEFAULT);
@ -76,7 +76,7 @@ class ConnectionTest extends BaseTest {
@Test
public
void connectTcpUdt() throws InitializationException, SecurityException, IOException {
void connectTcpUdt() throws InitializationException, SecurityException, IOException, InterruptedException {
KryoCryptoSerializationManager.DEFAULT = KryoCryptoSerializationManager.DEFAULT();
register(KryoCryptoSerializationManager.DEFAULT);
@ -96,7 +96,7 @@ class ConnectionTest extends BaseTest {
@Test
public
void connectTcpUdpUdt() throws InitializationException, SecurityException, IOException {
void connectTcpUdpUdt() throws InitializationException, SecurityException, IOException, InterruptedException {
KryoCryptoSerializationManager.DEFAULT = KryoCryptoSerializationManager.DEFAULT();
register(KryoCryptoSerializationManager.DEFAULT);
@ -151,7 +151,7 @@ class ConnectionTest extends BaseTest {
}
private
Client startClient(Configuration configuration) throws InitializationException, SecurityException, IOException {
Client startClient(Configuration configuration) throws InitializationException, SecurityException, IOException, InterruptedException {
Client client;
if (configuration != null) {
client = new Client(configuration);

View File

@ -17,7 +17,7 @@ class DiscoverHostTest extends BaseTest {
@Test
public
void broadcast() throws InitializationException, SecurityException, IOException {
void broadcast() throws InitializationException, SecurityException, IOException, InterruptedException {
Configuration configuration = new Configuration();
configuration.tcpPort = tcpPort;

View File

@ -32,7 +32,7 @@ class IdleTest extends BaseTest {
@Test
public
void InputStreamSender() throws InitializationException, SecurityException, IOException {
void InputStreamSender() throws InitializationException, SecurityException, IOException, InterruptedException {
KryoCryptoSerializationManager.DEFAULT = KryoCryptoSerializationManager.DEFAULT(false, false);
final int largeDataSize = 12345;
@ -65,7 +65,7 @@ class IdleTest extends BaseTest {
// have to test sending objects
@Test
public
void ObjectSender() throws InitializationException, SecurityException, IOException {
void ObjectSender() throws InitializationException, SecurityException, IOException, InterruptedException {
KryoCryptoSerializationManager.DEFAULT = KryoCryptoSerializationManager.DEFAULT();
register(KryoCryptoSerializationManager.DEFAULT);
@ -100,7 +100,7 @@ class IdleTest extends BaseTest {
private
void sendObject(final Data mainData, Configuration configuration, final ConnectionType type)
throws InitializationException, SecurityException, IOException {
throws InitializationException, SecurityException, IOException, InterruptedException {
Server server = new Server(configuration);
server.disableRemoteKeyValidation();
@ -160,7 +160,7 @@ class IdleTest extends BaseTest {
private
void streamSpecificType(final int largeDataSize, Configuration configuration, final ConnectionType type)
throws InitializationException, SecurityException, IOException {
throws InitializationException, SecurityException, IOException, InterruptedException {
Server server = new Server(configuration);
server.disableRemoteKeyValidation();
addEndPoint(server);

View File

@ -25,7 +25,7 @@ class LargeBufferTest extends BaseTest {
@Test
public
void manyLargeMessages() throws InitializationException, SecurityException, IOException {
void manyLargeMessages() throws InitializationException, SecurityException, IOException, InterruptedException {
KryoCryptoSerializationManager.DEFAULT = KryoCryptoSerializationManager.DEFAULT();
register(KryoCryptoSerializationManager.DEFAULT);

View File

@ -61,7 +61,7 @@ class ListenerTest extends BaseTest {
@SuppressWarnings("rawtypes")
@Test
public
void listener() throws SecurityException, InitializationException, IOException {
void listener() throws SecurityException, InitializationException, IOException, InterruptedException {
Configuration configuration = new Configuration();
configuration.tcpPort = tcpPort;
configuration.host = host;

View File

@ -18,7 +18,7 @@ class MultipleServerTest extends BaseTest {
@Test
public
void multipleServers() throws InitializationException, SecurityException, IOException {
void multipleServers() throws InitializationException, SecurityException, IOException, InterruptedException {
KryoCryptoSerializationManager.DEFAULT = KryoCryptoSerializationManager.DEFAULT();
KryoCryptoSerializationManager.DEFAULT.register(String[].class);

View File

@ -37,7 +37,7 @@ class MultipleThreadTest extends BaseTest {
@Test
public
void multipleThreads() throws InitializationException, SecurityException, IOException {
void multipleThreads() throws InitializationException, SecurityException, IOException, InterruptedException {
KryoCryptoSerializationManager.DEFAULT = KryoCryptoSerializationManager.DEFAULT();
KryoCryptoSerializationManager.DEFAULT.register(String[].class);
KryoCryptoSerializationManager.DEFAULT.register(DataClass.class);

View File

@ -2,6 +2,7 @@ package dorkbox.network;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.KryoCryptoSerializationManager;
import dorkbox.network.connection.Listener;
import dorkbox.network.util.CryptoSerializationManager;
import dorkbox.util.exceptions.InitializationException;
@ -19,7 +20,10 @@ public class PingPongLocalTest extends BaseTest {
int tries = 10000;
@Test
public void pingPongLocal() throws InitializationException, SecurityException, IOException {
public void pingPongLocal() throws InitializationException, SecurityException, IOException, InterruptedException {
KryoCryptoSerializationManager.DEFAULT = KryoCryptoSerializationManager.DEFAULT();
register(KryoCryptoSerializationManager.DEFAULT);
this.fail = "Data not received.";
final Data dataLOCAL = new Data();
@ -28,7 +32,6 @@ public class PingPongLocalTest extends BaseTest {
Server server = new Server();
server.disableRemoteKeyValidation();
addEndPoint(server);
register(server.getSerialization());
server.bind(false);
server.listeners().add(new Listener<Data>() {
@Override
@ -52,7 +55,6 @@ public class PingPongLocalTest extends BaseTest {
Client client = new Client();
client.disableRemoteKeyValidation();
addEndPoint(client);
register(client.getSerialization());
client.listeners().add(new Listener<Data>() {
AtomicInteger check = new AtomicInteger(0);

View File

@ -30,7 +30,7 @@ class PingPongTest extends BaseTest {
@Test
public
void pingPong() throws InitializationException, SecurityException, IOException {
void pingPong() throws InitializationException, SecurityException, IOException, InterruptedException {
KryoCryptoSerializationManager.DEFAULT = KryoCryptoSerializationManager.DEFAULT();
register(KryoCryptoSerializationManager.DEFAULT);

View File

@ -21,7 +21,7 @@ class PingTest extends BaseTest {
// ping prefers the following order: UDP, UDT, TCP
@Test
public
void pingTCP() throws InitializationException, SecurityException, IOException {
void pingTCP() throws InitializationException, SecurityException, IOException, InterruptedException {
this.response = -1;
Configuration configuration = new Configuration();
@ -57,7 +57,7 @@ class PingTest extends BaseTest {
@Test
public
void pingTCP_testListeners1() throws InitializationException, SecurityException, IOException {
void pingTCP_testListeners1() throws InitializationException, SecurityException, IOException, InterruptedException {
this.response = -1;
Configuration configuration = new Configuration();
@ -114,7 +114,7 @@ class PingTest extends BaseTest {
@Test
public
void pingTCP_testListeners2() throws InitializationException, SecurityException, IOException {
void pingTCP_testListeners2() throws InitializationException, SecurityException, IOException, InterruptedException {
this.response = -1;
Configuration configuration = new Configuration();
@ -163,7 +163,7 @@ class PingTest extends BaseTest {
// ping prefers the following order: UDP, UDT, TCP
@Test
public
void pingUDP() throws InitializationException, SecurityException, IOException {
void pingUDP() throws InitializationException, SecurityException, IOException, InterruptedException {
this.response = -1;
Configuration configuration = new Configuration();
@ -204,7 +204,7 @@ class PingTest extends BaseTest {
// ping prefers the following order: UDP, UDT, TCP
@Test
public
void pingUDT() throws InitializationException, SecurityException, IOException {
void pingUDT() throws InitializationException, SecurityException, IOException, InterruptedException {
this.response = -1;
Configuration configuration = new Configuration();

View File

@ -19,7 +19,7 @@ class ReconnectTest extends BaseTest {
@Test
public
void reconnect() throws InitializationException, SecurityException, IOException {
void reconnect() throws InitializationException, SecurityException, IOException, InterruptedException {
final Timer timer = new Timer();
Configuration configuration = new Configuration();
@ -67,7 +67,11 @@ class ReconnectTest extends BaseTest {
public
void run() {
System.out.println("Reconnecting: " + reconnectCount.get());
client.reconnect();
try {
client.reconnect();
} catch (IOException e) {
e.printStackTrace();
}
}
}.start();
}

View File

@ -19,7 +19,7 @@ class ReuseTest extends BaseTest {
@Test
public
void socketReuse() throws InitializationException, SecurityException, IOException {
void socketReuse() throws InitializationException, SecurityException, IOException, InterruptedException {
this.serverCount = new AtomicInteger(0);
this.clientCount = new AtomicInteger(0);
@ -99,7 +99,7 @@ class ReuseTest extends BaseTest {
@Test
public
void localReuse() throws InitializationException, SecurityException, IOException {
void localReuse() throws InitializationException, SecurityException, IOException, InterruptedException {
this.serverCount = new AtomicInteger(0);
this.clientCount = new AtomicInteger(0);

View File

@ -26,7 +26,7 @@ class UnregisteredClassTest extends BaseTest {
@Test
public
void unregisteredClasses() throws InitializationException, SecurityException, IOException {
void unregisteredClasses() throws InitializationException, SecurityException, IOException, InterruptedException {
KryoCryptoSerializationManager.DEFAULT = KryoCryptoSerializationManager.DEFAULT(false, false);
int origSize = EndPoint.udpMaxSize;

View File

@ -17,8 +17,7 @@ import org.junit.Test;
import java.io.IOException;
import java.io.Serializable;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.*;
public
class RmiGlobalTest extends BaseTest {
@ -26,8 +25,8 @@ class RmiGlobalTest extends BaseTest {
private int CLIENT_GLOBAL_OBJECT_ID = 0;
private int SERVER_GLOBAL_OBJECT_ID = 0;
private TestObject globalRemoteServerObject = new TestObjectImpl();
private TestObject globalRemoteClientObject = new TestObjectImpl();
private final TestObject globalRemoteServerObject = new TestObjectImpl();
private final TestObject globalRemoteClientObject = new TestObjectImpl();
private static
void runTest(final Connection connection, final Object remoteObject, final int remoteObjectID) {
@ -35,107 +34,112 @@ class RmiGlobalTest extends BaseTest {
@Override
public
void run() {
TestObject test = connection.getRemoteObject(remoteObjectID);
System.err.println("Starting test for: " + remoteObjectID);
//TestObject test = connection.getRemoteObject(id, TestObject.class);
assertEquals(remoteObject.hashCode(), test.hashCode());
RemoteObject remoteObject = (RemoteObject) test;
// Default behavior. RMI is transparent, method calls behave like normal
// (return values and exceptions are returned, call is synchronous)
System.err.println("hashCode: " + test.hashCode());
System.err.println("toString: " + test);
test.moo();
test.moo("Cow");
assertEquals(remoteObjectID, test.id());
// UDP calls that ignore the return value
remoteObject.setUDP(true);
test.moo("Meow");
assertEquals(0, test.id());
remoteObject.setUDP(false);
// Test that RMI correctly waits for the remotely invoked method to exit
remoteObject.setResponseTimeout(5000);
test.moo("You should see this two seconds before...", 2000);
System.out.println("...This");
remoteObject.setResponseTimeout(3000);
// Try exception handling
boolean caught = false;
try {
TestObject test = connection.getRemoteObject(remoteObjectID);
System.err.println("Starting test for: " + remoteObjectID);
//TestObject test = connection.getRemoteObject(id, TestObject.class);
assertEquals(remoteObject.hashCode(), test.hashCode());
RemoteObject remoteObject = (RemoteObject) test;
// Default behavior. RMI is transparent, method calls behave like normal
// (return values and exceptions are returned, call is synchronous)
System.err.println("hashCode: " + test.hashCode());
System.err.println("toString: " + test);
test.moo();
test.moo("Cow");
assertEquals(remoteObjectID, test.id());
// UDP calls that ignore the return value
remoteObject.setUDP(true);
test.moo("Meow");
assertEquals(0, test.id());
remoteObject.setUDP(false);
// Test that RMI correctly waits for the remotely invoked method to exit
remoteObject.setResponseTimeout(5000);
test.moo("You should see this two seconds before...", 2000);
System.out.println("...This");
remoteObject.setResponseTimeout(3000);
// Try exception handling
boolean caught = false;
try {
test.throwException();
} catch (UnsupportedOperationException ex) {
System.err.println("\tExpected.");
caught = true;
}
assertTrue(caught);
// Return values are ignored, but exceptions are still dealt with properly
remoteObject.setTransmitReturnValue(false);
test.moo("Baa");
test.id();
caught = false;
try {
test.throwException();
} catch (UnsupportedOperationException ex) {
caught = true;
}
assertTrue(caught);
// Non-blocking call that ignores the return value
remoteObject.setNonBlocking(true);
remoteObject.setTransmitReturnValue(false);
test.moo("Meow");
assertEquals(0, test.id());
// Non-blocking call that returns the return value
remoteObject.setTransmitReturnValue(true);
test.moo("Foo");
assertEquals(0, test.id());
// wait for the response to id()
assertEquals(remoteObjectID, remoteObject.waitForLastResponse());
assertEquals(0, test.id());
byte responseID = remoteObject.getLastResponseID();
// wait for the response to id()
assertEquals(remoteObjectID, remoteObject.waitForResponse(responseID));
// Non-blocking call that errors out
remoteObject.setTransmitReturnValue(false);
test.throwException();
} catch (UnsupportedOperationException ex) {
System.err.println("\tExpected.");
caught = true;
assertEquals(remoteObject.waitForLastResponse()
.getClass(), UnsupportedOperationException.class);
// Call will time out if non-blocking isn't working properly
remoteObject.setTransmitExceptions(false);
test.moo("Mooooooooo", 3000);
// should wait for a small time
remoteObject.setTransmitReturnValue(true);
remoteObject.setNonBlocking(false);
remoteObject.setResponseTimeout(6000);
System.out.println("You should see this 2 seconds before");
float slow = test.slow();
System.out.println("...This");
assertEquals(123f, slow, .0001D);
// Test sending a reference to a remote object.
MessageWithTestObject m = new MessageWithTestObject();
m.number = 678;
m.text = "sometext";
m.testObject = test;
connection.send()
.TCP(m)
.flush();
} catch (IOException e) {
e.printStackTrace();
fail();
}
assertTrue(caught);
// Return values are ignored, but exceptions are still dealt with properly
remoteObject.setTransmitReturnValue(false);
test.moo("Baa");
test.id();
caught = false;
try {
test.throwException();
} catch (UnsupportedOperationException ex) {
caught = true;
}
assertTrue(caught);
// Non-blocking call that ignores the return value
remoteObject.setNonBlocking(true);
remoteObject.setTransmitReturnValue(false);
test.moo("Meow");
assertEquals(0, test.id());
// Non-blocking call that returns the return value
remoteObject.setTransmitReturnValue(true);
test.moo("Foo");
assertEquals(0, test.id());
// wait for the response to id()
assertEquals(remoteObjectID, remoteObject.waitForLastResponse());
assertEquals(0, test.id());
byte responseID = remoteObject.getLastResponseID();
// wait for the response to id()
assertEquals(remoteObjectID, remoteObject.waitForResponse(responseID));
// Non-blocking call that errors out
remoteObject.setTransmitReturnValue(false);
test.throwException();
assertEquals(remoteObject.waitForLastResponse()
.getClass(), UnsupportedOperationException.class);
// Call will time out if non-blocking isn't working properly
remoteObject.setTransmitExceptions(false);
test.moo("Mooooooooo", 3000);
// should wait for a small time
remoteObject.setTransmitReturnValue(true);
remoteObject.setNonBlocking(false);
remoteObject.setResponseTimeout(6000);
System.out.println("You should see this 2 seconds before");
float slow = test.slow();
System.out.println("...This");
assertEquals(123f, slow, .0001D);
// Test sending a reference to a remote object.
MessageWithTestObject m = new MessageWithTestObject();
m.number = 678;
m.text = "sometext";
m.testObject = test;
connection.send()
.TCP(m)
.flush();
}
}.start();
}
@ -154,7 +158,7 @@ class RmiGlobalTest extends BaseTest {
@Test
public
void rmi() throws InitializationException, SecurityException, IOException {
void rmi() throws InitializationException, SecurityException, IOException, InterruptedException {
KryoCryptoSerializationManager.DEFAULT = KryoCryptoSerializationManager.DEFAULT();
register(KryoCryptoSerializationManager.DEFAULT);
@ -169,8 +173,6 @@ class RmiGlobalTest extends BaseTest {
server.disableRemoteKeyValidation();
server.setIdleTimeout(0);
register(server.getSerialization());
// register this object as a global object that the client will get
SERVER_GLOBAL_OBJECT_ID = server.createGlobalObject(globalRemoteServerObject);

View File

@ -27,7 +27,7 @@ class RmiSendObjectTest extends BaseTest {
*/
@Test
public
void rmi() throws InitializationException, SecurityException, IOException {
void rmi() throws InitializationException, SecurityException, IOException, InterruptedException {
KryoCryptoSerializationManager.DEFAULT = KryoCryptoSerializationManager.DEFAULT();
KryoCryptoSerializationManager.DEFAULT.registerRemote(TestObject.class, TestObjectImpl.class);
KryoCryptoSerializationManager.DEFAULT.registerRemote(OtherObject.class, OtherObjectImpl.class);
@ -77,23 +77,32 @@ class RmiSendObjectTest extends BaseTest {
@Override
public
void run() {
TestObject test = connection.createRemoteObject(TestObjectImpl.class);
test.setOther(43.21f);
// Normal remote method call.
assertEquals(43.21f, test.other(), .0001f);
// Make a remote method call that returns another remote proxy object.
OtherObject otherObject = test.getOtherObject();
// Normal remote method call on the second object.
otherObject.setValue(12.34f);
float value = otherObject.value();
assertEquals(12.34f, value, .0001f);
// When a remote proxy object is sent, the other side receives its actual remote object.
// we have to manually flush, since we are in a separate thread that does not auto-flush.
connection.send()
.TCP(otherObject)
.flush();
TestObject test = null;
try {
test = connection.createRemoteObject(TestObjectImpl.class);
test.setOther(43.21f);
// Normal remote method call.
assertEquals(43.21f, test.other(), .0001f);
// Make a remote method call that returns another remote proxy object.
OtherObject otherObject = test.getOtherObject();
// Normal remote method call on the second object.
otherObject.setValue(12.34f);
float value = otherObject.value();
assertEquals(12.34f, value, .0001f);
// When a remote proxy object is sent, the other side receives its actual remote object.
// we have to manually flush, since we are in a separate thread that does not auto-flush.
connection.send()
.TCP(otherObject)
.flush();
} catch (IOException e) {
e.printStackTrace();
fail();
}
}
}).start();
}
@ -129,7 +138,7 @@ class RmiSendObjectTest extends BaseTest {
@IgnoreSerialization
private final int ID = idCounter.getAndIncrement();
@RemoteProxy
@RMI
private final OtherObject otherObject = new OtherObjectImpl();
private float aFloat;

View File

@ -17,8 +17,7 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.*;
public
class RmiTest extends BaseTest {
@ -29,104 +28,111 @@ class RmiTest extends BaseTest {
@Override
public
void run() {
TestObject test = connection.createRemoteObject(TestObjectImpl.class);
System.err.println("Starting test for: " + remoteObjectID);
//TestObject test = connection.getRemoteObject(id, TestObject.class);
RemoteObject remoteObject = (RemoteObject) test;
// Default behavior. RMI is transparent, method calls behave like normal
// (return values and exceptions are returned, call is synchronous)
System.err.println("hashCode: " + test.hashCode());
System.err.println("toString: " + test);
test.moo();
test.moo("Cow");
assertEquals(remoteObjectID, test.id());
// UDP calls that ignore the return value
remoteObject.setUDP(true);
test.moo("Meow");
assertEquals(0, test.id());
remoteObject.setUDP(false);
// Test that RMI correctly waits for the remotely invoked method to exit
remoteObject.setResponseTimeout(5000);
test.moo("You should see this two seconds before...", 2000);
System.out.println("...This");
remoteObject.setResponseTimeout(3000);
// Try exception handling
boolean caught = false;
TestObject test = null;
try {
test = connection.createRemoteObject(TestObjectImpl.class);
System.err.println("Starting test for: " + remoteObjectID);
//TestObject test = connection.getRemoteObject(id, TestObject.class);
RemoteObject remoteObject = (RemoteObject) test;
// Default behavior. RMI is transparent, method calls behave like normal
// (return values and exceptions are returned, call is synchronous)
System.err.println("hashCode: " + test.hashCode());
System.err.println("toString: " + test);
test.moo();
test.moo("Cow");
assertEquals(remoteObjectID, test.id());
// UDP calls that ignore the return value
remoteObject.setUDP(true);
test.moo("Meow");
assertEquals(0, test.id());
remoteObject.setUDP(false);
// Test that RMI correctly waits for the remotely invoked method to exit
remoteObject.setResponseTimeout(5000);
test.moo("You should see this two seconds before...", 2000);
System.out.println("...This");
remoteObject.setResponseTimeout(3000);
// Try exception handling
boolean caught = false;
try {
test.throwException();
} catch (UnsupportedOperationException ex) {
System.err.println("\tExpected.");
caught = true;
}
assertTrue(caught);
// Return values are ignored, but exceptions are still dealt with properly
remoteObject.setTransmitReturnValue(false);
test.moo("Baa");
test.id();
caught = false;
try {
test.throwException();
} catch (UnsupportedOperationException ex) {
caught = true;
}
assertTrue(caught);
// Non-blocking call that ignores the return value
remoteObject.setNonBlocking(true);
remoteObject.setTransmitReturnValue(false);
test.moo("Meow");
assertEquals(0, test.id());
// Non-blocking call that returns the return value
remoteObject.setTransmitReturnValue(true);
test.moo("Foo");
assertEquals(0, test.id());
// wait for the response to id()
assertEquals(remoteObjectID, remoteObject.waitForLastResponse());
assertEquals(0, test.id());
byte responseID = remoteObject.getLastResponseID();
// wait for the response to id()
assertEquals(remoteObjectID, remoteObject.waitForResponse(responseID));
// Non-blocking call that errors out
remoteObject.setTransmitReturnValue(false);
test.throwException();
} catch (UnsupportedOperationException ex) {
System.err.println("\tExpected.");
caught = true;
assertEquals(remoteObject.waitForLastResponse()
.getClass(), UnsupportedOperationException.class);
// Call will time out if non-blocking isn't working properly
remoteObject.setTransmitExceptions(false);
test.moo("Mooooooooo", 3000);
// should wait for a small time
remoteObject.setTransmitReturnValue(true);
remoteObject.setNonBlocking(false);
remoteObject.setResponseTimeout(6000);
System.out.println("You should see this 2 seconds before");
float slow = test.slow();
System.out.println("...This");
assertEquals(slow, 123, .0001D);
// Test sending a reference to a remote object.
MessageWithTestObject m = new MessageWithTestObject();
m.number = 678;
m.text = "sometext";
m.testObject = test;
connection.send()
.TCP(m)
.flush();
} catch (IOException e) {
e.printStackTrace();
fail();
}
assertTrue(caught);
// Return values are ignored, but exceptions are still dealt with properly
remoteObject.setTransmitReturnValue(false);
test.moo("Baa");
test.id();
caught = false;
try {
test.throwException();
} catch (UnsupportedOperationException ex) {
caught = true;
}
assertTrue(caught);
// Non-blocking call that ignores the return value
remoteObject.setNonBlocking(true);
remoteObject.setTransmitReturnValue(false);
test.moo("Meow");
assertEquals(0, test.id());
// Non-blocking call that returns the return value
remoteObject.setTransmitReturnValue(true);
test.moo("Foo");
assertEquals(0, test.id());
// wait for the response to id()
assertEquals(remoteObjectID, remoteObject.waitForLastResponse());
assertEquals(0, test.id());
byte responseID = remoteObject.getLastResponseID();
// wait for the response to id()
assertEquals(remoteObjectID, remoteObject.waitForResponse(responseID));
// Non-blocking call that errors out
remoteObject.setTransmitReturnValue(false);
test.throwException();
assertEquals(remoteObject.waitForLastResponse()
.getClass(), UnsupportedOperationException.class);
// Call will time out if non-blocking isn't working properly
remoteObject.setTransmitExceptions(false);
test.moo("Mooooooooo", 3000);
// should wait for a small time
remoteObject.setTransmitReturnValue(true);
remoteObject.setNonBlocking(false);
remoteObject.setResponseTimeout(6000);
System.out.println("You should see this 2 seconds before");
float slow = test.slow();
System.out.println("...This");
assertEquals(slow, 123, .0001D);
// Test sending a reference to a remote object.
MessageWithTestObject m = new MessageWithTestObject();
m.number = 678;
m.text = "sometext";
m.testObject = test;
connection.send()
.TCP(m)
.flush();
}
}.start();
}
@ -143,7 +149,7 @@ class RmiTest extends BaseTest {
@Test
public
void rmi() throws InitializationException, SecurityException, IOException {
void rmi() throws InitializationException, SecurityException, IOException, InterruptedException {
KryoCryptoSerializationManager.DEFAULT = KryoCryptoSerializationManager.DEFAULT();
register(KryoCryptoSerializationManager.DEFAULT);
@ -158,9 +164,6 @@ class RmiTest extends BaseTest {
server.disableRemoteKeyValidation();
server.setIdleTimeout(0);
register(server.getSerialization());
addEndPoint(server);
server.bind(false);