[*] Reworked AuAsync ThreadPool shutdown (again)

[*] Fixed nested lock under WorkItem
This commit is contained in:
Reece Wilson 2023-02-08 18:23:37 +00:00
parent 75c37f4e55
commit 8293484a86
5 changed files with 252 additions and 22 deletions

View File

@ -74,6 +74,11 @@ namespace Aurora::Async
while (!(queryAsync ? primitive->TryLock() : Threading::WaitFor(primitive.get(), 2)))
{
queryAsync = CtxYield();
if (!queryAsync && this->shuttingdown_)
{
return false;
}
}
return true;
@ -266,10 +271,18 @@ namespace Aurora::Async
SysPushErrorUninitialized("Not an async thread");
}
while ((!auThread->Exiting()) && (!job->shuttingdown))
while ((!auThread->Exiting()) &&
(!this->shutdown) &&
(!job->bBreakEarly))
{
// Do work (blocking)
InternalRunOne(true);
if (!InternalRunOne(true))
{
if (this->shutdown)
{
return ranOnce;
}
}
ranOnce = true;
}
@ -286,6 +299,8 @@ namespace Aurora::Async
bool success {};
auto runMode = GetCurrentThreadRunMode();
EarlyExitTick();
//do
{
@ -347,8 +362,7 @@ namespace Aurora::Async
&& (asyncLoop->WaitAny(0))
)
{
PollInternal(false);
success = true;
success = PollInternal(false);
}
else
{
@ -361,6 +375,8 @@ namespace Aurora::Async
}
} //while (success);
EarlyExitTick();
return success;
}
@ -456,19 +472,32 @@ namespace Aurora::Async
}
// pre-wakeup thread terminating check
if (state->threadObject->Exiting() || state->shuttingdown)
if (state->threadObject->Exiting())
{
break;
}
// Block if no work items are present
if (state->pendingWorkItems.empty())
{
if (this->shuttingdown_)
{
//this->EarlyExitTick();
break;
}
group->cvVariable->WaitForSignal();
if (this->shuttingdown_)
{
//this->EarlyExitTick();
break;
}
}
// Post-wakeup thread terminating check
if (state->threadObject->Exiting() || state->shuttingdown)
if (state->threadObject->Exiting())
{
break;
}
@ -523,7 +552,7 @@ namespace Aurora::Async
for (auto itr = state->pendingWorkItems.begin(); itr != state->pendingWorkItems.end(); )
{
if (state->threadObject->Exiting() || state->shuttingdown)
if (state->threadObject->Exiting() || this->shutdown)
{
break;
}
@ -626,6 +655,7 @@ namespace Aurora::Async
auto trySelfPid = AuAsync::GetCurrentWorkerPId();
// Update shutting down flag
// Specify the root-level shutdown flag for 'ok, u can work, but you're shutting down soon [microseconds, probably]'
{
if (AuAtomicTestAndSet(&this->shuttingdown_, 0) != 0)
{
@ -633,6 +663,9 @@ namespace Aurora::Async
}
}
auto pLocalRunner = this->GetThreadStateNoWarn();
AuList<WorkerId_t> toBarrier;
// Noting
// 1) that StopSched may lockup under a writable lock
// -> we will terminate a thread that may be dispatching a sys pump event
@ -640,7 +673,6 @@ namespace Aurora::Async
//
// Perform the following shutdown of the schedular and other available threads under a read lock
{
AuList<WorkerId_t> toBarrier;
{
AU_LOCK_GUARD(this->rwlock_->AsReadable());
@ -661,12 +693,28 @@ namespace Aurora::Async
}
}
// Ehhhh
// We need this fix to a specific V8 deinit lockup
//this->Poll();
for (const auto &id : toBarrier)
{
Barrier(id, 0, false, true);
if (trySelfPid == id)
{
continue;
}
this->Barrier(id, 0, false, false /* no reject*/); // absolute safest point in shutdown; sync to already submitted work
}
}
// Time for fuckiness
// Specify the root-level shutdown flag for 'ok, u can work, but you're shutting down after sync barrier'
{
AuAtomicTestAndSet(&this->shuttingdown_, 1);
}
// Finally set the shutdown flag on all of our thread contexts
// then release them from the runners/workers list
// then release all group contexts
@ -690,7 +738,7 @@ namespace Aurora::Async
worker->shuttingdown = true;
}
if (!group->IsSysThread())
if (!group->IsSysThread()) // bug?
{
worker->threadObject->SendExitSignal();
threads.push_back(worker->threadObject);
@ -705,11 +753,37 @@ namespace Aurora::Async
}
}
// Final sync to exit
{
for (const auto &id : toBarrier)
{
if (trySelfPid == id)
{
continue;
}
auto handle = this->GetThreadHandle(id);
if (handle)
{
handle->rejecting = false;
}
this->Barrier(id, 0, false, true);
}
}
// Sync to shutdown threads to prevent a race condition whereby the async subsystem shuts down before the threads
for (const auto &thread : threads)
{
thread->Exit();
}
// Is dead flag
this->shutdown = true;
if (pLocalRunner)
{
pLocalRunner->bIsKiller = true;
}
}
bool ThreadPool::Exiting()
@ -954,6 +1028,15 @@ namespace Aurora::Async
}
#endif
if (this->shutdown ||
this->shuttingdown_ & 2) // fast
{
if (GetThreadState()->rejecting)
{
return false;
}
}
return this->InternalRunOne(false);
}
@ -1096,7 +1179,7 @@ namespace Aurora::Async
// private api
bool ThreadPool::Barrier(WorkerId_t workerId, AuUInt32 ms, bool requireSignal, bool drop)
AU_NOINLINE bool ThreadPool::Barrier(WorkerId_t workerId, AuUInt32 ms, bool requireSignal, bool drop)
{
auto self = GetThreadState();
if (!self)
@ -1174,12 +1257,83 @@ namespace Aurora::Async
CleanWorkerPoolReservedZeroFree();
}
}
void ThreadPool::EarlyExitTick()
{
auto jobWorker = GetThreadState();
auto state = jobWorker->parent.lock();
if (!jobWorker)
{
SysPushErrorUninitialized("Not an async thread");
return;
}
if ((this->shuttingdown_ & 2) != 2)
{
return;
}
state->eventLs->Set();
state->cvVariable->Broadcast();
{
if (AuExchange(jobWorker->bAlreadyDoingExitTick, true))
{
return;
}
this->PollInternal(false);
}
AuList<AuSPtr<AuThreads::IThreadFeature>> features;
{
AU_LOCK_GUARD(this->rwlock_->AsWritable());
features = AuExchange(jobWorker->features, {});
}
{
for (const auto &thread : features)
{
try
{
thread->Cleanup();
}
catch (...)
{
SysPushErrorCatch("Couldn't clean up thread feature!");
}
}
if (!this->GetThreadState()->bIsKiller)
{
#if 0
// TODO... i know what to do
#else
// this will do for now
if (!jobWorker->rejecting)
{
this->Barrier(AuAsync::GetCurrentWorkerPId(), 0, false , true);
}
//this->Barrier(AuAsync::GetCurrentWorkerPId(), 0, false, false);
#endif
}
jobWorker->bAlreadyDoingExitTick = false;
jobWorker->bBreakEarly = true;
}
}
void ThreadPool::ThisExiting()
{
auto id = GetCurrentThread();
auto state = GetGroup(id.first);
AuList<AuSPtr<AuThreads::IThreadFeature>> features;
{
AU_LOCK_GUARD(this->rwlock_->AsWritable());
@ -1206,11 +1360,15 @@ namespace Aurora::Async
jobWorker->externalFences.clear();
}
features = AuExchange(jobWorker->features, {});
}
{
// Clean up thread features
// -> transferable TLS handles
// -> thread specific vms
// -> anything your brain wishes to imagination
for (const auto &thread : jobWorker->features)
for (const auto &thread : features)
{
try
{
@ -1223,8 +1381,13 @@ namespace Aurora::Async
}
}
jobWorker->features.clear();
features.clear();
}
{
AU_LOCK_GUARD(this->rwlock_->AsWritable());
auto itr = state->workers.find(id.second);
auto &jobWorker = itr->second;
state->workers.erase(itr);
}
}
@ -1269,6 +1432,32 @@ namespace Aurora::Async
return state->workers[worker.second];
}
AuSPtr<ThreadState> ThreadPool::GetThreadStateNoWarn()
{
AU_LOCK_GUARD(this->rwlock_->AsReadable());
auto thread = gCurrentPool.lock();
if (!thread)
{
return {};
}
if (thread.get() != this)
{
return {};
}
auto worker = *tlsWorkerId;
auto state = GetGroup(worker.first);
if (!state)
{
return {};
}
return state->workers[worker.second];
}
AuSPtr<ThreadState> ThreadPool::GetThreadHandle(WorkerId_t id)
{
AU_LOCK_GUARD(this->rwlock_->AsReadable());

View File

@ -117,17 +117,21 @@ namespace Aurora::Async
void Entrypoint(WorkerId_t id);
private:
void EarlyExitTick();
void ThisExiting();
AuSPtr<GroupState> GetGroup(ThreadGroup_t type);
AuSPtr<ThreadState> GetThreadState();
AuSPtr<ThreadState> GetThreadStateNoWarn();
AuSPtr<ThreadState> GetThreadHandle(WorkerId_t id);
AuList<AuSPtr<ThreadState>> GetThreadHandles(WorkerId_t id);
using ThreadDb_t = AuBST<ThreadGroup_t, AuSPtr<GroupState>>;
ThreadDb_t threads_;
AuUInt shuttingdown_ {};
AuUInt32 shuttingdown_ {};
bool shutdown {};
AuThreadPrimitives::RWLockUnique_t rwlock_;
std::atomic_int tasksRunning_;
bool runnersRunning_ {};

View File

@ -47,6 +47,9 @@ namespace Aurora::Async
Utility::RateLimiter rateLimiter;
ERunMode runMode;
int cookie {0};
bool bAlreadyDoingExitTick {};
bool bBreakEarly {};
bool bIsKiller {};
//
AuThreadPrimitives::SpinLock externalFencesLock;

View File

@ -160,7 +160,7 @@ namespace Aurora::Async
void WorkItem::DispatchEx(bool check)
{
AU_LOCK_GUARD(lock);
AU_LOCK_GUARD(this->lock);
DispatchExLocked(check);
}
@ -228,13 +228,38 @@ namespace Aurora::Async
void WorkItem::CancelAsync()
{
AU_TRY_LOCK_GUARD_NAMED(this->lock, asd);
AU_TRY_LOCK_GUARD_NAMED(this->lock2, asd);
Fail();
}
void WorkItem::RunAsyncLocked2()
{
AU_LOCK_GUARD(this->lock2);
IWorkItemHandler::ProcessInfo info(true);
info.pool = this->owner_->ToThreadPool();
if (this->task_)
{
try
{
this->task_->DispatchFrame(info);
}
catch (...)
{
// TODO: runtime config for root level exception caught behaviour
SysPushErrorCatch();
Fail();
return;
}
}
RunAsyncLocked2(info);
}
void WorkItem::RunAsync()
{
AU_LOCK_GUARD(this->lock);
AU_LOCK_GUARD(this->lock2);
RunAsyncLocked();
}
@ -258,6 +283,13 @@ namespace Aurora::Async
}
}
AU_LOCK_GUARD(this->lock);
RunAsyncLocked2(info);
}
void WorkItem::RunAsyncLocked2(const IWorkItemHandler::ProcessInfo &info)
{
switch (info.type)
{
case ETickType::eFinished:
@ -317,7 +349,7 @@ namespace Aurora::Async
for (auto &waiter : this->waiters_)
{
AuReinterpretCast<WorkItem>(waiter)->DispatchEx(true);
AuReinterpretCast<WorkItem>(waiter)->DispatchExLocked(true);
}
}
@ -357,7 +389,7 @@ namespace Aurora::Async
void WorkItem::Cancel()
{
AU_LOCK_GUARD(this->lock);
AU_LOCK_GUARD(this->lock2);
Fail();
}
@ -376,7 +408,7 @@ namespace Aurora::Async
if (!this->task_)
{
// If we aren't actually calling a task interface, we may as well just dispatch objects waiting on us from here
RunAsyncLocked();
RunAsyncLocked2();
}
else
{

View File

@ -52,7 +52,8 @@ namespace Aurora::Async
private:
void RunAsyncLocked();
void RunAsyncLocked2();
void RunAsyncLocked2(const IWorkItemHandler::ProcessInfo &info);
bool WaitForLocked(const AuList<AuSPtr<IWorkItem>> &workItem);
void DispatchEx(bool check);
@ -64,6 +65,7 @@ namespace Aurora::Async
AuList<AuSPtr<IWorkItem>> waitOn_;
AuList<AuSPtr<IWorkItem>> waiters_;
AuThreadPrimitives::SpinLock lock;
AuThreadPrimitives::SpinLock lock2;
AuThreadPrimitives::EventUnique_t finishedEvent_;
bool finished {};