AuroraRuntime/Source/Async/Schedular.cpp

285 lines
7.7 KiB
C++

/***
Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: Schedular.cpp
Date: 2021-6-26
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "Async.hpp"
#include "Schedular.hpp"
//#include "AsyncApp.hpp"
#include "ThreadPool.hpp"
#include <Console/Commands/Commands.hpp>
namespace Aurora::Async
{
// One pending scheduled task, as stored in gOrderedEntries.
// Field order matters: Schedule() builds this with positional aggregate initialization.
struct SchedEntry
{
// Absolute dispatch time in nanoseconds; Schedule() stores the same value as the map key.
AuUInt64 ns;
// Worker the runnable is handed to once the entry is due.
WorkerId_t target;
// Work to run when due, or to cancel via CancelAsync() in TerminateSceduledTasks().
AuSPtr<IAsyncRunnable> runnable;
// Pool the task was scheduled for; raw pointer that this file never frees.
IThreadPoolInternal *pool;
};
// Scheduler thread state.
// gThread is created by StartSched2() and released by DeinitSched().
// gSchedLock is held around every access to gOrderedEntries in this file, and
// gSchedCondvar is constructed over that same mutex.
static AuThreads::ThreadUnique_t gThread;
static AuThreadPrimitives::ConditionMutex gSchedLock;
static AuThreadPrimitives::ConditionVariable gSchedCondvar(AuUnsafeRaiiToShared(gSchedLock.AsPointer()));
// Next-wakeup bookkeeping (absolute values compared against AuTime::SteadyClockNS()):
//  - uNextSysTickGuessed: time SchedThread() plans to wake at next; 0 means nothing is planned.
//    Lowered via compare-exchange in SchedNextTime() and cleared by SchedThread().
//  - uNextWakeuptimeRateLimit: lower bound SchedNextTime() applies to requested wakeups;
//    refreshed by SchedThread() after each dispatch pass.
static AuUInt64 uNextSysTickGuessed {};
static AuUInt64 uNextWakeuptimeRateLimit {};
// Pending tasks keyed by their absolute dispatch time. GetDispatchableTasks() walks it from
// begin() and stops at the first key in the future, so it relies on ascending key order.
// NOTE(review): the key is the timestamp alone - confirm how AuTryInsert treats two tasks
// scheduled for the same ns.
static AuBST<AuUInt64, SchedEntry> gOrderedEntries;
// Set (via AuExchange) by SchedThread() right before it dispatches PumpSysThread() and cleared
// at the end of PumpSysThread(), so a second pump is not dispatched while one is outstanding.
// NOTE(review): this is a plain bool touched by the scheduler thread and by whichever thread
// runs PumpSysThread() - confirm AuExchange provides the required atomicity.
static bool gLockedPump = false;
// old sewage to be cleaned up
// gBOriginal: snapshot of gRuntimeConfig.async.bEnableLegacyTicks taken by StartSched2() when it
// creates the thread; written back by SetMainThreadForSysPumpScheduling() for a pid without a pool.
static bool gBOriginal;
// Worker that PumpSysThread() is dispatched to; also passed to Console::Commands::UpdateDispatcher().
static AuOptionalEx<AuWorkerPId_t> gMainThread;
// Forward declarations for functions defined further down in this file.
void StartSched2();
static void SchedNextTime(AuUInt64 uNSAbs);
// Moves every entry whose dispatch time has been reached out of gOrderedEntries and
// appends it to `pending`. SchedThread() calls this while holding gSchedLock.
//
// The walk starts at the smallest key and ends at the first entry that is still in
// the future. If an entry cannot be appended to `pending`, it is left in the map, a
// wakeup is requested 3 ms after the sampled time, and the walk ends.
static void GetDispatchableTasks(AuList<SchedEntry> &pending)
{
    const auto uNow = Time::SteadyClockNS();

    auto itr = gOrderedEntries.begin();
    while (itr != gOrderedEntries.end())
    {
        auto &[uDeadline, task] = *itr;

        // Everything from here on is not due yet.
        if (uDeadline > uNow)
        {
            break;
        }

        // Could not hand the task over: retry shortly instead of dropping it.
        if (!AuTryInsert(pending, AuMove(task)))
        {
            SchedNextTime(uNow + AuMSToNS<AuUInt64>(3));
            break;
        }

        itr = gOrderedEntries.erase(itr);
    }
}
// Work item SchedThread() dispatches to gMainThread: runs one RuntimeSysPump() and then
// clears gLockedPump so that the scheduler may dispatch the next pump.
// Any exception from the pump is reported through SysPushErrorCatch and not rethrown.
static void PumpSysThread()
{
    try { RuntimeSysPump(); }
    catch (...) { SysPushErrorCatch("SysPump failed"); }

    // Cleared on both the success and the failure path.
    gLockedPump = false;
}
// Asks for the scheduler's planned wakeup (uNextSysTickGuessed) to be moved to uNSAbs when
// no wakeup is planned (value 0) or the planned one is later than uNSAbs.
//
// uNSAbs is an absolute time on the same scale as uNextSysTickGuessed. The request is
// raised to uNextWakeuptimeRateLimit if it falls before that bound. The update is done with
// a compare-exchange retry loop, so callers do not need to hold gSchedLock (SchedThread()
// calls this without it).
static void SchedNextTime(AuUInt64 uNSAbs)
{
bool bForceWakeup {};
// Cheap pre-check with a plain read: nothing to do if an earlier-or-equal wakeup is already planned.
if (uNextSysTickGuessed > uNSAbs ||
!uNextSysTickGuessed)
{
while (true)
{
// Snapshot used both for the comparison below and as the expected value of the exchange.
auto uCurrent = uNextSysTickGuessed;
// Another caller already planned a wakeup that is early enough.
if (uCurrent &&
uCurrent <= uNSAbs)
{
break;
}
// Apply the rate limit: never plan a wakeup before uNextWakeuptimeRateLimit.
// NOTE(review): this clamp runs after the comparison above, so the value stored below can be
// later than uCurrent - confirm that postponing an already planned wakeup is intended.
if (uNextWakeuptimeRateLimit > uNSAbs)
{
uNSAbs = uNextWakeuptimeRateLimit;
}
// Nothing was planned, so the scheduler thread is not in a wait that ends at a planned time;
// remember to signal it. The flag stays set even if this iteration's exchange fails.
if (!uCurrent)
{
bForceWakeup = true;
}
// Publish the new time only if nobody changed the value since the snapshot; otherwise retry.
if (AuAtomicCompareExchange(&uNextSysTickGuessed, uNSAbs, uCurrent) == uCurrent)
{
break;
}
}
// NOTE(review): no signal is sent when a non-zero, later wakeup is replaced by an earlier one.
// SchedThread() computes its wait from the value it read before sleeping, so presumably it
// keeps sleeping until the old time - confirm this is acceptable.
if (bForceWakeup)
{
gSchedCondvar->Signal();
}
}
}
// Entry point of the scheduler thread (bound as the thread's OnEntry callback in StartSched2()).
//
// Each loop iteration:
//  1. under gSchedLock, waits on gSchedCondvar according to uNextSysTickGuessed, then collects
//     the due entries with GetDispatchableTasks() and refreshes uNextWakeuptimeRateLimit;
//  2. outside the lock, hands every collected entry to its pool's Run();
//  3. if legacy ticks are enabled and the rate limiter passes, dispatches PumpSysThread() to
//     gMainThread (unless one is still outstanding) and requests the next tick wakeup.
// The loop runs for as long as AuIsThreadRunning() returns true.
static void SchedThread()
{
// Limiter for the legacy sys-pump, constructed from dwLegacyMainThreadSystemTickMS converted to ns.
Aurora::Utility::RateLimiter limiter(AuMSToNS<AuUInt64>(gRuntimeConfig.async.dwLegacyMainThreadSystemTickMS));
// Intentional: no Dec used on this thread; we should never run out of memory to schedule tasks
AuDebug::AddMemoryCrunch();
while (AuIsThreadRunning())
{
// Entries that became due in this iteration; filled under the lock, run after it is released.
AuList<SchedEntry> pending;
{
AU_LOCK_GUARD(gSchedLock);
auto uNextTick = uNextSysTickGuessed;
auto uNow = AuTime::SteadyClockNS();
// A wakeup is planned in the future: wait for a signal, at most until that time.
// uNextSysTickGuessed is left untouched here; it is cleared by a later iteration.
if (uNow < uNextTick)
{
gSchedCondvar->WaitForSignalNS(uNextTick - uNow);
}
// Planned time is exactly now: clear the plan and go straight to dispatch.
else if (uNow == uNextTick)
{
uNextSysTickGuessed = 0;
}
// Planned time already passed, or nothing was planned (0): clear the plan and wait for a
// signal, bounded by the legacy tick period when legacy ticks are enabled.
// NOTE(review): with legacy ticks disabled the timeout passed is 0; its meaning is not visible
// here (presumably "no timeout") - confirm, since entries that are still pending are not
// re-armed by this function before that wait.
else
{
uNextSysTickGuessed = 0;
gSchedCondvar->WaitForSignalNS(gRuntimeConfig.async.bEnableLegacyTicks ?
AuMSToNS<AuUInt64>(gRuntimeConfig.async.dwLegacyMainThreadSystemTickMS) :
0);
}
GetDispatchableTasks(pending);
// Earliest time SchedNextTime() may plan the next wakeup for.
uNextWakeuptimeRateLimit = AuTime::SteadyClockNS() + AuMSToNS<AuUInt64>(gRuntimeConfig.async.dwSchedulerRateLimitMS);
}
// Run the due tasks without holding gSchedLock; a throwing Run() is logged and does not stop the loop.
for (auto &entry : pending)
{
try
{
// The meaning of the trailing `false` argument is defined by ThreadPool::Run - TODO confirm.
AuStaticCast<AuAsync::ThreadPool>(entry.pool)->Run(entry.target, entry.runnable, false);
}
catch (...)
{
Debug::PrintError();
}
}
if (gRuntimeConfig.async.bEnableLegacyTicks)
{
if (limiter.CheckExchangePass())
{
// AuExchange yields the previous flag value: dispatch only when no pump is outstanding.
// NOTE(review): gMainThread.value() is used without checking that gMainThread is set.
if (!AuExchange(gLockedPump, true))
{
DispatchOn(gMainThread.value(), PumpSysThread);
}
// Request a wakeup one legacy tick period from now.
SchedNextTime(AuMSToNS<AuUInt64>(gRuntimeConfig.async.dwLegacyMainThreadSystemTickMS) + AuTime::SteadyClockNS());
}
}
}
}
// Scheduler initialization hook; currently does nothing.
// The scheduler thread itself is created by StartSched2().
void InitSched()
{
}
// Shuts the scheduler down: sends the exit signal to the scheduler thread if one exists,
// wakes every waiter on gSchedCondvar, and releases the thread handle.
// NOTE(review): the broadcast is issued without holding gSchedLock - confirm the scheduler
// thread cannot miss it between its running-check and its wait.
void DeinitSched()
{
    if (gThread) gThread->SendExitSignal();

    gSchedCondvar->Broadcast();
    gThread.reset();
}
// Selects the worker that receives the legacy sys-pump (PumpSysThread) and console command
// dispatches.
//
// pid with a pool:    stores it in gMainThread, forwards it to the console command dispatcher,
//                     forces legacy ticks on and makes sure the scheduler thread exists.
// pid without a pool: restores bEnableLegacyTicks from gBOriginal, clears gMainThread and
//                     forwards the now-empty value to the console command dispatcher.
//
// NOTE(review): gBOriginal is captured inside StartSched2(), i.e. after this function has
// already forced bEnableLegacyTicks to true when the thread is first created here - confirm
// the restored value is the one intended.
AUKN_SYM void SetMainThreadForSysPumpScheduling(AuWorkerPId_t pid)
{
    const bool bDetach = !pid.pool;

    if (bDetach)
    {
        gRuntimeConfig.async.bEnableLegacyTicks = gBOriginal;
        AuResetMember(gMainThread);
        Console::Commands::UpdateDispatcher(gMainThread);
    }
    else
    {
        gMainThread = pid;
        Console::Commands::UpdateDispatcher(gMainThread);
        gRuntimeConfig.async.bEnableLegacyTicks = true;
        StartSched2();
    }
}
// Startup entry point: creates the scheduler thread when the runtime configuration asks for
// it (bStartSchedularOnStartup). If a legacy tick period is configured and no main thread
// has been chosen yet, worker {0, 0} of the shared async app becomes the default.
void StartSched()
{
    const auto &asyncConfig = gRuntimeConfig.async;

    if (!asyncConfig.bStartSchedularOnStartup)
    {
        return;
    }

    const bool bNeedsDefaultMainThread = asyncConfig.dwLegacyMainThreadSystemTickMS && !gMainThread;
    if (bNeedsDefaultMainThread)
    {
        gMainThread = AuWorkerPId_t(AuAsync::GetSharedAsyncApp(), AuWorkerId_t {0, 0});
    }

    StartSched2();
}
void StartSched2()
{
AU_LOCK_GUARD(gSchedLock);
if (gThread) return;
gBOriginal = gRuntimeConfig.async.bEnableLegacyTicks;
gThread = AuThreads::ThreadUnique(AuThreads::ThreadInfo(
AuMakeShared<AuThreads::IThreadVectorsFunctional>(AuThreads::IThreadVectorsFunctional::OnEntry_t(std::bind(SchedThread)),
AuThreads::IThreadVectorsFunctional::OnExit_t {})
));
gThread->Run();
}
// Queues `runnable` to be handed to `target` on `pool` once the steady clock reaches `ns`.
//
// ns:       absolute dispatch time in nanoseconds (compared against Time::SteadyClockNS()).
// pool:     pool the task belongs to; its uAtomicCounter is incremented for a queued task.
// target:   worker that will receive the runnable.
// runnable: work item to run, or to cancel if the target is terminated first.
//
// Returns true if the entry was stored, false if AuTryInsert rejected it.
//
// The entry is inserted first and the side effects (counter increment, wakeup request) are
// applied only on success. Previously both happened before the fallible insert, so a rejected
// insert returned false yet left the pool's counter incremented for a task that would never
// run. Everything happens under gSchedLock, and the scheduler thread only reads
// gOrderedEntries under that lock, so it cannot see the entry before the counter is updated.
// NOTE(review): assumes callers do not already decrement uAtomicCounter themselves when this
// returns false - confirm against the call sites.
bool Schedule(AuUInt64 ns, IThreadPoolInternal *pool,
              WorkerId_t target,
              AuSPtr<IAsyncRunnable> runnable)
{
    AU_LOCK_GUARD(gSchedLock);
    AU_DEBUG_MEMCRUNCH;

    if (!AuTryInsert(gOrderedEntries,
                     AuConstReference(ns),
                     SchedEntry { ns, target, runnable, pool }))
    {
        // Nothing was queued: leave the counter and the planned wakeup untouched.
        return false;
    }

    AuAtomicAdd(&pool->uAtomicCounter, 1u);
    SchedNextTime(ns);
    return true;
}
// Cancels and removes every pending entry of `pool` that is addressed to `target`.
// An entry matches when its target equals `target` exactly, or when `target.second` is
// Async::kThreadIdAny and the first components (the group) are equal.
// Each matching runnable gets CancelAsync() before its entry is erased. Runs under gSchedLock.
void TerminateSceduledTasks(IThreadPoolInternal *pool,
                            WorkerId_t target)
{
    AU_LOCK_GUARD(gSchedLock);

    const auto isAddressedToTarget = [&](const SchedEntry &candidate) -> bool
    {
        if (candidate.pool != pool)
        {
            return false;
        }

        if (candidate.target == target)
        {
            return true;
        }

        return target.second == Async::kThreadIdAny &&
               target.first == candidate.target.first;
    };

    auto itr = gOrderedEntries.begin();
    while (itr != gOrderedEntries.end())
    {
        auto &[uDeadline, task] = *itr;

        if (!isAddressedToTarget(task))
        {
            ++itr;
            continue;
        }

        task.runnable->CancelAsync();
        itr = gOrderedEntries.erase(itr);
    }
}
}