/*** Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved. File: AsyncApp.cpp Date: 2021-6-26 Author: Reece ***/ #include #include "Async.hpp" #include "AsyncApp.hpp" #include "WorkItem.hpp" namespace Aurora::Async { static AsyncApp gAsyncApp; static std::atomic_int gRunningTasks {}; void IncRunningTasks() { gRunningTasks++; } void DecRunningTasks() { if ((--gRunningTasks) == 0) { gAsyncApp.ShutdownOutOfTasks(); } } //STATIC_TLS(WorkerId_t, tlsWorkerId); static Threading::Threads::TLSVariable tlsWorkerId; struct ThreadState { WorkerId_t id; AuUInt8 multipopCount = 1; AuUInt32 lastFrameTime {}; Threading::Threads::ThreadShared_t threadObject; AuWPtr parent; Threading::Primitives::SemaphoreUnique_t syncSema; AuList> features; bool rejecting {}; bool exiting {}; Threading::Primitives::EventUnique_t running; //bool running; bool inline IsSysThread() { return id.first == 0; } AuList> pendingWorkItems; }; struct GroupState { ThreadGroup_t group; Threading::Primitives::ConditionMutexUnique_t cvWorkMutex; Threading::Primitives::ConditionVariableUnique_t cvVariable; using WorkEntry_t = std::pair, AuSPtr>; AuList workQueue; AuBST> workers; bool Init(); bool inline IsSysThread() { return group == 0; } }; bool GroupState::Init() { cvWorkMutex = Threading::Primitives::ConditionMutexUnique(); if (!cvWorkMutex) { return false; } cvVariable = Threading::Primitives::ConditionVariableUnique(cvWorkMutex.get()); if (!cvVariable) { return false; } return true; } AsyncApp::AsyncApp() { rwlock_ = Threading::Primitives::RWLockUnique(); SysAssert(static_cast(rwlock_), "Couldn't initialize AsyncApp. Unable to allocate an RWLock"); } // TODO: barrier multiple bool AsyncApp::Barrier(WorkerId_t worker, AuUInt32 ms, bool requireSignal, bool drop) { auto & semaphore = GetThreadState()->syncSema; auto unsafeSemaphore = semaphore.get(); auto work = std::make_shared(([=]() { auto state = GetThreadState(); if (drop) { state->rejecting = true; } if (requireSignal) { state->running->Reset(); } unsafeSemaphore->Unlock(1); })); #if 0 NewWorkItem({worker.first, worker.second}, work)->Dispatch(); #else Run(worker, work); #endif return WaitFor(worker, semaphore.get(), ms); } void AsyncApp::Run(DispatchTarget_t target, AuSPtr runnable) { auto state = GetGroup(target.first); SysAssert(static_cast(state), "couldn't dispatch a task to an offline group"); gRunningTasks++; { Threading::LockGuardPtr lol(state->cvWorkMutex); #if defined(INTERNAL) || defined(DEBUG) Threading::LockGuardPtr lock(rwlock_->AsReadable()); if (target.second.has_value()) { if (state->workers[*target.second]->rejecting) { SysPushErrorGen("worker: {}:{} is offline", target.first, target.second.value_or(0)); throw "Requested job worker is offline"; } } else { auto workers = state->workers; bool found = false; for (const auto &worker : state->workers) { if (!worker.second->rejecting) { found = true; break; } } if (!found) { throw "No workers available"; } } #endif state->workQueue.push_back(std::make_pair(target.second, runnable)); } if (target.second.has_value()) { // sad :( state->cvVariable->Broadcast(); } else { state->cvVariable->Signal(); } } bool AsyncApp::Poll(bool blocking) { auto state = GetThreadState(); auto group = state->parent.lock(); state->pendingWorkItems.clear(); { Threading::LockGuardPtr lol(group->cvWorkMutex); do { for (auto itr = group->workQueue.begin(); ((itr != group->workQueue.end()) && (state->pendingWorkItems.size() < state->multipopCount)); ) { if (!itr->first.has_value()) { state->pendingWorkItems.push_back((*itr).second); itr = group->workQueue.erase(itr); continue; } if ((itr->first.has_value()) && (itr->first.value() == state->id.second)) { state->pendingWorkItems.push_back((*itr).second); itr = group->workQueue.erase(itr); continue; } itr++; } if (!blocking) { break; } if (state->pendingWorkItems.empty()) { group->cvVariable->WaitForSignal(); } } while (state->pendingWorkItems.empty()); } if (state->pendingWorkItems.empty()) { return false; } int runningTasks {}; for (auto itr = state->pendingWorkItems.begin(); itr != state->pendingWorkItems.end(); ) { // TODO: we should be able to implement a watchdog later down the line state->lastFrameTime = Time::CurrentClockMS(); (*itr)->RunAsync(); itr = state->pendingWorkItems.erase(itr); runningTasks = --gRunningTasks; } if (runningTasks == 0) { ShutdownOutOfTasks(); } return true; } bool AsyncApp::WaitFor(WorkerId_t worker, Aurora::Threading::IWaitable *primitive, int timeoutMs) { auto curThread = GetThreadState(); if (worker == curThread->id) { // TODO: timeout isn't respected here as well while (!Threading::WaitFor(primitive, 2)) { while (this->Poll(false)); } return true; } else { return Threading::WaitFor(primitive, timeoutMs); } } void AsyncApp::Start() { SysAssert(Spawn({0, 0})); } void AsyncApp::Main() { Entrypoint({0, 0}); } void AsyncApp::Shutdown() { // Set shutdown trigger shuttingdown_ = true; // Unpause all for (auto &re : this->threads_) { for (auto &[id, worker] : re.second->workers) { auto &event = worker->running; if (event) { event->Set(); } } } // Drop all tasks from this point onwards for (auto &re : this->threads_) { for (auto &[id, worker] : re.second->workers) { SysAssert(Barrier(worker->id, 0, true, true)); } } // TODO: abort all pending tests // Signal the event loop to abort ShutdownOutOfTasks(); } void AsyncApp::ShutdownOutOfTasks() { for (auto& [id, group]: this->threads_) { for (auto & [id, worker] : group->workers) { worker->threadObject->SendExitSignal(); } if (group->cvVariable) { Threading::LockGuardPtr lol(group->cvWorkMutex); group->cvVariable->Broadcast(); } } } bool AsyncApp::Exiting() { return shuttingdown_ || GetThreadState()->exiting; } bool AsyncApp::Spawn(WorkerId_t workerId) { Threading::LockGuardPtr lock(rwlock_->AsWritable()); AuSPtr group; // Try fetch or allocate group { AuSPtr* groupPtr; if (!TryFind(this->threads_, workerId.first, groupPtr)) { group = std::make_shared(); if (!group->Init()) { SysPushErrorMem("Not enough memory to intiialize a new group state"); return false; } if (!TryInsert(this->threads_, std::make_pair(workerId.first, group))) { return false; } } else { group = *groupPtr; } } // Assert worker does not already exist { AuSPtr* ret; if (TryFind(group->workers, workerId.second, ret)) { SysPushErrorGen("Thread ID already exists"); return false; } } auto threadState = std::make_shared(); threadState->parent = group; threadState->running = Threading::Primitives::EventUnique(true, false, false); threadState->syncSema = Threading::Primitives::SemaphoreUnique(0); threadState->id = workerId; if (!threadState->IsSysThread()) { Threading::Threads::AbstractThreadVectors handler; handler.DoRun = [=](const Threading::Threads::IAuroraThread *thread) { Entrypoint(threadState->id); }; threadState->threadObject = Threading::Threads::ThreadUnique(handler); threadState->threadObject->Run(); } else { threadState->threadObject = std::shared_ptr(Threading::Threads::GetThread(), [](Threading::Threads::IAuroraThread *){}); } group->workers.insert(std::make_pair(workerId.second, threadState)); return true; } Threading::Threads::ThreadShared_t AsyncApp::ResolveHandle(WorkerId_t id) { auto group = GetGroup(id.first); if (!group) { return {}; } AuSPtr* ret; if (!TryFind(group->workers, id.second, ret)) { return {}; } return ret->get()->threadObject; } AuBST> AsyncApp::GetThreads() { Threading::LockGuardPtr lock(rwlock_->AsReadable()); AuBST> ret; for (const auto &group : this->threads_) { AuList workers; for (const auto &thread : group.second->workers) { workers.push_back(thread.second->id.second); } ret[group.first] = workers; } return ret; } WorkerId_t AsyncApp::GetCurrentThread() { return tlsWorkerId; } bool AsyncApp::Sync(ThreadGroup_t groupId, bool requireSignal, AuUInt32 timeoutMs) { Threading::LockGuardPtr lock(rwlock_->AsReadable()); auto group = GetGroup(groupId); for (auto &jobWorker : group->workers) { if (!Barrier(jobWorker.second->id, timeoutMs, requireSignal, false)) // BAD!, should subtract time elapsed, clamp to, i dunno, 5ms min? { return false; } } return true; } void AsyncApp::Signal(ThreadGroup_t groupId) { Threading::LockGuardPtr lock(rwlock_->AsReadable()); auto group = GetGroup(groupId); for (auto &jobWorker : group->workers) { jobWorker.second->running->Set(); } } bool AsyncApp::SyncTimeout(ThreadGroup_t group, AuUInt32 ms) { return Sync(group, false, ms); } void AsyncApp::SyncAllSafe() { Threading::LockGuardPtr lock(rwlock_->AsReadable()); for (const auto &re : this->threads_) { for (auto &jobWorker : re.second->workers) { SysAssert(Barrier(jobWorker.second->id, 0, false, false)); } } } AuSPtr AsyncApp::GetGroup(ThreadGroup_t type) { Threading::LockGuardPtr lock(rwlock_->AsReadable()); AuSPtr* ret; if (!TryFind(this->threads_, type, ret)) { return {}; } return *ret; } AuSPtr AsyncApp::GetThreadState() { auto id = GetCurrentThread(); auto state = GetGroup(id.first); return state->workers[id.second]; } void AsyncApp::Entrypoint(WorkerId_t id) { tlsWorkerId = id; auto auThread = Threading::Threads::GetThread(); auto job = GetThreadState(); while (!auThread->Exiting()) { // Do work (blocking) Poll(true); // Synchronization after pause job->running->Lock(); } for (const auto &thread : job->features) { thread->Cleanup(); } job->features.clear(); } void AsyncApp::AddFeature(WorkerId_t id, AuSPtr feature, bool async) { auto work = std::make_shared(([=]() { GetThreadState()->features.push_back(feature); feature->Init(); })); auto workItem = NewWorkItem(id, work, !async); workItem->Dispatch(); if (!async) { workItem->BlockUntilComplete(); } } void AsyncApp::AssertInThreadGroup(ThreadGroup_t group) { SysAssert(static_cast(tlsWorkerId).first == group); } void AsyncApp::AssertWorker(WorkerId_t id) { SysAssert(static_cast(tlsWorkerId) == id); } AUKN_SYM IAsyncApp *GetAsyncApp() { return &gAsyncApp; } }