Reece
cf70f0d45c
The intention is to quickly improve and add util apis, enhance functionality given current demands, go back to the build pipeline, finish that, publish runtime tests, and then use what we have to go back to Linux support with a more stable api. [+] AuMakeSharedArray [+] Technet ArgvQuote [+] Grug subsystem (UNIX signal thread async safe ipc + telemetry flusher + log flusher.) [+] auEndianness -> Endian swap utils [+] AuGet<N>(...) [*] AUE_DEFINE conversion for ECompresionType, EAnsiColor, EHashType, EStreamError, EHexDump [+] ConsoleMessage ByteBuffer serialization [+] CmdLine subsystem for parsing command line arguments and simple switch/flag checks [*] Split logger from console subsystem [+] StartupParameters -> A part of a clean up effort under Process [*] Refactor SysErrors header + get caller hack [+] Atomic APIs [+] popcnt [+] Ring Buffer sink [+] Added more standard errors Catch, Submission, LockError, NoAccess, ResourceMissing, ResourceLocked, MalformedData, InSandboxContext, ParseError [+] Added ErrorCategorySet, ErrorCategoryClear, GetStackTrace [+] IExitSubscriber, ETriggerLevel [*] Write bias the high performance RWLockImpl read-lock operation [+] ExitHandlerAdd/ExitHandlerRemove (exit subsystem) [*] Updated API style Digests [+] CpuId::CpuBitCount [+] GetUserProgramsFolder [+] GetPackagePath [*] Split IStreamReader with an inl file [*] BlobWriter/BlobReader/BlobArbitraryReader can now take shared pointers to bytebuffers.
default constructor allocates a new scalable bytebuffer [+] ICharacterProvider [+] ICharacterProviderEx [+] IBufferedCharacterConsumer [+] ProviderFromSharedString [+] ProviderFromString [+] BufferConsumerFromProvider [*] Parse Subsystem uses character io bufferer [*] Rewritten NT's high perf semaphore to use userland SRW/ConVars [like mutex, based on generic semaphore] [+] ByteBuffer::ResetReadPointer [*] Bug fix bytebuffer base not reset on free and some scaling issues [+] ProcessMap -> Added kSectionNameStack, kSectionNameFile, kSectionNameHeap for Section [*] ProcessMap -> Refactor Segment to Section. I was stupid for keeping a type conflict hack API facing [+] Added 64 *byte* fast RNG seeds [+] File Advisories/File Lock Awareness [+] Added extended IAuroraThread from OS identifier caches for debug purposes [*] Tweaked how memory is reported on Windows. Better consistency of what values mean across functions. [*] Broke AuroraUtils/Typedefs out into a separate library [*] Update build script [+] Put some more effort into adding detail to the readme before rewriting it, plus, added some media [*] Improved public API documentation [*] Bug fix `SetConsoleCtrlHandler` [+] Locale TimeDateToFileNameISO8601 [+] Console config stdOutShortTime [*] Begin using internal UTF8/16 decoders when platform support isn't available (instead of stl) [*] Bug fixes in decoders [*] Major bug fix, AuMax [+] RateLimiter [+] Binary file sink [+] Log directory sink [*] Data header usability (more operators) [+] AuRemoveRange [+] AuRemove [+] AuTryRemove [+] AuTryRemoveRange [+] auCastUtils [+] Finish NewLSWin32Source [+] AuTryFindByTupleN, AuTryRemoveByTupleN [+] Separated AuRead/Write types, now in auTypeUtils [+] Added GetPosition/SetPosition to FileWriter [*] Fix stupid AuMin in place of AuMax in SpawnThread.Unix.Cpp [*] Refactored Arbitrary readers to SeekingReaders (as in, they could be atomic and/or parallelized, and accept an arbitrary position as a work parameter -> not Seekable, as
in, you can simply set the position) [*] Hack back in the sched deinit [+] File AIO loop source interop [+] Begin to prototype a LoopQueue object I had in mind for NT, untested btw [+] Stub code for networking [+] Compression BaseStream/IngestableStreamBase [*] Major: read/write locks now support write-entrant read routines. [*] Compression subsystem now uses the MemoryView concept [*] Rewrite the base stream compressions, made them less broken [*] Update hashing api [*] WriterTryGoForward and ReaderTryGoForward now revert to the previous relative index instead of panicking [+] Added new AuByteBuffer apis Trim, Pad, WriteFrom, WriteString, [TODO: ReadString] [+] Added ByteBufferPushReadState [+] Added ByteBufferPushWriteState [*] Move from UCS-2 to full UTF-16. Win32 can handle full UTF-16. [*] ELogLevel is now an Aurora enum [+] Raised arbitrary limit in header to 255, the max filter buffer [+] Explicit GZip support [+] Explicit Zip support [+] Added [some] compressors et al
1073 lines
30 KiB
C++
1073 lines
30 KiB
C++
/***
|
|
Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
|
|
|
File: ThreadPool.cpp
|
|
Date: 2021-10-30
|
|
Author: Reece
|
|
***/
|
|
#include <Source/RuntimeInternal.hpp>
|
|
#include "Async.hpp"
|
|
#include "ThreadPool.hpp"
|
|
#include "WorkItem.hpp"
|
|
#include "Schedular.hpp"
|
|
|
|
namespace Aurora::Async
|
|
{
|
|
//STATIC_TLS(WorkerId_t, tlsWorkerId);
|
|
static thread_local AuWPtr<ThreadPool> gCurrentPool;
|
|
static const auto kMagicResortThreshold = 15;
|
|
|
|
// Returns the pool-qualified id of the worker bound to the calling thread.
//
// Threads that are not (or are no longer) bound to a live pool get an empty WorkerPId_t.
// ThreadPool::Spawn relies on this: it tests `.pool` on threads that may not be workers yet.
// Previously this path dereferenced a null pool pointer.
AUKN_SYM WorkerPId_t GetCurrentWorkerPId()
{
    auto lkPool = gCurrentPool.lock();
    if (!lkPool)
    {
        return {};
    }

    auto cpy = *lkPool->tlsWorkerId;
    return WorkerPId_t(lkPool, cpy);
}
|
|
|
|
//
|
|
|
|
// Allocates the pool-wide reader/writer lock that guards the group/worker maps.
// Most entry points synchronise on it, so a failed allocation is treated as fatal.
ThreadPool::ThreadPool()
{
    rwlock_ = AuThreadPrimitives::RWLockUnique();

    SysAssert(static_cast<bool>(this->rwlock_), "Couldn't initialize ThreadPool. Unable to allocate an RWLock");
}
|
|
|
|
// internal pool interface
|
|
|
|
/**
 * Waits for `primitive` on behalf of worker `unlocker`, the worker expected to signal it.
 *
 * Suppose the caller *is* that worker: same group, and either the same thread id or
 * kThreadIdAny in a single-worker group. A plain blocking wait would then deadlock. In that
 * case we keep pumping our own queue (CtxYield) between short 2 ms polls of the primitive.
 *
 * @param unlocker   worker expected to run the code that signals `primitive`
 * @param primitive  waitable to acquire
 * @param timeoutMs  0 is treated as "no limit" (Shutdown/SyncAllSafe pass 0 to Barrier and
 *                   expect completion); otherwise honoured on both paths. The self-wait path
 *                   previously ignored it and could spin forever.
 * @return true if the primitive was acquired, false on timeout
 */
bool ThreadPool::WaitFor(WorkerId_t unlocker, const AuSPtr<Threading::IWaitable> &primitive, AuUInt32 timeoutMs)
{
    auto curThread = GetThreadState();

    // Callers that are not workers of this pool cannot starve their own queue
    if (!curThread)
    {
        return Threading::WaitFor(primitive.get(), timeoutMs);
    }

    bool workerIdMatches = (unlocker.second == curThread->id.second) ||
                           ((unlocker.second == Async::kThreadIdAny) && (GetThreadWorkersCount(unlocker.first) == 1));

    bool waitingOnSelf = (unlocker.first == curThread->id.first) && // work group matches
                         workerIdMatches;                           // well, crap

    if (!waitingOnSelf)
    {
        return Threading::WaitFor(primitive.get(), timeoutMs);
    }

    // We must run the unlocking work ourselves: alternate between pumping and polling
    auto startTime = Time::CurrentClockMS();
    bool queryAsync = false;

    while (!(queryAsync ? primitive->TryLock() : Threading::WaitFor(primitive.get(), 2)))
    {
        if (timeoutMs && ((Time::CurrentClockMS() - startTime) >= timeoutMs))
        {
            return false;
        }

        queryAsync = CtxYield();
    }

    return true;
}
|
|
|
|
/**
 * Enqueues `runnable` on group `target.first` and wakes the group's workers.
 *
 * `target.second` pins the task to one worker; Async::kThreadIdAny lets any worker of the
 * group take it.
 *
 * The pool's task counter is incremented up front. If the task cannot be queued, the counter
 * is decremented again and the runnable is cancelled with CancelAsync(). A task cannot be
 * queued when:
 *  - STAGING/DEBUG builds only: the pinned worker is offline/rejecting, or every worker of the
 *    group is rejecting;
 *  - any build: the queue insertion fails.
 * Previously the insertion-failure path leaked the counter.
 */
void ThreadPool::Run(WorkerId_t target, AuSPtr<IAsyncRunnable> runnable)
{
    auto state = GetGroup(target.first);
    SysAssert(static_cast<bool>(state), "couldn't dispatch a task to an offline group");

    IncrementTasksRunning();

    bool queued {};

    {
#if defined(STAGING) || defined(DEBUG)
        // Take the pool lock *before* the group mutex. Shutdown() holds the pool lock (writable)
        // while acquiring cvWorkMutex, so the reverse order could deadlock.
        AU_LOCK_GUARD(rwlock_->AsReadable());
#endif
        AU_LOCK_GUARD(state->cvWorkMutex);

        queued = true;

#if defined(STAGING) || defined(DEBUG)
        if (target.second != Async::kThreadIdAny)
        {
            auto itr = state->workers.find(target.second);
            if ((itr == state->workers.end()) || (itr->second->rejecting))
            {
                SysPushErrorGen("worker: {}:{} is offline", target.first, target.second);
                queued = false;
            }
        }
        else
        {
            bool found {};

            for (const auto &worker : state->workers)
            {
                if (!worker.second->rejecting)
                {
                    found = true;
                    break;
                }
            }

            queued = found;
        }
#endif

        if (queued)
        {
            queued = AuTryInsert(state->workQueue, AuMakePair(target.second, runnable));
        }

        if (queued)
        {
            // Periodically force PollInternal() to re-sort the queue by priority
            state->dirty++;
            if (state->dirty > kMagicResortThreshold)
            {
                state->dirty = 0;
                state->sorted = false;
            }

            state->eventLs->Set();
        }
    }

    if (!queued)
    {
        // Undo IncrementTasksRunning() only after releasing the locks. In runner mode this may
        // call Shutdown(), which acquires the pool lock and cvWorkMutex itself.
        DecrementTasksRunning();
        runnable->CancelAsync();
        return;
    }

    if (target.second == Async::kThreadIdAny)
    {
        state->cvVariable->Signal();
    }
    else
    {
        // sad :( - only the pinned worker can take this, but we cannot wake a specific waiter
        // TODO: when we have wait any, add support (^ the trigger) for it here
        state->cvVariable->Broadcast();
    }
}
|
|
|
|
// Exposes this implementation through the public IThreadPool interface.
IThreadPool *ThreadPool::ToThreadPool()
{
    IThreadPool *pPublic = this;
    return pPublic;
}
|
|
|
|
void ThreadPool::IncrementTasksRunning()
|
|
{
|
|
this->tasksRunning_++;
|
|
}
|
|
|
|
void ThreadPool::DecrementTasksRunning()
|
|
{
|
|
if ((--this->tasksRunning_) == 0)
|
|
{
|
|
if (InRunnerMode())
|
|
{
|
|
Shutdown();
|
|
}
|
|
}
|
|
}
|
|
|
|
// ithreadpool
|
|
|
|
// Number of workers currently registered in `group`.
// Returns 0 for an unknown group; previously this dereferenced a null group state.
size_t ThreadPool::GetThreadWorkersCount(ThreadGroup_t group)
{
    AU_LOCK_GUARD(this->rwlock_->AsReadable());

    auto state = GetGroup(group);
    if (!state)
    {
        return 0;
    }

    return state->workers.size();
}
|
|
|
|
void ThreadPool::SetRunningMode(bool eventRunning)
|
|
{
|
|
this->runnersRunning_ = eventRunning;
|
|
}
|
|
|
|
// Starts a new thread that will act as worker `workerId`.
bool ThreadPool::Spawn(WorkerId_t workerId)
{
    constexpr bool kAdoptCallingThread = false;
    return Spawn(workerId, kAdoptCallingThread);
}
|
|
|
|
// Registers the *calling* thread as worker `workerId` instead of starting a new thread.
bool ThreadPool::Create(WorkerId_t workerId)
{
    constexpr bool kAdoptCallingThread = true;
    return Spawn(workerId, kAdoptCallingThread);
}
|
|
|
|
bool ThreadPool::InRunnerMode()
|
|
{
|
|
return this->runnersRunning_;
|
|
}
|
|
|
|
bool ThreadPool::Poll()
|
|
{
|
|
return InternalRunOne(false);
|
|
}
|
|
|
|
bool ThreadPool::RunOnce()
|
|
{
|
|
return InternalRunOne(true);
|
|
}
|
|
|
|
bool ThreadPool::Run()
|
|
{
|
|
bool ranOnce {};
|
|
|
|
auto auThread = AuThreads::GetThread();
|
|
auto job = GetThreadState();
|
|
|
|
while ((!auThread->Exiting()) && (!job->shuttingdown))
|
|
{
|
|
// Do work (blocking)
|
|
InternalRunOne(true);
|
|
ranOnce = true;
|
|
}
|
|
|
|
return ranOnce;
|
|
}
|
|
|
|
bool ThreadPool::InternalRunOne(bool block)
|
|
{
|
|
auto state = GetThreadState();
|
|
bool success {};
|
|
|
|
do
|
|
{
|
|
if (state->inLoopSourceMode)
|
|
{
|
|
success = PollLoopSource(block);
|
|
}
|
|
else
|
|
{
|
|
success = PollInternal(block);
|
|
success |= state->inLoopSourceMode;
|
|
}
|
|
} while (success);
|
|
|
|
return success;
|
|
}
|
|
|
|
bool ThreadPool::PollInternal(bool block)
|
|
{
|
|
auto state = GetThreadState();
|
|
auto group = state->parent.lock();
|
|
|
|
//state->pendingWorkItems.clear();
|
|
|
|
auto magic = CtxPollPush();
|
|
|
|
{
|
|
AU_LOCK_GUARD(group->cvWorkMutex);
|
|
|
|
// TODO: reimplement this
|
|
// this is stupid and gross
|
|
if (group->workQueue.size() > 2)
|
|
{
|
|
if (!group->sorted)
|
|
{
|
|
auto cpy = group->workQueue;
|
|
|
|
std::sort(group->workQueue.begin(), group->workQueue.end(), [&](const WorkEntry_t &a, const WorkEntry_t &b)
|
|
{
|
|
if (a.second->GetPrio() != b.second->GetPrio())
|
|
return a.second->GetPrio() > b.second->GetPrio();
|
|
|
|
AuUInt32 ia {}, ib {};
|
|
for (; ia < cpy.size(); ia++)
|
|
if (cpy[ia].second == a.second)
|
|
break;
|
|
|
|
for (; ib < cpy.size(); ib++)
|
|
if (cpy[ib].second == b.second)
|
|
break;
|
|
|
|
return ia < ib;
|
|
});
|
|
|
|
group->sorted = true;
|
|
group->dirty = 0;
|
|
}
|
|
}
|
|
|
|
do
|
|
{
|
|
// Deque tasks the current thread runner could dipatch
|
|
// Noting that `multipopCount` determines how aggressive threads are in dequeuing work
|
|
// It's probable `multipopCount` will equal 1 for your use case
|
|
//
|
|
// Only increment when you know tasks within a group queue will not depend on one another
|
|
// *and* tasks require a small amount of execution time
|
|
//
|
|
// This could be potentially useful for an event dispatcher whereby you're dispatching
|
|
// hundreds of items per second, across a thread or two, knowing dequeuing one instead of all
|
|
// is a waste of CPU cycles.
|
|
//
|
|
// Remember, incrementing `multipopCount` is potentially dangerous the second you have local
|
|
// thread group waits
|
|
for (auto itr = group->workQueue.begin();
|
|
((itr != group->workQueue.end()) &&
|
|
(state->pendingWorkItems.size() < state->multipopCount));
|
|
)
|
|
{
|
|
// TODO: catch low memory condition
|
|
if (itr->first == Async::kThreadIdAny)
|
|
{
|
|
state->pendingWorkItems.push_back(*itr);
|
|
itr = group->workQueue.erase(itr);
|
|
continue;
|
|
}
|
|
|
|
if ((itr->first != Async::kThreadIdAny) && (itr->first == state->id.second))
|
|
{
|
|
state->pendingWorkItems.push_back(*itr);
|
|
itr = group->workQueue.erase(itr);
|
|
continue;
|
|
}
|
|
|
|
itr++;
|
|
}
|
|
|
|
// Consider blocking for more work
|
|
if (!block)
|
|
{
|
|
break;
|
|
}
|
|
|
|
// pre-wakeup thread terminating check
|
|
if (state->threadObject->Exiting() || state->shuttingdown)
|
|
{
|
|
break;
|
|
}
|
|
|
|
// Block if no work items are present
|
|
if (state->pendingWorkItems.empty())
|
|
{
|
|
group->cvVariable->WaitForSignal();
|
|
}
|
|
|
|
// Post-wakeup thread terminating check
|
|
if (state->threadObject->Exiting() || state->shuttingdown)
|
|
{
|
|
break;
|
|
}
|
|
|
|
} while (state->pendingWorkItems.empty());
|
|
|
|
if (group->workQueue.empty())
|
|
{
|
|
group->eventLs->Reset();
|
|
}
|
|
}
|
|
|
|
if (state->pendingWorkItems.empty())
|
|
{
|
|
CtxPollReturn(state, magic, false);
|
|
return false;
|
|
}
|
|
|
|
int runningTasks {};
|
|
|
|
auto oldTlsHandle = AuExchange(gCurrentPool, AuSharedFromThis());
|
|
|
|
bool lowPrioCont {};
|
|
bool lowPrioContCached {};
|
|
|
|
for (auto itr = state->pendingWorkItems.begin(); itr != state->pendingWorkItems.end(); )
|
|
{
|
|
if (state->threadObject->Exiting() || state->shuttingdown)
|
|
{
|
|
break;
|
|
}
|
|
|
|
// Set the last frame time for a watchdog later down the line
|
|
state->lastFrameTime = Time::CurrentClockMS();
|
|
|
|
if (itr->second->GetPrio() < 0.25)
|
|
{
|
|
if (lowPrioCont) continue;
|
|
|
|
if (!lowPrioContCached)
|
|
{
|
|
AU_LOCK_GUARD(group->cvWorkMutex);
|
|
{
|
|
for (const auto &[pendingWorkA, pendingWorkB] : group->workQueue)
|
|
{
|
|
if (pendingWorkB->GetPrio() > .5)
|
|
{
|
|
lowPrioCont = true;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
lowPrioContCached = true;
|
|
if (lowPrioCont) continue;
|
|
}
|
|
}
|
|
|
|
// Dispatch
|
|
itr->second->RunAsync();
|
|
|
|
// Remove from our local job queue
|
|
itr = state->pendingWorkItems.erase(itr);
|
|
|
|
// Atomically decrement global task counter
|
|
runningTasks = this->tasksRunning_.fetch_sub(1) - 1;
|
|
}
|
|
|
|
gCurrentPool = oldTlsHandle;
|
|
|
|
// Return popped work back to the groups work pool when our -pump loops were preempted
|
|
if (state->pendingWorkItems.size())
|
|
{
|
|
AU_LOCK_GUARD(group->cvWorkMutex);
|
|
// TODO: low memory condition slow path
|
|
group->workQueue.insert(group->workQueue.end(), state->pendingWorkItems.begin(), state->pendingWorkItems.end());
|
|
group->eventLs->Set();
|
|
state->pendingWorkItems.clear();
|
|
}
|
|
|
|
CtxPollReturn(state, magic, true);
|
|
|
|
if (InRunnerMode())
|
|
{
|
|
if (runningTasks == 0)
|
|
{
|
|
Shutdown();
|
|
}
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
bool ThreadPool::PollLoopSource(bool block)
|
|
{
|
|
auto state = GetThreadState();
|
|
auto group = state->parent.lock();
|
|
|
|
//state->pendingWorkItems.clear();
|
|
|
|
auto magic = CtxPollPush();
|
|
bool retValue {};
|
|
|
|
// TODO (reece): This function isn't very efficient
|
|
{
|
|
AU_LOCK_GUARD(group->cvWorkMutex);
|
|
|
|
AuList<AsyncAppWaitSourceRequest> curLoopReq = state->loopSources;
|
|
AuList<AuSPtr<Loop::ILoopSource>> curLoopSources;
|
|
|
|
auto lenLoopReqs = curLoopReq.size();
|
|
|
|
curLoopSources.resize(lenLoopReqs + 1);
|
|
|
|
for (auto i = 0; i < lenLoopReqs; i++)
|
|
{
|
|
curLoopSources[i] = curLoopReq[i].loopSource;
|
|
}
|
|
|
|
curLoopSources[lenLoopReqs] = group->eventLs;
|
|
|
|
AuList<AuSPtr<Loop::ILoopSource>> nextLoopSources;
|
|
if (block)
|
|
{
|
|
// TODO (reece): work on async epoll like abstraction
|
|
nextLoopSources = Loop::WaitMultipleOrObjects(curLoopSources, 0);
|
|
}
|
|
else
|
|
{
|
|
nextLoopSources.reserve(curLoopSources.size());
|
|
for (const auto &source : curLoopSources)
|
|
{
|
|
if (source->IsSignaled())
|
|
{
|
|
nextLoopSources.push_back(source);
|
|
}
|
|
}
|
|
}
|
|
|
|
auto time = Time::CurrentClockMS();
|
|
|
|
state->loopSources.clear();
|
|
state->loopSources.reserve(curLoopReq.size());
|
|
|
|
if (AuExists(nextLoopSources, group->eventLs))
|
|
{
|
|
PollInternal(false);
|
|
}
|
|
|
|
for (const auto &request : curLoopReq)
|
|
{
|
|
bool remove {};
|
|
bool removeType {};
|
|
|
|
if (AuExists(nextLoopSources, request.loopSource))
|
|
{
|
|
remove = true;
|
|
removeType = true;
|
|
}
|
|
else
|
|
{
|
|
if (request.requestedOffset)
|
|
{
|
|
if (request.endTime < time)
|
|
{
|
|
remove = true;
|
|
removeType = false;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (!remove)
|
|
{
|
|
state->loopSources.push_back(request);
|
|
}
|
|
else
|
|
{
|
|
request.callback(request.loopSource, removeType);
|
|
retValue |= removeType;
|
|
}
|
|
}
|
|
|
|
state->inLoopSourceMode = state->loopSources.size();
|
|
}
|
|
|
|
return retValue;
|
|
}
|
|
|
|
|
|
void ThreadPool::Shutdown()
|
|
{
|
|
// Nested shutdowns can happen; prevent a write lock
|
|
{
|
|
AU_LOCK_GUARD(this->rwlock_->AsReadable());
|
|
if (this->shuttingdown_)
|
|
{
|
|
return;
|
|
}
|
|
}
|
|
|
|
// Set shutdown flag
|
|
{
|
|
AU_LOCK_GUARD(this->rwlock_->AsWritable());
|
|
if (AuExchange(this->shuttingdown_, true))
|
|
{
|
|
return;
|
|
}
|
|
}
|
|
|
|
// Noting
|
|
// 1) that StopSched may lockup under a writable lock
|
|
// -> we will terminate a thread that may be dispatching a sys pump event
|
|
// 2) that barrier doesn't need to be under a write lock
|
|
//
|
|
// Perform the following shutdown of the schedular and other available threads under a read lock
|
|
{
|
|
AU_LOCK_GUARD(this->rwlock_->AsReadable());
|
|
|
|
StopSched();
|
|
|
|
for (auto &[groupId, group] : this->threads_)
|
|
{
|
|
for (auto &[id, worker] : group->workers)
|
|
{
|
|
Barrier(worker->id, 0, false, true);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Finally set the shutdown flag on all of our thread contexts
|
|
// then release them from the runners/workers list
|
|
// then release all group contexts
|
|
AuList<AuThreads::ThreadShared_t> threads;
|
|
{
|
|
AU_LOCK_GUARD(this->rwlock_->AsWritable());
|
|
|
|
for (auto &[groupId, group] : this->threads_)
|
|
{
|
|
for (auto &[id, worker] : group->workers)
|
|
{
|
|
if (group->cvWorkMutex && group->cvVariable)
|
|
{
|
|
AU_LOCK_GUARD(group->cvWorkMutex);
|
|
worker->shuttingdown = true;
|
|
group->cvVariable->Broadcast();
|
|
}
|
|
else
|
|
{
|
|
worker->shuttingdown = true;
|
|
}
|
|
|
|
if (groupId != 0)
|
|
{
|
|
worker->threadObject->SendExitSignal();
|
|
threads.push_back(worker->threadObject);
|
|
}
|
|
|
|
auto &event = worker->running;
|
|
if (event)
|
|
{
|
|
event->Set();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Sync to shutdown threads to prevent a race condition whereby the async subsystem shuts down before the threads
|
|
for (const auto &thread : threads)
|
|
{
|
|
thread->Exit();
|
|
}
|
|
}
|
|
|
|
bool ThreadPool::Exiting()
|
|
{
|
|
return this->shuttingdown_ || GetThreadState()->exiting;
|
|
}
|
|
|
|
/**
 * Creates a work item bound to `worker` that will invoke `task`.
 * @return an empty pointer when `task` is null
 */
AuSPtr<IWorkItem> ThreadPool::NewWorkItem(const WorkerId_t &worker, const AuSPtr<IWorkItemHandler> &task, bool supportsBlocking)
{
    if (task)
    {
        return AuMakeShared<WorkItem>(this, worker, task, supportsBlocking);
    }

    return {};
}
|
|
|
|
// A fence: a work item with no handler and a default worker id that supports blocking waits.
AuSPtr<IWorkItem> ThreadPool::NewFence()
{
    constexpr bool kSupportsBlocking = true;
    return AuMakeShared<WorkItem>(this, WorkerId_t {}, AuSPtr<IWorkItemHandler> {}, kSupportsBlocking);
}
|
|
|
|
// Thread object backing worker `id`. Returns an empty handle for an unknown worker;
// previously this dereferenced a null worker state.
AuThreads::ThreadShared_t ThreadPool::ResolveHandle(WorkerId_t id)
{
    auto pWorker = GetThreadHandle(id);
    if (!pWorker)
    {
        return {};
    }

    return pWorker->threadObject;
}
|
|
|
|
// Snapshot, taken under the read lock, of every group id and the thread ids of its workers.
AuBST<ThreadGroup_t, AuList<ThreadId_t>> ThreadPool::GetThreads()
{
    AU_LOCK_GUARD(rwlock_->AsReadable());

    AuBST<ThreadGroup_t, AuList<ThreadId_t>> groups;

    for (const auto &[groupId, pGroup] : this->threads_)
    {
        auto &ids = groups[groupId];
        ids.reserve(pGroup->workers.size());

        for (const auto &workerEntry : pGroup->workers)
        {
            ids.push_back(workerEntry.second->id.second);
        }
    }

    return groups;
}
|
|
|
|
// Worker id recorded for the calling thread in this pool's per-thread slot.
WorkerId_t ThreadPool::GetCurrentThread()
{
    return static_cast<WorkerId_t>(tlsWorkerId);
}
|
|
|
|
/**
 * Barriers worker `workerId`, or every worker of its group in turn when
 * `workerId.second == Async::kThreadIdAny`.
 *
 * @param timeoutMs      0 = no limit. Otherwise a single budget shared by all the barriers;
 *                       each barrier previously got the full timeout.
 * @param requireSignal  park each synced worker until Signal(); never applied to the caller
 * @return false if the group is unknown (kThreadIdAny only), or if any barrier failed or timed out
 */
bool ThreadPool::Sync(WorkerId_t workerId, AuUInt32 timeoutMs, bool requireSignal)
{
    AU_LOCK_GUARD(this->rwlock_->AsReadable());

    auto currentWorkerId = GetCurrentThread().second;

    if (workerId.second != Async::kThreadIdAny)
    {
        return Barrier(workerId, timeoutMs, requireSignal && workerId.second != currentWorkerId, false);
    }

    auto group = GetGroup(workerId.first);
    if (!group)
    {
        return false;
    }

    auto startTime = Time::CurrentClockMS();

    for (auto &jobWorker : group->workers)
    {
        AuUInt32 remainingMs {};

        if (timeoutMs)
        {
            auto elapsedMs = Time::CurrentClockMS() - startTime;
            if (elapsedMs >= timeoutMs)
            {
                return false;
            }

            // Never 0 here: 0 would mean "wait forever"
            remainingMs = static_cast<AuUInt32>(timeoutMs - elapsedMs);
        }

        if (!Barrier(jobWorker.second->id, remainingMs, requireSignal && jobWorker.second->id.second != currentWorkerId, false))
        {
            return false;
        }
    }

    return true;
}
|
|
|
|
/**
 * Sets the `running` event of worker `workerId`, or of every worker in its group for
 * Async::kThreadIdAny. This releases workers parked by a requireSignal barrier.
 * Unknown groups/workers are ignored; previously they were dereferenced as null.
 */
void ThreadPool::Signal(WorkerId_t workerId)
{
    AU_LOCK_GUARD(this->rwlock_->AsReadable());

    if (workerId.second == Async::kThreadIdAny)
    {
        auto group = GetGroup(workerId.first);
        if (!group)
        {
            return;
        }

        for (auto &jobWorker : group->workers)
        {
            jobWorker.second->running->Set();
        }
    }
    else
    {
        auto pWorker = GetThreadHandle(workerId);
        if (!pWorker)
        {
            return;
        }

        pWorker->running->Set();
    }
}
|
|
|
|
// Loop source shared by the group of worker `workerId` (its `asyncLoopSourceShared`).
// Returns an empty pointer if the worker is unknown or its group has already been released;
// the expired weak parent was previously dereferenced.
AuSPtr<Loop::ILoopSource> ThreadPool::WorkerToLoopSource(WorkerId_t workerId)
{
    AU_LOCK_GUARD(this->rwlock_->AsReadable());

    auto pWorker = GetThreadHandle(workerId);
    if (!pWorker)
    {
        return {};
    }

    auto pGroup = pWorker->parent.lock();
    if (!pGroup)
    {
        return {};
    }

    return pGroup->asyncLoopSourceShared;
}
|
|
|
|
void ThreadPool::SyncAllSafe()
|
|
{
|
|
AU_LOCK_GUARD(this->rwlock_->AsReadable());
|
|
|
|
for (const auto &re : this->threads_)
|
|
{
|
|
for (auto &jobWorker : re.second->workers)
|
|
{
|
|
SysAssert(Barrier(jobWorker.second->id, 0, false, false));
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
 * Installs `feature` on worker `id` by dispatching a task that runs on that worker. The task
 * appends the feature to the worker's feature list (cleaned up in ThisExiting) and then calls
 * Init(). Unless `async` is set, blocks until that task has completed.
 */
void ThreadPool::AddFeature(WorkerId_t id, AuSPtr<AuThreads::IThreadFeature> feature, bool async)
{
    auto initOnWorker = [=]()
    {
        GetThreadState()->features.push_back(feature);
        feature->Init();
    };

    auto work = AuMakeShared<BasicWorkStdFunc>(initOnWorker);
    auto workItem = this->NewWorkItem(id, work, !async)->Dispatch();

    if (async)
    {
        return;
    }

    workItem->BlockUntilComplete();
}
|
|
|
|
// Debug check: the calling thread must be a worker of `group`.
void ThreadPool::AssertInThreadGroup(ThreadGroup_t group)
{
    SysAssert(static_cast<WorkerId_t>(tlsWorkerId).first == group);
}
|
|
|
|
// Debug check: the calling thread must be exactly worker `id`.
void ThreadPool::AssertWorker(WorkerId_t id)
{
    SysAssert(static_cast<WorkerId_t>(tlsWorkerId) == id);
}
|
|
|
|
bool ThreadPool::ScheduleLoopSource(const AuSPtr<Loop::ILoopSource> &loopSource, WorkerId_t workerId, AuUInt32 timeout, const AuConsumer<AuSPtr<Loop::ILoopSource>, bool> &callback)
|
|
{
|
|
auto thread = this->GetThreadHandle(workerId);
|
|
if (!thread)
|
|
{
|
|
return false;
|
|
}
|
|
|
|
auto group = thread->parent.lock();
|
|
|
|
{
|
|
AU_LOCK_GUARD(group->cvWorkMutex);
|
|
|
|
AsyncAppWaitSourceRequest req {};
|
|
req.startTime = Time::CurrentClockMS();
|
|
if (timeout)
|
|
{
|
|
req.requestedOffset = timeout;
|
|
req.endTime = req.startTime + timeout;
|
|
}
|
|
req.loopSource = loopSource;
|
|
req.callback = callback;
|
|
|
|
if (!AuTryInsert(thread->loopSources, req))
|
|
{
|
|
return false;
|
|
}
|
|
|
|
thread->inLoopSourceMode = thread->loopSources.size();
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
// Unimplemented fiber hooks, 'twas used for science
|
|
|
|
int ThreadPool::CtxPollPush()
|
|
{
|
|
// TOOD (Reece): implement a context switching library
|
|
// Refer to the old implementation of this on pastebin
|
|
return 0;
|
|
}
|
|
|
|
void ThreadPool::CtxPollReturn(const AuSPtr<ThreadState> &state, int status, bool hitTask)
|
|
{
|
|
}
|
|
|
|
bool ThreadPool::CtxYield()
|
|
{
|
|
bool ranAtLeastOne = false;
|
|
|
|
while (this->InternalRunOne(false))
|
|
{
|
|
ranAtLeastOne = true;
|
|
}
|
|
|
|
return ranAtLeastOne;
|
|
}
|
|
|
|
|
|
// internal api
|
|
|
|
/**
 * Registers worker `workerId`, creating its group on demand. Runs under the writable pool lock.
 *
 * @param workerId  group/thread id of the new worker; must not already exist
 * @param create    false: start a new thread running Entrypoint(workerId).
 *                  true: adopt the calling thread (see Create()). The calling thread must not
 *                  already be bound to a pool.
 * @return false if the worker exists, an allocation failed (now checked: group state, worker
 *         state, its `running` event and `syncSema` were previously used unchecked), or the
 *         thread could not be created
 */
bool ThreadPool::Spawn(WorkerId_t workerId, bool create)
{
    AU_LOCK_GUARD(rwlock_->AsWritable());

    if (GetCurrentWorkerPId().pool && create)
    {
        SysPushErrorGeneric("TODO (reece): add support for multiple runners per thread");
        return false;
    }

    AuSPtr<GroupState> group;

    // Try fetch or allocate group
    {
        AuSPtr<GroupState> *groupPtr {};
        if (!AuTryFind(this->threads_, workerId.first, groupPtr))
        {
            group = AuMakeShared<GroupState>();

            if (!group || !group->Init())
            {
                SysPushErrorMem("Not enough memory to intiialize a new group state");
                return false;
            }

            if (!AuTryInsert(this->threads_, AuMakePair(workerId.first, group)))
            {
                return false;
            }
        }
        else
        {
            group = *groupPtr;
        }
    }

    // Assert worker does not already exist
    {
        AuSPtr<ThreadState> *ret {};
        if (AuTryFind(group->workers, workerId.second, ret))
        {
            SysPushErrorGen("Thread ID already exists");
            return false;
        }
    }

    auto threadState = AuMakeShared<ThreadState>();
    if (!threadState)
    {
        SysPushErrorMem("Not enough memory to initialize a new worker state");
        return false;
    }

    threadState->parent = group;
    threadState->running = AuThreadPrimitives::EventUnique(true, false, true);
    threadState->syncSema = AuThreadPrimitives::SemaphoreUnique(0);
    threadState->id = workerId;
    //threadState->eventDriven = runner;

    // Barrier() and Signal() dereference these unconditionally
    if (!threadState->running || !threadState->syncSema)
    {
        SysPushErrorMem("Not enough memory to initialize a new worker state");
        return false;
    }

    if (!create)
    {
        threadState->threadObject = AuThreads::ThreadUnique(AuThreads::ThreadInfo(
            AuMakeShared<AuThreads::IThreadVectorsFunctional>(AuThreads::IThreadVectorsFunctional::OnEntry_t(std::bind(&ThreadPool::Entrypoint, this, threadState->id)),
                                                              AuThreads::IThreadVectorsFunctional::OnExit_t{}),
            gRuntimeConfig.async.threadPoolDefaultStackSize
        ));

        if (!threadState->threadObject)
        {
            return false;
        }

        // The new thread blocks on the pool lock (via GetThreadState) until we have inserted
        // the worker below and released our write lock
        threadState->threadObject->Run();
    }
    else
    {
        // Non-owning handle: the calling thread's object is not ours to destroy
        threadState->threadObject = AuSPtr<AuThreads::IAuroraThread>(AuThreads::GetThread(), [](AuThreads::IAuroraThread *){});

        // TODO: this is just a hack
        // we should implement this properly
        threadState->threadObject->AddLastHopeTlsHook(AuMakeShared<AuThreads::IThreadFeatureFunctional>([]() -> void
        {

        }, []() -> void
        {
            auto pid = GetCurrentWorkerPId();
            if (pid.pool)
            {
                AuStaticPointerCast<ThreadPool>(pid.pool)->ThisExiting();
            }
        }));

        // Bind the calling thread to this pool
        gCurrentPool = AuWeakFromThis();
        tlsWorkerId = WorkerPId_t(AuSharedFromThis(), workerId);
    }

    group->workers.insert(AuMakePair(workerId.second, threadState));
    return true;
}
|
|
|
|
// private api
|
|
|
|
/**
 * Queues a marker task on `workerId` and waits (up to `ms`; 0 = no limit) until it runs.
 *
 * @param requireSignal  after releasing us, the marker resets the worker's `running` event and
 *                       blocks on it, parking the worker until Signal()
 * @param drop           the marker flags the worker as `rejecting`
 * @return true if the marker ran (rather than being cancelled) before the timeout. Returns
 *         false when the caller is not a worker of this pool, which has no sync semaphore to
 *         wait on.
 */
bool ThreadPool::Barrier(WorkerId_t workerId, AuUInt32 ms, bool requireSignal, bool drop)
{
    // TODO: barrier multiple
    auto pCallerState = GetThreadState();
    if (!pCallerState)
    {
        return false;
    }

    auto &semaphore = pCallerState->syncSema;
    auto unsafeSemaphore = semaphore.get();

    // The cancellation callback can fire after we timed out and returned, so the flag must not
    // live on this stack frame. It used to be captured by reference, along with
    // `unsafeSemaphore`.
    auto pFailed = AuMakeShared<bool>(false);
    if (!pFailed)
    {
        return false;
    }

    auto work = AuMakeShared<AsyncFuncRunnable>(
        [=]()
        {
            auto state = GetThreadState();

            if (drop)
            {
                state->rejecting = true;
            }

            if (requireSignal)
            {
                state->running->Reset();
            }

            unsafeSemaphore->Unlock(1);

            if (requireSignal)
            {
                state->running->Lock();
            }
        },
        [=]()
        {
            // Publish the failure *before* waking the waiter, which reads it once woken
            *pFailed = true;
            unsafeSemaphore->Unlock(1);
        }
    );

    if (!work)
    {
        return false;
    }

    Run(workerId, work);

    // NOTE(review): a marker that completes after a timeout still Unlocks this worker's shared
    // syncSema, which can release a later barrier early
    return WaitFor(workerId, AuUnsafeRaiiToShared(semaphore), ms) && !*pFailed;
}
|
|
|
|
/**
 * Thread entry for workers started by Spawn(workerId, false).
 *
 * Sequence:
 * 1. Bind the thread to this pool.
 * 2. Run the worker loop.
 * 3. Unless the pool is shutting down or the worker already rejects work, pump with a dropping
 *    barrier. Worker {0, 0} skips this step.
 * 4. Deregister via ThisExiting().
 * 5. Worker {0, 0} finally calls CleanWorkerPoolReservedZeroFree().
 */
void ThreadPool::Entrypoint(WorkerId_t id)
{
    gCurrentPool = AuWeakFromThis();
    tlsWorkerId = WorkerPId_t(AuSharedFromThis(), id);

    auto job = GetThreadState();

    Run();

    const bool bIsReservedZero = (id == WorkerId_t {0, 0});

    if (!bIsReservedZero)
    {
        AU_LOCK_GUARD(this->rwlock_->AsReadable());

        const bool bStillAccepting = !this->shuttingdown_ && !job->rejecting;
        if (bStillAccepting)
        {
            // Pump and barrier + reject all after atomically
            Barrier(id, 0, false, true);
        }
    }

    ThisExiting();

    if (bIsReservedZero)
    {
        CleanWorkerPoolReservedZeroFree();
    }
}
|
|
|
|
void ThreadPool::ThisExiting()
|
|
{
|
|
auto id = GetCurrentThread();
|
|
auto state = GetGroup(id.first);
|
|
|
|
{
|
|
AU_LOCK_GUARD(this->rwlock_->AsWritable());
|
|
|
|
auto itr = state->workers.find(id.second);
|
|
auto &jobWorker = itr->second;
|
|
|
|
CleanUpWorker(id);
|
|
|
|
// Abort scheduled tasks
|
|
TerminateSceduledTasks(this, id);
|
|
|
|
// Clean up thread features
|
|
// -> transferable TLS handles
|
|
// -> thread specific vms
|
|
// -> anything your brain wishes to imagination
|
|
for (const auto &thread : jobWorker->features)
|
|
{
|
|
try
|
|
{
|
|
thread->Cleanup();
|
|
}
|
|
catch (...)
|
|
{
|
|
AuLogWarn("Couldn't clean up thread feature!");
|
|
Debug::PrintError();
|
|
}
|
|
}
|
|
|
|
jobWorker->features.clear();
|
|
|
|
state->workers.erase(itr);
|
|
}
|
|
}
|
|
|
|
// State of thread group `type`, or an empty pointer if no such group was ever spawned.
AuSPtr<GroupState> ThreadPool::GetGroup(ThreadGroup_t type)
{
    AU_LOCK_GUARD(this->rwlock_->AsReadable());

    AuSPtr<GroupState> *ppGroup {};
    if (AuTryFind(this->threads_, type, ppGroup))
    {
        return *ppGroup;
    }

    return {};
}
|
|
|
|
/**
 * State of the worker bound to the calling thread. Returns an empty pointer when the calling
 * thread is not a registered worker of this pool, or when its group does not exist.
 *
 * This uses a lookup instead of `workers[...]`. operator[] inserted a null entry for unknown
 * workers, mutating the map while holding only the read lock. Such an entry also made a later
 * Spawn() of that id fail with "Thread ID already exists". A missing group used to be
 * dereferenced.
 */
AuSPtr<ThreadState> ThreadPool::GetThreadState()
{
    AU_LOCK_GUARD(this->rwlock_->AsReadable());

    auto id = GetCurrentThread();

    auto group = GetGroup(id.first);
    if (!group)
    {
        return {};
    }

    AuSPtr<ThreadState> *ret {};
    if (!AuTryFind(group->workers, id.second, ret))
    {
        return {};
    }

    return *ret;
}
|
|
|
|
// State of worker `id`, or an empty pointer if its group or the worker itself is unknown.
AuSPtr<ThreadState> ThreadPool::GetThreadHandle(WorkerId_t id)
{
    AU_LOCK_GUARD(this->rwlock_->AsReadable());

    auto pGroup = GetGroup(id.first);

    AuSPtr<ThreadState> *ppWorker {};
    if (pGroup && AuTryFind(pGroup->workers, id.second, ppWorker))
    {
        return *ppWorker;
    }

    return {};
}
|
|
|
|
// Creates a new thread pool. The scheduler is started here (StartSched) rather than at
// startup, so applications that never use async do not pay for it.
AUKN_SYM AuSPtr<IThreadPool> NewThreadPool()
{
    StartSched();

    auto pool = AuMakeShared<ThreadPool>();
    return pool;
}
|
|
} |