168 lines
7.8 KiB
C++
168 lines
7.8 KiB
C++
/***
|
|
Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
|
|
|
File: ThreadPool.hpp
|
|
Date: 2021-10-30
|
|
Author: Reece
|
|
***/
|
|
#pragma once
|
|
|
|
#include "IThreadPoolInternal.hpp"
|
|
|
|
namespace Aurora::Async
|
|
{
|
|
struct GroupState;
|
|
struct ThreadState;
|
|
struct IAsyncRunnable;
|
|
//class WorkItem;
|
|
|
|
|
|
struct ThreadPool :
|
|
IThreadPool,
|
|
IThreadPoolInternal,
|
|
AuEnableSharedFromThis<ThreadPool>
|
|
{
|
|
ThreadPool();
|
|
|
|
// IThreadPoolInternal
|
|
bool WaitFor(WorkerPId_t unlocker, const AuSPtr<Threading::IWaitable> &primitive, AuUInt32 ms) override;
|
|
bool WaitFor(WorkerId_t unlocker, const AuSPtr<Threading::IWaitable> &primitive, AuUInt32 ms);
|
|
void Run(WorkerId_t target, AuSPtr<IAsyncRunnable> runnable) override;
|
|
void Run(WorkerId_t target, AuSPtr<IAsyncRunnable> runnable, bool bIncrement);
|
|
IThreadPool *ToThreadPool() override;
|
|
|
|
// IThreadPool
|
|
virtual bool Spawn(WorkerId_t workerId) override;
|
|
|
|
virtual void SetRunningMode(bool eventRunning) override;
|
|
|
|
virtual bool Create(WorkerId_t workerId) override;
|
|
|
|
virtual bool InRunnerMode() override;
|
|
|
|
virtual bool Poll() override;
|
|
virtual bool RunOnce() override;
|
|
virtual bool Run() override;
|
|
|
|
virtual void Shutdown() override;
|
|
virtual bool Exiting() override;
|
|
|
|
virtual AuUInt32 PollAndCount(bool bStrict = true) override;
|
|
virtual AuUInt32 RunAllPending() override;
|
|
|
|
virtual AuSPtr<IWorkItem> NewWorkItem(const WorkerId_t &worker, const AuSPtr<IWorkItemHandler> &task) override;
|
|
virtual AuSPtr<IWorkItem> NewWorkFunction(const WorkerId_t &worker, AuVoidFunc callback) override;
|
|
virtual AuSPtr<IWorkItem> NewFence() override;
|
|
|
|
virtual Threading::Threads::ThreadShared_t ResolveHandle(WorkerId_t) override;
|
|
|
|
virtual AuBST<ThreadGroup_t, AuList<ThreadId_t>> GetThreads() override;
|
|
|
|
virtual WorkerId_t GetCurrentThread() override;
|
|
|
|
virtual AuSPtr<AuIO::IIOProcessor> GetIOProcessor(WorkerId_t id) override;
|
|
virtual AuSPtr<AuIO::Net::INetInterface> GetIONetInterface(WorkerId_t pid) override;
|
|
virtual AuSPtr<AuIO::Net::INetWorker> GetIONetWorker(WorkerId_t id) override;
|
|
|
|
AuSPtr<IO::IIOProcessor> GetSelfIOProcessor();
|
|
AuSPtr<IO::Net::INetInterface> GetSelfIONetInterface();
|
|
AuSPtr<IO::Net::INetWorker> GetSelfIONetWorker();
|
|
|
|
virtual bool Sync(WorkerId_t workerId, AuUInt32 timeoutMs, bool requireSignal) override;
|
|
virtual void Signal(WorkerId_t workerId) override;
|
|
virtual void Wakeup(WorkerId_t workerId) override;
|
|
virtual AuSPtr<AuLoop::ILoopSource> WorkerToLoopSource(WorkerId_t id) override;
|
|
virtual void SyncAllSafe() override;
|
|
|
|
virtual void AddFeature(WorkerId_t id, AuSPtr<Threading::Threads::IThreadFeature> feature, bool async) override;
|
|
|
|
virtual void AssertInThreadGroup(ThreadGroup_t group) override;
|
|
virtual void AssertWorker(WorkerId_t id) override;
|
|
|
|
virtual AuSPtr<AuLoop::ILoopQueue> ToKernelWorkQueue() override;
|
|
virtual AuSPtr<AuLoop::ILoopQueue> ToKernelWorkQueue(WorkerId_t workerId) override;
|
|
|
|
virtual AuSPtr<AuThreading::IWaitable> GetShutdownEvent() override;
|
|
virtual void AddDependency(AuSPtr<IThreadPool> pPool) override;
|
|
|
|
virtual void IncrementAbortFenceOnPool() override;
|
|
virtual void IncrementAbortFenceOnWorker(WorkerId_t workerId) override;
|
|
|
|
virtual AuUInt64 QueryAbortFence(AuOptional<WorkerId_t> optWorkerId) override;
|
|
virtual bool QueryShouldAbort(AuOptional<WorkerId_t> optWorkerId, AuUInt64 uFenceMagic) override;
|
|
|
|
bool IsSelfDepleted();
|
|
bool IsDepleted();
|
|
|
|
//virtual bool ScheduleLoopSource(const AuSPtr<Loop::ILoopSource> &loopSource, WorkerId_t workerId, AuUInt32 timeout, const AuConsumer<AuSPtr<Loop::ILoopSource>, bool> &callback) override;
|
|
|
|
// Internal API
|
|
|
|
bool Spawn(WorkerId_t workerId, bool create);
|
|
|
|
bool InternalRunOne(AuSPtr<ThreadState>, bool block, bool bUntilWork, AuUInt32 &uCount);
|
|
bool PollInternal(AuSPtr<ThreadState>, bool block, bool bUntilWork, AuUInt32 &uCount);
|
|
#if defined(__AUHAS_COROUTINES_CO_AWAIT) && defined(AU_LANG_CPP_20_)
|
|
AuVoidTask PollInternal_ForceCoRoutine(AuSPtr<ThreadState>, bool block, bool bUntilWork, AuUInt32 &uCount, bool &bRet);
|
|
#endif
|
|
bool PollInternal_Base(AuSPtr<ThreadState>, bool block, bool bUntilWork, AuUInt32 &uCount);
|
|
|
|
size_t GetThreadWorkersCount(ThreadGroup_t group);
|
|
|
|
virtual void CleanUpWorker(WorkerId_t wid) {};
|
|
virtual void CleanWorkerPoolReservedZeroFree() {}; // calls shutdown under async apps
|
|
|
|
// Secret old fiber api
|
|
bool CtxYield();
|
|
int CtxPollPush();
|
|
void CtxPollReturn(const AuSPtr<ThreadState> &state, int status, bool hitTask);
|
|
|
|
// TLS handle
|
|
struct WorkerWPId_t : WorkerId_t
|
|
{
|
|
WorkerWPId_t()
|
|
{}
|
|
|
|
WorkerWPId_t(const WorkerPId_t &ref) :
|
|
WorkerId_t(ref.first, ref.second),
|
|
pool(ref.GetPool())
|
|
{}
|
|
|
|
AuWPtr<IThreadPool> pool;
|
|
};
|
|
|
|
AuThreads::TLSVariable<WorkerWPId_t> tlsWorkerId;
|
|
|
|
AuSPtr<ThreadState> GetThreadHandle(WorkerId_t id) override;
|
|
private:
|
|
// TODO: BarrierMultiple
|
|
bool Barrier(WorkerId_t, AuUInt32 ms, bool requireSignal, bool drop);
|
|
|
|
protected:
|
|
void Entrypoint(WorkerId_t id);
|
|
|
|
private:
|
|
void EarlyExitTick();
|
|
|
|
void ThisExiting();
|
|
|
|
AuSPtr<GroupState> GetGroup(ThreadGroup_t type);
|
|
AuSPtr<ThreadState> GetThreadState();
|
|
AuSPtr<ThreadState> GetThreadStateNoWarn();
|
|
AuList<AuSPtr<ThreadState>> GetThreadHandles(WorkerId_t id);
|
|
|
|
AuSPtr<GroupState> threadGroups_[255];
|
|
AuUInt32 shuttingdown_ {};
|
|
bool shutdown {};
|
|
|
|
AuRWRenterableLock rwlock_;
|
|
AuThreading::IWaitable *pRWReadView {};
|
|
|
|
AuEvent shutdownEvent_;
|
|
bool runnersRunning_ {};
|
|
AuList<AuWPtr<ThreadPool>> listWeakDeps_;
|
|
AuList<AuWPtr<ThreadPool>> listWeakDepsParents_;
|
|
|
|
friend struct KeepGroupAlive;
|
|
};
|
|
} |