Code cleanup
This commit is contained in:
parent
ccf7a37d3c
commit
15c7fb2a3d
@ -718,7 +718,7 @@ open class Client<CONNECTION : Connection>(
|
||||
|
||||
try {
|
||||
// only have ot do one
|
||||
serialization.finishClientConnect(connectionInfo.kryoRegistrationDetails, maxMessageSize)
|
||||
serialization.finishClientConnect(connectionInfo.kryoRegistrationDetails)
|
||||
} catch (e: Exception) {
|
||||
handshakeConnection.close()
|
||||
|
||||
@ -738,7 +738,7 @@ open class Client<CONNECTION : Connection>(
|
||||
// we set up our kryo information once we connect to a server (using the server's kryo registration details)
|
||||
|
||||
// every time we connect to a server, we have to reconfigure AND reassign kryo
|
||||
readKryo = serialization.newReadKryo(maxMessageSize)
|
||||
readKryo = serialization.newReadKryo()
|
||||
|
||||
|
||||
///////////////
|
||||
|
@ -21,14 +21,12 @@ import dorkbox.collections.ConcurrentIterator
|
||||
import dorkbox.network.Configuration
|
||||
import dorkbox.network.connection.EndPoint
|
||||
import dorkbox.util.NamedThreadFactory
|
||||
import dorkbox.util.sync.CountDownLatch
|
||||
import kotlinx.atomicfu.atomic
|
||||
import kotlinx.coroutines.*
|
||||
import kotlinx.coroutines.sync.Mutex
|
||||
import kotlinx.coroutines.sync.withLock
|
||||
import mu.KLogger
|
||||
import mu.KotlinLogging
|
||||
import org.agrona.concurrent.IdleStrategy
|
||||
import java.util.concurrent.*
|
||||
|
||||
/**
|
||||
@ -53,9 +51,9 @@ internal class EventPoller {
|
||||
private lateinit var dispatchScope: CoroutineScope
|
||||
|
||||
private lateinit var pollStrategy: CoroutineIdleStrategy
|
||||
private lateinit var clonedStrategy: IdleStrategy
|
||||
|
||||
private var running = true
|
||||
@Volatile
|
||||
private var running = false
|
||||
private var mutex = Mutex()
|
||||
|
||||
// this is thread safe
|
||||
@ -91,17 +89,18 @@ internal class EventPoller {
|
||||
running = true
|
||||
configured = true
|
||||
shutdownLatch = CountDownLatch(1)
|
||||
pollStrategy = config.pollIdleStrategy
|
||||
clonedStrategy = config.pollIdleStrategy.cloneToNormal()
|
||||
pollStrategy = config.pollIdleStrategy.clone()
|
||||
|
||||
dispatchScope = CoroutineScope(pollDispatcher + SupervisorJob())
|
||||
require(pollDispatcher.isActive) { "Unable to start the event dispatch in the terminated state!" }
|
||||
|
||||
dispatchScope.launch {
|
||||
val pollIdleStrategy = clonedStrategy
|
||||
val pollIdleStrategy = pollStrategy
|
||||
var pollCount = 0
|
||||
threadId = Thread.currentThread().id
|
||||
|
||||
pollIdleStrategy.reset()
|
||||
|
||||
while (running) {
|
||||
pollEvents.forEachRemovable {
|
||||
try {
|
||||
@ -151,7 +150,9 @@ internal class EventPoller {
|
||||
shutdownLatch.countDown()
|
||||
}
|
||||
} else {
|
||||
require(pollStrategy == config.pollIdleStrategy) {
|
||||
// we don't want to use .equals, because that also compares STATE, which for us is going to be different because we are cloned!
|
||||
// toString has the right info to compare types/config accurately
|
||||
require(pollStrategy.toString() == config.pollIdleStrategy.toString()) {
|
||||
"The network event poll strategy is different between the multiple instances of network clients/servers. There **WILL BE** thread starvation, so this behavior is forbidden!"
|
||||
}
|
||||
}
|
||||
@ -223,7 +224,7 @@ internal class EventPoller {
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun doClose() {
|
||||
private fun doClose() {
|
||||
val wasRunning = running
|
||||
|
||||
running = false
|
||||
|
@ -125,7 +125,7 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
|
||||
* but that has a (very low) maximum reassembly size -- so we have our own mechanism for object fragmentation/assembly, which
|
||||
* is (in reality) only limited by available ram.
|
||||
*/
|
||||
internal val maxMessageSize = config.networkMtuSize - DataHeaderFlyweight.HEADER_LENGTH
|
||||
private val maxMessageSize = config.networkMtuSize - DataHeaderFlyweight.HEADER_LENGTH
|
||||
|
||||
/**
|
||||
* Read and Write can be concurrent (different buffers are used)
|
||||
@ -178,7 +178,7 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
|
||||
internal val rmiGlobalSupport = RmiManagerGlobal<CONNECTION>(logger)
|
||||
internal val rmiConnectionSupport: RmiManagerConnections<CONNECTION>
|
||||
|
||||
private val streamingManager = StreamingManager<CONNECTION>(logger, messageDispatch, config, maxMessageSize)
|
||||
private val streamingManager = StreamingManager<CONNECTION>(logger, messageDispatch, config)
|
||||
|
||||
private val pingManager = PingManager<CONNECTION>()
|
||||
|
||||
@ -207,14 +207,14 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
|
||||
// serialization stuff
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
serialization = config.serialization as Serialization<CONNECTION>
|
||||
serialization.finishInit(type, maxMessageSize)
|
||||
serialization.finishInit(type, config.networkMtuSize)
|
||||
|
||||
serialization.fileContentsSerializer.streamingManager = streamingManager
|
||||
|
||||
// we are done with initial configuration, now finish serialization
|
||||
// the CLIENT will reassign these in the `connect0` method (because it registers what the server says to register)
|
||||
if (type == Server::class.java) {
|
||||
readKryo = serialization.newReadKryo(maxMessageSize)
|
||||
readKryo = serialization.newReadKryo()
|
||||
}
|
||||
|
||||
|
||||
|
@ -36,6 +36,7 @@ import dorkbox.network.serialization.KryoWriter
|
||||
import dorkbox.os.OS
|
||||
import dorkbox.util.Sys
|
||||
import io.aeron.Publication
|
||||
import io.aeron.protocol.DataHeaderFlyweight
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.launch
|
||||
import mu.KLogger
|
||||
@ -44,9 +45,7 @@ import org.agrona.concurrent.UnsafeBuffer
|
||||
import java.io.File
|
||||
import java.io.FileInputStream
|
||||
|
||||
internal class StreamingManager<CONNECTION : Connection>(
|
||||
private val logger: KLogger, private val messageDispatch: CoroutineScope, val config: Configuration, val maxMessageSize: Int
|
||||
) {
|
||||
internal class StreamingManager<CONNECTION : Connection>(private val logger: KLogger, private val messageDispatch: CoroutineScope, val config: Configuration) {
|
||||
|
||||
companion object {
|
||||
private const val KILOBYTE = 1024
|
||||
@ -581,7 +580,7 @@ internal class StreamingManager<CONNECTION : Connection>(
|
||||
connection: CONNECTION,
|
||||
streamSessionId: Int
|
||||
): Boolean {
|
||||
val maxMessageSize = maxMessageSize.toLong()
|
||||
val maxMessageSize = (config.networkMtuSize - DataHeaderFlyweight.HEADER_LENGTH).toLong()
|
||||
|
||||
val fileInputStream = file.inputStream()
|
||||
|
||||
|
@ -140,8 +140,7 @@ open class Serialization<CONNECTION: Connection>(private val references: Boolean
|
||||
private val writeKryos: Pool<KryoWriter<CONNECTION>> = ObjectPool.nonBlockingBounded(
|
||||
poolObject = object : BoundedPoolObject<KryoWriter<CONNECTION>>() {
|
||||
override fun newInstance(): KryoWriter<CONNECTION> {
|
||||
logger.debug { "Creating new Kryo($maxMessageSize)" }
|
||||
return newWriteKryo(maxMessageSize)
|
||||
return newWriteKryo()
|
||||
}
|
||||
},
|
||||
maxSize = OS.optimumNumberOfThreads * 2
|
||||
@ -462,6 +461,7 @@ open class Serialization<CONNECTION: Connection>(private val references: Boolean
|
||||
throw IllegalArgumentException("Unable to initialize object serialization more than once!")
|
||||
}
|
||||
|
||||
// DO NOT USE THE POOL! This kryo instance must be thrown away!
|
||||
val kryo = KryoWriter<CONNECTION>(maxMessageSize)
|
||||
newGlobalKryo(kryo)
|
||||
|
||||
@ -493,8 +493,9 @@ open class Serialization<CONNECTION: Connection>(private val references: Boolean
|
||||
*
|
||||
* @return true if initialization was successful, false otherwise. DOES NOT CATCH EXCEPTIONS EXTERNALLY
|
||||
*/
|
||||
internal fun finishClientConnect(kryoRegistrationDetailsFromServer: ByteArray, maxMessageSize: Int) {
|
||||
internal fun finishClientConnect(kryoRegistrationDetailsFromServer: ByteArray) {
|
||||
val readKryo = KryoReader<CONNECTION>(maxMessageSize)
|
||||
// DO NOT USE THE POOL! This kryo instance must be thrown away!
|
||||
val writeKryo = KryoWriter<CONNECTION>(maxMessageSize)
|
||||
newGlobalKryo(readKryo)
|
||||
newGlobalKryo(writeKryo)
|
||||
@ -772,7 +773,7 @@ open class Serialization<CONNECTION: Connection>(private val references: Boolean
|
||||
*
|
||||
* @return takes a kryo instance from the pool, or creates one if the pool was empty
|
||||
*/
|
||||
fun newReadKryo(maxMessageSize: Int): KryoReader<CONNECTION> {
|
||||
fun newReadKryo(): KryoReader<CONNECTION> {
|
||||
val kryo = KryoReader<CONNECTION>(maxMessageSize)
|
||||
newGlobalKryo(kryo)
|
||||
|
||||
@ -788,7 +789,8 @@ open class Serialization<CONNECTION: Connection>(private val references: Boolean
|
||||
*
|
||||
* @return takes a kryo instance from the pool, or creates one if the pool was empty
|
||||
*/
|
||||
fun newWriteKryo(maxMessageSize: Int): KryoWriter<CONNECTION> {
|
||||
private fun newWriteKryo(): KryoWriter<CONNECTION> {
|
||||
logger.debug { "Creating new Kryo($maxMessageSize)" }
|
||||
val kryo = KryoWriter<CONNECTION>(maxMessageSize)
|
||||
newGlobalKryo(kryo)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user