parent
73c5904d97
commit
e037fc214a
@ -15,19 +15,6 @@ namespace Aurora::IO::Loop
|
|||||||
|
|
||||||
namespace Aurora::Async
|
namespace Aurora::Async
|
||||||
{
|
{
|
||||||
AUE_DEFINE(ERunMode,
|
|
||||||
(
|
|
||||||
eLowLatencyYield, // uses high perf cond var + yield + trylock
|
|
||||||
eLowLatencyFreqKernel, // uses high perf cond var + timeout(freqency)
|
|
||||||
eEfficient // delegates sleep to the kernel once kernel objects are scheduled
|
|
||||||
));
|
|
||||||
|
|
||||||
struct RunMode
|
|
||||||
{
|
|
||||||
ERunMode mode;
|
|
||||||
AuUInt16 freqMsTick;
|
|
||||||
};
|
|
||||||
|
|
||||||
struct IThreadPool
|
struct IThreadPool
|
||||||
{
|
{
|
||||||
// Spawning
|
// Spawning
|
||||||
@ -122,9 +109,6 @@ namespace Aurora::Async
|
|||||||
// AuIO overlapped-IO glue
|
// AuIO overlapped-IO glue
|
||||||
virtual AuSPtr<IO::Loop::ILoopQueue> ToKernelWorkQueue() = 0;
|
virtual AuSPtr<IO::Loop::ILoopQueue> ToKernelWorkQueue() = 0;
|
||||||
virtual AuSPtr<IO::Loop::ILoopQueue> ToKernelWorkQueue(WorkerId_t workerId) = 0;
|
virtual AuSPtr<IO::Loop::ILoopQueue> ToKernelWorkQueue(WorkerId_t workerId) = 0;
|
||||||
virtual void UpdateWorkMode(WorkerId_t workerId, RunMode mode) = 0;
|
|
||||||
virtual ERunMode GetCurrentThreadRunMode() = 0;
|
|
||||||
virtual ERunMode GetThreadRunMode(WorkerId_t workerId) = 0;
|
|
||||||
|
|
||||||
//
|
//
|
||||||
virtual AuSPtr<Aurora::Threading::IWaitable> GetShutdownEvent() = 0;
|
virtual AuSPtr<Aurora::Threading::IWaitable> GetShutdownEvent() = 0;
|
||||||
|
@ -7,7 +7,7 @@
|
|||||||
***/
|
***/
|
||||||
#include <Source/RuntimeInternal.hpp>
|
#include <Source/RuntimeInternal.hpp>
|
||||||
#include "Async.hpp"
|
#include "Async.hpp"
|
||||||
#include "Schedular.hpp"
|
#include "AuSchedular.hpp"
|
||||||
#include "AsyncApp.hpp"
|
#include "AsyncApp.hpp"
|
||||||
|
|
||||||
namespace Aurora::Async
|
namespace Aurora::Async
|
||||||
|
@ -9,7 +9,7 @@
|
|||||||
#include "Async.hpp"
|
#include "Async.hpp"
|
||||||
#include "AsyncApp.hpp"
|
#include "AsyncApp.hpp"
|
||||||
#include "WorkItem.hpp"
|
#include "WorkItem.hpp"
|
||||||
#include "Schedular.hpp"
|
#include "AuSchedular.hpp"
|
||||||
#include <Source/Console/Commands/Commands.hpp>
|
#include <Source/Console/Commands/Commands.hpp>
|
||||||
|
|
||||||
|
|
||||||
@ -237,19 +237,4 @@ namespace Aurora::Async
|
|||||||
{
|
{
|
||||||
return ThreadPool::ToKernelWorkQueue(workerId);
|
return ThreadPool::ToKernelWorkQueue(workerId);
|
||||||
}
|
}
|
||||||
|
|
||||||
void AsyncApp::UpdateWorkMode(WorkerId_t workerId, RunMode mode)
|
|
||||||
{
|
|
||||||
ThreadPool::UpdateWorkMode(workerId, mode);
|
|
||||||
}
|
|
||||||
|
|
||||||
ERunMode AsyncApp::GetCurrentThreadRunMode()
|
|
||||||
{
|
|
||||||
return ThreadPool::GetCurrentThreadRunMode();
|
|
||||||
}
|
|
||||||
|
|
||||||
ERunMode AsyncApp::GetThreadRunMode(WorkerId_t workerId)
|
|
||||||
{
|
|
||||||
return ThreadPool::GetThreadRunMode(workerId);
|
|
||||||
}
|
|
||||||
}
|
}
|
@ -45,9 +45,6 @@ namespace Aurora::Async
|
|||||||
|
|
||||||
AuSPtr<AuLoop::ILoopQueue> ToKernelWorkQueue() override;
|
AuSPtr<AuLoop::ILoopQueue> ToKernelWorkQueue() override;
|
||||||
AuSPtr<AuLoop::ILoopQueue> ToKernelWorkQueue(WorkerId_t workerId) override;
|
AuSPtr<AuLoop::ILoopQueue> ToKernelWorkQueue(WorkerId_t workerId) override;
|
||||||
void UpdateWorkMode(WorkerId_t workerId, RunMode mode) override;
|
|
||||||
ERunMode GetCurrentThreadRunMode() override;
|
|
||||||
ERunMode GetThreadRunMode(WorkerId_t workerId) override;
|
|
||||||
|
|
||||||
// Main thread logic
|
// Main thread logic
|
||||||
void Start() override;
|
void Start() override;
|
||||||
|
14
Source/Async/AuAThreadPoolInternal.cpp
Normal file
14
Source/Async/AuAThreadPoolInternal.cpp
Normal file
@ -0,0 +1,14 @@
|
|||||||
|
/***
|
||||||
|
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
||||||
|
|
||||||
|
File: AuAThreadPoolInternal.cpp
|
||||||
|
Date: 2023-11-04
|
||||||
|
Author: Reece
|
||||||
|
***/
|
||||||
|
#include <Source/RuntimeInternal.hpp>
|
||||||
|
#include "AuAThreadPoolInternal.hpp"
|
||||||
|
|
||||||
|
namespace Aurora::Async
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
13
Source/Async/AuAThreadPoolInternal.hpp
Normal file
13
Source/Async/AuAThreadPoolInternal.hpp
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
/***
|
||||||
|
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
||||||
|
|
||||||
|
File: AuGroupWorkQueue.hpp
|
||||||
|
Date: 2023-11-04
|
||||||
|
Author: Reece
|
||||||
|
***/
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
namespace Aurora::Async
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
47
Source/Async/AuAsyncKeepGroupAlive.cpp
Normal file
47
Source/Async/AuAsyncKeepGroupAlive.cpp
Normal file
@ -0,0 +1,47 @@
|
|||||||
|
/***
|
||||||
|
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
||||||
|
|
||||||
|
File: AuAsyncKeepGroupAlive.cpp
|
||||||
|
Date: 2023-11-04
|
||||||
|
Author: Reece
|
||||||
|
***/
|
||||||
|
#include <Source/RuntimeInternal.hpp>
|
||||||
|
#include "AuAsyncKeepGroupAlive.hpp"
|
||||||
|
#include "ThreadPool.hpp"
|
||||||
|
#include "ThreadState.hpp"
|
||||||
|
#include "GroupState.hpp"
|
||||||
|
|
||||||
|
namespace Aurora::Async
|
||||||
|
{
|
||||||
|
struct KeepGroupAlive
|
||||||
|
{
|
||||||
|
KeepGroupAlive(AuSPtr<AuAsync::IThreadPool> pPool) :
|
||||||
|
pPool(AuStaticCast<AuAsync::ThreadPool>(pPool))
|
||||||
|
{
|
||||||
|
AuAtomicAdd(&this->pPool->uAtomicCounter, 1u);
|
||||||
|
}
|
||||||
|
|
||||||
|
~KeepGroupAlive()
|
||||||
|
{
|
||||||
|
auto uNow = AuAtomicSub(&this->pPool->uAtomicCounter, 1u);
|
||||||
|
|
||||||
|
if (uNow == 0)
|
||||||
|
{
|
||||||
|
for (const auto &pState : this->pPool->threadGroups_)
|
||||||
|
{
|
||||||
|
if (pState)
|
||||||
|
{
|
||||||
|
pState->SignalAll();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
AuSPtr<AuAsync::ThreadPool> pPool;
|
||||||
|
};
|
||||||
|
|
||||||
|
AUKN_SYM AuSPtr<void> KeepThreadPoolAlive(AuSPtr<AuAsync::IThreadPool> pPool)
|
||||||
|
{
|
||||||
|
return AuMakeSharedThrow<KeepGroupAlive>(pPool);
|
||||||
|
}
|
||||||
|
}
|
13
Source/Async/AuAsyncKeepGroupAlive.hpp
Normal file
13
Source/Async/AuAsyncKeepGroupAlive.hpp
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
/***
|
||||||
|
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
||||||
|
|
||||||
|
File: AuAsyncKeepGroupAlive.hpp
|
||||||
|
Date: 2023-11-04
|
||||||
|
Author: Reece
|
||||||
|
***/
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
namespace Aurora::Async
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
14
Source/Async/AuAsyncMicrocounter.cpp
Normal file
14
Source/Async/AuAsyncMicrocounter.cpp
Normal file
@ -0,0 +1,14 @@
|
|||||||
|
/***
|
||||||
|
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
||||||
|
|
||||||
|
File: AuAsyncMicrocounter.cpp
|
||||||
|
Date: 2023-11-04
|
||||||
|
Author: Reece
|
||||||
|
***/
|
||||||
|
#include <Source/RuntimeInternal.hpp>
|
||||||
|
#include "AuAsyncMicrocounter.hpp"
|
||||||
|
|
||||||
|
namespace Aurora::Async
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
13
Source/Async/AuAsyncMicrocounter.hpp
Normal file
13
Source/Async/AuAsyncMicrocounter.hpp
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
/***
|
||||||
|
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
||||||
|
|
||||||
|
File: AuAsyncMicrocounter.hpp
|
||||||
|
Date: 2023-11-04
|
||||||
|
Author: Reece
|
||||||
|
***/
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
namespace Aurora::Async
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
112
Source/Async/AuGroupWorkQueue.cpp
Normal file
112
Source/Async/AuGroupWorkQueue.cpp
Normal file
@ -0,0 +1,112 @@
|
|||||||
|
/***
|
||||||
|
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
||||||
|
|
||||||
|
File: AuGroupWorkQueue.cpp
|
||||||
|
Date: 2023-11-04
|
||||||
|
Author: Reece
|
||||||
|
***/
|
||||||
|
#include <Source/RuntimeInternal.hpp>
|
||||||
|
#include "AuGroupWorkQueue.hpp"
|
||||||
|
#include "ThreadState.hpp"
|
||||||
|
#include "GroupState.hpp"
|
||||||
|
#include "ThreadPool.hpp"
|
||||||
|
|
||||||
|
namespace Aurora::Async
|
||||||
|
{
|
||||||
|
void GroupWorkQueue::AddWorkEntry(ThreadState *pState, WorkEntry_t entry)
|
||||||
|
{
|
||||||
|
AU_DEBUG_MEMCRUNCH;
|
||||||
|
|
||||||
|
auto prio = (int)entry.second->GetPrio();
|
||||||
|
SysAssert(prio < AuAsync::kEWorkPrioCount, "Invalid PRIO");
|
||||||
|
|
||||||
|
AU_LOCK_GUARD(this->mutex);
|
||||||
|
this->sortedWork[prio].push_back(entry);
|
||||||
|
|
||||||
|
if (entry.first != kThreadIdAny)
|
||||||
|
{
|
||||||
|
if (auto pThat = pState->parent.lock()->GetThreadByIndex(entry.first))
|
||||||
|
{
|
||||||
|
AuAtomicAdd(&pThat->sync.cvHasWork, 1u);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool GroupWorkQueue::IsEmpty()
|
||||||
|
{
|
||||||
|
AU_LOCK_GUARD(this->mutex);
|
||||||
|
for (AU_ITERATE_N(i, AuAsync::kEWorkPrioCount))
|
||||||
|
{
|
||||||
|
if (this->sortedWork[i].size())
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool GroupWorkQueue::IsEmpty(ThreadPool *pPool, AuWorkerId_t id)
|
||||||
|
{
|
||||||
|
#if 1
|
||||||
|
auto pHandle = pPool->GetThreadHandle(id);
|
||||||
|
if (!pHandle)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return !AuAtomicLoad(&pHandle->sync.cvHasWork);
|
||||||
|
#else
|
||||||
|
AU_LOCK_GUARD(this->mutex);
|
||||||
|
|
||||||
|
for (AU_ITERATE_N(i, AuAsync::kEWorkPrioCount))
|
||||||
|
{
|
||||||
|
for (const auto &[srcId, pA] : this->sortedWork[i])
|
||||||
|
{
|
||||||
|
if (id == srcId)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
|
void GroupWorkQueue::Dequeue(AuList<WorkEntry_t> &queue, int maxPopCount, AuAsync::ThreadId_t id)
|
||||||
|
{
|
||||||
|
AU_LOCK_GUARD(this->mutex);
|
||||||
|
|
||||||
|
for (AU_ITERATE_N(i, AuAsync::kEWorkPrioCount))
|
||||||
|
{
|
||||||
|
auto &group = this->sortedWork[(int)AuAsync::kEWorkPrioMaxLegal - i];
|
||||||
|
|
||||||
|
for (auto itr = group.begin(); ((itr != group.end()) && (queue.size() < maxPopCount)); )
|
||||||
|
{
|
||||||
|
if (itr->first == Async::kThreadIdAny)
|
||||||
|
{
|
||||||
|
queue.push_back(*itr);
|
||||||
|
itr = group.erase(itr);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((itr->first != Async::kThreadIdAny) &&
|
||||||
|
(itr->first == id))
|
||||||
|
{
|
||||||
|
queue.push_back(*itr);
|
||||||
|
itr = group.erase(itr);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
itr++;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (queue.size())
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
31
Source/Async/AuGroupWorkQueue.hpp
Normal file
31
Source/Async/AuGroupWorkQueue.hpp
Normal file
@ -0,0 +1,31 @@
|
|||||||
|
/***
|
||||||
|
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
||||||
|
|
||||||
|
File: AuGroupWorkQueue.hpp
|
||||||
|
Date: 2023-11-04
|
||||||
|
Author: Reece
|
||||||
|
***/
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
namespace Aurora::Async
|
||||||
|
{
|
||||||
|
struct IAsyncRunnable;
|
||||||
|
struct ThreadPool;
|
||||||
|
struct ThreadState;
|
||||||
|
|
||||||
|
using WorkEntry_t = AuPair<ThreadId_t, AuSPtr<IAsyncRunnable>>;
|
||||||
|
|
||||||
|
struct GroupWorkQueue
|
||||||
|
{
|
||||||
|
AuThreadPrimitives::Mutex mutex;
|
||||||
|
AuUInt32 uItems {};
|
||||||
|
AuList<WorkEntry_t> sortedWork[AuAsync::kEWorkPrioCount];
|
||||||
|
|
||||||
|
bool IsEmpty();
|
||||||
|
bool IsEmpty(ThreadPool *pPool, AuWorkerId_t id);
|
||||||
|
|
||||||
|
void AddWorkEntry(ThreadState *pState, WorkEntry_t entry);
|
||||||
|
|
||||||
|
void Dequeue(AuList<WorkEntry_t> &queue, int maxPopCount, AuAsync::ThreadId_t idd);
|
||||||
|
};
|
||||||
|
}
|
@ -1,13 +1,13 @@
|
|||||||
/***
|
/***
|
||||||
Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
Copyright (C) 2021-2023 Jamie Reece Wilson (a/k/a "Reece"). All rights reserved.
|
||||||
|
|
||||||
File: Schedular.cpp
|
File: AuSchedular.cpp
|
||||||
Date: 2021-6-26
|
Date: 2021-6-26
|
||||||
Author: Reece
|
Author: Reece
|
||||||
***/
|
***/
|
||||||
#include <Source/RuntimeInternal.hpp>
|
#include <Source/RuntimeInternal.hpp>
|
||||||
#include "Async.hpp"
|
#include "Async.hpp"
|
||||||
#include "Schedular.hpp"
|
#include "AuSchedular.hpp"
|
||||||
//#include "AsyncApp.hpp"
|
//#include "AsyncApp.hpp"
|
||||||
#include "ThreadPool.hpp"
|
#include "ThreadPool.hpp"
|
||||||
#include <Console/Commands/Commands.hpp>
|
#include <Console/Commands/Commands.hpp>
|
@ -1,7 +1,7 @@
|
|||||||
/***
|
/***
|
||||||
Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
||||||
|
|
||||||
File: Schedular.hpp
|
File: AuSchedular.hpp
|
||||||
Date: 2021-6-26
|
Date: 2021-6-26
|
||||||
Author: Reece
|
Author: Reece
|
||||||
***/
|
***/
|
93
Source/Async/AuThreadState.cpp
Normal file
93
Source/Async/AuThreadState.cpp
Normal file
@ -0,0 +1,93 @@
|
|||||||
|
/***
|
||||||
|
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
||||||
|
|
||||||
|
File: AuThreadState.hpp
|
||||||
|
Date: 2023-11-04
|
||||||
|
Author: Reece
|
||||||
|
***/
|
||||||
|
#include <Source/RuntimeInternal.hpp>
|
||||||
|
#include "Async.hpp"
|
||||||
|
#include "AuThreadState.hpp"
|
||||||
|
#include <Source/IO/Loop/LSAsync.hpp>
|
||||||
|
#include "ThreadWorkerQueueShim.hpp"
|
||||||
|
|
||||||
|
namespace Aurora::Async
|
||||||
|
{
|
||||||
|
ThreadStateSync::ThreadStateSync():
|
||||||
|
cvVariable(AuUnsafeRaiiToShared(cvWorkMutex.AsPointer()))
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
bool ThreadStateSync::Init()
|
||||||
|
{
|
||||||
|
this->eventLs = AuLoop::NewLSAsync();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool ThreadStateBase::Init()
|
||||||
|
{
|
||||||
|
if (!(this->asyncLoop = AuMakeShared<AsyncLoop>()))
|
||||||
|
{
|
||||||
|
SysPushErrorMemory();
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
this->asyncLoop->pParent = this;
|
||||||
|
|
||||||
|
if (!this->asyncLoop->Init())
|
||||||
|
{
|
||||||
|
SysPushErrorNested();
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!this->sync.Init())
|
||||||
|
{
|
||||||
|
SysPushErrorNested();
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!this->asyncLoop->SourceAdd(this->sync.eventLs))
|
||||||
|
{
|
||||||
|
SysPushErrorNested();
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
void ThreadStateSync::SetEvent(bool bBoth)
|
||||||
|
{
|
||||||
|
if (auto pEvent = this->eventLs)
|
||||||
|
{
|
||||||
|
if (AuAtomicTestAndSet(&this->cvLSActive, 0u) == 0)
|
||||||
|
{
|
||||||
|
pEvent->Set();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (bBoth)
|
||||||
|
{
|
||||||
|
this->cvWorkMutex->Lock();
|
||||||
|
this->cvWorkMutex->Unlock();
|
||||||
|
|
||||||
|
this->cvVariable->Signal();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void ThreadStateSync::UpdateCVState(ThreadState *pState)
|
||||||
|
{
|
||||||
|
auto uState = pState->sync.cvHasWork;
|
||||||
|
auto uMin = AuMin(uState, pState->pendingWorkItems.size());
|
||||||
|
if (!uMin) uMin = 1;
|
||||||
|
|
||||||
|
while (uState &&
|
||||||
|
AuAtomicCompareExchange<AuUInt32>(&pState->sync.cvHasWork, uState - uMin, uState) != uState)
|
||||||
|
{
|
||||||
|
uState = pState->sync.cvHasWork;
|
||||||
|
|
||||||
|
if (uState < uMin)
|
||||||
|
{
|
||||||
|
uMin = uState;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
62
Source/Async/AuThreadState.hpp
Normal file
62
Source/Async/AuThreadState.hpp
Normal file
@ -0,0 +1,62 @@
|
|||||||
|
/***
|
||||||
|
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
||||||
|
|
||||||
|
File: AuThreadState.hpp
|
||||||
|
Date: 2023-11-04
|
||||||
|
Author: Reece
|
||||||
|
***/
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
namespace Aurora::Async
|
||||||
|
{
|
||||||
|
struct AsyncLoop;
|
||||||
|
struct ThreadState;
|
||||||
|
|
||||||
|
struct ThreadStateShutdown
|
||||||
|
{
|
||||||
|
bool bBreakMainLoop {};
|
||||||
|
bool bIsKillerThread {};
|
||||||
|
AuUInt32 uShutdownFence { 1 };
|
||||||
|
};
|
||||||
|
|
||||||
|
struct ThreadStateSync
|
||||||
|
{
|
||||||
|
ThreadStateSync();
|
||||||
|
|
||||||
|
AuThreadPrimitives::ConditionMutex cvWorkMutex;
|
||||||
|
AuThreadPrimitives::ConditionVariable cvVariable;
|
||||||
|
AuAUInt32 cvLSActive {};
|
||||||
|
AuAUInt32 cvHasWork {};
|
||||||
|
AuSPtr<AuLoop::ILSEvent> eventLs;
|
||||||
|
|
||||||
|
void SetEvent(bool bBoth = true);
|
||||||
|
bool Init();
|
||||||
|
void UpdateCVState(ThreadState *pState);
|
||||||
|
};
|
||||||
|
|
||||||
|
struct ThreadStateMeta
|
||||||
|
{
|
||||||
|
bool bOwnsThread {};
|
||||||
|
AuThreads::ThreadShared_t pThread;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct ThreadStateStack
|
||||||
|
{
|
||||||
|
AuUInt32 uStackCookie {};
|
||||||
|
AuUInt32 uStackMaxRecursiveAsyncPromiseCalls { 4 };
|
||||||
|
AuUInt32 uStackCallDepth {};
|
||||||
|
AuUInt32 uStackMaxCookie { 5 };
|
||||||
|
|
||||||
|
};
|
||||||
|
|
||||||
|
struct ThreadStateBase
|
||||||
|
{
|
||||||
|
ThreadStateShutdown shutdown;
|
||||||
|
ThreadStateMeta thread;
|
||||||
|
ThreadStateSync sync;
|
||||||
|
AuSPtr<AsyncLoop> asyncLoop;
|
||||||
|
ThreadStateStack stackState;
|
||||||
|
|
||||||
|
bool Init();
|
||||||
|
};
|
||||||
|
}
|
14
Source/Async/AuThreadStateSingletons.cpp
Normal file
14
Source/Async/AuThreadStateSingletons.cpp
Normal file
@ -0,0 +1,14 @@
|
|||||||
|
/***
|
||||||
|
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
||||||
|
|
||||||
|
File: AuThreadStateSingletons.cpp
|
||||||
|
Date: 2023-11-04
|
||||||
|
Author: Reece
|
||||||
|
***/
|
||||||
|
#include <Source/RuntimeInternal.hpp>
|
||||||
|
#include "AuThreadStateSingletons.hpp"
|
||||||
|
|
||||||
|
namespace Aurora::Async
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
13
Source/Async/AuThreadStateSingletons.hpp
Normal file
13
Source/Async/AuThreadStateSingletons.hpp
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
/***
|
||||||
|
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
||||||
|
|
||||||
|
File: AuThreadStateSingletons.hpp
|
||||||
|
Date: 2023-11-04
|
||||||
|
Author: Reece
|
||||||
|
***/
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
namespace Aurora::Async
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
@ -64,7 +64,7 @@ namespace Aurora::Async
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void GroupState::BroadCast()
|
void GroupState::SignalAll()
|
||||||
{
|
{
|
||||||
AU_LOCK_GUARD(this->workersMutex);
|
AU_LOCK_GUARD(this->workersMutex);
|
||||||
|
|
||||||
@ -75,12 +75,11 @@ namespace Aurora::Async
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
AuAtomicAdd(&pWorker->cvHasWork, 1u);
|
pWorker->sync.SetEvent();
|
||||||
pWorker->SetEvent();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void GroupState::AddWorker(ThreadId_t id, AuSPtr<ThreadState> pState)
|
bool GroupState::AddWorker(ThreadId_t id, AuSPtr<ThreadState> pState)
|
||||||
{
|
{
|
||||||
AU_LOCK_GUARD(this->workersMutex);
|
AU_LOCK_GUARD(this->workersMutex);
|
||||||
|
|
||||||
@ -89,6 +88,41 @@ namespace Aurora::Async
|
|||||||
this->wpWorkers[id] = pState;
|
this->wpWorkers[id] = pState;
|
||||||
}
|
}
|
||||||
|
|
||||||
SysAssert(AuTryInsert(this->workers, AuMakePair(id, pState)));
|
if (!AuTryInsert(this->workers, AuMakePair(id, pState)))
|
||||||
|
{
|
||||||
|
this->wpWorkers[id] = {};
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
AuSPtr<ThreadState> GroupState::CreateWorker(WorkerId_t workerId, bool bCreate)
|
||||||
|
{
|
||||||
|
if (auto pThreadState = AuMakeShared<ThreadState>())
|
||||||
|
{
|
||||||
|
pThreadState->thread.bOwnsThread = !bCreate;
|
||||||
|
pThreadState->parent = this->SharedFromThis();
|
||||||
|
pThreadState->id = workerId;
|
||||||
|
|
||||||
|
if (!pThreadState->Init())
|
||||||
|
{
|
||||||
|
SysPushErrorNested();
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!this->AddWorker(workerId.second, pThreadState))
|
||||||
|
{
|
||||||
|
SysPushErrorNested();
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
|
||||||
|
return pThreadState;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
SysPushErrorMemory();
|
||||||
|
return {};
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -6,47 +6,38 @@
|
|||||||
Author: Reece
|
Author: Reece
|
||||||
***/
|
***/
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include "ThreadState.hpp"
|
#include "ThreadState.hpp"
|
||||||
|
#include "AuGroupWorkQueue.hpp"
|
||||||
|
|
||||||
namespace Aurora::Async
|
namespace Aurora::Async
|
||||||
{
|
{
|
||||||
struct ThreadPool;
|
struct ThreadPool;
|
||||||
struct GroupWorkQueue
|
|
||||||
{
|
|
||||||
AuThreadPrimitives::Mutex mutex;
|
struct GroupState :
|
||||||
AuUInt32 uItems {};
|
AuEnableSharedFromThis<GroupState>
|
||||||
AuList<WorkEntry_t> sortedWork[AuAsync::kEWorkPrioCount];
|
|
||||||
|
|
||||||
bool IsEmpty();
|
|
||||||
bool IsEmpty(ThreadPool *pPool, AuWorkerId_t id);
|
|
||||||
|
|
||||||
void AddWorkEntry(ThreadState *pState, WorkEntry_t entry);
|
|
||||||
|
|
||||||
void Dequeue(AuList<WorkEntry_t> &queue, int maxPopCount, AuAsync::ThreadId_t idd);
|
|
||||||
};
|
|
||||||
|
|
||||||
struct GroupState
|
|
||||||
{
|
{
|
||||||
|
// group id
|
||||||
ThreadGroup_t group;
|
ThreadGroup_t group;
|
||||||
|
|
||||||
|
// work items
|
||||||
GroupWorkQueue workQueue;
|
GroupWorkQueue workQueue;
|
||||||
|
|
||||||
|
// tracked workers
|
||||||
AuThreadPrimitives::Mutex workersMutex;
|
AuThreadPrimitives::Mutex workersMutex;
|
||||||
AuBST<ThreadId_t, AuSPtr<ThreadState>> workers;
|
AuBST<ThreadId_t, AuSPtr<ThreadState>> workers;
|
||||||
AuWPtr<ThreadState> wpWorkers[32];
|
AuWPtr<ThreadState> wpWorkers[32]; // linear non-locking lookup table
|
||||||
|
//
|
||||||
|
|
||||||
bool Init();
|
bool Init();
|
||||||
|
|
||||||
void BroadCast();
|
void SignalAll();
|
||||||
void Decommit(ThreadId_t id);
|
void Decommit(ThreadId_t id);
|
||||||
|
|
||||||
void AddWorker(ThreadId_t id, AuSPtr<ThreadState> pState);
|
bool AddWorker(ThreadId_t id, AuSPtr<ThreadState> pState);
|
||||||
|
AuSPtr<ThreadState> CreateWorker(WorkerId_t id, bool bCreate);
|
||||||
|
|
||||||
AuSPtr<ThreadState> GetThreadByIndex(ThreadId_t uIndex);
|
AuSPtr<ThreadState> GetThreadByIndex(ThreadId_t uIndex);
|
||||||
|
|
||||||
bool inline IsSysThread()
|
|
||||||
{
|
|
||||||
return group == 0;
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
}
|
}
|
0
Source/Async/IAsyncRunnable.hpp
Normal file
0
Source/Async/IAsyncRunnable.hpp
Normal file
27
Source/Async/IThreadPoolInternal.hpp
Normal file
27
Source/Async/IThreadPoolInternal.hpp
Normal file
@ -0,0 +1,27 @@
|
|||||||
|
/***
|
||||||
|
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
||||||
|
|
||||||
|
File: IThreadPoolInternal.hpp
|
||||||
|
Date: 2023-11-04
|
||||||
|
Author: Reece
|
||||||
|
***/
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
namespace Aurora::Async
|
||||||
|
{
|
||||||
|
struct ThreadState;
|
||||||
|
struct IAsyncRunnable;
|
||||||
|
|
||||||
|
struct IThreadPoolInternal
|
||||||
|
{
|
||||||
|
virtual bool WaitFor(WorkerId_t unlocker, const AuSPtr<Threading::IWaitable> &primitive, AuUInt32 ms) = 0;
|
||||||
|
virtual void Run(WorkerId_t target, AuSPtr<IAsyncRunnable> runnable) = 0;
|
||||||
|
virtual AuSPtr<ThreadState> GetThreadHandle(WorkerId_t id) = 0;
|
||||||
|
virtual IThreadPool *ToThreadPool() = 0;
|
||||||
|
|
||||||
|
AuUInt32 uAtomicCounter {};
|
||||||
|
AuUInt32 uAtomicIOProcessors {};
|
||||||
|
AuUInt32 uAtomicIOProcessorsWorthlessSources {};
|
||||||
|
AuUInt32 uAtomicShutdownCookie {};
|
||||||
|
};
|
||||||
|
}
|
@ -10,17 +10,14 @@
|
|||||||
#include "ThreadPool.hpp"
|
#include "ThreadPool.hpp"
|
||||||
#include "AsyncApp.hpp"
|
#include "AsyncApp.hpp"
|
||||||
#include "WorkItem.hpp"
|
#include "WorkItem.hpp"
|
||||||
#include "Schedular.hpp"
|
#include "AuSchedular.hpp"
|
||||||
#include "ThreadWorkerQueueShim.hpp"
|
#include "ThreadWorkerQueueShim.hpp"
|
||||||
#include <Source/IO/Loop/LSAsync.hpp>
|
|
||||||
|
|
||||||
namespace Aurora::Async
|
namespace Aurora::Async
|
||||||
{
|
{
|
||||||
//STATIC_TLS(WorkerId_t, tlsWorkerId);
|
//STATIC_TLS(WorkerId_t, tlsWorkerId);
|
||||||
static thread_local AuWPtr<ThreadPool> gCurrentPool;
|
static thread_local AuWPtr<ThreadPool> tlsCurrentThreadPool;
|
||||||
static const auto kMagicResortThreshold = 15;
|
|
||||||
|
|
||||||
static thread_local int tlsCallStack;
|
|
||||||
inline auto GetWorkerInternal(const AuSPtr<IThreadPool> &pool)
|
inline auto GetWorkerInternal(const AuSPtr<IThreadPool> &pool)
|
||||||
{
|
{
|
||||||
if (pool.get() == AuStaticCast<IAsyncApp>(gAsyncApp))
|
if (pool.get() == AuStaticCast<IAsyncApp>(gAsyncApp))
|
||||||
@ -33,17 +30,28 @@ namespace Aurora::Async
|
|||||||
|
|
||||||
AUKN_SYM WorkerPId_t GetCurrentWorkerPId()
|
AUKN_SYM WorkerPId_t GetCurrentWorkerPId()
|
||||||
{
|
{
|
||||||
auto lkPool = gCurrentPool.lock();
|
auto lkPool = tlsCurrentThreadPool.lock();
|
||||||
if (!lkPool) return {};
|
if (!lkPool)
|
||||||
|
{
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
|
||||||
auto cpy = *lkPool->tlsWorkerId;
|
auto cpy = *lkPool->tlsWorkerId;
|
||||||
auto lkPool2 = cpy.pool.lock();
|
if (auto pPool = AuTryLockMemoryType(cpy.pool))
|
||||||
return WorkerPId_t(lkPool, cpy);
|
{
|
||||||
|
return WorkerPId_t(pPool, cpy);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
return {};
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
//
|
||||||
|
|
||||||
ThreadPool::ThreadPool() : shutdownEvent_(false, false, true)
|
ThreadPool::ThreadPool() : shutdownEvent_(false, false, true)
|
||||||
{
|
{
|
||||||
|
this->pRWReadView = this->rwlock_->AsReadable();
|
||||||
}
|
}
|
||||||
|
|
||||||
// internal pool interface
|
// internal pool interface
|
||||||
@ -136,8 +144,6 @@ namespace Aurora::Async
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
AU_DEBUG_MEMCRUNCH;
|
|
||||||
|
|
||||||
if (bIncrement)
|
if (bIncrement)
|
||||||
{
|
{
|
||||||
AuAtomicAdd(&this->uAtomicCounter, 1u);
|
AuAtomicAdd(&this->uAtomicCounter, 1u);
|
||||||
@ -147,20 +153,11 @@ namespace Aurora::Async
|
|||||||
|
|
||||||
if (target.second == Async::kThreadIdAny)
|
if (target.second == Async::kThreadIdAny)
|
||||||
{
|
{
|
||||||
state->BroadCast();
|
state->SignalAll();
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
if (AuAtomicLoad(&pWorker->cvSleepCount))
|
pWorker->sync.SetEvent();
|
||||||
{
|
|
||||||
// Barrier:
|
|
||||||
pWorker->cvWorkMutex->Lock();
|
|
||||||
pWorker->cvWorkMutex->Unlock();
|
|
||||||
|
|
||||||
pWorker->cvVariable->Signal();
|
|
||||||
}
|
|
||||||
|
|
||||||
pWorker->SetEvent(false);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -173,7 +170,7 @@ namespace Aurora::Async
|
|||||||
|
|
||||||
size_t ThreadPool::GetThreadWorkersCount(ThreadGroup_t group)
|
size_t ThreadPool::GetThreadWorkersCount(ThreadGroup_t group)
|
||||||
{
|
{
|
||||||
AU_LOCK_GUARD(this->rwlock_->AsReadable());
|
AU_LOCK_GUARD(this->pRWReadView);
|
||||||
return GetGroup(group)->workers.size();
|
return GetGroup(group)->workers.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -221,14 +218,12 @@ namespace Aurora::Async
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
gCurrentPool = AuWeakFromThis();
|
auto oldTlsHandle = AuExchange(tlsCurrentThreadPool, AuSharedFromThis());
|
||||||
|
|
||||||
auto auThread = AuThreads::GetThread();
|
auto auThread = AuThreads::GetThread();
|
||||||
|
|
||||||
|
|
||||||
while ((!auThread->Exiting()) &&
|
while ((!auThread->Exiting()) &&
|
||||||
((this->shuttingdown_ & 2) != 2) &&
|
((this->shuttingdown_ & 2) != 2) &&
|
||||||
(!pJobRunner->bBreakEarly))
|
(!pJobRunner->shutdown.bBreakMainLoop))
|
||||||
{
|
{
|
||||||
AuUInt32 uCount {};
|
AuUInt32 uCount {};
|
||||||
|
|
||||||
@ -243,6 +238,8 @@ namespace Aurora::Async
|
|||||||
ranOnce = true;
|
ranOnce = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tlsCurrentThreadPool = oldTlsHandle;
|
||||||
|
|
||||||
return ranOnce;
|
return ranOnce;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -254,163 +251,34 @@ namespace Aurora::Async
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool success {};
|
bool bSuccess {};
|
||||||
auto runMode = GetCurrentThreadRunMode();
|
|
||||||
|
|
||||||
EarlyExitTick();
|
EarlyExitTick();
|
||||||
|
|
||||||
//do
|
|
||||||
{
|
{
|
||||||
auto asyncLoop = state->asyncLoop;
|
auto asyncLoop = state->asyncLoop;
|
||||||
|
|
||||||
asyncLoop->OnFrame();
|
asyncLoop->OnFrame();
|
||||||
|
|
||||||
if (asyncLoop->GetSourceCount() > 1)
|
if (asyncLoop->GetSourceCount() > 1)
|
||||||
{
|
{
|
||||||
bool bShouldTrySleepForKernel = block;
|
if (block)
|
||||||
if (!block)
|
|
||||||
{
|
{
|
||||||
bShouldTrySleepForKernel = asyncLoop->IsSignaledPeek();
|
asyncLoop->WaitAny(0);
|
||||||
}
|
}
|
||||||
|
else
|
||||||
if (bShouldTrySleepForKernel)
|
|
||||||
{
|
{
|
||||||
AuAtomicAdd(&state->cvSleepCount, 1u);
|
asyncLoop->PumpNonblocking();
|
||||||
if (block)
|
|
||||||
{
|
|
||||||
asyncLoop->WaitAny(0);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
asyncLoop->PumpNonblocking();
|
|
||||||
}
|
|
||||||
AuAtomicSub(&state->cvSleepCount, 1u);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
success = PollInternal(state, false, uCount);
|
bSuccess = PollInternal(state, false, uCount);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
success = PollInternal(state, block, uCount);
|
bSuccess = PollInternal(state, block, uCount);
|
||||||
}
|
}
|
||||||
} //while (success);
|
}
|
||||||
|
|
||||||
EarlyExitTick();
|
EarlyExitTick();
|
||||||
|
|
||||||
return success;
|
return bSuccess;
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void GroupWorkQueue::AddWorkEntry(ThreadState *pState, WorkEntry_t entry)
|
|
||||||
{
|
|
||||||
auto prio = (int)entry.second->GetPrio();
|
|
||||||
SysAssert(prio < AuAsync::kEWorkPrioCount, "Invalid PRIO");
|
|
||||||
|
|
||||||
AU_LOCK_GUARD(this->mutex);
|
|
||||||
this->sortedWork[prio].push_back(entry);
|
|
||||||
|
|
||||||
if (entry.first != kThreadIdAny)
|
|
||||||
{
|
|
||||||
if (auto pThat = pState->parent.lock()->GetThreadByIndex(entry.first))
|
|
||||||
{
|
|
||||||
AuAtomicAdd(&pThat->cvHasWork, 1u);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
bool GroupWorkQueue::IsEmpty()
|
|
||||||
{
|
|
||||||
AU_LOCK_GUARD(this->mutex);
|
|
||||||
for (AU_ITERATE_N(i, AuAsync::kEWorkPrioCount))
|
|
||||||
{
|
|
||||||
if (this->sortedWork[i].size())
|
|
||||||
{
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool GroupWorkQueue::IsEmpty(ThreadPool *pPool, AuWorkerId_t id)
|
|
||||||
{
|
|
||||||
#if 1
|
|
||||||
auto pHandle = pPool->GetThreadHandle(id);
|
|
||||||
if (!pHandle)
|
|
||||||
{
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
return !AuAtomicLoad(&pHandle->cvHasWork);
|
|
||||||
#else
|
|
||||||
AU_LOCK_GUARD(this->mutex);
|
|
||||||
|
|
||||||
for (AU_ITERATE_N(i, AuAsync::kEWorkPrioCount))
|
|
||||||
{
|
|
||||||
for (const auto &[srcId, pA] : this->sortedWork[i])
|
|
||||||
{
|
|
||||||
if (id == srcId)
|
|
||||||
{
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
|
|
||||||
void GroupWorkQueue::Dequeue(AuList<WorkEntry_t> &queue, int maxPopCount, AuAsync::ThreadId_t id)
|
|
||||||
{
|
|
||||||
AU_LOCK_GUARD(this->mutex);
|
|
||||||
|
|
||||||
for (AU_ITERATE_N(i, AuAsync::kEWorkPrioCount))
|
|
||||||
{
|
|
||||||
auto &group = this->sortedWork[(int)AuAsync::kEWorkPrioMaxLegal - i];
|
|
||||||
|
|
||||||
for (auto itr = group.begin(); ((itr != group.end()) && (queue.size() < maxPopCount)); )
|
|
||||||
{
|
|
||||||
if (itr->first == Async::kThreadIdAny)
|
|
||||||
{
|
|
||||||
queue.push_back(*itr);
|
|
||||||
itr = group.erase(itr);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ((itr->first != Async::kThreadIdAny) &&
|
|
||||||
(itr->first == id))
|
|
||||||
{
|
|
||||||
queue.push_back(*itr);
|
|
||||||
itr = group.erase(itr);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
itr++;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (queue.size())
|
|
||||||
{
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void ThreadPool::DoThing(ThreadState *pState)
|
|
||||||
{
|
|
||||||
auto uState = pState->cvHasWork;
|
|
||||||
auto uMin = AuMin(uState, pState->pendingWorkItems.size());
|
|
||||||
if (!uMin) uMin = 1;
|
|
||||||
|
|
||||||
while (uState &&
|
|
||||||
AuAtomicCompareExchange<AuUInt32>(&pState->cvHasWork, uState - uMin, uState) != uState)
|
|
||||||
{
|
|
||||||
uState = pState->cvHasWork;
|
|
||||||
|
|
||||||
if (uState < uMin)
|
|
||||||
{
|
|
||||||
uMin = uState;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool ThreadPool::PollInternal(AuSPtr<ThreadState> state, bool block, AuUInt32 &uCount)
|
bool ThreadPool::PollInternal(AuSPtr<ThreadState> state, bool block, AuUInt32 &uCount)
|
||||||
@ -423,17 +291,14 @@ namespace Aurora::Async
|
|||||||
|
|
||||||
auto group = state->parent.lock();
|
auto group = state->parent.lock();
|
||||||
|
|
||||||
//state->pendingWorkItems.clear();
|
|
||||||
|
|
||||||
{
|
{
|
||||||
AuAtomicAdd(&state->cvSleepCount, 1u);
|
AU_LOCK_GUARD(state->sync.cvWorkMutex);
|
||||||
AU_LOCK_GUARD(state->cvWorkMutex);
|
|
||||||
|
|
||||||
do
|
do
|
||||||
{
|
{
|
||||||
group->workQueue.Dequeue(state->pendingWorkItems, state->multipopCount, state->id.second);
|
group->workQueue.Dequeue(state->pendingWorkItems, state->multipopCount, state->id.second);
|
||||||
|
|
||||||
this->DoThing(state.get());
|
state->sync.UpdateCVState(state.get());
|
||||||
|
|
||||||
// Consider blocking for more work
|
// Consider blocking for more work
|
||||||
if (!block)
|
if (!block)
|
||||||
@ -442,12 +307,11 @@ namespace Aurora::Async
|
|||||||
}
|
}
|
||||||
|
|
||||||
// pre-wakeup thread terminating check
|
// pre-wakeup thread terminating check
|
||||||
if (state->threadObject->Exiting())
|
if (state->thread.pThread->Exiting())
|
||||||
{
|
{
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// Block if no work items are present
|
// Block if no work items are present
|
||||||
if (state->pendingWorkItems.empty())
|
if (state->pendingWorkItems.empty())
|
||||||
{
|
{
|
||||||
@ -456,7 +320,7 @@ namespace Aurora::Async
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
state->cvVariable->WaitForSignal();
|
state->sync.cvVariable->WaitForSignal();
|
||||||
|
|
||||||
if (this->shuttingdown_ & 2)
|
if (this->shuttingdown_ & 2)
|
||||||
{
|
{
|
||||||
@ -465,7 +329,7 @@ namespace Aurora::Async
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Post-wakeup thread terminating check
|
// Post-wakeup thread terminating check
|
||||||
if (state->threadObject->Exiting())
|
if (state->thread.pThread->Exiting())
|
||||||
{
|
{
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -474,7 +338,6 @@ namespace Aurora::Async
|
|||||||
(this->GetThreadState()->asyncLoop->GetSourceCount() > 1) ||
|
(this->GetThreadState()->asyncLoop->GetSourceCount() > 1) ||
|
||||||
this->GetThreadState()->asyncLoop->CommitPending())) //(this->ToKernelWorkQueue()->IsSignaledPeek()))
|
this->GetThreadState()->asyncLoop->CommitPending())) //(this->ToKernelWorkQueue()->IsSignaledPeek()))
|
||||||
{
|
{
|
||||||
AuAtomicSub(&state->cvSleepCount, 1u);
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -490,12 +353,10 @@ namespace Aurora::Async
|
|||||||
|
|
||||||
if (group->workQueue.IsEmpty(this, state->id))
|
if (group->workQueue.IsEmpty(this, state->id))
|
||||||
{
|
{
|
||||||
state->eventLs->Reset(); // ...until we're done
|
state->sync.eventLs->Reset(); // ...until we're done
|
||||||
AuAtomicStore(&state->cvLSActive, 0u);
|
AuAtomicStore(&state->sync.cvLSActive, 0u);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
AuAtomicSub(&state->cvSleepCount, 1u);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (state->pendingWorkItems.empty())
|
if (state->pendingWorkItems.empty())
|
||||||
@ -513,24 +374,16 @@ namespace Aurora::Async
|
|||||||
}
|
}
|
||||||
|
|
||||||
int runningTasks {};
|
int runningTasks {};
|
||||||
|
auto uStartCookie = state->stackState.uStackCookie++;
|
||||||
auto oldTlsHandle = AuExchange(gCurrentPool, AuSharedFromThis());
|
|
||||||
|
|
||||||
bool lowPrioCont {};
|
|
||||||
bool lowPrioContCached {};
|
|
||||||
|
|
||||||
state->cookie++;
|
|
||||||
|
|
||||||
int start = state->cookie;
|
|
||||||
|
|
||||||
// Account for
|
// Account for
|
||||||
// while (AuAsync.GetCurrentPool()->runForever());
|
// while (AuAsync.GetCurrentPool()->runForever());
|
||||||
// in the first task (or deeper)
|
// in the first task (or deeper)
|
||||||
if (InRunnerMode() && tlsCallStack) // are we one call deep?
|
if (InRunnerMode() && state->stackState.uStackCallDepth) // are we one call deep?
|
||||||
{
|
{
|
||||||
auto queue = ToKernelWorkQueue();
|
auto queue = ToKernelWorkQueue();
|
||||||
|
|
||||||
if ((this->uAtomicCounter == tlsCallStack) &&
|
if ((this->uAtomicCounter == state->stackState.uStackCallDepth) &&
|
||||||
this->IsDepleted())
|
this->IsDepleted())
|
||||||
{
|
{
|
||||||
return false;
|
return false;
|
||||||
@ -538,25 +391,20 @@ namespace Aurora::Async
|
|||||||
}
|
}
|
||||||
|
|
||||||
//
|
//
|
||||||
|
|
||||||
for (auto itr = state->pendingWorkItems.begin(); itr != state->pendingWorkItems.end(); )
|
for (auto itr = state->pendingWorkItems.begin(); itr != state->pendingWorkItems.end(); )
|
||||||
{
|
{
|
||||||
if (state->threadObject->Exiting())
|
if (state->thread.pThread->Exiting())
|
||||||
{
|
{
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set the last frame time for a watchdog later down the line
|
|
||||||
state->lastFrameTime = Time::CurrentClockMS();
|
|
||||||
|
|
||||||
|
|
||||||
// Dispatch
|
// Dispatch
|
||||||
auto oops = itr->second;
|
auto oops = itr->second;
|
||||||
|
|
||||||
// Remove from our local job queue
|
// Remove from our local job queue
|
||||||
itr = state->pendingWorkItems.erase(itr);
|
itr = state->pendingWorkItems.erase(itr);
|
||||||
|
|
||||||
tlsCallStack++;
|
state->stackState.uStackCallDepth++;
|
||||||
|
|
||||||
//SysBenchmark(fmt::format("RunAsync: {}", block));
|
//SysBenchmark(fmt::format("RunAsync: {}", block));
|
||||||
// Dispatch
|
// Dispatch
|
||||||
@ -566,21 +414,19 @@ namespace Aurora::Async
|
|||||||
// Atomically decrement global task counter
|
// Atomically decrement global task counter
|
||||||
runningTasks = AuAtomicSub(&this->uAtomicCounter, 1u);
|
runningTasks = AuAtomicSub(&this->uAtomicCounter, 1u);
|
||||||
|
|
||||||
tlsCallStack--;
|
state->stackState.uStackCallDepth--;
|
||||||
|
|
||||||
if (start != state->cookie)
|
if (uStartCookie != state->stackState.uStackCookie)
|
||||||
{
|
{
|
||||||
start = state->cookie;
|
uStartCookie = state->stackState.uStackCookie;
|
||||||
itr = state->pendingWorkItems.begin();
|
itr = state->pendingWorkItems.begin();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
gCurrentPool = oldTlsHandle;
|
|
||||||
|
|
||||||
// Return popped work back to the groups work pool when our -pump loops were preempted
|
// Return popped work back to the groups work pool when our -pump loops were preempted
|
||||||
if (state->pendingWorkItems.size())
|
if (state->pendingWorkItems.size())
|
||||||
{
|
{
|
||||||
AU_LOCK_GUARD(state->cvWorkMutex);
|
AU_LOCK_GUARD(state->sync.cvWorkMutex);
|
||||||
|
|
||||||
for (const auto &item : state->pendingWorkItems)
|
for (const auto &item : state->pendingWorkItems)
|
||||||
{
|
{
|
||||||
@ -588,8 +434,8 @@ namespace Aurora::Async
|
|||||||
}
|
}
|
||||||
|
|
||||||
state->pendingWorkItems.clear();
|
state->pendingWorkItems.clear();
|
||||||
state->cvVariable->Broadcast();
|
state->sync.cvVariable->Broadcast();
|
||||||
state->eventLs->Set();
|
state->sync.eventLs->Set();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Account for
|
// Account for
|
||||||
@ -631,29 +477,25 @@ namespace Aurora::Async
|
|||||||
|
|
||||||
// wait for regular prio work to complete
|
// wait for regular prio work to complete
|
||||||
{
|
{
|
||||||
|
for (auto pGroup : this->threadGroups_)
|
||||||
{
|
{
|
||||||
AU_LOCK_GUARD(this->rwlock_->AsReadable());
|
if (!pGroup)
|
||||||
|
|
||||||
for (auto pGroup : this->threadGroups_)
|
|
||||||
{
|
{
|
||||||
if (!pGroup)
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
AU_LOCK_GUARD(pGroup->workersMutex);
|
||||||
|
for (auto &[id, worker] : pGroup->workers)
|
||||||
|
{
|
||||||
|
if (trySelfPid == worker->id)
|
||||||
{
|
{
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (auto &[id, worker] : pGroup->workers)
|
toBarrier.push_back(worker->id);
|
||||||
{
|
|
||||||
if (trySelfPid == worker->id)
|
|
||||||
{
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
toBarrier.push_back(worker->id);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
for (const auto &id : toBarrier)
|
for (const auto &id : toBarrier)
|
||||||
{
|
{
|
||||||
if (trySelfPid == id)
|
if (trySelfPid == id)
|
||||||
@ -689,7 +531,7 @@ namespace Aurora::Async
|
|||||||
AuList<AuThreads::ThreadShared_t> threads;
|
AuList<AuThreads::ThreadShared_t> threads;
|
||||||
AuList<AuSPtr<ThreadState>> states;
|
AuList<AuSPtr<ThreadState>> states;
|
||||||
{
|
{
|
||||||
AU_LOCK_GUARD(this->rwlock_->AsReadable());
|
AU_LOCK_GUARD(this->pRWReadView);
|
||||||
|
|
||||||
for (auto pGroup : this->threadGroups_)
|
for (auto pGroup : this->threadGroups_)
|
||||||
{
|
{
|
||||||
@ -701,7 +543,7 @@ namespace Aurora::Async
|
|||||||
for (auto &[id, pState] : pGroup->workers)
|
for (auto &[id, pState] : pGroup->workers)
|
||||||
{
|
{
|
||||||
// main loop:
|
// main loop:
|
||||||
if (pState && pState->cvWorkMutex && pState->cvVariable)
|
if (pState)
|
||||||
{
|
{
|
||||||
states.push_back(pState);
|
states.push_back(pState);
|
||||||
pState->shuttingdown = true;
|
pState->shuttingdown = true;
|
||||||
@ -712,10 +554,10 @@ namespace Aurora::Async
|
|||||||
}
|
}
|
||||||
|
|
||||||
// thread object:
|
// thread object:
|
||||||
if (pState->bCreate)
|
if (pState->thread.bOwnsThread)
|
||||||
{
|
{
|
||||||
pState->threadObject->SendExitSignal();
|
pState->thread.pThread->SendExitSignal();
|
||||||
threads.push_back(pState->threadObject);
|
threads.push_back(pState->thread.pThread);
|
||||||
}
|
}
|
||||||
|
|
||||||
// unrefreeze signals:
|
// unrefreeze signals:
|
||||||
@ -728,13 +570,10 @@ namespace Aurora::Async
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Break all condvar loops, just in case
|
||||||
|
for (const auto &pState : states)
|
||||||
{
|
{
|
||||||
for (const auto &pState : states)
|
pState->sync.SetEvent();
|
||||||
{
|
|
||||||
AU_LOCK_GUARD(pState->cvWorkMutex);
|
|
||||||
pState->cvVariable->Broadcast();
|
|
||||||
pState->eventLs->Set();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Final sync to exit
|
// Final sync to exit
|
||||||
@ -764,11 +603,13 @@ namespace Aurora::Async
|
|||||||
// Is dead flag
|
// Is dead flag
|
||||||
this->shutdownEvent_->Set();
|
this->shutdownEvent_->Set();
|
||||||
|
|
||||||
|
//
|
||||||
if (pLocalRunner)
|
if (pLocalRunner)
|
||||||
{
|
{
|
||||||
pLocalRunner->bIsKiller = true;
|
pLocalRunner->shutdown.bIsKillerThread = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Notify observing threads of our work exhaustion
|
||||||
for (const auto &wOther : this->listWeakDepsParents_)
|
for (const auto &wOther : this->listWeakDepsParents_)
|
||||||
{
|
{
|
||||||
if (auto pThat = AuTryLockMemoryType(wOther))
|
if (auto pThat = AuTryLockMemoryType(wOther))
|
||||||
@ -854,12 +695,12 @@ namespace Aurora::Async
|
|||||||
return {};
|
return {};
|
||||||
}
|
}
|
||||||
|
|
||||||
return pState->threadObject;
|
return pState->thread.pThread;
|
||||||
}
|
}
|
||||||
|
|
||||||
AuBST<ThreadGroup_t, AuList<ThreadId_t>> ThreadPool::GetThreads()
|
AuBST<ThreadGroup_t, AuList<ThreadId_t>> ThreadPool::GetThreads()
|
||||||
{
|
{
|
||||||
AU_LOCK_GUARD(rwlock_->AsReadable());
|
AU_LOCK_GUARD(this->pRWReadView);
|
||||||
|
|
||||||
AuBST<ThreadGroup_t, AuList<ThreadId_t>> ret;
|
AuBST<ThreadGroup_t, AuList<ThreadId_t>> ret;
|
||||||
|
|
||||||
@ -892,7 +733,7 @@ namespace Aurora::Async
|
|||||||
|
|
||||||
bool ThreadPool::Sync(WorkerId_t workerId, AuUInt32 timeoutMs, bool requireSignal)
|
bool ThreadPool::Sync(WorkerId_t workerId, AuUInt32 timeoutMs, bool requireSignal)
|
||||||
{
|
{
|
||||||
AU_LOCK_GUARD(this->rwlock_->AsReadable());
|
AU_LOCK_GUARD(this->pRWReadView);
|
||||||
|
|
||||||
auto group = GetGroup(workerId.first);
|
auto group = GetGroup(workerId.first);
|
||||||
auto currentWorkerId = GetCurrentThread().second;
|
auto currentWorkerId = GetCurrentThread().second;
|
||||||
@ -917,7 +758,7 @@ namespace Aurora::Async
|
|||||||
|
|
||||||
void ThreadPool::Signal(WorkerId_t workerId)
|
void ThreadPool::Signal(WorkerId_t workerId)
|
||||||
{
|
{
|
||||||
AU_LOCK_GUARD(this->rwlock_->AsReadable());
|
AU_LOCK_GUARD(this->pRWReadView);
|
||||||
|
|
||||||
auto group = GetGroup(workerId.first);
|
auto group = GetGroup(workerId.first);
|
||||||
if (workerId.second == Async::kThreadIdAny)
|
if (workerId.second == Async::kThreadIdAny)
|
||||||
@ -935,7 +776,7 @@ namespace Aurora::Async
|
|||||||
|
|
||||||
AuSPtr<AuLoop::ILoopSource> ThreadPool::WorkerToLoopSource(WorkerId_t workerId)
|
AuSPtr<AuLoop::ILoopSource> ThreadPool::WorkerToLoopSource(WorkerId_t workerId)
|
||||||
{
|
{
|
||||||
AU_LOCK_GUARD(this->rwlock_->AsReadable());
|
AU_LOCK_GUARD(this->pRWReadView);
|
||||||
|
|
||||||
auto a = GetThreadHandle(workerId);
|
auto a = GetThreadHandle(workerId);
|
||||||
if (!a)
|
if (!a)
|
||||||
@ -943,12 +784,12 @@ namespace Aurora::Async
|
|||||||
return {};
|
return {};
|
||||||
}
|
}
|
||||||
|
|
||||||
return a->asyncLoopSourceShared;
|
return a->sync.eventLs;
|
||||||
}
|
}
|
||||||
|
|
||||||
void ThreadPool::SyncAllSafe()
|
void ThreadPool::SyncAllSafe()
|
||||||
{
|
{
|
||||||
AU_LOCK_GUARD(this->rwlock_->AsReadable());
|
AU_LOCK_GUARD(this->pRWReadView);
|
||||||
|
|
||||||
for (auto pGroup : this->threadGroups_)
|
for (auto pGroup : this->threadGroups_)
|
||||||
{
|
{
|
||||||
@ -1008,46 +849,6 @@ namespace Aurora::Async
|
|||||||
return worker->asyncLoop;
|
return worker->asyncLoop;
|
||||||
}
|
}
|
||||||
|
|
||||||
void ThreadPool::UpdateWorkMode(WorkerId_t workerId, RunMode mode)
|
|
||||||
{
|
|
||||||
auto states = this->GetThreadHandles(workerId);
|
|
||||||
if (!states.size())
|
|
||||||
{
|
|
||||||
SysPushErrorGeneric("Couldn't find requested worker");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (const auto &state : states)
|
|
||||||
{
|
|
||||||
state->runMode = mode.mode;
|
|
||||||
if (mode.freqMsTick)
|
|
||||||
{
|
|
||||||
state->rateLimiter.SetNextStep(mode.freqMsTick * 1'000'000);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
ERunMode ThreadPool::GetCurrentThreadRunMode()
|
|
||||||
{
|
|
||||||
auto state = this->GetThreadState();
|
|
||||||
if (!state)
|
|
||||||
{
|
|
||||||
return ERunMode::eEfficient;
|
|
||||||
}
|
|
||||||
return state->runMode;
|
|
||||||
}
|
|
||||||
|
|
||||||
ERunMode ThreadPool::GetThreadRunMode(WorkerId_t workerId)
|
|
||||||
{
|
|
||||||
auto worker = this->GetThreadHandle(workerId);
|
|
||||||
if (!worker)
|
|
||||||
{
|
|
||||||
SysPushErrorGeneric("Couldn't find requested worker");
|
|
||||||
return {};
|
|
||||||
}
|
|
||||||
return worker->runMode;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool ThreadPool::IsSelfDepleted()
|
bool ThreadPool::IsSelfDepleted()
|
||||||
{
|
{
|
||||||
auto queue = ToKernelWorkQueue();
|
auto queue = ToKernelWorkQueue();
|
||||||
@ -1152,7 +953,7 @@ namespace Aurora::Async
|
|||||||
{
|
{
|
||||||
if (auto pState = this->GetThreadHandle(workerId))
|
if (auto pState = this->GetThreadHandle(workerId))
|
||||||
{
|
{
|
||||||
AuAtomicAdd(&pState->uShutdownFence, 1u);
|
AuAtomicAdd(&pState->shutdown.uShutdownFence, 1u);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1160,7 +961,7 @@ namespace Aurora::Async
|
|||||||
{
|
{
|
||||||
if (auto pState = this->GetThreadHandle(optWorkerId.value_or(GetCurrentWorkerPId())))
|
if (auto pState = this->GetThreadHandle(optWorkerId.value_or(GetCurrentWorkerPId())))
|
||||||
{
|
{
|
||||||
return (AuUInt64(pState->uShutdownFence) << 32ull) | AuUInt64(this->uAtomicShutdownCookie);
|
return (AuUInt64(pState->shutdown.uShutdownFence) << 32ull) | AuUInt64(this->uAtomicShutdownCookie);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -1185,7 +986,7 @@ namespace Aurora::Async
|
|||||||
|
|
||||||
if (auto pState = this->GetThreadHandle(optWorkerId.value_or(GetCurrentWorkerPId())))
|
if (auto pState = this->GetThreadHandle(optWorkerId.value_or(GetCurrentWorkerPId())))
|
||||||
{
|
{
|
||||||
return uThreadCookie != pState->uShutdownFence;
|
return uThreadCookie != pState->shutdown.uShutdownFence;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -1202,7 +1003,7 @@ namespace Aurora::Async
|
|||||||
|
|
||||||
if (create)
|
if (create)
|
||||||
{
|
{
|
||||||
gCurrentPool = AuSharedFromThis();
|
tlsCurrentThreadPool = AuSharedFromThis();
|
||||||
}
|
}
|
||||||
|
|
||||||
AuSPtr<GroupState> pGroup;
|
AuSPtr<GroupState> pGroup;
|
||||||
@ -1234,73 +1035,34 @@ namespace Aurora::Async
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
auto threadState = AuMakeShared<ThreadState>();
|
auto pThreadState = pGroup->CreateWorker(workerId, create);
|
||||||
if (!threadState)
|
if (!pThreadState)
|
||||||
{
|
{
|
||||||
SysPushErrorMemory();
|
|
||||||
return {};
|
return {};
|
||||||
}
|
}
|
||||||
|
|
||||||
threadState->parent = pGroup;
|
|
||||||
threadState->id = workerId;
|
|
||||||
threadState->asyncLoop = AuMakeShared<AsyncLoop>();
|
|
||||||
if (!threadState->asyncLoop)
|
|
||||||
{
|
|
||||||
SysPushErrorMemory();
|
|
||||||
return {};
|
|
||||||
}
|
|
||||||
|
|
||||||
threadState->eventLs = AuLoop::NewLSAsync();
|
|
||||||
if (!threadState->eventLs)
|
|
||||||
{
|
|
||||||
SysPushErrorMemory();
|
|
||||||
return {};
|
|
||||||
}
|
|
||||||
|
|
||||||
threadState->asyncLoopSourceShared = threadState->eventLs;
|
|
||||||
|
|
||||||
threadState->asyncLoop->pParent = threadState.get();
|
|
||||||
threadState->rateLimiter.SetNextStep(1'000'000); // 1MS in nanoseconds
|
|
||||||
threadState->runMode = ERunMode::eEfficient;
|
|
||||||
|
|
||||||
if (!threadState->asyncLoop)
|
|
||||||
{
|
|
||||||
SysPushErrorMemory();
|
|
||||||
return {};
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!threadState->asyncLoop->Init())
|
|
||||||
{
|
|
||||||
SysPushErrorNested();
|
|
||||||
return {};
|
|
||||||
}
|
|
||||||
|
|
||||||
threadState->asyncLoop->SourceAdd(threadState->eventLs);
|
|
||||||
|
|
||||||
threadState->bCreate = create;
|
|
||||||
|
|
||||||
if (!create)
|
if (!create)
|
||||||
{
|
{
|
||||||
threadState->threadObject = AuThreads::ThreadShared(AuThreads::ThreadInfo(
|
pThreadState->thread.pThread= AuThreads::ThreadShared(AuThreads::ThreadInfo(
|
||||||
AuMakeShared<AuThreads::IThreadVectorsFunctional>(AuThreads::IThreadVectorsFunctional::OnEntry_t(std::bind(&ThreadPool::Entrypoint, this, threadState->id)),
|
AuMakeShared<AuThreads::IThreadVectorsFunctional>(AuThreads::IThreadVectorsFunctional::OnEntry_t(std::bind(&ThreadPool::Entrypoint, this, workerId)),
|
||||||
AuThreads::IThreadVectorsFunctional::OnExit_t{}),
|
AuThreads::IThreadVectorsFunctional::OnExit_t{}),
|
||||||
gRuntimeConfig.async.threadPoolDefaultStackSize
|
gRuntimeConfig.async.threadPoolDefaultStackSize
|
||||||
));
|
));
|
||||||
|
|
||||||
if (!threadState->threadObject)
|
if (!pThreadState->thread.pThread)
|
||||||
{
|
{
|
||||||
return {};
|
return {};
|
||||||
}
|
}
|
||||||
|
|
||||||
threadState->threadObject->Run();
|
pThreadState->thread.pThread->Run();
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
threadState->threadObject = AuSPtr<AuThreads::IAuroraThread>(AuThreads::GetThread(), [](AuThreads::IAuroraThread *){});
|
pThreadState->thread.pThread = AuSPtr<AuThreads::IAuroraThread>(AuThreads::GetThread(), [](AuThreads::IAuroraThread *){});
|
||||||
|
|
||||||
// TODO: this is just a hack
|
// TODO: this is just a hack
|
||||||
// we should implement this properly
|
// we should implement this properly
|
||||||
threadState->threadObject->AddLastHopeTlsHook(AuMakeShared<AuThreads::IThreadFeatureFunctional>([]() -> void
|
pThreadState->thread.pThread->AddLastHopeTlsHook(AuMakeShared<AuThreads::IThreadFeatureFunctional>([]() -> void
|
||||||
{
|
{
|
||||||
|
|
||||||
}, []() -> void
|
}, []() -> void
|
||||||
@ -1312,13 +1074,11 @@ namespace Aurora::Async
|
|||||||
}
|
}
|
||||||
}));
|
}));
|
||||||
|
|
||||||
//
|
tlsCurrentThreadPool = AuWeakFromThis();
|
||||||
gCurrentPool = AuWeakFromThis();
|
|
||||||
tlsWorkerId = WorkerPId_t(AuSharedFromThis(), workerId);
|
tlsWorkerId = WorkerPId_t(AuSharedFromThis(), workerId);
|
||||||
}
|
}
|
||||||
|
|
||||||
pGroup->AddWorker(workerId.second, threadState);
|
pGroup->AddWorker(workerId.second, pThreadState);
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1377,7 +1137,11 @@ namespace Aurora::Async
|
|||||||
|
|
||||||
void ThreadPool::Entrypoint(WorkerId_t id)
|
void ThreadPool::Entrypoint(WorkerId_t id)
|
||||||
{
|
{
|
||||||
gCurrentPool = AuWeakFromThis();
|
{
|
||||||
|
AU_LOCK_GUARD(this->pRWReadView);
|
||||||
|
}
|
||||||
|
|
||||||
|
tlsCurrentThreadPool = AuWeakFromThis();
|
||||||
tlsWorkerId = WorkerPId_t(AuSharedFromThis(), id);
|
tlsWorkerId = WorkerPId_t(AuSharedFromThis(), id);
|
||||||
|
|
||||||
auto job = GetThreadState();
|
auto job = GetThreadState();
|
||||||
@ -1386,7 +1150,7 @@ namespace Aurora::Async
|
|||||||
|
|
||||||
if (id != WorkerId_t {0, 0})
|
if (id != WorkerId_t {0, 0})
|
||||||
{
|
{
|
||||||
AU_LOCK_GUARD(this->rwlock_->AsReadable());
|
AU_LOCK_GUARD(this->pRWReadView);
|
||||||
|
|
||||||
if (!this->shuttingdown_ && !job->rejecting)
|
if (!this->shuttingdown_ && !job->rejecting)
|
||||||
{
|
{
|
||||||
@ -1418,7 +1182,7 @@ namespace Aurora::Async
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
state->BroadCast();
|
state->SignalAll();
|
||||||
|
|
||||||
{
|
{
|
||||||
if (AuExchange(jobWorker->bAlreadyDoingExitTick, true))
|
if (AuExchange(jobWorker->bAlreadyDoingExitTick, true))
|
||||||
@ -1457,11 +1221,9 @@ namespace Aurora::Async
|
|||||||
|
|
||||||
jobWorker->isDeadEvent->Set();
|
jobWorker->isDeadEvent->Set();
|
||||||
|
|
||||||
|
|
||||||
jobWorker->bAlreadyDoingExitTick = false;
|
jobWorker->bAlreadyDoingExitTick = false;
|
||||||
jobWorker->bBreakEarly = true;
|
jobWorker->shutdown.bBreakMainLoop = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void ThreadPool::ThisExiting()
|
void ThreadPool::ThisExiting()
|
||||||
@ -1473,7 +1235,7 @@ namespace Aurora::Async
|
|||||||
|
|
||||||
AuList<AuSPtr<AuThreads::IThreadFeature>> features;
|
AuList<AuSPtr<AuThreads::IThreadFeature>> features;
|
||||||
{
|
{
|
||||||
AU_LOCK_GUARD(this->rwlock_->AsReadable());
|
AU_LOCK_GUARD(this->pRWReadView);
|
||||||
|
|
||||||
pLocalState->isDeadEvent->Set();
|
pLocalState->isDeadEvent->Set();
|
||||||
|
|
||||||
@ -1530,7 +1292,7 @@ namespace Aurora::Async
|
|||||||
|
|
||||||
AuSPtr<ThreadState> ThreadPool::GetThreadState()
|
AuSPtr<ThreadState> ThreadPool::GetThreadState()
|
||||||
{
|
{
|
||||||
auto thread = gCurrentPool.lock();
|
auto thread = tlsCurrentThreadPool.lock();
|
||||||
if (!thread)
|
if (!thread)
|
||||||
{
|
{
|
||||||
return {};
|
return {};
|
||||||
@ -1545,7 +1307,6 @@ namespace Aurora::Async
|
|||||||
#endif
|
#endif
|
||||||
|
|
||||||
auto worker = *tlsWorkerId;
|
auto worker = *tlsWorkerId;
|
||||||
|
|
||||||
auto state = GetGroup(worker.first);
|
auto state = GetGroup(worker.first);
|
||||||
if (!state)
|
if (!state)
|
||||||
{
|
{
|
||||||
@ -1557,7 +1318,7 @@ namespace Aurora::Async
|
|||||||
|
|
||||||
AuSPtr<ThreadState> ThreadPool::GetThreadStateNoWarn()
|
AuSPtr<ThreadState> ThreadPool::GetThreadStateNoWarn()
|
||||||
{
|
{
|
||||||
auto thread = gCurrentPool.lock();
|
auto thread = tlsCurrentThreadPool.lock();
|
||||||
if (!thread)
|
if (!thread)
|
||||||
{
|
{
|
||||||
return {};
|
return {};
|
||||||
@ -1592,7 +1353,7 @@ namespace Aurora::Async
|
|||||||
|
|
||||||
AuList<AuSPtr<ThreadState>> ThreadPool::GetThreadHandles(WorkerId_t id)
|
AuList<AuSPtr<ThreadState>> ThreadPool::GetThreadHandles(WorkerId_t id)
|
||||||
{
|
{
|
||||||
AU_LOCK_GUARD(this->rwlock_->AsReadable());
|
AU_LOCK_GUARD(this->pRWReadView);
|
||||||
|
|
||||||
auto group = GetGroup(id.first);
|
auto group = GetGroup(id.first);
|
||||||
if (!group)
|
if (!group)
|
||||||
@ -1626,34 +1387,4 @@ namespace Aurora::Async
|
|||||||
return AuMakeShared<ThreadPool>();
|
return AuMakeShared<ThreadPool>();
|
||||||
}
|
}
|
||||||
|
|
||||||
struct KeepGroupAlive
|
|
||||||
{
|
|
||||||
KeepGroupAlive(AuSPtr<AuAsync::IThreadPool> pPool) : pPool(AuStaticCast<AuAsync::ThreadPool>(pPool))
|
|
||||||
{
|
|
||||||
AuAtomicAdd(&this->pPool->uAtomicCounter, 1u);
|
|
||||||
}
|
|
||||||
|
|
||||||
~KeepGroupAlive()
|
|
||||||
{
|
|
||||||
auto uNow = AuAtomicSub(&this->pPool->uAtomicCounter, 1u);
|
|
||||||
|
|
||||||
if (uNow == 0)
|
|
||||||
{
|
|
||||||
for (const auto &pState : this->pPool->threadGroups_)
|
|
||||||
{
|
|
||||||
if (pState)
|
|
||||||
{
|
|
||||||
pState->BroadCast();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
AuSPtr<AuAsync::ThreadPool> pPool;
|
|
||||||
};
|
|
||||||
|
|
||||||
AUKN_SYM AuSPtr<void> KeepThreadPoolAlive(AuSPtr<AuAsync::IThreadPool> pPool)
|
|
||||||
{
|
|
||||||
return AuMakeSharedThrow<KeepGroupAlive>(pPool);
|
|
||||||
}
|
|
||||||
}
|
}
|
@ -7,6 +7,8 @@
|
|||||||
***/
|
***/
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
|
#include "IThreadPoolInternal.hpp"
|
||||||
|
|
||||||
namespace Aurora::Async
|
namespace Aurora::Async
|
||||||
{
|
{
|
||||||
struct GroupState;
|
struct GroupState;
|
||||||
@ -15,21 +17,10 @@ namespace Aurora::Async
|
|||||||
//class WorkItem;
|
//class WorkItem;
|
||||||
|
|
||||||
|
|
||||||
struct IThreadPoolInternal
|
struct ThreadPool :
|
||||||
{
|
IThreadPool,
|
||||||
virtual bool WaitFor(WorkerId_t unlocker, const AuSPtr<Threading::IWaitable> &primitive, AuUInt32 ms) = 0;
|
IThreadPoolInternal,
|
||||||
virtual void Run(WorkerId_t target, AuSPtr<IAsyncRunnable> runnable) = 0;
|
AuEnableSharedFromThis<ThreadPool>
|
||||||
virtual AuSPtr<ThreadState> GetThreadHandle(WorkerId_t id) = 0;
|
|
||||||
virtual IThreadPool *ToThreadPool() = 0;
|
|
||||||
|
|
||||||
AuUInt32 uAtomicCounter {};
|
|
||||||
AuUInt32 uAtomicIOProcessors {};
|
|
||||||
AuUInt32 uAtomicIOProcessorsWorthlessSources {};
|
|
||||||
AuUInt32 uAtomicShutdownCookie {};
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
struct ThreadPool : IThreadPool, IThreadPoolInternal, AuEnableSharedFromThis<ThreadPool>
|
|
||||||
{
|
{
|
||||||
ThreadPool();
|
ThreadPool();
|
||||||
|
|
||||||
@ -81,9 +72,6 @@ namespace Aurora::Async
|
|||||||
|
|
||||||
virtual AuSPtr<AuLoop::ILoopQueue> ToKernelWorkQueue() override;
|
virtual AuSPtr<AuLoop::ILoopQueue> ToKernelWorkQueue() override;
|
||||||
virtual AuSPtr<AuLoop::ILoopQueue> ToKernelWorkQueue(WorkerId_t workerId) override;
|
virtual AuSPtr<AuLoop::ILoopQueue> ToKernelWorkQueue(WorkerId_t workerId) override;
|
||||||
virtual void UpdateWorkMode(WorkerId_t workerId, RunMode mode) override;
|
|
||||||
virtual ERunMode GetCurrentThreadRunMode() override;
|
|
||||||
virtual ERunMode GetThreadRunMode(WorkerId_t workerId) override;
|
|
||||||
|
|
||||||
virtual AuSPtr<AuThreading::IWaitable> GetShutdownEvent() override;
|
virtual AuSPtr<AuThreading::IWaitable> GetShutdownEvent() override;
|
||||||
virtual void AddDependency(AuSPtr<IThreadPool> pPool) override;
|
virtual void AddDependency(AuSPtr<IThreadPool> pPool) override;
|
||||||
@ -105,7 +93,6 @@ namespace Aurora::Async
|
|||||||
|
|
||||||
bool InternalRunOne(AuSPtr<ThreadState>, bool block, AuUInt32 &uCount);
|
bool InternalRunOne(AuSPtr<ThreadState>, bool block, AuUInt32 &uCount);
|
||||||
bool PollInternal(AuSPtr<ThreadState>, bool block, AuUInt32 &uCount);
|
bool PollInternal(AuSPtr<ThreadState>, bool block, AuUInt32 &uCount);
|
||||||
void DoThing(ThreadState *pState);
|
|
||||||
|
|
||||||
size_t GetThreadWorkersCount(ThreadGroup_t group);
|
size_t GetThreadWorkersCount(ThreadGroup_t group);
|
||||||
|
|
||||||
@ -152,7 +139,10 @@ namespace Aurora::Async
|
|||||||
AuSPtr<GroupState> threadGroups_[255];
|
AuSPtr<GroupState> threadGroups_[255];
|
||||||
AuUInt32 shuttingdown_ {};
|
AuUInt32 shuttingdown_ {};
|
||||||
bool shutdown {};
|
bool shutdown {};
|
||||||
|
|
||||||
AuThreadPrimitives::RWRenterableLock rwlock_;
|
AuThreadPrimitives::RWRenterableLock rwlock_;
|
||||||
|
AuThreading::IWaitable *pRWReadView {};
|
||||||
|
|
||||||
AuThreadPrimitives::Event shutdownEvent_;
|
AuThreadPrimitives::Event shutdownEvent_;
|
||||||
bool runnersRunning_ {};
|
bool runnersRunning_ {};
|
||||||
AuList<AuWPtr<ThreadPool>> listWeakDeps_;
|
AuList<AuWPtr<ThreadPool>> listWeakDeps_;
|
||||||
|
@ -8,11 +8,11 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include "AsyncRunnable.hpp"
|
#include "AsyncRunnable.hpp"
|
||||||
|
#include "AuThreadState.hpp"
|
||||||
|
#include "AuGroupWorkQueue.hpp"
|
||||||
|
|
||||||
namespace Aurora::Async
|
namespace Aurora::Async
|
||||||
{
|
{
|
||||||
using WorkEntry_t = AuPair<ThreadId_t, AuSPtr<IAsyncRunnable>>;
|
|
||||||
|
|
||||||
// TODO: this is a hack because i havent implemented an epoll abstraction yet
|
// TODO: this is a hack because i havent implemented an epoll abstraction yet
|
||||||
struct AsyncAppWaitSourceRequest
|
struct AsyncAppWaitSourceRequest
|
||||||
{
|
{
|
||||||
@ -26,10 +26,9 @@ namespace Aurora::Async
|
|||||||
struct GroupState;
|
struct GroupState;
|
||||||
struct AsyncLoop;
|
struct AsyncLoop;
|
||||||
|
|
||||||
struct ThreadState
|
struct ThreadState : ThreadStateBase
|
||||||
{
|
{
|
||||||
ThreadState() : running(true, false, true),
|
ThreadState() : running(true, false, true),
|
||||||
cvVariable(AuUnsafeRaiiToShared(cvWorkMutex.AsPointer())),
|
|
||||||
isDeadEvent(false, false, true)
|
isDeadEvent(false, false, true)
|
||||||
{
|
{
|
||||||
|
|
||||||
@ -38,8 +37,6 @@ namespace Aurora::Async
|
|||||||
// :vomit:
|
// :vomit:
|
||||||
WorkerId_t id;
|
WorkerId_t id;
|
||||||
AuUInt8 multipopCount = 32;
|
AuUInt8 multipopCount = 32;
|
||||||
AuUInt32 lastFrameTime {};
|
|
||||||
AuThreads::ThreadShared_t threadObject;
|
|
||||||
AuWPtr<GroupState> parent;
|
AuWPtr<GroupState> parent;
|
||||||
AuThreadPrimitives::Semaphore syncSema;
|
AuThreadPrimitives::Semaphore syncSema;
|
||||||
AuThreadPrimitives::Event isDeadEvent;
|
AuThreadPrimitives::Event isDeadEvent;
|
||||||
@ -53,14 +50,8 @@ namespace Aurora::Async
|
|||||||
//bool running;
|
//bool running;
|
||||||
AuList<AsyncAppWaitSourceRequest> loopSources;
|
AuList<AsyncAppWaitSourceRequest> loopSources;
|
||||||
AuList<WorkEntry_t> pendingWorkItems;
|
AuList<WorkEntry_t> pendingWorkItems;
|
||||||
AuSPtr<AsyncLoop> asyncLoop;
|
|
||||||
Utility::RateLimiter rateLimiter;
|
|
||||||
ERunMode runMode;
|
|
||||||
int cookie {0};
|
int cookie {0};
|
||||||
bool bAlreadyDoingExitTick {};
|
bool bAlreadyDoingExitTick {};
|
||||||
bool bBreakEarly {};
|
|
||||||
bool bIsKiller {};
|
|
||||||
bool bCreate {};
|
|
||||||
|
|
||||||
//
|
//
|
||||||
AuThreadPrimitives::SpinLock externalFencesLock;
|
AuThreadPrimitives::SpinLock externalFencesLock;
|
||||||
@ -68,31 +59,8 @@ namespace Aurora::Async
|
|||||||
|
|
||||||
//
|
//
|
||||||
|
|
||||||
AuThreadPrimitives::ConditionMutex cvWorkMutex;
|
|
||||||
AuThreadPrimitives::ConditionVariable cvVariable;
|
|
||||||
AuAUInt32 cvSleepCount {};
|
|
||||||
AuAUInt32 cvLSActive {};
|
|
||||||
AuAUInt32 cvHasWork {};
|
|
||||||
AuSPtr<AuLoop::ILSEvent> eventLs;
|
|
||||||
AuSPtr<AuLoop::ILoopSource> asyncLoopSourceShared;
|
|
||||||
|
|
||||||
inline void SetEvent(bool bBoth = true)
|
|
||||||
{
|
|
||||||
if (auto pEvent = this->eventLs)
|
|
||||||
{
|
|
||||||
if (AuAtomicTestAndSet(&this->cvLSActive, 0u) == 0)
|
|
||||||
{
|
|
||||||
pEvent->Set();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (bBoth)
|
|
||||||
{
|
|
||||||
this->cvVariable->Signal();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
//
|
//
|
||||||
AuUInt32 uShutdownFence { 1 };
|
|
||||||
};
|
};
|
||||||
}
|
}
|
@ -54,13 +54,10 @@ namespace Aurora::Async
|
|||||||
|
|
||||||
void AsyncLoop::Schedule()
|
void AsyncLoop::Schedule()
|
||||||
{
|
{
|
||||||
if (AuThreads::GetThread() != this->pParent->threadObject.get())
|
if (AuThreads::GetThread() != this->pParent->thread.pThread.get())
|
||||||
{
|
{
|
||||||
AuAtomicAdd(&this->commitPending_, 1u);
|
AuAtomicAdd(&this->commitPending_, 1u);
|
||||||
{
|
this->pParent->sync.SetEvent();
|
||||||
AU_LOCK_GUARD(this->pParent->parent.lock()->workQueue.mutex);
|
|
||||||
this->pParent->SetEvent();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -14,7 +14,7 @@ namespace Aurora::Async
|
|||||||
{
|
{
|
||||||
struct AsyncLoop : AuLoop::LoopQueue
|
struct AsyncLoop : AuLoop::LoopQueue
|
||||||
{
|
{
|
||||||
ThreadState *pParent;
|
ThreadStateBase *pParent;
|
||||||
void OnFrame();
|
void OnFrame();
|
||||||
|
|
||||||
virtual bool AddCallback (const AuSPtr<AuLoop::ILoopSource> &source, const AuSPtr<AuLoop::ILoopSourceSubscriber> &subscriber) override;
|
virtual bool AddCallback (const AuSPtr<AuLoop::ILoopSource> &source, const AuSPtr<AuLoop::ILoopSourceSubscriber> &subscriber) override;
|
||||||
|
@ -9,7 +9,7 @@
|
|||||||
#include "Async.hpp"
|
#include "Async.hpp"
|
||||||
#include "WorkItem.hpp"
|
#include "WorkItem.hpp"
|
||||||
#include "AsyncApp.hpp"
|
#include "AsyncApp.hpp"
|
||||||
#include "Schedular.hpp"
|
#include "AuSchedular.hpp"
|
||||||
|
|
||||||
#if defined(AURORA_COMPILER_CLANG)
|
#if defined(AURORA_COMPILER_CLANG)
|
||||||
// warning: enumeration values 'kEnumCount' not handled in switch [-Wswitch
|
// warning: enumeration values 'kEnumCount' not handled in switch [-Wswitch
|
||||||
@ -37,7 +37,7 @@ namespace Aurora::Async
|
|||||||
|
|
||||||
if (auto pWorker = this->GetState())
|
if (auto pWorker = this->GetState())
|
||||||
{
|
{
|
||||||
this->optOtherCookie = pWorker->uShutdownFence;
|
this->optOtherCookie = pWorker->shutdown.uShutdownFence;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -288,7 +288,7 @@ namespace Aurora::Async
|
|||||||
{
|
{
|
||||||
if (auto pWorker = this->GetState())
|
if (auto pWorker = this->GetState())
|
||||||
{
|
{
|
||||||
if (this->optOtherCookie.value() != pWorker->uShutdownFence)
|
if (this->optOtherCookie.value() != pWorker->shutdown.uShutdownFence)
|
||||||
{
|
{
|
||||||
this->Fail();
|
this->Fail();
|
||||||
return false;
|
return false;
|
||||||
|
Loading…
Reference in New Issue
Block a user