From 52f5c1688b91b671ed0e6795bee97c84b326d741 Mon Sep 17 00:00:00 2001 From: nathan Date: Sun, 6 Mar 2016 23:53:25 +0100 Subject: [PATCH] Code polish. Changed kryo to pool (needed for nested serialized objects, specifically RMI). Cleaned up API. --- .../KryoCryptoSerializationManager.java | 212 +++++++----------- .../RegistrationRemoteHandlerServerUDP.java | 4 +- .../util/CryptoSerializationManager.java | 22 +- .../network/util/RMISerializationManager.java | 18 +- 4 files changed, 92 insertions(+), 164 deletions(-) diff --git a/src/dorkbox/network/connection/KryoCryptoSerializationManager.java b/src/dorkbox/network/connection/KryoCryptoSerializationManager.java index 4c4f1aea..2b1fd21c 100644 --- a/src/dorkbox/network/connection/KryoCryptoSerializationManager.java +++ b/src/dorkbox/network/connection/KryoCryptoSerializationManager.java @@ -15,10 +15,8 @@ */ package dorkbox.network.connection; -import com.esotericsoftware.kryo.ClassResolver; import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.KryoException; -import com.esotericsoftware.kryo.Registration; import com.esotericsoftware.kryo.Serializer; import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory; import com.esotericsoftware.kryo.factories.SerializerFactory; @@ -37,6 +35,8 @@ import dorkbox.network.rmi.InvokeMethodSerializer; import dorkbox.network.rmi.RemoteObjectSerializer; import dorkbox.network.rmi.RmiRegistration; import dorkbox.network.util.CryptoSerializationManager; +import dorkbox.objectPool.ObjectPool; +import dorkbox.objectPool.PoolableObject; import dorkbox.util.Property; import dorkbox.util.serialization.ArraysAsListSerializer; import dorkbox.util.serialization.EccPrivateKeySerializer; @@ -71,9 +71,6 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KryoCryptoSerializationManager.class); - // round to the nearest power of 2 - private static final int capacity = 1 << (32 - Integer.numberOfLeadingZeros((Runtime.getRuntime().availableProcessors() * 2) - 1)); - // compression options @Property static final int compressionLevel = 6; @@ -135,6 +132,8 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { } + + static class ClassSerializer { final Class clazz; final Serializer serializer; @@ -169,8 +168,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { } private boolean initialized = false; - - private final ThreadLocal kryoThreadLocal; + private final ObjectPool kryoPool; // used by operations performed during kryo initialization, which are by default package access (since it's an anon-inner class final List> classesToRegister = new ArrayList>(); @@ -209,15 +207,10 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { */ public KryoCryptoSerializationManager(final boolean references, final boolean registrationRequired, final SerializerFactory factory) { - // we have to use a custom queue, because we CANNOT have kryo's used that have not been properly "registered" with - // different serializers/etc. This queue will properly block if it runs out of kryo's - - // This pool wil also pre-populate, so that we have the hit on startup, instead of on access - // this is also so that our register methods can correctly register with all of the kryo instances - kryoThreadLocal = new ThreadLocal() { + kryoPool = ObjectPool.NonBlockingSoftReference(new PoolableObject() { @Override - protected - KryoExtra initialValue() { + public + KryoExtra create() { synchronized (KryoCryptoSerializationManager.this) { KryoExtra kryo = new KryoExtra(); @@ -259,7 +252,8 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { // After all common registrations, register OtherObjectImpl only on the server using the remote object interface ID. // This causes OtherObjectImpl to be serialized as OtherObject. - int otherObjectID = kryo.getRegistration(remoteClass.implClass).getId(); + int otherObjectID = kryo.getRegistration(remoteClass.implClass) + .getId(); // this overrides the 'otherObjectID' with the specified class/serializer, so that when we WRITE this ID, the impl is READ kryo.register(remoteClass.ifaceClass, remoteObjectSerializer, otherObjectID); @@ -276,20 +270,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { return kryo; } } - }; - } - - /** - * If the class is not registered and {@link Kryo#setRegistrationRequired(boolean)} is false, it is automatically registered using the - * {@link Kryo#addDefaultSerializer(Class, Class) default serializer}. - * - * @throws IllegalArgumentException if the class is not registered and {@link Kryo#setRegistrationRequired(boolean)} is true. - * @see ClassResolver#getRegistration(Class) - */ - @Override - public synchronized - Registration getRegistration(Class clazz) { - return kryoThreadLocal.get().getRegistration(clazz); + }); } /** @@ -350,7 +331,8 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { } /** - * Objects that we want to use RMI with must be accessed via an interface. This method configures the serialization of an implementation + * Objects that are accessed over RMI, must be accessed via an interface. This method configures the serialization of an + * implementation * to be serialized via the defined interface, as a RemoteObject (ie: proxy object). If the implementation class is ALREADY registered, * then it's registration will be overwritten by this one * @@ -360,6 +342,11 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { @Override public synchronized void registerRemote(final Class ifaceClass, final Class implClass) { + if (initialized) { + logger.warn("Serialization manager already initialized. Ignoring duplicate registerRemote(Class, Class) call."); + return; + } + remoteClassToRegister.add(new RemoteClass(ifaceClass, implClass)); } @@ -397,29 +384,31 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { } /** - * Waits until a kryo is available to write, using CAS operations to prevent having to synchronize. - *

