// AuroraRuntime/Source/Async/ThreadPool.cpp
// (review) Removed non-code web-viewer chrome ("1643 lines / 44 KiB / C++ /
// Raw Normal View History") left over from extraction; path kept above.
/***
Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: ThreadPool.cpp
Date: 2021-10-30
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "Async.hpp"
#include "ThreadPool.hpp"
#include "AsyncApp.hpp"
#include "WorkItem.hpp"
#include "AuSchedular.hpp"
// 2022-03-10 15:35:01 +00:00  (commit-timestamp scrape artifact, commented out so the include block compiles)
#include "ThreadWorkerQueueShim.hpp"
#include "IAsyncRunnable.hpp"
#include "AuAsyncFuncRunnable.hpp"
#include "AuAsyncFuncWorker.hpp"
namespace Aurora::Async
{
//STATIC_TLS(WorkerId_t, tlsWorkerId);
// Weak TLS reference to the pool that owns the calling thread (if any).
// Weak so TLS alone can never extend a pool's lifetime past shutdown.
static thread_local AuWPtr<ThreadPool> tlsCurrentThreadPool;

// Resolves a public IThreadPool handle to the concrete ThreadPool.
// The global async-app singleton is special-cased: it is a raw static, so it
// is wrapped in a non-owning shared_ptr instead of being pointer-cast.
inline auto GetWorkerInternal(const AuSPtr<IThreadPool> &pool)
{
    if (pool.get() == AuStaticCast<IAsyncApp>(gAsyncApp))
    {
        return AuUnsafeRaiiToShared(AuStaticCast<ThreadPool>(gAsyncApp));
    }

    return AuStaticPointerCast<ThreadPool>(pool);
}
// Returns the calling thread's fully-qualified worker id (pool + worker id),
// or a default-constructed WorkerPId_t when the caller is not a pool worker
// or its pool can no longer be locked (already destroyed / shutting down).
AUKN_SYM WorkerPId_t GetCurrentWorkerPId()
{
    auto lkPool = tlsCurrentThreadPool.lock();
    if (!lkPool)
    {
        return {};
    }

    // Copy the TLS worker id before trying to promote its weak pool reference
    auto cpy = *lkPool->tlsWorkerId;
    if (auto pPool = AuTryLockMemoryType(cpy.pool))
    {
        return WorkerPId_t(pPool, cpy);
    }
    else
    {
        return {};
    }
}
//
// Constructs the pool and caches a read-view of the pool-wide rwlock so read
// locking does not have to re-resolve it on every use.
// NOTE(review): shutdownEvent_(false, false, true) ctor flags are presumed
// (initial state / auto-reset / etc.) — confirm against the event factory.
ThreadPool::ThreadPool() : shutdownEvent_(false, false, true)
{
    this->pRWReadView = this->rwlock_->AsReadable();
}
// internal pool interface
// Convenience overload: qualifies the unlocker with the calling thread's own
// pool, then defers to the WorkerPId_t overload.
bool ThreadPool::WaitFor(WorkerId_t unlocker, const AuSPtr<Threading::IWaitable> &primitive, AuUInt32 timeoutMs)
{
    WorkerPId_t qualifiedUnlocker { AuAsync::GetCurrentWorkerPId().GetPool(), unlocker };
    return this->WaitFor(qualifiedUnlocker, primitive, timeoutMs);
}
// Waits for `primitive` to become lockable, for up to `timeoutMs` (0 = no
// timeout). If the calling thread IS the worker that would normally satisfy
// the wait (`unlocker` resolves to self), the thread pumps its own work queue
// between TryLock attempts instead of blocking, so the unlocking task can run.
// Otherwise the primitive is registered as an external fence on the target
// worker (so worker exit can release it) and blocked on directly.
// Returns true once the primitive was acquired, false on timeout/shutdown.
bool ThreadPool::WaitFor(WorkerPId_t unlocker, const AuSPtr<Threading::IWaitable> &primitive, AuUInt32 timeoutMs)
{
    AuUInt64 uEndTimeNS = timeoutMs ?
        AuTime::SteadyClockNS() + AuMSToNS<AuUInt64>(timeoutMs) :
        0;

    if (auto pCurThread = GetThreadState())
    {
        bool bStat {};

        {
            bStat = !bool(unlocker);
        }

        if (!bStat)
        {
            // old shid (to clean up)
            // "self wait": same group, same pool, and either the exact worker id
            // or kThreadIdAny in a single-worker group.
            bool bWorkerIdMatches = (unlocker.second == pCurThread->thread.id.second) ||
                                    ((unlocker.second == Async::kThreadIdAny) &&
                                     (GetThreadWorkersCount(unlocker.first) == 1));

            bStat = (unlocker.first == pCurThread->thread.id.first) &&
                    (unlocker.GetPool().get() == this) &&
                    (bWorkerIdMatches);
        }

        if (bStat)
        {
            // Self-wait: pump our own queue until the primitive unlocks
            while (true)
            {
                AuUInt32 didntAsk;
                bool bTimedOut {};

                if (primitive->TryLock())
                {
                    return true;
                }

                this->InternalRunOne(pCurThread, true, false, didntAsk);

                if (uEndTimeNS)
                {
                    bTimedOut = AuTime::SteadyClockNS() >= uEndTimeNS;
                }

                if (primitive->TryLock())
                {
                    return true;
                }

                // BUGFIX: was `!AuAtomicLoad(&this->shuttingdown_) || bTimedOut`,
                // which aborted the pump loop on the first pass whenever the pool
                // was NOT shutting down, defeating the wait entirely.
                // Give up only on shutdown or on timeout.
                if (AuAtomicLoad(&this->shuttingdown_) ||
                    bTimedOut)
                {
                    return false;
                }
            }
        }
    }

    {
        AuSPtr<ThreadState> pHandle;

        if (auto pPool = unlocker)
        {
            auto pPoolEx = AuStaticCast<ThreadPool>(unlocker.GetPool());
            AU_LOCK_GLOBAL_GUARD(pPoolEx->rwlock_->AsReadable());
            auto pShutdownLock = Aurora::Threading::GetShutdownReadLock();

            if ((pHandle = AuStaticCast<ThreadPool>(unlocker.GetPool())->GetThreadHandle(unlocker)))
            {
                AU_LOCK_GUARD(pHandle->externalFencesLock);
                if (pHandle->exitingflag2)
                {
                    // Worker already exiting: do not register a fence it would never release
                    pShutdownLock->Unlock();
                    bool bRet = primitive->TryLock();
                    pShutdownLock->Lock();
                    return bRet;
                }
                else
                {
                    pHandle->externalFences.push_back(primitive.get());
                }
            }
            else if (unlocker.GetPool().get() == this)
            {
                // Unknown worker of this pool: plain blocking wait
                pShutdownLock->Unlock();
                bool bRet = primitive->LockMS(timeoutMs);
                pShutdownLock->Lock();
                return bRet;
            }
        }

        bool bRet = primitive->LockAbsNS(uEndTimeNS);

        if (pHandle)
        {
            // De-register the fence whether or not the wait succeeded
            AU_LOCK_GLOBAL_GUARD(pHandle->externalFencesLock);
            AuTryRemove(pHandle->externalFences, primitive.get());
        }

        return bRet;
    }
}
// Forwarding overload: schedules `runnable` on `target`, counting the task in
// the pool-wide outstanding-work counter (bIncrement = true).
void ThreadPool::Run(WorkerId_t target, AuSPtr<IAsyncRunnable> runnable)
{
    this->Run(target, runnable, true);
}
// Queues `runnable` onto the group that owns `target`.
// - target.second == kThreadIdAny: any worker of the group may pick it up.
// - otherwise: pinned to one worker; cancelled immediately if that worker is
//   missing or is dropping submissions during shutdown.
// bIncrement: add the task to the pool-wide outstanding-work counter
// (uAtomicCounter), which runner-mode shutdown logic watches.
// (Scrape artifacts — interleaved commit timestamps — removed from this body.)
void ThreadPool::Run(WorkerId_t target, AuSPtr<IAsyncRunnable> runnable, bool bIncrement)
{
    AuSPtr<ThreadState> pWorker;

    auto pGroupState = GetGroup(target.first);
    SysAssert(static_cast<bool>(pGroupState), "couldn't dispatch a task to an offline group");

    if (target.second != Async::kThreadIdAny)
    {
        pWorker = pGroupState->GetThreadByIndex(target.second);
        if (!pWorker)
        {
            runnable->CancelAsync();
            return;
        }

        if (pWorker->shutdown.bDropSubmissions)
        {
            runnable->CancelAsync();
            return;
        }
    }

    if (bIncrement)
    {
        AuAtomicAdd(&this->uAtomicCounter, 1u);
    }

    pGroupState->workQueue.AddWorkEntry(AuMakePair(target.second, runnable));

    if (target.second == Async::kThreadIdAny)
    {
        // Wake every worker in the group; first to dequeue wins
        pGroupState->SignalAll();
    }
    else
    {
        pWorker->sync.SetEvent(true, true);
    }
}
// Identity accessor: exposes this object through its public interface type.
IThreadPool *ThreadPool::ToThreadPool()
{
    return static_cast<IThreadPool *>(this);
}
// ithreadpool
// Number of workers currently registered in `group`.
// NOTE(review): GetGroup's result is not null-checked here (matches original);
// callers are expected to pass a live group.
size_t ThreadPool::GetThreadWorkersCount(ThreadGroup_t group)
{
    auto pGroup = GetGroup(group);
    return pGroup->workers.size();
}
void ThreadPool::SetRunningMode(bool eventRunning)
{
this->runnersRunning_ = eventRunning;
}
// Spawns a worker without runner semantics (create = false).
bool ThreadPool::Spawn(WorkerId_t workerId)
{
    return this->Spawn(workerId, false);
}
// Creates a worker via the spawn path with the create flag raised.
bool ThreadPool::Create(WorkerId_t workerId)
{
    return this->Spawn(workerId, true);
}
bool ThreadPool::InRunnerMode()
{
return this->runnersRunning_;
}
// Binds the calling thread's TLS pool pointer to this pool for the duration
// of a pump call, remembering the previous binding in `weakTlsHandle` /
// `tlsHandle` so it can be restored afterwards.
#define ASYNC_THREADGROUP_TLS_PREP \
auto &weakTlsHandle = tlsCurrentThreadPool; \
auto tlsHandle = AuTryLockMemoryType(weakTlsHandle); \
if (!tlsHandle || tlsHandle.get() != this) \
{ \
weakTlsHandle = AuSharedFromThis(); \
}
// Restores the TLS pool binding captured by ASYNC_THREADGROUP_TLS_PREP when
// the pump was entered from a thread owned by a different pool.
#define ASYNC_THREADGROUP_TLS_UNSET \
if (tlsHandle && tlsHandle.get() != this) \
{ \
tlsCurrentThreadPool = weakTlsHandle; \
}
bool ThreadPool::Poll()
{
AuUInt32 uCount {};
ASYNC_THREADGROUP_TLS_PREP;
auto bRet = InternalRunOne(GetThreadStateLocal(), false, false, uCount);
ASYNC_THREADGROUP_TLS_UNSET;
return bRet;
}
bool ThreadPool::RunOnce()
{
AuUInt32 uCount {};
ASYNC_THREADGROUP_TLS_PREP;
auto bRet = InternalRunOne(GetThreadStateLocal(), true, false, uCount);
ASYNC_THREADGROUP_TLS_UNSET;
return bRet;
}
// Main worker loop: blocks and dispatches work until the thread is told to
// exit, pool shutdown reaches phase 2 (bit 1 of shuttingdown_), or the worker
// requests a main-loop break. Returns whether at least one pass ran.
// Non-worker callers just wait on the shutdown event instead.
bool ThreadPool::Run()
{
    bool bRanOnce {};

    ASYNC_THREADGROUP_TLS_PREP;

    auto pJobRunner = GetThreadStateLocal();
    if (!pJobRunner)
    {
        ASYNC_THREADGROUP_TLS_UNSET;
        this->shutdownEvent_->LockMS(0);
        return true;
    }

    auto auThread = AuThreads::GetThread();

    while ((!auThread->Exiting()) &&
           ((AuAtomicLoad(&this->shuttingdown_) & 2) != 2) &&
           (!pJobRunner->shutdown.bBreakMainLoop))
    {
        AuUInt32 uCount {};

        // Do work (blocking)
        if (!InternalRunOne(pJobRunner, true, true, uCount))
        {
            if ((AuAtomicLoad(&this->shuttingdown_) & 2) == 2)
            {
                // BUGFIX: originally `return ranOnce;` here, which skipped
                // ASYNC_THREADGROUP_TLS_UNSET and leaked the TLS pool binding
                // on shutdown exit. Break so the epilogue restores TLS; the
                // return value is unchanged (bRanOnce is not set on this path).
                break;
            }
        }
        bRanOnce = true;
    }

    ASYNC_THREADGROUP_TLS_UNSET;
    return bRanOnce;
}
// One pump pass for `state`'s worker: services the kernel/async loop first,
// then polls the work queue. When the loop has external sources (> 1), the
// loop wait itself provides the blocking, so the queue poll is non-blocking;
// otherwise the queue poll inherits `block`.
// Returns whether the poll dispatched work; uCount accumulates items run.
// (Scrape artifacts — interleaved commit timestamps — removed from this body.)
bool ThreadPool::InternalRunOne(AuSPtr<ThreadState> state, bool block, bool bUntilWork, AuUInt32 &uCount)
{
    bool bSuccess {};

    if (!state)
    {
        SysPushErrorUninitialized("Not an async thread");
        return false;
    }

    EarlyExitTick();

    {
        auto asyncLoop = state->asyncLoop;

        asyncLoop->OnFrame();

        if (asyncLoop->GetSourceCount() > 1)
        {
            if (block)
            {
                asyncLoop->WaitAny(0);
            }
            else
            {
                asyncLoop->PumpNonblocking();
            }

            bSuccess = PollInternal(state, false, bUntilWork, uCount);
        }
        else
        {
            bSuccess = PollInternal(state, block, bUntilWork, uCount);
        }
    }

    EarlyExitTick();

    return bSuccess;
}
#if defined(__AUHAS_COROUTINES_CO_AWAIT) && defined(AU_LANG_CPP_20_)
// Executes one PollInternal_Base pass from inside a coroutine — presumably so
// that re-entrant pumps run on a fresh coroutine frame rather than growing
// the native call stack (see the uStackCallDepth gate in PollInternal).
// Completes synchronously: bRet is assigned before co_return.
AuVoidTask ThreadPool::PollInternal_ForceCoRoutine(AuSPtr<ThreadState> state, bool block, bool bUntilWork, AuUInt32 &uCount, bool &bRet)
{
    bRet = PollInternal_Base(state, block, bUntilWork, uCount);
    co_return;
}
#endif
// Dispatch wrapper for one poll pass. When re-entering the pump from within a
// running task (uStackCallDepth != 0) and the C++20 recursive-callstack
// option is enabled, the pass runs inside a coroutine frame instead of
// recursing on the native stack; otherwise it calls PollInternal_Base directly.
bool ThreadPool::PollInternal(AuSPtr<ThreadState> state, bool block, bool bUntilWork, AuUInt32 &uCount)
{
#if defined(__AUHAS_COROUTINES_CO_AWAIT) && defined(AU_LANG_CPP_20_)
    if (state->stackState.uStackCallDepth &&
        gRuntimeConfig.async.bEnableCpp20RecursiveCallstack)
    {
        bool bRet {};
        PollInternal_ForceCoRoutine(state, block, bUntilWork, uCount, bRet);
        return bRet;
    }
#endif
    return PollInternal_Base(state, block, bUntilWork, uCount);
}
// Core pump pass: dequeues a batch from the group queue into the worker's
// local pending list (optionally blocking until work arrives), dispatches the
// batch, then returns leftovers to the group queue if the loop was preempted.
// Also drives runner-mode auto-shutdown when the pool runs dry.
// Returns true if at least one item was dispatched; uCount accumulates items.
// (Scrape artifacts — interleaved commit timestamps — removed; logic unchanged.)
bool ThreadPool::PollInternal_Base(AuSPtr<ThreadState> state, bool block, bool bUntilWork, AuUInt32 &uCount)
{
    if (!state)
    {
        SysPushErrorUninitialized("Not an async thread");
        return false;
    }

    auto group = state->parent.lock();

    {
        AU_LOCK_GUARD(state->sync.cvWorkMutex);

        do
        {
            bool bFailedOOM = group->workQueue.Dequeue(state->pendingWorkItems,
                                                       state->stackState.uWorkMultipopCount,
                                                       state->thread.id.second);

            state->sync.UpdateCVState(state.get());

            // Consider blocking for more work
            if (!block)
            {
                break;
            }

            // OOM: hardened: sleep for 0.01MS if the heap for task dequeue is full.
            // Until the mixed heap object is implemented, we can only dequeue 2^16 tasks globally at a time into a reserved heap.
            if (!bFailedOOM)
            {
                if (state->pendingWorkItems.empty())
                {
                    AuThreading::SleepNs(10'000);
                    continue;
                }
                else
                {
                    break;
                }
            }

            // Block if no work items are present
            if (state->pendingWorkItems.empty())
            {
                // pre-wakeup thread terminating check
                if (state->thread.pThread->Exiting())
                {
                    break;
                }

                if (AuAtomicLoad(&this->shuttingdown_) & 2)
                {
                    break;
                }

                // OOM: hardened: do not sleep after OOM re-try
                if (group->workQueue.IsEmpty(this, state->thread.id))
                {
                    state->sync.cvVariable->WaitForSignal();
                }

                if (AuAtomicLoad(&this->shuttingdown_) & 2)
                {
                    break;
                }

                // Post-wakeup thread terminating check
                if (state->thread.pThread->Exiting())
                {
                    break;
                }
            }

            // Bail early if the kernel loop has pending sources to service
            if (state->pendingWorkItems.empty() && (
                (this->GetThreadState()->asyncLoop->GetSourceCount() > 1) ||
                this->GetThreadState()->asyncLoop->CommitPending())) //(this->ToKernelWorkQueue()->IsSignaledPeek()))
            {
                return false;
            }
        }
        while (state->pendingWorkItems.empty() && (block && bUntilWork));

        if (!block &&
            !(this->shuttingdown_ & 2)) // quick hack: is worthy of io reset by virtue of having polled externally (most likely for IO ticks, unlikely for intraprocess ticks)
        {
            AU_LOCK_GLOBAL_GUARD(group->workQueue.mutex); // dont atomically increment our work counters [signal under mutex group]...
            AU_LOCK_GUARD(group->workersMutex);           // dont atomically increment our work counters [broadcast]...
            // ...these primitives are far less expensive to hit than resetting kernel primitives
            // AU_LOCK_GUARD(state->cvWorkMutex) used to protect us

            if (group->workQueue.IsEmpty(this, state->thread.id))
            {
                state->sync.eventLs->Reset(); // ...until we're done
                AuAtomicStore(&state->sync.cvLSActive, 0u);
            }
        }
    }

    if (state->pendingWorkItems.empty())
    {
        if (InRunnerMode())
        {
            if ((AuAtomicLoad(&this->uAtomicCounter) == 0) &&
                this->IsDepleted(state))
            {
                Shutdown();
            }
        }

        return false;
    }

    int runningTasks {};
    auto uStartCookie = state->stackState.uStackCookie++;

    // Account for
    //     while (AuAsync.GetCurrentPool()->runForever());
    // in the first task (or deeper)
    if (InRunnerMode() && state->stackState.uStackCallDepth) // are we one call deep?
    {
        if ((AuAtomicLoad(&this->uAtomicCounter) == state->stackState.uStackCallDepth) &&
            this->IsDepleted(state))
        {
            return false;
        }
    }

    for (auto itr = state->pendingWorkItems.begin(); itr != state->pendingWorkItems.end(); )
    {
        if (state->thread.pThread->Exiting())
        {
            break;
        }

        // Take the runnable, then remove it from the local job queue before running
        auto oops = itr->second;
        itr = state->pendingWorkItems.erase(itr);

        state->stackState.uStackCallDepth++;
        //SysBenchmark(fmt::format("RunAsync: {}", block));

        // Dispatch
        if (oops)
        {
            oops->RunAsync();
        }
        uCount++;

        // Atomically decrement global task counter
        runningTasks = AuAtomicSub(&this->uAtomicCounter, 1u);
        state->stackState.uStackCallDepth--;

        // A nested pump invalidated our iterators: restart from the head
        if (uStartCookie != state->stackState.uStackCookie)
        {
            uStartCookie = state->stackState.uStackCookie;
            itr = state->pendingWorkItems.begin();
        }
    }

    // Return popped work back to the group's work pool when our pump loops were preempted
    if (state->pendingWorkItems.size())
    {
        AU_LOCK_GUARD(state->sync.cvWorkMutex);

        for (const auto &item : state->pendingWorkItems)
        {
            group->workQueue.AddWorkEntry(item);
        }

        state->pendingWorkItems.clear();
        state->sync.cvVariable->Broadcast();
        state->sync.eventLs->Set();
    }

    // Account for
    //     while (AuAsync.GetCurrentPool()->runForever());
    // in the top most task
    if (InRunnerMode())
    {
        if ((runningTasks == 0) &&
            (AuAtomicLoad(&this->uAtomicCounter) == 0) &&
            this->IsDepleted(state))
        {
            Shutdown();
        }
    }

    return true;
}
// While much of this subsystem needs good rewrite, under no circumstance should the shutdown process be "simpified" or "cleaned up"
// This is our expected behaviour. Any changes will likely introduce hard to catch bugs across various softwares and exit conditions.
//
// Phases: (1) first-caller-wins flag (bit 0 of shuttingdown_); (2) barrier on
// every other worker to drain already-submitted work; (3) bump abort cookies;
// (4) raise phase-2 flag (bit 1); (5) mark all thread states shutting down,
// signal exits, collect owned threads; (6) final barrier/terminate/sync; then
// notify dependent pools that may now auto-shutdown.
// (Scrape artifacts removed; one null-deref fixed, see BUGFIX below.)
void ThreadPool::Shutdown()
{
    AU_DEBUG_MEMCRUNCH;

    auto trySelfPid = AuAsync::GetCurrentWorkerPId();

    // Update shutting down flag
    // Specify the root-level shutdown flag for 'ok, u can work, but you're shutting down soon [microseconds, probably]'
    {
        if (AuAtomicTestAndSet(&this->shuttingdown_, 0) != 0)
        {
            // Someone else already started shutdown
            return;
        }
    }

    auto pLocalRunner = this->GetThreadStateNoWarn();
    AuList<WorkerId_t> toBarrier;

    // wait for regular prio work to complete
    {
        for (auto pGroup : this->threadGroups_)
        {
            if (!pGroup)
            {
                continue;
            }

            AU_LOCK_GLOBAL_GUARD(pGroup->workersMutex);
            for (auto &[id, worker] : pGroup->workers)
            {
                if (trySelfPid == worker->thread.id)
                {
                    continue;
                }

                toBarrier.push_back(worker->thread.id);
            }
        }

        for (const auto &id : toBarrier)
        {
            if (trySelfPid == id)
            {
                continue;
            }
            this->Barrier(id, 0, false, false /* no reject*/); // absolute safest point in shutdown; sync to already submitted work
        }
    }

    // increment abort cookies
    {
        for (const auto &id : toBarrier)
        {
            if (trySelfPid == id)
            {
                continue;
            }
            AuAtomicAdd(&this->uAtomicShutdownCookie, 1u);
        }
    }

    // set shutdown flags
    {
        AuAtomicTestAndSet(&this->shuttingdown_, 1);
    }

    // Finally set the shutdown flag on all of our thread contexts
    // then release them from the runners/workers list
    // then release all group contexts
    AuList<AuThreads::ThreadShared_t> threads;
    AuList<AuSPtr<ThreadState>> states;
    {
        AU_LOCK_GLOBAL_GUARD(this->pRWReadView);

        for (auto pGroup : this->threadGroups_)
        {
            if (!pGroup)
            {
                continue;
            }

            for (auto &[id, pState] : pGroup->workers)
            {
                // BUGFIX: the original else-branch executed
                // `pState->shuttingdown = true;` on a null pState — a
                // guaranteed null dereference. Skip dead slots instead.
                if (!pState)
                {
                    continue;
                }

                // main loop:
                states.push_back(pState);
                pState->shuttingdown = true;

                // thread object:
                if (pState->thread.bOwnsThread)
                {
                    pState->thread.pThread->SendExitSignal();
                    threads.push_back(pState->thread.pThread);
                }

                // unrefreeze signals:
                auto &event = pState->running;
                if (event)
                {
                    event->Set();
                }
            }
        }
    }

    // Break all condvar loops, just in case
    for (const auto &pState : states)
    {
        pState->sync.SetEvent();
    }

    // Final sync to exit
    {
        for (const auto &id : toBarrier)
        {
            if (trySelfPid == id)
            {
                continue;
            }

            auto handle = this->GetThreadHandle(id);
            if (handle)
            {
                handle->shutdown.bDropSubmissions = false;
                handle->isDeadEvent->LockMS(250);
            }
        }
    }

    // Sync to shutdown threads to prevent a race condition whereby the async subsystem shuts down before the threads
    auto pSelf = AuThreads::GetThread();
    for (const auto &thread : threads)
    {
        if (thread.get() != pSelf)
        {
            thread->Terminate();
        }
    }

    // Is dead flag
    this->shutdownEvent_->Set();

    //
    if (pLocalRunner)
    {
        pLocalRunner->shutdown.bIsKillerThread = true;
    }

    // Notify observing threads of our work exhaustion
    for (const auto &wOther : this->listWeakDepsParents_)
    {
        if (auto pThat = AuTryLockMemoryType(wOther))
        {
            if (pThat->InRunnerMode())
            {
                continue;
            }

            if (!pThat->IsSelfDepleted(nullptr))
            {
                continue;
            }

            if (pThat->uAtomicCounter)
            {
                continue;
            }

            pThat->Shutdown();
        }
    }
}
// True once pool shutdown has started, or when the calling worker's own
// state is flagged as exiting.
// NOTE(review): GetThreadState() is dereferenced unchecked here (matches
// original) — a non-worker caller that isn't shutting down would fault.
bool ThreadPool::Exiting()
{
    if (this->shuttingdown_)
    {
        return true;
    }
    return GetThreadState()->exiting;
}
// Non-blocking pump that reports how many items were dispatched.
// With bStrict, reports at least 1 whenever the pass did anything, even if
// the per-item counter stayed at zero.
AuUInt32 ThreadPool::PollAndCount(bool bStrict)
{
    AuUInt32 uDispatched {};
    ASYNC_THREADGROUP_TLS_PREP;
    bool bRanAny = this->InternalRunOne(this->GetThreadStateNoWarn(), false, false, uDispatched);
    ASYNC_THREADGROUP_TLS_UNSET;

    if (uDispatched)
    {
        return uDispatched;
    }
    return bStrict ? AuUInt32 { bRanAny } : 0;
}
// Drains the queue: keeps pumping non-blocking passes until one dispatches
// nothing. Returns the total number of items dispatched.
AuUInt32 ThreadPool::RunAllPending()
{
    AuUInt32 uTotal {};
    ASYNC_THREADGROUP_TLS_PREP;

    for (;;)
    {
        AuUInt32 uBatch {};
        (void)this->InternalRunOne(this->GetThreadStateNoWarn(), false, true, uBatch);
        uTotal += uBatch;
        if (!uBatch)
        {
            break;
        }
    }

    ASYNC_THREADGROUP_TLS_UNSET;
    return uTotal;
}
// Wraps `task` in a WorkItem bound to `worker` of this pool.
// A null task propagates as a null result (error pass-through).
AuSPtr<IWorkItem> ThreadPool::NewWorkItem(const WorkerId_t &worker,
                                          const AuSPtr<IWorkItemHandler> &task)
{
    if (!task)
    {
        return {};
    }

    WorkerPId_t boundId { this->SharedFromThis(), worker };
    return AuMakeShared<WorkItem>(this, boundId, task);
}
// Wraps a plain callable in a worker-bound work item. The callback must be
// non-null (asserted).
AuSPtr<IWorkItem> ThreadPool::NewWorkFunction(const WorkerId_t &worker,
                                              AuVoidFunc callback)
{
    SysAssert(callback);

    WorkerPId_t boundId { this->SharedFromThis(), worker };
    return AuMakeShared<AsyncFuncWorker>(this, boundId, AuMove(callback));
}
// A fence is a handler-less work item pinned to the calling worker.
AuSPtr<IWorkItem> ThreadPool::NewFence()
{
    AuSPtr<IWorkItemHandler> pNoHandler {};
    return AuMakeShared<WorkItem>(this, AuAsync::GetCurrentWorkerPId(), pNoHandler);
}
// Maps a worker id to its underlying thread object; null when no such worker.
AuThreads::ThreadShared_t ThreadPool::ResolveHandle(WorkerId_t id)
{
    if (auto pState = GetThreadHandle(id))
    {
        return pState->thread.pThread;
    }
    return {};
}
// Snapshot of every group's worker thread ids, keyed by group id.
// Each group's worker map is walked under its workersMutex.
// (Scrape artifact — an interleaved commit timestamp — removed from this body.)
AuBST<ThreadGroup_t, AuList<ThreadId_t>> ThreadPool::GetThreads()
{
    AU_DEBUG_MEMCRUNCH;
    AuBST<ThreadGroup_t, AuList<ThreadId_t>> ret;

    for (auto pGroup : this->threadGroups_)
    {
        AuList<ThreadId_t> workers;

        if (!pGroup)
        {
            continue;
        }

        AU_LOCK_GUARD(pGroup->workersMutex);
        AuTryReserve(workers, pGroup->workers.size());

        for (const auto &thread : pGroup->workers)
        {
            workers.push_back(thread.second->thread.id.second);
        }

        ret[pGroup->group] = workers;
    }

    return ret;
}
// Returns the calling thread's worker id from TLS.
// NOTE(review): presumably default-constructed on non-worker threads — confirm
// against tlsWorkerId's declaration (not visible in this file chunk).
WorkerId_t ThreadPool::GetCurrentThread()
{
    return tlsWorkerId;
}
// Per-worker IO processor singleton; null when the worker does not exist.
AuSPtr<AuIO::IIOProcessor> ThreadPool::GetIOProcessor(WorkerId_t id)
{
    auto pHandle = this->GetThreadHandle(id);
    if (!pHandle)
    {
        return {};
    }
    return pHandle->singletons.GetIOProcessor({ this->SharedFromThis(), id });
}
// Per-worker IO completion group singleton; null when the worker does not exist.
AuSPtr<IO::CompletionGroup::ICompletionGroup> ThreadPool::GetIOGroup(WorkerId_t id)
{
    auto pHandle = this->GetThreadHandle(id);
    if (!pHandle)
    {
        return {};
    }
    return pHandle->singletons.GetIOGroup({ this->SharedFromThis(), id });
}
// Per-worker network interface singleton; null when the worker does not exist.
AuSPtr<AuIO::Net::INetInterface> ThreadPool::GetIONetInterface(WorkerId_t id)
{
    auto pHandle = this->GetThreadHandle(id);
    if (!pHandle)
    {
        return {};
    }
    return pHandle->singletons.GetIONetInterface({ this->SharedFromThis(), id });
}
// Per-worker network worker singleton; null when the worker does not exist.
AuSPtr<AuIO::Net::INetWorker> ThreadPool::GetIONetWorker(WorkerId_t id)
{
    auto pHandle = this->GetThreadHandle(id);
    if (!pHandle)
    {
        return {};
    }
    return pHandle->singletons.GetIONetWorker({ this->SharedFromThis(), id });
}
// Synchronizes with one worker, or with every worker of a group when
// workerId.second == kThreadIdAny, by issuing Barrier()s.
// requireSignal is suppressed for the calling worker itself (it cannot signal
// its own barrier). Returns false as soon as any barrier fails.
// (Scrape artifact — an interleaved commit timestamp — removed from this body.)
bool ThreadPool::Sync(WorkerId_t workerId, AuUInt32 timeoutMs, bool requireSignal)
{
    //AU_LOCK_GUARD(this->pRWReadView);
    auto currentWorkerId = GetCurrentThread().second;

    if (workerId.second == Async::kThreadIdAny)
    {
        // Snapshot the group's worker map under the read lock; barrier outside it
        decltype(GroupState::workers) workers;
        {
            AU_LOCK_GLOBAL_GUARD(this->pRWReadView);
            if (auto pGroup = GetGroup(workerId.first))
            {
                workers = pGroup->workers;
            }
        }

        for (auto &jobWorker : workers)
        {
            if (!Barrier(jobWorker.second->thread.id, timeoutMs, requireSignal && jobWorker.second->thread.id.second != currentWorkerId, false)) // BAD!, should subtract time elapsed, clamp to, i dunno, 5ms min?
            {
                return false;
            }
        }
    }
    else
    {
        return Barrier(workerId, timeoutMs, requireSignal && workerId.second != currentWorkerId, false);
    }

    return true;
}
// Raises the `running` event of one worker, or of every worker in the group
// when workerId.second == kThreadIdAny.
void ThreadPool::Signal(WorkerId_t workerId)
{
    auto pGroup = GetGroup(workerId.first);

    if (workerId.second != Async::kThreadIdAny)
    {
        if (auto pThread = GetThreadHandle(workerId))
        {
            pThread->running->Set();
        }
        return;
    }

    AU_LOCK_GLOBAL_GUARD(pGroup->workersMutex);
    for (auto &workerEntry : pGroup->workers)
    {
        workerEntry.second->running->Set();
    }
}
// Nudges one worker's sync event, or signals the whole group (non-persistent
// variant, SignalAll(false)) when workerId.second == kThreadIdAny.
void ThreadPool::Wakeup(WorkerId_t workerId)
{
    auto pGroup = GetGroup(workerId.first);

    if (workerId.second == Async::kThreadIdAny)
    {
        pGroup->SignalAll(false);
        return;
    }

    if (auto pThread = GetThreadHandle(workerId))
    {
        pThread->sync.SetEvent(true, false);
    }
}
// (review) Removed scraped git-log residue here — a multi-kilobyte commit
// message ("MEGA COMMIT", dated 2022-02-17) interleaved by the web-view
// extraction. No source code was lost.
AuSPtr<AuLoop::ILoopSource> ThreadPool::WorkerToLoopSource(WorkerId_t workerId)
[*/+/-] MEGA COMMIT. ~2 weeks compressed. The intention is to quickly improve and add util apis, enhance functionality given current demands, go back to the build pipeline, finish that, publish runtime tests, and then use what we have to go back to to linux support with a more stable api. [+] AuMakeSharedArray [+] Technet ArgvQuote [+] Grug subsystem (UNIX signal thread async safe ipc + telemetry flusher + log flusher.) [+] auEndianness -> Endian swap utils [+] AuGet<N>(...) [*] AUE_DEFINE conversion for ECompresionType, EAnsiColor, EHashType, EStreamError, EHexDump [+] ConsoleMessage ByteBuffer serialization [+] CmdLine subsystem for parsing command line arguments and simple switch/flag checks [*] Split logger from console subsystem [+] StartupParameters -> A part of a clean up effort under Process [*] Refactor SysErrors header + get caller hack [+] Atomic APIs [+] popcnt [+] Ring Buffer sink [+] Added more standard errors Catch, Submission, LockError, NoAccess, ResourceMissing, ResourceLocked, MalformedData, InSandboxContext, ParseError [+] Added ErrorCategorySet, ErrorCategoryClear, GetStackTrace [+] IExitSubscriber, ETriggerLevel [*] Write bias the high performance RWLockImpl read-lock operation operation [+] ExitHandlerAdd/ExitHandlerRemove (exit subsystem) [*] Updated API style Digests [+] CpuId::CpuBitCount [+] GetUserProgramsFolder [+] GetPackagePath [*] Split IStreamReader with an inl file [*] BlobWriter/BlobReader/BlobArbitraryReader can now take shared pointers to bytebuffers. 
default constructor allocates a new scalable bytebuffer [+] ICharacterProvider [+] ICharacterProviderEx [+] IBufferedCharacterConsumer [+] ProviderFromSharedString [+] ProviderFromString [+] BufferConsumerFromProvider [*] Parse Subsystem uses character io bufferer [*] Rewritten NT's high perf semaphore to use userland SRW/ConVars [like mutex, based on generic semaphore] [+] ByteBuffer::ResetReadPointer [*] Bug fix bytebuffer base not reset on free and some scaling issues [+] ProcessMap -> Added kSectionNameStack, kSectionNameFile, kSectionNameHeap for Section [*] ProcessMap -> Refactor Segment to Section. I was stupid for keeping a type conflict hack API facing [+] Added 64 *byte* fast RNG seeds [+] File Advisorys/File Lock Awareness [+] Added extended IAuroraThread from OS identifier caches for debug purposes [*] Tweaked how memory is reported on Windows. Better consistency of what values mean across functions. [*] Broke AuroraUtils/Typedefs out into a separate library [*] Update build script [+] Put some more effort into adding detail to the readme before rewriting it, plus, added some media [*] Improved public API documentation [*] Bug fix `SetConsoleCtrlHandler` [+] Locale TimeDateToFileNameISO8601 [+] Console config stdOutShortTime [*] Begin using internal UTF8/16 decoders when platform support isnt available (instead of stl) [*] Bug fixes in decoders [*] Major bug fix, AuMax [+] RateLimiter [+] Binary file sink [+] Log directory sink [*] Data header usability (more operators) [+] AuRemoveRange [+] AuRemove [+] AuTryRemove [+] AuTryRemoveRange [+] auCastUtils [+] Finish NewLSWin32Source [+] AuTryFindByTupleN, AuTryRemoveByTupleN [+] Separated AuRead/Write types, now in auTypeUtils [+] Added GetPosition/SetPosition to FileWriter [*] Fix stupid AuMin in place of AuMax in SpawnThread.Unix.Cpp [*] Refactored Arbitrary readers to SeekingReaders (as in, they could be atomic and/or parallelized, and accept an arbitrary position as a work parameter -> not Seekable, as 
in, you can simply set the position) [*] Hack back in the sched deinit [+] File AIO loop source interop [+] Begin to prototype a LoopQueue object I had in mind for NT, untested btw [+] Stub code for networking [+] Compression BaseStream/IngestableStreamBase [*] Major: read/write locks now support write-entrant read routines. [*] Compression subsystem now uses the MemoryView concept [*] Rewrite the base stream compressions, made them less broken [*] Update hashing api [*] WriterTryGoForward and ReaderTryGoForward now revert to the previous relative index instead of panicing [+] Added new AuByteBuffer apis Trim, Pad, WriteFrom, WriteString, [TODO: ReadString] [+] Added ByteBufferPushReadState [+] Added ByteBufferPushWriteState [*] Move from USC-16 to full UTF-16. Win32 can handle full UTF-16. [*] ELogLevel is now an Aurora enum [+] Raised arbitrary limit in header to 255, the max filter buffer [+] Explicit GZip support [+] Explicit Zip support [+] Added [some] compressors et al
2022-02-17 00:11:40 +00:00
{
auto a = GetThreadHandle(workerId);
if (!a)
{
return {};
}
return a->sync.eventLs;
[*/+/-] MEGA COMMIT. ~2 weeks compressed. The intention is to quickly improve and add util apis, enhance functionality given current demands, go back to the build pipeline, finish that, publish runtime tests, and then use what we have to go back to to linux support with a more stable api. [+] AuMakeSharedArray [+] Technet ArgvQuote [+] Grug subsystem (UNIX signal thread async safe ipc + telemetry flusher + log flusher.) [+] auEndianness -> Endian swap utils [+] AuGet<N>(...) [*] AUE_DEFINE conversion for ECompresionType, EAnsiColor, EHashType, EStreamError, EHexDump [+] ConsoleMessage ByteBuffer serialization [+] CmdLine subsystem for parsing command line arguments and simple switch/flag checks [*] Split logger from console subsystem [+] StartupParameters -> A part of a clean up effort under Process [*] Refactor SysErrors header + get caller hack [+] Atomic APIs [+] popcnt [+] Ring Buffer sink [+] Added more standard errors Catch, Submission, LockError, NoAccess, ResourceMissing, ResourceLocked, MalformedData, InSandboxContext, ParseError [+] Added ErrorCategorySet, ErrorCategoryClear, GetStackTrace [+] IExitSubscriber, ETriggerLevel [*] Write bias the high performance RWLockImpl read-lock operation operation [+] ExitHandlerAdd/ExitHandlerRemove (exit subsystem) [*] Updated API style Digests [+] CpuId::CpuBitCount [+] GetUserProgramsFolder [+] GetPackagePath [*] Split IStreamReader with an inl file [*] BlobWriter/BlobReader/BlobArbitraryReader can now take shared pointers to bytebuffers. 
default constructor allocates a new scalable bytebuffer [+] ICharacterProvider [+] ICharacterProviderEx [+] IBufferedCharacterConsumer [+] ProviderFromSharedString [+] ProviderFromString [+] BufferConsumerFromProvider [*] Parse Subsystem uses character io bufferer [*] Rewritten NT's high perf semaphore to use userland SRW/ConVars [like mutex, based on generic semaphore] [+] ByteBuffer::ResetReadPointer [*] Bug fix bytebuffer base not reset on free and some scaling issues [+] ProcessMap -> Added kSectionNameStack, kSectionNameFile, kSectionNameHeap for Section [*] ProcessMap -> Refactor Segment to Section. I was stupid for keeping a type conflict hack API facing [+] Added 64 *byte* fast RNG seeds [+] File Advisorys/File Lock Awareness [+] Added extended IAuroraThread from OS identifier caches for debug purposes [*] Tweaked how memory is reported on Windows. Better consistency of what values mean across functions. [*] Broke AuroraUtils/Typedefs out into a separate library [*] Update build script [+] Put some more effort into adding detail to the readme before rewriting it, plus, added some media [*] Improved public API documentation [*] Bug fix `SetConsoleCtrlHandler` [+] Locale TimeDateToFileNameISO8601 [+] Console config stdOutShortTime [*] Begin using internal UTF8/16 decoders when platform support isnt available (instead of stl) [*] Bug fixes in decoders [*] Major bug fix, AuMax [+] RateLimiter [+] Binary file sink [+] Log directory sink [*] Data header usability (more operators) [+] AuRemoveRange [+] AuRemove [+] AuTryRemove [+] AuTryRemoveRange [+] auCastUtils [+] Finish NewLSWin32Source [+] AuTryFindByTupleN, AuTryRemoveByTupleN [+] Separated AuRead/Write types, now in auTypeUtils [+] Added GetPosition/SetPosition to FileWriter [*] Fix stupid AuMin in place of AuMax in SpawnThread.Unix.Cpp [*] Refactored Arbitrary readers to SeekingReaders (as in, they could be atomic and/or parallelized, and accept an arbitrary position as a work parameter -> not Seekable, as 
in, you can simply set the position) [*] Hack back in the sched deinit [+] File AIO loop source interop [+] Begin to prototype a LoopQueue object I had in mind for NT, untested btw [+] Stub code for networking [+] Compression BaseStream/IngestableStreamBase [*] Major: read/write locks now support write-entrant read routines. [*] Compression subsystem now uses the MemoryView concept [*] Rewrite the base stream compressions, made them less broken [*] Update hashing api [*] WriterTryGoForward and ReaderTryGoForward now revert to the previous relative index instead of panicing [+] Added new AuByteBuffer apis Trim, Pad, WriteFrom, WriteString, [TODO: ReadString] [+] Added ByteBufferPushReadState [+] Added ByteBufferPushWriteState [*] Move from USC-16 to full UTF-16. Win32 can handle full UTF-16. [*] ELogLevel is now an Aurora enum [+] Raised arbitrary limit in header to 255, the max filter buffer [+] Explicit GZip support [+] Explicit Zip support [+] Added [some] compressors et al
2022-02-17 00:11:40 +00:00
}
void ThreadPool::SyncAllSafe()
{
AU_LOCK_GLOBAL_GUARD(this->pRWReadView);
for (auto pGroup : this->threadGroups_)
{
if (!pGroup)
{
continue;
}
for (auto &jobWorker : pGroup->workers)
{
2023-11-11 11:27:01 +00:00
SysAssert(Barrier(jobWorker.second->thread.id, 0, false, false));
}
}
}
void ThreadPool::AddFeature(WorkerId_t id,
AuSPtr<AuThreads::IThreadFeature> pFeature,
bool bNonBlock)
{
2023-09-22 05:05:39 +00:00
auto pWorkItem = DispatchOn({ this->SharedFromThis(), id }, [=]()
{
auto pState = GetThreadState();
2023-11-11 11:27:01 +00:00
{
AU_LOCK_GUARD(pState->tlsFeatures.mutex);
pState->tlsFeatures.features.push_back(pFeature);
}
pFeature->Init();
2023-09-22 05:05:39 +00:00
});
if (!bNonBlock)
{
pWorkItem->BlockUntilComplete();
}
}
void ThreadPool::AssertInThreadGroup(ThreadGroup_t group)
{
SysAssert(static_cast<WorkerId_t>(tlsWorkerId).first == group);
}
void ThreadPool::AssertWorker(WorkerId_t id)
{
SysAssert(static_cast<WorkerId_t>(tlsWorkerId) == id);
}
AuSPtr<AuLoop::ILoopQueue> ThreadPool::ToKernelWorkQueue()
{
2022-03-10 15:35:01 +00:00
return this->GetThreadState()->asyncLoop;
}
AuSPtr<AuLoop::ILoopQueue> ThreadPool::ToKernelWorkQueue(WorkerId_t workerId)
2022-03-10 15:35:01 +00:00
{
auto worker = this->GetThreadHandle(workerId);
if (!worker)
{
SysPushErrorGeneric("Couldn't find requested worker");
2022-03-10 15:35:01 +00:00
return {};
}
2022-03-10 15:35:01 +00:00
return worker->asyncLoop;
}
bool ThreadPool::IsSelfDepleted(const AuSPtr<ThreadState> &pState)
2023-08-13 08:30:17 +00:00
{
AuSPtr<AuLoop::ILoopQueue> pLoopQueue;
if (pState)
{
pLoopQueue = pState->asyncLoop;
}
else
{
if (auto pLocalThread = this->GetThreadStateLocal())
{
pLoopQueue = pLocalThread->asyncLoop;
}
}
if (!pLoopQueue)
{
return true;
}
return pLoopQueue->GetSourceCount() <= 1 + this->uAtomicIOProcessorsWorthlessSources + this->uAtomicIOProcessors;
2023-08-13 08:30:17 +00:00
}
bool ThreadPool::IsDepleted(const AuSPtr<ThreadState> &state)
2023-08-13 08:30:17 +00:00
{
if (!IsSelfDepleted(state))
2023-08-13 08:30:17 +00:00
{
return false;
}
for (const auto &wOther : this->listWeakDeps_)
{
if (auto pThat = AuTryLockMemoryType(wOther))
{
if (!pThat->IsSelfDepleted(nullptr))
2023-08-13 08:30:17 +00:00
{
return false;
}
2024-09-05 16:38:54 +00:00
if (AuAtomicLoad(&pThat->uAtomicCounter))
2023-08-13 08:30:17 +00:00
{
return false;
}
}
}
return true;
}
void ThreadPool::AddDependency(AuSPtr<IThreadPool> pPool)
{
if (!pPool)
{
return;
}
auto pOther = AuStaticCast<ThreadPool>(pPool);
this->listWeakDeps_.push_back(pOther);
pOther->listWeakDepsParents_.push_back(AuSharedFromThis());
2023-08-13 08:30:17 +00:00
}
AuSPtr<AuThreading::IWaitable> ThreadPool::GetShutdownEvent()
{
return AuSPtr<AuThreading::IWaitable>(AuSharedFromThis(), this->shutdownEvent_.AsPointer());
}
// Unimplemented fiber hooks, 'twas used for science. no longer in use
int ThreadPool::CtxPollPush()
{
// TOOD (Reece): implement a context switching library
// Refer to the old implementation of this on pastebin
return 0;
}
    // Unimplemented fiber hook (see CtxPollPush): intentionally a no-op.
    // Parameters are retained for the abandoned context-switching design.
    void ThreadPool::CtxPollReturn(const AuSPtr<ThreadState> &state, int status, bool hitTask)
    {
    }
