Added extra checks to see if a network family is available before using it
This commit is contained in:
parent
b953f9bf85
commit
696c8f65cf
|
@ -96,6 +96,12 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
|
||||||
*/
|
*/
|
||||||
private val connectionRules = CopyOnWriteArrayList<ConnectionRule>()
|
private val connectionRules = CopyOnWriteArrayList<ConnectionRule>()
|
||||||
|
|
||||||
|
/**
|
||||||
|
* true if the following network stacks are available for use
|
||||||
|
*/
|
||||||
|
private val canUseIPv4 = config.enableIPv4 && IPv4.isAvailable
|
||||||
|
private val canUseIPv6 = config.enableIPv6 && IPv6.isAvailable
|
||||||
|
|
||||||
internal val listenIPv4Address: InetAddress?
|
internal val listenIPv4Address: InetAddress?
|
||||||
internal val listenIPv6Address: InetAddress?
|
internal val listenIPv6Address: InetAddress?
|
||||||
|
|
||||||
|
@ -105,11 +111,25 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
|
||||||
|
|
||||||
require(config.listenIpAddress.isNotBlank()) { "Blank listen IP address, cannot continue"}
|
require(config.listenIpAddress.isNotBlank()) { "Blank listen IP address, cannot continue"}
|
||||||
|
|
||||||
|
// can't disable everything!
|
||||||
|
if (!config.enableIPC && !config.enableIPv4 && !config.enableIPv6) {
|
||||||
|
require(false) { "At least one of IPC/IPv4/IPv6 must be enabled!" }
|
||||||
|
}
|
||||||
|
|
||||||
|
// have to verify if it's the only thing specified, is IPv4 available...
|
||||||
|
if (!config.enableIPC && !config.enableIPv6 && config.enableIPv4 && !IPv4.isAvailable) {
|
||||||
|
require(false) { "IPC/IPv6 are disabled and IPv4 is enabled, but there is no IPv4 interface available!" }
|
||||||
|
}
|
||||||
|
|
||||||
|
// have to verify if it's the only thing specified, is IPv4 available...
|
||||||
|
if (!config.enableIPC && !config.enableIPv4 && config.enableIPv6 && !IPv6.isAvailable) {
|
||||||
|
require(false) { "IPC/IPv4 are disabled and IPv6 is enabled, but there is no IPv6 interface available!" }
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
// localhost/loopback IP might not always be 127.0.0.1 or ::1
|
// localhost/loopback IP might not always be 127.0.0.1 or ::1
|
||||||
// We want to listen on BOTH IPv4 and IPv6 (config option lets us configure this)
|
// We want to listen on BOTH IPv4 and IPv6 (config option lets us configure this)
|
||||||
listenIPv4Address = if (!config.enableIPv4) {
|
listenIPv4Address = if (canUseIPv4) {
|
||||||
null
|
|
||||||
} else {
|
|
||||||
when (config.listenIpAddress) {
|
when (config.listenIpAddress) {
|
||||||
"loopback", "localhost", "lo" -> IPv4.LOCALHOST
|
"loopback", "localhost", "lo" -> IPv4.LOCALHOST
|
||||||
"0", "::", "0.0.0.0", "*" -> {
|
"0", "::", "0.0.0.0", "*" -> {
|
||||||
|
@ -119,10 +139,11 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
|
||||||
else -> Inet4Address.getAllByName(config.listenIpAddress)[0]
|
else -> Inet4Address.getAllByName(config.listenIpAddress)[0]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
else {
|
||||||
listenIPv6Address = if (!config.enableIPv6) {
|
|
||||||
null
|
null
|
||||||
} else {
|
}
|
||||||
|
|
||||||
|
listenIPv6Address = if (canUseIPv6) {
|
||||||
when (config.listenIpAddress) {
|
when (config.listenIpAddress) {
|
||||||
"loopback", "localhost", "lo" -> IPv6.LOCALHOST
|
"loopback", "localhost", "lo" -> IPv6.LOCALHOST
|
||||||
"0", "::", "0.0.0.0", "*" -> {
|
"0", "::", "0.0.0.0", "*" -> {
|
||||||
|
@ -132,6 +153,9 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
|
||||||
else -> Inet6Address.getAllByName(config.listenIpAddress)[0]
|
else -> Inet6Address.getAllByName(config.listenIpAddress)[0]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
else {
|
||||||
|
null
|
||||||
|
}
|
||||||
|
|
||||||
if (config.publicationPort <= 0) { throw ServerException("configuration port must be > 0") }
|
if (config.publicationPort <= 0) { throw ServerException("configuration port must be > 0") }
|
||||||
if (config.publicationPort >= 65535) { throw ServerException("configuration port must be < 65535") }
|
if (config.publicationPort >= 65535) { throw ServerException("configuration port must be < 65535") }
|
||||||
|
@ -195,7 +219,7 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun getIpv4Poller(aeron: Aeron, config: ServerConfiguration): AeronPoller {
|
private fun getIpv4Poller(aeron: Aeron, config: ServerConfiguration): AeronPoller {
|
||||||
val poller = if (config.enableIPv4) {
|
val poller = if (canUseIPv4) {
|
||||||
val driver = UdpMediaDriverConnection(address = listenIPv4Address!!,
|
val driver = UdpMediaDriverConnection(address = listenIPv4Address!!,
|
||||||
publicationPort = config.publicationPort,
|
publicationPort = config.publicationPort,
|
||||||
subscriptionPort = config.subscriptionPort,
|
subscriptionPort = config.subscriptionPort,
|
||||||
|
@ -263,7 +287,7 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun getIpv6Poller(aeron: Aeron, config: ServerConfiguration): AeronPoller {
|
private fun getIpv6Poller(aeron: Aeron, config: ServerConfiguration): AeronPoller {
|
||||||
val poller = if (config.enableIPv6) {
|
val poller = if (canUseIPv6) {
|
||||||
val driver = UdpMediaDriverConnection(address = listenIPv6Address!!,
|
val driver = UdpMediaDriverConnection(address = listenIPv6Address!!,
|
||||||
publicationPort = config.publicationPort,
|
publicationPort = config.publicationPort,
|
||||||
subscriptionPort = config.subscriptionPort,
|
subscriptionPort = config.subscriptionPort,
|
||||||
|
@ -331,68 +355,60 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun getIpv6WildcardPoller(aeron: Aeron, config: ServerConfiguration): AeronPoller {
|
private fun getIpv6WildcardPoller(aeron: Aeron, config: ServerConfiguration): AeronPoller {
|
||||||
val poller = if (config.enableIPv6) {
|
val driver = UdpMediaDriverConnection(address = listenIPv6Address!!,
|
||||||
val driver = UdpMediaDriverConnection(address = listenIPv6Address!!,
|
publicationPort = config.publicationPort,
|
||||||
publicationPort = config.publicationPort,
|
subscriptionPort = config.subscriptionPort,
|
||||||
subscriptionPort = config.subscriptionPort,
|
streamId = UDP_HANDSHAKE_STREAM_ID,
|
||||||
streamId = UDP_HANDSHAKE_STREAM_ID,
|
sessionId = RESERVED_SESSION_ID_INVALID)
|
||||||
sessionId = RESERVED_SESSION_ID_INVALID)
|
|
||||||
|
|
||||||
driver.buildServer(aeron, logger)
|
driver.buildServer(aeron, logger)
|
||||||
val publication = driver.publication
|
val publication = driver.publication
|
||||||
val subscription = driver.subscription
|
val subscription = driver.subscription
|
||||||
|
|
||||||
object : AeronPoller {
|
val poller = object : AeronPoller {
|
||||||
/**
|
/**
|
||||||
* Note:
|
* Note:
|
||||||
* Reassembly has been shown to be minimal impact to latency. But not totally negligible. If the lowest latency is
|
* Reassembly has been shown to be minimal impact to latency. But not totally negligible. If the lowest latency is
|
||||||
* desired, then limiting message sizes to MTU size is a good practice.
|
* desired, then limiting message sizes to MTU size is a good practice.
|
||||||
*
|
*
|
||||||
* There is a maximum length allowed for messages which is the min of 1/8th a term length or 16MB.
|
* There is a maximum length allowed for messages which is the min of 1/8th a term length or 16MB.
|
||||||
* Messages larger than this should chunked using an application level chunking protocol. Chunking has better recovery
|
* Messages larger than this should chunked using an application level chunking protocol. Chunking has better recovery
|
||||||
* properties from failure and streams with mechanical sympathy.
|
* properties from failure and streams with mechanical sympathy.
|
||||||
*/
|
*/
|
||||||
val handler = FragmentAssembler { buffer: DirectBuffer, offset: Int, length: Int, header: Header ->
|
val handler = FragmentAssembler { buffer: DirectBuffer, offset: Int, length: Int, header: Header ->
|
||||||
// this is processed on the thread that calls "poll". Subscriptions are NOT multi-thread safe!
|
// this is processed on the thread that calls "poll". Subscriptions are NOT multi-thread safe!
|
||||||
|
|
||||||
// The sessionId is unique within a Subscription and unique across all Publication's from a sourceIdentity.
|
// The sessionId is unique within a Subscription and unique across all Publication's from a sourceIdentity.
|
||||||
// for the handshake, the sessionId IS NOT GLOBALLY UNIQUE
|
// for the handshake, the sessionId IS NOT GLOBALLY UNIQUE
|
||||||
val sessionId = header.sessionId()
|
val sessionId = header.sessionId()
|
||||||
|
|
||||||
// note: this address will ALWAYS be an IP:PORT combo OR it will be aeron:ipc (if IPC, it will be a different handler!)
|
// note: this address will ALWAYS be an IP:PORT combo OR it will be aeron:ipc (if IPC, it will be a different handler!)
|
||||||
val remoteIpAndPort = (header.context() as Image).sourceIdentity()
|
val remoteIpAndPort = (header.context() as Image).sourceIdentity()
|
||||||
|
|
||||||
// split
|
// split
|
||||||
val splitPoint = remoteIpAndPort.lastIndexOf(':')
|
val splitPoint = remoteIpAndPort.lastIndexOf(':')
|
||||||
val clientAddressString = remoteIpAndPort.substring(0, splitPoint)
|
val clientAddressString = remoteIpAndPort.substring(0, splitPoint)
|
||||||
// val port = remoteIpAndPort.substring(splitPoint+1)
|
// val port = remoteIpAndPort.substring(splitPoint+1)
|
||||||
|
|
||||||
// this should never be null, because we are feeding it a valid IP address from aeron
|
// this should never be null, because we are feeding it a valid IP address from aeron
|
||||||
// maybe IPv4, maybe IPv6! This is slower than if we ALREADY know what it is.
|
// maybe IPv4, maybe IPv6! This is slower than if we ALREADY know what it is.
|
||||||
val clientAddress = IP.getByName(clientAddressString)!!
|
val clientAddress = IP.getByName(clientAddressString)!!
|
||||||
|
|
||||||
|
|
||||||
val message = readHandshakeMessage(buffer, offset, length, header)
|
val message = readHandshakeMessage(buffer, offset, length, header)
|
||||||
handshake.processUdpHandshakeMessageServer(this@Server,
|
handshake.processUdpHandshakeMessageServer(this@Server,
|
||||||
publication,
|
publication,
|
||||||
sessionId,
|
sessionId,
|
||||||
clientAddressString,
|
clientAddressString,
|
||||||
clientAddress,
|
clientAddress,
|
||||||
message,
|
message,
|
||||||
aeron,
|
aeron,
|
||||||
true)
|
true)
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun poll(): Int { return subscription.poll(handler, 1) }
|
override fun poll(): Int { return subscription.poll(handler, 1) }
|
||||||
override fun close() { driver.close() }
|
override fun close() { driver.close() }
|
||||||
override fun serverInfo(): String { return driver.serverInfo() }
|
override fun serverInfo(): String { return driver.serverInfo() }
|
||||||
}
|
|
||||||
} else {
|
|
||||||
object : AeronPoller {
|
|
||||||
override fun poll(): Int { return 0 }
|
|
||||||
override fun close() {}
|
|
||||||
override fun serverInfo(): String { return "IPv6 Disabled" }
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.info(poller.serverInfo())
|
logger.info(poller.serverInfo())
|
||||||
|
@ -425,7 +441,7 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
|
||||||
|
|
||||||
if (isWildcard) {
|
if (isWildcard) {
|
||||||
// IPv6 will bind to IPv4 wildcard as well!!
|
// IPv6 will bind to IPv4 wildcard as well!!
|
||||||
if (config.enableIPv4 && config.enableIPv6) {
|
if (canUseIPv4 && canUseIPv6) {
|
||||||
ipv4Poller = object : AeronPoller {
|
ipv4Poller = object : AeronPoller {
|
||||||
override fun poll(): Int { return 0 }
|
override fun poll(): Int { return 0 }
|
||||||
override fun close() {}
|
override fun close() {}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user