Reece
f559897b42
[-] Removed WorkerId_t typedef [*] Added shared support to some older threading apis that have yet to be updated
604 lines
20 KiB
C++
604 lines
20 KiB
C++
/***
|
|
Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
|
|
|
File: Async.hpp
|
|
Date: 2021-7-14
|
|
Author: Reece
|
|
***/
|
|
#pragma once
|
|
|
|
// Forward declaration only: this header refers to Loop::ILoopSource solely through AuSPtr
// (see IAsyncApp::ScheduleLoopSource), so the full definition is not required here.
namespace Aurora::Loop
{
    class ILoopSource;
}
|
|
|
|
namespace Aurora::Async
|
|
{
|
|
class IWorkItem;
|
|
class IAsyncApp;
|
|
|
|
/// Placeholder type used as the default Info_t / Result_t of the job and task templates in this header.
using AVoid = AuUInt8;
|
|
|
|
/// Accessor for the IAsyncApp interface. Declared here; the definition lives outside this header.
AUKN_SYM IAsyncApp *GetAsyncApp();
|
|
|
|
/// ThreadGroup_t:
|
|
/// 0 = system main thread
|
|
/// 1+ = user defined
|
|
using ThreadGroup_t = AuUInt8;
|
|
|
|
/// ThreadId_t:
|
|
/// -1 = invalid (the type is unsigned, so -1 wraps to its maximum value; kThreadIdAny uses this same value)
|
|
/// index = tid/runner id
|
|
using ThreadId_t = AuUInt16;
|
|
|
|
/// Sentinel thread id stored by WorkerId_t(group) when only a thread group is named.
/// The cast is explicit so that the wrap of -1 into the unsigned ThreadId_t is visibly intentional
/// instead of an implicit narrowing conversion.
static constexpr ThreadId_t kThreadIdAny = static_cast<ThreadId_t>(-1);
|
|
|
|
/// Identifies a worker as a (thread group, thread id) pair.
/// `first` holds the ThreadGroup_t and `second` holds the ThreadId_t (inherited from AuPair).
struct WorkerId_t : AuPair<ThreadGroup_t, ThreadId_t>
{
    /// Group 0, thread id 0.
    WorkerId_t() : AuPair<ThreadGroup_t, ThreadId_t>(0, 0)
    {}

    /// Names a group without a specific thread: the id is set to kThreadIdAny.
    /// Left non-explicit so existing call sites that pass a bare group keep compiling.
    WorkerId_t(ThreadGroup_t group) : AuPair<ThreadGroup_t, ThreadId_t>(group, kThreadIdAny)
    {}

    /// Names one specific thread of a group.
    WorkerId_t(ThreadGroup_t group, ThreadId_t id) : AuPair<ThreadGroup_t, ThreadId_t>(group, id)
    {}

    // Copy operations are declared as a pair and defaulted. A hand-written copy constructor
    // without a matching copy assignment leaves the implicit assignment deprecated.
    WorkerId_t(const WorkerId_t &cpy) = default;
    WorkerId_t &operator=(const WorkerId_t &cpy) = default;
};
|
|
|
|
struct WorkPriv
|
|
{
|
|
AuUInt32 magic;
|
|
};
|
|
|
|
struct IWorkItemHandler
|
|
{
|
|
enum class EProcessNext
|
|
{
|
|
eInvalid = -1,
|
|
eFinished = 0,
|
|
eRerun,
|
|
eSchedule,
|
|
eFailed
|
|
};
|
|
|
|
struct ProcessInfo
|
|
{
|
|
ProcessInfo(bool finished) : type(finished ? EProcessNext::eFinished : EProcessNext::eFailed) {}
|
|
ProcessInfo(EProcessNext type) : type(type) {}
|
|
ProcessInfo(const AuList<AuSPtr<IWorkItem>> &blockedBy) : type(EProcessNext::eSchedule), waitFor(blockedBy) {}
|
|
// ...
|
|
|
|
EProcessNext type;
|
|
AuList<AuSPtr<IWorkItem>> waitFor;
|
|
AuUInt32 reschedMs;
|
|
AuUInt64 reschedNs;
|
|
};
|
|
|
|
virtual void DispatchFrame(ProcessInfo &info) = 0;
|
|
virtual void Shutdown() = 0;
|
|
|
|
virtual void *GetPrivateData() { return nullptr; }
|
|
};
|
|
|
|
/// Completion callbacks of a job, stored as std::function objects.
/// Both start out empty; BasicWorkCallback checks onFailure for emptiness before calling it.
template<class Info_t = AVoid, class Result_t = AVoid>
struct FJob
{
    /// Called with the input and the produced result.
    // Empty-initialised with {} rather than the integer literal 0 used as a null pointer constant.
    std::function<void(const Info_t &, const Result_t &)> onSuccess {};

    /// Called with the input and a flag telling whether the task was never dispatched.
    std::function<void(const Info_t &, bool)> onFailure {};
};
|
|
|
|
template<class Info_t = AVoid, class Result_t = AVoid>
|
|
static inline FJob<Info_t, Result_t> JobFromConsumer(const AuConsumer<const Info_t &, const Result_t &> &onSuccess)
|
|
{
|
|
FJob<Info_t, Result_t> ret;
|
|
ret.onSuccess = [=](const Info_t &in, const Result_t &a)
|
|
{
|
|
onSuccess(in, a);
|
|
};
|
|
return ret;
|
|
}
|
|
|
|
template<class Info_t = AVoid, class Result_t = AVoid>
|
|
static inline FJob<Info_t, Result_t> JobFromConsumer(const AuConsumer<const Info_t &, const Result_t &> &onSuccess, const AuConsumer<const Info_t &, bool/*neverDispatched*/> &onFailure)
|
|
|
|
{
|
|
FJob<Info_t, Result_t> ret;
|
|
ret.onSuccess = [=](const Info_t &in, const Result_t &a)
|
|
{
|
|
onSuccess(in, a);
|
|
};
|
|
ret.onFailure = [=](const Info_t &a, bool neverDispatched)
|
|
{
|
|
onFailure(a, neverDispatched);
|
|
};
|
|
return ret;
|
|
}
|
|
|
|
template<class Info_t = AVoid, class Result_t = AVoid>
|
|
static inline FJob<Info_t, Result_t> JobFromConsumer(const AuConsumer<const Info_t &, const Result_t &> &onSuccess, const AuConsumer<const Info_t &> &onFailure)
|
|
|
|
{
|
|
FJob<Info_t, Result_t> ret;
|
|
ret.onSuccess = [=](const Info_t &in, const Result_t &a)
|
|
{
|
|
onSuccess(in, a);
|
|
};
|
|
ret.onFailure = [=](const Info_t &a, bool neverDispatched)
|
|
{
|
|
onFailure(a);
|
|
};
|
|
return ret;
|
|
}
|
|
|
|
/// FJob with both the input and the result set to the AVoid placeholder.
using FVoidJob = FJob<AVoid, AVoid>;
|
|
|
|
/// Function-pointer flavour of FJob: the same two callbacks, stored as plain C function pointers.
/// The pointers carry no default initialiser.
template<class Info_t = AVoid, class Result_t = AVoid>
struct CJob
{
    /// Receives the input and the produced result.
    void(* onSuccess)(const Info_t &, const Result_t &);

    /// Receives the input and the taskNeverDispatched flag;
    /// called from caller thread if taskNeverDispatched
    void(* onFailure)(const Info_t &, bool taskNeverDispatched);
};
|
|
|
|
template<class Info_t = AVoid, class Result_t = AVoid>
|
|
struct FTask
|
|
{
|
|
std::function<AuOptional<Result_t>(const Info_t &)> onFrame = 0;
|
|
};
|
|
|
|
/// FTask with both the input and the result set to the AVoid placeholder.
using FVoidTask = FTask<AVoid, AVoid>;
|
|
|
|
template<class In_t = Async::AVoid>
|
|
static inline FTask<In_t, AVoid> TaskFromConsumerRefT(const AuConsumer<const In_t &> &func)
|
|
{
|
|
FTask<In_t, AVoid> ret;
|
|
ret.onFrame = [=](const In_t &a) -> AuOptional<AVoid>
|
|
{
|
|
func(a);
|
|
return AVoid{};
|
|
};
|
|
return ret;
|
|
}
|
|
|
|
|
|
template<class In_t = Async::AVoid, class Out_t = Async::AVoid>
|
|
static inline FTask<In_t, AVoid> TaskFromConsumerRefT(const AuSupplierConsumer<AuOptional<Out_t>, const In_t &> &func)
|
|
{
|
|
FTask<In_t, Out_t> ret;
|
|
ret.onFrame = [=](const In_t &a) -> AuOptional<Out_t>
|
|
{
|
|
return func(a);
|
|
};
|
|
return ret;
|
|
}
|
|
|
|
/// Wraps a parameterless function into an FVoidTask-compatible task:
/// the frame ignores its input, calls func, and returns a value-initialised AVoid.
static inline FTask<AVoid, AVoid> TaskFromFunctional(const AuVoidFunc &func)
{
    FTask<AVoid, AVoid> task;

    task.onFrame = [func](const AVoid &) -> AuOptional<AVoid>
    {
        func();
        return AVoid{};
    };

    return task;
}
|
|
|
|
|
|
|
|
template<class Info_t = AVoid, class Result_t = AVoid>
|
|
struct CTask
|
|
{
|
|
AuOptional<Result_t>(* onFrame)(const Info_t &);
|
|
};
|
|
|
|
class IWorkItem
|
|
{
|
|
public:
|
|
virtual AuSPtr<IWorkItem> WaitFor(const AuSPtr<IWorkItem> &workItem) = 0;
|
|
virtual AuSPtr<IWorkItem> WaitFor(const AuList<AuSPtr<IWorkItem>> &workItem) = 0;
|
|
|
|
// ms = time relative to the current time
|
|
virtual AuSPtr<IWorkItem> SetSchedTime(AuUInt32 ms) = 0;
|
|
|
|
// ns = time relative to the current time
|
|
virtual AuSPtr<IWorkItem> SetSchedTimeNs(AuUInt64 ns) = 0;
|
|
|
|
// ms = time relative to the time at which the work item would otherwise dispatch
|
|
virtual AuSPtr<IWorkItem> AddDelayTime(AuUInt32 ms) = 0;
|
|
|
|
// ns = time relative to the time at which the work item would otherwise dispatch
|
|
virtual AuSPtr<IWorkItem> AddDelayTimeNs(AuUInt64 ns) = 0;
|
|
|
|
virtual AuSPtr<IWorkItem> Then(const AuSPtr<IWorkItem> &next) = 0;
|
|
|
|
virtual AuSPtr<IWorkItem> Dispatch() = 0;
|
|
|
|
virtual bool BlockUntilComplete() = 0;
|
|
virtual bool HasFinished() = 0;
|
|
virtual bool HasFailed() = 0;
|
|
virtual void Cancel() = 0;
|
|
|
|
virtual void *GetPrivateData() = 0;
|
|
virtual AuOptional<void *> ToWorkResultT() = 0;
|
|
};
|
|
|
|
/// Creates a work item bound to `worker` that is driven by the handler `task`.
/// Callers in this header null-check the result before dispatching.
/// NOTE(review): `supportsBlocking` presumably enables BlockUntilComplete() - confirm with the runtime.
AUKN_SYM AuSPtr<IWorkItem> NewWorkItem(const WorkerId_t &worker, const AuSPtr<IWorkItemHandler> &task, bool supportsBlocking = false);
|
|
/// Creates a work item that takes no worker and no handler.
/// NOTE(review): presumably used purely as a synchronisation point via WaitFor/Then - confirm.
AUKN_SYM AuSPtr<IWorkItem> NewFence();
|
|
|
|
class IAsyncApp
|
|
{
|
|
public:
|
|
// Main thread logic
|
|
virtual void Start() = 0;
|
|
virtual void Main() = 0;
|
|
virtual void Shutdown() = 0;
|
|
virtual bool Exiting() = 0;
|
|
virtual void SetConsoleCommandDispatcher(WorkerId_t id) = 0;
|
|
|
|
// Spawning
|
|
virtual bool Spawn(WorkerId_t) = 0;
|
|
virtual Threading::Threads::ThreadShared_t ResolveHandle(WorkerId_t) = 0;
|
|
virtual AuBST<ThreadGroup_t, AuList<ThreadId_t>> GetThreads() = 0;
|
|
virtual WorkerId_t GetCurrentThread() = 0;
|
|
|
|
// Synchronization
|
|
// Note: syncing to yourself will nullify requireSignal to prevent deadlock
|
|
virtual bool Sync(WorkerId_t group, AuUInt32 timeoutMs = 0, bool requireSignal = false) = 0;
|
|
virtual void Signal(WorkerId_t group) = 0;
|
|
virtual void SyncAllSafe() = 0;
|
|
|
|
// Features
|
|
virtual void AddFeature(WorkerId_t id, AuSPtr<Threading::Threads::IThreadFeature> feature, bool async = false) = 0;
|
|
|
|
// Debug
|
|
virtual void AssertInThreadGroup(ThreadGroup_t thread) = 0;
|
|
virtual void AssertWorker(WorkerId_t id) = 0;
|
|
|
|
virtual bool Poll(bool block) = 0;
|
|
|
|
virtual bool ScheduleLoopSource(const AuSPtr<Loop::ILoopSource> &loopSource, WorkerId_t workerId, AuUInt32 timeout, const AuConsumer<AuSPtr<Loop::ILoopSource>, bool> &callback) = 0;
|
|
};
|
|
|
|
|
|
#pragma region EASE_OF_READING
|
|
struct BasicWorkStdFunc : IWorkItemHandler
|
|
{
|
|
std::function<void()> callback;
|
|
std::function<void()> shutdown;
|
|
|
|
BasicWorkStdFunc(std::function<void()> &&callback, std::function<void()> &&error) : callback(std::move(callback)), shutdown(std::move(error))
|
|
{}
|
|
|
|
BasicWorkStdFunc(std::function<void()> &&callback) : callback(std::move(callback))
|
|
{}
|
|
|
|
BasicWorkStdFunc(const std::function<void()> &callback) : callback(callback)
|
|
{}
|
|
|
|
BasicWorkStdFunc(const std::function<void()> &callback, const std::function<void()> &shutdown) : callback(callback), shutdown(shutdown)
|
|
{}
|
|
|
|
private:
|
|
#if !defined(_CPPSHARP)
|
|
void DispatchFrame(ProcessInfo &info) override
|
|
{
|
|
try
|
|
{
|
|
callback();
|
|
}
|
|
catch (...)
|
|
{
|
|
Debug::PrintError();
|
|
}
|
|
}
|
|
|
|
void Shutdown() override
|
|
{
|
|
try
|
|
{
|
|
if (shutdown)
|
|
{
|
|
shutdown();
|
|
}
|
|
}
|
|
catch (...)
|
|
{
|
|
Debug::PrintError();
|
|
}
|
|
}
|
|
#endif
|
|
};
|
|
|
|
|
|
#if !defined(_CPPSHARP)
|
|
/// @hideinitializer
|
|
struct BasicWorkCtx : WorkPriv
|
|
{
|
|
BasicWorkCtx()
|
|
{
|
|
magic = AuConvertMagicTag32("BWOT");
|
|
opt = nullptr;
|
|
}
|
|
void *opt;
|
|
};
|
|
|
|
/// @hideinitializer
|
|
template<typename Info_t = AVoid, typename Result_t = AVoid, typename Task_t = FTask<Info_t, Result_t>, typename Job_t = FJob<Info_t, Result_t>>
|
|
struct BasicWorkCallback : IWorkItemHandler, std::enable_shared_from_this<IWorkItemHandler>
|
|
{
|
|
BasicWorkCallback()
|
|
{
|
|
caller = GetAsyncApp()->GetCurrentThread();
|
|
}
|
|
|
|
BasicWorkCallback(Task_t &&task) : task(std::move(task))
|
|
{
|
|
caller = GetAsyncApp()->GetCurrentThread();
|
|
}
|
|
|
|
BasicWorkCallback(Task_t &&task, Job_t &&callback) : task(std::move(task)), callback(std::move(callback))
|
|
{
|
|
caller = GetAsyncApp()->GetCurrentThread();
|
|
}
|
|
|
|
BasicWorkCallback(const Task_t &task) : task(task)
|
|
{
|
|
caller = GetAsyncApp()->GetCurrentThread();
|
|
}
|
|
|
|
BasicWorkCallback(const Task_t &task, const Job_t &callback) : task(task), callback(callback)
|
|
{
|
|
caller = GetAsyncApp()->GetCurrentThread();
|
|
}
|
|
|
|
BasicWorkCallback(const Task_t &task, const Job_t &callback, const Info_t &info) : task(task), callback(callback), input(info)
|
|
{
|
|
caller = GetAsyncApp()->GetCurrentThread();
|
|
}
|
|
|
|
BasicWorkCallback(Task_t &&task, Job_t &&callback, Info_t &&info) : task(std::move(task)), callback(std::move(callback)), input(std::move(info))
|
|
{
|
|
caller = GetAsyncApp()->GetCurrentThread();
|
|
}
|
|
|
|
BasicWorkCallback(const Task_t &task, const Job_t &callback, Info_t &&info) : task(task), callback(callback), input(std::move(info))
|
|
{
|
|
caller = GetAsyncApp()->GetCurrentThread();
|
|
}
|
|
|
|
Info_t input;
|
|
Task_t task;
|
|
Job_t callback;
|
|
|
|
|
|
BasicWorkCallback<Info_t, Result_t, Task_t, Job_t> &SetTask(const Task_t &task)
|
|
{
|
|
this->task = task;
|
|
return *this;
|
|
}
|
|
|
|
BasicWorkCallback<Info_t, Result_t, Task_t, Job_t> &SetTask(const Job_t &callback)
|
|
{
|
|
this->callback = callback;
|
|
return *this;
|
|
}
|
|
|
|
private:
|
|
|
|
static constexpr bool IsCallbackPtr = std::is_pointer_v<Job_t> || AuIsBaseOfTemplate<std::shared_ptr, Job_t>::value;
|
|
static constexpr bool IsTaskPtr = std::is_pointer_v<Task_t> || AuIsBaseOfTemplate<std::shared_ptr, Task_t>::value;
|
|
|
|
WorkerId_t caller;
|
|
|
|
BasicWorkCtx secretContext_;
|
|
AuOptional<Result_t> resultValue_;
|
|
|
|
virtual void *GetPrivateData() override { return &secretContext_; }
|
|
|
|
void DispatchFrame(ProcessInfo &info) override
|
|
{
|
|
try
|
|
{
|
|
if constexpr (IsTaskPtr)
|
|
{
|
|
resultValue_ = task->onFrame(input);
|
|
}
|
|
else
|
|
{
|
|
resultValue_ = task.onFrame(input);
|
|
}
|
|
}
|
|
catch (...)
|
|
{
|
|
Debug::PrintError();
|
|
Shutdown();
|
|
}
|
|
|
|
auto pin = std::static_pointer_cast<std::remove_pointer_t<decltype(this)>>(this->shared_from_this());
|
|
|
|
std::function<void()> func = [pin]()
|
|
{
|
|
try
|
|
{
|
|
if (pin->resultValue_.has_value())
|
|
{
|
|
pin->secretContext_.opt = &pin->resultValue_.value();
|
|
if constexpr (IsCallbackPtr)
|
|
{
|
|
pin->callback->onSuccess(pin->input, *pin->resultValue_);
|
|
}
|
|
else
|
|
{
|
|
pin->callback.onSuccess(pin->input, *pin->resultValue_);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
pin->CallOnFailure(false);
|
|
}
|
|
}
|
|
catch (...)
|
|
{
|
|
Debug::PrintError();
|
|
}
|
|
};
|
|
|
|
try
|
|
{
|
|
if (caller == GetAsyncApp()->GetCurrentThread())
|
|
{
|
|
func();
|
|
}
|
|
else
|
|
{
|
|
std::function<void()> err = [pin]()
|
|
{
|
|
pin->CallOnFailure(false);
|
|
};
|
|
|
|
// TODO: this is somewhat evil. double alloc when we could reuse this
|
|
NewWorkItem(caller, AuMakeShared<BasicWorkStdFunc>(func, err))->Dispatch();
|
|
}
|
|
}
|
|
catch (...)
|
|
{
|
|
Debug::PrintError();
|
|
Shutdown();
|
|
}
|
|
}
|
|
|
|
void Shutdown() override
|
|
{
|
|
try
|
|
{
|
|
CallOnFailure(true);
|
|
}
|
|
catch (...)
|
|
{
|
|
Debug::PrintError();
|
|
}
|
|
}
|
|
|
|
void CallOnFailure(bool fail)
|
|
{
|
|
if constexpr (IsCallbackPtr)
|
|
{
|
|
if constexpr (AuIsBaseOfTemplate<std::function, decltype(callback->onFailure)>::value)
|
|
{
|
|
if (!callback->onFailure)
|
|
{
|
|
return;
|
|
}
|
|
}
|
|
|
|
callback->onFailure(input, fail);
|
|
}
|
|
else
|
|
{
|
|
if constexpr (AuIsBaseOfTemplate<std::function, decltype(callback.onFailure)>::value)
|
|
{
|
|
if (!callback.onFailure)
|
|
{
|
|
return;
|
|
}
|
|
}
|
|
callback.onFailure(input, fail);
|
|
}
|
|
}
|
|
};
|
|
|
|
/// @hideinitializer
|
|
template<typename Frame_t = std::function<void()>, typename Cleanup_t = std::function<void()>>
|
|
struct WorkItemCallable : IWorkItemHandler
|
|
{
|
|
Frame_t frame;
|
|
Cleanup_t cleanup;
|
|
|
|
private:
|
|
void DispatchFrame(ProcessInfo &info) override
|
|
{
|
|
frame();
|
|
info.type = IWorkItemHandler::EProcessNext::eFinished;
|
|
}
|
|
|
|
void Shutdown() override
|
|
{
|
|
cleanup();
|
|
}
|
|
};
|
|
|
|
// Helper macros for the Translate* templates below; both expect a template parameter named T
// to be in scope and are #undef'd after their last use.
//
// ASYNC_ERROR(exp): for T == bool, pushes the error and returns a value-initialised T (false);
//                   otherwise throws std::string(exp).
#define ASYNC_ERROR(exp)                            \
    {                                               \
        if constexpr (std::is_same_v<T, bool>)      \
        {                                           \
            SysPushErrorGen(exp);                   \
            return {};                              \
        }                                           \
        else                                        \
        {                                           \
            throw std::string(exp);                 \
        }                                           \
    }

// ASYNC_FINISH: for T == bool, returns true; otherwise expands to nothing observable.
#define ASYNC_FINISH                                \
    {                                               \
        if constexpr (std::is_same_v<T, bool>)      \
        {                                           \
            return true;                            \
        }                                           \
    }
|
|
|
|
/// Turns `func` into a callable that, when invoked, queues `func(args...)` as a work item on
/// worker `id` instead of running it in place.
///
/// @tparam T  bool or void. With bool the returned callable yields true once the work item was
///            dispatched and false (after pushing an error) when an allocation failed; with void
///            an allocation failure throws std::string.
/// @param id    worker that will run the call
/// @param func  function to run; it is copied into the returned callable
template<typename T = void, typename... Args, AU_TEMPLATE_ENABLE_WHEN(std::is_same_v<T, bool> || std::is_void<T>::value)>
static std::function<T(Args&&...)> TranslateAsyncFunctionToDispatcherWithThread(WorkerId_t id, std::function<void(Args...)> func)
{
    return [id, func](Args&&... in) -> T
    {
        // The arguments are copied into the handler so they outlive this call.
        auto handler = AuMakeShared<BasicWorkStdFunc>([=]() -> void
        {
            func(in...);
        });
        if (!handler) ASYNC_ERROR("can't dispatch async call; out of memory");

        auto item = NewWorkItem(id, handler);
        if (!item) ASYNC_ERROR("can't dispatch async call; out of memory");

        item->Dispatch();
        ASYNC_FINISH;
    };
}
|
|
|
|
/// Same as TranslateAsyncFunctionToDispatcherWithThread, targeting the worker that calls this
/// function (GetAsyncApp()->GetCurrentThread() at the time of the call).
///
/// T is forwarded explicitly. Without it the helper always used its default T = void, so the
/// returned std::function<void(...)> could not convert to std::function<bool(...)> and
/// T = bool never compiled.
template<typename T = void, typename... Args, AU_TEMPLATE_ENABLE_WHEN(std::is_same_v<T, bool> || std::is_void<T>::value)>
static std::function<T(Args&&...)> TranslateAsyncFunctionToDispatcher(std::function<void(Args...)> func)
{
    // Args is deduced from func's type, so only T needs to be spelled out.
    return TranslateAsyncFunctionToDispatcherWithThread<T>(GetAsyncApp()->GetCurrentThread(), func);
}
|
|
|
|
/// Turns `func`, which returns AuOptional<B>, into a callable taking a completion callback plus
/// the original arguments. Invoking it queues `func(args...)` on worker `id`; when the function
/// yields a value, `callback` receives it through BasicWorkCallback's onSuccess path.
///
/// @tparam B  result type carried by the optional (the default of void is not usable)
/// @tparam T  bool or void; must be given explicitly. With bool the callable yields true once
///            dispatched and false (after pushing an error) on allocation failure; with void an
///            allocation failure throws std::string.
///
/// Fixes relative to the previous revision, which could not be instantiated:
///  - the handler is a shared pointer, so members are reached with `->`;
///  - the task member is `onFrame` (FTask has no `onProcess`);
///  - the lambda's callback parameter matches the declared std::function<void(const B&)>.
template<typename B = void, typename T, typename... Args, AU_TEMPLATE_ENABLE_WHEN(std::is_same_v<T, bool> || std::is_void<T>::value)>
static std::function<T(std::function<void(const B&)>, Args...)> TranslateAsyncReturnableFunctionToDispatcherWithThread(WorkerId_t id, std::function<AuOptional<B>(Args...)> func)
{
    return [=](std::function<void(const B&)> callback, Args... in) -> T
    {
        auto work = AuMakeShared<BasicWorkCallback<AVoid, B>>();
        if (!work) ASYNC_ERROR("can't dispatch async call; out of memory");

        // The arguments are copied into the task so they outlive this call.
        work->task.onFrame = [=](const AVoid &) -> AuOptional<B>
        {
            return func(in...);
        };

        work->callback.onSuccess = [=](const AVoid &, const B &ret)
        {
            callback(ret);
        };

        auto workItem = NewWorkItem(id, work);
        if (!workItem) ASYNC_ERROR("can't dispatch async call; out of memory");

        workItem->Dispatch();
        ASYNC_FINISH;
    };
}
|
|
|
|
/// Same as TranslateAsyncReturnableFunctionToDispatcherWithThread, targeting the worker that
/// calls this function (GetAsyncApp()->GetCurrentThread() at the time of the call).
///
/// B and T are forwarded explicitly: T appears only in the helper's return type and has no
/// default, so it cannot be deduced and the previous argument-less call never compiled.
template<typename B = void, typename T, typename... Args, AU_TEMPLATE_ENABLE_WHEN(std::is_same_v<T, bool> || std::is_void<T>::value)>
static std::function<T(std::function<void(const B&)>, Args...)> TranslateAsyncReturnableFunctionToDispatcher(std::function<AuOptional<B>(Args...)> func)
{
    // Args is deduced from func's type.
    return TranslateAsyncReturnableFunctionToDispatcherWithThread<B, T>(GetAsyncApp()->GetCurrentThread(), func);
}
|
|
|
|
template<typename Info_t = AVoid, typename Result_t = AVoid, typename Task_t = FTask<Info_t, Result_t>, typename Job_t = FJob<Info_t, Result_t>>
|
|
static AuSPtr<Async::IWorkItem> DispatchBasicWorkCallback(const WorkerId_t &worker, const Task_t &task, const Job_t &job, bool enableWait = false)
|
|
{
|
|
return Async::NewWorkItem(worker, AuMakeShared<Async::BasicWorkCallback<Info_t, Result_t>>(task, job), enableWait)->Dispatch();
|
|
}
|
|
|
|
template<typename Info_t = AVoid, typename Result_t = AVoid, typename Task_t = FTask<Info_t, Result_t>, typename Job_t = FJob<Info_t, Result_t>>
|
|
static AuSPtr<Async::IWorkItem> DispatchBasicWorkCallback(const WorkerId_t &worker, const Task_t &task, const Job_t &job, const Info_t &inputParameters, bool enableWait = false)
|
|
{
|
|
return Async::NewWorkItem(worker, AuMakeShared<Async::BasicWorkCallback<Info_t, Result_t>>(task, job, inputParameters), enableWait)->Dispatch();
|
|
}
|
|
|
|
#undef ASYNC_ERROR
|
|
#undef ASYNC_FINISH
|
|
|
|
#endif
|
|
|
|
#pragma endregion EASE_OF_READING
|
|
}
|