Code cleanup
This commit is contained in:
parent
2a485bd097
commit
9b1650ae31
|
@ -519,8 +519,8 @@ open class Client<CONNECTION : Connection>(
|
|||
startDriver()
|
||||
initializeState()
|
||||
} catch (e: Exception) {
|
||||
resetOnError()
|
||||
listenerManager.notifyError(ClientException("Unable to start the client!", e))
|
||||
resetOnError()
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
@ -190,7 +190,7 @@ open class Server<CONNECTION : Connection>(
|
|||
if (config.enableIPv4) { logger.warn { "IPv4 is enabled, but only IPC will be used." }}
|
||||
if (config.enableIPv6) { logger.warn { "IPv6 is enabled, but only IPC will be used." }}
|
||||
|
||||
bind(0,0)
|
||||
bind(0, 0, true)
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -210,7 +210,11 @@ open class Server<CONNECTION : Connection>(
|
|||
require(port2 < 65535) { "port2 must be < 65535" }
|
||||
}
|
||||
|
||||
bind(port1, port2, false)
|
||||
}
|
||||
|
||||
@Suppress("DuplicatedCode")
|
||||
private fun bind(port1: Int, port2: Int, onlyBindIpc: Boolean) {
|
||||
// the lifecycle of a server is the ENDPOINT (measured via the network event poller)
|
||||
if (endpointIsRunning.value) {
|
||||
listenerManager.notifyError(ServerException("Unable to start, the server is already running!"))
|
||||
|
@ -235,9 +239,6 @@ open class Server<CONNECTION : Connection>(
|
|||
this@Server.port1 = port1
|
||||
this@Server.port2 = port2
|
||||
|
||||
bind0()
|
||||
}
|
||||
private fun bind0() {
|
||||
config as ServerConfiguration
|
||||
|
||||
// we are done with initial configuration, now initialize aeron and the general state of this endpoint
|
||||
|
@ -245,22 +246,28 @@ open class Server<CONNECTION : Connection>(
|
|||
val server = this@Server
|
||||
handshake = ServerHandshake(config, listenerManager, aeronDriver)
|
||||
|
||||
val ipcPoller: AeronPoller = if (config.enableIpc) {
|
||||
val ipcPoller: AeronPoller = if (config.enableIpc || onlyBindIpc) {
|
||||
ServerHandshakePollers.ipc(server, handshake)
|
||||
} else {
|
||||
ServerHandshakePollers.disabled("IPC Disabled")
|
||||
}
|
||||
|
||||
val ipPoller = when (ipInfo.ipType) {
|
||||
// IPv6 will bind to IPv4 wildcard as well, so don't bind both!
|
||||
IpListenType.IPWildcard -> ServerHandshakePollers.ip6Wildcard(server, handshake)
|
||||
IpListenType.IPv4Wildcard -> ServerHandshakePollers.ip4(server, handshake)
|
||||
IpListenType.IPv6Wildcard -> ServerHandshakePollers.ip6(server, handshake)
|
||||
IpListenType.IPv4 -> ServerHandshakePollers.ip4(server, handshake)
|
||||
IpListenType.IPv6 -> ServerHandshakePollers.ip6(server, handshake)
|
||||
IpListenType.IPC -> ServerHandshakePollers.disabled("IPv4/6 Disabled")
|
||||
|
||||
val ipPoller = if (onlyBindIpc) {
|
||||
ServerHandshakePollers.disabled("IPv4/6 Disabled")
|
||||
} else {
|
||||
when (ipInfo.ipType) {
|
||||
// IPv6 will bind to IPv4 wildcard as well, so don't bind both!
|
||||
IpListenType.IPWildcard -> ServerHandshakePollers.ip6Wildcard(server, handshake)
|
||||
IpListenType.IPv4Wildcard -> ServerHandshakePollers.ip4(server, handshake)
|
||||
IpListenType.IPv6Wildcard -> ServerHandshakePollers.ip6(server, handshake)
|
||||
IpListenType.IPv4 -> ServerHandshakePollers.ip4(server, handshake)
|
||||
IpListenType.IPv6 -> ServerHandshakePollers.ip6(server, handshake)
|
||||
IpListenType.IPC -> ServerHandshakePollers.disabled("IPv4/6 Disabled")
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
logger.info { ipcPoller.info }
|
||||
logger.info { ipPoller.info }
|
||||
|
||||
|
@ -450,9 +457,6 @@ open class Server<CONNECTION : Connection>(
|
|||
return string0
|
||||
}
|
||||
|
||||
/**
|
||||
* Enable
|
||||
*/
|
||||
fun <R> use(block: (Server<CONNECTION>) -> R): R {
|
||||
return try {
|
||||
block(this)
|
||||
|
|
|
@ -18,10 +18,8 @@ package dorkbox.network.connection
|
|||
import dorkbox.classUtil.ClassHelper
|
||||
import dorkbox.classUtil.ClassHierarchy
|
||||
import dorkbox.collections.IdentityMap
|
||||
import dorkbox.network.Configuration
|
||||
import dorkbox.network.ipFilter.IpFilterRule
|
||||
import dorkbox.os.OS
|
||||
import dorkbox.util.NamedThreadFactory
|
||||
import mu.KLogger
|
||||
import net.jodah.typetools.TypeResolver
|
||||
import java.util.concurrent.*
|
||||
|
@ -38,11 +36,6 @@ internal class ListenerManager<CONNECTION: Connection>(private val logger: KLogg
|
|||
*/
|
||||
val LOAD_FACTOR = OS.getFloat(ListenerManager::class.qualifiedName + "LOAD_FACTOR", 0.8f)
|
||||
|
||||
internal val executor = Executors.newSingleThreadExecutor(
|
||||
NamedThreadFactory("Error Dispatcher", Configuration.networkThreadGroup, Thread.NORM_PRIORITY, true)
|
||||
)
|
||||
|
||||
|
||||
/**
|
||||
* Remove from the stacktrace kotlin coroutine info + dorkbox network call stack. This is NOT used by RMI
|
||||
*
|
||||
|
@ -328,7 +321,7 @@ internal class ListenerManager<CONNECTION: Connection>(private val logger: KLogg
|
|||
val func = function as (CONNECTION, Any) -> Unit
|
||||
|
||||
val newMessageArray: Array<(CONNECTION, Any) -> Unit>
|
||||
val onMessageArray: Array<(CONNECTION, Any) -> Unit>? = tempMap.get(messageClass)
|
||||
val onMessageArray: Array<(CONNECTION, Any) -> Unit>? = tempMap[messageClass]
|
||||
|
||||
if (onMessageArray != null) {
|
||||
newMessageArray = add(function, onMessageArray)
|
||||
|
@ -466,7 +459,7 @@ internal class ListenerManager<CONNECTION: Connection>(private val logger: KLogg
|
|||
fun notifyError(connection: CONNECTION, exception: Throwable) {
|
||||
val list = onErrorList
|
||||
if (list.isNotEmpty()) {
|
||||
executor.submit {
|
||||
EventDispatcher.ERROR.launch {
|
||||
list.forEach {
|
||||
try {
|
||||
it(connection, exception)
|
||||
|
@ -490,7 +483,7 @@ internal class ListenerManager<CONNECTION: Connection>(private val logger: KLogg
|
|||
fun notifyError(exception: Throwable) {
|
||||
val list = onErrorGlobalList
|
||||
if (list.isNotEmpty()) {
|
||||
executor.submit {
|
||||
EventDispatcher.ERROR.launch {
|
||||
list.forEach {
|
||||
try {
|
||||
it(exception)
|
||||
|
|
Loading…
Reference in New Issue