[*] Improve AuProcesses CompletionGroup integration

This commit is contained in:
Reece Wilson 2024-03-05 21:50:45 +00:00
parent 0a6c11d919
commit 7b302a7f35
8 changed files with 233 additions and 159 deletions

View File

@ -24,8 +24,8 @@ namespace Aurora::IO::CompletionGroup
virtual AuSPtr<Loop::ILoopSource> ToAndLoopSource() = 0;
virtual AuSPtr<Loop::ILoopSource> ToAnyLoopSource() = 0;
virtual AuSPtr<Async::IWorkItem> OnCompletion() = 0;
virtual AuSPtr<Async::IWorkItem> OnSingleCompletion() = 0;
virtual AuSPtr<Aurora::Async::IWorkItem> OnCompletion() = 0;
virtual AuSPtr<Aurora::Async::IWorkItem> OnSingleCompletion() = 0;
virtual bool HasCompleted() = 0;
virtual AuPair<AuUInt32, AuUInt32> GetStats() = 0;
@ -35,6 +35,7 @@ namespace Aurora::IO::CompletionGroup
virtual AuSPtr<Loop::ILSEvent> GetTriggerLoopSource() = 0;
virtual void TryTrigger() = 0;
virtual void TryTriggerLater() = 0;
// 0 = indefinite
virtual bool WaitForAnyMS(AuUInt32 uTimeoutOrZeroMS = 0) = 0;

View File

@ -68,6 +68,8 @@ namespace Aurora::Processes
AuOptionalEx<HWInfo::CpuBitId> optAffinity;
AuSPtr<IO::CompletionGroup::ICompletionGroup> pAutoJoinCompletionGroup;
#if defined(AURORA_IS_MODERNNT_DERIVED)
#if defined(CreateProcess)
AuSupplierConsumer<BOOL,

View File

