[+] AuAsync::IThreadPool::GetIOProcessor

[+] AuAsync::IThreadPool::GetIONetInterface
[+] AuAsync::IThreadPool::GetIONetWorker
[+] AuAsync::IWorkItem::SetSchedByLoopSource
This commit is contained in:
Reece Wilson 2023-12-01 09:22:51 +00:00
parent cde646ae86
commit a189151c59
12 changed files with 240 additions and 12 deletions

View File

@ -13,6 +13,12 @@ namespace Aurora::IO::Loop
struct ILoopSource; struct ILoopSource;
} }
namespace Aurora::IO::Net
{
struct INetInterface;
struct INetWorker;
}
namespace Aurora::Async namespace Aurora::Async
{ {
struct IThreadPool struct IThreadPool
@ -77,6 +83,11 @@ namespace Aurora::Async
// //
virtual WorkerId_t GetCurrentThread() = 0; virtual WorkerId_t GetCurrentThread() = 0;
//
virtual AuSPtr<IO::IIOProcessor> GetIOProcessor(WorkerId_t id) = 0;
virtual AuSPtr<IO::Net::INetInterface> GetIONetInterface(WorkerId_t id) = 0;
virtual AuSPtr<IO::Net::INetWorker> GetIONetWorker(WorkerId_t id) = 0;
// Synchronization // Synchronization
// Note: syncing to yourself will nullify requireSignal to prevent deadlock conditions // Note: syncing to yourself will nullify requireSignal to prevent deadlock conditions
virtual bool Sync(WorkerId_t workerId, virtual bool Sync(WorkerId_t workerId,

View File

@ -22,6 +22,8 @@ namespace Aurora::Async
virtual AuSPtr<IWorkItem> WaitFor(const AuSPtr<IWorkItem> &pWorkItem) = 0; virtual AuSPtr<IWorkItem> WaitFor(const AuSPtr<IWorkItem> &pWorkItem) = 0;
virtual AuSPtr<IWorkItem> WaitFor(const AuList<AuSPtr<IWorkItem>> &workItems) = 0; virtual AuSPtr<IWorkItem> WaitFor(const AuList<AuSPtr<IWorkItem>> &workItems) = 0;
virtual AuSPtr<IWorkItem> SetSchedByLoopSource(const AuSPtr<IO::Loop::ILoopSource> &pLoopSource) = 0;
// ms = time relative to the current time // ms = time relative to the current time
virtual AuSPtr<IWorkItem> SetSchedTime(AuUInt32 ms) = 0; virtual AuSPtr<IWorkItem> SetSchedTime(AuUInt32 ms) = 0;

View File

@ -33,6 +33,7 @@ namespace Aurora::Async
AuUInt32 reschedClockAbsMs {}; AuUInt32 reschedClockAbsMs {};
AuUInt64 reschedClockAbsNs {}; AuUInt64 reschedClockAbsNs {};
AuUInt64 reschedSteadyClockAbsNs {}; AuUInt64 reschedSteadyClockAbsNs {};
AuSPtr<IO::Loop::ILoopSource> pLoopSource {};
// @hideinitializer // @hideinitializer
IThreadPool *pool; IThreadPool *pool;
}; };

View File

@ -228,6 +228,21 @@ namespace Aurora::Async
ThreadPool::AssertWorker(id); ThreadPool::AssertWorker(id);
} }
AuSPtr<AuIO::IIOProcessor> AsyncApp::GetIOProcessor(WorkerId_t pid)
{
return ThreadPool::GetIOProcessor(pid);
}
AuSPtr<AuIO::Net::INetInterface> AsyncApp::GetIONetInterface(WorkerId_t pid)
{
return ThreadPool::GetIONetInterface(pid);
}
AuSPtr<AuIO::Net::INetWorker> AsyncApp::GetIONetWorker(WorkerId_t pid)
{
return ThreadPool::GetIONetWorker(pid);
}
AuSPtr<AuLoop::ILoopQueue> AsyncApp::ToKernelWorkQueue() AuSPtr<AuLoop::ILoopQueue> AsyncApp::ToKernelWorkQueue()
{ {
return ThreadPool::ToKernelWorkQueue(); return ThreadPool::ToKernelWorkQueue();

View File

@ -42,7 +42,9 @@ namespace Aurora::Async
void AddFeature(WorkerId_t id, AuSPtr<Threading::Threads::IThreadFeature> feature, bool async) override; void AddFeature(WorkerId_t id, AuSPtr<Threading::Threads::IThreadFeature> feature, bool async) override;
void AssertInThreadGroup(ThreadGroup_t group) override; void AssertInThreadGroup(ThreadGroup_t group) override;
void AssertWorker(WorkerId_t id) override; void AssertWorker(WorkerId_t id) override;
AuSPtr<AuIO::IIOProcessor> GetIOProcessor(WorkerId_t pid) override;
AuSPtr<AuIO::Net::INetInterface> GetIONetInterface(WorkerId_t pid) override;
AuSPtr<AuIO::Net::INetWorker> GetIONetWorker(WorkerId_t pid) override;
AuSPtr<AuLoop::ILoopQueue> ToKernelWorkQueue() override; AuSPtr<AuLoop::ILoopQueue> ToKernelWorkQueue() override;
AuSPtr<AuLoop::ILoopQueue> ToKernelWorkQueue(WorkerId_t workerId) override; AuSPtr<AuLoop::ILoopQueue> ToKernelWorkQueue(WorkerId_t workerId) override;

View File

@ -7,6 +7,8 @@
***/ ***/
#pragma once #pragma once
#include "AuThreadStateSingletons.hpp"
namespace Aurora::Async namespace Aurora::Async
{ {
struct AsyncLoop; struct AsyncLoop;
@ -68,6 +70,7 @@ namespace Aurora::Async
AuSPtr<AsyncLoop> asyncLoop; AuSPtr<AsyncLoop> asyncLoop;
ThreadStateStack stackState; ThreadStateStack stackState;
ThreadStateFeatureCallbacks tlsFeatures; ThreadStateFeatureCallbacks tlsFeatures;
ThreadStateSingletons singletons;
bool Init(); bool Init();
}; };

View File

@ -10,5 +10,59 @@
namespace Aurora::Async namespace Aurora::Async
{ {
AuSPtr<AuIO::IIOProcessor> ThreadStateSingletons::GetIOProcessor(AuWorkerPId_t pid)
{
if (this->pIOProcessors)
{
return this->pIOProcessors;
}
{
AU_LOCK_GUARD(this->mutex);
if (this->pIOProcessors)
{
return this->pIOProcessors;
}
return this->pIOProcessors = AuIO::NewIOProcessorOnThread(false, pid);
}
}
void ThreadStateSingletons::TryInitNet(AuWorkerPId_t pid)
{
if (this->pNetInterface)
{
return;
}
AU_LOCK_GUARD(this->mutex);
auto pNetProcessor = AuNet::NewNetworkInterface();
if (!pNetProcessor)
{
return;
}
auto pNetWorker = pNetProcessor->GetWorkersService()->Attach(this->GetIOProcessor(pid));
if (!pNetWorker)
{
return;
}
this->pNetWorker = pNetWorker;
this->pNetInterface = pNetProcessor;
}
AuSPtr<AuIO::Net::INetInterface> ThreadStateSingletons::GetIONetInterface(AuWorkerPId_t pid)
{
this->TryInitNet(pid);
return this->pNetInterface;
}
AuSPtr<AuIO::Net::INetWorker> ThreadStateSingletons::GetIONetWorker(AuWorkerPId_t pid)
{
this->TryInitNet(pid);
return this->pNetWorker;
}
} }

View File

@ -7,7 +7,23 @@
***/ ***/
#pragma once #pragma once
#include <Aurora/IO/Net/NetExperimental.hpp>
namespace Aurora::Async namespace Aurora::Async
{ {
struct ThreadStateSingletons
{
AuCriticalSection mutex;
AuSPtr<AuIO::IIOProcessor> pIOProcessors;
AuSPtr<AuIO::IIOProcessor> pIOProcessor;
AuSPtr<AuIO::Net::INetInterface> pNetInterface;
AuSPtr<AuIO::Net::INetWorker> pNetWorker;
AuSPtr<AuIO::IIOProcessor> GetIOProcessor(AuWorkerPId_t pid);
AuSPtr<AuIO::Net::INetInterface> GetIONetInterface(AuWorkerPId_t pid);
AuSPtr<AuIO::Net::INetWorker> GetIONetWorker(AuWorkerPId_t pid);
void TryInitNet(AuWorkerPId_t pid);
};
} }

View File

@ -739,6 +739,36 @@ namespace Aurora::Async
return tlsWorkerId; return tlsWorkerId;
} }
AuSPtr<AuIO::IIOProcessor> ThreadPool::GetIOProcessor(WorkerId_t pid)
{
if (auto pState = this->GetThreadHandle(pid))
{
return pState->singletons.GetIOProcessor({ this->SharedFromThis(), pid });
}
return {};
}
AuSPtr<AuIO::Net::INetInterface> ThreadPool::GetIONetInterface(WorkerId_t pid)
{
if (auto pState = this->GetThreadHandle(pid))
{
return pState->singletons.GetIONetInterface({ this->SharedFromThis(), pid });
}
return {};
}
AuSPtr<AuIO::Net::INetWorker> ThreadPool::GetIONetWorker(WorkerId_t pid)
{
if (auto pState = this->GetThreadHandle(pid))
{
return pState->singletons.GetIONetWorker({ this->SharedFromThis(), pid });
}
return {};
}
bool ThreadPool::Sync(WorkerId_t workerId, AuUInt32 timeoutMs, bool requireSignal) bool ThreadPool::Sync(WorkerId_t workerId, AuUInt32 timeoutMs, bool requireSignal)
{ {
AU_LOCK_GUARD(this->pRWReadView); AU_LOCK_GUARD(this->pRWReadView);

View File

@ -60,6 +60,10 @@ namespace Aurora::Async
virtual WorkerId_t GetCurrentThread() override; virtual WorkerId_t GetCurrentThread() override;
virtual AuSPtr<AuIO::IIOProcessor> GetIOProcessor(WorkerId_t id) override;
virtual AuSPtr<AuIO::Net::INetInterface> GetIONetInterface(WorkerId_t pid) override;
virtual AuSPtr<AuIO::Net::INetWorker> GetIONetWorker(WorkerId_t id) override;
virtual bool Sync(WorkerId_t workerId, AuUInt32 timeoutMs, bool requireSignal) override; virtual bool Sync(WorkerId_t workerId, AuUInt32 timeoutMs, bool requireSignal) override;
virtual void Signal(WorkerId_t workerId) override; virtual void Signal(WorkerId_t workerId) override;
virtual AuSPtr<AuLoop::ILoopSource> WorkerToLoopSource(WorkerId_t id) override; virtual AuSPtr<AuLoop::ILoopSource> WorkerToLoopSource(WorkerId_t id) override;

View File

@ -43,8 +43,10 @@ namespace Aurora::Async
WorkItem::~WorkItem() WorkItem::~WorkItem()
{ {
if (auto pIOWatch = AuExchange(this->pIOWatch, {}))
//Fail(); {
pIOWatch->StopWatch();
}
} }
AuSPtr<IWorkItem> WorkItem::WaitFor(const AuSPtr<IWorkItem> &workItem) AuSPtr<IWorkItem> WorkItem::WaitFor(const AuSPtr<IWorkItem> &workItem)
@ -172,6 +174,12 @@ namespace Aurora::Async
return AU_SHARED_FROM_THIS; return AU_SHARED_FROM_THIS;
} }
AuSPtr<IWorkItem> WorkItem::SetSchedByLoopSource(const AuSPtr<IO::Loop::ILoopSource> &pLoopSource)
{
this->pIOWatchLS = pLoopSource;
return AU_SHARED_FROM_THIS;
}
AuSPtr<IWorkItem> WorkItem::SetSchedTime(AuUInt32 ms) AuSPtr<IWorkItem> WorkItem::SetSchedTime(AuUInt32 ms)
{ {
this->dispatchTimeNs_ = Time::SteadyClockNS() + AuMSToNS<AuUInt64>(ms); this->dispatchTimeNs_ = Time::SteadyClockNS() + AuMSToNS<AuUInt64>(ms);
@ -232,6 +240,22 @@ namespace Aurora::Async
this->dispatchPending_ = true; this->dispatchPending_ = true;
if (this->pIOWatchLS)
{
if (!this->pIOWatchLS->IsSignaled())
{
if (!Schedule())
{
this->Fail();
}
return;
}
else
{
AuResetMember(this->pIOWatchLS);
}
}
if (Time::SteadyClockNS() < this->dispatchTimeNs_) if (Time::SteadyClockNS() < this->dispatchTimeNs_)
{ {
if (!Schedule()) if (!Schedule())
@ -324,11 +348,14 @@ namespace Aurora::Async
AuSPtr<ThreadState> WorkItem::GetState() AuSPtr<ThreadState> WorkItem::GetState()
{ {
if (!this->worker_.HasValue()) if (this->worker_.HasValue())
{
return this->owner_->GetThreadHandle(this->worker_.value());
}
else
{ {
return {}; return {};
} }
return this->owner_->GetThreadHandle(this->worker_.value());
} }
void WorkItem::RunAsyncLocked2() void WorkItem::RunAsyncLocked2()
@ -398,12 +425,15 @@ namespace Aurora::Async
{ {
SetSchedSteadyTimeNsAbs(info.reschedSteadyClockAbsNs); SetSchedSteadyTimeNsAbs(info.reschedSteadyClockAbsNs);
} }
else if (info.pLoopSource)
{
SetSchedByLoopSource(info.pLoopSource);
}
if (!WaitForLocked(info.waitFor)) if (!WaitForLocked(info.waitFor))
{ {
this->Fail(); this->Fail();
} }
} }
[[fallthrough]]; [[fallthrough]];
case ETickType::eRerun: case ETickType::eRerun:
@ -419,21 +449,38 @@ namespace Aurora::Async
} }
this->finished = true; this->finished = true;
if (this->finishedEvent_) if (this->finishedEvent_)
{ {
this->finishedEvent_->Set(); this->finishedEvent_->Set();
} }
for (auto &waiter : this->waiters_) for (auto &waiter : AuExchange(this->waiters_, {}))
{ {
AuReinterpretCast<WorkItem>(waiter)->DispatchExLocked(true); AuReinterpretCast<WorkItem>(waiter)->DispatchExLocked(true);
} }
this->waitOn_.clear();
if (auto pIOWatch = AuExchange(this->pIOWatch, {}))
{
pIOWatch->StopWatch();
}
AuResetMember(this->pIOWatchLS);
} }
void WorkItem::Fail() void WorkItem::Fail()
{ {
failed = true; failed = true;
if (auto pIOWatch = AuExchange(this->pIOWatch, {}))
{
pIOWatch->StopWatch();
}
AuResetMember(this->pIOWatchLS);
if (auto task_ = AuExchange(this->task_, {})) if (auto task_ = AuExchange(this->task_, {}))
{ {
task_->OnFailure(); task_->OnFailure();
@ -530,9 +577,49 @@ namespace Aurora::Async
} }
bool WorkItem::Schedule() bool WorkItem::Schedule()
{
if (auto pLoopSource = this->pIOWatchLS)
{
if (this->pIOWatch)
{
return true;
}
auto pState = this->GetState();
if (!pState)
{
return false;
}
auto pIOProcessor = pState->singletons.GetIOProcessor(this->worker_.value());
if (!pIOProcessor)
{
return false;
}
this->pIOWatch = pIOProcessor->StartSimpleLSWatch(pLoopSource, AuMakeSharedThrow<AuIO::IIOSimpleEventListenerFunctional>([=]()
{
this->Dispatch();
}, [=]()
{
this->Dispatch();
}, [=]()
{
this->Dispatch();
}));
if (!this->pIOWatch)
{
return false;
}
return true;
}
else
{ {
return Async::Schedule(this->dispatchTimeNs_, this->owner_, this->worker_.value(), AuSharedFromThis()); return Async::Schedule(this->dispatchTimeNs_, this->owner_, this->worker_.value(), AuSharedFromThis());
} }
}
void WorkItem::SendOff() void WorkItem::SendOff()
{ {

View File

@ -32,6 +32,7 @@ namespace Aurora::Async
AuSPtr<IWorkItem> SetSchedTimeAbs(AuUInt32 ms) override; AuSPtr<IWorkItem> SetSchedTimeAbs(AuUInt32 ms) override;
AuSPtr<IWorkItem> SetSchedTimeNsAbs(AuUInt64 ns) override; AuSPtr<IWorkItem> SetSchedTimeNsAbs(AuUInt64 ns) override;
AuSPtr<IWorkItem> SetSchedSteadyTimeNsAbs(AuUInt64 ns) override; AuSPtr<IWorkItem> SetSchedSteadyTimeNsAbs(AuUInt64 ns) override;
AuSPtr<IWorkItem> SetSchedByLoopSource(const AuSPtr<IO::Loop::ILoopSource> &pLoopSource) override;
AuSPtr<IWorkItem> Then(const AuSPtr<IWorkItem> &next) override; AuSPtr<IWorkItem> Then(const AuSPtr<IWorkItem> &next) override;
AuSPtr<IWorkItem> Dispatch() override; AuSPtr<IWorkItem> Dispatch() override;
@ -80,6 +81,8 @@ namespace Aurora::Async
AuThreadPrimitives::Event finishedEvent_; AuThreadPrimitives::Event finishedEvent_;
AuUInt32 uShutdownCookie {}; AuUInt32 uShutdownCookie {};
AuOptionalEx<AuUInt32> optOtherCookie {}; AuOptionalEx<AuUInt32> optOtherCookie {};
AuSPtr<AuIO::IIOProcessorItem> pIOWatch;
AuSPtr<IO::Loop::ILoopSource> pIOWatchLS;
bool finished {}; bool finished {};
bool failed {}; bool failed {};