[*] Transition to dynamic tick-based scheduling

[*] Re-do AuAsync reference counting
[+] IWorkItem::SetSchedSteadyTimeNsAbs
[*] Irrelevant IIOProcessor sources are now discarded in evaluating whether or not a thread-pool in special running mode should shutdown
[*] Transition WorkItems to only use steady time
[*] Refactor AsyncConfig
[*] Drop default SMT spin time from hundreds of cycles to ~32 so that we can sit nicely at the bottom of task manager unless the application calls for extra responsivity
This commit is contained in:
Reece Wilson 2023-08-09 03:21:14 +01:00
parent 424cccd00a
commit dd13098022
9 changed files with 166 additions and 77 deletions

View File

@ -34,6 +34,9 @@ namespace Aurora::Async
// ns = Aurora::Time::CurrentClockNS() + relativeNs
virtual AuSPtr<IWorkItem> SetSchedTimeNsAbs(AuUInt64 ns) = 0;
// ns = Aurora::Time::SteadyClockNS() + relativeNs
virtual AuSPtr<IWorkItem> SetSchedSteadyTimeNsAbs(AuUInt64 ns) = 0;
// ms = time relative to the time at which the work item would otherwise dispatch
virtual AuSPtr<IWorkItem> AddDelayTime(AuUInt32 ms) = 0;

View File

@ -243,11 +243,11 @@ namespace Aurora
struct AsyncConfig
{
bool enableSchedularThread {true}; // turn this off to make your application lighter-weight; turn this on for higher performance (+expensive) scheduling
bool enableSysPumpFreqnecy {false}; // turn this on to enable an async apps singleton threadpool to SysPump on worker id zero. Alternatively, use SetMainThreadForSysPumpScheduling once you have a thread pool and worker id.
AuUInt32 threadPoolDefaultStackSize {};
AuUInt32 schedularFrequency {2}; // * 0.5 or 1 MS depending on the platform
AuUInt32 sysPumpFrequency {25}; // x amount of schedularFrequencys
bool bStartSchedularOnStartup { true }; // turn this off to make your application lighter-weight; turn this on for higher performance (+expensive) scheduling
bool bEnableLegacyTicks { false }; // turn this on to enable an async apps singleton threadpool to SysPump on worker id zero. Alternatively, use SetMainThreadForSysPumpScheduling once you have a thread pool and worker id.
AuUInt32 threadPoolDefaultStackSize { };
AuUInt32 dwSchedulerRateLimitMS { 2 }; //
AuUInt32 dwLegacyMainThreadSystemTickMS { 25 }; // nowadays this is primarily used to dispatch main-thread posted (AuConsole) commands
};
struct FIOConfig
@ -325,7 +325,7 @@ namespace Aurora
bool bNoThreadNames { false };
bool bPlatformIsSMPProcessorOptimized { true }; // Whether to attempt to using mm_pause or similar before yielding into the kernel
AuUInt8 uSpinLoopPowerA { 7 }; // Nudgable spinloop power. This is our local userland niceness factor; where 1 << n is the amount of smt-yield instructions to stall for
AuUInt8 uSpinLoopPowerA { 5 }; // Nudgable spinloop power. This is our local userland niceness factor; where 1 << n is the amount of smt-yield instructions to stall for
// This is comparable to Win32's SetCriticalSectionSpinCount applied across every single AuThreadPrimitives try-lock and lock.
// Adjust this value to compensate for longer critical sections when context switching isn't preferrable.
bool bEnableAggressiveScheduling { false };

View File

