[*] Minor thread pool fixes / improvements before some bigger commits are done

This commit is contained in:
Reece Wilson 2024-09-05 06:38:16 +01:00
parent aafb64e740
commit f466df8464
7 changed files with 141 additions and 51 deletions

View File

@ -66,7 +66,7 @@ namespace Aurora::Async
#endif
}
void GroupWorkQueue::Dequeue(AuList<WorkEntry_t> &queue,
void GroupWorkQueue::Dequeue(AuListOfHeap<WorkEntry_t> &queue,
AuUInt uMaxPopCount,
AuAsync::ThreadId_t id)
{

View File

@ -26,6 +26,6 @@ namespace Aurora::Async
void AddWorkEntry(WorkEntry_t entry);
void Dequeue(AuList<WorkEntry_t> &queue, AuUInt uMaxPopCount, AuAsync::ThreadId_t idd);
void Dequeue(AuListOfHeap<WorkEntry_t> &queue, AuUInt uMaxPopCount, AuAsync::ThreadId_t idd);
};
}

View File

@ -21,7 +21,7 @@ namespace Aurora::Async
AuUInt32 uAtomicCounter {};
AuUInt32 uAtomicIOProcessors {};
AuUInt32 uAtomicIOProcessorsWorthlessSources {};
AuUInt32 uAtomicIOProcessorsWorthlessSources {}; // TODO: forgot about this. nevertheless, the default thread local io context doesnt use this, nor should it really because we cant really account for external is relevant conditions.
AuUInt32 uAtomicShutdownCookie {};
};
}

View File

