From e037fc214a18412178bce905d081c08541556673 Mon Sep 17 00:00:00 2001 From: Jamie Reece Wilson Date: Sat, 11 Nov 2023 10:11:09 +0000 Subject: [PATCH] [*] Begin cleaning up AuAsync (2/?) 6af9940b, ??? --- Include/Aurora/Async/IThreadPool.hpp | 16 - Source/Async/Async.cpp | 2 +- Source/Async/AsyncApp.cpp | 17 +- Source/Async/AsyncApp.hpp | 3 - Source/Async/AuAThreadPoolInternal.cpp | 14 + Source/Async/AuAThreadPoolInternal.hpp | 13 + Source/Async/AuAsyncKeepGroupAlive.cpp | 47 ++ Source/Async/AuAsyncKeepGroupAlive.hpp | 13 + Source/Async/AuAsyncMicrocounter.cpp | 14 + Source/Async/AuAsyncMicrocounter.hpp | 13 + Source/Async/AuGroupWorkQueue.cpp | 112 ++++ Source/Async/AuGroupWorkQueue.hpp | 31 ++ .../Async/{Schedular.cpp => AuSchedular.cpp} | 6 +- .../Async/{Schedular.hpp => AuSchedular.hpp} | 2 +- Source/Async/AuThreadState.cpp | 93 ++++ Source/Async/AuThreadState.hpp | 62 +++ Source/Async/AuThreadStateSingletons.cpp | 14 + Source/Async/AuThreadStateSingletons.hpp | 13 + Source/Async/GroupState.cpp | 44 +- Source/Async/GroupState.hpp | 39 +- Source/Async/IAsyncRunnable.hpp | 0 Source/Async/IThreadPoolInternal.hpp | 27 + Source/Async/ThreadPool.cpp | 479 ++++-------------- Source/Async/ThreadPool.hpp | 28 +- Source/Async/ThreadState.hpp | 40 +- Source/Async/ThreadWorkerQueueShim.cpp | 7 +- Source/Async/ThreadWorkerQueueShim.hpp | 2 +- Source/Async/WorkItem.cpp | 6 +- 28 files changed, 650 insertions(+), 507 deletions(-) create mode 100644 Source/Async/AuAThreadPoolInternal.cpp create mode 100644 Source/Async/AuAThreadPoolInternal.hpp create mode 100644 Source/Async/AuAsyncKeepGroupAlive.cpp create mode 100644 Source/Async/AuAsyncKeepGroupAlive.hpp create mode 100644 Source/Async/AuAsyncMicrocounter.cpp create mode 100644 Source/Async/AuAsyncMicrocounter.hpp create mode 100644 Source/Async/AuGroupWorkQueue.cpp create mode 100644 Source/Async/AuGroupWorkQueue.hpp rename Source/Async/{Schedular.cpp => AuSchedular.cpp} (98%) rename Source/Async/{Schedular.hpp => AuSchedular.hpp} (94%) create mode 100644 Source/Async/AuThreadState.cpp create mode 100644 Source/Async/AuThreadState.hpp create mode 100644 Source/Async/AuThreadStateSingletons.cpp create mode 100644 Source/Async/AuThreadStateSingletons.hpp create mode 100644 Source/Async/IAsyncRunnable.hpp create mode 100644 Source/Async/IThreadPoolInternal.hpp diff --git a/Include/Aurora/Async/IThreadPool.hpp b/Include/Aurora/Async/IThreadPool.hpp index 05bf1e00..d2fc99c9 100644 --- a/Include/Aurora/Async/IThreadPool.hpp +++ b/Include/Aurora/Async/IThreadPool.hpp @@ -15,19 +15,6 @@ namespace Aurora::IO::Loop namespace Aurora::Async { - AUE_DEFINE(ERunMode, - ( - eLowLatencyYield, // uses high perf cond var + yield + trylock - eLowLatencyFreqKernel, // uses high perf cond var + timeout(freqency) - eEfficient // delegates sleep to the kernel once kernel objects are scheduled - )); - - struct RunMode - { - ERunMode mode; - AuUInt16 freqMsTick; - }; - struct IThreadPool { // Spawning @@ -122,9 +109,6 @@ namespace Aurora::Async // AuIO overlapped-IO glue virtual AuSPtr ToKernelWorkQueue() = 0; virtual AuSPtr ToKernelWorkQueue(WorkerId_t workerId) = 0; - virtual void UpdateWorkMode(WorkerId_t workerId, RunMode mode) = 0; - virtual ERunMode GetCurrentThreadRunMode() = 0; - virtual ERunMode GetThreadRunMode(WorkerId_t workerId) = 0; // virtual AuSPtr GetShutdownEvent() = 0; diff --git a/Source/Async/Async.cpp b/Source/Async/Async.cpp index 4faa4004..6fdd798f 100644 --- a/Source/Async/Async.cpp +++ b/Source/Async/Async.cpp @@ -7,7 +7,7 @@ ***/ #include #include "Async.hpp" -#include "Schedular.hpp" +#include "AuSchedular.hpp" #include "AsyncApp.hpp" namespace Aurora::Async diff --git a/Source/Async/AsyncApp.cpp b/Source/Async/AsyncApp.cpp index b5028b40..ab9a4f6c 100644 --- a/Source/Async/AsyncApp.cpp +++ b/Source/Async/AsyncApp.cpp @@ -9,7 +9,7 @@ #include "Async.hpp" #include "AsyncApp.hpp" #include "WorkItem.hpp" -#include "Schedular.hpp" +#include "AuSchedular.hpp" #include @@ -237,19 +237,4 @@ namespace Aurora::Async { return ThreadPool::ToKernelWorkQueue(workerId); } - - void AsyncApp::UpdateWorkMode(WorkerId_t workerId, RunMode mode) - { - ThreadPool::UpdateWorkMode(workerId, mode); - } - - ERunMode AsyncApp::GetCurrentThreadRunMode() - { - return ThreadPool::GetCurrentThreadRunMode(); - } - - ERunMode AsyncApp::GetThreadRunMode(WorkerId_t workerId) - { - return ThreadPool::GetThreadRunMode(workerId); - } } \ No newline at end of file diff --git a/Source/Async/AsyncApp.hpp b/Source/Async/AsyncApp.hpp index d45205c6..f5249160 100644 --- a/Source/Async/AsyncApp.hpp +++ b/Source/Async/AsyncApp.hpp @@ -45,9 +45,6 @@ namespace Aurora::Async AuSPtr ToKernelWorkQueue() override; AuSPtr ToKernelWorkQueue(WorkerId_t workerId) override; - void UpdateWorkMode(WorkerId_t workerId, RunMode mode) override; - ERunMode GetCurrentThreadRunMode() override; - ERunMode GetThreadRunMode(WorkerId_t workerId) override; // Main thread logic void Start() override; diff --git a/Source/Async/AuAThreadPoolInternal.cpp b/Source/Async/AuAThreadPoolInternal.cpp new file mode 100644 index 00000000..7a585025 --- /dev/null +++ b/Source/Async/AuAThreadPoolInternal.cpp @@ -0,0 +1,14 @@ +/*** + Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: AuAThreadPoolInternal.cpp + Date: 2023-11-04 + Author: Reece +***/ +#include +#include "AuAThreadPoolInternal.hpp" + +namespace Aurora::Async +{ + +} \ No newline at end of file diff --git a/Source/Async/AuAThreadPoolInternal.hpp b/Source/Async/AuAThreadPoolInternal.hpp new file mode 100644 index 00000000..5cb5c2af --- /dev/null +++ b/Source/Async/AuAThreadPoolInternal.hpp @@ -0,0 +1,13 @@ +/*** + Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: AuGroupWorkQueue.hpp + Date: 2023-11-04 + Author: Reece +***/ +#pragma once + +namespace Aurora::Async +{ + +} \ No newline at end of file diff --git a/Source/Async/AuAsyncKeepGroupAlive.cpp b/Source/Async/AuAsyncKeepGroupAlive.cpp new file mode 100644 index 00000000..b38c866f --- /dev/null +++ b/Source/Async/AuAsyncKeepGroupAlive.cpp @@ -0,0 +1,47 @@ +/*** + Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: AuAsyncKeepGroupAlive.cpp + Date: 2023-11-04 + Author: Reece +***/ +#include +#include "AuAsyncKeepGroupAlive.hpp" +#include "ThreadPool.hpp" +#include "ThreadState.hpp" +#include "GroupState.hpp" + +namespace Aurora::Async +{ + struct KeepGroupAlive + { + KeepGroupAlive(AuSPtr pPool) : + pPool(AuStaticCast(pPool)) + { + AuAtomicAdd(&this->pPool->uAtomicCounter, 1u); + } + + ~KeepGroupAlive() + { + auto uNow = AuAtomicSub(&this->pPool->uAtomicCounter, 1u); + + if (uNow == 0) + { + for (const auto &pState : this->pPool->threadGroups_) + { + if (pState) + { + pState->SignalAll(); + } + } + } + } + + AuSPtr pPool; + }; + + AUKN_SYM AuSPtr KeepThreadPoolAlive(AuSPtr pPool) + { + return AuMakeSharedThrow(pPool); + } +} \ No newline at end of file diff --git a/Source/Async/AuAsyncKeepGroupAlive.hpp b/Source/Async/AuAsyncKeepGroupAlive.hpp new file mode 100644 index 00000000..7bc4c5af --- /dev/null +++ b/Source/Async/AuAsyncKeepGroupAlive.hpp @@ -0,0 +1,13 @@ +/*** + Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: AuAsyncKeepGroupAlive.hpp + Date: 2023-11-04 + Author: Reece +***/ +#pragma once + +namespace Aurora::Async +{ + +} \ No newline at end of file diff --git a/Source/Async/AuAsyncMicrocounter.cpp b/Source/Async/AuAsyncMicrocounter.cpp new file mode 100644 index 00000000..58b26995 --- /dev/null +++ b/Source/Async/AuAsyncMicrocounter.cpp @@ -0,0 +1,14 @@ +/*** + Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: AuAsyncMicrocounter.cpp + Date: 2023-11-04 + Author: Reece +***/ +#include +#include "AuAsyncMicrocounter.hpp" + +namespace Aurora::Async +{ + +} \ No newline at end of file diff --git a/Source/Async/AuAsyncMicrocounter.hpp b/Source/Async/AuAsyncMicrocounter.hpp new file mode 100644 index 00000000..45ed5e68 --- /dev/null +++ b/Source/Async/AuAsyncMicrocounter.hpp @@ -0,0 +1,13 @@ +/*** + Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: AuAsyncMicrocounter.hpp + Date: 2023-11-04 + Author: Reece +***/ +#pragma once + +namespace Aurora::Async +{ + +} \ No newline at end of file diff --git a/Source/Async/AuGroupWorkQueue.cpp b/Source/Async/AuGroupWorkQueue.cpp new file mode 100644 index 00000000..1236b3c0 --- /dev/null +++ b/Source/Async/AuGroupWorkQueue.cpp @@ -0,0 +1,112 @@ +/*** + Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: AuGroupWorkQueue.cpp + Date: 2023-11-04 + Author: Reece +***/ +#include +#include "AuGroupWorkQueue.hpp" +#include "ThreadState.hpp" +#include "GroupState.hpp" +#include "ThreadPool.hpp" + +namespace Aurora::Async +{ + void GroupWorkQueue::AddWorkEntry(ThreadState *pState, WorkEntry_t entry) + { + AU_DEBUG_MEMCRUNCH; + + auto prio = (int)entry.second->GetPrio(); + SysAssert(prio < AuAsync::kEWorkPrioCount, "Invalid PRIO"); + + AU_LOCK_GUARD(this->mutex); + this->sortedWork[prio].push_back(entry); + + if (entry.first != kThreadIdAny) + { + if (auto pThat = pState->parent.lock()->GetThreadByIndex(entry.first)) + { + AuAtomicAdd(&pThat->sync.cvHasWork, 1u); + } + } + } + + bool GroupWorkQueue::IsEmpty() + { + AU_LOCK_GUARD(this->mutex); + for (AU_ITERATE_N(i, AuAsync::kEWorkPrioCount)) + { + if (this->sortedWork[i].size()) + { + return false; + } + } + + return true; + } + + bool GroupWorkQueue::IsEmpty(ThreadPool *pPool, AuWorkerId_t id) + { + #if 1 + auto pHandle = pPool->GetThreadHandle(id); + if (!pHandle) + { + return false; + } + + return !AuAtomicLoad(&pHandle->sync.cvHasWork); + #else + AU_LOCK_GUARD(this->mutex); + + for (AU_ITERATE_N(i, AuAsync::kEWorkPrioCount)) + { + for (const auto &[srcId, pA] : this->sortedWork[i]) + { + if (id == srcId) + { + return false; + } + } + } + + return true; + #endif + } + + void GroupWorkQueue::Dequeue(AuList &queue, int maxPopCount, AuAsync::ThreadId_t id) + { + AU_LOCK_GUARD(this->mutex); + + for (AU_ITERATE_N(i, AuAsync::kEWorkPrioCount)) + { + auto &group = this->sortedWork[(int)AuAsync::kEWorkPrioMaxLegal - i]; + + for (auto itr = group.begin(); ((itr != group.end()) && (queue.size() < maxPopCount)); ) + { + if (itr->first == Async::kThreadIdAny) + { + queue.push_back(*itr); + itr = group.erase(itr); + continue; + } + + if ((itr->first != Async::kThreadIdAny) && + (itr->first == id)) + { + queue.push_back(*itr); + itr = group.erase(itr); + continue; + } + + itr++; + } + + if (queue.size()) + { + break; + } + } + } + +} \ No newline at end of file diff --git a/Source/Async/AuGroupWorkQueue.hpp b/Source/Async/AuGroupWorkQueue.hpp new file mode 100644 index 00000000..2ad7d6ea --- /dev/null +++ b/Source/Async/AuGroupWorkQueue.hpp @@ -0,0 +1,31 @@ +/*** + Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: AuGroupWorkQueue.hpp + Date: 2023-11-04 + Author: Reece +***/ +#pragma once + +namespace Aurora::Async +{ + struct IAsyncRunnable; + struct ThreadPool; + struct ThreadState; + + using WorkEntry_t = AuPair>; + + struct GroupWorkQueue + { + AuThreadPrimitives::Mutex mutex; + AuUInt32 uItems {}; + AuList sortedWork[AuAsync::kEWorkPrioCount]; + + bool IsEmpty(); + bool IsEmpty(ThreadPool *pPool, AuWorkerId_t id); + + void AddWorkEntry(ThreadState *pState, WorkEntry_t entry); + + void Dequeue(AuList &queue, int maxPopCount, AuAsync::ThreadId_t idd); + }; +} \ No newline at end of file diff --git a/Source/Async/Schedular.cpp b/Source/Async/AuSchedular.cpp similarity index 98% rename from Source/Async/Schedular.cpp rename to Source/Async/AuSchedular.cpp index fbfd0107..3da9462e 100644 --- a/Source/Async/Schedular.cpp +++ b/Source/Async/AuSchedular.cpp @@ -1,13 +1,13 @@ /*** - Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved. + Copyright (C) 2021-2023 Jamie Reece Wilson (a/k/a "Reece"). All rights reserved. - File: Schedular.cpp + File: AuSchedular.cpp Date: 2021-6-26 Author: Reece ***/ #include #include "Async.hpp" -#include "Schedular.hpp" +#include "AuSchedular.hpp" //#include "AsyncApp.hpp" #include "ThreadPool.hpp" #include diff --git a/Source/Async/Schedular.hpp b/Source/Async/AuSchedular.hpp similarity index 94% rename from Source/Async/Schedular.hpp rename to Source/Async/AuSchedular.hpp index f7c316ea..c1f4f78c 100644 --- a/Source/Async/Schedular.hpp +++ b/Source/Async/AuSchedular.hpp @@ -1,7 +1,7 @@ /*** Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved. - File: Schedular.hpp + File: AuSchedular.hpp Date: 2021-6-26 Author: Reece ***/ diff --git a/Source/Async/AuThreadState.cpp b/Source/Async/AuThreadState.cpp new file mode 100644 index 00000000..c1985dc2 --- /dev/null +++ b/Source/Async/AuThreadState.cpp @@ -0,0 +1,93 @@ +/*** + Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: AuThreadState.hpp + Date: 2023-11-04 + Author: Reece +***/ +#include +#include "Async.hpp" +#include "AuThreadState.hpp" +#include +#include "ThreadWorkerQueueShim.hpp" + +namespace Aurora::Async +{ + ThreadStateSync::ThreadStateSync(): + cvVariable(AuUnsafeRaiiToShared(cvWorkMutex.AsPointer())) + { + } + + bool ThreadStateSync::Init() + { + this->eventLs = AuLoop::NewLSAsync(); + return true; + } + + bool ThreadStateBase::Init() + { + if (!(this->asyncLoop = AuMakeShared())) + { + SysPushErrorMemory(); + return {}; + } + this->asyncLoop->pParent = this; + + if (!this->asyncLoop->Init()) + { + SysPushErrorNested(); + return {}; + } + + if (!this->sync.Init()) + { + SysPushErrorNested(); + return {}; + } + + if (!this->asyncLoop->SourceAdd(this->sync.eventLs)) + { + SysPushErrorNested(); + return {}; + } + + return true; + } + + void ThreadStateSync::SetEvent(bool bBoth) + { + if (auto pEvent = this->eventLs) + { + if (AuAtomicTestAndSet(&this->cvLSActive, 0u) == 0) + { + pEvent->Set(); + } + } + + if (bBoth) + { + this->cvWorkMutex->Lock(); + this->cvWorkMutex->Unlock(); + + this->cvVariable->Signal(); + } + } + + void ThreadStateSync::UpdateCVState(ThreadState *pState) + { + auto uState = pState->sync.cvHasWork; + auto uMin = AuMin(uState, pState->pendingWorkItems.size()); + if (!uMin) uMin = 1; + + while (uState && + AuAtomicCompareExchange(&pState->sync.cvHasWork, uState - uMin, uState) != uState) + { + uState = pState->sync.cvHasWork; + + if (uState < uMin) + { + uMin = uState; + } + } + } +} \ No newline at end of file diff --git a/Source/Async/AuThreadState.hpp b/Source/Async/AuThreadState.hpp new file mode 100644 index 00000000..474eda22 --- /dev/null +++ b/Source/Async/AuThreadState.hpp @@ -0,0 +1,62 @@ +/*** + Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: AuThreadState.hpp + Date: 2023-11-04 + Author: Reece +***/ +#pragma once + +namespace Aurora::Async +{ + struct AsyncLoop; + struct ThreadState; + + struct ThreadStateShutdown + { + bool bBreakMainLoop {}; + bool bIsKillerThread {}; + AuUInt32 uShutdownFence { 1 }; + }; + + struct ThreadStateSync + { + ThreadStateSync(); + + AuThreadPrimitives::ConditionMutex cvWorkMutex; + AuThreadPrimitives::ConditionVariable cvVariable; + AuAUInt32 cvLSActive {}; + AuAUInt32 cvHasWork {}; + AuSPtr eventLs; + + void SetEvent(bool bBoth = true); + bool Init(); + void UpdateCVState(ThreadState *pState); + }; + + struct ThreadStateMeta + { + bool bOwnsThread {}; + AuThreads::ThreadShared_t pThread; + }; + + struct ThreadStateStack + { + AuUInt32 uStackCookie {}; + AuUInt32 uStackMaxRecursiveAsyncPromiseCalls { 4 }; + AuUInt32 uStackCallDepth {}; + AuUInt32 uStackMaxCookie { 5 }; + + }; + + struct ThreadStateBase + { + ThreadStateShutdown shutdown; + ThreadStateMeta thread; + ThreadStateSync sync; + AuSPtr asyncLoop; + ThreadStateStack stackState; + + bool Init(); + }; +} \ No newline at end of file diff --git a/Source/Async/AuThreadStateSingletons.cpp b/Source/Async/AuThreadStateSingletons.cpp new file mode 100644 index 00000000..035ac357 --- /dev/null +++ b/Source/Async/AuThreadStateSingletons.cpp @@ -0,0 +1,14 @@ +/*** + Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: AuThreadStateSingletons.cpp + Date: 2023-11-04 + Author: Reece +***/ +#include +#include "AuThreadStateSingletons.hpp" + +namespace Aurora::Async +{ + +} \ No newline at end of file diff --git a/Source/Async/AuThreadStateSingletons.hpp b/Source/Async/AuThreadStateSingletons.hpp new file mode 100644 index 00000000..ce583d18 --- /dev/null +++ b/Source/Async/AuThreadStateSingletons.hpp @@ -0,0 +1,13 @@ +/*** + Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: AuThreadStateSingletons.hpp + Date: 2023-11-04 + Author: Reece +***/ +#pragma once + +namespace Aurora::Async +{ + +} \ No newline at end of file diff --git a/Source/Async/GroupState.cpp b/Source/Async/GroupState.cpp index 35fd7f59..27c5d5a0 100644 --- a/Source/Async/GroupState.cpp +++ b/Source/Async/GroupState.cpp @@ -64,7 +64,7 @@ namespace Aurora::Async } } - void GroupState::BroadCast() + void GroupState::SignalAll() { AU_LOCK_GUARD(this->workersMutex); @@ -75,12 +75,11 @@ namespace Aurora::Async continue; } - AuAtomicAdd(&pWorker->cvHasWork, 1u); - pWorker->SetEvent(); + pWorker->sync.SetEvent(); } } - void GroupState::AddWorker(ThreadId_t id, AuSPtr pState) + bool GroupState::AddWorker(ThreadId_t id, AuSPtr pState) { AU_LOCK_GUARD(this->workersMutex); @@ -89,6 +88,41 @@ namespace Aurora::Async this->wpWorkers[id] = pState; } - SysAssert(AuTryInsert(this->workers, AuMakePair(id, pState))); + if (!AuTryInsert(this->workers, AuMakePair(id, pState))) + { + this->wpWorkers[id] = {}; + return {}; + } + + return true; + } + + AuSPtr GroupState::CreateWorker(WorkerId_t workerId, bool bCreate) + { + if (auto pThreadState = AuMakeShared()) + { + pThreadState->thread.bOwnsThread = !bCreate; + pThreadState->parent = this->SharedFromThis(); + pThreadState->id = workerId; + + if (!pThreadState->Init()) + { + SysPushErrorNested(); + return {}; + } + + if (!this->AddWorker(workerId.second, pThreadState)) + { + SysPushErrorNested(); + return {}; + } + + return pThreadState; + } + else + { + SysPushErrorMemory(); + return {}; + } } } \ No newline at end of file diff --git a/Source/Async/GroupState.hpp b/Source/Async/GroupState.hpp index 88c8682b..e8afb33a 100644 --- a/Source/Async/GroupState.hpp +++ b/Source/Async/GroupState.hpp @@ -6,47 +6,38 @@ Author: Reece ***/ #pragma once + #include "ThreadState.hpp" +#include "AuGroupWorkQueue.hpp" namespace Aurora::Async { struct ThreadPool; - struct GroupWorkQueue - { - AuThreadPrimitives::Mutex mutex; - AuUInt32 uItems {}; - AuList sortedWork[AuAsync::kEWorkPrioCount]; - - bool IsEmpty(); - bool IsEmpty(ThreadPool *pPool, AuWorkerId_t id); - - void AddWorkEntry(ThreadState *pState, WorkEntry_t entry); - - void Dequeue(AuList &queue, int maxPopCount, AuAsync::ThreadId_t idd); - }; - - struct GroupState + + + struct GroupState : + AuEnableSharedFromThis { + // group id ThreadGroup_t group; - + + // work items GroupWorkQueue workQueue; + // tracked workers AuThreadPrimitives::Mutex workersMutex; AuBST> workers; - AuWPtr wpWorkers[32]; + AuWPtr wpWorkers[32]; // linear non-locking lookup table + // bool Init(); - void BroadCast(); + void SignalAll(); void Decommit(ThreadId_t id); - void AddWorker(ThreadId_t id, AuSPtr pState); + bool AddWorker(ThreadId_t id, AuSPtr pState); + AuSPtr CreateWorker(WorkerId_t id, bool bCreate); AuSPtr GetThreadByIndex(ThreadId_t uIndex); - - bool inline IsSysThread() - { - return group == 0; - } }; } \ No newline at end of file diff --git a/Source/Async/IAsyncRunnable.hpp b/Source/Async/IAsyncRunnable.hpp new file mode 100644 index 00000000..e69de29b diff --git a/Source/Async/IThreadPoolInternal.hpp b/Source/Async/IThreadPoolInternal.hpp new file mode 100644 index 00000000..0e0c3641 --- /dev/null +++ b/Source/Async/IThreadPoolInternal.hpp @@ -0,0 +1,27 @@ +/*** + Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: IThreadPoolInternal.hpp + Date: 2023-11-04 + Author: Reece +***/ +#pragma once + +namespace Aurora::Async +{ + struct ThreadState; + struct IAsyncRunnable; + + struct IThreadPoolInternal + { + virtual bool WaitFor(WorkerId_t unlocker, const AuSPtr &primitive, AuUInt32 ms) = 0; + virtual void Run(WorkerId_t target, AuSPtr runnable) = 0; + virtual AuSPtr GetThreadHandle(WorkerId_t id) = 0; + virtual IThreadPool *ToThreadPool() = 0; + + AuUInt32 uAtomicCounter {}; + AuUInt32 uAtomicIOProcessors {}; + AuUInt32 uAtomicIOProcessorsWorthlessSources {}; + AuUInt32 uAtomicShutdownCookie {}; + }; +} \ No newline at end of file diff --git a/Source/Async/ThreadPool.cpp b/Source/Async/ThreadPool.cpp index 26fea5cb..152cc7ac 100644 --- a/Source/Async/ThreadPool.cpp +++ b/Source/Async/ThreadPool.cpp @@ -10,17 +10,14 @@ #include "ThreadPool.hpp" #include "AsyncApp.hpp" #include "WorkItem.hpp" -#include "Schedular.hpp" +#include "AuSchedular.hpp" #include "ThreadWorkerQueueShim.hpp" -#include namespace Aurora::Async { //STATIC_TLS(WorkerId_t, tlsWorkerId); - static thread_local AuWPtr gCurrentPool; - static const auto kMagicResortThreshold = 15; + static thread_local AuWPtr tlsCurrentThreadPool; - static thread_local int tlsCallStack; inline auto GetWorkerInternal(const AuSPtr &pool) { if (pool.get() == AuStaticCast(gAsyncApp)) @@ -33,17 +30,28 @@ namespace Aurora::Async AUKN_SYM WorkerPId_t GetCurrentWorkerPId() { - auto lkPool = gCurrentPool.lock(); - if (!lkPool) return {}; + auto lkPool = tlsCurrentThreadPool.lock(); + if (!lkPool) + { + return {}; + } + auto cpy = *lkPool->tlsWorkerId; - auto lkPool2 = cpy.pool.lock(); - return WorkerPId_t(lkPool, cpy); + if (auto pPool = AuTryLockMemoryType(cpy.pool)) + { + return WorkerPId_t(pPool, cpy); + } + else + { + return {}; + } } // ThreadPool::ThreadPool() : shutdownEvent_(false, false, true) { + this->pRWReadView = this->rwlock_->AsReadable(); } // internal pool interface @@ -136,8 +144,6 @@ namespace Aurora::Async return; } - AU_DEBUG_MEMCRUNCH; - if (bIncrement) { AuAtomicAdd(&this->uAtomicCounter, 1u); @@ -147,20 +153,11 @@ namespace Aurora::Async if (target.second == Async::kThreadIdAny) { - state->BroadCast(); + state->SignalAll(); } else { - if (AuAtomicLoad(&pWorker->cvSleepCount)) - { - // Barrier: - pWorker->cvWorkMutex->Lock(); - pWorker->cvWorkMutex->Unlock(); - - pWorker->cvVariable->Signal(); - } - - pWorker->SetEvent(false); + pWorker->sync.SetEvent(); } } @@ -173,7 +170,7 @@ namespace Aurora::Async size_t ThreadPool::GetThreadWorkersCount(ThreadGroup_t group) { - AU_LOCK_GUARD(this->rwlock_->AsReadable()); + AU_LOCK_GUARD(this->pRWReadView); return GetGroup(group)->workers.size(); } @@ -221,14 +218,12 @@ namespace Aurora::Async return true; } - gCurrentPool = AuWeakFromThis(); - + auto oldTlsHandle = AuExchange(tlsCurrentThreadPool, AuSharedFromThis()); auto auThread = AuThreads::GetThread(); - while ((!auThread->Exiting()) && ((this->shuttingdown_ & 2) != 2) && - (!pJobRunner->bBreakEarly)) + (!pJobRunner->shutdown.bBreakMainLoop)) { AuUInt32 uCount {}; @@ -243,6 +238,8 @@ namespace Aurora::Async ranOnce = true; } + tlsCurrentThreadPool = oldTlsHandle; + return ranOnce; } @@ -254,163 +251,34 @@ namespace Aurora::Async return false; } - bool success {}; - auto runMode = GetCurrentThreadRunMode(); - + bool bSuccess {}; EarlyExitTick(); - //do { auto asyncLoop = state->asyncLoop; - asyncLoop->OnFrame(); if (asyncLoop->GetSourceCount() > 1) { - bool bShouldTrySleepForKernel = block; - if (!block) + if (block) { - bShouldTrySleepForKernel = asyncLoop->IsSignaledPeek(); + asyncLoop->WaitAny(0); } - - if (bShouldTrySleepForKernel) + else { - AuAtomicAdd(&state->cvSleepCount, 1u); - if (block) - { - asyncLoop->WaitAny(0); - } - else - { - asyncLoop->PumpNonblocking(); - } - AuAtomicSub(&state->cvSleepCount, 1u); + asyncLoop->PumpNonblocking(); } - success = PollInternal(state, false, uCount); + bSuccess = PollInternal(state, false, uCount); } else { - success = PollInternal(state, block, uCount); + bSuccess = PollInternal(state, block, uCount); } - } //while (success); - + } EarlyExitTick(); - return success; - } - - - void GroupWorkQueue::AddWorkEntry(ThreadState *pState, WorkEntry_t entry) - { - auto prio = (int)entry.second->GetPrio(); - SysAssert(prio < AuAsync::kEWorkPrioCount, "Invalid PRIO"); - - AU_LOCK_GUARD(this->mutex); - this->sortedWork[prio].push_back(entry); - - if (entry.first != kThreadIdAny) - { - if (auto pThat = pState->parent.lock()->GetThreadByIndex(entry.first)) - { - AuAtomicAdd(&pThat->cvHasWork, 1u); - } - } - } - - bool GroupWorkQueue::IsEmpty() - { - AU_LOCK_GUARD(this->mutex); - for (AU_ITERATE_N(i, AuAsync::kEWorkPrioCount)) - { - if (this->sortedWork[i].size()) - { - return false; - } - } - - return true; - } - - bool GroupWorkQueue::IsEmpty(ThreadPool *pPool, AuWorkerId_t id) - { - #if 1 - auto pHandle = pPool->GetThreadHandle(id); - if (!pHandle) - { - return false; - } - - return !AuAtomicLoad(&pHandle->cvHasWork); - #else - AU_LOCK_GUARD(this->mutex); - - for (AU_ITERATE_N(i, AuAsync::kEWorkPrioCount)) - { - for (const auto &[srcId, pA] : this->sortedWork[i]) - { - if (id == srcId) - { - return false; - } - } - } - - return true; - #endif - } - - void GroupWorkQueue::Dequeue(AuList &queue, int maxPopCount, AuAsync::ThreadId_t id) - { - AU_LOCK_GUARD(this->mutex); - - for (AU_ITERATE_N(i, AuAsync::kEWorkPrioCount)) - { - auto &group = this->sortedWork[(int)AuAsync::kEWorkPrioMaxLegal - i]; - - for (auto itr = group.begin(); ((itr != group.end()) && (queue.size() < maxPopCount)); ) - { - if (itr->first == Async::kThreadIdAny) - { - queue.push_back(*itr); - itr = group.erase(itr); - continue; - } - - if ((itr->first != Async::kThreadIdAny) && - (itr->first == id)) - { - queue.push_back(*itr); - itr = group.erase(itr); - continue; - } - - itr++; - } - - if (queue.size()) - { - break; - } - } - } - - void ThreadPool::DoThing(ThreadState *pState) - { - auto uState = pState->cvHasWork; - auto uMin = AuMin(uState, pState->pendingWorkItems.size()); - if (!uMin) uMin = 1; - - while (uState && - AuAtomicCompareExchange(&pState->cvHasWork, uState - uMin, uState) != uState) - { - uState = pState->cvHasWork; - - if (uState < uMin) - { - uMin = uState; - } - } + return bSuccess; } bool ThreadPool::PollInternal(AuSPtr state, bool block, AuUInt32 &uCount) @@ -423,17 +291,14 @@ namespace Aurora::Async auto group = state->parent.lock(); - //state->pendingWorkItems.clear(); - { - AuAtomicAdd(&state->cvSleepCount, 1u); - AU_LOCK_GUARD(state->cvWorkMutex); + AU_LOCK_GUARD(state->sync.cvWorkMutex); do { group->workQueue.Dequeue(state->pendingWorkItems, state->multipopCount, state->id.second); - this->DoThing(state.get()); + state->sync.UpdateCVState(state.get()); // Consider blocking for more work if (!block) @@ -442,12 +307,11 @@ namespace Aurora::Async } // pre-wakeup thread terminating check - if (state->threadObject->Exiting()) + if (state->thread.pThread->Exiting()) { break; } - // Block if no work items are present if (state->pendingWorkItems.empty()) { @@ -456,7 +320,7 @@ namespace Aurora::Async break; } - state->cvVariable->WaitForSignal(); + state->sync.cvVariable->WaitForSignal(); if (this->shuttingdown_ & 2) { @@ -465,7 +329,7 @@ namespace Aurora::Async } // Post-wakeup thread terminating check - if (state->threadObject->Exiting()) + if (state->thread.pThread->Exiting()) { break; } @@ -474,7 +338,6 @@ namespace Aurora::Async (this->GetThreadState()->asyncLoop->GetSourceCount() > 1) || this->GetThreadState()->asyncLoop->CommitPending())) //(this->ToKernelWorkQueue()->IsSignaledPeek())) { - AuAtomicSub(&state->cvSleepCount, 1u); return false; } @@ -490,12 +353,10 @@ namespace Aurora::Async if (group->workQueue.IsEmpty(this, state->id)) { - state->eventLs->Reset(); // ...until we're done - AuAtomicStore(&state->cvLSActive, 0u); + state->sync.eventLs->Reset(); // ...until we're done + AuAtomicStore(&state->sync.cvLSActive, 0u); } } - - AuAtomicSub(&state->cvSleepCount, 1u); } if (state->pendingWorkItems.empty()) @@ -513,24 +374,16 @@ namespace Aurora::Async } int runningTasks {}; - - auto oldTlsHandle = AuExchange(gCurrentPool, AuSharedFromThis()); - - bool lowPrioCont {}; - bool lowPrioContCached {}; - - state->cookie++; - - int start = state->cookie; + auto uStartCookie = state->stackState.uStackCookie++; // Account for // while (AuAsync.GetCurrentPool()->runForever()); // in the first task (or deeper) - if (InRunnerMode() && tlsCallStack) // are we one call deep? + if (InRunnerMode() && state->stackState.uStackCallDepth) // are we one call deep? { auto queue = ToKernelWorkQueue(); - if ((this->uAtomicCounter == tlsCallStack) && + if ((this->uAtomicCounter == state->stackState.uStackCallDepth) && this->IsDepleted()) { return false; @@ -538,25 +391,20 @@ namespace Aurora::Async } // - for (auto itr = state->pendingWorkItems.begin(); itr != state->pendingWorkItems.end(); ) { - if (state->threadObject->Exiting()) + if (state->thread.pThread->Exiting()) { break; } - // Set the last frame time for a watchdog later down the line - state->lastFrameTime = Time::CurrentClockMS(); - - // Dispatch auto oops = itr->second; // Remove from our local job queue itr = state->pendingWorkItems.erase(itr); - tlsCallStack++; + state->stackState.uStackCallDepth++; //SysBenchmark(fmt::format("RunAsync: {}", block)); // Dispatch @@ -566,21 +414,19 @@ namespace Aurora::Async // Atomically decrement global task counter runningTasks = AuAtomicSub(&this->uAtomicCounter, 1u); - tlsCallStack--; + state->stackState.uStackCallDepth--; - if (start != state->cookie) + if (uStartCookie != state->stackState.uStackCookie) { - start = state->cookie; + uStartCookie = state->stackState.uStackCookie; itr = state->pendingWorkItems.begin(); } } - gCurrentPool = oldTlsHandle; - // Return popped work back to the groups work pool when our -pump loops were preempted if (state->pendingWorkItems.size()) { - AU_LOCK_GUARD(state->cvWorkMutex); + AU_LOCK_GUARD(state->sync.cvWorkMutex); for (const auto &item : state->pendingWorkItems) { @@ -588,8 +434,8 @@ namespace Aurora::Async } state->pendingWorkItems.clear(); - state->cvVariable->Broadcast(); - state->eventLs->Set(); + state->sync.cvVariable->Broadcast(); + state->sync.eventLs->Set(); } // Account for @@ -631,29 +477,25 @@ namespace Aurora::Async // wait for regular prio work to complete { + for (auto pGroup : this->threadGroups_) { - AU_LOCK_GUARD(this->rwlock_->AsReadable()); - - for (auto pGroup : this->threadGroups_) + if (!pGroup) { - if (!pGroup) + continue; + } + + AU_LOCK_GUARD(pGroup->workersMutex); + for (auto &[id, worker] : pGroup->workers) + { + if (trySelfPid == worker->id) { continue; } - for (auto &[id, worker] : pGroup->workers) - { - if (trySelfPid == worker->id) - { - continue; - } - - toBarrier.push_back(worker->id); - } + toBarrier.push_back(worker->id); } } - for (const auto &id : toBarrier) { if (trySelfPid == id) @@ -689,7 +531,7 @@ namespace Aurora::Async AuList threads; AuList> states; { - AU_LOCK_GUARD(this->rwlock_->AsReadable()); + AU_LOCK_GUARD(this->pRWReadView); for (auto pGroup : this->threadGroups_) { @@ -701,7 +543,7 @@ namespace Aurora::Async for (auto &[id, pState] : pGroup->workers) { // main loop: - if (pState && pState->cvWorkMutex && pState->cvVariable) + if (pState) { states.push_back(pState); pState->shuttingdown = true; @@ -712,10 +554,10 @@ namespace Aurora::Async } // thread object: - if (pState->bCreate) + if (pState->thread.bOwnsThread) { - pState->threadObject->SendExitSignal(); - threads.push_back(pState->threadObject); + pState->thread.pThread->SendExitSignal(); + threads.push_back(pState->thread.pThread); } // unrefreeze signals: @@ -728,13 +570,10 @@ namespace Aurora::Async } } + // Break all condvar loops, just in case + for (const auto &pState : states) { - for (const auto &pState : states) - { - AU_LOCK_GUARD(pState->cvWorkMutex); - pState->cvVariable->Broadcast(); - pState->eventLs->Set(); - } + pState->sync.SetEvent(); } // Final sync to exit @@ -764,11 +603,13 @@ namespace Aurora::Async // Is dead flag this->shutdownEvent_->Set(); + // if (pLocalRunner) { - pLocalRunner->bIsKiller = true; + pLocalRunner->shutdown.bIsKillerThread = true; } + // Notify observing threads of our work exhaustion for (const auto &wOther : this->listWeakDepsParents_) { if (auto pThat = AuTryLockMemoryType(wOther)) @@ -854,12 +695,12 @@ namespace Aurora::Async return {}; } - return pState->threadObject; + return pState->thread.pThread; } AuBST> ThreadPool::GetThreads() { - AU_LOCK_GUARD(rwlock_->AsReadable()); + AU_LOCK_GUARD(this->pRWReadView); AuBST> ret; @@ -892,7 +733,7 @@ namespace Aurora::Async bool ThreadPool::Sync(WorkerId_t workerId, AuUInt32 timeoutMs, bool requireSignal) { - AU_LOCK_GUARD(this->rwlock_->AsReadable()); + AU_LOCK_GUARD(this->pRWReadView); auto group = GetGroup(workerId.first); auto currentWorkerId = GetCurrentThread().second; @@ -917,7 +758,7 @@ namespace Aurora::Async void ThreadPool::Signal(WorkerId_t workerId) { - AU_LOCK_GUARD(this->rwlock_->AsReadable()); + AU_LOCK_GUARD(this->pRWReadView); auto group = GetGroup(workerId.first); if (workerId.second == Async::kThreadIdAny) @@ -935,7 +776,7 @@ namespace Aurora::Async AuSPtr ThreadPool::WorkerToLoopSource(WorkerId_t workerId) { - AU_LOCK_GUARD(this->rwlock_->AsReadable()); + AU_LOCK_GUARD(this->pRWReadView); auto a = GetThreadHandle(workerId); if (!a) @@ -943,12 +784,12 @@ namespace Aurora::Async return {}; } - return a->asyncLoopSourceShared; + return a->sync.eventLs; } void ThreadPool::SyncAllSafe() { - AU_LOCK_GUARD(this->rwlock_->AsReadable()); + AU_LOCK_GUARD(this->pRWReadView); for (auto pGroup : this->threadGroups_) { @@ -1008,46 +849,6 @@ namespace Aurora::Async return worker->asyncLoop; } - void ThreadPool::UpdateWorkMode(WorkerId_t workerId, RunMode mode) - { - auto states = this->GetThreadHandles(workerId); - if (!states.size()) - { - SysPushErrorGeneric("Couldn't find requested worker"); - return; - } - - for (const auto &state : states) - { - state->runMode = mode.mode; - if (mode.freqMsTick) - { - state->rateLimiter.SetNextStep(mode.freqMsTick * 1'000'000); - } - } - } - - ERunMode ThreadPool::GetCurrentThreadRunMode() - { - auto state = this->GetThreadState(); - if (!state) - { - return ERunMode::eEfficient; - } - return state->runMode; - } - - ERunMode ThreadPool::GetThreadRunMode(WorkerId_t workerId) - { - auto worker = this->GetThreadHandle(workerId); - if (!worker) - { - SysPushErrorGeneric("Couldn't find requested worker"); - return {}; - } - return worker->runMode; - } - bool ThreadPool::IsSelfDepleted() { auto queue = ToKernelWorkQueue(); @@ -1152,7 +953,7 @@ namespace Aurora::Async { if (auto pState = this->GetThreadHandle(workerId)) { - AuAtomicAdd(&pState->uShutdownFence, 1u); + AuAtomicAdd(&pState->shutdown.uShutdownFence, 1u); } } @@ -1160,7 +961,7 @@ namespace Aurora::Async { if (auto pState = this->GetThreadHandle(optWorkerId.value_or(GetCurrentWorkerPId()))) { - return (AuUInt64(pState->uShutdownFence) << 32ull) | AuUInt64(this->uAtomicShutdownCookie); + return (AuUInt64(pState->shutdown.uShutdownFence) << 32ull) | AuUInt64(this->uAtomicShutdownCookie); } else { @@ -1185,7 +986,7 @@ namespace Aurora::Async if (auto pState = this->GetThreadHandle(optWorkerId.value_or(GetCurrentWorkerPId()))) { - return uThreadCookie != pState->uShutdownFence; + return uThreadCookie != pState->shutdown.uShutdownFence; } else { @@ -1202,7 +1003,7 @@ namespace Aurora::Async if (create) { - gCurrentPool = AuSharedFromThis(); + tlsCurrentThreadPool = AuSharedFromThis(); } AuSPtr pGroup; @@ -1234,73 +1035,34 @@ namespace Aurora::Async } } - auto threadState = AuMakeShared(); - if (!threadState) + auto pThreadState = pGroup->CreateWorker(workerId, create); + if (!pThreadState) { - SysPushErrorMemory(); return {}; } - threadState->parent = pGroup; - threadState->id = workerId; - threadState->asyncLoop = AuMakeShared(); - if (!threadState->asyncLoop) - { - SysPushErrorMemory(); - return {}; - } - - threadState->eventLs = AuLoop::NewLSAsync(); - if (!threadState->eventLs) - { - SysPushErrorMemory(); - return {}; - } - - threadState->asyncLoopSourceShared = threadState->eventLs; - - threadState->asyncLoop->pParent = threadState.get(); - threadState->rateLimiter.SetNextStep(1'000'000); // 1MS in nanoseconds - threadState->runMode = ERunMode::eEfficient; - - if (!threadState->asyncLoop) - { - SysPushErrorMemory(); - return {}; - } - - if (!threadState->asyncLoop->Init()) - { - SysPushErrorNested(); - return {}; - } - - threadState->asyncLoop->SourceAdd(threadState->eventLs); - - threadState->bCreate = create; - if (!create) { - threadState->threadObject = AuThreads::ThreadShared(AuThreads::ThreadInfo( - AuMakeShared(AuThreads::IThreadVectorsFunctional::OnEntry_t(std::bind(&ThreadPool::Entrypoint, this, threadState->id)), + pThreadState->thread.pThread= AuThreads::ThreadShared(AuThreads::ThreadInfo( + AuMakeShared(AuThreads::IThreadVectorsFunctional::OnEntry_t(std::bind(&ThreadPool::Entrypoint, this, workerId)), AuThreads::IThreadVectorsFunctional::OnExit_t{}), gRuntimeConfig.async.threadPoolDefaultStackSize )); - if (!threadState->threadObject) + if (!pThreadState->thread.pThread) { return {}; } - threadState->threadObject->Run(); + pThreadState->thread.pThread->Run(); } else { - threadState->threadObject = AuSPtr(AuThreads::GetThread(), [](AuThreads::IAuroraThread *){}); + pThreadState->thread.pThread = AuSPtr(AuThreads::GetThread(), [](AuThreads::IAuroraThread *){}); // TODO: this is just a hack // we should implement this properly - threadState->threadObject->AddLastHopeTlsHook(AuMakeShared([]() -> void + pThreadState->thread.pThread->AddLastHopeTlsHook(AuMakeShared([]() -> void { }, []() -> void @@ -1312,13 +1074,11 @@ namespace Aurora::Async } })); - // - gCurrentPool = AuWeakFromThis(); + tlsCurrentThreadPool = AuWeakFromThis(); tlsWorkerId = WorkerPId_t(AuSharedFromThis(), workerId); } - pGroup->AddWorker(workerId.second, threadState); - + pGroup->AddWorker(workerId.second, pThreadState); return true; } @@ -1377,7 +1137,11 @@ namespace Aurora::Async void ThreadPool::Entrypoint(WorkerId_t id) { - gCurrentPool = AuWeakFromThis(); + { + AU_LOCK_GUARD(this->pRWReadView); + } + + tlsCurrentThreadPool = AuWeakFromThis(); tlsWorkerId = WorkerPId_t(AuSharedFromThis(), id); auto job = GetThreadState(); @@ -1386,7 +1150,7 @@ namespace Aurora::Async if (id != WorkerId_t {0, 0}) { - AU_LOCK_GUARD(this->rwlock_->AsReadable()); + AU_LOCK_GUARD(this->pRWReadView); if (!this->shuttingdown_ && !job->rejecting) { @@ -1418,7 +1182,7 @@ namespace Aurora::Async return; } - state->BroadCast(); + state->SignalAll(); { if (AuExchange(jobWorker->bAlreadyDoingExitTick, true)) @@ -1457,11 +1221,9 @@ namespace Aurora::Async jobWorker->isDeadEvent->Set(); - jobWorker->bAlreadyDoingExitTick = false; - jobWorker->bBreakEarly = true; + jobWorker->shutdown.bBreakMainLoop = true; } - } void ThreadPool::ThisExiting() @@ -1473,7 +1235,7 @@ namespace Aurora::Async AuList> features; { - AU_LOCK_GUARD(this->rwlock_->AsReadable()); + AU_LOCK_GUARD(this->pRWReadView); pLocalState->isDeadEvent->Set(); @@ -1530,7 +1292,7 @@ namespace Aurora::Async AuSPtr ThreadPool::GetThreadState() { - auto thread = gCurrentPool.lock(); + auto thread = tlsCurrentThreadPool.lock(); if (!thread) { return {}; @@ -1545,7 +1307,6 @@ namespace Aurora::Async #endif auto worker = *tlsWorkerId; - auto state = GetGroup(worker.first); if (!state) { @@ -1557,7 +1318,7 @@ namespace Aurora::Async AuSPtr ThreadPool::GetThreadStateNoWarn() { - auto thread = gCurrentPool.lock(); + auto thread = tlsCurrentThreadPool.lock(); if (!thread) { return {}; @@ -1592,7 +1353,7 @@ namespace Aurora::Async AuList> ThreadPool::GetThreadHandles(WorkerId_t id) { - AU_LOCK_GUARD(this->rwlock_->AsReadable()); + AU_LOCK_GUARD(this->pRWReadView); auto group = GetGroup(id.first); if (!group) @@ -1626,34 +1387,4 @@ namespace Aurora::Async return AuMakeShared(); } - struct KeepGroupAlive - { - KeepGroupAlive(AuSPtr pPool) : pPool(AuStaticCast(pPool)) - { - AuAtomicAdd(&this->pPool->uAtomicCounter, 1u); - } - - ~KeepGroupAlive() - { - auto uNow = AuAtomicSub(&this->pPool->uAtomicCounter, 1u); - - if (uNow == 0) - { - for (const auto &pState : this->pPool->threadGroups_) - { - if (pState) - { - pState->BroadCast(); - } - } - } - } - - AuSPtr pPool; - }; - - AUKN_SYM AuSPtr KeepThreadPoolAlive(AuSPtr pPool) - { - return AuMakeSharedThrow(pPool); - } } \ No newline at end of file diff --git a/Source/Async/ThreadPool.hpp b/Source/Async/ThreadPool.hpp index 373a2083..79ab33a1 100644 --- a/Source/Async/ThreadPool.hpp +++ b/Source/Async/ThreadPool.hpp @@ -7,6 +7,8 @@ ***/ #pragma once +#include "IThreadPoolInternal.hpp" + namespace Aurora::Async { struct GroupState; @@ -15,21 +17,10 @@ namespace Aurora::Async //class WorkItem; - struct IThreadPoolInternal - { - virtual bool WaitFor(WorkerId_t unlocker, const AuSPtr &primitive, AuUInt32 ms) = 0; - virtual void Run(WorkerId_t target, AuSPtr runnable) = 0; - virtual AuSPtr GetThreadHandle(WorkerId_t id) = 0; - virtual IThreadPool *ToThreadPool() = 0; - - AuUInt32 uAtomicCounter {}; - AuUInt32 uAtomicIOProcessors {}; - AuUInt32 uAtomicIOProcessorsWorthlessSources {}; - AuUInt32 uAtomicShutdownCookie {}; - }; - - - struct ThreadPool : IThreadPool, IThreadPoolInternal, AuEnableSharedFromThis + struct ThreadPool : + IThreadPool, + IThreadPoolInternal, + AuEnableSharedFromThis { ThreadPool(); @@ -81,9 +72,6 @@ namespace Aurora::Async virtual AuSPtr ToKernelWorkQueue() override; virtual AuSPtr ToKernelWorkQueue(WorkerId_t workerId) override; - virtual void UpdateWorkMode(WorkerId_t workerId, RunMode mode) override; - virtual ERunMode GetCurrentThreadRunMode() override; - virtual ERunMode GetThreadRunMode(WorkerId_t workerId) override; virtual AuSPtr GetShutdownEvent() override; virtual void AddDependency(AuSPtr pPool) override; @@ -105,7 +93,6 @@ namespace Aurora::Async bool InternalRunOne(AuSPtr, bool block, AuUInt32 &uCount); bool PollInternal(AuSPtr, bool block, AuUInt32 &uCount); - void DoThing(ThreadState *pState); size_t GetThreadWorkersCount(ThreadGroup_t group); @@ -152,7 +139,10 @@ namespace Aurora::Async AuSPtr threadGroups_[255]; AuUInt32 shuttingdown_ {}; bool shutdown {}; + AuThreadPrimitives::RWRenterableLock rwlock_; + AuThreading::IWaitable *pRWReadView {}; + AuThreadPrimitives::Event shutdownEvent_; bool runnersRunning_ {}; AuList> listWeakDeps_; diff --git a/Source/Async/ThreadState.hpp b/Source/Async/ThreadState.hpp index 1cfe776d..b349df81 100644 --- a/Source/Async/ThreadState.hpp +++ b/Source/Async/ThreadState.hpp @@ -8,11 +8,11 @@ #pragma once #include "AsyncRunnable.hpp" +#include "AuThreadState.hpp" +#include "AuGroupWorkQueue.hpp" namespace Aurora::Async { - using WorkEntry_t = AuPair>; - // TODO: this is a hack because i havent implemented an epoll abstraction yet struct AsyncAppWaitSourceRequest { @@ -26,10 +26,9 @@ namespace Aurora::Async struct GroupState; struct AsyncLoop; - struct ThreadState + struct ThreadState : ThreadStateBase { ThreadState() : running(true, false, true), - cvVariable(AuUnsafeRaiiToShared(cvWorkMutex.AsPointer())), isDeadEvent(false, false, true) { @@ -38,8 +37,6 @@ namespace Aurora::Async // :vomit: WorkerId_t id; AuUInt8 multipopCount = 32; - AuUInt32 lastFrameTime {}; - AuThreads::ThreadShared_t threadObject; AuWPtr parent; AuThreadPrimitives::Semaphore syncSema; AuThreadPrimitives::Event isDeadEvent; @@ -53,14 +50,8 @@ namespace Aurora::Async //bool running; AuList loopSources; AuList pendingWorkItems; - AuSPtr asyncLoop; - Utility::RateLimiter rateLimiter; - ERunMode runMode; int cookie {0}; bool bAlreadyDoingExitTick {}; - bool bBreakEarly {}; - bool bIsKiller {}; - bool bCreate {}; // AuThreadPrimitives::SpinLock externalFencesLock; @@ -68,31 +59,8 @@ namespace Aurora::Async // - AuThreadPrimitives::ConditionMutex cvWorkMutex; - AuThreadPrimitives::ConditionVariable cvVariable; - AuAUInt32 cvSleepCount {}; - AuAUInt32 cvLSActive {}; - AuAUInt32 cvHasWork {}; - AuSPtr eventLs; - AuSPtr asyncLoopSourceShared; - - inline void SetEvent(bool bBoth = true) - { - if (auto pEvent = this->eventLs) - { - if (AuAtomicTestAndSet(&this->cvLSActive, 0u) == 0) - { - pEvent->Set(); - } - } - - if (bBoth) - { - this->cvVariable->Signal(); - } - } + // - AuUInt32 uShutdownFence { 1 }; }; } \ No newline at end of file diff --git a/Source/Async/ThreadWorkerQueueShim.cpp b/Source/Async/ThreadWorkerQueueShim.cpp index 566195fb..2bf3780a 100644 --- a/Source/Async/ThreadWorkerQueueShim.cpp +++ b/Source/Async/ThreadWorkerQueueShim.cpp @@ -54,13 +54,10 @@ namespace Aurora::Async void AsyncLoop::Schedule() { - if (AuThreads::GetThread() != this->pParent->threadObject.get()) + if (AuThreads::GetThread() != this->pParent->thread.pThread.get()) { AuAtomicAdd(&this->commitPending_, 1u); - { - AU_LOCK_GUARD(this->pParent->parent.lock()->workQueue.mutex); - this->pParent->SetEvent(); - } + this->pParent->sync.SetEvent(); } else { diff --git a/Source/Async/ThreadWorkerQueueShim.hpp b/Source/Async/ThreadWorkerQueueShim.hpp index 7dc98323..65b22ced 100644 --- a/Source/Async/ThreadWorkerQueueShim.hpp +++ b/Source/Async/ThreadWorkerQueueShim.hpp @@ -14,7 +14,7 @@ namespace Aurora::Async { struct AsyncLoop : AuLoop::LoopQueue { - ThreadState *pParent; + ThreadStateBase *pParent; void OnFrame(); virtual bool AddCallback (const AuSPtr &source, const AuSPtr &subscriber) override; diff --git a/Source/Async/WorkItem.cpp b/Source/Async/WorkItem.cpp index 42e2b21a..7b0b2399 100644 --- a/Source/Async/WorkItem.cpp +++ b/Source/Async/WorkItem.cpp @@ -9,7 +9,7 @@ #include "Async.hpp" #include "WorkItem.hpp" #include "AsyncApp.hpp" -#include "Schedular.hpp" +#include "AuSchedular.hpp" #if defined(AURORA_COMPILER_CLANG) // warning: enumeration values 'kEnumCount' not handled in switch [-Wswitch @@ -37,7 +37,7 @@ namespace Aurora::Async if (auto pWorker = this->GetState()) { - this->optOtherCookie = pWorker->uShutdownFence; + this->optOtherCookie = pWorker->shutdown.uShutdownFence; } } @@ -288,7 +288,7 @@ namespace Aurora::Async { if (auto pWorker = this->GetState()) { - if (this->optOtherCookie.value() != pWorker->uShutdownFence) + if (this->optOtherCookie.value() != pWorker->shutdown.uShutdownFence) { this->Fail(); return false;