[*] Begin cleaning up AuAsync (2/?)

6af9940b, ???
This commit is contained in:
Reece Wilson 2023-11-11 10:11:09 +00:00
parent 73c5904d97
commit e037fc214a
28 changed files with 650 additions and 507 deletions

View File

@ -15,19 +15,6 @@ namespace Aurora::IO::Loop
namespace Aurora::Async
{
AUE_DEFINE(ERunMode,
(
eLowLatencyYield, // uses high perf cond var + yield + trylock
eLowLatencyFreqKernel, // uses high perf cond var + timeout(freqency)
eEfficient // delegates sleep to the kernel once kernel objects are scheduled
));
struct RunMode
{
ERunMode mode;
AuUInt16 freqMsTick;
};
struct IThreadPool
{
// Spawning
@ -122,9 +109,6 @@ namespace Aurora::Async
// AuIO overlapped-IO glue
virtual AuSPtr<IO::Loop::ILoopQueue> ToKernelWorkQueue() = 0;
virtual AuSPtr<IO::Loop::ILoopQueue> ToKernelWorkQueue(WorkerId_t workerId) = 0;
virtual void UpdateWorkMode(WorkerId_t workerId, RunMode mode) = 0;
virtual ERunMode GetCurrentThreadRunMode() = 0;
virtual ERunMode GetThreadRunMode(WorkerId_t workerId) = 0;
//
virtual AuSPtr<Aurora::Threading::IWaitable> GetShutdownEvent() = 0;

View File

@ -7,7 +7,7 @@
***/
#include <Source/RuntimeInternal.hpp>
#include "Async.hpp"
#include "Schedular.hpp"
#include "AuSchedular.hpp"
#include "AsyncApp.hpp"
namespace Aurora::Async

View File

@ -9,7 +9,7 @@
#include "Async.hpp"
#include "AsyncApp.hpp"
#include "WorkItem.hpp"
#include "Schedular.hpp"
#include "AuSchedular.hpp"
#include <Source/Console/Commands/Commands.hpp>
@ -237,19 +237,4 @@ namespace Aurora::Async
{
return ThreadPool::ToKernelWorkQueue(workerId);
}
void AsyncApp::UpdateWorkMode(WorkerId_t workerId, RunMode mode)
{
ThreadPool::UpdateWorkMode(workerId, mode);
}
ERunMode AsyncApp::GetCurrentThreadRunMode()
{
return ThreadPool::GetCurrentThreadRunMode();
}
ERunMode AsyncApp::GetThreadRunMode(WorkerId_t workerId)
{
return ThreadPool::GetThreadRunMode(workerId);
}
}

View File

@ -45,9 +45,6 @@ namespace Aurora::Async
AuSPtr<AuLoop::ILoopQueue> ToKernelWorkQueue() override;
AuSPtr<AuLoop::ILoopQueue> ToKernelWorkQueue(WorkerId_t workerId) override;
void UpdateWorkMode(WorkerId_t workerId, RunMode mode) override;
ERunMode GetCurrentThreadRunMode() override;
ERunMode GetThreadRunMode(WorkerId_t workerId) override;
// Main thread logic
void Start() override;

View File

@ -0,0 +1,14 @@
/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuAThreadPoolInternal.cpp
Date: 2023-11-04
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "AuAThreadPoolInternal.hpp"
namespace Aurora::Async
{
}

View File

@ -0,0 +1,13 @@
/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuGroupWorkQueue.hpp
Date: 2023-11-04
Author: Reece
***/
#pragma once
namespace Aurora::Async
{
}

View File

@ -0,0 +1,47 @@
/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuAsyncKeepGroupAlive.cpp
Date: 2023-11-04
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "AuAsyncKeepGroupAlive.hpp"
#include "ThreadPool.hpp"
#include "ThreadState.hpp"
#include "GroupState.hpp"
namespace Aurora::Async
{
struct KeepGroupAlive
{
KeepGroupAlive(AuSPtr<AuAsync::IThreadPool> pPool) :
pPool(AuStaticCast<AuAsync::ThreadPool>(pPool))
{
AuAtomicAdd(&this->pPool->uAtomicCounter, 1u);
}
~KeepGroupAlive()
{
auto uNow = AuAtomicSub(&this->pPool->uAtomicCounter, 1u);
if (uNow == 0)
{
for (const auto &pState : this->pPool->threadGroups_)
{
if (pState)
{
pState->SignalAll();
}
}
}
}
AuSPtr<AuAsync::ThreadPool> pPool;
};
AUKN_SYM AuSPtr<void> KeepThreadPoolAlive(AuSPtr<AuAsync::IThreadPool> pPool)
{
return AuMakeSharedThrow<KeepGroupAlive>(pPool);
}
}

View File

@ -0,0 +1,13 @@
/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuAsyncKeepGroupAlive.hpp
Date: 2023-11-04
Author: Reece
***/
#pragma once
namespace Aurora::Async
{
}

View File

@ -0,0 +1,14 @@
/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuAsyncMicrocounter.cpp
Date: 2023-11-04
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "AuAsyncMicrocounter.hpp"
namespace Aurora::Async
{
}

View File

@ -0,0 +1,13 @@
/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuAsyncMicrocounter.hpp
Date: 2023-11-04
Author: Reece
***/
#pragma once
namespace Aurora::Async
{
}

View File

@ -0,0 +1,112 @@
/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuGroupWorkQueue.cpp
Date: 2023-11-04
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "AuGroupWorkQueue.hpp"
#include "ThreadState.hpp"
#include "GroupState.hpp"
#include "ThreadPool.hpp"
namespace Aurora::Async
{
void GroupWorkQueue::AddWorkEntry(ThreadState *pState, WorkEntry_t entry)
{
AU_DEBUG_MEMCRUNCH;
auto prio = (int)entry.second->GetPrio();
SysAssert(prio < AuAsync::kEWorkPrioCount, "Invalid PRIO");
AU_LOCK_GUARD(this->mutex);
this->sortedWork[prio].push_back(entry);
if (entry.first != kThreadIdAny)
{
if (auto pThat = pState->parent.lock()->GetThreadByIndex(entry.first))
{
AuAtomicAdd(&pThat->sync.cvHasWork, 1u);
}
}
}
bool GroupWorkQueue::IsEmpty()
{
AU_LOCK_GUARD(this->mutex);
for (AU_ITERATE_N(i, AuAsync::kEWorkPrioCount))
{
if (this->sortedWork[i].size())
{
return false;
}
}
return true;
}
bool GroupWorkQueue::IsEmpty(ThreadPool *pPool, AuWorkerId_t id)
{
#if 1
auto pHandle = pPool->GetThreadHandle(id);
if (!pHandle)
{
return false;
}
return !AuAtomicLoad(&pHandle->sync.cvHasWork);
#else
AU_LOCK_GUARD(this->mutex);
for (AU_ITERATE_N(i, AuAsync::kEWorkPrioCount))
{
for (const auto &[srcId, pA] : this->sortedWork[i])
{
if (id == srcId)
{
return false;
}
}
}
return true;
#endif
}
void GroupWorkQueue::Dequeue(AuList<WorkEntry_t> &queue, int maxPopCount, AuAsync::ThreadId_t id)
{
AU_LOCK_GUARD(this->mutex);
for (AU_ITERATE_N(i, AuAsync::kEWorkPrioCount))
{
auto &group = this->sortedWork[(int)AuAsync::kEWorkPrioMaxLegal - i];
for (auto itr = group.begin(); ((itr != group.end()) && (queue.size() < maxPopCount)); )
{
if (itr->first == Async::kThreadIdAny)
{
queue.push_back(*itr);
itr = group.erase(itr);
continue;
}
if ((itr->first != Async::kThreadIdAny) &&
(itr->first == id))
{
queue.push_back(*itr);
itr = group.erase(itr);
continue;
}
itr++;
}
if (queue.size())
{
break;
}
}
}
}

View File

@ -0,0 +1,31 @@
/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuGroupWorkQueue.hpp
Date: 2023-11-04
Author: Reece
***/
#pragma once
namespace Aurora::Async
{
struct IAsyncRunnable;
struct ThreadPool;
struct ThreadState;
using WorkEntry_t = AuPair<ThreadId_t, AuSPtr<IAsyncRunnable>>;
struct GroupWorkQueue
{
AuThreadPrimitives::Mutex mutex;
AuUInt32 uItems {};
AuList<WorkEntry_t> sortedWork[AuAsync::kEWorkPrioCount];
bool IsEmpty();
bool IsEmpty(ThreadPool *pPool, AuWorkerId_t id);
void AddWorkEntry(ThreadState *pState, WorkEntry_t entry);
void Dequeue(AuList<WorkEntry_t> &queue, int maxPopCount, AuAsync::ThreadId_t idd);
};
}

View File

@ -1,13 +1,13 @@
/***
Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved.
Copyright (C) 2021-2023 Jamie Reece Wilson (a/k/a "Reece"). All rights reserved.
File: Schedular.cpp
File: AuSchedular.cpp
Date: 2021-6-26
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "Async.hpp"
#include "Schedular.hpp"
#include "AuSchedular.hpp"
//#include "AsyncApp.hpp"
#include "ThreadPool.hpp"
#include <Console/Commands/Commands.hpp>

View File

@ -1,7 +1,7 @@
/***
Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: Schedular.hpp
File: AuSchedular.hpp
Date: 2021-6-26
Author: Reece
***/

View File

@ -0,0 +1,93 @@
/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuThreadState.hpp
Date: 2023-11-04
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "Async.hpp"
#include "AuThreadState.hpp"
#include <Source/IO/Loop/LSAsync.hpp>
#include "ThreadWorkerQueueShim.hpp"
namespace Aurora::Async
{
ThreadStateSync::ThreadStateSync():
cvVariable(AuUnsafeRaiiToShared(cvWorkMutex.AsPointer()))
{
}
bool ThreadStateSync::Init()
{
this->eventLs = AuLoop::NewLSAsync();
return true;
}
bool ThreadStateBase::Init()
{
if (!(this->asyncLoop = AuMakeShared<AsyncLoop>()))
{
SysPushErrorMemory();
return {};
}
this->asyncLoop->pParent = this;
if (!this->asyncLoop->Init())
{
SysPushErrorNested();
return {};
}
if (!this->sync.Init())
{
SysPushErrorNested();
return {};
}
if (!this->asyncLoop->SourceAdd(this->sync.eventLs))
{
SysPushErrorNested();
return {};
}
return true;
}
void ThreadStateSync::SetEvent(bool bBoth)
{
if (auto pEvent = this->eventLs)
{
if (AuAtomicTestAndSet(&this->cvLSActive, 0u) == 0)
{
pEvent->Set();
}
}
if (bBoth)
{
this->cvWorkMutex->Lock();
this->cvWorkMutex->Unlock();
this->cvVariable->Signal();
}
}
void ThreadStateSync::UpdateCVState(ThreadState *pState)
{
auto uState = pState->sync.cvHasWork;
auto uMin = AuMin(uState, pState->pendingWorkItems.size());
if (!uMin) uMin = 1;
while (uState &&
AuAtomicCompareExchange<AuUInt32>(&pState->sync.cvHasWork, uState - uMin, uState) != uState)
{
uState = pState->sync.cvHasWork;
if (uState < uMin)
{
uMin = uState;
}
}
}
}

View File

@ -0,0 +1,62 @@
/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuThreadState.hpp
Date: 2023-11-04
Author: Reece
***/
#pragma once
namespace Aurora::Async
{
struct AsyncLoop;
struct ThreadState;
struct ThreadStateShutdown
{
bool bBreakMainLoop {};
bool bIsKillerThread {};
AuUInt32 uShutdownFence { 1 };
};
struct ThreadStateSync
{
ThreadStateSync();
AuThreadPrimitives::ConditionMutex cvWorkMutex;
AuThreadPrimitives::ConditionVariable cvVariable;
AuAUInt32 cvLSActive {};
AuAUInt32 cvHasWork {};
AuSPtr<AuLoop::ILSEvent> eventLs;
void SetEvent(bool bBoth = true);
bool Init();
void UpdateCVState(ThreadState *pState);
};
struct ThreadStateMeta
{
bool bOwnsThread {};
AuThreads::ThreadShared_t pThread;
};
struct ThreadStateStack
{
AuUInt32 uStackCookie {};
AuUInt32 uStackMaxRecursiveAsyncPromiseCalls { 4 };
AuUInt32 uStackCallDepth {};
AuUInt32 uStackMaxCookie { 5 };
};
struct ThreadStateBase
{
ThreadStateShutdown shutdown;
ThreadStateMeta thread;
ThreadStateSync sync;
AuSPtr<AsyncLoop> asyncLoop;
ThreadStateStack stackState;
bool Init();
};
}

View File

@ -0,0 +1,14 @@
/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuThreadStateSingletons.cpp
Date: 2023-11-04
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "AuThreadStateSingletons.hpp"
namespace Aurora::Async
{
}

View File

@ -0,0 +1,13 @@
/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuThreadStateSingletons.hpp
Date: 2023-11-04
Author: Reece
***/
#pragma once
namespace Aurora::Async
{
}

View File

@ -64,7 +64,7 @@ namespace Aurora::Async
}
}
void GroupState::BroadCast()
void GroupState::SignalAll()
{
AU_LOCK_GUARD(this->workersMutex);
@ -75,12 +75,11 @@ namespace Aurora::Async
continue;
}
AuAtomicAdd(&pWorker->cvHasWork, 1u);
pWorker->SetEvent();
pWorker->sync.SetEvent();
}
}
void GroupState::AddWorker(ThreadId_t id, AuSPtr<ThreadState> pState)
bool GroupState::AddWorker(ThreadId_t id, AuSPtr<ThreadState> pState)
{
AU_LOCK_GUARD(this->workersMutex);
@ -89,6 +88,41 @@ namespace Aurora::Async
this->wpWorkers[id] = pState;
}
SysAssert(AuTryInsert(this->workers, AuMakePair(id, pState)));
if (!AuTryInsert(this->workers, AuMakePair(id, pState)))
{
this->wpWorkers[id] = {};
return {};
}
return true;
}
AuSPtr<ThreadState> GroupState::CreateWorker(WorkerId_t workerId, bool bCreate)
{
if (auto pThreadState = AuMakeShared<ThreadState>())
{
pThreadState->thread.bOwnsThread = !bCreate;
pThreadState->parent = this->SharedFromThis();
pThreadState->id = workerId;
if (!pThreadState->Init())
{
SysPushErrorNested();
return {};
}
if (!this->AddWorker(workerId.second, pThreadState))
{
SysPushErrorNested();
return {};
}
return pThreadState;
}
else
{
SysPushErrorMemory();
return {};
}
}
}

View File

@ -6,47 +6,38 @@
Author: Reece
***/
#pragma once
#include "ThreadState.hpp"
#include "AuGroupWorkQueue.hpp"
namespace Aurora::Async
{
struct ThreadPool;
struct GroupWorkQueue
{
AuThreadPrimitives::Mutex mutex;
AuUInt32 uItems {};
AuList<WorkEntry_t> sortedWork[AuAsync::kEWorkPrioCount];
bool IsEmpty();
bool IsEmpty(ThreadPool *pPool, AuWorkerId_t id);
void AddWorkEntry(ThreadState *pState, WorkEntry_t entry);
void Dequeue(AuList<WorkEntry_t> &queue, int maxPopCount, AuAsync::ThreadId_t idd);
};
struct GroupState
struct GroupState :
AuEnableSharedFromThis<GroupState>
{
// group id
ThreadGroup_t group;
// work items
GroupWorkQueue workQueue;
// tracked workers
AuThreadPrimitives::Mutex workersMutex;
AuBST<ThreadId_t, AuSPtr<ThreadState>> workers;
AuWPtr<ThreadState> wpWorkers[32];
AuWPtr<ThreadState> wpWorkers[32]; // linear non-locking lookup table
//
bool Init();
void BroadCast();
void SignalAll();
void Decommit(ThreadId_t id);
void AddWorker(ThreadId_t id, AuSPtr<ThreadState> pState);
bool AddWorker(ThreadId_t id, AuSPtr<ThreadState> pState);
AuSPtr<ThreadState> CreateWorker(WorkerId_t id, bool bCreate);
AuSPtr<ThreadState> GetThreadByIndex(ThreadId_t uIndex);
bool inline IsSysThread()
{
return group == 0;
}
};
}

