AuroraRuntime/Source/Async/WorkItem.cpp

743 lines
18 KiB
C++
Raw Normal View History

/***
Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: WorkItem.cpp
Date: 2021-6-26
Author: Reece
***/
2021-09-30 14:57:41 +00:00
#include <Source/RuntimeInternal.hpp>
2021-06-30 09:28:52 +00:00
#include "Async.hpp"
2021-06-27 21:25:29 +00:00
#include "WorkItem.hpp"
2021-06-30 09:28:52 +00:00
#include "AsyncApp.hpp"
#include "AuSchedular.hpp"
2021-06-27 21:25:29 +00:00
2023-08-30 00:28:05 +00:00
#if defined(AURORA_COMPILER_CLANG)
// warning: enumeration values 'kEnumCount' not handled in switch [-Wswitch
#pragma clang diagnostic ignored "-Wswitch"
// Yea, I don't give a shit.
#endif
2021-06-27 21:25:29 +00:00
namespace Aurora::Async
{
FuncWorker::FuncWorker(IThreadPoolInternal *owner,
const WorkerPId_t &worker,
AuVoidFunc &&func) :
WorkItem(owner, worker, {}),
func(func)
{
}
WorkItem::WorkItem(IThreadPoolInternal *owner,
const WorkerPId_t &worker,
const AuSPtr<IWorkItemHandler> &task) :
worker_(worker), task_(task), owner_(owner),
finishedEvent_(false, true, true)
2021-06-27 21:25:29 +00:00
{
this->uShutdownCookie = owner->uAtomicShutdownCookie;
if (auto pWorker = this->GetState())
{
this->optOtherCookie = pWorker->shutdown.uShutdownFence;
}
2021-06-27 21:25:29 +00:00
}
WorkItem::~WorkItem()
{
if (auto pIOWatch = AuExchange(this->pIOWatch, {}))
{
pIOWatch->StopWatch();
}
2021-06-27 21:25:29 +00:00
}
AuSPtr<IWorkItem> WorkItem::WaitFor(const AuSPtr<IWorkItem> &workItem)
2021-06-27 21:25:29 +00:00
{
bool status {};
2021-06-27 21:25:29 +00:00
{
auto dependency = AuReinterpretCast<WorkItem>(workItem);
AU_LOCK_GUARD(this->lock);
2021-09-06 10:58:08 +00:00
AU_LOCK_GUARD(dependency->lock);
if (dependency->HasFailed())
{
2022-05-17 00:59:19 +00:00
Fail();
return AU_SHARED_FROM_THIS;
}
if (!AuTryInsert(dependency->waiters_, AuSharedFromThis()))
{
2022-05-17 00:59:19 +00:00
Fail();
return AU_SHARED_FROM_THIS;
}
if (!AuTryInsert(this->waitOn_, workItem))
{
AuTryRemove(dependency->waiters_, AuSharedFromThis());
2022-05-17 00:59:19 +00:00
Fail();
return AU_SHARED_FROM_THIS;
}
2021-06-27 21:25:29 +00:00
}
return AU_SHARED_FROM_THIS;
}
2021-07-12 14:37:05 +00:00
bool WorkItem::WaitForLocked(const AuList<AuSPtr<IWorkItem>> &workItems)
{
for (auto &workItem : workItems)
{
2023-05-02 10:27:06 +00:00
if (!workItem)
{
SysPushErrorArg();
return false;
}
auto dependency = AuReinterpretCast<WorkItem>(workItem);
AU_LOCK_GUARD(dependency->lock);
if (dependency->HasFailed())
{
return false;
}
if (!AuTryInsert(dependency->waiters_, AuSharedFromThis()))
{
return false;
}
if (!AuTryInsert(this->waitOn_, workItem))
{
AuTryRemove(dependency->waiters_, AuSharedFromThis());
return false;
}
}
return true;
}
AuSPtr<IWorkItem> WorkItem::WaitFor(const AuList<AuSPtr<IWorkItem>> &workItems)
{
bool status {};
{
AU_LOCK_GUARD(this->lock);
status = WaitForLocked(workItems);
}
if (!status)
{
Fail();
}
return AU_SHARED_FROM_THIS;
2021-06-27 21:25:29 +00:00
}
2021-07-12 14:37:05 +00:00
AuSPtr<IWorkItem> WorkItem::Then(const AuSPtr<IWorkItem> &next)
{
auto that = AU_SHARED_FROM_THIS;
2023-05-02 10:27:06 +00:00
if (!next)
{
SysPushErrorArg();
return {};
}
2021-07-12 14:37:05 +00:00
next->WaitFor(that);
next->Dispatch();
return that;
}
2021-06-30 12:00:32 +00:00
AuSPtr<IWorkItem> WorkItem::SetSchedTimeNs(AuUInt64 ns)
2021-06-30 12:00:32 +00:00
{
this->dispatchTimeNs_ = Time::SteadyClockNS() + ns;
return AU_SHARED_FROM_THIS;
}
AuSPtr<IWorkItem> WorkItem::SetSchedTimeAbs(AuUInt32 ms)
{
return this->SetSchedTimeNsAbs(AuMSToNS<AuUInt64>(ms));
}
AuSPtr<IWorkItem> WorkItem::SetSchedTimeNsAbs(AuUInt64 ns)
{
auto uNow = AuTime::CurrentClockNS();
if (uNow > ns)
{
return AU_SHARED_FROM_THIS;
}
this->dispatchTimeNs_ = AuTime::SteadyClockNS() + (ns - uNow);
return AU_SHARED_FROM_THIS;
}
AuSPtr<IWorkItem> WorkItem::SetSchedSteadyTimeNsAbs(AuUInt64 ns)
{
this->dispatchTimeNs_ = ns;
return AU_SHARED_FROM_THIS;
2021-06-30 12:00:32 +00:00
}
AuSPtr<IWorkItem> WorkItem::SetSchedByLoopSource(const AuSPtr<IO::Loop::ILoopSource> &pLoopSource)
{
this->pIOWatchLS = pLoopSource;
return AU_SHARED_FROM_THIS;
}
AuSPtr<IWorkItem> WorkItem::SetSchedTime(AuUInt32 ms)
2021-06-27 21:25:29 +00:00
{
this->dispatchTimeNs_ = Time::SteadyClockNS() + AuMSToNS<AuUInt64>(ms);
return AU_SHARED_FROM_THIS;
2021-06-27 21:25:29 +00:00
}
AuSPtr<IWorkItem> WorkItem::AddDelayTime(AuUInt32 ms)
{
2022-06-12 18:23:18 +00:00
this->delayTimeNs_ += AuMSToNS<AuUInt64>(ms);
return AU_SHARED_FROM_THIS;
}
AuSPtr<IWorkItem> WorkItem::AddDelayTimeNs(AuUInt64 ns)
{
this->delayTimeNs_ += ns;
return AU_SHARED_FROM_THIS;
}
2021-06-27 21:25:29 +00:00
AuSPtr<IWorkItem> WorkItem::Dispatch()
2021-06-27 21:25:29 +00:00
{
DispatchEx(false);
return AU_SHARED_FROM_THIS;
2021-06-27 21:25:29 +00:00
}
void WorkItem::DispatchEx(bool check)
{
AU_LOCK_GUARD(this->lock);
2021-06-27 21:25:29 +00:00
DispatchExLocked(check);
}
void WorkItem::DispatchExLocked(bool check)
{
2021-06-27 21:25:29 +00:00
if (check)
{
if (this->dispatchPending_)
2021-06-27 21:25:29 +00:00
{
return;
}
}
if (HasFailed())
{
return;
}
2021-06-27 21:25:29 +00:00
for (auto itr = waitOn_.begin(); itr != waitOn_.end(); )
{
auto &waitable = *itr;
if (!waitable->HasFinished())
{
return;
}
itr = waitOn_.erase(itr);
}
this->dispatchPending_ = true;
2021-09-06 10:58:08 +00:00
if (this->pIOWatchLS)
{
if (!this->pIOWatchLS->IsSignaled())
{
if (!Schedule())
{
this->Fail();
}
return;
}
else
{
AuResetMember(this->pIOWatchLS);
}
}
if (Time::SteadyClockNS() < this->dispatchTimeNs_)
{
if (!Schedule())
{
this->Fail();
}
return;
}
2022-01-19 17:08:13 +00:00
if (auto delay = AuExchange(delayTimeNs_, {}))
2021-06-27 21:25:29 +00:00
{
this->dispatchTimeNs_ = delay + Time::SteadyClockNS();
if (!Schedule())
{
this->Fail();
}
2021-06-27 21:25:29 +00:00
return;
}
SendOff();
}
EWorkPrio WorkItem::GetPrio()
{
return this->prio_;
}
void WorkItem::SetPrio(EWorkPrio prio)
{
this->prio_ = prio;
}
2021-06-30 09:28:52 +00:00
void WorkItem::CancelAsync()
{
AU_TRY_LOCK_GUARD_NAMED(this->lock2, asd);
this->Fail();
}
AuOptional<AuPair<AuUInt32, AuUInt32>> WorkItem::QueryFences()
{
return AuPair<AuUInt32, AuUInt32>{ this->uShutdownCookie, this->optOtherCookie.ValueOr(0) };
}
bool WorkItem::CheckAlive()
{
if (this->owner_ &&
this->uShutdownCookie != this->owner_->uAtomicShutdownCookie)
{
this->Fail();
return false;
}
if (this->optOtherCookie)
{
if (auto pWorker = this->GetState())
{
if (this->optOtherCookie.value() != pWorker->shutdown.uShutdownFence)
{
this->Fail();
return false;
}
}
}
return true;
2021-06-30 09:28:52 +00:00
}
void WorkItem::DispatchTask(IWorkItemHandler::ProcessInfo &info)
{
if (!this->CheckAlive())
{
return;
}
if (this->task_)
{
try
{
this->task_->DispatchFrame(info);
}
catch (...)
{
// TODO: runtime config for root level exception caught behaviour
SysPushErrorCatch();
this->Fail();
return;
}
}
}
AuSPtr<ThreadState> WorkItem::GetState()
{
if (this->worker_.HasValue())
{
return this->owner_->GetThreadHandle(this->worker_.value());
}
else
{
return {};
}
}
void WorkItem::RunAsyncLocked2()
{
AU_LOCK_GUARD(this->lock2);
IWorkItemHandler::ProcessInfo info(true);
info.pool = this->owner_->ToThreadPool();
DispatchTask(info);
RunAsyncLocked2(info);
}
2021-06-30 09:28:52 +00:00
void WorkItem::RunAsync()
2021-06-27 21:25:29 +00:00
{
AU_LOCK_GUARD(this->lock2);
RunAsyncLocked();
}
void WorkItem::RunAsyncLocked()
{
2021-06-27 21:25:29 +00:00
IWorkItemHandler::ProcessInfo info(true);
info.pool = this->owner_->ToThreadPool();
2021-09-06 10:58:08 +00:00
2023-09-22 05:05:39 +00:00
DispatchTask(info);
2021-06-27 21:25:29 +00:00
AU_LOCK_GUARD(this->lock);
RunAsyncLocked2(info);
}
void WorkItem::RunAsyncLocked2(const IWorkItemHandler::ProcessInfo &info)
{
2021-06-27 21:25:29 +00:00
switch (info.type)
{
case ETickType::eFinished:
2021-06-27 21:25:29 +00:00
{
2021-06-27 21:25:29 +00:00
// do nothing
break;
}
case ETickType::eEnumInvalid:
2021-06-27 21:25:29 +00:00
{
SysPanic("Handle Invalid");
break;
}
case ETickType::eSchedule:
2021-06-27 21:25:29 +00:00
{
if (info.reschedMs)
{
SetSchedTime(info.reschedMs);
}
else if (info.reschedNs)
2021-06-30 12:00:32 +00:00
{
SetSchedTimeNs(info.reschedNs);
}
else if (info.reschedClockAbsMs)
{
SetSchedTimeAbs(info.reschedClockAbsMs);
}
else if (info.reschedClockAbsNs)
{
SetSchedTimeNsAbs(info.reschedClockAbsNs);
}
else if (info.reschedSteadyClockAbsNs)
{
SetSchedSteadyTimeNsAbs(info.reschedSteadyClockAbsNs);
}
else if (info.pLoopSource)
{
SetSchedByLoopSource(info.pLoopSource);
}
if (!WaitForLocked(info.waitFor))
{
this->Fail();
}
2021-06-27 21:25:29 +00:00
}
[[fallthrough]];
case ETickType::eRerun:
2021-06-27 21:25:29 +00:00
{
DispatchExLocked(false);
2021-06-27 21:25:29 +00:00
return;
}
case ETickType::eFailed:
2021-06-27 21:25:29 +00:00
{
this->Fail();
2021-06-27 21:25:29 +00:00
return;
}
}
this->finished = true;
if (this->finishedEvent_)
2021-06-27 21:25:29 +00:00
{
this->finishedEvent_->Set();
2021-06-27 21:25:29 +00:00
}
2021-09-06 10:58:08 +00:00
for (auto &waiter : AuExchange(this->waiters_, {}))
2021-09-06 10:58:08 +00:00
{
AuReinterpretCast<WorkItem>(waiter)->DispatchExLocked(true);
2021-09-06 10:58:08 +00:00
}
this->waitOn_.clear();
if (auto pIOWatch = AuExchange(this->pIOWatch, {}))
{
pIOWatch->StopWatch();
}
AuResetMember(this->pIOWatchLS);
2021-06-27 21:25:29 +00:00
}
void WorkItem::Fail()
{
failed = true;
if (auto pIOWatch = AuExchange(this->pIOWatch, {}))
{
pIOWatch->StopWatch();
}
AuResetMember(this->pIOWatchLS);
2022-01-19 17:08:13 +00:00
if (auto task_ = AuExchange(this->task_, {}))
2021-06-27 21:25:29 +00:00
{
task_->OnFailure();
2021-06-27 21:25:29 +00:00
}
for (auto &waiter : this->waiters_)
2021-06-27 21:25:29 +00:00
{
AuReinterpretCast<WorkItem>(waiter)->Fail();
2021-06-27 21:25:29 +00:00
}
this->waiters_.clear();
this->waitOn_.clear();
2021-06-27 21:25:29 +00:00
if (this->finishedEvent_)
2021-06-27 21:25:29 +00:00
{
this->finishedEvent_->Set();
2021-06-27 21:25:29 +00:00
}
}
bool WorkItem::BlockUntilComplete()
{
2023-09-22 05:05:39 +00:00
if (!this->worker_)
{
this->finishedEvent_->Wait();
return true;
}
struct WaitProxy : Threading::IWaitable
{
AuThreadPrimitives::IEvent *pEvent {};
WaitProxy(AuThreadPrimitives::IEvent *pEvent) :
pEvent(pEvent)
{
}
bool HasOSHandle(AuMach &mach) override
{
return this->pEvent->HasOSHandle(mach);
}
bool HasLockImplementation() override
{
return this->pEvent->HasLockImplementation();
}
void Lock() override
{
return this->pEvent->Lock();
}
bool LockMS(AuUInt64 qwRelTimeoutInMs) override
{
return this->pEvent->LockMS(qwRelTimeoutInMs);
}
bool LockNS(AuUInt64 qwRelTimeoutInNs) override
{
return this->pEvent->LockNS(qwRelTimeoutInNs);
}
bool LockAbsMS(AuUInt64 qwAbsTimeoutInMs) override
{
return this->pEvent->LockAbsMS(qwAbsTimeoutInMs);
}
bool LockAbsNS(AuUInt64 qwAbsTimeoutInNs) override
{
return this->pEvent->LockAbsNS(qwAbsTimeoutInNs);
}
bool TryLock() override
{
return this->pEvent->TryLock();
}
void Unlock() override
{
// PATCH: ensure release notifications set the event!
this->pEvent->Set();
}
} waitProxy(this->finishedEvent_.AsPointer());
return this->owner_->WaitFor(this->worker_.value(),
AuUnsafeRaiiToShared(&waitProxy),
0 /*forever*/);
2021-06-27 21:25:29 +00:00
}
bool WorkItem::HasFinished()
{
return this->finished;
2021-06-27 21:25:29 +00:00
}
2021-09-29 08:01:42 +00:00
void WorkItem::Cancel()
{
AU_LOCK_GUARD(this->lock2);
2021-09-29 08:01:42 +00:00
Fail();
}
2021-06-27 21:25:29 +00:00
bool WorkItem::HasFailed()
{
return this->failed;
2021-06-27 21:25:29 +00:00
}
bool WorkItem::Schedule()
2021-06-27 21:25:29 +00:00
{
if (auto pLoopSource = this->pIOWatchLS)
{
if (this->pIOWatch)
{
return true;
}
auto pState = this->GetState();
if (!pState)
{
return false;
}
auto pIOProcessor = pState->singletons.GetIOProcessor(this->worker_.value());
if (!pIOProcessor)
{
return false;
}
this->pIOWatch = pIOProcessor->StartSimpleLSWatch(pLoopSource, AuMakeSharedThrow<AuIO::IIOSimpleEventListenerFunctional>([=]()
{
this->Dispatch();
}, [=]()
{
this->Dispatch();
}, [=]()
{
this->Dispatch();
}));
if (!this->pIOWatch)
{
return false;
}
return true;
}
else
{
return Async::Schedule(this->dispatchTimeNs_, this->owner_, this->worker_.value(), AuSharedFromThis());
}
2021-06-27 21:25:29 +00:00
}
void WorkItem::SendOff()
{
2023-09-22 05:05:39 +00:00
if (!this->worker_)
{
// If we aren't actually calling a task interface, we may as well just dispatch objects waiting on us from here
RunAsyncLocked2();
}
else
{
2023-09-22 05:05:39 +00:00
this->owner_->Run(this->worker_.value(), AuSharedFromThis());
}
2021-06-27 21:25:29 +00:00
}
2023-09-15 15:41:10 +00:00
inline auto ToInternal(const AuSPtr<IThreadPool> &pool)
{
return AuStaticPointerCast<ThreadPool>(pool);
}
void FuncWorker::DispatchTask(IWorkItemHandler::ProcessInfo &info)
2021-06-27 21:25:29 +00:00
{
auto func = AuExchange(this->func, {});
if (!this->CheckAlive())
{
return;
}
if (func)
{
func();
}
}
AUKN_SYM AuSPtr<IWorkItem> NewWorkItem(const WorkerId_t &worker, const AuSPtr<IWorkItemHandler> &task)
{
AU_DEBUG_MEMCRUNCH;
if (!task)
{
SysPushErrorArg("WorkItem has null task. Running out of memory?");
return {};
}
2023-09-15 15:41:10 +00:00
auto pWorker = GetCurrentWorkerPId().pool;
2023-09-15 15:53:18 +00:00
if (!pWorker)
{
pWorker = AuUnsafeRaiiToShared(static_cast<IAsyncApp *>(gAsyncApp));
2023-09-15 15:53:18 +00:00
}
2023-09-15 15:41:10 +00:00
return AuMakeShared<WorkItem>(ToInternal(pWorker).get(), WorkerPId_t { pWorker , worker }, task);
}
AUKN_SYM AuSPtr<IWorkItem> NewWorkFunction(const WorkerPId_t &worker, AuVoidFunc func)
{
AU_DEBUG_MEMCRUNCH;
if (!func)
{
SysPushErrorArg("WorkItem has null function");
return {};
}
2023-09-15 15:53:18 +00:00
auto pWorker = worker.pool;
if (!pWorker)
{
pWorker = GetCurrentWorkerPId().pool;
}
if (!pWorker)
{
pWorker = AuUnsafeRaiiToShared(static_cast<IAsyncApp *>(gAsyncApp));
2023-09-15 15:53:18 +00:00
}
return AuMakeSharedThrow<FuncWorker>(ToInternal(pWorker).get(), worker, AuMove(func));
}
AUKN_SYM AuSPtr<IWorkItem> NewWorkItem(const WorkerPId_t &worker, const AuSPtr<IWorkItemHandler> &task)
{
AU_DEBUG_MEMCRUNCH;
if (!task)
{
SysPushErrorArg("WorkItem has null task. Running out of memory?");
return {};
}
2023-09-15 15:53:18 +00:00
auto pWorker = worker.pool;
if (!pWorker)
{
pWorker = GetCurrentWorkerPId().pool;
}
if (!pWorker)
{
pWorker = AuUnsafeRaiiToShared(static_cast<IAsyncApp *>(gAsyncApp));
2023-09-15 15:53:18 +00:00
}
return AuMakeSharedThrow<WorkItem>(ToInternal(pWorker).get(), worker, task);
2021-06-27 21:25:29 +00:00
}
AUKN_SYM AuSPtr<IWorkItem> NewFence()
{
2023-09-15 15:41:10 +00:00
auto pWorker = GetCurrentWorkerPId().pool;
2023-09-15 15:53:18 +00:00
if (!pWorker)
{
pWorker = AuUnsafeRaiiToShared(static_cast<IAsyncApp *>(gAsyncApp));
2023-09-15 15:53:18 +00:00
}
2023-09-15 15:41:10 +00:00
return AuMakeShared<WorkItem>((IThreadPoolInternal *)ToInternal(pWorker).get(), WorkerPId_t {}, AuSPtr<IWorkItemHandler>{});
}
void *WorkItem::GetPrivateData()
{
if (!this->task_)
{
return nullptr;
}
return this->task_->GetPrivateData();
}
2021-06-27 21:25:29 +00:00
}