Changed handshake write to be THREAD strategy instead of COROUTINE strategy

This commit is contained in:
Robinson 2021-04-29 10:04:53 +02:00
parent 47b63173ac
commit 7a6926df93

View File

@ -45,6 +45,7 @@ import kotlinx.coroutines.runBlocking
import mu.KLogger import mu.KLogger
import mu.KotlinLogging import mu.KotlinLogging
import org.agrona.DirectBuffer import org.agrona.DirectBuffer
import org.agrona.concurrent.IdleStrategy
// If TCP and UDP both fill the pipe, THERE WILL BE FRAGMENTATION and dropped UDP packets! // If TCP and UDP both fill the pipe, THERE WILL BE FRAGMENTATION and dropped UDP packets!
@ -85,6 +86,7 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
private val handshakeKryo: KryoExtra private val handshakeKryo: KryoExtra
private val sendIdleStrategy: CoroutineIdleStrategy private val sendIdleStrategy: CoroutineIdleStrategy
private val sendIdleStrategyHandshake: IdleStrategy
/** /**
* Crypto and signature management * Crypto and signature management
@ -123,6 +125,8 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
// serialization stuff // serialization stuff
serialization = config.serialization serialization = config.serialization
sendIdleStrategy = config.sendIdleStrategy sendIdleStrategy = config.sendIdleStrategy
sendIdleStrategyHandshake = sendIdleStrategy.cloneToNormal()
handshakeKryo = serialization.initHandshakeKryo() handshakeKryo = serialization.initHandshakeKryo()
// we have to be able to specify the property store // we have to be able to specify the property store
@ -296,7 +300,7 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
@Suppress("DuplicatedCode") @Suppress("DuplicatedCode")
// note: CANNOT be called in action dispatch. ALWAYS ON SAME THREAD // note: CANNOT be called in action dispatch. ALWAYS ON SAME THREAD
internal suspend fun writeHandshakeMessage(publication: Publication, message: HandshakeMessage) { internal fun writeHandshakeMessage(publication: Publication, message: HandshakeMessage) {
// The handshake sessionId IS NOT globally unique // The handshake sessionId IS NOT globally unique
logger.trace { logger.trace {
"[${publication.sessionId()}] send HS: $message" "[${publication.sessionId()}] send HS: $message"
@ -329,7 +333,7 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
*/ */
if (result >= Publication.ADMIN_ACTION) { if (result >= Publication.ADMIN_ACTION) {
// we should retry. // we should retry.
sendIdleStrategy.idle() sendIdleStrategyHandshake.idle()
continue continue
} }
@ -344,7 +348,7 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
ListenerManager.cleanStackTrace(exception, 2) // 2 because we do not want to see the stack for the abstract `newException` ListenerManager.cleanStackTrace(exception, 2) // 2 because we do not want to see the stack for the abstract `newException`
listenerManager.notifyError(exception) listenerManager.notifyError(exception)
} finally { } finally {
sendIdleStrategy.reset() sendIdleStrategyHandshake.reset()
} }
} }