/*** Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved. File: ThreadPool.cpp Date: 2021-10-30 Author: Reece ***/ #include #include "Async.hpp" #include "ThreadPool.hpp" #include "WorkItem.hpp" #include "Schedular.hpp" #include "ThreadWorkerQueueShim.hpp" namespace Aurora::Async { //STATIC_TLS(WorkerId_t, tlsWorkerId); static thread_local AuWPtr gCurrentPool; static const auto kMagicResortThreshold = 15; AUKN_SYM WorkerPId_t GetCurrentWorkerPId() { auto lkPool = gCurrentPool.lock(); auto cpy = *lkPool->tlsWorkerId; auto lkPool2 = cpy.pool.lock(); return WorkerPId_t(lkPool, cpy); } // ThreadPool::ThreadPool() { this->rwlock_ = AuThreadPrimitives::RWLockUnique(); SysAssert(static_cast(this->rwlock_), "Couldn't initialize ThreadPool. Unable to allocate an RWLock"); } // internal pool interface bool ThreadPool::WaitFor(WorkerId_t unlocker, const AuSPtr &primitive, AuUInt32 timeoutMs) { auto curThread = GetThreadState(); bool workerIdMatches = (unlocker.second == curThread->id.second) || ((unlocker.second == Async::kThreadIdAny) && (GetThreadWorkersCount(unlocker.first) == 1)); if ((unlocker.first == curThread->id.first) && // work group matches (workerIdMatches)) // well, crap { bool queryAsync = false; while (!(queryAsync ? primitive->TryLock() : Threading::WaitFor(primitive.get(), 2))) { queryAsync = CtxYield(); } return true; } else { return Threading::WaitFor(primitive.get(), timeoutMs); } } void ThreadPool::Run(WorkerId_t target, AuSPtr runnable) { auto state = GetGroup(target.first); SysAssert(static_cast(state), "couldn't dispatch a task to an offline group"); IncrementTasksRunning(); { AU_LOCK_GUARD(state->cvWorkMutex); #if defined(STAGING) || defined(DEBUG) AU_LOCK_GUARD(rwlock_->AsReadable()); if (target.second != Async::kThreadIdAny) { auto itr = state->workers.find(target.second); if ((itr == state->workers.end()) || (itr->second->rejecting)) { SysPushErrorGen("worker: {}:{} is offline", target.first, target.second); DecrementTasksRunning(); #if 0 throw "Requested job worker is offline"; #else runnable->CancelAsync(); return; #endif } } else { auto workers = state->workers; bool found = false; for (const auto &worker : state->workers) { if (!worker.second->rejecting) { found = true; break; } } if (!found) { DecrementTasksRunning(); #if 0 throw "No workers available"; #else runnable->CancelAsync(); return; #endif } } #endif if (!AuTryInsert(state->workQueue, AuMakePair(target.second, runnable))) { runnable->CancelAsync(); return; } state->dirty++; if (state->dirty > kMagicResortThreshold) { state->dirty = 0; state->sorted = false; } state->eventLs->Set(); } if (target.second == Async::kThreadIdAny) { state->cvVariable->Signal(); } else { // sad :( // TODO: when we have wait any, add support (^ the trigger) for it here state->cvVariable->Broadcast(); } } IThreadPool *ThreadPool::ToThreadPool() { return this; } void ThreadPool::IncrementTasksRunning() { this->tasksRunning_++; } void ThreadPool::DecrementTasksRunning() { if ((--this->tasksRunning_) == 0) { if (InRunnerMode()) { Shutdown(); } } } // ithreadpool size_t ThreadPool::GetThreadWorkersCount(ThreadGroup_t group) { AU_LOCK_GUARD(this->rwlock_->AsReadable()); return GetGroup(group)->workers.size(); } void ThreadPool::SetRunningMode(bool eventRunning) { this->runnersRunning_ = eventRunning; } bool ThreadPool::Spawn(WorkerId_t workerId) { return Spawn(workerId, false); } bool ThreadPool::Create(WorkerId_t workerId) { return Spawn(workerId, true); } bool ThreadPool::InRunnerMode() { return this->runnersRunning_; } bool ThreadPool::Poll() { return InternalRunOne(false); } bool ThreadPool::RunOnce() { return InternalRunOne(true); } bool ThreadPool::Run() { bool ranOnce {}; auto auThread = AuThreads::GetThread(); auto job = GetThreadState(); while ((!auThread->Exiting()) && (!job->shuttingdown)) { // Do work (blocking) InternalRunOne(true); ranOnce = true; } return ranOnce; } bool ThreadPool::InternalRunOne(bool block) { auto state = GetThreadState(); bool success {}; auto runMode = GetCurrentThreadRunMode(); do { auto asyncLoop = state->asyncLoop; asyncLoop->OnFrame(); if (asyncLoop->GetSourceCount() > 1) { bool bShouldTrySleepForKernel {}; if (runMode == ERunMode::eLowLatencyFreqKernel) { if (state->rateLimiter.CheckExchangePass()) { bShouldTrySleepForKernel = asyncLoop->IsSignaled(); } else { if (!PollInternal(false)) { AuThreading::ContextYield(); } else { success = true; } } } else if (runMode == ERunMode::eLowLatencyYield) { AuThreading::ContextYield(); block = false; bShouldTrySleepForKernel = asyncLoop->IsSignaled(); } else if (runMode == ERunMode::eEfficient) { bShouldTrySleepForKernel = block; if (!block) { bShouldTrySleepForKernel = asyncLoop->IsSignaled(); } } if (bShouldTrySleepForKernel && asyncLoop->WaitAny(0)) { PollInternal(block); success = true; } else { success |= PollInternal(block); } } else { success = PollInternal(block); } } while (success); return success; } bool ThreadPool::PollInternal(bool block) { auto state = GetThreadState(); auto group = state->parent.lock(); //state->pendingWorkItems.clear(); auto magic = CtxPollPush(); { AU_LOCK_GUARD(group->cvWorkMutex); // TODO: reimplement this // this is stupid and gross if (group->workQueue.size() > group->workers.size()*3) { if (!group->sorted) { auto cpy = group->workQueue; std::sort(group->workQueue.begin(), group->workQueue.end(), [&](const WorkEntry_t &a, const WorkEntry_t &b) { if (a.second->GetPrio() != b.second->GetPrio()) return a.second->GetPrio() > b.second->GetPrio(); AuUInt32 ia {}, ib {}; for (; ia < cpy.size(); ia++) if (cpy[ia].second == a.second) break; for (; ib < cpy.size(); ib++) if (cpy[ib].second == b.second) break; return ia < ib; }); group->sorted = true; group->dirty = 0; } } do { // Deque tasks the current thread runner could dipatch // Noting that `multipopCount` determines how aggressive threads are in dequeuing work // It's probable `multipopCount` will equal 1 for your use case // // Only increment when you know tasks within a group queue will not depend on one another // *and* tasks require a small amount of execution time // // This could be potentially useful for an event dispatcher whereby you're dispatching // hundreds of items per second, across a thread or two, knowing dequeuing one instead of all // is a waste of CPU cycles. // // Remember, incrementing `multipopCount` is potentially dangerous the second you have local // thread group waits for (auto itr = group->workQueue.begin(); ((itr != group->workQueue.end()) && (state->pendingWorkItems.size() < state->multipopCount)); ) { // TODO: catch low memory condition if (itr->first == Async::kThreadIdAny) { state->pendingWorkItems.push_back(*itr); itr = group->workQueue.erase(itr); continue; } if ((itr->first != Async::kThreadIdAny) && (itr->first == state->id.second)) { state->pendingWorkItems.push_back(*itr); itr = group->workQueue.erase(itr); continue; } itr++; } // Consider blocking for more work if (!block) { break; } // pre-wakeup thread terminating check if (state->threadObject->Exiting() || state->shuttingdown) { break; } // Block if no work items are present if (state->pendingWorkItems.empty()) { group->cvVariable->WaitForSignal(); } // Post-wakeup thread terminating check if (state->threadObject->Exiting() || state->shuttingdown) { break; } } while (state->pendingWorkItems.empty()); if (group->workQueue.empty()) { group->eventLs->Reset(); } } if (state->pendingWorkItems.empty()) { CtxPollReturn(state, magic, false); return false; } int runningTasks {}; auto oldTlsHandle = AuExchange(gCurrentPool, AuSharedFromThis()); bool lowPrioCont {}; bool lowPrioContCached {}; for (auto itr = state->pendingWorkItems.begin(); itr != state->pendingWorkItems.end(); ) { if (state->threadObject->Exiting() || state->shuttingdown) { break; } // Set the last frame time for a watchdog later down the line state->lastFrameTime = Time::CurrentClockMS(); if (itr->second->GetPrio() < 0.25) { group->sorted = false; if (lowPrioCont) continue; if (!lowPrioContCached) { AU_LOCK_GUARD(group->cvWorkMutex); { for (const auto &[pendingWorkA, pendingWorkB] : group->workQueue) { if (pendingWorkB->GetPrio() > .5) { lowPrioCont = true; break; } } } lowPrioContCached = true; if (lowPrioCont) continue; } } // Dispatch itr->second->RunAsync(); // Remove from our local job queue itr = state->pendingWorkItems.erase(itr); // Atomically decrement global task counter runningTasks = this->tasksRunning_.fetch_sub(1) - 1; } gCurrentPool = oldTlsHandle; // Return popped work back to the groups work pool when our -pump loops were preempted if (state->pendingWorkItems.size()) { AU_LOCK_GUARD(group->cvWorkMutex); // TODO: low memory condition slow path group->workQueue.insert(group->workQueue.end(), state->pendingWorkItems.begin(), state->pendingWorkItems.end()); group->eventLs->Set(); state->pendingWorkItems.clear(); } CtxPollReturn(state, magic, true); if (InRunnerMode()) { if (runningTasks == 0) { Shutdown(); } } return true; } void ThreadPool::Shutdown() { // Nested shutdowns can happen; prevent a write lock { AU_LOCK_GUARD(this->rwlock_->AsReadable()); if (this->shuttingdown_) { return; } } // Set shutdown flag { AU_LOCK_GUARD(this->rwlock_->AsWritable()); if (AuExchange(this->shuttingdown_, true)) { return; } } // Noting // 1) that StopSched may lockup under a writable lock // -> we will terminate a thread that may be dispatching a sys pump event // 2) that barrier doesn't need to be under a write lock // // Perform the following shutdown of the schedular and other available threads under a read lock { AU_LOCK_GUARD(this->rwlock_->AsReadable()); StopSched(); for (auto &[groupId, group] : this->threads_) { for (auto &[id, worker] : group->workers) { Barrier(worker->id, 0, false, true); } } } // Finally set the shutdown flag on all of our thread contexts // then release them from the runners/workers list // then release all group contexts AuList threads; { AU_LOCK_GUARD(this->rwlock_->AsWritable()); for (auto &[groupId, group] : this->threads_) { for (auto &[id, worker] : group->workers) { if (group->cvWorkMutex && group->cvVariable) { AU_LOCK_GUARD(group->cvWorkMutex); worker->shuttingdown = true; group->cvVariable->Broadcast(); } else { worker->shuttingdown = true; } if (groupId != 0) { worker->threadObject->SendExitSignal(); threads.push_back(worker->threadObject); } auto &event = worker->running; if (event) { event->Set(); } } } } // Sync to shutdown threads to prevent a race condition whereby the async subsystem shuts down before the threads for (const auto &thread : threads) { thread->Exit(); } } bool ThreadPool::Exiting() { return this->shuttingdown_ || GetThreadState()->exiting; } AuSPtr ThreadPool::NewWorkItem(const WorkerId_t &worker, const AuSPtr &task, bool supportsBlocking) { if (!task) { return {}; } return AuMakeShared(this, worker, task, supportsBlocking); } AuSPtr ThreadPool::NewFence() { return AuMakeShared(this, WorkerId_t{}, AuSPtr{}, true); } AuThreads::ThreadShared_t ThreadPool::ResolveHandle(WorkerId_t id) { return GetThreadHandle(id)->threadObject; } AuBST> ThreadPool::GetThreads() { AU_LOCK_GUARD(rwlock_->AsReadable()); AuBST> ret; for (const auto &group : this->threads_) { AuList workers; for (const auto &thread : group.second->workers) { workers.push_back(thread.second->id.second); } ret[group.first] = workers; } return ret; } WorkerId_t ThreadPool::GetCurrentThread() { return tlsWorkerId; } bool ThreadPool::Sync(WorkerId_t workerId, AuUInt32 timeoutMs, bool requireSignal) { AU_LOCK_GUARD(this->rwlock_->AsReadable()); auto group = GetGroup(workerId.first); auto currentWorkerId = GetCurrentThread().second; if (workerId.second == Async::kThreadIdAny) { for (auto &jobWorker : group->workers) { if (!Barrier(jobWorker.second->id, timeoutMs, requireSignal && jobWorker.second->id.second != currentWorkerId, false)) // BAD!, should subtract time elapsed, clamp to, i dunno, 5ms min? { return false; } } } else { return Barrier(workerId, timeoutMs, requireSignal && workerId.second != currentWorkerId, false); } return true; } void ThreadPool::Signal(WorkerId_t workerId) { AU_LOCK_GUARD(this->rwlock_->AsReadable()); auto group = GetGroup(workerId.first); if (workerId.second == Async::kThreadIdAny) { for (auto &jobWorker : group->workers) { jobWorker.second->running->Set(); } } else { GetThreadHandle(workerId)->running->Set(); } } AuSPtr ThreadPool::WorkerToLoopSource(WorkerId_t workerId) { AU_LOCK_GUARD(this->rwlock_->AsReadable()); auto a = GetThreadHandle(workerId); if (!a) { return {}; } return a->parent.lock()->asyncLoopSourceShared; } void ThreadPool::SyncAllSafe() { AU_LOCK_GUARD(this->rwlock_->AsReadable()); for (const auto &re : this->threads_) { for (auto &jobWorker : re.second->workers) { SysAssert(Barrier(jobWorker.second->id, 0, false, false)); } } } void ThreadPool::AddFeature(WorkerId_t id, AuSPtr feature, bool async) { auto work = AuMakeShared(([=]() { GetThreadState()->features.push_back(feature); feature->Init(); })); auto workItem = this->NewWorkItem(id, work, !async)->Dispatch(); if (!async) { workItem->BlockUntilComplete(); } } void ThreadPool::AssertInThreadGroup(ThreadGroup_t group) { SysAssert(static_cast(tlsWorkerId).first == group); } void ThreadPool::AssertWorker(WorkerId_t id) { SysAssert(static_cast(tlsWorkerId) == id); } AuSPtr ThreadPool::ToKernelWorkQueue() { return this->GetThreadState()->asyncLoop; } AuSPtr ThreadPool::ToKernelWorkQueue(WorkerId_t workerId) { auto worker = this->GetThreadHandle(workerId); if (!worker) { SysPushErrorGen("Couldn't find requested worker"); return {}; } return worker->asyncLoop; } void ThreadPool::UpdateWorkMode(WorkerId_t workerId, RunMode mode) { auto states = this->GetThreadHandles(workerId); if (!states.size()) { SysPushErrorGen("Couldn't find requested worker"); return; } for (const auto &state : states) { state->runMode = mode.mode; if (mode.freqMsTick) { state->rateLimiter.SetNextStep(mode.freqMsTick * 1'000'000); } } } ERunMode ThreadPool::GetCurrentThreadRunMode() { auto state = this->GetThreadState(); if (!state) { return ERunMode::eEfficient; } return state->runMode; } ERunMode ThreadPool::GetThreadRunMode(WorkerId_t workerId) { auto worker = this->GetThreadHandle(workerId); if (!worker) { SysPushErrorGen("Couldn't find requested worker"); return {}; } return worker->runMode; } // Unimplemented fiber hooks, 'twas used for science int ThreadPool::CtxPollPush() { // TOOD (Reece): implement a context switching library // Refer to the old implementation of this on pastebin return 0; } void ThreadPool::CtxPollReturn(const AuSPtr &state, int status, bool hitTask) { } bool ThreadPool::CtxYield() { bool ranAtLeastOne = false; while (this->InternalRunOne(false)) { ranAtLeastOne = true; } return ranAtLeastOne; } // internal api bool ThreadPool::Spawn(WorkerId_t workerId, bool create) { AU_LOCK_GUARD(rwlock_->AsWritable()); if (GetCurrentWorkerPId().pool && create) { SysPushErrorGeneric("TODO (reece): add support for multiple runners per thread"); return {}; } AuSPtr group; // Try fetch or allocate group { AuSPtr* groupPtr; if (!AuTryFind(this->threads_, workerId.first, groupPtr)) { group = AuMakeShared(); if (!group->Init()) { SysPushErrorMem("Not enough memory to intiialize a new group state"); return false; } if (!AuTryInsert(this->threads_, AuMakePair(workerId.first, group))) { return false; } } else { group = *groupPtr; } } // Assert worker does not already exist { AuSPtr* ret; if (AuTryFind(group->workers, workerId.second, ret)) { SysPushErrorGen("Thread ID already exists"); return false; } } auto threadState = AuMakeShared(); threadState->parent = group; threadState->running = AuThreadPrimitives::EventUnique(true, false, true); threadState->syncSema = AuThreadPrimitives::SemaphoreUnique(0); threadState->id = workerId; threadState->asyncLoop = AuMakeShared(); threadState->rateLimiter.SetNextStep(1'000'000); // 1MS in nanoseconds if (!threadState->asyncLoop) { SysPushErrorMem(); return {}; } if (!threadState->syncSema) { SysPushErrorMem(); return {}; } if (!threadState->syncSema) { SysPushErrorMem(); return {}; } threadState->asyncLoop->SourceAdd(group->eventLs); if (!create) { threadState->threadObject = AuThreads::ThreadShared(AuThreads::ThreadInfo( AuMakeShared(AuThreads::IThreadVectorsFunctional::OnEntry_t(std::bind(&ThreadPool::Entrypoint, this, threadState->id)), AuThreads::IThreadVectorsFunctional::OnExit_t{}), gRuntimeConfig.async.threadPoolDefaultStackSize )); if (!threadState->threadObject) { return {}; } threadState->threadObject->Run(); } else { threadState->threadObject = AuSPtr(AuThreads::GetThread(), [](AuThreads::IAuroraThread *){}); // TODO: this is just a hack // we should implement this properly threadState->threadObject->AddLastHopeTlsHook(AuMakeShared([]() -> void { }, []() -> void { auto pid = GetCurrentWorkerPId(); if (pid.pool) { AuStaticPointerCast(pid.pool)->ThisExiting(); } })); // gCurrentPool = AuWeakFromThis(); tlsWorkerId = WorkerPId_t(AuSharedFromThis(), workerId); } group->workers.insert(AuMakePair(workerId.second, threadState)); return true; } // private api bool ThreadPool::Barrier(WorkerId_t workerId, AuUInt32 ms, bool requireSignal, bool drop) { // TODO: barrier multiple auto &semaphore = GetThreadState()->syncSema; auto unsafeSemaphore = semaphore.get(); bool failed {}; auto work = AuMakeShared( [=]() { auto state = GetThreadState(); if (drop) { state->rejecting = true; } if (requireSignal) { state->running->Reset(); } unsafeSemaphore->Unlock(1); if (requireSignal) { state->running->Lock(); } }, [&]() { unsafeSemaphore->Unlock(1); failed = true; } ); if (!work) { return false; } Run(workerId, work); return WaitFor(workerId, AuUnsafeRaiiToShared(semaphore), ms) && !failed; } void ThreadPool::Entrypoint(WorkerId_t id) { gCurrentPool = AuWeakFromThis(); tlsWorkerId = WorkerPId_t(AuSharedFromThis(), id); auto job = GetThreadState(); Run(); if (id != WorkerId_t {0, 0}) { AU_LOCK_GUARD(this->rwlock_->AsReadable()); if (!this->shuttingdown_ && !job->rejecting) { // Pump and barrier + reject all after atomically Barrier(id, 0, false, true); } } ThisExiting(); if (id == WorkerId_t {0, 0}) { CleanWorkerPoolReservedZeroFree(); } } void ThreadPool::ThisExiting() { auto id = GetCurrentThread(); auto state = GetGroup(id.first); { AU_LOCK_GUARD(this->rwlock_->AsWritable()); auto itr = state->workers.find(id.second); auto &jobWorker = itr->second; CleanUpWorker(id); // Abort scheduled tasks TerminateSceduledTasks(this, id); // Clean up thread features // -> transferable TLS handles // -> thread specific vms // -> anything your brain wishes to imagination for (const auto &thread : jobWorker->features) { try { thread->Cleanup(); } catch (...) { AuLogWarn("Couldn't clean up thread feature!"); Debug::PrintError(); } } jobWorker->features.clear(); state->workers.erase(itr); } } AuSPtr ThreadPool::GetGroup(ThreadGroup_t type) { AU_LOCK_GUARD(this->rwlock_->AsReadable()); AuSPtr* ret; if (!AuTryFind(this->threads_, type, ret)) { return {}; } return *ret; } AuSPtr ThreadPool::GetThreadState() { AU_LOCK_GUARD(this->rwlock_->AsReadable()); auto id = GetCurrentThread(); auto state = GetGroup(id.first); return state->workers[id.second]; } AuSPtr ThreadPool::GetThreadHandle(WorkerId_t id) { AU_LOCK_GUARD(this->rwlock_->AsReadable()); auto group = GetGroup(id.first); if (!group) { return {}; } AuSPtr *ret; if (!AuTryFind(group->workers, id.second, ret)) { return {}; } return *ret; } AuList> ThreadPool::GetThreadHandles(WorkerId_t id) { AU_LOCK_GUARD(this->rwlock_->AsReadable()); auto group = GetGroup(id.first); if (!group) { return {}; } AuList> ret; if (id.second != Async::kThreadIdAny) { AuSPtr *ptr; if (!AuTryFind(group->workers, id.second, ptr)) { return {}; } ret.push_back(*ptr); } else { for (const auto &[key, value] : group->workers) { ret.push_back(value); } } return ret; } AUKN_SYM AuSPtr NewThreadPool() { // apps that don't require async shouldn't be burdened with the overhead of this litl spiner StartSched(); return AuMakeShared(); } }