View File

View File

@ -0,0 +1,27 @@
/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IThreadPoolInternal.hpp
Date: 2023-11-04
Author: Reece
***/
#pragma once
namespace Aurora::Async
{
struct ThreadState;
struct IAsyncRunnable;
struct IThreadPoolInternal
{
virtual bool WaitFor(WorkerId_t unlocker, const AuSPtr<Threading::IWaitable> &primitive, AuUInt32 ms) = 0;
virtual void Run(WorkerId_t target, AuSPtr<IAsyncRunnable> runnable) = 0;
virtual AuSPtr<ThreadState> GetThreadHandle(WorkerId_t id) = 0;
virtual IThreadPool *ToThreadPool() = 0;
AuUInt32 uAtomicCounter {};
AuUInt32 uAtomicIOProcessors {};
AuUInt32 uAtomicIOProcessorsWorthlessSources {};
AuUInt32 uAtomicShutdownCookie {};
};
}

View File

@ -10,17 +10,14 @@
#include "ThreadPool.hpp"
#include "AsyncApp.hpp"
#include "WorkItem.hpp"
#include "Schedular.hpp"
#include "AuSchedular.hpp"
#include "ThreadWorkerQueueShim.hpp"
#include <Source/IO/Loop/LSAsync.hpp>
namespace Aurora::Async
{
//STATIC_TLS(WorkerId_t, tlsWorkerId);
static thread_local AuWPtr<ThreadPool> gCurrentPool;
static const auto kMagicResortThreshold = 15;
static thread_local AuWPtr<ThreadPool> tlsCurrentThreadPool;
static thread_local int tlsCallStack;
inline auto GetWorkerInternal(const AuSPtr<IThreadPool> &pool)
{
if (pool.get() == AuStaticCast<IAsyncApp>(gAsyncApp))
@ -33,17 +30,28 @@ namespace Aurora::Async
AUKN_SYM WorkerPId_t GetCurrentWorkerPId()
{
auto lkPool = gCurrentPool.lock();
if (!lkPool) return {};
auto lkPool = tlsCurrentThreadPool.lock();
if (!lkPool)
{
return {};
}
auto cpy = *lkPool->tlsWorkerId;
auto lkPool2 = cpy.pool.lock();
return WorkerPId_t(lkPool, cpy);
if (auto pPool = AuTryLockMemoryType(cpy.pool))
{
return WorkerPId_t(pPool, cpy);
}
else
{
return {};
}
}
//
ThreadPool::ThreadPool() : shutdownEvent_(false, false, true)
{
this->pRWReadView = this->rwlock_->AsReadable();
}
// internal pool interface
@ -136,8 +144,6 @@ namespace Aurora::Async
return;
}
AU_DEBUG_MEMCRUNCH;
if (bIncrement)
{
AuAtomicAdd(&this->uAtomicCounter, 1u);
@ -147,20 +153,11 @@ namespace Aurora::Async
if (target.second == Async::kThreadIdAny)
{
state->BroadCast();
state->SignalAll();
}
else
{
if (AuAtomicLoad(&pWorker->cvSleepCount))
{
// Barrier:
pWorker->cvWorkMutex->Lock();
pWorker->cvWorkMutex->Unlock();
pWorker->cvVariable->Signal();
}
pWorker->SetEvent(false);
pWorker->sync.SetEvent();
}
}
@ -173,7 +170,7 @@ namespace Aurora::Async
size_t ThreadPool::GetThreadWorkersCount(ThreadGroup_t group)
{
AU_LOCK_GUARD(this->rwlock_->AsReadable());
AU_LOCK_GUARD(this->pRWReadView);
return GetGroup(group)->workers.size();
}
@ -221,14 +218,12 @@ namespace Aurora::Async
return true;
}
gCurrentPool = AuWeakFromThis();
auto oldTlsHandle = AuExchange(tlsCurrentThreadPool, AuSharedFromThis());
auto auThread = AuThreads::GetThread();
while ((!auThread->Exiting()) &&
((this->shuttingdown_ & 2) != 2) &&
(!pJobRunner->bBreakEarly))
(!pJobRunner->shutdown.bBreakMainLoop))
{
AuUInt32 uCount {};
@ -243,6 +238,8 @@ namespace Aurora::Async
ranOnce = true;
}
tlsCurrentThreadPool = oldTlsHandle;
return ranOnce;
}
@ -254,163 +251,34 @@ namespace Aurora::Async
return false;
}
bool success {};
auto runMode = GetCurrentThreadRunMode();
bool bSuccess {};
EarlyExitTick();
//do
{
auto asyncLoop = state->asyncLoop;
asyncLoop->OnFrame();
if (asyncLoop->GetSourceCount() > 1)
{
bool bShouldTrySleepForKernel = block;
if (!block)
if (block)
{
bShouldTrySleepForKernel = asyncLoop->IsSignaledPeek();
asyncLoop->WaitAny(0);
}
if (bShouldTrySleepForKernel)
else
{
AuAtomicAdd(&state->cvSleepCount, 1u);
if (block)
{
asyncLoop->WaitAny(0);
}
else
{
asyncLoop->PumpNonblocking();
}
AuAtomicSub(&state->cvSleepCount, 1u);
asyncLoop->PumpNonblocking();
}
success = PollInternal(state, false, uCount);
bSuccess = PollInternal(state, false, uCount);
}
else
{
success = PollInternal(state, block, uCount);
bSuccess = PollInternal(state, block, uCount);
}
} //while (success);
}
EarlyExitTick();
return success;
}
void GroupWorkQueue::AddWorkEntry(ThreadState *pState, WorkEntry_t entry)
{
auto prio = (int)entry.second->GetPrio();
SysAssert(prio < AuAsync::kEWorkPrioCount, "Invalid PRIO");
AU_LOCK_GUARD(this->mutex);
this->sortedWork[prio].push_back(entry);
if (entry.first != kThreadIdAny)
{
if (auto pThat = pState->parent.lock()->GetThreadByIndex(entry.first))
{
AuAtomicAdd(&pThat->cvHasWork, 1u);
}
}
}
bool GroupWorkQueue::IsEmpty()
{
AU_LOCK_GUARD(this->mutex);
for (AU_ITERATE_N(i, AuAsync::kEWorkPrioCount))
{
if (this->sortedWork[i].size())
{
return false;
}
}
return true;
}
bool GroupWorkQueue::IsEmpty(ThreadPool *pPool, AuWorkerId_t id)
{
#if 1
auto pHandle = pPool->GetThreadHandle(id);
if (!pHandle)
{
return false;
}
return !AuAtomicLoad(&pHandle->cvHasWork);
#else
AU_LOCK_GUARD(this->mutex);
for (AU_ITERATE_N(i, AuAsync::kEWorkPrioCount))
{
for (const auto &[srcId, pA] : this->sortedWork[i])
{
if (id == srcId)
{
return false;
}
}
}
return true;
#endif
}
void GroupWorkQueue::Dequeue(AuList<WorkEntry_t> &queue, int maxPopCount, AuAsync::ThreadId_t id)
{
AU_LOCK_GUARD(this->mutex);
for (AU_ITERATE_N(i, AuAsync::kEWorkPrioCount))
{
auto &group = this->sortedWork[(int)AuAsync::kEWorkPrioMaxLegal - i];
for (auto itr = group.begin(); ((itr != group.end()) && (queue.size() < maxPopCount)); )
{
if (itr->first == Async::kThreadIdAny)
{
queue.push_back(*itr);
itr = group.erase(itr);
continue;
}
if ((itr->first != Async::kThreadIdAny) &&
(itr->first == id))
{
queue.push_back(*itr);
itr = group.erase(itr);
continue;
}
itr++;
}
if (queue.size())
{
break;
}
}
}
void ThreadPool::DoThing(ThreadState *pState)
{
auto uState = pState->cvHasWork;
auto uMin = AuMin(uState, pState->pendingWorkItems.size());
if (!uMin) uMin = 1;
while (uState &&
AuAtomicCompareExchange<AuUInt32>(&pState->cvHasWork, uState - uMin, uState) != uState)
{
uState = pState->cvHasWork;
if (uState < uMin)
{
uMin = uState;
}
}
return bSuccess;
}
bool ThreadPool::PollInternal(AuSPtr<ThreadState> state, bool block, AuUInt32 &uCount)
@ -423,17 +291,14 @@ namespace Aurora::Async
auto group = state->parent.lock();
//state->pendingWorkItems.clear();
{
AuAtomicAdd(&state->cvSleepCount, 1u);
AU_LOCK_GUARD(state->cvWorkMutex);
AU_LOCK_GUARD(state->sync.cvWorkMutex);
do
{
group->workQueue.Dequeue(state->pendingWorkItems, state->multipopCount, state->id.second);
this->DoThing(state.get());
state->sync.UpdateCVState(state.get());
// Consider blocking for more work
if (!block)
@ -442,12 +307,11 @@ namespace Aurora::Async
}
// pre-wakeup thread terminating check
if (state->threadObject->Exiting())
if (state->thread.pThread->Exiting())
{
break;
}
// Block if no work items are present
if (state->pendingWorkItems.empty())
{
@ -456,7 +320,7 @@ namespace Aurora::Async
break;
}
state->cvVariable->WaitForSignal();
state->sync.cvVariable->WaitForSignal();
if (this->shuttingdown_ & 2)
{
@ -465,7 +329,7 @@ namespace Aurora::Async
}
// Post-wakeup thread terminating check
if (state->threadObject->Exiting())
if (state->thread.pThread->Exiting())
{
break;
}
@ -474,7 +338,6 @@ namespace Aurora::Async
(this->GetThreadState()->asyncLoop->GetSourceCount() > 1) ||
this->GetThreadState()->asyncLoop->CommitPending())) //(this->ToKernelWorkQueue()->IsSignaledPeek()))
{
AuAtomicSub(&state->cvSleepCount, 1u);
return false;
}
@ -490,12 +353,10 @@ namespace Aurora::Async
if (group->workQueue.IsEmpty(this, state->id))
{
state->eventLs->Reset(); // ...until we're done
AuAtomicStore(&state->cvLSActive, 0u);
state->sync.eventLs->Reset(); // ...until we're done
AuAtomicStore(&state->sync.cvLSActive, 0u);
}
}
AuAtomicSub(&state->cvSleepCount, 1u);
}
if (state->pendingWorkItems.empty())
@ -513,24 +374,16 @@ namespace Aurora::Async
}
int runningTasks {};
auto oldTlsHandle = AuExchange(gCurrentPool, AuSharedFromThis());
bool lowPrioCont {};
bool lowPrioContCached {};
state->cookie++;
int start = state->cookie;
auto uStartCookie = state->stackState.uStackCookie++;
// Account for
// while (AuAsync.GetCurrentPool()->runForever());
// in the first task (or deeper)
if (InRunnerMode() && tlsCallStack) // are we one call deep?
if (InRunnerMode() && state->stackState.uStackCallDepth) // are we one call deep?
{
auto queue = ToKernelWorkQueue();
if ((this->uAtomicCounter == tlsCallStack) &&
if ((this->uAtomicCounter == state->stackState.uStackCallDepth) &&
this->IsDepleted())
{
return false;
@ -538,25 +391,20 @@ namespace Aurora::Async
}
//
for (auto itr = state->pendingWorkItems.begin(); itr != state->pendingWorkItems.end(); )
{
if (state->threadObject->Exiting())
if (state->thread.pThread->Exiting())
{
break;
}
// Set the last frame time for a watchdog later down the line
state->lastFrameTime = Time::CurrentClockMS();
// Dispatch
auto oops = itr->second;
// Remove from our local job queue
itr = state->pendingWorkItems.erase(itr);
tlsCallStack++;
state->stackState.uStackCallDepth++;
//SysBenchmark(fmt::format("RunAsync: {}", block));
// Dispatch
@ -566,21 +414,19 @@ namespace Aurora::Async
// Atomically decrement global task counter
runningTasks = AuAtomicSub(&this->uAtomicCounter, 1u);
tlsCallStack--;
state->stackState.uStackCallDepth--;
if (start != state->cookie)
if (uStartCookie != state->stackState.uStackCookie)
{
start = state->cookie;
uStartCookie = state->stackState.uStackCookie;
itr = state->pendingWorkItems.begin();
}
}
gCurrentPool = oldTlsHandle;
// Return popped work back to the groups work pool when our -pump loops were preempted
if (state->pendingWorkItems.size())
{
AU_LOCK_GUARD(state->cvWorkMutex);
AU_LOCK_GUARD(state->sync.cvWorkMutex);
for (const auto &item : state->pendingWorkItems)
{
@ -588,8 +434,8 @@ namespace Aurora::Async
}
state->pendingWorkItems.clear();
state->cvVariable->Broadcast();
state->eventLs->Set();
state->sync.cvVariable->Broadcast();
state->sync.eventLs->Set();
}
// Account for
@ -631,29 +477,25 @@ namespace Aurora::Async
// wait for regular prio work to complete
{
for (auto pGroup : this->threadGroups_)
{
AU_LOCK_GUARD(this->rwlock_->AsReadable());
for (auto pGroup : this->threadGroups_)
if (!pGroup)
{
if (!pGroup)
continue;
}
AU_LOCK_GUARD(pGroup->workersMutex);
for (auto &[id, worker] : pGroup->workers)
{
if (trySelfPid == worker->id)
{
continue;
}
for (auto &[id, worker] : pGroup->workers)
{
if (trySelfPid == worker->id)
{
continue;
}
toBarrier.push_back(worker->id);
}
toBarrier.push_back(worker->id);
}
}
for (const auto &id : toBarrier)
{
if (trySelfPid == id)
@ -689,7 +531,7 @@ namespace Aurora::Async
AuList<AuThreads::ThreadShared_t> threads;
AuList<AuSPtr<ThreadState>> states;
{
AU_LOCK_GUARD(this->rwlock_->AsReadable());
AU_LOCK_GUARD(this->pRWReadView);
for (auto pGroup : this->threadGroups_)
{
@ -701,7 +543,7 @@ namespace Aurora::Async
for (auto &[id, pState] : pGroup->workers)
{
// main loop:
if (pState && pState->cvWorkMutex && pState->cvVariable)
if (pState)
{
states.push_back(pState);
pState->shuttingdown = true;
@ -712,10 +554,10 @@ namespace Aurora::Async
}
// thread object:
if (pState->bCreate)
if (pState->thread.bOwnsThread)
{
pState->threadObject->SendExitSignal();
threads.push_back(pState->threadObject);
pState->thread.pThread->SendExitSignal();
threads.push_back(pState->thread.pThread);
}
// unrefreeze signals:
@ -728,13 +570,10 @@ namespace Aurora::Async
}
}
// Break all condvar loops, just in case
for (const auto &pState : states)
{
for (const auto &pState : states)
{
AU_LOCK_GUARD(pState->cvWorkMutex);
pState->cvVariable->Broadcast();
pState->eventLs->Set();
}
pState->sync.SetEvent();
}
// Final sync to exit
@ -764,11 +603,13 @@ namespace Aurora::Async
// Is dead flag
this->shutdownEvent_->Set();
//
if (pLocalRunner)
{
pLocalRunner->bIsKiller = true;
pLocalRunner->shutdown.bIsKillerThread = true;
}
// Notify observing threads of our work exhaustion
for (const auto &wOther : this->listWeakDepsParents_)
{
if (auto pThat = AuTryLockMemoryType(wOther))
@ -854,12 +695,12 @@ namespace Aurora::Async
return {};
}
return pState->threadObject;
return pState->thread.pThread;
}
AuBST<ThreadGroup_t, AuList<ThreadId_t>> ThreadPool::GetThreads()
{
AU_LOCK_GUARD(rwlock_->AsReadable());
AU_LOCK_GUARD(this->pRWReadView);
AuBST<ThreadGroup_t, AuList<ThreadId_t>> ret;
@ -892,7 +733,7 @@ namespace Aurora::Async
bool ThreadPool::Sync(WorkerId_t workerId, AuUInt32 timeoutMs, bool requireSignal)
{
AU_LOCK_GUARD(this->rwlock_->AsReadable());
AU_LOCK_GUARD(this->pRWReadView);
auto group = GetGroup(workerId.first);
auto currentWorkerId = GetCurrentThread().second;
@ -917,7 +758,7 @@ namespace Aurora::Async
void ThreadPool::Signal(WorkerId_t workerId)
{
AU_LOCK_GUARD(this->rwlock_->AsReadable());
AU_LOCK_GUARD(this->pRWReadView);
auto group = GetGroup(workerId.first);
if (workerId.second == Async::kThreadIdAny)
@ -935,7 +776,7 @@ namespace Aurora::Async
AuSPtr<AuLoop::ILoopSource> ThreadPool::WorkerToLoopSource(WorkerId_t workerId)
{
AU_LOCK_GUARD(this->rwlock_->AsReadable());
AU_LOCK_GUARD(this->pRWReadView);
auto a = GetThreadHandle(workerId);
if (!a)
@ -943,12 +784,12 @@ namespace Aurora::Async
return {};
}
return a->asyncLoopSourceShared;
return a->sync.eventLs;
}
void ThreadPool::SyncAllSafe()
{
AU_LOCK_GUARD(this->rwlock_->AsReadable());
AU_LOCK_GUARD(this->pRWReadView);
for (auto pGroup : this->threadGroups_)
{
@ -1008,46 +849,6 @@ namespace Aurora::Async
return worker->asyncLoop;
}
void ThreadPool::UpdateWorkMode(WorkerId_t workerId, RunMode mode)
{
auto states = this->GetThreadHandles(workerId);
if (!states.size())
{
SysPushErrorGeneric("Couldn't find requested worker");
return;
}
for (const auto &state : states)
{
state->runMode = mode.mode;
if (mode.freqMsTick)
{
state->rateLimiter.SetNextStep(mode.freqMsTick * 1'000'000);
}
}
}
ERunMode ThreadPool::GetCurrentThreadRunMode()
{
auto state = this->GetThreadState();
if (!state)
{
return ERunMode::eEfficient;
}
return state->runMode;
}
ERunMode ThreadPool::GetThreadRunMode(WorkerId_t workerId)
{
auto worker = this->GetThreadHandle(workerId);
if (!worker)
{
SysPushErrorGeneric("Couldn't find requested worker");
return {};
}
return worker->runMode;
}
bool ThreadPool::IsSelfDepleted()
{
auto queue = ToKernelWorkQueue();
@ -1152,7 +953,7 @@ namespace Aurora::Async
{
if (auto pState = this->GetThreadHandle(workerId))
{
AuAtomicAdd(&pState->uShutdownFence, 1u);
AuAtomicAdd(&pState->shutdown.uShutdownFence, 1u);
}
}
@ -1160,7 +961,7 @@ namespace Aurora::Async
{
if (auto pState = this->GetThreadHandle(optWorkerId.value_or(GetCurrentWorkerPId())))
{
return (AuUInt64(pState->uShutdownFence) << 32ull) | AuUInt64(this->uAtomicShutdownCookie);
return (AuUInt64(pState->shutdown.uShutdownFence) << 32ull) | AuUInt64(this->uAtomicShutdownCookie);
}
else
{
@ -1185,7 +986,7 @@ namespace Aurora::Async
if (auto pState = this->GetThreadHandle(optWorkerId.value_or(GetCurrentWorkerPId())))
{
return uThreadCookie != pState->uShutdownFence;
return uThreadCookie != pState->shutdown.uShutdownFence;
}
else
{
@ -1202,7 +1003,7 @@ namespace Aurora::Async
if (create)
{
gCurrentPool = AuSharedFromThis();
tlsCurrentThreadPool = AuSharedFromThis();
}
AuSPtr<GroupState> pGroup;
@ -1234,73 +1035,34 @@ namespace Aurora::Async
}
}
auto threadState = AuMakeShared<ThreadState>();
if (!threadState)
auto pThreadState = pGroup->CreateWorker(workerId, create);
if (!pThreadState)
{
SysPushErrorMemory();
return {};
}
threadState->parent = pGroup;
threadState->id = workerId;
threadState->asyncLoop = AuMakeShared<AsyncLoop>();
if (!threadState->asyncLoop)
{
SysPushErrorMemory();
return {};
}
threadState->eventLs = AuLoop::NewLSAsync();
if (!threadState->eventLs)
{
SysPushErrorMemory();
return {};
}
threadState->asyncLoopSourceShared = threadState->eventLs;
threadState->asyncLoop->pParent = threadState.get();
threadState->rateLimiter.SetNextStep(1'000'000); // 1MS in nanoseconds
threadState->runMode = ERunMode::eEfficient;
if (!threadState->asyncLoop)
{
SysPushErrorMemory();
return {};
}
if (!threadState->asyncLoop->Init())
{
SysPushErrorNested();
return {};
}
threadState->asyncLoop->SourceAdd(threadState->eventLs);
threadState->bCreate = create;
if (!create)
{
threadState->threadObject = AuThreads::ThreadShared(AuThreads::ThreadInfo(
AuMakeShared<AuThreads::IThreadVectorsFunctional>(AuThreads::IThreadVectorsFunctional::OnEntry_t(std::bind(&ThreadPool::Entrypoint, this, threadState->id)),
pThreadState->thread.pThread= AuThreads::ThreadShared(AuThreads::ThreadInfo(
AuMakeShared<AuThreads::IThreadVectorsFunctional>(AuThreads::IThreadVectorsFunctional::OnEntry_t(std::bind(&ThreadPool::Entrypoint, this, workerId)),
AuThreads::IThreadVectorsFunctional::OnExit_t{}),
gRuntimeConfig.async.threadPoolDefaultStackSize
));
if (!threadState->threadObject)
if (!pThreadState->thread.pThread)
{
return {};
}
threadState->threadObject->Run();
pThreadState->thread.pThread->Run();
}
else
{
threadState->threadObject = AuSPtr<AuThreads::IAuroraThread>(AuThreads::GetThread(), [](AuThreads::IAuroraThread *){});
pThreadState->thread.pThread = AuSPtr<AuThreads::IAuroraThread>(AuThreads::GetThread(), [](AuThreads::IAuroraThread *){});
// TODO: this is just a hack
// we should implement this properly
threadState->threadObject->AddLastHopeTlsHook(AuMakeShared<AuThreads::IThreadFeatureFunctional>([]() -> void
pThreadState->thread.pThread->AddLastHopeTlsHook(AuMakeShared<AuThreads::IThreadFeatureFunctional>([]() -> void
{
}, []() -> void
@ -1312,13 +1074,11 @@ namespace Aurora::Async
}
}));
//
gCurrentPool = AuWeakFromThis();
tlsCurrentThreadPool = AuWeakFromThis();
tlsWorkerId = WorkerPId_t(AuSharedFromThis(), workerId);
}
pGroup->AddWorker(workerId.second, threadState);
pGroup->AddWorker(workerId.second, pThreadState);
return true;
}
@ -1377,7 +1137,11 @@ namespace Aurora::Async
void ThreadPool::Entrypoint(WorkerId_t id)
{
gCurrentPool = AuWeakFromThis();
{
AU_LOCK_GUARD(this->pRWReadView);
}
tlsCurrentThreadPool = AuWeakFromThis();
tlsWorkerId = WorkerPId_t(AuSharedFromThis(), id);
auto job = GetThreadState();
@ -1386,7 +1150,7 @@ namespace Aurora::Async
if (id != WorkerId_t {0, 0})
{
AU_LOCK_GUARD(this->rwlock_->AsReadable());
AU_LOCK_GUARD(this->pRWReadView);
if (!this->shuttingdown_ && !job->rejecting)
{
@ -1418,7 +1182,7 @@ namespace Aurora::Async
return;
}
state->BroadCast();
state->SignalAll();
{
if (AuExchange(jobWorker->bAlreadyDoingExitTick, true))
@ -1457,11 +1221,9 @@ namespace Aurora::Async
jobWorker->isDeadEvent->Set();
jobWorker->bAlreadyDoingExitTick = false;
jobWorker->bBreakEarly = true;
jobWorker->shutdown.bBreakMainLoop = true;
}
}
void ThreadPool::ThisExiting()
@ -1473,7 +1235,7 @@ namespace Aurora::Async
AuList<AuSPtr<AuThreads::IThreadFeature>> features;
{
AU_LOCK_GUARD(this->rwlock_->AsReadable());
AU_LOCK_GUARD(this->pRWReadView);
pLocalState->isDeadEvent->Set();
@ -1530,7 +1292,7 @@ namespace Aurora::Async
AuSPtr<ThreadState> ThreadPool::GetThreadState()
{
auto thread = gCurrentPool.lock();
auto thread = tlsCurrentThreadPool.lock();
if (!thread)
{
return {};
@ -1545,7 +1307,6 @@ namespace Aurora::Async
#endif
auto worker = *tlsWorkerId;
auto state = GetGroup(worker.first);
if (!state)
{
@ -1557,7 +1318,7 @@ namespace Aurora::Async
AuSPtr<ThreadState> ThreadPool::GetThreadStateNoWarn()
{
auto thread = gCurrentPool.lock();
auto thread = tlsCurrentThreadPool.lock();
if (!thread)
{
return {};
@ -1592,7 +1353,7 @@ namespace Aurora::Async
AuList<AuSPtr<ThreadState>> ThreadPool::GetThreadHandles(WorkerId_t id)
{
AU_LOCK_GUARD(this->rwlock_->AsReadable());
AU_LOCK_GUARD(this->pRWReadView);
auto group = GetGroup(id.first);
if (!group)
@ -1626,34 +1387,4 @@ namespace Aurora::Async
return AuMakeShared<ThreadPool>();
}
struct KeepGroupAlive
{
KeepGroupAlive(AuSPtr<AuAsync::IThreadPool> pPool) : pPool(AuStaticCast<AuAsync::ThreadPool>(pPool))
{
AuAtomicAdd(&this->pPool->uAtomicCounter, 1u);
}
~KeepGroupAlive()
{
auto uNow = AuAtomicSub(&this->pPool->uAtomicCounter, 1u);
if (uNow == 0)
{
for (const auto &pState : this->pPool->threadGroups_)
{
if (pState)
{
pState->BroadCast();
}
}
}
}
AuSPtr<AuAsync::ThreadPool> pPool;
};
AUKN_SYM AuSPtr<void> KeepThreadPoolAlive(AuSPtr<AuAsync::IThreadPool> pPool)
{
return AuMakeSharedThrow<KeepGroupAlive>(pPool);
}
}

View File

@ -7,6 +7,8 @@
***/
#pragma once
#include "IThreadPoolInternal.hpp"
namespace Aurora::Async
{
struct GroupState;
@ -15,21 +17,10 @@ namespace Aurora::Async
//class WorkItem;
struct IThreadPoolInternal
{
virtual bool WaitFor(WorkerId_t unlocker, const AuSPtr<Threading::IWaitable> &primitive, AuUInt32 ms) = 0;
virtual void Run(WorkerId_t target, AuSPtr<IAsyncRunnable> runnable) = 0;
virtual AuSPtr<ThreadState> GetThreadHandle(WorkerId_t id) = 0;
virtual IThreadPool *ToThreadPool() = 0;
AuUInt32 uAtomicCounter {};
AuUInt32 uAtomicIOProcessors {};
AuUInt32 uAtomicIOProcessorsWorthlessSources {};
AuUInt32 uAtomicShutdownCookie {};
};
struct ThreadPool : IThreadPool, IThreadPoolInternal, AuEnableSharedFromThis<ThreadPool>
struct ThreadPool :
IThreadPool,
IThreadPoolInternal,
AuEnableSharedFromThis<ThreadPool>
{
ThreadPool();
@ -81,9 +72,6 @@ namespace Aurora::Async
virtual AuSPtr<AuLoop::ILoopQueue> ToKernelWorkQueue() override;
virtual AuSPtr<AuLoop::ILoopQueue> ToKernelWorkQueue(WorkerId_t workerId) override;
virtual void UpdateWorkMode(WorkerId_t workerId, RunMode mode) override;
virtual ERunMode GetCurrentThreadRunMode() override;
virtual ERunMode GetThreadRunMode(WorkerId_t workerId) override;
virtual AuSPtr<AuThreading::IWaitable> GetShutdownEvent() override;
virtual void AddDependency(AuSPtr<IThreadPool> pPool) override;
@ -105,7 +93,6 @@ namespace Aurora::Async
bool InternalRunOne(AuSPtr<ThreadState>, bool block, AuUInt32 &uCount);
bool PollInternal(AuSPtr<ThreadState>, bool block, AuUInt32 &uCount);
void DoThing(ThreadState *pState);
size_t GetThreadWorkersCount(ThreadGroup_t group);
@ -152,7 +139,10 @@ namespace Aurora::Async
AuSPtr<GroupState> threadGroups_[255];
AuUInt32 shuttingdown_ {};
bool shutdown {};
AuThreadPrimitives::RWRenterableLock rwlock_;
AuThreading::IWaitable *pRWReadView {};
AuThreadPrimitives::Event shutdownEvent_;
bool runnersRunning_ {};
AuList<AuWPtr<ThreadPool>> listWeakDeps_;

View File

@ -8,11 +8,11 @@
#pragma once
#include "AsyncRunnable.hpp"
#include "AuThreadState.hpp"
#include "AuGroupWorkQueue.hpp"
namespace Aurora::Async
{
using WorkEntry_t = AuPair<ThreadId_t, AuSPtr<IAsyncRunnable>>;
// TODO: this is a hack because i havent implemented an epoll abstraction yet
struct AsyncAppWaitSourceRequest
{
@ -26,10 +26,9 @@ namespace Aurora::Async
struct GroupState;
struct AsyncLoop;
struct ThreadState
struct ThreadState : ThreadStateBase
{
ThreadState() : running(true, false, true),
cvVariable(AuUnsafeRaiiToShared(cvWorkMutex.AsPointer())),
isDeadEvent(false, false, true)
{
@ -38,8 +37,6 @@ namespace Aurora::Async
// :vomit:
WorkerId_t id;
AuUInt8 multipopCount = 32;
AuUInt32 lastFrameTime {};
AuThreads::ThreadShared_t threadObject;
AuWPtr<GroupState> parent;
AuThreadPrimitives::Semaphore syncSema;
AuThreadPrimitives::Event isDeadEvent;
@ -53,14 +50,8 @@ namespace Aurora::Async
//bool running;
AuList<AsyncAppWaitSourceRequest> loopSources;
AuList<WorkEntry_t> pendingWorkItems;
AuSPtr<AsyncLoop> asyncLoop;
Utility::RateLimiter rateLimiter;
ERunMode runMode;
int cookie {0};
bool bAlreadyDoingExitTick {};
bool bBreakEarly {};
bool bIsKiller {};
bool bCreate {};
//
AuThreadPrimitives::SpinLock externalFencesLock;
@ -68,31 +59,8 @@ namespace Aurora::Async
//
AuThreadPrimitives::ConditionMutex cvWorkMutex;
AuThreadPrimitives::ConditionVariable cvVariable;
AuAUInt32 cvSleepCount {};
AuAUInt32 cvLSActive {};
AuAUInt32 cvHasWork {};
AuSPtr<AuLoop::ILSEvent> eventLs;
AuSPtr<AuLoop::ILoopSource> asyncLoopSourceShared;
inline void SetEvent(bool bBoth = true)
{
if (auto pEvent = this->eventLs)
{
if (AuAtomicTestAndSet(&this->cvLSActive, 0u) == 0)
{
pEvent->Set();
}
}
if (bBoth)
{
this->cvVariable->Signal();
}
}
//
AuUInt32 uShutdownFence { 1 };
};
}

View File

@ -54,13 +54,10 @@ namespace Aurora::Async
void AsyncLoop::Schedule()
{
if (AuThreads::GetThread() != this->pParent->threadObject.get())
if (AuThreads::GetThread() != this->pParent->thread.pThread.get())
{
AuAtomicAdd(&this->commitPending_, 1u);
{
AU_LOCK_GUARD(this->pParent->parent.lock()->workQueue.mutex);
this->pParent->SetEvent();
}
this->pParent->sync.SetEvent();
}
else
{

View File

@ -14,7 +14,7 @@ namespace Aurora::Async
{
struct AsyncLoop : AuLoop::LoopQueue
{
ThreadState *pParent;
ThreadStateBase *pParent;
void OnFrame();
virtual bool AddCallback (const AuSPtr<AuLoop::ILoopSource> &source, const AuSPtr<AuLoop::ILoopSourceSubscriber> &subscriber) override;

View File

@ -9,7 +9,7 @@
#include "Async.hpp"
#include "WorkItem.hpp"
#include "AsyncApp.hpp"
#include "Schedular.hpp"
#include "AuSchedular.hpp"
#if defined(AURORA_COMPILER_CLANG)
// warning: enumeration values 'kEnumCount' not handled in switch [-Wswitch
@ -37,7 +37,7 @@ namespace Aurora::Async
if (auto pWorker = this->GetState())
{
this->optOtherCookie = pWorker->uShutdownFence;
this->optOtherCookie = pWorker->shutdown.uShutdownFence;
}
}
@ -288,7 +288,7 @@ namespace Aurora::Async
{
if (auto pWorker = this->GetState())
{
if (this->optOtherCookie.value() != pWorker->uShutdownFence)
if (this->optOtherCookie.value() != pWorker->shutdown.uShutdownFence)
{
this->Fail();
return false;