[+] Begin work on a possible future API for interoperability between loop sources and event driven async apps

This commit is contained in:
Reece Wilson 2021-10-01 17:26:27 +01:00
parent 31c9f32a24
commit 9436b12455
3 changed files with 221 additions and 36 deletions

View File

@ -7,6 +7,11 @@
***/
#pragma once
namespace Aurora::Loop
{
class ILoopSource;
}
namespace Aurora::Async
{
class IWorkItem;
@ -549,6 +554,7 @@ namespace Aurora::Async
return Async::NewWorkItem(worker, AuMakeShared<Async::BasicWorkCallback<Info_t, Result_t>>(task, job, inputParameters), enableWait)->Dispatch();
}
class IAsyncApp
{
public:
@ -579,5 +585,7 @@ namespace Aurora::Async
virtual void AssertWorker(WorkerId_t id) = 0;
virtual bool Poll(bool block) = 0;
virtual void AddLoopSource(const AuSPtr<Loop::ILoopSource> &loopSource, WorkerId_t workerId, AuUInt32 timeout, const AuConsumer<AuSPtr<Loop::ILoopSource>, bool> &callback) = 0;
};
}

View File

@ -56,6 +56,7 @@ namespace Aurora::Loop
class ILSEvent : public ILoopSource
{
public:
virtual bool Set() = 0;
virtual bool Reset() = 0;
};

View File

