/*** Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved. File: AsyncApp.cpp Date: 2021-6-26 Author: Reece ***/ #include #include "Async.hpp" #include "AsyncApp.hpp" #include "WorkItem.hpp" #include "Schedular.hpp" #include namespace Aurora::Async { static AsyncApp gAsyncApp; static std::atomic_int gRunningTasks {}; void IncRunningTasks() { gRunningTasks++; } void DecRunningTasks() { if ((--gRunningTasks) == 0) { gAsyncApp.Shutdown(); } } //STATIC_TLS(WorkerId_t, tlsWorkerId); static Threading::Threads::TLSVariable tlsWorkerId; using WorkEntry_t = AuPair, AuSPtr>; struct ThreadState { WorkerId_t id; AuUInt8 multipopCount = 1; AuUInt32 lastFrameTime {}; Threading::Threads::ThreadShared_t threadObject; AuWPtr parent; Threading::Primitives::SemaphoreUnique_t syncSema; AuList> features; bool rejecting {}; bool exiting {}; bool shuttingdown {}; Threading::Primitives::EventUnique_t running; //bool running; bool inline IsSysThread() { return id.first == 0; } AuList pendingWorkItems; }; struct GroupState { ThreadGroup_t group; Threading::Primitives::ConditionMutexUnique_t cvWorkMutex; Threading::Primitives::ConditionVariableUnique_t cvVariable; AuList workQueue; AuBST> workers; bool Init(); bool inline IsSysThread() { return group == 0; } }; bool GroupState::Init() { cvWorkMutex = Threading::Primitives::ConditionMutexUnique(); if (!cvWorkMutex) { return false; } cvVariable = Threading::Primitives::ConditionVariableUnique(cvWorkMutex.get()); if (!cvVariable) { return false; } return true; } AsyncApp::AsyncApp() { rwlock_ = Threading::Primitives::RWLockUnique(); SysAssert(static_cast(rwlock_), "Couldn't initialize AsyncApp. Unable to allocate an RWLock"); } // TODO: barrier multiple bool AsyncApp::Barrier(WorkerId_t worker, AuUInt32 ms, bool requireSignal, bool drop) { auto & semaphore = GetThreadState()->syncSema; auto unsafeSemaphore = semaphore.get(); auto work = AuMakeShared(([=]() { auto state = GetThreadState(); if (drop) { state->rejecting = true; } if (requireSignal) { state->running->Reset(); } unsafeSemaphore->Unlock(1); if (requireSignal) { state->running->Lock(); } })); #if 0 NewWorkItem({worker.first, worker.second}, work)->Dispatch(); #else Run(worker, work); #endif return WaitFor(worker, semaphore.get(), ms); } void AsyncApp::Run(DispatchTarget_t target, AuSPtr runnable) { auto state = GetGroup(target.first); SysAssert(static_cast(state), "couldn't dispatch a task to an offline group"); IncRunningTasks(); { AU_LOCK_GUARD(state->cvWorkMutex); #if defined(STAGING) || defined(DEBUG) AU_LOCK_GUARD(rwlock_->AsReadable()); if (target.second.has_value()) { auto itr = state->workers.find(*target.second); if ((itr == state->workers.end()) || (itr->second->rejecting)) { SysPushErrorGen("worker: {}:{} is offline", target.first, target.second.value_or(0)); DecRunningTasks(); throw "Requested job worker is offline"; } } else { auto workers = state->workers; bool found = false; for (const auto &worker : state->workers) { if (!worker.second->rejecting) { found = true; break; } } if (!found) { DecRunningTasks(); throw "No workers available"; } } #endif state->workQueue.push_back(AuMakePair(target.second, runnable)); } if (target.second.has_value()) { // sad :( state->cvVariable->Broadcast(); } else { state->cvVariable->Signal(); } } bool AsyncApp::Poll(bool blocking) { auto state = GetThreadState(); auto group = state->parent.lock(); //state->pendingWorkItems.clear(); { AU_LOCK_GUARD(group->cvWorkMutex); do { // Deque tasks the current thread runner could dipatch // Noting that `multipopCount` determines how aggressive threads are in dequeuing work // It's probable `multipopCount` will equal 1 for your use case // // Only increment when you know tasks within a group queue will not depend on one another // *and* tasks require a small amount of execution time // // This could be potentially useful for an event dispatcher whereby you're dispatching // hundreds of items per second, across a thread or two, knowing dequeuing one instead of all // is a waste of CPU cycles. // // Remember, incrementing `multipopCount` is potentially dangerous the second you have local // thread group waits for (auto itr = group->workQueue.begin(); ((itr != group->workQueue.end()) && (state->pendingWorkItems.size() < state->multipopCount)); ) { if (!itr->first.has_value()) { state->pendingWorkItems.push_back(*itr); itr = group->workQueue.erase(itr); continue; } if ((itr->first.has_value()) && (itr->first.value() == state->id.second)) { state->pendingWorkItems.push_back(*itr); itr = group->workQueue.erase(itr); continue; } itr++; } // Consider blocking for more work if (!blocking) { break; } // Block if no work items are present if (state->pendingWorkItems.empty()) { group->cvVariable->WaitForSignal(); } // Post-wakeup thread terminating check if (state->threadObject->Exiting() || state->shuttingdown) { break; } } while (state->pendingWorkItems.empty()); } if (state->pendingWorkItems.empty()) { return false; } int runningTasks {}; for (auto itr = state->pendingWorkItems.begin(); itr != state->pendingWorkItems.end(); ) { if (state->threadObject->Exiting() || state->shuttingdown) { break; } // Set the last frame time for a watchdog later down the line state->lastFrameTime = Time::CurrentClockMS(); // Dispatch itr->second->RunAsync(); // Remove from our local job queue itr = state->pendingWorkItems.erase(itr); // Atomically decrement global task counter runningTasks = gRunningTasks.fetch_sub(1) - 1; } // Return popped work back to the groups work pool when our -pump loops were preempted if (state->pendingWorkItems.size()) { AU_LOCK_GUARD(group->cvWorkMutex); group->workQueue.insert(group->workQueue.end(), state->pendingWorkItems.begin(), state->pendingWorkItems.end()); state->pendingWorkItems.clear(); } if (runningTasks == 0) { ShutdownZero(); } return true; } bool AsyncApp::WaitFor(WorkerId_t worker, Threading::IWaitable *primitive, AuUInt32 timeoutMs) { auto curThread = GetThreadState(); if (worker == curThread->id) { // TODO: nest counter or jump out while (!Threading::WaitFor(primitive, 2)) { while (this->Poll(false)); } return true; } else { return Threading::WaitFor(primitive, timeoutMs); } } bool AsyncApp::WaitFor(DispatchTarget_t unlocker, Threading::IWaitable *primitive, AuUInt32 timeoutMs) { auto curThread = GetThreadState(); bool workerIdMatches = (!unlocker.second.has_value()) && (unlocker.second.value() == curThread->id.second); if ((unlocker.first == curThread->id.first) && // work group matches ((GetThreadWorkersCount(unlocker.first) < 2) || // is there anyone besides us who might deal with this? unlikely fast path (workerIdMatches))) // well, crap { if ((workerIdMatches) && (unlocker.first != 0)) // UI code is always hacky. dont judge people for nesting tasks within tasks. // if theres a stack overflow problem, the native dev responsable for the sysloop and ui would already know about it { LogWarn("Nested Task: {}:{}. This is not an error, it's just bad practice.", unlocker.first, unlocker.second.value_or(0)); SysPushErrorLogicError("[telemetry] Nested Task: {}:{}", unlocker.first, unlocker.second.value_or(0)); } // TODO: timeout isn't respected here as well while (!Threading::WaitFor(primitive, 2)) { while (this->Poll(false)); } return true; } else { return Threading::WaitFor(primitive, timeoutMs); } } void AsyncApp::Start() { SysAssert(Spawn({0, 1})); StartSched(); } void AsyncApp::Main() { Entrypoint({0, 0}); } void AsyncApp::ShutdownZero() { Shutdown(); } void AsyncApp::Shutdown() { // Nested shutdowns can happen a write lock { AU_LOCK_GUARD(rwlock_->AsReadable()); if (shuttingdown_) { return; } } // Set shutdown flag { AU_LOCK_GUARD(rwlock_->AsWritable()); if (std::exchange(shuttingdown_, true)) { return; } } // Noting // 1) that StopSched may lockup under a writable lock // -> we will terminate a thread that may be dispatching a sys pump event // 2) that barrier doesn't need to be under a write lock // // Perform the following shutdown of the schedular and other available threads under a read lock { AU_LOCK_GUARD(rwlock_->AsReadable()); StopSched(); for (auto &[groupId, group] : this->threads_) { for (auto &[id, worker] : group->workers) { Barrier(worker->id, 0, false, true); } } } // Finally set the shutdown flag on all of our thread contexts // then release them from the runners/workers list // then release all group contexts AuList threads; { AU_LOCK_GUARD(rwlock_->AsWritable()); for (auto &[groupId, group] : this->threads_) { for (auto &[id, worker] : group->workers) { worker->shuttingdown = true; if (groupId != 0) { worker->threadObject->SendExitSignal(); threads.push_back(worker->threadObject); } auto &event = worker->running; if (event) { event->Set(); } } if (group->cvVariable) { AU_LOCK_GUARD(group->cvWorkMutex); group->cvVariable->Broadcast(); } } } // Sync to shutdown threads to prevent a race condition whereby the async subsystem shuts down before the threads for (const auto &thread : threads) { thread->Exit(); } } bool AsyncApp::Exiting() { return shuttingdown_ || GetThreadState()->exiting; } bool AsyncApp::Spawn(WorkerId_t workerId) { AU_LOCK_GUARD(rwlock_->AsWritable()); if (workerId.second == 0) { LogWarn("WorkerIds must not start from zero to preserve std::optiona nullability"); return false; } AuSPtr group; // Try fetch or allocate group { AuSPtr* groupPtr; if (!AuTryFind(this->threads_, workerId.first, groupPtr)) { group = AuMakeShared(); if (!group->Init()) { SysPushErrorMem("Not enough memory to intiialize a new group state"); return false; } if (!AuTryInsert(this->threads_, AuMakePair(workerId.first, group))) { return false; } } else { group = *groupPtr; } } // Assert worker does not already exist { AuSPtr* ret; if (AuTryFind(group->workers, workerId.second, ret)) { SysPushErrorGen("Thread ID already exists"); return false; } } auto threadState = AuMakeShared(); threadState->parent = group; threadState->running = Threading::Primitives::EventUnique(true, false, true); threadState->syncSema = Threading::Primitives::SemaphoreUnique(0); threadState->id = workerId; if (!threadState->IsSysThread()) { Threading::Threads::AbstractThreadVectors handler; handler.DoRun = [=](const Threading::Threads::IAuroraThread *thread) { Entrypoint(threadState->id); }; threadState->threadObject = Threading::Threads::ThreadUnique(handler); threadState->threadObject->Run(); } else { threadState->threadObject = AuSPtr(Threading::Threads::GetThread(), [](Threading::Threads::IAuroraThread *){}); } group->workers.insert(AuMakePair(workerId.second, threadState)); return true; } Threading::Threads::ThreadShared_t AsyncApp::ResolveHandle(WorkerId_t id) { auto group = GetGroup(id.first); if (!group) { return {}; } AuSPtr* ret; if (!AuTryFind(group->workers, id.second, ret)) { return {}; } return ret->get()->threadObject; } AuBST> AsyncApp::GetThreads() { AU_LOCK_GUARD(rwlock_->AsReadable()); AuBST> ret; for (const auto &group : this->threads_) { AuList workers; for (const auto &thread : group.second->workers) { workers.push_back(thread.second->id.second); } ret[group.first] = workers; } return ret; } WorkerId_t AsyncApp::GetCurrentThread() { return tlsWorkerId; } bool AsyncApp::Sync(ThreadGroup_t groupId, bool requireSignal, AuUInt32 timeoutMs) { AU_LOCK_GUARD(rwlock_->AsReadable()); auto group = GetGroup(groupId); for (auto &jobWorker : group->workers) { if (!Barrier(jobWorker.second->id, timeoutMs, requireSignal, false)) // BAD!, should subtract time elapsed, clamp to, i dunno, 5ms min? { return false; } } return true; } void AsyncApp::Signal(ThreadGroup_t groupId) { AU_LOCK_GUARD(rwlock_->AsReadable()); auto group = GetGroup(groupId); for (auto &jobWorker : group->workers) { jobWorker.second->running->Set(); } } bool AsyncApp::SyncTimeout(ThreadGroup_t group, AuUInt32 ms) { return Sync(group, false, ms); } void AsyncApp::SyncAllSafe() { AU_LOCK_GUARD(rwlock_->AsReadable()); for (const auto &re : this->threads_) { for (auto &jobWorker : re.second->workers) { SysAssert(Barrier(jobWorker.second->id, 0, false, false)); } } } AuSPtr AsyncApp::GetGroup(ThreadGroup_t type) { AU_LOCK_GUARD(rwlock_->AsReadable()); AuSPtr* ret; if (!AuTryFind(this->threads_, type, ret)) { return {}; } return *ret; } size_t AsyncApp::GetThreadWorkersCount(ThreadGroup_t group) { AU_LOCK_GUARD(rwlock_->AsReadable()); return GetGroup(group)->workers.size(); } AuSPtr AsyncApp::GetThreadState() { AU_LOCK_GUARD(rwlock_->AsReadable()); auto id = GetCurrentThread(); auto state = GetGroup(id.first); return state->workers[id.second]; } void AsyncApp::Entrypoint(WorkerId_t id) { tlsWorkerId = id; auto auThread = Threading::Threads::GetThread(); auto job = GetThreadState(); while ((!auThread->Exiting()) && (!job->shuttingdown)) { // Do work (blocking) Poll(true); } if (id != WorkerId_t {0, 0}) { AU_LOCK_GUARD(rwlock_->AsReadable()); if (!shuttingdown_ && !job->rejecting) { // Pump and barrier + reject all after atomically Barrier(id, 0, false, true); } } ThisExiting(); if (id == WorkerId_t {0, 0}) { Shutdown(); } } void AsyncApp::SetConsoleCommandDispatcher(WorkerId_t id) { commandDispatcher_ = id; Console::Commands::UpdateDispatcher(commandDispatcher_); } void AsyncApp::ThisExiting() { auto id = GetCurrentThread(); auto state = GetGroup(id.first); { AU_LOCK_GUARD(rwlock_->AsWritable()); auto itr = state->workers.find(id.second); auto &jobWorker = itr->second; // This shouldn't be a problem; however, we're going to handle the one edge case where // some angry sysadmin is spamming commands if ((commandDispatcher_.has_value()) && (commandDispatcher_.value() == id)) { Console::Commands::UpdateDispatcher({}); } // Abort scheduled tasks TerminateSceduledTasks(id); // Clean up thread features // -> transferable TLS handles // -> thread specific vms // -> anything your brain wishes to imagination for (const auto &thread : jobWorker->features) { try { thread->Cleanup(); } catch (...) { LogWarn("Couldn't clean up thread feature!"); Debug::PrintError(); } } jobWorker->features.clear(); state->workers.erase(itr); } } void AsyncApp::AddFeature(WorkerId_t id, AuSPtr feature, bool async) { auto work = AuMakeShared(([=]() { GetThreadState()->features.push_back(feature); feature->Init(); })); auto workItem = NewWorkItem(id, work, !async)->Dispatch(); if (!async) { workItem->BlockUntilComplete(); } } void AsyncApp::AssertInThreadGroup(ThreadGroup_t group) { SysAssert(static_cast(tlsWorkerId).first == group); } void AsyncApp::AssertWorker(WorkerId_t id) { SysAssert(static_cast(tlsWorkerId) == id); } AUKN_SYM IAsyncApp *GetAsyncApp() { return &gAsyncApp; } }