/***
    Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved.

    File: AsyncApp.cpp
    Date: 2021-6-26
    Author: Reece
***/
2021-06-27 21:25:29 +00:00
# include <RuntimeInternal.hpp>
2021-06-30 09:28:52 +00:00
# include "Async.hpp"
2021-06-27 21:25:29 +00:00
# include "AsyncApp.hpp"
2021-06-30 09:28:52 +00:00
# include "WorkItem.hpp"
2021-07-15 16:16:23 +00:00
# include "Schedular.hpp"
# include <Console/Commands/Commands.hpp>
2021-06-27 21:25:29 +00:00
namespace Aurora : : Async
{
2021-06-30 09:28:52 +00:00
// Process-wide async application singleton returned by GetAsyncApp()
static AsyncApp gAsyncApp;

// Global count of queued-but-unfinished tasks; the app shuts down when it hits zero
static std::atomic_int gRunningTasks {};
2021-06-30 12:00:32 +00:00
// Atomically notes that one more task has been queued for execution
void IncRunningTasks()
{
    ++gRunningTasks;
}
void DecRunningTasks ( )
{
if ( ( - - gRunningTasks ) = = 0 )
{
2021-07-15 16:16:23 +00:00
gAsyncApp . Shutdown ( ) ;
2021-06-30 12:00:32 +00:00
}
2021-09-06 10:58:08 +00:00
2021-06-30 12:00:32 +00:00
}
2021-06-27 21:25:29 +00:00
2021-06-30 09:28:52 +00:00
//STATIC_TLS(WorkerId_t, tlsWorkerId);
// Thread-local (group, worker) id identifying the runner that owns the current thread
static Threading::Threads::TLSVariable<WorkerId_t, true> tlsWorkerId;
2021-07-15 16:16:23 +00:00
2021-09-06 10:58:08 +00:00
// A queued unit of work: an optional target worker id (nullopt = any worker in
// the group may dequeue it) paired with the runnable to execute
using WorkEntry_t = AuPair<AuOptional<ThreadId_t>, AuSPtr<IAsyncRunnable>>;

// Per-worker-thread bookkeeping, owned by its GroupState
struct ThreadState
{
    WorkerId_t id;                                       // (group, worker) identifier
    AuUInt8 multipopCount = 1;                           // max items dequeued per poll pass (see Poll())
    AuUInt32 lastFrameTime {};                           // last dispatch timestamp in ms, for watchdog use
    Threading::Threads::ThreadShared_t threadObject;     // OS thread (or wrapped system thread)
    AuWPtr<GroupState> parent;                           // owning group; weak to avoid an ownership cycle
    Threading::Primitives::SemaphoreUnique_t syncSema;   // signaled by Barrier() tasks targeting this worker's waiter
    AuList<AuSPtr<Threading::Threads::IThreadFeature>> features; // cleanup hooks (TLS handles, per-thread VMs, ...)
    bool rejecting {};                                   // true once this worker refuses new work items
    bool exiting {};
    bool shuttingdown {};
    Threading::Primitives::EventUnique_t running;        // gates execution during Barrier(requireSignal)/Signal()
    //bool running;

    // By convention, thread group zero hosts the system thread
    bool inline IsSysThread()
    {
        return id.first == 0;
    }

    AuList<WorkEntry_t> pendingWorkItems;                // items popped locally but not yet executed
};
// Shared state of one thread group: the group work queue, the lock/condvar
// protecting it, and the set of registered workers
struct GroupState
{
    ThreadGroup_t group;
    Threading::Primitives::ConditionMutexUnique_t cvWorkMutex;   // guards workQueue
    Threading::Primitives::ConditionVariableUnique_t cvVariable; // signaled when work is queued
    AuList<WorkEntry_t> workQueue;
    AuBST<ThreadId_t, AuSPtr<ThreadState>> workers;

    bool Init();

