/*** Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved. File: Schedular.cpp Date: 2021-6-26 Author: Reece ***/ #include #include "Async.hpp" #include "Schedular.hpp" //#include "AsyncApp.hpp" #include "ThreadPool.hpp" #include namespace Aurora::Async { struct SchedEntry { AuUInt64 ns; WorkerId_t target; AuSPtr runnable; IThreadPoolInternal *pool; }; // sched thread threading: static AuThreads::ThreadUnique_t gThread; static AuThreadPrimitives::ConditionMutex gSchedLock; static AuThreadPrimitives::ConditionVariable gSchedCondvar(AuUnsafeRaiiToShared(gSchedLock.AsPointer())); // next tick timing: static AuUInt64 uNextSysTickGuessed {}; static AuUInt64 uNextWakeuptimeRateLimit {}; static AuBST gOrderedEntries; // prevents stupid tick issues static bool gLockedPump = false; // old sewage to be cleaned up static bool gBOriginal; static AuWorkerPId_t gMainThread; void StartSched2(); static void SchedNextTime(AuUInt64 uNSAbs); static void GetDispatchableTasks(AuList &pending) { auto time = Time::SteadyClockNS(); for (auto itr = gOrderedEntries.begin(); itr != gOrderedEntries.end(); ) { auto &[ns, entry] = *itr; if (ns > time) { break; } if (!AuTryInsert(pending, AuMove(entry))) { SchedNextTime(time + AuMSToNS(3)); break; } itr = gOrderedEntries.erase(itr); } } static void PumpSysThread() { try { RuntimeSysPump(); } catch (...) { SysPushErrorCatch("SysPump failed"); } gLockedPump = false; } static void SchedNextTime(AuUInt64 uNSAbs) { bool bForceWakeup {}; if (uNextSysTickGuessed > uNSAbs || !uNextSysTickGuessed) { while (true) { auto uCurrent = uNextSysTickGuessed; if (uCurrent && uCurrent <= uNSAbs) { break; } if (uNextWakeuptimeRateLimit > uNSAbs) { uNSAbs = uNextWakeuptimeRateLimit; } if (!uCurrent) { bForceWakeup = true; } if (AuAtomicCompareExchange(&uNextSysTickGuessed, uNSAbs, uCurrent) == uCurrent) { break; } } if (bForceWakeup) { gSchedCondvar->Signal(); } } } static void SchedThread() { Aurora::Utility::RateLimiter limiter(AuMSToNS(gRuntimeConfig.async.dwLegacyMainThreadSystemTickMS)); // Intentional: no Dec used on this thread; we should never run out of memory to schedule tasks AuDebug::AddMemoryCrunch(); while (AuIsThreadRunning()) { AuList pending; { AU_LOCK_GUARD(gSchedLock); auto uNextTick = uNextSysTickGuessed; auto uNow = AuTime::SteadyClockNS(); if (uNow < uNextTick) { gSchedCondvar->WaitForSignalNS(uNextTick - uNow); } else if (uNow == uNextTick) { uNextSysTickGuessed = 0; } else { uNextSysTickGuessed = 0; gSchedCondvar->WaitForSignalNS(); } GetDispatchableTasks(pending); uNextWakeuptimeRateLimit = AuTime::SteadyClockNS() + AuMSToNS(gRuntimeConfig.async.dwSchedulerRateLimitMS); } for (auto &entry : pending) { try { AuStaticCast(entry.pool)->Run(entry.target, entry.runnable, false); } catch (...) { Debug::PrintError(); } } if (gRuntimeConfig.async.bEnableLegacyTicks) { if (limiter.CheckExchangePass()) { if (!AuExchange(gLockedPump, true)) { NewWorkItem(gMainThread, AuMakeShared(PumpSysThread))->Dispatch(); } SchedNextTime(AuMSToNS(gRuntimeConfig.async.dwLegacyMainThreadSystemTickMS) + AuTime::SteadyClockNS()); } } } } void InitSched() { } void DeinitSched() { if (gThread) { gThread->SendExitSignal(); } gSchedCondvar->Broadcast(); gThread.reset(); } AUKN_SYM void SetMainThreadForSysPumpScheduling(AuWorkerPId_t pid) { if (!pid) { gRuntimeConfig.async.bEnableLegacyTicks = gBOriginal; gMainThread = AuWorkerPId_t(AuAsync::GetSharedAsyncApp(), AuWorkerId_t {0, 0}); Console::Commands::UpdateDispatcher(gMainThread); return; } gMainThread = pid; Console::Commands::UpdateDispatcher(gMainThread); gRuntimeConfig.async.bEnableLegacyTicks = true; StartSched2(); } void StartSched() { if (!gRuntimeConfig.async.bStartSchedularOnStartup) { return; } else { if (gRuntimeConfig.async.dwLegacyMainThreadSystemTickMS && !gMainThread) { gMainThread = AuWorkerPId_t(AuAsync::GetSharedAsyncApp(), AuWorkerId_t {0, 0}); } } StartSched2(); } void StartSched2() { AU_LOCK_GUARD(gSchedLock); if (gThread) return; gBOriginal = gRuntimeConfig.async.bEnableLegacyTicks; gThread = AuThreads::ThreadUnique(AuThreads::ThreadInfo( AuMakeShared(AuThreads::IThreadVectorsFunctional::OnEntry_t(std::bind(SchedThread)), AuThreads::IThreadVectorsFunctional::OnExit_t {}) )); gThread->Run(); } bool Schedule(AuUInt64 ns, IThreadPoolInternal *pool, WorkerId_t target, AuSPtr runnable) { AU_LOCK_GUARD(gSchedLock); AU_DEBUG_MEMCRUNCH; AuAtomicAdd(&pool->uAtomicCounter, 1u); SchedNextTime(ns); return AuTryInsert(gOrderedEntries, AuConstReference(ns), SchedEntry { ns, target, runnable, pool }); } void TerminateSceduledTasks(IThreadPoolInternal *pool, WorkerId_t target) { AU_LOCK_GUARD(gSchedLock); for (auto itr = gOrderedEntries.begin(); itr != gOrderedEntries.end(); ) { auto &[ns, entry] = *itr; if ((entry.pool == pool) && ((entry.target == target) || (target.second == Async::kThreadIdAny && target.first == entry.target.first))) { entry.runnable->CancelAsync(); itr = gOrderedEntries.erase(itr); } else { itr ++; } } } }