[*] Fix deadlock involving WaitFor under ThreadPool (shutdown race)

[*] Optimize mutex lock out of RWLockImpl::TryLockWrite
[*] Force all relevant members of RWLockImpl to be volatile just bc lol (afaik we cant justify it yet; however, i want to minimalize the risk of future issues in this type)
This commit is contained in:
Reece Wilson 2023-01-30 14:32:26 +00:00
parent 51a2816f3f
commit 8ff81df129
5 changed files with 99 additions and 32 deletions

View File

@ -80,7 +80,34 @@ namespace Aurora::Async
} }
else else
{ {
return Threading::WaitFor(primitive.get(), timeoutMs); AuSPtr<ThreadState> pHandle;
{
AU_LOCK_GUARD(AuStaticCast<ThreadPool>(unlocker.pool)->rwlock_->AsReadable());
if (pHandle = AuStaticCast<ThreadPool>(unlocker.pool)->GetThreadHandle(unlocker))
{
if (pHandle->exitingflag2)
{
return primitive->TryLock();
}
else
{
AU_LOCK_GUARD(pHandle->externalFencesLock);
pHandle->externalFences.push_back(primitive.get());
}
}
}
bool bRet = Threading::WaitFor(primitive.get(), timeoutMs);
if (pHandle)
{
AU_LOCK_GUARD(pHandle->externalFencesLock);
AuTryRemove(pHandle->externalFences, primitive.get());
}
return bRet;
} }
} }
@ -597,6 +624,8 @@ namespace Aurora::Async
void ThreadPool::Shutdown() void ThreadPool::Shutdown()
{ {
// Always do this // Always do this
auto trySelfPid = AuAsync::GetCurrentWorkerPId();
{ {
AU_LOCK_GUARD(this->rwlock_->AsWritable()); AU_LOCK_GUARD(this->rwlock_->AsWritable());
@ -614,19 +643,9 @@ namespace Aurora::Async
} }
} }
// Nested shutdowns can happen; prevent a write lock // Update shutting down flag
{ {
AU_LOCK_GUARD(this->rwlock_->AsReadable()); if (AuAtomicTestAndSet(&this->shuttingdown_, 0) != 0)
if (this->shuttingdown_)
{
return;
}
}
// Set shutdown flag
{
AU_LOCK_GUARD(this->rwlock_->AsWritable());
if (AuExchange(this->shuttingdown_, true))
{ {
return; return;
} }
@ -639,17 +658,31 @@ namespace Aurora::Async
// //
// Perform the following shutdown of the schedular and other available threads under a read lock // Perform the following shutdown of the schedular and other available threads under a read lock
{ {
AU_LOCK_GUARD(this->rwlock_->AsReadable()); AuList<WorkerId_t> toBarrier;
StopSched();
for (auto &[groupId, group] : this->threads_)
{ {
for (auto &[id, worker] : group->workers) AU_LOCK_GUARD(this->rwlock_->AsReadable());
StopSched();
for (auto &[groupId, group] : this->threads_)
{ {
Barrier(worker->id, 0, false, true); for (auto &[id, worker] : group->workers)
{
if (trySelfPid == worker->id)
{
continue;
}
toBarrier.push_back(worker->id);
}
} }
} }
for (const auto &id : toBarrier)
{
Barrier(id, 0, false, true);
}
} }
// Finally set the shutdown flag on all of our thread contexts // Finally set the shutdown flag on all of our thread contexts
@ -1172,6 +1205,21 @@ namespace Aurora::Async
// Abort scheduled tasks // Abort scheduled tasks
TerminateSceduledTasks(this, id); TerminateSceduledTasks(this, id);
// Prevent deadlocks
jobWorker->syncSema->Unlock(10000); // prevent ::Barrier dead-locks
{
AU_LOCK_GUARD(jobWorker->externalFencesLock);
jobWorker->exitingflag2 = true;
for (const auto &pIWaitable : jobWorker->externalFences)
{
pIWaitable->Unlock();
}
jobWorker->externalFences.clear();
}
// Clean up thread features // Clean up thread features
// -> transferable TLS handles // -> transferable TLS handles
// -> thread specific vms // -> thread specific vms

View File

@ -127,7 +127,7 @@ namespace Aurora::Async
using ThreadDb_t = AuBST<ThreadGroup_t, AuSPtr<GroupState>>; using ThreadDb_t = AuBST<ThreadGroup_t, AuSPtr<GroupState>>;
ThreadDb_t threads_; ThreadDb_t threads_;
bool shuttingdown_ {}; AuUInt shuttingdown_ {};
AuThreadPrimitives::RWLockUnique_t rwlock_; AuThreadPrimitives::RWLockUnique_t rwlock_;
std::atomic_int tasksRunning_; std::atomic_int tasksRunning_;
bool runnersRunning_ {}; bool runnersRunning_ {};

View File

@ -38,6 +38,7 @@ namespace Aurora::Async
bool rejecting {}; bool rejecting {};
bool exiting {}; bool exiting {};
bool shuttingdown {}; bool shuttingdown {};
bool exitingflag2 {};
AuThreadPrimitives::EventUnique_t running; AuThreadPrimitives::EventUnique_t running;
//bool running; //bool running;
AuList<AsyncAppWaitSourceRequest> loopSources; AuList<AsyncAppWaitSourceRequest> loopSources;
@ -46,5 +47,9 @@ namespace Aurora::Async
Utility::RateLimiter rateLimiter; Utility::RateLimiter rateLimiter;
ERunMode runMode; ERunMode runMode;
int cookie {0}; int cookie {0};
//
AuThreadPrimitives::SpinLock externalFencesLock;
AuList<AuThreading::IWaitable *> externalFences;
}; };
} }

