Work on cleaning up how exceptions are thrown/logged

This commit is contained in:
nathan 2020-08-23 01:12:02 +02:00
parent 389af93f0a
commit 59773b4cce
11 changed files with 117 additions and 74 deletions

View File

@ -93,7 +93,7 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
/**
* So the client class can get remote objects that are THE SAME OBJECT as if called from a connection
*/
override fun getRmiConnectionSupport(): RmiManagerConnections {
override fun getRmiConnectionSupport(): RmiManagerConnections<CONNECTION> {
return rmiConnectionSupport
}

View File

@ -15,15 +15,13 @@
*/
package dorkbox.network.connection
import dorkbox.network.Configuration
import dorkbox.util.collections.ConcurrentEntry
import dorkbox.util.collections.ConcurrentIterator
import dorkbox.util.collections.ConcurrentIterator.headREF
import mu.KLogger
// .equals() compares the identity on purpose,this because we cannot create two separate objects that are somehow equal to each other.
@Suppress("UNCHECKED_CAST")
internal open class ConnectionManager<CONNECTION: Connection>(val logger: KLogger, val config: Configuration) {
internal open class ConnectionManager<CONNECTION: Connection>() {
private val connections = ConcurrentIterator<CONNECTION>()

View File

@ -15,5 +15,6 @@
*/
package dorkbox.network.connection
data class ConnectionParams<C: Connection>(val endPoint: EndPoint<C>, val mediaDriverConnection: MediaDriverConnection,
val publicKeyValidation: PublicKeyValidationState)
data class ConnectionParams<C : Connection>(val endPoint: EndPoint<C>,
val mediaDriverConnection: MediaDriverConnection,
val publicKeyValidation: PublicKeyValidationState)

View File

@ -106,10 +106,9 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
internal val autoClosableObjects = CopyOnWriteArrayList<AutoCloseable>()
internal val actionDispatch = CoroutineScope(Dispatchers.Default)
// internal val connectionActor = actionDispatch.connectionActor()
internal val listenerManager = ListenerManager<CONNECTION>()
internal val connections = ConnectionManager<CONNECTION>(logger, config)
internal val connections = ConnectionManager<CONNECTION>()
internal val mediaDriverContext: MediaDriver.Context
internal val mediaDriver: MediaDriver
@ -133,7 +132,7 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
// we only want one instance of these created. These will be called appropriately
val settingsStore: SettingsStore
internal val rmiGlobalSupport = RmiManagerGlobal(logger, actionDispatch, config.serialization)
internal val rmiGlobalSupport = RmiManagerGlobal<CONNECTION>(logger, actionDispatch, config.serialization)
init {
logger.error("NETWORK STACK IS ONLY IPV4 AT THE MOMENT. IPV6 is in progress!")
@ -318,7 +317,7 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
* Used for the client, because the client only has ONE ever support connection, and it allows us to create connection specific objects
* from a "global" context
*/
internal open fun getRmiConnectionSupport() : RmiManagerConnections {
internal open fun getRmiConnectionSupport() : RmiManagerConnections<CONNECTION> {
return RmiManagerConnections(logger, rmiGlobalSupport, serialization)
}
@ -512,6 +511,8 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
serialization.returnKryo(kryo)
}
connection as CONNECTION
when (message) {
is PingMessage -> {
// the ping listener (internal use only!)
@ -532,17 +533,27 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
}
is Any -> {
@Suppress("UNCHECKED_CAST")
var hasListeners = listenerManager.notifyOnMessage(connection as CONNECTION, message)
var hasListeners = listenerManager.notifyOnMessage(connection, message)
// each connection registers, and is polled INDEPENDENTLY for messages.
hasListeners = hasListeners or connection.notifyOnMessage(message)
if (!hasListeners) {
listenerManager.notifyError(connection, MessageNotRegisteredException("No message callbacks found for ${message::class.java.simpleName}"))
val exception = MessageNotRegisteredException("No message callbacks found for ${message::class.java.simpleName}")
ListenerManager.cleanStackTrace(exception)
listenerManager.notifyError(connection, exception)
}
}
else -> {
// do nothing, there were problems with the message
val exception = if (message != null) {
MessageNotRegisteredException("No message callbacks found for ${message::class.java.simpleName}")
} else {
MessageNotRegisteredException("Unknown message received!!")
}
ListenerManager.cleanStackTrace(exception)
listenerManager.notifyError(exception)
}
}
}

View File

@ -42,27 +42,32 @@ internal class ListenerManager<CONNECTION: Connection> {
*
* Neither of these are useful in resolving exception handling from a users perspective, and only clutter the stacktrace.
*/
fun cleanStackTraceReverse(throwable: Throwable) {
fun cleanStackTrace(throwable: Throwable) {
// NOTE: when we remove stuff, we ONLY want to remove the "tail" of the stacktrace, not ALL parts of the stacktrace
val stackTrace = throwable.stackTrace
var newEndIndex = Math.max(0, stackTrace.size - 1)
var newEndIndex = stackTrace.size -1 // offset by 1 because we have to adjust for the access index
if (newEndIndex > 0) {
for (i in newEndIndex downTo 0) {
val stackName = stackTrace[i].className
if (i == newEndIndex) {
if (stackName.startsWith("kotlinx.coroutines.") ||
stackName.startsWith("kotlin.coroutines.") ||
stackName.startsWith("dorkbox.network.")) {
newEndIndex--
} else {
break
}
for (i in newEndIndex downTo 0) {
val stackName = stackTrace[i].className
if (i == newEndIndex) {
if (stackName.startsWith("kotlinx.coroutines.") ||
stackName.startsWith("kotlin.coroutines.") ||
stackName.startsWith("dorkbox.network.")) {
newEndIndex--
} else {
break
}
}
}
newEndIndex++ // have to add 1 back, because a copy must be by size (and we access from 0)
if (newEndIndex > 0) {
// newEndIndex will also remove the VERY LAST CachedMethod or CachedAsmMethod access invocation (because it's offset by 1)
throwable.stackTrace = stackTrace.copyOfRange(0, newEndIndex)
} else {
// keep just one, since it's a stack frame INSIDE our network library, and we need that!
throwable.stackTrace = stackTrace.copyOfRange(0, 1)
}
}
}
@ -313,7 +318,7 @@ internal class ListenerManager<CONNECTION: Connection> {
it(connection)
} catch (t: Throwable) {
// NOTE: when we remove stuff, we ONLY want to remove the "tail" of the stacktrace, not ALL parts of the stacktrace
cleanStackTraceReverse(t)
cleanStackTrace(t)
notifyError(connection, t)
}
}
@ -328,7 +333,7 @@ internal class ListenerManager<CONNECTION: Connection> {
it(connection)
} catch (t: Throwable) {
// NOTE: when we remove stuff, we ONLY want to remove the "tail" of the stacktrace, not ALL parts of the stacktrace
cleanStackTraceReverse(t)
cleanStackTrace(t)
notifyError(connection, t)
}
}
@ -336,7 +341,7 @@ internal class ListenerManager<CONNECTION: Connection> {
/**
* Invoked when there is an error for a specific connection
* <p>
*
* The error is also sent to an error log before notifying callbacks
*/
suspend fun notifyError(connection: CONNECTION, exception: Throwable) {
@ -347,7 +352,7 @@ internal class ListenerManager<CONNECTION: Connection> {
/**
* Invoked when there is an error in general
* <p>
*
* The error is also sent to an error log before notifying callbacks
*/
suspend fun notifyError(exception: Throwable) {
@ -392,6 +397,7 @@ internal class ListenerManager<CONNECTION: Connection> {
try {
func(connection, message)
} catch (t: Throwable) {
cleanStackTrace(t)
notifyError(connection, t)
}
}

View File

@ -49,10 +49,10 @@ internal class ClientHandshake<CONNECTION: Connection>(private val logger: KLogg
private var failed: Exception? = null
lateinit var handler: FragmentHandler
lateinit var endPoint: EndPoint<*>
lateinit var endPoint: EndPoint<CONNECTION>
var sessionId: Int = 0
fun init(endPoint: EndPoint<*>) {
fun init(endPoint: EndPoint<CONNECTION>) {
this.endPoint = endPoint
// now we have a bi-directional connection with the server on the handshake "socket".

View File

@ -215,8 +215,9 @@ internal class ServerHandshake<CONNECTION : Connection>(private val logger: KLog
logger.error("Error creating new connection")
listenerManager.notifyError(connection,
ClientRejectedException("Connection was not permitted!"))
val exception = ClientRejectedException("Connection was not permitted!")
ListenerManager.cleanStackTrace(exception)
listenerManager.notifyError(connection, exception)
endPoint.writeHandshakeMessage(handshakePublication,
HandshakeMessage.error("Connection was not permitted!"))

View File

@ -256,13 +256,13 @@ internal class RmiClient(val isGlobal: Boolean,
val fancyName = RmiUtils.makeFancyMethodName(method)
val exception = TimeoutException("Response timed out: $fancyName")
// from top down, clean up the coroutine stack
RmiUtils.cleanStackTraceForProxy(exception, RmiClient::class.java)
RmiUtils.cleanStackTraceForProxy(exception)
continuation.resumeWithException(exception)
}
is Exception -> {
// reconstruct the stack trace, so the calling method knows where the method invocation happened, and can trace the call
// this stack will ALWAYS run up to this method (so we remove from the top->down, to get to the call site)
RmiUtils.cleanStackTraceForProxy(Exception(), RmiClient::class.java, any)
RmiUtils.cleanStackTraceForProxy(Exception(), any)
continuation.resumeWithException(any)
}
else -> {
@ -279,13 +279,13 @@ internal class RmiClient(val isGlobal: Boolean,
val fancyName = RmiUtils.makeFancyMethodName(method)
val exception = TimeoutException("Response timed out: $fancyName")
// from top down, clean up the coroutine stack
RmiUtils.cleanStackTraceForProxy(exception, RmiClient::class.java)
RmiUtils.cleanStackTraceForProxy(exception)
throw exception
}
is Exception -> {
// reconstruct the stack trace, so the calling method knows where the method invocation happened, and can trace the call
// this stack will ALWAYS run up to this method (so we remove from the top->down, to get to the call site)
RmiUtils.cleanStackTraceForProxy(Exception(), RmiClient::class.java, any)
RmiUtils.cleanStackTraceForProxy(Exception(), any)
throw any
}
else -> {

View File

@ -17,14 +17,15 @@ package dorkbox.network.rmi
import dorkbox.network.connection.Connection
import dorkbox.network.connection.EndPoint
import dorkbox.network.connection.ListenerManager
import dorkbox.network.rmi.messages.ConnectionObjectCreateRequest
import dorkbox.network.rmi.messages.ConnectionObjectCreateResponse
import dorkbox.network.serialization.NetworkSerializationManager
import dorkbox.util.collections.LockFreeIntMap
import mu.KLogger
internal class RmiManagerConnections(logger: KLogger,
val rmiGlobalSupport: RmiManagerGlobal,
internal class RmiManagerConnections<CONNECTION: Connection>(logger: KLogger,
val rmiGlobalSupport: RmiManagerGlobal<CONNECTION>,
private val serialization: NetworkSerializationManager) : RmiObjectCache(logger) {
// It is critical that all of the RMI proxy objects are unique, and are saved/cached PER CONNECTION. These cannot be shared between connections!
@ -82,7 +83,7 @@ internal class RmiManagerConnections(logger: KLogger,
/**
* called on "server"
*/
internal suspend fun onConnectionObjectCreateRequest(endPoint: EndPoint<*>, connection: Connection, message: ConnectionObjectCreateRequest, logger: KLogger) {
suspend fun onConnectionObjectCreateRequest(endPoint: EndPoint<CONNECTION>, connection: CONNECTION, message: ConnectionObjectCreateRequest) {
val interfaceClassId = RmiUtils.unpackLeft(message.packedIds)
val callbackId = RmiUtils.unpackRight(message.packedIds)
@ -93,7 +94,8 @@ internal class RmiManagerConnections(logger: KLogger,
val response = if (implObject is Exception) {
// whoops!
logger.error("Unable to create remote object!", implObject)
ListenerManager.cleanStackTrace(implObject)
endPoint.listenerManager.notifyError(connection, implObject)
// we send the message ANYWAYS, because the client needs to know it did NOT succeed!
ConnectionObjectCreateResponse(RmiUtils.packShorts(callbackId, RemoteObjectStorage.INVALID_RMI))
@ -104,13 +106,13 @@ internal class RmiManagerConnections(logger: KLogger,
// this means we could register this object.
// next, scan this object to see if there are any RMI fields
RmiManagerGlobal.scanImplForRmiFields(logger, implObject) {
RmiManagerGlobal.scanImplForRmiFields(endPoint.logger, implObject) {
saveImplObject(it)
}
} else {
logger.error {
"Trying to create an RMI object with the INVALID_RMI id!!"
}
val exception = NullPointerException("Trying to create an RMI object with the INVALID_RMI id!!")
ListenerManager.cleanStackTrace(exception)
endPoint.listenerManager.notifyError(connection, exception)
}
// we send the message ANYWAYS, because the client needs to know it did NOT succeed!

View File

@ -17,6 +17,7 @@ package dorkbox.network.rmi
import dorkbox.network.connection.Connection
import dorkbox.network.connection.EndPoint
import dorkbox.network.connection.ListenerManager
import dorkbox.network.rmi.messages.ConnectionObjectCreateRequest
import dorkbox.network.rmi.messages.ConnectionObjectCreateResponse
import dorkbox.network.rmi.messages.GlobalObjectCreateRequest
@ -31,9 +32,9 @@ import mu.KLogger
import java.lang.reflect.Proxy
import java.util.*
internal class RmiManagerGlobal(logger: KLogger,
actionDispatch: CoroutineScope,
internal val serialization: NetworkSerializationManager) : RmiObjectCache(logger) {
internal class RmiManagerGlobal<CONNECTION : Connection>(logger: KLogger,
actionDispatch: CoroutineScope,
internal val serialization: NetworkSerializationManager) : RmiObjectCache(logger) {
companion object {
/**
* Returns a proxy object that implements the specified interface, and the methods invoked on the proxy object will be invoked
@ -214,15 +215,16 @@ internal class RmiManagerGlobal(logger: KLogger,
/**
* called on "client"
*/
private fun onGenericObjectResponse(endPoint: EndPoint<*>, connection: Connection, logger: KLogger,
isGlobal: Boolean, rmiId: Int, callback: suspend (Int, Any) -> Unit,
rmiObjectCache: RmiObjectCache, serialization: NetworkSerializationManager) {
private suspend fun onGenericObjectResponse(endPoint: EndPoint<CONNECTION>,
connection: CONNECTION,
isGlobal: Boolean,
rmiId: Int,
callback: suspend (Int, Any) -> Unit,
serialization: NetworkSerializationManager) {
// we only create the proxy + execute the callback if the RMI id is valid!
if (rmiId == RemoteObjectStorage.INVALID_RMI) {
logger.error {
"RMI ID '${rmiId}' is invalid. Unable to create RMI object on server."
}
endPoint.listenerManager.notifyError(connection, Exception("RMI ID '${rmiId}' is invalid. Unable to create RMI object on server."))
return
}
@ -240,7 +242,8 @@ internal class RmiManagerGlobal(logger: KLogger,
try {
callback(rmiId, proxyObject)
} catch (e: Exception) {
logger.error("Error getting or creating the remote object $interfaceClass", e)
ListenerManager.cleanStackTrace(e)
endPoint.listenerManager.notifyError(e)
}
}
}
@ -267,13 +270,15 @@ internal class RmiManagerGlobal(logger: KLogger,
*/
@Suppress("DuplicatedCode")
@Throws(IllegalArgumentException::class)
suspend fun manage(endPoint: EndPoint<*>, connection: Connection, message: Any, logger: KLogger) {
suspend fun manage(endPoint: EndPoint<CONNECTION>, connection: CONNECTION, message: Any, logger: KLogger) {
when (message) {
is ConnectionObjectCreateRequest -> {
/**
* called on "server"
*/
connection.rmiConnectionSupport.onConnectionObjectCreateRequest(endPoint, connection, message, logger)
@Suppress("UNCHECKED_CAST")
val rmiConnectionSupport: RmiManagerConnections<CONNECTION> = connection.rmiConnectionSupport as RmiManagerConnections<CONNECTION>
rmiConnectionSupport.onConnectionObjectCreateRequest(endPoint, connection, message)
}
is ConnectionObjectCreateResponse -> {
/**
@ -282,7 +287,7 @@ internal class RmiManagerGlobal(logger: KLogger,
val callbackId = RmiUtils.unpackLeft(message.packedIds)
val rmiId = RmiUtils.unpackRight(message.packedIds)
val callback = removeCallback(callbackId)
onGenericObjectResponse(endPoint, connection, logger, false, rmiId, callback, this, serialization)
onGenericObjectResponse(endPoint, connection, false, rmiId, callback, serialization)
}
is GlobalObjectCreateRequest -> {
/**
@ -297,7 +302,7 @@ internal class RmiManagerGlobal(logger: KLogger,
val callbackId = RmiUtils.unpackLeft(message.packedIds)
val rmiId = RmiUtils.unpackRight(message.packedIds)
val callback = removeCallback(callbackId)
onGenericObjectResponse(endPoint, connection, logger, true, rmiId, callback, this, serialization)
onGenericObjectResponse(endPoint, connection, true, rmiId, callback, serialization)
}
is MethodRequest -> {
/**
@ -320,8 +325,8 @@ internal class RmiManagerGlobal(logger: KLogger,
val implObject = getImplObject<Any>(isGlobal, rmiObjectId, connection)
if (implObject == null) {
logger.error("Unable to resolve implementation object for [global=$isGlobal, objectID=$rmiObjectId, connection=$connection")
endPoint.listenerManager.notifyError(connection,
NullPointerException("Unable to resolve implementation object for [global=$isGlobal, objectID=$rmiObjectId, connection=$connection"))
if (sendResponse) {
returnRmiMessage(connection,
message,
@ -365,7 +370,7 @@ internal class RmiManagerGlobal(logger: KLogger,
var insideResult: Any?
try {
// args!! is safe to do here (even though it doesn't make sense)
insideResult = cachedMethod.invoke(connection, implObject, args)
// insideResult = cachedMethod.invoke(connection, implObject, args)
} catch (ex: Exception) {
insideResult = ex.cause
// added to prevent a stack overflow when references is false, (because 'cause' == "this").
@ -377,12 +382,9 @@ internal class RmiManagerGlobal(logger: KLogger,
else {
insideResult.initCause(null)
}
RmiUtils.cleanStackTraceForImpl(insideResult as Exception, true)
val fancyName = RmiUtils.makeFancyMethodName(cachedMethod)
logger.error("Error invoking method: $fancyName", insideResult)
}
insideResult
// insideResult
Exception(":ASDASDUJHAKDSGJFHAKHDLA")
}
@ -395,6 +397,15 @@ internal class RmiManagerGlobal(logger: KLogger,
// return value!
suspendResult = null
}
else if (suspendResult is Exception) {
RmiUtils.cleanStackTraceForImpl(suspendResult, true)
val fancyName = RmiUtils.makeFancyMethodName(cachedMethod)
val exception = Exception("Error invoking method: $fancyName", suspendResult)
RmiUtils.cleanStackTraceForImpl(exception, true)
endPoint.listenerManager.notifyError(connection, exception)
}
if (sendResponse) {
returnRmiMessage(connection, message, suspendResult, logger)
@ -448,7 +459,7 @@ internal class RmiManagerGlobal(logger: KLogger,
/**
* called on "server"
*/
private suspend fun onGlobalObjectCreateRequest(endPoint: EndPoint<*>, connection: Connection, message: GlobalObjectCreateRequest, logger: KLogger) {
private suspend fun onGlobalObjectCreateRequest(endPoint: EndPoint<CONNECTION>, connection: Connection, message: GlobalObjectCreateRequest, logger: KLogger) {
val interfaceClassId = RmiUtils.unpackLeft(message.packedIds)
val callbackId = RmiUtils.unpackRight(message.packedIds)
val objectParameters = message.objectParameters

View File

@ -477,10 +477,13 @@ object RmiUtils {
*
* We do this because these stack frames are not useful in resolving exception handling from a users perspective, and only clutter the stacktrace.
*/
fun cleanStackTraceForProxy(localThrowable: Throwable, invokingClass: Class<*>, remoteException: Exception? = null) {
val myClassName = invokingClass.name
val stackTrace = localThrowable.stackTrace
fun cleanStackTraceForProxy(localException: Exception, remoteException: Exception? = null) {
val myClassName = RmiClient::class.java.name
val stackTrace = localException.stackTrace
var newStartIndex = 0
var newEndIndex = stackTrace.size-1
// step 1: Find the start of our method invocation
for (element in stackTrace) {
newStartIndex++
@ -491,14 +494,24 @@ object RmiUtils {
}
}
// step 2: now we have to find the END index, since a proxy invocation ALWAYS happens starting from our network stack
for (i in stackTrace.size-1 downTo 0) {
newEndIndex--
val stackClassName = stackTrace[i].className
if (stackClassName.startsWith("dorkbox.network.rmi.")) {
break
}
}
if (remoteException == null) {
// no remote exception, just cleanup our own callstack
localThrowable.stackTrace = stackTrace.copyOfRange(newStartIndex, stackTrace.size)
localException.stackTrace = stackTrace.copyOfRange(newStartIndex, newEndIndex)
} else {
// merge this info into the remote exception, so we can get the correct call stack info
val newStack = Array<StackTraceElement>(remoteException.stackTrace.size + stackTrace.size - newStartIndex) { stackTrace[0] }
val newStack = Array<StackTraceElement>(remoteException.stackTrace.size + newEndIndex - newStartIndex) { stackTrace[0] }
remoteException.stackTrace.copyInto(newStack)
stackTrace.copyInto(newStack, remoteException.stackTrace.size, newStartIndex)
stackTrace.copyInto(newStack, remoteException.stackTrace.size, newStartIndex, newEndIndex)
remoteException.stackTrace = newStack
}