AuroraRuntime/Source/IO/CompletionGroup/CompletionGroup.cpp
2024-01-11 12:19:54 +00:00

333 lines
8.4 KiB
C++

/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: CompletionGroup.cpp
Date: 2023-12-28
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include <Aurora/IO/IOExperimental.hpp>
#include "CompletionGroup.hpp"
#include "CompletionGroupAndedIOWorkItem.hpp"
namespace Aurora::IO::CompletionGroup
{
struct CompletionGroup;
CompletionGroup::CompletionGroup() :
anyProbablyAlwaysPresentEvent(this, false),
andPlsDontAllocateFdIfUntouchedEvent(this, true)
{
}
CompletionGroup::~CompletionGroup()
{
this->ResetMemoryPins();
}
bool CompletionGroup::HasCompleted()
{
return this->uTriggered == this->uAdded &&
bool(this->uAdded);
}
AuPair<AuUInt32, AuUInt32> CompletionGroup::GetStats()
{
return AuMakePair(this->uTriggered, this->uAdded);
}
void CompletionGroup::SetCallbacks(const AuSPtr<ICompletionGroupHooks> &pCallbacks)
{
AU_LOCK_GUARD(this->mutex);
this->pCallbacks = pCallbacks;
}
void CompletionGroup::ResetMemoryPins()
{
AuResetMember(this->pCallbacks);
AuResetMember(this->pAnyBarrier);
AuResetMember(this->pAndBarrier);
for (const auto &pOld : this->workItems)
{
pOld->CleanupForGCWI();
}
AuResetMember(this->callbackTicks);
this->bTerminated = true;
}
bool CompletionGroup::HasItemsActive()
{
return this->workItems.size();
}
void CompletionGroup::DoIOTick(bool bManual)
{
AuList<AuSPtr<ICompletionGroupWorkItem>> removedItems;
{
AU_DEBUG_MEMCRUNCH;
AU_LOCK_GUARD(this->mutex);
for (auto itr = this->workItems.begin();
itr != this->workItems.end();
)
{
if (itr->get()->HasCompletedForGCWI())
{
auto that = *itr;
itr = this->workItems.erase(itr);
this->uTriggered++;
that->CleanupForGCWI();
if (this->pCallbacks)
{
removedItems.push_back(that);
}
}
else
{
itr++;
}
}
}
if (auto pCallbacks = this->pCallbacks)
{
for (const auto &pRemoved : removedItems)
{
try
{
pCallbacks->OnHandleComplete(pRemoved);
}
catch (...)
{
SysPushErrorCatch();
}
}
}
{
AU_LOCK_GUARD(this->mutex);
for (const auto &[pCallback, bAny] : this->callbackTicks)
{
if (!bAny)
{
continue;
}
try
{
pCallback->InvokeManualTick();
}
catch (...)
{
SysPushErrorCatch();
}
}
}
if (this->workItems.empty() &&
(bManual || !this->bIsNeverEnding))
{
this->andPlsDontAllocateFdIfUntouchedEvent.Set();
if (AuExchange(this->pCallbacks, {}))
{
try
{
this->pCallbacks->OnComplete();
}
catch (...)
{
SysPushErrorCatch();
}
}
{
AU_LOCK_GUARD(this->mutex);
for (const auto &[pCallback, bAny] : this->callbackTicks)
{
if (bAny)
{
continue;
}
try
{
pCallback->InvokeManualTick();
}
catch (...)
{
SysPushErrorCatch();
}
}
}
// anyone else?
this->ResetMemoryPins();
}
}
void CompletionGroup::AddWorkItem(AuSPtr<ICompletionGroupWorkItem> pCompletable)
{
SysAssert(!this->bTerminated, "Completion group already terminated");
AU_LOCK_GUARD(this->mutex);
this->workItems.push_back(pCompletable);
this->uAdded++;
}
void CompletionGroup::AddCallbackTick(const AuSPtr<IIOProcessorManualInvoker> &pCallbackInvoker, bool bAny)
{
AU_LOCK_GUARD(this->mutex);
this->callbackTicks.push_back(AuMakePair(pCallbackInvoker, bAny));
}
bool CompletionGroup::IsNeverEnding()
{
return this->bIsNeverEnding;
}
void CompletionGroup::SetNeverEnding(bool bValue)
{
this->bIsNeverEnding = bValue;
}
AuSPtr<Loop::ILoopSource> CompletionGroup::ToAndLoopSource()
{
return this->andPlsDontAllocateFdIfUntouchedEvent.GetLoopSource();
}
AuSPtr<Loop::ILoopSource> CompletionGroup::ToAnyLoopSource()
{
return this->anyProbablyAlwaysPresentEvent.GetLoopSource();
}
AuSPtr<Loop::ILSEvent> CompletionGroup::GetTriggerLoopSource()
{
return this->anyProbablyAlwaysPresentEvent.GetLoopSource();
}
bool CompletionGroup::WaitForAnyMS(AuUInt32 uTimeoutOrZeroMS)
{
if (auto pLoopSource = this->ToAnyLoopSource())
{
return pLoopSource->WaitOn(uTimeoutOrZeroMS);
}
else
{
return false;
}
}
bool CompletionGroup::WaitForAllMS(AuUInt32 uTimeoutOrZeroMS)
{
if (auto pLoopSource = this->ToAndLoopSource())
{
return pLoopSource->WaitOn(uTimeoutOrZeroMS);
}
else
{
return false;
}
}
void CompletionGroup::TryTrigger()
{
if (auto pSource = GetTriggerLoopSource())
{
pSource->Set();
}
this->DoIOTick(true);
}
AuSPtr<Async::IWorkItem> CompletionGroup::OnCompletion()
{
AU_LOCK_GUARD(this->cs);
if (this->pAndBarrier)
{
return this->pAndBarrier;
}
auto pWorker = AuAsync::GetCurrentWorkerPId().GetPool();
if (!pWorker)
{
pWorker = AuUnsafeRaiiToShared(AuAsync::GetAsyncApp());
}
auto pRet = AuMakeShared<CompletionGroupAndedIOWorkItem>((AuAsync::IThreadPoolInternal *)AuStaticPointerCast<AuAsync::ThreadPool>(pWorker).get(),
AuWorkerPId_t {},
this->SharedFromThis());
if (!pRet)
{
SysPushErrorMemory();
return {};
}
// prevent attaching a singular loop source to a loop queue multiple times
if (this->pAnyBarrier)
{
auto pEvent = this->andPlsDontAllocateFdIfUntouchedEvent.GetLoopSource();
if (!pEvent)
{
return {};
}
pRet->SetSchedByLoopSource(pEvent)->Dispatch();
}
else
{
auto pEvent = this->anyProbablyAlwaysPresentEvent.GetLoopSource();
if (!pEvent)
{
return {};
}
pRet->SetSchedByLoopSource(pEvent)->Dispatch();
this->bNoAny = true;
}
this->pAndBarrier = pRet;
return pRet;
}
AuSPtr<Async::IWorkItem> CompletionGroup::OnSingleCompletion()
{
AU_LOCK_GUARD(this->cs);
if (this->pAnyBarrier)
{
return this->pAnyBarrier;
}
if (!this->bNoAny)
{
SysPushErrorGeneric("To prevent double LoopQueue::SourceAdd on the same source, you must not call ::OnSingleCompletion() after ::OnCompletion() in this specific order");
return {};
}
auto pRet = AuAsync::NewFence();
if (!pRet)
{
SysPushErrorNested();
return {};
}
pRet->SetSchedByLoopSource(this->anyProbablyAlwaysPresentEvent.GetLoopSource())->Dispatch();
this->pAnyBarrier = pRet;
return pRet;
}
AUKN_SYM AuSPtr<ICompletionGroup> NewCompletionGroup()
{
return AuMakeShared<CompletionGroup>();
}
}