AuroraRuntime/Source/Async/ThreadPool.hpp

155 lines
7.2 KiB
C++

/***
Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: ThreadPool.hpp
Date: 2021-10-30
Author: Reece
***/
#pragma once
namespace Aurora::Async
{
// Forward declarations. Within this header these types are only named as
// AuSPtr<> template arguments or as a raw pointer (ThreadState *), never by value.
struct GroupState;
struct ThreadState;
struct IAsyncRunnable;
//class WorkItem;
/**
 * Internal-facing interface of a thread pool.
 *
 * Declares the waiting / dispatch entry points and a way to get back to the
 * public IThreadPool interface, and carries three counters shared by every
 * implementation.
 */
struct IThreadPoolInternal
{
    // This type is a polymorphic base (it declares pure virtuals), so it needs a
    // virtual destructor: without one, destroying a derived pool through an
    // IThreadPoolInternal pointer is undefined behaviour.
    virtual ~IThreadPoolInternal() = default;

    // Waits on `primitive` with a budget of `ms` on behalf of worker `unlocker`.
    // NOTE(review): units of `ms` are presumably milliseconds and the return value
    // presumably reports whether the wait succeeded - confirm against the implementation.
    virtual bool WaitFor(WorkerId_t unlocker, const AuSPtr<Threading::IWaitable> &primitive, AuUInt32 ms) = 0;

    // Hands `runnable` to the worker identified by `target`.
    virtual void Run(WorkerId_t target, AuSPtr<IAsyncRunnable> runnable) = 0;

    // Returns the public IThreadPool view of this object.
    virtual IThreadPool *ToThreadPool() = 0;

