/*** Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved. File: WorkItem.cpp Date: 2021-6-26 Author: Reece ***/ #include #include "Async.hpp" #include "WorkItem.hpp" #include "AsyncApp.hpp" #include "Schedular.hpp" namespace Aurora::Async { WorkItem::WorkItem(IThreadPoolInternal *owner, const WorkerId_t &worker, const AuSPtr &task, bool supportsBlocking) : worker_(worker), task_(task), owner_(owner) { if (supportsBlocking) { this->finishedEvent_ = AuThreadPrimitives::EventUnique(false, true, true); SysAssert(this->finishedEvent_); } } WorkItem::~WorkItem() { //Fail(); } AuSPtr WorkItem::WaitFor(const AuSPtr &workItem) { bool status {}; { auto dependency = std::reinterpret_pointer_cast(workItem); AU_LOCK_GUARD(this->lock); AU_LOCK_GUARD(dependency->lock); if (dependency->HasFailed()) { status = true; } dependency->waiters_.push_back(shared_from_this()); this->waitOn_.push_back(workItem); } if (status) { Fail(); } return AU_SHARED_FROM_THIS; } AuSPtr WorkItem::WaitFor(const AuList> &workItems) { bool status {}; { AU_LOCK_GUARD(this->lock); for (auto &workItem : workItems) { auto dependency = std::reinterpret_pointer_cast(workItem); AU_LOCK_GUARD(dependency->lock); if (dependency->HasFailed()) { status = true; } dependency->waiters_.push_back(shared_from_this()); this->waitOn_.push_back(workItem); } } if (status) { Fail(); } return AU_SHARED_FROM_THIS; } AuSPtr WorkItem::Then(const AuSPtr &next) { auto that = AU_SHARED_FROM_THIS; next->WaitFor(that); next->Dispatch(); return that; } AuSPtr WorkItem::SetSchedTimeNs(AuUInt64 ns) { this->dispatchTimeNs_ = Time::CurrentClockNS() + ns; return AU_SHARED_FROM_THIS; } AuSPtr WorkItem::SetSchedTimeAbs(AuUInt32 ms) { this->dispatchTimeNs_ = AuUInt64(ms) * AuUInt64(1000000); return AU_SHARED_FROM_THIS; } AuSPtr WorkItem::SetSchedTimeNsAbs(AuUInt64 ns) { this->dispatchTimeNs_ = ns; return AU_SHARED_FROM_THIS; } AuSPtr WorkItem::SetSchedTime(AuUInt32 ms) { this->dispatchTimeNs_ = Time::CurrentClockNS() + (AuUInt64(ms) * AuUInt64(1000000)); return AU_SHARED_FROM_THIS; } AuSPtr WorkItem::AddDelayTime(AuUInt32 ms) { this->delayTimeNs_ += AuUInt64(ms) * AuUInt64(1000000); return AU_SHARED_FROM_THIS; } AuSPtr WorkItem::AddDelayTimeNs(AuUInt64 ns) { this->delayTimeNs_ += ns; return AU_SHARED_FROM_THIS; } AuSPtr WorkItem::Dispatch() { DispatchEx(false); return AU_SHARED_FROM_THIS; } void WorkItem::DispatchEx(bool check) { AU_LOCK_GUARD(lock); if (check) { if (this->dispatchPending_) { return; } } if (HasFailed()) { return; } for (auto itr = waitOn_.begin(); itr != waitOn_.end(); ) { auto &waitable = *itr; if (!waitable->HasFinished()) { return; } itr = waitOn_.erase(itr); } this->dispatchPending_ = true; if (Time::CurrentClockNS() < this->dispatchTimeNs_) { Schedule(); return; } if (auto delay = std::exchange(delayTimeNs_, {})) { this->dispatchTimeNs_ = delay + Time::CurrentClockNS(); Schedule(); return; } SendOff(); } float WorkItem::GetPrio() { return prio_; } void WorkItem::SetPrio(float val) { prio_ = val; } void WorkItem::CancelAsync() { AU_TRY_LOCK_GUARD_NAMED(this->lock, asd); Fail(); } void WorkItem::RunAsync() { AU_LOCK_GUARD(this->lock); IWorkItemHandler::ProcessInfo info(true); info.pool = this->owner_->ToThreadPool(); if (this->task_) { this->task_->DispatchFrame(info); } switch (info.type) { case IWorkItemHandler::EProcessNext::eFinished: { // do nothing break; } case IWorkItemHandler::EProcessNext::eInvalid: { SysPanic("Handle Invalid"); break; } case IWorkItemHandler::EProcessNext::eSchedule: { if (info.reschedMs) { SetSchedTime(info.reschedMs); } else if (info.reschedNs) { SetSchedTimeNs(info.reschedNs); } else if (info.reschedClockAbsMs) { SetSchedTimeAbs(info.reschedMs); } else if (info.reschedClockAbsNs) { SetSchedTimeNsAbs(info.reschedNs); } WaitFor(info.waitFor); } [[fallthrough]]; case IWorkItemHandler::EProcessNext::eRerun: { DispatchEx(false); return; } case IWorkItemHandler::EProcessNext::eFailed: { Fail(); return; } } this->finished = true; if (this->finishedEvent_) { this->finishedEvent_->Set(); } for (auto &waiter : this->waiters_) { std::reinterpret_pointer_cast(waiter)->DispatchEx(true); } } void WorkItem::Fail() { failed = true; if (auto task_ = std::exchange(this->task_, {})) { task_->Shutdown(); } for (auto &waiter : this->waiters_) { std::reinterpret_pointer_cast(waiter)->Fail(); } this->waiters_.clear(); this->waitOn_.clear(); if (this->finishedEvent_) { this->finishedEvent_->Set(); } } bool WorkItem::BlockUntilComplete() { if (!this->finishedEvent_) return false; return this->owner_->WaitFor(this->worker_, AuUnsafeRaiiToShared(this->finishedEvent_), 0); } bool WorkItem::HasFinished() { return this->finished; } void WorkItem::Cancel() { AU_LOCK_GUARD(this->lock); Fail(); } bool WorkItem::HasFailed() { return this->failed; } void WorkItem::Schedule() { Async::Schedule(this->dispatchTimeNs_, this->owner_, this->worker_, this->shared_from_this()); } void WorkItem::SendOff() { if (!this->task_) { // If we aren't actually calling a task interface, we may as well just dispatch objects waiting on us from here RunAsync(); } else { this->owner_->Run(this->worker_, this->shared_from_this()); } } static auto GetWorkerInternal(const AuSPtr &pool) { return std::static_pointer_cast(pool).get(); } static auto GetWorkerInternal() { return static_cast(GetAsyncApp()); } AUKN_SYM AuSPtr NewWorkItem(const WorkerId_t &worker, const AuSPtr &task, bool supportsBlocking) { if (!task) { return {}; } return AuMakeShared(GetWorkerInternal(), worker, task, supportsBlocking); } AUKN_SYM AuSPtr NewWorkItem(const WorkerPId_t &worker, const AuSPtr &task, bool supportsBlocking) { if (!task) { return {}; } if (!worker.pool) { return {}; } return AuMakeShared(GetWorkerInternal(), worker, task, supportsBlocking); } AUKN_SYM AuSPtr NewFence() { return AuMakeShared(GetWorkerInternal(), WorkerId_t{}, AuSPtr{}, true); } void *WorkItem::GetPrivateData() { if (!this->task_) { return nullptr; } return this->task_->GetPrivateData(); } AuOptional WorkItem::ToWorkResultT() { if (!this->task_) { return nullptr; } auto priv = reinterpret_cast(this->task_->GetPrivateData()); if (!priv) { return nullptr; } if (priv->magic == AuConvertMagicTag32("BWOT")) { return reinterpret_cast(priv)->opt; } return {}; } }