From 0faf4c78469c38f92a788018c27899068635fc85 Mon Sep 17 00:00:00 2001 From: Jamie Reece Wilson Date: Sat, 23 Dec 2023 06:32:27 +0000 Subject: [PATCH] [+] Aurora::Processes::OutputOf [+] Aurora::Processes::OutputOfAsync [*] Fix win32 regression --- Include/Aurora/Processes/OutputOf.hpp | 43 ++ Include/Aurora/Processes/Processes.hpp | 1 + Source/IO/FS/Async.NT.cpp | 1 + Source/Processes/AuOutputOf.cpp | 608 +++++++++++++++++++++++++ Source/Processes/AuOutputOf.hpp | 12 + 5 files changed, 665 insertions(+) create mode 100644 Include/Aurora/Processes/OutputOf.hpp create mode 100644 Source/Processes/AuOutputOf.cpp create mode 100644 Source/Processes/AuOutputOf.hpp diff --git a/Include/Aurora/Processes/OutputOf.hpp b/Include/Aurora/Processes/OutputOf.hpp new file mode 100644 index 00000000..169b55ec --- /dev/null +++ b/Include/Aurora/Processes/OutputOf.hpp @@ -0,0 +1,43 @@ +/*** + Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: OutputOf.hpp + Date: 2023-12-23 + Author: Reece +***/ +#pragma once + +#include "StartupParameters.hpp" + +namespace Aurora::Processes +{ + AUKN_INTERFACE(IAsyncCallbacks, + AUI_METHOD(void, OnPipeData, ()), + AUI_METHOD(void, OnExit, ()) + ); + + struct OutputOfResult + { + bool bFailed {}; + bool bNoStart {}; + AuString strStdout; + AuString strStderr; + AuUInt uExitCode {}; + AuSInt sExitCode {}; + }; + + struct IAsyncOutputOf + { + virtual OutputOfResult *GetResult() = 0; + virtual bool BlockUntilFinished() = 0; + virtual bool BlockUntilFinishedMS(AuUInt32 uRelMS) = 0; + virtual bool HasFinished() = 0; + virtual IProcess *GetProcess() = 0; + }; + + AUKN_SYM OutputOfResult OutputOf(StartupParameters &¶meters, + AuOptional toSend); + + AUKN_SYM AuSPtr OutputOfAsync(StartupParameters &¶meters, + const AuSPtr &pCallbacks); +} \ No newline at end of file diff --git a/Include/Aurora/Processes/Processes.hpp b/Include/Aurora/Processes/Processes.hpp index 847163e8..e6c56355 100644 --- a/Include/Aurora/Processes/Processes.hpp +++ b/Include/Aurora/Processes/Processes.hpp @@ -13,5 +13,6 @@ #include "IProcess.hpp" #include "StartupParameters.hpp" #include "Spawn.hpp" +#include "OutputOf.hpp" #include "Open.hpp" \ No newline at end of file diff --git a/Source/IO/FS/Async.NT.cpp b/Source/IO/FS/Async.NT.cpp index 5e02122b..12f6ecfc 100644 --- a/Source/IO/FS/Async.NT.cpp +++ b/Source/IO/FS/Async.NT.cpp @@ -461,6 +461,7 @@ namespace Aurora::IO::FS this->dwLastBytes = 0; this->dwLastAbstractStat = 0; + AuResetMember(this->pMemoryHold); } bool NtAsyncFileTransaction::Failed() diff --git a/Source/Processes/AuOutputOf.cpp b/Source/Processes/AuOutputOf.cpp new file mode 100644 index 00000000..b8fe69c1 --- /dev/null +++ b/Source/Processes/AuOutputOf.cpp @@ -0,0 +1,608 @@ +/*** + Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: AuOutputOf.cpp + Date: 2023-12-23 + Author: Reece +***/ +#include +#include "AuProcesses.hpp" +#include "AuOutputOf.hpp" + +namespace Aurora::Processes +{ + AUKN_SYM OutputOfResult OutputOf(StartupParameters &¶meters, + AuOptional toSend) + { + OutputOfResult result; + parameters.type = ESpawnType::eSpawnChildProcessWorker; + parameters.fwdOut = EStreamForward::eAsyncPipe; + parameters.fwdErr = EStreamForward::eAsyncPipe; + + if (toSend) + { + result.bFailed = true; + return result; + } + + auto process = SpawnUnique(AuMove(parameters)); + if (!process) + { + result.bFailed = true; + return result; + } + + if (!process->Start()) + { + result.bFailed = true; + result.bNoStart = true; + return result; + } + + bool bIsRunning {true}; + AuByteBuffer stdOutPipe(4096 * 16); + AuMemoryViewWrite readOutView(stdOutPipe); + if (!stdOutPipe) + { + result.bFailed = true; + return result; + } + + AuByteBuffer stdErrPipe(4096 * 16); + AuMemoryViewWrite readErrView(stdErrPipe); + if (!stdErrPipe) + { + result.bFailed = true; + return result; + } + + auto pLoopQueue = AuLoop::NewLoopQueue(); + if (!pLoopQueue) + { + result.bFailed = true; + return result; + } + + auto pTransactionA = process->NewAsyncTransaction(); + if (!pTransactionA) + { + result.bFailed = true; + return result; + } + + auto pTransactionSourceA = pTransactionA->NewLoopSource(); + if (!pTransactionSourceA) + { + result.bFailed = true; + return result; + } + + auto pTransactionB = process->NewErrorStreamAsyncTransaction(); + if (!pTransactionB) + { + result.bFailed = true; + return result; + } + + auto pTransactionSourceB = pTransactionB->NewLoopSource(); + if (!pTransactionSourceB) + { + result.bFailed = true; + return result; + } + + auto pProcessDeadLoopSource = process->AsLoopSource(); + if (!pProcessDeadLoopSource) + { + result.bFailed = true; + return result; + } + + auto pProcessDeadCallback = AuMakeSharedThrow([&](const AuSPtr &source) -> bool + { + result.sExitCode = process->GetExitCode(); + result.uExitCode = (AuUInt)result.sExitCode; + bIsRunning = false; + return false; + }); + + pTransactionA->SetCallback(AuMakeSharedThrow([&](AuUInt64 offset, AuUInt32 length) + { + pTransactionA->Reset(); + + if (length) + { + result.strStdout += AuString(stdOutPipe.begin(), stdOutPipe.begin() + length); + pTransactionA->StartRead(0, AuUnsafeRaiiToShared(&readOutView)); + } + })); + + pTransactionB->SetCallback(AuMakeSharedThrow([&](AuUInt64 offset, AuUInt32 length) + { + pTransactionB->Reset(); + + if (length) + { + result.strStderr += AuString(stdErrPipe.begin(), stdErrPipe.begin() + length); + pTransactionB->StartRead(0, AuUnsafeRaiiToShared(&readErrView)); + } + })); + + if (!pTransactionA->StartRead(0, AuUnsafeRaiiToShared(&readOutView))) + { + result.bFailed = true; + return result; + } + + if (!pTransactionB->StartRead(0, AuUnsafeRaiiToShared(&readErrView))) + { + result.bFailed = true; + pTransactionA->SetCallback({}); + return result; + } + + if (!pLoopQueue->SourceAdd(pTransactionSourceA)) + { + result.bFailed = true; + pTransactionA->SetCallback({}); + pTransactionB->SetCallback({}); + return result; + } + + if (!pLoopQueue->SourceAdd(pTransactionSourceB)) + { + result.bFailed = true; + pTransactionA->SetCallback({}); + pTransactionB->SetCallback({}); + return result; + } + + if (!pLoopQueue->SourceAdd(pProcessDeadLoopSource)) + { + result.bFailed = true; + pTransactionA->SetCallback({}); + pTransactionB->SetCallback({}); + return result; + } + + if (!pLoopQueue->AddCallback(pProcessDeadLoopSource, pProcessDeadCallback)) + { + result.bFailed = true; + pTransactionA->SetCallback({}); + pTransactionB->SetCallback({}); + return result; + } + + if (!pLoopQueue->Commit()) + { + result.bFailed = true; + pTransactionA->SetCallback({}); + pTransactionB->SetCallback({}); + return result; + } + + while (bIsRunning) + { + pLoopQueue->WaitAny(); + } + + while (pTransactionSourceA->IsSignaled() && + pTransactionA->GetLastPacketLength()) + { + } + + while (pTransactionSourceB->IsSignaled() && + pTransactionB->GetLastPacketLength()) + { + } + + return result; + } + + struct AsyncOutputOf : IAsyncOutputOf + { + OutputOfResult result; + AuByteBuffer stdOutPipe; + AuMemoryViewWrite readOutView; + AuByteBuffer stdErrPipe; + AuMemoryViewWrite readErrView; + AuSPtr pLoopQueue; + AuSPtr pTransactionA; + AuSPtr pTransactionB; + AuSPtr pTransactionSourceA; + AuSPtr pTransactionSourceB; + AuSPtr pProcessDeadLoopSource; + AuProcesses::SpawnUnique_t process; + bool bIsRunning { true }; + AuSPtr pPinSelf; + bool bHasFinished {}; + AuSPtr pCallbacks; + AuWorkerID worker; + + AsyncOutputOf() : + stdOutPipe(4096 * 16), + readOutView(stdOutPipe), + stdErrPipe(4096 * 16), + readErrView(stdOutPipe) + { + + } + + IProcess *GetProcess() override + { + return this->process.get(); + } + + OutputOfResult *GetResult() override + { + return &this->result; + } + + bool BlockUntilFinishedMS(AuUInt32 uRelMS) override + { + if (this->bHasFinished) + { + return true; + } + + AuUInt64 uEndTime = AuTime::SteadyClockNS() + AuMSToNS(uRelMS); + + while (this->bIsRunning) + { + auto uNow = AuTime::CurrentClockNS(); + if (uNow >= uEndTime) + { + break; + } + + auto uMS = AuNSToMS(uEndTime - uNow); + if (uMS) + { + this->pLoopQueue->WaitAny(uMS); + } + + if (this->worker) + { + this->worker.GetPool()->Poll(); + } + } + + if (!this->bIsRunning) + { + while (this->pTransactionSourceA->IsSignaled()) + { + } + + while (this->pTransactionSourceB->IsSignaled()) + { + } + + if (this->pCallbacks) + { + this->pCallbacks->OnExit(); + } + + CleanUp(); + return this->bHasFinished = true; + } + else + { + return false; + } + } + + bool BlockUntilFinished() override + { + if (this->bHasFinished) + { + return true; + } + + while (this->bIsRunning) + { + if (this->worker) + { + this->worker.GetPool()->RunOnce(); + } + else + { + this->pLoopQueue->WaitAny(); + } + } + + while (this->pTransactionSourceA->IsSignaled() && + this->pTransactionA->GetLastPacketLength()) + { + } + + while (this->pTransactionSourceB->IsSignaled() && + this->pTransactionB->GetLastPacketLength()) + { + } + + if (!this->bIsRunning) + { + if (this->pCallbacks) + { + this->pCallbacks->OnExit(); + } + + CleanUp(); + return this->bHasFinished = true; + } + else + { + return false; + } + } + + bool HasFinished() override + { + if (this->bHasFinished) + { + return true; + } + + if (this->bIsRunning) + { + if (this->worker) + { + this->worker.GetPool()->Poll(); + } + else + { + this->pLoopQueue->WaitAny(); + } + } + + if (!this->bIsRunning) + { + while (this->pTransactionSourceA->IsSignaled() && + this->pTransactionA->GetLastPacketLength()) + { + } + + while (this->pTransactionSourceB->IsSignaled() && + this->pTransactionB->GetLastPacketLength()) + { + } + + if (this->pCallbacks) + { + this->pCallbacks->OnExit(); + } + + CleanUp(); + return this->bHasFinished = true; + } + else + { + return false; + } + } + + void Finalize() + { + (void)this->HasFinished(); + } + + void CleanUp() + { + if (this->pLoopQueue) + { + if (this->pTransactionSourceA) + { + this->pLoopQueue->SourceRemove(this->pTransactionSourceA); + } + + if (this->pTransactionSourceB) + { + this->pLoopQueue->SourceRemove(this->pTransactionSourceB); + } + + if (this->pProcessDeadLoopSource) + { + this->pLoopQueue->SourceRemove(this->pProcessDeadLoopSource); + } + + this->pLoopQueue->Commit(); + } + + AuResetMember(this->pTransactionSourceA); + AuResetMember(this->pTransactionSourceB); + AuResetMember(this->pTransactionA); + AuResetMember(this->pTransactionA); + AuResetMember(this->pCallbacks); + AuResetMember(this->pPinSelf); + AuResetMember(this->pLoopQueue); + } + }; + + AUKN_SYM AuSPtr OutputOfAsync(StartupParameters &¶meters, + const AuSPtr &pCallbacks) + { + auto pResult = AuMakeSharedThrow(); + parameters.type = ESpawnType::eSpawnChildProcessWorker; + parameters.fwdOut = EStreamForward::eAsyncPipe; + parameters.fwdErr = EStreamForward::eAsyncPipe; + pResult->process = SpawnUnique(AuMove(parameters)); + + if (!pResult->process) + { + pResult->result.bFailed = true; + return pResult; + } + + if (!pResult->process->Start()) + { + pResult->result.bFailed = true; + pResult->result.bNoStart = true; + return pResult; + } + + if (!pResult->stdOutPipe) + { + pResult->result.bFailed = true; + return pResult; + } + + if (!pResult->stdErrPipe) + { + pResult->result.bFailed = true; + return pResult; + } + + auto worker = AuAsync::GetCurrentWorkerPId(); + if (worker) + { + pResult->worker = worker; + pResult->pLoopQueue = worker.GetPool()->ToKernelWorkQueue(worker); + if (!pResult->pLoopQueue) + { + pResult->result.bFailed = true; + return pResult; + } + } + else + { + pResult->pLoopQueue = AuLoop::NewLoopQueue(); + if (!pResult->pLoopQueue) + { + pResult->result.bFailed = true; + return pResult; + } + } + + pResult->pTransactionA = pResult->process->NewAsyncTransaction(); + if (!pResult->pTransactionA) + { + pResult->result.bFailed = true; + return pResult; + } + + pResult->pTransactionSourceA = pResult->pTransactionA->NewLoopSource(); + if (!pResult->pTransactionSourceA) + { + pResult->result.bFailed = true; + return pResult; + } + + pResult->pTransactionB = pResult->process->NewErrorStreamAsyncTransaction(); + if (!pResult->pTransactionB) + { + pResult->result.bFailed = true; + return pResult; + } + + pResult->pTransactionSourceB = pResult->pTransactionB->NewLoopSource(); + if (!pResult->pTransactionSourceB) + { + pResult->result.bFailed = true; + return pResult; + } + + pResult->pProcessDeadLoopSource = pResult->process->AsLoopSource(); + if (!pResult->pProcessDeadLoopSource) + { + pResult->result.bFailed = true; + return pResult; + } + + pResult->pPinSelf = pResult; + pResult->pCallbacks = pCallbacks; + + auto pProcessDeadCallback = AuMakeSharedThrow([=](const AuSPtr &source) -> bool + { + pResult->result.sExitCode = pResult->process->GetExitCode(); + pResult->result.uExitCode = (AuUInt)pResult->result.sExitCode; + pResult->bIsRunning = false; + pResult->HasFinished(); + return true; + }); + + pResult->pTransactionA->SetCallback(AuMakeSharedThrow([=](AuUInt64 offset, AuUInt32 length) + { + pResult->pTransactionA->Reset(); + + if (length) + { + pResult->result.strStdout += AuString(pResult->stdOutPipe.begin(), pResult->stdOutPipe.begin() + length); + if (pResult->pCallbacks) + { + pResult->pCallbacks->OnPipeData(); + } + pResult->pTransactionA->StartRead(0, AuUnsafeRaiiToShared(&pResult->readOutView)); + } + })); + + pResult->pTransactionB->SetCallback(AuMakeSharedThrow([=](AuUInt64 offset, AuUInt32 length) + { + pResult->pTransactionB->Reset(); + + if (length) + { + pResult->result.strStderr += AuString(pResult->stdErrPipe.begin(), pResult->stdErrPipe.begin() + length); + if (pResult->pCallbacks) + { + pResult->pCallbacks->OnPipeData(); + } + pResult->pTransactionB->StartRead(0, AuUnsafeRaiiToShared(&pResult->readErrView)); + } + })); + + if (!pResult->pTransactionA->StartRead(0, AuUnsafeRaiiToShared(&pResult->readOutView))) + { + pResult->result.bFailed = true; + return pResult; + } + + if (!pResult->pTransactionB->StartRead(0, AuUnsafeRaiiToShared(&pResult->readErrView))) + { + pResult->result.bFailed = true; + pResult->pTransactionA->SetCallback({}); + return pResult; + } + + if (!pResult->pLoopQueue->SourceAdd(pResult->pTransactionSourceA)) + { + pResult->result.bFailed = true; + pResult->pTransactionA->SetCallback({}); + pResult->pTransactionB->SetCallback({}); + return pResult; + } + + if (!pResult->pLoopQueue->SourceAdd(pResult->pTransactionSourceB)) + { + pResult->result.bFailed = true; + pResult->pTransactionA->SetCallback({}); + pResult->pTransactionB->SetCallback({}); + return pResult; + } + + if (!pResult->pLoopQueue->SourceAdd(pResult->pProcessDeadLoopSource)) + { + pResult->result.bFailed = true; + pResult->pTransactionA->SetCallback({}); + pResult->pTransactionB->SetCallback({}); + return pResult; + } + + if (!pResult->pLoopQueue->AddCallback(pResult->pProcessDeadLoopSource, pProcessDeadCallback)) + { + pResult->result.bFailed = true; + pResult->pTransactionA->SetCallback({}); + pResult->pTransactionB->SetCallback({}); + return pResult; + } + + if (!pResult->pLoopQueue->Commit()) + { + pResult->result.bFailed = true; + pResult->pTransactionA->SetCallback({}); + pResult->pTransactionB->SetCallback({}); + return pResult; + } + + return pResult; + } +} \ No newline at end of file diff --git a/Source/Processes/AuOutputOf.hpp b/Source/Processes/AuOutputOf.hpp new file mode 100644 index 00000000..9e4102db --- /dev/null +++ b/Source/Processes/AuOutputOf.hpp @@ -0,0 +1,12 @@ +/*** + Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: AuOutputOf.hpp + Date: 2023-12-23 + Author: Reece +***/ +#pragma once + +namespace Aurora::Processes +{ +} \ No newline at end of file