From c8c39080855ab7d1b537e74a2d55c53dd651a67f Mon Sep 17 00:00:00 2001 From: Reece Date: Wed, 30 Jun 2021 13:00:32 +0100 Subject: [PATCH] [+] Now with a scheduler! --- Include/Aurora/Async/Async.hpp | 6 +-- Source/Async/Async.cpp | 6 +++ Source/Async/Async.hpp | 1 + Source/Async/AsyncApp.cpp | 15 ++++++- Source/Async/AsyncApp.hpp | 7 ++- Source/Async/Schedular.cpp | 79 +++++++++++++++++++++++++++++++++- Source/Async/Schedular.hpp | 6 ++- Source/Async/WorkItem.cpp | 18 ++++++-- Source/Async/WorkItem.hpp | 3 +- Source/Console/Hooks/Hooks.cpp | 11 +---- Source/Entrypoint.cpp | 1 + Source/IO/FS/Resources.cpp | 13 +++++- 12 files changed, 141 insertions(+), 25 deletions(-) diff --git a/Include/Aurora/Async/Async.hpp b/Include/Aurora/Async/Async.hpp index dc37a588..07a2b7a9 100644 --- a/Include/Aurora/Async/Async.hpp +++ b/Include/Aurora/Async/Async.hpp @@ -43,7 +43,7 @@ namespace Aurora::Async EProcessNext type; AuList> waitFor; AuUInt32 reschedMs; - //AuUInt64 reschedNs; + AuUInt64 reschedNs; }; virtual void DispatchFrame(ProcessInfo &info) = 0; @@ -77,13 +77,13 @@ namespace Aurora::Async std::optional(* onFrame)(const Info_t &); }; - class IWorkItem { public: virtual void WaitFor(const AuSPtr &workItem) = 0; virtual void WaitFor(const AuList> &workItem) = 0; virtual void SetSchedTime(AuUInt32 ms) = 0; + virtual void SetSchedTimeNs(AuUInt64 ns) = 0; virtual void Dispatch() = 0; @@ -91,7 +91,6 @@ namespace Aurora::Async virtual bool HasFinished() = 0; virtual bool HasFailed() = 0; }; - AUKN_SYM AuSPtr NewWorkItem(const DispatchTarget_t &worker, const AuSPtr &task, bool supportsBlocking = false); @@ -324,5 +323,4 @@ namespace Aurora::Async virtual void AssertInThreadGroup(ThreadGroup_t thread) = 0; virtual void AssertWorker(WorkerId_t id) = 0; }; - } \ No newline at end of file diff --git a/Source/Async/Async.cpp b/Source/Async/Async.cpp index 9ccd35a7..002077c8 100644 --- a/Source/Async/Async.cpp +++ b/Source/Async/Async.cpp @@ -7,11 +7,17 @@ ***/ #include #include "Async.hpp" +#include "Schedular.hpp" namespace Aurora::Async { void InitAsync() { + InitSched(); + } + void ShutdownSync() + { + ShutdownSched(); } } \ No newline at end of file diff --git a/Source/Async/Async.hpp b/Source/Async/Async.hpp index 4f002994..0d4fbf9c 100644 --- a/Source/Async/Async.hpp +++ b/Source/Async/Async.hpp @@ -43,4 +43,5 @@ namespace Aurora::Async }; void InitAsync(); + void ShutdownSync(); } \ No newline at end of file diff --git a/Source/Async/AsyncApp.cpp b/Source/Async/AsyncApp.cpp index ca9292c9..bfbc4a6d 100644 --- a/Source/Async/AsyncApp.cpp +++ b/Source/Async/AsyncApp.cpp @@ -14,6 +14,19 @@ namespace Aurora::Async { static AsyncApp gAsyncApp; static std::atomic_int gRunningTasks {}; + + void IncRunningTasks() + { + gRunningTasks++; + } + + void DecRunningTasks() + { + if ((--gRunningTasks) == 0) + { + gAsyncApp.ShutdownOutOfTasks(); + } + } //STATIC_TLS(WorkerId_t, tlsWorkerId); static Threading::Threads::TLSVariable tlsWorkerId; @@ -239,7 +252,7 @@ namespace Aurora::Async runningTasks = --gRunningTasks; } - if (runningTasks) + if (runningTasks == 0) { ShutdownOutOfTasks(); } diff --git a/Source/Async/AsyncApp.hpp b/Source/Async/AsyncApp.hpp index 689ca89a..2054f333 100644 --- a/Source/Async/AsyncApp.hpp +++ b/Source/Async/AsyncApp.hpp @@ -13,6 +13,10 @@ namespace Aurora::Async struct ThreadState; //class WorkItem; + + void DecRunningTasks(); + void IncRunningTasks(); + class AsyncApp : public IAsyncApp { public: @@ -49,9 +53,10 @@ namespace Aurora::Async void Run(DispatchTarget_t target, AuSPtr runnable); + void ShutdownOutOfTasks(); + private: - void ShutdownOutOfTasks(); // TODO: BarrierMultiple bool Barrier(WorkerId_t, AuUInt32 ms, bool requireSignal, bool drop); bool Poll(bool a); diff --git a/Source/Async/Schedular.cpp b/Source/Async/Schedular.cpp index 375e551b..ac27687d 100644 --- a/Source/Async/Schedular.cpp +++ b/Source/Async/Schedular.cpp @@ -6,9 +6,86 @@ Author: Reece ***/ #include +#include "Async.hpp" #include "Schedular.hpp" +#include "AsyncApp.hpp" namespace Aurora::Async { - + struct SchedEntry + { + AuUInt64 ns; + DispatchTarget_t target; + AuSPtr runnable; + }; + + static Threading::Threads::ThreadUnique_t gThread; + static Threading::Primitives::RWLockUnique_t gSchedLock; + static AuList gEntries; + + static void GetDispatchableTasks(AuList &pending) + { + Threading::LockGuardPtr lock(gSchedLock->AsReadable()); + + auto time = Time::CurrentClockNS(); + + for (auto itr = gEntries.begin(); itr != gEntries.end(); ) + { + if (itr->ns <= time) + { + pending.push_back(std::move(*itr)); + itr = gEntries.erase(itr); + } + else + { + itr ++; + } + } + } + + static void SchedThread() + { + auto thread = Threading::Threads::GetThread(); + AuList pending; + + while (!thread->Exiting()) + { + Threading::SleepNs(1000000 / 2); + + AuList pending; + GetDispatchableTasks(pending); + + for (auto &entry : pending) + { + static_cast(GetAsyncApp())->Run(entry.target, entry.runnable); + DecRunningTasks(); + } + } + } + + void InitSched() + { + gSchedLock = Threading::Primitives::RWLockUnique(); + + Threading::Threads::AbstractThreadVectors handler; + handler.DoRun = [=](const Threading::Threads::IAuroraThread *thread) + { + SchedThread(); + }; + gThread = Threading::Threads::ThreadUnique(handler); + gThread->Run(); + } + + void ShutdownSched() + { + gThread.reset(); + gSchedLock.reset(); + } + + void Schedule(AuUInt64 ns, DispatchTarget_t target, AuSPtr runnable) + { + Threading::LockGuardPtr lock(gSchedLock->AsWritable()); + IncRunningTasks(); + gEntries.push_back({ns, target, runnable}); + } } \ No newline at end of file diff --git a/Source/Async/Schedular.hpp b/Source/Async/Schedular.hpp index 6eda6788..1e362a23 100644 --- a/Source/Async/Schedular.hpp +++ b/Source/Async/Schedular.hpp @@ -9,5 +9,9 @@ namespace Aurora::Async { - + void InitSched(); + void ShutdownSched(); + + void Schedule(AuUInt64 ns, DispatchTarget_t target, AuSPtr runnable); + } \ No newline at end of file diff --git a/Source/Async/WorkItem.cpp b/Source/Async/WorkItem.cpp index 9ccb38b5..1eb05319 100644 --- a/Source/Async/WorkItem.cpp +++ b/Source/Async/WorkItem.cpp @@ -9,6 +9,7 @@ #include "Async.hpp" #include "WorkItem.hpp" #include "AsyncApp.hpp" +#include "Schedular.hpp" namespace Aurora::Async { @@ -79,10 +80,15 @@ namespace Aurora::Async } } + + void WorkItem::SetSchedTimeNs(AuUInt64 ns) + { + dispatchTimeNs_ = Time::CurrentClockNS() + ns; + } + void WorkItem::SetSchedTime(AuUInt32 ms) { - Threading::LockGuard l(lock); - dispatchTime_ = Time::CurrentClockMS() + ms; + dispatchTimeNs_ = Time::CurrentClockNS() + (AuUInt64(ms) * AuUInt64(1000000)); } void WorkItem::Dispatch() @@ -123,7 +129,7 @@ namespace Aurora::Async itr = waitOn_.erase(itr); } - if (Time::CurrentClockMS() < dispatchTime_) + if (Time::CurrentClockNS() < dispatchTimeNs_ ) { Schedule(); return; @@ -163,6 +169,10 @@ namespace Aurora::Async { SetSchedTime(info.reschedMs); } + if (info.reschedNs) + { + SetSchedTimeNs(info.reschedNs); + } WaitFor(info.waitFor); } @@ -233,7 +243,7 @@ namespace Aurora::Async void WorkItem::Schedule() { - // TODO: + Aurora::Async::Schedule(dispatchTimeNs_, worker_, this->shared_from_this()); } void WorkItem::SendOff() diff --git a/Source/Async/WorkItem.hpp b/Source/Async/WorkItem.hpp index df2ec7db..1b6ae562 100644 --- a/Source/Async/WorkItem.hpp +++ b/Source/Async/WorkItem.hpp @@ -18,6 +18,7 @@ namespace Aurora::Async void WaitFor(const AuSPtr &workItem) override; void WaitFor(const AuList> &workItem) override; void SetSchedTime(AuUInt32 ms) override; + void SetSchedTimeNs(AuUInt64 ns) override; void Dispatch() override; @@ -40,7 +41,7 @@ namespace Aurora::Async bool finished {}; bool failed {}; bool dispatchPending_ {}; - AuUInt32 dispatchTime_ {}; + AuUInt64 dispatchTimeNs_ {}; void Fail(); void Schedule(); diff --git a/Source/Console/Hooks/Hooks.cpp b/Source/Console/Hooks/Hooks.cpp index f0f35511..ceac904b 100644 --- a/Source/Console/Hooks/Hooks.cpp +++ b/Source/Console/Hooks/Hooks.cpp @@ -21,10 +21,6 @@ namespace Aurora::Console::Hooks void WriteLine(const ConsoleMessage &msg) { - // Note: I believe waiting for the second pump to deposit each line - // would be a bad idea for warnings/debug logs. - // console messages must be written to the handler as soon as - // they are produced to alleviate loss of buffered data on termination. gMutex->Lock(); auto callbacks = gLineCallbacks; gMutex->Unlock(); @@ -36,12 +32,7 @@ namespace Aurora::Console::Hooks callback(msg); } } - else [[unlikely]] // Note: yes, this is a write line function. - // yes, .find('\n') is rather complex, but i dont care. - // logging FIO has and always will be the bottleneck. - // given the assumption this is slow, we may as well - // unfuck writelines with \n's. fmt's, unwinded stacks, - // etc all benefit from this + else [[unlikely]] { Aurora::Parse::SplitNewlines(msg.line, [&](const AuString &line) diff --git a/Source/Entrypoint.cpp b/Source/Entrypoint.cpp index 3c687157..77133f3f 100644 --- a/Source/Entrypoint.cpp +++ b/Source/Entrypoint.cpp @@ -42,6 +42,7 @@ static void Deinit() { Aurora::RNG::Release(); Aurora::Console::Exit(); + Aurora::Async::ShutdownSync(); } namespace Aurora diff --git a/Source/IO/FS/Resources.cpp b/Source/IO/FS/Resources.cpp index 0f0a6313..a53cf1e5 100644 --- a/Source/IO/FS/Resources.cpp +++ b/Source/IO/FS/Resources.cpp @@ -69,9 +69,18 @@ namespace Aurora::IO::FS } } } - + { - auto systemPath = gApplicationData + AuString(1, kPathSplitter) + fileName; + auto systemPath = gHomeDirectory + fileName; + if (FileExists(systemPath)) + { + path = systemPath; + return true; + } + } + + { + auto systemPath = gApplicationData + fileName; if (FileExists(systemPath)) { path = systemPath;