[*] Rewrite a bit of AuAsync

(maybe 1/3 of a rewrite)
This commit is contained in:
Reece Wilson 2023-05-25 01:55:55 +01:00
parent 0b265c80cf
commit 6af9940bd4
9 changed files with 262 additions and 276 deletions

View File

@ -9,6 +9,14 @@
namespace Aurora::Async
{
// Discrete work priority levels, declared in ascending order from
// eLowestPrio to eHighestPrio; eNormalPrio is the middle entry.
// NOTE(review): AUE_DEFINE presumably emits the enum together with companion
// constants such as kEWorkPrioCount / kEWorkPrioMaxLegal that the work queue
// relies on - confirm against the macro definition.
AUE_DEFINE(EWorkPrio, (
eLowestPrio,
eLowPrio,
eNormalPrio,
eHighPrio,
eHighestPrio
));
struct IWorkItem
{
virtual AuSPtr<IWorkItem> WaitFor(const AuSPtr<IWorkItem> &pWorkItem) = 0;
@ -37,12 +45,7 @@ namespace Aurora::Async
virtual AuSPtr<IWorkItem> Dispatch() = 0;
// per work frame, available work is dequeued and sorted.
// work is sorted by prio and then by initial submission index.
//
// prios under .25 will yield for new work at time of possible dispatch
// only if there are work entries of > .5 in the pending work queue.
virtual void SetPrio(float val = 0.5f) = 0;
virtual void SetPrio(EWorkPrio prio) = 0;
virtual bool BlockUntilComplete() = 0;
virtual bool HasFinished() = 0;

View File

@ -9,22 +9,23 @@
namespace Aurora::Async
{
class IAsyncRunnable
struct IAsyncRunnable
{
public:
virtual float GetPrio() { return 0.5f; };
inline virtual EWorkPrio GetPrio()
{
return EWorkPrio::eNormalPrio;
};
virtual void RunAsync() = 0;
virtual void CancelAsync() {}
};
class AsyncFuncRunnable : public IAsyncRunnable
struct AsyncFuncRunnable : IAsyncRunnable
{
public:
AuFunction<void()> callback;
AuFunction<void()> fail;
AuThreadPrimitives::SpinLock lock;
AuThreadPrimitives::Mutex lock;
AsyncFuncRunnable(AuFunction<void()> &&callback) : callback(AuMove(callback))
{}

View File

@ -14,26 +14,50 @@ namespace Aurora::Async
{
// Creates the synchronization objects held by this group, in this order:
// the condition mutex, a condition variable constructed from that mutex,
// and the async loop-source event.
// Returns false as soon as one of those factory calls yields a null handle;
// returns true once all three exist and the event has also been published
// through asyncLoopSourceShared.
bool GroupState::Init()
{
this->cvWorkMutex = AuThreadPrimitives::ConditionMutexUnique();
if (!this->cvWorkMutex)
{
return false;
}
// NOTE(review): AuUnsafeRaiiToShared presumably wraps the unique handle in a
// non-owning shared pointer, so cvWorkMutex would have to outlive cvVariable - confirm.
this->cvVariable = AuThreadPrimitives::ConditionVariableUnique(AuUnsafeRaiiToShared(this->cvWorkMutex));
if (!this->cvVariable)
{
return false;
}
this->eventLs = AuLoop::NewLSAsync();
if (!this->eventLs)
{
return false;
}
// Expose the same event object under the generic loop-source handle.
this->asyncLoopSourceShared = this->eventLs;
return true;
}
// Looks up the worker state registered under uIndex.
// The fixed-size wpWorkers table is consulted first when uIndex falls inside
// it; if that slot cannot be locked into a live pointer, the lookup falls back
// to the workers map. Returns an empty pointer when neither holds the index.
// The whole lookup runs under workersMutex.
AuSPtr<ThreadState> GroupState::GetThreadByIndex(ThreadId_t uIndex)
{
    AU_LOCK_GUARD(this->workersMutex);

    if (AuArraySize(this->wpWorkers) > uIndex)
    {
        // Try to promote the weak slot to a strong reference.
        auto pCached = AuTryLockMemoryType(this->wpWorkers[uIndex]);
        if (pCached)
        {
            return pCached;
        }
    }

    // Slot was out of range or not lockable: use the map.
    auto found = this->workers.find(uIndex);
    if (found != this->workers.end())
    {
        return found->second;
    }

    return {};
}
// Wakes every worker registered in this group: for each entry of the workers
// map, the worker's condition variable is broadcast and its loop-source event
// is set. The map is walked while holding workersMutex.
void GroupState::BroadCast()
{
    AU_LOCK_GUARD(this->workersMutex);

    for (auto itr = this->workers.begin(); itr != this->workers.end(); ++itr)
    {
        const auto &pWorker = itr->second;
        pWorker->cvVariable->Broadcast();
        pWorker->eventLs->Set();
    }
}
// Registers pState as the worker with the given id.
// When id fits inside the fixed-size wpWorkers table, the matching slot is
// pointed at pState; the (id, pState) pair is then inserted into the workers
// map, and a failed insertion trips SysAssert.
// Both updates happen under workersMutex.
void GroupState::AddWorker(ThreadId_t id, AuSPtr<ThreadState> pState)
{
    AU_LOCK_GUARD(this->workersMutex);

    if (AuArraySize(this->wpWorkers) > id)
    {
        this->wpWorkers[id] = pState;
    }

    // Perform the insertion outside of the assertion expression so that the
    // registration never depends on how SysAssert expands in a given build.
    const bool bInserted = AuTryInsert(this->workers, AuMakePair(id, pState));
    SysAssert(bInserted);
}
}

View File

@ -10,23 +10,38 @@
namespace Aurora::Async
{
// Pending work of one thread group, bucketed by priority.
struct GroupWorkQueue
{
    // Taken by every member function below before sortedWork is touched.
    AuThreadPrimitives::Mutex mutex;

