diff --git a/src/dorkbox/network/Server.kt b/src/dorkbox/network/Server.kt index 98641f16..83c72162 100644 --- a/src/dorkbox/network/Server.kt +++ b/src/dorkbox/network/Server.kt @@ -96,6 +96,12 @@ open class Server(config: ServerConfiguration = ServerC */ private val connectionRules = CopyOnWriteArrayList() + /** + * true if the following network stacks are available for use + */ + private val canUseIPv4 = config.enableIPv4 && IPv4.isAvailable + private val canUseIPv6 = config.enableIPv6 && IPv6.isAvailable + internal val listenIPv4Address: InetAddress? internal val listenIPv6Address: InetAddress? @@ -105,11 +111,25 @@ open class Server(config: ServerConfiguration = ServerC require(config.listenIpAddress.isNotBlank()) { "Blank listen IP address, cannot continue"} + // can't disable everything! + if (!config.enableIPC && !config.enableIPv4 && !config.enableIPv6) { + require(false) { "At least one of IPC/IPv4/IPv6 must be enabled!" } + } + + // have to verify if it's the only thing specified, is IPv4 available... + if (!config.enableIPC && !config.enableIPv6 && config.enableIPv4 && !IPv4.isAvailable) { + require(false) { "IPC/IPv6 are disabled and IPv4 is enabled, but there is no IPv4 interface available!" } + } + + // have to verify if it's the only thing specified, is IPv4 available... + if (!config.enableIPC && !config.enableIPv4 && config.enableIPv6 && !IPv6.isAvailable) { + require(false) { "IPC/IPv4 are disabled and IPv6 is enabled, but there is no IPv6 interface available!" } + } + + // localhost/loopback IP might not always be 127.0.0.1 or ::1 // We want to listen on BOTH IPv4 and IPv6 (config option lets us configure this) - listenIPv4Address = if (!config.enableIPv4) { - null - } else { + listenIPv4Address = if (canUseIPv4) { when (config.listenIpAddress) { "loopback", "localhost", "lo" -> IPv4.LOCALHOST "0", "::", "0.0.0.0", "*" -> { @@ -119,10 +139,11 @@ open class Server(config: ServerConfiguration = ServerC else -> Inet4Address.getAllByName(config.listenIpAddress)[0] } } - - listenIPv6Address = if (!config.enableIPv6) { + else { null - } else { + } + + listenIPv6Address = if (canUseIPv6) { when (config.listenIpAddress) { "loopback", "localhost", "lo" -> IPv6.LOCALHOST "0", "::", "0.0.0.0", "*" -> { @@ -132,6 +153,9 @@ open class Server(config: ServerConfiguration = ServerC else -> Inet6Address.getAllByName(config.listenIpAddress)[0] } } + else { + null + } if (config.publicationPort <= 0) { throw ServerException("configuration port must be > 0") } if (config.publicationPort >= 65535) { throw ServerException("configuration port must be < 65535") } @@ -195,7 +219,7 @@ open class Server(config: ServerConfiguration = ServerC } private fun getIpv4Poller(aeron: Aeron, config: ServerConfiguration): AeronPoller { - val poller = if (config.enableIPv4) { + val poller = if (canUseIPv4) { val driver = UdpMediaDriverConnection(address = listenIPv4Address!!, publicationPort = config.publicationPort, subscriptionPort = config.subscriptionPort, @@ -263,7 +287,7 @@ open class Server(config: ServerConfiguration = ServerC } private fun getIpv6Poller(aeron: Aeron, config: ServerConfiguration): AeronPoller { - val poller = if (config.enableIPv6) { + val poller = if (canUseIPv6) { val driver = UdpMediaDriverConnection(address = listenIPv6Address!!, publicationPort = config.publicationPort, subscriptionPort = config.subscriptionPort, @@ -331,68 +355,60 @@ open class Server(config: ServerConfiguration = ServerC } private fun getIpv6WildcardPoller(aeron: Aeron, config: ServerConfiguration): AeronPoller { - val poller = if (config.enableIPv6) { - val driver = UdpMediaDriverConnection(address = listenIPv6Address!!, - publicationPort = config.publicationPort, - subscriptionPort = config.subscriptionPort, - streamId = UDP_HANDSHAKE_STREAM_ID, - sessionId = RESERVED_SESSION_ID_INVALID) + val driver = UdpMediaDriverConnection(address = listenIPv6Address!!, + publicationPort = config.publicationPort, + subscriptionPort = config.subscriptionPort, + streamId = UDP_HANDSHAKE_STREAM_ID, + sessionId = RESERVED_SESSION_ID_INVALID) - driver.buildServer(aeron, logger) - val publication = driver.publication - val subscription = driver.subscription + driver.buildServer(aeron, logger) + val publication = driver.publication + val subscription = driver.subscription - object : AeronPoller { - /** - * Note: - * Reassembly has been shown to be minimal impact to latency. But not totally negligible. If the lowest latency is - * desired, then limiting message sizes to MTU size is a good practice. - * - * There is a maximum length allowed for messages which is the min of 1/8th a term length or 16MB. - * Messages larger than this should chunked using an application level chunking protocol. Chunking has better recovery - * properties from failure and streams with mechanical sympathy. - */ - val handler = FragmentAssembler { buffer: DirectBuffer, offset: Int, length: Int, header: Header -> - // this is processed on the thread that calls "poll". Subscriptions are NOT multi-thread safe! + val poller = object : AeronPoller { + /** + * Note: + * Reassembly has been shown to be minimal impact to latency. But not totally negligible. If the lowest latency is + * desired, then limiting message sizes to MTU size is a good practice. + * + * There is a maximum length allowed for messages which is the min of 1/8th a term length or 16MB. + * Messages larger than this should chunked using an application level chunking protocol. Chunking has better recovery + * properties from failure and streams with mechanical sympathy. + */ + val handler = FragmentAssembler { buffer: DirectBuffer, offset: Int, length: Int, header: Header -> + // this is processed on the thread that calls "poll". Subscriptions are NOT multi-thread safe! - // The sessionId is unique within a Subscription and unique across all Publication's from a sourceIdentity. - // for the handshake, the sessionId IS NOT GLOBALLY UNIQUE - val sessionId = header.sessionId() + // The sessionId is unique within a Subscription and unique across all Publication's from a sourceIdentity. + // for the handshake, the sessionId IS NOT GLOBALLY UNIQUE + val sessionId = header.sessionId() - // note: this address will ALWAYS be an IP:PORT combo OR it will be aeron:ipc (if IPC, it will be a different handler!) - val remoteIpAndPort = (header.context() as Image).sourceIdentity() + // note: this address will ALWAYS be an IP:PORT combo OR it will be aeron:ipc (if IPC, it will be a different handler!) + val remoteIpAndPort = (header.context() as Image).sourceIdentity() - // split - val splitPoint = remoteIpAndPort.lastIndexOf(':') - val clientAddressString = remoteIpAndPort.substring(0, splitPoint) - // val port = remoteIpAndPort.substring(splitPoint+1) + // split + val splitPoint = remoteIpAndPort.lastIndexOf(':') + val clientAddressString = remoteIpAndPort.substring(0, splitPoint) + // val port = remoteIpAndPort.substring(splitPoint+1) - // this should never be null, because we are feeding it a valid IP address from aeron - // maybe IPv4, maybe IPv6! This is slower than if we ALREADY know what it is. - val clientAddress = IP.getByName(clientAddressString)!! + // this should never be null, because we are feeding it a valid IP address from aeron + // maybe IPv4, maybe IPv6! This is slower than if we ALREADY know what it is. + val clientAddress = IP.getByName(clientAddressString)!! - val message = readHandshakeMessage(buffer, offset, length, header) - handshake.processUdpHandshakeMessageServer(this@Server, - publication, - sessionId, - clientAddressString, - clientAddress, - message, - aeron, - true) - } + val message = readHandshakeMessage(buffer, offset, length, header) + handshake.processUdpHandshakeMessageServer(this@Server, + publication, + sessionId, + clientAddressString, + clientAddress, + message, + aeron, + true) + } - override fun poll(): Int { return subscription.poll(handler, 1) } - override fun close() { driver.close() } - override fun serverInfo(): String { return driver.serverInfo() } - } - } else { - object : AeronPoller { - override fun poll(): Int { return 0 } - override fun close() {} - override fun serverInfo(): String { return "IPv6 Disabled" } - } + override fun poll(): Int { return subscription.poll(handler, 1) } + override fun close() { driver.close() } + override fun serverInfo(): String { return driver.serverInfo() } } logger.info(poller.serverInfo()) @@ -425,7 +441,7 @@ open class Server(config: ServerConfiguration = ServerC if (isWildcard) { // IPv6 will bind to IPv4 wildcard as well!! - if (config.enableIPv4 && config.enableIPv6) { + if (canUseIPv4 && canUseIPv6) { ipv4Poller = object : AeronPoller { override fun poll(): Int { return 0 } override fun close() {}