Moved Kryoextra, moved Aeron IO, fixed generics with serialization

This commit is contained in:
nathan 2020-08-15 13:14:55 +02:00
parent 1b796971d3
commit d9ab3f7247
15 changed files with 185 additions and 188 deletions

View File

@ -271,8 +271,6 @@ class RemoteObjectStorage(val logger: KLogger) {
* @return the object registered with the specified ID. * @return the object registered with the specified ID.
*/ */
operator fun get(objectId: Int): Any? { operator fun get(objectId: Int): Any? {
validate(objectId)
return objectMap[objectId] return objectMap[objectId]
} }

View File

@ -1,6 +1,5 @@
package dorkbox.network.rmi package dorkbox.network.rmi
import com.conversantmedia.util.concurrent.MultithreadConcurrentQueue
import dorkbox.network.rmi.messages.MethodResponse import dorkbox.network.rmi.messages.MethodResponse
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel
@ -30,8 +29,8 @@ internal data class RmiWaiter(val id: Int) {
* this will replace the waiter if it was cancelled (waiters are not valid if cancelled) * this will replace the waiter if it was cancelled (waiters are not valid if cancelled)
*/ */
fun prep() { fun prep() {
@Suppress("EXPERIMENTAL_API_USAGE")
if (channel.isClosedForReceive && channel.isClosedForSend) { if (channel.isClosedForReceive && channel.isClosedForSend) {
println("renew waiter")
channel = Channel(0) channel = Channel(0)
} }
} }
@ -74,17 +73,20 @@ internal class RmiResponseStorage(private val actionDispatch: CoroutineScope) {
// Response ID's are for ALL in-flight RMI on the network stack. instead of limited to (originally) 64, we are now limited to 65,535 // Response ID's are for ALL in-flight RMI on the network stack. instead of limited to (originally) 64, we are now limited to 65,535
// these are just looped around in a ring buffer. // these are just looped around in a ring buffer.
// These are stored here as int, however these are REALLY shorts and are int-packed when transferring data on the wire // These are stored here as int, however these are REALLY shorts and are int-packed when transferring data on the wire
// 32,000 IN FLIGHT RMI method invocations is PLENTY // 64,000 IN FLIGHT RMI method invocations is PLENTY
private val maxValuesInCache = Short.MAX_VALUE.toInt() private val maxValuesInCache = (Short.MAX_VALUE.toInt() * 2) - 1 // -1 because 0 is reserved
private val rmiWaiterCache = Channel<RmiWaiter>(maxValuesInCache)
private val rmiWaiterCache = MultithreadConcurrentQueue<RmiWaiter>(maxValuesInCache)
private val pendingLock = ReentrantReadWriteLock() private val pendingLock = ReentrantReadWriteLock()
private val pending = Int2NullableObjectHashMap<Any>(32, Hashing.DEFAULT_LOAD_FACTOR, true) private val pending = Int2NullableObjectHashMap<Any>(32, Hashing.DEFAULT_LOAD_FACTOR, true)
init { init {
// create a shuffled list of ID's. This operation is ONLY performed ONE TIME per endpoint! // create a shuffled list of ID's. This operation is ONLY performed ONE TIME per endpoint!
val ids = IntArrayList(maxValuesInCache, Integer.MIN_VALUE) val ids = IntArrayList(maxValuesInCache, Integer.MIN_VALUE)
for (id in Short.MIN_VALUE..-1) {
ids.addInt(id)
}
// ZERO is special, and is never added! // ZERO is special, and is never added!
for (id in 1..Short.MAX_VALUE) { for (id in 1..Short.MAX_VALUE) {
ids.addInt(id) ids.addInt(id)
@ -114,15 +116,15 @@ internal class RmiResponseStorage(private val actionDispatch: CoroutineScope) {
previous.doNotify() previous.doNotify()
// since this was the FIRST one to trigger, return it to the cache. // since this was the FIRST one to trigger, return it to the cache.
rmiWaiterCache.offer(previous) rmiWaiterCache.send(previous)
} }
} }
/** /**
* gets the RmiWaiter (id + waiter) * gets the RmiWaiter (id + waiter)
*/ */
internal fun prep(rmiObjectId: Int): RmiWaiter { internal suspend fun prep(rmiObjectId: Int): RmiWaiter {
val responseRmi = rmiWaiterCache.poll() val responseRmi = rmiWaiterCache.receive()
// this will replace the waiter if it was cancelled (waiters are not valid if cancelled) // this will replace the waiter if it was cancelled (waiters are not valid if cancelled)
responseRmi.prep() responseRmi.prep()
@ -175,7 +177,7 @@ internal class RmiResponseStorage(private val actionDispatch: CoroutineScope) {
val resultOrWaiter = pendingLock.write { pending.remove(pendingId) } val resultOrWaiter = pendingLock.write { pending.remove(pendingId) }
if (resultOrWaiter is RmiWaiter) { if (resultOrWaiter is RmiWaiter) {
// since this was the FIRST one to trigger, return it to the cache. // since this was the FIRST one to trigger, return it to the cache.
rmiWaiterCache.offer(resultOrWaiter) rmiWaiterCache.send(resultOrWaiter)
return TIMEOUT_EXCEPTION return TIMEOUT_EXCEPTION
} }
@ -183,7 +185,7 @@ internal class RmiResponseStorage(private val actionDispatch: CoroutineScope) {
} }
fun close() { fun close() {
rmiWaiterCache.clear() rmiWaiterCache.close()
pendingLock.write { pending.clear() } pendingLock.write { pending.clear() }
} }
} }

View File

@ -39,8 +39,8 @@ import com.esotericsoftware.kryo.KryoException
import com.esotericsoftware.kryo.Serializer import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.io.Input import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output import com.esotericsoftware.kryo.io.Output
import dorkbox.network.connection.KryoExtra
import dorkbox.network.rmi.RmiUtils import dorkbox.network.rmi.RmiUtils
import dorkbox.network.serialization.KryoExtra
import java.lang.reflect.Method import java.lang.reflect.Method
/** /**

View File

@ -19,8 +19,8 @@ import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.Serializer import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.io.Input import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output import com.esotericsoftware.kryo.io.Output
import dorkbox.network.connection.KryoExtra
import dorkbox.network.rmi.RmiClient import dorkbox.network.rmi.RmiClient
import dorkbox.network.serialization.KryoExtra
import java.lang.reflect.Proxy import java.lang.reflect.Proxy
/** /**

View File

@ -32,7 +32,7 @@
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/ */
package dorkbox.network.pipeline; package dorkbox.network.serialization;
import java.io.DataInput; import java.io.DataInput;
import java.io.IOException; import java.io.IOException;

View File

@ -32,7 +32,7 @@
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/ */
package dorkbox.network.pipeline; package dorkbox.network.serialization;
import java.io.DataOutput; import java.io.DataOutput;
import java.io.OutputStream; import java.io.OutputStream;

View File

@ -16,8 +16,7 @@
package dorkbox.network.serialization package dorkbox.network.serialization
import com.esotericsoftware.kryo.Serializer import com.esotericsoftware.kryo.Serializer
import dorkbox.network.connection.KryoExtra import mu.KLogger
import org.slf4j.Logger
internal open class ClassRegistration(var clazz: Class<*>) { internal open class ClassRegistration(var clazz: Class<*>) {
var id = 0 var id = 0
@ -28,8 +27,10 @@ internal open class ClassRegistration(var clazz: Class<*>) {
id = registration.id id = registration.id
} }
open fun log(logger: Logger) { open fun log(logger: KLogger) {
logger.trace("Registered {} -> {}", id, clazz.name) logger.trace {
"Registered $id -> ${clazz.name}"
}
} }
fun getInfoArray(): Array<Any> { fun getInfoArray(): Array<Any> {

View File

@ -16,8 +16,7 @@
package dorkbox.network.serialization package dorkbox.network.serialization
import com.esotericsoftware.kryo.Serializer import com.esotericsoftware.kryo.Serializer
import dorkbox.network.connection.KryoExtra import mu.KLogger
import org.slf4j.Logger
internal class ClassRegistration0(clazz: Class<*>, serializer: Serializer<*>) : ClassRegistration(clazz) { internal class ClassRegistration0(clazz: Class<*>, serializer: Serializer<*>) : ClassRegistration(clazz) {
init { init {
@ -28,7 +27,9 @@ internal class ClassRegistration0(clazz: Class<*>, serializer: Serializer<*>) :
id = kryo.register(clazz, serializer).id id = kryo.register(clazz, serializer).id
} }
override fun log(logger: Logger) { override fun log(logger: KLogger) {
logger.trace("Registered {} -> {} using {}", id, clazz.name, serializer?.javaClass?.name) logger.trace {
"Registered $id -> ${clazz.name} using ${serializer?.javaClass?.name}"
}
} }
} }

View File

@ -15,8 +15,7 @@
*/ */
package dorkbox.network.serialization package dorkbox.network.serialization
import dorkbox.network.connection.KryoExtra import mu.KLogger
import org.slf4j.Logger
internal class ClassRegistration1(clazz: Class<*>, id: Int) : ClassRegistration(clazz) { internal class ClassRegistration1(clazz: Class<*>, id: Int) : ClassRegistration(clazz) {
init { init {
@ -27,7 +26,9 @@ internal class ClassRegistration1(clazz: Class<*>, id: Int) : ClassRegistration(
kryo.register(clazz, id) kryo.register(clazz, id)
} }
override fun log(logger: Logger) { override fun log(logger: KLogger) {
logger.trace("Registered {} -> (specified) {}", id, clazz.name) logger.trace {
"Registered $id -> (specified) ${clazz.name}"
}
} }
} }

View File

@ -16,8 +16,7 @@
package dorkbox.network.serialization package dorkbox.network.serialization
import com.esotericsoftware.kryo.Serializer import com.esotericsoftware.kryo.Serializer
import dorkbox.network.connection.KryoExtra import mu.KLogger
import org.slf4j.Logger
internal class ClassRegistration2(clazz: Class<*>, serializer: Serializer<*>, id: Int) : ClassRegistration(clazz) { internal class ClassRegistration2(clazz: Class<*>, serializer: Serializer<*>, id: Int) : ClassRegistration(clazz) {
init { init {
@ -29,7 +28,9 @@ internal class ClassRegistration2(clazz: Class<*>, serializer: Serializer<*>, id
kryo.register(clazz, serializer, id) kryo.register(clazz, serializer, id)
} }
override fun log(logger: Logger) { override fun log(logger: KLogger) {
logger.trace("Registered {} -> (specified) {} using {}", id, clazz.name, serializer?.javaClass?.name) logger.trace {
"Registered $id -> (specified) ${clazz.name} using ${serializer?.javaClass?.name}"
}
} }
} }

View File

@ -15,9 +15,8 @@
*/ */
package dorkbox.network.serialization package dorkbox.network.serialization
import dorkbox.network.connection.KryoExtra
import dorkbox.network.rmi.messages.ObjectResponseSerializer import dorkbox.network.rmi.messages.ObjectResponseSerializer
import org.slf4j.Logger import mu.KLogger
internal class ClassRegistrationIfaceAndImpl(ifaceClass: Class<*>, val implClass: Class<*>, objectResponseSerializer: ObjectResponseSerializer) : ClassRegistration(ifaceClass) { internal class ClassRegistrationIfaceAndImpl(ifaceClass: Class<*>, val implClass: Class<*>, objectResponseSerializer: ObjectResponseSerializer) : ClassRegistration(ifaceClass) {
@ -29,7 +28,9 @@ internal class ClassRegistrationIfaceAndImpl(ifaceClass: Class<*>, val implClass
id = kryo.register(clazz, serializer).id id = kryo.register(clazz, serializer).id
} }
override fun log(logger: Logger) { override fun log(logger: KLogger) {
logger.trace("Registered {} -> (RMI) {}", id, implClass.name) logger.trace {
"Registered $id -> (RMI) ${implClass.name}"
}
} }
} }

View File

@ -1,20 +0,0 @@
package dorkbox.network.serialization;
/**
* Signals the remote end that certain things need to happen
*/
public
class ControlMessage {
public static final byte INVALID_STATUS = 0x0;
public static final byte CONNECTING = 0x2;
public static final byte CONNECTED = 0x3;
public static final byte DISCONNECT = 0x7F; // max signed byte value, 127
public byte command = INVALID_STATUS;
public byte payload = INVALID_STATUS;
public ControlMessage() {
}
}

View File

@ -13,13 +13,12 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package dorkbox.network.connection package dorkbox.network.serialization
import com.esotericsoftware.kryo.Kryo import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.Input import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output import com.esotericsoftware.kryo.io.Output
import dorkbox.network.pipeline.AeronInput import dorkbox.network.connection.Connection
import dorkbox.network.pipeline.AeronOutput
import dorkbox.network.rmi.CachedMethod import dorkbox.network.rmi.CachedMethod
import dorkbox.os.OS import dorkbox.os.OS
import dorkbox.util.Sys import dorkbox.util.Sys
@ -29,7 +28,6 @@ import org.agrona.DirectBuffer
import org.agrona.collections.Int2ObjectHashMap import org.agrona.collections.Int2ObjectHashMap
import org.slf4j.Logger import org.slf4j.Logger
import java.io.IOException import java.io.IOException
import javax.crypto.Cipher
/** /**
* Nothing in this class is thread safe * Nothing in this class is thread safe
@ -50,7 +48,7 @@ class KryoExtra(private val methodCache: Int2ObjectHashMap<Array<CachedMethod>>)
lateinit var connection: Connection lateinit var connection: Connection
// private val secureRandom = SecureRandom() // private val secureRandom = SecureRandom()
private var cipher: Cipher? = null // private var cipher: Cipher? = null
private val compressor = factory.fastCompressor() private val compressor = factory.fastCompressor()
private val decompressor = factory.fastDecompressor() private val decompressor = factory.fastDecompressor()
@ -69,13 +67,13 @@ class KryoExtra(private val methodCache: Int2ObjectHashMap<Array<CachedMethod>>)
private const val IV_LENGTH_BYTE = 12 private const val IV_LENGTH_BYTE = 12
} }
init { // init {
cipher = try { // cipher = try {
Cipher.getInstance(ALGORITHM) // Cipher.getInstance(ALGORITHM)
} catch (e: Exception) { // } catch (e: Exception) {
throw IllegalStateException("could not get cipher instance", e) // throw IllegalStateException("could not get cipher instance", e)
} // }
} // }
fun getMethods(classId: Int): Array<CachedMethod> { fun getMethods(classId: Int): Array<CachedMethod> {
return methodCache[classId] return methodCache[classId]

View File

@ -16,7 +16,6 @@
package dorkbox.network.serialization package dorkbox.network.serialization
import com.esotericsoftware.kryo.Serializer import com.esotericsoftware.kryo.Serializer
import dorkbox.network.connection.KryoExtra
import dorkbox.network.rmi.CachedMethod import dorkbox.network.rmi.CachedMethod
import dorkbox.util.serialization.SerializationManager import dorkbox.util.serialization.SerializationManager
import org.agrona.DirectBuffer import org.agrona.DirectBuffer
@ -81,17 +80,17 @@ interface NetworkSerializationManager : SerializationManager<DirectBuffer> {
/** /**
* @return takes a kryo instance from the pool. * @return takes a kryo instance from the pool.
*/ */
fun takeKryo(): KryoExtra suspend fun takeKryo(): KryoExtra
/** /**
* Returns a kryo instance to the pool. * Returns a kryo instance to the pool.
*/ */
fun returnKryo(kryo: KryoExtra) suspend fun returnKryo(kryo: KryoExtra)
/** /**
* @return true if the remote kryo registration are the same as our own * @return true if the remote kryo registration are the same as our own
*/ */
fun verifyKryoRegistration(clientBytes: ByteArray): Boolean suspend fun verifyKryoRegistration(clientBytes: ByteArray): Boolean
/** /**
* @return the details of all registration IDs -> Class name used by kryo * @return the details of all registration IDs -> Class name used by kryo
@ -145,7 +144,7 @@ interface NetworkSerializationManager : SerializationManager<DirectBuffer> {
/** /**
* Called when initialization is complete. This is to prevent (and recognize) out-of-order class/serializer registration. * Called when initialization is complete. This is to prevent (and recognize) out-of-order class/serializer registration.
*/ */
fun finishInit(endPointClass: Class<*>) suspend fun finishInit(endPointClass: Class<*>)
/** /**
* @return true if our initialization is complete. Some registrations (in the property store, for example) always register for client * @return true if our initialization is complete. Some registrations (in the property store, for example) always register for client

View File

@ -23,25 +23,31 @@ import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output import com.esotericsoftware.kryo.io.Output
import com.esotericsoftware.kryo.util.DefaultInstantiatorStrategy import com.esotericsoftware.kryo.util.DefaultInstantiatorStrategy
import com.esotericsoftware.kryo.util.IdentityMap import com.esotericsoftware.kryo.util.IdentityMap
import dorkbox.network.connection.KryoExtra
import dorkbox.network.connection.ping.PingMessage import dorkbox.network.connection.ping.PingMessage
import dorkbox.network.handshake.Message import dorkbox.network.handshake.HandshakeMessage
import dorkbox.network.pipeline.AeronInput
import dorkbox.network.pipeline.AeronOutput
import dorkbox.network.rmi.CachedMethod import dorkbox.network.rmi.CachedMethod
import dorkbox.network.rmi.RmiUtils import dorkbox.network.rmi.RmiUtils
import dorkbox.network.rmi.messages.* import dorkbox.network.rmi.messages.ConnectionObjectCreateRequest
import dorkbox.objectPool.ObjectPool import dorkbox.network.rmi.messages.ConnectionObjectCreateResponse
import dorkbox.objectPool.PoolableObject import dorkbox.network.rmi.messages.GlobalObjectCreateRequest
import dorkbox.network.rmi.messages.GlobalObjectCreateResponse
import dorkbox.network.rmi.messages.MethodRequest
import dorkbox.network.rmi.messages.MethodRequestSerializer
import dorkbox.network.rmi.messages.MethodResponse
import dorkbox.network.rmi.messages.MethodResponseSerializer
import dorkbox.network.rmi.messages.ObjectResponseSerializer
import dorkbox.network.rmi.messages.RmiClientRequestSerializer
import dorkbox.os.OS import dorkbox.os.OS
import dorkbox.util.serialization.SerializationDefaults import dorkbox.util.serialization.SerializationDefaults
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking
import mu.KLogger
import mu.KotlinLogging
import org.agrona.DirectBuffer import org.agrona.DirectBuffer
import org.agrona.MutableDirectBuffer import org.agrona.MutableDirectBuffer
import org.agrona.collections.Int2ObjectHashMap import org.agrona.collections.Int2ObjectHashMap
import org.objenesis.instantiator.ObjectInstantiator import org.objenesis.instantiator.ObjectInstantiator
import org.objenesis.strategy.StdInstantiatorStrategy import org.objenesis.strategy.StdInstantiatorStrategy
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.io.IOException import java.io.IOException
import java.lang.reflect.Constructor import java.lang.reflect.Constructor
import java.lang.reflect.InvocationHandler import java.lang.reflect.InvocationHandler
@ -66,8 +72,8 @@ import java.lang.reflect.InvocationHandler
* an object's type. Default is [ReflectionSerializerFactory] with [FieldSerializer]. @see * an object's type. Default is [ReflectionSerializerFactory] with [FieldSerializer]. @see
* Kryo#newDefaultSerializer(Class) * Kryo#newDefaultSerializer(Class)
*/ */
class Serialization(references: Boolean, class Serialization(private val references: Boolean,
factory: SerializerFactory<*>?) : NetworkSerializationManager { private val factory: SerializerFactory<*>?) : NetworkSerializationManager {
companion object { companion object {
@ -86,7 +92,6 @@ class Serialization(references: Boolean,
fun DEFAULT(references: Boolean = true, factory: SerializerFactory<*>? = null): Serialization { fun DEFAULT(references: Boolean = true, factory: SerializerFactory<*>? = null): Serialization {
val serialization = Serialization(references, factory) val serialization = Serialization(references, factory)
serialization.register(ControlMessage::class.java)
serialization.register(PingMessage::class.java) // TODO this is built into aeron!??!?!?! serialization.register(PingMessage::class.java) // TODO this is built into aeron!??!?!?!
// TODO: this is for diffie hellmen handshake stuff! // TODO: this is for diffie hellmen handshake stuff!
@ -95,16 +100,16 @@ class Serialization(references: Boolean,
// TODO: fix kryo to work the way we want, so we can register interfaces + serializers with kryo // TODO: fix kryo to work the way we want, so we can register interfaces + serializers with kryo
// serialization.register(XECPublicKey::class.java, XECPublicKeySerializer()) // serialization.register(XECPublicKey::class.java, XECPublicKeySerializer())
// serialization.register(XECPrivateKey::class.java, XECPrivateKeySerializer()) // serialization.register(XECPrivateKey::class.java, XECPrivateKeySerializer())
serialization.register(Message::class.java) // must use full package name! // serialization.register(Message::class.java) // must use full package name!
return serialization return serialization
} }
} }
private lateinit var logger: Logger private lateinit var logger: KLogger
private var initialized = false private var initialized = false
private val kryoPool: ObjectPool<KryoExtra> private val kryoPool: Channel<KryoExtra>
lateinit var classResolver: ClassResolver lateinit var classResolver: ClassResolver
// used by operations performed during kryo initialization, which are by default package access (since it's an anon-inner class) // used by operations performed during kryo initialization, which are by default package access (since it's an anon-inner class)
@ -137,48 +142,49 @@ class Serialization(references: Boolean,
// reflectASM doesn't work on android // reflectASM doesn't work on android
private val useAsm = !OS.isAndroid() private val useAsm = !OS.isAndroid()
private val KRYO_COUNT = 32
init { init {
kryoPool = ObjectPool.NonBlockingSoftReference(object : PoolableObject<KryoExtra>() { // reasonable size of available kryo's before coroutines are suspended during read/write
override fun create(): KryoExtra { kryoPool = Channel(KRYO_COUNT)
synchronized(this@Serialization) {
// we HAVE to pre-allocate the KRYOs
val kryo = KryoExtra(methodCache)
kryo.instantiatorStrategy = instantiatorStrategy
kryo.references = references
// All registration MUST happen in-order of when the register(*) method was called, otherwise there are problems.
SerializationDefaults.register(kryo)
// RMI stuff!
kryo.register(GlobalObjectCreateRequest::class.java)
kryo.register(GlobalObjectCreateResponse::class.java)
kryo.register(ConnectionObjectCreateRequest::class.java)
kryo.register(ConnectionObjectCreateResponse::class.java)
kryo.register(MethodRequest::class.java, methodRequestSerializer)
kryo.register(MethodResponse::class.java, methodResponseSerializer)
@Suppress("UNCHECKED_CAST")
kryo.register(InvocationHandler::class.java as Class<Any>, objectRequestSerializer)
// check to see which interfaces are mapped to RMI (otherwise, the interface requires a serializer)
classesToRegister.forEach { registration ->
registration.register(kryo)
}
if (factory != null) {
kryo.setDefaultSerializer(factory)
}
return kryo
}
}
})
} }
@Synchronized
private fun initKryo(): KryoExtra {
val kryo = KryoExtra(methodCache)
kryo.instantiatorStrategy = instantiatorStrategy
kryo.references = references
// All registration MUST happen in-order of when the register(*) method was called, otherwise there are problems.
SerializationDefaults.register(kryo)
// RMI stuff!
kryo.register(HandshakeMessage::class.java)
kryo.register(GlobalObjectCreateRequest::class.java)
kryo.register(GlobalObjectCreateResponse::class.java)
kryo.register(ConnectionObjectCreateRequest::class.java)
kryo.register(ConnectionObjectCreateResponse::class.java)
kryo.register(MethodRequest::class.java, methodRequestSerializer)
kryo.register(MethodResponse::class.java, methodResponseSerializer)
@Suppress("UNCHECKED_CAST")
kryo.register(InvocationHandler::class.java as Class<Any>, objectRequestSerializer)
// check to see which interfaces are mapped to RMI (otherwise, the interface requires a serializer)
classesToRegister.forEach { registration ->
registration.register(kryo)
}
if (factory != null) {
kryo.setDefaultSerializer(factory)
}
return kryo
}
/** /**
* Registers the class using the lowest, next available integer ID and the [default serializer][Kryo.getDefaultSerializer]. * Registers the class using the lowest, next available integer ID and the [default serializer][Kryo.getDefaultSerializer].
@ -321,16 +327,15 @@ class Serialization(references: Boolean,
* is already in use by a different type, an exception is thrown. * is already in use by a different type, an exception is thrown.
*/ */
@Synchronized @Synchronized
override fun finishInit(endPointClass: Class<*>) { override suspend fun finishInit(endPointClass: Class<*>) {
val name = endPointClass.simpleName this.logger = KotlinLogging.logger(endPointClass.simpleName)
this.logger = LoggerFactory.getLogger("$name.SERIAL")
initialized = true initialized = true
// initialize the kryo pool with at least 1 kryo instance. This ALSO makes sure that all of our class registration is done // initialize the kryo pool with at least 1 kryo instance. This ALSO makes sure that all of our class registration is done
// correctly and (if not) we are are notified on the initial thread (instead of on the network update thread) // correctly and (if not) we are are notified on the initial thread (instead of on the network update thread)
val kryo = kryoPool.take() val kryo = takeKryo()
// save off the class-resolver, so we can lookup the class <-> id relationships // save off the class-resolver, so we can lookup the class <-> id relationships
classResolver = kryo.classResolver classResolver = kryo.classResolver
@ -413,7 +418,7 @@ class Serialization(references: Boolean,
output.toBytes().copyInto(savedRegistrationDetails, 0, 0, length) output.toBytes().copyInto(savedRegistrationDetails, 0, 0, length)
output.close() output.close()
} finally { } finally {
kryoPool.put(kryo) returnKryo(kryo)
} }
} }
@ -435,7 +440,7 @@ class Serialization(references: Boolean,
* *
* @return true if kryo registration is required for all classes sent over the wire * @return true if kryo registration is required for all classes sent over the wire
*/ */
override fun verifyKryoRegistration(clientBytes: ByteArray): Boolean { override suspend fun verifyKryoRegistration(clientBytes: ByteArray): Boolean {
// verify the registration IDs if necessary with our own. The CLIENT does not verify anything, only the server! // verify the registration IDs if necessary with our own. The CLIENT does not verify anything, only the server!
val kryoRegistrationDetails = savedRegistrationDetails val kryoRegistrationDetails = savedRegistrationDetails
val equals = kryoRegistrationDetails.contentEquals(clientBytes) val equals = kryoRegistrationDetails.contentEquals(clientBytes)
@ -505,7 +510,9 @@ class Serialization(references: Boolean,
// JUST MAYBE this is a serializer for RMI. The client doesn't have to register for RMI stuff // JUST MAYBE this is a serializer for RMI. The client doesn't have to register for RMI stuff
if (serializerServer == objectResponseSerializer::class.java.name && serializerClient.isEmpty()) { if (serializerServer == objectResponseSerializer::class.java.name && serializerClient.isEmpty()) {
// this is for RMI! // this is for SERVER RMI!
} else if (serializerClient == objectResponseSerializer::class.java.name && serializerServer.isEmpty()) {
// this is for CLIENT RMI!
} else { } else {
success = false success = false
logger.error("Registration {} Client -> {} ({})", idClient, nameClient, serializerClient) logger.error("Registration {} Client -> {} ({})", idClient, nameClient, serializerClient)
@ -543,15 +550,16 @@ class Serialization(references: Boolean,
/** /**
* @return takes a kryo instance from the pool. * @return takes a kryo instance from the pool.
*/ */
override fun takeKryo(): KryoExtra { override suspend fun takeKryo(): KryoExtra {
return kryoPool.take() // ALWAYS get as many as needed. Recycle them to prevent too many getting created
return kryoPool.poll() ?: initKryo()
} }
/** /**
* Returns a kryo instance to the pool. * Returns a kryo instance to the pool for use later on
*/ */
override fun returnKryo(kryo: KryoExtra) { override suspend fun returnKryo(kryo: KryoExtra) {
kryoPool.put(kryo) kryoPool.send(kryo)
} }
/** /**
@ -638,83 +646,90 @@ class Serialization(references: Boolean,
} }
/** /**
* # BLOCKING
*
* Waits until a kryo is available to write, using CAS operations to prevent having to synchronize. * Waits until a kryo is available to write, using CAS operations to prevent having to synchronize.
*
*
* No crypto and no sequence number
*
*
* There is a small speed penalty if there were no kryo's available to use.
*/ */
@Throws(IOException::class) @Throws(IOException::class)
override fun write(buffer: DirectBuffer, message: Any) { override fun write(buffer: DirectBuffer, message: Any) {
val kryo = kryoPool.take() runBlocking {
try { val kryo = takeKryo()
val output = AeronOutput(buffer as MutableDirectBuffer) try {
kryo.writeClassAndObject(output, message) val output = AeronOutput(buffer as MutableDirectBuffer)
} finally { kryo.writeClassAndObject(output, message)
kryoPool.put(kryo) } finally {
returnKryo(kryo)
}
} }
} }
/** /**
* # BLOCKING
*
* Reads an object from the buffer. * Reads an object from the buffer.
* *
*
* No crypto and no sequence number
*
* @param length should ALWAYS be the length of the expected object! * @param length should ALWAYS be the length of the expected object!
*/ */
@Throws(IOException::class) @Throws(IOException::class)
override fun read(buffer: DirectBuffer, length: Int): Any? { override fun read(buffer: DirectBuffer, length: Int): Any? {
val kryo = kryoPool.take() return runBlocking {
return try { val kryo = takeKryo()
val input = AeronInput(buffer) try {
kryo.readClassAndObject(input) val input = AeronInput(buffer)
} finally { kryo.readClassAndObject(input)
kryoPool.put(kryo) } finally {
returnKryo(kryo)
}
} }
} }
/** /**
* # BLOCKING
*
* Writes the class and object using an available kryo instance * Writes the class and object using an available kryo instance
*/ */
@Throws(IOException::class) @Throws(IOException::class)
override fun writeFullClassAndObject(output: Output, value: Any) { override fun writeFullClassAndObject(output: Output, value: Any) {
val kryo = kryoPool.take() runBlocking {
var prev = false val kryo = takeKryo()
try { var prev = false
prev = kryo.isRegistrationRequired try {
kryo.isRegistrationRequired = false prev = kryo.isRegistrationRequired
kryo.writeClassAndObject(output, value) kryo.isRegistrationRequired = false
} catch (ex: Exception) { kryo.writeClassAndObject(output, value)
val msg = "Unable to serialize buffer" } catch (ex: Exception) {
logger.error(msg, ex) val msg = "Unable to serialize buffer"
throw IOException(msg, ex) logger.error(msg, ex)
} finally { throw IOException(msg, ex)
kryo.isRegistrationRequired = prev } finally {
kryoPool.put(kryo) kryo.isRegistrationRequired = prev
returnKryo(kryo)
}
} }
} }
/** /**
* # BLOCKING
*
* Returns a class read from the input * Returns a class read from the input
*/ */
@Throws(IOException::class) @Throws(IOException::class)
override fun readFullClassAndObject(input: Input): Any { override fun readFullClassAndObject(input: Input): Any {
val kryo = kryoPool.take() return runBlocking {
var prev = false val kryo = takeKryo()
return try { var prev = false
prev = kryo.isRegistrationRequired try {
kryo.isRegistrationRequired = false prev = kryo.isRegistrationRequired
kryo.readClassAndObject(input) kryo.isRegistrationRequired = false
} catch (ex: Exception) { kryo.readClassAndObject(input)
val msg = "Unable to deserialize buffer" } catch (ex: Exception) {
logger.error(msg, ex) val msg = "Unable to deserialize buffer"
throw IOException(msg, ex) logger.error(msg, ex)
} finally { throw IOException(msg, ex)
kryo.isRegistrationRequired = prev } finally {
kryoPool.put(kryo) kryo.isRegistrationRequired = prev
returnKryo(kryo)
}
} }
} }