    // Zero-initialised counters.
    // NOTE(review): the `uAtomic` prefix suggests these are only touched through
    // atomic helper functions even though the type is a plain AuUInt32 - confirm.
    AuUInt32 uAtomicCounter {};
    AuUInt32 uAtomicIOProcessors {};
    AuUInt32 uAtomicIOProcessorsWorthlessSources {};
};
/**
 * Thread pool type implementing both IThreadPool and IThreadPoolInternal.
 * It also derives from AuEnableSharedFromThis<ThreadPool>.
 *
 * Almost every member is only declared here; behaviour is defined by the
 * out-of-line definitions, so the comments below stay at the grouping level.
 */
struct ThreadPool :
    IThreadPool,
    IThreadPoolInternal,
    AuEnableSharedFromThis<ThreadPool>
{
    ThreadPool();

    //
    // IThreadPoolInternal overrides (plus same-named non-virtual overloads)
    //
    bool WaitFor(WorkerPId_t unlocker, const AuSPtr<Threading::IWaitable> &primitive, AuUInt32 ms);
    bool WaitFor(WorkerId_t unlocker, const AuSPtr<Threading::IWaitable> &primitive, AuUInt32 ms) override;

    void Run(WorkerId_t target, AuSPtr<IAsyncRunnable> runnable) override;
    void Run(WorkerId_t target, AuSPtr<IAsyncRunnable> runnable, bool bIncrement);

    IThreadPool *ToThreadPool() override;

    //
    // IThreadPool overrides
    //
    bool Spawn(WorkerId_t workerId) override;
    void SetRunningMode(bool eventRunning) override;
    bool Create(WorkerId_t workerId) override;
    bool InRunnerMode() override;
    bool Poll() override;
    bool RunOnce() override;
    bool Run() override;
    void Shutdown() override;
    bool Exiting() override;
    AuUInt32 PollAndCount(bool bStrict = true) override;
    AuUInt32 RunAllPending() override;

    AuSPtr<IWorkItem> NewWorkItem(const WorkerId_t &worker, const AuSPtr<IWorkItemHandler> &task) override;
    AuSPtr<IWorkItem> NewWorkFunction(const WorkerId_t &worker, AuVoidFunc callback) override;
    AuSPtr<IWorkItem> NewFence() override;

    Threading::Threads::ThreadShared_t ResolveHandle(WorkerId_t) override;
    AuBST<ThreadGroup_t, AuList<ThreadId_t>> GetThreads() override;
    WorkerId_t GetCurrentThread() override;

    bool Sync(WorkerId_t workerId, AuUInt32 timeoutMs, bool requireSignal) override;
    void Signal(WorkerId_t workerId) override;
    AuSPtr<AuLoop::ILoopSource> WorkerToLoopSource(WorkerId_t id) override;
    void SyncAllSafe() override;

    void AddFeature(WorkerId_t id, AuSPtr<Threading::Threads::IThreadFeature> feature, bool async) override;

    void AssertInThreadGroup(ThreadGroup_t group) override;
    void AssertWorker(WorkerId_t id) override;

    AuSPtr<AuLoop::ILoopQueue> ToKernelWorkQueue() override;
    AuSPtr<AuLoop::ILoopQueue> ToKernelWorkQueue(WorkerId_t workerId) override;

    void UpdateWorkMode(WorkerId_t workerId, RunMode mode) override;
    ERunMode GetCurrentThreadRunMode() override;
    ERunMode GetThreadRunMode(WorkerId_t workerId) override;

    AuSPtr<AuThreading::IWaitable> GetShutdownEvent() override;
    void AddDependency(AuSPtr<IThreadPool> pPool) override;

    bool IsSelfDepleted();
    bool IsDepleted();

    //virtual bool ScheduleLoopSource(const AuSPtr<Loop::ILoopSource> &loopSource, WorkerId_t workerId, AuUInt32 timeout, const AuConsumer<AuSPtr<Loop::ILoopSource>, bool> &callback) override;

    //
    // Internal API
    //
    bool Spawn(WorkerId_t workerId, bool create);
    bool InternalRunOne(AuSPtr<ThreadState>, bool block, AuUInt32 &uCount);
    bool PollInternal(AuSPtr<ThreadState>, bool block, AuUInt32 &uCount);
    void DoThing(ThreadState *pState);
    size_t GetThreadWorkersCount(ThreadGroup_t group);

    // Virtual hooks with empty default bodies.
    virtual void CleanUpWorker(WorkerId_t wid) {}
    virtual void CleanWorkerPoolReservedZeroFree() {} // calls shutdown under async apps

    //
    // Secret old fiber api
    //
    bool CtxYield();
    int CtxPollPush();
    void CtxPollReturn(const AuSPtr<ThreadState> &state, int status, bool hitTask);

    //
    // TLS handle: a WorkerId_t extended with a weak reference to a pool
    //
    struct WorkerWPId_t : WorkerId_t
    {
        WorkerWPId_t()
        { }

        // Copies the (first, second) pair and the pool reference out of `ref`.
        WorkerWPId_t(const WorkerPId_t &ref) :
            WorkerId_t(ref.first, ref.second),
            pool(ref.pool)
        { }

        AuWPtr<IThreadPool> pool;
    };

    AuThreads::TLSVariable<WorkerWPId_t> tlsWorkerId;

    AuSPtr<ThreadState> GetThreadHandle(WorkerId_t id);

protected:
    void Entrypoint(WorkerId_t id);

private:
    // TODO: BarrierMultiple
    bool Barrier(WorkerId_t, AuUInt32 ms, bool requireSignal, bool drop);

    void EarlyExitTick();
    void ThisExiting();

    AuSPtr<GroupState> GetGroup(ThreadGroup_t type);
    AuSPtr<ThreadState> GetThreadState();
    AuSPtr<ThreadState> GetThreadStateNoWarn();
    AuList<AuSPtr<ThreadState>> GetThreadHandles(WorkerId_t id);

    // Fixed table of 255 group slots.
    // NOTE(review): presumably indexed by ThreadGroup_t - confirm.
    AuSPtr<GroupState> threadGroups_[255];

    AuUInt32 shuttingdown_ {};
    bool shutdown {};
    AuThreadPrimitives::RWRenterableLock rwlock_;
    AuThreadPrimitives::Event shutdownEvent_;
    bool runnersRunning_ {};

    // Weak references to other ThreadPool instances.
    // NOTE(review): presumably maintained by AddDependency - confirm.
    AuList<AuWPtr<ThreadPool>> listWeakDeps_;
    AuList<AuWPtr<ThreadPool>> listWeakDepsParents_;

    friend struct KeepGroupAlive;
};
}