From adb1db70f6006c75ee2d55a19249fea8d91adaf8 Mon Sep 17 00:00:00 2001 From: Robinson Date: Fri, 24 Sep 2021 13:16:07 +0200 Subject: [PATCH] Added Raw/Buffered async process output, so the data can be streamed in different ways if desired --- .../processResults/AsyncProcessOutput.kt | 102 +++++++++++++++--- 1 file changed, 85 insertions(+), 17 deletions(-) diff --git a/src/dorkbox/executor/processResults/AsyncProcessOutput.kt b/src/dorkbox/executor/processResults/AsyncProcessOutput.kt index 9d59b46..b6f6cd8 100644 --- a/src/dorkbox/executor/processResults/AsyncProcessOutput.kt +++ b/src/dorkbox/executor/processResults/AsyncProcessOutput.kt @@ -19,6 +19,7 @@ package dorkbox.executor.processResults +import dorkbox.executor.stream.PumpStreamHandler import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.ClosedReceiveChannelException @@ -26,6 +27,7 @@ import kotlinx.coroutines.yield import java.io.ByteArrayOutputStream import java.io.UnsupportedEncodingException import java.nio.charset.Charset +import java.util.concurrent.atomic.AtomicInteger /** * Standard output of a finished process. @@ -45,35 +47,39 @@ open class AsyncProcessOutput(private val channel: Channel, private val pr } - var previousValue: Int? = null + var previousValue = AtomicInteger(-1) private suspend fun getBuffered(): ByteArray { // if the process has FINISHED running, then we have to get the output a different way - val out = ByteArrayOutputStream() + val out = ByteArrayOutputStream(PumpStreamHandler.DEFAULT_SIZE) - if (previousValue != null) { - out.write(previousValue!!) - previousValue = null + val value = previousValue.getAndSet(-1) + if (value != -1) { + out.write(value) } var toInt: Int try { while (true) { toInt = channel.receive().toInt() + if (toInt == NEW_LINE_NIX) { + // now return the output. + break + } if (toInt == NEW_LINE_WIN) { - // do we have a *nix line also? + // do we have a *nix line also? /r/n needs to be managed toInt = channel.receive().toInt() if (toInt != NEW_LINE_NIX) { - // whoops, save this - previousValue = toInt + // whoops, not /n, save this + previousValue.set(toInt) } // now return the output. break } else { - // save this! + // save this to our buffer and keep going! out.write(toInt) yield() } @@ -95,29 +101,91 @@ open class AsyncProcessOutput(private val channel: Channel, private val pr return out.toByteArray() } + // instantly get the data in the buffer instead of waiting for a newline + private suspend fun getRaw(): ByteArray { + // if the process has FINISHED running, then we have to get the output a different way + val out = ByteArrayOutputStream(2) + + val value = previousValue.getAndSet(-1) + if (value != -1) { + out.write(value) + } + + try { + val toInt = channel.receive().toInt() + if (value != 1) { + out.write(toInt) + } else { + // if we're the only thing, then write us out directly + return byteArrayOf(toInt.toByte()) + } + } catch (ignored: ClosedReceiveChannelException) { + // the process closed. Read the output from the processResult (if it was defined) + // The processResult is defined when the process exits. + val internalBytes = processResult?.output?.bytes_ + if (internalBytes != null) { + if (out.size() == 0) { + return internalBytes + } + else { + out.write(internalBytes, 0, internalBytes.size) + } + } + } + + return out.toByteArray() + } + /** * @return output of the finished process converted to a String using platform's default encoding. */ suspend fun string(): String { + return String(getRaw()) + } + + private suspend fun stringBuffered(): String { return String(getBuffered()) } /** - * @return output of the finished process converted to UTF-8 String. + * @return output of the process converted to UTF-8 String. */ suspend fun utf8(): String { return string(charset = Charsets.UTF_8) } + /** + * @return buffered output of the process converted to UTF-8 String. + */ + suspend fun utf8Buffered(): String { + return stringBuffered(charset = Charsets.UTF_8) + } + /** * @param charset The name of a supported char set. * - * @return output of the finished process converted to a String. + * @return output of the process converted to a String. * * @throws IllegalStateException if the char set was not supported. */ @Throws(IllegalStateException::class) suspend fun string(charset: Charset): String { + return try { + String(getRaw(), charset) + } catch (e: UnsupportedEncodingException) { + throw IllegalStateException(e.message) + } + } + + /** + * @param charset The name of a supported char set. + * + * @return buffered output of the process converted to a String. + * + * @throws IllegalStateException if the char set was not supported. + */ + @Throws(IllegalStateException::class) + suspend fun stringBuffered(charset: Charset): String { return try { String(getBuffered(), charset) } catch (e: UnsupportedEncodingException) { @@ -126,25 +194,25 @@ open class AsyncProcessOutput(private val channel: Channel, private val pr } /** - * @return output lines of the finished process converted using platform's default encoding. + * @return buffered output lines of the finished process converted using platform's default encoding. */ suspend fun lines(): List { - return ProcessOutput.getLinesFrom(string()) + return ProcessOutput.getLinesFrom(stringBuffered()) } /** - * @return output lines of the finished process converted using UTF-8. + * @return buffered output lines of the finished process converted using UTF-8. */ suspend fun linesAsUtf8(): List { - return ProcessOutput.getLinesFrom(utf8()) + return ProcessOutput.getLinesFrom(utf8Buffered()) } /** * @param charset The name of a supported char set. * - * @return output lines of the finished process converted using a given char set. + * @return buffered output lines of the finished process converted using a given char set. */ suspend fun getLines(charset: Charset): List { - return ProcessOutput.getLinesFrom(string(charset)) + return ProcessOutput.getLinesFrom(stringBuffered(charset)) } }