[+] C++20 QOL changes to AuFutures including AuFuture<X, Y>::BlockUntilComplete()

[*] Added additional check for failing dependencies of IWorkItems. Not required for fails to cascade, but this is nice to have here.
This commit is contained in:
Reece Wilson 2024-05-02 20:06:41 +01:00
parent 195af26aa4
commit 373ccc3660
4 changed files with 192 additions and 49 deletions

View File

@ -41,6 +41,14 @@ public:
AU_NO_COPY_NO_MOVE(AuFuture);
~AuFuture()
{
if (auto pWorkItem = AuExchange(this->pWorkItem, nullptr))
{
delete ((std::shared_ptr<Aurora::Async::IWorkItem> *)pWorkItem);
}
}
void OnComplete(CompleteCallback_f callback)
{
AU_DEBUG_MEMCRUNCH;
@ -158,6 +166,53 @@ public:
SubmitComplete();
}
Aurora::Async::IWorkItem *ToWorkItem()
{
AU_LOCK_GUARD(this->mutex);
if (auto pWorkItem = this->pWorkItem)
{
return ((std::shared_ptr<Aurora::Async::IWorkItem> *)pWorkItem)->get();
}
else
{
if (auto pNewFence = Aurora::Async::NewFence())
{
if (this->bDone)
{
pNewFence->Dispatch();
}
this->pWorkItem = new std::shared_ptr<Aurora::Async::IWorkItem>(pNewFence);
return pNewFence.get();
}
return nullptr;
}
}
#if defined(AU_LANG_CPP_20_) || defined(___GIMME_BLOCKUNTILCOMPLETE_LARGE_STACK)
void BlockUntilComplete()
{
{
AU_LOCK_GUARD(this->mutex);
if (this->bDone)
{
return;
}
}
if (auto pWorkItem = this->ToWorkItem())
{
pWorkItem->BlockUntilComplete();
}
else
{
SysPanic();
}
}
#endif
static AuSPtr<AuFuture<T, Error_t>> New()
{
AU_DEBUG_MEMCRUNCH;
@ -274,6 +329,11 @@ private:
pThat->SubmitComplete();
});
}
if (auto pWorkItem = this->pWorkItem)
{
(*(std::shared_ptr<Aurora::Async::IWorkItem> *)pWorkItem)->Dispatch();
}
}
void DoWaterFalls()
@ -328,13 +388,14 @@ private:
CompleteCallback_f callback;
ErrorCallback_f onFailure;
AuOptionalEx<AuAsync::WorkerPId_t> pid; // todo: make weak?
AuOptionalEx<AuAsync::WorkerPId_t> pid;
AuList<AuConsumer<bool, bool>> waterfall;
volatile void *pWorkItem {};
AuUInt8 bComplete : 1 {};
AuUInt8 bFailed : 1 {};
AuUInt8 bDone : 1 {};
AuUInt8 bDoneCb : 1 {};
AuAUInt8 bComplete : 1 {};
AuAUInt8 bFailed : 1 {};
AuAUInt8 bDone : 1 {};
AuAUInt8 bDoneCb : 1 {};
friend struct AuWaterfall;
};
@ -778,7 +839,7 @@ namespace __detail
}
template <typename T>
void await_suspend(T h)
bool await_suspend(T h)
{
auto pFuture = this->pFuture;
@ -789,7 +850,7 @@ namespace __detail
if (__detail::FutureAccessor::IsFinished(*pFuture.get()))
{
return;
return false;
}
if constexpr (!AuIsVoid_v<B>)
@ -806,6 +867,8 @@ namespace __detail
h.resume();
});
}
return true;
}
AuOptionalEx<A> await_resume()
@ -832,7 +895,7 @@ namespace __detail
}
template <typename T>
void await_suspend(T h)
bool await_suspend(T h)
{
auto pFuture = this->pFuture;
@ -843,7 +906,7 @@ namespace __detail
if (__detail::FutureAccessor::IsFinished(*pFuture.get()))
{
return;
return false;
}
if constexpr (!AuIsVoid_v<B>)
@ -860,6 +923,8 @@ namespace __detail
h.resume();
});
}
return true;
}
bool await_resume()
@ -867,6 +932,39 @@ namespace __detail
return !__detail::FutureAccessor::IsFailed(*pFuture.get());
}
};
struct AwaitableWorkItem
{
Aurora::Async::IWorkItem *pWorker {};
bool await_ready()
{
return this->pWorker->HasFinished() || this->pWorker->HasFailed();
}
template <typename T>
bool await_suspend(T h)
{
auto pNext = Aurora::Async::NewWorkFunction(Aurora::Async::GetCurrentWorkerPId(),
[h]()
{
h.resume();
});
if (this->await_ready())
{
return false;
}
this->pWorker->Then(pNext);
return true;
}
bool await_resume()
{
return !this->pWorker->HasFailed();
}
};
}
#if defined(__AUHAS_COROUTINES_CO_AWAIT)
@ -885,4 +983,10 @@ inline auto operator co_await (AuSharedFuture<A, B> pFuture)
return __detail::AwaitableVoid<B> { pFuture };
}
inline auto operator co_await (AuSPtr<Aurora::Async::IWorkItem> pWorkItem)
{
SysAssert(pWorkItem);
return __detail::AwaitableWorkItem { pWorkItem.get() };
}
#endif

