diff --git a/Include/Aurora/Async/IThreadPool.hpp b/Include/Aurora/Async/IThreadPool.hpp index d2fc99c9..cf6242cd 100644 --- a/Include/Aurora/Async/IThreadPool.hpp +++ b/Include/Aurora/Async/IThreadPool.hpp @@ -13,6 +13,12 @@ namespace Aurora::IO::Loop struct ILoopSource; } +namespace Aurora::IO::Net +{ + struct INetInterface; + struct INetWorker; +} + namespace Aurora::Async { struct IThreadPool @@ -77,6 +83,11 @@ namespace Aurora::Async // virtual WorkerId_t GetCurrentThread() = 0; + // + virtual AuSPtr GetIOProcessor(WorkerId_t id) = 0; + virtual AuSPtr GetIONetInterface(WorkerId_t id) = 0; + virtual AuSPtr GetIONetWorker(WorkerId_t id) = 0; + // Synchronization // Note: syncing to yourself will nullify requireSignal to prevent deadlock conditions virtual bool Sync(WorkerId_t workerId, diff --git a/Include/Aurora/Async/IWorkItem.hpp b/Include/Aurora/Async/IWorkItem.hpp index 1b10de9d..b664e4a7 100644 --- a/Include/Aurora/Async/IWorkItem.hpp +++ b/Include/Aurora/Async/IWorkItem.hpp @@ -22,6 +22,8 @@ namespace Aurora::Async virtual AuSPtr WaitFor(const AuSPtr &pWorkItem) = 0; virtual AuSPtr WaitFor(const AuList> &workItems) = 0; + virtual AuSPtr SetSchedByLoopSource(const AuSPtr &pLoopSource) = 0; + // ms = time relative to the current time virtual AuSPtr SetSchedTime(AuUInt32 ms) = 0; diff --git a/Include/Aurora/Async/IWorkItemHandler.hpp b/Include/Aurora/Async/IWorkItemHandler.hpp index 01ee03f6..2bd9fe8e 100644 --- a/Include/Aurora/Async/IWorkItemHandler.hpp +++ b/Include/Aurora/Async/IWorkItemHandler.hpp @@ -33,6 +33,7 @@ namespace Aurora::Async AuUInt32 reschedClockAbsMs {}; AuUInt64 reschedClockAbsNs {}; AuUInt64 reschedSteadyClockAbsNs {}; + AuSPtr pLoopSource {}; // @hideinitializer IThreadPool *pool; }; diff --git a/Source/Async/AsyncApp.cpp b/Source/Async/AsyncApp.cpp index ab9a4f6c..770b2618 100644 --- a/Source/Async/AsyncApp.cpp +++ b/Source/Async/AsyncApp.cpp @@ -35,13 +35,13 @@ namespace Aurora::Async void AsyncApp::Start() { ThreadPool::SetRunningMode(true); - SysAssert(ThreadPool::Create({0, 0})); + SysAssert(ThreadPool::Create({ 0, 0 })); StartSched(); // this is now an init once function } void AsyncApp::Main() { - ThreadPool::Entrypoint({0, 0}); + ThreadPool::Entrypoint({ 0, 0 }); } void AsyncApp::SetConsoleCommandDispatcher(WorkerId_t id) @@ -152,7 +152,7 @@ namespace Aurora::Async { return ThreadPool::RunAllPending(); } - + void AsyncApp::Shutdown() { ThreadPool::Shutdown(); @@ -228,6 +228,21 @@ namespace Aurora::Async ThreadPool::AssertWorker(id); } + AuSPtr AsyncApp::GetIOProcessor(WorkerId_t pid) + { + return ThreadPool::GetIOProcessor(pid); + } + + AuSPtr AsyncApp::GetIONetInterface(WorkerId_t pid) + { + return ThreadPool::GetIONetInterface(pid); + } + + AuSPtr AsyncApp::GetIONetWorker(WorkerId_t pid) + { + return ThreadPool::GetIONetWorker(pid); + } + AuSPtr AsyncApp::ToKernelWorkQueue() { return ThreadPool::ToKernelWorkQueue(); diff --git a/Source/Async/AsyncApp.hpp b/Source/Async/AsyncApp.hpp index f5249160..8f4c070a 100644 --- a/Source/Async/AsyncApp.hpp +++ b/Source/Async/AsyncApp.hpp @@ -42,7 +42,9 @@ namespace Aurora::Async void AddFeature(WorkerId_t id, AuSPtr feature, bool async) override; void AssertInThreadGroup(ThreadGroup_t group) override; void AssertWorker(WorkerId_t id) override; - + AuSPtr GetIOProcessor(WorkerId_t pid) override; + AuSPtr GetIONetInterface(WorkerId_t pid) override; + AuSPtr GetIONetWorker(WorkerId_t pid) override; AuSPtr ToKernelWorkQueue() override; AuSPtr ToKernelWorkQueue(WorkerId_t workerId) override; diff --git a/Source/Async/AuThreadState.hpp b/Source/Async/AuThreadState.hpp index eb891c0e..2fb0aa7d 100644 --- a/Source/Async/AuThreadState.hpp +++ b/Source/Async/AuThreadState.hpp @@ -7,6 +7,8 @@ ***/ #pragma once +#include "AuThreadStateSingletons.hpp" + namespace Aurora::Async { struct AsyncLoop; @@ -68,6 +70,7 @@ namespace Aurora::Async AuSPtr asyncLoop; ThreadStateStack stackState; ThreadStateFeatureCallbacks tlsFeatures; + ThreadStateSingletons singletons; bool Init(); }; diff --git a/Source/Async/AuThreadStateSingletons.cpp b/Source/Async/AuThreadStateSingletons.cpp index 035ac357..f01c46d3 100644 --- a/Source/Async/AuThreadStateSingletons.cpp +++ b/Source/Async/AuThreadStateSingletons.cpp @@ -10,5 +10,59 @@ namespace Aurora::Async { + AuSPtr ThreadStateSingletons::GetIOProcessor(AuWorkerPId_t pid) + { + if (this->pIOProcessors) + { + return this->pIOProcessors; + } + { + AU_LOCK_GUARD(this->mutex); + + if (this->pIOProcessors) + { + return this->pIOProcessors; + } + + return this->pIOProcessors = AuIO::NewIOProcessorOnThread(false, pid); + } + } + + void ThreadStateSingletons::TryInitNet(AuWorkerPId_t pid) + { + if (this->pNetInterface) + { + return; + } + + AU_LOCK_GUARD(this->mutex); + + auto pNetProcessor = AuNet::NewNetworkInterface(); + if (!pNetProcessor) + { + return; + } + + auto pNetWorker = pNetProcessor->GetWorkersService()->Attach(this->GetIOProcessor(pid)); + if (!pNetWorker) + { + return; + } + + this->pNetWorker = pNetWorker; + this->pNetInterface = pNetProcessor; + } + + AuSPtr ThreadStateSingletons::GetIONetInterface(AuWorkerPId_t pid) + { + this->TryInitNet(pid); + return this->pNetInterface; + } + + AuSPtr ThreadStateSingletons::GetIONetWorker(AuWorkerPId_t pid) + { + this->TryInitNet(pid); + return this->pNetWorker; + } } \ No newline at end of file diff --git a/Source/Async/AuThreadStateSingletons.hpp b/Source/Async/AuThreadStateSingletons.hpp index ce583d18..3c085087 100644 --- a/Source/Async/AuThreadStateSingletons.hpp +++ b/Source/Async/AuThreadStateSingletons.hpp @@ -7,7 +7,23 @@ ***/ #pragma once +#include + namespace Aurora::Async { + struct ThreadStateSingletons + { + AuCriticalSection mutex; + AuSPtr pIOProcessors; + + AuSPtr pIOProcessor; + AuSPtr pNetInterface; + AuSPtr pNetWorker; + + AuSPtr GetIOProcessor(AuWorkerPId_t pid); + AuSPtr GetIONetInterface(AuWorkerPId_t pid); + AuSPtr GetIONetWorker(AuWorkerPId_t pid); + void TryInitNet(AuWorkerPId_t pid); + }; } \ No newline at end of file diff --git a/Source/Async/ThreadPool.cpp b/Source/Async/ThreadPool.cpp index c5876adc..1c0b286e 100644 --- a/Source/Async/ThreadPool.cpp +++ b/Source/Async/ThreadPool.cpp @@ -738,7 +738,37 @@ namespace Aurora::Async { return tlsWorkerId; } - + + AuSPtr ThreadPool::GetIOProcessor(WorkerId_t pid) + { + if (auto pState = this->GetThreadHandle(pid)) + { + return pState->singletons.GetIOProcessor({ this->SharedFromThis(), pid }); + } + + return {}; + } + + AuSPtr ThreadPool::GetIONetInterface(WorkerId_t pid) + { + if (auto pState = this->GetThreadHandle(pid)) + { + return pState->singletons.GetIONetInterface({ this->SharedFromThis(), pid }); + } + + return {}; + } + + AuSPtr ThreadPool::GetIONetWorker(WorkerId_t pid) + { + if (auto pState = this->GetThreadHandle(pid)) + { + return pState->singletons.GetIONetWorker({ this->SharedFromThis(), pid }); + } + + return {}; + } + bool ThreadPool::Sync(WorkerId_t workerId, AuUInt32 timeoutMs, bool requireSignal) { AU_LOCK_GUARD(this->pRWReadView); diff --git a/Source/Async/ThreadPool.hpp b/Source/Async/ThreadPool.hpp index 5177ed4d..977a10e5 100644 --- a/Source/Async/ThreadPool.hpp +++ b/Source/Async/ThreadPool.hpp @@ -60,6 +60,10 @@ namespace Aurora::Async virtual WorkerId_t GetCurrentThread() override; + virtual AuSPtr GetIOProcessor(WorkerId_t id) override; + virtual AuSPtr GetIONetInterface(WorkerId_t pid) override; + virtual AuSPtr GetIONetWorker(WorkerId_t id) override; + virtual bool Sync(WorkerId_t workerId, AuUInt32 timeoutMs, bool requireSignal) override; virtual void Signal(WorkerId_t workerId) override; virtual AuSPtr WorkerToLoopSource(WorkerId_t id) override; diff --git a/Source/Async/WorkItem.cpp b/Source/Async/WorkItem.cpp index 76da1608..271c07f2 100644 --- a/Source/Async/WorkItem.cpp +++ b/Source/Async/WorkItem.cpp @@ -43,8 +43,10 @@ namespace Aurora::Async WorkItem::~WorkItem() { - - //Fail(); + if (auto pIOWatch = AuExchange(this->pIOWatch, {})) + { + pIOWatch->StopWatch(); + } } AuSPtr WorkItem::WaitFor(const AuSPtr &workItem) @@ -172,6 +174,12 @@ namespace Aurora::Async return AU_SHARED_FROM_THIS; } + AuSPtr WorkItem::SetSchedByLoopSource(const AuSPtr &pLoopSource) + { + this->pIOWatchLS = pLoopSource; + return AU_SHARED_FROM_THIS; + } + AuSPtr WorkItem::SetSchedTime(AuUInt32 ms) { this->dispatchTimeNs_ = Time::SteadyClockNS() + AuMSToNS(ms); @@ -232,6 +240,22 @@ namespace Aurora::Async this->dispatchPending_ = true; + if (this->pIOWatchLS) + { + if (!this->pIOWatchLS->IsSignaled()) + { + if (!Schedule()) + { + this->Fail(); + } + return; + } + else + { + AuResetMember(this->pIOWatchLS); + } + } + if (Time::SteadyClockNS() < this->dispatchTimeNs_) { if (!Schedule()) @@ -324,11 +348,14 @@ namespace Aurora::Async AuSPtr WorkItem::GetState() { - if (!this->worker_.HasValue()) + if (this->worker_.HasValue()) + { + return this->owner_->GetThreadHandle(this->worker_.value()); + } + else { return {}; } - return this->owner_->GetThreadHandle(this->worker_.value()); } void WorkItem::RunAsyncLocked2() @@ -398,12 +425,15 @@ namespace Aurora::Async { SetSchedSteadyTimeNsAbs(info.reschedSteadyClockAbsNs); } + else if (info.pLoopSource) + { + SetSchedByLoopSource(info.pLoopSource); + } if (!WaitForLocked(info.waitFor)) { this->Fail(); } - } [[fallthrough]]; case ETickType::eRerun: @@ -419,21 +449,38 @@ namespace Aurora::Async } this->finished = true; + if (this->finishedEvent_) { this->finishedEvent_->Set(); } - for (auto &waiter : this->waiters_) + for (auto &waiter : AuExchange(this->waiters_, {})) { AuReinterpretCast(waiter)->DispatchExLocked(true); } + + this->waitOn_.clear(); + + if (auto pIOWatch = AuExchange(this->pIOWatch, {})) + { + pIOWatch->StopWatch(); + } + + AuResetMember(this->pIOWatchLS); } void WorkItem::Fail() { failed = true; + if (auto pIOWatch = AuExchange(this->pIOWatch, {})) + { + pIOWatch->StopWatch(); + } + + AuResetMember(this->pIOWatchLS); + if (auto task_ = AuExchange(this->task_, {})) { task_->OnFailure(); @@ -531,7 +578,47 @@ namespace Aurora::Async bool WorkItem::Schedule() { - return Async::Schedule(this->dispatchTimeNs_, this->owner_, this->worker_.value(), AuSharedFromThis()); + if (auto pLoopSource = this->pIOWatchLS) + { + if (this->pIOWatch) + { + return true; + } + + auto pState = this->GetState(); + if (!pState) + { + return false; + } + + auto pIOProcessor = pState->singletons.GetIOProcessor(this->worker_.value()); + if (!pIOProcessor) + { + return false; + } + + this->pIOWatch = pIOProcessor->StartSimpleLSWatch(pLoopSource, AuMakeSharedThrow([=]() + { + this->Dispatch(); + }, [=]() + { + this->Dispatch(); + }, [=]() + { + this->Dispatch(); + })); + + if (!this->pIOWatch) + { + return false; + } + + return true; + } + else + { + return Async::Schedule(this->dispatchTimeNs_, this->owner_, this->worker_.value(), AuSharedFromThis()); + } } void WorkItem::SendOff() diff --git a/Source/Async/WorkItem.hpp b/Source/Async/WorkItem.hpp index 141c2a9f..3a422f99 100644 --- a/Source/Async/WorkItem.hpp +++ b/Source/Async/WorkItem.hpp @@ -32,6 +32,7 @@ namespace Aurora::Async AuSPtr SetSchedTimeAbs(AuUInt32 ms) override; AuSPtr SetSchedTimeNsAbs(AuUInt64 ns) override; AuSPtr SetSchedSteadyTimeNsAbs(AuUInt64 ns) override; + AuSPtr SetSchedByLoopSource(const AuSPtr &pLoopSource) override; AuSPtr Then(const AuSPtr &next) override; AuSPtr Dispatch() override; @@ -80,6 +81,8 @@ namespace Aurora::Async AuThreadPrimitives::Event finishedEvent_; AuUInt32 uShutdownCookie {}; AuOptionalEx optOtherCookie {}; + AuSPtr pIOWatch; + AuSPtr pIOWatchLS; bool finished {}; bool failed {};