AuroraRuntime/Source/IO/CompletionGroup/CompletionGroup.cpp

452 lines
11 KiB
C++

/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: CompletionGroup.cpp
Date: 2023-12-28
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include <Aurora/IO/IOExperimental.hpp>
#include "CompletionGroup.hpp"
#include "CompletionGroupAndedIOWorkItem.hpp"
namespace Aurora::IO::CompletionGroup
{
struct CompletionGroup;
CompletionGroup::CompletionGroup() :
anyProbablyAlwaysPresentEvent(this, false),
andPlsDontAllocateFdIfUntouchedEvent(this, true)
{
}
CompletionGroup::~CompletionGroup()
{
this->ResetMemoryPins();
}
bool CompletionGroup::HasCompleted()
{
return this->uTriggered == this->uAdded &&
bool(this->uAdded);
}
AuPair<AuUInt32, AuUInt32> CompletionGroup::GetStats()
{
return AuMakePair(this->uTriggered, this->uAdded);
}
void CompletionGroup::SetCallbacks(const AuSPtr<ICompletionGroupHooks> &pCallbacks)
{
AU_LOCK_GUARD(this->mutex);
this->pCallbacks = pCallbacks;
}
void CompletionGroup::ResetMemoryPins()
{
AuResetMember(this->pCallbacks);
AuResetMember(this->pAnyBarrier);
AuResetMember(this->pAndBarrier);
for (const auto &pOld : this->workItems)
{
pOld->CleanupForGCWI();
}
AuResetMember(this->callbackTicks);
this->bTerminated = true;
}
bool CompletionGroup::HasItemsActive()
{
return this->workItems.size();
}
void CompletionGroup::DoIOTick(bool bManual)
{
AuList<AuPair<AuSPtr<ICompletionGroupWorkItem>, bool>> removedItems;
{
AU_DEBUG_MEMCRUNCH;
AU_LOCK_GUARD(this->mutex);
for (auto itr = this->workItems.begin();
itr != this->workItems.end();
)
{
if (itr->get()->HasCompletedForGCWI())
{
auto that = *itr;
itr = this->workItems.erase(itr);
this->uTriggered++;
if (that->HasNonTrivialCleanup())
{
removedItems.push_back(AuMakePair(that, true));
}
else if (this->pCallbacks)
{
removedItems.push_back(AuMakePair(that, false));
that->CleanupForGCWI();
}
else
{
that->CleanupForGCWI();
}
}
else
{
itr++;
}
}
}
for (const auto &[pRemoved, bNonTrivial] : removedItems)
{
if (auto pCallbacks = this->pCallbacks)
{
try
{
pCallbacks->OnHandleComplete(pRemoved);
}
catch (...)
{
SysPushErrorCatch();
}
}
if (bNonTrivial)
{
pRemoved->CleanupForGCWI();
}
}
{
AU_LOCK_GUARD(this->mutex);
for (const auto &[pCallback, bAny] : this->callbackTicks)
{
if (!bAny)
{
continue;
}
try
{
pCallback->InvokeManualTick();
}
catch (...)
{
SysPushErrorCatch();
}
}
}
if (this->workItems.empty() &&
(bManual || !this->bIsNeverEnding))
{
this->andPlsDontAllocateFdIfUntouchedEvent.Set();
if (AuExchange(this->pCallbacks, {}))
{
try
{
this->pCallbacks->OnComplete();
}
catch (...)
{
SysPushErrorCatch();
}
}
{
AU_LOCK_GUARD(this->mutex);
for (const auto &[pCallback, bAny] : this->callbackTicks)
{
if (bAny)
{
continue;
}
try
{
pCallback->InvokeManualTick();
}
catch (...)
{
SysPushErrorCatch();
}
}
}
// anyone else?
this->ResetMemoryPins();
}
this->TryCollectInternal();
}
void CompletionGroup::AddWorkItem(const AuSPtr<ICompletionGroupWorkItem> &pCompletable)
{
SysAssert(!this->bTerminated, "Completion group already terminated");
AU_LOCK_GUARD(this->mutex);
this->workItems.push_back(pCompletable);
AuAtomicAdd(&this->uAdded, 1u);
this->TryCreateInternal();
}
void CompletionGroup::UnsafeRemoveItem(const AuSPtr<ICompletionGroupWorkItem> &pCompletable)
{
AU_LOCK_GUARD(this->mutex);
if (AuTryRemove(this->workItems, pCompletable))
{
AuAtomicAdd(&this->uTriggered, 1u);
}
}
void CompletionGroup::AddCallbackTick(const AuSPtr<IIOProcessorManualInvoker> &pCallbackInvoker, bool bAny)
{
AU_LOCK_GUARD(this->mutex);
this->callbackTicks.push_back(AuMakePair(pCallbackInvoker, bAny));
}
bool CompletionGroup::IsNeverEnding()
{
return this->bIsNeverEnding;
}
void CompletionGroup::SetNeverEnding(bool bValue)
{
this->bIsNeverEnding = bValue;
}
AuSPtr<Loop::ILoopSource> CompletionGroup::ToAndLoopSource()
{
return this->andPlsDontAllocateFdIfUntouchedEvent.GetLoopSource();
}
AuSPtr<Loop::ILoopSource> CompletionGroup::ToAnyLoopSource()
{
return this->anyProbablyAlwaysPresentEvent.GetLoopSource();
}
AuSPtr<Loop::ILSEvent> CompletionGroup::GetTriggerLoopSource()
{
return this->anyProbablyAlwaysPresentEvent.GetLoopSource();
}
void CompletionGroup::TryTriggerLater()
{
if (auto pEvent = GetTriggerLoopSource())
{
pEvent->Set();
}
}
bool CompletionGroup::WaitForAnyMS(AuUInt32 uTimeoutOrZeroMS)
{
if (auto pLoopSource = this->ToAnyLoopSource())
{
return pLoopSource->WaitOn(uTimeoutOrZeroMS);
}
else
{
return false;
}
}
bool CompletionGroup::WaitForAllMS(AuUInt32 uTimeoutOrZeroMS)
{
if (auto pLoopSource = this->ToAndLoopSource())
{
return pLoopSource->WaitOn(uTimeoutOrZeroMS);
}
else
{
return false;
}
}
void CompletionGroup::TryTrigger()
{
if (auto pSource = GetTriggerLoopSource())
{
pSource->Set();
}
this->DoIOTick(true);
}
AuSPtr<AuAsync::IWorkItem> CompletionGroup::OnCompletion()
{
AU_LOCK_GUARD(this->cs);
if (this->pAndBarrier)
{
return this->pAndBarrier;
}
auto pWorker = AuAsync::GetCurrentWorkerPId().GetPool();
if (!pWorker)
{
pWorker = AuUnsafeRaiiToShared(AuAsync::GetAsyncApp());
}
auto pRet = AuMakeShared<CompletionGroupAndedIOWorkItem>((AuAsync::IThreadPoolInternal *)AuStaticPointerCast<AuAsync::ThreadPool>(pWorker).get(),
AuWorkerPId_t {},
this->SharedFromThis());
if (!pRet)
{
SysPushErrorMemory();
return {};
}
// prevent attaching a singular loop source to a loop queue multiple times
if (this->pAnyBarrier)
{
auto pEvent = this->andPlsDontAllocateFdIfUntouchedEvent.GetLoopSource();
if (!pEvent)
{
return {};
}
pRet->SetSchedByLoopSourceOnce(pEvent)->Dispatch();
}
else
{
auto pEvent = this->anyProbablyAlwaysPresentEvent.GetLoopSource();
if (!pEvent)
{
return {};
}
pRet->SetSchedByLoopSourceOnce(pEvent)->Dispatch();
this->bNoAny = true;
}
this->pAndBarrier = pRet;
return pRet;
}
void CompletionGroup::ResetAnd()
{
AuResetMember(this->pAndBarrier);
}
AuSPtr<AuAsync::IWorkItem> CompletionGroup::OnSingleCompletionIntl()
{
if (this->bNoAny)
{
SysPushErrorGeneric("To prevent double LoopQueue::SourceAdd on the same source, you must not call ::OnSingleCompletion() after ::OnCompletion() in this specific order");
return {};
}
auto pRet = AuAsync::NewFence();
if (!pRet)
{
SysPushErrorNested();
return {};
}
pRet->SetSchedByLoopSourceRepeating(this->anyProbablyAlwaysPresentEvent.GetLoopSource())->Dispatch();
this->pAnyBarrier = pRet;
return pRet;
}
AuSPtr<AuAsync::IWorkItem> CompletionGroup::OnSingleCompletion()
{
AU_LOCK_GUARD(this->cs);
if (this->pAnyBarrier)
{
return this->pAnyBarrier;
}
if (this->pAnyBarrierInternal)
{
return this->pAnyBarrierInternal;
}
if (this->workerId)
{
TryCreateInternal();
return this->pAnyBarrierInternal;
}
else
{
return this->pAnyBarrier = OnSingleCompletionIntl();
}
}
void CompletionGroup::TryCreateInternal()
{
if (!this->workerId)
{
return;
}
if (this->pAnyBarrierInternal)
{
return;
}
if (this->workerId != AuAsync::GetCurrentWorkerPId())
{
return;
}
this->pAnyBarrierInternal = OnSingleCompletionIntl();
this->SetNeverEnding(true);
if (auto pKernel = this->workerId.GetPool()->ToKernelWorkQueue(this->workerId))
{
(void)pKernel->Commit();
}
}
void CompletionGroup::TryCollectInternal()
{
if (!this->workItems.empty())
{
return;
}
if (!this->workerId)
{
return;
}
if (!this->pAnyBarrierInternal)
{
return;
}
if (this->workerId != AuAsync::GetCurrentWorkerPId())
{
return;
}
AuResetMember(this->pAnyBarrierInternal);
if (auto pKernel = this->workerId.GetPool()->ToKernelWorkQueue(this->workerId))
{
(void)pKernel->Commit();
}
}
void CompletionGroup::MakeInternalAsync(AuWorkerID pid)
{
this->SetNeverEnding(true);
this->workerId = pid;
}
AUKN_SYM AuSPtr<ICompletionGroup> NewCompletionGroup()
{
return AuMakeShared<CompletionGroup>();
}
}