- * No crypto and no sequence number - *

- * There is a small speed penalty if there were no kryo's available to use. + * @return takes a kryo instance from the pool. */ @Override - public final - void write(final ByteBuf buffer, final Object message) throws IOException { - write0(null, buffer, message, false); + public + KryoExtra takeKryo() { + return kryoPool.take(); } /** - * Reads an object from the buffer. - *

- * No crypto and no sequence number - * - * @param length should ALWAYS be the length of the expected object! + * Returns a kryo instance to the pool. */ @Override - public final - Object read(final ByteBuf buffer, final int length) throws IOException { - return read0(null, buffer, length); + public + void returnKryo(KryoExtra kryo) { + kryoPool.put(kryo); + } + + /** + * Determines if this buffer is encrypted or not. + */ + public static + boolean isEncrypted(final ByteBuf buffer) { + // read off the magic byte + byte magicByte = buffer.getByte(buffer.readerIndex()); + return (magicByte & KryoExtra.crypto) == KryoExtra.crypto; } /** @@ -428,8 +417,8 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { @Override public void writeFullClassAndObject(final Logger logger, final Output output, final Object value) throws IOException { - Kryo kryo = kryoThreadLocal.get(); - boolean prev; + KryoExtra kryo = kryoPool.take(); + boolean prev = false; try { prev = kryo.isRegistrationRequired(); @@ -442,9 +431,10 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { logger.error(msg, ex); } throw new IOException(msg, ex); + } finally { + kryo.setRegistrationRequired(prev); + kryoPool.put(kryo); } - - kryo.setRegistrationRequired(prev); } /** @@ -453,7 +443,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { @Override public Object readFullClassAndObject(final Logger logger, final Input input) throws IOException { - Kryo kryo = kryoThreadLocal.get(); + KryoExtra kryo = kryoPool.take(); boolean prev = false; try { @@ -469,23 +459,26 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { throw new IOException(msg, ex); } finally { kryo.setRegistrationRequired(prev); + kryoPool.put(kryo); } } - @Override - public - Kryo getKryo() { - return kryoThreadLocal.get(); - } - /** - * Determines if this buffer is encrypted or not. + * Waits until a kryo is available to write, using CAS operations to prevent having to synchronize. + *

+ * No crypto and no sequence number + *

+ * There is a small speed penalty if there were no kryo's available to use. */ - public static - boolean isEncrypted(final ByteBuf buffer) { - // read off the magic byte - byte magicByte = buffer.getByte(buffer.readerIndex()); - return (magicByte & KryoExtra.crypto) == KryoExtra.crypto; + @Override + public final + void write(final ByteBuf buffer, final Object message) throws IOException { + final KryoExtra kryo = kryoPool.take(); + try { + kryo.write(null, buffer, message, logger); + } finally { + kryoPool.put(kryo); + } } /** @@ -495,19 +488,31 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { */ @Override public final - void writeWithCryptoTcp(final ConnectionImpl connection, final ByteBuf buffer, final Object message) throws IOException { - write0(connection, buffer, message, true); + void writeWithCrypto(final ConnectionImpl connection, final ByteBuf buffer, final Object message) throws IOException { + final KryoExtra kryo = kryoPool.take(); + try { + kryo.write(connection, buffer, message, logger); + } finally { + kryoPool.put(kryo); + } } /** - * Waits until a kryo is available to write, using CAS operations to prevent having to synchronize. + * Reads an object from the buffer. *

- * There is a small speed penalty if there were no kryo's available to use. + * No crypto and no sequence number + * + * @param length should ALWAYS be the length of the expected object! */ @Override public final - void writeWithCryptoUdp(final ConnectionImpl connection, final ByteBuf buffer, final Object message) throws IOException { - write0(connection, buffer, message, true); + Object read(final ByteBuf buffer, final int length) throws IOException { + final KryoExtra kryo = kryoPool.take(); + try { + return kryo.read(null, buffer, length, logger); + } finally { + kryoPool.put(kryo); + } } /** @@ -518,72 +523,15 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager { * @param connection can be NULL * @param length should ALWAYS be the length of the expected object! */ + @SuppressWarnings("Duplicates") @Override public final - Object readWithCryptoTcp(final ConnectionImpl connection, final ByteBuf buffer, final int length) throws IOException { - return read0(connection, buffer, length); - } - - /** - * Reads an object from the buffer. - *

- * Crypto + sequence number - * - * @param connection can be NULL - * @param length should ALWAYS be the length of the expected object! - */ - @Override - public final - Object readWithCryptoUdp(final ConnectionImpl connection, final ByteBuf buffer, final int length) throws IOException { - return read0(connection, buffer, length); - } - - /** - * @param doCrypto true if we want to perform crypto on this data. - */ - @SuppressWarnings("unchecked") - - private - void write0(final ConnectionImpl connection, final ByteBuf buffer, final Object message, final boolean doCrypto) throws IOException { - final KryoExtra kryo = kryoThreadLocal.get(); - + Object readWithCrypto(final ConnectionImpl connection, final ByteBuf buffer, final int length) throws IOException { + final KryoExtra kryo = kryoPool.take(); try { - kryo.write(connection, buffer, message, doCrypto); - } catch (Exception ex) { - final String msg = "Unable to serialize buffer"; - logger.error(msg, ex); - throw new IOException(msg, ex); + return kryo.read(connection, buffer, length, logger); } finally { - // release resources - kryo.releaseWrite(); - } - } - - @SuppressWarnings({"unchecked", "UnnecessaryLocalVariable"}) - private - Object read0(final ConnectionImpl connection, final ByteBuf buffer, final int length) throws IOException { - final KryoExtra kryo = kryoThreadLocal.get(); - - int originalStartPos = buffer.readerIndex(); - - //////////////// - // Note: we CANNOT write BACK to the buffer since there could be additional data on it! - //////////////// - try { - Object object = kryo.read(connection, buffer, length); - return object; - } catch (Exception ex) { - final String msg = "Unable to deserialize buffer"; - logger.error(msg, ex); - throw new IOException(msg, ex); - } finally { - // make sure the end of the buffer is in the correct spot. - // move the reader index to the end of the object (since we are reading encrypted data - // this just has to happen before the length field is reassigned. - buffer.readerIndex(originalStartPos + length); - - // release resources - kryo.releaseRead(); + kryoPool.put(kryo); } } } diff --git a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServerUDP.java b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServerUDP.java index 7668166e..109af8bb 100644 --- a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServerUDP.java +++ b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServerUDP.java @@ -105,7 +105,7 @@ class RegistrationRemoteHandlerServerUDP extends MessageTo ConnectionImpl networkConnection = this.registrationWrapper.getServerUDP(remoteAddress); if (networkConnection != null) { // try to write data! (IT SHOULD ALWAYS BE ENCRYPTED HERE!) - this.serializationManager.writeWithCryptoUdp(networkConnection, buffer, object); + this.serializationManager.writeWithCrypto(networkConnection, buffer, object); } else { // this means we are still in the REGISTRATION phase. @@ -182,7 +182,7 @@ class RegistrationRemoteHandlerServerUDP extends MessageTo Object object; try { - object = serializationManager2.readWithCryptoUdp(connection, data, data.writerIndex()); + object = serializationManager2.readWithCrypto(connection, data, data.writerIndex()); } catch (Exception e) { logger2.error("UDP unable to deserialize buffer", e); shutdown(registrationWrapper2, channel); diff --git a/src/dorkbox/network/util/CryptoSerializationManager.java b/src/dorkbox/network/util/CryptoSerializationManager.java index 5b08d5e2..69035b16 100644 --- a/src/dorkbox/network/util/CryptoSerializationManager.java +++ b/src/dorkbox/network/util/CryptoSerializationManager.java @@ -33,14 +33,7 @@ interface CryptoSerializationManager extends SerializationManager, RMISerializat *

* There is a small speed penalty if there were no kryo's available to use. */ - void writeWithCryptoTcp(ConnectionImpl connection, ByteBuf buffer, Object message) throws IOException; - - /** - * Waits until a kryo is available to write, using CAS operations to prevent having to synchronize. - *

- * There is a small speed penalty if there were no kryo's available to use. - */ - void writeWithCryptoUdp(ConnectionImpl connection, ByteBuf buffer, Object message) throws IOException; + void writeWithCrypto(ConnectionImpl connection, ByteBuf buffer, Object message) throws IOException; /** * Reads an object from the buffer. @@ -52,17 +45,6 @@ interface CryptoSerializationManager extends SerializationManager, RMISerializat * @param length * should ALWAYS be the length of the expected object! */ - Object readWithCryptoTcp(ConnectionImpl connection, ByteBuf buffer, int length) throws IOException; + Object readWithCrypto(ConnectionImpl connection, ByteBuf buffer, int length) throws IOException; - /** - * Reads an object from the buffer. - *

- * Crypto + sequence number - * - * @param connection - * can be NULL - * @param length - * should ALWAYS be the length of the expected object! - */ - Object readWithCryptoUdp(ConnectionImpl connection, ByteBuf buffer, int length) throws IOException; } diff --git a/src/dorkbox/network/util/RMISerializationManager.java b/src/dorkbox/network/util/RMISerializationManager.java index 45465ac2..35edef0b 100644 --- a/src/dorkbox/network/util/RMISerializationManager.java +++ b/src/dorkbox/network/util/RMISerializationManager.java @@ -15,9 +15,7 @@ */ package dorkbox.network.util; -import com.esotericsoftware.kryo.ClassResolver; -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.Registration; +import dorkbox.network.connection.KryoExtra; public interface RMISerializationManager { @@ -28,14 +26,14 @@ interface RMISerializationManager { void initRmiSerialization(); /** - * If the class is not registered and {@link Kryo#setRegistrationRequired(boolean)} is false, it is automatically registered using the - * {@link Kryo#addDefaultSerializer(Class, Class) default serializer}. - * - * @throws IllegalArgumentException - * if the class is not registered and registration is required. - * @see ClassResolver#getRegistration(Class) + * @return takes a kryo instance from the pool. */ - Registration getRegistration(Class clazz); + KryoExtra takeKryo(); + + /** + * Returns a kryo instance to the pool. + */ + void returnKryo(KryoExtra object); /** * Objects that we want to use RMI with, must be accessed via an interface. This method configures the serialization of an