Fixed issues with forcing unique media driver locations. Fixed issues where connections weren't getting removed. Added reversal of auto-IPC (in case the server doesn't have IPC)

This commit is contained in:
nathan 2020-09-11 01:14:22 +02:00
parent 09c895e981
commit 9ef9a066a0
9 changed files with 455 additions and 313 deletions

View File

@ -15,8 +15,10 @@
*/ */
package dorkbox.network package dorkbox.network
import dorkbox.netUtil.IP
import dorkbox.netUtil.IPv4 import dorkbox.netUtil.IPv4
import dorkbox.netUtil.IPv6 import dorkbox.netUtil.IPv6
import dorkbox.network.aeron.AeronConfig
import dorkbox.network.aeron.IpcMediaDriverConnection import dorkbox.network.aeron.IpcMediaDriverConnection
import dorkbox.network.aeron.UdpMediaDriverConnection import dorkbox.network.aeron.UdpMediaDriverConnection
import dorkbox.network.connection.Connection import dorkbox.network.connection.Connection
@ -85,6 +87,11 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
if (config.networkMtuSize <= 0) { throw ClientException("configuration networkMtuSize must be > 0") } if (config.networkMtuSize <= 0) { throw ClientException("configuration networkMtuSize must be > 0") }
if (config.networkMtuSize >= 9 * 1024) { throw ClientException("configuration networkMtuSize must be < ${9 * 1024}") } if (config.networkMtuSize >= 9 * 1024) { throw ClientException("configuration networkMtuSize must be < ${9 * 1024}") }
if (config.enableIpc && config.aeronDirectoryForceUnique) {
require(false) { "IPC and forcing a unique Aeron directory are incompatible!" }
}
} }
override fun newException(message: String, cause: Throwable?): Throwable { override fun newException(message: String, cause: Throwable?): Throwable {
@ -168,8 +175,8 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
connectionTimeoutMS: Long = 30_000L, reliable: Boolean = true) { connectionTimeoutMS: Long = 30_000L, reliable: Boolean = true) {
// Default IPC ports are flipped because they are in the perspective of the SERVER // Default IPC ports are flipped because they are in the perspective of the SERVER
connect(remoteAddress = remoteAddress, connect(remoteAddress = remoteAddress,
ipcPublicationId = IPC_HANDSHAKE_STREAM_ID_SUB, ipcPublicationId = AeronConfig.IPC_HANDSHAKE_STREAM_ID_SUB,
ipcSubscriptionId = IPC_HANDSHAKE_STREAM_ID_PUB, ipcSubscriptionId = AeronConfig.IPC_HANDSHAKE_STREAM_ID_PUB,
connectionTimeoutMS = connectionTimeoutMS, connectionTimeoutMS = connectionTimeoutMS,
reliable = reliable) reliable = reliable)
} }
@ -186,8 +193,8 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
* @throws ClientRejectedException if the client connection is rejected * @throws ClientRejectedException if the client connection is rejected
*/ */
@Suppress("DuplicatedCode") @Suppress("DuplicatedCode")
suspend fun connect(ipcPublicationId: Int = IPC_HANDSHAKE_STREAM_ID_SUB, suspend fun connect(ipcPublicationId: Int = AeronConfig.IPC_HANDSHAKE_STREAM_ID_SUB,
ipcSubscriptionId: Int = IPC_HANDSHAKE_STREAM_ID_PUB, ipcSubscriptionId: Int = AeronConfig.IPC_HANDSHAKE_STREAM_ID_PUB,
connectionTimeoutMS: Long = 30_000L) { connectionTimeoutMS: Long = 30_000L) {
// Default IPC ports are flipped because they are in the perspective of the SERVER // Default IPC ports are flipped because they are in the perspective of the SERVER
@ -227,8 +234,8 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
@Suppress("DuplicatedCode") @Suppress("DuplicatedCode")
private suspend fun connect(remoteAddress: InetAddress? = null, private suspend fun connect(remoteAddress: InetAddress? = null,
// Default IPC ports are flipped because they are in the perspective of the SERVER // Default IPC ports are flipped because they are in the perspective of the SERVER
ipcPublicationId: Int = IPC_HANDSHAKE_STREAM_ID_SUB, ipcPublicationId: Int = AeronConfig.IPC_HANDSHAKE_STREAM_ID_SUB,
ipcSubscriptionId: Int = IPC_HANDSHAKE_STREAM_ID_PUB, ipcSubscriptionId: Int = AeronConfig.IPC_HANDSHAKE_STREAM_ID_PUB,
connectionTimeoutMS: Long = 30_000L, reliable: Boolean = true) { connectionTimeoutMS: Long = 30_000L, reliable: Boolean = true) {
// this will exist ONLY if we are reconnecting via a "disconnect" callback // this will exist ONLY if we are reconnecting via a "disconnect" callback
lockStepForReconnect.value?.doWait() lockStepForReconnect.value?.doWait()
@ -239,65 +246,96 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
} }
lockStepForReconnect.lazySet(null) lockStepForReconnect.lazySet(null)
// localhost/loopback IP might not always be 127.0.0.1 or ::1
this.remoteAddress0 = remoteAddress
connection0 = null connection0 = null
// we are done with initial configuration, now initialize aeron and the general state of this endpoint // we are done with initial configuration, now initialize aeron and the general state of this endpoint
val aeron = initEndpointState() val aeron = initEndpointState()
// only change LOCALHOST -> IPC if the media driver is ALREADY running LOCALLY! if (config.enableIpc && config.aeronDirectoryForceUnique) {
val canAutoChangeToIpc = config.enableIpcForLoopback && isRunning() require(false) { "IPC enabled and forcing a unique Aeron directory are incompatible (IPC requires shared Aeron directories)!" }
if (canAutoChangeToIpc) {
logger.info("Media driver is already running. Support for auto-switch LOCALHOST -> IPC is enabled")
} }
// only try to connect via IPv4 if we have a network interface that supports it! // only try to connect via IPv4 if we have a network interface that supports it!
if (remoteAddress is Inet4Address && !IPv4.isAvailable) { if (remoteAddress is Inet4Address && !IPv4.isAvailable) {
require(false) { "Unable to connect to the IPv4 address $remoteAddress, there are no IPv4 interfaces available!"} require(false) { "Unable to connect to the IPv4 address ${IPv4.toString(remoteAddress)}, there are no IPv4 interfaces available!" }
} }
// only try to connect via IPv6 if we have a network interface that supports it! // only try to connect via IPv6 if we have a network interface that supports it!
if (remoteAddress is Inet6Address && !IPv6.isAvailable) { if (remoteAddress is Inet6Address && !IPv6.isAvailable) {
require(false) { "Unable to connect to the IPv6 address $remoteAddress, there are no IPv6 interfaces available!"} require(false) { "Unable to connect to the IPv6 address ${IPv6.toString(remoteAddress)}, there are no IPv6 interfaces available!" }
}
if (remoteAddress != null && remoteAddress.isAnyLocalAddress) {
require(false) { "Cannot connect to ${IP.toString(remoteAddress)} It is an invalid address!" }
}
// only change LOCALHOST -> IPC if the media driver is ALREADY running LOCALLY!
var usingIPC = config.enableIpc && remoteAddress == null
val autoChangeToIpc = !usingIPC && config.enableIpcForLoopback &&
remoteAddress != null && remoteAddress.isLoopbackAddress && isRunning(mediaDriverContext)
if (autoChangeToIpc) {
logger.info {"IPC for loopback enabled and aeron is already running. Auto-changing network connection from ${IP.toString(remoteAddress!!)} -> IPC" }
} }
// NETWORK OR IPC ADDRESS val handshake = ClientHandshake(config, crypto, this)
// if we connect to "loopback", then MAYBE we substitute if for IPC (with log message) val handshakeConnection = if (autoChangeToIpc || usingIPC) {
// MAYBE the server doesn't have IPC enabled? If no, we need to connect via UDP instead
val test = IpcMediaDriverConnection(streamIdSubscription = ipcSubscriptionId,
streamId = ipcPublicationId,
sessionId = AeronConfig.RESERVED_SESSION_ID_INVALID,
// "fast" connection timeout, since this is IPC
connectionTimeoutMS = 1000)
// localhost/loopback IP might not always be 127.0.0.1 or ::1 var success = false
if (remoteAddress == null) {
this.remoteAddress0 = null
} else if (remoteAddress.isAnyLocalAddress) {
throw IllegalArgumentException("Cannot connect to $remoteAddress It is an invalid address!")
} else if (canAutoChangeToIpc && remoteAddress.isLoopbackAddress) {
logger.info { "Auto-changing network connection from $remoteAddress -> IPC" }
this.remoteAddress0 = null
} else {
this.remoteAddress0 = remoteAddress
}
// throws a ConnectTimedOutException if the client cannot connect for any reason to the server handshake ports
try {
test.buildClient(aeron, logger)
success = true
} catch (e: Exception) {
// if we specified that we want to use IPC, then we have to throw the timeout exception, because there is no IPC
if (usingIPC) {
throw e
}
}
val handshake = ClientHandshake(logger, config, crypto, this) if (success) {
usingIPC = true
test
} else {
logger.info { "IPC for loopback enabled, but unable to connect. Retrying with address ${IP.toString(remoteAddress!!)}" }
// try a UDP connection instead
val handshakeConnection = if (this.remoteAddress0 == null) { val test = UdpMediaDriverConnection(address = this.remoteAddress0!!,
IpcMediaDriverConnection(streamIdSubscription = ipcSubscriptionId, publicationPort = config.subscriptionPort,
streamId = ipcPublicationId, subscriptionPort = config.publicationPort,
sessionId = RESERVED_SESSION_ID_INVALID) streamId = AeronConfig.UDP_HANDSHAKE_STREAM_ID,
sessionId = AeronConfig.RESERVED_SESSION_ID_INVALID,
connectionTimeoutMS = connectionTimeoutMS,
isReliable = reliable)
// throws a ConnectTimedOutException if the client cannot connect for any reason to the server handshake ports
test.buildClient(aeron, logger)
test
}
} }
else { else {
UdpMediaDriverConnection(address = this.remoteAddress0!!, val test = UdpMediaDriverConnection(address = this.remoteAddress0!!,
publicationPort = config.subscriptionPort, publicationPort = config.subscriptionPort,
subscriptionPort = config.publicationPort, subscriptionPort = config.publicationPort,
streamId = UDP_HANDSHAKE_STREAM_ID, streamId = AeronConfig.UDP_HANDSHAKE_STREAM_ID,
sessionId = RESERVED_SESSION_ID_INVALID, sessionId = AeronConfig.RESERVED_SESSION_ID_INVALID,
connectionTimeoutMS = connectionTimeoutMS, connectionTimeoutMS = connectionTimeoutMS,
isReliable = reliable) isReliable = reliable)
// throws a ConnectTimedOutException if the client cannot connect for any reason to the server handshake ports
test.buildClient(aeron, logger)
test
} }
// throws a ConnectTimedOutException if the client cannot connect for any reason to the server handshake ports
handshakeConnection.buildClient(aeron, logger)
logger.info(handshakeConnection.clientInfo()) logger.info(handshakeConnection.clientInfo())
@ -308,7 +346,7 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
// VALIDATE:: check to see if the remote connection's public key has changed! // VALIDATE:: check to see if the remote connection's public key has changed!
val validateRemoteAddress = if (this.remoteAddress0 == null) { val validateRemoteAddress = if (usingIPC) {
PublicKeyValidationState.VALID PublicKeyValidationState.VALID
} else { } else {
crypto.validateRemoteAddress(this.remoteAddress0!!, connectionInfo.publicKey) crypto.validateRemoteAddress(this.remoteAddress0!!, connectionInfo.publicKey)
@ -316,7 +354,7 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
if (validateRemoteAddress == PublicKeyValidationState.INVALID) { if (validateRemoteAddress == PublicKeyValidationState.INVALID) {
handshakeConnection.close() handshakeConnection.close()
val exception = ClientRejectedException("Connection to $remoteAddress not allowed! Public key mismatch.") val exception = ClientRejectedException("Connection to ${IP.toString(remoteAddress!!)} not allowed! Public key mismatch.")
listenerManager.notifyError(exception) listenerManager.notifyError(exception)
throw exception throw exception
} }
@ -328,7 +366,7 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
// we are now connected, so we can connect to the NEW client-specific ports // we are now connected, so we can connect to the NEW client-specific ports
val reliableClientConnection = if (this.remoteAddress0 == null) { val reliableClientConnection = if (usingIPC) {
IpcMediaDriverConnection(sessionId = connectionInfo.sessionId, IpcMediaDriverConnection(sessionId = connectionInfo.sessionId,
// NOTE: pub/sub must be switched! // NOTE: pub/sub must be switched!
streamIdSubscription = connectionInfo.publicationPort, streamIdSubscription = connectionInfo.publicationPort,
@ -366,13 +404,13 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
// because we are getting the class registration details from the SERVER, this should never be the case. // because we are getting the class registration details from the SERVER, this should never be the case.
// It is still and edge case where the reconstruction of the registration details fails (maybe because of custom serializers) // It is still and edge case where the reconstruction of the registration details fails (maybe because of custom serializers)
val exception = ClientRejectedException("Connection to $remoteAddress has incorrect class registration details!!") val exception = ClientRejectedException("Connection to ${IP.toString(remoteAddress!!)} has incorrect class registration details!!")
listenerManager.notifyError(exception) listenerManager.notifyError(exception)
throw exception throw exception
} }
val newConnection = if (this.remoteAddress0 == null) { val newConnection = if (usingIPC) {
newConnection(ConnectionParams(this, reliableClientConnection, PublicKeyValidationState.VALID)) newConnection(ConnectionParams(this, reliableClientConnection, PublicKeyValidationState.VALID))
} else { } else {
newConnection(ConnectionParams(this, reliableClientConnection, validateRemoteAddress)) newConnection(ConnectionParams(this, reliableClientConnection, validateRemoteAddress))
@ -383,7 +421,7 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
val permitConnection = listenerManager.notifyFilter(newConnection) val permitConnection = listenerManager.notifyFilter(newConnection)
if (!permitConnection) { if (!permitConnection) {
handshakeConnection.close() handshakeConnection.close()
val exception = ClientRejectedException("Connection to $remoteAddress was not permitted!") val exception = ClientRejectedException("Connection to ${IP.toString(remoteAddress!!)} was not permitted!")
listenerManager.notifyError(exception) listenerManager.notifyError(exception)
throw exception throw exception
} }
@ -414,11 +452,10 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
} }
connection0 = newConnection connection0 = newConnection
connections.add(newConnection) addConnection(newConnection)
// tell the server our connection handshake is done, and the connection can now listen for data. // tell the server our connection handshake is done, and the connection can now listen for data.
val canFinishConnecting = handshake.handshakeDone(handshakeConnection, connectionTimeoutMS) val canFinishConnecting = handshake.handshakeDone(handshakeConnection, connectionTimeoutMS)
if (canFinishConnecting) { if (canFinishConnecting) {
isConnected = true isConnected = true

View File

@ -15,10 +15,10 @@
*/ */
package dorkbox.network package dorkbox.network
import dorkbox.network.aeron.AeronConfig
import dorkbox.network.aeron.CoroutineBackoffIdleStrategy import dorkbox.network.aeron.CoroutineBackoffIdleStrategy
import dorkbox.network.aeron.CoroutineIdleStrategy import dorkbox.network.aeron.CoroutineIdleStrategy
import dorkbox.network.aeron.CoroutineSleepingMillisIdleStrategy import dorkbox.network.aeron.CoroutineSleepingMillisIdleStrategy
import dorkbox.network.connection.EndPoint
import dorkbox.network.serialization.Serialization import dorkbox.network.serialization.Serialization
import dorkbox.network.storage.PropertyStore import dorkbox.network.storage.PropertyStore
import dorkbox.network.storage.SettingsStore import dorkbox.network.storage.SettingsStore
@ -31,21 +31,6 @@ import mu.KLogger
import java.io.File import java.io.File
class ServerConfiguration : dorkbox.network.Configuration() { class ServerConfiguration : dorkbox.network.Configuration() {
/**
* Enables the ability to use the IPv4 network stack.
*/
var enableIPv4 = true
/**
* Enables the ability to use the IPv6 network stack.
*/
var enableIPv6 = true
/**
* Enables the ability use IPC (Inter Process Communication)
*/
var enableIPC = true
/** /**
* The address for the server to listen on. "*" will accept connections from all interfaces, otherwise specify * The address for the server to listen on. "*" will accept connections from all interfaces, otherwise specify
* the hostname (or IP) to bind to. * the hostname (or IP) to bind to.
@ -65,15 +50,39 @@ class ServerConfiguration : dorkbox.network.Configuration() {
/** /**
* The IPC Publication ID is used to define what ID the server will send data on. The client IPC subscription ID must match this value. * The IPC Publication ID is used to define what ID the server will send data on. The client IPC subscription ID must match this value.
*/ */
var ipcPublicationId = EndPoint.IPC_HANDSHAKE_STREAM_ID_PUB var ipcPublicationId = AeronConfig.IPC_HANDSHAKE_STREAM_ID_PUB
/** /**
* The IPC Subscription ID is used to define what ID the server will receive data on. The client IPC publication ID must match this value. * The IPC Subscription ID is used to define what ID the server will receive data on. The client IPC publication ID must match this value.
*/ */
var ipcSubscriptionId = EndPoint.IPC_HANDSHAKE_STREAM_ID_SUB var ipcSubscriptionId = AeronConfig.IPC_HANDSHAKE_STREAM_ID_SUB
} }
open class Configuration { open class Configuration {
/**
* Enables the ability to use the IPv4 network stack.
*/
var enableIPv4 = true
/**
* Enables the ability to use the IPv6 network stack.
*/
var enableIPv6 = true
/**
* Enables the ability use IPC (Inter Process Communication).
*
* Aeron must be running in the same location for the client/server in order for this to work
*/
var enableIpc = true
/**
* Permit loopback connections to use IPC instead of UDP for communicating, if possible. IPC is about 4x faster than UDP in loopback situations.
*
* This configuration only affects the client
*/
var enableIpcForLoopback: Boolean = true
/** /**
* When connecting to a remote client/server, should connections be allowed if the remote machine signature has changed? * When connecting to a remote client/server, should connections be allowed if the remote machine signature has changed?
* *
@ -101,13 +110,6 @@ open class Configuration {
*/ */
var subscriptionPort: Int = 0 var subscriptionPort: Int = 0
/**
* Permit loopback connections to use IPC instead of UDP for communicating. IPC is about 4x faster than UDP in loopback situations.
*
* This configuration only affects the client
*/
var enableIpcForLoopback: Boolean = true
/** /**
* How long a connection must be disconnected before we cleanup the memory associated with it * How long a connection must be disconnected before we cleanup the memory associated with it
*/ */
@ -181,9 +183,14 @@ open class Configuration {
var threadingMode = ThreadingMode.SHARED var threadingMode = ThreadingMode.SHARED
/** /**
* Log Buffer Locations for the Media Driver. The default location is a TEMP dir. This must be unique PER application and instance! * Aeron location for the Media Driver. The default location is a TEMP dir.
*/ */
var aeronLogDirectory: File? = null var aeronDirectory: File? = null
/**
* Should we force the Aeron location to be unique for every instance? This is mutually exclusive with IPC.
*/
var aeronDirectoryForceUnique = false
/** /**
* The Aeron MTU value impacts a lot of things. * The Aeron MTU value impacts a lot of things.

View File

@ -18,6 +18,7 @@ package dorkbox.network
import dorkbox.netUtil.IP import dorkbox.netUtil.IP
import dorkbox.netUtil.IPv4 import dorkbox.netUtil.IPv4
import dorkbox.netUtil.IPv6 import dorkbox.netUtil.IPv6
import dorkbox.network.aeron.AeronConfig
import dorkbox.network.aeron.AeronPoller import dorkbox.network.aeron.AeronPoller
import dorkbox.network.aeron.IpcMediaDriverConnection import dorkbox.network.aeron.IpcMediaDriverConnection
import dorkbox.network.aeron.UdpMediaDriverConnection import dorkbox.network.aeron.UdpMediaDriverConnection
@ -64,12 +65,8 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
* @return true if the configuration matches and can connect (but not verify) to the TCP control socket. * @return true if the configuration matches and can connect (but not verify) to the TCP control socket.
*/ */
fun isRunning(configuration: ServerConfiguration): Boolean { fun isRunning(configuration: ServerConfiguration): Boolean {
val server = Server<Connection>(configuration) val context = AeronConfig.createContext(configuration)
return AeronConfig.isRunning(context)
val running = server.isRunning()
server.close()
return running
} }
} }
@ -112,20 +109,24 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
require(config.listenIpAddress.isNotBlank()) { "Blank listen IP address, cannot continue"} require(config.listenIpAddress.isNotBlank()) { "Blank listen IP address, cannot continue"}
// can't disable everything! // can't disable everything!
if (!config.enableIPC && !config.enableIPv4 && !config.enableIPv6) { if (!config.enableIpc && !config.enableIPv4 && !config.enableIPv6) {
require(false) { "At least one of IPC/IPv4/IPv6 must be enabled!" } require(false) { "At least one of IPC/IPv4/IPv6 must be enabled!" }
} }
// have to verify if it's the only thing specified, is IPv4 available... // have to verify if it's the only thing specified, is IPv4 available...
if (!config.enableIPC && !config.enableIPv6 && config.enableIPv4 && !IPv4.isAvailable) { if (!config.enableIpc && !config.enableIPv6 && config.enableIPv4 && !IPv4.isAvailable) {
require(false) { "IPC/IPv6 are disabled and IPv4 is enabled, but there is no IPv4 interface available!" } require(false) { "IPC/IPv6 are disabled and IPv4 is enabled, but there is no IPv4 interface available!" }
} }
// have to verify if it's the only thing specified, is IPv4 available... // have to verify if it's the only thing specified, is IPv4 available...
if (!config.enableIPC && !config.enableIPv4 && config.enableIPv6 && !IPv6.isAvailable) { if (!config.enableIpc && !config.enableIPv4 && config.enableIPv6 && !IPv6.isAvailable) {
require(false) { "IPC/IPv4 are disabled and IPv6 is enabled, but there is no IPv6 interface available!" } require(false) { "IPC/IPv4 are disabled and IPv6 is enabled, but there is no IPv6 interface available!" }
} }
if (config.enableIpc && config.aeronDirectoryForceUnique) {
require(false) { "IPC enabled and forcing a unique Aeron directory are incompatible (IPC requires shared Aeron directories)!" }
}
// localhost/loopback IP might not always be 127.0.0.1 or ::1 // localhost/loopback IP might not always be 127.0.0.1 or ::1
// We want to listen on BOTH IPv4 and IPv6 (config option lets us configure this) // We want to listen on BOTH IPv4 and IPv6 (config option lets us configure this)
@ -178,10 +179,10 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
private fun getIpcPoller(aeron: Aeron, config: ServerConfiguration): AeronPoller { private fun getIpcPoller(aeron: Aeron, config: ServerConfiguration): AeronPoller {
val poller = if (config.enableIPC) { val poller = if (config.enableIpc) {
val driver = IpcMediaDriverConnection(streamIdSubscription = config.ipcSubscriptionId, val driver = IpcMediaDriverConnection(streamIdSubscription = config.ipcSubscriptionId,
streamId = config.ipcPublicationId, streamId = config.ipcPublicationId,
sessionId = RESERVED_SESSION_ID_INVALID) sessionId = AeronConfig.RESERVED_SESSION_ID_INVALID)
driver.buildServer(aeron, logger) driver.buildServer(aeron, logger)
val publication = driver.publication val publication = driver.publication
val subscription = driver.subscription val subscription = driver.subscription
@ -223,8 +224,8 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
val driver = UdpMediaDriverConnection(address = listenIPv4Address!!, val driver = UdpMediaDriverConnection(address = listenIPv4Address!!,
publicationPort = config.publicationPort, publicationPort = config.publicationPort,
subscriptionPort = config.subscriptionPort, subscriptionPort = config.subscriptionPort,
streamId = UDP_HANDSHAKE_STREAM_ID, streamId = AeronConfig.UDP_HANDSHAKE_STREAM_ID,
sessionId = RESERVED_SESSION_ID_INVALID) sessionId = AeronConfig.RESERVED_SESSION_ID_INVALID)
driver.buildServer(aeron, logger) driver.buildServer(aeron, logger)
val publication = driver.publication val publication = driver.publication
@ -291,8 +292,8 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
val driver = UdpMediaDriverConnection(address = listenIPv6Address!!, val driver = UdpMediaDriverConnection(address = listenIPv6Address!!,
publicationPort = config.publicationPort, publicationPort = config.publicationPort,
subscriptionPort = config.subscriptionPort, subscriptionPort = config.subscriptionPort,
streamId = UDP_HANDSHAKE_STREAM_ID, streamId = AeronConfig.UDP_HANDSHAKE_STREAM_ID,
sessionId = RESERVED_SESSION_ID_INVALID) sessionId = AeronConfig.RESERVED_SESSION_ID_INVALID)
driver.buildServer(aeron, logger) driver.buildServer(aeron, logger)
val publication = driver.publication val publication = driver.publication
@ -358,8 +359,8 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
val driver = UdpMediaDriverConnection(address = listenIPv6Address!!, val driver = UdpMediaDriverConnection(address = listenIPv6Address!!,
publicationPort = config.publicationPort, publicationPort = config.publicationPort,
subscriptionPort = config.subscriptionPort, subscriptionPort = config.subscriptionPort,
streamId = UDP_HANDSHAKE_STREAM_ID, streamId = AeronConfig.UDP_HANDSHAKE_STREAM_ID,
sessionId = RESERVED_SESSION_ID_INVALID) sessionId = AeronConfig.RESERVED_SESSION_ID_INVALID)
driver.buildServer(aeron, logger) driver.buildServer(aeron, logger)
val publication = driver.publication val publication = driver.publication
@ -605,34 +606,6 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
} }
} }
/**
* TODO: when adding a "custom" connection, it's super important to not have to worry about the sessionID (which is what we key off of)
* Adds a custom connection to the server.
*
* This should only be used in situations where there can be DIFFERENT types of connections (such as a 'web-based' connection) and
* you want *this* server instance to manage listeners + message dispatch
*
* @param connection the connection to add
*/
fun addConnection(connection: CONNECTION) {
connections.add(connection)
}
/**
* TODO: when adding a "custom" connection, it's super important to not have to worry about the sessionID (which is what we key off of)
* Removes a custom connection to the server.
*
*
* This should only be used in situations where there can be DIFFERENT types of connections (such as a 'web-based' connection) and
* you want *this* server instance to manage listeners + message dispatch
*
* @param connection the connection to remove
*/
fun removeConnection(connection: CONNECTION) {
connections.remove(connection)
}
/** /**
* Closes the server and all it's connections. After a close, you may call 'bind' again. * Closes the server and all it's connections. After a close, you may call 'bind' again.
*/ */

View File

@ -0,0 +1,232 @@
package dorkbox.network.aeron
import dorkbox.network.Configuration
import dorkbox.util.NamedThreadFactory
import io.aeron.Publication
import io.aeron.driver.MediaDriver
import mu.KLogger
import mu.KotlinLogging
import java.io.File
/**
*
*/
internal object AeronConfig {
/**
* Identifier for invalid sessions. This must be < RESERVED_SESSION_ID_LOW
*/
const val RESERVED_SESSION_ID_INVALID = 0
/**
* The inclusive lower bound of the reserved sessions range. THIS SHOULD NEVER BE <= 0!
*/
const val RESERVED_SESSION_ID_LOW = 1
/**
* The inclusive upper bound of the reserved sessions range.
*/
const val RESERVED_SESSION_ID_HIGH = Integer.MAX_VALUE
const val UDP_HANDSHAKE_STREAM_ID: Int = 0x1337cafe
const val IPC_HANDSHAKE_STREAM_ID_PUB: Int = 0x1337c0de
const val IPC_HANDSHAKE_STREAM_ID_SUB: Int = 0x1337c0d3
fun errorCodeName(result: Long): String {
return when (result) {
// The publication is not connected to a subscriber, this can be an intermittent state as subscribers come and go.
Publication.NOT_CONNECTED -> "Not connected"
// The offer failed due to back pressure from the subscribers preventing further transmission.
Publication.BACK_PRESSURED -> "Back pressured"
// The action is an operation such as log rotation which is likely to have succeeded by the next retry attempt.
Publication.ADMIN_ACTION -> "Administrative action"
// The Publication has been closed and should no longer be used.
Publication.CLOSED -> "Publication is closed"
// If this happens then the publication should be closed and a new one added. To make it less likely to happen then increase the term buffer length.
Publication.MAX_POSITION_EXCEEDED -> "Maximum term position exceeded"
else -> throw IllegalStateException("Unknown error code: $result")
}
}
private fun create(config: Configuration, logger: KLogger = KotlinLogging.logger("AeronConfig")): MediaDriver.Context {
/*
* Linux
* Linux normally requires some settings of sysctl values. One is net.core.rmem_max to allow larger SO_RCVBUF and
* net.core.wmem_max to allow larger SO_SNDBUF values to be set.
*
* Windows
* Windows tends to use SO_SNDBUF values that are too small. It is recommended to use values more like 1MB or so.
*
* Mac/Darwin
*
* Mac tends to use SO_SNDBUF values that are too small. It is recommended to use larger values, like 16KB.
*/
if (config.receiveBufferSize == 0) {
config.receiveBufferSize = io.aeron.driver.Configuration.SOCKET_RCVBUF_LENGTH_DEFAULT
// when {
// OS.isLinux() ->
// OS.isWindows() ->
// OS.isMacOsX() ->
// }
// val rmem_max = dorkbox.network.other.NetUtil.sysctlGetInt("net.core.rmem_max")
// val wmem_max = dorkbox.network.other.NetUtil.sysctlGetInt("net.core.wmem_max")
}
if (config.sendBufferSize == 0) {
config.receiveBufferSize = io.aeron.driver.Configuration.SOCKET_SNDBUF_LENGTH_DEFAULT
// when {
// OS.isLinux() ->
// OS.isWindows() ->
// OS.isMacOsX() ->
// }
// val rmem_max = dorkbox.network.other.NetUtil.sysctlGetInt("net.core.rmem_max")
// val wmem_max = dorkbox.network.other.NetUtil.sysctlGetInt("net.core.wmem_max")
}
/*
* Note: Since Mac OS does not have a built-in support for /dev/shm it is advised to create a RAM disk for the Aeron directory (aeron.dir).
*
* You can create a RAM disk with the following command:
*
* $ diskutil erasevolume HFS+ "DISK_NAME" `hdiutil attach -nomount ram://$((2048 * SIZE_IN_MB))`
*
* where:
*
* DISK_NAME should be replaced with a name of your choice.
* SIZE_IN_MB is the size in megabytes for the disk (e.g. 4096 for a 4GB disk).
*
* For example, the following command creates a RAM disk named DevShm which is 2GB in size:
*
* $ diskutil erasevolume HFS+ "DevShm" `hdiutil attach -nomount ram://$((2048 * 2048))`
*
* After this command is executed the new disk will be mounted under /Volumes/DevShm.
*/
if (config.aeronDirectory == null) {
val baseFileLocation = config.suggestAeronLogLocation(logger)
// val aeronLogDirectory = File(baseFileLocation, "aeron-" + type.simpleName)
val aeronLogDirectory = File(baseFileLocation, "aeron")
config.aeronDirectory = aeronLogDirectory
}
val threadFactory = NamedThreadFactory("Aeron", false)
// LOW-LATENCY SETTINGS
// .termBufferSparseFile(false)
// .useWindowsHighResTimer(true)
// .threadingMode(ThreadingMode.DEDICATED)
// .conductorIdleStrategy(BusySpinIdleStrategy.INSTANCE)
// .receiverIdleStrategy(NoOpIdleStrategy.INSTANCE)
// .senderIdleStrategy(NoOpIdleStrategy.INSTANCE);
// setProperty(DISABLE_BOUNDS_CHECKS_PROP_NAME, "true");
// setProperty("aeron.mtu.length", "16384");
// setProperty("aeron.socket.so_sndbuf", "2097152");
// setProperty("aeron.socket.so_rcvbuf", "2097152");
// setProperty("aeron.rcv.initial.window.length", "2097152");
// driver context must happen in the initializer, because we have a Server.isRunning() method that uses the mediaDriverContext (without bind)
val context = MediaDriver.Context()
.publicationReservedSessionIdLow(RESERVED_SESSION_ID_LOW)
.publicationReservedSessionIdHigh(RESERVED_SESSION_ID_HIGH)
.conductorThreadFactory(threadFactory)
.receiverThreadFactory(threadFactory)
.senderThreadFactory(threadFactory)
.sharedNetworkThreadFactory(threadFactory)
.sharedThreadFactory(threadFactory)
.threadingMode(config.threadingMode)
.mtuLength(config.networkMtuSize)
.socketSndbufLength(config.sendBufferSize)
.socketRcvbufLength(config.receiveBufferSize)
context
.aeronDirectoryName(config.aeronDirectory!!.absolutePath)
if (context.ipcTermBufferLength() != io.aeron.driver.Configuration.ipcTermBufferLength()) {
// default 64 megs each is HUGE
context.ipcTermBufferLength(8 * 1024 * 1024)
}
if (context.publicationTermBufferLength() != io.aeron.driver.Configuration.termBufferLength()) {
// default 16 megs each is HUGE (we run out of space in production w/ lots of clients)
context.publicationTermBufferLength(2 * 1024 * 1024)
}
val aeronDir = File(context.aeronDirectoryName()).absoluteFile
context.aeronDirectoryName(aeronDir.path)
return context
}
/**
* Creates the Aeron Media Driver context
*
* @throws IllegalArgumentException if the aeron media driver directory cannot be setup
*/
fun createContext(config: Configuration, logger: KLogger = KotlinLogging.logger("AeronConfig")): MediaDriver.Context {
var context = create(config, logger)
// will setup the aeron directory or throw IllegalArgumentException if it cannot be configured
var aeronDir = context.aeronDirectory()
// this happens EXACTLY once. Must be BEFORE the "isRunning" check!
context.concludeAeronDirectory()
var isRunning = isRunning(context)
// this is incompatible with IPC, and will not be set if IPC is enabled
if (config.aeronDirectoryForceUnique && isRunning) {
val savedParent = aeronDir.parentFile
var retry = 0
val retryMax = 100
while (config.aeronDirectoryForceUnique && isRunning) {
if (retry++ > retryMax) {
throw IllegalArgumentException("Unable to force unique aeron Directory. Tried $retryMax times and all tries were in use.")
}
val randomNum = (1..retryMax).shuffled().first()
val newDir = savedParent.resolve("${aeronDir.name}_$randomNum")
context = create(config, logger)
context.aeronDirectoryName(newDir.path)
// this happens EXACTLY once. Must be BEFORE the "isRunning" check!
context.concludeAeronDirectory()
isRunning = isRunning(context)
}
}
aeronDir = context.aeronDirectory()
// make sure we start over!
if (!isRunning && aeronDir.exists()) {
// try to delete the dir
if (!aeronDir.deleteRecursively()) {
logger.warn { "Unable to delete the aeron directory $aeronDir. Aeron was not running when this was attempted." }
}
}
return context
}
/**
* Checks to see if an endpoint (using the specified configuration) is running.
*
* @return true if the media driver is active and running
*/
fun isRunning(context: MediaDriver.Context): Boolean {
// if the media driver is running, it will be a quick connection. Usually 100ms or so
return context.isDriverActive(1_000) { }
}
}

View File

@ -20,7 +20,6 @@ package dorkbox.network.aeron
import dorkbox.netUtil.IP import dorkbox.netUtil.IP
import dorkbox.netUtil.IPv4 import dorkbox.netUtil.IPv4
import dorkbox.netUtil.IPv6 import dorkbox.netUtil.IPv6
import dorkbox.network.connection.EndPoint
import dorkbox.network.exceptions.ClientTimedOutException import dorkbox.network.exceptions.ClientTimedOutException
import io.aeron.Aeron import io.aeron.Aeron
import io.aeron.ChannelUriStringBuilder import io.aeron.ChannelUriStringBuilder
@ -84,7 +83,7 @@ class UdpMediaDriverConnection(override val address: InetAddress,
private fun uri(): ChannelUriStringBuilder { private fun uri(): ChannelUriStringBuilder {
val builder = ChannelUriStringBuilder().reliable(isReliable).media("udp") val builder = ChannelUriStringBuilder().reliable(isReliable).media("udp")
if (sessionId != EndPoint.RESERVED_SESSION_ID_INVALID) { if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) {
builder.sessionId(sessionId) builder.sessionId(sessionId)
} }
@ -195,7 +194,7 @@ class UdpMediaDriverConnection(override val address: InetAddress,
override fun clientInfo(): String { override fun clientInfo(): String {
return if (sessionId != EndPoint.RESERVED_SESSION_ID_INVALID) { return if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) {
"Connecting to ${IP.toString(address)} [$subscriptionPort|$publicationPort] [$streamId|$sessionId] (reliable:$isReliable)" "Connecting to ${IP.toString(address)} [$subscriptionPort|$publicationPort] [$streamId|$sessionId] (reliable:$isReliable)"
} else { } else {
"Connecting handshake to ${IP.toString(address)} [$subscriptionPort|$publicationPort] [$streamId|*] (reliable:$isReliable)" "Connecting handshake to ${IP.toString(address)} [$subscriptionPort|$publicationPort] [$streamId|*] (reliable:$isReliable)"
@ -213,7 +212,7 @@ class UdpMediaDriverConnection(override val address: InetAddress,
IP.toString(address) IP.toString(address)
} }
return if (sessionId != EndPoint.RESERVED_SESSION_ID_INVALID) { return if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) {
"Listening on $address [$subscriptionPort|$publicationPort] [$streamId|$sessionId] (reliable:$isReliable)" "Listening on $address [$subscriptionPort|$publicationPort] [$streamId|$sessionId] (reliable:$isReliable)"
} else { } else {
"Listening handshake on $address [$subscriptionPort|$publicationPort] [$streamId|*] (reliable:$isReliable)" "Listening handshake on $address [$subscriptionPort|$publicationPort] [$streamId|*] (reliable:$isReliable)"
@ -253,7 +252,7 @@ class IpcMediaDriverConnection(override val streamId: Int,
private fun uri(): ChannelUriStringBuilder { private fun uri(): ChannelUriStringBuilder {
val builder = ChannelUriStringBuilder().media("ipc") val builder = ChannelUriStringBuilder().media("ipc")
if (sessionId != EndPoint.RESERVED_SESSION_ID_INVALID) { if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) {
builder.sessionId(sessionId) builder.sessionId(sessionId)
} }
@ -345,7 +344,7 @@ class IpcMediaDriverConnection(override val streamId: Int,
} }
override fun clientInfo() : String { override fun clientInfo() : String {
return if (sessionId != EndPoint.RESERVED_SESSION_ID_INVALID) { return if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) {
"[$sessionId] aeron connection established to [$streamIdSubscription|$streamId]" "[$sessionId] aeron connection established to [$streamIdSubscription|$streamId]"
} else { } else {
"Connecting handshake to IPC [$streamIdSubscription|$streamId]" "Connecting handshake to IPC [$streamIdSubscription|$streamId]"
@ -353,7 +352,7 @@ class IpcMediaDriverConnection(override val streamId: Int,
} }
override fun serverInfo() : String { override fun serverInfo() : String {
return if (sessionId != EndPoint.RESERVED_SESSION_ID_INVALID) { return if (sessionId != AeronConfig.RESERVED_SESSION_ID_INVALID) {
"[$sessionId] IPC listening on [$streamIdSubscription|$streamId] " "[$sessionId] IPC listening on [$streamIdSubscription|$streamId] "
} else { } else {
"Listening handshake on IPC [$streamIdSubscription|$streamId]" "Listening handshake on IPC [$streamIdSubscription|$streamId]"

View File

@ -120,7 +120,7 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
// The IV for AES-GCM must be 12 bytes, since it's 4 (salt) + 8 (external counter) + 4 (GCM counter) // The IV for AES-GCM must be 12 bytes, since it's 4 (salt) + 8 (external counter) + 4 (GCM counter)
// The 12 bytes IV is created during connection registration, and during the AES-GCM crypto, we override the last 8 with this // The 12 bytes IV is created during connection registration, and during the AES-GCM crypto, we override the last 8 with this
// counter, which is also transmitted as an optimized int. (which is why it starts at 0, so the transmitted bytes are small) // counter, which is also transmitted as an optimized int. (which is why it starts at 0, so the transmitted bytes are small)
private val aes_gcm_iv = atomic(0) // private val aes_gcm_iv = atomic(0)
// RMI support for this connection // RMI support for this connection
internal val rmiConnectionSupport = endPoint.getRmiConnectionSupport() internal val rmiConnectionSupport = endPoint.getRmiConnectionSupport()
@ -338,16 +338,15 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
} }
// on close, we want to make sure this file is DELETED! // on close, we want to make sure this file is DELETED!
val logFile = endPoint.mediaDriverContext.aeronDirectory().resolve("publications").resolve("${publication.registrationId()}.logbuffer") val logFile = endPoint.getMediaDriverPublicationFile(publication.registrationId())
publication.close() publication.close()
closeTimeoutTime = System.currentTimeMillis() + timeOut closeTimeoutTime = System.currentTimeMillis() + timeOut
while (logFile.exists() && System.currentTimeMillis() < closeTimeoutTime) { while (logFile.exists() && System.currentTimeMillis() < closeTimeoutTime) {
if (logFile.delete()) { if (logFile.delete()) {
break break
} }
delay(100)
} }
if (logFile.exists()) { if (logFile.exists()) {
@ -356,6 +355,9 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
rmiConnectionSupport.clearProxyObjects() rmiConnectionSupport.clearProxyObjects()
endPoint.removeConnection(this)
// This is set by the client so if there is a "connect()" call in the the disconnect callback, we can have proper // This is set by the client so if there is a "connect()" call in the the disconnect callback, we can have proper
// lock-stop ordering for how disconnect and connect work with each-other // lock-stop ordering for how disconnect and connect work with each-other
preCloseAction() preCloseAction()

View File

@ -19,6 +19,7 @@ import dorkbox.network.Client
import dorkbox.network.Configuration import dorkbox.network.Configuration
import dorkbox.network.Server import dorkbox.network.Server
import dorkbox.network.ServerConfiguration import dorkbox.network.ServerConfiguration
import dorkbox.network.aeron.AeronConfig
import dorkbox.network.aeron.CoroutineIdleStrategy import dorkbox.network.aeron.CoroutineIdleStrategy
import dorkbox.network.exceptions.MessageNotRegisteredException import dorkbox.network.exceptions.MessageNotRegisteredException
import dorkbox.network.handshake.HandshakeMessage import dorkbox.network.handshake.HandshakeMessage
@ -31,7 +32,6 @@ import dorkbox.network.rmi.messages.RmiMessage
import dorkbox.network.serialization.KryoExtra import dorkbox.network.serialization.KryoExtra
import dorkbox.network.serialization.Serialization import dorkbox.network.serialization.Serialization
import dorkbox.network.storage.SettingsStore import dorkbox.network.storage.SettingsStore
import dorkbox.util.NamedThreadFactory
import dorkbox.util.exceptions.SecurityException import dorkbox.util.exceptions.SecurityException
import io.aeron.Aeron import io.aeron.Aeron
import io.aeron.Publication import io.aeron.Publication
@ -67,51 +67,6 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
protected constructor(config: Configuration) : this(Client::class.java, config) protected constructor(config: Configuration) : this(Client::class.java, config)
protected constructor(config: ServerConfiguration) : this(Server::class.java, config) protected constructor(config: ServerConfiguration) : this(Server::class.java, config)
companion object {
/**
* Identifier for invalid sessions. This must be < RESERVED_SESSION_ID_LOW
*/
const val RESERVED_SESSION_ID_INVALID = 0
/**
* The inclusive lower bound of the reserved sessions range. THIS SHOULD NEVER BE <= 0!
*/
const val RESERVED_SESSION_ID_LOW = 1
/**
* The inclusive upper bound of the reserved sessions range.
*/
const val RESERVED_SESSION_ID_HIGH = Integer.MAX_VALUE
const val UDP_HANDSHAKE_STREAM_ID: Int = 0x1337cafe
const val IPC_HANDSHAKE_STREAM_ID_PUB: Int = 0x1337c0de
const val IPC_HANDSHAKE_STREAM_ID_SUB: Int = 0x1337c0d3
private fun errorCodeName(result: Long): String {
return when (result) {
// The publication is not connected to a subscriber, this can be an intermittent state as subscribers come and go.
Publication.NOT_CONNECTED -> "Not connected"
// The offer failed due to back pressure from the subscribers preventing further transmission.
Publication.BACK_PRESSURED -> "Back pressured"
// The action is an operation such as log rotation which is likely to have succeeded by the next retry attempt.
Publication.ADMIN_ACTION -> "Administrative action"
// The Publication has been closed and should no longer be used.
Publication.CLOSED -> "Publication is closed"
// If this happens then the publication should be closed and a new one added. To make it less likely to happen then increase the term buffer length.
Publication.MAX_POSITION_EXCEEDED -> "Maximum term position exceeded"
else -> throw IllegalStateException("Unknown error code: $result")
}
}
}
val logger: KLogger = KotlinLogging.logger(type.simpleName) val logger: KLogger = KotlinLogging.logger(type.simpleName)
internal val actionDispatch = CoroutineScope(Dispatchers.Default) internal val actionDispatch = CoroutineScope(Dispatchers.Default)
@ -119,9 +74,9 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
internal val listenerManager = ListenerManager<CONNECTION>() internal val listenerManager = ListenerManager<CONNECTION>()
internal val connections = ConnectionManager<CONNECTION>() internal val connections = ConnectionManager<CONNECTION>()
internal val mediaDriverContext: MediaDriver.Context internal lateinit var mediaDriverContext: MediaDriver.Context
private var mediaDriver: MediaDriver? = null private var mediaDriver: MediaDriver? = null
private var aeron: Aeron? = null internal var aeron: Aeron? = null
/** /**
* Returns the serialization wrapper if there is an object type that needs to be added outside of the basic types. * Returns the serialization wrapper if there is an object type that needs to be added outside of the basic types.
@ -160,125 +115,6 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
} }
} }
// Aeron configuration
/*
* Linux
* Linux normally requires some settings of sysctl values. One is net.core.rmem_max to allow larger SO_RCVBUF and
* net.core.wmem_max to allow larger SO_SNDBUF values to be set.
*
* Windows
* Windows tends to use SO_SNDBUF values that are too small. It is recommended to use values more like 1MB or so.
*
* Mac/Darwin
*
* Mac tends to use SO_SNDBUF values that are too small. It is recommended to use larger values, like 16KB.
*/
if (config.receiveBufferSize == 0) {
config.receiveBufferSize = io.aeron.driver.Configuration.SOCKET_RCVBUF_LENGTH_DEFAULT
// when {
// OS.isLinux() ->
// OS.isWindows() ->
// OS.isMacOsX() ->
// }
// val rmem_max = dorkbox.network.other.NetUtil.sysctlGetInt("net.core.rmem_max")
// val wmem_max = dorkbox.network.other.NetUtil.sysctlGetInt("net.core.wmem_max")
}
if (config.sendBufferSize == 0) {
config.receiveBufferSize = io.aeron.driver.Configuration.SOCKET_SNDBUF_LENGTH_DEFAULT
// when {
// OS.isLinux() ->
// OS.isWindows() ->
// OS.isMacOsX() ->
// }
// val rmem_max = dorkbox.network.other.NetUtil.sysctlGetInt("net.core.rmem_max")
// val wmem_max = dorkbox.network.other.NetUtil.sysctlGetInt("net.core.wmem_max")
}
/*
* Note: Since Mac OS does not have a built-in support for /dev/shm it is advised to create a RAM disk for the Aeron directory (aeron.dir).
*
* You can create a RAM disk with the following command:
*
* $ diskutil erasevolume HFS+ "DISK_NAME" `hdiutil attach -nomount ram://$((2048 * SIZE_IN_MB))`
*
* where:
*
* DISK_NAME should be replaced with a name of your choice.
* SIZE_IN_MB is the size in megabytes for the disk (e.g. 4096 for a 4GB disk).
*
* For example, the following command creates a RAM disk named DevShm which is 2GB in size:
*
* $ diskutil erasevolume HFS+ "DevShm" `hdiutil attach -nomount ram://$((2048 * 2048))`
*
* After this command is executed the new disk will be mounted under /Volumes/DevShm.
*/
var aeronDirAlreadyExists = false
if (config.aeronLogDirectory == null) {
val baseFileLocation = config.suggestAeronLogLocation(logger)
// val aeronLogDirectory = File(baseFileLocation, "aeron-" + type.simpleName)
val aeronLogDirectory = File(baseFileLocation, "aeron")
aeronDirAlreadyExists = aeronLogDirectory.exists()
config.aeronLogDirectory = aeronLogDirectory
}
logger.info("Aeron log directory: ${config.aeronLogDirectory}")
if (aeronDirAlreadyExists) {
logger.warn("Aeron log directory already exists! This might not be what you want!")
}
val threadFactory = NamedThreadFactory("Aeron", false)
// LOW-LATENCY SETTINGS
// .termBufferSparseFile(false)
// .useWindowsHighResTimer(true)
// .threadingMode(ThreadingMode.DEDICATED)
// .conductorIdleStrategy(BusySpinIdleStrategy.INSTANCE)
// .receiverIdleStrategy(NoOpIdleStrategy.INSTANCE)
// .senderIdleStrategy(NoOpIdleStrategy.INSTANCE);
// setProperty(DISABLE_BOUNDS_CHECKS_PROP_NAME, "true");
// setProperty("aeron.mtu.length", "16384");
// setProperty("aeron.socket.so_sndbuf", "2097152");
// setProperty("aeron.socket.so_rcvbuf", "2097152");
// setProperty("aeron.rcv.initial.window.length", "2097152");
// driver context must happen in the initializer, because we have a Server.isRunning() method that uses the mediaDriverContext (without bind)
val mDrivercontext = MediaDriver.Context()
.publicationReservedSessionIdLow(RESERVED_SESSION_ID_LOW)
.publicationReservedSessionIdHigh(RESERVED_SESSION_ID_HIGH)
.conductorThreadFactory(threadFactory)
.receiverThreadFactory(threadFactory)
.senderThreadFactory(threadFactory)
.sharedNetworkThreadFactory(threadFactory)
.sharedThreadFactory(threadFactory)
.threadingMode(config.threadingMode)
.mtuLength(config.networkMtuSize)
.socketSndbufLength(config.sendBufferSize)
.socketRcvbufLength(config.receiveBufferSize)
mDrivercontext
.aeronDirectoryName(config.aeronLogDirectory!!.absolutePath)
.concludeAeronDirectory()
if (mDrivercontext.ipcTermBufferLength() != io.aeron.driver.Configuration.ipcTermBufferLength()) {
// default 64 megs each is HUGE
mDrivercontext.ipcTermBufferLength(8 * 1024 * 1024)
}
if (mDrivercontext.publicationTermBufferLength() != io.aeron.driver.Configuration.termBufferLength()) {
// default 16 megs each is HUGE (we run out of space in production w/ lots of clients)
mDrivercontext.publicationTermBufferLength(2 * 1024 * 1024)
}
mediaDriverContext = mDrivercontext
// serialization stuff // serialization stuff
serialization = config.serialization serialization = config.serialization
sendIdleStrategy = config.sendIdleStrategy sendIdleStrategy = config.sendIdleStrategy
@ -292,27 +128,33 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
} }
internal fun initEndpointState(): Aeron { internal fun initEndpointState(): Aeron {
val aeronDirectory = config.aeronLogDirectory!!.absolutePath // Aeron configuration
val context: MediaDriver.Context = AeronConfig.createContext(config, logger)
if (!isRunning()) { logger.info { "Aeron log directory: ${context.aeronDirectory()}" }
logger.debug("Starting Aeron Media driver...")
mediaDriverContext
if (!AeronConfig.isRunning(context)) {
logger.debug("Starting Aeron Media driver in ${context.aeronDirectory()}")
context
.dirDeleteOnStart(true) .dirDeleteOnStart(true)
.dirDeleteOnShutdown(true) .dirDeleteOnShutdown(true)
// the server always creates a the media driver. // the server always creates a the media driver.
try { try {
mediaDriver = MediaDriver.launch(mediaDriverContext) mediaDriver = MediaDriver.launch(context)
} catch (e: Exception) { } catch (e: Exception) {
listenerManager.notifyError(e) listenerManager.notifyError(e)
throw e throw e
} }
} }
val aeronContext = Aeron.Context() val aeronContext = Aeron.Context()
aeronContext aeronContext
.aeronDirectoryName(aeronDirectory) .aeronDirectoryName(context.aeronDirectory().path)
.concludeAeronDirectory() .concludeAeronDirectory()
try { try {
@ -328,6 +170,8 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
throw e throw e
} }
mediaDriverContext = context
shutdown.getAndSet(false) shutdown.getAndSet(false)
shutdownLatch = SuspendWaiter() shutdownLatch = SuspendWaiter()
@ -335,9 +179,46 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
return aeron!! return aeron!!
} }
/**
* @return the aeron media driver log file for a specific publication. This should be removed when a publication is closed (but is not always!)
*/
internal fun getMediaDriverPublicationFile(publicationRegId: Long): File {
return mediaDriverContext.aeronDirectory().resolve("publications").resolve("${publicationRegId}.logbuffer")
}
abstract fun newException(message: String, cause: Throwable? = null): Throwable abstract fun newException(message: String, cause: Throwable? = null): Throwable
// used internally to remove a connection
internal fun removeConnection(connection: Connection) {
@Suppress("UNCHECKED_CAST")
removeConnection(connection as CONNECTION)
}
/**
* Adds a custom connection to the server.
*
* This should only be used in situations where there can be DIFFERENT types of connections (such as a 'web-based' connection) and
* you want *this* endpoint to manage listeners + message dispatch
*
* @param connection the connection to add
*/
fun addConnection(connection: CONNECTION) {
connections.add(connection)
}
/**
* Removes a custom connection to the server.
*
* This should only be used in situations where there can be DIFFERENT types of connections (such as a 'web-based' connection) and
* you want *this* endpoint to manage listeners + message dispatch
*
* @param connection the connection to remove
*/
fun removeConnection(connection: CONNECTION) {
connections.remove(connection)
}
/** /**
* Returns the property store used by this endpoint. The property store can store via properties, * Returns the property store used by this endpoint. The property store can store via properties,
* a database, etc, or can be a "null" property store, which does nothing * a database, etc, or can be a "null" property store, which does nothing
@ -508,7 +389,7 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
} }
// more critical error sending the message. we shouldn't retry or anything. // more critical error sending the message. we shouldn't retry or anything.
val exception = newException("[${publication.sessionId()}] Error sending handshake message. $message (${errorCodeName(result)})") val exception = newException("[${publication.sessionId()}] Error sending handshake message. $message (${AeronConfig.errorCodeName(result)})")
ListenerManager.cleanStackTraceInternal(exception) ListenerManager.cleanStackTraceInternal(exception)
listenerManager.notifyError(exception) listenerManager.notifyError(exception)
return return
@ -681,7 +562,7 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
} }
// more critical error sending the message. we shouldn't retry or anything. // more critical error sending the message. we shouldn't retry or anything.
logger.error("[${publication.sessionId()}] Error sending message. $message (${errorCodeName(result)})") logger.error("[${publication.sessionId()}] Error sending message. $message (${AeronConfig.errorCodeName(result)})")
return return
} }
@ -734,14 +615,24 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
shutdownLatch.doWait() shutdownLatch.doWait()
} }
// /**
// * Checks to see if an endpoint (using the current configuration) is running.
// *
// * @return true if the media driver is active and running
// */
// fun isRunning(): Boolean {
// // if the media driver is running, it will be a quick connection. Usually 100ms or so
// return configureMediaDriverContext().isDriverActive(1_000) { }
// }
/** /**
* Checks to see if an endpoint (using the specified configuration) is running. * Checks to see if an endpoint (using the specified configuration) is running.
* *
* @return true if the client/server is active and running * @return true if the media driver is active and running
*/ */
fun isRunning(): Boolean { fun isRunning(context: MediaDriver.Context): Boolean {
// if the media driver is running, it will be a quick connection. Usually 100ms or so // if the media driver is running, it will be a quick connection. Usually 100ms or so
return mediaDriverContext.isDriverActive(1_000) { } return context.isDriverActive(1_000) { }
} }
final override fun close() { final override fun close() {

View File

@ -22,11 +22,11 @@ import com.github.benmanes.caffeine.cache.RemovalListener
import dorkbox.netUtil.IP import dorkbox.netUtil.IP
import dorkbox.network.Server import dorkbox.network.Server
import dorkbox.network.ServerConfiguration import dorkbox.network.ServerConfiguration
import dorkbox.network.aeron.AeronConfig
import dorkbox.network.aeron.IpcMediaDriverConnection import dorkbox.network.aeron.IpcMediaDriverConnection
import dorkbox.network.aeron.UdpMediaDriverConnection import dorkbox.network.aeron.UdpMediaDriverConnection
import dorkbox.network.connection.Connection import dorkbox.network.connection.Connection
import dorkbox.network.connection.ConnectionParams import dorkbox.network.connection.ConnectionParams
import dorkbox.network.connection.EndPoint
import dorkbox.network.connection.ListenerManager import dorkbox.network.connection.ListenerManager
import dorkbox.network.connection.PublicKeyValidationState import dorkbox.network.connection.PublicKeyValidationState
import dorkbox.network.exceptions.AllocationException import dorkbox.network.exceptions.AllocationException
@ -71,7 +71,7 @@ internal class ServerHandshake<CONNECTION : Connection>(private val logger: KLog
private val connectionsPerIpCounts = ConnectionCounts() private val connectionsPerIpCounts = ConnectionCounts()
// guarantee that session/stream ID's will ALWAYS be unique! (there can NEVER be a collision!) // guarantee that session/stream ID's will ALWAYS be unique! (there can NEVER be a collision!)
private val sessionIdAllocator = RandomIdAllocator(EndPoint.RESERVED_SESSION_ID_LOW, EndPoint.RESERVED_SESSION_ID_HIGH) private val sessionIdAllocator = RandomIdAllocator(AeronConfig.RESERVED_SESSION_ID_LOW, AeronConfig.RESERVED_SESSION_ID_HIGH)
private val streamIdAllocator = RandomIdAllocator(1, Integer.MAX_VALUE) private val streamIdAllocator = RandomIdAllocator(1, Integer.MAX_VALUE)
@ -109,7 +109,7 @@ internal class ServerHandshake<CONNECTION : Connection>(private val logger: KLog
logger.trace { "[${pendingConnection.id}] Connection from client $connectionString done with handshake." } logger.trace { "[${pendingConnection.id}] Connection from client $connectionString done with handshake." }
// this enables the connection to start polling for messages // this enables the connection to start polling for messages
server.connections.add(pendingConnection) server.addConnection(pendingConnection)
// now tell the client we are done // now tell the client we are done
runBlocking { runBlocking {

View File

@ -765,8 +765,9 @@ open class Serialization(private val references: Boolean = true, private val fac
* # BLOCKING * # BLOCKING
* *
* Waits until a kryo is available to write, using CAS operations to prevent having to synchronize. * Waits until a kryo is available to write, using CAS operations to prevent having to synchronize.
*
* @throws IOException
*/ */
@Throws(IOException::class)
override fun write(buffer: DirectBuffer, message: Any) { override fun write(buffer: DirectBuffer, message: Any) {
runBlocking { runBlocking {
val kryo = takeKryo() val kryo = takeKryo()