[*] (f466df84
: cont): OOM hardening
This commit is contained in:
parent
f466df8464
commit
830286de46
@ -66,7 +66,7 @@ namespace Aurora::Async
|
||||
#endif
|
||||
}
|
||||
|
||||
void GroupWorkQueue::Dequeue(AuListOfHeap<WorkEntry_t> &queue,
|
||||
bool GroupWorkQueue::Dequeue(AuListOfHeap<WorkEntry_t> &queue,
|
||||
AuUInt uMaxPopCount,
|
||||
AuAsync::ThreadId_t id)
|
||||
{
|
||||
@ -82,7 +82,10 @@ namespace Aurora::Async
|
||||
if ((!uAnyCount) &&
|
||||
(itr->first == Async::kThreadIdAny))
|
||||
{
|
||||
queue.push_back(*itr);
|
||||
if (!AuTryInsert(queue, *itr))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
itr = group.erase(itr);
|
||||
uAnyCount++;
|
||||
continue;
|
||||
@ -91,7 +94,10 @@ namespace Aurora::Async
|
||||
if ((itr->first != Async::kThreadIdAny) &&
|
||||
(itr->first == id))
|
||||
{
|
||||
queue.push_back(*itr);
|
||||
if (!AuTryInsert(queue, *itr))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
itr = group.erase(itr);
|
||||
continue;
|
||||
}
|
||||
@ -104,6 +110,7 @@ namespace Aurora::Async
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
@ -26,6 +26,6 @@ namespace Aurora::Async
|
||||
|
||||
void AddWorkEntry(WorkEntry_t entry);
|
||||
|
||||
void Dequeue(AuListOfHeap<WorkEntry_t> &queue, AuUInt uMaxPopCount, AuAsync::ThreadId_t idd);
|
||||
bool Dequeue(AuListOfHeap<WorkEntry_t> &queue, AuUInt uMaxPopCount, AuAsync::ThreadId_t idd);
|
||||
};
|
||||
}
|
@ -104,9 +104,12 @@ namespace Aurora::Async
|
||||
|
||||
void ThreadStateSync::UpdateCVState(ThreadState *pState)
|
||||
{
|
||||
auto uState = pState->sync.cvHasWork;
|
||||
auto uState = AuAtomicLoad(&pState->sync.cvHasWork);
|
||||
auto uMin = AuMin(uState, pState->pendingWorkItems.size());
|
||||
if (!uMin) uMin = 1;
|
||||
if (!uMin)
|
||||
{
|
||||
uMin = 1;
|
||||
}
|
||||
|
||||
while (uState &&
|
||||
AuAtomicCompareExchange<AuUInt32>(&pState->sync.cvHasWork, uState - uMin, uState) != uState)
|
||||
|
@ -47,11 +47,14 @@ namespace Aurora::Async
|
||||
|
||||
struct ThreadStateStack
|
||||
{
|
||||
AuUInt32 uStackCookie {};
|
||||
AuUInt32 uStackMaxRecursiveAsyncPromiseCalls { 4 };
|
||||
AuUInt32 uStackCallDepth {};
|
||||
AuUInt32 uStackMaxCookie { 5 };
|
||||
AuUInt8 uWorkMultipopCount { 32 };
|
||||
// Do not optimize away recursive self variables!
|
||||
// Always evaluate!
|
||||
// No multithread memory model awareness is required; we just need volatile.
|
||||
volatile AuUInt32 uStackCookie {};
|
||||
volatile AuUInt32 uStackMaxRecursiveAsyncPromiseCalls { 4 };
|
||||
volatile AuUInt32 uStackCallDepth {};
|
||||
volatile AuUInt32 uStackMaxCookie { 5 };
|
||||
volatile AuUInt8 uWorkMultipopCount { 32 };
|
||||
};
|
||||
|
||||
struct ThreadStateFeatureCallbacks
|
||||
|
@ -386,9 +386,9 @@ namespace Aurora::Async
|
||||
|
||||
do
|
||||
{
|
||||
group->workQueue.Dequeue(state->pendingWorkItems,
|
||||
state->stackState.uWorkMultipopCount,
|
||||
state->thread.id.second);
|
||||
bool bFailedOOM = group->workQueue.Dequeue(state->pendingWorkItems,
|
||||
state->stackState.uWorkMultipopCount,
|
||||
state->thread.id.second);
|
||||
|
||||
state->sync.UpdateCVState(state.get());
|
||||
|
||||
@ -398,6 +398,21 @@ namespace Aurora::Async
|
||||
break;
|
||||
}
|
||||
|
||||
// OOM: hardened: sleep for 0.01MS if the heap for task dequeue is full.
|
||||
// Until the mixed heap object is implemented, we can only dequeue 2^16 tasks globally at a time into a reserved heap.
|
||||
if (!bFailedOOM)
|
||||
{
|
||||
if (state->pendingWorkItems.empty())
|
||||
{
|
||||
AuThreading::SleepNs(10'000);
|
||||
continue;
|
||||
}
|
||||
else
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Block if no work items are present
|
||||
if (state->pendingWorkItems.empty())
|
||||
{
|
||||
@ -412,7 +427,11 @@ namespace Aurora::Async
|
||||
break;
|
||||
}
|
||||
|
||||
state->sync.cvVariable->WaitForSignal();
|
||||
// OOM: hardened: do not sleep after OOM re-try
|
||||
if (group->workQueue.IsEmpty(this, state->thread.id))
|
||||
{
|
||||
state->sync.cvVariable->WaitForSignal();
|
||||
}
|
||||
|
||||
if (AuAtomicLoad(&this->shuttingdown_) & 2)
|
||||
{
|
||||
@ -1047,7 +1066,7 @@ namespace Aurora::Async
|
||||
return false;
|
||||
}
|
||||
|
||||
if (pThat->uAtomicCounter)
|
||||
if (AuAtomicLoad(&pThat->uAtomicCounter))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
@ -1095,7 +1114,7 @@ namespace Aurora::Async
|
||||
|
||||
auto pA = this->GetThreadStateNoWarn();
|
||||
|
||||
if (this->shuttingdown_ & 2) // fast
|
||||
if (AuAtomicLoad(&this->shuttingdown_) & 2) // fast
|
||||
{
|
||||
if (pA->shutdown.bDropSubmissions)
|
||||
{
|
||||
|
@ -161,6 +161,7 @@ namespace Aurora::Async
|
||||
AuList<AuWPtr<ThreadPool>> listWeakDeps_;
|
||||
AuList<AuWPtr<ThreadPool>> listWeakDepsParents_;
|
||||
|
||||
// TODO: fallback heap
|
||||
AuMemory::AllocHeapUnique_t pHeap { AuMemory::AllocHeapUnique(512 * 1024) };
|
||||
|
||||
friend struct KeepGroupAlive;
|
||||
|
Loading…
Reference in New Issue
Block a user