@ -193,7 +193,17 @@ namespace Aurora::IO::CompletionGroup
SysAssert(!this->bTerminated, "Completion group already terminated");
AU_LOCK_GUARD(this->mutex);
this->workItems.push_back(pCompletable);
this->uAdded++;
AuAtomicAdd(&this->uAdded, 1u);
}
void CompletionGroup::UnsafeRemoveItem(const AuSPtr<ICompletionGroupWorkItem> &pCompletable)
{
AU_LOCK_GUARD(this->mutex);
if (AuTryRemove(this->workItems, pCompletable))
{
AuAtomicAdd(&this->uTriggered, 1u);
}
}
void CompletionGroup::AddCallbackTick(const AuSPtr<IIOProcessorManualInvoker> &pCallbackInvoker, bool bAny)
@ -227,6 +237,14 @@ namespace Aurora::IO::CompletionGroup
return this->anyProbablyAlwaysPresentEvent.GetLoopSource();
}
void CompletionGroup::TryTriggerLater()
{
if (auto pEvent = GetTriggerLoopSource())
{
pEvent->Set();
}
}
bool CompletionGroup::WaitForAnyMS(AuUInt32 uTimeoutOrZeroMS)
{
if (auto pLoopSource = this->ToAnyLoopSource())
@ -312,6 +330,11 @@ namespace Aurora::IO::CompletionGroup
return pRet;
}
void CompletionGroup::ResetAnd()
{
AuResetMember(this->pAndBarrier);
}
AuSPtr<AuAsync::IWorkItem> CompletionGroup::OnSingleCompletion()
{
AU_LOCK_GUARD(this->cs);

View File

@ -29,6 +29,7 @@ namespace Aurora::IO::CompletionGroup
AuSPtr<Loop::ILSEvent> GetTriggerLoopSource() override;
void TryTrigger() override;
void TryTriggerLater() override;
bool HasCompleted() override;
AuPair<AuUInt32, AuUInt32> GetStats() override;
@ -40,12 +41,15 @@ namespace Aurora::IO::CompletionGroup
bool HasItemsActive();
void AddWorkItem(const AuSPtr<ICompletionGroupWorkItem> &pCompletable) override;
void UnsafeRemoveItem(const AuSPtr<ICompletionGroupWorkItem> &pCompletable);
void AddCallbackTick(const AuSPtr<IIOProcessorManualInvoker> &pCallbackInvoker, bool bAny);
bool IsNeverEnding() override;
void SetNeverEnding(bool bValue) override;
void ResetAnd();
private:
AuMutex mutex;
AuCriticalSection cs;

View File

@ -48,7 +48,15 @@ namespace Aurora::IO::CompletionGroup
if (this->bIsAnd)
{
return !this->pParent->HasItemsActive();
if (!this->pParent->HasItemsActive())
{
this->pParent->ResetAnd();
return true;
}
else
{
return false;
}
}
else
{

View File

@ -218,6 +218,7 @@ namespace Aurora::Processes
bool bHasFinished {};
AuSPtr<IAsyncCallbacks> pCallbacks;
AuWorkerID worker;
AuSPtr<IO::CompletionGroup::ICompletionGroup> pCompletionGroup;
AsyncOutputOf() :
stdOutPipe(4096 * 16),
@ -269,6 +270,11 @@ namespace Aurora::Processes
return true;
}
if (!uRelMS)
{
return this->BlockUntilFinished();
}
AuUInt64 uEndTime = AuTime::SteadyClockNS() + AuMSToNS<AuUInt64>(uRelMS);
while (this->bIsRunning)
@ -293,13 +299,7 @@ namespace Aurora::Processes
if (!this->bIsRunning)
{
while (this->pTransactionSourceA->IsSignaled())
{
}
while (this->pTransactionSourceB->IsSignaled())
{
}
this->BlockForeverWhileReading();
if (this->pCallbacks)
{
@ -315,6 +315,55 @@ namespace Aurora::Processes
}
}
void BlockForeverWhileReading()
{
if (this->pTransactionSourceA)
{
auto pTransaction = this->pTransactionA;
auto pSource = this->pTransactionSourceA;
while (pTransaction &&
pSource->IsSignaled() &&
pTransaction->GetLastPacketLength())
{
}
}
else
{
AuSPtr<AuIO::IAsyncTransaction> pTransaction;
AuSPtr<IO::Loop::ILoopSource> pLoopSource;
while ((pTransaction = this->pTransactionA) &&
(pLoopSource = pTransaction->NewLoopSource()) &&
pLoopSource->IsSignaled() &&
pTransaction->GetLastPacketLength())
{
}
}
if (this->pTransactionSourceB)
{
auto pTransaction = this->pTransactionB;
auto pSource = this->pTransactionSourceB;
while (pTransaction &&
pSource->IsSignaled() &&
pTransaction->GetLastPacketLength())
{
}
}
else
{
AuSPtr<AuIO::IAsyncTransaction> pTransaction;
AuSPtr<IO::Loop::ILoopSource> pLoopSource;
while ((pTransaction = this->pTransactionB) &&
(pLoopSource = pTransaction->NewLoopSource()) &&
pLoopSource->IsSignaled() &&
pTransaction->GetLastPacketLength())
{
}
}
}
bool BlockUntilFinished() override
{
if (this->bHasFinished)
@ -334,21 +383,7 @@ namespace Aurora::Processes
}
}
if (this->pTransactionSourceA)
{
while (this->pTransactionSourceA->IsSignaled() &&
this->pTransactionA->GetLastPacketLength())
{
}
}
if (this->pTransactionSourceB)
{
while (this->pTransactionSourceB->IsSignaled() &&
this->pTransactionB->GetLastPacketLength())
{
}
}
this->BlockForeverWhileReading();
if (!this->bIsRunning)
{
@ -392,21 +427,7 @@ namespace Aurora::Processes
if (!this->bIsRunning)
{
if (this->pTransactionSourceA)
{
while (this->pTransactionSourceA->IsSignaled() &&
this->pTransactionA->GetLastPacketLength())
{
}
}
if (this->pTransactionSourceB)
{
while (this->pTransactionSourceB->IsSignaled() &&
this->pTransactionB->GetLastPacketLength())
{
}
}
this->BlockForeverWhileReading();
if (this->pCallbacks)
{
@ -462,11 +483,13 @@ namespace Aurora::Processes
AUKN_SYM AuSPtr<IAsyncOutputOf> OutputOfAsync(StartupParameters &&parameters,
const AuSPtr<IAsyncCallbacks> &pCallbacks)
{
auto pCompletionGroup = parameters.pAutoJoinCompletionGroup;
auto pResult = AuMakeSharedThrow<AsyncOutputOf>();
parameters.type = ESpawnType::eSpawnChildProcessWorker;
parameters.fwdOut = EStreamForward::eAsyncPipe;
parameters.fwdErr = EStreamForward::eAsyncPipe;
pResult->process = SpawnUnique(AuMove(parameters));
pResult->pCompletionGroup = pCompletionGroup;
if (!pResult->process)
{
@ -497,14 +520,18 @@ namespace Aurora::Processes
if (worker)
{
pResult->worker = worker;
pResult->pLoopQueue = worker.GetPool()->ToKernelWorkQueue(worker);
if (!pResult->pLoopQueue)
if (!pCompletionGroup)
{
pResult->result.bFailed = true;
return pResult;
pResult->pLoopQueue = worker.GetPool()->ToKernelWorkQueue(worker);
if (!pResult->pLoopQueue)
{
pResult->result.bFailed = true;
return pResult;
}
}
}
else
else if (!pCompletionGroup)
{
pResult->pLoopQueue = AuLoop::NewLoopQueue();
if (!pResult->pLoopQueue)
@ -521,11 +548,18 @@ namespace Aurora::Processes
return pResult;
}
pResult->pTransactionSourceA = pResult->pTransactionA->NewLoopSource();
if (!pResult->pTransactionSourceA)
if (!pCompletionGroup)
{
pResult->result.bFailed = true;
return pResult;
pResult->pTransactionSourceA = pResult->pTransactionA->NewLoopSource();
if (!pResult->pTransactionSourceA)
{
pResult->result.bFailed = true;
return pResult;
}
}
else
{
(void)pResult->pTransactionA->TryAttachToCompletionGroup(pCompletionGroup);
}
pResult->pTransactionB = pResult->process->NewErrorStreamAsyncTransaction();
@ -535,18 +569,25 @@ namespace Aurora::Processes
return pResult;
}
pResult->pTransactionSourceB = pResult->pTransactionB->NewLoopSource();
if (!pResult->pTransactionSourceB)
if (!pCompletionGroup)
{
pResult->result.bFailed = true;
return pResult;
}
pResult->pTransactionSourceB = pResult->pTransactionB->NewLoopSource();
if (!pResult->pTransactionSourceB)
{
pResult->result.bFailed = true;
return pResult;
}
pResult->pProcessDeadLoopSource = pResult->process->AsLoopSource();
if (!pResult->pProcessDeadLoopSource)
pResult->pProcessDeadLoopSource = pResult->process->AsLoopSource();
if (!pResult->pProcessDeadLoopSource)
{
pResult->result.bFailed = true;
return pResult;
}
}
else
{
pResult->result.bFailed = true;
return pResult;
(void)pResult->pTransactionB->TryAttachToCompletionGroup(pCompletionGroup);
}
pResult->pPinSelf = pResult;
@ -577,6 +618,8 @@ namespace Aurora::Processes
pResult->pCallbacks->OnPipeData();
}
(void)pTransactionA->TryAttachToCompletionGroup(pResult->pCompletionGroup);
if (!pTransactionA->StartRead(0, AuUnsafeRaiiToShared(&pResult->readOutView)))
{
pResult->Finalize();
@ -604,6 +647,8 @@ namespace Aurora::Processes
pResult->pCallbacks->OnPipeData();
}
(void)pTransactionB->TryAttachToCompletionGroup(pResult->pCompletionGroup);
if (!pTransactionB->StartRead(0, AuUnsafeRaiiToShared(&pResult->readErrView)))
{
pResult->Finalize();
@ -631,84 +676,87 @@ namespace Aurora::Processes
return pResult;
}
if (!pResult->pLoopQueue)
if (!pCompletionGroup)
{
return pResult;
}
if (!pResult->pLoopQueue)
{
return pResult;
}
if (!pResult->pLoopQueue->SourceAdd(pResult->pTransactionSourceA))
{
pResult->result.bFailed = true;
if (pResult->pTransactionA)
if (!pResult->pLoopQueue->SourceAdd(pResult->pTransactionSourceA))
{
pResult->pTransactionA->SetCallback({});
pResult->result.bFailed = true;
if (pResult->pTransactionA)
{
pResult->pTransactionA->SetCallback({});
}
if (pResult->pTransactionB)
{
pResult->pTransactionB->SetCallback({});
}
pResult->CleanUp();
return pResult;
}
if (pResult->pTransactionB)
{
pResult->pTransactionB->SetCallback({});
}
pResult->CleanUp();
return pResult;
}
if (!pResult->pLoopQueue->SourceAdd(pResult->pTransactionSourceB))
{
pResult->result.bFailed = true;
if (pResult->pTransactionA)
if (!pResult->pLoopQueue->SourceAdd(pResult->pTransactionSourceB))
{
pResult->pTransactionA->SetCallback({});
pResult->result.bFailed = true;
if (pResult->pTransactionA)
{
pResult->pTransactionA->SetCallback({});
}
if (pResult->pTransactionB)
{
pResult->pTransactionB->SetCallback({});
}
pResult->CleanUp();
return pResult;
}
if (pResult->pTransactionB)
{
pResult->pTransactionB->SetCallback({});
}
pResult->CleanUp();
return pResult;
}
if (!pResult->pLoopQueue->SourceAdd(pResult->pProcessDeadLoopSource))
{
pResult->result.bFailed = true;
if (pResult->pTransactionA)
if (!pResult->pLoopQueue->SourceAdd(pResult->pProcessDeadLoopSource))
{
pResult->pTransactionA->SetCallback({});
pResult->result.bFailed = true;
if (pResult->pTransactionA)
{
pResult->pTransactionA->SetCallback({});
}
if (pResult->pTransactionB)
{
pResult->pTransactionB->SetCallback({});
}
pResult->CleanUp();
return pResult;
}
if (pResult->pTransactionB)
{
pResult->pTransactionB->SetCallback({});
}
pResult->CleanUp();
return pResult;
}
if (!pResult->pLoopQueue->AddCallback(pResult->pProcessDeadLoopSource, pProcessDeadCallback))
{
pResult->result.bFailed = true;
if (pResult->pTransactionA)
if (!pResult->pLoopQueue->AddCallback(pResult->pProcessDeadLoopSource, pProcessDeadCallback))
{
pResult->pTransactionA->SetCallback({});
pResult->result.bFailed = true;
if (pResult->pTransactionA)
{
pResult->pTransactionA->SetCallback({});
}
if (pResult->pTransactionB)
{
pResult->pTransactionB->SetCallback({});
}
pResult->CleanUp();
return pResult;
}
if (pResult->pTransactionB)
{
pResult->pTransactionB->SetCallback({});
}
pResult->CleanUp();
return pResult;
}
if (!pResult->pLoopQueue->Commit())
{
pResult->result.bFailed = true;
if (pResult->pTransactionA)
if (!pResult->pLoopQueue->Commit())
{
pResult->pTransactionA->SetCallback({});
pResult->result.bFailed = true;
if (pResult->pTransactionA)
{
pResult->pTransactionA->SetCallback({});
}
if (pResult->pTransactionB)
{
pResult->pTransactionB->SetCallback({});
}
pResult->CleanUp();
return pResult;
}
if (pResult->pTransactionB)
{
pResult->pTransactionB->SetCallback({});
}
pResult->CleanUp();
return pResult;
}
return pResult;

View File

@ -20,7 +20,7 @@
#include <Source/IO/FS/Async.NT.hpp>
#include <Source/IO/Loop/LSHandle.hpp>
#include <Source/IO/AuIOHandle.hpp>
#include <Source/IO/CompletionGroup/CompletionGroup.hpp>
#include "AuCreatePipeEx.NT.hpp"
namespace Aurora::Processes
@ -85,10 +85,10 @@ namespace Aurora::Processes
this->thread_.reset();
}
if (auto pGroup = this->pCompletionGroup_)
if (auto &pGroup = this->pCompletionGroup_)
{
this->bHasTerminated = true;
pGroup->TryTrigger();
AuStaticCast<IO::CompletionGroup::CompletionGroup>(pGroup)->UnsafeRemoveItem(AuUnsafeRaiiToShared(this));
}
AuWin32CloseHandle(this->poke_);
@ -709,6 +709,11 @@ namespace Aurora::Processes
RelOtherHandles();
if (auto pCompletionGroup = AuExchange(this->startup_.pAutoJoinCompletionGroup, {}))
{
(void)this->TryAttachProcessExitToCompletionGroup(pCompletionGroup);
}
if (this->startup_.optAffinity)
{
{
@ -771,16 +776,9 @@ namespace Aurora::Processes
this->exitCode_ = exitCode;
}
if (auto pCompletionGroup = this->pCompletionGroup_)
if (auto &pCompletionGroup = this->pCompletionGroup_)
{
if (auto pTriggerSrc = pCompletionGroup->GetTriggerLoopSource())
{
pTriggerSrc->Set();
}
else
{
pCompletionGroup->TryTrigger();
}
pCompletionGroup->TryTriggerLater();
}
};
@ -814,12 +812,9 @@ namespace Aurora::Processes
this->pCompletionGroup_ = pCompletionGroup;
pCompletionGroup->AddWorkItem(AuUnsafeRaiiToShared(this));
// verify the process hasnt already exited at least once
this->pCompletionGroup_->TryTriggerLater();
if (auto pLoopSource = pCompletionGroup->GetTriggerLoopSource())
{
// verify the process hasnt already exited at least once
pLoopSource->Set();
}
return true;
}