View File

@ -66,29 +66,59 @@ namespace Aurora::Async
bool ThreadPool::WaitFor(WorkerPId_t unlocker, const AuSPtr<Threading::IWaitable> &primitive, AuUInt32 timeoutMs)
{
AuUInt64 uEndTimeNS = timeoutMs ?
AuTime::SteadyClockNS() + AuMSToNS<AuUInt64>(timeoutMs) :
0;
if (auto pCurThread = GetThreadState())
{
bool bWorkerIdMatches = (unlocker.second == pCurThread->thread.id.second) ||
((unlocker.second == Async::kThreadIdAny) &&
(GetThreadWorkersCount(unlocker.first) == 1));
bool bStat {};
if ((unlocker.first == pCurThread->thread.id.first) &&
(unlocker.GetPool().get() == this) &&
(bWorkerIdMatches))
{
bStat = !bool(unlocker);
}
bool queryAsync = false;
while (!(queryAsync ? primitive->TryLock() : Threading::WaitFor(primitive.get(), 2)))
if (!bStat)
{
// old shid (to clean up)
bool bWorkerIdMatches = (unlocker.second == pCurThread->thread.id.second) ||
((unlocker.second == Async::kThreadIdAny) &&
(GetThreadWorkersCount(unlocker.first) == 1));
bStat = (unlocker.first == pCurThread->thread.id.first) &&
(unlocker.GetPool().get() == this) &&
(bWorkerIdMatches);
}
if (bStat)
{
while (true)
{
queryAsync = CtxYield();
AuUInt32 didntAsk;
bool bTimedOut {};
if (!queryAsync && this->shuttingdown_)
if (primitive->TryLock())
{
return true;
}
this->InternalRunOne(pCurThread, true, false, didntAsk);
if (uEndTimeNS)
{
bTimedOut = AuTime::SteadyClockNS() >= uEndTimeNS;
}
if (primitive->TryLock())
{
return true;
}
if (!this->shuttingdown_ ||
bTimedOut)
{
return false;
}
}
return true;
}
}
@ -118,7 +148,7 @@ namespace Aurora::Async
}
}
bool bRet = Threading::WaitFor(primitive.get(), timeoutMs);
bool bRet = primitive->LockAbsNS(uEndTimeNS);
if (pHandle)
{
@ -209,13 +239,13 @@ namespace Aurora::Async
bool ThreadPool::Poll()
{
AuUInt32 uCount {};
return InternalRunOne(GetThreadStateNoWarn(), false, uCount);
return InternalRunOne(GetThreadStateNoWarn(), false, false, uCount);
}
bool ThreadPool::RunOnce()
{
AuUInt32 uCount {};
return InternalRunOne(GetThreadStateNoWarn(), true, uCount);
return InternalRunOne(GetThreadStateNoWarn(), true, false, uCount);
}
bool ThreadPool::Run()
@ -240,7 +270,7 @@ namespace Aurora::Async
AuUInt32 uCount {};
// Do work (blocking)
if (!InternalRunOne(pJobRunner, true, uCount))
if (!InternalRunOne(pJobRunner, true, true, uCount))
{
if ((this->shuttingdown_ & 2) == 2)
{
@ -255,7 +285,7 @@ namespace Aurora::Async
return ranOnce;
}
bool ThreadPool::InternalRunOne(AuSPtr<ThreadState> state, bool block, AuUInt32 &uCount)
bool ThreadPool::InternalRunOne(AuSPtr<ThreadState> state, bool block, bool bUntilWork, AuUInt32 &uCount)
{
if (!state)
{
@ -281,11 +311,11 @@ namespace Aurora::Async
asyncLoop->PumpNonblocking();
}
bSuccess = PollInternal(state, false, uCount);
bSuccess = PollInternal(state, false, bUntilWork, uCount);
}
else
{
bSuccess = PollInternal(state, block, uCount);
bSuccess = PollInternal(state, block, bUntilWork, uCount);
}
}
EarlyExitTick();
@ -294,29 +324,29 @@ namespace Aurora::Async
}
#if defined(__AUHAS_COROUTINES_CO_AWAIT) && defined(AU_LANG_CPP_20_)
AuVoidTask ThreadPool::PollInternal_ForceCoRoutine(AuSPtr<ThreadState> state, bool block, AuUInt32 &uCount, bool &bRet)
AuVoidTask ThreadPool::PollInternal_ForceCoRoutine(AuSPtr<ThreadState> state, bool block, bool bUntilWork, AuUInt32 &uCount, bool &bRet)
{
bRet = PollInternal_Base(state, block, uCount);
bRet = PollInternal_Base(state, block, bUntilWork, uCount);
co_return;
}
#endif
bool ThreadPool::PollInternal(AuSPtr<ThreadState> state, bool block, AuUInt32 &uCount)
bool ThreadPool::PollInternal(AuSPtr<ThreadState> state, bool block, bool bUntilWork, AuUInt32 &uCount)
{
#if defined(__AUHAS_COROUTINES_CO_AWAIT) && defined(AU_LANG_CPP_20_)
if (state->stackState.uStackCallDepth &&
gRuntimeConfig.async.bEnableCpp20RecursiveCallstack)
{
bool bRet {};
PollInternal_ForceCoRoutine(state, block, uCount, bRet);
PollInternal_ForceCoRoutine(state, block, bUntilWork, uCount, bRet);
return bRet;
}
#endif
return PollInternal_Base(state, block, uCount);
return PollInternal_Base(state, block, bUntilWork, uCount);
}
bool ThreadPool::PollInternal_Base(AuSPtr<ThreadState> state, bool block, AuUInt32 &uCount)
bool ThreadPool::PollInternal_Base(AuSPtr<ThreadState> state, bool block, bool bUntilWork, AuUInt32 &uCount)
{
if (!state)
{
@ -379,7 +409,7 @@ namespace Aurora::Async
}
}
while (state->pendingWorkItems.empty() && block);
while (state->pendingWorkItems.empty() && (block && bUntilWork));
if (!block &&
!(this->shuttingdown_ & 2)) // quick hack: is worthy of io reset by virtue of having polled externally (most likely for IO ticks, unlikely for intraprocess ticks)
@ -684,7 +714,7 @@ namespace Aurora::Async
AuUInt32 ThreadPool::PollAndCount(bool bStrict)
{
AuUInt32 uCount {};
auto bRanAtLeastOne = this->InternalRunOne(this->GetThreadStateNoWarn(), false, uCount);
auto bRanAtLeastOne = this->InternalRunOne(this->GetThreadStateNoWarn(), false, false, uCount);
return uCount ? uCount : (bStrict ? bRanAtLeastOne : 0);
}
@ -696,7 +726,7 @@ namespace Aurora::Async
do
{
uCount = 0;
ranAtLeastOne |= this->InternalRunOne(this->GetThreadStateNoWarn(), false, uCount);
ranAtLeastOne |= this->InternalRunOne(this->GetThreadStateNoWarn(), false, true, uCount);
}
while (uCount);
@ -1035,10 +1065,10 @@ namespace Aurora::Async
}
}
#if 0
return this->InternalRunOne(false, uCount);
#else
AuUInt32 uCount {};
#if 1
return this->InternalRunOne(pA, false, false, uCount);
#else
do
{
uCount = 0;
@ -1317,12 +1347,11 @@ namespace Aurora::Async
do
{
uCount = 0;
this->PollInternal(jobWorker, false, uCount);
this->PollInternal(jobWorker, false, false, uCount);
}
while (uCount);
}
AuList<AuSPtr<AuThreads::IThreadFeature>> features;
{
AU_LOCK_GUARD(jobWorker->tlsFeatures.mutex);

View File

@ -99,12 +99,12 @@ namespace Aurora::Async
bool Spawn(WorkerId_t workerId, bool create);
bool InternalRunOne(AuSPtr<ThreadState>, bool block, AuUInt32 &uCount);
bool PollInternal(AuSPtr<ThreadState>, bool block, AuUInt32 &uCount);
bool InternalRunOne(AuSPtr<ThreadState>, bool block, bool bUntilWork, AuUInt32 &uCount);
bool PollInternal(AuSPtr<ThreadState>, bool block, bool bUntilWork, AuUInt32 &uCount);
#if defined(__AUHAS_COROUTINES_CO_AWAIT) && defined(AU_LANG_CPP_20_)
AuVoidTask PollInternal_ForceCoRoutine(AuSPtr<ThreadState>, bool block, AuUInt32 &uCount, bool &bRet);
AuVoidTask PollInternal_ForceCoRoutine(AuSPtr<ThreadState>, bool block, bool bUntilWork, AuUInt32 &uCount, bool &bRet);
#endif
bool PollInternal_Base(AuSPtr<ThreadState>, bool block, AuUInt32 &uCount);
bool PollInternal_Base(AuSPtr<ThreadState>, bool block, bool bUntilWork, AuUInt32 &uCount);
size_t GetThreadWorkersCount(ThreadGroup_t group);

View File

@ -265,6 +265,12 @@ namespace Aurora::Async
{
itr = this->waitOn_.erase(itr);
}
else if (waitable->HasFailed())
{
itr = this->waitOn_.erase(itr);
this->Fail();
return;
}
else
{
return;
@ -603,7 +609,11 @@ namespace Aurora::Async
bool WorkItem::BlockUntilComplete()
{
#if 0
if (!this->worker_)
#else
if (!this->worker_ && !AuAsync::GetCurrentWorkerPId())
#endif
{
this->finishedEvent_->Wait();
return true;
@ -656,7 +666,7 @@ namespace Aurora::Async
}
} waitProxy(this->finishedEvent_.AsPointer());
return this->owner_->WaitFor(this->worker_.value(),
return this->owner_->WaitFor(this->worker_.ValueOr(WorkerPId_t {}),
AuUnsafeRaiiToShared(&waitProxy),
0 /*forever*/);
}