diff --git a/Include/Aurora/Async/IThreadPool.hpp b/Include/Aurora/Async/IThreadPool.hpp index 25f7f864..84f86508 100644 --- a/Include/Aurora/Async/IThreadPool.hpp +++ b/Include/Aurora/Async/IThreadPool.hpp @@ -7,8 +7,26 @@ ***/ #pragma once +namespace Aurora::Loop +{ + struct ILoopQueue; +} + namespace Aurora::Async { + AUE_DEFINE(ERunMode, + ( + eLowLatencyYield, // uses high perf cond var + yield + trylock + eLowLatencyFreqKernel, // uses high perf cond var + timeout(frequency) + eEfficient // delegates sleep to the kernel once kernel objects are scheduled + )); + + struct RunMode + { + ERunMode mode; + AuUInt16 freqMsTick; + }; + struct IThreadPool { // Spawning @@ -65,6 +83,10 @@ namespace Aurora::Async virtual void AssertWorker(WorkerId_t id) = 0; // Async subsystem glue - virtual bool ScheduleLoopSource(const AuSPtr &loopSource, WorkerId_t workerId, AuUInt32 timeout, const AuConsumer, bool> &callback) = 0; + virtual AuSPtr ToKernelWorkQueue() = 0; + virtual AuSPtr ToKernelWorkQueue(WorkerId_t workerId) = 0; + virtual void UpdateWorkMode(WorkerId_t workerId, RunMode mode) = 0; + virtual ERunMode GetCurrentThreadRunMode() = 0; + virtual ERunMode GetThreadRunMode(WorkerId_t workerId) = 0; }; } \ No newline at end of file diff --git a/Include/Aurora/Loop/Loop.hpp b/Include/Aurora/Loop/Loop.hpp index 5a0e3df8..169258af 100644 --- a/Include/Aurora/Loop/Loop.hpp +++ b/Include/Aurora/Loop/Loop.hpp @@ -8,7 +8,7 @@ While you may drive low perf user facing apps and small services from this, other services should divide and conquer * Allow the network subsystem to load balance sockets across a predefined amount of workers, * Use semaphores instead of massive lengthy arrays of mutexes - * Use grouped long-polling LoopSources to convert kernel or waitable objects to a single time-polled loop source + * Use grouped long-polling LoopSources to convert kernel or waitable objects to a single time-polled loop source (freq based thread runners, alt to eEfficient) ***/ #pragma 
once diff --git a/Include/Aurora/Memory/MemoryView.hpp b/Include/Aurora/Memory/MemoryView.hpp index f6da8e50..f33fe03c 100644 --- a/Include/Aurora/Memory/MemoryView.hpp +++ b/Include/Aurora/Memory/MemoryView.hpp @@ -206,4 +206,4 @@ namespace Aurora::Memory using MemoryViewStreamRead = MemoryViewStream; using MemoryViewStreamWrite = MemoryViewStream; -} +} \ No newline at end of file diff --git a/README.md b/README.md index eba0d505..ce31f07e 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,7 @@ the buildscripts into your applications build pipeline to get started. - Basic cmdline parsing from any module - Exit and fatal save condition callbacks - IPC [WIP] -- Network +- Network [WIP] - Random; secure and fast - Hardware Info; memory and cpu info - Software Stack Info @@ -345,7 +345,7 @@ UTF-8 and attempt to read a BOM to translate any other input to UTF-8. ### Paths We assume all paths are messy. Incorrect splitters, double splitters, relative paths, and keywords are resolved internally. -No such URL or path builder, data structure to hold a tokenized representation, or similar concept exists in the codebase. +No URL or path builder, data structure to hold a tokenized URI expression, or similar concept exists in the codebase. All string 'paths' are simply expanded, similar to MSCRT's `fullpath` or UNIX's `realpath`, at time of usage.
Path tokens include:
@@ -366,13 +366,9 @@ Path tokens include:
### NIO -The networking stack supports a handful of architectural paradigms
-- block on write
-- delegate write to end of network frame on write
-- read with an all-or-nothing flag and an async flag
-- read with an asynchronous stream callback -- peaking
-- async read/write pump whenever and/or all +- Worker thread delegated resolve using system resolver +- Callback with fence-id based asynchronous write abstraction +- Loop Source support ### CIO @@ -468,10 +464,10 @@ Utility (third party) provides production hardening
Examples:
-> Use embedded crypto libraries; libtomcrypt, libtommath
- -> While there are some bugs in libtomcrypt and others, none appear to
+ ->> While there are some bugs in libtomcrypt and others, none appear to
cryptographically cripple the library. Could you do better?
-> Use portable libraries like mbedtls, O(1) heap, mimalloc
- -> Writing a [D]TLS/allocator stack would take too much time
- -> Linking against external allocators, small cross-platform utilities, and
+ ->> Writing a [D]TLS/allocator stack would take too much time
+ ->> Linking against external allocators, small cross-platform utilities, and
so on is probably fine
-> Shim libcurl instead of inventing yet another http stack
diff --git a/Source/Async/AsyncApp.cpp b/Source/Async/AsyncApp.cpp index 67547784..b0187c0b 100644 --- a/Source/Async/AsyncApp.cpp +++ b/Source/Async/AsyncApp.cpp @@ -175,8 +175,28 @@ namespace Aurora::Async ThreadPool::AssertWorker(id); } - bool AsyncApp::ScheduleLoopSource(const AuSPtr &loopSource, WorkerId_t workerId, AuUInt32 timeout, const AuConsumer, bool> &callback) + AuSPtr AsyncApp::ToKernelWorkQueue() { - return ThreadPool::ScheduleLoopSource(loopSource, workerId, timeout, callback); + return ThreadPool::ToKernelWorkQueue(); + } + + AuSPtr AsyncApp::ToKernelWorkQueue(WorkerId_t workerId) + { + return ThreadPool::ToKernelWorkQueue(workerId); + } + + void AsyncApp::UpdateWorkMode(WorkerId_t workerId, RunMode mode) + { + ThreadPool::UpdateWorkMode(workerId, mode); + } + + ERunMode AsyncApp::GetCurrentThreadRunMode() + { + return ThreadPool::GetCurrentThreadRunMode(); + } + + ERunMode AsyncApp::GetThreadRunMode(WorkerId_t workerId) + { + return ThreadPool::GetThreadRunMode(workerId); } } \ No newline at end of file diff --git a/Source/Async/AsyncApp.hpp b/Source/Async/AsyncApp.hpp index 56be333a..cb33dd7f 100644 --- a/Source/Async/AsyncApp.hpp +++ b/Source/Async/AsyncApp.hpp @@ -37,8 +37,12 @@ namespace Aurora::Async void AddFeature(WorkerId_t id, AuSPtr feature, bool async) override; void AssertInThreadGroup(ThreadGroup_t group) override; void AssertWorker(WorkerId_t id) override; - bool ScheduleLoopSource(const AuSPtr &loopSource, WorkerId_t workerId, AuUInt32 timeout, const AuConsumer, bool> &callback) override; + AuSPtr ToKernelWorkQueue() override; + AuSPtr ToKernelWorkQueue(WorkerId_t workerId) override; + void UpdateWorkMode(WorkerId_t workerId, RunMode mode) override; + ERunMode GetCurrentThreadRunMode() override; + ERunMode GetThreadRunMode(WorkerId_t workerId) override; // Main thread logic void Start() override; diff --git a/Source/Async/ThreadPool.cpp b/Source/Async/ThreadPool.cpp index 7808bdee..d081707d 100644 --- a/Source/Async/ThreadPool.cpp 
+++ b/Source/Async/ThreadPool.cpp @@ -10,6 +10,7 @@ #include "ThreadPool.hpp" #include "WorkItem.hpp" #include "Schedular.hpp" +#include "ThreadWorkerQueueShim.hpp" namespace Aurora::Async { @@ -221,16 +222,64 @@ namespace Aurora::Async auto state = GetThreadState(); bool success {}; + auto runMode = GetCurrentThreadRunMode(); + do { - if (state->inLoopSourceMode) + auto asyncLoop = state->asyncLoop; + + asyncLoop->OnFrame(); + + if (asyncLoop->GetSourceCount() > 1) { - success = PollLoopSource(block); + bool bShouldTrySleepForKernel {}; + + if (runMode == ERunMode::eLowLatencyFreqKernel) + { + if (state->rateLimiter.CheckExchangePass()) + { + bShouldTrySleepForKernel = asyncLoop->IsSignaled(); + } + else + { + if (!PollInternal(false)) + { + AuThreading::ContextYield(); + } + else + { + success = true; + } + } + } + else if (runMode == ERunMode::eLowLatencyYield) + { + AuThreading::ContextYield(); + block = false; + bShouldTrySleepForKernel = asyncLoop->IsSignaled(); + } + else if (runMode == ERunMode::eEfficient) + { + bShouldTrySleepForKernel = block; + if (!block) + { + bShouldTrySleepForKernel = asyncLoop->IsSignaled(); + } + } + + if (bShouldTrySleepForKernel && asyncLoop->WaitAny(0)) + { + PollInternal(block); + success = true; + } + else + { + success |= PollInternal(block); + } } else { success = PollInternal(block); - success |= state->inLoopSourceMode; } } while (success); @@ -251,7 +300,7 @@ namespace Aurora::Async // TODO: reimplement this // this is stupid and gross - if (group->workQueue.size() > 2) + if (group->workQueue.size() > group->workers.size()*3) { if (!group->sorted) { @@ -374,6 +423,8 @@ namespace Aurora::Async if (itr->second->GetPrio() < 0.25) { + group->sorted = false; + if (lowPrioCont) continue; if (!lowPrioContCached) @@ -430,102 +481,6 @@ namespace Aurora::Async return true; } - bool ThreadPool::PollLoopSource(bool block) - { - auto state = GetThreadState(); - auto group = state->parent.lock(); - - //state->pendingWorkItems.clear(); 
- - auto magic = CtxPollPush(); - bool retValue {}; - - // TODO (reece): This function isn't very efficient - { - AU_LOCK_GUARD(group->cvWorkMutex); - - AuList curLoopReq = state->loopSources; - AuList> curLoopSources; - - auto lenLoopReqs = curLoopReq.size(); - - curLoopSources.resize(lenLoopReqs + 1); - - for (auto i = 0; i < lenLoopReqs; i++) - { - curLoopSources[i] = curLoopReq[i].loopSource; - } - - curLoopSources[lenLoopReqs] = group->eventLs; - - AuList> nextLoopSources; - if (block) - { - // TODO (reece): work on async epoll like abstraction - nextLoopSources = Loop::WaitMultipleOrObjects(curLoopSources, 0); - } - else - { - nextLoopSources.reserve(curLoopSources.size()); - for (const auto &source : curLoopSources) - { - if (source->IsSignaled()) - { - nextLoopSources.push_back(source); - } - } - } - - auto time = Time::CurrentClockMS(); - - state->loopSources.clear(); - state->loopSources.reserve(curLoopReq.size()); - - if (AuExists(nextLoopSources, group->eventLs)) - { - PollInternal(false); - } - - for (const auto &request : curLoopReq) - { - bool remove {}; - bool removeType {}; - - if (AuExists(nextLoopSources, request.loopSource)) - { - remove = true; - removeType = true; - } - else - { - if (request.requestedOffset) - { - if (request.endTime < time) - { - remove = true; - removeType = false; - } - } - } - - if (!remove) - { - state->loopSources.push_back(request); - } - else - { - request.callback(request.loopSource, removeType); - retValue |= removeType; - } - } - - state->inLoopSourceMode = state->loopSources.size(); - } - - return retValue; - } - - void ThreadPool::Shutdown() { // Nested shutdowns can happen; prevent a write lock @@ -755,38 +710,60 @@ namespace Aurora::Async SysAssert(static_cast(tlsWorkerId) == id); } - bool ThreadPool::ScheduleLoopSource(const AuSPtr &loopSource, WorkerId_t workerId, AuUInt32 timeout, const AuConsumer, bool> &callback) + AuSPtr ThreadPool::ToKernelWorkQueue() { - auto thread = this->GetThreadHandle(workerId); - 
if (!thread) + return this->GetThreadState()->asyncLoop; + } + + AuSPtr ThreadPool::ToKernelWorkQueue(WorkerId_t workerId) + { + auto worker = this->GetThreadHandle(workerId); + if (!worker) { - return false; + SysPushErrorGen("Couldn't find requested worker"); + return {}; } - - auto group = thread->parent.lock(); + return worker->asyncLoop; + } + void ThreadPool::UpdateWorkMode(WorkerId_t workerId, RunMode mode) + { + auto states = this->GetThreadHandles(workerId); + if (!states.size()) { - AU_LOCK_GUARD(group->cvWorkMutex); - - AsyncAppWaitSourceRequest req {}; - req.startTime = Time::CurrentClockMS(); - if (timeout) - { - req.requestedOffset = timeout; - req.endTime = req.startTime + timeout; - } - req.loopSource = loopSource; - req.callback = callback; - - if (!AuTryInsert(thread->loopSources, req)) - { - return false; - } - - thread->inLoopSourceMode = thread->loopSources.size(); + SysPushErrorGen("Couldn't find requested worker"); + return; } - return true; + for (const auto &state : states) + { + state->runMode = mode.mode; + if (mode.freqMsTick) + { + state->rateLimiter.SetNextStep(mode.freqMsTick * 1'000'000); + } + } + } + + ERunMode ThreadPool::GetCurrentThreadRunMode() + { + auto state = this->GetThreadState(); + if (!state) + { + return ERunMode::eEfficient; + } + return state->runMode; + } + + ERunMode ThreadPool::GetThreadRunMode(WorkerId_t workerId) + { + auto worker = this->GetThreadHandle(workerId); + if (!worker) + { + SysPushErrorGen("Couldn't find requested worker"); + return {}; + } + return worker->runMode; } // Unimplemented fiber hooks, 'twas used for science @@ -869,7 +846,28 @@ namespace Aurora::Async threadState->running = AuThreadPrimitives::EventUnique(true, false, true); threadState->syncSema = AuThreadPrimitives::SemaphoreUnique(0); threadState->id = workerId; - //threadState->eventDriven = runner; + threadState->asyncLoop = AuMakeShared(); + threadState->rateLimiter.SetNextStep(1'000'000); // 1MS in nanoseconds + + if 
(!threadState->asyncLoop) + { + SysPushErrorMem(); + return {}; + } + + if (!threadState->syncSema) + { + SysPushErrorMem(); + return {}; + } + + if (!threadState->syncSema) + { + SysPushErrorMem(); + return {}; + } + + threadState->asyncLoop->SourceAdd(group->eventLs); if (!create) { @@ -1064,6 +1062,38 @@ namespace Aurora::Async return *ret; } + AuList> ThreadPool::GetThreadHandles(WorkerId_t id) + { + AU_LOCK_GUARD(this->rwlock_->AsReadable()); + + auto group = GetGroup(id.first); + if (!group) + { + return {}; + } + + AuList> ret; + if (id.second != Async::kThreadIdAny) + { + AuSPtr *ptr; + if (!AuTryFind(group->workers, id.second, ptr)) + { + return {}; + } + ret.push_back(*ptr); + } + else + { + for (const auto &[key, value] : group->workers) + { + ret.push_back(value); + } + } + + + return ret; + } + AUKN_SYM AuSPtr NewThreadPool() { // apps that don't require async shouldn't be burdened with the overhead of this litl spiner diff --git a/Source/Async/ThreadPool.hpp b/Source/Async/ThreadPool.hpp index e5194441..ffd27f50 100644 --- a/Source/Async/ThreadPool.hpp +++ b/Source/Async/ThreadPool.hpp @@ -69,7 +69,13 @@ namespace Aurora::Async virtual void AssertInThreadGroup(ThreadGroup_t group) override; virtual void AssertWorker(WorkerId_t id) override; - virtual bool ScheduleLoopSource(const AuSPtr &loopSource, WorkerId_t workerId, AuUInt32 timeout, const AuConsumer, bool> &callback) override; + virtual AuSPtr ToKernelWorkQueue() override; + virtual AuSPtr ToKernelWorkQueue(WorkerId_t workerId) override; + virtual void UpdateWorkMode(WorkerId_t workerId, RunMode mode) override; + virtual ERunMode GetCurrentThreadRunMode() override; + virtual ERunMode GetThreadRunMode(WorkerId_t workerId) override; + + //virtual bool ScheduleLoopSource(const AuSPtr &loopSource, WorkerId_t workerId, AuUInt32 timeout, const AuConsumer, bool> &callback) override; // Internal API @@ -77,7 +83,6 @@ namespace Aurora::Async bool InternalRunOne(bool block); bool PollInternal(bool block); 
- bool PollLoopSource(bool block); size_t GetThreadWorkersCount(ThreadGroup_t group); @@ -116,7 +121,8 @@ namespace Aurora::Async AuSPtr GetGroup(ThreadGroup_t type); AuSPtr GetThreadState(); AuSPtr GetThreadHandle(WorkerId_t id); - + AuList> GetThreadHandles(WorkerId_t id); + using ThreadDb_t = AuBST>; ThreadDb_t threads_; diff --git a/Source/Async/ThreadState.hpp b/Source/Async/ThreadState.hpp index 9ed6c430..7c53e3bc 100644 --- a/Source/Async/ThreadState.hpp +++ b/Source/Async/ThreadState.hpp @@ -24,6 +24,7 @@ namespace Aurora::Async }; struct GroupState; + struct AsyncLoop; struct ThreadState { @@ -36,11 +37,13 @@ namespace Aurora::Async AuList> features; bool rejecting {}; bool exiting {}; - bool inLoopSourceMode {}; bool shuttingdown {}; AuThreadPrimitives::EventUnique_t running; //bool running; AuList loopSources; AuList pendingWorkItems; + AuSPtr asyncLoop; + Utility::RateLimiter rateLimiter; + ERunMode runMode; }; } \ No newline at end of file diff --git a/Source/Async/ThreadWorkerQueueShim.cpp b/Source/Async/ThreadWorkerQueueShim.cpp new file mode 100644 index 00000000..0c96e549 --- /dev/null +++ b/Source/Async/ThreadWorkerQueueShim.cpp @@ -0,0 +1,60 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. 
+ + File: ThreadWorkerQueueShim.cpp + Date: 2022-3-9 + Author: Reece +***/ +#include +#include "Async.hpp" +#include "ThreadWorkerQueueShim.hpp" + +namespace Aurora::Async +{ + void AsyncLoop::OnFrame() + { + if (this->commitPending_) + { + if (LoopQueue::Commit()) + { + this->commitPending_ = false; + } + } + } + + bool AsyncLoop::AddCallback(const AuSPtr &source, const AuSPtr &subscriber) + { + auto ret = LoopQueue::AddCallback(source, subscriber); + if (ret) + { + this->commitPending_ = true; + } + return ret; + } + + bool AsyncLoop::AddCallbackEx(const AuSPtr &source, const AuSPtr &subscriber) + { + auto ret = LoopQueue::AddCallbackEx(source, subscriber); + if (ret) + { + this->commitPending_ = true; + } + return ret; + } + + bool AsyncLoop::AddCallback(const AuSPtr &subscriber) + { + auto ret = LoopQueue::AddCallback(subscriber); + if (ret) + { + this->commitPending_ = true; + } + return ret; + } + + bool AsyncLoop::Commit() + { + this->commitPending_ = true; + return true; + } +} \ No newline at end of file diff --git a/Source/Async/ThreadWorkerQueueShim.hpp b/Source/Async/ThreadWorkerQueueShim.hpp new file mode 100644 index 00000000..c6786805 --- /dev/null +++ b/Source/Async/ThreadWorkerQueueShim.hpp @@ -0,0 +1,27 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. 
+ + File: ThreadWorkerQueueShim.hpp + Date: 2022-3-9 + Author: Reece +***/ +#pragma once + +#include +#include + +namespace Aurora::Async +{ + struct AsyncLoop : public Loop::LoopQueue + { + void OnFrame(); + + virtual bool AddCallback (const AuSPtr &source, const AuSPtr &subscriber) override; + virtual bool AddCallbackEx(const AuSPtr &source, const AuSPtr &subscriber) override; + virtual bool AddCallback (const AuSPtr &subscriber) override; + virtual bool Commit () override; + + private: + bool commitPending_ {}; + }; +} \ No newline at end of file diff --git a/Source/Loop/LoopQueue.NT.hpp b/Source/Loop/LoopQueue.NT.hpp index f0e69586..176d3d8c 100644 --- a/Source/Loop/LoopQueue.NT.hpp +++ b/Source/Loop/LoopQueue.NT.hpp @@ -7,6 +7,8 @@ ***/ #pragma once +#include "ILoopSourceEx.hpp" + namespace Aurora::Loop { struct LoopQueue : ILoopQueue @@ -20,14 +22,14 @@ namespace Aurora::Loop AuUInt32 GetSourceCount() override; - bool AddCallback(const AuSPtr &source, const AuSPtr &subscriber) override; - bool AddCallbackEx(const AuSPtr &source, const AuSPtr &subscriber) override; - bool AddCallback(const AuSPtr &subscriber) override; + virtual bool AddCallback(const AuSPtr &source, const AuSPtr &subscriber) override; + virtual bool AddCallbackEx(const AuSPtr &source, const AuSPtr &subscriber) override; + virtual bool AddCallback(const AuSPtr &subscriber) override; void ChugPathConfigure(AuUInt32 sectionTickTime, AuSInt sectionDequeCount) override; void ChugHint(bool value) override; - bool Commit() override; + virtual bool Commit() override; bool HasFinished() override; bool IsSignaled() override; diff --git a/Source/Loop/LoopQueue.hpp b/Source/Loop/LoopQueue.hpp new file mode 100644 index 00000000..d316cd2a --- /dev/null +++ b/Source/Loop/LoopQueue.hpp @@ -0,0 +1,14 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. 
+ File: LoopQueue.hpp + Date: 2022-3-9 + Author: Reece +***/ +#pragma once + +#if defined(AURORA_IS_MODERNNT_DERIVED) + #include "LoopQueue.NT.hpp" +#elif defined(AURORA_PLATFORM_LINX) + #include "LoopQueue.Linux.hpp" +#endif \ No newline at end of file diff --git a/Source/RuntimeInternal.hpp b/Source/RuntimeInternal.hpp index cb7a1f50..e3bcabb2 100644 --- a/Source/RuntimeInternal.hpp +++ b/Source/RuntimeInternal.hpp @@ -22,6 +22,8 @@ //_WIN32_WINNT=0x0601 #include + #include + #if defined(_AUHAS_ASIO) #include #endif