[*] Begin cleaning up AuAsync (2/?)

6af9940b, ???
This commit is contained in:
Reece Wilson 2023-11-11 10:11:09 +00:00
parent 73c5904d97
commit e037fc214a
28 changed files with 650 additions and 507 deletions

View File

@ -15,19 +15,6 @@ namespace Aurora::IO::Loop
namespace Aurora::Async namespace Aurora::Async
{ {
AUE_DEFINE(ERunMode,
(
eLowLatencyYield, // uses high perf cond var + yield + trylock
eLowLatencyFreqKernel, // uses high perf cond var + timeout(freqency)
eEfficient // delegates sleep to the kernel once kernel objects are scheduled
));
struct RunMode
{
ERunMode mode;
AuUInt16 freqMsTick;
};
struct IThreadPool struct IThreadPool
{ {
// Spawning // Spawning
@ -122,9 +109,6 @@ namespace Aurora::Async
// AuIO overlapped-IO glue // AuIO overlapped-IO glue
virtual AuSPtr<IO::Loop::ILoopQueue> ToKernelWorkQueue() = 0; virtual AuSPtr<IO::Loop::ILoopQueue> ToKernelWorkQueue() = 0;
virtual AuSPtr<IO::Loop::ILoopQueue> ToKernelWorkQueue(WorkerId_t workerId) = 0; virtual AuSPtr<IO::Loop::ILoopQueue> ToKernelWorkQueue(WorkerId_t workerId) = 0;
virtual void UpdateWorkMode(WorkerId_t workerId, RunMode mode) = 0;
virtual ERunMode GetCurrentThreadRunMode() = 0;
virtual ERunMode GetThreadRunMode(WorkerId_t workerId) = 0;
// //
virtual AuSPtr<Aurora::Threading::IWaitable> GetShutdownEvent() = 0; virtual AuSPtr<Aurora::Threading::IWaitable> GetShutdownEvent() = 0;

View File

@ -7,7 +7,7 @@
***/ ***/
#include <Source/RuntimeInternal.hpp> #include <Source/RuntimeInternal.hpp>
#include "Async.hpp" #include "Async.hpp"
#include "Schedular.hpp" #include "AuSchedular.hpp"
#include "AsyncApp.hpp" #include "AsyncApp.hpp"
namespace Aurora::Async namespace Aurora::Async

View File

@ -9,7 +9,7 @@
#include "Async.hpp" #include "Async.hpp"
#include "AsyncApp.hpp" #include "AsyncApp.hpp"
#include "WorkItem.hpp" #include "WorkItem.hpp"
#include "Schedular.hpp" #include "AuSchedular.hpp"
#include <Source/Console/Commands/Commands.hpp> #include <Source/Console/Commands/Commands.hpp>
@ -237,19 +237,4 @@ namespace Aurora::Async
{ {
return ThreadPool::ToKernelWorkQueue(workerId); return ThreadPool::ToKernelWorkQueue(workerId);
} }
void AsyncApp::UpdateWorkMode(WorkerId_t workerId, RunMode mode)
{
ThreadPool::UpdateWorkMode(workerId, mode);
}
ERunMode AsyncApp::GetCurrentThreadRunMode()
{
return ThreadPool::GetCurrentThreadRunMode();
}
ERunMode AsyncApp::GetThreadRunMode(WorkerId_t workerId)
{
return ThreadPool::GetThreadRunMode(workerId);
}
} }

View File

@ -45,9 +45,6 @@ namespace Aurora::Async
AuSPtr<AuLoop::ILoopQueue> ToKernelWorkQueue() override; AuSPtr<AuLoop::ILoopQueue> ToKernelWorkQueue() override;
AuSPtr<AuLoop::ILoopQueue> ToKernelWorkQueue(WorkerId_t workerId) override; AuSPtr<AuLoop::ILoopQueue> ToKernelWorkQueue(WorkerId_t workerId) override;
void UpdateWorkMode(WorkerId_t workerId, RunMode mode) override;
ERunMode GetCurrentThreadRunMode() override;
ERunMode GetThreadRunMode(WorkerId_t workerId) override;
// Main thread logic // Main thread logic
void Start() override; void Start() override;

View File

@ -0,0 +1,14 @@
/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuAThreadPoolInternal.cpp
Date: 2023-11-04
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "AuAThreadPoolInternal.hpp"
namespace Aurora::Async
{
}

View File

@ -0,0 +1,13 @@
/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuGroupWorkQueue.hpp
Date: 2023-11-04
Author: Reece
***/
#pragma once
namespace Aurora::Async
{
}

View File

@ -0,0 +1,47 @@
/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuAsyncKeepGroupAlive.cpp
Date: 2023-11-04
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "AuAsyncKeepGroupAlive.hpp"
#include "ThreadPool.hpp"
#include "ThreadState.hpp"
#include "GroupState.hpp"
namespace Aurora::Async
{
struct KeepGroupAlive
{
KeepGroupAlive(AuSPtr<AuAsync::IThreadPool> pPool) :
pPool(AuStaticCast<AuAsync::ThreadPool>(pPool))
{
AuAtomicAdd(&this->pPool->uAtomicCounter, 1u);
}
~KeepGroupAlive()
{
auto uNow = AuAtomicSub(&this->pPool->uAtomicCounter, 1u);
if (uNow == 0)
{
for (const auto &pState : this->pPool->threadGroups_)
{
if (pState)
{
pState->SignalAll();
}
}
}
}
AuSPtr<AuAsync::ThreadPool> pPool;
};
AUKN_SYM AuSPtr<void> KeepThreadPoolAlive(AuSPtr<AuAsync::IThreadPool> pPool)
{
return AuMakeSharedThrow<KeepGroupAlive>(pPool);
}
}

View File

@ -0,0 +1,13 @@
/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuAsyncKeepGroupAlive.hpp
Date: 2023-11-04
Author: Reece
***/
#pragma once
namespace Aurora::Async
{
}

View File

@ -0,0 +1,14 @@
/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuAsyncMicrocounter.cpp
Date: 2023-11-04
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "AuAsyncMicrocounter.hpp"
namespace Aurora::Async
{
}

View File

@ -0,0 +1,13 @@
/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuAsyncMicrocounter.hpp
Date: 2023-11-04
Author: Reece
***/
#pragma once
namespace Aurora::Async
{
}

View File

@ -0,0 +1,112 @@
/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuGroupWorkQueue.cpp
Date: 2023-11-04
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "AuGroupWorkQueue.hpp"
#include "ThreadState.hpp"
#include "GroupState.hpp"
#include "ThreadPool.hpp"
namespace Aurora::Async
{
void GroupWorkQueue::AddWorkEntry(ThreadState *pState, WorkEntry_t entry)
{
AU_DEBUG_MEMCRUNCH;
auto prio = (int)entry.second->GetPrio();
SysAssert(prio < AuAsync::kEWorkPrioCount, "Invalid PRIO");
AU_LOCK_GUARD(this->mutex);
this->sortedWork[prio].push_back(entry);
if (entry.first != kThreadIdAny)
{
if (auto pThat = pState->parent.lock()->GetThreadByIndex(entry.first))
{
AuAtomicAdd(&pThat->sync.cvHasWork, 1u);
}
}
}
bool GroupWorkQueue::IsEmpty()
{
AU_LOCK_GUARD(this->mutex);
for (AU_ITERATE_N(i, AuAsync::kEWorkPrioCount))
{
if (this->sortedWork[i].size())
{
return false;
}
}
return true;
}
bool GroupWorkQueue::IsEmpty(ThreadPool *pPool, AuWorkerId_t id)
{
#if 1
auto pHandle = pPool->GetThreadHandle(id);
if (!pHandle)
{
return false;
}
return !AuAtomicLoad(&pHandle->sync.cvHasWork);
#else
AU_LOCK_GUARD(this->mutex);
for (AU_ITERATE_N(i, AuAsync::kEWorkPrioCount))
{
for (const auto &[srcId, pA] : this->sortedWork[i])
{
if (id == srcId)
{
return false;
}
}
}
return true;
#endif
}
void GroupWorkQueue::Dequeue(AuList<WorkEntry_t> &queue, int maxPopCount, AuAsync::ThreadId_t id)
{
AU_LOCK_GUARD(this->mutex);
for (AU_ITERATE_N(i, AuAsync::kEWorkPrioCount))
{
auto &group = this->sortedWork[(int)AuAsync::kEWorkPrioMaxLegal - i];
for (auto itr = group.begin(); ((itr != group.end()) && (queue.size() < maxPopCount)); )
{
if (itr->first == Async::kThreadIdAny)
{
queue.push_back(*itr);
itr = group.erase(itr);
continue;
}
if ((itr->first != Async::kThreadIdAny) &&
(itr->first == id))
{
queue.push_back(*itr);
itr = group.erase(itr);
continue;
}
itr++;
}
if (queue.size())
{
break;
}
}
}
}

View File

@ -0,0 +1,31 @@
/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuGroupWorkQueue.hpp
Date: 2023-11-04
Author: Reece
***/
#pragma once
namespace Aurora::Async
{
struct IAsyncRunnable;
struct ThreadPool;
struct ThreadState;
using WorkEntry_t = AuPair<ThreadId_t, AuSPtr<IAsyncRunnable>>;
struct GroupWorkQueue
{
AuThreadPrimitives::Mutex mutex;
AuUInt32 uItems {};
AuList<WorkEntry_t> sortedWork[AuAsync::kEWorkPrioCount];
bool IsEmpty();
bool IsEmpty(ThreadPool *pPool, AuWorkerId_t id);
void AddWorkEntry(ThreadState *pState, WorkEntry_t entry);
void Dequeue(AuList<WorkEntry_t> &queue, int maxPopCount, AuAsync::ThreadId_t idd);
};
}

View File

@ -1,13 +1,13 @@
/*** /***
Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved. Copyright (C) 2021-2023 Jamie Reece Wilson (a/k/a "Reece"). All rights reserved.
File: Schedular.cpp File: AuSchedular.cpp
Date: 2021-6-26 Date: 2021-6-26
Author: Reece Author: Reece
***/ ***/
#include <Source/RuntimeInternal.hpp> #include <Source/RuntimeInternal.hpp>
#include "Async.hpp" #include "Async.hpp"
#include "Schedular.hpp" #include "AuSchedular.hpp"
//#include "AsyncApp.hpp" //#include "AsyncApp.hpp"
#include "ThreadPool.hpp" #include "ThreadPool.hpp"
#include <Console/Commands/Commands.hpp> #include <Console/Commands/Commands.hpp>

View File

@ -1,7 +1,7 @@
/*** /***
Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved. Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: Schedular.hpp File: AuSchedular.hpp
Date: 2021-6-26 Date: 2021-6-26
Author: Reece Author: Reece
***/ ***/

View File

@ -0,0 +1,93 @@
/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuThreadState.hpp
Date: 2023-11-04
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "Async.hpp"
#include "AuThreadState.hpp"
#include <Source/IO/Loop/LSAsync.hpp>
#include "ThreadWorkerQueueShim.hpp"
namespace Aurora::Async
{
ThreadStateSync::ThreadStateSync():
cvVariable(AuUnsafeRaiiToShared(cvWorkMutex.AsPointer()))
{
}
bool ThreadStateSync::Init()
{
this->eventLs = AuLoop::NewLSAsync();
return true;
}
bool ThreadStateBase::Init()
{
if (!(this->asyncLoop = AuMakeShared<AsyncLoop>()))
{
SysPushErrorMemory();
return {};
}
this->asyncLoop->pParent = this;
if (!this->asyncLoop->Init())
{
SysPushErrorNested();
return {};
}
if (!this->sync.Init())
{
SysPushErrorNested();
return {};
}
if (!this->asyncLoop->SourceAdd(this->sync.eventLs))
{
SysPushErrorNested();
return {};
}
return true;
}
void ThreadStateSync::SetEvent(bool bBoth)
{
if (auto pEvent = this->eventLs)
{
if (AuAtomicTestAndSet(&this->cvLSActive, 0u) == 0)
{
pEvent->Set();
}
}
if (bBoth)
{
this->cvWorkMutex->Lock();
this->cvWorkMutex->Unlock();
this->cvVariable->Signal();
}
}
void ThreadStateSync::UpdateCVState(ThreadState *pState)
{
auto uState = pState->sync.cvHasWork;
auto uMin = AuMin(uState, pState->pendingWorkItems.size());
if (!uMin) uMin = 1;
while (uState &&
AuAtomicCompareExchange<AuUInt32>(&pState->sync.cvHasWork, uState - uMin, uState) != uState)
{
uState = pState->sync.cvHasWork;
if (uState < uMin)
{
uMin = uState;
}
}
}
}

View File

@ -0,0 +1,62 @@
/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuThreadState.hpp
Date: 2023-11-04
Author: Reece
***/
#pragma once
namespace Aurora::Async
{
struct AsyncLoop;
struct ThreadState;
struct ThreadStateShutdown
{
bool bBreakMainLoop {};
bool bIsKillerThread {};
AuUInt32 uShutdownFence { 1 };
};
struct ThreadStateSync
{
ThreadStateSync();
AuThreadPrimitives::ConditionMutex cvWorkMutex;
AuThreadPrimitives::ConditionVariable cvVariable;
AuAUInt32 cvLSActive {};
AuAUInt32 cvHasWork {};
AuSPtr<AuLoop::ILSEvent> eventLs;
void SetEvent(bool bBoth = true);
bool Init();
void UpdateCVState(ThreadState *pState);
};
struct ThreadStateMeta
{
bool bOwnsThread {};
AuThreads::ThreadShared_t pThread;
};
struct ThreadStateStack
{
AuUInt32 uStackCookie {};
AuUInt32 uStackMaxRecursiveAsyncPromiseCalls { 4 };
AuUInt32 uStackCallDepth {};
AuUInt32 uStackMaxCookie { 5 };
};
struct ThreadStateBase
{
ThreadStateShutdown shutdown;
ThreadStateMeta thread;
ThreadStateSync sync;
AuSPtr<AsyncLoop> asyncLoop;
ThreadStateStack stackState;
bool Init();
};
}

View File

@ -0,0 +1,14 @@
/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuThreadStateSingletons.cpp
Date: 2023-11-04
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "AuThreadStateSingletons.hpp"
namespace Aurora::Async
{
}

View File

@ -0,0 +1,13 @@
/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuThreadStateSingletons.hpp
Date: 2023-11-04
Author: Reece
***/
#pragma once
namespace Aurora::Async
{
}

View File

@ -64,7 +64,7 @@ namespace Aurora::Async
} }
} }
void GroupState::BroadCast() void GroupState::SignalAll()
{ {
AU_LOCK_GUARD(this->workersMutex); AU_LOCK_GUARD(this->workersMutex);
@ -75,12 +75,11 @@ namespace Aurora::Async
continue; continue;
} }
AuAtomicAdd(&pWorker->cvHasWork, 1u); pWorker->sync.SetEvent();
pWorker->SetEvent();
} }
} }
void GroupState::AddWorker(ThreadId_t id, AuSPtr<ThreadState> pState) bool GroupState::AddWorker(ThreadId_t id, AuSPtr<ThreadState> pState)
{ {
AU_LOCK_GUARD(this->workersMutex); AU_LOCK_GUARD(this->workersMutex);
@ -89,6 +88,41 @@ namespace Aurora::Async
this->wpWorkers[id] = pState; this->wpWorkers[id] = pState;
} }
SysAssert(AuTryInsert(this->workers, AuMakePair(id, pState))); if (!AuTryInsert(this->workers, AuMakePair(id, pState)))
{
this->wpWorkers[id] = {};
return {};
}
return true;
}
AuSPtr<ThreadState> GroupState::CreateWorker(WorkerId_t workerId, bool bCreate)
{
if (auto pThreadState = AuMakeShared<ThreadState>())
{
pThreadState->thread.bOwnsThread = !bCreate;
pThreadState->parent = this->SharedFromThis();
pThreadState->id = workerId;
if (!pThreadState->Init())
{
SysPushErrorNested();
return {};
}
if (!this->AddWorker(workerId.second, pThreadState))
{
SysPushErrorNested();
return {};
}
return pThreadState;
}
else
{
SysPushErrorMemory();
return {};
}
} }
} }

