[*] Minor thread pool fixes / improvements before some bigger commits are done

This commit is contained in:
Reece Wilson 2024-09-05 06:38:16 +01:00
parent aafb64e740
commit f466df8464
7 changed files with 141 additions and 51 deletions

View File

@ -66,7 +66,7 @@ namespace Aurora::Async
#endif #endif
} }
void GroupWorkQueue::Dequeue(AuList<WorkEntry_t> &queue, void GroupWorkQueue::Dequeue(AuListOfHeap<WorkEntry_t> &queue,
AuUInt uMaxPopCount, AuUInt uMaxPopCount,
AuAsync::ThreadId_t id) AuAsync::ThreadId_t id)
{ {

View File

@ -26,6 +26,6 @@ namespace Aurora::Async
void AddWorkEntry(WorkEntry_t entry); void AddWorkEntry(WorkEntry_t entry);
void Dequeue(AuList<WorkEntry_t> &queue, AuUInt uMaxPopCount, AuAsync::ThreadId_t idd); void Dequeue(AuListOfHeap<WorkEntry_t> &queue, AuUInt uMaxPopCount, AuAsync::ThreadId_t idd);
}; };
} }

View File

@ -21,7 +21,7 @@ namespace Aurora::Async
AuUInt32 uAtomicCounter {}; AuUInt32 uAtomicCounter {};
AuUInt32 uAtomicIOProcessors {}; AuUInt32 uAtomicIOProcessors {};
AuUInt32 uAtomicIOProcessorsWorthlessSources {}; AuUInt32 uAtomicIOProcessorsWorthlessSources {}; // TODO: forgot about this. nevertheless, the default thread local io context doesnt use this, nor should it really because we cant really account for external is relevant conditions.
AuUInt32 uAtomicShutdownCookie {}; AuUInt32 uAtomicShutdownCookie {};
}; };
} }

View File

