[*] 560ca079 cont perf improvement

This commit is contained in:
Reece Wilson 2023-09-05 13:54:05 +01:00
parent 560ca079a4
commit d3587dbf08
5 changed files with 48 additions and 12 deletions

View File

@ -45,6 +45,7 @@ namespace Aurora::Async
for (const auto &worker : this->workers)
{
AuAtomicAdd(&worker.second->cvHasWork, 1u);
worker.second->cvVariable->Broadcast();
worker.second->eventLs->Set();
}

View File

@ -10,6 +10,7 @@
namespace Aurora::Async
{
struct ThreadPool;
struct GroupWorkQueue
{
AuThreadPrimitives::Mutex mutex;
@ -17,9 +18,9 @@ namespace Aurora::Async
AuList<WorkEntry_t> sortedWork[AuAsync::kEWorkPrioCount];
bool IsEmpty();
bool IsEmpty(AuWorkerId_t id);
bool IsEmpty(ThreadPool *pPool, AuWorkerId_t id);
void AddWorkEntry(WorkEntry_t entry);
void AddWorkEntry(ThreadState *pState, WorkEntry_t entry);
void Dequeue(AuList<WorkEntry_t> &queue, int maxPopCount, AuAsync::ThreadId_t idd);
};

View File

@ -143,7 +143,7 @@ namespace Aurora::Async
AuAtomicAdd(&this->uAtomicCounter, 1u);
}
state->workQueue.AddWorkEntry(AuMakePair(target.second, runnable));
state->workQueue.AddWorkEntry(pWorker.get(), AuMakePair(target.second, runnable));
if (target.second == Async::kThreadIdAny)
{
@ -341,14 +341,21 @@ namespace Aurora::Async
}
void GroupWorkQueue::AddWorkEntry(WorkEntry_t entry)
void GroupWorkQueue::AddWorkEntry(ThreadState *pState, WorkEntry_t entry)
{
AU_LOCK_GUARD(this->mutex);
auto prio = (int)entry.second->GetPrio();
SysAssert(prio < AuAsync::kEWorkPrioCount, "Invalid PRIO");
AU_LOCK_GUARD(this->mutex);
this->sortedWork[prio].push_back(entry);
if (entry.first != kThreadIdAny)
{
if (auto pThat = pState->parent.lock()->GetThreadByIndex(entry.first))
{
AuAtomicAdd(&pThat->cvHasWork, 1u);
}
}
}
bool GroupWorkQueue::IsEmpty()
@ -365,8 +372,17 @@ namespace Aurora::Async
return true;
}
bool GroupWorkQueue::IsEmpty(AuWorkerId_t id)
bool GroupWorkQueue::IsEmpty(ThreadPool *pPool, AuWorkerId_t id)
{
#if 1
auto pHandle = pPool->GetThreadHandle(id);
if (!pHandle)
{
return false;
}
return !AuAtomicLoad(&pHandle->cvHasWork);
#else
AU_LOCK_GUARD(this->mutex);
for (AU_ITERATE_N(i, AuAsync::kEWorkPrioCount))
@ -379,8 +395,9 @@ namespace Aurora::Async
}
}
}
return true;
#endif
}
void GroupWorkQueue::Dequeue(AuList<WorkEntry_t> &queue, int maxPopCount, AuAsync::ThreadId_t id)
@ -483,7 +500,23 @@ namespace Aurora::Async
}
while (state->pendingWorkItems.empty() && block);
if (group->workQueue.IsEmpty(state->id))
{
auto uState = state->cvHasWork;
while (uState &&
AuAtomicCompareExchange(&state->cvHasWork, uState - 1, uState) != uState)
{
uState = state->cvHasWork;
if (uState == 0)
{
break;
}
}
}
if (!block && // quick hack: is worthy of io reset by virtue of having polled externally (most likely)?
group->workQueue.IsEmpty(this, state->id))
{
state->eventLs->Reset();
AuAtomicStore(&state->cvLSActive, 0u);
@ -578,7 +611,7 @@ namespace Aurora::Async
for (const auto &item : state->pendingWorkItems)
{
group->workQueue.AddWorkEntry(item);
group->workQueue.AddWorkEntry(state.get(), item);
}
state->pendingWorkItems.clear();

View File

@ -122,6 +122,7 @@ namespace Aurora::Async
AuThreads::TLSVariable<WorkerWPId_t> tlsWorkerId;
AuSPtr<ThreadState> GetThreadHandle(WorkerId_t id);
private:
// TODO: BarrierMultiple
bool Barrier(WorkerId_t, AuUInt32 ms, bool requireSignal, bool drop);
@ -137,7 +138,6 @@ namespace Aurora::Async
AuSPtr<GroupState> GetGroup(ThreadGroup_t type);
AuSPtr<ThreadState> GetThreadState();
AuSPtr<ThreadState> GetThreadStateNoWarn();
AuSPtr<ThreadState> GetThreadHandle(WorkerId_t id);
AuList<AuSPtr<ThreadState>> GetThreadHandles(WorkerId_t id);
AuSPtr<GroupState> threadGroups_[255];

View File

@ -70,6 +70,7 @@ namespace Aurora::Async
AuThreadPrimitives::ConditionVariable cvVariable;
AuAUInt32 cvSleepCount {};
AuAUInt32 cvLSActive {};
AuAUInt32 cvHasWork {};
AuSPtr<AuLoop::ILSEvent> eventLs;
AuSPtr<AuLoop::ILoopSource> asyncLoopSourceShared;
};