[*] Fix regressions in work groups
This commit is contained in:
parent
cf3d4fc001
commit
cb751d0286
@ -80,7 +80,7 @@ namespace Aurora::Async
|
||||
continue;
|
||||
}
|
||||
|
||||
pWorker->sync.SetEvent();
|
||||
pWorker->sync.SetEvent(true, true);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -13,7 +13,7 @@
|
||||
|
||||
namespace Aurora::Async
|
||||
{
|
||||
void GroupWorkQueue::AddWorkEntry(ThreadState *pState, WorkEntry_t entry)
|
||||
void GroupWorkQueue::AddWorkEntry(WorkEntry_t entry)
|
||||
{
|
||||
AU_DEBUG_MEMCRUNCH;
|
||||
|
||||
@ -22,14 +22,6 @@ namespace Aurora::Async
|
||||
|
||||
AU_LOCK_GUARD(this->mutex);
|
||||
this->sortedWork[prio].push_back(entry);
|
||||
|
||||
if (entry.first != kThreadIdAny)
|
||||
{
|
||||
if (auto pThat = pState->parent.lock()->GetThreadByIndex(entry.first))
|
||||
{
|
||||
AuAtomicAdd(&pThat->sync.cvHasWork, 1u);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool GroupWorkQueue::IsEmpty()
|
||||
|
@ -24,7 +24,7 @@ namespace Aurora::Async
|
||||
bool IsEmpty();
|
||||
bool IsEmpty(ThreadPool *pPool, AuWorkerId_t id);
|
||||
|
||||
void AddWorkEntry(ThreadState *pState, WorkEntry_t entry);
|
||||
void AddWorkEntry(WorkEntry_t entry);
|
||||
|
||||
void Dequeue(AuList<WorkEntry_t> &queue, int maxPopCount, AuAsync::ThreadId_t idd);
|
||||
};
|
||||
|
@ -54,7 +54,7 @@ namespace Aurora::Async
|
||||
return true;
|
||||
}
|
||||
|
||||
void ThreadStateSync::SetEvent(bool bBoth)
|
||||
void ThreadStateSync::SetEvent(bool bBoth, bool bHasWork)
|
||||
{
|
||||
if (auto pEvent = this->eventLs)
|
||||
{
|
||||
@ -64,6 +64,11 @@ namespace Aurora::Async
|
||||
}
|
||||
}
|
||||
|
||||
if (bHasWork)
|
||||
{
|
||||
AuAtomicAdd(&this->cvHasWork, 1u);
|
||||
}
|
||||
|
||||
if (bBoth)
|
||||
{
|
||||
this->cvWorkMutex->Lock();
|
||||
|
@ -33,7 +33,7 @@ namespace Aurora::Async
|
||||
AuAUInt32 cvHasWork {};
|
||||
AuSPtr<AuLoop::ILSEvent> eventLs;
|
||||
|
||||
void SetEvent(bool bBoth = true);
|
||||
void SetEvent(bool bBoth = true, bool bHasWork = false);
|
||||
bool Init();
|
||||
void UpdateCVState(ThreadState *pState);
|
||||
};
|
||||
|
@ -137,10 +137,13 @@ namespace Aurora::Async
|
||||
|
||||
void ThreadPool::Run(WorkerId_t target, AuSPtr<IAsyncRunnable> runnable, bool bIncrement)
|
||||
{
|
||||
auto state = GetGroup(target.first);
|
||||
SysAssert(static_cast<bool>(state), "couldn't dispatch a task to an offline group");
|
||||
AuSPtr<ThreadState> pWorker;
|
||||
auto pGroupState = GetGroup(target.first);
|
||||
SysAssert(static_cast<bool>(pGroupState), "couldn't dispatch a task to an offline group");
|
||||
|
||||
auto pWorker = state->GetThreadByIndex(target.second);
|
||||
if (target.second != Async::kThreadIdAny)
|
||||
{
|
||||
pWorker = pGroupState->GetThreadByIndex(target.second);
|
||||
if (!pWorker)
|
||||
{
|
||||
runnable->CancelAsync();
|
||||
@ -152,21 +155,22 @@ namespace Aurora::Async
|
||||
runnable->CancelAsync();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (bIncrement)
|
||||
{
|
||||
AuAtomicAdd(&this->uAtomicCounter, 1u);
|
||||
}
|
||||
|
||||
state->workQueue.AddWorkEntry(pWorker.get(), AuMakePair(target.second, runnable));
|
||||
pGroupState->workQueue.AddWorkEntry(AuMakePair(target.second, runnable));
|
||||
|
||||
if (target.second == Async::kThreadIdAny)
|
||||
{
|
||||
state->SignalAll();
|
||||
pGroupState->SignalAll();
|
||||
}
|
||||
else
|
||||
{
|
||||
pWorker->sync.SetEvent();
|
||||
pWorker->sync.SetEvent(true, true);
|
||||
}
|
||||
}
|
||||
|
||||
@ -441,7 +445,7 @@ namespace Aurora::Async
|
||||
|
||||
for (const auto &item : state->pendingWorkItems)
|
||||
{
|
||||
group->workQueue.AddWorkEntry(state.get(), item);
|
||||
group->workQueue.AddWorkEntry(item);
|
||||
}
|
||||
|
||||
state->pendingWorkItems.clear();
|
||||
@ -1102,6 +1106,7 @@ namespace Aurora::Async
|
||||
return false;
|
||||
}
|
||||
|
||||
pGroup->group = workerId.first;
|
||||
this->threadGroups_[workerId.first] = pGroup;
|
||||
}
|
||||
}
|
||||
|
@ -26,6 +26,8 @@ namespace Aurora::Async
|
||||
worker_(worker), task_(task), owner_(owner),
|
||||
finishedEvent_(false, true, true)
|
||||
{
|
||||
SysAssert(owner, "Unable to dispatch on an unspecified pool");
|
||||
|
||||
this->uShutdownCookie = owner->uAtomicShutdownCookie;
|
||||
|
||||
if (auto pWorker = this->GetState())
|
||||
|
Loading…
Reference in New Issue
Block a user