bool ThreadPool::CtxYield()
{
bool ranAtLeastOne = false;
// !!!
auto pA = this->GetThreadStateNoWarn();
2024-09-05 16:38:54 +00:00
if (AuAtomicLoad(&this->shuttingdown_) & 2) // fast
{
2023-11-11 11:27:01 +00:00
if (pA->shutdown.bDropSubmissions)
{
return false;
}
}
AuUInt32 uCount {};
#if 1
return this->InternalRunOne(pA, false, false, uCount);
#else
do
{
uCount = 0;
ranAtLeastOne |= this->InternalRunOne(pA, false, uCount);
}
while (uCount);
return uCount || ranAtLeastOne;
#endif
}
//
void ThreadPool::IncrementAbortFenceOnPool()
{
AuAtomicAdd(&this->uAtomicShutdownCookie, 1u);
}
void ThreadPool::IncrementAbortFenceOnWorker(WorkerId_t workerId)
{
auto group = GetGroup(workerId.first);
if (workerId.second == kThreadIdAny)
{
AU_LOCK_GLOBAL_GUARD(group->workersMutex);
for (auto &[jobWorker, pState]: group->workers)
{
AuAtomicAdd(&pState->shutdown.uShutdownFence, 1u);
}
}
else
{
if (auto pState = this->GetThreadHandle(workerId))
{
AuAtomicAdd(&pState->shutdown.uShutdownFence, 1u);
}
}
}
AuUInt64 ThreadPool::QueryAbortFence(AuOptional<WorkerId_t> optWorkerId)
{
if (auto pState = this->GetThreadHandle(optWorkerId.value_or(GetCurrentWorkerPId())))
{
return (AuUInt64(pState->shutdown.uShutdownFence) << 32ull) | AuUInt64(this->uAtomicShutdownCookie);
}
else
{
return this->uAtomicShutdownCookie;
}
}
bool ThreadPool::QueryShouldAbort(AuOptional<WorkerId_t> optWorkerId, AuUInt64 uFenceMagic)
{
auto uSelfCookie = AuBitsToLower(uFenceMagic);
if (uSelfCookie != AuAtomicLoad(&this->uAtomicShutdownCookie))
{
return true;
}
auto uThreadCookie = AuBitsToHigher(uFenceMagic);
if (!uThreadCookie)
{
return false;
}
if (auto pState = this->GetThreadHandle(optWorkerId.value_or(GetCurrentWorkerPId())))
{
return uThreadCookie != pState->shutdown.uShutdownFence;
}
else
{
return false;
}
}
    // internal api

    /**
     * Creates a worker under the given id, or registers the calling thread
     * as that worker.
     *
     * @param workerId group/thread pair identifying the new worker
     * @param create   when true, the *calling* thread is adopted as the
     *                 worker; when false, a dedicated OS thread is spawned
     *                 running ThreadPool::Entrypoint
     * @return true on success; false on duplicate id or allocation failure
     */
    bool ThreadPool::Spawn(WorkerId_t workerId, bool create)
    {
        // Writer lock: Entrypoint() fences on the read view before running
        AU_LOCK_GLOBAL_GUARD(this->rwlock_->AsWritable());

        if (create)
        {
            tlsCurrentThreadPool = AuSharedFromThis();
        }

        AuSPtr<GroupState> pGroup;

        // Try fetch or allocate group
        {
            if (!(pGroup = threadGroups_[workerId.first]))
            {
                pGroup = AuMakeShared<GroupState>();
                if (!pGroup->Init())
                {
                    SysPushErrorMemory("Not enough memory to intiialize a new group state");
                    return false;
                }

                pGroup->group = workerId.first;
                this->threadGroups_[workerId.first] = pGroup;
            }
        }

        // Assert worker does not already exist
        {
            AuSPtr<ThreadState>* ret;
            if (AuTryFind(pGroup->workers, workerId.second, ret))
            {
                SysPushErrorGeneric("Thread ID already exists");
                return false;
            }
        }

        auto pThreadState = pGroup->CreateWorker(workerId, create);
        if (!pThreadState)
        {
            return {};
        }

        // Route this worker's pending work-item storage through the pool
        // heap, when one was configured
        if (this->pHeap)
        {
            AuSetAllocator(pThreadState->pendingWorkItems, this->pHeap.get());
        }

        if (!create)
        {
            // Dedicated OS thread: entry vector binds back to Entrypoint(workerId)
            pThreadState->thread.pThread= AuThreads::ThreadShared(AuThreads::ThreadInfo(
                AuMakeShared<AuThreads::IThreadVectorsFunctional>(AuThreads::IThreadVectorsFunctional::OnEntry_t(std::bind(&ThreadPool::Entrypoint, this, workerId)),
                                                                  AuThreads::IThreadVectorsFunctional::OnExit_t{}),
                gRuntimeConfig.async.threadPoolDefaultStackSize
            ));
            if (!pThreadState->thread.pThread)
            {
                return {};
            }
            pThreadState->thread.pThread->Run();
        }
        else
        {
            // Adopted thread: wrap the current thread in a non-owning
            // shared pointer (no-op deleter)
            pThreadState->thread.pThread = AuSPtr<AuThreads::IAuroraThread>(AuThreads::GetThread(), [](AuThreads::IAuroraThread *){});

            // TODO: this is just a hack
            // we should implement this properly
            pThreadState->thread.pThread->AddLastHopeTlsHook(AuMakeShared<AuThreads::IThreadFeatureFunctional>([]() -> void
            {
            }, []() -> void
            {
                // Last-hope TLS teardown: tell the owning pool this adopted
                // thread is going away so its state gets cleaned up
                auto pid = GetCurrentWorkerPId();
                if (pid)
                {
                    GetWorkerInternal(pid.GetPool())->ThisExiting();
                }
            }));

            tlsCurrentThreadPool = AuWeakFromThis();
            tlsWorkerId = WorkerPId_t(AuSharedFromThis(), workerId);
        }

        pGroup->AddWorker(workerId.second, pThreadState);
        return true;
    }
    // private api

    /**
     * Fences against a single worker: posts a job to `workerId` that
     * releases the caller's sync semaphore, then waits on that semaphore.
     *
     * @param workerId      worker to fence against
     * @param ms            wait timeout in milliseconds passed to WaitFor
     * @param requireSignal when true, the target worker resets its `running`
     *                      event and blocks on it until re-signaled
     * @param drop          when true, flags the target worker to drop
     *                      further submissions (shutdown path)
     * @return true when the fence job ran and the wait succeeded
     *
     * NOTE(review): must be called from a pool worker — we wait on our own
     * syncSema. The raw `unsafeSemaphore` and by-ref `failed` captures are
     * only safe because this frame outlives the posted job via WaitFor.
     */
    AU_NOINLINE bool ThreadPool::Barrier(WorkerId_t workerId, AuUInt32 ms, bool requireSignal, bool drop)
    {
        auto self = GetThreadState();
        if (!self)
        {
            return {};
        }

        auto &semaphore = self->syncSema;
        auto unsafeSemaphore = semaphore.AsPointer();
        bool failed {};

        auto work = AuMakeShared<AsyncFuncRunnable>(
            [=]()
            {
                // Runs on the target worker
                auto state = GetThreadState();

                if (drop)
                {
                    state->shutdown.bDropSubmissions = true;
                }

                if (requireSignal)
                {
                    state->running->Reset();
                }

                // Release the fencing thread before optionally parking
                unsafeSemaphore->Unlock(1);

                if (requireSignal)
                {
                    state->running->Lock();
                }
            },
            [&]()
            {
                // Rejection path: release the waiter and report failure
                unsafeSemaphore->Unlock(1);
                failed = true;
            }
        );

        if (!work)
        {
            return false;
        }

        Run(workerId, work);

        return WaitFor(workerId, AuUnsafeRaiiToShared(semaphore.AsPointer()), ms) && !failed;
    }
    /**
     * Main routine of a pool-owned OS thread (Spawn with create == false):
     * binds TLS, pumps work via Run(), then performs shutdown barriers and
     * per-thread teardown.
     */
    void ThreadPool::Entrypoint(WorkerId_t id)
    {
        {
            // Fence against Spawn(): taking the read view waits until the
            // spawning writer has released the pool lock
            AU_LOCK_GLOBAL_GUARD(this->pRWReadView);
        }

        tlsCurrentThreadPool = AuWeakFromThis();
        tlsWorkerId = WorkerPId_t(AuSharedFromThis(), id);

        auto job = GetThreadState();

        Run();

        // Non-primary workers: if the pool isn't shutting down and this
        // worker wasn't told to drop work, drain then reject atomically
        if (id != WorkerId_t {0, 0})
        {
            AU_LOCK_GLOBAL_GUARD(this->pRWReadView);

            if (!AuAtomicLoad(&this->shuttingdown_) && !job->shutdown.bDropSubmissions)
            {
                // Pump and barrier + reject all after atomically
                Barrier(id, 0, false, true);
            }
        }

        ThisExiting();

        // The primary (0,0) worker owns the reserved-zero pool cleanup
        if (id == WorkerId_t {0, 0})
        {
            CleanWorkerPoolReservedZeroFree();
        }
    }
