AuroraRuntime/Include/Aurora/Async/Async.hpp

361 lines
12 KiB
C++
Raw Normal View History

2021-06-27 21:25:29 +00:00
/***
Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: Async.hpp
Date: 2021-6-10
Author: Reece
***/
#pragma once
namespace Aurora::Async
{
// Forward declarations; both interfaces are defined later in this header.
class IWorkItem;
class IAsyncApp;
// Placeholder type used as the default Info_t / Result_t of the job and task templates below.
using AVoid = AuUInt8;
// Returns a pointer to the IAsyncApp interface.
// NOTE(review): presumably a process-wide instance -- confirm against the implementation.
AUKN_SYM IAsyncApp *GetAsyncApp();
// Identifier of a thread group.
using ThreadGroup_t = AuUInt8;
// Identifier of a thread; NOTE(review): presumably scoped to its group -- confirm.
using ThreadId_t = AuUInt16;
// A specific worker: (group, thread).
using WorkerId_t = std::pair<ThreadGroup_t, ThreadId_t>;
// A dispatch destination: a group plus an optional thread id.
// NOTE(review): presumably an empty optional means "any thread of the group" -- confirm.
using DispatchTarget_t = std::pair<ThreadGroup_t, std::optional<ThreadId_t>>;
struct IWorkItemHandler
{
enum class EProcessNext
{
eInvalid = -1,
eFinished = 0,
eRerun,
eSchedule,
2021-06-27 21:25:29 +00:00
eFailed
};
2021-06-27 21:25:29 +00:00
struct ProcessInfo
{
ProcessInfo(bool finished) : type(finished ? EProcessNext::eFinished : EProcessNext::eFailed) {}
ProcessInfo(EProcessNext type) : type(type) {}
ProcessInfo(const AuList<AuSPtr<IWorkItem>> &blockedBy) : type(EProcessNext::eSchedule), waitFor(blockedBy) {}
// ...
2021-06-27 21:25:29 +00:00
EProcessNext type;
AuList<AuSPtr<IWorkItem>> waitFor;
AuUInt32 reschedMs;
2021-06-30 12:00:32 +00:00
AuUInt64 reschedNs;
};
2021-06-27 21:25:29 +00:00
virtual void DispatchFrame(ProcessInfo &info) = 0;
virtual void Shutdown() = 0;
};
2021-07-05 13:35:13 +00:00
// Completion callbacks stored as std::function objects.
// Both members start out empty. BasicWorkCallback invokes onSuccess with the
// input and the produced result, and onFailure with the input and a flag
// (true when called from Shutdown(), false when the task produced no result).
template<class Info_t = AVoid, class Result_t = AVoid>
struct FJob
{
    std::function<void(const Info_t &, const Result_t &)> onSuccess = nullptr;
    std::function<void(const Info_t &, bool)>             onFailure = nullptr;
};
2021-07-05 13:35:13 +00:00
// Completion callbacks stored as plain function pointers (C-style counterpart of FJob).
// The pointers are not initialized by this struct.
template<class Info_t = AVoid, class Result_t = AVoid>
struct CJob
{
    // Receives the input and the produced result.
    void (*onSuccess)(const Info_t &, const Result_t &);

    // Receives the input and a failure flag.
    // Original author's note: called from caller thread if taskNeverDispatched.
    void (*onFailure)(const Info_t &, bool taskNeverDispatched);
};
2021-07-05 13:35:13 +00:00
// Task body stored as a std::function; starts out empty.
// onFrame maps the input to an optional result. BasicWorkCallback treats an
// empty optional as a failed task.
template<class Info_t = AVoid, class Result_t = AVoid>
struct FTask
{
    std::function<std::optional<Result_t>(const Info_t &)> onFrame = nullptr;
};
2021-07-05 13:35:13 +00:00
// Task body stored as a plain function pointer (C-style counterpart of FTask).
// The pointer is not initialized by this struct.
template<class Info_t = AVoid, class Result_t = AVoid>
struct CTask
{
    // Maps the input to an optional result.
    std::optional<Result_t> (*onFrame)(const Info_t &);
};
2021-06-27 21:25:29 +00:00
// Handle to a unit of work created by NewWorkItem.
// NOTE(review): the configuration calls return an AuSPtr<IWorkItem>, presumably
// this same item so that calls can be chained -- confirm against the implementation.
class IWorkItem
{
public:
    // Declare dependencies on one or several other work items.
    virtual AuSPtr<IWorkItem> WaitFor(const AuSPtr<IWorkItem> &workItem) = 0;
    virtual AuSPtr<IWorkItem> WaitFor(const AuList<AuSPtr<IWorkItem>> &workItem) = 0;

    // Set a scheduling time, in milliseconds or in nanoseconds.
    virtual AuSPtr<IWorkItem> SetSchedTime(AuUInt32 ms) = 0;
    virtual AuSPtr<IWorkItem> SetSchedTimeNs(AuUInt64 ns) = 0;

    // Submit the item for execution.
    virtual AuSPtr<IWorkItem> Dispatch() = 0;