View File

@ -126,7 +126,7 @@ namespace Aurora::Threading::Primitives
} }
} }
while (iCurState == -1 || while (iCurState == -1 ||
AuAtomicCompareExchange(&this->state_, iCurState + 1, iCurState) != iCurState); AuAtomicCompareExchange((AuInt32*)&this->state_, iCurState + 1, iCurState) != iCurState);
#endif #endif
return true; return true;
@ -134,7 +134,7 @@ namespace Aurora::Threading::Primitives
bool RWLockImpl::LockWrite(AuUInt64 timeout) bool RWLockImpl::LockWrite(AuUInt64 timeout)
{ {
if (AuAtomicCompareExchange(&this->state_, -1, 0) == 0) if (AuAtomicCompareExchange((AuInt32 *)&this->state_, -1, 0) == 0)
{ {
this->reentrantWriteLockHandle_ = AuThreads::GetThreadId(); this->reentrantWriteLockHandle_ = AuThreads::GetThreadId();
return true; return true;
@ -167,21 +167,35 @@ namespace Aurora::Threading::Primitives
return this->reentrantWriteLockHandle_ == AuThreads::GetThreadId(); return this->reentrantWriteLockHandle_ == AuThreads::GetThreadId();
} }
return AuAtomicCompareExchange(&this->state_, iCurState + 1, iCurState) == iCurState; return AuAtomicCompareExchange((AuInt32 *)&this->state_, iCurState + 1, iCurState) == iCurState;
} }
bool RWLockImpl::TryLockWrite() bool RWLockImpl::TryLockWrite()
{ {
AU_LOCK_GUARD(this->mutex_); //AU_LOCK_GUARD(this->mutex_);
if (this->state_ > 0) for (AuUInt i = 0; i < 20; i++)
{ {
return false; auto curVal = this->state_;
if (curVal == -1)
{
AuThreading::ContextYield();
continue;
}
if (curVal != 0)
{
continue;
}
if (AuAtomicCompareExchange((AuInt32 *)&this->state_, -1, curVal) == curVal)
{
this->reentrantWriteLockHandle_ = AuThreads::GetThreadId();
return true;
}
} }
this->reentrantWriteLockHandle_ = AuThreads::GetThreadId(); return false;
this->state_ = -1;
return true;
} }
void RWLockImpl::UnlockRead() void RWLockImpl::UnlockRead()
@ -194,7 +208,7 @@ namespace Aurora::Threading::Primitives
return; return;
} }
auto val = AuAtomicSub(&this->state_, 1); auto val = AuAtomicSub((AuInt32*)&this->state_, 1);
if ((val == 1) && (this->bElevaterPending_)) if ((val == 1) && (this->bElevaterPending_))
{ {

View File

@ -77,7 +77,7 @@ namespace Aurora::Threading::Primitives
ConditionMutexImpl mutex_; ConditionMutexImpl mutex_;
ConditionVariableImpl condition_; ConditionVariableImpl condition_;
AuInt32 state_ {}; volatile AuInt32 state_ {};
AuInt32 writersPending_ {}; AuInt32 writersPending_ {};
bool bElevaterPending_ {}; bool bElevaterPending_ {};
}; };