/*** Copyright (C) 2021-2023 Jamie Reece Wilson (a/k/a "Reece"). All rights reserved. File: AuSchedular.cpp Date: 2021-6-26 Author: Reece ***/ #include #include "Async.hpp" #include "AuSchedular.hpp" //#include "AsyncApp.hpp" #include "ThreadPool.hpp" #include #include "IAsyncRunnable.hpp" //#include //#include //#define SCHEDULER_USE_NO_SPIN namespace Aurora::Async { struct SchedEntry { AuUInt64 ns; WorkerId_t target; AuSPtr runnable; IThreadPoolInternal *pool; }; // sched thread threading: static AuThreads::ThreadUnique_t gThread; static AuConditionMutex gSchedLock; #if !defined(SCHEDULER_USE_NO_SPIN) static AuConditionVariable gSchedCondvar(AuUnsafeRaiiToShared(gSchedLock.AsPointer())); #else static AuThreadPrimitives::ConditionVariableInternal gSchedCondvar; #endif // next tick timing: static AuUInt64 uNextSysTickGuessed {}; static AuUInt64 uNextWakeuptimeRateLimit {}; static AuBST gOrderedEntries; // prevents stupid tick issues static bool gLockedPump = false; // old sewage to be cleaned up static bool gBOriginal; static AuOptionalEx gMainThread; void StartSched2(); static void SchedNextTime(AuUInt64 uNSAbs); static void GetDispatchableTasks(AuList &pending) { auto time = Time::SteadyClockNS(); for (auto itr = gOrderedEntries.begin(); itr != gOrderedEntries.end(); ) { auto &[ns, entry] = *itr; if (ns > time) { SchedNextTime(ns); break; } if (!AuTryInsert(pending, AuMove(entry))) { SchedNextTime(time + AuMSToNS(3)); break; } itr = gOrderedEntries.erase(itr); } } static void PumpSysThread() { try { RuntimeSysPump(); } catch (...) { SysPushErrorCatch("SysPump failed"); } gLockedPump = false; } static void SchedNextTime(AuUInt64 uNSAbs) { while (true) { auto uCurrent = AuAtomicLoad(&uNextSysTickGuessed); if (uCurrent && uCurrent <= uNSAbs) { return; } auto uRateLimit = AuAtomicLoad(&uNextWakeuptimeRateLimit); if (uRateLimit && uRateLimit > uNSAbs) { uNSAbs = uRateLimit; if (uRateLimit == uCurrent) { return; } } if (AuAtomicCompareExchange(&uNextSysTickGuessed, uNSAbs, uCurrent) == uCurrent) { break; } } #if !defined(SCHEDULER_USE_NO_SPIN) gSchedCondvar->Signal(); #else gSchedCondvar.Signal(); #endif } static void SchedThread() { Aurora::Utility::RateLimiter limiter(AuMSToNS(gRuntimeConfig.async.dwLegacyMainThreadSystemTickMS)); // Intentional: no Dec used on this thread; we should never run out of memory to schedule tasks AuDebug::AddMemoryCrunch(); while (AuIsThreadRunning()) { AuList pending; { AU_LOCK_GUARD(gSchedLock); auto uNextTick = AuAtomicLoad(&uNextSysTickGuessed); auto uNow = AuTime::SteadyClockNS(); if (uNow < uNextTick) { uNextSysTickGuessed = 0; #if !defined(SCHEDULER_USE_NO_SPIN) gSchedCondvar->WaitForSignalNS(uNextTick - uNow); #else gSchedCondvar.WaitForSignalNsEx(&AuStaticCast(gSchedLock.AsPointer())->mutex, uNextTick - uNow, false); #endif } else if (uNow >= uNextTick) { uNextSysTickGuessed = 0; } else if (uNextSysTickGuessed == 0) { #if !defined(SCHEDULER_USE_NO_SPIN) gSchedCondvar->WaitForSignalNS(gRuntimeConfig.async.bEnableLegacyTicks ? AuMSToNS(gRuntimeConfig.async.dwLegacyMainThreadSystemTickMS) : 0); #else gSchedCondvar.WaitForSignalNsEx(&AuStaticCast(gSchedLock.AsPointer())->mutex, gRuntimeConfig.async.bEnableLegacyTicks ? AuMSToNS(gRuntimeConfig.async.dwLegacyMainThreadSystemTickMS) : 0, false); #endif } if (gRuntimeConfig.async.dwSchedulerRateLimitNS) { uNextWakeuptimeRateLimit = AuTime::SteadyClockNS() + gRuntimeConfig.async.dwSchedulerRateLimitNS; } GetDispatchableTasks(pending); } for (auto &entry : pending) { try { AuStaticCast(entry.pool)->Run(entry.target, entry.runnable, false); } catch (...) { SysPushErrorCatch(); } } if (gRuntimeConfig.async.bEnableLegacyTicks) { if (limiter.CheckExchangePass()) { if (!AuExchange(gLockedPump, true)) { DispatchOn(gMainThread.value(), PumpSysThread); } SchedNextTime(AuMSToNS(gRuntimeConfig.async.dwLegacyMainThreadSystemTickMS) + AuTime::SteadyClockNS()); } } } } void InitSched() { } void DeinitSched() { if (gThread) { gThread->SendExitSignal(); } #if !defined(SCHEDULER_USE_NO_SPIN) gSchedCondvar->Broadcast(); #else gSchedCondvar.Broadcast(); #endif gThread.reset(); } AUKN_SYM void SetMainThreadForSysPumpScheduling(AuWorkerPId_t pid) { if (!pid) { gRuntimeConfig.async.bEnableLegacyTicks = gBOriginal; AuResetMember(gMainThread); Console::Commands::UpdateDispatcher(gMainThread); return; } gMainThread = pid; Console::Commands::UpdateDispatcher(gMainThread); gRuntimeConfig.async.bEnableLegacyTicks = true; StartSched2(); } void StartSched() { if (!gRuntimeConfig.async.bStartSchedularOnStartup) { return; } else { if (gRuntimeConfig.async.dwLegacyMainThreadSystemTickMS && !gMainThread) { gMainThread = AuWorkerPId_t(AuAsync::GetSharedAsyncApp(), AuWorkerId_t {0, 0}); } } StartSched2(); } void StartSched2() { AU_LOCK_GUARD(gSchedLock); if (gThread) return; gBOriginal = gRuntimeConfig.async.bEnableLegacyTicks; gThread = AuThreads::ThreadUnique(AuThreads::ThreadInfo( AuMakeShared(AuThreads::IThreadVectorsFunctional::OnEntry_t(std::bind(SchedThread)), AuThreads::IThreadVectorsFunctional::OnExit_t {}) )); gThread->Run(); } bool Schedule(AuUInt64 ns, IThreadPoolInternal *pool, WorkerId_t target, AuSPtr runnable) { AU_LOCK_GUARD(gSchedLock); AU_DEBUG_MEMCRUNCH; AuAtomicAdd(&pool->uAtomicCounter, 1u); SchedNextTime(ns); return AuTryInsert(gOrderedEntries, AuConstReference(ns), SchedEntry { ns, target, runnable, pool }); } void TerminateSceduledTasks(IThreadPoolInternal *pool, WorkerId_t target) { AU_LOCK_GUARD(gSchedLock); for (auto itr = gOrderedEntries.begin(); itr != gOrderedEntries.end(); ) { auto &[ns, entry] = *itr; if ((entry.pool == pool) && ((entry.target == target) || (target.second == Async::kThreadIdAny && target.first == entry.target.first))) { entry.runnable->CancelAsync(); itr = gOrderedEntries.erase(itr); } else { itr++; } } } }