[*] Fix deadlock involving WaitFor under ThreadPool (shutdown race)
[*] Optimize mutex lock out of RWLockImpl::TryLockWrite [*] Force all relevant members of RWLockImpl to be volatile just bc lol (afaik we cant justify it yet; however, i want to minimalize the risk of future issues in this type)
This commit is contained in:
parent
51a2816f3f
commit
8ff81df129
@ -80,7 +80,34 @@ namespace Aurora::Async
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
return Threading::WaitFor(primitive.get(), timeoutMs);
|
AuSPtr<ThreadState> pHandle;
|
||||||
|
|
||||||
|
{
|
||||||
|
AU_LOCK_GUARD(AuStaticCast<ThreadPool>(unlocker.pool)->rwlock_->AsReadable());
|
||||||
|
|
||||||
|
if (pHandle = AuStaticCast<ThreadPool>(unlocker.pool)->GetThreadHandle(unlocker))
|
||||||
|
{
|
||||||
|
if (pHandle->exitingflag2)
|
||||||
|
{
|
||||||
|
return primitive->TryLock();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
AU_LOCK_GUARD(pHandle->externalFencesLock);
|
||||||
|
pHandle->externalFences.push_back(primitive.get());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool bRet = Threading::WaitFor(primitive.get(), timeoutMs);
|
||||||
|
|
||||||
|
if (pHandle)
|
||||||
|
{
|
||||||
|
AU_LOCK_GUARD(pHandle->externalFencesLock);
|
||||||
|
AuTryRemove(pHandle->externalFences, primitive.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
return bRet;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -597,6 +624,8 @@ namespace Aurora::Async
|
|||||||
void ThreadPool::Shutdown()
|
void ThreadPool::Shutdown()
|
||||||
{
|
{
|
||||||
// Always do this
|
// Always do this
|
||||||
|
auto trySelfPid = AuAsync::GetCurrentWorkerPId();
|
||||||
|
|
||||||
{
|
{
|
||||||
AU_LOCK_GUARD(this->rwlock_->AsWritable());
|
AU_LOCK_GUARD(this->rwlock_->AsWritable());
|
||||||
|
|
||||||
@ -614,19 +643,9 @@ namespace Aurora::Async
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Nested shutdowns can happen; prevent a write lock
|
// Update shutting down flag
|
||||||
{
|
{
|
||||||
AU_LOCK_GUARD(this->rwlock_->AsReadable());
|
if (AuAtomicTestAndSet(&this->shuttingdown_, 0) != 0)
|
||||||
if (this->shuttingdown_)
|
|
||||||
{
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Set shutdown flag
|
|
||||||
{
|
|
||||||
AU_LOCK_GUARD(this->rwlock_->AsWritable());
|
|
||||||
if (AuExchange(this->shuttingdown_, true))
|
|
||||||
{
|
{
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -639,17 +658,31 @@ namespace Aurora::Async
|
|||||||
//
|
//
|
||||||
// Perform the following shutdown of the schedular and other available threads under a read lock
|
// Perform the following shutdown of the schedular and other available threads under a read lock
|
||||||
{
|
{
|
||||||
AU_LOCK_GUARD(this->rwlock_->AsReadable());
|
AuList<WorkerId_t> toBarrier;
|
||||||
|
|
||||||
StopSched();
|
|
||||||
|
|
||||||
for (auto &[groupId, group] : this->threads_)
|
|
||||||
{
|
{
|
||||||
for (auto &[id, worker] : group->workers)
|
AU_LOCK_GUARD(this->rwlock_->AsReadable());
|
||||||
|
|
||||||
|
StopSched();
|
||||||
|
|
||||||
|
for (auto &[groupId, group] : this->threads_)
|
||||||
{
|
{
|
||||||
Barrier(worker->id, 0, false, true);
|
for (auto &[id, worker] : group->workers)
|
||||||
|
{
|
||||||
|
if (trySelfPid == worker->id)
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
toBarrier.push_back(worker->id);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for (const auto &id : toBarrier)
|
||||||
|
{
|
||||||
|
Barrier(id, 0, false, true);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Finally set the shutdown flag on all of our thread contexts
|
// Finally set the shutdown flag on all of our thread contexts
|
||||||
@ -1172,6 +1205,21 @@ namespace Aurora::Async
|
|||||||
// Abort scheduled tasks
|
// Abort scheduled tasks
|
||||||
TerminateSceduledTasks(this, id);
|
TerminateSceduledTasks(this, id);
|
||||||
|
|
||||||
|
// Prevent deadlocks
|
||||||
|
jobWorker->syncSema->Unlock(10000); // prevent ::Barrier dead-locks
|
||||||
|
|
||||||
|
{
|
||||||
|
AU_LOCK_GUARD(jobWorker->externalFencesLock);
|
||||||
|
jobWorker->exitingflag2 = true;
|
||||||
|
|
||||||
|
for (const auto &pIWaitable : jobWorker->externalFences)
|
||||||
|
{
|
||||||
|
pIWaitable->Unlock();
|
||||||
|
}
|
||||||
|
|
||||||
|
jobWorker->externalFences.clear();
|
||||||
|
}
|
||||||
|
|
||||||
// Clean up thread features
|
// Clean up thread features
|
||||||
// -> transferable TLS handles
|
// -> transferable TLS handles
|
||||||
// -> thread specific vms
|
// -> thread specific vms
|
||||||
|
@ -127,7 +127,7 @@ namespace Aurora::Async
|
|||||||
using ThreadDb_t = AuBST<ThreadGroup_t, AuSPtr<GroupState>>;
|
using ThreadDb_t = AuBST<ThreadGroup_t, AuSPtr<GroupState>>;
|
||||||
|
|
||||||
ThreadDb_t threads_;
|
ThreadDb_t threads_;
|
||||||
bool shuttingdown_ {};
|
AuUInt shuttingdown_ {};
|
||||||
AuThreadPrimitives::RWLockUnique_t rwlock_;
|
AuThreadPrimitives::RWLockUnique_t rwlock_;
|
||||||
std::atomic_int tasksRunning_;
|
std::atomic_int tasksRunning_;
|
||||||
bool runnersRunning_ {};
|
bool runnersRunning_ {};
|
||||||
|
@ -38,6 +38,7 @@ namespace Aurora::Async
|
|||||||
bool rejecting {};
|
bool rejecting {};
|
||||||
bool exiting {};
|
bool exiting {};
|
||||||
bool shuttingdown {};
|
bool shuttingdown {};
|
||||||
|
bool exitingflag2 {};
|
||||||
AuThreadPrimitives::EventUnique_t running;
|
AuThreadPrimitives::EventUnique_t running;
|
||||||
//bool running;
|
//bool running;
|
||||||
AuList<AsyncAppWaitSourceRequest> loopSources;
|
AuList<AsyncAppWaitSourceRequest> loopSources;
|
||||||
@ -46,5 +47,9 @@ namespace Aurora::Async
|
|||||||
Utility::RateLimiter rateLimiter;
|
Utility::RateLimiter rateLimiter;
|
||||||
ERunMode runMode;
|
ERunMode runMode;
|
||||||
int cookie {0};
|
int cookie {0};
|
||||||
|
|
||||||
|
//
|
||||||
|
AuThreadPrimitives::SpinLock externalFencesLock;
|
||||||
|
AuList<AuThreading::IWaitable *> externalFences;
|
||||||
};
|
};
|
||||||
}
|
}
|
@ -126,7 +126,7 @@ namespace Aurora::Threading::Primitives
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
while (iCurState == -1 ||
|
while (iCurState == -1 ||
|
||||||
AuAtomicCompareExchange(&this->state_, iCurState + 1, iCurState) != iCurState);
|
AuAtomicCompareExchange((AuInt32*)&this->state_, iCurState + 1, iCurState) != iCurState);
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
return true;
|
return true;
|
||||||
@ -134,7 +134,7 @@ namespace Aurora::Threading::Primitives
|
|||||||
|
|
||||||
bool RWLockImpl::LockWrite(AuUInt64 timeout)
|
bool RWLockImpl::LockWrite(AuUInt64 timeout)
|
||||||
{
|
{
|
||||||
if (AuAtomicCompareExchange(&this->state_, -1, 0) == 0)
|
if (AuAtomicCompareExchange((AuInt32 *)&this->state_, -1, 0) == 0)
|
||||||
{
|
{
|
||||||
this->reentrantWriteLockHandle_ = AuThreads::GetThreadId();
|
this->reentrantWriteLockHandle_ = AuThreads::GetThreadId();
|
||||||
return true;
|
return true;
|
||||||
@ -167,21 +167,35 @@ namespace Aurora::Threading::Primitives
|
|||||||
return this->reentrantWriteLockHandle_ == AuThreads::GetThreadId();
|
return this->reentrantWriteLockHandle_ == AuThreads::GetThreadId();
|
||||||
}
|
}
|
||||||
|
|
||||||
return AuAtomicCompareExchange(&this->state_, iCurState + 1, iCurState) == iCurState;
|
return AuAtomicCompareExchange((AuInt32 *)&this->state_, iCurState + 1, iCurState) == iCurState;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool RWLockImpl::TryLockWrite()
|
bool RWLockImpl::TryLockWrite()
|
||||||
{
|
{
|
||||||
AU_LOCK_GUARD(this->mutex_);
|
//AU_LOCK_GUARD(this->mutex_);
|
||||||
|
|
||||||
if (this->state_ > 0)
|
for (AuUInt i = 0; i < 20; i++)
|
||||||
{
|
{
|
||||||
return false;
|
auto curVal = this->state_;
|
||||||
|
if (curVal == -1)
|
||||||
|
{
|
||||||
|
AuThreading::ContextYield();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (curVal != 0)
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (AuAtomicCompareExchange((AuInt32 *)&this->state_, -1, curVal) == curVal)
|
||||||
|
{
|
||||||
|
this->reentrantWriteLockHandle_ = AuThreads::GetThreadId();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
this->reentrantWriteLockHandle_ = AuThreads::GetThreadId();
|
return false;
|
||||||
this->state_ = -1;
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void RWLockImpl::UnlockRead()
|
void RWLockImpl::UnlockRead()
|
||||||
@ -194,7 +208,7 @@ namespace Aurora::Threading::Primitives
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto val = AuAtomicSub(&this->state_, 1);
|
auto val = AuAtomicSub((AuInt32*)&this->state_, 1);
|
||||||
|
|
||||||
if ((val == 1) && (this->bElevaterPending_))
|
if ((val == 1) && (this->bElevaterPending_))
|
||||||
{
|
{
|
||||||
|
@ -77,7 +77,7 @@ namespace Aurora::Threading::Primitives
|
|||||||
|
|
||||||
ConditionMutexImpl mutex_;
|
ConditionMutexImpl mutex_;
|
||||||
ConditionVariableImpl condition_;
|
ConditionVariableImpl condition_;
|
||||||
AuInt32 state_ {};
|
volatile AuInt32 state_ {};
|
||||||
AuInt32 writersPending_ {};
|
AuInt32 writersPending_ {};
|
||||||
bool bElevaterPending_ {};
|
bool bElevaterPending_ {};
|
||||||
};
|
};
|
||||||
|
Loading…
Reference in New Issue
Block a user