[+] IThreadPool::IncrementAbortFenceOnPool

[+] IThreadPool::IncrementAbortFenceOnWorker
[+] IThreadPool::QueryAbortFence
[+] IThreadPool::QueryShouldAbort
This commit is contained in:
Reece Wilson 2023-10-24 15:33:30 +01:00
parent 5565189d2c
commit ceb67798f1
9 changed files with 204 additions and 8 deletions

View File

@ -134,5 +134,29 @@ namespace Aurora::Async
* @param pPool
*/
virtual void AddDependency(AuSPtr<IThreadPool> pPool) = 0;
/**
* @brief
*/
virtual void IncrementAbortFenceOnPool() = 0;
/**
* @brief
* @param workerId
*/
virtual void IncrementAbortFenceOnWorker(WorkerId_t workerId) = 0;
/**
* @brief
* @return
*/
virtual AuUInt64 QueryAbortFence(AuOptional<WorkerId_t> optWorkerId) = 0;
/**
* @brief
* @param uFenceMagic
* @return
*/
virtual bool QueryShouldAbort(AuOptional<WorkerId_t> optWorkerId, AuUInt64 uFenceMagic) = 0;
};
}

View File

@ -78,6 +78,26 @@ namespace Aurora::Async
ThreadPool::Shutdown();
}
void AsyncApp::IncrementAbortFenceOnPool()
{
ThreadPool::IncrementAbortFenceOnPool();
}
void AsyncApp::IncrementAbortFenceOnWorker(WorkerId_t workerId)
{
ThreadPool::IncrementAbortFenceOnWorker(workerId);
}
AuUInt64 AsyncApp::QueryAbortFence(AuOptional<WorkerId_t> optWorkerId)
{
return ThreadPool::QueryAbortFence(optWorkerId);
}
bool AsyncApp::QueryShouldAbort(AuOptional<WorkerId_t> optWorkerId, AuUInt64 uFenceMagic)
{
return ThreadPool::QueryShouldAbort(optWorkerId, uFenceMagic);
}
AUKN_SYM IAsyncApp *GetAsyncApp()
{
return gAsyncAppMem.get();

View File

@ -57,6 +57,12 @@ namespace Aurora::Async
AuSPtr<Aurora::Threading::IWaitable> GetShutdownEvent() override;
void AddDependency(AuSPtr<IThreadPool> pPool) override;
void IncrementAbortFenceOnPool() override;
void IncrementAbortFenceOnWorker(WorkerId_t workerId) override;
AuUInt64 QueryAbortFence(AuOptional<WorkerId_t> optWorkerId) override;
bool QueryShouldAbort(AuOptional<WorkerId_t> optWorkerId, AuUInt64 uFenceMagic) override;
void CleanUpWorker(WorkerId_t wid) override;
void CleanWorkerPoolReservedZeroFree() override;

View File

@ -19,6 +19,11 @@ namespace Aurora::Async
virtual void RunAsync() = 0;
virtual void CancelAsync() {}
inline virtual AuOptional<AuPair<AuUInt32, AuUInt32>> QueryFences()
{
return {};
}
};
struct AsyncFuncRunnable : IAsyncRunnable

View File

@ -704,6 +704,18 @@ namespace Aurora::Async
}
}
{
for (const auto &id : toBarrier)
{
if (trySelfPid == id)
{
continue;
}
AuAtomicAdd(&this->uAtomicShutdownCookie, 1u);
}
}
// Time for fuckiness
// Specify the root-level shutdown flag for 'ok, u can work, but you're shutting down after sync barrier'
@ -1172,6 +1184,58 @@ namespace Aurora::Async
#endif
}
//
void ThreadPool::IncrementAbortFenceOnPool()
{
AuAtomicAdd(&this->uAtomicShutdownCookie, 1u);
}
void ThreadPool::IncrementAbortFenceOnWorker(WorkerId_t workerId)
{
if (auto pState = this->GetThreadHandle(workerId))
{
AuAtomicAdd(&pState->uShutdownFence, 1u);
}
}
AuUInt64 ThreadPool::QueryAbortFence(AuOptional<WorkerId_t> optWorkerId)
{
if (auto pState = this->GetThreadHandle(optWorkerId.value_or(GetCurrentWorkerPId())))
{
return (AuUInt64(pState->uShutdownFence) << 32ull) | AuUInt64(this->uAtomicShutdownCookie);
}
else
{
return this->uAtomicShutdownCookie;
}
}
bool ThreadPool::QueryShouldAbort(AuOptional<WorkerId_t> optWorkerId, AuUInt64 uFenceMagic)
{
auto uSelfCookie = AuBitsToLower(uFenceMagic);
if (uSelfCookie != AuAtomicLoad(&this->uAtomicShutdownCookie))
{
return true;
}
auto uThreadCookie = AuBitsToHigher(uFenceMagic);
if (!uThreadCookie)
{
return false;
}
if (auto pState = this->GetThreadHandle(optWorkerId.value_or(GetCurrentWorkerPId())))
{
return uThreadCookie != pState->uShutdownFence;
}
else
{
return false;
}
}
// internal api

View File

