From 8ff81df1299f8ccc29c31888f385a2e02ef59ad2 Mon Sep 17 00:00:00 2001 From: Reece Wilson Date: Mon, 30 Jan 2023 14:32:26 +0000 Subject: [PATCH] [*] Fix deadlock involving WaitFor under ThreadPool (shutdown race) [*] Optimize mutex lock out of RWLockImpl::TryLockWrite [*] Force all relevant members of RWLockImpl to be volatile just bc lol (afaik we cant justify it yet; however, i want to minimalize the risk of future issues in this type) --- Source/Async/ThreadPool.cpp | 86 ++++++++++++++++++------ Source/Async/ThreadPool.hpp | 2 +- Source/Async/ThreadState.hpp | 5 ++ Source/Threading/Primitives/AuRWLock.cpp | 36 +++++++--- Source/Threading/Primitives/AuRWLock.hpp | 2 +- 5 files changed, 99 insertions(+), 32 deletions(-) diff --git a/Source/Async/ThreadPool.cpp b/Source/Async/ThreadPool.cpp index a7ec177d..38952584 100644 --- a/Source/Async/ThreadPool.cpp +++ b/Source/Async/ThreadPool.cpp @@ -80,7 +80,34 @@ namespace Aurora::Async } else { - return Threading::WaitFor(primitive.get(), timeoutMs); + AuSPtr pHandle; + + { + AU_LOCK_GUARD(AuStaticCast(unlocker.pool)->rwlock_->AsReadable()); + + if (pHandle = AuStaticCast(unlocker.pool)->GetThreadHandle(unlocker)) + { + if (pHandle->exitingflag2) + { + return primitive->TryLock(); + } + else + { + AU_LOCK_GUARD(pHandle->externalFencesLock); + pHandle->externalFences.push_back(primitive.get()); + } + } + } + + bool bRet = Threading::WaitFor(primitive.get(), timeoutMs); + + if (pHandle) + { + AU_LOCK_GUARD(pHandle->externalFencesLock); + AuTryRemove(pHandle->externalFences, primitive.get()); + } + + return bRet; } } @@ -597,6 +624,8 @@ namespace Aurora::Async void ThreadPool::Shutdown() { // Always do this + auto trySelfPid = AuAsync::GetCurrentWorkerPId(); + { AU_LOCK_GUARD(this->rwlock_->AsWritable()); @@ -614,19 +643,9 @@ namespace Aurora::Async } } - // Nested shutdowns can happen; prevent a write lock + // Update shutting down flag { - AU_LOCK_GUARD(this->rwlock_->AsReadable()); - if (this->shuttingdown_) - { - return; - } - } - - // Set shutdown flag - { - AU_LOCK_GUARD(this->rwlock_->AsWritable()); - if (AuExchange(this->shuttingdown_, true)) + if (AuAtomicTestAndSet(&this->shuttingdown_, 0) != 0) { return; } @@ -639,17 +658,31 @@ namespace Aurora::Async // // Perform the following shutdown of the schedular and other available threads under a read lock { - AU_LOCK_GUARD(this->rwlock_->AsReadable()); - - StopSched(); + AuList toBarrier; - for (auto &[groupId, group] : this->threads_) { - for (auto &[id, worker] : group->workers) + AU_LOCK_GUARD(this->rwlock_->AsReadable()); + + StopSched(); + + for (auto &[groupId, group] : this->threads_) { - Barrier(worker->id, 0, false, true); + for (auto &[id, worker] : group->workers) + { + if (trySelfPid == worker->id) + { + continue; + } + + toBarrier.push_back(worker->id); + } } } + + for (const auto &id : toBarrier) + { + Barrier(id, 0, false, true); + } } // Finally set the shutdown flag on all of our thread contexts @@ -1172,6 +1205,21 @@ namespace Aurora::Async // Abort scheduled tasks TerminateSceduledTasks(this, id); + // Prevent deadlocks + jobWorker->syncSema->Unlock(10000); // prevent ::Barrier dead-locks + + { + AU_LOCK_GUARD(jobWorker->externalFencesLock); + jobWorker->exitingflag2 = true; + + for (const auto &pIWaitable : jobWorker->externalFences) + { + pIWaitable->Unlock(); + } + + jobWorker->externalFences.clear(); + } + // Clean up thread features // -> transferable TLS handles // -> thread specific vms diff --git a/Source/Async/ThreadPool.hpp b/Source/Async/ThreadPool.hpp index 83aedaa9..2525e5ad 100644 --- a/Source/Async/ThreadPool.hpp +++ b/Source/Async/ThreadPool.hpp @@ -127,7 +127,7 @@ namespace Aurora::Async using ThreadDb_t = AuBST>; ThreadDb_t threads_; - bool shuttingdown_ {}; + AuUInt shuttingdown_ {}; AuThreadPrimitives::RWLockUnique_t rwlock_; std::atomic_int tasksRunning_; bool runnersRunning_ {}; diff --git a/Source/Async/ThreadState.hpp b/Source/Async/ThreadState.hpp index 28e50c5a..c29495f0 100644 --- a/Source/Async/ThreadState.hpp +++ b/Source/Async/ThreadState.hpp @@ -38,6 +38,7 @@ namespace Aurora::Async bool rejecting {}; bool exiting {}; bool shuttingdown {}; + bool exitingflag2 {}; AuThreadPrimitives::EventUnique_t running; //bool running; AuList loopSources; @@ -46,5 +47,9 @@ namespace Aurora::Async Utility::RateLimiter rateLimiter; ERunMode runMode; int cookie {0}; + + // + AuThreadPrimitives::SpinLock externalFencesLock; + AuList externalFences; }; } \ No newline at end of file diff --git a/Source/Threading/Primitives/AuRWLock.cpp b/Source/Threading/Primitives/AuRWLock.cpp index 2800fec1..20a57531 100644 --- a/Source/Threading/Primitives/AuRWLock.cpp +++ b/Source/Threading/Primitives/AuRWLock.cpp @@ -126,7 +126,7 @@ namespace Aurora::Threading::Primitives } } while (iCurState == -1 || - AuAtomicCompareExchange(&this->state_, iCurState + 1, iCurState) != iCurState); + AuAtomicCompareExchange((AuInt32*)&this->state_, iCurState + 1, iCurState) != iCurState); #endif return true; @@ -134,7 +134,7 @@ namespace Aurora::Threading::Primitives bool RWLockImpl::LockWrite(AuUInt64 timeout) { - if (AuAtomicCompareExchange(&this->state_, -1, 0) == 0) + if (AuAtomicCompareExchange((AuInt32 *)&this->state_, -1, 0) == 0) { this->reentrantWriteLockHandle_ = AuThreads::GetThreadId(); return true; @@ -167,21 +167,35 @@ namespace Aurora::Threading::Primitives return this->reentrantWriteLockHandle_ == AuThreads::GetThreadId(); } - return AuAtomicCompareExchange(&this->state_, iCurState + 1, iCurState) == iCurState; + return AuAtomicCompareExchange((AuInt32 *)&this->state_, iCurState + 1, iCurState) == iCurState; } bool RWLockImpl::TryLockWrite() { - AU_LOCK_GUARD(this->mutex_); - - if (this->state_ > 0) + //AU_LOCK_GUARD(this->mutex_); + + for (AuUInt i = 0; i < 20; i++) { - return false; + auto curVal = this->state_; + if (curVal == -1) + { + AuThreading::ContextYield(); + continue; + } + + if (curVal != 0) + { + continue; + } + + if (AuAtomicCompareExchange((AuInt32 *)&this->state_, -1, curVal) == curVal) + { + this->reentrantWriteLockHandle_ = AuThreads::GetThreadId(); + return true; + } } - this->reentrantWriteLockHandle_ = AuThreads::GetThreadId(); - this->state_ = -1; - return true; + return false; } void RWLockImpl::UnlockRead() @@ -194,7 +208,7 @@ namespace Aurora::Threading::Primitives return; } - auto val = AuAtomicSub(&this->state_, 1); + auto val = AuAtomicSub((AuInt32*)&this->state_, 1); if ((val == 1) && (this->bElevaterPending_)) { diff --git a/Source/Threading/Primitives/AuRWLock.hpp b/Source/Threading/Primitives/AuRWLock.hpp index a042bff5..b809e006 100644 --- a/Source/Threading/Primitives/AuRWLock.hpp +++ b/Source/Threading/Primitives/AuRWLock.hpp @@ -77,7 +77,7 @@ namespace Aurora::Threading::Primitives ConditionMutexImpl mutex_; ConditionVariableImpl condition_; - AuInt32 state_ {}; + volatile AuInt32 state_ {}; AuInt32 writersPending_ {}; bool bElevaterPending_ {}; };