Wrapped logger.debug/trace into if statements to prevent the JVM from creating unnecessary lambdas
This commit is contained in:
parent
d895e04af5
commit
81380fe633
|
@ -844,7 +844,9 @@ open class Client<CONNECTION : Connection>(
|
||||||
// finished with the handshake, so always close these!
|
// finished with the handshake, so always close these!
|
||||||
handshakeConnection.close()
|
handshakeConnection.close()
|
||||||
|
|
||||||
logger.debug { "[${handshakeConnection.details}] (${handshake.connectKey}) Connection (${newConnection.id}) to [$addressString] done with handshake." }
|
if (logger.isDebugEnabled) {
|
||||||
|
logger.debug { "[${handshakeConnection.details}] (${handshake.connectKey}) Connection (${newConnection.id}) to [$addressString] done with handshake." }
|
||||||
|
}
|
||||||
|
|
||||||
newConnection.setImage()
|
newConnection.setImage()
|
||||||
|
|
||||||
|
@ -863,7 +865,9 @@ open class Client<CONNECTION : Connection>(
|
||||||
newConnection.poll()
|
newConnection.poll()
|
||||||
} else {
|
} else {
|
||||||
// If the connection has either been closed, or has expired, it needs to be cleaned-up/deleted.
|
// If the connection has either been closed, or has expired, it needs to be cleaned-up/deleted.
|
||||||
logger.debug { "[${connection}] connection expired (cleanup)" }
|
if (logger.isDebugEnabled) {
|
||||||
|
logger.debug { "[${connection}] connection expired (cleanup)" }
|
||||||
|
}
|
||||||
|
|
||||||
// the connection MUST be removed in the same thread that is processing events (it will be removed again in close, and that is expected)
|
// the connection MUST be removed in the same thread that is processing events (it will be removed again in close, and that is expected)
|
||||||
removeConnection(newConnection)
|
removeConnection(newConnection)
|
||||||
|
@ -881,7 +885,9 @@ open class Client<CONNECTION : Connection>(
|
||||||
val mustRestartDriverOnError = aeronDriver.internal.mustRestartDriverOnError
|
val mustRestartDriverOnError = aeronDriver.internal.mustRestartDriverOnError
|
||||||
|
|
||||||
// this can be closed when the connection is remotely closed in ADDITION to manually closing
|
// this can be closed when the connection is remotely closed in ADDITION to manually closing
|
||||||
logger.debug { "Client event dispatch closing..." }
|
if (logger.isDebugEnabled) {
|
||||||
|
logger.debug { "Client event dispatch closing..." }
|
||||||
|
}
|
||||||
|
|
||||||
// we only need to run shutdown methods if there was a network outage or D/C
|
// we only need to run shutdown methods if there was a network outage or D/C
|
||||||
if (!shutdownInProgress.value) {
|
if (!shutdownInProgress.value) {
|
||||||
|
@ -916,7 +922,7 @@ open class Client<CONNECTION : Connection>(
|
||||||
|
|
||||||
reconnect()
|
reconnect()
|
||||||
}
|
}
|
||||||
} else {
|
} else if (logger.isDebugEnabled) {
|
||||||
logger.debug { "Closed the Network Event Poller..." }
|
logger.debug { "Closed the Network Event Poller..." }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -999,7 +1005,9 @@ open class Client<CONNECTION : Connection>(
|
||||||
fun removeRegisteredServerKey(address: InetAddress) {
|
fun removeRegisteredServerKey(address: InetAddress) {
|
||||||
val savedPublicKey = storage.getRegisteredServerKey(address)
|
val savedPublicKey = storage.getRegisteredServerKey(address)
|
||||||
if (savedPublicKey != null) {
|
if (savedPublicKey != null) {
|
||||||
logger.debug { "Deleting remote IP address key $address" }
|
if (logger.isDebugEnabled) {
|
||||||
|
logger.debug { "Deleting remote IP address key $address" }
|
||||||
|
}
|
||||||
storage.removeRegisteredServerKey(address)
|
storage.removeRegisteredServerKey(address)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -267,7 +267,7 @@ abstract class Configuration protected constructor() {
|
||||||
.enableRead()
|
.enableRead()
|
||||||
.startBlocking(60, TimeUnit.SECONDS)
|
.startBlocking(60, TimeUnit.SECONDS)
|
||||||
.output
|
.output
|
||||||
.string().trim().also { logger.trace { "Created new disk: $it" } }
|
.string().trim().also { if (logger.isTraceEnabled) { logger.trace { "Created new disk: $it" } } }
|
||||||
|
|
||||||
// diskutil apfs createContainer /dev/disk4
|
// diskutil apfs createContainer /dev/disk4
|
||||||
val lines = dorkbox.executor.Executor()
|
val lines = dorkbox.executor.Executor()
|
||||||
|
@ -288,7 +288,7 @@ abstract class Configuration protected constructor() {
|
||||||
.enableRead()
|
.enableRead()
|
||||||
.startBlocking(60, TimeUnit.SECONDS)
|
.startBlocking(60, TimeUnit.SECONDS)
|
||||||
.output
|
.output
|
||||||
.string().also { logger.trace { it } }
|
.string().also { if (logger.isTraceEnabled) { logger.trace { it } } }
|
||||||
|
|
||||||
// diskutil mount nobrowse "DevShm"
|
// diskutil mount nobrowse "DevShm"
|
||||||
dorkbox.executor.Executor()
|
dorkbox.executor.Executor()
|
||||||
|
@ -297,7 +297,7 @@ abstract class Configuration protected constructor() {
|
||||||
.enableRead()
|
.enableRead()
|
||||||
.startBlocking(60, TimeUnit.SECONDS)
|
.startBlocking(60, TimeUnit.SECONDS)
|
||||||
.output
|
.output
|
||||||
.string().also { logger.trace { it } }
|
.string().also { if (logger.isTraceEnabled) { logger.trace { it } } }
|
||||||
|
|
||||||
// touch /Volumes/RAMDisk/.metadata_never_index
|
// touch /Volumes/RAMDisk/.metadata_never_index
|
||||||
File("${suggestedLocation}/.metadata_never_index").createNewFile()
|
File("${suggestedLocation}/.metadata_never_index").createNewFile()
|
||||||
|
|
|
@ -297,7 +297,9 @@ open class Server<CONNECTION : Connection>(
|
||||||
pollCount += connection.poll()
|
pollCount += connection.poll()
|
||||||
} else {
|
} else {
|
||||||
// If the connection has either been closed, or has expired, it needs to be cleaned-up/deleted.
|
// If the connection has either been closed, or has expired, it needs to be cleaned-up/deleted.
|
||||||
logger.debug { "[${connection}] connection expired (cleanup)" }
|
if (logger.isDebugEnabled) {
|
||||||
|
logger.debug { "[${connection}] connection expired (cleanup)" }
|
||||||
|
}
|
||||||
|
|
||||||
// the connection MUST be removed in the same thread that is processing events (it will be removed again in close, and that is expected)
|
// the connection MUST be removed in the same thread that is processing events (it will be removed again in close, and that is expected)
|
||||||
removeConnection(connection)
|
removeConnection(connection)
|
||||||
|
@ -317,7 +319,9 @@ open class Server<CONNECTION : Connection>(
|
||||||
onClose = object : EventCloseOperator {
|
onClose = object : EventCloseOperator {
|
||||||
override fun invoke() {
|
override fun invoke() {
|
||||||
val mustRestartDriverOnError = aeronDriver.internal.mustRestartDriverOnError
|
val mustRestartDriverOnError = aeronDriver.internal.mustRestartDriverOnError
|
||||||
logger.debug { "Server event dispatch closing..." }
|
if (logger.isDebugEnabled) {
|
||||||
|
logger.debug { "Server event dispatch closing..." }
|
||||||
|
}
|
||||||
|
|
||||||
ipcPoller.close()
|
ipcPoller.close()
|
||||||
ipPoller.close()
|
ipPoller.close()
|
||||||
|
@ -368,8 +372,9 @@ open class Server<CONNECTION : Connection>(
|
||||||
endpointIsRunning.lazySet(false)
|
endpointIsRunning.lazySet(false)
|
||||||
pollerClosedLatch.countDown()
|
pollerClosedLatch.countDown()
|
||||||
|
|
||||||
logger.debug { "Closed the Network Event Poller..." }
|
if (logger.isDebugEnabled) {
|
||||||
|
logger.debug { "Closed the Network Event Poller..." }
|
||||||
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -498,13 +498,13 @@ class AeronDriver constructor(config: Configuration, val logger: KLogger, val en
|
||||||
logger.error(this) { "Aeron Driver [$driverId]: error!" }
|
logger.error(this) { "Aeron Driver [$driverId]: error!" }
|
||||||
}
|
}
|
||||||
|
|
||||||
if (logEverything) {
|
if (logEverything && logger.isDebugEnabled) {
|
||||||
logger.debug { "Aeron Driver [$driverId]: Creating at '${driver.aeronDirectory}'" }
|
logger.debug { "Aeron Driver [$driverId]: Creating at '${driver.aeronDirectory}'" }
|
||||||
}
|
}
|
||||||
|
|
||||||
internal = driver
|
internal = driver
|
||||||
} else {
|
} else {
|
||||||
if (logEverything) {
|
if (logEverything && logger.isDebugEnabled) {
|
||||||
logger.debug { "Aeron Driver [$driverId]: Reusing driver" }
|
logger.debug { "Aeron Driver [$driverId]: Reusing driver" }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -235,7 +235,9 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, config: Configuration
|
||||||
|
|
||||||
val isLoaded = mediaDriver != null && aeron != null && aeron?.isClosed == false
|
val isLoaded = mediaDriver != null && aeron != null && aeron?.isClosed == false
|
||||||
if (isLoaded) {
|
if (isLoaded) {
|
||||||
logger.debug { "Aeron Driver [$driverId]: Already running... Not starting again." }
|
if (logger.isDebugEnabled) {
|
||||||
|
logger.debug { "Aeron Driver [$driverId]: Already running... Not starting again." }
|
||||||
|
}
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -247,7 +249,9 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, config: Configuration
|
||||||
// wait for a bit, because we are running, but we ALSO issued a START, and expect it to start.
|
// wait for a bit, because we are running, but we ALSO issued a START, and expect it to start.
|
||||||
// SOMETIMES aeron is in the middle of shutting down, and this prevents us from trying to connect to
|
// SOMETIMES aeron is in the middle of shutting down, and this prevents us from trying to connect to
|
||||||
// that instance
|
// that instance
|
||||||
logger.debug { "Aeron Driver [$driverId]: Already running. Double checking status..." }
|
if (logger.isDebugEnabled) {
|
||||||
|
logger.debug { "Aeron Driver [$driverId]: Already running. Double checking status..." }
|
||||||
|
}
|
||||||
Thread.sleep(context.driverTimeout / 2)
|
Thread.sleep(context.driverTimeout / 2)
|
||||||
running = isRunning()
|
running = isRunning()
|
||||||
}
|
}
|
||||||
|
@ -258,14 +262,16 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, config: Configuration
|
||||||
while (count-- > 0) {
|
while (count-- > 0) {
|
||||||
try {
|
try {
|
||||||
mediaDriver = MediaDriver.launch(context.context)
|
mediaDriver = MediaDriver.launch(context.context)
|
||||||
logger.debug { "Aeron Driver [$driverId]: Successfully started" }
|
if (logger.isDebugEnabled) {
|
||||||
|
logger.debug { "Aeron Driver [$driverId]: Successfully started" }
|
||||||
|
}
|
||||||
break
|
break
|
||||||
} catch (e: Exception) {
|
} catch (e: Exception) {
|
||||||
logger.warn(e) { "Aeron Driver [$driverId]: Unable to start at ${context.directory}. Retrying $count more times..." }
|
logger.warn(e) { "Aeron Driver [$driverId]: Unable to start at ${context.directory}. Retrying $count more times..." }
|
||||||
Thread.sleep(context.driverTimeout)
|
Thread.sleep(context.driverTimeout)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else if (logger.isDebugEnabled) {
|
||||||
logger.debug { "Aeron Driver [$driverId]: Not starting. It was already running." }
|
logger.debug { "Aeron Driver [$driverId]: Not starting. It was already running." }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -297,7 +303,9 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, config: Configuration
|
||||||
|
|
||||||
// this might succeed if we can connect to the media driver
|
// this might succeed if we can connect to the media driver
|
||||||
aeron = Aeron.connect(aeronDriverContext)
|
aeron = Aeron.connect(aeronDriverContext)
|
||||||
logger.debug { "Aeron Driver [$driverId]: Connected to '${context.directory}'" }
|
if (logger.isDebugEnabled) {
|
||||||
|
logger.debug { "Aeron Driver [$driverId]: Connected to '${context.directory}'" }
|
||||||
|
}
|
||||||
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
@ -378,13 +386,15 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, config: Configuration
|
||||||
|
|
||||||
if (!hasDelay) {
|
if (!hasDelay) {
|
||||||
hasDelay = true
|
hasDelay = true
|
||||||
logger.debug { "Aeron Driver [$driverId]: Delaying creation of publication [$logInfo] :: sessionId=${publicationUri.sessionId()}, streamId=${streamId}" }
|
if (logger.isDebugEnabled) {
|
||||||
|
logger.debug { "Aeron Driver [$driverId]: Delaying creation of publication [$logInfo] :: sessionId=${publicationUri.sessionId()}, streamId=${streamId}" }
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// the publication has not ACTUALLY been created yet!
|
// the publication has not ACTUALLY been created yet!
|
||||||
Thread.sleep(AERON_PUB_SUB_TIMEOUT)
|
Thread.sleep(AERON_PUB_SUB_TIMEOUT)
|
||||||
}
|
}
|
||||||
|
|
||||||
if (hasDelay) {
|
if (hasDelay && logger.isDebugEnabled) {
|
||||||
logger.debug { "Aeron Driver [$driverId]: Delayed creation of publication [$logInfo] :: sessionId=${publicationUri.sessionId()}, streamId=${streamId}" }
|
logger.debug { "Aeron Driver [$driverId]: Delayed creation of publication [$logInfo] :: sessionId=${publicationUri.sessionId()}, streamId=${streamId}" }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -394,7 +404,9 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, config: Configuration
|
||||||
registeredPublicationsTrace.add(publication.registrationId())
|
registeredPublicationsTrace.add(publication.registrationId())
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.trace { "Aeron Driver [$driverId]: Creating publication [$logInfo] :: regId=${publication.registrationId()}, sessionId=${publication.sessionId()}, streamId=${publication.streamId()}, channel=${publication.channel()}" }
|
if (logger.isTraceEnabled) {
|
||||||
|
logger.trace { "Aeron Driver [$driverId]: Creating publication [$logInfo] :: regId=${publication.registrationId()}, sessionId=${publication.sessionId()}, streamId=${publication.streamId()}, channel=${publication.channel()}" }
|
||||||
|
}
|
||||||
return publication
|
return publication
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -474,7 +486,9 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, config: Configuration
|
||||||
|
|
||||||
if (!hasDelay) {
|
if (!hasDelay) {
|
||||||
hasDelay = true
|
hasDelay = true
|
||||||
logger.debug { "Aeron Driver [$driverId]: Delaying creation of ex-publication [$logInfo] :: sessionId=${publicationUri.sessionId()}, streamId=${streamId}" }
|
if (logger.isDebugEnabled) {
|
||||||
|
logger.debug { "Aeron Driver [$driverId]: Delaying creation of ex-publication [$logInfo] :: sessionId=${publicationUri.sessionId()}, streamId=${streamId}" }
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// the publication has not ACTUALLY been created yet!
|
// the publication has not ACTUALLY been created yet!
|
||||||
Thread.sleep(AERON_PUB_SUB_TIMEOUT)
|
Thread.sleep(AERON_PUB_SUB_TIMEOUT)
|
||||||
|
@ -489,7 +503,9 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, config: Configuration
|
||||||
registeredPublicationsTrace.add(publication.registrationId())
|
registeredPublicationsTrace.add(publication.registrationId())
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.trace { "Aeron Driver [$driverId]: Creating ex-publication $logInfo :: regId=${publication.registrationId()}, sessionId=${publication.sessionId()}, streamId=${publication.streamId()}, channel=${publication.channel()}" }
|
if (logger.isTraceEnabled) {
|
||||||
|
logger.trace { "Aeron Driver [$driverId]: Creating ex-publication $logInfo :: regId=${publication.registrationId()}, sessionId=${publication.sessionId()}, streamId=${publication.streamId()}, channel=${publication.channel()}" }
|
||||||
|
}
|
||||||
return publication
|
return publication
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -570,13 +586,15 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, config: Configuration
|
||||||
|
|
||||||
if (!hasDelay) {
|
if (!hasDelay) {
|
||||||
hasDelay = true
|
hasDelay = true
|
||||||
logger.debug { "Aeron Driver [$driverId]: Delaying creation of subscription [$logInfo] :: sessionId=${subscriptionUri.sessionId()}, streamId=${streamId}" }
|
if (logger.isDebugEnabled) {
|
||||||
|
logger.debug { "Aeron Driver [$driverId]: Delaying creation of subscription [$logInfo] :: sessionId=${subscriptionUri.sessionId()}, streamId=${streamId}" }
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// the subscription has not ACTUALLY been created yet!
|
// the subscription has not ACTUALLY been created yet!
|
||||||
Thread.sleep(AERON_PUB_SUB_TIMEOUT)
|
Thread.sleep(AERON_PUB_SUB_TIMEOUT)
|
||||||
}
|
}
|
||||||
|
|
||||||
if (hasDelay) {
|
if (hasDelay && logger.isDebugEnabled) {
|
||||||
logger.debug { "Aeron Driver [$driverId]: Delayed creation of subscription [$logInfo] :: sessionId=${subscriptionUri.sessionId()}, streamId=${streamId}" }
|
logger.debug { "Aeron Driver [$driverId]: Delayed creation of subscription [$logInfo] :: sessionId=${subscriptionUri.sessionId()}, streamId=${streamId}" }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -585,7 +603,9 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, config: Configuration
|
||||||
registeredSubscriptionsTrace.add(subscription.registrationId())
|
registeredSubscriptionsTrace.add(subscription.registrationId())
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.trace { "Aeron Driver [$driverId]: Creating subscription [$logInfo] :: regId=${subscription.registrationId()}, sessionId=${subscriptionUri.sessionId()}, streamId=${subscription.streamId()}, channel=${subscription.channel()}" }
|
if (logger.isTraceEnabled) {
|
||||||
|
logger.trace { "Aeron Driver [$driverId]: Creating subscription [$logInfo] :: regId=${subscription.registrationId()}, sessionId=${subscriptionUri.sessionId()}, streamId=${subscription.streamId()}, channel=${subscription.channel()}" }
|
||||||
|
}
|
||||||
return subscription
|
return subscription
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -601,7 +621,9 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, config: Configuration
|
||||||
|
|
||||||
val registrationId = publication.registrationId()
|
val registrationId = publication.registrationId()
|
||||||
|
|
||||||
logger.trace { "Aeron Driver [$driverId]: Closing $name file [$logInfo] :: regId=$registrationId, sessionId=${publication.sessionId()}, streamId=${publication.streamId()}" }
|
if (logger.isTraceEnabled) {
|
||||||
|
logger.trace { "Aeron Driver [$driverId]: Closing $name file [$logInfo] :: regId=$registrationId, sessionId=${publication.sessionId()}, streamId=${publication.streamId()}" }
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
val aeron1 = aeron
|
val aeron1 = aeron
|
||||||
|
@ -643,7 +665,9 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, config: Configuration
|
||||||
* Guarantee that the publication is closed AND the backing file is removed
|
* Guarantee that the publication is closed AND the backing file is removed
|
||||||
*/
|
*/
|
||||||
fun close(subscription: Subscription, logger: KLogger, logInfo: String) = stateLock.write {
|
fun close(subscription: Subscription, logger: KLogger, logInfo: String) = stateLock.write {
|
||||||
logger.trace { "Aeron Driver [$driverId]: Closing subscription [$logInfo] :: regId=${subscription.registrationId()}, sessionId=${subscription.images().firstOrNull()?.sessionId()}, streamId=${subscription.streamId()}" }
|
if (logger.isTraceEnabled) {
|
||||||
|
logger.trace { "Aeron Driver [$driverId]: Closing subscription [$logInfo] :: regId=${subscription.registrationId()}, sessionId=${subscription.images().firstOrNull()?.sessionId()}, streamId=${subscription.streamId()}" }
|
||||||
|
}
|
||||||
|
|
||||||
val aeron1 = aeron
|
val aeron1 = aeron
|
||||||
if (aeron1 == null || aeron1.isClosed) {
|
if (aeron1 == null || aeron1.isClosed) {
|
||||||
|
@ -690,7 +714,9 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, config: Configuration
|
||||||
// only emit the log info once. It's rather spammy otherwise!
|
// only emit the log info once. It's rather spammy otherwise!
|
||||||
if (!didLog) {
|
if (!didLog) {
|
||||||
didLog = true
|
didLog = true
|
||||||
logger.debug { "Aeron Driver [$driverId]: Still running (${aeronDirectory}). Waiting for it to stop..." }
|
if (logger.isDebugEnabled) {
|
||||||
|
logger.debug { "Aeron Driver [$driverId]: Still running (${aeronDirectory}). Waiting for it to stop..." }
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Thread.sleep(intervalTimeoutMS)
|
Thread.sleep(intervalTimeoutMS)
|
||||||
}
|
}
|
||||||
|
@ -718,7 +744,9 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, config: Configuration
|
||||||
fun isInUse(endPoint: EndPoint<*>?, logger: KLogger): Boolean {
|
fun isInUse(endPoint: EndPoint<*>?, logger: KLogger): Boolean {
|
||||||
// as many "sort-cuts" as we can for checking if the current Aeron Driver/client is still in use
|
// as many "sort-cuts" as we can for checking if the current Aeron Driver/client is still in use
|
||||||
if (!isRunning()) {
|
if (!isRunning()) {
|
||||||
logger.trace { "Aeron Driver [$driverId]: not running" }
|
if (logger.isTraceEnabled) {
|
||||||
|
logger.trace { "Aeron Driver [$driverId]: not running" }
|
||||||
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -726,8 +754,10 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, config: Configuration
|
||||||
if (logger.isTraceEnabled) {
|
if (logger.isTraceEnabled) {
|
||||||
val elements = registeredPublicationsTrace.elements
|
val elements = registeredPublicationsTrace.elements
|
||||||
val joined = elements.joinToString()
|
val joined = elements.joinToString()
|
||||||
logger.debug { "Aeron Driver [$driverId]: has [$joined] publications (${registeredPublications.value} total)" }
|
if (logger.isDebugEnabled) {
|
||||||
} else {
|
logger.debug { "Aeron Driver [$driverId]: has [$joined] publications (${registeredPublications.value} total)" }
|
||||||
|
}
|
||||||
|
} else if (logger.isDebugEnabled) {
|
||||||
logger.debug { "Aeron Driver [$driverId]: has publications (${registeredPublications.value} total)" }
|
logger.debug { "Aeron Driver [$driverId]: has publications (${registeredPublications.value} total)" }
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
|
@ -737,15 +767,19 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, config: Configuration
|
||||||
if (logger.isTraceEnabled) {
|
if (logger.isTraceEnabled) {
|
||||||
val elements = registeredSubscriptionsTrace.elements
|
val elements = registeredSubscriptionsTrace.elements
|
||||||
val joined = elements.joinToString()
|
val joined = elements.joinToString()
|
||||||
logger.debug { "Aeron Driver [$driverId]: has [$joined] subscriptions (${registeredSubscriptions.value} total)" }
|
if (logger.isDebugEnabled) {
|
||||||
} else {
|
logger.debug { "Aeron Driver [$driverId]: has [$joined] subscriptions (${registeredSubscriptions.value} total)" }
|
||||||
|
}
|
||||||
|
} else if (logger.isDebugEnabled) {
|
||||||
logger.debug { "Aeron Driver [$driverId]: has subscriptions (${registeredSubscriptions.value} total)" }
|
logger.debug { "Aeron Driver [$driverId]: has subscriptions (${registeredSubscriptions.value} total)" }
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
if (endPointUsages.size() > 1 && !endPointUsages.contains(endPoint)) {
|
if (endPointUsages.size() > 1 && !endPointUsages.contains(endPoint)) {
|
||||||
logger.debug { "Aeron Driver [$driverId]: still referenced by ${endPointUsages.size()} endpoints" }
|
if (logger.isDebugEnabled) {
|
||||||
|
logger.debug { "Aeron Driver [$driverId]: still referenced by ${endPointUsages.size()} endpoints" }
|
||||||
|
}
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -763,7 +797,9 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, config: Configuration
|
||||||
var count = 3
|
var count = 3
|
||||||
|
|
||||||
while (count > 0 && currentUsage > 0) {
|
while (count > 0 && currentUsage > 0) {
|
||||||
logger.debug { "Aeron Driver [$driverId]: in use, double checking status" }
|
if (logger.isDebugEnabled) {
|
||||||
|
logger.debug { "Aeron Driver [$driverId]: in use, double checking status" }
|
||||||
|
}
|
||||||
delayLingerTimeout()
|
delayLingerTimeout()
|
||||||
currentUsage = driverBacklog()?.snapshot()?.size ?: 0
|
currentUsage = driverBacklog()?.snapshot()?.size ?: 0
|
||||||
count--
|
count--
|
||||||
|
@ -776,13 +812,15 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, config: Configuration
|
||||||
|
|
||||||
count = 3
|
count = 3
|
||||||
while (count > 0 && currentUsage > 0) {
|
while (count > 0 && currentUsage > 0) {
|
||||||
logger.debug { "Aeron Driver [$driverId]: in use, double checking status (long)" }
|
if (logger.isDebugEnabled) {
|
||||||
|
logger.debug { "Aeron Driver [$driverId]: in use, double checking status (long)" }
|
||||||
|
}
|
||||||
delayDriverTimeout()
|
delayDriverTimeout()
|
||||||
currentUsage = driverBacklog()?.snapshot()?.size ?: 0
|
currentUsage = driverBacklog()?.snapshot()?.size ?: 0
|
||||||
count--
|
count--
|
||||||
}
|
}
|
||||||
|
|
||||||
if (currentUsage > 0) {
|
if (currentUsage > 0 && logger.isDebugEnabled) {
|
||||||
logger.debug { "Aeron Driver [$driverId]: usage is: $currentUsage" }
|
logger.debug { "Aeron Driver [$driverId]: usage is: $currentUsage" }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -802,7 +840,9 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, config: Configuration
|
||||||
endPointUsages.remove(endPoint)
|
endPointUsages.remove(endPoint)
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.trace { "Aeron Driver [$driverId]: Requested close... (${endPointUsages.size()} endpoints still in use)" }
|
if (logger.isTraceEnabled) {
|
||||||
|
logger.trace { "Aeron Driver [$driverId]: Requested close... (${endPointUsages.size()} endpoints still in use)" }
|
||||||
|
}
|
||||||
|
|
||||||
// ignore the extra driver checks, because in SOME situations, when trying to reconnect upon an error, the
|
// ignore the extra driver checks, because in SOME situations, when trying to reconnect upon an error, the
|
||||||
if (isInUse(endPoint, logger)) {
|
if (isInUse(endPoint, logger)) {
|
||||||
|
@ -810,7 +850,9 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, config: Configuration
|
||||||
// driver gets into a bad state. When this happens, we have to ignore "are we already in use" checks, BECAUSE the driver is now corrupted and unusable!
|
// driver gets into a bad state. When this happens, we have to ignore "are we already in use" checks, BECAUSE the driver is now corrupted and unusable!
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
logger.debug { "Aeron Driver [$driverId]: in use, not shutting down this instance." }
|
if (logger.isDebugEnabled) {
|
||||||
|
logger.debug { "Aeron Driver [$driverId]: in use, not shutting down this instance." }
|
||||||
|
}
|
||||||
|
|
||||||
// reset our contextDefine value, so that this configuration can safely be reused
|
// reset our contextDefine value, so that this configuration can safely be reused
|
||||||
endPoint?.config?.contextDefined = false
|
endPoint?.config?.contextDefined = false
|
||||||
|
@ -820,14 +862,17 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, config: Configuration
|
||||||
|
|
||||||
val removed = AeronDriver.driverConfigurations[driverId]
|
val removed = AeronDriver.driverConfigurations[driverId]
|
||||||
if (removed == null) {
|
if (removed == null) {
|
||||||
logger.debug { "Aeron Driver [$driverId]: already closed. Ignoring close request." }
|
if (logger.isDebugEnabled) {
|
||||||
|
logger.debug { "Aeron Driver [$driverId]: already closed. Ignoring close request." }
|
||||||
|
}
|
||||||
// reset our contextDefine value, so that this configuration can safely be reused
|
// reset our contextDefine value, so that this configuration can safely be reused
|
||||||
endPoint?.config?.contextDefined = false
|
endPoint?.config?.contextDefined = false
|
||||||
return@write false
|
return@write false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (logger.isDebugEnabled) {
|
||||||
logger.debug { "Aeron Driver [$driverId]: Closing..." }
|
logger.debug { "Aeron Driver [$driverId]: Closing..." }
|
||||||
|
}
|
||||||
|
|
||||||
// we have to assign context BEFORE we close, because the `getter` for context will create it if necessary
|
// we have to assign context BEFORE we close, because the `getter` for context will create it if necessary
|
||||||
val aeronContext = context
|
val aeronContext = context
|
||||||
|
@ -847,14 +892,18 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, config: Configuration
|
||||||
|
|
||||||
|
|
||||||
if (mediaDriver == null) {
|
if (mediaDriver == null) {
|
||||||
logger.debug { "Aeron Driver [$driverId]: No driver started, not stopping driver or context." }
|
if (logger.isDebugEnabled) {
|
||||||
|
logger.debug { "Aeron Driver [$driverId]: No driver started, not stopping driver or context." }
|
||||||
|
}
|
||||||
|
|
||||||
// reset our contextDefine value, so that this configuration can safely be reused
|
// reset our contextDefine value, so that this configuration can safely be reused
|
||||||
endPoint?.config?.contextDefined = false
|
endPoint?.config?.contextDefined = false
|
||||||
return@write false
|
return@write false
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.debug { "Aeron Driver [$driverId]: Stopping driver at '${driverDirectory}'..." }
|
if (logger.isDebugEnabled) {
|
||||||
|
logger.debug { "Aeron Driver [$driverId]: Stopping driver at '${driverDirectory}'..." }
|
||||||
|
}
|
||||||
|
|
||||||
// if we are the ones that started the media driver, then we must be the ones to close it
|
// if we are the ones that started the media driver, then we must be the ones to close it
|
||||||
try {
|
try {
|
||||||
|
@ -933,7 +982,9 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, config: Configuration
|
||||||
// actually remove it, since we've passed all the checks to guarantee it's closed...
|
// actually remove it, since we've passed all the checks to guarantee it's closed...
|
||||||
AeronDriver.driverConfigurations.remove(driverId)
|
AeronDriver.driverConfigurations.remove(driverId)
|
||||||
|
|
||||||
logger.debug { "Aeron Driver [$driverId]: Closed the media driver at '${driverDirectory}'" }
|
if (logger.isDebugEnabled) {
|
||||||
|
logger.debug { "Aeron Driver [$driverId]: Closed the media driver at '${driverDirectory}'" }
|
||||||
|
}
|
||||||
closed = true
|
closed = true
|
||||||
|
|
||||||
return@write true
|
return@write true
|
||||||
|
@ -973,7 +1024,9 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, config: Configuration
|
||||||
*/
|
*/
|
||||||
fun deleteLogFile(image: Image) {
|
fun deleteLogFile(image: Image) {
|
||||||
val file = getMediaDriverFile(image)
|
val file = getMediaDriverFile(image)
|
||||||
driverLogger.debug { "Deleting log file: $image" }
|
if (driverLogger.isDebugEnabled) {
|
||||||
|
driverLogger.debug { "Deleting log file: $image" }
|
||||||
|
}
|
||||||
file.delete()
|
file.delete()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -83,11 +83,15 @@ internal class EventPoller {
|
||||||
|
|
||||||
fun configure(logger: KLogger, config: Configuration, endPoint: EndPoint<*>) {
|
fun configure(logger: KLogger, config: Configuration, endPoint: EndPoint<*>) {
|
||||||
lock.write {
|
lock.write {
|
||||||
logger.debug { "Initializing the Network Event Poller..." }
|
if (logger.isDebugEnabled) {
|
||||||
|
logger.debug { "Initializing the Network Event Poller..." }
|
||||||
|
}
|
||||||
configureEventsEndpoints.add(ByteArrayWrapper.wrap(endPoint.storage.publicKey))
|
configureEventsEndpoints.add(ByteArrayWrapper.wrap(endPoint.storage.publicKey))
|
||||||
|
|
||||||
if (!configured) {
|
if (!configured) {
|
||||||
logger.trace { "Configuring the Network Event Poller..." }
|
if (logger.isTraceEnabled) {
|
||||||
|
logger.trace { "Configuring the Network Event Poller..." }
|
||||||
|
}
|
||||||
|
|
||||||
delayClose = false
|
delayClose = false
|
||||||
running = true
|
running = true
|
||||||
|
@ -205,7 +209,9 @@ internal class EventPoller {
|
||||||
}
|
}
|
||||||
|
|
||||||
lock.write {
|
lock.write {
|
||||||
logger.debug { "Requesting close for the Network Event Poller..." }
|
if (logger.isDebugEnabled) {
|
||||||
|
logger.debug { "Requesting close for the Network Event Poller..." }
|
||||||
|
}
|
||||||
|
|
||||||
// ONLY if there are no more poll-events do we ACTUALLY shut down.
|
// ONLY if there are no more poll-events do we ACTUALLY shut down.
|
||||||
// when an endpoint closes its polling, it will automatically be removed from this datastructure.
|
// when an endpoint closes its polling, it will automatically be removed from this datastructure.
|
||||||
|
@ -220,19 +226,25 @@ internal class EventPoller {
|
||||||
if (running && sEvents == 0 && cEvents == 0) {
|
if (running && sEvents == 0 && cEvents == 0) {
|
||||||
when (pEvents) {
|
when (pEvents) {
|
||||||
0 -> {
|
0 -> {
|
||||||
logger.debug { "Closing the Network Event Poller..." }
|
if (logger.isDebugEnabled) {
|
||||||
|
logger.debug { "Closing the Network Event Poller..." }
|
||||||
|
}
|
||||||
doClose(logger)
|
doClose(logger)
|
||||||
}
|
}
|
||||||
1 -> {
|
1 -> {
|
||||||
// this means we are trying to close on our poll event, and obviously it won't work.
|
// this means we are trying to close on our poll event, and obviously it won't work.
|
||||||
logger.debug { "Delayed closing the Network Event Poller..." }
|
if (logger.isDebugEnabled) {
|
||||||
|
logger.debug { "Delayed closing the Network Event Poller..." }
|
||||||
|
}
|
||||||
delayClose = true
|
delayClose = true
|
||||||
}
|
}
|
||||||
else -> {
|
else -> {
|
||||||
logger.debug { "Not closing the Network Event Poller... (isRunning=$running submitEvents=$sEvents configureEvents=${cEvents} pollEvents=$pEvents)" }
|
if (logger.isDebugEnabled) {
|
||||||
|
logger.debug { "Not closing the Network Event Poller... (isRunning=$running submitEvents=$sEvents configureEvents=${cEvents} pollEvents=$pEvents)" }
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else if (logger.isDebugEnabled) {
|
||||||
logger.debug { "Not closing the Network Event Poller... (isRunning=$running submitEvents=$sEvents configureEvents=${cEvents} pollEvents=$pEvents)" }
|
logger.debug { "Not closing the Network Event Poller... (isRunning=$running submitEvents=$sEvents configureEvents=${cEvents} pollEvents=$pEvents)" }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -199,7 +199,9 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
|
||||||
if (logger.isTraceEnabled) {
|
if (logger.isTraceEnabled) {
|
||||||
// The handshake sessionId IS NOT globally unique
|
// The handshake sessionId IS NOT globally unique
|
||||||
// don't automatically create the lambda when trace is disabled! Because this uses 'outside' scoped info, it's a new lambda each time!
|
// don't automatically create the lambda when trace is disabled! Because this uses 'outside' scoped info, it's a new lambda each time!
|
||||||
logger.trace { "[$toString0] send: ${message.javaClass.simpleName} : $message" }
|
if (logger.isTraceEnabled) {
|
||||||
|
logger.trace { "[$toString0] send: ${message.javaClass.simpleName} : $message" }
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return endPoint.write(message, publication, sendIdleStrategy, this@Connection, maxMessageSize, abortEarly)
|
return endPoint.write(message, publication, sendIdleStrategy, this@Connection, maxMessageSize, abortEarly)
|
||||||
}
|
}
|
||||||
|
@ -342,7 +344,9 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.debug {"[$toString0] connection closing"}
|
if (logger.isDebugEnabled) {
|
||||||
|
logger.debug {"[$toString0] connection closing"}
|
||||||
|
}
|
||||||
|
|
||||||
// on close, we want to make sure this file is DELETED!
|
// on close, we want to make sure this file is DELETED!
|
||||||
endPoint.aeronDriver.close(subscription, toString0)
|
endPoint.aeronDriver.close(subscription, toString0)
|
||||||
|
@ -350,7 +354,9 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
|
||||||
// notify the remote endPoint that we are closing
|
// notify the remote endPoint that we are closing
|
||||||
// we send this AFTER we close our subscription (so that no more messages will be received, when the remote end ping-pong's this message back)
|
// we send this AFTER we close our subscription (so that no more messages will be received, when the remote end ping-pong's this message back)
|
||||||
if (sendDisconnectMessage && publication.isConnected) {
|
if (sendDisconnectMessage && publication.isConnected) {
|
||||||
logger.trace { "Sending disconnect message to ${endPoint.otherTypeName}" }
|
if (logger.isTraceEnabled) {
|
||||||
|
logger.trace { "Sending disconnect message to ${endPoint.otherTypeName}" }
|
||||||
|
}
|
||||||
|
|
||||||
// sometimes the remote end has already disconnected, THERE WILL BE ERRORS if this happens (but they are ok)
|
// sometimes the remote end has already disconnected, THERE WILL BE ERRORS if this happens (but they are ok)
|
||||||
send(DisconnectMessage.INSTANCE, true)
|
send(DisconnectMessage.INSTANCE, true)
|
||||||
|
@ -372,7 +378,9 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
|
||||||
val connection = this
|
val connection = this
|
||||||
endPoint.ifServer {
|
endPoint.ifServer {
|
||||||
// clean up the resources associated with this connection when it's closed
|
// clean up the resources associated with this connection when it's closed
|
||||||
logger.debug { "[${connection}] freeing resources" }
|
if (logger.isDebugEnabled) {
|
||||||
|
logger.debug { "[${connection}] freeing resources" }
|
||||||
|
}
|
||||||
sessionIdAllocator.free(info.sessionIdPub)
|
sessionIdAllocator.free(info.sessionIdPub)
|
||||||
sessionIdAllocator.free(info.sessionIdSub)
|
sessionIdAllocator.free(info.sessionIdSub)
|
||||||
|
|
||||||
|
@ -385,7 +393,9 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.debug {"[$toString0] connection closed"}
|
if (logger.isDebugEnabled) {
|
||||||
|
logger.debug {"[$toString0] connection closed"}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -521,7 +521,7 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
|
||||||
// since ANY thread can call 'send', we have to take kryo instances in a safe way
|
// since ANY thread can call 'send', we have to take kryo instances in a safe way
|
||||||
val kryo = serialization.take()
|
val kryo = serialization.take()
|
||||||
try {
|
try {
|
||||||
val buffer = kryo.write(connection, message)
|
val buffer = kryo.write(connection, message)
|
||||||
val objectSize = buffer.position()
|
val objectSize = buffer.position()
|
||||||
val internalBuffer = buffer.internalBuffer
|
val internalBuffer = buffer.internalBuffer
|
||||||
|
|
||||||
|
@ -614,7 +614,9 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
|
||||||
// the remote endPoint will send this message if it is closing the connection.
|
// the remote endPoint will send this message if it is closing the connection.
|
||||||
// IF we get this message in time, then we do not have to wait for the connection to expire before closing it
|
// IF we get this message in time, then we do not have to wait for the connection to expire before closing it
|
||||||
is DisconnectMessage -> {
|
is DisconnectMessage -> {
|
||||||
logger.debug { "Received disconnect message from $otherTypeName" }
|
if (logger.isDebugEnabled) {
|
||||||
|
logger.debug { "Received disconnect message from $otherTypeName" }
|
||||||
|
}
|
||||||
connection.close(sendDisconnectMessage = false,
|
connection.close(sendDisconnectMessage = false,
|
||||||
notifyDisconnect = true)
|
notifyDisconnect = true)
|
||||||
}
|
}
|
||||||
|
@ -657,7 +659,7 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
|
||||||
*
|
*
|
||||||
* THIS IS PROCESSED ON MULTIPLE THREADS!
|
* THIS IS PROCESSED ON MULTIPLE THREADS!
|
||||||
*/
|
*/
|
||||||
private suspend inline fun processMessageFromChannel(connection: CONNECTION, message: Any) {
|
internal fun processMessageFromChannel(connection: CONNECTION, message: Any) {
|
||||||
when (message) {
|
when (message) {
|
||||||
is Ping -> {
|
is Ping -> {
|
||||||
// PING will also measure APP latency, not just NETWORK PIPE latency
|
// PING will also measure APP latency, not just NETWORK PIPE latency
|
||||||
|
@ -883,13 +885,17 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
|
||||||
notifyDisconnect: Boolean,
|
notifyDisconnect: Boolean,
|
||||||
releaseWaitingThreads: Boolean)
|
releaseWaitingThreads: Boolean)
|
||||||
{
|
{
|
||||||
logger.debug { "Requesting close: closeEverything=$closeEverything, releaseWaitingThreads=$releaseWaitingThreads" }
|
if (logger.isDebugEnabled) {
|
||||||
|
logger.debug { "Requesting close: closeEverything=$closeEverything, releaseWaitingThreads=$releaseWaitingThreads" }
|
||||||
|
}
|
||||||
|
|
||||||
// 1) endpoints can call close()
|
// 1) endpoints can call close()
|
||||||
// 2) client can close the endpoint if the connection is D/C from aeron (and the endpoint was not closed manually)
|
// 2) client can close the endpoint if the connection is D/C from aeron (and the endpoint was not closed manually)
|
||||||
val shutdownPreviouslyStarted = shutdownInProgress.getAndSet(true)
|
val shutdownPreviouslyStarted = shutdownInProgress.getAndSet(true)
|
||||||
if (closeEverything && shutdownPreviouslyStarted) {
|
if (closeEverything && shutdownPreviouslyStarted) {
|
||||||
logger.debug { "Shutdown previously started, cleaning up..." }
|
if (logger.isDebugEnabled) {
|
||||||
|
logger.debug { "Shutdown previously started, cleaning up..." }
|
||||||
|
}
|
||||||
// this is only called when the client network event poller shuts down
|
// this is only called when the client network event poller shuts down
|
||||||
// if we have clientConnectionClosed, then run that logic (because it doesn't run on the client when the connection is closed remotely)
|
// if we have clientConnectionClosed, then run that logic (because it doesn't run on the client when the connection is closed remotely)
|
||||||
|
|
||||||
|
@ -904,7 +910,9 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
|
||||||
}
|
}
|
||||||
|
|
||||||
if (shutdownPreviouslyStarted) {
|
if (shutdownPreviouslyStarted) {
|
||||||
logger.debug { "Shutdown previously started, ignoring..." }
|
if (logger.isDebugEnabled) {
|
||||||
|
logger.debug { "Shutdown previously started, ignoring..." }
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -917,7 +925,9 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
|
||||||
}
|
}
|
||||||
|
|
||||||
EventDispatcher.CLOSE.launch {
|
EventDispatcher.CLOSE.launch {
|
||||||
logger.debug { "Shutting down endpoint..." }
|
if (logger.isDebugEnabled) {
|
||||||
|
logger.debug { "Shutting down endpoint..." }
|
||||||
|
}
|
||||||
|
|
||||||
// always do this. It is OK to run this multiple times
|
// always do this. It is OK to run this multiple times
|
||||||
// the server has to be able to call server.notifyDisconnect() on a list of connections. If we remove the connections
|
// the server has to be able to call server.notifyDisconnect() on a list of connections. If we remove the connections
|
||||||
|
@ -969,7 +979,9 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
|
||||||
shutdownInProgress.lazySet(false)
|
shutdownInProgress.lazySet(false)
|
||||||
|
|
||||||
if (releaseWaitingThreads) {
|
if (releaseWaitingThreads) {
|
||||||
logger.trace { "Counting down the close latch..." }
|
if (logger.isTraceEnabled) {
|
||||||
|
logger.trace { "Counting down the close latch..." }
|
||||||
|
}
|
||||||
closeLatch.countDown()
|
closeLatch.countDown()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -130,9 +130,13 @@ enum class EventDispatcher {
|
||||||
if (DEBUG_EVENTS) {
|
if (DEBUG_EVENTS) {
|
||||||
val id = traceId.getAndIncrement()
|
val id = traceId.getAndIncrement()
|
||||||
executors[eventId].submit {
|
executors[eventId].submit {
|
||||||
logger.debug { "Starting $event : $id" }
|
if (logger.isDebugEnabled) {
|
||||||
|
logger.debug { "Starting $event : $id" }
|
||||||
|
}
|
||||||
function()
|
function()
|
||||||
logger.debug { "Finished $event : $id" }
|
if (logger.isDebugEnabled) {
|
||||||
|
logger.debug { "Finished $event : $id" }
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
executors[eventId].submit(function)
|
executors[eventId].submit(function)
|
||||||
|
|
|
@ -551,7 +551,9 @@ internal class ListenerManager<CONNECTION: Connection>(private val logger: KLogg
|
||||||
*/
|
*/
|
||||||
fun close() {
|
fun close() {
|
||||||
// we have to follow the single-writer principle!
|
// we have to follow the single-writer principle!
|
||||||
logger.debug { "Closing the listener manager" }
|
if (logger.isDebugEnabled) {
|
||||||
|
logger.debug { "Closing the listener manager" }
|
||||||
|
}
|
||||||
|
|
||||||
onConnectFilterLock.write {
|
onConnectFilterLock.write {
|
||||||
onConnectFilterList = Array(0) { { true } }
|
onConnectFilterList = Array(0) { { true } }
|
||||||
|
|
|
@ -91,7 +91,8 @@ internal class ClientHandshake<CONNECTION: Connection>(
|
||||||
// VALIDATE:: a Registration object is the only acceptable message during the connection phase
|
// VALIDATE:: a Registration object is the only acceptable message during the connection phase
|
||||||
if (msg !is HandshakeMessage) {
|
if (msg !is HandshakeMessage) {
|
||||||
throw ClientRejectedException("[$logInfo] Connection not allowed! unrecognized message: $msg") .apply { cleanAllStackTrace() }
|
throw ClientRejectedException("[$logInfo] Connection not allowed! unrecognized message: $msg") .apply { cleanAllStackTrace() }
|
||||||
} else {
|
} else if (logger.isTraceEnabled) {
|
||||||
|
|
||||||
logger.trace { "[$logInfo] (${msg.connectKey}) received HS: $msg" }
|
logger.trace { "[$logInfo] (${msg.connectKey}) received HS: $msg" }
|
||||||
}
|
}
|
||||||
msg
|
msg
|
||||||
|
|
|
@ -67,7 +67,9 @@ internal class ClientHandshakeDriver(
|
||||||
reliable: Boolean,
|
reliable: Boolean,
|
||||||
logger: KLogger
|
logger: KLogger
|
||||||
): ClientHandshakeDriver {
|
): ClientHandshakeDriver {
|
||||||
logger.trace { "Starting client handshake" }
|
if (logger.isTraceEnabled) {
|
||||||
|
logger.trace { "Starting client handshake" }
|
||||||
|
}
|
||||||
|
|
||||||
var isUsingIPC = false
|
var isUsingIPC = false
|
||||||
|
|
||||||
|
|
|
@ -69,7 +69,9 @@ internal class Handshaker<CONNECTION : Connection>(
|
||||||
@Suppress("DuplicatedCode")
|
@Suppress("DuplicatedCode")
|
||||||
internal fun writeMessage(publication: Publication, logInfo: String, message: HandshakeMessage): Boolean {
|
internal fun writeMessage(publication: Publication, logInfo: String, message: HandshakeMessage): Boolean {
|
||||||
// The handshake sessionId IS NOT globally unique
|
// The handshake sessionId IS NOT globally unique
|
||||||
logger.trace { "[$logInfo] (${message.connectKey}) send HS: $message" }
|
if (logger.isTraceEnabled) {
|
||||||
|
logger.trace { "[$logInfo] (${message.connectKey}) send HS: $message" }
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
val buffer = handshakeWriteKryo.write(message)
|
val buffer = handshakeWriteKryo.write(message)
|
||||||
|
|
|
@ -78,7 +78,9 @@ class RandomId65kAllocator(private val min: Int, max: Int) {
|
||||||
|
|
||||||
val count = assigned.incrementAndGet()
|
val count = assigned.incrementAndGet()
|
||||||
val id = cache.take()
|
val id = cache.take()
|
||||||
logger.trace { "Allocating $id (total $count)" }
|
if (logger.isTraceEnabled) {
|
||||||
|
logger.trace { "Allocating $id (total $count)" }
|
||||||
|
}
|
||||||
return id
|
return id
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -92,7 +94,9 @@ class RandomId65kAllocator(private val min: Int, max: Int) {
|
||||||
if (assigned < 0) {
|
if (assigned < 0) {
|
||||||
throw AllocationException("Unequal allocate/free method calls attempting to free [$id] (too many 'free' calls).")
|
throw AllocationException("Unequal allocate/free method calls attempting to free [$id] (too many 'free' calls).")
|
||||||
}
|
}
|
||||||
logger.trace { "Freeing $id" }
|
if (logger.isTraceEnabled) {
|
||||||
|
logger.trace { "Freeing $id" }
|
||||||
|
}
|
||||||
cache.put(id)
|
cache.put(id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -130,7 +130,9 @@ internal class ServerHandshake<CONNECTION : Connection>(
|
||||||
}
|
}
|
||||||
|
|
||||||
// Server is the "source", client mirrors the server
|
// Server is the "source", client mirrors the server
|
||||||
logger.debug { "[${existingConnection}] (${message.connectKey}) Connection done with handshake." }
|
if (logger.isDebugEnabled) {
|
||||||
|
logger.debug { "[${existingConnection}] (${message.connectKey}) Connection done with handshake." }
|
||||||
|
}
|
||||||
|
|
||||||
existingConnection.setImage()
|
existingConnection.setImage()
|
||||||
|
|
||||||
|
@ -381,7 +383,9 @@ internal class ServerHandshake<CONNECTION : Connection>(
|
||||||
// before we notify connect, we have to wait for the client to tell us that they can receive data
|
// before we notify connect, we have to wait for the client to tell us that they can receive data
|
||||||
pendingConnections[message.connectKey] = connection
|
pendingConnections[message.connectKey] = connection
|
||||||
|
|
||||||
logger.debug { "[$logInfo] (${message.connectKey}) Connection (${connection.id}) responding to handshake hello." }
|
if (logger.isDebugEnabled) {
|
||||||
|
logger.debug { "[$logInfo] (${message.connectKey}) Connection (${connection.id}) responding to handshake hello." }
|
||||||
|
}
|
||||||
|
|
||||||
// this tells the client all the info to connect.
|
// this tells the client all the info to connect.
|
||||||
handshaker.writeMessage(handshakePublication, logInfo, successMessage) // exception is already caught!
|
handshaker.writeMessage(handshakePublication, logInfo, successMessage) // exception is already caught!
|
||||||
|
@ -618,7 +622,9 @@ internal class ServerHandshake<CONNECTION : Connection>(
|
||||||
// before we notify connect, we have to wait for the client to tell us that they can receive data
|
// before we notify connect, we have to wait for the client to tell us that they can receive data
|
||||||
pendingConnections[message.connectKey] = connection
|
pendingConnections[message.connectKey] = connection
|
||||||
|
|
||||||
logger.debug { "[$logInfo] (${message.connectKey}) Connection (${connection.id}) responding to handshake hello." }
|
if (logger.isDebugEnabled) {
|
||||||
|
logger.debug { "[$logInfo] (${message.connectKey}) Connection (${connection.id}) responding to handshake hello." }
|
||||||
|
}
|
||||||
|
|
||||||
// this tells the client all the info to connect.
|
// this tells the client all the info to connect.
|
||||||
handshaker.writeMessage(handshakePublication, logInfo, successMessage) // exception is already caught
|
handshaker.writeMessage(handshakePublication, logInfo, successMessage) // exception is already caught
|
||||||
|
|
|
@ -97,7 +97,7 @@ internal object ServerHandshakePollers {
|
||||||
// VALIDATE:: a Registration object is the only acceptable message during the connection phase
|
// VALIDATE:: a Registration object is the only acceptable message during the connection phase
|
||||||
if (msg !is HandshakeMessage) {
|
if (msg !is HandshakeMessage) {
|
||||||
throw ServerHandshakeException("[$logInfo] Connection not allowed! unrecognized message: $msg")
|
throw ServerHandshakeException("[$logInfo] Connection not allowed! unrecognized message: $msg")
|
||||||
} else {
|
} else if (logger.isTraceEnabled) {
|
||||||
logger.trace { "[$logInfo] (${msg.connectKey}) received HS: $msg" }
|
logger.trace { "[$logInfo] (${msg.connectKey}) received HS: $msg" }
|
||||||
}
|
}
|
||||||
msg
|
msg
|
||||||
|
@ -318,7 +318,7 @@ internal object ServerHandshakePollers {
|
||||||
// VALIDATE:: a Registration object is the only acceptable message during the connection phase
|
// VALIDATE:: a Registration object is the only acceptable message during the connection phase
|
||||||
if (msg !is HandshakeMessage) {
|
if (msg !is HandshakeMessage) {
|
||||||
throw ServerHandshakeException("[$logInfo] Connection not allowed! unrecognized message: $msg")
|
throw ServerHandshakeException("[$logInfo] Connection not allowed! unrecognized message: $msg")
|
||||||
} else {
|
} else if (logger.isTraceEnabled) {
|
||||||
logger.trace { "[$logInfo] (${msg.connectKey}) received HS: $msg" }
|
logger.trace { "[$logInfo] (${msg.connectKey}) received HS: $msg" }
|
||||||
}
|
}
|
||||||
msg
|
msg
|
||||||
|
|
|
@ -204,7 +204,9 @@ internal class RemoteObjectStorage(val logger: KLogger) {
|
||||||
if (nextObjectId != INVALID_RMI) {
|
if (nextObjectId != INVALID_RMI) {
|
||||||
objectMap.put(nextObjectId, `object`)
|
objectMap.put(nextObjectId, `object`)
|
||||||
|
|
||||||
logger.trace { "Remote object <proxy:$nextObjectId> registered with .toString() = '${`object`}'" }
|
if (logger.isTraceEnabled) {
|
||||||
|
logger.trace { "Remote object <proxy:$nextObjectId> registered with .toString() = '${`object`}'" }
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nextObjectId
|
return nextObjectId
|
||||||
|
@ -222,7 +224,9 @@ internal class RemoteObjectStorage(val logger: KLogger) {
|
||||||
|
|
||||||
objectMap.put(objectId, `object`)
|
objectMap.put(objectId, `object`)
|
||||||
|
|
||||||
logger.trace { "Remote object <proxy:$objectId> registered with .toString() = '${`object`}'" }
|
if (logger.isTraceEnabled) {
|
||||||
|
logger.trace { "Remote object <proxy:$objectId> registered with .toString() = '${`object`}'" }
|
||||||
|
}
|
||||||
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
@ -237,7 +241,9 @@ internal class RemoteObjectStorage(val logger: KLogger) {
|
||||||
val rmiObject = objectMap.remove(objectId) as T?
|
val rmiObject = objectMap.remove(objectId) as T?
|
||||||
returnId(objectId)
|
returnId(objectId)
|
||||||
|
|
||||||
logger.trace { "Object <proxy #${objectId}> removed" }
|
if (logger.isTraceEnabled) {
|
||||||
|
logger.trace { "Object <proxy #${objectId}> removed" }
|
||||||
|
}
|
||||||
return rmiObject
|
return rmiObject
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -252,7 +258,9 @@ internal class RemoteObjectStorage(val logger: KLogger) {
|
||||||
} else {
|
} else {
|
||||||
returnId(objectId)
|
returnId(objectId)
|
||||||
|
|
||||||
logger.trace { "Object '${remoteObject}' (ID: ${objectId}) removed from RMI system." }
|
if (logger.isTraceEnabled) {
|
||||||
|
logger.trace { "Object '${remoteObject}' (ID: ${objectId}) removed from RMI system." }
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -81,7 +81,9 @@ internal class ResponseManager(maxValuesInCache: Int = 65534, minimumValue: Int
|
||||||
* NOTE: async RMI will never call this (because async doesn't return a response)
|
* NOTE: async RMI will never call this (because async doesn't return a response)
|
||||||
*/
|
*/
|
||||||
fun notifyWaiter(id: Int, result: Any?, logger: KLogger) {
|
fun notifyWaiter(id: Int, result: Any?, logger: KLogger) {
|
||||||
logger.trace { "[RM] notify: $id" }
|
if (logger.isTraceEnabled) {
|
||||||
|
logger.trace { "[RM] notify: $id" }
|
||||||
|
}
|
||||||
|
|
||||||
val previous = pendingLock.write {
|
val previous = pendingLock.write {
|
||||||
val previous = pending[id]
|
val previous = pending[id]
|
||||||
|
@ -91,7 +93,9 @@ internal class ResponseManager(maxValuesInCache: Int = 65534, minimumValue: Int
|
||||||
|
|
||||||
// if NULL, since either we don't exist (because we were async), or it was cancelled
|
// if NULL, since either we don't exist (because we were async), or it was cancelled
|
||||||
if (previous is ResponseWaiter) {
|
if (previous is ResponseWaiter) {
|
||||||
logger.trace { "[RM] valid-notify: $id" }
|
if (logger.isTraceEnabled) {
|
||||||
|
logger.trace { "[RM] valid-notify: $id" }
|
||||||
|
}
|
||||||
|
|
||||||
// this means we were NOT timed out! (we cannot be timed out here)
|
// this means we were NOT timed out! (we cannot be timed out here)
|
||||||
previous.doNotify()
|
previous.doNotify()
|
||||||
|
@ -104,7 +108,9 @@ internal class ResponseManager(maxValuesInCache: Int = 65534, minimumValue: Int
|
||||||
* This is ONLY called when we want to get the data out of the stored entry, because we are operating ASYNC. (pure RMI async is different)
|
* This is ONLY called when we want to get the data out of the stored entry, because we are operating ASYNC. (pure RMI async is different)
|
||||||
*/
|
*/
|
||||||
fun <T> removeWaiterCallback(id: Int, logger: KLogger): T? {
|
fun <T> removeWaiterCallback(id: Int, logger: KLogger): T? {
|
||||||
logger.trace { "[RM] get-callback: $id" }
|
if (logger.isTraceEnabled) {
|
||||||
|
logger.trace { "[RM] get-callback: $id" }
|
||||||
|
}
|
||||||
|
|
||||||
val previous = pendingLock.write {
|
val previous = pendingLock.write {
|
||||||
val previous = pending[id]
|
val previous = pending[id]
|
||||||
|
@ -135,7 +141,9 @@ internal class ResponseManager(maxValuesInCache: Int = 65534, minimumValue: Int
|
||||||
fun prep(logger: KLogger): ResponseWaiter {
|
fun prep(logger: KLogger): ResponseWaiter {
|
||||||
val waiter = waiterCache.take()
|
val waiter = waiterCache.take()
|
||||||
rmiWaitersInUse.getAndIncrement()
|
rmiWaitersInUse.getAndIncrement()
|
||||||
logger.trace { "[RM] prep in-use: ${rmiWaitersInUse.value}" }
|
if (logger.isTraceEnabled) {
|
||||||
|
logger.trace { "[RM] prep in-use: ${rmiWaitersInUse.value}" }
|
||||||
|
}
|
||||||
|
|
||||||
// this will initialize the result
|
// this will initialize the result
|
||||||
waiter.prep()
|
waiter.prep()
|
||||||
|
@ -155,7 +163,9 @@ internal class ResponseManager(maxValuesInCache: Int = 65534, minimumValue: Int
|
||||||
fun prepWithCallback(logger: KLogger, function: Any): Int {
|
fun prepWithCallback(logger: KLogger, function: Any): Int {
|
||||||
val waiter = waiterCache.take()
|
val waiter = waiterCache.take()
|
||||||
rmiWaitersInUse.getAndIncrement()
|
rmiWaitersInUse.getAndIncrement()
|
||||||
logger.trace { "[RM] prep in-use: ${rmiWaitersInUse.value}" }
|
if (logger.isTraceEnabled) {
|
||||||
|
logger.trace { "[RM] prep in-use: ${rmiWaitersInUse.value}" }
|
||||||
|
}
|
||||||
|
|
||||||
// this will initialize the result
|
// this will initialize the result
|
||||||
waiter.prep()
|
waiter.prep()
|
||||||
|
@ -188,7 +198,9 @@ internal class ResponseManager(maxValuesInCache: Int = 65534, minimumValue: Int
|
||||||
): Any? {
|
): Any? {
|
||||||
val id = RmiUtils.unpackUnsignedRight(responseWaiter.id)
|
val id = RmiUtils.unpackUnsignedRight(responseWaiter.id)
|
||||||
|
|
||||||
logger.trace { "[RM] get: $id" }
|
if (logger.isTraceEnabled) {
|
||||||
|
logger.trace { "[RM] get: $id" }
|
||||||
|
}
|
||||||
|
|
||||||
// deletes the entry in the map
|
// deletes the entry in the map
|
||||||
val resultOrWaiter = pendingLock.write {
|
val resultOrWaiter = pendingLock.write {
|
||||||
|
@ -203,7 +215,9 @@ internal class ResponseManager(maxValuesInCache: Int = 65534, minimumValue: Int
|
||||||
rmiWaitersInUse.getAndDecrement()
|
rmiWaitersInUse.getAndDecrement()
|
||||||
|
|
||||||
if (resultOrWaiter is ResponseWaiter) {
|
if (resultOrWaiter is ResponseWaiter) {
|
||||||
logger.trace { "[RM] timeout cancel ($timeoutMillis): $id" }
|
if (logger.isTraceEnabled) {
|
||||||
|
logger.trace { "[RM] timeout cancel ($timeoutMillis): $id" }
|
||||||
|
}
|
||||||
|
|
||||||
return if (connection.isClosed() || connection.isClosed()) {
|
return if (connection.isClosed() || connection.isClosed()) {
|
||||||
null
|
null
|
||||||
|
@ -217,7 +231,9 @@ internal class ResponseManager(maxValuesInCache: Int = 65534, minimumValue: Int
|
||||||
|
|
||||||
fun close() {
|
fun close() {
|
||||||
// technically, this isn't closing it, so much as it's cleaning it out
|
// technically, this isn't closing it, so much as it's cleaning it out
|
||||||
logger.debug { "Closing the response manager for RMI" }
|
if (logger.isDebugEnabled) {
|
||||||
|
logger.debug { "Closing the response manager for RMI" }
|
||||||
|
}
|
||||||
|
|
||||||
// wait for responses, or wait for timeouts!
|
// wait for responses, or wait for timeouts!
|
||||||
while (rmiWaitersInUse.value > 0) {
|
while (rmiWaitersInUse.value > 0) {
|
||||||
|
|
|
@ -713,19 +713,27 @@ open class Serialization<CONNECTION: Connection>(private val references: Boolean
|
||||||
|
|
||||||
when (typeId) {
|
when (typeId) {
|
||||||
0 -> {
|
0 -> {
|
||||||
logger.trace { "REGISTRATION (0) ${clazz.name}" }
|
if (logger.isTraceEnabled) {
|
||||||
|
logger.trace { "REGISTRATION (0) ${clazz.name}" }
|
||||||
|
}
|
||||||
newRegistrations.add(ClassRegistration0(clazz, maker.newInstantiatorOf(Class.forName(serializerName)).newInstance() as Serializer<Any>))
|
newRegistrations.add(ClassRegistration0(clazz, maker.newInstantiatorOf(Class.forName(serializerName)).newInstance() as Serializer<Any>))
|
||||||
}
|
}
|
||||||
1 -> {
|
1 -> {
|
||||||
logger.trace { "REGISTRATION (1) ${clazz.name} :: $id" }
|
if (logger.isTraceEnabled) {
|
||||||
|
logger.trace { "REGISTRATION (1) ${clazz.name} :: $id" }
|
||||||
|
}
|
||||||
newRegistrations.add(ClassRegistration1(clazz, id))
|
newRegistrations.add(ClassRegistration1(clazz, id))
|
||||||
}
|
}
|
||||||
2 -> {
|
2 -> {
|
||||||
logger.trace { "REGISTRATION (2) ${clazz.name} :: $id" }
|
if (logger.isTraceEnabled) {
|
||||||
|
logger.trace { "REGISTRATION (2) ${clazz.name} :: $id" }
|
||||||
|
}
|
||||||
newRegistrations.add(ClassRegistration2(clazz, maker.newInstantiatorOf(Class.forName(serializerName)).newInstance() as Serializer<Any>, id))
|
newRegistrations.add(ClassRegistration2(clazz, maker.newInstantiatorOf(Class.forName(serializerName)).newInstance() as Serializer<Any>, id))
|
||||||
}
|
}
|
||||||
3 -> {
|
3 -> {
|
||||||
logger.trace { "REGISTRATION (3) ${clazz.name}" }
|
if (logger.isTraceEnabled) {
|
||||||
|
logger.trace { "REGISTRATION (3) ${clazz.name}" }
|
||||||
|
}
|
||||||
newRegistrations.add(ClassRegistration3(clazz))
|
newRegistrations.add(ClassRegistration3(clazz))
|
||||||
}
|
}
|
||||||
4 -> {
|
4 -> {
|
||||||
|
@ -746,11 +754,13 @@ open class Serialization<CONNECTION: Connection>(private val references: Boolean
|
||||||
|
|
||||||
// NOTE: implClass can still be null!
|
// NOTE: implClass can still be null!
|
||||||
|
|
||||||
logger.trace {
|
if (logger.isTraceEnabled) {
|
||||||
if (implClass != null) {
|
logger.trace {
|
||||||
"REGISTRATION (RMI-CLIENT) ${clazz.name} -> ${implClass.name}"
|
if (implClass != null) {
|
||||||
} else {
|
"REGISTRATION (RMI-CLIENT) ${clazz.name} -> ${implClass.name}"
|
||||||
"REGISTRATION (RMI-CLIENT) ${clazz.name}"
|
} else {
|
||||||
|
"REGISTRATION (RMI-CLIENT) ${clazz.name}"
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue