/*** Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved. File: Schedular.cpp Date: 2021-6-26 Author: Reece ***/ #include #include "Async.hpp" #include "Schedular.hpp" //#include "AsyncApp.hpp" #include "ThreadPool.hpp" #include namespace Aurora::Async { struct SchedEntry { AuUInt64 ns; WorkerId_t target; AuSPtr runnable; IThreadPoolInternal *pool; }; static AuThreads::ThreadUnique_t gThread; static AuThreadPrimitives::MutexUnique_t gSchedLock; static AuWorkerPId_t gMainThread; static bool gBOriginal; static AuList gEntries; void StartSched2(); static void GetDispatchableTasks(AuList &pending) { AU_LOCK_GUARD(gSchedLock); auto time = Time::CurrentClockNS(); for (auto itr = gEntries.begin(); itr != gEntries.end(); ) { if (itr->ns <= time) { if (!AuTryInsert(pending, *itr)) { break; } itr = gEntries.erase(itr); } else { itr ++; } } } static bool gLockedPump = false; static void PumpSysThread() { try { RuntimeSysPump(); } catch (...) { SysPushErrorCatch("SysPump failed"); } gLockedPump = false; } static void SchedThread() { AuUInt32 counter {}; AuList pending; auto thread = AuThreads::GetThread(); while (!thread->Exiting()) { AuList pending; Threading::SleepNs(1000000 / 2 * gRuntimeConfig.async.schedularFrequency); GetDispatchableTasks(pending); for (auto &entry : pending) { try { entry.pool->Run(entry.target, entry.runnable); entry.pool->DecrementTasksRunning(); } catch (...) { if (entry.pool->ToThreadPool()->InRunnerMode()) { AuLogWarn("Dropped scheduled task! Expect a leaky counter!"); AuLogWarn("Would you rather `Why u no exit?!` or a spurious crash in production?"); } Debug::PrintError(); } } counter++; if (gRuntimeConfig.async.enableSysPumpFreqnecy) { if ((!gRuntimeConfig.async.sysPumpFrequency) || ((gRuntimeConfig.async.sysPumpFrequency) && (counter % gRuntimeConfig.async.sysPumpFrequency) == 0)) { try { if (!AuExchange(gLockedPump, true)) { NewWorkItem(gMainThread, AuMakeShared(PumpSysThread))->Dispatch(); } } catch (...) { AuLogWarn("Dropped SysRuntimePump"); Debug::PrintError(); } } } } } void InitSched() { gSchedLock = AuThreadPrimitives::MutexUnique(); } void DeinitSched() { gThread.reset(); gSchedLock.reset(); } AUKN_SYM void SetMainThreadForSysPumpScheduling(AuWorkerPId_t pid) { if (!pid) { gRuntimeConfig.async.enableSysPumpFreqnecy = gBOriginal; gMainThread = AuWorkerPId_t(AuAsync::GetSharedAsyncApp(), AuWorkerId_t {0, 0}); Console::Commands::UpdateDispatcher(gMainThread); return; } gMainThread = pid; Console::Commands::UpdateDispatcher(gMainThread); gRuntimeConfig.async.enableSysPumpFreqnecy = true; StartSched2(); } void StartSched() { if (!gRuntimeConfig.async.enableSchedularThread) { return; } else { if (gRuntimeConfig.async.sysPumpFrequency && !gMainThread) { gMainThread = AuWorkerPId_t(AuAsync::GetSharedAsyncApp(), AuWorkerId_t {0, 0}); } } StartSched2(); } void StartSched2() { AU_LOCK_GUARD(gSchedLock); if (gThread) return; gBOriginal = gRuntimeConfig.async.enableSysPumpFreqnecy; gThread = AuThreads::ThreadUnique(AuThreads::ThreadInfo( AuMakeShared(AuThreads::IThreadVectorsFunctional::OnEntry_t(std::bind(SchedThread)), AuThreads::IThreadVectorsFunctional::OnExit_t {}) )); gThread->Run(); } void StopSched() { gThread.reset(); } bool Schedule(AuUInt64 ns, IThreadPoolInternal *pool, WorkerId_t target, AuSPtr runnable) { // TODO (Reece): urgent if (!gSchedLock) { return false; } AU_LOCK_GUARD(gSchedLock); pool->IncrementTasksRunning(); return AuTryInsert(gEntries, SchedEntry {ns, target, runnable, pool}); } void TerminateSceduledTasks(IThreadPoolInternal *pool, WorkerId_t target) { if (!gSchedLock) { SysAssert(gEntries.empty()); return; } AU_LOCK_GUARD(gSchedLock); for (auto itr = gEntries.begin(); itr != gEntries.end(); ) { if ((itr->pool == pool) && ((itr->target == target) || (target.second == Async::kThreadIdAny && target.first == itr->target.first))) { itr->runnable->CancelAsync(); itr = gEntries.erase(itr); } else { itr ++; } } } }