[*] Fix deadlock involving WaitFor under ThreadPool (shutdown race)
[*] Optimize mutex lock out of RWLockImpl::TryLockWrite [*] Force all relevant members of RWLockImpl to be volatile just bc lol (afaik we cant justify it yet; however, i want to minimalize the risk of future issues in this type)
This commit is contained in:
parent
51a2816f3f
commit
8ff81df129
@ -80,7 +80,34 @@ namespace Aurora::Async
|
||||
}
|
||||
else
|
||||
{
|
||||
return Threading::WaitFor(primitive.get(), timeoutMs);
|
||||
AuSPtr<ThreadState> pHandle;
|
||||
|
||||
{
|
||||
AU_LOCK_GUARD(AuStaticCast<ThreadPool>(unlocker.pool)->rwlock_->AsReadable());
|
||||
|
||||
if (pHandle = AuStaticCast<ThreadPool>(unlocker.pool)->GetThreadHandle(unlocker))
|
||||
{
|
||||
if (pHandle->exitingflag2)
|
||||
{
|
||||
return primitive->TryLock();
|
||||
}
|
||||
else
|
||||
{
|
||||
AU_LOCK_GUARD(pHandle->externalFencesLock);
|
||||
pHandle->externalFences.push_back(primitive.get());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool bRet = Threading::WaitFor(primitive.get(), timeoutMs);
|
||||
|
||||
if (pHandle)
|
||||
{
|
||||
AU_LOCK_GUARD(pHandle->externalFencesLock);
|
||||
AuTryRemove(pHandle->externalFences, primitive.get());
|
||||
}
|
||||
|
||||
return bRet;
|
||||
}
|
||||
}
|
||||
|
||||
@ -597,6 +624,8 @@ namespace Aurora::Async
|
||||
void ThreadPool::Shutdown()
|
||||
{
|
||||
// Always do this
|
||||
auto trySelfPid = AuAsync::GetCurrentWorkerPId();
|
||||
|
||||
{
|
||||
AU_LOCK_GUARD(this->rwlock_->AsWritable());
|
||||
|
||||
@ -614,19 +643,9 @@ namespace Aurora::Async
|
||||
}
|
||||
}
|
||||
|
||||
// Nested shutdowns can happen; prevent a write lock
|
||||
// Update shutting down flag
|
||||
{
|
||||
AU_LOCK_GUARD(this->rwlock_->AsReadable());
|
||||
if (this->shuttingdown_)
|
||||
{
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Set shutdown flag
|
||||
{
|
||||
AU_LOCK_GUARD(this->rwlock_->AsWritable());
|
||||
if (AuExchange(this->shuttingdown_, true))
|
||||
if (AuAtomicTestAndSet(&this->shuttingdown_, 0) != 0)
|
||||
{
|
||||
return;
|
||||
}
|
||||
@ -639,17 +658,31 @@ namespace Aurora::Async
|
||||
//
|
||||
// Perform the following shutdown of the schedular and other available threads under a read lock
|
||||
{
|
||||
AU_LOCK_GUARD(this->rwlock_->AsReadable());
|
||||
|
||||
StopSched();
|
||||
AuList<WorkerId_t> toBarrier;
|
||||
|
||||
for (auto &[groupId, group] : this->threads_)
|
||||
{
|
||||
for (auto &[id, worker] : group->workers)
|
||||
AU_LOCK_GUARD(this->rwlock_->AsReadable());
|
||||
|
||||
StopSched();
|
||||
|
||||
for (auto &[groupId, group] : this->threads_)
|
||||
{
|
||||
Barrier(worker->id, 0, false, true);
|
||||
for (auto &[id, worker] : group->workers)
|
||||
{
|
||||
if (trySelfPid == worker->id)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
toBarrier.push_back(worker->id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (const auto &id : toBarrier)
|
||||
{
|
||||
Barrier(id, 0, false, true);
|
||||
}
|
||||
}
|
||||
|
||||
// Finally set the shutdown flag on all of our thread contexts
|
||||
@ -1172,6 +1205,21 @@ namespace Aurora::Async
|
||||
// Abort scheduled tasks
|
||||
TerminateSceduledTasks(this, id);
|
||||
|
||||
// Prevent deadlocks
|
||||
jobWorker->syncSema->Unlock(10000); // prevent ::Barrier dead-locks
|
||||
|
||||
{
|
||||
AU_LOCK_GUARD(jobWorker->externalFencesLock);
|
||||
jobWorker->exitingflag2 = true;
|
||||
|
||||
for (const auto &pIWaitable : jobWorker->externalFences)
|
||||
{
|
||||
pIWaitable->Unlock();
|
||||
}
|
||||
|
||||
jobWorker->externalFences.clear();
|
||||
}
|
||||
|
||||
// Clean up thread features
|
||||
// -> transferable TLS handles
|
||||
// -> thread specific vms
|
||||
|
@ -127,7 +127,7 @@ namespace Aurora::Async
|
||||
using ThreadDb_t = AuBST<ThreadGroup_t, AuSPtr<GroupState>>;
|
||||
|
||||
ThreadDb_t threads_;
|
||||
bool shuttingdown_ {};
|
||||
AuUInt shuttingdown_ {};
|
||||
AuThreadPrimitives::RWLockUnique_t rwlock_;
|
||||
std::atomic_int tasksRunning_;
|
||||
bool runnersRunning_ {};
|
||||
|
@ -38,6 +38,7 @@ namespace Aurora::Async
|
||||
bool rejecting {};
|
||||
bool exiting {};
|
||||
bool shuttingdown {};
|
||||
bool exitingflag2 {};
|
||||
AuThreadPrimitives::EventUnique_t running;
|
||||
//bool running;
|
||||
AuList<AsyncAppWaitSourceRequest> loopSources;
|
||||
@ -46,5 +47,9 @@ namespace Aurora::Async
|
||||
Utility::RateLimiter rateLimiter;
|
||||
ERunMode runMode;
|
||||
int cookie {0};
|
||||
|
||||
//
|
||||
AuThreadPrimitives::SpinLock externalFencesLock;
|
||||
AuList<AuThreading::IWaitable *> externalFences;
|
||||
};
|
||||
}
|
@ -126,7 +126,7 @@ namespace Aurora::Threading::Primitives
|
||||
}
|
||||
}
|
||||
while (iCurState == -1 ||
|
||||
AuAtomicCompareExchange(&this->state_, iCurState + 1, iCurState) != iCurState);
|
||||
AuAtomicCompareExchange((AuInt32*)&this->state_, iCurState + 1, iCurState) != iCurState);
|
||||
|
||||
#endif
|
||||
return true;
|
||||
@ -134,7 +134,7 @@ namespace Aurora::Threading::Primitives
|
||||
|
||||
bool RWLockImpl::LockWrite(AuUInt64 timeout)
|
||||
{
|
||||
if (AuAtomicCompareExchange(&this->state_, -1, 0) == 0)
|
||||
if (AuAtomicCompareExchange((AuInt32 *)&this->state_, -1, 0) == 0)
|
||||
{
|
||||
this->reentrantWriteLockHandle_ = AuThreads::GetThreadId();
|
||||
return true;
|
||||
@ -167,21 +167,35 @@ namespace Aurora::Threading::Primitives
|
||||
return this->reentrantWriteLockHandle_ == AuThreads::GetThreadId();
|
||||
}
|
||||
|
||||
return AuAtomicCompareExchange(&this->state_, iCurState + 1, iCurState) == iCurState;
|
||||
return AuAtomicCompareExchange((AuInt32 *)&this->state_, iCurState + 1, iCurState) == iCurState;
|
||||
}
|
||||
|
||||
bool RWLockImpl::TryLockWrite()
|
||||
{
|
||||
AU_LOCK_GUARD(this->mutex_);
|
||||
|
||||
if (this->state_ > 0)
|
||||
//AU_LOCK_GUARD(this->mutex_);
|
||||
|
||||
for (AuUInt i = 0; i < 20; i++)
|
||||
{
|
||||
return false;
|
||||
auto curVal = this->state_;
|
||||
if (curVal == -1)
|
||||
{
|
||||
AuThreading::ContextYield();
|
||||
continue;
|
||||
}
|
||||
|
||||
if (curVal != 0)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
if (AuAtomicCompareExchange((AuInt32 *)&this->state_, -1, curVal) == curVal)
|
||||
{
|
||||
this->reentrantWriteLockHandle_ = AuThreads::GetThreadId();
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
this->reentrantWriteLockHandle_ = AuThreads::GetThreadId();
|
||||
this->state_ = -1;
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
|
||||
void RWLockImpl::UnlockRead()
|
||||
@ -194,7 +208,7 @@ namespace Aurora::Threading::Primitives
|
||||
return;
|
||||
}
|
||||
|
||||
auto val = AuAtomicSub(&this->state_, 1);
|
||||
auto val = AuAtomicSub((AuInt32*)&this->state_, 1);
|
||||
|
||||
if ((val == 1) && (this->bElevaterPending_))
|
||||
{
|
||||
|
@ -77,7 +77,7 @@ namespace Aurora::Threading::Primitives
|
||||
|
||||
ConditionMutexImpl mutex_;
|
||||
ConditionVariableImpl condition_;
|
||||
AuInt32 state_ {};
|
||||
volatile AuInt32 state_ {};
|
||||
AuInt32 writersPending_ {};
|
||||
bool bElevaterPending_ {};
|
||||
};
|
||||
|
Loading…
Reference in New Issue
Block a user