@ -113,7 +113,7 @@ namespace Aurora::Async
return true;
}
if (!this->shuttingdown_ ||
if (!AuAtomicLoad(&this->shuttingdown_) ||
bTimedOut)
{
return false;
@ -236,35 +236,59 @@ namespace Aurora::Async
return this->runnersRunning_;
}
#define ASYNC_THREADGROUP_TLS_PREP \
auto &weakTlsHandle = tlsCurrentThreadPool; \
auto tlsHandle = AuTryLockMemoryType(weakTlsHandle); \
if (!tlsHandle || tlsHandle.get() != this) \
{ \
weakTlsHandle = AuSharedFromThis(); \
}
#define ASYNC_THREADGROUP_TLS_UNSET \
if (tlsHandle && tlsHandle.get() != this) \
{ \
tlsCurrentThreadPool = weakTlsHandle; \
}
bool ThreadPool::Poll()
{
AuUInt32 uCount {};
return InternalRunOne(GetThreadStateNoWarn(), false, false, uCount);
ASYNC_THREADGROUP_TLS_PREP;
auto bRet = InternalRunOne(GetThreadStateLocal(), false, false, uCount);
ASYNC_THREADGROUP_TLS_UNSET;
return bRet;
}
bool ThreadPool::RunOnce()
{
AuUInt32 uCount {};
return InternalRunOne(GetThreadStateNoWarn(), true, false, uCount);
ASYNC_THREADGROUP_TLS_PREP;
auto bRet = InternalRunOne(GetThreadStateLocal(), true, false, uCount);
ASYNC_THREADGROUP_TLS_UNSET;
return bRet;
}
bool ThreadPool::Run()
{
bool ranOnce {};
auto pJobRunner = GetThreadStateNoWarn();
ASYNC_THREADGROUP_TLS_PREP;
auto pJobRunner = GetThreadStateLocal();
if (!pJobRunner)
{
ASYNC_THREADGROUP_TLS_UNSET;
this->shutdownEvent_->LockMS(0);
return true;
}
auto oldTlsHandle = AuExchange(tlsCurrentThreadPool, AuSharedFromThis());
auto auThread = AuThreads::GetThread();
while ((!auThread->Exiting()) &&
((this->shuttingdown_ & 2) != 2) &&
((AuAtomicLoad(&this->shuttingdown_) & 2) != 2) &&
(!pJobRunner->shutdown.bBreakMainLoop))
{
AuUInt32 uCount {};
@ -272,7 +296,7 @@ namespace Aurora::Async
// Do work (blocking)
if (!InternalRunOne(pJobRunner, true, true, uCount))
{
if ((this->shuttingdown_ & 2) == 2)
if ((AuAtomicLoad(&this->shuttingdown_) & 2) == 2)
{
return ranOnce;
}
@ -280,20 +304,21 @@ namespace Aurora::Async
ranOnce = true;
}
tlsCurrentThreadPool = oldTlsHandle;
ASYNC_THREADGROUP_TLS_UNSET;
return ranOnce;
}
bool ThreadPool::InternalRunOne(AuSPtr<ThreadState> state, bool block, bool bUntilWork, AuUInt32 &uCount)
{
bool bSuccess {};
if (!state)
{
SysPushErrorUninitialized("Not an async thread");
return false;
}
bool bSuccess {};
EarlyExitTick();
{
@ -373,32 +398,32 @@ namespace Aurora::Async
break;
}
// pre-wakeup thread terminating check
if (state->thread.pThread->Exiting())
{
break;
}
// Block if no work items are present
if (state->pendingWorkItems.empty())
{
if (this->shuttingdown_ & 2)
// pre-wakeup thread terminating check
if (state->thread.pThread->Exiting())
{
break;
}
if (AuAtomicLoad(&this->shuttingdown_) & 2)
{
break;
}
state->sync.cvVariable->WaitForSignal();
if (this->shuttingdown_ & 2)
if (AuAtomicLoad(&this->shuttingdown_) & 2)
{
break;
}
}
// Post-wakeup thread terminating check
if (state->thread.pThread->Exiting())
{
break;
// Post-wakeup thread terminating check
if (state->thread.pThread->Exiting())
{
break;
}
}
if (state->pendingWorkItems.empty() && (
@ -431,8 +456,8 @@ namespace Aurora::Async
{
if (InRunnerMode())
{
if ((this->uAtomicCounter == 0) &&
this->IsDepleted())
if ((AuAtomicLoad(&this->uAtomicCounter) == 0) &&
this->IsDepleted(state))
{
Shutdown();
}
@ -449,8 +474,8 @@ namespace Aurora::Async
// in the first task (or deeper)
if (InRunnerMode() && state->stackState.uStackCallDepth) // are we one call deep?
{
if ((this->uAtomicCounter == state->stackState.uStackCallDepth) &&
this->IsDepleted())
if ((AuAtomicLoad(&this->uAtomicCounter) == state->stackState.uStackCallDepth) &&
this->IsDepleted(state))
{
return false;
}
@ -513,8 +538,8 @@ namespace Aurora::Async
if (InRunnerMode())
{
if ((runningTasks == 0) &&
(this->uAtomicCounter == 0) &&
this->IsDepleted())
(AuAtomicLoad(&this->uAtomicCounter) == 0) &&
this->IsDepleted(state))
{
Shutdown();
}
@ -527,6 +552,8 @@ namespace Aurora::Async
// This is our expected behaviour. Any changes will likely introduce hard to catch bugs across various softwares and exit conditions.
void ThreadPool::Shutdown()
{
AU_DEBUG_MEMCRUNCH;
auto trySelfPid = AuAsync::GetCurrentWorkerPId();
// Update shutting down flag
@ -690,7 +717,7 @@ namespace Aurora::Async
continue;
}
if (!pThat->IsSelfDepleted())
if (!pThat->IsSelfDepleted(nullptr))
{
continue;
}
@ -714,23 +741,30 @@ namespace Aurora::Async
AuUInt32 ThreadPool::PollAndCount(bool bStrict)
{
AuUInt32 uCount {};
ASYNC_THREADGROUP_TLS_PREP;
auto bRanAtLeastOne = this->InternalRunOne(this->GetThreadStateNoWarn(), false, false, uCount);
ASYNC_THREADGROUP_TLS_UNSET;
return uCount ? uCount : (bStrict ? bRanAtLeastOne : 0);
}
AuUInt32 ThreadPool::RunAllPending()
{
AuUInt32 uCount {};
bool ranAtLeastOne {};
AuUInt32 uCountTotal {};
ASYNC_THREADGROUP_TLS_PREP;
do
{
uCount = 0;
ranAtLeastOne |= this->InternalRunOne(this->GetThreadStateNoWarn(), false, true, uCount);
(void)this->InternalRunOne(this->GetThreadStateNoWarn(), false, true, uCount);
uCountTotal += uCount;
}
while (uCount);
return uCount ? uCount : false;
ASYNC_THREADGROUP_TLS_UNSET;
return uCountTotal;
}
AuSPtr<IWorkItem> ThreadPool::NewWorkItem(const WorkerId_t &worker,
@ -771,6 +805,8 @@ namespace Aurora::Async
AuBST<ThreadGroup_t, AuList<ThreadId_t>> ThreadPool::GetThreads()
{
AU_DEBUG_MEMCRUNCH;
AuBST<ThreadGroup_t, AuList<ThreadId_t>> ret;
for (auto pGroup : this->threadGroups_)
@ -971,15 +1007,33 @@ namespace Aurora::Async
return worker->asyncLoop;
}
bool ThreadPool::IsSelfDepleted()
bool ThreadPool::IsSelfDepleted(const AuSPtr<ThreadState> &pState)
{
auto queue = ToKernelWorkQueue();
return (!queue || queue->GetSourceCount() <= 1 + this->uAtomicIOProcessorsWorthlessSources + this->uAtomicIOProcessors);
AuSPtr<AuLoop::ILoopQueue> pLoopQueue;
if (pState)
{
pLoopQueue = pState->asyncLoop;
}
else
{
if (auto pLocalThread = this->GetThreadStateLocal())
{
pLoopQueue = pLocalThread->asyncLoop;
}
}
if (!pLoopQueue)
{
return true;
}
return pLoopQueue->GetSourceCount() <= 1 + this->uAtomicIOProcessorsWorthlessSources + this->uAtomicIOProcessors;
}
bool ThreadPool::IsDepleted()
bool ThreadPool::IsDepleted(const AuSPtr<ThreadState> &state)
{
if (!IsSelfDepleted())
if (!IsSelfDepleted(state))
{
return false;
}
@ -988,7 +1042,7 @@ namespace Aurora::Async
{
if (auto pThat = AuTryLockMemoryType(wOther))
{
if (!pThat->IsSelfDepleted())
if (!pThat->IsSelfDepleted(nullptr))
{
return false;
}
@ -1178,6 +1232,12 @@ namespace Aurora::Async
return {};
}
// TODO: ROXTL
if (this->pHeap)
{
pThreadState->pendingWorkItems.get_allocator().SetHeapRaw(this->pHeap.get());
}
if (!create)
{
pThreadState->thread.pThread= AuThreads::ThreadShared(AuThreads::ThreadInfo(
@ -1289,7 +1349,7 @@ namespace Aurora::Async
{
AU_LOCK_GUARD(this->pRWReadView);
if (!this->shuttingdown_ && !job->shutdown.bDropSubmissions)
if (!AuAtomicLoad(&this->shuttingdown_) && !job->shutdown.bDropSubmissions)
{
// Pump and barrier + reject all after atomically
Barrier(id, 0, false, true);
@ -1306,6 +1366,11 @@ namespace Aurora::Async
void ThreadPool::EarlyExitTick()
{
if ((AuAtomicLoad(&this->shuttingdown_) & 2) != 2)
{
return;
}
auto jobWorker = GetThreadState();
auto state = jobWorker->parent.lock();
if (!jobWorker)
@ -1314,11 +1379,6 @@ namespace Aurora::Async
return;
}
if ((this->shuttingdown_ & 2) != 2)
{
return;
}
state->SignalAll();
{
@ -1364,6 +1424,8 @@ namespace Aurora::Async
void ThreadPool::ThisExiting()
{
AU_DEBUG_MEMCRUNCH;
auto id = GetCurrentThread();
auto state = GetGroup(id.first);
@ -1477,6 +1539,26 @@ namespace Aurora::Async
return state->GetThreadByIndex(worker.second);
}
AuSPtr<ThreadState> ThreadPool::GetThreadStateLocal()
{
auto worker = *tlsWorkerId;
if (auto pSelf = AuTryLockMemoryType(worker.pool))
{
auto state = GetGroup(worker.first);
if (!state)
{
return {};
}
return state->GetThreadByIndex(worker.second);
}
else
{
return {};
}
}
AuSPtr<ThreadState> ThreadPool::GetThreadHandle(WorkerId_t id)
{
auto group = GetGroup(id.first);
@ -1513,7 +1595,7 @@ namespace Aurora::Async
}
}
return ret;
return AuMove(ret);
}
AUKN_SYM AuSPtr<IThreadPool> NewThreadPool()

View File

@ -88,8 +88,8 @@ namespace Aurora::Async
virtual AuUInt64 QueryAbortFence(AuOptional<WorkerId_t> optWorkerId) override;
virtual bool QueryShouldAbort(AuOptional<WorkerId_t> optWorkerId, AuUInt64 uFenceMagic) override;
bool IsSelfDepleted();
bool IsDepleted();
bool IsSelfDepleted(const AuSPtr<ThreadState> &state);
bool IsDepleted(const AuSPtr<ThreadState> &state);
//virtual bool ScheduleLoopSource(const AuSPtr<Loop::ILoopSource> &loopSource, WorkerId_t workerId, AuUInt32 timeout, const AuConsumer<AuSPtr<Loop::ILoopSource>, bool> &callback) override;
@ -146,6 +146,7 @@ namespace Aurora::Async
AuSPtr<GroupState> GetGroup(ThreadGroup_t type);
AuSPtr<ThreadState> GetThreadState();
AuSPtr<ThreadState> GetThreadStateNoWarn();
AuSPtr<ThreadState> GetThreadStateLocal();
AuList<AuSPtr<ThreadState>> GetThreadHandles(WorkerId_t id);
AuSPtr<GroupState> threadGroups_[255];
@ -160,6 +161,8 @@ namespace Aurora::Async
AuList<AuWPtr<ThreadPool>> listWeakDeps_;
AuList<AuWPtr<ThreadPool>> listWeakDepsParents_;
AuMemory::AllocHeapUnique_t pHeap { AuMemory::AllocHeapUnique(512 * 1024) };
friend struct KeepGroupAlive;
};
}

View File

@ -30,7 +30,7 @@ namespace Aurora::Async
bool shuttingdown {};
bool exitingflag2 {};
AuThreadPrimitives::Event running;
AuList<WorkEntry_t> pendingWorkItems;
AuListOfHeap<WorkEntry_t> pendingWorkItems;
bool bAlreadyDoingExitTick {};
AuThreadPrimitives::SpinLock externalFencesLock;
AuList<AuThreading::IWaitable *> externalFences;

View File

@ -529,6 +529,11 @@ namespace Aurora::IO
void IOProcessor::WakeupThread()
{
this->items.cvEvent->Set();
if (this->asyncWorker)
{
auto worker = this->asyncWorker.Value();
worker.GetPool()->Wakeup(worker);
}
}
bool IOProcessor::AddEventListener(const AuSPtr<IIOProcessorEventListener> &eventListener)