diff --git a/Source/Async/AuThreadState.cpp b/Source/Async/AuThreadState.cpp index f52453b2..14cbfe6d 100644 --- a/Source/Async/AuThreadState.cpp +++ b/Source/Async/AuThreadState.cpp @@ -71,6 +71,12 @@ namespace Aurora::Async return true; } + + + void ThreadStateBase::Deinit() + { + AuResetMember(this->singletons); + } void ThreadStateSync::SetEvent(bool bBoth, bool bHasWork) { diff --git a/Source/Async/AuThreadState.hpp b/Source/Async/AuThreadState.hpp index de42fd34..b8e5b2a1 100644 --- a/Source/Async/AuThreadState.hpp +++ b/Source/Async/AuThreadState.hpp @@ -76,5 +76,6 @@ namespace Aurora::Async ThreadStateSingletons singletons; bool Init(); + void Deinit(); }; } \ No newline at end of file diff --git a/Source/Async/AuThreadStateSingletons.cpp b/Source/Async/AuThreadStateSingletons.cpp index 6950c6f1..42479866 100644 --- a/Source/Async/AuThreadStateSingletons.cpp +++ b/Source/Async/AuThreadStateSingletons.cpp @@ -7,6 +7,7 @@ ***/ #include #include "AuThreadStateSingletons.hpp" +#include namespace Aurora::Async { @@ -29,7 +30,7 @@ namespace Aurora::Async } } - AuSPtr ThreadStateSingletons::GetIOGroup() + AuSPtr ThreadStateSingletons::GetIOGroup(AuWorkerPId_t pid) { if (this->pGroup) { @@ -44,7 +45,16 @@ namespace Aurora::Async return this->pGroup; } - return this->pGroup = AuIO::CompletionGroup::NewCompletionGroup(); + this->pGroup = AuIO::CompletionGroup::NewCompletionGroup(); + + if (!this->pGroup) + { + return {}; + } + + AuStaticCast(this->pGroup)->MakeInternalAsync(pid); + + return this->pGroup; } } diff --git a/Source/Async/AuThreadStateSingletons.hpp b/Source/Async/AuThreadStateSingletons.hpp index 9ccbe20d..0f27a549 100644 --- a/Source/Async/AuThreadStateSingletons.hpp +++ b/Source/Async/AuThreadStateSingletons.hpp @@ -22,7 +22,7 @@ namespace Aurora::Async AuSPtr pNetWorker; AuSPtr pGroup; - AuSPtr GetIOGroup(); + AuSPtr GetIOGroup(AuWorkerPId_t pid); AuSPtr GetIOProcessor(AuWorkerPId_t pid); AuSPtr GetIONetInterface(AuWorkerPId_t pid); AuSPtr GetIONetWorker(AuWorkerPId_t pid); diff --git a/Source/Async/ThreadPool.cpp b/Source/Async/ThreadPool.cpp index edee1937..706fe5a4 100644 --- a/Source/Async/ThreadPool.cpp +++ b/Source/Async/ThreadPool.cpp @@ -801,11 +801,11 @@ namespace Aurora::Async return tlsWorkerId; } - AuSPtr ThreadPool::GetIOProcessor(WorkerId_t pid) + AuSPtr ThreadPool::GetIOProcessor(WorkerId_t id) { - if (auto pState = this->GetThreadHandle(pid)) + if (auto pState = this->GetThreadHandle(id)) { - return pState->singletons.GetIOProcessor({ this->SharedFromThis(), pid }); + return pState->singletons.GetIOProcessor({ this->SharedFromThis(), id }); } return {}; @@ -815,27 +815,27 @@ namespace Aurora::Async { if (auto pState = this->GetThreadHandle(id)) { - return pState->singletons.GetIOGroup(); + return pState->singletons.GetIOGroup({ this->SharedFromThis(), id }); } return {}; } - AuSPtr ThreadPool::GetIONetInterface(WorkerId_t pid) + AuSPtr ThreadPool::GetIONetInterface(WorkerId_t id) { - if (auto pState = this->GetThreadHandle(pid)) + if (auto pState = this->GetThreadHandle(id)) { - return pState->singletons.GetIONetInterface({ this->SharedFromThis(), pid }); + return pState->singletons.GetIONetInterface({ this->SharedFromThis(), id }); } return {}; } - AuSPtr ThreadPool::GetIONetWorker(WorkerId_t pid) + AuSPtr ThreadPool::GetIONetWorker(WorkerId_t id) { - if (auto pState = this->GetThreadHandle(pid)) + if (auto pState = this->GetThreadHandle(id)) { - return pState->singletons.GetIONetWorker({ this->SharedFromThis(), pid }); + return pState->singletons.GetIONetWorker({ this->SharedFromThis(), id }); } return {}; @@ -1418,6 +1418,8 @@ namespace Aurora::Async { state->Decommit(id.second); } + + pLocalState->Deinit(); } AuSPtr ThreadPool::GetGroup(ThreadGroup_t type) diff --git a/Source/IO/CompletionGroup/CompletionGroup.cpp b/Source/IO/CompletionGroup/CompletionGroup.cpp index 7a7b4abe..e3e5ea13 100644 --- a/Source/IO/CompletionGroup/CompletionGroup.cpp +++ b/Source/IO/CompletionGroup/CompletionGroup.cpp @@ -186,6 +186,8 @@ namespace Aurora::IO::CompletionGroup // anyone else? this->ResetMemoryPins(); } + + this->TryCollectInternal(); } void CompletionGroup::AddWorkItem(const AuSPtr &pCompletable) @@ -194,6 +196,7 @@ namespace Aurora::IO::CompletionGroup AU_LOCK_GUARD(this->mutex); this->workItems.push_back(pCompletable); AuAtomicAdd(&this->uAdded, 1u); + this->TryCreateInternal(); } void CompletionGroup::UnsafeRemoveItem(const AuSPtr &pCompletable) @@ -335,15 +338,8 @@ namespace Aurora::IO::CompletionGroup AuResetMember(this->pAndBarrier); } - AuSPtr CompletionGroup::OnSingleCompletion() + AuSPtr CompletionGroup::OnSingleCompletionIntl() { - AU_LOCK_GUARD(this->cs); - - if (this->pAnyBarrier) - { - return this->pAnyBarrier; - } - if (this->bNoAny) { SysPushErrorGeneric("To prevent double LoopQueue::SourceAdd on the same source, you must not call ::OnSingleCompletion() after ::OnCompletion() in this specific order"); @@ -362,6 +358,93 @@ namespace Aurora::IO::CompletionGroup return pRet; } + AuSPtr CompletionGroup::OnSingleCompletion() + { + AU_LOCK_GUARD(this->cs); + + if (this->pAnyBarrier) + { + return this->pAnyBarrier; + } + + if (this->pAnyBarrierInternal) + { + return this->pAnyBarrierInternal; + } + + if (this->workerId) + { + TryCreateInternal(); + return this->pAnyBarrierInternal; + } + else + { + return this->pAnyBarrier = OnSingleCompletionIntl(); + } + } + + void CompletionGroup::TryCreateInternal() + { + if (!this->workerId) + { + return; + } + + if (this->pAnyBarrierInternal) + { + return; + } + + if (this->workerId != AuAsync::GetCurrentWorkerPId()) + { + return; + } + + this->pAnyBarrierInternal = OnSingleCompletionIntl(); + this->SetNeverEnding(true); + + if (auto pKernel = this->workerId.GetPool()->ToKernelWorkQueue(this->workerId)) + { + (void)pKernel->Commit(); + } + } + + void CompletionGroup::TryCollectInternal() + { + if (!this->workItems.empty()) + { + return; + } + + if (!this->workerId) + { + return; + } + + if (!this->pAnyBarrierInternal) + { + return; + } + + if (this->workerId != AuAsync::GetCurrentWorkerPId()) + { + return; + } + + AuResetMember(this->pAnyBarrierInternal); + + if (auto pKernel = this->workerId.GetPool()->ToKernelWorkQueue(this->workerId)) + { + (void)pKernel->Commit(); + } + } + + void CompletionGroup::MakeInternalAsync(AuWorkerID pid) + { + this->SetNeverEnding(true); + this->workerId = pid; + } + AUKN_SYM AuSPtr NewCompletionGroup() { return AuMakeShared(); diff --git a/Source/IO/CompletionGroup/CompletionGroup.hpp b/Source/IO/CompletionGroup/CompletionGroup.hpp index 2ee7417b..1daba7c2 100644 --- a/Source/IO/CompletionGroup/CompletionGroup.hpp +++ b/Source/IO/CompletionGroup/CompletionGroup.hpp @@ -23,6 +23,9 @@ namespace Aurora::IO::CompletionGroup AuSPtr OnCompletion() override; AuSPtr OnSingleCompletion() override; + AuSPtr OnSingleCompletionIntl(); + void TryCreateInternal(); + void TryCollectInternal(); bool WaitForAnyMS(AuUInt32 uTimeoutOrZeroMS) override; bool WaitForAllMS(AuUInt32 uTimeoutOrZeroMS) override; @@ -50,6 +53,8 @@ namespace Aurora::IO::CompletionGroup void ResetAnd(); + void MakeInternalAsync(AuWorkerID pid); + private: AuMutex mutex; AuCriticalSection cs; @@ -59,6 +64,8 @@ namespace Aurora::IO::CompletionGroup AuSPtr pCallbacks; AuSPtr pAnyBarrier; AuSPtr pAndBarrier; + AuSPtr pAnyBarrierInternal; + AuWorkerID workerId; AuList, bool>> callbackTicks; AuUInt32 uAdded {}; AuUInt32 uTriggered {};