Executor/src/dorkbox/executor/DeferredProcessResult.kt

574 lines
20 KiB
Kotlin

/*
* Copyright 2023 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.executor
import dorkbox.executor.Executor.Companion.IO_DISPATCH
import dorkbox.executor.exceptions.InvalidExitValueException
import dorkbox.executor.listener.ProcessListener
import dorkbox.executor.processResults.AsyncProcessOutput
import dorkbox.executor.processResults.ProcessResult
import dorkbox.executor.processResults.SyncProcessResult
import dorkbox.executor.stop.ProcessStopper
import dorkbox.executor.stream.IOStreamHandler
import dorkbox.executor.stream.PumpStreamHandler
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.slf4j.MDC
import java.io.ByteArrayOutputStream
import java.io.IOException
import java.util.concurrent.*
import kotlin.text.Charsets.UTF_8
/**
 * Bundle of collaborators and settings handed to a [DeferredProcessResult] when it is constructed.
 */
internal data class Params(
    /** Main attributes (command, directory, environment, allowed exit values) the process was started with. */
    val processAttributes: ProcessAttributes,

    /** Stops the process when waiting for it did not complete normally. */
    val stopper: ProcessStopper,

    /** Receives the `afterFinish` / `afterStop` callbacks (never `null`). */
    val listener: ProcessListener,

    /** Handles the input and output streams of the subprocess. */
    val streams: IOStreamHandler,

    /** Optional logger for messages about the process stopping; `null` is passed through to the log helper as-is. */
    val logger: Logger?,

    /** Lets the owner append extra detail to a timeout/cancellation message before the exception is created. */
    val errorMessageHandler: (StringBuilder) -> Unit,

    /** Time limit for closing the process streams; `0` closes them directly without a time limit. */
    val closeTimeout: Long,

    /** Unit of [closeTimeout]. */
    val closeTimeoutUnit: TimeUnit,

    /** `true` when the output is pumped through a channel and must be collected from it after the process exits. */
    val asyncProcessStart: Boolean
)
class DeferredProcessResult internal constructor(private val process: Process,
private val params: Params,
private val createProcessResults: (Long, Int) -> SyncProcessResult) {
companion object {
    /** UTF-8 encoded line terminator. */
    private val EOL = "\n".toByteArray(UTF_8)

    private val log = LoggerFactory.getLogger(DeferredProcessResult::class.java)

    /**
     * Upper bound on how much process output is copied into an [InvalidExitValueException] message.
     *
     * Longer output is cut down to its first and last halves of this limit.
     */
    private const val MAX_OUTPUT_SIZE_IN_ERROR_MESSAGE = 5000

    /**
     * Verifies that the exit value of [result] is one of the allowed exit values of [attributes].
     *
     * An empty collection of allowed values accepts every exit value.
     *
     * @throws InvalidExitValueException if the exit value is not allowed
     */
    internal fun checkExit(attributes: ProcessAttributes, result: ProcessResult) {
        val permitted = attributes.allowedExitValues
        val actual = result.exitValue

        if (permitted.isEmpty() || permitted.contains(actual)) {
            return
        }

        val message = StringBuilder()
        message.append("Unexpected exit value: ").append(actual)
        message.append(", allowed exit values: ").append(permitted)

        // only synchronous results carry output that can be quoted in the message
        if (result.hasOutput && result is SyncProcessResult) {
            addExceptionMessageSuffix(attributes, message, result.output.string())
        }

        throw InvalidExitValueException(message.toString(), result)
    }

    /**
     * Appends the executed command, its directory and environment (when set) and the (possibly truncated)
     * process output to [sb].
     */
    internal fun addExceptionMessageSuffix(attributes: ProcessAttributes, sb: StringBuilder, outputText: String) {
        sb.append(", executed command ").append(attributes.command)

        if (attributes.directory != null) {
            sb.append(" in directory ").append(attributes.directory)
        }

        if (attributes.environment.isNotEmpty()) {
            sb.append(" with environment ").append(attributes.environment)
        }

        val length = outputText.length
        sb.append(", output was ").append(length)

        if (length <= MAX_OUTPUT_SIZE_IN_ERROR_MESSAGE) {
            sb.append(" bytes:\n").append(outputText.trim())
            return
        }

        // too long: keep the head and the tail, drop the middle
        val halfLimit = MAX_OUTPUT_SIZE_IN_ERROR_MESSAGE / 2
        sb.append(" bytes (truncated):\n")
        sb.append(outputText.take(halfLimit))
        sb.append("\n...\n")
        sb.append(outputText.takeLast(halfLimit).trim())
    }
}
// Signalled by the worker coroutine when it is done; await() waits on it and cancel() cancels it.
private val waiter = SuspendNotifier()
// The thread that constructed this object; cancel() copies its stack trace into the CancellationException.
private val launchingThread = Thread.currentThread()
// Set to true the first time await(timeout, timeoutUnit) runs; finishWaiting() reads it to decide whether leftover channel data is kept.
@Volatile
var calledAwait = false
// True while await(timeout, timeoutUnit) is waiting on the notifier; cancel() refuses to run when this is false.
@Volatile
var waiting = false
// The worker coroutine launched by start().
@Volatile
lateinit var job: Job
// The thread executing the worker coroutine. Assigned from inside that coroutine, so it stays uninitialized until the coroutine begins running.
@Volatile
lateinit var thread: Thread
// Failure recorded by the worker coroutine, its exception handler, or cancel(); rethrown by await().
@Volatile
private var processException: Throwable? = null
// Result recorded by the worker coroutine once the process has finished successfully; null until then.
@Volatile
var processResult: SyncProcessResult? = null
/**
 * Launches the worker coroutine that waits for the process to exit and records either the result or the failure.
 *
 * The MDC context of the calling thread is copied into the worker for the duration of the wait. Whatever the
 * outcome, the notifier is signalled at the end so that a pending [await] wakes up.
 */
@ExperimentalCoroutinesApi
fun start() {
    // snapshot the caller's MDC so log statements on the worker carry the same context
    val callerMdc: Map<String, String>? = MDC.getCopyOfContextMap()

    val exceptionHandler = CoroutineExceptionHandler { _, failure ->
        processException = failure
    }

    job = IO_DISPATCH.launch(exceptionHandler) {
        // BY DESIGN, coroutines are not meant to be interrupted, however we are VERY specifically only interrupting an
        // IO coroutine/thread (which is allowed for this to happen)
        // see: https://discuss.kotlinlang.org/t/calling-blocking-code-in-coroutines/2368/6
        thread = Thread.currentThread()

        try {
            if (callerMdc != null) {
                MDC.setContextMap(callerMdc)
            }

            // @throws IOException an error occurred when process was started or stopped.
            // @throws InvalidExitValueException if there is an invalid exit value
            processResult = try {
                finishWaiting(waitForProcessToComplete())
            } finally {
                if (callerMdc != null) {
                    MDC.clear()
                }
            }
        } catch (exception: Exception) {
            processException = exception
        } finally {
            try {
                // always wake up whoever is waiting
                waiter.doNotify()
            } catch (ignored: Exception) {
                // we want to ignore any cancellation exceptions
            }
        }
    }
}
/**
 * Suspends, without blocking a thread, until the process has finished and then returns the [ProcessResult]
 * holding its exit code and (optionally) its output. There is no time limit.
 *
 * This suspending function is cancellable: if the [Job] of the current coroutine is cancelled or completed while
 * waiting, it resumes immediately with a [CancellationException].
 *
 * @throws TimeoutException if the process has been timed out while running
 * @throws CancellationException if the process has been cancelled while running
 */
suspend fun await(): SyncProcessResult = await(0L)
/**
 * Same as [await] with an explicit unit, with the timeout given in milliseconds.
 *
 * @param timeoutInMs maximum time to wait in milliseconds; `0` waits without a practical limit
 */
suspend fun await(timeoutInMs: Long): SyncProcessResult = await(timeoutInMs, TimeUnit.MILLISECONDS)
/**
 * Blocks the calling thread until the process has finished. The wait itself runs on [Dispatchers.IO].
 */
fun awaitBlocking(): SyncProcessResult = runBlocking {
    withContext(Dispatchers.IO) {
        await()
    }
}
/**
 * Blocks the calling thread until the process has finished or [timeoutInMs] milliseconds have passed.
 * The wait itself runs on [Dispatchers.IO].
 */
fun awaitBlocking(timeoutInMs: Long): SyncProcessResult = runBlocking {
    withContext(Dispatchers.IO) {
        await(timeoutInMs, TimeUnit.MILLISECONDS)
    }
}
/**
 * Blocks the calling thread until the process has finished or the given timeout has passed.
 * The wait itself runs on [Dispatchers.IO].
 */
fun awaitBlocking(timeout: Long, timeoutUnit: TimeUnit): SyncProcessResult = runBlocking {
    withContext(Dispatchers.IO) {
        await(timeout, timeoutUnit)
    }
}
/**
 * Suspends until the worker coroutine signals completion or the timeout elapses, then returns the recorded result.
 *
 * A [timeout] of `0` is replaced by [Long.MAX_VALUE] days, so the wait has no practical limit but can still be
 * ended through [cancel].
 *
 * When no result is available after waiting, the recorded failure (or, if there is none, a new
 * [TimeoutException]) is thrown with a stack trace rewritten to start at the caller. If the worker job is still
 * active at that point it is cancelled and its thread is interrupted.
 *
 * @param timeout how long to wait; `0` waits without a practical limit
 * @param timeoutUnit unit of [timeout]
 * @return the result of the finished process
 *
 * @throws TimeoutException if the process has been timed out while running
 * @throws CancellationException if the process has been cancelled while running
 */
suspend fun await(timeout: Long, timeoutUnit: TimeUnit): SyncProcessResult {
    // a timeout of 0 means to wait forever, however we still want to be able to cancel this process, which is the ENTIRE point
    // of having a Deferred ProcessResult. We cannot *realistically* "wait forever", so we really wait an absurdly long time
    val waitForever = timeout == 0L
    val effectiveTimeout = if (waitForever) Long.MAX_VALUE else timeout
    val effectiveUnit = if (waitForever) TimeUnit.DAYS else timeoutUnit

    calledAwait = true
    waiting = true
    try {
        // wait for our timeout, then get the result, if it exists.
        withTimeoutOrNull(effectiveUnit.toMillis(effectiveTimeout)) {
            try {
                waiter.doWait()
            } catch (ignored: Exception) {
                // we want to ignore any cancellation exceptions
            }
        }
    } finally {
        // also reset when the calling coroutine itself gets cancelled, otherwise cancel() would keep believing we are waiting
        waiting = false
    }

    // single volatile read; once set the result is never replaced
    processResult?.let { return it }

    // the worker thread is only assigned once the worker coroutine has begun running, which may not have happened yet
    val workerThread: Thread? = if (::thread.isInitialized) thread else null

    val exception: Throwable = processException ?: newTimeoutException(
        process = process,
        processStackTrace = workerThread?.stackTrace ?: emptyArray(),
        timeout = effectiveTimeout,
        timeoutUnit = effectiveUnit
    )

    // clean the stack trace. This is kind-of dumb to have to do this...
    // NOTE: we CANNOT get the location within the calling class suspend function, but we can get the START of the suspend function
    // NOTE: the trace must be captured directly in this function, index 2 is relative to this frame
    val stackTrace = Thread.currentThread().stackTrace
    val caller = stackTrace[2].className

    val newTrace = mutableListOf<StackTraceElement>()
    var foundCaller = false
    var doneWithCoroutineStack = false

    for (element in stackTrace) {
        when {
            // skip the elements which create the stacktrace, up to and including the first frame of the caller
            !foundCaller -> if (element.className == caller) foundCaller = true

            doneWithCoroutineStack -> newTrace.add(element)

            // remove all of the kotlin coroutine stack trace info (why would someone ever need to debug coroutines themselves?)
            !element.className.startsWith("kotlin.coroutines") && !element.className.startsWith("kotlinx.coroutines") -> {
                doneWithCoroutineStack = true
                newTrace.add(element)
            }
        }
    }

    exception.stackTrace = newTrace.toTypedArray()

    // only interrupt while the worker is still running: once the job is done its pooled thread may already be
    // executing unrelated work
    if (job.isActive) {
        job.cancel()
        workerThread?.interrupt()
    }

    throw exception
}
/**
 * Waits for the process to complete and then stops its stream handling.
 *
 * Blocks the current thread in [Process.waitFor] until the process is done running. If waiting ends abnormally
 * the process is stopped through the configured stopper before the streams are stopped.
 *
 * With a close timeout of `0` the streams are stopped directly. Otherwise they are stopped in a separately
 * launched coroutine bounded by the close timeout; this function does not wait for that coroutine. When the
 * timeout is hit a warning is logged and no error is thrown.
 *
 * @return the exit code of the process
 *
 * @throws IOException an error occurred when process was started or stopped.
 * @throws InvalidExitValueException if there is an invalid exit value
 * @throws InterruptedException
 */
private suspend fun waitForProcessToComplete(): Int {
    val exit: Int
    var finished = false
    try {
        // blocks this thread until the process is done running
        @Suppress("BlockingMethodInNonBlockingContext")
        exit = process.waitFor()
        finished = true
        LogHelper.logAtLowestLevel(params.logger, "{} stopped with exit code {}", this, exit)
    } finally {
        if (!finished) {
            LogHelper.logAtLowestLevel(params.logger, "Stopping {}...", this)
            params.stopper.stop(process)
        }

        // Helper for closing the process' standard streams.
        if (params.closeTimeout == 0L) {
            params.streams.stop(process, finished)
        } else {
            // Only allows a fixed period for the closing.
            //
            // On timeout a warning is logged but no error is thrown.
            // This is primarily used on Windows where sometimes sub process' streams do not close properly.
            val processFinished = finished

            // IO is used because it will generate as many threads as necessary (in case one thread blocks forever).
            IO_DISPATCH.launch {
                // The timeout has to be handled INSIDE the launched coroutine: launch() returns immediately, so a
                // try/catch wrapped around the launch() call never observes the TimeoutCancellationException.
                try {
                    withTimeout(params.closeTimeoutUnit.toMillis(params.closeTimeout)) {
                        params.streams.stop(process, processFinished)
                    }
                } catch (e: TimeoutCancellationException) {
                    log.warn("Could not close streams of $process in ${params.closeTimeout} ${getUnitsAsString(params.closeTimeout, params.closeTimeoutUnit)}")
                }
            }
            // NOTE(review): closing is fire-and-forget (as before), so the streams may still be closing when this
            // function returns - confirm whether callers need to wait for it.
        }
    }
    return exit
}
/**
 * Builds the final [SyncProcessResult] for a process that exited with [exitCode], validates the exit code and
 * notifies the listener.
 *
 * For asynchronously started processes the output channel of the pump handler is drained (only when [await] was
 * called, otherwise the leftover data is dropped) and then closed; the drained bytes become the result output.
 * For other processes the result is produced by the factory passed to the constructor.
 *
 * `afterStop` is always invoked on the listener, `afterFinish` only when the exit code was accepted.
 *
 * @param exitCode exit code returned by the process
 * @return the result that was handed to the listener
 *
 * @throws InvalidExitValueException if there is an invalid exit value
 */
@ExperimentalCoroutinesApi
private suspend fun finishWaiting(exitCode: Int): SyncProcessResult {
    return try {
        val result = if (params.asyncProcessStart) {
            // if we are async, then we have to read all of the data into a bytearray, since we are NO LONGER going to be reading it.
            // as this point, the thread has ended, so there is no more data being pumped. MAYBE this data has finished being read, maybe not...
            val out = ByteArrayOutputStream()
            val channel: Channel<Byte> = (params.streams as PumpStreamHandler).channel

            if (calledAwait) {
                // if we called await(), then save up the extra data (since calls to await() can also return data)
                //
                // tryReceive() never suspends: a separate isEmpty check followed by receive() can suspend forever when
                // another consumer of the channel takes the element in between, and throws if the channel is already closed.
                while (true) {
                    val next = channel.tryReceive().getOrNull() ?: break
                    out.write(next.toInt())
                }
            }

            channel.close()

            // we have a new output, since we had to read it from the async channel
            SyncProcessResult(PidHelper.get(process), exitCode, out.toByteArray())
        } else {
            createProcessResults(PidHelper.get(process), exitCode)
        }

        checkExit(params.processAttributes, result)
        params.listener.afterFinish(process, result)
        result
    } finally {
        // Invoke listeners - regardless process finished or got cancelled
        params.listener.afterStop(process)
    }
}
/**
 * Lazily created view of the output channel of an asynchronously started process.
 *
 * The view is created on first access from the pump handler's channel and the value [processResult] has at that moment.
 *
 * @throws IllegalArgumentException on first access if the process was not started asynchronously
 */
val output: AsyncProcessOutput by lazy {
    require(params.asyncProcessStart) {
        "Cannot get synchronous output, the process must be started asynchronously (something is wrong!)"
    }

    val pump = params.streams as PumpStreamHandler
    AsyncProcessOutput(pump.channel, processResult)
}
/**
 * The PID of the process, looked up once when this object is constructed. This doesn't make sense for remotely
 * executed processes (which return 0).
 *
 * SOMETIMES, this PID is invalid because it can be recycled by linux!
 * see: http://bugs.java.com/bugdatabase/view_bug.do?bug_id=6469606
 *
 * The value is 0 if there is no PID (failure to start the process), or -1 if getting the pid is not possible.
 */
val pid = PidHelper.get(process)
/**
 * Sends [command], encoded as UTF-8 and followed by a line terminator, to the standard input of the process.
 *
 * Failures while writing propagate to the caller; a failure while flushing is ignored.
 */
fun writeLine(command: String) {
    val payload = command.toByteArray(UTF_8)

    process.outputStream.let { stdin ->
        stdin.write(payload)
        stdin.write(EOL)
        try {
            stdin.flush()
        } catch (ignored: Exception) {
            // flushing is best effort
        }
    }
}
/**
 * Sends [command], encoded as UTF-8, to the standard input of the process and flushes it.
 */
fun write(command: String) = write(command.toByteArray(UTF_8))
/**
 * Sends [bytes] to the standard input of the process and flushes it.
 *
 * Failures while writing propagate to the caller; a failure while flushing is ignored (see [flush]).
 */
fun write(bytes: ByteArray) {
    val stdin = process.outputStream
    stdin.write(bytes)
    flush()
}
/**
 * Flushes the standard input of the process. Any exception raised by the flush is ignored.
 */
fun flush() {
    try {
        process.outputStream.flush()
    } catch (ignored: Exception) {
        // flushing is best effort
    }
}
/**
 * Cancels waiting for this process to complete.
 *
 * A [CancellationException] carrying the stack trace of the thread that created this object is recorded as the
 * failure, and the notifier that [await] is waiting on is cancelled.
 *
 * @param message text of the exception; when empty a default message containing the PID is used
 * @param cause optional cause attached to the exception
 *
 * @throws IllegalStateException if no call to [await] is currently waiting
 */
fun cancel(message: String = "", cause: Throwable? = null) {
    check(waiting) { "Unable to cancel a process is not waiting." }

    val text = StringBuilder(
        if (message.isEmpty()) "Process [pid=${PidHelper.get(process)}] has been cancelled" else message
    )

    // give the owner a chance to append more detail
    params.errorMessageHandler(text)

    processException = CancellationException(text.toString(), cause).also {
        it.stackTrace = launchingThread.stackTrace
    }

    waiter.cancel()
}
/**
 * Creates the [TimeoutException] thrown when waiting for [process] ran out of time.
 *
 * The message states whether the process was still running or had already exited, names the timeout, and is
 * passed through the configured error message handler. Only when the process had already exited is
 * [processStackTrace] attached, as the stack trace of a synthetic cause.
 */
private fun newTimeoutException(process: Process,
                                processStackTrace: Array<StackTraceElement>,
                                timeout: Long,
                                timeoutUnit: TimeUnit): TimeoutException {
    val exitValue = getExitCodeOrNull(process)

    // set a generic "timed out" exception message
    val text = StringBuilder()
    if (exitValue != null) {
        text.append("Timed out finishing ").append(process)
        text.append(", exit value: ").append(exitValue)
    } else {
        text.append("Timed out waiting for ").append(process).append(" to finish")
    }

    text.append(", timeout: ").append(timeout).append(" ").append(getUnitsAsString(timeout, timeoutUnit))

    params.errorMessageHandler(text)

    val timedOut = TimeoutException(text.toString())
    if (exitValue != null) {
        val workerDump = Exception("Stack dump of worker thread.")
        workerDump.stackTrace = processStackTrace
        timedOut.initCause(workerDump)
    }
    return timedOut
}
/**
 * Returns the lower-case English name of [timeUnit] for use in log and exception messages, pluralised to match [timeout].
 *
 * The sub-second units are abbreviated ("nano", "micro", "milli"). The singular form is used only when [timeout]
 * is exactly 1; every other amount (including 0) gets a trailing "s".
 *
 * @param timeout the amount the unit name is printed next to
 * @param timeUnit the unit to name
 */
private fun getUnitsAsString(timeout: Long, timeUnit: TimeUnit): String {
    val singular = when (timeUnit) {
        TimeUnit.NANOSECONDS -> "nano"
        TimeUnit.MICROSECONDS -> "micro"
        TimeUnit.MILLISECONDS -> "milli"
        TimeUnit.SECONDS -> "second"
        TimeUnit.MINUTES -> "minute"
        TimeUnit.HOURS -> "hour"
        TimeUnit.DAYS -> "day"
    }

    // fix plurality: "1 second", but "0 seconds" and "5 seconds"
    return if (timeout == 1L) singular else singular + "s"
}
/**
 * Returns the exit code of [process], or `null` when there is none to report.
 *
 * `null` is returned both when [process] itself is `null` and when the process has not exited yet
 * ([Process.exitValue] signals that with an [IllegalThreadStateException]).
 */
private fun getExitCodeOrNull(process: Process?): Int? {
    return try {
        process?.exitValue()
    } catch (e: IllegalThreadStateException) {
        // still running
        null
    }
}
}