From dd13098022dd0b58cce1164f5b65a2ec0d5de3fc Mon Sep 17 00:00:00 2001 From: Jamie Reece Wilson Date: Wed, 9 Aug 2023 03:21:14 +0100 Subject: [PATCH] [*] Transition to dynamic tick-based scheduling [*] Re-do AuAsync reference counting [+] IWorkItem::SetSchedSteadyTimeNsAbs [*] Irrelevant IIOProcessor sources are now discarded in evaluating whether or not a thread-pool in special running mode should shutdown [*] Transition WorkItems to only use steady time [*] Refactor AsyncConfig [*] Drop default SMT spin time from hundreds of cycles to ~32 so that we can sit nicely at the bottom of task manager unless the application calls for extra responsivity --- Include/Aurora/Async/IWorkItem.hpp | 7 +- Include/Aurora/Runtime.hpp | 12 ++-- Source/Async/Schedular.cpp | 106 ++++++++++++++++++++++------- Source/Async/ThreadPool.cpp | 41 +++++------ Source/Async/ThreadPool.hpp | 12 ++-- Source/Async/WorkItem.cpp | 25 +++++-- Source/Async/WorkItem.hpp | 1 + Source/IO/AuIOProcessor.cpp | 27 +++++++- Source/IO/AuIOProcessor.hpp | 12 ++-- 9 files changed, 166 insertions(+), 77 deletions(-) diff --git a/Include/Aurora/Async/IWorkItem.hpp b/Include/Aurora/Async/IWorkItem.hpp index 95f0a077..385d4810 100644 --- a/Include/Aurora/Async/IWorkItem.hpp +++ b/Include/Aurora/Async/IWorkItem.hpp @@ -24,7 +24,7 @@ namespace Aurora::Async // ms = time relative to the current time virtual AuSPtr SetSchedTime(AuUInt32 ms) = 0; - + // ns = Aurora::Time::CurrentClockMS() + relativeMs virtual AuSPtr SetSchedTimeAbs(AuUInt32 ms) = 0; @@ -33,7 +33,10 @@ namespace Aurora::Async // ns = Aurora::Time::CurrentClockNS() + relativeNs virtual AuSPtr SetSchedTimeNsAbs(AuUInt64 ns) = 0; - + + // ns = Aurora::Time::SteadyClockNS() + relativeNs + virtual AuSPtr SetSchedSteadyTimeNsAbs(AuUInt64 ns) = 0; + // ms = time relative to the time at which the work item would otherwise dispatch virtual AuSPtr AddDelayTime(AuUInt32 ms) = 0; diff --git a/Include/Aurora/Runtime.hpp b/Include/Aurora/Runtime.hpp index bd3a933b..9163fa5a 100644 --- a/Include/Aurora/Runtime.hpp +++ b/Include/Aurora/Runtime.hpp @@ -243,11 +243,11 @@ namespace Aurora struct AsyncConfig { - bool enableSchedularThread {true}; // turn this off to make your application lighter-weight; turn this on for higher performance (+expensive) scheduling - bool enableSysPumpFreqnecy {false}; // turn this on to enable an async apps singleton threadpool to SysPump on worker id zero. Alternatively, use SetMainThreadForSysPumpScheduling once you have a thread pool and worker id. - AuUInt32 threadPoolDefaultStackSize {}; - AuUInt32 schedularFrequency {2}; // * 0.5 or 1 MS depending on the platform - AuUInt32 sysPumpFrequency {25}; // x amount of schedularFrequencys + bool bStartSchedularOnStartup { true }; // turn this off to make your application lighter-weight; turn this on for higher performance (+expensive) scheduling + bool bEnableLegacyTicks { false }; // turn this on to enable an async apps singleton threadpool to SysPump on worker id zero. Alternatively, use SetMainThreadForSysPumpScheduling once you have a thread pool and worker id. + AuUInt32 threadPoolDefaultStackSize { }; + AuUInt32 dwSchedulerRateLimitMS { 2 }; // + AuUInt32 dwLegacyMainThreadSystemTickMS { 25 }; // nowadays this is primarily used to dispatch main-thread posted (AuConsole) commands }; struct FIOConfig @@ -325,7 +325,7 @@ namespace Aurora bool bNoThreadNames { false }; bool bPlatformIsSMPProcessorOptimized { true }; // Whether to attempt to using mm_pause or similar before yielding into the kernel - AuUInt8 uSpinLoopPowerA { 7 }; // Nudgable spinloop power. This is our local userland niceness factor; where 1 << n is the amount of smt-yield instructions to stall for + AuUInt8 uSpinLoopPowerA { 5 }; // Nudgable spinloop power. This is our local userland niceness factor; where 1 << n is the amount of smt-yield instructions to stall for // This is comparable to Win32's SetCriticalSectionSpinCount applied across every single AuThreadPrimitives try-lock and lock. // Adjust this value to compensate for longer critical sections when context switching isn't preferrable. bool bEnableAggressiveScheduling { false }; diff --git a/Source/Async/Schedular.cpp b/Source/Async/Schedular.cpp index 984d86fc..b1aa8848 100644 --- a/Source/Async/Schedular.cpp +++ b/Source/Async/Schedular.cpp @@ -23,18 +23,19 @@ namespace Aurora::Async }; static AuThreads::ThreadUnique_t gThread; - static AuThreadPrimitives::MutexUnique_t gSchedLock; + static AuThreadPrimitives::ConditionMutex gSchedLock; + static AuThreadPrimitives::ConditionVariable gSchedCondvar(AuUnsafeRaiiToShared(gSchedLock.AsPointer())); static AuWorkerPId_t gMainThread; static bool gBOriginal; + static AuUInt64 uNextSysTickGuessed {}; + static AuUInt64 uNextWakeuptimeRateLimit {}; static AuList gEntries; void StartSched2(); static void GetDispatchableTasks(AuList &pending) { - AU_LOCK_GUARD(gSchedLock); - - auto time = Time::CurrentClockNS(); + auto time = Time::SteadyClockNS(); for (auto itr = gEntries.begin(); itr != gEntries.end(); ) { @@ -69,27 +70,80 @@ namespace Aurora::Async gLockedPump = false; } + static void SchedNextTime(AuUInt64 uNSAbs) + { + bool bForceWakeup {}; + + if (uNextSysTickGuessed > uNSAbs || + !uNextSysTickGuessed) + { + while (true) + { + auto uCurrent = uNextSysTickGuessed; + + if (uCurrent && + uCurrent <= uNSAbs) + { + break; + } + + if (uNextWakeuptimeRateLimit > uNSAbs) + { + uNSAbs = uNextWakeuptimeRateLimit; + } + + if (!uCurrent) + { + bForceWakeup = true; + } + + if (AuAtomicCompareExchange(&uNextSysTickGuessed, uNSAbs, uCurrent) == uCurrent) + { + break; + } + } + + if (bForceWakeup) + { + gSchedCondvar->Signal(); + } + } + } + static void SchedThread() { - AuUInt32 counter {}; - AuList pending; - - auto thread = AuThreads::GetThread(); + Aurora::Utility::RateLimiter limiter(AuMSToNS(gRuntimeConfig.async.dwLegacyMainThreadSystemTickMS)); - while (!thread->Exiting()) + while (AuIsThreadRunning()) { AuList pending; - Threading::SleepNs(1000000 / 2 * gRuntimeConfig.async.schedularFrequency); + { + AU_LOCK_GUARD(gSchedLock); - GetDispatchableTasks(pending); + auto uNextTick = uNextSysTickGuessed; + auto uNow = AuTime::SteadyClockNS(); + + if (uNow < uNextTick) + { + gSchedCondvar->WaitForSignalNS(uNextTick - uNow); + } + else + { + uNextSysTickGuessed = 0; + gSchedCondvar->WaitForSignalNS(); + } + + GetDispatchableTasks(pending); + + uNextWakeuptimeRateLimit = AuTime::SteadyClockNS() + AuMSToNS(gRuntimeConfig.async.dwSchedulerRateLimitMS); + } for (auto &entry : pending) { try { - entry.pool->Run(entry.target, entry.runnable); - entry.pool->DecrementTasksRunning(); + AuStaticCast(entry.pool)->Run(entry.target, entry.runnable, false); } catch (...) { @@ -102,12 +156,9 @@ namespace Aurora::Async } } - counter++; - - if (gRuntimeConfig.async.enableSysPumpFreqnecy) + if (gRuntimeConfig.async.bEnableLegacyTicks) { - - if ((!gRuntimeConfig.async.sysPumpFrequency) || ((gRuntimeConfig.async.sysPumpFrequency) && (counter % gRuntimeConfig.async.sysPumpFrequency) == 0)) + if (limiter.CheckExchangePass()) { try { @@ -121,6 +172,8 @@ namespace Aurora::Async AuLogWarn("Dropped SysRuntimePump"); Debug::PrintError(); } + + SchedNextTime(AuMSToNS(gRuntimeConfig.async.dwLegacyMainThreadSystemTickMS) + AuTime::SteadyClockNS()); } } } @@ -128,20 +181,19 @@ namespace Aurora::Async void InitSched() { - gSchedLock = AuThreadPrimitives::MutexUnique(); + } void DeinitSched() { gThread.reset(); - gSchedLock.reset(); } AUKN_SYM void SetMainThreadForSysPumpScheduling(AuWorkerPId_t pid) { if (!pid) { - gRuntimeConfig.async.enableSysPumpFreqnecy = gBOriginal; + gRuntimeConfig.async.bEnableLegacyTicks = gBOriginal; gMainThread = AuWorkerPId_t(AuAsync::GetSharedAsyncApp(), AuWorkerId_t {0, 0}); Console::Commands::UpdateDispatcher(gMainThread); @@ -150,20 +202,20 @@ namespace Aurora::Async gMainThread = pid; Console::Commands::UpdateDispatcher(gMainThread); - gRuntimeConfig.async.enableSysPumpFreqnecy = true; + gRuntimeConfig.async.bEnableLegacyTicks = true; StartSched2(); } void StartSched() { - if (!gRuntimeConfig.async.enableSchedularThread) + if (!gRuntimeConfig.async.bStartSchedularOnStartup) { return; } else { - if (gRuntimeConfig.async.sysPumpFrequency && !gMainThread) + if (gRuntimeConfig.async.dwLegacyMainThreadSystemTickMS && !gMainThread) { gMainThread = AuWorkerPId_t(AuAsync::GetSharedAsyncApp(), AuWorkerId_t {0, 0}); } @@ -178,7 +230,7 @@ namespace Aurora::Async if (gThread) return; - gBOriginal = gRuntimeConfig.async.enableSysPumpFreqnecy; + gBOriginal = gRuntimeConfig.async.bEnableLegacyTicks; gThread = AuThreads::ThreadUnique(AuThreads::ThreadInfo( AuMakeShared(AuThreads::IThreadVectorsFunctional::OnEntry_t(std::bind(SchedThread)), @@ -200,8 +252,10 @@ namespace Aurora::Async { return false; } + AU_LOCK_GUARD(gSchedLock); - pool->IncrementTasksRunning(); + AuAtomicAdd(&pool->uAtomicCounter, 1u); + SchedNextTime(ns); return AuTryInsert(gEntries, SchedEntry {ns, target, runnable, pool}); } diff --git a/Source/Async/ThreadPool.cpp b/Source/Async/ThreadPool.cpp index 7e5b1ca4..62da581d 100644 --- a/Source/Async/ThreadPool.cpp +++ b/Source/Async/ThreadPool.cpp @@ -120,6 +120,11 @@ namespace Aurora::Async } void ThreadPool::Run(WorkerId_t target, AuSPtr runnable) + { + return this->Run(target, runnable, true); + } + + void ThreadPool::Run(WorkerId_t target, AuSPtr runnable, bool bIncrement) { auto state = GetGroup(target.first); SysAssert(static_cast(state), "couldn't dispatch a task to an offline group"); @@ -132,7 +137,11 @@ namespace Aurora::Async } AuDebug::AddMemoryCrunch(); - IncrementTasksRunning(); + + if (bIncrement) + { + AuAtomicAdd(&this->uAtomicCounter, 1u); + } state->workQueue.AddWorkEntry(AuMakePair(target.second, runnable)); @@ -155,22 +164,6 @@ namespace Aurora::Async { return this; } - - void ThreadPool::IncrementTasksRunning() - { - this->tasksRunning_++; - } - - void ThreadPool::DecrementTasksRunning() - { - if ((--this->tasksRunning_) == 0) - { - if (InRunnerMode()) - { - Shutdown(); - } - } - } // ithreadpool @@ -494,7 +487,7 @@ namespace Aurora::Async { auto queue = ToKernelWorkQueue(); - if ((this->tasksRunning_ == 0) && + if ((this->uAtomicCounter == 0) && (!queue || queue->GetSourceCount() <= 1)) { Shutdown(); @@ -522,8 +515,8 @@ namespace Aurora::Async { auto queue = ToKernelWorkQueue(); - if ((this->tasksRunning_ == tlsCallStack) && - (!queue || queue->GetSourceCount() <= 1)) + if ((this->uAtomicCounter == tlsCallStack) && + (!queue || queue->GetSourceCount() <= 1 + this->uAtomicIOProcessorsWorthlessSources + this->uAtomicIOProcessors)) { return false; } @@ -556,7 +549,7 @@ namespace Aurora::Async uCount++; // Atomically decrement global task counter - runningTasks = this->tasksRunning_.fetch_sub(1) - 1; + runningTasks = AuAtomicSub(&this->uAtomicCounter, 1u); tlsCallStack--; @@ -593,8 +586,8 @@ namespace Aurora::Async auto queue = ToKernelWorkQueue(); if ((runningTasks == 0) && - (this->tasksRunning_ == 0 ) && - (!queue || queue->GetSourceCount() <= 1)) + (this->uAtomicCounter == 0 ) && + (!queue || queue->GetSourceCount() <= 1 + this->uAtomicIOProcessorsWorthlessSources + this->uAtomicIOProcessors)) { Shutdown(); } @@ -1141,10 +1134,12 @@ namespace Aurora::Async AuThreads::IThreadVectorsFunctional::OnExit_t{}), gRuntimeConfig.async.threadPoolDefaultStackSize )); + if (!threadState->threadObject) { return {}; } + threadState->threadObject->Run(); } else diff --git a/Source/Async/ThreadPool.hpp b/Source/Async/ThreadPool.hpp index e93c1b35..d1182ec2 100644 --- a/Source/Async/ThreadPool.hpp +++ b/Source/Async/ThreadPool.hpp @@ -11,15 +11,19 @@ namespace Aurora::Async { struct GroupState; struct ThreadState; + struct IAsyncRunnable; //class WorkItem; + struct IThreadPoolInternal { virtual bool WaitFor(WorkerId_t unlocker, const AuSPtr &primitive, AuUInt32 ms) = 0; virtual void Run(WorkerId_t target, AuSPtr runnable) = 0; virtual IThreadPool *ToThreadPool() = 0; - virtual void IncrementTasksRunning() = 0; - virtual void DecrementTasksRunning() = 0; + + AuUInt32 uAtomicCounter {}; + AuUInt32 uAtomicIOProcessors {}; + AuUInt32 uAtomicIOProcessorsWorthlessSources {}; }; @@ -31,9 +35,8 @@ namespace Aurora::Async bool WaitFor(WorkerPId_t unlocker, const AuSPtr &primitive, AuUInt32 ms); bool WaitFor(WorkerId_t unlocker, const AuSPtr &primitive, AuUInt32 ms) override; void Run(WorkerId_t target, AuSPtr runnable) override; + void Run(WorkerId_t target, AuSPtr runnable, bool bIncrement); IThreadPool *ToThreadPool() override; - void IncrementTasksRunning() override; - void DecrementTasksRunning() override; // IThreadPool virtual bool Spawn(WorkerId_t workerId) override; @@ -139,7 +142,6 @@ namespace Aurora::Async bool shutdown {}; AuThreadPrimitives::RWRenterableLock rwlock_; AuThreadPrimitives::Event shutdownEvent_; - std::atomic_int tasksRunning_; bool runnersRunning_ {}; }; } \ No newline at end of file diff --git a/Source/Async/WorkItem.cpp b/Source/Async/WorkItem.cpp index 502f0635..d58e670c 100644 --- a/Source/Async/WorkItem.cpp +++ b/Source/Async/WorkItem.cpp @@ -125,25 +125,36 @@ namespace Aurora::Async AuSPtr WorkItem::SetSchedTimeNs(AuUInt64 ns) { - this->dispatchTimeNs_ = Time::CurrentClockNS() + ns; + this->dispatchTimeNs_ = Time::SteadyClockNS() + ns; return AU_SHARED_FROM_THIS; } AuSPtr WorkItem::SetSchedTimeAbs(AuUInt32 ms) { - this->dispatchTimeNs_ = AuUInt64(ms) * AuMSToNS(ms); - return AU_SHARED_FROM_THIS; + return this->SetSchedTimeNsAbs(AuMSToNS(ms)); } AuSPtr WorkItem::SetSchedTimeNsAbs(AuUInt64 ns) { - this->dispatchTimeNs_ = ns; + auto uNow = AuTime::CurrentClockNS(); + if (uNow > ns) + { + return AU_SHARED_FROM_THIS; + } + + this->dispatchTimeNs_ = AuTime::SteadyClockNS() + (ns - uNow); + return AU_SHARED_FROM_THIS; + } + + AuSPtr WorkItem::SetSchedSteadyTimeNsAbs(AuUInt64 ns) + { + this->dispatchTimeNs_ = ns; return AU_SHARED_FROM_THIS; } AuSPtr WorkItem::SetSchedTime(AuUInt32 ms) { - this->dispatchTimeNs_ = Time::CurrentClockNS() + AuMSToNS(ms); + this->dispatchTimeNs_ = Time::SteadyClockNS() + AuMSToNS(ms); return AU_SHARED_FROM_THIS; } @@ -201,7 +212,7 @@ namespace Aurora::Async this->dispatchPending_ = true; - if (Time::CurrentClockNS() < this->dispatchTimeNs_) + if (Time::SteadyClockNS() < this->dispatchTimeNs_) { if (!Schedule()) { @@ -212,7 +223,7 @@ namespace Aurora::Async if (auto delay = AuExchange(delayTimeNs_, {})) { - this->dispatchTimeNs_ = delay + Time::CurrentClockNS(); + this->dispatchTimeNs_ = delay + Time::SteadyClockNS(); if (!Schedule()) { this->Fail(); diff --git a/Source/Async/WorkItem.hpp b/Source/Async/WorkItem.hpp index 4167b29e..e8f55a11 100644 --- a/Source/Async/WorkItem.hpp +++ b/Source/Async/WorkItem.hpp @@ -30,6 +30,7 @@ namespace Aurora::Async AuSPtr AddDelayTimeNs(AuUInt64 ns) override; AuSPtr SetSchedTimeAbs(AuUInt32 ms) override; AuSPtr SetSchedTimeNsAbs(AuUInt64 ns) override; + AuSPtr SetSchedSteadyTimeNsAbs(AuUInt64 ns) override; AuSPtr Then(const AuSPtr &next) override; AuSPtr Dispatch() override; diff --git a/Source/IO/AuIOProcessor.cpp b/Source/IO/AuIOProcessor.cpp index 14f937ff..621cfb84 100644 --- a/Source/IO/AuIOProcessor.cpp +++ b/Source/IO/AuIOProcessor.cpp @@ -13,6 +13,7 @@ #include "AuIOPipeProcessor.hpp" #include "Loop/Loop.hpp" #include "Loop/LoopQueue.hpp" +#include namespace Aurora::IO { @@ -72,6 +73,11 @@ namespace Aurora::IO } this->ToQueue()->Commit(); + if (asyncWorker) + { + AuAtomicAdd(&AuStaticCast(asyncWorker.pool)->uAtomicIOProcessors, 1u); + } + return true; } @@ -566,13 +572,13 @@ namespace Aurora::IO AU_THROW_STRING("Wrong Thread"); } - bool changed = bool(ns) != bool(this->refreshRateNs); + bool bBinaryCanged = bool(ns) != bool(this->refreshRateNs); auto old = AuExchange(this->refreshRateNs, ns); UpdateTimers(); - if (changed) + if (bBinaryCanged) { if (ns) { @@ -627,6 +633,11 @@ namespace Aurora::IO void IOProcessor::AddTimerLS() { + if (asyncWorker) + { + AuAtomicAdd(&AuStaticCast(asyncWorker.pool)->uAtomicIOProcessorsWorthlessSources, 1u); + } + this->ToQueue()->SourceAdd(this->timers.pLsTicker); this->ToQueue()->AddCallback(this->timers.pLsTicker, AuSPtr(AuSharedFromThis(), &this->timers)); this->ToQueue()->Commit(); @@ -676,6 +687,11 @@ namespace Aurora::IO } queue->SourceRemove(this->timers.pLsTicker); + + if (asyncWorker) + { + AuAtomicSub(&AuStaticCast(asyncWorker.pool)->uAtomicIOProcessorsWorthlessSources, 1u); + } } bool IOProcessor::IsAsync() @@ -853,8 +869,15 @@ namespace Aurora::IO void IOProcessor::ReleaseAllWatches() { + if (asyncWorker) + { + AuAtomicSub(&AuStaticCast(asyncWorker.pool)->uAtomicIOProcessors, 1u); + } + RemoveTimer(); + auto queue = ToQueue(); + if (queue) { if (this->items.cvEvent) diff --git a/Source/IO/AuIOProcessor.hpp b/Source/IO/AuIOProcessor.hpp index 7f051bdd..2958f6d2 100644 --- a/Source/IO/AuIOProcessor.hpp +++ b/Source/IO/AuIOProcessor.hpp @@ -41,7 +41,7 @@ namespace Aurora::IO bool AddEventListener(const AuSPtr &eventListener) override; void RemoveEventListener(const AuSPtr &eventListener) override; - void FrameStart(); + void FrameStart(); void FramePumpWaitingBlocked(); bool FrameWaitForAny(AuUInt32 msMax); AuUInt FrameRunEpilogue(); @@ -70,9 +70,9 @@ namespace Aurora::IO AuSPtr StartSimpleIOWatch(const AuSPtr &object, const AuSPtr &listener) override; AuSPtr StartSimpleLSWatch(const AuSPtr &source, const AuSPtr &listener) override; - AuSPtr StartIOWatchEx (const AuSPtr &object, const AuSPtr &listener, bool singleshot) override; - AuSPtr StartSimpleIOWatchEx(const AuSPtr& object, const AuSPtr& listener, bool singleshot) override; - AuSPtr StartSimpleLSWatchEx(const AuSPtr& source, const AuSPtr& listener, bool singleshot, AuUInt32 msTimeout) override; + AuSPtr StartIOWatchEx(const AuSPtr &object, const AuSPtr &listener, bool singleshot) override; + AuSPtr StartSimpleIOWatchEx(const AuSPtr &object, const AuSPtr &listener, bool singleshot) override; + AuSPtr StartSimpleLSWatchEx(const AuSPtr &source, const AuSPtr &listener, bool singleshot, AuUInt32 msTimeout) override; AuSPtr ToPipeProcessor() override; @@ -83,7 +83,7 @@ namespace Aurora::IO bool ConfigureNoDeferredTicks(bool bOption); bool HasItems() override; - + void WakeupThread(); bool CheckThread(); @@ -103,7 +103,7 @@ namespace Aurora::IO bool IsTickOnly(); bool IsAsync(); - bool mutliplexIOAndTimer {true}; + bool mutliplexIOAndTimer { true }; IOProcessorItems items; IOProcessorTimers timers;