/*** Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved. File: AuOutputOf.cpp Date: 2023-12-23 Author: Reece ***/ #include #include "AuProcesses.hpp" #include "AuOutputOf.hpp" namespace Aurora::Processes { AUKN_SYM OutputOfResult OutputOf(StartupParameters &¶meters, AuOptional toSend) { OutputOfResult result; parameters.type = ESpawnType::eSpawnChildProcessWorker; parameters.fwdOut = EStreamForward::eAsyncPipe; parameters.fwdErr = EStreamForward::eAsyncPipe; if (toSend) { result.bFailed = true; return result; } auto process = SpawnUnique(AuMove(parameters)); if (!process) { result.bFailed = true; return result; } if (!process->Start()) { result.bFailed = true; result.bNoStart = true; return result; } bool bIsRunning {true}; AuByteBuffer stdOutPipe(4096 * 16); AuMemoryViewWrite readOutView(stdOutPipe); if (!stdOutPipe) { result.bFailed = true; return result; } AuByteBuffer stdErrPipe(4096 * 16); AuMemoryViewWrite readErrView(stdErrPipe); if (!stdErrPipe) { result.bFailed = true; return result; } auto pLoopQueue = AuLoop::NewLoopQueue(); if (!pLoopQueue) { result.bFailed = true; return result; } auto pTransactionA = process->NewAsyncTransaction(); if (!pTransactionA) { result.bFailed = true; return result; } auto pTransactionSourceA = pTransactionA->NewLoopSource(); if (!pTransactionSourceA) { result.bFailed = true; return result; } auto pTransactionB = process->NewErrorStreamAsyncTransaction(); if (!pTransactionB) { result.bFailed = true; return result; } auto pTransactionSourceB = pTransactionB->NewLoopSource(); if (!pTransactionSourceB) { result.bFailed = true; return result; } auto pProcessDeadLoopSource = process->AsLoopSource(); if (!pProcessDeadLoopSource) { result.bFailed = true; return result; } auto pProcessDeadCallback = AuMakeSharedThrow([&](const AuSPtr &source) -> bool { result.sExitCode = process->GetExitCode(); result.uExitCode = (AuUInt)result.sExitCode; bIsRunning = false; return false; }); pTransactionA->SetCallback(AuMakeSharedThrow([&](AuUInt64 offset, AuUInt32 length) { pTransactionA->Reset(); if (length) { result.strStdout += AuString(stdOutPipe.begin(), stdOutPipe.begin() + length); pTransactionA->StartRead(0, AuUnsafeRaiiToShared(&readOutView)); } })); pTransactionB->SetCallback(AuMakeSharedThrow([&](AuUInt64 offset, AuUInt32 length) { pTransactionB->Reset(); if (length) { result.strStderr += AuString(stdErrPipe.begin(), stdErrPipe.begin() + length); pTransactionB->StartRead(0, AuUnsafeRaiiToShared(&readErrView)); } })); if (!pTransactionA->StartRead(0, AuUnsafeRaiiToShared(&readOutView))) { result.bFailed = true; return result; } if (!pTransactionB->StartRead(0, AuUnsafeRaiiToShared(&readErrView))) { result.bFailed = true; pTransactionA->SetCallback({}); return result; } if (!pLoopQueue->SourceAdd(pTransactionSourceA)) { result.bFailed = true; pTransactionA->SetCallback({}); pTransactionB->SetCallback({}); return result; } if (!pLoopQueue->SourceAdd(pTransactionSourceB)) { result.bFailed = true; pTransactionA->SetCallback({}); pTransactionB->SetCallback({}); return result; } if (!pLoopQueue->SourceAdd(pProcessDeadLoopSource)) { result.bFailed = true; pTransactionA->SetCallback({}); pTransactionB->SetCallback({}); return result; } if (!pLoopQueue->AddCallback(pProcessDeadLoopSource, pProcessDeadCallback)) { result.bFailed = true; pTransactionA->SetCallback({}); pTransactionB->SetCallback({}); return result; } if (!pLoopQueue->Commit()) { result.bFailed = true; pTransactionA->SetCallback({}); pTransactionB->SetCallback({}); return result; } while (bIsRunning) { pLoopQueue->WaitAny(); } while (pTransactionSourceA->IsSignaled() && pTransactionA->GetLastPacketLength()) { } while (pTransactionSourceB->IsSignaled() && pTransactionB->GetLastPacketLength()) { } return result; } struct AsyncOutputOf : IAsyncOutputOf { OutputOfResult result; AuByteBuffer stdOutPipe; AuMemoryViewWrite readOutView; AuByteBuffer stdErrPipe; AuMemoryViewWrite readErrView; AuSPtr pLoopQueue; AuSPtr pTransactionA; AuSPtr pTransactionB; AuSPtr pTransactionSourceA; AuSPtr pTransactionSourceB; AuSPtr pProcessDeadLoopSource; AuProcesses::SpawnUnique_t process; bool bIsRunning { true }; AuSPtr pPinSelf; bool bHasFinished {}; AuSPtr pCallbacks; AuWorkerID worker; AsyncOutputOf() : stdOutPipe(4096 * 16), readOutView(stdOutPipe), stdErrPipe(4096 * 16), readErrView(stdOutPipe) { } IProcess *GetProcess() override { return this->process.get(); } OutputOfResult *GetResult() override { return &this->result; } bool BlockUntilFinishedMS(AuUInt32 uRelMS) override { if (this->bHasFinished) { return true; } AuUInt64 uEndTime = AuTime::SteadyClockNS() + AuMSToNS(uRelMS); while (this->bIsRunning) { auto uNow = AuTime::SteadyClockNS(); if (uNow >= uEndTime) { break; } auto uMS = AuNSToMS(uEndTime - uNow); if (uMS) { this->pLoopQueue->WaitAny(uMS); } if (this->worker) { this->worker.GetPool()->Poll(); } } if (!this->bIsRunning) { while (this->pTransactionSourceA->IsSignaled()) { } while (this->pTransactionSourceB->IsSignaled()) { } if (this->pCallbacks) { this->pCallbacks->OnExit(); } CleanUp(); return this->bHasFinished = true; } else { return false; } } bool BlockUntilFinished() override { if (this->bHasFinished) { return true; } while (this->bIsRunning) { if (this->worker) { this->worker.GetPool()->RunOnce(); } else if (this->pLoopQueue) { this->pLoopQueue->WaitAny(); } } if (this->pTransactionSourceA) { while (this->pTransactionSourceA->IsSignaled() && this->pTransactionA->GetLastPacketLength()) { } } if (this->pTransactionSourceB) { while (this->pTransactionSourceB->IsSignaled() && this->pTransactionB->GetLastPacketLength()) { } } if (!this->bIsRunning) { if (this->pCallbacks) { this->pCallbacks->OnExit(); } CleanUp(); return this->bHasFinished = true; } else { return false; } } bool HasFinished() override { if (this->bHasFinished) { return true; } if (this->bIsRunning) { if (this->worker) { this->worker.GetPool()->Poll(); } else if (this->pLoopQueue) { this->pLoopQueue->WaitAny(); } } if (!this->bIsRunning) { if (this->pTransactionSourceA) { while (this->pTransactionSourceA->IsSignaled() && this->pTransactionA->GetLastPacketLength()) { } } if (this->pTransactionSourceB) { while (this->pTransactionSourceB->IsSignaled() && this->pTransactionB->GetLastPacketLength()) { } } if (this->pCallbacks) { this->pCallbacks->OnExit(); } CleanUp(); return this->bHasFinished = true; } else { return false; } } void Finalize() { (void)this->HasFinished(); } void CleanUp() { if (this->pLoopQueue) { if (this->pTransactionSourceA) { this->pLoopQueue->SourceRemove(this->pTransactionSourceA); } if (this->pTransactionSourceB) { this->pLoopQueue->SourceRemove(this->pTransactionSourceB); } if (this->pProcessDeadLoopSource) { this->pLoopQueue->SourceRemove(this->pProcessDeadLoopSource); } this->pLoopQueue->Commit(); } AuResetMember(this->pTransactionSourceA); AuResetMember(this->pTransactionSourceB); AuResetMember(this->pTransactionA); AuResetMember(this->pTransactionA); AuResetMember(this->pCallbacks); AuResetMember(this->pPinSelf); AuResetMember(this->pLoopQueue); } }; AUKN_SYM AuSPtr OutputOfAsync(StartupParameters &¶meters, const AuSPtr &pCallbacks) { auto pResult = AuMakeSharedThrow(); parameters.type = ESpawnType::eSpawnChildProcessWorker; parameters.fwdOut = EStreamForward::eAsyncPipe; parameters.fwdErr = EStreamForward::eAsyncPipe; pResult->process = SpawnUnique(AuMove(parameters)); if (!pResult->process) { pResult->result.bFailed = true; return pResult; } if (!pResult->process->Start()) { pResult->result.bFailed = true; pResult->result.bNoStart = true; return pResult; } if (!pResult->stdOutPipe) { pResult->result.bFailed = true; return pResult; } if (!pResult->stdErrPipe) { pResult->result.bFailed = true; return pResult; } auto worker = AuAsync::GetCurrentWorkerPId(); if (worker) { pResult->worker = worker; pResult->pLoopQueue = worker.GetPool()->ToKernelWorkQueue(worker); if (!pResult->pLoopQueue) { pResult->result.bFailed = true; return pResult; } } else { pResult->pLoopQueue = AuLoop::NewLoopQueue(); if (!pResult->pLoopQueue) { pResult->result.bFailed = true; return pResult; } } pResult->pTransactionA = pResult->process->NewAsyncTransaction(); if (!pResult->pTransactionA) { pResult->result.bFailed = true; return pResult; } pResult->pTransactionSourceA = pResult->pTransactionA->NewLoopSource(); if (!pResult->pTransactionSourceA) { pResult->result.bFailed = true; return pResult; } pResult->pTransactionB = pResult->process->NewErrorStreamAsyncTransaction(); if (!pResult->pTransactionB) { pResult->result.bFailed = true; return pResult; } pResult->pTransactionSourceB = pResult->pTransactionB->NewLoopSource(); if (!pResult->pTransactionSourceB) { pResult->result.bFailed = true; return pResult; } pResult->pProcessDeadLoopSource = pResult->process->AsLoopSource(); if (!pResult->pProcessDeadLoopSource) { pResult->result.bFailed = true; return pResult; } pResult->pPinSelf = pResult; pResult->pCallbacks = pCallbacks; auto pProcessDeadCallback = AuMakeSharedThrow([=](const AuSPtr &source) -> bool { pResult->result.sExitCode = pResult->process->GetExitCode(); pResult->result.uExitCode = (AuUInt)pResult->result.sExitCode; pResult->bIsRunning = false; pResult->HasFinished(); return true; }); pResult->pTransactionA->SetCallback(AuMakeSharedThrow([=](AuUInt64 offset, AuUInt32 length) { auto pTransactionA = pResult->pTransactionA; if (pTransactionA) { pTransactionA->Reset(); } if (length) { pResult->result.strStdout += AuString(pResult->stdOutPipe.begin(), pResult->stdOutPipe.begin() + length); if (pResult->pCallbacks) { pResult->pCallbacks->OnPipeData(); } if (!pTransactionA->StartRead(0, AuUnsafeRaiiToShared(&pResult->readOutView))) { pResult->Finalize(); } } else { pResult->Finalize(); } })); pResult->pTransactionB->SetCallback(AuMakeSharedThrow([=](AuUInt64 offset, AuUInt32 length) { auto pTransactionB = pResult->pTransactionB; if (pTransactionB) { pTransactionB->Reset(); } if (length) { pResult->result.strStderr += AuString(pResult->stdErrPipe.begin(), pResult->stdErrPipe.begin() + length); if (pResult->pCallbacks) { pResult->pCallbacks->OnPipeData(); } if (!pTransactionB->StartRead(0, AuUnsafeRaiiToShared(&pResult->readErrView))) { pResult->Finalize(); } } else { pResult->Finalize(); } })); if (!pResult->pTransactionA->StartRead(0, AuUnsafeRaiiToShared(&pResult->readOutView))) { pResult->result.bFailed = true; return pResult; } if (!pResult->pTransactionB->StartRead(0, AuUnsafeRaiiToShared(&pResult->readErrView))) { pResult->result.bFailed = true; pResult->pTransactionA->SetCallback({}); return pResult; } if (!pResult->pLoopQueue->SourceAdd(pResult->pTransactionSourceA)) { pResult->result.bFailed = true; pResult->pTransactionA->SetCallback({}); pResult->pTransactionB->SetCallback({}); return pResult; } if (!pResult->pLoopQueue->SourceAdd(pResult->pTransactionSourceB)) { pResult->result.bFailed = true; pResult->pTransactionA->SetCallback({}); pResult->pTransactionB->SetCallback({}); return pResult; } if (!pResult->pLoopQueue->SourceAdd(pResult->pProcessDeadLoopSource)) { pResult->result.bFailed = true; pResult->pTransactionA->SetCallback({}); pResult->pTransactionB->SetCallback({}); return pResult; } if (!pResult->pLoopQueue->AddCallback(pResult->pProcessDeadLoopSource, pProcessDeadCallback)) { pResult->result.bFailed = true; pResult->pTransactionA->SetCallback({}); pResult->pTransactionB->SetCallback({}); return pResult; } if (!pResult->pLoopQueue->Commit()) { pResult->result.bFailed = true; pResult->pTransactionA->SetCallback({}); pResult->pTransactionB->SetCallback({}); return pResult; } return pResult; } }