AuroraRuntime/Source/Async/ThreadPool.hpp

169 lines
8.0 KiB
C++

/***
Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: ThreadPool.hpp
Date: 2021-10-30
Author: Reece
***/
#pragma once
#include "IThreadPoolInternal.hpp"
namespace Aurora::Async
{
// Forward declarations. Within this header these types appear only as template
// arguments of AuSPtr<> (and containers of AuSPtr<>), never by value.
struct GroupState;
struct ThreadState;
struct IAsyncRunnable;
//class WorkItem;
// Thread pool type deriving from IThreadPool, IThreadPoolInternal and
// AuEnableSharedFromThis<ThreadPool>.
//
// Apart from two empty virtual hooks (CleanUpWorker and
// CleanWorkerPoolReservedZeroFree) every member function is only declared
// here; their bodies are not part of this header.
struct ThreadPool :
    IThreadPool,
    IThreadPoolInternal,
    AuEnableSharedFromThis<ThreadPool>
{
    ThreadPool();

    // ------------------------------------------------------------------
    // IThreadPoolInternal
    // ------------------------------------------------------------------
    bool WaitFor(WorkerPId_t unlocker, const AuSPtr<Threading::IWaitable> &primitive, AuUInt32 ms) override;
    // Overload taking a plain WorkerId_t; declared without `override`.
    bool WaitFor(WorkerId_t unlocker, const AuSPtr<Threading::IWaitable> &primitive, AuUInt32 ms);

    void Run(WorkerId_t target, AuSPtr<IAsyncRunnable> runnable) override;
    // Overload with an additional bIncrement flag; declared without `override`.
    void Run(WorkerId_t target, AuSPtr<IAsyncRunnable> runnable, bool bIncrement);

    IThreadPool *ToThreadPool() override;

    // ------------------------------------------------------------------
    // IThreadPool
    // (`override` already implies `virtual`, so the keyword is not repeated)
    // ------------------------------------------------------------------
    bool Spawn(WorkerId_t workerId) override;
    void SetRunningMode(bool eventRunning) override;
    bool Create(WorkerId_t workerId) override;
    bool InRunnerMode() override;
    bool Poll() override;
    bool RunOnce() override;
    bool Run() override;
    void Shutdown() override;
    bool Exiting() override;
    AuUInt32 PollAndCount(bool bStrict = true) override;
    AuUInt32 RunAllPending() override;

    // Work item factories
    AuSPtr<IWorkItem> NewWorkItem(const WorkerId_t &worker, const AuSPtr<IWorkItemHandler> &task) override;
    AuSPtr<IWorkItem> NewWorkFunction(const WorkerId_t &worker, AuVoidFunc callback) override;
    AuSPtr<IWorkItem> NewFence() override;

    // Thread / worker lookup
    Threading::Threads::ThreadShared_t ResolveHandle(WorkerId_t workerId) override;
    AuBST<ThreadGroup_t, AuList<ThreadId_t>> GetThreads() override;
    WorkerId_t GetCurrentThread() override;

    // Per-worker IO accessors
    AuSPtr<AuIO::IIOProcessor> GetIOProcessor(WorkerId_t id) override;
    AuSPtr<AuIO::Net::INetInterface> GetIONetInterface(WorkerId_t pid) override;
    AuSPtr<AuIO::Net::INetWorker> GetIONetWorker(WorkerId_t id) override;
    AuSPtr<AuIO::CompletionGroup::ICompletionGroup> GetIOGroup(WorkerId_t id) override;

    // Worker synchronisation and signalling
    bool Sync(WorkerId_t workerId, AuUInt32 timeoutMs, bool requireSignal) override;
    void Signal(WorkerId_t workerId) override;
    void Wakeup(WorkerId_t workerId) override;
    AuSPtr<AuLoop::ILoopSource> WorkerToLoopSource(WorkerId_t id) override;
    void SyncAllSafe() override;

    void AddFeature(WorkerId_t id, AuSPtr<Threading::Threads::IThreadFeature> feature, bool async) override;

    // Assertions about the calling context
    void AssertInThreadGroup(ThreadGroup_t group) override;
    void AssertWorker(WorkerId_t id) override;

    // Loop queue access: pool-wide overload and per-worker overload
    AuSPtr<AuLoop::ILoopQueue> ToKernelWorkQueue() override;
    AuSPtr<AuLoop::ILoopQueue> ToKernelWorkQueue(WorkerId_t workerId) override;

    AuSPtr<AuThreading::IWaitable> GetShutdownEvent() override;
    void AddDependency(AuSPtr<IThreadPool> pPool) override;

    // Abort fences, addressed per pool or per worker
    void IncrementAbortFenceOnPool() override;
    void IncrementAbortFenceOnWorker(WorkerId_t workerId) override;
    AuUInt64 QueryAbortFence(AuOptional<WorkerId_t> optWorkerId) override;
    bool QueryShouldAbort(AuOptional<WorkerId_t> optWorkerId, AuUInt64 uFenceMagic) override;

    bool IsSelfDepleted(const AuSPtr<ThreadState> &state);
    bool IsDepleted(const AuSPtr<ThreadState> &state);

    //virtual bool ScheduleLoopSource(const AuSPtr<Loop::ILoopSource> &loopSource, WorkerId_t workerId, AuUInt32 timeout, const AuConsumer<AuSPtr<Loop::ILoopSource>, bool> &callback) override;

    // ------------------------------------------------------------------
    // Internal API
    // ------------------------------------------------------------------
    bool Spawn(WorkerId_t workerId, bool create);

    // Polling helpers; each one reports a count through the uCount out-parameter.
    bool InternalRunOne(AuSPtr<ThreadState> state, bool block, bool bUntilWork, AuUInt32 &uCount);
    bool PollInternal(AuSPtr<ThreadState> state, bool block, bool bUntilWork, AuUInt32 &uCount);
#if defined(__AUHAS_COROUTINES_CO_AWAIT) && defined(AU_LANG_CPP_20_)
    // Only declared when both coroutine feature macros are defined; hands its
    // boolean result back through bRet rather than the return value.
    AuVoidTask PollInternal_ForceCoRoutine(AuSPtr<ThreadState> state, bool block, bool bUntilWork, AuUInt32 &uCount, bool &bRet);
#endif
    bool PollInternal_Base(AuSPtr<ThreadState> state, bool block, bool bUntilWork, AuUInt32 &uCount);

    size_t GetThreadWorkersCount(ThreadGroup_t group);

    // Virtual hooks whose implementations in this type are empty.
    virtual void CleanUpWorker(WorkerId_t wid) {}
    virtual void CleanWorkerPoolReservedZeroFree() {} // calls shutdown under async apps

    // ------------------------------------------------------------------
    // Secret old fiber api
    // ------------------------------------------------------------------
    bool CtxYield();
    int CtxPollPush();
    void CtxPollReturn(const AuSPtr<ThreadState> &state, int status, bool hitTask);

    // ------------------------------------------------------------------
    // TLS handle
    // ------------------------------------------------------------------

    // A WorkerId_t extended with an AuWPtr<IThreadPool> member.
    struct WorkerWPId_t : WorkerId_t
    {
        // Leaves the base and `pool` default-constructed.
        WorkerWPId_t()
        {
        }

        // Builds the base from ref.first / ref.second and stores
        // ref.GetPool() in `pool`.
        WorkerWPId_t(const WorkerPId_t &ref) :
            WorkerId_t(ref.first, ref.second),
            pool(ref.GetPool())
        {
        }

        AuWPtr<IThreadPool> pool;
    };

    // Worker id held in an AuThreads::TLSVariable.
    AuThreads::TLSVariable<WorkerWPId_t> tlsWorkerId;

    AuSPtr<ThreadState> GetThreadHandle(WorkerId_t id) override;

private:
    // TODO: BarrierMultiple
    bool Barrier(WorkerId_t workerId, AuUInt32 ms, bool requireSignal, bool drop);

protected:
    void Entrypoint(WorkerId_t id);

private:
    void EarlyExitTick();
    void ThisExiting();

    // State lookups
    AuSPtr<GroupState> GetGroup(ThreadGroup_t type);
    AuSPtr<ThreadState> GetThreadState();
    AuSPtr<ThreadState> GetThreadStateNoWarn();
    AuSPtr<ThreadState> GetThreadStateLocal();
    AuList<AuSPtr<ThreadState>> GetThreadHandles(WorkerId_t id);

    // Fixed-size table of 255 group state handles.
    AuSPtr<GroupState> threadGroups_[255];

    // Shutdown bookkeeping; both members are value-initialised.
    AuUInt32 shuttingdown_{};
    bool shutdown{};

    AuRWRenterableLock rwlock_;
    // Raw IWaitable pointer, null until assigned somewhere outside this header.
    AuThreading::IWaitable *pRWReadView{};
    AuEvent shutdownEvent_;
    bool runnersRunning_{};

    // AuWPtr references to other ThreadPool instances.
    AuList<AuWPtr<ThreadPool>> listWeakDeps_;
    AuList<AuWPtr<ThreadPool>> listWeakDepsParents_;

    // TODO: fallback heap
    // Heap obtained from AuMemory::AllocHeapUnique(512 * 1024)
    // (presumably a size in bytes, i.e. 512 KiB -- confirm against AuMemory).
    AuMemory::AllocHeapUnique_t pHeap{AuMemory::AllocHeapUnique(512 * 1024)};

    friend struct KeepGroupAlive;
};
}