Code polish. Changed kryo to pool (needed for nested serialized objects, specifically RMI). Cleaned up API.

This commit is contained in:
nathan 2016-03-06 23:53:25 +01:00
parent 6e72e79d0c
commit 52f5c1688b
4 changed files with 92 additions and 164 deletions

View File

@ -15,10 +15,8 @@
*/
package dorkbox.network.connection;
import com.esotericsoftware.kryo.ClassResolver;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.Registration;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory;
import com.esotericsoftware.kryo.factories.SerializerFactory;
@ -37,6 +35,8 @@ import dorkbox.network.rmi.InvokeMethodSerializer;
import dorkbox.network.rmi.RemoteObjectSerializer;
import dorkbox.network.rmi.RmiRegistration;
import dorkbox.network.util.CryptoSerializationManager;
import dorkbox.objectPool.ObjectPool;
import dorkbox.objectPool.PoolableObject;
import dorkbox.util.Property;
import dorkbox.util.serialization.ArraysAsListSerializer;
import dorkbox.util.serialization.EccPrivateKeySerializer;
@ -71,9 +71,6 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KryoCryptoSerializationManager.class);
// round to the nearest power of 2
private static final int capacity = 1 << (32 - Integer.numberOfLeadingZeros((Runtime.getRuntime().availableProcessors() * 2) - 1));
// compression options
@Property
static final int compressionLevel = 6;
@ -135,6 +132,8 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager {
}
static class ClassSerializer {
final Class<?> clazz;
final Serializer<?> serializer;
@ -169,8 +168,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager {
}
private boolean initialized = false;
private final ThreadLocal<KryoExtra> kryoThreadLocal;
private final ObjectPool<KryoExtra> kryoPool;
// used by operations performed during kryo initialization, which are by default package access (since it's an anon-inner class
final List<Class<?>> classesToRegister = new ArrayList<Class<?>>();
@ -209,15 +207,10 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager {
*/
public
KryoCryptoSerializationManager(final boolean references, final boolean registrationRequired, final SerializerFactory factory) {
// we have to use a custom queue, because we CANNOT have kryo's used that have not been properly "registered" with
// different serializers/etc. This queue will properly block if it runs out of kryo's
// This pool wil also pre-populate, so that we have the hit on startup, instead of on access
// this is also so that our register methods can correctly register with all of the kryo instances
kryoThreadLocal = new ThreadLocal<KryoExtra>() {
kryoPool = ObjectPool.NonBlockingSoftReference(new PoolableObject<KryoExtra>() {
@Override
protected
KryoExtra initialValue() {
public
KryoExtra create() {
synchronized (KryoCryptoSerializationManager.this) {
KryoExtra kryo = new KryoExtra();
@ -259,7 +252,8 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager {
// After all common registrations, register OtherObjectImpl only on the server using the remote object interface ID.
// This causes OtherObjectImpl to be serialized as OtherObject.
int otherObjectID = kryo.getRegistration(remoteClass.implClass).getId();
int otherObjectID = kryo.getRegistration(remoteClass.implClass)
.getId();
// this overrides the 'otherObjectID' with the specified class/serializer, so that when we WRITE this ID, the impl is READ
kryo.register(remoteClass.ifaceClass, remoteObjectSerializer, otherObjectID);
@ -276,20 +270,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager {
return kryo;
}
}
};
}
/**
* If the class is not registered and {@link Kryo#setRegistrationRequired(boolean)} is false, it is automatically registered using the
* {@link Kryo#addDefaultSerializer(Class, Class) default serializer}.
*
* @throws IllegalArgumentException if the class is not registered and {@link Kryo#setRegistrationRequired(boolean)} is true.
* @see ClassResolver#getRegistration(Class)
*/
@Override
public synchronized
Registration getRegistration(Class<?> clazz) {
return kryoThreadLocal.get().getRegistration(clazz);
});
}
/**
@ -350,7 +331,8 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager {
}
/**
* Objects that we want to use RMI with must be accessed via an interface. This method configures the serialization of an implementation
* Objects that are accessed over RMI, must be accessed via an interface. This method configures the serialization of an
* implementation
* to be serialized via the defined interface, as a RemoteObject (ie: proxy object). If the implementation class is ALREADY registered,
* then it's registration will be overwritten by this one
*
@ -360,6 +342,11 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager {
@Override
public synchronized
<Iface, Impl extends Iface> void registerRemote(final Class<Iface> ifaceClass, final Class<Impl> implClass) {
if (initialized) {
logger.warn("Serialization manager already initialized. Ignoring duplicate registerRemote(Class, Class) call.");
return;
}
remoteClassToRegister.add(new RemoteClass<Iface, Impl>(ifaceClass, implClass));
}
@ -397,29 +384,31 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager {
}
/**
* Waits until a kryo is available to write, using CAS operations to prevent having to synchronize.
* <p>
* No crypto and no sequence number
* <p>
* There is a small speed penalty if there were no kryo's available to use.
* @return takes a kryo instance from the pool.
*/
@Override
public final
void write(final ByteBuf buffer, final Object message) throws IOException {
write0(null, buffer, message, false);
public
KryoExtra takeKryo() {
return kryoPool.take();
}
/**
* Reads an object from the buffer.
* <p>
* No crypto and no sequence number
*
* @param length should ALWAYS be the length of the expected object!
* Returns a kryo instance to the pool.
*/
@Override
public final
Object read(final ByteBuf buffer, final int length) throws IOException {
return read0(null, buffer, length);
public
void returnKryo(KryoExtra kryo) {
kryoPool.put(kryo);
}
/**
* Determines if this buffer is encrypted or not.
*/
public static
boolean isEncrypted(final ByteBuf buffer) {
// read off the magic byte
byte magicByte = buffer.getByte(buffer.readerIndex());
return (magicByte & KryoExtra.crypto) == KryoExtra.crypto;
}
/**
@ -428,8 +417,8 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager {
@Override
public
void writeFullClassAndObject(final Logger logger, final Output output, final Object value) throws IOException {
Kryo kryo = kryoThreadLocal.get();
boolean prev;
KryoExtra kryo = kryoPool.take();
boolean prev = false;
try {
prev = kryo.isRegistrationRequired();
@ -442,9 +431,10 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager {
logger.error(msg, ex);
}
throw new IOException(msg, ex);
} finally {
kryo.setRegistrationRequired(prev);
kryoPool.put(kryo);
}
kryo.setRegistrationRequired(prev);
}
/**
@ -453,7 +443,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager {
@Override
public
Object readFullClassAndObject(final Logger logger, final Input input) throws IOException {
Kryo kryo = kryoThreadLocal.get();
KryoExtra kryo = kryoPool.take();
boolean prev = false;
try {
@ -469,23 +459,26 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager {
throw new IOException(msg, ex);
} finally {
kryo.setRegistrationRequired(prev);
kryoPool.put(kryo);
}
}
@Override
public
Kryo getKryo() {
return kryoThreadLocal.get();
}
/**
* Determines if this buffer is encrypted or not.
* Waits until a kryo is available to write, using CAS operations to prevent having to synchronize.
* <p>
* No crypto and no sequence number
* <p>
* There is a small speed penalty if there were no kryo's available to use.
*/
public static
boolean isEncrypted(final ByteBuf buffer) {
// read off the magic byte
byte magicByte = buffer.getByte(buffer.readerIndex());
return (magicByte & KryoExtra.crypto) == KryoExtra.crypto;
@Override
public final
void write(final ByteBuf buffer, final Object message) throws IOException {
final KryoExtra kryo = kryoPool.take();
try {
kryo.write(null, buffer, message, logger);
} finally {
kryoPool.put(kryo);
}
}
/**
@ -495,19 +488,31 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager {
*/
@Override
public final
void writeWithCryptoTcp(final ConnectionImpl connection, final ByteBuf buffer, final Object message) throws IOException {
write0(connection, buffer, message, true);
void writeWithCrypto(final ConnectionImpl connection, final ByteBuf buffer, final Object message) throws IOException {
final KryoExtra kryo = kryoPool.take();
try {
kryo.write(connection, buffer, message, logger);
} finally {
kryoPool.put(kryo);
}
}
/**
* Waits until a kryo is available to write, using CAS operations to prevent having to synchronize.
* Reads an object from the buffer.
* <p>
* There is a small speed penalty if there were no kryo's available to use.
* No crypto and no sequence number
*
* @param length should ALWAYS be the length of the expected object!
*/
@Override
public final
void writeWithCryptoUdp(final ConnectionImpl connection, final ByteBuf buffer, final Object message) throws IOException {
write0(connection, buffer, message, true);
Object read(final ByteBuf buffer, final int length) throws IOException {
final KryoExtra kryo = kryoPool.take();
try {
return kryo.read(null, buffer, length, logger);
} finally {
kryoPool.put(kryo);
}
}
/**
@ -518,72 +523,15 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager {
* @param connection can be NULL
* @param length should ALWAYS be the length of the expected object!
*/
@SuppressWarnings("Duplicates")
@Override
public final
Object readWithCryptoTcp(final ConnectionImpl connection, final ByteBuf buffer, final int length) throws IOException {
return read0(connection, buffer, length);
}
/**
* Reads an object from the buffer.
* <p>
* Crypto + sequence number
*
* @param connection can be NULL
* @param length should ALWAYS be the length of the expected object!
*/
@Override
public final
Object readWithCryptoUdp(final ConnectionImpl connection, final ByteBuf buffer, final int length) throws IOException {
return read0(connection, buffer, length);
}
/**
* @param doCrypto true if we want to perform crypto on this data.
*/
@SuppressWarnings("unchecked")
private
void write0(final ConnectionImpl connection, final ByteBuf buffer, final Object message, final boolean doCrypto) throws IOException {
final KryoExtra kryo = kryoThreadLocal.get();
Object readWithCrypto(final ConnectionImpl connection, final ByteBuf buffer, final int length) throws IOException {
final KryoExtra kryo = kryoPool.take();
try {
kryo.write(connection, buffer, message, doCrypto);
} catch (Exception ex) {
final String msg = "Unable to serialize buffer";
logger.error(msg, ex);
throw new IOException(msg, ex);
return kryo.read(connection, buffer, length, logger);
} finally {
// release resources
kryo.releaseWrite();
}
}
@SuppressWarnings({"unchecked", "UnnecessaryLocalVariable"})
private
Object read0(final ConnectionImpl connection, final ByteBuf buffer, final int length) throws IOException {
final KryoExtra kryo = kryoThreadLocal.get();
int originalStartPos = buffer.readerIndex();
////////////////
// Note: we CANNOT write BACK to the buffer since there could be additional data on it!
////////////////
try {
Object object = kryo.read(connection, buffer, length);
return object;
} catch (Exception ex) {
final String msg = "Unable to deserialize buffer";
logger.error(msg, ex);
throw new IOException(msg, ex);
} finally {
// make sure the end of the buffer is in the correct spot.
// move the reader index to the end of the object (since we are reading encrypted data
// this just has to happen before the length field is reassigned.
buffer.readerIndex(originalStartPos + length);
// release resources
kryo.releaseRead();
kryoPool.put(kryo);
}
}
}

View File

@ -105,7 +105,7 @@ class RegistrationRemoteHandlerServerUDP<C extends Connection> extends MessageTo
ConnectionImpl networkConnection = this.registrationWrapper.getServerUDP(remoteAddress);
if (networkConnection != null) {
// try to write data! (IT SHOULD ALWAYS BE ENCRYPTED HERE!)
this.serializationManager.writeWithCryptoUdp(networkConnection, buffer, object);
this.serializationManager.writeWithCrypto(networkConnection, buffer, object);
}
else {
// this means we are still in the REGISTRATION phase.
@ -182,7 +182,7 @@ class RegistrationRemoteHandlerServerUDP<C extends Connection> extends MessageTo
Object object;
try {
object = serializationManager2.readWithCryptoUdp(connection, data, data.writerIndex());
object = serializationManager2.readWithCrypto(connection, data, data.writerIndex());
} catch (Exception e) {
logger2.error("UDP unable to deserialize buffer", e);
shutdown(registrationWrapper2, channel);

View File

@ -33,14 +33,7 @@ interface CryptoSerializationManager extends SerializationManager, RMISerializat
* <p/>
* There is a small speed penalty if there were no kryo's available to use.
*/
void writeWithCryptoTcp(ConnectionImpl connection, ByteBuf buffer, Object message) throws IOException;
/**
* Waits until a kryo is available to write, using CAS operations to prevent having to synchronize.
* <p/>
* There is a small speed penalty if there were no kryo's available to use.
*/
void writeWithCryptoUdp(ConnectionImpl connection, ByteBuf buffer, Object message) throws IOException;
void writeWithCrypto(ConnectionImpl connection, ByteBuf buffer, Object message) throws IOException;
/**
* Reads an object from the buffer.
@ -52,17 +45,6 @@ interface CryptoSerializationManager extends SerializationManager, RMISerializat
* @param length
* should ALWAYS be the length of the expected object!
*/
Object readWithCryptoTcp(ConnectionImpl connection, ByteBuf buffer, int length) throws IOException;
Object readWithCrypto(ConnectionImpl connection, ByteBuf buffer, int length) throws IOException;
/**
* Reads an object from the buffer.
* <p/>
* Crypto + sequence number
*
* @param connection
* can be NULL
* @param length
* should ALWAYS be the length of the expected object!
*/
Object readWithCryptoUdp(ConnectionImpl connection, ByteBuf buffer, int length) throws IOException;
}

View File

@ -15,9 +15,7 @@
*/
package dorkbox.network.util;
import com.esotericsoftware.kryo.ClassResolver;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Registration;
import dorkbox.network.connection.KryoExtra;
public
interface RMISerializationManager {
@ -28,14 +26,14 @@ interface RMISerializationManager {
void initRmiSerialization();
/**
* If the class is not registered and {@link Kryo#setRegistrationRequired(boolean)} is false, it is automatically registered using the
* {@link Kryo#addDefaultSerializer(Class, Class) default serializer}.
*
* @throws IllegalArgumentException
* if the class is not registered and registration is required.
* @see ClassResolver#getRegistration(Class)
* @return takes a kryo instance from the pool.
*/
Registration getRegistration(Class<?> clazz);
KryoExtra takeKryo();
/**
* Returns a kryo instance to the pool.
*/
void returnKryo(KryoExtra object);
/**
* Objects that we want to use RMI with, must be accessed via an interface. This method configures the serialization of an