/*** Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved. File: Schedular.cpp Date: 2021-6-26 Author: Reece ***/ #include #include "Async.hpp" #include "Schedular.hpp" //#include "AsyncApp.hpp" #include "ThreadPool.hpp" namespace Aurora::Async { struct SchedEntry { AuUInt64 ns; WorkerId_t target; AuSPtr runnable; IThreadPoolInternal *pool; }; static AuThreads::ThreadUnique_t gThread; static AuThreadPrimitives::MutexUnique_t gSchedLock; static AuList gEntries; static void GetDispatchableTasks(AuList &pending) { AU_LOCK_GUARD(gSchedLock); auto time = Time::CurrentClockNS(); for (auto itr = gEntries.begin(); itr != gEntries.end(); ) { if (itr->ns <= time) { if (!AuTryInsert(pending, AuMove(*itr))) { break; } itr = gEntries.erase(itr); } else { itr ++; } } } static bool gLockedPump = false; static void PumpSysThread() { RuntimeSysPump(); gLockedPump = false; } static void SchedThread() { AuUInt32 counter {}; AuList pending; auto thread = AuThreads::GetThread(); while (!thread->Exiting()) { AuList pending; Threading::SleepNs(1000000 / 2 * gRuntimeConfig.async.schedularFrequency); GetDispatchableTasks(pending); for (auto &entry : pending) { try { entry.pool->Run(entry.target, entry.runnable); entry.pool->DecrementTasksRunning(); } catch (...) { if (entry.pool->ToThreadPool()->InRunnerMode()) { AuLogWarn("Dropped scheduled task! Expect a leaky counter!"); AuLogWarn("Would you rather `Why u no exit?!` or `WHY DID U JUST CRASH REEEE` in production?"); } Debug::PrintError(); } } counter++; if ((!gRuntimeConfig.async.sysPumpFrequency) || ((gRuntimeConfig.async.sysPumpFrequency) && (counter % gRuntimeConfig.async.sysPumpFrequency) == 0)) { try { if (!AuExchange(gLockedPump, true)) { NewWorkItem(AuWorkerId_t{0, 0}, AuMakeShared(PumpSysThread))->Dispatch(); } } catch (...) { AuLogWarn("Dropped SysRuntimePump"); Debug::PrintError(); } } } } void InitSched() { gSchedLock = AuThreadPrimitives::MutexUnique(); } void DeinitSched() { gThread.reset(); gSchedLock.reset(); } void StartSched() { AU_LOCK_GUARD(gSchedLock); if (gThread) return; gThread = AuThreads::ThreadUnique(AuThreads::ThreadInfo( AuMakeShared(AuThreads::IThreadVectorsFunctional::OnEntry_t(std::bind(SchedThread)), AuThreads::IThreadVectorsFunctional::OnExit_t{}) )); gThread->Run(); } void StopSched() { gThread.reset(); } void Schedule(AuUInt64 ns, IThreadPoolInternal *pool, WorkerId_t target, AuSPtr runnable) { AU_LOCK_GUARD(gSchedLock); pool->IncrementTasksRunning(); gEntries.push_back({ns, target, runnable, pool}); } void TerminateSceduledTasks(IThreadPoolInternal *pool, WorkerId_t target) { AU_LOCK_GUARD(gSchedLock); for (auto itr = gEntries.begin(); itr != gEntries.end(); ) { if ((itr->pool == pool) && ((itr->target == target) || (target.second == Async::kThreadIdAny && target.first == itr->target.first))) { itr->runnable->CancelAsync(); itr = gEntries.erase(itr); } else { itr ++; } } } }