From df301a4101c9e8856b16c0291a8c2c690788a14f Mon Sep 17 00:00:00 2001 From: Reece Date: Wed, 7 Jul 2021 21:32:59 +0100 Subject: [PATCH] Improve workitem api [+] New waitfor by work dispatcher type --- Include/Aurora/Async/Async.hpp | 25 ++++++++++++---------- Source/Async/AsyncApp.cpp | 38 +++++++++++++++++++++++++++++++++- Source/Async/AsyncApp.hpp | 8 +++++-- Source/Async/WorkItem.cpp | 20 +++++++++++------- Source/Async/WorkItem.hpp | 10 ++++----- 5 files changed, 75 insertions(+), 26 deletions(-) diff --git a/Include/Aurora/Async/Async.hpp b/Include/Aurora/Async/Async.hpp index bf5f7445..6656cf21 100644 --- a/Include/Aurora/Async/Async.hpp +++ b/Include/Aurora/Async/Async.hpp @@ -80,16 +80,16 @@ namespace Aurora::Async class IWorkItem { public: - virtual void WaitFor(const AuSPtr &workItem) = 0; - virtual void WaitFor(const AuList> &workItem) = 0; - virtual void SetSchedTime(AuUInt32 ms) = 0; - virtual void SetSchedTimeNs(AuUInt64 ns) = 0; + virtual AuSPtr WaitFor(const AuSPtr &workItem) = 0; + virtual AuSPtr WaitFor(const AuList> &workItem) = 0; + virtual AuSPtr SetSchedTime(AuUInt32 ms) = 0; + virtual AuSPtr SetSchedTimeNs(AuUInt64 ns) = 0; - virtual void Dispatch() = 0; + virtual AuSPtr Dispatch() = 0; - virtual bool BlockUntilComplete() = 0; - virtual bool HasFinished() = 0; - virtual bool HasFailed() = 0; + virtual bool BlockUntilComplete() = 0; + virtual bool HasFinished() = 0; + virtual bool HasFailed() = 0; }; AUKN_SYM AuSPtr NewWorkItem(const DispatchTarget_t &worker, const AuSPtr &task, bool supportsBlocking = false); @@ -285,7 +285,7 @@ namespace Aurora::Async } }; - template + template static std::function TranslateAsyncFunctionToDispatcher(std::function func) { auto cur = GetAsyncApp()->GetCurrentThread(); @@ -340,10 +340,13 @@ namespace Aurora::Async virtual bool Sync(ThreadGroup_t group, bool requireSignal = false, AuUInt32 timeout = 0) = 0; virtual void Signal(ThreadGroup_t group) = 0; - virtual bool WaitFor(WorkerId_t unlocker, Threading::IWaitable *primitive, int ms) = 0; // when unlocker = this, pump event loop - + virtual bool WaitFor(WorkerId_t unlocker, Threading::IWaitable *primitive, AuUInt32 ms) = 0; // when unlocker = this, pump event loop + virtual bool WaitFor(DispatchTarget_t unlocker, Threading::IWaitable *primitive, AuUInt32 ms) = 0; // when unlocker = this, pump event loop + + virtual bool SyncTimeout(ThreadGroup_t group, AuUInt32 ms) = 0; + virtual void SyncAllSafe() = 0; // Features diff --git a/Source/Async/AsyncApp.cpp b/Source/Async/AsyncApp.cpp index bfbc4a6d..55df2b17 100644 --- a/Source/Async/AsyncApp.cpp +++ b/Source/Async/AsyncApp.cpp @@ -261,12 +261,42 @@ namespace Aurora::Async } - bool AsyncApp::WaitFor(WorkerId_t worker, Aurora::Threading::IWaitable *primitive, int timeoutMs) + bool AsyncApp::WaitFor(WorkerId_t worker, Threading::IWaitable *primitive, AuUInt32 timeoutMs) { auto curThread = GetThreadState(); if (worker == curThread->id) { + // TODO: nest counter or jump out + while (!Threading::WaitFor(primitive, 2)) + { + while (this->Poll(false)); + } + + return true; + } + else + { + return Threading::WaitFor(primitive, timeoutMs); + } + } + + bool AsyncApp::WaitFor(DispatchTarget_t unlocker, Threading::IWaitable *primitive, AuUInt32 timeoutMs) + { + auto curThread = GetThreadState(); + + bool workerIdMatches = (!unlocker.second.has_value()) && (unlocker.second.value() == curThread->id.second); + + if ((unlocker.first == curThread->id.first) && // work group matches + ((GetThreadWorkersCount(unlocker.first) < 2) || // is there anyone besides us who might deal with this? unlikely fast path + (workerIdMatches))) // well, crap + { + if (workerIdMatches) + { + LogWarn("Nested Task: {}:{}", unlocker.first, unlocker.second); + SysPushErrorLogicError("Nested Task: {}:{}", unlocker.first, unlocker.second); + } + // TODO: timeout isn't respected here as well while (!Threading::WaitFor(primitive, 2)) { @@ -515,6 +545,12 @@ namespace Aurora::Async return *ret; } + size_t AsyncApp::GetThreadWorkersCount(ThreadGroup_t group) + { + Threading::LockGuardPtr lock(rwlock_->AsReadable()); + return GetGroup(group)->workers.size(); + } + AuSPtr AsyncApp::GetThreadState() { auto id = GetCurrentThread(); diff --git a/Source/Async/AsyncApp.hpp b/Source/Async/AsyncApp.hpp index 1b33460f..766fc21c 100644 --- a/Source/Async/AsyncApp.hpp +++ b/Source/Async/AsyncApp.hpp @@ -31,14 +31,15 @@ namespace Aurora::Async // Spawning bool Spawn(WorkerId_t) override; Threading::Threads::ThreadShared_t ResolveHandle(WorkerId_t) override; - AuBST> GetThreads() override; + AuBST> GetThreads() override; WorkerId_t GetCurrentThread() override; // Synchronization bool Sync(ThreadGroup_t group, bool requireSignal, AuUInt32 timeout) override; void Signal(ThreadGroup_t group) override; - bool WaitFor(WorkerId_t unlocker, Threading::IWaitable *primitive, int ms) override; // when unlocker = this, pump event loop + bool WaitFor(WorkerId_t unlocker, Threading::IWaitable *primitive, AuUInt32 ms) override; // when unlocker = this, pump event loop + bool WaitFor(DispatchTarget_t unlocker, Threading::IWaitable *primitive, AuUInt32 ms) override; // when unlocker = this, pump event loop bool SyncTimeout(ThreadGroup_t group, AuUInt32 ms) override; @@ -56,6 +57,9 @@ namespace Aurora::Async void ShutdownOutOfTasks(); bool Poll(bool block) override; + + size_t GetThreadWorkersCount(ThreadGroup_t group); + private: // TODO: BarrierMultiple diff --git a/Source/Async/WorkItem.cpp b/Source/Async/WorkItem.cpp index 1eb05319..ff02d29c 100644 --- a/Source/Async/WorkItem.cpp +++ b/Source/Async/WorkItem.cpp @@ -27,7 +27,7 @@ namespace Aurora::Async //Fail(); } - void WorkItem::WaitFor(const AuSPtr &workItem) + AuSPtr WorkItem::WaitFor(const AuSPtr &workItem) { bool status {}; @@ -50,9 +50,11 @@ namespace Aurora::Async { Fail(); } + + return AU_SHARED_FROM_THIS; } - void WorkItem::WaitFor(const AuList> &workItems) + AuSPtr WorkItem::WaitFor(const AuList> &workItems) { bool status {}; @@ -78,22 +80,27 @@ namespace Aurora::Async { Fail(); } + + return AU_SHARED_FROM_THIS; } - void WorkItem::SetSchedTimeNs(AuUInt64 ns) + AuSPtr WorkItem::SetSchedTimeNs(AuUInt64 ns) { dispatchTimeNs_ = Time::CurrentClockNS() + ns; + return AU_SHARED_FROM_THIS; } - void WorkItem::SetSchedTime(AuUInt32 ms) + AuSPtr WorkItem::SetSchedTime(AuUInt32 ms) { dispatchTimeNs_ = Time::CurrentClockNS() + (AuUInt64(ms) * AuUInt64(1000000)); + return AU_SHARED_FROM_THIS; } - void WorkItem::Dispatch() + AuSPtr WorkItem::Dispatch() { DispatchEx(false); + return AU_SHARED_FROM_THIS; } void WorkItem::DispatchEx(bool check) @@ -227,8 +234,7 @@ namespace Aurora::Async bool WorkItem::BlockUntilComplete() { if (!finishedEvent_) return false; - finishedEvent_->Lock(); - return true; + return Async::GetAsyncApp()->WaitFor(this->worker_, finishedEvent_.get(), 0); } bool WorkItem::HasFinished() diff --git a/Source/Async/WorkItem.hpp b/Source/Async/WorkItem.hpp index 1b6ae562..c57ca952 100644 --- a/Source/Async/WorkItem.hpp +++ b/Source/Async/WorkItem.hpp @@ -15,12 +15,12 @@ namespace Aurora::Async WorkItem(const DispatchTarget_t &worker_, const AuSPtr &task_, bool supportsBlocking); ~WorkItem(); - void WaitFor(const AuSPtr &workItem) override; - void WaitFor(const AuList> &workItem) override; - void SetSchedTime(AuUInt32 ms) override; - void SetSchedTimeNs(AuUInt64 ns) override; + AuSPtr WaitFor(const AuSPtr &workItem) override; + AuSPtr WaitFor(const AuList> &workItem) override; + AuSPtr SetSchedTime(AuUInt32 ms) override; + AuSPtr SetSchedTimeNs(AuUInt64 ns) override; - void Dispatch() override; + AuSPtr Dispatch() override; bool BlockUntilComplete() override; bool HasFinished() override;