@ -113,7 +113,7 @@ namespace Aurora::Async
return true; return true;
} }
if (!this->shuttingdown_ || if (!AuAtomicLoad(&this->shuttingdown_) ||
bTimedOut) bTimedOut)
{ {
return false; return false;
@ -236,35 +236,59 @@ namespace Aurora::Async
return this->runnersRunning_; return this->runnersRunning_;
} }
#define ASYNC_THREADGROUP_TLS_PREP \
auto &weakTlsHandle = tlsCurrentThreadPool; \
auto tlsHandle = AuTryLockMemoryType(weakTlsHandle); \
if (!tlsHandle || tlsHandle.get() != this) \
{ \
weakTlsHandle = AuSharedFromThis(); \
}
#define ASYNC_THREADGROUP_TLS_UNSET \
if (tlsHandle && tlsHandle.get() != this) \
{ \
tlsCurrentThreadPool = weakTlsHandle; \
}
bool ThreadPool::Poll() bool ThreadPool::Poll()
{ {
AuUInt32 uCount {}; AuUInt32 uCount {};
return InternalRunOne(GetThreadStateNoWarn(), false, false, uCount); ASYNC_THREADGROUP_TLS_PREP;
auto bRet = InternalRunOne(GetThreadStateLocal(), false, false, uCount);
ASYNC_THREADGROUP_TLS_UNSET;
return bRet;
} }
bool ThreadPool::RunOnce() bool ThreadPool::RunOnce()
{ {
AuUInt32 uCount {}; AuUInt32 uCount {};
return InternalRunOne(GetThreadStateNoWarn(), true, false, uCount); ASYNC_THREADGROUP_TLS_PREP;
auto bRet = InternalRunOne(GetThreadStateLocal(), true, false, uCount);
ASYNC_THREADGROUP_TLS_UNSET;
return bRet;
} }
bool ThreadPool::Run() bool ThreadPool::Run()
{ {
bool ranOnce {}; bool ranOnce {};
auto pJobRunner = GetThreadStateNoWarn(); ASYNC_THREADGROUP_TLS_PREP;
auto pJobRunner = GetThreadStateLocal();
if (!pJobRunner) if (!pJobRunner)
{ {
ASYNC_THREADGROUP_TLS_UNSET;
this->shutdownEvent_->LockMS(0); this->shutdownEvent_->LockMS(0);
return true; return true;
} }
auto oldTlsHandle = AuExchange(tlsCurrentThreadPool, AuSharedFromThis());
auto auThread = AuThreads::GetThread(); auto auThread = AuThreads::GetThread();
while ((!auThread->Exiting()) && while ((!auThread->Exiting()) &&
((this->shuttingdown_ & 2) != 2) && ((AuAtomicLoad(&this->shuttingdown_) & 2) != 2) &&
(!pJobRunner->shutdown.bBreakMainLoop)) (!pJobRunner->shutdown.bBreakMainLoop))
{ {
AuUInt32 uCount {}; AuUInt32 uCount {};
@ -272,7 +296,7 @@ namespace Aurora::Async
// Do work (blocking) // Do work (blocking)
if (!InternalRunOne(pJobRunner, true, true, uCount)) if (!InternalRunOne(pJobRunner, true, true, uCount))
{ {
if ((this->shuttingdown_ & 2) == 2) if ((AuAtomicLoad(&this->shuttingdown_) & 2) == 2)
{ {
return ranOnce; return ranOnce;
} }
@ -280,20 +304,21 @@ namespace Aurora::Async
ranOnce = true; ranOnce = true;
} }
tlsCurrentThreadPool = oldTlsHandle; ASYNC_THREADGROUP_TLS_UNSET;
return ranOnce; return ranOnce;
} }
bool ThreadPool::InternalRunOne(AuSPtr<ThreadState> state, bool block, bool bUntilWork, AuUInt32 &uCount) bool ThreadPool::InternalRunOne(AuSPtr<ThreadState> state, bool block, bool bUntilWork, AuUInt32 &uCount)
{ {
bool bSuccess {};
if (!state) if (!state)
{ {
SysPushErrorUninitialized("Not an async thread"); SysPushErrorUninitialized("Not an async thread");
return false; return false;
} }
bool bSuccess {};
EarlyExitTick(); EarlyExitTick();
{ {
@ -373,33 +398,33 @@ namespace Aurora::Async
break; break;
} }
// Block if no work items are present
if (state->pendingWorkItems.empty())
{
// pre-wakeup thread terminating check // pre-wakeup thread terminating check
if (state->thread.pThread->Exiting()) if (state->thread.pThread->Exiting())
{ {
break; break;
} }
// Block if no work items are present if (AuAtomicLoad(&this->shuttingdown_) & 2)
if (state->pendingWorkItems.empty())
{
if (this->shuttingdown_ & 2)
{ {
break; break;
} }
state->sync.cvVariable->WaitForSignal(); state->sync.cvVariable->WaitForSignal();
if (this->shuttingdown_ & 2) if (AuAtomicLoad(&this->shuttingdown_) & 2)
{ {
break; break;
} }
}
// Post-wakeup thread terminating check // Post-wakeup thread terminating check
if (state->thread.pThread->Exiting()) if (state->thread.pThread->Exiting())
{ {
break; break;
} }
}
if (state->pendingWorkItems.empty() && ( if (state->pendingWorkItems.empty() && (
(this->GetThreadState()->asyncLoop->GetSourceCount() > 1) || (this->GetThreadState()->asyncLoop->GetSourceCount() > 1) ||
@ -431,8 +456,8 @@ namespace Aurora::Async
{ {
if (InRunnerMode()) if (InRunnerMode())
{ {
if ((this->uAtomicCounter == 0) && if ((AuAtomicLoad(&this->uAtomicCounter) == 0) &&
this->IsDepleted()) this->IsDepleted(state))
{ {
Shutdown(); Shutdown();
} }
@ -449,8 +474,8 @@ namespace Aurora::Async
// in the first task (or deeper) // in the first task (or deeper)
if (InRunnerMode() && state->stackState.uStackCallDepth) // are we one call deep? if (InRunnerMode() && state->stackState.uStackCallDepth) // are we one call deep?
{ {
if ((this->uAtomicCounter == state->stackState.uStackCallDepth) && if ((AuAtomicLoad(&this->uAtomicCounter) == state->stackState.uStackCallDepth) &&
this->IsDepleted()) this->IsDepleted(state))
{ {
return false; return false;
} }
@ -513,8 +538,8 @@ namespace Aurora::Async
if (InRunnerMode()) if (InRunnerMode())
{ {
if ((runningTasks == 0) && if ((runningTasks == 0) &&
(this->uAtomicCounter == 0) && (AuAtomicLoad(&this->uAtomicCounter) == 0) &&
this->IsDepleted()) this->IsDepleted(state))
{ {
Shutdown(); Shutdown();
} }
@ -527,6 +552,8 @@ namespace Aurora::Async
// This is our expected behaviour. Any changes will likely introduce hard to catch bugs across various softwares and exit conditions. // This is our expected behaviour. Any changes will likely introduce hard to catch bugs across various softwares and exit conditions.
void ThreadPool::Shutdown() void ThreadPool::Shutdown()
{ {
AU_DEBUG_MEMCRUNCH;
auto trySelfPid = AuAsync::GetCurrentWorkerPId(); auto trySelfPid = AuAsync::GetCurrentWorkerPId();
// Update shutting down flag // Update shutting down flag
@ -690,7 +717,7 @@ namespace Aurora::Async
continue; continue;
} }
if (!pThat->IsSelfDepleted()) if (!pThat->IsSelfDepleted(nullptr))
{ {
continue; continue;
} }
@ -714,23 +741,30 @@ namespace Aurora::Async
AuUInt32 ThreadPool::PollAndCount(bool bStrict) AuUInt32 ThreadPool::PollAndCount(bool bStrict)
{ {
AuUInt32 uCount {}; AuUInt32 uCount {};
ASYNC_THREADGROUP_TLS_PREP;
auto bRanAtLeastOne = this->InternalRunOne(this->GetThreadStateNoWarn(), false, false, uCount); auto bRanAtLeastOne = this->InternalRunOne(this->GetThreadStateNoWarn(), false, false, uCount);
ASYNC_THREADGROUP_TLS_UNSET;
return uCount ? uCount : (bStrict ? bRanAtLeastOne : 0); return uCount ? uCount : (bStrict ? bRanAtLeastOne : 0);
} }
AuUInt32 ThreadPool::RunAllPending() AuUInt32 ThreadPool::RunAllPending()
{ {
AuUInt32 uCount {}; AuUInt32 uCount {};
bool ranAtLeastOne {}; AuUInt32 uCountTotal {};
ASYNC_THREADGROUP_TLS_PREP;
do do
{ {
uCount = 0; uCount = 0;
ranAtLeastOne |= this->InternalRunOne(this->GetThreadStateNoWarn(), false, true, uCount); (void)this->InternalRunOne(this->GetThreadStateNoWarn(), false, true, uCount);
uCountTotal += uCount;
} }
while (uCount); while (uCount);
return uCount ? uCount : false; ASYNC_THREADGROUP_TLS_UNSET;
return uCountTotal;
} }
AuSPtr<IWorkItem> ThreadPool::NewWorkItem(const WorkerId_t &worker, AuSPtr<IWorkItem> ThreadPool::NewWorkItem(const WorkerId_t &worker,
@ -771,6 +805,8 @@ namespace Aurora::Async
AuBST<ThreadGroup_t, AuList<ThreadId_t>> ThreadPool::GetThreads() AuBST<ThreadGroup_t, AuList<ThreadId_t>> ThreadPool::GetThreads()
{ {
AU_DEBUG_MEMCRUNCH;
AuBST<ThreadGroup_t, AuList<ThreadId_t>> ret; AuBST<ThreadGroup_t, AuList<ThreadId_t>> ret;
for (auto pGroup : this->threadGroups_) for (auto pGroup : this->threadGroups_)
@ -971,15 +1007,33 @@ namespace Aurora::Async
return worker->asyncLoop; return worker->asyncLoop;
} }
bool ThreadPool::IsSelfDepleted() bool ThreadPool::IsSelfDepleted(const AuSPtr<ThreadState> &pState)
{ {
auto queue = ToKernelWorkQueue(); AuSPtr<AuLoop::ILoopQueue> pLoopQueue;
return (!queue || queue->GetSourceCount() <= 1 + this->uAtomicIOProcessorsWorthlessSources + this->uAtomicIOProcessors);
if (pState)
{
pLoopQueue = pState->asyncLoop;
}
else
{
if (auto pLocalThread = this->GetThreadStateLocal())
{
pLoopQueue = pLocalThread->asyncLoop;
}
} }
bool ThreadPool::IsDepleted() if (!pLoopQueue)
{ {
if (!IsSelfDepleted()) return true;
}
return pLoopQueue->GetSourceCount() <= 1 + this->uAtomicIOProcessorsWorthlessSources + this->uAtomicIOProcessors;
}
bool ThreadPool::IsDepleted(const AuSPtr<ThreadState> &state)
{
if (!IsSelfDepleted(state))
{ {
return false; return false;
} }
@ -988,7 +1042,7 @@ namespace Aurora::Async
{ {
if (auto pThat = AuTryLockMemoryType(wOther)) if (auto pThat = AuTryLockMemoryType(wOther))
{ {
if (!pThat->IsSelfDepleted()) if (!pThat->IsSelfDepleted(nullptr))
{ {
return false; return false;
} }
@ -1178,6 +1232,12 @@ namespace Aurora::Async
return {}; return {};
} }
// TODO: ROXTL
if (this->pHeap)
{
pThreadState->pendingWorkItems.get_allocator().SetHeapRaw(this->pHeap.get());
}
if (!create) if (!create)
{ {
pThreadState->thread.pThread= AuThreads::ThreadShared(AuThreads::ThreadInfo( pThreadState->thread.pThread= AuThreads::ThreadShared(AuThreads::ThreadInfo(
@ -1289,7 +1349,7 @@ namespace Aurora::Async
{ {
AU_LOCK_GUARD(this->pRWReadView); AU_LOCK_GUARD(this->pRWReadView);
if (!this->shuttingdown_ && !job->shutdown.bDropSubmissions) if (!AuAtomicLoad(&this->shuttingdown_) && !job->shutdown.bDropSubmissions)
{ {
// Pump and barrier + reject all after atomically // Pump and barrier + reject all after atomically
Barrier(id, 0, false, true); Barrier(id, 0, false, true);
@ -1306,6 +1366,11 @@ namespace Aurora::Async
void ThreadPool::EarlyExitTick() void ThreadPool::EarlyExitTick()
{ {
if ((AuAtomicLoad(&this->shuttingdown_) & 2) != 2)
{
return;
}
auto jobWorker = GetThreadState(); auto jobWorker = GetThreadState();
auto state = jobWorker->parent.lock(); auto state = jobWorker->parent.lock();
if (!jobWorker) if (!jobWorker)
@ -1314,11 +1379,6 @@ namespace Aurora::Async
return; return;
} }
if ((this->shuttingdown_ & 2) != 2)
{
return;
}
state->SignalAll(); state->SignalAll();
{ {
@ -1364,6 +1424,8 @@ namespace Aurora::Async
void ThreadPool::ThisExiting() void ThreadPool::ThisExiting()
{ {
AU_DEBUG_MEMCRUNCH;
auto id = GetCurrentThread(); auto id = GetCurrentThread();
auto state = GetGroup(id.first); auto state = GetGroup(id.first);
@ -1477,6 +1539,26 @@ namespace Aurora::Async
return state->GetThreadByIndex(worker.second); return state->GetThreadByIndex(worker.second);
} }
AuSPtr<ThreadState> ThreadPool::GetThreadStateLocal()
{
auto worker = *tlsWorkerId;
if (auto pSelf = AuTryLockMemoryType(worker.pool))
{
auto state = GetGroup(worker.first);
if (!state)
{
return {};
}
return state->GetThreadByIndex(worker.second);
}
else
{
return {};
}
}
AuSPtr<ThreadState> ThreadPool::GetThreadHandle(WorkerId_t id) AuSPtr<ThreadState> ThreadPool::GetThreadHandle(WorkerId_t id)
{ {
auto group = GetGroup(id.first); auto group = GetGroup(id.first);
@ -1513,7 +1595,7 @@ namespace Aurora::Async
} }
} }
return ret; return AuMove(ret);
} }
AUKN_SYM AuSPtr<IThreadPool> NewThreadPool() AUKN_SYM AuSPtr<IThreadPool> NewThreadPool()

View File

@ -88,8 +88,8 @@ namespace Aurora::Async
virtual AuUInt64 QueryAbortFence(AuOptional<WorkerId_t> optWorkerId) override; virtual AuUInt64 QueryAbortFence(AuOptional<WorkerId_t> optWorkerId) override;
virtual bool QueryShouldAbort(AuOptional<WorkerId_t> optWorkerId, AuUInt64 uFenceMagic) override; virtual bool QueryShouldAbort(AuOptional<WorkerId_t> optWorkerId, AuUInt64 uFenceMagic) override;
bool IsSelfDepleted(); bool IsSelfDepleted(const AuSPtr<ThreadState> &state);
bool IsDepleted(); bool IsDepleted(const AuSPtr<ThreadState> &state);
//virtual bool ScheduleLoopSource(const AuSPtr<Loop::ILoopSource> &loopSource, WorkerId_t workerId, AuUInt32 timeout, const AuConsumer<AuSPtr<Loop::ILoopSource>, bool> &callback) override; //virtual bool ScheduleLoopSource(const AuSPtr<Loop::ILoopSource> &loopSource, WorkerId_t workerId, AuUInt32 timeout, const AuConsumer<AuSPtr<Loop::ILoopSource>, bool> &callback) override;
@ -146,6 +146,7 @@ namespace Aurora::Async
AuSPtr<GroupState> GetGroup(ThreadGroup_t type); AuSPtr<GroupState> GetGroup(ThreadGroup_t type);
AuSPtr<ThreadState> GetThreadState(); AuSPtr<ThreadState> GetThreadState();
AuSPtr<ThreadState> GetThreadStateNoWarn(); AuSPtr<ThreadState> GetThreadStateNoWarn();
AuSPtr<ThreadState> GetThreadStateLocal();
AuList<AuSPtr<ThreadState>> GetThreadHandles(WorkerId_t id); AuList<AuSPtr<ThreadState>> GetThreadHandles(WorkerId_t id);
AuSPtr<GroupState> threadGroups_[255]; AuSPtr<GroupState> threadGroups_[255];
@ -160,6 +161,8 @@ namespace Aurora::Async
AuList<AuWPtr<ThreadPool>> listWeakDeps_; AuList<AuWPtr<ThreadPool>> listWeakDeps_;
AuList<AuWPtr<ThreadPool>> listWeakDepsParents_; AuList<AuWPtr<ThreadPool>> listWeakDepsParents_;
AuMemory::AllocHeapUnique_t pHeap { AuMemory::AllocHeapUnique(512 * 1024) };
friend struct KeepGroupAlive; friend struct KeepGroupAlive;
}; };
} }

View File

@ -30,7 +30,7 @@ namespace Aurora::Async
bool shuttingdown {}; bool shuttingdown {};
bool exitingflag2 {}; bool exitingflag2 {};
AuThreadPrimitives::Event running; AuThreadPrimitives::Event running;
AuList<WorkEntry_t> pendingWorkItems; AuListOfHeap<WorkEntry_t> pendingWorkItems;
bool bAlreadyDoingExitTick {}; bool bAlreadyDoingExitTick {};
AuThreadPrimitives::SpinLock externalFencesLock; AuThreadPrimitives::SpinLock externalFencesLock;
AuList<AuThreading::IWaitable *> externalFences; AuList<AuThreading::IWaitable *> externalFences;

View File

@ -529,6 +529,11 @@ namespace Aurora::IO
void IOProcessor::WakeupThread() void IOProcessor::WakeupThread()
{ {
this->items.cvEvent->Set(); this->items.cvEvent->Set();
if (this->asyncWorker)
{
auto worker = this->asyncWorker.Value();
worker.GetPool()->Wakeup(worker);
}
} }
bool IOProcessor::AddEventListener(const AuSPtr<IIOProcessorEventListener> &eventListener) bool IOProcessor::AddEventListener(const AuSPtr<IIOProcessorEventListener> &eventListener)