View File

@ -6,47 +6,38 @@
Author: Reece Author: Reece
***/ ***/
#pragma once #pragma once
#include "ThreadState.hpp" #include "ThreadState.hpp"
#include "AuGroupWorkQueue.hpp"
namespace Aurora::Async namespace Aurora::Async
{ {
struct ThreadPool; struct ThreadPool;
struct GroupWorkQueue
{
AuThreadPrimitives::Mutex mutex; struct GroupState :
AuUInt32 uItems {}; AuEnableSharedFromThis<GroupState>
AuList<WorkEntry_t> sortedWork[AuAsync::kEWorkPrioCount];
bool IsEmpty();
bool IsEmpty(ThreadPool *pPool, AuWorkerId_t id);
void AddWorkEntry(ThreadState *pState, WorkEntry_t entry);
void Dequeue(AuList<WorkEntry_t> &queue, int maxPopCount, AuAsync::ThreadId_t idd);
};
struct GroupState
{ {
// group id
ThreadGroup_t group; ThreadGroup_t group;
// work items
GroupWorkQueue workQueue; GroupWorkQueue workQueue;
// tracked workers
AuThreadPrimitives::Mutex workersMutex; AuThreadPrimitives::Mutex workersMutex;
AuBST<ThreadId_t, AuSPtr<ThreadState>> workers; AuBST<ThreadId_t, AuSPtr<ThreadState>> workers;
AuWPtr<ThreadState> wpWorkers[32]; AuWPtr<ThreadState> wpWorkers[32]; // linear non-locking lookup table
//
bool Init(); bool Init();
void BroadCast(); void SignalAll();
void Decommit(ThreadId_t id); void Decommit(ThreadId_t id);
void AddWorker(ThreadId_t id, AuSPtr<ThreadState> pState); bool AddWorker(ThreadId_t id, AuSPtr<ThreadState> pState);
AuSPtr<ThreadState> CreateWorker(WorkerId_t id, bool bCreate);
AuSPtr<ThreadState> GetThreadByIndex(ThreadId_t uIndex); AuSPtr<ThreadState> GetThreadByIndex(ThreadId_t uIndex);
bool inline IsSysThread()
{
return group == 0;
}
}; };
} }

