2020-08-12 23:30:16 +02:00
/ *
2020-08-19 15:29:35 +02:00
* Copyright 2020 dorkbox , llc
2020-08-12 23:30:16 +02:00
*
* Licensed under the Apache License , Version 2.0 ( the " License " ) ;
* you may not use this file except in compliance with the License .
* You may obtain a copy of the License at
*
* http : //www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing , software
* distributed under the License is distributed on an " AS IS " BASIS ,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
* See the License for the specific language governing permissions and
* limitations under the License .
* /
package dorkbox.network
2022-03-08 08:41:06 +01:00
import dorkbox.netUtil.IPv4
import dorkbox.netUtil.IPv6
import dorkbox.netUtil.Inet4
import dorkbox.netUtil.Inet6
2021-04-27 13:57:47 +02:00
import dorkbox.network.aeron.AeronDriver
2020-09-09 01:33:09 +02:00
import dorkbox.network.aeron.AeronPoller
2020-08-12 23:30:16 +02:00
import dorkbox.network.connection.Connection
2021-07-09 15:16:12 +02:00
import dorkbox.network.connection.ConnectionParams
2020-08-12 23:30:16 +02:00
import dorkbox.network.connection.EndPoint
2020-09-09 01:33:09 +02:00
import dorkbox.network.connectionType.ConnectionRule
2022-07-14 04:50:53 +02:00
import dorkbox.network.exceptions.AllocationException
2020-09-09 01:33:09 +02:00
import dorkbox.network.exceptions.ServerException
2020-08-12 23:30:16 +02:00
import dorkbox.network.handshake.ServerHandshake
2022-05-30 02:45:50 +02:00
import dorkbox.network.handshake.ServerHandshakePollers
2021-07-06 15:38:53 +02:00
import dorkbox.network.rmi.RmiSupportServer
2022-07-14 04:50:53 +02:00
import kotlinx.atomicfu.atomic
2020-08-27 13:57:30 +02:00
import kotlinx.coroutines.Job
2020-08-12 23:30:16 +02:00
import kotlinx.coroutines.launch
2020-08-27 13:57:30 +02:00
import kotlinx.coroutines.runBlocking
2020-09-09 01:33:09 +02:00
import java.net.InetAddress
2022-03-08 08:41:06 +01:00
import java.util.concurrent.*
2020-08-12 23:30:16 +02:00
/ * *
* The server can only be accessed in an ASYNC manner . This means that the server can only be used in RESPONSE to events . If you access the
* server OUTSIDE of events , you will get inaccurate information from the server ( such as getConnections ( ) )
*
2022-06-11 23:53:39 +02:00
* To put it bluntly , ONLY have the server do work inside a listener !
2021-07-09 15:16:12 +02:00
*
* @param config these are the specific connection options
* @param connectionFunc allows for custom connection implementations defined as a unit function
2022-05-28 12:15:45 +02:00
* @param loggerName allows for a custom logger name for this endpoint ( for when there are multiple endpoints )
2020-08-12 23:30:16 +02:00
* /
2021-07-09 15:16:12 +02:00
open class Server < CONNECTION : Connection > (
2022-05-28 12:15:45 +02:00
config : ServerConfiguration = ServerConfiguration ( ) ,
connectionFunc : ( connectionParameters : ConnectionParams < CONNECTION > ) -> CONNECTION ,
loggerName : String = Server :: class . java . simpleName )
: EndPoint < CONNECTION > ( config , connectionFunc , loggerName ) {
2022-06-11 23:53:39 +02:00
/ * *
* The server can only be accessed in an ASYNC manner . This means that the server can only be used in RESPONSE to events . If you access the
* server OUTSIDE of events , you will get inaccurate information from the server ( such as getConnections ( ) )
*
* To put it bluntly , ONLY have the server do work inside a listener !
*
* @param config these are the specific connection options
* @param loggerName allows for a custom logger name for this endpoint ( for when there are multiple endpoints )
* @param connectionFunc allows for custom connection implementations defined as a unit function
* /
constructor ( config : ServerConfiguration ,
loggerName : String ,
connectionFunc : ( connectionParameters : ConnectionParams < CONNECTION > ) -> CONNECTION )
: this ( config , connectionFunc , loggerName )
2022-05-28 12:15:45 +02:00
/ * *
* The server can only be accessed in an ASYNC manner . This means that the server can only be used in RESPONSE to events . If you access the
* server OUTSIDE of events , you will get inaccurate information from the server ( such as getConnections ( ) )
*
* To put it bluntly , ONLY have the server do work inside of a listener !
*
* @param config these are the specific connection options
* @param connectionFunc allows for custom connection implementations defined as a unit function
* /
constructor ( config : ServerConfiguration ,
connectionFunc : ( connectionParameters : ConnectionParams < CONNECTION > ) -> CONNECTION )
: this ( config , connectionFunc , Server :: class . java . simpleName )
/ * *
* The server can only be accessed in an ASYNC manner . This means that the server can only be used in RESPONSE to events . If you access the
* server OUTSIDE of events , you will get inaccurate information from the server ( such as getConnections ( ) )
*
* To put it bluntly , ONLY have the server do work inside of a listener !
*
* @param config these are the specific connection options
* @param loggerName allows for a custom logger name for this endpoint ( for when there are multiple endpoints )
* /
constructor ( config : ServerConfiguration ,
loggerName : String = Server :: class . java . simpleName )
: this ( config ,
{
@Suppress ( " UNCHECKED_CAST " )
Connection ( it ) as CONNECTION
} ,
loggerName )
/ * *
* The server can only be accessed in an ASYNC manner . This means that the server can only be used in RESPONSE to events . If you access the
* server OUTSIDE of events , you will get inaccurate information from the server ( such as getConnections ( ) )
*
* To put it bluntly , ONLY have the server do work inside of a listener !
*
* @param config these are the specific connection options
* /
constructor ( config : ServerConfiguration )
: this ( config ,
{
@Suppress ( " UNCHECKED_CAST " )
Connection ( it ) as CONNECTION
} ,
Server :: class . java . simpleName )
2020-08-12 23:30:16 +02:00
companion object {
/ * *
* Gets the version number .
* /
2022-08-03 01:52:50 +02:00
const val version = " 5.32 "
2020-08-12 23:30:16 +02:00
/ * *
* Checks to see if a server ( using the specified configuration ) is running .
*
2021-04-30 21:18:57 +02:00
* This method should only be used to check if a server is running for a DIFFERENT configuration than the currently running server
2020-08-12 23:30:16 +02:00
* /
fun isRunning ( configuration : ServerConfiguration ) : Boolean {
2021-04-30 21:18:57 +02:00
return AeronDriver ( configuration ) . isRunning ( )
2020-08-12 23:30:16 +02:00
}
2021-04-09 20:24:45 +02:00
init {
// Add this project to the updates system, which verifies this class + UUID + version information
dorkbox . updates . Updates . add ( Server :: class . java , " 90a2c3b1e4fa41ea90d31fbdf8b2c6ef " , version )
}
2020-08-12 23:30:16 +02:00
}
2021-07-06 15:38:53 +02:00
/ * *
* Methods supporting Remote Method Invocation and Objects for GLOBAL scope objects ( different than CONNECTION scope objects )
* /
val rmiGlobal = RmiSupportServer ( logger , rmiGlobalSupport )
2020-08-12 23:30:16 +02:00
/ * *
* @return true if this server has successfully bound to an IP address and is running
* /
2022-07-14 04:50:53 +02:00
private var bindAlreadyCalled = atomic ( false )
2020-08-12 23:30:16 +02:00
2020-09-09 01:33:09 +02:00
/ * *
* These are run in lock - step to shutdown / close the server . Afterwards , bind ( ) can be called again
* /
2022-07-27 00:20:34 +02:00
@Volatile
private var shutdownPollLatch = CountDownLatch ( 1 )
@Volatile
private var shutdownEventLatch = CountDownLatch ( 1 )
2020-09-09 01:33:09 +02:00
2020-08-12 23:30:16 +02:00
/ * *
* Maintains a thread - safe collection of rules used to define the connection type with this server .
* /
2020-08-12 23:35:31 +02:00
private val connectionRules = CopyOnWriteArrayList < ConnectionRule > ( )
2020-08-12 23:30:16 +02:00
2020-09-10 00:24:41 +02:00
/ * *
* true if the following network stacks are available for use
* /
2022-05-30 02:45:50 +02:00
internal val canUseIPv4 = config . enableIPv4 && IPv4 . isAvailable
internal val canUseIPv6 = config . enableIPv6 && IPv6 . isAvailable
2020-09-10 00:24:41 +02:00
2020-09-09 01:33:09 +02:00
2021-04-29 01:48:04 +02:00
// localhost/loopback IP might not always be 127.0.0.1 or ::1
// We want to listen on BOTH IPv4 and IPv6 (config option lets us configure this)
internal val listenIPv4Address : InetAddress ? =
if ( canUseIPv4 ) {
2020-09-09 01:33:09 +02:00
when ( config . listenIpAddress ) {
2022-05-28 12:16:05 +02:00
" loopback " , " localhost " , " lo " , " 127.0.0.1 " , " ::1 " -> IPv4 . LOCALHOST
2020-09-09 01:33:09 +02:00
" 0 " , " :: " , " 0.0.0.0 " , " * " -> {
// this is the "wildcard" address. Windows has problems with this.
2021-04-29 01:48:04 +02:00
IPv4 . WILDCARD
2020-09-09 01:33:09 +02:00
}
2021-04-29 01:48:04 +02:00
else -> Inet4 . toAddress ( config . listenIpAddress ) // Inet4Address.getAllByName(config.listenIpAddress)[0]
2020-08-12 23:30:16 +02:00
}
}
2020-09-10 00:24:41 +02:00
else {
2020-09-09 01:33:09 +02:00
null
2020-09-10 00:24:41 +02:00
}
2021-04-29 01:48:04 +02:00
internal val listenIPv6Address : InetAddress ? =
if ( canUseIPv6 ) {
2020-09-09 01:33:09 +02:00
when ( config . listenIpAddress ) {
2022-05-28 12:16:05 +02:00
" loopback " , " localhost " , " lo " , " 127.0.0.1 " , " ::1 " -> IPv6 . LOCALHOST
2020-09-09 01:33:09 +02:00
" 0 " , " :: " , " 0.0.0.0 " , " * " -> {
// this is the "wildcard" address. Windows has problems with this.
2021-04-29 01:48:04 +02:00
IPv6 . WILDCARD
2020-09-09 01:33:09 +02:00
}
2021-04-29 01:48:04 +02:00
else -> Inet6 . toAddress ( config . listenIpAddress )
2020-08-15 13:21:20 +02:00
}
}
2020-09-10 00:24:41 +02:00
else {
null
}
2020-08-15 13:21:20 +02:00
2021-04-29 01:48:04 +02:00
init {
2020-09-02 15:03:57 +02:00
// we are done with initial configuration, now finish serialization
2021-04-29 01:48:04 +02:00
serialization . finishInit ( type )
2020-08-12 23:30:16 +02:00
}
2020-08-15 13:21:20 +02:00
2020-09-25 14:49:17 +02:00
final override fun newException ( message : String , cause : Throwable ? ) : Throwable {
2020-08-12 23:30:16 +02:00
return ServerException ( message , cause )
}
/ * *
* Binds the server to AERON configuration
* /
2020-08-25 17:45:08 +02:00
@Suppress ( " DuplicatedCode " )
2022-07-27 00:20:34 +02:00
fun bind ( ) {
2022-07-29 04:52:11 +02:00
// NOTE: it is critical to remember that Aeron DOES NOT like running from coroutines!
2022-07-14 04:50:53 +02:00
if ( bindAlreadyCalled . getAndSet ( true ) ) {
2022-05-30 02:45:50 +02:00
logger . error { " Unable to bind when the server is already running! " }
2022-07-27 00:20:34 +02:00
return
2020-08-12 23:30:16 +02:00
}
2021-07-06 15:38:53 +02:00
try {
2022-08-02 13:10:06 +02:00
startDriver ( )
2021-07-06 15:38:53 +02:00
} catch ( e : Exception ) {
2022-08-02 13:10:06 +02:00
logger . error ( e ) { " Unable to start the network driver " }
2022-07-27 00:20:34 +02:00
return
2021-07-06 15:38:53 +02:00
}
2020-08-12 23:30:16 +02:00
2022-07-27 00:20:34 +02:00
shutdownPollLatch = CountDownLatch ( 1 )
shutdownEventLatch = CountDownLatch ( 1 )
2020-08-12 23:30:16 +02:00
config as ServerConfiguration
2022-07-29 04:52:11 +02:00
val handshake = ServerHandshake ( logger , config , listenerManager , aeronDriver )
2020-09-15 12:01:05 +02:00
// we are done with initial configuration, now initialize aeron and the general state of this endpoint
2020-08-12 23:30:16 +02:00
2021-04-30 16:01:25 +02:00
// this forces the current thread to WAIT until poll system has started
2022-07-27 00:20:34 +02:00
val pollStartupLatch = CountDownLatch ( 1 )
2021-04-30 16:01:25 +02:00
2022-07-15 14:15:40 +02:00
val server = this @Server
2022-07-29 04:52:11 +02:00
val ipcPoller : AeronPoller = ServerHandshakePollers . ipc ( aeronDriver , config , server , handshake )
2021-04-30 16:01:25 +02:00
// if we are binding to WILDCARD, then we have to do something special if BOTH IPv4 and IPv6 are enabled!
2022-05-30 02:45:50 +02:00
val isWildcard = listenIPv4Address == IPv4 . WILDCARD || listenIPv6Address == IPv6 . WILDCARD
2021-04-30 16:01:25 +02:00
val ipv4Poller : AeronPoller
val ipv6Poller : AeronPoller
if ( isWildcard ) {
if ( canUseIPv4 && canUseIPv6 ) {
2022-05-30 02:45:50 +02:00
// IPv6 will bind to IPv4 wildcard as well, so don't bind both!
ipv4Poller = ServerHandshakePollers . disabled ( " IPv4 Disabled " )
2022-07-29 04:52:11 +02:00
ipv6Poller = ServerHandshakePollers . ip6Wildcard ( aeronDriver , config , server , handshake )
2020-09-09 01:33:09 +02:00
} else {
2021-04-30 16:01:25 +02:00
// only 1 will be a real poller
2022-07-29 04:52:11 +02:00
ipv4Poller = ServerHandshakePollers . ip4 ( aeronDriver , config , server , handshake )
ipv6Poller = ServerHandshakePollers . ip6 ( aeronDriver , config , server , handshake )
2020-09-09 01:33:09 +02:00
}
2021-04-30 16:01:25 +02:00
} else {
2022-07-29 04:52:11 +02:00
ipv4Poller = ServerHandshakePollers . ip4 ( aeronDriver , config , server , handshake )
ipv6Poller = ServerHandshakePollers . ip6 ( aeronDriver , config , server , handshake )
2021-04-30 16:01:25 +02:00
}
2020-09-15 12:01:05 +02:00
2022-05-30 02:45:50 +02:00
2022-07-27 00:20:34 +02:00
val networkEventProcessor = Runnable {
pollStartupLatch . countDown ( )
2022-07-14 04:50:53 +02:00
2022-07-29 04:52:11 +02:00
val pollIdleStrategy = config . pollIdleStrategy . cloneToNormal ( )
2020-08-12 23:30:16 +02:00
try {
var pollCount : Int
2020-08-15 13:21:20 +02:00
2020-08-12 23:30:16 +02:00
while ( !is Shutdown ( ) ) {
pollCount = 0
2020-08-26 16:27:59 +02:00
// NOTE: regarding fragment limit size. Repeated calls to '.poll' will reassemble a fragment.
// `.poll(handler, 4)` == `.poll(handler, 2)` + `.poll(handler, 2)`
2020-08-25 02:43:20 +02:00
// this checks to see if there are NEW clients on the handshake ports
2020-09-09 01:33:09 +02:00
pollCount += ipv4Poller . poll ( )
pollCount += ipv6Poller . poll ( )
2020-08-12 23:30:16 +02:00
// this checks to see if there are NEW clients via IPC
2020-09-09 01:33:09 +02:00
pollCount += ipcPoller . poll ( )
2020-08-12 23:30:16 +02:00
2020-09-11 11:06:53 +02:00
// this manages existing clients (for cleanup + connection polling). This has a concurrent iterator,
// so we can modify this as we go
connections . forEach { connection ->
2022-07-03 13:13:38 +02:00
if ( ! connection . isClosedViaAeron ( ) ) {
// Otherwise, poll the connection for messages
2022-08-02 12:04:09 +02:00
pollCount += connection . poll ( )
2022-07-03 13:13:38 +02:00
} else {
2020-09-11 11:06:53 +02:00
// If the connection has either been closed, or has expired, it needs to be cleaned-up/deleted.
2022-07-18 05:08:01 +02:00
logger . debug { " [ ${connection.id} / ${connection.streamId} ] connection expired " }
2020-09-11 11:06:53 +02:00
2020-09-15 12:01:05 +02:00
removeConnection ( connection )
// this will call removeConnection again, but that is ok
2022-07-29 04:52:11 +02:00
// this is blocking, because the connection MUST be removed in the same thread that is processing events
connection . close ( )
2020-09-11 11:06:53 +02:00
// have to manually notify the server-listenerManager that this connection was closed
// if the connection was MANUALLY closed (via calling connection.close()), then the connection-listenermanager is
// instantly notified and on cleanup, the server-listenermanager is called
2020-08-25 02:43:20 +02:00
2021-04-30 18:15:54 +02:00
// this always has to be on event dispatch, otherwise we can have weird logic loops if we reconnect within a disconnect callback
2022-07-14 04:50:53 +02:00
actionDispatch . launch {
2020-09-11 11:06:53 +02:00
listenerManager . notifyDisconnect ( connection )
}
2020-08-15 13:21:20 +02:00
}
2020-09-11 11:06:53 +02:00
}
2020-08-12 23:30:16 +02:00
// 0 means we idle. >0 means reset and don't idle (because there are likely more poll events)
pollIdleStrategy . idle ( pollCount )
}
2020-09-09 01:33:09 +02:00
2022-08-02 21:13:04 +02:00
logger . debug { " Network event dispatch closing... " }
2020-09-09 01:33:09 +02:00
// we want to process **actual** close cleanup events on this thread as well, otherwise we will have threading problems
2022-07-27 00:20:34 +02:00
shutdownPollLatch . await ( )
2020-09-09 01:33:09 +02:00
// we have to manually cleanup the connections and call server-notifyDisconnect because otherwise this will never get called
val jobs = mutableListOf < Job > ( )
// we want to clear all the connections FIRST (since we are shutting down)
val cons = mutableListOf < CONNECTION > ( )
connections . forEach { cons . add ( it ) }
connections . clear ( )
cons . forEach { connection ->
2022-07-27 00:20:34 +02:00
logger . info { " [ ${connection.id} / ${connection.streamId} ] Connection cleanup and close " }
2020-09-09 01:33:09 +02:00
// make sure the connection is closed (close can only happen once, so a duplicate call does nothing!)
connection . close ( )
// have to manually notify the server-listenerManager that this connection was closed
// if the connection was MANUALLY closed (via calling connection.close()), then the connection-listenermanager is
// instantly notified and on cleanup, the server-listenermanager is called
// NOTE: this must be the LAST thing happening!
2021-04-30 18:15:54 +02:00
// this always has to be on event dispatch, otherwise we can have weird logic loops if we reconnect within a disconnect callback
2022-07-14 04:50:53 +02:00
val job = actionDispatch . launch {
2020-09-09 01:33:09 +02:00
listenerManager . notifyDisconnect ( connection )
}
jobs . add ( job )
}
// when we close a client or a server, we want to make sure that ALL notifications are finished.
// when it's just a connection getting closed, we don't care about this. We only care when it's "global" shutdown
2022-07-27 00:20:34 +02:00
runBlocking {
jobs . forEach { it . join ( ) }
}
2020-09-15 12:01:05 +02:00
} catch ( e : Exception ) {
2022-05-30 02:45:50 +02:00
logger . error ( e ) { " Unexpected error during server message polling! " }
2020-08-12 23:30:16 +02:00
} finally {
2020-09-09 01:33:09 +02:00
ipv4Poller . close ( )
ipv6Poller . close ( )
ipcPoller . close ( )
2020-08-12 23:30:16 +02:00
2022-07-15 14:15:40 +02:00
// clear all the handshake info
handshake . clear ( )
2022-07-14 04:50:53 +02:00
try {
// make sure that we have de-allocated all connection data
handshake . checkForMemoryLeaks ( )
} catch ( e : AllocationException ) {
logger . error ( e ) { " Error during server cleanup " }
}
2020-09-09 01:33:09 +02:00
// finish closing -- this lets us make sure that we don't run into race conditions on the thread that calls close()
2022-06-15 23:53:26 +02:00
try {
2022-07-27 00:20:34 +02:00
shutdownEventLatch . countDown ( )
2022-06-15 23:53:26 +02:00
} catch ( ignored : Exception ) { }
2020-08-12 23:30:16 +02:00
}
}
2022-07-27 00:20:34 +02:00
config . networkInterfaceEventDispatcher . submit ( networkEventProcessor )
2020-09-15 12:01:05 +02:00
2022-07-27 00:20:34 +02:00
// wait for the polling thread to startup before letting bind() return
pollStartupLatch . await ( )
2020-08-12 23:30:16 +02:00
}
/ * *
* Adds an IP + subnet rule that defines what type of connection this IP + subnet should have .
* - NOTHING : Nothing happens to the in / out bytes
* - COMPRESS : The in / out bytes are compressed with LZ4 - fast
* - COMPRESS _AND _ENCRYPT : The in / out bytes are compressed ( LZ4 - fast ) THEN encrypted ( AES - 256 - GCM )
*
* If no rules are defined , then for LOOPBACK , it will always be `COMPRESS` and for everything else it will always be ` COMPRESS _AND _ENCRYPT ` .
*
* If rules are defined , then everything by default is ` COMPRESS _AND _ENCRYPT ` .
*
* The compression algorithm is LZ4 - fast , so there is a small performance impact for a very large gain
* Compress : 6.210 micros / op ; 629.0 MB / s ( output : 55.4 % )
* Uncompress : 0.641 micros / op ; 6097.9 MB / s
* /
fun addConnectionRules ( vararg rules : ConnectionRule ) {
connectionRules . addAll ( listOf ( * rules ) )
}
2020-09-22 19:39:24 +02:00
/ * *
2020-09-22 21:06:31 +02:00
* Runs an action for each connection
2020-09-22 19:39:24 +02:00
* /
fun forEachConnection ( function : ( connection : CONNECTION ) -> Unit ) {
connections . forEach {
function ( it )
}
}
2020-08-27 13:57:30 +02:00
/ * *
* Closes the server and all it ' s connections . After a close , you may call ' bind ' again .
* /
2020-09-25 14:49:17 +02:00
final override fun close0 ( ) {
2022-07-14 04:50:53 +02:00
// when we call close, it will shutdown the polling mechanism, then wait for us to tell it to cleanup connections.
2020-09-09 01:33:09 +02:00
//
// Aeron + the Media Driver will have already been shutdown at this point.
2022-07-14 04:50:53 +02:00
if ( bindAlreadyCalled . getAndSet ( false ) ) {
2022-07-27 00:20:34 +02:00
// These are run in lock-step
shutdownPollLatch . countDown ( )
shutdownEventLatch . await ( )
2020-08-27 13:57:30 +02:00
}
2020-08-12 23:30:16 +02:00
}
2020-09-03 01:31:08 +02:00
// /**
// * Only called by the server!
// *
// * If we are loopback or the client is a specific IP/CIDR address, then we do things differently. The LOOPBACK address will never encrypt or compress the traffic.
// */
// // after the handshake, what sort of connection do we want (NONE, COMPRESS, ENCRYPT+COMPRESS)
// fun getConnectionUpgradeType(remoteAddress: InetSocketAddress): Byte {
// val address = remoteAddress.address
// val size = connectionRules.size
//
// // if it's unknown, then by default we encrypt the traffic
// var connectionType = ConnectionProperties.COMPRESS_AND_ENCRYPT
// if (size == 0 && address == IPv4.LOCALHOST) {
// // if nothing is specified, then by default localhost is compression and everything else is encrypted
// connectionType = ConnectionProperties.COMPRESS
// }
// for (i in 0 until size) {
// val rule = connectionRules[i] ?: continue
// if (rule.matches(remoteAddress)) {
// connectionType = rule.ruleType()
// break
// }
// }
// logger.debug("Validating {} Permitted type is: {}", remoteAddress, connectionType)
// return connectionType.type
// }
2020-08-12 23:30:16 +02:00
}