AuroraRuntime/Source/Async/WorkItem.cpp

225 lines
5.1 KiB
C++

/***
Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: WorkItem.cpp
Date: 2021-6-26
Author: Reece
***/
#include <RuntimeInternal.hpp>
#include "Async.hpp"
#include "WorkItem.hpp"
#include "AsyncApp.hpp"
namespace Aurora::Async
{
WorkItem::WorkItem(const DispatchTarget_t &worker, const AuSPtr<IWorkItemHandler> &task, bool supportsBlocking) : worker_(worker), task_(task)
{
if (supportsBlocking)
{
finishedEvent_ = Threading::Primitives::EventUnique(false, true, true);
SysAssert(finishedEvent_ ? true : false);
}
}
WorkItem::~WorkItem()
{
//Fail();
}
void WorkItem::WaitFor(const AuSPtr<IWorkItem> &workItem)
{
auto dependency = std::reinterpret_pointer_cast<WorkItem>(workItem);
Threading::LockGuard<Threading::Primitives::SpinLock> l(lock);
Threading::LockGuard<Threading::Primitives::SpinLock> l2(dependency->lock);
dependency->waiters_.push_back(shared_from_this());
waitOn_.push_back(workItem);
}
void WorkItem::WaitFor(const AuList<AuSPtr<IWorkItem>> &workItems)
{
Threading::LockGuard<Threading::Primitives::SpinLock> l(lock);
for (auto &workItem : workItems)
{
auto dependency = std::reinterpret_pointer_cast<WorkItem>(workItem);
Threading::LockGuard<Threading::Primitives::SpinLock> l2(dependency->lock);
if (dependency->HasFailed())
{
Fail();
}
dependency->waiters_.push_back(shared_from_this());
waitOn_.push_back(workItem);
}
}
void WorkItem::SetSchedTime(AuUInt32 ms)
{
Threading::LockGuard<Threading::Primitives::SpinLock> l(lock);
dispatchTime_ = Time::CurrentClockMS() + ms;
}
void WorkItem::Dispatch()
{
DispatchEx(false);
}
void WorkItem::DispatchEx(bool check)
{
Threading::LockGuard<Threading::Primitives::SpinLock> l(lock);
if (check)
{
if (!dispatchPending_)
{
return;
}
}
else
{
dispatchPending_ = true;
}
if (HasFailed())
{
return;
}
for (auto itr = waitOn_.begin(); itr != waitOn_.end(); )
{
auto &waitable = *itr;
if (!waitable->HasFinished())
{
return;
}
itr = waitOn_.erase(itr);
}
if (Time::CurrentClockMS() < dispatchTime_)
{
Schedule();
return;
}
SendOff();
}
void WorkItem::CancelAsync()
{
Threading::LockGuard<Threading::Primitives::SpinLock> l(lock);
Fail();
}
void WorkItem::RunAsync()
{
Threading::LockGuard<Threading::Primitives::SpinLock> l(lock);
IWorkItemHandler::ProcessInfo info(true);
task_->DispatchFrame(info);
switch (info.type)
{
case IWorkItemHandler::EProcessNext::eFinished:
{
// do nothing
break;
}
case IWorkItemHandler::EProcessNext::eInvalid:
{
SysPanic("Handle Invalid");
break;
}
case IWorkItemHandler::EProcessNext::eSchedule:
{
if (info.reschedMs)
{
SetSchedTime(info.reschedMs);
}
WaitFor(info.waitFor);
}
[[fallthrough]];
case IWorkItemHandler::EProcessNext::eRerun:
{
DispatchEx(false);
return;
}
case IWorkItemHandler::EProcessNext::eFailed:
{
Fail();
return;
}
}
for (auto &waiter : waiters_)
{
std::reinterpret_pointer_cast<WorkItem>(waiter)->DispatchEx(true);
}
finished = true;
if (finishedEvent_)
{
finishedEvent_->Set();
}
}
void WorkItem::Fail()
{
failed = true;
if (auto task_ = std::exchange(this->task_, {}))
{
task_->Shutdown();
}
for (auto &waiter : waiters_)
{
std::reinterpret_pointer_cast<WorkItem>(waiter)->Fail();
}
waiters_.clear();
waitOn_.clear();
if (finishedEvent_)
{
finishedEvent_->Set();
}
}
bool WorkItem::BlockUntilComplete()
{
if (!finishedEvent_) return false;
finishedEvent_->Lock();
return true;
}
bool WorkItem::HasFinished()
{
return finished;
}
bool WorkItem::HasFailed()
{
return failed;
}
void WorkItem::Schedule()
{
// TODO:
}
void WorkItem::SendOff()
{
static_cast<AsyncApp *>(GetAsyncApp())->Run(worker_, this->shared_from_this());
}
AUKN_SYM AuSPtr<IWorkItem> NewWorkItem(const DispatchTarget_t &worker, const AuSPtr<IWorkItemHandler> &task, bool supportsBlocking)
{
return std::make_shared<WorkItem>(worker, task, supportsBlocking);
}
}