diff --git a/Include/Aurora/Async/AuFutures.hpp b/Include/Aurora/Async/AuFutures.hpp index 2e5a44dd..778226c3 100644 --- a/Include/Aurora/Async/AuFutures.hpp +++ b/Include/Aurora/Async/AuFutures.hpp @@ -41,6 +41,14 @@ public: AU_NO_COPY_NO_MOVE(AuFuture); + ~AuFuture() + { + if (auto pWorkItem = AuExchange(this->pWorkItem, nullptr)) + { + delete ((std::shared_ptr *)pWorkItem); + } + } + void OnComplete(CompleteCallback_f callback) { AU_DEBUG_MEMCRUNCH; @@ -158,6 +166,53 @@ public: SubmitComplete(); } + Aurora::Async::IWorkItem *ToWorkItem() + { + AU_LOCK_GUARD(this->mutex); + + if (auto pWorkItem = this->pWorkItem) + { + return ((std::shared_ptr *)pWorkItem)->get(); + } + else + { + if (auto pNewFence = Aurora::Async::NewFence()) + { + if (this->bDone) + { + pNewFence->Dispatch(); + } + + this->pWorkItem = new std::shared_ptr(pNewFence); + return pNewFence.get(); + } + + return nullptr; + } + } + +#if defined(AU_LANG_CPP_20_) || defined(___GIMME_BLOCKUNTILCOMPLETE_LARGE_STACK) + void BlockUntilComplete() + { + { + AU_LOCK_GUARD(this->mutex); + if (this->bDone) + { + return; + } + } + + if (auto pWorkItem = this->ToWorkItem()) + { + pWorkItem->BlockUntilComplete(); + } + else + { + SysPanic(); + } + } +#endif + static AuSPtr> New() { AU_DEBUG_MEMCRUNCH; @@ -274,6 +329,11 @@ private: pThat->SubmitComplete(); }); } + + if (auto pWorkItem = this->pWorkItem) + { + (*(std::shared_ptr *)pWorkItem)->Dispatch(); + } } void DoWaterFalls() @@ -328,13 +388,14 @@ private: CompleteCallback_f callback; ErrorCallback_f onFailure; - AuOptionalEx pid; // todo: make weak? + AuOptionalEx pid; AuList> waterfall; + volatile void *pWorkItem {}; - AuUInt8 bComplete : 1 {}; - AuUInt8 bFailed : 1 {}; - AuUInt8 bDone : 1 {}; - AuUInt8 bDoneCb : 1 {}; + AuAUInt8 bComplete : 1 {}; + AuAUInt8 bFailed : 1 {}; + AuAUInt8 bDone : 1 {}; + AuAUInt8 bDoneCb : 1 {}; friend struct AuWaterfall; }; @@ -778,7 +839,7 @@ namespace __detail } template - void await_suspend(T h) + bool await_suspend(T h) { auto pFuture = this->pFuture; @@ -789,7 +850,7 @@ namespace __detail if (__detail::FutureAccessor::IsFinished(*pFuture.get())) { - return; + return false; } if constexpr (!AuIsVoid_v) @@ -806,6 +867,8 @@ namespace __detail h.resume(); }); } + + return true; } AuOptionalEx await_resume() @@ -832,7 +895,7 @@ namespace __detail } template - void await_suspend(T h) + bool await_suspend(T h) { auto pFuture = this->pFuture; @@ -840,10 +903,10 @@ namespace __detail { h.resume(); }); - + if (__detail::FutureAccessor::IsFinished(*pFuture.get())) { - return; + return false; } if constexpr (!AuIsVoid_v) @@ -860,6 +923,8 @@ namespace __detail h.resume(); }); } + + return true; } bool await_resume() @@ -867,6 +932,39 @@ namespace __detail return !__detail::FutureAccessor::IsFailed(*pFuture.get()); } }; + + struct AwaitableWorkItem + { + Aurora::Async::IWorkItem *pWorker {}; + + bool await_ready() + { + return this->pWorker->HasFinished() || this->pWorker->HasFailed(); + } + + template + bool await_suspend(T h) + { + auto pNext = Aurora::Async::NewWorkFunction(Aurora::Async::GetCurrentWorkerPId(), + [h]() + { + h.resume(); + }); + + if (this->await_ready()) + { + return false; + } + + this->pWorker->Then(pNext); + return true; + } + + bool await_resume() + { + return !this->pWorker->HasFailed(); + } + }; } #if defined(__AUHAS_COROUTINES_CO_AWAIT) @@ -885,4 +983,10 @@ inline auto operator co_await (AuSharedFuture pFuture) return __detail::AwaitableVoid { pFuture }; } +inline auto operator co_await (AuSPtr pWorkItem) +{ + SysAssert(pWorkItem); + return __detail::AwaitableWorkItem { pWorkItem.get() }; +} + #endif \ No newline at end of file diff --git a/Source/Async/ThreadPool.cpp b/Source/Async/ThreadPool.cpp index 815f87ce..83b7270c 100644 --- a/Source/Async/ThreadPool.cpp +++ b/Source/Async/ThreadPool.cpp @@ -66,29 +66,59 @@ namespace Aurora::Async bool ThreadPool::WaitFor(WorkerPId_t unlocker, const AuSPtr &primitive, AuUInt32 timeoutMs) { + AuUInt64 uEndTimeNS = timeoutMs ? + AuTime::SteadyClockNS() + AuMSToNS(timeoutMs) : + 0; + if (auto pCurThread = GetThreadState()) { - bool bWorkerIdMatches = (unlocker.second == pCurThread->thread.id.second) || - ((unlocker.second == Async::kThreadIdAny) && - (GetThreadWorkersCount(unlocker.first) == 1)); - - if ((unlocker.first == pCurThread->thread.id.first) && - (unlocker.GetPool().get() == this) && - (bWorkerIdMatches)) + bool bStat {}; + { + bStat = !bool(unlocker); + } - bool queryAsync = false; - while (!(queryAsync ? primitive->TryLock() : Threading::WaitFor(primitive.get(), 2))) + if (!bStat) + { + // old shid (to clean up) + bool bWorkerIdMatches = (unlocker.second == pCurThread->thread.id.second) || + ((unlocker.second == Async::kThreadIdAny) && + (GetThreadWorkersCount(unlocker.first) == 1)); + bStat = (unlocker.first == pCurThread->thread.id.first) && + (unlocker.GetPool().get() == this) && + (bWorkerIdMatches); + } + + if (bStat) + { + while (true) { - queryAsync = CtxYield(); + AuUInt32 didntAsk; + bool bTimedOut {}; - if (!queryAsync && this->shuttingdown_) + if (primitive->TryLock()) + { + return true; + } + + this->InternalRunOne(pCurThread, true, false, didntAsk); + + if (uEndTimeNS) + { + bTimedOut = AuTime::SteadyClockNS() >= uEndTimeNS; + } + + if (primitive->TryLock()) + { + return true; + } + + if (!this->shuttingdown_ || + bTimedOut) { return false; } } - - return true; } } @@ -118,7 +148,7 @@ namespace Aurora::Async } } - bool bRet = Threading::WaitFor(primitive.get(), timeoutMs); + bool bRet = primitive->LockAbsNS(uEndTimeNS); if (pHandle) { @@ -209,13 +239,13 @@ namespace Aurora::Async bool ThreadPool::Poll() { AuUInt32 uCount {}; - return InternalRunOne(GetThreadStateNoWarn(), false, uCount); + return InternalRunOne(GetThreadStateNoWarn(), false, false, uCount); } bool ThreadPool::RunOnce() { AuUInt32 uCount {}; - return InternalRunOne(GetThreadStateNoWarn(), true, uCount); + return InternalRunOne(GetThreadStateNoWarn(), true, false, uCount); } bool ThreadPool::Run() @@ -240,7 +270,7 @@ namespace Aurora::Async AuUInt32 uCount {}; // Do work (blocking) - if (!InternalRunOne(pJobRunner, true, uCount)) + if (!InternalRunOne(pJobRunner, true, true, uCount)) { if ((this->shuttingdown_ & 2) == 2) { @@ -255,7 +285,7 @@ namespace Aurora::Async return ranOnce; } - bool ThreadPool::InternalRunOne(AuSPtr state, bool block, AuUInt32 &uCount) + bool ThreadPool::InternalRunOne(AuSPtr state, bool block, bool bUntilWork, AuUInt32 &uCount) { if (!state) { @@ -281,11 +311,11 @@ namespace Aurora::Async asyncLoop->PumpNonblocking(); } - bSuccess = PollInternal(state, false, uCount); + bSuccess = PollInternal(state, false, bUntilWork, uCount); } else { - bSuccess = PollInternal(state, block, uCount); + bSuccess = PollInternal(state, block, bUntilWork, uCount); } } EarlyExitTick(); @@ -294,29 +324,29 @@ namespace Aurora::Async } #if defined(__AUHAS_COROUTINES_CO_AWAIT) && defined(AU_LANG_CPP_20_) - AuVoidTask ThreadPool::PollInternal_ForceCoRoutine(AuSPtr state, bool block, AuUInt32 &uCount, bool &bRet) + AuVoidTask ThreadPool::PollInternal_ForceCoRoutine(AuSPtr state, bool block, bool bUntilWork, AuUInt32 &uCount, bool &bRet) { - bRet = PollInternal_Base(state, block, uCount); + bRet = PollInternal_Base(state, block, bUntilWork, uCount); co_return; } #endif - bool ThreadPool::PollInternal(AuSPtr state, bool block, AuUInt32 &uCount) + bool ThreadPool::PollInternal(AuSPtr state, bool block, bool bUntilWork, AuUInt32 &uCount) { #if defined(__AUHAS_COROUTINES_CO_AWAIT) && defined(AU_LANG_CPP_20_) if (state->stackState.uStackCallDepth && gRuntimeConfig.async.bEnableCpp20RecursiveCallstack) { bool bRet {}; - PollInternal_ForceCoRoutine(state, block, uCount, bRet); + PollInternal_ForceCoRoutine(state, block, bUntilWork, uCount, bRet); return bRet; } #endif - return PollInternal_Base(state, block, uCount); + return PollInternal_Base(state, block, bUntilWork, uCount); } - bool ThreadPool::PollInternal_Base(AuSPtr state, bool block, AuUInt32 &uCount) + bool ThreadPool::PollInternal_Base(AuSPtr state, bool block, bool bUntilWork, AuUInt32 &uCount) { if (!state) { @@ -379,7 +409,7 @@ namespace Aurora::Async } } - while (state->pendingWorkItems.empty() && block); + while (state->pendingWorkItems.empty() && (block && bUntilWork)); if (!block && !(this->shuttingdown_ & 2)) // quick hack: is worthy of io reset by virtue of having polled externally (most likely for IO ticks, unlikely for intraprocess ticks) @@ -684,7 +714,7 @@ namespace Aurora::Async AuUInt32 ThreadPool::PollAndCount(bool bStrict) { AuUInt32 uCount {}; - auto bRanAtLeastOne = this->InternalRunOne(this->GetThreadStateNoWarn(), false, uCount); + auto bRanAtLeastOne = this->InternalRunOne(this->GetThreadStateNoWarn(), false, false, uCount); return uCount ? uCount : (bStrict ? bRanAtLeastOne : 0); } @@ -696,7 +726,7 @@ namespace Aurora::Async do { uCount = 0; - ranAtLeastOne |= this->InternalRunOne(this->GetThreadStateNoWarn(), false, uCount); + ranAtLeastOne |= this->InternalRunOne(this->GetThreadStateNoWarn(), false, true, uCount); } while (uCount); @@ -1035,10 +1065,10 @@ namespace Aurora::Async } } - #if 0 - return this->InternalRunOne(false, uCount); - #else AuUInt32 uCount {}; + #if 1 + return this->InternalRunOne(pA, false, false, uCount); + #else do { uCount = 0; @@ -1317,12 +1347,11 @@ namespace Aurora::Async do { uCount = 0; - this->PollInternal(jobWorker, false, uCount); + this->PollInternal(jobWorker, false, false, uCount); } while (uCount); } - AuList> features; { AU_LOCK_GUARD(jobWorker->tlsFeatures.mutex); diff --git a/Source/Async/ThreadPool.hpp b/Source/Async/ThreadPool.hpp index 97062947..472ca406 100644 --- a/Source/Async/ThreadPool.hpp +++ b/Source/Async/ThreadPool.hpp @@ -99,12 +99,12 @@ namespace Aurora::Async bool Spawn(WorkerId_t workerId, bool create); - bool InternalRunOne(AuSPtr, bool block, AuUInt32 &uCount); - bool PollInternal(AuSPtr, bool block, AuUInt32 &uCount); + bool InternalRunOne(AuSPtr, bool block, bool bUntilWork, AuUInt32 &uCount); + bool PollInternal(AuSPtr, bool block, bool bUntilWork, AuUInt32 &uCount); #if defined(__AUHAS_COROUTINES_CO_AWAIT) && defined(AU_LANG_CPP_20_) - AuVoidTask PollInternal_ForceCoRoutine(AuSPtr, bool block, AuUInt32 &uCount, bool &bRet); + AuVoidTask PollInternal_ForceCoRoutine(AuSPtr, bool block, bool bUntilWork, AuUInt32 &uCount, bool &bRet); #endif - bool PollInternal_Base(AuSPtr, bool block, AuUInt32 &uCount); + bool PollInternal_Base(AuSPtr, bool block, bool bUntilWork, AuUInt32 &uCount); size_t GetThreadWorkersCount(ThreadGroup_t group); diff --git a/Source/Async/WorkItem.cpp b/Source/Async/WorkItem.cpp index 1be045ee..9b964cd9 100644 --- a/Source/Async/WorkItem.cpp +++ b/Source/Async/WorkItem.cpp @@ -265,6 +265,12 @@ namespace Aurora::Async { itr = this->waitOn_.erase(itr); } + else if (waitable->HasFailed()) + { + itr = this->waitOn_.erase(itr); + this->Fail(); + return; + } else { return; @@ -603,7 +609,11 @@ namespace Aurora::Async bool WorkItem::BlockUntilComplete() { + #if 0 if (!this->worker_) + #else + if (!this->worker_ && !AuAsync::GetCurrentWorkerPId()) + #endif { this->finishedEvent_->Wait(); return true; @@ -656,7 +666,7 @@ namespace Aurora::Async } } waitProxy(this->finishedEvent_.AsPointer()); - return this->owner_->WaitFor(this->worker_.value(), + return this->owner_->WaitFor(this->worker_.ValueOr(WorkerPId_t {}), AuUnsafeRaiiToShared(&waitProxy), 0 /*forever*/); }