AuroraRuntime/Source/Async/ThreadPool.cpp

1564 lines
43 KiB
C++
Raw Normal View History

/***
Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: ThreadPool.cpp
Date: 2021-10-30
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "Async.hpp"
#include "ThreadPool.hpp"
#include "AsyncApp.hpp"
#include "WorkItem.hpp"
#include "Schedular.hpp"
2022-03-10 15:35:01 +00:00
#include "ThreadWorkerQueueShim.hpp"
namespace Aurora::Async
{
//STATIC_TLS(WorkerId_t, tlsWorkerId);
// Per-thread weak reference to a thread pool. Within this file it is assigned by
// ThreadPool::Run(), ThreadPool::PollInternal() and ThreadPool::Spawn(), and read by GetCurrentWorkerPId().
static thread_local AuWPtr<ThreadPool> gCurrentPool;
// Number of enqueues (GroupState::dirty) after which Run(target, runnable) clears the group's
// `sorted` flag so that PollInternal() may re-sort the work queue.
static const auto kMagicResortThreshold = 15;
// Per-thread counter that PollInternal() increments before and decrements after each RunAsync() dispatch.
static thread_local int tlsCallStack;
// Resolves an IThreadPool handle to its ThreadPool implementation.
// When `pool` is the global gAsyncApp object, the result is built from gAsyncApp through
// AuUnsafeRaiiToShared; otherwise `pool` itself is static-pointer-cast to ThreadPool.
inline auto GetWorkerInternal(const AuSPtr<IThreadPool> &pool)
{
    const bool bIsGlobalApp = (pool.get() == AuStaticCast<IAsyncApp>(gAsyncApp));

    return bIsGlobalApp ?
           AuUnsafeRaiiToShared(AuStaticCast<ThreadPool>(gAsyncApp)) :
           AuStaticPointerCast<ThreadPool>(pool);
}
// Returns the pool-qualified worker id of the calling thread.
// A default-constructed WorkerPId_t is returned when this thread has no live current pool.
AUKN_SYM WorkerPId_t GetCurrentWorkerPId()
{
    if (auto pCurrentPool = gCurrentPool.lock())
    {
        // Snapshot of the id stored in the pool's thread-local slot
        auto localId = *pCurrentPool->tlsWorkerId;

        // NOTE(review): the locked handle is not used afterwards; kept to mirror the existing behaviour
        auto pOwner = localId.pool.lock();

        return WorkerPId_t(pCurrentPool, localId);
    }

    return {};
}
//
// Constructs the pool by allocating its reader/writer lock.
// Asserts (SysAssert) when the lock could not be created.
ThreadPool::ThreadPool()
{
this->rwlock_ = AuThreadPrimitives::RWLockUnique();
SysAssert(static_cast<bool>(this->rwlock_), "Couldn't initialize ThreadPool. Unable to allocate an RWLock");
}
// internal pool interface
// Convenience overload: qualifies `unlocker` with the calling thread's current pool
// (as reported by GetCurrentWorkerPId) and forwards to the WorkerPId_t overload.
bool ThreadPool::WaitFor(WorkerId_t unlocker, const AuSPtr<Threading::IWaitable> &primitive, AuUInt32 timeoutMs)
{
    const WorkerPId_t qualifiedUnlocker { AuAsync::GetCurrentWorkerPId().pool, unlocker };
    return this->WaitFor(qualifiedUnlocker, primitive, timeoutMs);
}
// Waits on `primitive`, which is expected to be released by the worker identified by `unlocker`.
//
// Three paths exist:
//  1) The caller has no thread state: plain Threading::WaitFor with `timeoutMs`.
//  2) `unlocker` resolves to the calling worker itself (same group, this pool, and either the same
//     worker index or kThreadIdAny on a single-worker group): the wait is interleaved with CtxYield()
//     so that the calling thread keeps running queued work. `timeoutMs` is not consulted on this path.
//  3) Otherwise: the primitive is registered in the target worker's externalFences list for the
//     duration of a blocking Threading::WaitFor with `timeoutMs`.
//
// Returns true when the primitive was acquired; false on timeout, failed TryLock, or when the pool is
// shutting down while the self-wait loop could not make progress.
bool ThreadPool::WaitFor(WorkerPId_t unlocker, const AuSPtr<Threading::IWaitable> &primitive, AuUInt32 timeoutMs)
{
auto curThread = GetThreadState();
if (!curThread)
{
// Path 1: caller has no thread state
return Threading::WaitFor(primitive.get(), timeoutMs);
}
bool workerIdMatches = (unlocker.second == curThread->id.second) || ((unlocker.second == Async::kThreadIdAny) && (GetThreadWorkersCount(unlocker.first) == 1));
if ((unlocker.first == curThread->id.first) &&
(unlocker.pool.get() == this) && // work group matches
(workerIdMatches)) // the thread expected to unlock is the calling thread
{
// Path 2: wait in 2ms slices; after a yield that reported work, only poll the primitive with TryLock
bool queryAsync = false;
while (!(queryAsync ? primitive->TryLock() : Threading::WaitFor(primitive.get(), 2)))
{
queryAsync = CtxYield();
// CtxYield() returned false and the pool is going down: give up
if (!queryAsync && this->shuttingdown_)
{
return false;
}
}
return true;
}
else
{
// Path 3: the unlocker is another worker
AuSPtr<ThreadState> pHandle;
{
// Look up the target worker under the owning pool's read lock
AU_LOCK_GUARD(AuStaticCast<ThreadPool>(unlocker.pool)->rwlock_->AsReadable());
if (pHandle = AuStaticCast<ThreadPool>(unlocker.pool)->GetThreadHandle(unlocker))
{
AU_LOCK_GUARD(pHandle->externalFencesLock);
if (pHandle->exitingflag2)
{
// Target has exitingflag2 set: do not block, just attempt to take the primitive
return primitive->TryLock();
}
else
{
// Register the raw primitive pointer with the target worker; removed again after the wait below
pHandle->externalFences.push_back(primitive.get());
}
}
else if (unlocker.pool.get() == this)
{
// No such worker in this pool: do not block
return primitive->TryLock();
}
}
bool bRet = Threading::WaitFor(primitive.get(), timeoutMs);
if (pHandle)
{
// Unregister the fence regardless of the wait result
AU_LOCK_GUARD(pHandle->externalFencesLock);
AuTryRemove(pHandle->externalFences, primitive.get());
}
return bRet;
}
}
// Enqueues `runnable` onto the work queue of the group `target.first`.
//
// target   - group id (first) and worker index (second); kThreadIdAny lets any worker of the group take it.
// runnable - the task; CancelAsync() is invoked on it when it cannot be queued.
//
// Asserts when the target group does not exist. The pool-wide running task counter is incremented up
// front and decremented again on every rejection path.
void ThreadPool::Run(WorkerId_t target, AuSPtr<IAsyncRunnable> runnable)
{
    auto state = GetGroup(target.first);
    SysAssert(static_cast<bool>(state), "couldn't dispatch a task to an offline group");

    IncrementTasksRunning();

    {
        AU_LOCK_GUARD(state->cvWorkMutex);

    #if defined(STAGING) || defined(DEBUG)
        // Debug/staging builds only: verify that somebody is able to accept the work
        AU_LOCK_GUARD(rwlock_->AsReadable());

        if (target.second != Async::kThreadIdAny)
        {
            auto itr = state->workers.find(target.second);
            if ((itr == state->workers.end()) || (itr->second->rejecting))
            {
                SysPushErrorGeneric("worker: {}:{} is offline", target.first, target.second);
                DecrementTasksRunning();
            #if 0
                throw "Requested job worker is offline";
            #else
                runnable->CancelAsync();
                return;
            #endif
            }
        }
        else
        {
            // Iterate the worker table in place; no copy of the table is required for this scan
            bool found = false;
            for (const auto &worker : state->workers)
            {
                if (!worker.second->rejecting)
                {
                    found = true;
                    break;
                }
            }

            if (!found)
            {
                DecrementTasksRunning();
            #if 0
                throw "No workers available";
            #else
                runnable->CancelAsync();
                return;
            #endif
            }
        }
    #endif

        if (!AuTryInsert(state->workQueue, AuMakePair(target.second, runnable)))
        {
            DecrementTasksRunning();
            runnable->CancelAsync();
            return;
        }

        // After enough insertions, flag the queue as unsorted so PollInternal() may re-sort it
        state->dirty++;
        if (state->dirty > kMagicResortThreshold)
        {
            state->dirty = 0;
            state->sorted = false;
        }

        state->eventLs->Set();
    }

    if (target.second == Async::kThreadIdAny)
    {
        // Any worker may take the item: wake a single waiter
        state->cvVariable->Signal();
    }
    else
    {
        // sad :(
        // A specific worker was requested, but all workers of the group share one condition variable
        // TODO: when we have wait any, add support (^ the trigger) for it here
        state->cvVariable->Broadcast();
    }
}
// Returns this object through its IThreadPool interface.
IThreadPool *ThreadPool::ToThreadPool()
{
return this;
}
// Bumps the pool-wide count of queued/running tasks by one.
void ThreadPool::IncrementTasksRunning()
{
    ++this->tasksRunning_;
}
// Drops the pool-wide task count by one.
// When the count reaches zero while the pool is in runner mode, the pool is shut down.
void ThreadPool::DecrementTasksRunning()
{
    const auto remaining = --this->tasksRunning_;
    if (remaining != 0)
    {
        return;
    }

    if (!InRunnerMode())
    {
        return;
    }

    Shutdown();
}
// ithreadpool
// Returns the number of workers registered in `group`, read under the pool's read lock.
// Returns 0 when the group does not exist.
size_t ThreadPool::GetThreadWorkersCount(ThreadGroup_t group)
{
    AU_LOCK_GUARD(this->rwlock_->AsReadable());

    auto pGroup = GetGroup(group);
    if (!pGroup)
    {
        return 0;
    }

    return pGroup->workers.size();
}
// Stores the runner-mode flag that InRunnerMode() reports.
// In runner mode, DecrementTasksRunning() and PollInternal() shut the pool down once no tasks remain.
void ThreadPool::SetRunningMode(bool eventRunning)
{
this->runnersRunning_ = eventRunning;
}
// Registers `workerId` through the internal Spawn overload with create = false.
bool ThreadPool::Spawn(WorkerId_t workerId)
{
    constexpr bool kCreate = false;
    return this->Spawn(workerId, kCreate);
}
// Registers `workerId` through the internal Spawn overload with create = true.
bool ThreadPool::Create(WorkerId_t workerId)
{
    constexpr bool kCreate = true;
    return this->Spawn(workerId, kCreate);
}
// Reports the flag last stored by SetRunningMode().
bool ThreadPool::InRunnerMode()
{
return this->runnersRunning_;
}
bool ThreadPool::Poll()
{
AuUInt32 uCount {};
return InternalRunOne(false, uCount);
}
bool ThreadPool::RunOnce()
{
AuUInt32 uCount {};
return InternalRunOne(true, uCount);
}
// Pumps the calling worker until its thread is exiting, the pool has shut down, or the worker's
// bBreakEarly flag is set.
//
// Returns true when at least one loop iteration completed; false when the caller is not an async
// worker of this pool or the pool shut down before the first iteration finished.
bool ThreadPool::Run()
{
    bool ranOnce {};

    gCurrentPool = AuWeakFromThis();

    auto auThread = AuThreads::GetThread();
    auto job = GetThreadState();
    if (!job)
    {
        SysPushErrorUninitialized("Not an async thread");
        // Without a thread state there is nothing to pump; bail out instead of dereferencing null below
        return ranOnce;
    }

    while ((!auThread->Exiting()) &&
           (!this->shutdown) &&
           (!job->bBreakEarly))
    {
        AuUInt32 uCount {};

        // Do work (blocking)
        if (!InternalRunOne(true, uCount))
        {
            if (this->shutdown)
            {
                return ranOnce;
            }
        }

        ranOnce = true;
    }

    return ranOnce;
}
bool ThreadPool::InternalRunOne(bool block, AuUInt32 &uCount)
{
auto state = GetThreadState();
if (!state)
{
SysPushErrorUninitialized("Not an async thread");
}
bool success {};
2022-03-10 15:35:01 +00:00
auto runMode = GetCurrentThreadRunMode();
EarlyExitTick();
2022-03-10 15:35:01 +00:00
//do
{
2022-03-10 15:35:01 +00:00
auto asyncLoop = state->asyncLoop;
asyncLoop->OnFrame();
if (asyncLoop->GetSourceCount() > 1)
{
2022-03-10 15:35:01 +00:00
bool bShouldTrySleepForKernel {};
if (runMode == ERunMode::eLowLatencyFreqKernel)
{
if (state->rateLimiter.CheckExchangePass())
{
#if defined(AURORA_PLATFORM_WIN32)
bShouldTrySleepForKernel = asyncLoop->PumpNonblocking();
#else
bShouldTrySleepForKernel = asyncLoop->IsSignaledPeek();
#endif
2022-03-10 15:35:01 +00:00
}
else
{
if (!PollInternal(false, uCount))
2022-03-10 15:35:01 +00:00
{
AuThreading::ContextYield();
}
else
{
success = true;
}
}
}
else if (runMode == ERunMode::eLowLatencyYield)
{
AuThreading::ContextYield();
block = false;
#if defined(AURORA_PLATFORM_WIN32)
bShouldTrySleepForKernel = asyncLoop->PumpNonblocking();
#else
bShouldTrySleepForKernel = asyncLoop->IsSignaledPeek();
#endif
2022-03-10 15:35:01 +00:00
}
else if (runMode == ERunMode::eEfficient)
{
bShouldTrySleepForKernel = block;
if (!block)
{
bShouldTrySleepForKernel = asyncLoop->IsSignaledPeek();
2022-03-10 15:35:01 +00:00
}
}
if (bShouldTrySleepForKernel
// epoll and such like can be checked without read success. kevent works on availablity, not scheduling read like iosubmit
// allow windows to atomically pump instead of wasting time buffering the primitives state
&& ((AuBuild::kIsNtDerived && runMode == ERunMode::eEfficient) ||
(!AuBuild::kIsNtDerived))
&& (asyncLoop->WaitAny(0))
)
2022-03-10 15:35:01 +00:00
{
success = PollInternal(false, uCount);
2022-03-10 15:35:01 +00:00
}
else
{
success |= PollInternal(false, uCount);
2022-03-10 15:35:01 +00:00
}
}
else
{
success = PollInternal(block, uCount);
}
} //while (success);
EarlyExitTick();
return success;
}
bool ThreadPool::PollInternal(bool block, AuUInt32 &uCount)
{
auto state = GetThreadState();
if (!state)
{
SysPushErrorUninitialized("Not an async thread");
}
auto group = state->parent.lock();
//state->pendingWorkItems.clear();
auto magic = CtxPollPush();
{
AU_LOCK_GUARD(group->cvWorkMutex);
// TODO: reimplement this
// this is stupid and gross
2022-03-10 15:35:01 +00:00
if (group->workQueue.size() > group->workers.size()*3)
{
if (!group->sorted)
{
auto cpy = group->workQueue;
std::sort(group->workQueue.begin(), group->workQueue.end(), [&](const WorkEntry_t &a, const WorkEntry_t &b)
{
if (a.second->GetPrio() != b.second->GetPrio())
return a.second->GetPrio() > b.second->GetPrio();
AuUInt32 ia {}, ib {};
for (; ia < cpy.size(); ia++)
if (cpy[ia].second == a.second)
break;
for (; ib < cpy.size(); ib++)
if (cpy[ib].second == b.second)
break;
return ia < ib;
});
group->sorted = true;
group->dirty = 0;
}
}
do
{
// Deque tasks the current thread runner could dipatch
// Noting that `multipopCount` determines how aggressive threads are in dequeuing work
// It's probable `multipopCount` will equal 1 for your use case
//
// Only increment when you know tasks within a group queue will not depend on one another
// *and* tasks require a small amount of execution time
//
// This could be potentially useful for an event dispatcher whereby you're dispatching
// hundreds of items per second, across a thread or two, knowing dequeuing one instead of all
// is a waste of CPU cycles.
//
// Remember, incrementing `multipopCount` is potentially dangerous the second you have local
// thread group waits
for (auto itr = group->workQueue.begin();
((itr != group->workQueue.end()) &&
(state->pendingWorkItems.size() < state->multipopCount));
)
{
// TODO: catch low memory condition
if (itr->first == Async::kThreadIdAny)
{
state->pendingWorkItems.push_back(*itr);
itr = group->workQueue.erase(itr);
continue;
}
if ((itr->first != Async::kThreadIdAny) && (itr->first == state->id.second))
{
state->pendingWorkItems.push_back(*itr);
itr = group->workQueue.erase(itr);
continue;
}
itr++;
}
// Consider blocking for more work
if (!block)
{
break;
}
// pre-wakeup thread terminating check
if (state->threadObject->Exiting())
{
break;
}
// Block if no work items are present
if (state->pendingWorkItems.empty())
{
if (this->shuttingdown_)
{
//this->EarlyExitTick();
break;
}
group->cvVariable->WaitForSignal();
if (this->shuttingdown_)
{
//this->EarlyExitTick();
break;
}
}
// Post-wakeup thread terminating check
if (state->threadObject->Exiting())
{
break;
}
if (state->pendingWorkItems.empty() && (
(this->GetThreadState()->asyncLoop->GetSourceCount() > 1) ||
this->GetThreadState()->asyncLoop->CommitPending())) //(this->ToKernelWorkQueue()->IsSignaledPeek()))
{
return false;
}
} while (state->pendingWorkItems.empty() && block);
if (group->workQueue.empty())
{
group->eventLs->Reset();
}
}
if (state->pendingWorkItems.empty())
{
CtxPollReturn(state, magic, false);
return false;
}
int runningTasks {};
2022-01-19 17:08:13 +00:00
auto oldTlsHandle = AuExchange(gCurrentPool, AuSharedFromThis());
bool lowPrioCont {};
bool lowPrioContCached {};
state->cookie++;
int start = state->cookie;
// Account for
// while (AuAsync.GetCurrentPool()->runForever());
// in the first task (or deeper)
if (InRunnerMode() && tlsCallStack) // are we one call deep?
{
auto queue = ToKernelWorkQueue();
if ((this->tasksRunning_ == tlsCallStack) &&
(!queue || queue->GetSourceCount() <= 1))
{
return false;
}
}
//
for (auto itr = state->pendingWorkItems.begin(); itr != state->pendingWorkItems.end(); )
{
if (state->threadObject->Exiting() || this->shutdown)
{
break;
}
// Set the last frame time for a watchdog later down the line
state->lastFrameTime = Time::CurrentClockMS();
if (itr->second->GetPrio() < 0.25)
{
2022-03-10 15:35:01 +00:00
group->sorted = false;
if (lowPrioCont)
{
itr++;
continue;
}
if (!lowPrioContCached)
{
AU_LOCK_GUARD(group->cvWorkMutex);
{
for (const auto &[pendingWorkA, pendingWorkB] : group->workQueue)
{
if (pendingWorkB->GetPrio() > .5)
{
lowPrioCont = true;
break;
}
}
}
lowPrioContCached = true;
if (lowPrioCont)
{
itr++;
continue;
}
}
}
// Dispatch
2022-05-19 03:07:10 +00:00
auto oops = itr->second;
// Remove from our local job queue
itr = state->pendingWorkItems.erase(itr);
tlsCallStack++;
//SysBenchmark(fmt::format("RunAsync: {}", block));
2022-05-19 03:07:10 +00:00
// Dispatch
oops->RunAsync();
uCount++;
2022-05-19 03:07:10 +00:00
// Atomically decrement global task counter
runningTasks = this->tasksRunning_.fetch_sub(1) - 1;
tlsCallStack--;
if (start != state->cookie)
{
start = state->cookie;
itr = state->pendingWorkItems.begin();
}
}
gCurrentPool = oldTlsHandle;
// Return popped work back to the groups work pool when our -pump loops were preempted
if (state->pendingWorkItems.size())
{
AU_LOCK_GUARD(group->cvWorkMutex);
// TODO: low memory condition slow path
group->workQueue.insert(group->workQueue.end(), state->pendingWorkItems.begin(), state->pendingWorkItems.end());
group->eventLs->Set();
state->pendingWorkItems.clear();
}
CtxPollReturn(state, magic, true);
// Account for
// while (AuAsync.GetCurrentPool()->runForever());
// in the top most task
if (InRunnerMode())
{
auto queue = ToKernelWorkQueue();
if ((runningTasks == 0) &&
(this->tasksRunning_ == 0 ) &&
(!queue || queue->GetSourceCount() <= 1))
{
Shutdown();
}
}
return true;
}
// Shuts the pool down in stages:
//  1) flag the pool as shutting down (returns immediately if a previous call already did so),
//  2) stop the scheduler and barrier every other worker against already submitted work,
//  3) flag every worker as shutting down, wake them and send exit signals,
//  4) barrier once more, wait for the worker threads to exit, then set the `shutdown` flag.
// The calling worker, if it belongs to this pool, is never barriered against.
void ThreadPool::Shutdown()
{
    auto trySelfPid = AuAsync::GetCurrentWorkerPId();

    // Update shutting down flag
    // Specify the root-level shutdown flag for 'ok, u can work, but you're shutting down soon [microseconds, probably]'
    {
        if (AuAtomicTestAndSet(&this->shuttingdown_, 0) != 0)
        {
            return;
        }
    }

    auto pLocalRunner = this->GetThreadStateNoWarn();

    AuList<WorkerId_t> toBarrier;

    // Noting
    // 1) that StopSched may lockup under a writable lock
    //    -> we will terminate a thread that may be dispatching a sys pump event
    // 2) that barrier doesn't need to be under a write lock
    //
    // Perform the following shutdown of the schedular and other available threads under a read lock
    {
        {
            AU_LOCK_GUARD(this->rwlock_->AsReadable());

            StopSched();

            for (auto &[groupId, group] : this->threads_)
            {
                for (auto &[id, worker] : group->workers)
                {
                    if (trySelfPid == worker->id)
                    {
                        continue;
                    }

                    toBarrier.push_back(worker->id);
                }
            }
        }

        // Ehhhh
        // We need this fix to a specific V8 deinit lockup
        //this->Poll();

        for (const auto &id : toBarrier)
        {
            if (trySelfPid == id)
            {
                continue;
            }

            this->Barrier(id, 0, false, false /* no reject*/); // absolute safest point in shutdown; sync to already submitted work
        }
    }

    // Second stage flag
    // Specify the root-level shutdown flag for 'ok, u can work, but you're shutting down after sync barrier'
    {
        AuAtomicTestAndSet(&this->shuttingdown_, 1);
    }

    // Finally set the shutdown flag on all of our thread contexts
    // then release them from the runners/workers list
    // then release all group contexts
    AuList<AuThreads::ThreadShared_t> threads;
    {
        AU_LOCK_GUARD(this->rwlock_->AsWritable());

        for (auto &[groupId, group] : this->threads_)
        {
            for (auto &[id, worker] : group->workers)
            {
                if (group->cvWorkMutex && group->cvVariable)
                {
                    // Best effort: set the flag under the work mutex when it can be taken, then wake every waiter
                    bool bLocked = group->cvWorkMutex->TryLock();
                    worker->shuttingdown = true;
                    group->cvVariable->Broadcast();
                    if (bLocked) group->cvWorkMutex->Unlock();
                }
                else
                {
                    worker->shuttingdown = true;
                }

                if (!group->IsSysThread()) // bug?
                {
                    worker->threadObject->SendExitSignal();
                    threads.push_back(worker->threadObject);
                }

                auto &event = worker->running;
                if (event)
                {
                    event->Set();
                }
            }
        }
    }

    // Final sync to exit
    {
        for (const auto &id : toBarrier)
        {
            if (trySelfPid == id)
            {
                continue;
            }

            auto handle = this->GetThreadHandle(id);
            if (handle)
            {
                handle->rejecting = false;
            }

            this->Barrier(id, 0, false, true);
        }
    }

    // Sync to shutdown threads to prevent a race condition whereby the async subsystem shuts down before the threads
    for (const auto &thread : threads)
    {
        thread->Exit();
    }

    // Is dead flag
    this->shutdown = true;

    if (pLocalRunner)
    {
        pLocalRunner->bIsKiller = true;
    }
}
// Returns true when the pool is shutting down or the calling worker's state is flagged as exiting.
// A caller without a thread state only observes the pool-wide flag.
bool ThreadPool::Exiting()
{
    if (this->shuttingdown_)
    {
        return true;
    }

    auto pState = GetThreadState();
    return pState && pState->exiting;
}
// Runs one non-blocking pump and returns the number of tasks executed.
// When no task was counted: returns the pump's boolean result (0 or 1) if bStrict is set, otherwise 0.
AuUInt32 ThreadPool::PollAndCount(bool bStrict)
{
    AuUInt32 uExecuted {};
    const bool bPumped = this->InternalRunOne(false, uExecuted);

    if (uExecuted)
    {
        return uExecuted;
    }

    return bStrict ? bPumped : 0;
}
// Pumps the calling worker without blocking until a pass executes no tasks.
// Returns the total number of tasks executed across all passes (0 when nothing ran).
AuUInt32 ThreadPool::RunAllPending()
{
    AuUInt32 uTotal {};
    AuUInt32 uCount {};

    do
    {
        // Per-pass counter; the loop ends on the first pass that executes nothing
        uCount = 0;
        this->InternalRunOne(false, uCount);
        uTotal += uCount;
    }
    while (uCount);

    // The per-pass counter is always zero here, so report the accumulated total instead
    return uTotal;
}
// Creates a work item bound to `worker`, qualified with the calling thread's current pool.
// A null `task` is passed through as a null result.
AuSPtr<IWorkItem> ThreadPool::NewWorkItem(const WorkerId_t &worker,
                                          const AuSPtr<IWorkItemHandler> &task,
                                          bool bSupportsBlocking)
{
    if (task)
    {
        return AuMakeShared<WorkItem>(this, WorkerPId_t { AuAsync::GetCurrentWorkerPId().pool, worker }, task, bSupportsBlocking);
    }

    // Error pass-through
    return {};
}
// Creates a work item with no handler, addressed to the calling worker, with the last
// constructor argument (passed as bSupportsBlocking in NewWorkItem) set to true.
AuSPtr<IWorkItem> ThreadPool::NewFence()
{
return AuMakeShared<WorkItem>(this, AuAsync::GetCurrentWorkerPId(), AuSPtr<IWorkItemHandler>{}, true);
}
// Looks up the thread object backing worker `id`; returns an empty handle when the worker is unknown.
AuThreads::ThreadShared_t ThreadPool::ResolveHandle(WorkerId_t id)
{
    if (auto pWorker = GetThreadHandle(id))
    {
        return pWorker->threadObject;
    }

    return {};
}
// Builds a snapshot, taken under the pool's read lock, that maps every group id to the list of
// worker indices registered in that group.
AuBST<ThreadGroup_t, AuList<ThreadId_t>> ThreadPool::GetThreads()
{
    AU_LOCK_GUARD(rwlock_->AsReadable());

    AuBST<ThreadGroup_t, AuList<ThreadId_t>> snapshot;

    for (const auto &[groupId, pGroup] : this->threads_)
    {
        AuList<ThreadId_t> workerIds;
        AuTryReserve(workerIds, pGroup->workers.size());

        for (const auto &[workerKey, pWorker] : pGroup->workers)
        {
            workerIds.push_back(pWorker->id.second);
        }

        snapshot[groupId] = workerIds;
    }

    return snapshot;
}
// Returns the worker id held in this pool's thread-local tlsWorkerId slot for the calling thread.
WorkerId_t ThreadPool::GetCurrentThread()
{
return tlsWorkerId;
}
// Barriers against one worker, or against every worker of a group when workerId.second is kThreadIdAny.
//
// timeoutMs     - forwarded to each Barrier() call.
// requireSignal - forwarded to Barrier(), except that it is forced to false for the calling worker itself.
//
// Returns false as soon as one barrier fails, or when the requested group does not exist.
bool ThreadPool::Sync(WorkerId_t workerId, AuUInt32 timeoutMs, bool requireSignal)
{
    AU_LOCK_GUARD(this->rwlock_->AsReadable());

    auto group = GetGroup(workerId.first);
    auto currentWorkerId = GetCurrentThread().second;

    if (workerId.second == Async::kThreadIdAny)
    {
        if (!group)
        {
            SysPushErrorGeneric("Couldn't find requested group");
            return false;
        }

        for (auto &jobWorker : group->workers)
        {
            if (!Barrier(jobWorker.second->id, timeoutMs, requireSignal && jobWorker.second->id.second != currentWorkerId, false)) // BAD!, should subtract time elapsed, clamp to, i dunno, 5ms min?
            {
                return false;
            }
        }
    }
    else
    {
        return Barrier(workerId, timeoutMs, requireSignal && workerId.second != currentWorkerId, false);
    }

    return true;
}
// Sets the `running` event of one worker, or of every worker in the group when workerId.second is
// kThreadIdAny. Unknown groups, unknown workers and workers without an event are skipped.
void ThreadPool::Signal(WorkerId_t workerId)
{
    AU_LOCK_GUARD(this->rwlock_->AsReadable());

    if (workerId.second == Async::kThreadIdAny)
    {
        auto group = GetGroup(workerId.first);
        if (!group)
        {
            return;
        }

        for (auto &jobWorker : group->workers)
        {
            if (jobWorker.second->running)
            {
                jobWorker.second->running->Set();
            }
        }
    }
    else
    {
        auto pHandle = GetThreadHandle(workerId);
        if (pHandle && pHandle->running)
        {
            pHandle->running->Set();
        }
    }
}
[*/+/-] MEGA COMMIT. ~2 weeks compressed. The intention is to quickly improve and add util apis, enhance functionality given current demands, go back to the build pipeline, finish that, publish runtime tests, and then use what we have to go back to to linux support with a more stable api. [+] AuMakeSharedArray [+] Technet ArgvQuote [+] Grug subsystem (UNIX signal thread async safe ipc + telemetry flusher + log flusher.) [+] auEndianness -> Endian swap utils [+] AuGet<N>(...) [*] AUE_DEFINE conversion for ECompresionType, EAnsiColor, EHashType, EStreamError, EHexDump [+] ConsoleMessage ByteBuffer serialization [+] CmdLine subsystem for parsing command line arguments and simple switch/flag checks [*] Split logger from console subsystem [+] StartupParameters -> A part of a clean up effort under Process [*] Refactor SysErrors header + get caller hack [+] Atomic APIs [+] popcnt [+] Ring Buffer sink [+] Added more standard errors Catch, Submission, LockError, NoAccess, ResourceMissing, ResourceLocked, MalformedData, InSandboxContext, ParseError [+] Added ErrorCategorySet, ErrorCategoryClear, GetStackTrace [+] IExitSubscriber, ETriggerLevel [*] Write bias the high performance RWLockImpl read-lock operation operation [+] ExitHandlerAdd/ExitHandlerRemove (exit subsystem) [*] Updated API style Digests [+] CpuId::CpuBitCount [+] GetUserProgramsFolder [+] GetPackagePath [*] Split IStreamReader with an inl file [*] BlobWriter/BlobReader/BlobArbitraryReader can now take shared pointers to bytebuffers. 
default constructor allocates a new scalable bytebuffer [+] ICharacterProvider [+] ICharacterProviderEx [+] IBufferedCharacterConsumer [+] ProviderFromSharedString [+] ProviderFromString [+] BufferConsumerFromProvider [*] Parse Subsystem uses character io bufferer [*] Rewritten NT's high perf semaphore to use userland SRW/ConVars [like mutex, based on generic semaphore] [+] ByteBuffer::ResetReadPointer [*] Bug fix bytebuffer base not reset on free and some scaling issues [+] ProcessMap -> Added kSectionNameStack, kSectionNameFile, kSectionNameHeap for Section [*] ProcessMap -> Refactor Segment to Section. I was stupid for keeping a type conflict hack API facing [+] Added 64 *byte* fast RNG seeds [+] File Advisorys/File Lock Awareness [+] Added extended IAuroraThread from OS identifier caches for debug purposes [*] Tweaked how memory is reported on Windows. Better consistency of what values mean across functions. [*] Broke AuroraUtils/Typedefs out into a separate library [*] Update build script [+] Put some more effort into adding detail to the readme before rewriting it, plus, added some media [*] Improved public API documentation [*] Bug fix `SetConsoleCtrlHandler` [+] Locale TimeDateToFileNameISO8601 [+] Console config stdOutShortTime [*] Begin using internal UTF8/16 decoders when platform support isnt available (instead of stl) [*] Bug fixes in decoders [*] Major bug fix, AuMax [+] RateLimiter [+] Binary file sink [+] Log directory sink [*] Data header usability (more operators) [+] AuRemoveRange [+] AuRemove [+] AuTryRemove [+] AuTryRemoveRange [+] auCastUtils [+] Finish NewLSWin32Source [+] AuTryFindByTupleN, AuTryRemoveByTupleN [+] Separated AuRead/Write types, now in auTypeUtils [+] Added GetPosition/SetPosition to FileWriter [*] Fix stupid AuMin in place of AuMax in SpawnThread.Unix.Cpp [*] Refactored Arbitrary readers to SeekingReaders (as in, they could be atomic and/or parallelized, and accept an arbitrary position as a work parameter -> not Seekable, as 
in, you can simply set the position) [*] Hack back in the sched deinit [+] File AIO loop source interop [+] Begin to prototype a LoopQueue object I had in mind for NT, untested btw [+] Stub code for networking [+] Compression BaseStream/IngestableStreamBase [*] Major: read/write locks now support write-entrant read routines. [*] Compression subsystem now uses the MemoryView concept [*] Rewrite the base stream compressions, made them less broken [*] Update hashing api [*] WriterTryGoForward and ReaderTryGoForward now revert to the previous relative index instead of panicing [+] Added new AuByteBuffer apis Trim, Pad, WriteFrom, WriteString, [TODO: ReadString] [+] Added ByteBufferPushReadState [+] Added ByteBufferPushWriteState [*] Move from USC-16 to full UTF-16. Win32 can handle full UTF-16. [*] ELogLevel is now an Aurora enum [+] Raised arbitrary limit in header to 255, the max filter buffer [+] Explicit GZip support [+] Explicit Zip support [+] Added [some] compressors et al
2022-02-17 00:11:40 +00:00
// Returns the loop source shared by the group that owns `workerId` (GroupState::asyncLoopSourceShared).
// Returns an empty pointer when the worker is unknown or its group has already been released.
AuSPtr<AuLoop::ILoopSource> ThreadPool::WorkerToLoopSource(WorkerId_t workerId)
{
    AU_LOCK_GUARD(this->rwlock_->AsReadable());

    auto pWorker = GetThreadHandle(workerId);
    if (!pWorker)
    {
        return {};
    }

    auto pGroup = pWorker->parent.lock();
    if (!pGroup)
    {
        return {};
    }

    return pGroup->asyncLoopSourceShared;
}
void ThreadPool::SyncAllSafe()
{
2021-11-07 20:17:08 +00:00
AU_LOCK_GUARD(this->rwlock_->AsReadable());
for (const auto &re : this->threads_)
{
for (auto &jobWorker : re.second->workers)
{
SysAssert(Barrier(jobWorker.second->id, 0, false, false));
}
}
}
// Installs a thread feature on worker `id` by dispatching a task to that worker which appends the
// feature to the worker's feature list and calls Init() on it.
//
// bNonBlock - when false, the call blocks until the dispatched task has completed.
//
// Asserts when the work item cannot be created or dispatched.
void ThreadPool::AddFeature(WorkerId_t id,
                            AuSPtr<AuThreads::IThreadFeature> pFeature,
                            bool bNonBlock)
{
    // Explicit captures: the task runs later on the target worker and needs both the pool and the feature
    auto work = AuMakeSharedThrow<BasicWorkStdFunc>(([this, pFeature]()
    {
        GetThreadState()->features.push_back(pFeature);
        pFeature->Init();
    }));

    // NewWorkItem may yield a null item; check it before dispatching through it
    auto pNewItem = this->NewWorkItem(id, work, !bNonBlock);
    SysAssert(pNewItem);

    auto pWorkItem = pNewItem->Dispatch();
    SysAssert(pWorkItem);

    if (!bNonBlock)
    {
        pWorkItem->BlockUntilComplete();
    }
}
// Asserts that the calling thread's worker id (from tlsWorkerId) belongs to `group`.
void ThreadPool::AssertInThreadGroup(ThreadGroup_t group)
{
SysAssert(static_cast<WorkerId_t>(tlsWorkerId).first == group);
}
// Asserts that the calling thread's worker id (from tlsWorkerId) equals `id`.
void ThreadPool::AssertWorker(WorkerId_t id)
{
SysAssert(static_cast<WorkerId_t>(tlsWorkerId) == id);
}
// Returns the loop queue of the calling worker, or an empty pointer when the caller has no thread state.
AuSPtr<AuLoop::ILoopQueue> ThreadPool::ToKernelWorkQueue()
{
    auto pState = this->GetThreadState();
    if (!pState)
    {
        return {};
    }

    return pState->asyncLoop;
}
// Returns the loop queue of worker `workerId`.
// Pushes an error and returns an empty pointer when the worker is unknown.
AuSPtr<AuLoop::ILoopQueue> ThreadPool::ToKernelWorkQueue(WorkerId_t workerId)
{
    auto worker = this->GetThreadHandle(workerId);
    if (!worker)
    {
        SysPushErrorGeneric("Couldn't find requested worker");
        return {};
    }

    return worker->asyncLoop;
}
2022-03-10 15:35:01 +00:00
// Applies `mode` to every worker state matched by `workerId` (as resolved by GetThreadHandles).
// A non-zero mode.freqMsTick also reprograms the worker's rate limiter step.
// Pushes an error and returns when no worker matches.
void ThreadPool::UpdateWorkMode(WorkerId_t workerId, RunMode mode)
{
    auto states = this->GetThreadHandles(workerId);
    if (!states.size())
    {
        SysPushErrorGeneric("Couldn't find requested worker");
        return;
    }

    for (const auto &state : states)
    {
        state->runMode = mode.mode;

        if (mode.freqMsTick)
        {
            // Milliseconds -> nanoseconds. The multiplication is done in 64 bits so that a narrow
            // freqMsTick cannot overflow.
            // NOTE(review): assumes SetNextStep accepts a 64-bit nanosecond value - confirm
            state->rateLimiter.SetNextStep(mode.freqMsTick * 1'000'000ull);
        }
    }
}
2022-03-10 15:35:01 +00:00
// Returns the run mode of the calling worker; ERunMode::eEfficient when the caller has no thread state.
ERunMode ThreadPool::GetCurrentThreadRunMode()
{
    auto state = this->GetThreadState();
    if (!state)
    {
        return ERunMode::eEfficient;
    }

    return state->runMode;
}
2022-03-10 15:35:01 +00:00
// Returns the run mode of worker `workerId`.
// Pushes an error and returns a value-initialized ERunMode when the worker is unknown.
ERunMode ThreadPool::GetThreadRunMode(WorkerId_t workerId)
{
    auto worker = this->GetThreadHandle(workerId);
    if (!worker)
    {
        SysPushErrorGeneric("Couldn't find requested worker");
        return {};
    }

    return worker->runMode;
}
// Unimplemented fiber hooks, 'twas used for science
// Stub hook called at the start of PollInternal(); currently always returns 0.
// The returned value is later handed to CtxPollReturn() as its `status` argument.
int ThreadPool::CtxPollPush()
{
// TODO (Reece): implement a context switching library
// Refer to the old implementation of this on pastebin
return 0;
}
// Stub hook paired with CtxPollPush(); intentionally empty.
// PollInternal() calls it with the value from CtxPollPush() and whether work items were dequeued.
void ThreadPool::CtxPollReturn(const AuSPtr<ThreadState> &state, int status, bool hitTask)
{
}
// Runs queued work on the calling worker without blocking, repeating until a pass executes no tasks.
// Returns true when at least one InternalRunOne() call reported success.
// Returns false immediately when the pool is shut down (or bit value 2 of shuttingdown_ is set) and
// the calling worker's state is flagged as rejecting.
bool ThreadPool::CtxYield()
{
bool ranAtLeastOne = false;
// Shutdown fast path: a rejecting worker must not pick up more work
if (this->shutdown ||
this->shuttingdown_ & 2) // bit test on the staged shutdown flag
{
if (GetThreadState()->rejecting)
{
return false;
}
}
#if 0
return this->InternalRunOne(false, uCount);
#else
AuUInt32 uCount {};
do
{
// Per-pass counter; the loop ends on the first pass that executes nothing
uCount = 0;
ranAtLeastOne |= this->InternalRunOne(false, uCount);
}
while (uCount);
// uCount is zero once the loop has exited, so this evaluates to ranAtLeastOne
return uCount || ranAtLeastOne;
#endif
}
// internal api
// Registers the worker `workerId` with this pool while holding the pool's write lock.
//
// The GroupState for workerId.first is looked up in threads_, or created, initialized and
// inserted when absent. A fresh ThreadState (running event, sync semaphore, AsyncLoop,
// rate limiter step, run mode) is then built for workerId.second.
//
//  create == false: a new thread object is constructed with ThreadPool::Entrypoint bound as its
//                   entry vector, and Run() is called on it.
//  create == true:  the thread returned by AuThreads::GetThread() is wrapped with a no-op deleter,
//                   a TLS hook that calls ThisExiting() is attached to it, and gCurrentPool /
//                   tlsWorkerId are bound on the calling thread.
//
// Returns true once the worker has been inserted into its group. Returns false if the worker id
// already exists in the group or if any allocation/initialization step fails.
bool ThreadPool::Spawn(WorkerId_t workerId, bool create)
{
    AU_LOCK_GUARD(rwlock_->AsWritable());

    if (create)
    {
        gCurrentPool = AuSharedFromThis();
    }

    AuSPtr<GroupState> group;

    // Try fetch or allocate group
    {
        AuSPtr<GroupState> *groupPtr;
        if (!AuTryFind(this->threads_, workerId.first, groupPtr))
        {
            group = AuMakeShared<GroupState>();

            // The allocation itself can fail; don't call Init() through an empty pointer
            if (!group || !group->Init())
            {
                SysPushErrorMemory("Not enough memory to intiialize a new group state");
                return false;
            }

            if (!AuTryInsert(this->threads_, AuMakePair(workerId.first, group)))
            {
                return false;
            }
        }
        else
        {
            group = *groupPtr;
        }
    }

    // Assert worker does not already exist
    {
        AuSPtr<ThreadState> *ret;
        if (AuTryFind(group->workers, workerId.second, ret))
        {
            SysPushErrorGeneric("Thread ID already exists");
            return false;
        }
    }

    auto threadState = AuMakeShared<ThreadState>();
    if (!threadState)
    {
        SysPushErrorMemory();
        return {};
    }

    threadState->parent = group;
    threadState->running = AuThreadPrimitives::EventUnique(true, false, true);
    threadState->syncSema = AuThreadPrimitives::SemaphoreUnique(0);
    threadState->id = workerId;
    threadState->asyncLoop = AuMakeShared<AsyncLoop>();

    if (!threadState->asyncLoop)
    {
        SysPushErrorMemory();
        return {};
    }

    threadState->asyncLoop->pParent = threadState.get();
    threadState->rateLimiter.SetNextStep(1'000'000); // 1MS in nanoseconds
    threadState->runMode = ERunMode::eEfficient;

    if (!threadState->asyncLoop->Init())
    {
        SysPushErrorNested();
        return {};
    }

    if (!threadState->syncSema)
    {
        SysPushErrorMemory();
        return {};
    }

    // Barrier() dereferences the running event, so it has to be validated as well
    if (!threadState->running)
    {
        SysPushErrorMemory();
        return {};
    }

    threadState->asyncLoop->SourceAdd(group->eventLs);

    if (!create)
    {
        threadState->threadObject = AuThreads::ThreadShared(AuThreads::ThreadInfo(
            AuMakeShared<AuThreads::IThreadVectorsFunctional>(AuThreads::IThreadVectorsFunctional::OnEntry_t(std::bind(&ThreadPool::Entrypoint, this, threadState->id)),
                                                              AuThreads::IThreadVectorsFunctional::OnExit_t{}),
            gRuntimeConfig.async.threadPoolDefaultStackSize
        ));

        if (!threadState->threadObject)
        {
            SysPushErrorGeneric("Couldn't create the worker's thread object");
            return {};
        }

        threadState->threadObject->Run();
    }
    else
    {
        // Non-owning handle: the deleter is a no-op
        threadState->threadObject = AuSPtr<AuThreads::IAuroraThread>(AuThreads::GetThread(), [](AuThreads::IAuroraThread *){});

        // TODO: this is just a hack
        // we should implement this properly
        threadState->threadObject->AddLastHopeTlsHook(AuMakeShared<AuThreads::IThreadFeatureFunctional>([]() -> void
        {

        }, []() -> void
        {
            auto pid = GetCurrentWorkerPId();
            if (pid.pool)
            {
                GetWorkerInternal(pid.pool)->ThisExiting();
            }
        }));

        //
        gCurrentPool = AuWeakFromThis();
        tlsWorkerId = WorkerPId_t(AuSharedFromThis(), workerId);
    }

    group->workers.insert(AuMakePair(workerId.second, threadState));
    return true;
}
// private api
// Queues a work item on `workerId` that releases the calling worker's sync semaphore, then waits
// on that semaphore through WaitFor() with the timeout `ms`.
//
//  requireSignal: the work item resets the executing worker's `running` event before releasing
//                 the semaphore, and locks on that event afterwards.
//  drop:          the work item sets the executing worker's `rejecting` flag.
//
// Returns true when the wait succeeded and the work item's failure callback did not run.
// Returns false when the caller has no thread state, an allocation fails, the wait fails, or the
// failure callback ran.
AU_NOINLINE bool ThreadPool::Barrier(WorkerId_t workerId, AuUInt32 ms, bool requireSignal, bool drop)
{
    auto self = GetThreadState();
    if (!self)
    {
        return {};
    }

    auto &semaphore = self->syncSema;
    auto unsafeSemaphore = semaphore.get();

    // Shared with the failure callback by value: that callback may run after this function has
    // already returned (e.g. the wait timed out), so it must not reference this stack frame
    auto pFailed = AuMakeShared<bool>(false);
    if (!pFailed)
    {
        SysPushErrorMemory();
        return false;
    }

    auto work = AuMakeShared<AsyncFuncRunnable>(
        [this, drop, requireSignal, unsafeSemaphore]()
        {
            auto state = GetThreadState();

            if (drop)
            {
                state->rejecting = true;
            }

            if (requireSignal)
            {
                state->running->Reset();
            }

            unsafeSemaphore->Unlock(1);

            if (requireSignal)
            {
                state->running->Lock();
            }
        },
        [unsafeSemaphore, pFailed]()
        {
            // Publish the failure before waking the waiter, otherwise it could read a stale flag
            *pFailed = true;
            unsafeSemaphore->Unlock(1);
        }
    );

    if (!work)
    {
        return false;
    }

    Run(workerId, work);

    return WaitFor(workerId, AuUnsafeRaiiToShared(semaphore), ms) && !*pFailed;
}
// Entry routine bound by Spawn() for threads the pool creates itself.
// Binds the pool and worker id to the calling thread, executes Run(), and afterwards tears the
// worker down through ThisExiting(). Worker {0, 0} additionally triggers
// CleanWorkerPoolReservedZeroFree(); every other worker first issues a rejecting Barrier on
// itself unless the pool is shutting down or the worker already rejects work.
void ThreadPool::Entrypoint(WorkerId_t id)
{
    const WorkerId_t kZeroId {0, 0};

    // Bind this thread to the pool before any per-thread state is queried
    gCurrentPool = AuWeakFromThis();
    tlsWorkerId  = WorkerPId_t(AuSharedFromThis(), id);

    auto pSelf = GetThreadState();

    Run();

    if (id != kZeroId)
    {
        AU_LOCK_GUARD(this->rwlock_->AsReadable());

        const bool bStillAccepting = !this->shuttingdown_ && !pSelf->rejecting;
        if (bStillAccepting)
        {
            // Pump and barrier + reject all after atomically
            Barrier(id, 0, false, true);
        }
    }

    ThisExiting();

    if (id == kZeroId)
    {
        CleanWorkerPoolReservedZeroFree();
    }
}
// Early-exit routine for the calling worker. Does nothing unless the 2 flag of shuttingdown_
// is set. Otherwise it:
//  - signals the owning group's event and broadcasts its condition variable,
//  - drains PollInternal() until a pass leaves its counter at zero,
//  - takes the worker's thread features and calls Cleanup() on each of them,
//  - issues a rejecting Barrier on itself, unless the worker is flagged bIsKiller, already
//    rejects work, or the pool's shutdown flag is set,
//  - sets bBreakEarly on the worker.
// bAlreadyDoingExitTick guards against re-entering the routine while it is in progress.
// Pushes an error and returns when the caller has no thread state in this pool.
void ThreadPool::EarlyExitTick()
{
    auto jobWorker = GetThreadState();

    // Must be validated before the first dereference
    if (!jobWorker)
    {
        SysPushErrorUninitialized("Not an async thread");
        return;
    }

    if ((this->shuttingdown_ & 2) != 2)
    {
        return;
    }

    // parent is a weak reference; only signal the group if it can still be locked
    if (auto state = jobWorker->parent.lock())
    {
        state->eventLs->Set();
        state->cvVariable->Broadcast();
    }

    {
        if (AuExchange(jobWorker->bAlreadyDoingExitTick, true))
        {
            return;
        }

        AuUInt32 uCount {};
        do
        {
            uCount = 0;
            this->PollInternal(false, uCount);
        }
        while (uCount);
    }

    AuList<AuSPtr<AuThreads::IThreadFeature>> features;
    {
        AU_LOCK_GUARD(this->rwlock_->AsReadable()); // WARNING: this should be write, but im seeing a deadlock on exit
                                                    // there is no real race condition to be concerned about
                                                    // AsReadable shouldn't be enterable while someone is writing (the other accessor)
        features = AuExchange(jobWorker->features, {});
    }

    for (const auto &thread : features)
    {
        try
        {
            thread->Cleanup();
        }
        catch (...)
        {
            SysPushErrorCatch("Couldn't clean up thread feature!");
        }
    }

    // Reuse the state resolved above rather than dereferencing a second, unchecked lookup
    if (!jobWorker->bIsKiller)
    {
        AU_LOCK_GUARD(this->rwlock_->AsReadable());

        // TODO... i know what to do
        // this will do for now
        if (!jobWorker->rejecting &&
            !this->shutdown)
        {
            this->Barrier(AuAsync::GetCurrentWorkerPId(), 0, false, true);
        }
        //this->Barrier(AuAsync::GetCurrentWorkerPId(), 0, false, false);
    }

    jobWorker->bAlreadyDoingExitTick = false;
    jobWorker->bBreakEarly = true;
}
void ThreadPool::ThisExiting()
{
auto id = GetCurrentThread();
auto state = GetGroup(id.first);
AuList<AuSPtr<AuThreads::IThreadFeature>> features;
{
AU_LOCK_GUARD(this->rwlock_->AsWritable());
auto itr = state->workers.find(id.second);
auto &jobWorker = itr->second;
CleanUpWorker(id);
// Abort scheduled tasks
TerminateSceduledTasks(this, id);
// Prevent deadlocks
jobWorker->syncSema->Unlock(10000); // prevent ::Barrier dead-locks
{
AU_LOCK_GUARD(jobWorker->externalFencesLock);
jobWorker->exitingflag2 = true;
for (const auto &pIWaitable : jobWorker->externalFences)
{
pIWaitable->Unlock();
}
jobWorker->externalFences.clear();
}
features = AuExchange(jobWorker->features, {});
}
{
// Clean up thread features
// -> transferable TLS handles
// -> thread specific vms
// -> anything your brain wishes to imagination
for (const auto &thread : features)
{
try
{
thread->Cleanup();
}
catch (...)
{
AuLogWarn("Couldn't clean up thread feature!");
Debug::PrintError();
}
}
features.clear();
}
{
AU_LOCK_GUARD(this->rwlock_->AsWritable());
auto itr = state->workers.find(id.second);
auto &jobWorker = itr->second;
state->workers.erase(itr);
}
}
// Looks up the GroupState registered for `type` in threads_ under the pool's read lock.
// Returns an empty pointer when no such group is registered.
AuSPtr<GroupState> ThreadPool::GetGroup(ThreadGroup_t type)
{
    AU_LOCK_GUARD(this->rwlock_->AsReadable());

    AuSPtr<GroupState> *pEntry {};
    if (AuTryFind(this->threads_, type, pEntry))
    {
        return *pEntry;
    }

    return {};
}
// Resolves the ThreadState of the calling thread under the pool's read lock.
//
// Returns an empty pointer when the calling thread is not bound to a live pool, when the bound
// worker's group is unknown, or when the group holds no entry for the worker. In internal/debug
// configurations, a thread bound to a different pool additionally pushes an error and yields an
// empty pointer.
AuSPtr<ThreadState> ThreadPool::GetThreadState()
{
    AU_LOCK_GUARD(this->rwlock_->AsReadable());

    auto thread = gCurrentPool.lock();
    if (!thread)
    {
        return {};
    }

#if defined(AU_CFG_ID_INTERNAL) || defined(AU_CFG_ID_DEBUG)
    if (thread.get() != this)
    {
        SysPushErrorGeneric("wrong thread");
        return {};
    }
#endif

    auto worker = *tlsWorkerId;

    auto state = GetGroup(worker.first);
    if (!state)
    {
        return {};
    }

    // Strictly read-only lookup: indexing the container could insert an empty entry while only
    // the read lock is held
    AuSPtr<ThreadState> *ret;
    if (!AuTryFind(state->workers, worker.second, ret))
    {
        return {};
    }

    return *ret;
}
// Resolves the ThreadState of the calling thread under the pool's read lock without pushing
// any error.
//
// Returns an empty pointer when the calling thread is not bound to a live pool, is bound to a
// different pool, when the bound worker's group is unknown, or when the group holds no entry
// for the worker.
AuSPtr<ThreadState> ThreadPool::GetThreadStateNoWarn()
{
    AU_LOCK_GUARD(this->rwlock_->AsReadable());

    auto thread = gCurrentPool.lock();
    if (!thread)
    {
        return {};
    }

    if (thread.get() != this)
    {
        return {};
    }

    auto worker = *tlsWorkerId;

    auto state = GetGroup(worker.first);
    if (!state)
    {
        return {};
    }

    // Strictly read-only lookup: indexing the container could insert an empty entry while only
    // the read lock is held
    AuSPtr<ThreadState> *ret;
    if (!AuTryFind(state->workers, worker.second, ret))
    {
        return {};
    }

    return *ret;
}
// Resolves the ThreadState registered for the worker `id` under the pool's read lock.
// Returns an empty pointer when either the group (id.first) or the worker (id.second) is unknown.
AuSPtr<ThreadState> ThreadPool::GetThreadHandle(WorkerId_t id)
{
    AU_LOCK_GUARD(this->rwlock_->AsReadable());

    if (auto pGroup = GetGroup(id.first))
    {
        AuSPtr<ThreadState> *pSlot {};
        if (AuTryFind(pGroup->workers, id.second, pSlot))
        {
            return *pSlot;
        }
    }

    return {};
}
2022-03-10 15:35:01 +00:00
// Collects worker states from the group id.first under the pool's read lock.
// With id.second == Async::kThreadIdAny every worker of the group is returned; otherwise only
// the worker id.second. The result is empty when the group or the requested worker is unknown.
AuList<AuSPtr<ThreadState>> ThreadPool::GetThreadHandles(WorkerId_t id)
{
    AU_LOCK_GUARD(this->rwlock_->AsReadable());

    auto pGroup = GetGroup(id.first);
    if (!pGroup)
    {
        return {};
    }

    AuList<AuSPtr<ThreadState>> handles;

    if (id.second == Async::kThreadIdAny)
    {
        // Wildcard: take every worker in the group
        for (const auto &[threadId, pState] : pGroup->workers)
        {
            handles.push_back(pState);
        }

        return handles;
    }

    // Specific worker requested
    AuSPtr<ThreadState> *pSlot {};
    if (!AuTryFind(pGroup->workers, id.second, pSlot))
    {
        return {};
    }

    handles.push_back(*pSlot);
    return handles;
}
// Creates a new ThreadPool instance, starting the scheduler first.
AUKN_SYM AuSPtr<IThreadPool> NewThreadPool()
{
    // The scheduler is started here, on demand, so that apps that don't require async aren't
    // burdened with the overhead of its little spinner
    StartSched();

    auto pPool = AuMakeShared<ThreadPool>();
    return pPool;
}
}