AuroraRuntime/Source/Async/AsyncApp.cpp

960 lines
26 KiB
C++

/***
Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AsyncApp.cpp
Date: 2021-6-26
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "Async.hpp"
#include "AsyncApp.hpp"
#include "WorkItem.hpp"
#include "Schedular.hpp"
#include <Source/Console/Commands/Commands.hpp>
namespace Aurora::Async
{
// The single AsyncApp instance of this translation unit; GetAsyncApp() returns its address.
static AsyncApp gAsyncApp;
// Number of dispatched tasks that have not finished yet. Incremented through IncRunningTasks()
// and decremented when a task completes; when it drops to zero the app is shut down.
static std::atomic_int gRunningTasks {};
// Registers one more in-flight task in the global task counter.
void IncRunningTasks()
{
    gRunningTasks.fetch_add(1);
}
// Removes one in-flight task from the global task counter and shuts the
// async application down once the last outstanding task has been released.
void DecRunningTasks()
{
    // fetch_sub returns the value held *before* the decrement, so a previous
    // value of one means the counter has just reached zero.
    const bool wasLastTask = (gRunningTasks.fetch_sub(1) == 1);
    if (wasLastTask)
    {
        gAsyncApp.Shutdown();
    }
}
//STATIC_TLS(WorkerId_t, tlsWorkerId);
// Worker id of the calling thread; assigned at the top of AsyncApp::Entrypoint and read back
// by AsyncApp::GetCurrentThread.
static Threading::Threads::TLSVariable<WorkerId_t, true> tlsWorkerId;
// A queued unit of work: the thread id it is addressed to, paired with the runnable itself.
using WorkEntry_t = AuPair<AuOptional<ThreadId_t>, AuSPtr<IAsyncRunnable>>;
// One pending "wait on a loop source" request registered through AsyncApp::AddLoopSource.
struct AsyncAppWaitSourceRequest
{
// Invoked by PollLoopSource with the source and `true` when the source fired,
// or `false` when the request timed out.
AuConsumer<AuSPtr<Loop::ILoopSource>, bool> callback;
// The loop source being waited on.
AuSPtr<Loop::ILoopSource> loopSource;
// Timeout requested by the caller; zero means the request never times out.
AuUInt32 requestedOffset;
// Time::CurrentClockMS() at the moment the request was registered.
AuUInt64 startTime;
// startTime + requestedOffset; only meaningful when requestedOffset is non-zero.
AuUInt64 endTime;
};
// Per-worker bookkeeping owned by the worker's GroupState.
struct ThreadState
{
// {group, thread} identity of this worker.
WorkerId_t id;
// Upper bound on how many queue entries PollInternal pops per pump.
AuUInt8 multipopCount = 1;
// Timestamp written right before each task dispatch (intended for a watchdog).
AuUInt32 lastFrameTime {};
// Handle of the thread that runs this worker.
Threading::Threads::ThreadShared_t threadObject;
//std::stack<jmp_buf> jmpStack;
// Weak back-reference to the owning group.
AuWPtr<GroupState> parent;
// Semaphore a Barrier() caller waits on; released by the barrier job on the target worker.
Threading::Primitives::SemaphoreUnique_t syncSema;
// Thread features registered via AddFeature; cleaned up in ThisExiting.
AuList<AuSPtr<Threading::Threads::IThreadFeature>> features;
// Set by a barrier with `drop`; Run() refuses such workers in STAGING/DEBUG builds.
bool rejecting {};
// Reported by AsyncApp::Exiting() for the calling worker.
bool exiting {};
// True while loopSources is non-empty; switches Poll() to PollLoopSource().
bool inLoopSourceMode {};
// Set by AsyncApp::Shutdown to make the worker's pump loops bail out.
bool shuttingdown {};
// Event a worker blocks on after a barrier with requireSignal; set by Signal()/Shutdown().
Threading::Primitives::EventUnique_t running;
//bool running;
// True when this worker belongs to thread group 0.
bool inline IsSysThread()
{
return id.first == 0;
}
// Outstanding loop source waits registered for this worker.
AuList<AsyncAppWaitSourceRequest> loopSources;
// Entries popped from the group queue that this worker is about to run.
AuList<WorkEntry_t> pendingWorkItems;
};
// Shared state of one thread group: its work queue, its workers and the primitives guarding them.
struct GroupState
{
// Identifier of this group.
ThreadGroup_t group;
// Mutex guarding workQueue (and the per-worker loopSources lists).
Threading::Primitives::ConditionMutexUnique_t cvWorkMutex;
// Condition variable, bound to cvWorkMutex, that idle workers wait on for new work.
Threading::Primitives::ConditionVariableUnique_t cvVariable;
// Loop source event set when work is queued and reset once the queue is observed empty.
AuSPtr<Loop::ILSEvent> eventLs;
// Work dispatched to this group that no worker has popped yet.
AuList<WorkEntry_t> workQueue;
// Workers of this group, keyed by thread id.
AuBST<ThreadId_t, AuSPtr<ThreadState>> workers;
// Allocates the synchronisation primitives; returns false if any allocation fails.
bool Init();
// True for thread group 0.
bool inline IsSysThread()
{
return group == 0;
}
};
// Allocates the group's work mutex, the condition variable bound to that mutex
// and the loop source event, in that order.
// Returns false as soon as one of the allocations fails; later primitives are
// not created in that case.
bool GroupState::Init()
{
    this->cvWorkMutex = Threading::Primitives::ConditionMutexUnique();
    if (this->cvWorkMutex)
    {
        this->cvVariable = Threading::Primitives::ConditionVariableUnique(this->cvWorkMutex.get());
        if (this->cvVariable)
        {
            this->eventLs = Loop::NewLSEvent(false, false, true);
            return static_cast<bool>(this->eventLs);
        }
    }
    return false;
}
// Creates the reader/writer lock that guards the group/worker tables.
// Asserts (with a descriptive message) when the lock could not be allocated.
AsyncApp::AsyncApp()
{
this->rwlock_ = Threading::Primitives::RWLockUnique();
SysAssert(static_cast<bool>(this->rwlock_), "Couldn't initialize AsyncApp. Unable to allocate an RWLock");
}
// TODO: barrier multiple
// Queues a barrier job on `worker` and waits until that job has been reached.
//
//  worker        - worker that must reach the barrier
//  ms            - timeout forwarded to WaitFor()
//  requireSignal - when set, the target worker parks on its `running` event after
//                  releasing the caller and stays there until that event is set
//  drop          - when set, the target worker is flagged as rejecting
//
// Returns the result of WaitFor() on the calling thread's sync semaphore.
bool AsyncApp::Barrier(WorkerId_t worker, AuUInt32 ms, bool requireSignal, bool drop)
{
    // The semaphore belongs to the *calling* worker; the barrier job releases it
    // from the target worker through the raw pointer captured below.
    auto &callerSema = GetThreadState()->syncSema;
    auto rawCallerSema = callerSema.get();

    auto barrierJob = AuMakeShared<AsyncFuncRunnable>(([=]()
    {
        // Runs on the target worker, so this is the target's state
        auto target = GetThreadState();

        if (drop)
        {
            target->rejecting = true;
        }

        if (!requireSignal)
        {
            rawCallerSema->Unlock(1);
            return;
        }

        // Arm the event before releasing the caller, then park on it
        target->running->Reset();
        rawCallerSema->Unlock(1);
        target->running->Lock();
    }));

    this->Run(worker, barrierJob);

    return this->WaitFor(worker, AuUnsafeRaiiToShared(callerSema), ms);
}
// Queues `runnable` on the group named by `target.first`, addressed to the worker
// `target.second` (or to any worker when it equals Async::kThreadIdAny), and wakes
// the group's workers.
//
// Asserts when the group does not exist. In STAGING/DEBUG builds the target worker
// is additionally validated: a `const char *` is thrown when the requested worker is
// missing or rejecting, or when no worker of the group accepts work. The global
// running-task counter is incremented up front and rolled back before throwing.
void AsyncApp::Run(WorkerId_t target, AuSPtr<IAsyncRunnable> runnable)
{
    auto state = GetGroup(target.first);
    SysAssert(static_cast<bool>(state), "couldn't dispatch a task to an offline group");

    IncRunningTasks();

    {
        AU_LOCK_GUARD(state->cvWorkMutex);

#if defined(STAGING) || defined(DEBUG)
        AU_LOCK_GUARD(rwlock_->AsReadable());

        if (target.second != Async::kThreadIdAny)
        {
            auto itr = state->workers.find(target.second);
            if ((itr == state->workers.end()) || (itr->second->rejecting))
            {
                SysPushErrorGen("worker: {}:{} is offline", target.first, target.second);
                DecRunningTasks();
                throw "Requested job worker is offline";
            }
        }
        else
        {
            // At least one worker of the group must still accept work
            bool found = false;
            for (const auto &worker : state->workers)
            {
                if (!worker.second->rejecting)
                {
                    found = true;
                    break;
                }
            }

            if (!found)
            {
                DecRunningTasks();
                throw "No workers available";
            }
        }
#endif

        state->workQueue.push_back(AuMakePair(target.second, runnable));
        state->eventLs->Set();
    }

    // Every worker of the group waits on the same condition variable (see PollInternal),
    // and an entry addressed to a specific worker can only be dequeued by that worker.
    // Assuming Signal() wakes a single arbitrary waiter, using it for a targeted dispatch
    // could wake a worker that cannot take the entry and leave the addressed worker asleep.
    // Broadcasting is correct for both the targeted and the "any worker" case.
    state->cvVariable->Broadcast();
}
// Placeholder for saving a context before a pump; currently always yields 0.
int AsyncApp::CfxPollPush()
{
    // TODO (Reece): implement a context switching library
    // Refer to the old implementation of this on pastebin
    constexpr int kNoSavedContext = 0;
    return kNoSavedContext;
}
// Counterpart of CfxPollPush(); called at the end of a pump with the value CfxPollPush()
// returned and whether a task was hit. Currently a no-op.
void AsyncApp::CtxPollReturn(const AuSPtr<ThreadState> &state, int status, bool hitTask)
{
// TODO (Reece): implement a context switching library
// Refer to the old implementation of this on pastebin
}
// Pumps the calling worker without blocking for as long as Poll(false) reports success.
// Returns true when at least one such pump succeeded.
bool AsyncApp::CtxYield()
{
    bool didWork = false;

    for (; this->Poll(false); )
    {
        didWork = true;
    }

    return didWork;
}
// Registers a wait on `loopSource` for the worker `workerId`.
//
//  loopSource - source the worker should watch
//  workerId   - worker that will poll the source
//  timeout    - timeout in the units of Time::CurrentClockMS(); zero disables the timeout
//  callback   - invoked from PollLoopSource with (source, true) once the source fires,
//               or (source, false) once the timeout has elapsed
//
// Returns false when the worker or its group is not available, or when the request
// could not be stored.
bool AsyncApp::AddLoopSource(const AuSPtr<Loop::ILoopSource> &loopSource, WorkerId_t workerId, AuUInt32 timeout, const AuConsumer<AuSPtr<Loop::ILoopSource>, bool> &callback)
{
    auto thread = this->GetThreadHandle(workerId);
    if (!thread)
    {
        return false;
    }

    // The parent is only weakly referenced; the group may already be gone
    auto group = thread->parent.lock();
    if (!group)
    {
        return false;
    }

    {
        AU_LOCK_GUARD(group->cvWorkMutex);

        AsyncAppWaitSourceRequest req {};
        req.startTime = Time::CurrentClockMS();
        if (timeout)
        {
            req.requestedOffset = timeout;
            req.endTime = req.startTime + timeout;
        }
        req.loopSource = loopSource;
        req.callback = callback;

        if (!AuTryInsert(thread->loopSources, req))
        {
            return false;
        }

        thread->inLoopSourceMode = !thread->loopSources.empty();
    }

    return true;
}
// Pumps the calling worker once (non-blocking) or repeatedly (blocking).
//
// Depending on whether the worker has outstanding loop source waits, a pump is
// either PollLoopSource() or PollInternal(); after PollInternal() the pump also
// counts as successful when the worker is in loop source mode.
//
//  block == false: performs exactly one pump and returns whether it succeeded.
//                  Callers such as CtxYield() do their own looping.
//  block == true : keeps pumping for as long as pumps succeed and therefore
//                  always returns false.
//
// The loop condition used to be `!block || success`, which is unconditionally
// true for non-blocking polls and so never returned.
bool AsyncApp::Poll(bool block)
{
    auto state = GetThreadState();
    bool success {};

    do
    {
        if (state->inLoopSourceMode)
        {
            success = PollLoopSource(block);
        }
        else
        {
            success = PollInternal(block);
            success |= state->inLoopSourceMode;
        }
    }
    while (block && success);

    return success;
}
// Pumps the calling worker while it has outstanding loop source waits.
//
// Collects the worker's loop sources plus the group's work event, determines which of them
// are signaled (waiting on them when `block` is set, otherwise just testing each one),
// pumps the regular work queue if the group's event fired, and then fires the callback of
// every request that was either signaled or has timed out.
//
// Returns true when at least one request completed because its source was signaled.
//
// NOTE(review): everything below, including the wait, the nested PollInternal() call (which
// locks cvWorkMutex again) and the user callbacks, runs while cvWorkMutex is held. This
// presumably relies on the mutex being re-entrant and on dispatchers not needing it while
// this thread waits - confirm.
bool AsyncApp::PollLoopSource(bool block)
{
auto state = GetThreadState();
auto group = state->parent.lock();
//state->pendingWorkItems.clear();
// Return value is currently unused in this function
auto magic = CfxPollPush();
bool retValue {};
// TODO (reece): This function isn't very efficient
{
AU_LOCK_GUARD(group->cvWorkMutex);
// Work on a snapshot of the requests; the worker's list is rebuilt further down
AuList<AsyncAppWaitSourceRequest> curLoopReq = state->loopSources;
AuList<AuSPtr<Loop::ILoopSource>> curLoopSources;
auto lenLoopReqs = curLoopReq.size();
// One extra slot at the end for the group's work event
curLoopSources.resize(lenLoopReqs + 1);
for (auto i = 0; i < lenLoopReqs; i++)
{
curLoopSources[i] = curLoopReq[i].loopSource;
}
curLoopSources[lenLoopReqs] = group->eventLs;
// Sources found to be signaled during this pump
AuList<AuSPtr<Loop::ILoopSource>> nextLoopSources;
if (block)
{
// NOTE(review): the second argument is presumably a timeout where 0 means "no timeout" - confirm
nextLoopSources = Loop::WaitMultipleObjects(curLoopSources, 0);
}
else
{
// Non-blocking: only test the current state of every source
nextLoopSources.reserve(curLoopSources.size());
for (const auto &source : curLoopSources)
{
if (source->IsSignaled())
{
nextLoopSources.push_back(source);
}
}
}
auto time = Aurora::Time::CurrentClockMS();
// Rebuild the worker's request list from the requests that are still pending
state->loopSources.clear();
state->loopSources.reserve(curLoopReq.size());
// The group's event fired: regular work was queued, so pump it without blocking
if (AuExists(nextLoopSources, group->eventLs))
{
PollInternal(false);
}
for (const auto &request : curLoopReq)
{
// remove: the request is finished; removeType: true = signaled, false = timed out
bool remove {};
bool removeType {};
if (AuExists(nextLoopSources, request.loopSource))
{
remove = true;
removeType = true;
}
else
{
// Only requests registered with a non-zero timeout can expire
if (request.requestedOffset)
{
if (request.endTime < time)
{
remove = true;
removeType = false;
}
}
}
if (!remove)
{
state->loopSources.push_back(request);
}
else
{
request.callback(request.loopSource, removeType);
retValue |= removeType;
}
}
// Leave loop source mode once no request is left
state->inLoopSourceMode = state->loopSources.size();
}
return retValue;
}
bool AsyncApp::PollInternal(bool blocking)
{
auto state = GetThreadState();
auto group = state->parent.lock();
//state->pendingWorkItems.clear();
auto magic = CfxPollPush();
{
AU_LOCK_GUARD(group->cvWorkMutex);
do
{
// Deque tasks the current thread runner could dipatch
// Noting that `multipopCount` determines how aggressive threads are in dequeuing work
// It's probable `multipopCount` will equal 1 for your use case
//
// Only increment when you know tasks within a group queue will not depend on one another
// *and* tasks require a small amount of execution time
//
// This could be potentially useful for an event dispatcher whereby you're dispatching
// hundreds of items per second, across a thread or two, knowing dequeuing one instead of all
// is a waste of CPU cycles.
//
// Remember, incrementing `multipopCount` is potentially dangerous the second you have local
// thread group waits
for (auto itr = group->workQueue.begin();
((itr != group->workQueue.end()) &&
(state->pendingWorkItems.size() < state->multipopCount));
)
{
if (!itr->first.has_value())
{
state->pendingWorkItems.push_back(*itr);
itr = group->workQueue.erase(itr);
continue;
}
if ((itr->first.has_value()) && (itr->first.value() == state->id.second))
{
state->pendingWorkItems.push_back(*itr);
itr = group->workQueue.erase(itr);
continue;
}
itr++;
}
// Consider blocking for more work
if (!blocking)
{
break;
}
// Block if no work items are present
if (state->pendingWorkItems.empty())
{
group->cvVariable->WaitForSignal();
}
// Post-wakeup thread terminating check
if (state->threadObject->Exiting() || state->shuttingdown)
{
break;
}
} while (state->pendingWorkItems.empty());
if (group->workQueue.empty())
{
group->eventLs->Reset();
}
}
if (state->pendingWorkItems.empty())
{
CtxPollReturn(state, magic, false);
return false;
}
int runningTasks {};
for (auto itr = state->pendingWorkItems.begin(); itr != state->pendingWorkItems.end(); )
{
if (state->threadObject->Exiting() || state->shuttingdown)
{
break;
}
// Set the last frame time for a watchdog later down the line
state->lastFrameTime = Time::CurrentClockMS();
// Dispatch
itr->second->RunAsync();
// Remove from our local job queue
itr = state->pendingWorkItems.erase(itr);
// Atomically decrement global task counter
runningTasks = gRunningTasks.fetch_sub(1) - 1;
}
// Return popped work back to the groups work pool when our -pump loops were preempted
if (state->pendingWorkItems.size())
{
AU_LOCK_GUARD(group->cvWorkMutex);
group->workQueue.insert(group->workQueue.end(), state->pendingWorkItems.begin(), state->pendingWorkItems.end());
group->eventLs->Set();
state->pendingWorkItems.clear();
}
CtxPollReturn(state, magic, true);
if (runningTasks == 0)
{
ShutdownZero();
}
return true;
}
// Waits for `primitive`, which is expected to be released by work running on `unlocker`.
//
// When the calling worker is itself the one that has to release the primitive (same
// group, and either the same thread id or the only worker of a group addressed with
// Async::kThreadIdAny), a plain wait would never be satisfied. In that case the caller
// keeps pumping its own queue between short waits until the primitive is acquired and
// then returns true; `timeoutMs` is not applied on this path.
//
// Otherwise this is a regular wait with `timeoutMs`.
bool AsyncApp::WaitFor(WorkerId_t unlocker, const AuSPtr<Threading::IWaitable> &primitive, AuUInt32 timeoutMs)
{
    const auto self = GetThreadState();

    const bool sameWorker = (unlocker.second == self->id.second);
    const bool soleWorkerOfGroup = (!sameWorker) &&
                                   (unlocker.second == Async::kThreadIdAny) &&
                                   (GetThreadWorkersCount(unlocker.first) == 1);
    const bool waitingOnSelf = (unlocker.first == self->id.first) && (sameWorker || soleWorkerOfGroup);

    if (!waitingOnSelf)
    {
        return Threading::WaitFor(primitive.get(), timeoutMs);
    }

    // well, crap - we are the worker that is supposed to release the primitive
    bool ranWork = false;
    for (;;)
    {
        // After a productive yield just try to grab the primitive; otherwise wait briefly
        const bool acquired = ranWork ? primitive->TryLock() : Threading::WaitFor(primitive.get(), 2);
        if (acquired)
        {
            return true;
        }

        ranWork = CtxYield();
    }
}
// Registers the system worker {0, 0} (asserting that Spawn succeeded) and then calls StartSched().
void AsyncApp::Start()
{
SysAssert(Spawn({0, 0}));
StartSched();
}
void AsyncApp::Main()
{
Entrypoint({0, 0});
}
void AsyncApp::ShutdownZero()
{
Shutdown();
}
// Shuts the async application down. Only the first call does the work; nested or
// repeated calls return early.
//
// Sequence: mark the app as shutting down, stop the scheduler and barrier every worker
// (flagging each as rejecting), flag every worker context as shutting down and wake the
// workers up, and finally call Exit() on every worker thread outside of group 0.
void AsyncApp::Shutdown()
{
// Nested shutdowns can happen; bail out under a read lock before taking the write lock
{
AU_LOCK_GUARD(rwlock_->AsReadable());
if (shuttingdown_)
{
return;
}
}
// Set shutdown flag
{
AU_LOCK_GUARD(rwlock_->AsWritable());
// Re-check under the write lock: another caller may have won the race since the read above
if (std::exchange(shuttingdown_, true))
{
return;
}
}
// Noting
// 1) that StopSched may lockup under a writable lock
// -> we will terminate a thread that may be dispatching a sys pump event
// 2) that barrier doesn't need to be under a write lock
//
// Perform the following shutdown of the scheduler and other available threads under a read lock
{
AU_LOCK_GUARD(rwlock_->AsReadable());
StopSched();
for (auto &[groupId, group] : this->threads_)
{
for (auto &[id, worker] : group->workers)
{
// drop = true: the worker is flagged as rejecting once it reaches the barrier
Barrier(worker->id, 0, false, true);
}
}
}
// Finally set the shutdown flag on all of our thread contexts
// then release them from the runners/workers list
// then release all group contexts
AuList<Threading::Threads::ThreadShared_t> threads;
{
AU_LOCK_GUARD(rwlock_->AsWritable());
for (auto &[groupId, group] : this->threads_)
{
for (auto &[id, worker] : group->workers)
{
worker->shuttingdown = true;
// Workers of group 0 are not sent an exit signal and are not collected for Exit() below
if (groupId != 0)
{
worker->threadObject->SendExitSignal();
threads.push_back(worker->threadObject);
}
// Release workers that are parked on their `running` event after a barrier
auto &event = worker->running;
if (event)
{
event->Set();
}
}
// Wake workers blocked waiting for work so they observe the shutdown flag
if (group->cvVariable)
{
AU_LOCK_GUARD(group->cvWorkMutex);
group->cvVariable->Broadcast();
}
}
}
// Sync to shutdown threads to prevent a race condition whereby the async subsystem shuts down before the threads
for (const auto &thread : threads)
{
thread->Exit();
}
}
// Returns true when the whole application is shutting down or when the
// calling worker has been flagged as exiting.
bool AsyncApp::Exiting()
{
    if (shuttingdown_)
    {
        return true;
    }

    return GetThreadState()->exiting;
}
// Registers a new worker `workerId`, creating its group on demand.
//
// Workers of group 0 are bound to the calling thread; for every other group a new
// thread is created and started with AsyncApp::Entrypoint as its body.
//
// Returns false when the group or the worker's primitives could not be allocated,
// when the worker id is already taken, or when the thread object could not be created.
bool AsyncApp::Spawn(WorkerId_t workerId)
{
    AU_LOCK_GUARD(rwlock_->AsWritable());

    AuSPtr<GroupState> group;

    // Try fetch or allocate group
    {
        AuSPtr<GroupState>* groupPtr;
        if (!AuTryFind(this->threads_, workerId.first, groupPtr))
        {
            group = AuMakeShared<GroupState>();

            if (!group->Init())
            {
                SysPushErrorMem("Not enough memory to intiialize a new group state");
                return false;
            }

            if (!AuTryInsert(this->threads_, AuMakePair(workerId.first, group)))
            {
                return false;
            }
        }
        else
        {
            group = *groupPtr;
        }
    }

    // Assert worker does not already exist
    {
        AuSPtr<ThreadState>* ret;
        if (AuTryFind(group->workers, workerId.second, ret))
        {
            SysPushErrorGen("Thread ID already exists");
            return false;
        }
    }

    auto threadState = AuMakeShared<ThreadState>();
    threadState->parent = group;
    threadState->running = Threading::Primitives::EventUnique(true, false, true);
    threadState->syncSema = Threading::Primitives::SemaphoreUnique(0);
    threadState->id = workerId;

    // Barrier() dereferences both primitives; refuse to register a worker without them
    if ((!threadState->running) || (!threadState->syncSema))
    {
        SysPushErrorMem("Not enough memory to initialize a new worker state");
        return false;
    }

    if (!threadState->IsSysThread())
    {
        Threading::Threads::AbstractThreadVectors handler;

        // Capture the id by value instead of the thread state: the state owns the thread
        // object, which presumably keeps the handler alive, so capturing the state would
        // form an ownership cycle between the two.
        handler.DoRun = [this, workerId](const Threading::Threads::IAuroraThread *thread)
        {
            Entrypoint(workerId);
        };

        threadState->threadObject = Threading::Threads::ThreadUnique(handler);
        if (!threadState->threadObject)
        {
            SysPushErrorMem("Not enough memory to create a new worker thread");
            return false;
        }

        // The new thread's Entrypoint blocks on the read lock until this function
        // returns, at which point the worker is registered below.
        threadState->threadObject->Run();
    }
    else
    {
        // Wrap the calling thread without taking ownership of it
        threadState->threadObject = AuSPtr<Threading::Threads::IAuroraThread>(Threading::Threads::GetThread(), [](Threading::Threads::IAuroraThread *){});
    }

    group->workers.insert(AuMakePair(workerId.second, threadState));
    return true;
}
// Looks up the state of worker `id`.
// Returns an empty pointer when either the group or the worker is unknown.
AuSPtr<ThreadState> AsyncApp::GetThreadHandle(WorkerId_t id)
{
    AU_LOCK_GUARD(rwlock_->AsReadable());

    AuSPtr<ThreadState> handle;

    if (auto group = GetGroup(id.first))
    {
        AuSPtr<ThreadState>* slot;
        if (AuTryFind(group->workers, id.second, slot))
        {
            handle = *slot;
        }
    }

    return handle;
}
// Returns the thread object backing worker `id`, or an empty handle when the
// worker (or its group) is not registered.
Threading::Threads::ThreadShared_t AsyncApp::ResolveHandle(WorkerId_t id)
{
    auto handle = GetThreadHandle(id);
    if (!handle)
    {
        // GetThreadHandle yields an empty pointer for unknown workers
        return {};
    }

    return handle->threadObject;
}
// Builds a snapshot mapping every registered group id to the thread ids of its workers.
// Groups without workers are present with an empty list.
AuBST<ThreadGroup_t, AuList<ThreadId_t>> AsyncApp::GetThreads()
{
    AU_LOCK_GUARD(rwlock_->AsReadable());

    AuBST<ThreadGroup_t, AuList<ThreadId_t>> snapshot;

    for (const auto &[groupId, groupState] : this->threads_)
    {
        // Creates the (initially empty) entry for this group
        auto &threadIds = snapshot[groupId];

        for (const auto &[threadId, worker] : groupState->workers)
        {
            threadIds.push_back(worker->id.second);
        }
    }

    return snapshot;
}
// Returns the worker id stored in the calling thread's thread-local slot.
WorkerId_t AsyncApp::GetCurrentThread()
{
    return static_cast<WorkerId_t>(tlsWorkerId);
}
// Barriers one worker, or every worker of a group when `groupId.second` equals
// Async::kThreadIdAny.
//
//  groupId       - worker (or whole group) to synchronise with
//  timeoutMs     - timeout handed to every individual Barrier() call
//  requireSignal - keep the target workers parked until Signal() is called; never
//                  applied to a worker whose thread id matches the calling thread's
//
// Returns false when the group is not registered or when any barrier fails.
bool AsyncApp::Sync(WorkerId_t groupId, AuUInt32 timeoutMs, bool requireSignal)
{
    AU_LOCK_GUARD(rwlock_->AsReadable());

    auto group = GetGroup(groupId.first);
    if (!group)
    {
        SysPushErrorGen("couldn't sync to an offline group");
        return false;
    }

    // NOTE(review): only the thread id is compared here, not the group id, so a worker of a
    // different group with the same thread id is also exempt from requireSignal - confirm intent.
    auto currentWorkerId = GetCurrentThread().second;

    if (groupId.second == Async::kThreadIdAny)
    {
        for (auto &jobWorker : group->workers)
        {
            const auto &workerId = jobWorker.second->id;

            // BAD!, should subtract time elapsed, clamp to, i dunno, 5ms min?
            if (!Barrier(workerId, timeoutMs, requireSignal && (workerId.second != currentWorkerId), false))
            {
                return false;
            }
        }

        return true;
    }

    return Barrier(groupId, timeoutMs, requireSignal && (groupId.second != currentWorkerId), false);
}
// Sets the `running` event of one worker, or of every worker of a group when
// `groupId.second` equals Async::kThreadIdAny, releasing workers that were parked by
// a barrier with requireSignal. Unknown groups and workers are ignored.
void AsyncApp::Signal(WorkerId_t groupId)
{
    AU_LOCK_GUARD(rwlock_->AsReadable());

    auto group = GetGroup(groupId.first);
    if (!group)
    {
        // Nothing to release in a group that is not registered
        return;
    }

    if (groupId.second == Async::kThreadIdAny)
    {
        for (auto &jobWorker : group->workers)
        {
            jobWorker.second->running->Set();
        }
        return;
    }

    // GetThreadHandle yields an empty pointer when the worker is not registered
    if (auto handle = GetThreadHandle(groupId))
    {
        handle->running->Set();
    }
}
// Barriers every worker of every group, one after another, asserting that each
// barrier succeeds. Workers are neither parked nor flagged as rejecting.
void AsyncApp::SyncAllSafe()
{
    AU_LOCK_GUARD(rwlock_->AsReadable());

    for (const auto &[groupId, groupState] : this->threads_)
    {
        for (auto &jobWorker : groupState->workers)
        {
            SysAssert(Barrier(jobWorker.second->id, 0, false, false));
        }
    }
}
// Looks up the state of thread group `type`; returns an empty pointer when the
// group is not registered.
AuSPtr<GroupState> AsyncApp::GetGroup(ThreadGroup_t type)
{
    AU_LOCK_GUARD(rwlock_->AsReadable());

    AuSPtr<GroupState>* slot;
    if (AuTryFind(this->threads_, type, slot))
    {
        return *slot;
    }

    return {};
}
// Returns the number of workers registered in `group`, or zero when the group is
// not registered.
size_t AsyncApp::GetThreadWorkersCount(ThreadGroup_t group)
{
    AU_LOCK_GUARD(rwlock_->AsReadable());

    auto state = GetGroup(group);
    if (!state)
    {
        // GetGroup yields an empty pointer for unknown groups
        return 0;
    }

    return state->workers.size();
}
// Returns the state of the calling worker, or an empty pointer when the calling
// thread is not a registered worker.
AuSPtr<ThreadState> AsyncApp::GetThreadState()
{
    AU_LOCK_GUARD(rwlock_->AsReadable());

    auto id = GetCurrentThread();

    auto state = GetGroup(id.first);
    if (!state)
    {
        return {};
    }

    // Look the worker up instead of indexing the map: indexing would insert an empty
    // entry for an unknown id, mutating the table while only a read lock is held.
    auto itr = state->workers.find(id.second);
    if (itr == state->workers.end())
    {
        return {};
    }

    return itr->second;
}
// Body of every worker thread (and of the system worker via Main()).
//
// Publishes `id` as the calling thread's worker id, pumps work until the thread or the
// worker is told to stop, then tears the worker down. Workers other than {0, 0} first
// barrier themselves with `drop` set unless a shutdown is already in progress; the
// system worker {0, 0} triggers the application shutdown on its way out.
void AsyncApp::Entrypoint(WorkerId_t id)
{
    tlsWorkerId = id;

    const bool isSysWorker = (id == WorkerId_t {0, 0});

    auto auThread = Threading::Threads::GetThread();
    auto job = GetThreadState();

    for (;;)
    {
        if (auThread->Exiting() || job->shuttingdown)
        {
            break;
        }

        // Do work (blocking)
        Poll(true);
    }

    if (!isSysWorker)
    {
        AU_LOCK_GUARD(rwlock_->AsReadable());

        const bool alreadyDraining = shuttingdown_ || job->rejecting;
        if (!alreadyDraining)
        {
            // Pump and barrier + reject all after atomically
            Barrier(id, 0, false, true);
        }
    }

    ThisExiting();

    if (isSysWorker)
    {
        Shutdown();
    }
}
// Remembers `id` as the worker that dispatches console commands and forwards the
// new dispatcher to the console command subsystem.
void AsyncApp::SetConsoleCommandDispatcher(WorkerId_t id)
{
    this->commandDispatcher_ = id;
    Console::Commands::UpdateDispatcher(this->commandDispatcher_);
}
void AsyncApp::ThisExiting()
{
auto id = GetCurrentThread();
auto state = GetGroup(id.first);
{
AU_LOCK_GUARD(rwlock_->AsWritable());
auto itr = state->workers.find(id.second);
auto &jobWorker = itr->second;
// This shouldn't be a problem; however, we're going to handle the one edge case where
// some angry sysadmin is spamming commands
if ((commandDispatcher_.has_value())
&& (commandDispatcher_.value() == id))
{
Console::Commands::UpdateDispatcher({});
}
// Abort scheduled tasks
TerminateSceduledTasks(id);
// Clean up thread features
// -> transferable TLS handles
// -> thread specific vms
// -> anything your brain wishes to imagination
for (const auto &thread : jobWorker->features)
{
try
{
thread->Cleanup();
}
catch (...)
{
LogWarn("Couldn't clean up thread feature!");
Debug::PrintError();
}
}
jobWorker->features.clear();
state->workers.erase(itr);
}
}
// Attaches `feature` to worker `id`. The registration and the feature's Init() run on
// the target worker as a work item.
//
//  async - when false, the call blocks until that work item has completed
void AsyncApp::AddFeature(WorkerId_t id, AuSPtr<Threading::Threads::IThreadFeature> feature, bool async)
{
    const bool blocking = !async;

    auto attachJob = AuMakeShared<BasicWorkStdFunc>(([=]()
    {
        // Executed on the target worker, so this is the target's feature list
        GetThreadState()->features.push_back(feature);
        feature->Init();
    }));

    auto dispatched = NewWorkItem(id, attachJob, blocking)->Dispatch();

    if (blocking)
    {
        dispatched->BlockUntilComplete();
    }
}
// Asserts that the calling thread is a worker of thread group `group`.
void AsyncApp::AssertInThreadGroup(ThreadGroup_t group)
{
SysAssert(static_cast<WorkerId_t>(tlsWorkerId).first == group);
}
// Asserts that the calling thread is exactly the worker `id`.
void AsyncApp::AssertWorker(WorkerId_t id)
{
SysAssert(static_cast<WorkerId_t>(tlsWorkerId) == id);
}
// Exported accessor for the async application instance of this translation unit.
AUKN_SYM IAsyncApp *GetAsyncApp()
{
    IAsyncApp *const app = &gAsyncApp;
    return app;
}
}