2021-11-05 17:34:23 +00:00
/***
Copyright ( C ) 2021 J Reece Wilson ( a / k / a " Reece " ) . All rights reserved .
File : ThreadPool . cpp
Date : 2021 - 10 - 30
Author : Reece
* * */
# include <Source/RuntimeInternal.hpp>
# include "Async.hpp"
# include "ThreadPool.hpp"
# include "WorkItem.hpp"
# include "Schedular.hpp"
namespace Aurora : : Async
{
//STATIC_TLS(WorkerId_t, tlsWorkerId);
// Per-thread weak handle to a ThreadPool. It is assigned in Spawn (create mode) and Entrypoint,
// and temporarily swapped while PollInternal dispatches work items.
static thread_local AuWPtr < ThreadPool > gCurrentPool ;
// Once a group's `dirty` counter exceeds this value, Run(target, runnable) clears the group's
// `sorted` flag so the work queue gets re-sorted on a later poll.
static const auto kMagicResortThreshold = 15 ;
// Returns the pool-qualified worker id of the calling thread.
//
// The per-thread pool handle is weak, so it is empty on threads that never attached to a pool
// (and after the pool has been destroyed). In that case a default-constructed WorkerPId_t is
// returned instead of dereferencing a null pool; Spawn() relies on being able to test `.pool`
// on the result.
AUKN_SYM WorkerPId_t GetCurrentWorkerPId()
{
    auto lkPool = gCurrentPool.lock();
    if (!lkPool)
    {
        return {};
    }

    auto cpy = *lkPool->tlsWorkerId;
    return WorkerPId_t(lkPool, cpy);
}
//
// Constructs an empty pool. The reader/writer lock guarding the group and worker tables is
// allocated here, and the constructor asserts that the allocation succeeded.
ThreadPool::ThreadPool()
{
    this->rwlock_ = AuThreadPrimitives::RWLockUnique();

    SysAssert(static_cast<bool>(this->rwlock_), " Couldn't initialize ThreadPool. Unable to allocate an RWLock ");
}
// internal pool interface
bool ThreadPool : : WaitFor ( WorkerId_t unlocker , const AuSPtr < Threading : : IWaitable > & primitive , AuUInt32 timeoutMs )
{
auto curThread = GetThreadState ( ) ;
bool workerIdMatches = ( unlocker . second = = curThread - > id . second ) | | ( ( unlocker . second = = Async : : kThreadIdAny ) & & ( GetThreadWorkersCount ( unlocker . first ) = = 1 ) ) ;
if ( ( unlocker . first = = curThread - > id . first ) & & // work group matches
( workerIdMatches ) ) // well, crap
{
bool queryAsync = false ;
while ( ! ( queryAsync ? primitive - > TryLock ( ) : Threading : : WaitFor ( primitive . get ( ) , 2 ) ) )
{
queryAsync = CtxYield ( ) ;
}
return true ;
}
else
{
return Threading : : WaitFor ( primitive . get ( ) , timeoutMs ) ;
}
}
void ThreadPool : : Run ( WorkerId_t target , AuSPtr < IAsyncRunnable > runnable )
{
auto state = GetGroup ( target . first ) ;
SysAssert ( static_cast < bool > ( state ) , " couldn't dispatch a task to an offline group " ) ;
IncrementTasksRunning ( ) ;
{
AU_LOCK_GUARD ( state - > cvWorkMutex ) ;
# if defined(STAGING) || defined(DEBUG)
AU_LOCK_GUARD ( rwlock_ - > AsReadable ( ) ) ;
if ( target . second ! = Async : : kThreadIdAny )
{
auto itr = state - > workers . find ( target . second ) ;
if ( ( itr = = state - > workers . end ( ) ) | | ( itr - > second - > rejecting ) )
{
SysPushErrorGen ( " worker: {}:{} is offline " , target . first , target . second ) ;
DecrementTasksRunning ( ) ;
#if 0
throw " Requested job worker is offline " ;
# else
runnable - > CancelAsync ( ) ;
return ;
# endif
}
}
else
{
auto workers = state - > workers ;
bool found = false ;
for ( const auto & worker : state - > workers )
{
if ( ! worker . second - > rejecting )
{
found = true ;
break ;
}
}
if ( ! found )
{
DecrementTasksRunning ( ) ;
#if 0
throw " No workers available " ;
# else
runnable - > CancelAsync ( ) ;
return ;
# endif
}
}
# endif
if ( ! AuTryInsert ( state - > workQueue , AuMakePair ( target . second , runnable ) ) )
{
runnable - > CancelAsync ( ) ;
return ;
}
state - > dirty + + ;
if ( state - > dirty > kMagicResortThreshold )
{
state - > dirty = 0 ;
state - > sorted = false ;
}
state - > eventLs - > Set ( ) ;
}
if ( target . second = = Async : : kThreadIdAny )
{
state - > cvVariable - > Signal ( ) ;
}
else
{
// sad :(
// TODO: when we have wait any, add support (^ the trigger) for it here
state - > cvVariable - > Broadcast ( ) ;
}
}
// Exposes this object through its public IThreadPool interface.
IThreadPool *ThreadPool::ToThreadPool()
{
    IThreadPool *asInterface = this;
    return asInterface;
}
void ThreadPool : : IncrementTasksRunning ( )
{
this - > tasksRunning_ + + ;
}
void ThreadPool : : DecrementTasksRunning ( )
{
if ( ( - - this - > tasksRunning_ ) = = 0 )
{
if ( InRunnerMode ( ) )
{
Shutdown ( ) ;
}
}
}
// IThreadPool interface
// Returns the number of workers registered under `group`, or 0 when the group does not exist.
// The lookup is performed under the pool's read lock.
size_t ThreadPool::GetThreadWorkersCount(ThreadGroup_t group)
{
    AU_LOCK_GUARD(this->rwlock_->AsReadable());

    auto state = GetGroup(group);
    if (!state)
    {
        // GetGroup yields an empty handle for unknown groups
        return 0;
    }

    return state->workers.size();
}
void ThreadPool : : SetRunningMode ( bool eventRunning )
{
this - > runnersRunning_ = eventRunning ;
}
bool ThreadPool : : Spawn ( WorkerId_t workerId )
{
return Spawn ( workerId , false ) ;
}
bool ThreadPool : : Create ( WorkerId_t workerId )
{
return Spawn ( workerId , true ) ;
}
bool ThreadPool : : InRunnerMode ( )
{
return this - > runnersRunning_ ;
}
bool ThreadPool : : Poll ( )
{
return InternalRunOne ( false ) ;
}
bool ThreadPool : : RunOnce ( )
{
return InternalRunOne ( true ) ;
}
bool ThreadPool : : Run ( )
{
bool ranOnce { } ;
auto auThread = AuThreads : : GetThread ( ) ;
auto job = GetThreadState ( ) ;
while ( ( ! auThread - > Exiting ( ) ) & & ( ! job - > shuttingdown ) )
{
// Do work (blocking)
InternalRunOne ( true ) ;
ranOnce = true ;
}
return ranOnce ;
}
bool ThreadPool : : InternalRunOne ( bool block )
{
auto state = GetThreadState ( ) ;
bool success { } ;
do
{
if ( state - > inLoopSourceMode )
{
success = PollLoopSource ( block ) ;
}
else
{
success = PollInternal ( block ) ;
success | = state - > inLoopSourceMode ;
}
} while ( success ) ;
return success ;
}
bool ThreadPool : : PollInternal ( bool block )
{
auto state = GetThreadState ( ) ;
auto group = state - > parent . lock ( ) ;
//state->pendingWorkItems.clear();
auto magic = CtxPollPush ( ) ;
{
AU_LOCK_GUARD ( group - > cvWorkMutex ) ;
2021-11-05 17:40:06 +00:00
// TODO: reimplement this
// this is stupid and gross
2021-11-05 17:34:23 +00:00
if ( group - > workQueue . size ( ) > 2 )
{
2021-11-05 17:40:06 +00:00
if ( ! group - > sorted )
2021-11-05 17:34:23 +00:00
{
auto cpy = group - > workQueue ;
std : : sort ( group - > workQueue . begin ( ) , group - > workQueue . end ( ) , [ & ] ( const WorkEntry_t & a , const WorkEntry_t & b )
{
if ( a . second - > GetPrio ( ) ! = b . second - > GetPrio ( ) )
return a . second - > GetPrio ( ) > b . second - > GetPrio ( ) ;
AuUInt32 ia { } , ib { } ;
for ( ; ia < cpy . size ( ) ; ia + + )
if ( cpy [ ia ] . second = = a . second )
break ;
for ( ; ib < cpy . size ( ) ; ib + + )
if ( cpy [ ib ] . second = = b . second )
break ;
return ia < ib ;
} ) ;
group - > sorted = true ;
group - > dirty = 0 ;
}
}
do
{
// Deque tasks the current thread runner could dipatch
// Noting that `multipopCount` determines how aggressive threads are in dequeuing work
// It's probable `multipopCount` will equal 1 for your use case
//
// Only increment when you know tasks within a group queue will not depend on one another
// *and* tasks require a small amount of execution time
//
// This could be potentially useful for an event dispatcher whereby you're dispatching
// hundreds of items per second, across a thread or two, knowing dequeuing one instead of all
// is a waste of CPU cycles.
//
// Remember, incrementing `multipopCount` is potentially dangerous the second you have local
// thread group waits
for ( auto itr = group - > workQueue . begin ( ) ;
( ( itr ! = group - > workQueue . end ( ) ) & &
( state - > pendingWorkItems . size ( ) < state - > multipopCount ) ) ;
)
{
if ( itr - > first = = Async : : kThreadIdAny )
{
state - > pendingWorkItems . push_back ( * itr ) ;
itr = group - > workQueue . erase ( itr ) ;
continue ;
}
if ( ( itr - > first ! = Async : : kThreadIdAny ) & & ( itr - > first = = state - > id . second ) )
{
state - > pendingWorkItems . push_back ( * itr ) ;
itr = group - > workQueue . erase ( itr ) ;
continue ;
}
itr + + ;
}
// Consider blocking for more work
if ( ! block )
{
break ;
}
// Block if no work items are present
if ( state - > pendingWorkItems . empty ( ) )
{
group - > cvVariable - > WaitForSignal ( ) ;
}
// Post-wakeup thread terminating check
if ( state - > threadObject - > Exiting ( ) | | state - > shuttingdown )
{
break ;
}
} while ( state - > pendingWorkItems . empty ( ) ) ;
if ( group - > workQueue . empty ( ) )
{
group - > eventLs - > Reset ( ) ;
}
}
if ( state - > pendingWorkItems . empty ( ) )
{
CtxPollReturn ( state , magic , false ) ;
return false ;
}
int runningTasks { } ;
auto oldTlsHandle = std : : exchange ( gCurrentPool , AuSharedFromThis ( ) ) ;
bool lowPrioCont { } ;
bool lowPrioContCached { } ;
for ( auto itr = state - > pendingWorkItems . begin ( ) ; itr ! = state - > pendingWorkItems . end ( ) ; )
{
if ( state - > threadObject - > Exiting ( ) | | state - > shuttingdown )
{
break ;
}
// Set the last frame time for a watchdog later down the line
state - > lastFrameTime = Time : : CurrentClockMS ( ) ;
if ( itr - > second - > GetPrio ( ) < 0.25 )
{
if ( lowPrioCont ) continue ;
if ( ! lowPrioContCached )
{
AU_LOCK_GUARD ( group - > cvWorkMutex ) ;
{
for ( const auto & [ pendingWorkA , pendingWorkB ] : group - > workQueue )
{
if ( pendingWorkB - > GetPrio ( ) > .5 )
{
lowPrioCont = true ;
break ;
}
}
}
lowPrioContCached = true ;
if ( lowPrioCont ) continue ;
}
}
// Dispatch
itr - > second - > RunAsync ( ) ;
// Remove from our local job queue
itr = state - > pendingWorkItems . erase ( itr ) ;
// Atomically decrement global task counter
runningTasks = this - > tasksRunning_ . fetch_sub ( 1 ) - 1 ;
}
gCurrentPool = oldTlsHandle ;
// Return popped work back to the groups work pool when our -pump loops were preempted
if ( state - > pendingWorkItems . size ( ) )
{
AU_LOCK_GUARD ( group - > cvWorkMutex ) ;
group - > workQueue . insert ( group - > workQueue . end ( ) , state - > pendingWorkItems . begin ( ) , state - > pendingWorkItems . end ( ) ) ;
group - > eventLs - > Set ( ) ;
state - > pendingWorkItems . clear ( ) ;
}
CtxPollReturn ( state , magic , true ) ;
if ( InRunnerMode ( ) )
{
if ( runningTasks = = 0 )
{
Shutdown ( ) ;
}
}
return true ;
}
bool ThreadPool : : PollLoopSource ( bool block )
{
auto state = GetThreadState ( ) ;
auto group = state - > parent . lock ( ) ;
//state->pendingWorkItems.clear();
auto magic = CtxPollPush ( ) ;
bool retValue { } ;
// TODO (reece): This function isn't very efficient
{
AU_LOCK_GUARD ( group - > cvWorkMutex ) ;
AuList < AsyncAppWaitSourceRequest > curLoopReq = state - > loopSources ;
AuList < AuSPtr < Loop : : ILoopSource > > curLoopSources ;
auto lenLoopReqs = curLoopReq . size ( ) ;
curLoopSources . resize ( lenLoopReqs + 1 ) ;
for ( auto i = 0 ; i < lenLoopReqs ; i + + )
{
curLoopSources [ i ] = curLoopReq [ i ] . loopSource ;
}
curLoopSources [ lenLoopReqs ] = group - > eventLs ;
AuList < AuSPtr < Loop : : ILoopSource > > nextLoopSources ;
if ( block )
{
// TODO (reece): work on async epoll like abstraction
nextLoopSources = Loop : : WaitMultipleObjects ( curLoopSources , 0 ) ;
}
else
{
nextLoopSources . reserve ( curLoopSources . size ( ) ) ;
for ( const auto & source : curLoopSources )
{
if ( source - > IsSignaled ( ) )
{
nextLoopSources . push_back ( source ) ;
}
}
}
auto time = Time : : CurrentClockMS ( ) ;
state - > loopSources . clear ( ) ;
state - > loopSources . reserve ( curLoopReq . size ( ) ) ;
if ( AuExists ( nextLoopSources , group - > eventLs ) )
{
PollInternal ( false ) ;
}
for ( const auto & request : curLoopReq )
{
bool remove { } ;
bool removeType { } ;
if ( AuExists ( nextLoopSources , request . loopSource ) )
{
remove = true ;
removeType = true ;
}
else
{
if ( request . requestedOffset )
{
if ( request . endTime < time )
{
remove = true ;
removeType = false ;
}
}
}
if ( ! remove )
{
state - > loopSources . push_back ( request ) ;
}
else
{
request . callback ( request . loopSource , removeType ) ;
retValue | = removeType ;
}
}
state - > inLoopSourceMode = state - > loopSources . size ( ) ;
}
return retValue ;
}
void ThreadPool : : Shutdown ( )
{
// Nested shutdowns can happen; prevent a write lock
{
AU_LOCK_GUARD ( this - > rwlock_ - > AsReadable ( ) ) ;
if ( this - > shuttingdown_ )
{
return ;
}
}
// Set shutdown flag
{
AU_LOCK_GUARD ( this - > rwlock_ - > AsWritable ( ) ) ;
if ( std : : exchange ( this - > shuttingdown_ , true ) )
{
return ;
}
}
// Noting
// 1) that StopSched may lockup under a writable lock
// -> we will terminate a thread that may be dispatching a sys pump event
// 2) that barrier doesn't need to be under a write lock
//
// Perform the following shutdown of the schedular and other available threads under a read lock
{
AU_LOCK_GUARD ( this - > rwlock_ - > AsReadable ( ) ) ;
StopSched ( ) ;
for ( auto & [ groupId , group ] : this - > threads_ )
{
for ( auto & [ id , worker ] : group - > workers )
{
Barrier ( worker - > id , 0 , false , true ) ;
}
}
}
// Finally set the shutdown flag on all of our thread contexts
// then release them from the runners/workers list
// then release all group contexts
AuList < AuThreads : : ThreadShared_t > threads ;
{
AU_LOCK_GUARD ( this - > rwlock_ - > AsWritable ( ) ) ;
for ( auto & [ groupId , group ] : this - > threads_ )
{
for ( auto & [ id , worker ] : group - > workers )
{
worker - > shuttingdown = true ;
if ( groupId ! = 0 )
{
worker - > threadObject - > SendExitSignal ( ) ;
threads . push_back ( worker - > threadObject ) ;
}
auto & event = worker - > running ;
if ( event )
{
event - > Set ( ) ;
}
}
if ( group - > cvVariable )
{
AU_LOCK_GUARD ( group - > cvWorkMutex ) ;
group - > cvVariable - > Broadcast ( ) ;
}
}
}
// Sync to shutdown threads to prevent a race condition whereby the async subsystem shuts down before the threads
for ( const auto & thread : threads )
{
thread - > Exit ( ) ;
}
}
bool ThreadPool : : Exiting ( )
{
2021-11-07 20:17:08 +00:00
return this - > shuttingdown_ | | GetThreadState ( ) - > exiting ;
2021-11-05 17:34:23 +00:00
}
// Creates a work item bound to this pool that runs `task` on `worker`.
// Returns an empty handle when `task` is empty.
AuSPtr<IWorkItem> ThreadPool::NewWorkItem(const WorkerId_t &worker, const AuSPtr<IWorkItemHandler> &task, bool supportsBlocking)
{
    AuSPtr<IWorkItem> item;

    if (task)
    {
        item = AuMakeShared<WorkItem>(this, worker, task, supportsBlocking);
    }

    return item;
}
// Creates a work item without a handler, addressed to a default-constructed worker id and with
// blocking support enabled.
AuSPtr<IWorkItem> ThreadPool::NewFence()
{
    const WorkerId_t defaultWorker {};
    const AuSPtr<IWorkItemHandler> noHandler {};

    return AuMakeShared<WorkItem>(this, defaultWorker, noHandler, true);
}
// Resolves the thread object backing worker `id`.
// Returns an empty handle when the group or the worker is unknown; GetThreadHandle yields an
// empty state in that case, which must not be dereferenced.
AuThreads::ThreadShared_t ThreadPool::ResolveHandle(WorkerId_t id)
{
    auto worker = GetThreadHandle(id);
    if (!worker)
    {
        return {};
    }

    return worker->threadObject;
}
// Snapshots the registered workers: maps every group id to the list of thread ids of its
// workers. Taken under the pool's read lock.
AuBST<ThreadGroup_t, AuList<ThreadId_t>> ThreadPool::GetThreads()
{
    AU_LOCK_GUARD(rwlock_->AsReadable());

    AuBST<ThreadGroup_t, AuList<ThreadId_t>> ret;

    for (const auto &groupEntry : this->threads_)
    {
        // Creates the (possibly empty) list for this group, then fills it in place
        auto &ids = ret[groupEntry.first];

        for (const auto &workerEntry : groupEntry.second->workers)
        {
            ids.push_back(workerEntry.second->id.second);
        }
    }

    return ret;
}
// Returns the worker id recorded in this pool's thread-local slot for the calling thread.
WorkerId_t ThreadPool::GetCurrentThread()
{
    WorkerId_t current = tlsWorkerId;
    return current;
}
bool ThreadPool : : Sync ( WorkerId_t workerId , AuUInt32 timeoutMs , bool requireSignal )
{
2021-11-07 20:17:08 +00:00
AU_LOCK_GUARD ( this - > rwlock_ - > AsReadable ( ) ) ;
2021-11-05 17:34:23 +00:00
auto group = GetGroup ( workerId . first ) ;
auto currentWorkerId = GetCurrentThread ( ) . second ;
if ( workerId . second = = Async : : kThreadIdAny )
{
for ( auto & jobWorker : group - > workers )
{
if ( ! Barrier ( jobWorker . second - > id , timeoutMs , requireSignal & & jobWorker . second - > id . second ! = currentWorkerId , false ) ) // BAD!, should subtract time elapsed, clamp to, i dunno, 5ms min?
{
return false ;
}
}
}
else
{
return Barrier ( workerId , timeoutMs , requireSignal & & workerId . second ! = currentWorkerId , false ) ;
}
return true ;
}
void ThreadPool : : Signal ( WorkerId_t workerId )
{
2021-11-07 20:17:08 +00:00
AU_LOCK_GUARD ( this - > rwlock_ - > AsReadable ( ) ) ;
2021-11-05 17:34:23 +00:00
auto group = GetGroup ( workerId . first ) ;
if ( workerId . second = = Async : : kThreadIdAny )
{
for ( auto & jobWorker : group - > workers )
{
jobWorker . second - > running - > Set ( ) ;
}
}
else
{
GetThreadHandle ( workerId ) - > running - > Set ( ) ;
}
}
void ThreadPool : : SyncAllSafe ( )
{
2021-11-07 20:17:08 +00:00
AU_LOCK_GUARD ( this - > rwlock_ - > AsReadable ( ) ) ;
2021-11-05 17:34:23 +00:00
for ( const auto & re : this - > threads_ )
{
for ( auto & jobWorker : re . second - > workers )
{
SysAssert ( Barrier ( jobWorker . second - > id , 0 , false , false ) ) ;
}
}
}
void ThreadPool : : AddFeature ( WorkerId_t id , AuSPtr < AuThreads : : IThreadFeature > feature , bool async )
{
auto work = AuMakeShared < BasicWorkStdFunc > ( ( [ = ] ( )
{
GetThreadState ( ) - > features . push_back ( feature ) ;
feature - > Init ( ) ;
} ) ) ;
auto workItem = this - > NewWorkItem ( id , work , ! async ) - > Dispatch ( ) ;
if ( ! async )
{
workItem - > BlockUntilComplete ( ) ;
}
}
void ThreadPool : : AssertInThreadGroup ( ThreadGroup_t group )
{
SysAssert ( static_cast < WorkerId_t > ( tlsWorkerId ) . first = = group ) ;
}
void ThreadPool : : AssertWorker ( WorkerId_t id )
{
SysAssert ( static_cast < WorkerId_t > ( tlsWorkerId ) = = id ) ;
}
bool ThreadPool : : ScheduleLoopSource ( const AuSPtr < Loop : : ILoopSource > & loopSource , WorkerId_t workerId , AuUInt32 timeout , const AuConsumer < AuSPtr < Loop : : ILoopSource > , bool > & callback )
{
auto thread = this - > GetThreadHandle ( workerId ) ;
if ( ! thread )
{
return false ;
}
auto group = thread - > parent . lock ( ) ;
{
AU_LOCK_GUARD ( group - > cvWorkMutex ) ;
AsyncAppWaitSourceRequest req { } ;
req . startTime = Time : : CurrentClockMS ( ) ;
if ( timeout )
{
req . requestedOffset = timeout ;
req . endTime = req . startTime + timeout ;
}
req . loopSource = loopSource ;
req . callback = callback ;
if ( ! AuTryInsert ( thread - > loopSources , req ) )
{
return false ;
}
thread - > inLoopSourceMode = thread - > loopSources . size ( ) ;
}
return true ;
}
// Unimplemented fiber hooks, 'twas used for science
int ThreadPool : : CtxPollPush ( )
{
// TOOD (Reece): implement a context switching library
// Refer to the old implementation of this on pastebin
return 0 ;
}
void ThreadPool : : CtxPollReturn ( const AuSPtr < ThreadState > & state , int status , bool hitTask )
{
}
bool ThreadPool : : CtxYield ( )
{
bool ranAtLeastOne = false ;
while ( this - > InternalRunOne ( false ) )
{
ranAtLeastOne = true ;
}
return ranAtLeastOne ;
}
// internal api
bool ThreadPool : : Spawn ( WorkerId_t workerId , bool create )
{
AU_LOCK_GUARD ( rwlock_ - > AsWritable ( ) ) ;
if ( GetCurrentWorkerPId ( ) . pool & & create )
{
SysPushErrorGeneric ( " TODO (reece): add support for multiple runners per thread " ) ;
return { } ;
}
AuSPtr < GroupState > group ;
// Try fetch or allocate group
{
AuSPtr < GroupState > * groupPtr ;
if ( ! AuTryFind ( this - > threads_ , workerId . first , groupPtr ) )
{
group = AuMakeShared < GroupState > ( ) ;
if ( ! group - > Init ( ) )
{
SysPushErrorMem ( " Not enough memory to intiialize a new group state " ) ;
return false ;
}
if ( ! AuTryInsert ( this - > threads_ , AuMakePair ( workerId . first , group ) ) )
{
return false ;
}
}
else
{
group = * groupPtr ;
}
}
// Assert worker does not already exist
{
AuSPtr < ThreadState > * ret ;
if ( AuTryFind ( group - > workers , workerId . second , ret ) )
{
SysPushErrorGen ( " Thread ID already exists " ) ;
return false ;
}
}
auto threadState = AuMakeShared < ThreadState > ( ) ;
threadState - > parent = group ;
threadState - > running = AuThreadPrimitives : : EventUnique ( true , false , true ) ;
threadState - > syncSema = AuThreadPrimitives : : SemaphoreUnique ( 0 ) ;
threadState - > id = workerId ;
//threadState->eventDriven = runner;
if ( ! create )
{
threadState - > threadObject = AuThreads : : ThreadUnique ( AuThreads : : ThreadInfo (
AuMakeShared < AuThreads : : IThreadVectorsFunctional > ( AuThreads : : IThreadVectorsFunctional : : OnEntry_t ( std : : bind ( & ThreadPool : : Entrypoint , this , threadState - > id ) ) ,
AuThreads : : IThreadVectorsFunctional : : OnExit_t { } ) ,
gRuntimeConfig . async . threadPoolDefaultStackSize
) ) ;
if ( ! threadState - > threadObject )
{
return { } ;
}
threadState - > threadObject - > Run ( ) ;
}
else
{
threadState - > threadObject = AuSPtr < AuThreads : : IAuroraThread > ( AuThreads : : GetThread ( ) , [ ] ( AuThreads : : IAuroraThread * ) { } ) ;
// TODO: this is just a hack
// we should implement this properly
threadState - > threadObject - > AddLastHopeTlsHook ( AuMakeShared < AuThreads : : IThreadFeatureFunctional > ( [ ] ( ) - > void
{
} , [ ] ( ) - > void
{
auto pid = GetCurrentWorkerPId ( ) ;
if ( pid . pool )
{
std : : static_pointer_cast < ThreadPool > ( pid . pool ) - > ThisExiting ( ) ;
}
} ) ) ;
//
gCurrentPool = AuWeakFromThis ( ) ;
tlsWorkerId = WorkerPId_t ( AuSharedFromThis ( ) , workerId ) ;
}
group - > workers . insert ( AuMakePair ( workerId . second , threadState ) ) ;
return true ;
}
// private api
bool ThreadPool : : Barrier ( WorkerId_t workerId , AuUInt32 ms , bool requireSignal , bool drop )
{
// TODO: barrier multiple
auto & semaphore = GetThreadState ( ) - > syncSema ;
auto unsafeSemaphore = semaphore . get ( ) ;
bool failed { } ;
auto work = AuMakeShared < AsyncFuncRunnable > (
[ = ] ( )
{
auto state = GetThreadState ( ) ;
if ( drop )
{
state - > rejecting = true ;
}
if ( requireSignal )
{
state - > running - > Reset ( ) ;
}
unsafeSemaphore - > Unlock ( 1 ) ;
if ( requireSignal )
{
state - > running - > Lock ( ) ;
}
} ,
[ & ] ( )
{
unsafeSemaphore - > Unlock ( 1 ) ;
failed = true ;
}
) ;
if ( ! work )
{
return false ;
}
Run ( workerId , work ) ;
return WaitFor ( workerId , AuUnsafeRaiiToShared ( semaphore ) , ms ) & & ! failed ;
}
void ThreadPool : : Entrypoint ( WorkerId_t id )
{
gCurrentPool = AuWeakFromThis ( ) ;
tlsWorkerId = WorkerPId_t ( AuSharedFromThis ( ) , id ) ;
auto job = GetThreadState ( ) ;
Run ( ) ;
if ( id ! = WorkerId_t { 0 , 0 } )
{
AU_LOCK_GUARD ( this - > rwlock_ - > AsReadable ( ) ) ;
if ( ! this - > shuttingdown_ & & ! job - > rejecting )
{
// Pump and barrier + reject all after atomically
Barrier ( id , 0 , false , true ) ;
}
}
ThisExiting ( ) ;
if ( id = = WorkerId_t { 0 , 0 } )
{
CleanWorkerPoolReservedZeroFree ( ) ;
}
}
void ThreadPool : : ThisExiting ( )
{
auto id = GetCurrentThread ( ) ;
auto state = GetGroup ( id . first ) ;
{
AU_LOCK_GUARD ( this - > rwlock_ - > AsWritable ( ) ) ;
auto itr = state - > workers . find ( id . second ) ;
auto & jobWorker = itr - > second ;
CleanUpWorker ( id ) ;
// Abort scheduled tasks
TerminateSceduledTasks ( this , id ) ;
// Clean up thread features
// -> transferable TLS handles
// -> thread specific vms
// -> anything your brain wishes to imagination
for ( const auto & thread : jobWorker - > features )
{
try
{
thread - > Cleanup ( ) ;
}
catch ( . . . )
{
LogWarn ( " Couldn't clean up thread feature! " ) ;
Debug : : PrintError ( ) ;
}
}
jobWorker - > features . clear ( ) ;
state - > workers . erase ( itr ) ;
}
}
// Looks up the state of group `type` under the pool's read lock.
// Returns an empty handle when no such group is registered.
AuSPtr<GroupState> ThreadPool::GetGroup(ThreadGroup_t type)
{
    AU_LOCK_GUARD(this->rwlock_->AsReadable());

    AuSPtr<GroupState> *slot;
    if (AuTryFind(this->threads_, type, slot))
    {
        return *slot;
    }

    return {};
}
// Returns the state of the calling worker, or an empty handle when the calling thread is not
// registered with this pool.
//
// The lookup uses find() rather than operator[]: indexing inserted a null entry for unknown ids
// while only the read lock was held, and the group handle was dereferenced unchecked.
AuSPtr<ThreadState> ThreadPool::GetThreadState()
{
    AU_LOCK_GUARD(this->rwlock_->AsReadable());

    auto id = GetCurrentThread();

    auto state = GetGroup(id.first);
    if (!state)
    {
        return {};
    }

    auto itr = state->workers.find(id.second);
    if (itr == state->workers.end())
    {
        return {};
    }

    return itr->second;
}
// Looks up the state of worker `id` under the pool's read lock.
// Returns an empty handle when either the group or the worker is unknown.
AuSPtr<ThreadState> ThreadPool::GetThreadHandle(WorkerId_t id)
{
    AU_LOCK_GUARD(this->rwlock_->AsReadable());

    auto group = GetGroup(id.first);
    if (group)
    {
        AuSPtr<ThreadState> *slot;
        if (AuTryFind(group->workers, id.second, slot))
        {
            return *slot;
        }
    }

    return {};
}
// Factory for a new, empty thread pool. The scheduler is started here, on demand.
AUKN_SYM AuSPtr<IThreadPool> NewThreadPool()
{
    // apps that don't require async shouldn't be burdened with the overhead of this litl spiner
    StartSched();

    AuSPtr<IThreadPool> pool = AuMakeShared<ThreadPool>();
    return pool;
}
}