void ThreadPool::EarlyExitTick()
{
if ((AuAtomicLoad(&this->shuttingdown_) & 2) != 2)
{
return;
}
auto jobWorker = GetThreadState();
auto state = jobWorker->parent.lock();
if (!jobWorker)
{
SysPushErrorUninitialized("Not an async thread");
return;
}
state->SignalAll();
{
if (AuExchange(jobWorker->bAlreadyDoingExitTick, true))
{
return;
}
AuUInt32 uCount {};
do
{
uCount = 0;
this->PollInternal(jobWorker, false, false, uCount);
}
while (uCount);
}
AuList<AuSPtr<AuThreads::IThreadFeature>> features;
{
2023-11-11 11:27:01 +00:00
AU_LOCK_GUARD(jobWorker->tlsFeatures.mutex);
features = AuExchange(jobWorker->tlsFeatures.features, {});
}
{
for (const auto &thread : features)
{
try
{
thread->Cleanup();
}
catch (...)
{
SysPushErrorCatch("Couldn't clean up thread feature!");
}
}
2023-06-07 19:51:23 +00:00
jobWorker->isDeadEvent->Set();
jobWorker->bAlreadyDoingExitTick = false;
jobWorker->shutdown.bBreakMainLoop = true;
}
}
    /**
     * Tears down the calling worker's state on thread exit: signals the
     * dead event, cancels scheduled tasks, releases external fences, runs
     * TLS feature cleanup, then decommits the thread from its group.
     */
    void ThreadPool::ThisExiting()
    {
        AU_DEBUG_MEMCRUNCH;

        auto id = GetCurrentThread();
        auto state = GetGroup(id.first);
        auto pLocalState = state->GetThreadByIndex(id.second);

        AuList<AuSPtr<AuThreads::IThreadFeature>> features;
        {
            AU_LOCK_GLOBAL_GUARD(this->pRWReadView);

            pLocalState->isDeadEvent->Set();

            CleanUpWorker(id);
            TerminateSceduledTasks(this, id);
            pLocalState->syncSema->Unlock(10); // prevent ::Barrier dead-locks

            {
                AU_LOCK_GUARD(pLocalState->externalFencesLock);
                pLocalState->exitingflag2 = true;

                // Wake anything still blocked on this worker through an
                // externally registered fence
                for (const auto &pIWaitable : pLocalState->externalFences)
                {
                    pIWaitable->Unlock();
                }

                pLocalState->externalFences.clear();
            }

            {
                // Detach the feature list; cleanup runs after the pool lock drops
                AU_LOCK_GUARD(pLocalState->tlsFeatures.mutex);
                features = AuExchange(pLocalState->tlsFeatures.features, {});
            }
        }

        {
            // Feature teardown outside the pool lock; exceptions contained
            for (const auto &thread : features)
            {
                try
                {
                    thread->Cleanup();
                }
                catch (...)
                {
                    SysPushErrorConcurrentRejected("Couldn't clean up thread feature!");
                }
            }

            features.clear();
        }

        {
            state->Decommit(id.second);
        }

        pLocalState->Deinit();
    }
