From 452b49b610638c0aa6c98a73a68e277732bd44b9 Mon Sep 17 00:00:00 2001 From: Reece Date: Wed, 30 Jun 2021 10:28:52 +0100 Subject: [PATCH] [+] Begin work on the Async subsystem --- Include/Aurora/Async/Async.hpp | 76 +-- Include/Aurora/Threading/LockGuardPtr.hpp | 20 +- .../Threading/Threads/TLSStaticVariable.hpp | 7 +- .../Aurora/Threading/Threads/TLSVariable.hpp | 49 +- Include/AuroraMacros.hpp | 4 +- Include/AuroraUtils.hpp | 18 +- Source/Async/Async.hpp | 33 ++ Source/Async/AsyncApp.cpp | 553 ++++++++++++++++++ Source/Async/AsyncApp.hpp | 61 +- Source/Async/WorkItem.cpp | 20 +- Source/Async/WorkItem.hpp | 9 +- Source/Debug/Debug.cpp | 4 + Source/Debug/ExceptionWatcher.Win32.cpp | 2 +- Source/Processes/Process.Win32.cpp | 36 +- Source/Threading/Primitives/Event.cpp | 6 +- Source/Threading/Primitives/RWLock.cpp | 17 +- 16 files changed, 807 insertions(+), 108 deletions(-) diff --git a/Include/Aurora/Async/Async.hpp b/Include/Aurora/Async/Async.hpp index 3c8938a6..49f9a5b4 100644 --- a/Include/Aurora/Async/Async.hpp +++ b/Include/Aurora/Async/Async.hpp @@ -154,8 +154,8 @@ namespace Aurora::Async private: - static constexpr bool IsCallbackPtr = std::is_pointer_v || is_base_of_template; - static constexpr bool IsTaskPtr = std::is_pointer_v || is_base_of_template; + static constexpr bool IsCallbackPtr = std::is_pointer_v || is_base_of_template::value; + static constexpr bool IsTaskPtr = std::is_pointer_v || is_base_of_template::value; WorkerId_t caller; @@ -172,9 +172,9 @@ namespace Aurora::Async ret = task.onFrame(input); } - auto pin = this->shared_from_this(); + auto pin = std::static_pointer_cast>(this->shared_from_this()); - std::function func = [ret, &callback, &input, pin]() + std::function func = [ret, pin]() { try { @@ -182,23 +182,16 @@ namespace Aurora::Async { if constexpr (IsCallbackPtr) { - callback->onSuccess(input, *ret); + pin->callback->onSuccess(pin->input, *ret); } else { - callback.onSuccess(input, *ret); + pin->callback.onSuccess(pin->input, *ret); } } else { - if constexpr (IsCallbackPtr) - { - callback->onFailure(input, false); - } - else - { - callback.onFailure(input, false); - } + pin->CallOnFailure(false); } } catch (...) @@ -215,21 +208,13 @@ namespace Aurora::Async } else { - std::function err = [&callback, &input, pin]() + std::function err = [pin]() { - // TODO: we shold probably pass: is caller thread: false, has finished: true - if constexpr (IsCallbackPtr) - { - callback->onFailure(input, false); - } - else - { - callback.onFailure(input, false); - } + pin->CallOnFailure(false); }; // TODO: this is somewhat evil. double alloc when we could reuse this - WorkItemShared(caller, std::make_shared(func, err))->Dispatch(); + NewWorkItem(caller, std::make_shared(func, err))->Dispatch(); } } catch (...) @@ -243,20 +228,40 @@ namespace Aurora::Async { try { - if constexpr (IsCallbackPtr) - { - callback->onFailure(true); - } - else - { - callback.onFailure(true); - } + CallOnFailure(true); } catch (...) { Debug::PrintError(); } } + + void CallOnFailure(bool fail) + { + if constexpr (IsCallbackPtr) + { + if constexpr (is_base_of_templateonFailure)>::value) + { + if (!callback->onFailure) + { + return; + } + } + + callback->onFailure(input, fail); + } + else + { + if constexpr (is_base_of_template::value) + { + if (!callback.onFailure) + { + return; + } + } + callback.onFailure(input, fail); + } + } }; template, typename Cleanup_t = std::function> @@ -288,6 +293,7 @@ namespace Aurora::Async { public: // Main thread logic + virtual void Start() = 0; virtual void Main() = 0; virtual void Shutdown() = 0; virtual bool Exiting() = 0; @@ -295,14 +301,14 @@ namespace Aurora::Async // Spawning virtual bool Spawn(WorkerId_t) = 0; virtual Threading::Threads::ThreadShared_t ResolveHandle(WorkerId_t) = 0; - virtual AuBST> GetThreads() = 0; + virtual AuBST> GetThreads() = 0; virtual WorkerId_t GetCurrentThread() = 0; // Synchronization virtual bool Sync(ThreadGroup_t group, bool requireSignal = false, AuUInt32 timeout = 0) = 0; virtual void Signal(ThreadGroup_t group) = 0; - virtual bool WaitFor(DispatchTarget_t unlocker, Threading::IWaitable *primitive, int ms) = 0; // when unlocker = this, pump event loop + virtual bool WaitFor(WorkerId_t unlocker, Threading::IWaitable *primitive, int ms) = 0; // when unlocker = this, pump event loop virtual bool SyncTimeout(ThreadGroup_t group, AuUInt32 ms) = 0; diff --git a/Include/Aurora/Threading/LockGuardPtr.hpp b/Include/Aurora/Threading/LockGuardPtr.hpp index 2e0e2e15..c49adeab 100644 --- a/Include/Aurora/Threading/LockGuardPtr.hpp +++ b/Include/Aurora/Threading/LockGuardPtr.hpp @@ -13,17 +13,29 @@ namespace Aurora::Threading class LockGuardPtr { public: - LockGuardPtr(T *lock) : lockReference_(lock) + LockGuardPtr(T *lock) : annoying_(lock) { - lockReference_->Lock(); + lock->Lock(); + } + + LockGuardPtr(T &lock) : annoying_(&lock) + { + lock->Lock(); } ~LockGuardPtr() { - lockReference_->Unlock(); + if constexpr (is_base_of_template::value || is_base_of_template::value) + { + annoying_->get()->Unlock(); + } + else + { + annoying_->Unlock(); + } } private: - T *lockReference_; + T *annoying_; }; } \ No newline at end of file diff --git a/Include/Aurora/Threading/Threads/TLSStaticVariable.hpp b/Include/Aurora/Threading/Threads/TLSStaticVariable.hpp index fd2650d9..9191bd97 100644 --- a/Include/Aurora/Threading/Threads/TLSStaticVariable.hpp +++ b/Include/Aurora/Threading/Threads/TLSStaticVariable.hpp @@ -9,9 +9,6 @@ namespace Aurora::Threading::Threads { - template - class TLSStaticVariable - { - // TODO: implement templates for a thread feature based tls implementation - }; + // TODO: #if defined(fibers) static TLSVariable name #endif +#define STATIC_TLS(type, name) static thread_local type name; } \ No newline at end of file diff --git a/Include/Aurora/Threading/Threads/TLSVariable.hpp b/Include/Aurora/Threading/Threads/TLSVariable.hpp index 7f470295..8756899d 100644 --- a/Include/Aurora/Threading/Threads/TLSVariable.hpp +++ b/Include/Aurora/Threading/Threads/TLSVariable.hpp @@ -9,7 +9,7 @@ namespace Aurora::Threading::Threads { - template + template class TLSVariable { private: @@ -19,18 +19,47 @@ namespace Aurora::Threading::Threads TLSVariable() {} ~TLSVariable() { - GetThread()->GetTlsView()->Remove(GetHandle()); + if constexpr (!isStatic) + { + GetThread()->GetTlsView()->Remove(GetHandle()); + } } + + T& operator ->() + { + return Get(); + } + + operator T&() + { + return Get(); + } + + TLSVariable& operator =(const T & val) + { + Get() = val; + return *this; + } + + private: AuUInt64 GetHandle() { - return (AuUInt64(reinterpret_cast(&_)) & (~kTlsKeyMask)) | kTlsKeyFollowsConvention | kTlsKeyResettablePointerHandle; + if constexpr (isStatic) + { + return (AuUInt64(reinterpret_cast(&_)) & (~kTlsKeyMask)) | kTlsKeyFollowsConvention | kTlsKeyStaticPointerHandle; + } + else + { + return (AuUInt64(reinterpret_cast(&_)) & (~kTlsKeyMask)) | kTlsKeyFollowsConvention | kTlsKeyResettablePointerHandle; + } } T &Get() { auto view = GetThread()->GetTlsView(); - auto ptr = view->GetOrSetup(GetHandle(), + auto ptr = view->GetOrSetup(GetHandle(), + sizeof(T), [](void *buffer) -> void { if constexpr (std::is_class_v) @@ -51,16 +80,6 @@ namespace Aurora::Threading::Threads }); return *reinterpret_cast(ptr); } - - T& operator ->() - { - return Get(); - } - - TLSVariable& operator =(const T & val) - { - Get() = val; - return *this; - } + }; } \ No newline at end of file diff --git a/Include/AuroraMacros.hpp b/Include/AuroraMacros.hpp index 12a5e27f..f2ddcb2b 100644 --- a/Include/AuroraMacros.hpp +++ b/Include/AuroraMacros.hpp @@ -73,4 +73,6 @@ name ## Shared_t name ## Shared(T... args) \ #if !defined(NO__NEW) #define _new new (std::nothrow) -#endif \ No newline at end of file +#endif + +//#define AU_FWD(var) std::forward(var) diff --git a/Include/AuroraUtils.hpp b/Include/AuroraUtils.hpp index a430f967..ca6257cb 100644 --- a/Include/AuroraUtils.hpp +++ b/Include/AuroraUtils.hpp @@ -96,7 +96,7 @@ static inline AuString ToUpper(const AuString &in) return ToStringASCIIOp(std::toupper, in); } -template +template static inline bool TryFind(Map &map, const Key &key, Value *&ptr) { auto itr = map.find(key); @@ -112,7 +112,7 @@ static inline bool TryFind(Map &map, const Key &key, Value *&ptr) } } -template +template static inline bool TryFind(Map *map, const Key &key, Value *&ptr) { auto itr = map->find(key); @@ -128,7 +128,7 @@ static inline bool TryFind(Map *map, const Key &key, Value *&ptr) } } -template +template static inline bool TryFind(Map &map, const Key &key) { auto itr = map.find(key); @@ -142,7 +142,7 @@ static inline bool TryFind(Map &map, const Key &key) } } -template +template static inline bool TryFind(Map *map, const Key &key) { auto itr = map->find(key); @@ -155,7 +155,7 @@ static inline bool TryFind(Map *map, const Key &key) return false; } } -template +template static inline bool TryFindGeneric(Map &map, const Key &key, Value *&ptr) { auto itr = map.find(key); @@ -171,7 +171,7 @@ static inline bool TryFindGeneric(Map &map, const Key &key, Value *&ptr) } } -template +template static inline bool TryDelete(Map &map, const Key &key) { auto itr = map.find(key); @@ -186,7 +186,7 @@ static inline bool TryDelete(Map &map, const Key &key) } } -template +template static inline bool TryDeleteList(List &list, const Key &key) { auto itr = std::find(list.begin(), list.end(), key); @@ -363,6 +363,8 @@ static inline AuList SplitString(const AuString& str, const AuString& return tokens; } +// more copy/pasta. work smart, not hard. +// i dont want to waste time working out template kinks between clang and msvc template < template class base,typename derived> struct is_base_of_template_impl { @@ -373,4 +375,4 @@ struct is_base_of_template_impl }; template < template class base,typename derived> -using is_base_of_template = typename is_base_of_template_impl::type; +using is_base_of_template = typename is_base_of_template_impl::type; \ No newline at end of file diff --git a/Source/Async/Async.hpp b/Source/Async/Async.hpp index 11a82751..4f002994 100644 --- a/Source/Async/Async.hpp +++ b/Source/Async/Async.hpp @@ -9,5 +9,38 @@ namespace Aurora::Async { + class IAsyncRunnable + { + public: + virtual void RunAsync() = 0; + + virtual void CancelAsync() {} + }; + + class AsyncFuncRunnable : public IAsyncRunnable + { + public: + + std::function callback; + + AsyncFuncRunnable(std::function &&callback) : callback(std::move(callback)) + {} + + AsyncFuncRunnable(const std::function &callback) : callback(callback) + {} + + void RunAsync() override + { + try + { + callback(); + } + catch (...) + { + Debug::PrintError(); + } + } + }; + void InitAsync(); } \ No newline at end of file diff --git a/Source/Async/AsyncApp.cpp b/Source/Async/AsyncApp.cpp index 1163e71e..ca9292c9 100644 --- a/Source/Async/AsyncApp.cpp +++ b/Source/Async/AsyncApp.cpp @@ -6,9 +6,562 @@ Author: Reece ***/ #include +#include "Async.hpp" #include "AsyncApp.hpp" +#include "WorkItem.hpp" namespace Aurora::Async { + static AsyncApp gAsyncApp; + static std::atomic_int gRunningTasks {}; + //STATIC_TLS(WorkerId_t, tlsWorkerId); + static Threading::Threads::TLSVariable tlsWorkerId; + + struct ThreadState + { + WorkerId_t id; + + AuUInt8 multipopCount = 1; + AuUInt32 lastFrameTime {}; + + Threading::Threads::ThreadShared_t threadObject; + + AuWPtr parent; + + Threading::Primitives::SemaphoreUnique_t syncSema; + AuList> features; + + bool rejecting {}; + bool exiting {}; + + Threading::Primitives::EventUnique_t running; + //bool running; + + bool inline IsSysThread() + { + return id.first == 0; + } + + AuList> pendingWorkItems; + }; + + struct GroupState + { + ThreadGroup_t group; + + Threading::Primitives::ConditionMutexUnique_t cvWorkMutex; + Threading::Primitives::ConditionVariableUnique_t cvVariable; + + using WorkEntry_t = std::pair, AuSPtr>; + AuList workQueue; + + AuBST> workers; + + bool Init(); + + bool inline IsSysThread() + { + return group == 0; + } + }; + + bool GroupState::Init() + { + cvWorkMutex = Threading::Primitives::ConditionMutexUnique(); + if (!cvWorkMutex) + { + return false; + } + + cvVariable = Threading::Primitives::ConditionVariableUnique(cvWorkMutex.get()); + if (!cvVariable) + { + return false; + } + return true; + } + + AsyncApp::AsyncApp() + { + rwlock_ = Threading::Primitives::RWLockUnique(); + SysAssert(static_cast(rwlock_), "Couldn't initialize AsyncApp. Unable to allocate an RWLock"); + } + + // TODO: barrier multiple + bool AsyncApp::Barrier(WorkerId_t worker, AuUInt32 ms, bool requireSignal, bool drop) + { + auto & semaphore = GetThreadState()->syncSema; + auto unsafeSemaphore = semaphore.get(); + + auto work = std::make_shared(([=]() + { + auto state = GetThreadState(); + + if (drop) + { + state->rejecting = true; + } + + if (requireSignal) + { + state->running->Reset(); + } + + unsafeSemaphore->Unlock(1); + })); + + #if 0 + NewWorkItem({worker.first, worker.second}, work)->Dispatch(); + #else + Run(worker, work); + #endif + + return WaitFor(worker, semaphore.get(), ms); + } + + void AsyncApp::Run(DispatchTarget_t target, AuSPtr runnable) + { + auto state = GetGroup(target.first); + SysAssert(static_cast(state), "couldn't dispatch a task to an offline group"); + + gRunningTasks++; + + { + Threading::LockGuardPtr lol(state->cvWorkMutex); + + #if defined(INTERNAL) || defined(DEBUG) + Threading::LockGuardPtr lock(rwlock_->AsReadable()); + + if (target.second.has_value()) + { + if (state->workers[*target.second]->rejecting) + { + SysPushErrorGen("worker: {}:{} is offline", target.first, target.second.value_or(0)); + throw "Requested job worker is offline"; + } + } + else + { + auto workers = state->workers; + bool found = false; + + for (const auto &worker : state->workers) + { + if (!worker.second->rejecting) + { + found = true; + break; + } + } + + if (!found) + { + throw "No workers available"; + } + } + #endif + + state->workQueue.push_back(std::make_pair(target.second, runnable)); + } + + if (target.second.has_value()) + { + // sad :( + state->cvVariable->Broadcast(); + } + else + { + state->cvVariable->Signal(); + } + } + + bool AsyncApp::Poll(bool blocking) + { + auto state = GetThreadState(); + auto group = state->parent.lock(); + + state->pendingWorkItems.clear(); + + { + Threading::LockGuardPtr lol(group->cvWorkMutex); + + do + { + for (auto itr = group->workQueue.begin(); + ((itr != group->workQueue.end()) && + (state->pendingWorkItems.size() < state->multipopCount)); + ) + { + if (!itr->first.has_value()) + { + state->pendingWorkItems.push_back((*itr).second); + itr = group->workQueue.erase(itr); + continue; + } + + if ((itr->first.has_value()) && (itr->first.value() == state->id.second)) + { + state->pendingWorkItems.push_back((*itr).second); + itr = group->workQueue.erase(itr); + continue; + } + + itr++; + } + + if (!blocking) + { + break; + } + + if (state->pendingWorkItems.empty()) + { + group->cvVariable->WaitForSignal(); + } + + } while (state->pendingWorkItems.empty()); + } + + if (state->pendingWorkItems.empty()) + { + return false; + } + + int runningTasks {}; + + for (auto itr = state->pendingWorkItems.begin(); itr != state->pendingWorkItems.end(); ) + { + // TODO: we should be able to implement a watchdog later down the line + state->lastFrameTime = Time::CurrentClockMS(); + (*itr)->RunAsync(); + itr = state->pendingWorkItems.erase(itr); + runningTasks = --gRunningTasks; + } + + if (runningTasks) + { + ShutdownOutOfTasks(); + } + + return true; + } + + + bool AsyncApp::WaitFor(WorkerId_t worker, Aurora::Threading::IWaitable *primitive, int timeoutMs) + { + auto curThread = GetThreadState(); + + if (worker == curThread->id) + { + // TODO: timeout isn't respected here as well + while (!Threading::WaitFor(primitive, 2)) + { + while (this->Poll(false)); + } + + return true; + } + else + { + return Threading::WaitFor(primitive, timeoutMs); + } + } + + void AsyncApp::Start() + { + SysAssert(Spawn({0, 0})); + } + + void AsyncApp::Main() + { + Entrypoint({0, 0}); + } + + void AsyncApp::Shutdown() + { + // Set shutdown trigger + shuttingdown_ = true; + + // Unpause all + for (auto &re : this->threads_) + { + for (auto &[id, worker] : re.second->workers) + { + auto &event = worker->running; + if (event) + { + event->Set(); + } + } + } + + // Drop all tasks from this point onwards + for (auto &re : this->threads_) + { + for (auto &[id, worker] : re.second->workers) + { + SysAssert(Barrier(worker->id, 0, true, true)); + } + } + + // TODO: abort all pending tests + + // Signal the event loop to abort + ShutdownOutOfTasks(); + } + + void AsyncApp::ShutdownOutOfTasks() + { + for (auto& [id, group]: this->threads_) + { + for (auto & [id, worker] : group->workers) + { + worker->threadObject->SendExitSignal(); + } + + if (group->cvVariable) + { + Threading::LockGuardPtr lol(group->cvWorkMutex); + group->cvVariable->Broadcast(); + } + } + } + + bool AsyncApp::Exiting() + { + return shuttingdown_ || GetThreadState()->exiting; + } + + bool AsyncApp::Spawn(WorkerId_t workerId) + { + Threading::LockGuardPtr lock(rwlock_->AsWritable()); + + AuSPtr group; + + // Try fetch or allocate group + { + AuSPtr* groupPtr; + if (!TryFind(this->threads_, workerId.first, groupPtr)) + { + group = std::make_shared(); + + if (!group->Init()) + { + SysPushErrorMem("Not enough memory to intiialize a new group state"); + return false; + } + + if (!TryInsert(this->threads_, std::make_pair(workerId.first, group))) + { + return false; + } + } + else + { + group = *groupPtr; + } + } + + // Assert worker does not already exist + { + AuSPtr* ret; + + if (TryFind(group->workers, workerId.second, ret)) + { + SysPushErrorGen("Thread ID already exists"); + return false; + } + } + + auto threadState = std::make_shared(); + threadState->parent = group; + threadState->running = Threading::Primitives::EventUnique(true, false, false); + threadState->syncSema = Threading::Primitives::SemaphoreUnique(0); + threadState->id = workerId; + + if (!threadState->IsSysThread()) + { + Threading::Threads::AbstractThreadVectors handler; + handler.DoRun = [=](const Threading::Threads::IAuroraThread *thread) + { + Entrypoint(threadState->id); + }; + threadState->threadObject = Threading::Threads::ThreadUnique(handler); + threadState->threadObject->Run(); + } + else + { + threadState->threadObject = std::shared_ptr(Threading::Threads::GetThread(), [](Threading::Threads::IAuroraThread *){}); + } + + group->workers.insert(std::make_pair(workerId.second, threadState)); + return true; + } + + Threading::Threads::ThreadShared_t AsyncApp::ResolveHandle(WorkerId_t id) + { + auto group = GetGroup(id.first); + if (!group) + { + return {}; + } + + AuSPtr* ret; + if (!TryFind(group->workers, id.second, ret)) + { + return {}; + } + + return ret->get()->threadObject; + } + + AuBST> AsyncApp::GetThreads() + { + Threading::LockGuardPtr lock(rwlock_->AsReadable()); + + AuBST> ret; + + for (const auto &group : this->threads_) + { + AuList workers; + + for (const auto &thread : group.second->workers) + { + workers.push_back(thread.second->id.second); + } + + ret[group.first] = workers; + } + + return ret; + } + + WorkerId_t AsyncApp::GetCurrentThread() + { + return tlsWorkerId; + } + + bool AsyncApp::Sync(ThreadGroup_t groupId, bool requireSignal, AuUInt32 timeoutMs) + { + Threading::LockGuardPtr lock(rwlock_->AsReadable()); + + auto group = GetGroup(groupId); + + for (auto &jobWorker : group->workers) + { + if (!Barrier(jobWorker.second->id, timeoutMs, requireSignal, false)) // BAD!, should subtract time elapsed, clamp to, i dunno, 5ms min? + { + return false; + } + } + + return true; + } + + void AsyncApp::Signal(ThreadGroup_t groupId) + { + Threading::LockGuardPtr lock(rwlock_->AsReadable()); + + auto group = GetGroup(groupId); + + for (auto &jobWorker : group->workers) + { + jobWorker.second->running->Set(); + } + } + + bool AsyncApp::SyncTimeout(ThreadGroup_t group, AuUInt32 ms) + { + return Sync(group, false, ms); + } + + void AsyncApp::SyncAllSafe() + { + Threading::LockGuardPtr lock(rwlock_->AsReadable()); + + for (const auto &re : this->threads_) + { + for (auto &jobWorker : re.second->workers) + { + SysAssert(Barrier(jobWorker.second->id, 0, false, false)); + } + } + } + + AuSPtr AsyncApp::GetGroup(ThreadGroup_t type) + { + Threading::LockGuardPtr lock(rwlock_->AsReadable()); + + AuSPtr* ret; + if (!TryFind(this->threads_, type, ret)) + { + return {}; + } + + return *ret; + } + + AuSPtr AsyncApp::GetThreadState() + { + auto id = GetCurrentThread(); + auto state = GetGroup(id.first); + return state->workers[id.second]; + } + + void AsyncApp::Entrypoint(WorkerId_t id) + { + tlsWorkerId = id; + + auto auThread = Threading::Threads::GetThread(); + auto job = GetThreadState(); + + while (!auThread->Exiting()) + { + // Do work (blocking) + Poll(true); + + // Synchronization after pause + job->running->Lock(); + } + + for (const auto &thread : job->features) + { + thread->Cleanup(); + } + + job->features.clear(); + } + + void AsyncApp::AddFeature(WorkerId_t id, AuSPtr feature, bool async) + { + auto work = std::make_shared(([=]() + { + GetThreadState()->features.push_back(feature); + feature->Init(); + })); + + auto workItem = NewWorkItem(id, work, !async); + workItem->Dispatch(); + + if (!async) + { + workItem->BlockUntilComplete(); + } + } + + void AsyncApp::AssertInThreadGroup(ThreadGroup_t group) + { + SysAssert(static_cast(tlsWorkerId).first == group); + } + + void AsyncApp::AssertWorker(WorkerId_t id) + { + SysAssert(static_cast(tlsWorkerId) == id); + } + + AUKN_SYM IAsyncApp *GetAsyncApp() + { + return &gAsyncApp; + } } \ No newline at end of file diff --git a/Source/Async/AsyncApp.hpp b/Source/Async/AsyncApp.hpp index 472b0469..689ca89a 100644 --- a/Source/Async/AsyncApp.hpp +++ b/Source/Async/AsyncApp.hpp @@ -9,5 +9,64 @@ namespace Aurora::Async { - + struct GroupState; + struct ThreadState; + //class WorkItem; + + class AsyncApp : public IAsyncApp + { + public: + AsyncApp(); + + // Main thread logic + void Start() override; + void Main() override; + void Shutdown() override; + bool Exiting() override; + + // Spawning + bool Spawn(WorkerId_t) override; + Threading::Threads::ThreadShared_t ResolveHandle(WorkerId_t) override; + AuBST> GetThreads() override; + WorkerId_t GetCurrentThread() override; + + // Synchronization + bool Sync(ThreadGroup_t group, bool requireSignal, AuUInt32 timeout) override; + void Signal(ThreadGroup_t group) override; + + bool WaitFor(WorkerId_t unlocker, Threading::IWaitable *primitive, int ms) override; // when unlocker = this, pump event loop + + bool SyncTimeout(ThreadGroup_t group, AuUInt32 ms) override; + + void SyncAllSafe() override; + + // Features + void AddFeature(WorkerId_t id, AuSPtr feature, bool async = false) override; + + // Debug + void AssertInThreadGroup(ThreadGroup_t group) override; + void AssertWorker(WorkerId_t id) override; + + void Run(DispatchTarget_t target, AuSPtr runnable); + + private: + + void ShutdownOutOfTasks(); + // TODO: BarrierMultiple + bool Barrier(WorkerId_t, AuUInt32 ms, bool requireSignal, bool drop); + bool Poll(bool a); + + Threading::Primitives::RWLockUnique_t rwlock_; + + AuSPtr GetGroup(ThreadGroup_t type); + + AuSPtr GetThreadState(); + + void Entrypoint(WorkerId_t id); + + using ThreadDb_t = AuBST>; + + ThreadDb_t threads_; + bool shuttingdown_ {}; + }; } \ No newline at end of file diff --git a/Source/Async/WorkItem.cpp b/Source/Async/WorkItem.cpp index eb502f07..2546ea3f 100644 --- a/Source/Async/WorkItem.cpp +++ b/Source/Async/WorkItem.cpp @@ -6,7 +6,9 @@ Author: Reece ***/ #include +#include "Async.hpp" #include "WorkItem.hpp" +#include "AsyncApp.hpp" namespace Aurora::Async { @@ -21,7 +23,7 @@ namespace Aurora::Async WorkItem::~WorkItem() { - Fail(); + //Fail(); } void WorkItem::WaitFor(const AuSPtr &workItem) @@ -87,7 +89,7 @@ namespace Aurora::Async itr = waitOn_.erase(itr); } - if (Time::CurrentClockMS() >= dispatchTime_) + if (Time::CurrentClockMS() < dispatchTime_) { Schedule(); return; @@ -96,7 +98,13 @@ namespace Aurora::Async SendOff(); } - void WorkItem::Handle() + + void WorkItem::CancelAsync() + { + Fail(); + } + + void WorkItem::RunAsync() { Threading::LockGuard l(lock); @@ -195,11 +203,11 @@ namespace Aurora::Async void WorkItem::SendOff() { - // TODO: + static_cast(GetAsyncApp())->Run(worker_, this->shared_from_this()); } - AUKN_SYM AuSPtr NewWorkItem(const WorkerId_t &worker, const AuSPtr &task, bool annoying) + AUKN_SYM AuSPtr NewWorkItem(const DispatchTarget_t &worker, const AuSPtr &task, bool supportsBlocking) { - return std::make_shared(worker, task, annoying); + return std::make_shared(worker, task, supportsBlocking); } } \ No newline at end of file diff --git a/Source/Async/WorkItem.hpp b/Source/Async/WorkItem.hpp index 24e85c6b..df2ec7db 100644 --- a/Source/Async/WorkItem.hpp +++ b/Source/Async/WorkItem.hpp @@ -9,7 +9,7 @@ namespace Aurora::Async { - class WorkItem : public IWorkItem, public std::enable_shared_from_this + class WorkItem : public IWorkItem, public IAsyncRunnable, public std::enable_shared_from_this { public: WorkItem(const DispatchTarget_t &worker_, const AuSPtr &task_, bool supportsBlocking); @@ -24,10 +24,11 @@ namespace Aurora::Async bool BlockUntilComplete() override; bool HasFinished() override; bool HasFailed() override; - - void Handle(); + + void RunAsync() override; + void CancelAsync() override; - private: + private: void DispatchEx(bool check); AuSPtr task_; DispatchTarget_t worker_; diff --git a/Source/Debug/Debug.cpp b/Source/Debug/Debug.cpp index f899b692..783dee77 100644 --- a/Source/Debug/Debug.cpp +++ b/Source/Debug/Debug.cpp @@ -212,6 +212,10 @@ namespace Aurora::Debug TryGetOrFetchCError(); TryGetOrFetchOSError(); Telemetry::InsertManualFence(rng); + + #if defined(DEBUG) || defined(INTERNAL) + LogWarn("ERROR: {}", msg); + #endif } AUKN_SYM AuString StackTraceEntry::Stringify() const diff --git a/Source/Debug/ExceptionWatcher.Win32.cpp b/Source/Debug/ExceptionWatcher.Win32.cpp index 04392729..e649c638 100644 --- a/Source/Debug/ExceptionWatcher.Win32.cpp +++ b/Source/Debug/ExceptionWatcher.Win32.cpp @@ -284,7 +284,7 @@ namespace Aurora::Debug try { - if (minimal < 3) + if (minimal < 2) { ParseStack(ExceptionInfo->ContextRecord, entry.wincxx.stack.backtrace); } diff --git a/Source/Processes/Process.Win32.cpp b/Source/Processes/Process.Win32.cpp index 4fed0b94..0c0add12 100644 --- a/Source/Processes/Process.Win32.cpp +++ b/Source/Processes/Process.Win32.cpp @@ -56,12 +56,12 @@ namespace Aurora::Processes private: - HANDLE pipeStdOutRead_ {}; - HANDLE pipeStdOutWrite_ {}; - HANDLE pipeStdErrRead_ {}; - HANDLE pipeStdErrWrite_ {}; - HANDLE pipeStdInRead_ {}; - HANDLE pipeStdInWrite_ {}; + HANDLE pipeStdOutRead_ {INVALID_HANDLE_VALUE}; + HANDLE pipeStdOutWrite_ {INVALID_HANDLE_VALUE}; + HANDLE pipeStdErrRead_ {INVALID_HANDLE_VALUE}; + HANDLE pipeStdErrWrite_ {INVALID_HANDLE_VALUE}; + HANDLE pipeStdInRead_ {INVALID_HANDLE_VALUE}; + HANDLE pipeStdInWrite_ {INVALID_HANDLE_VALUE}; AuString execModule_; ESpawnType type_; @@ -196,22 +196,24 @@ namespace Aurora::Processes void ProcessImpl::ShutdownPipes() { - if (auto handle = std::exchange(this->pipeStdOutRead_, {})) - CloseHandle(handle); + HANDLE handle; - if (auto handle = std::exchange(this->pipeStdOutWrite_, {})) + if ((handle = std::exchange(this->pipeStdOutRead_, INVALID_HANDLE_VALUE)) != INVALID_HANDLE_VALUE) CloseHandle(handle); - - if (auto handle = std::exchange(this->pipeStdErrRead_, {})) + + if ((handle = std::exchange(this->pipeStdOutWrite_, INVALID_HANDLE_VALUE)) != INVALID_HANDLE_VALUE) CloseHandle(handle); - - if (auto handle = std::exchange(this->pipeStdErrWrite_, {})) + + if ((handle = std::exchange(this->pipeStdErrRead_, INVALID_HANDLE_VALUE)) != INVALID_HANDLE_VALUE) CloseHandle(handle); - - if (auto handle = std::exchange(this->pipeStdInRead_, {})) + + if ((handle = std::exchange(this->pipeStdErrWrite_, INVALID_HANDLE_VALUE)) != INVALID_HANDLE_VALUE) CloseHandle(handle); - - if (auto handle = std::exchange(this->pipeStdInWrite_, {})) + + if ((handle = std::exchange(this->pipeStdInRead_, INVALID_HANDLE_VALUE)) != INVALID_HANDLE_VALUE) + CloseHandle(handle); + + if ((handle = std::exchange(this->pipeStdInWrite_, INVALID_HANDLE_VALUE)) != INVALID_HANDLE_VALUE) CloseHandle(handle); } diff --git a/Source/Threading/Primitives/Event.cpp b/Source/Threading/Primitives/Event.cpp index 22a15d4a..a66c3002 100644 --- a/Source/Threading/Primitives/Event.cpp +++ b/Source/Threading/Primitives/Event.cpp @@ -31,7 +31,7 @@ namespace Aurora::Threading::Primitives bool EventImpl::Lock(AuUInt64 timeout /*=0*/) { - LockGuardPtr re(mutex_.get()); + LockGuardPtr re(mutex_); AuInt64 startTime = Aurora::Time::CurrentClockMS(); AuInt64 endTime = startTime + timeout; @@ -66,7 +66,7 @@ namespace Aurora::Threading::Primitives bool EventImpl::TryLock() { - LockGuardPtr re(mutex_.get()); + LockGuardPtr re(mutex_); return AtomicIsEventSet(); } @@ -89,7 +89,7 @@ namespace Aurora::Threading::Primitives void EventImpl::Set() { - LockGuardPtr re(mutex_.get()); + LockGuardPtr re(mutex_); SysAssertExp((permitMultipleTriggers_) || (!triggered_), "Can not trigger an awake event object"); triggered_ = true; condition_->Broadcast(); diff --git a/Source/Threading/Primitives/RWLock.cpp b/Source/Threading/Primitives/RWLock.cpp index 0d3d819a..63771fd8 100644 --- a/Source/Threading/Primitives/RWLock.cpp +++ b/Source/Threading/Primitives/RWLock.cpp @@ -17,7 +17,8 @@ namespace Aurora::Threading::Primitives RWLockImpl::~RWLockImpl() { - + mutex_.reset(); + condition_.reset(); } bool RWLockImpl::Init() @@ -38,8 +39,8 @@ namespace Aurora::Threading::Primitives bool RWLockImpl::LockRead(AuUInt64 timeout) { - LockGuardPtr mutex(mutex_.get()); - + LockGuardPtr lock(mutex_); + while (state_ < 0) { if (!condition_->WaitForSignal(timeout)) @@ -54,7 +55,7 @@ namespace Aurora::Threading::Primitives bool RWLockImpl::LockWrite(AuUInt64 timeout) { - LockGuardPtr mutex(mutex_.get()); + LockGuardPtr lock(mutex_); while (state_ != 0) { @@ -70,7 +71,7 @@ namespace Aurora::Threading::Primitives bool RWLockImpl::TryLockRead() { - LockGuardPtr mutex(mutex_.get()); + LockGuardPtr lock(mutex_); if (state_ == -1) { @@ -83,7 +84,7 @@ namespace Aurora::Threading::Primitives bool RWLockImpl::TryLockWrite() { - LockGuardPtr mutex(mutex_.get()); + LockGuardPtr lock(mutex_); if (state_ > 0) { @@ -96,7 +97,7 @@ namespace Aurora::Threading::Primitives void RWLockImpl::UnlockRead() { - LockGuardPtr mutex(mutex_.get()); + LockGuardPtr lock(mutex_); if(--state_ == 0) { @@ -106,7 +107,7 @@ namespace Aurora::Threading::Primitives void RWLockImpl::UnlockWrite() { - LockGuardPtr mutex(mutex_.get()); + LockGuardPtr lock(mutex_); state_ = 0; condition_->Broadcast(); }