diff --git a/Source/Async/AuGroupWorkQueue.cpp b/Source/Async/AuGroupWorkQueue.cpp index 8c816f4e..e578fb9b 100644 --- a/Source/Async/AuGroupWorkQueue.cpp +++ b/Source/Async/AuGroupWorkQueue.cpp @@ -66,7 +66,7 @@ namespace Aurora::Async #endif } - void GroupWorkQueue::Dequeue(AuListOfHeap &queue, + bool GroupWorkQueue::Dequeue(AuListOfHeap &queue, AuUInt uMaxPopCount, AuAsync::ThreadId_t id) { @@ -82,7 +82,10 @@ namespace Aurora::Async if ((!uAnyCount) && (itr->first == Async::kThreadIdAny)) { - queue.push_back(*itr); + if (!AuTryInsert(queue, *itr)) + { + return false; + } itr = group.erase(itr); uAnyCount++; continue; @@ -91,7 +94,10 @@ namespace Aurora::Async if ((itr->first != Async::kThreadIdAny) && (itr->first == id)) { - queue.push_back(*itr); + if (!AuTryInsert(queue, *itr)) + { + return false; + } itr = group.erase(itr); continue; } @@ -104,6 +110,7 @@ namespace Aurora::Async break; } } - } + return true; + } } \ No newline at end of file diff --git a/Source/Async/AuGroupWorkQueue.hpp b/Source/Async/AuGroupWorkQueue.hpp index 37db3c13..663e11dc 100644 --- a/Source/Async/AuGroupWorkQueue.hpp +++ b/Source/Async/AuGroupWorkQueue.hpp @@ -26,6 +26,6 @@ namespace Aurora::Async void AddWorkEntry(WorkEntry_t entry); - void Dequeue(AuListOfHeap &queue, AuUInt uMaxPopCount, AuAsync::ThreadId_t idd); + bool Dequeue(AuListOfHeap &queue, AuUInt uMaxPopCount, AuAsync::ThreadId_t idd); }; } \ No newline at end of file diff --git a/Source/Async/AuThreadState.cpp b/Source/Async/AuThreadState.cpp index 14cbfe6d..ebeaee42 100644 --- a/Source/Async/AuThreadState.cpp +++ b/Source/Async/AuThreadState.cpp @@ -104,9 +104,12 @@ namespace Aurora::Async void ThreadStateSync::UpdateCVState(ThreadState *pState) { - auto uState = pState->sync.cvHasWork; + auto uState = AuAtomicLoad(&pState->sync.cvHasWork); auto uMin = AuMin(uState, pState->pendingWorkItems.size()); - if (!uMin) uMin = 1; + if (!uMin) + { + uMin = 1; + } while (uState && AuAtomicCompareExchange(&pState->sync.cvHasWork, uState - uMin, uState) != uState) diff --git a/Source/Async/AuThreadState.hpp b/Source/Async/AuThreadState.hpp index b8e5b2a1..6a39ffdb 100644 --- a/Source/Async/AuThreadState.hpp +++ b/Source/Async/AuThreadState.hpp @@ -47,11 +47,14 @@ namespace Aurora::Async struct ThreadStateStack { - AuUInt32 uStackCookie {}; - AuUInt32 uStackMaxRecursiveAsyncPromiseCalls { 4 }; - AuUInt32 uStackCallDepth {}; - AuUInt32 uStackMaxCookie { 5 }; - AuUInt8 uWorkMultipopCount { 32 }; + // Do not optimize away recursive self variables! + // Always evaluate! + // No multithread memory model awareness is required; we just need volatile. + volatile AuUInt32 uStackCookie {}; + volatile AuUInt32 uStackMaxRecursiveAsyncPromiseCalls { 4 }; + volatile AuUInt32 uStackCallDepth {}; + volatile AuUInt32 uStackMaxCookie { 5 }; + volatile AuUInt8 uWorkMultipopCount { 32 }; }; struct ThreadStateFeatureCallbacks diff --git a/Source/Async/ThreadPool.cpp b/Source/Async/ThreadPool.cpp index ab98770a..798057c9 100644 --- a/Source/Async/ThreadPool.cpp +++ b/Source/Async/ThreadPool.cpp @@ -386,9 +386,9 @@ namespace Aurora::Async do { - group->workQueue.Dequeue(state->pendingWorkItems, - state->stackState.uWorkMultipopCount, - state->thread.id.second); + bool bFailedOOM = group->workQueue.Dequeue(state->pendingWorkItems, + state->stackState.uWorkMultipopCount, + state->thread.id.second); state->sync.UpdateCVState(state.get()); @@ -398,6 +398,21 @@ namespace Aurora::Async break; } + // OOM: hardened: sleep for 0.01MS if the heap for task dequeue is full. + // Until the mixed heap object is implemented, we can only dequeue 2^16 tasks globally at a time into a reserved heap. + if (!bFailedOOM) + { + if (state->pendingWorkItems.empty()) + { + AuThreading::SleepNs(10'000); + continue; + } + else + { + break; + } + } + // Block if no work items are present if (state->pendingWorkItems.empty()) { @@ -412,7 +427,11 @@ namespace Aurora::Async break; } - state->sync.cvVariable->WaitForSignal(); + // OOM: hardened: do not sleep after OOM re-try + if (group->workQueue.IsEmpty(this, state->thread.id)) + { + state->sync.cvVariable->WaitForSignal(); + } if (AuAtomicLoad(&this->shuttingdown_) & 2) { @@ -1047,7 +1066,7 @@ namespace Aurora::Async return false; } - if (pThat->uAtomicCounter) + if (AuAtomicLoad(&pThat->uAtomicCounter)) { return false; } @@ -1095,7 +1114,7 @@ namespace Aurora::Async auto pA = this->GetThreadStateNoWarn(); - if (this->shuttingdown_ & 2) // fast + if (AuAtomicLoad(&this->shuttingdown_) & 2) // fast { if (pA->shutdown.bDropSubmissions) { diff --git a/Source/Async/ThreadPool.hpp b/Source/Async/ThreadPool.hpp index cd6291a3..dc42b445 100644 --- a/Source/Async/ThreadPool.hpp +++ b/Source/Async/ThreadPool.hpp @@ -161,6 +161,7 @@ namespace Aurora::Async AuList> listWeakDeps_; AuList> listWeakDepsParents_; + // TODO: fallback heap AuMemory::AllocHeapUnique_t pHeap { AuMemory::AllocHeapUnique(512 * 1024) }; friend struct KeepGroupAlive;