    // NOTE(review): none of the member functions declared here appear to
    // update this counter - confirm whether it is still needed.
    AuUInt32 uItems {};

    // One list per EWorkPrio value, indexed by the priority's integer value.
    // New entries are appended to the back of their bucket.
    AuList<WorkEntry_t> sortedWork[AuAsync::kEWorkPrioCount];

    // Emptiness query across every priority bucket.
    bool IsEmpty();

    // Emptiness query restricted to entries addressed to `id`.
    bool IsEmpty(AuWorkerId_t id);

    // Appends `entry` to the bucket selected by the runnable's GetPrio().
    void AddWorkEntry(WorkEntry_t entry);

    // Transfers entries addressed to `id`, or to any worker, into `queue`,
    // visiting buckets from the highest index downwards and popping while
    // `queue` holds fewer than `maxPopCount` items.
    void Dequeue(AuList<WorkEntry_t> &queue, int maxPopCount, AuAsync::ThreadId_t id);
};
struct GroupState
{
ThreadGroup_t group;
AuThreadPrimitives::ConditionMutexUnique_t cvWorkMutex;
AuThreadPrimitives::ConditionVariableUnique_t cvVariable;
AuSPtr<AuLoop::ILSEvent> eventLs;
AuSPtr<AuLoop::ILoopSource> asyncLoopSourceShared;
AuList<WorkEntry_t> workQueue;
bool sorted {};
AuUInt32 dirty {};
GroupWorkQueue workQueue;
AuThreadPrimitives::Mutex workersMutex;
AuBST<ThreadId_t, AuSPtr<ThreadState>> workers;
AuWPtr<ThreadState> wpWorkers[32];
bool Init();
void BroadCast();
void AddWorker(ThreadId_t id, AuSPtr<ThreadState> pState);
AuSPtr<ThreadState> GetThreadByIndex(ThreadId_t uIndex);
bool inline IsSysThread()
{
return group == 0;

View File

@ -12,6 +12,7 @@
#include "WorkItem.hpp"
#include "Schedular.hpp"
#include "ThreadWorkerQueueShim.hpp"
#include <Source/IO/Loop/LSAsync.hpp>
namespace Aurora::Async
{
@ -123,81 +124,31 @@ namespace Aurora::Async
auto state = GetGroup(target.first);
SysAssert(static_cast<bool>(state), "couldn't dispatch a task to an offline group");
auto pWorker = state->GetThreadByIndex(target.second);
if (!pWorker)
{
runnable->CancelAsync();
return;
}
AuDebug::AddMemoryCrunch();
IncrementTasksRunning();
{
AU_LOCK_GUARD(state->cvWorkMutex);
AU_LOCK_GUARD(pWorker->cvWorkMutex);
#if defined(STAGING) || defined(DEBUG)
AU_LOCK_GUARD(rwlock_->AsReadable());
state->workQueue.AddWorkEntry(AuMakePair(target.second, runnable));
if (target.second != Async::kThreadIdAny)
{
auto itr = state->workers.find(target.second);
if ((itr == state->workers.end()) || (itr->second->rejecting))
{
SysPushErrorGeneric("worker: {}:{} is offline", target.first, target.second);
DecrementTasksRunning();
#if 0
throw "Requested job worker is offline";
#else
runnable->CancelAsync();
return;
#endif
}
}
else
{
auto workers = state->workers;
bool found = false;
for (const auto &worker : state->workers)
{
if (!worker.second->rejecting)
{
found = true;
break;
}
}
if (!found)
{
DecrementTasksRunning();
#if 0
throw "No workers available";
#else
runnable->CancelAsync();
return;
#endif
}
}
#endif
if (!AuTryInsert(state->workQueue, AuMakePair(target.second, runnable)))
{
DecrementTasksRunning();
runnable->CancelAsync();
return;
}
state->dirty++;
if (state->dirty > kMagicResortThreshold)
{
state->dirty = 0;
state->sorted = false;
}
state->eventLs->Set();
pWorker->cvVariable->Signal();
pWorker->eventLs->Set();
}
if (target.second == Async::kThreadIdAny)
{
state->cvVariable->Signal();
}
else
{
// sad :(
// TODO: when we have wait any, add support (^ the trigger) for it here
state->cvVariable->Broadcast();
state->BroadCast();
}
AuDebug::DecMemoryCrunch();
}
IThreadPool *ThreadPool::ToThreadPool()
@ -391,6 +342,84 @@ namespace Aurora::Async
return success;
}
// Appends `entry` to the bucket that matches the priority reported by its
// runnable (entry.second->GetPrio()), under this->mutex.
// The priority's integer value is used directly as the bucket index, so it is
// asserted to lie inside [0, kEWorkPrioCount) before sortedWork is indexed.
void GroupWorkQueue::AddWorkEntry(WorkEntry_t entry)
{
    AU_LOCK_GUARD(this->mutex);

    const auto prio = static_cast<int>(entry.second->GetPrio());

    // Reject negative values as well as values past the last bucket; either
    // would index outside of sortedWork.
    SysAssert((prio >= 0) && (prio < AuAsync::kEWorkPrioCount), "Invalid PRIO");

    // `entry` is a by-value parameter that is not used afterwards, so hand it
    // over instead of copying the pair (and its shared pointer) again.
    this->sortedWork[prio].push_back(AuMove(entry));
}
// Reports whether the queue holds no work at all.
// Returns false as soon as any priority bucket contains an entry, and true
// only when every bucket is empty. The scan runs under this->mutex.
bool GroupWorkQueue::IsEmpty()
{
    AU_LOCK_GUARD(this->mutex);

    for (AU_ITERATE_N(i, AuAsync::kEWorkPrioCount))
    {
        if (this->sortedWork[i].size())
        {
            // Pending work found: the queue is not empty.
            return false;
        }
    }

    return true;
}
// Reports whether the queue holds no entry addressed to `id`.
// Returns false as soon as an entry whose target equals `id` is found in any
// priority bucket, and true when no such entry exists. The scan runs under
// this->mutex.
// NOTE(review): entries addressed to "any thread" are not counted here even
// though Dequeue() hands them to whichever worker asks - confirm that callers
// using this to reset a wake-up event do not need them included.
bool GroupWorkQueue::IsEmpty(AuWorkerId_t id)
{
    AU_LOCK_GUARD(this->mutex);

    for (AU_ITERATE_N(i, AuAsync::kEWorkPrioCount))
    {
        for (const auto &[srcId, pA] : this->sortedWork[i])
        {
            if (id == srcId)
            {
                // Work targeted at this worker is still pending.
                return false;
            }
        }
    }

    return true;
}
// Pops work for worker `id` out of the priority buckets and appends it to `queue`.
//
// Buckets are visited starting at index kEWorkPrioMaxLegal and moving towards
// lower indices. Within a bucket, entries are examined front to back; an entry
// is taken when it targets any thread (kThreadIdAny) or exactly `id`, and
// taking continues while `queue` holds fewer than `maxPopCount` items.
// Once a bucket has been processed and `queue` is non-empty, lower buckets are
// not visited, so a single call never mixes a new priority level into a queue
// that already has content. Runs under this->mutex.
void GroupWorkQueue::Dequeue(AuList<WorkEntry_t> &queue, int maxPopCount, AuAsync::ThreadId_t id)
{
    AU_LOCK_GUARD(this->mutex);

    for (AU_ITERATE_N(i, AuAsync::kEWorkPrioCount))
    {
        auto &bucket = this->sortedWork[(int)AuAsync::kEWorkPrioMaxLegal - i];

        auto cursor = bucket.begin();
        while ((cursor != bucket.end()) && (queue.size() < maxPopCount))
        {
            const bool bForAnyone = (cursor->first == Async::kThreadIdAny);
            const bool bForCaller = (cursor->first == id);

            if (!bForAnyone && !bForCaller)
            {
                // Addressed to some other worker; leave it in place.
                cursor++;
                continue;
            }

            queue.push_back(*cursor);
            cursor = bucket.erase(cursor);
        }

        if (queue.size())
        {
            break;
        }
    }
}
// TODO: rewrite queues
bool ThreadPool::PollInternal(bool block, AuUInt32 &uCount)
{
@ -404,78 +433,12 @@ namespace Aurora::Async
//state->pendingWorkItems.clear();
auto magic = CtxPollPush();
{
AU_LOCK_GUARD(group->cvWorkMutex);
AU_LOCK_GUARD(state->cvWorkMutex);
// TODO: reimplement this
// this is stupid and gross
if (group->workQueue.size() > group->workers.size()*3)
{
if (!group->sorted)
{
auto cpy = group->workQueue;
std::sort(group->workQueue.begin(), group->workQueue.end(), [&](const WorkEntry_t &a, const WorkEntry_t &b)
{
if (a.second->GetPrio() != b.second->GetPrio())
return a.second->GetPrio() > b.second->GetPrio();
AuUInt32 ia {}, ib {};
for (; ia < cpy.size(); ia++)
if (cpy[ia].second == a.second)
break;
for (; ib < cpy.size(); ib++)
if (cpy[ib].second == b.second)
break;
return ia < ib;
});
group->sorted = true;
group->dirty = 0;
}
}
do
{
// Dequeue tasks the current thread runner could dispatch
// Noting that `multipopCount` determines how aggressive threads are in dequeuing work
// It's probable `multipopCount` will equal 1 for your use case
//
// Only increment when you know tasks within a group queue will not depend on one another
// *and* tasks require a small amount of execution time
//
// This could be potentially useful for an event dispatcher whereby you're dispatching
// hundreds of items per second, across a thread or two, knowing dequeuing one instead of all
// is a waste of CPU cycles.
//
// Remember, incrementing `multipopCount` is potentially dangerous the second you have local
// thread group waits
for (auto itr = group->workQueue.begin();
((itr != group->workQueue.end()) &&
(state->pendingWorkItems.size() < state->multipopCount));
)
{
// TODO: catch low memory condition
if (itr->first == Async::kThreadIdAny)
{
state->pendingWorkItems.push_back(*itr);
itr = group->workQueue.erase(itr);
continue;
}
if ((itr->first != Async::kThreadIdAny) && (itr->first == state->id.second))
{
state->pendingWorkItems.push_back(*itr);
itr = group->workQueue.erase(itr);
continue;
}
itr++;
}
group->workQueue.Dequeue(state->pendingWorkItems, state->multipopCount, state->id.second);
// Consider blocking for more work
if (!block)
@ -499,7 +462,7 @@ namespace Aurora::Async
break;
}
group->cvVariable->WaitForSignal();
state->cvVariable->WaitForSignal();
if (this->shuttingdown_)
{
@ -523,15 +486,25 @@ namespace Aurora::Async
} while (state->pendingWorkItems.empty() && block);
if (group->workQueue.empty())
if (group->workQueue.IsEmpty(state->id))
{
group->eventLs->Reset();
state->eventLs->Reset();
}
}
if (state->pendingWorkItems.empty())
{
CtxPollReturn(state, magic, false);
if (InRunnerMode())
{
auto queue = ToKernelWorkQueue();
if ((this->tasksRunning_ == 0) &&
(!queue || queue->GetSourceCount() <= 1))
{
Shutdown();
}
}
return false;
}
@ -572,38 +545,6 @@ namespace Aurora::Async
// Set the last frame time for a watchdog later down the line
state->lastFrameTime = Time::CurrentClockMS();
if (itr->second->GetPrio() < 0.25)
{
group->sorted = false;
if (lowPrioCont)
{
itr++;
continue;
}
if (!lowPrioContCached)
{
AU_LOCK_GUARD(group->cvWorkMutex);
{
for (const auto &[pendingWorkA, pendingWorkB] : group->workQueue)
{
if (pendingWorkB->GetPrio() > .5)
{
lowPrioCont = true;
break;
}
}
}
lowPrioContCached = true;
if (lowPrioCont)
{
itr++;
continue;
}
}
}
// Dispatch
auto oops = itr->second;
@ -635,15 +576,19 @@ namespace Aurora::Async
// Return popped work back to the groups work pool when our -pump loops were preempted
if (state->pendingWorkItems.size())
{
AU_LOCK_GUARD(group->cvWorkMutex);
// TODO: low memory condition slow path
group->workQueue.insert(group->workQueue.end(), state->pendingWorkItems.begin(), state->pendingWorkItems.end());
group->eventLs->Set();
AU_LOCK_GUARD(state->cvWorkMutex);
for (const auto &item : state->pendingWorkItems)
{
group->workQueue.AddWorkEntry(item);
}
state->pendingWorkItems.clear();
state->cvVariable->Broadcast();
state->eventLs->Set();
state->pendingWorkItems.clear();
}
CtxPollReturn(state, magic, true);
// Account for
// while (AuAsync.GetCurrentPool()->runForever());
@ -741,12 +686,14 @@ namespace Aurora::Async
{
for (auto &[id, worker] : group->workers)
{
if (group->cvWorkMutex && group->cvVariable)
auto pState = group->GetThreadByIndex(worker);
if (pState->cvWorkMutex && pState->cvVariable)
{
bool bLocked = group->cvWorkMutex->TryLock();
bool bLocked = pState->cvWorkMutex->TryLock();
worker->shuttingdown = true;
group->cvVariable->Broadcast();
if (bLocked) group->cvWorkMutex->Unlock();
pState->cvVariable->Broadcast();
if (bLocked) pState->cvWorkMutex->Unlock();
}
else
{
@ -940,7 +887,7 @@ namespace Aurora::Async
return {};
}
return a->parent.lock()->asyncLoopSourceShared;
return a->asyncLoopSourceShared;
}
void ThreadPool::SyncAllSafe()
@ -1146,8 +1093,6 @@ namespace Aurora::Async
}
threadState->parent = group;
threadState->running = AuThreadPrimitives::EventUnique(true, false, true);
threadState->syncSema = AuThreadPrimitives::SemaphoreUnique(0);
threadState->id = workerId;
threadState->asyncLoop = AuMakeShared<AsyncLoop>();
if (!threadState->asyncLoop)
@ -1156,6 +1101,15 @@ namespace Aurora::Async
return {};
}
threadState->eventLs = AuLoop::NewLSAsync();
if (!threadState->eventLs)
{
SysPushErrorMemory();
return {};
}
threadState->asyncLoopSourceShared = threadState->eventLs;
threadState->asyncLoop->pParent = threadState.get();
threadState->rateLimiter.SetNextStep(1'000'000); // 1MS in nanoseconds
threadState->runMode = ERunMode::eEfficient;
@ -1172,19 +1126,7 @@ namespace Aurora::Async
return {};
}
if (!threadState->syncSema)
{
SysPushErrorMemory();
return {};
}
if (!threadState->syncSema)
{
SysPushErrorMemory();
return {};
}
threadState->asyncLoop->SourceAdd(group->eventLs);
threadState->asyncLoop->SourceAdd(threadState->eventLs);
if (!create)
{
@ -1222,7 +1164,8 @@ namespace Aurora::Async
tlsWorkerId = WorkerPId_t(AuSharedFromThis(), workerId);
}
group->workers.insert(AuMakePair(workerId.second, threadState));
group->AddWorker(workerId.second, threadState);
return true;
}
@ -1237,7 +1180,7 @@ namespace Aurora::Async
}
auto &semaphore = self->syncSema;
auto unsafeSemaphore = semaphore.get();
auto unsafeSemaphore = semaphore.AsPointer();
bool failed {};
auto work = AuMakeShared<AsyncFuncRunnable>(
@ -1276,7 +1219,7 @@ namespace Aurora::Async
Run(workerId, work);
return WaitFor(workerId, AuUnsafeRaiiToShared(semaphore), ms) && !failed;
return WaitFor(workerId, AuUnsafeRaiiToShared(semaphore.AsPointer()), ms) && !failed;
}
void ThreadPool::Entrypoint(WorkerId_t id)
@ -1322,8 +1265,7 @@ namespace Aurora::Async
return;
}
state->eventLs->Set();
state->cvVariable->Broadcast();
state->BroadCast();
{
if (AuExchange(jobWorker->bAlreadyDoingExitTick, true))
@ -1486,7 +1428,7 @@ namespace Aurora::Async
return {};
}
return state->workers[worker.second];
return state->GetThreadByIndex(worker.second);
}
AuSPtr<ThreadState> ThreadPool::GetThreadStateNoWarn()
@ -1512,7 +1454,7 @@ namespace Aurora::Async
return {};
}
return state->workers[worker.second];
return state->GetThreadByIndex(worker.second);
}
AuSPtr<ThreadState> ThreadPool::GetThreadHandle(WorkerId_t id)
@ -1524,14 +1466,8 @@ namespace Aurora::Async
{
return {};
}
AuSPtr<ThreadState> *ret;
if (!AuTryFind(group->workers, id.second, ret))
{
return {};
}
return *ret;
return group->GetThreadByIndex(id.second);
}
AuList<AuSPtr<ThreadState>> ThreadPool::GetThreadHandles(WorkerId_t id)
@ -1547,12 +1483,10 @@ namespace Aurora::Async
AuList<AuSPtr<ThreadState>> ret;
if (id.second != Async::kThreadIdAny)
{
AuSPtr<ThreadState> *ptr;
if (!AuTryFind(group->workers, id.second, ptr))
if (auto pPtr = group->GetThreadByIndex(id.second))
{
return {};
ret.push_back(pPtr);
}
ret.push_back(*ptr);
}
else
{
@ -1562,7 +1496,6 @@ namespace Aurora::Async
}
}
return ret;
}

