Updated networkutils, code polish

This commit is contained in:
nathan 2020-09-19 22:06:54 +02:00
parent 2429810fd7
commit 5017437a7a
2 changed files with 43 additions and 30 deletions

View File

@ -35,6 +35,7 @@ import dorkbox.network.rmi.RemoteObject
import dorkbox.network.rmi.RemoteObjectStorage
import dorkbox.network.rmi.RmiManagerConnections
import dorkbox.network.rmi.TimeoutException
import dorkbox.util.Sys
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.launch
@ -78,10 +79,6 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
private val lockStepForReconnect = atomic<SuspendWaiter?>(null)
init {
config.validate()
}
override fun newException(message: String, cause: Throwable?): Throwable {
return ClientException(message, cause)
}
@ -390,27 +387,39 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
// because we are getting the class registration details from the SERVER, this should never be the case.
// It is still and edge case where the reconstruction of the registration details fails (maybe because of custom serializers)
val exception = ClientRejectedException("Connection to ${IP.toString(remoteAddress!!)} has incorrect class registration details!!")
val exception = if (usingIPC) {
ClientRejectedException("Connection to IPC has incorrect class registration details!!")
} else {
ClientRejectedException("Connection to ${IP.toString(remoteAddress!!)} has incorrect class registration details!!")
}
listenerManager.notifyError(exception)
throw exception
}
val newConnection = if (usingIPC) {
newConnection(ConnectionParams(this, reliableClientConnection, PublicKeyValidationState.VALID))
val newConnection: CONNECTION
if (usingIPC) {
newConnection = newConnection(ConnectionParams(this, reliableClientConnection, PublicKeyValidationState.VALID))
} else {
newConnection(ConnectionParams(this, reliableClientConnection, validateRemoteAddress))
newConnection = newConnection(ConnectionParams(this, reliableClientConnection, validateRemoteAddress))
remoteAddress!!
// VALIDATE are we allowed to connect to this server (now that we have the initial server information)
@Suppress("UNCHECKED_CAST")
val permitConnection = listenerManager.notifyFilter(newConnection)
if (!permitConnection) {
handshakeConnection.close()
val exception = ClientRejectedException("Connection to ${IP.toString(remoteAddress)} was not permitted!")
listenerManager.notifyError(exception)
throw exception
}
logger.info("Adding new signature for ${IP.toString(remoteAddress)} : ${Sys.bytesToHex(connectionInfo.publicKey)}")
settingsStore.addRegisteredServerKey(remoteAddress, connectionInfo.publicKey)
}
// VALIDATE are we allowed to connect to this server (now that we have the initial server information)
@Suppress("UNCHECKED_CAST")
val permitConnection = listenerManager.notifyFilter(newConnection)
if (!permitConnection) {
handshakeConnection.close()
val exception = ClientRejectedException("Connection to ${IP.toString(remoteAddress!!)} was not permitted!")
listenerManager.notifyError(exception)
throw exception
}
//////////////
/// Extra Close action
@ -428,6 +437,7 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
// manually call it.
// this always has to be on a new dispatch, otherwise we can have weird logic loops if we reconnect within a disconnect callback
@Suppress("EXPERIMENTAL_API_USAGE")
actionDispatch.launch(start = CoroutineStart.UNDISPATCHED) {
// NOTE: UNDISPATCHED means that this coroutine will start as an event loop, instead of concurrently
// we want this behavior INSTEAD OF automatically starting this on a new thread.

View File

@ -110,8 +110,6 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
internal val listenIPv6Address: InetAddress?
init {
config.validate()
// localhost/loopback IP might not always be 127.0.0.1 or ::1
// We want to listen on BOTH IPv4 and IPv6 (config option lets us configure this)
listenIPv4Address = if (canUseIPv4) {
@ -119,7 +117,7 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
"loopback", "localhost", "lo" -> IPv4.LOCALHOST
"0", "::", "0.0.0.0", "*" -> {
// this is the "wildcard" address. Windows has problems with this.
InetAddress.getByAddress("", byteArrayOf(0, 0, 0, 0))
InetAddress.getByAddress(null, byteArrayOf(0, 0, 0, 0))
}
else -> Inet4Address.getAllByName(config.listenIpAddress)[0]
}
@ -133,7 +131,7 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
"loopback", "localhost", "lo" -> IPv6.LOCALHOST
"0", "::", "0.0.0.0", "*" -> {
// this is the "wildcard" address. Windows has problems with this.
InetAddress.getByAddress("", byteArrayOf(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0))
InetAddress.getByAddress(null, byteArrayOf(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0))
}
else -> Inet6Address.getAllByName(config.listenIpAddress)[0]
}
@ -232,7 +230,7 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
// val port = remoteIpAndPort.substring(splitPoint+1)
// this should never be null, because we are feeding it a valid IP address from aeron
val clientAddress = IPv4.getByNameUnsafe(clientAddressString)
val clientAddress = IPv4.fromStringUnsafe(clientAddressString)
val message = readHandshakeMessage(buffer, offset, length, header)
@ -302,7 +300,7 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
// val port = remoteIpAndPort.substring(splitPoint+1)
// this should never be null, because we are feeding it a valid IP address from aeron
val clientAddress = IPv6.getByName(clientAddressString)!!
val clientAddress = IPv6.fromString(clientAddressString)!!
val message = readHandshakeMessage(buffer, offset, length, header)
@ -372,7 +370,7 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
// this should never be null, because we are feeding it a valid IP address from aeron
// maybe IPv4, maybe IPv6! This is slower than if we ALREADY know what it is.
val clientAddress = IP.getByName(clientAddressString)!!
val clientAddress = IP.fromString(clientAddressString)!!
val message = readHandshakeMessage(buffer, offset, length, header)
@ -483,6 +481,7 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
// instantly notified and on cleanup, the server-listenermanager is called
// this always has to be on a new dispatch, otherwise we can have weird logic loops if we reconnect within a disconnect callback
@Suppress("EXPERIMENTAL_API_USAGE")
actionDispatch.launch(start = CoroutineStart.UNDISPATCHED) {
// NOTE: UNDISPATCHED means that this coroutine will start as an event loop, instead of concurrently
// we want this behavior INSTEAD OF automatically starting this on a new thread.
@ -525,6 +524,7 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
// NOTE: this must be the LAST thing happening!
// this always has to be on a new dispatch, otherwise we can have weird logic loops if we reconnect within a disconnect callback
@Suppress("EXPERIMENTAL_API_USAGE")
val job = actionDispatch.launch(start = CoroutineStart.UNDISPATCHED) {
// NOTE: UNDISPATCHED means that this coroutine will start as an event loop, instead of concurrently
// we want this behavior INSTEAD OF automatically starting this on a new thread.
@ -588,16 +588,19 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
* Closes the server and all it's connections. After a close, you may call 'bind' again.
*/
override fun close0() {
bindAlreadyCalled = false
// when we call close, it will shutdown the polling mechanism then wait for us to tell it to cleanup connections.
//
// Aeron + the Media Driver will have already been shutdown at this point.
runBlocking {
// These are run in lock-step
shutdownPollWaiter.doNotify()
shutdownEventWaiter.doWait()
if (bindAlreadyCalled) {
bindAlreadyCalled = false
runBlocking {
// These are run in lock-step
shutdownPollWaiter.doNotify()
shutdownEventWaiter.doWait()
}
}
}