From d9ab3f72474138e334792e17c8075d0b8e7d775e Mon Sep 17 00:00:00 2001 From: nathan Date: Sat, 15 Aug 2020 13:14:55 +0200 Subject: [PATCH] Moved Kryoextra, moved Aeron IO, fixed generics with serialization --- .../network/rmi/RemoteObjectStorage.kt | 2 - src/dorkbox/network/rmi/RmiResponseStorage.kt | 24 +- .../rmi/messages/MethodRequestSerializer.kt | 2 +- .../messages/RmiClientRequestSerializer.kt | 2 +- .../AeronInput.java | 2 +- .../AeronOutput.java | 2 +- .../serialization/ClassRegistration.kt | 9 +- .../serialization/ClassRegistration0.kt | 9 +- .../serialization/ClassRegistration1.kt | 9 +- .../serialization/ClassRegistration2.kt | 9 +- .../ClassRegistrationIfaceAndImpl.kt | 9 +- .../network/serialization/ControlMessage.java | 20 -- .../KryoExtra.kt | 22 +- .../NetworkSerializationManager.kt | 9 +- .../network/serialization/Serialization.kt | 243 ++++++++++-------- 15 files changed, 185 insertions(+), 188 deletions(-) rename src/dorkbox/network/{pipeline => serialization}/AeronInput.java (99%) rename src/dorkbox/network/{pipeline => serialization}/AeronOutput.java (99%) delete mode 100644 src/dorkbox/network/serialization/ControlMessage.java rename src/dorkbox/network/{connection => serialization}/KryoExtra.kt (98%) diff --git a/src/dorkbox/network/rmi/RemoteObjectStorage.kt b/src/dorkbox/network/rmi/RemoteObjectStorage.kt index 821cd0f0..521f3fcf 100644 --- a/src/dorkbox/network/rmi/RemoteObjectStorage.kt +++ b/src/dorkbox/network/rmi/RemoteObjectStorage.kt @@ -271,8 +271,6 @@ class RemoteObjectStorage(val logger: KLogger) { * @return the object registered with the specified ID. */ operator fun get(objectId: Int): Any? { - validate(objectId) - return objectMap[objectId] } diff --git a/src/dorkbox/network/rmi/RmiResponseStorage.kt b/src/dorkbox/network/rmi/RmiResponseStorage.kt index 8653fbaf..2c32dfdc 100644 --- a/src/dorkbox/network/rmi/RmiResponseStorage.kt +++ b/src/dorkbox/network/rmi/RmiResponseStorage.kt @@ -1,6 +1,5 @@ package dorkbox.network.rmi -import com.conversantmedia.util.concurrent.MultithreadConcurrentQueue import dorkbox.network.rmi.messages.MethodResponse import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.channels.Channel @@ -30,8 +29,8 @@ internal data class RmiWaiter(val id: Int) { * this will replace the waiter if it was cancelled (waiters are not valid if cancelled) */ fun prep() { + @Suppress("EXPERIMENTAL_API_USAGE") if (channel.isClosedForReceive && channel.isClosedForSend) { - println("renew waiter") channel = Channel(0) } } @@ -74,17 +73,20 @@ internal class RmiResponseStorage(private val actionDispatch: CoroutineScope) { // Response ID's are for ALL in-flight RMI on the network stack. instead of limited to (originally) 64, we are now limited to 65,535 // these are just looped around in a ring buffer. // These are stored here as int, however these are REALLY shorts and are int-packed when transferring data on the wire - // 32,000 IN FLIGHT RMI method invocations is PLENTY - private val maxValuesInCache = Short.MAX_VALUE.toInt() - - private val rmiWaiterCache = MultithreadConcurrentQueue(maxValuesInCache) + // 64,000 IN FLIGHT RMI method invocations is PLENTY + private val maxValuesInCache = (Short.MAX_VALUE.toInt() * 2) - 1 // -1 because 0 is reserved + private val rmiWaiterCache = Channel(maxValuesInCache) private val pendingLock = ReentrantReadWriteLock() private val pending = Int2NullableObjectHashMap(32, Hashing.DEFAULT_LOAD_FACTOR, true) + init { // create a shuffled list of ID's. This operation is ONLY performed ONE TIME per endpoint! val ids = IntArrayList(maxValuesInCache, Integer.MIN_VALUE) + for (id in Short.MIN_VALUE..-1) { + ids.addInt(id) + } // ZERO is special, and is never added! for (id in 1..Short.MAX_VALUE) { ids.addInt(id) @@ -114,15 +116,15 @@ internal class RmiResponseStorage(private val actionDispatch: CoroutineScope) { previous.doNotify() // since this was the FIRST one to trigger, return it to the cache. - rmiWaiterCache.offer(previous) + rmiWaiterCache.send(previous) } } /** * gets the RmiWaiter (id + waiter) */ - internal fun prep(rmiObjectId: Int): RmiWaiter { - val responseRmi = rmiWaiterCache.poll() + internal suspend fun prep(rmiObjectId: Int): RmiWaiter { + val responseRmi = rmiWaiterCache.receive() // this will replace the waiter if it was cancelled (waiters are not valid if cancelled) responseRmi.prep() @@ -175,7 +177,7 @@ internal class RmiResponseStorage(private val actionDispatch: CoroutineScope) { val resultOrWaiter = pendingLock.write { pending.remove(pendingId) } if (resultOrWaiter is RmiWaiter) { // since this was the FIRST one to trigger, return it to the cache. - rmiWaiterCache.offer(resultOrWaiter) + rmiWaiterCache.send(resultOrWaiter) return TIMEOUT_EXCEPTION } @@ -183,7 +185,7 @@ internal class RmiResponseStorage(private val actionDispatch: CoroutineScope) { } fun close() { - rmiWaiterCache.clear() + rmiWaiterCache.close() pendingLock.write { pending.clear() } } } diff --git a/src/dorkbox/network/rmi/messages/MethodRequestSerializer.kt b/src/dorkbox/network/rmi/messages/MethodRequestSerializer.kt index 16cebf0b..b8da4067 100644 --- a/src/dorkbox/network/rmi/messages/MethodRequestSerializer.kt +++ b/src/dorkbox/network/rmi/messages/MethodRequestSerializer.kt @@ -39,8 +39,8 @@ import com.esotericsoftware.kryo.KryoException import com.esotericsoftware.kryo.Serializer import com.esotericsoftware.kryo.io.Input import com.esotericsoftware.kryo.io.Output -import dorkbox.network.connection.KryoExtra import dorkbox.network.rmi.RmiUtils +import dorkbox.network.serialization.KryoExtra import java.lang.reflect.Method /** diff --git a/src/dorkbox/network/rmi/messages/RmiClientRequestSerializer.kt b/src/dorkbox/network/rmi/messages/RmiClientRequestSerializer.kt index d86179ff..6280f0bc 100644 --- a/src/dorkbox/network/rmi/messages/RmiClientRequestSerializer.kt +++ b/src/dorkbox/network/rmi/messages/RmiClientRequestSerializer.kt @@ -19,8 +19,8 @@ import com.esotericsoftware.kryo.Kryo import com.esotericsoftware.kryo.Serializer import com.esotericsoftware.kryo.io.Input import com.esotericsoftware.kryo.io.Output -import dorkbox.network.connection.KryoExtra import dorkbox.network.rmi.RmiClient +import dorkbox.network.serialization.KryoExtra import java.lang.reflect.Proxy /** diff --git a/src/dorkbox/network/pipeline/AeronInput.java b/src/dorkbox/network/serialization/AeronInput.java similarity index 99% rename from src/dorkbox/network/pipeline/AeronInput.java rename to src/dorkbox/network/serialization/AeronInput.java index 4f734aa1..a6bf863c 100644 --- a/src/dorkbox/network/pipeline/AeronInput.java +++ b/src/dorkbox/network/serialization/AeronInput.java @@ -32,7 +32,7 @@ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ -package dorkbox.network.pipeline; +package dorkbox.network.serialization; import java.io.DataInput; import java.io.IOException; diff --git a/src/dorkbox/network/pipeline/AeronOutput.java b/src/dorkbox/network/serialization/AeronOutput.java similarity index 99% rename from src/dorkbox/network/pipeline/AeronOutput.java rename to src/dorkbox/network/serialization/AeronOutput.java index 1c2fdf62..c1a0d0ac 100644 --- a/src/dorkbox/network/pipeline/AeronOutput.java +++ b/src/dorkbox/network/serialization/AeronOutput.java @@ -32,7 +32,7 @@ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ -package dorkbox.network.pipeline; +package dorkbox.network.serialization; import java.io.DataOutput; import java.io.OutputStream; diff --git a/src/dorkbox/network/serialization/ClassRegistration.kt b/src/dorkbox/network/serialization/ClassRegistration.kt index 3d81e5f2..09fb7606 100644 --- a/src/dorkbox/network/serialization/ClassRegistration.kt +++ b/src/dorkbox/network/serialization/ClassRegistration.kt @@ -16,8 +16,7 @@ package dorkbox.network.serialization import com.esotericsoftware.kryo.Serializer -import dorkbox.network.connection.KryoExtra -import org.slf4j.Logger +import mu.KLogger internal open class ClassRegistration(var clazz: Class<*>) { var id = 0 @@ -28,8 +27,10 @@ internal open class ClassRegistration(var clazz: Class<*>) { id = registration.id } - open fun log(logger: Logger) { - logger.trace("Registered {} -> {}", id, clazz.name) + open fun log(logger: KLogger) { + logger.trace { + "Registered $id -> ${clazz.name}" + } } fun getInfoArray(): Array { diff --git a/src/dorkbox/network/serialization/ClassRegistration0.kt b/src/dorkbox/network/serialization/ClassRegistration0.kt index 83ae6851..b939788a 100644 --- a/src/dorkbox/network/serialization/ClassRegistration0.kt +++ b/src/dorkbox/network/serialization/ClassRegistration0.kt @@ -16,8 +16,7 @@ package dorkbox.network.serialization import com.esotericsoftware.kryo.Serializer -import dorkbox.network.connection.KryoExtra -import org.slf4j.Logger +import mu.KLogger internal class ClassRegistration0(clazz: Class<*>, serializer: Serializer<*>) : ClassRegistration(clazz) { init { @@ -28,7 +27,9 @@ internal class ClassRegistration0(clazz: Class<*>, serializer: Serializer<*>) : id = kryo.register(clazz, serializer).id } - override fun log(logger: Logger) { - logger.trace("Registered {} -> {} using {}", id, clazz.name, serializer?.javaClass?.name) + override fun log(logger: KLogger) { + logger.trace { + "Registered $id -> ${clazz.name} using ${serializer?.javaClass?.name}" + } } } diff --git a/src/dorkbox/network/serialization/ClassRegistration1.kt b/src/dorkbox/network/serialization/ClassRegistration1.kt index 04e054d8..8dabfb61 100644 --- a/src/dorkbox/network/serialization/ClassRegistration1.kt +++ b/src/dorkbox/network/serialization/ClassRegistration1.kt @@ -15,8 +15,7 @@ */ package dorkbox.network.serialization -import dorkbox.network.connection.KryoExtra -import org.slf4j.Logger +import mu.KLogger internal class ClassRegistration1(clazz: Class<*>, id: Int) : ClassRegistration(clazz) { init { @@ -27,7 +26,9 @@ internal class ClassRegistration1(clazz: Class<*>, id: Int) : ClassRegistration( kryo.register(clazz, id) } - override fun log(logger: Logger) { - logger.trace("Registered {} -> (specified) {}", id, clazz.name) + override fun log(logger: KLogger) { + logger.trace { + "Registered $id -> (specified) ${clazz.name}" + } } } diff --git a/src/dorkbox/network/serialization/ClassRegistration2.kt b/src/dorkbox/network/serialization/ClassRegistration2.kt index 6bc1b467..d26a623c 100644 --- a/src/dorkbox/network/serialization/ClassRegistration2.kt +++ b/src/dorkbox/network/serialization/ClassRegistration2.kt @@ -16,8 +16,7 @@ package dorkbox.network.serialization import com.esotericsoftware.kryo.Serializer -import dorkbox.network.connection.KryoExtra -import org.slf4j.Logger +import mu.KLogger internal class ClassRegistration2(clazz: Class<*>, serializer: Serializer<*>, id: Int) : ClassRegistration(clazz) { init { @@ -29,7 +28,9 @@ internal class ClassRegistration2(clazz: Class<*>, serializer: Serializer<*>, id kryo.register(clazz, serializer, id) } - override fun log(logger: Logger) { - logger.trace("Registered {} -> (specified) {} using {}", id, clazz.name, serializer?.javaClass?.name) + override fun log(logger: KLogger) { + logger.trace { + "Registered $id -> (specified) ${clazz.name} using ${serializer?.javaClass?.name}" + } } } diff --git a/src/dorkbox/network/serialization/ClassRegistrationIfaceAndImpl.kt b/src/dorkbox/network/serialization/ClassRegistrationIfaceAndImpl.kt index ad7de48f..d79d4db3 100644 --- a/src/dorkbox/network/serialization/ClassRegistrationIfaceAndImpl.kt +++ b/src/dorkbox/network/serialization/ClassRegistrationIfaceAndImpl.kt @@ -15,9 +15,8 @@ */ package dorkbox.network.serialization -import dorkbox.network.connection.KryoExtra import dorkbox.network.rmi.messages.ObjectResponseSerializer -import org.slf4j.Logger +import mu.KLogger internal class ClassRegistrationIfaceAndImpl(ifaceClass: Class<*>, val implClass: Class<*>, objectResponseSerializer: ObjectResponseSerializer) : ClassRegistration(ifaceClass) { @@ -29,7 +28,9 @@ internal class ClassRegistrationIfaceAndImpl(ifaceClass: Class<*>, val implClass id = kryo.register(clazz, serializer).id } - override fun log(logger: Logger) { - logger.trace("Registered {} -> (RMI) {}", id, implClass.name) + override fun log(logger: KLogger) { + logger.trace { + "Registered $id -> (RMI) ${implClass.name}" + } } } diff --git a/src/dorkbox/network/serialization/ControlMessage.java b/src/dorkbox/network/serialization/ControlMessage.java deleted file mode 100644 index 17d3feba..00000000 --- a/src/dorkbox/network/serialization/ControlMessage.java +++ /dev/null @@ -1,20 +0,0 @@ -package dorkbox.network.serialization; - -/** - * Signals the remote end that certain things need to happen - */ -public -class ControlMessage { - public static final byte INVALID_STATUS = 0x0; - public static final byte CONNECTING = 0x2; - public static final byte CONNECTED = 0x3; - - public static final byte DISCONNECT = 0x7F; // max signed byte value, 127 - - - public byte command = INVALID_STATUS; - public byte payload = INVALID_STATUS; - - public ControlMessage() { - } -} diff --git a/src/dorkbox/network/connection/KryoExtra.kt b/src/dorkbox/network/serialization/KryoExtra.kt similarity index 98% rename from src/dorkbox/network/connection/KryoExtra.kt rename to src/dorkbox/network/serialization/KryoExtra.kt index d620ee94..6326b518 100644 --- a/src/dorkbox/network/connection/KryoExtra.kt +++ b/src/dorkbox/network/serialization/KryoExtra.kt @@ -13,13 +13,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package dorkbox.network.connection +package dorkbox.network.serialization import com.esotericsoftware.kryo.Kryo import com.esotericsoftware.kryo.io.Input import com.esotericsoftware.kryo.io.Output -import dorkbox.network.pipeline.AeronInput -import dorkbox.network.pipeline.AeronOutput +import dorkbox.network.connection.Connection import dorkbox.network.rmi.CachedMethod import dorkbox.os.OS import dorkbox.util.Sys @@ -29,7 +28,6 @@ import org.agrona.DirectBuffer import org.agrona.collections.Int2ObjectHashMap import org.slf4j.Logger import java.io.IOException -import javax.crypto.Cipher /** * Nothing in this class is thread safe @@ -50,7 +48,7 @@ class KryoExtra(private val methodCache: Int2ObjectHashMap>) lateinit var connection: Connection // private val secureRandom = SecureRandom() - private var cipher: Cipher? = null +// private var cipher: Cipher? = null private val compressor = factory.fastCompressor() private val decompressor = factory.fastDecompressor() @@ -69,13 +67,13 @@ class KryoExtra(private val methodCache: Int2ObjectHashMap>) private const val IV_LENGTH_BYTE = 12 } - init { - cipher = try { - Cipher.getInstance(ALGORITHM) - } catch (e: Exception) { - throw IllegalStateException("could not get cipher instance", e) - } - } +// init { +// cipher = try { +// Cipher.getInstance(ALGORITHM) +// } catch (e: Exception) { +// throw IllegalStateException("could not get cipher instance", e) +// } +// } fun getMethods(classId: Int): Array { return methodCache[classId] diff --git a/src/dorkbox/network/serialization/NetworkSerializationManager.kt b/src/dorkbox/network/serialization/NetworkSerializationManager.kt index 9b05b5a4..84f847de 100644 --- a/src/dorkbox/network/serialization/NetworkSerializationManager.kt +++ b/src/dorkbox/network/serialization/NetworkSerializationManager.kt @@ -16,7 +16,6 @@ package dorkbox.network.serialization import com.esotericsoftware.kryo.Serializer -import dorkbox.network.connection.KryoExtra import dorkbox.network.rmi.CachedMethod import dorkbox.util.serialization.SerializationManager import org.agrona.DirectBuffer @@ -81,17 +80,17 @@ interface NetworkSerializationManager : SerializationManager { /** * @return takes a kryo instance from the pool. */ - fun takeKryo(): KryoExtra + suspend fun takeKryo(): KryoExtra /** * Returns a kryo instance to the pool. */ - fun returnKryo(kryo: KryoExtra) + suspend fun returnKryo(kryo: KryoExtra) /** * @return true if the remote kryo registration are the same as our own */ - fun verifyKryoRegistration(clientBytes: ByteArray): Boolean + suspend fun verifyKryoRegistration(clientBytes: ByteArray): Boolean /** * @return the details of all registration IDs -> Class name used by kryo @@ -145,7 +144,7 @@ interface NetworkSerializationManager : SerializationManager { /** * Called when initialization is complete. This is to prevent (and recognize) out-of-order class/serializer registration. */ - fun finishInit(endPointClass: Class<*>) + suspend fun finishInit(endPointClass: Class<*>) /** * @return true if our initialization is complete. Some registrations (in the property store, for example) always register for client diff --git a/src/dorkbox/network/serialization/Serialization.kt b/src/dorkbox/network/serialization/Serialization.kt index 0e7b62ef..b087717d 100644 --- a/src/dorkbox/network/serialization/Serialization.kt +++ b/src/dorkbox/network/serialization/Serialization.kt @@ -23,25 +23,31 @@ import com.esotericsoftware.kryo.io.Input import com.esotericsoftware.kryo.io.Output import com.esotericsoftware.kryo.util.DefaultInstantiatorStrategy import com.esotericsoftware.kryo.util.IdentityMap -import dorkbox.network.connection.KryoExtra import dorkbox.network.connection.ping.PingMessage -import dorkbox.network.handshake.Message -import dorkbox.network.pipeline.AeronInput -import dorkbox.network.pipeline.AeronOutput +import dorkbox.network.handshake.HandshakeMessage import dorkbox.network.rmi.CachedMethod import dorkbox.network.rmi.RmiUtils -import dorkbox.network.rmi.messages.* -import dorkbox.objectPool.ObjectPool -import dorkbox.objectPool.PoolableObject +import dorkbox.network.rmi.messages.ConnectionObjectCreateRequest +import dorkbox.network.rmi.messages.ConnectionObjectCreateResponse +import dorkbox.network.rmi.messages.GlobalObjectCreateRequest +import dorkbox.network.rmi.messages.GlobalObjectCreateResponse +import dorkbox.network.rmi.messages.MethodRequest +import dorkbox.network.rmi.messages.MethodRequestSerializer +import dorkbox.network.rmi.messages.MethodResponse +import dorkbox.network.rmi.messages.MethodResponseSerializer +import dorkbox.network.rmi.messages.ObjectResponseSerializer +import dorkbox.network.rmi.messages.RmiClientRequestSerializer import dorkbox.os.OS import dorkbox.util.serialization.SerializationDefaults +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.runBlocking +import mu.KLogger +import mu.KotlinLogging import org.agrona.DirectBuffer import org.agrona.MutableDirectBuffer import org.agrona.collections.Int2ObjectHashMap import org.objenesis.instantiator.ObjectInstantiator import org.objenesis.strategy.StdInstantiatorStrategy -import org.slf4j.Logger -import org.slf4j.LoggerFactory import java.io.IOException import java.lang.reflect.Constructor import java.lang.reflect.InvocationHandler @@ -66,8 +72,8 @@ import java.lang.reflect.InvocationHandler * an object's type. Default is [ReflectionSerializerFactory] with [FieldSerializer]. @see * Kryo#newDefaultSerializer(Class) */ -class Serialization(references: Boolean, - factory: SerializerFactory<*>?) : NetworkSerializationManager { +class Serialization(private val references: Boolean, + private val factory: SerializerFactory<*>?) : NetworkSerializationManager { companion object { @@ -86,7 +92,6 @@ class Serialization(references: Boolean, fun DEFAULT(references: Boolean = true, factory: SerializerFactory<*>? = null): Serialization { val serialization = Serialization(references, factory) - serialization.register(ControlMessage::class.java) serialization.register(PingMessage::class.java) // TODO this is built into aeron!??!?!?! // TODO: this is for diffie hellmen handshake stuff! @@ -95,16 +100,16 @@ class Serialization(references: Boolean, // TODO: fix kryo to work the way we want, so we can register interfaces + serializers with kryo // serialization.register(XECPublicKey::class.java, XECPublicKeySerializer()) // serialization.register(XECPrivateKey::class.java, XECPrivateKeySerializer()) - serialization.register(Message::class.java) // must use full package name! +// serialization.register(Message::class.java) // must use full package name! return serialization } } - private lateinit var logger: Logger + private lateinit var logger: KLogger private var initialized = false - private val kryoPool: ObjectPool + private val kryoPool: Channel lateinit var classResolver: ClassResolver // used by operations performed during kryo initialization, which are by default package access (since it's an anon-inner class) @@ -137,48 +142,49 @@ class Serialization(references: Boolean, // reflectASM doesn't work on android private val useAsm = !OS.isAndroid() + private val KRYO_COUNT = 32 init { - kryoPool = ObjectPool.NonBlockingSoftReference(object : PoolableObject() { - override fun create(): KryoExtra { - synchronized(this@Serialization) { - - // we HAVE to pre-allocate the KRYOs - val kryo = KryoExtra(methodCache) - - kryo.instantiatorStrategy = instantiatorStrategy - kryo.references = references - - // All registration MUST happen in-order of when the register(*) method was called, otherwise there are problems. - SerializationDefaults.register(kryo) - - // RMI stuff! - kryo.register(GlobalObjectCreateRequest::class.java) - kryo.register(GlobalObjectCreateResponse::class.java) - - kryo.register(ConnectionObjectCreateRequest::class.java) - kryo.register(ConnectionObjectCreateResponse::class.java) - - kryo.register(MethodRequest::class.java, methodRequestSerializer) - kryo.register(MethodResponse::class.java, methodResponseSerializer) - - @Suppress("UNCHECKED_CAST") - kryo.register(InvocationHandler::class.java as Class, objectRequestSerializer) - - // check to see which interfaces are mapped to RMI (otherwise, the interface requires a serializer) - classesToRegister.forEach { registration -> - registration.register(kryo) - } - - if (factory != null) { - kryo.setDefaultSerializer(factory) - } - - return kryo - } - } - }) + // reasonable size of available kryo's before coroutines are suspended during read/write + kryoPool = Channel(KRYO_COUNT) } + @Synchronized + private fun initKryo(): KryoExtra { + val kryo = KryoExtra(methodCache) + + kryo.instantiatorStrategy = instantiatorStrategy + kryo.references = references + + // All registration MUST happen in-order of when the register(*) method was called, otherwise there are problems. + SerializationDefaults.register(kryo) + + // RMI stuff! + kryo.register(HandshakeMessage::class.java) + kryo.register(GlobalObjectCreateRequest::class.java) + kryo.register(GlobalObjectCreateResponse::class.java) + + kryo.register(ConnectionObjectCreateRequest::class.java) + kryo.register(ConnectionObjectCreateResponse::class.java) + + kryo.register(MethodRequest::class.java, methodRequestSerializer) + kryo.register(MethodResponse::class.java, methodResponseSerializer) + + @Suppress("UNCHECKED_CAST") + kryo.register(InvocationHandler::class.java as Class, objectRequestSerializer) + + // check to see which interfaces are mapped to RMI (otherwise, the interface requires a serializer) + classesToRegister.forEach { registration -> + registration.register(kryo) + } + + if (factory != null) { + kryo.setDefaultSerializer(factory) + } + + return kryo + } + + /** * Registers the class using the lowest, next available integer ID and the [default serializer][Kryo.getDefaultSerializer]. @@ -321,16 +327,15 @@ class Serialization(references: Boolean, * is already in use by a different type, an exception is thrown. */ @Synchronized - override fun finishInit(endPointClass: Class<*>) { - val name = endPointClass.simpleName - - this.logger = LoggerFactory.getLogger("$name.SERIAL") + override suspend fun finishInit(endPointClass: Class<*>) { + this.logger = KotlinLogging.logger(endPointClass.simpleName) initialized = true // initialize the kryo pool with at least 1 kryo instance. This ALSO makes sure that all of our class registration is done // correctly and (if not) we are are notified on the initial thread (instead of on the network update thread) - val kryo = kryoPool.take() + val kryo = takeKryo() + // save off the class-resolver, so we can lookup the class <-> id relationships classResolver = kryo.classResolver @@ -413,7 +418,7 @@ class Serialization(references: Boolean, output.toBytes().copyInto(savedRegistrationDetails, 0, 0, length) output.close() } finally { - kryoPool.put(kryo) + returnKryo(kryo) } } @@ -435,7 +440,7 @@ class Serialization(references: Boolean, * * @return true if kryo registration is required for all classes sent over the wire */ - override fun verifyKryoRegistration(clientBytes: ByteArray): Boolean { + override suspend fun verifyKryoRegistration(clientBytes: ByteArray): Boolean { // verify the registration IDs if necessary with our own. The CLIENT does not verify anything, only the server! val kryoRegistrationDetails = savedRegistrationDetails val equals = kryoRegistrationDetails.contentEquals(clientBytes) @@ -505,7 +510,9 @@ class Serialization(references: Boolean, // JUST MAYBE this is a serializer for RMI. The client doesn't have to register for RMI stuff if (serializerServer == objectResponseSerializer::class.java.name && serializerClient.isEmpty()) { - // this is for RMI! + // this is for SERVER RMI! + } else if (serializerClient == objectResponseSerializer::class.java.name && serializerServer.isEmpty()) { + // this is for CLIENT RMI! } else { success = false logger.error("Registration {} Client -> {} ({})", idClient, nameClient, serializerClient) @@ -543,15 +550,16 @@ class Serialization(references: Boolean, /** * @return takes a kryo instance from the pool. */ - override fun takeKryo(): KryoExtra { - return kryoPool.take() + override suspend fun takeKryo(): KryoExtra { + // ALWAYS get as many as needed. Recycle them to prevent too many getting created + return kryoPool.poll() ?: initKryo() } /** - * Returns a kryo instance to the pool. + * Returns a kryo instance to the pool for use later on */ - override fun returnKryo(kryo: KryoExtra) { - kryoPool.put(kryo) + override suspend fun returnKryo(kryo: KryoExtra) { + kryoPool.send(kryo) } /** @@ -638,83 +646,90 @@ class Serialization(references: Boolean, } /** + * # BLOCKING + * * Waits until a kryo is available to write, using CAS operations to prevent having to synchronize. - * - * - * No crypto and no sequence number - * - * - * There is a small speed penalty if there were no kryo's available to use. */ @Throws(IOException::class) override fun write(buffer: DirectBuffer, message: Any) { - val kryo = kryoPool.take() - try { - val output = AeronOutput(buffer as MutableDirectBuffer) - kryo.writeClassAndObject(output, message) - } finally { - kryoPool.put(kryo) + runBlocking { + val kryo = takeKryo() + try { + val output = AeronOutput(buffer as MutableDirectBuffer) + kryo.writeClassAndObject(output, message) + } finally { + returnKryo(kryo) + } } } /** + * # BLOCKING + * * Reads an object from the buffer. * - * - * No crypto and no sequence number - * * @param length should ALWAYS be the length of the expected object! */ @Throws(IOException::class) override fun read(buffer: DirectBuffer, length: Int): Any? { - val kryo = kryoPool.take() - return try { - val input = AeronInput(buffer) - kryo.readClassAndObject(input) - } finally { - kryoPool.put(kryo) + return runBlocking { + val kryo = takeKryo() + try { + val input = AeronInput(buffer) + kryo.readClassAndObject(input) + } finally { + returnKryo(kryo) + } } } /** + * # BLOCKING + * * Writes the class and object using an available kryo instance */ @Throws(IOException::class) override fun writeFullClassAndObject(output: Output, value: Any) { - val kryo = kryoPool.take() - var prev = false - try { - prev = kryo.isRegistrationRequired - kryo.isRegistrationRequired = false - kryo.writeClassAndObject(output, value) - } catch (ex: Exception) { - val msg = "Unable to serialize buffer" - logger.error(msg, ex) - throw IOException(msg, ex) - } finally { - kryo.isRegistrationRequired = prev - kryoPool.put(kryo) + runBlocking { + val kryo = takeKryo() + var prev = false + try { + prev = kryo.isRegistrationRequired + kryo.isRegistrationRequired = false + kryo.writeClassAndObject(output, value) + } catch (ex: Exception) { + val msg = "Unable to serialize buffer" + logger.error(msg, ex) + throw IOException(msg, ex) + } finally { + kryo.isRegistrationRequired = prev + returnKryo(kryo) + } } } /** + * # BLOCKING + * * Returns a class read from the input */ @Throws(IOException::class) override fun readFullClassAndObject(input: Input): Any { - val kryo = kryoPool.take() - var prev = false - return try { - prev = kryo.isRegistrationRequired - kryo.isRegistrationRequired = false - kryo.readClassAndObject(input) - } catch (ex: Exception) { - val msg = "Unable to deserialize buffer" - logger.error(msg, ex) - throw IOException(msg, ex) - } finally { - kryo.isRegistrationRequired = prev - kryoPool.put(kryo) + return runBlocking { + val kryo = takeKryo() + var prev = false + try { + prev = kryo.isRegistrationRequired + kryo.isRegistrationRequired = false + kryo.readClassAndObject(input) + } catch (ex: Exception) { + val msg = "Unable to deserialize buffer" + logger.error(msg, ex) + throw IOException(msg, ex) + } finally { + kryo.isRegistrationRequired = prev + returnKryo(kryo) + } } }