Fixed issues with heap garbage generation and performance (suspend is better than blocking, but only with short execution stacks)

This commit is contained in:
Robinson 2023-09-07 01:00:53 +02:00
parent 7ac284bc1b
commit 94ae22716d
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
1 changed files with 139 additions and 41 deletions

View File

@ -30,7 +30,6 @@ import dorkbox.network.connection.streaming.StreamingManager
import dorkbox.network.exceptions.*
import dorkbox.network.handshake.Handshaker
import dorkbox.network.ping.Ping
import dorkbox.network.ping.PingManager
import dorkbox.network.rmi.ResponseManager
import dorkbox.network.rmi.RmiManagerConnections
import dorkbox.network.rmi.RmiManagerGlobal
@ -40,16 +39,26 @@ import dorkbox.network.serialization.KryoReader
import dorkbox.network.serialization.KryoWriter
import dorkbox.network.serialization.Serialization
import dorkbox.network.serialization.SettingsStore
import dorkbox.objectPool.*
import dorkbox.os.OS
import dorkbox.util.NamedThreadFactory
import io.aeron.Publication
import io.aeron.driver.ThreadingMode
import io.aeron.logbuffer.Header
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.onFailure
import kotlinx.coroutines.channels.trySendBlocking
import mu.KLogger
import mu.KotlinLogging
import org.agrona.DirectBuffer
import org.agrona.concurrent.IdleStrategy
import java.util.concurrent.*
// If TCP and UDP both fill the pipe, THERE WILL BE FRAGMENTATION and dropped UDP packets!
// it results in severe UDP packet loss and contention.
//
@ -97,9 +106,9 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
// this is rather silly, BUT if there are more complex errors WITH the coroutine that occur, a regular try/catch WILL NOT catch it.
// ADDITIONALLY, an error handler is ONLY effective at the first, top-level `launch`. IT WILL NOT WORK ANY OTHER WAY.
private val errorHandler = CoroutineExceptionHandler { _, exception ->
logger.error(exception) { "Uncaught Coroutine Error!" }
}
private var messageCoroutineScope: CoroutineScope? = null
private val messageChannel = Channel<Paired<CONNECTION>>(32) // the size is not necessary to be large. Thread handoff should be responsive.
private val pairedPool: Pool<Paired<CONNECTION>>
internal val listenerManager = ListenerManager<CONNECTION>(logger)
@ -172,8 +181,6 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
private val streamingManager = StreamingManager<CONNECTION>(logger, config)
private val pingManager = PingManager<CONNECTION>()
/**
* The primary machine port that the server will listen for connections on
*/
@ -250,7 +257,7 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
}
hook = Thread {
close(
close(
closeEverything = true,
notifyDisconnect = true,
releaseWaitingThreads = true
@ -258,6 +265,17 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
}
Runtime.getRuntime().addShutdownHook(hook)
val poolObject = object : BoundedPoolObject<Paired<CONNECTION>>() {
override fun newInstance(): Paired<CONNECTION> {
return Paired()
}
}
// The purpose of this, is to lessen the impact of garbage created on the heap.
pairedPool = ObjectPool.nonBlockingBounded(poolObject, 256)
}
@ -309,6 +327,45 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
// there are threading issues if there are client(s) and server's within the same JVM, where we have thread starvation
// this resolves the problem. Additionally, this is tied-to specific a specific endpoint instance
networkEventPoller.configure(logger, config, this)
// how to select the number of threads that will fetch/use data off the network stack.
// The default is a minimum of 1, but maximum of 4.
// Account for
// - Aeron Threads (3, usually - defined in config)
// - Leave 2 threads for "the box"
val aeronThreads = when (config.threadingMode) {
ThreadingMode.SHARED -> 1
ThreadingMode.SHARED_NETWORK -> 2
ThreadingMode.DEDICATED -> 3
else -> 3
}
// create a new one when the endpoint starts up, because we close it when the endpoint shuts down or when the client retries
// this leaves 2 for the box and XX for aeron
val messageProcessThreads = (OS.optimumNumberOfThreads - aeronThreads).coerceAtLeast(1).coerceAtMost(4)
// create a new one when the endpoint starts up, because we close it when the endpoint shuts down or when the client retries
val messageCoroutineScope = CoroutineScope(Dispatchers.Default + SupervisorJob())
this.messageCoroutineScope = messageCoroutineScope
repeat(messageProcessThreads) {
messageCoroutineScope.launch {
logger.error { "Starting executor for message processing" }
// this is only true while the endpoint is running.
while (endpointIsRunning.value) {
val paired = messageChannel.receive()
val connection = paired.connection
val message = paired.message
pairedPool.put(paired)
processMessageFromChannel(connection, message)
}
}
}
}
/**
@ -465,15 +522,17 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
// A kryo instance CANNOT be re-used until after it's buffer is flushed to the network!
return try {
// since ANY thread can call 'send', we have to take kryo instances in a safe way
serialization.withKryo {
val buffer = this.write(connection, message)
val kryo = serialization.take()
try {
val buffer = kryo.write(connection, message)
val objectSize = buffer.position()
val internalBuffer = buffer.internalBuffer
// one small problem! What if the message is too big to send all at once?
// The maximum size we can send in a "single fragment" is the maxPayloadLength() function, which is the MTU length less header (with defaults this is 1,376 bytes).
if (objectSize >= maxMessageSize) {
serialization.withKryo {
val kryoStream = serialization.take()
try {
// we must split up the message! It's too large for Aeron to manage.
streamingManager.send(
publication = publication,
@ -481,14 +540,18 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
objectSize = objectSize,
maxMessageSize = maxMessageSize,
endPoint = this@EndPoint,
kryo = this, // this is safe, because we save out the bytes from the original object!
kryo = kryoStream, // this is safe, because we save out the bytes from the original object!
sendIdleStrategy = sendIdleStrategy,
connection = connection
)
} finally {
serialization.put(kryoStream)
}
} else {
aeronDriver.send(publication, internalBuffer, bufferClaim, 0, objectSize, sendIdleStrategy, connection, abortEarly, listenerManager)
aeronDriver.send(publication, internalBuffer, kryo.bufferClaim, 0, objectSize, sendIdleStrategy, connection, abortEarly, listenerManager)
}
} finally {
serialization.put(kryo)
}
} catch (e: Throwable) {
// if the driver is closed due to a network disconnect or a remote-client termination, we also must close the connection.
@ -554,16 +617,55 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
// the remote endPoint will send this message if it is closing the connection.
// IF we get this message in time, then we do not have to wait for the connection to expire before closing it
is DisconnectMessage -> {
// NOTE: This MUST be on a new co-routine (this is...)
logger.debug { "Received disconnect message from $otherTypeName" }
connection.close(sendDisconnectMessage = false,
notifyDisconnect = true)
}
// streaming message. This is used when the published data is too large for a single Aeron message.
// TECHNICALLY, we could arbitrarily increase the size of the permitted Aeron message, however this doesn't let us
// send arbitrarily large pieces of data (gigs in size, potentially).
// This will recursively call into this method for each of the unwrapped blocks of data.
is StreamingControl -> {
// NOTE: this CANNOT be on a separate thread, because we must guarantee that this happens first!
streamingManager.processControlMessage(message, this@EndPoint, connection)
}
is StreamingData -> {
// NOTE: this CANNOT be on a separate thread, because we must guarantee that this happens in-order!
try {
streamingManager.processDataMessage(message, this@EndPoint, connection)
} catch (e: Exception) {
listenerManager.notifyError(connection, StreamingException("Error processing StreamingMessage", e))
}
}
is Any -> {
// NOTE: This MUST be on a new threads (otherwise RMI has issues where it will be blocking)
val paired = pairedPool.take()
paired.connection = connection
paired.message = message
// This will try to send the element (blocking if necessary)
messageChannel.trySendBlocking(paired)
}
else -> {
listenerManager.notifyError(connection, MessageDispatchException("Unknown message received!!"))
}
}
}
/**
* This is also what process the incoming message when it is received from the aeron network.
*
* THIS IS PROCESSED ON MULTIPLE THREADS!
*/
private fun processMessageFromChannel(connection: CONNECTION, message: Any) {
when (message) {
is Ping -> {
// PING will also measure APP latency, not just NETWORK PIPE latency
try {
pingManager.manage(connection, responseManager, message, logger)
connection.receivePing(message)
} catch (e: Exception) {
listenerManager.notifyError(connection, PingException("Error while processing Ping message", e))
}
@ -581,26 +683,7 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
listenerManager.notifyError(connection, RMIException("Error while processing RMI message", e))
}
}
// streaming message. This is used when the published data is too large for a single Aeron message.
// TECHNICALLY, we could arbitrarily increase the size of the permitted Aeron message, however this doesn't let us
// send arbitrarily large pieces of data (gigs in size, potentially).
// This will recursively call into this method for each of the unwrapped blocks of data.
is StreamingControl -> {
streamingManager.processControlMessage(message, readKryo,this@EndPoint, connection)
}
is StreamingData -> {
// NOTE: This MUST NOT be on a new co-routine. It must be on the same thread!
try {
streamingManager.processDataMessage(message, this@EndPoint, connection)
} catch (e: Exception) {
listenerManager.notifyError(connection, StreamingException("Error processing StreamingMessage", e))
}
}
is Any -> {
else -> {
try {
var hasListeners = listenerManager.notifyOnMessage(connection, message)
@ -614,10 +697,6 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
listenerManager.notifyError(connection, MessageDispatchException("Error processing message ${message::class.java.name}", e))
}
}
else -> {
listenerManager.notifyError(connection, MessageDispatchException("Unknown message received!!"))
}
}
}
@ -647,7 +726,10 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
try {
// NOTE: This ABSOLUTELY MUST be done on the same thread! This cannot be done on a new one, because the buffer could change!
val message = readKryo.read(buffer, offset, length, connection)
logger.trace { "[${header.sessionId()}] received: ${message?.javaClass?.simpleName} $message" }
if (logger.isTraceEnabled) {
// don't automatically create the lambda when trace is disabled! Because this uses 'outside' scoped info, it's a new lambda each time!
logger.trace { "[${header.sessionId()}] received: ${message?.javaClass?.simpleName} $message" }
}
processMessage(message, connection, readKryo)
} catch (e: Exception) {
listenerManager.notifyError(connection, newException("Error de-serializing message", e))
@ -771,6 +853,11 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
// if we are restarting the network state, we want to continue to wait for a proper close event.
// when shutting down, it can take up to 5 seconds to fully register as "shutdown"
if (networkEventPoller.isDispatch()) {
// we cannot wait for a connection while inside the network event dispatch, since it has to close itself and this method waits for it!!
throw IllegalStateException("Unable to 'waitForClose()' while inside the network event dispatch, this will deadlock!")
}
val success = if (timeoutMS > 0) {
closeLatch.await(timeoutMS, TimeUnit.MILLISECONDS)
} else {
@ -850,6 +937,7 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
shutdownEventPoller = true
// if we close the poller AND listener manager too quickly, events will not get published
// this waits for the ENDPOINT to finish running its tasks in the poller.
pollerClosedLatch.await()
// this will ONLY close the event dispatcher if ALL endpoints have closed it.
@ -857,8 +945,6 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
networkEventPoller.close(logger, this)
// Connections MUST be closed first, because we want to make sure that no RMI messages can be received
// when we close the RMI support objects (in which case, weird - but harmless - errors show up)
// this will wait for RMI timeouts if there are RMI in-progress. (this happens if we close via an RMI method)
@ -885,6 +971,8 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
shutdownLatch.countDown()
shutdownInProgress.lazySet(false)
messageCoroutineScope?.cancel("Message dispatch has been closed")
if (releaseWaitingThreads) {
logger.trace { "Counting down the close latch..." }
closeLatch.countDown()
@ -895,6 +983,14 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
}
}
/**
* @return true if the current execution thread is in the primary network event dispatch
*/
fun isDispatch(): Boolean {
return networkEventPoller.isDispatch()
}
/**
* Reset the running state when there's an error starting up
*/
@ -904,6 +1000,8 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
endpointIsRunning.lazySet(false)
shutdown = false
shutdownEventPoller = false
messageCoroutineScope?.cancel("Reset the message dispatch")
}
override fun hashCode(): Int {