AuroraRuntime/Source/Async/ThreadPool.cpp
Reece Wilson 64af4d0fa8 [+] AuAsync::IThreadPool::PollAndCount
[+] AuAsync::IThreadPool::RunAllPending
2023-03-05 12:55:07 +00:00

1564 lines
43 KiB
C++

/***
Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: ThreadPool.cpp
Date: 2021-10-30
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "Async.hpp"
#include "ThreadPool.hpp"
#include "AsyncApp.hpp"
#include "WorkItem.hpp"
#include "Schedular.hpp"
#include "ThreadWorkerQueueShim.hpp"

#include <algorithm>
#include <atomic>
#include <utility>
namespace Aurora::Async
{
//STATIC_TLS(WorkerId_t, tlsWorkerId);

// Per-thread weak reference to the pool currently driving this thread. It is assigned by
// Run(), Spawn(), Entrypoint() and PollInternal(), and read by GetThreadState() and
// GetCurrentWorkerPId(). Being weak, it never keeps a pool alive by itself.
static thread_local AuWPtr<ThreadPool> gCurrentPool;

// Submissions to a group (see ThreadPool::Run) after which its queue is flagged unsorted,
// prompting PollInternal to re-order it by priority.
static constexpr int kMagicResortThreshold = 15;

// Nesting depth of RunAsync() dispatches on this thread (bumped around each dispatch in
// PollInternal); used to detect the nested runner-mode case.
static thread_local int tlsCallStack {};
// Resolves an IThreadPool handle to the concrete ThreadPool implementation.
// The global async app (gAsyncApp) is special-cased: it is wrapped with AuUnsafeRaiiToShared
// instead of being cast from `pool` (presumably because its lifetime is not owned by a shared
// control block - NOTE(review): confirm against AsyncApp).
inline auto GetWorkerInternal(const AuSPtr<IThreadPool> &pool)
{
    const bool bIsGlobalApp = (pool.get() == AuStaticCast<IAsyncApp>(gAsyncApp));

    if (!bIsGlobalApp)
    {
        return AuStaticPointerCast<ThreadPool>(pool);
    }

    return AuUnsafeRaiiToShared(AuStaticCast<ThreadPool>(gAsyncApp));
}
// Returns the fully qualified id (pool + group/thread) of the calling thread, or an empty
// WorkerPId_t when the thread is not currently bound to a live ThreadPool.
AUKN_SYM WorkerPId_t GetCurrentWorkerPId()
{
    auto lkPool = gCurrentPool.lock();
    if (!lkPool)
    {
        return {};
    }

    // Snapshot the id recorded for this thread in the bound pool's TLS slot
    auto cpy = *lkPool->tlsWorkerId;
    return WorkerPId_t(lkPool, cpy);
}
//
// Constructs an empty pool. The rwlock guards `threads_` and the per-group worker maps; the
// pool cannot operate without it, so an allocation failure is fatal.
ThreadPool::ThreadPool() :
    rwlock_(AuThreadPrimitives::RWLockUnique())
{
    SysAssert(static_cast<bool>(this->rwlock_), "Couldn't initialize ThreadPool. Unable to allocate an RWLock");
}
// internal pool interface
// Convenience overload: qualifies `unlocker` with the pool the calling thread is currently
// bound to (via GetCurrentWorkerPId) and forwards to the WorkerPId_t overload.
bool ThreadPool::WaitFor(WorkerId_t unlocker, const AuSPtr<Threading::IWaitable> &primitive, AuUInt32 timeoutMs)
{
    const WorkerPId_t qualifiedUnlocker { AuAsync::GetCurrentWorkerPId().pool, unlocker };
    return this->WaitFor(qualifiedUnlocker, primitive, timeoutMs);
}
// Waits for `primitive`, avoiding the deadlock where the worker expected to signal it
// (`unlocker`) is the calling worker itself.
//
// - The caller is not a worker of this pool: plain Threading::WaitFor with `timeoutMs`.
// - `unlocker` resolves to the calling worker: the caller keeps pumping its own queue
//   (CtxYield) between 2ms waits / TryLock polls until `primitive` is acquired. `timeoutMs`
//   is not applied on this path; it only gives up (false) when a yield made no progress
//   while the pool is shutting down.
// - Otherwise `primitive` is registered in the unlocker's `externalFences` for the duration
//   of the wait, so ThisExiting can release it if that worker goes away.
//
// Returns true if `primitive` was acquired.
bool ThreadPool::WaitFor(WorkerPId_t unlocker, const AuSPtr<Threading::IWaitable> &primitive, AuUInt32 timeoutMs)
{
    auto curThread = GetThreadState();

    if (!curThread)
    {
        return Threading::WaitFor(primitive.get(), timeoutMs);
    }

    // Pool and group are tested before the worker id so that GetThreadWorkersCount() is only
    // consulted for the caller's own group, which is known to exist in this pool. For a group
    // id unknown to this pool GetGroup() yields an empty pointer, so short-circuiting keeps
    // that lookup off this path entirely.
    bool bIsSelf = (unlocker.pool.get() == this) &&            // work group matches
                   (unlocker.first == curThread->id.first) &&
                   ((unlocker.second == curThread->id.second) ||
                    ((unlocker.second == Async::kThreadIdAny) && (GetThreadWorkersCount(unlocker.first) == 1)));

    if (bIsSelf) // well, crap
    {
        // We are the thread that would release `primitive`; blocking here would deadlock
        bool queryAsync = false;
        while (!(queryAsync ? primitive->TryLock() : Threading::WaitFor(primitive.get(), 2)))
        {
            queryAsync = CtxYield();

            if (!queryAsync && this->shuttingdown_)
            {
                return false;
            }
        }

        return true;
    }

    AuSPtr<ThreadState> pHandle;
    {
        AU_LOCK_GUARD(AuStaticCast<ThreadPool>(unlocker.pool)->rwlock_->AsReadable());

        if (pHandle = AuStaticCast<ThreadPool>(unlocker.pool)->GetThreadHandle(unlocker))
        {
            AU_LOCK_GUARD(pHandle->externalFencesLock);
            if (pHandle->exitingflag2)
            {
                // The unlocker has already run ThisExiting (which sets exitingflag2); poll once
                return primitive->TryLock();
            }
            else
            {
                pHandle->externalFences.push_back(primitive.get());
            }
        }
        else if (unlocker.pool.get() == this)
        {
            // No such worker in our own pool: poll once instead of waiting
            return primitive->TryLock();
        }
    }

    bool bRet = Threading::WaitFor(primitive.get(), timeoutMs);

    if (pHandle)
    {
        AU_LOCK_GUARD(pHandle->externalFencesLock);
        AuTryRemove(pHandle->externalFences, primitive.get());
    }

    return bRet;
}
// Queues `runnable` for `target`. The target is either one specific worker of group
// `target.first`, or any of its workers when `target.second` is kThreadIdAny.
//
// The task is counted as running, then the group is woken: Signal() for "any",
// Broadcast() for a specific worker, because every worker of the group waits on the same
// condition variable. If the task cannot be queued it is uncounted and cancelled via
// CancelAsync(). In STAGING/DEBUG builds, a target that is offline or rejecting work is
// also refused.
void ThreadPool::Run(WorkerId_t target, AuSPtr<IAsyncRunnable> runnable)
{
    auto state = GetGroup(target.first);
    SysAssert(static_cast<bool>(state), "couldn't dispatch a task to an offline group");

    IncrementTasksRunning();

    // The rollback for a refused task (DecrementTasksRunning + CancelAsync) is deferred until
    // the locks below are released. DecrementTasksRunning may call Shutdown() in runner mode,
    // which re-enters Run() through Barrier() (taking this same cvWorkMutex) and acquires
    // rwlock_ writable - either of which can deadlock while we still hold them.
    bool bQueued {};
    {
        AU_LOCK_GUARD(state->cvWorkMutex);

        bool bAccepting { true };

#if defined(STAGING) || defined(DEBUG)
        AU_LOCK_GUARD(rwlock_->AsReadable());

        if (target.second != Async::kThreadIdAny)
        {
            auto itr = state->workers.find(target.second);
            if ((itr == state->workers.end()) || (itr->second->rejecting))
            {
                SysPushErrorGeneric("worker: {}:{} is offline", target.first, target.second);
                bAccepting = false;
            }
        }
        else
        {
            // At least one worker of the group must still accept work
            bAccepting = false;
            for (const auto &worker : state->workers)
            {
                if (!worker.second->rejecting)
                {
                    bAccepting = true;
                    break;
                }
            }
        }
#endif

        if (bAccepting && AuTryInsert(state->workQueue, AuMakePair(target.second, runnable)))
        {
            bQueued = true;

            // Every kMagicResortThreshold submissions, ask PollInternal to re-sort by priority
            state->dirty++;
            if (state->dirty > kMagicResortThreshold)
            {
                state->dirty = 0;
                state->sorted = false;
            }

            state->eventLs->Set();
        }
    }

    if (!bQueued)
    {
        DecrementTasksRunning();
        runnable->CancelAsync();
        return;
    }

    if (target.second == Async::kThreadIdAny)
    {
        state->cvVariable->Signal();
    }
    else
    {
        // sad :(
        // TODO: when we have wait any, add support (^ the trigger) for it here
        state->cvVariable->Broadcast();
    }
}
// Exposes this pool through its public IThreadPool interface.
IThreadPool *ThreadPool::ToThreadPool()
{
    IThreadPool *pInterface = this;
    return pInterface;
}
// Counts one more outstanding task. It is balanced by DecrementTasksRunning, or by the
// per-task decrement in PollInternal once the task has run.
void ThreadPool::IncrementTasksRunning()
{
    ++this->tasksRunning_;
}
// Uncounts one outstanding task. In runner mode, dropping to zero shuts the pool down.
void ThreadPool::DecrementTasksRunning()
{
    const auto remaining = --this->tasksRunning_;
    if (remaining != 0)
    {
        return;
    }

    if (!InRunnerMode())
    {
        return;
    }

    Shutdown();
}
// ithreadpool
// Returns the number of workers registered in `group`, or 0 when no such group exists.
size_t ThreadPool::GetThreadWorkersCount(ThreadGroup_t group)
{
    AU_LOCK_GUARD(this->rwlock_->AsReadable());

    auto pGroup = GetGroup(group);
    if (!pGroup)
    {
        // GetGroup returns an empty pointer for unknown group ids
        return 0;
    }

    return pGroup->workers.size();
}
void ThreadPool::SetRunningMode(bool eventRunning)
{
this->runnersRunning_ = eventRunning;
}
// Spawns a new OS thread to act as worker `workerId`.
bool ThreadPool::Spawn(WorkerId_t workerId)
{
    constexpr bool kAdoptCallingThread = false;
    return this->Spawn(workerId, kAdoptCallingThread);
}
// Registers the calling thread itself as worker `workerId` instead of spawning a new thread.
bool ThreadPool::Create(WorkerId_t workerId)
{
    constexpr bool kAdoptCallingThread = true;
    return this->Spawn(workerId, kAdoptCallingThread);
}
// Whether runner mode (auto-shutdown once no tasks remain) is enabled.
bool ThreadPool::InRunnerMode()
{
    const bool bRunnerMode = this->runnersRunning_;
    return bRunnerMode;
}
bool ThreadPool::Poll()
{
AuUInt32 uCount {};
return InternalRunOne(false, uCount);
}
bool ThreadPool::RunOnce()
{
AuUInt32 uCount {};
return InternalRunOne(true, uCount);
}
// Worker main loop. It binds the calling thread to this pool and pumps work (blocking).
// The loop stops when any of these happens:
//   - the OS thread is asked to exit;
//   - the pool has shut down;
//   - the worker requests an early break (bBreakEarly).
// Returns true once at least one loop iteration has completed, and false if the calling
// thread is not a worker of this pool.
bool ThreadPool::Run()
{
    bool ranOnce {};

    gCurrentPool = AuWeakFromThis();

    auto auThread = AuThreads::GetThread();
    auto job = GetThreadState();

    if (!job)
    {
        SysPushErrorUninitialized("Not an async thread");
        // Nothing to pump - and the loop condition below would dereference the null state
        return false;
    }

    while ((!auThread->Exiting()) &&
           (!this->shutdown) &&
           (!job->bBreakEarly))
    {
        AuUInt32 uCount {};

        // Do work (blocking)
        if (!InternalRunOne(true, uCount))
        {
            if (this->shutdown)
            {
                return ranOnce;
            }
        }

        ranOnce = true;
    }

    return ranOnce;
}
// One iteration of the calling worker's pump; EarlyExitTick() runs before and after it.
//
// When the worker's async loop has more than one source (one is the group's work event,
// added in Spawn), the other sources are serviced according to the worker's run mode:
//   - eLowLatencyFreqKernel: kernel pumping is rate-limited; in between, the work queue is
//     polled and the thread yields.
//   - eLowLatencyYield: yield first, then a non-blocking check.
//   - eEfficient: a kernel wait is attempted when `block` is set, or when the loop is
//     already signalled.
// The work queue is then polled. Without extra sources it simply polls with `block`.
//
// `uCount` accumulates the number of dispatched work items (via PollInternal).
// Returns whether PollInternal reported work, and false if the calling thread is not a
// worker of this pool.
bool ThreadPool::InternalRunOne(bool block, AuUInt32 &uCount)
{
    auto state = GetThreadState();
    if (!state)
    {
        SysPushErrorUninitialized("Not an async thread");
        // Nothing to pump; `state` is dereferenced unconditionally below
        return false;
    }

    bool success {};
    auto runMode = GetCurrentThreadRunMode();

    EarlyExitTick();

    //do
    {
        auto asyncLoop = state->asyncLoop;
        asyncLoop->OnFrame();

        if (asyncLoop->GetSourceCount() > 1)
        {
            bool bShouldTrySleepForKernel {};

            if (runMode == ERunMode::eLowLatencyFreqKernel)
            {
                if (state->rateLimiter.CheckExchangePass())
                {
#if defined(AURORA_PLATFORM_WIN32)
                    bShouldTrySleepForKernel = asyncLoop->PumpNonblocking();
#else
                    bShouldTrySleepForKernel = asyncLoop->IsSignaledPeek();
#endif
                }
                else
                {
                    if (!PollInternal(false, uCount))
                    {
                        AuThreading::ContextYield();
                    }
                    else
                    {
                        success = true;
                    }
                }
            }
            else if (runMode == ERunMode::eLowLatencyYield)
            {
                AuThreading::ContextYield();
                block = false;
#if defined(AURORA_PLATFORM_WIN32)
                bShouldTrySleepForKernel = asyncLoop->PumpNonblocking();
#else
                bShouldTrySleepForKernel = asyncLoop->IsSignaledPeek();
#endif
            }
            else if (runMode == ERunMode::eEfficient)
            {
                bShouldTrySleepForKernel = block;
                if (!block)
                {
                    bShouldTrySleepForKernel = asyncLoop->IsSignaledPeek();
                }
            }

            if (bShouldTrySleepForKernel
                // epoll and such like can be checked without read success. kevent works on availablity, not scheduling read like iosubmit
                // allow windows to atomically pump instead of wasting time buffering the primitives state
                && ((AuBuild::kIsNtDerived && runMode == ERunMode::eEfficient) ||
                    (!AuBuild::kIsNtDerived))
                && (asyncLoop->WaitAny(0))
                )
            {
                success = PollInternal(false, uCount);
            }
            else
            {
                success |= PollInternal(false, uCount);
            }
        }
        else
        {
            success = PollInternal(block, uCount);
        }
    } //while (success);

    EarlyExitTick();

    return success;
}
// Core dispatch routine for the calling worker.
//
// 1. Under the group's cvWorkMutex:
//    - Optionally re-orders the group queue by priority.
//    - Moves up to `multipopCount` entries addressed to this worker (or to any worker) into
//      the thread-local `pendingWorkItems`.
//    - When `block` is set and nothing was taken, waits on the group condition variable. It
//      bails out on shutdown or thread exit, and returns false when the worker's async loop
//      has other sources or pending commits to service.
// 2. Runs the taken items via RunAsync(), adding one to `uCount` per item. Items with
//    priority < 0.25 are skipped while the group queue still holds one with priority > 0.5.
// 3. Re-queues any items not run (early exit or deferral) onto the group queue.
// 4. In runner mode, shuts the pool down once no tasks remain.
//
// Returns false when nothing was taken, or on the nested runner-mode early exit; otherwise
// true. Also returns false when the calling thread is not a worker of this pool.
bool ThreadPool::PollInternal(bool block, AuUInt32 &uCount)
{
    auto state = GetThreadState();
    if (!state)
    {
        SysPushErrorUninitialized("Not an async thread");
        return false;
    }

    auto group = state->parent.lock();
    if (!group)
    {
        // The worker's group is gone; there is no queue to pull from
        return false;
    }

    //state->pendingWorkItems.clear();

    auto magic = CtxPollPush();

    {
        AU_LOCK_GUARD(group->cvWorkMutex);

        // TODO: reimplement this
        // Priority ordering only kicks in once the backlog exceeds 3 items per worker
        if (group->workQueue.size() > group->workers.size()*3)
        {
            if (!group->sorted)
            {
                // Highest priority first; equal priorities keep their submission order.
                // A stable sort yields that order directly. The earlier approach copied the
                // queue and linearly searched the copy for each element's original index on
                // every comparison (O(n^2 log n)).
                std::stable_sort(group->workQueue.begin(), group->workQueue.end(), [](const WorkEntry_t &a, const WorkEntry_t &b)
                {
                    return a.second->GetPrio() > b.second->GetPrio();
                });

                group->sorted = true;
                group->dirty = 0;
            }
        }

        do
        {
            // Deque tasks the current thread runner could dipatch
            // Noting that `multipopCount` determines how aggressive threads are in dequeuing work
            // It's probable `multipopCount` will equal 1 for your use case
            //
            // Only increment when you know tasks within a group queue will not depend on one another
            // *and* tasks require a small amount of execution time
            //
            // This could be potentially useful for an event dispatcher whereby you're dispatching
            // hundreds of items per second, across a thread or two, knowing dequeuing one instead of all
            // is a waste of CPU cycles.
            //
            // Remember, incrementing `multipopCount` is potentially dangerous the second you have local
            // thread group waits
            for (auto itr = group->workQueue.begin();
                 ((itr != group->workQueue.end()) &&
                  (state->pendingWorkItems.size() < state->multipopCount));
                 )
            {
                // TODO: catch low memory condition
                if (itr->first == Async::kThreadIdAny)
                {
                    state->pendingWorkItems.push_back(*itr);
                    itr = group->workQueue.erase(itr);
                    continue;
                }

                if ((itr->first != Async::kThreadIdAny) && (itr->first == state->id.second))
                {
                    state->pendingWorkItems.push_back(*itr);
                    itr = group->workQueue.erase(itr);
                    continue;
                }

                itr++;
            }

            // Consider blocking for more work
            if (!block)
            {
                break;
            }

            // pre-wakeup thread terminating check
            if (state->threadObject->Exiting())
            {
                break;
            }

            // Block if no work items are present
            if (state->pendingWorkItems.empty())
            {
                if (this->shuttingdown_)
                {
                    //this->EarlyExitTick();
                    break;
                }

                group->cvVariable->WaitForSignal();

                if (this->shuttingdown_)
                {
                    //this->EarlyExitTick();
                    break;
                }
            }

            // Post-wakeup thread terminating check
            if (state->threadObject->Exiting())
            {
                break;
            }

            if (state->pendingWorkItems.empty() && (
                (this->GetThreadState()->asyncLoop->GetSourceCount() > 1) ||
                this->GetThreadState()->asyncLoop->CommitPending())) //(this->ToKernelWorkQueue()->IsSignaledPeek()))
            {
                return false;
            }

        } while (state->pendingWorkItems.empty() && block);

        if (group->workQueue.empty())
        {
            group->eventLs->Reset();
        }
    }

    if (state->pendingWorkItems.empty())
    {
        CtxPollReturn(state, magic, false);
        return false;
    }

    int runningTasks {};

    auto oldTlsHandle = AuExchange(gCurrentPool, AuSharedFromThis());

    bool lowPrioCont {};
    bool lowPrioContCached {};

    // Bumped on every dispatch pass; a nested PollInternal (from inside RunAsync) changes it,
    // which tells the loop below its iterator into pendingWorkItems may be stale
    state->cookie++;

    int start = state->cookie;

    // Account for
    // while (AuAsync.GetCurrentPool()->runForever());
    // in the first task (or deeper)
    if (InRunnerMode() && tlsCallStack) // are we one call deep?
    {
        auto queue = ToKernelWorkQueue();

        if ((this->tasksRunning_ == tlsCallStack) &&
            (!queue || queue->GetSourceCount() <= 1))
        {
            // Undo the TLS binding swapped in above before bailing out
            gCurrentPool = oldTlsHandle;
            return false;
        }
    }

    //
    for (auto itr = state->pendingWorkItems.begin(); itr != state->pendingWorkItems.end(); )
    {
        if (state->threadObject->Exiting() || this->shutdown)
        {
            break;
        }

        // Set the last frame time for a watchdog later down the line
        state->lastFrameTime = Time::CurrentClockMS();

        if (itr->second->GetPrio() < 0.25)
        {
            group->sorted = false;

            if (lowPrioCont)
            {
                itr++;
                continue;
            }

            if (!lowPrioContCached)
            {
                AU_LOCK_GUARD(group->cvWorkMutex);
                {
                    for (const auto &[pendingWorkA, pendingWorkB] : group->workQueue)
                    {
                        if (pendingWorkB->GetPrio() > .5)
                        {
                            lowPrioCont = true;
                            break;
                        }
                    }
                }

                lowPrioContCached = true;

                if (lowPrioCont)
                {
                    itr++;
                    continue;
                }
            }
        }

        // Dispatch
        auto oops = itr->second;

        // Remove from our local job queue
        itr = state->pendingWorkItems.erase(itr);

        tlsCallStack++;

        //SysBenchmark(fmt::format("RunAsync: {}", block));

        // Dispatch
        oops->RunAsync();
        uCount++;

        // Atomically decrement global task counter
        runningTasks = this->tasksRunning_.fetch_sub(1) - 1;

        tlsCallStack--;

        if (start != state->cookie)
        {
            start = state->cookie;
            itr = state->pendingWorkItems.begin();
        }
    }

    gCurrentPool = oldTlsHandle;

    // Return popped work back to the groups work pool when our -pump loops were preempted
    if (state->pendingWorkItems.size())
    {
        AU_LOCK_GUARD(group->cvWorkMutex);
        // TODO: low memory condition slow path
        group->workQueue.insert(group->workQueue.end(), state->pendingWorkItems.begin(), state->pendingWorkItems.end());
        group->eventLs->Set();
        state->pendingWorkItems.clear();
    }

    CtxPollReturn(state, magic, true);

    // Account for
    // while (AuAsync.GetCurrentPool()->runForever());
    // in the top most task
    if (InRunnerMode())
    {
        auto queue = ToKernelWorkQueue();

        if ((runningTasks == 0) &&
            (this->tasksRunning_ == 0 ) &&
            (!queue || queue->GetSourceCount() <= 1))
        {
            Shutdown();
        }
    }

    return true;
}
// Shuts the pool down in phases. Only the first caller proceeds; later callers return at once.
//   1. Set shutdown bit 0 (only the first caller gets past AuAtomicTestAndSet).
//   2. Under a read lock: stop the scheduler and collect every worker except the caller.
//   3. Barrier each collected worker without rejection, to sync already-submitted work.
//   4. Set shutdown bit 1; EarlyExitTick/CtxYield test this as `shuttingdown_ & 2`.
//   5. Under a write lock: flag each worker `shuttingdown` and wake its group. Non-sys threads
//      are sent an exit signal and collected for joining. Each worker's `running` event is set,
//      releasing any worker parked by a requireSignal barrier.
//   6. Barrier each worker again with `drop` set (the worker starts rejecting work).
//   7. Wait for the collected threads to Exit(), then set `shutdown`. If the caller is itself
//      a worker of this pool, mark it `bIsKiller`.
// NOTE(review): Barrier() returns immediately when the calling thread is not a worker of this
// pool. Shutdown from an outside thread therefore performs no actual barrier synchronisation
// in phases 3 and 6 - confirm that is intended.
void ThreadPool::Shutdown()
{
auto trySelfPid = AuAsync::GetCurrentWorkerPId();
// Update shutting down flag
// Specify the root-level shutdown flag for 'ok, u can work, but you're shutting down soon [microseconds, probably]'
{
// Presumably sets bit 0 and returns its previous value: a non-zero result means another
// caller has already started shutting down
if (AuAtomicTestAndSet(&this->shuttingdown_, 0) != 0)
{
return;
}
}
// Non-null only when the caller is one of this pool's workers
auto pLocalRunner = this->GetThreadStateNoWarn();
AuList<WorkerId_t> toBarrier;
// Noting
// 1) that StopSched may lockup under a writable lock
// -> we will terminate a thread that may be dispatching a sys pump event
// 2) that barrier doesn't need to be under a write lock
//
// Perform the following shutdown of the schedular and other available threads under a read lock
{
{
AU_LOCK_GUARD(this->rwlock_->AsReadable());
StopSched();
for (auto &[groupId, group] : this->threads_)
{
for (auto &[id, worker] : group->workers)
{
// Never barrier ourselves; we would be waiting on our own queue
if (trySelfPid == worker->id)
{
continue;
}
toBarrier.push_back(worker->id);
}
}
}
// Ehhhh
// We need this fix to a specific V8 deinit lockup
//this->Poll();
for (const auto &id : toBarrier)
{
if (trySelfPid == id)
{
continue;
}
this->Barrier(id, 0, false, false /* no reject*/); // absolute safest point in shutdown; sync to already submitted work
}
}
// Phase two: from here on workers are actively torn down
// Specify the root-level shutdown flag for 'ok, u can work, but you're shutting down after sync barrier'
{
// Bit 1 (value 2) is what EarlyExitTick and CtxYield test for
AuAtomicTestAndSet(&this->shuttingdown_, 1);
}
// Finally set the shutdown flag on all of our thread contexts
// then release them from the runners/workers list
// then release all group contexts
AuList<AuThreads::ThreadShared_t> threads;
{
AU_LOCK_GUARD(this->rwlock_->AsWritable());
for (auto &[groupId, group] : this->threads_)
{
for (auto &[id, worker] : group->workers)
{
if (group->cvWorkMutex && group->cvVariable)
{
// Best effort: broadcast even if the work mutex could not be taken, so a waiter
// is still woken
bool bLocked = group->cvWorkMutex->TryLock();
worker->shuttingdown = true;
group->cvVariable->Broadcast();
if (bLocked) group->cvWorkMutex->Unlock();
}
else
{
worker->shuttingdown = true;
}
if (!group->IsSysThread()) // bug?
{
worker->threadObject->SendExitSignal();
threads.push_back(worker->threadObject);
}
// Release workers parked on `running` by a requireSignal barrier
auto &event = worker->running;
if (event)
{
event->Set();
}
}
}
}
// Final sync to exit
{
for (const auto &id : toBarrier)
{
if (trySelfPid == id)
{
continue;
}
// Allow the final barrier task to be accepted by this worker; the barrier itself
// (drop = true) then puts the worker back into the rejecting state
auto handle = this->GetThreadHandle(id);
if (handle)
{
handle->rejecting = false;
}
this->Barrier(id, 0, false, true);
}
}
// Sync to shutdown threads to prevent a race condition whereby the async subsystem shuts down before the threads
for (const auto &thread : threads)
{
thread->Exit();
}
// Is dead flag
this->shutdown = true;
if (pLocalRunner)
{
// Lets EarlyExitTick skip its self-barrier on the thread that drove the shutdown
pLocalRunner->bIsKiller = true;
}
}
// True once the pool has begun shutting down, or when the calling worker is flagged as
// exiting. A thread that is not a worker of this pool only observes the pool-wide flag.
bool ThreadPool::Exiting()
{
    if (this->shuttingdown_)
    {
        return true;
    }

    // GetThreadState() is empty for threads that are not workers of this pool
    auto pState = GetThreadState();
    return pState && pState->exiting;
}
// Performs one non-blocking pump of the calling worker and returns how many work items it
// dispatched. When nothing was counted but the pump still reported progress, `bStrict`
// makes it return 1 instead of 0.
// NOTE(review): the intended meaning of "strict" is not visible here.
AuUInt32 ThreadPool::PollAndCount(bool bStrict)
{
    AuUInt32 uDispatched {};
    const bool bProgress = this->InternalRunOne(false, uDispatched);

    if (uDispatched)
    {
        return uDispatched;
    }

    return (bStrict && bProgress) ? 1u : 0u;
}
// Repeatedly pumps the calling worker without blocking until a pass dispatches nothing.
// Returns the total number of work items dispatched across all passes.
AuUInt32 ThreadPool::RunAllPending()
{
    AuUInt32 uTotal {};
    AuUInt32 uCount {};

    do
    {
        uCount = 0;
        this->InternalRunOne(false, uCount);
        uTotal += uCount;
    }
    while (uCount);

    // The loop only exits after a pass reports zero, so `uCount` is always 0 here. Returning
    // it (as before) made this function always report 0; the running total is the answer.
    return uTotal;
}
// Creates a work item that will run `task` on `worker`, qualified with the calling thread's
// current pool. A null `task` yields a null work item (error pass-through).
AuSPtr<IWorkItem> ThreadPool::NewWorkItem(const WorkerId_t &worker,
                                          const AuSPtr<IWorkItemHandler> &task,
                                          bool bSupportsBlocking)
{
    if (task)
    {
        return AuMakeShared<WorkItem>(this,
                                      WorkerPId_t { AuAsync::GetCurrentWorkerPId().pool, worker },
                                      task,
                                      bSupportsBlocking);
    }

    return {};
}
// Creates a handler-less work item (a fence) targeting the calling worker, with blocking
// waits supported.
AuSPtr<IWorkItem> ThreadPool::NewFence()
{
    const AuSPtr<IWorkItemHandler> pNoHandler {};
    const bool bSupportsBlocking = true;
    return AuMakeShared<WorkItem>(this, AuAsync::GetCurrentWorkerPId(), pNoHandler, bSupportsBlocking);
}
// Returns the thread object backing worker `id`, or an empty handle if the worker is unknown.
AuThreads::ThreadShared_t ThreadPool::ResolveHandle(WorkerId_t id)
{
    if (auto pState = GetThreadHandle(id))
    {
        return pState->threadObject;
    }

    return {};
}
// Snapshot of every registered worker, keyed by group: group id -> list of thread ids.
AuBST<ThreadGroup_t, AuList<ThreadId_t>> ThreadPool::GetThreads()
{
    AU_LOCK_GUARD(rwlock_->AsReadable());

    AuBST<ThreadGroup_t, AuList<ThreadId_t>> ret;

    for (const auto &group : this->threads_)
    {
        AuList<ThreadId_t> workers;
        AuTryReserve(workers, group.second->workers.size());

        for (const auto &thread : group.second->workers)
        {
            workers.push_back(thread.second->id.second);
        }

        // Hand the freshly built list over instead of copying it
        ret[group.first] = std::move(workers);
    }

    return ret;
}
// Returns the calling thread's (group, thread) id as recorded in this pool's TLS slot.
WorkerId_t ThreadPool::GetCurrentThread()
{
    WorkerId_t current = tlsWorkerId;
    return current;
}
// Barriers `workerId`, or every worker of its group when `workerId.second` is kThreadIdAny.
// `requireSignal` is never applied to the calling worker itself, since a worker cannot wait
// for its own signal. Returns false on the first failed barrier, or if the group is unknown
// (kThreadIdAny case).
bool ThreadPool::Sync(WorkerId_t workerId, AuUInt32 timeoutMs, bool requireSignal)
{
    AU_LOCK_GUARD(this->rwlock_->AsReadable());

    const WorkerId_t currentWorker = GetCurrentThread();

    // Compare the whole (group, thread) pair: thread ids are only unique within a group, so
    // comparing `.second` alone wrongly exempted same-numbered workers of other groups
    auto isCurrentWorker = [&](const auto &id)
    {
        return (id.first == currentWorker.first) &&
               (id.second == currentWorker.second);
    };

    if (workerId.second != Async::kThreadIdAny)
    {
        return Barrier(workerId, timeoutMs, requireSignal && !isCurrentWorker(workerId), false);
    }

    auto group = GetGroup(workerId.first);
    if (!group)
    {
        SysPushErrorGeneric("Couldn't find requested worker");
        return false;
    }

    for (auto &jobWorker : group->workers)
    {
        // BAD!, should subtract time elapsed, clamp to, i dunno, 5ms min?
        if (!Barrier(jobWorker.second->id, timeoutMs, requireSignal && !isCurrentWorker(jobWorker.second->id), false))
        {
            return false;
        }
    }

    return true;
}
// Sets the `running` event of `workerId`, or of every worker in its group when
// `workerId.second` is kThreadIdAny. This releases workers parked by a requireSignal barrier.
// Unknown workers or groups are reported instead of dereferenced.
void ThreadPool::Signal(WorkerId_t workerId)
{
    AU_LOCK_GUARD(this->rwlock_->AsReadable());

    if (workerId.second == Async::kThreadIdAny)
    {
        auto group = GetGroup(workerId.first);
        if (!group)
        {
            SysPushErrorGeneric("Couldn't find requested worker");
            return;
        }

        for (auto &jobWorker : group->workers)
        {
            jobWorker.second->running->Set();
        }
        return;
    }

    auto pHandle = GetThreadHandle(workerId);
    if (!pHandle)
    {
        SysPushErrorGeneric("Couldn't find requested worker");
        return;
    }

    pHandle->running->Set();
}
// Returns the shared loop source of the group that `workerId` belongs to. Returns an empty
// pointer if the worker, or its group, no longer exists.
AuSPtr<AuLoop::ILoopSource> ThreadPool::WorkerToLoopSource(WorkerId_t workerId)
{
    AU_LOCK_GUARD(this->rwlock_->AsReadable());

    auto a = GetThreadHandle(workerId);
    if (!a)
    {
        return {};
    }

    // `parent` is a weak reference; it can be empty if the group has been released
    auto pGroup = a->parent.lock();
    if (!pGroup)
    {
        return {};
    }

    return pGroup->asyncLoopSourceShared;
}
// Barriers every worker of every group (no timeout argument, no signal, no drop), asserting
// that each barrier succeeds.
void ThreadPool::SyncAllSafe()
{
    AU_LOCK_GUARD(this->rwlock_->AsReadable());

    for (const auto &re : this->threads_)
    {
        for (auto &jobWorker : re.second->workers)
        {
            // Keep the side-effecting call outside the assertion macro so the barrier is
            // issued regardless of how SysAssert is configured or expanded
            const bool bSynced = Barrier(jobWorker.second->id, 0, false, false);
            SysAssert(bSynced);
        }
    }
}
// Installs `pFeature` on worker `id`. A work item running on that worker appends the feature
// to the worker's state, where ThisExiting/EarlyExitTick later Cleanup() it, then calls
// Init(). Unless `bNonBlock` is set, the caller blocks until that work item has completed.
void ThreadPool::AddFeature(WorkerId_t id,
                            AuSPtr<AuThreads::IThreadFeature> pFeature,
                            bool bNonBlock)
{
    const bool bBlocking = !bNonBlock;

    auto pHandler = AuMakeSharedThrow<BasicWorkStdFunc>([this, pFeature]()
    {
        GetThreadState()->features.push_back(pFeature);
        pFeature->Init();
    });

    auto pWorkItem = this->NewWorkItem(id, pHandler, bBlocking)->Dispatch();
    SysAssert(pWorkItem);

    if (bBlocking)
    {
        pWorkItem->BlockUntilComplete();
    }
}
// Asserts (SysAssert) that the calling thread is registered in this pool under `group`,
// based on the id stored in this pool's TLS slot.
void ThreadPool::AssertInThreadGroup(ThreadGroup_t group)
{
SysAssert(static_cast<WorkerId_t>(tlsWorkerId).first == group);
}
// Asserts (SysAssert) that the calling thread is exactly worker `id` (group and thread) of
// this pool, based on the id stored in this pool's TLS slot.
void ThreadPool::AssertWorker(WorkerId_t id)
{
SysAssert(static_cast<WorkerId_t>(tlsWorkerId) == id);
}
// Returns the calling worker's async loop (its kernel wait queue). Returns an empty pointer
// when the calling thread is not a worker of this pool.
AuSPtr<AuLoop::ILoopQueue> ThreadPool::ToKernelWorkQueue()
{
    auto pState = this->GetThreadState();
    if (!pState)
    {
        return {};
    }

    return pState->asyncLoop;
}
// Returns the async loop (kernel wait queue) of worker `workerId`, or an empty pointer
// (with an error pushed) if that worker does not exist.
AuSPtr<AuLoop::ILoopQueue> ThreadPool::ToKernelWorkQueue(WorkerId_t workerId)
{
    if (auto worker = this->GetThreadHandle(workerId))
    {
        return worker->asyncLoop;
    }

    SysPushErrorGeneric("Couldn't find requested worker");
    return {};
}
// Applies `mode` to worker `workerId`, or to every worker of its group when
// `workerId.second` is kThreadIdAny. A non-zero `mode.freqMsTick` (milliseconds) also sets
// the rate limiter step, which is expressed in nanoseconds.
void ThreadPool::UpdateWorkMode(WorkerId_t workerId, RunMode mode)
{
    auto states = this->GetThreadHandles(workerId);
    if (!states.size())
    {
        SysPushErrorGeneric("Couldn't find requested worker");
        return;
    }

    for (const auto &state : states)
    {
        state->runMode = mode.mode;

        if (mode.freqMsTick)
        {
            // Widen before scaling: if freqMsTick is a 32-bit type, ms * 1'000'000 overflows
            // 32-bit arithmetic for anything above ~4294 ms
            state->rateLimiter.SetNextStep(static_cast<AuUInt64>(mode.freqMsTick) * 1'000'000ull);
        }
    }
}
// Run mode of the calling worker. Threads that are not workers of this pool report eEfficient.
ERunMode ThreadPool::GetCurrentThreadRunMode()
{
    if (auto pState = this->GetThreadState())
    {
        return pState->runMode;
    }

    return ERunMode::eEfficient;
}
// Run mode of worker `workerId`. Returns a value-initialised ERunMode (with an error pushed)
// if the worker does not exist.
ERunMode ThreadPool::GetThreadRunMode(WorkerId_t workerId)
{
    if (auto worker = this->GetThreadHandle(workerId))
    {
        return worker->runMode;
    }

    SysPushErrorGeneric("Couldn't find requested worker");
    return {};
}
// Unimplemented fiber hooks, 'twas used for science
// Context-switch hook called at the start of PollInternal. It is a stub with no
// context-switching library behind it, so the token it returns is always 0.
int ThreadPool::CtxPollPush()
{
    // TODO (Reece): implement a context switching library
    // Refer to the old implementation of this on pastebin
    constexpr int kNoContext = 0;
    return kNoContext;
}
void ThreadPool::CtxPollReturn(const AuSPtr<ThreadState> &state, int status, bool hitTask)
{
}
// Cooperative yield, used by WaitFor when the caller is the worker that must release what it
// waits on. It drains the calling worker's queue without blocking until a pass dispatches
// nothing, and returns whether any pass made progress. Once the pool is stopping, a worker
// that already rejects work returns false immediately.
bool ThreadPool::CtxYield()
{
    // !!!
    const bool bPoolStopping = this->shutdown ||
                               (this->shuttingdown_ & 2); // fast
    if (bPoolStopping && GetThreadState()->rejecting)
    {
        return false;
    }

    bool bAnyProgress = false;
    AuUInt32 uCount {};
    do
    {
        uCount = 0;
        bAnyProgress |= this->InternalRunOne(false, uCount);
    }
    while (uCount);

    // uCount is necessarily 0 once the loop exits, so the result reduces to "any progress"
    return bAnyProgress;
}
// internal api
// Registers worker `workerId` with this pool, creating its group on first use.
//   create == false: spawns a new OS thread running Entrypoint() for the worker.
//   create == true:  adopts the calling thread as the worker. The thread is bound to this
//                    pool immediately, and a last-hope TLS hook runs ThisExiting when it exits.
// Returns false if the worker id already exists or any allocation/initialisation fails.
// rwlock_ is held writable throughout. A freshly spawned thread therefore cannot resolve its
// state (GetThreadState takes the read lock) until the worker has been inserted at the end.
bool ThreadPool::Spawn(WorkerId_t workerId, bool create)
{
    AU_LOCK_GUARD(rwlock_->AsWritable());

    if (create)
    {
        gCurrentPool = AuSharedFromThis();
    }

    AuSPtr<GroupState> group;

    // Try fetch or allocate group
    {
        AuSPtr<GroupState>* groupPtr;
        if (!AuTryFind(this->threads_, workerId.first, groupPtr))
        {
            group = AuMakeShared<GroupState>();
            if (!group || !group->Init())
            {
                SysPushErrorMemory("Not enough memory to intiialize a new group state");
                return false;
            }

            if (!AuTryInsert(this->threads_, AuMakePair(workerId.first, group)))
            {
                return false;
            }
        }
        else
        {
            group = *groupPtr;
        }
    }

    // Assert worker does not already exist
    {
        AuSPtr<ThreadState>* ret;

        if (AuTryFind(group->workers, workerId.second, ret))
        {
            SysPushErrorGeneric("Thread ID already exists");
            return false;
        }
    }

    auto threadState = AuMakeShared<ThreadState>();
    if (!threadState)
    {
        SysPushErrorMemory();
        return {};
    }

    threadState->parent = group;
    threadState->running = AuThreadPrimitives::EventUnique(true, false, true);
    threadState->syncSema = AuThreadPrimitives::SemaphoreUnique(0);
    threadState->id = workerId;
    threadState->asyncLoop = AuMakeShared<AsyncLoop>();
    if (!threadState->asyncLoop)
    {
        SysPushErrorMemory();
        return {};
    }
    threadState->asyncLoop->pParent = threadState.get();
    threadState->rateLimiter.SetNextStep(1'000'000); // 1MS in nanoseconds
    threadState->runMode = ERunMode::eEfficient;

    if (!threadState->asyncLoop->Init())
    {
        SysPushErrorNested();
        return {};
    }

    // Both primitives are dereferenced unconditionally later (Signal, Barrier, ThisExiting)
    if (!threadState->syncSema || !threadState->running)
    {
        SysPushErrorMemory();
        return {};
    }

    // The group's work event is always the worker loop's first source
    threadState->asyncLoop->SourceAdd(group->eventLs);

    if (!create)
    {
        threadState->threadObject = AuThreads::ThreadShared(AuThreads::ThreadInfo(
            AuMakeShared<AuThreads::IThreadVectorsFunctional>(AuThreads::IThreadVectorsFunctional::OnEntry_t(std::bind(&ThreadPool::Entrypoint, this, threadState->id)),
                                                              AuThreads::IThreadVectorsFunctional::OnExit_t{}),
            gRuntimeConfig.async.threadPoolDefaultStackSize
        ));

        if (!threadState->threadObject)
        {
            return {};
        }

        threadState->threadObject->Run();
    }
    else
    {
        // Non-owning handle: the adopted thread is not ours to destroy
        threadState->threadObject = AuSPtr<AuThreads::IAuroraThread>(AuThreads::GetThread(), [](AuThreads::IAuroraThread *){});

        // TODO: this is just a hack
        // we should implement this properly
        threadState->threadObject->AddLastHopeTlsHook(AuMakeShared<AuThreads::IThreadFeatureFunctional>([]() -> void
        {

        }, []() -> void
        {
            auto pid = GetCurrentWorkerPId();
            if (pid.pool)
            {
                GetWorkerInternal(pid.pool)->ThisExiting();
            }
        }));

        //
        gCurrentPool = AuWeakFromThis();
        tlsWorkerId = WorkerPId_t(AuSharedFromThis(), workerId);
    }

    group->workers.insert(AuMakePair(workerId.second, threadState));
    return true;
}
// private api
// Posts a marker task to `workerId` and waits (up to `ms`; see WaitFor) until it runs, i.e.
// until everything queued to that worker ahead of it has been dispatched.
//   requireSignal: after acknowledging, the target parks on its `running` event until it is
//                  signalled (see Signal / Shutdown).
//   drop:          the target marks itself as rejecting further work.
// Returns false in three cases: the calling thread is not a worker of this pool, the wait
// failed, or the marker task was cancelled instead of run.
AU_NOINLINE bool ThreadPool::Barrier(WorkerId_t workerId, AuUInt32 ms, bool requireSignal, bool drop)
{
    auto self = GetThreadState();
    if (!self)
    {
        return {};
    }

    auto &semaphore = self->syncSema;
    auto unsafeSemaphore = semaphore.get();

    // Heap-allocated and captured by value: the cancel callback may fire after this function
    // has returned (e.g. once a timed wait has expired). A stack flag captured by reference
    // would then be written through a dangling reference.
    auto pFailed = AuMakeShared<std::atomic<bool>>(false);
    if (!pFailed)
    {
        return false;
    }

    auto work = AuMakeShared<AsyncFuncRunnable>(
        [this, unsafeSemaphore, drop, requireSignal]()
        {
            auto state = GetThreadState();

            if (drop)
            {
                state->rejecting = true;
            }

            if (requireSignal)
            {
                state->running->Reset();
            }

            unsafeSemaphore->Unlock(1);

            if (requireSignal)
            {
                state->running->Lock();
            }
        },
        [unsafeSemaphore, pFailed]()
        {
            // Publish the failure before waking the waiter so it cannot observe a stale `false`
            pFailed->store(true);
            unsafeSemaphore->Unlock(1);
        }
    );

    if (!work)
    {
        return false;
    }

    Run(workerId, work);

    return WaitFor(workerId, AuUnsafeRaiiToShared(semaphore), ms) && !pFailed->load();
}
// Entry point of threads spawned by Spawn(..., create = false). It binds the new thread to
// this pool, runs the worker loop, then tears the worker down.
void ThreadPool::Entrypoint(WorkerId_t id)
{
    gCurrentPool = AuWeakFromThis();
    tlsWorkerId = WorkerPId_t(AuSharedFromThis(), id);

    auto pJob = GetThreadState();

    Run();

    // Worker {0, 0} gets special handling (CleanWorkerPoolReservedZeroFree at the end)
    const bool bReservedZero = (id == WorkerId_t {0, 0});

    if (!bReservedZero)
    {
        AU_LOCK_GUARD(this->rwlock_->AsReadable());

        const bool bNeedsDrain = !(this->shuttingdown_ || pJob->rejecting);
        if (bNeedsDrain)
        {
            // Pump and barrier + reject all after atomically
            Barrier(id, 0, false, true);
        }
    }

    ThisExiting();

    if (bReservedZero)
    {
        CleanWorkerPoolReservedZeroFree();
    }
}
// Shutdown hook that runs at the start and end of every InternalRunOne. It does nothing until
// the pool reaches its second shutdown phase (`shuttingdown_ & 2`). From then on it:
//   - wakes the group;
//   - drains the calling worker's queue (non-blocking);
//   - cleans up the worker's thread features;
//   - barriers the worker with `drop` set, unless this thread drove Shutdown (bIsKiller);
//   - flags the worker to leave its Run() loop (bBreakEarly).
// Re-entry from within the drain is suppressed via bAlreadyDoingExitTick.
void ThreadPool::EarlyExitTick()
{
    auto jobWorker = GetThreadState();
    if (!jobWorker)
    {
        SysPushErrorUninitialized("Not an async thread");
        return;
    }

    if ((this->shuttingdown_ & 2) != 2)
    {
        return;
    }

    // `parent` is weak; only poke the group if it still exists
    if (auto state = jobWorker->parent.lock())
    {
        state->eventLs->Set();
        state->cvVariable->Broadcast();
    }

    {
        if (AuExchange(jobWorker->bAlreadyDoingExitTick, true))
        {
            return;
        }

        AuUInt32 uCount {};
        do
        {
            uCount = 0;
            this->PollInternal(false, uCount);
        }
        while (uCount);
    }

    AuList<AuSPtr<AuThreads::IThreadFeature>> features;
    {
        AU_LOCK_GUARD(this->rwlock_->AsReadable()); // WARNING: this should be write, but im seeing a deadlock on exit
        // there is no real race condition to be concerned about
        // AsReadable shouldn't be enterable while someone is writing (the other accessor)
        features = AuExchange(jobWorker->features, {});
    }

    {
        for (const auto &thread : features)
        {
            try
            {
                thread->Cleanup();
            }
            catch (...)
            {
                SysPushErrorCatch("Couldn't clean up thread feature!");
            }
        }

        // Reuse the state resolved above rather than looking it up again
        if (!jobWorker->bIsKiller)
        {
            AU_LOCK_GUARD(this->rwlock_->AsReadable());
#if 0
            // TODO... i know what to do
#else
            // this will do for now
            if (!jobWorker->rejecting &&
                !this->shutdown)
            {
                this->Barrier(AuAsync::GetCurrentWorkerPId(), 0, false , true);
            }
            //this->Barrier(AuAsync::GetCurrentWorkerPId(), 0, false, false);
#endif
        }

        jobWorker->bAlreadyDoingExitTick = false;
        jobWorker->bBreakEarly = true;
    }
}
// Tears down the calling worker:
//   - cleans up its worker bookkeeping and cancels its scheduled tasks;
//   - releases everyone blocked on it (Barrier semaphore and WaitFor external fences);
//   - runs its thread features' Cleanup() outside the lock;
//   - finally removes it from its group.
// Safe to call for a worker that is unknown or already removed (it becomes a no-op), e.g.
// when invoked both from Entrypoint and from a TLS exit hook.
void ThreadPool::ThisExiting()
{
    auto id = GetCurrentThread();
    auto state = GetGroup(id.first);
    if (!state)
    {
        return;
    }

    AuList<AuSPtr<AuThreads::IThreadFeature>> features;
    {
        AU_LOCK_GUARD(this->rwlock_->AsWritable());

        auto itr = state->workers.find(id.second);
        if ((itr != state->workers.end()) && itr->second)
        {
            // Copy the entry (not a reference into the map) so it stays valid whatever
            // CleanUpWorker / TerminateSceduledTasks do to the map
            auto jobWorker = itr->second;

            CleanUpWorker(id);

            // Abort scheduled tasks
            TerminateSceduledTasks(this, id);

            // Prevent deadlocks
            jobWorker->syncSema->Unlock(10000); // prevent ::Barrier dead-locks

            {
                AU_LOCK_GUARD(jobWorker->externalFencesLock);
                // Late WaitFor callers see this and poll instead of registering a fence
                jobWorker->exitingflag2 = true;

                for (const auto &pIWaitable : jobWorker->externalFences)
                {
                    pIWaitable->Unlock();
                }

                jobWorker->externalFences.clear();
            }

            features = AuExchange(jobWorker->features, {});
        }
    }

    {
        // Clean up thread features
        // -> transferable TLS handles
        // -> thread specific vms
        // -> anything your brain wishes to imagination
        for (const auto &thread : features)
        {
            try
            {
                thread->Cleanup();
            }
            catch (...)
            {
                AuLogWarn("Couldn't clean up thread feature!");
                Debug::PrintError();
            }
        }

        features.clear();
    }

    {
        AU_LOCK_GUARD(this->rwlock_->AsWritable());

        // erase(end()) is undefined behaviour, so only erase what is actually present
        auto itr = state->workers.find(id.second);
        if (itr != state->workers.end())
        {
            state->workers.erase(itr);
        }
    }
}
// Returns the state of group `type`, or an empty pointer if no such group is registered.
AuSPtr<GroupState> ThreadPool::GetGroup(ThreadGroup_t type)
{
    AU_LOCK_GUARD(this->rwlock_->AsReadable());

    AuSPtr<GroupState> *ppGroup {};
    if (AuTryFind(this->threads_, type, ppGroup))
    {
        return *ppGroup;
    }

    return {};
}
// Returns the calling thread's worker state in this pool. Returns an empty pointer when:
//   - the thread is not bound to a live pool;
//   - (internal/debug builds) it is bound to a different pool, which is reported as an error;
//   - its group or worker entry does not exist.
AuSPtr<ThreadState> ThreadPool::GetThreadState()
{
    AU_LOCK_GUARD(this->rwlock_->AsReadable());

    auto thread = gCurrentPool.lock();
    if (!thread)
    {
        return {};
    }

#if defined(AU_CFG_ID_INTERNAL) || defined(AU_CFG_ID_DEBUG)
    if (thread.get() != this)
    {
        SysPushErrorGeneric("wrong thread");
        return {};
    }
#endif

    auto worker = *tlsWorkerId;

    auto state = GetGroup(worker.first);
    if (!state)
    {
        return {};
    }

    // Pure lookup. `workers[key]` would insert an empty entry for an unknown worker. That
    // mutates the map under a *read* lock, and the null entry later trips code that iterates
    // the workers or checks for duplicates (Spawn).
    AuSPtr<ThreadState> *ppState {};
    if (!AuTryFind(state->workers, worker.second, ppState))
    {
        return {};
    }

    return *ppState;
}
// Returns the calling thread's worker state in this pool, or an empty pointer. Unlike the
// warning variant, a thread bound to a different pool is silently rejected, in every build
// configuration.
AuSPtr<ThreadState> ThreadPool::GetThreadStateNoWarn()
{
    AU_LOCK_GUARD(this->rwlock_->AsReadable());

    auto thread = gCurrentPool.lock();
    if (!thread)
    {
        return {};
    }

    if (thread.get() != this)
    {
        return {};
    }

    auto worker = *tlsWorkerId;

    auto state = GetGroup(worker.first);
    if (!state)
    {
        return {};
    }

    // Pure lookup: `workers[key]` would insert an empty entry for an unknown worker,
    // mutating the map while only a read lock is held
    AuSPtr<ThreadState> *ppState {};
    if (!AuTryFind(state->workers, worker.second, ppState))
    {
        return {};
    }

    return *ppState;
}
// Returns the state of worker `id`, or an empty pointer if its group or the worker is unknown.
AuSPtr<ThreadState> ThreadPool::GetThreadHandle(WorkerId_t id)
{
    AU_LOCK_GUARD(this->rwlock_->AsReadable());

    auto pGroup = GetGroup(id.first);

    AuSPtr<ThreadState> *ppWorker {};
    if (pGroup && AuTryFind(pGroup->workers, id.second, ppWorker))
    {
        return *ppWorker;
    }

    return {};
}
// Returns the state of worker `id`, or the states of every worker in its group when
// `id.second` is kThreadIdAny. An unknown group or worker yields an empty list.
AuList<AuSPtr<ThreadState>> ThreadPool::GetThreadHandles(WorkerId_t id)
{
    AU_LOCK_GUARD(this->rwlock_->AsReadable());

    auto pGroup = GetGroup(id.first);
    if (!pGroup)
    {
        return {};
    }

    AuList<AuSPtr<ThreadState>> handles;

    if (id.second == Async::kThreadIdAny)
    {
        for (const auto &entry : pGroup->workers)
        {
            handles.push_back(entry.second);
        }

        return handles;
    }

    AuSPtr<ThreadState> *ppWorker {};
    if (!AuTryFind(pGroup->workers, id.second, ppWorker))
    {
        return {};
    }

    handles.push_back(*ppWorker);
    return handles;
}
// Public factory for a standalone thread pool. The shared scheduler is started here, so its
// cost is only paid by apps that actually create a pool.
AUKN_SYM AuSPtr<IThreadPool> NewThreadPool()
{
    // apps that don't require async shouldn't be burdened with the overhead of this litl spiner
    StartSched();

    auto pPool = AuMakeShared<ThreadPool>();
    return pPool;
}
}