AuroraRuntime/Source/Async/AuSchedular.cpp

312 lines
8.9 KiB
C++

/***
Copyright (C) 2021-2023 Jamie Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuSchedular.cpp
Date: 2021-6-26
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "Async.hpp"
#include "AuSchedular.hpp"
//#include "AsyncApp.hpp"
#include "ThreadPool.hpp"
#include <Console/Commands/Commands.hpp>
#include "IAsyncRunnable.hpp"
//#include <Source/Threading/Primitives/AuConditionMutex.Generic.hpp>
//#include <Source/Threading/Primitives/AuConditionVariable.Generic.hpp>
//#define SCHEDULER_USE_NO_SPIN
namespace Aurora::Async
{
struct SchedEntry
{
AuUInt64 ns;
WorkerId_t target;
AuSPtr<IAsyncRunnable> runnable;
IThreadPoolInternal *pool;
};
// sched thread threading:
static AuThreads::ThreadUnique_t gThread;
static AuConditionMutex gSchedLock;
#if !defined(SCHEDULER_USE_NO_SPIN)
static AuConditionVariable gSchedCondvar(AuUnsafeRaiiToShared(gSchedLock.AsPointer()));
#else
static AuThreadPrimitives::ConditionVariableInternal gSchedCondvar;
#endif
// next tick timing:
static AuUInt64 uNextSysTickGuessed {};
static AuUInt64 uNextWakeuptimeRateLimit {};
static AuBST<AuUInt64, SchedEntry> gOrderedEntries;
// prevents stupid tick issues
static bool gLockedPump = false;
// old sewage to be cleaned up
static bool gBOriginal;
static AuOptionalEx<AuWorkerPId_t> gMainThread;
void StartSched2();
static void SchedNextTime(AuUInt64 uNSAbs);
static void GetDispatchableTasks(AuList<SchedEntry> &pending)
{
auto time = Time::SteadyClockNS();
for (auto itr = gOrderedEntries.begin(); itr != gOrderedEntries.end(); )
{
auto &[ns, entry] = *itr;
if (ns > time)
{
SchedNextTime(ns);
break;
}
if (!AuTryInsert(pending, AuMove(entry)))
{
SchedNextTime(time + AuMSToNS<AuUInt64>(3));
break;
}
itr = gOrderedEntries.erase(itr);
}
}
static void PumpSysThread()
{
try
{
RuntimeSysPump();
}
catch (...)
{
SysPushErrorCatch("SysPump failed");
}
gLockedPump = false;
}
static void SchedNextTime(AuUInt64 uNSAbs)
{
while (true)
{
auto uCurrent = AuAtomicLoad(&uNextSysTickGuessed);
if (uCurrent &&
uCurrent <= uNSAbs)
{
return;
}
auto uRateLimit = AuAtomicLoad(&uNextWakeuptimeRateLimit);
if (uRateLimit &&
uRateLimit > uNSAbs)
{
uNSAbs = uRateLimit;
if (uRateLimit == uCurrent)
{
return;
}
}
if (AuAtomicCompareExchange(&uNextSysTickGuessed, uNSAbs, uCurrent) == uCurrent)
{
break;
}
}
#if !defined(SCHEDULER_USE_NO_SPIN)
gSchedCondvar->Signal();
#else
gSchedCondvar.Signal();
#endif
}
static void SchedThread()
{
Aurora::Utility::RateLimiter limiter(AuMSToNS<AuUInt64>(gRuntimeConfig.async.dwLegacyMainThreadSystemTickMS));
// Intentional: no Dec used on this thread; we should never run out of memory to schedule tasks
AuDebug::AddMemoryCrunch();
while (AuIsThreadRunning())
{
AuList<SchedEntry> pending;
{
AU_LOCK_GUARD(gSchedLock);
auto uNextTick = AuAtomicLoad(&uNextSysTickGuessed);
auto uNow = AuTime::SteadyClockNS();
if (uNow < uNextTick)
{
uNextSysTickGuessed = 0;
#if !defined(SCHEDULER_USE_NO_SPIN)
gSchedCondvar->WaitForSignalNS(uNextTick - uNow);
#else
gSchedCondvar.WaitForSignalNsEx(&AuStaticCast<AuThreadPrimitives::ConditionMutexImpl>(gSchedLock.AsPointer())->mutex, uNextTick - uNow, false);
#endif
}
else if (uNow >= uNextTick)
{
uNextSysTickGuessed = 0;
}
else if (uNextSysTickGuessed == 0)
{
#if !defined(SCHEDULER_USE_NO_SPIN)
gSchedCondvar->WaitForSignalNS(gRuntimeConfig.async.bEnableLegacyTicks ?
AuMSToNS<AuUInt64>(gRuntimeConfig.async.dwLegacyMainThreadSystemTickMS) :
0);
#else
gSchedCondvar.WaitForSignalNsEx(&AuStaticCast<AuThreadPrimitives::ConditionMutexImpl>(gSchedLock.AsPointer())->mutex, gRuntimeConfig.async.bEnableLegacyTicks ?
AuMSToNS<AuUInt64>(gRuntimeConfig.async.dwLegacyMainThreadSystemTickMS) :
0,
false);
#endif
}
if (gRuntimeConfig.async.dwSchedulerRateLimitNS)
{
uNextWakeuptimeRateLimit =
AuTime::SteadyClockNS() + gRuntimeConfig.async.dwSchedulerRateLimitNS;
}
GetDispatchableTasks(pending);
}
for (auto &entry : pending)
{
try
{
AuStaticCast<AuAsync::ThreadPool>(entry.pool)->Run(entry.target, entry.runnable, false);
}
catch (...)
{
SysPushErrorCatch();
}
}
if (gRuntimeConfig.async.bEnableLegacyTicks)
{
if (limiter.CheckExchangePass())
{
if (!AuExchange(gLockedPump, true))
{
DispatchOn(gMainThread.value(), PumpSysThread);
}
SchedNextTime(AuMSToNS<AuUInt64>(gRuntimeConfig.async.dwLegacyMainThreadSystemTickMS) + AuTime::SteadyClockNS());
}
}
}
}
void InitSched()
{
}
void DeinitSched()
{
if (gThread)
{
gThread->SendExitSignal();
}
#if !defined(SCHEDULER_USE_NO_SPIN)
gSchedCondvar->Broadcast();
#else
gSchedCondvar.Broadcast();
#endif
gThread.reset();
}
AUKN_SYM void SetMainThreadForSysPumpScheduling(AuWorkerPId_t pid)
{
if (!pid)
{
gRuntimeConfig.async.bEnableLegacyTicks = gBOriginal;
AuResetMember(gMainThread);
Console::Commands::UpdateDispatcher(gMainThread);
return;
}
gMainThread = pid;
Console::Commands::UpdateDispatcher(gMainThread);
gRuntimeConfig.async.bEnableLegacyTicks = true;
StartSched2();
}
void StartSched()
{
if (!gRuntimeConfig.async.bStartSchedularOnStartup)
{
return;
}
else
{
if (gRuntimeConfig.async.dwLegacyMainThreadSystemTickMS && !gMainThread)
{
gMainThread = AuWorkerPId_t(AuAsync::GetSharedAsyncApp(), AuWorkerId_t {0, 0});
}
}
StartSched2();
}
void StartSched2()
{
AU_LOCK_GUARD(gSchedLock);
if (gThread) return;
gBOriginal = gRuntimeConfig.async.bEnableLegacyTicks;
gThread = AuThreads::ThreadUnique(AuThreads::ThreadInfo(
AuMakeShared<AuThreads::IThreadVectorsFunctional>(AuThreads::IThreadVectorsFunctional::OnEntry_t(std::bind(SchedThread)),
AuThreads::IThreadVectorsFunctional::OnExit_t {})
));
gThread->Run();
}
bool Schedule(AuUInt64 ns, IThreadPoolInternal *pool,
WorkerId_t target,
AuSPtr<IAsyncRunnable> runnable)
{
AU_LOCK_GUARD(gSchedLock);
AU_DEBUG_MEMCRUNCH;
AuAtomicAdd(&pool->uAtomicCounter, 1u);
SchedNextTime(ns);
return AuTryInsert(gOrderedEntries,
AuConstReference(ns),
SchedEntry { ns, target, runnable, pool });
}
void TerminateSceduledTasks(IThreadPoolInternal *pool,
WorkerId_t target)
{
AU_LOCK_GUARD(gSchedLock);
for (auto itr = gOrderedEntries.begin(); itr != gOrderedEntries.end(); )
{
auto &[ns, entry] = *itr;
if ((entry.pool == pool) &&
((entry.target == target) ||
(target.second == Async::kThreadIdAny && target.first == entry.target.first)))
{
entry.runnable->CancelAsync();
itr = gOrderedEntries.erase(itr);
}
else
{
itr++;
}
}
}
}