@ -23,18 +23,19 @@ namespace Aurora::Async
};
static AuThreads::ThreadUnique_t gThread;
static AuThreadPrimitives::MutexUnique_t gSchedLock;
static AuThreadPrimitives::ConditionMutex gSchedLock;
static AuThreadPrimitives::ConditionVariable gSchedCondvar(AuUnsafeRaiiToShared(gSchedLock.AsPointer()));
static AuWorkerPId_t gMainThread;
static bool gBOriginal;
static AuUInt64 uNextSysTickGuessed {};
static AuUInt64 uNextWakeuptimeRateLimit {};
static AuList<SchedEntry> gEntries;
void StartSched2();
static void GetDispatchableTasks(AuList<SchedEntry> &pending)
{
AU_LOCK_GUARD(gSchedLock);
auto time = Time::CurrentClockNS();
auto time = Time::SteadyClockNS();
for (auto itr = gEntries.begin(); itr != gEntries.end(); )
{
@ -69,27 +70,80 @@ namespace Aurora::Async
gLockedPump = false;
}
static void SchedNextTime(AuUInt64 uNSAbs)
{
bool bForceWakeup {};
if (uNextSysTickGuessed > uNSAbs ||
!uNextSysTickGuessed)
{
while (true)
{
auto uCurrent = uNextSysTickGuessed;
if (uCurrent &&
uCurrent <= uNSAbs)
{
break;
}
if (uNextWakeuptimeRateLimit > uNSAbs)
{
uNSAbs = uNextWakeuptimeRateLimit;
}
if (!uCurrent)
{
bForceWakeup = true;
}
if (AuAtomicCompareExchange(&uNextSysTickGuessed, uNSAbs, uCurrent) == uCurrent)
{
break;
}
}
if (bForceWakeup)
{
gSchedCondvar->Signal();
}
}
}
static void SchedThread()
{
AuUInt32 counter {};
AuList<SchedEntry> pending;
Aurora::Utility::RateLimiter limiter(AuMSToNS<AuUInt64>(gRuntimeConfig.async.dwLegacyMainThreadSystemTickMS));
auto thread = AuThreads::GetThread();
while (!thread->Exiting())
while (AuIsThreadRunning())
{
AuList<SchedEntry> pending;
Threading::SleepNs(1000000 / 2 * gRuntimeConfig.async.schedularFrequency);
{
AU_LOCK_GUARD(gSchedLock);
GetDispatchableTasks(pending);
auto uNextTick = uNextSysTickGuessed;
auto uNow = AuTime::SteadyClockNS();
if (uNow < uNextTick)
{
gSchedCondvar->WaitForSignalNS(uNextTick - uNow);
}
else
{
uNextSysTickGuessed = 0;
gSchedCondvar->WaitForSignalNS();
}
GetDispatchableTasks(pending);
uNextWakeuptimeRateLimit = AuTime::SteadyClockNS() + AuMSToNS<AuUInt64>(gRuntimeConfig.async.dwSchedulerRateLimitMS);
}
for (auto &entry : pending)
{
try
{
entry.pool->Run(entry.target, entry.runnable);
entry.pool->DecrementTasksRunning();
AuStaticCast<AuAsync::ThreadPool>(entry.pool)->Run(entry.target, entry.runnable, false);
}
catch (...)
{
@ -102,12 +156,9 @@ namespace Aurora::Async
}
}
counter++;
if (gRuntimeConfig.async.enableSysPumpFreqnecy)
if (gRuntimeConfig.async.bEnableLegacyTicks)
{
if ((!gRuntimeConfig.async.sysPumpFrequency) || ((gRuntimeConfig.async.sysPumpFrequency) && (counter % gRuntimeConfig.async.sysPumpFrequency) == 0))
if (limiter.CheckExchangePass())
{
try
{
@ -121,6 +172,8 @@ namespace Aurora::Async
AuLogWarn("Dropped SysRuntimePump");
Debug::PrintError();
}
SchedNextTime(AuMSToNS<AuUInt64>(gRuntimeConfig.async.dwLegacyMainThreadSystemTickMS) + AuTime::SteadyClockNS());
}
}
}
@ -128,20 +181,19 @@ namespace Aurora::Async
void InitSched()
{
gSchedLock = AuThreadPrimitives::MutexUnique();
}
void DeinitSched()
{
gThread.reset();
gSchedLock.reset();
}
AUKN_SYM void SetMainThreadForSysPumpScheduling(AuWorkerPId_t pid)
{
if (!pid)
{
gRuntimeConfig.async.enableSysPumpFreqnecy = gBOriginal;
gRuntimeConfig.async.bEnableLegacyTicks = gBOriginal;
gMainThread = AuWorkerPId_t(AuAsync::GetSharedAsyncApp(), AuWorkerId_t {0, 0});
Console::Commands::UpdateDispatcher(gMainThread);
@ -150,20 +202,20 @@ namespace Aurora::Async
gMainThread = pid;
Console::Commands::UpdateDispatcher(gMainThread);
gRuntimeConfig.async.enableSysPumpFreqnecy = true;
gRuntimeConfig.async.bEnableLegacyTicks = true;
StartSched2();
}
void StartSched()
{
if (!gRuntimeConfig.async.enableSchedularThread)
if (!gRuntimeConfig.async.bStartSchedularOnStartup)
{
return;
}
else
{
if (gRuntimeConfig.async.sysPumpFrequency && !gMainThread)
if (gRuntimeConfig.async.dwLegacyMainThreadSystemTickMS && !gMainThread)
{
gMainThread = AuWorkerPId_t(AuAsync::GetSharedAsyncApp(), AuWorkerId_t {0, 0});
}
@ -178,7 +230,7 @@ namespace Aurora::Async
if (gThread) return;
gBOriginal = gRuntimeConfig.async.enableSysPumpFreqnecy;
gBOriginal = gRuntimeConfig.async.bEnableLegacyTicks;
gThread = AuThreads::ThreadUnique(AuThreads::ThreadInfo(
AuMakeShared<AuThreads::IThreadVectorsFunctional>(AuThreads::IThreadVectorsFunctional::OnEntry_t(std::bind(SchedThread)),
@ -200,8 +252,10 @@ namespace Aurora::Async
{
return false;
}
AU_LOCK_GUARD(gSchedLock);
pool->IncrementTasksRunning();
AuAtomicAdd(&pool->uAtomicCounter, 1u);
SchedNextTime(ns);
return AuTryInsert(gEntries, SchedEntry {ns, target, runnable, pool});
}

View File

@ -120,6 +120,11 @@ namespace Aurora::Async
}
void ThreadPool::Run(WorkerId_t target, AuSPtr<IAsyncRunnable> runnable)
{
return this->Run(target, runnable, true);
}
void ThreadPool::Run(WorkerId_t target, AuSPtr<IAsyncRunnable> runnable, bool bIncrement)
{
auto state = GetGroup(target.first);
SysAssert(static_cast<bool>(state), "couldn't dispatch a task to an offline group");
@ -132,7 +137,11 @@ namespace Aurora::Async
}
AuDebug::AddMemoryCrunch();
IncrementTasksRunning();
if (bIncrement)
{
AuAtomicAdd(&this->uAtomicCounter, 1u);
}
state->workQueue.AddWorkEntry(AuMakePair(target.second, runnable));
@ -156,22 +165,6 @@ namespace Aurora::Async
return this;
}
void ThreadPool::IncrementTasksRunning()
{
this->tasksRunning_++;
}
void ThreadPool::DecrementTasksRunning()
{
if ((--this->tasksRunning_) == 0)
{
if (InRunnerMode())
{
Shutdown();
}
}
}
// ithreadpool
size_t ThreadPool::GetThreadWorkersCount(ThreadGroup_t group)
@ -494,7 +487,7 @@ namespace Aurora::Async
{
auto queue = ToKernelWorkQueue();
if ((this->tasksRunning_ == 0) &&
if ((this->uAtomicCounter == 0) &&
(!queue || queue->GetSourceCount() <= 1))
{
Shutdown();
@ -522,8 +515,8 @@ namespace Aurora::Async
{
auto queue = ToKernelWorkQueue();
if ((this->tasksRunning_ == tlsCallStack) &&
(!queue || queue->GetSourceCount() <= 1))
if ((this->uAtomicCounter == tlsCallStack) &&
(!queue || queue->GetSourceCount() <= 1 + this->uAtomicIOProcessorsWorthlessSources + this->uAtomicIOProcessors))
{
return false;
}
@ -556,7 +549,7 @@ namespace Aurora::Async
uCount++;
// Atomically decrement global task counter
runningTasks = this->tasksRunning_.fetch_sub(1) - 1;
runningTasks = AuAtomicSub(&this->uAtomicCounter, 1u);
tlsCallStack--;
@ -593,8 +586,8 @@ namespace Aurora::Async
auto queue = ToKernelWorkQueue();
if ((runningTasks == 0) &&
(this->tasksRunning_ == 0 ) &&
(!queue || queue->GetSourceCount() <= 1))
(this->uAtomicCounter == 0 ) &&
(!queue || queue->GetSourceCount() <= 1 + this->uAtomicIOProcessorsWorthlessSources + this->uAtomicIOProcessors))
{
Shutdown();
}
@ -1141,10 +1134,12 @@ namespace Aurora::Async
AuThreads::IThreadVectorsFunctional::OnExit_t{}),
gRuntimeConfig.async.threadPoolDefaultStackSize
));
if (!threadState->threadObject)
{
return {};
}
threadState->threadObject->Run();
}
else

View File

@ -11,15 +11,19 @@ namespace Aurora::Async
{
struct GroupState;
struct ThreadState;
struct IAsyncRunnable;
//class WorkItem;
struct IThreadPoolInternal
{
virtual bool WaitFor(WorkerId_t unlocker, const AuSPtr<Threading::IWaitable> &primitive, AuUInt32 ms) = 0;
virtual void Run(WorkerId_t target, AuSPtr<IAsyncRunnable> runnable) = 0;
virtual IThreadPool *ToThreadPool() = 0;
virtual void IncrementTasksRunning() = 0;
virtual void DecrementTasksRunning() = 0;
AuUInt32 uAtomicCounter {};
AuUInt32 uAtomicIOProcessors {};
AuUInt32 uAtomicIOProcessorsWorthlessSources {};
};
@ -31,9 +35,8 @@ namespace Aurora::Async
bool WaitFor(WorkerPId_t unlocker, const AuSPtr<Threading::IWaitable> &primitive, AuUInt32 ms);
bool WaitFor(WorkerId_t unlocker, const AuSPtr<Threading::IWaitable> &primitive, AuUInt32 ms) override;
void Run(WorkerId_t target, AuSPtr<IAsyncRunnable> runnable) override;
void Run(WorkerId_t target, AuSPtr<IAsyncRunnable> runnable, bool bIncrement);
IThreadPool *ToThreadPool() override;
void IncrementTasksRunning() override;
void DecrementTasksRunning() override;
// IThreadPool
virtual bool Spawn(WorkerId_t workerId) override;
@ -139,7 +142,6 @@ namespace Aurora::Async
bool shutdown {};
AuThreadPrimitives::RWRenterableLock rwlock_;
AuThreadPrimitives::Event shutdownEvent_;
std::atomic_int tasksRunning_;
bool runnersRunning_ {};
};
}

View File

@ -125,25 +125,36 @@ namespace Aurora::Async
AuSPtr<IWorkItem> WorkItem::SetSchedTimeNs(AuUInt64 ns)
{
this->dispatchTimeNs_ = Time::CurrentClockNS() + ns;
this->dispatchTimeNs_ = Time::SteadyClockNS() + ns;
return AU_SHARED_FROM_THIS;
}
AuSPtr<IWorkItem> WorkItem::SetSchedTimeAbs(AuUInt32 ms)
{
this->dispatchTimeNs_ = AuUInt64(ms) * AuMSToNS<AuUInt64>(ms);
return AU_SHARED_FROM_THIS;
return this->SetSchedTimeNsAbs(AuMSToNS<AuUInt64>(ms));
}
AuSPtr<IWorkItem> WorkItem::SetSchedTimeNsAbs(AuUInt64 ns)
{
this->dispatchTimeNs_ = ns;
auto uNow = AuTime::CurrentClockNS();
if (uNow > ns)
{
return AU_SHARED_FROM_THIS;
}
this->dispatchTimeNs_ = AuTime::SteadyClockNS() + (ns - uNow);
return AU_SHARED_FROM_THIS;
}
AuSPtr<IWorkItem> WorkItem::SetSchedSteadyTimeNsAbs(AuUInt64 ns)
{
this->dispatchTimeNs_ = ns;
return AU_SHARED_FROM_THIS;
}
AuSPtr<IWorkItem> WorkItem::SetSchedTime(AuUInt32 ms)
{
this->dispatchTimeNs_ = Time::CurrentClockNS() + AuMSToNS<AuUInt64>(ms);
this->dispatchTimeNs_ = Time::SteadyClockNS() + AuMSToNS<AuUInt64>(ms);
return AU_SHARED_FROM_THIS;
}
@ -201,7 +212,7 @@ namespace Aurora::Async
this->dispatchPending_ = true;
if (Time::CurrentClockNS() < this->dispatchTimeNs_)
if (Time::SteadyClockNS() < this->dispatchTimeNs_)
{
if (!Schedule())
{
@ -212,7 +223,7 @@ namespace Aurora::Async
if (auto delay = AuExchange(delayTimeNs_, {}))
{
this->dispatchTimeNs_ = delay + Time::CurrentClockNS();
this->dispatchTimeNs_ = delay + Time::SteadyClockNS();
if (!Schedule())
{
this->Fail();

View File

@ -30,6 +30,7 @@ namespace Aurora::Async
AuSPtr<IWorkItem> AddDelayTimeNs(AuUInt64 ns) override;
AuSPtr<IWorkItem> SetSchedTimeAbs(AuUInt32 ms) override;
AuSPtr<IWorkItem> SetSchedTimeNsAbs(AuUInt64 ns) override;
AuSPtr<IWorkItem> SetSchedSteadyTimeNsAbs(AuUInt64 ns) override;
AuSPtr<IWorkItem> Then(const AuSPtr<IWorkItem> &next) override;
AuSPtr<IWorkItem> Dispatch() override;

View File

@ -13,6 +13,7 @@
#include "AuIOPipeProcessor.hpp"
#include "Loop/Loop.hpp"
#include "Loop/LoopQueue.hpp"
#include <Async/ThreadPool.hpp>
namespace Aurora::IO
{
@ -72,6 +73,11 @@ namespace Aurora::IO
}
this->ToQueue()->Commit();
if (asyncWorker)
{
AuAtomicAdd(&AuStaticCast<AuAsync::ThreadPool>(asyncWorker.pool)->uAtomicIOProcessors, 1u);
}
return true;
}
@ -566,13 +572,13 @@ namespace Aurora::IO
AU_THROW_STRING("Wrong Thread");
}
bool changed = bool(ns) != bool(this->refreshRateNs);
bool bBinaryCanged = bool(ns) != bool(this->refreshRateNs);
auto old = AuExchange(this->refreshRateNs, ns);
UpdateTimers();
if (changed)
if (bBinaryCanged)
{
if (ns)
{
@ -627,6 +633,11 @@ namespace Aurora::IO
void IOProcessor::AddTimerLS()
{
if (asyncWorker)
{
AuAtomicAdd(&AuStaticCast<AuAsync::ThreadPool>(asyncWorker.pool)->uAtomicIOProcessorsWorthlessSources, 1u);
}
this->ToQueue()->SourceAdd(this->timers.pLsTicker);
this->ToQueue()->AddCallback(this->timers.pLsTicker, AuSPtr<AuLoop::ILoopSourceSubscriber>(AuSharedFromThis(), &this->timers));
this->ToQueue()->Commit();
@ -676,6 +687,11 @@ namespace Aurora::IO
}
queue->SourceRemove(this->timers.pLsTicker);
if (asyncWorker)
{
AuAtomicSub(&AuStaticCast<AuAsync::ThreadPool>(asyncWorker.pool)->uAtomicIOProcessorsWorthlessSources, 1u);
}
}
bool IOProcessor::IsAsync()
@ -853,8 +869,15 @@ namespace Aurora::IO
void IOProcessor::ReleaseAllWatches()
{
if (asyncWorker)
{
AuAtomicSub(&AuStaticCast<AuAsync::ThreadPool>(asyncWorker.pool)->uAtomicIOProcessors, 1u);
}
RemoveTimer();
auto queue = ToQueue();
if (queue)
{
if (this->items.cvEvent)

View File

@ -70,9 +70,9 @@ namespace Aurora::IO
AuSPtr<IIOProcessorItem> StartSimpleIOWatch(const AuSPtr<IIOWaitableItem> &object, const AuSPtr<IIOSimpleEventListener> &listener) override;
AuSPtr<IIOProcessorItem> StartSimpleLSWatch(const AuSPtr<Loop::ILoopSource> &source, const AuSPtr<IIOSimpleEventListener> &listener) override;
AuSPtr<IIOProcessorItem> StartIOWatchEx (const AuSPtr<IIOWaitableItem> &object, const AuSPtr<IIOEventListener> &listener, bool singleshot) override;
AuSPtr<IIOProcessorItem> StartSimpleIOWatchEx(const AuSPtr<IIOWaitableItem>& object, const AuSPtr<IIOSimpleEventListener>& listener, bool singleshot) override;
AuSPtr<IIOProcessorItem> StartSimpleLSWatchEx(const AuSPtr<Loop::ILoopSource>& source, const AuSPtr<IIOSimpleEventListener>& listener, bool singleshot, AuUInt32 msTimeout) override;
AuSPtr<IIOProcessorItem> StartIOWatchEx(const AuSPtr<IIOWaitableItem> &object, const AuSPtr<IIOEventListener> &listener, bool singleshot) override;
AuSPtr<IIOProcessorItem> StartSimpleIOWatchEx(const AuSPtr<IIOWaitableItem> &object, const AuSPtr<IIOSimpleEventListener> &listener, bool singleshot) override;
AuSPtr<IIOProcessorItem> StartSimpleLSWatchEx(const AuSPtr<Loop::ILoopSource> &source, const AuSPtr<IIOSimpleEventListener> &listener, bool singleshot, AuUInt32 msTimeout) override;
AuSPtr<IIOPipeProcessor> ToPipeProcessor() override;
@ -103,7 +103,7 @@ namespace Aurora::IO
bool IsTickOnly();
bool IsAsync();
bool mutliplexIOAndTimer {true};
bool mutliplexIOAndTimer { true };
IOProcessorItems items;
IOProcessorTimers timers;