AuroraRuntime/Source/Async/AuGroupWorkQueue.cpp

111 lines
2.7 KiB
C++

/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuGroupWorkQueue.cpp
Date: 2023-11-04
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "AuGroupWorkQueue.hpp"
#include "Async.hpp"
#include "ThreadPool.hpp"
namespace Aurora::Async
{
void GroupWorkQueue::AddWorkEntry(ThreadState *pState, WorkEntry_t entry)
{
AU_DEBUG_MEMCRUNCH;
auto prio = (int)entry.second->GetPrio();
SysAssert(prio < AuAsync::kEWorkPrioCount, "Invalid PRIO");
AU_LOCK_GUARD(this->mutex);
this->sortedWork[prio].push_back(entry);
if (entry.first != kThreadIdAny)
{
if (auto pThat = pState->parent.lock()->GetThreadByIndex(entry.first))
{
AuAtomicAdd(&pThat->sync.cvHasWork, 1u);
}
}
}
bool GroupWorkQueue::IsEmpty()
{
AU_LOCK_GUARD(this->mutex);
for (AU_ITERATE_N(i, AuAsync::kEWorkPrioCount))
{
if (this->sortedWork[i].size())
{
return false;
}
}
return true;
}
bool GroupWorkQueue::IsEmpty(ThreadPool *pPool, AuWorkerId_t id)
{
#if 1
auto pHandle = pPool->GetThreadHandle(id);
if (!pHandle)
{
return false;
}
return !AuAtomicLoad(&pHandle->sync.cvHasWork);
#else
AU_LOCK_GUARD(this->mutex);
for (AU_ITERATE_N(i, AuAsync::kEWorkPrioCount))
{
for (const auto &[srcId, pA] : this->sortedWork[i])
{
if (id == srcId)
{
return false;
}
}
}
return true;
#endif
}
void GroupWorkQueue::Dequeue(AuList<WorkEntry_t> &queue, int maxPopCount, AuAsync::ThreadId_t id)
{
AU_LOCK_GUARD(this->mutex);
for (AU_ITERATE_N(i, AuAsync::kEWorkPrioCount))
{
auto &group = this->sortedWork[(int)AuAsync::kEWorkPrioMaxLegal - i];
for (auto itr = group.begin(); ((itr != group.end()) && (queue.size() < maxPopCount)); )
{
if (itr->first == Async::kThreadIdAny)
{
queue.push_back(*itr);
itr = group.erase(itr);
continue;
}
if ((itr->first != Async::kThreadIdAny) &&
(itr->first == id))
{
queue.push_back(*itr);
itr = group.erase(itr);
continue;
}
itr++;
}
if (queue.size())
{
break;
}
}
}
}