From d3587dbf08e80acf17beef64efc3f14d8a2c6da2 Mon Sep 17 00:00:00 2001 From: Jamie Reece Wilson Date: Tue, 5 Sep 2023 13:54:05 +0100 Subject: [PATCH] [*] 560ca079 cont perf improvement --- Source/Async/GroupState.cpp | 1 + Source/Async/GroupState.hpp | 5 ++-- Source/Async/ThreadPool.cpp | 51 +++++++++++++++++++++++++++++------- Source/Async/ThreadPool.hpp | 2 +- Source/Async/ThreadState.hpp | 1 + 5 files changed, 48 insertions(+), 12 deletions(-) diff --git a/Source/Async/GroupState.cpp b/Source/Async/GroupState.cpp index 0e148c42..a262af7b 100644 --- a/Source/Async/GroupState.cpp +++ b/Source/Async/GroupState.cpp @@ -45,6 +45,7 @@ namespace Aurora::Async for (const auto &worker : this->workers) { + AuAtomicAdd(&worker.second->cvHasWork, 1u); worker.second->cvVariable->Broadcast(); worker.second->eventLs->Set(); } diff --git a/Source/Async/GroupState.hpp b/Source/Async/GroupState.hpp index feffe7d2..c19e0196 100644 --- a/Source/Async/GroupState.hpp +++ b/Source/Async/GroupState.hpp @@ -10,6 +10,7 @@ namespace Aurora::Async { + struct ThreadPool; struct GroupWorkQueue { AuThreadPrimitives::Mutex mutex; @@ -17,9 +18,9 @@ namespace Aurora::Async AuList sortedWork[AuAsync::kEWorkPrioCount]; bool IsEmpty(); - bool IsEmpty(AuWorkerId_t id); + bool IsEmpty(ThreadPool *pPool, AuWorkerId_t id); - void AddWorkEntry(WorkEntry_t entry); + void AddWorkEntry(ThreadState *pState, WorkEntry_t entry); void Dequeue(AuList &queue, int maxPopCount, AuAsync::ThreadId_t idd); }; diff --git a/Source/Async/ThreadPool.cpp b/Source/Async/ThreadPool.cpp index fe88cb4d..1aebdae2 100644 --- a/Source/Async/ThreadPool.cpp +++ b/Source/Async/ThreadPool.cpp @@ -143,7 +143,7 @@ namespace Aurora::Async AuAtomicAdd(&this->uAtomicCounter, 1u); } - state->workQueue.AddWorkEntry(AuMakePair(target.second, runnable)); + state->workQueue.AddWorkEntry(pWorker.get(), AuMakePair(target.second, runnable)); if (target.second == Async::kThreadIdAny) { @@ -341,14 +341,21 @@ namespace Aurora::Async } - void GroupWorkQueue::AddWorkEntry(WorkEntry_t entry) + void GroupWorkQueue::AddWorkEntry(ThreadState *pState, WorkEntry_t entry) { - AU_LOCK_GUARD(this->mutex); - auto prio = (int)entry.second->GetPrio(); SysAssert(prio < AuAsync::kEWorkPrioCount, "Invalid PRIO"); - + + AU_LOCK_GUARD(this->mutex); this->sortedWork[prio].push_back(entry); + + if (entry.first != kThreadIdAny) + { + if (auto pThat = pState->parent.lock()->GetThreadByIndex(entry.first)) + { + AuAtomicAdd(&pThat->cvHasWork, 1u); + } + } } bool GroupWorkQueue::IsEmpty() @@ -365,8 +372,17 @@ namespace Aurora::Async return true; } - bool GroupWorkQueue::IsEmpty(AuWorkerId_t id) + bool GroupWorkQueue::IsEmpty(ThreadPool *pPool, AuWorkerId_t id) { + #if 1 + auto pHandle = pPool->GetThreadHandle(id); + if (!pHandle) + { + return false; + } + + return !AuAtomicLoad(&pHandle->cvHasWork); + #else AU_LOCK_GUARD(this->mutex); for (AU_ITERATE_N(i, AuAsync::kEWorkPrioCount)) @@ -379,8 +395,9 @@ namespace Aurora::Async } } } - + return true; + #endif } void GroupWorkQueue::Dequeue(AuList &queue, int maxPopCount, AuAsync::ThreadId_t id) @@ -483,7 +500,23 @@ namespace Aurora::Async } while (state->pendingWorkItems.empty() && block); - if (group->workQueue.IsEmpty(state->id)) + { + auto uState = state->cvHasWork; + + while (uState && + AuAtomicCompareExchange(&state->cvHasWork, uState - 1, uState) != uState) + { + uState = state->cvHasWork; + + if (uState == 0) + { + break; + } + } + } + + if (!block && // quick hack: is worthy of io reset by virtue of having polled externally (most likely)? + group->workQueue.IsEmpty(this, state->id)) { state->eventLs->Reset(); AuAtomicStore(&state->cvLSActive, 0u); @@ -578,7 +611,7 @@ namespace Aurora::Async for (const auto &item : state->pendingWorkItems) { - group->workQueue.AddWorkEntry(item); + group->workQueue.AddWorkEntry(state.get(), item); } state->pendingWorkItems.clear(); diff --git a/Source/Async/ThreadPool.hpp b/Source/Async/ThreadPool.hpp index 2a9cb5f5..257bae6a 100644 --- a/Source/Async/ThreadPool.hpp +++ b/Source/Async/ThreadPool.hpp @@ -122,6 +122,7 @@ namespace Aurora::Async AuThreads::TLSVariable tlsWorkerId; + AuSPtr GetThreadHandle(WorkerId_t id); private: // TODO: BarrierMultiple bool Barrier(WorkerId_t, AuUInt32 ms, bool requireSignal, bool drop); @@ -137,7 +138,6 @@ namespace Aurora::Async AuSPtr GetGroup(ThreadGroup_t type); AuSPtr GetThreadState(); AuSPtr GetThreadStateNoWarn(); - AuSPtr GetThreadHandle(WorkerId_t id); AuList> GetThreadHandles(WorkerId_t id); AuSPtr threadGroups_[255]; diff --git a/Source/Async/ThreadState.hpp b/Source/Async/ThreadState.hpp index d6e2a47c..7061642f 100644 --- a/Source/Async/ThreadState.hpp +++ b/Source/Async/ThreadState.hpp @@ -70,6 +70,7 @@ namespace Aurora::Async AuThreadPrimitives::ConditionVariable cvVariable; AuAUInt32 cvSleepCount {}; AuAUInt32 cvLSActive {}; + AuAUInt32 cvHasWork {}; AuSPtr eventLs; AuSPtr asyncLoopSourceShared; };