[*] Fix shutdown regression (e037fc21, a7dfd899 151bb106, cleanup cont)

This commit is contained in:
Reece Wilson 2023-11-12 00:18:34 +00:00
parent 9b74a623af
commit 3225d8cbda
4 changed files with 84 additions and 31 deletions

View File

@ -14,7 +14,7 @@ namespace Aurora::Async
struct IThreadPoolInternal
{
virtual bool WaitFor(WorkerId_t unlocker, const AuSPtr<Threading::IWaitable> &primitive, AuUInt32 ms) = 0;
virtual bool WaitFor(WorkerPId_t unlocker, const AuSPtr<Threading::IWaitable> &primitive, AuUInt32 ms) = 0;
virtual void Run(WorkerId_t target, AuSPtr<IAsyncRunnable> runnable) = 0;
virtual AuSPtr<ThreadState> GetThreadHandle(WorkerId_t id) = 0;
virtual IThreadPool *ToThreadPool() = 0;

View File

@ -63,39 +63,39 @@ namespace Aurora::Async
bool ThreadPool::WaitFor(WorkerPId_t unlocker, const AuSPtr<Threading::IWaitable> &primitive, AuUInt32 timeoutMs)
{
auto curThread = GetThreadState();
if (!curThread)
if (auto pCurThread = GetThreadState())
{
return Threading::WaitFor(primitive.get(), timeoutMs);
}
bool workerIdMatches = (unlocker.second == curThread->thread.id.second) || ((unlocker.second == Async::kThreadIdAny) && (GetThreadWorkersCount(unlocker.first) == 1));
if ((unlocker.first == curThread->thread.id.first) &&
(unlocker.pool.get() == this) && // work group matches
(workerIdMatches)) // well, crap
{
bool queryAsync = false;
while (!(queryAsync ? primitive->TryLock() : Threading::WaitFor(primitive.get(), 2)))
bool bWorkerIdMatches = (unlocker.second == pCurThread->thread.id.second) ||
((unlocker.second == Async::kThreadIdAny) &&
(GetThreadWorkersCount(unlocker.first) == 1));
if ((unlocker.first == pCurThread->thread.id.first) &&
(unlocker.pool.get() == this) &&
(bWorkerIdMatches))
{
queryAsync = CtxYield();
if (!queryAsync && this->shuttingdown_)
bool queryAsync = false;
while (!(queryAsync ? primitive->TryLock() : Threading::WaitFor(primitive.get(), 2)))
{
return false;
}
}
queryAsync = CtxYield();
return true;
if (!queryAsync && this->shuttingdown_)
{
return false;
}
}
return true;
}
}
else
{
AuSPtr<ThreadState> pHandle;
if (auto pPool = unlocker.pool)
{
AU_LOCK_GUARD(AuStaticCast<ThreadPool>(unlocker.pool)->rwlock_->AsReadable());
auto pPoolEx = AuStaticCast<ThreadPool>(unlocker.pool);
AU_LOCK_GUARD(pPoolEx->rwlock_->AsReadable());
if ((pHandle = AuStaticCast<ThreadPool>(unlocker.pool)->GetThreadHandle(unlocker)))
{
@ -111,7 +111,7 @@ namespace Aurora::Async
}
else if (unlocker.pool.get() == this)
{
return primitive->LockMS(timeoutMs);
return primitive->LockMS(timeoutMs);
}
}
@ -600,9 +600,13 @@ namespace Aurora::Async
}
// Sync to shutdown threads to prevent a race condition whereby the async subsystem shuts down before the threads
auto pSelf = AuThreads::GetThread();
for (const auto &thread : threads)
{
thread->Exit();
if (thread.get() != pSelf)
{
thread->Exit();
}
}
// Is dead flag

View File

@ -25,8 +25,8 @@ namespace Aurora::Async
ThreadPool();
// IThreadPoolInternal
bool WaitFor(WorkerPId_t unlocker, const AuSPtr<Threading::IWaitable> &primitive, AuUInt32 ms);
bool WaitFor(WorkerId_t unlocker, const AuSPtr<Threading::IWaitable> &primitive, AuUInt32 ms) override;
bool WaitFor(WorkerPId_t unlocker, const AuSPtr<Threading::IWaitable> &primitive, AuUInt32 ms) override;
bool WaitFor(WorkerId_t unlocker, const AuSPtr<Threading::IWaitable> &primitive, AuUInt32 ms);
void Run(WorkerId_t target, AuSPtr<IAsyncRunnable> runnable) override;
void Run(WorkerId_t target, AuSPtr<IAsyncRunnable> runnable, bool bIncrement);
IThreadPool *ToThreadPool() override;

View File

@ -455,13 +455,62 @@ namespace Aurora::Async
bool WorkItem::BlockUntilComplete()
{
if (!this->finishedEvent_) return false;
if (!this->worker_)
{
this->finishedEvent_->Wait();
return true;
};
return this->owner_->WaitFor(this->worker_.value(), AuUnsafeRaiiToShared(this->finishedEvent_.AsPointer()), 0);
}
struct WaitProxy : Threading::IWaitable
{
AuThreadPrimitives::IEvent *pEvent {};
WaitProxy(AuThreadPrimitives::IEvent *pEvent) :
pEvent(pEvent)
{
}
bool HasOSHandle(AuMach &mach) override
{
return this->pEvent->HasOSHandle(mach);
}
bool HasLockImplementation() override
{
return this->pEvent->HasLockImplementation();
}
void Lock() override
{
return this->pEvent->Lock();
}
bool LockMS(AuUInt64 qwRelTimeoutInMs) override
{
return this->pEvent->LockMS(qwRelTimeoutInMs);
}
bool LockNS(AuUInt64 qwRelTimeoutInNs) override
{
return this->pEvent->LockNS(qwRelTimeoutInNs);
}
bool LockAbsMS(AuUInt64 qwAbsTimeoutInMs) override
{
return this->pEvent->LockAbsMS(qwAbsTimeoutInMs);
}
bool LockAbsNS(AuUInt64 qwAbsTimeoutInNs) override
{
return this->pEvent->LockAbsNS(qwAbsTimeoutInNs);
}
bool TryLock() override
{
return this->pEvent->TryLock();
}
void Unlock() override
{
// PATCH: ensure release notifications set the event!
this->pEvent->Set();
}
} waitProxy(this->finishedEvent_.AsPointer());
return this->owner_->WaitFor(this->worker_.value(),
AuUnsafeRaiiToShared(&waitProxy),
0 /*forever*/);
}
bool WorkItem::HasFinished()