/*** Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved. File: WorkItem.cpp Date: 2021-6-26 Author: Reece ***/ #include #include "Async.hpp" #include "WorkItem.hpp" #include "AsyncApp.hpp" #include "Schedular.hpp" #if defined(AURORA_COMPILER_CLANG) // warning: enumeration values 'kEnumCount' not handled in switch [-Wswitch #pragma clang diagnostic ignored "-Wswitch" // Yea, I don't give a shit. #endif namespace Aurora::Async { FuncWorker::FuncWorker(IThreadPoolInternal *owner, const WorkerPId_t &worker, AuVoidFunc &&func) : WorkItem(owner, worker, {}), func(func) { } WorkItem::WorkItem(IThreadPoolInternal *owner, const WorkerPId_t &worker, const AuSPtr &task) : worker_(worker), task_(task), owner_(owner), finishedEvent_(false, true, true) { this->uShutdownCookie = owner->uAtomicShutdownCookie; if (auto pWorker = this->GetState()) { this->optOtherCookie = pWorker->uShutdownFence; } } WorkItem::~WorkItem() { //Fail(); } AuSPtr WorkItem::WaitFor(const AuSPtr &workItem) { bool status {}; { auto dependency = AuReinterpretCast(workItem); AU_LOCK_GUARD(this->lock); AU_LOCK_GUARD(dependency->lock); if (dependency->HasFailed()) { Fail(); return AU_SHARED_FROM_THIS; } if (!AuTryInsert(dependency->waiters_, AuSharedFromThis())) { Fail(); return AU_SHARED_FROM_THIS; } if (!AuTryInsert(this->waitOn_, workItem)) { AuTryRemove(dependency->waiters_, AuSharedFromThis()); Fail(); return AU_SHARED_FROM_THIS; } } return AU_SHARED_FROM_THIS; } bool WorkItem::WaitForLocked(const AuList> &workItems) { for (auto &workItem : workItems) { if (!workItem) { SysPushErrorArg(); return false; } auto dependency = AuReinterpretCast(workItem); AU_LOCK_GUARD(dependency->lock); if (dependency->HasFailed()) { return false; } if (!AuTryInsert(dependency->waiters_, AuSharedFromThis())) { return false; } if (!AuTryInsert(this->waitOn_, workItem)) { AuTryRemove(dependency->waiters_, AuSharedFromThis()); return false; } } return true; } AuSPtr WorkItem::WaitFor(const AuList> &workItems) { bool status {}; { AU_LOCK_GUARD(this->lock); status = WaitForLocked(workItems); } if (!status) { Fail(); } return AU_SHARED_FROM_THIS; } AuSPtr WorkItem::Then(const AuSPtr &next) { auto that = AU_SHARED_FROM_THIS; if (!next) { SysPushErrorArg(); return {}; } next->WaitFor(that); next->Dispatch(); return that; } AuSPtr WorkItem::SetSchedTimeNs(AuUInt64 ns) { this->dispatchTimeNs_ = Time::SteadyClockNS() + ns; return AU_SHARED_FROM_THIS; } AuSPtr WorkItem::SetSchedTimeAbs(AuUInt32 ms) { return this->SetSchedTimeNsAbs(AuMSToNS(ms)); } AuSPtr WorkItem::SetSchedTimeNsAbs(AuUInt64 ns) { auto uNow = AuTime::CurrentClockNS(); if (uNow > ns) { return AU_SHARED_FROM_THIS; } this->dispatchTimeNs_ = AuTime::SteadyClockNS() + (ns - uNow); return AU_SHARED_FROM_THIS; } AuSPtr WorkItem::SetSchedSteadyTimeNsAbs(AuUInt64 ns) { this->dispatchTimeNs_ = ns; return AU_SHARED_FROM_THIS; } AuSPtr WorkItem::SetSchedTime(AuUInt32 ms) { this->dispatchTimeNs_ = Time::SteadyClockNS() + AuMSToNS(ms); return AU_SHARED_FROM_THIS; } AuSPtr WorkItem::AddDelayTime(AuUInt32 ms) { this->delayTimeNs_ += AuMSToNS(ms); return AU_SHARED_FROM_THIS; } AuSPtr WorkItem::AddDelayTimeNs(AuUInt64 ns) { this->delayTimeNs_ += ns; return AU_SHARED_FROM_THIS; } AuSPtr WorkItem::Dispatch() { DispatchEx(false); return AU_SHARED_FROM_THIS; } void WorkItem::DispatchEx(bool check) { AU_LOCK_GUARD(this->lock); DispatchExLocked(check); } void WorkItem::DispatchExLocked(bool check) { if (check) { if (this->dispatchPending_) { return; } } if (HasFailed()) { return; } for (auto itr = waitOn_.begin(); itr != waitOn_.end(); ) { auto &waitable = *itr; if (!waitable->HasFinished()) { return; } itr = waitOn_.erase(itr); } this->dispatchPending_ = true; if (Time::SteadyClockNS() < this->dispatchTimeNs_) { if (!Schedule()) { this->Fail(); } return; } if (auto delay = AuExchange(delayTimeNs_, {})) { this->dispatchTimeNs_ = delay + Time::SteadyClockNS(); if (!Schedule()) { this->Fail(); } return; } SendOff(); } EWorkPrio WorkItem::GetPrio() { return this->prio_; } void WorkItem::SetPrio(EWorkPrio prio) { this->prio_ = prio; } void WorkItem::CancelAsync() { AU_TRY_LOCK_GUARD_NAMED(this->lock2, asd); this->Fail(); } AuOptional> WorkItem::QueryFences() { return AuPair{ this->uShutdownCookie, this->optOtherCookie.ValueOr(0) }; } bool WorkItem::CheckAlive() { if (this->owner_ && this->uShutdownCookie != this->owner_->uAtomicShutdownCookie) { this->Fail(); return false; } if (this->optOtherCookie) { if (auto pWorker = this->GetState()) { if (this->optOtherCookie.value() != pWorker->uShutdownFence) { this->Fail(); return false; } } } return true; } void WorkItem::DispatchTask(IWorkItemHandler::ProcessInfo &info) { if (!this->CheckAlive()) { return; } if (this->task_) { try { this->task_->DispatchFrame(info); } catch (...) { // TODO: runtime config for root level exception caught behaviour SysPushErrorCatch(); this->Fail(); return; } } } AuSPtr WorkItem::GetState() { if (!this->worker_.HasValue()) { return {}; } return this->owner_->GetThreadHandle(this->worker_.value()); } void WorkItem::RunAsyncLocked2() { AU_LOCK_GUARD(this->lock2); IWorkItemHandler::ProcessInfo info(true); info.pool = this->owner_->ToThreadPool(); DispatchTask(info); RunAsyncLocked2(info); } void WorkItem::RunAsync() { AU_LOCK_GUARD(this->lock2); RunAsyncLocked(); } void WorkItem::RunAsyncLocked() { IWorkItemHandler::ProcessInfo info(true); info.pool = this->owner_->ToThreadPool(); DispatchTask(info); AU_LOCK_GUARD(this->lock); RunAsyncLocked2(info); } void WorkItem::RunAsyncLocked2(const IWorkItemHandler::ProcessInfo &info) { switch (info.type) { case ETickType::eFinished: { // do nothing break; } case ETickType::eEnumInvalid: { SysPanic("Handle Invalid"); break; } case ETickType::eSchedule: { if (info.reschedMs) { SetSchedTime(info.reschedMs); } else if (info.reschedNs) { SetSchedTimeNs(info.reschedNs); } else if (info.reschedClockAbsMs) { SetSchedTimeAbs(info.reschedMs); } else if (info.reschedClockAbsNs) { SetSchedTimeNsAbs(info.reschedNs); } if (!WaitForLocked(info.waitFor)) { this->Fail(); } } [[fallthrough]]; case ETickType::eRerun: { DispatchExLocked(false); return; } case ETickType::eFailed: { this->Fail(); return; } } this->finished = true; if (this->finishedEvent_) { this->finishedEvent_->Set(); } for (auto &waiter : this->waiters_) { AuReinterpretCast(waiter)->DispatchExLocked(true); } } void WorkItem::Fail() { failed = true; if (auto task_ = AuExchange(this->task_, {})) { task_->OnFailure(); } for (auto &waiter : this->waiters_) { AuReinterpretCast(waiter)->Fail(); } this->waiters_.clear(); this->waitOn_.clear(); if (this->finishedEvent_) { this->finishedEvent_->Set(); } } bool WorkItem::BlockUntilComplete() { if (!this->finishedEvent_) return false; if (!this->worker_) { this->finishedEvent_->Wait(); return true; }; return this->owner_->WaitFor(this->worker_.value(), AuUnsafeRaiiToShared(this->finishedEvent_.AsPointer()), 0); } bool WorkItem::HasFinished() { return this->finished; } void WorkItem::Cancel() { AU_LOCK_GUARD(this->lock2); Fail(); } bool WorkItem::HasFailed() { return this->failed; } bool WorkItem::Schedule() { return Async::Schedule(this->dispatchTimeNs_, this->owner_, this->worker_.value(), AuSharedFromThis()); } void WorkItem::SendOff() { if (!this->worker_) { // If we aren't actually calling a task interface, we may as well just dispatch objects waiting on us from here RunAsyncLocked2(); } else { this->owner_->Run(this->worker_.value(), AuSharedFromThis()); } } inline auto ToInternal(const AuSPtr &pool) { return AuStaticPointerCast(pool); } void FuncWorker::DispatchTask(IWorkItemHandler::ProcessInfo &info) { auto func = AuExchange(this->func, {}); if (!this->CheckAlive()) { return; } if (func) { func(); } } AUKN_SYM AuSPtr NewWorkItem(const WorkerId_t &worker, const AuSPtr &task) { AU_DEBUG_MEMCRUNCH; if (!task) { SysPushErrorArg("WorkItem has null task. Running out of memory?"); return {}; } auto pWorker = GetCurrentWorkerPId().pool; if (!pWorker) { pWorker = AuUnsafeRaiiToShared(static_cast(gAsyncApp)); } return AuMakeShared(ToInternal(pWorker).get(), WorkerPId_t { pWorker , worker }, task); } AUKN_SYM AuSPtr NewWorkFunction(const WorkerPId_t &worker, AuVoidFunc func) { AU_DEBUG_MEMCRUNCH; if (!func) { SysPushErrorArg("WorkItem has null function"); return {}; } auto pWorker = worker.pool; if (!pWorker) { pWorker = GetCurrentWorkerPId().pool; } if (!pWorker) { pWorker = AuUnsafeRaiiToShared(static_cast(gAsyncApp)); } return AuMakeSharedThrow(ToInternal(pWorker).get(), worker, AuMove(func)); } AUKN_SYM AuSPtr NewWorkItem(const WorkerPId_t &worker, const AuSPtr &task) { AU_DEBUG_MEMCRUNCH; if (!task) { SysPushErrorArg("WorkItem has null task. Running out of memory?"); return {}; } auto pWorker = worker.pool; if (!pWorker) { pWorker = GetCurrentWorkerPId().pool; } if (!pWorker) { pWorker = AuUnsafeRaiiToShared(static_cast(gAsyncApp)); } return AuMakeSharedThrow(ToInternal(pWorker).get(), worker, task); } AUKN_SYM AuSPtr NewFence() { auto pWorker = GetCurrentWorkerPId().pool; if (!pWorker) { pWorker = AuUnsafeRaiiToShared(static_cast(gAsyncApp)); } return AuMakeShared((IThreadPoolInternal *)ToInternal(pWorker).get(), WorkerPId_t {}, AuSPtr{}); } void *WorkItem::GetPrivateData() { if (!this->task_) { return nullptr; } return this->task_->GetPrivateData(); } }