[+] AuAsync::ThreadPool::Wakeup(WorkerId_t)

This commit is contained in:
Reece Wilson 2024-05-03 11:32:31 +01:00
parent 373ccc3660
commit a35c1f165a
8 changed files with 37 additions and 11 deletions

View File

@ -93,13 +93,19 @@ namespace Aurora::Async
static AuSPtr<IO::Net::INetWorker> GetSelfIONetWorker(); static AuSPtr<IO::Net::INetWorker> GetSelfIONetWorker();
// Synchronization // Synchronization
// Legacy Barrier: blocks a/a group of threads
// Note: syncing to yourself will nullify requireSignal to prevent deadlock conditions // Note: syncing to yourself will nullify requireSignal to prevent deadlock conditions
virtual bool Sync(WorkerId_t workerId, virtual bool Sync(WorkerId_t workerId,
AuUInt32 timeoutMs = 0, AuUInt32 timeoutMs = 0,
bool bRequireSignal = false) = 0; bool bRequireSignal = false) = 0;
// Legacy Barrier: wakes up a/a group of ::Sync()'d threads
virtual void Signal(WorkerId_t workerId) = 0; virtual void Signal(WorkerId_t workerId) = 0;
// Breaks ::RunOnce()
virtual void Wakeup(WorkerId_t workerId) = 0;
virtual AuSPtr<IO::Loop::ILoopSource> WorkerToLoopSource(WorkerId_t id) = 0; virtual AuSPtr<IO::Loop::ILoopSource> WorkerToLoopSource(WorkerId_t id) = 0;
virtual void SyncAllSafe() = 0; virtual void SyncAllSafe() = 0;

View File

@ -208,6 +208,11 @@ namespace Aurora::Async
ThreadPool::Signal(workerId); ThreadPool::Signal(workerId);
} }
void AsyncApp::Wakeup(WorkerId_t workerId)
{
ThreadPool::Wakeup(workerId);
}
void AsyncApp::SyncAllSafe() void AsyncApp::SyncAllSafe()
{ {
ThreadPool::SyncAllSafe(); ThreadPool::SyncAllSafe();

View File

@ -38,6 +38,7 @@ namespace Aurora::Async
bool Sync(WorkerId_t workerId, AuUInt32 timeoutMs = 0, bool requireSignal = false) override; bool Sync(WorkerId_t workerId, AuUInt32 timeoutMs = 0, bool requireSignal = false) override;
AuSPtr<AuLoop::ILoopSource> WorkerToLoopSource(WorkerId_t id) override; AuSPtr<AuLoop::ILoopSource> WorkerToLoopSource(WorkerId_t id) override;
void Signal(WorkerId_t workerId) override; void Signal(WorkerId_t workerId) override;
void Wakeup(WorkerId_t workerId) override;
void SyncAllSafe() override; void SyncAllSafe() override;
void AddFeature(WorkerId_t id, AuSPtr<Threading::Threads::IThreadFeature> feature, bool async) override; void AddFeature(WorkerId_t id, AuSPtr<Threading::Threads::IThreadFeature> feature, bool async) override;
void AssertInThreadGroup(ThreadGroup_t group) override; void AssertInThreadGroup(ThreadGroup_t group) override;

View File

@ -69,7 +69,7 @@ namespace Aurora::Async
} }
} }
void GroupState::SignalAll() void GroupState::SignalAll(bool bHasWork)
{ {
AU_LOCK_GUARD(this->workersMutex); AU_LOCK_GUARD(this->workersMutex);
@ -80,7 +80,7 @@ namespace Aurora::Async
continue; continue;
} }
pWorker->sync.SetEvent(true, true); pWorker->sync.SetEvent(true, bHasWork);
} }
} }

View File

@ -32,7 +32,7 @@ namespace Aurora::Async
bool Init(); bool Init();
void SignalAll(); void SignalAll(bool bHasWork = true);
void Decommit(ThreadId_t id); void Decommit(ThreadId_t id);
bool AddWorker(ThreadId_t id, AuSPtr<ThreadState> pState); bool AddWorker(ThreadId_t id, AuSPtr<ThreadState> pState);

View File

@ -74,6 +74,11 @@ namespace Aurora::Async
void ThreadStateSync::SetEvent(bool bBoth, bool bHasWork) void ThreadStateSync::SetEvent(bool bBoth, bool bHasWork)
{ {
if (bHasWork)
{
AuAtomicAdd(&this->cvHasWork, 1u);
}
if (auto pEvent = this->eventLs) if (auto pEvent = this->eventLs)
{ {
if (AuAtomicTestAndSet(&this->cvLSActive, 0u) == 0) if (AuAtomicTestAndSet(&this->cvLSActive, 0u) == 0)
@ -82,11 +87,6 @@ namespace Aurora::Async
} }
} }
if (bHasWork)
{
AuAtomicAdd(&this->cvHasWork, 1u);
}
if (bBoth) if (bBoth)
{ {
this->cvWorkMutex->Lock(); this->cvWorkMutex->Lock();

View File

@ -894,7 +894,7 @@ namespace Aurora::Async
return true; return true;
} }
void ThreadPool::Signal(WorkerId_t workerId) void ThreadPool::Signal(WorkerId_t workerId)
{ {
auto group = GetGroup(workerId.first); auto group = GetGroup(workerId.first);
@ -906,9 +906,22 @@ namespace Aurora::Async
jobWorker.second->running->Set(); jobWorker.second->running->Set();
} }
} }
else else if (auto pThread = GetThreadHandle(workerId))
{ {
GetThreadHandle(workerId)->running->Set(); pThread->running->Set();
}
}
void ThreadPool::Wakeup(WorkerId_t workerId)
{
auto group = GetGroup(workerId.first);
if (workerId.second == Async::kThreadIdAny)
{
group->SignalAll(false);
}
else if (auto pThread = GetThreadHandle(workerId))
{
pThread->sync.SetEvent(true, false);
} }
} }

View File

@ -70,6 +70,7 @@ namespace Aurora::Async
virtual bool Sync(WorkerId_t workerId, AuUInt32 timeoutMs, bool requireSignal) override; virtual bool Sync(WorkerId_t workerId, AuUInt32 timeoutMs, bool requireSignal) override;
virtual void Signal(WorkerId_t workerId) override; virtual void Signal(WorkerId_t workerId) override;
virtual void Wakeup(WorkerId_t workerId) override;
virtual AuSPtr<AuLoop::ILoopSource> WorkerToLoopSource(WorkerId_t id) override; virtual AuSPtr<AuLoop::ILoopSource> WorkerToLoopSource(WorkerId_t id) override;
virtual void SyncAllSafe() override; virtual void SyncAllSafe() override;