diff --git a/Source/Async/AuGroupWorkQueue.cpp b/Source/Async/AuGroupWorkQueue.cpp index 1c6107a4..8c816f4e 100644 --- a/Source/Async/AuGroupWorkQueue.cpp +++ b/Source/Async/AuGroupWorkQueue.cpp @@ -66,7 +66,7 @@ namespace Aurora::Async #endif } - void GroupWorkQueue::Dequeue(AuList &queue, + void GroupWorkQueue::Dequeue(AuListOfHeap &queue, AuUInt uMaxPopCount, AuAsync::ThreadId_t id) { diff --git a/Source/Async/AuGroupWorkQueue.hpp b/Source/Async/AuGroupWorkQueue.hpp index 32bfa075..37db3c13 100644 --- a/Source/Async/AuGroupWorkQueue.hpp +++ b/Source/Async/AuGroupWorkQueue.hpp @@ -26,6 +26,6 @@ namespace Aurora::Async void AddWorkEntry(WorkEntry_t entry); - void Dequeue(AuList &queue, AuUInt uMaxPopCount, AuAsync::ThreadId_t idd); + void Dequeue(AuListOfHeap &queue, AuUInt uMaxPopCount, AuAsync::ThreadId_t idd); }; } \ No newline at end of file diff --git a/Source/Async/IThreadPoolInternal.hpp b/Source/Async/IThreadPoolInternal.hpp index 6593f4bd..f9670d0f 100644 --- a/Source/Async/IThreadPoolInternal.hpp +++ b/Source/Async/IThreadPoolInternal.hpp @@ -21,7 +21,7 @@ namespace Aurora::Async AuUInt32 uAtomicCounter {}; AuUInt32 uAtomicIOProcessors {}; - AuUInt32 uAtomicIOProcessorsWorthlessSources {}; + AuUInt32 uAtomicIOProcessorsWorthlessSources {}; // TODO: forgot about this. nevertheless, the default thread local io context doesnt use this, nor should it really because we cant really account for external is relevant conditions. AuUInt32 uAtomicShutdownCookie {}; }; } \ No newline at end of file diff --git a/Source/Async/ThreadPool.cpp b/Source/Async/ThreadPool.cpp index 706fe5a4..ab98770a 100644 --- a/Source/Async/ThreadPool.cpp +++ b/Source/Async/ThreadPool.cpp @@ -113,7 +113,7 @@ namespace Aurora::Async return true; } - if (!this->shuttingdown_ || + if (!AuAtomicLoad(&this->shuttingdown_) || bTimedOut) { return false; @@ -236,35 +236,59 @@ namespace Aurora::Async return this->runnersRunning_; } + +#define ASYNC_THREADGROUP_TLS_PREP \ + auto &weakTlsHandle = tlsCurrentThreadPool; \ + auto tlsHandle = AuTryLockMemoryType(weakTlsHandle); \ + if (!tlsHandle || tlsHandle.get() != this) \ + { \ + weakTlsHandle = AuSharedFromThis(); \ + } + +#define ASYNC_THREADGROUP_TLS_UNSET \ + if (tlsHandle && tlsHandle.get() != this) \ + { \ + tlsCurrentThreadPool = weakTlsHandle; \ + } + bool ThreadPool::Poll() { AuUInt32 uCount {}; - return InternalRunOne(GetThreadStateNoWarn(), false, false, uCount); + ASYNC_THREADGROUP_TLS_PREP; + auto bRet = InternalRunOne(GetThreadStateLocal(), false, false, uCount); + ASYNC_THREADGROUP_TLS_UNSET; + return bRet; } bool ThreadPool::RunOnce() { AuUInt32 uCount {}; - return InternalRunOne(GetThreadStateNoWarn(), true, false, uCount); + ASYNC_THREADGROUP_TLS_PREP; + auto bRet = InternalRunOne(GetThreadStateLocal(), true, false, uCount); + ASYNC_THREADGROUP_TLS_UNSET; + return bRet; } bool ThreadPool::Run() { bool ranOnce {}; - auto pJobRunner = GetThreadStateNoWarn(); + ASYNC_THREADGROUP_TLS_PREP; + + auto pJobRunner = GetThreadStateLocal(); if (!pJobRunner) { + ASYNC_THREADGROUP_TLS_UNSET; + this->shutdownEvent_->LockMS(0); return true; } - auto oldTlsHandle = AuExchange(tlsCurrentThreadPool, AuSharedFromThis()); auto auThread = AuThreads::GetThread(); while ((!auThread->Exiting()) && - ((this->shuttingdown_ & 2) != 2) && + ((AuAtomicLoad(&this->shuttingdown_) & 2) != 2) && (!pJobRunner->shutdown.bBreakMainLoop)) { AuUInt32 uCount {}; @@ -272,7 +296,7 @@ namespace Aurora::Async // Do work (blocking) if (!InternalRunOne(pJobRunner, true, true, uCount)) { - if ((this->shuttingdown_ & 2) == 2) + if ((AuAtomicLoad(&this->shuttingdown_) & 2) == 2) { return ranOnce; } @@ -280,20 +304,21 @@ namespace Aurora::Async ranOnce = true; } - tlsCurrentThreadPool = oldTlsHandle; + ASYNC_THREADGROUP_TLS_UNSET; return ranOnce; } bool ThreadPool::InternalRunOne(AuSPtr state, bool block, bool bUntilWork, AuUInt32 &uCount) { + bool bSuccess {}; + if (!state) { SysPushErrorUninitialized("Not an async thread"); return false; } - bool bSuccess {}; EarlyExitTick(); { @@ -373,32 +398,32 @@ namespace Aurora::Async break; } - // pre-wakeup thread terminating check - if (state->thread.pThread->Exiting()) - { - break; - } - // Block if no work items are present if (state->pendingWorkItems.empty()) { - if (this->shuttingdown_ & 2) + // pre-wakeup thread terminating check + if (state->thread.pThread->Exiting()) + { + break; + } + + if (AuAtomicLoad(&this->shuttingdown_) & 2) { break; } state->sync.cvVariable->WaitForSignal(); - if (this->shuttingdown_ & 2) + if (AuAtomicLoad(&this->shuttingdown_) & 2) { break; } - } - // Post-wakeup thread terminating check - if (state->thread.pThread->Exiting()) - { - break; + // Post-wakeup thread terminating check + if (state->thread.pThread->Exiting()) + { + break; + } } if (state->pendingWorkItems.empty() && ( @@ -431,8 +456,8 @@ namespace Aurora::Async { if (InRunnerMode()) { - if ((this->uAtomicCounter == 0) && - this->IsDepleted()) + if ((AuAtomicLoad(&this->uAtomicCounter) == 0) && + this->IsDepleted(state)) { Shutdown(); } @@ -449,8 +474,8 @@ namespace Aurora::Async // in the first task (or deeper) if (InRunnerMode() && state->stackState.uStackCallDepth) // are we one call deep? { - if ((this->uAtomicCounter == state->stackState.uStackCallDepth) && - this->IsDepleted()) + if ((AuAtomicLoad(&this->uAtomicCounter) == state->stackState.uStackCallDepth) && + this->IsDepleted(state)) { return false; } @@ -513,8 +538,8 @@ namespace Aurora::Async if (InRunnerMode()) { if ((runningTasks == 0) && - (this->uAtomicCounter == 0) && - this->IsDepleted()) + (AuAtomicLoad(&this->uAtomicCounter) == 0) && + this->IsDepleted(state)) { Shutdown(); } @@ -527,6 +552,8 @@ namespace Aurora::Async // This is our expected behaviour. Any changes will likely introduce hard to catch bugs across various softwares and exit conditions. void ThreadPool::Shutdown() { + AU_DEBUG_MEMCRUNCH; + auto trySelfPid = AuAsync::GetCurrentWorkerPId(); // Update shutting down flag @@ -690,7 +717,7 @@ namespace Aurora::Async continue; } - if (!pThat->IsSelfDepleted()) + if (!pThat->IsSelfDepleted(nullptr)) { continue; } @@ -714,23 +741,30 @@ namespace Aurora::Async AuUInt32 ThreadPool::PollAndCount(bool bStrict) { AuUInt32 uCount {}; + ASYNC_THREADGROUP_TLS_PREP; auto bRanAtLeastOne = this->InternalRunOne(this->GetThreadStateNoWarn(), false, false, uCount); + ASYNC_THREADGROUP_TLS_UNSET; return uCount ? uCount : (bStrict ? bRanAtLeastOne : 0); } AuUInt32 ThreadPool::RunAllPending() { AuUInt32 uCount {}; - bool ranAtLeastOne {}; + AuUInt32 uCountTotal {}; + + ASYNC_THREADGROUP_TLS_PREP; do { uCount = 0; - ranAtLeastOne |= this->InternalRunOne(this->GetThreadStateNoWarn(), false, true, uCount); + (void)this->InternalRunOne(this->GetThreadStateNoWarn(), false, true, uCount); + uCountTotal += uCount; } while (uCount); - return uCount ? uCount : false; + ASYNC_THREADGROUP_TLS_UNSET; + + return uCountTotal; } AuSPtr ThreadPool::NewWorkItem(const WorkerId_t &worker, @@ -771,6 +805,8 @@ namespace Aurora::Async AuBST> ThreadPool::GetThreads() { + AU_DEBUG_MEMCRUNCH; + AuBST> ret; for (auto pGroup : this->threadGroups_) @@ -971,15 +1007,33 @@ namespace Aurora::Async return worker->asyncLoop; } - bool ThreadPool::IsSelfDepleted() + bool ThreadPool::IsSelfDepleted(const AuSPtr &pState) { - auto queue = ToKernelWorkQueue(); - return (!queue || queue->GetSourceCount() <= 1 + this->uAtomicIOProcessorsWorthlessSources + this->uAtomicIOProcessors); + AuSPtr pLoopQueue; + + if (pState) + { + pLoopQueue = pState->asyncLoop; + } + else + { + if (auto pLocalThread = this->GetThreadStateLocal()) + { + pLoopQueue = pLocalThread->asyncLoop; + } + } + + if (!pLoopQueue) + { + return true; + } + + return pLoopQueue->GetSourceCount() <= 1 + this->uAtomicIOProcessorsWorthlessSources + this->uAtomicIOProcessors; } - bool ThreadPool::IsDepleted() + bool ThreadPool::IsDepleted(const AuSPtr &state) { - if (!IsSelfDepleted()) + if (!IsSelfDepleted(state)) { return false; } @@ -988,7 +1042,7 @@ namespace Aurora::Async { if (auto pThat = AuTryLockMemoryType(wOther)) { - if (!pThat->IsSelfDepleted()) + if (!pThat->IsSelfDepleted(nullptr)) { return false; } @@ -1178,6 +1232,12 @@ namespace Aurora::Async return {}; } + // TODO: ROXTL + if (this->pHeap) + { + pThreadState->pendingWorkItems.get_allocator().SetHeapRaw(this->pHeap.get()); + } + if (!create) { pThreadState->thread.pThread= AuThreads::ThreadShared(AuThreads::ThreadInfo( @@ -1289,7 +1349,7 @@ namespace Aurora::Async { AU_LOCK_GUARD(this->pRWReadView); - if (!this->shuttingdown_ && !job->shutdown.bDropSubmissions) + if (!AuAtomicLoad(&this->shuttingdown_) && !job->shutdown.bDropSubmissions) { // Pump and barrier + reject all after atomically Barrier(id, 0, false, true); @@ -1306,6 +1366,11 @@ namespace Aurora::Async void ThreadPool::EarlyExitTick() { + if ((AuAtomicLoad(&this->shuttingdown_) & 2) != 2) + { + return; + } + auto jobWorker = GetThreadState(); auto state = jobWorker->parent.lock(); if (!jobWorker) @@ -1314,11 +1379,6 @@ namespace Aurora::Async return; } - if ((this->shuttingdown_ & 2) != 2) - { - return; - } - state->SignalAll(); { @@ -1364,6 +1424,8 @@ namespace Aurora::Async void ThreadPool::ThisExiting() { + AU_DEBUG_MEMCRUNCH; + auto id = GetCurrentThread(); auto state = GetGroup(id.first); @@ -1477,6 +1539,26 @@ namespace Aurora::Async return state->GetThreadByIndex(worker.second); } + AuSPtr ThreadPool::GetThreadStateLocal() + { + auto worker = *tlsWorkerId; + + if (auto pSelf = AuTryLockMemoryType(worker.pool)) + { + auto state = GetGroup(worker.first); + if (!state) + { + return {}; + } + + return state->GetThreadByIndex(worker.second); + } + else + { + return {}; + } + } + AuSPtr ThreadPool::GetThreadHandle(WorkerId_t id) { auto group = GetGroup(id.first); @@ -1513,7 +1595,7 @@ namespace Aurora::Async } } - return ret; + return AuMove(ret); } AUKN_SYM AuSPtr NewThreadPool() diff --git a/Source/Async/ThreadPool.hpp b/Source/Async/ThreadPool.hpp index 4b3a3c26..cd6291a3 100644 --- a/Source/Async/ThreadPool.hpp +++ b/Source/Async/ThreadPool.hpp @@ -88,8 +88,8 @@ namespace Aurora::Async virtual AuUInt64 QueryAbortFence(AuOptional optWorkerId) override; virtual bool QueryShouldAbort(AuOptional optWorkerId, AuUInt64 uFenceMagic) override; - bool IsSelfDepleted(); - bool IsDepleted(); + bool IsSelfDepleted(const AuSPtr &state); + bool IsDepleted(const AuSPtr &state); //virtual bool ScheduleLoopSource(const AuSPtr &loopSource, WorkerId_t workerId, AuUInt32 timeout, const AuConsumer, bool> &callback) override; @@ -146,6 +146,7 @@ namespace Aurora::Async AuSPtr GetGroup(ThreadGroup_t type); AuSPtr GetThreadState(); AuSPtr GetThreadStateNoWarn(); + AuSPtr GetThreadStateLocal(); AuList> GetThreadHandles(WorkerId_t id); AuSPtr threadGroups_[255]; @@ -160,6 +161,8 @@ namespace Aurora::Async AuList> listWeakDeps_; AuList> listWeakDepsParents_; + AuMemory::AllocHeapUnique_t pHeap { AuMemory::AllocHeapUnique(512 * 1024) }; + friend struct KeepGroupAlive; }; } \ No newline at end of file diff --git a/Source/Async/ThreadState.hpp b/Source/Async/ThreadState.hpp index 8c943347..2fec721f 100644 --- a/Source/Async/ThreadState.hpp +++ b/Source/Async/ThreadState.hpp @@ -30,7 +30,7 @@ namespace Aurora::Async bool shuttingdown {}; bool exitingflag2 {}; AuThreadPrimitives::Event running; - AuList pendingWorkItems; + AuListOfHeap pendingWorkItems; bool bAlreadyDoingExitTick {}; AuThreadPrimitives::SpinLock externalFencesLock; AuList externalFences; diff --git a/Source/IO/AuIOProcessor.cpp b/Source/IO/AuIOProcessor.cpp index 8caf868d..80702046 100644 --- a/Source/IO/AuIOProcessor.cpp +++ b/Source/IO/AuIOProcessor.cpp @@ -529,6 +529,11 @@ namespace Aurora::IO void IOProcessor::WakeupThread() { this->items.cvEvent->Set(); + if (this->asyncWorker) + { + auto worker = this->asyncWorker.Value(); + worker.GetPool()->Wakeup(worker); + } } bool IOProcessor::AddEventListener(const AuSPtr &eventListener)