/*** Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved. File: WorkItem.cpp Date: 2021-6-26 Author: Reece ***/ #include #include "Async.hpp" #include "WorkItem.hpp" #include "AsyncApp.hpp" #include "Schedular.hpp" namespace Aurora::Async { WorkItem::WorkItem(const DispatchTarget_t &worker, const AuSPtr &task, bool supportsBlocking) : worker_(worker), task_(task) { if (supportsBlocking) { finishedEvent_ = Threading::Primitives::EventUnique(false, true, true); SysAssert(finishedEvent_ ? true : false); } } WorkItem::~WorkItem() { //Fail(); } AuSPtr WorkItem::WaitFor(const AuSPtr &workItem) { bool status {}; { auto dependency = std::reinterpret_pointer_cast(workItem); AU_LOCK_GUARD(lock); AU_LOCK_GUARD(dependency->lock); if (dependency->HasFailed()) { status = true; } dependency->waiters_.push_back(shared_from_this()); waitOn_.push_back(workItem); } if (status) { Fail(); } return AU_SHARED_FROM_THIS; } AuSPtr WorkItem::WaitFor(const AuList> &workItems) { bool status {}; { AU_LOCK_GUARD(lock); for (auto &workItem : workItems) { auto dependency = std::reinterpret_pointer_cast(workItem); AU_LOCK_GUARD(dependency->lock); if (dependency->HasFailed()) { status = true; } dependency->waiters_.push_back(shared_from_this()); waitOn_.push_back(workItem); } } if (status) { Fail(); } return AU_SHARED_FROM_THIS; } AuSPtr WorkItem::Then(const AuSPtr &next) { auto that = AU_SHARED_FROM_THIS; next->WaitFor(that); next->Dispatch(); return that; } AuSPtr WorkItem::SetSchedTimeNs(AuUInt64 ns) { dispatchTimeNs_ = Time::CurrentClockNS() + ns; return AU_SHARED_FROM_THIS; } AuSPtr WorkItem::SetSchedTime(AuUInt32 ms) { dispatchTimeNs_ = Time::CurrentClockNS() + (AuUInt64(ms) * AuUInt64(1000000)); return AU_SHARED_FROM_THIS; } AuSPtr WorkItem::AddDelayTime(AuUInt32 ms) { delayTimeNs_ += AuUInt64(ms) * AuUInt64(1000000); return AU_SHARED_FROM_THIS; } AuSPtr WorkItem::AddDelayTimeNs(AuUInt64 ns) { delayTimeNs_ += ns; return AU_SHARED_FROM_THIS; } AuSPtr WorkItem::Dispatch() { DispatchEx(false); return AU_SHARED_FROM_THIS; } void WorkItem::DispatchEx(bool check) { AU_LOCK_GUARD(lock); if (check) { if (dispatchPending_) { return; } } if (HasFailed()) { return; } for (auto itr = waitOn_.begin(); itr != waitOn_.end(); ) { auto &waitable = *itr; if (!waitable->HasFinished()) { return; } itr = waitOn_.erase(itr); } dispatchPending_ = true; if (Time::CurrentClockNS() < dispatchTimeNs_) { Schedule(); return; } if (auto delay = std::exchange(delayTimeNs_, {})) { dispatchTimeNs_ = delay + Time::CurrentClockNS(); Schedule(); return; } SendOff(); } void WorkItem::CancelAsync() { AU_LOCK_GUARD(lock); Fail(); } void WorkItem::RunAsync() { AU_LOCK_GUARD(lock); IWorkItemHandler::ProcessInfo info(true); if (task_) { task_->DispatchFrame(info); } switch (info.type) { case IWorkItemHandler::EProcessNext::eFinished: { // do nothing break; } case IWorkItemHandler::EProcessNext::eInvalid: { SysPanic("Handle Invalid"); break; } case IWorkItemHandler::EProcessNext::eSchedule: { if (info.reschedMs) { SetSchedTime(info.reschedMs); } if (info.reschedNs) { SetSchedTimeNs(info.reschedNs); } WaitFor(info.waitFor); } [[fallthrough]]; case IWorkItemHandler::EProcessNext::eRerun: { DispatchEx(false); return; } case IWorkItemHandler::EProcessNext::eFailed: { Fail(); return; } } finished = true; if (finishedEvent_) { finishedEvent_->Set(); } for (auto &waiter : waiters_) { std::reinterpret_pointer_cast(waiter)->DispatchEx(true); } } void WorkItem::Fail() { failed = true; if (auto task_ = std::exchange(this->task_, {})) { task_->Shutdown(); } for (auto &waiter : waiters_) { std::reinterpret_pointer_cast(waiter)->Fail(); } waiters_.clear(); waitOn_.clear(); if (finishedEvent_) { finishedEvent_->Set(); } } bool WorkItem::BlockUntilComplete() { if (!finishedEvent_) return false; return Async::GetAsyncApp()->WaitFor(this->worker_, finishedEvent_.get(), 0); } bool WorkItem::HasFinished() { return finished; } bool WorkItem::HasFailed() { return failed; } void WorkItem::Schedule() { Aurora::Async::Schedule(dispatchTimeNs_, worker_, this->shared_from_this()); } void WorkItem::SendOff() { static_cast(GetAsyncApp())->Run(worker_, this->shared_from_this()); } AUKN_SYM AuSPtr NewWorkItem(const DispatchTarget_t &worker, const AuSPtr &task, bool supportsBlocking) { return AuMakeShared(worker, task, supportsBlocking); } }