/*** Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved. File: WorkItem.cpp Date: 2021-6-26 Author: Reece ***/ #include #include "Async.hpp" #include "WorkItem.hpp" #include "AsyncApp.hpp" #include "Schedular.hpp" namespace Aurora::Async { WorkItem::WorkItem(const DispatchTarget_t &worker, const AuSPtr &task, bool supportsBlocking) : worker_(worker), task_(task) { if (supportsBlocking) { finishedEvent_ = Threading::Primitives::EventUnique(false, true, true); SysAssert(finishedEvent_ ? true : false); } } WorkItem::~WorkItem() { //Fail(); } void WorkItem::WaitFor(const AuSPtr &workItem) { bool status {}; { auto dependency = std::reinterpret_pointer_cast(workItem); Threading::LockGuard l(lock); Threading::LockGuard l2(dependency->lock); if (dependency->HasFailed()) { status = true; } dependency->waiters_.push_back(shared_from_this()); waitOn_.push_back(workItem); } if (status) { Fail(); } } void WorkItem::WaitFor(const AuList> &workItems) { bool status {}; { Threading::LockGuard l(lock); for (auto &workItem : workItems) { auto dependency = std::reinterpret_pointer_cast(workItem); Threading::LockGuard l2(dependency->lock); if (dependency->HasFailed()) { status = true; } dependency->waiters_.push_back(shared_from_this()); waitOn_.push_back(workItem); } } if (status) { Fail(); } } void WorkItem::SetSchedTimeNs(AuUInt64 ns) { dispatchTimeNs_ = Time::CurrentClockNS() + ns; } void WorkItem::SetSchedTime(AuUInt32 ms) { dispatchTimeNs_ = Time::CurrentClockNS() + (AuUInt64(ms) * AuUInt64(1000000)); } void WorkItem::Dispatch() { DispatchEx(false); } void WorkItem::DispatchEx(bool check) { Threading::LockGuard l(lock); if (check) { if (!dispatchPending_) { return; } } else { dispatchPending_ = true; } if (HasFailed()) { return; } for (auto itr = waitOn_.begin(); itr != waitOn_.end(); ) { auto &waitable = *itr; if (!waitable->HasFinished()) { return; } itr = waitOn_.erase(itr); } if (Time::CurrentClockNS() < dispatchTimeNs_ ) { Schedule(); return; } SendOff(); } void WorkItem::CancelAsync() { Threading::LockGuard l(lock); Fail(); } void WorkItem::RunAsync() { Threading::LockGuard l(lock); IWorkItemHandler::ProcessInfo info(true); task_->DispatchFrame(info); switch (info.type) { case IWorkItemHandler::EProcessNext::eFinished: { // do nothing break; } case IWorkItemHandler::EProcessNext::eInvalid: { SysPanic("Handle Invalid"); break; } case IWorkItemHandler::EProcessNext::eSchedule: { if (info.reschedMs) { SetSchedTime(info.reschedMs); } if (info.reschedNs) { SetSchedTimeNs(info.reschedNs); } WaitFor(info.waitFor); } [[fallthrough]]; case IWorkItemHandler::EProcessNext::eRerun: { DispatchEx(false); return; } case IWorkItemHandler::EProcessNext::eFailed: { Fail(); return; } } for (auto &waiter : waiters_) { std::reinterpret_pointer_cast(waiter)->DispatchEx(true); } finished = true; if (finishedEvent_) { finishedEvent_->Set(); } } void WorkItem::Fail() { failed = true; if (auto task_ = std::exchange(this->task_, {})) { task_->Shutdown(); } for (auto &waiter : waiters_) { std::reinterpret_pointer_cast(waiter)->Fail(); } waiters_.clear(); waitOn_.clear(); if (finishedEvent_) { finishedEvent_->Set(); } } bool WorkItem::BlockUntilComplete() { if (!finishedEvent_) return false; finishedEvent_->Lock(); return true; } bool WorkItem::HasFinished() { return finished; } bool WorkItem::HasFailed() { return failed; } void WorkItem::Schedule() { Aurora::Async::Schedule(dispatchTimeNs_, worker_, this->shared_from_this()); } void WorkItem::SendOff() { static_cast(GetAsyncApp())->Run(worker_, this->shared_from_this()); } AUKN_SYM AuSPtr NewWorkItem(const DispatchTarget_t &worker, const AuSPtr &task, bool supportsBlocking) { return std::make_shared(worker, task, supportsBlocking); } }