Fixed IPC auto-fallback connection issues

This commit is contained in:
nathan 2020-09-22 19:41:19 +02:00
parent 56fc7e54f1
commit c6e12e7872

View File

@ -26,11 +26,11 @@ import dorkbox.network.connection.ConnectionParams
import dorkbox.network.connection.EndPoint import dorkbox.network.connection.EndPoint
import dorkbox.network.connection.ListenerManager import dorkbox.network.connection.ListenerManager
import dorkbox.network.connection.PublicKeyValidationState import dorkbox.network.connection.PublicKeyValidationState
import dorkbox.network.coroutines.SuspendWaiter
import dorkbox.network.exceptions.ClientException import dorkbox.network.exceptions.ClientException
import dorkbox.network.exceptions.ClientRejectedException import dorkbox.network.exceptions.ClientRejectedException
import dorkbox.network.exceptions.ClientTimedOutException import dorkbox.network.exceptions.ClientTimedOutException
import dorkbox.network.handshake.ClientHandshake import dorkbox.network.handshake.ClientHandshake
import dorkbox.network.coroutines.SuspendWaiter
import dorkbox.network.rmi.RemoteObject import dorkbox.network.rmi.RemoteObject
import dorkbox.network.rmi.RemoteObjectStorage import dorkbox.network.rmi.RemoteObjectStorage
import dorkbox.network.rmi.RmiManagerConnections import dorkbox.network.rmi.RmiManagerConnections
@ -261,8 +261,9 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
// only change LOCALHOST -> IPC if the media driver is ALREADY running LOCALLY! // only change LOCALHOST -> IPC if the media driver is ALREADY running LOCALLY!
var usingIPC = config.enableIpc && remoteAddress == null var isUsingIPC = false
val autoChangeToIpc = !usingIPC && config.enableIpcForLoopback && val canUseIPC = config.enableIpc && remoteAddress == null
val autoChangeToIpc = canUseIPC && config.enableIpcForLoopback &&
remoteAddress != null && remoteAddress.isLoopbackAddress && isRunning(mediaDriverContext) remoteAddress != null && remoteAddress.isLoopbackAddress && isRunning(mediaDriverContext)
if (autoChangeToIpc) { if (autoChangeToIpc) {
logger.info {"IPC for loopback enabled and aeron is already running. Auto-changing network connection from ${IP.toString(remoteAddress!!)} -> IPC" } logger.info {"IPC for loopback enabled and aeron is already running. Auto-changing network connection from ${IP.toString(remoteAddress!!)} -> IPC" }
@ -270,7 +271,7 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
val handshake = ClientHandshake(config, crypto, this) val handshake = ClientHandshake(config, crypto, this)
val handshakeConnection = if (autoChangeToIpc || usingIPC) { val handshakeConnection = if (autoChangeToIpc || canUseIPC) {
// MAYBE the server doesn't have IPC enabled? If no, we need to connect via UDP instead // MAYBE the server doesn't have IPC enabled? If no, we need to connect via UDP instead
val ipcConnection = IpcMediaDriverConnection(streamIdSubscription = ipcSubscriptionId, val ipcConnection = IpcMediaDriverConnection(streamIdSubscription = ipcSubscriptionId,
streamId = ipcPublicationId, streamId = ipcPublicationId,
@ -278,21 +279,18 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
// "fast" connection timeout, since this is IPC // "fast" connection timeout, since this is IPC
connectionTimeoutMS = 1000) connectionTimeoutMS = 1000)
var success = false
// throws a ConnectTimedOutException if the client cannot connect for any reason to the server handshake ports // throws a ConnectTimedOutException if the client cannot connect for any reason to the server handshake ports
try { try {
ipcConnection.buildClient(aeron, logger) ipcConnection.buildClient(aeron, logger)
success = true isUsingIPC = true
} catch (e: Exception) { } catch (e: Exception) {
// if we specified that we want to use IPC, then we have to throw the timeout exception, because there is no IPC // if we specified that we want to use IPC, then we have to throw the timeout exception, because there is no IPC
if (usingIPC) { if (canUseIPC) {
throw e throw e
} }
} }
if (success) { if (isUsingIPC) {
usingIPC = true
ipcConnection ipcConnection
} else { } else {
logger.info { "IPC for loopback enabled, but unable to connect. Retrying with address ${IP.toString(remoteAddress!!)}" } logger.info { "IPC for loopback enabled, but unable to connect. Retrying with address ${IP.toString(remoteAddress!!)}" }
@ -334,7 +332,7 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
// VALIDATE:: check to see if the remote connection's public key has changed! // VALIDATE:: check to see if the remote connection's public key has changed!
val validateRemoteAddress = if (usingIPC) { val validateRemoteAddress = if (isUsingIPC) {
PublicKeyValidationState.VALID PublicKeyValidationState.VALID
} else { } else {
crypto.validateRemoteAddress(this.remoteAddress0!!, connectionInfo.publicKey) crypto.validateRemoteAddress(this.remoteAddress0!!, connectionInfo.publicKey)
@ -354,7 +352,7 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
// we are now connected, so we can connect to the NEW client-specific ports // we are now connected, so we can connect to the NEW client-specific ports
val reliableClientConnection = if (usingIPC) { val reliableClientConnection = if (isUsingIPC) {
IpcMediaDriverConnection(sessionId = connectionInfo.sessionId, IpcMediaDriverConnection(sessionId = connectionInfo.sessionId,
// NOTE: pub/sub must be switched! // NOTE: pub/sub must be switched!
streamIdSubscription = connectionInfo.publicationPort, streamIdSubscription = connectionInfo.publicationPort,
@ -392,7 +390,7 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
// because we are getting the class registration details from the SERVER, this should never be the case. // because we are getting the class registration details from the SERVER, this should never be the case.
// It is still and edge case where the reconstruction of the registration details fails (maybe because of custom serializers) // It is still and edge case where the reconstruction of the registration details fails (maybe because of custom serializers)
val exception = if (usingIPC) { val exception = if (isUsingIPC) {
ClientRejectedException("Connection to IPC has incorrect class registration details!!") ClientRejectedException("Connection to IPC has incorrect class registration details!!")
} else { } else {
ClientRejectedException("Connection to ${IP.toString(remoteAddress!!)} has incorrect class registration details!!") ClientRejectedException("Connection to ${IP.toString(remoteAddress!!)} has incorrect class registration details!!")
@ -404,7 +402,7 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
val newConnection: CONNECTION val newConnection: CONNECTION
if (usingIPC) { if (isUsingIPC) {
newConnection = newConnection(ConnectionParams(this, reliableClientConnection, PublicKeyValidationState.VALID)) newConnection = newConnection(ConnectionParams(this, reliableClientConnection, PublicKeyValidationState.VALID))
} else { } else {
newConnection = newConnection(ConnectionParams(this, reliableClientConnection, validateRemoteAddress)) newConnection = newConnection(ConnectionParams(this, reliableClientConnection, validateRemoteAddress))