[+] Begin work on the Async subsystem
This commit is contained in:
parent
95b6854f92
commit
452b49b610
@ -154,8 +154,8 @@ namespace Aurora::Async
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
||||||
static constexpr bool IsCallbackPtr = std::is_pointer_v<Job_t> || is_base_of_template<AuSPtr, Job_t>;
|
static constexpr bool IsCallbackPtr = std::is_pointer_v<Job_t> || is_base_of_template<std::shared_ptr, Job_t>::value;
|
||||||
static constexpr bool IsTaskPtr = std::is_pointer_v<Task_t> || is_base_of_template<AuSPtr, Task_t>;
|
static constexpr bool IsTaskPtr = std::is_pointer_v<Task_t> || is_base_of_template<std::shared_ptr, Task_t>::value;
|
||||||
|
|
||||||
WorkerId_t caller;
|
WorkerId_t caller;
|
||||||
|
|
||||||
@ -172,9 +172,9 @@ namespace Aurora::Async
|
|||||||
ret = task.onFrame(input);
|
ret = task.onFrame(input);
|
||||||
}
|
}
|
||||||
|
|
||||||
auto pin = this->shared_from_this();
|
auto pin = std::static_pointer_cast<std::remove_pointer_t<decltype(this)>>(this->shared_from_this());
|
||||||
|
|
||||||
std::function<void()> func = [ret, &callback, &input, pin]()
|
std::function<void()> func = [ret, pin]()
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
@ -182,23 +182,16 @@ namespace Aurora::Async
|
|||||||
{
|
{
|
||||||
if constexpr (IsCallbackPtr)
|
if constexpr (IsCallbackPtr)
|
||||||
{
|
{
|
||||||
callback->onSuccess(input, *ret);
|
pin->callback->onSuccess(pin->input, *ret);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
callback.onSuccess(input, *ret);
|
pin->callback.onSuccess(pin->input, *ret);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
if constexpr (IsCallbackPtr)
|
pin->CallOnFailure(false);
|
||||||
{
|
|
||||||
callback->onFailure(input, false);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
callback.onFailure(input, false);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (...)
|
catch (...)
|
||||||
@ -215,21 +208,13 @@ namespace Aurora::Async
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
std::function<void()> err = [&callback, &input, pin]()
|
std::function<void()> err = [pin]()
|
||||||
{
|
{
|
||||||
// TODO: we shold probably pass: is caller thread: false, has finished: true
|
pin->CallOnFailure(false);
|
||||||
if constexpr (IsCallbackPtr)
|
|
||||||
{
|
|
||||||
callback->onFailure(input, false);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
callback.onFailure(input, false);
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
// TODO: this is somewhat evil. double alloc when we could reuse this
|
// TODO: this is somewhat evil. double alloc when we could reuse this
|
||||||
WorkItemShared(caller, std::make_shared<BasicWorkStdFunc>(func, err))->Dispatch();
|
NewWorkItem(caller, std::make_shared<BasicWorkStdFunc>(func, err))->Dispatch();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (...)
|
catch (...)
|
||||||
@ -243,20 +228,40 @@ namespace Aurora::Async
|
|||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
if constexpr (IsCallbackPtr)
|
CallOnFailure(true);
|
||||||
{
|
|
||||||
callback->onFailure(true);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
callback.onFailure(true);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
catch (...)
|
catch (...)
|
||||||
{
|
{
|
||||||
Debug::PrintError();
|
Debug::PrintError();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void CallOnFailure(bool fail)
|
||||||
|
{
|
||||||
|
if constexpr (IsCallbackPtr)
|
||||||
|
{
|
||||||
|
if constexpr (is_base_of_template<std::function, decltype(callback->onFailure)>::value)
|
||||||
|
{
|
||||||
|
if (!callback->onFailure)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
callback->onFailure(input, fail);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
if constexpr (is_base_of_template<std::function, decltype(callback.onFailure)>::value)
|
||||||
|
{
|
||||||
|
if (!callback.onFailure)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
callback.onFailure(input, fail);
|
||||||
|
}
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
template<typename Frame_t = std::function<void()>, typename Cleanup_t = std::function<void()>>
|
template<typename Frame_t = std::function<void()>, typename Cleanup_t = std::function<void()>>
|
||||||
@ -288,6 +293,7 @@ namespace Aurora::Async
|
|||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
// Main thread logic
|
// Main thread logic
|
||||||
|
virtual void Start() = 0;
|
||||||
virtual void Main() = 0;
|
virtual void Main() = 0;
|
||||||
virtual void Shutdown() = 0;
|
virtual void Shutdown() = 0;
|
||||||
virtual bool Exiting() = 0;
|
virtual bool Exiting() = 0;
|
||||||
@ -295,14 +301,14 @@ namespace Aurora::Async
|
|||||||
// Spawning
|
// Spawning
|
||||||
virtual bool Spawn(WorkerId_t) = 0;
|
virtual bool Spawn(WorkerId_t) = 0;
|
||||||
virtual Threading::Threads::ThreadShared_t ResolveHandle(WorkerId_t) = 0;
|
virtual Threading::Threads::ThreadShared_t ResolveHandle(WorkerId_t) = 0;
|
||||||
virtual AuBST<ThreadGroup_t, AuList<AuUInt8>> GetThreads() = 0;
|
virtual AuBST<ThreadGroup_t, AuList<ThreadId_t>> GetThreads() = 0;
|
||||||
virtual WorkerId_t GetCurrentThread() = 0;
|
virtual WorkerId_t GetCurrentThread() = 0;
|
||||||
|
|
||||||
// Synchronization
|
// Synchronization
|
||||||
virtual bool Sync(ThreadGroup_t group, bool requireSignal = false, AuUInt32 timeout = 0) = 0;
|
virtual bool Sync(ThreadGroup_t group, bool requireSignal = false, AuUInt32 timeout = 0) = 0;
|
||||||
virtual void Signal(ThreadGroup_t group) = 0;
|
virtual void Signal(ThreadGroup_t group) = 0;
|
||||||
|
|
||||||
virtual bool WaitFor(DispatchTarget_t unlocker, Threading::IWaitable *primitive, int ms) = 0; // when unlocker = this, pump event loop
|
virtual bool WaitFor(WorkerId_t unlocker, Threading::IWaitable *primitive, int ms) = 0; // when unlocker = this, pump event loop
|
||||||
|
|
||||||
virtual bool SyncTimeout(ThreadGroup_t group, AuUInt32 ms) = 0;
|
virtual bool SyncTimeout(ThreadGroup_t group, AuUInt32 ms) = 0;
|
||||||
|
|
||||||
|
@ -13,17 +13,29 @@ namespace Aurora::Threading
|
|||||||
class LockGuardPtr
|
class LockGuardPtr
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
LockGuardPtr(T *lock) : lockReference_(lock)
|
LockGuardPtr(T *lock) : annoying_(lock)
|
||||||
{
|
{
|
||||||
lockReference_->Lock();
|
lock->Lock();
|
||||||
|
}
|
||||||
|
|
||||||
|
LockGuardPtr(T &lock) : annoying_(&lock)
|
||||||
|
{
|
||||||
|
lock->Lock();
|
||||||
}
|
}
|
||||||
|
|
||||||
~LockGuardPtr()
|
~LockGuardPtr()
|
||||||
{
|
{
|
||||||
lockReference_->Unlock();
|
if constexpr (is_base_of_template<std::shared_ptr, T>::value || is_base_of_template<std::unique_ptr, T>::value)
|
||||||
|
{
|
||||||
|
annoying_->get()->Unlock();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
annoying_->Unlock();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
T *lockReference_;
|
T *annoying_;
|
||||||
};
|
};
|
||||||
}
|
}
|
@ -9,9 +9,6 @@
|
|||||||
|
|
||||||
namespace Aurora::Threading::Threads
|
namespace Aurora::Threading::Threads
|
||||||
{
|
{
|
||||||
template<typename T>
|
// TODO: #if defined(fibers) static TLSVariable<type, true> name #endif
|
||||||
class TLSStaticVariable
|
#define STATIC_TLS(type, name) static thread_local type name;
|
||||||
{
|
|
||||||
// TODO: implement templates for a thread feature based tls implementation
|
|
||||||
};
|
|
||||||
}
|
}
|
@ -9,7 +9,7 @@
|
|||||||
|
|
||||||
namespace Aurora::Threading::Threads
|
namespace Aurora::Threading::Threads
|
||||||
{
|
{
|
||||||
template<typename T>
|
template<typename T, bool isStatic = false>
|
||||||
class TLSVariable
|
class TLSVariable
|
||||||
{
|
{
|
||||||
private:
|
private:
|
||||||
@ -19,18 +19,47 @@ namespace Aurora::Threading::Threads
|
|||||||
TLSVariable() {}
|
TLSVariable() {}
|
||||||
~TLSVariable()
|
~TLSVariable()
|
||||||
{
|
{
|
||||||
GetThread()->GetTlsView()->Remove(GetHandle());
|
if constexpr (!isStatic)
|
||||||
|
{
|
||||||
|
GetThread()->GetTlsView()->Remove(GetHandle());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
T& operator ->()
|
||||||
|
{
|
||||||
|
return Get();
|
||||||
|
}
|
||||||
|
|
||||||
|
operator T&()
|
||||||
|
{
|
||||||
|
return Get();
|
||||||
|
}
|
||||||
|
|
||||||
|
TLSVariable& operator =(const T & val)
|
||||||
|
{
|
||||||
|
Get() = val;
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
|
||||||
AuUInt64 GetHandle()
|
AuUInt64 GetHandle()
|
||||||
{
|
{
|
||||||
return (AuUInt64(reinterpret_cast<AuUInt>(&_)) & (~kTlsKeyMask)) | kTlsKeyFollowsConvention | kTlsKeyResettablePointerHandle;
|
if constexpr (isStatic)
|
||||||
|
{
|
||||||
|
return (AuUInt64(reinterpret_cast<AuUInt>(&_)) & (~kTlsKeyMask)) | kTlsKeyFollowsConvention | kTlsKeyStaticPointerHandle;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
return (AuUInt64(reinterpret_cast<AuUInt>(&_)) & (~kTlsKeyMask)) | kTlsKeyFollowsConvention | kTlsKeyResettablePointerHandle;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
T &Get()
|
T &Get()
|
||||||
{
|
{
|
||||||
auto view = GetThread()->GetTlsView();
|
auto view = GetThread()->GetTlsView();
|
||||||
auto ptr = view->GetOrSetup(GetHandle(),
|
auto ptr = view->GetOrSetup(GetHandle(),
|
||||||
|
sizeof(T),
|
||||||
[](void *buffer) -> void
|
[](void *buffer) -> void
|
||||||
{
|
{
|
||||||
if constexpr (std::is_class_v<T>)
|
if constexpr (std::is_class_v<T>)
|
||||||
@ -51,16 +80,6 @@ namespace Aurora::Threading::Threads
|
|||||||
});
|
});
|
||||||
return *reinterpret_cast<T *>(ptr);
|
return *reinterpret_cast<T *>(ptr);
|
||||||
}
|
}
|
||||||
|
|
||||||
T& operator ->()
|
|
||||||
{
|
|
||||||
return Get();
|
|
||||||
}
|
|
||||||
|
|
||||||
TLSVariable& operator =(const T & val)
|
|
||||||
{
|
|
||||||
Get() = val;
|
|
||||||
return *this;
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
}
|
}
|
@ -73,4 +73,6 @@ name ## Shared_t name ## Shared(T... args) \
|
|||||||
|
|
||||||
#if !defined(NO__NEW)
|
#if !defined(NO__NEW)
|
||||||
#define _new new (std::nothrow)
|
#define _new new (std::nothrow)
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
//#define AU_FWD(var) std::forward<decltype(var)>(var)
|
||||||
|
@ -96,7 +96,7 @@ static inline AuString ToUpper(const AuString &in)
|
|||||||
return ToStringASCIIOp<int(*)(int)>(std::toupper, in);
|
return ToStringASCIIOp<int(*)(int)>(std::toupper, in);
|
||||||
}
|
}
|
||||||
|
|
||||||
template<typename Map, typename Key, typename Value>
|
template<typename Map, class Key, typename Value>
|
||||||
static inline bool TryFind(Map &map, const Key &key, Value *&ptr)
|
static inline bool TryFind(Map &map, const Key &key, Value *&ptr)
|
||||||
{
|
{
|
||||||
auto itr = map.find(key);
|
auto itr = map.find(key);
|
||||||
@ -112,7 +112,7 @@ static inline bool TryFind(Map &map, const Key &key, Value *&ptr)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
template<typename Map, typename Key, typename Value>
|
template<typename Map, class Key, typename Value>
|
||||||
static inline bool TryFind(Map *map, const Key &key, Value *&ptr)
|
static inline bool TryFind(Map *map, const Key &key, Value *&ptr)
|
||||||
{
|
{
|
||||||
auto itr = map->find(key);
|
auto itr = map->find(key);
|
||||||
@ -128,7 +128,7 @@ static inline bool TryFind(Map *map, const Key &key, Value *&ptr)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
template<typename Map, typename Key>
|
template<typename Map, class Key>
|
||||||
static inline bool TryFind(Map &map, const Key &key)
|
static inline bool TryFind(Map &map, const Key &key)
|
||||||
{
|
{
|
||||||
auto itr = map.find(key);
|
auto itr = map.find(key);
|
||||||
@ -142,7 +142,7 @@ static inline bool TryFind(Map &map, const Key &key)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
template<typename Map, typename Key>
|
template<typename Map, class Key>
|
||||||
static inline bool TryFind(Map *map, const Key &key)
|
static inline bool TryFind(Map *map, const Key &key)
|
||||||
{
|
{
|
||||||
auto itr = map->find(key);
|
auto itr = map->find(key);
|
||||||
@ -155,7 +155,7 @@ static inline bool TryFind(Map *map, const Key &key)
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
template<typename Map, typename Key, typename Value>
|
template<typename Map, class Key, typename Value>
|
||||||
static inline bool TryFindGeneric(Map &map, const Key &key, Value *&ptr)
|
static inline bool TryFindGeneric(Map &map, const Key &key, Value *&ptr)
|
||||||
{
|
{
|
||||||
auto itr = map.find(key);
|
auto itr = map.find(key);
|
||||||
@ -171,7 +171,7 @@ static inline bool TryFindGeneric(Map &map, const Key &key, Value *&ptr)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
template<typename Map, typename Key>
|
template<typename Map, class Key>
|
||||||
static inline bool TryDelete(Map &map, const Key &key)
|
static inline bool TryDelete(Map &map, const Key &key)
|
||||||
{
|
{
|
||||||
auto itr = map.find(key);
|
auto itr = map.find(key);
|
||||||
@ -186,7 +186,7 @@ static inline bool TryDelete(Map &map, const Key &key)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
template<typename List, typename Key>
|
template<typename List, class Key>
|
||||||
static inline bool TryDeleteList(List &list, const Key &key)
|
static inline bool TryDeleteList(List &list, const Key &key)
|
||||||
{
|
{
|
||||||
auto itr = std::find(list.begin(), list.end(), key);
|
auto itr = std::find(list.begin(), list.end(), key);
|
||||||
@ -363,6 +363,8 @@ static inline AuList<AuString> SplitString(const AuString& str, const AuString&
|
|||||||
return tokens;
|
return tokens;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// more copy/pasta. work smart, not hard.
|
||||||
|
// i dont want to waste time working out template kinks between clang and msvc
|
||||||
template < template <typename...> class base,typename derived>
|
template < template <typename...> class base,typename derived>
|
||||||
struct is_base_of_template_impl
|
struct is_base_of_template_impl
|
||||||
{
|
{
|
||||||
@ -373,4 +375,4 @@ struct is_base_of_template_impl
|
|||||||
};
|
};
|
||||||
|
|
||||||
template < template <typename...> class base,typename derived>
|
template < template <typename...> class base,typename derived>
|
||||||
using is_base_of_template = typename is_base_of_template_impl<base,derived>::type;
|
using is_base_of_template = typename is_base_of_template_impl<base,derived>::type;
|
@ -9,5 +9,38 @@
|
|||||||
|
|
||||||
namespace Aurora::Async
|
namespace Aurora::Async
|
||||||
{
|
{
|
||||||
|
class IAsyncRunnable
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
virtual void RunAsync() = 0;
|
||||||
|
|
||||||
|
virtual void CancelAsync() {}
|
||||||
|
};
|
||||||
|
|
||||||
|
class AsyncFuncRunnable : public IAsyncRunnable
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
|
||||||
|
std::function<void()> callback;
|
||||||
|
|
||||||
|
AsyncFuncRunnable(std::function<void()> &&callback) : callback(std::move(callback))
|
||||||
|
{}
|
||||||
|
|
||||||
|
AsyncFuncRunnable(const std::function<void()> &callback) : callback(callback)
|
||||||
|
{}
|
||||||
|
|
||||||
|
void RunAsync() override
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
callback();
|
||||||
|
}
|
||||||
|
catch (...)
|
||||||
|
{
|
||||||
|
Debug::PrintError();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
void InitAsync();
|
void InitAsync();
|
||||||
}
|
}
|
@ -6,9 +6,562 @@
|
|||||||
Author: Reece
|
Author: Reece
|
||||||
***/
|
***/
|
||||||
#include <RuntimeInternal.hpp>
|
#include <RuntimeInternal.hpp>
|
||||||
|
#include "Async.hpp"
|
||||||
#include "AsyncApp.hpp"
|
#include "AsyncApp.hpp"
|
||||||
|
#include "WorkItem.hpp"
|
||||||
|
|
||||||
namespace Aurora::Async
|
namespace Aurora::Async
|
||||||
{
|
{
|
||||||
|
static AsyncApp gAsyncApp;
|
||||||
|
static std::atomic_int gRunningTasks {};
|
||||||
|
|
||||||
|
//STATIC_TLS(WorkerId_t, tlsWorkerId);
|
||||||
|
static Threading::Threads::TLSVariable<WorkerId_t, true> tlsWorkerId;
|
||||||
|
|
||||||
|
struct ThreadState
|
||||||
|
{
|
||||||
|
WorkerId_t id;
|
||||||
|
|
||||||
|
AuUInt8 multipopCount = 1;
|
||||||
|
AuUInt32 lastFrameTime {};
|
||||||
|
|
||||||
|
Threading::Threads::ThreadShared_t threadObject;
|
||||||
|
|
||||||
|
AuWPtr<GroupState> parent;
|
||||||
|
|
||||||
|
Threading::Primitives::SemaphoreUnique_t syncSema;
|
||||||
|
AuList<AuSPtr<Threading::Threads::IThreadFeature>> features;
|
||||||
|
|
||||||
|
bool rejecting {};
|
||||||
|
bool exiting {};
|
||||||
|
|
||||||
|
Threading::Primitives::EventUnique_t running;
|
||||||
|
//bool running;
|
||||||
|
|
||||||
|
bool inline IsSysThread()
|
||||||
|
{
|
||||||
|
return id.first == 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
AuList<AuSPtr<IAsyncRunnable>> pendingWorkItems;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct GroupState
|
||||||
|
{
|
||||||
|
ThreadGroup_t group;
|
||||||
|
|
||||||
|
Threading::Primitives::ConditionMutexUnique_t cvWorkMutex;
|
||||||
|
Threading::Primitives::ConditionVariableUnique_t cvVariable;
|
||||||
|
|
||||||
|
using WorkEntry_t = std::pair<std::optional<ThreadId_t>, AuSPtr<IAsyncRunnable>>;
|
||||||
|
AuList<WorkEntry_t> workQueue;
|
||||||
|
|
||||||
|
AuBST<ThreadId_t, AuSPtr<ThreadState>> workers;
|
||||||
|
|
||||||
|
bool Init();
|
||||||
|
|
||||||
|
bool inline IsSysThread()
|
||||||
|
{
|
||||||
|
return group == 0;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
bool GroupState::Init()
|
||||||
|
{
|
||||||
|
cvWorkMutex = Threading::Primitives::ConditionMutexUnique();
|
||||||
|
if (!cvWorkMutex)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
cvVariable = Threading::Primitives::ConditionVariableUnique(cvWorkMutex.get());
|
||||||
|
if (!cvVariable)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
AsyncApp::AsyncApp()
|
||||||
|
{
|
||||||
|
rwlock_ = Threading::Primitives::RWLockUnique();
|
||||||
|
SysAssert(static_cast<bool>(rwlock_), "Couldn't initialize AsyncApp. Unable to allocate an RWLock");
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: barrier multiple
|
||||||
|
bool AsyncApp::Barrier(WorkerId_t worker, AuUInt32 ms, bool requireSignal, bool drop)
|
||||||
|
{
|
||||||
|
auto & semaphore = GetThreadState()->syncSema;
|
||||||
|
auto unsafeSemaphore = semaphore.get();
|
||||||
|
|
||||||
|
auto work = std::make_shared</*Async::BasicWorkStdFunc*/AsyncFuncRunnable>(([=]()
|
||||||
|
{
|
||||||
|
auto state = GetThreadState();
|
||||||
|
|
||||||
|
if (drop)
|
||||||
|
{
|
||||||
|
state->rejecting = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (requireSignal)
|
||||||
|
{
|
||||||
|
state->running->Reset();
|
||||||
|
}
|
||||||
|
|
||||||
|
unsafeSemaphore->Unlock(1);
|
||||||
|
}));
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
NewWorkItem({worker.first, worker.second}, work)->Dispatch();
|
||||||
|
#else
|
||||||
|
Run(worker, work);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
return WaitFor(worker, semaphore.get(), ms);
|
||||||
|
}
|
||||||
|
|
||||||
|
void AsyncApp::Run(DispatchTarget_t target, AuSPtr<IAsyncRunnable> runnable)
|
||||||
|
{
|
||||||
|
auto state = GetGroup(target.first);
|
||||||
|
SysAssert(static_cast<bool>(state), "couldn't dispatch a task to an offline group");
|
||||||
|
|
||||||
|
gRunningTasks++;
|
||||||
|
|
||||||
|
{
|
||||||
|
Threading::LockGuardPtr lol(state->cvWorkMutex);
|
||||||
|
|
||||||
|
#if defined(INTERNAL) || defined(DEBUG)
|
||||||
|
Threading::LockGuardPtr lock(rwlock_->AsReadable());
|
||||||
|
|
||||||
|
if (target.second.has_value())
|
||||||
|
{
|
||||||
|
if (state->workers[*target.second]->rejecting)
|
||||||
|
{
|
||||||
|
SysPushErrorGen("worker: {}:{} is offline", target.first, target.second.value_or(0));
|
||||||
|
throw "Requested job worker is offline";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
auto workers = state->workers;
|
||||||
|
bool found = false;
|
||||||
|
|
||||||
|
for (const auto &worker : state->workers)
|
||||||
|
{
|
||||||
|
if (!worker.second->rejecting)
|
||||||
|
{
|
||||||
|
found = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!found)
|
||||||
|
{
|
||||||
|
throw "No workers available";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
state->workQueue.push_back(std::make_pair(target.second, runnable));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (target.second.has_value())
|
||||||
|
{
|
||||||
|
// sad :(
|
||||||
|
state->cvVariable->Broadcast();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
state->cvVariable->Signal();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool AsyncApp::Poll(bool blocking)
|
||||||
|
{
|
||||||
|
auto state = GetThreadState();
|
||||||
|
auto group = state->parent.lock();
|
||||||
|
|
||||||
|
state->pendingWorkItems.clear();
|
||||||
|
|
||||||
|
{
|
||||||
|
Threading::LockGuardPtr lol(group->cvWorkMutex);
|
||||||
|
|
||||||
|
do
|
||||||
|
{
|
||||||
|
for (auto itr = group->workQueue.begin();
|
||||||
|
((itr != group->workQueue.end()) &&
|
||||||
|
(state->pendingWorkItems.size() < state->multipopCount));
|
||||||
|
)
|
||||||
|
{
|
||||||
|
if (!itr->first.has_value())
|
||||||
|
{
|
||||||
|
state->pendingWorkItems.push_back((*itr).second);
|
||||||
|
itr = group->workQueue.erase(itr);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((itr->first.has_value()) && (itr->first.value() == state->id.second))
|
||||||
|
{
|
||||||
|
state->pendingWorkItems.push_back((*itr).second);
|
||||||
|
itr = group->workQueue.erase(itr);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
itr++;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!blocking)
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (state->pendingWorkItems.empty())
|
||||||
|
{
|
||||||
|
group->cvVariable->WaitForSignal();
|
||||||
|
}
|
||||||
|
|
||||||
|
} while (state->pendingWorkItems.empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
if (state->pendingWorkItems.empty())
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
int runningTasks {};
|
||||||
|
|
||||||
|
for (auto itr = state->pendingWorkItems.begin(); itr != state->pendingWorkItems.end(); )
|
||||||
|
{
|
||||||
|
// TODO: we should be able to implement a watchdog later down the line
|
||||||
|
state->lastFrameTime = Time::CurrentClockMS();
|
||||||
|
(*itr)->RunAsync();
|
||||||
|
itr = state->pendingWorkItems.erase(itr);
|
||||||
|
runningTasks = --gRunningTasks;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (runningTasks)
|
||||||
|
{
|
||||||
|
ShutdownOutOfTasks();
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
bool AsyncApp::WaitFor(WorkerId_t worker, Aurora::Threading::IWaitable *primitive, int timeoutMs)
|
||||||
|
{
|
||||||
|
auto curThread = GetThreadState();
|
||||||
|
|
||||||
|
if (worker == curThread->id)
|
||||||
|
{
|
||||||
|
// TODO: timeout isn't respected here as well
|
||||||
|
while (!Threading::WaitFor(primitive, 2))
|
||||||
|
{
|
||||||
|
while (this->Poll(false));
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
return Threading::WaitFor(primitive, timeoutMs);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void AsyncApp::Start()
|
||||||
|
{
|
||||||
|
SysAssert(Spawn({0, 0}));
|
||||||
|
}
|
||||||
|
|
||||||
|
void AsyncApp::Main()
|
||||||
|
{
|
||||||
|
Entrypoint({0, 0});
|
||||||
|
}
|
||||||
|
|
||||||
|
void AsyncApp::Shutdown()
|
||||||
|
{
|
||||||
|
// Set shutdown trigger
|
||||||
|
shuttingdown_ = true;
|
||||||
|
|
||||||
|
// Unpause all
|
||||||
|
for (auto &re : this->threads_)
|
||||||
|
{
|
||||||
|
for (auto &[id, worker] : re.second->workers)
|
||||||
|
{
|
||||||
|
auto &event = worker->running;
|
||||||
|
if (event)
|
||||||
|
{
|
||||||
|
event->Set();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Drop all tasks from this point onwards
|
||||||
|
for (auto &re : this->threads_)
|
||||||
|
{
|
||||||
|
for (auto &[id, worker] : re.second->workers)
|
||||||
|
{
|
||||||
|
SysAssert(Barrier(worker->id, 0, true, true));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: abort all pending tests
|
||||||
|
|
||||||
|
// Signal the event loop to abort
|
||||||
|
ShutdownOutOfTasks();
|
||||||
|
}
|
||||||
|
|
||||||
|
void AsyncApp::ShutdownOutOfTasks()
|
||||||
|
{
|
||||||
|
for (auto& [id, group]: this->threads_)
|
||||||
|
{
|
||||||
|
for (auto & [id, worker] : group->workers)
|
||||||
|
{
|
||||||
|
worker->threadObject->SendExitSignal();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (group->cvVariable)
|
||||||
|
{
|
||||||
|
Threading::LockGuardPtr lol(group->cvWorkMutex);
|
||||||
|
group->cvVariable->Broadcast();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool AsyncApp::Exiting()
|
||||||
|
{
|
||||||
|
return shuttingdown_ || GetThreadState()->exiting;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool AsyncApp::Spawn(WorkerId_t workerId)
|
||||||
|
{
|
||||||
|
Threading::LockGuardPtr lock(rwlock_->AsWritable());
|
||||||
|
|
||||||
|
AuSPtr<GroupState> group;
|
||||||
|
|
||||||
|
// Try fetch or allocate group
|
||||||
|
{
|
||||||
|
AuSPtr<GroupState>* groupPtr;
|
||||||
|
if (!TryFind(this->threads_, workerId.first, groupPtr))
|
||||||
|
{
|
||||||
|
group = std::make_shared<GroupState>();
|
||||||
|
|
||||||
|
if (!group->Init())
|
||||||
|
{
|
||||||
|
SysPushErrorMem("Not enough memory to intiialize a new group state");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!TryInsert(this->threads_, std::make_pair(workerId.first, group)))
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
group = *groupPtr;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Assert worker does not already exist
|
||||||
|
{
|
||||||
|
AuSPtr<ThreadState>* ret;
|
||||||
|
|
||||||
|
if (TryFind(group->workers, workerId.second, ret))
|
||||||
|
{
|
||||||
|
SysPushErrorGen("Thread ID already exists");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
auto threadState = std::make_shared<ThreadState>();
|
||||||
|
threadState->parent = group;
|
||||||
|
threadState->running = Threading::Primitives::EventUnique(true, false, false);
|
||||||
|
threadState->syncSema = Threading::Primitives::SemaphoreUnique(0);
|
||||||
|
threadState->id = workerId;
|
||||||
|
|
||||||
|
if (!threadState->IsSysThread())
|
||||||
|
{
|
||||||
|
Threading::Threads::AbstractThreadVectors handler;
|
||||||
|
handler.DoRun = [=](const Threading::Threads::IAuroraThread *thread)
|
||||||
|
{
|
||||||
|
Entrypoint(threadState->id);
|
||||||
|
};
|
||||||
|
threadState->threadObject = Threading::Threads::ThreadUnique(handler);
|
||||||
|
threadState->threadObject->Run();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
threadState->threadObject = std::shared_ptr<Threading::Threads::IAuroraThread>(Threading::Threads::GetThread(), [](Threading::Threads::IAuroraThread *){});
|
||||||
|
}
|
||||||
|
|
||||||
|
group->workers.insert(std::make_pair(workerId.second, threadState));
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
Threading::Threads::ThreadShared_t AsyncApp::ResolveHandle(WorkerId_t id)
|
||||||
|
{
|
||||||
|
auto group = GetGroup(id.first);
|
||||||
|
if (!group)
|
||||||
|
{
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
|
||||||
|
AuSPtr<ThreadState>* ret;
|
||||||
|
if (!TryFind(group->workers, id.second, ret))
|
||||||
|
{
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret->get()->threadObject;
|
||||||
|
}
|
||||||
|
|
||||||
|
AuBST<ThreadGroup_t, AuList<ThreadId_t>> AsyncApp::GetThreads()
|
||||||
|
{
|
||||||
|
Threading::LockGuardPtr lock(rwlock_->AsReadable());
|
||||||
|
|
||||||
|
AuBST<ThreadGroup_t, AuList<ThreadId_t>> ret;
|
||||||
|
|
||||||
|
for (const auto &group : this->threads_)
|
||||||
|
{
|
||||||
|
AuList<ThreadId_t> workers;
|
||||||
|
|
||||||
|
for (const auto &thread : group.second->workers)
|
||||||
|
{
|
||||||
|
workers.push_back(thread.second->id.second);
|
||||||
|
}
|
||||||
|
|
||||||
|
ret[group.first] = workers;
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
WorkerId_t AsyncApp::GetCurrentThread()
|
||||||
|
{
|
||||||
|
return tlsWorkerId;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool AsyncApp::Sync(ThreadGroup_t groupId, bool requireSignal, AuUInt32 timeoutMs)
|
||||||
|
{
|
||||||
|
Threading::LockGuardPtr lock(rwlock_->AsReadable());
|
||||||
|
|
||||||
|
auto group = GetGroup(groupId);
|
||||||
|
|
||||||
|
for (auto &jobWorker : group->workers)
|
||||||
|
{
|
||||||
|
if (!Barrier(jobWorker.second->id, timeoutMs, requireSignal, false)) // BAD!, should subtract time elapsed, clamp to, i dunno, 5ms min?
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
void AsyncApp::Signal(ThreadGroup_t groupId)
|
||||||
|
{
|
||||||
|
Threading::LockGuardPtr lock(rwlock_->AsReadable());
|
||||||
|
|
||||||
|
auto group = GetGroup(groupId);
|
||||||
|
|
||||||
|
for (auto &jobWorker : group->workers)
|
||||||
|
{
|
||||||
|
jobWorker.second->running->Set();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool AsyncApp::SyncTimeout(ThreadGroup_t group, AuUInt32 ms)
|
||||||
|
{
|
||||||
|
return Sync(group, false, ms);
|
||||||
|
}
|
||||||
|
|
||||||
|
void AsyncApp::SyncAllSafe()
|
||||||
|
{
|
||||||
|
Threading::LockGuardPtr lock(rwlock_->AsReadable());
|
||||||
|
|
||||||
|
for (const auto &re : this->threads_)
|
||||||
|
{
|
||||||
|
for (auto &jobWorker : re.second->workers)
|
||||||
|
{
|
||||||
|
SysAssert(Barrier(jobWorker.second->id, 0, false, false));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
AuSPtr<GroupState> AsyncApp::GetGroup(ThreadGroup_t type)
|
||||||
|
{
|
||||||
|
Threading::LockGuardPtr lock(rwlock_->AsReadable());
|
||||||
|
|
||||||
|
AuSPtr<GroupState>* ret;
|
||||||
|
if (!TryFind(this->threads_, type, ret))
|
||||||
|
{
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
|
||||||
|
return *ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
AuSPtr<ThreadState> AsyncApp::GetThreadState()
|
||||||
|
{
|
||||||
|
auto id = GetCurrentThread();
|
||||||
|
auto state = GetGroup(id.first);
|
||||||
|
return state->workers[id.second];
|
||||||
|
}
|
||||||
|
|
||||||
|
void AsyncApp::Entrypoint(WorkerId_t id)
|
||||||
|
{
|
||||||
|
tlsWorkerId = id;
|
||||||
|
|
||||||
|
auto auThread = Threading::Threads::GetThread();
|
||||||
|
auto job = GetThreadState();
|
||||||
|
|
||||||
|
while (!auThread->Exiting())
|
||||||
|
{
|
||||||
|
// Do work (blocking)
|
||||||
|
Poll(true);
|
||||||
|
|
||||||
|
// Synchronization after pause
|
||||||
|
job->running->Lock();
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const auto &thread : job->features)
|
||||||
|
{
|
||||||
|
thread->Cleanup();
|
||||||
|
}
|
||||||
|
|
||||||
|
job->features.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
void AsyncApp::AddFeature(WorkerId_t id, AuSPtr<Threading::Threads::IThreadFeature> feature, bool async)
|
||||||
|
{
|
||||||
|
auto work = std::make_shared<BasicWorkStdFunc>(([=]()
|
||||||
|
{
|
||||||
|
GetThreadState()->features.push_back(feature);
|
||||||
|
feature->Init();
|
||||||
|
}));
|
||||||
|
|
||||||
|
auto workItem = NewWorkItem(id, work, !async);
|
||||||
|
workItem->Dispatch();
|
||||||
|
|
||||||
|
if (!async)
|
||||||
|
{
|
||||||
|
workItem->BlockUntilComplete();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void AsyncApp::AssertInThreadGroup(ThreadGroup_t group)
|
||||||
|
{
|
||||||
|
SysAssert(static_cast<WorkerId_t>(tlsWorkerId).first == group);
|
||||||
|
}
|
||||||
|
|
||||||
|
void AsyncApp::AssertWorker(WorkerId_t id)
|
||||||
|
{
|
||||||
|
SysAssert(static_cast<WorkerId_t>(tlsWorkerId) == id);
|
||||||
|
}
|
||||||
|
|
||||||
|
AUKN_SYM IAsyncApp *GetAsyncApp()
|
||||||
|
{
|
||||||
|
return &gAsyncApp;
|
||||||
|
}
|
||||||
}
|
}
|
@ -9,5 +9,64 @@
|
|||||||
|
|
||||||
namespace Aurora::Async
|
namespace Aurora::Async
|
||||||
{
|
{
|
||||||
|
struct GroupState;
|
||||||
|
struct ThreadState;
|
||||||
|
//class WorkItem;
|
||||||
|
|
||||||
|
class AsyncApp : public IAsyncApp
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
AsyncApp();
|
||||||
|
|
||||||
|
// Main thread logic
|
||||||
|
void Start() override;
|
||||||
|
void Main() override;
|
||||||
|
void Shutdown() override;
|
||||||
|
bool Exiting() override;
|
||||||
|
|
||||||
|
// Spawning
|
||||||
|
bool Spawn(WorkerId_t) override;
|
||||||
|
Threading::Threads::ThreadShared_t ResolveHandle(WorkerId_t) override;
|
||||||
|
AuBST<ThreadGroup_t, AuList<ThreadId_t>> GetThreads() override;
|
||||||
|
WorkerId_t GetCurrentThread() override;
|
||||||
|
|
||||||
|
// Synchronization
|
||||||
|
bool Sync(ThreadGroup_t group, bool requireSignal, AuUInt32 timeout) override;
|
||||||
|
void Signal(ThreadGroup_t group) override;
|
||||||
|
|
||||||
|
bool WaitFor(WorkerId_t unlocker, Threading::IWaitable *primitive, int ms) override; // when unlocker = this, pump event loop
|
||||||
|
|
||||||
|
bool SyncTimeout(ThreadGroup_t group, AuUInt32 ms) override;
|
||||||
|
|
||||||
|
void SyncAllSafe() override;
|
||||||
|
|
||||||
|
// Features
|
||||||
|
void AddFeature(WorkerId_t id, AuSPtr<Threading::Threads::IThreadFeature> feature, bool async = false) override;
|
||||||
|
|
||||||
|
// Debug
|
||||||
|
void AssertInThreadGroup(ThreadGroup_t group) override;
|
||||||
|
void AssertWorker(WorkerId_t id) override;
|
||||||
|
|
||||||
|
void Run(DispatchTarget_t target, AuSPtr<IAsyncRunnable> runnable);
|
||||||
|
|
||||||
|
private:
|
||||||
|
|
||||||
|
void ShutdownOutOfTasks();
|
||||||
|
// TODO: BarrierMultiple
|
||||||
|
bool Barrier(WorkerId_t, AuUInt32 ms, bool requireSignal, bool drop);
|
||||||
|
bool Poll(bool a);
|
||||||
|
|
||||||
|
Threading::Primitives::RWLockUnique_t rwlock_;
|
||||||
|
|
||||||
|
AuSPtr<GroupState> GetGroup(ThreadGroup_t type);
|
||||||
|
|
||||||
|
AuSPtr<ThreadState> GetThreadState();
|
||||||
|
|
||||||
|
void Entrypoint(WorkerId_t id);
|
||||||
|
|
||||||
|
using ThreadDb_t = AuBST<ThreadGroup_t, AuSPtr<GroupState>>;
|
||||||
|
|
||||||
|
ThreadDb_t threads_;
|
||||||
|
bool shuttingdown_ {};
|
||||||
|
};
|
||||||
}
|
}
|
@ -6,7 +6,9 @@
|
|||||||
Author: Reece
|
Author: Reece
|
||||||
***/
|
***/
|
||||||
#include <RuntimeInternal.hpp>
|
#include <RuntimeInternal.hpp>
|
||||||
|
#include "Async.hpp"
|
||||||
#include "WorkItem.hpp"
|
#include "WorkItem.hpp"
|
||||||
|
#include "AsyncApp.hpp"
|
||||||
|
|
||||||
namespace Aurora::Async
|
namespace Aurora::Async
|
||||||
{
|
{
|
||||||
@ -21,7 +23,7 @@ namespace Aurora::Async
|
|||||||
|
|
||||||
WorkItem::~WorkItem()
|
WorkItem::~WorkItem()
|
||||||
{
|
{
|
||||||
Fail();
|
//Fail();
|
||||||
}
|
}
|
||||||
|
|
||||||
void WorkItem::WaitFor(const AuSPtr<IWorkItem> &workItem)
|
void WorkItem::WaitFor(const AuSPtr<IWorkItem> &workItem)
|
||||||
@ -87,7 +89,7 @@ namespace Aurora::Async
|
|||||||
itr = waitOn_.erase(itr);
|
itr = waitOn_.erase(itr);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (Time::CurrentClockMS() >= dispatchTime_)
|
if (Time::CurrentClockMS() < dispatchTime_)
|
||||||
{
|
{
|
||||||
Schedule();
|
Schedule();
|
||||||
return;
|
return;
|
||||||
@ -96,7 +98,13 @@ namespace Aurora::Async
|
|||||||
SendOff();
|
SendOff();
|
||||||
}
|
}
|
||||||
|
|
||||||
void WorkItem::Handle()
|
|
||||||
|
void WorkItem::CancelAsync()
|
||||||
|
{
|
||||||
|
Fail();
|
||||||
|
}
|
||||||
|
|
||||||
|
void WorkItem::RunAsync()
|
||||||
{
|
{
|
||||||
Threading::LockGuard<Threading::Primitives::SpinLock> l(lock);
|
Threading::LockGuard<Threading::Primitives::SpinLock> l(lock);
|
||||||
|
|
||||||
@ -195,11 +203,11 @@ namespace Aurora::Async
|
|||||||
|
|
||||||
void WorkItem::SendOff()
|
void WorkItem::SendOff()
|
||||||
{
|
{
|
||||||
// TODO:
|
static_cast<AsyncApp *>(GetAsyncApp())->Run(worker_, this->shared_from_this());
|
||||||
}
|
}
|
||||||
|
|
||||||
AUKN_SYM AuSPtr<IWorkItem> NewWorkItem(const WorkerId_t &worker, const AuSPtr<IWorkItemHandler> &task, bool annoying)
|
AUKN_SYM AuSPtr<IWorkItem> NewWorkItem(const DispatchTarget_t &worker, const AuSPtr<IWorkItemHandler> &task, bool supportsBlocking)
|
||||||
{
|
{
|
||||||
return std::make_shared<WorkItem>(worker, task, annoying);
|
return std::make_shared<WorkItem>(worker, task, supportsBlocking);
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -9,7 +9,7 @@
|
|||||||
|
|
||||||
namespace Aurora::Async
|
namespace Aurora::Async
|
||||||
{
|
{
|
||||||
class WorkItem : public IWorkItem, public std::enable_shared_from_this<WorkItem>
|
class WorkItem : public IWorkItem, public IAsyncRunnable, public std::enable_shared_from_this<WorkItem>
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
WorkItem(const DispatchTarget_t &worker_, const AuSPtr<IWorkItemHandler> &task_, bool supportsBlocking);
|
WorkItem(const DispatchTarget_t &worker_, const AuSPtr<IWorkItemHandler> &task_, bool supportsBlocking);
|
||||||
@ -24,10 +24,11 @@ namespace Aurora::Async
|
|||||||
bool BlockUntilComplete() override;
|
bool BlockUntilComplete() override;
|
||||||
bool HasFinished() override;
|
bool HasFinished() override;
|
||||||
bool HasFailed() override;
|
bool HasFailed() override;
|
||||||
|
|
||||||
void Handle();
|
void RunAsync() override;
|
||||||
|
void CancelAsync() override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void DispatchEx(bool check);
|
void DispatchEx(bool check);
|
||||||
AuSPtr<IWorkItemHandler> task_;
|
AuSPtr<IWorkItemHandler> task_;
|
||||||
DispatchTarget_t worker_;
|
DispatchTarget_t worker_;
|
||||||
|
@ -212,6 +212,10 @@ namespace Aurora::Debug
|
|||||||
TryGetOrFetchCError();
|
TryGetOrFetchCError();
|
||||||
TryGetOrFetchOSError();
|
TryGetOrFetchOSError();
|
||||||
Telemetry::InsertManualFence(rng);
|
Telemetry::InsertManualFence(rng);
|
||||||
|
|
||||||
|
#if defined(DEBUG) || defined(INTERNAL)
|
||||||
|
LogWarn("ERROR: {}", msg);
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
AUKN_SYM AuString StackTraceEntry::Stringify() const
|
AUKN_SYM AuString StackTraceEntry::Stringify() const
|
||||||
|
@ -284,7 +284,7 @@ namespace Aurora::Debug
|
|||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
if (minimal < 3)
|
if (minimal < 2)
|
||||||
{
|
{
|
||||||
ParseStack(ExceptionInfo->ContextRecord, entry.wincxx.stack.backtrace);
|
ParseStack(ExceptionInfo->ContextRecord, entry.wincxx.stack.backtrace);
|
||||||
}
|
}
|
||||||
|
@ -56,12 +56,12 @@ namespace Aurora::Processes
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
||||||
HANDLE pipeStdOutRead_ {};
|
HANDLE pipeStdOutRead_ {INVALID_HANDLE_VALUE};
|
||||||
HANDLE pipeStdOutWrite_ {};
|
HANDLE pipeStdOutWrite_ {INVALID_HANDLE_VALUE};
|
||||||
HANDLE pipeStdErrRead_ {};
|
HANDLE pipeStdErrRead_ {INVALID_HANDLE_VALUE};
|
||||||
HANDLE pipeStdErrWrite_ {};
|
HANDLE pipeStdErrWrite_ {INVALID_HANDLE_VALUE};
|
||||||
HANDLE pipeStdInRead_ {};
|
HANDLE pipeStdInRead_ {INVALID_HANDLE_VALUE};
|
||||||
HANDLE pipeStdInWrite_ {};
|
HANDLE pipeStdInWrite_ {INVALID_HANDLE_VALUE};
|
||||||
|
|
||||||
AuString execModule_;
|
AuString execModule_;
|
||||||
ESpawnType type_;
|
ESpawnType type_;
|
||||||
@ -196,22 +196,24 @@ namespace Aurora::Processes
|
|||||||
|
|
||||||
void ProcessImpl::ShutdownPipes()
|
void ProcessImpl::ShutdownPipes()
|
||||||
{
|
{
|
||||||
if (auto handle = std::exchange(this->pipeStdOutRead_, {}))
|
HANDLE handle;
|
||||||
CloseHandle(handle);
|
|
||||||
|
|
||||||
if (auto handle = std::exchange(this->pipeStdOutWrite_, {}))
|
if ((handle = std::exchange(this->pipeStdOutRead_, INVALID_HANDLE_VALUE)) != INVALID_HANDLE_VALUE)
|
||||||
CloseHandle(handle);
|
CloseHandle(handle);
|
||||||
|
|
||||||
if (auto handle = std::exchange(this->pipeStdErrRead_, {}))
|
if ((handle = std::exchange(this->pipeStdOutWrite_, INVALID_HANDLE_VALUE)) != INVALID_HANDLE_VALUE)
|
||||||
CloseHandle(handle);
|
CloseHandle(handle);
|
||||||
|
|
||||||
if (auto handle = std::exchange(this->pipeStdErrWrite_, {}))
|
if ((handle = std::exchange(this->pipeStdErrRead_, INVALID_HANDLE_VALUE)) != INVALID_HANDLE_VALUE)
|
||||||
CloseHandle(handle);
|
CloseHandle(handle);
|
||||||
|
|
||||||
if (auto handle = std::exchange(this->pipeStdInRead_, {}))
|
if ((handle = std::exchange(this->pipeStdErrWrite_, INVALID_HANDLE_VALUE)) != INVALID_HANDLE_VALUE)
|
||||||
CloseHandle(handle);
|
CloseHandle(handle);
|
||||||
|
|
||||||
if (auto handle = std::exchange(this->pipeStdInWrite_, {}))
|
if ((handle = std::exchange(this->pipeStdInRead_, INVALID_HANDLE_VALUE)) != INVALID_HANDLE_VALUE)
|
||||||
|
CloseHandle(handle);
|
||||||
|
|
||||||
|
if ((handle = std::exchange(this->pipeStdInWrite_, INVALID_HANDLE_VALUE)) != INVALID_HANDLE_VALUE)
|
||||||
CloseHandle(handle);
|
CloseHandle(handle);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -31,7 +31,7 @@ namespace Aurora::Threading::Primitives
|
|||||||
|
|
||||||
bool EventImpl::Lock(AuUInt64 timeout /*=0*/)
|
bool EventImpl::Lock(AuUInt64 timeout /*=0*/)
|
||||||
{
|
{
|
||||||
LockGuardPtr<IConditionMutex> re(mutex_.get());
|
LockGuardPtr re(mutex_);
|
||||||
|
|
||||||
AuInt64 startTime = Aurora::Time::CurrentClockMS();
|
AuInt64 startTime = Aurora::Time::CurrentClockMS();
|
||||||
AuInt64 endTime = startTime + timeout;
|
AuInt64 endTime = startTime + timeout;
|
||||||
@ -66,7 +66,7 @@ namespace Aurora::Threading::Primitives
|
|||||||
|
|
||||||
bool EventImpl::TryLock()
|
bool EventImpl::TryLock()
|
||||||
{
|
{
|
||||||
LockGuardPtr<IConditionMutex> re(mutex_.get());
|
LockGuardPtr re(mutex_);
|
||||||
|
|
||||||
return AtomicIsEventSet();
|
return AtomicIsEventSet();
|
||||||
}
|
}
|
||||||
@ -89,7 +89,7 @@ namespace Aurora::Threading::Primitives
|
|||||||
|
|
||||||
void EventImpl::Set()
|
void EventImpl::Set()
|
||||||
{
|
{
|
||||||
LockGuardPtr<IConditionMutex> re(mutex_.get());
|
LockGuardPtr re(mutex_);
|
||||||
SysAssertExp((permitMultipleTriggers_) || (!triggered_), "Can not trigger an awake event object");
|
SysAssertExp((permitMultipleTriggers_) || (!triggered_), "Can not trigger an awake event object");
|
||||||
triggered_ = true;
|
triggered_ = true;
|
||||||
condition_->Broadcast();
|
condition_->Broadcast();
|
||||||
|
@ -17,7 +17,8 @@ namespace Aurora::Threading::Primitives
|
|||||||
|
|
||||||
RWLockImpl::~RWLockImpl()
|
RWLockImpl::~RWLockImpl()
|
||||||
{
|
{
|
||||||
|
mutex_.reset();
|
||||||
|
condition_.reset();
|
||||||
}
|
}
|
||||||
|
|
||||||
bool RWLockImpl::Init()
|
bool RWLockImpl::Init()
|
||||||
@ -38,8 +39,8 @@ namespace Aurora::Threading::Primitives
|
|||||||
|
|
||||||
bool RWLockImpl::LockRead(AuUInt64 timeout)
|
bool RWLockImpl::LockRead(AuUInt64 timeout)
|
||||||
{
|
{
|
||||||
LockGuardPtr<IConditionMutex> mutex(mutex_.get());
|
LockGuardPtr lock(mutex_);
|
||||||
|
|
||||||
while (state_ < 0)
|
while (state_ < 0)
|
||||||
{
|
{
|
||||||
if (!condition_->WaitForSignal(timeout))
|
if (!condition_->WaitForSignal(timeout))
|
||||||
@ -54,7 +55,7 @@ namespace Aurora::Threading::Primitives
|
|||||||
|
|
||||||
bool RWLockImpl::LockWrite(AuUInt64 timeout)
|
bool RWLockImpl::LockWrite(AuUInt64 timeout)
|
||||||
{
|
{
|
||||||
LockGuardPtr<IConditionMutex> mutex(mutex_.get());
|
LockGuardPtr lock(mutex_);
|
||||||
|
|
||||||
while (state_ != 0)
|
while (state_ != 0)
|
||||||
{
|
{
|
||||||
@ -70,7 +71,7 @@ namespace Aurora::Threading::Primitives
|
|||||||
|
|
||||||
bool RWLockImpl::TryLockRead()
|
bool RWLockImpl::TryLockRead()
|
||||||
{
|
{
|
||||||
LockGuardPtr<IConditionMutex> mutex(mutex_.get());
|
LockGuardPtr lock(mutex_);
|
||||||
|
|
||||||
if (state_ == -1)
|
if (state_ == -1)
|
||||||
{
|
{
|
||||||
@ -83,7 +84,7 @@ namespace Aurora::Threading::Primitives
|
|||||||
|
|
||||||
bool RWLockImpl::TryLockWrite()
|
bool RWLockImpl::TryLockWrite()
|
||||||
{
|
{
|
||||||
LockGuardPtr<IConditionMutex> mutex(mutex_.get());
|
LockGuardPtr lock(mutex_);
|
||||||
|
|
||||||
if (state_ > 0)
|
if (state_ > 0)
|
||||||
{
|
{
|
||||||
@ -96,7 +97,7 @@ namespace Aurora::Threading::Primitives
|
|||||||
|
|
||||||
void RWLockImpl::UnlockRead()
|
void RWLockImpl::UnlockRead()
|
||||||
{
|
{
|
||||||
LockGuardPtr<IConditionMutex> mutex(mutex_.get());
|
LockGuardPtr lock(mutex_);
|
||||||
|
|
||||||
if(--state_ == 0)
|
if(--state_ == 0)
|
||||||
{
|
{
|
||||||
@ -106,7 +107,7 @@ namespace Aurora::Threading::Primitives
|
|||||||
|
|
||||||
void RWLockImpl::UnlockWrite()
|
void RWLockImpl::UnlockWrite()
|
||||||
{
|
{
|
||||||
LockGuardPtr<IConditionMutex> mutex(mutex_.get());
|
LockGuardPtr lock(mutex_);
|
||||||
state_ = 0;
|
state_ = 0;
|
||||||
condition_->Broadcast();
|
condition_->Broadcast();
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user