[*] Fix deadlock involving WaitFor under ThreadPool (shutdown race)

[*] Optimize mutex lock out of RWLockImpl::TryLockWrite
[*] Force all relevant members of RWLockImpl to be volatile as a precaution (afaik we can't justify it yet; however, I want to minimize the risk of future issues in this type)
This commit is contained in:
Reece Wilson 2023-01-30 14:32:26 +00:00
parent 51a2816f3f
commit 8ff81df129
5 changed files with 99 additions and 32 deletions

View File

@ -80,7 +80,34 @@ namespace Aurora::Async
}
else
{
return Threading::WaitFor(primitive.get(), timeoutMs);
AuSPtr<ThreadState> pHandle;
{
AU_LOCK_GUARD(AuStaticCast<ThreadPool>(unlocker.pool)->rwlock_->AsReadable());
if (pHandle = AuStaticCast<ThreadPool>(unlocker.pool)->GetThreadHandle(unlocker))
{
if (pHandle->exitingflag2)
{
return primitive->TryLock();
}
else
{
AU_LOCK_GUARD(pHandle->externalFencesLock);
pHandle->externalFences.push_back(primitive.get());
}
}
}
bool bRet = Threading::WaitFor(primitive.get(), timeoutMs);
if (pHandle)
{
AU_LOCK_GUARD(pHandle->externalFencesLock);
AuTryRemove(pHandle->externalFences, primitive.get());
}
return bRet;
}
}
@ -597,6 +624,8 @@ namespace Aurora::Async
void ThreadPool::Shutdown()
{
// Always do this
auto trySelfPid = AuAsync::GetCurrentWorkerPId();
{
AU_LOCK_GUARD(this->rwlock_->AsWritable());
@ -614,19 +643,9 @@ namespace Aurora::Async
}
}
// Nested shutdowns can happen; prevent a write lock
// Update shutting down flag
{
AU_LOCK_GUARD(this->rwlock_->AsReadable());
if (this->shuttingdown_)
{
return;
}
}
// Set shutdown flag
{
AU_LOCK_GUARD(this->rwlock_->AsWritable());
if (AuExchange(this->shuttingdown_, true))
if (AuAtomicTestAndSet(&this->shuttingdown_, 0) != 0)
{
return;
}
@ -639,17 +658,31 @@ namespace Aurora::Async
//
// Perform the following shutdown of the schedular and other available threads under a read lock
{
AU_LOCK_GUARD(this->rwlock_->AsReadable());
StopSched();
AuList<WorkerId_t> toBarrier;
for (auto &[groupId, group] : this->threads_)
{
for (auto &[id, worker] : group->workers)
AU_LOCK_GUARD(this->rwlock_->AsReadable());
StopSched();
for (auto &[groupId, group] : this->threads_)
{
Barrier(worker->id, 0, false, true);
for (auto &[id, worker] : group->workers)
{
if (trySelfPid == worker->id)
{
continue;
}
toBarrier.push_back(worker->id);
}
}
}
for (const auto &id : toBarrier)
{
Barrier(id, 0, false, true);
}
}
// Finally set the shutdown flag on all of our thread contexts
@ -1172,6 +1205,21 @@ namespace Aurora::Async
// Abort scheduled tasks
TerminateSceduledTasks(this, id);
// Prevent deadlocks
jobWorker->syncSema->Unlock(10000); // prevent ::Barrier dead-locks
{
AU_LOCK_GUARD(jobWorker->externalFencesLock);
jobWorker->exitingflag2 = true;
for (const auto &pIWaitable : jobWorker->externalFences)
{
pIWaitable->Unlock();
}
jobWorker->externalFences.clear();
}
// Clean up thread features
// -> transferable TLS handles
// -> thread specific vms

View File

@ -127,7 +127,7 @@ namespace Aurora::Async
using ThreadDb_t = AuBST<ThreadGroup_t, AuSPtr<GroupState>>;
ThreadDb_t threads_;
bool shuttingdown_ {};
AuUInt shuttingdown_ {};
AuThreadPrimitives::RWLockUnique_t rwlock_;
std::atomic_int tasksRunning_;
bool runnersRunning_ {};

View File

@ -38,6 +38,7 @@ namespace Aurora::Async
bool rejecting {};
bool exiting {};
bool shuttingdown {};
bool exitingflag2 {};
AuThreadPrimitives::EventUnique_t running;
//bool running;
AuList<AsyncAppWaitSourceRequest> loopSources;
@ -46,5 +47,9 @@ namespace Aurora::Async
Utility::RateLimiter rateLimiter;
ERunMode runMode;
int cookie {0};
//
AuThreadPrimitives::SpinLock externalFencesLock;
AuList<AuThreading::IWaitable *> externalFences;
};
}

View File

@ -126,7 +126,7 @@ namespace Aurora::Threading::Primitives
}
}
while (iCurState == -1 ||
AuAtomicCompareExchange(&this->state_, iCurState + 1, iCurState) != iCurState);
AuAtomicCompareExchange((AuInt32*)&this->state_, iCurState + 1, iCurState) != iCurState);
#endif
return true;
@ -134,7 +134,7 @@ namespace Aurora::Threading::Primitives
bool RWLockImpl::LockWrite(AuUInt64 timeout)
{
if (AuAtomicCompareExchange(&this->state_, -1, 0) == 0)
if (AuAtomicCompareExchange((AuInt32 *)&this->state_, -1, 0) == 0)
{
this->reentrantWriteLockHandle_ = AuThreads::GetThreadId();
return true;
@ -167,21 +167,35 @@ namespace Aurora::Threading::Primitives
return this->reentrantWriteLockHandle_ == AuThreads::GetThreadId();
}
return AuAtomicCompareExchange(&this->state_, iCurState + 1, iCurState) == iCurState;
return AuAtomicCompareExchange((AuInt32 *)&this->state_, iCurState + 1, iCurState) == iCurState;
}
bool RWLockImpl::TryLockWrite()
{
AU_LOCK_GUARD(this->mutex_);
if (this->state_ > 0)
//AU_LOCK_GUARD(this->mutex_);
for (AuUInt i = 0; i < 20; i++)
{
return false;
auto curVal = this->state_;
if (curVal == -1)
{
AuThreading::ContextYield();
continue;
}
if (curVal != 0)
{
continue;
}
if (AuAtomicCompareExchange((AuInt32 *)&this->state_, -1, curVal) == curVal)
{
this->reentrantWriteLockHandle_ = AuThreads::GetThreadId();
return true;
}
}
this->reentrantWriteLockHandle_ = AuThreads::GetThreadId();
this->state_ = -1;
return true;
return false;
}
void RWLockImpl::UnlockRead()
@ -194,7 +208,7 @@ namespace Aurora::Threading::Primitives
return;
}
auto val = AuAtomicSub(&this->state_, 1);
auto val = AuAtomicSub((AuInt32*)&this->state_, 1);
if ((val == 1) && (this->bElevaterPending_))
{

View File

@ -77,7 +77,7 @@ namespace Aurora::Threading::Primitives
ConditionMutexImpl mutex_;
ConditionVariableImpl condition_;
AuInt32 state_ {};
volatile AuInt32 state_ {};
AuInt32 writersPending_ {};
bool bElevaterPending_ {};
};