452 lines
11 KiB
C++
452 lines
11 KiB
C++
/***
|
|
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
|
|
|
File: CompletionGroup.cpp
|
|
Date: 2023-12-28
|
|
Author: Reece
|
|
***/
|
|
#include <Source/RuntimeInternal.hpp>
|
|
#include <Aurora/IO/IOExperimental.hpp>
|
|
#include "CompletionGroup.hpp"
|
|
#include "CompletionGroupAndedIOWorkItem.hpp"
|
|
|
|
namespace Aurora::IO::CompletionGroup
|
|
{
|
|
struct CompletionGroup;
|
|
|
|
CompletionGroup::CompletionGroup() :
|
|
anyProbablyAlwaysPresentEvent(this, false),
|
|
andPlsDontAllocateFdIfUntouchedEvent(this, true)
|
|
{
|
|
|
|
}
|
|
|
|
CompletionGroup::~CompletionGroup()
|
|
{
|
|
this->ResetMemoryPins();
|
|
}
|
|
|
|
bool CompletionGroup::HasCompleted()
|
|
{
|
|
return this->uTriggered == this->uAdded &&
|
|
bool(this->uAdded);
|
|
}
|
|
|
|
AuPair<AuUInt32, AuUInt32> CompletionGroup::GetStats()
|
|
{
|
|
return AuMakePair(this->uTriggered, this->uAdded);
|
|
}
|
|
|
|
void CompletionGroup::SetCallbacks(const AuSPtr<ICompletionGroupHooks> &pCallbacks)
|
|
{
|
|
AU_LOCK_GUARD(this->mutex);
|
|
this->pCallbacks = pCallbacks;
|
|
}
|
|
|
|
void CompletionGroup::ResetMemoryPins()
|
|
{
|
|
AuResetMember(this->pCallbacks);
|
|
AuResetMember(this->pAnyBarrier);
|
|
AuResetMember(this->pAndBarrier);
|
|
|
|
for (const auto &pOld : this->workItems)
|
|
{
|
|
pOld->CleanupForGCWI();
|
|
}
|
|
|
|
AuResetMember(this->callbackTicks);
|
|
this->bTerminated = true;
|
|
}
|
|
|
|
bool CompletionGroup::HasItemsActive()
|
|
{
|
|
return this->workItems.size();
|
|
}
|
|
|
|
void CompletionGroup::DoIOTick(bool bManual)
|
|
{
|
|
AuList<AuPair<AuSPtr<ICompletionGroupWorkItem>, bool>> removedItems;
|
|
|
|
{
|
|
AU_DEBUG_MEMCRUNCH;
|
|
AU_LOCK_GUARD(this->mutex);
|
|
|
|
for (auto itr = this->workItems.begin();
|
|
itr != this->workItems.end();
|
|
)
|
|
{
|
|
if (itr->get()->HasCompletedForGCWI())
|
|
{
|
|
auto that = *itr;
|
|
itr = this->workItems.erase(itr);
|
|
|
|
this->uTriggered++;
|
|
|
|
|
|
if (that->HasNonTrivialCleanup())
|
|
{
|
|
removedItems.push_back(AuMakePair(that, true));
|
|
}
|
|
else if (this->pCallbacks)
|
|
{
|
|
removedItems.push_back(AuMakePair(that, false));
|
|
that->CleanupForGCWI();
|
|
}
|
|
else
|
|
{
|
|
that->CleanupForGCWI();
|
|
}
|
|
}
|
|
else
|
|
{
|
|
itr++;
|
|
}
|
|
}
|
|
}
|
|
|
|
for (const auto &[pRemoved, bNonTrivial] : removedItems)
|
|
{
|
|
if (auto pCallbacks = this->pCallbacks)
|
|
{
|
|
try
|
|
{
|
|
pCallbacks->OnHandleComplete(pRemoved);
|
|
}
|
|
catch (...)
|
|
{
|
|
SysPushErrorCatch();
|
|
}
|
|
}
|
|
|
|
if (bNonTrivial)
|
|
{
|
|
pRemoved->CleanupForGCWI();
|
|
}
|
|
}
|
|
|
|
{
|
|
AU_LOCK_GUARD(this->mutex);
|
|
|
|
for (const auto &[pCallback, bAny] : this->callbackTicks)
|
|
{
|
|
if (!bAny)
|
|
{
|
|
continue;
|
|
}
|
|
|
|
try
|
|
{
|
|
pCallback->InvokeManualTick();
|
|
}
|
|
catch (...)
|
|
{
|
|
SysPushErrorCatch();
|
|
}
|
|
}
|
|
}
|
|
|
|
if (this->workItems.empty() &&
|
|
(bManual || !this->bIsNeverEnding))
|
|
{
|
|
this->andPlsDontAllocateFdIfUntouchedEvent.Set();
|
|
|
|
if (AuExchange(this->pCallbacks, {}))
|
|
{
|
|
try
|
|
{
|
|
this->pCallbacks->OnComplete();
|
|
}
|
|
catch (...)
|
|
{
|
|
SysPushErrorCatch();
|
|
}
|
|
}
|
|
|
|
{
|
|
AU_LOCK_GUARD(this->mutex);
|
|
|
|
for (const auto &[pCallback, bAny] : this->callbackTicks)
|
|
{
|
|
if (bAny)
|
|
{
|
|
continue;
|
|
}
|
|
|
|
try
|
|
{
|
|
pCallback->InvokeManualTick();
|
|
}
|
|
catch (...)
|
|
{
|
|
SysPushErrorCatch();
|
|
}
|
|
}
|
|
}
|
|
|
|
// anyone else?
|
|
this->ResetMemoryPins();
|
|
}
|
|
|
|
this->TryCollectInternal();
|
|
}
|
|
|
|
void CompletionGroup::AddWorkItem(const AuSPtr<ICompletionGroupWorkItem> &pCompletable)
|
|
{
|
|
SysAssert(!this->bTerminated, "Completion group already terminated");
|
|
AU_LOCK_GUARD(this->mutex);
|
|
this->workItems.push_back(pCompletable);
|
|
AuAtomicAdd(&this->uAdded, 1u);
|
|
this->TryCreateInternal();
|
|
}
|
|
|
|
void CompletionGroup::UnsafeRemoveItem(const AuSPtr<ICompletionGroupWorkItem> &pCompletable)
|
|
{
|
|
AU_LOCK_GUARD(this->mutex);
|
|
|
|
if (AuTryRemove(this->workItems, pCompletable))
|
|
{
|
|
AuAtomicAdd(&this->uTriggered, 1u);
|
|
}
|
|
}
|
|
|
|
void CompletionGroup::AddCallbackTick(const AuSPtr<IIOProcessorManualInvoker> &pCallbackInvoker, bool bAny)
|
|
{
|
|
AU_LOCK_GUARD(this->mutex);
|
|
this->callbackTicks.push_back(AuMakePair(pCallbackInvoker, bAny));
|
|
}
|
|
|
|
bool CompletionGroup::IsNeverEnding()
|
|
{
|
|
return this->bIsNeverEnding;
|
|
}
|
|
|
|
void CompletionGroup::SetNeverEnding(bool bValue)
|
|
{
|
|
this->bIsNeverEnding = bValue;
|
|
}
|
|
|
|
AuSPtr<Loop::ILoopSource> CompletionGroup::ToAndLoopSource()
|
|
{
|
|
return this->andPlsDontAllocateFdIfUntouchedEvent.GetLoopSource();
|
|
}
|
|
|
|
AuSPtr<Loop::ILoopSource> CompletionGroup::ToAnyLoopSource()
|
|
{
|
|
return this->anyProbablyAlwaysPresentEvent.GetLoopSource();
|
|
}
|
|
|
|
AuSPtr<Loop::ILSEvent> CompletionGroup::GetTriggerLoopSource()
|
|
{
|
|
return this->anyProbablyAlwaysPresentEvent.GetLoopSource();
|
|
}
|
|
|
|
void CompletionGroup::TryTriggerLater()
|
|
{
|
|
if (auto pEvent = GetTriggerLoopSource())
|
|
{
|
|
pEvent->Set();
|
|
}
|
|
}
|
|
|
|
bool CompletionGroup::WaitForAnyMS(AuUInt32 uTimeoutOrZeroMS)
|
|
{
|
|
if (auto pLoopSource = this->ToAnyLoopSource())
|
|
{
|
|
return pLoopSource->WaitOn(uTimeoutOrZeroMS);
|
|
}
|
|
else
|
|
{
|
|
return false;
|
|
}
|
|
}
|
|
|
|
bool CompletionGroup::WaitForAllMS(AuUInt32 uTimeoutOrZeroMS)
|
|
{
|
|
if (auto pLoopSource = this->ToAndLoopSource())
|
|
{
|
|
return pLoopSource->WaitOn(uTimeoutOrZeroMS);
|
|
}
|
|
else
|
|
{
|
|
return false;
|
|
}
|
|
}
|
|
|
|
void CompletionGroup::TryTrigger()
|
|
{
|
|
if (auto pSource = GetTriggerLoopSource())
|
|
{
|
|
pSource->Set();
|
|
}
|
|
|
|
this->DoIOTick(true);
|
|
}
|
|
|
|
AuSPtr<AuAsync::IWorkItem> CompletionGroup::OnCompletion()
|
|
{
|
|
AU_LOCK_GUARD(this->cs);
|
|
|
|
if (this->pAndBarrier)
|
|
{
|
|
return this->pAndBarrier;
|
|
}
|
|
|
|
auto pWorker = AuAsync::GetCurrentWorkerPId().GetPool();
|
|
if (!pWorker)
|
|
{
|
|
pWorker = AuUnsafeRaiiToShared(AuAsync::GetAsyncApp());
|
|
}
|
|
|
|
auto pRet = AuMakeShared<CompletionGroupAndedIOWorkItem>((AuAsync::IThreadPoolInternal *)AuStaticPointerCast<AuAsync::ThreadPool>(pWorker).get(),
|
|
AuWorkerPId_t {},
|
|
this->SharedFromThis());
|
|
if (!pRet)
|
|
{
|
|
SysPushErrorMemory();
|
|
return {};
|
|
}
|
|
|
|
// prevent attaching a singular loop source to a loop queue multiple times
|
|
if (this->pAnyBarrier)
|
|
{
|
|
auto pEvent = this->andPlsDontAllocateFdIfUntouchedEvent.GetLoopSource();
|
|
if (!pEvent)
|
|
{
|
|
return {};
|
|
}
|
|
|
|
pRet->SetSchedByLoopSourceOnce(pEvent)->Dispatch();
|
|
}
|
|
else
|
|
{
|
|
auto pEvent = this->anyProbablyAlwaysPresentEvent.GetLoopSource();
|
|
if (!pEvent)
|
|
{
|
|
return {};
|
|
}
|
|
|
|
pRet->SetSchedByLoopSourceOnce(pEvent)->Dispatch();
|
|
this->bNoAny = true;
|
|
}
|
|
|
|
this->pAndBarrier = pRet;
|
|
return pRet;
|
|
}
|
|
|
|
void CompletionGroup::ResetAnd()
|
|
{
|
|
AuResetMember(this->pAndBarrier);
|
|
}
|
|
|
|
AuSPtr<AuAsync::IWorkItem> CompletionGroup::OnSingleCompletionIntl()
|
|
{
|
|
if (this->bNoAny)
|
|
{
|
|
SysPushErrorGeneric("To prevent double LoopQueue::SourceAdd on the same source, you must not call ::OnSingleCompletion() after ::OnCompletion() in this specific order");
|
|
return {};
|
|
}
|
|
|
|
auto pRet = AuAsync::NewFence();
|
|
if (!pRet)
|
|
{
|
|
SysPushErrorNested();
|
|
return {};
|
|
}
|
|
|
|
pRet->SetSchedByLoopSourceRepeating(this->anyProbablyAlwaysPresentEvent.GetLoopSource())->Dispatch();
|
|
this->pAnyBarrier = pRet;
|
|
return pRet;
|
|
}
|
|
|
|
AuSPtr<AuAsync::IWorkItem> CompletionGroup::OnSingleCompletion()
|
|
{
|
|
AU_LOCK_GUARD(this->cs);
|
|
|
|
if (this->pAnyBarrier)
|
|
{
|
|
return this->pAnyBarrier;
|
|
}
|
|
|
|
if (this->pAnyBarrierInternal)
|
|
{
|
|
return this->pAnyBarrierInternal;
|
|
}
|
|
|
|
if (this->workerId)
|
|
{
|
|
TryCreateInternal();
|
|
return this->pAnyBarrierInternal;
|
|
}
|
|
else
|
|
{
|
|
return this->pAnyBarrier = OnSingleCompletionIntl();
|
|
}
|
|
}
|
|
|
|
void CompletionGroup::TryCreateInternal()
|
|
{
|
|
if (!this->workerId)
|
|
{
|
|
return;
|
|
}
|
|
|
|
if (this->pAnyBarrierInternal)
|
|
{
|
|
return;
|
|
}
|
|
|
|
if (this->workerId != AuAsync::GetCurrentWorkerPId())
|
|
{
|
|
return;
|
|
}
|
|
|
|
this->pAnyBarrierInternal = OnSingleCompletionIntl();
|
|
this->SetNeverEnding(true);
|
|
|
|
if (auto pKernel = this->workerId.GetPool()->ToKernelWorkQueue(this->workerId))
|
|
{
|
|
(void)pKernel->Commit();
|
|
}
|
|
}
|
|
|
|
void CompletionGroup::TryCollectInternal()
|
|
{
|
|
if (!this->workItems.empty())
|
|
{
|
|
return;
|
|
}
|
|
|
|
if (!this->workerId)
|
|
{
|
|
return;
|
|
}
|
|
|
|
if (!this->pAnyBarrierInternal)
|
|
{
|
|
return;
|
|
}
|
|
|
|
if (this->workerId != AuAsync::GetCurrentWorkerPId())
|
|
{
|
|
return;
|
|
}
|
|
|
|
AuResetMember(this->pAnyBarrierInternal);
|
|
|
|
if (auto pKernel = this->workerId.GetPool()->ToKernelWorkQueue(this->workerId))
|
|
{
|
|
(void)pKernel->Commit();
|
|
}
|
|
}
|
|
|
|
void CompletionGroup::MakeInternalAsync(AuWorkerID pid)
|
|
{
|
|
this->SetNeverEnding(true);
|
|
this->workerId = pid;
|
|
}
|
|
|
|
AUKN_SYM AuSPtr<ICompletionGroup> NewCompletionGroup()
|
|
{
|
|
return AuMakeShared<CompletionGroup>();
|
|
}
|
|
} |