WIP - removing heap allocations

This commit is contained in:
Robinson 2023-09-03 21:17:37 +02:00
parent ac2cf56fb9
commit e7999d3095
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
5 changed files with 195 additions and 130 deletions

View File

@ -22,6 +22,8 @@ import dorkbox.netUtil.IPv4
import dorkbox.netUtil.IPv6
import dorkbox.netUtil.dnsUtils.ResolvedAddressTypes
import dorkbox.network.aeron.AeronDriver
import dorkbox.network.aeron.EventActionOperator
import dorkbox.network.aeron.EventCloseOperator
import dorkbox.network.aeron.EventPoller
import dorkbox.network.connection.*
import dorkbox.network.connection.IpInfo.Companion.formatCommonAddress
@ -825,66 +827,72 @@ open class Client<CONNECTION : Connection>(
// have to make a new thread to listen for incoming data!
// SUBSCRIPTIONS ARE NOT THREAD SAFE! Only one thread at a time can poll them
networkEventPoller.submit(
action = {
// if we initiate a disconnect manually, then there is no need to wait for aeron to verify it's closed
// we only want to wait for aeron to verify it's closed if we are SUPPOSED to be connected, but there's a network blip
if (!(shutdownEventPoller || newConnection.isClosed() || newConnection.isConnected())) {
newConnection.poll()
} else {
// If the connection has either been closed, or has expired, it needs to be cleaned-up/deleted.
logger.debug { "[${connection}] connection expired (cleanup)" }
action = object : EventActionOperator {
override fun invoke(): Int {
// if we initiate a disconnect manually, then there is no need to wait for aeron to verify it's closed
// we only want to wait for aeron to verify it's closed if we are SUPPOSED to be connected, but there's a network blip
return if (!(shutdownEventPoller || newConnection.isClosed() || newConnection.isConnected())) {
newConnection.poll()
} else {
// If the connection has either been closed, or has expired, it needs to be cleaned-up/deleted.
logger.debug { "[${connection}] connection expired (cleanup)" }
// the connection MUST be removed in the same thread that is processing events (it will be removed again in close, and that is expected)
removeConnection(newConnection)
// the connection MUST be removed in the same thread that is processing events (it will be removed again in close, and that is expected)
removeConnection(newConnection)
// we already removed the connection, we can call it again without side effects
newConnection.close()
// remove ourselves from processing
EventPoller.REMOVE
}
},
onShutdown = {
val mustRestartDriverOnError = aeronDriver.internal.mustRestartDriverOnError
// this can be closed when the connection is remotely closed in ADDITION to manually closing
logger.debug { "Client event dispatch closing..." }
// we only need to run shutdown methods if there was a network outage or D/C
if (!shutdownInProgress.value) {
// this is because we restart automatically on driver errors
val standardClose = !mustRestartDriverOnError
this@Client.close(
closeEverything = false,
notifyDisconnect = standardClose,
releaseWaitingThreads = standardClose
)
}
// we can now call connect again
endpointIsRunning.lazySet(false)
pollerClosedLatch.countDown()
if (mustRestartDriverOnError) {
logger.error { "Critical driver error detected, reconnecting client" }
EventDispatcher.launchSequentially(EventDispatcher.CONNECT) {
waitForEndpointShutdown()
// also wait for everyone else to shutdown!!
aeronDriver.internal.endPointUsages.forEach {
it.waitForEndpointShutdown()
runBlocking {
// we already removed the connection, we can call it again without side effects
newConnection.close()
}
// if we restart/reconnect too fast, errors from the previous run will still be present!
aeronDriver.delayLingerTimeout()
reconnect()
// remove ourselves from processing
EventPoller.REMOVE
}
}
},
onClose = object : EventCloseOperator {
override suspend fun invoke() {
val mustRestartDriverOnError = aeronDriver.internal.mustRestartDriverOnError
// this can be closed when the connection is remotely closed in ADDITION to manually closing
logger.debug { "Client event dispatch closing..." }
// we only need to run shutdown methods if there was a network outage or D/C
if (!shutdownInProgress.value) {
// this is because we restart automatically on driver errors
val standardClose = !mustRestartDriverOnError
this@Client.close(
closeEverything = false,
notifyDisconnect = standardClose,
releaseWaitingThreads = standardClose
)
}
// we can now call connect again
endpointIsRunning.lazySet(false)
pollerClosedLatch.countDown()
if (mustRestartDriverOnError) {
logger.error { "Critical driver error detected, reconnecting client" }
EventDispatcher.launchSequentially(EventDispatcher.CONNECT) {
waitForEndpointShutdown()
// also wait for everyone else to shutdown!!
aeronDriver.internal.endPointUsages.forEach {
it.waitForEndpointShutdown()
}
// if we restart/reconnect too fast, errors from the previous run will still be present!
aeronDriver.delayLingerTimeout()
reconnect()
}
} else {
logger.debug { "Closed the Network Event Poller..." }
}
} else {
logger.debug { "Closed the Network Event Poller..." }
}
})

View File

@ -16,9 +16,9 @@
package dorkbox.network
import dorkbox.hex.toHexString
import dorkbox.network.aeron.AeronDriver
import dorkbox.network.aeron.*
import dorkbox.network.aeron.AeronPoller
import dorkbox.network.aeron.EventPoller
import dorkbox.network.aeron.EventActionOperator
import dorkbox.network.connection.*
import dorkbox.network.connection.IpInfo.Companion.IpListenType
import dorkbox.network.connection.ListenerManager.Companion.cleanStackTrace
@ -271,92 +271,99 @@ open class Server<CONNECTION : Connection>(
logger.info { ipPoller.info }
networkEventPoller.submit(
action = {
if (!shutdownEventPoller) {
// NOTE: regarding fragment limit size. Repeated calls to '.poll' will reassemble a fragment.
// `.poll(handler, 4)` == `.poll(handler, 2)` + `.poll(handler, 2)`
action = object : EventActionOperator {
override fun invoke(): Int {
return if (!shutdownEventPoller) {
// NOTE: regarding fragment limit size. Repeated calls to '.poll' will reassemble a fragment.
// `.poll(handler, 4)` == `.poll(handler, 2)` + `.poll(handler, 2)`
// this checks to see if there are NEW clients to handshake with
var pollCount = ipcPoller.poll() + ipPoller.poll()
// this checks to see if there are NEW clients to handshake with
var pollCount = ipcPoller.poll() + ipPoller.poll()
// this manages existing clients (for cleanup + connection polling). This has a concurrent iterator,
// so we can modify this as we go
connections.forEach { connection ->
if (!(connection.isClosed() || connection.isConnected()) ) {
// Otherwise, poll the connection for messages
pollCount += connection.poll()
} else {
// If the connection has either been closed, or has expired, it needs to be cleaned-up/deleted.
logger.debug { "[${connection}] connection expired (cleanup)" }
// this manages existing clients (for cleanup + connection polling). This has a concurrent iterator,
// so we can modify this as we go
connections.forEach { connection ->
if (!(connection.isClosed() || connection.isConnected()) ) {
// Otherwise, poll the connection for messages
pollCount += connection.poll()
} else {
// If the connection has either been closed, or has expired, it needs to be cleaned-up/deleted.
logger.debug { "[${connection}] connection expired (cleanup)" }
// the connection MUST be removed in the same thread that is processing events (it will be removed again in close, and that is expected)
removeConnection(connection)
// the connection MUST be removed in the same thread that is processing events (it will be removed again in close, and that is expected)
removeConnection(connection)
// we already removed the connection, we can call it again without side effects
connection.close()
runBlocking {
// we already removed the connection, we can call it again without side effects
connection.close()
}
}
}
}
pollCount
} else {
// remove ourselves from processing
EventPoller.REMOVE
pollCount
} else {
// remove ourselves from processing
EventPoller.REMOVE
}
}
},
onShutdown = {
val mustRestartDriverOnError = aeronDriver.internal.mustRestartDriverOnError
logger.debug { "Server event dispatch closing..." }
onClose = object : EventCloseOperator {
override suspend fun invoke() {
val mustRestartDriverOnError = aeronDriver.internal.mustRestartDriverOnError
logger.debug { "Server event dispatch closing..." }
ipcPoller.close()
ipPoller.close()
ipcPoller.close()
ipPoller.close()
// clear all the handshake info
handshake.clear()
// clear all the handshake info
handshake.clear()
// we only need to run shutdown methods if there was a network outage or D/C
if (!shutdownInProgress.value) {
// this is because we restart automatically on driver errors
val standardClose = !mustRestartDriverOnError
this@Server.close(
closeEverything = false,
notifyDisconnect = standardClose,
releaseWaitingThreads = standardClose
)
}
// we only need to run shutdown methods if there was a network outage or D/C
if (!shutdownInProgress.value) {
// this is because we restart automatically on driver errors
val standardClose = !mustRestartDriverOnError
this@Server.close(
closeEverything = false,
notifyDisconnect = standardClose,
releaseWaitingThreads = standardClose
)
}
if (mustRestartDriverOnError) {
logger.error { "Critical driver error detected, restarting server." }
if (mustRestartDriverOnError) {
logger.error { "Critical driver error detected, restarting server." }
EventDispatcher.launchSequentially(EventDispatcher.CONNECT) {
waitForEndpointShutdown()
EventDispatcher.launchSequentially(EventDispatcher.CONNECT) {
waitForEndpointShutdown()
// also wait for everyone else to shutdown!!
aeronDriver.internal.endPointUsages.forEach {
it.waitForEndpointShutdown()
}
// also wait for everyone else to shutdown!!
aeronDriver.internal.endPointUsages.forEach {
it.waitForEndpointShutdown()
}
// if we restart/reconnect too fast, errors from the previous run will still be present!
aeronDriver.delayLingerTimeout()
// if we restart/reconnect too fast, errors from the previous run will still be present!
aeronDriver.delayLingerTimeout()
val p1 = this@Server.port1
val p2 = this@Server.port2
val p1 = this@Server.port1
val p2 = this@Server.port2
if (p1 == 0 && p2 == 0) {
bindIpc()
} else {
bind(p1, p2)
if (p1 == 0 && p2 == 0) {
bindIpc()
} else {
bind(p1, p2)
}
}
}
// we can now call bind again
endpointIsRunning.lazySet(false)
pollerClosedLatch.countDown()
logger.debug { "Closed the Network Event Poller..." }
}
// we can now call bind again
endpointIsRunning.lazySet(false)
pollerClosedLatch.countDown()
logger.debug { "Closed the Network Event Poller..." }
})
}

View File

@ -0,0 +1,21 @@
/*
* Copyright 2023 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.aeron
internal interface EventActionOperator {
operator fun invoke(): Int
}

View File

@ -0,0 +1,21 @@
/*
* Copyright 2023 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.aeron
internal interface EventCloseOperator {
suspend operator fun invoke()
}

View File

@ -37,11 +37,14 @@ import java.util.concurrent.*
* this among ALL clients within the same JVM so that we can support multiple clients/servers
*/
internal class EventPoller {
private class EventAction(val onAction: EventActionOperator, val onClose: EventCloseOperator)
companion object {
internal const val REMOVE = -1
val eventLogger = KotlinLogging.logger(EventPoller::class.java.simpleName)
private class EventAction(val onAction: suspend EventPoller.()->Int, val onClose: suspend ()->Unit)
private val pollDispatcher = Executors.newSingleThreadExecutor(
NamedThreadFactory("Poll Dispatcher", Configuration.networkThreadGroup, true)
@ -107,7 +110,7 @@ internal class EventPoller {
try {
// check to see if we should remove this event (when a client/server closes, it is removed)
// once ALL endpoint are closed, this is shutdown.
val poll = it.onAction(this@EventPoller)
val poll = it.onAction()
// <0 means we remove the event from processing
// 0 means we idle
@ -163,21 +166,26 @@ internal class EventPoller {
/**
* Will cause the executing thread to wait until the event has been started
*/
suspend fun submit(action: suspend EventPoller.() -> Int, onShutdown: suspend () -> Unit) = mutex.withLock {
suspend fun submit(action: EventActionOperator, onClose: EventCloseOperator) = mutex.withLock {
submitEvents.getAndIncrement()
// this forces the current thread to WAIT until the network poll system has started
val pollStartupLatch = CountDownLatch(1)
pollEvents.add(EventAction(action, onShutdown))
pollEvents.add(EventAction(action, onClose))
pollEvents.add(EventAction(
{
pollStartupLatch.countDown()
object : EventActionOperator {
override fun invoke(): Int {
pollStartupLatch.countDown()
// remove ourselves
REMOVE
},
{}
// remove ourselves
return REMOVE
}
}
, object : EventCloseOperator {
override suspend fun invoke() {
}
}
))
pollStartupLatch.await()