1229 lines
34 KiB
C++
1229 lines
34 KiB
C++
/***
|
|
Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
|
|
|
File: ThreadPool.cpp
|
|
Date: 2021-10-30
|
|
Author: Reece
|
|
***/
|
|
#include <Source/RuntimeInternal.hpp>
|
|
#include "Async.hpp"
|
|
#include "ThreadPool.hpp"
|
|
#include "AsyncApp.hpp"
|
|
#include "WorkItem.hpp"
|
|
#include "Schedular.hpp"
|
|
#include "ThreadWorkerQueueShim.hpp"
|
|
|
|
namespace Aurora::Async
|
|
{
|
|
//STATIC_TLS(WorkerId_t, tlsWorkerId);
|
|
static thread_local AuWPtr<ThreadPool> gCurrentPool;
|
|
static const auto kMagicResortThreshold = 15;
|
|
|
|
static thread_local int tlsCallStack;
|
|
inline auto GetWorkerInternal(const AuSPtr<IThreadPool> &pool)
|
|
{
|
|
if (pool.get() == AuStaticCast<IAsyncApp>(gAsyncApp))
|
|
{
|
|
return AuUnsafeRaiiToShared(AuStaticCast<ThreadPool>(gAsyncApp));
|
|
}
|
|
|
|
return AuStaticPointerCast<ThreadPool>(pool);
|
|
}
|
|
|
|
// Returns the pool-qualified worker id of the calling thread.
// Yields a default-constructed WorkerPId_t when the thread is not bound to a live pool.
AUKN_SYM WorkerPId_t GetCurrentWorkerPId()
{
    auto lkPool = gCurrentPool.lock();
    if (!lkPool)
    {
        return {};
    }

    // Snapshot the thread-local id stored on the pool. The previous extra
    // `cpy.pool.lock()` was never read and has been dropped.
    auto cpy = *lkPool->tlsWorkerId;
    return WorkerPId_t(lkPool, cpy);
}
|
|
|
|
//
|
|
|
|
// Allocates the reader/writer lock used throughout this class; asserts if that allocation fails.
ThreadPool::ThreadPool()
{
    this->rwlock_ = AuThreadPrimitives::RWLockUnique();

    SysAssert(static_cast<bool>(this->rwlock_), "Couldn't initialize ThreadPool. Unable to allocate an RWLock");
}
|
|
|
|
// internal pool interface
|
|
|
|
// Waits on `primitive`, which is expected to be released by the worker `unlocker`.
//
// When the caller is itself the worker that must release the primitive (same group and
// same thread id, or "any" thread in a group with exactly one worker), a plain wait could
// never complete. In that case the caller keeps pumping its own queue between short waits.
// Note that `timeoutMs` is not consulted on that path and the function only returns true.
//
// In every other case this is a plain Threading::WaitFor with `timeoutMs`.
bool ThreadPool::WaitFor(WorkerId_t unlocker, const AuSPtr<Threading::IWaitable> &primitive, AuUInt32 timeoutMs)
{
    auto self = GetThreadState();
    if (!self)
    {
        return Threading::WaitFor(primitive.get(), timeoutMs);
    }

    const bool bWorkerMatches = (unlocker.second == self->id.second) ||
                                ((unlocker.second == Async::kThreadIdAny) && (GetThreadWorkersCount(unlocker.first) == 1));
    const bool bGroupMatches  = (unlocker.first == self->id.first);

    if (!(bGroupMatches && bWorkerMatches))
    {
        return Threading::WaitFor(primitive.get(), timeoutMs);
    }

    // well, crap: we are waiting on ourselves
    bool bRanWork = false;
    for (;;)
    {
        // After a yield that ran work, only try the lock; otherwise wait for up to 2ms
        const bool bAcquired = bRanWork ? primitive->TryLock()
                                        : Threading::WaitFor(primitive.get(), 2);
        if (bAcquired)
        {
            break;
        }

        bRanWork = CtxYield();
    }

    return true;
}
|
|
|
|
// Enqueues `runnable` on the work queue of the group `target.first`.
//
// target.second selects a specific worker, or Async::kThreadIdAny for any worker of the group.
// The group must exist (asserted). The running-task counter is incremented up front and
// decremented again on every rejection path, where the runnable is cancelled via CancelAsync().
//
// In STAGING/DEBUG builds the target worker (or at least one non-rejecting worker for
// kThreadIdAny) must be online, otherwise the runnable is cancelled.
void ThreadPool::Run(WorkerId_t target, AuSPtr<IAsyncRunnable> runnable)
{
    auto state = GetGroup(target.first);
    SysAssert(static_cast<bool>(state), "couldn't dispatch a task to an offline group");

    IncrementTasksRunning();

    {
        AU_LOCK_GUARD(state->cvWorkMutex);

    #if defined(STAGING) || defined(DEBUG)
        AU_LOCK_GUARD(rwlock_->AsReadable());

        if (target.second != Async::kThreadIdAny)
        {
            auto itr = state->workers.find(target.second);
            if ((itr == state->workers.end()) || (itr->second->rejecting))
            {
                SysPushErrorGen("worker: {}:{} is offline", target.first, target.second);
                DecrementTasksRunning();
            #if 0
                throw "Requested job worker is offline";
            #else
                runnable->CancelAsync();
                return;
            #endif
            }
        }
        else
        {
            // The worker table is scanned in place; the previous unused full copy
            // of the table (made while holding both locks) has been removed.
            bool found = false;

            for (const auto &worker : state->workers)
            {
                if (!worker.second->rejecting)
                {
                    found = true;
                    break;
                }
            }

            if (!found)
            {
                DecrementTasksRunning();
            #if 0
                throw "No workers available";
            #else
                runnable->CancelAsync();
                return;
            #endif
            }
        }
    #endif

        if (!AuTryInsert(state->workQueue, AuMakePair(target.second, runnable)))
        {
            DecrementTasksRunning();
            runnable->CancelAsync();
            return;
        }

        // Every kMagicResortThreshold+1 enqueues, flag the queue for a resort in PollInternal
        state->dirty++;
        if (state->dirty > kMagicResortThreshold)
        {
            state->dirty = 0;
            state->sorted = false;
        }

        state->eventLs->Set();
    }

    if (target.second == Async::kThreadIdAny)
    {
        state->cvVariable->Signal();
    }
    else
    {
        // sad :(
        // TODO: when we have wait any, add support (^ the trigger) for it here
        state->cvVariable->Broadcast();
    }
}
|
|
|
|
// Returns this object through its IThreadPool interface.
IThreadPool *ThreadPool::ToThreadPool()
{
    IThreadPool *pInterface = this;
    return pInterface;
}
|
|
|
|
void ThreadPool::IncrementTasksRunning()
|
|
{
|
|
this->tasksRunning_++;
|
|
}
|
|
|
|
void ThreadPool::DecrementTasksRunning()
|
|
{
|
|
if ((--this->tasksRunning_) == 0)
|
|
{
|
|
if (InRunnerMode())
|
|
{
|
|
Shutdown();
|
|
}
|
|
}
|
|
}
|
|
|
|
// ithreadpool
|
|
|
|
// Returns the number of workers registered under `group`.
// An unknown group reports zero workers instead of dereferencing a null group state.
size_t ThreadPool::GetThreadWorkersCount(ThreadGroup_t group)
{
    AU_LOCK_GUARD(this->rwlock_->AsReadable());

    auto state = GetGroup(group);
    if (!state)
    {
        return 0;
    }

    return state->workers.size();
}
|
|
|
|
void ThreadPool::SetRunningMode(bool eventRunning)
|
|
{
|
|
this->runnersRunning_ = eventRunning;
|
|
}
|
|
|
|
// Registers `workerId` backed by a newly started thread object
// (forwards to the two-argument overload with create = false).
bool ThreadPool::Spawn(WorkerId_t workerId)
{
    constexpr bool kAdoptCallingThread = false;
    return Spawn(workerId, kAdoptCallingThread);
}
|
|
|
|
// Registers the calling thread itself as worker `workerId`
// (forwards to the two-argument Spawn overload with create = true).
bool ThreadPool::Create(WorkerId_t workerId)
{
    constexpr bool kAdoptCallingThread = true;
    return Spawn(workerId, kAdoptCallingThread);
}
|
|
|
|
bool ThreadPool::InRunnerMode()
|
|
{
|
|
return this->runnersRunning_;
|
|
}
|
|
|
|
bool ThreadPool::Poll()
|
|
{
|
|
return InternalRunOne(false);
|
|
}
|
|
|
|
bool ThreadPool::RunOnce()
|
|
{
|
|
return InternalRunOne(true);
|
|
}
|
|
|
|
bool ThreadPool::Run()
|
|
{
|
|
bool ranOnce {};
|
|
|
|
auto auThread = AuThreads::GetThread();
|
|
auto job = GetThreadState();
|
|
|
|
if (!job)
|
|
{
|
|
SysPushErrorUninitialized("Not an async thread");
|
|
}
|
|
|
|
while ((!auThread->Exiting()) && (!job->shuttingdown))
|
|
{
|
|
// Do work (blocking)
|
|
InternalRunOne(true);
|
|
ranOnce = true;
|
|
}
|
|
|
|
return ranOnce;
|
|
}
|
|
|
|
bool ThreadPool::InternalRunOne(bool block)
|
|
{
|
|
auto state = GetThreadState();
|
|
if (!state)
|
|
{
|
|
SysPushErrorUninitialized("Not an async thread");
|
|
}
|
|
|
|
bool success {};
|
|
auto runMode = GetCurrentThreadRunMode();
|
|
|
|
//do
|
|
{
|
|
auto asyncLoop = state->asyncLoop;
|
|
|
|
asyncLoop->OnFrame();
|
|
|
|
if (asyncLoop->GetSourceCount() > 1)
|
|
{
|
|
bool bShouldTrySleepForKernel {};
|
|
|
|
if (runMode == ERunMode::eLowLatencyFreqKernel)
|
|
{
|
|
if (state->rateLimiter.CheckExchangePass())
|
|
{
|
|
#if defined(AURORA_PLATFORM_WIN32)
|
|
bShouldTrySleepForKernel = asyncLoop->PumpNonblocking();
|
|
#else
|
|
bShouldTrySleepForKernel = asyncLoop->IsSignaledPeek();
|
|
#endif
|
|
}
|
|
else
|
|
{
|
|
if (!PollInternal(false))
|
|
{
|
|
AuThreading::ContextYield();
|
|
}
|
|
else
|
|
{
|
|
success = true;
|
|
}
|
|
}
|
|
}
|
|
else if (runMode == ERunMode::eLowLatencyYield)
|
|
{
|
|
AuThreading::ContextYield();
|
|
block = false;
|
|
#if defined(AURORA_PLATFORM_WIN32)
|
|
bShouldTrySleepForKernel = asyncLoop->PumpNonblocking();
|
|
#else
|
|
bShouldTrySleepForKernel = asyncLoop->IsSignaledPeek();
|
|
#endif
|
|
}
|
|
else if (runMode == ERunMode::eEfficient)
|
|
{
|
|
bShouldTrySleepForKernel = block;
|
|
if (!block)
|
|
{
|
|
bShouldTrySleepForKernel = asyncLoop->IsSignaledPeek();
|
|
}
|
|
}
|
|
|
|
if (bShouldTrySleepForKernel
|
|
// epoll and such like can be checked without read success. kevent works on availablity, not scheduling read like iosubmit
|
|
// allow windows to atomically pump instead of wasting time buffering the primitives state
|
|
&& ((AuBuild::kIsNtDerived && runMode == ERunMode::eEfficient) ||
|
|
(!AuBuild::kIsNtDerived))
|
|
|
|
&& (asyncLoop->WaitAny(0))
|
|
)
|
|
{
|
|
PollInternal(block);
|
|
success = true;
|
|
}
|
|
else
|
|
{
|
|
success |= PollInternal(block);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
success = PollInternal(block);
|
|
}
|
|
} //while (success);
|
|
|
|
return success;
|
|
}
|
|
|
|
bool ThreadPool::PollInternal(bool block)
|
|
{
|
|
auto state = GetThreadState();
|
|
if (!state)
|
|
{
|
|
SysPushErrorUninitialized("Not an async thread");
|
|
}
|
|
|
|
auto group = state->parent.lock();
|
|
|
|
//state->pendingWorkItems.clear();
|
|
|
|
auto magic = CtxPollPush();
|
|
|
|
{
|
|
AU_LOCK_GUARD(group->cvWorkMutex);
|
|
|
|
// TODO: reimplement this
|
|
// this is stupid and gross
|
|
if (group->workQueue.size() > group->workers.size()*3)
|
|
{
|
|
if (!group->sorted)
|
|
{
|
|
auto cpy = group->workQueue;
|
|
|
|
std::sort(group->workQueue.begin(), group->workQueue.end(), [&](const WorkEntry_t &a, const WorkEntry_t &b)
|
|
{
|
|
if (a.second->GetPrio() != b.second->GetPrio())
|
|
return a.second->GetPrio() > b.second->GetPrio();
|
|
|
|
AuUInt32 ia {}, ib {};
|
|
for (; ia < cpy.size(); ia++)
|
|
if (cpy[ia].second == a.second)
|
|
break;
|
|
|
|
for (; ib < cpy.size(); ib++)
|
|
if (cpy[ib].second == b.second)
|
|
break;
|
|
|
|
return ia < ib;
|
|
});
|
|
|
|
group->sorted = true;
|
|
group->dirty = 0;
|
|
}
|
|
}
|
|
|
|
do
|
|
{
|
|
// Deque tasks the current thread runner could dipatch
|
|
// Noting that `multipopCount` determines how aggressive threads are in dequeuing work
|
|
// It's probable `multipopCount` will equal 1 for your use case
|
|
//
|
|
// Only increment when you know tasks within a group queue will not depend on one another
|
|
// *and* tasks require a small amount of execution time
|
|
//
|
|
// This could be potentially useful for an event dispatcher whereby you're dispatching
|
|
// hundreds of items per second, across a thread or two, knowing dequeuing one instead of all
|
|
// is a waste of CPU cycles.
|
|
//
|
|
// Remember, incrementing `multipopCount` is potentially dangerous the second you have local
|
|
// thread group waits
|
|
for (auto itr = group->workQueue.begin();
|
|
((itr != group->workQueue.end()) &&
|
|
(state->pendingWorkItems.size() < state->multipopCount));
|
|
)
|
|
{
|
|
// TODO: catch low memory condition
|
|
if (itr->first == Async::kThreadIdAny)
|
|
{
|
|
state->pendingWorkItems.push_back(*itr);
|
|
itr = group->workQueue.erase(itr);
|
|
continue;
|
|
}
|
|
|
|
if ((itr->first != Async::kThreadIdAny) && (itr->first == state->id.second))
|
|
{
|
|
state->pendingWorkItems.push_back(*itr);
|
|
itr = group->workQueue.erase(itr);
|
|
continue;
|
|
}
|
|
|
|
itr++;
|
|
}
|
|
|
|
// Consider blocking for more work
|
|
if (!block)
|
|
{
|
|
break;
|
|
}
|
|
|
|
// pre-wakeup thread terminating check
|
|
if (state->threadObject->Exiting() || state->shuttingdown)
|
|
{
|
|
break;
|
|
}
|
|
|
|
// Block if no work items are present
|
|
if (state->pendingWorkItems.empty())
|
|
{
|
|
group->cvVariable->WaitForSignal();
|
|
}
|
|
|
|
// Post-wakeup thread terminating check
|
|
if (state->threadObject->Exiting() || state->shuttingdown)
|
|
{
|
|
break;
|
|
}
|
|
|
|
} while (state->pendingWorkItems.empty());
|
|
|
|
if (group->workQueue.empty())
|
|
{
|
|
group->eventLs->Reset();
|
|
}
|
|
}
|
|
|
|
if (state->pendingWorkItems.empty())
|
|
{
|
|
CtxPollReturn(state, magic, false);
|
|
return false;
|
|
}
|
|
|
|
int runningTasks {};
|
|
|
|
auto oldTlsHandle = AuExchange(gCurrentPool, AuSharedFromThis());
|
|
|
|
bool lowPrioCont {};
|
|
bool lowPrioContCached {};
|
|
|
|
state->cookie++;
|
|
|
|
int start = state->cookie;
|
|
|
|
// Account for
|
|
// while (AuAsync.GetCurrentPool()->runForever());
|
|
// in the first task (or deeper)
|
|
if (InRunnerMode() && tlsCallStack) // are we one call deep?
|
|
{
|
|
auto queue = ToKernelWorkQueue();
|
|
|
|
if ((this->tasksRunning_ == tlsCallStack) &&
|
|
(!queue || queue->GetSourceCount() <= 1))
|
|
{
|
|
return false;
|
|
}
|
|
}
|
|
|
|
//
|
|
|
|
for (auto itr = state->pendingWorkItems.begin(); itr != state->pendingWorkItems.end(); )
|
|
{
|
|
if (state->threadObject->Exiting() || state->shuttingdown)
|
|
{
|
|
break;
|
|
}
|
|
|
|
// Set the last frame time for a watchdog later down the line
|
|
state->lastFrameTime = Time::CurrentClockMS();
|
|
|
|
if (itr->second->GetPrio() < 0.25)
|
|
{
|
|
group->sorted = false;
|
|
|
|
if (lowPrioCont)
|
|
{
|
|
itr++;
|
|
continue;
|
|
}
|
|
|
|
if (!lowPrioContCached)
|
|
{
|
|
AU_LOCK_GUARD(group->cvWorkMutex);
|
|
{
|
|
for (const auto &[pendingWorkA, pendingWorkB] : group->workQueue)
|
|
{
|
|
if (pendingWorkB->GetPrio() > .5)
|
|
{
|
|
lowPrioCont = true;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
lowPrioContCached = true;
|
|
if (lowPrioCont)
|
|
{
|
|
itr++;
|
|
continue;
|
|
}
|
|
}
|
|
}
|
|
|
|
// Dispatch
|
|
auto oops = itr->second;
|
|
|
|
// Remove from our local job queue
|
|
itr = state->pendingWorkItems.erase(itr);
|
|
|
|
tlsCallStack++;
|
|
|
|
// Dispatch
|
|
oops->RunAsync();
|
|
|
|
// Atomically decrement global task counter
|
|
runningTasks = this->tasksRunning_.fetch_sub(1) - 1;
|
|
|
|
tlsCallStack--;
|
|
|
|
if (start != state->cookie)
|
|
{
|
|
start = state->cookie;
|
|
itr = state->pendingWorkItems.begin();
|
|
}
|
|
}
|
|
|
|
gCurrentPool = oldTlsHandle;
|
|
|
|
// Return popped work back to the groups work pool when our -pump loops were preempted
|
|
if (state->pendingWorkItems.size())
|
|
{
|
|
AU_LOCK_GUARD(group->cvWorkMutex);
|
|
// TODO: low memory condition slow path
|
|
group->workQueue.insert(group->workQueue.end(), state->pendingWorkItems.begin(), state->pendingWorkItems.end());
|
|
group->eventLs->Set();
|
|
state->pendingWorkItems.clear();
|
|
}
|
|
|
|
CtxPollReturn(state, magic, true);
|
|
|
|
|
|
// Account for
|
|
// while (AuAsync.GetCurrentPool()->runForever());
|
|
// in the top most task
|
|
if (InRunnerMode())
|
|
{
|
|
auto queue = ToKernelWorkQueue();
|
|
|
|
if ((runningTasks == 0) &&
|
|
(this->tasksRunning_ == 0 ) &&
|
|
(!queue || queue->GetSourceCount() <= 1))
|
|
{
|
|
Shutdown();
|
|
}
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
void ThreadPool::Shutdown()
|
|
{
|
|
// Nested shutdowns can happen; prevent a write lock
|
|
{
|
|
AU_LOCK_GUARD(this->rwlock_->AsReadable());
|
|
if (this->shuttingdown_)
|
|
{
|
|
return;
|
|
}
|
|
}
|
|
|
|
// Set shutdown flag
|
|
{
|
|
AU_LOCK_GUARD(this->rwlock_->AsWritable());
|
|
if (AuExchange(this->shuttingdown_, true))
|
|
{
|
|
return;
|
|
}
|
|
}
|
|
|
|
// Noting
|
|
// 1) that StopSched may lockup under a writable lock
|
|
// -> we will terminate a thread that may be dispatching a sys pump event
|
|
// 2) that barrier doesn't need to be under a write lock
|
|
//
|
|
// Perform the following shutdown of the schedular and other available threads under a read lock
|
|
{
|
|
AU_LOCK_GUARD(this->rwlock_->AsReadable());
|
|
|
|
StopSched();
|
|
|
|
for (auto &[groupId, group] : this->threads_)
|
|
{
|
|
for (auto &[id, worker] : group->workers)
|
|
{
|
|
Barrier(worker->id, 0, false, true);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Finally set the shutdown flag on all of our thread contexts
|
|
// then release them from the runners/workers list
|
|
// then release all group contexts
|
|
AuList<AuThreads::ThreadShared_t> threads;
|
|
{
|
|
AU_LOCK_GUARD(this->rwlock_->AsWritable());
|
|
|
|
for (auto &[groupId, group] : this->threads_)
|
|
{
|
|
for (auto &[id, worker] : group->workers)
|
|
{
|
|
if (group->cvWorkMutex && group->cvVariable)
|
|
{
|
|
AU_LOCK_GUARD(group->cvWorkMutex);
|
|
worker->shuttingdown = true;
|
|
group->cvVariable->Broadcast();
|
|
}
|
|
else
|
|
{
|
|
worker->shuttingdown = true;
|
|
}
|
|
|
|
if (groupId != 0)
|
|
{
|
|
worker->threadObject->SendExitSignal();
|
|
threads.push_back(worker->threadObject);
|
|
}
|
|
|
|
auto &event = worker->running;
|
|
if (event)
|
|
{
|
|
event->Set();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Sync to shutdown threads to prevent a race condition whereby the async subsystem shuts down before the threads
|
|
for (const auto &thread : threads)
|
|
{
|
|
thread->Exit();
|
|
}
|
|
}
|
|
|
|
bool ThreadPool::Exiting()
|
|
{
|
|
return this->shuttingdown_ || GetThreadState()->exiting;
|
|
}
|
|
|
|
// Creates a work item bound to this pool that runs `task` on `worker`.
// Returns an empty pointer when `task` is null.
AuSPtr<IWorkItem> ThreadPool::NewWorkItem(const WorkerId_t &worker, const AuSPtr<IWorkItemHandler> &task, bool supportsBlocking)
{
    if (task)
    {
        return AuMakeShared<WorkItem>(this, worker, task, supportsBlocking);
    }

    return {};
}
|
|
|
|
// Creates a work item with a default worker id and no handler, with blocking support enabled.
AuSPtr<IWorkItem> ThreadPool::NewFence()
{
    const WorkerId_t kNoWorker {};
    const AuSPtr<IWorkItemHandler> kNoHandler {};

    return AuMakeShared<WorkItem>(this, kNoWorker, kNoHandler, true);
}
|
|
|
|
// Returns the thread object backing worker `id`, or an empty handle when the
// worker is not registered (previously a null worker state was dereferenced).
AuThreads::ThreadShared_t ThreadPool::ResolveHandle(WorkerId_t id)
{
    auto handle = GetThreadHandle(id);
    if (!handle)
    {
        return {};
    }

    return handle->threadObject;
}
|
|
|
|
// Builds a snapshot mapping each registered group id to the list of its worker thread ids.
AuBST<ThreadGroup_t, AuList<ThreadId_t>> ThreadPool::GetThreads()
{
    AU_LOCK_GUARD(rwlock_->AsReadable());

    AuBST<ThreadGroup_t, AuList<ThreadId_t>> snapshot;

    for (const auto &[groupId, groupState] : this->threads_)
    {
        AuList<ThreadId_t> ids;

        for (const auto &workerEntry : groupState->workers)
        {
            ids.push_back(workerEntry.second->id.second);
        }

        snapshot[groupId] = ids;
    }

    return snapshot;
}
|
|
|
|
// Returns the worker id stored in the calling thread's tlsWorkerId slot.
WorkerId_t ThreadPool::GetCurrentThread()
{
    return static_cast<WorkerId_t>(tlsWorkerId);
}
|
|
|
|
// Issues a barrier to one worker, or to every worker of a group when
// workerId.second is Async::kThreadIdAny.
//
// timeoutMs:     forwarded unchanged to each barrier
// requireSignal: forwarded to the barrier, except for the calling worker itself,
//                which is never asked to wait for a signal
// Returns false as soon as any barrier fails, or when the requested group does not
// exist (previously a null group was dereferenced on the kThreadIdAny path).
bool ThreadPool::Sync(WorkerId_t workerId, AuUInt32 timeoutMs, bool requireSignal)
{
    AU_LOCK_GUARD(this->rwlock_->AsReadable());

    auto group = GetGroup(workerId.first);
    auto currentWorkerId = GetCurrentThread().second;

    if (workerId.second == Async::kThreadIdAny)
    {
        if (!group)
        {
            SysPushErrorGen("Couldn't find requested thread group");
            return false;
        }

        for (auto &jobWorker : group->workers)
        {
            if (!Barrier(jobWorker.second->id, timeoutMs, requireSignal && jobWorker.second->id.second != currentWorkerId, false)) // BAD!, should subtract time elapsed, clamp to, i dunno, 5ms min?
            {
                return false;
            }
        }
    }
    else
    {
        return Barrier(workerId, timeoutMs, requireSignal && workerId.second != currentWorkerId, false);
    }

    return true;
}
|
|
|
|
// Sets the `running` event of one worker, or of every worker in the group when
// workerId.second is Async::kThreadIdAny. A barrier issued with requireSignal
// parks its worker on that event.
//
// An unknown group or worker now pushes an error and returns; previously the
// missing group / worker state was dereferenced.
void ThreadPool::Signal(WorkerId_t workerId)
{
    AU_LOCK_GUARD(this->rwlock_->AsReadable());

    if (workerId.second == Async::kThreadIdAny)
    {
        auto group = GetGroup(workerId.first);
        if (!group)
        {
            SysPushErrorGen("Couldn't find requested thread group");
            return;
        }

        for (auto &jobWorker : group->workers)
        {
            jobWorker.second->running->Set();
        }
    }
    else
    {
        auto handle = GetThreadHandle(workerId);
        if (!handle)
        {
            SysPushErrorGen("Couldn't find requested worker");
            return;
        }

        handle->running->Set();
    }
}
|
|
|
|
// Returns the shared loop source of the group that owns worker `workerId`.
// Yields an empty pointer when the worker is unknown or its parent group handle
// has expired (the latter used to be dereferenced unchecked).
AuSPtr<AuLoop::ILoopSource> ThreadPool::WorkerToLoopSource(WorkerId_t workerId)
{
    AU_LOCK_GUARD(this->rwlock_->AsReadable());

    auto a = GetThreadHandle(workerId);
    if (!a)
    {
        return {};
    }

    auto group = a->parent.lock();
    if (!group)
    {
        return {};
    }

    return group->asyncLoopSourceShared;
}
|
|
|
|
void ThreadPool::SyncAllSafe()
|
|
{
|
|
AU_LOCK_GUARD(this->rwlock_->AsReadable());
|
|
|
|
for (const auto &re : this->threads_)
|
|
{
|
|
for (auto &jobWorker : re.second->workers)
|
|
{
|
|
SysAssert(Barrier(jobWorker.second->id, 0, false, false));
|
|
}
|
|
}
|
|
}
|
|
|
|
// Attaches `feature` to worker `id` by dispatching a task to that worker which
// records the feature in the worker's state and calls its Init().
//
// async: when false, the call blocks until the task has completed.
//
// NewWorkItem() returns an empty pointer when handed a null handler, so its result
// is now asserted before Dispatch() is called on it instead of being dereferenced blindly.
void ThreadPool::AddFeature(WorkerId_t id, AuSPtr<AuThreads::IThreadFeature> feature, bool async)
{
    auto work = AuMakeShared<BasicWorkStdFunc>(([=]()
    {
        // Runs on the target worker
        GetThreadState()->features.push_back(feature);
        feature->Init();
    }));

    auto pendingItem = this->NewWorkItem(id, work, !async);
    SysAssert(pendingItem);

    auto workItem = pendingItem->Dispatch();
    SysAssert(workItem);

    if (!async)
    {
        workItem->BlockUntilComplete();
    }
}
|
|
|
|
// Asserts that the calling thread's stored worker id belongs to `group`.
void ThreadPool::AssertInThreadGroup(ThreadGroup_t group)
{
    const auto current = static_cast<WorkerId_t>(tlsWorkerId);
    SysAssert(current.first == group);
}
|
|
|
|
// Asserts that the calling thread's stored worker id equals `id`.
void ThreadPool::AssertWorker(WorkerId_t id)
{
    const auto current = static_cast<WorkerId_t>(tlsWorkerId);
    SysAssert(current == id);
}
|
|
|
|
// Returns the loop queue of the calling worker, or an empty pointer when the calling
// thread has no worker state in this pool. Callers in this file already test the
// result for null; previously the missing state itself was dereferenced.
AuSPtr<AuLoop::ILoopQueue> ThreadPool::ToKernelWorkQueue()
{
    auto state = this->GetThreadState();
    if (!state)
    {
        return {};
    }

    return state->asyncLoop;
}
|
|
|
|
// Returns the loop queue of worker `workerId`.
// Pushes an error and returns an empty pointer when the worker is not registered.
AuSPtr<AuLoop::ILoopQueue> ThreadPool::ToKernelWorkQueue(WorkerId_t workerId)
{
    auto handle = this->GetThreadHandle(workerId);

    if (handle)
    {
        return handle->asyncLoop;
    }

    SysPushErrorGen("Couldn't find requested worker");
    return {};
}
|
|
|
|
// Applies `mode` to the worker `workerId`, or to every worker of the group when the
// thread id is Async::kThreadIdAny. Pushes an error when no worker matches.
//
// A non-zero mode.freqMsTick also reprograms the worker's rate limiter; the value is
// scaled by 1'000'000 (the same scale Spawn() uses for its "1MS in nanoseconds" step).
// NOTE(review): the multiplication is performed in the type of freqMsTick, which is not
// visible here - confirm it cannot overflow for large tick values.
void ThreadPool::UpdateWorkMode(WorkerId_t workerId, RunMode mode)
{
    auto targets = this->GetThreadHandles(workerId);
    if (targets.empty())
    {
        SysPushErrorGen("Couldn't find requested worker");
        return;
    }

    for (const auto &target : targets)
    {
        target->runMode = mode.mode;

        if (!mode.freqMsTick)
        {
            continue;
        }

        target->rateLimiter.SetNextStep(mode.freqMsTick * 1'000'000);
    }
}
|
|
|
|
// Returns the run mode of the calling worker, or ERunMode::eEfficient when the
// calling thread has no worker state in this pool.
ERunMode ThreadPool::GetCurrentThreadRunMode()
{
    auto self = this->GetThreadState();

    if (self)
    {
        return self->runMode;
    }

    return ERunMode::eEfficient;
}
|
|
|
|
// Returns the run mode of worker `workerId`.
// Pushes an error and returns a value-initialized ERunMode when the worker is not registered.
ERunMode ThreadPool::GetThreadRunMode(WorkerId_t workerId)
{
    auto handle = this->GetThreadHandle(workerId);

    if (handle)
    {
        return handle->runMode;
    }

    SysPushErrorGen("Couldn't find requested worker");
    return {};
}
|
|
|
|
// Unimplemented fiber hooks, 'twas used for science
|
|
|
|
// Fiber hook placeholder called at the start of PollInternal(); currently always yields 0.
int ThreadPool::CtxPollPush()
{
    // TODO (Reece): implement a context switching library
    //               Refer to the old implementation of this on pastebin
    constexpr int kNoContext = 0;
    return kNoContext;
}
|
|
|
|
// Fiber hook placeholder paired with CtxPollPush(); intentionally does nothing.
void ThreadPool::CtxPollReturn(const AuSPtr<ThreadState> &state, int status, bool hitTask)
{
    // no-op
}
|
|
|
|
bool ThreadPool::CtxYield()
|
|
{
|
|
bool ranAtLeastOne = false;
|
|
|
|
while (this->InternalRunOne(false))
|
|
{
|
|
ranAtLeastOne = true;
|
|
}
|
|
|
|
return ranAtLeastOne;
|
|
}
|
|
|
|
|
|
// internal api
|
|
|
|
// Registers a new worker `workerId`, allocating its group on first use.
//
// create == false: a new thread object is created and started with Entrypoint() as its entry.
// create == true:  the calling thread is adopted as the worker; this is refused when the
//                  calling thread is already bound to a pool.
//
// Returns false on any allocation failure, or when the worker id already exists.
//
// Fixes relative to the previous implementation:
//  - the thread state allocation itself is checked before use
//  - the `running` event is validated; the old code tested `syncSema` twice and never
//    looked at `running`, which Barrier/Signal/Shutdown later use
bool ThreadPool::Spawn(WorkerId_t workerId, bool create)
{
    AU_LOCK_GUARD(rwlock_->AsWritable());

    if (GetCurrentWorkerPId().pool && create)
    {
        SysPushErrorGeneric("TODO (reece): add support for multiple runners per thread");
        return {};
    }

    AuSPtr<GroupState> group;

    // Try fetch or allocate group
    {
        AuSPtr<GroupState>* groupPtr;
        if (!AuTryFind(this->threads_, workerId.first, groupPtr))
        {
            group = AuMakeShared<GroupState>();

            if (!group->Init())
            {
                SysPushErrorMem("Not enough memory to intiialize a new group state");
                return false;
            }

            if (!AuTryInsert(this->threads_, AuMakePair(workerId.first, group)))
            {
                return false;
            }
        }
        else
        {
            group = *groupPtr;
        }
    }

    // Assert worker does not already exist
    {
        AuSPtr<ThreadState>* ret;

        if (AuTryFind(group->workers, workerId.second, ret))
        {
            SysPushErrorGen("Thread ID already exists");
            return false;
        }
    }

    auto threadState = AuMakeShared<ThreadState>();
    if (!threadState)
    {
        SysPushErrorMem();
        return {};
    }

    threadState->parent = group;
    threadState->running = AuThreadPrimitives::EventUnique(true, false, true);
    threadState->syncSema = AuThreadPrimitives::SemaphoreUnique(0);
    threadState->id = workerId;
    threadState->asyncLoop = AuStaticCast<AsyncLoop>(AuLoop::NewLoopQueue());
    threadState->rateLimiter.SetNextStep(1'000'000); // 1MS in nanoseconds
    threadState->runMode = ERunMode::eEfficient;

    if (!threadState->asyncLoop)
    {
        SysPushErrorMem();
        return {};
    }

    if (!threadState->syncSema)
    {
        SysPushErrorMem();
        return {};
    }

    if (!threadState->running)
    {
        SysPushErrorMem();
        return {};
    }

    // The group's event is always a source of the worker's loop
    threadState->asyncLoop->SourceAdd(group->eventLs);

    if (!create)
    {
        threadState->threadObject = AuThreads::ThreadShared(AuThreads::ThreadInfo(
            AuMakeShared<AuThreads::IThreadVectorsFunctional>(AuThreads::IThreadVectorsFunctional::OnEntry_t(std::bind(&ThreadPool::Entrypoint, this, threadState->id)),
                                                              AuThreads::IThreadVectorsFunctional::OnExit_t{}),
            gRuntimeConfig.async.threadPoolDefaultStackSize
        ));
        if (!threadState->threadObject)
        {
            return {};
        }
        threadState->threadObject->Run();
    }
    else
    {
        // Wrap the calling thread's object with a no-op deleter
        threadState->threadObject = AuSPtr<AuThreads::IAuroraThread>(AuThreads::GetThread(), [](AuThreads::IAuroraThread *){});

        // TODO: this is just a hack
        // we should implement this properly
        threadState->threadObject->AddLastHopeTlsHook(AuMakeShared<AuThreads::IThreadFeatureFunctional>([]() -> void
        {

        }, []() -> void
        {
            auto pid = GetCurrentWorkerPId();
            if (pid.pool)
            {
                GetWorkerInternal(pid.pool)->ThisExiting();
            }
        }));

        //
        gCurrentPool = AuWeakFromThis();
        tlsWorkerId = WorkerPId_t(AuSharedFromThis(), workerId);
    }

    group->workers.insert(AuMakePair(workerId.second, threadState));
    return true;
}
|
|
|
|
// private api
|
|
|
|
// Dispatches a marker task to `workerId` and waits for that worker to reach it.
//
// ms:            timeout forwarded to WaitFor()
// requireSignal: the target worker resets its `running` event, acknowledges the barrier,
//                then blocks on that event
// drop:          the target worker flags itself as rejecting
//
// Returns true when the target acknowledged the barrier. Returns false when the caller is
// not a worker of this pool, on allocation failure, on timeout, or when the task was cancelled.
//
// Fixes relative to the previous implementation:
//  - the callbacks no longer capture stack locals by reference; after a timed-out wait the
//    runnable can still be executed or cancelled later, which used to touch a dead frame
//  - the callbacks hold the caller's ThreadState so its semaphore outlives the runnable
//  - the cancel path records the failure before releasing the semaphore, so the woken
//    waiter cannot read a stale "not failed"
bool ThreadPool::Barrier(WorkerId_t workerId, AuUInt32 ms, bool requireSignal, bool drop)
{
    auto self = GetThreadState();
    if (!self)
    {
        return {};
    }

    auto &semaphore = self->syncSema;

    // Shared with the cancel callback; only read here after the semaphore was acquired
    auto failed = AuMakeShared<bool>(false);
    if (!failed)
    {
        return false;
    }

    auto work = AuMakeShared<AsyncFuncRunnable>(
        [this, self, drop, requireSignal]()
        {
            // Runs on the target worker
            auto state = GetThreadState();

            if (drop)
            {
                state->rejecting = true;
            }

            if (requireSignal)
            {
                state->running->Reset();
            }

            self->syncSema->Unlock(1);

            if (requireSignal)
            {
                state->running->Lock();
            }
        },
        [self, failed]()
        {
            // Cancelled: publish the failure first, then wake the waiter
            *failed = true;
            self->syncSema->Unlock(1);
        }
    );

    if (!work)
    {
        return false;
    }

    Run(workerId, work);

    return WaitFor(workerId, AuUnsafeRaiiToShared(semaphore), ms) && !*failed;
}
|
|
|
|
// Thread entry bound by Spawn(create = false).
// Binds the thread to this pool, pumps work until exit, then tears the worker down.
void ThreadPool::Entrypoint(WorkerId_t id)
{
    gCurrentPool = AuWeakFromThis();
    tlsWorkerId = WorkerPId_t(AuSharedFromThis(), id);

    auto job = GetThreadState();

    Run();

    const bool bIsWorkerZero = (id == WorkerId_t {0, 0});

    if (!bIsWorkerZero)
    {
        AU_LOCK_GUARD(this->rwlock_->AsReadable());

        const bool bNeedsFinalBarrier = !this->shuttingdown_ && !job->rejecting;
        if (bNeedsFinalBarrier)
        {
            // Pump and barrier + reject all after atomically
            Barrier(id, 0, false, true);
        }
    }

    ThisExiting();

    if (bIsWorkerZero)
    {
        CleanWorkerPoolReservedZeroFree();
    }
}
|
|
|
|
void ThreadPool::ThisExiting()
|
|
{
|
|
auto id = GetCurrentThread();
|
|
auto state = GetGroup(id.first);
|
|
|
|
{
|
|
AU_LOCK_GUARD(this->rwlock_->AsWritable());
|
|
|
|
auto itr = state->workers.find(id.second);
|
|
auto &jobWorker = itr->second;
|
|
|
|
CleanUpWorker(id);
|
|
|
|
// Abort scheduled tasks
|
|
TerminateSceduledTasks(this, id);
|
|
|
|
// Clean up thread features
|
|
// -> transferable TLS handles
|
|
// -> thread specific vms
|
|
// -> anything your brain wishes to imagination
|
|
for (const auto &thread : jobWorker->features)
|
|
{
|
|
try
|
|
{
|
|
thread->Cleanup();
|
|
}
|
|
catch (...)
|
|
{
|
|
AuLogWarn("Couldn't clean up thread feature!");
|
|
Debug::PrintError();
|
|
}
|
|
}
|
|
|
|
jobWorker->features.clear();
|
|
|
|
state->workers.erase(itr);
|
|
}
|
|
}
|
|
|
|
// Looks up the state of group `type`; returns an empty pointer when the group is not registered.
AuSPtr<GroupState> ThreadPool::GetGroup(ThreadGroup_t type)
{
    AU_LOCK_GUARD(this->rwlock_->AsReadable());

    AuSPtr<GroupState>* pGroup;
    if (AuTryFind(this->threads_, type, pGroup))
    {
        return *pGroup;
    }

    return {};
}
|
|
|
|
// Returns the worker state of the calling thread, or an empty pointer when the thread is
// not bound to a live pool, its group is not registered, or it has no entry in that group.
// In internal/debug configurations a thread bound to a different pool also yields empty.
//
// The worker lookup is a pure find. The previous `workers[...]` access could insert an
// empty entry into the shared worker table while only the read lock is held, and later
// iterations over the table dereference every entry.
AuSPtr<ThreadState> ThreadPool::GetThreadState()
{
    AU_LOCK_GUARD(this->rwlock_->AsReadable());

    auto thread = gCurrentPool.lock();
    if (!thread)
    {
        return {};
    }

#if defined(AU_CFG_ID_INTERNAL) || defined(AU_CFG_ID_DEBUG)
    if (thread.get() != this)
    {
        SysPushErrorGeneric("wrong thread");
        return {};
    }
#endif

    auto worker = *tlsWorkerId;

    auto state = GetGroup(worker.first);
    if (!state)
    {
        return {};
    }

    AuSPtr<ThreadState> *ret;
    if (!AuTryFind(state->workers, worker.second, ret))
    {
        return {};
    }

    return *ret;
}
|
|
|
|
// Looks up the state of worker `id`; returns an empty pointer when either the group
// or the worker is not registered.
AuSPtr<ThreadState> ThreadPool::GetThreadHandle(WorkerId_t id)
{
    AU_LOCK_GUARD(this->rwlock_->AsReadable());

    auto group = GetGroup(id.first);
    if (group)
    {
        AuSPtr<ThreadState> *pState;
        if (AuTryFind(group->workers, id.second, pState))
        {
            return *pState;
        }
    }

    return {};
}
|
|
|
|
// Collects worker states addressed by `id`: every worker of the group when the thread id
// is Async::kThreadIdAny, otherwise the single matching worker.
// Returns an empty list when the group or the specific worker is not registered.
AuList<AuSPtr<ThreadState>> ThreadPool::GetThreadHandles(WorkerId_t id)
{
    AU_LOCK_GUARD(this->rwlock_->AsReadable());

    auto group = GetGroup(id.first);
    if (!group)
    {
        return {};
    }

    AuList<AuSPtr<ThreadState>> handles;

    if (id.second == Async::kThreadIdAny)
    {
        for (const auto &entry : group->workers)
        {
            handles.push_back(entry.second);
        }

        return handles;
    }

    AuSPtr<ThreadState> *pState;
    if (!AuTryFind(group->workers, id.second, pState))
    {
        return {};
    }

    handles.push_back(*pState);
    return handles;
}
|
|
|
|
// Creates a new thread pool. The scheduler is started here, on first use, so that
// applications which never create a pool do not pay for it.
AUKN_SYM AuSPtr<IThreadPool> NewThreadPool()
{
    StartSched();

    auto pool = AuMakeShared<ThreadPool>();
    return pool;
}
|
|
} |