View File

@ -17,6 +17,7 @@
#include <sys/wait.h>
#include <Source/IO/AuIOHandle.Unix.hpp>
#include <Source/IO/CompletionGroup/CompletionGroup.hpp>
#if defined(AURORA_COMPILER_CLANG)
// warning: enumeration values 'kEnumCount' and 'kEnumInvalid' not handled in switch [-Wswitch
@ -115,12 +116,11 @@ namespace Aurora::Processes
AuTryRemove(gPidLookupMap, this->pidt_);
}
if (auto pGroup = this->pCompletionGroup_)
{
this->bHasExited = true;
pGroup->TryTrigger();
}
if (auto &pGroup = this->pCompletionGroup_)
{
this->bHasExited = true;
AuStaticCast<IO::CompletionGroup::CompletionGroup>(pGroup)->UnsafeRemoveItem(AuUnsafeRaiiToShared(this));
}
ShutdownPipes();
}
@ -160,16 +160,9 @@ namespace Aurora::Processes
this->fsErrorStream_->CheckProcess();
}
if (auto pCompletionGroup = this->pCompletionGroup_)
if (auto &pCompletionGroup = this->pCompletionGroup_)
{
if (auto pTriggerSrc = pCompletionGroup->GetTriggerLoopSource())
{
pTriggerSrc->Set();
}
else
{
pCompletionGroup->TryTrigger();
}
pCompletionGroup->TryTriggerLater();
}
}
@ -497,6 +490,11 @@ namespace Aurora::Processes
this->fsErrorStream_->MakeProcess(this);
}
if (auto pCompletionGroup = AuExchange(this->startup_.pAutoJoinCompletionGroup, {}))
{
(void)this->TryAttachProcessExitToCompletionGroup(pCompletionGroup);
}
return true;
}
@ -767,12 +765,7 @@ namespace Aurora::Processes
this->pCompletionGroup_ = pCompletionGroup;
pCompletionGroup->AddWorkItem(AuUnsafeRaiiToShared(this));
if (auto pLoopSource = pCompletionGroup->GetTriggerLoopSource())
{
// verify the process hasnt already exited at least once
pLoopSource->Set();
}
pCompletionGroup->TryTriggerLater();
return true;
}