AuSPtr<GroupState> ThreadPool::GetGroup(ThreadGroup_t type)
{
return this->threadGroups_[type];
}
AuSPtr<ThreadState> ThreadPool::GetThreadState()
{
auto thread = tlsCurrentThreadPool.lock();
if (!thread)
{
return {};
}
#if defined(AU_CFG_ID_INTERNAL) || defined(AU_CFG_ID_DEBUG)
if (thread.get() != this)
{
SysPushErrorGeneric("wrong thread");
return {};
}
#endif
auto worker = *tlsWorkerId;
auto state = GetGroup(worker.first);
if (!state)
{
return {};
}
return state->GetThreadByIndex(worker.second);
}
AuSPtr<ThreadState> ThreadPool::GetThreadStateNoWarn()
{
auto thread = tlsCurrentThreadPool.lock();
if (!thread)
{
return {};
}
if (thread.get() != this)
{
return {};
}
auto worker = *tlsWorkerId;
auto state = GetGroup(worker.first);
if (!state)
{
return {};
}
return state->GetThreadByIndex(worker.second);
}
AuSPtr<ThreadState> ThreadPool::GetThreadStateLocal()
{
auto worker = *tlsWorkerId;
if (auto pSelf = AuTryLockMemoryType(worker.pool))
{
auto state = GetGroup(worker.first);
if (!state)
{
return {};
}
return state->GetThreadByIndex(worker.second);
}
else
{
return {};
}
}
AuSPtr<ThreadState> ThreadPool::GetThreadHandle(WorkerId_t id)
{
auto group = GetGroup(id.first);
if (!group)
{
return {};
}
return group->GetThreadByIndex(id.second);
}
2022-03-10 15:35:01 +00:00
AuList<AuSPtr<ThreadState>> ThreadPool::GetThreadHandles(WorkerId_t id)
{
auto group = GetGroup(id.first);
if (!group)
{
return {};
}
AuList<AuSPtr<ThreadState>> ret;
if (id.second != Async::kThreadIdAny)
{
if (auto pPtr = group->GetThreadByIndex(id.second))
2022-03-10 15:35:01 +00:00
{
ret.push_back(pPtr);
2022-03-10 15:35:01 +00:00
}
}
else
{
AU_LOCK_GLOBAL_GUARD(group->workersMutex);
2022-03-10 15:35:01 +00:00
for (const auto &[key, value] : group->workers)
{
ret.push_back(value);
}
}
return AuMove(ret);
2022-03-10 15:35:01 +00:00
}
AUKN_SYM AuSPtr<IThreadPool> NewThreadPool()
{
// apps that don't require async shouldn't be burdened with the overhead of this litl spiner
StartSched();
return AuMakeShared<ThreadPool>();
}
}