    // Group zero is reserved for the system/main thread
    bool inline IsSysThread()
    {
        return group == 0;
    }
};
bool GroupState : : Init ( )
{
cvWorkMutex = Threading : : Primitives : : ConditionMutexUnique ( ) ;
if ( ! cvWorkMutex )
{
return false ;
}
cvVariable = Threading : : Primitives : : ConditionVariableUnique ( cvWorkMutex . get ( ) ) ;
if ( ! cvVariable )
{
return false ;
}
return true ;
}
// Allocates the reader/writer lock guarding the thread-group registry.
// Allocation failure is unrecoverable and asserts out.
AsyncApp::AsyncApp()
{
    rwlock_ = Threading::Primitives::RWLockUnique();
    SysAssert(static_cast<bool>(rwlock_), "Couldn't initialize AsyncApp. Unable to allocate an RWLock");
}
// TODO: barrier multiple
// Dispatches a task to `worker` that signals the caller's sync semaphore, then
// waits up to `ms` for it to run — i.e. waits until the worker has drained all
// work queued ahead of us.
//   requireSignal - the worker additionally parks on its `running` event after
//                   signaling, until released by Signal()
//   drop          - the worker flips to rejecting further work (shutdown path)
// Returns true if the barrier task executed within the timeout.
bool AsyncApp::Barrier(WorkerId_t worker, AuUInt32 ms, bool requireSignal, bool drop)
{
    auto &semaphore = GetThreadState()->syncSema;
    // Raw pointer captured by the lambda below; assumes the caller's thread
    // state outlives the dispatched task — NOTE(review): confirm for timeout paths
    auto unsafeSemaphore = semaphore.get();

    auto work = AuMakeShared</*Async::BasicWorkStdFunc*/ AsyncFuncRunnable>(([=]()
    {
        auto state = GetThreadState();

        if (drop)
        {
            state->rejecting = true;
        }

        if (requireSignal)
        {
            state->running->Reset();
        }

        // Wake the caller blocked in WaitFor() below
        unsafeSemaphore->Unlock(1);

        if (requireSignal)
        {
            state->running->Lock();
        }
    }));

#if 0
    NewWorkItem({worker.first, worker.second}, work)->Dispatch();
#else
    Run(worker, work);
#endif

    return WaitFor(worker, semaphore.get(), ms);
}
void AsyncApp : : Run ( DispatchTarget_t target , AuSPtr < IAsyncRunnable > runnable )
{
auto state = GetGroup ( target . first ) ;
SysAssert ( static_cast < bool > ( state ) , " couldn't dispatch a task to an offline group " ) ;
2021-07-15 16:16:23 +00:00
IncRunningTasks ( ) ;
2021-06-30 09:28:52 +00:00
{
2021-09-06 10:58:08 +00:00
AU_LOCK_GUARD ( state - > cvWorkMutex ) ;
2021-06-30 09:28:52 +00:00
2021-09-06 10:58:08 +00:00
# if defined(STAGING) || defined(DEBUG)
AU_LOCK_GUARD ( rwlock_ - > AsReadable ( ) ) ;
2021-06-30 09:28:52 +00:00
if ( target . second . has_value ( ) )
{
2021-07-15 16:16:23 +00:00
auto itr = state - > workers . find ( * target . second ) ;
if ( ( itr = = state - > workers . end ( ) ) | | ( itr - > second - > rejecting ) )
2021-06-30 09:28:52 +00:00
{
SysPushErrorGen ( " worker: {}:{} is offline " , target . first , target . second . value_or ( 0 ) ) ;
2021-07-15 16:16:23 +00:00
DecRunningTasks ( ) ;
2021-06-30 09:28:52 +00:00
throw " Requested job worker is offline " ;
}
}
else
{
auto workers = state - > workers ;
bool found = false ;
for ( const auto & worker : state - > workers )
{
if ( ! worker . second - > rejecting )
{
found = true ;
break ;
}
}
if ( ! found )
{
2021-07-15 16:16:23 +00:00
DecRunningTasks ( ) ;
2021-06-30 09:28:52 +00:00
throw " No workers available " ;
}
}
# endif
2021-09-06 10:58:08 +00:00
state - > workQueue . push_back ( AuMakePair ( target . second , runnable ) ) ;
2021-06-30 09:28:52 +00:00
}
2021-09-06 10:58:08 +00:00
if ( target . second . has_value ( ) )
{
// sad :(
state - > cvVariable - > Broadcast ( ) ;
}
else
{
state - > cvVariable - > Signal ( ) ;
}
2021-06-30 09:28:52 +00:00
}
// Pumps the calling worker's queue once.
//   blocking - when true, sleeps on the group condvar until work (or a shutdown
//              signal) arrives
// Returns true if at least one work item was dequeued for this worker.
bool AsyncApp::Poll(bool blocking)
{
    auto state = GetThreadState();
    auto group = state->parent.lock();

    //state->pendingWorkItems.clear();

    {
        AU_LOCK_GUARD(group->cvWorkMutex);

        do
        {
            // Deque tasks the current thread runner could dipatch
            // Noting that `multipopCount` determines how aggressive threads are in dequeuing work
            // It's probable `multipopCount` will equal 1 for your use case
            //
            // Only increment when you know tasks within a group queue will not depend on one another
            // *and* tasks require a small amount of execution time
            //
            // This could be potentially useful for an event dispatcher whereby you're dispatching
            // hundreds of items per second, across a thread or two, knowing dequeuing one instead of all
            // is a waste of CPU cycles.
            //
            // Remember, incrementing `multipopCount` is potentially dangerous the second you have local
            // thread group waits
            for (auto itr = group->workQueue.begin();
                 ((itr != group->workQueue.end()) &&
                  (state->pendingWorkItems.size() < state->multipopCount));
                )
            {
                // Unassigned item: any worker in the group may claim it
                if (!itr->first.has_value())
                {
                    state->pendingWorkItems.push_back(*itr);
                    itr = group->workQueue.erase(itr);
                    continue;
                }

                // Item pinned to this worker's id
                if ((itr->first.has_value()) && (itr->first.value() == state->id.second))
                {
                    state->pendingWorkItems.push_back(*itr);
                    itr = group->workQueue.erase(itr);
                    continue;
                }

                itr++;
            }

            // Consider blocking for more work
            if (!blocking)
            {
                break;
            }

            // Block if no work items are present
            if (state->pendingWorkItems.empty())
            {
                group->cvVariable->WaitForSignal();
            }

            // Post-wakeup thread terminating check
            if (state->threadObject->Exiting() || state->shuttingdown)
            {
                break;
            }

        } while (state->pendingWorkItems.empty());
    }

    if (state->pendingWorkItems.empty())
    {
        return false;
    }

    int runningTasks {};

    for (auto itr = state->pendingWorkItems.begin(); itr != state->pendingWorkItems.end();)
    {
        // NOTE(review): if this breaks before any item runs, runningTasks stays 0
        // and ShutdownZero() fires below — confirm that is intended on the
        // exit/shutdown path
        if (state->threadObject->Exiting() || state->shuttingdown)
        {
            break;
        }

        // Set the last frame time for a watchdog later down the line
        state->lastFrameTime = Time::CurrentClockMS();

        // Dispatch
        itr->second->RunAsync();

        // Remove from our local job queue
        itr = state->pendingWorkItems.erase(itr);

        // Atomically decrement global task counter
        runningTasks = gRunningTasks.fetch_sub(1) - 1;
    }

    // Return popped work back to the groups work pool when our -pump loops were preempted
    if (state->pendingWorkItems.size())
    {
        AU_LOCK_GUARD(group->cvWorkMutex);
        group->workQueue.insert(group->workQueue.end(), state->pendingWorkItems.begin(), state->pendingWorkItems.end());
        state->pendingWorkItems.clear();
    }

    if (runningTasks == 0)
    {
        ShutdownZero();
    }

    return true;
}
2021-07-07 20:32:59 +00:00
bool AsyncApp : : WaitFor ( WorkerId_t worker , Threading : : IWaitable * primitive , AuUInt32 timeoutMs )
2021-06-30 09:28:52 +00:00
{
auto curThread = GetThreadState ( ) ;
if ( worker = = curThread - > id )
{
2021-07-07 20:32:59 +00:00
// TODO: nest counter or jump out
while ( ! Threading : : WaitFor ( primitive , 2 ) )
{
while ( this - > Poll ( false ) ) ;
}
return true ;
}
else
{
return Threading : : WaitFor ( primitive , timeoutMs ) ;
}
}
bool AsyncApp : : WaitFor ( DispatchTarget_t unlocker , Threading : : IWaitable * primitive , AuUInt32 timeoutMs )
{
auto curThread = GetThreadState ( ) ;
bool workerIdMatches = ( ! unlocker . second . has_value ( ) ) & & ( unlocker . second . value ( ) = = curThread - > id . second ) ;
if ( ( unlocker . first = = curThread - > id . first ) & & // work group matches
( ( GetThreadWorkersCount ( unlocker . first ) < 2 ) | | // is there anyone besides us who might deal with this? unlikely fast path
( workerIdMatches ) ) ) // well, crap
{
2021-07-11 17:26:38 +00:00
if ( ( workerIdMatches ) & &
( unlocker . first ! = 0 ) ) // UI code is always hacky. dont judge people for nesting tasks within tasks.
// if theres a stack overflow problem, the native dev responsable for the sysloop and ui would already know about it
2021-07-07 20:32:59 +00:00
{
2021-07-11 17:26:38 +00:00
LogWarn ( " Nested Task: {}:{}. This is not an error, it's just bad practice. " , unlocker . first , unlocker . second . value_or ( 0 ) ) ;
SysPushErrorLogicError ( " [telemetry] Nested Task: {}:{} " , unlocker . first , unlocker . second . value_or ( 0 ) ) ;
2021-07-07 20:32:59 +00:00
}
2021-06-30 09:28:52 +00:00
// TODO: timeout isn't respected here as well
while ( ! Threading : : WaitFor ( primitive , 2 ) )
{
while ( this - > Poll ( false ) ) ;
}
return true ;
}
else
{
return Threading : : WaitFor ( primitive , timeoutMs ) ;
}
}
// Boots the async app: registers the system worker (group 0, worker 1 — worker
// id 0 is reserved, see Spawn()) and starts the task scheduler.
void AsyncApp::Start()
{
    SysAssert(Spawn({0, 1}));
    StartSched();
}
// Runs the system worker's pump loop on the calling thread; returns at shutdown
void AsyncApp::Main()
{
    Entrypoint({0, 1});
}
2021-07-15 16:16:23 +00:00
// Invoked when the global running-task counter drains to zero; tears the app down
void AsyncApp::ShutdownZero()
{
    Shutdown();
}
2021-06-30 09:28:52 +00:00
// Gracefully tears down every thread group and worker.
// Re-entrant and thread-safe: the first caller wins; later or nested calls
// return early once `shuttingdown_` is observed.
void AsyncApp::Shutdown()
{
    // Nested shutdowns can happen under a write lock; cheap read-locked early-out first
    {
        AU_LOCK_GUARD(rwlock_->AsReadable());

        if (shuttingdown_)
        {
            return;
        }
    }

    // Set shutdown flag (std::exchange makes this an atomic claim under the write lock)
    {
        AU_LOCK_GUARD(rwlock_->AsWritable());

        if (std::exchange(shuttingdown_, true))
        {
            return;
        }
    }

    // Noting
    // 1) that StopSched may lockup under a writable lock
    //    -> we will terminate a thread that may be dispatching a sys pump event
    // 2) that barrier doesn't need to be under a write lock
    //
    // Perform the following shutdown of the schedular and other available threads under a read lock
    {
        AU_LOCK_GUARD(rwlock_->AsReadable());

        StopSched();

        for (auto &[groupId, group] : this->threads_)
        {
            for (auto &[id, worker] : group->workers)
            {
                // Drain each worker and flip it to rejecting further work (drop = true)
                Barrier(worker->id, 0, false, true);
            }
        }
    }

    // Finally set the shutdown flag on all of our thread contexts
    // then release them from the runners/workers list
    // then release all group contexts
    AuList<Threading::Threads::ThreadShared_t> threads;

    {
        AU_LOCK_GUARD(rwlock_->AsWritable());

        for (auto &[groupId, group] : this->threads_)
        {
            for (auto &[id, worker] : group->workers)
            {
                worker->shuttingdown = true;

                // Group zero lives on the caller-owned system thread; don't exit it
                if (groupId != 0)
                {
                    worker->threadObject->SendExitSignal();
                    threads.push_back(worker->threadObject);
                }

                // Release any worker parked by a requireSignal barrier
                auto &event = worker->running;
                if (event)
                {
                    event->Set();
                }
            }

            // Wake workers sleeping on the group condvar so they observe shutdown
            if (group->cvVariable)
            {
                AU_LOCK_GUARD(group->cvWorkMutex);
                group->cvVariable->Broadcast();
            }
        }
    }

    // Sync to shutdown threads to prevent a race condition whereby the async subsystem shuts down before the threads
    for (const auto &thread : threads)
    {
        thread->Exit();
    }
}
// True once app shutdown has begun or the calling worker is winding down
bool AsyncApp::Exiting()
{
    return shuttingdown_ || GetThreadState()->exiting;
}
bool AsyncApp : : Spawn ( WorkerId_t workerId )
{
2021-09-06 10:58:08 +00:00
AU_LOCK_GUARD ( rwlock_ - > AsWritable ( ) ) ;
2021-09-29 13:40:30 +00:00
if ( workerId . second = = 0 )
{
LogWarn ( " WorkerIds must not start from zero to preserve std::optiona nullability " ) ;
return false ;
}
2021-06-30 09:28:52 +00:00
AuSPtr < GroupState > group ;
// Try fetch or allocate group
{
AuSPtr < GroupState > * groupPtr ;
2021-09-06 10:58:08 +00:00
if ( ! AuTryFind ( this - > threads_ , workerId . first , groupPtr ) )
2021-06-30 09:28:52 +00:00
{
2021-09-06 10:58:08 +00:00
group = AuMakeShared < GroupState > ( ) ;
2021-06-30 09:28:52 +00:00
if ( ! group - > Init ( ) )
{
SysPushErrorMem ( " Not enough memory to intiialize a new group state " ) ;
return false ;
}
2021-09-06 10:58:08 +00:00
if ( ! AuTryInsert ( this - > threads_ , AuMakePair ( workerId . first , group ) ) )
2021-06-30 09:28:52 +00:00
{
return false ;
}
}
else
{
group = * groupPtr ;
}
}
// Assert worker does not already exist
{
AuSPtr < ThreadState > * ret ;
2021-09-06 10:58:08 +00:00
if ( AuTryFind ( group - > workers , workerId . second , ret ) )
2021-06-30 09:28:52 +00:00
{
SysPushErrorGen ( " Thread ID already exists " ) ;
return false ;
}
}
2021-09-06 10:58:08 +00:00
auto threadState = AuMakeShared < ThreadState > ( ) ;
2021-06-30 09:28:52 +00:00
threadState - > parent = group ;
2021-07-15 16:16:23 +00:00
threadState - > running = Threading : : Primitives : : EventUnique ( true , false , true ) ;
2021-06-30 09:28:52 +00:00
threadState - > syncSema = Threading : : Primitives : : SemaphoreUnique ( 0 ) ;
threadState - > id = workerId ;
if ( ! threadState - > IsSysThread ( ) )
{
Threading : : Threads : : AbstractThreadVectors handler ;
handler . DoRun = [ = ] ( const Threading : : Threads : : IAuroraThread * thread )
{
Entrypoint ( threadState - > id ) ;
} ;
threadState - > threadObject = Threading : : Threads : : ThreadUnique ( handler ) ;
threadState - > threadObject - > Run ( ) ;
}
else
{
2021-09-06 10:58:08 +00:00
threadState - > threadObject = AuSPtr < Threading : : Threads : : IAuroraThread > ( Threading : : Threads : : GetThread ( ) , [ ] ( Threading : : Threads : : IAuroraThread * ) { } ) ;
2021-06-30 09:28:52 +00:00
}
2021-09-06 10:58:08 +00:00
group - > workers . insert ( AuMakePair ( workerId . second , threadState ) ) ;
2021-06-30 09:28:52 +00:00
return true ;
}
// Resolves a worker id to its thread object; empty handle when the group or
// worker is unknown
Threading::Threads::ThreadShared_t AsyncApp::ResolveHandle(WorkerId_t id)
{
    auto group = GetGroup(id.first);
    if (!group)
    {
        return {};
    }

    AuSPtr<ThreadState> *state {};
    if (!AuTryFind(group->workers, id.second, state))
    {
        return {};
    }

    return (*state)->threadObject;
}
// Snapshot of all online worker ids, keyed by thread group
AuBST<ThreadGroup_t, AuList<ThreadId_t>> AsyncApp::GetThreads()
{
    AU_LOCK_GUARD(rwlock_->AsReadable());

    AuBST<ThreadGroup_t, AuList<ThreadId_t>> ret;

    for (const auto &[groupId, groupState] : this->threads_)
    {
        AuList<ThreadId_t> workers;

        for (const auto &[workerId, workerState] : groupState->workers)
        {
            workers.push_back(workerState->id.second);
        }

        ret[groupId] = workers;
    }

    return ret;
}
// Returns the calling thread's (group, worker) id, read from thread-local storage
WorkerId_t AsyncApp::GetCurrentThread()
{
    return tlsWorkerId;
}
// Barriers against every worker in `groupId`: waits until each has drained the
// work queued ahead of this call.
//   requireSignal - workers other than the caller additionally park until Signal()
//   timeoutMs     - wait budget applied to each worker individually
// Returns false on the first worker that misses the barrier in time.
bool AsyncApp::Sync(ThreadGroup_t groupId, bool requireSignal, AuUInt32 timeoutMs)
{
    AU_LOCK_GUARD(rwlock_->AsReadable());

    auto group = GetGroup(groupId);
    auto currentWorkerId = GetCurrentThread().second;

    for (auto &jobWorker : group->workers)
    {
        // Never ask ourselves to park on `running` — that would deadlock
        if (!Barrier(jobWorker.second->id, timeoutMs, requireSignal && jobWorker.second->id.second != currentWorkerId, false)) // BAD!, should subtract time elapsed, clamp to, i dunno, 5ms min?
        {
            return false;
        }
    }
    return true;
}
void AsyncApp : : Signal ( ThreadGroup_t groupId )
{
2021-09-06 10:58:08 +00:00
AU_LOCK_GUARD ( rwlock_ - > AsReadable ( ) ) ;
2021-06-30 09:28:52 +00:00
auto group = GetGroup ( groupId ) ;
for ( auto & jobWorker : group - > workers )
{
jobWorker . second - > running - > Set ( ) ;
}
}
// Convenience wrapper: Sync() with a timeout but without park-until-Signal
bool AsyncApp::SyncTimeout(ThreadGroup_t group, AuUInt32 ms)
{
    return Sync(group, false, ms);
}
void AsyncApp : : SyncAllSafe ( )
{
2021-09-06 10:58:08 +00:00
AU_LOCK_GUARD ( rwlock_ - > AsReadable ( ) ) ;
2021-06-30 09:28:52 +00:00
for ( const auto & re : this - > threads_ )
{
for ( auto & jobWorker : re . second - > workers )
{
SysAssert ( Barrier ( jobWorker . second - > id , 0 , false , false ) ) ;
}
}
}
// Fetches the state of thread group `type`; empty pointer when unknown/offline
AuSPtr<GroupState> AsyncApp::GetGroup(ThreadGroup_t type)
{
    AU_LOCK_GUARD(rwlock_->AsReadable());

    AuSPtr<GroupState> *entry {};
    if (!AuTryFind(this->threads_, type, entry))
    {
        return {};
    }

    return *entry;
}
2021-07-07 20:32:59 +00:00
// Number of workers currently registered in `group`; zero for an offline group.
// FIX: the original dereferenced GetGroup()'s result unconditionally, crashing
// when the group was unknown (GetGroup returns an empty pointer in that case).
size_t AsyncApp::GetThreadWorkersCount(ThreadGroup_t group)
{
    AU_LOCK_GUARD(rwlock_->AsReadable());

    auto state = GetGroup(group);
    return state ? state->workers.size() : 0;
}
2021-06-30 09:28:52 +00:00
// Returns the thread state of the calling worker.
// NOTE(review): assumes the caller is a registered worker — GetGroup() must not
// return null here, and operator[] would otherwise default-insert an empty
// entry into the workers map; confirm all callers run on worker threads
AuSPtr<ThreadState> AsyncApp::GetThreadState()
{
    AU_LOCK_GUARD(rwlock_->AsReadable());
    auto id = GetCurrentThread();
    auto state = GetGroup(id.first);
    return state->workers[id.second];
}
// Worker run loop: stores our id in TLS, pumps the queue until told to exit,
// then drains remaining work (flipping to rejecting) and unregisters.
// NOTE(review): Start()/Main() spawn the system worker as {0, 1} but the
// special-case checks below compare against {0, 0} — confirm intended
void AsyncApp::Entrypoint(WorkerId_t id)
{
    tlsWorkerId = id;

    auto auThread = Threading::Threads::GetThread();
    auto job = GetThreadState();

    while ((!auThread->Exiting()) && (!job->shuttingdown))
    {
        // Do work (blocking)
        Poll(true);
    }

    if (id != WorkerId_t {0, 0})
    {
        AU_LOCK_GUARD(rwlock_->AsReadable());

        if (!shuttingdown_ && !job->rejecting)
        {
            // Pump and barrier + reject all after atomically
            Barrier(id, 0, false, true);
        }
    }

    ThisExiting();

    if (id == WorkerId_t {0, 0})
    {
        // System worker exit tears down the whole app
        Shutdown();
    }
}
// Routes console command handling onto `id`'s worker
void AsyncApp::SetConsoleCommandDispatcher(WorkerId_t id)
{
    commandDispatcher_ = id;
    Console::Commands::UpdateDispatcher(commandDispatcher_);
}
// Final teardown for the calling worker: detaches it from console command
// dispatch, aborts its scheduled tasks, runs thread-feature cleanup hooks,
// and removes it from its group's worker map.
void AsyncApp::ThisExiting()
{
    auto id = GetCurrentThread();
    auto state = GetGroup(id.first);

    {
        AU_LOCK_GUARD(rwlock_->AsWritable());

        auto itr = state->workers.find(id.second);
        auto &jobWorker = itr->second;

        // This shouldn't be a problem; however, we're going to handle the one edge case where
        // some angry sysadmin is spamming commands
        if ((commandDispatcher_.has_value())
            && (commandDispatcher_.value() == id))
        {
            Console::Commands::UpdateDispatcher({});
        }

        // Abort scheduled tasks
        TerminateSceduledTasks(id);

        // Clean up thread features
        // -> transferable TLS handles
        // -> thread specific vms
        // -> anything your brain wishes to imagination
        for (const auto &thread : jobWorker->features)
        {
            try
            {
                thread->Cleanup();
            }
            catch (...)
            {
                // Best-effort: a throwing feature must not abort the rest of teardown
                LogWarn("Couldn't clean up thread feature!");
                Debug::PrintError();
            }
        }

        jobWorker->features.clear();
        state->workers.erase(itr);
    }
}
// Installs a thread feature (TLS handle, per-thread VM, ...) onto worker `id`
// by dispatching an init task to that worker.
//   async - when false, blocks until the feature has been initialized
void AsyncApp::AddFeature(WorkerId_t id, AuSPtr<Threading::Threads::IThreadFeature> feature, bool async)
{
    auto work = AuMakeShared<BasicWorkStdFunc>(([=]()
    {
        // Runs on the target worker: register the feature, then initialize it
        GetThreadState()->features.push_back(feature);
        feature->Init();
    }));

    auto workItem = NewWorkItem(id, work, !async)->Dispatch();

    if (!async)
    {
        workItem->BlockUntilComplete();
    }
}
// Asserts the calling thread belongs to thread group `group`
void AsyncApp::AssertInThreadGroup(ThreadGroup_t group)
{
    SysAssert(static_cast<WorkerId_t>(tlsWorkerId).first == group);
}
// Asserts the calling thread is exactly worker `id`
void AsyncApp::AssertWorker(WorkerId_t id)
{
    SysAssert(static_cast<WorkerId_t>(tlsWorkerId) == id);
}
// Exported accessor for the process-wide async application singleton
AUKN_SYM IAsyncApp *GetAsyncApp()
{
    return &gAsyncApp;
}
2021-06-27 21:25:29 +00:00
}