Added ip address to logs for connect/disconnect/timeout/etc
This commit is contained in:
parent
3064545645
commit
230ad249bd
@ -506,7 +506,7 @@ open class Client<CONNECTION : Connection>(
|
|||||||
udpConnection
|
udpConnection
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.info { handshakeConnection }
|
logger.info { "Connecting to $handshakeConnection" }
|
||||||
|
|
||||||
|
|
||||||
connect0(handshake, handshakeConnection, handshakeTimeoutSec)
|
connect0(handshake, handshakeConnection, handshakeTimeoutSec)
|
||||||
@ -612,7 +612,7 @@ open class Client<CONNECTION : Connection>(
|
|||||||
handshakeConnection.publication.close()
|
handshakeConnection.publication.close()
|
||||||
|
|
||||||
|
|
||||||
val exception = ClientRejectedException("Connection to $remoteAddressString not allowed! Public key mismatch.")
|
val exception = ClientRejectedException("Connection to [$remoteAddressString] not allowed! Public key mismatch.")
|
||||||
logger.error(exception) { "Validation error" }
|
logger.error(exception) { "Validation error" }
|
||||||
throw exception
|
throw exception
|
||||||
}
|
}
|
||||||
@ -638,7 +638,7 @@ open class Client<CONNECTION : Connection>(
|
|||||||
val exception = if (isUsingIPC) {
|
val exception = if (isUsingIPC) {
|
||||||
ClientRejectedException("[${handshake.connectKey}] Connection to IPC has incorrect class registration details!!")
|
ClientRejectedException("[${handshake.connectKey}] Connection to IPC has incorrect class registration details!!")
|
||||||
} else {
|
} else {
|
||||||
ClientRejectedException("[${handshake.connectKey}] Connection to $remoteAddressString has incorrect class registration details!!")
|
ClientRejectedException("[${handshake.connectKey}] Connection to [$remoteAddressString] has incorrect class registration details!!")
|
||||||
}
|
}
|
||||||
ListenerManager.cleanStackTraceInternal(exception)
|
ListenerManager.cleanStackTraceInternal(exception)
|
||||||
throw exception
|
throw exception
|
||||||
@ -734,13 +734,13 @@ open class Client<CONNECTION : Connection>(
|
|||||||
handshakeConnection.subscription.close()
|
handshakeConnection.subscription.close()
|
||||||
handshakeConnection.publication.close()
|
handshakeConnection.publication.close()
|
||||||
|
|
||||||
val exception = ClientRejectedException("[$aeronLogInfo - ${handshake.connectKey}] Connection (${newConnection.id}) to $remoteAddressString was not permitted!")
|
val exception = ClientRejectedException("[$aeronLogInfo - ${handshake.connectKey}] Connection (${newConnection.id}) to [$remoteAddressString] was not permitted!")
|
||||||
ListenerManager.cleanStackTrace(exception)
|
ListenerManager.cleanStackTrace(exception)
|
||||||
logger.error(exception) { "Permission error" }
|
logger.error(exception) { "Permission error" }
|
||||||
throw exception
|
throw exception
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.info { "[$aeronLogInfo - ${handshake.connectKey}] Connection (${newConnection.id}) adding new signature for $remoteAddressString : ${connectionInfo.publicKey.toHexString()}" }
|
logger.info { "[$aeronLogInfo - ${handshake.connectKey}] Connection (${newConnection.id}) adding new signature for [$remoteAddressString] : ${connectionInfo.publicKey.toHexString()}" }
|
||||||
storage.addRegisteredServerKey(remoteAddress!!, connectionInfo.publicKey)
|
storage.addRegisteredServerKey(remoteAddress!!, connectionInfo.publicKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -783,7 +783,7 @@ open class Client<CONNECTION : Connection>(
|
|||||||
try {
|
try {
|
||||||
handshake.done(handshakeConnection, successAttemptTimeout)
|
handshake.done(handshakeConnection, successAttemptTimeout)
|
||||||
} catch (e: Exception) {
|
} catch (e: Exception) {
|
||||||
logger.error(e) { "[$aeronLogInfo - ${handshake.connectKey}] Connection (${newConnection.id}) to $remoteAddressString error during handshake" }
|
logger.error(e) { "[$aeronLogInfo - ${handshake.connectKey}] Connection (${newConnection.id}) to [$remoteAddressString] error during handshake" }
|
||||||
throw e
|
throw e
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -793,7 +793,7 @@ open class Client<CONNECTION : Connection>(
|
|||||||
|
|
||||||
isConnected = true
|
isConnected = true
|
||||||
|
|
||||||
logger.debug { "[$aeronLogInfo - ${handshake.connectKey}] Connection (${newConnection.id}) to $remoteAddressString done with handshake." }
|
logger.debug { "[$aeronLogInfo - ${handshake.connectKey}] Connection (${newConnection.id}) to [$remoteAddressString] done with handshake." }
|
||||||
|
|
||||||
// this forces the current thread to WAIT until the network poll system has started
|
// this forces the current thread to WAIT until the network poll system has started
|
||||||
val pollStartupLatch = CountDownLatch(1)
|
val pollStartupLatch = CountDownLatch(1)
|
||||||
@ -815,7 +815,7 @@ open class Client<CONNECTION : Connection>(
|
|||||||
pollIdleStrategy.idle(pollCount)
|
pollIdleStrategy.idle(pollCount)
|
||||||
} else {
|
} else {
|
||||||
// If the connection has either been closed, or has expired, it needs to be cleaned-up/deleted.
|
// If the connection has either been closed, or has expired, it needs to be cleaned-up/deleted.
|
||||||
logger.debug { "[$aeronLogInfo] connection expired" }
|
logger.debug { "[$aeronLogInfo] connection from [$remoteAddressString] expired" }
|
||||||
|
|
||||||
// NOTE: We do not shutdown the client!! The client is only closed by explicitly calling `client.close()`
|
// NOTE: We do not shutdown the client!! The client is only closed by explicitly calling `client.close()`
|
||||||
newConnection.close()
|
newConnection.close()
|
||||||
|
@ -274,7 +274,7 @@ open class Server<CONNECTION : Connection>(
|
|||||||
pollCount += connection.poll()
|
pollCount += connection.poll()
|
||||||
} else {
|
} else {
|
||||||
// If the connection has either been closed, or has expired, it needs to be cleaned-up/deleted.
|
// If the connection has either been closed, or has expired, it needs to be cleaned-up/deleted.
|
||||||
logger.debug { "[${connection.id}/${connection.streamId}] connection expired" }
|
logger.debug { "[${connection.id}/${connection.streamId}] connection from [${connection.remoteAddressString}] expired" }
|
||||||
|
|
||||||
removeConnection(connection)
|
removeConnection(connection)
|
||||||
|
|
||||||
@ -311,7 +311,7 @@ open class Server<CONNECTION : Connection>(
|
|||||||
connections.clear()
|
connections.clear()
|
||||||
|
|
||||||
cons.forEach { connection ->
|
cons.forEach { connection ->
|
||||||
logger.info { "[${connection.id}/${connection.streamId}] Connection cleanup and close" }
|
logger.info { "[${connection.id}/${connection.streamId}] Connection from [${connection.remoteAddressString}] cleanup and close" }
|
||||||
|
|
||||||
// make sure the connection is closed (close can only happen once, so a duplicate call does nothing!)
|
// make sure the connection is closed (close can only happen once, so a duplicate call does nothing!)
|
||||||
connection.close()
|
connection.close()
|
||||||
|
@ -404,7 +404,7 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
|
|||||||
// the server 'handshake' connection info is cleaned up with the disconnect via timeout/expire.
|
// the server 'handshake' connection info is cleaned up with the disconnect via timeout/expire.
|
||||||
if (isClosed.compareAndSet(expect = false, update = true)) {
|
if (isClosed.compareAndSet(expect = false, update = true)) {
|
||||||
val aeronLogInfo = "${id}/${streamId}"
|
val aeronLogInfo = "${id}/${streamId}"
|
||||||
logger.debug {"[$aeronLogInfo] connection closing"}
|
logger.debug {"[$aeronLogInfo] connection with [$remoteAddressString] closing"}
|
||||||
|
|
||||||
subscription.close()
|
subscription.close()
|
||||||
|
|
||||||
@ -448,7 +448,7 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
|
|||||||
closeAction()
|
closeAction()
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.debug {"[$aeronLogInfo] connection closed"}
|
logger.debug {"[$aeronLogInfo] connection with [$remoteAddressString] closed"}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -178,7 +178,7 @@ internal class ServerHandshake<CONNECTION : Connection>(
|
|||||||
try {
|
try {
|
||||||
// VALIDATE:: Check to see if there are already too many clients connected.
|
// VALIDATE:: Check to see if there are already too many clients connected.
|
||||||
if (server.connections.size() >= config.maxClientCount) {
|
if (server.connections.size() >= config.maxClientCount) {
|
||||||
logger.error("[$aeronLogInfo] Connection from $clientAddressString not allowed! Server is full. Max allowed is ${config.maxClientCount}")
|
logger.error("[$aeronLogInfo] Connection from [$clientAddressString] not allowed! Server is full. Max allowed is ${config.maxClientCount}")
|
||||||
|
|
||||||
try {
|
try {
|
||||||
server.writeHandshakeMessage(handshakePublication, aeronLogInfo,
|
server.writeHandshakeMessage(handshakePublication, aeronLogInfo,
|
||||||
@ -196,19 +196,19 @@ internal class ServerHandshake<CONNECTION : Connection>(
|
|||||||
// decrement it now, since we aren't going to permit this connection (take the extra decrement hit on failure, instead of always)
|
// decrement it now, since we aren't going to permit this connection (take the extra decrement hit on failure, instead of always)
|
||||||
connectionsPerIpCounts.decrement(clientAddress, currentCountForIp)
|
connectionsPerIpCounts.decrement(clientAddress, currentCountForIp)
|
||||||
|
|
||||||
logger.error { "[$aeronLogInfo] Too many connections for IP address $clientAddressString. Max allowed is ${config.maxConnectionsPerIpAddress}" }
|
logger.error { "[$aeronLogInfo] Too many connections for IP address [$clientAddressString]. Max allowed is ${config.maxConnectionsPerIpAddress}" }
|
||||||
|
|
||||||
try {
|
try {
|
||||||
server.writeHandshakeMessage(handshakePublication, aeronLogInfo,
|
server.writeHandshakeMessage(handshakePublication, aeronLogInfo,
|
||||||
HandshakeMessage.error("Too many connections for IP address"))
|
HandshakeMessage.error("Too many connections for IP address"))
|
||||||
} catch (e: Exception) {
|
} catch (e: Exception) {
|
||||||
logger.error(e) { "[$aeronLogInfo] Handshake error!" }
|
logger.error(e) { "[$aeronLogInfo] Handshake error with [$clientAddressString]!" }
|
||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
connectionsPerIpCounts.increment(clientAddress, currentCountForIp)
|
connectionsPerIpCounts.increment(clientAddress, currentCountForIp)
|
||||||
} catch (e: Exception) {
|
} catch (e: Exception) {
|
||||||
logger.error(e) { "[$aeronLogInfo] Could not validate client message" }
|
logger.error(e) { "[$aeronLogInfo] Could not validate client message from [$clientAddressString]" }
|
||||||
|
|
||||||
try {
|
try {
|
||||||
server.writeHandshakeMessage(handshakePublication, aeronLogInfo,
|
server.writeHandshakeMessage(handshakePublication, aeronLogInfo,
|
||||||
@ -411,7 +411,7 @@ internal class ServerHandshake<CONNECTION : Connection>(
|
|||||||
// VALIDATE:: check to see if the remote connection's public key has changed!
|
// VALIDATE:: check to see if the remote connection's public key has changed!
|
||||||
validateRemoteAddress = server.crypto.validateRemoteAddress(clientAddress, clientAddressString, clientPublicKeyBytes)
|
validateRemoteAddress = server.crypto.validateRemoteAddress(clientAddress, clientAddressString, clientPublicKeyBytes)
|
||||||
if (validateRemoteAddress == PublicKeyValidationState.INVALID) {
|
if (validateRemoteAddress == PublicKeyValidationState.INVALID) {
|
||||||
logger.error { "[$aeronLogInfo] Connection from $clientAddressString not allowed! Public key mismatch." }
|
logger.error { "[$aeronLogInfo] Connection from [$clientAddressString] not allowed! Public key mismatch." }
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -438,7 +438,7 @@ internal class ServerHandshake<CONNECTION : Connection>(
|
|||||||
// have to unwind actions!
|
// have to unwind actions!
|
||||||
connectionsPerIpCounts.decrementSlow(clientAddress)
|
connectionsPerIpCounts.decrementSlow(clientAddress)
|
||||||
|
|
||||||
logger.error { "[$aeronLogInfo] Connection from $clientAddressString not allowed! Unable to allocate a session ID for the client connection!" }
|
logger.error { "[$aeronLogInfo] Connection from [$clientAddressString] not allowed! Unable to allocate a session ID for the client connection!" }
|
||||||
|
|
||||||
try {
|
try {
|
||||||
server.writeHandshakeMessage(handshakePublication, aeronLogInfo,
|
server.writeHandshakeMessage(handshakePublication, aeronLogInfo,
|
||||||
@ -458,7 +458,7 @@ internal class ServerHandshake<CONNECTION : Connection>(
|
|||||||
connectionsPerIpCounts.decrementSlow(clientAddress)
|
connectionsPerIpCounts.decrementSlow(clientAddress)
|
||||||
sessionIdAllocator.free(connectionSessionId)
|
sessionIdAllocator.free(connectionSessionId)
|
||||||
|
|
||||||
logger.error { "[$aeronLogInfo] Connection from $clientAddressString not allowed! Unable to allocate a stream ID for the client connection!" }
|
logger.error { "[$aeronLogInfo] Connection from [$clientAddressString] not allowed! Unable to allocate a stream ID for the client connection!" }
|
||||||
|
|
||||||
try {
|
try {
|
||||||
server.writeHandshakeMessage(handshakePublication, aeronLogInfo,
|
server.writeHandshakeMessage(handshakePublication, aeronLogInfo,
|
||||||
@ -514,7 +514,7 @@ internal class ServerHandshake<CONNECTION : Connection>(
|
|||||||
streamIdAllocator.free(connectionStreamId)
|
streamIdAllocator.free(connectionStreamId)
|
||||||
connection.close()
|
connection.close()
|
||||||
|
|
||||||
logger.error { "[$aeronLogInfo] Connection $clientAddressString was not permitted!" }
|
logger.error { "[$aeronLogInfo] Connection from [$clientAddressString] was not permitted!" }
|
||||||
|
|
||||||
try {
|
try {
|
||||||
server.writeHandshakeMessage(handshakePublication, aeronLogInfo,
|
server.writeHandshakeMessage(handshakePublication, aeronLogInfo,
|
||||||
@ -549,7 +549,7 @@ internal class ServerHandshake<CONNECTION : Connection>(
|
|||||||
// before we notify connect, we have to wait for the client to tell us that they can receive data
|
// before we notify connect, we have to wait for the client to tell us that they can receive data
|
||||||
pendingConnections[message.connectKey] = connection
|
pendingConnections[message.connectKey] = connection
|
||||||
|
|
||||||
logger.debug { "[$aeronLogInfo - ${message.connectKey}] Connection (${connection.streamId}/${connection.id}) responding to handshake hello." }
|
logger.debug { "[$aeronLogInfo - ${message.connectKey}] Connection (${connection.streamId}/${connection.id}) [$clientAddressString] responding to handshake hello." }
|
||||||
|
|
||||||
// this tells the client all the info to connect.
|
// this tells the client all the info to connect.
|
||||||
server.writeHandshakeMessage(handshakePublication, aeronLogInfo, successMessage) // exception is already caught
|
server.writeHandshakeMessage(handshakePublication, aeronLogInfo, successMessage) // exception is already caught
|
||||||
@ -559,7 +559,7 @@ internal class ServerHandshake<CONNECTION : Connection>(
|
|||||||
sessionIdAllocator.free(connectionSessionId)
|
sessionIdAllocator.free(connectionSessionId)
|
||||||
streamIdAllocator.free(connectionStreamId)
|
streamIdAllocator.free(connectionStreamId)
|
||||||
|
|
||||||
logger.error(e) { "[$aeronLogInfo - ${message.connectKey}] Connection (${connection?.streamId}/${connection?.id}) handshake from $clientAddressString crashed! Message $message" }
|
logger.error(e) { "[$aeronLogInfo - ${message.connectKey}] Connection (${connection?.streamId}/${connection?.id}) handshake from [$clientAddressString] crashed! Message $message" }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user