@ -19,11 +19,13 @@ namespace Aurora::Async
{
virtual bool WaitFor(WorkerId_t unlocker, const AuSPtr<Threading::IWaitable> &primitive, AuUInt32 ms) = 0;
virtual void Run(WorkerId_t target, AuSPtr<IAsyncRunnable> runnable) = 0;
virtual AuSPtr<ThreadState> GetThreadHandle(WorkerId_t id) = 0;
virtual IThreadPool *ToThreadPool() = 0;
AuUInt32 uAtomicCounter {};
AuUInt32 uAtomicIOProcessors {};
AuUInt32 uAtomicIOProcessorsWorthlessSources {};
AuUInt32 uAtomicShutdownCookie {};
};
@ -86,6 +88,12 @@ namespace Aurora::Async
virtual AuSPtr<AuThreading::IWaitable> GetShutdownEvent() override;
virtual void AddDependency(AuSPtr<IThreadPool> pPool) override;
virtual void IncrementAbortFenceOnPool() override;
virtual void IncrementAbortFenceOnWorker(WorkerId_t workerId) override;
virtual AuUInt64 QueryAbortFence(AuOptional<WorkerId_t> optWorkerId) override;
virtual bool QueryShouldAbort(AuOptional<WorkerId_t> optWorkerId, AuUInt64 uFenceMagic) override;
bool IsSelfDepleted();
bool IsDepleted();

View File

@ -74,5 +74,8 @@ namespace Aurora::Async
AuAUInt32 cvHasWork {};
AuSPtr<AuLoop::ILSEvent> eventLs;
AuSPtr<AuLoop::ILoopSource> asyncLoopSourceShared;
//
AuUInt32 uShutdownFence { 1 };
};
}

View File

@ -25,7 +25,6 @@ namespace Aurora::Async
WorkItem(owner, worker, {}),
func(func)
{
}
WorkItem::WorkItem(IThreadPoolInternal *owner,
@ -34,6 +33,12 @@ namespace Aurora::Async
worker_(worker), task_(task), owner_(owner),
finishedEvent_(false, true, true)
{
this->uShutdownCookie = owner->uAtomicShutdownCookie;
if (auto pWorker = this->GetState())
{
this->optOtherCookie = pWorker->uShutdownFence;
}
}
WorkItem::~WorkItem()
@ -262,11 +267,45 @@ namespace Aurora::Async
void WorkItem::CancelAsync()
{
AU_TRY_LOCK_GUARD_NAMED(this->lock2, asd);
Fail();
this->Fail();
}
AuOptional<AuPair<AuUInt32, AuUInt32>> WorkItem::QueryFences()
{
return AuPair<AuUInt32, AuUInt32>{ this->uShutdownCookie, this->optOtherCookie.ValueOr(0) };
}
bool WorkItem::CheckAlive()
{
if (this->owner_ &&
this->uShutdownCookie != this->owner_->uAtomicShutdownCookie)
{
this->Fail();
return false;
}
if (this->optOtherCookie)
{
if (auto pWorker = this->GetState())
{
if (this->optOtherCookie.value() != pWorker->uShutdownFence)
{
this->Fail();
return false;
}
}
}
return true;
}
void WorkItem::DispatchTask(IWorkItemHandler::ProcessInfo &info)
{
if (!this->CheckAlive())
{
return;
}
if (this->task_)
{
try
@ -277,12 +316,21 @@ namespace Aurora::Async
{
// TODO: runtime config for root level exception caught behaviour
SysPushErrorCatch();
Fail();
this->Fail();
return;
}
}
}
AuSPtr<ThreadState> WorkItem::GetState()
{
if (!this->worker_.HasValue())
{
return {};
}
return this->owner_->GetThreadHandle(this->worker_.value());
}
void WorkItem::RunAsyncLocked2()
{
AU_LOCK_GUARD(this->lock2);
@ -349,7 +397,7 @@ namespace Aurora::Async
if (!WaitForLocked(info.waitFor))
{
Fail();
this->Fail();
}
}
@ -361,7 +409,7 @@ namespace Aurora::Async
}
case ETickType::eFailed:
{
Fail();
this->Fail();
return;
}
}
@ -453,6 +501,13 @@ namespace Aurora::Async
void FuncWorker::DispatchTask(IWorkItemHandler::ProcessInfo &info)
{
auto func = AuExchange(this->func, {});
if (!this->CheckAlive())
{
return;
}
if (func)
{
func();

View File

@ -11,6 +11,8 @@
namespace Aurora::Async
{
struct ThreadState;
struct WorkItem :
IWorkItem,
IAsyncRunnable,
@ -49,6 +51,11 @@ namespace Aurora::Async
EWorkPrio GetPrio() override;
void SetPrio(EWorkPrio prio) override;
AuOptional<AuPair<AuUInt32, AuUInt32>> QueryFences();
protected:
bool CheckAlive();
private:
void RunAsyncLocked();
void RunAsyncLocked2();
@ -60,6 +67,9 @@ namespace Aurora::Async
virtual void DispatchTask(IWorkItemHandler::ProcessInfo &info);
IThreadPoolInternal *owner_ {};
AuSPtr<ThreadState> GetState();
AuSPtr<IWorkItemHandler> task_;
AuOptionalEx<WorkerPId_t> worker_;
EWorkPrio prio_ = EWorkPrio::eNormalPrio;
@ -68,13 +78,14 @@ namespace Aurora::Async
AuThreadPrimitives::CriticalSection lock;
AuThreadPrimitives::CriticalSection lock2;
AuThreadPrimitives::Event finishedEvent_;
AuUInt32 uShutdownCookie {};
AuOptionalEx<AuUInt32> optOtherCookie {};
bool finished {};
bool failed {};
bool dispatchPending_ {};
AuUInt64 dispatchTimeNs_ {};
AuUInt64 delayTimeNs_ {};
IThreadPoolInternal *owner_ {};
void Fail();
bool Schedule();