@ -12,6 +12,7 @@
#include "Schedular.hpp"
#include <Source/Console/Commands/Commands.hpp>
namespace Aurora::Async
{
static AsyncApp gAsyncApp;
@ -36,6 +37,15 @@ namespace Aurora::Async
using WorkEntry_t = AuPair<AuOptional<ThreadId_t>, AuSPtr<IAsyncRunnable>>;
struct AsyncAppWaitSourceRequest
{
AuConsumer<AuSPtr<Loop::ILoopSource>, bool> callback;
AuSPtr<Loop::ILoopSource> loopSource;
AuUInt32 requestedOffset;
AuUInt64 startTime;
AuUInt64 endTime;
};
struct ThreadState
{
WorkerId_t id;
@ -53,6 +63,7 @@ namespace Aurora::Async
bool rejecting {};
bool exiting {};
bool inLoopSourceMode {};
bool shuttingdown {};
Threading::Primitives::EventUnique_t running;
@ -63,6 +74,7 @@ namespace Aurora::Async
return id.first == 0;
}
AuList<AsyncAppWaitSourceRequest> loopSources;
AuList<WorkEntry_t> pendingWorkItems;
};
@ -72,6 +84,7 @@ namespace Aurora::Async
Threading::Primitives::ConditionMutexUnique_t cvWorkMutex;
Threading::Primitives::ConditionVariableUnique_t cvVariable;
AuSPtr<Loop::ILSEvent> eventLs;
AuList<WorkEntry_t> workQueue;
@ -87,14 +100,20 @@ namespace Aurora::Async
bool GroupState::Init()
{
cvWorkMutex = Threading::Primitives::ConditionMutexUnique();
if (!cvWorkMutex)
this->cvWorkMutex = Threading::Primitives::ConditionMutexUnique();
if (!this->cvWorkMutex)
{
return false;
}
cvVariable = Threading::Primitives::ConditionVariableUnique(cvWorkMutex.get());
if (!cvVariable)
this->cvVariable = Threading::Primitives::ConditionVariableUnique(this->cvWorkMutex.get());
if (!this->cvVariable)
{
return false;
}
this->eventLs = Loop::NewLSEvent(false, false, true);
if (!this->eventLs)
{
return false;
}
@ -103,8 +122,8 @@ namespace Aurora::Async
AsyncApp::AsyncApp()
{
rwlock_ = Threading::Primitives::RWLockUnique();
SysAssert(static_cast<bool>(rwlock_), "Couldn't initialize AsyncApp. Unable to allocate an RWLock");
this->rwlock_ = Threading::Primitives::RWLockUnique();
SysAssert(static_cast<bool>(this->rwlock_), "Couldn't initialize AsyncApp. Unable to allocate an RWLock");
}
// TODO: barrier multiple
@ -190,6 +209,7 @@ namespace Aurora::Async
#endif
state->workQueue.push_back(AuMakePair(target.second, runnable));
state->eventLs->Set();
}
if (target.second == Async::kThreadIdAny)
@ -228,7 +248,156 @@ namespace Aurora::Async
return ranAtLeastOne;
}
bool AsyncApp::Poll(bool blocking)
bool AsyncApp::AddLoopSource(const AuSPtr<Loop::ILoopSource> &loopSource, WorkerId_t workerId, AuUInt32 timeout, const AuConsumer<AuSPtr<Loop::ILoopSource>, bool> &callback)
{
auto thread = this->GetThreadHandle(workerId);
if (!thread)
{
return false;
}
auto group = thread->parent.lock();
{
AU_LOCK_GUARD(group->cvWorkMutex);
AsyncAppWaitSourceRequest req {};
req.startTime = Time::CurrentClockMS();
if (timeout)
{
req.requestedOffset = timeout;
req.endTime = req.startTime + timeout;
}
req.loopSource = loopSource;
req.callback = callback;
if (!AuTryInsert(thread->loopSources, req))
{
return false;
}
thread->inLoopSourceMode = thread->loopSources.size();
}
return true;
}
bool AsyncApp::Poll(bool block)
{
auto state = GetThreadState();
bool success {};
do
{
if (state->inLoopSourceMode)
{
success = PollLoopSource(block);
}
else
{
success = PollInternal(block);
success |= state->inLoopSourceMode;
}
} while (!block || success);
return success;
}
bool AsyncApp::PollLoopSource(bool block)
{
auto state = GetThreadState();
auto group = state->parent.lock();
//state->pendingWorkItems.clear();
auto magic = CfxPollPush();
bool retValue {};
// TODO (reece): This function isn't very efficient
{
AU_LOCK_GUARD(group->cvWorkMutex);
AuList<AsyncAppWaitSourceRequest> curLoopReq = state->loopSources;
AuList<AuSPtr<Loop::ILoopSource>> curLoopSources;
auto lenLoopReqs = curLoopReq.size();
curLoopSources.resize(lenLoopReqs + 1);
for (auto i = 0; i < lenLoopReqs; i++)
{
curLoopSources[i] = curLoopReq[i].loopSource;
}
curLoopSources[lenLoopReqs] = group->eventLs;
AuList<AuSPtr<Loop::ILoopSource>> nextLoopSources;
if (block)
{
nextLoopSources = Loop::WaitMultipleObjects(curLoopSources, 0);
}
else
{
nextLoopSources.reserve(curLoopSources.size());
for (const auto &source : curLoopSources)
{
if (source->IsSignaled())
{
nextLoopSources.push_back(source);
}
}
}
auto time = Aurora::Time::CurrentClockMS();
state->loopSources.clear();
state->loopSources.reserve(curLoopReq.size());
if (AuTryFind(nextLoopSources, group->eventLs))
{
PollInternal(false);
}
for (const auto &request : curLoopReq)
{
bool remove {};
bool removeType {};
if (AuTryFind(nextLoopSources, request.loopSource))
{
remove = true;
removeType = true;
}
else
{
if (request.requestedOffset)
{
if (request.endTime < time)
{
remove = true;
removeType = false;
}
}
}
if (!remove)
{
state->loopSources.push_back(request);
}
else
{
request.callback(request.loopSource, removeType);
retValue |= removeType;
}
}
state->inLoopSourceMode = state->loopSources.size();
}
return retValue;
}
bool AsyncApp::PollInternal(bool blocking)
{
auto state = GetThreadState();
auto group = state->parent.lock();
@ -296,6 +465,11 @@ namespace Aurora::Async
}
} while (state->pendingWorkItems.empty());
if (group->workQueue.empty())
{
group->eventLs->Reset();
}
}
if (state->pendingWorkItems.empty())
@ -326,15 +500,17 @@ namespace Aurora::Async
runningTasks = gRunningTasks.fetch_sub(1) - 1;
}
//
// Return popped work back to the groups work pool when our -pump loops were preempted
if (state->pendingWorkItems.size())
{
AU_LOCK_GUARD(group->cvWorkMutex);
group->workQueue.insert(group->workQueue.end(), state->pendingWorkItems.begin(), state->pendingWorkItems.end());
group->eventLs->Set();
state->pendingWorkItems.clear();
}
CtxPollReturn(state, magic, true);
if (runningTasks == 0)