From 3225d8cbdabdd32bba7dc01d04e7bcf1d4c8a817 Mon Sep 17 00:00:00 2001 From: Jamie Reece Wilson Date: Sun, 12 Nov 2023 00:18:34 +0000 Subject: [PATCH] [*] Fix shutdown regression (e037fc21, a7dfd899 151bb106, cleanup cont) --- Source/Async/IThreadPoolInternal.hpp | 2 +- Source/Async/ThreadPool.cpp | 54 ++++++++++++++------------- Source/Async/ThreadPool.hpp | 4 +- Source/Async/WorkItem.cpp | 55 ++++++++++++++++++++++++++-- 4 files changed, 84 insertions(+), 31 deletions(-) diff --git a/Source/Async/IThreadPoolInternal.hpp b/Source/Async/IThreadPoolInternal.hpp index 0e0c3641..6593f4bd 100644 --- a/Source/Async/IThreadPoolInternal.hpp +++ b/Source/Async/IThreadPoolInternal.hpp @@ -14,7 +14,7 @@ namespace Aurora::Async struct IThreadPoolInternal { - virtual bool WaitFor(WorkerId_t unlocker, const AuSPtr &primitive, AuUInt32 ms) = 0; + virtual bool WaitFor(WorkerPId_t unlocker, const AuSPtr &primitive, AuUInt32 ms) = 0; virtual void Run(WorkerId_t target, AuSPtr runnable) = 0; virtual AuSPtr GetThreadHandle(WorkerId_t id) = 0; virtual IThreadPool *ToThreadPool() = 0; diff --git a/Source/Async/ThreadPool.cpp b/Source/Async/ThreadPool.cpp index 4be07c09..c5876adc 100644 --- a/Source/Async/ThreadPool.cpp +++ b/Source/Async/ThreadPool.cpp @@ -63,39 +63,39 @@ namespace Aurora::Async bool ThreadPool::WaitFor(WorkerPId_t unlocker, const AuSPtr &primitive, AuUInt32 timeoutMs) { - auto curThread = GetThreadState(); - - if (!curThread) + if (auto pCurThread = GetThreadState()) { - return Threading::WaitFor(primitive.get(), timeoutMs); - } - - bool workerIdMatches = (unlocker.second == curThread->thread.id.second) || ((unlocker.second == Async::kThreadIdAny) && (GetThreadWorkersCount(unlocker.first) == 1)); - - if ((unlocker.first == curThread->thread.id.first) && - (unlocker.pool.get() == this) && // work group matches - (workerIdMatches)) // well, crap - { - - bool queryAsync = false; - while (!(queryAsync ? primitive->TryLock() : Threading::WaitFor(primitive.get(), 2))) + bool bWorkerIdMatches = (unlocker.second == pCurThread->thread.id.second) || + ((unlocker.second == Async::kThreadIdAny) && + (GetThreadWorkersCount(unlocker.first) == 1)); + + if ((unlocker.first == pCurThread->thread.id.first) && + (unlocker.pool.get() == this) && + (bWorkerIdMatches)) { - queryAsync = CtxYield(); - if (!queryAsync && this->shuttingdown_) + bool queryAsync = false; + while (!(queryAsync ? primitive->TryLock() : Threading::WaitFor(primitive.get(), 2))) { - return false; - } - } + queryAsync = CtxYield(); - return true; + if (!queryAsync && this->shuttingdown_) + { + return false; + } + } + + return true; + } } - else + { AuSPtr pHandle; + if (auto pPool = unlocker.pool) { - AU_LOCK_GUARD(AuStaticCast(unlocker.pool)->rwlock_->AsReadable()); + auto pPoolEx = AuStaticCast(unlocker.pool); + AU_LOCK_GUARD(pPoolEx->rwlock_->AsReadable()); if ((pHandle = AuStaticCast(unlocker.pool)->GetThreadHandle(unlocker))) { @@ -111,7 +111,7 @@ namespace Aurora::Async } else if (unlocker.pool.get() == this) { - return primitive->LockMS(timeoutMs); + return primitive->LockMS(timeoutMs); } } @@ -600,9 +600,13 @@ namespace Aurora::Async } // Sync to shutdown threads to prevent a race condition whereby the async subsystem shuts down before the threads + auto pSelf = AuThreads::GetThread(); for (const auto &thread : threads) { - thread->Exit(); + if (thread.get() != pSelf) + { + thread->Exit(); + } } // Is dead flag diff --git a/Source/Async/ThreadPool.hpp b/Source/Async/ThreadPool.hpp index 79ab33a1..5177ed4d 100644 --- a/Source/Async/ThreadPool.hpp +++ b/Source/Async/ThreadPool.hpp @@ -25,8 +25,8 @@ namespace Aurora::Async ThreadPool(); // IThreadPoolInternal - bool WaitFor(WorkerPId_t unlocker, const AuSPtr &primitive, AuUInt32 ms); - bool WaitFor(WorkerId_t unlocker, const AuSPtr &primitive, AuUInt32 ms) override; + bool WaitFor(WorkerPId_t unlocker, const AuSPtr &primitive, AuUInt32 ms) override; + bool WaitFor(WorkerId_t unlocker, const AuSPtr &primitive, AuUInt32 ms); void Run(WorkerId_t target, AuSPtr runnable) override; void Run(WorkerId_t target, AuSPtr runnable, bool bIncrement); IThreadPool *ToThreadPool() override; diff --git a/Source/Async/WorkItem.cpp b/Source/Async/WorkItem.cpp index 7b0b2399..76da1608 100644 --- a/Source/Async/WorkItem.cpp +++ b/Source/Async/WorkItem.cpp @@ -455,13 +455,62 @@ namespace Aurora::Async bool WorkItem::BlockUntilComplete() { - if (!this->finishedEvent_) return false; if (!this->worker_) { this->finishedEvent_->Wait(); return true; - }; - return this->owner_->WaitFor(this->worker_.value(), AuUnsafeRaiiToShared(this->finishedEvent_.AsPointer()), 0); + } + + struct WaitProxy : Threading::IWaitable + { + AuThreadPrimitives::IEvent *pEvent {}; + WaitProxy(AuThreadPrimitives::IEvent *pEvent) : + pEvent(pEvent) + { + + } + bool HasOSHandle(AuMach &mach) override + { + return this->pEvent->HasOSHandle(mach); + } + bool HasLockImplementation() override + { + return this->pEvent->HasLockImplementation(); + } + void Lock() override + { + return this->pEvent->Lock(); + } + bool LockMS(AuUInt64 qwRelTimeoutInMs) override + { + return this->pEvent->LockMS(qwRelTimeoutInMs); + } + bool LockNS(AuUInt64 qwRelTimeoutInNs) override + { + return this->pEvent->LockNS(qwRelTimeoutInNs); + } + bool LockAbsMS(AuUInt64 qwAbsTimeoutInMs) override + { + return this->pEvent->LockAbsMS(qwAbsTimeoutInMs); + } + bool LockAbsNS(AuUInt64 qwAbsTimeoutInNs) override + { + return this->pEvent->LockAbsNS(qwAbsTimeoutInNs); + } + bool TryLock() override + { + return this->pEvent->TryLock(); + } + void Unlock() override + { + // PATCH: ensure release notifications set the event! + this->pEvent->Set(); + } + } waitProxy(this->finishedEvent_.AsPointer()); + + return this->owner_->WaitFor(this->worker_.value(), + AuUnsafeRaiiToShared(&waitProxy), + 0 /*forever*/); } bool WorkItem::HasFinished()