No longer have a dependency on Storage serialization manager

This commit is contained in:
nathan 2020-09-15 21:17:18 +02:00
parent 8af039265b
commit e08d5a3d2b

View File

@ -19,8 +19,6 @@ import com.conversantmedia.util.concurrent.MultithreadConcurrentQueue
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.SerializerFactory
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import com.esotericsoftware.kryo.util.DefaultInstantiatorStrategy
import com.esotericsoftware.minlog.Log
import dorkbox.network.Server
@ -39,20 +37,15 @@ import dorkbox.network.rmi.messages.MethodResponse
import dorkbox.network.rmi.messages.MethodResponseSerializer
import dorkbox.network.rmi.messages.RmiClientSerializer
import dorkbox.network.rmi.messages.RmiServerSerializer
import dorkbox.network.storage.SettingsStore
import dorkbox.os.OS
import dorkbox.util.serialization.SerializationDefaults
import dorkbox.util.serialization.SerializationManager
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.runBlocking
import mu.KLogger
import mu.KotlinLogging
import org.agrona.DirectBuffer
import org.agrona.MutableDirectBuffer
import org.agrona.collections.Int2ObjectHashMap
import org.objenesis.instantiator.ObjectInstantiator
import org.objenesis.strategy.StdInstantiatorStrategy
import java.io.IOException
import java.lang.reflect.Constructor
import java.lang.reflect.InvocationHandler
import kotlin.coroutines.Continuation
@ -78,7 +71,7 @@ import kotlin.coroutines.Continuation
* an object's type. Default is [ReflectionSerializerFactory] with [FieldSerializer]. @see
* Kryo#newDefaultSerializer(Class)
*/
open class Serialization(private val references: Boolean = true, private val factory: SerializerFactory<*>? = null) : SerializationManager<DirectBuffer> {
open class Serialization(private val references: Boolean = true, private val factory: SerializerFactory<*>? = null) {
companion object {
// -2 is the same value that kryo uses for invalid id's
@ -147,7 +140,7 @@ open class Serialization(private val references: Boolean = true, private val fac
*
* This must happen before the creation of the client/server
*/
override fun <T> register(clazz: Class<T>): Serialization {
fun <T> register(clazz: Class<T>): Serialization {
require(!initialized.value) { "Serialization 'register(class)' cannot happen after client/server initialization!" }
// The reason it must be an implementation, is because the reflection serializer DOES NOT WORK with field types, but rather
@ -172,7 +165,7 @@ open class Serialization(private val references: Boolean = true, private val fac
* @param id Must be >= 0. Smaller IDs are serialized more efficiently. IDs 0-8 are used by default for primitive types and String, but
* these IDs can be repurposed.
*/
override fun <T> register(clazz: Class<T>, id: Int): Serialization {
fun <T> register(clazz: Class<T>, id: Int): Serialization {
require(!initialized.value) { "Serialization 'register(Class, int)' cannot happen after client/server initialization!" }
// The reason it must be an implementation, is because the reflection serializer DOES NOT WORK with field types, but rather
@ -195,7 +188,7 @@ open class Serialization(private val references: Boolean = true, private val fac
* method. The order must be the same at deserialization as it was for serialization.
*/
@Synchronized
override fun <T> register(clazz: Class<T>, serializer: Serializer<T>): Serialization {
fun <T> register(clazz: Class<T>, serializer: Serializer<T>): Serialization {
require(!initialized.value) { "Serialization 'register(Class, Serializer)' cannot happen after client/server initialization!" }
// The reason it must be an implementation, is because the reflection serializer DOES NOT WORK with field types, but rather
@ -220,7 +213,7 @@ open class Serialization(private val references: Boolean = true, private val fac
* these IDs can be repurposed.
*/
@Synchronized
override fun <T> register(clazz: Class<T>, serializer: Serializer<T>, id: Int): Serialization {
fun <T> register(clazz: Class<T>, serializer: Serializer<T>, id: Int): Serialization {
require(!initialized.value) { "Serialization 'register(Class, Serializer, int)' cannot happen after client/server initialization!" }
// The reason it must be an implementation, is because the reflection serializer DOES NOT WORK with field types, but rather
@ -395,9 +388,7 @@ open class Serialization(private val references: Boolean = true, private val fac
*
* This is to prevent (and recognize) out-of-order class/serializer registration. If an ID is already in use by a different type, an exception is thrown.
*/
internal fun finishInit(type: Class<*>,
settingsStore: SettingsStore,
kryoRegistrationDetailsFromServer: ByteArray = ByteArray(0)): Boolean {
internal fun finishInit(type: Class<*>, kryoRegistrationDetailsFromServer: ByteArray = ByteArray(0)): Boolean {
logger = KotlinLogging.logger(type.simpleName)
@ -408,10 +399,6 @@ open class Serialization(private val references: Boolean = true, private val fac
return false
}
settingsStore.getSerializationRegistrations().forEach {
classesToRegister.add(ClassRegistration3(it))
}
val kryo = initKryo()
initializeClassRegistrations(kryo)
} else {
@ -761,100 +748,100 @@ open class Serialization(private val references: Boolean = true, private val fac
return methodCache[classId]
}
/**
* # BLOCKING
*
* Waits until a kryo is available to write, using CAS operations to prevent having to synchronize.
*
* @throws IOException
*/
override fun write(buffer: DirectBuffer, message: Any) {
runBlocking {
val kryo = takeKryo()
try {
val output = AeronOutput(buffer as MutableDirectBuffer)
kryo.writeClassAndObject(output, message)
} finally {
returnKryo(kryo)
}
}
}
/**
* # BLOCKING
*
* Reads an object from the buffer.
*
* @param length should ALWAYS be the length of the expected object!
*/
@Throws(IOException::class)
override fun read(buffer: DirectBuffer, length: Int): Any? {
return runBlocking {
val kryo = takeKryo()
try {
val input = AeronInput(buffer)
kryo.readClassAndObject(input)
} finally {
returnKryo(kryo)
}
}
}
/**
* # BLOCKING
*
* Writes the class and object using an available kryo instance
*/
@Throws(IOException::class)
override fun writeFullClassAndObject(output: Output, value: Any) {
runBlocking {
val kryo = takeKryo()
var prev = false
try {
prev = kryo.isRegistrationRequired
kryo.isRegistrationRequired = false
kryo.writeClassAndObject(output, value)
} catch (ex: Exception) {
val msg = "Unable to serialize buffer"
logger.error(msg, ex)
throw IOException(msg, ex)
} finally {
kryo.isRegistrationRequired = prev
returnKryo(kryo)
}
}
}
/**
* # BLOCKING
*
* Returns a class read from the input
*/
@Throws(IOException::class)
override fun readFullClassAndObject(input: Input): Any {
return runBlocking {
val kryo = takeKryo()
var prev = false
try {
prev = kryo.isRegistrationRequired
kryo.isRegistrationRequired = false
kryo.readClassAndObject(input)
} catch (ex: Exception) {
val msg = "Unable to deserialize buffer"
logger.error(msg, ex)
throw IOException(msg, ex)
} finally {
kryo.isRegistrationRequired = prev
returnKryo(kryo)
}
}
}
// NOTE: These following functions are ONLY called on a single thread!
fun readMessage(buffer: DirectBuffer, offset: Int, length: Int, connection: Connection): Any? {
return readKryo.read(buffer, offset, length, connection)
}
// /**
// * # BLOCKING
// *
// * Waits until a kryo is available to write, using CAS operations to prevent having to synchronize.
// *
// * @throws IOException
// */
// override fun write(buffer: DirectBuffer, message: Any) {
// runBlocking {
// val kryo = takeKryo()
// try {
// val output = AeronOutput(buffer as MutableDirectBuffer)
// kryo.writeClassAndObject(output, message)
// } finally {
// returnKryo(kryo)
// }
// }
// }
//
// /**
// * # BLOCKING
// *
// * Reads an object from the buffer.
// *
// * @param length should ALWAYS be the length of the expected object!
// */
// @Throws(IOException::class)
// override fun read(buffer: DirectBuffer, length: Int): Any? {
// return runBlocking {
// val kryo = takeKryo()
// try {
// val input = AeronInput(buffer)
// kryo.readClassAndObject(input)
// } finally {
// returnKryo(kryo)
// }
// }
// }
//
// /**
// * # BLOCKING
// *
// * Writes the class and object using an available kryo instance
// */
// @Throws(IOException::class)
// override fun writeFullClassAndObject(output: Output, value: Any) {
// runBlocking {
// val kryo = takeKryo()
// var prev = false
// try {
// prev = kryo.isRegistrationRequired
// kryo.isRegistrationRequired = false
// kryo.writeClassAndObject(output, value)
// } catch (ex: Exception) {
// val msg = "Unable to serialize buffer"
// logger.error(msg, ex)
// throw IOException(msg, ex)
// } finally {
// kryo.isRegistrationRequired = prev
// returnKryo(kryo)
// }
// }
// }
//
// /**
// * # BLOCKING
// *
// * Returns a class read from the input
// */
// @Throws(IOException::class)
// override fun readFullClassAndObject(input: Input): Any {
// return runBlocking {
// val kryo = takeKryo()
// var prev = false
// try {
// prev = kryo.isRegistrationRequired
// kryo.isRegistrationRequired = false
// kryo.readClassAndObject(input)
// } catch (ex: Exception) {
// val msg = "Unable to deserialize buffer"
// logger.error(msg, ex)
// throw IOException(msg, ex)
// } finally {
// kryo.isRegistrationRequired = prev
// returnKryo(kryo)
// }
// }
// }
// /**
// * Waits until a kryo is available to write, using CAS operations to prevent having to synchronize.
// *