AuroraRuntime/Source/Processes/AuOutputOf.cpp

764 lines
22 KiB
C++

/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuOutputOf.cpp
Date: 2023-12-23
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "AuProcesses.hpp"
#include "AuOutputOf.hpp"
namespace Aurora::Processes
{
AUKN_SYM OutputOfResult OutputOf(StartupParameters &&parameters,
AuOptional<AuMemoryViewRead> toSend)
{
OutputOfResult result;
parameters.type = ESpawnType::eSpawnChildProcessWorker;
parameters.fwdOut = EStreamForward::eAsyncPipe;
parameters.fwdErr = EStreamForward::eAsyncPipe;
if (toSend)
{
result.bFailed = true;
return result;
}
auto process = SpawnUnique(AuMove(parameters));
if (!process)
{
result.bFailed = true;
return result;
}
if (!process->Start())
{
result.bFailed = true;
result.bNoStart = true;
return result;
}
bool bIsRunning {true};
AuByteBuffer stdOutPipe(4096 * 16);
AuMemoryViewWrite readOutView(stdOutPipe);
if (!stdOutPipe)
{
result.bFailed = true;
return result;
}
AuByteBuffer stdErrPipe(4096 * 16);
AuMemoryViewWrite readErrView(stdErrPipe);
if (!stdErrPipe)
{
result.bFailed = true;
return result;
}
auto pLoopQueue = AuLoop::NewLoopQueue();
if (!pLoopQueue)
{
result.bFailed = true;
return result;
}
auto pTransactionA = process->NewAsyncTransaction();
if (!pTransactionA)
{
result.bFailed = true;
return result;
}
auto pTransactionSourceA = pTransactionA->NewLoopSource();
if (!pTransactionSourceA)
{
result.bFailed = true;
return result;
}
auto pTransactionB = process->NewErrorStreamAsyncTransaction();
if (!pTransactionB)
{
result.bFailed = true;
return result;
}
auto pTransactionSourceB = pTransactionB->NewLoopSource();
if (!pTransactionSourceB)
{
result.bFailed = true;
return result;
}
auto pProcessDeadLoopSource = process->AsLoopSource();
if (!pProcessDeadLoopSource)
{
result.bFailed = true;
return result;
}
auto pProcessDeadCallback = AuMakeSharedThrow<AuLoop::ILoopSourceSubscriberFunctional>([&](const AuSPtr<AuLoop::ILoopSource> &source) -> bool
{
result.sExitCode = process->GetExitCode();
result.uExitCode = (AuUInt)result.sExitCode;
bIsRunning = false;
return false;
});
pTransactionA->SetCallback(AuMakeSharedThrow<AuIO::IAsyncFinishedSubscriberFunctional>([&](AuUInt64 offset, AuUInt32 length)
{
pTransactionA->Reset();
if (length)
{
result.strStdout += AuString(stdOutPipe.begin(), stdOutPipe.begin() + length);
pTransactionA->StartRead(0, readOutView);
}
}));
pTransactionB->SetCallback(AuMakeSharedThrow<AuIO::IAsyncFinishedSubscriberFunctional>([&](AuUInt64 offset, AuUInt32 length)
{
pTransactionB->Reset();
if (length)
{
result.strStderr += AuString(stdErrPipe.begin(), stdErrPipe.begin() + length);
pTransactionB->StartRead(0, readErrView);
}
}));
if (!pTransactionA->StartRead(0, readOutView))
{
result.bFailed = true;
return result;
}
if (!pTransactionB->StartRead(0, readErrView))
{
result.bFailed = true;
pTransactionA->SetCallback({});
return result;
}
if (!pLoopQueue->SourceAdd(pTransactionSourceA))
{
result.bFailed = true;
pTransactionA->SetCallback({});
pTransactionB->SetCallback({});
return result;
}
if (!pLoopQueue->SourceAdd(pTransactionSourceB))
{
result.bFailed = true;
pTransactionA->SetCallback({});
pTransactionB->SetCallback({});
return result;
}
if (!pLoopQueue->SourceAdd(pProcessDeadLoopSource))
{
result.bFailed = true;
pTransactionA->SetCallback({});
pTransactionB->SetCallback({});
return result;
}
if (!pLoopQueue->AddCallback(pProcessDeadLoopSource, pProcessDeadCallback))
{
result.bFailed = true;
pTransactionA->SetCallback({});
pTransactionB->SetCallback({});
return result;
}
if (!pLoopQueue->Commit())
{
result.bFailed = true;
pTransactionA->SetCallback({});
pTransactionB->SetCallback({});
return result;
}
while (bIsRunning)
{
pLoopQueue->WaitAny();
}
while (pTransactionSourceA->IsSignaled() &&
pTransactionA->GetLastPacketLength())
{
}
while (pTransactionSourceB->IsSignaled() &&
pTransactionB->GetLastPacketLength())
{
}
return result;
}
struct AsyncOutputOf : IAsyncOutputOf
{
OutputOfResult result;
AuByteBuffer stdOutPipe;
AuMemoryViewWrite readOutView;
AuByteBuffer stdErrPipe;
AuMemoryViewWrite readErrView;
AuSPtr<AuLoop::ILoopQueue> pLoopQueue;
AuSPtr<AuIO::IAsyncTransaction> pTransactionA;
AuSPtr<AuIO::IAsyncTransaction> pTransactionB;
AuSPtr<AuLoop::ILoopSource> pTransactionSourceA;
AuSPtr<AuLoop::ILoopSource> pTransactionSourceB;
AuSPtr<AuLoop::ILoopSource> pProcessDeadLoopSource;
AuProcesses::SpawnUnique_t process;
bool bIsRunning { true };
AuSPtr<void> pPinSelf;
bool bHasFinished {};
AuSPtr<IAsyncCallbacks> pCallbacks;
AuWorkerID worker;
AuSPtr<IO::CompletionGroup::ICompletionGroup> pCompletionGroup;
AsyncOutputOf() :
stdOutPipe(4096 * 16),
readOutView(stdOutPipe),
stdErrPipe(4096 * 16),
readErrView(stdOutPipe)
{
}
IProcess *GetProcess() override
{
return this->process.get();
}
OutputOfResult *GetResult() override
{
return &this->result;
}
bool TryAttachProcessExitToCompletionGroup(const AuSPtr<IO::CompletionGroup::ICompletionGroup> &pCompletionGroup) override
{
if (this->process)
{
return this->process->TryAttachProcessExitToCompletionGroup(pCompletionGroup);
}
else
{
return {};
}
}
IO::CompletionGroup::ICompletionGroupWorkHandle *ToCompletionGroupHandle() override
{
if (this->process)
{
return this->process->ToCompletionGroupHandle();
}
else
{
return {};
}
}
bool BlockUntilFinishedMS(AuUInt32 uRelMS) override
{
if (this->bHasFinished)
{
return true;
}
if (!uRelMS)
{
return this->BlockUntilFinished();
}
AuUInt64 uEndTime = AuTime::SteadyClockNS() + AuMSToNS<AuUInt64>(uRelMS);
while (this->bIsRunning)
{
auto uNow = AuTime::SteadyClockNS();
if (uNow >= uEndTime)
{
break;
}
auto uMS = AuNSToMS<AuUInt64>(uEndTime - uNow);
if (uMS)
{
this->pLoopQueue->WaitAny(uMS);
}
if (this->worker)
{
this->worker.GetPool()->Poll();
}
}
if (!this->bIsRunning)
{
this->BlockForeverWhileReading();
if (this->pCallbacks)
{
this->pCallbacks->OnExit();
}
CleanUp();
return this->bHasFinished = true;
}
else
{
return false;
}
}
void BlockForeverWhileReading()
{
if (this->pTransactionSourceA)
{
auto pTransaction = this->pTransactionA;
auto pSource = this->pTransactionSourceA;
while (pTransaction &&
pSource->IsSignaled() &&
pTransaction->GetLastPacketLength())
{
}
}
else
{
AuSPtr<AuIO::IAsyncTransaction> pTransaction;
AuSPtr<IO::Loop::ILoopSource> pLoopSource;
while ((pTransaction = this->pTransactionA) &&
(pLoopSource = pTransaction->NewLoopSource()) &&
pLoopSource->IsSignaled() &&
pTransaction->GetLastPacketLength())
{
}
}
if (this->pTransactionSourceB)
{
auto pTransaction = this->pTransactionB;
auto pSource = this->pTransactionSourceB;
while (pTransaction &&
pSource->IsSignaled() &&
pTransaction->GetLastPacketLength())
{
}
}
else
{
AuSPtr<AuIO::IAsyncTransaction> pTransaction;
AuSPtr<IO::Loop::ILoopSource> pLoopSource;
while ((pTransaction = this->pTransactionB) &&
(pLoopSource = pTransaction->NewLoopSource()) &&
pLoopSource->IsSignaled() &&
pTransaction->GetLastPacketLength())
{
}
}
}
bool BlockUntilFinished() override
{
if (this->bHasFinished)
{
return true;
}
while (this->bIsRunning)
{
if (this->worker)
{
this->worker.GetPool()->RunOnce();
}
else if (this->pLoopQueue)
{
this->pLoopQueue->WaitAny();
}
}
this->BlockForeverWhileReading();
if (!this->bIsRunning)
{
if (this->pCallbacks)
{
this->pCallbacks->OnExit();
}
CleanUp();
return this->bHasFinished = true;
}
else
{
return false;
}
}
bool HasFinished() override
{
if (this->bHasFinished)
{
return true;
}
if (this->bIsRunning)
{
if (this->worker)
{
this->worker.GetPool()->Poll();
}
else if (this->pLoopQueue)
{
this->pLoopQueue->WaitAny();
}
if (this->bIsRunning)
{
this->bIsRunning = !this->process->HasExited();
}
}
if (!this->bIsRunning)
{
this->BlockForeverWhileReading();
if (this->pCallbacks)
{
this->pCallbacks->OnExit();
}
CleanUp();
return this->bHasFinished = true;
}
else
{
return false;
}
}
void Finalize()
{
(void)this->HasFinished();
}
void CleanUp()
{
if (this->pLoopQueue)
{
if (this->pTransactionSourceA)
{
this->pLoopQueue->SourceRemove(this->pTransactionSourceA);
}
if (this->pTransactionSourceB)
{
this->pLoopQueue->SourceRemove(this->pTransactionSourceB);
}
if (this->pProcessDeadLoopSource)
{
this->pLoopQueue->SourceRemove(this->pProcessDeadLoopSource);
}
this->pLoopQueue->Commit();
}
AuResetMember(this->pTransactionSourceA);
AuResetMember(this->pTransactionSourceB);
AuResetMember(this->pTransactionA);
AuResetMember(this->pTransactionA);
AuResetMember(this->pCallbacks);
AuResetMember(this->pPinSelf);
AuResetMember(this->pLoopQueue);
}
};
AUKN_SYM AuSPtr<IAsyncOutputOf> OutputOfAsync(StartupParameters &&parameters,
const AuSPtr<IAsyncCallbacks> &pCallbacks)
{
auto pCompletionGroup = parameters.pAutoJoinCompletionGroup;
auto pResult = AuMakeSharedThrow<AsyncOutputOf>();
parameters.type = ESpawnType::eSpawnChildProcessWorker;
parameters.fwdOut = EStreamForward::eAsyncPipe;
parameters.fwdErr = EStreamForward::eAsyncPipe;
pResult->process = SpawnUnique(AuMove(parameters));
pResult->pCompletionGroup = pCompletionGroup;
if (!pResult->process)
{
pResult->result.bFailed = true;
return pResult;
}
if (!pResult->process->Start())
{
pResult->result.bFailed = true;
pResult->result.bNoStart = true;
return pResult;
}
if (!pResult->stdOutPipe)
{
pResult->result.bFailed = true;
return pResult;
}
if (!pResult->stdErrPipe)
{
pResult->result.bFailed = true;
return pResult;
}
auto worker = AuAsync::GetCurrentWorkerPId();
if (worker)
{
pResult->worker = worker;
if (!pCompletionGroup)
{
pResult->pLoopQueue = worker.GetPool()->ToKernelWorkQueue(worker);
if (!pResult->pLoopQueue)
{
pResult->result.bFailed = true;
return pResult;
}
}
}
else if (!pCompletionGroup)
{
pResult->pLoopQueue = AuLoop::NewLoopQueue();
if (!pResult->pLoopQueue)
{
pResult->result.bFailed = true;
return pResult;
}
}
pResult->pTransactionA = pResult->process->NewAsyncTransaction();
if (!pResult->pTransactionA)
{
pResult->result.bFailed = true;
return pResult;
}
if (!pCompletionGroup)
{
pResult->pTransactionSourceA = pResult->pTransactionA->NewLoopSource();
if (!pResult->pTransactionSourceA)
{
pResult->result.bFailed = true;
return pResult;
}
}
else
{
(void)pResult->pTransactionA->TryAttachToCompletionGroup(pCompletionGroup);
}
pResult->pTransactionB = pResult->process->NewErrorStreamAsyncTransaction();
if (!pResult->pTransactionB)
{
pResult->result.bFailed = true;
return pResult;
}
if (!pCompletionGroup)
{
pResult->pTransactionSourceB = pResult->pTransactionB->NewLoopSource();
if (!pResult->pTransactionSourceB)
{
pResult->result.bFailed = true;
return pResult;
}
pResult->pProcessDeadLoopSource = pResult->process->AsLoopSource();
if (!pResult->pProcessDeadLoopSource)
{
pResult->result.bFailed = true;
return pResult;
}
}
else
{
(void)pResult->pTransactionB->TryAttachToCompletionGroup(pCompletionGroup);
}
pResult->pPinSelf = pResult;
pResult->pCallbacks = pCallbacks;
auto pProcessDeadCallback = AuMakeSharedThrow<AuLoop::ILoopSourceSubscriberFunctional>([=](const AuSPtr<AuLoop::ILoopSource> &source) -> bool
{
pResult->result.sExitCode = pResult->process->GetExitCode();
pResult->result.uExitCode = (AuUInt)pResult->result.sExitCode;
pResult->bIsRunning = false;
pResult->HasFinished();
return true;
});
pResult->pTransactionA->SetCallback(AuMakeSharedThrow<AuIO::IAsyncFinishedSubscriberFunctional>([=](AuUInt64 offset, AuUInt32 length)
{
auto pTransactionA = pResult->pTransactionA;
if (pTransactionA)
{
pTransactionA->Reset();
}
if (length)
{
pResult->result.strStdout += AuString(pResult->stdOutPipe.begin(), pResult->stdOutPipe.begin() + length);
if (pResult->pCallbacks)
{
pResult->pCallbacks->OnPipeData();
}
(void)pTransactionA->TryAttachToCompletionGroup(pResult->pCompletionGroup);
if (!pTransactionA->StartRead(0, pResult->readOutView))
{
pResult->Finalize();
}
}
else
{
pResult->Finalize();
}
}));
pResult->pTransactionB->SetCallback(AuMakeSharedThrow<AuIO::IAsyncFinishedSubscriberFunctional>([=](AuUInt64 offset, AuUInt32 length)
{
auto pTransactionB = pResult->pTransactionB;
if (pTransactionB)
{
pTransactionB->Reset();
}
if (length)
{
pResult->result.strStderr += AuString(pResult->stdErrPipe.begin(), pResult->stdErrPipe.begin() + length);
if (pResult->pCallbacks)
{
pResult->pCallbacks->OnPipeData();
}
(void)pTransactionB->TryAttachToCompletionGroup(pResult->pCompletionGroup);
if (!pTransactionB->StartRead(0, pResult->readErrView))
{
pResult->Finalize();
}
}
else
{
pResult->Finalize();
}
}));
if (!pResult->pTransactionA->StartRead(0, pResult->readOutView))
{
pResult->result.bFailed = true;
return pResult;
}
if (!pResult->pTransactionB->StartRead(0, pResult->readErrView))
{
pResult->result.bFailed = true;
if (pResult->pTransactionA)
{
pResult->pTransactionA->SetCallback({});
}
return pResult;
}
if (!pCompletionGroup)
{
if (!pResult->pLoopQueue)
{
return pResult;
}
if (!pResult->pLoopQueue->SourceAdd(pResult->pTransactionSourceA))
{
pResult->result.bFailed = true;
if (pResult->pTransactionA)
{
pResult->pTransactionA->SetCallback({});
}
if (pResult->pTransactionB)
{
pResult->pTransactionB->SetCallback({});
}
pResult->CleanUp();
return pResult;
}
if (!pResult->pLoopQueue->SourceAdd(pResult->pTransactionSourceB))
{
pResult->result.bFailed = true;
if (pResult->pTransactionA)
{
pResult->pTransactionA->SetCallback({});
}
if (pResult->pTransactionB)
{
pResult->pTransactionB->SetCallback({});
}
pResult->CleanUp();
return pResult;
}
if (!pResult->pLoopQueue->SourceAdd(pResult->pProcessDeadLoopSource))
{
pResult->result.bFailed = true;
if (pResult->pTransactionA)
{
pResult->pTransactionA->SetCallback({});
}
if (pResult->pTransactionB)
{
pResult->pTransactionB->SetCallback({});
}
pResult->CleanUp();
return pResult;
}
if (!pResult->pLoopQueue->AddCallback(pResult->pProcessDeadLoopSource, pProcessDeadCallback))
{
pResult->result.bFailed = true;
if (pResult->pTransactionA)
{
pResult->pTransactionA->SetCallback({});
}
if (pResult->pTransactionB)
{
pResult->pTransactionB->SetCallback({});
}
pResult->CleanUp();
return pResult;
}
if (!pResult->pLoopQueue->Commit())
{
pResult->result.bFailed = true;
if (pResult->pTransactionA)
{
pResult->pTransactionA->SetCallback({});
}
if (pResult->pTransactionB)
{
pResult->pTransactionB->SetCallback({});
}
pResult->CleanUp();
return pResult;
}
}
return pResult;
}
}