/***
    Copyright (C) 2021-2023 Jamie Reece Wilson (a/k/a "Reece"). All rights reserved.

    File: AuSchedular.cpp
    Date: 2021-6-26
    Author: Reece
***/
#include
#include "Async.hpp"
#include "AuSchedular.hpp"
//#include "AsyncApp.hpp"
#include "ThreadPool.hpp"
#include
#include "IAsyncRunnable.hpp"
//#include
//#include

//#define SCHEDULER_USE_NO_SPIN

namespace Aurora::Async
{
    struct SchedEntry
    {
        AuUInt64 ns;
        WorkerId_t target;
        AuSPtr<IAsyncRunnable> runnable;
        IThreadPoolInternal *pool;
    };

    // sched thread threading:
    static AuThreads::ThreadUnique_t gThread;
    static AuConditionMutex gSchedLock;
    #if !defined(SCHEDULER_USE_NO_SPIN)
    static AuConditionVariable gSchedCondvar(AuUnsafeRaiiToShared(gSchedLock.AsPointer()));
    #else
    static AuThreadPrimitives::ConditionVariableInternal gSchedCondvar;
    #endif

    // next tick timing:
    static AuUInt64 uNextSysTickGuessed {};
    static AuUInt64 uNextWakeuptimeRateLimit {};
    static AuBST<AuUInt64, SchedEntry> gOrderedEntries;
    static AuUInt32 uScheduledHighPerfTimers {};

    // prevents stupid tick issues
    static bool gLockedPump = false;

    // old sewage to be cleaned up
    static bool gBOriginal;
    static AuOptionalEx<AuWorkerPId_t> gMainThread;

    void StartSched2();
    static void SchedNextTime(AuUInt64 uNSAbs);

    // Moves every entry whose deadline has elapsed into `pending` and requests a wakeup for the earliest remaining entry.
    static void GetDispatchableTasks(AuList<SchedEntry> &pending)
    {
        auto time = Time::SteadyClockNS();

        for (auto itr = gOrderedEntries.begin(); itr != gOrderedEntries.end(); )
        {
            auto &[ns, entry] = *itr;

            if (ns > time)
            {
                SchedNextTime(ns);
                break;
            }

            if (!AuTryInsert(pending, AuMove(entry)))
            {
                SchedNextTime(time + AuMSToNS(3));
                break;
            }

            itr = gOrderedEntries.erase(itr);
        }
    }

    static void PumpSysThread()
    {
        try
        {
            RuntimeSysPump();
        }
        catch (...)
        {
            SysPushErrorCatch("SysPump failed");
        }

        gLockedPump = false;
    }

    // Requests that the scheduler thread wake up no later than uNSAbs (absolute steady-clock nanoseconds), subject to the configured rate limit.
    static void SchedNextTime(AuUInt64 uNSAbs)
    {
        while (true)
        {
            auto uCurrent = AuAtomicLoad(&uNextSysTickGuessed);

            if (uCurrent && uCurrent <= uNSAbs)
            {
                return;
            }

            auto uRateLimit = AuAtomicLoad(&uNextWakeuptimeRateLimit);
            if (uRateLimit && uRateLimit > uNSAbs)
            {
                uNSAbs = uRateLimit;

                if (uRateLimit == uCurrent)
                {
                    return;
                }
            }

            if (AuAtomicCompareExchange(&uNextSysTickGuessed, uNSAbs, uCurrent) == uCurrent)
            {
                break;
            }
        }

        #if !defined(SCHEDULER_USE_NO_SPIN)
        gSchedCondvar->Signal();
        #else
        gSchedCondvar.Signal();
        #endif
    }

    // Scheduler thread entry point: sleeps until the next requested deadline, then dispatches due entries to their pools (or runs IO timers inline).
    static void SchedThread()
    {
        Aurora::Utility::RateLimiter limiter(AuMSToNS(gRuntimeConfig.async.dwLegacyMainThreadSystemTickMS));

        // Intentional: no Dec used on this thread; we should never run out of memory to schedule tasks
        AuDebug::AddMemoryCrunch();

        while (AuIsThreadRunning())
        {
            AuList<SchedEntry> pending;

            {
                AU_LOCK_GUARD(gSchedLock);

                auto uNextTick = AuAtomicLoad(&uNextSysTickGuessed);
                auto uNow = AuTime::SteadyClockNS();

                if (uNow < uNextTick)
                {
                    uNextSysTickGuessed = 0;

                    #if !defined(SCHEDULER_USE_NO_SPIN)
                    auto uNS = uNextTick - uNow;

                    #if defined(AURORA_IS_MODERNNT_DERIVED)
                    bool bExpensivePath = AuAtomicLoad(&uScheduledHighPerfTimers);
                    bool bWin10sABinch {};

                    if (!AuSwInfo::IsWindows8Point1OrGreater())
                    {
                        if (gRuntimeConfig.threadingConfig.bEnableAggressiveScheduling)
                        {
                            if (uNS <= AuMSToNS(3))
                            {
                                bExpensivePath = true;
                            }
                            else if (uNS < AuMSToNS(1'000) && uNS > AuMSToNS(100) && !bExpensivePath)
                            {
                                // Windows 7 coalescing doesn't batch very well.
                                // We overshoot by ~6MS or 8MS if we're above ~20MS.
                                uNS -= AuMSToNS(10);
                            }
                        }
                    }
                    else
                    {
                        // Windows 10-11 will coalesce us less, but our precision is an order of magnitude worse than what I can get out of Windows 7
                        if (bExpensivePath)
                        {
                            bWin10sABinch = true;
                            bExpensivePath = false;
                        }
                    }

                    if (bExpensivePath)
                    {
                        // With any luck, we'll be scheduled within 0.02MS of error with some spikes to .3MS.
                        gSchedLock->Unlock();
                        AuThreading::SleepNs(uNS);
                        gSchedLock->Lock();
                    }
                    else if (bWin10sABinch)
                    {
                        // With any luck, we'll be scheduled within 0.3MS of error, maybe a little bit lower for some sub-millisecond requests.
                        Win32DropSchedulerResolutionForced();
                        gSchedCondvar->WaitForSignalNS(uNS);
                        Win32DropSchedulerResolutionForcedUndo();
                    }
                    else
                    #endif
                    {
                        gSchedCondvar->WaitForSignalNS(uNS);
                    }
                    #else
                    gSchedCondvar.WaitForSignalNsEx(&AuStaticCast<AuThreadPrimitives::ConditionMutexInternal>(gSchedLock.AsPointer())->mutex, uNextTick - uNow, false);
                    #endif
                }
                else if (uNow >= uNextTick && uNextSysTickGuessed != 0)
                {
                    uNextSysTickGuessed = 0;
                }
                else if (uNextSysTickGuessed == 0 && gOrderedEntries.empty())
                {
                    #if !defined(SCHEDULER_USE_NO_SPIN)
                    gSchedCondvar->WaitForSignalNS(gRuntimeConfig.async.bEnableLegacyTicks ? AuMSToNS(gRuntimeConfig.async.dwLegacyMainThreadSystemTickMS) : 0);
                    #else
                    gSchedCondvar.WaitForSignalNsEx(&AuStaticCast<AuThreadPrimitives::ConditionMutexInternal>(gSchedLock.AsPointer())->mutex, gRuntimeConfig.async.bEnableLegacyTicks ? AuMSToNS(gRuntimeConfig.async.dwLegacyMainThreadSystemTickMS) : 0, false);
                    #endif
                }

                if (gRuntimeConfig.async.dwSchedulerRateLimitNS)
                {
                    uNextWakeuptimeRateLimit = AuTime::SteadyClockNS() + gRuntimeConfig.async.dwSchedulerRateLimitNS;
                }

                GetDispatchableTasks(pending);
            }

            for (auto &entry : pending)
            {
                try
                {
                    if (entry.pool)
                    {
                        AuStaticCast<ThreadPool>(entry.pool)->Run(entry.target, entry.runnable, false);
                    }
                    else
                    {
                        // IO timer hack!
                        entry.runnable->RunAsync();
                        AuAtomicSub(&uScheduledHighPerfTimers, 1u);
                    }
                }
                catch (...)
                {
                    SysPushErrorCatch();
                }
            }

            if (gRuntimeConfig.async.bEnableLegacyTicks)
            {
                if (limiter.CheckExchangePass())
                {
                    if (!AuExchange(gLockedPump, true))
                    {
                        DispatchOn(gMainThread.value(), PumpSysThread);
                    }

                    SchedNextTime(AuMSToNS(gRuntimeConfig.async.dwLegacyMainThreadSystemTickMS) + AuTime::SteadyClockNS());
                }
            }
        }
    }

    void InitSched()
    {
    }

    void DeinitSched()
    {
        if (gThread)
        {
            gThread->SendExitSignal();
        }

        #if !defined(SCHEDULER_USE_NO_SPIN)
        gSchedCondvar->Broadcast();
        #else
        gSchedCondvar.Broadcast();
        #endif

        gThread.reset();
    }

    AUKN_SYM void SetMainThreadForSysPumpScheduling(AuWorkerPId_t pid)
    {
        if (!pid)
        {
            gRuntimeConfig.async.bEnableLegacyTicks = gBOriginal;
            AuResetMember(gMainThread);
            Console::Commands::UpdateDispatcher(gMainThread);
            return;
        }

        gMainThread = pid;
        Console::Commands::UpdateDispatcher(gMainThread);
        gRuntimeConfig.async.bEnableLegacyTicks = true;

        StartSched2();
    }

    void StartSched()
    {
        if (!gRuntimeConfig.async.bStartSchedularOnStartup)
        {
            return;
        }
        else
        {
            if (gRuntimeConfig.async.dwLegacyMainThreadSystemTickMS && !gMainThread)
            {
                gMainThread = AuWorkerPId_t(AuAsync::GetSharedAsyncApp(), AuWorkerId_t { 0, 0 });
            }
        }

        StartSched2();
    }

    void StartSched2()
    {
        AU_LOCK_GUARD(gSchedLock);

        if (gThread) return;

        gBOriginal = gRuntimeConfig.async.bEnableLegacyTicks;

        gThread = AuThreads::ThreadUnique(AuThreads::ThreadInfo(
            AuMakeShared<AuThreads::IThreadVectorsFunctional>(AuThreads::IThreadVectorsFunctional::OnEntry_t(std::bind(SchedThread)),
                                                              AuThreads::IThreadVectorsFunctional::OnExit_t {})
        ));

        gThread->Run();
    }

    // Queues `runnable` for dispatch at absolute steady-clock time `ns` on `target`; a null pool marks a high-performance (IO) timer run inline on the scheduler thread.
    bool Schedule(AuUInt64 ns, IThreadPoolInternal *pool, WorkerId_t target, AuSPtr<IAsyncRunnable> runnable)
    {
        AU_LOCK_GLOBAL_GUARD(gSchedLock);
        AU_DEBUG_MEMCRUNCH;

        if (pool)
        {
            AuAtomicAdd(&pool->uAtomicCounter, 1u);
        }
        else
        {
            AuAtomicAdd(&uScheduledHighPerfTimers, 1u);
        }

        SchedNextTime(ns);
        return AuTryInsert(gOrderedEntries, AuConstReference(ns), SchedEntry { ns, target, runnable, pool });
    }

    // Cancels and removes every scheduled entry that belongs to `pool` and matches `target` (or any worker in the group when kThreadIdAny is given).
    void TerminateSceduledTasks(IThreadPoolInternal *pool, WorkerId_t target)
    {
        AU_LOCK_GLOBAL_GUARD(gSchedLock);

        for (auto itr = gOrderedEntries.begin(); itr != gOrderedEntries.end(); )
        {
            auto &[ns, entry] = *itr;

            if ((entry.pool == pool) &&
                ((entry.target == target) ||
                 (target.second == Async::kThreadIdAny && target.first == entry.target.first)))
            {
                entry.runnable->CancelAsync();
                itr = gOrderedEntries.erase(itr);
            }
            else
            {
                itr++;
            }
        }
    }
}
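
// ---------------------------------------------------------------------------
// Illustrative usage sketch, assuming a hypothetical ExampleRunnable that
// implements IAsyncRunnable and a hypothetical helper QueueExampleIn5ms: it
// shows roughly how runtime code could queue a delayed work item through
// Schedule(). Kept in a disabled block; the real call sites (thread pool
// timers, IO timers) live elsewhere in AuAsync.
// ---------------------------------------------------------------------------
#if 0
namespace Aurora::Async
{
    struct ExampleRunnable : IAsyncRunnable
    {
        void RunAsync() override
        {
            // deferred work goes here
        }

        void CancelAsync() override
        {
            // invoked via TerminateSceduledTasks if the entry is torn down first
        }
    };

    static bool QueueExampleIn5ms(IThreadPoolInternal *pPool)
    {
        // Dispatch on worker {0, 0} of pPool roughly 5MS from now; returns false
        // if the entry could not be inserted into gOrderedEntries.
        return Schedule(AuTime::SteadyClockNS() + AuMSToNS(5),
                        pPool,
                        WorkerId_t { 0, 0 },
                        AuMakeShared<ExampleRunnable>());
    }
}
#endif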