View File

@ -28,18 +28,25 @@ namespace Aurora::Async
struct ThreadState
{
ThreadState() : running(true, false, true),
cvVariable(AuUnsafeRaiiToShared(cvWorkMutex.AsPointer()))
{
}
// :vomit:
WorkerId_t id;
AuUInt8 multipopCount = 1;
AuUInt8 multipopCount = 32;
AuUInt32 lastFrameTime {};
AuThreads::ThreadShared_t threadObject;
AuWPtr<GroupState> parent;
AuThreadPrimitives::SemaphoreUnique_t syncSema;
AuThreadPrimitives::Semaphore syncSema;
AuList<AuSPtr<AuThreads::IThreadFeature>> features;
bool rejecting {};
bool exiting {};
bool shuttingdown {};
bool exitingflag2 {};
AuThreadPrimitives::EventUnique_t running;
AuThreadPrimitives::Event running;
//bool running;
AuList<AsyncAppWaitSourceRequest> loopSources;
AuList<WorkEntry_t> pendingWorkItems;
@ -54,5 +61,12 @@ namespace Aurora::Async
//
AuThreadPrimitives::SpinLock externalFencesLock;
AuList<AuThreading::IWaitable *> externalFences;
//
AuThreadPrimitives::ConditionMutex cvWorkMutex;
AuThreadPrimitives::ConditionVariable cvVariable;
AuSPtr<AuLoop::ILSEvent> eventLs;
AuSPtr<AuLoop::ILoopSource> asyncLoopSourceShared;
};
}

