From 9436b12455f92d1053b8a71578c1021fa502226e Mon Sep 17 00:00:00 2001 From: Reece Date: Fri, 1 Oct 2021 17:26:27 +0100 Subject: [PATCH] [+] Begin work on a possible future API for interoperability between loop sources and event driven async apps --- Include/Aurora/Async/Async.hpp | 8 ++ Include/Aurora/Loop/Loop.hpp | 1 + Source/Async/AsyncApp.cpp | 248 ++++++++++++++++++++++++++++----- 3 files changed, 221 insertions(+), 36 deletions(-) diff --git a/Include/Aurora/Async/Async.hpp b/Include/Aurora/Async/Async.hpp index e385f1ec..03565dae 100644 --- a/Include/Aurora/Async/Async.hpp +++ b/Include/Aurora/Async/Async.hpp @@ -7,6 +7,11 @@ ***/ #pragma once +namespace Aurora::Loop +{ + class ILoopSource; +} + namespace Aurora::Async { class IWorkItem; @@ -549,6 +554,7 @@ namespace Aurora::Async return Async::NewWorkItem(worker, AuMakeShared>(task, job, inputParameters), enableWait)->Dispatch(); } + class IAsyncApp { public: @@ -579,5 +585,7 @@ namespace Aurora::Async virtual void AssertWorker(WorkerId_t id) = 0; virtual bool Poll(bool block) = 0; + + virtual void AddLoopSource(const AuSPtr &loopSource, WorkerId_t workerId, AuUInt32 timeout, const AuConsumer, bool> &callback) = 0; }; } \ No newline at end of file diff --git a/Include/Aurora/Loop/Loop.hpp b/Include/Aurora/Loop/Loop.hpp index 915a19a9..4eb0dce9 100644 --- a/Include/Aurora/Loop/Loop.hpp +++ b/Include/Aurora/Loop/Loop.hpp @@ -56,6 +56,7 @@ namespace Aurora::Loop class ILSEvent : public ILoopSource { public: + virtual bool Set() = 0; virtual bool Reset() = 0; }; diff --git a/Source/Async/AsyncApp.cpp b/Source/Async/AsyncApp.cpp index 0af247db..5298cdda 100644 --- a/Source/Async/AsyncApp.cpp +++ b/Source/Async/AsyncApp.cpp @@ -12,6 +12,7 @@ #include "Schedular.hpp" #include + namespace Aurora::Async { static AsyncApp gAsyncApp; @@ -30,39 +31,50 @@ namespace Aurora::Async } } - + //STATIC_TLS(WorkerId_t, tlsWorkerId); static Threading::Threads::TLSVariable tlsWorkerId; - + using WorkEntry_t = AuPair, AuSPtr>; + struct AsyncAppWaitSourceRequest + { + AuConsumer, bool> callback; + AuSPtr loopSource; + AuUInt32 requestedOffset; + AuUInt64 startTime; + AuUInt64 endTime; + }; + struct ThreadState { WorkerId_t id; AuUInt8 multipopCount = 1; AuUInt32 lastFrameTime {}; - + Threading::Threads::ThreadShared_t threadObject; //std::stack jmpStack; AuWPtr parent; - + Threading::Primitives::SemaphoreUnique_t syncSema; AuList> features; bool rejecting {}; bool exiting {}; + bool inLoopSourceMode {}; bool shuttingdown {}; - + Threading::Primitives::EventUnique_t running; //bool running; - + bool inline IsSysThread() { return id.first == 0; } + AuList loopSources; AuList pendingWorkItems; }; @@ -72,9 +84,10 @@ namespace Aurora::Async Threading::Primitives::ConditionMutexUnique_t cvWorkMutex; Threading::Primitives::ConditionVariableUnique_t cvVariable; + AuSPtr eventLs; AuList workQueue; - + AuBST> workers; bool Init(); @@ -87,30 +100,36 @@ namespace Aurora::Async bool GroupState::Init() { - cvWorkMutex = Threading::Primitives::ConditionMutexUnique(); - if (!cvWorkMutex) + this->cvWorkMutex = Threading::Primitives::ConditionMutexUnique(); + if (!this->cvWorkMutex) { return false; } - - cvVariable = Threading::Primitives::ConditionVariableUnique(cvWorkMutex.get()); - if (!cvVariable) + + this->cvVariable = Threading::Primitives::ConditionVariableUnique(this->cvWorkMutex.get()); + if (!this->cvVariable) + { + return false; + } + + this->eventLs = Loop::NewLSEvent(false, false, true); + if (!this->eventLs) { return false; } return true; } - + AsyncApp::AsyncApp() { - rwlock_ = Threading::Primitives::RWLockUnique(); - SysAssert(static_cast(rwlock_), "Couldn't initialize AsyncApp. Unable to allocate an RWLock"); + this->rwlock_ = Threading::Primitives::RWLockUnique(); + SysAssert(static_cast(this->rwlock_), "Couldn't initialize AsyncApp. Unable to allocate an RWLock"); } // TODO: barrier multiple bool AsyncApp::Barrier(WorkerId_t worker, AuUInt32 ms, bool requireSignal, bool drop) { - auto & semaphore = GetThreadState()->syncSema; + auto &semaphore = GetThreadState()->syncSema; auto unsafeSemaphore = semaphore.get(); auto work = AuMakeShared(([=]() @@ -128,13 +147,13 @@ namespace Aurora::Async } unsafeSemaphore->Unlock(1); - + if (requireSignal) { state->running->Lock(); } })); - + #if 0 NewWorkItem({worker.first, worker.second}, work)->Dispatch(); #else @@ -148,13 +167,13 @@ namespace Aurora::Async { auto state = GetGroup(target.first); SysAssert(static_cast(state), "couldn't dispatch a task to an offline group"); - + IncRunningTasks(); { AU_LOCK_GUARD(state->cvWorkMutex); - - #if defined(STAGING) || defined(DEBUG) + + #if defined(STAGING) || defined(DEBUG) AU_LOCK_GUARD(rwlock_->AsReadable()); if (target.second != Async::kThreadIdAny) @@ -171,7 +190,7 @@ namespace Aurora::Async { auto workers = state->workers; bool found = false; - + for (const auto &worker : state->workers) { if (!worker.second->rejecting) @@ -187,22 +206,23 @@ namespace Aurora::Async throw "No workers available"; } } - #endif + #endif state->workQueue.push_back(AuMakePair(target.second, runnable)); + state->eventLs->Set(); } - if (target.second == Async::kThreadIdAny) - { - // sad :( - state->cvVariable->Broadcast(); - } - else - { - state->cvVariable->Signal(); - } + if (target.second == Async::kThreadIdAny) + { + // sad :( + state->cvVariable->Broadcast(); + } + else + { + state->cvVariable->Signal(); + } } - + int AsyncApp::CfxPollPush() { // TOOD (Reece): implement a context switching library @@ -215,7 +235,7 @@ namespace Aurora::Async // TOOD (Reece): implement a context switching library // Refer to the old implementation of this oin pastebin } - + bool AsyncApp::CtxYield() { bool ranAtLeastOne = false; @@ -228,7 +248,156 @@ namespace Aurora::Async return ranAtLeastOne; } - bool AsyncApp::Poll(bool blocking) + bool AsyncApp::AddLoopSource(const AuSPtr &loopSource, WorkerId_t workerId, AuUInt32 timeout, const AuConsumer, bool> &callback) + { + auto thread = this->GetThreadHandle(workerId); + if (!thread) + { + return false; + } + + auto group = thread->parent.lock(); + + { + AU_LOCK_GUARD(group->cvWorkMutex); + + AsyncAppWaitSourceRequest req {}; + req.startTime = Time::CurrentClockMS(); + if (timeout) + { + req.requestedOffset = timeout; + req.endTime = req.startTime + timeout; + } + req.loopSource = loopSource; + req.callback = callback; + + if (!AuTryInsert(thread->loopSources, req)) + { + return false; + } + + thread->inLoopSourceMode = thread->loopSources.size(); + } + + return true; + } + + bool AsyncApp::Poll(bool block) + { + auto state = GetThreadState(); + bool success {}; + + do + { + if (state->inLoopSourceMode) + { + success = PollLoopSource(block); + } + else + { + success = PollInternal(block); + success |= state->inLoopSourceMode; + } + } while (!block || success); + + return success; + } + + bool AsyncApp::PollLoopSource(bool block) + { + auto state = GetThreadState(); + auto group = state->parent.lock(); + + //state->pendingWorkItems.clear(); + + auto magic = CfxPollPush(); + bool retValue {}; + + // TODO (reece): This function isn't very efficient + { + AU_LOCK_GUARD(group->cvWorkMutex); + + AuList curLoopReq = state->loopSources; + AuList> curLoopSources; + + auto lenLoopReqs = curLoopReq.size(); + + curLoopSources.resize(lenLoopReqs + 1); + + for (auto i = 0; i < lenLoopReqs; i++) + { + curLoopSources[i] = curLoopReq[i].loopSource; + } + + curLoopSources[lenLoopReqs] = group->eventLs; + + AuList> nextLoopSources; + if (block) + { + nextLoopSources = Loop::WaitMultipleObjects(curLoopSources, 0); + } + else + { + nextLoopSources.reserve(curLoopSources.size()); + for (const auto &source : curLoopSources) + { + if (source->IsSignaled()) + { + nextLoopSources.push_back(source); + } + } + } + + auto time = Aurora::Time::CurrentClockMS(); + + state->loopSources.clear(); + state->loopSources.reserve(curLoopReq.size()); + + if (AuTryFind(nextLoopSources, group->eventLs)) + { + PollInternal(false); + } + + for (const auto &request : curLoopReq) + { + bool remove {}; + bool removeType {}; + + if (AuTryFind(nextLoopSources, request.loopSource)) + { + remove = true; + removeType = true; + } + else + { + if (request.requestedOffset) + { + if (request.endTime < time) + { + remove = true; + removeType = false; + } + } + } + + if (!remove) + { + state->loopSources.push_back(request); + } + else + { + request.callback(request.loopSource, removeType); + retValue |= removeType; + } + } + + state->inLoopSourceMode = state->loopSources.size(); + } + + return retValue; + } + + bool AsyncApp::PollInternal(bool blocking) { auto state = GetThreadState(); auto group = state->parent.lock(); @@ -296,6 +465,11 @@ namespace Aurora::Async } } while (state->pendingWorkItems.empty()); + + if (group->workQueue.empty()) + { + group->eventLs->Reset(); + } } if (state->pendingWorkItems.empty()) @@ -326,15 +500,17 @@ namespace Aurora::Async runningTasks = gRunningTasks.fetch_sub(1) - 1; } + // + // Return popped work back to the groups work pool when our -pump loops were preempted if (state->pendingWorkItems.size()) { AU_LOCK_GUARD(group->cvWorkMutex); group->workQueue.insert(group->workQueue.end(), state->pendingWorkItems.begin(), state->pendingWorkItems.end()); + group->eventLs->Set(); state->pendingWorkItems.clear(); } - CtxPollReturn(state, magic, true); if (runningTasks == 0)