716 lines
20 KiB
C++
716 lines
20 KiB
C++
/***
|
|
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
|
|
|
File: AuOutputOf.cpp
|
|
Date: 2023-12-23
|
|
Author: Reece
|
|
***/
|
|
#include <Source/RuntimeInternal.hpp>
|
|
#include "AuProcesses.hpp"
|
|
#include "AuOutputOf.hpp"
|
|
|
|
namespace Aurora::Processes
|
|
{
|
|
AUKN_SYM OutputOfResult OutputOf(StartupParameters &¶meters,
|
|
AuOptional<AuMemoryViewRead> toSend)
|
|
{
|
|
OutputOfResult result;
|
|
parameters.type = ESpawnType::eSpawnChildProcessWorker;
|
|
parameters.fwdOut = EStreamForward::eAsyncPipe;
|
|
parameters.fwdErr = EStreamForward::eAsyncPipe;
|
|
|
|
if (toSend)
|
|
{
|
|
result.bFailed = true;
|
|
return result;
|
|
}
|
|
|
|
auto process = SpawnUnique(AuMove(parameters));
|
|
if (!process)
|
|
{
|
|
result.bFailed = true;
|
|
return result;
|
|
}
|
|
|
|
if (!process->Start())
|
|
{
|
|
result.bFailed = true;
|
|
result.bNoStart = true;
|
|
return result;
|
|
}
|
|
|
|
bool bIsRunning {true};
|
|
AuByteBuffer stdOutPipe(4096 * 16);
|
|
AuMemoryViewWrite readOutView(stdOutPipe);
|
|
if (!stdOutPipe)
|
|
{
|
|
result.bFailed = true;
|
|
return result;
|
|
}
|
|
|
|
AuByteBuffer stdErrPipe(4096 * 16);
|
|
AuMemoryViewWrite readErrView(stdErrPipe);
|
|
if (!stdErrPipe)
|
|
{
|
|
result.bFailed = true;
|
|
return result;
|
|
}
|
|
|
|
auto pLoopQueue = AuLoop::NewLoopQueue();
|
|
if (!pLoopQueue)
|
|
{
|
|
result.bFailed = true;
|
|
return result;
|
|
}
|
|
|
|
auto pTransactionA = process->NewAsyncTransaction();
|
|
if (!pTransactionA)
|
|
{
|
|
result.bFailed = true;
|
|
return result;
|
|
}
|
|
|
|
auto pTransactionSourceA = pTransactionA->NewLoopSource();
|
|
if (!pTransactionSourceA)
|
|
{
|
|
result.bFailed = true;
|
|
return result;
|
|
}
|
|
|
|
auto pTransactionB = process->NewErrorStreamAsyncTransaction();
|
|
if (!pTransactionB)
|
|
{
|
|
result.bFailed = true;
|
|
return result;
|
|
}
|
|
|
|
auto pTransactionSourceB = pTransactionB->NewLoopSource();
|
|
if (!pTransactionSourceB)
|
|
{
|
|
result.bFailed = true;
|
|
return result;
|
|
}
|
|
|
|
auto pProcessDeadLoopSource = process->AsLoopSource();
|
|
if (!pProcessDeadLoopSource)
|
|
{
|
|
result.bFailed = true;
|
|
return result;
|
|
}
|
|
|
|
auto pProcessDeadCallback = AuMakeSharedThrow<AuLoop::ILoopSourceSubscriberFunctional>([&](const AuSPtr<AuLoop::ILoopSource> &source) -> bool
|
|
{
|
|
result.sExitCode = process->GetExitCode();
|
|
result.uExitCode = (AuUInt)result.sExitCode;
|
|
bIsRunning = false;
|
|
return false;
|
|
});
|
|
|
|
pTransactionA->SetCallback(AuMakeSharedThrow<AuIO::IAsyncFinishedSubscriberFunctional>([&](AuUInt64 offset, AuUInt32 length)
|
|
{
|
|
pTransactionA->Reset();
|
|
|
|
if (length)
|
|
{
|
|
result.strStdout += AuString(stdOutPipe.begin(), stdOutPipe.begin() + length);
|
|
pTransactionA->StartRead(0, AuUnsafeRaiiToShared(&readOutView));
|
|
}
|
|
}));
|
|
|
|
pTransactionB->SetCallback(AuMakeSharedThrow<AuIO::IAsyncFinishedSubscriberFunctional>([&](AuUInt64 offset, AuUInt32 length)
|
|
{
|
|
pTransactionB->Reset();
|
|
|
|
if (length)
|
|
{
|
|
result.strStderr += AuString(stdErrPipe.begin(), stdErrPipe.begin() + length);
|
|
pTransactionB->StartRead(0, AuUnsafeRaiiToShared(&readErrView));
|
|
}
|
|
}));
|
|
|
|
if (!pTransactionA->StartRead(0, AuUnsafeRaiiToShared(&readOutView)))
|
|
{
|
|
result.bFailed = true;
|
|
return result;
|
|
}
|
|
|
|
if (!pTransactionB->StartRead(0, AuUnsafeRaiiToShared(&readErrView)))
|
|
{
|
|
result.bFailed = true;
|
|
pTransactionA->SetCallback({});
|
|
return result;
|
|
}
|
|
|
|
if (!pLoopQueue->SourceAdd(pTransactionSourceA))
|
|
{
|
|
result.bFailed = true;
|
|
pTransactionA->SetCallback({});
|
|
pTransactionB->SetCallback({});
|
|
return result;
|
|
}
|
|
|
|
if (!pLoopQueue->SourceAdd(pTransactionSourceB))
|
|
{
|
|
result.bFailed = true;
|
|
pTransactionA->SetCallback({});
|
|
pTransactionB->SetCallback({});
|
|
return result;
|
|
}
|
|
|
|
if (!pLoopQueue->SourceAdd(pProcessDeadLoopSource))
|
|
{
|
|
result.bFailed = true;
|
|
pTransactionA->SetCallback({});
|
|
pTransactionB->SetCallback({});
|
|
return result;
|
|
}
|
|
|
|
if (!pLoopQueue->AddCallback(pProcessDeadLoopSource, pProcessDeadCallback))
|
|
{
|
|
result.bFailed = true;
|
|
pTransactionA->SetCallback({});
|
|
pTransactionB->SetCallback({});
|
|
return result;
|
|
}
|
|
|
|
if (!pLoopQueue->Commit())
|
|
{
|
|
result.bFailed = true;
|
|
pTransactionA->SetCallback({});
|
|
pTransactionB->SetCallback({});
|
|
return result;
|
|
}
|
|
|
|
while (bIsRunning)
|
|
{
|
|
pLoopQueue->WaitAny();
|
|
}
|
|
|
|
while (pTransactionSourceA->IsSignaled() &&
|
|
pTransactionA->GetLastPacketLength())
|
|
{
|
|
}
|
|
|
|
while (pTransactionSourceB->IsSignaled() &&
|
|
pTransactionB->GetLastPacketLength())
|
|
{
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
struct AsyncOutputOf : IAsyncOutputOf
|
|
{
|
|
OutputOfResult result;
|
|
AuByteBuffer stdOutPipe;
|
|
AuMemoryViewWrite readOutView;
|
|
AuByteBuffer stdErrPipe;
|
|
AuMemoryViewWrite readErrView;
|
|
AuSPtr<AuLoop::ILoopQueue> pLoopQueue;
|
|
AuSPtr<AuIO::IAsyncTransaction> pTransactionA;
|
|
AuSPtr<AuIO::IAsyncTransaction> pTransactionB;
|
|
AuSPtr<AuLoop::ILoopSource> pTransactionSourceA;
|
|
AuSPtr<AuLoop::ILoopSource> pTransactionSourceB;
|
|
AuSPtr<AuLoop::ILoopSource> pProcessDeadLoopSource;
|
|
AuProcesses::SpawnUnique_t process;
|
|
bool bIsRunning { true };
|
|
AuSPtr<void> pPinSelf;
|
|
bool bHasFinished {};
|
|
AuSPtr<IAsyncCallbacks> pCallbacks;
|
|
AuWorkerID worker;
|
|
|
|
AsyncOutputOf() :
|
|
stdOutPipe(4096 * 16),
|
|
readOutView(stdOutPipe),
|
|
stdErrPipe(4096 * 16),
|
|
readErrView(stdOutPipe)
|
|
{
|
|
|
|
}
|
|
|
|
IProcess *GetProcess() override
|
|
{
|
|
return this->process.get();
|
|
}
|
|
|
|
OutputOfResult *GetResult() override
|
|
{
|
|
return &this->result;
|
|
}
|
|
|
|
bool TryAttachProcessExitToCompletionGroup(const AuSPtr<IO::CompletionGroup::ICompletionGroup> &pCompletionGroup) override
|
|
{
|
|
if (this->process)
|
|
{
|
|
return this->process->TryAttachProcessExitToCompletionGroup(pCompletionGroup);
|
|
}
|
|
else
|
|
{
|
|
return {};
|
|
}
|
|
}
|
|
|
|
IO::CompletionGroup::ICompletionGroupWorkHandle *ToCompletionGroupHandle() override
|
|
{
|
|
if (this->process)
|
|
{
|
|
return this->process->ToCompletionGroupHandle();
|
|
}
|
|
else
|
|
{
|
|
return {};
|
|
}
|
|
}
|
|
|
|
bool BlockUntilFinishedMS(AuUInt32 uRelMS) override
|
|
{
|
|
if (this->bHasFinished)
|
|
{
|
|
return true;
|
|
}
|
|
|
|
AuUInt64 uEndTime = AuTime::SteadyClockNS() + AuMSToNS<AuUInt64>(uRelMS);
|
|
|
|
while (this->bIsRunning)
|
|
{
|
|
auto uNow = AuTime::SteadyClockNS();
|
|
if (uNow >= uEndTime)
|
|
{
|
|
break;
|
|
}
|
|
|
|
auto uMS = AuNSToMS<AuUInt64>(uEndTime - uNow);
|
|
if (uMS)
|
|
{
|
|
this->pLoopQueue->WaitAny(uMS);
|
|
}
|
|
|
|
if (this->worker)
|
|
{
|
|
this->worker.GetPool()->Poll();
|
|
}
|
|
}
|
|
|
|
if (!this->bIsRunning)
|
|
{
|
|
while (this->pTransactionSourceA->IsSignaled())
|
|
{
|
|
}
|
|
|
|
while (this->pTransactionSourceB->IsSignaled())
|
|
{
|
|
}
|
|
|
|
if (this->pCallbacks)
|
|
{
|
|
this->pCallbacks->OnExit();
|
|
}
|
|
|
|
CleanUp();
|
|
return this->bHasFinished = true;
|
|
}
|
|
else
|
|
{
|
|
return false;
|
|
}
|
|
}
|
|
|
|
bool BlockUntilFinished() override
|
|
{
|
|
if (this->bHasFinished)
|
|
{
|
|
return true;
|
|
}
|
|
|
|
while (this->bIsRunning)
|
|
{
|
|
if (this->worker)
|
|
{
|
|
this->worker.GetPool()->RunOnce();
|
|
}
|
|
else if (this->pLoopQueue)
|
|
{
|
|
this->pLoopQueue->WaitAny();
|
|
}
|
|
}
|
|
|
|
if (this->pTransactionSourceA)
|
|
{
|
|
while (this->pTransactionSourceA->IsSignaled() &&
|
|
this->pTransactionA->GetLastPacketLength())
|
|
{
|
|
}
|
|
}
|
|
|
|
if (this->pTransactionSourceB)
|
|
{
|
|
while (this->pTransactionSourceB->IsSignaled() &&
|
|
this->pTransactionB->GetLastPacketLength())
|
|
{
|
|
}
|
|
}
|
|
|
|
if (!this->bIsRunning)
|
|
{
|
|
if (this->pCallbacks)
|
|
{
|
|
this->pCallbacks->OnExit();
|
|
}
|
|
|
|
CleanUp();
|
|
return this->bHasFinished = true;
|
|
}
|
|
else
|
|
{
|
|
return false;
|
|
}
|
|
}
|
|
|
|
bool HasFinished() override
|
|
{
|
|
if (this->bHasFinished)
|
|
{
|
|
return true;
|
|
}
|
|
|
|
if (this->bIsRunning)
|
|
{
|
|
if (this->worker)
|
|
{
|
|
this->worker.GetPool()->Poll();
|
|
}
|
|
else if (this->pLoopQueue)
|
|
{
|
|
this->pLoopQueue->WaitAny();
|
|
}
|
|
|
|
if (this->bIsRunning)
|
|
{
|
|
this->bIsRunning = !this->process->HasExited();
|
|
}
|
|
}
|
|
|
|
if (!this->bIsRunning)
|
|
{
|
|
if (this->pTransactionSourceA)
|
|
{
|
|
while (this->pTransactionSourceA->IsSignaled() &&
|
|
this->pTransactionA->GetLastPacketLength())
|
|
{
|
|
}
|
|
}
|
|
|
|
if (this->pTransactionSourceB)
|
|
{
|
|
while (this->pTransactionSourceB->IsSignaled() &&
|
|
this->pTransactionB->GetLastPacketLength())
|
|
{
|
|
}
|
|
}
|
|
|
|
if (this->pCallbacks)
|
|
{
|
|
this->pCallbacks->OnExit();
|
|
}
|
|
|
|
CleanUp();
|
|
return this->bHasFinished = true;
|
|
}
|
|
else
|
|
{
|
|
return false;
|
|
}
|
|
}
|
|
|
|
void Finalize()
|
|
{
|
|
(void)this->HasFinished();
|
|
}
|
|
|
|
void CleanUp()
|
|
{
|
|
if (this->pLoopQueue)
|
|
{
|
|
if (this->pTransactionSourceA)
|
|
{
|
|
this->pLoopQueue->SourceRemove(this->pTransactionSourceA);
|
|
}
|
|
|
|
if (this->pTransactionSourceB)
|
|
{
|
|
this->pLoopQueue->SourceRemove(this->pTransactionSourceB);
|
|
}
|
|
|
|
if (this->pProcessDeadLoopSource)
|
|
{
|
|
this->pLoopQueue->SourceRemove(this->pProcessDeadLoopSource);
|
|
}
|
|
|
|
this->pLoopQueue->Commit();
|
|
}
|
|
|
|
AuResetMember(this->pTransactionSourceA);
|
|
AuResetMember(this->pTransactionSourceB);
|
|
AuResetMember(this->pTransactionA);
|
|
AuResetMember(this->pTransactionA);
|
|
AuResetMember(this->pCallbacks);
|
|
AuResetMember(this->pPinSelf);
|
|
AuResetMember(this->pLoopQueue);
|
|
}
|
|
};
|
|
|
|
AUKN_SYM AuSPtr<IAsyncOutputOf> OutputOfAsync(StartupParameters &¶meters,
|
|
const AuSPtr<IAsyncCallbacks> &pCallbacks)
|
|
{
|
|
auto pResult = AuMakeSharedThrow<AsyncOutputOf>();
|
|
parameters.type = ESpawnType::eSpawnChildProcessWorker;
|
|
parameters.fwdOut = EStreamForward::eAsyncPipe;
|
|
parameters.fwdErr = EStreamForward::eAsyncPipe;
|
|
pResult->process = SpawnUnique(AuMove(parameters));
|
|
|
|
if (!pResult->process)
|
|
{
|
|
pResult->result.bFailed = true;
|
|
return pResult;
|
|
}
|
|
|
|
if (!pResult->process->Start())
|
|
{
|
|
pResult->result.bFailed = true;
|
|
pResult->result.bNoStart = true;
|
|
return pResult;
|
|
}
|
|
|
|
if (!pResult->stdOutPipe)
|
|
{
|
|
pResult->result.bFailed = true;
|
|
return pResult;
|
|
}
|
|
|
|
if (!pResult->stdErrPipe)
|
|
{
|
|
pResult->result.bFailed = true;
|
|
return pResult;
|
|
}
|
|
|
|
auto worker = AuAsync::GetCurrentWorkerPId();
|
|
if (worker)
|
|
{
|
|
pResult->worker = worker;
|
|
pResult->pLoopQueue = worker.GetPool()->ToKernelWorkQueue(worker);
|
|
if (!pResult->pLoopQueue)
|
|
{
|
|
pResult->result.bFailed = true;
|
|
return pResult;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
pResult->pLoopQueue = AuLoop::NewLoopQueue();
|
|
if (!pResult->pLoopQueue)
|
|
{
|
|
pResult->result.bFailed = true;
|
|
return pResult;
|
|
}
|
|
}
|
|
|
|
pResult->pTransactionA = pResult->process->NewAsyncTransaction();
|
|
if (!pResult->pTransactionA)
|
|
{
|
|
pResult->result.bFailed = true;
|
|
return pResult;
|
|
}
|
|
|
|
pResult->pTransactionSourceA = pResult->pTransactionA->NewLoopSource();
|
|
if (!pResult->pTransactionSourceA)
|
|
{
|
|
pResult->result.bFailed = true;
|
|
return pResult;
|
|
}
|
|
|
|
pResult->pTransactionB = pResult->process->NewErrorStreamAsyncTransaction();
|
|
if (!pResult->pTransactionB)
|
|
{
|
|
pResult->result.bFailed = true;
|
|
return pResult;
|
|
}
|
|
|
|
pResult->pTransactionSourceB = pResult->pTransactionB->NewLoopSource();
|
|
if (!pResult->pTransactionSourceB)
|
|
{
|
|
pResult->result.bFailed = true;
|
|
return pResult;
|
|
}
|
|
|
|
pResult->pProcessDeadLoopSource = pResult->process->AsLoopSource();
|
|
if (!pResult->pProcessDeadLoopSource)
|
|
{
|
|
pResult->result.bFailed = true;
|
|
return pResult;
|
|
}
|
|
|
|
pResult->pPinSelf = pResult;
|
|
pResult->pCallbacks = pCallbacks;
|
|
|
|
auto pProcessDeadCallback = AuMakeSharedThrow<AuLoop::ILoopSourceSubscriberFunctional>([=](const AuSPtr<AuLoop::ILoopSource> &source) -> bool
|
|
{
|
|
pResult->result.sExitCode = pResult->process->GetExitCode();
|
|
pResult->result.uExitCode = (AuUInt)pResult->result.sExitCode;
|
|
pResult->bIsRunning = false;
|
|
pResult->HasFinished();
|
|
return true;
|
|
});
|
|
|
|
pResult->pTransactionA->SetCallback(AuMakeSharedThrow<AuIO::IAsyncFinishedSubscriberFunctional>([=](AuUInt64 offset, AuUInt32 length)
|
|
{
|
|
auto pTransactionA = pResult->pTransactionA;
|
|
if (pTransactionA)
|
|
{
|
|
pTransactionA->Reset();
|
|
}
|
|
|
|
if (length)
|
|
{
|
|
pResult->result.strStdout += AuString(pResult->stdOutPipe.begin(), pResult->stdOutPipe.begin() + length);
|
|
if (pResult->pCallbacks)
|
|
{
|
|
pResult->pCallbacks->OnPipeData();
|
|
}
|
|
|
|
if (!pTransactionA->StartRead(0, AuUnsafeRaiiToShared(&pResult->readOutView)))
|
|
{
|
|
pResult->Finalize();
|
|
}
|
|
}
|
|
else
|
|
{
|
|
pResult->Finalize();
|
|
}
|
|
}));
|
|
|
|
pResult->pTransactionB->SetCallback(AuMakeSharedThrow<AuIO::IAsyncFinishedSubscriberFunctional>([=](AuUInt64 offset, AuUInt32 length)
|
|
{
|
|
auto pTransactionB = pResult->pTransactionB;
|
|
if (pTransactionB)
|
|
{
|
|
pTransactionB->Reset();
|
|
}
|
|
|
|
if (length)
|
|
{
|
|
pResult->result.strStderr += AuString(pResult->stdErrPipe.begin(), pResult->stdErrPipe.begin() + length);
|
|
if (pResult->pCallbacks)
|
|
{
|
|
pResult->pCallbacks->OnPipeData();
|
|
}
|
|
|
|
if (!pTransactionB->StartRead(0, AuUnsafeRaiiToShared(&pResult->readErrView)))
|
|
{
|
|
pResult->Finalize();
|
|
}
|
|
}
|
|
else
|
|
{
|
|
pResult->Finalize();
|
|
}
|
|
}));
|
|
|
|
if (!pResult->pTransactionA->StartRead(0, AuUnsafeRaiiToShared(&pResult->readOutView)))
|
|
{
|
|
pResult->result.bFailed = true;
|
|
return pResult;
|
|
}
|
|
|
|
if (!pResult->pTransactionB->StartRead(0, AuUnsafeRaiiToShared(&pResult->readErrView)))
|
|
{
|
|
pResult->result.bFailed = true;
|
|
if (pResult->pTransactionA)
|
|
{
|
|
pResult->pTransactionA->SetCallback({});
|
|
}
|
|
return pResult;
|
|
}
|
|
|
|
if (!pResult->pLoopQueue)
|
|
{
|
|
return pResult;
|
|
}
|
|
|
|
if (!pResult->pLoopQueue->SourceAdd(pResult->pTransactionSourceA))
|
|
{
|
|
pResult->result.bFailed = true;
|
|
if (pResult->pTransactionA)
|
|
{
|
|
pResult->pTransactionA->SetCallback({});
|
|
}
|
|
if (pResult->pTransactionB)
|
|
{
|
|
pResult->pTransactionB->SetCallback({});
|
|
}
|
|
pResult->CleanUp();
|
|
return pResult;
|
|
}
|
|
|
|
if (!pResult->pLoopQueue->SourceAdd(pResult->pTransactionSourceB))
|
|
{
|
|
pResult->result.bFailed = true;
|
|
if (pResult->pTransactionA)
|
|
{
|
|
pResult->pTransactionA->SetCallback({});
|
|
}
|
|
if (pResult->pTransactionB)
|
|
{
|
|
pResult->pTransactionB->SetCallback({});
|
|
}
|
|
pResult->CleanUp();
|
|
return pResult;
|
|
}
|
|
|
|
if (!pResult->pLoopQueue->SourceAdd(pResult->pProcessDeadLoopSource))
|
|
{
|
|
pResult->result.bFailed = true;
|
|
if (pResult->pTransactionA)
|
|
{
|
|
pResult->pTransactionA->SetCallback({});
|
|
}
|
|
if (pResult->pTransactionB)
|
|
{
|
|
pResult->pTransactionB->SetCallback({});
|
|
}
|
|
pResult->CleanUp();
|
|
return pResult;
|
|
}
|
|
|
|
if (!pResult->pLoopQueue->AddCallback(pResult->pProcessDeadLoopSource, pProcessDeadCallback))
|
|
{
|
|
pResult->result.bFailed = true;
|
|
if (pResult->pTransactionA)
|
|
{
|
|
pResult->pTransactionA->SetCallback({});
|
|
}
|
|
if (pResult->pTransactionB)
|
|
{
|
|
pResult->pTransactionB->SetCallback({});
|
|
}
|
|
pResult->CleanUp();
|
|
return pResult;
|
|
}
|
|
|
|
if (!pResult->pLoopQueue->Commit())
|
|
{
|
|
pResult->result.bFailed = true;
|
|
if (pResult->pTransactionA)
|
|
{
|
|
pResult->pTransactionA->SetCallback({});
|
|
}
|
|
if (pResult->pTransactionB)
|
|
{
|
|
pResult->pTransactionB->SetCallback({});
|
|
}
|
|
pResult->CleanUp();
|
|
return pResult;
|
|
}
|
|
|
|
return pResult;
|
|
}
|
|
} |