/*** Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved. File: WorkItem.cpp Date: 2021-6-26 Author: Reece ***/ #include #include "Async.hpp" #include "WorkItem.hpp" #include "AsyncApp.hpp" #include "AuSchedular.hpp" #include "ThreadPool.hpp" #if defined(AURORA_COMPILER_CLANG) // warning: enumeration values 'kEnumCount' not handled in switch [-Wswitch #pragma clang diagnostic ignored "-Wswitch" // Yea, I don't give a shit. #endif namespace Aurora::Async { WorkItem::WorkItem(IThreadPoolInternal *owner, AuOptional worker, const AuSPtr &task) : worker_(worker), task_(task), owner_(owner), finishedEvent_(false, true, true) { SysAssert(owner, "Unable to dispatch on an unspecified pool"); this->uShutdownCookie = owner->uAtomicShutdownCookie; if (auto pWorker = this->GetState()) { this->optOtherCookie = pWorker->shutdown.uShutdownFence; } } WorkItem::~WorkItem() { if (auto pIOWatch = AuExchange(this->pIOWatch, {})) { pIOWatch->StopWatch(); } } AuSPtr WorkItem::WaitFor(const AuSPtr &workItem) { bool status {}; { auto dependency = AuReinterpretCast(workItem); AU_LOCK_GUARD(this->lock); AU_LOCK_GUARD(dependency->lock); if (dependency->HasFailed()) { Fail(); return AU_SHARED_FROM_THIS; } if (!AuTryInsert(dependency->waiters_, AuSharedFromThis())) { Fail(); return AU_SHARED_FROM_THIS; } if (!AuTryInsert(this->waitOn_, AuConstReference(workItem))) { AuTryRemove(dependency->waiters_, AuSharedFromThis()); Fail(); return AU_SHARED_FROM_THIS; } } return AU_SHARED_FROM_THIS; } bool WorkItem::WaitForLocked(const AuList> &workItems) { for (auto &workItem : workItems) { if (!workItem) { SysPushErrorArg(); return false; } auto dependency = AuReinterpretCast(workItem); AU_LOCK_GUARD(dependency->lock); if (dependency->HasFailed()) { return false; } if (!AuTryInsert(dependency->waiters_, AuSharedFromThis())) { return false; } if (!AuTryInsert(this->waitOn_, AuConstReference(workItem))) { AuTryRemove(dependency->waiters_, AuSharedFromThis()); return false; } } return true; } AuSPtr WorkItem::WaitFor(const AuList> &workItems) { bool status {}; { AU_LOCK_GUARD(this->lock); status = WaitForLocked(workItems); } if (!status) { Fail(); } return AU_SHARED_FROM_THIS; } AuSPtr WorkItem::Then(const AuSPtr &next) { auto that = AU_SHARED_FROM_THIS; if (!next) { SysPushErrorArg(); return {}; } next->WaitFor(that); next->Dispatch(); return that; } AuSPtr WorkItem::SetSchedTimeNs(AuUInt64 ns) { this->dispatchTimeNs_ = Time::SteadyClockNS() + ns; return AU_SHARED_FROM_THIS; } AuSPtr WorkItem::SetSchedTimeAbs(AuUInt32 ms) { return this->SetSchedTimeNsAbs(AuMSToNS(ms)); } AuSPtr WorkItem::SetSchedTimeNsAbs(AuUInt64 ns) { auto uNow = AuTime::CurrentClockNS(); if (uNow > ns) { return AU_SHARED_FROM_THIS; } this->dispatchTimeNs_ = AuTime::SteadyClockNS() + (ns - uNow); return AU_SHARED_FROM_THIS; } AuSPtr WorkItem::SetSchedSteadyTimeNsAbs(AuUInt64 ns) { this->dispatchTimeNs_ = ns; return AU_SHARED_FROM_THIS; } AuSPtr WorkItem::SetSchedByLoopSourceRepeating(const AuSPtr &pLoopSource) { if (this->pIOWatchLS) { this->Fail(); return {}; } this->bIoRepeating = true; this->pIOWatchLS = pLoopSource; return AU_SHARED_FROM_THIS; } AuSPtr WorkItem::SetSchedByLoopSourceOnce(const AuSPtr &pLoopSource) { if (this->pIOWatchLS) { this->Fail(); return {}; } this->bIoRepeating = false; this->pIOWatchLS = pLoopSource; return AU_SHARED_FROM_THIS; } AuSPtr WorkItem::SetSchedTime(AuUInt32 ms) { this->dispatchTimeNs_ = Time::SteadyClockNS() + AuMSToNS(ms); return AU_SHARED_FROM_THIS; } AuSPtr WorkItem::AddDelayTime(AuUInt32 ms) { this->delayTimeNs_ += AuMSToNS(ms); return AU_SHARED_FROM_THIS; } AuSPtr WorkItem::AddDelayTimeNs(AuUInt64 ns) { this->delayTimeNs_ += ns; return AU_SHARED_FROM_THIS; } AuSPtr WorkItem::AddDelayTimeRepeating(AuUInt32 ms) { this->delayTimeNs_ += AuMSToNS(ms); this->bRepeatingTimer = true; return AU_SHARED_FROM_THIS; } AuSPtr WorkItem::AddDelayTimeNsRepeating(AuUInt64 ns) { this->bRepeatingTimer = true; this->delayTimeNs_ += ns; return AU_SHARED_FROM_THIS; } AuSPtr WorkItem::Dispatch() { DispatchEx(false); return AU_SHARED_FROM_THIS; } void WorkItem::DispatchEx(bool check, bool bIsIOTick) { AU_LOCK_GUARD(this->lock); DispatchExLocked(check); } void WorkItem::DispatchExLocked(bool check, bool bIsIOTick) { if (check) { if (this->dispatchPending_) { return; } } if (HasFailed()) { return; } for (auto itr = this->waitOn_.begin(); itr != this->waitOn_.end(); ) { auto &waitable = *itr; if (AuStaticCast(waitable)->HasFinishedRepeatable()) { // noop } else if (waitable->HasFinished()) { itr = this->waitOn_.erase(itr); } else if (waitable->HasFailed()) { itr = this->waitOn_.erase(itr); this->Fail(); return; } else { return; } } this->dispatchPending_ = true; if (this->ioTickCount) { // Bypass IO tick check / I owe you } else if (!bIsIOTick && this->pIOWatchLS) { if (this->pIOWatchLS && !this->pIOWatch && // should sched? !this->pIOWatchLS->IsSignaled()) // fast path to avoid scheduling { // Try schedule outside of an IO tick if (!Schedule()) { this->Fail(); } return; } else if (!this->pIOWatchLS->IsSignaled()) // fast path to avoid bailing out { return; } } if (bIsIOTick) { this->ioTickCount++; } if (Time::SteadyClockNS() < this->dispatchTimeNs_) { if (!Schedule()) { this->Fail(); } return; } if (auto delay = this->delayTimeNs_) { this->dispatchTimeNs_ = delay + Time::SteadyClockNS(); if (!this->bRepeatingTimer) { this->delayTimeNs_ = 0; } if (!Schedule()) { this->Fail(); } return; } if (this->ioTickCount) { this->ioTickCount--; } SendOff(); } EWorkPriority WorkItem::GetPrio() { return this->prio_; } void WorkItem::SetPrio(EWorkPriority prio) { if (!EWorkPriorityIsValid(prio)) { SysPushErrorArg(); return; } this->prio_ = prio; } void WorkItem::CancelAsync() { AU_LOCK_GUARD(this->lock2); this->Fail(); } AuOptional> WorkItem::QueryFences() { return AuPair{ this->uShutdownCookie, this->optOtherCookie.ValueOr(0) }; } bool WorkItem::CheckAlive() { if (this->owner_ && this->uShutdownCookie != this->owner_->uAtomicShutdownCookie) { this->Fail(); return false; } if (this->optOtherCookie) { if (auto pWorker = this->GetState()) { if (this->optOtherCookie.value() != pWorker->shutdown.uShutdownFence) { this->Fail(); return false; } } } return true; } void WorkItem::DispatchTask(IWorkItemHandler::ProcessInfo &info) { if (!this->CheckAlive()) { return; } bool bRerun = (this->pIOWatchLS && this->bIoRepeating) || this->waitOn_.size() || this->bRepeatingTimer; if (bRerun) { info = ETickType::eScheduleAndNotifyRepeatingWaiters; } if (this->task_) { try { this->task_->DispatchFrame(info); } catch (...) { // TODO: runtime config for root level exception caught behaviour SysPushErrorCatch(); this->Fail(); return; } } } void WorkItem::Cleanup() { } AuSPtr WorkItem::GetState() { if (this->worker_.HasValue()) { return this->owner_->GetThreadHandle(this->worker_.value()); } else { return {}; } } void WorkItem::RunAsyncLocked2() { AU_LOCK_GUARD(this->lock2); IWorkItemHandler::ProcessInfo info(true); info.pool = this->owner_->ToThreadPool(); DispatchTask(info); RunAsyncLocked2(info); } void WorkItem::RunAsync() { AU_LOCK_GUARD(this->lock2); RunAsyncLocked(); } void WorkItem::RunAsyncLocked() { IWorkItemHandler::ProcessInfo info(true); info.pool = this->owner_->ToThreadPool(); DispatchTask(info); AU_LOCK_GUARD(this->lock); RunAsyncLocked2(info); } void WorkItem::RunAsyncLocked2(const IWorkItemHandler::ProcessInfo &info) { if (info.type != ETickType::eScheduleAndNotifyRepeatingWaiters && info.type != ETickType::eRerunAndNotifyRepeatingWaiters) { if (auto pIOWatch = AuExchange(this->pIOWatch, {})) { pIOWatch->StopWatch(); } AuResetMember(this->pIOWatchLS); } this->finished2 = false; switch (info.type) { case ETickType::eFinished: { // do nothing break; } case ETickType::eEnumInvalid: { SysPanic("Handle Invalid"); break; } case ETickType::eSchedule: case ETickType::eScheduleAndNotifyRepeatingWaiters: { if (info.reschedMs) { SetSchedTime(info.reschedMs); } else if (info.reschedNs) { SetSchedTimeNs(info.reschedNs); } else if (info.reschedClockAbsMs) { SetSchedTimeAbs(info.reschedClockAbsMs); } else if (info.reschedClockAbsNs) { SetSchedTimeNsAbs(info.reschedClockAbsNs); } else if (info.reschedSteadyClockAbsNs) { SetSchedSteadyTimeNsAbs(info.reschedSteadyClockAbsNs); } else if (info.pLoopSource) { SetSchedByLoopSourceOnce(info.pLoopSource); } if (!WaitForLocked(info.waitFor)) { this->Fail(); } } [[fallthrough]]; case ETickType::eRerun: case ETickType::eRerunAndNotifyRepeatingWaiters: { DispatchEx(false); if (info.type == ETickType::eRerun || info.type == ETickType::eSchedule) { return; } this->finished2 = true; } case ETickType::eFailed: { this->Fail(); return; } } this->dispatchPending_ = false; this->finished = !this->finished2; this->finishedEvent_->Set(); for (auto &waiter : AuExchange(this->waiters_, {})) { AuReinterpretCast(waiter)->DispatchEx(true); } if (this->finished2) { this->finished2 = false; this->finishedEvent_->Reset(); } else { this->Cleanup(); } } void WorkItem::Fail() { failed = true; if (auto pIOWatch = AuExchange(this->pIOWatch, {})) { pIOWatch->StopWatch(); } AuResetMember(this->pIOWatchLS); if (auto task_ = AuExchange(this->task_, {})) { task_->OnFailure(); } for (auto &waiter : this->waiters_) { AuReinterpretCast(waiter)->Fail(); } this->waiters_.clear(); this->waitOn_.clear(); this->Cleanup(); if (this->finishedEvent_) { this->finishedEvent_->Set(); } } bool WorkItem::BlockUntilComplete() { #if 0 if (!this->worker_) #else if (!this->worker_ && !AuAsync::GetCurrentWorkerPId()) #endif { this->finishedEvent_->Wait(); return true; } struct WaitProxy : Threading::IWaitable { AuThreadPrimitives::IEvent *pEvent {}; WaitProxy(AuThreadPrimitives::IEvent *pEvent) : pEvent(pEvent) { } bool HasOSHandle(AuMach &mach) override { return this->pEvent->HasOSHandle(mach); } bool HasLockImplementation() override { return this->pEvent->HasLockImplementation(); } void Lock() override { return this->pEvent->Lock(); } bool LockMS(AuUInt64 qwRelTimeoutInMs) override { return this->pEvent->LockMS(qwRelTimeoutInMs); } bool LockNS(AuUInt64 qwRelTimeoutInNs) override { return this->pEvent->LockNS(qwRelTimeoutInNs); } bool LockAbsMS(AuUInt64 qwAbsTimeoutInMs) override { return this->pEvent->LockAbsMS(qwAbsTimeoutInMs); } bool LockAbsNS(AuUInt64 qwAbsTimeoutInNs) override { return this->pEvent->LockAbsNS(qwAbsTimeoutInNs); } bool TryLock() override { return this->pEvent->TryLock(); } void Unlock() override { // PATCH: ensure release notifications set the event! this->pEvent->Set(); } } waitProxy(this->finishedEvent_.AsPointer()); return this->owner_->WaitFor(this->worker_.ValueOr(WorkerPId_t {}), AuUnsafeRaiiToShared(&waitProxy), 0 /*forever*/); } bool WorkItem::HasFinished() { return this->finished; } bool WorkItem::HasFinishedRepeatable() { return this->finished2; } void WorkItem::Cancel() { AU_LOCK_GUARD(this->lock2); Fail(); } bool WorkItem::HasFailed() { return this->failed; } bool WorkItem::Schedule() { if (auto pLoopSource = this->pIOWatchLS) { AuSPtr pIOProcessor; if (this->pIOWatch) { return true; } auto pState = this->GetState(); if (!pState) { pIOProcessor = this->owner_->ToThreadPool()->GetIOProcessor(AuAsync::GetCurrentWorkerPId()); } else { pIOProcessor = pState->singletons.GetIOProcessor(this->worker_.value()); } if (!pIOProcessor) { return false; } AU_DEBUG_MEMCRUNCH; AuWPtr wptr = this->SharedFromThis(); this->pIOWatch = pIOProcessor->StartSimpleLSWatchEx(pLoopSource, AuMakeSharedThrow([=]() { if (auto pThat = AuTryLockMemoryType(wptr)) { pThat->DispatchEx(false, true); } }, [=]() { if (auto pThat = AuTryLockMemoryType(wptr)) { pThat->DispatchEx(false, true); } }, [=]() { if (auto pThat = AuTryLockMemoryType(wptr)) { pThat->DispatchEx(false, true); } }), !this->bIoRepeating); if (!this->pIOWatch) { return false; } return true; } else { return Async::Schedule(this->dispatchTimeNs_, this->owner_, this->worker_.value(), AuSharedFromThis()); } } void WorkItem::SendOff() { if (!this->worker_) { // If we aren't actually calling a task interface, we may as well just dispatch objects waiting on us from here RunAsyncLocked2(); } else { this->owner_->Run(this->worker_.value(), AuSharedFromThis()); } } void *WorkItem::GetPrivateData() { if (!this->task_) { return nullptr; } return this->task_->GetPrivateData(); } inline auto ToInternal(const AuSPtr &pool) { return AuStaticPointerCast(pool); } AUKN_SYM AuSPtr NewWorkItem(const WorkerId_t &worker, const AuSPtr &task) { AU_DEBUG_MEMCRUNCH; if (!task) { SysPushErrorArg("WorkItem has null task. Running out of memory?"); return {}; } auto pWorker = GetCurrentWorkerPId().GetPool(); if (!pWorker) { pWorker = AuUnsafeRaiiToShared(static_cast(gAsyncApp)); } return AuMakeShared(ToInternal(pWorker).get(), WorkerPId_t { pWorker , worker }, task); } AUKN_SYM AuSPtr NewWorkItem(const WorkerPId_t &worker, const AuSPtr &task) { AU_DEBUG_MEMCRUNCH; if (!task) { SysPushErrorArg("WorkItem has null task. Running out of memory?"); return {}; } auto pWorker = worker.GetPool(); if (!pWorker) { pWorker = GetCurrentWorkerPId().GetPool(); } if (!pWorker) { pWorker = AuUnsafeRaiiToShared(static_cast(gAsyncApp)); } return AuMakeSharedThrow(ToInternal(pWorker).get(), worker, task); } AUKN_SYM AuSPtr NewFence() { auto pWorker = GetCurrentWorkerPId().GetPool(); if (!pWorker) { pWorker = AuUnsafeRaiiToShared(static_cast(gAsyncApp)); } return AuMakeShared((IThreadPoolInternal *)ToInternal(pWorker).get(), AuOptional {}, AuSPtr{}); } }