iface) {
- @SuppressWarnings({"unchecked"})
- T remoteObject = (T) this.endPoint.getRemoteObject(this, objectID, new Class>[] {iface});
- return remoteObject;
- }
-
- /**
- * Returns a proxy object that implements the specified interfaces. Methods
- * invoked on the proxy object will be invoked remotely on the object with
- * the specified ID in the ObjectSpace for the specified connection. If the
- * remote end of the connection has not {@link #addConnection(Connection)
- * added} the connection to the ObjectSpace, the remote method invocations
- * will be ignored.
- *
- * Methods that return a value will throw {@link TimeoutException} if the
- * response is not received with the
- * {@link RemoteObject#setResponseTimeout(int) response timeout}.
- *
- * If {@link RemoteObject#setNonBlocking(boolean) non-blocking} is false
- * (the default), then methods that return a value must not be called from
- * the update thread for the connection. An exception will be thrown if this
- * occurs. Methods with a void return value can be called on the update
- * thread.
- *
- * If a proxy returned from this method is part of an object graph sent over
- * the network, the object graph on the receiving side will have the proxy
- * object replaced with the registered object.
- *
- * @see RemoteObject
- */
- @Override
- public RemoteObject getRemoteObject(int objectID, Class>... ifaces) {
- return this.endPoint.getRemoteObject(this, objectID, ifaces);
- }
-
/**
* Invoked when a {@link Channel} has been idle for a while.
*/
@Override
- public void userEventTriggered(ChannelHandlerContext context, Object event) throws Exception {
+ public
+ void userEventTriggered(ChannelHandlerContext context, Object event) throws Exception {
// if (e.getState() == IdleState.READER_IDLE) {
// e.getChannel().close();
// } else if (e.getState() == IdleState.WRITER_IDLE) {
@@ -406,12 +407,14 @@ public class ConnectionImpl extends ChannelInboundHandlerAdapter
}
@Override
- public void channelRead(ChannelHandlerContext context, Object message) throws Exception {
+ public
+ void channelRead(ChannelHandlerContext context, Object message) throws Exception {
channelRead(message);
ReferenceCountUtil.release(message);
}
- public void channelRead(Object object) throws Exception {
+ public
+ void channelRead(Object object) throws Exception {
// prevent close from occurring SMACK in the middle of a message in progress.
// delay close until it's finished.
@@ -430,13 +433,14 @@ public class ConnectionImpl extends ChannelInboundHandlerAdapter
}
@Override
- public void channelInactive(ChannelHandlerContext context) throws Exception {
+ public
+ void channelInactive(ChannelHandlerContext context) throws Exception {
// if we are in the middle of a message, hold off.
if (this.messageInProgress.get()) {
synchronized (this.messageInProgressLock) {
try {
this.messageInProgressLock.wait();
- } catch (InterruptedException e) {
+ } catch (InterruptedException ignored) {
}
}
}
@@ -451,19 +455,24 @@ public class ConnectionImpl extends ChannelInboundHandlerAdapter
if (isTCP) {
type = "TCP";
- } else if (channelClass == NioDatagramChannel.class) {
+ }
+ else if (channelClass == NioDatagramChannel.class || channelClass == EpollDatagramChannel.class) {
type = "UDP";
- } else if (channelClass == EpollDatagramChannel.class) {
- type = "UDP";
- } else if (channelClass == NioUdtByteConnectorChannel.class) {
+ }
+ else if (channelClass == NioUdtByteConnectorChannel.class) {
type = "UDT";
- } else if (channelClass == LocalChannel.class) {
+ }
+ else if (channelClass == LocalChannel.class) {
type = "LOCAL";
- } else {
+ }
+ else {
type = "UNKNOWN";
}
- this.logger.info("Closed remote {} connection: {}", type, channel.remoteAddress().toString());
+ this.logger.info("Closed remote {} connection: {}",
+ type,
+ channel.remoteAddress()
+ .toString());
}
// our master channels are TCP/LOCAL (which are mutually exclusive). Only key disconnect events based on the status of them.
@@ -486,21 +495,22 @@ public class ConnectionImpl extends ChannelInboundHandlerAdapter
* Closes the connection
*/
@Override
- public final void close() {
+ public final
+ void close() {
// only close if we aren't already in the middle of closing.
if (this.closeInProgress.compareAndSet(false, true)) {
- int idleTimeout = this.endPoint.getIdleTimeout();
- if (idleTimeout == 0) {
+ int idleTimeoutMs = this.endPoint.getIdleTimeout();
+ if (idleTimeoutMs == 0) {
// default is 2 second timeout, in milliseconds.
- idleTimeout = 2000;
+ idleTimeoutMs = 2000;
}
// if we are in the middle of a message, hold off.
synchronized (this.messageInProgressLock) {
if (this.messageInProgress.get()) {
try {
- this.messageInProgressLock.wait(idleTimeout);
- } catch (InterruptedException e) {
+ this.messageInProgressLock.wait(idleTimeoutMs);
+ } catch (InterruptedException ignored) {
}
}
}
@@ -521,8 +531,8 @@ public class ConnectionImpl extends ChannelInboundHandlerAdapter
synchronized (this.closeInProgressLock) {
if (!this.alreadyClosed.get()) {
try {
- this.closeInProgressLock.wait(idleTimeout);
- } catch (Exception e) {
+ this.closeInProgressLock.wait(idleTimeoutMs);
+ } catch (Exception ignored) {
}
}
}
@@ -530,7 +540,8 @@ public class ConnectionImpl extends ChannelInboundHandlerAdapter
}
@Override
- public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
+ public
+ void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
if (!(cause instanceof IOException)) {
Channel channel = context.channel();
@@ -553,27 +564,29 @@ public class ConnectionImpl extends ChannelInboundHandlerAdapter
* Expose methods to modify the connection listeners.
*/
@Override
- public final ListenerBridge listeners() {
+ public final
+ ListenerBridge listeners() {
return this;
}
/**
* Adds a listener to this connection/endpoint to be notified of
* connect/disconnect/idle/receive(object) events.
- *
+ *
* If the listener already exists, it is not added again.
- *
+ *
* When called by a server, NORMALLY listeners are added at the GLOBAL level
* (meaning, I add one listener, and ALL connections are notified of that
* listener.
- *
+ *
* It is POSSIBLE to add a server connection ONLY (ie, not global) listener
* (via connection.addListener), meaning that ONLY that listener attached to
* the connection is notified on that event (ie, admin type listeners)
*/
@SuppressWarnings("rawtypes")
@Override
- public final void add(ListenerRaw listener) {
+ public final
+ void add(ListenerRaw listener) {
if (this.endPoint instanceof EndPointServer) {
// when we are a server, NORMALLY listeners are added at the GLOBAL level
// meaning --
@@ -586,31 +599,34 @@ public class ConnectionImpl extends ChannelInboundHandlerAdapter
// is empty, we can remove it from this connection.
synchronized (this) {
if (this.localListenerManager == null) {
- this.localListenerManager = ((EndPointServer)this.endPoint).addListenerManager(this);
+ this.localListenerManager = ((EndPointServer) this.endPoint).addListenerManager(this);
}
this.localListenerManager.add(listener);
}
- } else {
- this.endPoint.listeners().add(listener);
+ }
+ else {
+ this.endPoint.listeners()
+ .add(listener);
}
}
/**
* Removes a listener from this connection/endpoint to NO LONGER be notified
* of connect/disconnect/idle/receive(object) events.
- *
+ *
* When called by a server, NORMALLY listeners are added at the GLOBAL level
* (meaning, I add one listener, and ALL connections are notified of that
* listener.
- *
+ *
* It is POSSIBLE to remove a server-connection 'non-global' listener (via
* connection.removeListener), meaning that ONLY that listener attached to
* the connection is removed
*/
@SuppressWarnings("rawtypes")
@Override
- public final void remove(ListenerRaw listener) {
+ public final
+ void remove(ListenerRaw listener) {
if (this.endPoint instanceof EndPointServer) {
// when we are a server, NORMALLY listeners are added at the GLOBAL level
// meaning --
@@ -626,12 +642,14 @@ public class ConnectionImpl extends ChannelInboundHandlerAdapter
this.localListenerManager.remove(listener);
if (!this.localListenerManager.hasListeners()) {
- ((EndPointServer)this.endPoint).removeListenerManager(this);
+ ((EndPointServer) this.endPoint).removeListenerManager(this);
}
}
}
- } else {
- this.endPoint.listeners().remove(listener);
+ }
+ else {
+ this.endPoint.listeners()
+ .remove(listener);
}
}
@@ -640,7 +658,8 @@ public class ConnectionImpl extends ChannelInboundHandlerAdapter
* LONGER be notified of connect/disconnect/idle/receive(object) events.
*/
@Override
- public final void removeAll() {
+ public final
+ void removeAll() {
if (this.endPoint instanceof EndPointServer) {
// when we are a server, NORMALLY listeners are added at the GLOBAL level
// meaning --
@@ -656,11 +675,13 @@ public class ConnectionImpl extends ChannelInboundHandlerAdapter
this.localListenerManager.removeAll();
this.localListenerManager = null;
- ((EndPointServer)this.endPoint).removeListenerManager(this);
+ ((EndPointServer) this.endPoint).removeListenerManager(this);
}
}
- } else {
- this.endPoint.listeners().removeAll();
+ }
+ else {
+ this.endPoint.listeners()
+ .removeAll();
}
}
@@ -670,7 +691,8 @@ public class ConnectionImpl extends ChannelInboundHandlerAdapter
* connect/disconnect/idle/receive(object) events.
*/
@Override
- public final void removeAll(Class> classType) {
+ public final
+ void removeAll(Class> classType) {
if (this.endPoint instanceof EndPointServer) {
// when we are a server, NORMALLY listeners are added at the GLOBAL level
// meaning --
@@ -687,27 +709,32 @@ public class ConnectionImpl extends ChannelInboundHandlerAdapter
if (!this.localListenerManager.hasListeners()) {
this.localListenerManager = null;
- ((EndPointServer)this.endPoint).removeListenerManager(this);
+ ((EndPointServer) this.endPoint).removeListenerManager(this);
}
}
}
- } else {
- this.endPoint.listeners().removeAll(classType);
+ }
+ else {
+ this.endPoint.listeners()
+ .removeAll(classType);
}
}
@Override
- public String toString() {
+ public
+ String toString() {
return this.channelWrapper.toString();
}
@Override
- public int hashCode() {
+ public
+ int hashCode() {
return id();
}
@Override
- public boolean equals(Object obj) {
+ public
+ boolean equals(Object obj) {
if (this == obj) {
return true;
}
@@ -723,16 +750,154 @@ public class ConnectionImpl extends ChannelInboundHandlerAdapter
if (other.channelWrapper != null) {
return false;
}
- } else if (!this.channelWrapper.equals(other.channelWrapper)) {
- return false;
- }
- if (this.name == null) {
- if (other.name != null) {
- return false;
- }
- } else if (!this.name.equals(other.name)) {
+ }
+ else if (!this.channelWrapper.equals(other.channelWrapper)) {
return false;
}
+
return true;
}
+
+
+ //
+ //
+ // RMI methods
+ //
+
+ volatile RegistrationLatch registrationLatch;
+
+ class RegistrationLatch {
+ final CountDownLatch latch = new CountDownLatch(1);
+ Object remoteObject;
+ boolean hasError = false;
+ }
+
+
+ private final AtomicInteger rmiObjectIdCounter = new AtomicInteger(0);
+
+
+ @SuppressWarnings({"UnnecessaryLocalVariable", "unchecked"})
+ @Override
+ public
+ Iface createRemoteObject(final Class remoteImplementationInterface,
+ final Class remoteImplementationClass) throws NetException {
+
+ // only one register can happen at a time
+ synchronized (rmiObjectIdCounter) {
+ registrationLatch = new RegistrationLatch();
+
+ // since this synchronous, we want to wait for the response before we continue
+ TCP(new RmiRegistration(remoteImplementationClass.getName())).flush();
+
+ try {
+ if (!registrationLatch.latch.await(2, TimeUnit.SECONDS)) {
+ final String errorMessage = "Timed out getting registration ID for: " + remoteImplementationClass;
+ logger.error(errorMessage);
+ throw new NetException(errorMessage);
+ }
+ } catch (InterruptedException e) {
+ final String errorMessage = "Error getting registration ID for: " + remoteImplementationClass;
+ logger.error(errorMessage, e);
+ throw new NetException(errorMessage, e);
+ }
+
+ // local var to prevent double hit on volatile field
+ final RegistrationLatch latch = registrationLatch;
+ if (latch.hasError) {
+ final String errorMessage = "Error getting registration ID for: " + remoteImplementationClass;
+ logger.error(errorMessage);
+ throw new NetException(errorMessage);
+ }
+
+ return (Iface) latch.remoteObject;
+ }
+ }
+
+ void registerInternal(final ConnectionImpl connection, final RmiRegistration remoteRegistration) {
+ final String implementationClassName = remoteRegistration.remoteImplementationClass;
+
+
+ if (implementationClassName != null) {
+ // THIS IS ON THE SERVER SIDE
+ // create a new ID, and register the ID and new object (must create a new one) in the object maps
+
+ Class> implementationClass;
+
+ try {
+ implementationClass = Class.forName(implementationClassName);
+ } catch (Exception e) {
+ logger.error("Error registering RMI class " + implementationClassName, e);
+ connection.TCP(new RmiRegistration()).flush();
+ return;
+ }
+
+ try {
+ final Object remotePrimaryObject = implementationClass.newInstance();
+ rmiBridge.register(rmiObjectIdCounter.getAndIncrement(), remotePrimaryObject);
+
+ LinkedList remoteClasses = new LinkedList();
+ remoteClasses.add(new ClassObject(implementationClass, remotePrimaryObject));
+
+ ClassObject remoteClassObject;
+ while ((remoteClassObject = remoteClasses.pollFirst()) != null) {
+ // we have to check the class that is being registered for any additional proxy information
+ for (Field field : remoteClassObject.clazz.getDeclaredFields()) {
+ Annotation[] annotations = field.getDeclaredAnnotations();
+
+ if (annotations != null) {
+ for (Annotation annotation : annotations) {
+ if (annotation.annotationType().equals(RemoteProxy.class)) {
+ boolean prev = field.isAccessible();
+ field.setAccessible(true);
+ final Object o = field.get(remoteClassObject.object);
+ field.setAccessible(prev);
+ final Class> type = field.getType();
+
+ rmiBridge.register(rmiObjectIdCounter.getAndIncrement(), o);
+
+ remoteClasses.offerLast(new ClassObject(type, o));
+ }
+ }
+ }
+ }
+ }
+
+// connection.TCP(new RmiRegistration()).flush();
+ connection.TCP(new RmiRegistration(remotePrimaryObject)).flush();
+ } catch (Exception e) {
+ logger.error("Error registering RMI class " + implementationClassName, e);
+ connection.TCP(new RmiRegistration()).flush();
+ }
+ } else {
+ // THIS IS ON THE CLIENT SIDE
+
+ // the next two use a local var, so that there isn't a double hit for volatile access
+ final RegistrationLatch latch = this.registrationLatch;
+ latch.hasError = remoteRegistration.hasError;
+
+ if (!remoteRegistration.hasError) {
+ latch.remoteObject = remoteRegistration.remoteObject;
+ }
+
+ // notify the original register that it may continue. We access the volatile field directly, so that it's members are updated
+ registrationLatch.latch.countDown();
+ }
+ }
+
+
+ /**
+ * Returns the object registered with the specified ID.
+ */
+ public
+ Object getRegisteredObject(final int objectID) {
+ return rmiBridge.getRegisteredObject(objectID);
+ }
+
+ /**
+ * Returns the ID registered for the specified object, or Integer.MAX_VALUE if not found.
+ */
+ public
+ int getRegisteredId(final Object object) {
+ return rmiBridge.getRegisteredId(object);
+ }
}
diff --git a/Dorkbox-Network/src/dorkbox/network/connection/ConnectionManager.java b/Dorkbox-Network/src/dorkbox/network/connection/ConnectionManager.java
index 2f3a6a9c..18914d18 100644
--- a/Dorkbox-Network/src/dorkbox/network/connection/ConnectionManager.java
+++ b/Dorkbox-Network/src/dorkbox/network/connection/ConnectionManager.java
@@ -1,25 +1,21 @@
package dorkbox.network.connection;
-import java.lang.reflect.Type;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-import org.slf4j.Logger;
-
import dorkbox.network.rmi.RmiMessages;
import dorkbox.network.util.ConcurrentHashMapFactory;
import dorkbox.network.util.exceptions.NetException;
import dorkbox.util.ClassHelper;
+import org.slf4j.Logger;
+
+import java.lang.reflect.Type;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.CopyOnWriteArrayList;
//note that we specifically DO NOT implement equals/hashCode, because we cannot create two separate
// objects that are somehow equal to each other.
-public class ConnectionManager implements ListenerBridge, ISessionManager {
+public
+class ConnectionManager implements ListenerBridge, ISessionManager {
public static Listener> unRegisteredType_Listener = null;
@@ -28,15 +24,16 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
private final ConcurrentHashMapFactory localManagers;
private final CopyOnWriteArrayList connections = new CopyOnWriteArrayList();
- /** Used by the listener subsystem to determine types. */
+ /**
+ * Used by the listener subsystem to determine types.
+ */
private final Class> baseClass;
protected final org.slf4j.Logger logger;
- private final String name;
volatile boolean shutdown = false;
- public ConnectionManager(String name, Class> baseClass) {
- this.name = name;
- this.logger = org.slf4j.LoggerFactory.getLogger(name);
+ public
+ ConnectionManager(final String loggerName, final Class> baseClass) {
+ this.logger = org.slf4j.LoggerFactory.getLogger(loggerName);
this.baseClass = baseClass;
@@ -44,7 +41,8 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
private static final long serialVersionUID = 1L;
@Override
- public CopyOnWriteArrayList> createNewOject(Object... args) {
+ public
+ CopyOnWriteArrayList> createNewOject(Object... args) {
return new CopyOnWriteArrayList>();
}
};
@@ -53,8 +51,9 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
private static final long serialVersionUID = 1L;
@Override
- public ConnectionManager createNewOject(Object... args) {
- return new ConnectionManager(ConnectionManager.this.name + "-" + args[0] + " Specific", ConnectionManager.this.baseClass);
+ public
+ ConnectionManager createNewOject(Object... args) {
+ return new ConnectionManager(loggerName + "-" + args[0] + " Specific", ConnectionManager.this.baseClass);
}
};
}
@@ -62,20 +61,21 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
/**
* Adds a listener to this connection/endpoint to be notified of
* connect/disconnect/idle/receive(object) events.
- *
+ *
* If the listener already exists, it is not added again.
- *
+ *
* When called by a server, NORMALLY listeners are added at the GLOBAL level
* (meaning, I add one listener, and ALL connections are notified of that
* listener.
- *
+ *
* It is POSSIBLE to add a server connection ONLY (ie, not global) listener
* (via connection.addListener), meaning that ONLY that listener attached to
* the connection is notified on that event (ie, admin type listeners)
*/
@SuppressWarnings("rawtypes")
@Override
- public final void add(ListenerRaw listener) {
+ public final
+ void add(ListenerRaw listener) {
if (listener == null) {
throw new IllegalArgumentException("listener cannot be null.");
}
@@ -95,9 +95,8 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
addListener0(listener);
return;
- } else if (ClassHelper.hasInterface(Connection.class, genericClass) &&
- !ClassHelper.hasParentClass(this.baseClass, genericClass)) {
-
+ }
+ else if (ClassHelper.hasInterface(Connection.class, genericClass) && !ClassHelper.hasParentClass(this.baseClass, genericClass)) {
// now we must make sure that the PARENT class is NOT the base class. ONLY the base class is allowed!
addListener0(listener);
return;
@@ -110,8 +109,9 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
/**
* INTERNAL USE ONLY
*/
- @SuppressWarnings({"unchecked","rawtypes"})
- private final void addListener0(ListenerRaw listener) {
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ private
+ void addListener0(ListenerRaw listener) {
Class> type = listener.getObjectType();
CopyOnWriteArrayList> list = this.listeners.getOrCreate(type);
@@ -119,25 +119,29 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
Logger logger2 = this.logger;
if (logger2.isTraceEnabled()) {
- logger2.trace("listener added: {} <{}>", listener.getClass().getName(), listener.getObjectType());
+ logger2.trace("listener added: {} <{}>",
+ listener.getClass()
+ .getName(),
+ listener.getObjectType());
}
}
/**
* Removes a listener from this connection/endpoint to NO LONGER be notified
* of connect/disconnect/idle/receive(object) events.
- *
+ *
* When called by a server, NORMALLY listeners are added at the GLOBAL level
* (meaning, I add one listener, and ALL connections are notified of that
* listener.
- *
+ *
* It is POSSIBLE to remove a server-connection 'non-global' listener (via
* connection.removeListener), meaning that ONLY that listener attached to
* the connection is removed
*/
@SuppressWarnings("rawtypes")
@Override
- public final void remove(ListenerRaw listener) {
+ public final
+ void remove(ListenerRaw listener) {
if (listener == null) {
throw new IllegalArgumentException("listener cannot be null.");
}
@@ -151,7 +155,10 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
Logger logger2 = this.logger;
if (logger2.isTraceEnabled()) {
- logger2.trace("listener removed: {} <{}>", listener.getClass().getName(), listener.getObjectType());
+ logger2.trace("listener removed: {} <{}>",
+ listener.getClass()
+ .getName(),
+ listener.getObjectType());
}
}
@@ -159,23 +166,25 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
* Removes all registered listeners from this connection/endpoint to NO
* LONGER be notified of connect/disconnect/idle/receive(object) events.
*/
- @Override
- public final void removeAll() {
- this.listeners.clear();
-
- Logger logger2 = this.logger;
- if (logger2.isTraceEnabled()) {
- logger2.trace("all listeners removed !!");
- }
- }
-
- /**
- * Removes all registered listeners (of the object type) from this
- * connection/endpoint to NO LONGER be notified of
- * connect/disconnect/idle/receive(object) events.
- */
@Override
- public final void removeAll(Class> classType) {
+ public final
+ void removeAll() {
+ this.listeners.clear();
+
+ Logger logger2 = this.logger;
+ if (logger2.isTraceEnabled()) {
+ logger2.trace("all listeners removed !!");
+ }
+ }
+
+ /**
+ * Removes all registered listeners (of the object type) from this
+ * connection/endpoint to NO LONGER be notified of
+ * connect/disconnect/idle/receive(object) events.
+ */
+ @Override
+ public final
+ void removeAll(Class> classType) {
if (classType == null) {
throw new IllegalArgumentException("classType cannot be null.");
}
@@ -184,25 +193,29 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
Logger logger2 = this.logger;
if (logger2.isTraceEnabled()) {
- logger2.trace("all listeners removed for type: {}", classType.getClass().getName());
+ logger2.trace("all listeners removed for type: {}",
+ classType.getClass()
+ .getName());
}
}
/**
* Invoked when a message object was received from a remote peer.
- *
+ *
* If data is sent in response to this event, the connection data is automatically flushed to the wire. If the data is sent in a separate thread,
- * {@link connection.send().flush()} must be called manually.
- *
+ * {@link EndPoint#send().flush()} must be called manually.
+ *
* {@link ISessionManager}
*/
@Override
- public final void notifyOnMessage(Connection connection, Object message) {
+ public final
+ void notifyOnMessage(Connection connection, Object message) {
notifyOnMessage0(connection, message, false);
}
- private final boolean notifyOnMessage0(Connection connection, Object message, boolean foundListener) {
+ private
+ boolean notifyOnMessage0(Connection connection, Object message, boolean foundListener) {
Class> objectType = message.getClass();
// this is the GLOBAL version (unless it's the call from below, then it's the connection scoped version)
@@ -265,42 +278,50 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
// only run a flush once
if (foundListener) {
- connection.send().flush();
- } else if (unRegisteredType_Listener != null) {
+ connection.send()
+ .flush();
+ }
+ else if (unRegisteredType_Listener != null) {
unRegisteredType_Listener.received(connection, null);
- } else {
+ }
+ else {
Logger logger2 = this.logger;
if (logger2.isErrorEnabled()) {
- this.logger.error("----------- LISTENER NOT REGISTERED FOR TYPE: {}", message.getClass().getSimpleName());
+ this.logger.error("----------- LISTENER NOT REGISTERED FOR TYPE: {}",
+ message.getClass()
+ .getSimpleName());
}
}
return foundListener;
}
- public static void setUnregisteredTypeListener(Listener> listener) {
+ public static
+ void setUnregisteredTypeListener(Listener> listener) {
unRegisteredType_Listener = listener;
}
/**
* Invoked when a Connection has been idle for a while.
- *
+ *
* {@link ISessionManager}
*/
@Override
- public final void notifyOnIdle(Connection connection) {
+ public final
+ void notifyOnIdle(Connection connection) {
Set>>> entrySet = this.listeners.entrySet();
- CopyOnWriteArrayList> list;
+ CopyOnWriteArrayList> list;
for (Entry>> entry : entrySet) {
list = entry.getValue();
if (list != null) {
- for (ListenerRaw listener : list) {
+ for (ListenerRaw listener : list) {
if (this.shutdown) {
return;
}
listener.idle(connection);
}
- connection.send().flush();
+ connection.send()
+ .flush();
}
}
@@ -312,28 +333,30 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
}
/**
- * Invoked when a {@link Channel} is open, bound to a local address, and connected to a remote address.
- *
+ * Invoked when a Channel is open, bound to a local address, and connected to a remote address.
+ *
* {@link ISessionManager}
*/
@Override
- public void connectionConnected(Connection connection) {
+ public
+ void connectionConnected(Connection connection) {
// create a new connection!
this.connections.add(connection);
try {
Set>>> entrySet = this.listeners.entrySet();
- CopyOnWriteArrayList> list;
+ CopyOnWriteArrayList> list;
for (Entry>> entry : entrySet) {
list = entry.getValue();
if (list != null) {
- for (ListenerRaw listener : list) {
+ for (ListenerRaw listener : list) {
if (this.shutdown) {
return;
}
listener.connected(connection);
}
- connection.send().flush();
+ connection.send()
+ .flush();
}
}
@@ -348,14 +371,15 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
}
/**
- * Invoked when a {@link Channel} was disconnected from its remote peer.
- *
+ * Invoked when a Channel was disconnected from its remote peer.
+ *
* {@link ISessionManager}
*/
@Override
- public void connectionDisconnected(Connection connection) {
+ public
+ void connectionDisconnected(Connection connection) {
Set>>> entrySet = this.listeners.entrySet();
- CopyOnWriteArrayList> list;
+ CopyOnWriteArrayList> list;
for (Entry>> entry : entrySet) {
list = entry.getValue();
if (list != null) {
@@ -384,13 +408,14 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
/**
* Invoked when there is an error of some kind during the up/down stream process
- *
+ *
* {@link ISessionManager}
*/
@Override
- public void connectionError(Connection connection, Throwable throwable) {
+ public
+ void connectionError(Connection connection, Throwable throwable) {
Set>>> entrySet = this.listeners.entrySet();
- CopyOnWriteArrayList> list;
+ CopyOnWriteArrayList> list;
for (Entry>> entry : entrySet) {
list = entry.getValue();
if (list != null) {
@@ -401,7 +426,8 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
listener.error(connection, throwable);
}
- connection.send().flush();
+ connection.send()
+ .flush();
}
}
@@ -414,17 +440,19 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
/**
* Returns a non-modifiable list of active connections
- *
+ *
* {@link ISessionManager}
*/
@Override
- public List getConnections() {
+ public
+ List getConnections() {
return Collections.unmodifiableList(this.connections);
}
- final ConnectionManager addListenerManager(Connection connection) {
+ final
+ ConnectionManager addListenerManager(Connection connection) {
// when we are a server, NORMALLY listeners are added at the GLOBAL level (meaning, I add one listener, and ALL connections
// are notified of that listener.
// it is POSSIBLE to add a connection-specfic listener (via connection.addListener), meaning that ONLY
@@ -440,7 +468,8 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
return lm;
}
- final void removeListenerManager(Connection connection) {
+ final
+ void removeListenerManager(Connection connection) {
this.localManagers.remove(connection);
}
@@ -450,7 +479,8 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
*
* @return Returns a FAST list of active connections.
*/
- public final Collection getConnections0() {
+ public final
+ Collection getConnections0() {
return this.connections;
}
@@ -459,10 +489,14 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
*
* @return Returns a FAST first connection (for client!).
*/
- public final Connection getConnection0() {
- if (this.connections.iterator().hasNext()) {
- return this.connections.iterator().next();
- } else {
+ public final
+ Connection getConnection0() {
+ if (this.connections.iterator()
+ .hasNext()) {
+ return this.connections.iterator()
+ .next();
+ }
+ else {
throw new NetException("Not connected to a remote computer. Unable to continue!");
}
}
@@ -472,14 +506,16 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
*
* @return a boolean indicating if there are any listeners registered with this manager.
*/
- final boolean hasListeners() {
+ final
+ boolean hasListeners() {
return this.listeners.isEmpty();
}
/**
* Closes all associated resources/threads/connections
*/
- final void stop() {
+ final
+ void stop() {
this.shutdown = true;
// disconnect the sessions
@@ -491,7 +527,8 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
/**
* Close all connections ONLY
*/
- final void closeConnections() {
+ final
+ void closeConnections() {
// close the sessions
Iterator iterator = this.connections.iterator();
while (iterator.hasNext()) {
@@ -503,4 +540,17 @@ public class ConnectionManager implements ListenerBridge, ISessionManager {
}
this.connections.clear();
}
+
+ @Override
+ public
+ boolean equals(final Object o) {
+ return false;
+
+ }
+
+ @Override
+ public
+ int hashCode() {
+ return 0;
+ }
}
diff --git a/Dorkbox-Network/src/dorkbox/network/connection/EndPoint.java b/Dorkbox-Network/src/dorkbox/network/connection/EndPoint.java
index 551c0ede..e83e39ef 100644
--- a/Dorkbox-Network/src/dorkbox/network/connection/EndPoint.java
+++ b/Dorkbox-Network/src/dorkbox/network/connection/EndPoint.java
@@ -1,38 +1,25 @@
package dorkbox.network.connection;
-import com.esotericsoftware.kryo.factories.SerializerFactory;
-import dorkbox.network.ConnectionOptions;
+import dorkbox.network.Configuration;
import dorkbox.network.connection.bridge.ConnectionBridgeBase;
import dorkbox.network.connection.registration.MetaChannel;
-import dorkbox.network.connection.registration.Registration;
import dorkbox.network.connection.wrapper.ChannelLocalWrapper;
import dorkbox.network.connection.wrapper.ChannelNetworkWrapper;
import dorkbox.network.connection.wrapper.ChannelWrapper;
import dorkbox.network.pipeline.KryoEncoder;
import dorkbox.network.pipeline.KryoEncoderCrypto;
-import dorkbox.network.rmi.RemoteObject;
-import dorkbox.network.rmi.Rmi;
import dorkbox.network.rmi.RmiBridge;
-import dorkbox.network.rmi.TimeoutException;
-import dorkbox.network.util.EndpointTool;
-import dorkbox.network.util.KryoSerializationManager;
-import dorkbox.network.util.SerializationManager;
+import dorkbox.network.util.CryptoSerializationManager;
+import dorkbox.network.util.EndPointTool;
import dorkbox.network.util.entropy.Entropy;
import dorkbox.network.util.exceptions.InitializationException;
-import dorkbox.network.util.exceptions.NetException;
import dorkbox.network.util.exceptions.SecurityException;
-import dorkbox.network.util.serializers.FieldAnnotationAwareSerializer;
-import dorkbox.network.util.serializers.IgnoreSerialization;
import dorkbox.network.util.store.NullSettingsStore;
import dorkbox.network.util.store.SettingsStore;
import dorkbox.util.Sys;
import dorkbox.util.collections.IntMap;
import dorkbox.util.collections.IntMap.Entries;
import dorkbox.util.crypto.Crypto;
-import dorkbox.util.crypto.serialization.EccPrivateKeySerializer;
-import dorkbox.util.crypto.serialization.EccPublicKeySerializer;
-import dorkbox.util.crypto.serialization.IesParametersSerializer;
-import dorkbox.util.crypto.serialization.IesWithCipherParametersSerializer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
@@ -41,18 +28,20 @@ import io.netty.util.concurrent.Future;
import org.bouncycastle.crypto.AsymmetricCipherKeyPair;
import org.bouncycastle.crypto.params.ECPrivateKeyParameters;
import org.bouncycastle.crypto.params.ECPublicKeyParameters;
-import org.bouncycastle.crypto.params.IESParameters;
-import org.bouncycastle.crypto.params.IESWithCipherParameters;
import org.slf4j.Logger;
-import java.lang.annotation.Annotation;
+import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
-import java.security.AccessControlException;
import java.security.SecureRandom;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -72,17 +61,20 @@ class EndPoint {
// TODO: maybe some sort of STUN-like connection keep-alive??
+ // TODO: do we really need this? Maybe?
+ public static final String LOCAL_CHANNEL = "local_channel";
protected static final String shutdownHookName = "::SHUTDOWN_HOOK::";
protected static final String stopTreadName = "::STOP_THREAD::";
-
-
+ /**
+ * Shall we REALLY use a valid lan IP address instead of the loopback address? A common mistake is to listen on localhost, which does
+ * not accept external network connections
+ */
+ public static boolean useLanIpInsteadOfLoopback = true;
/**
* this can be changed to a more specialized value, if necessary
*/
- public static int DEFAULT_THREAD_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2;
- public static final String LOCAL_CHANNEL = "local_channel";
-
-
+ public static int DEFAULT_THREAD_POOL_SIZE = Runtime.getRuntime()
+ .availableProcessors() * 2;
/**
* The amount of time in milli-seconds to wait for this endpoint to close all
* {@link Channel}s and shutdown gracefully.
@@ -114,60 +106,58 @@ class EndPoint {
// Needed for NIO selectors on Android 2.2, and to force IPv4.
System.setProperty("java.net.preferIPv4Stack", Boolean.TRUE.toString());
System.setProperty("java.net.preferIPv6Addresses", Boolean.FALSE.toString());
- } catch (AccessControlException ignored) {
+ } catch (Throwable ignored) {
}
}
protected final org.slf4j.Logger logger;
- protected final String name;
-
- // make sure that the endpoint is closed on JVM shutdown (if it's still open at that point in time)
- protected Thread shutdownHook;
+ protected final Class extends EndPoint> type;
protected final ConnectionManager connectionManager;
- protected final SerializationManager serializationManager;
-
+ protected final CryptoSerializationManager serializationManager;
protected final RegistrationWrapper registrationWrapper;
- // The remote object space is used by RMI.
- private final RmiBridge rmiBridge;
-
- // the eventLoop groups are used to track and manage the event loops for startup/shutdown
- private List eventLoopGroups = new ArrayList(8);
- private List shutdownChannelList = new ArrayList();
+ protected final Object shutdownInProgress = new Object();
+ final ECPrivateKeyParameters privateKey;
+ final ECPublicKeyParameters publicKey;
+ final SecureRandom secureRandom;
private final CountDownLatch blockUntilDone = new CountDownLatch(1);
- private final CountDownLatch blockWhileShutdown = new CountDownLatch(1);
-
- protected final Object shutdownInProgress = new Object();
+ private final Executor rmiExecutor;
+ private final boolean rmiEnabled;
+ // the eventLoop groups are used to track and manage the event loops for startup/shutdown
+ private final List eventLoopGroups = new ArrayList(8);
+ private final List shutdownChannelList = new ArrayList();
+ private final ConcurrentHashMap, EndPointTool> toolMap = new ConcurrentHashMap, EndPointTool>();
+ // make sure that the endpoint is closed on JVM shutdown (if it's still open at that point in time)
+ protected Thread shutdownHook;
protected AtomicBoolean stopCalled = new AtomicBoolean(false);
-
protected AtomicBoolean isConnected = new AtomicBoolean(false);
-
+ SettingsStore propertyStore;
+ boolean disableRemoteKeyValidation;
/**
* in milliseconds. default is disabled!
*/
- private volatile int idleTimeout = 0;
-
- private ConcurrentHashMap, EndpointTool> toolMap = new ConcurrentHashMap, EndpointTool>();
-
- final ECPrivateKeyParameters privateKey;
- final ECPublicKeyParameters publicKey;
-
- final SecureRandom secureRandom;
- SettingsStore propertyStore;
- boolean disableRemoteKeyValidation;
+ private volatile int idleTimeoutMs = 0;
+ /**
+ * @param type this is either "Client" or "Server", depending on who is creating this endpoint.
+ * @param options these are the specific connection options
+ * @throws InitializationException
+ * @throws SecurityException
+ */
public
- EndPoint(String name, ConnectionOptions options) throws InitializationException, SecurityException {
- this.name = name;
- this.logger = org.slf4j.LoggerFactory.getLogger(name);
+ EndPoint(Class extends EndPoint> type, final Configuration options) throws InitializationException, SecurityException, IOException {
+ this.type = type;
- this.registrationWrapper = new RegistrationWrapper(this, this.logger);
+ this.logger = org.slf4j.LoggerFactory.getLogger(type);
+
+ this.registrationWrapper = new RegistrationWrapper(this,
+ this.logger); // TODO - get rid of the wrapper, since it just loops back on itself
// make sure that 'localhost' is REALLY our specific IP address
- if (options.host != null && (options.host.equals("localhost") || options.host.startsWith("127."))) {
+ if (useLanIpInsteadOfLoopback && options.host != null && (options.host.equals("localhost") || options.host.startsWith("127."))) {
try {
InetAddress localHostLanAddress = Sys.getLocalHostLanAddress();
options.host = localHostLanAddress.getHostAddress();
@@ -177,14 +167,33 @@ class EndPoint {
}
}
+ // serialization stuff
+ this.serializationManager = KryoCryptoSerializationManager.DEFAULT;
+
+ rmiEnabled = options.rmiEnabled;
+ if (rmiEnabled) {
+ // setup our RMI serialization managers. Can only be called once
+ serializationManager.initRmiSerialization();
+ }
+
+ rmiExecutor = options.rmiExecutor;
+
+
+ // setup our TCP kryo encoders
+ this.registrationWrapper.setKryoTcpEncoder(new KryoEncoder(this.serializationManager));
+ this.registrationWrapper.setKryoTcpCryptoEncoder(new KryoEncoderCrypto(this.serializationManager));
+
+
// we have to be able to specify WHAT property store we want to use, since it can change!
if (options.settingsStore == null) {
- this.propertyStore = new PropertyStore(name);
+ this.propertyStore = new PropertyStore();
}
else {
this.propertyStore = options.settingsStore;
}
+ this.propertyStore.init(type, this.serializationManager, null);
+
// null it out, since it is sensitive!
options.settingsStore = null;
@@ -198,7 +207,7 @@ class EndPoint {
if (privateKey == null || publicKey == null) {
try {
// seed our RNG based off of this and create our ECC keys
- byte[] seedBytes = Entropy.get("There are no ECC keys for the " + name + " yet");
+ byte[] seedBytes = Entropy.get("There are no ECC keys for the " + type.getSimpleName() + " yet");
SecureRandom secureRandom = new SecureRandom(seedBytes);
secureRandom.nextBytes(seedBytes);
@@ -242,57 +251,23 @@ class EndPoint {
}
};
this.shutdownHook.setName(shutdownHookName);
- Runtime.getRuntime().addShutdownHook(this.shutdownHook);
-
-
- // serialization stuff
- if (options.serializationManager != null) {
- this.serializationManager = options.serializationManager;
- }
- else {
- this.serializationManager = new KryoSerializationManager();
+ try {
+ Runtime.getRuntime()
+ .addShutdownHook(this.shutdownHook);
+ } catch (Throwable ignored) {
+ // if we are in the middle of shutdown, we cannot do this.
}
+
// we don't care about un-instantiated/constructed members, since the class type is the only interest.
- this.connectionManager = new ConnectionManager(name, connection0(null).getClass());
-
- // setup our TCP kryo encoders
- this.registrationWrapper.setKryoTcpEncoder(new KryoEncoder(this.serializationManager));
- this.registrationWrapper.setKryoTcpCryptoEncoder(new KryoEncoderCrypto(this.serializationManager));
-
-
- this.serializationManager.setReferences(false);
- this.serializationManager.setRegistrationRequired(true);
-
- this.serializationManager.register(PingMessage.class);
- this.serializationManager.register(byte[].class);
- this.serializationManager.register(IESParameters.class, new IesParametersSerializer());
- this.serializationManager.register(IESWithCipherParameters.class, new IesWithCipherParametersSerializer());
- this.serializationManager.register(ECPublicKeyParameters.class, new EccPublicKeySerializer());
- this.serializationManager.register(ECPrivateKeyParameters.class, new EccPrivateKeySerializer());
- this.serializationManager.register(Registration.class);
-
-
- // ignore fields that have the "IgnoreSerialization" annotation.
- Set> marks = new HashSet>();
- marks.add(IgnoreSerialization.class);
- SerializerFactory disregardingFactory = new FieldAnnotationAwareSerializer.Factory(marks, true);
- this.serializationManager.setDefaultSerializer(disregardingFactory);
-
+ this.connectionManager = new ConnectionManager(type.getSimpleName(), connection0(null).getClass());
// add the ping listener (internal use only!)
this.connectionManager.add(new PingSystemListener());
- /*
- * Creates the remote method invocation (RMI) bridge for this endpoint.
- *
- * there is some housekeeping that is necessary BEFORE a connection is actually connected..
- */
- if (options.enableRmi) {
- this.rmiBridge = new RmiBridge(this.logger, this.serializationManager);
- }
- else {
- this.rmiBridge = null;
+ if (this.rmiEnabled) {
+ // these register the listener for registering a class implementation for RMI (internal use only)
+ this.connectionManager.add(new RegisterRmiSystemListener());
}
}
@@ -350,6 +325,15 @@ class EndPoint {
return true;
}
+ /**
+ * The amount of milli-seconds that must elapse with no read or write before {@link ListenerRaw#idle(Connection)} }
+ * will be triggered
+ */
+ public
+ int getIdleTimeout() {
+ return this.idleTimeoutMs;
+ }
+
/**
* The {@link Listener:idle()} will be triggered when neither read nor write
* has happened for the specified period of time (in milli-seconds)
@@ -357,17 +341,8 @@ class EndPoint {
* Specify {@code 0} to disable (default).
*/
public
- void setIdleTimeout(int idleTimeout) {
- this.idleTimeout = idleTimeout;
- }
-
- /**
- * The amount of milli-seconds that must elapse with no read or write before {@link Listener.idle()}
- * will be triggered
- */
- public
- int getIdleTimeout() {
- return this.idleTimeout;
+ void setIdleTimeout(int idleTimeoutMs) {
+ this.idleTimeoutMs = idleTimeoutMs;
}
/**
@@ -400,39 +375,26 @@ class EndPoint {
}
}
-
-
/**
* Returns the serialization wrapper if there is an object type that needs to be added outside of the basics.
*/
public
- SerializationManager getSerialization() {
+ CryptoSerializationManager getSerialization() {
return this.serializationManager;
}
- /**
- * Gets the remote method invocation (RMI) bridge for this endpoint.
- */
- public
- Rmi rmi() {
- if (this.rmiBridge == null) {
- throw new NetException("Cannot use a remote object space that has NOT been created first! Configure the ConnectionOptions!");
- }
-
- return this.rmiBridge;
- }
-
/**
* This method allows the connections used by the client/server to be subclassed (custom implementations).
*
- * As this is for the network stack, the new connection type MUST subclass {@link Connection}
+ * As this is for the network stack, the new connection MUST subclass {@link Connection}
+ *
+ * The parameters are ALL NULL when getting the base class, as this instance is just thrown away.
*
- * @param bridge null when retrieving the subclass type (internal use only). Non-null when creating a new (and real) connection.
* @return a new network connection
*/
public
- Connection newConnection(String name) {
- return new ConnectionImpl(name);
+ Connection newConnection(final Logger logger, final EndPoint endPoint, final RmiBridge rmiBridge) {
+ return new ConnectionImpl(logger, endPoint, rmiBridge);
}
/**
@@ -446,8 +408,13 @@ class EndPoint {
Connection connection0(MetaChannel metaChannel) {
Connection connection;
+ RmiBridge rmiBridge = null;
+ if (metaChannel != null && rmiEnabled) {
+ rmiBridge = new RmiBridge(logger, rmiExecutor);
+ }
+
// setup the extras needed by the network connection.
- // These properties are ASSGINED in the same thread that CREATED the object. Only the AES info needs to be
+ // These properties are ASSIGNED in the same thread that CREATED the object. Only the AES info needs to be
// volatile since it is the only thing that changes.
if (metaChannel != null) {
ChannelWrapper wrapper;
@@ -464,23 +431,22 @@ class EndPoint {
}
}
- connection = newConnection(this.name);
-
- // now initialize the connection channels with whatever extra info they might need.
- connection.init(this, new Bridge(wrapper, this.connectionManager));
-
+ connection = newConnection(logger, this, rmiBridge);
metaChannel.connection = connection;
- // notify our remote object space that it is able to receive method calls.
- if (this.rmiBridge != null) {
- connection.listeners().add(this.rmiBridge.getListener());
+ // now initialize the connection channels with whatever extra info they might need.
+ connection.init(new Bridge(wrapper, this.connectionManager));
+
+ if (rmiBridge != null) {
+ // notify our remote object space that it is able to receive method calls.
+ connection.listeners().add(rmiBridge.getListener());
}
}
else {
- // getting the baseClass
+ // getting the connection baseClass
// have to add the networkAssociate to a map of "connected" computers
- connection = newConnection(this.name);
+ connection = newConnection(null, null, null);
}
return connection;
@@ -532,51 +498,23 @@ class EndPoint {
public abstract
ConnectionBridgeBase send();
- /**
- * Returns a proxy object that implements the specified interfaces. Methods
- * invoked on the proxy object will be invoked remotely on the object with
- * the specified ID in the ObjectSpace for the specified connection. If the
- * remote end of the connection has not {@link #addConnection(Connection)
- * added} the connection to the ObjectSpace, the remote method invocations
- * will be ignored.
- *
- * Methods that return a value will throw {@link TimeoutException} if the
- * response is not received with the
- * {@link RemoteObject#setResponseTimeout(int) response timeout}.
- *
- * If {@link RemoteObject#setNonBlocking(boolean) non-blocking} is false
- * (the default), then methods that return a value must not be called from
- * the update thread for the connection. An exception will be thrown if this
- * occurs. Methods with a void return value can be called on the update
- * thread.
- *
- * If a proxy returned from this method is part of an object graph sent over
- * the network, the object graph on the receiving side will have the proxy
- * object replaced with the registered object.
- *
- * @see RemoteObject
- */
- public
- RemoteObject getRemoteObject(Connection connection, int objectID, Class>[] ifaces) {
- return this.rmiBridge.getRemoteObject(connection, objectID, ifaces);
- }
-
/**
* Registers a tool with the server, to be used by other services.
*/
public
- void registerTool(EndpointTool toolClass) {
+ void registerTool(EndPointTool toolClass) {
if (toolClass == null) {
throw new IllegalArgumentException("Tool must not be null! Unable to add tool");
}
- Class>[] interfaces = toolClass.getClass().getInterfaces();
+ Class>[] interfaces = toolClass.getClass()
+ .getInterfaces();
int length = interfaces.length;
int index = -1;
if (length > 1) {
Class> clazz2;
- Class cls = EndpointTool.class;
+ Class cls = EndPointTool.class;
for (int i = 0; i < length; i++) {
clazz2 = interfaces[i];
@@ -595,7 +533,7 @@ class EndPoint {
}
Class> clazz = interfaces[index];
- EndpointTool put = this.toolMap.put(clazz, toolClass);
+ EndPointTool put = this.toolMap.put(clazz, toolClass);
if (put != null) {
throw new IllegalArgumentException("Tool must be unique! Unable to add tool");
}
@@ -605,7 +543,7 @@ class EndPoint {
* Only get the tools in the ModuleStart (ie: load) methods. If done in the constructor, the tool might not be available yet
*/
public
- T getTool(Class> toolClass) {
+ T getTool(Class> toolClass) {
if (toolClass == null) {
throw new IllegalArgumentException("Tool must not be null! Unable to add tool");
}
@@ -615,11 +553,6 @@ class EndPoint {
return tool;
}
- public
- String getName() {
- return this.name;
- }
-
/**
* Closes all connections ONLY (keeps the server/client running).
*
@@ -651,9 +584,11 @@ class EndPoint {
}
/**
- * Safely closes all associated resources/threads/connections
+ * Safely closes all associated resources/threads/connections.
*
- * Override stopExtraActions() if you want to provide extra behavior to stopping the endpoint
+ * If we want to WAIT for this endpoint to shutdown, we must explicitly call waitForShutdown()
+ *
+ * Override stopExtraActions() if you want to provide extra behavior while stopping the endpoint
*/
public final
void stop() {
@@ -667,52 +602,66 @@ class EndPoint {
// This occurs when calling stop from within a listener callback.
Thread currentThread = Thread.currentThread();
String threadName = currentThread.getName();
- boolean inEventThread = !threadName.equals(shutdownHookName) && !threadName.equals(stopTreadName);
+ boolean inShutdownThread = !threadName.equals(shutdownHookName) && !threadName.equals(stopTreadName);
// used to check the event groups to see if we are running from one of them. NOW we force to
// ALWAYS shutdown inside a NEW thread
- if (!inEventThread) {
+ if (!inShutdownThread) {
stopInThread();
}
else {
- Thread thread = new Thread(new Runnable() {
- @Override
- public
- void run() {
- EndPoint.this.stopInThread();
- EndPoint.this.blockWhileShutdown.countDown();
+ // we have to make sure always run this from within it's OWN thread -- because if it's run from within
+ // a client/server thread executor, it will deadlock while waiting for the threadpool to terminate.
+ boolean isInEventLoop = false;
+ for (EventLoopGroup loopGroup : this.eventLoopGroups) {
+ for (EventExecutor child : loopGroup.children()) {
+ if (child.inEventLoop()) {
+ isInEventLoop = true;
+ break;
+ }
}
- });
- thread.setDaemon(false);
- thread.setName(stopTreadName);
- thread.start();
+ }
- // we want to wait for this to finish before we continue
- try {
- this.blockWhileShutdown.await();
- } catch (InterruptedException e) {
- this.logger.error("Thread interrupted while waiting for shutdown to finish!");
+ if (!isInEventLoop) {
+ EndPoint.this.stopInThread();
+ }
+ else {
+ Thread thread = new Thread(new Runnable() {
+ @Override
+ public
+ void run() {
+ EndPoint.this.stopInThread();
+ }
+ });
+ thread.setDaemon(false);
+ thread.setName(stopTreadName);
+ thread.start();
}
}
}
// This actually does the "stopping", since there is some logic to making sure we don't deadlock, this is important
- private final
+ private
void stopInThread() {
// make sure we are not trying to stop during a startup procedure.
// This will wait until we have finished starting up/shutting down.
synchronized (this.shutdownInProgress) {
close();
+ this.logger.info("Stopping endpoint");
+
// there is no need to call "stop" again if we close the connection.
// however, if this is called WHILE from the shutdown hook, blammo! problems!
// Also, you can call client/server.stop from another thread, which is run when the JVM is shutting down
// (as there is nothing left to do), and also have problems.
- if (!Thread.currentThread().getName().equals(shutdownHookName)) {
+ if (!Thread.currentThread()
+ .getName()
+ .equals(shutdownHookName)) {
try {
- Runtime.getRuntime().removeShutdownHook(this.shutdownHook);
+ Runtime.getRuntime()
+ .removeShutdownHook(this.shutdownHook);
} catch (Exception e) {
// ignore
}
@@ -737,13 +686,15 @@ class EndPoint {
registrationWrapper2.releaseChannelMap();
}
+
// shutdown the database store
this.propertyStore.shutdown();
// now we stop all of our channels
for (ChannelFuture f : this.shutdownChannelList) {
Channel channel = f.channel();
- channel.close().awaitUninterruptibly(maxShutdownWaitTimeInMilliSeconds);
+ channel.close()
+ .awaitUninterruptibly(maxShutdownWaitTimeInMilliSeconds);
}
// we have to clear the shutdown list.
@@ -753,16 +704,18 @@ class EndPoint {
List> shutdownThreadList = new LinkedList>();
for (EventLoopGroup loopGroup : this.eventLoopGroups) {
- shutdownThreadList.add(loopGroup.shutdownGracefully());
+ shutdownThreadList.add(loopGroup.shutdownGracefully(maxShutdownWaitTimeInMilliSeconds,
+ maxShutdownWaitTimeInMilliSeconds * 4,
+ TimeUnit.MILLISECONDS));
}
// now wait for them to finish!
+ // It can take a few seconds to shut down the executor. This will affect unit testing, where connections are quickly created/stopped
for (Future> f : shutdownThreadList) {
- f.awaitUninterruptibly(maxShutdownWaitTimeInMilliSeconds);
+ f.syncUninterruptibly();
}
// when the eventloop closes, the associated selectors are ALSO closed!
-
stopExtraActions();
}
@@ -778,50 +731,23 @@ class EndPoint {
}
/**
- * Determines if the specified thread (usually the current thread) is a member of a group's threads.
- */
- protected static final
- boolean checkInEventGroup(Thread currentThread, EventLoopGroup group) {
- if (group != null) {
- Set children = group.children();
- for (EventExecutor e : children) {
- if (e.inEventLoop(currentThread)) {
- return true;
- }
- }
- }
- return false;
- }
-
- /**
- * Blocks the current thread until the endpoint has been stopped.
- *
- * @param blockUntilTerminate if TRUE, then this endpoint will block until STOP is called, otherwise it will not block
+ * Blocks the current thread until the endpoint has been stopped. If the endpoint is already stopped, this do nothing.
*/
public final
- void waitForStop(boolean blockUntilTerminate) {
- if (blockUntilTerminate) {
- // we now BLOCK until the stop method is called.
- try {
- this.blockUntilDone.await();
- } catch (InterruptedException e) {
- this.logger.error("Thread interrupted while waiting for stop!");
- }
+ void waitForShutdown() {
+ // we now BLOCK until the stop method is called.
+ try {
+ this.blockUntilDone.await();
+ } catch (InterruptedException e) {
+ this.logger.error("Thread interrupted while waiting for stop!");
}
}
- @Override
- public
- String toString() {
- return "EndPoint [" + this.name + "]";
- }
-
@Override
public
int hashCode() {
final int prime = 31;
int result = 1;
- result = prime * result + (this.name == null ? 0 : this.name.hashCode());
result = prime * result + (this.privateKey == null ? 0 : this.privateKey.hashCode());
result = prime * result + (this.publicKey == null ? 0 : this.publicKey.hashCode());
return result;
@@ -840,14 +766,7 @@ class EndPoint {
return false;
}
EndPoint other = (EndPoint) obj;
- if (this.name == null) {
- if (other.name != null) {
- return false;
- }
- }
- else if (!this.name.equals(other.name)) {
- return false;
- }
+
if (this.privateKey == null) {
if (other.privateKey != null) {
return false;
@@ -866,4 +785,15 @@ class EndPoint {
}
return true;
}
+
+ @Override
+ public
+ String toString() {
+ return "EndPoint [" + getName() + "]";
+ }
+
+ public
+ String getName() {
+ return this.type.getSimpleName();
+ }
}
diff --git a/Dorkbox-Network/src/dorkbox/network/connection/EndPointClient.java b/Dorkbox-Network/src/dorkbox/network/connection/EndPointClient.java
index 2ee83f4a..a3aa9c16 100644
--- a/Dorkbox-Network/src/dorkbox/network/connection/EndPointClient.java
+++ b/Dorkbox-Network/src/dorkbox/network/connection/EndPointClient.java
@@ -1,46 +1,49 @@
package dorkbox.network.connection;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelOption;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.slf4j.Logger;
-
-import dorkbox.network.ConnectionOptions;
+import dorkbox.network.Client;
+import dorkbox.network.Configuration;
import dorkbox.network.connection.bridge.ConnectionBridge;
import dorkbox.network.connection.bridge.ConnectionBridgeFlushAlways;
import dorkbox.network.util.exceptions.InitializationException;
import dorkbox.network.util.exceptions.SecurityException;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelOption;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* This serves the purpose of making sure that specific methods are not available to the end user.
*/
-public class EndPointClient extends EndPoint implements Runnable {
+public
+class EndPointClient extends EndPoint implements Runnable {
- protected List bootstraps = new LinkedList();
- protected AtomicInteger connectingBootstrap = new AtomicInteger(0);
protected final Object registrationLock = new Object();
-
+ protected List bootstraps = new LinkedList();
+ protected final AtomicInteger connectingBootstrap = new AtomicInteger(0);
protected volatile int connectionTimeout = 5000; // default
protected volatile boolean registrationComplete = false;
private volatile ConnectionBridgeFlushAlways connectionBridgeFlushAlways;
- public EndPointClient(String name, ConnectionOptions options) throws InitializationException, SecurityException {
- super(name, options);
+ public
+ EndPointClient(Configuration options) throws InitializationException, SecurityException, IOException {
+ super(Client.class, options);
}
- protected void registerNextProtocol() {
+ protected
+ void registerNextProtocol() {
new Thread(this, "Bootstrap registration").start();
}
@Override
- public void run() {
- synchronized(this.connectingBootstrap) {
+ public
+ void run() {
+ synchronized (this.connectingBootstrap) {
int bootstrapToRegister = this.connectingBootstrap.getAndIncrement();
BootstrapWrapper bootstrapWrapper = this.bootstraps.get(bootstrapToRegister);
@@ -60,12 +63,18 @@ public class EndPointClient extends EndPoint implements Runnable {
future = bootstrapWrapper.bootstrap.connect();
future.await();
} catch (Exception e) {
- String errorMessage = stopWithErrorMessage(logger2, "Could not connect to the " + bootstrapWrapper.type + " server on port: " + bootstrapWrapper.port, e);
+ String errorMessage = stopWithErrorMessage(logger2,
+ "Could not connect to the " + bootstrapWrapper.type + " server on port: " +
+ bootstrapWrapper.port,
+ e);
throw new IllegalArgumentException(errorMessage);
}
if (!future.isSuccess()) {
- String errorMessage = stopWithErrorMessage(logger2, "Could not connect to the " + bootstrapWrapper.type + " server on port: " + bootstrapWrapper.port, future.cause());
+ String errorMessage = stopWithErrorMessage(logger2,
+ "Could not connect to the " + bootstrapWrapper.type + " server on port: " +
+ bootstrapWrapper.port,
+ future.cause());
throw new IllegalArgumentException(errorMessage);
}
@@ -78,11 +87,13 @@ public class EndPointClient extends EndPoint implements Runnable {
/**
* Internal call by the pipeline to notify the client to continue registering the different session protocols.
+ *
* @return true if we are done registering bootstraps
*/
@Override
- protected boolean registerNextProtocol0() {
- synchronized(this.connectingBootstrap) {
+ protected
+ boolean registerNextProtocol0() {
+ synchronized (this.connectingBootstrap) {
this.registrationComplete = this.connectingBootstrap.get() == this.bootstraps.size();
if (!this.registrationComplete) {
registerNextProtocol();
@@ -104,7 +115,8 @@ public class EndPointClient extends EndPoint implements Runnable {
* will BLOCK until it has successfully registered it's connections.
*/
@Override
- final void connectionConnected0(Connection connection) {
+ final
+ void connectionConnected0(Connection connection) {
// invokes the listener.connection() method, and initialize the connection channels with whatever extra info they might need.
super.connectionConnected0(connection);
@@ -114,6 +126,25 @@ public class EndPointClient extends EndPoint implements Runnable {
}
}
+ /**
+ * Expose methods to send objects to a destination.
+ *
+ * This returns a bridge that will flush after EVERY send! This is because sending data can occur on the client, outside
+ * of the normal eventloop patterns, and it is confusing to the user to have to manually flush the channel each time.
+ */
+ @Override
+ public
+ ConnectionBridge send() {
+ ConnectionBridgeFlushAlways connectionBridgeFlushAlways2 = this.connectionBridgeFlushAlways;
+ if (connectionBridgeFlushAlways2 == null) {
+ ConnectionBridge clientBridge = this.connectionManager.getConnection0()
+ .send();
+ this.connectionBridgeFlushAlways = new ConnectionBridgeFlushAlways(clientBridge);
+ }
+
+ return this.connectionBridgeFlushAlways;
+ }
+
/**
* Internal call to abort registration if the shutdown command is issued during channel registration.
*/
@@ -123,21 +154,4 @@ public class EndPointClient extends EndPoint implements Runnable {
}
stop();
}
-
- /**
- * Expose methods to send objects to a destination.
- *
- * This returns a bridge that will flush after EVERY send! This is because sending data can occur on the client, outside
- * of the normal eventloop patterns, and it is confusing to the user to have to manually flush the channel each time.
- */
- @Override
- public ConnectionBridge send() {
- ConnectionBridgeFlushAlways connectionBridgeFlushAlways2 = this.connectionBridgeFlushAlways;
- if (connectionBridgeFlushAlways2 == null) {
- ConnectionBridge clientBridge = this.connectionManager.getConnection0().send();
- this.connectionBridgeFlushAlways = new ConnectionBridgeFlushAlways(clientBridge);
- }
-
- return this.connectionBridgeFlushAlways;
- }
}
diff --git a/Dorkbox-Network/src/dorkbox/network/connection/EndPointServer.java b/Dorkbox-Network/src/dorkbox/network/connection/EndPointServer.java
index 82760467..a67e7e9d 100644
--- a/Dorkbox-Network/src/dorkbox/network/connection/EndPointServer.java
+++ b/Dorkbox-Network/src/dorkbox/network/connection/EndPointServer.java
@@ -1,19 +1,24 @@
package dorkbox.network.connection;
-import dorkbox.network.ConnectionOptions;
+import dorkbox.network.Configuration;
+import dorkbox.network.Server;
import dorkbox.network.connection.bridge.ConnectionBridgeServer;
import dorkbox.network.util.exceptions.InitializationException;
import dorkbox.network.util.exceptions.SecurityException;
+import java.io.IOException;
+
/**
* This serves the purpose of making sure that specific methods are not available to the end user.
*/
-public class EndPointServer extends EndPoint {
+public
+class EndPointServer extends EndPoint {
- private ServerConnectionBridge serverConnections;
+ private final ServerConnectionBridge serverConnections;
- public EndPointServer(String name, ConnectionOptions options) throws InitializationException, SecurityException {
- super(name, options);
+ public
+ EndPointServer(Configuration options) throws InitializationException, SecurityException, IOException {
+ super(Server.class, options);
this.serverConnections = new ServerConnectionBridge(this.connectionManager);
}
@@ -22,7 +27,8 @@ public class EndPointServer extends EndPoint {
* Expose methods to send objects to a destination.
*/
@Override
- public ConnectionBridgeServer send() {
+ public
+ ConnectionBridgeServer send() {
return this.serverConnections;
}
@@ -35,7 +41,8 @@ public class EndPointServer extends EndPoint {
*
* @return a newly created listener manager for the connection
*/
- final ConnectionManager addListenerManager(Connection connection) {
+ final
+ ConnectionManager addListenerManager(Connection connection) {
return this.connectionManager.addListenerManager(connection);
}
@@ -45,10 +52,11 @@ public class EndPointServer extends EndPoint {
*
* It is POSSIBLE to remove a server-connection 'local' listener (via connection.removeListener), meaning that ONLY
* that listener attached to the connection is removed
- *
+ *
* This removes the listener manager for that specific connection
*/
- final void removeListenerManager(Connection connection) {
+ final
+ void removeListenerManager(Connection connection) {
this.connectionManager.removeListenerManager(connection);
}
}
diff --git a/Dorkbox-Network/src/dorkbox/network/util/KryoConnectionSerializationManager.java b/Dorkbox-Network/src/dorkbox/network/connection/KryoCryptoSerializationManager.java
similarity index 68%
rename from Dorkbox-Network/src/dorkbox/network/util/KryoConnectionSerializationManager.java
rename to Dorkbox-Network/src/dorkbox/network/connection/KryoCryptoSerializationManager.java
index 5a5721d2..bf1b453d 100644
--- a/Dorkbox-Network/src/dorkbox/network/util/KryoConnectionSerializationManager.java
+++ b/Dorkbox-Network/src/dorkbox/network/connection/KryoCryptoSerializationManager.java
@@ -1,37 +1,43 @@
-package dorkbox.network.util;
+package dorkbox.network.connection;
import com.esotericsoftware.kryo.*;
import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory;
import com.esotericsoftware.kryo.factories.SerializerFactory;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
-import com.esotericsoftware.kryo.pool.KryoFactory;
-import com.esotericsoftware.kryo.pool.KryoPool;
+import com.esotericsoftware.kryo.serializers.CollectionSerializer;
import com.esotericsoftware.kryo.serializers.FieldSerializer;
import com.esotericsoftware.kryo.util.MapReferenceResolver;
-import dorkbox.network.connection.Connection;
-import dorkbox.network.pipeline.ByteBufInput;
-import dorkbox.network.pipeline.ByteBufOutput;
-import dorkbox.network.rmi.RmiRegisterClassesCallback;
-import dorkbox.network.rmi.SerializerRegistration;
+import dorkbox.network.rmi.*;
+import dorkbox.network.util.CryptoSerializationManager;
import dorkbox.network.util.exceptions.NetException;
import dorkbox.network.util.serializers.FieldAnnotationAwareSerializer;
import dorkbox.network.util.serializers.IgnoreSerialization;
+import dorkbox.network.util.serializers.UnmodifiableCollectionsSerializer;
import dorkbox.util.crypto.Crypto;
-import dorkbox.util.crypto.bouncycastle.GCMBlockCipher_ByteBuf;
+import dorkbox.util.crypto.serialization.EccPrivateKeySerializer;
+import dorkbox.util.crypto.serialization.EccPublicKeySerializer;
+import dorkbox.util.crypto.serialization.IesParametersSerializer;
+import dorkbox.util.crypto.serialization.IesWithCipherParametersSerializer;
+import dorkbox.util.objectPool.ObjectPool;
+import dorkbox.util.objectPool.ObjectPoolFactory;
+import dorkbox.util.objectPool.PoolableObject;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
-import io.netty.buffer.Unpooled;
import io.netty.handler.codec.compression.CompressionException;
import io.netty.handler.codec.compression.SnappyAccess;
-import org.bouncycastle.crypto.engines.AESFastEngine;
-import org.jctools.queues.MpmcArrayQueue;
+import org.bouncycastle.crypto.params.ECPrivateKeyParameters;
+import org.bouncycastle.crypto.params.ECPublicKeyParameters;
+import org.bouncycastle.crypto.params.IESParameters;
+import org.bouncycastle.crypto.params.IESWithCipherParameters;
+import org.jctools.util.Pow2;
import org.slf4j.Logger;
import java.lang.annotation.Annotation;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Proxy;
import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.Collection;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;
@@ -40,18 +46,16 @@ import java.util.zip.Inflater;
* Threads reading/writing, it messes up a single instance.
* it is possible to use a single kryo with the use of synchronize, however - that defeats the point of multi-threaded
*/
+@SuppressWarnings({"unused", "StaticNonFinalField"})
public
-class KryoConnectionSerializationManager implements ConnectionSerializationManager {
+class KryoCryptoSerializationManager implements CryptoSerializationManager {
+
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KryoCryptoSerializationManager.class);
+ private static final int capacity = Pow2.roundToPowerOfTwo(Runtime.getRuntime()
+ .availableProcessors() * 32);
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KryoConnectionSerializationManager.class);
private static final boolean ENABLE_SNAPPY = false;
- /**
- * Specify if we want KRYO to use unsafe memory for serialization, or to use the ASM backend. Unsafe memory use is WAY faster, but is
- * limited to the "same endianess" on all endpoints, and unsafe DOES NOT work on android.
- */
- public static boolean useUnsafeMemory = false;
-
/**
* The minimum amount that we'll consider actually attempting to compress.
* This value is preamble + the minimum length our Snappy service will
@@ -59,661 +63,66 @@ class KryoConnectionSerializationManager implements ConnectionSerializationManag
*/
private static final int MIN_COMPRESSIBLE_LENGTH = 18;
+ /**
+ * bit masks
+ */
+ private static final int compression = 1;
+ private static final int crypto = 1 << 1;
+
+ // compression options
+ static final int compressionLevel = 6;
+ private static final ByteBuf NULL_BUFFER = null;
+
+ /**
+ * Specify if we want KRYO to use unsafe memory for serialization, or to use the ASM backend. Unsafe memory use is WAY faster, but is
+ * limited to the "same endianess" on all endpoints, and unsafe DOES NOT work on android.
+ */
+ public static boolean useUnsafeMemory = false;
+
+ private static final String OBJECT_ID = "objectID";
+ private boolean rmiInitialized = false;
+
+ /**
+ * The default serialization manager. This is static, since serialization must be consistent within the JVM. This can be changed.
+ */
+ public static KryoCryptoSerializationManager DEFAULT = DEFAULT();
- private final KryoPool pool;
- private RmiRegisterClassesCallback rmiCallback;
public static
- KryoConnectionSerializationManager DEFAULT() {
- return DEFAULT(false, true);
+ KryoCryptoSerializationManager DEFAULT() {
+ return DEFAULT(true, true);
}
public static
- KryoConnectionSerializationManager DEFAULT(final boolean references, final boolean registrationRequired) {
- // ignore fields that have the "IgnoreSerialization" annotation.
- Set> marks = new HashSet>();
+ KryoCryptoSerializationManager DEFAULT(final boolean references, final boolean registrationRequired) {
+ // ignore fields that have the "@IgnoreSerialization" annotation.
+ Collection> marks = new ArrayList>();
marks.add(IgnoreSerialization.class);
SerializerFactory disregardingFactory = new FieldAnnotationAwareSerializer.Factory(marks, true);
- // from the list-serve email. This offers 8x performance in resolving references over the default impl.
- final BinaryListReferenceResolver resolver = new BinaryListReferenceResolver();
- return new KryoConnectionSerializationManager(references, registrationRequired, resolver, disregardingFactory);
+ final KryoCryptoSerializationManager serializationManager = new KryoCryptoSerializationManager(references,
+ registrationRequired,
+ disregardingFactory);
+
+ serializationManager.register(PingMessage.class);
+ serializationManager.register(byte[].class);
+
+ serializationManager.register(IESParameters.class, new IesParametersSerializer());
+ serializationManager.register(IESWithCipherParameters.class, new IesWithCipherParametersSerializer());
+ serializationManager.register(ECPublicKeyParameters.class, new EccPublicKeySerializer());
+ serializationManager.register(ECPrivateKeyParameters.class, new EccPrivateKeySerializer());
+ serializationManager.register(dorkbox.network.connection.registration.Registration.class);
+
+ // necessary for the transport of exceptions.
+ serializationManager.register(ArrayList.class, new CollectionSerializer());
+ serializationManager.register(StackTraceElement.class);
+ serializationManager.register(StackTraceElement[].class);
+
+ UnmodifiableCollectionsSerializer.registerSerializers(serializationManager);
+
+ return serializationManager;
}
-
- // @formatter:off
- private enum ChunkType {
- COMPRESSED_DATA,
- UNCOMPRESSED_DATA,
- RESERVED_UNSKIPPABLE,
- RESERVED_SKIPPABLE
- }
-
- /** bit masks */
- private static final int compression = 1 << 0;
- private static final int crypto = 1 << 1;
- // @formatter:on
-
- // compression options
- private static final int compressionLevel = 6;
-
-
- @SuppressWarnings("rawtypes")
- class KryoRegister {
- public Class> clazz = null;
- public Serializer> serializer = null;
- public Integer id = null;
- public SerializerRegistration registration;
-
- public
- KryoRegister() {
- }
- }
-
-
- class KryoExtra extends Kryo {
- private final ByteBufOutput outputBuffer;
- private final ByteBufInput inputBuffer;
-
- private final Inflater inflater;
- private final Deflater deflater;
-
- private final SnappyAccess snappy;
-
- private final ByteBuf tmpBuffer1;
- private final ByteBuf tmpBuffer2;
-
- private final GCMBlockCipher_ByteBuf aesEngine;
-
-
- public
- KryoExtra() {
- this.snappy = new SnappyAccess();
- this.deflater = new Deflater(compressionLevel, true);
- this.inflater = new Inflater(true);
-
- this.inputBuffer = new ByteBufInput();
- this.outputBuffer = new ByteBufOutput();
-
- this.tmpBuffer1 = Unpooled.buffer(1024);
- this.tmpBuffer2 = Unpooled.buffer(1024);
- this.aesEngine = new GCMBlockCipher_ByteBuf(new AESFastEngine());
- }
- }
-
-
-
- final ArrayList registers = new ArrayList(16);
-
-
- /**
- * @param references If true, each appearance of an object in the graph after the first is stored as an integer ordinal.
- * When set to true, {@link MapReferenceResolver} is used. This enables references to the same object and
- * cyclic graphs to be serialized, but typically adds overhead of one byte per object. (should be true)
- *
- * @param registrationRequired If true, an exception is thrown when an unregistered class is encountered.
- *
- * If false, when an unregistered class is encountered, its fully qualified class name will be serialized
- * and the {@link Kryo#addDefaultSerializer(Class, Class) default serializer} for the class used to
- * serialize the object. Subsequent appearances of the class within the same object graph are serialized
- * as an int id.
- *
- * Registered classes are serialized as an int id, avoiding the overhead of serializing the class name,
- * but have the drawback of needing to know the classes to be serialized up front.
- *
- * @param referenceResolver Sets the reference resolver and enables references.
- *
- * @param factory Sets the serializer factory to use when no {@link Kryo#addDefaultSerializer(Class, Class) default
- * serializers} match an object's type. Default is {@link ReflectionSerializerFactory} with
- * {@link FieldSerializer}. @see Kryo#newDefaultSerializer(Class)
- *
- */
- public
- KryoConnectionSerializationManager(final boolean references,
- final boolean registrationRequired,
- final ReferenceResolver referenceResolver,
- final SerializerFactory factory) {
- KryoFactory kryoFactory = new KryoFactory() {
- @SuppressWarnings("unchecked")
- @Override
- public
- KryoExtra create() {
- KryoExtra kryo = new KryoExtra();
-
- // we HAVE to pre-allocate the KRYOs
- boolean useAsm = !useUnsafeMemory;
-
- kryo.setAsmEnabled(useAsm);
- kryo.setRegistrationRequired(registrationRequired);
-
- kryo.setReferences(references);
-
- if (referenceResolver != null) {
- kryo.setReferenceResolver(referenceResolver);
- }
- if (factory != null) {
- kryo.setDefaultSerializer(factory);
- }
-
- for (KryoRegister register : KryoConnectionSerializationManager.this.registers) {
- if (register.registration != null) {
- Registration reg = kryo.register(register.clazz);
- register.registration.register(reg.getSerializer());
- }
- else {
- if (register.serializer != null && register.id != null) {
- kryo.register(register.clazz, register.serializer, register.id);
- }
- else if (register.serializer != null) {
- kryo.register(register.clazz, register.serializer);
- }
- else {
- kryo.register(register.clazz);
- }
- }
- }
-
- if (KryoConnectionSerializationManager.this.rmiCallback != null) {
- // necessary for the RMI bridge. Only called once, but necessary for all kryo instances
- KryoConnectionSerializationManager.this.rmiCallback.registerForClasses(kryo);
- }
-
- return kryo;
- }
- };
-
- this.pool = new KryoPool.Builder(kryoFactory).queue(new MpmcArrayQueue(Runtime.getRuntime()
- .availableProcessors() * 32))
- .build();
- }
-
-
-
- /**
- * Registers the class using the lowest, next available integer ID and the
- * {@link Kryo#getDefaultSerializer(Class) default serializer}. If the class
- * is already registered, the existing entry is updated with the new
- * serializer. Registering a primitive also affects the corresponding
- * primitive wrapper.
- *
- * Because the ID assigned is affected by the IDs registered before it, the
- * order classes are registered is important when using this method. The
- * order must be the same at deserialization as it was for serialization.
- */
- @Override
- public
- void register(Class> clazz) {
- KryoRegister kryoRegister = new KryoRegister();
- kryoRegister.clazz = clazz;
- this.registers.add(kryoRegister);
- }
-
- /**
- * Registers the class using the lowest, next available integer ID and the
- * specified serializer. If the class is already registered, the existing
- * entry is updated with the new serializer. Registering a primitive also
- * affects the corresponding primitive wrapper.
- *
- * Because the ID assigned is affected by the IDs registered before it, the
- * order classes are registered is important when using this method. The
- * order must be the same at deserialization as it was for serialization.
- */
- @Override
- public
- void register(Class> clazz, Serializer> serializer) {
- KryoRegister kryoRegister = new KryoRegister();
- kryoRegister.clazz = clazz;
- kryoRegister.serializer = serializer;
- this.registers.add(kryoRegister);
- }
-
- /**
- * Registers the class using the specified ID and serializer. If the ID is
- * already in use by the same type, the old entry is overwritten. If the ID
- * is already in use by a different type, a {@link KryoException} is thrown.
- * Registering a primitive also affects the corresponding primitive wrapper.
- *
- * IDs must be the same at deserialization as they were for serialization.
- *
- * @param id Must be >= 0. Smaller IDs are serialized more efficiently. IDs
- * 0-8 are used by default for primitive types and String, but
- * these IDs can be repurposed.
- */
- @Override
- public
- void register(Class> clazz, Serializer> serializer, int id) {
- KryoRegister kryoRegister = new KryoRegister();
- kryoRegister.clazz = clazz;
- kryoRegister.serializer = serializer;
- kryoRegister.id = id;
- this.registers.add(kryoRegister);
- }
-
- /**
- * primarily used by RMI It is not common to call this method!
- *
- * Registers the class using the lowest, next available integer ID and the
- * {@link SerializerRegistration(Class) serializer}. If the class
- * is already registered, the existing entry is updated with the new
- * serializer. Registering a primitive also affects the corresponding
- * primitive wrapper.
- *
- * Because the ID assigned is affected by the IDs registered before it, the
- * order classes are registered is important when using this method. The
- * order must be the same at deserialization as it was for serialization.
- */
- @Override
- @SuppressWarnings({"rawtypes", "unchecked"})
- public
- void registerSerializer(Class> clazz, SerializerRegistration registration) {
- KryoRegister kryoRegister = new KryoRegister();
- kryoRegister.clazz = clazz;
- kryoRegister.registration = registration;
- this.registers.add(kryoRegister);
- }
-
- /**
- * Necessary to register classes for RMI, only called once when the RMI bridge is created.
- */
- @Override
- public
- void registerForRmiClasses(RmiRegisterClassesCallback callback) {
- this.rmiCallback = callback;
- }
-
- /**
- * If the class is not registered and {@link Kryo#setRegistrationRequired(boolean)} is false, it is
- * automatically registered using the {@link Kryo#addDefaultSerializer(Class, Class) default serializer}.
- *
- * @throws IllegalArgumentException if the class is not registered and {@link Kryo#setRegistrationRequired(boolean)} is true.
- * @see ClassResolver#getRegistration(Class)
- */
- @Override
- public
- Registration getRegistration(Class> clazz) {
- // registration is always required, will throw exception if this class is not already registered
- final Kryo kryo = this.pool.borrow();
- Registration r;
-
- try {
- r = kryo.getRegistration(clazz);
- } finally {
- this.pool.release(kryo);
- }
-
- return r;
- }
-
- /**
- * Determines if this buffer is encrypted or not.
- */
- @Override
- public final
- boolean isEncrypted(ByteBuf buffer) {
- // read off the magic byte
- byte magicByte = buffer.getByte(buffer.readerIndex());
- return (magicByte & crypto) == crypto;
- }
-
- /**
- * Waits until a kryo is available to write, using CAS operations to prevent having to synchronize.
- *
- * No crypto and no sqeuence number
- *
- * There is a small speed penalty if there were no kryo's available to use.
- */
- @Override
- public final
- void write(ByteBuf buffer, Object message) {
- write0(null, buffer, message, false);
- }
-
- /**
- * Writes the class and object using an available kryo instance
- */
- @Override
- public
- void writeFullClassAndObject(Output output, Object value) {
- final Kryo kryo = this.pool.borrow();
- boolean prev = kryo.isRegistrationRequired();
- kryo.setRegistrationRequired(false);
-
- try {
- kryo.writeClassAndObject(output, value);
- } catch (KryoException ex) {
- throw new NetException("Unable to serialize buffer", ex);
- } finally {
- kryo.setRegistrationRequired(prev);
- this.pool.release(kryo);
- }
- }
-
- @Override
- public
- Object readFullClassAndObject(final Input input) {
- final Kryo kryo = this.pool.borrow();
- boolean prev = kryo.isRegistrationRequired();
- kryo.setRegistrationRequired(false);
-
- try {
- Object readClassAndObject = kryo.readClassAndObject(input);
- return readClassAndObject;
- } catch (KryoException ex) {
- throw new NetException("Unable to deserialize buffer", ex);
- } finally {
- kryo.setRegistrationRequired(prev);
- this.pool.release(kryo);
- }
- }
-
- @Override
- public
- Kryo borrow() {
- return this.pool.borrow();
- }
-
- @Override
- public
- void release(final Kryo kryo) {
- this.pool.release(kryo);
- }
-
- /**
- * Waits until a kryo is available to write, using CAS operations to prevent having to synchronize.
- *
- * There is a small speed penalty if there were no kryo's available to use.
- */
- @Override
- public final
- void writeWithCryptoTcp(Connection connection, ByteBuf buffer, Object message) {
- if (connection == null) {
- throw new NetException("Unable to perform crypto when NO network connection!");
- }
-
- write0(connection, buffer, message, true);
- }
-
- /**
- * Waits until a kryo is available to write, using CAS operations to prevent having to synchronize.
- *
- * There is a small speed penalty if there were no kryo's available to use.
- */
- @Override
- public final
- void writeWithCryptoUdp(Connection connection, ByteBuf buffer, Object message) {
- if (connection == null) {
- throw new NetException("Unable to perform crypto when NO network connection!");
- }
-
- write0(connection, buffer, message, true);
- }
-
-
- /**
- * @param doCrypto true if we want to perform crypto on this data.
- */
- @SuppressWarnings("unchecked")
- private
- void write0(Connection connection, ByteBuf buffer, Object message, boolean doCrypto) {
- final KryoExtra kryo = (KryoExtra) this.pool.borrow();
- Logger logger2 = logger;
-
- try {
- byte magicByte = (byte) 0x00000000;
-
- ByteBuf bufferWithData = kryo.tmpBuffer1;
- ByteBuf bufferTempData = kryo.tmpBuffer2;
-
- // write the object to the TEMP buffer! this will be compressed with snappy
- kryo.outputBuffer.setBuffer(bufferWithData);
-
- // connection will ALWAYS be of type Connection or NULL.
- // used by RMI/some serializers to determine which connection wrote this object
- // NOTE: this is only valid in the context of this thread, which RMI stuff is accessed in -- so this is SAFE for RMI
- if (connection != null) {
- kryo.getContext()
- .put(Connection.connection, connection);
- }
-
- kryo.writeClassAndObject(kryo.outputBuffer, message);
-
- // release resources
- kryo.outputBuffer.setBuffer((ByteBuf) null);
-
- // save off how much data the object took + the length of the (possible) sequence.
- int length = bufferWithData.writerIndex(); // it started at ZERO (since it's written to the temp buffer.
-
- // snappy compression
- // tmpBuffer2 = compress(tmpBuffer1)
- //noinspection StatementWithEmptyBody
- if (length > MIN_COMPRESSIBLE_LENGTH) {
- if (ENABLE_SNAPPY) {
- snappyCompress(bufferWithData, bufferTempData, length, kryo.snappy);
- }
- else {
- compress(bufferWithData, bufferTempData, length, kryo.deflater);
- }
-
- // check to make sure that it was WORTH compressing, like what I had before
- int compressedLength = bufferTempData.readableBytes();
- if (compressedLength < length) {
- // specify we compressed data
- magicByte = (byte) (magicByte | compression);
-
- length = compressedLength;
-
- // swap buffers
- ByteBuf tmp = bufferWithData;
- bufferWithData = bufferTempData;
- bufferTempData = tmp;
- }
- else {
- // "copy" (do nothing)
- bufferWithData.readerIndex(0); // have to reset the reader
- }
- }
- else {
- // "copy" (do nothing)
- }
-
- // at this point, we have 2 options for *bufferWithData*
- // compress -> tmpBuffers2 has data
- // copy -> tmpBuffers1 has data
-
-
- // AES CRYPTO
- if (doCrypto && connection != null) {
- if (logger2.isTraceEnabled()) {
- logger2.trace("Encrypting data with - AES {}", connection);
- }
-
- length = Crypto.AES.encrypt(kryo.aesEngine, connection.getCryptoParameters(), bufferWithData, bufferTempData, length);
-
- // swap buffers
- ByteBuf tmp = bufferWithData;
- bufferWithData = bufferTempData;
- bufferTempData = tmp;
- bufferTempData.clear();
-
- // only needed for server UDP connections to determine if the data is encrypted or not.
- magicByte = (byte) (magicByte | crypto);
- }
-
-
- /// MOVE EVERYTHING TO THE PROPER BYTE BUF
-
- // write out the "magic" byte.
- buffer.writeByte(magicByte); // leave space for the magic magicByte
-
- // transfer the tmpBuffer (if necessary) back into the "primary" buffer.
- buffer.writeBytes(bufferWithData);
-
- // don't forget the clear the temp buffers!
- kryo.tmpBuffer1.clear();
- kryo.tmpBuffer2.clear();
-
-
- } catch (KryoException ex) {
- throw new NetException("Unable to serialize buffer", ex);
- } finally {
- this.pool.release(kryo);
- }
- }
-
- /**
- * Reads an object from the buffer.
- *
- * No crypto and no sequence number
- *
- * @param length should ALWAYS be the length of the expected object!
- */
- @Override
- public final
- Object read(ByteBuf buffer, int length) {
- return read0(null, buffer, length, false);
- }
-
- /**
- * Reads an object from the buffer.
- *
- * Crypto + sequence number
- *
- * @param connection can be NULL
- * @param length should ALWAYS be the length of the expected object!
- */
- @Override
- public final
- Object readWithCryptoTcp(Connection connection, ByteBuf buffer, int length) {
- if (connection == null) {
- throw new NetException("Unable to perform crypto when NO network connection!");
- }
-
- return read0(connection, buffer, length, true);
- }
-
- /**
- * Reads an object from the buffer.
- *
- * Crypto + sequence number
- *
- * @param connection can be NULL
- * @param length should ALWAYS be the length of the expected object!
- */
- @Override
- public final
- Object readWithCryptoUdp(Connection connection, ByteBuf buffer, int length) {
- if (connection == null) {
- throw new NetException("Unable to perform crypto when NO network connection!");
- }
-
- return read0(connection, buffer, length, true);
- }
-
- /**
- * @param doCrypto true if crypto was used for this data.
- */
- @SuppressWarnings("unchecked")
- private
- Object read0(Connection connection, ByteBuf buffer, int length, boolean doCrypto) {
- final KryoExtra kryo = (KryoExtra) this.pool.borrow();
- Logger logger2 = logger;
-
- int originalLength = 0;
- int originalStartPos = 0;
-
- ////////////////
- // Note: we CANNOT write BACK to "buffer" since there could be additional data on it!
- ////////////////
- try {
- // read off the magic byte
- int startPosition = buffer.readerIndex();
- byte magicByte = buffer.readByte();
-
- // adjust for the magic byte
- startPosition++;
- length--;
-
- originalLength = length;
- originalStartPos = startPosition;
-
- ByteBuf bufferWithData = buffer;
- ByteBuf bufferTempData = kryo.tmpBuffer2;
-
- // AES CRYPTO STUFF
- if (doCrypto) {
- if ((magicByte & crypto) != crypto) {
- throw new NetException("Unable to perform crypto when data does not to use crypto!");
- }
-
- if (logger2.isTraceEnabled()) {
- logger2.trace("Decrypting data with - AES " + connection);
- }
-
- Crypto.AES.decrypt(kryo.aesEngine, connection.getCryptoParameters(), bufferWithData, bufferTempData, length);
-
- // since we "nuked" the start position, we have to make sure the compressor picks up the change.
- startPosition = 0;
-
- // swap buffers
- bufferWithData = bufferTempData;
- bufferTempData = kryo.tmpBuffer2;
- }
-
- // did we compress it??
- //noinspection StatementWithEmptyBody
- if ((magicByte & compression) == compression) {
- if (ENABLE_SNAPPY) {
- snappyDecompress(bufferWithData, bufferTempData, kryo.snappy);
- }
- else {
- decompress(bufferWithData, bufferTempData, kryo.inflater);
- }
-
- // swap buffers
- ByteBuf tmp = bufferWithData;
- bufferWithData = bufferTempData;
- bufferTempData = tmp;
- }
- else {
- // "copy" (do nothing)
- }
-
- // read the object from the buffer.
- kryo.inputBuffer.setBuffer(bufferWithData);
-
-
-
- // connection will ALWAYS be of type IConnection or NULL.
- // used by RMI/some serializers to determine which connection read this object
- // NOTE: this is only valid in the context of this thread, which RMI stuff is accessed in -- so this is SAFE for RMI
- if (connection != null) {
- kryo.getContext()
- .put(Connection.connection, connection);
- }
-
- Object readClassAndObject = kryo.readClassAndObject(kryo.inputBuffer);
-
- return readClassAndObject;
- } catch (KryoException ex) {
- throw new NetException("Unable to deserialize buffer", ex);
- } finally {
- // release resources
- kryo.inputBuffer.setBuffer((ByteBuf) null);
-
- // make sure the end of the buffer is in the correct spot.
- // move the reader index to the end of the object (since we are reading encrypted data
- // this just has to happen before the length field is reassigned.
- buffer.readerIndex(originalStartPos + originalLength);
-
- // don't forget the clear the temp buffers!
- kryo.tmpBuffer1.clear();
- kryo.tmpBuffer2.clear();
-
- this.pool.release(kryo);
- }
- }
-
-
@SuppressWarnings("unused")
private static
void compress(ByteBuf inputBuffer, ByteBuf outputBuffer, int length, Deflater compress) {
@@ -733,6 +142,7 @@ class KryoConnectionSerializationManager implements ConnectionSerializationManag
}
}
+
private static
void decompress(ByteBuf inputBuffer, ByteBuf outputBuffer, Inflater decompress) {
byte[] in = new byte[inputBuffer.readableBytes()];
@@ -787,7 +197,6 @@ class KryoConnectionSerializationManager implements ConnectionSerializationManag
}
}
-
private static
void snappyDecompress(ByteBuf inputBuffer, ByteBuf outputBuffer, SnappyAccess snappy) {
try {
@@ -914,7 +323,6 @@ class KryoConnectionSerializationManager implements ConnectionSerializationManag
}
}
-
private static
void writeUnencodedChunk(ByteBuf in, ByteBuf out, int dataLength) {
out.writeByte(1);
@@ -953,4 +361,626 @@ class KryoConnectionSerializationManager implements ConnectionSerializationManag
void calculateAndWriteChecksum(ByteBuf slice, ByteBuf out) {
out.writeInt(ByteBufUtil.swapInt(SnappyAccess.calculateChecksum(slice)));
}
+
+ final ObjectPool pool;
+
+ /**
+ * @param references If true, each appearance of an object in the graph after the first is stored as an integer ordinal.
+ * When set to true, {@link MapReferenceResolver} is used. This enables references to the same object and
+ * cyclic graphs to be serialized, but typically adds overhead of one byte per object. (should be true)
+ *
+ * @param registrationRequired If true, an exception is thrown when an unregistered class is encountered.
+ *
+ * If false, when an unregistered class is encountered, its fully qualified class name will be serialized
+ * and the {@link Kryo#addDefaultSerializer(Class, Class) default serializer} for the class used to
+ * serialize the object. Subsequent appearances of the class within the same object graph are serialized
+ * as an int id.
+ *
+ * Registered classes are serialized as an int id, avoiding the overhead of serializing the class name,
+ * but have the drawback of needing to know the classes to be serialized up front.
+ *
+ * @param factory Sets the serializer factory to use when no {@link Kryo#addDefaultSerializer(Class, Class) default
+ * serializers} match an object's type. Default is {@link ReflectionSerializerFactory} with
+ * {@link FieldSerializer}. @see Kryo#newDefaultSerializer(Class)
+ *
+ */
+ public
+ KryoCryptoSerializationManager(final boolean references, final boolean registrationRequired, final SerializerFactory factory) {
+ // we have to use a custom queue, because we CANNOT have kryo's used that have not been properly "registered" with
+ // different serializers/etc. This queue will properly block if it runs out of kryo's
+
+ // This pool wil also pre-populate, so that we have the hit on startup, instead of on access
+ // this is also so that our register methods can correctly register with all of the kryo instances
+ pool = ObjectPoolFactory.create(new PoolableObject() {
+ @Override
+ public
+ Kryo create() {
+ KryoExtra kryo = new KryoExtra();
+
+ // we HAVE to pre-allocate the KRYOs
+ boolean useAsm = !useUnsafeMemory;
+
+ kryo.setAsmEnabled(useAsm);
+ kryo.setRegistrationRequired(registrationRequired);
+
+ kryo.setReferences(references);
+
+ if (factory != null) {
+ kryo.setDefaultSerializer(factory);
+ }
+
+ return kryo;
+ }
+ }, capacity);
+ }
+
+ /**
+ * Registers the class using the lowest, next available integer ID and the
+ * {@link Kryo#getDefaultSerializer(Class) default serializer}. If the class
+ * is already registered, the existing entry is updated with the new
+ * serializer. Registering a primitive also affects the corresponding
+ * primitive wrapper.
+ *
+ * Because the ID assigned is affected by the IDs registered before it, the
+ * order classes are registered is important when using this method. The
+ * order must be the same at deserialization as it was for serialization.
+ */
+ @Override
+ public synchronized
+ void register(Class> clazz) {
+ Kryo kryo;
+ for (int i = 0; i < capacity; i++) {
+ kryo = this.pool.takeUninterruptibly();
+ kryo.register(clazz);
+ this.pool.release(kryo);
+ }
+ }
+
+ /**
+ * Registers the class using the lowest, next available integer ID and the
+ * specified serializer. If the class is already registered, the existing
+ * entry is updated with the new serializer. Registering a primitive also
+ * affects the corresponding primitive wrapper.
+ *
+ * Because the ID assigned is affected by the IDs registered before it, the
+ * order classes are registered is important when using this method. The
+ * order must be the same at deserialization as it was for serialization.
+ */
+ @Override
+ public synchronized
+ void register(Class> clazz, Serializer> serializer) {
+ Kryo kryo;
+ for (int i = 0; i < capacity; i++) {
+ kryo = this.pool.takeUninterruptibly();
+ kryo.register(clazz, serializer);
+ this.pool.release(kryo);
+ }
+ }
+
+ /**
+ * Registers the class using the specified ID and serializer. If the ID is
+ * already in use by the same type, the old entry is overwritten. If the ID
+ * is already in use by a different type, a {@link KryoException} is thrown.
+ * Registering a primitive also affects the corresponding primitive wrapper.
+ *
+ * IDs must be the same at deserialization as they were for serialization.
+ *
+ * @param id Must be >= 0. Smaller IDs are serialized more efficiently. IDs
+ * 0-8 are used by default for primitive types and String, but
+ * these IDs can be repurposed.
+ */
+ @Override
+ public synchronized
+ void register(Class> clazz, Serializer> serializer, int id) {
+ Kryo kryo;
+ for (int i = 0; i < capacity; i++) {
+ kryo = this.pool.takeUninterruptibly();
+ kryo.register(clazz, serializer, id);
+ this.pool.release(kryo);
+ }
+ }
+
+ /**
+ * Necessary to register classes for RMI, only called once when the RMI bridge is created.
+ */
+ @Override
+ public synchronized
+ void initRmiSerialization() {
+ if (rmiInitialized) {
+ return;
+ }
+ rmiInitialized = true;
+
+ Kryo kryo;
+ for (int i = 0; i < capacity; i++) {
+ kryo = this.pool.takeUninterruptibly();
+ // necessary for the RMI bridge. Only called once, but for all kryo instances
+
+ kryo.register(Class.class);
+ kryo.register(RmiRegistration.class);
+ kryo.register(Object[].class);
+ kryo.register(InvokeMethod.class, new InvokeMethodSerializer(null));
+
+ FieldSerializer resultSerializer = new FieldSerializer(kryo, InvokeMethodResult.class) {
+ @Override
+ public
+ void write(Kryo kryo, Output output, InvokeMethodResult result) {
+ super.write(kryo, output, result);
+ output.writeInt(result.objectID, true);
+ }
+
+ @Override
+ public
+ InvokeMethodResult read(Kryo kryo, Input input, Class type) {
+ InvokeMethodResult result = super.read(kryo, input, type);
+ result.objectID = input.readInt(true);
+ return result;
+ }
+ };
+ resultSerializer.removeField(OBJECT_ID);
+ kryo.register(InvokeMethodResult.class, resultSerializer);
+
+ kryo.register(InvocationHandler.class, new Serializer