285 lines
7.7 KiB
C++
285 lines
7.7 KiB
C++
/***
    Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved.

    File: Schedular.cpp
    Date: 2021-6-26
    Author: Reece
***/
|
|
#include <Source/RuntimeInternal.hpp>
|
|
#include "Async.hpp"
|
|
#include "Schedular.hpp"
|
|
//#include "AsyncApp.hpp"
|
|
#include "ThreadPool.hpp"
|
|
#include <Console/Commands/Commands.hpp>
|
|
|
|
namespace Aurora::Async
|
|
{
|
|
struct SchedEntry
|
|
{
|
|
AuUInt64 ns;
|
|
WorkerId_t target;
|
|
AuSPtr<IAsyncRunnable> runnable;
|
|
IThreadPoolInternal *pool;
|
|
};
|
|
|
|
// sched thread threading:
|
|
static AuThreads::ThreadUnique_t gThread;
|
|
static AuThreadPrimitives::ConditionMutex gSchedLock;
|
|
static AuThreadPrimitives::ConditionVariable gSchedCondvar(AuUnsafeRaiiToShared(gSchedLock.AsPointer()));
|
|
|
|
// next tick timing:
|
|
static AuUInt64 uNextSysTickGuessed {};
|
|
static AuUInt64 uNextWakeuptimeRateLimit {};
|
|
static AuBST<AuUInt64, SchedEntry> gOrderedEntries;
|
|
// prevents stupid tick issues
|
|
static bool gLockedPump = false;
|
|
|
|
// old sewage to be cleaned up
|
|
static bool gBOriginal;
|
|
static AuOptionalEx<AuWorkerPId_t> gMainThread;
|
|
void StartSched2();
|
|
static void SchedNextTime(AuUInt64 uNSAbs);
|
|
|
|
/**
 * Moves every queued entry whose deadline has passed into `pending`.
 *
 * Entries are visited in queue order; the scan stops at the first entry that
 * is not yet due. If an entry cannot be appended to `pending`, it stays in
 * the queue, a wake-up 3ms from now is requested, and the scan stops.
 *
 * Touches gOrderedEntries without locking; the visible caller (SchedThread)
 * holds gSchedLock.
 *
 * @param pending receives the due entries
 */
static void GetDispatchableTasks(AuList<SchedEntry> &pending)
{
    auto uNow = Time::SteadyClockNS();
    auto itr = gOrderedEntries.begin();

    while (itr != gOrderedEntries.end())
    {
        auto &[uDeadlineNs, dueEntry] = *itr;

        // First entry that is still in the future: nothing further is due.
        if (uDeadlineNs > uNow)
        {
            return;
        }

        if (!AuTryInsert(pending, AuMove(dueEntry)))
        {
            // Could not hand the entry over; try again shortly.
            SchedNextTime(uNow + AuMSToNS<AuUInt64>(3));
            return;
        }

        itr = gOrderedEntries.erase(itr);
    }
}
|
|
|
|
static void PumpSysThread()
|
|
{
|
|
try
|
|
{
|
|
RuntimeSysPump();
|
|
}
|
|
catch (...)
|
|
{
|
|
SysPushErrorCatch("SysPump failed");
|
|
}
|
|
|
|
gLockedPump = false;
|
|
}
|
|
|
|
/**
 * Requests that the scheduler thread be awake by `uNSAbs`.
 *
 * The armed deadline (uNextSysTickGuessed) is only replaced when nothing is
 * armed or when the armed deadline is later than the request. Requests that
 * fall before uNextWakeuptimeRateLimit are pushed back to that limit first.
 *
 * The scheduler thread is signalled when
 *  - no deadline was armed (it may be parked without a useful timeout), or
 *  - the armed deadline was moved earlier (it may be sleeping on the old,
 *    longer timeout and would otherwise run the new task late).
 *
 * @param uNSAbs absolute deadline in nanoseconds, on the clock SchedThread
 *               compares against (AuTime::SteadyClockNS)
 */
static void SchedNextTime(AuUInt64 uNSAbs)
{
    bool bForceWakeup {};

    // Fast path: an equal or earlier deadline is already armed.
    if (uNextSysTickGuessed &&
        uNextSysTickGuessed <= uNSAbs)
    {
        return;
    }

    while (true)
    {
        auto uCurrent = uNextSysTickGuessed;

        // Someone else armed an equal or earlier deadline in the meantime.
        if (uCurrent &&
            uCurrent <= uNSAbs)
        {
            break;
        }

        // Never ask for a wake-up before the rate limit allows one.
        if (uNextWakeuptimeRateLimit > uNSAbs)
        {
            uNSAbs = uNextWakeuptimeRateLimit;
        }

        if (!uCurrent)
        {
            bForceWakeup = true;
        }

        if (AuAtomicCompareExchange(&uNextSysTickGuessed, uNSAbs, uCurrent) == uCurrent)
        {
            // The sleeper computed its timeout from uCurrent; if we installed
            // something earlier it has to be woken to pick up the new value.
            if (uNSAbs < uCurrent)
            {
                bForceWakeup = true;
            }
            break;
        }
    }

    if (bForceWakeup)
    {
        gSchedCondvar->Signal();
    }
}
|
|
|
|
static void SchedThread()
|
|
{
|
|
Aurora::Utility::RateLimiter limiter(AuMSToNS<AuUInt64>(gRuntimeConfig.async.dwLegacyMainThreadSystemTickMS));
|
|
|
|
// Intentional: no Dec used on this thread; we should never run out of memory to schedule tasks
|
|
AuDebug::AddMemoryCrunch();
|
|
|
|
while (AuIsThreadRunning())
|
|
{
|
|
AuList<SchedEntry> pending;
|
|
|
|
{
|
|
AU_LOCK_GUARD(gSchedLock);
|
|
|
|
auto uNextTick = uNextSysTickGuessed;
|
|
auto uNow = AuTime::SteadyClockNS();
|
|
|
|
if (uNow < uNextTick)
|
|
{
|
|
gSchedCondvar->WaitForSignalNS(uNextTick - uNow);
|
|
}
|
|
else if (uNow == uNextTick)
|
|
{
|
|
uNextSysTickGuessed = 0;
|
|
}
|
|
else
|
|
{
|
|
uNextSysTickGuessed = 0;
|
|
gSchedCondvar->WaitForSignalNS(gRuntimeConfig.async.bEnableLegacyTicks ?
|
|
AuMSToNS<AuUInt64>(gRuntimeConfig.async.dwLegacyMainThreadSystemTickMS) :
|
|
0);
|
|
}
|
|
|
|
GetDispatchableTasks(pending);
|
|
|
|
uNextWakeuptimeRateLimit = AuTime::SteadyClockNS() + AuMSToNS<AuUInt64>(gRuntimeConfig.async.dwSchedulerRateLimitMS);
|
|
}
|
|
|
|
for (auto &entry : pending)
|
|
{
|
|
try
|
|
{
|
|
AuStaticCast<AuAsync::ThreadPool>(entry.pool)->Run(entry.target, entry.runnable, false);
|
|
}
|
|
catch (...)
|
|
{
|
|
Debug::PrintError();
|
|
}
|
|
}
|
|
|
|
if (gRuntimeConfig.async.bEnableLegacyTicks)
|
|
{
|
|
if (limiter.CheckExchangePass())
|
|
{
|
|
if (!AuExchange(gLockedPump, true))
|
|
{
|
|
DispatchOn(gMainThread.value(), PumpSysThread);
|
|
}
|
|
|
|
SchedNextTime(AuMSToNS<AuUInt64>(gRuntimeConfig.async.dwLegacyMainThreadSystemTickMS) + AuTime::SteadyClockNS());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
 * Scheduler initialisation hook.
 *
 * Intentionally does nothing: the scheduler thread is created by StartSched2().
 */
void InitSched()
{
    // no up-front state to prepare
}
|
|
|
|
void DeinitSched()
|
|
{
|
|
if (gThread)
|
|
{
|
|
gThread->SendExitSignal();
|
|
}
|
|
gSchedCondvar->Broadcast();
|
|
gThread.reset();
|
|
}
|
|
|
|
/**
 * Selects the worker that receives the legacy system pump.
 *
 * With a pool set in `pid`: stores it as the main thread, informs the console
 * command dispatcher, enables legacy ticks and makes sure the scheduler
 * thread is running.
 *
 * Without a pool: restores bEnableLegacyTicks from gBOriginal, clears the
 * stored main thread and informs the console command dispatcher.
 *
 * NOTE(review): gBOriginal is captured inside StartSched2() when the thread is
 * first created. If that first start comes from this function, the flag has
 * already been forced to true by then - confirm that is intended.
 *
 * @param pid worker to pump on; an empty `pid.pool` detaches the current one
 */
AUKN_SYM void SetMainThreadForSysPumpScheduling(AuWorkerPId_t pid)
{
    if (pid.pool)
    {
        // attach
        gMainThread = pid;
        Console::Commands::UpdateDispatcher(gMainThread);
        gRuntimeConfig.async.bEnableLegacyTicks = true;

        StartSched2();
        return;
    }

    // detach
    gRuntimeConfig.async.bEnableLegacyTicks = gBOriginal;
    AuResetMember(gMainThread);
    Console::Commands::UpdateDispatcher(gMainThread);
}
|
|
|
|
void StartSched()
|
|
{
|
|
if (!gRuntimeConfig.async.bStartSchedularOnStartup)
|
|
{
|
|
return;
|
|
}
|
|
else
|
|
{
|
|
if (gRuntimeConfig.async.dwLegacyMainThreadSystemTickMS && !gMainThread)
|
|
{
|
|
gMainThread = AuWorkerPId_t(AuAsync::GetSharedAsyncApp(), AuWorkerId_t {0, 0});
|
|
}
|
|
}
|
|
|
|
StartSched2();
|
|
}
|
|
|
|
void StartSched2()
|
|
{
|
|
AU_LOCK_GUARD(gSchedLock);
|
|
|
|
if (gThread) return;
|
|
|
|
gBOriginal = gRuntimeConfig.async.bEnableLegacyTicks;
|
|
|
|
gThread = AuThreads::ThreadUnique(AuThreads::ThreadInfo(
|
|
AuMakeShared<AuThreads::IThreadVectorsFunctional>(AuThreads::IThreadVectorsFunctional::OnEntry_t(std::bind(SchedThread)),
|
|
AuThreads::IThreadVectorsFunctional::OnExit_t {})
|
|
));
|
|
|
|
gThread->Run();
|
|
}
|
|
|
|
/**
 * Queues `runnable` to be handed to `target` of `pool` once the steady clock
 * reaches `ns`.
 *
 * Under gSchedLock: increments pool->uAtomicCounter, asks the scheduler
 * thread to be awake by `ns`, then inserts the entry keyed by `ns`.
 *
 * NOTE(review): the counter is incremented before the insertion and is not
 * rolled back here when the insertion fails - confirm callers account for it.
 *
 * @param ns       absolute deadline in nanoseconds
 * @param pool     pool that will run the task; dereferenced unconditionally
 * @param target   worker within the pool
 * @param runnable work item
 * @return the result of the insertion into the pending queue
 */
bool Schedule(AuUInt64 ns, IThreadPoolInternal *pool,
              WorkerId_t target,
              AuSPtr<IAsyncRunnable> runnable)
{
    AU_LOCK_GUARD(gSchedLock);
    AU_DEBUG_MEMCRUNCH;

    AuAtomicAdd(&pool->uAtomicCounter, 1u);

    SchedNextTime(ns);

    SchedEntry queued { ns, target, runnable, pool };

    return AuTryInsert(gOrderedEntries,
                       AuConstReference(ns),
                       AuMove(queued));
}
|
|
|
|
/**
 * Cancels and removes queued entries that belong to `pool` and match `target`.
 *
 * An entry matches when its pool is `pool` and either its target equals
 * `target`, or `target.second` is Async::kThreadIdAny and the `first`
 * components agree. Each matching runnable gets CancelAsync() before its
 * entry is erased. Runs under gSchedLock.
 *
 * @param pool   pool whose entries are to be dropped
 * @param target worker to match; kThreadIdAny in `second` matches every
 *               worker sharing the same `first`
 */
void TerminateSceduledTasks(IThreadPoolInternal *pool,
                            WorkerId_t target)
{
    AU_LOCK_GUARD(gSchedLock);

    auto itr = gOrderedEntries.begin();

    while (itr != gOrderedEntries.end())
    {
        auto &[uDeadlineNs, queued] = *itr;

        bool bMatches = false;

        if (queued.pool == pool)
        {
            bMatches = (queued.target == target) ||
                       (target.second == Async::kThreadIdAny && target.first == queued.target.first);
        }

        if (!bMatches)
        {
            ++itr;
            continue;
        }

        queued.runnable->CancelAsync();
        itr = gOrderedEntries.erase(itr);
    }
}
|
|
} |