View File

View File

@ -0,0 +1,27 @@
/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IThreadPoolInternal.hpp
Date: 2023-11-04
Author: Reece
***/
#pragma once
namespace Aurora::Async
{
struct ThreadState;
struct IAsyncRunnable;
struct IThreadPoolInternal
{
virtual bool WaitFor(WorkerId_t unlocker, const AuSPtr<Threading::IWaitable> &primitive, AuUInt32 ms) = 0;
virtual void Run(WorkerId_t target, AuSPtr<IAsyncRunnable> runnable) = 0;
virtual AuSPtr<ThreadState> GetThreadHandle(WorkerId_t id) = 0;
virtual IThreadPool *ToThreadPool() = 0;
AuUInt32 uAtomicCounter {};
AuUInt32 uAtomicIOProcessors {};
AuUInt32 uAtomicIOProcessorsWorthlessSources {};
AuUInt32 uAtomicShutdownCookie {};
};
}

View File

@ -10,17 +10,14 @@
#include "ThreadPool.hpp" #include "ThreadPool.hpp"
#include "AsyncApp.hpp" #include "AsyncApp.hpp"
#include "WorkItem.hpp" #include "WorkItem.hpp"
#include "Schedular.hpp" #include "AuSchedular.hpp"
#include "ThreadWorkerQueueShim.hpp" #include "ThreadWorkerQueueShim.hpp"
#include <Source/IO/Loop/LSAsync.hpp>
namespace Aurora::Async namespace Aurora::Async
{ {
//STATIC_TLS(WorkerId_t, tlsWorkerId); //STATIC_TLS(WorkerId_t, tlsWorkerId);
static thread_local AuWPtr<ThreadPool> gCurrentPool; static thread_local AuWPtr<ThreadPool> tlsCurrentThreadPool;
static const auto kMagicResortThreshold = 15;
static thread_local int tlsCallStack;
inline auto GetWorkerInternal(const AuSPtr<IThreadPool> &pool) inline auto GetWorkerInternal(const AuSPtr<IThreadPool> &pool)
{ {
if (pool.get() == AuStaticCast<IAsyncApp>(gAsyncApp)) if (pool.get() == AuStaticCast<IAsyncApp>(gAsyncApp))
@ -33,17 +30,28 @@ namespace Aurora::Async
AUKN_SYM WorkerPId_t GetCurrentWorkerPId() AUKN_SYM WorkerPId_t GetCurrentWorkerPId()
{ {
auto lkPool = gCurrentPool.lock(); auto lkPool = tlsCurrentThreadPool.lock();
if (!lkPool) return {}; if (!lkPool)
{
return {};
}
auto cpy = *lkPool->tlsWorkerId; auto cpy = *lkPool->tlsWorkerId;
auto lkPool2 = cpy.pool.lock(); if (auto pPool = AuTryLockMemoryType(cpy.pool))
return WorkerPId_t(lkPool, cpy); {
return WorkerPId_t(pPool, cpy);
}
else
{
return {};
}
} }
// //
ThreadPool::ThreadPool() : shutdownEvent_(false, false, true) ThreadPool::ThreadPool() : shutdownEvent_(false, false, true)
{ {
this->pRWReadView = this->rwlock_->AsReadable();
} }
// internal pool interface // internal pool interface
@ -136,8 +144,6 @@ namespace Aurora::Async
return; return;
} }
AU_DEBUG_MEMCRUNCH;
if (bIncrement) if (bIncrement)
{ {
AuAtomicAdd(&this->uAtomicCounter, 1u); AuAtomicAdd(&this->uAtomicCounter, 1u);
@ -147,20 +153,11 @@ namespace Aurora::Async
if (target.second == Async::kThreadIdAny) if (target.second == Async::kThreadIdAny)
{ {
state->BroadCast(); state->SignalAll();
} }
else else
{ {
if (AuAtomicLoad(&pWorker->cvSleepCount)) pWorker->sync.SetEvent();
{
// Barrier:
pWorker->cvWorkMutex->Lock();
pWorker->cvWorkMutex->Unlock();
pWorker->cvVariable->Signal();
}
pWorker->SetEvent(false);
} }
} }
@ -173,7 +170,7 @@ namespace Aurora::Async
size_t ThreadPool::GetThreadWorkersCount(ThreadGroup_t group) size_t ThreadPool::GetThreadWorkersCount(ThreadGroup_t group)
{ {
AU_LOCK_GUARD(this->rwlock_->AsReadable()); AU_LOCK_GUARD(this->pRWReadView);
return GetGroup(group)->workers.size(); return GetGroup(group)->workers.size();
} }
@ -221,14 +218,12 @@ namespace Aurora::Async
return true; return true;
} }
gCurrentPool = AuWeakFromThis(); auto oldTlsHandle = AuExchange(tlsCurrentThreadPool, AuSharedFromThis());
auto auThread = AuThreads::GetThread(); auto auThread = AuThreads::GetThread();
while ((!auThread->Exiting()) && while ((!auThread->Exiting()) &&
((this->shuttingdown_ & 2) != 2) && ((this->shuttingdown_ & 2) != 2) &&
(!pJobRunner->bBreakEarly)) (!pJobRunner->shutdown.bBreakMainLoop))
{ {
AuUInt32 uCount {}; AuUInt32 uCount {};
@ -243,6 +238,8 @@ namespace Aurora::Async
ranOnce = true; ranOnce = true;
} }
tlsCurrentThreadPool = oldTlsHandle;
return ranOnce; return ranOnce;
} }
@ -254,163 +251,34 @@ namespace Aurora::Async
return false; return false;
} }
bool success {}; bool bSuccess {};
auto runMode = GetCurrentThreadRunMode();
EarlyExitTick(); EarlyExitTick();
//do
{ {
auto asyncLoop = state->asyncLoop; auto asyncLoop = state->asyncLoop;
asyncLoop->OnFrame(); asyncLoop->OnFrame();
if (asyncLoop->GetSourceCount() > 1) if (asyncLoop->GetSourceCount() > 1)
{ {
bool bShouldTrySleepForKernel = block; if (block)
if (!block)
{ {
bShouldTrySleepForKernel = asyncLoop->IsSignaledPeek(); asyncLoop->WaitAny(0);
} }
else
if (bShouldTrySleepForKernel)
{ {
AuAtomicAdd(&state->cvSleepCount, 1u); asyncLoop->PumpNonblocking();
if (block)
{
asyncLoop->WaitAny(0);
}
else
{
asyncLoop->PumpNonblocking();
}
AuAtomicSub(&state->cvSleepCount, 1u);
} }
success = PollInternal(state, false, uCount); bSuccess = PollInternal(state, false, uCount);
} }
else else
{ {
success = PollInternal(state, block, uCount); bSuccess = PollInternal(state, block, uCount);
} }
} //while (success); }
EarlyExitTick(); EarlyExitTick();
return success; return bSuccess;
}
void GroupWorkQueue::AddWorkEntry(ThreadState *pState, WorkEntry_t entry)
{
auto prio = (int)entry.second->GetPrio();
SysAssert(prio < AuAsync::kEWorkPrioCount, "Invalid PRIO");
AU_LOCK_GUARD(this->mutex);
this->sortedWork[prio].push_back(entry);
if (entry.first != kThreadIdAny)
{
if (auto pThat = pState->parent.lock()->GetThreadByIndex(entry.first))
{
AuAtomicAdd(&pThat->cvHasWork, 1u);
}
}
}
bool GroupWorkQueue::IsEmpty()
{
AU_LOCK_GUARD(this->mutex);
for (AU_ITERATE_N(i, AuAsync::kEWorkPrioCount))
{
if (this->sortedWork[i].size())
{
return false;
}
}
return true;
}
bool GroupWorkQueue::IsEmpty(ThreadPool *pPool, AuWorkerId_t id)
{
#if 1
auto pHandle = pPool->GetThreadHandle(id);
if (!pHandle)
{
return false;
}
return !AuAtomicLoad(&pHandle->cvHasWork);
#else
AU_LOCK_GUARD(this->mutex);
for (AU_ITERATE_N(i, AuAsync::kEWorkPrioCount))
{
for (const auto &[srcId, pA] : this->sortedWork[i])
{
if (id == srcId)
{
return false;
}
}
}
return true;
#endif
}
void GroupWorkQueue::Dequeue(AuList<WorkEntry_t> &queue, int maxPopCount, AuAsync::ThreadId_t id)
{
AU_LOCK_GUARD(this->mutex);
for (AU_ITERATE_N(i, AuAsync::kEWorkPrioCount))
{
auto &group = this->sortedWork[(int)AuAsync::kEWorkPrioMaxLegal - i];
for (auto itr = group.begin(); ((itr != group.end()) && (queue.size() < maxPopCount)); )
{
if (itr->first == Async::kThreadIdAny)
{
queue.push_back(*itr);
itr = group.erase(itr);
continue;
}
if ((itr->first != Async::kThreadIdAny) &&
(itr->first == id))
{
queue.push_back(*itr);
itr = group.erase(itr);
continue;
}
itr++;
}
if (queue.size())
{
break;
}
}
}
void ThreadPool::DoThing(ThreadState *pState)
{
auto uState = pState->cvHasWork;
auto uMin = AuMin(uState, pState->pendingWorkItems.size());
if (!uMin) uMin = 1;
while (uState &&
AuAtomicCompareExchange<AuUInt32>(&pState->cvHasWork, uState - uMin, uState) != uState)
{
uState = pState->cvHasWork;
if (uState < uMin)
{
uMin = uState;
}
}
} }
bool ThreadPool::PollInternal(AuSPtr<ThreadState> state, bool block, AuUInt32 &uCount) bool ThreadPool::PollInternal(AuSPtr<ThreadState> state, bool block, AuUInt32 &uCount)
@ -423,17 +291,14 @@ namespace Aurora::Async
auto group = state->parent.lock(); auto group = state->parent.lock();
//state->pendingWorkItems.clear();
{ {
AuAtomicAdd(&state->cvSleepCount, 1u); AU_LOCK_GUARD(state->sync.cvWorkMutex);
AU_LOCK_GUARD(state->cvWorkMutex);
do do
{ {
group->workQueue.Dequeue(state->pendingWorkItems, state->multipopCount, state->id.second); group->workQueue.Dequeue(state->pendingWorkItems, state->multipopCount, state->id.second);
this->DoThing(state.get()); state->sync.UpdateCVState(state.get());
// Consider blocking for more work // Consider blocking for more work
if (!block) if (!block)
@ -442,12 +307,11 @@ namespace Aurora::Async
} }
// pre-wakeup thread terminating check // pre-wakeup thread terminating check
if (state->threadObject->Exiting()) if (state->thread.pThread->Exiting())
{ {
break; break;
} }
// Block if no work items are present // Block if no work items are present
if (state->pendingWorkItems.empty()) if (state->pendingWorkItems.empty())
{ {
@ -456,7 +320,7 @@ namespace Aurora::Async
break; break;
} }
state->cvVariable->WaitForSignal(); state->sync.cvVariable->WaitForSignal();
if (this->shuttingdown_ & 2) if (this->shuttingdown_ & 2)
{ {
@ -465,7 +329,7 @@ namespace Aurora::Async
} }
// Post-wakeup thread terminating check // Post-wakeup thread terminating check
if (state->threadObject->Exiting()) if (state->thread.pThread->Exiting())
{ {
break; break;
} }
@ -474,7 +338,6 @@ namespace Aurora::Async
(this->GetThreadState()->asyncLoop->GetSourceCount() > 1) || (this->GetThreadState()->asyncLoop->GetSourceCount() > 1) ||
this->GetThreadState()->asyncLoop->CommitPending())) //(this->ToKernelWorkQueue()->IsSignaledPeek())) this->GetThreadState()->asyncLoop->CommitPending())) //(this->ToKernelWorkQueue()->IsSignaledPeek()))
{ {
AuAtomicSub(&state->cvSleepCount, 1u);
return false; return false;
} }
@ -490,12 +353,10 @@ namespace Aurora::Async
if (group->workQueue.IsEmpty(this, state->id)) if (group->workQueue.IsEmpty(this, state->id))
{ {
state->eventLs->Reset(); // ...until we're done state->sync.eventLs->Reset(); // ...until we're done
AuAtomicStore(&state->cvLSActive, 0u); AuAtomicStore(&state->sync.cvLSActive, 0u);
} }
} }
AuAtomicSub(&state->cvSleepCount, 1u);
} }
if (state->pendingWorkItems.empty()) if (state->pendingWorkItems.empty())
@ -513,24 +374,16 @@ namespace Aurora::Async
} }
int runningTasks {}; int runningTasks {};
auto uStartCookie = state->stackState.uStackCookie++;
auto oldTlsHandle = AuExchange(gCurrentPool, AuSharedFromThis());
bool lowPrioCont {};
bool lowPrioContCached {};
state->cookie++;
int start = state->cookie;
// Account for // Account for
// while (AuAsync.GetCurrentPool()->runForever()); // while (AuAsync.GetCurrentPool()->runForever());
// in the first task (or deeper) // in the first task (or deeper)
if (InRunnerMode() && tlsCallStack) // are we one call deep? if (InRunnerMode() && state->stackState.uStackCallDepth) // are we one call deep?
{ {
auto queue = ToKernelWorkQueue(); auto queue = ToKernelWorkQueue();
if ((this->uAtomicCounter == tlsCallStack) && if ((this->uAtomicCounter == state->stackState.uStackCallDepth) &&
this->IsDepleted()) this->IsDepleted())
{ {
return false; return false;
@ -538,25 +391,20 @@ namespace Aurora::Async
} }
// //
for (auto itr = state->pendingWorkItems.begin(); itr != state->pendingWorkItems.end(); ) for (auto itr = state->pendingWorkItems.begin(); itr != state->pendingWorkItems.end(); )
{ {
if (state->threadObject->Exiting()) if (state->thread.pThread->Exiting())
{ {
break; break;
} }
// Set the last frame time for a watchdog later down the line
state->lastFrameTime = Time::CurrentClockMS();
// Dispatch // Dispatch
auto oops = itr->second; auto oops = itr->second;
// Remove from our local job queue // Remove from our local job queue
itr = state->pendingWorkItems.erase(itr); itr = state->pendingWorkItems.erase(itr);
tlsCallStack++; state->stackState.uStackCallDepth++;
//SysBenchmark(fmt::format("RunAsync: {}", block)); //SysBenchmark(fmt::format("RunAsync: {}", block));
// Dispatch // Dispatch
@ -566,21 +414,19 @@ namespace Aurora::Async
// Atomically decrement global task counter // Atomically decrement global task counter
runningTasks = AuAtomicSub(&this->uAtomicCounter, 1u); runningTasks = AuAtomicSub(&this->uAtomicCounter, 1u);
tlsCallStack--; state->stackState.uStackCallDepth--;
if (start != state->cookie) if (uStartCookie != state->stackState.uStackCookie)
{ {
start = state->cookie; uStartCookie = state->stackState.uStackCookie;
itr = state->pendingWorkItems.begin(); itr = state->pendingWorkItems.begin();
} }
} }
gCurrentPool = oldTlsHandle;
// Return popped work back to the groups work pool when our -pump loops were preempted // Return popped work back to the groups work pool when our -pump loops were preempted
if (state->pendingWorkItems.size()) if (state->pendingWorkItems.size())
{ {
AU_LOCK_GUARD(state->cvWorkMutex); AU_LOCK_GUARD(state->sync.cvWorkMutex);
for (const auto &item : state->pendingWorkItems) for (const auto &item : state->pendingWorkItems)
{ {
@ -588,8 +434,8 @@ namespace Aurora::Async
} }
state->pendingWorkItems.clear(); state->pendingWorkItems.clear();
state->cvVariable->Broadcast(); state->sync.cvVariable->Broadcast();
state->eventLs->Set(); state->sync.eventLs->Set();
} }
// Account for // Account for
@ -631,29 +477,25 @@ namespace Aurora::Async
// wait for regular prio work to complete // wait for regular prio work to complete
{ {
for (auto pGroup : this->threadGroups_)
{ {
AU_LOCK_GUARD(this->rwlock_->AsReadable()); if (!pGroup)
for (auto pGroup : this->threadGroups_)
{ {
if (!pGroup) continue;
}
AU_LOCK_GUARD(pGroup->workersMutex);
for (auto &[id, worker] : pGroup->workers)
{
if (trySelfPid == worker->id)
{ {
continue; continue;
} }
for (auto &[id, worker] : pGroup->workers) toBarrier.push_back(worker->id);
{
if (trySelfPid == worker->id)
{
continue;
}
toBarrier.push_back(worker->id);
}
} }
} }
for (const auto &id : toBarrier) for (const auto &id : toBarrier)
{ {
if (trySelfPid == id) if (trySelfPid == id)
@ -689,7 +531,7 @@ namespace Aurora::Async
AuList<AuThreads::ThreadShared_t> threads; AuList<AuThreads::ThreadShared_t> threads;
AuList<AuSPtr<ThreadState>> states; AuList<AuSPtr<ThreadState>> states;
{ {
AU_LOCK_GUARD(this->rwlock_->AsReadable()); AU_LOCK_GUARD(this->pRWReadView);
for (auto pGroup : this->threadGroups_) for (auto pGroup : this->threadGroups_)
{ {
@ -701,7 +543,7 @@ namespace Aurora::Async
for (auto &[id, pState] : pGroup->workers) for (auto &[id, pState] : pGroup->workers)
{ {
// main loop: // main loop:
if (pState && pState->cvWorkMutex && pState->cvVariable) if (pState)
{ {
states.push_back(pState); states.push_back(pState);
pState->shuttingdown = true; pState->shuttingdown = true;
@ -712,10 +554,10 @@ namespace Aurora::Async
} }
// thread object: // thread object:
if (pState->bCreate) if (pState->thread.bOwnsThread)
{ {
pState->threadObject->SendExitSignal(); pState->thread.pThread->SendExitSignal();
threads.push_back(pState->threadObject); threads.push_back(pState->thread.pThread);
} }
// unrefreeze signals: // unrefreeze signals:
@ -728,13 +570,10 @@ namespace Aurora::Async
} }
} }
// Break all condvar loops, just in case
for (const auto &pState : states)
{ {
for (const auto &pState : states) pState->sync.SetEvent();
{
AU_LOCK_GUARD(pState->cvWorkMutex);
pState->cvVariable->Broadcast();
pState->eventLs->Set();
}
} }
// Final sync to exit // Final sync to exit
@ -764,11 +603,13 @@ namespace Aurora::Async
// Is dead flag // Is dead flag
this->shutdownEvent_->Set(); this->shutdownEvent_->Set();
//
if (pLocalRunner) if (pLocalRunner)
{ {
pLocalRunner->bIsKiller = true; pLocalRunner->shutdown.bIsKillerThread = true;
} }
// Notify observing threads of our work exhaustion
for (const auto &wOther : this->listWeakDepsParents_) for (const auto &wOther : this->listWeakDepsParents_)
{ {
if (auto pThat = AuTryLockMemoryType(wOther)) if (auto pThat = AuTryLockMemoryType(wOther))
@ -854,12 +695,12 @@ namespace Aurora::Async
return {}; return {};
} }
return pState->threadObject; return pState->thread.pThread;
} }
AuBST<ThreadGroup_t, AuList<ThreadId_t>> ThreadPool::GetThreads() AuBST<ThreadGroup_t, AuList<ThreadId_t>> ThreadPool::GetThreads()
{ {
AU_LOCK_GUARD(rwlock_->AsReadable()); AU_LOCK_GUARD(this->pRWReadView);
AuBST<ThreadGroup_t, AuList<ThreadId_t>> ret; AuBST<ThreadGroup_t, AuList<ThreadId_t>> ret;
@ -892,7 +733,7 @@ namespace Aurora::Async
bool ThreadPool::Sync(WorkerId_t workerId, AuUInt32 timeoutMs, bool requireSignal) bool ThreadPool::Sync(WorkerId_t workerId, AuUInt32 timeoutMs, bool requireSignal)
{ {
AU_LOCK_GUARD(this->rwlock_->AsReadable()); AU_LOCK_GUARD(this->pRWReadView);
auto group = GetGroup(workerId.first); auto group = GetGroup(workerId.first);
auto currentWorkerId = GetCurrentThread().second; auto currentWorkerId = GetCurrentThread().second;
@ -917,7 +758,7 @@ namespace Aurora::Async
void ThreadPool::Signal(WorkerId_t workerId) void ThreadPool::Signal(WorkerId_t workerId)
{ {
AU_LOCK_GUARD(this->rwlock_->AsReadable()); AU_LOCK_GUARD(this->pRWReadView);
auto group = GetGroup(workerId.first); auto group = GetGroup(workerId.first);
if (workerId.second == Async::kThreadIdAny) if (workerId.second == Async::kThreadIdAny)
@ -935,7 +776,7 @@ namespace Aurora::Async
AuSPtr<AuLoop::ILoopSource> ThreadPool::WorkerToLoopSource(WorkerId_t workerId) AuSPtr<AuLoop::ILoopSource> ThreadPool::WorkerToLoopSource(WorkerId_t workerId)
{ {
AU_LOCK_GUARD(this->rwlock_->AsReadable()); AU_LOCK_GUARD(this->pRWReadView);
auto a = GetThreadHandle(workerId); auto a = GetThreadHandle(workerId);
if (!a) if (!a)
@ -943,12 +784,12 @@ namespace Aurora::Async
return {}; return {};
} }
return a->asyncLoopSourceShared; return a->sync.eventLs;
} }
void ThreadPool::SyncAllSafe() void ThreadPool::SyncAllSafe()
{ {
AU_LOCK_GUARD(this->rwlock_->AsReadable()); AU_LOCK_GUARD(this->pRWReadView);
for (auto pGroup : this->threadGroups_) for (auto pGroup : this->threadGroups_)
{ {
@ -1008,46 +849,6 @@ namespace Aurora::Async
return worker->asyncLoop; return worker->asyncLoop;
} }
void ThreadPool::UpdateWorkMode(WorkerId_t workerId, RunMode mode)
{
auto states = this->GetThreadHandles(workerId);
if (!states.size())
{
SysPushErrorGeneric("Couldn't find requested worker");
return;
}
for (const auto &state : states)
{
state->runMode = mode.mode;
if (mode.freqMsTick)
{
state->rateLimiter.SetNextStep(mode.freqMsTick * 1'000'000);
}
}
}
ERunMode ThreadPool::GetCurrentThreadRunMode()
{
auto state = this->GetThreadState();
if (!state)
{
return ERunMode::eEfficient;
}
return state->runMode;
}
ERunMode ThreadPool::GetThreadRunMode(WorkerId_t workerId)
{
auto worker = this->GetThreadHandle(workerId);
if (!worker)
{
SysPushErrorGeneric("Couldn't find requested worker");
return {};
}
return worker->runMode;
}
bool ThreadPool::IsSelfDepleted() bool ThreadPool::IsSelfDepleted()
{ {
auto queue = ToKernelWorkQueue(); auto queue = ToKernelWorkQueue();
@ -1152,7 +953,7 @@ namespace Aurora::Async
{ {
if (auto pState = this->GetThreadHandle(workerId)) if (auto pState = this->GetThreadHandle(workerId))
{ {
AuAtomicAdd(&pState->uShutdownFence, 1u); AuAtomicAdd(&pState->shutdown.uShutdownFence, 1u);
} }
} }
@ -1160,7 +961,7 @@ namespace Aurora::Async
{ {
if (auto pState = this->GetThreadHandle(optWorkerId.value_or(GetCurrentWorkerPId()))) if (auto pState = this->GetThreadHandle(optWorkerId.value_or(GetCurrentWorkerPId())))
{ {
return (AuUInt64(pState->uShutdownFence) << 32ull) | AuUInt64(this->uAtomicShutdownCookie); return (AuUInt64(pState->shutdown.uShutdownFence) << 32ull) | AuUInt64(this->uAtomicShutdownCookie);
} }
else else
{ {
@ -1185,7 +986,7 @@ namespace Aurora::Async
if (auto pState = this->GetThreadHandle(optWorkerId.value_or(GetCurrentWorkerPId()))) if (auto pState = this->GetThreadHandle(optWorkerId.value_or(GetCurrentWorkerPId())))
{ {
return uThreadCookie != pState->uShutdownFence; return uThreadCookie != pState->shutdown.uShutdownFence;
} }
else else
{ {
@ -1202,7 +1003,7 @@ namespace Aurora::Async
if (create) if (create)
{ {
gCurrentPool = AuSharedFromThis(); tlsCurrentThreadPool = AuSharedFromThis();
} }
AuSPtr<GroupState> pGroup; AuSPtr<GroupState> pGroup;
@ -1234,73 +1035,34 @@ namespace Aurora::Async
} }
} }
auto threadState = AuMakeShared<ThreadState>(); auto pThreadState = pGroup->CreateWorker(workerId, create);
if (!threadState) if (!pThreadState)
{ {
SysPushErrorMemory();
return {}; return {};
} }
threadState->parent = pGroup;
threadState->id = workerId;
threadState->asyncLoop = AuMakeShared<AsyncLoop>();
if (!threadState->asyncLoop)
{
SysPushErrorMemory();
return {};
}
threadState->eventLs = AuLoop::NewLSAsync();
if (!threadState->eventLs)
{
SysPushErrorMemory();
return {};
}
threadState->asyncLoopSourceShared = threadState->eventLs;
threadState->asyncLoop->pParent = threadState.get();
threadState->rateLimiter.SetNextStep(1'000'000); // 1MS in nanoseconds
threadState->runMode = ERunMode::eEfficient;
if (!threadState->asyncLoop)
{
SysPushErrorMemory();
return {};
}
if (!threadState->asyncLoop->Init())
{
SysPushErrorNested();
return {};
}
threadState->asyncLoop->SourceAdd(threadState->eventLs);
threadState->bCreate = create;
if (!create) if (!create)
{ {
threadState->threadObject = AuThreads::ThreadShared(AuThreads::ThreadInfo( pThreadState->thread.pThread= AuThreads::ThreadShared(AuThreads::ThreadInfo(
AuMakeShared<AuThreads::IThreadVectorsFunctional>(AuThreads::IThreadVectorsFunctional::OnEntry_t(std::bind(&ThreadPool::Entrypoint, this, threadState->id)), AuMakeShared<AuThreads::IThreadVectorsFunctional>(AuThreads::IThreadVectorsFunctional::OnEntry_t(std::bind(&ThreadPool::Entrypoint, this, workerId)),
AuThreads::IThreadVectorsFunctional::OnExit_t{}), AuThreads::IThreadVectorsFunctional::OnExit_t{}),
gRuntimeConfig.async.threadPoolDefaultStackSize gRuntimeConfig.async.threadPoolDefaultStackSize
)); ));
if (!threadState->threadObject) if (!pThreadState->thread.pThread)
{ {
return {}; return {};
} }
threadState->threadObject->Run(); pThreadState->thread.pThread->Run();
} }
else else
{ {
threadState->threadObject = AuSPtr<AuThreads::IAuroraThread>(AuThreads::GetThread(), [](AuThreads::IAuroraThread *){}); pThreadState->thread.pThread = AuSPtr<AuThreads::IAuroraThread>(AuThreads::GetThread(), [](AuThreads::IAuroraThread *){});
// TODO: this is just a hack // TODO: this is just a hack
// we should implement this properly // we should implement this properly
threadState->threadObject->AddLastHopeTlsHook(AuMakeShared<AuThreads::IThreadFeatureFunctional>([]() -> void pThreadState->thread.pThread->AddLastHopeTlsHook(AuMakeShared<AuThreads::IThreadFeatureFunctional>([]() -> void
{ {
}, []() -> void }, []() -> void
@ -1312,13 +1074,11 @@ namespace Aurora::Async
} }
})); }));
// tlsCurrentThreadPool = AuWeakFromThis();
gCurrentPool = AuWeakFromThis();
tlsWorkerId = WorkerPId_t(AuSharedFromThis(), workerId); tlsWorkerId = WorkerPId_t(AuSharedFromThis(), workerId);
} }
pGroup->AddWorker(workerId.second, threadState); pGroup->AddWorker(workerId.second, pThreadState);
return true; return true;
} }
@ -1377,7 +1137,11 @@ namespace Aurora::Async
void ThreadPool::Entrypoint(WorkerId_t id) void ThreadPool::Entrypoint(WorkerId_t id)
{ {
gCurrentPool = AuWeakFromThis(); {
AU_LOCK_GUARD(this->pRWReadView);
}
tlsCurrentThreadPool = AuWeakFromThis();
tlsWorkerId = WorkerPId_t(AuSharedFromThis(), id); tlsWorkerId = WorkerPId_t(AuSharedFromThis(), id);
auto job = GetThreadState(); auto job = GetThreadState();
@ -1386,7 +1150,7 @@ namespace Aurora::Async
if (id != WorkerId_t {0, 0}) if (id != WorkerId_t {0, 0})
{ {
AU_LOCK_GUARD(this->rwlock_->AsReadable()); AU_LOCK_GUARD(this->pRWReadView);
if (!this->shuttingdown_ && !job->rejecting) if (!this->shuttingdown_ && !job->rejecting)
{ {
@ -1418,7 +1182,7 @@ namespace Aurora::Async
return; return;
} }
state->BroadCast(); state->SignalAll();
{ {
if (AuExchange(jobWorker->bAlreadyDoingExitTick, true)) if (AuExchange(jobWorker->bAlreadyDoingExitTick, true))
@ -1457,11 +1221,9 @@ namespace Aurora::Async
jobWorker->isDeadEvent->Set(); jobWorker->isDeadEvent->Set();
jobWorker->bAlreadyDoingExitTick = false; jobWorker->bAlreadyDoingExitTick = false;
jobWorker->bBreakEarly = true; jobWorker->shutdown.bBreakMainLoop = true;
} }
} }
void ThreadPool::ThisExiting() void ThreadPool::ThisExiting()
@ -1473,7 +1235,7 @@ namespace Aurora::Async
AuList<AuSPtr<AuThreads::IThreadFeature>> features; AuList<AuSPtr<AuThreads::IThreadFeature>> features;
{ {
AU_LOCK_GUARD(this->rwlock_->AsReadable()); AU_LOCK_GUARD(this->pRWReadView);
pLocalState->isDeadEvent->Set(); pLocalState->isDeadEvent->Set();
@ -1530,7 +1292,7 @@ namespace Aurora::Async
AuSPtr<ThreadState> ThreadPool::GetThreadState() AuSPtr<ThreadState> ThreadPool::GetThreadState()
{ {
auto thread = gCurrentPool.lock(); auto thread = tlsCurrentThreadPool.lock();
if (!thread) if (!thread)
{ {
return {}; return {};
@ -1545,7 +1307,6 @@ namespace Aurora::Async
#endif #endif
auto worker = *tlsWorkerId; auto worker = *tlsWorkerId;
auto state = GetGroup(worker.first); auto state = GetGroup(worker.first);
if (!state) if (!state)
{ {
@ -1557,7 +1318,7 @@ namespace Aurora::Async
AuSPtr<ThreadState> ThreadPool::GetThreadStateNoWarn() AuSPtr<ThreadState> ThreadPool::GetThreadStateNoWarn()
{ {
auto thread = gCurrentPool.lock(); auto thread = tlsCurrentThreadPool.lock();
if (!thread) if (!thread)
{ {
return {}; return {};
@ -1592,7 +1353,7 @@ namespace Aurora::Async
AuList<AuSPtr<ThreadState>> ThreadPool::GetThreadHandles(WorkerId_t id) AuList<AuSPtr<ThreadState>> ThreadPool::GetThreadHandles(WorkerId_t id)
{ {
AU_LOCK_GUARD(this->rwlock_->AsReadable()); AU_LOCK_GUARD(this->pRWReadView);
auto group = GetGroup(id.first); auto group = GetGroup(id.first);
if (!group) if (!group)
@ -1626,34 +1387,4 @@ namespace Aurora::Async
return AuMakeShared<ThreadPool>(); return AuMakeShared<ThreadPool>();
} }
struct KeepGroupAlive
{
KeepGroupAlive(AuSPtr<AuAsync::IThreadPool> pPool) : pPool(AuStaticCast<AuAsync::ThreadPool>(pPool))
{
AuAtomicAdd(&this->pPool->uAtomicCounter, 1u);
}
~KeepGroupAlive()
{
auto uNow = AuAtomicSub(&this->pPool->uAtomicCounter, 1u);
if (uNow == 0)
{
for (const auto &pState : this->pPool->threadGroups_)
{
if (pState)
{
pState->BroadCast();
}
}
}
}
AuSPtr<AuAsync::ThreadPool> pPool;
};
AUKN_SYM AuSPtr<void> KeepThreadPoolAlive(AuSPtr<AuAsync::IThreadPool> pPool)
{
return AuMakeSharedThrow<KeepGroupAlive>(pPool);
}
} }

View File

@ -7,6 +7,8 @@
***/ ***/
#pragma once #pragma once
#include "IThreadPoolInternal.hpp"
namespace Aurora::Async namespace Aurora::Async
{ {
struct GroupState; struct GroupState;
@ -15,21 +17,10 @@ namespace Aurora::Async
//class WorkItem; //class WorkItem;
struct IThreadPoolInternal struct ThreadPool :
{ IThreadPool,
virtual bool WaitFor(WorkerId_t unlocker, const AuSPtr<Threading::IWaitable> &primitive, AuUInt32 ms) = 0; IThreadPoolInternal,
virtual void Run(WorkerId_t target, AuSPtr<IAsyncRunnable> runnable) = 0; AuEnableSharedFromThis<ThreadPool>
virtual AuSPtr<ThreadState> GetThreadHandle(WorkerId_t id) = 0;
virtual IThreadPool *ToThreadPool() = 0;
AuUInt32 uAtomicCounter {};
AuUInt32 uAtomicIOProcessors {};
AuUInt32 uAtomicIOProcessorsWorthlessSources {};
AuUInt32 uAtomicShutdownCookie {};
};
struct ThreadPool : IThreadPool, IThreadPoolInternal, AuEnableSharedFromThis<ThreadPool>
{ {
ThreadPool(); ThreadPool();
@ -81,9 +72,6 @@ namespace Aurora::Async
virtual AuSPtr<AuLoop::ILoopQueue> ToKernelWorkQueue() override; virtual AuSPtr<AuLoop::ILoopQueue> ToKernelWorkQueue() override;
virtual AuSPtr<AuLoop::ILoopQueue> ToKernelWorkQueue(WorkerId_t workerId) override; virtual AuSPtr<AuLoop::ILoopQueue> ToKernelWorkQueue(WorkerId_t workerId) override;
virtual void UpdateWorkMode(WorkerId_t workerId, RunMode mode) override;
virtual ERunMode GetCurrentThreadRunMode() override;
virtual ERunMode GetThreadRunMode(WorkerId_t workerId) override;
virtual AuSPtr<AuThreading::IWaitable> GetShutdownEvent() override; virtual AuSPtr<AuThreading::IWaitable> GetShutdownEvent() override;
virtual void AddDependency(AuSPtr<IThreadPool> pPool) override; virtual void AddDependency(AuSPtr<IThreadPool> pPool) override;
@ -105,7 +93,6 @@ namespace Aurora::Async
bool InternalRunOne(AuSPtr<ThreadState>, bool block, AuUInt32 &uCount); bool InternalRunOne(AuSPtr<ThreadState>, bool block, AuUInt32 &uCount);
bool PollInternal(AuSPtr<ThreadState>, bool block, AuUInt32 &uCount); bool PollInternal(AuSPtr<ThreadState>, bool block, AuUInt32 &uCount);
void DoThing(ThreadState *pState);
size_t GetThreadWorkersCount(ThreadGroup_t group); size_t GetThreadWorkersCount(ThreadGroup_t group);
@ -152,7 +139,10 @@ namespace Aurora::Async
AuSPtr<GroupState> threadGroups_[255]; AuSPtr<GroupState> threadGroups_[255];
AuUInt32 shuttingdown_ {}; AuUInt32 shuttingdown_ {};
bool shutdown {}; bool shutdown {};
AuThreadPrimitives::RWRenterableLock rwlock_; AuThreadPrimitives::RWRenterableLock rwlock_;
AuThreading::IWaitable *pRWReadView {};
AuThreadPrimitives::Event shutdownEvent_; AuThreadPrimitives::Event shutdownEvent_;
bool runnersRunning_ {}; bool runnersRunning_ {};
AuList<AuWPtr<ThreadPool>> listWeakDeps_; AuList<AuWPtr<ThreadPool>> listWeakDeps_;

View File

@ -8,11 +8,11 @@
#pragma once #pragma once
#include "AsyncRunnable.hpp" #include "AsyncRunnable.hpp"
#include "AuThreadState.hpp"
#include "AuGroupWorkQueue.hpp"
namespace Aurora::Async namespace Aurora::Async
{ {
using WorkEntry_t = AuPair<ThreadId_t, AuSPtr<IAsyncRunnable>>;
// TODO: this is a hack because i havent implemented an epoll abstraction yet // TODO: this is a hack because i havent implemented an epoll abstraction yet
struct AsyncAppWaitSourceRequest struct AsyncAppWaitSourceRequest
{ {
@ -26,10 +26,9 @@ namespace Aurora::Async
struct GroupState; struct GroupState;
struct AsyncLoop; struct AsyncLoop;
struct ThreadState struct ThreadState : ThreadStateBase
{ {
ThreadState() : running(true, false, true), ThreadState() : running(true, false, true),
cvVariable(AuUnsafeRaiiToShared(cvWorkMutex.AsPointer())),
isDeadEvent(false, false, true) isDeadEvent(false, false, true)
{ {
@ -38,8 +37,6 @@ namespace Aurora::Async
// :vomit: // :vomit:
WorkerId_t id; WorkerId_t id;
AuUInt8 multipopCount = 32; AuUInt8 multipopCount = 32;
AuUInt32 lastFrameTime {};
AuThreads::ThreadShared_t threadObject;
AuWPtr<GroupState> parent; AuWPtr<GroupState> parent;
AuThreadPrimitives::Semaphore syncSema; AuThreadPrimitives::Semaphore syncSema;
AuThreadPrimitives::Event isDeadEvent; AuThreadPrimitives::Event isDeadEvent;
@ -53,14 +50,8 @@ namespace Aurora::Async
//bool running; //bool running;
AuList<AsyncAppWaitSourceRequest> loopSources; AuList<AsyncAppWaitSourceRequest> loopSources;
AuList<WorkEntry_t> pendingWorkItems; AuList<WorkEntry_t> pendingWorkItems;
AuSPtr<AsyncLoop> asyncLoop;
Utility::RateLimiter rateLimiter;
ERunMode runMode;
int cookie {0}; int cookie {0};
bool bAlreadyDoingExitTick {}; bool bAlreadyDoingExitTick {};
bool bBreakEarly {};
bool bIsKiller {};
bool bCreate {};
// //
AuThreadPrimitives::SpinLock externalFencesLock; AuThreadPrimitives::SpinLock externalFencesLock;
@ -68,31 +59,8 @@ namespace Aurora::Async
// //
AuThreadPrimitives::ConditionMutex cvWorkMutex;
AuThreadPrimitives::ConditionVariable cvVariable;
AuAUInt32 cvSleepCount {};
AuAUInt32 cvLSActive {};
AuAUInt32 cvHasWork {};
AuSPtr<AuLoop::ILSEvent> eventLs;
AuSPtr<AuLoop::ILoopSource> asyncLoopSourceShared;
inline void SetEvent(bool bBoth = true)
{
if (auto pEvent = this->eventLs)
{
if (AuAtomicTestAndSet(&this->cvLSActive, 0u) == 0)
{
pEvent->Set();
}
}
if (bBoth)
{
this->cvVariable->Signal();
}
}
// //
AuUInt32 uShutdownFence { 1 };
}; };
} }

View File

@ -54,13 +54,10 @@ namespace Aurora::Async
void AsyncLoop::Schedule() void AsyncLoop::Schedule()
{ {
if (AuThreads::GetThread() != this->pParent->threadObject.get()) if (AuThreads::GetThread() != this->pParent->thread.pThread.get())
{ {
AuAtomicAdd(&this->commitPending_, 1u); AuAtomicAdd(&this->commitPending_, 1u);
{ this->pParent->sync.SetEvent();
AU_LOCK_GUARD(this->pParent->parent.lock()->workQueue.mutex);
this->pParent->SetEvent();
}
} }
else else
{ {

View File

@ -14,7 +14,7 @@ namespace Aurora::Async
{ {
struct AsyncLoop : AuLoop::LoopQueue struct AsyncLoop : AuLoop::LoopQueue
{ {
ThreadState *pParent; ThreadStateBase *pParent;
void OnFrame(); void OnFrame();
virtual bool AddCallback (const AuSPtr<AuLoop::ILoopSource> &source, const AuSPtr<AuLoop::ILoopSourceSubscriber> &subscriber) override; virtual bool AddCallback (const AuSPtr<AuLoop::ILoopSource> &source, const AuSPtr<AuLoop::ILoopSourceSubscriber> &subscriber) override;

View File

@ -9,7 +9,7 @@
#include "Async.hpp" #include "Async.hpp"
#include "WorkItem.hpp" #include "WorkItem.hpp"
#include "AsyncApp.hpp" #include "AsyncApp.hpp"
#include "Schedular.hpp" #include "AuSchedular.hpp"
#if defined(AURORA_COMPILER_CLANG) #if defined(AURORA_COMPILER_CLANG)
// warning: enumeration values 'kEnumCount' not handled in switch [-Wswitch // warning: enumeration values 'kEnumCount' not handled in switch [-Wswitch
@ -37,7 +37,7 @@ namespace Aurora::Async
if (auto pWorker = this->GetState()) if (auto pWorker = this->GetState())
{ {
this->optOtherCookie = pWorker->uShutdownFence; this->optOtherCookie = pWorker->shutdown.uShutdownFence;
} }
} }
@ -288,7 +288,7 @@ namespace Aurora::Async
{ {
if (auto pWorker = this->GetState()) if (auto pWorker = this->GetState())
{ {
if (this->optOtherCookie.value() != pWorker->uShutdownFence) if (this->optOtherCookie.value() != pWorker->shutdown.uShutdownFence)
{ {
this->Fail(); this->Fail();
return false; return false;