AuroraRuntime/Source/IO/Async/AuIOThreadPool.cpp
Jamie Reece Wilson fceb937bfd [+] Aurora::IO::Async::SpawnMoreThreads
[+] Aurora::IO::Async::GetSpawnedThreads
2024-03-10 11:34:18 +00:00

182 lines
4.2 KiB
C++

/***
Copyright (C) 2024 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuIOThreadPool.cpp
Date: 2024-2-25
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "AuIOThreadPool.hpp"
namespace Aurora::IO::Async
{
// Taken by UseSpecifiedWorkerGroup, SpawnMoreThreads and GetAuxWorkerPoolAndRegister
// before they touch the state below.
static AuMutex gMutex;
// Worker group handed out by GetAuxWorkerPool: either supplied through
// UseSpecifiedWorkerGroup or lazily created by SpawnDefaultGroup.
static AuAsync::WorkerPId_t gDefaultGroup;
// Thread pool created by this file through AuAsync::NewThreadPool (null until then).
static AuSPtr<AuAsync::IThreadPool> gThreadPool;
// Weak references to caller pools that GetAuxWorkerPoolAndRegister has already
// linked to the aux pool with AddDependency.
static AuList<AuWPtr<AuAsync::IThreadPool>> gWorkersRegistered;
// Worker count tracked for gThreadPool; SpawnMoreThreads also uses it as the
// index of the next worker to spawn.
static AuUInt gJobRunners = 0;
// True until UseSpecifiedWorkerGroup is called; while false, SpawnMoreThreads
// and GetSpawnedThreads both report 0.
static bool bOwnsThreadPool { true };
// Picks how many worker threads the default IO pool should start with.
// Currently this is a third of the reported CPU thread count, kept within [2, 8].
static AuUInt32 GetDefaultThreadCount()
{
    // TODO: gJobRunners = runtime config?
    // Placeholder for a configured override; nothing assigns it yet, so it is
    // always zero and the heuristic below is always used.
    AuUInt32 uOverride {};
    if (uOverride)
    {
        return uOverride;
    }

    AuUInt32 uHeuristic = AuClamp(AuHWInfo::GetCPUInfo().uThreads / 3, 2, 8);
    return uHeuristic;
}
// Makes `worker` the group returned by GetAuxWorkerPool and marks the pool as
// externally owned, which makes SpawnMoreThreads and GetSpawnedThreads return 0.
AUKN_SYM void UseSpecifiedWorkerGroup(AuAsync::WorkerPId_t worker)
{
    AU_LOCK_GUARD(gMutex);

    // From here on this module no longer manages thread counts itself.
    bOwnsThreadPool = false;
    gDefaultGroup = worker;
}
// Returns the worker count tracked for the pool owned by this module, or 0
// once an external group has been installed with UseSpecifiedWorkerGroup.
// NOTE(review): reads the shared state without taking gMutex.
AUKN_SYM AuUInt32 GetSpawnedThreads()
{
    if (!bOwnsThreadPool)
    {
        return 0;
    }

    return gJobRunners;
}
// Adds up to `uRequest` worker threads to the pool owned by this module.
//
// Returns the number of *requested* threads that were actually spawned.
// Returns 0 when the request exceeds 64, when an external worker group is in
// use (see UseSpecifiedWorkerGroup), or when the pool cannot be created.
//
// If no pool exists yet, one is created here and the default thread count is
// spawned in addition to the request; those implicit threads are not included
// in the return value.
//
// NOTE(review): a pool created here is not stored in gDefaultGroup, so a later
// GetAuxWorkerPool() call still goes through SpawnDefaultGroup(), which assigns
// a new pool to gThreadPool - confirm whether that is intended.
AUKN_SYM AuUInt32 SpawnMoreThreads(AuUInt32 uRequest)
{
    AU_LOCK_GUARD(gMutex);

    AuUInt32 uSpawned {};
    AuUInt32 uImplicit {};

    if (uRequest > 64)
    {
        return 0;
    }

    if (!bOwnsThreadPool)
    {
        return 0;
    }

    if (!gThreadPool)
    {
        gThreadPool = AuAsync::NewThreadPool();
        if (!gThreadPool)
        {
            return 0;
        }

        // First use: bring up the default workers on top of the request.
        uImplicit = GetDefaultThreadCount();
        uRequest += uImplicit;
    }

    {
        AuAsync::WorkerPId_t workerId { gThreadPool, 0, AuAsync::kThreadIdAny };

        // Worker indices are capped at 255.
        auto uMax = AuClamp<AuUInt>(gJobRunners + uRequest, 0, 255);

        for (AU_ITERATE_N_TO_X(i, gJobRunners, uMax))
        {
            AuAsync::WorkerPId_t copy = workerId;
            copy.second = i;

            // Stop at the first failure so the spawned indices stay contiguous.
            if (!gThreadPool->Spawn(copy))
            {
                break;
            }

            uSpawned++;
        }
    }

    gJobRunners += uSpawned;

    // If spawning stopped before all implicit default threads were created,
    // none of the caller's request was satisfied. The unsigned subtraction
    // would wrap around to a huge value here, so report 0 instead.
    if (uSpawned <= uImplicit)
    {
        return 0;
    }

    return uSpawned - uImplicit;
}
// Creates the module-owned thread pool and spawns the default number of
// workers in it.
//
// Returns the worker group { pool, 0, kThreadIdAny } when every worker was
// spawned, or when a spawn failed after at least two workers were already
// running. Returns an empty WorkerPId_t when the pool could not be created or
// fewer than two workers could be spawned.
//
// gJobRunners is updated to the number of workers that were actually spawned,
// so GetSpawnedThreads() and the next-index computation in SpawnMoreThreads()
// stay accurate after a partial failure.
static AuAsync::WorkerPId_t SpawnDefaultGroup()
{
    AuUInt uWanted = GetDefaultThreadCount();

    // Nothing is running until a Spawn call below succeeds.
    gJobRunners = 0;

    gThreadPool = AuAsync::NewThreadPool();
    if (!gThreadPool)
    {
        return {};
    }

    AuAsync::WorkerPId_t ret { gThreadPool, 0, AuAsync::kThreadIdAny };

    for (AU_ITERATE_N(i, uWanted))
    {
        AuAsync::WorkerPId_t copy = ret;
        copy.second = i;

        if (!gThreadPool->Spawn(copy))
        {
            break;
        }

        gJobRunners++;
    }

    // Full success, or a degraded pool that still has at least two workers.
    if (gJobRunners == uWanted || gJobRunners > 1)
    {
        return ret;
    }

    return {};
}
// Returns the current default worker group, creating it through
// SpawnDefaultGroup() on first use. The result may still be empty if that
// creation failed. The only caller in this file holds gMutex.
static AuAsync::WorkerPId_t GetAuxWorkerPool()
{
    if (!gDefaultGroup)
    {
        gDefaultGroup = SpawnDefaultGroup();
    }

    return gDefaultGroup;
}
// Returns the aux IO worker group, creating it if necessary, and panics if no
// group is available.
//
// When the calling thread is itself an async worker, its pool is linked to the
// aux pool with AddDependency() the first time it is seen here; pools already
// linked are remembered (weakly) in gWorkersRegistered and are skipped.
AuAsync::WorkerPId_t GetAuxWorkerPoolAndRegister()
{
    AU_LOCK_GUARD(gMutex);

    auto worker = GetAuxWorkerPool();
    if (!worker)
    {
        SysPanic("No async threadpool");
    }

    auto selfWorker = AuAsync::GetCurrentWorkerPId();
    if (!selfWorker)
    {
        // Caller is not running on an async worker: nothing to register.
        return worker;
    }

    // Drop expired entries once the list grows past 10 entries.
    if (gWorkersRegistered.size() > 10)
    {
        AuRemoveAllIf(gWorkersRegistered, [](AuWPtr<AuAsync::IThreadPool> wpEntry)
        {
            return wpEntry.expired();
        });
    }

    auto pSelfPool = selfWorker.GetPool();

    // Already registered? Then the dependency has been added before.
    for (const auto &wpEntry : gWorkersRegistered)
    {
        auto pEntry = AuTryLockMemoryType(wpEntry);
        if (pEntry && pEntry == pSelfPool)
        {
            return worker;
        }
    }

    gWorkersRegistered.push_back(pSelfPool);
    pSelfPool->AddDependency(worker.GetPool());
    return worker;
}
}