    // State queries / blocking wait.
    virtual bool BlockUntilComplete() = 0;
    virtual bool HasFinished() = 0;
    virtual bool HasFailed() = 0;
};
// Creates a work item that runs `task` on the dispatch target `worker`.
// Callers in this header treat a null return value as an allocation failure.
// NOTE(review): `supportsBlocking` presumably enables IWorkItem::BlockUntilComplete -- confirm.
AUKN_SYM AuSPtr<IWorkItem> NewWorkItem(const DispatchTarget_t &worker, const AuSPtr<IWorkItemHandler> &task, bool supportsBlocking = false);
struct BasicWorkStdFunc : IWorkItemHandler
{
std::function<void()> callback;
std::function<void()> shutdown;
BasicWorkStdFunc(std::function<void()> &&callback, std::function<void()> &&error) : callback(std::move(callback)), shutdown(std::move(shutdown))
2021-06-27 21:25:29 +00:00
{}
BasicWorkStdFunc(std::function<void()> &&callback) : callback(std::move(callback))
{}
BasicWorkStdFunc(const std::function<void()> &callback) : callback(callback)
{}
BasicWorkStdFunc(const std::function<void()> &callback, const std::function<void()> &shutdown) : callback(callback), shutdown(shutdown)
2021-06-27 21:25:29 +00:00
{}
private:
void DispatchFrame(ProcessInfo &info) override
{
2021-06-27 21:25:29 +00:00
try
{
callback();
2021-06-27 21:25:29 +00:00
}
catch (...)
{
Debug::PrintError();
}
}
void Shutdown() override
2021-06-27 21:25:29 +00:00
{
try
{
if (shutdown)
{
shutdown();
}
2021-06-27 21:25:29 +00:00
}
catch (...)
{
Debug::PrintError();
}
}
};
template<typename Info_t, typename Result_t, typename Task_t = FTask<Info_t, Result_t>, typename Job_t = FJob<Info_t, Result_t>>
2021-06-27 21:25:29 +00:00
struct BasicWorkCallback : IWorkItemHandler, std::enable_shared_from_this<IWorkItemHandler>
{
BasicWorkCallback()
{
caller = GetAsyncApp()->GetCurrentThread();
}
Task_t task;
Job_t callback;
Info_t input;
private:
2021-06-27 21:25:29 +00:00
2021-06-30 09:28:52 +00:00
static constexpr bool IsCallbackPtr = std::is_pointer_v<Job_t> || is_base_of_template<std::shared_ptr, Job_t>::value;
static constexpr bool IsTaskPtr = std::is_pointer_v<Task_t> || is_base_of_template<std::shared_ptr, Task_t>::value;
2021-06-27 21:25:29 +00:00
WorkerId_t caller;
void DispatchFrame(ProcessInfo &info) override
{
2021-06-27 21:25:29 +00:00
std::optional<Result_t> ret;
if constexpr (IsTaskPtr)
{
ret = task->onFrame(input);
}
else
{
ret = task.onFrame(input);
}
2021-06-30 09:28:52 +00:00
auto pin = std::static_pointer_cast<std::remove_pointer_t<decltype(this)>>(this->shared_from_this());
2021-06-27 21:25:29 +00:00
2021-06-30 09:28:52 +00:00
std::function<void()> func = [ret, pin]()
2021-06-27 21:25:29 +00:00
{
try
{
if (ret.has_value())
{
if constexpr (IsCallbackPtr)
{
2021-06-30 09:28:52 +00:00
pin->callback->onSuccess(pin->input, *ret);
2021-06-27 21:25:29 +00:00
}
else
{
2021-06-30 09:28:52 +00:00
pin->callback.onSuccess(pin->input, *ret);
2021-06-27 21:25:29 +00:00
}
}
else
{
2021-06-30 09:28:52 +00:00
pin->CallOnFailure(false);
2021-06-27 21:25:29 +00:00
}
}
catch (...)
{
Debug::PrintError();
}
};
try
{
if (caller == GetAsyncApp()->GetCurrentThread())
{
func();
}
else
{
2021-06-30 09:28:52 +00:00
std::function<void()> err = [pin]()
2021-06-27 21:25:29 +00:00
{
2021-06-30 09:28:52 +00:00
pin->CallOnFailure(false);
2021-06-27 21:25:29 +00:00
};
// TODO: this is somewhat evil. double alloc when we could reuse this
2021-06-30 09:28:52 +00:00
NewWorkItem(caller, std::make_shared<BasicWorkStdFunc>(func, err))->Dispatch();
2021-06-27 21:25:29 +00:00
}
}
catch (...)
{
Debug::PrintError();
Shutdown();
}
}
void Shutdown() override
{
2021-06-27 21:25:29 +00:00
try
{
2021-06-30 09:28:52 +00:00
CallOnFailure(true);
2021-06-27 21:25:29 +00:00
}
catch (...)
{
Debug::PrintError();
}
}
2021-06-30 09:28:52 +00:00
void CallOnFailure(bool fail)
{
if constexpr (IsCallbackPtr)
{
if constexpr (is_base_of_template<std::function, decltype(callback->onFailure)>::value)
{
if (!callback->onFailure)
{
return;
}
}
callback->onFailure(input, fail);
}
else
{
if constexpr (is_base_of_template<std::function, decltype(callback.onFailure)>::value)
{
if (!callback.onFailure)
{
return;
}
}
callback.onFailure(input, fail);
}
}
2021-06-27 21:25:29 +00:00
};
template<typename Frame_t = std::function<void()>, typename Cleanup_t = std::function<void()>>
2021-06-27 21:25:29 +00:00
struct WorkItemCallabale : IWorkItemHandler
{
Frame_t frame;
Cleanup_t cleanup;
private:
void DispatchFrame(ProcessInfo &info) override
{
frame();
info.type = IWorkItemHandler::EProcessNext::eFinished;
}
void Shutdown() override
{
cleanup();
}
};
2021-07-05 13:35:13 +00:00
template<typename... Args>
2021-07-05 13:35:13 +00:00
static std::function<void(Args...)> TranslateAsyncFunctionToDispatcher(std::function<void(Args...)> func)
2021-06-27 21:25:29 +00:00
{
2021-07-05 13:35:13 +00:00
auto cur = GetAsyncApp()->GetCurrentThread();
return [=](Args... in) -> void
{
auto work = std::make_shared<BasicWorkStdFunc>([=]() -> void {
func(in...);
});
auto workItem = NewWorkItem(cur, work);
if (!workItem) throw "can't dispatch async call; out of memory";
workItem->Dispatch();
};
}
2021-06-27 21:25:29 +00:00
2021-07-05 13:35:13 +00:00
// Wraps a value-returning `func` in a dispatcher bound to the worker that calls
// this function.
//
// The returned function takes a completion callback followed by the arguments of
// `func`. It builds a BasicWorkCallback whose task calls `func` with those
// arguments and whose onSuccess forwards the result to the completion callback,
// then dispatches a work item for it to the captured worker.
// The returned function throws a string literal when no work item could be created.
template<typename B, typename... Args>
static std::function<void(std::function<void(const B&)>, Args...)> TranslateAsyncReturnableFunctionToDispatcher(std::function<B(Args...)> func)
{
    auto cur = GetAsyncApp()->GetCurrentThread();
    return [=](std::function<void(const B&)> callback, Args... in) -> void
    {
        // `work` is a shared pointer, so its members are reached through operator->.
        auto work = std::make_shared<BasicWorkCallback<AVoid, B>>();

        // The task member of FTask is named onFrame. The lambda has to capture
        // `func` and the argument pack by copy because it runs later.
        work->task.onFrame = [=](const AVoid &) -> std::optional<B>
        {
            return func(in...);
        };

        work->callback.onSuccess = [=](const AVoid &, const B &ret)
        {
            callback(ret);
        };

        auto workItem = NewWorkItem(cur, work);
        if (!workItem) throw "can't dispatch async call; out of memory";
        workItem->Dispatch();
    };
}
class IAsyncApp
{
public:
// Main thread logic
2021-06-30 09:28:52 +00:00
virtual void Start() = 0;
2021-06-27 21:25:29 +00:00
virtual void Main() = 0;
virtual void Shutdown() = 0;
virtual bool Exiting() = 0;
// Spawning
virtual bool Spawn(WorkerId_t) = 0;
virtual Threading::Threads::ThreadShared_t ResolveHandle(WorkerId_t) = 0;
2021-06-30 09:28:52 +00:00
virtual AuBST<ThreadGroup_t, AuList<ThreadId_t>> GetThreads() = 0;
2021-06-27 21:25:29 +00:00
virtual WorkerId_t GetCurrentThread() = 0;
// Synchronization
virtual bool Sync(ThreadGroup_t group, bool requireSignal = false, AuUInt32 timeout = 0) = 0;
virtual void Signal(ThreadGroup_t group) = 0;
virtual bool WaitFor(WorkerId_t unlocker, Threading::IWaitable *primitive, AuUInt32 ms) = 0; // when unlocker = this, pump event loop
virtual bool WaitFor(DispatchTarget_t unlocker, Threading::IWaitable *primitive, AuUInt32 ms) = 0; // when unlocker = this, pump event loop
2021-06-27 21:25:29 +00:00
virtual bool SyncTimeout(ThreadGroup_t group, AuUInt32 ms) = 0;
2021-06-27 21:25:29 +00:00
virtual void SyncAllSafe() = 0;
// Features
virtual void AddFeature(WorkerId_t id, AuSPtr<Threading::Threads::IThreadFeature> feature, bool async = false) = 0;
// Debug
virtual void AssertInThreadGroup(ThreadGroup_t thread) = 0;
virtual void AssertWorker(WorkerId_t id) = 0;
2021-07-05 13:35:13 +00:00
virtual bool Poll(bool block) = 0;
2021-06-27 21:25:29 +00:00
};
}