View File

@ -58,7 +58,7 @@ namespace Aurora::Async
if (AuThreads::GetThread() != this->pParent->threadObject.get())
{
this->pParent->parent.lock()->cvVariable->Broadcast();
this->pParent->cvVariable->Broadcast();
}
}

View File

@ -17,13 +17,9 @@ namespace Aurora::Async
const WorkerPId_t &worker,
const AuSPtr<IWorkItemHandler> &task,
bool bSupportsBlocking) :
worker_(worker), task_(task), owner_(owner)
worker_(worker), task_(task), owner_(owner),
finishedEvent_(false, true, true)
{
if (bSupportsBlocking)
{
this->finishedEvent_ = AuThreadPrimitives::EventUnique(false, true, true);
SysAssert(this->finishedEvent_);
}
}
WorkItem::~WorkItem()
@ -227,14 +223,14 @@ namespace Aurora::Async
SendOff();
}
float WorkItem::GetPrio()
EWorkPrio WorkItem::GetPrio()
{
return prio_;
return this->prio_;
}
void WorkItem::SetPrio(float val)
void WorkItem::SetPrio(EWorkPrio prio)
{
prio_ = val;
this->prio_ = prio;
}
void WorkItem::CancelAsync()
@ -390,7 +386,7 @@ namespace Aurora::Async
bool WorkItem::BlockUntilComplete()
{
if (!this->finishedEvent_) return false;
return this->owner_->WaitFor(this->worker_, AuUnsafeRaiiToShared(this->finishedEvent_), 0);
return this->owner_->WaitFor(this->worker_, AuUnsafeRaiiToShared(this->finishedEvent_.AsPointer()), 0);
}
bool WorkItem::HasFinished()

View File

@ -47,8 +47,8 @@ namespace Aurora::Async
void *GetPrivateData() override;
AuOptional<void *> ToWorkResultT() override;
float GetPrio() override;
void SetPrio(float val) override;
EWorkPrio GetPrio() override;
void SetPrio(EWorkPrio prio) override;
private:
void RunAsyncLocked();
@ -61,12 +61,12 @@ namespace Aurora::Async
AuSPtr<IWorkItemHandler> task_;
WorkerPId_t worker_;
float prio_ = 0.5f;
EWorkPrio prio_ = EWorkPrio::eNormalPrio;
AuList<AuSPtr<IWorkItem>> waitOn_;
AuList<AuSPtr<IWorkItem>> waiters_;
AuThreadPrimitives::SpinLock lock;
AuThreadPrimitives::SpinLock lock2;
AuThreadPrimitives::EventUnique_t finishedEvent_;
AuThreadPrimitives::CriticalSection lock;
AuThreadPrimitives::CriticalSection lock2;
AuThreadPrimitives::Event finishedEvent_;
bool finished {};
bool failed {};