diff --git a/Include/Aurora/IO/CompletionGroup/ICompletionGroup.hpp b/Include/Aurora/IO/CompletionGroup/ICompletionGroup.hpp index 41fbcf79..cd06c2ad 100644 --- a/Include/Aurora/IO/CompletionGroup/ICompletionGroup.hpp +++ b/Include/Aurora/IO/CompletionGroup/ICompletionGroup.hpp @@ -24,8 +24,8 @@ namespace Aurora::IO::CompletionGroup virtual AuSPtr ToAndLoopSource() = 0; virtual AuSPtr ToAnyLoopSource() = 0; - virtual AuSPtr OnCompletion() = 0; - virtual AuSPtr OnSingleCompletion() = 0; + virtual AuSPtr OnCompletion() = 0; + virtual AuSPtr OnSingleCompletion() = 0; virtual bool HasCompleted() = 0; virtual AuPair GetStats() = 0; @@ -35,6 +35,7 @@ namespace Aurora::IO::CompletionGroup virtual AuSPtr GetTriggerLoopSource() = 0; virtual void TryTrigger() = 0; + virtual void TryTriggerLater() = 0; // 0 = indefinite virtual bool WaitForAnyMS(AuUInt32 uTimeoutOrZeroMS = 0) = 0; diff --git a/Include/Aurora/Processes/StartupParameters.hpp b/Include/Aurora/Processes/StartupParameters.hpp index 59ad7d81..06c4a52d 100644 --- a/Include/Aurora/Processes/StartupParameters.hpp +++ b/Include/Aurora/Processes/StartupParameters.hpp @@ -68,6 +68,8 @@ namespace Aurora::Processes AuOptionalEx optAffinity; + AuSPtr pAutoJoinCompletionGroup; + #if defined(AURORA_IS_MODERNNT_DERIVED) #if defined(CreateProcess) AuSupplierConsumerbTerminated, "Completion group already terminated"); AU_LOCK_GUARD(this->mutex); this->workItems.push_back(pCompletable); - this->uAdded++; + AuAtomicAdd(&this->uAdded, 1u); + } + + void CompletionGroup::UnsafeRemoveItem(const AuSPtr &pCompletable) + { + AU_LOCK_GUARD(this->mutex); + + if (AuTryRemove(this->workItems, pCompletable)) + { + AuAtomicAdd(&this->uTriggered, 1u); + } } void CompletionGroup::AddCallbackTick(const AuSPtr &pCallbackInvoker, bool bAny) @@ -227,6 +237,14 @@ namespace Aurora::IO::CompletionGroup return this->anyProbablyAlwaysPresentEvent.GetLoopSource(); } + void CompletionGroup::TryTriggerLater() + { + if (auto pEvent = GetTriggerLoopSource()) + { + pEvent->Set(); + } + } + bool CompletionGroup::WaitForAnyMS(AuUInt32 uTimeoutOrZeroMS) { if (auto pLoopSource = this->ToAnyLoopSource()) @@ -312,6 +330,11 @@ namespace Aurora::IO::CompletionGroup return pRet; } + void CompletionGroup::ResetAnd() + { + AuResetMember(this->pAndBarrier); + } + AuSPtr CompletionGroup::OnSingleCompletion() { AU_LOCK_GUARD(this->cs); diff --git a/Source/IO/CompletionGroup/CompletionGroup.hpp b/Source/IO/CompletionGroup/CompletionGroup.hpp index 481dc94c..2ee7417b 100644 --- a/Source/IO/CompletionGroup/CompletionGroup.hpp +++ b/Source/IO/CompletionGroup/CompletionGroup.hpp @@ -29,6 +29,7 @@ namespace Aurora::IO::CompletionGroup AuSPtr GetTriggerLoopSource() override; void TryTrigger() override; + void TryTriggerLater() override; bool HasCompleted() override; AuPair GetStats() override; @@ -40,12 +41,15 @@ namespace Aurora::IO::CompletionGroup bool HasItemsActive(); void AddWorkItem(const AuSPtr &pCompletable) override; + void UnsafeRemoveItem(const AuSPtr &pCompletable); void AddCallbackTick(const AuSPtr &pCallbackInvoker, bool bAny); bool IsNeverEnding() override; void SetNeverEnding(bool bValue) override; + void ResetAnd(); + private: AuMutex mutex; AuCriticalSection cs; diff --git a/Source/IO/CompletionGroup/CompletionGroupLoopSource.cpp b/Source/IO/CompletionGroup/CompletionGroupLoopSource.cpp index 55880202..1872d714 100644 --- a/Source/IO/CompletionGroup/CompletionGroupLoopSource.cpp +++ b/Source/IO/CompletionGroup/CompletionGroupLoopSource.cpp @@ -48,7 +48,15 @@ namespace Aurora::IO::CompletionGroup if (this->bIsAnd) { - return !this->pParent->HasItemsActive(); + if (!this->pParent->HasItemsActive()) + { + this->pParent->ResetAnd(); + return true; + } + else + { + return false; + } } else { diff --git a/Source/Processes/AuOutputOf.cpp b/Source/Processes/AuOutputOf.cpp index 436ab498..78c697bf 100644 --- a/Source/Processes/AuOutputOf.cpp +++ b/Source/Processes/AuOutputOf.cpp @@ -218,6 +218,7 @@ namespace Aurora::Processes bool bHasFinished {}; AuSPtr pCallbacks; AuWorkerID worker; + AuSPtr pCompletionGroup; AsyncOutputOf() : stdOutPipe(4096 * 16), @@ -269,6 +270,11 @@ namespace Aurora::Processes return true; } + if (!uRelMS) + { + return this->BlockUntilFinished(); + } + AuUInt64 uEndTime = AuTime::SteadyClockNS() + AuMSToNS(uRelMS); while (this->bIsRunning) @@ -293,13 +299,7 @@ namespace Aurora::Processes if (!this->bIsRunning) { - while (this->pTransactionSourceA->IsSignaled()) - { - } - - while (this->pTransactionSourceB->IsSignaled()) - { - } + this->BlockForeverWhileReading(); if (this->pCallbacks) { @@ -315,6 +315,55 @@ namespace Aurora::Processes } } + void BlockForeverWhileReading() + { + if (this->pTransactionSourceA) + { + auto pTransaction = this->pTransactionA; + auto pSource = this->pTransactionSourceA; + while (pTransaction && + pSource->IsSignaled() && + pTransaction->GetLastPacketLength()) + { + } + } + else + { + AuSPtr pTransaction; + AuSPtr pLoopSource; + + while ((pTransaction = this->pTransactionA) && + (pLoopSource = pTransaction->NewLoopSource()) && + pLoopSource->IsSignaled() && + pTransaction->GetLastPacketLength()) + { + } + } + + if (this->pTransactionSourceB) + { + auto pTransaction = this->pTransactionB; + auto pSource = this->pTransactionSourceB; + while (pTransaction && + pSource->IsSignaled() && + pTransaction->GetLastPacketLength()) + { + } + } + else + { + AuSPtr pTransaction; + AuSPtr pLoopSource; + + while ((pTransaction = this->pTransactionB) && + (pLoopSource = pTransaction->NewLoopSource()) && + pLoopSource->IsSignaled() && + pTransaction->GetLastPacketLength()) + { + } + } + } + bool BlockUntilFinished() override { if (this->bHasFinished) @@ -334,21 +383,7 @@ namespace Aurora::Processes } } - if (this->pTransactionSourceA) - { - while (this->pTransactionSourceA->IsSignaled() && - this->pTransactionA->GetLastPacketLength()) - { - } - } - - if (this->pTransactionSourceB) - { - while (this->pTransactionSourceB->IsSignaled() && - this->pTransactionB->GetLastPacketLength()) - { - } - } + this->BlockForeverWhileReading(); if (!this->bIsRunning) { @@ -392,21 +427,7 @@ namespace Aurora::Processes if (!this->bIsRunning) { - if (this->pTransactionSourceA) - { - while (this->pTransactionSourceA->IsSignaled() && - this->pTransactionA->GetLastPacketLength()) - { - } - } - - if (this->pTransactionSourceB) - { - while (this->pTransactionSourceB->IsSignaled() && - this->pTransactionB->GetLastPacketLength()) - { - } - } + this->BlockForeverWhileReading(); if (this->pCallbacks) { @@ -462,11 +483,13 @@ namespace Aurora::Processes AUKN_SYM AuSPtr OutputOfAsync(StartupParameters &¶meters, const AuSPtr &pCallbacks) { + auto pCompletionGroup = parameters.pAutoJoinCompletionGroup; auto pResult = AuMakeSharedThrow(); parameters.type = ESpawnType::eSpawnChildProcessWorker; parameters.fwdOut = EStreamForward::eAsyncPipe; parameters.fwdErr = EStreamForward::eAsyncPipe; pResult->process = SpawnUnique(AuMove(parameters)); + pResult->pCompletionGroup = pCompletionGroup; if (!pResult->process) { @@ -497,14 +520,18 @@ namespace Aurora::Processes if (worker) { pResult->worker = worker; - pResult->pLoopQueue = worker.GetPool()->ToKernelWorkQueue(worker); - if (!pResult->pLoopQueue) + + if (!pCompletionGroup) { - pResult->result.bFailed = true; - return pResult; + pResult->pLoopQueue = worker.GetPool()->ToKernelWorkQueue(worker); + if (!pResult->pLoopQueue) + { + pResult->result.bFailed = true; + return pResult; + } } } - else + else if (!pCompletionGroup) { pResult->pLoopQueue = AuLoop::NewLoopQueue(); if (!pResult->pLoopQueue) @@ -521,11 +548,18 @@ namespace Aurora::Processes return pResult; } - pResult->pTransactionSourceA = pResult->pTransactionA->NewLoopSource(); - if (!pResult->pTransactionSourceA) + if (!pCompletionGroup) { - pResult->result.bFailed = true; - return pResult; + pResult->pTransactionSourceA = pResult->pTransactionA->NewLoopSource(); + if (!pResult->pTransactionSourceA) + { + pResult->result.bFailed = true; + return pResult; + } + } + else + { + (void)pResult->pTransactionA->TryAttachToCompletionGroup(pCompletionGroup); } pResult->pTransactionB = pResult->process->NewErrorStreamAsyncTransaction(); @@ -535,18 +569,25 @@ namespace Aurora::Processes return pResult; } - pResult->pTransactionSourceB = pResult->pTransactionB->NewLoopSource(); - if (!pResult->pTransactionSourceB) + if (!pCompletionGroup) { - pResult->result.bFailed = true; - return pResult; - } + pResult->pTransactionSourceB = pResult->pTransactionB->NewLoopSource(); + if (!pResult->pTransactionSourceB) + { + pResult->result.bFailed = true; + return pResult; + } - pResult->pProcessDeadLoopSource = pResult->process->AsLoopSource(); - if (!pResult->pProcessDeadLoopSource) + pResult->pProcessDeadLoopSource = pResult->process->AsLoopSource(); + if (!pResult->pProcessDeadLoopSource) + { + pResult->result.bFailed = true; + return pResult; + } + } + else { - pResult->result.bFailed = true; - return pResult; + (void)pResult->pTransactionB->TryAttachToCompletionGroup(pCompletionGroup); } pResult->pPinSelf = pResult; @@ -577,6 +618,8 @@ namespace Aurora::Processes pResult->pCallbacks->OnPipeData(); } + (void)pTransactionA->TryAttachToCompletionGroup(pResult->pCompletionGroup); + if (!pTransactionA->StartRead(0, AuUnsafeRaiiToShared(&pResult->readOutView))) { pResult->Finalize(); @@ -604,6 +647,8 @@ namespace Aurora::Processes pResult->pCallbacks->OnPipeData(); } + (void)pTransactionB->TryAttachToCompletionGroup(pResult->pCompletionGroup); + if (!pTransactionB->StartRead(0, AuUnsafeRaiiToShared(&pResult->readErrView))) { pResult->Finalize(); @@ -631,84 +676,87 @@ namespace Aurora::Processes return pResult; } - if (!pResult->pLoopQueue) + if (!pCompletionGroup) { - return pResult; - } + if (!pResult->pLoopQueue) + { + return pResult; + } - if (!pResult->pLoopQueue->SourceAdd(pResult->pTransactionSourceA)) - { - pResult->result.bFailed = true; - if (pResult->pTransactionA) + if (!pResult->pLoopQueue->SourceAdd(pResult->pTransactionSourceA)) { - pResult->pTransactionA->SetCallback({}); + pResult->result.bFailed = true; + if (pResult->pTransactionA) + { + pResult->pTransactionA->SetCallback({}); + } + if (pResult->pTransactionB) + { + pResult->pTransactionB->SetCallback({}); + } + pResult->CleanUp(); + return pResult; } - if (pResult->pTransactionB) - { - pResult->pTransactionB->SetCallback({}); - } - pResult->CleanUp(); - return pResult; - } - if (!pResult->pLoopQueue->SourceAdd(pResult->pTransactionSourceB)) - { - pResult->result.bFailed = true; - if (pResult->pTransactionA) + if (!pResult->pLoopQueue->SourceAdd(pResult->pTransactionSourceB)) { - pResult->pTransactionA->SetCallback({}); + pResult->result.bFailed = true; + if (pResult->pTransactionA) + { + pResult->pTransactionA->SetCallback({}); + } + if (pResult->pTransactionB) + { + pResult->pTransactionB->SetCallback({}); + } + pResult->CleanUp(); + return pResult; } - if (pResult->pTransactionB) - { - pResult->pTransactionB->SetCallback({}); - } - pResult->CleanUp(); - return pResult; - } - if (!pResult->pLoopQueue->SourceAdd(pResult->pProcessDeadLoopSource)) - { - pResult->result.bFailed = true; - if (pResult->pTransactionA) + if (!pResult->pLoopQueue->SourceAdd(pResult->pProcessDeadLoopSource)) { - pResult->pTransactionA->SetCallback({}); + pResult->result.bFailed = true; + if (pResult->pTransactionA) + { + pResult->pTransactionA->SetCallback({}); + } + if (pResult->pTransactionB) + { + pResult->pTransactionB->SetCallback({}); + } + pResult->CleanUp(); + return pResult; } - if (pResult->pTransactionB) - { - pResult->pTransactionB->SetCallback({}); - } - pResult->CleanUp(); - return pResult; - } - if (!pResult->pLoopQueue->AddCallback(pResult->pProcessDeadLoopSource, pProcessDeadCallback)) - { - pResult->result.bFailed = true; - if (pResult->pTransactionA) + if (!pResult->pLoopQueue->AddCallback(pResult->pProcessDeadLoopSource, pProcessDeadCallback)) { - pResult->pTransactionA->SetCallback({}); + pResult->result.bFailed = true; + if (pResult->pTransactionA) + { + pResult->pTransactionA->SetCallback({}); + } + if (pResult->pTransactionB) + { + pResult->pTransactionB->SetCallback({}); + } + pResult->CleanUp(); + return pResult; } - if (pResult->pTransactionB) - { - pResult->pTransactionB->SetCallback({}); - } - pResult->CleanUp(); - return pResult; - } - if (!pResult->pLoopQueue->Commit()) - { - pResult->result.bFailed = true; - if (pResult->pTransactionA) + if (!pResult->pLoopQueue->Commit()) { - pResult->pTransactionA->SetCallback({}); + pResult->result.bFailed = true; + if (pResult->pTransactionA) + { + pResult->pTransactionA->SetCallback({}); + } + if (pResult->pTransactionB) + { + pResult->pTransactionB->SetCallback({}); + } + pResult->CleanUp(); + return pResult; } - if (pResult->pTransactionB) - { - pResult->pTransactionB->SetCallback({}); - } - pResult->CleanUp(); - return pResult; } return pResult; diff --git a/Source/Processes/AuProcess.NT.cpp b/Source/Processes/AuProcess.NT.cpp index c44ab908..f09808c0 100644 --- a/Source/Processes/AuProcess.NT.cpp +++ b/Source/Processes/AuProcess.NT.cpp @@ -20,7 +20,7 @@ #include #include #include - +#include #include "AuCreatePipeEx.NT.hpp" namespace Aurora::Processes @@ -85,10 +85,10 @@ namespace Aurora::Processes this->thread_.reset(); } - if (auto pGroup = this->pCompletionGroup_) + if (auto &pGroup = this->pCompletionGroup_) { this->bHasTerminated = true; - pGroup->TryTrigger(); + AuStaticCast(pGroup)->UnsafeRemoveItem(AuUnsafeRaiiToShared(this)); } AuWin32CloseHandle(this->poke_); @@ -709,6 +709,11 @@ namespace Aurora::Processes RelOtherHandles(); + if (auto pCompletionGroup = AuExchange(this->startup_.pAutoJoinCompletionGroup, {})) + { + (void)this->TryAttachProcessExitToCompletionGroup(pCompletionGroup); + } + if (this->startup_.optAffinity) { { @@ -771,16 +776,9 @@ namespace Aurora::Processes this->exitCode_ = exitCode; } - if (auto pCompletionGroup = this->pCompletionGroup_) + if (auto &pCompletionGroup = this->pCompletionGroup_) { - if (auto pTriggerSrc = pCompletionGroup->GetTriggerLoopSource()) - { - pTriggerSrc->Set(); - } - else - { - pCompletionGroup->TryTrigger(); - } + pCompletionGroup->TryTriggerLater(); } }; @@ -814,12 +812,9 @@ namespace Aurora::Processes this->pCompletionGroup_ = pCompletionGroup; pCompletionGroup->AddWorkItem(AuUnsafeRaiiToShared(this)); + // verify the process hasnt already exited at least once + this->pCompletionGroup_->TryTriggerLater(); - if (auto pLoopSource = pCompletionGroup->GetTriggerLoopSource()) - { - // verify the process hasnt already exited at least once - pLoopSource->Set(); - } return true; } diff --git a/Source/Processes/AuProcess.Unix.cpp b/Source/Processes/AuProcess.Unix.cpp index a2a077b7..2ecf9f25 100644 --- a/Source/Processes/AuProcess.Unix.cpp +++ b/Source/Processes/AuProcess.Unix.cpp @@ -17,6 +17,7 @@ #include #include +#include #if defined(AURORA_COMPILER_CLANG) // warning: enumeration values 'kEnumCount' and 'kEnumInvalid' not handled in switch [-Wswitch @@ -115,12 +116,11 @@ namespace Aurora::Processes AuTryRemove(gPidLookupMap, this->pidt_); } - - if (auto pGroup = this->pCompletionGroup_) - { - this->bHasExited = true; - pGroup->TryTrigger(); - } + if (auto &pGroup = this->pCompletionGroup_) + { + this->bHasExited = true; + AuStaticCast(pGroup)->UnsafeRemoveItem(AuUnsafeRaiiToShared(this)); + } ShutdownPipes(); } @@ -160,16 +160,9 @@ namespace Aurora::Processes this->fsErrorStream_->CheckProcess(); } - if (auto pCompletionGroup = this->pCompletionGroup_) + if (auto &pCompletionGroup = this->pCompletionGroup_) { - if (auto pTriggerSrc = pCompletionGroup->GetTriggerLoopSource()) - { - pTriggerSrc->Set(); - } - else - { - pCompletionGroup->TryTrigger(); - } + pCompletionGroup->TryTriggerLater(); } } @@ -497,6 +490,11 @@ namespace Aurora::Processes this->fsErrorStream_->MakeProcess(this); } + if (auto pCompletionGroup = AuExchange(this->startup_.pAutoJoinCompletionGroup, {})) + { + (void)this->TryAttachProcessExitToCompletionGroup(pCompletionGroup); + } + return true; } @@ -767,12 +765,7 @@ namespace Aurora::Processes this->pCompletionGroup_ = pCompletionGroup; pCompletionGroup->AddWorkItem(AuUnsafeRaiiToShared(this)); - - if (auto pLoopSource = pCompletionGroup->GetTriggerLoopSource()) - { - // verify the process hasnt already exited at least once - pLoopSource->Set(); - } + pCompletionGroup->TryTriggerLater(); return true; }