Compare commits

...

3 Commits

Author SHA1 Message Date
452b49b610 [+] Begin work on the Async subsystem 2021-06-30 10:28:52 +01:00
95b6854f92 [+] Added license 2021-06-30 10:28:35 +01:00
1d6efb1e4e [*] Amend broken optimization 2021-06-30 10:28:28 +01:00
18 changed files with 817 additions and 110 deletions

View File

@ -154,8 +154,8 @@ namespace Aurora::Async
private: private:
static constexpr bool IsCallbackPtr = std::is_pointer_v<Job_t> || is_base_of_template<AuSPtr, Job_t>; static constexpr bool IsCallbackPtr = std::is_pointer_v<Job_t> || is_base_of_template<std::shared_ptr, Job_t>::value;
static constexpr bool IsTaskPtr = std::is_pointer_v<Task_t> || is_base_of_template<AuSPtr, Task_t>; static constexpr bool IsTaskPtr = std::is_pointer_v<Task_t> || is_base_of_template<std::shared_ptr, Task_t>::value;
WorkerId_t caller; WorkerId_t caller;
@ -172,9 +172,9 @@ namespace Aurora::Async
ret = task.onFrame(input); ret = task.onFrame(input);
} }
auto pin = this->shared_from_this(); auto pin = std::static_pointer_cast<std::remove_pointer_t<decltype(this)>>(this->shared_from_this());
std::function<void()> func = [ret, &callback, &input, pin]() std::function<void()> func = [ret, pin]()
{ {
try try
{ {
@ -182,23 +182,16 @@ namespace Aurora::Async
{ {
if constexpr (IsCallbackPtr) if constexpr (IsCallbackPtr)
{ {
callback->onSuccess(input, *ret); pin->callback->onSuccess(pin->input, *ret);
} }
else else
{ {
callback.onSuccess(input, *ret); pin->callback.onSuccess(pin->input, *ret);
} }
} }
else else
{ {
if constexpr (IsCallbackPtr) pin->CallOnFailure(false);
{
callback->onFailure(input, false);
}
else
{
callback.onFailure(input, false);
}
} }
} }
catch (...) catch (...)
@ -215,21 +208,13 @@ namespace Aurora::Async
} }
else else
{ {
std::function<void()> err = [&callback, &input, pin]() std::function<void()> err = [pin]()
{ {
// TODO: we shold probably pass: is caller thread: false, has finished: true pin->CallOnFailure(false);
if constexpr (IsCallbackPtr)
{
callback->onFailure(input, false);
}
else
{
callback.onFailure(input, false);
}
}; };
// TODO: this is somewhat evil. double alloc when we could reuse this // TODO: this is somewhat evil. double alloc when we could reuse this
WorkItemShared(caller, std::make_shared<BasicWorkStdFunc>(func, err))->Dispatch(); NewWorkItem(caller, std::make_shared<BasicWorkStdFunc>(func, err))->Dispatch();
} }
} }
catch (...) catch (...)
@ -243,20 +228,40 @@ namespace Aurora::Async
{ {
try try
{ {
if constexpr (IsCallbackPtr) CallOnFailure(true);
{
callback->onFailure(true);
}
else
{
callback.onFailure(true);
}
} }
catch (...) catch (...)
{ {
Debug::PrintError(); Debug::PrintError();
} }
} }
void CallOnFailure(bool fail)
{
if constexpr (IsCallbackPtr)
{
if constexpr (is_base_of_template<std::function, decltype(callback->onFailure)>::value)
{
if (!callback->onFailure)
{
return;
}
}
callback->onFailure(input, fail);
}
else
{
if constexpr (is_base_of_template<std::function, decltype(callback.onFailure)>::value)
{
if (!callback.onFailure)
{
return;
}
}
callback.onFailure(input, fail);
}
}
}; };
template<typename Frame_t = std::function<void()>, typename Cleanup_t = std::function<void()>> template<typename Frame_t = std::function<void()>, typename Cleanup_t = std::function<void()>>
@ -288,6 +293,7 @@ namespace Aurora::Async
{ {
public: public:
// Main thread logic // Main thread logic
virtual void Start() = 0;
virtual void Main() = 0; virtual void Main() = 0;
virtual void Shutdown() = 0; virtual void Shutdown() = 0;
virtual bool Exiting() = 0; virtual bool Exiting() = 0;
@ -295,14 +301,14 @@ namespace Aurora::Async
// Spawning // Spawning
virtual bool Spawn(WorkerId_t) = 0; virtual bool Spawn(WorkerId_t) = 0;
virtual Threading::Threads::ThreadShared_t ResolveHandle(WorkerId_t) = 0; virtual Threading::Threads::ThreadShared_t ResolveHandle(WorkerId_t) = 0;
virtual AuBST<ThreadGroup_t, AuList<AuUInt8>> GetThreads() = 0; virtual AuBST<ThreadGroup_t, AuList<ThreadId_t>> GetThreads() = 0;
virtual WorkerId_t GetCurrentThread() = 0; virtual WorkerId_t GetCurrentThread() = 0;
// Synchronization // Synchronization
virtual bool Sync(ThreadGroup_t group, bool requireSignal = false, AuUInt32 timeout = 0) = 0; virtual bool Sync(ThreadGroup_t group, bool requireSignal = false, AuUInt32 timeout = 0) = 0;
virtual void Signal(ThreadGroup_t group) = 0; virtual void Signal(ThreadGroup_t group) = 0;
virtual bool WaitFor(DispatchTarget_t unlocker, Threading::IWaitable *primitive, int ms) = 0; // when unlocker = this, pump event loop virtual bool WaitFor(WorkerId_t unlocker, Threading::IWaitable *primitive, int ms) = 0; // when unlocker = this, pump event loop
virtual bool SyncTimeout(ThreadGroup_t group, AuUInt32 ms) = 0; virtual bool SyncTimeout(ThreadGroup_t group, AuUInt32 ms) = 0;

View File

@ -13,9 +13,9 @@ namespace Aurora::Parse
{ {
AuMach index = 0, startIdx = 0; AuMach index = 0, startIdx = 0;
while ((index = in.find("\n"), startIdx) != AuString::npos) while ((index = in.find("\n", startIdx)) != AuString::npos)
{ {
auto line = in.substr(0, index); auto line = in.substr(startIdx, index - startIdx);
startIdx = index + 1; startIdx = index + 1;
if (line[line.size() - 1] == '\r') if (line[line.size() - 1] == '\r')

View File

@ -13,17 +13,29 @@ namespace Aurora::Threading
class LockGuardPtr class LockGuardPtr
{ {
public: public:
LockGuardPtr(T *lock) : lockReference_(lock) LockGuardPtr(T *lock) : annoying_(lock)
{ {
lockReference_->Lock(); lock->Lock();
}
LockGuardPtr(T &lock) : annoying_(&lock)
{
lock->Lock();
} }
~LockGuardPtr() ~LockGuardPtr()
{ {
lockReference_->Unlock(); if constexpr (is_base_of_template<std::shared_ptr, T>::value || is_base_of_template<std::unique_ptr, T>::value)
{
annoying_->get()->Unlock();
}
else
{
annoying_->Unlock();
}
} }
private: private:
T *lockReference_; T *annoying_;
}; };
} }

View File

@ -9,9 +9,6 @@
namespace Aurora::Threading::Threads namespace Aurora::Threading::Threads
{ {
template<typename T> // TODO: #if defined(fibers) static TLSVariable<type, true> name #endif
class TLSStaticVariable #define STATIC_TLS(type, name) static thread_local type name;
{
// TODO: implement templates for a thread feature based tls implementation
};
} }

View File

@ -9,7 +9,7 @@
namespace Aurora::Threading::Threads namespace Aurora::Threading::Threads
{ {
template<typename T> template<typename T, bool isStatic = false>
class TLSVariable class TLSVariable
{ {
private: private:
@ -18,19 +18,48 @@ namespace Aurora::Threading::Threads
public: public:
TLSVariable() {} TLSVariable() {}
~TLSVariable() ~TLSVariable()
{
if constexpr (!isStatic)
{ {
GetThread()->GetTlsView()->Remove(GetHandle()); GetThread()->GetTlsView()->Remove(GetHandle());
} }
}
T& operator ->()
{
return Get();
}
operator T&()
{
return Get();
}
TLSVariable& operator =(const T & val)
{
Get() = val;
return *this;
}
private:
AuUInt64 GetHandle() AuUInt64 GetHandle()
{
if constexpr (isStatic)
{
return (AuUInt64(reinterpret_cast<AuUInt>(&_)) & (~kTlsKeyMask)) | kTlsKeyFollowsConvention | kTlsKeyStaticPointerHandle;
}
else
{ {
return (AuUInt64(reinterpret_cast<AuUInt>(&_)) & (~kTlsKeyMask)) | kTlsKeyFollowsConvention | kTlsKeyResettablePointerHandle; return (AuUInt64(reinterpret_cast<AuUInt>(&_)) & (~kTlsKeyMask)) | kTlsKeyFollowsConvention | kTlsKeyResettablePointerHandle;
} }
}
T &Get() T &Get()
{ {
auto view = GetThread()->GetTlsView(); auto view = GetThread()->GetTlsView();
auto ptr = view->GetOrSetup(GetHandle(), auto ptr = view->GetOrSetup(GetHandle(),
sizeof(T),
[](void *buffer) -> void [](void *buffer) -> void
{ {
if constexpr (std::is_class_v<T>) if constexpr (std::is_class_v<T>)
@ -52,15 +81,5 @@ namespace Aurora::Threading::Threads
return *reinterpret_cast<T *>(ptr); return *reinterpret_cast<T *>(ptr);
} }
T& operator ->()
{
return Get();
}
TLSVariable& operator =(const T & val)
{
Get() = val;
return *this;
}
}; };
} }

View File

@ -74,3 +74,5 @@ name ## Shared_t name ## Shared(T... args) \
#if !defined(NO__NEW) #if !defined(NO__NEW)
#define _new new (std::nothrow) #define _new new (std::nothrow)
#endif #endif
//#define AU_FWD(var) std::forward<decltype(var)>(var)

View File

@ -96,7 +96,7 @@ static inline AuString ToUpper(const AuString &in)
return ToStringASCIIOp<int(*)(int)>(std::toupper, in); return ToStringASCIIOp<int(*)(int)>(std::toupper, in);
} }
template<typename Map, typename Key, typename Value> template<typename Map, class Key, typename Value>
static inline bool TryFind(Map &map, const Key &key, Value *&ptr) static inline bool TryFind(Map &map, const Key &key, Value *&ptr)
{ {
auto itr = map.find(key); auto itr = map.find(key);
@ -112,7 +112,7 @@ static inline bool TryFind(Map &map, const Key &key, Value *&ptr)
} }
} }
template<typename Map, typename Key, typename Value> template<typename Map, class Key, typename Value>
static inline bool TryFind(Map *map, const Key &key, Value *&ptr) static inline bool TryFind(Map *map, const Key &key, Value *&ptr)
{ {
auto itr = map->find(key); auto itr = map->find(key);
@ -128,7 +128,7 @@ static inline bool TryFind(Map *map, const Key &key, Value *&ptr)
} }
} }
template<typename Map, typename Key> template<typename Map, class Key>
static inline bool TryFind(Map &map, const Key &key) static inline bool TryFind(Map &map, const Key &key)
{ {
auto itr = map.find(key); auto itr = map.find(key);
@ -142,7 +142,7 @@ static inline bool TryFind(Map &map, const Key &key)
} }
} }
template<typename Map, typename Key> template<typename Map, class Key>
static inline bool TryFind(Map *map, const Key &key) static inline bool TryFind(Map *map, const Key &key)
{ {
auto itr = map->find(key); auto itr = map->find(key);
@ -155,7 +155,7 @@ static inline bool TryFind(Map *map, const Key &key)
return false; return false;
} }
} }
template<typename Map, typename Key, typename Value> template<typename Map, class Key, typename Value>
static inline bool TryFindGeneric(Map &map, const Key &key, Value *&ptr) static inline bool TryFindGeneric(Map &map, const Key &key, Value *&ptr)
{ {
auto itr = map.find(key); auto itr = map.find(key);
@ -171,7 +171,7 @@ static inline bool TryFindGeneric(Map &map, const Key &key, Value *&ptr)
} }
} }
template<typename Map, typename Key> template<typename Map, class Key>
static inline bool TryDelete(Map &map, const Key &key) static inline bool TryDelete(Map &map, const Key &key)
{ {
auto itr = map.find(key); auto itr = map.find(key);
@ -186,7 +186,7 @@ static inline bool TryDelete(Map &map, const Key &key)
} }
} }
template<typename List, typename Key> template<typename List, class Key>
static inline bool TryDeleteList(List &list, const Key &key) static inline bool TryDeleteList(List &list, const Key &key)
{ {
auto itr = std::find(list.begin(), list.end(), key); auto itr = std::find(list.begin(), list.end(), key);
@ -363,6 +363,8 @@ static inline AuList<AuString> SplitString(const AuString& str, const AuString&
return tokens; return tokens;
} }
// more copy/pasta. work smart, not hard.
// i dont want to waste time working out template kinks between clang and msvc
template < template <typename...> class base,typename derived> template < template <typename...> class base,typename derived>
struct is_base_of_template_impl struct is_base_of_template_impl
{ {

8
LICENSE Normal file
View File

@ -0,0 +1,8 @@
Copyright 2020-2021 J Reece Wilson
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

View File

@ -9,5 +9,38 @@
namespace Aurora::Async namespace Aurora::Async
{ {
class IAsyncRunnable
{
public:
virtual void RunAsync() = 0;
virtual void CancelAsync() {}
};
class AsyncFuncRunnable : public IAsyncRunnable
{
public:
std::function<void()> callback;
AsyncFuncRunnable(std::function<void()> &&callback) : callback(std::move(callback))
{}
AsyncFuncRunnable(const std::function<void()> &callback) : callback(callback)
{}
void RunAsync() override
{
try
{
callback();
}
catch (...)
{
Debug::PrintError();
}
}
};
void InitAsync(); void InitAsync();
} }

View File

@ -6,9 +6,562 @@
Author: Reece Author: Reece
***/ ***/
#include <RuntimeInternal.hpp> #include <RuntimeInternal.hpp>
#include "Async.hpp"
#include "AsyncApp.hpp" #include "AsyncApp.hpp"
#include "WorkItem.hpp"
namespace Aurora::Async namespace Aurora::Async
{ {
static AsyncApp gAsyncApp;
static std::atomic_int gRunningTasks {};
//STATIC_TLS(WorkerId_t, tlsWorkerId);
static Threading::Threads::TLSVariable<WorkerId_t, true> tlsWorkerId;
struct ThreadState
{
WorkerId_t id;
AuUInt8 multipopCount = 1;
AuUInt32 lastFrameTime {};
Threading::Threads::ThreadShared_t threadObject;
AuWPtr<GroupState> parent;
Threading::Primitives::SemaphoreUnique_t syncSema;
AuList<AuSPtr<Threading::Threads::IThreadFeature>> features;
bool rejecting {};
bool exiting {};
Threading::Primitives::EventUnique_t running;
//bool running;
bool inline IsSysThread()
{
return id.first == 0;
}
AuList<AuSPtr<IAsyncRunnable>> pendingWorkItems;
};
struct GroupState
{
ThreadGroup_t group;
Threading::Primitives::ConditionMutexUnique_t cvWorkMutex;
Threading::Primitives::ConditionVariableUnique_t cvVariable;
using WorkEntry_t = std::pair<std::optional<ThreadId_t>, AuSPtr<IAsyncRunnable>>;
AuList<WorkEntry_t> workQueue;
AuBST<ThreadId_t, AuSPtr<ThreadState>> workers;
bool Init();
bool inline IsSysThread()
{
return group == 0;
}
};
bool GroupState::Init()
{
cvWorkMutex = Threading::Primitives::ConditionMutexUnique();
if (!cvWorkMutex)
{
return false;
}
cvVariable = Threading::Primitives::ConditionVariableUnique(cvWorkMutex.get());
if (!cvVariable)
{
return false;
}
return true;
}
AsyncApp::AsyncApp()
{
rwlock_ = Threading::Primitives::RWLockUnique();
SysAssert(static_cast<bool>(rwlock_), "Couldn't initialize AsyncApp. Unable to allocate an RWLock");
}
// TODO: barrier multiple
bool AsyncApp::Barrier(WorkerId_t worker, AuUInt32 ms, bool requireSignal, bool drop)
{
auto & semaphore = GetThreadState()->syncSema;
auto unsafeSemaphore = semaphore.get();
auto work = std::make_shared</*Async::BasicWorkStdFunc*/AsyncFuncRunnable>(([=]()
{
auto state = GetThreadState();
if (drop)
{
state->rejecting = true;
}
if (requireSignal)
{
state->running->Reset();
}
unsafeSemaphore->Unlock(1);
}));
#if 0
NewWorkItem({worker.first, worker.second}, work)->Dispatch();
#else
Run(worker, work);
#endif
return WaitFor(worker, semaphore.get(), ms);
}
void AsyncApp::Run(DispatchTarget_t target, AuSPtr<IAsyncRunnable> runnable)
{
auto state = GetGroup(target.first);
SysAssert(static_cast<bool>(state), "couldn't dispatch a task to an offline group");
gRunningTasks++;
{
Threading::LockGuardPtr lol(state->cvWorkMutex);
#if defined(INTERNAL) || defined(DEBUG)
Threading::LockGuardPtr lock(rwlock_->AsReadable());
if (target.second.has_value())
{
if (state->workers[*target.second]->rejecting)
{
SysPushErrorGen("worker: {}:{} is offline", target.first, target.second.value_or(0));
throw "Requested job worker is offline";
}
}
else
{
auto workers = state->workers;
bool found = false;
for (const auto &worker : state->workers)
{
if (!worker.second->rejecting)
{
found = true;
break;
}
}
if (!found)
{
throw "No workers available";
}
}
#endif
state->workQueue.push_back(std::make_pair(target.second, runnable));
}
if (target.second.has_value())
{
// sad :(
state->cvVariable->Broadcast();
}
else
{
state->cvVariable->Signal();
}
}
bool AsyncApp::Poll(bool blocking)
{
auto state = GetThreadState();
auto group = state->parent.lock();
state->pendingWorkItems.clear();
{
Threading::LockGuardPtr lol(group->cvWorkMutex);
do
{
for (auto itr = group->workQueue.begin();
((itr != group->workQueue.end()) &&
(state->pendingWorkItems.size() < state->multipopCount));
)
{
if (!itr->first.has_value())
{
state->pendingWorkItems.push_back((*itr).second);
itr = group->workQueue.erase(itr);
continue;
}
if ((itr->first.has_value()) && (itr->first.value() == state->id.second))
{
state->pendingWorkItems.push_back((*itr).second);
itr = group->workQueue.erase(itr);
continue;
}
itr++;
}
if (!blocking)
{
break;
}
if (state->pendingWorkItems.empty())
{
group->cvVariable->WaitForSignal();
}
} while (state->pendingWorkItems.empty());
}
if (state->pendingWorkItems.empty())
{
return false;
}
int runningTasks {};
for (auto itr = state->pendingWorkItems.begin(); itr != state->pendingWorkItems.end(); )
{
// TODO: we should be able to implement a watchdog later down the line
state->lastFrameTime = Time::CurrentClockMS();
(*itr)->RunAsync();
itr = state->pendingWorkItems.erase(itr);
runningTasks = --gRunningTasks;
}
if (runningTasks)
{
ShutdownOutOfTasks();
}
return true;
}
bool AsyncApp::WaitFor(WorkerId_t worker, Aurora::Threading::IWaitable *primitive, int timeoutMs)
{
auto curThread = GetThreadState();
if (worker == curThread->id)
{
// TODO: timeout isn't respected here as well
while (!Threading::WaitFor(primitive, 2))
{
while (this->Poll(false));
}
return true;
}
else
{
return Threading::WaitFor(primitive, timeoutMs);
}
}
void AsyncApp::Start()
{
SysAssert(Spawn({0, 0}));
}
void AsyncApp::Main()
{
Entrypoint({0, 0});
}
void AsyncApp::Shutdown()
{
// Set shutdown trigger
shuttingdown_ = true;
// Unpause all
for (auto &re : this->threads_)
{
for (auto &[id, worker] : re.second->workers)
{
auto &event = worker->running;
if (event)
{
event->Set();
}
}
}
// Drop all tasks from this point onwards
for (auto &re : this->threads_)
{
for (auto &[id, worker] : re.second->workers)
{
SysAssert(Barrier(worker->id, 0, true, true));
}
}
// TODO: abort all pending tests
// Signal the event loop to abort
ShutdownOutOfTasks();
}
void AsyncApp::ShutdownOutOfTasks()
{
for (auto& [id, group]: this->threads_)
{
for (auto & [id, worker] : group->workers)
{
worker->threadObject->SendExitSignal();
}
if (group->cvVariable)
{
Threading::LockGuardPtr lol(group->cvWorkMutex);
group->cvVariable->Broadcast();
}
}
}
bool AsyncApp::Exiting()
{
return shuttingdown_ || GetThreadState()->exiting;
}
bool AsyncApp::Spawn(WorkerId_t workerId)
{
Threading::LockGuardPtr lock(rwlock_->AsWritable());
AuSPtr<GroupState> group;
// Try fetch or allocate group
{
AuSPtr<GroupState>* groupPtr;
if (!TryFind(this->threads_, workerId.first, groupPtr))
{
group = std::make_shared<GroupState>();
if (!group->Init())
{
SysPushErrorMem("Not enough memory to intiialize a new group state");
return false;
}
if (!TryInsert(this->threads_, std::make_pair(workerId.first, group)))
{
return false;
}
}
else
{
group = *groupPtr;
}
}
// Assert worker does not already exist
{
AuSPtr<ThreadState>* ret;
if (TryFind(group->workers, workerId.second, ret))
{
SysPushErrorGen("Thread ID already exists");
return false;
}
}
auto threadState = std::make_shared<ThreadState>();
threadState->parent = group;
threadState->running = Threading::Primitives::EventUnique(true, false, false);
threadState->syncSema = Threading::Primitives::SemaphoreUnique(0);
threadState->id = workerId;
if (!threadState->IsSysThread())
{
Threading::Threads::AbstractThreadVectors handler;
handler.DoRun = [=](const Threading::Threads::IAuroraThread *thread)
{
Entrypoint(threadState->id);
};
threadState->threadObject = Threading::Threads::ThreadUnique(handler);
threadState->threadObject->Run();
}
else
{
threadState->threadObject = std::shared_ptr<Threading::Threads::IAuroraThread>(Threading::Threads::GetThread(), [](Threading::Threads::IAuroraThread *){});
}
group->workers.insert(std::make_pair(workerId.second, threadState));
return true;
}
Threading::Threads::ThreadShared_t AsyncApp::ResolveHandle(WorkerId_t id)
{
auto group = GetGroup(id.first);
if (!group)
{
return {};
}
AuSPtr<ThreadState>* ret;
if (!TryFind(group->workers, id.second, ret))
{
return {};
}
return ret->get()->threadObject;
}
AuBST<ThreadGroup_t, AuList<ThreadId_t>> AsyncApp::GetThreads()
{
Threading::LockGuardPtr lock(rwlock_->AsReadable());
AuBST<ThreadGroup_t, AuList<ThreadId_t>> ret;
for (const auto &group : this->threads_)
{
AuList<ThreadId_t> workers;
for (const auto &thread : group.second->workers)
{
workers.push_back(thread.second->id.second);
}
ret[group.first] = workers;
}
return ret;
}
WorkerId_t AsyncApp::GetCurrentThread()
{
return tlsWorkerId;
}
bool AsyncApp::Sync(ThreadGroup_t groupId, bool requireSignal, AuUInt32 timeoutMs)
{
Threading::LockGuardPtr lock(rwlock_->AsReadable());
auto group = GetGroup(groupId);
for (auto &jobWorker : group->workers)
{
if (!Barrier(jobWorker.second->id, timeoutMs, requireSignal, false)) // BAD!, should subtract time elapsed, clamp to, i dunno, 5ms min?
{
return false;
}
}
return true;
}
void AsyncApp::Signal(ThreadGroup_t groupId)
{
Threading::LockGuardPtr lock(rwlock_->AsReadable());
auto group = GetGroup(groupId);
for (auto &jobWorker : group->workers)
{
jobWorker.second->running->Set();
}
}
bool AsyncApp::SyncTimeout(ThreadGroup_t group, AuUInt32 ms)
{
return Sync(group, false, ms);
}
void AsyncApp::SyncAllSafe()
{
Threading::LockGuardPtr lock(rwlock_->AsReadable());
for (const auto &re : this->threads_)
{
for (auto &jobWorker : re.second->workers)
{
SysAssert(Barrier(jobWorker.second->id, 0, false, false));
}
}
}
AuSPtr<GroupState> AsyncApp::GetGroup(ThreadGroup_t type)
{
Threading::LockGuardPtr lock(rwlock_->AsReadable());
AuSPtr<GroupState>* ret;
if (!TryFind(this->threads_, type, ret))
{
return {};
}
return *ret;
}
AuSPtr<ThreadState> AsyncApp::GetThreadState()
{
auto id = GetCurrentThread();
auto state = GetGroup(id.first);
return state->workers[id.second];
}
void AsyncApp::Entrypoint(WorkerId_t id)
{
tlsWorkerId = id;
auto auThread = Threading::Threads::GetThread();
auto job = GetThreadState();
while (!auThread->Exiting())
{
// Do work (blocking)
Poll(true);
// Synchronization after pause
job->running->Lock();
}
for (const auto &thread : job->features)
{
thread->Cleanup();
}
job->features.clear();
}
void AsyncApp::AddFeature(WorkerId_t id, AuSPtr<Threading::Threads::IThreadFeature> feature, bool async)
{
auto work = std::make_shared<BasicWorkStdFunc>(([=]()
{
GetThreadState()->features.push_back(feature);
feature->Init();
}));
auto workItem = NewWorkItem(id, work, !async);
workItem->Dispatch();
if (!async)
{
workItem->BlockUntilComplete();
}
}
void AsyncApp::AssertInThreadGroup(ThreadGroup_t group)
{
SysAssert(static_cast<WorkerId_t>(tlsWorkerId).first == group);
}
void AsyncApp::AssertWorker(WorkerId_t id)
{
SysAssert(static_cast<WorkerId_t>(tlsWorkerId) == id);
}
AUKN_SYM IAsyncApp *GetAsyncApp()
{
return &gAsyncApp;
}
} }

View File

@ -9,5 +9,64 @@
namespace Aurora::Async namespace Aurora::Async
{ {
struct GroupState;
struct ThreadState;
//class WorkItem;
class AsyncApp : public IAsyncApp
{
public:
AsyncApp();
// Main thread logic
void Start() override;
void Main() override;
void Shutdown() override;
bool Exiting() override;
// Spawning
bool Spawn(WorkerId_t) override;
Threading::Threads::ThreadShared_t ResolveHandle(WorkerId_t) override;
AuBST<ThreadGroup_t, AuList<ThreadId_t>> GetThreads() override;
WorkerId_t GetCurrentThread() override;
// Synchronization
bool Sync(ThreadGroup_t group, bool requireSignal, AuUInt32 timeout) override;
void Signal(ThreadGroup_t group) override;
bool WaitFor(WorkerId_t unlocker, Threading::IWaitable *primitive, int ms) override; // when unlocker = this, pump event loop
bool SyncTimeout(ThreadGroup_t group, AuUInt32 ms) override;
void SyncAllSafe() override;
// Features
void AddFeature(WorkerId_t id, AuSPtr<Threading::Threads::IThreadFeature> feature, bool async = false) override;
// Debug
void AssertInThreadGroup(ThreadGroup_t group) override;
void AssertWorker(WorkerId_t id) override;
void Run(DispatchTarget_t target, AuSPtr<IAsyncRunnable> runnable);
private:
void ShutdownOutOfTasks();
// TODO: BarrierMultiple
bool Barrier(WorkerId_t, AuUInt32 ms, bool requireSignal, bool drop);
bool Poll(bool a);
Threading::Primitives::RWLockUnique_t rwlock_;
AuSPtr<GroupState> GetGroup(ThreadGroup_t type);
AuSPtr<ThreadState> GetThreadState();
void Entrypoint(WorkerId_t id);
using ThreadDb_t = AuBST<ThreadGroup_t, AuSPtr<GroupState>>;
ThreadDb_t threads_;
bool shuttingdown_ {};
};
} }

View File

@ -6,7 +6,9 @@
Author: Reece Author: Reece
***/ ***/
#include <RuntimeInternal.hpp> #include <RuntimeInternal.hpp>
#include "Async.hpp"
#include "WorkItem.hpp" #include "WorkItem.hpp"
#include "AsyncApp.hpp"
namespace Aurora::Async namespace Aurora::Async
{ {
@ -21,7 +23,7 @@ namespace Aurora::Async
WorkItem::~WorkItem() WorkItem::~WorkItem()
{ {
Fail(); //Fail();
} }
void WorkItem::WaitFor(const AuSPtr<IWorkItem> &workItem) void WorkItem::WaitFor(const AuSPtr<IWorkItem> &workItem)
@ -87,7 +89,7 @@ namespace Aurora::Async
itr = waitOn_.erase(itr); itr = waitOn_.erase(itr);
} }
if (Time::CurrentClockMS() >= dispatchTime_) if (Time::CurrentClockMS() < dispatchTime_)
{ {
Schedule(); Schedule();
return; return;
@ -96,7 +98,13 @@ namespace Aurora::Async
SendOff(); SendOff();
} }
void WorkItem::Handle()
void WorkItem::CancelAsync()
{
Fail();
}
void WorkItem::RunAsync()
{ {
Threading::LockGuard<Threading::Primitives::SpinLock> l(lock); Threading::LockGuard<Threading::Primitives::SpinLock> l(lock);
@ -195,11 +203,11 @@ namespace Aurora::Async
void WorkItem::SendOff() void WorkItem::SendOff()
{ {
// TODO: static_cast<AsyncApp *>(GetAsyncApp())->Run(worker_, this->shared_from_this());
} }
AUKN_SYM AuSPtr<IWorkItem> NewWorkItem(const WorkerId_t &worker, const AuSPtr<IWorkItemHandler> &task, bool annoying) AUKN_SYM AuSPtr<IWorkItem> NewWorkItem(const DispatchTarget_t &worker, const AuSPtr<IWorkItemHandler> &task, bool supportsBlocking)
{ {
return std::make_shared<WorkItem>(worker, task, annoying); return std::make_shared<WorkItem>(worker, task, supportsBlocking);
} }
} }

View File

@ -9,7 +9,7 @@
namespace Aurora::Async namespace Aurora::Async
{ {
class WorkItem : public IWorkItem, public std::enable_shared_from_this<WorkItem> class WorkItem : public IWorkItem, public IAsyncRunnable, public std::enable_shared_from_this<WorkItem>
{ {
public: public:
WorkItem(const DispatchTarget_t &worker_, const AuSPtr<IWorkItemHandler> &task_, bool supportsBlocking); WorkItem(const DispatchTarget_t &worker_, const AuSPtr<IWorkItemHandler> &task_, bool supportsBlocking);
@ -25,7 +25,8 @@ namespace Aurora::Async
bool HasFinished() override; bool HasFinished() override;
bool HasFailed() override; bool HasFailed() override;
void Handle(); void RunAsync() override;
void CancelAsync() override;
private: private:
void DispatchEx(bool check); void DispatchEx(bool check);

View File

@ -212,6 +212,10 @@ namespace Aurora::Debug
TryGetOrFetchCError(); TryGetOrFetchCError();
TryGetOrFetchOSError(); TryGetOrFetchOSError();
Telemetry::InsertManualFence(rng); Telemetry::InsertManualFence(rng);
#if defined(DEBUG) || defined(INTERNAL)
LogWarn("ERROR: {}", msg);
#endif
} }
AUKN_SYM AuString StackTraceEntry::Stringify() const AUKN_SYM AuString StackTraceEntry::Stringify() const

View File

@ -284,7 +284,7 @@ namespace Aurora::Debug
try try
{ {
if (minimal < 3) if (minimal < 2)
{ {
ParseStack(ExceptionInfo->ContextRecord, entry.wincxx.stack.backtrace); ParseStack(ExceptionInfo->ContextRecord, entry.wincxx.stack.backtrace);
} }

View File

@ -56,12 +56,12 @@ namespace Aurora::Processes
private: private:
HANDLE pipeStdOutRead_ {}; HANDLE pipeStdOutRead_ {INVALID_HANDLE_VALUE};
HANDLE pipeStdOutWrite_ {}; HANDLE pipeStdOutWrite_ {INVALID_HANDLE_VALUE};
HANDLE pipeStdErrRead_ {}; HANDLE pipeStdErrRead_ {INVALID_HANDLE_VALUE};
HANDLE pipeStdErrWrite_ {}; HANDLE pipeStdErrWrite_ {INVALID_HANDLE_VALUE};
HANDLE pipeStdInRead_ {}; HANDLE pipeStdInRead_ {INVALID_HANDLE_VALUE};
HANDLE pipeStdInWrite_ {}; HANDLE pipeStdInWrite_ {INVALID_HANDLE_VALUE};
AuString execModule_; AuString execModule_;
ESpawnType type_; ESpawnType type_;
@ -196,22 +196,24 @@ namespace Aurora::Processes
void ProcessImpl::ShutdownPipes() void ProcessImpl::ShutdownPipes()
{ {
if (auto handle = std::exchange(this->pipeStdOutRead_, {})) HANDLE handle;
if ((handle = std::exchange(this->pipeStdOutRead_, INVALID_HANDLE_VALUE)) != INVALID_HANDLE_VALUE)
CloseHandle(handle); CloseHandle(handle);
if (auto handle = std::exchange(this->pipeStdOutWrite_, {})) if ((handle = std::exchange(this->pipeStdOutWrite_, INVALID_HANDLE_VALUE)) != INVALID_HANDLE_VALUE)
CloseHandle(handle); CloseHandle(handle);
if (auto handle = std::exchange(this->pipeStdErrRead_, {})) if ((handle = std::exchange(this->pipeStdErrRead_, INVALID_HANDLE_VALUE)) != INVALID_HANDLE_VALUE)
CloseHandle(handle); CloseHandle(handle);
if (auto handle = std::exchange(this->pipeStdErrWrite_, {})) if ((handle = std::exchange(this->pipeStdErrWrite_, INVALID_HANDLE_VALUE)) != INVALID_HANDLE_VALUE)
CloseHandle(handle); CloseHandle(handle);
if (auto handle = std::exchange(this->pipeStdInRead_, {})) if ((handle = std::exchange(this->pipeStdInRead_, INVALID_HANDLE_VALUE)) != INVALID_HANDLE_VALUE)
CloseHandle(handle); CloseHandle(handle);
if (auto handle = std::exchange(this->pipeStdInWrite_, {})) if ((handle = std::exchange(this->pipeStdInWrite_, INVALID_HANDLE_VALUE)) != INVALID_HANDLE_VALUE)
CloseHandle(handle); CloseHandle(handle);
} }

View File

@ -31,7 +31,7 @@ namespace Aurora::Threading::Primitives
bool EventImpl::Lock(AuUInt64 timeout /*=0*/) bool EventImpl::Lock(AuUInt64 timeout /*=0*/)
{ {
LockGuardPtr<IConditionMutex> re(mutex_.get()); LockGuardPtr re(mutex_);
AuInt64 startTime = Aurora::Time::CurrentClockMS(); AuInt64 startTime = Aurora::Time::CurrentClockMS();
AuInt64 endTime = startTime + timeout; AuInt64 endTime = startTime + timeout;
@ -66,7 +66,7 @@ namespace Aurora::Threading::Primitives
bool EventImpl::TryLock() bool EventImpl::TryLock()
{ {
LockGuardPtr<IConditionMutex> re(mutex_.get()); LockGuardPtr re(mutex_);
return AtomicIsEventSet(); return AtomicIsEventSet();
} }
@ -89,7 +89,7 @@ namespace Aurora::Threading::Primitives
void EventImpl::Set() void EventImpl::Set()
{ {
LockGuardPtr<IConditionMutex> re(mutex_.get()); LockGuardPtr re(mutex_);
SysAssertExp((permitMultipleTriggers_) || (!triggered_), "Can not trigger an awake event object"); SysAssertExp((permitMultipleTriggers_) || (!triggered_), "Can not trigger an awake event object");
triggered_ = true; triggered_ = true;
condition_->Broadcast(); condition_->Broadcast();

View File

@ -17,7 +17,8 @@ namespace Aurora::Threading::Primitives
RWLockImpl::~RWLockImpl() RWLockImpl::~RWLockImpl()
{ {
mutex_.reset();
condition_.reset();
} }
bool RWLockImpl::Init() bool RWLockImpl::Init()
@ -38,7 +39,7 @@ namespace Aurora::Threading::Primitives
bool RWLockImpl::LockRead(AuUInt64 timeout) bool RWLockImpl::LockRead(AuUInt64 timeout)
{ {
LockGuardPtr<IConditionMutex> mutex(mutex_.get()); LockGuardPtr lock(mutex_);
while (state_ < 0) while (state_ < 0)
{ {
@ -54,7 +55,7 @@ namespace Aurora::Threading::Primitives
bool RWLockImpl::LockWrite(AuUInt64 timeout) bool RWLockImpl::LockWrite(AuUInt64 timeout)
{ {
LockGuardPtr<IConditionMutex> mutex(mutex_.get()); LockGuardPtr lock(mutex_);
while (state_ != 0) while (state_ != 0)
{ {
@ -70,7 +71,7 @@ namespace Aurora::Threading::Primitives
bool RWLockImpl::TryLockRead() bool RWLockImpl::TryLockRead()
{ {
LockGuardPtr<IConditionMutex> mutex(mutex_.get()); LockGuardPtr lock(mutex_);
if (state_ == -1) if (state_ == -1)
{ {
@ -83,7 +84,7 @@ namespace Aurora::Threading::Primitives
bool RWLockImpl::TryLockWrite() bool RWLockImpl::TryLockWrite()
{ {
LockGuardPtr<IConditionMutex> mutex(mutex_.get()); LockGuardPtr lock(mutex_);
if (state_ > 0) if (state_ > 0)
{ {
@ -96,7 +97,7 @@ namespace Aurora::Threading::Primitives
void RWLockImpl::UnlockRead() void RWLockImpl::UnlockRead()
{ {
LockGuardPtr<IConditionMutex> mutex(mutex_.get()); LockGuardPtr lock(mutex_);
if(--state_ == 0) if(--state_ == 0)
{ {
@ -106,7 +107,7 @@ namespace Aurora::Threading::Primitives
void RWLockImpl::UnlockWrite() void RWLockImpl::UnlockWrite()
{ {
LockGuardPtr<IConditionMutex> mutex(mutex_.get()); LockGuardPtr lock(mutex_);
state_ = 0; state_ = 0;
condition_->Broadcast(); condition_->Broadcast();
} }