[*] dd130980 cont: improved AuAsync scheduling

This commit is contained in:
Reece Wilson 2023-08-10 01:10:59 +01:00
parent e68dc02e7e
commit 7865749348

View File

@ -22,41 +22,47 @@ namespace Aurora::Async
IThreadPoolInternal *pool;
};
// sched thread threading:
static AuThreads::ThreadUnique_t gThread;
static AuThreadPrimitives::ConditionMutex gSchedLock;
static AuThreadPrimitives::ConditionVariable gSchedCondvar(AuUnsafeRaiiToShared(gSchedLock.AsPointer()));
static AuWorkerPId_t gMainThread;
static bool gBOriginal;
// next tick timing:
static AuUInt64 uNextSysTickGuessed {};
static AuUInt64 uNextWakeuptimeRateLimit {};
static AuList<SchedEntry> gEntries;
static AuBST<AuUInt64, SchedEntry> gOrderedEntries;
// prevents stupid tick issues
static bool gLockedPump = false;
// old sewage to be cleaned up
static bool gBOriginal;
static AuWorkerPId_t gMainThread;
void StartSched2();
static void SchedNextTime(AuUInt64 uNSAbs);
static void GetDispatchableTasks(AuList<SchedEntry> &pending)
{
auto time = Time::SteadyClockNS();
for (auto itr = gEntries.begin(); itr != gEntries.end(); )
for (auto itr = gOrderedEntries.begin(); itr != gOrderedEntries.end(); )
{
if (itr->ns <= time)
{
if (!AuTryInsert(pending, *itr))
{
break;
}
auto &[ns, entry] = *itr;
itr = gEntries.erase(itr);
}
else
if (ns > time)
{
itr ++;
break;
}
if (!AuTryInsert(pending, AuMove(entry)))
{
SchedNextTime(time + AuMSToNS<AuUInt64>(3));
break;
}
itr = gOrderedEntries.erase(itr);
}
}
static bool gLockedPump = false;
static void PumpSysThread()
{
try
@ -67,6 +73,7 @@ namespace Aurora::Async
{
SysPushErrorCatch("SysPump failed");
}
gLockedPump = false;
}
@ -114,6 +121,9 @@ namespace Aurora::Async
{
Aurora::Utility::RateLimiter limiter(AuMSToNS<AuUInt64>(gRuntimeConfig.async.dwLegacyMainThreadSystemTickMS));
// Intentional: no Dec used on this thread; we should never run out of memory to schedule tasks
AuDebug::AddMemoryCrunch();
while (AuIsThreadRunning())
{
AuList<SchedEntry> pending;
@ -128,6 +138,10 @@ namespace Aurora::Async
{
gSchedCondvar->WaitForSignalNS(uNextTick - uNow);
}
else if (uNow == uNextTick)
{
uNextSysTickGuessed = 0;
}
else
{
uNextSysTickGuessed = 0;
@ -147,11 +161,6 @@ namespace Aurora::Async
}
catch (...)
{
if (entry.pool->ToThreadPool()->InRunnerMode())
{
AuLogWarn("Dropped scheduled task! Expect a leaky counter!");
AuLogWarn("Would you rather `Why u no exit?!` or a spurious crash in production?");
}
Debug::PrintError();
}
}
@ -160,19 +169,11 @@ namespace Aurora::Async
{
if (limiter.CheckExchangePass())
{
try
if (!AuExchange(gLockedPump, true))
{
if (!AuExchange(gLockedPump, true))
{
NewWorkItem(gMainThread, AuMakeShared<BasicWorkStdFunc>(PumpSysThread))->Dispatch();
}
NewWorkItem(gMainThread, AuMakeShared<BasicWorkStdFunc>(PumpSysThread))->Dispatch();
}
catch (...)
{
AuLogWarn("Dropped SysRuntimePump");
Debug::PrintError();
}
SchedNextTime(AuMSToNS<AuUInt64>(gRuntimeConfig.async.dwLegacyMainThreadSystemTickMS) + AuTime::SteadyClockNS());
}
}
@ -186,6 +187,8 @@ namespace Aurora::Async
void DeinitSched()
{
gThread->SendExitSignal();
gSchedCondvar->Broadcast();
gThread.reset();
}
@ -242,40 +245,36 @@ namespace Aurora::Async
void StopSched()
{
gThread.reset();
DeinitSched();
}
bool Schedule(AuUInt64 ns, IThreadPoolInternal *pool, WorkerId_t target, AuSPtr<IAsyncRunnable> runnable)
bool Schedule(AuUInt64 ns, IThreadPoolInternal *pool,
WorkerId_t target,
AuSPtr<IAsyncRunnable> runnable)
{
// TODO (Reece): urgent
if (!gSchedLock)
{
return false;
}
AU_LOCK_GUARD(gSchedLock);
AuAtomicAdd(&pool->uAtomicCounter, 1u);
SchedNextTime(ns);
return AuTryInsert(gEntries, SchedEntry {ns, target, runnable, pool});
return AuTryInsert(gOrderedEntries,
AuConstReference(ns),
SchedEntry { ns, target, runnable, pool });
}
void TerminateSceduledTasks(IThreadPoolInternal *pool, WorkerId_t target)
void TerminateSceduledTasks(IThreadPoolInternal *pool,
WorkerId_t target)
{
if (!gSchedLock)
{
SysAssert(gEntries.empty());
return;
}
AU_LOCK_GUARD(gSchedLock);
for (auto itr = gEntries.begin(); itr != gEntries.end(); )
for (auto itr = gOrderedEntries.begin(); itr != gOrderedEntries.end(); )
{
if ((itr->pool == pool) &&
((itr->target == target) || (target.second == Async::kThreadIdAny && target.first == itr->target.first)))
auto &[ns, entry] = *itr;
if ((entry.pool == pool) &&
((entry.target == target) ||
(target.second == Async::kThreadIdAny && target.first == entry.target.first)))
{
itr->runnable->CancelAsync();
itr = gEntries.erase(itr);
entry.runnable->CancelAsync();
itr = gOrderedEntries.erase(itr);
}
else
{